polars_python/functions/
lazy.rs

1use polars::lazy::dsl;
2use polars::prelude::*;
3use polars_plan::plans::DynLiteralValue;
4use polars_plan::prelude::UnionArgs;
5use pyo3::exceptions::{PyTypeError, PyValueError};
6use pyo3::prelude::*;
7use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
8
9use crate::conversion::any_value::py_object_to_any_value;
10use crate::conversion::{Wrap, get_lf};
11use crate::error::PyPolarsErr;
12use crate::expr::ToExprs;
13use crate::lazyframe::PyOptFlags;
14use crate::map::lazy::binary_lambda;
15use crate::prelude::vec_extract_wrapped;
16use crate::utils::EnterPolarsExt;
17use crate::{PyDataFrame, PyExpr, PyLazyFrame, PySeries, map};
18
/// Rebinds each listed `Option` variable to its wrapped `.inner` value, or to
/// `dsl::lit(0)` when the variable is `None`.
///
/// Accepts one or more identifiers, with an optional trailing comma. The
/// fallback literal is only constructed for variables that are actually `None`.
macro_rules! set_unwrapped_or_0 {
    ($($var:ident),+ $(,)?) => {
        $(
            let $var = match $var {
                Some(wrapped) => wrapped.inner,
                None => dsl::lit(0),
            };
        )+
    };
}
24
25#[pyfunction]
26pub fn rolling_corr(
27    x: PyExpr,
28    y: PyExpr,
29    window_size: IdxSize,
30    min_periods: IdxSize,
31    ddof: u8,
32) -> PyExpr {
33    dsl::rolling_corr(
34        x.inner,
35        y.inner,
36        RollingCovOptions {
37            min_periods,
38            window_size,
39            ddof,
40        },
41    )
42    .into()
43}
44
45#[pyfunction]
46pub fn rolling_cov(
47    x: PyExpr,
48    y: PyExpr,
49    window_size: IdxSize,
50    min_periods: IdxSize,
51    ddof: u8,
52) -> PyExpr {
53    dsl::rolling_cov(
54        x.inner,
55        y.inner,
56        RollingCovOptions {
57            min_periods,
58            window_size,
59            ddof,
60        },
61    )
62    .into()
63}
64
65#[pyfunction]
66pub fn arg_sort_by(
67    by: Vec<PyExpr>,
68    descending: Vec<bool>,
69    nulls_last: Vec<bool>,
70    multithreaded: bool,
71    maintain_order: bool,
72) -> PyExpr {
73    let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();
74    dsl::arg_sort_by(
75        by,
76        SortMultipleOptions {
77            descending,
78            nulls_last,
79            multithreaded,
80            maintain_order,
81            limit: None,
82        },
83    )
84    .into()
85}
86#[pyfunction]
87pub fn arg_where(condition: PyExpr) -> PyExpr {
88    dsl::arg_where(condition.inner).into()
89}
90
91#[pyfunction]
92pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {
93    let exprs = exprs.to_exprs();
94    if exprs.is_empty() {
95        return Err(PyValueError::new_err(
96            "expected at least 1 expression in 'as_struct'",
97        ));
98    }
99    Ok(dsl::as_struct(exprs).into())
100}
101
102#[pyfunction]
103pub fn field(names: Vec<String>) -> PyExpr {
104    dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()
105}
106
107#[pyfunction]
108pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {
109    let exprs = exprs.to_exprs();
110    dsl::coalesce(&exprs).into()
111}
112
113#[pyfunction]
114pub fn col(name: &str) -> PyExpr {
115    dsl::col(name).into()
116}
117
118fn lfs_to_plans(lfs: Vec<PyLazyFrame>) -> Vec<DslPlan> {
119    lfs.into_iter().map(|lf| lf.ldf.logical_plan).collect()
120}
121
122#[pyfunction]
123pub fn collect_all(
124    lfs: Vec<PyLazyFrame>,
125    engine: Wrap<Engine>,
126    optflags: PyOptFlags,
127    py: Python<'_>,
128) -> PyResult<Vec<PyDataFrame>> {
129    let plans = lfs_to_plans(lfs);
130    let dfs =
131        py.enter_polars(|| LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner))?;
132    Ok(dfs.into_iter().map(Into::into).collect())
133}
134
135#[pyfunction]
136pub fn explain_all(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags, py: Python) -> PyResult<String> {
137    let plans = lfs_to_plans(lfs);
138    let explained = py.enter_polars(|| LazyFrame::explain_all(plans, optflags.inner))?;
139    Ok(explained)
140}
141
142#[pyfunction]
143pub fn collect_all_with_callback(
144    lfs: Vec<PyLazyFrame>,
145    engine: Wrap<Engine>,
146    optflags: PyOptFlags,
147    lambda: PyObject,
148    py: Python<'_>,
149) {
150    let plans = lfs.into_iter().map(|lf| lf.ldf.logical_plan).collect();
151    let result = py
152        .enter_polars(|| LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner))
153        .map(|dfs| {
154            dfs.into_iter()
155                .map(Into::into)
156                .collect::<Vec<PyDataFrame>>()
157        });
158
159    Python::with_gil(|py| match result {
160        Ok(dfs) => {
161            lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
162        },
163        Err(err) => {
164            lambda
165                .call1(py, (PyErr::from(err),))
166                .map_err(|err| err.restore(py))
167                .ok();
168        },
169    })
170}
171
172#[pyfunction]
173pub fn cols(names: Vec<String>) -> PyExpr {
174    dsl::cols(names).into()
175}
176
177#[pyfunction]
178pub fn concat_lf(
179    seq: &Bound<'_, PyAny>,
180    rechunk: bool,
181    parallel: bool,
182    to_supertypes: bool,
183) -> PyResult<PyLazyFrame> {
184    let len = seq.len()?;
185    let mut lfs = Vec::with_capacity(len);
186
187    for res in seq.try_iter()? {
188        let item = res?;
189        let lf = get_lf(&item)?;
190        lfs.push(lf);
191    }
192
193    let lf = dsl::concat(
194        lfs,
195        UnionArgs {
196            rechunk,
197            parallel,
198            to_supertypes,
199            ..Default::default()
200        },
201    )
202    .map_err(PyPolarsErr::from)?;
203    Ok(lf.into())
204}
205
206#[pyfunction]
207pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
208    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
209    let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;
210    Ok(expr.into())
211}
212
213#[pyfunction]
214pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {
215    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
216    let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;
217    Ok(expr.into())
218}
219
220#[pyfunction]
221pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
222    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
223    dsl::concat_str(s, separator, ignore_nulls).into()
224}
225
226#[pyfunction]
227pub fn len() -> PyExpr {
228    dsl::len().into()
229}
230
231#[pyfunction]
232pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {
233    dsl::cov(a.inner, b.inner, ddof).into()
234}
235
/// Builds the expression `y.arctan2(x)`.
///
/// Only compiled when the `trigonometry` feature is enabled.
#[pyfunction]
#[cfg(feature = "trigonometry")]
pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {
    let (y_expr, x_expr) = (y.inner, x.inner);
    PyExpr::from(y_expr.arctan2(x_expr))
}
241
242#[pyfunction]
243pub fn cum_fold(acc: PyExpr, lambda: PyObject, exprs: Vec<PyExpr>, include_init: bool) -> PyExpr {
244    let exprs = exprs.to_exprs();
245
246    let func = move |a: Column, b: Column| {
247        binary_lambda(
248            &lambda,
249            a.take_materialized_series(),
250            b.take_materialized_series(),
251        )
252        .map(|v| v.map(Column::from))
253    };
254    dsl::cum_fold_exprs(acc.inner, func, exprs, include_init).into()
255}
256
257#[pyfunction]
258pub fn cum_reduce(lambda: PyObject, exprs: Vec<PyExpr>) -> PyExpr {
259    let exprs = exprs.to_exprs();
260
261    let func = move |a: Column, b: Column| {
262        binary_lambda(
263            &lambda,
264            a.take_materialized_series(),
265            b.take_materialized_series(),
266        )
267        .map(|v| v.map(Column::from))
268    };
269    dsl::cum_reduce_exprs(func, exprs).into()
270}
271
272#[pyfunction]
273#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=Wrap(None), ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]
274pub fn datetime(
275    year: PyExpr,
276    month: PyExpr,
277    day: PyExpr,
278    hour: Option<PyExpr>,
279    minute: Option<PyExpr>,
280    second: Option<PyExpr>,
281    microsecond: Option<PyExpr>,
282    time_unit: Wrap<TimeUnit>,
283    time_zone: Wrap<Option<TimeZone>>,
284    ambiguous: PyExpr,
285) -> PyExpr {
286    let year = year.inner;
287    let month = month.inner;
288    let day = day.inner;
289    set_unwrapped_or_0!(hour, minute, second, microsecond);
290    let ambiguous = ambiguous.inner;
291    let time_unit = time_unit.0;
292    let time_zone = time_zone.0;
293    let args = DatetimeArgs {
294        year,
295        month,
296        day,
297        hour,
298        minute,
299        second,
300        microsecond,
301        time_unit,
302        time_zone,
303        ambiguous,
304    };
305    dsl::datetime(args).into()
306}
307
308#[pyfunction]
309pub fn concat_lf_diagonal(
310    lfs: &Bound<'_, PyAny>,
311    rechunk: bool,
312    parallel: bool,
313    to_supertypes: bool,
314) -> PyResult<PyLazyFrame> {
315    let iter = lfs.try_iter()?;
316
317    let lfs = iter
318        .map(|item| {
319            let item = item?;
320            get_lf(&item)
321        })
322        .collect::<PyResult<Vec<_>>>()?;
323
324    let lf = dsl::functions::concat_lf_diagonal(
325        lfs,
326        UnionArgs {
327            rechunk,
328            parallel,
329            to_supertypes,
330            ..Default::default()
331        },
332    )
333    .map_err(PyPolarsErr::from)?;
334    Ok(lf.into())
335}
336
337#[pyfunction]
338pub fn concat_lf_horizontal(lfs: &Bound<'_, PyAny>, parallel: bool) -> PyResult<PyLazyFrame> {
339    let iter = lfs.try_iter()?;
340
341    let lfs = iter
342        .map(|item| {
343            let item = item?;
344            get_lf(&item)
345        })
346        .collect::<PyResult<Vec<_>>>()?;
347
348    let args = UnionArgs {
349        rechunk: false, // No need to rechunk with horizontal concatenation
350        parallel,
351        to_supertypes: false,
352        ..Default::default()
353    };
354    let lf = dsl::functions::concat_lf_horizontal(lfs, args).map_err(PyPolarsErr::from)?;
355    Ok(lf.into())
356}
357
358#[pyfunction]
359pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {
360    let e = e.to_exprs();
361    let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;
362    Ok(e.into())
363}
364
365#[pyfunction]
366pub fn dtype_cols(dtypes: Vec<Wrap<DataType>>) -> PyResult<PyExpr> {
367    let dtypes = vec_extract_wrapped(dtypes);
368    Ok(dsl::dtype_cols(dtypes).into())
369}
370
371#[pyfunction]
372pub fn index_cols(indices: Vec<i64>) -> PyExpr {
373    if indices.len() == 1 {
374        dsl::nth(indices[0])
375    } else {
376        dsl::index_cols(indices)
377    }
378    .into()
379}
380
381#[pyfunction]
382#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]
383pub fn duration(
384    weeks: Option<PyExpr>,
385    days: Option<PyExpr>,
386    hours: Option<PyExpr>,
387    minutes: Option<PyExpr>,
388    seconds: Option<PyExpr>,
389    milliseconds: Option<PyExpr>,
390    microseconds: Option<PyExpr>,
391    nanoseconds: Option<PyExpr>,
392    time_unit: Wrap<TimeUnit>,
393) -> PyExpr {
394    set_unwrapped_or_0!(
395        weeks,
396        days,
397        hours,
398        minutes,
399        seconds,
400        milliseconds,
401        microseconds,
402        nanoseconds,
403    );
404    let args = DurationArgs {
405        weeks,
406        days,
407        hours,
408        minutes,
409        seconds,
410        milliseconds,
411        microseconds,
412        nanoseconds,
413        time_unit: time_unit.0,
414    };
415    dsl::duration(args).into()
416}
417
418#[pyfunction]
419pub fn first() -> PyExpr {
420    dsl::first().into()
421}
422
423#[pyfunction]
424pub fn fold(
425    acc: PyExpr,
426    lambda: PyObject,
427    exprs: Vec<PyExpr>,
428    returns_scalar: bool,
429    return_dtype: Option<Wrap<DataType>>,
430) -> PyExpr {
431    let exprs = exprs.to_exprs();
432
433    let func = move |a: Column, b: Column| {
434        binary_lambda(
435            &lambda,
436            a.take_materialized_series(),
437            b.take_materialized_series(),
438        )
439        .map(|v| v.map(Column::from))
440    };
441    dsl::fold_exprs(
442        acc.inner,
443        func,
444        exprs,
445        returns_scalar,
446        return_dtype.map(|w| w.0),
447    )
448    .into()
449}
450
451#[pyfunction]
452pub fn last() -> PyExpr {
453    dsl::last().into()
454}
455
456#[pyfunction]
457pub fn nth(n: i64) -> PyExpr {
458    dsl::nth(n).into()
459}
460
461#[pyfunction]
462pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {
463    let py = value.py();
464    if value.is_instance_of::<PyBool>() {
465        let val = value.extract::<bool>()?;
466        Ok(dsl::lit(val).into())
467    } else if let Ok(int) = value.downcast::<PyInt>() {
468        let v = int
469            .extract::<i128>()
470            .map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))
471            .map_err(PyPolarsErr::from)?;
472        Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Int(v))).into())
473    } else if let Ok(float) = value.downcast::<PyFloat>() {
474        let val = float.extract::<f64>()?;
475        Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Float(val))).into())
476    } else if let Ok(pystr) = value.downcast::<PyString>() {
477        Ok(dsl::lit(pystr.to_string()).into())
478    } else if let Ok(series) = value.extract::<PySeries>() {
479        let s = series.series;
480        if is_scalar {
481            let av = s
482                .get(0)
483                .map_err(|_| PyValueError::new_err("expected at least 1 value"))?;
484            let av = av.into_static();
485            Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())
486        } else {
487            Ok(dsl::lit(s).into())
488        }
489    } else if value.is_none() {
490        Ok(dsl::lit(Null {}).into())
491    } else if let Ok(value) = value.downcast::<PyBytes>() {
492        Ok(dsl::lit(value.as_bytes()).into())
493    } else {
494        let av = py_object_to_any_value(value, true, allow_object).map_err(|_| {
495            PyTypeError::new_err(
496                format!(
497                    "cannot create expression literal for value of type {}.\
498                    \n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",
499                    value.get_type().qualname().map(|s|s.to_string()).unwrap_or("unknown".to_owned()),
500                )
501            )
502        })?;
503        match av {
504            #[cfg(feature = "object")]
505            AnyValue::ObjectOwned(_) => {
506                let s = PySeries::new_object(py, "", vec![value.extract()?], false).series;
507                Ok(dsl::lit(s).into())
508            },
509            _ => Ok(Expr::Literal(LiteralValue::from(av)).into()),
510        }
511    }
512}
513
514#[pyfunction]
515#[pyo3(signature = (pyexpr, lambda, output_type, map_groups, returns_scalar))]
516pub fn map_mul(
517    py: Python<'_>,
518    pyexpr: Vec<PyExpr>,
519    lambda: PyObject,
520    output_type: Option<Wrap<DataType>>,
521    map_groups: bool,
522    returns_scalar: bool,
523) -> PyExpr {
524    map::lazy::map_mul(&pyexpr, py, lambda, output_type, map_groups, returns_scalar)
525}
526
527#[pyfunction]
528pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {
529    dsl::pearson_corr(a.inner, b.inner).into()
530}
531
532#[pyfunction]
533pub fn reduce(lambda: PyObject, exprs: Vec<PyExpr>) -> PyExpr {
534    let exprs = exprs.to_exprs();
535
536    let func = move |a: Column, b: Column| {
537        binary_lambda(
538            &lambda,
539            a.take_materialized_series(),
540            b.take_materialized_series(),
541        )
542        .map(|v| v.map(Column::from))
543    };
544    dsl::reduce_exprs(func, exprs).into()
545}
546
547#[pyfunction]
548#[pyo3(signature = (value, n, dtype=None))]
549pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyExpr {
550    let mut value = value.inner;
551    let n = n.inner;
552
553    if let Some(dtype) = dtype {
554        value = value.cast(dtype.0);
555    }
556
557    dsl::repeat(value, n).into()
558}
559
560#[pyfunction]
561pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {
562    #[cfg(feature = "propagate_nans")]
563    {
564        dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()
565    }
566    #[cfg(not(feature = "propagate_nans"))]
567    {
568        panic!("activate 'propagate_nans'")
569    }
570}
571
/// Converts a SQL string into an expression via `polars::sql::sql_expr`.
///
/// Only compiled when the `sql` feature is enabled.
///
/// # Errors
///
/// Propagates the error returned by `polars::sql::sql_expr`.
#[pyfunction]
#[cfg(feature = "sql")]
pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {
    match polars::sql::sql_expr(sql) {
        Ok(expr) => Ok(expr.into()),
        Err(err) => Err(PyPolarsErr::from(err).into()),
    }
}
577}