#[cfg(feature = "std")]
mod parallel;
mod sequential;
#[cfg(feature = "std")]
pub use parallel::ParallelRunner;
pub use sequential::SequentialRunner;
use serde::Serialize;
use crate::pipeline::{DatasetEvent, DatasetRef, StepInfo, Steps};
use crate::error::PondError;
use crate::hooks::Hooks;
#[cfg(feature = "std")]
pub(crate) fn dispatch_dataset_event(
item: &dyn StepInfo,
ds: &DatasetRef<'_>,
event: DatasetEvent,
names: &std::collections::HashMap<usize, std::string::String>,
hooks: &impl Hooks,
) {
let ds = DatasetRef { name: names.get(&ds.id).map(|s: &std::string::String| s.as_str()), ..*ds };
dispatch_dataset_event_raw(item, &ds, event, hooks);
}
pub(crate) fn dispatch_dataset_event_raw(
item: &dyn StepInfo,
ds: &DatasetRef<'_>,
event: DatasetEvent,
hooks: &impl Hooks,
) {
match event {
DatasetEvent::BeforeLoad => hooks.for_each_hook(&mut |h| h.before_dataset_loaded(item, ds)),
DatasetEvent::AfterLoad => hooks.for_each_hook(&mut |h| h.after_dataset_loaded(item, ds)),
DatasetEvent::BeforeSave => hooks.for_each_hook(&mut |h| h.before_dataset_saved(item, ds)),
DatasetEvent::AfterSave => hooks.for_each_hook(&mut |h| h.after_dataset_saved(item, ds)),
}
}
pub trait Runner {
fn name(&self) -> &'static str;
fn run<E>(
&self,
pipe: &impl Steps<E>,
catalog: &impl Serialize,
params: &impl Serialize,
hooks: &impl Hooks,
) -> Result<(), E>
where
E: From<PondError> + Send + Sync + core::fmt::Display + core::fmt::Debug + 'static;
}
pub trait Runners {
fn first_name(&self) -> &'static str;
fn run_by_name<E>(
&self,
name: &str,
pipe: &impl Steps<E>,
catalog: &impl Serialize,
params: &impl Serialize,
hooks: &impl Hooks,
) -> Option<Result<(), E>>
where
E: From<PondError> + Send + Sync + core::fmt::Display + core::fmt::Debug + 'static;
fn for_each_name(&self, f: &mut dyn FnMut(&str));
}
macro_rules! impl_runners {
($($R:ident $idx:tt),+) => {
impl<$($R: Runner),+> Runners for ($($R,)+) {
fn first_name(&self) -> &'static str {
self.0.name()
}
fn run_by_name<E>(
&self,
name: &str,
pipe: &impl Steps<E>,
catalog: &impl Serialize,
params: &impl Serialize,
hooks: &impl Hooks,
) -> Option<Result<(), E>>
where
E: From<PondError> + Send + Sync + core::fmt::Display + core::fmt::Debug + 'static,
{
$(
if self.$idx.name() == name {
return Some(self.$idx.run(pipe, catalog, params, hooks));
}
)+
None
}
fn for_each_name(&self, f: &mut dyn FnMut(&str)) {
$(f(self.$idx.name());)+
}
}
};
}
impl_runners!(R0 0);
impl_runners!(R0 0, R1 1);
impl_runners!(R0 0, R1 1, R2 2);
impl_runners!(R0 0, R1 1, R2 2, R3 3);
impl_runners!(R0 0, R1 1, R2 2, R3 3, R4 4);