1use core::marker::PhantomData;
4use nodo::prelude::*;
5
/// A message source driven by a user-supplied callback.
///
/// `T` is the type of value the callback yields and `F` is the callback type.
/// No bounds are placed on the struct itself; they are stated on the `impl`
/// blocks that need them.
pub struct Source<T, F> {
    /// Callable invoked to obtain each new value.
    callback: F,
    /// Zero-sized marker that ties the otherwise unused parameter `T` to the type.
    marker: PhantomData<T>,
}
11
12impl<T, F> Source<T, F> {
13 pub fn new(callback: F) -> Self {
14 Self {
15 callback,
16 marker: PhantomData,
17 }
18 }
19}
20
21impl<T, F> Codelet for Source<T, F>
22where
23 T: Send + Sync + Clone,
24 F: FnMut() -> T + Send,
25{
26 type Status = DefaultStatus;
27 type Config = ();
28 type Rx = ();
29 type Tx = DoubleBufferTx<T>;
30 type Signals = ();
31
32 fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
33 ((), DoubleBufferTx::new(1))
34 }
35
36 fn step(&mut self, _: Context<Self>, _: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
37 tx.push((self.callback)())?;
38 SUCCESS
39 }
40}
41
/// A message source driven by a user-supplied callback that may fail.
///
/// `T` is the type of value the callback yields on success and `F` is the
/// callback type. No bounds are placed on the struct itself; they are stated
/// on the `impl` blocks that need them.
pub struct SourceWithError<T, F> {
    /// Callable invoked to obtain each new value (or an error).
    callback: F,
    /// Zero-sized marker that ties the otherwise unused parameter `T` to the type.
    marker: PhantomData<T>,
}
47
48impl<T, F> SourceWithError<T, F> {
49 pub fn new(callback: F) -> Self {
50 Self {
51 callback,
52 marker: PhantomData,
53 }
54 }
55}
56
57impl<T, F> Codelet for SourceWithError<T, F>
58where
59 T: Send + Sync + Clone,
60 F: FnMut() -> eyre::Result<T> + Send,
61{
62 type Status = DefaultStatus;
63 type Config = ();
64 type Rx = ();
65 type Tx = DoubleBufferTx<T>;
66 type Signals = ();
67
68 fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
69 ((), DoubleBufferTx::new(1))
70 }
71
72 fn step(&mut self, _: Context<Self>, _: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
73 tx.push((self.callback)()?)?;
74 SUCCESS
75 }
76}