polars_python/lazyframe/general.rs

use std::collections::HashMap;
use std::num::NonZeroUsize;

use either::Either;
use polars::io::RowIndex;
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, HintIR, IR, Sorted};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags, SinkTarget};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::expr::datatype::PyDataTypeExpr;
use crate::expr::selector::PySelector;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::io::PyScanOptions;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

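/// Resolve a Python scan-source object (path, file object, or in-memory
/// buffer) into `ScanSources`, also returning the first path (when present)
/// so callers can derive cloud options from it.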
fn pyobject_to_first_path_and_scan_sources(
    obj: Py<PyAny>,
) -> PyResult<(Option<PlPath>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => (
            Some(path.clone()),
            ScanSources::Paths(FromIterator::from_iter([path])),
        ),
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

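/// Run a Python post-optimization callback (currently the "cuda"/GPU engine
/// hook) against the optimized plan. The IR and expression arenas are moved
/// into a `NodeTraverser`, handed to Python so the callback may rewrite parts
/// of the plan, and then swapped back into the caller's arenas.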
fn post_opt_callback(
    lambda: &Py<PyAny>,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::attach(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

        // Get a copy of the arenas.
        let arenas = nt.get_arenas();

        // Pass the node visitor which allows the python callback to replace parts of the query plan.
        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

        // Unpack the arenas.
        // At this point the `nt` is useless.

        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
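    /// Construct a `LazyFrame` that scans newline-delimited JSON, mirroring
    /// `pl.scan_ndjson` on the Python side.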
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
    ))]
    fn new_from_ndjson(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options =
                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );

            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }

            r = r.with_cloud_options(Some(cloud_options));
        };

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

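    /// Construct a `LazyFrame` that scans CSV data, mirroring `pl.scan_csv`.
    /// Single-byte options (`separator`, `eol_char`, `quote_char`) are
    /// validated here before being handed to `LazyCsvReader`.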
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
    )
    )]
    fn new_from_csv(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<Py<PyAny>>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options =
                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );
            r = r.with_cloud_options(Some(cloud_options));
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::attach(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "The length of the new names list should be equal to the original column length",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

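    /// Construct a `LazyFrame` that scans Parquet files; shared scan options
    /// (row index, cloud config, etc.) arrive pre-bundled in `PyScanOptions`.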
    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        sources, schema, scan_options, parallel, low_memory, use_statistics
    ))]
    fn new_from_parquet(
        sources: Wrap<ScanSources>,
        schema: Option<Wrap<Schema>>,
        scan_options: PyScanOptions,
        parallel: Wrap<ParallelStrategy>,
        low_memory: bool,
        use_statistics: bool,
    ) -> PyResult<Self> {
        use crate::utils::to_py_err;

        let parallel = parallel.0;

        let options = ParquetOptions {
            schema: schema.map(|x| Arc::new(x.0)),
            parallel,
            low_memory,
            use_statistics,
        };

        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().map(|p| p.as_ref()))?;

        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
            .map_err(to_py_err)?
            .build()
            .into();

        Ok(lf.into())
    }

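    /// Construct a `LazyFrame` that scans Arrow IPC files. An explicit
    /// `file_cache_ttl` overrides the TTL in the unified cloud options.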
    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (sources, scan_options, file_cache_ttl))]
    fn new_from_ipc(
        sources: Wrap<ScanSources>,
        scan_options: PyScanOptions,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        let options = IpcScanOptions;

        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let mut unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().map(|p| p.as_ref()))?;

        if let Some(file_cache_ttl) = file_cache_ttl {
            unified_scan_args
                .cloud_options
                .get_or_insert_default()
                .file_cache_ttl = file_cache_ttl;
        }

        let lf = LazyFrame::scan_ipc_sources(sources, options, unified_scan_args)
            .map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

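    /// Construct a `LazyFrame` that reads the sources as raw lines, with
    /// `name` used for the resulting column.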
    #[cfg(feature = "scan_lines")]
    #[staticmethod]
    #[pyo3(signature = (sources, scan_options, name, file_cache_ttl))]
    fn new_from_scan_lines(
        sources: Wrap<ScanSources>,
        scan_options: PyScanOptions,
        name: PyBackedStr,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let mut unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().map(|p| p.as_ref()))?;

        if let Some(file_cache_ttl) = file_cache_ttl {
            unified_scan_args
                .cloud_options
                .get_or_insert_default()
                .file_cache_ttl = file_cache_ttl;
        }

        let dsl: DslPlan = DslBuilder::scan_lines(sources, unified_scan_args, (&*name).into())
            .map_err(to_py_err)?
            .build();
        let lf: LazyFrame = dsl.into();

        Ok(lf.into())
    }

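    /// Construct a `LazyFrame` backed by a Python dataset object; the object
    /// is stored in the plan and queried lazily.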
    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: Py<PyAny>) -> PyResult<Self> {
        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

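    // The `scan_from_python_function_*` constructors below wrap a Python
    // callable as a scan source; they differ only in how the schema is
    // supplied (a pyarrow schema, a Polars schema, or a schema-producing
    // callable).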
    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: Py<PyAny>,
        scan_fn: Py<PyAny>,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
            is_pure,
        )
        .into())
    }

    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_optimizations(optflags.inner.into_inner()).into()
    }

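    /// Run the query and collect per-node timings, returning the result
    /// `DataFrame` together with a timing `DataFrame`. An optional post-opt
    /// callback may rewrite the optimized plan first (see `post_opt_callback`).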
    #[pyo3(signature = (lambda_post_opt))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

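    /// Execute the query on the requested engine and return the materialized
    /// `DataFrame`. When a post-opt callback is given, the callback-driven
    /// collect path is used and the engine argument is bypassed.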
    #[pyo3(signature = (engine, lambda_post_opt))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

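    /// Execute the query asynchronously on the thread pool; `lambda` is
    /// called back from a worker thread with either the resulting
    /// `DataFrame` or the error.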
    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: Py<PyAny>,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.read().clone();

            polars_core::POOL.spawn(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::attach(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

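    // The `sink_*` methods below stream the query result straight to storage
    // instead of materializing it; each resolves cloud options from the sink
    // target and supports both single-file and partitioned targets.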
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        target, compression, compression_level, statistics, row_group_size, data_page_size,
        cloud_options, credential_provider, retries, sink_options, metadata, field_overwrites,
    ))]
    fn sink_parquet(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
        metadata: Wrap<Option<KeyValueMetadata>>,
        field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
    ) -> PyResult<PyLazyFrame> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            key_value_metadata: metadata.0,
            field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
        };

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_parquet(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
            .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        target, compression, compat_level, cloud_options, credential_provider, retries,
        sink_options
    ))]
    fn sink_ipc(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = IpcWriterOptions {
            compression: compression.0,
            compat_level: compat_level.0,
            ..Default::default()
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_ipc(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
        quote_style, cloud_options, credential_provider, retries, sink_options
    ))]
    fn sink_csv(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        decimal_comma: bool,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value.unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format,
            time_format,
            datetime_format,
            float_scientific,
            float_precision,
            decimal_comma,
            separator,
            quote_char,
            null: null_value,
            line_terminator,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            include_header,
            batch_size,
            serialize_options,
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_csv(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_csv_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "json")]
    #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
    fn sink_json(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = JsonWriterOptions {};

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(path) => {
                    ldf.sink_json(path, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_json_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

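    /// Stream the query result into a Python callback one batch at a time,
    /// instead of writing to a file.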
    #[pyo3(signature = (function, maintain_order, chunk_size))]
    pub fn sink_batches(
        &self,
        py: Python<'_>,
        function: Py<PyAny>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
    ) -> PyResult<PyLazyFrame> {
        let ldf = self.ldf.read().clone();
        py.enter_polars(|| {
            ldf.sink_batches(
                PlanCallback::new_python(PythonObject(function)),
                maintain_order,
                chunk_size,
            )
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    fn filter(&self, predicate: PyExpr) -> Self {
        self.ldf.read().clone().filter(predicate.inner).into()
    }

    fn remove(&self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.read().clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

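    /// Create a rolling (window-per-row) group-by over `index_column`;
    /// `period` and `offset` are parsed from Polars duration strings such as
    /// "3d12h".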
    fn rolling(
        &self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.read().clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.read().clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts
            .into_iter()
            .map(|ldf| ldf.ldf.into_inner())
            .collect::<Vec<_>>();
        self.ldf.read().clone().with_context(contexts).into()
    }

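    /// As-of join: match each left row to the nearest right key according to
    /// `strategy`, optionally within `tolerance` and within `left_by` /
    /// `right_by` groups.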
    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(Box::new(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| {
                    let av = t.0.into_static();
                    let dtype = av.dtype();
                    Scalar::new(dtype, av)
                }),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            })))
            .suffix(suffix)
            .finish()
            .into())
    }

    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

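    /// Align this frame to a target schema. Each per-column policy argument
    /// accepts either a single policy applied to every column or a dict
    /// mapping column names to policies; the nested parsers below normalize
    /// both forms into one policy per schema column.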
    fn match_to_schema<'py>(
        &self,
        schema: Wrap<Schema>,
        missing_columns: &Bound<'py, PyAny>,
        missing_struct_fields: &Bound<'py, PyAny>,
        extra_columns: Wrap<ExtraColumnsPolicy>,
        extra_struct_fields: &Bound<'py, PyAny>,
        integer_cast: &Bound<'py, PyAny>,
        float_cast: &Bound<'py, PyAny>,
    ) -> PyResult<Self> {
        fn parse_missing_columns<'py>(
            schema: &Schema,
            missing_columns: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicyOrExpr::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
            }
            Ok(out)
        }
        fn parse_missing_struct_fields<'py>(
            schema: &Schema,
            missing_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicy::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `missing_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_extra_struct_fields<'py>(
            schema: &Schema,
            extra_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `extra_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_cast<'py>(
            schema: &Schema,
            cast: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<UpcastOrForbid>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = cast.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `integer_cast` / `float_cast`",
                ));
            }
            Ok(out)
        }

        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
        let integer_cast = parse_cast(&schema.0, integer_cast)?;
        let float_cast = parse_cast(&schema.0, float_cast)?;

        let per_column = (0..schema.0.len())
            .map(|i| MatchToSchemaPerColumn {
                missing_columns: missing_columns[i].clone(),
                missing_struct_fields: missing_struct_fields[i],
                extra_struct_fields: extra_struct_fields[i],
                integer_cast: integer_cast[i],
                float_cast: float_cast[i],
            })
            .collect();

        let ldf = self.ldf.read().clone();
        Ok(ldf
            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
            .into())
    }

    fn pipe_with_schema(&self, callback: Py<PyAny>) -> Self {
        let ldf = self.ldf.read().clone();
        let function = PythonObject(callback);
        ldf.pipe_with_schema(PlanCallback::new_python(function))
            .into()
    }

    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.read().clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, subset: PySelector) -> Self {
        self.ldf.read().clone().explode(subset.inner).into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<PySelector>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let subset = subset.map(|e| e.inner);
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nans(subset.map(|e| e.inner))
            .into()
    }

    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nulls(subset.map(|e| e.inner))
            .into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: PySelector,
        index: PySelector,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.inner,
            index: index.inner,
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.read().clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_row_index(name, offset).into()
    }

    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        function: Py<PyAny>,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::NEW_STREAMING, streamable);

        self.ldf
            .read()
            .clone()
            .map_python(
                function.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: PySelector) -> Self {
        self.ldf.read().clone().drop(columns.inner).into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.read().clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.read().clone().into()
    }

    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: PySelector, separator: Option<&str>) -> Self {
        self.ldf
            .read()
            .clone()
            .unnest(columns.inner, separator.map(PlSmallStr::from_str))
            .into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .read()
            .clone()
            .merge_sorted(other.ldf.into_inner(), key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }

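    /// Attach a sortedness hint to the plan (backing `set_sorted` on the
    /// Python side). `descending` and `nulls_last` must each have either one
    /// entry (broadcast to all columns) or one entry per column.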
    fn hint_sorted(
        &self,
        columns: Vec<String>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
    ) -> PyResult<Self> {
        if columns.len() != descending.len() && descending.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same amount of `columns` as `descending` values.",
            ));
        }
        if columns.len() != nulls_last.len() && nulls_last.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same amount of `columns` as `nulls_last` values.",
            ));
        }

        let mut sorted = columns
            .iter()
            .map(|c| Sorted {
                column: PlSmallStr::from_str(c.as_str()),
                descending: false,
                nulls_last: false,
            })
            .collect::<Vec<_>>();

        if !columns.is_empty() {
            if descending.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(descending)
                    .for_each(|(s, d)| s.descending = d);
            } else if descending[0] {
                sorted.iter_mut().for_each(|s| s.descending = true);
            }

            if nulls_last.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(nulls_last)
                    .for_each(|(s, d)| s.nulls_last = d);
            } else if nulls_last[0] {
                sorted.iter_mut().for_each(|s| s.nulls_last = true);
            }
        }

        let out = self
            .ldf
            .read()
            .clone()
            .hint(HintIR::Sorted(sorted.into()))
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

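/// Extract `ParquetFieldOverwrites` from a Python dict with the optional keys
/// `name`, `children` (a list of overwrites for struct fields, or a single
/// dict for list-like fields), `field_id`, `metadata`, and `required`.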
#[cfg(feature = "parquet")]
impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        use polars_io::parquet::write::ParquetFieldOverwrites;

        let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;

        let name = PyDictMethods::get_item(&parsed, "name")?
            .map(|v| PyResult::Ok(v.extract::<String>()?.into()))
            .transpose()?;
        let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
            PyResult::Ok(ChildFieldOverwrites::None),
            |v| {
                Ok(
                    if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
                        ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
                    } else {
                        ChildFieldOverwrites::ListLike(Box::new(
                            v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
                        ))
                    },
                )
            },
        )?;

        let field_id = PyDictMethods::get_item(&parsed, "field_id")?
            .map(|v| v.extract::<i32>())
            .transpose()?;

        let metadata = PyDictMethods::get_item(&parsed, "metadata")?
            .map(|v| v.extract::<Vec<(String, Option<String>)>>())
            .transpose()?;
        let metadata = metadata.map(|v| {
            v.into_iter()
                .map(|v| MetadataKeyValue {
                    key: v.0.into(),
                    value: v.1.map(|v| v.into()),
                })
                .collect()
        });

        let required = PyDictMethods::get_item(&parsed, "required")?
            .map(|v| v.extract::<bool>())
            .transpose()?;

        Ok(Wrap(ParquetFieldOverwrites {
            name,
            children,
            field_id,
            metadata,
            required,
        }))
    }
}