polars_python/lazyframe/
general.rs

1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3
4use either::Either;
5use polars::io::{HiveOptions, RowIndex};
6use polars::time::*;
7use polars_core::prelude::*;
8#[cfg(feature = "parquet")]
9use polars_parquet::arrow::write::StatisticsOptions;
10use polars_plan::dsl::ScanSources;
11use polars_plan::plans::{AExpr, IR};
12use polars_utils::arena::{Arena, Node};
13use polars_utils::python_function::PythonObject;
14use pyo3::exceptions::PyTypeError;
15use pyo3::prelude::*;
16use pyo3::pybacked::PyBackedStr;
17use pyo3::types::{PyDict, PyDictMethods, PyList};
18
19use super::{PyLazyFrame, PyOptFlags, SinkTarget};
20use crate::error::PyPolarsErr;
21use crate::expr::ToExprs;
22use crate::expr::datatype::PyDataTypeExpr;
23use crate::expr::selector::PySelector;
24use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
25use crate::io::PyScanOptions;
26use crate::lazyframe::visit::NodeTraverser;
27use crate::prelude::*;
28use crate::utils::{EnterPolarsExt, to_py_err};
29use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};
30
31fn pyobject_to_first_path_and_scan_sources(
32    obj: PyObject,
33) -> PyResult<(Option<PlPath>, ScanSources)> {
34    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
35    Ok(match get_python_scan_source_input(obj, false)? {
36        PythonScanSourceInput::Path(path) => {
37            (Some(path.clone()), ScanSources::Paths([path].into()))
38        },
39        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
40        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
41    })
42}
43
44fn post_opt_callback(
45    lambda: &PyObject,
46    root: Node,
47    lp_arena: &mut Arena<IR>,
48    expr_arena: &mut Arena<AExpr>,
49    duration_since_start: Option<std::time::Duration>,
50) -> PolarsResult<()> {
51    Python::with_gil(|py| {
52        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));
53
54        // Get a copy of the arenas.
55        let arenas = nt.get_arenas();
56
57        // Pass the node visitor which allows the python callback to replace parts of the query plan.
58        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
59        lambda
60            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
61            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;
62
63        // Unpack the arenas.
64        // At this point the `nt` is useless.
65
66        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
67        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());
68
69        Ok(())
70    })
71}
72
73#[pymethods]
74#[allow(clippy::should_implement_trait)]
75impl PyLazyFrame {
76    #[staticmethod]
77    #[cfg(feature = "json")]
78    #[allow(clippy::too_many_arguments)]
79    #[pyo3(signature = (
80        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
81        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
82    ))]
83    fn new_from_ndjson(
84        source: Option<PyObject>,
85        sources: Wrap<ScanSources>,
86        infer_schema_length: Option<usize>,
87        schema: Option<Wrap<Schema>>,
88        schema_overrides: Option<Wrap<Schema>>,
89        batch_size: Option<NonZeroUsize>,
90        n_rows: Option<usize>,
91        low_memory: bool,
92        rechunk: bool,
93        row_index: Option<(String, IdxSize)>,
94        ignore_errors: bool,
95        include_file_paths: Option<String>,
96        cloud_options: Option<Vec<(String, String)>>,
97        credential_provider: Option<PyObject>,
98        retries: usize,
99        file_cache_ttl: Option<u64>,
100    ) -> PyResult<Self> {
101        use cloud::credential_provider::PlCredentialProvider;
102        let row_index = row_index.map(|(name, offset)| RowIndex {
103            name: name.into(),
104            offset,
105        });
106
107        let sources = sources.0;
108        let (first_path, sources) = match source {
109            None => (sources.first_path().map(|p| p.into_owned()), sources),
110            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
111        };
112
113        let mut r = LazyJsonLineReader::new_with_sources(sources);
114
115        #[cfg(feature = "cloud")]
116        if let Some(first_path) = first_path {
117            let first_path_url = first_path.to_str();
118
119            let mut cloud_options =
120                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
121            cloud_options = cloud_options
122                .with_max_retries(retries)
123                .with_credential_provider(
124                    credential_provider.map(PlCredentialProvider::from_python_builder),
125                );
126
127            if let Some(file_cache_ttl) = file_cache_ttl {
128                cloud_options.file_cache_ttl = file_cache_ttl;
129            }
130
131            r = r.with_cloud_options(Some(cloud_options));
132        };
133
134        let lf = r
135            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
136            .with_batch_size(batch_size)
137            .with_n_rows(n_rows)
138            .low_memory(low_memory)
139            .with_rechunk(rechunk)
140            .with_schema(schema.map(|schema| Arc::new(schema.0)))
141            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
142            .with_row_index(row_index)
143            .with_ignore_errors(ignore_errors)
144            .with_include_file_paths(include_file_paths.map(|x| x.into()))
145            .finish()
146            .map_err(PyPolarsErr::from)?;
147
148        Ok(lf.into())
149    }
150
151    #[staticmethod]
152    #[cfg(feature = "csv")]
153    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
154        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
155        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
156        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
157        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
158    )
159    )]
160    fn new_from_csv(
161        source: Option<PyObject>,
162        sources: Wrap<ScanSources>,
163        separator: &str,
164        has_header: bool,
165        ignore_errors: bool,
166        skip_rows: usize,
167        skip_lines: usize,
168        n_rows: Option<usize>,
169        cache: bool,
170        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
171        low_memory: bool,
172        comment_prefix: Option<&str>,
173        quote_char: Option<&str>,
174        null_values: Option<Wrap<NullValues>>,
175        missing_utf8_is_empty_string: bool,
176        infer_schema_length: Option<usize>,
177        with_schema_modify: Option<PyObject>,
178        rechunk: bool,
179        skip_rows_after_header: usize,
180        encoding: Wrap<CsvEncoding>,
181        row_index: Option<(String, IdxSize)>,
182        try_parse_dates: bool,
183        eol_char: &str,
184        raise_if_empty: bool,
185        truncate_ragged_lines: bool,
186        decimal_comma: bool,
187        glob: bool,
188        schema: Option<Wrap<Schema>>,
189        cloud_options: Option<Vec<(String, String)>>,
190        credential_provider: Option<PyObject>,
191        retries: usize,
192        file_cache_ttl: Option<u64>,
193        include_file_paths: Option<String>,
194    ) -> PyResult<Self> {
195        #[cfg(feature = "cloud")]
196        use cloud::credential_provider::PlCredentialProvider;
197
198        let null_values = null_values.map(|w| w.0);
199        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
200        let separator = separator
201            .as_bytes()
202            .first()
203            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
204            .copied()
205            .map_err(PyPolarsErr::from)?;
206        let eol_char = eol_char
207            .as_bytes()
208            .first()
209            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
210            .copied()
211            .map_err(PyPolarsErr::from)?;
212        let row_index = row_index.map(|(name, offset)| RowIndex {
213            name: name.into(),
214            offset,
215        });
216
217        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
218            overwrite_dtype
219                .into_iter()
220                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
221                .collect::<Schema>()
222        });
223
224        let sources = sources.0;
225        let (first_path, sources) = match source {
226            None => (sources.first_path().map(|p| p.into_owned()), sources),
227            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
228        };
229
230        let mut r = LazyCsvReader::new_with_sources(sources);
231
232        #[cfg(feature = "cloud")]
233        if let Some(first_path) = first_path {
234            let first_path_url = first_path.to_str();
235
236            let mut cloud_options =
237                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
238            if let Some(file_cache_ttl) = file_cache_ttl {
239                cloud_options.file_cache_ttl = file_cache_ttl;
240            }
241            cloud_options = cloud_options
242                .with_max_retries(retries)
243                .with_credential_provider(
244                    credential_provider.map(PlCredentialProvider::from_python_builder),
245                );
246            r = r.with_cloud_options(Some(cloud_options));
247        }
248
249        let mut r = r
250            .with_infer_schema_length(infer_schema_length)
251            .with_separator(separator)
252            .with_has_header(has_header)
253            .with_ignore_errors(ignore_errors)
254            .with_skip_rows(skip_rows)
255            .with_skip_lines(skip_lines)
256            .with_n_rows(n_rows)
257            .with_cache(cache)
258            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
259            .with_schema(schema.map(|schema| Arc::new(schema.0)))
260            .with_low_memory(low_memory)
261            .with_comment_prefix(comment_prefix.map(|x| x.into()))
262            .with_quote_char(quote_char)
263            .with_eol_char(eol_char)
264            .with_rechunk(rechunk)
265            .with_skip_rows_after_header(skip_rows_after_header)
266            .with_encoding(encoding.0)
267            .with_row_index(row_index)
268            .with_try_parse_dates(try_parse_dates)
269            .with_null_values(null_values)
270            .with_missing_is_null(!missing_utf8_is_empty_string)
271            .with_truncate_ragged_lines(truncate_ragged_lines)
272            .with_decimal_comma(decimal_comma)
273            .with_glob(glob)
274            .with_raise_if_empty(raise_if_empty)
275            .with_include_file_paths(include_file_paths.map(|x| x.into()));
276
277        if let Some(lambda) = with_schema_modify {
278            let f = |schema: Schema| {
279                let iter = schema.iter_names().map(|s| s.as_str());
280                Python::with_gil(|py| {
281                    let names = PyList::new(py, iter).unwrap();
282
283                    let out = lambda.call1(py, (names,)).expect("python function failed");
284                    let new_names = out
285                        .extract::<Vec<String>>(py)
286                        .expect("python function should return List[str]");
287                    polars_ensure!(new_names.len() == schema.len(),
288                        ShapeMismatch: "The length of the new names list should be equal to or less than the original column length",
289                    );
290                    Ok(schema
291                        .iter_values()
292                        .zip(new_names)
293                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
294                        .collect())
295                })
296            };
297            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
298        }
299
300        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
301    }
302
303    #[cfg(feature = "parquet")]
304    #[staticmethod]
305    #[pyo3(signature = (
306        sources, schema, scan_options, parallel, low_memory, use_statistics
307    ))]
308    fn new_from_parquet(
309        sources: Wrap<ScanSources>,
310        schema: Option<Wrap<Schema>>,
311        scan_options: PyScanOptions,
312        parallel: Wrap<ParallelStrategy>,
313        low_memory: bool,
314        use_statistics: bool,
315    ) -> PyResult<Self> {
316        use crate::utils::to_py_err;
317
318        let parallel = parallel.0;
319
320        let options = ParquetOptions {
321            schema: schema.map(|x| Arc::new(x.0)),
322            parallel,
323            low_memory,
324            use_statistics,
325        };
326
327        let sources = sources.0;
328        let first_path = sources.first_path().map(|p| p.into_owned());
329
330        let unified_scan_args =
331            scan_options.extract_unified_scan_args(first_path.as_ref().map(|p| p.as_ref()))?;
332
333        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
334            .map_err(to_py_err)?
335            .build()
336            .into();
337
338        Ok(lf.into())
339    }
340
341    #[cfg(feature = "ipc")]
342    #[staticmethod]
343    #[pyo3(signature = (
344        source, sources, n_rows, cache, rechunk, row_index, cloud_options,credential_provider,
345        hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl,
346        include_file_paths
347    ))]
348    fn new_from_ipc(
349        source: Option<PyObject>,
350        sources: Wrap<ScanSources>,
351        n_rows: Option<usize>,
352        cache: bool,
353        rechunk: bool,
354        row_index: Option<(String, IdxSize)>,
355        cloud_options: Option<Vec<(String, String)>>,
356        credential_provider: Option<PyObject>,
357        hive_partitioning: Option<bool>,
358        hive_schema: Option<Wrap<Schema>>,
359        try_parse_hive_dates: bool,
360        retries: usize,
361        file_cache_ttl: Option<u64>,
362        include_file_paths: Option<String>,
363    ) -> PyResult<Self> {
364        #[cfg(feature = "cloud")]
365        use cloud::credential_provider::PlCredentialProvider;
366        let row_index = row_index.map(|(name, offset)| RowIndex {
367            name: name.into(),
368            offset,
369        });
370
371        let hive_options = HiveOptions {
372            enabled: hive_partitioning,
373            hive_start_idx: 0,
374            schema: hive_schema.map(|x| Arc::new(x.0)),
375            try_parse_dates: try_parse_hive_dates,
376        };
377
378        let mut args = ScanArgsIpc {
379            n_rows,
380            cache,
381            rechunk,
382            row_index,
383            cloud_options: None,
384            hive_options,
385            include_file_paths: include_file_paths.map(|x| x.into()),
386        };
387
388        let sources = sources.0;
389        let (first_path, sources) = match source {
390            None => (sources.first_path().map(|p| p.into_owned()), sources),
391            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
392        };
393
394        #[cfg(feature = "cloud")]
395        if let Some(first_path) = first_path {
396            let first_path_url = first_path.to_str();
397
398            let mut cloud_options =
399                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
400            if let Some(file_cache_ttl) = file_cache_ttl {
401                cloud_options.file_cache_ttl = file_cache_ttl;
402            }
403            args.cloud_options = Some(
404                cloud_options
405                    .with_max_retries(retries)
406                    .with_credential_provider(
407                        credential_provider.map(PlCredentialProvider::from_python_builder),
408                    ),
409            );
410        }
411
412        let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?;
413        Ok(lf.into())
414    }
415
416    #[staticmethod]
417    #[pyo3(signature = (
418        dataset_object
419    ))]
420    fn new_from_dataset_object(dataset_object: PyObject) -> PyResult<Self> {
421        let lf =
422            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
423                .into();
424
425        Ok(lf)
426    }
427
428    #[staticmethod]
429    fn scan_from_python_function_arrow_schema(
430        schema: &Bound<'_, PyList>,
431        scan_fn: PyObject,
432        pyarrow: bool,
433        validate_schema: bool,
434        is_pure: bool,
435    ) -> PyResult<Self> {
436        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);
437
438        Ok(LazyFrame::scan_from_python_function(
439            Either::Right(schema),
440            scan_fn,
441            pyarrow,
442            validate_schema,
443            is_pure,
444        )
445        .into())
446    }
447
448    #[staticmethod]
449    fn scan_from_python_function_pl_schema(
450        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
451        scan_fn: PyObject,
452        pyarrow: bool,
453        validate_schema: bool,
454        is_pure: bool,
455    ) -> PyResult<Self> {
456        let schema = Arc::new(Schema::from_iter(
457            schema
458                .into_iter()
459                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
460        ));
461        Ok(LazyFrame::scan_from_python_function(
462            Either::Right(schema),
463            scan_fn,
464            pyarrow,
465            validate_schema,
466            is_pure,
467        )
468        .into())
469    }
470
471    #[staticmethod]
472    fn scan_from_python_function_schema_function(
473        schema_fn: PyObject,
474        scan_fn: PyObject,
475        validate_schema: bool,
476        is_pure: bool,
477    ) -> PyResult<Self> {
478        Ok(LazyFrame::scan_from_python_function(
479            Either::Left(schema_fn),
480            scan_fn,
481            false,
482            validate_schema,
483            is_pure,
484        )
485        .into())
486    }
487
488    fn describe_plan(&self, py: Python) -> PyResult<String> {
489        py.enter_polars(|| self.ldf.read().describe_plan())
490    }
491
492    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
493        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
494    }
495
496    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
497        py.enter_polars(|| self.ldf.read().describe_plan_tree())
498    }
499
500    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
501        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
502    }
503
504    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
505        py.enter_polars(|| self.ldf.read().to_dot(optimized))
506    }
507
508    #[cfg(feature = "new_streaming")]
509    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
510        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
511    }
512
513    fn optimization_toggle(
514        &self,
515        type_coercion: bool,
516        type_check: bool,
517        predicate_pushdown: bool,
518        projection_pushdown: bool,
519        simplify_expression: bool,
520        slice_pushdown: bool,
521        comm_subplan_elim: bool,
522        comm_subexpr_elim: bool,
523        cluster_with_columns: bool,
524        collapse_joins: bool,
525        _eager: bool,
526        _check_order: bool,
527        #[allow(unused_variables)] new_streaming: bool,
528    ) -> Self {
529        let ldf = self.ldf.read().clone();
530        let mut ldf = ldf
531            .with_type_coercion(type_coercion)
532            .with_type_check(type_check)
533            .with_predicate_pushdown(predicate_pushdown)
534            .with_simplify_expr(simplify_expression)
535            .with_slice_pushdown(slice_pushdown)
536            .with_cluster_with_columns(cluster_with_columns)
537            .with_collapse_joins(collapse_joins)
538            .with_check_order(_check_order)
539            ._with_eager(_eager)
540            .with_projection_pushdown(projection_pushdown);
541
542        #[cfg(feature = "new_streaming")]
543        {
544            ldf = ldf.with_new_streaming(new_streaming);
545        }
546
547        #[cfg(feature = "cse")]
548        {
549            ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);
550            ldf = ldf.with_comm_subexpr_elim(comm_subexpr_elim);
551        }
552
553        ldf.into()
554    }
555
556    fn sort(
557        &self,
558        by_column: &str,
559        descending: bool,
560        nulls_last: bool,
561        maintain_order: bool,
562        multithreaded: bool,
563    ) -> Self {
564        let ldf = self.ldf.read().clone();
565        ldf.sort(
566            [by_column],
567            SortMultipleOptions {
568                descending: vec![descending],
569                nulls_last: vec![nulls_last],
570                multithreaded,
571                maintain_order,
572                limit: None,
573            },
574        )
575        .into()
576    }
577
578    fn sort_by_exprs(
579        &self,
580        by: Vec<PyExpr>,
581        descending: Vec<bool>,
582        nulls_last: Vec<bool>,
583        maintain_order: bool,
584        multithreaded: bool,
585    ) -> Self {
586        let ldf = self.ldf.read().clone();
587        let exprs = by.to_exprs();
588        ldf.sort_by_exprs(
589            exprs,
590            SortMultipleOptions {
591                descending,
592                nulls_last,
593                maintain_order,
594                multithreaded,
595                limit: None,
596            },
597        )
598        .into()
599    }
600
601    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
602        let ldf = self.ldf.read().clone();
603        let exprs = by.to_exprs();
604        ldf.top_k(
605            k,
606            exprs,
607            SortMultipleOptions::new().with_order_descending_multi(reverse),
608        )
609        .into()
610    }
611
612    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
613        let ldf = self.ldf.read().clone();
614        let exprs = by.to_exprs();
615        ldf.bottom_k(
616            k,
617            exprs,
618            SortMultipleOptions::new().with_order_descending_multi(reverse),
619        )
620        .into()
621    }
622
623    fn cache(&self) -> Self {
624        let ldf = self.ldf.read().clone();
625        ldf.cache().into()
626    }
627
628    #[pyo3(signature = (optflags))]
629    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
630        let ldf = self.ldf.read().clone();
631        ldf.with_optimizations(optflags.inner.into_inner()).into()
632    }
633
634    #[pyo3(signature = (lambda_post_opt))]
635    fn profile(
636        &self,
637        py: Python<'_>,
638        lambda_post_opt: Option<PyObject>,
639    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
640        let (df, time_df) = py.enter_polars(|| {
641            let ldf = self.ldf.read().clone();
642            if let Some(lambda) = lambda_post_opt {
643                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
644                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
645                })
646            } else {
647                ldf.profile()
648            }
649        })?;
650        Ok((df.into(), time_df.into()))
651    }
652
653    #[pyo3(signature = (engine, lambda_post_opt))]
654    fn collect(
655        &self,
656        py: Python<'_>,
657        engine: Wrap<Engine>,
658        lambda_post_opt: Option<PyObject>,
659    ) -> PyResult<PyDataFrame> {
660        py.enter_polars_df(|| {
661            let ldf = self.ldf.read().clone();
662            if let Some(lambda) = lambda_post_opt {
663                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
664                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
665                })
666            } else {
667                ldf.collect_with_engine(engine.0)
668            }
669        })
670    }
671
672    #[pyo3(signature = (engine, lambda))]
673    fn collect_with_callback(
674        &self,
675        py: Python<'_>,
676        engine: Wrap<Engine>,
677        lambda: PyObject,
678    ) -> PyResult<()> {
679        py.enter_polars_ok(|| {
680            let ldf = self.ldf.read().clone();
681
682            polars_core::POOL.spawn(move || {
683                let result = ldf
684                    .collect_with_engine(engine.0)
685                    .map(PyDataFrame::new)
686                    .map_err(PyPolarsErr::from);
687
688                Python::with_gil(|py| match result {
689                    Ok(df) => {
690                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
691                    },
692                    Err(err) => {
693                        lambda
694                            .call1(py, (PyErr::from(err),))
695                            .map_err(|err| err.restore(py))
696                            .ok();
697                    },
698                });
699            });
700        })
701    }
702
703    #[cfg(feature = "parquet")]
704    #[pyo3(signature = (
705        target, compression, compression_level, statistics, row_group_size, data_page_size,
706        cloud_options, credential_provider, retries, sink_options, metadata, field_overwrites,
707    ))]
708    fn sink_parquet(
709        &self,
710        py: Python<'_>,
711        target: SinkTarget,
712        compression: &str,
713        compression_level: Option<i32>,
714        statistics: Wrap<StatisticsOptions>,
715        row_group_size: Option<usize>,
716        data_page_size: Option<usize>,
717        cloud_options: Option<Vec<(String, String)>>,
718        credential_provider: Option<PyObject>,
719        retries: usize,
720        sink_options: Wrap<SinkOptions>,
721        metadata: Wrap<Option<KeyValueMetadata>>,
722        field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
723    ) -> PyResult<PyLazyFrame> {
724        let compression = parse_parquet_compression(compression, compression_level)?;
725
726        let options = ParquetWriteOptions {
727            compression,
728            statistics: statistics.0,
729            row_group_size,
730            data_page_size,
731            key_value_metadata: metadata.0,
732            field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
733        };
734
735        let cloud_options = match target.base_path() {
736            None => None,
737            Some(base_path) => {
738                let cloud_options =
739                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
740                Some(
741                    cloud_options
742                        .with_max_retries(retries)
743                        .with_credential_provider(
744                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
745                        ),
746                )
747            },
748        };
749
750        py.enter_polars(|| {
751            let ldf = self.ldf.read().clone();
752            match target {
753                SinkTarget::File(target) => {
754                    ldf.sink_parquet(target, options, cloud_options, sink_options.0)
755                },
756                SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned(
757                    Arc::new(partition.base_path.0),
758                    partition.file_path_cb.map(PartitionTargetCallback::Python),
759                    partition.variant,
760                    options,
761                    cloud_options,
762                    sink_options.0,
763                    partition.per_partition_sort_by,
764                    partition.finish_callback,
765                ),
766            }
767            .into()
768        })
769        .map(Into::into)
770        .map_err(Into::into)
771    }
772
773    #[cfg(feature = "ipc")]
774    #[pyo3(signature = (
775        target, compression, compat_level, cloud_options, credential_provider, retries,
776        sink_options
777    ))]
778    fn sink_ipc(
779        &self,
780        py: Python<'_>,
781        target: SinkTarget,
782        compression: Wrap<Option<IpcCompression>>,
783        compat_level: PyCompatLevel,
784        cloud_options: Option<Vec<(String, String)>>,
785        credential_provider: Option<PyObject>,
786        retries: usize,
787        sink_options: Wrap<SinkOptions>,
788    ) -> PyResult<PyLazyFrame> {
789        let options = IpcWriterOptions {
790            compression: compression.0,
791            compat_level: compat_level.0,
792            ..Default::default()
793        };
794
795        #[cfg(feature = "cloud")]
796        let cloud_options = match target.base_path() {
797            None => None,
798            Some(base_path) => {
799                let cloud_options =
800                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
801                Some(
802                    cloud_options
803                        .with_max_retries(retries)
804                        .with_credential_provider(
805                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
806                        ),
807                )
808            },
809        };
810
811        #[cfg(not(feature = "cloud"))]
812        let cloud_options = None;
813
814        py.enter_polars(|| {
815            let ldf = self.ldf.read().clone();
816            match target {
817                SinkTarget::File(target) => {
818                    ldf.sink_ipc(target, options, cloud_options, sink_options.0)
819                },
820                SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned(
821                    Arc::new(partition.base_path.0),
822                    partition.file_path_cb.map(PartitionTargetCallback::Python),
823                    partition.variant,
824                    options,
825                    cloud_options,
826                    sink_options.0,
827                    partition.per_partition_sort_by,
828                    partition.finish_callback,
829                ),
830            }
831        })
832        .map(Into::into)
833        .map_err(Into::into)
834    }
835
836    #[cfg(feature = "csv")]
837    #[pyo3(signature = (
838        target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
839        datetime_format, date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
840        quote_style, cloud_options, credential_provider, retries, sink_options
841    ))]
842    fn sink_csv(
843        &self,
844        py: Python<'_>,
845        target: SinkTarget,
846        include_bom: bool,
847        include_header: bool,
848        separator: u8,
849        line_terminator: String,
850        quote_char: u8,
851        batch_size: NonZeroUsize,
852        datetime_format: Option<String>,
853        date_format: Option<String>,
854        time_format: Option<String>,
855        float_scientific: Option<bool>,
856        float_precision: Option<usize>,
857        decimal_comma: bool,
858        null_value: Option<String>,
859        quote_style: Option<Wrap<QuoteStyle>>,
860        cloud_options: Option<Vec<(String, String)>>,
861        credential_provider: Option<PyObject>,
862        retries: usize,
863        sink_options: Wrap<SinkOptions>,
864    ) -> PyResult<PyLazyFrame> {
865        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
866        let null_value = null_value.unwrap_or(SerializeOptions::default().null);
867
868        let serialize_options = SerializeOptions {
869            date_format,
870            time_format,
871            datetime_format,
872            float_scientific,
873            float_precision,
874            decimal_comma,
875            separator,
876            quote_char,
877            null: null_value,
878            line_terminator,
879            quote_style,
880        };
881
882        let options = CsvWriterOptions {
883            include_bom,
884            include_header,
885            batch_size,
886            serialize_options,
887        };
888
889        #[cfg(feature = "cloud")]
890        let cloud_options = match target.base_path() {
891            None => None,
892            Some(base_path) => {
893                let cloud_options =
894                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
895                Some(
896                    cloud_options
897                        .with_max_retries(retries)
898                        .with_credential_provider(
899                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
900                        ),
901                )
902            },
903        };
904
905        #[cfg(not(feature = "cloud"))]
906        let cloud_options = None;
907
908        py.enter_polars(|| {
909            let ldf = self.ldf.read().clone();
910            match target {
911                SinkTarget::File(target) => {
912                    ldf.sink_csv(target, options, cloud_options, sink_options.0)
913                },
914                SinkTarget::Partition(partition) => ldf.sink_csv_partitioned(
915                    Arc::new(partition.base_path.0),
916                    partition.file_path_cb.map(PartitionTargetCallback::Python),
917                    partition.variant,
918                    options,
919                    cloud_options,
920                    sink_options.0,
921                    partition.per_partition_sort_by,
922                    partition.finish_callback,
923                ),
924            }
925        })
926        .map(Into::into)
927        .map_err(Into::into)
928    }
929
930    #[allow(clippy::too_many_arguments)]
931    #[cfg(feature = "json")]
932    #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
933    fn sink_json(
934        &self,
935        py: Python<'_>,
936        target: SinkTarget,
937        cloud_options: Option<Vec<(String, String)>>,
938        credential_provider: Option<PyObject>,
939        retries: usize,
940        sink_options: Wrap<SinkOptions>,
941    ) -> PyResult<PyLazyFrame> {
942        let options = JsonWriterOptions {};
943
944        let cloud_options = match target.base_path() {
945            None => None,
946            Some(base_path) => {
947                let cloud_options =
948                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
949                Some(
950                cloud_options
951                    .with_max_retries(retries)
952                    .with_credential_provider(
953                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
954                    ),
955            )
956            },
957        };
958
959        py.enter_polars(|| {
960            let ldf = self.ldf.read().clone();
961            match target {
962                SinkTarget::File(path) => {
963                    ldf.sink_json(path, options, cloud_options, sink_options.0)
964                },
965                SinkTarget::Partition(partition) => ldf.sink_json_partitioned(
966                    Arc::new(partition.base_path.0),
967                    partition.file_path_cb.map(PartitionTargetCallback::Python),
968                    partition.variant,
969                    options,
970                    cloud_options,
971                    sink_options.0,
972                    partition.per_partition_sort_by,
973                    partition.finish_callback,
974                ),
975            }
976        })
977        .map(Into::into)
978        .map_err(Into::into)
979    }
980
981    fn filter(&self, predicate: PyExpr) -> Self {
982        let ldf = self.ldf.read().clone();
983        ldf.filter(predicate.inner).into()
984    }
985
986    fn remove(&self, predicate: PyExpr) -> Self {
987        let ldf = self.ldf.read().clone();
988        ldf.remove(predicate.inner).into()
989    }
990
991    fn select(&self, exprs: Vec<PyExpr>) -> Self {
992        let ldf = self.ldf.read().clone();
993        let exprs = exprs.to_exprs();
994        ldf.select(exprs).into()
995    }
996
997    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
998        let ldf = self.ldf.read().clone();
999        let exprs = exprs.to_exprs();
1000        ldf.select_seq(exprs).into()
1001    }
1002
1003    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
1004        let ldf = self.ldf.read().clone();
1005        let by = by.to_exprs();
1006        let lazy_gb = if maintain_order {
1007            ldf.group_by_stable(by)
1008        } else {
1009            ldf.group_by(by)
1010        };
1011
1012        PyLazyGroupBy { lgb: Some(lazy_gb) }
1013    }
1014
1015    fn rolling(
1016        &self,
1017        index_column: PyExpr,
1018        period: &str,
1019        offset: &str,
1020        closed: Wrap<ClosedWindow>,
1021        by: Vec<PyExpr>,
1022    ) -> PyResult<PyLazyGroupBy> {
1023        let closed_window = closed.0;
1024        let ldf = self.ldf.read().clone();
1025        let by = by
1026            .into_iter()
1027            .map(|pyexpr| pyexpr.inner)
1028            .collect::<Vec<_>>();
1029        let lazy_gb = ldf.rolling(
1030            index_column.inner,
1031            by,
1032            RollingGroupOptions {
1033                index_column: "".into(),
1034                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
1035                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
1036                closed_window,
1037            },
1038        );
1039
1040        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
1041    }
1042
1043    fn group_by_dynamic(
1044        &self,
1045        index_column: PyExpr,
1046        every: &str,
1047        period: &str,
1048        offset: &str,
1049        label: Wrap<Label>,
1050        include_boundaries: bool,
1051        closed: Wrap<ClosedWindow>,
1052        group_by: Vec<PyExpr>,
1053        start_by: Wrap<StartBy>,
1054    ) -> PyResult<PyLazyGroupBy> {
1055        let closed_window = closed.0;
1056        let group_by = group_by
1057            .into_iter()
1058            .map(|pyexpr| pyexpr.inner)
1059            .collect::<Vec<_>>();
1060        let ldf = self.ldf.read().clone();
1061        let lazy_gb = ldf.group_by_dynamic(
1062            index_column.inner,
1063            group_by,
1064            DynamicGroupOptions {
1065                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
1066                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
1067                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
1068                label: label.0,
1069                include_boundaries,
1070                closed_window,
1071                start_by: start_by.0,
1072                ..Default::default()
1073            },
1074        );
1075
1076        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
1077    }
1078
1079    fn with_context(&self, contexts: Vec<Self>) -> Self {
1080        let contexts = contexts
1081            .into_iter()
1082            .map(|ldf| ldf.ldf.into_inner())
1083            .collect::<Vec<_>>();
1084        self.ldf.read().clone().with_context(contexts).into()
1085    }
1086
1087    #[cfg(feature = "asof_join")]
1088    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
1089    fn join_asof(
1090        &self,
1091        other: Self,
1092        left_on: PyExpr,
1093        right_on: PyExpr,
1094        left_by: Option<Vec<PyBackedStr>>,
1095        right_by: Option<Vec<PyBackedStr>>,
1096        allow_parallel: bool,
1097        force_parallel: bool,
1098        suffix: String,
1099        strategy: Wrap<AsofStrategy>,
1100        tolerance: Option<Wrap<AnyValue<'_>>>,
1101        tolerance_str: Option<String>,
1102        coalesce: bool,
1103        allow_eq: bool,
1104        check_sortedness: bool,
1105    ) -> PyResult<Self> {
1106        let coalesce = if coalesce {
1107            JoinCoalesce::CoalesceColumns
1108        } else {
1109            JoinCoalesce::KeepColumns
1110        };
1111        let ldf = self.ldf.read().clone();
1112        let other = other.ldf.into_inner();
1113        let left_on = left_on.inner;
1114        let right_on = right_on.inner;
1115        Ok(ldf
1116            .join_builder()
1117            .with(other)
1118            .left_on([left_on])
1119            .right_on([right_on])
1120            .allow_parallel(allow_parallel)
1121            .force_parallel(force_parallel)
1122            .coalesce(coalesce)
1123            .how(JoinType::AsOf(Box::new(AsOfOptions {
1124                strategy: strategy.0,
1125                left_by: left_by.map(strings_to_pl_smallstr),
1126                right_by: right_by.map(strings_to_pl_smallstr),
1127                tolerance: tolerance.map(|t| {
1128                    let av = t.0.into_static();
1129                    let dtype = av.dtype();
1130                    Scalar::new(dtype, av)
1131                }),
1132                tolerance_str: tolerance_str.map(|s| s.into()),
1133                allow_eq,
1134                check_sortedness,
1135            })))
1136            .suffix(suffix)
1137            .finish()
1138            .into())
1139    }
1140
1141    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
1142    fn join(
1143        &self,
1144        other: Self,
1145        left_on: Vec<PyExpr>,
1146        right_on: Vec<PyExpr>,
1147        allow_parallel: bool,
1148        force_parallel: bool,
1149        nulls_equal: bool,
1150        how: Wrap<JoinType>,
1151        suffix: String,
1152        validate: Wrap<JoinValidation>,
1153        maintain_order: Wrap<MaintainOrderJoin>,
1154        coalesce: Option<bool>,
1155    ) -> PyResult<Self> {
1156        let coalesce = match coalesce {
1157            None => JoinCoalesce::JoinSpecific,
1158            Some(true) => JoinCoalesce::CoalesceColumns,
1159            Some(false) => JoinCoalesce::KeepColumns,
1160        };
1161        let ldf = self.ldf.read().clone();
1162        let other = other.ldf.into_inner();
1163        let left_on = left_on
1164            .into_iter()
1165            .map(|pyexpr| pyexpr.inner)
1166            .collect::<Vec<_>>();
1167        let right_on = right_on
1168            .into_iter()
1169            .map(|pyexpr| pyexpr.inner)
1170            .collect::<Vec<_>>();
1171
1172        Ok(ldf
1173            .join_builder()
1174            .with(other)
1175            .left_on(left_on)
1176            .right_on(right_on)
1177            .allow_parallel(allow_parallel)
1178            .force_parallel(force_parallel)
1179            .join_nulls(nulls_equal)
1180            .how(how.0)
1181            .suffix(suffix)
1182            .validate(validate.0)
1183            .coalesce(coalesce)
1184            .maintain_order(maintain_order.0)
1185            .finish()
1186            .into())
1187    }
1188
1189    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
1190        let ldf = self.ldf.read().clone();
1191        let other = other.ldf.into_inner();
1192
1193        let predicates = predicates.to_exprs();
1194
1195        Ok(ldf
1196            .join_builder()
1197            .with(other)
1198            .suffix(suffix)
1199            .join_where(predicates)
1200            .into())
1201    }
1202
1203    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
1204        let ldf = self.ldf.read().clone();
1205        ldf.with_columns(exprs.to_exprs()).into()
1206    }
1207
1208    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
1209        let ldf = self.ldf.read().clone();
1210        ldf.with_columns_seq(exprs.to_exprs()).into()
1211    }
1212
1213    fn match_to_schema<'py>(
1214        &self,
1215        schema: Wrap<Schema>,
1216        missing_columns: &Bound<'py, PyAny>,
1217        missing_struct_fields: &Bound<'py, PyAny>,
1218        extra_columns: Wrap<ExtraColumnsPolicy>,
1219        extra_struct_fields: &Bound<'py, PyAny>,
1220        integer_cast: &Bound<'py, PyAny>,
1221        float_cast: &Bound<'py, PyAny>,
1222    ) -> PyResult<Self> {
1223        fn parse_missing_columns<'py>(
1224            schema: &Schema,
1225            missing_columns: &Bound<'py, PyAny>,
1226        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
1227            let mut out = Vec::with_capacity(schema.len());
1228            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
1229                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1230            } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
1231                out.extend(std::iter::repeat_n(
1232                    MissingColumnsPolicyOrExpr::Raise,
1233                    schema.len(),
1234                ));
1235                for (key, value) in dict.iter() {
1236                    let key = key.extract::<String>()?;
1237                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
1238                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1239                }
1240            } else {
1241                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
1242            }
1243            Ok(out)
1244        }
1245        fn parse_missing_struct_fields<'py>(
1246            schema: &Schema,
1247            missing_struct_fields: &Bound<'py, PyAny>,
1248        ) -> PyResult<Vec<MissingColumnsPolicy>> {
1249            let mut out = Vec::with_capacity(schema.len());
1250            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
1251                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1252            } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
1253                out.extend(std::iter::repeat_n(
1254                    MissingColumnsPolicy::Raise,
1255                    schema.len(),
1256                ));
1257                for (key, value) in dict.iter() {
1258                    let key = key.extract::<String>()?;
1259                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
1260                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1261                }
1262            } else {
1263                return Err(PyTypeError::new_err(
1264                    "Invalid value for `missing_struct_fields`",
1265                ));
1266            }
1267            Ok(out)
1268        }
1269        fn parse_extra_struct_fields<'py>(
1270            schema: &Schema,
1271            extra_struct_fields: &Bound<'py, PyAny>,
1272        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
1273            let mut out = Vec::with_capacity(schema.len());
1274            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
1275                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1276            } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
1277                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
1278                for (key, value) in dict.iter() {
1279                    let key = key.extract::<String>()?;
1280                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
1281                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1282                }
1283            } else {
1284                return Err(PyTypeError::new_err(
1285                    "Invalid value for `extra_struct_fields`",
1286                ));
1287            }
1288            Ok(out)
1289        }
1290        fn parse_cast<'py>(
1291            schema: &Schema,
1292            cast: &Bound<'py, PyAny>,
1293        ) -> PyResult<Vec<UpcastOrForbid>> {
1294            let mut out = Vec::with_capacity(schema.len());
1295            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
1296                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1297            } else if let Ok(dict) = cast.downcast::<PyDict>() {
1298                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
1299                for (key, value) in dict.iter() {
1300                    let key = key.extract::<String>()?;
1301                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
1302                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1303                }
1304            } else {
1305                return Err(PyTypeError::new_err(
1306                    "Invalid value for `integer_cast` / `float_cast`",
1307                ));
1308            }
1309            Ok(out)
1310        }
1311
1312        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
1313        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
1314        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
1315        let integer_cast = parse_cast(&schema.0, integer_cast)?;
1316        let float_cast = parse_cast(&schema.0, float_cast)?;
1317
1318        let per_column = (0..schema.0.len())
1319            .map(|i| MatchToSchemaPerColumn {
1320                missing_columns: missing_columns[i].clone(),
1321                missing_struct_fields: missing_struct_fields[i],
1322                extra_struct_fields: extra_struct_fields[i],
1323                integer_cast: integer_cast[i],
1324                float_cast: float_cast[i],
1325            })
1326            .collect();
1327
1328        let ldf = self.ldf.read().clone();
1329        Ok(ldf
1330            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
1331            .into())
1332    }
1333
1334    fn pipe_with_schema(&self, callback: PyObject) -> Self {
1335        let ldf = self.ldf.read().clone();
1336        let function = PythonObject(callback);
1337        ldf.pipe_with_schema(PlanCallback::new_python(function))
1338            .into()
1339    }
1340
1341    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
1342        let ldf = self.ldf.read().clone();
1343        ldf.rename(existing, new, strict).into()
1344    }
1345
1346    fn reverse(&self) -> Self {
1347        let ldf = self.ldf.read().clone();
1348        ldf.reverse().into()
1349    }
1350
1351    #[pyo3(signature = (n, fill_value=None))]
1352    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
1353        let lf = self.ldf.read().clone();
1354        let out = match fill_value {
1355            Some(v) => lf.shift_and_fill(n.inner, v.inner),
1356            None => lf.shift(n.inner),
1357        };
1358        out.into()
1359    }
1360
1361    fn fill_nan(&self, fill_value: PyExpr) -> Self {
1362        let ldf = self.ldf.read().clone();
1363        ldf.fill_nan(fill_value.inner).into()
1364    }
1365
1366    fn min(&self) -> Self {
1367        let ldf = self.ldf.read().clone();
1368        let out = ldf.min();
1369        out.into()
1370    }
1371
1372    fn max(&self) -> Self {
1373        let ldf = self.ldf.read().clone();
1374        let out = ldf.max();
1375        out.into()
1376    }
1377
1378    fn sum(&self) -> Self {
1379        let ldf = self.ldf.read().clone();
1380        let out = ldf.sum();
1381        out.into()
1382    }
1383
1384    fn mean(&self) -> Self {
1385        let ldf = self.ldf.read().clone();
1386        let out = ldf.mean();
1387        out.into()
1388    }
1389
1390    fn std(&self, ddof: u8) -> Self {
1391        let ldf = self.ldf.read().clone();
1392        let out = ldf.std(ddof);
1393        out.into()
1394    }
1395
1396    fn var(&self, ddof: u8) -> Self {
1397        let ldf = self.ldf.read().clone();
1398        let out = ldf.var(ddof);
1399        out.into()
1400    }
1401
1402    fn median(&self) -> Self {
1403        let ldf = self.ldf.read().clone();
1404        let out = ldf.median();
1405        out.into()
1406    }
1407
1408    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
1409        let ldf = self.ldf.read().clone();
1410        let out = ldf.quantile(quantile.inner, interpolation.0);
1411        out.into()
1412    }
1413
1414    fn explode(&self, subset: PySelector) -> Self {
1415        self.ldf.read().clone().explode(subset.inner).into()
1416    }
1417
1418    fn null_count(&self) -> Self {
1419        let ldf = self.ldf.read().clone();
1420        ldf.null_count().into()
1421    }
1422
1423    #[pyo3(signature = (maintain_order, subset, keep))]
1424    fn unique(
1425        &self,
1426        maintain_order: bool,
1427        subset: Option<PySelector>,
1428        keep: Wrap<UniqueKeepStrategy>,
1429    ) -> Self {
1430        let ldf = self.ldf.read().clone();
1431        let subset = subset.map(|e| e.inner);
1432        match maintain_order {
1433            true => ldf.unique_stable_generic(subset, keep.0),
1434            false => ldf.unique_generic(subset, keep.0),
1435        }
1436        .into()
1437    }
1438
1439    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
1440        self.ldf
1441            .read()
1442            .clone()
1443            .drop_nans(subset.map(|e| e.inner))
1444            .into()
1445    }
1446
1447    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
1448        self.ldf
1449            .read()
1450            .clone()
1451            .drop_nulls(subset.map(|e| e.inner))
1452            .into()
1453    }
1454
1455    #[pyo3(signature = (offset, len=None))]
1456    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
1457        let ldf = self.ldf.read().clone();
1458        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
1459    }
1460
1461    fn tail(&self, n: IdxSize) -> Self {
1462        let ldf = self.ldf.read().clone();
1463        ldf.tail(n).into()
1464    }
1465
1466    #[cfg(feature = "pivot")]
1467    #[pyo3(signature = (on, index, value_name, variable_name))]
1468    fn unpivot(
1469        &self,
1470        on: PySelector,
1471        index: PySelector,
1472        value_name: Option<String>,
1473        variable_name: Option<String>,
1474    ) -> Self {
1475        let args = UnpivotArgsDSL {
1476            on: on.inner,
1477            index: index.inner,
1478            value_name: value_name.map(|s| s.into()),
1479            variable_name: variable_name.map(|s| s.into()),
1480        };
1481
1482        let ldf = self.ldf.read().clone();
1483        ldf.unpivot(args).into()
1484    }
1485
1486    #[pyo3(signature = (name, offset=None))]
1487    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
1488        let ldf = self.ldf.read().clone();
1489        ldf.with_row_index(name, offset).into()
1490    }
1491
1492    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
1493    fn map_batches(
1494        &self,
1495        function: PyObject,
1496        predicate_pushdown: bool,
1497        projection_pushdown: bool,
1498        slice_pushdown: bool,
1499        streamable: bool,
1500        schema: Option<Wrap<Schema>>,
1501        validate_output: bool,
1502    ) -> Self {
1503        let mut opt = OptFlags::default();
1504        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
1505        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
1506        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
1507        opt.set(OptFlags::NEW_STREAMING, streamable);
1508
1509        self.ldf
1510            .read()
1511            .clone()
1512            .map_python(
1513                function.into(),
1514                opt,
1515                schema.map(|s| Arc::new(s.0)),
1516                validate_output,
1517            )
1518            .into()
1519    }
1520
1521    fn drop(&self, columns: PySelector) -> Self {
1522        self.ldf.read().clone().drop(columns.inner).into()
1523    }
1524
1525    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
1526        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
1527        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
1528        self.ldf.read().clone().cast(cast_map, strict).into()
1529    }
1530
1531    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
1532        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
1533    }
1534
1535    fn clone(&self) -> Self {
1536        self.ldf.read().clone().into()
1537    }
1538
1539    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
1540        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;
1541
1542        let schema_dict = PyDict::new(py);
1543        schema.iter_fields().for_each(|fld| {
1544            schema_dict
1545                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
1546                .unwrap()
1547        });
1548        Ok(schema_dict)
1549    }
1550
1551    fn unnest(&self, columns: PySelector) -> Self {
1552        self.ldf.read().clone().unnest(columns.inner).into()
1553    }
1554
1555    fn count(&self) -> Self {
1556        let ldf = self.ldf.read().clone();
1557        ldf.count().into()
1558    }
1559
1560    #[cfg(feature = "merge_sorted")]
1561    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
1562        let out = self
1563            .ldf
1564            .read()
1565            .clone()
1566            .merge_sorted(other.ldf.into_inner(), key)
1567            .map_err(PyPolarsErr::from)?;
1568        Ok(out.into())
1569    }
1570}
1571
1572#[cfg(feature = "parquet")]
1573impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
1574    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
1575        use polars_io::parquet::write::ParquetFieldOverwrites;
1576
1577        let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;
1578
1579        let name = PyDictMethods::get_item(&parsed, "name")?
1580            .map(|v| PyResult::Ok(v.extract::<String>()?.into()))
1581            .transpose()?;
1582        let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
1583            PyResult::Ok(ChildFieldOverwrites::None),
1584            |v| {
1585                Ok(
1586                    if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
1587                        ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
1588                    } else {
1589                        ChildFieldOverwrites::ListLike(Box::new(
1590                            v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
1591                        ))
1592                    },
1593                )
1594            },
1595        )?;
1596
1597        let field_id = PyDictMethods::get_item(&parsed, "field_id")?
1598            .map(|v| v.extract::<i32>())
1599            .transpose()?;
1600
1601        let metadata = PyDictMethods::get_item(&parsed, "metadata")?
1602            .map(|v| v.extract::<Vec<(String, Option<String>)>>())
1603            .transpose()?;
1604        let metadata = metadata.map(|v| {
1605            v.into_iter()
1606                .map(|v| MetadataKeyValue {
1607                    key: v.0.into(),
1608                    value: v.1.map(|v| v.into()),
1609                })
1610                .collect()
1611        });
1612
1613        let required = PyDictMethods::get_item(&parsed, "required")?
1614            .map(|v| v.extract::<bool>())
1615            .transpose()?;
1616
1617        Ok(Wrap(ParquetFieldOverwrites {
1618            name,
1619            children,
1620            field_id,
1621            metadata,
1622            required,
1623        }))
1624    }
1625}