nodo_std/
sink.rs

1// Copyright 2023 David Weikersdorfer
2
3use core::marker::PhantomData;
4use nodo::prelude::*;
5
6/// A codelet which calls a callback for every received message
7pub struct Sink<T, F> {
8    callback: F,
9    marker: PhantomData<T>,
10}
11
12impl<T, F> Sink<T, F> {
13    pub fn new(callback: F) -> Self {
14        Self {
15            callback,
16            marker: PhantomData,
17        }
18    }
19}
20
21impl<T, F> Codelet for Sink<T, F>
22where
23    T: Send + Sync,
24    F: FnMut(T) -> Outcome + Send,
25{
26    type Status = DefaultStatus;
27    type Config = ();
28    type Rx = DoubleBufferRx<T>;
29    type Tx = ();
30
31    fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
32        (DoubleBufferRx::new_auto_size(), ())
33    }
34
35    fn step(&mut self, _: &Context<Self>, rx: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
36        if rx.is_empty() {
37            SKIPPED
38        } else {
39            while let Some(msg) = rx.try_pop() {
40                (self.callback)(msg)?;
41            }
42            SUCCESS
43        }
44    }
45}