polars_python/lazyframe/general.rs

use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::PathBuf;

use either::Either;
use polars::io::{HiveOptions, RowIndex};
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, IR};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags, SinkTarget};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

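// Normalize a single Python source object (path, open file, or in-memory
// buffer) into `ScanSources`, keeping the first path (if any) so cloud
// options can later be resolved from its URL.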
fn pyobject_to_first_path_and_scan_sources(
    obj: PyObject,
) -> PyResult<(Option<PathBuf>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => {
            (Some(path.clone()), ScanSources::Paths([path].into()))
        },
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

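// Bridge a Rust post-optimization hook to a Python callback: the IR and
// expression arenas are moved into a `NodeTraverser`, handed to the callback
// so it can rewrite the optimized plan, and then swapped back in.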
fn post_opt_callback(
    lambda: &PyObject,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::with_gil(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

        // Get a copy of the arenas.
        let arenas = nt.get_arenas();

        // Pass the node visitor which allows the python callback to replace parts of the query plan.
        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

        // Unpack the arenas.
        // At this point the `nt` is useless.

        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
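    // Build a `LazyFrame` that scans newline-delimited JSON. Cloud options are
    // only parsed when a first path is available; in-memory files and buffers
    // skip them.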
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
    ))]
    fn new_from_ndjson(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );

            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }

            r = r.with_cloud_options(Some(cloud_options));
        };

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

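    // Build a `LazyFrame` that scans CSV. `separator` and `eol_char` arrive as
    // Python strings and are reduced to their first byte here, erroring when
    // empty; `with_schema_modify` lets a Python callable rename the columns
    // before the schema is finalized.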
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
    ))]
    fn new_from_csv(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<PyObject>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );
            r = r.with_cloud_options(Some(cloud_options));
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::with_gil(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "The length of the new names list should be equal to the original column length",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

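    // Build a `LazyFrame` that scans Parquet through `UnifiedScanArgs`:
    // `n_rows` becomes a positive pre-slice and `allow_missing_columns`
    // selects between inserting nulls and raising for absent columns.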
    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        source, sources, n_rows, cache, parallel, rechunk, row_index, low_memory, cloud_options,
        credential_provider, use_statistics, hive_partitioning, schema, hive_schema,
        try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns,
        cast_options,
    ))]
    fn new_from_parquet(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        n_rows: Option<usize>,
        cache: bool,
        parallel: Wrap<ParallelStrategy>,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        low_memory: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        use_statistics: bool,
        hive_partitioning: Option<bool>,
        schema: Option<Wrap<Schema>>,
        hive_schema: Option<Wrap<Schema>>,
        try_parse_hive_dates: bool,
        retries: usize,
        glob: bool,
        include_file_paths: Option<String>,
        allow_missing_columns: bool,
        cast_options: Wrap<CastColumnsPolicy>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        use polars_utils::slice_enum::Slice;

        use crate::utils::to_py_err;

        let parallel = parallel.0;

        let options = ParquetOptions {
            schema: schema.map(|x| Arc::new(x.0)),
            parallel,
            low_memory,
            use_statistics,
        };

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let cloud_options = if let Some(first_path) = first_path {
            #[cfg(feature = "cloud")]
            {
                let first_path_url = first_path.to_string_lossy();
                let cloud_options =
                    parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;

                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(PlCredentialProvider::from_python_builder),
                        ),
                )
            }

            #[cfg(not(feature = "cloud"))]
            {
                None
            }
        } else {
            None
        };

        let hive_schema = hive_schema.map(|s| Arc::new(s.0));

        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let hive_options = HiveOptions {
            enabled: hive_partitioning,
            hive_start_idx: 0,
            schema: hive_schema,
            try_parse_dates: try_parse_hive_dates,
        };

        let unified_scan_args = UnifiedScanArgs {
            schema: None,
            cloud_options,
            hive_options,
            rechunk,
            cache,
            glob,
            projection: None,
            row_index,
            pre_slice: n_rows.map(|len| Slice::Positive { offset: 0, len }),
            cast_columns_policy: cast_options.0,
            missing_columns_policy: if allow_missing_columns {
                MissingColumnsPolicy::Insert
            } else {
                MissingColumnsPolicy::Raise
            },
            include_file_paths: include_file_paths.map(|x| x.into()),
        };

        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
            .map_err(to_py_err)?
            .build()
            .into();

        Ok(lf.into())
    }

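    // Build a `LazyFrame` that scans Arrow IPC files, mirroring the Parquet
    // scan's hive-partitioning and cloud-option handling.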
    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (
        source, sources, n_rows, cache, rechunk, row_index, cloud_options, credential_provider,
        hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl,
        include_file_paths
    ))]
    fn new_from_ipc(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        n_rows: Option<usize>,
        cache: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        hive_partitioning: Option<bool>,
        hive_schema: Option<Wrap<Schema>>,
        try_parse_hive_dates: bool,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let hive_options = HiveOptions {
            enabled: hive_partitioning,
            hive_start_idx: 0,
            schema: hive_schema.map(|x| Arc::new(x.0)),
            try_parse_dates: try_parse_hive_dates,
        };

        let mut args = ScanArgsIpc {
            n_rows,
            cache,
            rechunk,
            row_index,
            cloud_options: None,
            hive_options,
            include_file_paths: include_file_paths.map(|x| x.into()),
        };

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            args.cloud_options = Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_builder),
                    ),
            );
        }

        let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

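    // Register the Python dataset provider vtable exactly once (`get_or_init`
    // makes repeated calls harmless) and build a scan over the given dataset
    // object.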
    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: PyObject) -> PyResult<Self> {
        use crate::dataset::dataset_provider_funcs;

        polars_plan::dsl::DATASET_PROVIDER_VTABLE.get_or_init(|| PythonDatasetProviderVTable {
            reader_name: dataset_provider_funcs::reader_name,
            schema: dataset_provider_funcs::schema,
            to_dataset_scan: dataset_provider_funcs::to_dataset_scan,
        });

        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

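    // The three `scan_from_python_function_*` constructors differ only in how
    // the schema is supplied: a pyarrow schema, a list of (name, dtype) pairs,
    // or a Python callable that is resolved lazily.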
    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: PyObject,
        pyarrow: bool,
        validate_schema: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: PyObject,
        pyarrow: bool,
        validate_schema: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: PyObject,
        scan_fn: PyObject,
        validate_schema: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
        )
        .into())
    }

    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.to_dot_streaming_phys(optimized))
    }

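    // Toggle individual optimizer passes. Flags guarded by the "streaming",
    // "new_streaming", and "cse" features are silently ignored when those
    // features are compiled out.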
    fn optimization_toggle(
        &self,
        type_coercion: bool,
        type_check: bool,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        simplify_expression: bool,
        slice_pushdown: bool,
        comm_subplan_elim: bool,
        comm_subexpr_elim: bool,
        cluster_with_columns: bool,
        collapse_joins: bool,
        streaming: bool,
        _eager: bool,
        _check_order: bool,
        #[allow(unused_variables)] new_streaming: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        let mut ldf = ldf
            .with_type_coercion(type_coercion)
            .with_type_check(type_check)
            .with_predicate_pushdown(predicate_pushdown)
            .with_simplify_expr(simplify_expression)
            .with_slice_pushdown(slice_pushdown)
            .with_cluster_with_columns(cluster_with_columns)
            .with_collapse_joins(collapse_joins)
            .with_check_order(_check_order)
            ._with_eager(_eager)
            .with_projection_pushdown(projection_pushdown);

        #[cfg(feature = "streaming")]
        {
            ldf = ldf.with_streaming(streaming);
        }

        #[cfg(feature = "new_streaming")]
        {
            ldf = ldf.with_new_streaming(new_streaming);
        }

        #[cfg(feature = "cse")]
        {
            ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);
            ldf = ldf.with_comm_subexpr_elim(comm_subexpr_elim);
        }

        ldf.into()
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_optimizations(optflags.inner).into()
    }

    #[pyo3(signature = (lambda_post_opt=None))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<PyObject>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

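    // Collect the query on the chosen engine. When a post-optimization
    // callback is given (e.g. the GPU engine hook) it runs on the optimized
    // plan before execution.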
    #[pyo3(signature = (engine, lambda_post_opt=None))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<PyObject>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

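    // Collect asynchronously on the rayon thread pool; the result (or the
    // error) is delivered to the Python callback once the query finishes.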
    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: PyObject,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.clone();

            polars_core::POOL.spawn(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::with_gil(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

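    // The `sink_*` methods stream the query result to storage instead of
    // materializing it. Each resolves cloud options from the sink's base path
    // and supports both single-file and partitioned targets.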
    #[cfg(all(feature = "streaming", feature = "parquet"))]
    #[pyo3(signature = (
        target, compression, compression_level, statistics, row_group_size, data_page_size,
        cloud_options, credential_provider, retries, sink_options, metadata, field_overwrites,
    ))]
    fn sink_parquet(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
        metadata: Wrap<Option<KeyValueMetadata>>,
        field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
    ) -> PyResult<PyLazyFrame> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            key_value_metadata: metadata.0,
            field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
        };

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options = parse_cloud_options(
                    base_path.to_str().unwrap(),
                    cloud_options.unwrap_or_default(),
                )?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_parquet(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned(
                    Arc::new(partition.base_path),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                ),
            }
            .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(all(feature = "streaming", feature = "ipc"))]
    #[pyo3(signature = (
        target, compression, compat_level, cloud_options, credential_provider, retries,
        sink_options
    ))]
    fn sink_ipc(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = IpcWriterOptions {
            compression: compression.0,
            compat_level: compat_level.0,
            ..Default::default()
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options = parse_cloud_options(
                    base_path.to_str().unwrap(),
                    cloud_options.unwrap_or_default(),
                )?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_ipc(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned(
                    Arc::new(partition.base_path),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(all(feature = "streaming", feature = "csv"))]
    #[pyo3(signature = (
        target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
        quote_style, cloud_options, credential_provider, retries, sink_options
    ))]
    fn sink_csv(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value.unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format,
            time_format,
            datetime_format,
            float_scientific,
            float_precision,
            separator,
            quote_char,
            null: null_value,
            line_terminator,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            include_header,
            batch_size,
            serialize_options,
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options = parse_cloud_options(
                    base_path.to_str().unwrap(),
                    cloud_options.unwrap_or_default(),
                )?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_csv(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_csv_partitioned(
                    Arc::new(partition.base_path),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(all(feature = "streaming", feature = "json"))]
    #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
    fn sink_json(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = JsonWriterOptions {};

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options = parse_cloud_options(
                    base_path.to_str().unwrap(),
                    cloud_options.unwrap_or_default(),
                )?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(path) => {
                    ldf.sink_json(path, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_json_partitioned(
                    Arc::new(partition.base_path),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    fn fetch(&self, py: Python<'_>, n_rows: usize) -> PyResult<PyDataFrame> {
        let ldf = self.ldf.clone();
        py.enter_polars_df(|| ldf.fetch(n_rows))
    }

    fn filter(&mut self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.filter(predicate.inner).into()
    }

    fn remove(&mut self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&mut self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

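    // Temporal group-bys: `rolling` and `group_by_dynamic` parse their window
    // sizes from Polars duration strings (e.g. "1h", "3d12h") at call time.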
    fn rolling(
        &mut self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &mut self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts.into_iter().map(|ldf| ldf.ldf).collect::<Vec<_>>();
        self.ldf.clone().with_context(contexts).into()
    }

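    // As-of join: matches each left key to the nearest right key instead of
    // requiring equality; `coalesce` decides whether the key columns are
    // merged or kept separate.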
    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.clone();
        let other = other.ldf;
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| t.0.into_static()),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            }))
            .suffix(suffix)
            .finish()
            .into())
    }

    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.clone();
        let other = other.ldf;
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.clone();
        let other = other.ldf;

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

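    // Conform this LazyFrame to a target schema. Each per-column policy
    // argument accepts either a single policy applied to every column or a
    // dict mapping column names to policies.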
1292    fn match_to_schema<'py>(
1293        &self,
1294        schema: Wrap<Schema>,
1295        missing_columns: &Bound<'py, PyAny>,
1296        missing_struct_fields: &Bound<'py, PyAny>,
1297        extra_columns: Wrap<ExtraColumnsPolicy>,
1298        extra_struct_fields: &Bound<'py, PyAny>,
1299        integer_cast: &Bound<'py, PyAny>,
1300        float_cast: &Bound<'py, PyAny>,
1301    ) -> PyResult<Self> {
1302        fn parse_missing_columns<'py>(
1303            schema: &Schema,
1304            missing_columns: &Bound<'py, PyAny>,
1305        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
1306            let mut out = Vec::with_capacity(schema.len());
1307            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
1308                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1309            } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
1310                out.extend(std::iter::repeat_n(
1311                    MissingColumnsPolicyOrExpr::Raise,
1312                    schema.len(),
1313                ));
1314                for (key, value) in dict.iter() {
1315                    let key = key.extract::<String>()?;
1316                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
1317                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1318                }
1319            } else {
1320                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
1321            }
1322            Ok(out)
1323        }
1324        fn parse_missing_struct_fields<'py>(
1325            schema: &Schema,
1326            missing_struct_fields: &Bound<'py, PyAny>,
1327        ) -> PyResult<Vec<MissingColumnsPolicy>> {
1328            let mut out = Vec::with_capacity(schema.len());
1329            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
1330                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1331            } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
1332                out.extend(std::iter::repeat_n(
1333                    MissingColumnsPolicy::Raise,
1334                    schema.len(),
1335                ));
1336                for (key, value) in dict.iter() {
1337                    let key = key.extract::<String>()?;
1338                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
1339                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1340                }
1341            } else {
1342                return Err(PyTypeError::new_err(
1343                    "Invalid value for `missing_struct_fields`",
1344                ));
1345            }
1346            Ok(out)
1347        }
1348        fn parse_extra_struct_fields<'py>(
1349            schema: &Schema,
1350            extra_struct_fields: &Bound<'py, PyAny>,
1351        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
1352            let mut out = Vec::with_capacity(schema.len());
1353            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
1354                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1355            } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
1356                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
1357                for (key, value) in dict.iter() {
1358                    let key = key.extract::<String>()?;
1359                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
1360                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1361                }
1362            } else {
1363                return Err(PyTypeError::new_err(
1364                    "Invalid value for `extra_struct_fields`",
1365                ));
1366            }
1367            Ok(out)
1368        }
1369        fn parse_cast<'py>(
1370            schema: &Schema,
1371            cast: &Bound<'py, PyAny>,
1372        ) -> PyResult<Vec<UpcastOrForbid>> {
1373            let mut out = Vec::with_capacity(schema.len());
1374            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
1375                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1376            } else if let Ok(dict) = cast.downcast::<PyDict>() {
1377                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
1378                for (key, value) in dict.iter() {
1379                    let key = key.extract::<String>()?;
1380                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
1381                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1382                }
1383            } else {
1384                return Err(PyTypeError::new_err(
1385                    "Invalid value for `integer_cast` / `float_cast`",
1386                ));
1387            }
1388            Ok(out)
1389        }
1390
1391        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
1392        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
1393        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
1394        let integer_cast = parse_cast(&schema.0, integer_cast)?;
1395        let float_cast = parse_cast(&schema.0, float_cast)?;
1396
1397        let per_column = (0..schema.0.len())
1398            .map(|i| MatchToSchemaPerColumn {
1399                missing_columns: missing_columns[i].clone(),
1400                missing_struct_fields: missing_struct_fields[i],
1401                extra_struct_fields: extra_struct_fields[i],
1402                integer_cast: integer_cast[i],
1403                float_cast: float_cast[i],
1404            })
1405            .collect();
1406
1407        let ldf = self.ldf.clone();
1408        Ok(ldf
1409            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
1410            .into())
1411    }
1412
1413    fn rename(&mut self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
1414        let ldf = self.ldf.clone();
1415        ldf.rename(existing, new, strict).into()
1416    }
1417
1418    fn reverse(&self) -> Self {
1419        let ldf = self.ldf.clone();
1420        ldf.reverse().into()
1421    }
1422
1423    #[pyo3(signature = (n, fill_value=None))]
1424    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
1425        let lf = self.ldf.clone();
1426        let out = match fill_value {
1427            Some(v) => lf.shift_and_fill(n.inner, v.inner),
1428            None => lf.shift(n.inner),
1429        };
1430        out.into()
1431    }
1432
1433    fn fill_nan(&self, fill_value: PyExpr) -> Self {
1434        let ldf = self.ldf.clone();
1435        ldf.fill_nan(fill_value.inner).into()
1436    }
1437
1438    fn min(&self) -> Self {
1439        let ldf = self.ldf.clone();
1440        let out = ldf.min();
1441        out.into()
1442    }
1443
1444    fn max(&self) -> Self {
1445        let ldf = self.ldf.clone();
1446        let out = ldf.max();
1447        out.into()
1448    }
1449
1450    fn sum(&self) -> Self {
1451        let ldf = self.ldf.clone();
1452        let out = ldf.sum();
1453        out.into()
1454    }
1455
1456    fn mean(&self) -> Self {
1457        let ldf = self.ldf.clone();
1458        let out = ldf.mean();
1459        out.into()
1460    }
1461
1462    fn std(&self, ddof: u8) -> Self {
1463        let ldf = self.ldf.clone();
1464        let out = ldf.std(ddof);
1465        out.into()
1466    }
1467
1468    fn var(&self, ddof: u8) -> Self {
1469        let ldf = self.ldf.clone();
1470        let out = ldf.var(ddof);
1471        out.into()
1472    }
1473
1474    fn median(&self) -> Self {
1475        let ldf = self.ldf.clone();
1476        let out = ldf.median();
1477        out.into()
1478    }
1479
1480    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
1481        let ldf = self.ldf.clone();
1482        let out = ldf.quantile(quantile.inner, interpolation.0);
1483        out.into()
1484    }
1485
1486    fn explode(&self, column: Vec<PyExpr>) -> Self {
1487        let ldf = self.ldf.clone();
1488        let column = column.to_exprs();
1489        ldf.explode(column).into()
1490    }
1491
1492    fn null_count(&self) -> Self {
1493        let ldf = self.ldf.clone();
1494        ldf.null_count().into()
1495    }
1496
    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PyExpr>>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

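    // The next two filters drop rows containing NaNs (float columns) and
    // nulls respectively, optionally restricted to a `subset` of columns.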
    #[pyo3(signature = (subset=None))]
    fn drop_nans(&self, subset: Option<Vec<PyExpr>>) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        ldf.drop_nans(subset).into()
    }

    #[pyo3(signature = (subset=None))]
    fn drop_nulls(&self, subset: Option<Vec<PyExpr>>) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        ldf.drop_nulls(subset).into()
    }

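    /// Slice `len` rows starting at `offset`; `len=None` maps to
    /// `IdxSize::MAX`, i.e. everything from `offset` onward. As elsewhere in
    /// Polars, a negative `offset` counts from the end of the frame.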
    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.clone();
        ldf.tail(n).into()
    }

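    /// Wide-to-long reshape: the `index` columns are repeated as row
    /// identifiers while the `on` columns are stacked into variable/value
    /// columns (renamed via `variable_name`/`value_name` when given).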
    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: Vec<PyExpr>,
        index: Vec<PyExpr>,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.into_iter().map(|e| e.inner.into()).collect(),
            index: index.into_iter().map(|e| e.inner.into()).collect(),
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.clone();
        ldf.unpivot(args).into()
    }

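    /// Add a row-index column called `name`, counting from `offset`
    /// (from 0 when `None` is passed).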
    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_row_index(name, offset).into()
    }

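    /// Run an opaque Python callback over the materialized DataFrame. The
    /// boolean flags gate which optimizations may cross the UDF boundary,
    /// since the planner cannot see inside `lambda`. A hedged sketch of the
    /// Python-side call that ends up here (assuming the usual
    /// `LazyFrame.map_batches` wiring):
    /// `lf.map_batches(lambda df: df.head(1), schema={"a": pl.Int64})`.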
    #[pyo3(signature = (lambda, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        lambda: PyObject,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::STREAMING, streamable);

        self.ldf
            .clone()
            .map_python(
                lambda.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

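    /// Drop columns; `strict` routes through the validating `drop` (unknown
    /// names raise) instead of `drop_no_validate`.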
    fn drop(&self, columns: Vec<PyExpr>, strict: bool) -> Self {
        let ldf = self.ldf.clone();
        let columns = columns.to_exprs();
        if strict {
            ldf.drop(columns)
        } else {
            ldf.drop_no_validate(columns)
        }
        .into()
    }

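    /// Cast the given columns to new dtypes. With `strict`, failing casts
    /// (e.g. overflow) raise instead of yielding null.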
    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: Wrap<DataType>, strict: bool) -> Self {
        self.ldf.clone().cast_all(dtype.0, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.clone().into()
    }

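    /// Resolve the schema of the lazy plan and return it as a Python dict
    /// mapping column name to dtype.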
    fn collect_schema<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: Vec<PyExpr>) -> Self {
        let columns = columns.to_exprs();
        self.ldf.clone().unnest(columns).into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.count().into()
    }

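    /// Merge with `other` under the assumption that both frames are sorted
    /// by `key`, so the result stays sorted without a full re-sort.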
    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .clone()
            .merge_sorted(other.ldf, key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

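// Parses the per-field Parquet overwrites passed from Python as a dict. An
// illustrative sketch of the accepted shape (all keys optional; `children`
// is a list of such dicts for struct fields, or a single dict for the inner
// element of a list/array column):
//
//   {"name": "col", "children": [...], "field_id": 1,
//    "metadata": [("key", "value"), ("flag", None)]}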
#[cfg(feature = "parquet")]
impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        use polars_io::parquet::write::ParquetFieldOverwrites;

        let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;

        let name = PyDictMethods::get_item(&parsed, "name")?
            .map(|v| PyResult::Ok(v.extract::<String>()?.into()))
            .transpose()?;
        // A list of overwrites denotes struct children; a single overwrite
        // denotes the inner element of a list/array column.
        let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
            PyResult::Ok(ChildFieldOverwrites::None),
            |v| {
                Ok(
                    if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
                        ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
                    } else {
                        ChildFieldOverwrites::ListLike(Box::new(
                            v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
                        ))
                    },
                )
            },
        )?;

        let field_id = PyDictMethods::get_item(&parsed, "field_id")?
            .map(|v| v.extract::<i32>())
            .transpose()?;

        let metadata = PyDictMethods::get_item(&parsed, "metadata")?
            .map(|v| v.extract::<Vec<(String, Option<String>)>>())
            .transpose()?;
        let metadata = metadata.map(|v| {
            v.into_iter()
                .map(|(key, value)| MetadataKeyValue {
                    key: key.into(),
                    value: value.map(|v| v.into()),
                })
                .collect()
        });

        Ok(Wrap(ParquetFieldOverwrites {
            name,
            children,
            field_id,
            metadata,
        }))
    }
}