polars_python/functions/
lazy.rs

1use polars::lazy::dsl;
2use polars::prelude::*;
3use polars_plan::prelude::UnionArgs;
4use pyo3::exceptions::{PyTypeError, PyValueError};
5use pyo3::prelude::*;
6use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
7
8use crate::conversion::any_value::py_object_to_any_value;
9use crate::conversion::{get_lf, Wrap};
10use crate::error::PyPolarsErr;
11use crate::expr::ToExprs;
12use crate::map::lazy::binary_lambda;
13use crate::prelude::vec_extract_wrapped;
14use crate::{map, PyDataFrame, PyExpr, PyLazyFrame, PySeries};
15
// Shadows each listed `Option<PyExpr>` binding with its inner expression,
// substituting the literal `0` expression when the option is `None`.
macro_rules! set_unwrapped_or_0 {
    ($($name:ident),+ $(,)?) => {
        $(
            let $name = match $name {
                Some(wrapped) => wrapped.inner,
                None => dsl::lit(0),
            };
        )+
    };
}
21
22#[pyfunction]
23pub fn rolling_corr(
24    x: PyExpr,
25    y: PyExpr,
26    window_size: IdxSize,
27    min_periods: IdxSize,
28    ddof: u8,
29) -> PyExpr {
30    dsl::rolling_corr(
31        x.inner,
32        y.inner,
33        RollingCovOptions {
34            min_periods,
35            window_size,
36            ddof,
37        },
38    )
39    .into()
40}
41
42#[pyfunction]
43pub fn rolling_cov(
44    x: PyExpr,
45    y: PyExpr,
46    window_size: IdxSize,
47    min_periods: IdxSize,
48    ddof: u8,
49) -> PyExpr {
50    dsl::rolling_cov(
51        x.inner,
52        y.inner,
53        RollingCovOptions {
54            min_periods,
55            window_size,
56            ddof,
57        },
58    )
59    .into()
60}
61
62#[pyfunction]
63pub fn arg_sort_by(
64    by: Vec<PyExpr>,
65    descending: Vec<bool>,
66    nulls_last: Vec<bool>,
67    multithreaded: bool,
68    maintain_order: bool,
69) -> PyExpr {
70    let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();
71    dsl::arg_sort_by(
72        by,
73        SortMultipleOptions {
74            descending,
75            nulls_last,
76            multithreaded,
77            maintain_order,
78            limit: None,
79        },
80    )
81    .into()
82}
83#[pyfunction]
84pub fn arg_where(condition: PyExpr) -> PyExpr {
85    dsl::arg_where(condition.inner).into()
86}
87
88#[pyfunction]
89pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {
90    let exprs = exprs.to_exprs();
91    if exprs.is_empty() {
92        return Err(PyValueError::new_err(
93            "expected at least 1 expression in 'as_struct'",
94        ));
95    }
96    Ok(dsl::as_struct(exprs).into())
97}
98
99#[pyfunction]
100pub fn field(names: Vec<String>) -> PyExpr {
101    dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()
102}
103
104#[pyfunction]
105pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {
106    let exprs = exprs.to_exprs();
107    dsl::coalesce(&exprs).into()
108}
109
110#[pyfunction]
111pub fn col(name: &str) -> PyExpr {
112    dsl::col(name).into()
113}
114
115#[pyfunction]
116pub fn collect_all(lfs: Vec<PyLazyFrame>, py: Python) -> PyResult<Vec<PyDataFrame>> {
117    use polars_core::utils::rayon::prelude::*;
118
119    let out = py.allow_threads(|| {
120        polars_core::POOL.install(|| {
121            lfs.par_iter()
122                .map(|lf| {
123                    let df = lf.ldf.clone().collect()?;
124                    Ok(PyDataFrame::new(df))
125                })
126                .collect::<polars_core::error::PolarsResult<Vec<_>>>()
127                .map_err(PyPolarsErr::from)
128        })
129    });
130
131    Ok(out?)
132}
133
134#[pyfunction]
135pub fn collect_all_with_callback(lfs: Vec<PyLazyFrame>, lambda: PyObject) {
136    use polars_core::utils::rayon::prelude::*;
137
138    polars_core::POOL.spawn(move || {
139        let result = lfs
140            .par_iter()
141            .map(|lf| {
142                let df = lf.ldf.clone().collect()?;
143                Ok(PyDataFrame::new(df))
144            })
145            .collect::<polars_core::error::PolarsResult<Vec<_>>>()
146            .map_err(PyPolarsErr::from);
147
148        Python::with_gil(|py| match result {
149            Ok(dfs) => {
150                lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
151            },
152            Err(err) => {
153                lambda
154                    .call1(py, (PyErr::from(err),))
155                    .map_err(|err| err.restore(py))
156                    .ok();
157            },
158        })
159    })
160}
161
162#[pyfunction]
163pub fn cols(names: Vec<String>) -> PyExpr {
164    dsl::cols(names).into()
165}
166
167#[pyfunction]
168pub fn concat_lf(
169    seq: &Bound<'_, PyAny>,
170    rechunk: bool,
171    parallel: bool,
172    to_supertypes: bool,
173) -> PyResult<PyLazyFrame> {
174    let len = seq.len()?;
175    let mut lfs = Vec::with_capacity(len);
176
177    for res in seq.try_iter()? {
178        let item = res?;
179        let lf = get_lf(&item)?;
180        lfs.push(lf);
181    }
182
183    let lf = dsl::concat(
184        lfs,
185        UnionArgs {
186            rechunk,
187            parallel,
188            to_supertypes,
189            ..Default::default()
190        },
191    )
192    .map_err(PyPolarsErr::from)?;
193    Ok(lf.into())
194}
195
196#[pyfunction]
197pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
198    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
199    let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;
200    Ok(expr.into())
201}
202
203#[pyfunction]
204pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {
205    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
206    let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;
207    Ok(expr.into())
208}
209
210#[pyfunction]
211pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
212    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
213    dsl::concat_str(s, separator, ignore_nulls).into()
214}
215
216#[pyfunction]
217pub fn len() -> PyExpr {
218    dsl::len().into()
219}
220
221#[pyfunction]
222pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {
223    dsl::cov(a.inner, b.inner, ddof).into()
224}
225
// Calls `arctan2` on the `y` expression with `x` as its argument.
// Only compiled when the "trigonometry" feature is enabled.
#[pyfunction]
#[cfg(feature = "trigonometry")]
pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {
    let (numerator, denominator) = (y.inner, x.inner);
    PyExpr::from(numerator.arctan2(denominator))
}
231
232#[pyfunction]
233pub fn cum_fold(acc: PyExpr, lambda: PyObject, exprs: Vec<PyExpr>, include_init: bool) -> PyExpr {
234    let exprs = exprs.to_exprs();
235
236    let func = move |a: Column, b: Column| {
237        binary_lambda(
238            &lambda,
239            a.take_materialized_series(),
240            b.take_materialized_series(),
241        )
242        .map(|v| v.map(Column::from))
243    };
244    dsl::cum_fold_exprs(acc.inner, func, exprs, include_init).into()
245}
246
247#[pyfunction]
248pub fn cum_reduce(lambda: PyObject, exprs: Vec<PyExpr>) -> PyExpr {
249    let exprs = exprs.to_exprs();
250
251    let func = move |a: Column, b: Column| {
252        binary_lambda(
253            &lambda,
254            a.take_materialized_series(),
255            b.take_materialized_series(),
256        )
257        .map(|v| v.map(Column::from))
258    };
259    dsl::cum_reduce_exprs(func, exprs).into()
260}
261
262#[pyfunction]
263#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=None, ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]
264pub fn datetime(
265    year: PyExpr,
266    month: PyExpr,
267    day: PyExpr,
268    hour: Option<PyExpr>,
269    minute: Option<PyExpr>,
270    second: Option<PyExpr>,
271    microsecond: Option<PyExpr>,
272    time_unit: Wrap<TimeUnit>,
273    time_zone: Option<Wrap<TimeZone>>,
274    ambiguous: PyExpr,
275) -> PyExpr {
276    let year = year.inner;
277    let month = month.inner;
278    let day = day.inner;
279    set_unwrapped_or_0!(hour, minute, second, microsecond);
280    let ambiguous = ambiguous.inner;
281    let time_unit = time_unit.0;
282    let time_zone = time_zone.map(|x| x.0);
283    let args = DatetimeArgs {
284        year,
285        month,
286        day,
287        hour,
288        minute,
289        second,
290        microsecond,
291        time_unit,
292        time_zone,
293        ambiguous,
294    };
295    dsl::datetime(args).into()
296}
297
298#[pyfunction]
299pub fn concat_lf_diagonal(
300    lfs: &Bound<'_, PyAny>,
301    rechunk: bool,
302    parallel: bool,
303    to_supertypes: bool,
304) -> PyResult<PyLazyFrame> {
305    let iter = lfs.try_iter()?;
306
307    let lfs = iter
308        .map(|item| {
309            let item = item?;
310            get_lf(&item)
311        })
312        .collect::<PyResult<Vec<_>>>()?;
313
314    let lf = dsl::functions::concat_lf_diagonal(
315        lfs,
316        UnionArgs {
317            rechunk,
318            parallel,
319            to_supertypes,
320            ..Default::default()
321        },
322    )
323    .map_err(PyPolarsErr::from)?;
324    Ok(lf.into())
325}
326
327#[pyfunction]
328pub fn concat_lf_horizontal(lfs: &Bound<'_, PyAny>, parallel: bool) -> PyResult<PyLazyFrame> {
329    let iter = lfs.try_iter()?;
330
331    let lfs = iter
332        .map(|item| {
333            let item = item?;
334            get_lf(&item)
335        })
336        .collect::<PyResult<Vec<_>>>()?;
337
338    let args = UnionArgs {
339        rechunk: false, // No need to rechunk with horizontal concatenation
340        parallel,
341        to_supertypes: false,
342        ..Default::default()
343    };
344    let lf = dsl::functions::concat_lf_horizontal(lfs, args).map_err(PyPolarsErr::from)?;
345    Ok(lf.into())
346}
347
348#[pyfunction]
349pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {
350    let e = e.to_exprs();
351    let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;
352    Ok(e.into())
353}
354
355#[pyfunction]
356pub fn dtype_cols(dtypes: Vec<Wrap<DataType>>) -> PyResult<PyExpr> {
357    let dtypes = vec_extract_wrapped(dtypes);
358    Ok(dsl::dtype_cols(dtypes).into())
359}
360
361#[pyfunction]
362pub fn index_cols(indices: Vec<i64>) -> PyExpr {
363    if indices.len() == 1 {
364        dsl::nth(indices[0])
365    } else {
366        dsl::index_cols(indices)
367    }
368    .into()
369}
370
371#[pyfunction]
372#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]
373pub fn duration(
374    weeks: Option<PyExpr>,
375    days: Option<PyExpr>,
376    hours: Option<PyExpr>,
377    minutes: Option<PyExpr>,
378    seconds: Option<PyExpr>,
379    milliseconds: Option<PyExpr>,
380    microseconds: Option<PyExpr>,
381    nanoseconds: Option<PyExpr>,
382    time_unit: Wrap<TimeUnit>,
383) -> PyExpr {
384    set_unwrapped_or_0!(
385        weeks,
386        days,
387        hours,
388        minutes,
389        seconds,
390        milliseconds,
391        microseconds,
392        nanoseconds,
393    );
394    let args = DurationArgs {
395        weeks,
396        days,
397        hours,
398        minutes,
399        seconds,
400        milliseconds,
401        microseconds,
402        nanoseconds,
403        time_unit: time_unit.0,
404    };
405    dsl::duration(args).into()
406}
407
408#[pyfunction]
409pub fn first() -> PyExpr {
410    dsl::first().into()
411}
412
413#[pyfunction]
414pub fn fold(acc: PyExpr, lambda: PyObject, exprs: Vec<PyExpr>) -> PyExpr {
415    let exprs = exprs.to_exprs();
416
417    let func = move |a: Column, b: Column| {
418        binary_lambda(
419            &lambda,
420            a.take_materialized_series(),
421            b.take_materialized_series(),
422        )
423        .map(|v| v.map(Column::from))
424    };
425    dsl::fold_exprs(acc.inner, func, exprs).into()
426}
427
428#[pyfunction]
429pub fn last() -> PyExpr {
430    dsl::last().into()
431}
432
433#[pyfunction]
434pub fn nth(n: i64) -> PyExpr {
435    dsl::nth(n).into()
436}
437
438#[pyfunction]
439pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {
440    let py = value.py();
441    if value.is_instance_of::<PyBool>() {
442        let val = value.extract::<bool>()?;
443        Ok(dsl::lit(val).into())
444    } else if let Ok(int) = value.downcast::<PyInt>() {
445        let v = int
446            .extract::<i128>()
447            .map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))
448            .map_err(PyPolarsErr::from)?;
449        Ok(Expr::Literal(LiteralValue::Int(v)).into())
450    } else if let Ok(float) = value.downcast::<PyFloat>() {
451        let val = float.extract::<f64>()?;
452        Ok(Expr::Literal(LiteralValue::Float(val)).into())
453    } else if let Ok(pystr) = value.downcast::<PyString>() {
454        Ok(dsl::lit(pystr.to_string()).into())
455    } else if let Ok(series) = value.extract::<PySeries>() {
456        let s = series.series;
457        if is_scalar {
458            let av = s
459                .get(0)
460                .map_err(|_| PyValueError::new_err("expected at least 1 value"))?;
461            let av = av.into_static();
462            Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())
463        } else {
464            Ok(dsl::lit(s).into())
465        }
466    } else if value.is_none() {
467        Ok(dsl::lit(Null {}).into())
468    } else if let Ok(value) = value.downcast::<PyBytes>() {
469        Ok(dsl::lit(value.as_bytes()).into())
470    } else {
471        let av = py_object_to_any_value(value, true, allow_object).map_err(|_| {
472            PyTypeError::new_err(
473                format!(
474                    "cannot create expression literal for value of type {}.\
475                    \n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",
476                    value.get_type().qualname().map(|s|s.to_string()).unwrap_or("unknown".to_owned()),
477                )
478            )
479        })?;
480        match av {
481            #[cfg(feature = "object")]
482            AnyValue::ObjectOwned(_) => {
483                let s = PySeries::new_object(py, "", vec![value.extract()?], false).series;
484                Ok(dsl::lit(s).into())
485            },
486            _ => Ok(Expr::Literal(LiteralValue::from(av)).into()),
487        }
488    }
489}
490
491#[pyfunction]
492#[pyo3(signature = (pyexpr, lambda, output_type, map_groups, returns_scalar))]
493pub fn map_mul(
494    py: Python,
495    pyexpr: Vec<PyExpr>,
496    lambda: PyObject,
497    output_type: Option<Wrap<DataType>>,
498    map_groups: bool,
499    returns_scalar: bool,
500) -> PyExpr {
501    map::lazy::map_mul(&pyexpr, py, lambda, output_type, map_groups, returns_scalar)
502}
503
504#[pyfunction]
505pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {
506    dsl::pearson_corr(a.inner, b.inner).into()
507}
508
509#[pyfunction]
510pub fn reduce(lambda: PyObject, exprs: Vec<PyExpr>) -> PyExpr {
511    let exprs = exprs.to_exprs();
512
513    let func = move |a: Column, b: Column| {
514        binary_lambda(
515            &lambda,
516            a.take_materialized_series(),
517            b.take_materialized_series(),
518        )
519        .map(|v| v.map(Column::from))
520    };
521    dsl::reduce_exprs(func, exprs).into()
522}
523
524#[pyfunction]
525#[pyo3(signature = (value, n, dtype=None))]
526pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyResult<PyExpr> {
527    let mut value = value.inner;
528    let n = n.inner;
529
530    if let Some(dtype) = dtype {
531        value = value.cast(dtype.0);
532    }
533
534    Ok(dsl::repeat(value, n).into())
535}
536
537#[pyfunction]
538pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {
539    #[cfg(feature = "propagate_nans")]
540    {
541        dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()
542    }
543    #[cfg(not(feature = "propagate_nans"))]
544    {
545        panic!("activate 'propagate_nans'")
546    }
547}
548
// Python binding for `polars::sql::sql_expr`; a Polars error is converted to
// a Python error. Only compiled when the "sql" feature is enabled.
#[pyfunction]
#[cfg(feature = "sql")]
pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {
    match polars::sql::sql_expr(sql) {
        Ok(parsed) => Ok(PyExpr::from(parsed)),
        Err(err) => Err(PyErr::from(PyPolarsErr::from(err))),
    }
}