pub trait Accumulator: Send + Sync + Debug {
    // Required methods
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
    fn evaluate(&mut self) -> Result<ScalarValue>;
    fn size(&self) -> usize;
    fn state(&mut self) -> Result<Vec<ScalarValue>>;
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;

    // Provided methods
    fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()> { ... }
    fn supports_retract_batch(&self) -> bool { ... }
}
Expand description

Tracks an aggregate function’s state.

Accumulators are stateful objects that implement a single group. They aggregate values from multiple rows together into a final output aggregate.

[GroupsAccumulator] is an additional more performant (but also complex) API that manages state for multiple groups at once.

An accumulator knows how to:

  • update its state from inputs via update_batch

  • compute the final value from its internal state via evaluate

  • retract an update to its state from given inputs via retract_batch (when used as a window aggregate window function)

  • convert its internal state to a vector of aggregate values via state and combine the state from multiple accumulators’ via merge_batch, as part of efficient multi-phase grouping.

Required Methods§

source

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>

Updates the accumulator’s state from its input.

values contains the arguments to this aggregate function.

For example, the SUM accumulator maintains a running sum, and update_batch adds each of the input values to the running sum.

source

fn evaluate(&mut self) -> Result<ScalarValue>

Returns the final aggregate value, consuming the internal state.

For example, the SUM accumulator maintains a running sum, and evaluate will produce that running sum as its output.

After this call, the accumulator’s internal state should be equivalent to when it was first created.

This function gets &mut self to allow for the accumulator to build arrow compatible internal state that can be returned without copying when possible (for example distinct strings)

source

fn size(&self) -> usize

Returns the allocated size required for this accumulator, in bytes, including Self.

This value is used to calculate the memory used during execution so DataFusion can stay within its allotted limit.

“Allocated” means that for internal containers such as Vec, the capacity should be used not the len.

source

fn state(&mut self) -> Result<Vec<ScalarValue>>

Returns the intermediate state of the accumulator, consuming the intermediate state.

After this call, the accumulator’s internal state should be equivalent to when it was first created.

This function gets &mut self to allow for the accumulator to build arrow compatible internal state that can be returned without copying when possible (for example distinct strings).

Intermediate state is used for “multi-phase” grouping in DataFusion, where an aggregate is computed in parallel with multiple Accumulator instances, as illustrated below:

§MultiPhase Grouping
                              ▲
                              │                   evaluate() is called to
                              │                   produce the final aggregate
                              │                   value per group
                              │
                 ┌─────────────────────────┐
                 │GroupBy                  │
                 │(AggregateMode::Final)   │      state() is called for each
                 │                         │      group and the resulting
                 └─────────────────────────┘      RecordBatches passed to the
                              ▲
                              │
             ┌────────────────┴───────────────┐
             │                                │
             │                                │
┌─────────────────────────┐      ┌─────────────────────────┐
│        GroubyBy         │      │        GroubyBy         │
│(AggregateMode::Partial) │      │(AggregateMode::Partial) │
└─────────────────────────┘      └────────────▲────────────┘
             ▲                                │
             │                                │    update_batch() is called for
             │                                │    each input RecordBatch
        .─────────.                      .─────────.
     ,─'           '─.                ,─'           '─.
    ;      Input      :              ;      Input      :
    :   Partition 0   ;              :   Partition 1   ;
     ╲               ╱                ╲               ╱
      '─.         ,─'                  '─.         ,─'
         `───────'                        `───────'

The partial state is serialied as Arrays and then combined with other partial states from different instances of this Accumulator (that ran on different partitions, for example).

The state can be and often is a different type than the output type of the Accumulator and needs different merge operations (for example, the partial state for COUNT needs to be summed together)

Some accumulators can return multiple values for their intermediate states. For example average, tracks sum and n, and this function should return a vector of two values, sum and n.

Note that ScalarValue::List can be used to pass multiple values if the number of intermediate values is not known at planning time (e.g. for MEDIAN)

source

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>

Updates the accumulator’s state from an Array containing one or more intermediate values.

For some aggregates (such as SUM), merge_batch is the same as update_batch, but for some aggregrates (such as COUNT) the operations differ. See Self::state for more details on how state is used and merged.

The states array passed was formed by concatenating the results of calling Self::state on zero or more other Accumulator instances.

Provided Methods§

source

fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()>

Retracts (removed) an update (caused by the given inputs) to accumulator’s state.

This is the inverse operation of Self::update_batch and is used to incrementally calculate window aggregates where the OVER clause defines a bounded window.

§Example

For example, given the following input partition

                    │      current      │
                           window
                    │                   │
               ┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
    Input      │ A  │ B  │ C  │ D  │ E  │ F  │ G  │ H  │ I  │
  partition    └────┴────┴────┴────┼────┴────┴────┴────┼────┘

                                   │         next      │
                                            window

First, Self::evaluate will be called to produce the output for the current window.

Then, to advance to the next window:

First, Self::retract_batch will be called with the values that are leaving the window, [B, C, D] and then Self::update_batch will be called with the values that are entering the window, [F, G, H].

source

fn supports_retract_batch(&self) -> bool

Does the accumulator support incrementally updating its value by removing values.

If this function returns true, Self::retract_batch will be called for sliding window functions such as queries with an OVER (ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING)

Implementors§