roundabout/
message.rs

1mod buffer;
2pub mod bus;
3pub mod vec;
4
5use crate::message::vec::MessageVec;
6use crate::util::triple::TripleBufferedHead;
7use crate::util::IndexSet;
8use std::alloc::Layout;
9use std::any::TypeId;
10use std::fmt::Formatter;
11use std::marker::PhantomData;
12use std::ops::Deref;
13
14// TODO:
15//  const message index POC
16//  should also work with once cell and lazy, would be slower but more safe and support more OS (e.g. Mac, Non Unixes/windows)
17// #[dynamic(10)]
18// static GLOBAL_MESSAGE_COUNTER: AtomicUsize = AtomicUsize::new(0);
19//
20// ------------------------------------------------------------------------------
21// GENERATED:
22// ------------------------------------------------------------------------------
23// // Unix and Windows support only, best performance
24// #[dynamic(0)]
25// static mut ID_MESSAGE_INDEX: usize = unsafe { GLOBAL_MESSAGE_COUNTER.fetch_add(1, Ordering::Relaxed) };
26//
27// pub trait Message: Sized + Send + Sync + 'static {
28//     #[inline]
29//     unsafe fn index() -> usize {
30//         *ID_MESSAGE_INDEX
31//     }
32// }
33
34pub struct PaddingMessage;
35
36impl PaddingMessage {
37    pub(crate) const MESSAGE_INDEX: usize = 0;
38}
39static_assertions::assert_eq_size!(PaddingMessage, ());
40
41pub struct ShutdownCommand(PhantomData<()>);
42
43impl ShutdownCommand {
44    pub(crate) const MESSAGE_INDEX: usize = 1;
45
46    pub(crate) fn new() -> Self {
47        // non public constructable shutdown message
48        Self(Default::default())
49    }
50}
51
52static_assertions::assert_eq_size!(ShutdownCommand, ());
53
54pub struct ShutdownRequestedEvent(PhantomData<()>);
55
56impl ShutdownRequestedEvent {
57    pub(crate) fn new() -> Self {
58        // non public constructable shutdown requested message
59        Self(Default::default())
60    }
61}
62
63static_assertions::assert_eq_size!(ShutdownRequestedEvent, ());
64
65#[derive(Debug, Clone, Eq, PartialEq)]
66pub struct MessageRegistry {
67    // TODO: const message lookup
68    //  don't use TypeId as it is unsound and may collide
69    //  -> use a proc macro to generated an associated uuid, maybe it is possible to
70    //  even get better performance with the capabilities of proc macros during dispatching?
71    //  see https://github.com/rust-lang/rust/issues/10389
72    message_index_set: IndexSet<TypeId>,
73
74    // TODO: const message lookup
75    // message_lkp_tbl: Vec<Option<usize>>,
76    message_size: MessageSize,
77}
78
79impl MessageRegistry {
80    pub fn new() -> Self {
81        Self::default()
82    }
83
84    #[inline]
85    pub fn get_index_of<E: 'static + Send + Sync>(&self) -> Option<usize> {
86        self.get_index(TypeId::of::<E>())
87    }
88
89    #[inline]
90    pub fn get_index(&self, tid: TypeId) -> Option<usize> {
91        self.message_index_set.get_index_of(&tid)
92    }
93
94    #[inline]
95    pub fn register_of<E: 'static + Send + Sync>(&mut self) -> usize {
96        self.message_size = self.message_size.max(MessageSize::of::<E>());
97        self.message_index_set.insert_full(TypeId::of::<E>()).0
98    }
99
100    pub(crate) fn register_all<I: Iterator<Item = TypeId>>(
101        &mut self,
102        tids: I,
103        max_message_size: MessageSize,
104    ) -> &mut Self {
105        for tid in tids {
106            self.message_index_set.insert(tid);
107        }
108        self.message_size = self.message_size.max(max_message_size);
109
110        self
111    }
112
113    #[inline]
114    pub fn len(&self) -> usize {
115        self.message_index_set.len()
116    }
117
118    #[inline]
119    pub fn message_size(&self) -> MessageSize {
120        self.message_size
121    }
122}
123
124impl Default for MessageRegistry {
125    fn default() -> Self {
126        let mut registry = Self {
127            message_index_set: Default::default(),
128            message_size: Default::default(),
129        };
130
131        let e_idx = registry.register_of::<PaddingMessage>();
132        assert_eq!(e_idx, PaddingMessage::MESSAGE_INDEX);
133
134        let e_idx = registry.register_of::<ShutdownCommand>();
135        assert_eq!(e_idx, ShutdownCommand::MESSAGE_INDEX);
136
137        registry
138    }
139}
140
141#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
142#[repr(transparent)]
143pub struct MessageSize(usize);
144
145impl MessageSize {
146    pub const MESSAGE_ALIGN: usize = std::mem::align_of::<usize>();
147
148    #[inline(always)]
149    pub(crate) fn of<T: 'static + Sized + Send + Sync>() -> Self {
150        // TODO:
151        //  Make this a compile time check,
152        //  once static assertions can support generic parameters from the outer function
153        //  see:
154        //      https://github.com/rust-lang/rfcs/issues/2790,
155        //      https://github.com/nvzqz/static-assertions-rs/issues/21
156        assert_eq!(std::mem::align_of::<usize>() % std::mem::align_of::<T>(), 0);
157        Self(Self::for_size(std::mem::size_of::<T>()))
158    }
159
160    #[inline(always)]
161    pub(crate) fn for_size(message_size: usize) -> usize {
162        let alignments = (message_size / Self::MESSAGE_ALIGN)
163            + if message_size % Self::MESSAGE_ALIGN == 0 {
164                0
165            } else {
166                1
167            };
168
169        alignments * Self::MESSAGE_ALIGN
170    }
171
172    #[inline]
173    pub fn inner(self) -> usize {
174        self.0
175    }
176}
177
178impl Deref for MessageSize {
179    type Target = usize;
180
181    fn deref(&self) -> &Self::Target {
182        &self.0
183    }
184}
185
186impl Into<usize> for MessageSize {
187    fn into(self) -> usize {
188        self.0
189    }
190}
191
192impl Default for MessageSize {
193    fn default() -> Self {
194        Self(Self::MESSAGE_ALIGN)
195    }
196}
197
198pub struct UntypedMessage {
199    e_idx: usize,
200    tid: TypeId,
201    data: *mut u8,
202    data_size: usize,
203    drop_fn: Option<fn(*mut u8)>,
204}
205
206impl UntypedMessage {
207    pub(crate) unsafe fn new<E: 'static + Send + Sync>(e_idx: usize, message: E) -> Self {
208        let data_len = MessageSize::of::<E>().inner();
209        let layout = Self::layout(data_len);
210        let ptr = std::alloc::alloc(layout) as *mut E;
211        ptr.write(message);
212        let drop_fn: Option<fn(*mut u8)> = if std::mem::needs_drop::<E>() {
213            Some(|ptr| (ptr as *mut E).drop_in_place())
214        } else {
215            None
216        };
217
218        Self {
219            e_idx,
220            tid: TypeId::of::<E>(),
221            data: ptr as *mut u8,
222            data_size: data_len,
223            drop_fn,
224        }
225    }
226
227    fn layout(size: usize) -> Layout {
228        Layout::from_size_align(size, MessageSize::MESSAGE_ALIGN).unwrap()
229    }
230}
231
232impl Drop for UntypedMessage {
233    #[inline]
234    fn drop(&mut self) {
235        unsafe {
236            if let Some(drop) = self.drop_fn {
237                (drop)(self.data as *mut u8);
238            }
239            std::alloc::dealloc(self.data as *mut u8, Self::layout(self.data_size));
240        }
241    }
242}
243
244unsafe impl Send for UntypedMessage {}
245unsafe impl Sync for UntypedMessage {}
246
247pub struct MessageSender {
248    head: TripleBufferedHead<MessageVec>,
249    // Don't auto impl send and sync as TripleBufferedHead should be thread specific
250    _pd: PhantomData<*mut u8>,
251}
252
253static_assertions::assert_not_impl_any!(MessageSender: Send, Sync);
254
255impl MessageSender {
256    pub fn new(head: TripleBufferedHead<MessageVec>) -> Self {
257        Self {
258            head,
259            _pd: Default::default(),
260        }
261    }
262
263    #[inline]
264    pub fn send<E: 'static + Send + Sync>(&self, message: E) -> bool {
265        let send = self.head.write().push(message);
266        if !send {
267            log::debug!(
268                "skipping sending of unhandled message type: {}",
269                std::any::type_name::<E>()
270            );
271        }
272
273        send
274    }
275
276    #[inline]
277    pub fn send_iter<I: IntoIterator<Item = E>, E: 'static + Send + Sync>(
278        &self,
279        messages: I,
280    ) -> bool {
281        let send = self.head.write().extend(messages);
282        if !send {
283            log::debug!(
284                "skipping sending of unhandled message type: {}",
285                std::any::type_name::<E>()
286            );
287        }
288
289        send
290    }
291
292    #[inline]
293    pub fn send_all(&self, messages: &mut MessageVec) {
294        self.head.write().extend_vec(messages);
295    }
296
297    #[inline]
298    pub fn buffer(&self) -> MessageVec {
299        let registry = self.head.write().get_registry().clone();
300
301        MessageVec::new(registry)
302    }
303
304    #[inline]
305    pub fn prepare<E: 'static + Send + Sync>(&self, message: E) -> Option<UntypedMessage> {
306        unsafe {
307            // Optimization: static resolution of e_idx
308            self.head
309                .write()
310                .get_registry()
311                .get_index_of::<E>()
312                .map(|e_idx| UntypedMessage::new(e_idx, message))
313        }
314    }
315
316    #[inline]
317    pub fn send_untyped(&self, mut message: UntypedMessage) {
318        unsafe {
319            let mut head = self.head.write();
320            match head.get_registry().get_index(message.tid) {
321                Some(e_idx) if e_idx == message.e_idx => {
322                    head.push_untyped(
323                        e_idx,
324                        message.data,
325                        message.data_size,
326                        message.drop_fn.take(),
327                    );
328                }
329                _ => {
330                    panic!("untyped message is incompatible with message registry");
331                }
332            }
333        }
334    }
335}
336
337impl std::fmt::Debug for MessageSender {
338    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
339        f.write_str("MessageSender")
340    }
341}
342
343impl Clone for MessageSender {
344    fn clone(&self) -> Self {
345        Self {
346            head: self.head.clone(),
347            _pd: Default::default(),
348        }
349    }
350}
351
352#[cfg(test)]
353mod test {
354    use super::MessageSize;
355
356    #[test]
357    fn aligned_size_of() {
358        assert_eq!(
359            MessageSize::of::<[u8; 1]>().inner() % MessageSize::MESSAGE_ALIGN,
360            0
361        );
362        assert_eq!(
363            MessageSize::of::<[u8; 3]>().inner() % MessageSize::MESSAGE_ALIGN,
364            0
365        );
366        assert_eq!(
367            MessageSize::of::<[u8; 7]>().inner() % MessageSize::MESSAGE_ALIGN,
368            0
369        );
370        assert_eq!(
371            MessageSize::of::<[u8; 15]>().inner() % MessageSize::MESSAGE_ALIGN,
372            0
373        );
374        assert_eq!(
375            MessageSize::of::<[u8; 31]>().inner() % MessageSize::MESSAGE_ALIGN,
376            0
377        );
378        assert_eq!(
379            MessageSize::of::<[u8; 63]>().inner() % MessageSize::MESSAGE_ALIGN,
380            0
381        );
382    }
383}