// titanrt 0.7.0
//
// Typed reactive runtime for real-time systems
// Documentation
use crate::error::{RecvError, SendError, TryRecvError};
use crate::io::base::{BaseRx, BaseTx, RxMarker, RxPairExt, TxMarker, TxPairExt};
use crate::utils::CancelToken;
use crossbeam::utils::Backoff;
use ringbuf::consumer::Consumer;
use ringbuf::producer::Producer;
use ringbuf::traits::Split;
use ringbuf::{HeapCons, HeapProd, HeapRb};
use std::thread;
use std::time::{Duration, Instant};

/// Field-less namespace type: its associated function `bounded` builds a connected
/// [`RingSender`] / [`RingReceiver`] pair over a shared heap ring buffer.
pub struct RingBuffer;

impl RingBuffer {
    /// Creates a connected sender/receiver pair that share one heap-allocated ring buffer.
    ///
    /// `capacity` is the number of slots requested for the ring. A request of `0` is raised
    /// to `1`: a ring with no slots could never accept an item, and ringbuf documents
    /// `HeapRb::new` as panicking on a zero capacity.
    ///
    /// Returns `(sender, receiver)`, wrapping the producer and consumer halves obtained
    /// from `split()`.
    pub fn bounded<T>(capacity: usize) -> (RingSender<T>, RingReceiver<T>) {
        // Guard against zero so construction cannot panic and the channel is always usable.
        let slots = capacity.max(1);
        let (prod, cons) = HeapRb::<T>::new(slots).split();

        (RingSender { prod }, RingReceiver { cons })
    }
}

/// Sending half of a ring-buffer channel carrying items of type `E`.
///
/// Instances are produced by `RingBuffer::bounded`.
pub struct RingSender<E> {
    /// Producer half of the underlying heap ring buffer; items are pushed through it.
    prod: HeapProd<E>,
}

impl<E: Send + 'static + Clone> BaseTx for RingSender<E> {
    type EventType = E;

    /// Attempts a single non-blocking push.
    ///
    /// Returns `Ok(())` when the item was stored. When the producer rejects the push,
    /// the item is handed back inside `SendError::full(Some(item))`.
    #[inline]
    fn try_send(&mut self, a: E) -> Result<(), SendError<E>> {
        match self.prod.try_push(a) {
            Ok(()) => Ok(()),
            Err(rejected) => Err(SendError::full(Some(rejected))),
        }
    }

    /// Pushes `a`, retrying until it is accepted, `cancel` fires, or `timeout` elapses.
    ///
    /// Cancellation and the deadline are checked *before* every push attempt, so an
    /// already-cancelled token or a zero timeout returns without trying to push.
    /// With `timeout == None` only cancellation can end the wait.
    ///
    /// # Errors
    ///
    /// * `SendError::cancelled(Some(a))` when `cancel.is_cancelled()` is observed.
    /// * `SendError::timeout(Some(a))` when the time since the call started reaches `timeout`.
    ///
    /// In both cases the unsent item is returned to the caller inside the error.
    fn send(
        &mut self,
        mut a: E,
        cancel: &CancelToken,
        timeout: Option<Duration>,
    ) -> Result<(), SendError<E>> {
        let started = Instant::now();
        let backoff = Backoff::new();
        let mut attempts: u32 = 0;

        loop {
            if cancel.is_cancelled() {
                return Err(SendError::cancelled(Some(a)));
            }
            if timeout.is_some_and(|limit| started.elapsed() >= limit) {
                return Err(SendError::timeout(Some(a)));
            }

            // A rejected push hands the item back; keep ownership for the next attempt.
            a = match self.prod.try_push(a) {
                Ok(()) => return Ok(()),
                Err(rejected) => rejected,
            };

            // Escalating wait: spin for the first failures, then snooze, then short sleeps.
            attempts = attempts.saturating_add(1);
            match attempts {
                0..=63 => backoff.spin(),
                64..=255 => backoff.snooze(),
                _ => thread::sleep(Duration::from_micros(2)),
            }
        }
    }
}

