polars_python/functions/
lazy.rs

1use polars::lazy::dsl;
2use polars::prelude::*;
3use polars_plan::plans::DynLiteralValue;
4use polars_plan::prelude::UnionArgs;
5use polars_utils::python_function::PythonObject;
6use pyo3::exceptions::{PyTypeError, PyValueError};
7use pyo3::prelude::*;
8use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
9
10use crate::conversion::any_value::py_object_to_any_value;
11use crate::conversion::{Wrap, get_lf};
12use crate::error::PyPolarsErr;
13use crate::expr::ToExprs;
14use crate::expr::datatype::PyDataTypeExpr;
15use crate::lazyframe::PyOptFlags;
16use crate::utils::EnterPolarsExt;
17use crate::{PyDataFrame, PyExpr, PyLazyFrame, PySeries, map};
18
/// Rebinds each named `Option` wrapper to its `inner` expression, using the
/// literal `0` for every argument that is `None`.
///
/// Each identifier is shadowed in the caller's scope by the unwrapped value.
/// The fallback literal is only constructed for arguments that are actually
/// `None`.
macro_rules! set_unwrapped_or_0 {
    ($($var:ident),+ $(,)?) => {
        $(let $var = $var.map_or_else(|| dsl::lit(0), |e| e.inner);)+
    };
}
24
25#[pyfunction]
26pub fn rolling_corr(
27    x: PyExpr,
28    y: PyExpr,
29    window_size: IdxSize,
30    min_periods: IdxSize,
31    ddof: u8,
32) -> PyExpr {
33    dsl::rolling_corr(
34        x.inner,
35        y.inner,
36        RollingCovOptions {
37            min_periods,
38            window_size,
39            ddof,
40        },
41    )
42    .into()
43}
44
45#[pyfunction]
46pub fn rolling_cov(
47    x: PyExpr,
48    y: PyExpr,
49    window_size: IdxSize,
50    min_periods: IdxSize,
51    ddof: u8,
52) -> PyExpr {
53    dsl::rolling_cov(
54        x.inner,
55        y.inner,
56        RollingCovOptions {
57            min_periods,
58            window_size,
59            ddof,
60        },
61    )
62    .into()
63}
64
65#[pyfunction]
66pub fn arg_sort_by(
67    by: Vec<PyExpr>,
68    descending: Vec<bool>,
69    nulls_last: Vec<bool>,
70    multithreaded: bool,
71    maintain_order: bool,
72) -> PyExpr {
73    let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();
74    dsl::arg_sort_by(
75        by,
76        SortMultipleOptions {
77            descending,
78            nulls_last,
79            multithreaded,
80            maintain_order,
81            limit: None,
82        },
83    )
84    .into()
85}
86#[pyfunction]
87pub fn arg_where(condition: PyExpr) -> PyExpr {
88    dsl::arg_where(condition.inner).into()
89}
90
91#[pyfunction]
92pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {
93    let exprs = exprs.to_exprs();
94    if exprs.is_empty() {
95        return Err(PyValueError::new_err(
96            "expected at least 1 expression in 'as_struct'",
97        ));
98    }
99    Ok(dsl::as_struct(exprs).into())
100}
101
102#[pyfunction]
103pub fn field(names: Vec<String>) -> PyExpr {
104    dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()
105}
106
107#[pyfunction]
108pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {
109    let exprs = exprs.to_exprs();
110    dsl::coalesce(&exprs).into()
111}
112
113#[pyfunction]
114pub fn col(name: &str) -> PyExpr {
115    dsl::col(name).into()
116}
117
118fn lfs_to_plans(lfs: Vec<PyLazyFrame>) -> Vec<DslPlan> {
119    lfs.into_iter()
120        .map(|lf| lf.ldf.into_inner().logical_plan)
121        .collect()
122}
123
124#[pyfunction]
125pub fn collect_all(
126    lfs: Vec<PyLazyFrame>,
127    engine: Wrap<Engine>,
128    optflags: PyOptFlags,
129    py: Python<'_>,
130) -> PyResult<Vec<PyDataFrame>> {
131    let plans = lfs_to_plans(lfs);
132    let dfs = py.enter_polars(|| {
133        LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
134    })?;
135    Ok(dfs.into_iter().map(Into::into).collect())
136}
137
138#[pyfunction]
139pub fn explain_all(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags, py: Python) -> PyResult<String> {
140    let plans = lfs_to_plans(lfs);
141    let explained =
142        py.enter_polars(|| LazyFrame::explain_all(plans, optflags.inner.into_inner()))?;
143    Ok(explained)
144}
145
146#[pyfunction]
147pub fn collect_all_with_callback(
148    lfs: Vec<PyLazyFrame>,
149    engine: Wrap<Engine>,
150    optflags: PyOptFlags,
151    lambda: PyObject,
152    py: Python<'_>,
153) {
154    let plans = lfs
155        .into_iter()
156        .map(|lf| lf.ldf.into_inner().logical_plan)
157        .collect();
158    let result = py
159        .enter_polars(|| {
160            LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
161        })
162        .map(|dfs| {
163            dfs.into_iter()
164                .map(Into::into)
165                .collect::<Vec<PyDataFrame>>()
166        });
167
168    Python::with_gil(|py| match result {
169        Ok(dfs) => {
170            lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
171        },
172        Err(err) => {
173            lambda
174                .call1(py, (PyErr::from(err),))
175                .map_err(|err| err.restore(py))
176                .ok();
177        },
178    })
179}
180
181#[pyfunction]
182pub fn concat_lf(
183    seq: &Bound<'_, PyAny>,
184    rechunk: bool,
185    parallel: bool,
186    to_supertypes: bool,
187) -> PyResult<PyLazyFrame> {
188    let len = seq.len()?;
189    let mut lfs = Vec::with_capacity(len);
190
191    for res in seq.try_iter()? {
192        let item = res?;
193        let lf = get_lf(&item)?;
194        lfs.push(lf);
195    }
196
197    let lf = dsl::concat(
198        lfs,
199        UnionArgs {
200            rechunk,
201            parallel,
202            to_supertypes,
203            ..Default::default()
204        },
205    )
206    .map_err(PyPolarsErr::from)?;
207    Ok(lf.into())
208}
209
210#[pyfunction]
211pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
212    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
213    let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;
214    Ok(expr.into())
215}
216
217#[pyfunction]
218pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {
219    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
220    let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;
221    Ok(expr.into())
222}
223
224#[pyfunction]
225pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
226    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
227    dsl::concat_str(s, separator, ignore_nulls).into()
228}
229
230#[pyfunction]
231pub fn len() -> PyExpr {
232    dsl::len().into()
233}
234
235#[pyfunction]
236pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {
237    dsl::cov(a.inner, b.inner, ddof).into()
238}
239
/// Calls `arctan2` on the inner expression of `y` with the inner expression
/// of `x` as argument.
///
/// Only compiled when the `trigonometry` feature is enabled.
#[pyfunction]
#[cfg(feature = "trigonometry")]
pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {
    let (y_expr, x_expr) = (y.inner, x.inner);
    PyExpr::from(y_expr.arctan2(x_expr))
}
245
246#[pyfunction]
247pub fn cum_fold(
248    acc: PyExpr,
249    lambda: PyObject,
250    exprs: Vec<PyExpr>,
251    returns_scalar: bool,
252    return_dtype: Option<PyDataTypeExpr>,
253    include_init: bool,
254) -> PyExpr {
255    let exprs = exprs.to_exprs();
256    let func = PlanCallback::new_python(PythonObject(lambda));
257    dsl::cum_fold_exprs(
258        acc.inner,
259        func,
260        exprs,
261        returns_scalar,
262        return_dtype.map(|v| v.inner),
263        include_init,
264    )
265    .into()
266}
267
268#[pyfunction]
269pub fn cum_reduce(
270    lambda: PyObject,
271    exprs: Vec<PyExpr>,
272    returns_scalar: bool,
273    return_dtype: Option<PyDataTypeExpr>,
274) -> PyExpr {
275    let exprs = exprs.to_exprs();
276
277    let func = PlanCallback::new_python(PythonObject(lambda));
278    dsl::cum_reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
279}
280
281#[pyfunction]
282#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=Wrap(None), ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]
283pub fn datetime(
284    year: PyExpr,
285    month: PyExpr,
286    day: PyExpr,
287    hour: Option<PyExpr>,
288    minute: Option<PyExpr>,
289    second: Option<PyExpr>,
290    microsecond: Option<PyExpr>,
291    time_unit: Wrap<TimeUnit>,
292    time_zone: Wrap<Option<TimeZone>>,
293    ambiguous: PyExpr,
294) -> PyExpr {
295    let year = year.inner;
296    let month = month.inner;
297    let day = day.inner;
298    set_unwrapped_or_0!(hour, minute, second, microsecond);
299    let ambiguous = ambiguous.inner;
300    let time_unit = time_unit.0;
301    let time_zone = time_zone.0;
302    let args = DatetimeArgs {
303        year,
304        month,
305        day,
306        hour,
307        minute,
308        second,
309        microsecond,
310        time_unit,
311        time_zone,
312        ambiguous,
313    };
314    dsl::datetime(args).into()
315}
316
317#[pyfunction]
318pub fn concat_lf_diagonal(
319    lfs: &Bound<'_, PyAny>,
320    rechunk: bool,
321    parallel: bool,
322    to_supertypes: bool,
323) -> PyResult<PyLazyFrame> {
324    let iter = lfs.try_iter()?;
325
326    let lfs = iter
327        .map(|item| {
328            let item = item?;
329            get_lf(&item)
330        })
331        .collect::<PyResult<Vec<_>>>()?;
332
333    let lf = dsl::functions::concat_lf_diagonal(
334        lfs,
335        UnionArgs {
336            rechunk,
337            parallel,
338            to_supertypes,
339            ..Default::default()
340        },
341    )
342    .map_err(PyPolarsErr::from)?;
343    Ok(lf.into())
344}
345
346#[pyfunction]
347pub fn concat_lf_horizontal(lfs: &Bound<'_, PyAny>, parallel: bool) -> PyResult<PyLazyFrame> {
348    let iter = lfs.try_iter()?;
349
350    let lfs = iter
351        .map(|item| {
352            let item = item?;
353            get_lf(&item)
354        })
355        .collect::<PyResult<Vec<_>>>()?;
356
357    let args = UnionArgs {
358        rechunk: false, // No need to rechunk with horizontal concatenation
359        parallel,
360        to_supertypes: false,
361        ..Default::default()
362    };
363    let lf = dsl::functions::concat_lf_horizontal(lfs, args).map_err(PyPolarsErr::from)?;
364    Ok(lf.into())
365}
366
367#[pyfunction]
368pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {
369    let e = e.to_exprs();
370    let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;
371    Ok(e.into())
372}
373
374#[pyfunction]
375#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]
376pub fn duration(
377    weeks: Option<PyExpr>,
378    days: Option<PyExpr>,
379    hours: Option<PyExpr>,
380    minutes: Option<PyExpr>,
381    seconds: Option<PyExpr>,
382    milliseconds: Option<PyExpr>,
383    microseconds: Option<PyExpr>,
384    nanoseconds: Option<PyExpr>,
385    time_unit: Wrap<TimeUnit>,
386) -> PyExpr {
387    set_unwrapped_or_0!(
388        weeks,
389        days,
390        hours,
391        minutes,
392        seconds,
393        milliseconds,
394        microseconds,
395        nanoseconds,
396    );
397    let args = DurationArgs {
398        weeks,
399        days,
400        hours,
401        minutes,
402        seconds,
403        milliseconds,
404        microseconds,
405        nanoseconds,
406        time_unit: time_unit.0,
407    };
408    dsl::duration(args).into()
409}
410
411#[pyfunction]
412pub fn fold(
413    acc: PyExpr,
414    lambda: PyObject,
415    exprs: Vec<PyExpr>,
416    returns_scalar: bool,
417    return_dtype: Option<PyDataTypeExpr>,
418) -> PyExpr {
419    let exprs = exprs.to_exprs();
420    let func = PlanCallback::new_python(PythonObject(lambda));
421    dsl::fold_exprs(
422        acc.inner,
423        func,
424        exprs,
425        returns_scalar,
426        return_dtype.map(|w| w.inner),
427    )
428    .into()
429}
430
431#[pyfunction]
432pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {
433    let py = value.py();
434    if value.is_instance_of::<PyBool>() {
435        let val = value.extract::<bool>()?;
436        Ok(dsl::lit(val).into())
437    } else if let Ok(int) = value.downcast::<PyInt>() {
438        let v = int
439            .extract::<i128>()
440            .map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))
441            .map_err(PyPolarsErr::from)?;
442        Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Int(v))).into())
443    } else if let Ok(float) = value.downcast::<PyFloat>() {
444        let val = float.extract::<f64>()?;
445        Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Float(val))).into())
446    } else if let Ok(pystr) = value.downcast::<PyString>() {
447        Ok(dsl::lit(pystr.to_string()).into())
448    } else if let Ok(series) = value.extract::<PySeries>() {
449        let s = series.series.into_inner();
450        if is_scalar {
451            let av = s
452                .get(0)
453                .map_err(|_| PyValueError::new_err("expected at least 1 value"))?;
454            let av = av.into_static();
455            Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())
456        } else {
457            Ok(dsl::lit(s).into())
458        }
459    } else if value.is_none() {
460        Ok(dsl::lit(Null {}).into())
461    } else if let Ok(value) = value.downcast::<PyBytes>() {
462        Ok(dsl::lit(value.as_bytes()).into())
463    } else {
464        let av = py_object_to_any_value(value, true, allow_object).map_err(|_| {
465            PyTypeError::new_err(
466                format!(
467                    "cannot create expression literal for value of type {}.\
468                    \n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",
469                    value.get_type().qualname().map(|s|s.to_string()).unwrap_or("unknown".to_owned()),
470                )
471            )
472        })?;
473        match av {
474            #[cfg(feature = "object")]
475            AnyValue::ObjectOwned(_) => {
476                let s = PySeries::new_object(py, "", vec![value.extract()?], false)
477                    .series
478                    .into_inner();
479                Ok(dsl::lit(s).into())
480            },
481            _ => Ok(Expr::Literal(LiteralValue::from(av)).into()),
482        }
483    }
484}
485
486#[pyfunction]
487#[pyo3(signature = (pyexpr, lambda, output_type, is_elementwise, returns_scalar))]
488pub fn map_expr(
489    pyexpr: Vec<PyExpr>,
490    lambda: PyObject,
491    output_type: Option<PyDataTypeExpr>,
492    is_elementwise: bool,
493    returns_scalar: bool,
494) -> PyExpr {
495    map::lazy::map_expr(&pyexpr, lambda, output_type, is_elementwise, returns_scalar)
496}
497
498#[pyfunction]
499pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {
500    dsl::pearson_corr(a.inner, b.inner).into()
501}
502
503#[pyfunction]
504pub fn reduce(
505    lambda: PyObject,
506    exprs: Vec<PyExpr>,
507    returns_scalar: bool,
508    return_dtype: Option<PyDataTypeExpr>,
509) -> PyExpr {
510    let exprs = exprs.to_exprs();
511    let func = PlanCallback::new_python(PythonObject(lambda));
512    dsl::reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
513}
514
515#[pyfunction]
516#[pyo3(signature = (value, n, dtype=None))]
517pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyExpr {
518    let mut value = value.inner;
519    let n = n.inner;
520
521    if let Some(dtype) = dtype {
522        value = value.cast(dtype.0);
523    }
524
525    dsl::repeat(value, n).into()
526}
527
528#[pyfunction]
529pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {
530    #[cfg(feature = "propagate_nans")]
531    {
532        dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()
533    }
534    #[cfg(not(feature = "propagate_nans"))]
535    {
536        panic!("activate 'propagate_nans'")
537    }
538}
539
/// Passes `sql` to `polars::sql::sql_expr` and wraps the resulting expression.
///
/// Only compiled when the `sql` feature is enabled.
///
/// # Errors
///
/// Returns the error from `polars::sql::sql_expr`, converted through
/// `PyPolarsErr`.
#[pyfunction]
#[cfg(feature = "sql")]
pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {
    match polars::sql::sql_expr(sql) {
        Ok(parsed) => Ok(parsed.into()),
        Err(err) => Err(PyPolarsErr::from(err).into()),
    }
}
545}