a653rs_linux_core/
queuing.rs

1use std::fmt::Debug;
2use std::mem;
3use std::mem::size_of;
4use std::os::fd::RawFd;
5use std::os::fd::{AsRawFd, OwnedFd};
6use std::ptr::slice_from_raw_parts;
7use std::time::Instant;
8
9use a653rs::bindings::PortDirection;
10use memfd::{FileSeal, Memfd, MemfdOptions};
11use memmap2::MmapMut;
12
13use crate::channel::{PortConfig, QueuingChannelConfig};
14use crate::error::{ResultExt, SystemError, TypedError, TypedResult};
15use crate::partition::QueuingConstant;
16use crate::queuing::concurrent_queue::ConcurrentQueue;
17
18pub mod concurrent_queue {
19    use std::cell::UnsafeCell;
20    use std::fmt::{Debug, Formatter};
21    use std::sync::atomic::{AtomicUsize, Ordering};
22    use std::{mem, ptr};
23
    /// An unsized bounded concurrent queue (Fifo) that makes use of atomics and
    /// does not use pointers internally. This allows the queue to be
    /// created inside a buffer of type `&[u8]` via [ConcurrentQueue::init_at].
    /// The required buffer size can be requested in advance via
    /// [ConcurrentQueue::size] by providing the size and maximum number of
    /// entries.
    ///
    /// # Example
    /// ```
    /// # use a653rs_linux_core::queuing::concurrent_queue::ConcurrentQueue;
    /// // Create a ConcurrentQueue inside of a Vec<u8> buffer object
    /// let required_size = ConcurrentQueue::size(1, 4);
    /// let mut buffer = vec![0u8; required_size];
    /// ConcurrentQueue::init_at(&mut buffer, 1, 4);
    /// let queue1 = unsafe { ConcurrentQueue::load_from(&buffer) };
    /// let queue2 = unsafe { ConcurrentQueue::load_from(&buffer) };
    ///
    /// // Let's push some values in the queue
    /// assert!(queue1.push(&[1]).is_some());
    /// assert!(queue2.push(&[2]).is_some());
    ///
    /// // Now pop them using the Fifo method
    /// assert_eq!(queue2.pop().unwrap()[0], 1);
    /// assert_eq!(queue1.pop().unwrap()[0], 2);
    ///
    /// // When the queue is empty, pop will return None
    /// assert_eq!(queue1.pop(), None);
    /// assert_eq!(queue2.pop(), None);
    /// ```
    #[repr(C)]
    pub struct ConcurrentQueue {
        /// Size in bytes of every entry stored in `data`.
        pub msg_size: usize,
        /// Maximum number of entries the queue can hold.
        pub msg_capacity: usize,

        /// Number of entries currently stored.
        len: AtomicUsize,
        /// Slot number (not byte offset) of the oldest entry; wraps around at
        /// `msg_capacity`.
        first: AtomicUsize,
        /// Entry storage. Slot `i` starts at byte `i * msg_size`; the slice may
        /// be longer than `msg_size * msg_capacity` because of trailing
        /// alignment padding (see [ConcurrentQueue::size]).
        data: UnsafeCell<[u8]>,
    }
60
    // `len` and `first` are only accessed through atomic operations and `data`
    // is only reached through its `UnsafeCell`.
    // NOTE(review): concurrent `push_then`/`pop_then`/`get` callers can still
    // obtain overlapping slices into `data`; confirm that users serialise
    // access to individual slots before relying on these impls.
    unsafe impl Send for ConcurrentQueue {}
    unsafe impl Sync for ConcurrentQueue {}
63
    // Declares the wide-pointer metadata of `ConcurrentQueue` as a `usize`.
    // `buf_to_self`/`buf_to_self_mut` pass the length of the trailing `data`
    // slice as that metadata.
    impl ptr_meta::Pointee for ConcurrentQueue {
        type Metadata = usize;
    }
67
68    impl ConcurrentQueue {
69        /// Calculates the required buffer size to fit a MessageQueue object
70        /// with `capacity` maximum elements and a fixed size of `element_size`
71        /// bytes per element.
72        pub fn size(element_size: usize, capacity: usize) -> usize {
73            let mut size = Self::fields_size() + element_size * capacity; // data
74
75            // We need to include extra padding for calculating this structs size,
76            // because of `#[repr(C)]` the compiler may add padding to this struct for
77            // alignment purposes,
78            let alignment = Self::align();
79            let sub_alignment_mask = alignment - 1;
80            if size & sub_alignment_mask > 0 {
81                // If the size ended with non-aligned bytes, we add the necessary padding.
82                size = (size & !sub_alignment_mask) + alignment;
83            }
84
85            size
86        }
87
88        /// Returns the size of bytes of this struct's fields
89        fn fields_size() -> usize {
90            mem::size_of::<usize>() // entry_size
91                + mem::size_of::<usize>() // capacity
92                + mem::size_of::<AtomicUsize>() // len
93                + mem::size_of::<AtomicUsize>() // first
94        }
95
96        /// Returns this struct's alignment
97        fn align() -> usize {
98            // This structs maximum alignment is that of a usize (or AtomicUsize, which has
99            // the same data layout)
100            mem::align_of::<usize>()
101        }
102
103        /// Creates a new empty ConcurrentQueue in given buffer.
104        /// Even though this function returns a reference to the newly created
105        /// ConcurrentQueue, it should be dropped to release the mutable
106        /// reference to the buffer.
107        ///
108        /// # Panics
109        /// If the buffer size is not exactly the required size to fit this
110        /// `ConcurrentQueue` object.
111        pub fn init_at(buffer: &mut [u8], element_size: usize, capacity: usize) -> &Self {
112            assert_eq!(buffer.len(), Self::size(element_size, capacity));
113
114            // We cast the `buffer` reference to a `Self` pointer, which can then safely be
115            // dereferenced
116            let queue = unsafe { &mut *Self::buf_to_self_mut(buffer) };
117
118            queue.msg_size = element_size;
119            queue.msg_capacity = capacity;
120            // Use `ptr::write` to prevent the compiler from trying to drop previous values.
121            unsafe {
122                ptr::write(&mut queue.len, AtomicUsize::new(0));
123                ptr::write(&mut queue.first, AtomicUsize::new(0));
124            }
125
126            queue
127        }
128
129        /// Converts the given buffer pointer to a ConcurrentQueue pointer and
130        /// handles shortening the wide-pointer metadata.
131        fn buf_to_self(buffer: *const [u8]) -> *const Self {
132            let (buf_ptr, mut buf_len): (*const (), usize) = ptr_meta::PtrExt::to_raw_parts(buffer);
133            buf_len -= Self::fields_size();
134
135            ptr_meta::from_raw_parts(buf_ptr, buf_len)
136        }
137
138        /// Converts the given mutable buffer pointer to a ConcurrentQueue
139        /// pointer and handles shortening the wide-pointer metadata.
140        fn buf_to_self_mut(buffer: *mut [u8]) -> *mut Self {
141            let (buf_ptr, mut buf_len): (*mut (), usize) = ptr_meta::PtrExt::to_raw_parts(buffer);
142            buf_len -= Self::fields_size();
143
144            ptr_meta::from_raw_parts_mut(buf_ptr, buf_len)
145        }
146
147        /// Loads a `ConcurrentQueue` from the specified buffer.
148        /// # Safety
149        /// The buffer must contain exactly one valid ConcurrentQueue, which has
150        /// to be initialized through [ConcurrentQueue::init_at]. Also
151        /// mutating or reading raw values from the buffer may result in
152        /// UB, because the ConcurrentQueue relies on internal safety mechanisms
153        /// to prevent UB due to shared mutable state.
154        pub unsafe fn load_from(buffer: &[u8]) -> &Self {
155            let obj = &*Self::buf_to_self(buffer);
156
157            // Perform some validity checks
158            debug_assert!(obj.len.load(Ordering::SeqCst) <= obj.msg_capacity); // Check length
159            debug_assert!(obj.first.load(Ordering::SeqCst) < obj.msg_capacity); // Check first idx
160
161            // Also check if unsized data field is of correct size
162            // Note: obj_data may be longer than `obj.msg_size * obj.msg_capacity` due to
163            // alignment padding. To correct we call `Self::size`.
164            let obj_data = obj.data.get().as_ref().unwrap();
165            debug_assert_eq!(
166                obj_data.len(),
167                Self::size(obj.msg_size, obj.msg_capacity) - Self::fields_size()
168            );
169
170            obj
171        }
172
173        /// Calculates the physical starting index of an element inside of the
174        /// data array.
175        fn to_physical_idx(&self, first: usize, idx: usize) -> usize {
176            (first + idx) % self.msg_capacity * self.msg_size
177        }
178
179        /// Gets an element from the queue at a specific index
180        pub fn get(&self, idx: usize) -> Option<&[u8]> {
181            assert!(idx < self.msg_capacity);
182
183            let current_len = self.len.load(Ordering::SeqCst);
184            if idx > current_len {
185                return None;
186            }
187
188            let idx = self.to_physical_idx(self.first.load(Ordering::SeqCst), idx);
189
190            let msg = &unsafe { self.data.get().as_mut().unwrap() }[idx..(idx + self.msg_size)];
191            Some(msg)
192        }
193
194        /// Pushes an element to the back of the queue. If there was space, a
195        /// mutable reference to the inserted element is returned.
196        pub fn push(&self, data: &[u8]) -> Option<&mut [u8]> {
197            assert_eq!(data.len(), self.msg_size);
198
199            self.push_then(|entry| entry.copy_from_slice(data))
200        }
201
202        /// Pushes an uninitialized element and then calls a closure to set its
203        /// memory in-place. If there was space, a mutable reference to
204        /// the inserted element is returned.
205        pub fn push_then<F: FnOnce(&'_ mut [u8])>(&self, set_element: F) -> Option<&mut [u8]> {
206            let current_len = self.len.load(Ordering::SeqCst);
207            if current_len == self.msg_capacity {
208                return None;
209            }
210
211            let insert_idx = self.len.fetch_add(1, Ordering::SeqCst);
212
213            let idx = self.to_physical_idx(self.first.load(Ordering::SeqCst), insert_idx);
214            let element_slot =
215                &mut unsafe { self.data.get().as_mut().unwrap() }[idx..(idx + self.msg_size)];
216
217            set_element(element_slot);
218
219            Some(element_slot)
220        }
221
222        /// Tries to pop an element from the front of the queue.
223        pub fn pop(&self) -> Option<Box<[u8]>> {
224            self.pop_then(|entry| Vec::from(entry).into_boxed_slice())
225        }
226
        /// Calls a mapping closure on the first element that is about to be
        /// popped from the queue. Only the return value of the closure
        /// is returned by this function. If the popped element is
        /// needed as owned data, consider using [ConcurrentQueue::pop] instead.
        ///
        /// Returns `None` without calling the closure if the queue is empty.
        pub fn pop_then<F: FnOnce(&'_ [u8]) -> T, T>(&'_ self, map_element: F) -> Option<T> {
            // Decrement length; bails out with `None` if the length is already zero
            self.len
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |len| len.checked_sub(1))
                .ok()?;

            // Advance `first` to the next slot (wrapping at the capacity). The
            // closure always returns `Some`, so the `unwrap` cannot fail; the
            // returned value is the slot of the element being popped.
            let prev_first = self
                .first
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
                    Some((x + 1) % self.msg_capacity)
                })
                .unwrap();

            let idx = self.to_physical_idx(prev_first, 0);

            // NOTE(review): the slot is read after `len` and `first` were
            // already updated; confirm no concurrent pusher can reuse it.
            let msg = &unsafe { &*self.data.get() }[idx..(idx + self.msg_size)];

            Some(map_element(msg))
        }
