polars_mem_engine/executors/
executor.rs1use super::*;
2
3pub trait Executor: Send + Sync {
11 fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult<DataFrame>;
12
13 fn is_cache_prefiller(&self) -> bool {
14 false
15 }
16}
17
18type SinkFn =
19 Box<dyn FnMut(DataFrame, &mut ExecutionState) -> PolarsResult<Option<DataFrame>> + Send + Sync>;
20pub struct SinkExecutor {
21 pub name: String,
22 pub input: Box<dyn Executor>,
23 pub f: SinkFn,
24}
25
26impl Executor for SinkExecutor {
27 fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
28 state.should_stop()?;
29 #[cfg(debug_assertions)]
30 {
31 if state.verbose() {
32 eprintln!("run sink_{}", self.name)
33 }
34 }
35 let df = self.input.execute(state)?;
36
37 let profile_name = if state.has_node_timer() {
38 Cow::Owned(format!(".sink_{}()", &self.name))
39 } else {
40 Cow::Borrowed("")
41 };
42
43 state.clone().record(
44 || (self.f)(df, state).map(|df| df.unwrap_or_else(DataFrame::empty)),
45 profile_name,
46 )
47 }
48}
49
50pub struct Dummy {}
51impl Executor for Dummy {
52 fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
53 panic!("should not get here");
54 }
55}
56
57impl Default for Box<dyn Executor> {
58 fn default() -> Self {
59 Box::new(Dummy {})
60 }
61}