use core::marker::PhantomData;
use nodo::prelude::*;
/// A source built around a user-supplied callback.
///
/// The struct itself places no bounds on `T` or `F`; the `Codelet`
/// implementation for this type calls the callback on every step and
/// pushes the returned value to its transmit channel.
pub struct Source<T, F> {
    /// Callback invoked to produce each value.
    callback: F,
    /// Zero-sized marker that ties the otherwise unused parameter `T` to the struct.
    marker: PhantomData<T>,
}

impl<T, F> Source<T, F> {
    /// Creates a new `Source` that stores `callback` for later invocation.
    pub fn new(callback: F) -> Self {
        let marker = PhantomData;
        Self { callback, marker }
    }
}
/// Codelet that has no receive channels and publishes the value returned by
/// the stored callback on a single `DoubleBufferTx<T>` each time it is stepped.
impl<T, F> Codelet for Source<T, F>
where
    F: FnMut() -> T + Send,
    T: Send + Sync + Clone,
{
    type Status = DefaultStatus;
    type Config = ();
    type Rx = ();
    type Tx = DoubleBufferTx<T>;
    type Signals = ();

    /// Builds the (empty) receive bundle and the transmit channel.
    fn build_bundles(_config: &Self::Config) -> (Self::Rx, Self::Tx) {
        // NOTE(review): the argument `1` is presumably the channel capacity — confirm
        // against the `DoubleBufferTx::new` documentation.
        let tx = DoubleBufferTx::new(1);
        ((), tx)
    }

    /// Invokes the callback once and pushes its result to `tx`.
    ///
    /// A failing push is propagated to the caller via `?`; otherwise the
    /// step reports `SUCCESS`.
    fn step(&mut self, _ctx: Context<Self>, _rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
        let value = (self.callback)();
        tx.push(value)?;
        SUCCESS
    }
}
/// A source built around a user-supplied, fallible callback.
///
/// The struct itself places no bounds on `T` or `F`; the `Codelet`
/// implementation for this type calls the callback on every step,
/// propagates its error if it fails, and otherwise pushes the produced
/// value to its transmit channel.
pub struct SourceWithError<T, F> {
    /// Callback invoked to produce each value (or an error).
    callback: F,
    /// Zero-sized marker that ties the otherwise unused parameter `T` to the struct.
    marker: PhantomData<T>,
}

impl<T, F> SourceWithError<T, F> {
    /// Creates a new `SourceWithError` that stores `callback` for later invocation.
    pub fn new(callback: F) -> Self {
        let marker = PhantomData;
        Self { callback, marker }
    }
}
/// Codelet that has no receive channels and, each time it is stepped, runs
/// the stored fallible callback and publishes the produced value on a single
/// `DoubleBufferTx<T>`.
impl<T, F> Codelet for SourceWithError<T, F>
where
    F: FnMut() -> eyre::Result<T> + Send,
    T: Send + Sync + Clone,
{
    type Status = DefaultStatus;
    type Config = ();
    type Rx = ();
    type Tx = DoubleBufferTx<T>;
    type Signals = ();

    /// Builds the (empty) receive bundle and the transmit channel.
    fn build_bundles(_config: &Self::Config) -> (Self::Rx, Self::Tx) {
        // NOTE(review): the argument `1` is presumably the channel capacity — confirm
        // against the `DoubleBufferTx::new` documentation.
        let tx = DoubleBufferTx::new(1);
        ((), tx)
    }

    /// Invokes the callback once and pushes the value it yields to `tx`.
    ///
    /// If the callback returns an error, that error is propagated and nothing
    /// is pushed. A failing push is propagated as well; otherwise the step
    /// reports `SUCCESS`.
    fn step(&mut self, _ctx: Context<Self>, _rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
        // Resolve the callback result first so an error short-circuits before any push.
        let value = (self.callback)()?;
        tx.push(value)?;
        SUCCESS
    }
}