polars_mem_engine/executors/
executor.rs

1use super::*;
2
3// Executor are the executors of the physical plan and produce DataFrames. They
4// combine physical expressions, which produce Series.
5
6/// Executors will evaluate physical expressions and collect them in a DataFrame.
7///
8/// Executors have other executors as input. By having a tree of executors we can execute the
9/// physical plan until the last executor is evaluated.
10pub trait Executor: Send + Sync {
11    fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult<DataFrame>;
12
13    fn is_cache_prefiller(&self) -> bool {
14        false
15    }
16}
17
18type SinkFn =
19    Box<dyn FnMut(DataFrame, &mut ExecutionState) -> PolarsResult<Option<DataFrame>> + Send + Sync>;
20pub struct SinkExecutor {
21    pub name: String,
22    pub input: Box<dyn Executor>,
23    pub f: SinkFn,
24}
25
26impl Executor for SinkExecutor {
27    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
28        state.should_stop()?;
29        #[cfg(debug_assertions)]
30        {
31            if state.verbose() {
32                eprintln!("run sink_{}", self.name)
33            }
34        }
35        let df = self.input.execute(state)?;
36
37        let profile_name = if state.has_node_timer() {
38            Cow::Owned(format!(".sink_{}()", &self.name))
39        } else {
40            Cow::Borrowed("")
41        };
42
43        state.clone().record(
44            || (self.f)(df, state).map(|df| df.unwrap_or_else(DataFrame::empty)),
45            profile_name,
46        )
47    }
48}
49
50pub struct Dummy {}
51impl Executor for Dummy {
52    fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
53        panic!("should not get here");
54    }
55}
56
57impl Default for Box<dyn Executor> {
58    fn default() -> Self {
59        Box::new(Dummy {})
60    }
61}