rt 0.19.0

A real-time operating system capable of full preemption
Documentation
use core::{
    cell::UnsafeCell,
    marker::PhantomData,
    mem::{MaybeUninit, forget, size_of},
    panic::{RefUnwindSafe, UnwindSafe},
    ptr::from_ref,
};

use crate::{
    bindings::{
        RT_QUEUE_MAX_SIZE_, rt_queue, rt_queue_peek, rt_queue_pop, rt_queue_push,
        rt_queue_timedpeek, rt_queue_timedpop, rt_queue_timedpush, rt_queue_trypeek,
        rt_queue_trypop, rt_queue_trypush,
    },
    cell::SyncUnsafeCell,
    ptr_macros::{ptr_to_field, ptr_to_field_mut},
    sync::sem::c_sem_init,
    tick::Utick,
};

/// A multi-producer, multi-consumer, bounded queue.
///
/// `Queue` supports blocking, non-blocking, and timed push, pop, and peek operations. The queue
/// allows up to `n` tasks to access it concurrently, where `n` is the depth of the queue.
pub struct Queue<T: ?Sized> {
    // Underlying `rt_queue` state. It lives in an `UnsafeCell` because every operation takes
    // `&self` and hands a raw `*mut rt_queue` (from `UnsafeCell::get`) to the `rt_queue_*`
    // bindings.
    pub(crate) queue: UnsafeCell<rt_queue>,
    // Records the element type only. No `T` is stored inline in this struct; `init` points the
    // `rt_queue` at an external data buffer of `MaybeUninit<T>` instead.
    _phantom_data: PhantomData<T>,
}

// `push`/`pop` move `T` values through the queue by value and every operation takes `&self`, so
// sharing or sending a `Queue<T>` lets a `T` travel from one task to another; hence the
// `T: Send` bound on both impls.
// NOTE(review): soundness of `Sync` additionally assumes the `rt_queue_*` operations are safe to
// call concurrently on the same `rt_queue` — confirm against the underlying implementation.
unsafe impl<T: ?Sized + Send> Send for Queue<T> {}
unsafe impl<T: ?Sized + Send> Sync for Queue<T> {}
// Explicitly mark the queue as unwind-safe for every `T`. These are safe marker traits, so no
// `unsafe` is required to implement them.
impl<T: ?Sized> UnwindSafe for Queue<T> {}
impl<T: ?Sized> RefUnwindSafe for Queue<T> {}

impl<T> Queue<T> {
    /// Builds the value that initializes the `Queue` located at `this`, backed by the given slot
    /// and data buffers.
    ///
    /// The depth `N` is checked at compile time against `RT_QUEUE_MAX_SIZE_`; exceeding it fails
    /// the build with "queue is too large".
    ///
    /// # Safety
    ///
    /// `this` must point to a `static` `Queue`, and the returned value must be used to initialize
    /// that very `Queue`. Prefer the `rt::sync::queue!` macro over calling `Queue::init`
    /// directly.
    #[must_use]
    pub const unsafe fn init<const N: usize>(
        this: *const Self,
        slots: &'static [SyncUnsafeCell<u8>; N],
        data: &'static [SyncUnsafeCell<MaybeUninit<T>>; N],
    ) -> Queue<T> {
        const {
            assert!(N <= RT_QUEUE_MAX_SIZE_, "queue is too large");
        }

        // Pointer to the `rt_queue` stored inside `*this`. Both semaphores are initialized with
        // pointers to their own fields within it, which is why the final address is needed here.
        let inner_ptr = UnsafeCell::raw_get(ptr_to_field!(this, queue));

        let state = rt_queue {
            // `N as i32` cannot exceed the bound asserted above.
            push_sem: c_sem_init(ptr_to_field_mut!(inner_ptr, push_sem), N as i32, i32::MAX),
            pop_sem: c_sem_init(ptr_to_field_mut!(inner_ptr, pop_sem), 0, i32::MAX),
            enq: 0,
            deq: 0,
            slots: SyncUnsafeCell::raw_get(slots.as_ptr()),
            data: SyncUnsafeCell::raw_get(data.as_ptr()).cast(),
            num_elems: N,
            elem_size: size_of::<T>(),
        };

        Self {
            queue: UnsafeCell::new(state),
            _phantom_data: PhantomData,
        }
    }

