embedded-svc 0.24.0

A set of traits for services that sit at a higher level than embedded-hal and are typically found on embedded microcontrollers with WiFi or BLE support.
Documentation
#[cfg(feature = "alloc")]
pub mod server {
    use core::future::Future;
    use core::marker::PhantomData;
    use core::pin::Pin;
    use core::task::{Context, Poll, Waker};
    use core::{mem, slice};

    extern crate alloc;
    use alloc::sync::Arc;

    use heapless;

    use crate::macros::svc_log;
    use crate::utils::mutex::{Condvar, Mutex, RawCondvar};
    use crate::ws::{callback_server::*, *};

    #[cfg(all(feature = "nightly", feature = "experimental"))]
    pub use async_traits_impl::*;

    /// Asynchronous sending half of an accepted WS connection.
    ///
    /// Wraps a blocking sender `S` together with an unblocker `U`. With
    /// `U = ()` frames are sent inline through `send_blocking`; otherwise
    /// `send` hands the blocking call to the unblocker.
    // `unblocker` is only read by `send`, which is compiled in with the
    // "nightly" feature, hence the `dead_code` allowance.
    #[allow(dead_code)]
    pub struct AsyncSender<U, S> {
        /// Runs the blocking send on behalf of `send`.
        unblocker: U,
        /// The underlying blocking sender.
        sender: S,
    }

    impl<S> AsyncSender<(), S>
    where
        S: Sender + SessionProvider + Send + Clone + 'static,
    {
        /// Sends one frame by calling the wrapped sender directly.
        ///
        /// No unblocker is involved: the sender's `send` runs inline when the
        /// returned future is polled, so a sender that blocks also blocks the
        /// task polling this future.
        ///
        /// Returns whatever the wrapped sender's `send` returns.
        pub async fn send_blocking(
            &mut self,
            frame_type: FrameType,
            frame_data: &[u8],
        ) -> Result<(), S::Error> {
            svc_log!(
                info,
                "Sending data (frame_type={:?}, frame_len={}) to WS connection {:?}",
                frame_type,
                frame_data.len(),
                self.sender.session()
            );

            self.sender.send(frame_type, frame_data)
        }
    }

    #[cfg(feature = "nightly")]
    impl<U, S> AsyncSender<U, S>
    where
        U: crate::executor::asynch::Unblocker,
        S: Sender + SessionProvider + Send + Clone + 'static,
        S::Error: Send + Sync + 'static,
    {
        /// Sends one frame by running the blocking send through the unblocker.
        ///
        /// The payload is copied into a `Vec` and the sender is cloned, so the
        /// closure passed to the unblocker borrows neither `self` nor
        /// `frame_data`.
        ///
        /// Returns the result produced by the wrapped sender's `send`.
        pub async fn send(
            &mut self,
            frame_type: FrameType,
            frame_data: &[u8],
        ) -> Result<(), S::Error> {
            #[cfg(not(feature = "std"))]
            use alloc::borrow::ToOwned;

            svc_log!(
                info,
                "Sending data (frame_type={:?}, frame_len={}) to WS connection {:?}",
                frame_type,
                frame_data.len(),
                self.sender.session()
            );

            let mut detached_sender = self.sender.clone();
            let owned_frame: alloc::vec::Vec<u8> = frame_data.to_owned();

            // `move` transfers the clone and the copied payload into the closure.
            let blocking_send = move || detached_sender.send(frame_type, &owned_frame);

            self.unblocker.unblock(blocking_send).await
        }
    }

    /// Asynchronous receiving half of an accepted WS connection.
    ///
    /// `E` is the error type of the receive future's output; it is carried
    /// only as a `PhantomData` marker.
    #[allow(dead_code)]
    pub struct AsyncReceiver<C, E>
    where
        C: RawCondvar,
    {
        /// Per-connection state shared with the `Processor` feeding this receiver.
        shared: Arc<Mutex<C::RawMutex, SharedReceiverState>>,
        /// Condition variable used for the buffer hand-off with the processor.
        condvar: Arc<Condvar<C>>,
        /// Marker for the otherwise unused error type `E`.
        _ep: PhantomData<fn() -> E>,
    }

    impl<C, E> AsyncReceiver<C, E>
    where
        C: RawCondvar + Send + Sync,
        C::RawMutex: Send + Sync,
    {
        /// Starts receiving the next frame into `frame_data_buf`.
        ///
        /// This only builds the future; all work happens when the returned
        /// [`AsyncReceiverFuture`] is polled. The future borrows both this
        /// receiver and the buffer for `'a` and resolves to the frame type and
        /// the frame length.
        pub fn recv<'a>(
            &'a mut self,
            frame_data_buf: &'a mut [u8],
        ) -> AsyncReceiverFuture<'a, C, E> {
            let receiver = self;

