// datafusion_physical_expr/aggregate.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18pub(crate) mod groups_accumulator {
19 #[allow(unused_imports)]
20 pub(crate) mod accumulate {
21 pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState;
22 }
23 pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
24 accumulate::NullState, GroupsAccumulatorAdapter,
25 };
26}
27pub(crate) mod stats {
28 pub use datafusion_functions_aggregate_common::stats::StatsType;
29}
30pub mod utils {
31 pub use datafusion_functions_aggregate_common::utils::{
32 get_accum_scalar_values_as_arrays, get_sort_options, ordering_fields,
33 DecimalAverager, Hashable,
34 };
35}
36
37use std::fmt::Debug;
38use std::sync::Arc;
39
40use crate::expressions::Column;
41
42use arrow::compute::SortOptions;
43use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef};
44use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
45use datafusion_expr::{AggregateUDF, ReversedUDAF, SetMonotonicity};
46use datafusion_expr_common::accumulator::Accumulator;
47use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
48use datafusion_expr_common::type_coercion::aggregates::check_arg_count;
49use datafusion_functions_aggregate_common::accumulator::{
50 AccumulatorArgs, StateFieldsArgs,
51};
52use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
53use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
54use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
55
56/// Builder for physical [`AggregateFunctionExpr`]
57///
58/// `AggregateFunctionExpr` contains the information necessary to call
59/// an aggregate expression.
60#[derive(Debug, Clone)]
61pub struct AggregateExprBuilder {
62 fun: Arc<AggregateUDF>,
63 /// Physical expressions of the aggregate function
64 args: Vec<Arc<dyn PhysicalExpr>>,
65 alias: Option<String>,
66 /// A human readable name
67 human_display: String,
68 /// Arrow Schema for the aggregate function
69 schema: SchemaRef,
70 /// The physical order by expressions
71 order_bys: Vec<PhysicalSortExpr>,
72 /// Whether to ignore null values
73 ignore_nulls: bool,
74 /// Whether is distinct aggregate function
75 is_distinct: bool,
76 /// Whether the expression is reversed
77 is_reversed: bool,
78}
79
80impl AggregateExprBuilder {
81 pub fn new(fun: Arc<AggregateUDF>, args: Vec<Arc<dyn PhysicalExpr>>) -> Self {
82 Self {
83 fun,
84 args,
85 alias: None,
86 human_display: String::default(),
87 schema: Arc::new(Schema::empty()),
88 order_bys: vec![],
89 ignore_nulls: false,
90 is_distinct: false,
91 is_reversed: false,
92 }
93 }
94
95 /// Constructs an `AggregateFunctionExpr` from the builder
96 ///
97 /// Note that an [`Self::alias`] must be provided before calling this method.
98 ///
99 /// # Example: Create an [`AggregateUDF`]
100 ///
101 /// In the following example, [`AggregateFunctionExpr`] will be built using [`AggregateExprBuilder`]
102 /// which provides a build function. Full example could be accessed from the source file.
103 ///
104 /// ```
105 /// # use std::any::Any;
106 /// # use std::sync::Arc;
107 /// # use arrow::datatypes::{DataType, FieldRef};
108 /// # use datafusion_common::{Result, ScalarValue};
109 /// # use datafusion_expr::{col, ColumnarValue, Documentation, Signature, Volatility, Expr};
110 /// # use datafusion_expr::{AggregateUDFImpl, AggregateUDF, Accumulator, function::{AccumulatorArgs, StateFieldsArgs}};
111 /// # use arrow::datatypes::Field;
112 /// #
113 /// # #[derive(Debug, Clone, PartialEq, Eq, Hash)]
114 /// # struct FirstValueUdf {
115 /// # signature: Signature,
116 /// # }
117 /// #
118 /// # impl FirstValueUdf {
119 /// # fn new() -> Self {
120 /// # Self {
121 /// # signature: Signature::any(1, Volatility::Immutable),
122 /// # }
123 /// # }
124 /// # }
125 /// #
126 /// # impl AggregateUDFImpl for FirstValueUdf {
127 /// # fn as_any(&self) -> &dyn Any {
128 /// # unimplemented!()
129 /// # }
130 /// #
131 /// # fn name(&self) -> &str {
132 /// # unimplemented!()
133 /// # }
134 /// #
135 /// # fn signature(&self) -> &Signature {
136 /// # unimplemented!()
137 /// # }
138 /// #
139 /// # fn return_type(&self, args: &[DataType]) -> Result<DataType> {
140 /// # unimplemented!()
141 /// # }
142 /// #
143 /// # fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
144 /// # unimplemented!()
145 /// # }
146 /// #
147 /// # fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
148 /// # unimplemented!()
149 /// # }
150 /// #
151 /// # fn documentation(&self) -> Option<&Documentation> {
152 /// # unimplemented!()
153 /// # }
154 /// # }
155 /// #
156 /// # let first_value = AggregateUDF::from(FirstValueUdf::new());
157 /// # let expr = first_value.call(vec![col("a")]);
158 /// #
159 /// # use datafusion_physical_expr::expressions::Column;
160 /// # use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
161 /// # use datafusion_physical_expr::aggregate::AggregateExprBuilder;
162 /// # use datafusion_physical_expr::expressions::PhysicalSortExpr;
163 /// # use datafusion_physical_expr::PhysicalSortRequirement;
164 /// #
165 /// fn build_aggregate_expr() -> Result<()> {
166 /// let args = vec![Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>];
167 /// let order_by = vec![PhysicalSortExpr {
168 /// expr: Arc::new(Column::new("x", 1)) as Arc<dyn PhysicalExpr>,
169 /// options: Default::default(),
170 /// }];
171 ///
172 /// let first_value = AggregateUDF::from(FirstValueUdf::new());
173 ///
174 /// let aggregate_expr = AggregateExprBuilder::new(
175 /// Arc::new(first_value),
176 /// args
177 /// )
178 /// .order_by(order_by)
179 /// .alias("first_a_by_x")
180 /// .ignore_nulls()
181 /// .build()?;
182 ///
183 /// Ok(())
184 /// }
185 /// ```
186 ///
187 /// This creates a physical expression equivalent to SQL:
188 /// `first_value(a ORDER BY x) IGNORE NULLS AS first_a_by_x`
189 pub fn build(self) -> Result<AggregateFunctionExpr> {
190 let Self {
191 fun,
192 args,
193 alias,
194 human_display,
195 schema,
196 order_bys,
197 ignore_nulls,
198 is_distinct,
199 is_reversed,
200 } = self;
201 if args.is_empty() {
202 return internal_err!("args should not be empty");
203 }
204
205 let ordering_types = order_bys
206 .iter()
207 .map(|e| e.expr.data_type(&schema))
208 .collect::<Result<Vec<_>>>()?;
209
210 let ordering_fields = utils::ordering_fields(&order_bys, &ordering_types);
211
212 let input_exprs_fields = args
213 .iter()
214 .map(|arg| arg.return_field(&schema))
215 .collect::<Result<Vec<_>>>()?;
216
217 check_arg_count(
218 fun.name(),
219 &input_exprs_fields,
220 &fun.signature().type_signature,
221 )?;
222
223 let return_field = fun.return_field(&input_exprs_fields)?;
224 let is_nullable = fun.is_nullable();
225 let name = match alias {
226 None => {
227 return internal_err!(
228 "AggregateExprBuilder::alias must be provided prior to calling build"
229 )
230 }
231 Some(alias) => alias,
232 };
233
234 let arg_fields = args
235 .iter()
236 .map(|e| e.return_field(schema.as_ref()))
237 .collect::<Result<Vec<_>>>()?;
238
239 Ok(AggregateFunctionExpr {
240 fun: Arc::unwrap_or_clone(fun),
241 args,
242 arg_fields,
243 return_field,
244 name,
245 human_display,
246 schema: Arc::unwrap_or_clone(schema),
247 order_bys,
248 ignore_nulls,
249 ordering_fields,
250 is_distinct,
251 input_fields: input_exprs_fields,
252 is_reversed,
253 is_nullable,
254 })
255 }
256
257 pub fn alias(mut self, alias: impl Into<String>) -> Self {
258 self.alias = Some(alias.into());
259 self
260 }
261
262 pub fn human_display(mut self, name: String) -> Self {
263 self.human_display = name;
264 self
265 }
266
267 pub fn schema(mut self, schema: SchemaRef) -> Self {
268 self.schema = schema;
269 self
270 }
271
272 pub fn order_by(mut self, order_bys: Vec<PhysicalSortExpr>) -> Self {
273 self.order_bys = order_bys;
274 self
275 }
276
277 pub fn reversed(mut self) -> Self {
278 self.is_reversed = true;
279 self
280 }
281
282 pub fn with_reversed(mut self, is_reversed: bool) -> Self {
283 self.is_reversed = is_reversed;
284 self
285 }
286
287 pub fn distinct(mut self) -> Self {
288 self.is_distinct = true;
289 self
290 }
291
292 pub fn with_distinct(mut self, is_distinct: bool) -> Self {
293 self.is_distinct = is_distinct;
294 self
295 }
296
297 pub fn ignore_nulls(mut self) -> Self {
298 self.ignore_nulls = true;
299 self
300 }
301
302 pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
303 self.ignore_nulls = ignore_nulls;
304 self
305 }
306}
307
308/// Physical aggregate expression of a UDAF.
309///
310/// Instances are constructed via [`AggregateExprBuilder`].
311#[derive(Debug, Clone)]
312pub struct AggregateFunctionExpr {
313 fun: AggregateUDF,
314 args: Vec<Arc<dyn PhysicalExpr>>,
315 /// Fields corresponding to args (same order & length)
316 arg_fields: Vec<FieldRef>,
317 /// Output / return field of this aggregate
318 return_field: FieldRef,
319 /// Output column name that this expression creates
320 name: String,
321 /// Simplified name for `tree` explain.
322 human_display: String,
323 schema: Schema,
324 // The physical order by expressions
325 order_bys: Vec<PhysicalSortExpr>,
326 // Whether to ignore null values
327 ignore_nulls: bool,
328 // fields used for order sensitive aggregation functions
329 ordering_fields: Vec<FieldRef>,
330 is_distinct: bool,
331 is_reversed: bool,
332 input_fields: Vec<FieldRef>,
333 is_nullable: bool,
334}
335
336impl AggregateFunctionExpr {
337 /// Return the `AggregateUDF` used by this `AggregateFunctionExpr`
338 pub fn fun(&self) -> &AggregateUDF {
339 &self.fun
340 }
341
342 /// expressions that are passed to the Accumulator.
343 /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
344 pub fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
345 self.args.clone()
346 }
347
348 /// Human readable name such as `"MIN(c2)"`.
349 pub fn name(&self) -> &str {
350 &self.name
351 }
352
353 /// Simplified name for `tree` explain.
354 pub fn human_display(&self) -> &str {
355 &self.human_display
356 }
357
358 /// Return if the aggregation is distinct
359 pub fn is_distinct(&self) -> bool {
360 self.is_distinct
361 }
362
363 /// Return if the aggregation ignores nulls
364 pub fn ignore_nulls(&self) -> bool {
365 self.ignore_nulls
366 }
367
368 /// Return if the aggregation is reversed
369 pub fn is_reversed(&self) -> bool {
370 self.is_reversed
371 }
372
373 /// Return if the aggregation is nullable
374 pub fn is_nullable(&self) -> bool {
375 self.is_nullable
376 }
377
378 /// the field of the final result of this aggregation.
379 pub fn field(&self) -> FieldRef {
380 self.return_field
381 .as_ref()
382 .clone()
383 .with_name(&self.name)
384 .into()
385 }
386
387 /// the accumulator used to accumulate values from the expressions.
388 /// the accumulator expects the same number of arguments as `expressions` and must
389 /// return states with the same description as `state_fields`
390 pub fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
391 let acc_args = AccumulatorArgs {
392 return_field: Arc::clone(&self.return_field),
393 schema: &self.schema,
394 expr_fields: &self.arg_fields,
395 ignore_nulls: self.ignore_nulls,
396 order_bys: self.order_bys.as_ref(),
397 is_distinct: self.is_distinct,
398 name: &self.name,
399 is_reversed: self.is_reversed,
400 exprs: &self.args,
401 };
402
403 self.fun.accumulator(acc_args)
404 }
405
406 /// the field of the final result of this aggregation.
407 pub fn state_fields(&self) -> Result<Vec<FieldRef>> {
408 let args = StateFieldsArgs {
409 name: &self.name,
410 input_fields: &self.input_fields,
411 return_field: Arc::clone(&self.return_field),
412 ordering_fields: &self.ordering_fields,
413 is_distinct: self.is_distinct,
414 };
415
416 self.fun.state_fields(args)
417 }
418
419 /// Returns the ORDER BY expressions for the aggregate function.
420 pub fn order_bys(&self) -> &[PhysicalSortExpr] {
421 if self.order_sensitivity().is_insensitive() {
422 &[]
423 } else {
424 &self.order_bys
425 }
426 }
427
428 /// Indicates whether aggregator can produce the correct result with any
429 /// arbitrary input ordering. By default, we assume that aggregate expressions
430 /// are order insensitive.
431 pub fn order_sensitivity(&self) -> AggregateOrderSensitivity {
432 if self.order_bys.is_empty() {
433 AggregateOrderSensitivity::Insensitive
434 } else {
435 // If there is an ORDER BY clause, use the sensitivity of the implementation:
436 self.fun.order_sensitivity()
437 }
438 }
439
440 /// Sets the indicator whether ordering requirements of the aggregator is
441 /// satisfied by its input. If this is not the case, aggregators with order
442 /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce
443 /// the correct result with possibly more work internally.
444 ///
445 /// # Returns
446 ///
447 /// Returns `Ok(Some(updated_expr))` if the process completes successfully.
448 /// If the expression can benefit from existing input ordering, but does
449 /// not implement the method, returns an error. Order insensitive and hard
450 /// requirement aggregators return `Ok(None)`.
451 pub fn with_beneficial_ordering(
452 self: Arc<Self>,
453 beneficial_ordering: bool,
454 ) -> Result<Option<AggregateFunctionExpr>> {
455 let Some(updated_fn) = self
456 .fun
457 .clone()
458 .with_beneficial_ordering(beneficial_ordering)?
459 else {
460 return Ok(None);
461 };
462
463 AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec())
464 .order_by(self.order_bys.clone())
465 .schema(Arc::new(self.schema.clone()))
466 .alias(self.name().to_string())
467 .with_ignore_nulls(self.ignore_nulls)
468 .with_distinct(self.is_distinct)
469 .with_reversed(self.is_reversed)
470 .build()
471 .map(Some)
472 }
473
474 /// Creates accumulator implementation that supports retract
475 pub fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
476 let args = AccumulatorArgs {
477 return_field: Arc::clone(&self.return_field),
478 schema: &self.schema,
479 expr_fields: &self.arg_fields,
480 ignore_nulls: self.ignore_nulls,
481 order_bys: self.order_bys.as_ref(),
482 is_distinct: self.is_distinct,
483 name: &self.name,
484 is_reversed: self.is_reversed,
485 exprs: &self.args,
486 };
487
488 let accumulator = self.fun.create_sliding_accumulator(args)?;
489
490 // Accumulators that have window frame startings different
491 // than `UNBOUNDED PRECEDING`, such as `1 PRECEDING`, need to
492 // implement retract_batch method in order to run correctly
493 // currently in DataFusion.
494 //
495 // If this `retract_batches` is not present, there is no way
496 // to calculate result correctly. For example, the query
497 //
498 // ```sql
499 // SELECT
500 // SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS sum_a
501 // FROM
502 // t
503 // ```
504 //
505 // 1. First sum value will be the sum of rows between `[0, 1)`,
506 //
507 // 2. Second sum value will be the sum of rows between `[0, 2)`
508 //
509 // 3. Third sum value will be the sum of rows between `[1, 3)`, etc.
510 //
511 // Since the accumulator keeps the running sum:
512 //
513 // 1. First sum we add to the state sum value between `[0, 1)`
514 //
515 // 2. Second sum we add to the state sum value between `[1, 2)`
516 // (`[0, 1)` is already in the state sum, hence running sum will
517 // cover `[0, 2)` range)
518 //
519 // 3. Third sum we add to the state sum value between `[2, 3)`
520 // (`[0, 2)` is already in the state sum). Also we need to
521 // retract values between `[0, 1)` by this way we can obtain sum
522 // between [1, 3) which is indeed the appropriate range.
523 //
524 // When we use `UNBOUNDED PRECEDING` in the query starting
525 // index will always be 0 for the desired range, and hence the
526 // `retract_batch` method will not be called. In this case
527 // having retract_batch is not a requirement.
528 //
529 // This approach is a a bit different than window function
530 // approach. In window function (when they use a window frame)
531 // they get all the desired range during evaluation.
532 if !accumulator.supports_retract_batch() {
533 return not_impl_err!(
534 "Aggregate can not be used as a sliding accumulator because \
535 `retract_batch` is not implemented: {}",
536 self.name
537 );
538 }
539 Ok(accumulator)
540 }
541
542 /// If the aggregate expression has a specialized
543 /// [`GroupsAccumulator`] implementation. If this returns true,
544 /// `[Self::create_groups_accumulator`] will be called.
545 pub fn groups_accumulator_supported(&self) -> bool {
546 let args = AccumulatorArgs {
547 return_field: Arc::clone(&self.return_field),
548 schema: &self.schema,
549 expr_fields: &self.arg_fields,
550 ignore_nulls: self.ignore_nulls,
551 order_bys: self.order_bys.as_ref(),
552 is_distinct: self.is_distinct,
553 name: &self.name,
554 is_reversed: self.is_reversed,
555 exprs: &self.args,
556 };
557 self.fun.groups_accumulator_supported(args)
558 }
559
560 /// Return a specialized [`GroupsAccumulator`] that manages state
561 /// for all groups.
562 ///
563 /// For maximum performance, a [`GroupsAccumulator`] should be
564 /// implemented in addition to [`Accumulator`].
565 pub fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
566 let args = AccumulatorArgs {
567 return_field: Arc::clone(&self.return_field),
568 schema: &self.schema,
569 expr_fields: &self.arg_fields,
570 ignore_nulls: self.ignore_nulls,
571 order_bys: self.order_bys.as_ref(),
572 is_distinct: self.is_distinct,
573 name: &self.name,
574 is_reversed: self.is_reversed,
575 exprs: &self.args,
576 };
577 self.fun.create_groups_accumulator(args)
578 }
579
580 /// Construct an expression that calculates the aggregate in reverse.
581 /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
582 /// For aggregates that do not support calculation in reverse,
583 /// returns None (which is the default value).
584 pub fn reverse_expr(&self) -> Option<AggregateFunctionExpr> {
585 match self.fun.reverse_udf() {
586 ReversedUDAF::NotSupported => None,
587 ReversedUDAF::Identical => Some(self.clone()),
588 ReversedUDAF::Reversed(reverse_udf) => {
589 let mut name = self.name().to_string();
590 // If the function is changed, we need to reverse order_by clause as well
591 // i.e. First(a order by b asc null first) -> Last(a order by b desc null last)
592 if self.fun().name() != reverse_udf.name() {
593 replace_order_by_clause(&mut name);
594 }
595 replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name());
596
597 AggregateExprBuilder::new(reverse_udf, self.args.to_vec())
598 .order_by(self.order_bys.iter().map(|e| e.reverse()).collect())
599 .schema(Arc::new(self.schema.clone()))
600 .alias(name)
601 .with_ignore_nulls(self.ignore_nulls)
602 .with_distinct(self.is_distinct)
603 .with_reversed(!self.is_reversed)
604 .build()
605 .ok()
606 }
607 }
608 }
609
610 /// Returns all expressions used in the [`AggregateFunctionExpr`].
611 /// These expressions are (1)function arguments, (2) order by expressions.
612 pub fn all_expressions(&self) -> AggregatePhysicalExpressions {
613 let args = self.expressions();
614 let order_by_exprs = self
615 .order_bys()
616 .iter()
617 .map(|sort_expr| Arc::clone(&sort_expr.expr))
618 .collect();
619 AggregatePhysicalExpressions {
620 args,
621 order_by_exprs,
622 }
623 }
624
625 /// Rewrites [`AggregateFunctionExpr`], with new expressions given. The argument should be consistent
626 /// with the return value of the [`AggregateFunctionExpr::all_expressions`] method.
627 /// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
628 pub fn with_new_expressions(
629 &self,
630 args: Vec<Arc<dyn PhysicalExpr>>,
631 order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
632 ) -> Option<AggregateFunctionExpr> {
633 if args.len() != self.args.len()
634 || (self.order_sensitivity() != AggregateOrderSensitivity::Insensitive
635 && order_by_exprs.len() != self.order_bys.len())
636 {
637 return None;
638 }
639
640 let new_order_bys = self
641 .order_bys
642 .iter()
643 .zip(order_by_exprs)
644 .map(|(req, new_expr)| PhysicalSortExpr {
645 expr: new_expr,
646 options: req.options,
647 })
648 .collect();
649
650 Some(AggregateFunctionExpr {
651 fun: self.fun.clone(),
652 args,
653 // TODO: need to align arg_fields here with new args
654 // https://github.com/apache/datafusion/issues/18149
655 arg_fields: self.arg_fields.clone(),
656 return_field: Arc::clone(&self.return_field),
657 name: self.name.clone(),
658 // TODO: Human name should be updated after re-write to not mislead
659 human_display: self.human_display.clone(),
660 schema: self.schema.clone(),
661 order_bys: new_order_bys,
662 ignore_nulls: self.ignore_nulls,
663 ordering_fields: self.ordering_fields.clone(),
664 is_distinct: self.is_distinct,
665 is_reversed: false,
666 input_fields: self.input_fields.clone(),
667 is_nullable: self.is_nullable,
668 })
669 }
670
671 /// If this function is max, return (output_field, true)
672 /// if the function is min, return (output_field, false)
673 /// otherwise return None (the default)
674 ///
675 /// output_field is the name of the column produced by this aggregate
676 ///
677 /// Note: this is used to use special aggregate implementations in certain conditions
678 pub fn get_minmax_desc(&self) -> Option<(FieldRef, bool)> {
679 self.fun.is_descending().map(|flag| (self.field(), flag))
680 }
681
682 /// Returns default value of the function given the input is Null
683 /// Most of the aggregate function return Null if input is Null,
684 /// while `count` returns 0 if input is Null
685 pub fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
686 self.fun.default_value(data_type)
687 }
688
689 /// Indicates whether the aggregation function is monotonic as a set
690 /// function. See [`SetMonotonicity`] for details.
691 pub fn set_monotonicity(&self) -> SetMonotonicity {
692 let field = self.field();
693 let data_type = field.data_type();
694 self.fun.inner().set_monotonicity(data_type)
695 }
696
697 /// Returns `PhysicalSortExpr` based on the set monotonicity of the function.
698 pub fn get_result_ordering(&self, aggr_func_idx: usize) -> Option<PhysicalSortExpr> {
699 // If the aggregate expressions are set-monotonic, the output data is
700 // naturally ordered with it per group or partition.
701 let monotonicity = self.set_monotonicity();
702 if monotonicity == SetMonotonicity::NotMonotonic {
703 return None;
704 }
705 let expr = Arc::new(Column::new(self.name(), aggr_func_idx));
706 let options =
707 SortOptions::new(monotonicity == SetMonotonicity::Decreasing, false);
708 Some(PhysicalSortExpr { expr, options })
709 }
710}
711
712/// Stores the physical expressions used inside the `AggregateExpr`.
713pub struct AggregatePhysicalExpressions {
714 /// Aggregate function arguments
715 pub args: Vec<Arc<dyn PhysicalExpr>>,
716 /// Order by expressions
717 pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
718}
719
720impl PartialEq for AggregateFunctionExpr {
721 fn eq(&self, other: &Self) -> bool {
722 self.name == other.name
723 && self.return_field == other.return_field
724 && self.fun == other.fun
725 && self.args.len() == other.args.len()
726 && self
727 .args
728 .iter()
729 .zip(other.args.iter())
730 .all(|(this_arg, other_arg)| this_arg.eq(other_arg))
731 }
732}
733
734fn replace_order_by_clause(order_by: &mut String) {
735 let suffixes = [
736 (" DESC NULLS FIRST]", " ASC NULLS LAST]"),
737 (" ASC NULLS FIRST]", " DESC NULLS LAST]"),
738 (" DESC NULLS LAST]", " ASC NULLS FIRST]"),
739 (" ASC NULLS LAST]", " DESC NULLS FIRST]"),
740 ];
741
742 if let Some(start) = order_by.find("ORDER BY [") {
743 if let Some(end) = order_by[start..].find(']') {
744 let order_by_start = start + 9;
745 let order_by_end = start + end;
746
747 let column_order = &order_by[order_by_start..=order_by_end];
748 for (suffix, replacement) in suffixes {
749 if column_order.ends_with(suffix) {
750 let new_order = column_order.replace(suffix, replacement);
751 order_by.replace_range(order_by_start..=order_by_end, &new_order);
752 break;
753 }
754 }
755 }
756 }
757}
758
759fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str, fn_name_new: &str) {
760 *aggr_name = aggr_name.replace(fn_name_old, fn_name_new);
761}