/// Receiving half of a ring-buffer channel carrying items of type `E`.
///
/// Instances are produced by `RingBuffer::bounded`.
pub struct RingReceiver<E> {
    /// Consumer half of the underlying heap ring buffer; items are popped through it.
    cons: HeapCons<E>,
}

impl<E: Send + 'static> BaseRx for RingReceiver<E> {
    type EventType = E;

    /// Attempts a single non-blocking pop.
    ///
    /// Returns the popped item, or `TryRecvError::Empty` when the consumer yields nothing.
    #[inline]
    fn try_recv(&mut self) -> Result<E, TryRecvError> {
        match self.cons.try_pop() {
            Some(item) => Ok(item),
            None => Err(TryRecvError::Empty),
        }
    }

    /// Pops one item, retrying until one is available, `cancel` fires, or `timeout` elapses.
    ///
    /// Cancellation and the deadline are checked *before* every pop attempt, so an
    /// already-cancelled token or a zero timeout returns without trying to pop.
    /// With `timeout == None` only cancellation can end the wait.
    ///
    /// # Errors
    ///
    /// * `RecvError::Cancelled` when `cancel.is_cancelled()` is observed.
    /// * `RecvError::Timeout` when the time since the call started reaches `timeout`.
    fn recv(&mut self, cancel: &CancelToken, timeout: Option<Duration>) -> Result<E, RecvError> {
        let started = Instant::now();
        let backoff = Backoff::new();
        let mut attempts: u32 = 0;

        loop {
            if cancel.is_cancelled() {
                return Err(RecvError::Cancelled);
            }
            if timeout.is_some_and(|limit| started.elapsed() >= limit) {
                return Err(RecvError::Timeout);
            }

            if let Some(item) = self.cons.try_pop() {
                return Ok(item);
            }

            // Escalating wait: spin for the first misses, then snooze, then short sleeps.
            attempts = attempts.saturating_add(1);
            match attempts {
                0..=63 => backoff.spin(),
                64..=255 => backoff.snooze(),
                _ => thread::sleep(Duration::from_micros(2)),
            }
        }
    }
}

impl<E> RxPairExt for RingReceiver<E>
where
    E: Send + 'static + Clone,
{
    type TxHalf = RingSender<E>;

    /// Builds a `(sender, receiver)` pair over a ring requested with `cap` slots.
    fn bound(cap: usize) -> (Self::TxHalf, Self) {
        RingBuffer::bounded::<E>(cap)
    }

    /// Builds a `(sender, receiver)` pair for callers that ask for an "unbounded" channel.
    ///
    /// The ring is still created with a fixed capacity (99_999 slots), so this is a
    /// large bounded channel rather than a truly unbounded one.
    fn unbound() -> (Self::TxHalf, Self) {
        const PSEUDO_UNBOUNDED_CAPACITY: usize = 99_999;
        RingBuffer::bounded::<E>(PSEUDO_UNBOUNDED_CAPACITY)
    }
}

impl<E> TxPairExt for RingSender<E>
where
    E: Send + 'static + Clone,
{
    type RxHalf = RingReceiver<E>;

    /// Builds a `(sender, receiver)` pair over a ring requested with `cap` slots.
    fn bound(cap: usize) -> (Self, Self::RxHalf) {
        RingBuffer::bounded::<E>(cap)
    }

    /// Builds a `(sender, receiver)` pair for callers that ask for an "unbounded" channel.
    ///
    /// The ring is still created with a fixed capacity (99_999 slots), so this is a
    /// large bounded channel rather than a truly unbounded one.
    fn unbound() -> (Self, Self::RxHalf) {
        const PSEUDO_UNBOUNDED_CAPACITY: usize = 99_999;
        RingBuffer::bounded::<E>(PSEUDO_UNBOUNDED_CAPACITY)
    }
}

/// Implements the `TxMarker` trait for [`RingSender`]; the impl adds no items of its own.
impl<E> TxMarker for RingSender<E> where E: Send + 'static {}

/// Implements the `RxMarker` trait for [`RingReceiver`]; the impl adds no items of its own.
impl<E> RxMarker for RingReceiver<E> where E: Send + 'static {}