// polars_plan/dsl/functions/horizontal.rs

1use super::*;
2
/// Output-type resolver shared by the cumulative fold/reduce builders.
///
/// Produces a struct field named after the first input field. The struct has one field per
/// input (keeping each input's name), and every field uses the common supertype of all input
/// dtypes.
///
/// # Errors
/// Returns an error, instead of panicking, when there are no input fields or when the input
/// dtypes have no common supertype.
#[cfg(feature = "dtype-struct")]
fn cum_fold_dtype() -> GetOutput {
    GetOutput::map_fields(|fields| {
        let first = fields.first().ok_or_else(|| {
            polars_err!(ComputeError: "cannot determine the output type of a cumulative fold without any inputs")
        })?;

        // Every struct field shares one dtype: the supertype of all inputs.
        let st = fields
            .iter()
            .skip(1)
            .try_fold(first.dtype.clone(), |st, fld| {
                get_supertype(&st, &fld.dtype).ok_or_else(|| {
                    polars_err!(
                        SchemaMismatch: "failed to determine supertype of {} and {}",
                        st,
                        fld.dtype
                    )
                })
            })?;

        let struct_fields = fields
            .iter()
            .map(|fld| Field::new(fld.name().clone(), st.clone()))
            .collect();

        Ok(Field::new(first.name.clone(), DataType::Struct(struct_fields)))
    })
}
21
22/// Accumulate over multiple columns horizontally / row wise.
23pub fn fold_exprs<F, E>(
24    acc: Expr,
25    f: F,
26    exprs: E,
27    returns_scalar: bool,
28    return_dtype: Option<DataType>,
29) -> Expr
30where
31    F: 'static + Fn(Column, Column) -> PolarsResult<Option<Column>> + Send + Sync,
32    E: AsRef<[Expr]>,
33{
34    let mut exprs_v = Vec::with_capacity(exprs.as_ref().len() + 1);
35    exprs_v.push(acc);
36    exprs_v.extend(exprs.as_ref().iter().cloned());
37    let exprs = exprs_v;
38
39    let function = new_column_udf(move |columns: &mut [Column]| {
40        let mut acc = columns.first().unwrap().clone();
41        for c in &columns[1..] {
42            if let Some(a) = f(acc.clone(), c.clone())? {
43                acc = a
44            }
45        }
46        Ok(Some(acc))
47    });
48
49    let output_type = return_dtype
50        .map(GetOutput::from_type)
51        .unwrap_or_else(|| GetOutput::first());
52
53    Expr::AnonymousFunction {
54        input: exprs,
55        function,
56        // Take the type of the accumulator.
57        output_type,
58        options: FunctionOptions::groupwise()
59            .with_fmt_str("fold")
60            .with_flags(|mut f| {
61                f |= FunctionFlags::INPUT_WILDCARD_EXPANSION;
62                f.set(FunctionFlags::RETURNS_SCALAR, returns_scalar);
63                f
64            }),
65    }
66}
67
68/// Analogous to [`Iterator::reduce`](std::iter::Iterator::reduce).
69///
70/// An accumulator is initialized to the series given by the first expression in `exprs`, and then each subsequent value
71/// of the accumulator is computed from `f(acc, next_expr_series)`. If `exprs` is empty, an error is returned when
72/// `collect` is called.
73pub fn reduce_exprs<F, E>(f: F, exprs: E) -> Expr
74where
75    F: 'static + Fn(Column, Column) -> PolarsResult<Option<Column>> + Send + Sync,
76    E: AsRef<[Expr]>,
77{
78    let exprs = exprs.as_ref().to_vec();
79
80    let function = new_column_udf(move |columns: &mut [Column]| {
81        let mut c_iter = columns.iter();
82
83        match c_iter.next() {
84            Some(acc) => {
85                let mut acc = acc.clone();
86
87                for c in c_iter {
88                    if let Some(a) = f(acc.clone(), c.clone())? {
89                        acc = a
90                    }
91                }
92                Ok(Some(acc))
93            },
94            None => Err(polars_err!(ComputeError: "`reduce` did not have any expressions to fold")),
95        }
96    });
97
98    Expr::AnonymousFunction {
99        input: exprs,
100        function,
101        output_type: GetOutput::super_type(),
102        options: FunctionOptions::aggregation()
103            .with_fmt_str("reduce")
104            .with_flags(|f| f | FunctionFlags::INPUT_WILDCARD_EXPANSION),
105    }
106}
107
/// Cumulatively reduce multiple columns horizontally / row wise.
///
/// The first input seeds the accumulator. Each following input is combined into it with
/// `f(acc, next)`, and a `None` from `f` keeps the previous accumulator. Every intermediate
/// accumulator (starting with the first input itself) is kept and renamed after the input that
/// produced it. All of them become the fields of a single struct column.
///
/// The struct is named after the first input. This matches the name declared by
/// `cum_fold_dtype`, which uses the first input field's name.
///
/// # Errors
/// Evaluation fails when `exprs` is empty or when `f` returns an error.
#[cfg(feature = "dtype-struct")]
pub fn cum_reduce_exprs<F, E>(f: F, exprs: E) -> Expr
where
    F: 'static + Fn(Column, Column) -> PolarsResult<Option<Column>> + Send + Sync,
    E: AsRef<[Expr]>,
{
    let exprs = exprs.as_ref().to_vec();

    let function = new_column_udf(move |columns: &mut [Column]| {
        let mut c_iter = columns.iter();

        match c_iter.next() {
            Some(first) => {
                // Use the first input's name so the runtime name matches the declared schema;
                // `acc` is renamed on every step and would end up with the last input's name.
                let out_name = first.name().clone();
                let mut acc = first.clone();
                let mut result = Vec::with_capacity(columns.len());
                result.push(acc.clone());

                for c in c_iter {
                    let name = c.name().clone();
                    if let Some(a) = f(acc.clone(), c.clone())? {
                        acc = a;
                    }
                    acc.rename(name);
                    result.push(acc.clone());
                }

                // Take the first non-unit field length, so a unit-length (e.g. literal) first
                // input does not pin the struct length to 1.
                // NOTE(review): relies on `StructChunked::from_columns` broadcasting unit-length
                // fields to `length` — confirm.
                let length = result
                    .iter()
                    .map(|c| c.len())
                    .find(|&len| len != 1)
                    .unwrap_or(1);

                StructChunked::from_columns(out_name, length, &result)
                    .map(|ca| Some(ca.into_column()))
            },
            None => Err(polars_err!(ComputeError: "`reduce` did not have any expressions to fold")),
        }
    });

    Expr::AnonymousFunction {
        input: exprs,
        function,
        output_type: cum_fold_dtype(),
        options: FunctionOptions::aggregation()
            .with_fmt_str("cum_reduce")
            .with_flags(|f| f | FunctionFlags::INPUT_WILDCARD_EXPANSION),
    }
}
150
/// Cumulatively fold multiple columns horizontally / row wise, starting from `acc`.
///
/// `acc` seeds the accumulator, and the expressions in `exprs` are combined into it from left
/// to right with `f(acc, next)`. Each accumulator returned by `f` is renamed after the input it
/// folded in and kept. A `None` from `f` leaves the accumulator unchanged and adds no field.
/// With `include_init`, the initial accumulator is kept as the first field.
///
/// All kept columns become the fields of a single struct column. The struct is named after the
/// first input expression (or after `acc` when `exprs` is empty), which is the name
/// `cum_fold_dtype` declares.
///
/// # Errors
/// Evaluation fails when `f` returns an error, or when no field would be produced
/// (`include_init` is false and `f` never returned a column).
#[cfg(feature = "dtype-struct")]
pub fn cum_fold_exprs<F, E>(acc: Expr, f: F, exprs: E, include_init: bool) -> Expr
where
    F: 'static + Fn(Column, Column) -> PolarsResult<Option<Column>> + Send + Sync,
    E: AsRef<[Expr]>,
{
    // The accumulator is appended *last*; the UDF below splits it back off.
    let mut exprs = exprs.as_ref().to_vec();
    exprs.push(acc);

    let function = new_column_udf(move |columns: &mut [Column]| {
        let (init, inputs) = columns
            .split_last()
            .expect("`cum_fold` always receives the accumulator as its last input");
        // Match the output name declared by `cum_fold_dtype`, i.e. the first input's name,
        // rather than whatever name the accumulator ends up with.
        let out_name = columns[0].name().clone();

        let mut acc = init.clone();
        let mut result = Vec::with_capacity(columns.len());
        if include_init {
            result.push(acc.clone());
        }

        for c in inputs {
            if let Some(mut a) = f(acc.clone(), c.clone())? {
                a.rename(c.name().clone());
                result.push(a.clone());
                acc = a;
            }
        }

        // Report an empty result as an error rather than indexing into it.
        polars_ensure!(
            !result.is_empty(),
            ComputeError: "`cum_fold` did not produce any columns to combine"
        );

        // Take the first non-unit field length, so a unit-length (e.g. literal) initial
        // accumulator kept via `include_init` does not pin the struct length to 1.
        // NOTE(review): relies on `StructChunked::from_columns` broadcasting unit-length
        // fields to `length` — confirm.
        let length = result
            .iter()
            .map(|c| c.len())
            .find(|&len| len != 1)
            .unwrap_or(1);

        StructChunked::from_columns(out_name, length, &result).map(|ca| Some(ca.into_column()))
    });

    Expr::AnonymousFunction {
        input: exprs,
        function,
        // NOTE(review): `cum_fold_dtype` declares one field per input with the accumulator
        // last. At runtime the initial value comes first (and only with `include_init`), and
        // `None` results from `f` add no field, so the declared field list may not match the
        // data — confirm.
        output_type: cum_fold_dtype(),
        options: FunctionOptions::aggregation()
            .with_fmt_str("cum_fold")
            .with_flags(|f| f | FunctionFlags::INPUT_WILDCARD_EXPANSION),
    }
}
192
193/// Create a new column with the bitwise-and of the elements in each row.
194///
195/// The name of the resulting column will be "all"; use [`alias`](Expr::alias) to choose a different name.
196pub fn all_horizontal<E: AsRef<[Expr]>>(exprs: E) -> PolarsResult<Expr> {
197    let exprs = exprs.as_ref().to_vec();
198    polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
199    // This will be reduced to `expr & expr` during conversion to IR.
200    Ok(Expr::n_ary(
201        FunctionExpr::Boolean(BooleanFunction::AllHorizontal),
202        exprs,
203    ))
204}
205
206/// Create a new column with the bitwise-or of the elements in each row.
207///
208/// The name of the resulting column will be "any"; use [`alias`](Expr::alias) to choose a different name.
209pub fn any_horizontal<E: AsRef<[Expr]>>(exprs: E) -> PolarsResult<Expr> {
210    let exprs = exprs.as_ref().to_vec();
211    polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
212    // This will be reduced to `expr | expr` during conversion to IR.
213    Ok(Expr::n_ary(
214        FunctionExpr::Boolean(BooleanFunction::AnyHorizontal),
215        exprs,
216    ))
217}
218
219/// Create a new column with the maximum value per row.
220///
221/// The name of the resulting column will be `"max"`; use [`alias`](Expr::alias) to choose a different name.
222pub fn max_horizontal<E: AsRef<[Expr]>>(exprs: E) -> PolarsResult<Expr> {
223    let exprs = exprs.as_ref().to_vec();
224    polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
225    Ok(Expr::n_ary(FunctionExpr::MaxHorizontal, exprs))
226}
227
228/// Create a new column with the minimum value per row.
229///
230/// The name of the resulting column will be `"min"`; use [`alias`](Expr::alias) to choose a different name.
231pub fn min_horizontal<E: AsRef<[Expr]>>(exprs: E) -> PolarsResult<Expr> {
232    let exprs = exprs.as_ref().to_vec();
233    polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
234    Ok(Expr::n_ary(FunctionExpr::MinHorizontal, exprs))
235}
236
237/// Sum all values horizontally across columns.
238pub fn sum_horizontal<E: AsRef<[Expr]>>(exprs: E, ignore_nulls: bool) -> PolarsResult<Expr> {
239    let exprs = exprs.as_ref().to_vec();
240    polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
241    Ok(Expr::n_ary(
242        FunctionExpr::SumHorizontal { ignore_nulls },
243        exprs,
244    ))
245}
246
247/// Compute the mean of all values horizontally across columns.
248pub fn mean_horizontal<E: AsRef<[Expr]>>(exprs: E, ignore_nulls: bool) -> PolarsResult<Expr> {
249    let exprs = exprs.as_ref().to_vec();
250    polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
251    Ok(Expr::n_ary(
252        FunctionExpr::MeanHorizontal { ignore_nulls },
253        exprs,
254    ))
255}
256
257/// Folds the expressions from left to right keeping the first non-null values.
258///
259/// It is an error to provide an empty `exprs`.
260pub fn coalesce(exprs: &[Expr]) -> Expr {
261    Expr::n_ary(FunctionExpr::Coalesce, exprs.to_vec())
262}