polars_python/lazyframe/general.rs

use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::PathBuf;

use polars::io::{HiveOptions, RowIndex};
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::plans::ScanSources;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyList};

use super::PyLazyFrame;
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

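/// Resolve a Python scan-source object (path, open file, or buffer) into
/// `ScanSources`, also returning the first path (if any) so that cloud
/// options can be derived from it.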
fn pyobject_to_first_path_and_scan_sources(
    obj: PyObject,
) -> PyResult<(Option<PathBuf>, ScanSources)> {
    use crate::file::{get_python_scan_source_input, PythonScanSourceInput};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => {
            (Some(path.clone()), ScanSources::Paths([path].into()))
        },
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
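    /// Scan newline-delimited JSON lazily, forwarding the Python-side options
    /// (schema hints, row index, cloud settings) to `LazyJsonLineReader`.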
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
    ))]
    fn new_from_ndjson(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_func_object),
                );

            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }

            r = r.with_cloud_options(Some(cloud_options));
        }

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

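    /// Scan CSV lazily. The single-byte options (`separator`, `eol_char`,
    /// `quote_char`) are validated here, and an optional Python callback can
    /// rename columns via `with_schema_modify`.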
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
    ))]
    fn new_from_csv(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<PyObject>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_func_object),
                );
            r = r.with_cloud_options(Some(cloud_options));
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::with_gil(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "the length of the new names list must match the number of columns in the schema",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

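    /// Scan Parquet lazily, with optional hive-partitioning and cloud-storage
    /// support.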
    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        source, sources, n_rows, cache, parallel, rechunk, row_index, low_memory, cloud_options,
        credential_provider, use_statistics, hive_partitioning, schema, hive_schema,
        try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns,
    ))]
    fn new_from_parquet(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        n_rows: Option<usize>,
        cache: bool,
        parallel: Wrap<ParallelStrategy>,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        low_memory: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        use_statistics: bool,
        hive_partitioning: Option<bool>,
        schema: Option<Wrap<Schema>>,
        hive_schema: Option<Wrap<Schema>>,
        try_parse_hive_dates: bool,
        retries: usize,
        glob: bool,
        include_file_paths: Option<String>,
        allow_missing_columns: bool,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;

        let parallel = parallel.0;
        let hive_schema = hive_schema.map(|s| Arc::new(s.0));

        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let hive_options = HiveOptions {
            enabled: hive_partitioning,
            hive_start_idx: 0,
            schema: hive_schema,
            try_parse_dates: try_parse_hive_dates,
        };

        let mut args = ScanArgsParquet {
            n_rows,
            cache,
            parallel,
            rechunk,
            row_index,
            low_memory,
            cloud_options: None,
            use_statistics,
            schema: schema.map(|x| Arc::new(x.0)),
            hive_options,
            glob,
            include_file_paths: include_file_paths.map(|x| x.into()),
            allow_missing_columns,
        };

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();
            let cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            args.cloud_options = Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_func_object),
                    ),
            );
        }

        let lf = LazyFrame::scan_parquet_sources(sources, args).map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

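    /// Scan Arrow IPC files lazily, with optional hive-partitioning and
    /// cloud-storage support.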
    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (
        source, sources, n_rows, cache, rechunk, row_index, cloud_options, credential_provider,
        hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl,
        include_file_paths
    ))]
    fn new_from_ipc(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        n_rows: Option<usize>,
        cache: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        hive_partitioning: Option<bool>,
        hive_schema: Option<Wrap<Schema>>,
        try_parse_hive_dates: bool,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let hive_options = HiveOptions {
            enabled: hive_partitioning,
            hive_start_idx: 0,
            schema: hive_schema.map(|x| Arc::new(x.0)),
            try_parse_dates: try_parse_hive_dates,
        };

        let mut args = ScanArgsIpc {
            n_rows,
            cache,
            rechunk,
            row_index,
            cloud_options: None,
            hive_options,
            include_file_paths: include_file_paths.map(|x| x.into()),
        };

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            args.cloud_options = Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_func_object),
                    ),
            );
        }

        let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

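    /// Create a `LazyFrame` from a Python scan function, with the schema given
    /// as pyarrow fields that are converted to a Polars schema here.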
    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: PyObject,
        pyarrow: bool,
    ) -> PyResult<Self> {
        let schema = pyarrow_schema_to_rust(schema)?;
        Ok(LazyFrame::scan_from_python_function(schema, scan_fn, pyarrow).into())
    }

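    /// Create a `LazyFrame` from a Python scan function, with the schema given
    /// directly as (name, dtype) pairs.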
    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: PyObject,
        pyarrow: bool,
    ) -> PyResult<Self> {
        let schema = Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        );
        Ok(LazyFrame::scan_from_python_function(schema, scan_fn, pyarrow).into())
    }

    fn describe_plan(&self) -> PyResult<String> {
        self.ldf
            .describe_plan()
            .map_err(PyPolarsErr::from)
            .map_err(Into::into)
    }

    fn describe_optimized_plan(&self) -> PyResult<String> {
        self.ldf
            .describe_optimized_plan()
            .map_err(PyPolarsErr::from)
            .map_err(Into::into)
    }

    fn describe_plan_tree(&self) -> PyResult<String> {
        self.ldf
            .describe_plan_tree()
            .map_err(PyPolarsErr::from)
            .map_err(Into::into)
    }

    fn describe_optimized_plan_tree(&self) -> PyResult<String> {
        self.ldf
            .describe_optimized_plan_tree()
            .map_err(PyPolarsErr::from)
            .map_err(Into::into)
    }

    fn to_dot(&self, optimized: bool) -> PyResult<String> {
        let result = self.ldf.to_dot(optimized).map_err(PyPolarsErr::from)?;
        Ok(result)
    }

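    /// Toggle individual query optimizations. Flags whose backing crate
    /// feature is disabled are ignored.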
    fn optimization_toggle(
        &self,
        type_coercion: bool,
        type_check: bool,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        simplify_expression: bool,
        slice_pushdown: bool,
        comm_subplan_elim: bool,
        comm_subexpr_elim: bool,
        cluster_with_columns: bool,
        collapse_joins: bool,
        streaming: bool,
        _eager: bool,
        _check_order: bool,
        #[allow(unused_variables)] new_streaming: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        let mut ldf = ldf
            .with_type_coercion(type_coercion)
            .with_type_check(type_check)
            .with_predicate_pushdown(predicate_pushdown)
            .with_simplify_expr(simplify_expression)
            .with_slice_pushdown(slice_pushdown)
            .with_cluster_with_columns(cluster_with_columns)
            .with_collapse_joins(collapse_joins)
            .with_check_order(_check_order)
            ._with_eager(_eager)
            .with_projection_pushdown(projection_pushdown);

        #[cfg(feature = "streaming")]
        {
            ldf = ldf.with_streaming(streaming);
        }

        #[cfg(feature = "new_streaming")]
        {
            ldf = ldf.with_new_streaming(new_streaming);
        }

        #[cfg(feature = "cse")]
        {
            ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);
            ldf = ldf.with_comm_subexpr_elim(comm_subexpr_elim);
        }

        ldf.into()
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.cache().into()
    }

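    /// Run the query and return the result together with a timing table
    /// holding the runtime of each node.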
    fn profile(&self, py: Python) -> PyResult<(PyDataFrame, PyDataFrame)> {
        // Release the GIL during execution; UDFs that try to acquire the GIL
        // from other threads would otherwise deadlock.
        let (df, time_df) = py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.profile().map_err(PyPolarsErr::from)
        })?;
        Ok((df.into(), time_df.into()))
    }

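    /// Execute the query and materialize the result. An optional
    /// post-optimization callback (currently the "cuda" hook) may rewrite the
    /// optimized plan before execution.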
    #[pyo3(signature = (lambda_post_opt=None))]
    fn collect(&self, py: Python, lambda_post_opt: Option<PyObject>) -> PyResult<PyDataFrame> {
        // Release the GIL during execution; UDFs that try to acquire the GIL
        // from other threads would otherwise deadlock.
        let df = py.allow_threads(|| {
            let ldf = self.ldf.clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena| {
                    Python::with_gil(|py| {
                        let nt = NodeTraverser::new(
                            root,
                            std::mem::take(lp_arena),
                            std::mem::take(expr_arena),
                        );

                        // Get a copy of the arenas.
                        let arenas = nt.get_arenas();

                        // Pass the node visitor, which allows the Python callback
                        // to replace parts of the query plan.
                        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
                        lambda.call1(py, (nt,)).map_err(
                            |e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e),
                        )?;

                        // Swap the (possibly rewritten) arenas back in; `nt` is
                        // useless from this point on.
                        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
                        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

                        Ok(())
                    })
                })
            } else {
                ldf.collect()
            }
            .map_err(PyPolarsErr::from)
        })?;
        Ok(df.into())
    }

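    /// Execute the query on a thread from the Polars thread pool and hand the
    /// result (or the error) to `lambda`, without blocking the caller.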
    #[pyo3(signature = (lambda,))]
    fn collect_with_callback(&self, lambda: PyObject) {
        let ldf = self.ldf.clone();

        polars_core::POOL.spawn(move || {
            let result = ldf
                .collect()
                .map(PyDataFrame::new)
                .map_err(PyPolarsErr::from);

            Python::with_gil(|py| match result {
                Ok(df) => {
                    lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                },
                Err(err) => {
                    lambda
                        .call1(py, (PyErr::from(err),))
                        .map_err(|err| err.restore(py))
                        .ok();
                },
            });
        });
    }

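    /// Stream the query result into a Parquet file instead of materializing
    /// it in memory.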
    #[cfg(all(feature = "streaming", feature = "parquet"))]
    #[pyo3(signature = (
        path, compression, compression_level, statistics, row_group_size, data_page_size,
        maintain_order, cloud_options, credential_provider, retries
    ))]
    fn sink_parquet(
        &self,
        py: Python,
        path: PathBuf,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        maintain_order: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            maintain_order,
        };

        let cloud_options = {
            let cloud_options =
                parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object),
                    ),
            )
        };

        // Release the GIL during execution; UDFs that try to acquire the GIL
        // from other threads would otherwise deadlock.
        py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.sink_parquet(&path, options, cloud_options)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

    #[cfg(all(feature = "streaming", feature = "ipc"))]
    #[pyo3(signature = (path, compression, maintain_order, cloud_options, credential_provider, retries))]
    fn sink_ipc(
        &self,
        py: Python,
        path: PathBuf,
        compression: Option<Wrap<IpcCompression>>,
        maintain_order: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let options = IpcWriterOptions {
            compression: compression.map(|c| c.0),
            maintain_order,
        };

        #[cfg(feature = "cloud")]
        let cloud_options = {
            let cloud_options =
                parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object),
                    ),
            )
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        // Release the GIL during execution; UDFs that try to acquire the GIL
        // from other threads would otherwise deadlock.
        py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.sink_ipc(path, options, cloud_options)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

    #[cfg(all(feature = "streaming", feature = "csv"))]
    #[pyo3(signature = (
        path, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
        quote_style, maintain_order, cloud_options, credential_provider, retries
    ))]
    fn sink_csv(
        &self,
        py: Python,
        path: PathBuf,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        maintain_order: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value.unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format,
            time_format,
            datetime_format,
            float_scientific,
            float_precision,
            separator,
            quote_char,
            null: null_value,
            line_terminator,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            include_header,
            maintain_order,
            batch_size,
            serialize_options,
        };

        #[cfg(feature = "cloud")]
        let cloud_options = {
            let cloud_options =
                parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object),
                    ),
            )
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        // Release the GIL during execution; UDFs that try to acquire the GIL
        // from other threads would otherwise deadlock.
        py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.sink_csv(path, options, cloud_options)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(all(feature = "streaming", feature = "json"))]
    #[pyo3(signature = (path, maintain_order, cloud_options, credential_provider, retries))]
    fn sink_json(
        &self,
        py: Python,
        path: PathBuf,
        maintain_order: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let options = JsonWriterOptions { maintain_order };

        let cloud_options = {
            let cloud_options =
                parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object),
                    ),
            )
        };

        // Release the GIL during execution; UDFs that try to acquire the GIL
        // from other threads would otherwise deadlock.
        py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.sink_json(path, options, cloud_options)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

    fn fetch(&self, py: Python, n_rows: usize) -> PyResult<PyDataFrame> {
        let ldf = self.ldf.clone();
        let df = py.allow_threads(|| ldf.fetch(n_rows).map_err(PyPolarsErr::from))?;
        Ok(df.into())
    }

    fn filter(&mut self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.filter(predicate.inner).into()
    }

    fn select(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&mut self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

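    /// Create a rolling group-by whose windows are defined by `period` and
    /// `offset` relative to `index_column`.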
    fn rolling(
        &mut self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

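    /// Create a dynamic group-by whose windows start every `every`, span
    /// `period`, and are shifted by `offset`.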
    fn group_by_dynamic(
        &mut self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts.into_iter().map(|ldf| ldf.ldf).collect::<Vec<_>>();
        self.ldf.clone().with_context(contexts).into()
    }

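    /// As-of join: match each row to the nearest key in `other` according to
    /// `strategy`, optionally within `tolerance` and per `left_by`/`right_by`
    /// group.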
    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.clone();
        let other = other.ldf;
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| t.0.into_static()),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            }))
            .suffix(suffix)
            .finish()
            .into())
    }

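    /// General join between two LazyFrames. `coalesce=None` leaves the
    /// coalescing behaviour up to the join type.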
    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, join_nulls, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        join_nulls: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.clone();
        let other = other.ldf;
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(join_nulls)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.clone();
        let other = other.ldf;

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

    fn rename(&mut self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, column: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let column = column.to_exprs();
        ldf.explode(column).into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PyExpr>>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    #[pyo3(signature = (subset=None))]
    fn drop_nans(&self, subset: Option<Vec<PyExpr>>) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        ldf.drop_nans(subset).into()
    }

    #[pyo3(signature = (subset=None))]
    fn drop_nulls(&self, subset: Option<Vec<PyExpr>>) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        ldf.drop_nulls(subset).into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: Vec<PyExpr>,
        index: Vec<PyExpr>,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.into_iter().map(|e| e.inner.into()).collect(),
            index: index.into_iter().map(|e| e.inner.into()).collect(),
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_row_index(name, offset).into()
    }

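    /// Insert a Python UDF that maps a `DataFrame` to a `DataFrame` into the
    /// plan; the pushdown flags declare which optimizations may pass through
    /// the UDF.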
    #[pyo3(signature = (lambda, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        lambda: PyObject,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::STREAMING, streamable);

        self.ldf
            .clone()
            .map_python(
                lambda.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: Vec<PyExpr>, strict: bool) -> Self {
        let ldf = self.ldf.clone();
        let columns = columns.to_exprs();
        if strict {
            ldf.drop(columns)
        } else {
            ldf.drop_no_validate(columns)
        }
        .into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: Wrap<DataType>, strict: bool) -> Self {
        self.ldf.clone().cast_all(dtype.0, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.clone().into()
    }

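    /// Resolve the query's schema and return it as a Python dict mapping
    /// column names to dtypes.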
    fn collect_schema<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py
            .allow_threads(|| self.ldf.collect_schema())
            .map_err(PyPolarsErr::from)?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: Vec<PyExpr>) -> Self {
        let columns = columns.to_exprs();
        self.ldf.clone().unnest(columns).into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .clone()
            .merge_sorted(other.ldf, key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}