embedded-svc 0.24.0

A set of traits for services higher level than embedded-hal and typically found in embedded microcontrollers with WiFi or BLE support.
Documentation
use core::future::Future;
use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};

extern crate alloc;
use alloc::sync::Arc;

use crate::utils::mutex::{Condvar, Mutex, RawCondvar};

#[cfg(all(feature = "nightly", feature = "experimental"))]
pub use async_traits_impl::*;

/// Asynchronous facade over a blocking postbox.
///
/// Bundles the wrapped postbox (`PB`) with an unblocker (`U`). The payload
/// type `P` appears only in a zero-sized marker, so no `P` is ever stored.
pub struct AsyncPostbox<U, P, PB> {
    unblocker: U,
    blocking_postbox: PB,
    // `fn() -> P` mentions `P` without making the struct own a `P`.
    _payload_type: PhantomData<fn() -> P>,
}

impl<U, P, PB> AsyncPostbox<U, P, PB> {
    /// Builds a postbox wrapper from an `unblocker` and the blocking postbox
    /// it should post through.
    pub const fn new(unblocker: U, blocking_postbox: PB) -> Self {
        let _payload_type = PhantomData;

        AsyncPostbox {
            unblocker,
            blocking_postbox,
            _payload_type,
        }
    }

    /// Posts `value` through the wrapped blocking postbox.
    ///
    /// The body contains no await point: the call to `post` happens directly
    /// when the returned future is polled, and the `unblocker` is not involved.
    /// The second argument to `post` is always `None`.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped postbox reports an error.
    pub async fn send(&self, value: P)
    where
        P: Clone + Send + 'static,
        PB: crate::event_bus::Postbox<P> + Clone + Send + Sync + 'static,
    {
        let outcome = self.blocking_postbox.post(&value, None);

        // Only success/failure matters here; the success payload is discarded.
        outcome.map(drop).unwrap()
    }
}

/// Cloning duplicates the unblocker and the wrapped postbox; the payload type
/// `P` itself does not need to be `Clone` because it is only a marker.
impl<U, P, PB> Clone for AsyncPostbox<U, P, PB>
where
    U: Clone,
    PB: Clone,
{
    fn clone(&self) -> Self {
        let Self {
            unblocker,
            blocking_postbox,
            ..
        } = self;

        Self {
            unblocker: unblocker.clone(),
            blocking_postbox: blocking_postbox.clone(),
            _payload_type: PhantomData,
        }
    }
}

/// State shared between the callback registered on the wrapped event bus and
/// the future that receives payloads from it.
pub struct SubscriptionState<P, S> {
    // Handle returned by the wrapped bus's `subscribe`; filled in right after
    // subscribing succeeds, so it lives exactly as long as this state.
    subscription: Option<S>,
    // One-slot buffer: the bus callback stores a payload here and
    // `NextFuture::poll` takes it out again.
    value: Option<P>,
    // Waker stored by a pending poll; taken by the callback when it wakes the
    // task and cleared when the `NextFuture` is dropped.
    waker: Option<Waker>,
}

/// Receiving side of an asynchronous subscription.
///
/// Holds a reference-counted pair of the mutex-protected
/// [`SubscriptionState`] and the condition variable that goes with it.
// The nested tuple type is only spelled out here, hence the lint allowance
// instead of a dedicated type alias.
#[allow(clippy::type_complexity)]
pub struct AsyncSubscription<CV, P, S>(
    Arc<(Mutex<CV::RawMutex, SubscriptionState<P, S>>, Condvar<CV>)>,
)
where
    CV: RawCondvar,
    P: Send,
    S: Send;

impl<CV, P, S> AsyncSubscription<CV, P, S>
where
    CV: RawCondvar + Send + Sync,
    CV::RawMutex: Send + Sync,
    S: Send,
    P: Clone + Send,
{
    /// Waits until a payload is available in this subscription and returns it.
    ///
    /// This is a thin `async` wrapper that awaits a [`NextFuture`] borrowing
    /// this subscription.
    pub async fn recv(&mut self) -> P {
        let next = NextFuture(self);

        next.await
    }
}

/// Future that resolves to the next payload of an [`AsyncSubscription`].
///
/// It only borrows the subscription; all state lives in the subscription's
/// shared, mutex-protected slot.
pub struct NextFuture<'a, CV, P, S>(&'a AsyncSubscription<CV, P, S>)
where
    CV: RawCondvar + Send + Sync,
    CV::RawMutex: Send + Sync,
    P: Clone + Send,
    S: Send;

