nodo_std/
source.rs

1// Copyright 2023 David Weikersdorfer
2
3use core::marker::PhantomData;
4use nodo::prelude::*;
5
6/// A codelet which calls a callback each tick and publishes what it returns
7pub struct Source<T, F> {
8    callback: F,
9    marker: PhantomData<T>,
10}
11
12impl<T, F> Source<T, F> {
13    pub fn new(callback: F) -> Self {
14        Self {
15            callback,
16            marker: PhantomData,
17        }
18    }
19}
20
21impl<T, F> Codelet for Source<T, F>
22where
23    T: Send + Sync + Clone,
24    F: FnMut() -> T + Send,
25{
26    type Status = DefaultStatus;
27    type Config = ();
28    type Rx = ();
29    type Tx = DoubleBufferTx<T>;
30    type Signals = ();
31
32    fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
33        ((), DoubleBufferTx::new(1))
34    }
35
36    fn step(&mut self, _: Context<Self>, _: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
37        tx.push((self.callback)())?;
38        SUCCESS
39    }
40}
41
42/// A codelet which calls a callback each tick and publishes what it returns
43pub struct SourceWithError<T, F> {
44    callback: F,
45    marker: PhantomData<T>,
46}
47
48impl<T, F> SourceWithError<T, F> {
49    pub fn new(callback: F) -> Self {
50        Self {
51            callback,
52            marker: PhantomData,
53        }
54    }
55}
56
57impl<T, F> Codelet for SourceWithError<T, F>
58where
59    T: Send + Sync + Clone,
60    F: FnMut() -> eyre::Result<T> + Send,
61{
62    type Status = DefaultStatus;
63    type Config = ();
64    type Rx = ();
65    type Tx = DoubleBufferTx<T>;
66    type Signals = ();
67
68    fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
69        ((), DoubleBufferTx::new(1))
70    }
71
72    fn step(&mut self, _: Context<Self>, _: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
73        tx.push((self.callback)()?)?;
74        SUCCESS
75    }
76}