1use core::marker::PhantomData;
4use nodo::prelude::*;
5
/// Codelet that passes every received message through a user-supplied callback.
///
/// `T` is the type of the received messages, `S` the type of the transmitted
/// messages, and `F` the callback that turns one into the other.
pub struct Pipe<T, S, F> {
    /// Conversion applied to each received message.
    callback: F,

    /// Ties the otherwise unused `T` and `S` parameters to the struct.
    marker: PhantomData<(T, S)>,
}
11
12#[derive(Config)]
13pub enum PipeConfig {
14 OneToOne,
15 Dynamic,
16}
17
18impl<T, S, F> Pipe<T, S, F> {
19 pub fn new(callback: F) -> Self {
20 Self {
21 callback,
22 marker: PhantomData,
23 }
24 }
25}
26
27impl<T, S, F> Codelet for Pipe<T, S, F>
28where
29 T: Send + Sync,
30 S: Clone + Send + Sync,
31 F: FnMut(T) -> S + Send,
32{
33 type Status = DefaultStatus;
34 type Config = PipeConfig;
35 type Rx = DoubleBufferRx<T>;
36 type Tx = DoubleBufferTx<S>;
37 type Signals = ();
38
39 fn build_bundles(config: &Self::Config) -> (Self::Rx, Self::Tx) {
40 match config {
41 PipeConfig::OneToOne => (
42 DoubleBufferRx::new(OverflowPolicy::Reject(1), RetentionPolicy::EnforceEmpty),
43 DoubleBufferTx::new(1),
44 ),
45 PipeConfig::Dynamic => (
46 DoubleBufferRx::new_auto_size(),
47 DoubleBufferTx::new_auto_size(),
48 ),
49 }
50 }
51
52 fn start(&mut self, _cx: Context<Self>, rx: &mut Self::Rx, _tx: &mut Self::Tx) -> Outcome {
53 while let Some(_) = rx.try_pop() {}
57 SUCCESS
58 }
59
60 fn step(&mut self, ctx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
61 match ctx.config {
62 PipeConfig::OneToOne => {
63 if let Some(msg) = rx.try_pop() {
64 tx.push((self.callback)(msg))?;
65 SUCCESS
66 } else {
67 SKIPPED
68 }
69 }
70 PipeConfig::Dynamic => {
71 let skipped = rx.is_empty();
72 while let Some(msg) = rx.try_pop() {
73 tx.push((self.callback)(msg))?;
74 }
75 if skipped {
76 SKIPPED
77 } else {
78 SUCCESS
79 }
80 }
81 }
82 }
83}