1use polars::lazy::dsl;
2use polars::prelude::*;
3use polars_plan::plans::DynLiteralValue;
4use polars_plan::prelude::UnionArgs;
5use polars_utils::python_function::PythonObject;
6use pyo3::exceptions::{PyTypeError, PyValueError};
7use pyo3::prelude::*;
8use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
9
10use crate::conversion::any_value::py_object_to_any_value;
11use crate::conversion::{Wrap, get_lf};
12use crate::error::PyPolarsErr;
13use crate::expr::ToExprs;
14use crate::expr::datatype::PyDataTypeExpr;
15use crate::lazyframe::PyOptFlags;
16use crate::utils::EnterPolarsExt;
17use crate::{PyDataFrame, PyExpr, PyLazyFrame, PySeries, map};
18
// Rebinds each listed optional-expression local to its `inner` expression,
// substituting the literal `0` for every one that is `None`.
macro_rules! set_unwrapped_or_0 {
    ($($var:ident),+ $(,)?) => {
        $(
            let $var = match $var {
                Some(py_expr) => py_expr.inner,
                None => dsl::lit(0),
            };
        )+
    };
}
24
25#[pyfunction]
26pub fn rolling_corr(
27 x: PyExpr,
28 y: PyExpr,
29 window_size: IdxSize,
30 min_periods: IdxSize,
31 ddof: u8,
32) -> PyExpr {
33 dsl::rolling_corr(
34 x.inner,
35 y.inner,
36 RollingCovOptions {
37 min_periods,
38 window_size,
39 ddof,
40 },
41 )
42 .into()
43}
44
45#[pyfunction]
46pub fn rolling_cov(
47 x: PyExpr,
48 y: PyExpr,
49 window_size: IdxSize,
50 min_periods: IdxSize,
51 ddof: u8,
52) -> PyExpr {
53 dsl::rolling_cov(
54 x.inner,
55 y.inner,
56 RollingCovOptions {
57 min_periods,
58 window_size,
59 ddof,
60 },
61 )
62 .into()
63}
64
65#[pyfunction]
66pub fn arg_sort_by(
67 by: Vec<PyExpr>,
68 descending: Vec<bool>,
69 nulls_last: Vec<bool>,
70 multithreaded: bool,
71 maintain_order: bool,
72) -> PyExpr {
73 let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();
74 dsl::arg_sort_by(
75 by,
76 SortMultipleOptions {
77 descending,
78 nulls_last,
79 multithreaded,
80 maintain_order,
81 limit: None,
82 },
83 )
84 .into()
85}
86#[pyfunction]
87pub fn arg_where(condition: PyExpr) -> PyExpr {
88 dsl::arg_where(condition.inner).into()
89}
90
91#[pyfunction]
92pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {
93 let exprs = exprs.to_exprs();
94 if exprs.is_empty() {
95 return Err(PyValueError::new_err(
96 "expected at least 1 expression in 'as_struct'",
97 ));
98 }
99 Ok(dsl::as_struct(exprs).into())
100}
101
102#[pyfunction]
103pub fn field(names: Vec<String>) -> PyExpr {
104 dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()
105}
106
107#[pyfunction]
108pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {
109 let exprs = exprs.to_exprs();
110 dsl::coalesce(&exprs).into()
111}
112
113#[pyfunction]
114pub fn col(name: &str) -> PyExpr {
115 dsl::col(name).into()
116}
117
118#[pyfunction]
119pub fn element() -> PyExpr {
120 dsl::element().into()
121}
122
123fn lfs_to_plans(lfs: Vec<PyLazyFrame>) -> Vec<DslPlan> {
124 lfs.into_iter()
125 .map(|lf| lf.ldf.into_inner().logical_plan)
126 .collect()
127}
128
129#[pyfunction]
130pub fn collect_all(
131 lfs: Vec<PyLazyFrame>,
132 engine: Wrap<Engine>,
133 optflags: PyOptFlags,
134 py: Python<'_>,
135) -> PyResult<Vec<PyDataFrame>> {
136 let plans = lfs_to_plans(lfs);
137 let dfs = py.enter_polars(|| {
138 LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
139 })?;
140 Ok(dfs.into_iter().map(Into::into).collect())
141}
142
143#[pyfunction]
144pub fn collect_all_lazy(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags) -> PyResult<PyLazyFrame> {
145 let plans = lfs_to_plans(lfs);
146
147 for plan in &plans {
148 if !matches!(plan, DslPlan::Sink { .. }) {
149 return Err(PyValueError::new_err(
150 "all LazyFrames must end with a sink to use 'collect_all(lazy=True)'",
151 ));
152 }
153 }
154
155 Ok(LazyFrame::from_logical_plan(
156 DslPlan::SinkMultiple { inputs: plans },
157 optflags.inner.into_inner(),
158 )
159 .into())
160}
161
162#[pyfunction]
163pub fn explain_all(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags, py: Python) -> PyResult<String> {
164 let plans = lfs_to_plans(lfs);
165 let explained =
166 py.enter_polars(|| LazyFrame::explain_all(plans, optflags.inner.into_inner()))?;
167 Ok(explained)
168}
169
170#[pyfunction]
171pub fn collect_all_with_callback(
172 lfs: Vec<PyLazyFrame>,
173 engine: Wrap<Engine>,
174 optflags: PyOptFlags,
175 lambda: Py<PyAny>,
176 py: Python<'_>,
177) {
178 let plans = lfs
179 .into_iter()
180 .map(|lf| lf.ldf.into_inner().logical_plan)
181 .collect();
182 let result = py
183 .enter_polars(|| {
184 LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
185 })
186 .map(|dfs| {
187 dfs.into_iter()
188 .map(Into::into)
189 .collect::<Vec<PyDataFrame>>()
190 });
191
192 Python::attach(|py| match result {
193 Ok(dfs) => {
194 lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
195 },
196 Err(err) => {
197 lambda
198 .call1(py, (PyErr::from(err),))
199 .map_err(|err| err.restore(py))
200 .ok();
201 },
202 })
203}
204
205#[pyfunction]
206pub fn concat_lf(
207 seq: &Bound<'_, PyAny>,
208 rechunk: bool,
209 parallel: bool,
210 to_supertypes: bool,
211 maintain_order: bool,
212) -> PyResult<PyLazyFrame> {
213 let len = seq.len()?;
214 let mut lfs = Vec::with_capacity(len);
215
216 for res in seq.try_iter()? {
217 let item = res?;
218 let lf = get_lf(&item)?;
219 lfs.push(lf);
220 }
221
222 let lf = dsl::concat(
223 lfs,
224 UnionArgs {
225 rechunk,
226 parallel,
227 to_supertypes,
228 maintain_order,
229 ..Default::default()
230 },
231 )
232 .map_err(PyPolarsErr::from)?;
233 Ok(lf.into())
234}
235
236#[pyfunction]
237pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
238 let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
239 let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;
240 Ok(expr.into())
241}
242
243#[pyfunction]
244pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {
245 let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
246 let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;
247 Ok(expr.into())
248}
249
250#[pyfunction]
251pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
252 let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
253 dsl::concat_str(s, separator, ignore_nulls).into()
254}
255
256#[pyfunction]
257pub fn len() -> PyExpr {
258 dsl::len().into()
259}
260
261#[pyfunction]
262pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {
263 dsl::cov(a.inner, b.inner, ddof).into()
264}
265
/// Calls `arctan2` on the `y` expression with the `x` expression as its argument.
///
/// Only compiled when the `trigonometry` feature is enabled.
#[pyfunction]
#[cfg(feature = "trigonometry")]
pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {
    let (y_expr, x_expr) = (y.inner, x.inner);
    PyExpr::from(y_expr.arctan2(x_expr))
}
271
272#[pyfunction]
273pub fn cum_fold(
274 acc: PyExpr,
275 lambda: Py<PyAny>,
276 exprs: Vec<PyExpr>,
277 returns_scalar: bool,
278 return_dtype: Option<PyDataTypeExpr>,
279 include_init: bool,
280) -> PyExpr {
281 let exprs = exprs.to_exprs();
282 let func = PlanCallback::new_python(PythonObject(lambda));
283 dsl::cum_fold_exprs(
284 acc.inner,
285 func,
286 exprs,
287 returns_scalar,
288 return_dtype.map(|v| v.inner),
289 include_init,
290 )
291 .into()
292}
293
294#[pyfunction]
295pub fn cum_reduce(
296 lambda: Py<PyAny>,
297 exprs: Vec<PyExpr>,
298 returns_scalar: bool,
299 return_dtype: Option<PyDataTypeExpr>,
300) -> PyExpr {
301 let exprs = exprs.to_exprs();
302
303 let func = PlanCallback::new_python(PythonObject(lambda));
304 dsl::cum_reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
305}
306
307#[pyfunction]
308#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=Wrap(None), ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]
309pub fn datetime(
310 year: PyExpr,
311 month: PyExpr,
312 day: PyExpr,
313 hour: Option<PyExpr>,
314 minute: Option<PyExpr>,
315 second: Option<PyExpr>,
316 microsecond: Option<PyExpr>,
317 time_unit: Wrap<TimeUnit>,
318 time_zone: Wrap<Option<TimeZone>>,
319 ambiguous: PyExpr,
320) -> PyExpr {
321 let year = year.inner;
322 let month = month.inner;
323 let day = day.inner;
324 set_unwrapped_or_0!(hour, minute, second, microsecond);
325 let ambiguous = ambiguous.inner;
326 let time_unit = time_unit.0;
327 let time_zone = time_zone.0;
328 let args = DatetimeArgs {
329 year,
330 month,
331 day,
332 hour,
333 minute,
334 second,
335 microsecond,
336 time_unit,
337 time_zone,
338 ambiguous,
339 };
340 dsl::datetime(args).into()
341}
342
343#[pyfunction]
344pub fn concat_lf_diagonal(
345 lfs: &Bound<'_, PyAny>,
346 rechunk: bool,
347 parallel: bool,
348 to_supertypes: bool,
349 maintain_order: bool,
350) -> PyResult<PyLazyFrame> {
351 let iter = lfs.try_iter()?;
352
353 let lfs = iter
354 .map(|item| {
355 let item = item?;
356 get_lf(&item)
357 })
358 .collect::<PyResult<Vec<_>>>()?;
359
360 let lf = dsl::functions::concat_lf_diagonal(
361 lfs,
362 UnionArgs {
363 rechunk,
364 parallel,
365 to_supertypes,
366 maintain_order,
367 ..Default::default()
368 },
369 )
370 .map_err(PyPolarsErr::from)?;
371 Ok(lf.into())
372}
373
374#[pyfunction]
375pub fn concat_lf_horizontal(
376 lfs: &Bound<'_, PyAny>,
377 parallel: bool,
378 strict: bool,
379) -> PyResult<PyLazyFrame> {
380 let iter = lfs.try_iter()?;
381
382 let lfs = iter
383 .map(|item| {
384 let item = item?;
385 get_lf(&item)
386 })
387 .collect::<PyResult<Vec<_>>>()?;
388
389 let lf = dsl::functions::concat_lf_horizontal(
390 lfs,
391 HConcatOptions {
392 parallel,
393 strict,
394 broadcast_unit_length: Default::default(),
395 },
396 )
397 .map_err(PyPolarsErr::from)?;
398 Ok(lf.into())
399}
400
401#[pyfunction]
402pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {
403 let e = e.to_exprs();
404 let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;
405 Ok(e.into())
406}
407
408#[pyfunction]
409#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]
410pub fn duration(
411 weeks: Option<PyExpr>,
412 days: Option<PyExpr>,
413 hours: Option<PyExpr>,
414 minutes: Option<PyExpr>,
415 seconds: Option<PyExpr>,
416 milliseconds: Option<PyExpr>,
417 microseconds: Option<PyExpr>,
418 nanoseconds: Option<PyExpr>,
419 time_unit: Wrap<TimeUnit>,
420) -> PyExpr {
421 set_unwrapped_or_0!(
422 weeks,
423 days,
424 hours,
425 minutes,
426 seconds,
427 milliseconds,
428 microseconds,
429 nanoseconds,
430 );
431 let args = DurationArgs {
432 weeks,
433 days,
434 hours,
435 minutes,
436 seconds,
437 milliseconds,
438 microseconds,
439 nanoseconds,
440 time_unit: time_unit.0,
441 };
442 dsl::duration(args).into()
443}
444
445#[pyfunction]
446pub fn fold(
447 acc: PyExpr,
448 lambda: Py<PyAny>,
449 exprs: Vec<PyExpr>,
450 returns_scalar: bool,
451 return_dtype: Option<PyDataTypeExpr>,
452) -> PyExpr {
453 let exprs = exprs.to_exprs();
454 let func = PlanCallback::new_python(PythonObject(lambda));
455 dsl::fold_exprs(
456 acc.inner,
457 func,
458 exprs,
459 returns_scalar,
460 return_dtype.map(|w| w.inner),
461 )
462 .into()
463}
464
465#[pyfunction]
466pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {
467 let py = value.py();
468 if value.is_instance_of::<PyBool>() {
469 let val = value.extract::<bool>()?;
470 Ok(dsl::lit(val).into())
471 } else if let Ok(int) = value.cast::<PyInt>() {
472 let v = int
473 .extract::<i128>()
474 .map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))
475 .map_err(PyPolarsErr::from)?;
476 Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Int(v))).into())
477 } else if let Ok(float) = value.cast::<PyFloat>() {
478 let val = float.extract::<f64>()?;
479 Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Float(val))).into())
480 } else if let Ok(pystr) = value.cast::<PyString>() {
481 Ok(dsl::lit(pystr.to_string()).into())
482 } else if let Ok(series) = value.extract::<PySeries>() {
483 let s = series.series.into_inner();
484 if is_scalar {
485 let av = s
486 .get(0)
487 .map_err(|_| PyValueError::new_err("expected at least 1 value"))?;
488 let av = av.into_static();
489 Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())
490 } else {
491 Ok(dsl::lit(s).into())
492 }
493 } else if value.is_none() {
494 Ok(dsl::lit(Null {}).into())
495 } else if let Ok(value) = value.cast::<PyBytes>() {
496 Ok(dsl::lit(value.as_bytes()).into())
497 } else {
498 let raise = || {
499 PyTypeError::new_err(format!(
500 "cannot create expression literal for value of type {}.\
501 \n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",
502 value
503 .get_type()
504 .qualname()
505 .map(|s| s.to_string())
506 .unwrap_or("unknown".to_owned()),
507 ))
508 };
509
510 let av = py_object_to_any_value(value, true, allow_object).map_err(|_| raise())?;
511 match av {
512 #[cfg(feature = "object")]
513 AnyValue::ObjectOwned(_) => {
514 if allow_object {
516 let s = PySeries::new_object(py, "", vec![value.extract()?], false)
517 .series
518 .into_inner();
519 Ok(dsl::lit(s).into())
520 } else {
521 Err(raise())
522 }
523 },
524 _ => Ok(Expr::Literal(LiteralValue::from(av)).into()),
525 }
526 }
527}
528
529#[pyfunction]
530#[pyo3(signature = (pyexpr, lambda, output_type, is_elementwise, returns_scalar))]
531pub fn map_expr(
532 pyexpr: Vec<PyExpr>,
533 lambda: Py<PyAny>,
534 output_type: Option<PyDataTypeExpr>,
535 is_elementwise: bool,
536 returns_scalar: bool,
537) -> PyExpr {
538 map::lazy::map_expr(&pyexpr, lambda, output_type, is_elementwise, returns_scalar)
539}
540
541#[pyfunction]
542pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {
543 dsl::pearson_corr(a.inner, b.inner).into()
544}
545
546#[pyfunction]
547pub fn reduce(
548 lambda: Py<PyAny>,
549 exprs: Vec<PyExpr>,
550 returns_scalar: bool,
551 return_dtype: Option<PyDataTypeExpr>,
552) -> PyExpr {
553 let exprs = exprs.to_exprs();
554 let func = PlanCallback::new_python(PythonObject(lambda));
555 dsl::reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
556}
557
558#[pyfunction]
559#[pyo3(signature = (value, n, dtype=None))]
560pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyExpr {
561 let mut value = value.inner;
562 let n = n.inner;
563
564 if let Some(dtype) = dtype {
565 value = value.cast(dtype.0);
566 }
567
568 dsl::repeat(value, n).into()
569}
570
571#[pyfunction]
572pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {
573 #[cfg(feature = "propagate_nans")]
574 {
575 dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()
576 }
577 #[cfg(not(feature = "propagate_nans"))]
578 {
579 panic!("activate 'propagate_nans'")
580 }
581}
582
/// Passes the SQL string to `polars::sql::sql_expr` and returns the resulting expression.
///
/// Only compiled when the `sql` feature is enabled.
///
/// # Errors
///
/// Propagates any error returned by `polars::sql::sql_expr`.
#[pyfunction]
#[cfg(feature = "sql")]
pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {
    match polars::sql::sql_expr(sql) {
        Ok(expr) => Ok(expr.into()),
        Err(err) => Err(PyPolarsErr::from(err).into()),
    }
}