Skip to main content

polars_python/lazyframe/
general.rs

1use std::collections::HashMap;
2use std::ffi::CString;
3use std::num::NonZeroUsize;
4
5use arrow::ffi::export_iterator;
6use either::Either;
7use parking_lot::Mutex;
8#[cfg(feature = "pivot")]
9use polars::frame::PivotColumnNaming;
10use polars::io::RowIndex;
11use polars::prelude::iceberg_sink_state::IcebergSinkState;
12use polars::time::*;
13use polars_core::prelude::*;
14use polars_core::query_result::QueryResult;
15#[cfg(feature = "parquet")]
16use polars_parquet::arrow::write::StatisticsOptions;
17use polars_plan::dsl::ScanSources;
18use polars_plan::plans::{AExpr, HintIR, IR, Sorted};
19use polars_utils::arena::{Arena, Node};
20use polars_utils::python_function::PythonObject;
21use pyo3::exceptions::{PyTypeError, PyValueError};
22use pyo3::prelude::*;
23use pyo3::pybacked::PyBackedStr;
24use pyo3::types::{PyCapsule, PyDict, PyDictMethods, PyList};
25
26use super::{PyLazyFrame, PyOptFlags};
27use crate::error::PyPolarsErr;
28use crate::expr::ToExprs;
29use crate::expr::datatype::PyDataTypeExpr;
30use crate::expr::selector::PySelector;
31use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
32#[cfg(feature = "json")]
33use crate::io::cloud_options::OptPyCloudOptions;
34use crate::io::scan_options::PyScanOptions;
35use crate::io::sink_options::PySinkOptions;
36use crate::io::sink_output::PyFileSinkDestination;
37use crate::lazyframe::visit::NodeTraverser;
38use crate::prelude::*;
39use crate::utils::{EnterPolarsExt, to_py_err};
40use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};
41
42fn pyobject_to_first_path_and_scan_sources(
43    obj: Py<PyAny>,
44) -> PyResult<(Option<PlRefPath>, ScanSources)> {
45    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
46    Ok(match get_python_scan_source_input(obj, false)? {
47        PythonScanSourceInput::Path(path) => (
48            Some(path.clone()),
49            ScanSources::Paths(FromIterator::from_iter([path])),
50        ),
51        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
52        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
53    })
54}
55
56fn post_opt_callback(
57    lambda: &Py<PyAny>,
58    root: Node,
59    lp_arena: &mut Arena<IR>,
60    expr_arena: &mut Arena<AExpr>,
61    duration_since_start: Option<std::time::Duration>,
62) -> PolarsResult<()> {
63    Python::attach(|py| {
64        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));
65
66        // Get a copy of the arenas.
67        let arenas = nt.get_arenas();
68
69        // Pass the node visitor which allows the python callback to replace parts of the query plan.
70        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
71        lambda
72            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
73            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;
74
75        // Unpack the arenas.
76        // At this point the `nt` is useless.
77
78        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
79        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());
80
81        Ok(())
82    })
83}
84
85#[pymethods]
86#[allow(clippy::should_implement_trait)]
87impl PyLazyFrame {
88    #[staticmethod]
89    #[cfg(feature = "json")]
90    #[allow(clippy::too_many_arguments)]
91    #[pyo3(signature = (
92        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
93        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider
94    ))]
95    fn new_from_ndjson(
96        source: Option<Py<PyAny>>,
97        sources: Wrap<ScanSources>,
98        infer_schema_length: Option<usize>,
99        schema: Option<Wrap<Schema>>,
100        schema_overrides: Option<Wrap<Schema>>,
101        batch_size: Option<NonZeroUsize>,
102        n_rows: Option<usize>,
103        low_memory: bool,
104        rechunk: bool,
105        row_index: Option<(String, IdxSize)>,
106        ignore_errors: bool,
107        include_file_paths: Option<String>,
108        cloud_options: OptPyCloudOptions,
109        credential_provider: Option<Py<PyAny>>,
110    ) -> PyResult<Self> {
111        let row_index = row_index.map(|(name, offset)| RowIndex {
112            name: name.into(),
113            offset,
114        });
115
116        let sources = sources.0;
117        let (first_path, sources) = match source {
118            None => (sources.first_path().cloned(), sources),
119            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
120        };
121
122        let mut r = LazyJsonLineReader::new_with_sources(sources);
123
124        if let Some(first_path) = first_path {
125            let first_path_url = first_path.as_str();
126
127            let cloud_options = cloud_options.extract_opt_cloud_options(
128                CloudScheme::from_path(first_path_url),
129                credential_provider,
130            )?;
131
132            r = r.with_cloud_options(cloud_options);
133        };
134
135        let lf = r
136            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
137            .with_batch_size(batch_size)
138            .with_n_rows(n_rows)
139            .low_memory(low_memory)
140            .with_rechunk(rechunk)
141            .with_schema(schema.map(|schema| Arc::new(schema.0)))
142            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
143            .with_row_index(row_index)
144            .with_ignore_errors(ignore_errors)
145            .with_include_file_paths(include_file_paths.map(|x| x.into()))
146            .finish()
147            .map_err(PyPolarsErr::from)?;
148
149        Ok(lf.into())
150    }
151
152    #[staticmethod]
153    #[cfg(feature = "csv")]
154    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
155        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
156        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
157        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
158        cloud_options, credential_provider, include_file_paths, missing_columns
159    )
160    )]
161    fn new_from_csv(
162        source: Option<Py<PyAny>>,
163        sources: Wrap<ScanSources>,
164        separator: &str,
165        has_header: bool,
166        ignore_errors: bool,
167        skip_rows: usize,
168        skip_lines: usize,
169        n_rows: Option<usize>,
170        cache: bool,
171        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
172        low_memory: bool,
173        comment_prefix: Option<&str>,
174        quote_char: Option<&str>,
175        null_values: Option<Wrap<NullValues>>,
176        missing_utf8_is_empty_string: bool,
177        infer_schema_length: Option<usize>,
178        with_schema_modify: Option<Py<PyAny>>,
179        rechunk: bool,
180        skip_rows_after_header: usize,
181        encoding: Wrap<CsvEncoding>,
182        row_index: Option<(String, IdxSize)>,
183        try_parse_dates: bool,
184        eol_char: &str,
185        raise_if_empty: bool,
186        truncate_ragged_lines: bool,
187        decimal_comma: bool,
188        glob: bool,
189        schema: Option<Wrap<Schema>>,
190        cloud_options: OptPyCloudOptions,
191        credential_provider: Option<Py<PyAny>>,
192        include_file_paths: Option<String>,
193        missing_columns: Option<Wrap<MissingColumnsPolicy>>,
194    ) -> PyResult<Self> {
195        let null_values = null_values.map(|w| w.0);
196        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
197        let separator = separator
198            .as_bytes()
199            .first()
200            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
201            .copied()
202            .map_err(PyPolarsErr::from)?;
203        let eol_char = eol_char
204            .as_bytes()
205            .first()
206            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
207            .copied()
208            .map_err(PyPolarsErr::from)?;
209        let row_index = row_index.map(|(name, offset)| RowIndex {
210            name: name.into(),
211            offset,
212        });
213
214        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
215            overwrite_dtype
216                .into_iter()
217                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
218                .collect::<Schema>()
219        });
220
221        let sources = sources.0;
222        let (first_path, sources) = match source {
223            None => (sources.first_path().cloned(), sources),
224            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
225        };
226
227        let mut r = LazyCsvReader::new_with_sources(sources);
228
229        if let Some(first_path) = first_path {
230            let first_path_url = first_path.as_str();
231            let cloud_options = cloud_options.extract_opt_cloud_options(
232                CloudScheme::from_path(first_path_url),
233                credential_provider,
234            )?;
235            r = r.with_cloud_options(cloud_options);
236        }
237
238        let mut r = r
239            .with_infer_schema_length(infer_schema_length)
240            .with_separator(separator)
241            .with_has_header(has_header)
242            .with_ignore_errors(ignore_errors)
243            .with_skip_rows(skip_rows)
244            .with_skip_lines(skip_lines)
245            .with_n_rows(n_rows)
246            .with_cache(cache)
247            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
248            .with_schema(schema.map(|schema| Arc::new(schema.0)))
249            .with_low_memory(low_memory)
250            .with_comment_prefix(comment_prefix.map(|x| x.into()))
251            .with_quote_char(quote_char)
252            .with_eol_char(eol_char)
253            .with_rechunk(rechunk)
254            .with_skip_rows_after_header(skip_rows_after_header)
255            .with_encoding(encoding.0)
256            .with_row_index(row_index)
257            .with_try_parse_dates(try_parse_dates)
258            .with_null_values(null_values)
259            .with_missing_is_null(!missing_utf8_is_empty_string)
260            .with_truncate_ragged_lines(truncate_ragged_lines)
261            .with_decimal_comma(decimal_comma)
262            .with_glob(glob)
263            .with_raise_if_empty(raise_if_empty)
264            .with_include_file_paths(include_file_paths.map(|x| x.into()))
265            .with_missing_columns_policy(missing_columns.map(|x| x.0));
266
267        if let Some(lambda) = with_schema_modify {
268            let f = |schema: Schema| {
269                let iter = schema.iter_names().map(|s| s.as_str());
270                Python::attach(|py| {
271                    let names = PyList::new(py, iter).unwrap();
272
273                    let out = lambda.call1(py, (names,)).expect("python function failed");
274                    let new_names = out
275                        .extract::<Vec<String>>(py)
276                        .expect("python function should return List[str]");
277                    polars_ensure!(new_names.len() == schema.len(),
278                        ShapeMismatch: "The length of the new names list should be equal to or less than the original column length",
279                    );
280                    Ok(schema
281                        .iter_values()
282                        .zip(new_names)
283                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
284                        .collect())
285                })
286            };
287            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
288        }
289
290        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
291    }
292
293    #[cfg(feature = "parquet")]
294    #[staticmethod]
295    #[pyo3(signature = (
296        sources, schema, scan_options, parallel, low_memory, use_statistics
297    ))]
298    fn new_from_parquet(
299        sources: Wrap<ScanSources>,
300        schema: Option<Wrap<Schema>>,
301        scan_options: PyScanOptions,
302        parallel: Wrap<ParallelStrategy>,
303        low_memory: bool,
304        use_statistics: bool,
305    ) -> PyResult<Self> {
306        use crate::utils::to_py_err;
307
308        let parallel = parallel.0;
309
310        let options = ParquetOptions {
311            schema: schema.map(|x| Arc::new(x.0)),
312            parallel,
313            low_memory,
314            use_statistics,
315        };
316
317        let sources = sources.0;
318        let first_path = sources.first_path();
319
320        let unified_scan_args =
321            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;
322
323        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
324            .map_err(to_py_err)?
325            .build()
326            .into();
327
328        Ok(lf.into())
329    }
330
331    #[cfg(feature = "ipc")]
332    #[staticmethod]
333    #[pyo3(signature = (sources, record_batch_statistics, scan_options))]
334    fn new_from_ipc(
335        sources: Wrap<ScanSources>,
336        record_batch_statistics: bool,
337        scan_options: PyScanOptions,
338    ) -> PyResult<Self> {
339        let options = IpcScanOptions {
340            record_batch_statistics,
341            checked: Default::default(),
342        };
343
344        let sources = sources.0;
345        let first_path = sources.first_path().cloned();
346
347        let unified_scan_args =
348            scan_options.extract_unified_scan_args(first_path.as_ref().and_then(|x| x.scheme()))?;
349
350        let lf = LazyFrame::scan_ipc_sources(sources, options, unified_scan_args)
351            .map_err(PyPolarsErr::from)?;
352        Ok(lf.into())
353    }
354
355    #[cfg(feature = "scan_lines")]
356    #[staticmethod]
357    #[pyo3(signature = (sources, scan_options, name))]
358    fn new_from_scan_lines(
359        sources: Wrap<ScanSources>,
360        scan_options: PyScanOptions,
361        name: PyBackedStr,
362    ) -> PyResult<Self> {
363        let sources = sources.0;
364        let first_path = sources.first_path();
365
366        let unified_scan_args =
367            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;
368
369        let dsl: DslPlan = DslBuilder::scan_lines(sources, unified_scan_args, (&*name).into())
370            .map_err(to_py_err)?
371            .build();
372        let lf: LazyFrame = dsl.into();
373
374        Ok(lf.into())
375    }
376
377    #[cfg(feature = "scan_lines")]
378    #[staticmethod]
379    #[pyo3(signature = (sources, scan_options, name))]
380    fn new_from_expand_paths(
381        sources: Wrap<ScanSources>,
382        scan_options: PyScanOptions,
383        name: PyBackedStr,
384    ) -> PyResult<Self> {
385        let sources = sources.0;
386        let first_path = sources.first_path();
387
388        let unified_scan_args =
389            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;
390
391        let dsl: DslPlan = DslBuilder::expand_paths(sources, unified_scan_args, (&*name).into())
392            .map_err(to_py_err)?
393            .build();
394        let lf: LazyFrame = dsl.into();
395
396        Ok(lf.into())
397    }
398
399    #[staticmethod]
400    #[pyo3(signature = (
401        dataset_object
402    ))]
403    fn new_from_dataset_object(dataset_object: Py<PyAny>) -> PyResult<Self> {
404        let lf =
405            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
406                .into();
407
408        Ok(lf)
409    }
410
411    #[staticmethod]
412    fn scan_from_python_function_arrow_schema(
413        schema: &Bound<'_, PyList>,
414        scan_fn: Py<PyAny>,
415        pyarrow: bool,
416        validate_schema: bool,
417        is_pure: bool,
418    ) -> PyResult<Self> {
419        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);
420
421        Ok(LazyFrame::scan_from_python_function(
422            Either::Right(schema),
423            scan_fn,
424            pyarrow,
425            validate_schema,
426            is_pure,
427        )
428        .into())
429    }
430
431    #[staticmethod]
432    fn scan_from_python_function_pl_schema(
433        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
434        scan_fn: Py<PyAny>,
435        pyarrow: bool,
436        validate_schema: bool,
437        is_pure: bool,
438    ) -> PyResult<Self> {
439        let schema = Arc::new(Schema::from_iter(
440            schema
441                .into_iter()
442                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
443        ));
444        Ok(LazyFrame::scan_from_python_function(
445            Either::Right(schema),
446            scan_fn,
447            pyarrow,
448            validate_schema,
449            is_pure,
450        )
451        .into())
452    }
453
454    #[staticmethod]
455    fn scan_from_python_function_schema_function(
456        schema_fn: Py<PyAny>,
457        scan_fn: Py<PyAny>,
458        validate_schema: bool,
459        is_pure: bool,
460    ) -> PyResult<Self> {
461        Ok(LazyFrame::scan_from_python_function(
462            Either::Left(schema_fn),
463            scan_fn,
464            false,
465            validate_schema,
466            is_pure,
467        )
468        .into())
469    }
470
471    fn describe_plan(&self, py: Python) -> PyResult<String> {
472        py.enter_polars(|| self.ldf.read().describe_plan())
473    }
474
475    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
476        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
477    }
478
479    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
480        py.enter_polars(|| self.ldf.read().describe_plan_tree())
481    }
482
483    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
484        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
485    }
486
487    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
488        py.enter_polars(|| self.ldf.read().to_dot(optimized))
489    }
490
491    #[cfg(feature = "streaming")]
492    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
493        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
494    }
495
496    fn sort(
497        &self,
498        by_column: &str,
499        descending: bool,
500        nulls_last: bool,
501        maintain_order: bool,
502        multithreaded: bool,
503    ) -> Self {
504        let ldf = self.ldf.read().clone();
505        ldf.sort(
506            [by_column],
507            SortMultipleOptions {
508                descending: vec![descending],
509                nulls_last: vec![nulls_last],
510                multithreaded,
511                maintain_order,
512                limit: None,
513            },
514        )
515        .into()
516    }
517
518    fn sort_by_exprs(
519        &self,
520        by: Vec<PyExpr>,
521        descending: Vec<bool>,
522        nulls_last: Vec<bool>,
523        maintain_order: bool,
524        multithreaded: bool,
525    ) -> Self {
526        let ldf = self.ldf.read().clone();
527        let exprs = by.to_exprs();
528        ldf.sort_by_exprs(
529            exprs,
530            SortMultipleOptions {
531                descending,
532                nulls_last,
533                maintain_order,
534                multithreaded,
535                limit: None,
536            },
537        )
538        .into()
539    }
540
541    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
542        let ldf = self.ldf.read().clone();
543        let exprs = by.to_exprs();
544        ldf.top_k(
545            k,
546            exprs,
547            SortMultipleOptions::new().with_order_descending_multi(reverse),
548        )
549        .into()
550    }
551
552    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
553        let ldf = self.ldf.read().clone();
554        let exprs = by.to_exprs();
555        ldf.bottom_k(
556            k,
557            exprs,
558            SortMultipleOptions::new().with_order_descending_multi(reverse),
559        )
560        .into()
561    }
562
563    fn cache(&self) -> Self {
564        let ldf = self.ldf.read().clone();
565        ldf.cache().into()
566    }
567
568    #[pyo3(signature = (optflags))]
569    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
570        let ldf = self.ldf.read().clone();
571        ldf.with_optimizations(optflags.inner.into_inner()).into()
572    }
573
574    #[pyo3(signature = (lambda_post_opt))]
575    fn profile(
576        &self,
577        py: Python<'_>,
578        lambda_post_opt: Option<Py<PyAny>>,
579    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
580        let (df, time_df) = py.enter_polars(|| {
581            let ldf = self.ldf.read().clone();
582            if let Some(lambda) = lambda_post_opt {
583                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
584                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
585                })
586            } else {
587                ldf.profile()
588            }
589        })?;
590        Ok((df.into(), time_df.into()))
591    }
592
593    #[pyo3(signature = (engine, lambda_post_opt))]
594    fn collect(
595        &self,
596        py: Python<'_>,
597        engine: Wrap<Engine>,
598        lambda_post_opt: Option<Py<PyAny>>,
599    ) -> PyResult<PyDataFrame> {
600        py.enter_polars_df(|| {
601            let ldf = self.ldf.read().clone();
602            if let Some(lambda) = lambda_post_opt {
603                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
604                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
605                })
606            } else {
607                ldf.collect_with_engine(engine.0).map(|r| match r {
608                    QueryResult::Single(df) => df,
609                    // TODO: Should return query results
610                    QueryResult::Multiple(_) => DataFrame::empty(),
611                })
612            }
613        })
614    }
615
616    #[cfg(feature = "async")]
617    #[pyo3(signature = (engine, lambda))]
618    fn collect_with_callback(
619        &self,
620        py: Python<'_>,
621        engine: Wrap<Engine>,
622        lambda: Py<PyAny>,
623    ) -> PyResult<()> {
624        py.enter_polars_ok(|| {
625            let ldf = self.ldf.read().clone();
626
627            // We use a tokio spawn_blocking here as it has a high blocking
628            // thread pool limit.
629            polars_core::runtime::ASYNC.spawn_blocking(move || {
630                let result = ldf
631                    .collect_with_engine(engine.0)
632                    .map(|r| match r {
633                        QueryResult::Single(df) => df,
634                        // TODO: Should return query results
635                        QueryResult::Multiple(_) => DataFrame::empty(),
636                    })
637                    .map(PyDataFrame::new)
638                    .map_err(PyPolarsErr::from);
639
640                Python::attach(|py| match result {
641                    Ok(df) => {
642                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
643                    },
644                    Err(err) => {
645                        lambda
646                            .call1(py, (PyErr::from(err),))
647                            .map_err(|err| err.restore(py))
648                            .ok();
649                    },
650                });
651            });
652        })
653    }
654
655    #[cfg(feature = "async")]
656    fn collect_batches(
657        &self,
658        py: Python<'_>,
659        engine: Wrap<Engine>,
660        maintain_order: bool,
661        chunk_size: Option<NonZeroUsize>,
662        lazy: bool,
663    ) -> PyResult<PyCollectBatches> {
664        py.enter_polars(|| {
665            let ldf = self.ldf.read().clone();
666
667            let collect_batches = ldf
668                .clone()
669                .collect_batches(engine.0, maintain_order, chunk_size, lazy)
670                .map_err(PyPolarsErr::from)?;
671
672            PyResult::Ok(PyCollectBatches {
673                inner: Arc::new(Mutex::new(collect_batches)),
674                ldf,
675            })
676        })
677    }
678
679    #[cfg(feature = "parquet")]
680    #[pyo3(signature = (
681        target, sink_options, compression, compression_level, statistics, row_group_size, data_page_size,
682        metadata, arrow_schema
683    ))]
684    fn sink_parquet(
685        &self,
686        py: Python<'_>,
687        target: PyFileSinkDestination,
688        sink_options: PySinkOptions,
689        compression: &str,
690        compression_level: Option<i32>,
691        statistics: Wrap<StatisticsOptions>,
692        row_group_size: Option<usize>,
693        data_page_size: Option<usize>,
694        metadata: Wrap<Option<KeyValueMetadata>>,
695        arrow_schema: Option<Wrap<ArrowSchema>>,
696    ) -> PyResult<PyLazyFrame> {
697        let compression = parse_parquet_compression(compression, compression_level)?;
698
699        let options = ParquetWriteOptions {
700            compression,
701            statistics: statistics.0,
702            row_group_size,
703            data_page_size,
704            key_value_metadata: metadata.0,
705            arrow_schema: arrow_schema.map(|x| Arc::new(x.0)),
706            compat_level: None,
707        };
708
709        let target = target.extract_file_sink_destination()?;
710        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
711
712        py.enter_polars(|| {
713            self.ldf
714                .read()
715                .clone()
716                .sink(
717                    target,
718                    FileWriteFormat::Parquet(Arc::new(options)),
719                    unified_sink_args,
720                )
721                .into()
722        })
723        .map(Into::into)
724        .map_err(Into::into)
725    }
726
727    #[cfg(feature = "ipc")]
728    #[pyo3(signature = (
729        target, sink_options, compression, compat_level, record_batch_size, record_batch_statistics
730    ))]
731    fn sink_ipc(
732        &self,
733        py: Python<'_>,
734        target: PyFileSinkDestination,
735        sink_options: PySinkOptions,
736        compression: Wrap<Option<IpcCompression>>,
737        compat_level: PyCompatLevel,
738        record_batch_size: Option<usize>,
739        record_batch_statistics: bool,
740    ) -> PyResult<PyLazyFrame> {
741        let options = IpcWriterOptions {
742            compression: compression.0,
743            compat_level: compat_level.0,
744            record_batch_size,
745            record_batch_statistics,
746        };
747
748        let target = target.extract_file_sink_destination()?;
749        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
750
751        py.enter_polars(|| {
752            self.ldf
753                .read()
754                .clone()
755                .sink(target, FileWriteFormat::Ipc(options), unified_sink_args)
756                .into()
757        })
758        .map(Into::into)
759        .map_err(Into::into)
760    }
761
762    #[cfg(feature = "csv")]
763    #[pyo3(signature = (
764        target, sink_options, include_bom, compression, compression_level, check_extension,
765        include_header, separator, line_terminator, quote_char, batch_size, datetime_format,
766        date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
767        quote_style
768    ))]
769    fn sink_csv(
770        &self,
771        py: Python<'_>,
772        target: PyFileSinkDestination,
773        sink_options: PySinkOptions,
774        include_bom: bool,
775        compression: &str,
776        compression_level: Option<u32>,
777        check_extension: bool,
778        include_header: bool,
779        separator: u8,
780        line_terminator: Wrap<PlSmallStr>,
781        quote_char: u8,
782        batch_size: NonZeroUsize,
783        datetime_format: Option<Wrap<PlSmallStr>>,
784        date_format: Option<Wrap<PlSmallStr>>,
785        time_format: Option<Wrap<PlSmallStr>>,
786        float_scientific: Option<bool>,
787        float_precision: Option<usize>,
788        decimal_comma: bool,
789        null_value: Option<Wrap<PlSmallStr>>,
790        quote_style: Option<Wrap<QuoteStyle>>,
791    ) -> PyResult<PyLazyFrame> {
792        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
793        let null_value = null_value
794            .map(|x| x.0)
795            .unwrap_or(SerializeOptions::default().null);
796
797        let serialize_options = SerializeOptions {
798            date_format: date_format.map(|x| x.0),
799            time_format: time_format.map(|x| x.0),
800            datetime_format: datetime_format.map(|x| x.0),
801            float_scientific,
802            float_precision,
803            decimal_comma,
804            separator,
805            quote_char,
806            null: null_value,
807            line_terminator: line_terminator.0,
808            quote_style,
809        };
810
811        let options = CsvWriterOptions {
812            include_bom,
813            compression: ExternalCompression::try_from(compression, compression_level)
814                .map_err(PyPolarsErr::from)?,
815            check_extension,
816            include_header,
817            batch_size,
818            serialize_options: serialize_options.into(),
819        };
820
821        let target = target.extract_file_sink_destination()?;
822        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
823
824        py.enter_polars(|| {
825            self.ldf
826                .read()
827                .clone()
828                .sink(target, FileWriteFormat::Csv(options), unified_sink_args)
829                .into()
830        })
831        .map(Into::into)
832        .map_err(Into::into)
833    }
834
835    #[allow(clippy::too_many_arguments)]
836    #[cfg(feature = "json")]
837    #[pyo3(signature = (target, compression, compression_level, check_extension, sink_options))]
838    fn sink_ndjson(
839        &self,
840        py: Python<'_>,
841        target: PyFileSinkDestination,
842        compression: &str,
843        compression_level: Option<u32>,
844        check_extension: bool,
845        sink_options: PySinkOptions,
846    ) -> PyResult<PyLazyFrame> {
847        let options = NDJsonWriterOptions {
848            compression: ExternalCompression::try_from(compression, compression_level)
849                .map_err(PyPolarsErr::from)?,
850            check_extension,
851        };
852
853        let target = target.extract_file_sink_destination()?;
854        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
855
856        py.enter_polars(|| {
857            self.ldf
858                .read()
859                .clone()
860                .sink(target, FileWriteFormat::NDJson(options), unified_sink_args)
861                .into()
862        })
863        .map(Into::into)
864        .map_err(Into::into)
865    }
866
867    #[pyo3(signature = (function, maintain_order, chunk_size))]
868    pub fn sink_batches(
869        &self,
870        py: Python<'_>,
871        function: Py<PyAny>,
872        maintain_order: bool,
873        chunk_size: Option<NonZeroUsize>,
874    ) -> PyResult<PyLazyFrame> {
875        let ldf = self.ldf.read().clone();
876        py.enter_polars(|| {
877            ldf.sink_batches(
878                PlanCallback::new_python(PythonObject(function)),
879                maintain_order,
880                chunk_size,
881            )
882        })
883        .map(Into::into)
884        .map_err(Into::into)
885    }
886
887    pub fn sink_iceberg(&self, py: Python<'_>, sink_state_obj: Py<PyAny>) -> PyResult<PyLazyFrame> {
888        let sink_state: IcebergSinkState = sink_state_obj.extract(py)?;
889        let mut ldf = { self.ldf.read().clone() };
890
891        ldf.logical_plan = DslPlan::Sink {
892            input: Arc::new(ldf.logical_plan),
893            payload: SinkType::Iceberg(sink_state),
894        };
895
896        Ok(ldf.into())
897    }
898
899    fn filter(&self, predicate: PyExpr) -> Self {
900        self.ldf.read().clone().filter(predicate.inner).into()
901    }
902
903    fn remove(&self, predicate: PyExpr) -> Self {
904        let ldf = self.ldf.read().clone();
905        ldf.remove(predicate.inner).into()
906    }
907
908    fn select(&self, exprs: Vec<PyExpr>) -> Self {
909        let ldf = self.ldf.read().clone();
910        let exprs = exprs.to_exprs();
911        ldf.select(exprs).into()
912    }
913
914    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
915        let ldf = self.ldf.read().clone();
916        let exprs = exprs.to_exprs();
917        ldf.select_seq(exprs).into()
918    }
919
920    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
921        let ldf = self.ldf.read().clone();
922        let by = by.to_exprs();
923        let lazy_gb = if maintain_order {
924            ldf.group_by_stable(by)
925        } else {
926            ldf.group_by(by)
927        };
928
929        PyLazyGroupBy { lgb: Some(lazy_gb) }
930    }
931
932    fn rolling(
933        &self,
934        index_column: PyExpr,
935        period: &str,
936        offset: &str,
937        closed: Wrap<ClosedWindow>,
938        by: Vec<PyExpr>,
939    ) -> PyResult<PyLazyGroupBy> {
940        let closed_window = closed.0;
941        let ldf = self.ldf.read().clone();
942        let by = by
943            .into_iter()
944            .map(|pyexpr| pyexpr.inner)
945            .collect::<Vec<_>>();
946        let lazy_gb = ldf.rolling(
947            index_column.inner,
948            by,
949            RollingGroupOptions {
950                index_column: "".into(),
951                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
952                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
953                closed_window,
954            },
955        );
956
957        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
958    }
959
960    fn group_by_dynamic(
961        &self,
962        index_column: PyExpr,
963        every: &str,
964        period: &str,
965        offset: &str,
966        label: Wrap<Label>,
967        include_boundaries: bool,
968        closed: Wrap<ClosedWindow>,
969        group_by: Vec<PyExpr>,
970        start_by: Wrap<StartBy>,
971    ) -> PyResult<PyLazyGroupBy> {
972        let closed_window = closed.0;
973        let group_by = group_by
974            .into_iter()
975            .map(|pyexpr| pyexpr.inner)
976            .collect::<Vec<_>>();
977        let ldf = self.ldf.read().clone();
978        let lazy_gb = ldf.group_by_dynamic(
979            index_column.inner,
980            group_by,
981            DynamicGroupOptions {
982                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
983                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
984                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
985                label: label.0,
986                include_boundaries,
987                closed_window,
988                start_by: start_by.0,
989                ..Default::default()
990            },
991        );
992
993        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
994    }
995
996    fn with_context(&self, contexts: Vec<Self>) -> Self {
997        let contexts = contexts
998            .into_iter()
999            .map(|ldf| ldf.ldf.into_inner())
1000            .collect::<Vec<_>>();
1001        self.ldf.read().clone().with_context(contexts).into()
1002    }
1003
1004    #[cfg(feature = "asof_join")]
1005    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
1006    fn join_asof(
1007        &self,
1008        other: Self,
1009        left_on: PyExpr,
1010        right_on: PyExpr,
1011        left_by: Option<Vec<PyBackedStr>>,
1012        right_by: Option<Vec<PyBackedStr>>,
1013        allow_parallel: bool,
1014        force_parallel: bool,
1015        suffix: String,
1016        strategy: Wrap<AsofStrategy>,
1017        tolerance: Option<Wrap<AnyValue<'_>>>,
1018        tolerance_str: Option<String>,
1019        coalesce: bool,
1020        allow_eq: bool,
1021        check_sortedness: bool,
1022    ) -> PyResult<Self> {
1023        let coalesce = if coalesce {
1024            JoinCoalesce::CoalesceColumns
1025        } else {
1026            JoinCoalesce::KeepColumns
1027        };
1028        let ldf = self.ldf.read().clone();
1029        let other = other.ldf.into_inner();
1030        let left_on = left_on.inner;
1031        let right_on = right_on.inner;
1032        Ok(ldf
1033            .join_builder()
1034            .with(other)
1035            .left_on([left_on])
1036            .right_on([right_on])
1037            .allow_parallel(allow_parallel)
1038            .force_parallel(force_parallel)
1039            .coalesce(coalesce)
1040            .how(JoinType::AsOf(Box::new(AsOfOptions {
1041                strategy: strategy.0,
1042                left_by: left_by.map(strings_to_pl_smallstr),
1043                right_by: right_by.map(strings_to_pl_smallstr),
1044                tolerance: tolerance.map(|t| {
1045                    let av = t.0.into_static();
1046                    let dtype = av.dtype();
1047                    Scalar::new(dtype, av)
1048                }),
1049                tolerance_str: tolerance_str.map(|s| s.into()),
1050                allow_eq,
1051                check_sortedness,
1052            })))
1053            .suffix(suffix)
1054            .finish()
1055            .into())
1056    }
1057
1058    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
1059    fn join(
1060        &self,
1061        other: Self,
1062        left_on: Vec<PyExpr>,
1063        right_on: Vec<PyExpr>,
1064        allow_parallel: bool,
1065        force_parallel: bool,
1066        nulls_equal: bool,
1067        how: Wrap<JoinType>,
1068        suffix: String,
1069        validate: Wrap<JoinValidation>,
1070        maintain_order: Wrap<MaintainOrderJoin>,
1071        coalesce: Option<bool>,
1072    ) -> PyResult<Self> {
1073        let coalesce = match coalesce {
1074            None => JoinCoalesce::JoinSpecific,
1075            Some(true) => JoinCoalesce::CoalesceColumns,
1076            Some(false) => JoinCoalesce::KeepColumns,
1077        };
1078        let ldf = self.ldf.read().clone();
1079        let other = other.ldf.into_inner();
1080        let left_on = left_on
1081            .into_iter()
1082            .map(|pyexpr| pyexpr.inner)
1083            .collect::<Vec<_>>();
1084        let right_on = right_on
1085            .into_iter()
1086            .map(|pyexpr| pyexpr.inner)
1087            .collect::<Vec<_>>();
1088
1089        Ok(ldf
1090            .join_builder()
1091            .with(other)
1092            .left_on(left_on)
1093            .right_on(right_on)
1094            .allow_parallel(allow_parallel)
1095            .force_parallel(force_parallel)
1096            .join_nulls(nulls_equal)
1097            .how(how.0)
1098            .suffix(suffix)
1099            .validate(validate.0)
1100            .coalesce(coalesce)
1101            .maintain_order(maintain_order.0)
1102            .finish()
1103            .into())
1104    }
1105
1106    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
1107        let ldf = self.ldf.read().clone();
1108        let other = other.ldf.into_inner();
1109
1110        let predicates = predicates.to_exprs();
1111
1112        Ok(ldf
1113            .join_builder()
1114            .with(other)
1115            .suffix(suffix)
1116            .join_where(predicates)
1117            .into())
1118    }
1119
1120    fn gather(&self, idxs: Self, null_on_oob: bool) -> Self {
1121        let ldf = self.ldf.read().clone();
1122        let idxs = idxs.ldf.into_inner();
1123        ldf.gather(idxs, null_on_oob).into()
1124    }
1125
1126    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
1127        let ldf = self.ldf.read().clone();
1128        ldf.with_columns(exprs.to_exprs()).into()
1129    }
1130
1131    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
1132        let ldf = self.ldf.read().clone();
1133        ldf.with_columns_seq(exprs.to_exprs()).into()
1134    }
1135
1136    fn match_to_schema<'py>(
1137        &self,
1138        schema: Wrap<Schema>,
1139        missing_columns: &Bound<'py, PyAny>,
1140        missing_struct_fields: &Bound<'py, PyAny>,
1141        extra_columns: Wrap<ExtraColumnsPolicy>,
1142        extra_struct_fields: &Bound<'py, PyAny>,
1143        integer_cast: &Bound<'py, PyAny>,
1144        float_cast: &Bound<'py, PyAny>,
1145    ) -> PyResult<Self> {
1146        fn parse_missing_columns<'py>(
1147            schema: &Schema,
1148            missing_columns: &Bound<'py, PyAny>,
1149        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
1150            let mut out = Vec::with_capacity(schema.len());
1151            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
1152                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1153            } else if let Ok(dict) = missing_columns.cast::<PyDict>() {
1154                out.extend(std::iter::repeat_n(
1155                    MissingColumnsPolicyOrExpr::Raise,
1156                    schema.len(),
1157                ));
1158                for (key, value) in dict.iter() {
1159                    let key = key.extract::<String>()?;
1160                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
1161                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1162                }
1163            } else {
1164                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
1165            }
1166            Ok(out)
1167        }
1168        fn parse_missing_struct_fields<'py>(
1169            schema: &Schema,
1170            missing_struct_fields: &Bound<'py, PyAny>,
1171        ) -> PyResult<Vec<MissingColumnsPolicy>> {
1172            let mut out = Vec::with_capacity(schema.len());
1173            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
1174                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1175            } else if let Ok(dict) = missing_struct_fields.cast::<PyDict>() {
1176                out.extend(std::iter::repeat_n(
1177                    MissingColumnsPolicy::Raise,
1178                    schema.len(),
1179                ));
1180                for (key, value) in dict.iter() {
1181                    let key = key.extract::<String>()?;
1182                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
1183                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1184                }
1185            } else {
1186                return Err(PyTypeError::new_err(
1187                    "Invalid value for `missing_struct_fields`",
1188                ));
1189            }
1190            Ok(out)
1191        }
1192        fn parse_extra_struct_fields<'py>(
1193            schema: &Schema,
1194            extra_struct_fields: &Bound<'py, PyAny>,
1195        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
1196            let mut out = Vec::with_capacity(schema.len());
1197            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
1198                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1199            } else if let Ok(dict) = extra_struct_fields.cast::<PyDict>() {
1200                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
1201                for (key, value) in dict.iter() {
1202                    let key = key.extract::<String>()?;
1203                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
1204                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1205                }
1206            } else {
1207                return Err(PyTypeError::new_err(
1208                    "Invalid value for `extra_struct_fields`",
1209                ));
1210            }
1211            Ok(out)
1212        }
1213        fn parse_cast<'py>(
1214            schema: &Schema,
1215            cast: &Bound<'py, PyAny>,
1216        ) -> PyResult<Vec<UpcastOrForbid>> {
1217            let mut out = Vec::with_capacity(schema.len());
1218            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
1219                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1220            } else if let Ok(dict) = cast.cast::<PyDict>() {
1221                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
1222                for (key, value) in dict.iter() {
1223                    let key = key.extract::<String>()?;
1224                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
1225                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1226                }
1227            } else {
1228                return Err(PyTypeError::new_err(
1229                    "Invalid value for `integer_cast` / `float_cast`",
1230                ));
1231            }
1232            Ok(out)
1233        }
1234
1235        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
1236        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
1237        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
1238        let integer_cast = parse_cast(&schema.0, integer_cast)?;
1239        let float_cast = parse_cast(&schema.0, float_cast)?;
1240
1241        let per_column = (0..schema.0.len())
1242            .map(|i| MatchToSchemaPerColumn {
1243                missing_columns: missing_columns[i].clone(),
1244                missing_struct_fields: missing_struct_fields[i],
1245                extra_struct_fields: extra_struct_fields[i],
1246                integer_cast: integer_cast[i],
1247                float_cast: float_cast[i],
1248            })
1249            .collect();
1250
1251        let ldf = self.ldf.read().clone();
1252        Ok(ldf
1253            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
1254            .into())
1255    }
1256
1257    fn pipe_with_schema(&self, callback: Py<PyAny>) -> Self {
1258        let ldf = self.ldf.read().clone();
1259        let function = PythonObject(callback);
1260        ldf.pipe_with_schema(PlanCallback::new_python(function))
1261            .into()
1262    }
1263
1264    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
1265        let ldf = self.ldf.read().clone();
1266        ldf.rename(existing, new, strict).into()
1267    }
1268
1269    fn reverse(&self) -> Self {
1270        let ldf = self.ldf.read().clone();
1271        ldf.reverse().into()
1272    }
1273
1274    #[pyo3(signature = (n, fill_value=None))]
1275    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
1276        let lf = self.ldf.read().clone();
1277        let out = match fill_value {
1278            Some(v) => lf.shift_and_fill(n.inner, v.inner),
1279            None => lf.shift(n.inner),
1280        };
1281        out.into()
1282    }
1283
1284    fn fill_nan(&self, fill_value: PyExpr) -> Self {
1285        let ldf = self.ldf.read().clone();
1286        ldf.fill_nan(fill_value.inner).into()
1287    }
1288
1289    fn min(&self) -> Self {
1290        let ldf = self.ldf.read().clone();
1291        let out = ldf.min();
1292        out.into()
1293    }
1294
1295    fn max(&self) -> Self {
1296        let ldf = self.ldf.read().clone();
1297        let out = ldf.max();
1298        out.into()
1299    }
1300
1301    fn sum(&self) -> Self {
1302        let ldf = self.ldf.read().clone();
1303        let out = ldf.sum();
1304        out.into()
1305    }
1306
1307    fn mean(&self) -> Self {
1308        let ldf = self.ldf.read().clone();
1309        let out = ldf.mean();
1310        out.into()
1311    }
1312
1313    fn std(&self, ddof: u8) -> Self {
1314        let ldf = self.ldf.read().clone();
1315        let out = ldf.std(ddof);
1316        out.into()
1317    }
1318
1319    fn var(&self, ddof: u8) -> Self {
1320        let ldf = self.ldf.read().clone();
1321        let out = ldf.var(ddof);
1322        out.into()
1323    }
1324
1325    fn median(&self) -> Self {
1326        let ldf = self.ldf.read().clone();
1327        let out = ldf.median();
1328        out.into()
1329    }
1330
1331    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
1332        let ldf = self.ldf.read().clone();
1333        let out = ldf.quantile(quantile.inner, interpolation.0);
1334        out.into()
1335    }
1336
1337    fn explode(&self, subset: PySelector, empty_as_null: bool, keep_nulls: bool) -> Self {
1338        self.ldf
1339            .read()
1340            .clone()
1341            .explode(
1342                subset.inner,
1343                ExplodeOptions {
1344                    empty_as_null,
1345                    keep_nulls,
1346                },
1347            )
1348            .into()
1349    }
1350
1351    fn null_count(&self) -> Self {
1352        let ldf = self.ldf.read().clone();
1353        ldf.null_count().into()
1354    }
1355
1356    #[pyo3(signature = (maintain_order, subset, keep))]
1357    fn unique(
1358        &self,
1359        maintain_order: bool,
1360        subset: Option<Vec<PyExpr>>,
1361        keep: Wrap<UniqueKeepStrategy>,
1362    ) -> Self {
1363        let ldf = self.ldf.read().clone();
1364        let subset = subset.map(|exprs| exprs.into_iter().map(|e| e.inner).collect());
1365        match maintain_order {
1366            true => ldf.unique_stable_generic(subset, keep.0),
1367            false => ldf.unique_generic(subset, keep.0),
1368        }
1369        .into()
1370    }
1371
1372    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
1373        self.ldf
1374            .read()
1375            .clone()
1376            .drop_nans(subset.map(|e| e.inner))
1377            .into()
1378    }
1379
1380    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
1381        self.ldf
1382            .read()
1383            .clone()
1384            .drop_nulls(subset.map(|e| e.inner))
1385            .into()
1386    }
1387
1388    #[pyo3(signature = (offset, len=None))]
1389    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
1390        let ldf = self.ldf.read().clone();
1391        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
1392    }
1393
1394    fn tail(&self, n: IdxSize) -> Self {
1395        let ldf = self.ldf.read().clone();
1396        ldf.tail(n).into()
1397    }
1398
1399    #[cfg(feature = "pivot")]
1400    #[pyo3(signature = (on, on_columns, index, values, agg, maintain_order, separator, column_naming))]
1401    fn pivot(
1402        &self,
1403        on: PySelector,
1404        on_columns: PyDataFrame,
1405        index: PySelector,
1406        values: PySelector,
1407        agg: PyExpr,
1408        maintain_order: bool,
1409        separator: String,
1410        column_naming: Wrap<PivotColumnNaming>,
1411    ) -> Self {
1412        let ldf = self.ldf.read().clone();
1413        ldf.pivot(
1414            on.inner,
1415            Arc::new(on_columns.df.read().clone()),
1416            index.inner,
1417            values.inner,
1418            agg.inner,
1419            maintain_order,
1420            separator.into(),
1421            column_naming.0,
1422        )
1423        .into()
1424    }
1425
1426    #[cfg(feature = "pivot")]
1427    #[pyo3(signature = (on, index, value_name, variable_name))]
1428    fn unpivot(
1429        &self,
1430        on: Option<PySelector>,
1431        index: PySelector,
1432        value_name: Option<String>,
1433        variable_name: Option<String>,
1434    ) -> Self {
1435        let args = UnpivotArgsDSL {
1436            on: on.map(|on| on.inner),
1437            index: index.inner,
1438            value_name: value_name.map(|s| s.into()),
1439            variable_name: variable_name.map(|s| s.into()),
1440        };
1441
1442        let ldf = self.ldf.read().clone();
1443        ldf.unpivot(args).into()
1444    }
1445
1446    #[pyo3(signature = (name, offset=None))]
1447    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
1448        let ldf = self.ldf.read().clone();
1449        ldf.with_row_index(name, offset).into()
1450    }
1451
1452    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
1453    fn map_batches(
1454        &self,
1455        function: Py<PyAny>,
1456        predicate_pushdown: bool,
1457        projection_pushdown: bool,
1458        slice_pushdown: bool,
1459        streamable: bool,
1460        schema: Option<Wrap<Schema>>,
1461        validate_output: bool,
1462    ) -> Self {
1463        let mut opt = OptFlags::default();
1464        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
1465        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
1466        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
1467        opt.set(OptFlags::STREAMING, streamable);
1468
1469        self.ldf
1470            .read()
1471            .clone()
1472            .map_python(
1473                function.into(),
1474                opt,
1475                schema.map(|s| Arc::new(s.0)),
1476                validate_output,
1477            )
1478            .into()
1479    }
1480
1481    fn drop(&self, columns: PySelector) -> Self {
1482        self.ldf.read().clone().drop(columns.inner).into()
1483    }
1484
1485    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
1486        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
1487        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
1488        self.ldf.read().clone().cast(cast_map, strict).into()
1489    }
1490
1491    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
1492        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
1493    }
1494
1495    fn clone(&self) -> Self {
1496        self.ldf.read().clone().into()
1497    }
1498
1499    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
1500        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;
1501
1502        let schema_dict = PyDict::new(py);
1503        schema.iter_fields().for_each(|fld| {
1504            schema_dict
1505                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
1506                .unwrap()
1507        });
1508        Ok(schema_dict)
1509    }
1510
1511    fn unnest(&self, columns: PySelector, separator: Option<&str>) -> Self {
1512        self.ldf
1513            .read()
1514            .clone()
1515            .unnest(columns.inner, separator.map(PlSmallStr::from_str))
1516            .into()
1517    }
1518
1519    fn count(&self) -> Self {
1520        let ldf = self.ldf.read().clone();
1521        ldf.count().into()
1522    }
1523
1524    #[cfg(feature = "merge_sorted")]
1525    fn merge_sorted(&self, other: Self, key: &str, maintain_order: bool) -> PyResult<Self> {
1526        let out = self
1527            .ldf
1528            .read()
1529            .clone()
1530            .merge_sorted(other.ldf.into_inner(), key, maintain_order)
1531            .map_err(PyPolarsErr::from)?;
1532        Ok(out.into())
1533    }
1534
1535    fn _node_name(&self) -> &str {
1536        let plan = &self.ldf.read().logical_plan;
1537        plan.into()
1538    }
1539
1540    fn hint_sorted(
1541        &self,
1542        columns: Vec<String>,
1543        descending: Vec<bool>,
1544        nulls_last: Vec<bool>,
1545    ) -> PyResult<Self> {
1546        if columns.len() != descending.len() && descending.len() != 1 {
1547            return Err(PyValueError::new_err(
1548                "`set_sorted` expects the same amount of `columns` as `descending` values.",
1549            ));
1550        }
1551        if columns.len() != nulls_last.len() && nulls_last.len() != 1 {
1552            return Err(PyValueError::new_err(
1553                "`set_sorted` expects the same amount of `columns` as `nulls_last` values.",
1554            ));
1555        }
1556
1557        let mut sorted = columns
1558            .iter()
1559            .map(|c| Sorted {
1560                column: PlSmallStr::from_str(c.as_str()),
1561                descending: Some(false),
1562                nulls_last: Some(false),
1563            })
1564            .collect::<Vec<_>>();
1565
1566        if !columns.is_empty() {
1567            if descending.len() != 1 {
1568                sorted
1569                    .iter_mut()
1570                    .zip(descending)
1571                    .for_each(|(s, d)| s.descending = Some(d));
1572            } else if descending[0] {
1573                sorted.iter_mut().for_each(|s| s.descending = Some(true));
1574            }
1575
1576            if nulls_last.len() != 1 {
1577                sorted
1578                    .iter_mut()
1579                    .zip(nulls_last)
1580                    .for_each(|(s, d)| s.nulls_last = Some(d));
1581            } else if nulls_last[0] {
1582                sorted.iter_mut().for_each(|s| s.nulls_last = Some(true));
1583            }
1584        }
1585
1586        let out = self
1587            .ldf
1588            .read()
1589            .clone()
1590            .hint(HintIR::Sorted(sorted.into()))
1591            .map_err(PyPolarsErr::from)?;
1592        Ok(out.into())
1593    }
1594}
1595
1596#[pyclass(frozen)]
1597struct PyCollectBatches {
1598    inner: Arc<Mutex<CollectBatches>>,
1599    ldf: LazyFrame,
1600}
1601
1602#[pymethods]
1603impl PyCollectBatches {
1604    fn start(&self) {
1605        self.inner.lock().start();
1606    }
1607
1608    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
1609        slf
1610    }
1611
1612    fn __next__(slf: PyRef<'_, Self>, py: Python) -> PyResult<Option<PyDataFrame>> {
1613        let inner = Arc::clone(&slf.inner);
1614        py.enter_polars(|| PolarsResult::Ok(inner.lock().next().transpose()?.map(PyDataFrame::new)))
1615    }
1616
1617    #[allow(unused_variables)]
1618    #[pyo3(signature = (requested_schema=None))]
1619    fn __arrow_c_stream__<'py>(
1620        &self,
1621        py: Python<'py>,
1622        requested_schema: Option<Py<PyAny>>,
1623    ) -> PyResult<Bound<'py, PyCapsule>> {
1624        let mut ldf = self.ldf.clone();
1625        let schema = ldf
1626            .collect_schema()
1627            .map_err(PyPolarsErr::from)?
1628            .to_arrow(CompatLevel::newest());
1629
1630        let dtype = ArrowDataType::Struct(schema.into_iter_values().collect());
1631
1632        let iter = Box::new(ArrowStreamIterator::new(self.inner.clone(), dtype.clone()));
1633        let field = ArrowField::new(PlSmallStr::EMPTY, dtype, false);
1634        let stream = export_iterator(iter, field);
1635        let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
1636        PyCapsule::new(py, stream, Some(stream_capsule_name))
1637    }
1638}
1639
1640pub struct ArrowStreamIterator {
1641    inner: Arc<Mutex<CollectBatches>>,
1642    dtype: ArrowDataType,
1643}
1644
1645impl ArrowStreamIterator {
1646    fn new(inner: Arc<Mutex<CollectBatches>>, schema: ArrowDataType) -> Self {
1647        Self {
1648            inner,
1649            dtype: schema,
1650        }
1651    }
1652}
1653
1654impl Iterator for ArrowStreamIterator {
1655    type Item = PolarsResult<ArrayRef>;
1656
1657    fn next(&mut self) -> Option<Self::Item> {
1658        let next = self.inner.lock().next();
1659        match next {
1660            None => None,
1661            Some(Err(err)) => Some(Err(err)),
1662            Some(Ok(df)) => {
1663                let height = df.height();
1664                let arrays = df.rechunk_into_arrow(CompatLevel::newest());
1665                Some(Ok(Box::new(arrow::array::StructArray::new(
1666                    self.dtype.clone(),
1667                    height,
1668                    arrays,
1669                    None,
1670                ))))
1671            },
1672        }
1673    }
1674}