use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::PathBuf;

use either::Either;
use polars::io::{HiveOptions, RowIndex};
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, IR};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags, SinkTarget};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::expr::datatype::PyDataTypeExpr;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::io::PyScanOptions;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

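/// Resolve a single Python source object (path, file object, or in-memory
/// buffer) into the first path (if any) and the corresponding `ScanSources`.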
fn pyobject_to_first_path_and_scan_sources(
    obj: PyObject,
) -> PyResult<(Option<PathBuf>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => {
            (Some(path.clone()), ScanSources::Paths([path].into()))
        },
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

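/// Invoke a Python callback on the optimized plan, temporarily handing the
/// plan and expression arenas over to Python and swapping any modifications
/// back in afterwards.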
fn post_opt_callback(
    lambda: &PyObject,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::with_gil(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

        // Keep handles to the arenas so we can reclaim them after the callback.
        let arenas = nt.get_arenas();

        // Pass the node traverser to the Python callback, which may replace
        // parts of the query plan.
        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

        // Swap the (possibly mutated) arenas back into the caller's arenas.
        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
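    /// Build a `LazyFrame` that scans newline-delimited JSON from the given
    /// sources, configuring schema inference, row limits, and cloud options.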
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
    ))]
    fn new_from_ndjson(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );

            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }

            r = r.with_cloud_options(Some(cloud_options));
        };

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

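    /// Build a `LazyFrame` that scans CSV from the given sources. Single-byte
    /// options (`separator`, `eol_char`, `quote_char`) are validated here, and
    /// an optional Python callback may rename columns before the schema is fixed.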
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
    )
    )]
    fn new_from_csv(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<PyObject>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );
            r = r.with_cloud_options(Some(cloud_options));
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::with_gil(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "the length of the new names list must equal the original number of columns",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

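    /// Build a `LazyFrame` that scans Parquet from the given sources, using the
    /// unified scan arguments extracted from `scan_options`.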
    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        sources, schema, scan_options, parallel, low_memory, use_statistics
    ))]
    fn new_from_parquet(
        sources: Wrap<ScanSources>,
        schema: Option<Wrap<Schema>>,
        scan_options: PyScanOptions,
        parallel: Wrap<ParallelStrategy>,
        low_memory: bool,
        use_statistics: bool,
    ) -> PyResult<Self> {
        use crate::utils::to_py_err;

        let parallel = parallel.0;

        let options = ParquetOptions {
            schema: schema.map(|x| Arc::new(x.0)),
            parallel,
            low_memory,
            use_statistics,
        };

        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.to_path_buf());

        let unified_scan_args = scan_options.extract_unified_scan_args(first_path.as_ref())?;

        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
            .map_err(to_py_err)?
            .build()
            .into();

        Ok(lf.into())
    }

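    /// Build a `LazyFrame` that scans Arrow IPC from the given sources,
    /// including Hive-partitioning and cloud configuration.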
    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (
        source, sources, n_rows, cache, rechunk, row_index, cloud_options, credential_provider,
        hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl,
        include_file_paths
    ))]
    fn new_from_ipc(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        n_rows: Option<usize>,
        cache: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        hive_partitioning: Option<bool>,
        hive_schema: Option<Wrap<Schema>>,
        try_parse_hive_dates: bool,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let hive_options = HiveOptions {
            enabled: hive_partitioning,
            hive_start_idx: 0,
            schema: hive_schema.map(|x| Arc::new(x.0)),
            try_parse_dates: try_parse_hive_dates,
        };

        let mut args = ScanArgsIpc {
            n_rows,
            cache,
            rechunk,
            row_index,
            cloud_options: None,
            hive_options,
            include_file_paths: include_file_paths.map(|x| x.into()),
        };

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_string_lossy();

            let mut cloud_options =
                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            args.cloud_options = Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_builder),
                    ),
            );
        }

        let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

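    /// Register the Python dataset provider vtable (once per process) and build
    /// a `LazyFrame` that scans the given Python dataset object.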
    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: PyObject) -> PyResult<Self> {
        use crate::dataset::dataset_provider_funcs;

        polars_plan::dsl::DATASET_PROVIDER_VTABLE.get_or_init(|| PythonDatasetProviderVTable {
            reader_name: dataset_provider_funcs::reader_name,
            schema: dataset_provider_funcs::schema,
            to_dataset_scan: dataset_provider_funcs::to_dataset_scan,
        });

        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

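    /// Build a `LazyFrame` from a Python scan function, with the schema given
    /// as a pyarrow schema (a list of pyarrow fields).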
    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: PyObject,
        pyarrow: bool,
        validate_schema: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
        )
        .into())
    }

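    /// Build a `LazyFrame` from a Python scan function, with the schema given
    /// as a list of `(name, polars dtype)` pairs.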
    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: PyObject,
        pyarrow: bool,
        validate_schema: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
        )
        .into())
    }

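    /// Build a `LazyFrame` from a Python scan function whose schema is itself
    /// produced lazily by a Python callable.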
    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: PyObject,
        scan_fn: PyObject,
        validate_schema: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
        )
        .into())
    }

    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.to_dot_streaming_phys(optimized))
    }

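    /// Return a new `LazyFrame` with individual optimizer passes toggled on or
    /// off; the CSE and new-streaming toggles only take effect when the
    /// corresponding features are compiled in.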
    fn optimization_toggle(
        &self,
        type_coercion: bool,
        type_check: bool,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        simplify_expression: bool,
        slice_pushdown: bool,
        comm_subplan_elim: bool,
        comm_subexpr_elim: bool,
        cluster_with_columns: bool,
        collapse_joins: bool,
        _eager: bool,
        _check_order: bool,
        #[allow(unused_variables)] new_streaming: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        let mut ldf = ldf
            .with_type_coercion(type_coercion)
            .with_type_check(type_check)
            .with_predicate_pushdown(predicate_pushdown)
            .with_simplify_expr(simplify_expression)
            .with_slice_pushdown(slice_pushdown)
            .with_cluster_with_columns(cluster_with_columns)
            .with_collapse_joins(collapse_joins)
            .with_check_order(_check_order)
            ._with_eager(_eager)
            .with_projection_pushdown(projection_pushdown);

        #[cfg(feature = "new_streaming")]
        {
            ldf = ldf.with_new_streaming(new_streaming);
        }

        #[cfg(feature = "cse")]
        {
            ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);
            ldf = ldf.with_comm_subexpr_elim(comm_subexpr_elim);
        }

        ldf.into()
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_optimizations(optflags.inner).into()
    }

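    /// Run the query and return both the result and a DataFrame of per-node
    /// timings; an optional callback can rewrite the optimized plan first.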
    #[pyo3(signature = (lambda_post_opt=None))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<PyObject>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

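    /// Run the query on the given engine and return the result; an optional
    /// callback can rewrite the optimized plan first (see `post_opt_callback`,
    /// used for the 'cuda' conversion).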
    #[pyo3(signature = (engine, lambda_post_opt=None))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<PyObject>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

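    /// Run the query on the thread pool and deliver the result (or error) to a
    /// Python callback instead of blocking the calling thread.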
    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: PyObject,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.clone();

            polars_core::POOL.spawn(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::with_gil(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

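    /// Stream the query result into Parquet, either to a single target or
    /// partitioned over a base path.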
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        target, compression, compression_level, statistics, row_group_size, data_page_size,
        cloud_options, credential_provider, retries, sink_options, metadata, field_overwrites,
    ))]
    fn sink_parquet(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
        metadata: Wrap<Option<KeyValueMetadata>>,
        field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
    ) -> PyResult<PyLazyFrame> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            key_value_metadata: metadata.0,
            field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
        };

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options = parse_cloud_options(
                    base_path.to_str().unwrap(),
                    cloud_options.unwrap_or_default(),
                )?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_parquet(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned(
                    Arc::new(partition.base_path),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
            .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        target, compression, compat_level, cloud_options, credential_provider, retries,
        sink_options
    ))]
    fn sink_ipc(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = IpcWriterOptions {
            compression: compression.0,
            compat_level: compat_level.0,
            ..Default::default()
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options = parse_cloud_options(
                    base_path.to_str().unwrap(),
                    cloud_options.unwrap_or_default(),
                )?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_ipc(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned(
                    Arc::new(partition.base_path),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
        quote_style, cloud_options, credential_provider, retries, sink_options
    ))]
    fn sink_csv(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value.unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format,
            time_format,
            datetime_format,
            float_scientific,
            float_precision,
            separator,
            quote_char,
            null: null_value,
            line_terminator,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            include_header,
            batch_size,
            serialize_options,
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options = parse_cloud_options(
                    base_path.to_str().unwrap(),
                    cloud_options.unwrap_or_default(),
                )?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_csv(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_csv_partitioned(
                    Arc::new(partition.base_path),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "json")]
    #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
    fn sink_json(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = JsonWriterOptions {};

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options = parse_cloud_options(
                    base_path.to_str().unwrap(),
                    cloud_options.unwrap_or_default(),
                )?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(path) => {
                    ldf.sink_json(path, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_json_partitioned(
                    Arc::new(partition.base_path),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

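    /// Debugging helper: collect after limiting every scan to `n_rows` rows.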
    fn fetch(&self, py: Python<'_>, n_rows: usize) -> PyResult<PyDataFrame> {
        let ldf = self.ldf.clone();
        py.enter_polars_df(|| ldf.fetch(n_rows))
    }

    fn filter(&mut self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.filter(predicate.inner).into()
    }

    fn remove(&mut self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&mut self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

    fn rolling(
        &mut self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &mut self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts.into_iter().map(|ldf| ldf.ldf).collect::<Vec<_>>();
        self.ldf.clone().with_context(contexts).into()
    }

    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.clone();
        let other = other.ldf;
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| t.0.into_static()),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            }))
            .suffix(suffix)
            .finish()
            .into())
    }

    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.clone();
        let other = other.ldf;
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.clone();
        let other = other.ldf;

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

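    /// Align the frame to `schema`. Each policy argument is either a single
    /// policy applied to every column or a dict mapping column names to
    /// per-column policies.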
    fn match_to_schema<'py>(
        &self,
        schema: Wrap<Schema>,
        missing_columns: &Bound<'py, PyAny>,
        missing_struct_fields: &Bound<'py, PyAny>,
        extra_columns: Wrap<ExtraColumnsPolicy>,
        extra_struct_fields: &Bound<'py, PyAny>,
        integer_cast: &Bound<'py, PyAny>,
        float_cast: &Bound<'py, PyAny>,
    ) -> PyResult<Self> {
        fn parse_missing_columns<'py>(
            schema: &Schema,
            missing_columns: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicyOrExpr::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
            }
            Ok(out)
        }
        fn parse_missing_struct_fields<'py>(
            schema: &Schema,
            missing_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicy::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `missing_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_extra_struct_fields<'py>(
            schema: &Schema,
            extra_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `extra_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_cast<'py>(
            schema: &Schema,
            cast: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<UpcastOrForbid>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = cast.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `integer_cast` / `float_cast`",
                ));
            }
            Ok(out)
        }

        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
        let integer_cast = parse_cast(&schema.0, integer_cast)?;
        let float_cast = parse_cast(&schema.0, float_cast)?;

        let per_column = (0..schema.0.len())
            .map(|i| MatchToSchemaPerColumn {
                missing_columns: missing_columns[i].clone(),
                missing_struct_fields: missing_struct_fields[i],
                extra_struct_fields: extra_struct_fields[i],
                integer_cast: integer_cast[i],
                float_cast: float_cast[i],
            })
            .collect();

        let ldf = self.ldf.clone();
        Ok(ldf
            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
            .into())
    }

    fn rename(&mut self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, column: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let column = column.to_exprs();
        ldf.explode(column).into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PyExpr>>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    #[pyo3(signature = (subset=None))]
    fn drop_nans(&self, subset: Option<Vec<PyExpr>>) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        ldf.drop_nans(subset).into()
    }

    #[pyo3(signature = (subset=None))]
    fn drop_nulls(&self, subset: Option<Vec<PyExpr>>) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.to_exprs());
        ldf.drop_nulls(subset).into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: Vec<PyExpr>,
        index: Vec<PyExpr>,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.into_iter().map(|e| e.inner.into()).collect(),
            index: index.into_iter().map(|e| e.inner.into()).collect(),
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_row_index(name, offset).into()
    }

    #[pyo3(signature = (lambda, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        lambda: PyObject,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::NEW_STREAMING, streamable);

        self.ldf
            .clone()
            .map_python(
                lambda.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: Vec<PyExpr>, strict: bool) -> Self {
        let ldf = self.ldf.clone();
        let columns = columns.to_exprs();
        if strict {
            ldf.drop(columns)
        } else {
            ldf.drop_no_validate(columns)
        }
        .into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
        self.ldf.clone().cast_all(dtype.inner, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.clone().into()
    }

    fn collect_schema<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: Vec<PyExpr>) -> Self {
        let columns = columns.to_exprs();
        self.ldf.clone().unnest(columns).into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .clone()
            .merge_sorted(other.ldf, key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

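// Parse a Python dict into `ParquetFieldOverwrites` (name, children, field_id,
// metadata, required), recursing into struct/list children where present.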
#[cfg(feature = "parquet")]
impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        use polars_io::parquet::write::ParquetFieldOverwrites;

        let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;

        let name = PyDictMethods::get_item(&parsed, "name")?
            .map(|v| PyResult::Ok(v.extract::<String>()?.into()))
            .transpose()?;
        let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
            PyResult::Ok(ChildFieldOverwrites::None),
            |v| {
                Ok(
                    if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
                        ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
                    } else {
                        ChildFieldOverwrites::ListLike(Box::new(
                            v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
                        ))
                    },
                )
            },
        )?;

        let field_id = PyDictMethods::get_item(&parsed, "field_id")?
            .map(|v| v.extract::<i32>())
            .transpose()?;

        let metadata = PyDictMethods::get_item(&parsed, "metadata")?
            .map(|v| v.extract::<Vec<(String, Option<String>)>>())
            .transpose()?;
        let metadata = metadata.map(|v| {
            v.into_iter()
                .map(|v| MetadataKeyValue {
                    key: v.0.into(),
                    value: v.1.map(|v| v.into()),
                })
                .collect()
        });

        let required = PyDictMethods::get_item(&parsed, "required")?
            .map(|v| v.extract::<bool>())
            .transpose()?;

        Ok(Wrap(ParquetFieldOverwrites {
            name,
            children,
            field_id,
            metadata,
            required,
        }))
    }
}