mod count;
mod dsl;
#[cfg(feature = "merge_sorted")]
mod merge_sorted;
#[cfg(feature = "python")]
mod python_udf;
mod rename;
mod schema;

use std::borrow::Cow;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

pub use dsl::*;
use polars_core::error::feature_gated;
use polars_core::prelude::*;
use polars_utils::pl_str::PlSmallStr;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use strum_macros::IntoStaticStr;

#[cfg(feature = "python")]
use crate::dsl::python_udf::PythonFunction;
#[cfg(feature = "merge_sorted")]
use crate::plans::functions::merge_sorted::merge_sorted;
use crate::plans::ir::ScanSourcesDisplay;
use crate::prelude::*;

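/// A function applied to a `DataFrame` as a node in the logical plan IR.
///
/// Most variants describe a concrete, self-contained operation (row index,
/// fast count, unnest, rename, explode, ...). `Opaque`, `OpaquePython` and
/// `Pipeline` wrap user-defined or pre-built functions; how the optimizer may
/// treat them is driven by the flags stored alongside them.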
#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
#[derive(Clone, IntoStaticStr)]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
pub enum FunctionIR {
    RowIndex {
        name: PlSmallStr,
        offset: Option<IdxSize>,
        #[cfg_attr(feature = "ir_serde", serde(skip))]
        schema: CachedSchema,
    },
    #[cfg(feature = "python")]
    OpaquePython(OpaquePythonUdf),

    FastCount {
        sources: ScanSources,
        scan_type: FileScan,
        alias: Option<PlSmallStr>,
    },

    Unnest {
        columns: Arc<[PlSmallStr]>,
    },
    Rechunk,
    #[cfg(feature = "merge_sorted")]
    MergeSorted {
        column: PlSmallStr,
    },
    Rename {
        existing: Arc<[PlSmallStr]>,
        new: Arc<[PlSmallStr]>,
        swapping: bool,
        #[cfg_attr(feature = "ir_serde", serde(skip))]
        schema: CachedSchema,
    },
    Explode {
        columns: Arc<[PlSmallStr]>,
        #[cfg_attr(feature = "ir_serde", serde(skip))]
        schema: CachedSchema,
    },
    #[cfg(feature = "pivot")]
    Unpivot {
        args: Arc<UnpivotArgsIR>,
        #[cfg_attr(feature = "ir_serde", serde(skip))]
        schema: CachedSchema,
    },
    #[cfg_attr(feature = "ir_serde", serde(skip))]
    Opaque {
        function: Arc<dyn DataFrameUdf>,
        schema: Option<Arc<dyn UdfSchema>>,
        predicate_pd: bool,
        projection_pd: bool,
        streamable: bool,
        fmt_str: PlSmallStr,
    },
    #[cfg_attr(feature = "ir_serde", serde(skip))]
    Pipeline {
        function: Arc<Mutex<dyn DataFrameUdfMut>>,
        schema: SchemaRef,
        original: Option<Arc<IRPlan>>,
    },
}

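// `PartialEq`/`Hash` are implemented manually: boxed functions and cached
// schemas cannot be compared, so each variant compares/hashes only its
// remaining data, and any unmatched combination compares unequal.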
impl Eq for FunctionIR {}

impl PartialEq for FunctionIR {
    fn eq(&self, other: &Self) -> bool {
        use FunctionIR::*;
        match (self, other) {
            (Rechunk, Rechunk) => true,
            (
                FastCount {
                    sources: srcs_l, ..
                },
                FastCount {
                    sources: srcs_r, ..
                },
            ) => srcs_l == srcs_r,
            (
                Rename {
                    existing: existing_l,
                    new: new_l,
                    ..
                },
                Rename {
                    existing: existing_r,
                    new: new_r,
                    ..
                },
            ) => existing_l == existing_r && new_l == new_r,
            (Explode { columns: l, .. }, Explode { columns: r, .. }) => l == r,
            #[cfg(feature = "pivot")]
            (Unpivot { args: l, .. }, Unpivot { args: r, .. }) => l == r,
            (RowIndex { name: l, .. }, RowIndex { name: r, .. }) => l == r,
            #[cfg(feature = "merge_sorted")]
            (MergeSorted { column: l }, MergeSorted { column: r }) => l == r,
            _ => false,
        }
    }
}

impl Hash for FunctionIR {
    fn hash<H: Hasher>(&self, state: &mut H) {
        std::mem::discriminant(self).hash(state);
        match self {
            #[cfg(feature = "python")]
            FunctionIR::OpaquePython { .. } => {},
            FunctionIR::Opaque { fmt_str, .. } => fmt_str.hash(state),
            FunctionIR::FastCount {
                sources,
                scan_type,
                alias,
            } => {
                sources.hash(state);
                scan_type.hash(state);
                alias.hash(state);
            },
            FunctionIR::Pipeline { .. } => {},
            FunctionIR::Unnest { columns } => columns.hash(state),
            FunctionIR::Rechunk => {},
            #[cfg(feature = "merge_sorted")]
            FunctionIR::MergeSorted { column } => column.hash(state),
            FunctionIR::Rename {
                existing,
                new,
                swapping: _,
                ..
            } => {
                existing.hash(state);
                new.hash(state);
            },
            FunctionIR::Explode { columns, schema: _ } => columns.hash(state),
            #[cfg(feature = "pivot")]
            FunctionIR::Unpivot { args, schema: _ } => args.hash(state),
            FunctionIR::RowIndex {
                name,
                schema: _,
                offset,
            } => {
                name.hash(state);
                offset.hash(state);
            },
        }
    }
}

impl FunctionIR {
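    /// Whether this function can run in a streaming fashion; opaque UDFs
    /// report the `streamable` flag they were constructed with.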
    pub fn is_streamable(&self) -> bool {
        use FunctionIR::*;
        match self {
            Rechunk | Pipeline { .. } => false,
            #[cfg(feature = "merge_sorted")]
            MergeSorted { .. } => false,
            FastCount { .. } | Unnest { .. } | Rename { .. } | Explode { .. } => true,
            #[cfg(feature = "pivot")]
            Unpivot { .. } => true,
            Opaque { streamable, .. } => *streamable,
            #[cfg(feature = "python")]
            OpaquePython(OpaquePythonUdf { streamable, .. }) => *streamable,
            RowIndex { .. } => false,
        }
    }

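    /// Whether this function may output more rows than it receives
    /// (e.g. `Explode` and `Unpivot`).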
    pub fn expands_rows(&self) -> bool {
        use FunctionIR::*;
        match self {
            #[cfg(feature = "merge_sorted")]
            MergeSorted { .. } => true,
            #[cfg(feature = "pivot")]
            Unpivot { .. } => true,
            Explode { .. } => true,
            _ => false,
        }
    }

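    /// Whether the predicate pushdown optimization is allowed to move
    /// predicates past this node.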
    pub(crate) fn allow_predicate_pd(&self) -> bool {
        use FunctionIR::*;
        match self {
            Opaque { predicate_pd, .. } => *predicate_pd,
            #[cfg(feature = "python")]
            OpaquePython(OpaquePythonUdf { predicate_pd, .. }) => *predicate_pd,
            #[cfg(feature = "pivot")]
            Unpivot { .. } => true,
            Rechunk | Unnest { .. } | Rename { .. } | Explode { .. } => true,
            #[cfg(feature = "merge_sorted")]
            MergeSorted { .. } => true,
            RowIndex { .. } | FastCount { .. } => false,
            Pipeline { .. } => unimplemented!(),
        }
    }

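    /// Whether the projection pushdown optimization is allowed to move
    /// projections past this node.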
    pub(crate) fn allow_projection_pd(&self) -> bool {
        use FunctionIR::*;
        match self {
            Opaque { projection_pd, .. } => *projection_pd,
            #[cfg(feature = "python")]
            OpaquePython(OpaquePythonUdf { projection_pd, .. }) => *projection_pd,
            Rechunk | FastCount { .. } | Unnest { .. } | Rename { .. } | Explode { .. } => true,
            #[cfg(feature = "pivot")]
            Unpivot { .. } => true,
            #[cfg(feature = "merge_sorted")]
            MergeSorted { .. } => true,
            RowIndex { .. } => true,
            Pipeline { .. } => unimplemented!(),
        }
    }

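    /// Columns that projection pushdown must keep because this function
    /// reads them (e.g. the columns being unnested or exploded).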
    pub(crate) fn additional_projection_pd_columns(&self) -> Cow<[PlSmallStr]> {
        use FunctionIR::*;
        match self {
            Unnest { columns } => Cow::Borrowed(columns.as_ref()),
            Explode { columns, .. } => Cow::Borrowed(columns.as_ref()),
            #[cfg(feature = "merge_sorted")]
            MergeSorted { column, .. } => Cow::Owned(vec![column.clone()]),
            _ => Cow::Borrowed(&[]),
        }
    }

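    /// Apply this function to a materialized `DataFrame`.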
    pub fn evaluate(&self, mut df: DataFrame) -> PolarsResult<DataFrame> {
        use FunctionIR::*;
        match self {
            Opaque { function, .. } => function.call_udf(df),
            #[cfg(feature = "python")]
            OpaquePython(OpaquePythonUdf {
                function,
                validate_output,
                schema,
                ..
            }) => python_udf::call_python_udf(function, df, *validate_output, schema.clone()),
            FastCount {
                sources,
                scan_type,
                alias,
            } => count::count_rows(sources, scan_type, alias.clone()),
            Rechunk => {
                df.as_single_chunk_par();
                Ok(df)
            },
            #[cfg(feature = "merge_sorted")]
            MergeSorted { column } => merge_sorted(&df, column.as_ref()),
            Unnest { columns: _columns } => {
                feature_gated!("dtype-struct", df.unnest(_columns.iter().cloned()))
            },
            Pipeline { function, .. } => {
                // Keep the global string cache alive while the pipeline runs.
                #[cfg(feature = "dtype-categorical")]
                {
                    let _sc = StringCacheHolder::hold();
                    function.lock().unwrap().call_udf(df)
                }

                #[cfg(not(feature = "dtype-categorical"))]
                {
                    function.lock().unwrap().call_udf(df)
                }
            },
            Rename { existing, new, .. } => rename::rename_impl(df, existing, new),
            Explode { columns, .. } => df.explode(columns.iter().cloned()),
            #[cfg(feature = "pivot")]
            Unpivot { args, .. } => {
                use polars_ops::pivot::UnpivotDF;
                let args = (**args).clone();
                df.unpivot2(args)
            },
            RowIndex { name, offset, .. } => df.with_row_index(name.clone(), *offset),
        }
    }

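    /// For a `Pipeline` node, return the IR plan it was built from (if the
    /// original plan was kept); `None` for all other variants.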
    pub fn to_streaming_lp(&self) -> Option<IRPlanRef> {
        let Self::Pipeline {
            function: _,
            schema: _,
            original,
        } = self
        else {
            return None;
        };

        Some(original.as_ref()?.as_ref().as_ref())
    }
}

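// `Debug` delegates to the `Display` implementation below.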
impl Debug for FunctionIR {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{self}")
    }
}

impl Display for FunctionIR {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        use FunctionIR::*;
        match self {
            Opaque { fmt_str, .. } => write!(f, "{fmt_str}"),
            Unnest { columns } => {
                write!(f, "UNNEST by:")?;
                let columns = columns.as_ref();
                fmt_column_delimited(f, columns, "[", "]")
            },
            Pipeline { original, .. } => {
                if let Some(original) = original {
                    let ir_display = original.as_ref().display();

                    writeln!(f, "--- STREAMING")?;
                    write!(f, "{ir_display}")?;
                    let indent = 2;
                    write!(f, "{:indent$}--- END STREAMING", "")
                } else {
                    write!(f, "STREAMING")
                }
            },
            FastCount {
                sources,
                scan_type,
                alias,
            } => {
                let scan_type: &str = scan_type.into();
                let default_column_name = PlSmallStr::from_static(crate::constants::LEN);
                let alias = alias.as_ref().unwrap_or(&default_column_name);

                write!(
                    f,
                    "FAST COUNT ({scan_type}) {} as \"{alias}\"",
                    ScanSourcesDisplay(sources)
                )
            },
            v => {
                let s: &str = v.into();
                write!(f, "{s}")
            },
        }
    }
}