            AsyncReceiverFuture {
                _ep: PhantomData,
                frame_data_buf,
                receiver,
            }
        }
    }

    /// Hand-off state for one frame travelling from the `Processor` to an
    /// `AsyncReceiver`.
    pub enum ReceiverData {
        /// No frame is pending.
        None,
        /// The processor holds a frame of this type and length in its buffer.
        Metadata((FrameType, usize)),
        /// The receiver published the start of its destination buffer so the
        /// processor can copy the frame into it.
        Data(*mut u8),
        /// The processor finished copying the frame into the published buffer.
        DataCopied,
        /// The connection was closed.
        Closed,
    }

    // SAFETY: the raw pointer in `Data` is the only field that is not `Send`.
    // It is stored by `AsyncReceiverFuture::poll`, which then stays inside
    // `poll` (keeping the destination buffer mutably borrowed) until the
    // processor reports `DataCopied`. The processor therefore dereferences the
    // pointer only while the buffer it points to is still alive.
    unsafe impl Send for ReceiverData {}

    /// Mutex-protected state shared between the `Processor` and one
    /// `AsyncReceiver`.
    pub struct SharedReceiverState {
        /// Waker of the task that last polled the receive future and found
        /// nothing to deliver; taken and woken when the processor has news.
        waker: Option<Waker>,
        /// Current step of the frame hand-off.
        data: ReceiverData,
    }

    /// Entry the `Processor` keeps for every connection it has accepted.
    ///
    /// `M` is the mutex-wrapped receiver state and `S` the session identifier
    /// used to match incoming frames to this entry.
    pub struct ConnectionState<M, S> {
        /// Session this entry belongs to.
        session: S,
        /// State shared with the `AsyncReceiver` of this connection.
        receiver_state: Arc<M>,
    }

    /// Future returned by `AsyncReceiver::recv`.
    ///
    /// Resolves to the type and length of the next frame, after the frame has
    /// been copied into `frame_data_buf` when the buffer is large enough.
    pub struct AsyncReceiverFuture<'a, C, E>
    where
        C: RawCondvar,
    {
        /// Receiver whose shared state and condition variable are used.
        receiver: &'a mut AsyncReceiver<C, E>,
        /// Destination buffer for the frame payload.
        frame_data_buf: &'a mut [u8],
        /// Marker for the otherwise unused error type `E`.
        _ep: PhantomData<fn() -> E>,
    }

    impl<'a, C, E> Future for AsyncReceiverFuture<'a, C, E>
    where
        C: RawCondvar,
    {
        type Output = Result<(FrameType, usize), E>;

        /// Delivers the next frame, if the processor has announced one.
        ///
        /// * Frame announced and the buffer is large enough: the buffer pointer
        ///   is published, this call blocks on the condition variable until
        ///   the processor has copied the payload, and the state is reset.
        /// * Frame announced but the buffer is too small: nothing is copied and
        ///   the frame stays pending; `(frame_type, size)` is still returned.
        /// * Connection closed: resolves to `(FrameType::Close, 0)`.
        /// * Otherwise: stores the waker and returns `Poll::Pending`.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Taken before locking, while no borrow of the shared state is live.
            let dest = self.frame_data_buf.as_mut_ptr();
            let mut state = self.receiver.shared.lock();

            match state.data {
                ReceiverData::Metadata((frame_type, size)) => {
                    if size <= self.frame_data_buf.len() {
                        // Hand the destination to the processor ...
                        state.data = ReceiverData::Data(dest);
                        self.receiver.condvar.notify_all();

                        // ... and stay here until it reports the copy as done.
                        while !matches!(state.data, ReceiverData::DataCopied) {
                            state = self.receiver.condvar.wait(state);
                        }

                        // Release the processor, which waits for `None`.
                        state.data = ReceiverData::None;
                        self.receiver.condvar.notify_all();
                    }

                    Poll::Ready(Ok((frame_type, size)))
                }
                ReceiverData::Closed => Poll::Ready(Ok((FrameType::Close, 0))),
                ReceiverData::None | ReceiverData::Data(_) | ReceiverData::DataCopied => {
                    state.waker = Some(cx.waker().clone());
                    Poll::Pending
                }
            }
        }
    }

    /// Mutex-protected state shared between the `Processor` and its
    /// `AsyncAcceptor`.
    #[allow(clippy::type_complexity)]
    pub struct SharedAcceptorState<C, S>
    where
        C: RawCondvar + Send + Sync,
        C::RawMutex: Send + Sync,
        S: Send,
    {
        /// Waker of the task that last polled the acceptor with nothing pending.
        waker: Option<Waker>,
        /// Accept slot:
        /// * `None` - nothing pending,
        /// * `Some(Some(..))` - a new connection (receiver state and sender)
        ///   waiting to be picked up by the acceptor,
        /// * `Some(None)` - the processor has published its closed marker.
        data: Option<Option<(Arc<Mutex<C::RawMutex, SharedReceiverState>>, S)>>,
    }

    /// Asynchronous acceptor handing out the sender/receiver pair of every
    /// connection registered by the `Processor`.
    pub struct AsyncAcceptor<U, C, S>
    where
        C: RawCondvar + Send + Sync,
        C::RawMutex: Send + Sync,
        S: Send,
    {
        /// Cloned into every `AsyncSender` produced by this acceptor.
        unblocker: U,
        /// Accept slot shared with the processor.
        accept: Arc<Mutex<C::RawMutex, SharedAcceptorState<C, S>>>,
        /// Condition variable shared with the processor; notified once a
        /// pending connection has been taken out of the slot.
        condvar: Arc<Condvar<C>>,
    }

    impl<U, C, S> AsyncAcceptor<U, C, S>
    where
        C: RawCondvar + Send + Sync,
        C::RawMutex: Send + Sync,
        S: Sender + SessionProvider + Send + Clone + 'static,
        S::Error: Send + Sync + 'static,
    {
        /// Returns the value to await in order to accept the next connection.
        ///
        /// `&AsyncAcceptor` itself implements `Future`, so this simply hands
        /// back a shared reference to the acceptor.
        pub fn accept(&self) -> &Self {
            self
        }
    }

    impl<'a, U, C, S> Future for &'a AsyncAcceptor<U, C, S>
    where
        U: Clone,
        C: RawCondvar + Send + Sync,
        C::RawMutex: Send + Sync,
        S: Sender + Send + Clone + 'static,
    {
        type Output = Result<
            (AsyncSender<U, S>, AsyncReceiver<C, <S as ErrorType>::Error>),
            <S as ErrorType>::Error,
        >;

        /// Takes a pending connection out of the accept slot, if there is one.
        ///
        /// On success the condition variable is notified so that the processor,
        /// which waits for the slot to become empty, can continue.
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let mut slot = self.accept.lock();

            // Empties the slot; the arms below put back what must stay.
            match mem::take(&mut slot.data) {
                Some(Some((receiver_state, raw_sender))) => {
                    let async_sender = AsyncSender {
                        unblocker: self.unblocker.clone(),
                        sender: raw_sender,
                    };

                    let async_receiver = AsyncReceiver {
                        shared: receiver_state,
                        condvar: self.condvar.clone(),
                        _ep: PhantomData,
                    };

                    self.condvar.notify_all();

                    Poll::Ready(Ok((async_sender, async_receiver)))
                }
                Some(None) => {
                    // Closed marker: restore it so later polls see it as well.
                    // No waker is stored in this arm.
                    slot.data = Some(None);
                    Poll::Pending
                }
                None => {
                    slot.waker = Some(cx.waker().clone());
                    Poll::Pending
                }
            }
        }
    }

    /// Blocking-side driver that turns callbacks of a WS server into the
    /// asynchronous sender/receiver pairs handed out by an `AsyncAcceptor`.
    ///
    /// `N` bounds the number of tracked connections and `F` is the size in
    /// bytes of the buffer incoming frames are received into.
    #[allow(clippy::type_complexity)]
    pub struct Processor<const N: usize, const F: usize, C, W>
    where
        C: RawCondvar + Send + Sync,
        C::RawMutex: Send + Sync,
        W: SenderFactory + SessionProvider,
        W::Sender: Send,
    {
        /// Accepted connections, at most `N` of them.
        connections:
            heapless::Vec<ConnectionState<Mutex<C::RawMutex, SharedReceiverState>, W::Session>, N>,
        /// Buffer incoming frames are received into before being copied to
        /// the asynchronous receiver.
        frame_data_buf: [u8; F],
        /// Accept slot shared with the `AsyncAcceptor`.
        accept: Arc<Mutex<C::RawMutex, SharedAcceptorState<C, W::Sender>>>,
        /// Condition variable shared with the acceptor and all receivers.
        condvar: Arc<Condvar<C>>,
    }

    impl<const N: usize, const F: usize, C, W> Processor<N, F, C, W>
    where
        C: RawCondvar + Send + Sync,
        C::RawMutex: Send + Sync,
        W: SenderFactory + SessionProvider,
        W::Sender: Send,
    {
        pub fn new<U>(unblocker: U) -> (Self, AsyncAcceptor<U, C, W::Sender>) {
            let this = Self {
                connections: heapless::Vec::new(),
                frame_data_buf: [0_u8; F],
                accept: Arc::new(Mutex::new(SharedAcceptorState {
                    waker: None,
                    data: None,
                })),
                condvar: Arc::new(Condvar::new()),
            };

            let acceptor = AsyncAcceptor {
                unblocker,
                accept: this.accept.clone(),
                condvar: this.condvar.clone(),
            };

            (this, acceptor)
        }

        pub fn process<'a>(&'a mut self, connection: &'a mut W) -> Result<(), W::Error>
        where
            W: Sender + Receiver,
        {
            if connection.is_new() {
                let session = connection.session();

                svc_log!(info, "New WS connection {:?}", session);

                if !self.process_accept(session, connection) {
                    return connection.send(FrameType::Close, &[]);
                }
            } else if connection.is_closed() {
                let session = connection.session();

                if let Some(index) = self
                    .connections
                    .iter()
                    .enumerate()
                    .find_map(|(index, conn)| (conn.session == session).then(|| index))
                {
                    let conn = self.connections.swap_remove(index);

                    Self::process_receive_close(&conn.receiver_state);
                    svc_log!(info, "Closed WS connection {:?}", session);
                }
            } else {
                let session = connection.session();
                let (frame_type, len) = connection.recv(&mut self.frame_data_buf)?;

                svc_log!(
                    info,
                    "Incoming data (frame_type={:?}, frame_len={}) from WS connection {:?}",
                    frame_type,
                    len,
                    session
                );

                if let Some(connection) = self
                    .connections
                    .iter()
                    .find(|connection| connection.session == session)
                {
                    self.process_receive(&connection.receiver_state, frame_type, len)
                }
            }

            Ok(())
        }

        fn process_accept<'a>(&'a mut self, session: W::Session, sender: &'a mut W) -> bool {
            if self.connections.len() < N {
                let receiver_state = Arc::new(Mutex::new(SharedReceiverState {
                    waker: None,
                    data: ReceiverData::None,
                }));

                let state = ConnectionState {
                    session,
                    receiver_state: receiver_state.clone(),
                };

                self.connections
                    .push(state)
                    .unwrap_or_else(|_| unreachable!());

                let sender = sender.create().unwrap();

                let mut accept = self.accept.lock();

                accept.data = Some(Some((receiver_state, sender)));

                if let Some(waker) = mem::replace(&mut accept.waker, None) {
                    waker.wake();
                }

                while accept.data.is_some() {
                    accept = self.condvar.wait(accept);
                }

                true
            } else {
                false
            }
        }

        fn process_receive(
            &self,
            state: &Mutex<C::RawMutex, SharedReceiverState>,
            frame_type: FrameType,
            len: usize,
        ) {
            let mut shared = state.lock();

            shared.data = ReceiverData::Metadata((frame_type, len));

            if let Some(waker) = mem::replace(&mut shared.waker, None) {
                waker.wake();
            }

            loop {
                if let ReceiverData::Data(buf) = &shared.data {
                    unsafe { slice::from_raw_parts_mut(*buf, len) }
                        .copy_from_slice(&self.frame_data_buf[..len]);
                    shared.data = ReceiverData::DataCopied;
                    self.condvar.notify_all();

                    break;
                }

                shared = self.condvar.wait(shared);
            }

            while !matches!(shared.data, ReceiverData::None) {
                shared = self.condvar.wait(shared);
            }
        }

        fn process_accept_close(&mut self) {
            let mut accept = self.accept.lock();

            accept.data = Some(None);

            if let Some(waker) = mem::replace(&mut accept.waker, None) {
                waker.wake();
            }
        }

        fn process_receive_close(state: &Mutex<C::RawMutex, SharedReceiverState>) {
            let mut shared = state.lock();

            shared.data = ReceiverData::Closed;

            if let Some(waker) = mem::replace(&mut shared.waker, None) {
                waker.wake();
            }
        }
    }

    impl<const N: usize, const F: usize, C, W> Drop for Processor<N, F, C, W>
    where
        C: RawCondvar + Send + Sync,
        C::RawMutex: Send + Sync,
        W: SenderFactory + SessionProvider,
        W::Sender: Send,
    {
        /// Publishes the closed marker to the acceptor (through
        /// `process_accept_close`) when the processor is dropped.
        fn drop(&mut self) {
            Self::process_accept_close(self);
        }
    }

    /// Implementations of the `asynch` WS traits for the sender, receiver and
    /// acceptor types of the parent module. Every method forwards to the
    /// inherent method of the same purpose.
    #[cfg(all(feature = "nightly", feature = "experimental"))]
    mod async_traits_impl {
        use core::fmt::Debug;
        use core::future::Future;

        use crate::executor::asynch::Unblocker;
        use crate::utils::mutex::RawCondvar;
        use crate::ws::{callback_server::*, *};

        use super::{AsyncAcceptor, AsyncReceiver, AsyncReceiverFuture, AsyncSender};

        // The async sender reports the errors of the sender it wraps.
        impl<U, S> ErrorType for AsyncSender<U, S>
        where
            S: ErrorType,
        {
            type Error = S::Error;
        }

        // Unblocker-backed sender: forwards to the inherent `send`.
        impl<U, S> asynch::Sender for AsyncSender<U, S>
        where
            U: Unblocker,
            S: Sender + SessionProvider + Send + Clone + 'static,
            S::Error: Send + Sync + 'static,
        {
            type SendFuture<'a>
            = impl Future<Output = Result<(), Self::Error>> + 'a where Self: 'a;

            fn send<'a>(
                &'a mut self,
                frame_type: FrameType,
                frame_data: &'a [u8],
            ) -> Self::SendFuture<'a> {
                async move { AsyncSender::send(self, frame_type, frame_data).await }
            }
        }

        // Sender without an unblocker: forwards to `send_blocking`.
        impl<S> asynch::Sender for AsyncSender<(), S>
        where
            S: Sender + SessionProvider + Send + Clone + 'static,
        {
            type SendFuture<'a>
            = impl Future<Output = Result<(), Self::Error>> + 'a where Self: 'a;

            fn send<'a>(
                &'a mut self,
                frame_type: FrameType,
                frame_data: &'a [u8],
            ) -> Self::SendFuture<'a> {
                async move { AsyncSender::send_blocking(self, frame_type, frame_data).await }
            }
        }

        // The receiver's error type is its `E` parameter.
        impl<C, E> ErrorType for AsyncReceiver<C, E>
        where
            C: RawCondvar,
            E: Debug,
        {
            type Error = E;
        }

        // Forwards to the inherent `recv`, which builds the receive future.
        impl<C, E> asynch::Receiver for AsyncReceiver<C, E>
        where
            C: RawCondvar + Send + Sync,
            C::RawMutex: Send + Sync,
            E: Debug,
        {
            type ReceiveFuture<'a>
            = AsyncReceiverFuture<'a, C, E> where Self: 'a;

            fn recv<'a>(&'a mut self, frame_data_buf: &'a mut [u8]) -> Self::ReceiveFuture<'a> {
                AsyncReceiver::recv(self, frame_data_buf)
            }
        }

        // The acceptor reports the errors of the sender type it hands out.
        impl<U, C, S> ErrorType for AsyncAcceptor<U, C, S>
        where
            C: RawCondvar + Send + Sync,
            C::RawMutex: Send + Sync,
            S: Send + ErrorType,
        {
            type Error = <S as ErrorType>::Error;
        }

        // Unblocker-backed acceptor; the accept future is `&Self`.
        impl<U, C, S> asynch::server::Acceptor for AsyncAcceptor<U, C, S>
        where
            U: Unblocker + Clone + Send,
            C: RawCondvar + Send + Sync,
            C::RawMutex: Send + Sync,
            S: Sender + SessionProvider + Send + Clone + 'static,
            S::Error: Send + Sync + 'static,
        {
            type Sender<'a> = AsyncSender<U, S> where U: 'a, C: 'a, S: 'a;
            type Receiver<'a> = AsyncReceiver<C, S::Error> where U: 'a, C: 'a;

            type AcceptFuture<'a>
            = &'a Self where Self: 'a;

            fn accept(&self) -> Self::AcceptFuture<'_> {
                self
            }
        }

        // Acceptor without an unblocker; the accept future is `&Self`.
        impl<C, S> asynch::server::Acceptor for AsyncAcceptor<(), C, S>
        where
            C: RawCondvar + Send + Sync,
            C::RawMutex: Send + Sync,
            S: Sender + SessionProvider + Send + Clone + 'static,
        {
            type Sender<'a> = AsyncSender<(), S> where C: 'a, S: 'a;
            type Receiver<'a> = AsyncReceiver<C, S::Error> where C: 'a;

            type AcceptFuture<'a>
            = &'a Self where Self: 'a;

            fn accept(&self) -> Self::AcceptFuture<'_> {
                self
            }
        }
    }
}