/// Dropping the future forgets any waker it registered, so the bus callback
/// will not wake a task on behalf of a future that no longer exists.
impl<'a, CV, P, S> Drop for NextFuture<'a, CV, P, S>
where
    CV: RawCondvar + Send + Sync,
    CV::RawMutex: Send + Sync,
    P: Clone + Send,
    S: Send,
{
    fn drop(&mut self) {
        // Only the mutex half of the shared pair is needed here.
        let (mutex, _) = &*self.0 .0;

        // The guard is a temporary: the lock is released at the end of this statement.
        mutex.lock().waker = None;
    }
}

impl<'a, CV, P, S> Future for NextFuture<'a, CV, P, S>
where
    CV: RawCondvar + Send + Sync,
    CV::RawMutex: Send + Sync,
    P: Clone + Send,
    S: Send,
{
    type Output = P;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.0 .0 .0.lock();

        let value = mem::replace(&mut state.value, None);

        if let Some(value) = value {
            self.0 .0 .1.notify_all();

            Poll::Ready(value)
        } else {
            state.waker = Some(cx.waker().clone());

            self.0 .0 .1.notify_all();

            Poll::Pending
        }
    }
}

/// Asynchronous facade over a blocking event bus.
///
/// Pairs the wrapped `event_bus` with an `unblocker`. `CV` is carried purely
/// as a type-level marker; no value of that type is stored.
pub struct AsyncEventBus<U, CV, E> {
    unblocker: U,
    event_bus: E,
    // `fn() -> CV` mentions `CV` without making the struct own a `CV`.
    _condvar_type: PhantomData<fn() -> CV>,
}

impl<U, CV, E> AsyncEventBus<U, CV, E> {
    /// Wraps `event_bus`, keeping `unblocker` alongside it.
    pub const fn new(unblocker: U, event_bus: E) -> Self {
        let _condvar_type = PhantomData;

        AsyncEventBus {
            unblocker,
            event_bus,
            _condvar_type,
        }
    }
}

impl<U, CV, E> AsyncEventBus<U, CV, E>
where
    CV: RawCondvar + Send + Sync + 'static,
    CV::RawMutex: Send + Sync + 'static,
{
    /// Subscribes to the wrapped event bus and returns an [`AsyncSubscription`]
    /// from which payloads of type `P` can be received.
    ///
    /// A callback is registered on the wrapped bus. It hands each payload over
    /// through a one-slot buffer shared with the returned subscription: while
    /// the slot is still occupied the callback blocks on the condition
    /// variable, and once the slot is free it stores a clone of the payload
    /// and wakes the registered waker, if any.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the wrapped bus's `subscribe`.
    pub fn subscribe<P>(&self) -> Result<AsyncSubscription<CV, P, E::Subscription>, E::Error>
    where
        P: Clone + Send + 'static,
        E: crate::event_bus::EventBus<P>,
        E::Subscription: Send + 'static,
    {
        let state = Arc::new((
            Mutex::new(SubscriptionState {
                subscription: None,
                value: None,
                waker: None,
            }),
            Condvar::new(),
        ));

        // The callback only gets a weak handle, so it does not keep the shared
        // state alive on its own.
        let subscription_state = Arc::downgrade(&state);

        let subscription = self.event_bus.subscribe(move |payload| {
            // If the shared state is already gone the payload is silently ignored.
            if let Some(state) = subscription_state.upgrade() {
                // The annotation pins the generic parameters of the mutex and condvar.
                let pair: &(Mutex<CV::RawMutex, _>, Condvar<CV>) = &state;

                let (mut state, condvar) = (pair.0.lock(), &pair.1);

                // Slot still holds an unconsumed payload: nudge the receiver and
                // block until `NextFuture::poll` notifies the condvar.
                // NOTE(review): while blocked here the callback holds a strong
                // `Arc`; nothing visible in this file notifies the condvar when
                // the subscription is dropped - confirm this cannot hang the bus.
                while state.value.is_some() {
                    if let Some(waker) = mem::replace(&mut state.waker, None) {
                        waker.wake();
                    }

                    state = condvar.wait(state);
                }

                state.value = Some(payload.clone());

                // Wake the task whose pending poll registered a waker, if any.
                if let Some(waker) = mem::replace(&mut state.waker, None) {
                    waker.wake();
                }
            }
        })?;

        // Park the underlying subscription handle inside the shared state so it
        // is owned by (and dropped together with) that state.
        state.0.lock().subscription = Some(subscription);

        Ok(AsyncSubscription(state))
    }
}

