1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::PathBuf;
4
5use either::Either;
6use polars::io::{HiveOptions, RowIndex};
7use polars::time::*;
8use polars_core::prelude::*;
9#[cfg(feature = "parquet")]
10use polars_parquet::arrow::write::StatisticsOptions;
11use polars_plan::dsl::ScanSources;
12use polars_plan::plans::{AExpr, IR};
13use polars_utils::arena::{Arena, Node};
14use polars_utils::python_function::PythonObject;
15use pyo3::exceptions::PyTypeError;
16use pyo3::prelude::*;
17use pyo3::pybacked::PyBackedStr;
18use pyo3::types::{PyDict, PyDictMethods, PyList};
19
20use super::{PyLazyFrame, PyOptFlags, SinkTarget};
21use crate::error::PyPolarsErr;
22use crate::expr::ToExprs;
23use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
24use crate::lazyframe::visit::NodeTraverser;
25use crate::prelude::*;
26use crate::utils::{EnterPolarsExt, to_py_err};
27use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};
28
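/// Resolve a Python scan-source object (path, open file, or in-memory buffer) into
/// `ScanSources`, also returning the first path (when one exists) so cloud options
/// can later be derived from it.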
29fn pyobject_to_first_path_and_scan_sources(
30 obj: PyObject,
31) -> PyResult<(Option<PathBuf>, ScanSources)> {
32 use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
33 Ok(match get_python_scan_source_input(obj, false)? {
34 PythonScanSourceInput::Path(path) => {
35 (Some(path.clone()), ScanSources::Paths([path].into()))
36 },
37 PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
38 PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
39 })
40}
41
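/// Run a user-supplied Python callback (e.g. the GPU engine hook) on the optimized IR.
/// The IR and expression arenas are temporarily moved into a `NodeTraverser` so the
/// callback can inspect and rewrite the plan; they are moved back once it returns.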
42fn post_opt_callback(
43 lambda: &PyObject,
44 root: Node,
45 lp_arena: &mut Arena<IR>,
46 expr_arena: &mut Arena<AExpr>,
47 duration_since_start: Option<std::time::Duration>,
48) -> PolarsResult<()> {
49 Python::with_gil(|py| {
50 let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));
51
52 let arenas = nt.get_arenas();
54
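        // Hand the traverser to the Python callback; it may replace parts of the plan.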
55 lambda
58 .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
59 .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;
60
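        // Move the (possibly modified) arenas back so the caller observes any changes.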
61 std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
65 std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());
66
67 Ok(())
68 })
69}
70
71#[pymethods]
72#[allow(clippy::should_implement_trait)]
73impl PyLazyFrame {
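    /// Lazily scan newline-delimited JSON sources (reached from `scan_ndjson` on the Python side).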
74 #[staticmethod]
75 #[cfg(feature = "json")]
76 #[allow(clippy::too_many_arguments)]
77 #[pyo3(signature = (
78 source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
79 row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
80 ))]
81 fn new_from_ndjson(
82 source: Option<PyObject>,
83 sources: Wrap<ScanSources>,
84 infer_schema_length: Option<usize>,
85 schema: Option<Wrap<Schema>>,
86 schema_overrides: Option<Wrap<Schema>>,
87 batch_size: Option<NonZeroUsize>,
88 n_rows: Option<usize>,
89 low_memory: bool,
90 rechunk: bool,
91 row_index: Option<(String, IdxSize)>,
92 ignore_errors: bool,
93 include_file_paths: Option<String>,
94 cloud_options: Option<Vec<(String, String)>>,
95 credential_provider: Option<PyObject>,
96 retries: usize,
97 file_cache_ttl: Option<u64>,
98 ) -> PyResult<Self> {
99 use cloud::credential_provider::PlCredentialProvider;
100 let row_index = row_index.map(|(name, offset)| RowIndex {
101 name: name.into(),
102 offset,
103 });
104
105 let sources = sources.0;
106 let (first_path, sources) = match source {
107 None => (sources.first_path().map(|p| p.to_path_buf()), sources),
108 Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
109 };
110
111 let mut r = LazyJsonLineReader::new_with_sources(sources);
112
113 #[cfg(feature = "cloud")]
114 if let Some(first_path) = first_path {
115 let first_path_url = first_path.to_string_lossy();
116
117 let mut cloud_options =
118 parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
119 cloud_options = cloud_options
120 .with_max_retries(retries)
121 .with_credential_provider(
122 credential_provider.map(PlCredentialProvider::from_python_builder),
123 );
124
125 if let Some(file_cache_ttl) = file_cache_ttl {
126 cloud_options.file_cache_ttl = file_cache_ttl;
127 }
128
129 r = r.with_cloud_options(Some(cloud_options));
130 };
131
132 let lf = r
133 .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
134 .with_batch_size(batch_size)
135 .with_n_rows(n_rows)
136 .low_memory(low_memory)
137 .with_rechunk(rechunk)
138 .with_schema(schema.map(|schema| Arc::new(schema.0)))
139 .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
140 .with_row_index(row_index)
141 .with_ignore_errors(ignore_errors)
142 .with_include_file_paths(include_file_paths.map(|x| x.into()))
143 .finish()
144 .map_err(PyPolarsErr::from)?;
145
146 Ok(lf.into())
147 }
148
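    /// Lazily scan CSV sources (reached from `scan_csv` on the Python side).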
149 #[staticmethod]
150 #[cfg(feature = "csv")]
151 #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
152 low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
153 infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
154 encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
155 cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
156 )
157 )]
158 fn new_from_csv(
159 source: Option<PyObject>,
160 sources: Wrap<ScanSources>,
161 separator: &str,
162 has_header: bool,
163 ignore_errors: bool,
164 skip_rows: usize,
165 skip_lines: usize,
166 n_rows: Option<usize>,
167 cache: bool,
168 overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
169 low_memory: bool,
170 comment_prefix: Option<&str>,
171 quote_char: Option<&str>,
172 null_values: Option<Wrap<NullValues>>,
173 missing_utf8_is_empty_string: bool,
174 infer_schema_length: Option<usize>,
175 with_schema_modify: Option<PyObject>,
176 rechunk: bool,
177 skip_rows_after_header: usize,
178 encoding: Wrap<CsvEncoding>,
179 row_index: Option<(String, IdxSize)>,
180 try_parse_dates: bool,
181 eol_char: &str,
182 raise_if_empty: bool,
183 truncate_ragged_lines: bool,
184 decimal_comma: bool,
185 glob: bool,
186 schema: Option<Wrap<Schema>>,
187 cloud_options: Option<Vec<(String, String)>>,
188 credential_provider: Option<PyObject>,
189 retries: usize,
190 file_cache_ttl: Option<u64>,
191 include_file_paths: Option<String>,
192 ) -> PyResult<Self> {
193 #[cfg(feature = "cloud")]
194 use cloud::credential_provider::PlCredentialProvider;
195
196 let null_values = null_values.map(|w| w.0);
197 let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
198 let separator = separator
199 .as_bytes()
200 .first()
201 .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
202 .copied()
203 .map_err(PyPolarsErr::from)?;
204 let eol_char = eol_char
205 .as_bytes()
206 .first()
207 .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
208 .copied()
209 .map_err(PyPolarsErr::from)?;
210 let row_index = row_index.map(|(name, offset)| RowIndex {
211 name: name.into(),
212 offset,
213 });
214
215 let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
216 overwrite_dtype
217 .into_iter()
218 .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
219 .collect::<Schema>()
220 });
221
222 let sources = sources.0;
223 let (first_path, sources) = match source {
224 None => (sources.first_path().map(|p| p.to_path_buf()), sources),
225 Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
226 };
227
228 let mut r = LazyCsvReader::new_with_sources(sources);
229
230 #[cfg(feature = "cloud")]
231 if let Some(first_path) = first_path {
232 let first_path_url = first_path.to_string_lossy();
233
234 let mut cloud_options =
235 parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
236 if let Some(file_cache_ttl) = file_cache_ttl {
237 cloud_options.file_cache_ttl = file_cache_ttl;
238 }
239 cloud_options = cloud_options
240 .with_max_retries(retries)
241 .with_credential_provider(
242 credential_provider.map(PlCredentialProvider::from_python_builder),
243 );
244 r = r.with_cloud_options(Some(cloud_options));
245 }
246
247 let mut r = r
248 .with_infer_schema_length(infer_schema_length)
249 .with_separator(separator)
250 .with_has_header(has_header)
251 .with_ignore_errors(ignore_errors)
252 .with_skip_rows(skip_rows)
253 .with_skip_lines(skip_lines)
254 .with_n_rows(n_rows)
255 .with_cache(cache)
256 .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
257 .with_schema(schema.map(|schema| Arc::new(schema.0)))
258 .with_low_memory(low_memory)
259 .with_comment_prefix(comment_prefix.map(|x| x.into()))
260 .with_quote_char(quote_char)
261 .with_eol_char(eol_char)
262 .with_rechunk(rechunk)
263 .with_skip_rows_after_header(skip_rows_after_header)
264 .with_encoding(encoding.0)
265 .with_row_index(row_index)
266 .with_try_parse_dates(try_parse_dates)
267 .with_null_values(null_values)
268 .with_missing_is_null(!missing_utf8_is_empty_string)
269 .with_truncate_ragged_lines(truncate_ragged_lines)
270 .with_decimal_comma(decimal_comma)
271 .with_glob(glob)
272 .with_raise_if_empty(raise_if_empty)
273 .with_include_file_paths(include_file_paths.map(|x| x.into()));
274
275 if let Some(lambda) = with_schema_modify {
276 let f = |schema: Schema| {
277 let iter = schema.iter_names().map(|s| s.as_str());
278 Python::with_gil(|py| {
279 let names = PyList::new(py, iter).unwrap();
280
281 let out = lambda.call1(py, (names,)).expect("python function failed");
282 let new_names = out
283 .extract::<Vec<String>>(py)
284 .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "the length of the new names list must match the number of columns in the original schema",
                    );
288 Ok(schema
289 .iter_values()
290 .zip(new_names)
291 .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
292 .collect())
293 })
294 };
295 r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
296 }
297
298 Ok(r.finish().map_err(PyPolarsErr::from)?.into())
299 }
300
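    /// Lazily scan Parquet sources (reached from `scan_parquet` on the Python side).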
301 #[cfg(feature = "parquet")]
302 #[staticmethod]
303 #[pyo3(signature = (
304 source, sources, n_rows, cache, parallel, rechunk, row_index, low_memory, cloud_options,
305 credential_provider, use_statistics, hive_partitioning, schema, hive_schema,
306 try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns,
307 cast_options,
308 ))]
309 fn new_from_parquet(
310 source: Option<PyObject>,
311 sources: Wrap<ScanSources>,
312 n_rows: Option<usize>,
313 cache: bool,
314 parallel: Wrap<ParallelStrategy>,
315 rechunk: bool,
316 row_index: Option<(String, IdxSize)>,
317 low_memory: bool,
318 cloud_options: Option<Vec<(String, String)>>,
319 credential_provider: Option<PyObject>,
320 use_statistics: bool,
321 hive_partitioning: Option<bool>,
322 schema: Option<Wrap<Schema>>,
323 hive_schema: Option<Wrap<Schema>>,
324 try_parse_hive_dates: bool,
325 retries: usize,
326 glob: bool,
327 include_file_paths: Option<String>,
328 allow_missing_columns: bool,
329 cast_options: Wrap<CastColumnsPolicy>,
330 ) -> PyResult<Self> {
331 use cloud::credential_provider::PlCredentialProvider;
332 use polars_utils::slice_enum::Slice;
333
334 use crate::utils::to_py_err;
335
336 let parallel = parallel.0;
337
338 let options = ParquetOptions {
339 schema: schema.map(|x| Arc::new(x.0)),
340 parallel,
341 low_memory,
342 use_statistics,
343 };
344
345 let sources = sources.0;
346 let (first_path, sources) = match source {
347 None => (sources.first_path().map(|p| p.to_path_buf()), sources),
348 Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
349 };
350
351 let cloud_options = if let Some(first_path) = first_path {
352 #[cfg(feature = "cloud")]
353 {
354 let first_path_url = first_path.to_string_lossy();
355 let cloud_options =
356 parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
357
358 Some(
359 cloud_options
360 .with_max_retries(retries)
361 .with_credential_provider(
362 credential_provider.map(PlCredentialProvider::from_python_builder),
363 ),
364 )
365 }
366
367 #[cfg(not(feature = "cloud"))]
368 {
369 None
370 }
371 } else {
372 None
373 };
374
375 let hive_schema = hive_schema.map(|s| Arc::new(s.0));
376
377 let row_index = row_index.map(|(name, offset)| RowIndex {
378 name: name.into(),
379 offset,
380 });
381
382 let hive_options = HiveOptions {
383 enabled: hive_partitioning,
384 hive_start_idx: 0,
385 schema: hive_schema,
386 try_parse_dates: try_parse_hive_dates,
387 };
388
389 let unified_scan_args = UnifiedScanArgs {
390 schema: None,
391 cloud_options,
392 hive_options,
393 rechunk,
394 cache,
395 glob,
396 projection: None,
397 row_index,
398 pre_slice: n_rows.map(|len| Slice::Positive { offset: 0, len }),
399 cast_columns_policy: cast_options.0,
400 missing_columns_policy: if allow_missing_columns {
401 MissingColumnsPolicy::Insert
402 } else {
403 MissingColumnsPolicy::Raise
404 },
405 include_file_paths: include_file_paths.map(|x| x.into()),
406 };
407
408 let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
409 .map_err(to_py_err)?
410 .build()
411 .into();
412
413 Ok(lf.into())
414 }
415
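    /// Lazily scan Arrow IPC sources (reached from `scan_ipc` on the Python side).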
416 #[cfg(feature = "ipc")]
417 #[staticmethod]
418 #[pyo3(signature = (
        source, sources, n_rows, cache, rechunk, row_index, cloud_options, credential_provider,
420 hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl,
421 include_file_paths
422 ))]
423 fn new_from_ipc(
424 source: Option<PyObject>,
425 sources: Wrap<ScanSources>,
426 n_rows: Option<usize>,
427 cache: bool,
428 rechunk: bool,
429 row_index: Option<(String, IdxSize)>,
430 cloud_options: Option<Vec<(String, String)>>,
431 credential_provider: Option<PyObject>,
432 hive_partitioning: Option<bool>,
433 hive_schema: Option<Wrap<Schema>>,
434 try_parse_hive_dates: bool,
435 retries: usize,
436 file_cache_ttl: Option<u64>,
437 include_file_paths: Option<String>,
438 ) -> PyResult<Self> {
439 #[cfg(feature = "cloud")]
440 use cloud::credential_provider::PlCredentialProvider;
441 let row_index = row_index.map(|(name, offset)| RowIndex {
442 name: name.into(),
443 offset,
444 });
445
446 let hive_options = HiveOptions {
447 enabled: hive_partitioning,
448 hive_start_idx: 0,
449 schema: hive_schema.map(|x| Arc::new(x.0)),
450 try_parse_dates: try_parse_hive_dates,
451 };
452
453 let mut args = ScanArgsIpc {
454 n_rows,
455 cache,
456 rechunk,
457 row_index,
458 cloud_options: None,
459 hive_options,
460 include_file_paths: include_file_paths.map(|x| x.into()),
461 };
462
463 let sources = sources.0;
464 let (first_path, sources) = match source {
465 None => (sources.first_path().map(|p| p.to_path_buf()), sources),
466 Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
467 };
468
469 #[cfg(feature = "cloud")]
470 if let Some(first_path) = first_path {
471 let first_path_url = first_path.to_string_lossy();
472
473 let mut cloud_options =
474 parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
475 if let Some(file_cache_ttl) = file_cache_ttl {
476 cloud_options.file_cache_ttl = file_cache_ttl;
477 }
478 args.cloud_options = Some(
479 cloud_options
480 .with_max_retries(retries)
481 .with_credential_provider(
482 credential_provider.map(PlCredentialProvider::from_python_builder),
483 ),
484 );
485 }
486
487 let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?;
488 Ok(lf.into())
489 }
490
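    /// Build a scan over a Python dataset object, registering the dataset provider
    /// vtable on first use.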
491 #[staticmethod]
492 #[pyo3(signature = (
493 dataset_object
494 ))]
495 fn new_from_dataset_object(dataset_object: PyObject) -> PyResult<Self> {
496 use crate::dataset::dataset_provider_funcs;
497
498 polars_plan::dsl::DATASET_PROVIDER_VTABLE.get_or_init(|| PythonDatasetProviderVTable {
499 reader_name: dataset_provider_funcs::reader_name,
500 schema: dataset_provider_funcs::schema,
501 to_dataset_scan: dataset_provider_funcs::to_dataset_scan,
502 });
503
504 let lf =
505 LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
506 .into();
507
508 Ok(lf)
509 }
510
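    /// Create a `LazyFrame` from a Python scan function paired with a pyarrow schema.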
511 #[staticmethod]
512 fn scan_from_python_function_arrow_schema(
513 schema: &Bound<'_, PyList>,
514 scan_fn: PyObject,
515 pyarrow: bool,
516 validate_schema: bool,
517 ) -> PyResult<Self> {
518 let schema = Arc::new(pyarrow_schema_to_rust(schema)?);
519
520 Ok(LazyFrame::scan_from_python_function(
521 Either::Right(schema),
522 scan_fn,
523 pyarrow,
524 validate_schema,
525 )
526 .into())
527 }
528
529 #[staticmethod]
530 fn scan_from_python_function_pl_schema(
531 schema: Vec<(PyBackedStr, Wrap<DataType>)>,
532 scan_fn: PyObject,
533 pyarrow: bool,
534 validate_schema: bool,
535 ) -> PyResult<Self> {
536 let schema = Arc::new(Schema::from_iter(
537 schema
538 .into_iter()
539 .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
540 ));
541 Ok(LazyFrame::scan_from_python_function(
542 Either::Right(schema),
543 scan_fn,
544 pyarrow,
545 validate_schema,
546 )
547 .into())
548 }
549
550 #[staticmethod]
551 fn scan_from_python_function_schema_function(
552 schema_fn: PyObject,
553 scan_fn: PyObject,
554 validate_schema: bool,
555 ) -> PyResult<Self> {
556 Ok(LazyFrame::scan_from_python_function(
557 Either::Left(schema_fn),
558 scan_fn,
559 false,
560 validate_schema,
561 )
562 .into())
563 }
564
565 fn describe_plan(&self, py: Python) -> PyResult<String> {
566 py.enter_polars(|| self.ldf.describe_plan())
567 }
568
569 fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
570 py.enter_polars(|| self.ldf.describe_optimized_plan())
571 }
572
573 fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
574 py.enter_polars(|| self.ldf.describe_plan_tree())
575 }
576
577 fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
578 py.enter_polars(|| self.ldf.describe_optimized_plan_tree())
579 }
580
581 fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
582 py.enter_polars(|| self.ldf.to_dot(optimized))
583 }
584
585 #[cfg(feature = "new_streaming")]
586 fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
587 py.enter_polars(|| self.ldf.to_dot_streaming_phys(optimized))
588 }
589
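    /// Enable or disable individual query optimizations on a copy of this `LazyFrame`.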
590 fn optimization_toggle(
591 &self,
592 type_coercion: bool,
593 type_check: bool,
594 predicate_pushdown: bool,
595 projection_pushdown: bool,
596 simplify_expression: bool,
597 slice_pushdown: bool,
598 comm_subplan_elim: bool,
599 comm_subexpr_elim: bool,
600 cluster_with_columns: bool,
601 collapse_joins: bool,
602 streaming: bool,
603 _eager: bool,
604 _check_order: bool,
605 #[allow(unused_variables)] new_streaming: bool,
606 ) -> Self {
607 let ldf = self.ldf.clone();
608 let mut ldf = ldf
609 .with_type_coercion(type_coercion)
610 .with_type_check(type_check)
611 .with_predicate_pushdown(predicate_pushdown)
612 .with_simplify_expr(simplify_expression)
613 .with_slice_pushdown(slice_pushdown)
614 .with_cluster_with_columns(cluster_with_columns)
615 .with_collapse_joins(collapse_joins)
616 .with_check_order(_check_order)
617 ._with_eager(_eager)
618 .with_projection_pushdown(projection_pushdown);
619
620 #[cfg(feature = "streaming")]
621 {
622 ldf = ldf.with_streaming(streaming);
623 }
624
625 #[cfg(feature = "new_streaming")]
626 {
627 ldf = ldf.with_new_streaming(new_streaming);
628 }
629
630 #[cfg(feature = "cse")]
631 {
632 ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);
633 ldf = ldf.with_comm_subexpr_elim(comm_subexpr_elim);
634 }
635
636 ldf.into()
637 }
638
639 fn sort(
640 &self,
641 by_column: &str,
642 descending: bool,
643 nulls_last: bool,
644 maintain_order: bool,
645 multithreaded: bool,
646 ) -> Self {
647 let ldf = self.ldf.clone();
648 ldf.sort(
649 [by_column],
650 SortMultipleOptions {
651 descending: vec![descending],
652 nulls_last: vec![nulls_last],
653 multithreaded,
654 maintain_order,
655 limit: None,
656 },
657 )
658 .into()
659 }
660
661 fn sort_by_exprs(
662 &self,
663 by: Vec<PyExpr>,
664 descending: Vec<bool>,
665 nulls_last: Vec<bool>,
666 maintain_order: bool,
667 multithreaded: bool,
668 ) -> Self {
669 let ldf = self.ldf.clone();
670 let exprs = by.to_exprs();
671 ldf.sort_by_exprs(
672 exprs,
673 SortMultipleOptions {
674 descending,
675 nulls_last,
676 maintain_order,
677 multithreaded,
678 limit: None,
679 },
680 )
681 .into()
682 }
683
684 fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
685 let ldf = self.ldf.clone();
686 let exprs = by.to_exprs();
687 ldf.top_k(
688 k,
689 exprs,
690 SortMultipleOptions::new().with_order_descending_multi(reverse),
691 )
692 .into()
693 }
694
695 fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
696 let ldf = self.ldf.clone();
697 let exprs = by.to_exprs();
698 ldf.bottom_k(
699 k,
700 exprs,
701 SortMultipleOptions::new().with_order_descending_multi(reverse),
702 )
703 .into()
704 }
705
706 fn cache(&self) -> Self {
707 let ldf = self.ldf.clone();
708 ldf.cache().into()
709 }
710
711 #[pyo3(signature = (optflags))]
712 fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
713 let ldf = self.ldf.clone();
714 ldf.with_optimizations(optflags.inner).into()
715 }
716
717 #[pyo3(signature = (lambda_post_opt=None))]
718 fn profile(
719 &self,
720 py: Python<'_>,
721 lambda_post_opt: Option<PyObject>,
722 ) -> PyResult<(PyDataFrame, PyDataFrame)> {
723 let (df, time_df) = py.enter_polars(|| {
724 let ldf = self.ldf.clone();
725 if let Some(lambda) = lambda_post_opt {
726 ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
727 post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
728 })
729 } else {
730 ldf.profile()
731 }
732 })?;
733 Ok((df.into(), time_df.into()))
734 }
735
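    /// Collect the query with the chosen engine, optionally running a post-optimization
    /// callback (e.g. the GPU engine hook) before execution.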
736 #[pyo3(signature = (engine, lambda_post_opt=None))]
737 fn collect(
738 &self,
739 py: Python<'_>,
740 engine: Wrap<Engine>,
741 lambda_post_opt: Option<PyObject>,
742 ) -> PyResult<PyDataFrame> {
743 py.enter_polars_df(|| {
744 let ldf = self.ldf.clone();
745 if let Some(lambda) = lambda_post_opt {
746 ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
747 post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
748 })
749 } else {
750 ldf.collect_with_engine(engine.0)
751 }
752 })
753 }
754
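    /// Collect on the rayon thread pool and invoke a Python callback with the resulting
    /// `DataFrame` (or the error), without blocking the calling thread.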
755 #[pyo3(signature = (engine, lambda))]
756 fn collect_with_callback(
757 &self,
758 py: Python<'_>,
759 engine: Wrap<Engine>,
760 lambda: PyObject,
761 ) -> PyResult<()> {
762 py.enter_polars_ok(|| {
763 let ldf = self.ldf.clone();
764
765 polars_core::POOL.spawn(move || {
766 let result = ldf
767 .collect_with_engine(engine.0)
768 .map(PyDataFrame::new)
769 .map_err(PyPolarsErr::from);
770
771 Python::with_gil(|py| match result {
772 Ok(df) => {
773 lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
774 },
775 Err(err) => {
776 lambda
777 .call1(py, (PyErr::from(err),))
778 .map_err(|err| err.restore(py))
779 .ok();
780 },
781 });
782 });
783 })
784 }
785
786 #[cfg(all(feature = "streaming", feature = "parquet"))]
787 #[pyo3(signature = (
788 target, compression, compression_level, statistics, row_group_size, data_page_size,
789 cloud_options, credential_provider, retries, sink_options, metadata, field_overwrites,
790 ))]
791 fn sink_parquet(
792 &self,
793 py: Python<'_>,
794 target: SinkTarget,
795 compression: &str,
796 compression_level: Option<i32>,
797 statistics: Wrap<StatisticsOptions>,
798 row_group_size: Option<usize>,
799 data_page_size: Option<usize>,
800 cloud_options: Option<Vec<(String, String)>>,
801 credential_provider: Option<PyObject>,
802 retries: usize,
803 sink_options: Wrap<SinkOptions>,
804 metadata: Wrap<Option<KeyValueMetadata>>,
805 field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
806 ) -> PyResult<PyLazyFrame> {
807 let compression = parse_parquet_compression(compression, compression_level)?;
808
809 let options = ParquetWriteOptions {
810 compression,
811 statistics: statistics.0,
812 row_group_size,
813 data_page_size,
814 key_value_metadata: metadata.0,
815 field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
816 };
817
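        // Derive cloud options from the sink's base path (if any), attaching the retry
        // count and optional credential provider.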
818 let cloud_options = match target.base_path() {
819 None => None,
820 Some(base_path) => {
821 let cloud_options = parse_cloud_options(
822 base_path.to_str().unwrap(),
823 cloud_options.unwrap_or_default(),
824 )?;
825 Some(
826 cloud_options
827 .with_max_retries(retries)
828 .with_credential_provider(
829 credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
830 ),
831 )
832 },
833 };
834
835 py.enter_polars(|| {
836 let ldf = self.ldf.clone();
837 match target {
838 SinkTarget::File(target) => {
839 ldf.sink_parquet(target, options, cloud_options, sink_options.0)
840 },
841 SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned(
842 Arc::new(partition.base_path),
843 partition.file_path_cb.map(PartitionTargetCallback::Python),
844 partition.variant,
845 options,
846 cloud_options,
847 sink_options.0,
848 ),
849 }
850 .into()
851 })
852 .map(Into::into)
853 .map_err(Into::into)
854 }
855
856 #[cfg(all(feature = "streaming", feature = "ipc"))]
857 #[pyo3(signature = (
858 target, compression, compat_level, cloud_options, credential_provider, retries,
859 sink_options
860 ))]
861 fn sink_ipc(
862 &self,
863 py: Python<'_>,
864 target: SinkTarget,
865 compression: Wrap<Option<IpcCompression>>,
866 compat_level: PyCompatLevel,
867 cloud_options: Option<Vec<(String, String)>>,
868 credential_provider: Option<PyObject>,
869 retries: usize,
870 sink_options: Wrap<SinkOptions>,
871 ) -> PyResult<PyLazyFrame> {
872 let options = IpcWriterOptions {
873 compression: compression.0,
874 compat_level: compat_level.0,
875 ..Default::default()
876 };
877
878 #[cfg(feature = "cloud")]
879 let cloud_options = match target.base_path() {
880 None => None,
881 Some(base_path) => {
882 let cloud_options = parse_cloud_options(
883 base_path.to_str().unwrap(),
884 cloud_options.unwrap_or_default(),
885 )?;
886 Some(
887 cloud_options
888 .with_max_retries(retries)
889 .with_credential_provider(
890 credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
891 ),
892 )
893 },
894 };
895
896 #[cfg(not(feature = "cloud"))]
897 let cloud_options = None;
898
899 py.enter_polars(|| {
900 let ldf = self.ldf.clone();
901 match target {
902 SinkTarget::File(target) => {
903 ldf.sink_ipc(target, options, cloud_options, sink_options.0)
904 },
905 SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned(
906 Arc::new(partition.base_path),
907 partition.file_path_cb.map(PartitionTargetCallback::Python),
908 partition.variant,
909 options,
910 cloud_options,
911 sink_options.0,
912 ),
913 }
914 })
915 .map(Into::into)
916 .map_err(Into::into)
917 }
918
919 #[cfg(all(feature = "streaming", feature = "csv"))]
920 #[pyo3(signature = (
921 target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
922 datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
923 quote_style, cloud_options, credential_provider, retries, sink_options
924 ))]
925 fn sink_csv(
926 &self,
927 py: Python<'_>,
928 target: SinkTarget,
929 include_bom: bool,
930 include_header: bool,
931 separator: u8,
932 line_terminator: String,
933 quote_char: u8,
934 batch_size: NonZeroUsize,
935 datetime_format: Option<String>,
936 date_format: Option<String>,
937 time_format: Option<String>,
938 float_scientific: Option<bool>,
939 float_precision: Option<usize>,
940 null_value: Option<String>,
941 quote_style: Option<Wrap<QuoteStyle>>,
942 cloud_options: Option<Vec<(String, String)>>,
943 credential_provider: Option<PyObject>,
944 retries: usize,
945 sink_options: Wrap<SinkOptions>,
946 ) -> PyResult<PyLazyFrame> {
947 let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
948 let null_value = null_value.unwrap_or(SerializeOptions::default().null);
949
950 let serialize_options = SerializeOptions {
951 date_format,
952 time_format,
953 datetime_format,
954 float_scientific,
955 float_precision,
956 separator,
957 quote_char,
958 null: null_value,
959 line_terminator,
960 quote_style,
961 };
962
963 let options = CsvWriterOptions {
964 include_bom,
965 include_header,
966 batch_size,
967 serialize_options,
968 };
969
970 #[cfg(feature = "cloud")]
971 let cloud_options = match target.base_path() {
972 None => None,
973 Some(base_path) => {
974 let cloud_options = parse_cloud_options(
975 base_path.to_str().unwrap(),
976 cloud_options.unwrap_or_default(),
977 )?;
978 Some(
979 cloud_options
980 .with_max_retries(retries)
981 .with_credential_provider(
982 credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
983 ),
984 )
985 },
986 };
987
988 #[cfg(not(feature = "cloud"))]
989 let cloud_options = None;
990
991 py.enter_polars(|| {
992 let ldf = self.ldf.clone();
993 match target {
994 SinkTarget::File(target) => {
995 ldf.sink_csv(target, options, cloud_options, sink_options.0)
996 },
997 SinkTarget::Partition(partition) => ldf.sink_csv_partitioned(
998 Arc::new(partition.base_path),
999 partition.file_path_cb.map(PartitionTargetCallback::Python),
1000 partition.variant,
1001 options,
1002 cloud_options,
1003 sink_options.0,
1004 ),
1005 }
1006 })
1007 .map(Into::into)
1008 .map_err(Into::into)
1009 }
1010
1011 #[allow(clippy::too_many_arguments)]
1012 #[cfg(all(feature = "streaming", feature = "json"))]
1013 #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
1014 fn sink_json(
1015 &self,
1016 py: Python<'_>,
1017 target: SinkTarget,
1018 cloud_options: Option<Vec<(String, String)>>,
1019 credential_provider: Option<PyObject>,
1020 retries: usize,
1021 sink_options: Wrap<SinkOptions>,
1022 ) -> PyResult<PyLazyFrame> {
1023 let options = JsonWriterOptions {};
1024
1025 let cloud_options = match target.base_path() {
1026 None => None,
1027 Some(base_path) => {
1028 let cloud_options = parse_cloud_options(
1029 base_path.to_str().unwrap(),
1030 cloud_options.unwrap_or_default(),
1031 )?;
1032 Some(
1033 cloud_options
1034 .with_max_retries(retries)
1035 .with_credential_provider(
1036 credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
1037 ),
1038 )
1039 },
1040 };
1041
1042 py.enter_polars(|| {
1043 let ldf = self.ldf.clone();
1044 match target {
1045 SinkTarget::File(path) => {
1046 ldf.sink_json(path, options, cloud_options, sink_options.0)
1047 },
1048 SinkTarget::Partition(partition) => ldf.sink_json_partitioned(
1049 Arc::new(partition.base_path),
1050 partition.file_path_cb.map(PartitionTargetCallback::Python),
1051 partition.variant,
1052 options,
1053 cloud_options,
1054 sink_options.0,
1055 ),
1056 }
1057 })
1058 .map(Into::into)
1059 .map_err(Into::into)
1060 }
1061
1062 fn fetch(&self, py: Python<'_>, n_rows: usize) -> PyResult<PyDataFrame> {
1063 let ldf = self.ldf.clone();
1064 py.enter_polars_df(|| ldf.fetch(n_rows))
1065 }
1066
1067 fn filter(&mut self, predicate: PyExpr) -> Self {
1068 let ldf = self.ldf.clone();
1069 ldf.filter(predicate.inner).into()
1070 }
1071
1072 fn remove(&mut self, predicate: PyExpr) -> Self {
1073 let ldf = self.ldf.clone();
1074 ldf.remove(predicate.inner).into()
1075 }
1076
1077 fn select(&mut self, exprs: Vec<PyExpr>) -> Self {
1078 let ldf = self.ldf.clone();
1079 let exprs = exprs.to_exprs();
1080 ldf.select(exprs).into()
1081 }
1082
1083 fn select_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
1084 let ldf = self.ldf.clone();
1085 let exprs = exprs.to_exprs();
1086 ldf.select_seq(exprs).into()
1087 }
1088
1089 fn group_by(&mut self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
1090 let ldf = self.ldf.clone();
1091 let by = by.to_exprs();
1092 let lazy_gb = if maintain_order {
1093 ldf.group_by_stable(by)
1094 } else {
1095 ldf.group_by(by)
1096 };
1097
1098 PyLazyGroupBy { lgb: Some(lazy_gb) }
1099 }
1100
1101 fn rolling(
1102 &mut self,
1103 index_column: PyExpr,
1104 period: &str,
1105 offset: &str,
1106 closed: Wrap<ClosedWindow>,
1107 by: Vec<PyExpr>,
1108 ) -> PyResult<PyLazyGroupBy> {
1109 let closed_window = closed.0;
1110 let ldf = self.ldf.clone();
1111 let by = by
1112 .into_iter()
1113 .map(|pyexpr| pyexpr.inner)
1114 .collect::<Vec<_>>();
1115 let lazy_gb = ldf.rolling(
1116 index_column.inner,
1117 by,
1118 RollingGroupOptions {
1119 index_column: "".into(),
1120 period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
1121 offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
1122 closed_window,
1123 },
1124 );
1125
1126 Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
1127 }
1128
1129 fn group_by_dynamic(
1130 &mut self,
1131 index_column: PyExpr,
1132 every: &str,
1133 period: &str,
1134 offset: &str,
1135 label: Wrap<Label>,
1136 include_boundaries: bool,
1137 closed: Wrap<ClosedWindow>,
1138 group_by: Vec<PyExpr>,
1139 start_by: Wrap<StartBy>,
1140 ) -> PyResult<PyLazyGroupBy> {
1141 let closed_window = closed.0;
1142 let group_by = group_by
1143 .into_iter()
1144 .map(|pyexpr| pyexpr.inner)
1145 .collect::<Vec<_>>();
1146 let ldf = self.ldf.clone();
1147 let lazy_gb = ldf.group_by_dynamic(
1148 index_column.inner,
1149 group_by,
1150 DynamicGroupOptions {
1151 every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
1152 period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
1153 offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
1154 label: label.0,
1155 include_boundaries,
1156 closed_window,
1157 start_by: start_by.0,
1158 ..Default::default()
1159 },
1160 );
1161
1162 Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
1163 }
1164
1165 fn with_context(&self, contexts: Vec<Self>) -> Self {
1166 let contexts = contexts.into_iter().map(|ldf| ldf.ldf).collect::<Vec<_>>();
1167 self.ldf.clone().with_context(contexts).into()
1168 }
1169
1170 #[cfg(feature = "asof_join")]
1171 #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
1172 fn join_asof(
1173 &self,
1174 other: Self,
1175 left_on: PyExpr,
1176 right_on: PyExpr,
1177 left_by: Option<Vec<PyBackedStr>>,
1178 right_by: Option<Vec<PyBackedStr>>,
1179 allow_parallel: bool,
1180 force_parallel: bool,
1181 suffix: String,
1182 strategy: Wrap<AsofStrategy>,
1183 tolerance: Option<Wrap<AnyValue<'_>>>,
1184 tolerance_str: Option<String>,
1185 coalesce: bool,
1186 allow_eq: bool,
1187 check_sortedness: bool,
1188 ) -> PyResult<Self> {
1189 let coalesce = if coalesce {
1190 JoinCoalesce::CoalesceColumns
1191 } else {
1192 JoinCoalesce::KeepColumns
1193 };
1194 let ldf = self.ldf.clone();
1195 let other = other.ldf;
1196 let left_on = left_on.inner;
1197 let right_on = right_on.inner;
1198 Ok(ldf
1199 .join_builder()
1200 .with(other)
1201 .left_on([left_on])
1202 .right_on([right_on])
1203 .allow_parallel(allow_parallel)
1204 .force_parallel(force_parallel)
1205 .coalesce(coalesce)
1206 .how(JoinType::AsOf(AsOfOptions {
1207 strategy: strategy.0,
1208 left_by: left_by.map(strings_to_pl_smallstr),
1209 right_by: right_by.map(strings_to_pl_smallstr),
1210 tolerance: tolerance.map(|t| t.0.into_static()),
1211 tolerance_str: tolerance_str.map(|s| s.into()),
1212 allow_eq,
1213 check_sortedness,
1214 }))
1215 .suffix(suffix)
1216 .finish()
1217 .into())
1218 }
1219
1220 #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
1221 fn join(
1222 &self,
1223 other: Self,
1224 left_on: Vec<PyExpr>,
1225 right_on: Vec<PyExpr>,
1226 allow_parallel: bool,
1227 force_parallel: bool,
1228 nulls_equal: bool,
1229 how: Wrap<JoinType>,
1230 suffix: String,
1231 validate: Wrap<JoinValidation>,
1232 maintain_order: Wrap<MaintainOrderJoin>,
1233 coalesce: Option<bool>,
1234 ) -> PyResult<Self> {
1235 let coalesce = match coalesce {
1236 None => JoinCoalesce::JoinSpecific,
1237 Some(true) => JoinCoalesce::CoalesceColumns,
1238 Some(false) => JoinCoalesce::KeepColumns,
1239 };
1240 let ldf = self.ldf.clone();
1241 let other = other.ldf;
1242 let left_on = left_on
1243 .into_iter()
1244 .map(|pyexpr| pyexpr.inner)
1245 .collect::<Vec<_>>();
1246 let right_on = right_on
1247 .into_iter()
1248 .map(|pyexpr| pyexpr.inner)
1249 .collect::<Vec<_>>();
1250
1251 Ok(ldf
1252 .join_builder()
1253 .with(other)
1254 .left_on(left_on)
1255 .right_on(right_on)
1256 .allow_parallel(allow_parallel)
1257 .force_parallel(force_parallel)
1258 .join_nulls(nulls_equal)
1259 .how(how.0)
1260 .suffix(suffix)
1261 .validate(validate.0)
1262 .coalesce(coalesce)
1263 .maintain_order(maintain_order.0)
1264 .finish()
1265 .into())
1266 }
1267
1268 fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
1269 let ldf = self.ldf.clone();
1270 let other = other.ldf;
1271
1272 let predicates = predicates.to_exprs();
1273
1274 Ok(ldf
1275 .join_builder()
1276 .with(other)
1277 .suffix(suffix)
1278 .join_where(predicates)
1279 .into())
1280 }
1281
1282 fn with_columns(&mut self, exprs: Vec<PyExpr>) -> Self {
1283 let ldf = self.ldf.clone();
1284 ldf.with_columns(exprs.to_exprs()).into()
1285 }
1286
1287 fn with_columns_seq(&mut self, exprs: Vec<PyExpr>) -> Self {
1288 let ldf = self.ldf.clone();
1289 ldf.with_columns_seq(exprs.to_exprs()).into()
1290 }
1291
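    /// Align this `LazyFrame` with a target schema. Each policy argument is either a single
    /// policy applied to every column or a dict mapping column names to per-column policies.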
1292 fn match_to_schema<'py>(
1293 &self,
1294 schema: Wrap<Schema>,
1295 missing_columns: &Bound<'py, PyAny>,
1296 missing_struct_fields: &Bound<'py, PyAny>,
1297 extra_columns: Wrap<ExtraColumnsPolicy>,
1298 extra_struct_fields: &Bound<'py, PyAny>,
1299 integer_cast: &Bound<'py, PyAny>,
1300 float_cast: &Bound<'py, PyAny>,
1301 ) -> PyResult<Self> {
1302 fn parse_missing_columns<'py>(
1303 schema: &Schema,
1304 missing_columns: &Bound<'py, PyAny>,
1305 ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
1306 let mut out = Vec::with_capacity(schema.len());
1307 if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
1308 out.extend(std::iter::repeat_n(policy.0, schema.len()));
1309 } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
1310 out.extend(std::iter::repeat_n(
1311 MissingColumnsPolicyOrExpr::Raise,
1312 schema.len(),
1313 ));
1314 for (key, value) in dict.iter() {
1315 let key = key.extract::<String>()?;
1316 let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
1317 out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1318 }
1319 } else {
1320 return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
1321 }
1322 Ok(out)
1323 }
1324 fn parse_missing_struct_fields<'py>(
1325 schema: &Schema,
1326 missing_struct_fields: &Bound<'py, PyAny>,
1327 ) -> PyResult<Vec<MissingColumnsPolicy>> {
1328 let mut out = Vec::with_capacity(schema.len());
1329 if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
1330 out.extend(std::iter::repeat_n(policy.0, schema.len()));
1331 } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
1332 out.extend(std::iter::repeat_n(
1333 MissingColumnsPolicy::Raise,
1334 schema.len(),
1335 ));
1336 for (key, value) in dict.iter() {
1337 let key = key.extract::<String>()?;
1338 let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
1339 out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1340 }
1341 } else {
1342 return Err(PyTypeError::new_err(
1343 "Invalid value for `missing_struct_fields`",
1344 ));
1345 }
1346 Ok(out)
1347 }
1348 fn parse_extra_struct_fields<'py>(
1349 schema: &Schema,
1350 extra_struct_fields: &Bound<'py, PyAny>,
1351 ) -> PyResult<Vec<ExtraColumnsPolicy>> {
1352 let mut out = Vec::with_capacity(schema.len());
1353 if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
1354 out.extend(std::iter::repeat_n(policy.0, schema.len()));
1355 } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
1356 out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
1357 for (key, value) in dict.iter() {
1358 let key = key.extract::<String>()?;
1359 let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
1360 out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1361 }
1362 } else {
1363 return Err(PyTypeError::new_err(
1364 "Invalid value for `extra_struct_fields`",
1365 ));
1366 }
1367 Ok(out)
1368 }
1369 fn parse_cast<'py>(
1370 schema: &Schema,
1371 cast: &Bound<'py, PyAny>,
1372 ) -> PyResult<Vec<UpcastOrForbid>> {
1373 let mut out = Vec::with_capacity(schema.len());
1374 if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
1375 out.extend(std::iter::repeat_n(policy.0, schema.len()));
1376 } else if let Ok(dict) = cast.downcast::<PyDict>() {
1377 out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
1378 for (key, value) in dict.iter() {
1379 let key = key.extract::<String>()?;
1380 let value = value.extract::<Wrap<UpcastOrForbid>>()?;
1381 out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
1382 }
1383 } else {
1384 return Err(PyTypeError::new_err(
1385 "Invalid value for `integer_cast` / `float_cast`",
1386 ));
1387 }
1388 Ok(out)
1389 }
1390
1391 let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
1392 let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
1393 let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
1394 let integer_cast = parse_cast(&schema.0, integer_cast)?;
1395 let float_cast = parse_cast(&schema.0, float_cast)?;
1396
1397 let per_column = (0..schema.0.len())
1398 .map(|i| MatchToSchemaPerColumn {
1399 missing_columns: missing_columns[i].clone(),
1400 missing_struct_fields: missing_struct_fields[i],
1401 extra_struct_fields: extra_struct_fields[i],
1402 integer_cast: integer_cast[i],
1403 float_cast: float_cast[i],
1404 })
1405 .collect();
1406
1407 let ldf = self.ldf.clone();
1408 Ok(ldf
1409 .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
1410 .into())
1411 }
1412
1413 fn rename(&mut self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
1414 let ldf = self.ldf.clone();
1415 ldf.rename(existing, new, strict).into()
1416 }
1417
1418 fn reverse(&self) -> Self {
1419 let ldf = self.ldf.clone();
1420 ldf.reverse().into()
1421 }
1422
1423 #[pyo3(signature = (n, fill_value=None))]
1424 fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
1425 let lf = self.ldf.clone();
1426 let out = match fill_value {
1427 Some(v) => lf.shift_and_fill(n.inner, v.inner),
1428 None => lf.shift(n.inner),
1429 };
1430 out.into()
1431 }
1432
1433 fn fill_nan(&self, fill_value: PyExpr) -> Self {
1434 let ldf = self.ldf.clone();
1435 ldf.fill_nan(fill_value.inner).into()
1436 }
1437
1438 fn min(&self) -> Self {
1439 let ldf = self.ldf.clone();
1440 let out = ldf.min();
1441 out.into()
1442 }
1443
1444 fn max(&self) -> Self {
1445 let ldf = self.ldf.clone();
1446 let out = ldf.max();
1447 out.into()
1448 }
1449
1450 fn sum(&self) -> Self {
1451 let ldf = self.ldf.clone();
1452 let out = ldf.sum();
1453 out.into()
1454 }
1455
1456 fn mean(&self) -> Self {
1457 let ldf = self.ldf.clone();
1458 let out = ldf.mean();
1459 out.into()
1460 }
1461
1462 fn std(&self, ddof: u8) -> Self {
1463 let ldf = self.ldf.clone();
1464 let out = ldf.std(ddof);
1465 out.into()
1466 }
1467
1468 fn var(&self, ddof: u8) -> Self {
1469 let ldf = self.ldf.clone();
1470 let out = ldf.var(ddof);
1471 out.into()
1472 }
1473
1474 fn median(&self) -> Self {
1475 let ldf = self.ldf.clone();
1476 let out = ldf.median();
1477 out.into()
1478 }
1479
1480 fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
1481 let ldf = self.ldf.clone();
1482 let out = ldf.quantile(quantile.inner, interpolation.0);
1483 out.into()
1484 }
1485
1486 fn explode(&self, column: Vec<PyExpr>) -> Self {
1487 let ldf = self.ldf.clone();
1488 let column = column.to_exprs();
1489 ldf.explode(column).into()
1490 }
1491
1492 fn null_count(&self) -> Self {
1493 let ldf = self.ldf.clone();
1494 ldf.null_count().into()
1495 }
1496
1497 #[pyo3(signature = (maintain_order, subset, keep))]
1498 fn unique(
1499 &self,
1500 maintain_order: bool,
1501 subset: Option<Vec<PyExpr>>,
1502 keep: Wrap<UniqueKeepStrategy>,
1503 ) -> Self {
1504 let ldf = self.ldf.clone();
1505 let subset = subset.map(|e| e.to_exprs());
1506 match maintain_order {
1507 true => ldf.unique_stable_generic(subset, keep.0),
1508 false => ldf.unique_generic(subset, keep.0),
1509 }
1510 .into()
1511 }
1512
1513 #[pyo3(signature = (subset=None))]
1514 fn drop_nans(&self, subset: Option<Vec<PyExpr>>) -> Self {
1515 let ldf = self.ldf.clone();
1516 let subset = subset.map(|e| e.to_exprs());
1517 ldf.drop_nans(subset).into()
1518 }
1519
1520 #[pyo3(signature = (subset=None))]
1521 fn drop_nulls(&self, subset: Option<Vec<PyExpr>>) -> Self {
1522 let ldf = self.ldf.clone();
1523 let subset = subset.map(|e| e.to_exprs());
1524 ldf.drop_nulls(subset).into()
1525 }
1526
1527 #[pyo3(signature = (offset, len=None))]
1528 fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
1529 let ldf = self.ldf.clone();
1530 ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
1531 }
1532
1533 fn tail(&self, n: IdxSize) -> Self {
1534 let ldf = self.ldf.clone();
1535 ldf.tail(n).into()
1536 }
1537
1538 #[cfg(feature = "pivot")]
1539 #[pyo3(signature = (on, index, value_name, variable_name))]
1540 fn unpivot(
1541 &self,
1542 on: Vec<PyExpr>,
1543 index: Vec<PyExpr>,
1544 value_name: Option<String>,
1545 variable_name: Option<String>,
1546 ) -> Self {
1547 let args = UnpivotArgsDSL {
1548 on: on.into_iter().map(|e| e.inner.into()).collect(),
1549 index: index.into_iter().map(|e| e.inner.into()).collect(),
1550 value_name: value_name.map(|s| s.into()),
1551 variable_name: variable_name.map(|s| s.into()),
1552 };
1553
1554 let ldf = self.ldf.clone();
1555 ldf.unpivot(args).into()
1556 }
1557
1558 #[pyo3(signature = (name, offset=None))]
1559 fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
1560 let ldf = self.ldf.clone();
1561 ldf.with_row_index(name, offset).into()
1562 }
1563
1564 #[pyo3(signature = (lambda, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
1565 fn map_batches(
1566 &self,
1567 lambda: PyObject,
1568 predicate_pushdown: bool,
1569 projection_pushdown: bool,
1570 slice_pushdown: bool,
1571 streamable: bool,
1572 schema: Option<Wrap<Schema>>,
1573 validate_output: bool,
1574 ) -> Self {
1575 let mut opt = OptFlags::default();
1576 opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
1577 opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
1578 opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
1579 opt.set(OptFlags::STREAMING, streamable);
1580
1581 self.ldf
1582 .clone()
1583 .map_python(
1584 lambda.into(),
1585 opt,
1586 schema.map(|s| Arc::new(s.0)),
1587 validate_output,
1588 )
1589 .into()
1590 }
1591
1592 fn drop(&self, columns: Vec<PyExpr>, strict: bool) -> Self {
1593 let ldf = self.ldf.clone();
1594 let columns = columns.to_exprs();
1595 if strict {
1596 ldf.drop(columns)
1597 } else {
1598 ldf.drop_no_validate(columns)
1599 }
1600 .into()
1601 }
1602
1603 fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
1604 let mut cast_map = PlHashMap::with_capacity(dtypes.len());
1605 cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
1606 self.ldf.clone().cast(cast_map, strict).into()
1607 }
1608
1609 fn cast_all(&self, dtype: Wrap<DataType>, strict: bool) -> Self {
1610 self.ldf.clone().cast_all(dtype.0, strict).into()
1611 }
1612
1613 fn clone(&self) -> Self {
1614 self.ldf.clone().into()
1615 }
1616
1617 fn collect_schema<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
1618 let schema = py.enter_polars(|| self.ldf.collect_schema())?;
1619
1620 let schema_dict = PyDict::new(py);
1621 schema.iter_fields().for_each(|fld| {
1622 schema_dict
1623 .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
1624 .unwrap()
1625 });
1626 Ok(schema_dict)
1627 }
1628
1629 fn unnest(&self, columns: Vec<PyExpr>) -> Self {
1630 let columns = columns.to_exprs();
1631 self.ldf.clone().unnest(columns).into()
1632 }
1633
1634 fn count(&self) -> Self {
1635 let ldf = self.ldf.clone();
1636 ldf.count().into()
1637 }
1638
1639 #[cfg(feature = "merge_sorted")]
1640 fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
1641 let out = self
1642 .ldf
1643 .clone()
1644 .merge_sorted(other.ldf, key)
1645 .map_err(PyPolarsErr::from)?;
1646 Ok(out.into())
1647 }
1648}
1649
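// Converts a Python dict with `name` / `children` / `field_id` / `metadata` keys into
// `ParquetFieldOverwrites`, recursing into struct and list-like children.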
1650#[cfg(feature = "parquet")]
1651impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
1652 fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
1653 use polars_io::parquet::write::ParquetFieldOverwrites;
1654
1655 let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;
1656
1657 let name = PyDictMethods::get_item(&parsed, "name")?
1658 .map(|v| PyResult::Ok(v.extract::<String>()?.into()))
1659 .transpose()?;
1660 let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
1661 PyResult::Ok(ChildFieldOverwrites::None),
1662 |v| {
1663 Ok(
1664 if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
1665 ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
1666 } else {
1667 ChildFieldOverwrites::ListLike(Box::new(
1668 v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
1669 ))
1670 },
1671 )
1672 },
1673 )?;
1674
1675 let field_id = PyDictMethods::get_item(&parsed, "field_id")?
1676 .map(|v| v.extract::<i32>())
1677 .transpose()?;
1678
1679 let metadata = PyDictMethods::get_item(&parsed, "metadata")?
1680 .map(|v| v.extract::<Vec<(String, Option<String>)>>())
1681 .transpose()?;
1682 let metadata = metadata.map(|v| {
1683 v.into_iter()
1684 .map(|v| MetadataKeyValue {
1685 key: v.0.into(),
1686 value: v.1.map(|v| v.into()),
1687 })
1688 .collect()
1689 });
1690
1691 Ok(Wrap(ParquetFieldOverwrites {
1692 name,
1693 children,
1694 field_id,
1695 metadata,
1696 }))
1697 }
1698}