use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::PathBuf;

use polars::io::{HiveOptions, RowIndex};
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::plans::ScanSources;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyList};

use super::PyLazyFrame;
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

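/// Resolve a Python scan-source object (a path, an open file, or an in-memory
/// buffer) into an optional first path plus the matching `ScanSources` variant.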
fn pyobject_to_first_path_and_scan_sources(
    obj: PyObject,
) -> PyResult<(Option<PathBuf>, ScanSources)> {
    use crate::file::{get_python_scan_source_input, PythonScanSourceInput};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => {
            (Some(path.clone()), ScanSources::Paths([path].into()))
        },
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
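    /// Scan newline-delimited JSON lazily, forwarding the Python keyword
    /// arguments to `LazyJsonLineReader` (including optional cloud options).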
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
    ))]
    fn new_from_ndjson(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_func_object),
                );

            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }

            r = r.with_cloud_options(Some(cloud_options));
        };

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

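    /// Scan CSV data lazily, translating the Python keyword arguments into
    /// `LazyCsvReader` options; `with_schema_modify` may rename columns via a
    /// Python callback.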
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
        )
    )]
    fn new_from_csv(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<PyObject>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_func_object),
                );
            r = r.with_cloud_options(Some(cloud_options));
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::with_gil(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "the length of the new names list must equal the original column count",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

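    /// Scan Parquet files lazily through `LazyFrame::scan_parquet_sources`,
    /// including Hive-partitioning and cloud-storage configuration.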
    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        source, sources, n_rows, cache, parallel, rechunk, row_index, low_memory, cloud_options,
        credential_provider, use_statistics, hive_partitioning, schema, hive_schema,
        try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns,
    ))]
    fn new_from_parquet(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        n_rows: Option<usize>,
        cache: bool,
        parallel: Wrap<ParallelStrategy>,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        low_memory: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        use_statistics: bool,
        hive_partitioning: Option<bool>,
        schema: Option<Wrap<Schema>>,
        hive_schema: Option<Wrap<Schema>>,
        try_parse_hive_dates: bool,
        retries: usize,
        glob: bool,
        include_file_paths: Option<String>,
        allow_missing_columns: bool,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;

        let parallel = parallel.0;
        let hive_schema = hive_schema.map(|s| Arc::new(s.0));

        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let hive_options = HiveOptions {
            enabled: hive_partitioning,
            hive_start_idx: 0,
            schema: hive_schema,
            try_parse_dates: try_parse_hive_dates,
        };

        let mut args = ScanArgsParquet {
            n_rows,
            cache,
            parallel,
            rechunk,
            row_index,
            low_memory,
            cloud_options: None,
            use_statistics,
            schema: schema.map(|x| Arc::new(x.0)),
            hive_options,
            glob,
            include_file_paths: include_file_paths.map(|x| x.into()),
            allow_missing_columns,
        };

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();
            let cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            args.cloud_options = Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_func_object),
                    ),
            );
        }

        let lf = LazyFrame::scan_parquet_sources(sources, args).map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

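    /// Scan Arrow IPC files lazily through `LazyFrame::scan_ipc_sources`.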
    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (
        source, sources, n_rows, cache, rechunk, row_index, cloud_options, credential_provider,
        hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl,
        include_file_paths
    ))]
    fn new_from_ipc(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        n_rows: Option<usize>,
        cache: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        hive_partitioning: Option<bool>,
        hive_schema: Option<Wrap<Schema>>,
        try_parse_hive_dates: bool,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let hive_options = HiveOptions {
            enabled: hive_partitioning,
            hive_start_idx: 0,
            schema: hive_schema.map(|x| Arc::new(x.0)),
            try_parse_dates: try_parse_hive_dates,
        };

        let mut args = ScanArgsIpc {
            n_rows,
            cache,
            rechunk,
            row_index,
            cloud_options: None,
            hive_options,
            include_file_paths: include_file_paths.map(|x| x.into()),
        };

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            args.cloud_options = Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_func_object),
                    ),
            );
        }

        let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

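    /// Create a `LazyFrame` from a Python scan function and a pyarrow schema.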
    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: PyObject,
        pyarrow: bool,
    ) -> PyResult<Self> {
        let schema = pyarrow_schema_to_rust(schema)?;
        Ok(LazyFrame::scan_from_python_function(schema, scan_fn, pyarrow).into())
    }

    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: PyObject,
        pyarrow: bool,
    ) -> PyResult<Self> {
        let schema = Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        );
        Ok(LazyFrame::scan_from_python_function(schema, scan_fn, pyarrow).into())
    }

    fn describe_plan(&self) -> PyResult<String> {
        self.ldf
            .describe_plan()
            .map_err(PyPolarsErr::from)
            .map_err(Into::into)
    }

    fn describe_optimized_plan(&self) -> PyResult<String> {
        self.ldf
            .describe_optimized_plan()
            .map_err(PyPolarsErr::from)
            .map_err(Into::into)
    }

    fn describe_plan_tree(&self) -> PyResult<String> {
        self.ldf
            .describe_plan_tree()
            .map_err(PyPolarsErr::from)
            .map_err(Into::into)
    }

    fn describe_optimized_plan_tree(&self) -> PyResult<String> {
        self.ldf
            .describe_optimized_plan_tree()
            .map_err(PyPolarsErr::from)
            .map_err(Into::into)
    }

    fn to_dot(&self, optimized: bool) -> PyResult<String> {
        let result = self.ldf.to_dot(optimized).map_err(PyPolarsErr::from)?;
        Ok(result)
    }

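    /// Toggle the individual query-optimization passes for this `LazyFrame`.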
    fn optimization_toggle(
        &self,
        type_coercion: bool,
        type_check: bool,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        simplify_expression: bool,
        slice_pushdown: bool,
        comm_subplan_elim: bool,
        comm_subexpr_elim: bool,
        cluster_with_columns: bool,
        collapse_joins: bool,
        streaming: bool,
        _eager: bool,
        _check_order: bool,
        #[allow(unused_variables)] new_streaming: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        let mut ldf = ldf
            .with_type_coercion(type_coercion)
            .with_type_check(type_check)
            .with_predicate_pushdown(predicate_pushdown)
            .with_simplify_expr(simplify_expression)
            .with_slice_pushdown(slice_pushdown)
            .with_cluster_with_columns(cluster_with_columns)
            .with_collapse_joins(collapse_joins)
            .with_check_order(_check_order)
            ._with_eager(_eager)
            .with_projection_pushdown(projection_pushdown);

        #[cfg(feature = "streaming")]
        {
            ldf = ldf.with_streaming(streaming);
        }

        #[cfg(feature = "new_streaming")]
        {
            ldf = ldf.with_new_streaming(new_streaming);
        }

        #[cfg(feature = "cse")]
        {
            ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);
            ldf = ldf.with_comm_subexpr_elim(comm_subexpr_elim);
        }

        ldf.into()
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.cache().into()
    }

    fn profile(&self, py: Python) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.profile().map_err(PyPolarsErr::from)
        })?;
        Ok((df.into(), time_df.into()))
    }

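    /// Collect the query into a `DataFrame`, releasing the GIL while it runs;
    /// `lambda_post_opt` may rewrite the optimized plan (e.g. for a 'cuda'
    /// conversion) before execution.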
    #[pyo3(signature = (lambda_post_opt=None))]
    fn collect(&self, py: Python, lambda_post_opt: Option<PyObject>) -> PyResult<PyDataFrame> {
        let df = py.allow_threads(|| {
            let ldf = self.ldf.clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena| {
                    Python::with_gil(|py| {
                        let nt = NodeTraverser::new(
                            root,
                            std::mem::take(lp_arena),
                            std::mem::take(expr_arena),
                        );

                        let arenas = nt.get_arenas();

                        lambda.call1(py, (nt,)).map_err(
                            |e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e),
                        )?;

                        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
                        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

                        Ok(())
                    })
                })
            } else {
                ldf.collect()
            }
            .map_err(PyPolarsErr::from)
        })?;
        Ok(df.into())
    }

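    /// Collect on the thread pool and hand the resulting `DataFrame` (or the
    /// error) to the given Python callback.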
    #[pyo3(signature = (lambda,))]
    fn collect_with_callback(&self, lambda: PyObject) {
        let ldf = self.ldf.clone();

        polars_core::POOL.spawn(move || {
            let result = ldf
                .collect()
                .map(PyDataFrame::new)
                .map_err(PyPolarsErr::from);

            Python::with_gil(|py| match result {
                Ok(df) => {
                    lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                },
                Err(err) => {
                    lambda
                        .call1(py, (PyErr::from(err),))
                        .map_err(|err| err.restore(py))
                        .ok();
                },
            });
        });
    }

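    /// Stream the query result into a Parquet file at `path`.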
    #[cfg(all(feature = "streaming", feature = "parquet"))]
    #[pyo3(signature = (
        path, compression, compression_level, statistics, row_group_size, data_page_size,
        maintain_order, cloud_options, credential_provider, retries
    ))]
    fn sink_parquet(
        &self,
        py: Python,
        path: PathBuf,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        maintain_order: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            maintain_order,
        };

        let cloud_options = {
            let cloud_options =
                parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object),
                    ),
            )
        };

        py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.sink_parquet(&path, options, cloud_options)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

    #[cfg(all(feature = "streaming", feature = "ipc"))]
    #[pyo3(signature = (path, compression, maintain_order, cloud_options, credential_provider, retries))]
    fn sink_ipc(
        &self,
        py: Python,
        path: PathBuf,
        compression: Option<Wrap<IpcCompression>>,
        maintain_order: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let options = IpcWriterOptions {
            compression: compression.map(|c| c.0),
            maintain_order,
        };

        #[cfg(feature = "cloud")]
        let cloud_options = {
            let cloud_options =
                parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object),
                    ),
            )
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.sink_ipc(path, options, cloud_options)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

    #[cfg(all(feature = "streaming", feature = "csv"))]
    #[pyo3(signature = (
        path, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
        quote_style, maintain_order, cloud_options, credential_provider, retries
    ))]
    fn sink_csv(
        &self,
        py: Python,
        path: PathBuf,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        maintain_order: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value.unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format,
            time_format,
            datetime_format,
            float_scientific,
            float_precision,
            separator,
            quote_char,
            null: null_value,
            line_terminator,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            include_header,
            maintain_order,
            batch_size,
            serialize_options,
        };

        #[cfg(feature = "cloud")]
        let cloud_options = {
            let cloud_options =
                parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object),
                    ),
            )
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.sink_csv(path, options, cloud_options)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(all(feature = "streaming", feature = "json"))]
    #[pyo3(signature = (path, maintain_order, cloud_options, credential_provider, retries))]
    fn sink_json(
        &self,
        py: Python,
        path: PathBuf,
        maintain_order: bool,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let options = JsonWriterOptions { maintain_order };

        let cloud_options = {
            let cloud_options =
                parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object),
                    ),
            )
        };

        py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.sink_json(path, options, cloud_options)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

    fn fetch(&self, py: Python, n_rows: usize) -> PyResult<PyDataFrame> {
        let ldf = self.ldf.clone();
        let df = py.allow_threads(|| ldf.fetch(n_rows).map_err(PyPolarsErr::from))?;
        Ok(df.into())
    }

    fn filter(&mut self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.filter(predicate.inner).into()
    }

    fn select(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&mut self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

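    /// Create a rolling group-by over `index_column` using the given period,
    /// offset and closed-window settings.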
    fn rolling(
        &mut self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

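    /// Create a dynamic (time-based) group-by over `index_column`.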
    fn group_by_dynamic(
        &mut self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts.into_iter().map(|ldf| ldf.ldf).collect::<Vec<_>>();
        self.ldf.clone().with_context(contexts).into()
    }

    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.clone();
        let other = other.ldf;
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| t.0.into_static()),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            }))
            .suffix(suffix)
            .finish()
            .into())
    }

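    /// Join this `LazyFrame` with another one on the given key expressions.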
    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, join_nulls, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        join_nulls: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.clone();
        let other = other.ldf;
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(join_nulls)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.clone();
        let other = other.ldf;

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

    fn rename(&mut self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, column: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let column = column.to_exprs();
        ldf.explode(column).into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PyExpr>>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    #[pyo3(signature = (subset=None))]
    fn drop_nans(&self, subset: Option<Vec<PyExpr>>) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        ldf.drop_nans(subset).into()
    }

    #[pyo3(signature = (subset=None))]
    fn drop_nulls(&self, subset: Option<Vec<PyExpr>>) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        ldf.drop_nulls(subset).into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: Vec<PyExpr>,
        index: Vec<PyExpr>,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.into_iter().map(|e| e.inner.into()).collect(),
            index: index.into_iter().map(|e| e.inner.into()).collect(),
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_row_index(name, offset).into()
    }

    #[pyo3(signature = (lambda, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        lambda: PyObject,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::STREAMING, streamable);

        self.ldf
            .clone()
            .map_python(
                lambda.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: Vec<PyExpr>, strict: bool) -> Self {
        let ldf = self.ldf.clone();
        let columns = columns.to_exprs();
        if strict {
            ldf.drop(columns)
        } else {
            ldf.drop_no_validate(columns)
        }
        .into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: Wrap<DataType>, strict: bool) -> Self {
        self.ldf.clone().cast_all(dtype.0, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.clone().into()
    }

    fn collect_schema<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py
            .allow_threads(|| self.ldf.collect_schema())
            .map_err(PyPolarsErr::from)?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: Vec<PyExpr>) -> Self {
        let columns = columns.to_exprs();
        self.ldf.clone().unnest(columns).into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .clone()
            .merge_sorted(other.ldf, key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}