polars_plan/plans/functions/
mod.rs

1mod count;
2mod dsl;
3#[cfg(feature = "merge_sorted")]
4mod merge_sorted;
5#[cfg(feature = "python")]
6mod python_udf;
7mod rename;
8mod schema;
9
10use std::borrow::Cow;
11use std::fmt::{Debug, Display, Formatter};
12use std::hash::{Hash, Hasher};
13use std::sync::{Arc, Mutex};
14
15pub use dsl::*;
16use polars_core::error::feature_gated;
17use polars_core::prelude::*;
18use polars_utils::pl_str::PlSmallStr;
19#[cfg(feature = "serde")]
20use serde::{Deserialize, Serialize};
21use strum_macros::IntoStaticStr;
22
23#[cfg(feature = "python")]
24use crate::dsl::python_udf::PythonFunction;
25#[cfg(feature = "merge_sorted")]
26use crate::plans::functions::merge_sorted::merge_sorted;
27use crate::plans::ir::ScanSourcesDisplay;
28use crate::prelude::*;
29
30#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
31#[derive(Clone, IntoStaticStr)]
32#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
33pub enum FunctionIR {
34    RowIndex {
35        name: PlSmallStr,
36        offset: Option<IdxSize>,
37        // Might be cached.
38        #[cfg_attr(feature = "ir_serde", serde(skip))]
39        schema: CachedSchema,
40    },
41    #[cfg(feature = "python")]
42    OpaquePython(OpaquePythonUdf),
43
44    FastCount {
45        sources: ScanSources,
46        scan_type: FileScan,
47        alias: Option<PlSmallStr>,
48    },
49
50    Unnest {
51        columns: Arc<[PlSmallStr]>,
52    },
53    Rechunk,
54    // The two DataFrames are temporary concatenated
55    // this indicates until which chunk the data is from the left df
56    // this trick allows us to reuse the `Union` architecture to get map over
57    // two DataFrames
58    #[cfg(feature = "merge_sorted")]
59    MergeSorted {
60        // sorted column that serves as the key
61        column: PlSmallStr,
62    },
63    Rename {
64        existing: Arc<[PlSmallStr]>,
65        new: Arc<[PlSmallStr]>,
66        // A column name gets swapped with an existing column
67        swapping: bool,
68        #[cfg_attr(feature = "ir_serde", serde(skip))]
69        schema: CachedSchema,
70    },
71    Explode {
72        columns: Arc<[PlSmallStr]>,
73        #[cfg_attr(feature = "ir_serde", serde(skip))]
74        schema: CachedSchema,
75    },
76    #[cfg(feature = "pivot")]
77    Unpivot {
78        args: Arc<UnpivotArgsIR>,
79        #[cfg_attr(feature = "ir_serde", serde(skip))]
80        schema: CachedSchema,
81    },
82    #[cfg_attr(feature = "ir_serde", serde(skip))]
83    Opaque {
84        function: Arc<dyn DataFrameUdf>,
85        schema: Option<Arc<dyn UdfSchema>>,
86        ///  allow predicate pushdown optimizations
87        predicate_pd: bool,
88        ///  allow projection pushdown optimizations
89        projection_pd: bool,
90        streamable: bool,
91        // used for formatting
92        fmt_str: PlSmallStr,
93    },
94    /// Streaming engine pipeline
95    #[cfg_attr(feature = "ir_serde", serde(skip))]
96    Pipeline {
97        function: Arc<Mutex<dyn DataFrameUdfMut>>,
98        schema: SchemaRef,
99        original: Option<Arc<IRPlan>>,
100    },
101}
102
103impl Eq for FunctionIR {}
104
105impl PartialEq for FunctionIR {
106    fn eq(&self, other: &Self) -> bool {
107        use FunctionIR::*;
108        match (self, other) {
109            (Rechunk, Rechunk) => true,
110            (
111                FastCount {
112                    sources: srcs_l, ..
113                },
114                FastCount {
115                    sources: srcs_r, ..
116                },
117            ) => srcs_l == srcs_r,
118            (
119                Rename {
120                    existing: existing_l,
121                    new: new_l,
122                    ..
123                },
124                Rename {
125                    existing: existing_r,
126                    new: new_r,
127                    ..
128                },
129            ) => existing_l == existing_r && new_l == new_r,
130            (Explode { columns: l, .. }, Explode { columns: r, .. }) => l == r,
131            #[cfg(feature = "pivot")]
132            (Unpivot { args: l, .. }, Unpivot { args: r, .. }) => l == r,
133            (RowIndex { name: l, .. }, RowIndex { name: r, .. }) => l == r,
134            #[cfg(feature = "merge_sorted")]
135            (MergeSorted { column: l }, MergeSorted { column: r }) => l == r,
136            _ => false,
137        }
138    }
139}
140
141impl Hash for FunctionIR {
142    fn hash<H: Hasher>(&self, state: &mut H) {
143        std::mem::discriminant(self).hash(state);
144        match self {
145            #[cfg(feature = "python")]
146            FunctionIR::OpaquePython { .. } => {},
147            FunctionIR::Opaque { fmt_str, .. } => fmt_str.hash(state),
148            FunctionIR::FastCount {
149                sources,
150                scan_type,
151                alias,
152            } => {
153                sources.hash(state);
154                scan_type.hash(state);
155                alias.hash(state);
156            },
157            FunctionIR::Pipeline { .. } => {},
158            FunctionIR::Unnest { columns } => columns.hash(state),
159            FunctionIR::Rechunk => {},
160            #[cfg(feature = "merge_sorted")]
161            FunctionIR::MergeSorted { column } => column.hash(state),
162            FunctionIR::Rename {
163                existing,
164                new,
165                swapping: _,
166                ..
167            } => {
168                existing.hash(state);
169                new.hash(state);
170            },
171            FunctionIR::Explode { columns, schema: _ } => columns.hash(state),
172            #[cfg(feature = "pivot")]
173            FunctionIR::Unpivot { args, schema: _ } => args.hash(state),
174            FunctionIR::RowIndex {
175                name,
176                schema: _,
177                offset,
178            } => {
179                name.hash(state);
180                offset.hash(state);
181            },
182        }
183    }
184}
185
186impl FunctionIR {
187    /// Whether this function can run on batches of data at a time.
188    pub fn is_streamable(&self) -> bool {
189        use FunctionIR::*;
190        match self {
191            Rechunk | Pipeline { .. } => false,
192            #[cfg(feature = "merge_sorted")]
193            MergeSorted { .. } => false,
194            FastCount { .. } | Unnest { .. } | Rename { .. } | Explode { .. } => true,
195            #[cfg(feature = "pivot")]
196            Unpivot { .. } => true,
197            Opaque { streamable, .. } => *streamable,
198            #[cfg(feature = "python")]
199            OpaquePython(OpaquePythonUdf { streamable, .. }) => *streamable,
200            RowIndex { .. } => false,
201        }
202    }
203
204    /// Whether this function will increase the number of rows
205    pub fn expands_rows(&self) -> bool {
206        use FunctionIR::*;
207        match self {
208            #[cfg(feature = "merge_sorted")]
209            MergeSorted { .. } => true,
210            #[cfg(feature = "pivot")]
211            Unpivot { .. } => true,
212            Explode { .. } => true,
213            _ => false,
214        }
215    }
216
217    pub(crate) fn allow_predicate_pd(&self) -> bool {
218        use FunctionIR::*;
219        match self {
220            Opaque { predicate_pd, .. } => *predicate_pd,
221            #[cfg(feature = "python")]
222            OpaquePython(OpaquePythonUdf { predicate_pd, .. }) => *predicate_pd,
223            #[cfg(feature = "pivot")]
224            Unpivot { .. } => true,
225            Rechunk | Unnest { .. } | Rename { .. } | Explode { .. } => true,
226            #[cfg(feature = "merge_sorted")]
227            MergeSorted { .. } => true,
228            RowIndex { .. } | FastCount { .. } => false,
229            Pipeline { .. } => unimplemented!(),
230        }
231    }
232
233    pub(crate) fn allow_projection_pd(&self) -> bool {
234        use FunctionIR::*;
235        match self {
236            Opaque { projection_pd, .. } => *projection_pd,
237            #[cfg(feature = "python")]
238            OpaquePython(OpaquePythonUdf { projection_pd, .. }) => *projection_pd,
239            Rechunk | FastCount { .. } | Unnest { .. } | Rename { .. } | Explode { .. } => true,
240            #[cfg(feature = "pivot")]
241            Unpivot { .. } => true,
242            #[cfg(feature = "merge_sorted")]
243            MergeSorted { .. } => true,
244            RowIndex { .. } => true,
245            Pipeline { .. } => unimplemented!(),
246        }
247    }
248
249    pub(crate) fn additional_projection_pd_columns(&self) -> Cow<[PlSmallStr]> {
250        use FunctionIR::*;
251        match self {
252            Unnest { columns } => Cow::Borrowed(columns.as_ref()),
253            Explode { columns, .. } => Cow::Borrowed(columns.as_ref()),
254            #[cfg(feature = "merge_sorted")]
255            MergeSorted { column, .. } => Cow::Owned(vec![column.clone()]),
256            _ => Cow::Borrowed(&[]),
257        }
258    }
259
260    pub fn evaluate(&self, mut df: DataFrame) -> PolarsResult<DataFrame> {
261        use FunctionIR::*;
262        match self {
263            Opaque { function, .. } => function.call_udf(df),
264            #[cfg(feature = "python")]
265            OpaquePython(OpaquePythonUdf {
266                function,
267                validate_output,
268                schema,
269                ..
270            }) => python_udf::call_python_udf(function, df, *validate_output, schema.clone()),
271            FastCount {
272                sources,
273                scan_type,
274                alias,
275            } => count::count_rows(sources, scan_type, alias.clone()),
276            Rechunk => {
277                df.as_single_chunk_par();
278                Ok(df)
279            },
280            #[cfg(feature = "merge_sorted")]
281            MergeSorted { column } => merge_sorted(&df, column.as_ref()),
282            Unnest { columns: _columns } => {
283                feature_gated!("dtype-struct", df.unnest(_columns.iter().cloned()))
284            },
285            Pipeline { function, .. } => {
286                // we use a global string cache here as streaming chunks all have different rev maps
287                #[cfg(feature = "dtype-categorical")]
288                {
289                    let _sc = StringCacheHolder::hold();
290                    function.lock().unwrap().call_udf(df)
291                }
292
293                #[cfg(not(feature = "dtype-categorical"))]
294                {
295                    function.lock().unwrap().call_udf(df)
296                }
297            },
298            Rename { existing, new, .. } => rename::rename_impl(df, existing, new),
299            Explode { columns, .. } => df.explode(columns.iter().cloned()),
300            #[cfg(feature = "pivot")]
301            Unpivot { args, .. } => {
302                use polars_ops::pivot::UnpivotDF;
303                let args = (**args).clone();
304                df.unpivot2(args)
305            },
306            RowIndex { name, offset, .. } => df.with_row_index(name.clone(), *offset),
307        }
308    }
309
310    pub fn to_streaming_lp(&self) -> Option<IRPlanRef> {
311        let Self::Pipeline {
312            function: _,
313            schema: _,
314            original,
315        } = self
316        else {
317            return None;
318        };
319
320        Some(original.as_ref()?.as_ref().as_ref())
321    }
322}
323
324impl Debug for FunctionIR {
325    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
326        write!(f, "{self}")
327    }
328}
329
330impl Display for FunctionIR {
331    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
332        use FunctionIR::*;
333        match self {
334            Opaque { fmt_str, .. } => write!(f, "{fmt_str}"),
335            Unnest { columns } => {
336                write!(f, "UNNEST by:")?;
337                let columns = columns.as_ref();
338                fmt_column_delimited(f, columns, "[", "]")
339            },
340            Pipeline { original, .. } => {
341                if let Some(original) = original {
342                    let ir_display = original.as_ref().display();
343
344                    writeln!(f, "--- STREAMING")?;
345                    write!(f, "{ir_display}")?;
346                    let indent = 2;
347                    write!(f, "{:indent$}--- END STREAMING", "")
348                } else {
349                    write!(f, "STREAMING")
350                }
351            },
352            FastCount {
353                sources,
354                scan_type,
355                alias,
356            } => {
357                let scan_type: &str = scan_type.into();
358                let default_column_name = PlSmallStr::from_static(crate::constants::LEN);
359                let alias = alias.as_ref().unwrap_or(&default_column_name);
360
361                write!(
362                    f,
363                    "FAST COUNT ({scan_type}) {} as \"{alias}\"",
364                    ScanSourcesDisplay(sources)
365                )
366            },
367            v => {
368                let s: &str = v.into();
369                write!(f, "{s}")
370            },
371        }
372    }
373}