use std::collections::HashMap;
use std::num::NonZeroUsize;

use either::Either;
use polars::io::{HiveOptions, RowIndex};
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, IR};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags, SinkTarget};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::expr::datatype::PyDataTypeExpr;
use crate::expr::selector::PySelector;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::io::PyScanOptions;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

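/// Resolve a Python scan-source object (a path, an open file, or an in-memory
/// buffer) into `ScanSources`, also returning the first path when one is
/// available so callers can derive cloud options from it.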
fn pyobject_to_first_path_and_scan_sources(
    obj: PyObject,
) -> PyResult<(Option<PlPath>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => {
            (Some(path.clone()), ScanSources::Paths([path].into()))
        },
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

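/// Hand the optimized plan to a Python post-optimization callback (e.g. the
/// 'cuda' engine hook). The IR and expression arenas are moved into a
/// `NodeTraverser`, the callback may rewrite the plan through it, and the
/// (possibly modified) arenas are swapped back in afterwards.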
fn post_opt_callback(
    lambda: &PyObject,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::with_gil(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

        // Keep handles to the arenas so we can restore them after the callback.
        let arenas = nt.get_arenas();

        // Pass the node traverser to the Python callback, which is allowed to
        // mutate the query plan through it.
        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

        // Swap the (possibly rewritten) arenas back into the caller's slots.
        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
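    /// Build a `LazyFrame` that scans newline-delimited JSON. Cloud options
    /// are only parsed and attached when a first path (and thus a URL) is
    /// known.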
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
    ))]
    fn new_from_ndjson(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options =
                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );

            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }

            r = r.with_cloud_options(Some(cloud_options));
        };

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

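    /// Build a `LazyFrame` that scans CSV. `separator` and `eol_char` must be
    /// non-empty; only their first byte is used. `with_schema_modify`
    /// optionally renames columns via a Python callback before the schema is
    /// finalized.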
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
    )
    )]
    fn new_from_csv(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<PyObject>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options =
                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );
            r = r.with_cloud_options(Some(cloud_options));
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::with_gil(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "The length of the new names list should be equal to the original column length",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

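    /// Build a `LazyFrame` that scans Parquet. Format-specific settings go
    /// into `ParquetOptions`; options shared across scan types arrive through
    /// `PyScanOptions` and are extracted into unified scan args.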
    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        sources, schema, scan_options, parallel, low_memory, use_statistics
    ))]
    fn new_from_parquet(
        sources: Wrap<ScanSources>,
        schema: Option<Wrap<Schema>>,
        scan_options: PyScanOptions,
        parallel: Wrap<ParallelStrategy>,
        low_memory: bool,
        use_statistics: bool,
    ) -> PyResult<Self> {
        use crate::utils::to_py_err;

        let parallel = parallel.0;

        let options = ParquetOptions {
            schema: schema.map(|x| Arc::new(x.0)),
            parallel,
            low_memory,
            use_statistics,
        };

        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().map(|p| p.as_ref()))?;

        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
            .map_err(to_py_err)?
            .build()
            .into();

        Ok(lf.into())
    }

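    /// Build a `LazyFrame` that scans Arrow IPC files, with optional hive
    /// partitioning. Cloud options are only attached when a first path is
    /// known.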
    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (
        source, sources, n_rows, cache, rechunk, row_index, cloud_options, credential_provider,
        hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl,
        include_file_paths
    ))]
    fn new_from_ipc(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        n_rows: Option<usize>,
        cache: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        hive_partitioning: Option<bool>,
        hive_schema: Option<Wrap<Schema>>,
        try_parse_hive_dates: bool,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let hive_options = HiveOptions {
            enabled: hive_partitioning,
            hive_start_idx: 0,
            schema: hive_schema.map(|x| Arc::new(x.0)),
            try_parse_dates: try_parse_hive_dates,
        };

        let mut args = ScanArgsIpc {
            n_rows,
            cache,
            rechunk,
            row_index,
            cloud_options: None,
            hive_options,
            include_file_paths: include_file_paths.map(|x| x.into()),
        };

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options =
                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            args.cloud_options = Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_builder),
                    ),
            );
        }

        let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

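    /// Build a `LazyFrame` over a Python dataset object. The dataset provider
    /// vtable is registered once (on first use) so the planner can call back
    /// into Python for the dataset's name, schema, and scan conversion.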
    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: PyObject) -> PyResult<Self> {
        use crate::dataset::dataset_provider_funcs;

        polars_plan::dsl::DATASET_PROVIDER_VTABLE.get_or_init(|| PythonDatasetProviderVTable {
            name: dataset_provider_funcs::name,
            schema: dataset_provider_funcs::schema,
            to_dataset_scan: dataset_provider_funcs::to_dataset_scan,
        });

        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: PyObject,
        pyarrow: bool,
        validate_schema: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: PyObject,
        pyarrow: bool,
        validate_schema: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: PyObject,
        scan_fn: PyObject,
        validate_schema: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
        )
        .into())
    }

    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.to_dot_streaming_phys(optimized))
    }

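    /// Enable or disable individual optimizer passes. The streaming and CSE
    /// toggles only take effect when the corresponding crate features are
    /// compiled in.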
    fn optimization_toggle(
        &self,
        type_coercion: bool,
        type_check: bool,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        simplify_expression: bool,
        slice_pushdown: bool,
        comm_subplan_elim: bool,
        comm_subexpr_elim: bool,
        cluster_with_columns: bool,
        collapse_joins: bool,
        _eager: bool,
        _check_order: bool,
        #[allow(unused_variables)] new_streaming: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        let mut ldf = ldf
            .with_type_coercion(type_coercion)
            .with_type_check(type_check)
            .with_predicate_pushdown(predicate_pushdown)
            .with_simplify_expr(simplify_expression)
            .with_slice_pushdown(slice_pushdown)
            .with_cluster_with_columns(cluster_with_columns)
            .with_collapse_joins(collapse_joins)
            .with_check_order(_check_order)
            ._with_eager(_eager)
            .with_projection_pushdown(projection_pushdown);

        #[cfg(feature = "new_streaming")]
        {
            ldf = ldf.with_new_streaming(new_streaming);
        }

        #[cfg(feature = "cse")]
        {
            ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);
            ldf = ldf.with_comm_subexpr_elim(comm_subexpr_elim);
        }

        ldf.into()
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_optimizations(optflags.inner).into()
    }

    #[pyo3(signature = (lambda_post_opt=None))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<PyObject>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

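    /// Collect the query into a `DataFrame`, releasing the GIL while the
    /// engine runs. An optional `lambda_post_opt` callback may rewrite the
    /// optimized plan before execution (see `post_opt_callback`).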
    #[pyo3(signature = (engine, lambda_post_opt=None))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<PyObject>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

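    /// Collect asynchronously: the query is spawned on the rayon thread pool
    /// and `lambda` is invoked with either the resulting `DataFrame` or the
    /// error once execution finishes.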
    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: PyObject,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.clone();

            polars_core::POOL.spawn(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::with_gil(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

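    /// Stream the query result into a single Parquet file or a partitioned
    /// set of Parquet files. Cloud options are derived from the target's base
    /// path when one is present.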
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        target, compression, compression_level, statistics, row_group_size, data_page_size,
        cloud_options, credential_provider, retries, sink_options, metadata, field_overwrites,
    ))]
    fn sink_parquet(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
        metadata: Wrap<Option<KeyValueMetadata>>,
        field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
    ) -> PyResult<PyLazyFrame> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            key_value_metadata: metadata.0,
            field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
        };

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_parquet(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
            .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        target, compression, compat_level, cloud_options, credential_provider, retries,
        sink_options
    ))]
    fn sink_ipc(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = IpcWriterOptions {
            compression: compression.0,
            compat_level: compat_level.0,
            ..Default::default()
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_ipc(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
        quote_style, cloud_options, credential_provider, retries, sink_options
    ))]
    fn sink_csv(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        decimal_comma: bool,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value.unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format,
            time_format,
            datetime_format,
            float_scientific,
            float_precision,
            decimal_comma,
            separator,
            quote_char,
            null: null_value,
            line_terminator,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            include_header,
            batch_size,
            serialize_options,
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_csv(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_csv_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "json")]
    #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
    fn sink_json(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = JsonWriterOptions {};

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.clone();
            match target {
                SinkTarget::File(path) => {
                    ldf.sink_json(path, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_json_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    fn filter(&mut self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.filter(predicate.inner).into()
    }

    fn remove(&mut self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&mut self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

    fn rolling(
        &mut self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &mut self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts.into_iter().map(|ldf| ldf.ldf).collect::<Vec<_>>();
        self.ldf.clone().with_context(contexts).into()
    }

    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.clone();
        let other = other.ldf;
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(Box::new(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| {
                    let av = t.0.into_static();
                    let dtype = av.dtype();
                    Scalar::new(dtype, av)
                }),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            })))
            .suffix(suffix)
            .finish()
            .into())
    }

    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.clone();
        let other = other.ldf;
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.clone();
        let other = other.ldf;

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

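    /// Conform this frame to a target schema. Each policy argument accepts
    /// either a single policy (applied to every column) or a dict mapping
    /// column names to per-column policies.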
    fn match_to_schema<'py>(
        &self,
        schema: Wrap<Schema>,
        missing_columns: &Bound<'py, PyAny>,
        missing_struct_fields: &Bound<'py, PyAny>,
        extra_columns: Wrap<ExtraColumnsPolicy>,
        extra_struct_fields: &Bound<'py, PyAny>,
        integer_cast: &Bound<'py, PyAny>,
        float_cast: &Bound<'py, PyAny>,
    ) -> PyResult<Self> {
        fn parse_missing_columns<'py>(
            schema: &Schema,
            missing_columns: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicyOrExpr::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
            }
            Ok(out)
        }
        fn parse_missing_struct_fields<'py>(
            schema: &Schema,
            missing_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicy::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `missing_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_extra_struct_fields<'py>(
            schema: &Schema,
            extra_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `extra_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_cast<'py>(
            schema: &Schema,
            cast: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<UpcastOrForbid>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = cast.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `integer_cast` / `float_cast`",
                ));
            }
            Ok(out)
        }

        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
        let integer_cast = parse_cast(&schema.0, integer_cast)?;
        let float_cast = parse_cast(&schema.0, float_cast)?;

        let per_column = (0..schema.0.len())
            .map(|i| MatchToSchemaPerColumn {
                missing_columns: missing_columns[i].clone(),
                missing_struct_fields: missing_struct_fields[i],
                extra_struct_fields: extra_struct_fields[i],
                integer_cast: integer_cast[i],
                float_cast: float_cast[i],
            })
            .collect();

        let ldf = self.ldf.clone();
        Ok(ldf
            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
            .into())
    }

    fn rename(&mut self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, subset: PySelector) -> Self {
        self.ldf.clone().explode(subset.inner).into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<PySelector>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.clone();
        let subset = subset.map(|e| e.inner);
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
        self.ldf.clone().drop_nans(subset.map(|e| e.inner)).into()
    }

    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
        self.ldf.clone().drop_nulls(subset.map(|e| e.inner)).into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: PySelector,
        index: PySelector,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.inner,
            index: index.inner,
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.clone();
        ldf.with_row_index(name, offset).into()
    }

    #[pyo3(signature = (lambda, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        lambda: PyObject,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::NEW_STREAMING, streamable);

        self.ldf
            .clone()
            .map_python(
                lambda.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: PySelector) -> Self {
        self.ldf.clone().drop(columns.inner).into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
        self.ldf.clone().cast_all(dtype.inner, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.clone().into()
    }

    fn collect_schema<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: PySelector) -> Self {
        self.ldf.clone().unnest(columns.inner).into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .clone()
            .merge_sorted(other.ldf, key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

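// Parse `ParquetFieldOverwrites` from a Python dict with the optional keys
// "name", "children", "field_id", "metadata", and "required". "children" may
// be a list of overwrites (struct fields) or a single overwrite (list-like).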
#[cfg(feature = "parquet")]
impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        use polars_io::parquet::write::ParquetFieldOverwrites;

        let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;

        let name = PyDictMethods::get_item(&parsed, "name")?
            .map(|v| PyResult::Ok(v.extract::<String>()?.into()))
            .transpose()?;
        let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
            PyResult::Ok(ChildFieldOverwrites::None),
            |v| {
                Ok(
                    if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
                        ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
                    } else {
                        ChildFieldOverwrites::ListLike(Box::new(
                            v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
                        ))
                    },
                )
            },
        )?;

        let field_id = PyDictMethods::get_item(&parsed, "field_id")?
            .map(|v| v.extract::<i32>())
            .transpose()?;

        let metadata = PyDictMethods::get_item(&parsed, "metadata")?
            .map(|v| v.extract::<Vec<(String, Option<String>)>>())
            .transpose()?;
        let metadata = metadata.map(|v| {
            v.into_iter()
                .map(|v| MetadataKeyValue {
                    key: v.0.into(),
                    value: v.1.map(|v| v.into()),
                })
                .collect()
        });

        let required = PyDictMethods::get_item(&parsed, "required")?
            .map(|v| v.extract::<bool>())
            .transpose()?;

        Ok(Wrap(ParquetFieldOverwrites {
            name,
            children,
            field_id,
            metadata,
            required,
        }))
    }
}