1use polars::lazy::dsl;
2use polars::prelude::*;
3use polars_plan::prelude::UnionArgs;
4use pyo3::exceptions::{PyTypeError, PyValueError};
5use pyo3::prelude::*;
6use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
7
8use crate::conversion::any_value::py_object_to_any_value;
9use crate::conversion::{get_lf, Wrap};
10use crate::error::PyPolarsErr;
11use crate::expr::ToExprs;
12use crate::map::lazy::binary_lambda;
13use crate::prelude::vec_extract_wrapped;
14use crate::{map, PyDataFrame, PyExpr, PyLazyFrame, PySeries};
15
// Shadows each listed `Option<PyExpr>` binding with a plain `Expr`: the wrapped
// expression when present, otherwise a literal `0`. After
// `set_unwrapped_or_0!(a, b)` the names `a` and `b` refer to `Expr` values.
macro_rules! set_unwrapped_or_0 {
    ($($var:ident),+ $(,)?) => {
        $(
            let $var = match $var {
                Some(expr) => expr.inner,
                None => dsl::lit(0),
            };
        )+
    };
}
21
22#[pyfunction]
23pub fn rolling_corr(
24 x: PyExpr,
25 y: PyExpr,
26 window_size: IdxSize,
27 min_periods: IdxSize,
28 ddof: u8,
29) -> PyExpr {
30 dsl::rolling_corr(
31 x.inner,
32 y.inner,
33 RollingCovOptions {
34 min_periods,
35 window_size,
36 ddof,
37 },
38 )
39 .into()
40}
41
42#[pyfunction]
43pub fn rolling_cov(
44 x: PyExpr,
45 y: PyExpr,
46 window_size: IdxSize,
47 min_periods: IdxSize,
48 ddof: u8,
49) -> PyExpr {
50 dsl::rolling_cov(
51 x.inner,
52 y.inner,
53 RollingCovOptions {
54 min_periods,
55 window_size,
56 ddof,
57 },
58 )
59 .into()
60}
61
62#[pyfunction]
63pub fn arg_sort_by(
64 by: Vec<PyExpr>,
65 descending: Vec<bool>,
66 nulls_last: Vec<bool>,
67 multithreaded: bool,
68 maintain_order: bool,
69) -> PyExpr {
70 let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();
71 dsl::arg_sort_by(
72 by,
73 SortMultipleOptions {
74 descending,
75 nulls_last,
76 multithreaded,
77 maintain_order,
78 limit: None,
79 },
80 )
81 .into()
82}
83#[pyfunction]
84pub fn arg_where(condition: PyExpr) -> PyExpr {
85 dsl::arg_where(condition.inner).into()
86}
87
88#[pyfunction]
89pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {
90 let exprs = exprs.to_exprs();
91 if exprs.is_empty() {
92 return Err(PyValueError::new_err(
93 "expected at least 1 expression in 'as_struct'",
94 ));
95 }
96 Ok(dsl::as_struct(exprs).into())
97}
98
99#[pyfunction]
100pub fn field(names: Vec<String>) -> PyExpr {
101 dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()
102}
103
104#[pyfunction]
105pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {
106 let exprs = exprs.to_exprs();
107 dsl::coalesce(&exprs).into()
108}
109
110#[pyfunction]
111pub fn col(name: &str) -> PyExpr {
112 dsl::col(name).into()
113}
114
115#[pyfunction]
116pub fn collect_all(lfs: Vec<PyLazyFrame>, py: Python) -> PyResult<Vec<PyDataFrame>> {
117 use polars_core::utils::rayon::prelude::*;
118
119 let out = py.allow_threads(|| {
120 polars_core::POOL.install(|| {
121 lfs.par_iter()
122 .map(|lf| {
123 let df = lf.ldf.clone().collect()?;
124 Ok(PyDataFrame::new(df))
125 })
126 .collect::<polars_core::error::PolarsResult<Vec<_>>>()
127 .map_err(PyPolarsErr::from)
128 })
129 });
130
131 Ok(out?)
132}
133
134#[pyfunction]
135pub fn collect_all_with_callback(lfs: Vec<PyLazyFrame>, lambda: PyObject) {
136 use polars_core::utils::rayon::prelude::*;
137
138 polars_core::POOL.spawn(move || {
139 let result = lfs
140 .par_iter()
141 .map(|lf| {
142 let df = lf.ldf.clone().collect()?;
143 Ok(PyDataFrame::new(df))
144 })
145 .collect::<polars_core::error::PolarsResult<Vec<_>>>()
146 .map_err(PyPolarsErr::from);
147
148 Python::with_gil(|py| match result {
149 Ok(dfs) => {
150 lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
151 },
152 Err(err) => {
153 lambda
154 .call1(py, (PyErr::from(err),))
155 .map_err(|err| err.restore(py))
156 .ok();
157 },
158 })
159 })
160}
161
162#[pyfunction]
163pub fn cols(names: Vec<String>) -> PyExpr {
164 dsl::cols(names).into()
165}
166
167#[pyfunction]
168pub fn concat_lf(
169 seq: &Bound<'_, PyAny>,
170 rechunk: bool,
171 parallel: bool,
172 to_supertypes: bool,
173) -> PyResult<PyLazyFrame> {
174 let len = seq.len()?;
175 let mut lfs = Vec::with_capacity(len);
176
177 for res in seq.try_iter()? {
178 let item = res?;
179 let lf = get_lf(&item)?;
180 lfs.push(lf);
181 }
182
183 let lf = dsl::concat(
184 lfs,
185 UnionArgs {
186 rechunk,
187 parallel,
188 to_supertypes,
189 ..Default::default()
190 },
191 )
192 .map_err(PyPolarsErr::from)?;
193 Ok(lf.into())
194}
195
196#[pyfunction]
197pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
198 let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
199 let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;
200 Ok(expr.into())
201}
202
203#[pyfunction]
204pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {
205 let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
206 let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;
207 Ok(expr.into())
208}
209
210#[pyfunction]
211pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
212 let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
213 dsl::concat_str(s, separator, ignore_nulls).into()
214}
215
216#[pyfunction]
217pub fn len() -> PyExpr {
218 dsl::len().into()
219}
220
221#[pyfunction]
222pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {
223 dsl::cov(a.inner, b.inner, ddof).into()
224}
225
// Two-argument arctangent of `y` and `x`; only compiled with the
// `trigonometry` feature.
#[cfg(feature = "trigonometry")]
#[pyfunction]
pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {
    let angle = y.inner.arctan2(x.inner);
    PyExpr::from(angle)
}
231
232#[pyfunction]
233pub fn cum_fold(acc: PyExpr, lambda: PyObject, exprs: Vec<PyExpr>, include_init: bool) -> PyExpr {
234 let exprs = exprs.to_exprs();
235
236 let func = move |a: Column, b: Column| {
237 binary_lambda(
238 &lambda,
239 a.take_materialized_series(),
240 b.take_materialized_series(),
241 )
242 .map(|v| v.map(Column::from))
243 };
244 dsl::cum_fold_exprs(acc.inner, func, exprs, include_init).into()
245}
246
247#[pyfunction]
248pub fn cum_reduce(lambda: PyObject, exprs: Vec<PyExpr>) -> PyExpr {
249 let exprs = exprs.to_exprs();
250
251 let func = move |a: Column, b: Column| {
252 binary_lambda(
253 &lambda,
254 a.take_materialized_series(),
255 b.take_materialized_series(),
256 )
257 .map(|v| v.map(Column::from))
258 };
259 dsl::cum_reduce_exprs(func, exprs).into()
260}
261
262#[pyfunction]
263#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=None, ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]
264pub fn datetime(
265 year: PyExpr,
266 month: PyExpr,
267 day: PyExpr,
268 hour: Option<PyExpr>,
269 minute: Option<PyExpr>,
270 second: Option<PyExpr>,
271 microsecond: Option<PyExpr>,
272 time_unit: Wrap<TimeUnit>,
273 time_zone: Option<Wrap<TimeZone>>,
274 ambiguous: PyExpr,
275) -> PyExpr {
276 let year = year.inner;
277 let month = month.inner;
278 let day = day.inner;
279 set_unwrapped_or_0!(hour, minute, second, microsecond);
280 let ambiguous = ambiguous.inner;
281 let time_unit = time_unit.0;
282 let time_zone = time_zone.map(|x| x.0);
283 let args = DatetimeArgs {
284 year,
285 month,
286 day,
287 hour,
288 minute,
289 second,
290 microsecond,
291 time_unit,
292 time_zone,
293 ambiguous,
294 };
295 dsl::datetime(args).into()
296}
297
298#[pyfunction]
299pub fn concat_lf_diagonal(
300 lfs: &Bound<'_, PyAny>,
301 rechunk: bool,
302 parallel: bool,
303 to_supertypes: bool,
304) -> PyResult<PyLazyFrame> {
305 let iter = lfs.try_iter()?;
306
307 let lfs = iter
308 .map(|item| {
309 let item = item?;
310 get_lf(&item)
311 })
312 .collect::<PyResult<Vec<_>>>()?;
313
314 let lf = dsl::functions::concat_lf_diagonal(
315 lfs,
316 UnionArgs {
317 rechunk,
318 parallel,
319 to_supertypes,
320 ..Default::default()
321 },
322 )
323 .map_err(PyPolarsErr::from)?;
324 Ok(lf.into())
325}
326
327#[pyfunction]
328pub fn concat_lf_horizontal(lfs: &Bound<'_, PyAny>, parallel: bool) -> PyResult<PyLazyFrame> {
329 let iter = lfs.try_iter()?;
330
331 let lfs = iter
332 .map(|item| {
333 let item = item?;
334 get_lf(&item)
335 })
336 .collect::<PyResult<Vec<_>>>()?;
337
338 let args = UnionArgs {
339 rechunk: false, parallel,
341 to_supertypes: false,
342 ..Default::default()
343 };
344 let lf = dsl::functions::concat_lf_horizontal(lfs, args).map_err(PyPolarsErr::from)?;
345 Ok(lf.into())
346}
347
348#[pyfunction]
349pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {
350 let e = e.to_exprs();
351 let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;
352 Ok(e.into())
353}
354
355#[pyfunction]
356pub fn dtype_cols(dtypes: Vec<Wrap<DataType>>) -> PyResult<PyExpr> {
357 let dtypes = vec_extract_wrapped(dtypes);
358 Ok(dsl::dtype_cols(dtypes).into())
359}
360
361#[pyfunction]
362pub fn index_cols(indices: Vec<i64>) -> PyExpr {
363 if indices.len() == 1 {
364 dsl::nth(indices[0])
365 } else {
366 dsl::index_cols(indices)
367 }
368 .into()
369}
370
371#[pyfunction]
372#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]
373pub fn duration(
374 weeks: Option<PyExpr>,
375 days: Option<PyExpr>,
376 hours: Option<PyExpr>,
377 minutes: Option<PyExpr>,
378 seconds: Option<PyExpr>,
379 milliseconds: Option<PyExpr>,
380 microseconds: Option<PyExpr>,
381 nanoseconds: Option<PyExpr>,
382 time_unit: Wrap<TimeUnit>,
383) -> PyExpr {
384 set_unwrapped_or_0!(
385 weeks,
386 days,
387 hours,
388 minutes,
389 seconds,
390 milliseconds,
391 microseconds,
392 nanoseconds,
393 );
394 let args = DurationArgs {
395 weeks,
396 days,
397 hours,
398 minutes,
399 seconds,
400 milliseconds,
401 microseconds,
402 nanoseconds,
403 time_unit: time_unit.0,
404 };
405 dsl::duration(args).into()
406}
407
408#[pyfunction]
409pub fn first() -> PyExpr {
410 dsl::first().into()
411}
412
413#[pyfunction]
414pub fn fold(acc: PyExpr, lambda: PyObject, exprs: Vec<PyExpr>) -> PyExpr {
415 let exprs = exprs.to_exprs();
416
417 let func = move |a: Column, b: Column| {
418 binary_lambda(
419 &lambda,
420 a.take_materialized_series(),
421 b.take_materialized_series(),
422 )
423 .map(|v| v.map(Column::from))
424 };
425 dsl::fold_exprs(acc.inner, func, exprs).into()
426}
427
428#[pyfunction]
429pub fn last() -> PyExpr {
430 dsl::last().into()
431}
432
433#[pyfunction]
434pub fn nth(n: i64) -> PyExpr {
435 dsl::nth(n).into()
436}
437
438#[pyfunction]
439pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {
440 let py = value.py();
441 if value.is_instance_of::<PyBool>() {
442 let val = value.extract::<bool>()?;
443 Ok(dsl::lit(val).into())
444 } else if let Ok(int) = value.downcast::<PyInt>() {
445 let v = int
446 .extract::<i128>()
447 .map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))
448 .map_err(PyPolarsErr::from)?;
449 Ok(Expr::Literal(LiteralValue::Int(v)).into())
450 } else if let Ok(float) = value.downcast::<PyFloat>() {
451 let val = float.extract::<f64>()?;
452 Ok(Expr::Literal(LiteralValue::Float(val)).into())
453 } else if let Ok(pystr) = value.downcast::<PyString>() {
454 Ok(dsl::lit(pystr.to_string()).into())
455 } else if let Ok(series) = value.extract::<PySeries>() {
456 let s = series.series;
457 if is_scalar {
458 let av = s
459 .get(0)
460 .map_err(|_| PyValueError::new_err("expected at least 1 value"))?;
461 let av = av.into_static();
462 Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())
463 } else {
464 Ok(dsl::lit(s).into())
465 }
466 } else if value.is_none() {
467 Ok(dsl::lit(Null {}).into())
468 } else if let Ok(value) = value.downcast::<PyBytes>() {
469 Ok(dsl::lit(value.as_bytes()).into())
470 } else {
471 let av = py_object_to_any_value(value, true, allow_object).map_err(|_| {
472 PyTypeError::new_err(
473 format!(
474 "cannot create expression literal for value of type {}.\
475 \n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",
476 value.get_type().qualname().map(|s|s.to_string()).unwrap_or("unknown".to_owned()),
477 )
478 )
479 })?;
480 match av {
481 #[cfg(feature = "object")]
482 AnyValue::ObjectOwned(_) => {
483 let s = PySeries::new_object(py, "", vec![value.extract()?], false).series;
484 Ok(dsl::lit(s).into())
485 },
486 _ => Ok(Expr::Literal(LiteralValue::from(av)).into()),
487 }
488 }
489}
490
491#[pyfunction]
492#[pyo3(signature = (pyexpr, lambda, output_type, map_groups, returns_scalar))]
493pub fn map_mul(
494 py: Python,
495 pyexpr: Vec<PyExpr>,
496 lambda: PyObject,
497 output_type: Option<Wrap<DataType>>,
498 map_groups: bool,
499 returns_scalar: bool,
500) -> PyExpr {
501 map::lazy::map_mul(&pyexpr, py, lambda, output_type, map_groups, returns_scalar)
502}
503
504#[pyfunction]
505pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {
506 dsl::pearson_corr(a.inner, b.inner).into()
507}
508
509#[pyfunction]
510pub fn reduce(lambda: PyObject, exprs: Vec<PyExpr>) -> PyExpr {
511 let exprs = exprs.to_exprs();
512
513 let func = move |a: Column, b: Column| {
514 binary_lambda(
515 &lambda,
516 a.take_materialized_series(),
517 b.take_materialized_series(),
518 )
519 .map(|v| v.map(Column::from))
520 };
521 dsl::reduce_exprs(func, exprs).into()
522}
523
524#[pyfunction]
525#[pyo3(signature = (value, n, dtype=None))]
526pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyResult<PyExpr> {
527 let mut value = value.inner;
528 let n = n.inner;
529
530 if let Some(dtype) = dtype {
531 value = value.cast(dtype.0);
532 }
533
534 Ok(dsl::repeat(value, n).into())
535}
536
537#[pyfunction]
538pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {
539 #[cfg(feature = "propagate_nans")]
540 {
541 dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()
542 }
543 #[cfg(not(feature = "propagate_nans"))]
544 {
545 panic!("activate 'propagate_nans'")
546 }
547}
548
// Parses `sql` into a Polars expression; parse errors are raised as Python
// exceptions. Only compiled with the `sql` feature.
#[cfg(feature = "sql")]
#[pyfunction]
pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {
    match polars::sql::sql_expr(sql) {
        Ok(expr) => Ok(PyExpr::from(expr)),
        Err(err) => Err(PyPolarsErr::from(err).into()),
    }
}