datafusion_physical_expr/aggregate.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub(crate) mod groups_accumulator {
19 #[allow(unused_imports)]
20 pub(crate) mod accumulate {
21 pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState;
22 }
23 pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
24 accumulate::NullState, GroupsAccumulatorAdapter,
25 };
26}
27pub(crate) mod stats {
28 pub use datafusion_functions_aggregate_common::stats::StatsType;
29}
30pub mod utils {
31 #[allow(deprecated)] // allow adjust_output_array
32 pub use datafusion_functions_aggregate_common::utils::{
33 adjust_output_array, get_accum_scalar_values_as_arrays, get_sort_options,
34 ordering_fields, DecimalAverager, Hashable,
35 };
36}
37
38use std::fmt::Debug;
39use std::sync::Arc;
40
41use crate::expressions::Column;
42
43use arrow::compute::SortOptions;
44use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef};
45use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
46use datafusion_expr::{AggregateUDF, ReversedUDAF, SetMonotonicity};
47use datafusion_expr_common::accumulator::Accumulator;
48use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
49use datafusion_expr_common::type_coercion::aggregates::check_arg_count;
50use datafusion_functions_aggregate_common::accumulator::{
51 AccumulatorArgs, StateFieldsArgs,
52};
53use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
54use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
55use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
56
57/// Builder for physical [`AggregateFunctionExpr`]
58///
59/// `AggregateFunctionExpr` contains the information necessary to call
60/// an aggregate expression.
61#[derive(Debug, Clone)]
62pub struct AggregateExprBuilder {
63 fun: Arc<AggregateUDF>,
64 /// Physical expressions of the aggregate function
65 args: Vec<Arc<dyn PhysicalExpr>>,
66 alias: Option<String>,
67 /// A human readable name
68 human_display: String,
69 /// Arrow Schema for the aggregate function
70 schema: SchemaRef,
71 /// The physical order by expressions
72 order_bys: Vec<PhysicalSortExpr>,
73 /// Whether to ignore null values
74 ignore_nulls: bool,
75 /// Whether is distinct aggregate function
76 is_distinct: bool,
77 /// Whether the expression is reversed
78 is_reversed: bool,
79}
80
81impl AggregateExprBuilder {
82 pub fn new(fun: Arc<AggregateUDF>, args: Vec<Arc<dyn PhysicalExpr>>) -> Self {
83 Self {
84 fun,
85 args,
86 alias: None,
87 human_display: String::default(),
88 schema: Arc::new(Schema::empty()),
89 order_bys: vec![],
90 ignore_nulls: false,
91 is_distinct: false,
92 is_reversed: false,
93 }
94 }
95
96 /// Constructs an `AggregateFunctionExpr` from the builder
97 ///
98 /// Note that an [`Self::alias`] must be provided before calling this method.
99 ///
100 /// # Example: Create an [`AggregateUDF`]
101 ///
102 /// In the following example, [`AggregateFunctionExpr`] will be built using [`AggregateExprBuilder`]
103 /// which provides a build function. Full example could be accessed from the source file.
104 ///
105 /// ```
106 /// # use std::any::Any;
107 /// # use std::sync::Arc;
108 /// # use arrow::datatypes::{DataType, FieldRef};
109 /// # use datafusion_common::{Result, ScalarValue};
110 /// # use datafusion_expr::{col, ColumnarValue, Documentation, Signature, Volatility, Expr};
111 /// # use datafusion_expr::{AggregateUDFImpl, AggregateUDF, Accumulator, function::{AccumulatorArgs, StateFieldsArgs}};
112 /// # use arrow::datatypes::Field;
113 /// #
114 /// # #[derive(Debug, Clone)]
115 /// # struct FirstValueUdf {
116 /// # signature: Signature,
117 /// # }
118 /// #
119 /// # impl FirstValueUdf {
120 /// # fn new() -> Self {
121 /// # Self {
122 /// # signature: Signature::any(1, Volatility::Immutable),
123 /// # }
124 /// # }
125 /// # }
126 /// #
127 /// # impl AggregateUDFImpl for FirstValueUdf {
128 /// # fn as_any(&self) -> &dyn Any {
129 /// # unimplemented!()
130 /// # }
131 /// #
132 /// # fn name(&self) -> &str {
133 /// # unimplemented!()
134 /// # }
135 /// #
136 /// # fn signature(&self) -> &Signature {
137 /// # unimplemented!()
138 /// # }
139 /// #
140 /// # fn return_type(&self, args: &[DataType]) -> Result<DataType> {
141 /// # unimplemented!()
142 /// # }
143 /// #
144 /// # fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
145 /// # unimplemented!()
146 /// # }
147 /// #
148 /// # fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
149 /// # unimplemented!()
150 /// # }
151 /// #
152 /// # fn documentation(&self) -> Option<&Documentation> {
153 /// # unimplemented!()
154 /// # }
155 /// # }
156 /// #
157 /// # let first_value = AggregateUDF::from(FirstValueUdf::new());
158 /// # let expr = first_value.call(vec![col("a")]);
159 /// #
160 /// # use datafusion_physical_expr::expressions::Column;
161 /// # use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
162 /// # use datafusion_physical_expr::aggregate::AggregateExprBuilder;
163 /// # use datafusion_physical_expr::expressions::PhysicalSortExpr;
164 /// # use datafusion_physical_expr::PhysicalSortRequirement;
165 /// #
166 /// fn build_aggregate_expr() -> Result<()> {
167 /// let args = vec![Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>];
168 /// let order_by = vec![PhysicalSortExpr {
169 /// expr: Arc::new(Column::new("x", 1)) as Arc<dyn PhysicalExpr>,
170 /// options: Default::default(),
171 /// }];
172 ///
173 /// let first_value = AggregateUDF::from(FirstValueUdf::new());
174 ///
175 /// let aggregate_expr = AggregateExprBuilder::new(
176 /// Arc::new(first_value),
177 /// args
178 /// )
179 /// .order_by(order_by)
180 /// .alias("first_a_by_x")
181 /// .ignore_nulls()
182 /// .build()?;
183 ///
184 /// Ok(())
185 /// }
186 /// ```
187 ///
188 /// This creates a physical expression equivalent to SQL:
189 /// `first_value(a ORDER BY x) IGNORE NULLS AS first_a_by_x`
190 pub fn build(self) -> Result<AggregateFunctionExpr> {
191 let Self {
192 fun,
193 args,
194 alias,
195 human_display,
196 schema,
197 order_bys,
198 ignore_nulls,
199 is_distinct,
200 is_reversed,
201 } = self;
202 if args.is_empty() {
203 return internal_err!("args should not be empty");
204 }
205
206 let ordering_types = order_bys
207 .iter()
208 .map(|e| e.expr.data_type(&schema))
209 .collect::<Result<Vec<_>>>()?;
210
211 let ordering_fields = utils::ordering_fields(&order_bys, &ordering_types);
212
213 let input_exprs_fields = args
214 .iter()
215 .map(|arg| arg.return_field(&schema))
216 .collect::<Result<Vec<_>>>()?;
217
218 check_arg_count(
219 fun.name(),
220 &input_exprs_fields,
221 &fun.signature().type_signature,
222 )?;
223
224 let return_field = fun.return_field(&input_exprs_fields)?;
225 let is_nullable = fun.is_nullable();
226 let name = match alias {
227 None => {
228 return internal_err!(
229 "AggregateExprBuilder::alias must be provided prior to calling build"
230 )
231 }
232 Some(alias) => alias,
233 };
234
235 Ok(AggregateFunctionExpr {
236 fun: Arc::unwrap_or_clone(fun),
237 args,
238 return_field,
239 name,
240 human_display,
241 schema: Arc::unwrap_or_clone(schema),
242 order_bys,
243 ignore_nulls,
244 ordering_fields,
245 is_distinct,
246 input_fields: input_exprs_fields,
247 is_reversed,
248 is_nullable,
249 })
250 }
251
252 pub fn alias(mut self, alias: impl Into<String>) -> Self {
253 self.alias = Some(alias.into());
254 self
255 }
256
257 pub fn human_display(mut self, name: String) -> Self {
258 self.human_display = name;
259 self
260 }
261
262 pub fn schema(mut self, schema: SchemaRef) -> Self {
263 self.schema = schema;
264 self
265 }
266
267 pub fn order_by(mut self, order_bys: Vec<PhysicalSortExpr>) -> Self {
268 self.order_bys = order_bys;
269 self
270 }
271
272 pub fn reversed(mut self) -> Self {
273 self.is_reversed = true;
274 self
275 }
276
277 pub fn with_reversed(mut self, is_reversed: bool) -> Self {
278 self.is_reversed = is_reversed;
279 self
280 }
281
282 pub fn distinct(mut self) -> Self {
283 self.is_distinct = true;
284 self
285 }
286
287 pub fn with_distinct(mut self, is_distinct: bool) -> Self {
288 self.is_distinct = is_distinct;
289 self
290 }
291
292 pub fn ignore_nulls(mut self) -> Self {
293 self.ignore_nulls = true;
294 self
295 }
296
297 pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
298 self.ignore_nulls = ignore_nulls;
299 self
300 }
301}
302
303/// Physical aggregate expression of a UDAF.
304///
305/// Instances are constructed via [`AggregateExprBuilder`].
306#[derive(Debug, Clone)]
307pub struct AggregateFunctionExpr {
308 fun: AggregateUDF,
309 args: Vec<Arc<dyn PhysicalExpr>>,
310 /// Output / return field of this aggregate
311 return_field: FieldRef,
312 /// Output column name that this expression creates
313 name: String,
314 /// Simplified name for `tree` explain.
315 human_display: String,
316 schema: Schema,
317 // The physical order by expressions
318 order_bys: Vec<PhysicalSortExpr>,
319 // Whether to ignore null values
320 ignore_nulls: bool,
321 // fields used for order sensitive aggregation functions
322 ordering_fields: Vec<FieldRef>,
323 is_distinct: bool,
324 is_reversed: bool,
325 input_fields: Vec<FieldRef>,
326 is_nullable: bool,
327}
328
329impl AggregateFunctionExpr {
330 /// Return the `AggregateUDF` used by this `AggregateFunctionExpr`
331 pub fn fun(&self) -> &AggregateUDF {
332 &self.fun
333 }
334
335 /// expressions that are passed to the Accumulator.
336 /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
337 pub fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
338 self.args.clone()
339 }
340
341 /// Human readable name such as `"MIN(c2)"`.
342 pub fn name(&self) -> &str {
343 &self.name
344 }
345
346 /// Simplified name for `tree` explain.
347 pub fn human_display(&self) -> &str {
348 &self.human_display
349 }
350
351 /// Return if the aggregation is distinct
352 pub fn is_distinct(&self) -> bool {
353 self.is_distinct
354 }
355
356 /// Return if the aggregation ignores nulls
357 pub fn ignore_nulls(&self) -> bool {
358 self.ignore_nulls
359 }
360
361 /// Return if the aggregation is reversed
362 pub fn is_reversed(&self) -> bool {
363 self.is_reversed
364 }
365
366 /// Return if the aggregation is nullable
367 pub fn is_nullable(&self) -> bool {
368 self.is_nullable
369 }
370
371 /// the field of the final result of this aggregation.
372 pub fn field(&self) -> FieldRef {
373 self.return_field
374 .as_ref()
375 .clone()
376 .with_name(&self.name)
377 .into()
378 }
379
380 /// the accumulator used to accumulate values from the expressions.
381 /// the accumulator expects the same number of arguments as `expressions` and must
382 /// return states with the same description as `state_fields`
383 pub fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
384 let acc_args = AccumulatorArgs {
385 return_field: Arc::clone(&self.return_field),
386 schema: &self.schema,
387 ignore_nulls: self.ignore_nulls,
388 order_bys: self.order_bys.as_ref(),
389 is_distinct: self.is_distinct,
390 name: &self.name,
391 is_reversed: self.is_reversed,
392 exprs: &self.args,
393 };
394
395 self.fun.accumulator(acc_args)
396 }
397
398 /// the field of the final result of this aggregation.
399 pub fn state_fields(&self) -> Result<Vec<FieldRef>> {
400 let args = StateFieldsArgs {
401 name: &self.name,
402 input_fields: &self.input_fields,
403 return_field: Arc::clone(&self.return_field),
404 ordering_fields: &self.ordering_fields,
405 is_distinct: self.is_distinct,
406 };
407
408 self.fun.state_fields(args)
409 }
410
411 /// Returns the ORDER BY expressions for the aggregate function.
412 pub fn order_bys(&self) -> &[PhysicalSortExpr] {
413 if self.order_sensitivity().is_insensitive() {
414 &[]
415 } else {
416 &self.order_bys
417 }
418 }
419
420 /// Indicates whether aggregator can produce the correct result with any
421 /// arbitrary input ordering. By default, we assume that aggregate expressions
422 /// are order insensitive.
423 pub fn order_sensitivity(&self) -> AggregateOrderSensitivity {
424 if self.order_bys.is_empty() {
425 AggregateOrderSensitivity::Insensitive
426 } else {
427 // If there is an ORDER BY clause, use the sensitivity of the implementation:
428 self.fun.order_sensitivity()
429 }
430 }
431
432 /// Sets the indicator whether ordering requirements of the aggregator is
433 /// satisfied by its input. If this is not the case, aggregators with order
434 /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce
435 /// the correct result with possibly more work internally.
436 ///
437 /// # Returns
438 ///
439 /// Returns `Ok(Some(updated_expr))` if the process completes successfully.
440 /// If the expression can benefit from existing input ordering, but does
441 /// not implement the method, returns an error. Order insensitive and hard
442 /// requirement aggregators return `Ok(None)`.
443 pub fn with_beneficial_ordering(
444 self: Arc<Self>,
445 beneficial_ordering: bool,
446 ) -> Result<Option<AggregateFunctionExpr>> {
447 let Some(updated_fn) = self
448 .fun
449 .clone()
450 .with_beneficial_ordering(beneficial_ordering)?
451 else {
452 return Ok(None);
453 };
454
455 AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec())
456 .order_by(self.order_bys.clone())
457 .schema(Arc::new(self.schema.clone()))
458 .alias(self.name().to_string())
459 .with_ignore_nulls(self.ignore_nulls)
460 .with_distinct(self.is_distinct)
461 .with_reversed(self.is_reversed)
462 .build()
463 .map(Some)
464 }
465
466 /// Creates accumulator implementation that supports retract
467 pub fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
468 let args = AccumulatorArgs {
469 return_field: Arc::clone(&self.return_field),
470 schema: &self.schema,
471 ignore_nulls: self.ignore_nulls,
472 order_bys: self.order_bys.as_ref(),
473 is_distinct: self.is_distinct,
474 name: &self.name,
475 is_reversed: self.is_reversed,
476 exprs: &self.args,
477 };
478
479 let accumulator = self.fun.create_sliding_accumulator(args)?;
480
481 // Accumulators that have window frame startings different
482 // than `UNBOUNDED PRECEDING`, such as `1 PRECEDING`, need to
483 // implement retract_batch method in order to run correctly
484 // currently in DataFusion.
485 //
486 // If this `retract_batches` is not present, there is no way
487 // to calculate result correctly. For example, the query
488 //
489 // ```sql
490 // SELECT
491 // SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS sum_a
492 // FROM
493 // t
494 // ```
495 //
496 // 1. First sum value will be the sum of rows between `[0, 1)`,
497 //
498 // 2. Second sum value will be the sum of rows between `[0, 2)`
499 //
500 // 3. Third sum value will be the sum of rows between `[1, 3)`, etc.
501 //
502 // Since the accumulator keeps the running sum:
503 //
504 // 1. First sum we add to the state sum value between `[0, 1)`
505 //
506 // 2. Second sum we add to the state sum value between `[1, 2)`
507 // (`[0, 1)` is already in the state sum, hence running sum will
508 // cover `[0, 2)` range)
509 //
510 // 3. Third sum we add to the state sum value between `[2, 3)`
511 // (`[0, 2)` is already in the state sum). Also we need to
512 // retract values between `[0, 1)` by this way we can obtain sum
513 // between [1, 3) which is indeed the appropriate range.
514 //
515 // When we use `UNBOUNDED PRECEDING` in the query starting
516 // index will always be 0 for the desired range, and hence the
517 // `retract_batch` method will not be called. In this case
518 // having retract_batch is not a requirement.
519 //
520 // This approach is a a bit different than window function
521 // approach. In window function (when they use a window frame)
522 // they get all the desired range during evaluation.
523 if !accumulator.supports_retract_batch() {
524 return not_impl_err!(
525 "Aggregate can not be used as a sliding accumulator because \
526 `retract_batch` is not implemented: {}",
527 self.name
528 );
529 }
530 Ok(accumulator)
531 }
532
533 /// If the aggregate expression has a specialized
534 /// [`GroupsAccumulator`] implementation. If this returns true,
535 /// `[Self::create_groups_accumulator`] will be called.
536 pub fn groups_accumulator_supported(&self) -> bool {
537 let args = AccumulatorArgs {
538 return_field: Arc::clone(&self.return_field),
539 schema: &self.schema,
540 ignore_nulls: self.ignore_nulls,
541 order_bys: self.order_bys.as_ref(),
542 is_distinct: self.is_distinct,
543 name: &self.name,
544 is_reversed: self.is_reversed,
545 exprs: &self.args,
546 };
547 self.fun.groups_accumulator_supported(args)
548 }
549
550 /// Return a specialized [`GroupsAccumulator`] that manages state
551 /// for all groups.
552 ///
553 /// For maximum performance, a [`GroupsAccumulator`] should be
554 /// implemented in addition to [`Accumulator`].
555 pub fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
556 let args = AccumulatorArgs {
557 return_field: Arc::clone(&self.return_field),
558 schema: &self.schema,
559 ignore_nulls: self.ignore_nulls,
560 order_bys: self.order_bys.as_ref(),
561 is_distinct: self.is_distinct,
562 name: &self.name,
563 is_reversed: self.is_reversed,
564 exprs: &self.args,
565 };
566 self.fun.create_groups_accumulator(args)
567 }
568
569 /// Construct an expression that calculates the aggregate in reverse.
570 /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
571 /// For aggregates that do not support calculation in reverse,
572 /// returns None (which is the default value).
573 pub fn reverse_expr(&self) -> Option<AggregateFunctionExpr> {
574 match self.fun.reverse_udf() {
575 ReversedUDAF::NotSupported => None,
576 ReversedUDAF::Identical => Some(self.clone()),
577 ReversedUDAF::Reversed(reverse_udf) => {
578 let mut name = self.name().to_string();
579 // If the function is changed, we need to reverse order_by clause as well
580 // i.e. First(a order by b asc null first) -> Last(a order by b desc null last)
581 if self.fun().name() != reverse_udf.name() {
582 replace_order_by_clause(&mut name);
583 }
584 replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name());
585
586 AggregateExprBuilder::new(reverse_udf, self.args.to_vec())
587 .order_by(self.order_bys.iter().map(|e| e.reverse()).collect())
588 .schema(Arc::new(self.schema.clone()))
589 .alias(name)
590 .with_ignore_nulls(self.ignore_nulls)
591 .with_distinct(self.is_distinct)
592 .with_reversed(!self.is_reversed)
593 .build()
594 .ok()
595 }
596 }
597 }
598
599 /// Returns all expressions used in the [`AggregateFunctionExpr`].
600 /// These expressions are (1)function arguments, (2) order by expressions.
601 pub fn all_expressions(&self) -> AggregatePhysicalExpressions {
602 let args = self.expressions();
603 let order_by_exprs = self
604 .order_bys()
605 .iter()
606 .map(|sort_expr| Arc::clone(&sort_expr.expr))
607 .collect();
608 AggregatePhysicalExpressions {
609 args,
610 order_by_exprs,
611 }
612 }
613
614 /// Rewrites [`AggregateFunctionExpr`], with new expressions given. The argument should be consistent
615 /// with the return value of the [`AggregateFunctionExpr::all_expressions`] method.
616 /// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
617 pub fn with_new_expressions(
618 &self,
619 _args: Vec<Arc<dyn PhysicalExpr>>,
620 _order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
621 ) -> Option<AggregateFunctionExpr> {
622 None
623 }
624
625 /// If this function is max, return (output_field, true)
626 /// if the function is min, return (output_field, false)
627 /// otherwise return None (the default)
628 ///
629 /// output_field is the name of the column produced by this aggregate
630 ///
631 /// Note: this is used to use special aggregate implementations in certain conditions
632 pub fn get_minmax_desc(&self) -> Option<(FieldRef, bool)> {
633 self.fun.is_descending().map(|flag| (self.field(), flag))
634 }
635
636 /// Returns default value of the function given the input is Null
637 /// Most of the aggregate function return Null if input is Null,
638 /// while `count` returns 0 if input is Null
639 pub fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
640 self.fun.default_value(data_type)
641 }
642
643 /// Indicates whether the aggregation function is monotonic as a set
644 /// function. See [`SetMonotonicity`] for details.
645 pub fn set_monotonicity(&self) -> SetMonotonicity {
646 let field = self.field();
647 let data_type = field.data_type();
648 self.fun.inner().set_monotonicity(data_type)
649 }
650
651 /// Returns `PhysicalSortExpr` based on the set monotonicity of the function.
652 pub fn get_result_ordering(&self, aggr_func_idx: usize) -> Option<PhysicalSortExpr> {
653 // If the aggregate expressions are set-monotonic, the output data is
654 // naturally ordered with it per group or partition.
655 let monotonicity = self.set_monotonicity();
656 if monotonicity == SetMonotonicity::NotMonotonic {
657 return None;
658 }
659 let expr = Arc::new(Column::new(self.name(), aggr_func_idx));
660 let options =
661 SortOptions::new(monotonicity == SetMonotonicity::Decreasing, false);
662 Some(PhysicalSortExpr { expr, options })
663 }
664}
665
666/// Stores the physical expressions used inside the `AggregateExpr`.
667pub struct AggregatePhysicalExpressions {
668 /// Aggregate function arguments
669 pub args: Vec<Arc<dyn PhysicalExpr>>,
670 /// Order by expressions
671 pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
672}
673
674impl PartialEq for AggregateFunctionExpr {
675 fn eq(&self, other: &Self) -> bool {
676 self.name == other.name
677 && self.return_field == other.return_field
678 && self.fun == other.fun
679 && self.args.len() == other.args.len()
680 && self
681 .args
682 .iter()
683 .zip(other.args.iter())
684 .all(|(this_arg, other_arg)| this_arg.eq(other_arg))
685 }
686}
687
/// Reverses, in place, the sort direction of every key in the first
/// `ORDER BY [...]` clause found in `order_by`.
///
/// Each `ASC`/`DESC` is flipped together with its `NULLS FIRST`/`NULLS LAST`
/// (e.g. ` ASC NULLS LAST` becomes ` DESC NULLS FIRST`). A direction is only
/// rewritten when it terminates a key, i.e. when it is followed by `,` or by
/// the closing `]`. The string is left untouched when it contains no
/// `ORDER BY [` or no closing `]` after it.
fn replace_order_by_clause(order_by: &mut String) {
    const PREFIX: &str = "ORDER BY [";
    // Every rendered sort direction paired with its reverse.
    const REVERSALS: [(&str, &str); 4] = [
        (" DESC NULLS FIRST", " ASC NULLS LAST"),
        (" ASC NULLS FIRST", " DESC NULLS LAST"),
        (" DESC NULLS LAST", " ASC NULLS FIRST"),
        (" ASC NULLS LAST", " DESC NULLS FIRST"),
    ];

    let Some(prefix_at) = order_by.find(PREFIX) else {
        return;
    };
    let keys_start = prefix_at + PREFIX.len();
    let Some(keys_len) = order_by[keys_start..].find(']') else {
        return;
    };
    let keys_end = keys_start + keys_len; // byte index of the closing `]`

    // Single left-to-right pass: a direction that has just been written is
    // never re-examined, so it cannot be flipped back by a later rule.
    let mut reversed = String::with_capacity(keys_len + REVERSALS.len());
    let mut rest = &order_by[keys_start..keys_end];
    while let Some(ch) = rest.chars().next() {
        let hit = REVERSALS.iter().copied().find(|&(from, _)| {
            rest.strip_prefix(from)
                .is_some_and(|tail| tail.is_empty() || tail.starts_with(','))
        });
        match hit {
            Some((from, to)) => {
                reversed.push_str(to);
                rest = &rest[from.len()..];
            }
            None => {
                reversed.push(ch);
                rest = &rest[ch.len_utf8()..];
            }
        }
    }

    order_by.replace_range(keys_start..keys_end, &reversed);
}
712
/// Replaces, in place, the first occurrence of `fn_name_old` in `aggr_name`
/// with `fn_name_new`.
///
/// Only the first occurrence is rewritten so that later text which merely
/// contains the function name (for example a column called
/// `first_value_id`) is preserved. Nothing happens when `fn_name_old` is
/// empty or does not occur in `aggr_name`.
fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str, fn_name_new: &str) {
    // An empty pattern matches at every position; treat it as "nothing to do".
    if fn_name_old.is_empty() {
        return;
    }
    if let Some(at) = aggr_name.find(fn_name_old) {
        aggr_name.replace_range(at..at + fn_name_old.len(), fn_name_new);
    }
}