impl<CV, E> AsyncEventBus<(), CV, E>
where
    CV: RawCondvar + Send + Sync + 'static,
{
    /// Obtains a postbox from the wrapped event bus and wraps it in an
    /// [`AsyncPostbox`] that has no unblocker (`U = ()`).
    ///
    /// # Errors
    ///
    /// Returns the error produced by the wrapped bus's `postbox`.
    pub fn postbox<P>(&self) -> Result<AsyncPostbox<(), P, E::Postbox>, E::Error>
    where
        P: Clone + Send + 'static,
        E::Postbox: Clone + Send + 'static,
        E: crate::event_bus::PostboxProvider<P>,
        E::Error: Send + Sync + 'static,
    {
        let blocking_postbox = self.event_bus.postbox()?;

        Ok(AsyncPostbox::new((), blocking_postbox))
    }
}

/// Cloning duplicates the unblocker and the wrapped bus; `CV` is only a
/// marker and therefore carries no `Clone` requirement.
impl<U, CV, E> Clone for AsyncEventBus<U, CV, E>
where
    U: Clone,
    E: Clone,
{
    fn clone(&self) -> Self {
        let Self {
            unblocker,
            event_bus,
            ..
        } = self;

        Self {
            unblocker: unblocker.clone(),
            event_bus: event_bus.clone(),
            _condvar_type: PhantomData,
        }
    }
}

#[cfg(all(feature = "nightly", feature = "experimental"))]
mod async_traits_impl {
    use core::future::Future;

    use crate::event_bus::asynch::{ErrorType, EventBus, PostboxProvider, Receiver, Sender};
    use crate::executor::asynch::Unblocker;
    use crate::utils::asyncify::{AsyncWrapper, UnblockingAsyncWrapper};
    use crate::utils::mutex::RawCondvar;

    use super::{AsyncEventBus, AsyncPostbox, AsyncSubscription, NextFuture};

