use std::collections::HashMap;
use std::num::NonZeroUsize;

use either::Either;
use polars::io::RowIndex;
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, HintIR, IR, Sorted};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags, SinkTarget};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::expr::datatype::PyDataTypeExpr;
use crate::expr::selector::PySelector;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::io::PyScanOptions;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

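/// Resolve a Python scan-source object into `ScanSources`, also returning the
/// first path (when the input was path-like) so callers can derive cloud
/// options from it.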
fn pyobject_to_first_path_and_scan_sources(
    obj: Py<PyAny>,
) -> PyResult<(Option<PlPath>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => (
            Some(path.clone()),
            ScanSources::Paths(FromIterator::from_iter([path])),
        ),
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

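/// Run a Python callback against the optimized plan. The callback receives a
/// `NodeTraverser` over the IR and expression arenas and may rewrite the plan
/// in place; the (possibly modified) arenas are swapped back in afterwards.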
fn post_opt_callback(
    lambda: &Py<PyAny>,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::attach(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

        // Keep handles on the arenas so we can restore them after the callback.
        let arenas = nt.get_arenas();

        // Hand the node traverser to the Python callback, which may replace
        // parts of the query plan through the shared arenas.
        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

        // Move the (possibly modified) arenas back into the caller's arenas;
        // the node traverser must not be used after this point.
        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
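    // Lazily scan newline-delimited JSON. Cloud options are resolved from the
    // first path when one is available.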
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
    ))]
    fn new_from_ndjson(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options =
                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );

            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }

            r = r.with_cloud_options(Some(cloud_options));
        }

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

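    // Lazily scan CSV. Mirrors the Python-side `scan_csv` arguments, including
    // an optional callback that can rename columns before the schema is fixed.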
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
    )
    )]
    fn new_from_csv(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<Py<PyAny>>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options =
                parse_cloud_options(first_path_url, cloud_options.unwrap_or_default())?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );
            r = r.with_cloud_options(Some(cloud_options));
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::attach(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "The length of the new names list should be equal to the original column length",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

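    // Lazily scan Parquet via the DSL builder; generic scan behaviour comes
    // from the unified scan arguments.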
    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        sources, schema, scan_options, parallel, low_memory, use_statistics
    ))]
    fn new_from_parquet(
        sources: Wrap<ScanSources>,
        schema: Option<Wrap<Schema>>,
        scan_options: PyScanOptions,
        parallel: Wrap<ParallelStrategy>,
        low_memory: bool,
        use_statistics: bool,
    ) -> PyResult<Self> {
        use crate::utils::to_py_err;

        let parallel = parallel.0;

        let options = ParquetOptions {
            schema: schema.map(|x| Arc::new(x.0)),
            parallel,
            low_memory,
            use_statistics,
        };

        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().map(|p| p.as_ref()))?;

        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
            .map_err(to_py_err)?
            .build()
            .into();

        Ok(lf.into())
    }

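    // Lazily scan Arrow IPC files.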
    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (sources, scan_options, file_cache_ttl))]
    fn new_from_ipc(
        sources: Wrap<ScanSources>,
        scan_options: PyScanOptions,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        let options = IpcScanOptions;

        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let mut unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().map(|p| p.as_ref()))?;

        if let Some(file_cache_ttl) = file_cache_ttl {
            unified_scan_args
                .cloud_options
                .get_or_insert_default()
                .file_cache_ttl = file_cache_ttl;
        }

        let lf = LazyFrame::scan_ipc_sources(sources, options, unified_scan_args)
            .map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

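    // Lazily scan files line by line via the `scan_lines` DSL builder
    // (`name` is used for the output column).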
    #[cfg(feature = "scan_lines")]
    #[staticmethod]
    #[pyo3(signature = (sources, scan_options, name, file_cache_ttl))]
    fn new_from_scan_lines(
        sources: Wrap<ScanSources>,
        scan_options: PyScanOptions,
        name: PyBackedStr,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let mut unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().map(|p| p.as_ref()))?;

        if let Some(file_cache_ttl) = file_cache_ttl {
            unified_scan_args
                .cloud_options
                .get_or_insert_default()
                .file_cache_ttl = file_cache_ttl;
        }

        let dsl: DslPlan = DslBuilder::scan_lines(sources, unified_scan_args, (&*name).into())
            .map_err(to_py_err)?
            .build();
        let lf: LazyFrame = dsl.into();

        Ok(lf.into())
    }

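    // Build a LazyFrame from an opaque Python dataset object via the DSL.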
    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: Py<PyAny>) -> PyResult<Self> {
        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

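    // The three constructors below build a LazyFrame from a Python scan
    // function, differing only in how the schema is provided: a pyarrow
    // schema, a Polars schema, or a callable that produces the schema.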
    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: Py<PyAny>,
        scan_fn: Py<PyAny>,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
            is_pure,
        )
        .into())
    }

    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_optimizations(optflags.inner.into_inner()).into()
    }

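    // Collect with runtime profiling; returns the result together with a
    // DataFrame of per-node timings.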
    #[pyo3(signature = (lambda_post_opt))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

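    // Collect on the given engine; `lambda_post_opt` optionally rewrites the
    // optimized plan before execution (see `post_opt_callback`).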
    #[pyo3(signature = (engine, lambda_post_opt))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

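    // Collect on a thread-pool thread and hand the result (or the error) to a
    // Python callback instead of blocking the caller.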
    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: Py<PyAny>,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.read().clone();

            polars_core::POOL.spawn(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::attach(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

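    // Sink the query to Parquet, either to a single target or partitioned
    // over a base path.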
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        target, compression, compression_level, statistics, row_group_size, data_page_size,
        cloud_options, credential_provider, retries, sink_options, metadata, field_overwrites,
    ))]
    fn sink_parquet(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
        metadata: Wrap<Option<KeyValueMetadata>>,
        field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
    ) -> PyResult<PyLazyFrame> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            key_value_metadata: metadata.0,
            field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
        };

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_parquet(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
            .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        target, compression, compat_level, cloud_options, credential_provider, retries,
        sink_options
    ))]
    fn sink_ipc(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = IpcWriterOptions {
            compression: compression.0,
            compat_level: compat_level.0,
            ..Default::default()
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_ipc(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
        quote_style, cloud_options, credential_provider, retries, sink_options
    ))]
    fn sink_csv(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        decimal_comma: bool,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value.unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format,
            time_format,
            datetime_format,
            float_scientific,
            float_precision,
            decimal_comma,
            separator,
            quote_char,
            null: null_value,
            line_terminator,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            include_header,
            batch_size,
            serialize_options,
        };

        #[cfg(feature = "cloud")]
        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(target) => {
                    ldf.sink_csv(target, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_csv_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "json")]
    #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
    fn sink_json(
        &self,
        py: Python<'_>,
        target: SinkTarget,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        sink_options: Wrap<SinkOptions>,
    ) -> PyResult<PyLazyFrame> {
        let options = JsonWriterOptions {};

        let cloud_options = match target.base_path() {
            None => None,
            Some(base_path) => {
                let cloud_options =
                    parse_cloud_options(base_path.to_str(), cloud_options.unwrap_or_default())?;
                Some(
                    cloud_options
                        .with_max_retries(retries)
                        .with_credential_provider(
                            credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_builder),
                        ),
                )
            },
        };

        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            match target {
                SinkTarget::File(path) => {
                    ldf.sink_json(path, options, cloud_options, sink_options.0)
                },
                SinkTarget::Partition(partition) => ldf.sink_json_partitioned(
                    Arc::new(partition.base_path.0),
                    partition.file_path_cb.map(PartitionTargetCallback::Python),
                    partition.variant,
                    options,
                    cloud_options,
                    sink_options.0,
                    partition.per_partition_sort_by,
                    partition.finish_callback,
                ),
            }
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[pyo3(signature = (function, maintain_order, chunk_size))]
    pub fn sink_batches(
        &self,
        py: Python<'_>,
        function: Py<PyAny>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
    ) -> PyResult<PyLazyFrame> {
        let ldf = self.ldf.read().clone();
        py.enter_polars(|| {
            ldf.sink_batches(
                PlanCallback::new_python(PythonObject(function)),
                maintain_order,
                chunk_size,
            )
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    fn filter(&self, predicate: PyExpr) -> Self {
        self.ldf.read().clone().filter(predicate.inner).into()
    }

    fn remove(&self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.read().clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

    fn rolling(
        &self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.read().clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.read().clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts
            .into_iter()
            .map(|ldf| ldf.ldf.into_inner())
            .collect::<Vec<_>>();
        self.ldf.read().clone().with_context(contexts).into()
    }

    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(Box::new(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| {
                    let av = t.0.into_static();
                    let dtype = av.dtype();
                    Scalar::new(dtype, av)
                }),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            })))
            .suffix(suffix)
            .finish()
            .into())
    }

    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

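    // Align this LazyFrame to a target schema. Each per-column policy may be
    // given either as a single policy applied to all columns or as a dict
    // keyed by column name.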
    fn match_to_schema<'py>(
        &self,
        schema: Wrap<Schema>,
        missing_columns: &Bound<'py, PyAny>,
        missing_struct_fields: &Bound<'py, PyAny>,
        extra_columns: Wrap<ExtraColumnsPolicy>,
        extra_struct_fields: &Bound<'py, PyAny>,
        integer_cast: &Bound<'py, PyAny>,
        float_cast: &Bound<'py, PyAny>,
    ) -> PyResult<Self> {
        fn parse_missing_columns<'py>(
            schema: &Schema,
            missing_columns: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicyOrExpr::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
            }
            Ok(out)
        }
        fn parse_missing_struct_fields<'py>(
            schema: &Schema,
            missing_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicy::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `missing_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_extra_struct_fields<'py>(
            schema: &Schema,
            extra_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `extra_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_cast<'py>(
            schema: &Schema,
            cast: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<UpcastOrForbid>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = cast.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `integer_cast` / `float_cast`",
                ));
            }
            Ok(out)
        }

        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
        let integer_cast = parse_cast(&schema.0, integer_cast)?;
        let float_cast = parse_cast(&schema.0, float_cast)?;

        let per_column = (0..schema.0.len())
            .map(|i| MatchToSchemaPerColumn {
                missing_columns: missing_columns[i].clone(),
                missing_struct_fields: missing_struct_fields[i],
                extra_struct_fields: extra_struct_fields[i],
                integer_cast: integer_cast[i],
                float_cast: float_cast[i],
            })
            .collect();

        let ldf = self.ldf.read().clone();
        Ok(ldf
            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
            .into())
    }

    fn pipe_with_schema(&self, callback: Py<PyAny>) -> Self {
        let ldf = self.ldf.read().clone();
        let function = PythonObject(callback);
        ldf.pipe_with_schema(PlanCallback::new_python(function))
            .into()
    }

    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.read().clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, subset: PySelector) -> Self {
        self.ldf.read().clone().explode(subset.inner).into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<PySelector>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let subset = subset.map(|e| e.inner);
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nans(subset.map(|e| e.inner))
            .into()
    }

    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nulls(subset.map(|e| e.inner))
            .into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: PySelector,
        index: PySelector,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.inner,
            index: index.inner,
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.read().clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_row_index(name, offset).into()
    }

    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        function: Py<PyAny>,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::NEW_STREAMING, streamable);

        self.ldf
            .read()
            .clone()
            .map_python(
                function.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: PySelector) -> Self {
        self.ldf.read().clone().drop(columns.inner).into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.read().clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.read().clone().into()
    }

    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: PySelector, separator: Option<&str>) -> Self {
        self.ldf
            .read()
            .clone()
            .unnest(columns.inner, separator.map(PlSmallStr::from_str))
            .into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .read()
            .clone()
            .merge_sorted(other.ldf.into_inner(), key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }

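    // Attach a sortedness hint to the plan. `descending`/`nulls_last` must
    // either match `columns` in length or hold a single value that is
    // broadcast to all columns.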
    fn hint_sorted(
        &self,
        columns: Vec<String>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
    ) -> PyResult<Self> {
        if columns.len() != descending.len() && descending.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same number of `columns` as `descending` values.",
            ));
        }
        if columns.len() != nulls_last.len() && nulls_last.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same number of `columns` as `nulls_last` values.",
            ));
        }

        let mut sorted = columns
            .iter()
            .map(|c| Sorted {
                column: PlSmallStr::from_str(c.as_str()),
                descending: false,
                nulls_last: false,
            })
            .collect::<Vec<_>>();

        if !columns.is_empty() {
            if descending.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(descending)
                    .for_each(|(s, d)| s.descending = d);
            } else if descending[0] {
                sorted.iter_mut().for_each(|s| s.descending = true);
            }

            if nulls_last.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(nulls_last)
                    .for_each(|(s, d)| s.nulls_last = d);
            } else if nulls_last[0] {
                sorted.iter_mut().for_each(|s| s.nulls_last = true);
            }
        }

        let out = self
            .ldf
            .read()
            .clone()
            .hint(HintIR::Sorted(sorted.into()))
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

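// Parse a Python dict of Parquet field overwrites (keys: "name", "children",
// "field_id", "metadata", "required") into `ParquetFieldOverwrites`.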
#[cfg(feature = "parquet")]
impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        use polars_io::parquet::write::ParquetFieldOverwrites;

        let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;

        let name = PyDictMethods::get_item(&parsed, "name")?
            .map(|v| PyResult::Ok(v.extract::<String>()?.into()))
            .transpose()?;
        let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
            PyResult::Ok(ChildFieldOverwrites::None),
            |v| {
                Ok(
                    if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
                        ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
                    } else {
                        ChildFieldOverwrites::ListLike(Box::new(
                            v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
                        ))
                    },
                )
            },
        )?;

        let field_id = PyDictMethods::get_item(&parsed, "field_id")?
            .map(|v| v.extract::<i32>())
            .transpose()?;

        let metadata = PyDictMethods::get_item(&parsed, "metadata")?
            .map(|v| v.extract::<Vec<(String, Option<String>)>>())
            .transpose()?;
        let metadata = metadata.map(|v| {
            v.into_iter()
                .map(|v| MetadataKeyValue {
                    key: v.0.into(),
                    value: v.1.map(|v| v.into()),
                })
                .collect()
        });

        let required = PyDictMethods::get_item(&parsed, "required")?
            .map(|v| v.extract::<bool>())
            .transpose()?;

        Ok(Wrap(ParquetFieldOverwrites {
            name,
            children,
            field_id,
            metadata,
            required,
        }))
    }
}