use super::*;
/// An executable node that produces a [`DataFrame`] when run against an
/// [`ExecutionState`].
///
/// Every implementor must be `Send + Sync` (required by the supertrait bounds).
pub trait Executor: Send + Sync {
/// Runs this executor with the given mutable execution state and returns the
/// resulting frame, or an error.
fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult<DataFrame>;
/// Reports whether this executor is a "cache prefiller".
///
/// The default implementation returns `false`; implementors may override it.
/// NOTE(review): what a cache prefiller does is defined outside this view --
/// confirm against the implementors that override this method.
fn is_cache_prefiller(&self) -> bool {
false
}
}
/// Boxed callback used by [`SinkExecutor`].
///
/// It receives an owned [`DataFrame`] together with the mutable execution
/// state and yields either an optional frame or an error. The closure must be
/// `Send + Sync`, and since it is `FnMut` it may mutate its captured state
/// across calls.
type SinkFn =
Box<dyn FnMut(DataFrame, &mut ExecutionState) -> PolarsResult<Option<DataFrame>> + Send + Sync>;
/// Executor that runs an input executor and passes its output to a sink
/// callback.
pub struct SinkExecutor {
/// Name of the sink; interpolated into the verbose log line and the
/// profiling label emitted by `execute`.
pub name: PlSmallStr,
/// Upstream executor whose resulting frame is handed to `f`.
pub input: Box<dyn Executor>,
/// Sink callback invoked with the upstream frame and the execution state.
pub f: SinkFn,
}
impl Executor for SinkExecutor {
    /// Runs the upstream executor and hands its output to the sink callback.
    ///
    /// # Errors
    ///
    /// Propagates the error if `state.should_stop()` fails, if the input
    /// executor fails, or if the sink callback itself fails.
    ///
    /// # Returns
    ///
    /// The frame produced by the sink callback, or an empty [`DataFrame`] when
    /// the callback succeeds without yielding one (`None`).
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        state.should_stop()?;

        // Debug builds only: announce the sink on stderr when the state is verbose.
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run sink_{}", self.name)
            }
        }

        let upstream = self.input.execute(state)?;

        // The label is only formatted when a node timer is present; otherwise
        // an empty borrowed string avoids the allocation.
        let label = match state.has_node_timer() {
            true => Cow::Owned(format!(".sink_{}()", &self.name)),
            false => Cow::Borrowed(""),
        };

        // `record` is called on a clone because the closure below needs the
        // mutable borrow of `state` for the sink callback.
        let recorder = state.clone();
        recorder.record(
            || match (self.f)(upstream, state) {
                Ok(Some(out)) => Ok(out),
                Ok(None) => Ok(DataFrame::empty()),
                Err(err) => Err(err),
            },
            label,
        )
    }
}
/// Placeholder executor with no state. Its `execute` implementation panics,
/// and it is the value produced by `Default` for `Box<dyn Executor>`.
pub struct Dummy {}
impl Executor for Dummy {
    /// Never produces a frame; this placeholder is not meant to be executed.
    ///
    /// # Panics
    ///
    /// Always panics with the message `"should not get here"`.
    fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        panic!("should not get here")
    }
}
impl Default for Box<dyn Executor> {
    /// Builds a boxed [`Dummy`] executor as the default value.
    ///
    /// The resulting executor panics if it is ever executed, so it is only
    /// suitable as a stand-in that gets replaced before use.
    fn default() -> Self {
        let placeholder = Dummy {};
        Box::new(placeholder)
    }
}