    /// Adds an element to the queue, blocking until there is space in the queue.
    #[inline]
    pub fn push(&self, elem: T) {
        let src = from_ref(&elem).cast();
        // SAFETY: `src` points at `elem`, which stays alive for the whole call, and the queue
        // pointer comes from `self`.
        unsafe {
            rt_queue_push(self.queue.get(), src);
        }
        // The element now belongs to the queue, so its destructor must not run here.
        forget(elem);
    }

    /// Removes an element from the queue, blocking while the queue is empty.
    #[inline]
    pub fn pop(&self) -> T {
        let mut slot = MaybeUninit::<T>::uninit();
        // SAFETY: `slot` provides writable storage for one `T`; it is treated as initialized
        // only after `rt_queue_pop` has returned.
        unsafe {
            rt_queue_pop(self.queue.get(), slot.as_mut_ptr().cast());
            slot.assume_init()
        }
    }

    /// Adds an element to the queue if there is space and returns the element to the caller
    /// otherwise.
    #[inline]
    pub fn try_push(&self, elem: T) -> Result<(), T> {
        let src = from_ref(&elem).cast();
        // SAFETY: `src` points at `elem`, which stays alive for the whole call.
        let accepted = unsafe { rt_queue_trypush(self.queue.get(), src) };
        if !accepted {
            // Nothing was taken, so the caller keeps ownership.
            return Err(elem);
        }
        // The element now belongs to the queue, so its destructor must not run here.
        forget(elem);
        Ok(())
    }

    /// Removes an element from the queue if it is non-empty.
    #[inline]
    pub fn try_pop(&self) -> Option<T> {
        let mut slot = MaybeUninit::<T>::uninit();
        // SAFETY: `slot` provides writable storage for one `T`.
        let popped = unsafe { rt_queue_trypop(self.queue.get(), slot.as_mut_ptr().cast()) };
        // SAFETY: `slot` is only read when the pop reported success.
        popped.then(|| unsafe { slot.assume_init() })
    }

    /// Timed variant of [`Queue::try_push`]: forwards `ticks` to `rt_queue_timedpush`
    /// (presumably the maximum number of ticks to wait for space) and hands the element back to
    /// the caller if the push did not succeed.
    #[inline]
    pub fn timed_push(&self, elem: T, ticks: Utick) -> Result<(), T> {
        let src = from_ref(&elem).cast();
        // SAFETY: `src` points at `elem`, which stays alive for the whole call.
        let accepted = unsafe { rt_queue_timedpush(self.queue.get(), src, ticks) };
        if !accepted {
            // Nothing was taken, so the caller keeps ownership.
            return Err(elem);
        }
        // The element now belongs to the queue, so its destructor must not run here.
        forget(elem);
        Ok(())
    }

    /// Timed variant of [`Queue::try_pop`]: forwards `ticks` to `rt_queue_timedpop` (presumably
    /// the maximum number of ticks to wait for an element) and returns `None` if the pop did not
    /// succeed.
    #[inline]
    pub fn timed_pop(&self, ticks: Utick) -> Option<T> {
        let mut slot = MaybeUninit::<T>::uninit();
        // SAFETY: `slot` provides writable storage for one `T`.
        let popped =
            unsafe { rt_queue_timedpop(self.queue.get(), slot.as_mut_ptr().cast(), ticks) };
        // SAFETY: `slot` is only read when the pop reported success.
        popped.then(|| unsafe { slot.assume_init() })
    }
}

/// Peeking requires `T: Clone` because the element being looked at stays in the queue. Where
/// `pop` moves the element out, `peek` makes a bitwise copy of it in a local `MaybeUninit`,
/// clones that copy, and returns the clone. The local copy is never dropped, since it remains
/// wrapped in the `MaybeUninit`.
impl<T: Clone> Queue<T> {
    // NOTE(review): the bitwise copy aliases the element still stored in the queue. If another
    // task pops and drops that element before `clone` runs on the copy, `clone` would operate on
    // a value that has already been dropped, which matters for `T` that own resources. Confirm
    // whether callers are expected to rule this out.

