use std::collections::HashMap;
use std::num::NonZeroUsize;

use either::Either;
use polars::io::{HiveOptions, RowIndex};
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, IR};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags, SinkTarget};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::expr::datatype::PyDataTypeExpr;
use crate::expr::selector::PySelector;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::io::PyScanOptions;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

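/// Resolve a Python scan-source object into an optional first path plus the
/// `ScanSources` it describes (paths, open files, or in-memory buffers).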
fn pyobject_to_first_path_and_scan_sources(
    obj: PyObject,
) -> PyResult<(Option<PlPath>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => {
            (Some(path.clone()), ScanSources::Paths([path].into()))
        },
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

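/// Hand the optimized plan to a Python post-optimization callback (e.g. the
/// GPU engine hook), temporarily moving the IR and expression arenas into a
/// `NodeTraverser` and restoring them afterwards.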
fn post_opt_callback(
    lambda: &PyObject,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::with_gil(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

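        // Keep handles to the arenas so they can be swapped back below.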
        let arenas = nt.get_arenas();

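        // Pass the node traverser to the Python callback, which may replace
        // parts of the query plan in place.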
        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

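        // Swap the (possibly mutated) arenas back into the caller's storage;
        // the traverser itself has been consumed by the callback.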
        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
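    /// Lazily scan newline-delimited JSON (NDJSON) sources.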
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
    ))]
    fn new_from_ndjson(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

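        // Cloud options only apply when scanning from a path; open files and
        // in-memory buffers are read directly.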
        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options =
                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );

            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }

            r = r.with_cloud_options(Some(cloud_options));
        }

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

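    /// Lazily scan CSV sources.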
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
    )
    )]
    fn new_from_csv(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<PyObject>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options =
                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );
            r = r.with_cloud_options(Some(cloud_options));
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

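        // `with_schema_modify` calls back into Python to rename the inferred
        // columns before the scan is finalized.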
        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::with_gil(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "The length of the new names list should be equal to the original column length",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

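    /// Lazily scan Parquet sources.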
    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        sources, schema, scan_options, parallel, low_memory, use_statistics
    ))]
    fn new_from_parquet(
        sources: Wrap<ScanSources>,
        schema: Option<Wrap<Schema>>,
        scan_options: PyScanOptions,
        parallel: Wrap<ParallelStrategy>,
        low_memory: bool,
        use_statistics: bool,
    ) -> PyResult<Self> {
        use crate::utils::to_py_err;

        let parallel = parallel.0;

        let options = ParquetOptions {
            schema: schema.map(|x| Arc::new(x.0)),
            parallel,
            low_memory,
            use_statistics,
        };

        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().map(|p| p.as_ref()))?;

        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
            .map_err(to_py_err)?
            .build()
            .into();

        Ok(lf.into())
    }

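    /// Lazily scan Arrow IPC sources.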
    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (
        source, sources, n_rows, cache, rechunk, row_index, cloud_options, credential_provider,
        hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl,
        include_file_paths
    ))]
    fn new_from_ipc(
        source: Option<PyObject>,
        sources: Wrap<ScanSources>,
        n_rows: Option<usize>,
        cache: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        hive_partitioning: Option<bool>,
        hive_schema: Option<Wrap<Schema>>,
        try_parse_hive_dates: bool,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let hive_options = HiveOptions {
            enabled: hive_partitioning,
            hive_start_idx: 0,
            schema: hive_schema.map(|x| Arc::new(x.0)),
            try_parse_dates: try_parse_hive_dates,
        };

        let mut args = ScanArgsIpc {
            n_rows,
            cache,
            rechunk,
            row_index,
            cloud_options: None,
            hive_options,
            include_file_paths: include_file_paths.map(|x| x.into()),
        };

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options =
                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            args.cloud_options = Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_builder),
                    ),
            );
        }

        let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

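    /// Build a LazyFrame that scans a Python dataset object via the DSL.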
    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: PyObject) -> PyResult<Self> {
        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

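    /// Build a LazyFrame from a Python scan function with a pyarrow schema.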
    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: PyObject,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

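    /// Build a LazyFrame from a Python scan function with a Polars schema.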
    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: PyObject,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

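    /// Build a LazyFrame from a Python scan function whose schema is itself
    /// computed lazily by `schema_fn`.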
    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: PyObject,
        scan_fn: PyObject,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
            is_pure,
        )
        .into())
    }

    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
    }

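    /// Toggle individual optimizer passes and engine settings on a clone of
    /// this LazyFrame.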
    fn optimization_toggle(
        &self,
        type_coercion: bool,
        type_check: bool,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        simplify_expression: bool,
        slice_pushdown: bool,
        comm_subplan_elim: bool,
        comm_subexpr_elim: bool,
        cluster_with_columns: bool,
        collapse_joins: bool,
        _eager: bool,
        _check_order: bool,
        #[allow(unused_variables)] new_streaming: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let mut ldf = ldf
            .with_type_coercion(type_coercion)
            .with_type_check(type_check)
            .with_predicate_pushdown(predicate_pushdown)
            .with_simplify_expr(simplify_expression)
            .with_slice_pushdown(slice_pushdown)
            .with_cluster_with_columns(cluster_with_columns)
            .with_collapse_joins(collapse_joins)
            .with_check_order(_check_order)
            ._with_eager(_eager)
            .with_projection_pushdown(projection_pushdown);

        #[cfg(feature = "new_streaming")]
        {
            ldf = ldf.with_new_streaming(new_streaming);
        }

        #[cfg(feature = "cse")]
        {
            ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);
            ldf = ldf.with_comm_subexpr_elim(comm_subexpr_elim);
        }

        ldf.into()
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_optimizations(optflags.inner.into_inner()).into()
    }

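    /// Run the query and also return a DataFrame with per-node timings.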
    #[pyo3(signature = (lambda_post_opt))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<PyObject>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

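    /// Run the query on the given engine and materialize the result.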
    #[pyo3(signature = (engine, lambda_post_opt))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<PyObject>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

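    /// Run the query on the thread pool and deliver the resulting DataFrame
    /// (or the error) to `lambda` once it completes.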
    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: PyObject,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.read().clone();

            polars_core::POOL.spawn(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::with_gil(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

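    /// Stream the query result into Parquet, either as a single file or as
    /// partitioned output.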
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        target, compression, compression_level, statistics, row_group_size, data_page_size,
        cloud_options, credential_provider, retries, sink_options, metadata, field_overwrites,
    ))]
    fn sink_parquet(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
        metadata: Wrap<Option<KeyValueMetadata>>,
        field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
    ) -> PyResult<PyLazyFrame> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            key_value_metadata: metadata.0,
            field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
        };

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_parquet(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
            .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

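    /// Stream the query result into Arrow IPC, either as a single file or as
    /// partitioned output.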
    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        target, compression, compat_level, cloud_options, credential_provider, retries,
        sink_options
    ))]
    fn sink_ipc(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = IpcWriterOptions {
            compression: compression.0,
            compat_level: compat_level.0,
            ..Default::default()
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_ipc(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

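    /// Stream the query result into CSV, either as a single file or as
    /// partitioned output.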
    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
        quote_style, cloud_options, credential_provider, retries, sink_options
    ))]
    fn sink_csv(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        decimal_comma: bool,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value.unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format,
            time_format,
            datetime_format,
            float_scientific,
            float_precision,
            decimal_comma,
            separator,
            quote_char,
            null: null_value,
            line_terminator,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            include_header,
            batch_size,
            serialize_options,
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_csv(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_csv_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

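    /// Stream the query result into NDJSON, either as a single file or as
    /// partitioned output.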
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "json")]
    #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
    fn sink_json(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = JsonWriterOptions {};

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(path) => {
                    ldf.sink_json(path, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_json_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    fn filter(&self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.filter(predicate.inner).into()
    }

    fn remove(&self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.read().clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

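    /// Create rolling groups based on a time (or integer) index column.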
    fn rolling(
        &self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.read().clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.read().clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts
            .into_iter()
            .map(|ldf| ldf.ldf.into_inner())
            .collect::<Vec<_>>();
        self.ldf.read().clone().with_context(contexts).into()
    }

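    /// As-of join: match each left row with the nearest right key according
    /// to `strategy`, optionally constrained by a tolerance.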
    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(Box::new(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| {
                    let av = t.0.into_static();
                    let dtype = av.dtype();
                    Scalar::new(dtype, av)
                }),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            })))
            .suffix(suffix)
            .finish()
            .into())
    }

    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

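    /// Conform this frame to `schema`, applying per-column policies for
    /// missing columns/struct fields and for integer/float upcasts.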
    fn match_to_schema<'py>(
        &self,
        schema: Wrap<Schema>,
        missing_columns: &Bound<'py, PyAny>,
        missing_struct_fields: &Bound<'py, PyAny>,
        extra_columns: Wrap<ExtraColumnsPolicy>,
        extra_struct_fields: &Bound<'py, PyAny>,
        integer_cast: &Bound<'py, PyAny>,
        float_cast: &Bound<'py, PyAny>,
    ) -> PyResult<Self> {
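        // Each policy argument accepts either a single policy (applied to
        // every column) or a dict mapping column names to policies.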
        fn parse_missing_columns<'py>(
            schema: &Schema,
            missing_columns: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicyOrExpr::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
            }
            Ok(out)
        }
        fn parse_missing_struct_fields<'py>(
            schema: &Schema,
            missing_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicy::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `missing_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_extra_struct_fields<'py>(
            schema: &Schema,
            extra_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `extra_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_cast<'py>(
            schema: &Schema,
            cast: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<UpcastOrForbid>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = cast.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `integer_cast` / `float_cast`",
                ));
            }
            Ok(out)
        }

        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
        let integer_cast = parse_cast(&schema.0, integer_cast)?;
        let float_cast = parse_cast(&schema.0, float_cast)?;

        let per_column = (0..schema.0.len())
            .map(|i| MatchToSchemaPerColumn {
                missing_columns: missing_columns[i].clone(),
                missing_struct_fields: missing_struct_fields[i],
                extra_struct_fields: extra_struct_fields[i],
                integer_cast: integer_cast[i],
                float_cast: float_cast[i],
            })
            .collect();

        let ldf = self.ldf.read().clone();
        Ok(ldf
            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
            .into())
    }

    fn pipe_with_schema(&self, callback: PyObject) -> Self {
        let ldf = self.ldf.read().clone();
        let function = PythonObject(callback);
        ldf.pipe_with_schema(PlanCallback::new_python(function))
            .into()
    }

    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.read().clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, subset: PySelector) -> Self {
        self.ldf.read().clone().explode(subset.inner).into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<PySelector>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let subset = subset.map(|e| e.inner);
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nans(subset.map(|e| e.inner))
            .into()
    }

    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nulls(subset.map(|e| e.inner))
            .into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: PySelector,
        index: PySelector,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.inner,
            index: index.inner,
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.read().clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_row_index(name, offset).into()
    }

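    /// Map a Python function over whole materialized DataFrames, with the
    /// given optimizations toggled around the UDF.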
    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        function: PyObject,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::NEW_STREAMING, streamable);

        self.ldf
            .read()
            .clone()
            .map_python(
                function.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: PySelector) -> Self {
        self.ldf.read().clone().drop(columns.inner).into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.read().clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.read().clone().into()
    }

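    /// Resolve the schema and return it as a Python dict of name -> dtype.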
    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: PySelector) -> Self {
        self.ldf.read().clone().unnest(columns.inner).into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .read()
            .clone()
            .merge_sorted(other.ldf.into_inner(), key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

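// Parse a Python dict describing Parquet field overwrites (name, children,
// field_id, metadata, required), recursing into struct/list children.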
#[cfg(feature = "parquet")]
impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        use polars_io::parquet::write::ParquetFieldOverwrites;

        let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;

        let name = PyDictMethods::get_item(&parsed, "name")?
            .map(|v| PyResult::Ok(v.extract::<String>()?.into()))
            .transpose()?;
        let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
            PyResult::Ok(ChildFieldOverwrites::None),
            |v| {
                Ok(
                    if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
                        ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
                    } else {
                        ChildFieldOverwrites::ListLike(Box::new(
                            v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
                        ))
                    },
                )
            },
        )?;

        let field_id = PyDictMethods::get_item(&parsed, "field_id")?
            .map(|v| v.extract::<i32>())
            .transpose()?;

        let metadata = PyDictMethods::get_item(&parsed, "metadata")?
            .map(|v| v.extract::<Vec<(String, Option<String>)>>())
            .transpose()?;
        let metadata = metadata.map(|v| {
            v.into_iter()
                .map(|v| MetadataKeyValue {
                    key: v.0.into(),
                    value: v.1.map(|v| v.into()),
                })
                .collect()
        });

        let required = PyDictMethods::get_item(&parsed, "required")?
            .map(|v| v.extract::<bool>())
            .transpose()?;

        Ok(Wrap(ParquetFieldOverwrites {
            name,
            children,
            field_id,
            metadata,
            required,
        }))
    }
}