use crate::error::PondError;
use super::into_result::IntoNodeResult;
use super::stable::{StableFn, StableTuple};
use super::traits::{DatasetEvent, DatasetRef, NodeInput, NodeOutput, StepInfo, RunnableStep};
pub trait CompatibleOutput<O: StableTuple> {}
impl<O: StableTuple> CompatibleOutput<O> for O {}
impl<O: StableTuple, E> CompatibleOutput<O> for Result<O, E> {}
pub struct Node<F, Input: NodeInput, Output: NodeOutput>
where
F: StableFn<Input::Args>,
F::Output: CompatibleOutput<Output::Output>,
{
pub name: &'static str,
pub func: F,
pub input: Input,
pub output: Output,
}
impl<F, Input, Output> StepInfo for Node<F, Input, Output>
where
Input: NodeInput + Send + Sync,
Output: NodeOutput + Send + Sync,
F: StableFn<Input::Args> + Send + Sync,
F::Output: CompatibleOutput<Output::Output>,
{
fn name(&self) -> &'static str {
self.name
}
fn is_leaf(&self) -> bool {
true
}
fn type_string(&self) -> &'static str {
core::any::type_name::<F>()
}
fn for_each_child<'a>(&'a self, _f: &mut dyn FnMut(&'a dyn StepInfo)) {}
fn for_each_input<'s>(&'s self, f: &mut dyn FnMut(&DatasetRef<'s>)) {
self.input.for_each_input(f);
}
fn for_each_output<'s>(&'s self, f: &mut dyn FnMut(&DatasetRef<'s>)) {
self.output.for_each_output(f);
}
}
impl<F, Input, Output, E, R> RunnableStep<E> for Node<F, Input, Output>
where
Input: NodeInput + Send + Sync,
Output: NodeOutput + Send + Sync,
F: StableFn<Input::Args, Output = R> + Send + Sync,
R: IntoNodeResult<Output::Output, E> + CompatibleOutput<Output::Output>,
E: From<PondError>,
{
fn call(&self, on_event: &mut dyn FnMut(&DatasetRef<'_>, DatasetEvent)) -> Result<(), E> {
let args = self.input.load_data(on_event).map_err(E::from)?;
let result = StableFn::call(&self.func, args);
let output = result.into_node_result()?;
self.output.save_data(output, on_event).map_err(E::from)?;
Ok(())
}
fn for_each_child_step<'a>(&'a self, _f: &mut dyn FnMut(&'a dyn RunnableStep<E>)) {}
fn as_pipeline_info(&self) -> &dyn StepInfo { self }
}