Skip to main content

polars_python/functions/
lazy.rs

1use polars::lazy::dsl;
2use polars::prelude::*;
3use polars_plan::plans::DynLiteralValue;
4use polars_plan::prelude::UnionArgs;
5use polars_utils::python_function::PythonObject;
6use pyo3::exceptions::{PyTypeError, PyValueError};
7use pyo3::prelude::*;
8use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
9
10use crate::conversion::any_value::py_object_to_any_value;
11use crate::conversion::{Wrap, get_lf};
12use crate::error::PyPolarsErr;
13use crate::expr::ToExprs;
14use crate::expr::datatype::PyDataTypeExpr;
15use crate::lazyframe::PyOptFlags;
16use crate::utils::EnterPolarsExt;
17use crate::{PyDataFrame, PyExpr, PyLazyFrame, PySeries, map};
18
/// Rebind each listed `Option<PyExpr>` variable to its inner `Expr`, substituting the
/// literal `0` when the argument was not supplied.
///
/// The `0` literal is only constructed for arguments that are actually `None`.
macro_rules! set_unwrapped_or_0 {
    ($($var:ident),+ $(,)?) => {
        $(let $var = $var.map_or_else(|| dsl::lit(0), |e| e.inner);)+
    };
}
24
25#[pyfunction]
26pub fn rolling_corr(
27    x: PyExpr,
28    y: PyExpr,
29    window_size: IdxSize,
30    min_periods: IdxSize,
31    ddof: u8,
32) -> PyExpr {
33    dsl::rolling_corr(
34        x.inner,
35        y.inner,
36        RollingCovOptions {
37            min_periods,
38            window_size,
39            ddof,
40        },
41    )
42    .into()
43}
44
45#[pyfunction]
46pub fn rolling_cov(
47    x: PyExpr,
48    y: PyExpr,
49    window_size: IdxSize,
50    min_periods: IdxSize,
51    ddof: u8,
52) -> PyExpr {
53    dsl::rolling_cov(
54        x.inner,
55        y.inner,
56        RollingCovOptions {
57            min_periods,
58            window_size,
59            ddof,
60        },
61    )
62    .into()
63}
64
65#[pyfunction]
66pub fn arg_sort_by(
67    by: Vec<PyExpr>,
68    descending: Vec<bool>,
69    nulls_last: Vec<bool>,
70    multithreaded: bool,
71    maintain_order: bool,
72) -> PyExpr {
73    let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();
74    dsl::arg_sort_by(
75        by,
76        SortMultipleOptions {
77            descending,
78            nulls_last,
79            multithreaded,
80            maintain_order,
81            limit: None,
82        },
83    )
84    .into()
85}
86#[pyfunction]
87pub fn arg_where(condition: PyExpr) -> PyExpr {
88    dsl::arg_where(condition.inner).into()
89}
90
91#[pyfunction]
92pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {
93    let exprs = exprs.to_exprs();
94    if exprs.is_empty() {
95        return Err(PyValueError::new_err(
96            "expected at least 1 expression in 'as_struct'",
97        ));
98    }
99    Ok(dsl::as_struct(exprs).into())
100}
101
102#[pyfunction]
103pub fn field(names: Vec<String>) -> PyExpr {
104    dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()
105}
106
107#[pyfunction]
108pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {
109    let exprs = exprs.to_exprs();
110    dsl::coalesce(&exprs).into()
111}
112
113#[pyfunction]
114pub fn col(name: &str) -> PyExpr {
115    dsl::col(name).into()
116}
117
118#[pyfunction]
119pub fn element() -> PyExpr {
120    dsl::element().into()
121}
122
123fn lfs_to_plans(lfs: Vec<PyLazyFrame>) -> Vec<DslPlan> {
124    lfs.into_iter()
125        .map(|lf| lf.ldf.into_inner().logical_plan)
126        .collect()
127}
128
129#[pyfunction]
130pub fn collect_all(
131    lfs: Vec<PyLazyFrame>,
132    engine: Wrap<Engine>,
133    optflags: PyOptFlags,
134    py: Python<'_>,
135) -> PyResult<Vec<PyDataFrame>> {
136    let plans = lfs_to_plans(lfs);
137    let dfs = py.enter_polars(|| {
138        LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
139    })?;
140    Ok(dfs.into_iter().map(Into::into).collect())
141}
142
143#[pyfunction]
144pub fn collect_all_lazy(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags) -> PyResult<PyLazyFrame> {
145    let plans = lfs_to_plans(lfs);
146
147    for plan in &plans {
148        if !matches!(plan, DslPlan::Sink { .. }) {
149            return Err(PyValueError::new_err(
150                "all LazyFrames must end with a sink to use 'collect_all(lazy=True)'",
151            ));
152        }
153    }
154
155    Ok(LazyFrame::from_logical_plan(
156        DslPlan::SinkMultiple { inputs: plans },
157        optflags.inner.into_inner(),
158    )
159    .into())
160}
161
162#[pyfunction]
163pub fn explain_all(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags, py: Python) -> PyResult<String> {
164    let plans = lfs_to_plans(lfs);
165    let explained =
166        py.enter_polars(|| LazyFrame::explain_all(plans, optflags.inner.into_inner()))?;
167    Ok(explained)
168}
169
170#[pyfunction]
171pub fn collect_all_with_callback(
172    lfs: Vec<PyLazyFrame>,
173    engine: Wrap<Engine>,
174    optflags: PyOptFlags,
175    lambda: Py<PyAny>,
176    py: Python<'_>,
177) {
178    let plans = lfs
179        .into_iter()
180        .map(|lf| lf.ldf.into_inner().logical_plan)
181        .collect();
182    let result = py
183        .enter_polars(|| {
184            LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
185        })
186        .map(|dfs| {
187            dfs.into_iter()
188                .map(Into::into)
189                .collect::<Vec<PyDataFrame>>()
190        });
191
192    Python::attach(|py| match result {
193        Ok(dfs) => {
194            lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
195        },
196        Err(err) => {
197            lambda
198                .call1(py, (PyErr::from(err),))
199                .map_err(|err| err.restore(py))
200                .ok();
201        },
202    })
203}
204
205#[pyfunction]
206pub fn concat_lf(
207    seq: &Bound<'_, PyAny>,
208    rechunk: bool,
209    parallel: bool,
210    to_supertypes: bool,
211    maintain_order: bool,
212) -> PyResult<PyLazyFrame> {
213    let len = seq.len()?;
214    let mut lfs = Vec::with_capacity(len);
215
216    for res in seq.try_iter()? {
217        let item = res?;
218        let lf = get_lf(&item)?;
219        lfs.push(lf);
220    }
221
222    let lf = dsl::concat(
223        lfs,
224        UnionArgs {
225            rechunk,
226            parallel,
227            to_supertypes,
228            maintain_order,
229            ..Default::default()
230        },
231    )
232    .map_err(PyPolarsErr::from)?;
233    Ok(lf.into())
234}
235
236#[pyfunction]
237pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
238    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
239    let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;
240    Ok(expr.into())
241}
242
243#[pyfunction]
244pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {
245    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
246    let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;
247    Ok(expr.into())
248}
249
250#[pyfunction]
251pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
252    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
253    dsl::concat_str(s, separator, ignore_nulls).into()
254}
255
256#[pyfunction]
257pub fn len() -> PyExpr {
258    dsl::len().into()
259}
260
261#[pyfunction]
262pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {
263    dsl::cov(a.inner, b.inner, ddof).into()
264}
265
/// Build the two-argument arctangent expression `y.arctan2(x)`.
#[pyfunction]
#[cfg(feature = "trigonometry")]
pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {
    let angle = y.inner.arctan2(x.inner);
    PyExpr::from(angle)
}
271
272#[pyfunction]
273pub fn cum_fold(
274    acc: PyExpr,
275    lambda: Py<PyAny>,
276    exprs: Vec<PyExpr>,
277    returns_scalar: bool,
278    return_dtype: Option<PyDataTypeExpr>,
279    include_init: bool,
280) -> PyExpr {
281    let exprs = exprs.to_exprs();
282    let func = PlanCallback::new_python(PythonObject(lambda));
283    dsl::cum_fold_exprs(
284        acc.inner,
285        func,
286        exprs,
287        returns_scalar,
288        return_dtype.map(|v| v.inner),
289        include_init,
290    )
291    .into()
292}
293
294#[pyfunction]
295pub fn cum_reduce(
296    lambda: Py<PyAny>,
297    exprs: Vec<PyExpr>,
298    returns_scalar: bool,
299    return_dtype: Option<PyDataTypeExpr>,
300) -> PyExpr {
301    let exprs = exprs.to_exprs();
302
303    let func = PlanCallback::new_python(PythonObject(lambda));
304    dsl::cum_reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
305}
306
307#[pyfunction]
308#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=Wrap(None), ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]
309pub fn datetime(
310    year: PyExpr,
311    month: PyExpr,
312    day: PyExpr,
313    hour: Option<PyExpr>,
314    minute: Option<PyExpr>,
315    second: Option<PyExpr>,
316    microsecond: Option<PyExpr>,
317    time_unit: Wrap<TimeUnit>,
318    time_zone: Wrap<Option<TimeZone>>,
319    ambiguous: PyExpr,
320) -> PyExpr {
321    let year = year.inner;
322    let month = month.inner;
323    let day = day.inner;
324    set_unwrapped_or_0!(hour, minute, second, microsecond);
325    let ambiguous = ambiguous.inner;
326    let time_unit = time_unit.0;
327    let time_zone = time_zone.0;
328    let args = DatetimeArgs {
329        year,
330        month,
331        day,
332        hour,
333        minute,
334        second,
335        microsecond,
336        time_unit,
337        time_zone,
338        ambiguous,
339    };
340    dsl::datetime(args).into()
341}
342
343#[pyfunction]
344pub fn concat_lf_diagonal(
345    lfs: &Bound<'_, PyAny>,
346    rechunk: bool,
347    parallel: bool,
348    to_supertypes: bool,
349    maintain_order: bool,
350) -> PyResult<PyLazyFrame> {
351    let iter = lfs.try_iter()?;
352
353    let lfs = iter
354        .map(|item| {
355            let item = item?;
356            get_lf(&item)
357        })
358        .collect::<PyResult<Vec<_>>>()?;
359
360    let lf = dsl::functions::concat_lf_diagonal(
361        lfs,
362        UnionArgs {
363            rechunk,
364            parallel,
365            to_supertypes,
366            maintain_order,
367            ..Default::default()
368        },
369    )
370    .map_err(PyPolarsErr::from)?;
371    Ok(lf.into())
372}
373
374#[pyfunction]
375pub fn concat_lf_horizontal(
376    lfs: &Bound<'_, PyAny>,
377    parallel: bool,
378    strict: bool,
379) -> PyResult<PyLazyFrame> {
380    let iter = lfs.try_iter()?;
381
382    let lfs = iter
383        .map(|item| {
384            let item = item?;
385            get_lf(&item)
386        })
387        .collect::<PyResult<Vec<_>>>()?;
388
389    let lf = dsl::functions::concat_lf_horizontal(
390        lfs,
391        HConcatOptions {
392            parallel,
393            strict,
394            broadcast_unit_length: Default::default(),
395        },
396    )
397    .map_err(PyPolarsErr::from)?;
398    Ok(lf.into())
399}
400
401#[pyfunction]
402pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {
403    let e = e.to_exprs();
404    let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;
405    Ok(e.into())
406}
407
408#[pyfunction]
409#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]
410pub fn duration(
411    weeks: Option<PyExpr>,
412    days: Option<PyExpr>,
413    hours: Option<PyExpr>,
414    minutes: Option<PyExpr>,
415    seconds: Option<PyExpr>,
416    milliseconds: Option<PyExpr>,
417    microseconds: Option<PyExpr>,
418    nanoseconds: Option<PyExpr>,
419    time_unit: Wrap<TimeUnit>,
420) -> PyExpr {
421    set_unwrapped_or_0!(
422        weeks,
423        days,
424        hours,
425        minutes,
426        seconds,
427        milliseconds,
428        microseconds,
429        nanoseconds,
430    );
431    let args = DurationArgs {
432        weeks,
433        days,
434        hours,
435        minutes,
436        seconds,
437        milliseconds,
438        microseconds,
439        nanoseconds,
440        time_unit: time_unit.0,
441    };
442    dsl::duration(args).into()
443}
444
445#[pyfunction]
446pub fn fold(
447    acc: PyExpr,
448    lambda: Py<PyAny>,
449    exprs: Vec<PyExpr>,
450    returns_scalar: bool,
451    return_dtype: Option<PyDataTypeExpr>,
452) -> PyExpr {
453    let exprs = exprs.to_exprs();
454    let func = PlanCallback::new_python(PythonObject(lambda));
455    dsl::fold_exprs(
456        acc.inner,
457        func,
458        exprs,
459        returns_scalar,
460        return_dtype.map(|w| w.inner),
461    )
462    .into()
463}
464
/// Convert a Python value into a literal expression.
///
/// The value's type is checked in this order: `bool`, `int`, `float`, `str`, `Series`,
/// `None`, `bytes`. Anything else goes through `py_object_to_any_value`.
///
/// * `int` becomes a dynamic integer literal and must fit in `i128`.
/// * `float` becomes a dynamic float literal.
/// * `allow_object` permits falling back to an Object-typed literal for values that have
///   no dedicated conversion.
/// * `is_scalar` only affects `Series` input. When set, only the value at index 0 is used,
///   as a `Scalar` literal; otherwise the whole series becomes the literal.
///
/// # Errors
/// * `InvalidOperation` (via `PyPolarsErr`) for integers that do not fit in `i128`.
/// * `ValueError` when `is_scalar` is set and the `Series` has no value at index 0.
/// * `TypeError` when the value cannot be converted, or converts to an object while
///   `allow_object` is false.
#[pyfunction]
pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {
    // Only used by the `object`-feature branch at the bottom.
    let py = value.py();
    // `bool` must be tested before `int`: Python's `bool` is a subclass of `int`.
    if value.is_instance_of::<PyBool>() {
        let val = value.extract::<bool>()?;
        Ok(dsl::lit(val).into())
    } else if let Ok(int) = value.cast::<PyInt>() {
        // Keep the integer dynamically typed; the concrete dtype is resolved later.
        let v = int
            .extract::<i128>()
            .map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))
            .map_err(PyPolarsErr::from)?;
        Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Int(v))).into())
    } else if let Ok(float) = value.cast::<PyFloat>() {
        let val = float.extract::<f64>()?;
        Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Float(val))).into())
    } else if let Ok(pystr) = value.cast::<PyString>() {
        Ok(dsl::lit(pystr.to_string()).into())
    } else if let Ok(series) = value.extract::<PySeries>() {
        let s = series.series.into_inner();
        if is_scalar {
            // Use only the first value, keeping the series' dtype.
            let av = s
                .get(0)
                .map_err(|_| PyValueError::new_err("expected at least 1 value"))?;
            let av = av.into_static();
            Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())
        } else {
            Ok(dsl::lit(s).into())
        }
    } else if value.is_none() {
        Ok(dsl::lit(Null {}).into())
    } else if let Ok(value) = value.cast::<PyBytes>() {
        Ok(dsl::lit(value.as_bytes()).into())
    } else {
        // Builds the `TypeError` reported for unsupported values; it includes the
        // Python type's qualified name, or "unknown" if that cannot be read.
        let raise = || {
            PyTypeError::new_err(format!(
                "cannot create expression literal for value of type {}.\
                    \n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",
                value
                    .get_type()
                    .qualname()
                    .map(|s| s.to_string())
                    .unwrap_or("unknown".to_owned()),
            ))
        };

        // NOTE(review): the second argument `true` presumably selects strict conversion —
        // confirm against `py_object_to_any_value`. Any conversion error is replaced by the
        // `TypeError` above.
        let av = py_object_to_any_value(value, true, allow_object).map_err(|_| raise())?;
        match av {
            #[cfg(feature = "object")]
            AnyValue::ObjectOwned(_) => {
                // Check again for object allowance as for cached addresses this is not checked.
                if allow_object {
                    // Wrap the original Python object in a one-element Object series.
                    let s = PySeries::new_object(py, "", vec![value.extract()?], false)
                        .series
                        .into_inner();
                    Ok(dsl::lit(s).into())
                } else {
                    Err(raise())
                }
            },
            // Any other converted value becomes a literal directly.
            _ => Ok(Expr::Literal(LiteralValue::from(av)).into()),
        }
    }
}
528
529#[pyfunction]
530#[pyo3(signature = (pyexpr, lambda, output_type, is_elementwise, returns_scalar))]
531pub fn map_expr(
532    pyexpr: Vec<PyExpr>,
533    lambda: Py<PyAny>,
534    output_type: Option<PyDataTypeExpr>,
535    is_elementwise: bool,
536    returns_scalar: bool,
537) -> PyExpr {
538    map::lazy::map_expr(&pyexpr, lambda, output_type, is_elementwise, returns_scalar)
539}
540
541#[pyfunction]
542pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {
543    dsl::pearson_corr(a.inner, b.inner).into()
544}
545
546#[pyfunction]
547pub fn reduce(
548    lambda: Py<PyAny>,
549    exprs: Vec<PyExpr>,
550    returns_scalar: bool,
551    return_dtype: Option<PyDataTypeExpr>,
552) -> PyExpr {
553    let exprs = exprs.to_exprs();
554    let func = PlanCallback::new_python(PythonObject(lambda));
555    dsl::reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
556}
557
558#[pyfunction]
559#[pyo3(signature = (value, n, dtype=None))]
560pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyExpr {
561    let mut value = value.inner;
562    let n = n.inner;
563
564    if let Some(dtype) = dtype {
565        value = value.cast(dtype.0);
566    }
567
568    dsl::repeat(value, n).into()
569}
570
571#[pyfunction]
572pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {
573    #[cfg(feature = "propagate_nans")]
574    {
575        dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()
576    }
577    #[cfg(not(feature = "propagate_nans"))]
578    {
579        panic!("activate 'propagate_nans'")
580    }
581}
582
/// Parse the SQL expression string `sql` into an expression via `polars::sql::sql_expr`.
///
/// # Errors
/// Any Polars error from parsing is raised to Python.
#[pyfunction]
#[cfg(feature = "sql")]
pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {
    let parsed = polars::sql::sql_expr(sql).map_err(PyPolarsErr::from)?;
    Ok(PyExpr::from(parsed))
}