    /// `Sender` implementation that hands the blocking `post` call over to the
    /// postbox's unblocker.
    ///
    /// The returned future is whatever the unblocker produces for the closure.
    /// The closure unwraps the result of `post`, so an error from the wrapped
    /// postbox panics wherever the unblocker runs that closure.
    impl<U, P, PB> Sender for AsyncPostbox<U, P, PB>
    where
        U: Unblocker,
        P: Clone + Send + 'static,
        PB: crate::event_bus::Postbox<P> + Clone + Send + Sync + 'static,
    {
        type Data = P;

        type SendFuture<'a>
        = U::UnblockFuture<()> where Self: 'a;

        fn send(&self, value: Self::Data) -> Self::SendFuture<'_> {
            // The closure must own everything it touches, so it gets its own
            // clone of the postbox; `value` is already owned and is moved in.
            let blocking_postbox = self.blocking_postbox.clone();

            self.unblocker
                .unblock(move || blocking_postbox.post(&value, None).map(|_| ()).unwrap())
        }
    }

    /// `Sender` implementation for a postbox without an unblocker (`U = ()`).
    ///
    /// The trait method wraps the inherent async `AsyncPostbox::send` in an
    /// `async move` block, which serves as the concrete type behind
    /// `SendFuture`.
    impl<P, PB> Sender for AsyncPostbox<(), P, PB>
    where
        P: Clone + Send + 'static,
        PB: crate::event_bus::Postbox<P> + Clone + Send + Sync + 'static,
    {
        type Data = P;

        type SendFuture<'a>
        = impl Future<Output = ()> + 'a where Self: 'a;

        fn send(&self, value: Self::Data) -> Self::SendFuture<'_> {
            // Path resolution prefers inherent associated functions, so this is
            // the inherent `send`, not a recursive call into this trait method.
            async move { AsyncPostbox::send(self, value).await }
        }
    }

    /// Lets an unblocker-less `AsyncPostbox` be built from just the blocking
    /// postbox it wraps.
    impl<P, PB> AsyncWrapper<PB> for AsyncPostbox<(), P, PB> {
        fn new(sync: PB) -> Self {
            // Two-argument call: this is the inherent constructor, with `()`
            // standing in for the unblocker.
            AsyncPostbox::new((), sync)
        }
    }

    /// Lets an `AsyncPostbox` be built from an unblocker plus the blocking
    /// postbox it wraps.
    impl<U, P, PB> UnblockingAsyncWrapper<U, PB> for AsyncPostbox<U, P, PB> {
        fn new(unblocker: U, sync: PB) -> Self {
            // Forwards both arguments unchanged to the postbox constructor.
            AsyncPostbox::new(unblocker, sync)
        }
    }

    /// `Receiver` implementation: each call to `recv` hands out a fresh
    /// `NextFuture` borrowing this subscription.
    ///
    /// Unlike the inherent `recv`, the trait method takes `&self`.
    impl<CV, P, S> Receiver for AsyncSubscription<CV, P, S>
    where
        CV: RawCondvar + Send + Sync,
        CV::RawMutex: Send + Sync,
        S: Send,
        P: Clone + Send,
    {
        type Data = P;

        type RecvFuture<'a>
        = NextFuture<'a, CV, P, S> where Self: 'a;

        fn recv(&self) -> Self::RecvFuture<'_> {
            NextFuture(self)
        }
    }

    /// Lets an `AsyncEventBus` be built from an unblocker plus the blocking
    /// event bus it wraps.
    impl<U, CV, E> UnblockingAsyncWrapper<U, E> for AsyncEventBus<U, CV, E> {
        fn new(unblocker: U, sync: E) -> Self {
            // Forwards both arguments unchanged to the event-bus constructor.
            AsyncEventBus::new(unblocker, sync)
        }
    }

    /// Lets an unblocker-less `AsyncEventBus` be built from just the blocking
    /// event bus it wraps.
    impl<CV, E> AsyncWrapper<E> for AsyncEventBus<(), CV, E> {
        fn new(sync: E) -> Self {
            // Two-argument call: this is the inherent constructor, with `()`
            // standing in for the unblocker.
            AsyncEventBus::new((), sync)
        }
    }

    /// The async wrapper reports exactly the error type of the bus it wraps.
    impl<U, CV, E> ErrorType for AsyncEventBus<U, CV, E>
    where
        E: ErrorType,
    {
        type Error = E::Error;
    }

    /// Async `EventBus` implementation; subscriptions are `AsyncSubscription`s
    /// wrapping the subscription type of the underlying blocking bus.
    impl<U, CV, P, E> EventBus<P> for AsyncEventBus<U, CV, E>
    where
        CV: RawCondvar + Send + Sync + 'static,
        CV::RawMutex: Send + Sync + 'static,
        P: Clone + Send + 'static,
        E: crate::event_bus::EventBus<P>,
        E::Subscription: Send + 'static,
    {
        type Subscription = AsyncSubscription<CV, P, E::Subscription>;

        fn subscribe(&self) -> Result<Self::Subscription, Self::Error> {
            // Path resolution prefers inherent associated functions, so this
            // forwards to the inherent `subscribe` rather than recursing.
            AsyncEventBus::subscribe(self)
        }
    }

    /// Async `PostboxProvider` implementation for a bus that has an unblocker:
    /// every postbox it hands out receives its own clone of that unblocker.
    impl<U, CV, P, E> PostboxProvider<P> for AsyncEventBus<U, CV, E>
    where
        U: Unblocker + Clone,
        CV: RawCondvar + Send + Sync + 'static,
        P: Clone + Send + 'static,
        E::Postbox: Clone + Send + Sync + 'static,
        E: crate::event_bus::PostboxProvider<P>,
        Self::Error: Send + Sync + 'static,
    {
        type Postbox = AsyncPostbox<U, P, E::Postbox>;

        fn postbox(&self) -> Result<Self::Postbox, Self::Error> {
            // Ask the wrapped bus first; the unblocker is cloned only on success.
            let blocking_postbox = self.event_bus.postbox()?;
            let unblocker = self.unblocker.clone();

            Ok(AsyncPostbox::new(unblocker, blocking_postbox))
        }
    }

    /// Async `PostboxProvider` implementation for a bus without an unblocker
    /// (`U = ()`); the postboxes it hands out carry `()` as their unblocker too.
    impl<CV, P, E> PostboxProvider<P> for AsyncEventBus<(), CV, E>
    where
        CV: RawCondvar + Send + Sync + 'static,
        P: Clone + Send + 'static,
        E::Postbox: Clone + Send + Sync + 'static,
        E: crate::event_bus::PostboxProvider<P>,
        Self::Error: Send + Sync + 'static,
    {
        type Postbox = AsyncPostbox<(), P, E::Postbox>;

        fn postbox(&self) -> Result<Self::Postbox, Self::Error> {
            // Path resolution prefers inherent associated functions, so this
            // forwards to the inherent `postbox` rather than recursing.
            AsyncEventBus::postbox(self)
        }
    }
}