250
251        pub fn peek_then<T, F: FnOnce(Option<&[u8]>) -> T>(&self, f: F) -> T {
252            let len = self.len.load(Ordering::SeqCst);
253
254            let msg = (len > 0).then(|| {
255                let first = self.first.load(Ordering::SeqCst);
256                let idx = self.to_physical_idx(first, 0);
257                unsafe { &(&*self.data.get())[idx..(idx + self.msg_size)] }
258            });
259
260            f(msg)
261        }
262
        /// Returns the number of elements currently stored in this queue, as
        /// read atomically from the length counter.
        pub fn len(&self) -> usize {
            self.len.load(Ordering::SeqCst)
        }
267
268        #[must_use]
269        pub fn is_empty(&self) -> bool {
270            self.len() == 0
271        }
272
        /// Empties the queue by resetting its length to zero. The stored
        /// bytes and the `first` index are left untouched.
        pub fn clear(&self) {
            self.len.store(0, Ordering::SeqCst);
        }
276    }
277
278    impl Debug for ConcurrentQueue {
279        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
280            f.debug_struct("ConcurrentQueue")
281                .field("msg_size", &self.msg_size)
282                .field("msg_capacity", &self.msg_capacity)
283                .field("len", &self.len)
284                .field("first", &self.first)
285                .finish_non_exhaustive()
286        }
287    }
288}
289
/// A typed view onto the source-side buffer of a queuing channel.
///
/// All fields borrow from one contiguous byte buffer, laid out in the order
/// `usize`, `bool`, [ConcurrentQueue].
#[derive(Debug)]
struct SourceDatagram<'a> {
    /// Number of messages held by the destination queue, as recorded by the
    /// last [Queuing::swap].
    num_messages_in_destination: &'a mut usize,
    /// Set when a push was rejected because the channel was full.
    has_overflowed: &'a mut bool,
    /// Messages written by the source that have not been swapped yet.
    message_queue: &'a ConcurrentQueue,
}
296
/// A typed view onto the destination-side buffer of a queuing channel.
///
/// All fields borrow from one contiguous byte buffer, laid out in the order
/// `usize`, `Option<Instant>`, `bool`, [ConcurrentQueue].
#[derive(Debug)]
struct DestinationDatagram<'a> {
    /// Number of messages in the source queue. It is only set to zero on
    /// initialisation within this file.
    num_messages_in_source: &'a mut usize,
    /// Time at which the destination requested a clear, if any. It is consumed
    /// by [Queuing::swap].
    clear_requested_timestamp: &'a mut Option<Instant>,
    /// Copy of the source's overflow flag, updated by [Queuing::swap].
    has_overflowed: &'a mut bool,
    /// Messages that are ready to be read by the destination.
    message_queue: &'a ConcurrentQueue,
}
304
305impl<'a> SourceDatagram<'a> {
306    fn size(msg_size: usize, msg_capacity: usize) -> usize {
307        size_of::<usize>() // number of messages in destination
308            + size_of::<bool>() // flag if queue has overflowed
309            + ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue
310    }
311
312    fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
313        let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
314        let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };
315
316        let message_queue = ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity);
317
318        Self {
319            num_messages_in_destination,
320            has_overflowed,
321            message_queue,
322        }
323    }
324
325    unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
326        let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
327        let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };
328
329        let message_queue = ConcurrentQueue::load_from(buffer);
330
331        Self {
332            num_messages_in_destination,
333            has_overflowed,
334            message_queue,
335        }
336    }
337
338    fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&'_ mut self, f: F) -> Option<T> {
339        self.message_queue
340            .pop_then(|entry| f(Message::from_bytes(entry)))
341    }
342
343    fn push<'b>(&'b mut self, data: &'_ [u8], message_timestamp: Instant) -> Option<Message<'b>> {
344        // We need to check if there is enough space left in the queue.
345        // This is important, because we could theoretically store twice the number of
346        // our queue size, because we use a separate source and destination queueu.
347        // Thus we need to limit the number of messages in both queues at the same time.
348        let queue_is_full = *self.num_messages_in_destination + self.message_queue.len()
349            == self.message_queue.msg_capacity;
350
351        if queue_is_full {
352            *self.has_overflowed = true;
353            return None;
354        }
355        let entry = self.message_queue
356            .push_then(|entry| Message::init_at(entry, data, message_timestamp)).expect("push to be successful because we just checked if there is space in both the source and destination");
357
358        Some(Message::from_bytes(entry))
359    }
360}
361
362impl<'a> DestinationDatagram<'a> {
363    fn size(msg_size: usize, msg_capacity: usize) -> usize {
364        size_of::<usize>() // number of messages in source
365            + size_of::<bool>() // flag if queue is overflowed
366            + size_of::<Option<Instant>>() // flag for the timestamp when a clear was requested
367            + ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue
368    }
369    fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
370        let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
371        let (clear_requested_timestamp, buffer) =
372            unsafe { buffer.strip_field_mut::<Option<Instant>>() };
373        let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };
374
375        *num_messages_in_source = 0;
376        unsafe {
377            std::ptr::write(clear_requested_timestamp, None);
378            std::ptr::write(has_overflowed, false);
379        }
380
381        Self {
382            num_messages_in_source,
383            clear_requested_timestamp,
384            has_overflowed,
385            message_queue: ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity),
386        }
387    }
388    unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
389        let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
390        let (clear_requested_timestamp, buffer) =
391            unsafe { buffer.strip_field_mut::<Option<Instant>>() };
392        let (has_overflown, buffer) = unsafe { buffer.strip_field_mut::<bool>() };
393
394        Self {
395            num_messages_in_source,
396            clear_requested_timestamp,
397            has_overflowed: has_overflown,
398            message_queue: ConcurrentQueue::load_from(buffer),
399        }
400    }
401
402    /// Takes a closure that maps the popped message to some type.
403    /// If there is a message in the queue, the resulting type and a flag
404    /// whether the queue has overflowed is returned.
405    fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&mut self, msg_mapper: F) -> Option<(T, bool)> {
406        self.message_queue
407            .pop_then(|entry| msg_mapper(Message::from_bytes(entry)))
408            .map(|t| (t, *self.has_overflowed))
409    }
410
411    /// Pushes a data onto the destination queue
412    fn push<'b>(&'b mut self, data: &'_ [u8]) -> Option<Message<'b>> {
413        let entry = self.message_queue.push(data)?;
414        let msg = Message::from_bytes(entry);
415
416        Some(msg)
417    }
418}
419
420struct Message<'a> {
421    len: &'a usize,
422    timestamp: &'a Instant,
423    /// This data slice is always of the same size, controlled by the owning
424    /// ConcurrentQueue. That means, that only the first `self.len` bytes in
425    /// it contain actual data. Use [Message::get_data] to access just the
426    /// contained bytes.
427    data: &'a [u8],
428}
429
430impl<'a> Message<'a> {
431    fn size(msg_size: usize) -> usize {
432        size_of::<usize>() // length of this message
433            + size_of::<Instant>() // timestamp when this message was sent
434            + msg_size // actual message byte data
435    }
436    fn from_bytes(bytes: &'a [u8]) -> Self {
437        let (len, bytes) = unsafe { bytes.strip_field::<usize>() };
438        let (timestamp, data) = unsafe { bytes.strip_field::<Instant>() };
439
440        assert!(
441            *len <= data.len(),
442            "*len={} data.len()={}",
443            *len,
444            data.len()
445        );
446
447        Self {
448            len,
449            timestamp,
450            data,
451        }
452    }
453
454    fn init_at(uninitialized_bytes: &mut [u8], data: &[u8], initialization_timestamp: Instant) {
455        let (len_field, uninitialized_bytes) =
456            unsafe { uninitialized_bytes.strip_field_mut::<usize>() };
457        let (timestamp, data_field) = unsafe { uninitialized_bytes.strip_field_mut::<Instant>() };
458        assert!(data_field.len() >= data.len());
459
460        unsafe {
461            std::ptr::write(timestamp, initialization_timestamp);
462        }
463
464        *len_field = data.len();
465        data_field[0..data.len()].copy_from_slice(data);
466    }
467
468    fn to_bytes(&self) -> &[u8] {
469        // # Safety
470        // len and data should be contiguous memory
471        unsafe {
472            &*slice_from_raw_parts(
473                self.len as *const usize as *const u8,
474                Self::size(self.data.len()),
475            )
476        }
477    }
478
479    fn get_data(&self) -> &[u8] {
480        &self.data[0..*self.len]
481    }
482}
483
/// A queuing channel between a source port and a destination port.
///
/// Holds the memory-backed buffers for both sides along with their file
/// descriptors and the port configuration.
#[derive(Debug)]
pub struct Queuing {
    /// Maximum payload size of a single message in bytes.
    msg_size: usize,
    /// Maximum number of messages in the channel.
    max_num_msg: usize,

