datafusion_physical_expr/aggregate.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

pub(crate) mod groups_accumulator {
    #[allow(unused_imports)]
    pub(crate) mod accumulate {
        pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState;
    }
    pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
        accumulate::NullState, GroupsAccumulatorAdapter,
    };
}
pub(crate) mod stats {
    pub use datafusion_functions_aggregate_common::stats::StatsType;
}
pub mod utils {
    #[allow(deprecated)] // allow adjust_output_array
    pub use datafusion_functions_aggregate_common::utils::{
        adjust_output_array, get_accum_scalar_values_as_arrays, get_sort_options,
        ordering_fields, DecimalAverager, Hashable,
    };
}

use std::fmt::Debug;
use std::sync::Arc;

use crate::expressions::Column;

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef};
use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
use datafusion_expr::{AggregateUDF, ReversedUDAF, SetMonotonicity};
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
use datafusion_expr_common::type_coercion::aggregates::check_arg_count;
use datafusion_functions_aggregate_common::accumulator::{
    AccumulatorArgs, StateFieldsArgs,
};
use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

/// Builder for physical [`AggregateFunctionExpr`]
///
/// `AggregateFunctionExpr` contains the information necessary to call
/// an aggregate expression.
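///
/// See [`Self::build`] for a complete construction example.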
#[derive(Debug, Clone)]
pub struct AggregateExprBuilder {
    fun: Arc<AggregateUDF>,
    /// Physical expressions of the aggregate function
    args: Vec<Arc<dyn PhysicalExpr>>,
    alias: Option<String>,
    /// A human readable name
    human_display: String,
    /// Arrow Schema for the aggregate function
    schema: SchemaRef,
    /// The physical order by expressions
    order_bys: Vec<PhysicalSortExpr>,
    /// Whether to ignore null values
    ignore_nulls: bool,
    /// Whether this is a distinct aggregate function
    is_distinct: bool,
    /// Whether the expression is reversed
    is_reversed: bool,
}

impl AggregateExprBuilder {
    pub fn new(fun: Arc<AggregateUDF>, args: Vec<Arc<dyn PhysicalExpr>>) -> Self {
        Self {
            fun,
            args,
            alias: None,
            human_display: String::default(),
            schema: Arc::new(Schema::empty()),
            order_bys: vec![],
            ignore_nulls: false,
            is_distinct: false,
            is_reversed: false,
        }
    }

    /// Constructs an `AggregateFunctionExpr` from the builder
    ///
    /// Note that an alias must be provided via [`Self::alias`] before calling this method.
    ///
    /// # Example: Create an [`AggregateUDF`]
    ///
    /// In the following example, [`AggregateFunctionExpr`] is built using [`AggregateExprBuilder`],
    /// which provides a build function. The full example can be found in the source file.
    ///
    /// ```
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, FieldRef};
    /// # use datafusion_common::{Result, ScalarValue};
    /// # use datafusion_expr::{col, ColumnarValue, Documentation, Signature, Volatility, Expr};
    /// # use datafusion_expr::{AggregateUDFImpl, AggregateUDF, Accumulator, function::{AccumulatorArgs, StateFieldsArgs}};
    /// # use arrow::datatypes::Field;
    /// #
    /// # #[derive(Debug, Clone)]
    /// # struct FirstValueUdf {
    /// #     signature: Signature,
    /// # }
    /// #
    /// # impl FirstValueUdf {
    /// #     fn new() -> Self {
    /// #         Self {
    /// #             signature: Signature::any(1, Volatility::Immutable),
    /// #         }
    /// #     }
    /// # }
    /// #
    /// # impl AggregateUDFImpl for FirstValueUdf {
    /// #     fn as_any(&self) -> &dyn Any {
    /// #         unimplemented!()
    /// #     }
    /// #
    /// #     fn name(&self) -> &str {
    /// #         unimplemented!()
    /// #     }
    /// #
    /// #     fn signature(&self) -> &Signature {
    /// #         unimplemented!()
    /// #     }
    /// #
    /// #     fn return_type(&self, args: &[DataType]) -> Result<DataType> {
    /// #         unimplemented!()
    /// #     }
    /// #
    /// #     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
    /// #         unimplemented!()
    /// #     }
    /// #
    /// #     fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
    /// #         unimplemented!()
    /// #     }
    /// #
    /// #     fn documentation(&self) -> Option<&Documentation> {
    /// #         unimplemented!()
    /// #     }
    /// # }
    /// #
    /// # let first_value = AggregateUDF::from(FirstValueUdf::new());
    /// # let expr = first_value.call(vec![col("a")]);
    /// #
    /// # use datafusion_physical_expr::expressions::Column;
    /// # use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    /// # use datafusion_physical_expr::aggregate::AggregateExprBuilder;
    /// # use datafusion_physical_expr::expressions::PhysicalSortExpr;
    /// # use datafusion_physical_expr::PhysicalSortRequirement;
    /// #
    /// fn build_aggregate_expr() -> Result<()> {
    ///     let args = vec![Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>];
    ///     let order_by = vec![PhysicalSortExpr {
    ///         expr: Arc::new(Column::new("x", 1)) as Arc<dyn PhysicalExpr>,
    ///         options: Default::default(),
    ///     }];
    ///
    ///     let first_value = AggregateUDF::from(FirstValueUdf::new());
    ///
    ///     let aggregate_expr = AggregateExprBuilder::new(
    ///         Arc::new(first_value),
    ///         args
    ///     )
    ///     .order_by(order_by)
    ///     .alias("first_a_by_x")
    ///     .ignore_nulls()
    ///     .build()?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// This creates a physical expression equivalent to SQL:
    /// `first_value(a ORDER BY x) IGNORE NULLS AS first_a_by_x`
    pub fn build(self) -> Result<AggregateFunctionExpr> {
        let Self {
            fun,
            args,
            alias,
            human_display,
            schema,
            order_bys,
            ignore_nulls,
            is_distinct,
            is_reversed,
        } = self;
        if args.is_empty() {
            return internal_err!("args should not be empty");
        }

        let ordering_types = order_bys
            .iter()
            .map(|e| e.expr.data_type(&schema))
            .collect::<Result<Vec<_>>>()?;

        let ordering_fields = utils::ordering_fields(&order_bys, &ordering_types);

        let input_exprs_fields = args
            .iter()
            .map(|arg| arg.return_field(&schema))
            .collect::<Result<Vec<_>>>()?;

        check_arg_count(
            fun.name(),
            &input_exprs_fields,
            &fun.signature().type_signature,
        )?;

        let return_field = fun.return_field(&input_exprs_fields)?;
        let is_nullable = fun.is_nullable();
        let name = match alias {
            None => {
                return internal_err!(
                    "AggregateExprBuilder::alias must be provided prior to calling build"
                )
            }
            Some(alias) => alias,
        };

        Ok(AggregateFunctionExpr {
            fun: Arc::unwrap_or_clone(fun),
            args,
            return_field,
            name,
            human_display,
            schema: Arc::unwrap_or_clone(schema),
            order_bys,
            ignore_nulls,
            ordering_fields,
            is_distinct,
            input_fields: input_exprs_fields,
            is_reversed,
            is_nullable,
        })
    }

    pub fn alias(mut self, alias: impl Into<String>) -> Self {
        self.alias = Some(alias.into());
        self
    }

    pub fn human_display(mut self, name: String) -> Self {
        self.human_display = name;
        self
    }

    pub fn schema(mut self, schema: SchemaRef) -> Self {
        self.schema = schema;
        self
    }

    pub fn order_by(mut self, order_bys: Vec<PhysicalSortExpr>) -> Self {
        self.order_bys = order_bys;
        self
    }

    pub fn reversed(mut self) -> Self {
        self.is_reversed = true;
        self
    }

    pub fn with_reversed(mut self, is_reversed: bool) -> Self {
        self.is_reversed = is_reversed;
        self
    }

    pub fn distinct(mut self) -> Self {
        self.is_distinct = true;
        self
    }

    pub fn with_distinct(mut self, is_distinct: bool) -> Self {
        self.is_distinct = is_distinct;
        self
    }

    pub fn ignore_nulls(mut self) -> Self {
        self.ignore_nulls = true;
        self
    }

    pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
        self.ignore_nulls = ignore_nulls;
        self
    }
}

/// Physical aggregate expression of a UDAF.
///
/// Instances are constructed via [`AggregateExprBuilder`].
#[derive(Debug, Clone)]
pub struct AggregateFunctionExpr {
    fun: AggregateUDF,
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Output / return field of this aggregate
    return_field: FieldRef,
    /// Output column name that this expression creates
    name: String,
    /// Simplified name for `tree` explain.
    human_display: String,
    schema: Schema,
    // The physical order by expressions
    order_bys: Vec<PhysicalSortExpr>,
    // Whether to ignore null values
    ignore_nulls: bool,
    // fields used for order sensitive aggregation functions
    ordering_fields: Vec<FieldRef>,
    is_distinct: bool,
    is_reversed: bool,
    input_fields: Vec<FieldRef>,
    is_nullable: bool,
}

impl AggregateFunctionExpr {
    /// Return the `AggregateUDF` used by this `AggregateFunctionExpr`
    pub fn fun(&self) -> &AggregateUDF {
        &self.fun
    }

    /// The expressions that are passed to the `Accumulator`.
    /// Single-column aggregations such as `sum` return a single expression; others (e.g. `cov`) return several.
    pub fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        self.args.clone()
    }

    /// Human readable name such as `"MIN(c2)"`.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Simplified name for `tree` explain.
    pub fn human_display(&self) -> &str {
        &self.human_display
    }

    /// Return if the aggregation is distinct
    pub fn is_distinct(&self) -> bool {
        self.is_distinct
    }

    /// Return if the aggregation ignores nulls
    pub fn ignore_nulls(&self) -> bool {
        self.ignore_nulls
    }

    /// Return if the aggregation is reversed
    pub fn is_reversed(&self) -> bool {
        self.is_reversed
    }

    /// Return if the aggregation is nullable
    pub fn is_nullable(&self) -> bool {
        self.is_nullable
    }

    /// The field of the final result of this aggregation.
    pub fn field(&self) -> FieldRef {
        self.return_field
            .as_ref()
            .clone()
            .with_name(&self.name)
            .into()
    }

    /// The accumulator used to accumulate values from the expressions.
    /// The accumulator expects the same number of arguments as `expressions` and must
    /// return states with the same description as `state_fields`.
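    ///
    /// A schematic sketch (not compiled) of driving the returned accumulator;
    /// `aggr_expr` is assumed to be an [`AggregateFunctionExpr`] and `batch` a
    /// `RecordBatch` matching its input schema:
    ///
    /// ```ignore
    /// let mut acc = aggr_expr.create_accumulator()?;
    /// // Evaluate the argument expressions against the input batch ...
    /// let values = aggr_expr
    ///     .expressions()
    ///     .iter()
    ///     .map(|e| e.evaluate(&batch)?.into_array(batch.num_rows()))
    ///     .collect::<Result<Vec<_>>>()?;
    /// // ... then feed them to the accumulator and produce the final value.
    /// acc.update_batch(&values)?;
    /// let result = acc.evaluate()?;
    /// ```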
    pub fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
        let acc_args = AccumulatorArgs {
            return_field: Arc::clone(&self.return_field),
            schema: &self.schema,
            ignore_nulls: self.ignore_nulls,
            order_bys: self.order_bys.as_ref(),
            is_distinct: self.is_distinct,
            name: &self.name,
            is_reversed: self.is_reversed,
            exprs: &self.args,
        };

        self.fun.accumulator(acc_args)
    }

    /// The fields describing the intermediate accumulator state of this aggregation,
    /// as reported by the underlying [`AggregateUDF`].
    pub fn state_fields(&self) -> Result<Vec<FieldRef>> {
        let args = StateFieldsArgs {
            name: &self.name,
            input_fields: &self.input_fields,
            return_field: Arc::clone(&self.return_field),
            ordering_fields: &self.ordering_fields,
            is_distinct: self.is_distinct,
        };

        self.fun.state_fields(args)
    }

    /// Returns the ORDER BY expressions for the aggregate function.
    pub fn order_bys(&self) -> &[PhysicalSortExpr] {
        if self.order_sensitivity().is_insensitive() {
            &[]
        } else {
            &self.order_bys
        }
    }

    /// Indicates whether the aggregator can produce the correct result with any
    /// arbitrary input ordering. By default, we assume that aggregate expressions
    /// are order insensitive.
    pub fn order_sensitivity(&self) -> AggregateOrderSensitivity {
        if self.order_bys.is_empty() {
            AggregateOrderSensitivity::Insensitive
        } else {
            // If there is an ORDER BY clause, use the sensitivity of the implementation:
            self.fun.order_sensitivity()
        }
    }

    /// Sets the indicator of whether the ordering requirements of the aggregator
    /// are satisfied by its input. If this is not the case, aggregators with order
    /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce
    /// the correct result with possibly more work internally.
    ///
    /// # Returns
    ///
    /// Returns `Ok(Some(updated_expr))` if the process completes successfully.
    /// If the expression can benefit from existing input ordering, but does
    /// not implement the method, returns an error. Order-insensitive and hard
    /// requirement aggregators return `Ok(None)`.
    pub fn with_beneficial_ordering(
        self: Arc<Self>,
        beneficial_ordering: bool,
    ) -> Result<Option<AggregateFunctionExpr>> {
        let Some(updated_fn) = self
            .fun
            .clone()
            .with_beneficial_ordering(beneficial_ordering)?
        else {
            return Ok(None);
        };

        AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec())
            .order_by(self.order_bys.clone())
            .schema(Arc::new(self.schema.clone()))
            .alias(self.name().to_string())
            .with_ignore_nulls(self.ignore_nulls)
            .with_distinct(self.is_distinct)
            .with_reversed(self.is_reversed)
            .build()
            .map(Some)
    }

    /// Creates an accumulator implementation that supports retraction
    pub fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
        let args = AccumulatorArgs {
            return_field: Arc::clone(&self.return_field),
            schema: &self.schema,
            ignore_nulls: self.ignore_nulls,
            order_bys: self.order_bys.as_ref(),
            is_distinct: self.is_distinct,
            name: &self.name,
            is_reversed: self.is_reversed,
            exprs: &self.args,
        };

        let accumulator = self.fun.create_sliding_accumulator(args)?;

        // Accumulators that have window frame starts other than
        // `UNBOUNDED PRECEDING`, such as `1 PRECEDING`, need to
        // implement the retract_batch method in order to run correctly
        // in DataFusion today.
        //
        // If retract_batch is not implemented, there is no way
        // to calculate the result correctly. For example, consider the query
        //
        // ```sql
        // SELECT
        //  SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_a
        // FROM
        //  t
        // ```
        //
        // 1. The first sum value will be the sum of rows between `[0, 1)`,
        //
        // 2. The second sum value will be the sum of rows between `[0, 2)`,
        //
        // 3. The third sum value will be the sum of rows between `[1, 3)`, etc.
        //
        // Since the accumulator keeps the running sum:
        //
        // 1. For the first sum, we add the values between `[0, 1)` to the state sum.
        //
        // 2. For the second sum, we add the values between `[1, 2)`
        // (`[0, 1)` is already in the state sum, hence the running sum
        // covers the `[0, 2)` range).
        //
        // 3. For the third sum, we add the values between `[2, 3)`
        // (`[0, 2)` is already in the state sum). We also need to
        // retract the values between `[0, 1)`; this way we obtain the sum
        // over `[1, 3)`, which is indeed the appropriate range.
        //
        // When the query uses `UNBOUNDED PRECEDING`, the starting
        // index is always 0 for the desired range, and hence the
        // `retract_batch` method is never called. In that case
        // implementing retract_batch is not a requirement.
        //
        // This approach is a bit different from the window function
        // approach: window functions (when they use a window frame)
        // receive the entire desired range during evaluation.
        if !accumulator.supports_retract_batch() {
            return not_impl_err!(
                "Aggregate can not be used as a sliding accumulator because \
                     `retract_batch` is not implemented: {}",
                self.name
            );
        }
        Ok(accumulator)
    }

    /// Returns `true` if the aggregate expression has a specialized
    /// [`GroupsAccumulator`] implementation. If this returns true,
    /// [`Self::create_groups_accumulator`] will be called.
    pub fn groups_accumulator_supported(&self) -> bool {
        let args = AccumulatorArgs {
            return_field: Arc::clone(&self.return_field),
            schema: &self.schema,
            ignore_nulls: self.ignore_nulls,
            order_bys: self.order_bys.as_ref(),
            is_distinct: self.is_distinct,
            name: &self.name,
            is_reversed: self.is_reversed,
            exprs: &self.args,
        };
        self.fun.groups_accumulator_supported(args)
    }

    /// Return a specialized [`GroupsAccumulator`] that manages state
    /// for all groups.
    ///
    /// For maximum performance, a [`GroupsAccumulator`] should be
    /// implemented in addition to [`Accumulator`].
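    ///
    /// If no specialized implementation is available (see
    /// [`Self::groups_accumulator_supported`]), callers typically fall back to
    /// driving the row-based [`Accumulator`] per group instead, for example via
    /// an adapter such as `GroupsAccumulatorAdapter`.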
    pub fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
        let args = AccumulatorArgs {
            return_field: Arc::clone(&self.return_field),
            schema: &self.schema,
            ignore_nulls: self.ignore_nulls,
            order_bys: self.order_bys.as_ref(),
            is_distinct: self.is_distinct,
            name: &self.name,
            is_reversed: self.is_reversed,
            exprs: &self.args,
        };
        self.fun.create_groups_accumulator(args)
    }

    /// Construct an expression that calculates the aggregate in reverse.
    /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
    /// For aggregates that do not support calculation in reverse,
    /// returns None (which is the default value).
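    ///
    /// For example, `first_value(a ORDER BY b ASC NULLS FIRST)` reverses to
    /// `last_value(a ORDER BY b DESC NULLS LAST)`.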
    pub fn reverse_expr(&self) -> Option<AggregateFunctionExpr> {
        match self.fun.reverse_udf() {
            ReversedUDAF::NotSupported => None,
            ReversedUDAF::Identical => Some(self.clone()),
            ReversedUDAF::Reversed(reverse_udf) => {
                let mut name = self.name().to_string();
                // If the function is changed, we need to reverse the order_by clause as well,
                // i.e. First(a order by b asc nulls first) -> Last(a order by b desc nulls last)
                if self.fun().name() != reverse_udf.name() {
                    replace_order_by_clause(&mut name);
                }
                replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name());

                AggregateExprBuilder::new(reverse_udf, self.args.to_vec())
                    .order_by(self.order_bys.iter().map(|e| e.reverse()).collect())
                    .schema(Arc::new(self.schema.clone()))
                    .alias(name)
                    .with_ignore_nulls(self.ignore_nulls)
                    .with_distinct(self.is_distinct)
                    .with_reversed(!self.is_reversed)
                    .build()
                    .ok()
            }
        }
    }

    /// Returns all expressions used in the [`AggregateFunctionExpr`].
    /// These expressions are: (1) function arguments and (2) order by expressions.
    pub fn all_expressions(&self) -> AggregatePhysicalExpressions {
        let args = self.expressions();
        let order_by_exprs = self
            .order_bys()
            .iter()
            .map(|sort_expr| Arc::clone(&sort_expr.expr))
            .collect();
        AggregatePhysicalExpressions {
            args,
            order_by_exprs,
        }
    }

    /// Rewrites the [`AggregateFunctionExpr`] with the new expressions given. The arguments should be
    /// consistent with the return value of the [`AggregateFunctionExpr::all_expressions`] method.
    /// Returns `Some(AggregateFunctionExpr)` if the rewrite is supported, otherwise returns `None`.
    pub fn with_new_expressions(
        &self,
        _args: Vec<Arc<dyn PhysicalExpr>>,
        _order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Option<AggregateFunctionExpr> {
        None
    }

    /// If this function is max, returns `(output_field, true)`;
    /// if the function is min, returns `(output_field, false)`;
    /// otherwise returns `None` (the default).
    ///
    /// `output_field` is the field of the column produced by this aggregate.
    ///
    /// Note: this is used to select specialized aggregate implementations in certain conditions.
    pub fn get_minmax_desc(&self) -> Option<(FieldRef, bool)> {
        self.fun.is_descending().map(|flag| (self.field(), flag))
    }

    /// Returns the default value of the function when the input is `Null`.
    /// Most aggregate functions return `Null` if the input is `Null`,
    /// while `count` returns 0.
    pub fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
        self.fun.default_value(data_type)
    }

    /// Indicates whether the aggregation function is monotonic as a set
    /// function. See [`SetMonotonicity`] for details.
    pub fn set_monotonicity(&self) -> SetMonotonicity {
        let field = self.field();
        let data_type = field.data_type();
        self.fun.inner().set_monotonicity(data_type)
    }

    /// Returns a `PhysicalSortExpr` based on the set monotonicity of the function.
    pub fn get_result_ordering(&self, aggr_func_idx: usize) -> Option<PhysicalSortExpr> {
        // If the aggregate expressions are set-monotonic, the output data is
        // naturally ordered with it per group or partition.
        let monotonicity = self.set_monotonicity();
        if monotonicity == SetMonotonicity::NotMonotonic {
            return None;
        }
        let expr = Arc::new(Column::new(self.name(), aggr_func_idx));
        let options =
            SortOptions::new(monotonicity == SetMonotonicity::Decreasing, false);
        Some(PhysicalSortExpr { expr, options })
    }
}

/// Stores the physical expressions used inside the [`AggregateFunctionExpr`].
pub struct AggregatePhysicalExpressions {
    /// Aggregate function arguments
    pub args: Vec<Arc<dyn PhysicalExpr>>,
    /// Order by expressions
    pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
}

impl PartialEq for AggregateFunctionExpr {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
            && self.return_field == other.return_field
            && self.fun == other.fun
            && self.args.len() == other.args.len()
            && self
                .args
                .iter()
                .zip(other.args.iter())
                .all(|(this_arg, other_arg)| this_arg.eq(other_arg))
    }
}

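/// Inverts the sort direction and null ordering in the textual `ORDER BY [...]`
/// clause of an aggregate's display name (e.g. `... ORDER BY [b ASC NULLS LAST]`
/// becomes `... ORDER BY [b DESC NULLS FIRST]`).
///
/// Used when constructing the name of a reversed aggregate in
/// [`AggregateFunctionExpr::reverse_expr`].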
fn replace_order_by_clause(order_by: &mut String) {
    let suffixes = [
        (" DESC NULLS FIRST]", " ASC NULLS LAST]"),
        (" ASC NULLS FIRST]", " DESC NULLS LAST]"),
        (" DESC NULLS LAST]", " ASC NULLS FIRST]"),
        (" ASC NULLS LAST]", " DESC NULLS FIRST]"),
    ];

    if let Some(start) = order_by.find("ORDER BY [") {
        if let Some(end) = order_by[start..].find(']') {
            let order_by_start = start + 9;
            let order_by_end = start + end;

            let column_order = &order_by[order_by_start..=order_by_end];
            for (suffix, replacement) in suffixes {
                if column_order.ends_with(suffix) {
                    let new_order = column_order.replace(suffix, replacement);
                    order_by.replace_range(order_by_start..=order_by_end, &new_order);
                    break;
                }
            }
        }
    }
}

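/// Replaces every occurrence of the old function name in the aggregate's
/// display name with the new (reversed) function name.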
fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str, fn_name_new: &str) {
    *aggr_name = aggr_name.replace(fn_name_old, fn_name_new);
}