nodo_std/
pipe.rs

1// Copyright 2023 David Weikersdorfer
2
3use core::marker::PhantomData;
4use nodo::prelude::*;
5
6// FIXME replace nodo::Pipe with this one
/// Codelet state wrapping a user-supplied conversion callback.
///
/// `T` is the type of incoming messages and `S` the type of outgoing messages. The `Codelet`
/// implementation for this type feeds each received `T` to the callback and publishes the
/// returned `S`.
pub struct Pipe<T, S, F> {
    /// Callback invoked with each received message; its return value is what gets sent on.
    callback: F,
    /// Zero-sized marker that ties the otherwise unused `T` and `S` parameters to the struct.
    marker: PhantomData<(T, S)>,
}
11
12#[derive(Config)]
13pub enum PipeConfig {
14    OneToOne,
15    Dynamic,
16}
17
18impl<T, S, F> Pipe<T, S, F> {
19    pub fn new(callback: F) -> Self {
20        Self {
21            callback,
22            marker: PhantomData,
23        }
24    }
25}
26
27impl<T, S, F> Codelet for Pipe<T, S, F>
28where
29    T: Send + Sync,
30    S: Clone + Send + Sync,
31    F: FnMut(T) -> S + Send,
32{
33    type Status = DefaultStatus;
34    type Config = PipeConfig;
35    type Rx = DoubleBufferRx<T>;
36    type Tx = DoubleBufferTx<S>;
37    type Signals = ();
38
39    fn build_bundles(config: &Self::Config) -> (Self::Rx, Self::Tx) {
40        match config {
41            PipeConfig::OneToOne => (
42                DoubleBufferRx::new(OverflowPolicy::Reject(1), RetentionPolicy::EnforceEmpty),
43                DoubleBufferTx::new(1),
44            ),
45            PipeConfig::Dynamic => (
46                DoubleBufferRx::new_auto_size(),
47                DoubleBufferTx::new_auto_size(),
48            ),
49        }
50    }
51
52    fn start(&mut self, _cx: Context<Self>, rx: &mut Self::Rx, _tx: &mut Self::Tx) -> Outcome {
53        // FIXME There is a general problem in nodo as messages can be received during start, and
54        //       if they are not handled EnforceEmpty will panic.
55        //       In the wild only this codelet seem to have this problem so we fix it here for now.
56        while let Some(_) = rx.try_pop() {}
57        SUCCESS
58    }
59
60    fn step(&mut self, ctx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
61        match ctx.config {
62            PipeConfig::OneToOne => {
63                if let Some(msg) = rx.try_pop() {
64                    tx.push((self.callback)(msg))?;
65                    SUCCESS
66                } else {
67                    SKIPPED
68                }
69            }
70            PipeConfig::Dynamic => {
71                let skipped = rx.is_empty();
72                while let Some(msg) = rx.try_pop() {
73                    tx.push((self.callback)(msg))?;
74                }
75                if skipped {
76                    SKIPPED
77                } else {
78                    SUCCESS
79                }
80            }
81        }
82    }
83}