use std::collections::HashMap;
use std::ffi::CString;
use std::num::NonZeroUsize;

use arrow::ffi::export_iterator;
use either::Either;
use parking_lot::Mutex;
use polars::io::RowIndex;
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, HintIR, IR, Sorted};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::expr::datatype::PyDataTypeExpr;
use crate::expr::selector::PySelector;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
#[cfg(any(feature = "csv", feature = "json"))]
use crate::io::cloud_options::OptPyCloudOptions;
use crate::io::scan_options::PyScanOptions;
use crate::io::sink_options::PySinkOptions;
use crate::io::sink_output::PyFileSinkDestination;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

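/// Convert a Python scan-source argument (a path, an open file object, or an
/// in-memory buffer) into an optional first path plus the `ScanSources` that
/// the scan builders consume.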
fn pyobject_to_first_path_and_scan_sources(
    obj: Py<PyAny>,
) -> PyResult<(Option<PlRefPath>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => (
            Some(path.clone()),
            ScanSources::Paths(FromIterator::from_iter([path])),
        ),
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

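/// Invoke a Python post-optimization callback (e.g. the GPU engine hook) on the
/// optimized plan. The IR and expression arenas are moved into a
/// `NodeTraverser` so the callback can inspect and rewrite the plan, then
/// swapped back before execution continues.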
fn post_opt_callback(
    lambda: &Py<PyAny>,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::attach(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

        let arenas = nt.get_arenas();

        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
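    /// Lazily scan newline-delimited JSON sources. Cloud options are only
    /// resolved when a first path is available to derive a scheme from.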
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider
    ))]
    fn new_from_ndjson(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: OptPyCloudOptions,
        credential_provider: Option<Py<PyAny>>,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().cloned(), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        if let Some(first_path) = first_path {
            let first_path_url = first_path.as_str();

            let cloud_options = cloud_options.extract_opt_cloud_options(
                CloudScheme::from_path(first_path_url),
                credential_provider,
            )?;

            r = r.with_cloud_options(cloud_options);
        }

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

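    /// Lazily scan CSV sources. `with_schema_modify` is an optional Python
    /// callback that receives the inferred column names and returns
    /// replacement names.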
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, include_file_paths
    )
    )]
    fn new_from_csv(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<Py<PyAny>>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: OptPyCloudOptions,
        credential_provider: Option<Py<PyAny>>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().cloned(), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        if let Some(first_path) = first_path {
            let first_path_url = first_path.as_str();
            let cloud_options = cloud_options.extract_opt_cloud_options(
                CloudScheme::from_path(first_path_url),
                credential_provider,
            )?;
            r = r.with_cloud_options(cloud_options);
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::attach(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "The length of the new names list should be equal to the original column count",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

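    /// Lazily scan Parquet sources using the unified scan arguments.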
    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        sources, schema, scan_options, parallel, low_memory, use_statistics
    ))]
    fn new_from_parquet(
        sources: Wrap<ScanSources>,
        schema: Option<Wrap<Schema>>,
        scan_options: PyScanOptions,
        parallel: Wrap<ParallelStrategy>,
        low_memory: bool,
        use_statistics: bool,
    ) -> PyResult<Self> {
        use crate::utils::to_py_err;

        let parallel = parallel.0;

        let options = ParquetOptions {
            schema: schema.map(|x| Arc::new(x.0)),
            parallel,
            low_memory,
            use_statistics,
        };

        let sources = sources.0;
        let first_path = sources.first_path();

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;

        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
            .map_err(to_py_err)?
            .build()
            .into();

        Ok(lf.into())
    }

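    /// Lazily scan Arrow IPC sources.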
    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (sources, record_batch_statistics, scan_options))]
    fn new_from_ipc(
        sources: Wrap<ScanSources>,
        record_batch_statistics: bool,
        scan_options: PyScanOptions,
    ) -> PyResult<Self> {
        let options = IpcScanOptions {
            record_batch_statistics,
            checked: Default::default(),
        };

        let sources = sources.0;
        let first_path = sources.first_path().cloned();

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().and_then(|x| x.scheme()))?;

        let lf = LazyFrame::scan_ipc_sources(sources, options, unified_scan_args)
            .map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

    #[cfg(feature = "scan_lines")]
    #[staticmethod]
    #[pyo3(signature = (sources, scan_options, name))]
    fn new_from_scan_lines(
        sources: Wrap<ScanSources>,
        scan_options: PyScanOptions,
        name: PyBackedStr,
    ) -> PyResult<Self> {
        let sources = sources.0;
        let first_path = sources.first_path();

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;

        let dsl: DslPlan = DslBuilder::scan_lines(sources, unified_scan_args, (&*name).into())
            .map_err(to_py_err)?
            .build();
        let lf: LazyFrame = dsl.into();

        Ok(lf.into())
    }

    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: Py<PyAny>) -> PyResult<Self> {
        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: Py<PyAny>,
        scan_fn: Py<PyAny>,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
            is_pure,
        )
        .into())
    }

    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_optimizations(optflags.inner.into_inner()).into()
    }

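    /// Collect the query and additionally return a DataFrame with per-node
    /// timings.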
    #[pyo3(signature = (lambda_post_opt))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

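    /// Collect the query on the given engine, optionally running a
    /// post-optimization callback on the optimized plan first.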
    #[pyo3(signature = (engine, lambda_post_opt))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

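    /// Collect on a background thread; the Python `lambda` is called with
    /// either the resulting DataFrame or the error.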
    #[cfg(feature = "async")]
    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: Py<PyAny>,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.read().clone();

            polars_io::pl_async::get_runtime().spawn_blocking(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::attach(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

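    /// Start a streaming collection of the query; batches are pulled through
    /// the returned `PyCollectBatches` iterator defined below.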
    #[cfg(feature = "async")]
    fn collect_batches(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
        lazy: bool,
    ) -> PyResult<PyCollectBatches> {
        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();

            let collect_batches = ldf
                .clone()
                .collect_batches(engine.0, maintain_order, chunk_size, lazy)
                .map_err(PyPolarsErr::from)?;

            PyResult::Ok(PyCollectBatches {
                inner: Arc::new(Mutex::new(collect_batches)),
                ldf,
            })
        })
    }

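    /// Sink the query results to a Parquet file or cloud destination.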
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        target, sink_options, compression, compression_level, statistics, row_group_size, data_page_size,
        metadata, arrow_schema
    ))]
    fn sink_parquet(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        metadata: Wrap<Option<KeyValueMetadata>>,
        arrow_schema: Option<Wrap<ArrowSchema>>,
    ) -> PyResult<PyLazyFrame> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            key_value_metadata: metadata.0,
            arrow_schema: arrow_schema.map(|x| Arc::new(x.0)),
            compat_level: None,
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(
                    target,
                    FileWriteFormat::Parquet(Arc::new(options)),
                    unified_sink_args,
                )
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        target, sink_options, compression, compat_level, record_batch_size, record_batch_statistics
    ))]
    fn sink_ipc(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        record_batch_size: Option<usize>,
        record_batch_statistics: bool,
    ) -> PyResult<PyLazyFrame> {
        let options = IpcWriterOptions {
            compression: compression.0,
            compat_level: compat_level.0,
            record_batch_size,
            record_batch_statistics,
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileWriteFormat::Ipc(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        target, sink_options, include_bom, compression, compression_level, check_extension,
        include_header, separator, line_terminator, quote_char, batch_size, datetime_format,
        date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
        quote_style
    ))]
    fn sink_csv(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        include_bom: bool,
        compression: &str,
        compression_level: Option<u32>,
        check_extension: bool,
        include_header: bool,
        separator: u8,
        line_terminator: Wrap<PlSmallStr>,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<Wrap<PlSmallStr>>,
        date_format: Option<Wrap<PlSmallStr>>,
        time_format: Option<Wrap<PlSmallStr>>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        decimal_comma: bool,
        null_value: Option<Wrap<PlSmallStr>>,
        quote_style: Option<Wrap<QuoteStyle>>,
    ) -> PyResult<PyLazyFrame> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value
            .map(|x| x.0)
            .unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format: date_format.map(|x| x.0),
            time_format: time_format.map(|x| x.0),
            datetime_format: datetime_format.map(|x| x.0),
            float_scientific,
            float_precision,
            decimal_comma,
            separator,
            quote_char,
            null: null_value,
            line_terminator: line_terminator.0,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            compression: ExternalCompression::try_from(compression, compression_level)
                .map_err(PyPolarsErr::from)?,
            check_extension,
            include_header,
            batch_size,
            serialize_options: serialize_options.into(),
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileWriteFormat::Csv(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "json")]
    #[pyo3(signature = (target, compression, compression_level, check_extension, sink_options))]
    fn sink_ndjson(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        compression: &str,
        compression_level: Option<u32>,
        check_extension: bool,
        sink_options: PySinkOptions,
    ) -> PyResult<PyLazyFrame> {
        let options = NDJsonWriterOptions {
            compression: ExternalCompression::try_from(compression, compression_level)
                .map_err(PyPolarsErr::from)?,
            check_extension,
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileWriteFormat::NDJson(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[pyo3(signature = (function, maintain_order, chunk_size))]
    pub fn sink_batches(
        &self,
        py: Python<'_>,
        function: Py<PyAny>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
    ) -> PyResult<PyLazyFrame> {
        let ldf = self.ldf.read().clone();
        py.enter_polars(|| {
            ldf.sink_batches(
                PlanCallback::new_python(PythonObject(function)),
                maintain_order,
                chunk_size,
            )
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    fn filter(&self, predicate: PyExpr) -> Self {
        self.ldf.read().clone().filter(predicate.inner).into()
    }

    fn remove(&self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.read().clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

    fn rolling(
        &self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.read().clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.read().clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts
            .into_iter()
            .map(|ldf| ldf.ldf.into_inner())
            .collect::<Vec<_>>();
        self.ldf.read().clone().with_context(contexts).into()
    }

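    /// As-of join against `other`, matching on the nearest key rather than
    /// equal keys; `tolerance` (or `tolerance_str` for durations) bounds how
    /// far apart matched keys may be.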
    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(Box::new(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| {
                    let av = t.0.into_static();
                    let dtype = av.dtype();
                    Scalar::new(dtype, av)
                }),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            })))
            .suffix(suffix)
            .finish()
            .into())
    }

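    /// Standard join against `other`; `coalesce = None` defers key coalescing
    /// to the default of the chosen join type.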
    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

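    /// Align this frame to `schema`, applying per-column policies for missing
    /// columns, missing/extra struct fields, and integer/float upcasts. Each
    /// policy argument is either a single policy or a dict keyed by column
    /// name.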
    fn match_to_schema<'py>(
        &self,
        schema: Wrap<Schema>,
        missing_columns: &Bound<'py, PyAny>,
        missing_struct_fields: &Bound<'py, PyAny>,
        extra_columns: Wrap<ExtraColumnsPolicy>,
        extra_struct_fields: &Bound<'py, PyAny>,
        integer_cast: &Bound<'py, PyAny>,
        float_cast: &Bound<'py, PyAny>,
    ) -> PyResult<Self> {
        fn parse_missing_columns<'py>(
            schema: &Schema,
            missing_columns: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_columns.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicyOrExpr::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
            }
            Ok(out)
        }
        fn parse_missing_struct_fields<'py>(
            schema: &Schema,
            missing_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_struct_fields.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicy::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `missing_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_extra_struct_fields<'py>(
            schema: &Schema,
            extra_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = extra_struct_fields.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `extra_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_cast<'py>(
            schema: &Schema,
            cast: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<UpcastOrForbid>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = cast.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `integer_cast` / `float_cast`",
                ));
            }
            Ok(out)
        }

        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
        let integer_cast = parse_cast(&schema.0, integer_cast)?;
        let float_cast = parse_cast(&schema.0, float_cast)?;

        let per_column = (0..schema.0.len())
            .map(|i| MatchToSchemaPerColumn {
                missing_columns: missing_columns[i].clone(),
                missing_struct_fields: missing_struct_fields[i],
                extra_struct_fields: extra_struct_fields[i],
                integer_cast: integer_cast[i],
                float_cast: float_cast[i],
            })
            .collect();

        let ldf = self.ldf.read().clone();
        Ok(ldf
            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
            .into())
    }

    fn pipe_with_schema(&self, callback: Py<PyAny>) -> Self {
        let ldf = self.ldf.read().clone();
        let function = PythonObject(callback);
        ldf.pipe_with_schema(PlanCallback::new_python(function))
            .into()
    }

    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.read().clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, subset: PySelector, empty_as_null: bool, keep_nulls: bool) -> Self {
        self.ldf
            .read()
            .clone()
            .explode(
                subset.inner,
                ExplodeOptions {
                    empty_as_null,
                    keep_nulls,
                },
            )
            .into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PyExpr>>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let subset = subset.map(|exprs| exprs.into_iter().map(|e| e.inner).collect());
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nans(subset.map(|e| e.inner))
            .into()
    }

    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nulls(subset.map(|e| e.inner))
            .into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, on_columns, index, values, agg, maintain_order, separator))]
    fn pivot(
        &self,
        on: PySelector,
        on_columns: PyDataFrame,
        index: PySelector,
        values: PySelector,
        agg: PyExpr,
        maintain_order: bool,
        separator: String,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.pivot(
            on.inner,
            Arc::new(on_columns.df.read().clone()),
            index.inner,
            values.inner,
            agg.inner,
            maintain_order,
            separator.into(),
        )
        .into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: Option<PySelector>,
        index: PySelector,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.map(|on| on.inner),
            index: index.inner,
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.read().clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_row_index(name, offset).into()
    }

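    /// Map a Python function over the materialized DataFrame. The pushdown
    /// flags declare which optimizations remain sound across the opaque UDF
    /// boundary.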
    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        function: Py<PyAny>,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::NEW_STREAMING, streamable);

        self.ldf
            .read()
            .clone()
            .map_python(
                function.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: PySelector) -> Self {
        self.ldf.read().clone().drop(columns.inner).into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.read().clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.read().clone().into()
    }

    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: PySelector, separator: Option<&str>) -> Self {
        self.ldf
            .read()
            .clone()
            .unnest(columns.inner, separator.map(PlSmallStr::from_str))
            .into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .read()
            .clone()
            .merge_sorted(other.ldf.into_inner(), key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }

    fn _node_name(&self) -> &str {
        let plan = &self.ldf.read().logical_plan;
        plan.into()
    }

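    /// Attach a sortedness hint for `columns` to the plan. `descending` and
    /// `nulls_last` are broadcast when a single value is given.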
    fn hint_sorted(
        &self,
        columns: Vec<String>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
    ) -> PyResult<Self> {
        if columns.len() != descending.len() && descending.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same number of `columns` as `descending` values.",
            ));
        }
        if columns.len() != nulls_last.len() && nulls_last.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same number of `columns` as `nulls_last` values.",
            ));
        }

        let mut sorted = columns
            .iter()
            .map(|c| Sorted {
                column: PlSmallStr::from_str(c.as_str()),
                descending: Some(false),
                nulls_last: Some(false),
            })
            .collect::<Vec<_>>();

        if !columns.is_empty() {
            if descending.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(descending)
                    .for_each(|(s, d)| s.descending = Some(d));
            } else if descending[0] {
                sorted.iter_mut().for_each(|s| s.descending = Some(true));
            }

            if nulls_last.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(nulls_last)
                    .for_each(|(s, d)| s.nulls_last = Some(d));
            } else if nulls_last[0] {
                sorted.iter_mut().for_each(|s| s.nulls_last = Some(true));
            }
        }

        let out = self
            .ldf
            .read()
            .clone()
            .hint(HintIR::Sorted(sorted.into()))
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

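/// Python-facing handle over a streaming collection. The `LazyFrame` is kept
/// alongside the batch iterator so the Arrow C stream export can recover the
/// schema.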
#[pyclass(frozen)]
struct PyCollectBatches {
    inner: Arc<Mutex<CollectBatches>>,
    ldf: LazyFrame,
}

#[pymethods]
impl PyCollectBatches {
    fn start(&self) {
        self.inner.lock().start();
    }

    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
        slf
    }

    fn __next__(slf: PyRef<'_, Self>, py: Python) -> PyResult<Option<PyDataFrame>> {
        let inner = Arc::clone(&slf.inner);
        py.enter_polars(|| PolarsResult::Ok(inner.lock().next().transpose()?.map(PyDataFrame::new)))
    }

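    /// Export the batch stream through the Arrow PyCapsule interface
    /// (`arrow_array_stream`) so consumers such as pyarrow can ingest it.
    /// `requested_schema` is accepted for protocol compatibility but currently
    /// ignored.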
    #[allow(unused_variables)]
    #[pyo3(signature = (requested_schema=None))]
    fn __arrow_c_stream__<'py>(
        &self,
        py: Python<'py>,
        requested_schema: Option<Py<PyAny>>,
    ) -> PyResult<Bound<'py, PyCapsule>> {
        let mut ldf = self.ldf.clone();
        let schema = ldf
            .collect_schema()
            .map_err(PyPolarsErr::from)?
            .to_arrow(CompatLevel::newest());

        let dtype = ArrowDataType::Struct(schema.into_iter_values().collect());

        let iter = Box::new(ArrowStreamIterator::new(self.inner.clone(), dtype.clone()));
        let field = ArrowField::new(PlSmallStr::EMPTY, dtype, false);
        let stream = export_iterator(iter, field);
        let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
        PyCapsule::new(py, stream, Some(stream_capsule_name))
    }
}

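/// Adapter that turns collected DataFrame batches into Arrow struct arrays for
/// the C stream interface; each batch is rechunked into a single chunk first.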
pub struct ArrowStreamIterator {
    inner: Arc<Mutex<CollectBatches>>,
    dtype: ArrowDataType,
}

impl ArrowStreamIterator {
    fn new(inner: Arc<Mutex<CollectBatches>>, dtype: ArrowDataType) -> Self {
        Self { inner, dtype }
    }
}

impl Iterator for ArrowStreamIterator {
    type Item = PolarsResult<ArrayRef>;

    fn next(&mut self) -> Option<Self::Item> {
        let next = self.inner.lock().next();
        match next {
            None => None,
            Some(Err(err)) => Some(Err(err)),
            Some(Ok(df)) => {
                let height = df.height();
                let arrays = df.rechunk_into_arrow(CompatLevel::newest());
                Some(Ok(Box::new(arrow::array::StructArray::new(
                    self.dtype.clone(),
                    height,
                    arrays,
                    None,
                ))))
            },
        }
    }
}