dust_dds 0.15.0

Data Distribution Service (DDS) implementation
Documentation
use core::{
    cell::RefCell,
    future::Future,
    pin::Pin,
    task::{Context, Poll, Waker},
};

use alloc::{collections::VecDeque, sync::Arc};

use crate::infrastructure::error::DdsError;

use critical_section::Mutex;

/// Creates a new channel and returns its sending and receiving halves.
///
/// Both halves point at the same reference-counted state, which starts out open,
/// without a registered waker, and with a queue pre-allocated for 64 elements.
pub fn mpsc_channel<T>() -> (MpscSender<T>, MpscReceiver<T>) {
    let initial_state = MpscInner {
        data: VecDeque::with_capacity(64),
        waker: None,
        is_closed: false,
    };
    let shared = Arc::new(Mutex::new(RefCell::new(initial_state)));

    // The sender gets its own handle; the receiver takes over the original one.
    let sender = MpscSender {
        inner: Arc::clone(&shared),
    };
    let receiver = MpscReceiver { inner: shared };
    (sender, receiver)
}

/// State shared between the sending and receiving halves of the channel.
struct MpscInner<T> {
    /// Values that were sent but not yet received, oldest first.
    data: VecDeque<T>,
    /// Waker of the task currently waiting for a value, if any.
    waker: Option<Waker>,
    /// Once set, sends are rejected and an empty queue resolves receives to `None`.
    is_closed: bool,
}

impl<T> core::fmt::Debug for MpscInner<T> {
    /// Formats the channel state without requiring `T: Debug`.
    ///
    /// Only the number of queued values is printed, not the values themselves. The
    /// `is_closed` flag is included so an open channel can be told apart from a closed one.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("MpscInner")
            .field("data_length", &self.data.len())
            .field("waker", &self.waker)
            .field("is_closed", &self.is_closed)
            .finish()
    }
}

/// Error returned by [`MpscSender::send`].
#[derive(Debug)]
pub enum MpscSenderError {
    /// The channel is marked as closed, so the value was not queued.
    Closed,
}

impl From<MpscSenderError> for DdsError {
    fn from(_value: MpscSenderError) -> Self {
        DdsError::AlreadyDeleted
    }
}

/// Sending half of a channel created by [`mpsc_channel`].
///
/// Cloning a sender produces another handle to the same channel state.
pub struct MpscSender<T> {
    /// Channel state shared with the receiver and with every cloned sender.
    inner: Arc<Mutex<RefCell<MpscInner<T>>>>,
}

impl<T> Clone for MpscSender<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

impl<T> core::fmt::Debug for MpscSender<T> {
    /// Formats the sender as a struct with a single `inner` field.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self { inner } = self;
        let mut builder = f.debug_struct("MpscSender");
        builder.field("inner", inner).finish()
    }
}

impl<T> MpscSender<T> {
    /// Appends `value` to the back of the channel queue and wakes the waiting receiver
    /// task, if one registered its waker.
    ///
    /// The body contains no `.await`, so the returned future completes the first time
    /// it is polled.
    ///
    /// # Errors
    ///
    /// Returns [`MpscSenderError::Closed`] when the channel is marked as closed. The
    /// value is not queued in that case.
    pub async fn send(&self, value: T) -> Result<(), MpscSenderError> {
        // Only touch the shared state inside the critical section. The waker is moved
        // out so it can be invoked after the section and the `RefCell` borrow are released.
        let pending_waker = critical_section::with(|cs| {
            let mut channel = self.inner.borrow(cs).borrow_mut();
            if channel.is_closed {
                return Err(MpscSenderError::Closed);
            }
            channel.data.push_back(value);
            let waker = channel.waker.take();
            Ok(waker)
        })?;

        // Waking runs arbitrary executor code. Doing it out here keeps the critical
        // section short and avoids a double-borrow panic if the waker re-enters the channel.
        if let Some(waker) = pending_waker {
            waker.wake();
        }
        Ok(())
    }
}

/// Receiving half of a channel created by [`mpsc_channel`].
pub struct MpscReceiver<T> {
    /// Channel state shared with the senders.
    inner: Arc<Mutex<RefCell<MpscInner<T>>>>,
}

impl<T> MpscReceiver<T> {
    pub async fn receive(&self) -> Option<T> {
        MpscReceiverFuture {
            inner: self.inner.clone(),
        }
        .await
    }
}

/// Future awaited by [`MpscReceiver::receive`]; resolves to the next queued value.
struct MpscReceiverFuture<T> {
    /// Handle to the channel state this future polls.
    inner: Arc<Mutex<RefCell<MpscInner<T>>>>,
}

impl<T> Future for MpscReceiverFuture<T> {
    type Output = Option<T>;

    /// Pops the oldest queued value if there is one.
    ///
    /// With an empty queue the future resolves to `None` when the channel is marked as
    /// closed. Otherwise it stores the current task's waker, replacing any previously
    /// stored one, and stays pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        critical_section::with(|cs| {
            let mut channel = self.inner.borrow(cs).borrow_mut();
            let front = channel.data.pop_front();
            match front {
                Some(item) => Poll::Ready(Some(item)),
                None if channel.is_closed => Poll::Ready(None),
                None => {
                    // Register interest so the next `send` can wake this task.
                    drop(channel.waker.replace(cx.waker().clone()));
                    Poll::Pending
                }
            }
        })
    }
}