// datafusion-expr-common 48.0.0
//
// Logical plan and expression representation for DataFusion query engine
// Documentation
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! The accumulator module contains the trait definition for aggregate function accumulators.

use arrow::array::ArrayRef;
use datafusion_common::{internal_err, Result, ScalarValue};
use std::fmt::Debug;

/// Tracks an aggregate function's state.
///
/// `Accumulator`s are stateful objects that manage the state of a single
/// group. They aggregate values from multiple rows together into a final
/// output aggregate.
///
/// `GroupsAccumulator` is an additional, more performant (but also more
/// complex) API that manages state for multiple groups at once.
///
/// An accumulator knows how to:
/// * update its state from inputs via [`update_batch`]
///
/// * compute the final value from its internal state via [`evaluate`]
///
/// * retract an update to its state from given inputs via
///   [`retract_batch`] (when used as a window aggregate [window
///   function])
///
/// * convert its internal state to a vector of aggregate values via
///   [`state`] and combine the state from multiple accumulators
///   via [`merge_batch`], as part of efficient multi-phase grouping.
///
/// Implementations must be `Send + Sync + Debug`.
///
/// [`update_batch`]: Self::update_batch
/// [`retract_batch`]: Self::retract_batch
/// [`state`]: Self::state
/// [`evaluate`]: Self::evaluate
/// [`merge_batch`]: Self::merge_batch
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
pub trait Accumulator: Send + Sync + Debug {
    /// Updates the accumulator's state from its input.
    ///
    /// `values` contains the arguments to this aggregate function.
    ///
    /// For example, the `SUM` accumulator maintains a running sum,
    /// and `update_batch` adds each of the input values to the
    /// running sum.
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;

    /// Returns the final aggregate value, consuming the internal state.
    ///
    /// For example, the `SUM` accumulator maintains a running sum,
    /// and `evaluate` will produce that running sum as its output.
    ///
    /// This function should not be called twice, otherwise it will
    /// result in potentially non-deterministic behavior.
    ///
    /// This function gets `&mut self` to allow for the accumulator to build
    /// arrow-compatible internal state that can be returned without copying
    /// when possible (for example distinct strings).
    fn evaluate(&mut self) -> Result<ScalarValue>;

    /// Returns the allocated size required for this accumulator, in
    /// bytes, including `Self`.
    ///
    /// This value is used to calculate the memory used during
    /// execution so DataFusion can stay within its allotted limit.
    ///
    /// "Allocated" means that for internal containers such as `Vec`,
    /// the `capacity` should be used, not the `len`.
    fn size(&self) -> usize;

