polars_python/lazyframe/
general.rs

1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::PathBuf;
4
5use either::Either;
6use polars::io::{HiveOptions, RowIndex};
7use polars::time::*;
8use polars_core::prelude::*;
9#[cfg(feature = "parquet")]
10use polars_parquet::arrow::write::StatisticsOptions;
11use polars_plan::dsl::ScanSources;
12use polars_plan::plans::{AExpr, IR};
13use polars_utils::arena::{Arena, Node};
14use polars_utils::python_function::PythonObject;
15use pyo3::exceptions::PyTypeError;
16use pyo3::prelude::*;
17use pyo3::pybacked::PyBackedStr;
18use pyo3::types::{PyDict, PyDictMethods, PyList};
19
20use super::{PyLazyFrame, PyOptFlags, SinkTarget};
21use crate::error::PyPolarsErr;
22use crate::expr::ToExprs;
23use crate::expr::datatype::PyDataTypeExpr;
24use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
25use crate::io::PyScanOptions;
26use crate::lazyframe::visit::NodeTraverser;
27use crate::prelude::*;
28use crate::utils::{EnterPolarsExt, to_py_err};
29use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};
30
31fn pyobject_to_first_path_and_scan_sources(
32    obj: PyObject,
33) -> PyResult<(Option<PathBuf>, ScanSources)> {
34    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
35    Ok(match get_python_scan_source_input(obj, false)? {
36        PythonScanSourceInput::Path(path) => {
37            (Some(path.clone()), ScanSources::Paths([path].into()))
38        },
39        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
40        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
41    })
42}
43
44fn post_opt_callback(
45    lambda: &PyObject,
46    root: Node,
47    lp_arena: &mut Arena<IR>,
48    expr_arena: &mut Arena<AExpr>,
49    duration_since_start: Option<std::time::Duration>,
50) -> PolarsResult<()> {
51    Python::with_gil(|py| {
52        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));
53
54        // Get a copy of the arenas.
55        let arenas = nt.get_arenas();
56
57        // Pass the node visitor which allows the python callback to replace parts of the query plan.
58        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
59        lambda
60            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
61            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;
62
63        // Unpack the arenas.
64        // At this point the `nt` is useless.
65
66        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
67        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());
68
69        Ok(())
70    })
71}
72
73#[pymethods]
74#[allow(clippy::should_implement_trait)]
75impl PyLazyFrame {
76    #[staticmethod]
77    #[cfg(feature = "json")]
78    #[allow(clippy::too_many_arguments)]
79    #[pyo3(signature = (
80        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
81        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
82    ))]
83    fn new_from_ndjson(
84        source: Option<PyObject>,
85        sources: Wrap<ScanSources>,
86        infer_schema_length: Option<usize>,
87        schema: Option<Wrap<Schema>>,
88        schema_overrides: Option<Wrap<Schema>>,
89        batch_size: Option<NonZeroUsize>,
90        n_rows: Option<usize>,
91        low_memory: bool,
92        rechunk: bool,
93        row_index: Option<(String, IdxSize)>,
94        ignore_errors: bool,
95        include_file_paths: Option<String>,
96        cloud_options: Option<Vec<(String, String)>>,
97        credential_provider: Option<PyObject>,
98        retries: usize,
99        file_cache_ttl: Option<u64>,
100    ) -> PyResult<Self> {
101        use cloud::credential_provider::PlCredentialProvider;
102        let row_index = row_index.map(|(name, offset)| RowIndex {
103            name: name.into(),
104            offset,
105        });
106
107        let sources = sources.0;
108        let (first_path, sources) = match source {
109            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
110            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
111        };
112
113        let mut r = LazyJsonLineReader::new_with_sources(sources);
114
115        #[cfg(feature = "cloud")]
116        if let Some(first_path) = first_path {
117            let first_path_url = first_path.to_string_lossy();
118
119            let mut cloud_options =
120                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
121            cloud_options = cloud_options
122                .with_max_retries(retries)
123                .with_credential_provider(
124                    credential_provider.map(PlCredentialProvider::from_python_builder),
125                );
126
127            if let Some(file_cache_ttl) = file_cache_ttl {
128                cloud_options.file_cache_ttl = file_cache_ttl;
129            }
130
131            r = r.with_cloud_options(Some(cloud_options));
132        };
133
134        let lf = r
135            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
136            .with_batch_size(batch_size)
137            .with_n_rows(n_rows)
138            .low_memory(low_memory)
139            .with_rechunk(rechunk)
140            .with_schema(schema.map(|schema| Arc::new(schema.0)))
141            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
142            .with_row_index(row_index)
143            .with_ignore_errors(ignore_errors)
144            .with_include_file_paths(include_file_paths.map(|x| x.into()))
145            .finish()
146            .map_err(PyPolarsErr::from)?;
147
148        Ok(lf.into())
149    }
150
151    #[staticmethod]
152    #[cfg(feature = "csv")]
153    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
154        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
155        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
156        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
157        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
158    )
159    )]
160    fn new_from_csv(
161        source: Option<PyObject>,
162        sources: Wrap<ScanSources>,
163        separator: &str,
164        has_header: bool,
165        ignore_errors: bool,
166        skip_rows: usize,
167        skip_lines: usize,
168        n_rows: Option<usize>,
169        cache: bool,
170        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
171        low_memory: bool,
172        comment_prefix: Option<&str>,
173        quote_char: Option<&str>,
174        null_values: Option<Wrap<NullValues>>,
175        missing_utf8_is_empty_string: bool,
176        infer_schema_length: Option<usize>,
177        with_schema_modify: Option<PyObject>,
178        rechunk: bool,
179        skip_rows_after_header: usize,
180        encoding: Wrap<CsvEncoding>,
181        row_index: Option<(String, IdxSize)>,
182        try_parse_dates: bool,
183        eol_char: &str,
184        raise_if_empty: bool,
185        truncate_ragged_lines: bool,
186        decimal_comma: bool,
187        glob: bool,
188        schema: Option<Wrap<Schema>>,
189        cloud_options: Option<Vec<(String, String)>>,
190        credential_provider: Option<PyObject>,
191        retries: usize,
192        file_cache_ttl: Option<u64>,
193        include_file_paths: Option<String>,
194    ) -> PyResult<Self> {
195        #[cfg(feature = "cloud")]
196        use cloud::credential_provider::PlCredentialProvider;
197
198        let null_values = null_values.map(|w| w.0);
199        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
200        let separator = separator
201            .as_bytes()
202            .first()
203            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
204            .copied()
205            .map_err(PyPolarsErr::from)?;
206        let eol_char = eol_char
207            .as_bytes()
208            .first()
209            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
210            .copied()
211            .map_err(PyPolarsErr::from)?;
212        let row_index = row_index.map(|(name, offset)| RowIndex {
213            name: name.into(),
214            offset,
215        });
216
217        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
218            overwrite_dtype
219                .into_iter()
220                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
221                .collect::<Schema>()
222        });
223
224        let sources = sources.0;
225        let (first_path, sources) = match source {
226            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
227            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
228        };
229
230        let mut r = LazyCsvReader::new_with_sources(sources);
231
232        #[cfg(feature = "cloud")]
233        if let Some(first_path) = first_path {
234            let first_path_url = first_path.to_string_lossy();
235
236            let mut cloud_options =
237                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
238            if let Some(file_cache_ttl) = file_cache_ttl {
239                cloud_options.file_cache_ttl = file_cache_ttl;
240            }
241            cloud_options = cloud_options
242                .with_max_retries(retries)
243                .with_credential_provider(
244                    credential_provider.map(PlCredentialProvider::from_python_builder),
245                );
246            r = r.with_cloud_options(Some(cloud_options));
247        }
248
249        let mut r = r
250            .with_infer_schema_length(infer_schema_length)
251            .with_separator(separator)
252            .with_has_header(has_header)
253            .with_ignore_errors(ignore_errors)
254            .with_skip_rows(skip_rows)
255            .with_skip_lines(skip_lines)
256            .with_n_rows(n_rows)
257            .with_cache(cache)
258            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
259            .with_schema(schema.map(|schema| Arc::new(schema.0)))
260            .with_low_memory(low_memory)
261            .with_comment_prefix(comment_prefix.map(|x| x.into()))
262            .with_quote_char(quote_char)
263            .with_eol_char(eol_char)
264            .with_rechunk(rechunk)
265            .with_skip_rows_after_header(skip_rows_after_header)
266            .with_encoding(encoding.0)
267            .with_row_index(row_index)
268            .with_try_parse_dates(try_parse_dates)
269            .with_null_values(null_values)
270            .with_missing_is_null(!missing_utf8_is_empty_string)
271            .with_truncate_ragged_lines(truncate_ragged_lines)
272            .with_decimal_comma(decimal_comma)
273            .with_glob(glob)
274            .with_raise_if_empty(raise_if_empty)
275            .with_include_file_paths(include_file_paths.map(|x| x.into()));
276
277        if let Some(lambda) = with_schema_modify {
278            let f = |schema: Schema| {
279                let iter = schema.iter_names().map(|s| s.as_str());
280                Python::with_gil(|py| {
281                    let names = PyList::new(py, iter).unwrap();
282
283                    let out = lambda.call1(py, (names,)).expect("python function failed");
284                    let new_names = out
285                        .extract::<Vec<String>>(py)
286                        .expect("python function should return List[str]");
287                    polars_ensure!(new_names.len() == schema.len(),
288                        ShapeMismatch: "The length of the new names list should be equal to or less than the original column length",
289                    );
290                    Ok(schema
291                        .iter_values()
292                        .zip(new_names)
293                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
294                        .collect())
295                })
296            };
297            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
298        }
299
300        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
301    }
302
303    #[cfg(feature = "parquet")]
304    #[staticmethod]
305    #[pyo3(signature = (
306        sources, schema, scan_options, parallel, low_memory, use_statistics
307    ))]
308    fn new_from_parquet(
309        sources: Wrap<ScanSources>,
310        schema: Option<Wrap<Schema>>,
311        scan_options: PyScanOptions,
312        parallel: Wrap<ParallelStrategy>,
313        low_memory: bool,
314        use_statistics: bool,
315    ) -> PyResult<Self> {
316        use crate::utils::to_py_err;
317
318        let parallel = parallel.0;
319
320        let options = ParquetOptions {
321            schema: schema.map(|x| Arc::new(x.0)),
322            parallel,
323            low_memory,
324            use_statistics,
325        };
326
327        let sources = sources.0;
328        let first_path = sources.first_path().map(|p| p.to_path_buf());
329
330        let unified_scan_args = scan_options.extract_unified_scan_args(first_path.as_ref())?;
331
332        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
333            .map_err(to_py_err)?
334            .build()
335            .into();
336
337        Ok(lf.into())
338    }
339
340    #[cfg(feature = "ipc")]
341    #[staticmethod]
342    #[pyo3(signature = (
343        source, sources, n_rows, cache, rechunk, row_index, cloud_options,credential_provider,
344        hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl,
345        include_file_paths
346    ))]
347    fn new_from_ipc(
348        source: Option<PyObject>,
349        sources: Wrap<ScanSources>,
350        n_rows: Option<usize>,
351        cache: bool,
352        rechunk: bool,
353        row_index: Option<(String, IdxSize)>,
354        cloud_options: Option<Vec<(String, String)>>,
355        credential_provider: Option<PyObject>,
356        hive_partitioning: Option<bool>,
357        hive_schema: Option<Wrap<Schema>>,
358        try_parse_hive_dates: bool,
359        retries: usize,
360        file_cache_ttl: Option<u64>,
361        include_file_paths: Option<String>,
362    ) -> PyResult<Self> {
363        #[cfg(feature = "cloud")]
364        use cloud::credential_provider::PlCredentialProvider;
365        let row_index = row_index.map(|(name, offset)| RowIndex {
366            name: name.into(),
367            offset,
368        });
369
370        let hive_options = HiveOptions {
371            enabled: hive_partitioning,
372            hive_start_idx: 0,
373            schema: hive_schema.map(|x| Arc::new(x.0)),
374            try_parse_dates: try_parse_hive_dates,
375        };
376
377        let mut args = ScanArgsIpc {
378            n_rows,
379            cache,
380            rechunk,
381            row_index,
382            cloud_options: None,
383            hive_options,
384            include_file_paths: include_file_paths.map(|x| x.into()),
385        };
386
387        let sources = sources.0;
388        let (first_path, sources) = match source {
389            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
390            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
391        };
392
393        #[cfg(feature = "cloud")]
394        if let Some(first_path) = first_path {
395            let first_path_url = first_path.to_string_lossy();
396
397            let mut cloud_options =
398                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
399            if let Some(file_cache_ttl) = file_cache_ttl {
400                cloud_options.file_cache_ttl = file_cache_ttl;
401            }
402            args.cloud_options = Some(
403                cloud_options
404                    .with_max_retries(retries)
405                    .with_credential_provider(
406                        credential_provider.map(PlCredentialProvider::from_python_builder),
407                    ),
408            );
409        }
410
411        let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?;
412        Ok(lf.into())
413    }
414
415    #[staticmethod]
416    #[pyo3(signature = (
417        dataset_object
418    ))]
419    fn new_from_dataset_object(dataset_object: PyObject) -> PyResult<Self> {
420        use crate::dataset::dataset_provider_funcs;
421
422        polars_plan::dsl::DATASET_PROVIDER_VTABLE.get_or_init(|| PythonDatasetProviderVTable {
423            reader_name: dataset_provider_funcs::reader_name,
424            schema: dataset_provider_funcs::schema,
425            to_dataset_scan: dataset_provider_funcs::to_dataset_scan,
426        });
427
428        let lf =
429            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
430                .into();
431
432        Ok(lf)
433    }
434
435    #[staticmethod]
436    fn scan_from_python_function_arrow_schema(
437        schema: &Bound<'_, PyList>,
438        scan_fn: PyObject,
439        pyarrow: bool,
440        validate_schema: bool,
441    ) -> PyResult<Self> {
442        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);
443
444        Ok(LazyFrame::scan_from_python_function(
445            Either::Right(schema),
446            scan_fn,
447            pyarrow,
448            validate_schema,
449        )
450        .into())
451    }
452
453    #[staticmethod]
454    fn scan_from_python_function_pl_schema(
455        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
456        scan_fn: PyObject,
457        pyarrow: bool,
458        validate_schema: bool,
459    ) -> PyResult<Self> {
460        let schema = Arc::new(Schema::from_iter(
461            schema
462                .into_iter()
463                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
464        ));
465        Ok(LazyFrame::scan_from_python_function(
466            Either::Right(schema),
467            scan_fn,
468            pyarrow,
469            validate_schema,
470        )
471        .into())
472    }
473
474    #[staticmethod]
475    fn scan_from_python_function_schema_function(
476        schema_fn: PyObject,
477        scan_fn: PyObject,
478        validate_schema: bool,
479    ) -> PyResult<Self> {
480        Ok(LazyFrame::scan_from_python_function(
481            Either::Left(schema_fn),
482            scan_fn,
483            false,
484            validate_schema,
485        )
486        .into())
487    }
488
489    fn describe_plan(&self, py: Python) -> PyResult<String> {
490        py.enter_polars(|| self.ldf.describe_plan())
491    }
492
493    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
494        py.enter_polars(|| self.ldf.describe_optimized_plan())
495    }
496
497    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
498        py.enter_polars(|| self.ldf.describe_plan_tree())
499    }
500
501    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
502        py.enter_polars(|| self.ldf.describe_optimized_plan_tree())
503    }
504
505    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
506        py.enter_polars(|| self.ldf.to_dot(optimized))
507    }
508
509    #[cfg(feature = "new_streaming")]
510    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
511        py.enter_polars(|| self.ldf.to_dot_streaming_phys(optimized))
512    }
513
514    fn optimization_toggle(
515        &self,
516        type_coercion: bool,
517        type_check: bool,
518        predicate_pushdown: bool,
519        projection_pushdown: bool,
520        simplify_expression: bool,
521        slice_pushdown: bool,
522        comm_subplan_elim: bool,
523        comm_subexpr_elim: bool,
524        cluster_with_columns: bool,
525        collapse_joins: bool,
526        _eager: bool,
527        _check_order: bool,
528        #[allow(unused_variables)] new_streaming: bool,
529    ) -> Self {
530        let ldf = self.ldf.clone();
531        let mut ldf = ldf
532            .with_type_coercion(type_coercion)
533            .with_type_check(type_check)
534            .with_predicate_pushdown(predicate_pushdown)
535            .with_simplify_expr(simplify_expression)
536            .with_slice_pushdown(slice_pushdown)
537            .with_cluster_with_columns(cluster_with_columns)
538            .with_collapse_joins(collapse_joins)
539            .with_check_order(_check_order)
540            ._with_eager(_eager)
541            .with_projection_pushdown(projection_pushdown);
542
543        #[cfg(feature = "new_streaming")]
544        {
545            ldf = ldf.with_new_streaming(new_streaming);
546        }
547
548        #[cfg(feature = "cse")]
549        {
550            ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);
551            ldf = ldf.with_comm_subexpr_elim(comm_subexpr_elim);
552        }
553
554        ldf.into()
555    }
556
557    fn sort(
558        &self,
559        by_column: &str,
560        descending: bool,
561        nulls_last: bool,
562        maintain_order: bool,
563        multithreaded: bool,
564    ) -> Self {
565        let ldf = self.ldf.clone();
566        ldf.sort(
567            [by_column],
568            SortMultipleOptions {
569                descending: vec![descending],
570                nulls_last: vec![nulls_last],
571                multithreaded,
572                maintain_order,
573                limit: None,
574            },
575        )
576        .into()
577    }
578
579    fn sort_by_exprs(
580        &self,
581        by: Vec<PyExpr>,
582        descending: Vec<bool>,
583        nulls_last: Vec<bool>,
584        maintain_order: bool,
585        multithreaded: bool,
586    ) -> Self {
587        let ldf = self.ldf.clone();
588        let exprs = by.to_exprs();
589        ldf.sort_by_exprs(
590            exprs,
591            SortMultipleOptions {
592                descending,
593                nulls_last,
594                maintain_order,
595                multithreaded,
596                limit: None,
597            },
598        )
599        .into()
600    }
601
602    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
603        let ldf = self.ldf.clone();
604        let exprs = by.to_exprs();
605        ldf.top_k(
606            k,
607            exprs,
608            SortMultipleOptions::new().with_order_descending_multi(reverse),
609        )
610        .into()
611    }
612
613    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
614        let ldf = self.ldf.clone();
615        let exprs = by.to_exprs();
616        ldf.bottom_k(
617            k,
618            exprs,
619            SortMultipleOptions::new().with_order_descending_multi(reverse),
620        )
621        .into()
622    }
623
624    fn cache(&self) -> Self {
625        let ldf = self.ldf.clone();
626        ldf.cache().into()
627    }
628
629    #[pyo3(signature = (optflags))]
630    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
631        let ldf = self.ldf.clone();
632        ldf.with_optimizations(optflags.inner).into()
633    }
634
635    #[pyo3(signature = (lambda_post_opt=None))]
636    fn profile(
637        &self,
638        py: Python<'_>,
639        lambda_post_opt: Option<PyObject>,
640    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
641        let (df, time_df) = py.enter_polars(|| {
642            let ldf = self.ldf.clone();
643            if let Some(lambda) = lambda_post_opt {
644                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
645                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
646                })
647            } else {
648                ldf.profile()
649            }
650        })?;
651        Ok((df.into(), time_df.into()))
652    }
653
654    #[pyo3(signature = (engine, lambda_post_opt=None))]
655    fn collect(
656        &self,
657        py: Python<'_>,
658        engine: Wrap<Engine>,
659        lambda_post_opt: Option<PyObject>,
660    ) -> PyResult<PyDataFrame> {
661        py.enter_polars_df(|| {
662            let ldf = self.ldf.clone();
663            if let Some(lambda) = lambda_post_opt {
664                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
665                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
666                })
667            } else {
668                ldf.collect_with_engine(engine.0)
669            }
670        })
671    }
672
673    #[pyo3(signature = (engine, lambda))]
674    fn collect_with_callback(
675        &self,
676        py: Python<'_>,
677        engine: Wrap<Engine>,
678        lambda: PyObject,
679    ) -> PyResult<()> {
680        py.enter_polars_ok(|| {
681            let ldf = self.ldf.clone();
682
683            polars_core::POOL.spawn(move || {
684                let result = ldf
685                    .collect_with_engine(engine.0)
686                    .map(PyDataFrame::new)
687                    .map_err(PyPolarsErr::from);
688
689                Python::with_gil(|py| match result {
690                    Ok(df) => {
691                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
692                    },
693                    Err(err) => {
694                        lambda
695                            .call1(py, (PyErr::from(err),))
696                            .map_err(|err| err.restore(py))
697                            .ok();
698                    },
699                });
700            });
701        })
702    }
703
704    #[cfg(feature = "parquet")]
705    #[pyo3(signature = (
706        target, compression, compression_level, statistics, row_group_size, data_page_size,
707        cloud_options, credential_provider, retries, sink_options, metadata, field_overwrites,
708    ))]
709    fn sink_parquet(
710        &self,
711        py: Python<'_>,
712        target: SinkTarget,
713        compression: &str,
714        compression_level: Option<i32>,
715        statistics: Wrap<StatisticsOptions>,
716        row_group_size: Option<usize>,
717        data_page_size: Option<usize>,
718        cloud_options: Option<Vec<(String, String)>>,
719        credential_provider: Option<PyObject>,
720        retries: usize,
721        sink_options: Wrap<SinkOptions>,
722        metadata: Wrap<Option<KeyValueMetadata>>,
723        field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
724    ) -> PyResult<PyLazyFrame> {
725        let compression = parse_parquet_compression(compression, compression_level)?;
726
727        let options = ParquetWriteOptions {
728            compression,
729            statistics: statistics.0,
730            row_group_size,
731            data_page_size,
732            key_value_metadata: metadata.0,
733            field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
734        };
735
736        let cloud_options = match target.base_path() {
737            None => None,
738            Some(base_path) => {
739                let cloud_options = parse_cloud_options(
740                    base_path.to_str().unwrap(),
741                    cloud_options.unwrap_or_default(),
742                )?;
743                Some(
744                    cloud_options
745                        .with_max_retries(retries)
746                        .with_credential_provider(
747                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
748                        ),
749                )
750            },
751        };
752
753        py.enter_polars(|| {
754            let ldf = self.ldf.clone();
755            match target {
756                SinkTarget::File(target) => {
757                    ldf.sink_parquet(target, options, cloud_options, sink_options.0)
758                },
759                SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned(
760                    Arc::new(partition.base_path),
761                    partition.file_path_cb.map(PartitionTargetCallback::Python),
762                    partition.variant,
763                    options,
764                    cloud_options,
765                    sink_options.0,
766                    partition.per_partition_sort_by,
767                    partition.finish_callback,
768                ),
769            }
770            .into()
771        })
772        .map(Into::into)
773        .map_err(Into::into)
774    }
775
776    #[cfg(feature = "ipc")]
777    #[pyo3(signature = (
778        target, compression, compat_level, cloud_options, credential_provider, retries,
779        sink_options
780    ))]
781    fn sink_ipc(
782        &self,
783        py: Python<'_>,
784        target: SinkTarget,
785        compression: Wrap<Option<IpcCompression>>,
786        compat_level: PyCompatLevel,
787        cloud_options: Option<Vec<(String, String)>>,
788        credential_provider: Option<PyObject>,
789        retries: usize,
790        sink_options: Wrap<SinkOptions>,
791    ) -> PyResult<PyLazyFrame> {
792        let options = IpcWriterOptions {
793            compression: compression.0,
794            compat_level: compat_level.0,
795            ..Default::default()
796        };
797
798        #[cfg(feature = "cloud")]
799        let cloud_options = match target.base_path() {
800            None => None,
801            Some(base_path) => {
802                let cloud_options = parse_cloud_options(
803                    base_path.to_str().unwrap(),
804                    cloud_options.unwrap_or_default(),
805                )?;
806                Some(
807                    cloud_options
808                        .with_max_retries(retries)
809                        .with_credential_provider(
810                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
811                        ),
812                )
813            },
814        };
815
816        #[cfg(not(feature = "cloud"))]
817        let cloud_options = None;
818
819        py.enter_polars(|| {
820            let ldf = self.ldf.clone();
821            match target {
822                SinkTarget::File(target) => {
823                    ldf.sink_ipc(target, options, cloud_options, sink_options.0)
824                },
825                SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned(
826                    Arc::new(partition.base_path),
827                    partition.file_path_cb.map(PartitionTargetCallback::Python),
828                    partition.variant,
829                    options,
830                    cloud_options,
831                    sink_options.0,
832                    partition.per_partition_sort_by,
833                    partition.finish_callback,
834                ),
835            }
836        })
837        .map(Into::into)
838        .map_err(Into::into)
839    }
840
841    #[cfg(feature = "csv")]
842    #[pyo3(signature = (
843        target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
844        datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
845        quote_style, cloud_options, credential_provider, retries, sink_options
846    ))]
847    fn sink_csv(
848        &self,
849        py: Python<'_>,
850        target: SinkTarget,
851        include_bom: bool,
852        include_header: bool,
853        separator: u8,
854        line_terminator: String,
855        quote_char: u8,
856        batch_size: NonZeroUsize,
857        datetime_format: Option<String>,
858        date_format: Option<String>,
859        time_format: Option<String>,
860        float_scientific: Option<bool>,
861        float_precision: Option<usize>,
862        null_value: Option<String>,
863        quote_style: Option<Wrap<QuoteStyle>>,
864        cloud_options: Option<Vec<(String, String)>>,
865        credential_provider: Option<PyObject>,
866        retries: usize,
867        sink_options: Wrap<SinkOptions>,
868    ) -> PyResult<PyLazyFrame> {
869        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
870        let null_value = null_value.unwrap_or(SerializeOptions::default().null);
871
872        let serialize_options = SerializeOptions {
873            date_format,
874            time_format,
875            datetime_format,
876            float_scientific,
877            float_precision,
878            separator,
879            quote_char,
880            null: null_value,
881            line_terminator,
882            quote_style,
883        };
884
885        let options = CsvWriterOptions {
886            include_bom,
887            include_header,
888            batch_size,
889            serialize_options,
890        };
891
892        #[cfg(feature = "cloud")]
893        let cloud_options = match target.base_path() {
894            None => None,
895            Some(base_path) => {
896                let cloud_options = parse_cloud_options(
897                    base_path.to_str().unwrap(),
898                    cloud_options.unwrap_or_default(),
899                )?;
900                Some(
901                    cloud_options
902                        .with_max_retries(retries)
903                        .with_credential_provider(
904                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
905                        ),
906                )
907            },
908        };
909
910        #[cfg(not(feature = "cloud"))]
911        let cloud_options = None;
912
913        py.enter_polars(|| {
914            let ldf = self.ldf.clone();
915            match target {
916                SinkTarget::File(target) => {
917                    ldf.sink_csv(target, options, cloud_options, sink_options.0)
918                },
919                SinkTarget::Partition(partition) => ldf.sink_csv_partitioned(
920                    Arc::new(partition.base_path),
921                    partition.file_path_cb.map(PartitionTargetCallback::Python),
922                    partition.variant,
923                    options,
924                    cloud_options,
925                    sink_options.0,
926                    partition.per_partition_sort_by,
927                    partition.finish_callback,
928                ),
929            }
930        })
931        .map(Into::into)
932        .map_err(Into::into)
933    }
934
935    #[allow(clippy::too_many_arguments)]
936    #[cfg(feature = "json")]
937    #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
938    fn sink_json(
939        &self,
940        py: Python<'_>,
941        target: SinkTarget,
942        cloud_options: Option<Vec<(String, String)>>,
943        credential_provider: Option<PyObject>,
944        retries: usize,
945        sink_options: Wrap<SinkOptions>,
946    ) -> PyResult<PyLazyFrame> {
947        let options = JsonWriterOptions {};
948
949        let cloud_options = match target.base_path() {
950            None => None,
951            Some(base_path) => {
952                let cloud_options = parse_cloud_options(
953                    base_path.to_str().unwrap(),
954                    cloud_options.unwrap_or_default(),
955                )?;
956                Some(
957                cloud_options
958                    .with_max_retries(retries)
959                    .with_credential_provider(
960                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
961                    ),
962            )
963            },
964        };
965
966        py.enter_polars(|| {
967            let ldf = self.ldf.clone();
968            match target {
969                SinkTarget::File(path) => {
970                    ldf.sink_json(path, options, cloud_options, sink_options.0)
971                },
972                SinkTarget::Partition(partition) => ldf.sink_json_partitioned(
973                    Arc::new(partition.base_path),
974                    partition.file_path_cb.map(PartitionTargetCallback::Python),
975                    partition.variant,
976                    options,
977                    cloud_options,
978                    sink_options.0,
979                    partition.per_partition_sort_by,
980                    partition.finish_callback,
981                ),
982            }
983        })
984        .map(Into::into)
985        .map_err(Into::into)
986    }
987
988    fn fetch(&self, py: Python<'_>, n_rows: usize) -> PyResult<PyDataFrame> {
989        let ldf = self.ldf.clone();
990        py.enter_polars_df(|| ldf.fetch(n_rows))
991    }
992
993    fn filter(&mut self, predicate: PyExpr) -> Self {
994        let ldf = self.ldf.clone();
995        ldf.filter(predicate.inner).into()
996    }
997
998    fn remove(&mut self, predicate: PyExpr) -> Self {
999        let ldf = self.ldf.clone();
1000        ldf.remove(predicate.inner).into()
1001    }
1002
1003    fn select(&mut self, exprs: Vec<PyExpr>) -> Self {
1004        let ldf = self.ldf.clone();
1005        let exprs = exprs.to_exprs();
1006        ldf.select(exprs).into()
1007    }
1008
1009    fn select_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
1010        let ldf = self.ldf.clone();
1011        let exprs = exprs.to_exprs();
1012        ldf.select_seq(exprs).into()
1013    }
1014
1015    fn group_by(&mut self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
1016        let ldf = self.ldf.clone();
1017        let by = by.to_exprs();
1018        let lazy_gb = if maintain_order {
1019            ldf.group_by_stable(by)
1020        } else {
1021            ldf.group_by(by)
1022        };
1023
1024        PyLazyGroupBy { lgb: Some(lazy_gb) }
1025    }
1026
1027    fn rolling(
1028        &mut self,
1029        index_column: PyExpr,
1030        period: &str,
1031        offset: &str,
1032        closed: Wrap<ClosedWindow>,
1033        by: Vec<PyExpr>,
1034    ) -> PyResult<PyLazyGroupBy> {
1035        let closed_window = closed.0;
1036        let ldf = self.ldf.clone();
1037        let by = by
1038            .into_iter()
1039            .map(|pyexpr| pyexpr.inner)
1040            .collect::<Vec<_>>();
1041        let lazy_gb = ldf.rolling(
1042            index_column.inner,
1043            by,
1044            RollingGroupOptions {
1045                index_column: "".into(),
1046                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
1047                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
1048                closed_window,
1049            },
1050        );
1051
1052        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
1053    }
1054
1055    fn group_by_dynamic(
1056        &mut self,
1057        index_column: PyExpr,
1058        every: &str,
1059        period: &str,
1060        offset: &str,
1061        label: Wrap<Label>,
1062        include_boundaries: bool,
1063        closed: Wrap<ClosedWindow>,
1064        group_by: Vec<PyExpr>,
1065        start_by: Wrap<StartBy>,
1066    ) -> PyResult<PyLazyGroupBy> {
1067        let closed_window = closed.0;
1068        let group_by = group_by
1069            .into_iter()
1070            .map(|pyexpr| pyexpr.inner)
1071            .collect::<Vec<_>>();
1072        let ldf = self.ldf.clone();
1073        let lazy_gb = ldf.group_by_dynamic(
1074            index_column.inner,
1075            group_by,
1076            DynamicGroupOptions {
1077                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
1078                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
1079                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
1080                label: label.0,
1081                include_boundaries,
1082                closed_window,
1083                start_by: start_by.0,
1084                ..Default::default()
1085            },
1086        );
1087
1088        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
1089    }
1090
1091    fn with_context(&self, contexts: Vec<Self>) -> Self {
1092        let contexts = contexts.into_iter().map(|ldf| ldf.ldf).collect::<Vec<_>>();
1093        self.ldf.clone().with_context(contexts).into()
1094    }
1095
1096    #[cfg(feature = "asof_join")]
1097    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
1098    fn join_asof(
1099        &self,
1100        other: Self,
1101        left_on: PyExpr,
1102        right_on: PyExpr,
1103        left_by: Option<Vec<PyBackedStr>>,
1104        right_by: Option<Vec<PyBackedStr>>,
1105        allow_parallel: bool,
1106        force_parallel: bool,
1107        suffix: String,
1108        strategy: Wrap<AsofStrategy>,
1109        tolerance: Option<Wrap<AnyValue<'_>>>,
1110        tolerance_str: Option<String>,
1111        coalesce: bool,
1112        allow_eq: bool,
1113        check_sortedness: bool,
1114    ) -> PyResult<Self> {
1115        let coalesce = if coalesce {
1116            JoinCoalesce::CoalesceColumns
1117        } else {
1118            JoinCoalesce::KeepColumns
1119        };
1120        let ldf = self.ldf.clone();
1121        let other = other.ldf;
1122        let left_on = left_on.inner;
1123        let right_on = right_on.inner;
1124        Ok(ldf
1125            .join_builder()
1126            .with(other)
1127            .left_on([left_on])
1128            .right_on([right_on])
1129            .allow_parallel(allow_parallel)
1130            .force_parallel(force_parallel)
1131            .coalesce(coalesce)
1132            .how(JoinType::AsOf(AsOfOptions {
1133                strategy: strategy.0,
1134                left_by: left_by.map(strings_to_pl_smallstr),
1135                right_by: right_by.map(strings_to_pl_smallstr),
1136                tolerance: tolerance.map(|t| t.0.into_static()),
1137                tolerance_str: tolerance_str.map(|s| s.into()),
1138                allow_eq,
1139                check_sortedness,
1140            }))
1141            .suffix(suffix)
1142            .finish()
1143            .into())
1144    }
1145
1146    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
1147    fn join(
1148        &self,
1149        other: Self,
1150        left_on: Vec<PyExpr>,
1151        right_on: Vec<PyExpr>,
1152        allow_parallel: bool,
1153        force_parallel: bool,
1154        nulls_equal: bool,
1155        how: Wrap<JoinType>,
1156        suffix: String,
1157        validate: Wrap<JoinValidation>,
1158        maintain_order: Wrap<MaintainOrderJoin>,
1159        coalesce: Option<bool>,
1160    ) -> PyResult<Self> {
1161        let coalesce = match coalesce {
1162            None => JoinCoalesce::JoinSpecific,
1163            Some(true) => JoinCoalesce::CoalesceColumns,
1164            Some(false) => JoinCoalesce::KeepColumns,
1165        };
1166        let ldf = self.ldf.clone();
1167        let other = other.ldf;
1168        let left_on = left_on
1169            .into_iter()
1170            .map(|pyexpr| pyexpr.inner)
1171            .collect::<Vec<_>>();
1172        let right_on = right_on
1173            .into_iter()
1174            .map(|pyexpr| pyexpr.inner)
1175            .collect::<Vec<_>>();
1176
1177        Ok(ldf
1178            .join_builder()
1179            .with(other)
1180            .left_on(left_on)
1181            .right_on(right_on)
1182            .allow_parallel(allow_parallel)
1183            .force_parallel(force_parallel)
1184            .join_nulls(nulls_equal)
1185            .how(how.0)
1186            .suffix(suffix)
1187            .validate(validate.0)
1188            .coalesce(coalesce)
1189            .maintain_order(maintain_order.0)
1190            .finish()
1191            .into())
1192    }
1193
1194    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
1195        let ldf = self.ldf.clone();
1196        let other = other.ldf;
1197
1198        let predicates = predicates.to_exprs();
1199
1200        Ok(ldf
1201            .join_builder()
1202            .with(other)
1203            .suffix(suffix)
1204            .join_where(predicates)
1205            .into())
1206    }
1207
1208    fn with_columns(&mut self, exprs: Vec<PyExpr>) -> Self {
1209        let ldf = self.ldf.clone();
1210        ldf.with_columns(exprs.to_exprs()).into()
1211    }
1212
1213    fn with_columns_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
1214        let ldf = self.ldf.clone();
1215        ldf.with_columns_seq(exprs.to_exprs()).into()
1216    }
1217
1218    fn match_to_schema<'py>(
1219        &self,
1220        schema: Wrap<Schema>,
1221        missing_columns: &Bound<'py, PyAny>,
1222        missing_struct_fields: &Bound<'py, PyAny>,
1223        extra_columns: Wrap<ExtraColumnsPolicy>,
1224        extra_struct_fields: &Bound<'py, PyAny>,
1225        integer_cast: &Bound<'py, PyAny>,
1226        float_cast: &Bound<'py, PyAny>,
1227    ) -> PyResult<Self> {
1228        fn parse_missing_columns<'py>(
1229            schema: &Schema,
1230            missing_columns: &Bound<'py, PyAny>,
1231        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
1232            let mut out = Vec::with_capacity(schema.len());
1233            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
1234                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1235            } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
1236                out.extend(std::iter::repeat_n(
1237                    MissingColumnsPolicyOrExpr::Raise,
1238                    schema.len(),
1239                ));
1240                for (key, value) in dict.iter() {
1241                    let key = key.extract::<String>()?;
1242                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
1243                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1244                }
1245            } else {
1246                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
1247            }
1248            Ok(out)
1249        }
1250        fn parse_missing_struct_fields<'py>(
1251            schema: &Schema,
1252            missing_struct_fields: &Bound<'py, PyAny>,
1253        ) -> PyResult<Vec<MissingColumnsPolicy>> {
1254            let mut out = Vec::with_capacity(schema.len());
1255            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
1256                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1257            } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
1258                out.extend(std::iter::repeat_n(
1259                    MissingColumnsPolicy::Raise,
1260                    schema.len(),
1261                ));
1262                for (key, value) in dict.iter() {
1263                    let key = key.extract::<String>()?;
1264                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
1265                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1266                }
1267            } else {
1268                return Err(PyTypeError::new_err(
1269                    "Invalid value for `missing_struct_fields`",
1270                ));
1271            }
1272            Ok(out)
1273        }
1274        fn parse_extra_struct_fields<'py>(
1275            schema: &Schema,
1276            extra_struct_fields: &Bound<'py, PyAny>,
1277        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
1278            let mut out = Vec::with_capacity(schema.len());
1279            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
1280                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1281            } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
1282                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
1283                for (key, value) in dict.iter() {
1284                    let key = key.extract::<String>()?;
1285                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
1286                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1287                }
1288            } else {
1289                return Err(PyTypeError::new_err(
1290                    "Invalid value for `extra_struct_fields`",
1291                ));
1292            }
1293            Ok(out)
1294        }
1295        fn parse_cast<'py>(
1296            schema: &Schema,
1297            cast: &Bound<'py, PyAny>,
1298        ) -> PyResult<Vec<UpcastOrForbid>> {
1299            let mut out = Vec::with_capacity(schema.len());
1300            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
1301                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1302            } else if let Ok(dict) = cast.downcast::<PyDict>() {
1303                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
1304                for (key, value) in dict.iter() {
1305                    let key = key.extract::<String>()?;
1306                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
1307                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1308                }
1309            } else {
1310                return Err(PyTypeError::new_err(
1311                    "Invalid value for `integer_cast` / `float_cast`",
1312                ));
1313            }
1314            Ok(out)
1315        }
1316
1317        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
1318        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
1319        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
1320        let integer_cast = parse_cast(&schema.0, integer_cast)?;
1321        let float_cast = parse_cast(&schema.0, float_cast)?;
1322
1323        let per_column = (0..schema.0.len())
1324            .map(|i| MatchToSchemaPerColumn {
1325                missing_columns: missing_columns[i].clone(),
1326                missing_struct_fields: missing_struct_fields[i],
1327                extra_struct_fields: extra_struct_fields[i],
1328                integer_cast: integer_cast[i],
1329                float_cast: float_cast[i],
1330            })
1331            .collect();
1332
1333        let ldf = self.ldf.clone();
1334        Ok(ldf
1335            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
1336            .into())
1337    }
1338
1339    fn rename(&mut self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
1340        let ldf = self.ldf.clone();
1341        ldf.rename(existing, new, strict).into()
1342    }
1343
1344    fn reverse(&self) -> Self {
1345        let ldf = self.ldf.clone();
1346        ldf.reverse().into()
1347    }
1348
1349    #[pyo3(signature = (n, fill_value=None))]
1350    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
1351        let lf = self.ldf.clone();
1352        let out = match fill_value {
1353            Some(v) => lf.shift_and_fill(n.inner, v.inner),
1354            None => lf.shift(n.inner),
1355        };
1356        out.into()
1357    }
1358
1359    fn fill_nan(&self, fill_value: PyExpr) -> Self {
1360        let ldf = self.ldf.clone();
1361        ldf.fill_nan(fill_value.inner).into()
1362    }
1363
1364    fn min(&self) -> Self {
1365        let ldf = self.ldf.clone();
1366        let out = ldf.min();
1367        out.into()
1368    }
1369
1370    fn max(&self) -> Self {
1371        let ldf = self.ldf.clone();
1372        let out = ldf.max();
1373        out.into()
1374    }
1375
1376    fn sum(&self) -> Self {
1377        let ldf = self.ldf.clone();
1378        let out = ldf.sum();
1379        out.into()
1380    }
1381
1382    fn mean(&self) -> Self {
1383        let ldf = self.ldf.clone();
1384        let out = ldf.mean();
1385        out.into()
1386    }
1387
1388    fn std(&self, ddof: u8) -> Self {
1389        let ldf = self.ldf.clone();
1390        let out = ldf.std(ddof);
1391        out.into()
1392    }
1393
1394    fn var(&self, ddof: u8) -> Self {
1395        let ldf = self.ldf.clone();
1396        let out = ldf.var(ddof);
1397        out.into()
1398    }
1399
1400    fn median(&self) -> Self {
1401        let ldf = self.ldf.clone();
1402        let out = ldf.median();
1403        out.into()
1404    }
1405
1406    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
1407        let ldf = self.ldf.clone();
1408        let out = ldf.quantile(quantile.inner, interpolation.0);
1409        out.into()
1410    }
1411
1412    fn explode(&self, column: Vec<PyExpr>) -> Self {
1413        let ldf = self.ldf.clone();
1414        let column = column.to_exprs();
1415        ldf.explode(column).into()
1416    }
1417
1418    fn null_count(&self) -> Self {
1419        let ldf = self.ldf.clone();
1420        ldf.null_count().into()
1421    }
1422
1423    #[pyo3(signature = (maintain_order, subset, keep))]
1424    fn unique(
1425        &self,
1426        maintain_order: bool,
1427        subset: Option<Vec<PyExpr>>,
1428        keep: Wrap<UniqueKeepStrategy>,
1429    ) -> Self {
1430        let ldf = self.ldf.clone();
1431        let subset = subset.map(|e| e.to_exprs());
1432        match maintain_order {
1433            true => ldf.unique_stable_generic(subset, keep.0),
1434            false => ldf.unique_generic(subset, keep.0),
1435        }
1436        .into()
1437    }
1438
1439    #[pyo3(signature = (subset=None))]
1440    fn drop_nans(&self, subset: Option<Vec<PyExpr>>) -> Self {
1441        let ldf = self.ldf.clone();
1442        let subset = subset.map(|e| e.to_exprs());
1443        ldf.drop_nans(subset).into()
1444    }
1445
1446    #[pyo3(signature = (subset=None))]
1447    fn drop_nulls(&self, subset: Option<Vec<PyExpr>>) -> Self {
1448        let ldf = self.ldf.clone();
1449        let subset = subset.map(|e| e.to_exprs());
1450        ldf.drop_nulls(subset).into()
1451    }
1452
1453    #[pyo3(signature = (offset, len=None))]
1454    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
1455        let ldf = self.ldf.clone();
1456        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
1457    }
1458
1459    fn tail(&self, n: IdxSize) -> Self {
1460        let ldf = self.ldf.clone();
1461        ldf.tail(n).into()
1462    }
1463
1464    #[cfg(feature = "pivot")]
1465    #[pyo3(signature = (on, index, value_name, variable_name))]
1466    fn unpivot(
1467        &self,
1468        on: Vec<PyExpr>,
1469        index: Vec<PyExpr>,
1470        value_name: Option<String>,
1471        variable_name: Option<String>,
1472    ) -> Self {
1473        let args = UnpivotArgsDSL {
1474            on: on.into_iter().map(|e| e.inner.into()).collect(),
1475            index: index.into_iter().map(|e| e.inner.into()).collect(),
1476            value_name: value_name.map(|s| s.into()),
1477            variable_name: variable_name.map(|s| s.into()),
1478        };
1479
1480        let ldf = self.ldf.clone();
1481        ldf.unpivot(args).into()
1482    }
1483
1484    #[pyo3(signature = (name, offset=None))]
1485    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
1486        let ldf = self.ldf.clone();
1487        ldf.with_row_index(name, offset).into()
1488    }
1489
1490    #[pyo3(signature = (lambda, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
1491    fn map_batches(
1492        &self,
1493        lambda: PyObject,
1494        predicate_pushdown: bool,
1495        projection_pushdown: bool,
1496        slice_pushdown: bool,
1497        streamable: bool,
1498        schema: Option<Wrap<Schema>>,
1499        validate_output: bool,
1500    ) -> Self {
1501        let mut opt = OptFlags::default();
1502        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
1503        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
1504        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
1505        opt.set(OptFlags::NEW_STREAMING, streamable);
1506
1507        self.ldf
1508            .clone()
1509            .map_python(
1510                lambda.into(),
1511                opt,
1512                schema.map(|s| Arc::new(s.0)),
1513                validate_output,
1514            )
1515            .into()
1516    }
1517
1518    fn drop(&self, columns: Vec<PyExpr>, strict: bool) -> Self {
1519        let ldf = self.ldf.clone();
1520        let columns = columns.to_exprs();
1521        if strict {
1522            ldf.drop(columns)
1523        } else {
1524            ldf.drop_no_validate(columns)
1525        }
1526        .into()
1527    }
1528
1529    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
1530        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
1531        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
1532        self.ldf.clone().cast(cast_map, strict).into()
1533    }
1534
1535    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
1536        self.ldf.clone().cast_all(dtype.inner, strict).into()
1537    }
1538
1539    fn clone(&self) -> Self {
1540        self.ldf.clone().into()
1541    }
1542
1543    fn collect_schema<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
1544        let schema = py.enter_polars(|| self.ldf.collect_schema())?;
1545
1546        let schema_dict = PyDict::new(py);
1547        schema.iter_fields().for_each(|fld| {
1548            schema_dict
1549                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
1550                .unwrap()
1551        });
1552        Ok(schema_dict)
1553    }
1554
1555    fn unnest(&self, columns: Vec<PyExpr>) -> Self {
1556        let columns = columns.to_exprs();
1557        self.ldf.clone().unnest(columns).into()
1558    }
1559
1560    fn count(&self) -> Self {
1561        let ldf = self.ldf.clone();
1562        ldf.count().into()
1563    }
1564
1565    #[cfg(feature = "merge_sorted")]
1566    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
1567        let out = self
1568            .ldf
1569            .clone()
1570            .merge_sorted(other.ldf, key)
1571            .map_err(PyPolarsErr::from)?;
1572        Ok(out.into())
1573    }
1574}
1575
1576#[cfg(feature = "parquet")]
1577impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
1578    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
1579        use polars_io::parquet::write::ParquetFieldOverwrites;
1580
1581        let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;
1582
1583        let name = PyDictMethods::get_item(&parsed, "name")?
1584            .map(|v| PyResult::Ok(v.extract::<String>()?.into()))
1585            .transpose()?;
1586        let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
1587            PyResult::Ok(ChildFieldOverwrites::None),
1588            |v| {
1589                Ok(
1590                    if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
1591                        ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
1592                    } else {
1593                        ChildFieldOverwrites::ListLike(Box::new(
1594                            v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
1595                        ))
1596                    },
1597                )
1598            },
1599        )?;
1600
1601        let field_id = PyDictMethods::get_item(&parsed, "field_id")?
1602            .map(|v| v.extract::<i32>())
1603            .transpose()?;
1604
1605        let metadata = PyDictMethods::get_item(&parsed, "metadata")?
1606            .map(|v| v.extract::<Vec<(String, Option<String>)>>())
1607            .transpose()?;
1608        let metadata = metadata.map(|v| {
1609            v.into_iter()
1610                .map(|v| MetadataKeyValue {
1611                    key: v.0.into(),
1612                    value: v.1.map(|v| v.into()),
1613                })
1614                .collect()
1615        });
1616
1617        let required = PyDictMethods::get_item(&parsed, "required")?
1618            .map(|v| v.extract::<bool>())
1619            .transpose()?;
1620
1621        Ok(Wrap(ParquetFieldOverwrites {
1622            name,
1623            children,
1624            field_id,
1625            metadata,
1626            required,
1627        }))
1628    }
1629}