Skip to main content

polars_python/lazyframe/
general.rs

1use std::collections::HashMap;
2use std::ffi::CString;
3use std::num::NonZeroUsize;
4
5use arrow::ffi::export_iterator;
6use either::Either;
7use parking_lot::Mutex;
8use polars::io::RowIndex;
9use polars::time::*;
10use polars_core::prelude::*;
11#[cfg(feature = "parquet")]
12use polars_parquet::arrow::write::StatisticsOptions;
13use polars_plan::dsl::ScanSources;
14use polars_plan::plans::{AExpr, HintIR, IR, Sorted};
15use polars_utils::arena::{Arena, Node};
16use polars_utils::python_function::PythonObject;
17use pyo3::exceptions::{PyTypeError, PyValueError};
18use pyo3::prelude::*;
19use pyo3::pybacked::PyBackedStr;
20use pyo3::types::{PyCapsule, PyDict, PyDictMethods, PyList};
21
22use super::{PyLazyFrame, PyOptFlags};
23use crate::error::PyPolarsErr;
24use crate::expr::ToExprs;
25use crate::expr::datatype::PyDataTypeExpr;
26use crate::expr::selector::PySelector;
27use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
28#[cfg(feature = "json")]
29use crate::io::cloud_options::OptPyCloudOptions;
30use crate::io::scan_options::PyScanOptions;
31use crate::io::sink_options::PySinkOptions;
32use crate::io::sink_output::PyFileSinkDestination;
33use crate::lazyframe::visit::NodeTraverser;
34use crate::prelude::*;
35use crate::utils::{EnterPolarsExt, to_py_err};
36use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};
37
38fn pyobject_to_first_path_and_scan_sources(
39    obj: Py<PyAny>,
40) -> PyResult<(Option<PlRefPath>, ScanSources)> {
41    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
42    Ok(match get_python_scan_source_input(obj, false)? {
43        PythonScanSourceInput::Path(path) => (
44            Some(path.clone()),
45            ScanSources::Paths(FromIterator::from_iter([path])),
46        ),
47        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
48        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
49    })
50}
51
52fn post_opt_callback(
53    lambda: &Py<PyAny>,
54    root: Node,
55    lp_arena: &mut Arena<IR>,
56    expr_arena: &mut Arena<AExpr>,
57    duration_since_start: Option<std::time::Duration>,
58) -> PolarsResult<()> {
59    Python::attach(|py| {
60        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));
61
62        // Get a copy of the arenas.
63        let arenas = nt.get_arenas();
64
65        // Pass the node visitor which allows the python callback to replace parts of the query plan.
66        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
67        lambda
68            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
69            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;
70
71        // Unpack the arenas.
72        // At this point the `nt` is useless.
73
74        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
75        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());
76
77        Ok(())
78    })
79}
80
81#[pymethods]
82#[allow(clippy::should_implement_trait)]
83impl PyLazyFrame {
84    #[staticmethod]
85    #[cfg(feature = "json")]
86    #[allow(clippy::too_many_arguments)]
87    #[pyo3(signature = (
88        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
89        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider
90    ))]
91    fn new_from_ndjson(
92        source: Option<Py<PyAny>>,
93        sources: Wrap<ScanSources>,
94        infer_schema_length: Option<usize>,
95        schema: Option<Wrap<Schema>>,
96        schema_overrides: Option<Wrap<Schema>>,
97        batch_size: Option<NonZeroUsize>,
98        n_rows: Option<usize>,
99        low_memory: bool,
100        rechunk: bool,
101        row_index: Option<(String, IdxSize)>,
102        ignore_errors: bool,
103        include_file_paths: Option<String>,
104        cloud_options: OptPyCloudOptions,
105        credential_provider: Option<Py<PyAny>>,
106    ) -> PyResult<Self> {
107        let row_index = row_index.map(|(name, offset)| RowIndex {
108            name: name.into(),
109            offset,
110        });
111
112        let sources = sources.0;
113        let (first_path, sources) = match source {
114            None => (sources.first_path().cloned(), sources),
115            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
116        };
117
118        let mut r = LazyJsonLineReader::new_with_sources(sources);
119
120        if let Some(first_path) = first_path {
121            let first_path_url = first_path.as_str();
122
123            let cloud_options = cloud_options.extract_opt_cloud_options(
124                CloudScheme::from_path(first_path_url),
125                credential_provider,
126            )?;
127
128            r = r.with_cloud_options(cloud_options);
129        };
130
131        let lf = r
132            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
133            .with_batch_size(batch_size)
134            .with_n_rows(n_rows)
135            .low_memory(low_memory)
136            .with_rechunk(rechunk)
137            .with_schema(schema.map(|schema| Arc::new(schema.0)))
138            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
139            .with_row_index(row_index)
140            .with_ignore_errors(ignore_errors)
141            .with_include_file_paths(include_file_paths.map(|x| x.into()))
142            .finish()
143            .map_err(PyPolarsErr::from)?;
144
145        Ok(lf.into())
146    }
147
148    #[staticmethod]
149    #[cfg(feature = "csv")]
150    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
151        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
152        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
153        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
154        cloud_options, credential_provider, include_file_paths
155    )
156    )]
157    fn new_from_csv(
158        source: Option<Py<PyAny>>,
159        sources: Wrap<ScanSources>,
160        separator: &str,
161        has_header: bool,
162        ignore_errors: bool,
163        skip_rows: usize,
164        skip_lines: usize,
165        n_rows: Option<usize>,
166        cache: bool,
167        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
168        low_memory: bool,
169        comment_prefix: Option<&str>,
170        quote_char: Option<&str>,
171        null_values: Option<Wrap<NullValues>>,
172        missing_utf8_is_empty_string: bool,
173        infer_schema_length: Option<usize>,
174        with_schema_modify: Option<Py<PyAny>>,
175        rechunk: bool,
176        skip_rows_after_header: usize,
177        encoding: Wrap<CsvEncoding>,
178        row_index: Option<(String, IdxSize)>,
179        try_parse_dates: bool,
180        eol_char: &str,
181        raise_if_empty: bool,
182        truncate_ragged_lines: bool,
183        decimal_comma: bool,
184        glob: bool,
185        schema: Option<Wrap<Schema>>,
186        cloud_options: OptPyCloudOptions,
187        credential_provider: Option<Py<PyAny>>,
188        include_file_paths: Option<String>,
189    ) -> PyResult<Self> {
190        let null_values = null_values.map(|w| w.0);
191        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
192        let separator = separator
193            .as_bytes()
194            .first()
195            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
196            .copied()
197            .map_err(PyPolarsErr::from)?;
198        let eol_char = eol_char
199            .as_bytes()
200            .first()
201            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
202            .copied()
203            .map_err(PyPolarsErr::from)?;
204        let row_index = row_index.map(|(name, offset)| RowIndex {
205            name: name.into(),
206            offset,
207        });
208
209        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
210            overwrite_dtype
211                .into_iter()
212                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
213                .collect::<Schema>()
214        });
215
216        let sources = sources.0;
217        let (first_path, sources) = match source {
218            None => (sources.first_path().cloned(), sources),
219            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
220        };
221
222        let mut r = LazyCsvReader::new_with_sources(sources);
223
224        if let Some(first_path) = first_path {
225            let first_path_url = first_path.as_str();
226            let cloud_options = cloud_options.extract_opt_cloud_options(
227                CloudScheme::from_path(first_path_url),
228                credential_provider,
229            )?;
230            r = r.with_cloud_options(cloud_options);
231        }
232
233        let mut r = r
234            .with_infer_schema_length(infer_schema_length)
235            .with_separator(separator)
236            .with_has_header(has_header)
237            .with_ignore_errors(ignore_errors)
238            .with_skip_rows(skip_rows)
239            .with_skip_lines(skip_lines)
240            .with_n_rows(n_rows)
241            .with_cache(cache)
242            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
243            .with_schema(schema.map(|schema| Arc::new(schema.0)))
244            .with_low_memory(low_memory)
245            .with_comment_prefix(comment_prefix.map(|x| x.into()))
246            .with_quote_char(quote_char)
247            .with_eol_char(eol_char)
248            .with_rechunk(rechunk)
249            .with_skip_rows_after_header(skip_rows_after_header)
250            .with_encoding(encoding.0)
251            .with_row_index(row_index)
252            .with_try_parse_dates(try_parse_dates)
253            .with_null_values(null_values)
254            .with_missing_is_null(!missing_utf8_is_empty_string)
255            .with_truncate_ragged_lines(truncate_ragged_lines)
256            .with_decimal_comma(decimal_comma)
257            .with_glob(glob)
258            .with_raise_if_empty(raise_if_empty)
259            .with_include_file_paths(include_file_paths.map(|x| x.into()));
260
261        if let Some(lambda) = with_schema_modify {
262            let f = |schema: Schema| {
263                let iter = schema.iter_names().map(|s| s.as_str());
264                Python::attach(|py| {
265                    let names = PyList::new(py, iter).unwrap();
266
267                    let out = lambda.call1(py, (names,)).expect("python function failed");
268                    let new_names = out
269                        .extract::<Vec<String>>(py)
270                        .expect("python function should return List[str]");
271                    polars_ensure!(new_names.len() == schema.len(),
272                        ShapeMismatch: "The length of the new names list should be equal to or less than the original column length",
273                    );
274                    Ok(schema
275                        .iter_values()
276                        .zip(new_names)
277                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
278                        .collect())
279                })
280            };
281            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
282        }
283
284        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
285    }
286
287    #[cfg(feature = "parquet")]
288    #[staticmethod]
289    #[pyo3(signature = (
290        sources, schema, scan_options, parallel, low_memory, use_statistics
291    ))]
292    fn new_from_parquet(
293        sources: Wrap<ScanSources>,
294        schema: Option<Wrap<Schema>>,
295        scan_options: PyScanOptions,
296        parallel: Wrap<ParallelStrategy>,
297        low_memory: bool,
298        use_statistics: bool,
299    ) -> PyResult<Self> {
300        use crate::utils::to_py_err;
301
302        let parallel = parallel.0;
303
304        let options = ParquetOptions {
305            schema: schema.map(|x| Arc::new(x.0)),
306            parallel,
307            low_memory,
308            use_statistics,
309        };
310
311        let sources = sources.0;
312        let first_path = sources.first_path();
313
314        let unified_scan_args =
315            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;
316
317        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
318            .map_err(to_py_err)?
319            .build()
320            .into();
321
322        Ok(lf.into())
323    }
324
325    #[cfg(feature = "ipc")]
326    #[staticmethod]
327    #[pyo3(signature = (sources, record_batch_statistics, scan_options))]
328    fn new_from_ipc(
329        sources: Wrap<ScanSources>,
330        record_batch_statistics: bool,
331        scan_options: PyScanOptions,
332    ) -> PyResult<Self> {
333        let options = IpcScanOptions {
334            record_batch_statistics,
335            checked: Default::default(),
336        };
337
338        let sources = sources.0;
339        let first_path = sources.first_path().cloned();
340
341        let unified_scan_args =
342            scan_options.extract_unified_scan_args(first_path.as_ref().and_then(|x| x.scheme()))?;
343
344        let lf = LazyFrame::scan_ipc_sources(sources, options, unified_scan_args)
345            .map_err(PyPolarsErr::from)?;
346        Ok(lf.into())
347    }
348
349    #[cfg(feature = "scan_lines")]
350    #[staticmethod]
351    #[pyo3(signature = (sources, scan_options, name))]
352    fn new_from_scan_lines(
353        sources: Wrap<ScanSources>,
354        scan_options: PyScanOptions,
355        name: PyBackedStr,
356    ) -> PyResult<Self> {
357        let sources = sources.0;
358        let first_path = sources.first_path();
359
360        let unified_scan_args =
361            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;
362
363        let dsl: DslPlan = DslBuilder::scan_lines(sources, unified_scan_args, (&*name).into())
364            .map_err(to_py_err)?
365            .build();
366        let lf: LazyFrame = dsl.into();
367
368        Ok(lf.into())
369    }
370
371    #[staticmethod]
372    #[pyo3(signature = (
373        dataset_object
374    ))]
375    fn new_from_dataset_object(dataset_object: Py<PyAny>) -> PyResult<Self> {
376        let lf =
377            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
378                .into();
379
380        Ok(lf)
381    }
382
383    #[staticmethod]
384    fn scan_from_python_function_arrow_schema(
385        schema: &Bound<'_, PyList>,
386        scan_fn: Py<PyAny>,
387        pyarrow: bool,
388        validate_schema: bool,
389        is_pure: bool,
390    ) -> PyResult<Self> {
391        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);
392
393        Ok(LazyFrame::scan_from_python_function(
394            Either::Right(schema),
395            scan_fn,
396            pyarrow,
397            validate_schema,
398            is_pure,
399        )
400        .into())
401    }
402
403    #[staticmethod]
404    fn scan_from_python_function_pl_schema(
405        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
406        scan_fn: Py<PyAny>,
407        pyarrow: bool,
408        validate_schema: bool,
409        is_pure: bool,
410    ) -> PyResult<Self> {
411        let schema = Arc::new(Schema::from_iter(
412            schema
413                .into_iter()
414                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
415        ));
416        Ok(LazyFrame::scan_from_python_function(
417            Either::Right(schema),
418            scan_fn,
419            pyarrow,
420            validate_schema,
421            is_pure,
422        )
423        .into())
424    }
425
426    #[staticmethod]
427    fn scan_from_python_function_schema_function(
428        schema_fn: Py<PyAny>,
429        scan_fn: Py<PyAny>,
430        validate_schema: bool,
431        is_pure: bool,
432    ) -> PyResult<Self> {
433        Ok(LazyFrame::scan_from_python_function(
434            Either::Left(schema_fn),
435            scan_fn,
436            false,
437            validate_schema,
438            is_pure,
439        )
440        .into())
441    }
442
443    fn describe_plan(&self, py: Python) -> PyResult<String> {
444        py.enter_polars(|| self.ldf.read().describe_plan())
445    }
446
447    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
448        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
449    }
450
451    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
452        py.enter_polars(|| self.ldf.read().describe_plan_tree())
453    }
454
455    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
456        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
457    }
458
459    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
460        py.enter_polars(|| self.ldf.read().to_dot(optimized))
461    }
462
463    #[cfg(feature = "new_streaming")]
464    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
465        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
466    }
467
468    fn sort(
469        &self,
470        by_column: &str,
471        descending: bool,
472        nulls_last: bool,
473        maintain_order: bool,
474        multithreaded: bool,
475    ) -> Self {
476        let ldf = self.ldf.read().clone();
477        ldf.sort(
478            [by_column],
479            SortMultipleOptions {
480                descending: vec![descending],
481                nulls_last: vec![nulls_last],
482                multithreaded,
483                maintain_order,
484                limit: None,
485            },
486        )
487        .into()
488    }
489
490    fn sort_by_exprs(
491        &self,
492        by: Vec<PyExpr>,
493        descending: Vec<bool>,
494        nulls_last: Vec<bool>,
495        maintain_order: bool,
496        multithreaded: bool,
497    ) -> Self {
498        let ldf = self.ldf.read().clone();
499        let exprs = by.to_exprs();
500        ldf.sort_by_exprs(
501            exprs,
502            SortMultipleOptions {
503                descending,
504                nulls_last,
505                maintain_order,
506                multithreaded,
507                limit: None,
508            },
509        )
510        .into()
511    }
512
513    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
514        let ldf = self.ldf.read().clone();
515        let exprs = by.to_exprs();
516        ldf.top_k(
517            k,
518            exprs,
519            SortMultipleOptions::new().with_order_descending_multi(reverse),
520        )
521        .into()
522    }
523
524    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
525        let ldf = self.ldf.read().clone();
526        let exprs = by.to_exprs();
527        ldf.bottom_k(
528            k,
529            exprs,
530            SortMultipleOptions::new().with_order_descending_multi(reverse),
531        )
532        .into()
533    }
534
535    fn cache(&self) -> Self {
536        let ldf = self.ldf.read().clone();
537        ldf.cache().into()
538    }
539
540    #[pyo3(signature = (optflags))]
541    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
542        let ldf = self.ldf.read().clone();
543        ldf.with_optimizations(optflags.inner.into_inner()).into()
544    }
545
546    #[pyo3(signature = (lambda_post_opt))]
547    fn profile(
548        &self,
549        py: Python<'_>,
550        lambda_post_opt: Option<Py<PyAny>>,
551    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
552        let (df, time_df) = py.enter_polars(|| {
553            let ldf = self.ldf.read().clone();
554            if let Some(lambda) = lambda_post_opt {
555                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
556                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
557                })
558            } else {
559                ldf.profile()
560            }
561        })?;
562        Ok((df.into(), time_df.into()))
563    }
564
565    #[pyo3(signature = (engine, lambda_post_opt))]
566    fn collect(
567        &self,
568        py: Python<'_>,
569        engine: Wrap<Engine>,
570        lambda_post_opt: Option<Py<PyAny>>,
571    ) -> PyResult<PyDataFrame> {
572        py.enter_polars_df(|| {
573            let ldf = self.ldf.read().clone();
574            if let Some(lambda) = lambda_post_opt {
575                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
576                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
577                })
578            } else {
579                ldf.collect_with_engine(engine.0)
580            }
581        })
582    }
583
584    #[cfg(feature = "async")]
585    #[pyo3(signature = (engine, lambda))]
586    fn collect_with_callback(
587        &self,
588        py: Python<'_>,
589        engine: Wrap<Engine>,
590        lambda: Py<PyAny>,
591    ) -> PyResult<()> {
592        py.enter_polars_ok(|| {
593            let ldf = self.ldf.read().clone();
594
595            // We use a tokio spawn_blocking here as it has a high blocking
596            // thread pool limit.
597            polars_io::pl_async::get_runtime().spawn_blocking(move || {
598                let result = ldf
599                    .collect_with_engine(engine.0)
600                    .map(PyDataFrame::new)
601                    .map_err(PyPolarsErr::from);
602
603                Python::attach(|py| match result {
604                    Ok(df) => {
605                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
606                    },
607                    Err(err) => {
608                        lambda
609                            .call1(py, (PyErr::from(err),))
610                            .map_err(|err| err.restore(py))
611                            .ok();
612                    },
613                });
614            });
615        })
616    }
617
618    #[cfg(feature = "async")]
619    fn collect_batches(
620        &self,
621        py: Python<'_>,
622        engine: Wrap<Engine>,
623        maintain_order: bool,
624        chunk_size: Option<NonZeroUsize>,
625        lazy: bool,
626    ) -> PyResult<PyCollectBatches> {
627        py.enter_polars(|| {
628            let ldf = self.ldf.read().clone();
629
630            let collect_batches = ldf
631                .clone()
632                .collect_batches(engine.0, maintain_order, chunk_size, lazy)
633                .map_err(PyPolarsErr::from)?;
634
635            PyResult::Ok(PyCollectBatches {
636                inner: Arc::new(Mutex::new(collect_batches)),
637                ldf,
638            })
639        })
640    }
641
642    #[cfg(feature = "parquet")]
643    #[pyo3(signature = (
644        target, sink_options, compression, compression_level, statistics, row_group_size, data_page_size,
645        metadata, arrow_schema
646    ))]
647    fn sink_parquet(
648        &self,
649        py: Python<'_>,
650        target: PyFileSinkDestination,
651        sink_options: PySinkOptions,
652        compression: &str,
653        compression_level: Option<i32>,
654        statistics: Wrap<StatisticsOptions>,
655        row_group_size: Option<usize>,
656        data_page_size: Option<usize>,
657        metadata: Wrap<Option<KeyValueMetadata>>,
658        arrow_schema: Option<Wrap<ArrowSchema>>,
659    ) -> PyResult<PyLazyFrame> {
660        let compression = parse_parquet_compression(compression, compression_level)?;
661
662        let options = ParquetWriteOptions {
663            compression,
664            statistics: statistics.0,
665            row_group_size,
666            data_page_size,
667            key_value_metadata: metadata.0,
668            arrow_schema: arrow_schema.map(|x| Arc::new(x.0)),
669            compat_level: None,
670        };
671
672        let target = target.extract_file_sink_destination()?;
673        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
674
675        py.enter_polars(|| {
676            self.ldf
677                .read()
678                .clone()
679                .sink(
680                    target,
681                    FileWriteFormat::Parquet(Arc::new(options)),
682                    unified_sink_args,
683                )
684                .into()
685        })
686        .map(Into::into)
687        .map_err(Into::into)
688    }
689
690    #[cfg(feature = "ipc")]
691    #[pyo3(signature = (
692        target, sink_options, compression, compat_level, record_batch_size, record_batch_statistics
693    ))]
694    fn sink_ipc(
695        &self,
696        py: Python<'_>,
697        target: PyFileSinkDestination,
698        sink_options: PySinkOptions,
699        compression: Wrap<Option<IpcCompression>>,
700        compat_level: PyCompatLevel,
701        record_batch_size: Option<usize>,
702        record_batch_statistics: bool,
703    ) -> PyResult<PyLazyFrame> {
704        let options = IpcWriterOptions {
705            compression: compression.0,
706            compat_level: compat_level.0,
707            record_batch_size,
708            record_batch_statistics,
709        };
710
711        let target = target.extract_file_sink_destination()?;
712        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
713
714        py.enter_polars(|| {
715            self.ldf
716                .read()
717                .clone()
718                .sink(target, FileWriteFormat::Ipc(options), unified_sink_args)
719                .into()
720        })
721        .map(Into::into)
722        .map_err(Into::into)
723    }
724
725    #[cfg(feature = "csv")]
726    #[pyo3(signature = (
727        target, sink_options, include_bom, compression, compression_level, check_extension,
728        include_header, separator, line_terminator, quote_char, batch_size, datetime_format,
729        date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
730        quote_style
731    ))]
732    fn sink_csv(
733        &self,
734        py: Python<'_>,
735        target: PyFileSinkDestination,
736        sink_options: PySinkOptions,
737        include_bom: bool,
738        compression: &str,
739        compression_level: Option<u32>,
740        check_extension: bool,
741        include_header: bool,
742        separator: u8,
743        line_terminator: Wrap<PlSmallStr>,
744        quote_char: u8,
745        batch_size: NonZeroUsize,
746        datetime_format: Option<Wrap<PlSmallStr>>,
747        date_format: Option<Wrap<PlSmallStr>>,
748        time_format: Option<Wrap<PlSmallStr>>,
749        float_scientific: Option<bool>,
750        float_precision: Option<usize>,
751        decimal_comma: bool,
752        null_value: Option<Wrap<PlSmallStr>>,
753        quote_style: Option<Wrap<QuoteStyle>>,
754    ) -> PyResult<PyLazyFrame> {
755        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
756        let null_value = null_value
757            .map(|x| x.0)
758            .unwrap_or(SerializeOptions::default().null);
759
760        let serialize_options = SerializeOptions {
761            date_format: date_format.map(|x| x.0),
762            time_format: time_format.map(|x| x.0),
763            datetime_format: datetime_format.map(|x| x.0),
764            float_scientific,
765            float_precision,
766            decimal_comma,
767            separator,
768            quote_char,
769            null: null_value,
770            line_terminator: line_terminator.0,
771            quote_style,
772        };
773
774        let options = CsvWriterOptions {
775            include_bom,
776            compression: ExternalCompression::try_from(compression, compression_level)
777                .map_err(PyPolarsErr::from)?,
778            check_extension,
779            include_header,
780            batch_size,
781            serialize_options: serialize_options.into(),
782        };
783
784        let target = target.extract_file_sink_destination()?;
785        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
786
787        py.enter_polars(|| {
788            self.ldf
789                .read()
790                .clone()
791                .sink(target, FileWriteFormat::Csv(options), unified_sink_args)
792                .into()
793        })
794        .map(Into::into)
795        .map_err(Into::into)
796    }
797
798    #[allow(clippy::too_many_arguments)]
799    #[cfg(feature = "json")]
800    #[pyo3(signature = (target, compression, compression_level, check_extension, sink_options))]
801    fn sink_ndjson(
802        &self,
803        py: Python<'_>,
804        target: PyFileSinkDestination,
805        compression: &str,
806        compression_level: Option<u32>,
807        check_extension: bool,
808        sink_options: PySinkOptions,
809    ) -> PyResult<PyLazyFrame> {
810        let options = NDJsonWriterOptions {
811            compression: ExternalCompression::try_from(compression, compression_level)
812                .map_err(PyPolarsErr::from)?,
813            check_extension,
814        };
815
816        let target = target.extract_file_sink_destination()?;
817        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
818
819        py.enter_polars(|| {
820            self.ldf
821                .read()
822                .clone()
823                .sink(target, FileWriteFormat::NDJson(options), unified_sink_args)
824                .into()
825        })
826        .map(Into::into)
827        .map_err(Into::into)
828    }
829
830    #[pyo3(signature = (function, maintain_order, chunk_size))]
831    pub fn sink_batches(
832        &self,
833        py: Python<'_>,
834        function: Py<PyAny>,
835        maintain_order: bool,
836        chunk_size: Option<NonZeroUsize>,
837    ) -> PyResult<PyLazyFrame> {
838        let ldf = self.ldf.read().clone();
839        py.enter_polars(|| {
840            ldf.sink_batches(
841                PlanCallback::new_python(PythonObject(function)),
842                maintain_order,
843                chunk_size,
844            )
845        })
846        .map(Into::into)
847        .map_err(Into::into)
848    }
849
850    fn filter(&self, predicate: PyExpr) -> Self {
851        self.ldf.read().clone().filter(predicate.inner).into()
852    }
853
854    fn remove(&self, predicate: PyExpr) -> Self {
855        let ldf = self.ldf.read().clone();
856        ldf.remove(predicate.inner).into()
857    }
858
859    fn select(&self, exprs: Vec<PyExpr>) -> Self {
860        let ldf = self.ldf.read().clone();
861        let exprs = exprs.to_exprs();
862        ldf.select(exprs).into()
863    }
864
865    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
866        let ldf = self.ldf.read().clone();
867        let exprs = exprs.to_exprs();
868        ldf.select_seq(exprs).into()
869    }
870
871    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
872        let ldf = self.ldf.read().clone();
873        let by = by.to_exprs();
874        let lazy_gb = if maintain_order {
875            ldf.group_by_stable(by)
876        } else {
877            ldf.group_by(by)
878        };
879
880        PyLazyGroupBy { lgb: Some(lazy_gb) }
881    }
882
883    fn rolling(
884        &self,
885        index_column: PyExpr,
886        period: &str,
887        offset: &str,
888        closed: Wrap<ClosedWindow>,
889        by: Vec<PyExpr>,
890    ) -> PyResult<PyLazyGroupBy> {
891        let closed_window = closed.0;
892        let ldf = self.ldf.read().clone();
893        let by = by
894            .into_iter()
895            .map(|pyexpr| pyexpr.inner)
896            .collect::<Vec<_>>();
897        let lazy_gb = ldf.rolling(
898            index_column.inner,
899            by,
900            RollingGroupOptions {
901                index_column: "".into(),
902                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
903                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
904                closed_window,
905            },
906        );
907
908        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
909    }
910
911    fn group_by_dynamic(
912        &self,
913        index_column: PyExpr,
914        every: &str,
915        period: &str,
916        offset: &str,
917        label: Wrap<Label>,
918        include_boundaries: bool,
919        closed: Wrap<ClosedWindow>,
920        group_by: Vec<PyExpr>,
921        start_by: Wrap<StartBy>,
922    ) -> PyResult<PyLazyGroupBy> {
923        let closed_window = closed.0;
924        let group_by = group_by
925            .into_iter()
926            .map(|pyexpr| pyexpr.inner)
927            .collect::<Vec<_>>();
928        let ldf = self.ldf.read().clone();
929        let lazy_gb = ldf.group_by_dynamic(
930            index_column.inner,
931            group_by,
932            DynamicGroupOptions {
933                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
934                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
935                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
936                label: label.0,
937                include_boundaries,
938                closed_window,
939                start_by: start_by.0,
940                ..Default::default()
941            },
942        );
943
944        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
945    }
946
947    fn with_context(&self, contexts: Vec<Self>) -> Self {
948        let contexts = contexts
949            .into_iter()
950            .map(|ldf| ldf.ldf.into_inner())
951            .collect::<Vec<_>>();
952        self.ldf.read().clone().with_context(contexts).into()
953    }
954
955    #[cfg(feature = "asof_join")]
956    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
957    fn join_asof(
958        &self,
959        other: Self,
960        left_on: PyExpr,
961        right_on: PyExpr,
962        left_by: Option<Vec<PyBackedStr>>,
963        right_by: Option<Vec<PyBackedStr>>,
964        allow_parallel: bool,
965        force_parallel: bool,
966        suffix: String,
967        strategy: Wrap<AsofStrategy>,
968        tolerance: Option<Wrap<AnyValue<'_>>>,
969        tolerance_str: Option<String>,
970        coalesce: bool,
971        allow_eq: bool,
972        check_sortedness: bool,
973    ) -> PyResult<Self> {
974        let coalesce = if coalesce {
975            JoinCoalesce::CoalesceColumns
976        } else {
977            JoinCoalesce::KeepColumns
978        };
979        let ldf = self.ldf.read().clone();
980        let other = other.ldf.into_inner();
981        let left_on = left_on.inner;
982        let right_on = right_on.inner;
983        Ok(ldf
984            .join_builder()
985            .with(other)
986            .left_on([left_on])
987            .right_on([right_on])
988            .allow_parallel(allow_parallel)
989            .force_parallel(force_parallel)
990            .coalesce(coalesce)
991            .how(JoinType::AsOf(Box::new(AsOfOptions {
992                strategy: strategy.0,
993                left_by: left_by.map(strings_to_pl_smallstr),
994                right_by: right_by.map(strings_to_pl_smallstr),
995                tolerance: tolerance.map(|t| {
996                    let av = t.0.into_static();
997                    let dtype = av.dtype();
998                    Scalar::new(dtype, av)
999                }),
1000                tolerance_str: tolerance_str.map(|s| s.into()),
1001                allow_eq,
1002                check_sortedness,
1003            })))
1004            .suffix(suffix)
1005            .finish()
1006            .into())
1007    }
1008
1009    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
1010    fn join(
1011        &self,
1012        other: Self,
1013        left_on: Vec<PyExpr>,
1014        right_on: Vec<PyExpr>,
1015        allow_parallel: bool,
1016        force_parallel: bool,
1017        nulls_equal: bool,
1018        how: Wrap<JoinType>,
1019        suffix: String,
1020        validate: Wrap<JoinValidation>,
1021        maintain_order: Wrap<MaintainOrderJoin>,
1022        coalesce: Option<bool>,
1023    ) -> PyResult<Self> {
1024        let coalesce = match coalesce {
1025            None => JoinCoalesce::JoinSpecific,
1026            Some(true) => JoinCoalesce::CoalesceColumns,
1027            Some(false) => JoinCoalesce::KeepColumns,
1028        };
1029        let ldf = self.ldf.read().clone();
1030        let other = other.ldf.into_inner();
1031        let left_on = left_on
1032            .into_iter()
1033            .map(|pyexpr| pyexpr.inner)
1034            .collect::<Vec<_>>();
1035        let right_on = right_on
1036            .into_iter()
1037            .map(|pyexpr| pyexpr.inner)
1038            .collect::<Vec<_>>();
1039
1040        Ok(ldf
1041            .join_builder()
1042            .with(other)
1043            .left_on(left_on)
1044            .right_on(right_on)
1045            .allow_parallel(allow_parallel)
1046            .force_parallel(force_parallel)
1047            .join_nulls(nulls_equal)
1048            .how(how.0)
1049            .suffix(suffix)
1050            .validate(validate.0)
1051            .coalesce(coalesce)
1052            .maintain_order(maintain_order.0)
1053            .finish()
1054            .into())
1055    }
1056
1057    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
1058        let ldf = self.ldf.read().clone();
1059        let other = other.ldf.into_inner();
1060
1061        let predicates = predicates.to_exprs();
1062
1063        Ok(ldf
1064            .join_builder()
1065            .with(other)
1066            .suffix(suffix)
1067            .join_where(predicates)
1068            .into())
1069    }
1070
1071    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
1072        let ldf = self.ldf.read().clone();
1073        ldf.with_columns(exprs.to_exprs()).into()
1074    }
1075
1076    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
1077        let ldf = self.ldf.read().clone();
1078        ldf.with_columns_seq(exprs.to_exprs()).into()
1079    }
1080
1081    fn match_to_schema<'py>(
1082        &self,
1083        schema: Wrap<Schema>,
1084        missing_columns: &Bound<'py, PyAny>,
1085        missing_struct_fields: &Bound<'py, PyAny>,
1086        extra_columns: Wrap<ExtraColumnsPolicy>,
1087        extra_struct_fields: &Bound<'py, PyAny>,
1088        integer_cast: &Bound<'py, PyAny>,
1089        float_cast: &Bound<'py, PyAny>,
1090    ) -> PyResult<Self> {
1091        fn parse_missing_columns<'py>(
1092            schema: &Schema,
1093            missing_columns: &Bound<'py, PyAny>,
1094        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
1095            let mut out = Vec::with_capacity(schema.len());
1096            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
1097                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1098            } else if let Ok(dict) = missing_columns.cast::<PyDict>() {
1099                out.extend(std::iter::repeat_n(
1100                    MissingColumnsPolicyOrExpr::Raise,
1101                    schema.len(),
1102                ));
1103                for (key, value) in dict.iter() {
1104                    let key = key.extract::<String>()?;
1105                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
1106                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1107                }
1108            } else {
1109                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
1110            }
1111            Ok(out)
1112        }
1113        fn parse_missing_struct_fields<'py>(
1114            schema: &Schema,
1115            missing_struct_fields: &Bound<'py, PyAny>,
1116        ) -> PyResult<Vec<MissingColumnsPolicy>> {
1117            let mut out = Vec::with_capacity(schema.len());
1118            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
1119                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1120            } else if let Ok(dict) = missing_struct_fields.cast::<PyDict>() {
1121                out.extend(std::iter::repeat_n(
1122                    MissingColumnsPolicy::Raise,
1123                    schema.len(),
1124                ));
1125                for (key, value) in dict.iter() {
1126                    let key = key.extract::<String>()?;
1127                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
1128                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1129                }
1130            } else {
1131                return Err(PyTypeError::new_err(
1132                    "Invalid value for `missing_struct_fields`",
1133                ));
1134            }
1135            Ok(out)
1136        }
1137        fn parse_extra_struct_fields<'py>(
1138            schema: &Schema,
1139            extra_struct_fields: &Bound<'py, PyAny>,
1140        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
1141            let mut out = Vec::with_capacity(schema.len());
1142            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
1143                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1144            } else if let Ok(dict) = extra_struct_fields.cast::<PyDict>() {
1145                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
1146                for (key, value) in dict.iter() {
1147                    let key = key.extract::<String>()?;
1148                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
1149                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1150                }
1151            } else {
1152                return Err(PyTypeError::new_err(
1153                    "Invalid value for `extra_struct_fields`",
1154                ));
1155            }
1156            Ok(out)
1157        }
1158        fn parse_cast<'py>(
1159            schema: &Schema,
1160            cast: &Bound<'py, PyAny>,
1161        ) -> PyResult<Vec<UpcastOrForbid>> {
1162            let mut out = Vec::with_capacity(schema.len());
1163            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
1164                out.extend(std::iter::repeat_n(policy.0, schema.len()));
1165            } else if let Ok(dict) = cast.cast::<PyDict>() {
1166                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
1167                for (key, value) in dict.iter() {
1168                    let key = key.extract::<String>()?;
1169                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
1170                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1171                }
1172            } else {
1173                return Err(PyTypeError::new_err(
1174                    "Invalid value for `integer_cast` / `float_cast`",
1175                ));
1176            }
1177            Ok(out)
1178        }
1179
1180        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
1181        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
1182        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
1183        let integer_cast = parse_cast(&schema.0, integer_cast)?;
1184        let float_cast = parse_cast(&schema.0, float_cast)?;
1185
1186        let per_column = (0..schema.0.len())
1187            .map(|i| MatchToSchemaPerColumn {
1188                missing_columns: missing_columns[i].clone(),
1189                missing_struct_fields: missing_struct_fields[i],
1190                extra_struct_fields: extra_struct_fields[i],
1191                integer_cast: integer_cast[i],
1192                float_cast: float_cast[i],
1193            })
1194            .collect();
1195
1196        let ldf = self.ldf.read().clone();
1197        Ok(ldf
1198            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
1199            .into())
1200    }
1201
1202    fn pipe_with_schema(&self, callback: Py<PyAny>) -> Self {
1203        let ldf = self.ldf.read().clone();
1204        let function = PythonObject(callback);
1205        ldf.pipe_with_schema(PlanCallback::new_python(function))
1206            .into()
1207    }
1208
1209    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
1210        let ldf = self.ldf.read().clone();
1211        ldf.rename(existing, new, strict).into()
1212    }
1213
1214    fn reverse(&self) -> Self {
1215        let ldf = self.ldf.read().clone();
1216        ldf.reverse().into()
1217    }
1218
1219    #[pyo3(signature = (n, fill_value=None))]
1220    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
1221        let lf = self.ldf.read().clone();
1222        let out = match fill_value {
1223            Some(v) => lf.shift_and_fill(n.inner, v.inner),
1224            None => lf.shift(n.inner),
1225        };
1226        out.into()
1227    }
1228
1229    fn fill_nan(&self, fill_value: PyExpr) -> Self {
1230        let ldf = self.ldf.read().clone();
1231        ldf.fill_nan(fill_value.inner).into()
1232    }
1233
1234    fn min(&self) -> Self {
1235        let ldf = self.ldf.read().clone();
1236        let out = ldf.min();
1237        out.into()
1238    }
1239
1240    fn max(&self) -> Self {
1241        let ldf = self.ldf.read().clone();
1242        let out = ldf.max();
1243        out.into()
1244    }
1245
1246    fn sum(&self) -> Self {
1247        let ldf = self.ldf.read().clone();
1248        let out = ldf.sum();
1249        out.into()
1250    }
1251
1252    fn mean(&self) -> Self {
1253        let ldf = self.ldf.read().clone();
1254        let out = ldf.mean();
1255        out.into()
1256    }
1257
1258    fn std(&self, ddof: u8) -> Self {
1259        let ldf = self.ldf.read().clone();
1260        let out = ldf.std(ddof);
1261        out.into()
1262    }
1263
1264    fn var(&self, ddof: u8) -> Self {
1265        let ldf = self.ldf.read().clone();
1266        let out = ldf.var(ddof);
1267        out.into()
1268    }
1269
1270    fn median(&self) -> Self {
1271        let ldf = self.ldf.read().clone();
1272        let out = ldf.median();
1273        out.into()
1274    }
1275
1276    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
1277        let ldf = self.ldf.read().clone();
1278        let out = ldf.quantile(quantile.inner, interpolation.0);
1279        out.into()
1280    }
1281
1282    fn explode(&self, subset: PySelector, empty_as_null: bool, keep_nulls: bool) -> Self {
1283        self.ldf
1284            .read()
1285            .clone()
1286            .explode(
1287                subset.inner,
1288                ExplodeOptions {
1289                    empty_as_null,
1290                    keep_nulls,
1291                },
1292            )
1293            .into()
1294    }
1295
1296    fn null_count(&self) -> Self {
1297        let ldf = self.ldf.read().clone();
1298        ldf.null_count().into()
1299    }
1300
1301    #[pyo3(signature = (maintain_order, subset, keep))]
1302    fn unique(
1303        &self,
1304        maintain_order: bool,
1305        subset: Option<Vec<PyExpr>>,
1306        keep: Wrap<UniqueKeepStrategy>,
1307    ) -> Self {
1308        let ldf = self.ldf.read().clone();
1309        let subset = subset.map(|exprs| exprs.into_iter().map(|e| e.inner).collect());
1310        match maintain_order {
1311            true => ldf.unique_stable_generic(subset, keep.0),
1312            false => ldf.unique_generic(subset, keep.0),
1313        }
1314        .into()
1315    }
1316
1317    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
1318        self.ldf
1319            .read()
1320            .clone()
1321            .drop_nans(subset.map(|e| e.inner))
1322            .into()
1323    }
1324
1325    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
1326        self.ldf
1327            .read()
1328            .clone()
1329            .drop_nulls(subset.map(|e| e.inner))
1330            .into()
1331    }
1332
1333    #[pyo3(signature = (offset, len=None))]
1334    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
1335        let ldf = self.ldf.read().clone();
1336        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
1337    }
1338
1339    fn tail(&self, n: IdxSize) -> Self {
1340        let ldf = self.ldf.read().clone();
1341        ldf.tail(n).into()
1342    }
1343
1344    #[cfg(feature = "pivot")]
1345    #[pyo3(signature = (on, on_columns, index, values, agg, maintain_order, separator))]
1346    fn pivot(
1347        &self,
1348        on: PySelector,
1349        on_columns: PyDataFrame,
1350        index: PySelector,
1351        values: PySelector,
1352        agg: PyExpr,
1353        maintain_order: bool,
1354        separator: String,
1355    ) -> Self {
1356        let ldf = self.ldf.read().clone();
1357        ldf.pivot(
1358            on.inner,
1359            Arc::new(on_columns.df.read().clone()),
1360            index.inner,
1361            values.inner,
1362            agg.inner,
1363            maintain_order,
1364            separator.into(),
1365        )
1366        .into()
1367    }
1368
1369    #[cfg(feature = "pivot")]
1370    #[pyo3(signature = (on, index, value_name, variable_name))]
1371    fn unpivot(
1372        &self,
1373        on: Option<PySelector>,
1374        index: PySelector,
1375        value_name: Option<String>,
1376        variable_name: Option<String>,
1377    ) -> Self {
1378        let args = UnpivotArgsDSL {
1379            on: on.map(|on| on.inner),
1380            index: index.inner,
1381            value_name: value_name.map(|s| s.into()),
1382            variable_name: variable_name.map(|s| s.into()),
1383        };
1384
1385        let ldf = self.ldf.read().clone();
1386        ldf.unpivot(args).into()
1387    }
1388
1389    #[pyo3(signature = (name, offset=None))]
1390    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
1391        let ldf = self.ldf.read().clone();
1392        ldf.with_row_index(name, offset).into()
1393    }
1394
1395    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
1396    fn map_batches(
1397        &self,
1398        function: Py<PyAny>,
1399        predicate_pushdown: bool,
1400        projection_pushdown: bool,
1401        slice_pushdown: bool,
1402        streamable: bool,
1403        schema: Option<Wrap<Schema>>,
1404        validate_output: bool,
1405    ) -> Self {
1406        let mut opt = OptFlags::default();
1407        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
1408        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
1409        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
1410        opt.set(OptFlags::NEW_STREAMING, streamable);
1411
1412        self.ldf
1413            .read()
1414            .clone()
1415            .map_python(
1416                function.into(),
1417                opt,
1418                schema.map(|s| Arc::new(s.0)),
1419                validate_output,
1420            )
1421            .into()
1422    }
1423
1424    fn drop(&self, columns: PySelector) -> Self {
1425        self.ldf.read().clone().drop(columns.inner).into()
1426    }
1427
1428    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
1429        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
1430        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
1431        self.ldf.read().clone().cast(cast_map, strict).into()
1432    }
1433
1434    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
1435        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
1436    }
1437
1438    fn clone(&self) -> Self {
1439        self.ldf.read().clone().into()
1440    }
1441
1442    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
1443        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;
1444
1445        let schema_dict = PyDict::new(py);
1446        schema.iter_fields().for_each(|fld| {
1447            schema_dict
1448                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
1449                .unwrap()
1450        });
1451        Ok(schema_dict)
1452    }
1453
1454    fn unnest(&self, columns: PySelector, separator: Option<&str>) -> Self {
1455        self.ldf
1456            .read()
1457            .clone()
1458            .unnest(columns.inner, separator.map(PlSmallStr::from_str))
1459            .into()
1460    }
1461
1462    fn count(&self) -> Self {
1463        let ldf = self.ldf.read().clone();
1464        ldf.count().into()
1465    }
1466
1467    #[cfg(feature = "merge_sorted")]
1468    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
1469        let out = self
1470            .ldf
1471            .read()
1472            .clone()
1473            .merge_sorted(other.ldf.into_inner(), key)
1474            .map_err(PyPolarsErr::from)?;
1475        Ok(out.into())
1476    }
1477
1478    fn _node_name(&self) -> &str {
1479        let plan = &self.ldf.read().logical_plan;
1480        plan.into()
1481    }
1482
1483    fn hint_sorted(
1484        &self,
1485        columns: Vec<String>,
1486        descending: Vec<bool>,
1487        nulls_last: Vec<bool>,
1488    ) -> PyResult<Self> {
1489        if columns.len() != descending.len() && descending.len() != 1 {
1490            return Err(PyValueError::new_err(
1491                "`set_sorted` expects the same amount of `columns` as `descending` values.",
1492            ));
1493        }
1494        if columns.len() != nulls_last.len() && nulls_last.len() != 1 {
1495            return Err(PyValueError::new_err(
1496                "`set_sorted` expects the same amount of `columns` as `nulls_last` values.",
1497            ));
1498        }
1499
1500        let mut sorted = columns
1501            .iter()
1502            .map(|c| Sorted {
1503                column: PlSmallStr::from_str(c.as_str()),
1504                descending: Some(false),
1505                nulls_last: Some(false),
1506            })
1507            .collect::<Vec<_>>();
1508
1509        if !columns.is_empty() {
1510            if descending.len() != 1 {
1511                sorted
1512                    .iter_mut()
1513                    .zip(descending)
1514                    .for_each(|(s, d)| s.descending = Some(d));
1515            } else if descending[0] {
1516                sorted.iter_mut().for_each(|s| s.descending = Some(true));
1517            }
1518
1519            if nulls_last.len() != 1 {
1520                sorted
1521                    .iter_mut()
1522                    .zip(nulls_last)
1523                    .for_each(|(s, d)| s.nulls_last = Some(d));
1524            } else if nulls_last[0] {
1525                sorted.iter_mut().for_each(|s| s.nulls_last = Some(true));
1526            }
1527        }
1528
1529        let out = self
1530            .ldf
1531            .read()
1532            .clone()
1533            .hint(HintIR::Sorted(sorted.into()))
1534            .map_err(PyPolarsErr::from)?;
1535        Ok(out.into())
1536    }
1537}
1538
1539#[pyclass(frozen)]
1540struct PyCollectBatches {
1541    inner: Arc<Mutex<CollectBatches>>,
1542    ldf: LazyFrame,
1543}
1544
1545#[pymethods]
1546impl PyCollectBatches {
1547    fn start(&self) {
1548        self.inner.lock().start();
1549    }
1550
1551    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
1552        slf
1553    }
1554
1555    fn __next__(slf: PyRef<'_, Self>, py: Python) -> PyResult<Option<PyDataFrame>> {
1556        let inner = Arc::clone(&slf.inner);
1557        py.enter_polars(|| PolarsResult::Ok(inner.lock().next().transpose()?.map(PyDataFrame::new)))
1558    }
1559
1560    #[allow(unused_variables)]
1561    #[pyo3(signature = (requested_schema=None))]
1562    fn __arrow_c_stream__<'py>(
1563        &self,
1564        py: Python<'py>,
1565        requested_schema: Option<Py<PyAny>>,
1566    ) -> PyResult<Bound<'py, PyCapsule>> {
1567        let mut ldf = self.ldf.clone();
1568        let schema = ldf
1569            .collect_schema()
1570            .map_err(PyPolarsErr::from)?
1571            .to_arrow(CompatLevel::newest());
1572
1573        let dtype = ArrowDataType::Struct(schema.into_iter_values().collect());
1574
1575        let iter = Box::new(ArrowStreamIterator::new(self.inner.clone(), dtype.clone()));
1576        let field = ArrowField::new(PlSmallStr::EMPTY, dtype, false);
1577        let stream = export_iterator(iter, field);
1578        let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
1579        PyCapsule::new(py, stream, Some(stream_capsule_name))
1580    }
1581}
1582
1583pub struct ArrowStreamIterator {
1584    inner: Arc<Mutex<CollectBatches>>,
1585    dtype: ArrowDataType,
1586}
1587
1588impl ArrowStreamIterator {
1589    fn new(inner: Arc<Mutex<CollectBatches>>, schema: ArrowDataType) -> Self {
1590        Self {
1591            inner,
1592            dtype: schema,
1593        }
1594    }
1595}
1596
1597impl Iterator for ArrowStreamIterator {
1598    type Item = PolarsResult<ArrayRef>;
1599
1600    fn next(&mut self) -> Option<Self::Item> {
1601        let next = self.inner.lock().next();
1602        match next {
1603            None => None,
1604            Some(Err(err)) => Some(Err(err)),
1605            Some(Ok(df)) => {
1606                let height = df.height();
1607                let arrays = df.rechunk_into_arrow(CompatLevel::newest());
1608                Some(Ok(Box::new(arrow::array::StructArray::new(
1609                    self.dtype.clone(),
1610                    height,
1611                    arrays,
1612                    None,
1613                ))))
1614            },
1615        }
1616    }
1617}