    /// Returns the intermediate state of the accumulator, consuming the
    /// intermediate state.
    ///
    /// This function should not be called twice, otherwise it will
    /// result in potentially non-deterministic behavior.
    ///
    /// This function gets `&mut self` to allow for the accumulator to build
    /// arrow-compatible internal state that can be returned without copying
    /// when possible (for example distinct strings).
    ///
    /// Intermediate state is used for "multi-phase" grouping in
    /// DataFusion, where an aggregate is computed in parallel with
    /// multiple `Accumulator` instances, as described below:
    ///
    /// # Multi-Phase Grouping
    ///
    /// ```text
    ///                               ▲
    ///                               │                   evaluate() is called to
    ///                               │                   produce the final aggregate
    ///                               │                   value per group
    ///                               │
    ///                  ┌─────────────────────────┐
    ///                  │GroupBy                  │
    ///                  │(AggregateMode::Final)   │      state() is called for each
    ///                  │                         │      group and the resulting
    ///                  └─────────────────────────┘      RecordBatches passed to the
    ///                                                   Final GroupBy via merge_batch()
    ///                               ▲
    ///                               │
    ///              ┌────────────────┴───────────────┐
    ///              │                                │
    ///              │                                │
    /// ┌─────────────────────────┐      ┌─────────────────────────┐
    /// │        GroupBy          │      │        GroupBy          │
    /// │(AggregateMode::Partial) │      │(AggregateMode::Partial) │
    /// └─────────────────────────┘      └─────────────────────────┘
    ///              ▲                                ▲
    ///              │                                │    update_batch() is called for
    ///              │                                │    each input RecordBatch
    ///         .─────────.                      .─────────.
    ///      ,─'           '─.                ,─'           '─.
    ///     ;      Input      :              ;      Input      :
    ///     :   Partition 0   ;              :   Partition 1   ;
    ///      ╲               ╱                ╲               ╱
    ///       '─.         ,─'                  '─.         ,─'
    ///          `───────'                        `───────'
    /// ```
    ///
    /// The partial state is serialized as `Arrays` and then combined
    /// with other partial states from different instances of this
    /// Accumulator (that ran on different partitions, for example).
    ///
    /// The state can be and often is a different type than the output
    /// type of the [`Accumulator`] and needs different merge
    /// operations (for example, the partial state for `COUNT` needs
    /// to be summed together).
    ///
    /// Some accumulators can return multiple values for their
    /// intermediate states. For example, the average accumulator
    /// tracks `sum` and `n`, and this function should return a vector
    /// of two values, sum and n.
    ///
    /// Note that [`ScalarValue::List`] can be used to pass multiple
    /// values if the number of intermediate values is not known at
    /// planning time (e.g. for `MEDIAN`).
    ///
    /// # Multi-phase repartitioned Grouping
    ///
    /// Many multi-phase grouping plans contain a Repartition operation
    /// as well, as shown below:
    ///
    /// ```text
    ///                ▲                          ▲
    ///                │                          │
    ///                │                          │
    ///                │                          │
    ///                │                          │
    ///                │                          │
    ///    ┌───────────────────────┐  ┌───────────────────────┐       4. Each AggregateMode::Final
    ///    │GroupBy                │  │GroupBy                │       GroupBy has an entry for its
    ///    │(AggregateMode::Final) │  │(AggregateMode::Final) │       subset of groups (in this case
    ///    │                       │  │                       │       that means half the entries)
    ///    └───────────────────────┘  └───────────────────────┘
    ///                ▲                          ▲
    ///                │                          │
    ///                └─────────────┬────────────┘
    ///                              │
    ///                              │
    ///                              │
    ///                 ┌─────────────────────────┐                   3. Repartitioning by hash(group
    ///                 │       Repartition       │                   keys) ensures that each distinct
    ///                 │         HASH(x)         │                   group key now appears in exactly
    ///                 └─────────────────────────┘                   one partition
    ///                              ▲
    ///                              │
    ///              ┌───────────────┴─────────────┐
    ///              │                             │
    ///              │                             │
    /// ┌─────────────────────────┐  ┌──────────────────────────┐     2. Each AggregateMode::Partial
    /// │        GroupBy          │  │       GroupBy            │     GroupBy has an entry for *all*
    /// │(AggregateMode::Partial) │  │ (AggregateMode::Partial) │     the groups
    /// └─────────────────────────┘  └──────────────────────────┘
    ///              ▲                             ▲
    ///              │                             │
    ///              │                             │
    ///         .─────────.                   .─────────.
    ///      ,─'           '─.             ,─'           '─.
    ///     ;      Input      :           ;      Input      :         1. Since input data is
    ///     :   Partition 0   ;           :   Partition 1   ;         arbitrarily or RoundRobin
    ///      ╲               ╱             ╲               ╱          distributed, each partition
    ///       '─.         ,─'               '─.         ,─'           likely has all distinct
    ///          `───────'                     `───────'              group keys
    /// ```
    ///
    /// This structure is used so that the `AggregateMode::Partial` accumulators
    /// reduce the cardinality of the input as soon as possible. Typically,
    /// each partial accumulator sees all groups in the input as the group keys
    /// are evenly distributed across the input.
    ///
    /// The final output is computed by repartitioning the result of
    /// [`Self::state`] from each Partial aggregate by `hash(group keys)` so
    /// that each distinct group key appears in exactly one of the
    /// `AggregateMode::Final` GroupBy nodes. The outputs of the final nodes are
    /// then unioned together to produce the overall final output.
    ///
    /// Here is an example that shows the distribution of groups in the
    /// different phases:
    ///
    /// ```text
    ///               ┌─────┐                ┌─────┐
    ///               │  1  │                │  3  │
    ///               ├─────┤                ├─────┤
    ///               │  2  │                │  4  │                After repartitioning by
    ///               └─────┘                └─────┘                hash(group keys), each distinct
    ///               ┌─────┐                ┌─────┐                group key now appears in exactly
    ///               │  1  │                │  3  │                one partition
    ///               ├─────┤                ├─────┤
    ///               │  2  │                │  4  │
    ///               └─────┘                └─────┘
    ///
    ///
    /// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
    ///
    ///               ┌─────┐                ┌─────┐
    ///               │  2  │                │  2  │
    ///               ├─────┤                ├─────┤
    ///               │  1  │                │  2  │
    ///               ├─────┤                ├─────┤
    ///               │  3  │                │  3  │
    ///               ├─────┤                ├─────┤
    ///               │  4  │                │  1  │
    ///               └─────┘                └─────┘                Input data is arbitrarily or
    ///                 ...                    ...                  RoundRobin distributed, each
    ///               ┌─────┐                ┌─────┐                partition likely has all
    ///               │  1  │                │  4  │                distinct group keys
    ///               ├─────┤                ├─────┤
    ///               │  4  │                │  3  │
    ///               ├─────┤                ├─────┤
    ///               │  1  │                │  1  │
    ///               ├─────┤                ├─────┤
    ///               │  4  │                │  3  │
    ///               └─────┘                └─────┘
    ///
    ///           group values           group values
    ///           in partition 0         in partition 1
    /// ```
    fn state(&mut self) -> Result<Vec<ScalarValue>>;

