polars_python/functions/
lazy.rs

1use polars::lazy::dsl;
2use polars::prelude::*;
3use polars_plan::plans::DynLiteralValue;
4use polars_plan::prelude::UnionArgs;
5use polars_utils::python_function::PythonObject;
6use pyo3::exceptions::{PyTypeError, PyValueError};
7use pyo3::prelude::*;
8use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
9
10use crate::conversion::any_value::py_object_to_any_value;
11use crate::conversion::{Wrap, get_lf};
12use crate::error::PyPolarsErr;
13use crate::expr::ToExprs;
14use crate::expr::datatype::PyDataTypeExpr;
15use crate::lazyframe::PyOptFlags;
16use crate::utils::EnterPolarsExt;
17use crate::{PyDataFrame, PyExpr, PyLazyFrame, PySeries, map};
18
/// Rebinds each listed `Option<PyExpr>` variable to its inner expression,
/// substituting the literal `0` when the option is `None`.
macro_rules! set_unwrapped_or_0 {
    ($($var:ident),+ $(,)?) => {
        $(
            let $var = match $var {
                Some(expr) => expr.inner,
                None => dsl::lit(0),
            };
        )+
    };
}
24
25#[pyfunction]
26pub fn rolling_corr(
27    x: PyExpr,
28    y: PyExpr,
29    window_size: IdxSize,
30    min_periods: IdxSize,
31    ddof: u8,
32) -> PyExpr {
33    dsl::rolling_corr(
34        x.inner,
35        y.inner,
36        RollingCovOptions {
37            min_periods,
38            window_size,
39            ddof,
40        },
41    )
42    .into()
43}
44
45#[pyfunction]
46pub fn rolling_cov(
47    x: PyExpr,
48    y: PyExpr,
49    window_size: IdxSize,
50    min_periods: IdxSize,
51    ddof: u8,
52) -> PyExpr {
53    dsl::rolling_cov(
54        x.inner,
55        y.inner,
56        RollingCovOptions {
57            min_periods,
58            window_size,
59            ddof,
60        },
61    )
62    .into()
63}
64
65#[pyfunction]
66pub fn arg_sort_by(
67    by: Vec<PyExpr>,
68    descending: Vec<bool>,
69    nulls_last: Vec<bool>,
70    multithreaded: bool,
71    maintain_order: bool,
72) -> PyExpr {
73    let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();
74    dsl::arg_sort_by(
75        by,
76        SortMultipleOptions {
77            descending,
78            nulls_last,
79            multithreaded,
80            maintain_order,
81            limit: None,
82        },
83    )
84    .into()
85}
86#[pyfunction]
87pub fn arg_where(condition: PyExpr) -> PyExpr {
88    dsl::arg_where(condition.inner).into()
89}
90
91#[pyfunction]
92pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {
93    let exprs = exprs.to_exprs();
94    if exprs.is_empty() {
95        return Err(PyValueError::new_err(
96            "expected at least 1 expression in 'as_struct'",
97        ));
98    }
99    Ok(dsl::as_struct(exprs).into())
100}
101
102#[pyfunction]
103pub fn field(names: Vec<String>) -> PyExpr {
104    dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()
105}
106
107#[pyfunction]
108pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {
109    let exprs = exprs.to_exprs();
110    dsl::coalesce(&exprs).into()
111}
112
113#[pyfunction]
114pub fn col(name: &str) -> PyExpr {
115    dsl::col(name).into()
116}
117
118#[pyfunction]
119pub fn element() -> PyExpr {
120    dsl::element().into()
121}
122
123fn lfs_to_plans(lfs: Vec<PyLazyFrame>) -> Vec<DslPlan> {
124    lfs.into_iter()
125        .map(|lf| lf.ldf.into_inner().logical_plan)
126        .collect()
127}
128
129#[pyfunction]
130pub fn collect_all(
131    lfs: Vec<PyLazyFrame>,
132    engine: Wrap<Engine>,
133    optflags: PyOptFlags,
134    py: Python<'_>,
135) -> PyResult<Vec<PyDataFrame>> {
136    let plans = lfs_to_plans(lfs);
137    let dfs = py.enter_polars(|| {
138        LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
139    })?;
140    Ok(dfs.into_iter().map(Into::into).collect())
141}
142
143#[pyfunction]
144pub fn explain_all(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags, py: Python) -> PyResult<String> {
145    let plans = lfs_to_plans(lfs);
146    let explained =
147        py.enter_polars(|| LazyFrame::explain_all(plans, optflags.inner.into_inner()))?;
148    Ok(explained)
149}
150
151#[pyfunction]
152pub fn collect_all_with_callback(
153    lfs: Vec<PyLazyFrame>,
154    engine: Wrap<Engine>,
155    optflags: PyOptFlags,
156    lambda: Py<PyAny>,
157    py: Python<'_>,
158) {
159    let plans = lfs
160        .into_iter()
161        .map(|lf| lf.ldf.into_inner().logical_plan)
162        .collect();
163    let result = py
164        .enter_polars(|| {
165            LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
166        })
167        .map(|dfs| {
168            dfs.into_iter()
169                .map(Into::into)
170                .collect::<Vec<PyDataFrame>>()
171        });
172
173    Python::attach(|py| match result {
174        Ok(dfs) => {
175            lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
176        },
177        Err(err) => {
178            lambda
179                .call1(py, (PyErr::from(err),))
180                .map_err(|err| err.restore(py))
181                .ok();
182        },
183    })
184}
185
186#[pyfunction]
187pub fn concat_lf(
188    seq: &Bound<'_, PyAny>,
189    rechunk: bool,
190    parallel: bool,
191    to_supertypes: bool,
192    maintain_order: bool,
193) -> PyResult<PyLazyFrame> {
194    let len = seq.len()?;
195    let mut lfs = Vec::with_capacity(len);
196
197    for res in seq.try_iter()? {
198        let item = res?;
199        let lf = get_lf(&item)?;
200        lfs.push(lf);
201    }
202
203    let lf = dsl::concat(
204        lfs,
205        UnionArgs {
206            rechunk,
207            parallel,
208            to_supertypes,
209            maintain_order,
210            ..Default::default()
211        },
212    )
213    .map_err(PyPolarsErr::from)?;
214    Ok(lf.into())
215}
216
217#[pyfunction]
218pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
219    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
220    let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;
221    Ok(expr.into())
222}
223
224#[pyfunction]
225pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {
226    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
227    let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;
228    Ok(expr.into())
229}
230
231#[pyfunction]
232pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
233    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
234    dsl::concat_str(s, separator, ignore_nulls).into()
235}
236
237#[pyfunction]
238pub fn len() -> PyExpr {
239    dsl::len().into()
240}
241
242#[pyfunction]
243pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {
244    dsl::cov(a.inner, b.inner, ddof).into()
245}
246
/// Calls `arctan2` on `y`'s expression with `x`'s expression as the argument.
///
/// Only compiled when the `trigonometry` feature is enabled.
#[pyfunction]
#[cfg(feature = "trigonometry")]
pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {
    let expr = y.inner.arctan2(x.inner);
    PyExpr::from(expr)
}
252
253#[pyfunction]
254pub fn cum_fold(
255    acc: PyExpr,
256    lambda: Py<PyAny>,
257    exprs: Vec<PyExpr>,
258    returns_scalar: bool,
259    return_dtype: Option<PyDataTypeExpr>,
260    include_init: bool,
261) -> PyExpr {
262    let exprs = exprs.to_exprs();
263    let func = PlanCallback::new_python(PythonObject(lambda));
264    dsl::cum_fold_exprs(
265        acc.inner,
266        func,
267        exprs,
268        returns_scalar,
269        return_dtype.map(|v| v.inner),
270        include_init,
271    )
272    .into()
273}
274
275#[pyfunction]
276pub fn cum_reduce(
277    lambda: Py<PyAny>,
278    exprs: Vec<PyExpr>,
279    returns_scalar: bool,
280    return_dtype: Option<PyDataTypeExpr>,
281) -> PyExpr {
282    let exprs = exprs.to_exprs();
283
284    let func = PlanCallback::new_python(PythonObject(lambda));
285    dsl::cum_reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
286}
287
288#[pyfunction]
289#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=Wrap(None), ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]
290pub fn datetime(
291    year: PyExpr,
292    month: PyExpr,
293    day: PyExpr,
294    hour: Option<PyExpr>,
295    minute: Option<PyExpr>,
296    second: Option<PyExpr>,
297    microsecond: Option<PyExpr>,
298    time_unit: Wrap<TimeUnit>,
299    time_zone: Wrap<Option<TimeZone>>,
300    ambiguous: PyExpr,
301) -> PyExpr {
302    let year = year.inner;
303    let month = month.inner;
304    let day = day.inner;
305    set_unwrapped_or_0!(hour, minute, second, microsecond);
306    let ambiguous = ambiguous.inner;
307    let time_unit = time_unit.0;
308    let time_zone = time_zone.0;
309    let args = DatetimeArgs {
310        year,
311        month,
312        day,
313        hour,
314        minute,
315        second,
316        microsecond,
317        time_unit,
318        time_zone,
319        ambiguous,
320    };
321    dsl::datetime(args).into()
322}
323
324#[pyfunction]
325pub fn concat_lf_diagonal(
326    lfs: &Bound<'_, PyAny>,
327    rechunk: bool,
328    parallel: bool,
329    to_supertypes: bool,
330    maintain_order: bool,
331) -> PyResult<PyLazyFrame> {
332    let iter = lfs.try_iter()?;
333
334    let lfs = iter
335        .map(|item| {
336            let item = item?;
337            get_lf(&item)
338        })
339        .collect::<PyResult<Vec<_>>>()?;
340
341    let lf = dsl::functions::concat_lf_diagonal(
342        lfs,
343        UnionArgs {
344            rechunk,
345            parallel,
346            to_supertypes,
347            maintain_order,
348            ..Default::default()
349        },
350    )
351    .map_err(PyPolarsErr::from)?;
352    Ok(lf.into())
353}
354
355#[pyfunction]
356pub fn concat_lf_horizontal(lfs: &Bound<'_, PyAny>, parallel: bool) -> PyResult<PyLazyFrame> {
357    let iter = lfs.try_iter()?;
358
359    let lfs = iter
360        .map(|item| {
361            let item = item?;
362            get_lf(&item)
363        })
364        .collect::<PyResult<Vec<_>>>()?;
365
366    let args = UnionArgs {
367        rechunk: false, // No need to rechunk with horizontal concatenation
368        parallel,
369        to_supertypes: false,
370        ..Default::default()
371    };
372    let lf = dsl::functions::concat_lf_horizontal(lfs, args).map_err(PyPolarsErr::from)?;
373    Ok(lf.into())
374}
375
376#[pyfunction]
377pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {
378    let e = e.to_exprs();
379    let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;
380    Ok(e.into())
381}
382
383#[pyfunction]
384#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]
385pub fn duration(
386    weeks: Option<PyExpr>,
387    days: Option<PyExpr>,
388    hours: Option<PyExpr>,
389    minutes: Option<PyExpr>,
390    seconds: Option<PyExpr>,
391    milliseconds: Option<PyExpr>,
392    microseconds: Option<PyExpr>,
393    nanoseconds: Option<PyExpr>,
394    time_unit: Wrap<TimeUnit>,
395) -> PyExpr {
396    set_unwrapped_or_0!(
397        weeks,
398        days,
399        hours,
400        minutes,
401        seconds,
402        milliseconds,
403        microseconds,
404        nanoseconds,
405    );
406    let args = DurationArgs {
407        weeks,
408        days,
409        hours,
410        minutes,
411        seconds,
412        milliseconds,
413        microseconds,
414        nanoseconds,
415        time_unit: time_unit.0,
416    };
417    dsl::duration(args).into()
418}
419
420#[pyfunction]
421pub fn fold(
422    acc: PyExpr,
423    lambda: Py<PyAny>,
424    exprs: Vec<PyExpr>,
425    returns_scalar: bool,
426    return_dtype: Option<PyDataTypeExpr>,
427) -> PyExpr {
428    let exprs = exprs.to_exprs();
429    let func = PlanCallback::new_python(PythonObject(lambda));
430    dsl::fold_exprs(
431        acc.inner,
432        func,
433        exprs,
434        returns_scalar,
435        return_dtype.map(|w| w.inner),
436    )
437    .into()
438}
439
440#[pyfunction]
441pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {
442    let py = value.py();
443    if value.is_instance_of::<PyBool>() {
444        let val = value.extract::<bool>()?;
445        Ok(dsl::lit(val).into())
446    } else if let Ok(int) = value.downcast::<PyInt>() {
447        let v = int
448            .extract::<i128>()
449            .map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))
450            .map_err(PyPolarsErr::from)?;
451        Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Int(v))).into())
452    } else if let Ok(float) = value.downcast::<PyFloat>() {
453        let val = float.extract::<f64>()?;
454        Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Float(val))).into())
455    } else if let Ok(pystr) = value.downcast::<PyString>() {
456        Ok(dsl::lit(pystr.to_string()).into())
457    } else if let Ok(series) = value.extract::<PySeries>() {
458        let s = series.series.into_inner();
459        if is_scalar {
460            let av = s
461                .get(0)
462                .map_err(|_| PyValueError::new_err("expected at least 1 value"))?;
463            let av = av.into_static();
464            Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())
465        } else {
466            Ok(dsl::lit(s).into())
467        }
468    } else if value.is_none() {
469        Ok(dsl::lit(Null {}).into())
470    } else if let Ok(value) = value.downcast::<PyBytes>() {
471        Ok(dsl::lit(value.as_bytes()).into())
472    } else {
473        let av = py_object_to_any_value(value, true, allow_object).map_err(|_| {
474            PyTypeError::new_err(
475                format!(
476                    "cannot create expression literal for value of type {}.\
477                    \n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",
478                    value.get_type().qualname().map(|s|s.to_string()).unwrap_or("unknown".to_owned()),
479                )
480            )
481        })?;
482        match av {
483            #[cfg(feature = "object")]
484            AnyValue::ObjectOwned(_) => {
485                let s = PySeries::new_object(py, "", vec![value.extract()?], false)
486                    .series
487                    .into_inner();
488                Ok(dsl::lit(s).into())
489            },
490            _ => Ok(Expr::Literal(LiteralValue::from(av)).into()),
491        }
492    }
493}
494
495#[pyfunction]
496#[pyo3(signature = (pyexpr, lambda, output_type, is_elementwise, returns_scalar))]
497pub fn map_expr(
498    pyexpr: Vec<PyExpr>,
499    lambda: Py<PyAny>,
500    output_type: Option<PyDataTypeExpr>,
501    is_elementwise: bool,
502    returns_scalar: bool,
503) -> PyExpr {
504    map::lazy::map_expr(&pyexpr, lambda, output_type, is_elementwise, returns_scalar)
505}
506
507#[pyfunction]
508pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {
509    dsl::pearson_corr(a.inner, b.inner).into()
510}
511
512#[pyfunction]
513pub fn reduce(
514    lambda: Py<PyAny>,
515    exprs: Vec<PyExpr>,
516    returns_scalar: bool,
517    return_dtype: Option<PyDataTypeExpr>,
518) -> PyExpr {
519    let exprs = exprs.to_exprs();
520    let func = PlanCallback::new_python(PythonObject(lambda));
521    dsl::reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
522}
523
524#[pyfunction]
525#[pyo3(signature = (value, n, dtype=None))]
526pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyExpr {
527    let mut value = value.inner;
528    let n = n.inner;
529
530    if let Some(dtype) = dtype {
531        value = value.cast(dtype.0);
532    }
533
534    dsl::repeat(value, n).into()
535}
536
537#[pyfunction]
538pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {
539    #[cfg(feature = "propagate_nans")]
540    {
541        dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()
542    }
543    #[cfg(not(feature = "propagate_nans"))]
544    {
545        panic!("activate 'propagate_nans'")
546    }
547}
548
/// Converts the SQL text `sql` into an expression via `polars::sql::sql_expr`.
///
/// Only compiled when the `sql` feature is enabled.
///
/// # Errors
/// Propagates any error reported by `polars::sql::sql_expr`.
#[pyfunction]
#[cfg(feature = "sql")]
pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {
    let parsed = polars::sql::sql_expr(sql).map_err(PyPolarsErr::from)?;
    Ok(PyExpr::from(parsed))
}
554}