    /// Mapping of the source buffer, read by [Queuing::swap].
    source_receiver: MmapMut,
    /// File descriptor backing the source buffer.
    source: OwnedFd,
    /// Configuration of the source port.
    source_port: PortConfig,

    /// Mapping of the destination buffer, written by [Queuing::swap].
    destination_sender: MmapMut,
    /// File descriptor backing the destination buffer.
    destination: OwnedFd,
    /// Configuration of the destination port.
    destination_port: PortConfig,
}
497
498impl TryFrom<QueuingChannelConfig> for Queuing {
499    type Error = TypedError;
500
501    fn try_from(config: QueuingChannelConfig) -> Result<Self, Self::Error> {
502        let msg_size = config.msg_size.as_u64() as usize;
503        let msg_num = config.msg_num;
504
505        let source_port_name = config.source.name();
506        let (source_receiver, source) = Self::source(
507            format!("queuing_{source_port_name}_source"),
508            msg_size,
509            config.msg_num,
510        )?;
511        let (destination_sender, destination) = Self::destination(
512            format!("queuing_{source_port_name}_destination"),
513            msg_size,
514            config.msg_num,
515        )?;
516
517        Ok(Self {
518            msg_size,
519            max_num_msg: msg_num,
520            source_receiver,
521            source,
522            source_port: config.source,
523            destination_sender,
524            destination,
525            destination_port: config.destination,
526        })
527    }
528}
529
530impl Queuing {
531    pub fn constant(&self, part: impl AsRef<str>) -> Option<QueuingConstant> {
532        let (dir, fd, port) = if self.source_port.partition.eq(part.as_ref()) {
533            (
534                PortDirection::Source,
535                self.source_fd(),
536                &self.source_port.port,
537            )
538        } else {
539            (
540                PortDirection::Destination,
541                self.destination_fd(),
542                &self.destination_port.port,
543            )
544        };
545
546        Some(QueuingConstant {
547            name: port.clone(),
548            dir,
549            msg_size: self.msg_size,
550            max_num_msg: self.max_num_msg,
551            fd,
552        })
553    }
554
555    pub fn name(&self) -> String {
556        format!("{}:{}", &self.source_port.partition, self.source_port.port)
557    }
558
559    fn memfd(name: impl AsRef<str>, size: usize) -> TypedResult<Memfd> {
560        let mem = MemfdOptions::default()
561            .close_on_exec(false)
562            .allow_sealing(true)
563            .create(name)
564            .typ(SystemError::Panic)?;
565        mem.as_file().set_len(size as u64).typ(SystemError::Panic)?;
566        mem.add_seals(&[FileSeal::SealShrink, FileSeal::SealGrow])
567            .typ(SystemError::Panic)?;
568
569        Ok(mem)
570    }
571
572    fn source(
573        name: impl AsRef<str>,
574        msg_size: usize,
575        max_num_msgs: usize,
576    ) -> TypedResult<(MmapMut, OwnedFd)> {
577        let mem = Self::memfd(name, SourceDatagram::size(msg_size, max_num_msgs))?;
578
579        let mut mmap = unsafe { MmapMut::map_mut(mem.as_raw_fd()).typ(SystemError::Panic)? };
580
581        mem.add_seals(&[FileSeal::SealSeal])
582            .typ(SystemError::Panic)?;
583
584        SourceDatagram::init_at(msg_size, max_num_msgs, mmap.as_mut());
585
586        Ok((mmap, mem.into_file().into()))
587    }
588
589    fn destination(
590        name: impl AsRef<str>,
591        msg_size: usize,
592        msg_capacity: usize,
593    ) -> TypedResult<(MmapMut, OwnedFd)> {
594        let mem = Self::memfd(name, DestinationDatagram::size(msg_size, msg_capacity))?;
595
596        let mut mmap = unsafe { MmapMut::map_mut(mem.as_raw_fd()).typ(SystemError::Panic)? };
597
598        mem.add_seals(&[FileSeal::SealSeal])
599            .typ(SystemError::Panic)?;
600
601        DestinationDatagram::init_at(msg_size, msg_capacity, mmap.as_mut());
602
603        Ok((mmap, mem.into_file().into()))
604    }
605
    /// Moves all pending messages from the source buffer to the destination
    /// buffer and synchronises the bookkeeping fields of both datagrams.
    ///
    /// Returns true if messages have been transferred
    ///
    /// # Panics
    /// If the destination queue rejects a message popped from the source.
    pub fn swap(&mut self) -> bool {
        // Parse datagrams
        let mut source_datagram =
            unsafe { SourceDatagram::load_from(self.source_receiver.as_mut()) };
        let mut destination_datagram =
            unsafe { DestinationDatagram::load_from(self.destination_sender.as_mut()) };

        // If a clear was requested by the destination, we pop all messages from the
        // source queue with a timestamp before the timestamp of the clear request.
        // This is not actually needed for ARINC653 Part 4, as only one partition can
        // run at a time and all messages are swapped to the destination buffer after
        // every partition execution.
        // `mem::take` also resets the stored request to `None`.
        if let Some(clear_requested_at) = mem::take(destination_datagram.clear_requested_timestamp)
        {
            while source_datagram.message_queue.peek_then(|msg| {
                msg.map_or(false, |msg| {
                    &clear_requested_at > Message::from_bytes(msg).timestamp
                })
            }) {
                source_datagram.message_queue.pop_then(|_| ());
            }
        };

        // Copy new messages from source to destination
        let mut num_msg_swapped = 0;
        while let Some(_new_destination_msg) =
            source_datagram.pop_then(|msg| destination_datagram.push(msg.to_bytes()).expect("push to always succeed, because source and destination datagrams can only contain `msg_capacity` messages in total"))
        {
            num_msg_swapped += 1;
        }

        // Let the source know how many messages the destination still holds and
        // forward the overflow flag to the destination.
        *source_datagram.num_messages_in_destination = destination_datagram.message_queue.len();
        *destination_datagram.has_overflowed = *source_datagram.has_overflowed;

        trace!("Swapped {num_msg_swapped} messages: Destination={destination_datagram:?} Source={source_datagram:?}");

        num_msg_swapped > 0
    }
645
    /// Returns the raw file descriptor backing the source buffer. The
    /// descriptor stays owned by this `Queuing`.
    pub fn source_fd(&self) -> RawFd {
        self.source.as_raw_fd()
    }
    /// Returns the raw file descriptor backing the destination buffer. The
    /// descriptor stays owned by this `Queuing`.
    pub fn destination_fd(&self) -> RawFd {
        self.destination.as_raw_fd()
    }
652}
653
/// Writing end of a queuing channel, wrapping a mutable memory mapping that
/// is interpreted as a `SourceDatagram`.
#[derive(Debug)]
pub struct QueuingSource(MmapMut);
656
657impl QueuingSource {
658    /// If the message was successfully enqueued, the number of bytes written is
659    /// returned.
660    pub fn write(&mut self, data: &[u8], message_timestamp: Instant) -> Option<usize> {
661        let mut datagram = unsafe { SourceDatagram::load_from(&mut self.0) };
662
663        let res = datagram.push(data, message_timestamp).map(|msg| *msg.len);
664
665        if res.is_some() {
666            // The standard states, that the receiver should only be able to detect whether
667            // the last message caused an overflow. Because we have now sent a
668            // message successfully, thus we can now reset this flag.
669            *datagram.has_overflowed = false;
670        }
671
672        res
673    }
674
675    pub fn get_current_num_messages(&mut self) -> usize {
676        let datagram = unsafe { SourceDatagram::load_from(&mut self.0) };
677
678        datagram.message_queue.len() + *datagram.num_messages_in_destination
679    }
680}
681
682impl TryFrom<RawFd> for QueuingSource {
683    type Error = TypedError;
684
685    fn try_from(file: RawFd) -> Result<Self, Self::Error> {
686        let mmap = unsafe { MmapMut::map_mut(file).typ(SystemError::Panic)? };
687
688        Ok(Self(mmap))
689    }
690}
691
692impl QueuingDestination {
693    /// Reads the current message from the queue into a buffer and increments
694    /// the current read index. If a message was successfully read, the
695    /// number of bytes read and whether the queue has overflowed.
696    pub fn read(&mut self, buffer: &mut [u8]) -> Option<(usize, bool)> {
697        let mut datagram = unsafe { DestinationDatagram::load_from(&mut self.0) };
698
699        let read_bytes_and_overflowed_flag = datagram.pop_then(|msg| {
700            let data = msg.get_data();
701            let len = data.len().min(buffer.len());
702            buffer[..len].copy_from_slice(&data[..len]);
703
704            len
705        });
706
707        read_bytes_and_overflowed_flag
708    }
709
710    pub fn get_current_num_messages(&mut self) -> usize {
711        let datagram = unsafe { DestinationDatagram::load_from(&mut self.0) };
712        datagram.message_queue.len() + *datagram.num_messages_in_source
713    }
714
715    pub fn clear(&mut self, current_time: Instant) {
716        let datagram = unsafe { DestinationDatagram::load_from(&mut self.0) };
717        datagram.message_queue.clear();
718        *datagram.clear_requested_timestamp = Some(current_time);
719    }
720}
721
/// Reading end of a queuing channel, wrapping a mutable memory mapping that
/// is interpreted as a `DestinationDatagram`.
#[derive(Debug)]
pub struct QueuingDestination(MmapMut);
724
725impl TryFrom<RawFd> for QueuingDestination {
726    type Error = TypedError;
727
728    fn try_from(file: RawFd) -> Result<Self, Self::Error> {
729        let mmap = unsafe { MmapMut::map_mut(file).typ(SystemError::Panic)? };
730
731        Ok(Self(mmap))
732    }
733}
734
/// An extension trait for stripping generic types off of byte arrays.
trait StripFieldExt {
    /// Splits off the first `size_of::<T>()` bytes and reinterprets them as a
    /// shared reference to `T`. Returns that reference and the remaining
    /// bytes.
    ///
    /// # Safety
    /// The bytes must start with an initialized and valid `T`, and the start
    /// address must be suitably aligned for `T`.
    ///
    /// # Panics
    /// If fewer than `size_of::<T>()` bytes are available.
    unsafe fn strip_field<T>(&self) -> (&T, &Self);

