polars_plan/dsl/functions/
horizontal.rs1use super::*;
2
3#[cfg(feature = "dtype-struct")]
4fn cum_fold_dtype() -> GetOutput {
5 GetOutput::map_fields(|fields| {
6 let mut st = fields[0].dtype.clone();
7 for fld in &fields[1..] {
8 st = get_supertype(&st, &fld.dtype).unwrap();
9 }
10 Ok(Field::new(
11 fields[0].name.clone(),
12 DataType::Struct(
13 fields
14 .iter()
15 .map(|fld| Field::new(fld.name().clone(), st.clone()))
16 .collect(),
17 ),
18 ))
19 })
20}
21
22pub fn fold_exprs<F, E>(
24 acc: Expr,
25 f: F,
26 exprs: E,
27 returns_scalar: bool,
28 return_dtype: Option<DataType>,
29) -> Expr
30where
31 F: 'static + Fn(Column, Column) -> PolarsResult<Option<Column>> + Send + Sync,
32 E: AsRef<[Expr]>,
33{
34 let mut exprs_v = Vec::with_capacity(exprs.as_ref().len() + 1);
35 exprs_v.push(acc);
36 exprs_v.extend(exprs.as_ref().iter().cloned());
37 let exprs = exprs_v;
38
39 let function = new_column_udf(move |columns: &mut [Column]| {
40 let mut acc = columns.first().unwrap().clone();
41 for c in &columns[1..] {
42 if let Some(a) = f(acc.clone(), c.clone())? {
43 acc = a
44 }
45 }
46 Ok(Some(acc))
47 });
48
49 let output_type = return_dtype
50 .map(GetOutput::from_type)
51 .unwrap_or_else(|| GetOutput::first());
52
53 Expr::AnonymousFunction {
54 input: exprs,
55 function,
56 output_type,
58 options: FunctionOptions::groupwise()
59 .with_fmt_str("fold")
60 .with_flags(|mut f| {
61 f |= FunctionFlags::INPUT_WILDCARD_EXPANSION;
62 f.set(FunctionFlags::RETURNS_SCALAR, returns_scalar);
63 f
64 }),
65 }
66}
67
68pub fn reduce_exprs<F, E>(f: F, exprs: E) -> Expr
74where
75 F: 'static + Fn(Column, Column) -> PolarsResult<Option<Column>> + Send + Sync,
76 E: AsRef<[Expr]>,
77{
78 let exprs = exprs.as_ref().to_vec();
79
80 let function = new_column_udf(move |columns: &mut [Column]| {
81 let mut c_iter = columns.iter();
82
83 match c_iter.next() {
84 Some(acc) => {
85 let mut acc = acc.clone();
86
87 for c in c_iter {
88 if let Some(a) = f(acc.clone(), c.clone())? {
89 acc = a
90 }
91 }
92 Ok(Some(acc))
93 },
94 None => Err(polars_err!(ComputeError: "`reduce` did not have any expressions to fold")),
95 }
96 });
97
98 Expr::AnonymousFunction {
99 input: exprs,
100 function,
101 output_type: GetOutput::super_type(),
102 options: FunctionOptions::aggregation()
103 .with_fmt_str("reduce")
104 .with_flags(|f| f | FunctionFlags::INPUT_WILDCARD_EXPANSION),
105 }
106}
107
108#[cfg(feature = "dtype-struct")]
110pub fn cum_reduce_exprs<F, E>(f: F, exprs: E) -> Expr
111where
112 F: 'static + Fn(Column, Column) -> PolarsResult<Option<Column>> + Send + Sync,
113 E: AsRef<[Expr]>,
114{
115 let exprs = exprs.as_ref().to_vec();
116
117 let function = new_column_udf(move |columns: &mut [Column]| {
118 let mut c_iter = columns.iter();
119
120 match c_iter.next() {
121 Some(acc) => {
122 let mut acc = acc.clone();
123 let mut result = vec![acc.clone()];
124
125 for c in c_iter {
126 let name = c.name().clone();
127 if let Some(a) = f(acc.clone(), c.clone())? {
128 acc = a;
129 }
130 acc.rename(name);
131 result.push(acc.clone());
132 }
133
134 StructChunked::from_columns(acc.name().clone(), result[0].len(), &result)
135 .map(|ca| Some(ca.into_column()))
136 },
137 None => Err(polars_err!(ComputeError: "`reduce` did not have any expressions to fold")),
138 }
139 });
140
141 Expr::AnonymousFunction {
142 input: exprs,
143 function,
144 output_type: cum_fold_dtype(),
145 options: FunctionOptions::aggregation()
146 .with_fmt_str("cum_reduce")
147 .with_flags(|f| f | FunctionFlags::INPUT_WILDCARD_EXPANSION),
148 }
149}
150
151#[cfg(feature = "dtype-struct")]
153pub fn cum_fold_exprs<F, E>(acc: Expr, f: F, exprs: E, include_init: bool) -> Expr
154where
155 F: 'static + Fn(Column, Column) -> PolarsResult<Option<Column>> + Send + Sync,
156 E: AsRef<[Expr]>,
157{
158 let mut exprs = exprs.as_ref().to_vec();
159 exprs.push(acc);
160
161 let function = new_column_udf(move |columns: &mut [Column]| {
162 let mut columns = columns.to_vec();
163 let mut acc = columns.pop().unwrap();
164
165 let mut result = vec![];
166 if include_init {
167 result.push(acc.clone())
168 }
169
170 for c in columns {
171 let name = c.name().clone();
172 if let Some(a) = f(acc.clone(), c)? {
173 acc = a;
174 acc.rename(name);
175 result.push(acc.clone());
176 }
177 }
178
179 StructChunked::from_columns(acc.name().clone(), result[0].len(), &result)
180 .map(|ca| Some(ca.into_column()))
181 });
182
183 Expr::AnonymousFunction {
184 input: exprs,
185 function,
186 output_type: cum_fold_dtype(),
187 options: FunctionOptions::aggregation()
188 .with_fmt_str("cum_fold")
189 .with_flags(|f| f | FunctionFlags::INPUT_WILDCARD_EXPANSION),
190 }
191}
192
193pub fn all_horizontal<E: AsRef<[Expr]>>(exprs: E) -> PolarsResult<Expr> {
197 let exprs = exprs.as_ref().to_vec();
198 polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
199 Ok(Expr::n_ary(
201 FunctionExpr::Boolean(BooleanFunction::AllHorizontal),
202 exprs,
203 ))
204}
205
206pub fn any_horizontal<E: AsRef<[Expr]>>(exprs: E) -> PolarsResult<Expr> {
210 let exprs = exprs.as_ref().to_vec();
211 polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
212 Ok(Expr::n_ary(
214 FunctionExpr::Boolean(BooleanFunction::AnyHorizontal),
215 exprs,
216 ))
217}
218
219pub fn max_horizontal<E: AsRef<[Expr]>>(exprs: E) -> PolarsResult<Expr> {
223 let exprs = exprs.as_ref().to_vec();
224 polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
225 Ok(Expr::n_ary(FunctionExpr::MaxHorizontal, exprs))
226}
227
228pub fn min_horizontal<E: AsRef<[Expr]>>(exprs: E) -> PolarsResult<Expr> {
232 let exprs = exprs.as_ref().to_vec();
233 polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
234 Ok(Expr::n_ary(FunctionExpr::MinHorizontal, exprs))
235}
236
237pub fn sum_horizontal<E: AsRef<[Expr]>>(exprs: E, ignore_nulls: bool) -> PolarsResult<Expr> {
239 let exprs = exprs.as_ref().to_vec();
240 polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
241 Ok(Expr::n_ary(
242 FunctionExpr::SumHorizontal { ignore_nulls },
243 exprs,
244 ))
245}
246
247pub fn mean_horizontal<E: AsRef<[Expr]>>(exprs: E, ignore_nulls: bool) -> PolarsResult<Expr> {
249 let exprs = exprs.as_ref().to_vec();
250 polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
251 Ok(Expr::n_ary(
252 FunctionExpr::MeanHorizontal { ignore_nulls },
253 exprs,
254 ))
255}
256
257pub fn coalesce(exprs: &[Expr]) -> Expr {
261 Expr::n_ary(FunctionExpr::Coalesce, exprs.to_vec())
262}