1use std::collections::HashMap;
2use std::ffi::CString;
3use std::num::NonZeroUsize;
4
5use arrow::ffi::export_iterator;
6use either::Either;
7use parking_lot::Mutex;
8#[cfg(feature = "pivot")]
9use polars::frame::PivotColumnNaming;
10use polars::io::RowIndex;
11use polars::prelude::iceberg_sink_state::IcebergSinkState;
12use polars::time::*;
13use polars_core::prelude::*;
14use polars_core::query_result::QueryResult;
15#[cfg(feature = "parquet")]
16use polars_parquet::arrow::write::StatisticsOptions;
17use polars_plan::dsl::ScanSources;
18use polars_plan::plans::{AExpr, HintIR, IR, Sorted};
19use polars_utils::arena::{Arena, Node};
20use polars_utils::python_function::PythonObject;
21use pyo3::exceptions::{PyTypeError, PyValueError};
22use pyo3::prelude::*;
23use pyo3::pybacked::PyBackedStr;
24use pyo3::types::{PyCapsule, PyDict, PyDictMethods, PyList};
25
26use super::{PyLazyFrame, PyOptFlags};
27use crate::error::PyPolarsErr;
28use crate::expr::ToExprs;
29use crate::expr::datatype::PyDataTypeExpr;
30use crate::expr::selector::PySelector;
31use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
32#[cfg(feature = "json")]
33use crate::io::cloud_options::OptPyCloudOptions;
34use crate::io::scan_options::PyScanOptions;
35use crate::io::sink_options::PySinkOptions;
36use crate::io::sink_output::PyFileSinkDestination;
37use crate::lazyframe::visit::NodeTraverser;
38use crate::prelude::*;
39use crate::utils::{EnterPolarsExt, to_py_err};
40use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};
41
42fn pyobject_to_first_path_and_scan_sources(
43 obj: Py<PyAny>,
44) -> PyResult<(Option<PlRefPath>, ScanSources)> {
45 use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
46 Ok(match get_python_scan_source_input(obj, false)? {
47 PythonScanSourceInput::Path(path) => (
48 Some(path.clone()),
49 ScanSources::Paths(FromIterator::from_iter([path])),
50 ),
51 PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
52 PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
53 })
54}
55
56fn post_opt_callback(
57 lambda: &Py<PyAny>,
58 root: Node,
59 lp_arena: &mut Arena<IR>,
60 expr_arena: &mut Arena<AExpr>,
61 duration_since_start: Option<std::time::Duration>,
62) -> PolarsResult<()> {
63 Python::attach(|py| {
64 let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));
65
66 let arenas = nt.get_arenas();
68
69 lambda
72 .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
73 .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;
74
75 std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
79 std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());
80
81 Ok(())
82 })
83}
84
85#[pymethods]
86#[allow(clippy::should_implement_trait)]
87impl PyLazyFrame {
/// Builds a lazy NDJSON scan.
///
/// If `source` is given, it replaces the pre-converted `sources`. Cloud options are extracted
/// only when a first path is known, using that path's scheme. An `infer_schema_length` of `0`
/// is forwarded as `None`.
#[staticmethod]
#[cfg(feature = "json")]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (
    source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
    row_index, ignore_errors, include_file_paths, cloud_options, credential_provider
))]
fn new_from_ndjson(
    source: Option<Py<PyAny>>,
    sources: Wrap<ScanSources>,
    infer_schema_length: Option<usize>,
    schema: Option<Wrap<Schema>>,
    schema_overrides: Option<Wrap<Schema>>,
    batch_size: Option<NonZeroUsize>,
    n_rows: Option<usize>,
    low_memory: bool,
    rechunk: bool,
    row_index: Option<(String, IdxSize)>,
    ignore_errors: bool,
    include_file_paths: Option<String>,
    cloud_options: OptPyCloudOptions,
    credential_provider: Option<Py<PyAny>>,
) -> PyResult<Self> {
    let (first_path, sources) = if let Some(obj) = source {
        pyobject_to_first_path_and_scan_sources(obj)?
    } else {
        let converted = sources.0;
        (converted.first_path().cloned(), converted)
    };

    let mut reader = LazyJsonLineReader::new_with_sources(sources);
    if let Some(path) = first_path {
        let scheme = CloudScheme::from_path(path.as_str());
        let resolved = cloud_options.extract_opt_cloud_options(scheme, credential_provider)?;
        reader = reader.with_cloud_options(resolved);
    }

    let row_index = row_index.map(|(name, offset)| RowIndex {
        name: name.into(),
        offset,
    });
    let schema = schema.map(|s| Arc::new(s.0));
    let schema_overrides = schema_overrides.map(|s| Arc::new(s.0));

    let lf = reader
        .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
        .with_batch_size(batch_size)
        .with_n_rows(n_rows)
        .low_memory(low_memory)
        .with_rechunk(rechunk)
        .with_schema(schema)
        .with_schema_overwrite(schema_overrides)
        .with_row_index(row_index)
        .with_ignore_errors(ignore_errors)
        .with_include_file_paths(include_file_paths.map(Into::into))
        .finish()
        .map_err(PyPolarsErr::from)?;

    Ok(lf.into())
}
151
/// Builds a lazy CSV scan from a single Python `source` object or from pre-converted `sources`.
///
/// If `source` is given, it takes precedence over `sources`. Cloud options are resolved from the
/// scheme of the first path, if there is one. Only the first byte of `separator`, `eol_char` and
/// `quote_char` is used. An empty `quote_char` is forwarded as `None`, and an empty
/// `separator` / `eol_char` is rejected.
///
/// If `with_schema_modify` is given, it is called with the list of column names and must
/// return a list of new names of the same length. The dtypes are kept; only names change.
///
/// # Errors
/// Raises if `separator` or `eol_char` is empty, if cloud options cannot be extracted, or if
/// building the reader fails. It also raises if the `with_schema_modify` callback raises,
/// returns something other than a list of strings, or returns a list of the wrong length.
#[staticmethod]
#[cfg(feature = "csv")]
#[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
    low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
    infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
    encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
    cloud_options, credential_provider, include_file_paths, missing_columns
)
)]
fn new_from_csv(
    source: Option<Py<PyAny>>,
    sources: Wrap<ScanSources>,
    separator: &str,
    has_header: bool,
    ignore_errors: bool,
    skip_rows: usize,
    skip_lines: usize,
    n_rows: Option<usize>,
    cache: bool,
    overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
    low_memory: bool,
    comment_prefix: Option<&str>,
    quote_char: Option<&str>,
    null_values: Option<Wrap<NullValues>>,
    missing_utf8_is_empty_string: bool,
    infer_schema_length: Option<usize>,
    with_schema_modify: Option<Py<PyAny>>,
    rechunk: bool,
    skip_rows_after_header: usize,
    encoding: Wrap<CsvEncoding>,
    row_index: Option<(String, IdxSize)>,
    try_parse_dates: bool,
    eol_char: &str,
    raise_if_empty: bool,
    truncate_ragged_lines: bool,
    decimal_comma: bool,
    glob: bool,
    schema: Option<Wrap<Schema>>,
    cloud_options: OptPyCloudOptions,
    credential_provider: Option<Py<PyAny>>,
    include_file_paths: Option<String>,
    missing_columns: Option<Wrap<MissingColumnsPolicy>>,
) -> PyResult<Self> {
    let null_values = null_values.map(|w| w.0);
    // Only the first byte of each single-character option is used.
    let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
    let separator = separator
        .as_bytes()
        .first()
        .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
        .copied()
        .map_err(PyPolarsErr::from)?;
    let eol_char = eol_char
        .as_bytes()
        .first()
        .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
        .copied()
        .map_err(PyPolarsErr::from)?;
    let row_index = row_index.map(|(name, offset)| RowIndex {
        name: name.into(),
        offset,
    });

    let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
        overwrite_dtype
            .into_iter()
            .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
            .collect::<Schema>()
    });

    let sources = sources.0;
    let (first_path, sources) = match source {
        None => (sources.first_path().cloned(), sources),
        Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
    };

    let mut r = LazyCsvReader::new_with_sources(sources);

    if let Some(first_path) = first_path {
        let first_path_url = first_path.as_str();
        let cloud_options = cloud_options.extract_opt_cloud_options(
            CloudScheme::from_path(first_path_url),
            credential_provider,
        )?;
        r = r.with_cloud_options(cloud_options);
    }

    let mut r = r
        .with_infer_schema_length(infer_schema_length)
        .with_separator(separator)
        .with_has_header(has_header)
        .with_ignore_errors(ignore_errors)
        .with_skip_rows(skip_rows)
        .with_skip_lines(skip_lines)
        .with_n_rows(n_rows)
        .with_cache(cache)
        .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
        .with_schema(schema.map(|schema| Arc::new(schema.0)))
        .with_low_memory(low_memory)
        .with_comment_prefix(comment_prefix.map(|x| x.into()))
        .with_quote_char(quote_char)
        .with_eol_char(eol_char)
        .with_rechunk(rechunk)
        .with_skip_rows_after_header(skip_rows_after_header)
        .with_encoding(encoding.0)
        .with_row_index(row_index)
        .with_try_parse_dates(try_parse_dates)
        .with_null_values(null_values)
        .with_missing_is_null(!missing_utf8_is_empty_string)
        .with_truncate_ragged_lines(truncate_ragged_lines)
        .with_decimal_comma(decimal_comma)
        .with_glob(glob)
        .with_raise_if_empty(raise_if_empty)
        .with_include_file_paths(include_file_paths.map(|x| x.into()))
        .with_missing_columns_policy(missing_columns.map(|x| x.0));

    if let Some(lambda) = with_schema_modify {
        let f = |schema: Schema| {
            let iter = schema.iter_names().map(|s| s.as_str());
            Python::attach(|py| -> PolarsResult<Schema> {
                // Failures in the user callback are reported as errors rather than panics.
                let names = PyList::new(py, iter).map_err(|e| {
                    polars_err!(ComputeError: "failed to build the column-name list for `with_schema_modify`: {}", e)
                })?;
                let out = lambda.call1(py, (names,)).map_err(|e| {
                    polars_err!(ComputeError: "`with_schema_modify` function failed: {}", e)
                })?;
                let new_names = out.extract::<Vec<String>>(py).map_err(|e| {
                    polars_err!(ComputeError: "`with_schema_modify` function should return List[str]: {}", e)
                })?;
                polars_ensure!(new_names.len() == schema.len(),
                    ShapeMismatch: "The length of the new names list should be equal to the original column length",
                );
                // Keep each column's dtype and replace only its name.
                Ok(schema
                    .iter_values()
                    .zip(new_names)
                    .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                    .collect())
            })
        };
        r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?;
    }

    Ok(r.finish().map_err(PyPolarsErr::from)?.into())
}
292
/// Builds a lazy Parquet scan over `sources`.
///
/// The unified scan arguments are extracted using the scheme of the first path, if any.
#[cfg(feature = "parquet")]
#[staticmethod]
#[pyo3(signature = (
    sources, schema, scan_options, parallel, low_memory, use_statistics
))]
fn new_from_parquet(
    sources: Wrap<ScanSources>,
    schema: Option<Wrap<Schema>>,
    scan_options: PyScanOptions,
    parallel: Wrap<ParallelStrategy>,
    low_memory: bool,
    use_statistics: bool,
) -> PyResult<Self> {
    let options = ParquetOptions {
        schema: schema.map(|s| Arc::new(s.0)),
        parallel: parallel.0,
        low_memory,
        use_statistics,
    };

    let sources = sources.0;
    let unified_scan_args = scan_options
        .extract_unified_scan_args(sources.first_path().and_then(|p| p.scheme()))?;

    let plan = DslBuilder::scan_parquet(sources, options, unified_scan_args)
        .map_err(to_py_err)?
        .build();

    Ok(LazyFrame::from(plan).into())
}
330
/// Builds a lazy IPC scan over `sources`.
///
/// The unified scan arguments are extracted using the scheme of the first path, if any.
#[cfg(feature = "ipc")]
#[staticmethod]
#[pyo3(signature = (sources, record_batch_statistics, scan_options))]
fn new_from_ipc(
    sources: Wrap<ScanSources>,
    record_batch_statistics: bool,
    scan_options: PyScanOptions,
) -> PyResult<Self> {
    let sources = sources.0;
    let unified_scan_args = scan_options
        .extract_unified_scan_args(sources.first_path().and_then(|p| p.scheme()))?;

    let options = IpcScanOptions {
        record_batch_statistics,
        checked: Default::default(),
    };

    let frame = LazyFrame::scan_ipc_sources(sources, options, unified_scan_args)
        .map_err(PyPolarsErr::from)?;
    Ok(frame.into())
}
354
/// Builds a `scan_lines` plan over `sources`, naming the output with `name`.
///
/// The unified scan arguments are extracted using the scheme of the first path, if any.
#[cfg(feature = "scan_lines")]
#[staticmethod]
#[pyo3(signature = (sources, scan_options, name))]
fn new_from_scan_lines(
    sources: Wrap<ScanSources>,
    scan_options: PyScanOptions,
    name: PyBackedStr,
) -> PyResult<Self> {
    let sources = sources.0;
    let unified_scan_args = scan_options
        .extract_unified_scan_args(sources.first_path().and_then(|p| p.scheme()))?;

    let plan: DslPlan = DslBuilder::scan_lines(sources, unified_scan_args, (&*name).into())
        .map_err(to_py_err)?
        .build();

    Ok(LazyFrame::from(plan).into())
}
376
/// Builds an `expand_paths` plan over `sources`, naming the output with `name`.
///
/// The unified scan arguments are extracted using the scheme of the first path, if any.
#[cfg(feature = "scan_lines")]
#[staticmethod]
#[pyo3(signature = (sources, scan_options, name))]
fn new_from_expand_paths(
    sources: Wrap<ScanSources>,
    scan_options: PyScanOptions,
    name: PyBackedStr,
) -> PyResult<Self> {
    let sources = sources.0;
    let unified_scan_args = scan_options
        .extract_unified_scan_args(sources.first_path().and_then(|p| p.scheme()))?;

    let plan: DslPlan = DslBuilder::expand_paths(sources, unified_scan_args, (&*name).into())
        .map_err(to_py_err)?
        .build();

    Ok(LazyFrame::from(plan).into())
}
398
399 #[staticmethod]
400 #[pyo3(signature = (
401 dataset_object
402 ))]
403 fn new_from_dataset_object(dataset_object: Py<PyAny>) -> PyResult<Self> {
404 let lf =
405 LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
406 .into();
407
408 Ok(lf)
409 }
410
411 #[staticmethod]
412 fn scan_from_python_function_arrow_schema(
413 schema: &Bound<'_, PyList>,
414 scan_fn: Py<PyAny>,
415 pyarrow: bool,
416 validate_schema: bool,
417 is_pure: bool,
418 ) -> PyResult<Self> {
419 let schema = Arc::new(pyarrow_schema_to_rust(schema)?);
420
421 Ok(LazyFrame::scan_from_python_function(
422 Either::Right(schema),
423 scan_fn,
424 pyarrow,
425 validate_schema,
426 is_pure,
427 )
428 .into())
429 }
430
431 #[staticmethod]
432 fn scan_from_python_function_pl_schema(
433 schema: Vec<(PyBackedStr, Wrap<DataType>)>,
434 scan_fn: Py<PyAny>,
435 pyarrow: bool,
436 validate_schema: bool,
437 is_pure: bool,
438 ) -> PyResult<Self> {
439 let schema = Arc::new(Schema::from_iter(
440 schema
441 .into_iter()
442 .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
443 ));
444 Ok(LazyFrame::scan_from_python_function(
445 Either::Right(schema),
446 scan_fn,
447 pyarrow,
448 validate_schema,
449 is_pure,
450 )
451 .into())
452 }
453
454 #[staticmethod]
455 fn scan_from_python_function_schema_function(
456 schema_fn: Py<PyAny>,
457 scan_fn: Py<PyAny>,
458 validate_schema: bool,
459 is_pure: bool,
460 ) -> PyResult<Self> {
461 Ok(LazyFrame::scan_from_python_function(
462 Either::Left(schema_fn),
463 scan_fn,
464 false,
465 validate_schema,
466 is_pure,
467 )
468 .into())
469 }
470
471 fn describe_plan(&self, py: Python) -> PyResult<String> {
472 py.enter_polars(|| self.ldf.read().describe_plan())
473 }
474
475 fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
476 py.enter_polars(|| self.ldf.read().describe_optimized_plan())
477 }
478
479 fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
480 py.enter_polars(|| self.ldf.read().describe_plan_tree())
481 }
482
483 fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
484 py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
485 }
486
487 fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
488 py.enter_polars(|| self.ldf.read().to_dot(optimized))
489 }
490
/// Returns `LazyFrame::to_dot_streaming_phys` for the current plan; `optimized` is forwarded.
#[cfg(feature = "streaming")]
fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
    let ldf = &self.ldf;
    py.enter_polars(|| ldf.read().to_dot_streaming_phys(optimized))
}
495
496 fn sort(
497 &self,
498 by_column: &str,
499 descending: bool,
500 nulls_last: bool,
501 maintain_order: bool,
502 multithreaded: bool,
503 ) -> Self {
504 let ldf = self.ldf.read().clone();
505 ldf.sort(
506 [by_column],
507 SortMultipleOptions {
508 descending: vec![descending],
509 nulls_last: vec![nulls_last],
510 multithreaded,
511 maintain_order,
512 limit: None,
513 },
514 )
515 .into()
516 }
517
518 fn sort_by_exprs(
519 &self,
520 by: Vec<PyExpr>,
521 descending: Vec<bool>,
522 nulls_last: Vec<bool>,
523 maintain_order: bool,
524 multithreaded: bool,
525 ) -> Self {
526 let ldf = self.ldf.read().clone();
527 let exprs = by.to_exprs();
528 ldf.sort_by_exprs(
529 exprs,
530 SortMultipleOptions {
531 descending,
532 nulls_last,
533 maintain_order,
534 multithreaded,
535 limit: None,
536 },
537 )
538 .into()
539 }
540
541 fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
542 let ldf = self.ldf.read().clone();
543 let exprs = by.to_exprs();
544 ldf.top_k(
545 k,
546 exprs,
547 SortMultipleOptions::new().with_order_descending_multi(reverse),
548 )
549 .into()
550 }
551
552 fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
553 let ldf = self.ldf.read().clone();
554 let exprs = by.to_exprs();
555 ldf.bottom_k(
556 k,
557 exprs,
558 SortMultipleOptions::new().with_order_descending_multi(reverse),
559 )
560 .into()
561 }
562
563 fn cache(&self) -> Self {
564 let ldf = self.ldf.read().clone();
565 ldf.cache().into()
566 }
567
568 #[pyo3(signature = (optflags))]
569 fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
570 let ldf = self.ldf.read().clone();
571 ldf.with_optimizations(optflags.inner.into_inner()).into()
572 }
573
574 #[pyo3(signature = (lambda_post_opt))]
575 fn profile(
576 &self,
577 py: Python<'_>,
578 lambda_post_opt: Option<Py<PyAny>>,
579 ) -> PyResult<(PyDataFrame, PyDataFrame)> {
580 let (df, time_df) = py.enter_polars(|| {
581 let ldf = self.ldf.read().clone();
582 if let Some(lambda) = lambda_post_opt {
583 ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
584 post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
585 })
586 } else {
587 ldf.profile()
588 }
589 })?;
590 Ok((df.into(), time_df.into()))
591 }
592
593 #[pyo3(signature = (engine, lambda_post_opt))]
594 fn collect(
595 &self,
596 py: Python<'_>,
597 engine: Wrap<Engine>,
598 lambda_post_opt: Option<Py<PyAny>>,
599 ) -> PyResult<PyDataFrame> {
600 py.enter_polars_df(|| {
601 let ldf = self.ldf.read().clone();
602 if let Some(lambda) = lambda_post_opt {
603 ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
604 post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
605 })
606 } else {
607 ldf.collect_with_engine(engine.0).map(|r| match r {
608 QueryResult::Single(df) => df,
609 QueryResult::Multiple(_) => DataFrame::empty(),
611 })
612 }
613 })
614 }
615
/// Collects the query on the `ASYNC` runtime's blocking pool and reports the outcome to `lambda`.
///
/// `lambda` is called with either the resulting data frame or the converted `PyErr`. A
/// `QueryResult::Multiple` result is mapped to an empty data frame. Any exception raised by
/// `lambda` itself is restored as the current Python error rather than propagated.
#[cfg(feature = "async")]
#[pyo3(signature = (engine, lambda))]
fn collect_with_callback(
    &self,
    py: Python<'_>,
    engine: Wrap<Engine>,
    lambda: Py<PyAny>,
) -> PyResult<()> {
    py.enter_polars_ok(|| {
        let ldf = self.ldf.read().clone();

        polars_core::runtime::ASYNC.spawn_blocking(move || {
            let outcome = ldf
                .collect_with_engine(engine.0)
                .map(|res| match res {
                    QueryResult::Single(df) => df,
                    QueryResult::Multiple(_) => DataFrame::empty(),
                })
                .map(PyDataFrame::new)
                .map_err(PyPolarsErr::from);

            Python::attach(|py| {
                let call_result = match outcome {
                    Ok(df) => lambda.call1(py, (df,)),
                    Err(err) => lambda.call1(py, (PyErr::from(err),)),
                };
                if let Err(err) = call_result {
                    err.restore(py);
                }
            });
        });
    })
}
654
/// Starts a batched collection and wraps it in a `PyCollectBatches` handle.
///
/// The handle keeps a copy of the query plan alongside the batch source.
#[cfg(feature = "async")]
fn collect_batches(
    &self,
    py: Python<'_>,
    engine: Wrap<Engine>,
    maintain_order: bool,
    chunk_size: Option<NonZeroUsize>,
    lazy: bool,
) -> PyResult<PyCollectBatches> {
    py.enter_polars(|| {
        let ldf = self.ldf.read().clone();
        let batches = ldf
            .clone()
            .collect_batches(engine.0, maintain_order, chunk_size, lazy)
            .map_err(PyPolarsErr::from)?;
        let inner = Arc::new(Mutex::new(batches));
        PyResult::Ok(PyCollectBatches { inner, ldf })
    })
}
678
/// Adds a Parquet sink to the query and returns the resulting lazy frame.
///
/// The steps run in this order: compression is parsed, then the target and the unified sink
/// arguments are extracted (the latter using the target's cloud scheme). Each step raises on
/// failure.
#[cfg(feature = "parquet")]
#[pyo3(signature = (
    target, sink_options, compression, compression_level, statistics, row_group_size, data_page_size,
    metadata, arrow_schema
))]
fn sink_parquet(
    &self,
    py: Python<'_>,
    target: PyFileSinkDestination,
    sink_options: PySinkOptions,
    compression: &str,
    compression_level: Option<i32>,
    statistics: Wrap<StatisticsOptions>,
    row_group_size: Option<usize>,
    data_page_size: Option<usize>,
    metadata: Wrap<Option<KeyValueMetadata>>,
    arrow_schema: Option<Wrap<ArrowSchema>>,
) -> PyResult<PyLazyFrame> {
    let compression = parse_parquet_compression(compression, compression_level)?;
    let target = target.extract_file_sink_destination()?;
    let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

    let write_options = ParquetWriteOptions {
        compression,
        statistics: statistics.0,
        row_group_size,
        data_page_size,
        key_value_metadata: metadata.0,
        arrow_schema: arrow_schema.map(|s| Arc::new(s.0)),
        compat_level: None,
    };
    let format = FileWriteFormat::Parquet(Arc::new(write_options));

    py.enter_polars(|| {
        let ldf = self.ldf.read().clone();
        ldf.sink(target, format, unified_sink_args).into()
    })
    .map(Into::into)
    .map_err(Into::into)
}
726
/// Adds an IPC sink to the query and returns the resulting lazy frame.
///
/// The unified sink arguments are extracted using the target's cloud scheme.
#[cfg(feature = "ipc")]
#[pyo3(signature = (
    target, sink_options, compression, compat_level, record_batch_size, record_batch_statistics
))]
fn sink_ipc(
    &self,
    py: Python<'_>,
    target: PyFileSinkDestination,
    sink_options: PySinkOptions,
    compression: Wrap<Option<IpcCompression>>,
    compat_level: PyCompatLevel,
    record_batch_size: Option<usize>,
    record_batch_statistics: bool,
) -> PyResult<PyLazyFrame> {
    let target = target.extract_file_sink_destination()?;
    let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

    let format = FileWriteFormat::Ipc(IpcWriterOptions {
        compression: compression.0,
        compat_level: compat_level.0,
        record_batch_size,
        record_batch_statistics,
    });

    py.enter_polars(|| {
        let ldf = self.ldf.read().clone();
        ldf.sink(target, format, unified_sink_args).into()
    })
    .map(Into::into)
    .map_err(Into::into)
}
761
/// Adds a CSV sink to the query and returns the resulting lazy frame.
///
/// A missing `quote_style` falls back to `QuoteStyle::default()`, and a missing `null_value`
/// falls back to `SerializeOptions::default().null`. The steps run in this order: compression
/// is validated, then the target and the unified sink arguments are extracted. Each step
/// raises on failure.
#[cfg(feature = "csv")]
#[pyo3(signature = (
    target, sink_options, include_bom, compression, compression_level, check_extension,
    include_header, separator, line_terminator, quote_char, batch_size, datetime_format,
    date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
    quote_style
))]
fn sink_csv(
    &self,
    py: Python<'_>,
    target: PyFileSinkDestination,
    sink_options: PySinkOptions,
    include_bom: bool,
    compression: &str,
    compression_level: Option<u32>,
    check_extension: bool,
    include_header: bool,
    separator: u8,
    line_terminator: Wrap<PlSmallStr>,
    quote_char: u8,
    batch_size: NonZeroUsize,
    datetime_format: Option<Wrap<PlSmallStr>>,
    date_format: Option<Wrap<PlSmallStr>>,
    time_format: Option<Wrap<PlSmallStr>>,
    float_scientific: Option<bool>,
    float_precision: Option<usize>,
    decimal_comma: bool,
    null_value: Option<Wrap<PlSmallStr>>,
    quote_style: Option<Wrap<QuoteStyle>>,
) -> PyResult<PyLazyFrame> {
    let quote_style = quote_style.map_or_else(QuoteStyle::default, |w| w.0);
    let null = null_value
        .map(|w| w.0)
        .unwrap_or_else(|| SerializeOptions::default().null);

    let serialize_options = SerializeOptions {
        date_format: date_format.map(|w| w.0),
        time_format: time_format.map(|w| w.0),
        datetime_format: datetime_format.map(|w| w.0),
        float_scientific,
        float_precision,
        decimal_comma,
        separator,
        quote_char,
        null,
        line_terminator: line_terminator.0,
        quote_style,
    };

    let compression = ExternalCompression::try_from(compression, compression_level)
        .map_err(PyPolarsErr::from)?;
    let options = CsvWriterOptions {
        include_bom,
        compression,
        check_extension,
        include_header,
        batch_size,
        serialize_options: serialize_options.into(),
    };

    let target = target.extract_file_sink_destination()?;
    let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
    let format = FileWriteFormat::Csv(options);

    py.enter_polars(|| {
        let ldf = self.ldf.read().clone();
        ldf.sink(target, format, unified_sink_args).into()
    })
    .map(Into::into)
    .map_err(Into::into)
}
834
/// Adds an NDJSON sink to the query and returns the resulting lazy frame.
///
/// The steps run in this order: compression is validated, then the target and the unified
/// sink arguments are extracted. Each step raises on failure.
#[allow(clippy::too_many_arguments)]
#[cfg(feature = "json")]
#[pyo3(signature = (target, compression, compression_level, check_extension, sink_options))]
fn sink_ndjson(
    &self,
    py: Python<'_>,
    target: PyFileSinkDestination,
    compression: &str,
    compression_level: Option<u32>,
    check_extension: bool,
    sink_options: PySinkOptions,
) -> PyResult<PyLazyFrame> {
    let compression = ExternalCompression::try_from(compression, compression_level)
        .map_err(PyPolarsErr::from)?;
    let format = FileWriteFormat::NDJson(NDJsonWriterOptions {
        compression,
        check_extension,
    });

    let target = target.extract_file_sink_destination()?;
    let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

    py.enter_polars(|| {
        let ldf = self.ldf.read().clone();
        ldf.sink(target, format, unified_sink_args).into()
    })
    .map(Into::into)
    .map_err(Into::into)
}
866
867 #[pyo3(signature = (function, maintain_order, chunk_size))]
868 pub fn sink_batches(
869 &self,
870 py: Python<'_>,
871 function: Py<PyAny>,
872 maintain_order: bool,
873 chunk_size: Option<NonZeroUsize>,
874 ) -> PyResult<PyLazyFrame> {
875 let ldf = self.ldf.read().clone();
876 py.enter_polars(|| {
877 ldf.sink_batches(
878 PlanCallback::new_python(PythonObject(function)),
879 maintain_order,
880 chunk_size,
881 )
882 })
883 .map(Into::into)
884 .map_err(Into::into)
885 }
886
887 pub fn sink_iceberg(&self, py: Python<'_>, sink_state_obj: Py<PyAny>) -> PyResult<PyLazyFrame> {
888 let sink_state: IcebergSinkState = sink_state_obj.extract(py)?;
889 let mut ldf = { self.ldf.read().clone() };
890
891 ldf.logical_plan = DslPlan::Sink {
892 input: Arc::new(ldf.logical_plan),
893 payload: SinkType::Iceberg(sink_state),
894 };
895
896 Ok(ldf.into())
897 }
898
899 fn filter(&self, predicate: PyExpr) -> Self {
900 self.ldf.read().clone().filter(predicate.inner).into()
901 }
902
903 fn remove(&self, predicate: PyExpr) -> Self {
904 let ldf = self.ldf.read().clone();
905 ldf.remove(predicate.inner).into()
906 }
907
908 fn select(&self, exprs: Vec<PyExpr>) -> Self {
909 let ldf = self.ldf.read().clone();
910 let exprs = exprs.to_exprs();
911 ldf.select(exprs).into()
912 }
913
914 fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
915 let ldf = self.ldf.read().clone();
916 let exprs = exprs.to_exprs();
917 ldf.select_seq(exprs).into()
918 }
919
920 fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
921 let ldf = self.ldf.read().clone();
922 let by = by.to_exprs();
923 let lazy_gb = if maintain_order {
924 ldf.group_by_stable(by)
925 } else {
926 ldf.group_by(by)
927 };
928
929 PyLazyGroupBy { lgb: Some(lazy_gb) }
930 }
931
932 fn rolling(
933 &self,
934 index_column: PyExpr,
935 period: &str,
936 offset: &str,
937 closed: Wrap<ClosedWindow>,
938 by: Vec<PyExpr>,
939 ) -> PyResult<PyLazyGroupBy> {
940 let closed_window = closed.0;
941 let ldf = self.ldf.read().clone();
942 let by = by
943 .into_iter()
944 .map(|pyexpr| pyexpr.inner)
945 .collect::<Vec<_>>();
946 let lazy_gb = ldf.rolling(
947 index_column.inner,
948 by,
949 RollingGroupOptions {
950 index_column: "".into(),
951 period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
952 offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
953 closed_window,
954 },
955 );
956
957 Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
958 }
959
960 fn group_by_dynamic(
961 &self,
962 index_column: PyExpr,
963 every: &str,
964 period: &str,
965 offset: &str,
966 label: Wrap<Label>,
967 include_boundaries: bool,
968 closed: Wrap<ClosedWindow>,
969 group_by: Vec<PyExpr>,
970 start_by: Wrap<StartBy>,
971 ) -> PyResult<PyLazyGroupBy> {
972 let closed_window = closed.0;
973 let group_by = group_by
974 .into_iter()
975 .map(|pyexpr| pyexpr.inner)
976 .collect::<Vec<_>>();
977 let ldf = self.ldf.read().clone();
978 let lazy_gb = ldf.group_by_dynamic(
979 index_column.inner,
980 group_by,
981 DynamicGroupOptions {
982 every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
983 period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
984 offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
985 label: label.0,
986 include_boundaries,
987 closed_window,
988 start_by: start_by.0,
989 ..Default::default()
990 },
991 );
992
993 Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
994 }
995
996 fn with_context(&self, contexts: Vec<Self>) -> Self {
997 let contexts = contexts
998 .into_iter()
999 .map(|ldf| ldf.ldf.into_inner())
1000 .collect::<Vec<_>>();
1001 self.ldf.read().clone().with_context(contexts).into()
1002 }
1003
/// As-of join of this frame with `other` on a single key pair.
///
/// `coalesce` selects between coalescing and keeping the key columns. A `tolerance` value is
/// converted to a `'static` scalar of its own dtype.
#[cfg(feature = "asof_join")]
#[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
fn join_asof(
    &self,
    other: Self,
    left_on: PyExpr,
    right_on: PyExpr,
    left_by: Option<Vec<PyBackedStr>>,
    right_by: Option<Vec<PyBackedStr>>,
    allow_parallel: bool,
    force_parallel: bool,
    suffix: String,
    strategy: Wrap<AsofStrategy>,
    tolerance: Option<Wrap<AnyValue<'_>>>,
    tolerance_str: Option<String>,
    coalesce: bool,
    allow_eq: bool,
    check_sortedness: bool,
) -> PyResult<Self> {
    let coalesce = if !coalesce {
        JoinCoalesce::KeepColumns
    } else {
        JoinCoalesce::CoalesceColumns
    };
    let tolerance = tolerance.map(|t| {
        let value = t.0.into_static();
        Scalar::new(value.dtype(), value)
    });
    let asof_options = AsOfOptions {
        strategy: strategy.0,
        left_by: left_by.map(strings_to_pl_smallstr),
        right_by: right_by.map(strings_to_pl_smallstr),
        tolerance,
        tolerance_str: tolerance_str.map(|s| s.into()),
        allow_eq,
        check_sortedness,
    };

    let ldf = self.ldf.read().clone();
    let other = other.ldf.into_inner();
    let joined = ldf
        .join_builder()
        .with(other)
        .left_on([left_on.inner])
        .right_on([right_on.inner])
        .allow_parallel(allow_parallel)
        .force_parallel(force_parallel)
        .coalesce(coalesce)
        .how(JoinType::AsOf(Box::new(asof_options)))
        .suffix(suffix)
        .finish();
    Ok(joined.into())
}
1057
1058 #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
1059 fn join(
1060 &self,
1061 other: Self,
1062 left_on: Vec<PyExpr>,
1063 right_on: Vec<PyExpr>,
1064 allow_parallel: bool,
1065 force_parallel: bool,
1066 nulls_equal: bool,
1067 how: Wrap<JoinType>,
1068 suffix: String,
1069 validate: Wrap<JoinValidation>,
1070 maintain_order: Wrap<MaintainOrderJoin>,
1071 coalesce: Option<bool>,
1072 ) -> PyResult<Self> {
1073 let coalesce = match coalesce {
1074 None => JoinCoalesce::JoinSpecific,
1075 Some(true) => JoinCoalesce::CoalesceColumns,
1076 Some(false) => JoinCoalesce::KeepColumns,
1077 };
1078 let ldf = self.ldf.read().clone();
1079 let other = other.ldf.into_inner();
1080 let left_on = left_on
1081 .into_iter()
1082 .map(|pyexpr| pyexpr.inner)
1083 .collect::<Vec<_>>();
1084 let right_on = right_on
1085 .into_iter()
1086 .map(|pyexpr| pyexpr.inner)
1087 .collect::<Vec<_>>();
1088
1089 Ok(ldf
1090 .join_builder()
1091 .with(other)
1092 .left_on(left_on)
1093 .right_on(right_on)
1094 .allow_parallel(allow_parallel)
1095 .force_parallel(force_parallel)
1096 .join_nulls(nulls_equal)
1097 .how(how.0)
1098 .suffix(suffix)
1099 .validate(validate.0)
1100 .coalesce(coalesce)
1101 .maintain_order(maintain_order.0)
1102 .finish()
1103 .into())
1104 }
1105
1106 fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
1107 let ldf = self.ldf.read().clone();
1108 let other = other.ldf.into_inner();
1109
1110 let predicates = predicates.to_exprs();
1111
1112 Ok(ldf
1113 .join_builder()
1114 .with(other)
1115 .suffix(suffix)
1116 .join_where(predicates)
1117 .into())
1118 }
1119
1120 fn gather(&self, idxs: Self, null_on_oob: bool) -> Self {
1121 let ldf = self.ldf.read().clone();
1122 let idxs = idxs.ldf.into_inner();
1123 ldf.gather(idxs, null_on_oob).into()
1124 }
1125
1126 fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
1127 let ldf = self.ldf.read().clone();
1128 ldf.with_columns(exprs.to_exprs()).into()
1129 }
1130
1131 fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
1132 let ldf = self.ldf.read().clone();
1133 ldf.with_columns_seq(exprs.to_exprs()).into()
1134 }
1135
1136 fn match_to_schema<'py>(
1137 &self,
1138 schema: Wrap<Schema>,
1139 missing_columns: &Bound<'py, PyAny>,
1140 missing_struct_fields: &Bound<'py, PyAny>,
1141 extra_columns: Wrap<ExtraColumnsPolicy>,
1142 extra_struct_fields: &Bound<'py, PyAny>,
1143 integer_cast: &Bound<'py, PyAny>,
1144 float_cast: &Bound<'py, PyAny>,
1145 ) -> PyResult<Self> {
1146 fn parse_missing_columns<'py>(
1147 schema: &Schema,
1148 missing_columns: &Bound<'py, PyAny>,
1149 ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
1150 let mut out = Vec::with_capacity(schema.len());
1151 if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
1152 out.extend(std::iter::repeat_n(policy.0, schema.len()));
1153 } else if let Ok(dict) = missing_columns.cast::<PyDict>() {
1154 out.extend(std::iter::repeat_n(
1155 MissingColumnsPolicyOrExpr::Raise,
1156 schema.len(),
1157 ));
1158 for (key, value) in dict.iter() {
1159 let key = key.extract::<String>()?;
1160 let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
1161 out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1162 }
1163 } else {
1164 return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
1165 }
1166 Ok(out)
1167 }
1168 fn parse_missing_struct_fields<'py>(
1169 schema: &Schema,
1170 missing_struct_fields: &Bound<'py, PyAny>,
1171 ) -> PyResult<Vec<MissingColumnsPolicy>> {
1172 let mut out = Vec::with_capacity(schema.len());
1173 if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
1174 out.extend(std::iter::repeat_n(policy.0, schema.len()));
1175 } else if let Ok(dict) = missing_struct_fields.cast::<PyDict>() {
1176 out.extend(std::iter::repeat_n(
1177 MissingColumnsPolicy::Raise,
1178 schema.len(),
1179 ));
1180 for (key, value) in dict.iter() {
1181 let key = key.extract::<String>()?;
1182 let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
1183 out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1184 }
1185 } else {
1186 return Err(PyTypeError::new_err(
1187 "Invalid value for `missing_struct_fields`",
1188 ));
1189 }
1190 Ok(out)
1191 }
1192 fn parse_extra_struct_fields<'py>(
1193 schema: &Schema,
1194 extra_struct_fields: &Bound<'py, PyAny>,
1195 ) -> PyResult<Vec<ExtraColumnsPolicy>> {
1196 let mut out = Vec::with_capacity(schema.len());
1197 if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
1198 out.extend(std::iter::repeat_n(policy.0, schema.len()));
1199 } else if let Ok(dict) = extra_struct_fields.cast::<PyDict>() {
1200 out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
1201 for (key, value) in dict.iter() {
1202 let key = key.extract::<String>()?;
1203 let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
1204 out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1205 }
1206 } else {
1207 return Err(PyTypeError::new_err(
1208 "Invalid value for `extra_struct_fields`",
1209 ));
1210 }
1211 Ok(out)
1212 }
1213 fn parse_cast<'py>(
1214 schema: &Schema,
1215 cast: &Bound<'py, PyAny>,
1216 ) -> PyResult<Vec<UpcastOrForbid>> {
1217 let mut out = Vec::with_capacity(schema.len());
1218 if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
1219 out.extend(std::iter::repeat_n(policy.0, schema.len()));
1220 } else if let Ok(dict) = cast.cast::<PyDict>() {
1221 out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
1222 for (key, value) in dict.iter() {
1223 let key = key.extract::<String>()?;
1224 let value = value.extract::<Wrap<UpcastOrForbid>>()?;
1225 out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1226 }
1227 } else {
1228 return Err(PyTypeError::new_err(
1229 "Invalid value for `integer_cast` / `float_cast`",
1230 ));
1231 }
1232 Ok(out)
1233 }
1234
1235 let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
1236 let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
1237 let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
1238 let integer_cast = parse_cast(&schema.0, integer_cast)?;
1239 let float_cast = parse_cast(&schema.0, float_cast)?;
1240
1241 let per_column = (0..schema.0.len())
1242 .map(|i| MatchToSchemaPerColumn {
1243 missing_columns: missing_columns[i].clone(),
1244 missing_struct_fields: missing_struct_fields[i],
1245 extra_struct_fields: extra_struct_fields[i],
1246 integer_cast: integer_cast[i],
1247 float_cast: float_cast[i],
1248 })
1249 .collect();
1250
1251 let ldf = self.ldf.read().clone();
1252 Ok(ldf
1253 .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
1254 .into())
1255 }
1256
1257 fn pipe_with_schema(&self, callback: Py<PyAny>) -> Self {
1258 let ldf = self.ldf.read().clone();
1259 let function = PythonObject(callback);
1260 ldf.pipe_with_schema(PlanCallback::new_python(function))
1261 .into()
1262 }
1263
1264 fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
1265 let ldf = self.ldf.read().clone();
1266 ldf.rename(existing, new, strict).into()
1267 }
1268
1269 fn reverse(&self) -> Self {
1270 let ldf = self.ldf.read().clone();
1271 ldf.reverse().into()
1272 }
1273
1274 #[pyo3(signature = (n, fill_value=None))]
1275 fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
1276 let lf = self.ldf.read().clone();
1277 let out = match fill_value {
1278 Some(v) => lf.shift_and_fill(n.inner, v.inner),
1279 None => lf.shift(n.inner),
1280 };
1281 out.into()
1282 }
1283
1284 fn fill_nan(&self, fill_value: PyExpr) -> Self {
1285 let ldf = self.ldf.read().clone();
1286 ldf.fill_nan(fill_value.inner).into()
1287 }
1288
1289 fn min(&self) -> Self {
1290 let ldf = self.ldf.read().clone();
1291 let out = ldf.min();
1292 out.into()
1293 }
1294
1295 fn max(&self) -> Self {
1296 let ldf = self.ldf.read().clone();
1297 let out = ldf.max();
1298 out.into()
1299 }
1300
1301 fn sum(&self) -> Self {
1302 let ldf = self.ldf.read().clone();
1303 let out = ldf.sum();
1304 out.into()
1305 }
1306
1307 fn mean(&self) -> Self {
1308 let ldf = self.ldf.read().clone();
1309 let out = ldf.mean();
1310 out.into()
1311 }
1312
1313 fn std(&self, ddof: u8) -> Self {
1314 let ldf = self.ldf.read().clone();
1315 let out = ldf.std(ddof);
1316 out.into()
1317 }
1318
1319 fn var(&self, ddof: u8) -> Self {
1320 let ldf = self.ldf.read().clone();
1321 let out = ldf.var(ddof);
1322 out.into()
1323 }
1324
1325 fn median(&self) -> Self {
1326 let ldf = self.ldf.read().clone();
1327 let out = ldf.median();
1328 out.into()
1329 }
1330
1331 fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
1332 let ldf = self.ldf.read().clone();
1333 let out = ldf.quantile(quantile.inner, interpolation.0);
1334 out.into()
1335 }
1336
1337 fn explode(&self, subset: PySelector, empty_as_null: bool, keep_nulls: bool) -> Self {
1338 self.ldf
1339 .read()
1340 .clone()
1341 .explode(
1342 subset.inner,
1343 ExplodeOptions {
1344 empty_as_null,
1345 keep_nulls,
1346 },
1347 )
1348 .into()
1349 }
1350
1351 fn null_count(&self) -> Self {
1352 let ldf = self.ldf.read().clone();
1353 ldf.null_count().into()
1354 }
1355
1356 #[pyo3(signature = (maintain_order, subset, keep))]
1357 fn unique(
1358 &self,
1359 maintain_order: bool,
1360 subset: Option<Vec<PyExpr>>,
1361 keep: Wrap<UniqueKeepStrategy>,
1362 ) -> Self {
1363 let ldf = self.ldf.read().clone();
1364 let subset = subset.map(|exprs| exprs.into_iter().map(|e| e.inner).collect());
1365 match maintain_order {
1366 true => ldf.unique_stable_generic(subset, keep.0),
1367 false => ldf.unique_generic(subset, keep.0),
1368 }
1369 .into()
1370 }
1371
1372 fn drop_nans(&self, subset: Option<PySelector>) -> Self {
1373 self.ldf
1374 .read()
1375 .clone()
1376 .drop_nans(subset.map(|e| e.inner))
1377 .into()
1378 }
1379
1380 fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
1381 self.ldf
1382 .read()
1383 .clone()
1384 .drop_nulls(subset.map(|e| e.inner))
1385 .into()
1386 }
1387
1388 #[pyo3(signature = (offset, len=None))]
1389 fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
1390 let ldf = self.ldf.read().clone();
1391 ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
1392 }
1393
1394 fn tail(&self, n: IdxSize) -> Self {
1395 let ldf = self.ldf.read().clone();
1396 ldf.tail(n).into()
1397 }
1398
#[cfg(feature = "pivot")]
#[pyo3(signature = (on, on_columns, index, values, agg, maintain_order, separator, column_naming))]
fn pivot(
    &self,
    on: PySelector,
    on_columns: PyDataFrame,
    index: PySelector,
    values: PySelector,
    agg: PyExpr,
    maintain_order: bool,
    separator: String,
    column_naming: Wrap<PivotColumnNaming>,
) -> Self {
    // Pivot `values` by the `on` columns. `on_columns` supplies a snapshot
    // (copied out of its lock) of the `on` values to pivot over.
    let frame = self.ldf.read().clone();
    let pivot_values = Arc::new(on_columns.df.read().clone());
    let pivoted = frame.pivot(
        on.inner,
        pivot_values,
        index.inner,
        values.inner,
        agg.inner,
        maintain_order,
        separator.into(),
        column_naming.0,
    );
    pivoted.into()
}
1425
#[cfg(feature = "pivot")]
#[pyo3(signature = (on, index, value_name, variable_name))]
fn unpivot(
    &self,
    on: Option<PySelector>,
    index: PySelector,
    value_name: Option<String>,
    variable_name: Option<String>,
) -> Self {
    // Build the DSL unpivot arguments and apply them to a snapshot of the plan.
    let args = UnpivotArgsDSL {
        on: on.map(|selector| selector.inner),
        index: index.inner,
        value_name: value_name.map(Into::into),
        variable_name: variable_name.map(Into::into),
    };
    self.ldf.read().clone().unpivot(args).into()
}
1445
1446 #[pyo3(signature = (name, offset=None))]
1447 fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
1448 let ldf = self.ldf.read().clone();
1449 ldf.with_row_index(name, offset).into()
1450 }
1451
1452 #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
1453 fn map_batches(
1454 &self,
1455 function: Py<PyAny>,
1456 predicate_pushdown: bool,
1457 projection_pushdown: bool,
1458 slice_pushdown: bool,
1459 streamable: bool,
1460 schema: Option<Wrap<Schema>>,
1461 validate_output: bool,
1462 ) -> Self {
1463 let mut opt = OptFlags::default();
1464 opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
1465 opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
1466 opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
1467 opt.set(OptFlags::STREAMING, streamable);
1468
1469 self.ldf
1470 .read()
1471 .clone()
1472 .map_python(
1473 function.into(),
1474 opt,
1475 schema.map(|s| Arc::new(s.0)),
1476 validate_output,
1477 )
1478 .into()
1479 }
1480
1481 fn drop(&self, columns: PySelector) -> Self {
1482 self.ldf.read().clone().drop(columns.inner).into()
1483 }
1484
1485 fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
1486 let mut cast_map = PlHashMap::with_capacity(dtypes.len());
1487 cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
1488 self.ldf.read().clone().cast(cast_map, strict).into()
1489 }
1490
1491 fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
1492 self.ldf.read().clone().cast_all(dtype.inner, strict).into()
1493 }
1494
1495 fn clone(&self) -> Self {
1496 self.ldf.read().clone().into()
1497 }
1498
1499 fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
1500 let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;
1501
1502 let schema_dict = PyDict::new(py);
1503 schema.iter_fields().for_each(|fld| {
1504 schema_dict
1505 .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
1506 .unwrap()
1507 });
1508 Ok(schema_dict)
1509 }
1510
1511 fn unnest(&self, columns: PySelector, separator: Option<&str>) -> Self {
1512 self.ldf
1513 .read()
1514 .clone()
1515 .unnest(columns.inner, separator.map(PlSmallStr::from_str))
1516 .into()
1517 }
1518
1519 fn count(&self) -> Self {
1520 let ldf = self.ldf.read().clone();
1521 ldf.count().into()
1522 }
1523
#[cfg(feature = "merge_sorted")]
fn merge_sorted(&self, other: Self, key: &str, maintain_order: bool) -> PyResult<Self> {
    // Merge `other` into `self` on column `key` via `LazyFrame::merge_sorted`;
    // plan-construction errors are converted to Python errors.
    let left = self.ldf.read().clone();
    let right = other.ldf.into_inner();
    let merged = left
        .merge_sorted(right, key, maintain_order)
        .map_err(PyPolarsErr::from)?;
    Ok(merged.into())
}
1534
1535 fn _node_name(&self) -> &str {
1536 let plan = &self.ldf.read().logical_plan;
1537 plan.into()
1538 }
1539
1540 fn hint_sorted(
1541 &self,
1542 columns: Vec<String>,
1543 descending: Vec<bool>,
1544 nulls_last: Vec<bool>,
1545 ) -> PyResult<Self> {
1546 if columns.len() != descending.len() && descending.len() != 1 {
1547 return Err(PyValueError::new_err(
1548 "`set_sorted` expects the same amount of `columns` as `descending` values.",
1549 ));
1550 }
1551 if columns.len() != nulls_last.len() && nulls_last.len() != 1 {
1552 return Err(PyValueError::new_err(
1553 "`set_sorted` expects the same amount of `columns` as `nulls_last` values.",
1554 ));
1555 }
1556
1557 let mut sorted = columns
1558 .iter()
1559 .map(|c| Sorted {
1560 column: PlSmallStr::from_str(c.as_str()),
1561 descending: Some(false),
1562 nulls_last: Some(false),
1563 })
1564 .collect::<Vec<_>>();
1565
1566 if !columns.is_empty() {
1567 if descending.len() != 1 {
1568 sorted
1569 .iter_mut()
1570 .zip(descending)
1571 .for_each(|(s, d)| s.descending = Some(d));
1572 } else if descending[0] {
1573 sorted.iter_mut().for_each(|s| s.descending = Some(true));
1574 }
1575
1576 if nulls_last.len() != 1 {
1577 sorted
1578 .iter_mut()
1579 .zip(nulls_last)
1580 .for_each(|(s, d)| s.nulls_last = Some(d));
1581 } else if nulls_last[0] {
1582 sorted.iter_mut().for_each(|s| s.nulls_last = Some(true));
1583 }
1584 }
1585
1586 let out = self
1587 .ldf
1588 .read()
1589 .clone()
1590 .hint(HintIR::Sorted(sorted.into()))
1591 .map_err(PyPolarsErr::from)?;
1592 Ok(out.into())
1593 }
1594}
1595
1596#[pyclass(frozen)]
1597struct PyCollectBatches {
1598 inner: Arc<Mutex<CollectBatches>>,
1599 ldf: LazyFrame,
1600}
1601
1602#[pymethods]
1603impl PyCollectBatches {
1604 fn start(&self) {
1605 self.inner.lock().start();
1606 }
1607
1608 fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
1609 slf
1610 }
1611
1612 fn __next__(slf: PyRef<'_, Self>, py: Python) -> PyResult<Option<PyDataFrame>> {
1613 let inner = Arc::clone(&slf.inner);
1614 py.enter_polars(|| PolarsResult::Ok(inner.lock().next().transpose()?.map(PyDataFrame::new)))
1615 }
1616
1617 #[allow(unused_variables)]
1618 #[pyo3(signature = (requested_schema=None))]
1619 fn __arrow_c_stream__<'py>(
1620 &self,
1621 py: Python<'py>,
1622 requested_schema: Option<Py<PyAny>>,
1623 ) -> PyResult<Bound<'py, PyCapsule>> {
1624 let mut ldf = self.ldf.clone();
1625 let schema = ldf
1626 .collect_schema()
1627 .map_err(PyPolarsErr::from)?
1628 .to_arrow(CompatLevel::newest());
1629
1630 let dtype = ArrowDataType::Struct(schema.into_iter_values().collect());
1631
1632 let iter = Box::new(ArrowStreamIterator::new(self.inner.clone(), dtype.clone()));
1633 let field = ArrowField::new(PlSmallStr::EMPTY, dtype, false);
1634 let stream = export_iterator(iter, field);
1635 let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
1636 PyCapsule::new(py, stream, Some(stream_capsule_name))
1637 }
1638}
1639
1640pub struct ArrowStreamIterator {
1641 inner: Arc<Mutex<CollectBatches>>,
1642 dtype: ArrowDataType,
1643}
1644
1645impl ArrowStreamIterator {
1646 fn new(inner: Arc<Mutex<CollectBatches>>, schema: ArrowDataType) -> Self {
1647 Self {
1648 inner,
1649 dtype: schema,
1650 }
1651 }
1652}
1653
1654impl Iterator for ArrowStreamIterator {
1655 type Item = PolarsResult<ArrayRef>;
1656
1657 fn next(&mut self) -> Option<Self::Item> {
1658 let next = self.inner.lock().next();
1659 match next {
1660 None => None,
1661 Some(Err(err)) => Some(Err(err)),
1662 Some(Ok(df)) => {
1663 let height = df.height();
1664 let arrays = df.rechunk_into_arrow(CompatLevel::newest());
1665 Some(Ok(Box::new(arrow::array::StructArray::new(
1666 self.dtype.clone(),
1667 height,
1668 arrays,
1669 None,
1670 ))))
1671 },
1672 }
1673 }
1674}