datafusion_physical_expr/aggregate.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
/// Crate-internal re-exports of the groups-accumulator helpers defined in
/// `datafusion_functions_aggregate_common::aggregate::groups_accumulator`.
pub(crate) mod groups_accumulator {
    // Re-export-only module, so the `unused_imports` lint is suppressed for it.
    // It makes `NullState` reachable under the nested `accumulate` path as well.
    #[allow(unused_imports)]
    pub(crate) mod accumulate {
        pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState;
    }
    // The same `NullState`, plus the adapter, exposed directly at this level.
    pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
        accumulate::NullState, GroupsAccumulatorAdapter,
    };
}
/// Crate-internal re-export of `StatsType` from
/// `datafusion_functions_aggregate_common::stats`.
pub(crate) mod stats {
    pub use datafusion_functions_aggregate_common::stats::StatsType;
}
/// Public re-exports of the aggregate utility items defined in
/// `datafusion_functions_aggregate_common::utils`.
pub mod utils {
    // The `deprecated` lint is silenced because the list below still
    // re-exports `adjust_output_array`.
    #[allow(deprecated)] // allow adjust_output_array
    pub use datafusion_functions_aggregate_common::utils::{
        adjust_output_array, get_accum_scalar_values_as_arrays, get_sort_options,
        ordering_fields, DecimalAverager, Hashable,
    };
}
37
38use std::fmt::Debug;
39use std::sync::Arc;
40
41use crate::expressions::Column;
42
43use arrow::compute::SortOptions;
44use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef};
45use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
46use datafusion_expr::{AggregateUDF, ReversedUDAF, SetMonotonicity};
47use datafusion_expr_common::accumulator::Accumulator;
48use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
49use datafusion_expr_common::type_coercion::aggregates::check_arg_count;
50use datafusion_functions_aggregate_common::accumulator::{
51 AccumulatorArgs, StateFieldsArgs,
52};
53use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
54use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
55use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
56use datafusion_physical_expr_common::utils::reverse_order_bys;
57
/// Builder for physical [`AggregateFunctionExpr`]
///
/// `AggregateFunctionExpr` contains the information necessary to call
/// an aggregate expression.
#[derive(Debug, Clone)]
pub struct AggregateExprBuilder {
    /// The aggregate UDF that the built expression will evaluate
    fun: Arc<AggregateUDF>,
    /// Physical expressions of the aggregate function
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Output column name of the aggregate; `build` fails with an internal
    /// error while this is still `None`
    alias: Option<String>,
    /// A human readable name
    human_display: String,
    /// Arrow Schema for the aggregate function
    schema: SchemaRef,
    /// The physical order by expressions
    ordering_req: LexOrdering,
    /// Whether to ignore null values
    ignore_nulls: bool,
    /// Whether is distinct aggregate function
    is_distinct: bool,
    /// Whether the expression is reversed
    is_reversed: bool,
}
81
82impl AggregateExprBuilder {
83 pub fn new(fun: Arc<AggregateUDF>, args: Vec<Arc<dyn PhysicalExpr>>) -> Self {
84 Self {
85 fun,
86 args,
87 alias: None,
88 human_display: String::default(),
89 schema: Arc::new(Schema::empty()),
90 ordering_req: LexOrdering::default(),
91 ignore_nulls: false,
92 is_distinct: false,
93 is_reversed: false,
94 }
95 }
96
97 /// Constructs an `AggregateFunctionExpr` from the builder
98 ///
99 /// Note that an [`Self::alias`] must be provided before calling this method.
100 ///
101 /// # Example: Create an [`AggregateUDF`]
102 ///
103 /// In the following example, [`AggregateFunctionExpr`] will be built using [`AggregateExprBuilder`]
104 /// which provides a build function. Full example could be accessed from the source file.
105 ///
106 /// ```
107 /// # use std::any::Any;
108 /// # use std::sync::Arc;
109 /// # use arrow::datatypes::{DataType, FieldRef};
110 /// # use datafusion_common::{Result, ScalarValue};
111 /// # use datafusion_expr::{col, ColumnarValue, Documentation, Signature, Volatility, Expr};
112 /// # use datafusion_expr::{AggregateUDFImpl, AggregateUDF, Accumulator, function::{AccumulatorArgs, StateFieldsArgs}};
113 /// # use arrow::datatypes::Field;
114 /// #
115 /// # #[derive(Debug, Clone)]
116 /// # struct FirstValueUdf {
117 /// # signature: Signature,
118 /// # }
119 /// #
120 /// # impl FirstValueUdf {
121 /// # fn new() -> Self {
122 /// # Self {
123 /// # signature: Signature::any(1, Volatility::Immutable),
124 /// # }
125 /// # }
126 /// # }
127 /// #
128 /// # impl AggregateUDFImpl for FirstValueUdf {
129 /// # fn as_any(&self) -> &dyn Any {
130 /// # unimplemented!()
131 /// # }
132 /// # fn name(&self) -> &str {
133 /// # unimplemented!()
134 /// }
135 /// # fn signature(&self) -> &Signature {
136 /// # unimplemented!()
137 /// # }
138 /// # fn return_type(&self, args: &[DataType]) -> Result<DataType> {
139 /// # unimplemented!()
140 /// # }
141 /// #
142 /// # fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
143 /// # unimplemented!()
144 /// # }
145 /// #
146 /// # fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
147 /// # unimplemented!()
148 /// # }
149 /// #
150 /// # fn documentation(&self) -> Option<&Documentation> {
151 /// # unimplemented!()
152 /// # }
153 /// # }
154 /// #
155 /// # let first_value = AggregateUDF::from(FirstValueUdf::new());
156 /// # let expr = first_value.call(vec![col("a")]);
157 /// #
158 /// # use datafusion_physical_expr::expressions::Column;
159 /// # use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
160 /// # use datafusion_physical_expr::aggregate::AggregateExprBuilder;
161 /// # use datafusion_physical_expr::expressions::PhysicalSortExpr;
162 /// # use datafusion_physical_expr::PhysicalSortRequirement;
163 /// #
164 /// fn build_aggregate_expr() -> Result<()> {
165 /// let args = vec![Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>];
166 /// let order_by = vec![PhysicalSortExpr {
167 /// expr: Arc::new(Column::new("x", 1)) as Arc<dyn PhysicalExpr>,
168 /// options: Default::default(),
169 /// }];
170 ///
171 /// let first_value = AggregateUDF::from(FirstValueUdf::new());
172 ///
173 /// let aggregate_expr = AggregateExprBuilder::new(
174 /// Arc::new(first_value),
175 /// args
176 /// )
177 /// .order_by(order_by.into())
178 /// .alias("first_a_by_x")
179 /// .ignore_nulls()
180 /// .build()?;
181 ///
182 /// Ok(())
183 /// }
184 /// ```
185 ///
186 /// This creates a physical expression equivalent to SQL:
187 /// `first_value(a ORDER BY x) IGNORE NULLS AS first_a_by_x`
188 pub fn build(self) -> Result<AggregateFunctionExpr> {
189 let Self {
190 fun,
191 args,
192 alias,
193 human_display,
194 schema,
195 ordering_req,
196 ignore_nulls,
197 is_distinct,
198 is_reversed,
199 } = self;
200 if args.is_empty() {
201 return internal_err!("args should not be empty");
202 }
203
204 let mut ordering_fields = vec![];
205
206 if !ordering_req.is_empty() {
207 let ordering_types = ordering_req
208 .iter()
209 .map(|e| e.expr.data_type(&schema))
210 .collect::<Result<Vec<_>>>()?;
211
212 ordering_fields =
213 utils::ordering_fields(ordering_req.as_ref(), &ordering_types);
214 }
215
216 let input_exprs_fields = args
217 .iter()
218 .map(|arg| arg.return_field(&schema))
219 .collect::<Result<Vec<_>>>()?;
220
221 check_arg_count(
222 fun.name(),
223 &input_exprs_fields,
224 &fun.signature().type_signature,
225 )?;
226
227 let return_field = fun.return_field(&input_exprs_fields)?;
228 let is_nullable = fun.is_nullable();
229 let name = match alias {
230 None => {
231 return internal_err!(
232 "AggregateExprBuilder::alias must be provided prior to calling build"
233 )
234 }
235 Some(alias) => alias,
236 };
237
238 Ok(AggregateFunctionExpr {
239 fun: Arc::unwrap_or_clone(fun),
240 args,
241 return_field,
242 name,
243 human_display,
244 schema: Arc::unwrap_or_clone(schema),
245 ordering_req,
246 ignore_nulls,
247 ordering_fields,
248 is_distinct,
249 input_fields: input_exprs_fields,
250 is_reversed,
251 is_nullable,
252 })
253 }
254
255 pub fn alias(mut self, alias: impl Into<String>) -> Self {
256 self.alias = Some(alias.into());
257 self
258 }
259
260 pub fn human_display(mut self, name: String) -> Self {
261 self.human_display = name;
262 self
263 }
264
265 pub fn schema(mut self, schema: SchemaRef) -> Self {
266 self.schema = schema;
267 self
268 }
269
270 pub fn order_by(mut self, order_by: LexOrdering) -> Self {
271 self.ordering_req = order_by;
272 self
273 }
274
275 pub fn reversed(mut self) -> Self {
276 self.is_reversed = true;
277 self
278 }
279
280 pub fn with_reversed(mut self, is_reversed: bool) -> Self {
281 self.is_reversed = is_reversed;
282 self
283 }
284
285 pub fn distinct(mut self) -> Self {
286 self.is_distinct = true;
287 self
288 }
289
290 pub fn with_distinct(mut self, is_distinct: bool) -> Self {
291 self.is_distinct = is_distinct;
292 self
293 }
294
295 pub fn ignore_nulls(mut self) -> Self {
296 self.ignore_nulls = true;
297 self
298 }
299
300 pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
301 self.ignore_nulls = ignore_nulls;
302 self
303 }
304}
305
/// Physical aggregate expression of a UDF.
///
/// Instances are constructed via [`AggregateExprBuilder`].
#[derive(Debug, Clone)]
pub struct AggregateFunctionExpr {
    /// The aggregate UDF that this expression evaluates
    fun: AggregateUDF,
    /// Physical expressions passed to the accumulator as arguments
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Output / return field of this aggregate
    return_field: FieldRef,
    /// Output column name that this expression creates
    name: String,
    /// Simplified name for `tree` explain.
    human_display: String,
    /// Schema supplied to the builder; handed to the UDF when accumulators
    /// are created
    schema: Schema,
    /// The physical order by expressions
    ordering_req: LexOrdering,
    /// Whether to ignore null values
    ignore_nulls: bool,
    /// Fields used for order sensitive aggregation functions
    ordering_fields: Vec<FieldRef>,
    /// Whether this is a distinct aggregate
    is_distinct: bool,
    /// Whether the expression is reversed
    is_reversed: bool,
    /// Fields obtained from `args` when the expression was built
    input_fields: Vec<FieldRef>,
    /// Nullability reported by the UDF when the expression was built
    is_nullable: bool,
}
331
332impl AggregateFunctionExpr {
333 /// Return the `AggregateUDF` used by this `AggregateFunctionExpr`
334 pub fn fun(&self) -> &AggregateUDF {
335 &self.fun
336 }
337
338 /// expressions that are passed to the Accumulator.
339 /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
340 pub fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
341 self.args.clone()
342 }
343
344 /// Human readable name such as `"MIN(c2)"`.
345 pub fn name(&self) -> &str {
346 &self.name
347 }
348
349 /// Simplified name for `tree` explain.
350 pub fn human_display(&self) -> &str {
351 &self.human_display
352 }
353
354 /// Return if the aggregation is distinct
355 pub fn is_distinct(&self) -> bool {
356 self.is_distinct
357 }
358
359 /// Return if the aggregation ignores nulls
360 pub fn ignore_nulls(&self) -> bool {
361 self.ignore_nulls
362 }
363
364 /// Return if the aggregation is reversed
365 pub fn is_reversed(&self) -> bool {
366 self.is_reversed
367 }
368
369 /// Return if the aggregation is nullable
370 pub fn is_nullable(&self) -> bool {
371 self.is_nullable
372 }
373
374 /// the field of the final result of this aggregation.
375 pub fn field(&self) -> FieldRef {
376 self.return_field
377 .as_ref()
378 .clone()
379 .with_name(&self.name)
380 .into()
381 }
382
383 /// the accumulator used to accumulate values from the expressions.
384 /// the accumulator expects the same number of arguments as `expressions` and must
385 /// return states with the same description as `state_fields`
386 pub fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
387 let acc_args = AccumulatorArgs {
388 return_field: Arc::clone(&self.return_field),
389 schema: &self.schema,
390 ignore_nulls: self.ignore_nulls,
391 ordering_req: self.ordering_req.as_ref(),
392 is_distinct: self.is_distinct,
393 name: &self.name,
394 is_reversed: self.is_reversed,
395 exprs: &self.args,
396 };
397
398 self.fun.accumulator(acc_args)
399 }
400
401 /// the field of the final result of this aggregation.
402 pub fn state_fields(&self) -> Result<Vec<FieldRef>> {
403 let args = StateFieldsArgs {
404 name: &self.name,
405 input_fields: &self.input_fields,
406 return_field: Arc::clone(&self.return_field),
407 ordering_fields: &self.ordering_fields,
408 is_distinct: self.is_distinct,
409 };
410
411 self.fun.state_fields(args)
412 }
413
414 /// Order by requirements for the aggregate function
415 /// By default it is `None` (there is no requirement)
416 /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this
417 pub fn order_bys(&self) -> Option<&LexOrdering> {
418 if self.ordering_req.is_empty() {
419 return None;
420 }
421
422 if !self.order_sensitivity().is_insensitive() {
423 return Some(self.ordering_req.as_ref());
424 }
425
426 None
427 }
428
429 /// Indicates whether aggregator can produce the correct result with any
430 /// arbitrary input ordering. By default, we assume that aggregate expressions
431 /// are order insensitive.
432 pub fn order_sensitivity(&self) -> AggregateOrderSensitivity {
433 if !self.ordering_req.is_empty() {
434 // If there is requirement, use the sensitivity of the implementation
435 self.fun.order_sensitivity()
436 } else {
437 // If no requirement, aggregator is order insensitive
438 AggregateOrderSensitivity::Insensitive
439 }
440 }
441
442 /// Sets the indicator whether ordering requirements of the aggregator is
443 /// satisfied by its input. If this is not the case, aggregators with order
444 /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce
445 /// the correct result with possibly more work internally.
446 ///
447 /// # Returns
448 ///
449 /// Returns `Ok(Some(updated_expr))` if the process completes successfully.
450 /// If the expression can benefit from existing input ordering, but does
451 /// not implement the method, returns an error. Order insensitive and hard
452 /// requirement aggregators return `Ok(None)`.
453 pub fn with_beneficial_ordering(
454 self: Arc<Self>,
455 beneficial_ordering: bool,
456 ) -> Result<Option<AggregateFunctionExpr>> {
457 let Some(updated_fn) = self
458 .fun
459 .clone()
460 .with_beneficial_ordering(beneficial_ordering)?
461 else {
462 return Ok(None);
463 };
464
465 AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec())
466 .order_by(self.ordering_req.clone())
467 .schema(Arc::new(self.schema.clone()))
468 .alias(self.name().to_string())
469 .with_ignore_nulls(self.ignore_nulls)
470 .with_distinct(self.is_distinct)
471 .with_reversed(self.is_reversed)
472 .build()
473 .map(Some)
474 }
475
476 /// Creates accumulator implementation that supports retract
477 pub fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
478 let args = AccumulatorArgs {
479 return_field: Arc::clone(&self.return_field),
480 schema: &self.schema,
481 ignore_nulls: self.ignore_nulls,
482 ordering_req: self.ordering_req.as_ref(),
483 is_distinct: self.is_distinct,
484 name: &self.name,
485 is_reversed: self.is_reversed,
486 exprs: &self.args,
487 };
488
489 let accumulator = self.fun.create_sliding_accumulator(args)?;
490
491 // Accumulators that have window frame startings different
492 // than `UNBOUNDED PRECEDING`, such as `1 PRECEDING`, need to
493 // implement retract_batch method in order to run correctly
494 // currently in DataFusion.
495 //
496 // If this `retract_batches` is not present, there is no way
497 // to calculate result correctly. For example, the query
498 //
499 // ```sql
500 // SELECT
501 // SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS sum_a
502 // FROM
503 // t
504 // ```
505 //
506 // 1. First sum value will be the sum of rows between `[0, 1)`,
507 //
508 // 2. Second sum value will be the sum of rows between `[0, 2)`
509 //
510 // 3. Third sum value will be the sum of rows between `[1, 3)`, etc.
511 //
512 // Since the accumulator keeps the running sum:
513 //
514 // 1. First sum we add to the state sum value between `[0, 1)`
515 //
516 // 2. Second sum we add to the state sum value between `[1, 2)`
517 // (`[0, 1)` is already in the state sum, hence running sum will
518 // cover `[0, 2)` range)
519 //
520 // 3. Third sum we add to the state sum value between `[2, 3)`
521 // (`[0, 2)` is already in the state sum). Also we need to
522 // retract values between `[0, 1)` by this way we can obtain sum
523 // between [1, 3) which is indeed the appropriate range.
524 //
525 // When we use `UNBOUNDED PRECEDING` in the query starting
526 // index will always be 0 for the desired range, and hence the
527 // `retract_batch` method will not be called. In this case
528 // having retract_batch is not a requirement.
529 //
530 // This approach is a a bit different than window function
531 // approach. In window function (when they use a window frame)
532 // they get all the desired range during evaluation.
533 if !accumulator.supports_retract_batch() {
534 return not_impl_err!(
535 "Aggregate can not be used as a sliding accumulator because \
536 `retract_batch` is not implemented: {}",
537 self.name
538 );
539 }
540 Ok(accumulator)
541 }
542
543 /// If the aggregate expression has a specialized
544 /// [`GroupsAccumulator`] implementation. If this returns true,
545 /// `[Self::create_groups_accumulator`] will be called.
546 pub fn groups_accumulator_supported(&self) -> bool {
547 let args = AccumulatorArgs {
548 return_field: Arc::clone(&self.return_field),
549 schema: &self.schema,
550 ignore_nulls: self.ignore_nulls,
551 ordering_req: self.ordering_req.as_ref(),
552 is_distinct: self.is_distinct,
553 name: &self.name,
554 is_reversed: self.is_reversed,
555 exprs: &self.args,
556 };
557 self.fun.groups_accumulator_supported(args)
558 }
559
560 /// Return a specialized [`GroupsAccumulator`] that manages state
561 /// for all groups.
562 ///
563 /// For maximum performance, a [`GroupsAccumulator`] should be
564 /// implemented in addition to [`Accumulator`].
565 pub fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
566 let args = AccumulatorArgs {
567 return_field: Arc::clone(&self.return_field),
568 schema: &self.schema,
569 ignore_nulls: self.ignore_nulls,
570 ordering_req: self.ordering_req.as_ref(),
571 is_distinct: self.is_distinct,
572 name: &self.name,
573 is_reversed: self.is_reversed,
574 exprs: &self.args,
575 };
576 self.fun.create_groups_accumulator(args)
577 }
578
579 /// Construct an expression that calculates the aggregate in reverse.
580 /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
581 /// For aggregates that do not support calculation in reverse,
582 /// returns None (which is the default value).
583 pub fn reverse_expr(&self) -> Option<AggregateFunctionExpr> {
584 match self.fun.reverse_udf() {
585 ReversedUDAF::NotSupported => None,
586 ReversedUDAF::Identical => Some(self.clone()),
587 ReversedUDAF::Reversed(reverse_udf) => {
588 let reverse_ordering_req = reverse_order_bys(self.ordering_req.as_ref());
589 let mut name = self.name().to_string();
590 // If the function is changed, we need to reverse order_by clause as well
591 // i.e. First(a order by b asc null first) -> Last(a order by b desc null last)
592 if self.fun().name() == reverse_udf.name() {
593 } else {
594 replace_order_by_clause(&mut name);
595 }
596 replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name());
597
598 AggregateExprBuilder::new(reverse_udf, self.args.to_vec())
599 .order_by(reverse_ordering_req)
600 .schema(Arc::new(self.schema.clone()))
601 .alias(name)
602 .with_ignore_nulls(self.ignore_nulls)
603 .with_distinct(self.is_distinct)
604 .with_reversed(!self.is_reversed)
605 .build()
606 .ok()
607 }
608 }
609 }
610
611 /// Returns all expressions used in the [`AggregateFunctionExpr`].
612 /// These expressions are (1)function arguments, (2) order by expressions.
613 pub fn all_expressions(&self) -> AggregatePhysicalExpressions {
614 let args = self.expressions();
615 let order_bys = self
616 .order_bys()
617 .cloned()
618 .unwrap_or_else(LexOrdering::default);
619 let order_by_exprs = order_bys
620 .iter()
621 .map(|sort_expr| Arc::clone(&sort_expr.expr))
622 .collect::<Vec<_>>();
623 AggregatePhysicalExpressions {
624 args,
625 order_by_exprs,
626 }
627 }
628
629 /// Rewrites [`AggregateFunctionExpr`], with new expressions given. The argument should be consistent
630 /// with the return value of the [`AggregateFunctionExpr::all_expressions`] method.
631 /// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
632 pub fn with_new_expressions(
633 &self,
634 _args: Vec<Arc<dyn PhysicalExpr>>,
635 _order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
636 ) -> Option<AggregateFunctionExpr> {
637 None
638 }
639
640 /// If this function is max, return (output_field, true)
641 /// if the function is min, return (output_field, false)
642 /// otherwise return None (the default)
643 ///
644 /// output_field is the name of the column produced by this aggregate
645 ///
646 /// Note: this is used to use special aggregate implementations in certain conditions
647 pub fn get_minmax_desc(&self) -> Option<(FieldRef, bool)> {
648 self.fun.is_descending().map(|flag| (self.field(), flag))
649 }
650
651 /// Returns default value of the function given the input is Null
652 /// Most of the aggregate function return Null if input is Null,
653 /// while `count` returns 0 if input is Null
654 pub fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
655 self.fun.default_value(data_type)
656 }
657
658 /// Indicates whether the aggregation function is monotonic as a set
659 /// function. See [`SetMonotonicity`] for details.
660 pub fn set_monotonicity(&self) -> SetMonotonicity {
661 let field = self.field();
662 let data_type = field.data_type();
663 self.fun.inner().set_monotonicity(data_type)
664 }
665
666 /// Returns `PhysicalSortExpr` based on the set monotonicity of the function.
667 pub fn get_result_ordering(&self, aggr_func_idx: usize) -> Option<PhysicalSortExpr> {
668 // If the aggregate expressions are set-monotonic, the output data is
669 // naturally ordered with it per group or partition.
670 let monotonicity = self.set_monotonicity();
671 if monotonicity == SetMonotonicity::NotMonotonic {
672 return None;
673 }
674 let expr = Arc::new(Column::new(self.name(), aggr_func_idx));
675 let options =
676 SortOptions::new(monotonicity == SetMonotonicity::Decreasing, false);
677 Some(PhysicalSortExpr { expr, options })
678 }
679}
680
/// Stores the physical expressions used inside an `AggregateFunctionExpr`,
/// as returned by `AggregateFunctionExpr::all_expressions`.
pub struct AggregatePhysicalExpressions {
    /// Aggregate function arguments
    pub args: Vec<Arc<dyn PhysicalExpr>>,
    /// Order by expressions
    pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
}
688
689impl PartialEq for AggregateFunctionExpr {
690 fn eq(&self, other: &Self) -> bool {
691 self.name == other.name
692 && self.return_field == other.return_field
693 && self.fun == other.fun
694 && self.args.len() == other.args.len()
695 && self
696 .args
697 .iter()
698 .zip(other.args.iter())
699 .all(|(this_arg, other_arg)| this_arg.eq(other_arg))
700 }
701}
702
/// Reverses, in place, the sort direction of every key inside the first
/// `ORDER BY [...]` clause of an aggregate display name.
///
/// Each comma-separated key that ends in one of the four
/// `ASC|DESC NULLS FIRST|LAST` suffixes has both its direction and its null
/// placement flipped, e.g.
/// `ORDER BY [a ASC NULLS LAST, b DESC NULLS FIRST]` becomes
/// `ORDER BY [a DESC NULLS FIRST, b ASC NULLS LAST]`.
///
/// Keys without a recognised suffix are kept verbatim. The string is left
/// untouched when it has no `ORDER BY [` marker or no closing `]` after it.
fn replace_order_by_clause(order_by: &mut String) {
    const CLAUSE_START: &str = "ORDER BY [";
    const KEY_SEPARATOR: &str = ", ";
    // (suffix to look for, its reversed counterpart)
    const FLIPS: [(&str, &str); 4] = [
        (" DESC NULLS FIRST", " ASC NULLS LAST"),
        (" ASC NULLS FIRST", " DESC NULLS LAST"),
        (" DESC NULLS LAST", " ASC NULLS FIRST"),
        (" ASC NULLS LAST", " DESC NULLS FIRST"),
    ];

    let Some(clause_pos) = order_by.find(CLAUSE_START) else {
        return;
    };
    let keys_start = clause_pos + CLAUSE_START.len();
    let Some(keys_len) = order_by[keys_start..].find(']') else {
        return;
    };
    let keys_end = keys_start + keys_len;

    // Splitting on ", " may cut through an expression that itself contains a
    // comma; such fragments carry no sort suffix and are re-joined unchanged.
    let flipped = order_by[keys_start..keys_end]
        .split(KEY_SEPARATOR)
        .map(|sort_key| {
            FLIPS
                .iter()
                .find_map(|&(from, to)| {
                    sort_key
                        .strip_suffix(from)
                        .map(|expr| format!("{expr}{to}"))
                })
                .unwrap_or_else(|| sort_key.to_string())
        })
        .collect::<Vec<_>>()
        .join(KEY_SEPARATOR);

    order_by.replace_range(keys_start..keys_end, &flipped);
}
727
/// Replaces, in place, the first occurrence of `fn_name_old` in `aggr_name`
/// with `fn_name_new`.
///
/// Only the first occurrence is rewritten so that an argument or column whose
/// text happens to contain the function name (e.g.
/// `first_value(first_value_col)`) is not renamed along with the function.
/// The string is left untouched when `fn_name_old` is empty or not present.
fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str, fn_name_new: &str) {
    // An empty pattern matches at every position; there is nothing sensible
    // to rename in that case.
    if fn_name_old.is_empty() {
        return;
    }
    if let Some(pos) = aggr_name.find(fn_name_old) {
        aggr_name.replace_range(pos..pos + fn_name_old.len(), fn_name_new);
    }
}