datafusion_physical_expr/
aggregate.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub(crate) mod groups_accumulator {
19    #[allow(unused_imports)]
20    pub(crate) mod accumulate {
21        pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState;
22    }
23    pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
24        accumulate::NullState, GroupsAccumulatorAdapter,
25    };
26}
/// Re-export of `StatsType` from `datafusion_functions_aggregate_common::stats`.
pub(crate) mod stats {
    pub use datafusion_functions_aggregate_common::stats::StatsType;
}
30pub mod utils {
31    pub use datafusion_functions_aggregate_common::utils::{
32        get_accum_scalar_values_as_arrays, get_sort_options, ordering_fields,
33        DecimalAverager, Hashable,
34    };
35}
36
37use std::fmt::Debug;
38use std::sync::Arc;
39
40use crate::expressions::Column;
41
42use arrow::compute::SortOptions;
43use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef};
44use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
45use datafusion_expr::{AggregateUDF, ReversedUDAF, SetMonotonicity};
46use datafusion_expr_common::accumulator::Accumulator;
47use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
48use datafusion_expr_common::type_coercion::aggregates::check_arg_count;
49use datafusion_functions_aggregate_common::accumulator::{
50    AccumulatorArgs, StateFieldsArgs,
51};
52use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
53use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
54use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
55
/// Builder for physical [`AggregateFunctionExpr`]
///
/// `AggregateFunctionExpr` contains the information necessary to call
/// an aggregate expression. Start with [`AggregateExprBuilder::new`],
/// configure it with the chained setters, and finish with
/// [`AggregateExprBuilder::build`]. An alias must be set before building.
#[derive(Debug, Clone)]
pub struct AggregateExprBuilder {
    /// The aggregate UDF to evaluate
    fun: Arc<AggregateUDF>,
    /// Physical expressions of the aggregate function
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Output column name; `build` returns an error while this is `None`
    alias: Option<String>,
    /// A human readable name
    human_display: String,
    /// Arrow Schema for the aggregate function
    schema: SchemaRef,
    /// The physical order by expressions
    order_bys: Vec<PhysicalSortExpr>,
    /// Whether to ignore null values
    ignore_nulls: bool,
    /// Whether is distinct aggregate function
    is_distinct: bool,
    /// Whether the expression is reversed
    is_reversed: bool,
}
79
80impl AggregateExprBuilder {
81    pub fn new(fun: Arc<AggregateUDF>, args: Vec<Arc<dyn PhysicalExpr>>) -> Self {
82        Self {
83            fun,
84            args,
85            alias: None,
86            human_display: String::default(),
87            schema: Arc::new(Schema::empty()),
88            order_bys: vec![],
89            ignore_nulls: false,
90            is_distinct: false,
91            is_reversed: false,
92        }
93    }
94
95    /// Constructs an `AggregateFunctionExpr` from the builder
96    ///
97    /// Note that an [`Self::alias`] must be provided before calling this method.
98    ///
99    /// # Example: Create an [`AggregateUDF`]
100    ///
101    /// In the following example, [`AggregateFunctionExpr`] will be built using [`AggregateExprBuilder`]
102    /// which provides a build function. Full example could be accessed from the source file.
103    ///
104    /// ```
105    /// # use std::any::Any;
106    /// # use std::sync::Arc;
107    /// # use arrow::datatypes::{DataType, FieldRef};
108    /// # use datafusion_common::{Result, ScalarValue};
109    /// # use datafusion_expr::{col, ColumnarValue, Documentation, Signature, Volatility, Expr};
110    /// # use datafusion_expr::{AggregateUDFImpl, AggregateUDF, Accumulator, function::{AccumulatorArgs, StateFieldsArgs}};
111    /// # use arrow::datatypes::Field;
112    /// #
113    /// # #[derive(Debug, Clone, PartialEq, Eq, Hash)]
114    /// # struct FirstValueUdf {
115    /// #     signature: Signature,
116    /// # }
117    /// #
118    /// # impl FirstValueUdf {
119    /// #     fn new() -> Self {
120    /// #         Self {
121    /// #             signature: Signature::any(1, Volatility::Immutable),
122    /// #         }
123    /// #     }
124    /// # }
125    /// #
126    /// # impl AggregateUDFImpl for FirstValueUdf {
127    /// #     fn as_any(&self) -> &dyn Any {
128    /// #         unimplemented!()
129    /// #     }
130    /// #
131    /// #     fn name(&self) -> &str {
132    /// #         unimplemented!()
133    /// #     }
134    /// #
135    /// #     fn signature(&self) -> &Signature {
136    /// #         unimplemented!()
137    /// #     }
138    /// #
139    /// #     fn return_type(&self, args: &[DataType]) -> Result<DataType> {
140    /// #         unimplemented!()
141    /// #     }
142    /// #
143    /// #     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
144    /// #         unimplemented!()
145    /// #         }
146    /// #
147    /// #     fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
148    /// #         unimplemented!()
149    /// #     }
150    /// #
151    /// #     fn documentation(&self) -> Option<&Documentation> {
152    /// #         unimplemented!()
153    /// #     }
154    /// # }
155    /// #
156    /// # let first_value = AggregateUDF::from(FirstValueUdf::new());
157    /// # let expr = first_value.call(vec![col("a")]);
158    /// #
159    /// # use datafusion_physical_expr::expressions::Column;
160    /// # use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
161    /// # use datafusion_physical_expr::aggregate::AggregateExprBuilder;
162    /// # use datafusion_physical_expr::expressions::PhysicalSortExpr;
163    /// # use datafusion_physical_expr::PhysicalSortRequirement;
164    /// #
165    /// fn build_aggregate_expr() -> Result<()> {
166    ///     let args = vec![Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>];
167    ///     let order_by = vec![PhysicalSortExpr {
168    ///         expr: Arc::new(Column::new("x", 1)) as Arc<dyn PhysicalExpr>,
169    ///         options: Default::default(),
170    ///     }];
171    ///
172    ///     let first_value = AggregateUDF::from(FirstValueUdf::new());
173    ///
174    ///     let aggregate_expr = AggregateExprBuilder::new(
175    ///         Arc::new(first_value),
176    ///         args
177    ///     )
178    ///     .order_by(order_by)
179    ///     .alias("first_a_by_x")
180    ///     .ignore_nulls()
181    ///     .build()?;
182    ///
183    ///     Ok(())
184    /// }
185    /// ```
186    ///
187    /// This creates a physical expression equivalent to SQL:
188    /// `first_value(a ORDER BY x) IGNORE NULLS AS first_a_by_x`
189    pub fn build(self) -> Result<AggregateFunctionExpr> {
190        let Self {
191            fun,
192            args,
193            alias,
194            human_display,
195            schema,
196            order_bys,
197            ignore_nulls,
198            is_distinct,
199            is_reversed,
200        } = self;
201        if args.is_empty() {
202            return internal_err!("args should not be empty");
203        }
204
205        let ordering_types = order_bys
206            .iter()
207            .map(|e| e.expr.data_type(&schema))
208            .collect::<Result<Vec<_>>>()?;
209
210        let ordering_fields = utils::ordering_fields(&order_bys, &ordering_types);
211
212        let input_exprs_fields = args
213            .iter()
214            .map(|arg| arg.return_field(&schema))
215            .collect::<Result<Vec<_>>>()?;
216
217        check_arg_count(
218            fun.name(),
219            &input_exprs_fields,
220            &fun.signature().type_signature,
221        )?;
222
223        let return_field = fun.return_field(&input_exprs_fields)?;
224        let is_nullable = fun.is_nullable();
225        let name = match alias {
226            None => {
227                return internal_err!(
228                    "AggregateExprBuilder::alias must be provided prior to calling build"
229                )
230            }
231            Some(alias) => alias,
232        };
233
234        let arg_fields = args
235            .iter()
236            .map(|e| e.return_field(schema.as_ref()))
237            .collect::<Result<Vec<_>>>()?;
238
239        Ok(AggregateFunctionExpr {
240            fun: Arc::unwrap_or_clone(fun),
241            args,
242            arg_fields,
243            return_field,
244            name,
245            human_display,
246            schema: Arc::unwrap_or_clone(schema),
247            order_bys,
248            ignore_nulls,
249            ordering_fields,
250            is_distinct,
251            input_fields: input_exprs_fields,
252            is_reversed,
253            is_nullable,
254        })
255    }
256
257    pub fn alias(mut self, alias: impl Into<String>) -> Self {
258        self.alias = Some(alias.into());
259        self
260    }
261
262    pub fn human_display(mut self, name: String) -> Self {
263        self.human_display = name;
264        self
265    }
266
267    pub fn schema(mut self, schema: SchemaRef) -> Self {
268        self.schema = schema;
269        self
270    }
271
272    pub fn order_by(mut self, order_bys: Vec<PhysicalSortExpr>) -> Self {
273        self.order_bys = order_bys;
274        self
275    }
276
277    pub fn reversed(mut self) -> Self {
278        self.is_reversed = true;
279        self
280    }
281
282    pub fn with_reversed(mut self, is_reversed: bool) -> Self {
283        self.is_reversed = is_reversed;
284        self
285    }
286
287    pub fn distinct(mut self) -> Self {
288        self.is_distinct = true;
289        self
290    }
291
292    pub fn with_distinct(mut self, is_distinct: bool) -> Self {
293        self.is_distinct = is_distinct;
294        self
295    }
296
297    pub fn ignore_nulls(mut self) -> Self {
298        self.ignore_nulls = true;
299        self
300    }
301
302    pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
303        self.ignore_nulls = ignore_nulls;
304        self
305    }
306}
307
/// Physical aggregate expression of a UDAF.
///
/// Instances are constructed via [`AggregateExprBuilder`].
#[derive(Debug, Clone)]
pub struct AggregateFunctionExpr {
    /// The aggregate UDF being evaluated
    fun: AggregateUDF,
    /// Physical argument expressions of the aggregate
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Fields corresponding to args (same order & length)
    arg_fields: Vec<FieldRef>,
    /// Output / return field of this aggregate
    return_field: FieldRef,
    /// Output column name that this expression creates
    name: String,
    /// Simplified name for `tree` explain.
    human_display: String,
    /// Schema against which `args` and the ORDER BY expressions were resolved
    schema: Schema,
    /// The physical order by expressions
    order_bys: Vec<PhysicalSortExpr>,
    /// Whether to ignore null values
    ignore_nulls: bool,
    /// Fields used for order sensitive aggregation functions
    ordering_fields: Vec<FieldRef>,
    /// Whether this is a DISTINCT aggregate
    is_distinct: bool,
    /// Whether the expression is reversed; `reverse_expr` flips it when it
    /// switches to a reversed UDF
    is_reversed: bool,
    /// Fields of the argument expressions, handed to the UDF as
    /// `StateFieldsArgs::input_fields`
    input_fields: Vec<FieldRef>,
    /// Nullability reported by the UDF when this expression was built
    is_nullable: bool,
}
335
336impl AggregateFunctionExpr {
337    /// Return the `AggregateUDF` used by this `AggregateFunctionExpr`
338    pub fn fun(&self) -> &AggregateUDF {
339        &self.fun
340    }
341
342    /// expressions that are passed to the Accumulator.
343    /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
344    pub fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
345        self.args.clone()
346    }
347
348    /// Human readable name such as `"MIN(c2)"`.
349    pub fn name(&self) -> &str {
350        &self.name
351    }
352
353    /// Simplified name for `tree` explain.
354    pub fn human_display(&self) -> &str {
355        &self.human_display
356    }
357
358    /// Return if the aggregation is distinct
359    pub fn is_distinct(&self) -> bool {
360        self.is_distinct
361    }
362
363    /// Return if the aggregation ignores nulls
364    pub fn ignore_nulls(&self) -> bool {
365        self.ignore_nulls
366    }
367
368    /// Return if the aggregation is reversed
369    pub fn is_reversed(&self) -> bool {
370        self.is_reversed
371    }
372
373    /// Return if the aggregation is nullable
374    pub fn is_nullable(&self) -> bool {
375        self.is_nullable
376    }
377
378    /// the field of the final result of this aggregation.
379    pub fn field(&self) -> FieldRef {
380        self.return_field
381            .as_ref()
382            .clone()
383            .with_name(&self.name)
384            .into()
385    }
386
387    /// the accumulator used to accumulate values from the expressions.
388    /// the accumulator expects the same number of arguments as `expressions` and must
389    /// return states with the same description as `state_fields`
390    pub fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
391        let acc_args = AccumulatorArgs {
392            return_field: Arc::clone(&self.return_field),
393            schema: &self.schema,
394            expr_fields: &self.arg_fields,
395            ignore_nulls: self.ignore_nulls,
396            order_bys: self.order_bys.as_ref(),
397            is_distinct: self.is_distinct,
398            name: &self.name,
399            is_reversed: self.is_reversed,
400            exprs: &self.args,
401        };
402
403        self.fun.accumulator(acc_args)
404    }
405
406    /// the field of the final result of this aggregation.
407    pub fn state_fields(&self) -> Result<Vec<FieldRef>> {
408        let args = StateFieldsArgs {
409            name: &self.name,
410            input_fields: &self.input_fields,
411            return_field: Arc::clone(&self.return_field),
412            ordering_fields: &self.ordering_fields,
413            is_distinct: self.is_distinct,
414        };
415
416        self.fun.state_fields(args)
417    }
418
419    /// Returns the ORDER BY expressions for the aggregate function.
420    pub fn order_bys(&self) -> &[PhysicalSortExpr] {
421        if self.order_sensitivity().is_insensitive() {
422            &[]
423        } else {
424            &self.order_bys
425        }
426    }
427
428    /// Indicates whether aggregator can produce the correct result with any
429    /// arbitrary input ordering. By default, we assume that aggregate expressions
430    /// are order insensitive.
431    pub fn order_sensitivity(&self) -> AggregateOrderSensitivity {
432        if self.order_bys.is_empty() {
433            AggregateOrderSensitivity::Insensitive
434        } else {
435            // If there is an ORDER BY clause, use the sensitivity of the implementation:
436            self.fun.order_sensitivity()
437        }
438    }
439
440    /// Sets the indicator whether ordering requirements of the aggregator is
441    /// satisfied by its input. If this is not the case, aggregators with order
442    /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce
443    /// the correct result with possibly more work internally.
444    ///
445    /// # Returns
446    ///
447    /// Returns `Ok(Some(updated_expr))` if the process completes successfully.
448    /// If the expression can benefit from existing input ordering, but does
449    /// not implement the method, returns an error. Order insensitive and hard
450    /// requirement aggregators return `Ok(None)`.
451    pub fn with_beneficial_ordering(
452        self: Arc<Self>,
453        beneficial_ordering: bool,
454    ) -> Result<Option<AggregateFunctionExpr>> {
455        let Some(updated_fn) = self
456            .fun
457            .clone()
458            .with_beneficial_ordering(beneficial_ordering)?
459        else {
460            return Ok(None);
461        };
462
463        AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec())
464            .order_by(self.order_bys.clone())
465            .schema(Arc::new(self.schema.clone()))
466            .alias(self.name().to_string())
467            .with_ignore_nulls(self.ignore_nulls)
468            .with_distinct(self.is_distinct)
469            .with_reversed(self.is_reversed)
470            .build()
471            .map(Some)
472    }
473
474    /// Creates accumulator implementation that supports retract
475    pub fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
476        let args = AccumulatorArgs {
477            return_field: Arc::clone(&self.return_field),
478            schema: &self.schema,
479            expr_fields: &self.arg_fields,
480            ignore_nulls: self.ignore_nulls,
481            order_bys: self.order_bys.as_ref(),
482            is_distinct: self.is_distinct,
483            name: &self.name,
484            is_reversed: self.is_reversed,
485            exprs: &self.args,
486        };
487
488        let accumulator = self.fun.create_sliding_accumulator(args)?;
489
490        // Accumulators that have window frame startings different
491        // than `UNBOUNDED PRECEDING`, such as `1 PRECEDING`, need to
492        // implement retract_batch method in order to run correctly
493        // currently in DataFusion.
494        //
495        // If this `retract_batches` is not present, there is no way
496        // to calculate result correctly. For example, the query
497        //
498        // ```sql
499        // SELECT
500        //  SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS sum_a
501        // FROM
502        //  t
503        // ```
504        //
505        // 1. First sum value will be the sum of rows between `[0, 1)`,
506        //
507        // 2. Second sum value will be the sum of rows between `[0, 2)`
508        //
509        // 3. Third sum value will be the sum of rows between `[1, 3)`, etc.
510        //
511        // Since the accumulator keeps the running sum:
512        //
513        // 1. First sum we add to the state sum value between `[0, 1)`
514        //
515        // 2. Second sum we add to the state sum value between `[1, 2)`
516        // (`[0, 1)` is already in the state sum, hence running sum will
517        // cover `[0, 2)` range)
518        //
519        // 3. Third sum we add to the state sum value between `[2, 3)`
520        // (`[0, 2)` is already in the state sum).  Also we need to
521        // retract values between `[0, 1)` by this way we can obtain sum
522        // between [1, 3) which is indeed the appropriate range.
523        //
524        // When we use `UNBOUNDED PRECEDING` in the query starting
525        // index will always be 0 for the desired range, and hence the
526        // `retract_batch` method will not be called. In this case
527        // having retract_batch is not a requirement.
528        //
529        // This approach is a a bit different than window function
530        // approach. In window function (when they use a window frame)
531        // they get all the desired range during evaluation.
532        if !accumulator.supports_retract_batch() {
533            return not_impl_err!(
534                "Aggregate can not be used as a sliding accumulator because \
535                     `retract_batch` is not implemented: {}",
536                self.name
537            );
538        }
539        Ok(accumulator)
540    }
541
542    /// If the aggregate expression has a specialized
543    /// [`GroupsAccumulator`] implementation. If this returns true,
544    /// `[Self::create_groups_accumulator`] will be called.
545    pub fn groups_accumulator_supported(&self) -> bool {
546        let args = AccumulatorArgs {
547            return_field: Arc::clone(&self.return_field),
548            schema: &self.schema,
549            expr_fields: &self.arg_fields,
550            ignore_nulls: self.ignore_nulls,
551            order_bys: self.order_bys.as_ref(),
552            is_distinct: self.is_distinct,
553            name: &self.name,
554            is_reversed: self.is_reversed,
555            exprs: &self.args,
556        };
557        self.fun.groups_accumulator_supported(args)
558    }
559
560    /// Return a specialized [`GroupsAccumulator`] that manages state
561    /// for all groups.
562    ///
563    /// For maximum performance, a [`GroupsAccumulator`] should be
564    /// implemented in addition to [`Accumulator`].
565    pub fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
566        let args = AccumulatorArgs {
567            return_field: Arc::clone(&self.return_field),
568            schema: &self.schema,
569            expr_fields: &self.arg_fields,
570            ignore_nulls: self.ignore_nulls,
571            order_bys: self.order_bys.as_ref(),
572            is_distinct: self.is_distinct,
573            name: &self.name,
574            is_reversed: self.is_reversed,
575            exprs: &self.args,
576        };
577        self.fun.create_groups_accumulator(args)
578    }
579
580    /// Construct an expression that calculates the aggregate in reverse.
581    /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
582    /// For aggregates that do not support calculation in reverse,
583    /// returns None (which is the default value).
584    pub fn reverse_expr(&self) -> Option<AggregateFunctionExpr> {
585        match self.fun.reverse_udf() {
586            ReversedUDAF::NotSupported => None,
587            ReversedUDAF::Identical => Some(self.clone()),
588            ReversedUDAF::Reversed(reverse_udf) => {
589                let mut name = self.name().to_string();
590                // If the function is changed, we need to reverse order_by clause as well
591                // i.e. First(a order by b asc null first) -> Last(a order by b desc null last)
592                if self.fun().name() != reverse_udf.name() {
593                    replace_order_by_clause(&mut name);
594                }
595                replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name());
596
597                AggregateExprBuilder::new(reverse_udf, self.args.to_vec())
598                    .order_by(self.order_bys.iter().map(|e| e.reverse()).collect())
599                    .schema(Arc::new(self.schema.clone()))
600                    .alias(name)
601                    .with_ignore_nulls(self.ignore_nulls)
602                    .with_distinct(self.is_distinct)
603                    .with_reversed(!self.is_reversed)
604                    .build()
605                    .ok()
606            }
607        }
608    }
609
610    /// Returns all expressions used in the [`AggregateFunctionExpr`].
611    /// These expressions are  (1)function arguments, (2) order by expressions.
612    pub fn all_expressions(&self) -> AggregatePhysicalExpressions {
613        let args = self.expressions();
614        let order_by_exprs = self
615            .order_bys()
616            .iter()
617            .map(|sort_expr| Arc::clone(&sort_expr.expr))
618            .collect();
619        AggregatePhysicalExpressions {
620            args,
621            order_by_exprs,
622        }
623    }
624
625    /// Rewrites [`AggregateFunctionExpr`], with new expressions given. The argument should be consistent
626    /// with the return value of the [`AggregateFunctionExpr::all_expressions`] method.
627    /// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
628    pub fn with_new_expressions(
629        &self,
630        args: Vec<Arc<dyn PhysicalExpr>>,
631        order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
632    ) -> Option<AggregateFunctionExpr> {
633        if args.len() != self.args.len()
634            || (self.order_sensitivity() != AggregateOrderSensitivity::Insensitive
635                && order_by_exprs.len() != self.order_bys.len())
636        {
637            return None;
638        }
639
640        let new_order_bys = self
641            .order_bys
642            .iter()
643            .zip(order_by_exprs)
644            .map(|(req, new_expr)| PhysicalSortExpr {
645                expr: new_expr,
646                options: req.options,
647            })
648            .collect();
649
650        Some(AggregateFunctionExpr {
651            fun: self.fun.clone(),
652            args,
653            // TODO: need to align arg_fields here with new args
654            //       https://github.com/apache/datafusion/issues/18149
655            arg_fields: self.arg_fields.clone(),
656            return_field: Arc::clone(&self.return_field),
657            name: self.name.clone(),
658            // TODO: Human name should be updated after re-write to not mislead
659            human_display: self.human_display.clone(),
660            schema: self.schema.clone(),
661            order_bys: new_order_bys,
662            ignore_nulls: self.ignore_nulls,
663            ordering_fields: self.ordering_fields.clone(),
664            is_distinct: self.is_distinct,
665            is_reversed: false,
666            input_fields: self.input_fields.clone(),
667            is_nullable: self.is_nullable,
668        })
669    }
670
671    /// If this function is max, return (output_field, true)
672    /// if the function is min, return (output_field, false)
673    /// otherwise return None (the default)
674    ///
675    /// output_field is the name of the column produced by this aggregate
676    ///
677    /// Note: this is used to use special aggregate implementations in certain conditions
678    pub fn get_minmax_desc(&self) -> Option<(FieldRef, bool)> {
679        self.fun.is_descending().map(|flag| (self.field(), flag))
680    }
681
682    /// Returns default value of the function given the input is Null
683    /// Most of the aggregate function return Null if input is Null,
684    /// while `count` returns 0 if input is Null
685    pub fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
686        self.fun.default_value(data_type)
687    }
688
689    /// Indicates whether the aggregation function is monotonic as a set
690    /// function. See [`SetMonotonicity`] for details.
691    pub fn set_monotonicity(&self) -> SetMonotonicity {
692        let field = self.field();
693        let data_type = field.data_type();
694        self.fun.inner().set_monotonicity(data_type)
695    }
696
697    /// Returns `PhysicalSortExpr` based on the set monotonicity of the function.
698    pub fn get_result_ordering(&self, aggr_func_idx: usize) -> Option<PhysicalSortExpr> {
699        // If the aggregate expressions are set-monotonic, the output data is
700        // naturally ordered with it per group or partition.
701        let monotonicity = self.set_monotonicity();
702        if monotonicity == SetMonotonicity::NotMonotonic {
703            return None;
704        }
705        let expr = Arc::new(Column::new(self.name(), aggr_func_idx));
706        let options =
707            SortOptions::new(monotonicity == SetMonotonicity::Decreasing, false);
708        Some(PhysicalSortExpr { expr, options })
709    }
710}
711
/// Stores the physical expressions used inside an [`AggregateFunctionExpr`].
///
/// Produced by [`AggregateFunctionExpr::all_expressions`]; the same shape is
/// expected back by [`AggregateFunctionExpr::with_new_expressions`].
pub struct AggregatePhysicalExpressions {
    /// Aggregate function arguments
    pub args: Vec<Arc<dyn PhysicalExpr>>,
    /// Order by expressions
    pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
}
719
720impl PartialEq for AggregateFunctionExpr {
721    fn eq(&self, other: &Self) -> bool {
722        self.name == other.name
723            && self.return_field == other.return_field
724            && self.fun == other.fun
725            && self.args.len() == other.args.len()
726            && self
727                .args
728                .iter()
729                .zip(other.args.iter())
730                .all(|(this_arg, other_arg)| this_arg.eq(other_arg))
731    }
732}
733
/// Reverses every sort key in the first `ORDER BY [...]` clause of an
/// aggregate name, in place.
///
/// Both the direction and the null placement of each key are flipped, e.g.
/// `first_value(a) ORDER BY [x ASC NULLS LAST, y DESC NULLS FIRST]` becomes
/// `first_value(a) ORDER BY [x DESC NULLS FIRST, y ASC NULLS LAST]`.
///
/// Keys that do not end with one of the recognised `<ASC|DESC> NULLS
/// <FIRST|LAST>` suffixes are left untouched. The string is left unchanged when
/// it has no `ORDER BY [` clause or the clause is not closed by `]`.
fn replace_order_by_clause(order_by: &mut String) {
    const CLAUSE_START: &str = "ORDER BY [";
    const SUFFIXES: [(&str, &str); 4] = [
        (" DESC NULLS FIRST", " ASC NULLS LAST"),
        (" ASC NULLS FIRST", " DESC NULLS LAST"),
        (" DESC NULLS LAST", " ASC NULLS FIRST"),
        (" ASC NULLS LAST", " DESC NULLS FIRST"),
    ];

    let Some(clause_pos) = order_by.find(CLAUSE_START) else {
        return;
    };
    // Byte index just after `[`, and byte index of the closing `]`.
    let list_start = clause_pos + CLAUSE_START.len();
    let Some(list_len) = order_by[list_start..].find(']') else {
        return;
    };
    let list_end = list_start + list_len;

    // Splitting on ", " and re-joining with ", " is lossless, so commas inside
    // an expression (e.g. `f(a, b) ASC NULLS LAST`) survive unchanged: only
    // fragments ending with a sort suffix are rewritten.
    let reversed = order_by[list_start..list_end]
        .split(", ")
        .map(|key| {
            SUFFIXES
                .iter()
                .find_map(|(suffix, replacement)| {
                    key.strip_suffix(suffix)
                        .map(|stem| format!("{stem}{replacement}"))
                })
                .unwrap_or_else(|| key.to_string())
        })
        .collect::<Vec<_>>()
        .join(", ");
    order_by.replace_range(list_start..list_end, &reversed);
}
758
/// Replaces the first occurrence of `fn_name_old` in `aggr_name` with
/// `fn_name_new`, in place.
///
/// Only the first occurrence is rewritten, so argument or column names that
/// merely contain the function name (e.g. `first_value(first_value_col)`) are
/// preserved. An empty `fn_name_old` leaves `aggr_name` unchanged.
fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str, fn_name_new: &str) {
    if fn_name_old.is_empty() {
        return;
    }
    if let Some(pos) = aggr_name.find(fn_name_old) {
        aggr_name.replace_range(pos..pos + fn_name_old.len(), fn_name_new);
    }
}