1use polars::lazy::dsl;
2use polars::prelude::*;
3use polars_plan::plans::DynLiteralValue;
4use polars_plan::prelude::UnionArgs;
5use polars_utils::python_function::PythonObject;
6use pyo3::exceptions::{PyTypeError, PyValueError};
7use pyo3::prelude::*;
8use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
9
10use crate::conversion::any_value::py_object_to_any_value;
11use crate::conversion::{Wrap, get_lf};
12use crate::error::PyPolarsErr;
13use crate::expr::ToExprs;
14use crate::expr::datatype::PyDataTypeExpr;
15use crate::lazyframe::PyOptFlags;
16use crate::utils::EnterPolarsExt;
17use crate::{PyDataFrame, PyExpr, PyLazyFrame, PySeries, map};
18
/// Rebinds each named `Option` wrapper to its `inner` value, substituting the
/// expression `dsl::lit(0)` for every argument that is `None`.
///
/// The fallback literal is produced lazily, so it is only constructed for
/// arguments that were actually omitted. A trailing comma is accepted.
macro_rules! set_unwrapped_or_0 {
    ($($var:ident),+ $(,)?) => {
        $(let $var = $var.map(|e| e.inner).unwrap_or_else(|| dsl::lit(0));)+
    };
}
24
25#[pyfunction]
26pub fn rolling_corr(
27 x: PyExpr,
28 y: PyExpr,
29 window_size: IdxSize,
30 min_periods: IdxSize,
31 ddof: u8,
32) -> PyExpr {
33 dsl::rolling_corr(
34 x.inner,
35 y.inner,
36 RollingCovOptions {
37 min_periods,
38 window_size,
39 ddof,
40 },
41 )
42 .into()
43}
44
45#[pyfunction]
46pub fn rolling_cov(
47 x: PyExpr,
48 y: PyExpr,
49 window_size: IdxSize,
50 min_periods: IdxSize,
51 ddof: u8,
52) -> PyExpr {
53 dsl::rolling_cov(
54 x.inner,
55 y.inner,
56 RollingCovOptions {
57 min_periods,
58 window_size,
59 ddof,
60 },
61 )
62 .into()
63}
64
65#[pyfunction]
66pub fn arg_sort_by(
67 by: Vec<PyExpr>,
68 descending: Vec<bool>,
69 nulls_last: Vec<bool>,
70 multithreaded: bool,
71 maintain_order: bool,
72) -> PyExpr {
73 let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();
74 dsl::arg_sort_by(
75 by,
76 SortMultipleOptions {
77 descending,
78 nulls_last,
79 multithreaded,
80 maintain_order,
81 limit: None,
82 },
83 )
84 .into()
85}
86#[pyfunction]
87pub fn arg_where(condition: PyExpr) -> PyExpr {
88 dsl::arg_where(condition.inner).into()
89}
90
91#[pyfunction]
92pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {
93 let exprs = exprs.to_exprs();
94 if exprs.is_empty() {
95 return Err(PyValueError::new_err(
96 "expected at least 1 expression in 'as_struct'",
97 ));
98 }
99 Ok(dsl::as_struct(exprs).into())
100}
101
102#[pyfunction]
103pub fn field(names: Vec<String>) -> PyExpr {
104 dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()
105}
106
107#[pyfunction]
108pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {
109 let exprs = exprs.to_exprs();
110 dsl::coalesce(&exprs).into()
111}
112
113#[pyfunction]
114pub fn col(name: &str) -> PyExpr {
115 dsl::col(name).into()
116}
117
118fn lfs_to_plans(lfs: Vec<PyLazyFrame>) -> Vec<DslPlan> {
119 lfs.into_iter().map(|lf| lf.ldf.logical_plan).collect()
120}
121
122#[pyfunction]
123pub fn collect_all(
124 lfs: Vec<PyLazyFrame>,
125 engine: Wrap<Engine>,
126 optflags: PyOptFlags,
127 py: Python<'_>,
128) -> PyResult<Vec<PyDataFrame>> {
129 let plans = lfs_to_plans(lfs);
130 let dfs =
131 py.enter_polars(|| LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner))?;
132 Ok(dfs.into_iter().map(Into::into).collect())
133}
134
135#[pyfunction]
136pub fn explain_all(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags, py: Python) -> PyResult<String> {
137 let plans = lfs_to_plans(lfs);
138 let explained = py.enter_polars(|| LazyFrame::explain_all(plans, optflags.inner))?;
139 Ok(explained)
140}
141
142#[pyfunction]
143pub fn collect_all_with_callback(
144 lfs: Vec<PyLazyFrame>,
145 engine: Wrap<Engine>,
146 optflags: PyOptFlags,
147 lambda: PyObject,
148 py: Python<'_>,
149) {
150 let plans = lfs.into_iter().map(|lf| lf.ldf.logical_plan).collect();
151 let result = py
152 .enter_polars(|| LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner))
153 .map(|dfs| {
154 dfs.into_iter()
155 .map(Into::into)
156 .collect::<Vec<PyDataFrame>>()
157 });
158
159 Python::with_gil(|py| match result {
160 Ok(dfs) => {
161 lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
162 },
163 Err(err) => {
164 lambda
165 .call1(py, (PyErr::from(err),))
166 .map_err(|err| err.restore(py))
167 .ok();
168 },
169 })
170}
171
172#[pyfunction]
173pub fn concat_lf(
174 seq: &Bound<'_, PyAny>,
175 rechunk: bool,
176 parallel: bool,
177 to_supertypes: bool,
178) -> PyResult<PyLazyFrame> {
179 let len = seq.len()?;
180 let mut lfs = Vec::with_capacity(len);
181
182 for res in seq.try_iter()? {
183 let item = res?;
184 let lf = get_lf(&item)?;
185 lfs.push(lf);
186 }
187
188 let lf = dsl::concat(
189 lfs,
190 UnionArgs {
191 rechunk,
192 parallel,
193 to_supertypes,
194 ..Default::default()
195 },
196 )
197 .map_err(PyPolarsErr::from)?;
198 Ok(lf.into())
199}
200
201#[pyfunction]
202pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
203 let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
204 let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;
205 Ok(expr.into())
206}
207
208#[pyfunction]
209pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {
210 let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
211 let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;
212 Ok(expr.into())
213}
214
215#[pyfunction]
216pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
217 let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
218 dsl::concat_str(s, separator, ignore_nulls).into()
219}
220
221#[pyfunction]
222pub fn len() -> PyExpr {
223 dsl::len().into()
224}
225
226#[pyfunction]
227pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {
228 dsl::cov(a.inner, b.inner, ddof).into()
229}
230
/// Calls `arctan2` on `y` with `x` as its argument.
///
/// Only compiled when the `trigonometry` feature is enabled.
#[pyfunction]
#[cfg(feature = "trigonometry")]
pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {
    let (y_expr, x_expr) = (y.inner, x.inner);
    y_expr.arctan2(x_expr).into()
}
236
237#[pyfunction]
238pub fn cum_fold(
239 acc: PyExpr,
240 lambda: PyObject,
241 exprs: Vec<PyExpr>,
242 returns_scalar: bool,
243 return_dtype: Option<PyDataTypeExpr>,
244 include_init: bool,
245) -> PyExpr {
246 let exprs = exprs.to_exprs();
247 let func = PlanCallback::new_python(PythonObject(lambda));
248 dsl::cum_fold_exprs(
249 acc.inner,
250 func,
251 exprs,
252 returns_scalar,
253 return_dtype.map(|v| v.inner),
254 include_init,
255 )
256 .into()
257}
258
259#[pyfunction]
260pub fn cum_reduce(
261 lambda: PyObject,
262 exprs: Vec<PyExpr>,
263 returns_scalar: bool,
264 return_dtype: Option<PyDataTypeExpr>,
265) -> PyExpr {
266 let exprs = exprs.to_exprs();
267
268 let func = PlanCallback::new_python(PythonObject(lambda));
269 dsl::cum_reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
270}
271
272#[pyfunction]
273#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=Wrap(None), ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]
274pub fn datetime(
275 year: PyExpr,
276 month: PyExpr,
277 day: PyExpr,
278 hour: Option<PyExpr>,
279 minute: Option<PyExpr>,
280 second: Option<PyExpr>,
281 microsecond: Option<PyExpr>,
282 time_unit: Wrap<TimeUnit>,
283 time_zone: Wrap<Option<TimeZone>>,
284 ambiguous: PyExpr,
285) -> PyExpr {
286 let year = year.inner;
287 let month = month.inner;
288 let day = day.inner;
289 set_unwrapped_or_0!(hour, minute, second, microsecond);
290 let ambiguous = ambiguous.inner;
291 let time_unit = time_unit.0;
292 let time_zone = time_zone.0;
293 let args = DatetimeArgs {
294 year,
295 month,
296 day,
297 hour,
298 minute,
299 second,
300 microsecond,
301 time_unit,
302 time_zone,
303 ambiguous,
304 };
305 dsl::datetime(args).into()
306}
307
308#[pyfunction]
309pub fn concat_lf_diagonal(
310 lfs: &Bound<'_, PyAny>,
311 rechunk: bool,
312 parallel: bool,
313 to_supertypes: bool,
314) -> PyResult<PyLazyFrame> {
315 let iter = lfs.try_iter()?;
316
317 let lfs = iter
318 .map(|item| {
319 let item = item?;
320 get_lf(&item)
321 })
322 .collect::<PyResult<Vec<_>>>()?;
323
324 let lf = dsl::functions::concat_lf_diagonal(
325 lfs,
326 UnionArgs {
327 rechunk,
328 parallel,
329 to_supertypes,
330 ..Default::default()
331 },
332 )
333 .map_err(PyPolarsErr::from)?;
334 Ok(lf.into())
335}
336
337#[pyfunction]
338pub fn concat_lf_horizontal(lfs: &Bound<'_, PyAny>, parallel: bool) -> PyResult<PyLazyFrame> {
339 let iter = lfs.try_iter()?;
340
341 let lfs = iter
342 .map(|item| {
343 let item = item?;
344 get_lf(&item)
345 })
346 .collect::<PyResult<Vec<_>>>()?;
347
348 let args = UnionArgs {
349 rechunk: false, parallel,
351 to_supertypes: false,
352 ..Default::default()
353 };
354 let lf = dsl::functions::concat_lf_horizontal(lfs, args).map_err(PyPolarsErr::from)?;
355 Ok(lf.into())
356}
357
358#[pyfunction]
359pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {
360 let e = e.to_exprs();
361 let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;
362 Ok(e.into())
363}
364
365#[pyfunction]
366#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]
367pub fn duration(
368 weeks: Option<PyExpr>,
369 days: Option<PyExpr>,
370 hours: Option<PyExpr>,
371 minutes: Option<PyExpr>,
372 seconds: Option<PyExpr>,
373 milliseconds: Option<PyExpr>,
374 microseconds: Option<PyExpr>,
375 nanoseconds: Option<PyExpr>,
376 time_unit: Wrap<TimeUnit>,
377) -> PyExpr {
378 set_unwrapped_or_0!(
379 weeks,
380 days,
381 hours,
382 minutes,
383 seconds,
384 milliseconds,
385 microseconds,
386 nanoseconds,
387 );
388 let args = DurationArgs {
389 weeks,
390 days,
391 hours,
392 minutes,
393 seconds,
394 milliseconds,
395 microseconds,
396 nanoseconds,
397 time_unit: time_unit.0,
398 };
399 dsl::duration(args).into()
400}
401
402#[pyfunction]
403pub fn fold(
404 acc: PyExpr,
405 lambda: PyObject,
406 exprs: Vec<PyExpr>,
407 returns_scalar: bool,
408 return_dtype: Option<PyDataTypeExpr>,
409) -> PyExpr {
410 let exprs = exprs.to_exprs();
411 let func = PlanCallback::new_python(PythonObject(lambda));
412 dsl::fold_exprs(
413 acc.inner,
414 func,
415 exprs,
416 returns_scalar,
417 return_dtype.map(|w| w.inner),
418 )
419 .into()
420}
421
422#[pyfunction]
423pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {
424 let py = value.py();
425 if value.is_instance_of::<PyBool>() {
426 let val = value.extract::<bool>()?;
427 Ok(dsl::lit(val).into())
428 } else if let Ok(int) = value.downcast::<PyInt>() {
429 let v = int
430 .extract::<i128>()
431 .map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))
432 .map_err(PyPolarsErr::from)?;
433 Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Int(v))).into())
434 } else if let Ok(float) = value.downcast::<PyFloat>() {
435 let val = float.extract::<f64>()?;
436 Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Float(val))).into())
437 } else if let Ok(pystr) = value.downcast::<PyString>() {
438 Ok(dsl::lit(pystr.to_string()).into())
439 } else if let Ok(series) = value.extract::<PySeries>() {
440 let s = series.series;
441 if is_scalar {
442 let av = s
443 .get(0)
444 .map_err(|_| PyValueError::new_err("expected at least 1 value"))?;
445 let av = av.into_static();
446 Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())
447 } else {
448 Ok(dsl::lit(s).into())
449 }
450 } else if value.is_none() {
451 Ok(dsl::lit(Null {}).into())
452 } else if let Ok(value) = value.downcast::<PyBytes>() {
453 Ok(dsl::lit(value.as_bytes()).into())
454 } else {
455 let av = py_object_to_any_value(value, true, allow_object).map_err(|_| {
456 PyTypeError::new_err(
457 format!(
458 "cannot create expression literal for value of type {}.\
459 \n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",
460 value.get_type().qualname().map(|s|s.to_string()).unwrap_or("unknown".to_owned()),
461 )
462 )
463 })?;
464 match av {
465 #[cfg(feature = "object")]
466 AnyValue::ObjectOwned(_) => {
467 let s = PySeries::new_object(py, "", vec![value.extract()?], false).series;
468 Ok(dsl::lit(s).into())
469 },
470 _ => Ok(Expr::Literal(LiteralValue::from(av)).into()),
471 }
472 }
473}
474
475#[pyfunction]
476#[pyo3(signature = (pyexpr, lambda, output_type, map_groups, returns_scalar))]
477pub fn map_mul(
478 py: Python<'_>,
479 pyexpr: Vec<PyExpr>,
480 lambda: PyObject,
481 output_type: Option<Wrap<DataType>>,
482 map_groups: bool,
483 returns_scalar: bool,
484) -> PyExpr {
485 map::lazy::map_mul(&pyexpr, py, lambda, output_type, map_groups, returns_scalar)
486}
487
488#[pyfunction]
489pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {
490 dsl::pearson_corr(a.inner, b.inner).into()
491}
492
493#[pyfunction]
494pub fn reduce(
495 lambda: PyObject,
496 exprs: Vec<PyExpr>,
497 returns_scalar: bool,
498 return_dtype: Option<PyDataTypeExpr>,
499) -> PyExpr {
500 let exprs = exprs.to_exprs();
501 let func = PlanCallback::new_python(PythonObject(lambda));
502 dsl::reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
503}
504
505#[pyfunction]
506#[pyo3(signature = (value, n, dtype=None))]
507pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyExpr {
508 let mut value = value.inner;
509 let n = n.inner;
510
511 if let Some(dtype) = dtype {
512 value = value.cast(dtype.0);
513 }
514
515 dsl::repeat(value, n).into()
516}
517
518#[pyfunction]
519pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {
520 #[cfg(feature = "propagate_nans")]
521 {
522 dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()
523 }
524 #[cfg(not(feature = "propagate_nans"))]
525 {
526 panic!("activate 'propagate_nans'")
527 }
528}
529
/// Parses `sql` into an expression via `polars::sql::sql_expr`.
///
/// Only compiled when the `sql` feature is enabled.
///
/// # Errors
/// Converts any error from `polars::sql::sql_expr` into a Python exception.
#[pyfunction]
#[cfg(feature = "sql")]
pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {
    match polars::sql::sql_expr(sql) {
        Ok(expr) => Ok(expr.into()),
        Err(err) => Err(PyPolarsErr::from(err).into()),
    }
}