postcard_rpc/server/impls/
test_channels.rs

1//! Implementation that uses channels for local testing
2
3use core::{
4    convert::Infallible,
5    future::{pending, Future},
6    sync::atomic::{AtomicU32, Ordering},
7};
8use std::sync::Arc;
9
10use crate::{
11    header::{VarHeader, VarKey, VarKeyKind, VarSeq},
12    host_client::util::Stopper,
13    server::{
14        AsWireRxErrorKind, AsWireTxErrorKind, WireRx, WireRxErrorKind, WireSpawn, WireTx,
15        WireTxErrorKind,
16    },
17    standard_icd::LoggingTopic,
18    Topic,
19};
20use core::fmt::Arguments;
21use thiserror::Error;
22use tokio::{select, sync::mpsc};
23
24//////////////////////////////////////////////////////////////////////////////
25// DISPATCH IMPL
26//////////////////////////////////////////////////////////////////////////////
27
28/// A collection of types and aliases useful for importing the correct types
29pub mod dispatch_impl {
30    pub use crate::host_client::util::Stopper;
31    use crate::{
32        header::VarKeyKind,
33        server::{Dispatch, Server},
34    };
35
36    pub use super::tokio_spawn as spawn_fn;
37
38    /// The settings necessary for creating a new channel server
39    pub struct Settings {
40        /// The frame sender
41        pub tx: WireTxImpl,
42        /// The frame receiver
43        pub rx: WireRxImpl,
44        /// The size of the receive buffer
45        pub buf: usize,
46        /// The sender key size to use
47        pub kkind: VarKeyKind,
48    }
49
50    /// Type alias for `WireTx` impl
51    pub type WireTxImpl = super::ChannelWireTx;
52    /// Type alias for `WireRx` impl
53    pub type WireRxImpl = super::ChannelWireRx;
54    /// Type alias for `WireSpawn` impl
55    pub type WireSpawnImpl = super::ChannelWireSpawn;
56    /// Type alias for the receive buffer
57    pub type WireRxBuf = Box<[u8]>;
58
59    /// Create a new server using the [`Settings`] and [`Dispatch`] implementation
60    pub fn new_server<D>(
61        dispatch: D,
62        settings: Settings,
63    ) -> crate::server::Server<WireTxImpl, WireRxImpl, WireRxBuf, D>
64    where
65        D: Dispatch<Tx = WireTxImpl>,
66    {
67        let buf = vec![0; settings.buf];
68        Server::new(
69            settings.tx,
70            settings.rx,
71            buf.into_boxed_slice(),
72            dispatch,
73            settings.kkind,
74        )
75    }
76
77    /// Create a new server using the [`Settings`] and [`Dispatch`] implementation
78    ///
79    /// Also returns a [`Stopper`] that can be used to halt the server's operation
80    pub fn new_server_stoppable<D>(
81        dispatch: D,
82        mut settings: Settings,
83    ) -> (
84        crate::server::Server<WireTxImpl, WireRxImpl, WireRxBuf, D>,
85        Stopper,
86    )
87    where
88        D: Dispatch<Tx = WireTxImpl>,
89    {
90        let stopper = Stopper::new();
91        settings.tx.set_stopper(stopper.clone());
92        settings.rx.set_stopper(stopper.clone());
93        let buf = vec![0; settings.buf];
94        let me = Server::new(
95            settings.tx,
96            settings.rx,
97            buf.into_boxed_slice(),
98            dispatch,
99            settings.kkind,
100        );
101        (me, stopper)
102    }
103}
104
105//////////////////////////////////////////////////////////////////////////////
106// TX
107//////////////////////////////////////////////////////////////////////////////
108
/// A [`WireTx`] impl using tokio mpsc channels
///
/// Every outgoing frame travels over the channel as one `Vec<u8>`.
#[derive(Clone)]
pub struct ChannelWireTx {
    /// Channel carrying the serialized frames
    tx: mpsc::Sender<Vec<u8>>,
    /// Counter supplying the sequence number of log frames; shared by all clones
    log_ctr: Arc<AtomicU32>,
    /// Optional stopper; once set and stopped, sends fail with `ChannelClosed`
    stopper: Option<Stopper>,
}
116
117impl ChannelWireTx {
118    /// Create a new [`ChannelWireTx`]
119    pub fn new(tx: mpsc::Sender<Vec<u8>>) -> Self {
120        Self {
121            tx,
122            log_ctr: Arc::new(AtomicU32::new(0)),
123            stopper: None,
124        }
125    }
126
127    /// Add a stopper to listen for "close" methods
128    pub fn set_stopper(&mut self, stopper: Stopper) {
129        self.stopper = Some(stopper);
130    }
131
132    async fn inner_send(&self, msg: Vec<u8>) -> Result<(), ChannelWireTxError> {
133        let stop_fut = async {
134            if let Some(s) = self.stopper.as_ref() {
135                s.wait_stopped().await;
136            } else {
137                pending::<()>().await;
138            }
139        };
140        select! {
141            _ = stop_fut => {
142                Err(ChannelWireTxError::ChannelClosed)
143            }
144            res = self.tx.send(msg) => {
145                match res {
146                    Ok(()) => Ok(()),
147                    Err(_) => Err(ChannelWireTxError::ChannelClosed)
148                }
149            }
150        }
151    }
152}
153
154impl WireTx for ChannelWireTx {
155    type Error = ChannelWireTxError;
156
157    async fn send<T: serde::Serialize + ?Sized>(
158        &self,
159        hdr: crate::header::VarHeader,
160        msg: &T,
161    ) -> Result<(), Self::Error> {
162        let mut hdr_ser = hdr.write_to_vec();
163        let bdy_ser = postcard::to_stdvec(msg).unwrap();
164        hdr_ser.extend_from_slice(&bdy_ser);
165        self.inner_send(hdr_ser).await
166    }
167
168    async fn send_raw(&self, buf: &[u8]) -> Result<(), Self::Error> {
169        let buf = buf.to_vec();
170        self.inner_send(buf).await
171    }
172
173    async fn send_log_str(&self, kkind: VarKeyKind, s: &str) -> Result<(), Self::Error> {
174        let ctr = self.log_ctr.fetch_add(1, Ordering::Relaxed);
175        let key = match kkind {
176            VarKeyKind::Key1 => VarKey::Key1(LoggingTopic::TOPIC_KEY1),
177            VarKeyKind::Key2 => VarKey::Key2(LoggingTopic::TOPIC_KEY2),
178            VarKeyKind::Key4 => VarKey::Key4(LoggingTopic::TOPIC_KEY4),
179            VarKeyKind::Key8 => VarKey::Key8(LoggingTopic::TOPIC_KEY),
180        };
181        let wh = VarHeader {
182            key,
183            seq_no: VarSeq::Seq4(ctr),
184        };
185        let msg = s.to_string();
186
187        self.send::<<LoggingTopic as Topic>::Message>(wh, &msg)
188            .await
189    }
190
191    async fn send_log_fmt<'a>(
192        &self,
193        kkind: VarKeyKind,
194        a: Arguments<'a>,
195    ) -> Result<(), Self::Error> {
196        let ctr = self.log_ctr.fetch_add(1, Ordering::Relaxed);
197        let key = match kkind {
198            VarKeyKind::Key1 => VarKey::Key1(LoggingTopic::TOPIC_KEY1),
199            VarKeyKind::Key2 => VarKey::Key2(LoggingTopic::TOPIC_KEY2),
200            VarKeyKind::Key4 => VarKey::Key4(LoggingTopic::TOPIC_KEY4),
201            VarKeyKind::Key8 => VarKey::Key8(LoggingTopic::TOPIC_KEY),
202        };
203        let wh = VarHeader {
204            key,
205            seq_no: VarSeq::Seq4(ctr),
206        };
207        let mut buf = wh.write_to_vec();
208        let msg = format!("{a}");
209        let msg = postcard::to_stdvec(&msg).unwrap();
210        buf.extend_from_slice(&msg);
211        self.inner_send(buf).await
212    }
213}
214
/// A wire tx error
///
/// The error type reported by [`ChannelWireTx`].
#[derive(Debug, Error)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum ChannelWireTxError {
    /// The receiver closed the channel, or the attached stopper fired
    #[error("channel closed")]
    ChannelClosed,
}
223
224impl AsWireTxErrorKind for ChannelWireTxError {
225    fn as_kind(&self) -> WireTxErrorKind {
226        match self {
227            ChannelWireTxError::ChannelClosed => WireTxErrorKind::ConnectionClosed,
228        }
229    }
230}
231
232//////////////////////////////////////////////////////////////////////////////
233// RX
234//////////////////////////////////////////////////////////////////////////////
235
/// A [`WireRx`] impl using tokio mpsc channels
///
/// Every incoming frame arrives over the channel as one `Vec<u8>`.
pub struct ChannelWireRx {
    /// Channel delivering the incoming frames
    rx: mpsc::Receiver<Vec<u8>>,
    /// Optional stopper; once set and stopped, receives fail with `ChannelClosed`
    stopper: Option<Stopper>,
}
241
242impl ChannelWireRx {
243    /// Create a new [`ChannelWireRx`]
244    pub fn new(rx: mpsc::Receiver<Vec<u8>>) -> Self {
245        Self { rx, stopper: None }
246    }
247
248    /// Add a stopper to listen for "close" methods
249    pub fn set_stopper(&mut self, stopper: Stopper) {
250        self.stopper = Some(stopper);
251    }
252}
253
254impl WireRx for ChannelWireRx {
255    type Error = ChannelWireRxError;
256
257    async fn receive<'a>(&mut self, buf: &'a mut [u8]) -> Result<&'a mut [u8], Self::Error> {
258        // todo: some kind of receive_owned?
259        let ChannelWireRx { rx, stopper } = self;
260        let stop_fut = async {
261            if let Some(s) = stopper.as_ref() {
262                s.wait_stopped().await;
263            } else {
264                pending::<()>().await;
265            }
266        };
267
268        select! {
269            _ = stop_fut => {
270                Err(ChannelWireRxError::ChannelClosed)
271            }
272            msg = rx.recv() => {
273                let msg = msg.ok_or(ChannelWireRxError::ChannelClosed)?;
274                let out = buf
275                    .get_mut(..msg.len())
276                    .ok_or(ChannelWireRxError::MessageTooLarge)?;
277                out.copy_from_slice(&msg);
278                Ok(out)
279            }
280        }
281    }
282}
283
/// A wire rx error
///
/// The error type reported by [`ChannelWireRx`].
#[derive(Debug, Error)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum ChannelWireRxError {
    /// The sender closed the channel, or the attached stopper fired
    #[error("channel closed")]
    ChannelClosed,
    /// The sender sent a message longer than the receive buffer
    #[error("message too large")]
    MessageTooLarge,
}
295
296impl AsWireRxErrorKind for ChannelWireRxError {
297    fn as_kind(&self) -> WireRxErrorKind {
298        match self {
299            ChannelWireRxError::ChannelClosed => WireRxErrorKind::ConnectionClosed,
300            ChannelWireRxError::MessageTooLarge => WireRxErrorKind::ReceivedMessageTooLarge,
301        }
302    }
303}
304
305//////////////////////////////////////////////////////////////////////////////
306// SPAWN
307//////////////////////////////////////////////////////////////////////////////
308
/// A wire spawn implementation
///
/// A unit struct: it holds no state of its own.
#[derive(Clone)]
pub struct ChannelWireSpawn;
312
313impl WireSpawn for ChannelWireSpawn {
314    type Error = Infallible;
315
316    type Info = ();
317
318    fn info(&self) -> &Self::Info {
319        &()
320    }
321}
322
323/// Spawn a task using tokio
324pub fn tokio_spawn<Sp, F>(_sp: &Sp, fut: F) -> Result<(), Sp::Error>
325where
326    Sp: WireSpawn<Error = Infallible, Info = ()>,
327    F: Future<Output = ()> + 'static + Send,
328{
329    tokio::task::spawn(fut);
330    Ok(())
331}