    /// Updates the accumulator's state from an `Array` containing one
    /// or more intermediate values.
    ///
    /// For some aggregates (such as `SUM`), `merge_batch` is the same
    /// as `update_batch`, but for some aggregates (such as `COUNT`)
    /// the operations differ. See [`Self::state`] for more details on how
    /// state is used and merged.
    ///
    /// The `states` array passed was formed by concatenating the
    /// results of calling [`Self::state`] on zero or more other
    /// `Accumulator` instances.
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;

    /// Retracts (removes) an update (caused by the given inputs) to the
    /// accumulator's state.
    ///
    /// This is the inverse operation of [`Self::update_batch`] and is used
    /// to incrementally calculate window aggregates where the `OVER`
    /// clause defines a bounded window.
    ///
    /// # Example
    ///
    /// For example, given the following input partition
    ///
    /// ```text
    ///                     │      current      │
    ///                            window
    ///                     │                   │
    ///                ┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
    ///     Input      │ A  │ B  │ C  │ D  │ E  │ F  │ G  │ H  │ I  │
    ///   partition    └────┴────┴────┴────┼────┴────┴────┴────┼────┘
    ///
    ///                                    │         next      │
    ///                                             window
    /// ```
    ///
    /// First, [`Self::evaluate`] will be called to produce the output
    /// for the current window.
    ///
    /// Then, to advance to the next window, [`Self::retract_batch`] will
    /// be called with the values that are leaving the window,
    /// `[B, C, D]`, and then [`Self::update_batch`] will be called with
    /// the values that are entering the window, `[F, G, H]`.
    ///
    /// # Default implementation
    ///
    /// The default implementation does not modify any state and always
    /// returns an internal error. Accumulators that can retract values
    /// must override it (see [`Self::supports_retract_batch`]).
    fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
        // TODO add retract for all accumulators
        internal_err!(
            "Retract should be implemented for aggregate functions when used with custom window frame queries"
        )
    }

    /// Does the accumulator support incrementally updating its value
    /// by *removing* values.
    ///
    /// If this function returns true, [`Self::retract_batch`] will be
    /// called for sliding window functions such as queries with an
    /// `OVER (ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING)`.
    ///
    /// The default implementation returns `false`.
    fn supports_retract_batch(&self) -> bool {
        false
    }
}