1use std::fmt::Debug;
2use std::mem;
3use std::mem::size_of;
4use std::os::fd::RawFd;
5use std::os::fd::{AsRawFd, OwnedFd};
6use std::ptr::slice_from_raw_parts;
7use std::time::Instant;
8
9use a653rs::bindings::PortDirection;
10use memfd::{FileSeal, Memfd, MemfdOptions};
11use memmap2::MmapMut;
12
13use crate::channel::{PortConfig, QueuingChannelConfig};
14use crate::error::{ResultExt, SystemError, TypedError, TypedResult};
15use crate::partition::QueuingConstant;
16use crate::queuing::concurrent_queue::ConcurrentQueue;
17
18pub mod concurrent_queue {
19 use std::cell::UnsafeCell;
20 use std::fmt::{Debug, Formatter};
21 use std::sync::atomic::{AtomicUsize, Ordering};
22 use std::{mem, ptr};
23
24 #[repr(C)]
52 pub struct ConcurrentQueue {
53 pub msg_size: usize,
54 pub msg_capacity: usize,
55
56 len: AtomicUsize,
57 first: AtomicUsize,
58 data: UnsafeCell<[u8]>,
59 }
60
61 unsafe impl Send for ConcurrentQueue {}
62 unsafe impl Sync for ConcurrentQueue {}
63
64 impl ptr_meta::Pointee for ConcurrentQueue {
65 type Metadata = usize;
66 }
67
68 impl ConcurrentQueue {
69 pub fn size(element_size: usize, capacity: usize) -> usize {
73 let mut size = Self::fields_size() + element_size * capacity; let alignment = Self::align();
79 let sub_alignment_mask = alignment - 1;
80 if size & sub_alignment_mask > 0 {
81 size = (size & !sub_alignment_mask) + alignment;
83 }
84
85 size
86 }
87
88 fn fields_size() -> usize {
90 mem::size_of::<usize>() + mem::size_of::<usize>() + mem::size_of::<AtomicUsize>() + mem::size_of::<AtomicUsize>() }
95
96 fn align() -> usize {
98 mem::align_of::<usize>()
101 }
102
103 pub fn init_at(buffer: &mut [u8], element_size: usize, capacity: usize) -> &Self {
112 assert_eq!(buffer.len(), Self::size(element_size, capacity));
113
114 let queue = unsafe { &mut *Self::buf_to_self_mut(buffer) };
117
118 queue.msg_size = element_size;
119 queue.msg_capacity = capacity;
120 unsafe {
122 ptr::write(&mut queue.len, AtomicUsize::new(0));
123 ptr::write(&mut queue.first, AtomicUsize::new(0));
124 }
125
126 queue
127 }
128
129 fn buf_to_self(buffer: *const [u8]) -> *const Self {
132 let (buf_ptr, mut buf_len): (*const (), usize) = ptr_meta::PtrExt::to_raw_parts(buffer);
133 buf_len -= Self::fields_size();
134
135 ptr_meta::from_raw_parts(buf_ptr, buf_len)
136 }
137
138 fn buf_to_self_mut(buffer: *mut [u8]) -> *mut Self {
141 let (buf_ptr, mut buf_len): (*mut (), usize) = ptr_meta::PtrExt::to_raw_parts(buffer);
142 buf_len -= Self::fields_size();
143
144 ptr_meta::from_raw_parts_mut(buf_ptr, buf_len)
145 }
146
147 pub unsafe fn load_from(buffer: &[u8]) -> &Self {
155 let obj = &*Self::buf_to_self(buffer);
156
157 debug_assert!(obj.len.load(Ordering::SeqCst) <= obj.msg_capacity); debug_assert!(obj.first.load(Ordering::SeqCst) < obj.msg_capacity); let obj_data = obj.data.get().as_ref().unwrap();
165 debug_assert_eq!(
166 obj_data.len(),
167 Self::size(obj.msg_size, obj.msg_capacity) - Self::fields_size()
168 );
169
170 obj
171 }
172
173 fn to_physical_idx(&self, first: usize, idx: usize) -> usize {
176 (first + idx) % self.msg_capacity * self.msg_size
177 }
178
179 pub fn get(&self, idx: usize) -> Option<&[u8]> {
181 assert!(idx < self.msg_capacity);
182
183 let current_len = self.len.load(Ordering::SeqCst);
184 if idx > current_len {
185 return None;
186 }
187
188 let idx = self.to_physical_idx(self.first.load(Ordering::SeqCst), idx);
189
190 let msg = &unsafe { self.data.get().as_mut().unwrap() }[idx..(idx + self.msg_size)];
191 Some(msg)
192 }
193
194 pub fn push(&self, data: &[u8]) -> Option<&mut [u8]> {
197 assert_eq!(data.len(), self.msg_size);
198
199 self.push_then(|entry| entry.copy_from_slice(data))
200 }
201
202 pub fn push_then<F: FnOnce(&'_ mut [u8])>(&self, set_element: F) -> Option<&mut [u8]> {
206 let current_len = self.len.load(Ordering::SeqCst);
207 if current_len == self.msg_capacity {
208 return None;
209 }
210
211 let insert_idx = self.len.fetch_add(1, Ordering::SeqCst);
212
213 let idx = self.to_physical_idx(self.first.load(Ordering::SeqCst), insert_idx);
214 let element_slot =
215 &mut unsafe { self.data.get().as_mut().unwrap() }[idx..(idx + self.msg_size)];
216
217 set_element(element_slot);
218
219 Some(element_slot)
220 }
221
222 pub fn pop(&self) -> Option<Box<[u8]>> {
224 self.pop_then(|entry| Vec::from(entry).into_boxed_slice())
225 }
226
227 pub fn pop_then<F: FnOnce(&'_ [u8]) -> T, T>(&'_ self, map_element: F) -> Option<T> {
232 self.len
234 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |len| len.checked_sub(1))
235 .ok()?;
236
237 let prev_first = self
238 .first
239 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
240 Some((x + 1) % self.msg_capacity)
241 })
242 .unwrap();
243
244 let idx = self.to_physical_idx(prev_first, 0);
245
246 let msg = &unsafe { &*self.data.get() }[idx..(idx + self.msg_size)];
247
248 Some(map_element(msg))
249 }
250
251 pub fn peek_then<T, F: FnOnce(Option<&[u8]>) -> T>(&self, f: F) -> T {
252 let len = self.len.load(Ordering::SeqCst);
253
254 let msg = (len > 0).then(|| {
255 let first = self.first.load(Ordering::SeqCst);
256 let idx = self.to_physical_idx(first, 0);
257 unsafe { &(&*self.data.get())[idx..(idx + self.msg_size)] }
258 });
259
260 f(msg)
261 }
262
263 pub fn len(&self) -> usize {
265 self.len.load(Ordering::SeqCst)
266 }
267
268 #[must_use]
269 pub fn is_empty(&self) -> bool {
270 self.len() == 0
271 }
272
273 pub fn clear(&self) {
274 self.len.store(0, Ordering::SeqCst);
275 }
276 }
277
278 impl Debug for ConcurrentQueue {
279 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
280 f.debug_struct("ConcurrentQueue")
281 .field("msg_size", &self.msg_size)
282 .field("msg_capacity", &self.msg_capacity)
283 .field("len", &self.len)
284 .field("first", &self.first)
285 .finish_non_exhaustive()
286 }
287 }
288}
289
290#[derive(Debug)]
291struct SourceDatagram<'a> {
292 num_messages_in_destination: &'a mut usize,
293 has_overflowed: &'a mut bool,
294 message_queue: &'a ConcurrentQueue,
295}
296
297#[derive(Debug)]
298struct DestinationDatagram<'a> {
299 num_messages_in_source: &'a mut usize,
300 clear_requested_timestamp: &'a mut Option<Instant>,
301 has_overflowed: &'a mut bool,
302 message_queue: &'a ConcurrentQueue,
303}
304
305impl<'a> SourceDatagram<'a> {
306 fn size(msg_size: usize, msg_capacity: usize) -> usize {
307 size_of::<usize>() + size_of::<bool>() + ConcurrentQueue::size(Message::size(msg_size), msg_capacity) }
311
312 fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
313 let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
314 let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };
315
316 let message_queue = ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity);
317
318 Self {
319 num_messages_in_destination,
320 has_overflowed,
321 message_queue,
322 }
323 }
324
325 unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
326 let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
327 let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };
328
329 let message_queue = ConcurrentQueue::load_from(buffer);
330
331 Self {
332 num_messages_in_destination,
333 has_overflowed,
334 message_queue,
335 }
336 }
337
338 fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&'_ mut self, f: F) -> Option<T> {
339 self.message_queue
340 .pop_then(|entry| f(Message::from_bytes(entry)))
341 }
342
343 fn push<'b>(&'b mut self, data: &'_ [u8], message_timestamp: Instant) -> Option<Message<'b>> {
344 let queue_is_full = *self.num_messages_in_destination + self.message_queue.len()
349 == self.message_queue.msg_capacity;
350
351 if queue_is_full {
352 *self.has_overflowed = true;
353 return None;
354 }
355 let entry = self.message_queue
356 .push_then(|entry| Message::init_at(entry, data, message_timestamp)).expect("push to be successful because we just checked if there is space in both the source and destination");
357
358 Some(Message::from_bytes(entry))
359 }
360}
361
362impl<'a> DestinationDatagram<'a> {
363 fn size(msg_size: usize, msg_capacity: usize) -> usize {
364 size_of::<usize>() + size_of::<bool>() + size_of::<Option<Instant>>() + ConcurrentQueue::size(Message::size(msg_size), msg_capacity) }
369 fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
370 let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
371 let (clear_requested_timestamp, buffer) =
372 unsafe { buffer.strip_field_mut::<Option<Instant>>() };
373 let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };
374
375 *num_messages_in_source = 0;
376 unsafe {
377 std::ptr::write(clear_requested_timestamp, None);
378 std::ptr::write(has_overflowed, false);
379 }
380
381 Self {
382 num_messages_in_source,
383 clear_requested_timestamp,
384 has_overflowed,
385 message_queue: ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity),
386 }
387 }
388 unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
389 let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
390 let (clear_requested_timestamp, buffer) =
391 unsafe { buffer.strip_field_mut::<Option<Instant>>() };
392 let (has_overflown, buffer) = unsafe { buffer.strip_field_mut::<bool>() };
393
394 Self {
395 num_messages_in_source,
396 clear_requested_timestamp,
397 has_overflowed: has_overflown,
398 message_queue: ConcurrentQueue::load_from(buffer),
399 }
400 }
401
402 fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&mut self, msg_mapper: F) -> Option<(T, bool)> {
406 self.message_queue
407 .pop_then(|entry| msg_mapper(Message::from_bytes(entry)))
408 .map(|t| (t, *self.has_overflowed))
409 }
410
411 fn push<'b>(&'b mut self, data: &'_ [u8]) -> Option<Message<'b>> {
413 let entry = self.message_queue.push(data)?;
414 let msg = Message::from_bytes(entry);
415
416 Some(msg)
417 }
418}
419
420struct Message<'a> {
421 len: &'a usize,
422 timestamp: &'a Instant,
423 data: &'a [u8],
428}
429
430impl<'a> Message<'a> {
431 fn size(msg_size: usize) -> usize {
432 size_of::<usize>() + size_of::<Instant>() + msg_size }
436 fn from_bytes(bytes: &'a [u8]) -> Self {
437 let (len, bytes) = unsafe { bytes.strip_field::<usize>() };
438 let (timestamp, data) = unsafe { bytes.strip_field::<Instant>() };
439
440 assert!(
441 *len <= data.len(),
442 "*len={} data.len()={}",
443 *len,
444 data.len()
445 );
446
447 Self {
448 len,
449 timestamp,
450 data,
451 }
452 }
453
454 fn init_at(uninitialized_bytes: &mut [u8], data: &[u8], initialization_timestamp: Instant) {
455 let (len_field, uninitialized_bytes) =
456 unsafe { uninitialized_bytes.strip_field_mut::<usize>() };
457 let (timestamp, data_field) = unsafe { uninitialized_bytes.strip_field_mut::<Instant>() };
458 assert!(data_field.len() >= data.len());
459
460 unsafe {
461 std::ptr::write(timestamp, initialization_timestamp);
462 }
463
464 *len_field = data.len();
465 data_field[0..data.len()].copy_from_slice(data);
466 }
467
468 fn to_bytes(&self) -> &[u8] {
469 unsafe {
472 &*slice_from_raw_parts(
473 self.len as *const usize as *const u8,
474 Self::size(self.data.len()),
475 )
476 }
477 }
478
479 fn get_data(&self) -> &[u8] {
480 &self.data[0..*self.len]
481 }
482}
483
484#[derive(Debug)]
485pub struct Queuing {
486 msg_size: usize,
487 max_num_msg: usize,
488
489 source_receiver: MmapMut,
490 source: OwnedFd,
491 source_port: PortConfig,
492
493 destination_sender: MmapMut,
494 destination: OwnedFd,
495 destination_port: PortConfig,
496}
497
498impl TryFrom<QueuingChannelConfig> for Queuing {
499 type Error = TypedError;
500
501 fn try_from(config: QueuingChannelConfig) -> Result<Self, Self::Error> {
502 let msg_size = config.msg_size.as_u64() as usize;
503 let msg_num = config.msg_num;
504
505 let source_port_name = config.source.name();
506 let (source_receiver, source) = Self::source(
507 format!("queuing_{source_port_name}_source"),
508 msg_size,
509 config.msg_num,
510 )?;
511 let (destination_sender, destination) = Self::destination(
512 format!("queuing_{source_port_name}_destination"),
513 msg_size,
514 config.msg_num,
515 )?;
516
517 Ok(Self {
518 msg_size,
519 max_num_msg: msg_num,
520 source_receiver,
521 source,
522 source_port: config.source,
523 destination_sender,
524 destination,
525 destination_port: config.destination,
526 })
527 }
528}
529
530impl Queuing {
531 pub fn constant(&self, part: impl AsRef<str>) -> Option<QueuingConstant> {
532 let (dir, fd, port) = if self.source_port.partition.eq(part.as_ref()) {
533 (
534 PortDirection::Source,
535 self.source_fd(),
536 &self.source_port.port,
537 )
538 } else {
539 (
540 PortDirection::Destination,
541 self.destination_fd(),
542 &self.destination_port.port,
543 )
544 };
545
546 Some(QueuingConstant {
547 name: port.clone(),
548 dir,
549 msg_size: self.msg_size,
550 max_num_msg: self.max_num_msg,
551 fd,
552 })
553 }
554
555 pub fn name(&self) -> String {
556 format!("{}:{}", &self.source_port.partition, self.source_port.port)
557 }
558
559 fn memfd(name: impl AsRef<str>, size: usize) -> TypedResult<Memfd> {
560 let mem = MemfdOptions::default()
561 .close_on_exec(false)
562 .allow_sealing(true)
563 .create(name)
564 .typ(SystemError::Panic)?;
565 mem.as_file().set_len(size as u64).typ(SystemError::Panic)?;
566 mem.add_seals(&[FileSeal::SealShrink, FileSeal::SealGrow])
567 .typ(SystemError::Panic)?;
568
569 Ok(mem)
570 }
571
572 fn source(
573 name: impl AsRef<str>,
574 msg_size: usize,
575 max_num_msgs: usize,
576 ) -> TypedResult<(MmapMut, OwnedFd)> {
577 let mem = Self::memfd(name, SourceDatagram::size(msg_size, max_num_msgs))?;
578
579 let mut mmap = unsafe { MmapMut::map_mut(mem.as_raw_fd()).typ(SystemError::Panic)? };
580
581 mem.add_seals(&[FileSeal::SealSeal])
582 .typ(SystemError::Panic)?;
583
584 SourceDatagram::init_at(msg_size, max_num_msgs, mmap.as_mut());
585
586 Ok((mmap, mem.into_file().into()))
587 }
588
589 fn destination(
590 name: impl AsRef<str>,
591 msg_size: usize,
592 msg_capacity: usize,
593 ) -> TypedResult<(MmapMut, OwnedFd)> {
594 let mem = Self::memfd(name, DestinationDatagram::size(msg_size, msg_capacity))?;
595
596 let mut mmap = unsafe { MmapMut::map_mut(mem.as_raw_fd()).typ(SystemError::Panic)? };
597
598 mem.add_seals(&[FileSeal::SealSeal])
599 .typ(SystemError::Panic)?;
600
601 DestinationDatagram::init_at(msg_size, msg_capacity, mmap.as_mut());
602
603 Ok((mmap, mem.into_file().into()))
604 }
605
606 pub fn swap(&mut self) -> bool {
608 let mut source_datagram =
610 unsafe { SourceDatagram::load_from(self.source_receiver.as_mut()) };
611 let mut destination_datagram =
612 unsafe { DestinationDatagram::load_from(self.destination_sender.as_mut()) };
613
614 if let Some(clear_requested_at) = mem::take(destination_datagram.clear_requested_timestamp)
620 {
621 while source_datagram.message_queue.peek_then(|msg| {
622 msg.map_or(false, |msg| {
623 &clear_requested_at > Message::from_bytes(msg).timestamp
624 })
625 }) {
626 source_datagram.message_queue.pop_then(|_| ());
627 }
628 };
629
630 let mut num_msg_swapped = 0;
632 while let Some(_new_destination_msg) =
633 source_datagram.pop_then(|msg| destination_datagram.push(msg.to_bytes()).expect("push to always succeed, because source and destination datagrams can only contain `msg_capacity` messages in total"))
634 {
635 num_msg_swapped += 1;
636 }
637
638 *source_datagram.num_messages_in_destination = destination_datagram.message_queue.len();
639 *destination_datagram.has_overflowed = *source_datagram.has_overflowed;
640
641 trace!("Swapped {num_msg_swapped} messages: Destination={destination_datagram:?} Source={source_datagram:?}");
642
643 num_msg_swapped > 0
644 }
645
646 pub fn source_fd(&self) -> RawFd {
647 self.source.as_raw_fd()
648 }
649 pub fn destination_fd(&self) -> RawFd {
650 self.destination.as_raw_fd()
651 }
652}
653
654#[derive(Debug)]
655pub struct QueuingSource(MmapMut);
656
657impl QueuingSource {
658 pub fn write(&mut self, data: &[u8], message_timestamp: Instant) -> Option<usize> {
661 let mut datagram = unsafe { SourceDatagram::load_from(&mut self.0) };
662
663 let res = datagram.push(data, message_timestamp).map(|msg| *msg.len);
664
665 if res.is_some() {
666 *datagram.has_overflowed = false;
670 }
671
672 res
673 }
674
675 pub fn get_current_num_messages(&mut self) -> usize {
676 let datagram = unsafe { SourceDatagram::load_from(&mut self.0) };
677
678 datagram.message_queue.len() + *datagram.num_messages_in_destination
679 }
680}
681
682impl TryFrom<RawFd> for QueuingSource {
683 type Error = TypedError;
684
685 fn try_from(file: RawFd) -> Result<Self, Self::Error> {
686 let mmap = unsafe { MmapMut::map_mut(file).typ(SystemError::Panic)? };
687
688 Ok(Self(mmap))
689 }
690}
691
692impl QueuingDestination {
693 pub fn read(&mut self, buffer: &mut [u8]) -> Option<(usize, bool)> {
697 let mut datagram = unsafe { DestinationDatagram::load_from(&mut self.0) };
698
699 let read_bytes_and_overflowed_flag = datagram.pop_then(|msg| {
700 let data = msg.get_data();
701 let len = data.len().min(buffer.len());
702 buffer[..len].copy_from_slice(&data[..len]);
703
704 len
705 });
706
707 read_bytes_and_overflowed_flag
708 }
709
710 pub fn get_current_num_messages(&mut self) -> usize {
711 let datagram = unsafe { DestinationDatagram::load_from(&mut self.0) };
712 datagram.message_queue.len() + *datagram.num_messages_in_source
713 }
714
715 pub fn clear(&mut self, current_time: Instant) {
716 let datagram = unsafe { DestinationDatagram::load_from(&mut self.0) };
717 datagram.message_queue.clear();
718 *datagram.clear_requested_timestamp = Some(current_time);
719 }
720}
721
722#[derive(Debug)]
723pub struct QueuingDestination(MmapMut);
724
725impl TryFrom<RawFd> for QueuingDestination {
726 type Error = TypedError;
727
728 fn try_from(file: RawFd) -> Result<Self, Self::Error> {
729 let mmap = unsafe { MmapMut::map_mut(file).typ(SystemError::Panic)? };
730
731 Ok(Self(mmap))
732 }
733}
734
735trait StripFieldExt {
737 unsafe fn strip_field<T>(&self) -> (&T, &Self);
738 unsafe fn strip_field_mut<T>(&mut self) -> (&mut T, &mut Self);
739}
740
741impl StripFieldExt for [u8] {
742 unsafe fn strip_field<T>(&self) -> (&T, &Self) {
745 assert!(self.len() >= size_of::<T>());
746 let (field, rest) = self.split_at(size_of::<T>());
747 let field = (field.as_ptr() as *const T).as_ref().unwrap();
748 (field, rest)
749 }
750
751 unsafe fn strip_field_mut<T>(&mut self) -> (&mut T, &mut Self) {
753 assert!(self.len() >= size_of::<T>());
754 let (field, rest) = self.split_at_mut(size_of::<T>());
755 let field = (field.as_ptr() as *mut T).as_mut().unwrap();
756 (field, rest)
757 }
758}