    /// Mutable counterpart of [StripFieldExt::strip_field].
    ///
    /// # Safety
    /// The bytes must start with an initialized and valid `T`, and the start
    /// address must be suitably aligned for `T`.
    ///
    /// # Panics
    /// If fewer than `size_of::<T>()` bytes are available.
    unsafe fn strip_field_mut<T>(&mut self) -> (&mut T, &mut Self);
}

impl StripFieldExt for [u8] {
    /// # Safety
    /// The byte array must start with an initialized and valid `T`
    unsafe fn strip_field<T>(&self) -> (&T, &Self) {
        assert!(self.len() >= size_of::<T>());
        let (field, rest) = self.split_at(size_of::<T>());
        // The pointer comes from a live slice of at least `size_of::<T>()`
        // bytes, so it is non-null and in bounds; validity and alignment are
        // the caller's obligation.
        let field = unsafe { &*field.as_ptr().cast::<T>() };
        (field, rest)
    }

    /// # Safety
    /// The byte array must start with an initialized and valid `T`
    unsafe fn strip_field_mut<T>(&mut self) -> (&mut T, &mut Self) {
        assert!(self.len() >= size_of::<T>());
        let (field, rest) = self.split_at_mut(size_of::<T>());
        // `as_mut_ptr` keeps write permission on the pointer. The previous
        // `as_ptr() as *mut T` went through a shared reborrow and then wrote
        // through the result.
        let field = unsafe { &mut *field.as_mut_ptr().cast::<T>() };
        (field, rest)
    }
}
758}