datafusion_physical_expr/
aggregate.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

pub(crate) mod groups_accumulator {
    #[allow(unused_imports)]
    pub(crate) mod accumulate {
        pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState;
    }
    pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
        accumulate::NullState, GroupsAccumulatorAdapter,
    };
}
pub(crate) mod stats {
    pub use datafusion_functions_aggregate_common::stats::StatsType;
}
pub mod utils {
    #[allow(deprecated)] // allow adjust_output_array
    pub use datafusion_functions_aggregate_common::utils::{
        adjust_output_array, get_accum_scalar_values_as_arrays, get_sort_options,
        ordering_fields, DecimalAverager, Hashable,
    };
}

use std::fmt::Debug;
use std::sync::Arc;

use crate::expressions::Column;

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef};
use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
use datafusion_expr::{AggregateUDF, ReversedUDAF, SetMonotonicity};
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
use datafusion_expr_common::type_coercion::aggregates::check_arg_count;
use datafusion_functions_aggregate_common::accumulator::{
    AccumulatorArgs, StateFieldsArgs,
};
use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr_common::utils::reverse_order_bys;

/// Builder for physical [`AggregateFunctionExpr`]
///
/// `AggregateFunctionExpr` contains the information necessary to call
/// an aggregate expression.
#[derive(Debug, Clone)]
pub struct AggregateExprBuilder {
    fun: Arc<AggregateUDF>,
    /// Physical expressions of the aggregate function
    args: Vec<Arc<dyn PhysicalExpr>>,
    alias: Option<String>,
    /// A human readable name
    human_display: String,
    /// Arrow Schema for the aggregate function
    schema: SchemaRef,
    /// The physical order by expressions
    ordering_req: LexOrdering,
    /// Whether to ignore null values
    ignore_nulls: bool,
    /// Whether this is a distinct aggregate function
    is_distinct: bool,
    /// Whether the expression is reversed
    is_reversed: bool,
}

impl AggregateExprBuilder {
    pub fn new(fun: Arc<AggregateUDF>, args: Vec<Arc<dyn PhysicalExpr>>) -> Self {
        Self {
            fun,
            args,
            alias: None,
            human_display: String::default(),
            schema: Arc::new(Schema::empty()),
            ordering_req: LexOrdering::default(),
            ignore_nulls: false,
            is_distinct: false,
            is_reversed: false,
        }
    }

    /// Constructs an `AggregateFunctionExpr` from the builder
    ///
    /// Note that [`Self::alias`] must be provided before calling this method.
    ///
    /// # Example: Create an [`AggregateUDF`]
    ///
    /// In the following example, an [`AggregateFunctionExpr`] is built using
    /// [`AggregateExprBuilder`], which provides a `build()` function. The full
    /// example can be found in the source file.
    ///
    /// ```
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, FieldRef};
    /// # use datafusion_common::{Result, ScalarValue};
    /// # use datafusion_expr::{col, ColumnarValue, Documentation, Signature, Volatility, Expr};
    /// # use datafusion_expr::{AggregateUDFImpl, AggregateUDF, Accumulator, function::{AccumulatorArgs, StateFieldsArgs}};
    /// # use arrow::datatypes::Field;
    /// #
    /// # #[derive(Debug, Clone)]
    /// # struct FirstValueUdf {
    /// #     signature: Signature,
    /// # }
    /// #
    /// # impl FirstValueUdf {
    /// #     fn new() -> Self {
    /// #         Self {
    /// #             signature: Signature::any(1, Volatility::Immutable),
    /// #         }
    /// #     }
    /// # }
    /// #
    /// # impl AggregateUDFImpl for FirstValueUdf {
    /// #     fn as_any(&self) -> &dyn Any {
    /// #         unimplemented!()
    /// #     }
    /// #     fn name(&self) -> &str {
    /// #         unimplemented!()
    /// #     }
    /// #     fn signature(&self) -> &Signature {
    /// #         unimplemented!()
    /// #     }
    /// #     fn return_type(&self, args: &[DataType]) -> Result<DataType> {
    /// #         unimplemented!()
    /// #     }
    /// #
    /// #     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
    /// #         unimplemented!()
    /// #     }
    /// #
    /// #     fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
    /// #         unimplemented!()
    /// #     }
    /// #
    /// #     fn documentation(&self) -> Option<&Documentation> {
    /// #         unimplemented!()
    /// #     }
    /// # }
    /// #
    /// # let first_value = AggregateUDF::from(FirstValueUdf::new());
    /// # let expr = first_value.call(vec![col("a")]);
    /// #
    /// # use datafusion_physical_expr::expressions::Column;
    /// # use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    /// # use datafusion_physical_expr::aggregate::AggregateExprBuilder;
    /// # use datafusion_physical_expr::expressions::PhysicalSortExpr;
    /// # use datafusion_physical_expr::PhysicalSortRequirement;
    /// #
    /// fn build_aggregate_expr() -> Result<()> {
    ///     let args = vec![Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>];
    ///     let order_by = vec![PhysicalSortExpr {
    ///         expr: Arc::new(Column::new("x", 1)) as Arc<dyn PhysicalExpr>,
    ///         options: Default::default(),
    ///     }];
    ///
    ///     let first_value = AggregateUDF::from(FirstValueUdf::new());
    ///
    ///     let aggregate_expr = AggregateExprBuilder::new(
    ///         Arc::new(first_value),
    ///         args
    ///     )
    ///     .order_by(order_by.into())
    ///     .alias("first_a_by_x")
    ///     .ignore_nulls()
    ///     .build()?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// This creates a physical expression equivalent to SQL:
    /// `first_value(a ORDER BY x) IGNORE NULLS AS first_a_by_x`
    pub fn build(self) -> Result<AggregateFunctionExpr> {
        let Self {
            fun,
            args,
            alias,
            human_display,
            schema,
            ordering_req,
            ignore_nulls,
            is_distinct,
            is_reversed,
        } = self;
        if args.is_empty() {
            return internal_err!("args should not be empty");
        }

        let mut ordering_fields = vec![];

        if !ordering_req.is_empty() {
            let ordering_types = ordering_req
                .iter()
                .map(|e| e.expr.data_type(&schema))
                .collect::<Result<Vec<_>>>()?;

            ordering_fields =
                utils::ordering_fields(ordering_req.as_ref(), &ordering_types);
        }

        let input_exprs_fields = args
            .iter()
            .map(|arg| arg.return_field(&schema))
            .collect::<Result<Vec<_>>>()?;

        check_arg_count(
            fun.name(),
            &input_exprs_fields,
            &fun.signature().type_signature,
        )?;

        let return_field = fun.return_field(&input_exprs_fields)?;
        let is_nullable = fun.is_nullable();
        let name = match alias {
            None => {
                return internal_err!(
                    "AggregateExprBuilder::alias must be provided prior to calling build"
                )
            }
            Some(alias) => alias,
        };

        Ok(AggregateFunctionExpr {
            fun: Arc::unwrap_or_clone(fun),
            args,
            return_field,
            name,
            human_display,
            schema: Arc::unwrap_or_clone(schema),
            ordering_req,
            ignore_nulls,
            ordering_fields,
            is_distinct,
            input_fields: input_exprs_fields,
            is_reversed,
            is_nullable,
        })
    }

    pub fn alias(mut self, alias: impl Into<String>) -> Self {
        self.alias = Some(alias.into());
        self
    }

    pub fn human_display(mut self, name: String) -> Self {
        self.human_display = name;
        self
    }

    pub fn schema(mut self, schema: SchemaRef) -> Self {
        self.schema = schema;
        self
    }

    pub fn order_by(mut self, order_by: LexOrdering) -> Self {
        self.ordering_req = order_by;
        self
    }

    pub fn reversed(mut self) -> Self {
        self.is_reversed = true;
        self
    }

    pub fn with_reversed(mut self, is_reversed: bool) -> Self {
        self.is_reversed = is_reversed;
        self
    }

    pub fn distinct(mut self) -> Self {
        self.is_distinct = true;
        self
    }

    pub fn with_distinct(mut self, is_distinct: bool) -> Self {
        self.is_distinct = is_distinct;
        self
    }

    pub fn ignore_nulls(mut self) -> Self {
        self.ignore_nulls = true;
        self
    }

    pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
        self.ignore_nulls = ignore_nulls;
        self
    }
}

/// Physical aggregate expression of a UDAF.
///
/// Instances are constructed via [`AggregateExprBuilder`].
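///
/// # Example (illustrative)
///
/// A minimal sketch, assuming a `min` [`AggregateUDF`] obtained from a
/// hypothetical `min_udaf()` constructor (not part of this crate) and a
/// `schema` whose column `c` is at index 0:
///
/// ```ignore
/// // Build the physical aggregate expression `MIN(c) AS min_c`, then
/// // inspect the field it produces in the output schema.
/// let min_c = AggregateExprBuilder::new(
///     min_udaf(),
///     vec![Arc::new(Column::new("c", 0)) as Arc<dyn PhysicalExpr>],
/// )
/// .schema(Arc::clone(&schema))
/// .alias("min_c")
/// .build()?;
/// assert_eq!(min_c.field().name(), "min_c");
/// ```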
#[derive(Debug, Clone)]
pub struct AggregateFunctionExpr {
    fun: AggregateUDF,
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Output / return field of this aggregate
    return_field: FieldRef,
    /// Output column name that this expression creates
    name: String,
    /// Simplified name for `tree` explain.
    human_display: String,
    schema: Schema,
    /// The physical order by expressions
    ordering_req: LexOrdering,
    /// Whether to ignore null values
    ignore_nulls: bool,
    /// Fields used for order sensitive aggregation functions
    ordering_fields: Vec<FieldRef>,
    is_distinct: bool,
    is_reversed: bool,
    input_fields: Vec<FieldRef>,
    is_nullable: bool,
}

impl AggregateFunctionExpr {
    /// Return the `AggregateUDF` used by this `AggregateFunctionExpr`
    pub fn fun(&self) -> &AggregateUDF {
        &self.fun
    }

    /// Expressions that are passed to the `Accumulator`.
    /// Single-column aggregations such as `sum` return a single expression here,
    /// while others (e.g. `cov`) return several.
    pub fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        self.args.clone()
    }

    /// Human readable name such as `"MIN(c2)"`.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Simplified name for `tree` explain.
    pub fn human_display(&self) -> &str {
        &self.human_display
    }

    /// Return whether the aggregation is distinct
    pub fn is_distinct(&self) -> bool {
        self.is_distinct
    }

    /// Return whether the aggregation ignores nulls
    pub fn ignore_nulls(&self) -> bool {
        self.ignore_nulls
    }

    /// Return whether the aggregation is reversed
    pub fn is_reversed(&self) -> bool {
        self.is_reversed
    }

    /// Return whether the aggregation is nullable
    pub fn is_nullable(&self) -> bool {
        self.is_nullable
    }

    /// The field of the final result of this aggregation.
    pub fn field(&self) -> FieldRef {
        self.return_field
            .as_ref()
            .clone()
            .with_name(&self.name)
            .into()
    }

    /// The accumulator used to accumulate values from the expressions.
    /// The accumulator expects the same number of arguments as `expressions`
    /// and must return states with the same description as `state_fields`.
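    ///
    /// A minimal usage sketch (assumes `aggregate_expr` is an already built
    /// [`AggregateFunctionExpr`] and `input` is an `ArrayRef` matching its
    /// argument type; names are illustrative):
    ///
    /// ```ignore
    /// let mut acc = aggregate_expr.create_accumulator()?;
    /// // Feed one batch of argument values, then produce the final value.
    /// acc.update_batch(&[input])?;
    /// let result: ScalarValue = acc.evaluate()?;
    /// ```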
    pub fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
        let acc_args = AccumulatorArgs {
            return_field: Arc::clone(&self.return_field),
            schema: &self.schema,
            ignore_nulls: self.ignore_nulls,
            ordering_req: self.ordering_req.as_ref(),
            is_distinct: self.is_distinct,
            name: &self.name,
            is_reversed: self.is_reversed,
            exprs: &self.args,
        };

        self.fun.accumulator(acc_args)
    }

    /// The fields that encode the intermediate state of this aggregation.
    pub fn state_fields(&self) -> Result<Vec<FieldRef>> {
        let args = StateFieldsArgs {
            name: &self.name,
            input_fields: &self.input_fields,
            return_field: Arc::clone(&self.return_field),
            ordering_fields: &self.ordering_fields,
            is_distinct: self.is_distinct,
        };

        self.fun.state_fields(args)
    }

    /// Order by requirements for the aggregate function.
    /// By default it is `None` (there is no requirement).
    /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)`, should implement this.
    pub fn order_bys(&self) -> Option<&LexOrdering> {
        if self.ordering_req.is_empty() {
            return None;
        }

        if !self.order_sensitivity().is_insensitive() {
            return Some(self.ordering_req.as_ref());
        }

        None
    }

    /// Indicates whether the aggregator can produce the correct result with any
    /// arbitrary input ordering. By default, we assume that aggregate expressions
    /// are order insensitive.
    pub fn order_sensitivity(&self) -> AggregateOrderSensitivity {
        if !self.ordering_req.is_empty() {
            // If there is a requirement, use the sensitivity of the implementation
            self.fun.order_sensitivity()
        } else {
            // If there is no requirement, the aggregator is order insensitive
            AggregateOrderSensitivity::Insensitive
        }
    }

    /// Sets the indicator of whether the aggregator's ordering requirements are
    /// satisfied by its input. If this is not the case, aggregators with order
    /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce
    /// the correct result with possibly more work internally.
    ///
    /// # Returns
    ///
    /// Returns `Ok(Some(updated_expr))` if the process completes successfully.
    /// If the expression can benefit from existing input ordering, but does
    /// not implement the method, returns an error. Order insensitive and hard
    /// requirement aggregators return `Ok(None)`.
    pub fn with_beneficial_ordering(
        self: Arc<Self>,
        beneficial_ordering: bool,
    ) -> Result<Option<AggregateFunctionExpr>> {
        let Some(updated_fn) = self
            .fun
            .clone()
            .with_beneficial_ordering(beneficial_ordering)?
        else {
            return Ok(None);
        };

        AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec())
            .order_by(self.ordering_req.clone())
            .schema(Arc::new(self.schema.clone()))
            .alias(self.name().to_string())
            .with_ignore_nulls(self.ignore_nulls)
            .with_distinct(self.is_distinct)
            .with_reversed(self.is_reversed)
            .build()
            .map(Some)
    }

    /// Creates an accumulator implementation that supports retraction
    /// (i.e. implements [`Accumulator::retract_batch`]).
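    ///
    /// A minimal sketch of how a sliding accumulator is driven over a window
    /// frame such as `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` (assumes
    /// `aggregate_expr` is a built [`AggregateFunctionExpr`] and `rows_0_to_2`,
    /// `row_3`, `row_0` are `ArrayRef`s slicing the input column; names are
    /// illustrative):
    ///
    /// ```ignore
    /// let mut acc = aggregate_expr.create_sliding_accumulator()?;
    /// // Frame for row 1 covers rows [0, 3): add rows 0..3.
    /// acc.update_batch(&[rows_0_to_2])?;
    /// let value_for_row_1 = acc.evaluate()?;
    /// // Frame for row 2 covers rows [1, 4): add row 3, retract row 0.
    /// acc.update_batch(&[row_3])?;
    /// acc.retract_batch(&[row_0])?;
    /// let value_for_row_2 = acc.evaluate()?;
    /// ```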
    pub fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
        let args = AccumulatorArgs {
            return_field: Arc::clone(&self.return_field),
            schema: &self.schema,
            ignore_nulls: self.ignore_nulls,
            ordering_req: self.ordering_req.as_ref(),
            is_distinct: self.is_distinct,
            name: &self.name,
            is_reversed: self.is_reversed,
            exprs: &self.args,
        };

        let accumulator = self.fun.create_sliding_accumulator(args)?;

        // Accumulators for window frames that start at something other than
        // `UNBOUNDED PRECEDING`, such as `1 PRECEDING`, need to implement the
        // `retract_batch` method in order to run correctly in DataFusion
        // today.
        //
        // If `retract_batch` is not present, there is no way to calculate the
        // result correctly. For example, consider the query
        //
        // ```sql
        // SELECT
        //  SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS sum_a
        // FROM
        //  t
        // ```
        //
        // 1. The first sum value will be the sum of rows between `[0, 1)`,
        //
        // 2. The second sum value will be the sum of rows between `[0, 2)`,
        //
        // 3. The third sum value will be the sum of rows between `[1, 3)`, etc.
        //
        // Since the accumulator keeps the running sum:
        //
        // 1. For the first sum, we add to the state the sum of values between `[0, 1)`.
        //
        // 2. For the second sum, we add to the state the sum of values between `[1, 2)`
        // (`[0, 1)` is already in the state sum, hence the running sum will
        // cover the `[0, 2)` range).
        //
        // 3. For the third sum, we add to the state the sum of values between `[2, 3)`
        // (`[0, 2)` is already in the state sum). We also need to retract the
        // values between `[0, 1)`; this way we obtain the sum between `[1, 3)`,
        // which is indeed the appropriate range.
        //
        // When `UNBOUNDED PRECEDING` is used in the query, the starting index
        // of the desired range will always be 0, and hence the `retract_batch`
        // method will not be called. In this case having `retract_batch` is
        // not a requirement.
        //
        // This approach is a bit different than the window function approach.
        // Window functions (when they use a window frame) get the entire
        // desired range during evaluation.
        if !accumulator.supports_retract_batch() {
            return not_impl_err!(
                "Aggregate can not be used as a sliding accumulator because \
                     `retract_batch` is not implemented: {}",
                self.name
            );
        }
        Ok(accumulator)
    }

    /// Returns `true` if the aggregate expression has a specialized
    /// [`GroupsAccumulator`] implementation. If this returns true,
    /// [`Self::create_groups_accumulator`] will be called.
    pub fn groups_accumulator_supported(&self) -> bool {
        let args = AccumulatorArgs {
            return_field: Arc::clone(&self.return_field),
            schema: &self.schema,
            ignore_nulls: self.ignore_nulls,
            ordering_req: self.ordering_req.as_ref(),
            is_distinct: self.is_distinct,
            name: &self.name,
            is_reversed: self.is_reversed,
            exprs: &self.args,
        };
        self.fun.groups_accumulator_supported(args)
    }

    /// Return a specialized [`GroupsAccumulator`] that manages state
    /// for all groups.
    ///
    /// For maximum performance, a [`GroupsAccumulator`] should be
    /// implemented in addition to [`Accumulator`].
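    ///
    /// A minimal usage sketch (assumes `aggregate_expr` is a built
    /// [`AggregateFunctionExpr`] whose UDAF supports a groups accumulator, and
    /// that `values`, `group_indices` and `num_groups` describe one input
    /// batch; names are illustrative):
    ///
    /// ```ignore
    /// use datafusion_expr_common::groups_accumulator::EmitTo;
    ///
    /// let mut acc = aggregate_expr.create_groups_accumulator()?;
    /// // Accumulate one batch: each input row is assigned to a group index.
    /// acc.update_batch(&values, &group_indices, None, num_groups)?;
    /// // Emit one output row per group.
    /// let per_group_results = acc.evaluate(EmitTo::All)?;
    /// ```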
    pub fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
        let args = AccumulatorArgs {
            return_field: Arc::clone(&self.return_field),
            schema: &self.schema,
            ignore_nulls: self.ignore_nulls,
            ordering_req: self.ordering_req.as_ref(),
            is_distinct: self.is_distinct,
            name: &self.name,
            is_reversed: self.is_reversed,
            exprs: &self.args,
        };
        self.fun.create_groups_accumulator(args)
    }

    /// Construct an expression that calculates the aggregate in reverse.
    /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
    /// For aggregates that do not support calculation in reverse,
    /// returns None (which is the default value).
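    ///
    /// For example (illustrative), reversing `first_value(a ORDER BY b ASC)`
    /// yields `last_value(a ORDER BY b DESC)`: the order by clause and the
    /// function name in the output column name are rewritten accordingly.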
    pub fn reverse_expr(&self) -> Option<AggregateFunctionExpr> {
        match self.fun.reverse_udf() {
            ReversedUDAF::NotSupported => None,
            ReversedUDAF::Identical => Some(self.clone()),
            ReversedUDAF::Reversed(reverse_udf) => {
                let reverse_ordering_req = reverse_order_bys(self.ordering_req.as_ref());
                let mut name = self.name().to_string();
                // If the function is changed, we need to reverse the order by clause
                // as well, i.e. `FIRST(a ORDER BY b ASC NULLS FIRST)` becomes
                // `LAST(a ORDER BY b DESC NULLS LAST)`
                if self.fun().name() != reverse_udf.name() {
                    replace_order_by_clause(&mut name);
                }
                replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name());

                AggregateExprBuilder::new(reverse_udf, self.args.to_vec())
                    .order_by(reverse_ordering_req)
                    .schema(Arc::new(self.schema.clone()))
                    .alias(name)
                    .with_ignore_nulls(self.ignore_nulls)
                    .with_distinct(self.is_distinct)
                    .with_reversed(!self.is_reversed)
                    .build()
                    .ok()
            }
        }
    }

    /// Returns all expressions used in the [`AggregateFunctionExpr`].
    /// These expressions are (1) the function arguments and (2) the order by expressions.
    pub fn all_expressions(&self) -> AggregatePhysicalExpressions {
        let args = self.expressions();
        let order_bys = self
            .order_bys()
            .cloned()
            .unwrap_or_else(LexOrdering::default);
        let order_by_exprs = order_bys
            .iter()
            .map(|sort_expr| Arc::clone(&sort_expr.expr))
            .collect::<Vec<_>>();
        AggregatePhysicalExpressions {
            args,
            order_by_exprs,
        }
    }

    /// Rewrites the [`AggregateFunctionExpr`] with the given new expressions.
    /// The arguments should be consistent with the return value of the
    /// [`AggregateFunctionExpr::all_expressions`] method.
    /// Returns `Some(AggregateFunctionExpr)` if the rewrite is supported, otherwise returns `None`.
    pub fn with_new_expressions(
        &self,
        _args: Vec<Arc<dyn PhysicalExpr>>,
        _order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Option<AggregateFunctionExpr> {
        None
    }

    /// If this function is max, return (output_field, true);
    /// if the function is min, return (output_field, false);
    /// otherwise return None (the default).
    ///
    /// `output_field` is the field of the column produced by this aggregate.
    ///
    /// Note: this is used to select specialized aggregate implementations in
    /// certain conditions.
    pub fn get_minmax_desc(&self) -> Option<(FieldRef, bool)> {
        self.fun.is_descending().map(|flag| (self.field(), flag))
    }

    /// Returns the default value of the function when the input is `Null`.
    /// Most aggregate functions return `Null` in this case, while `count`
    /// returns 0.
    pub fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
        self.fun.default_value(data_type)
    }

    /// Indicates whether the aggregation function is monotonic as a set
    /// function. See [`SetMonotonicity`] for details.
    pub fn set_monotonicity(&self) -> SetMonotonicity {
        let field = self.field();
        let data_type = field.data_type();
        self.fun.inner().set_monotonicity(data_type)
    }

    /// Returns a `PhysicalSortExpr` based on the set monotonicity of the function.
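    ///
    /// A minimal sketch (assumes `aggregate_expr` computes a set-monotonically
    /// increasing aggregate such as `count`, placed at column index
    /// `aggr_func_idx` of the aggregation output; names are illustrative):
    ///
    /// ```ignore
    /// if let Some(sort_expr) = aggregate_expr.get_result_ordering(aggr_func_idx) {
    ///     // The output column is known to be ascending within each partition,
    ///     // so downstream operators can rely on this ordering.
    ///     assert!(!sort_expr.options.descending);
    /// }
    /// ```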
    pub fn get_result_ordering(&self, aggr_func_idx: usize) -> Option<PhysicalSortExpr> {
        // If the aggregate expressions are set-monotonic, the output data is
        // naturally ordered with it per group or partition.
        let monotonicity = self.set_monotonicity();
        if monotonicity == SetMonotonicity::NotMonotonic {
            return None;
        }
        let expr = Arc::new(Column::new(self.name(), aggr_func_idx));
        let options =
            SortOptions::new(monotonicity == SetMonotonicity::Decreasing, false);
        Some(PhysicalSortExpr { expr, options })
    }
}
680
681/// Stores the physical expressions used inside the `AggregateExpr`.
682pub struct AggregatePhysicalExpressions {
683    /// Aggregate function arguments
684    pub args: Vec<Arc<dyn PhysicalExpr>>,
685    /// Order by expressions
686    pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
687}

impl PartialEq for AggregateFunctionExpr {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
            && self.return_field == other.return_field
            && self.fun == other.fun
            && self.args.len() == other.args.len()
            && self
                .args
                .iter()
                .zip(other.args.iter())
                .all(|(this_arg, other_arg)| this_arg.eq(other_arg))
    }
}

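/// Reverses the direction of the `ORDER BY [...]` clause embedded in an
/// aggregate's output column name, so that the name of a reversed aggregate
/// still describes what it computes. For example (illustrative), a name
/// ending in `ORDER BY [b ASC NULLS LAST]` is rewritten to end in
/// `ORDER BY [b DESC NULLS FIRST]`.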
fn replace_order_by_clause(order_by: &mut String) {
    let suffixes = [
        (" DESC NULLS FIRST]", " ASC NULLS LAST]"),
        (" ASC NULLS FIRST]", " DESC NULLS LAST]"),
        (" DESC NULLS LAST]", " ASC NULLS FIRST]"),
        (" ASC NULLS LAST]", " DESC NULLS FIRST]"),
    ];

    if let Some(start) = order_by.find("ORDER BY [") {
        if let Some(end) = order_by[start..].find(']') {
            // `start + 9` is the index of the opening `[`, so the slice below
            // spans the bracketed column order, brackets included (matching
            // the bracketed suffixes above).
            let order_by_start = start + 9;
            let order_by_end = start + end;

            let column_order = &order_by[order_by_start..=order_by_end];
            for (suffix, replacement) in suffixes {
                if column_order.ends_with(suffix) {
                    let new_order = column_order.replace(suffix, replacement);
                    order_by.replace_range(order_by_start..=order_by_end, &new_order);
                    break;
                }
            }
        }
    }
}

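/// Replaces occurrences of the old function name in an aggregate's output
/// column name with the new one, e.g. (illustrative) `first_value(...)`
/// becomes `last_value(...)` when the aggregate is reversed.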
fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str, fn_name_new: &str) {
    *aggr_name = aggr_name.replace(fn_name_old, fn_name_new);
}