1mod buffer;
2pub mod bus;
3pub mod vec;
4
5use crate::message::vec::MessageVec;
6use crate::util::triple::TripleBufferedHead;
7use crate::util::IndexSet;
8use std::alloc::Layout;
9use std::any::TypeId;
10use std::fmt::Formatter;
11use std::marker::PhantomData;
12use std::ops::Deref;
13
14pub struct PaddingMessage;
35
36impl PaddingMessage {
37 pub(crate) const MESSAGE_INDEX: usize = 0;
38}
39static_assertions::assert_eq_size!(PaddingMessage, ());
40
41pub struct ShutdownCommand(PhantomData<()>);
42
43impl ShutdownCommand {
44 pub(crate) const MESSAGE_INDEX: usize = 1;
45
46 pub(crate) fn new() -> Self {
47 Self(Default::default())
49 }
50}
51
52static_assertions::assert_eq_size!(ShutdownCommand, ());
53
54pub struct ShutdownRequestedEvent(PhantomData<()>);
55
56impl ShutdownRequestedEvent {
57 pub(crate) fn new() -> Self {
58 Self(Default::default())
60 }
61}
62
63static_assertions::assert_eq_size!(ShutdownRequestedEvent, ());
64
65#[derive(Debug, Clone, Eq, PartialEq)]
66pub struct MessageRegistry {
67 message_index_set: IndexSet<TypeId>,
73
74 message_size: MessageSize,
77}
78
79impl MessageRegistry {
80 pub fn new() -> Self {
81 Self::default()
82 }
83
84 #[inline]
85 pub fn get_index_of<E: 'static + Send + Sync>(&self) -> Option<usize> {
86 self.get_index(TypeId::of::<E>())
87 }
88
89 #[inline]
90 pub fn get_index(&self, tid: TypeId) -> Option<usize> {
91 self.message_index_set.get_index_of(&tid)
92 }
93
94 #[inline]
95 pub fn register_of<E: 'static + Send + Sync>(&mut self) -> usize {
96 self.message_size = self.message_size.max(MessageSize::of::<E>());
97 self.message_index_set.insert_full(TypeId::of::<E>()).0
98 }
99
100 pub(crate) fn register_all<I: Iterator<Item = TypeId>>(
101 &mut self,
102 tids: I,
103 max_message_size: MessageSize,
104 ) -> &mut Self {
105 for tid in tids {
106 self.message_index_set.insert(tid);
107 }
108 self.message_size = self.message_size.max(max_message_size);
109
110 self
111 }
112
113 #[inline]
114 pub fn len(&self) -> usize {
115 self.message_index_set.len()
116 }
117
118 #[inline]
119 pub fn message_size(&self) -> MessageSize {
120 self.message_size
121 }
122}
123
124impl Default for MessageRegistry {
125 fn default() -> Self {
126 let mut registry = Self {
127 message_index_set: Default::default(),
128 message_size: Default::default(),
129 };
130
131 let e_idx = registry.register_of::<PaddingMessage>();
132 assert_eq!(e_idx, PaddingMessage::MESSAGE_INDEX);
133
134 let e_idx = registry.register_of::<ShutdownCommand>();
135 assert_eq!(e_idx, ShutdownCommand::MESSAGE_INDEX);
136
137 registry
138 }
139}
140
141#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
142#[repr(transparent)]
143pub struct MessageSize(usize);
144
145impl MessageSize {
146 pub const MESSAGE_ALIGN: usize = std::mem::align_of::<usize>();
147
148 #[inline(always)]
149 pub(crate) fn of<T: 'static + Sized + Send + Sync>() -> Self {
150 assert_eq!(std::mem::align_of::<usize>() % std::mem::align_of::<T>(), 0);
157 Self(Self::for_size(std::mem::size_of::<T>()))
158 }
159
160 #[inline(always)]
161 pub(crate) fn for_size(message_size: usize) -> usize {
162 let alignments = (message_size / Self::MESSAGE_ALIGN)
163 + if message_size % Self::MESSAGE_ALIGN == 0 {
164 0
165 } else {
166 1
167 };
168
169 alignments * Self::MESSAGE_ALIGN
170 }
171
172 #[inline]
173 pub fn inner(self) -> usize {
174 self.0
175 }
176}
177
178impl Deref for MessageSize {
179 type Target = usize;
180
181 fn deref(&self) -> &Self::Target {
182 &self.0
183 }
184}
185
186impl Into<usize> for MessageSize {
187 fn into(self) -> usize {
188 self.0
189 }
190}
191
192impl Default for MessageSize {
193 fn default() -> Self {
194 Self(Self::MESSAGE_ALIGN)
195 }
196}
197
198pub struct UntypedMessage {
199 e_idx: usize,
200 tid: TypeId,
201 data: *mut u8,
202 data_size: usize,
203 drop_fn: Option<fn(*mut u8)>,
204}
205
206impl UntypedMessage {
207 pub(crate) unsafe fn new<E: 'static + Send + Sync>(e_idx: usize, message: E) -> Self {
208 let data_len = MessageSize::of::<E>().inner();
209 let layout = Self::layout(data_len);
210 let ptr = std::alloc::alloc(layout) as *mut E;
211 ptr.write(message);
212 let drop_fn: Option<fn(*mut u8)> = if std::mem::needs_drop::<E>() {
213 Some(|ptr| (ptr as *mut E).drop_in_place())
214 } else {
215 None
216 };
217
218 Self {
219 e_idx,
220 tid: TypeId::of::<E>(),
221 data: ptr as *mut u8,
222 data_size: data_len,
223 drop_fn,
224 }
225 }
226
227 fn layout(size: usize) -> Layout {
228 Layout::from_size_align(size, MessageSize::MESSAGE_ALIGN).unwrap()
229 }
230}
231
232impl Drop for UntypedMessage {
233 #[inline]
234 fn drop(&mut self) {
235 unsafe {
236 if let Some(drop) = self.drop_fn {
237 (drop)(self.data as *mut u8);
238 }
239 std::alloc::dealloc(self.data as *mut u8, Self::layout(self.data_size));
240 }
241 }
242}
243
244unsafe impl Send for UntypedMessage {}
245unsafe impl Sync for UntypedMessage {}
246
247pub struct MessageSender {
248 head: TripleBufferedHead<MessageVec>,
249 _pd: PhantomData<*mut u8>,
251}
252
253static_assertions::assert_not_impl_any!(MessageSender: Send, Sync);
254
255impl MessageSender {
256 pub fn new(head: TripleBufferedHead<MessageVec>) -> Self {
257 Self {
258 head,
259 _pd: Default::default(),
260 }
261 }
262
263 #[inline]
264 pub fn send<E: 'static + Send + Sync>(&self, message: E) -> bool {
265 let send = self.head.write().push(message);
266 if !send {
267 log::debug!(
268 "skipping sending of unhandled message type: {}",
269 std::any::type_name::<E>()
270 );
271 }
272
273 send
274 }
275
276 #[inline]
277 pub fn send_iter<I: IntoIterator<Item = E>, E: 'static + Send + Sync>(
278 &self,
279 messages: I,
280 ) -> bool {
281 let send = self.head.write().extend(messages);
282 if !send {
283 log::debug!(
284 "skipping sending of unhandled message type: {}",
285 std::any::type_name::<E>()
286 );
287 }
288
289 send
290 }
291
292 #[inline]
293 pub fn send_all(&self, messages: &mut MessageVec) {
294 self.head.write().extend_vec(messages);
295 }
296
297 #[inline]
298 pub fn buffer(&self) -> MessageVec {
299 let registry = self.head.write().get_registry().clone();
300
301 MessageVec::new(registry)
302 }
303
304 #[inline]
305 pub fn prepare<E: 'static + Send + Sync>(&self, message: E) -> Option<UntypedMessage> {
306 unsafe {
307 self.head
309 .write()
310 .get_registry()
311 .get_index_of::<E>()
312 .map(|e_idx| UntypedMessage::new(e_idx, message))
313 }
314 }
315
316 #[inline]
317 pub fn send_untyped(&self, mut message: UntypedMessage) {
318 unsafe {
319 let mut head = self.head.write();
320 match head.get_registry().get_index(message.tid) {
321 Some(e_idx) if e_idx == message.e_idx => {
322 head.push_untyped(
323 e_idx,
324 message.data,
325 message.data_size,
326 message.drop_fn.take(),
327 );
328 }
329 _ => {
330 panic!("untyped message is incompatible with message registry");
331 }
332 }
333 }
334 }
335}
336
337impl std::fmt::Debug for MessageSender {
338 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
339 f.write_str("MessageSender")
340 }
341}
342
343impl Clone for MessageSender {
344 fn clone(&self) -> Self {
345 Self {
346 head: self.head.clone(),
347 _pd: Default::default(),
348 }
349 }
350}
351
352#[cfg(test)]
353mod test {
354 use super::MessageSize;
355
356 #[test]
357 fn aligned_size_of() {
358 assert_eq!(
359 MessageSize::of::<[u8; 1]>().inner() % MessageSize::MESSAGE_ALIGN,
360 0
361 );
362 assert_eq!(
363 MessageSize::of::<[u8; 3]>().inner() % MessageSize::MESSAGE_ALIGN,
364 0
365 );
366 assert_eq!(
367 MessageSize::of::<[u8; 7]>().inner() % MessageSize::MESSAGE_ALIGN,
368 0
369 );
370 assert_eq!(
371 MessageSize::of::<[u8; 15]>().inner() % MessageSize::MESSAGE_ALIGN,
372 0
373 );
374 assert_eq!(
375 MessageSize::of::<[u8; 31]>().inner() % MessageSize::MESSAGE_ALIGN,
376 0
377 );
378 assert_eq!(
379 MessageSize::of::<[u8; 63]>().inner() % MessageSize::MESSAGE_ALIGN,
380 0
381 );
382 }
383}