postcard_rpc/server/impls/
test_channels.rs1use core::{
4 convert::Infallible,
5 future::{pending, Future},
6 sync::atomic::{AtomicU32, Ordering},
7};
8use std::sync::Arc;
9
10use crate::{
11 header::{VarHeader, VarKey, VarKeyKind, VarSeq},
12 host_client::util::Stopper,
13 server::{
14 AsWireRxErrorKind, AsWireTxErrorKind, WireRx, WireRxErrorKind, WireSpawn, WireTx,
15 WireTxErrorKind,
16 },
17 standard_icd::LoggingTopic,
18 Topic,
19};
20use core::fmt::Arguments;
21use thiserror::Error;
22use tokio::{select, sync::mpsc};
23
24pub mod dispatch_impl {
30 pub use crate::host_client::util::Stopper;
31 use crate::{
32 header::VarKeyKind,
33 server::{Dispatch, Server},
34 };
35
36 pub use super::tokio_spawn as spawn_fn;
37
38 pub struct Settings {
40 pub tx: WireTxImpl,
42 pub rx: WireRxImpl,
44 pub buf: usize,
46 pub kkind: VarKeyKind,
48 }
49
50 pub type WireTxImpl = super::ChannelWireTx;
52 pub type WireRxImpl = super::ChannelWireRx;
54 pub type WireSpawnImpl = super::ChannelWireSpawn;
56 pub type WireRxBuf = Box<[u8]>;
58
59 pub fn new_server<D>(
61 dispatch: D,
62 settings: Settings,
63 ) -> crate::server::Server<WireTxImpl, WireRxImpl, WireRxBuf, D>
64 where
65 D: Dispatch<Tx = WireTxImpl>,
66 {
67 let buf = vec![0; settings.buf];
68 Server::new(
69 settings.tx,
70 settings.rx,
71 buf.into_boxed_slice(),
72 dispatch,
73 settings.kkind,
74 )
75 }
76
77 pub fn new_server_stoppable<D>(
81 dispatch: D,
82 mut settings: Settings,
83 ) -> (
84 crate::server::Server<WireTxImpl, WireRxImpl, WireRxBuf, D>,
85 Stopper,
86 )
87 where
88 D: Dispatch<Tx = WireTxImpl>,
89 {
90 let stopper = Stopper::new();
91 settings.tx.set_stopper(stopper.clone());
92 settings.rx.set_stopper(stopper.clone());
93 let buf = vec![0; settings.buf];
94 let me = Server::new(
95 settings.tx,
96 settings.rx,
97 buf.into_boxed_slice(),
98 dispatch,
99 settings.kkind,
100 );
101 (me, stopper)
102 }
103}
104
105#[derive(Clone)]
111pub struct ChannelWireTx {
112 tx: mpsc::Sender<Vec<u8>>,
113 log_ctr: Arc<AtomicU32>,
114 stopper: Option<Stopper>,
115}
116
117impl ChannelWireTx {
118 pub fn new(tx: mpsc::Sender<Vec<u8>>) -> Self {
120 Self {
121 tx,
122 log_ctr: Arc::new(AtomicU32::new(0)),
123 stopper: None,
124 }
125 }
126
127 pub fn set_stopper(&mut self, stopper: Stopper) {
129 self.stopper = Some(stopper);
130 }
131
132 async fn inner_send(&self, msg: Vec<u8>) -> Result<(), ChannelWireTxError> {
133 let stop_fut = async {
134 if let Some(s) = self.stopper.as_ref() {
135 s.wait_stopped().await;
136 } else {
137 pending::<()>().await;
138 }
139 };
140 select! {
141 _ = stop_fut => {
142 Err(ChannelWireTxError::ChannelClosed)
143 }
144 res = self.tx.send(msg) => {
145 match res {
146 Ok(()) => Ok(()),
147 Err(_) => Err(ChannelWireTxError::ChannelClosed)
148 }
149 }
150 }
151 }
152}
153
154impl WireTx for ChannelWireTx {
155 type Error = ChannelWireTxError;
156
157 async fn send<T: serde::Serialize + ?Sized>(
158 &self,
159 hdr: crate::header::VarHeader,
160 msg: &T,
161 ) -> Result<(), Self::Error> {
162 let mut hdr_ser = hdr.write_to_vec();
163 let bdy_ser = postcard::to_stdvec(msg).unwrap();
164 hdr_ser.extend_from_slice(&bdy_ser);
165 self.inner_send(hdr_ser).await
166 }
167
168 async fn send_raw(&self, buf: &[u8]) -> Result<(), Self::Error> {
169 let buf = buf.to_vec();
170 self.inner_send(buf).await
171 }
172
173 async fn send_log_str(&self, kkind: VarKeyKind, s: &str) -> Result<(), Self::Error> {
174 let ctr = self.log_ctr.fetch_add(1, Ordering::Relaxed);
175 let key = match kkind {
176 VarKeyKind::Key1 => VarKey::Key1(LoggingTopic::TOPIC_KEY1),
177 VarKeyKind::Key2 => VarKey::Key2(LoggingTopic::TOPIC_KEY2),
178 VarKeyKind::Key4 => VarKey::Key4(LoggingTopic::TOPIC_KEY4),
179 VarKeyKind::Key8 => VarKey::Key8(LoggingTopic::TOPIC_KEY),
180 };
181 let wh = VarHeader {
182 key,
183 seq_no: VarSeq::Seq4(ctr),
184 };
185 let msg = s.to_string();
186
187 self.send::<<LoggingTopic as Topic>::Message>(wh, &msg)
188 .await
189 }
190
191 async fn send_log_fmt<'a>(
192 &self,
193 kkind: VarKeyKind,
194 a: Arguments<'a>,
195 ) -> Result<(), Self::Error> {
196 let ctr = self.log_ctr.fetch_add(1, Ordering::Relaxed);
197 let key = match kkind {
198 VarKeyKind::Key1 => VarKey::Key1(LoggingTopic::TOPIC_KEY1),
199 VarKeyKind::Key2 => VarKey::Key2(LoggingTopic::TOPIC_KEY2),
200 VarKeyKind::Key4 => VarKey::Key4(LoggingTopic::TOPIC_KEY4),
201 VarKeyKind::Key8 => VarKey::Key8(LoggingTopic::TOPIC_KEY),
202 };
203 let wh = VarHeader {
204 key,
205 seq_no: VarSeq::Seq4(ctr),
206 };
207 let mut buf = wh.write_to_vec();
208 let msg = format!("{a}");
209 let msg = postcard::to_stdvec(&msg).unwrap();
210 buf.extend_from_slice(&msg);
211 self.inner_send(buf).await
212 }
213}
214
215#[derive(Debug, Error)]
217#[cfg_attr(feature = "defmt", derive(defmt::Format))]
218pub enum ChannelWireTxError {
219 #[error("channel closed")]
221 ChannelClosed,
222}
223
224impl AsWireTxErrorKind for ChannelWireTxError {
225 fn as_kind(&self) -> WireTxErrorKind {
226 match self {
227 ChannelWireTxError::ChannelClosed => WireTxErrorKind::ConnectionClosed,
228 }
229 }
230}
231
232pub struct ChannelWireRx {
238 rx: mpsc::Receiver<Vec<u8>>,
239 stopper: Option<Stopper>,
240}
241
242impl ChannelWireRx {
243 pub fn new(rx: mpsc::Receiver<Vec<u8>>) -> Self {
245 Self { rx, stopper: None }
246 }
247
248 pub fn set_stopper(&mut self, stopper: Stopper) {
250 self.stopper = Some(stopper);
251 }
252}
253
254impl WireRx for ChannelWireRx {
255 type Error = ChannelWireRxError;
256
257 async fn receive<'a>(&mut self, buf: &'a mut [u8]) -> Result<&'a mut [u8], Self::Error> {
258 let ChannelWireRx { rx, stopper } = self;
260 let stop_fut = async {
261 if let Some(s) = stopper.as_ref() {
262 s.wait_stopped().await;
263 } else {
264 pending::<()>().await;
265 }
266 };
267
268 select! {
269 _ = stop_fut => {
270 Err(ChannelWireRxError::ChannelClosed)
271 }
272 msg = rx.recv() => {
273 let msg = msg.ok_or(ChannelWireRxError::ChannelClosed)?;
274 let out = buf
275 .get_mut(..msg.len())
276 .ok_or(ChannelWireRxError::MessageTooLarge)?;
277 out.copy_from_slice(&msg);
278 Ok(out)
279 }
280 }
281 }
282}
283
284#[derive(Debug, Error)]
286#[cfg_attr(feature = "defmt", derive(defmt::Format))]
287pub enum ChannelWireRxError {
288 #[error("channel closed")]
290 ChannelClosed,
291 #[error("message too large")]
293 MessageTooLarge,
294}
295
296impl AsWireRxErrorKind for ChannelWireRxError {
297 fn as_kind(&self) -> WireRxErrorKind {
298 match self {
299 ChannelWireRxError::ChannelClosed => WireRxErrorKind::ConnectionClosed,
300 ChannelWireRxError::MessageTooLarge => WireRxErrorKind::ReceivedMessageTooLarge,
301 }
302 }
303}
304
305#[derive(Clone)]
311pub struct ChannelWireSpawn;
312
313impl WireSpawn for ChannelWireSpawn {
314 type Error = Infallible;
315
316 type Info = ();
317
318 fn info(&self) -> &Self::Info {
319 &()
320 }
321}
322
323pub fn tokio_spawn<Sp, F>(_sp: &Sp, fut: F) -> Result<(), Sp::Error>
325where
326 Sp: WireSpawn<Error = Infallible, Info = ()>,
327 F: Future<Output = ()> + 'static + Send,
328{
329 tokio::task::spawn(fut);
330 Ok(())
331}