    /// Gets the next element of the queue, blocking while the queue is empty.
    #[inline]
    pub fn peek(&self) -> T {
        let mut shadow = MaybeUninit::<T>::uninit();
        // SAFETY: `shadow` provides writable storage for one `T`.
        unsafe {
            rt_queue_peek(self.queue.get(), shadow.as_mut_ptr().cast());
        }
        // SAFETY: `shadow` is read only after `rt_queue_peek` has returned.
        let front = unsafe { shadow.assume_init_ref() };
        T::clone(front)
    }

    /// Gets the next element of the queue if it is non-empty.
    #[inline]
    pub fn try_peek(&self) -> Option<T> {
        let mut shadow = MaybeUninit::<T>::uninit();
        // SAFETY: `shadow` provides writable storage for one `T`.
        let found = unsafe { rt_queue_trypeek(self.queue.get(), shadow.as_mut_ptr().cast()) };
        // SAFETY: `shadow` is only read when the peek reported success.
        found.then(|| T::clone(unsafe { shadow.assume_init_ref() }))
    }

    /// Timed variant of [`Queue::try_peek`]: forwards `ticks` to `rt_queue_timedpeek`
    /// (presumably the maximum number of ticks to wait for an element) and returns `None` if the
    /// peek did not succeed.
    #[inline]
    pub fn timed_peek(&self, ticks: Utick) -> Option<T> {
        let mut shadow = MaybeUninit::<T>::uninit();
        // SAFETY: `shadow` provides writable storage for one `T`.
        let found =
            unsafe { rt_queue_timedpeek(self.queue.get(), shadow.as_mut_ptr().cast(), ticks) };
        // SAFETY: `shadow` is only read when the peek reported success.
        found.then(|| T::clone(unsafe { shadow.assume_init_ref() }))
    }
}

/// Declares a `static` `Queue` named `$name` that holds up to `$num` elements of type `$type`.
///
/// The expansion also defines two hidden statics that back the queue — a `u8` slot array and a
/// `MaybeUninit<$type>` data array, each of length `$num` — and passes them, together with the
/// address of `$name` itself, to `Queue::init`. A trailing comma after `$num` is accepted.
///
/// Every path in the expansion is fully qualified, so nothing is imported into the scope in
/// which `$type` and `$num` are resolved; caller types that happen to be named `MaybeUninit` or
/// `SyncUnsafeCell` keep their meaning.
///
/// ```ignore
/// queue!(QUEUE, i32, 10);
/// QUEUE.push(1);
/// assert_eq!(QUEUE.pop(), 1);
/// ```
#[macro_export]
macro_rules! queue {
    ($name: ident, $type: ty, $num: expr $(,)?) => {
        static $name: $crate::sync::Queue<$type> = {
            static SLOTS: [$crate::cell::SyncUnsafeCell<u8>; $num] =
                [const { $crate::cell::SyncUnsafeCell::new(0) }; $num];
            static DATA: [$crate::cell::SyncUnsafeCell<::core::mem::MaybeUninit<$type>>; $num] = [const {
                $crate::cell::SyncUnsafeCell::new(::core::mem::MaybeUninit::zeroed())
            }; $num];
            // SAFETY: the pointer is the address of the very `static` being initialized with
            // the returned value, as `Queue::init` requires.
            unsafe { $crate::sync::Queue::init(&raw const $name, &SLOTS, &DATA) }
        };
    };
}

#[cfg(test)]
mod tests {
    // A ten-element queue of `i32` used by the test below.
    queue!(QUEUE, i32, 10);

    /// A push into the empty queue followed by a pop returns the pushed value, after which a
    /// non-blocking pop finds the queue empty again.
    #[test]
    fn fast_path() {
        QUEUE.push(1);

        let popped = QUEUE.pop();
        assert_eq!(popped, 1);

        let leftover = QUEUE.try_pop();
        assert_eq!(leftover, None);
    }
}