1use core::marker::PhantomData;
4use nodo::prelude::*;
5
/// Terminal stage that forwards values of type `T` to a user-supplied callback.
///
/// `T` never appears in a stored field; it is carried purely at the type level
/// through [`PhantomData`], so the only runtime state is the callback itself.
pub struct Sink<T, F> {
    /// Invoked with every value handed to the sink.
    callback: F,
    /// Zero-sized marker that ties the otherwise unused parameter `T` to the struct.
    marker: PhantomData<T>,
}

impl<T, F> Sink<T, F> {
    /// Creates a sink that will hand its values to `callback`.
    ///
    /// No bounds are placed on `T` or `F` here; they are imposed by the impls
    /// that actually invoke the callback.
    pub fn new(callback: F) -> Self {
        let marker = PhantomData;
        Sink { callback, marker }
    }
}
20
21impl<T, F> Codelet for Sink<T, F>
22where
23 T: Send + Sync,
24 F: FnMut(T) -> Outcome + Send,
25{
26 type Status = DefaultStatus;
27 type Config = ();
28 type Rx = DoubleBufferRx<T>;
29 type Tx = ();
30
31 fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
32 (DoubleBufferRx::new_auto_size(), ())
33 }
34
35 fn step(&mut self, _: &Context<Self>, rx: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
36 if rx.is_empty() {
37 SKIPPED
38 } else {
39 while let Some(msg) = rx.try_pop() {
40 (self.callback)(msg)?;
41 }
42 SUCCESS
43 }
44 }
45}