Trait datafusion_expr::Accumulator
pub trait Accumulator: Send + Sync + Debug {
// Required methods
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
fn evaluate(&mut self) -> Result<ScalarValue>;
fn size(&self) -> usize;
fn state(&mut self) -> Result<Vec<ScalarValue>>;
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
// Provided methods
fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()> { ... }
fn supports_retract_batch(&self) -> bool { ... }
}
Tracks an aggregate function’s state.
Accumulators are stateful objects that track the state of a single group. They
aggregate values from multiple rows together into a final output aggregate.
GroupsAccumulator is an additional, more performant (but also more complex) API
that manages state for multiple groups at once.
An accumulator knows how to:
- update its state from inputs via update_batch
- compute the final value from its internal state via evaluate
- retract an update to its state from given inputs via retract_batch (when used as a window aggregate function)
- convert its internal state to a vector of aggregate values via state, and combine the state from multiple accumulators via merge_batch, as part of efficient multi-phase grouping.
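To make these responsibilities concrete, the sketch below models them with a simplified, hypothetical trait named MiniAccumulator. It is not DataFusion's API: plain i64 slices stand in for Arrow ArrayRefs, Option<i64> stands in for ScalarValue, and String stands in for DataFusion's error type. The provided retract_batch in this sketch simply returns an error, an assumption chosen to illustrate why supports_retract_batch exists. A MAX implementation shows update, state, merge and evaluate working together.

```rust
/// Hypothetical, simplified stand-in for DataFusion's Accumulator:
/// i64 slices replace Arrow arrays and Option<i64> replaces ScalarValue.
pub trait MiniAccumulator {
    // Required methods
    fn update_batch(&mut self, values: &[i64]) -> Result<(), String>;
    fn evaluate(&mut self) -> Result<Option<i64>, String>;
    fn size(&self) -> usize;
    fn state(&mut self) -> Result<Vec<Option<i64>>, String>;
    fn merge_batch(&mut self, states: &[Vec<Option<i64>>]) -> Result<(), String>;

    // Provided methods: in this sketch, retraction is unsupported by default.
    fn retract_batch(&mut self, _values: &[i64]) -> Result<(), String> {
        Err("retract_batch is not supported by this accumulator".to_string())
    }
    fn supports_retract_batch(&self) -> bool {
        false
    }
}

/// MAX over a single group; the state is the current maximum, if any.
#[derive(Debug, Default)]
pub struct Max {
    max: Option<i64>,
}

impl MiniAccumulator for Max {
    fn update_batch(&mut self, values: &[i64]) -> Result<(), String> {
        for &v in values {
            self.max = Some(self.max.map_or(v, |m| m.max(v)));
        }
        Ok(())
    }

    fn evaluate(&mut self) -> Result<Option<i64>, String> {
        // `take` returns the value and resets the state to its initial `None`.
        Ok(self.max.take())
    }

    fn size(&self) -> usize {
        // No heap allocations, so the inline size is the whole footprint.
        std::mem::size_of::<Self>()
    }

    fn state(&mut self) -> Result<Vec<Option<i64>>, String> {
        Ok(vec![self.max.take()])
    }

    fn merge_batch(&mut self, states: &[Vec<Option<i64>>]) -> Result<(), String> {
        // states[0] is a column holding one partial maximum per upstream accumulator;
        // `None` entries (accumulators that saw no rows) are skipped.
        let partials: Vec<i64> = states
            .first()
            .map(|col| col.iter().flatten().copied().collect::<Vec<i64>>())
            .unwrap_or_default();
        self.update_batch(&partials)
    }
}
```

For inputs [3, 9, 4] and [7] on two partial instances, merging their states into a third instance and evaluating yields Some(9).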
Required Methods
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>
Updates the accumulator’s state from its input.
values contains the arguments to this aggregate function.
For example, the SUM accumulator maintains a running sum,
and update_batch adds each of the input values to the
running sum.
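A minimal sketch of that running sum, assuming a simplified input of &[Option<i64>] in place of an Arrow array (None plays the role of SQL NULL); the SumAcc type is hypothetical. As in SQL, NULL inputs are skipped, and a group that never sees a non-NULL value evaluates to None.

```rust
/// Hypothetical SUM accumulator over nullable i64 inputs.
#[derive(Debug, Default)]
pub struct SumAcc {
    // `None` until the first non-null value arrives (SUM of only NULLs is NULL).
    sum: Option<i64>,
}

impl SumAcc {
    /// Adds every non-null input value to the running sum.
    /// Overflow handling is omitted in this sketch.
    pub fn update_batch(&mut self, values: &[Option<i64>]) {
        for v in values.iter().flatten() {
            self.sum = Some(self.sum.unwrap_or(0) + v);
        }
    }

    /// Returns the running sum and resets the accumulator.
    pub fn evaluate(&mut self) -> Option<i64> {
        self.sum.take()
    }
}
```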
fn evaluate(&mut self) -> Result<ScalarValue>
Returns the final aggregate value, consuming the internal state.
For example, the SUM accumulator maintains a running sum,
and evaluate will produce that running sum as its output.
After this call, the accumulator’s internal state should be equivalent to when it was first created.
This function takes &mut self so that the accumulator can build
Arrow-compatible internal state that can be returned without copying
when possible (for example, distinct strings).
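The &mut self receiver lets evaluate hand over internal buffers instead of cloning them. A minimal sketch, assuming a hypothetical DistinctStrings accumulator that collects distinct strings into a Vec<String> (plain Rust collections stand in for Arrow arrays): std::mem::take moves the vector out and leaves an empty one behind, which also leaves the accumulator logically equivalent to a freshly created one.

```rust
use std::collections::HashSet;

/// Hypothetical accumulator collecting the distinct strings of one group.
#[derive(Debug, Default)]
pub struct DistinctStrings {
    seen: HashSet<String>,
    // Distinct values in first-seen order, ready to be handed out as-is.
    values: Vec<String>,
}

impl DistinctStrings {
    pub fn update_batch(&mut self, values: &[&str]) {
        for &v in values {
            // `insert` returns true only the first time a value is seen.
            if self.seen.insert(v.to_string()) {
                self.values.push(v.to_string());
            }
        }
    }

    /// Moves the collected strings out without copying them and resets the state.
    pub fn evaluate(&mut self) -> Vec<String> {
        // `clear` keeps the set's allocation but empties it logically.
        self.seen.clear();
        std::mem::take(&mut self.values)
    }
}
```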
fn size(&self) -> usize
Returns the allocated size required for this accumulator, in
bytes, including Self.
This value is used to calculate the memory used during execution so DataFusion can stay within its allotted limit.
“Allocated” means that for internal containers such as Vec,
the capacity should be used, not the len.
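A minimal sketch of this rule, assuming a hypothetical BufferingAcc that keeps every i64 input (as an exact MEDIAN might): the reported size is the inline size of Self plus the heap bytes reserved by the vector, computed from capacity() rather than len(), because reserved but unused slots still occupy memory.

```rust
use std::mem::size_of;

/// Hypothetical accumulator that buffers every input value.
#[derive(Debug, Default)]
pub struct BufferingAcc {
    values: Vec<i64>,
}

impl BufferingAcc {
    pub fn update_batch(&mut self, values: &[i64]) {
        self.values.extend_from_slice(values);
    }

    /// Allocated size in bytes: the struct itself (which already includes the
    /// Vec's pointer/len/capacity fields) plus the Vec's reserved heap buffer.
    pub fn size(&self) -> usize {
        size_of::<Self>() + self.values.capacity() * size_of::<i64>()
    }
}
```

With Vec::with_capacity(100) and only three values pushed, size() still accounts for at least 100 slots.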
fn state(&mut self) -> Result<Vec<ScalarValue>>
Returns the intermediate state of the accumulator, consuming the intermediate state.
After this call, the accumulator’s internal state should be equivalent to when it was first created.
This function takes &mut self so that the accumulator can build
Arrow-compatible internal state that can be returned without copying
when possible (for example, distinct strings).
Intermediate state is used for “multi-phase” grouping in
DataFusion, where an aggregate is computed in parallel with
multiple Accumulator instances, as illustrated below:
Multi-Phase Grouping
                              ▲
                              │                   evaluate() is called to
                              │                   produce the final aggregate
                              │                   value per group
                              │
                 ┌─────────────────────────┐
                 │GroupBy                  │
                 │(AggregateMode::Final)   │      state() is called for each
                 │                         │      group and the resulting
                 └─────────────────────────┘      RecordBatches passed to the
                              ▲                   Final GroupBy via merge_batch()
                              │
             ┌────────────────┴───────────────┐
             │                                │
             │                                │
┌─────────────────────────┐      ┌─────────────────────────┐
│         GroupBy         │      │         GroupBy         │
│(AggregateMode::Partial) │      │(AggregateMode::Partial) │
└─────────────────────────┘      └────────────▲────────────┘
             ▲                                │
             │                                │    update_batch() is called for
             │                                │    each input RecordBatch
        .─────────.                      .─────────.
     ,─'           '─.                ,─'           '─.
    ;      Input      :              ;      Input      :
    :   Partition 0   ;              :   Partition 1   ;
     ╲               ╱                ╲               ╱
      '─.         ,─'                  '─.         ,─'
         `───────'                        `───────'
The partial state is serialized as Arrays and then combined
with other partial states from different instances of this
Accumulator (that ran on different partitions, for example).
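The flow in the diagram can be simulated in a few lines. This is a sketch with a hypothetical PartialSum type and a hypothetical two_phase_sum driver: each partition runs its own accumulator over its input batches (update_batch), the partial results are extracted with state() and concatenated into one column, and a final accumulator receives that column through merge_batch before evaluate() produces the answer. Real DataFusion passes Arrow arrays between the phases; plain vectors stand in for them here.

```rust
/// Hypothetical SUM accumulator used in both the Partial and the Final phase.
#[derive(Debug, Default)]
pub struct PartialSum {
    sum: i64,
}

impl PartialSum {
    pub fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }
    /// Intermediate state: a single value (the partial sum); resets the accumulator.
    pub fn state(&mut self) -> Vec<i64> {
        vec![std::mem::take(&mut self.sum)]
    }
    /// `states[0]` is a column with one partial sum per upstream accumulator.
    pub fn merge_batch(&mut self, states: &[Vec<i64>]) {
        if let Some(partial_sums) = states.first() {
            self.update_batch(partial_sums);
        }
    }
    pub fn evaluate(&mut self) -> i64 {
        std::mem::take(&mut self.sum)
    }
}

/// Runs one Partial accumulator per partition, then a single Final one.
/// Each partition is a list of input batches.
pub fn two_phase_sum(partitions: &[Vec<Vec<i64>>]) -> i64 {
    let mut column = Vec::new();
    for batches in partitions {
        let mut partial = PartialSum::default();
        for batch in batches {
            partial.update_batch(batch); // once per input batch
        }
        column.extend(partial.state()); // one state row per partial accumulator
    }
    let mut fin = PartialSum::default();
    fin.merge_batch(&[column]);
    fin.evaluate()
}
```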
The state can be (and often is) a different type than the output
type of the Accumulator, and it may need a different merge
operation (for example, the partial states for COUNT need
to be summed together).
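COUNT illustrates why the merge operation can differ from the update operation. In this sketch (a hypothetical CountAcc, with Option<i64> standing in for nullable input), update_batch counts non-null rows, while merge_batch adds up the partial counts; counting the rows of the state column instead would return the number of contributing accumulators, not the number of input rows.

```rust
/// Hypothetical COUNT(expr) accumulator: counts non-null input values.
#[derive(Debug, Default)]
pub struct CountAcc {
    count: i64,
}

impl CountAcc {
    /// Raw input: each non-null value adds one.
    pub fn update_batch(&mut self, values: &[Option<i64>]) {
        self.count += values.iter().filter(|v| v.is_some()).count() as i64;
    }
    /// Intermediate state: the partial count; resets the accumulator.
    pub fn state(&mut self) -> Vec<i64> {
        vec![std::mem::take(&mut self.count)]
    }
    /// Partial states: the counts are summed, not counted.
    pub fn merge_batch(&mut self, states: &[Vec<i64>]) {
        if let Some(partial_counts) = states.first() {
            self.count += partial_counts.iter().sum::<i64>();
        }
    }
    pub fn evaluate(&mut self) -> i64 {
        std::mem::take(&mut self.count)
    }
}
```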
Some accumulators can return multiple values for their
intermediate states. For example, AVG tracks sum and
n, and this function should return
a vector of two values, sum and n.
Note that ScalarValue::List can be used to pass multiple
values if the number of intermediate values is not known at
planning time (e.g. for MEDIAN).
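A minimal sketch of a two-value intermediate state, assuming a hypothetical AvgAcc over f64 inputs: state() returns [sum, n], merge_batch receives them as two columns (one row per upstream accumulator) and adds both, and only evaluate divides. Carrying n as f64 is a simplification of this sketch so that both state values share one type. Averaging the partial averages directly would be wrong whenever partitions hold different numbers of rows.

```rust
/// Hypothetical AVG accumulator whose intermediate state is (sum, n).
#[derive(Debug, Default)]
pub struct AvgAcc {
    sum: f64,
    n: u64,
}

impl AvgAcc {
    pub fn update_batch(&mut self, values: &[f64]) {
        self.sum += values.iter().sum::<f64>();
        self.n += values.len() as u64;
    }
    /// Two intermediate values: [sum, n]; resets the accumulator.
    pub fn state(&mut self) -> Vec<f64> {
        let out = vec![self.sum, self.n as f64];
        *self = Self::default();
        out
    }
    /// `states[0]` holds partial sums and `states[1]` partial counts.
    /// Any other shape is ignored in this sketch.
    pub fn merge_batch(&mut self, states: &[Vec<f64>]) {
        if let [sums, counts] = states {
            self.sum += sums.iter().sum::<f64>();
            self.n += counts.iter().sum::<f64>() as u64;
        }
    }
    /// Returns None for an empty group (AVG of no rows is NULL); resets the state.
    pub fn evaluate(&mut self) -> Option<f64> {
        let result = (self.n > 0).then(|| self.sum / self.n as f64);
        *self = Self::default();
        result
    }
}
```

For partitions [1.0, 2.0, 3.0] and [4.0], merging gives 10.0 / 4 = 2.5, whereas averaging the partial averages (2.0 and 4.0) would give 3.0.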
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>
Updates the accumulator’s state from an Array containing one
or more intermediate values.
For some aggregates (such as SUM), merge_batch is the same
as update_batch, but for others (such as COUNT)
the operations differ. See Self::state for more details on how
state is used and merged.
The states array passed was formed by concatenating the
results of calling Self::state on zero or more other
Accumulator instances.
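Because each state() call yields one row of intermediate values, the states argument is effectively a set of columns with one row per contributing accumulator. The sketch below assumes a hypothetical helper, concat_states, that performs this row-to-column transposition with plain vectors, and a hypothetical MergeSum whose merge_batch simply reuses update_batch, as described above for SUM. With zero contributing accumulators, every column is empty and merging leaves the state unchanged.

```rust
/// Hypothetical helper: turns per-accumulator state rows into per-field columns.
/// `num_fields` is the number of values each state() call returns.
pub fn concat_states(rows: &[Vec<i64>], num_fields: usize) -> Vec<Vec<i64>> {
    let mut columns: Vec<Vec<i64>> = vec![Vec::new(); num_fields];
    for row in rows {
        // Field i of every row is appended to column i.
        for (column, value) in columns.iter_mut().zip(row) {
            column.push(*value);
        }
    }
    columns
}

/// Hypothetical SUM accumulator: merging partial sums is the same as updating.
#[derive(Debug, Default)]
pub struct MergeSum {
    sum: i64,
}

impl MergeSum {
    pub fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }
    pub fn state(&mut self) -> Vec<i64> {
        vec![std::mem::take(&mut self.sum)]
    }
    pub fn merge_batch(&mut self, states: &[Vec<i64>]) {
        // For SUM, the single state column can be fed straight to update_batch.
        if let Some(partial_sums) = states.first() {
            self.update_batch(partial_sums);
        }
    }
}
```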
Provided Methods
fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()>
Retracts (removes) an update (caused by the given inputs) from the accumulator’s state.
This is the inverse operation of Self::update_batch and is used
to incrementally calculate window aggregates where the OVER
clause defines a bounded window.
Example
For example, given the following input partition:
                 │      current      │
                        window
                 │                   │
            ┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
 Input      │ A  │ B  │ C  │ D  │ E  │ F  │ G  │ H  │ I  │
 partition  └────┴────┴────┴────┼────┴────┴────┴────┼────┘
                                │        next       │
                                       window
First, Self::evaluate will be called to produce the output
for the current window.
Then, to advance to the next window, Self::retract_batch is first
called with the values that are leaving the window, [B, C, D], and then
Self::update_batch is called with the values that are
entering the window, [F, G, H].
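The walk-through above can be simulated with a hypothetical SlidingSum accumulator, assuming the partition values A through I are the integers 1 through 9. The current window [B, C, D, E] sums to 14; retracting [B, C, D] and then updating with [F, G, H] yields the sum of the next window [E, F, G, H], which is 26, without rescanning E. Note an assumption of this sketch: its evaluate reads the value without resetting it, because the same instance must keep sliding along the partition after each evaluation.

```rust
/// Hypothetical SUM accumulator that supports retraction (the inverse of update).
#[derive(Debug, Default)]
pub struct SlidingSum {
    sum: i64,
}

impl SlidingSum {
    /// Adds values entering the window.
    pub fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }
    /// Inverse of update_batch: removes values leaving the window.
    pub fn retract_batch(&mut self, values: &[i64]) {
        self.sum -= values.iter().sum::<i64>();
    }
    pub fn supports_retract_batch(&self) -> bool {
        true
    }
    /// Reads the current window's value without resetting (see lead-in).
    pub fn evaluate(&self) -> i64 {
        self.sum
    }
}
```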
fn supports_retract_batch(&self) -> bool
Returns whether the accumulator supports incrementally updating its value by removing values.
If this function returns true, Self::retract_batch will be
called for sliding window functions, such as queries with an
OVER (ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING) clause.
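When retraction is not available, a bounded frame cannot be slid incrementally, and the aggregate has to be rebuilt from the rows currently in the frame. The sketch below contrasts the two strategies using a hypothetical WindowAcc trait and a hypothetical window_agg driver: RetractableSum is slid with retract_batch and update_batch, while MaxAcc (MAX cannot simply "un-see" a value) keeps the default supports_retract_batch of false and is rebuilt for each frame. The frame follows ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING; evaluate here takes &self and does not reset, an assumption of this sketch, and it does not describe how DataFusion itself schedules these calls.

```rust
/// Hypothetical minimal accumulator interface for this sketch.
pub trait WindowAcc: Default {
    fn update_batch(&mut self, values: &[i64]);
    fn retract_batch(&mut self, _values: &[i64]) {
        unimplemented!("retract_batch is not supported")
    }
    fn supports_retract_batch(&self) -> bool {
        false
    }
    fn evaluate(&self) -> Option<i64>;
}

#[derive(Default)]
pub struct RetractableSum {
    sum: i64,
}

impl WindowAcc for RetractableSum {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }
    fn retract_batch(&mut self, values: &[i64]) {
        self.sum -= values.iter().sum::<i64>();
    }
    fn supports_retract_batch(&self) -> bool {
        true
    }
    fn evaluate(&self) -> Option<i64> {
        Some(self.sum)
    }
}

#[derive(Default)]
pub struct MaxAcc {
    max: Option<i64>,
}

impl WindowAcc for MaxAcc {
    fn update_batch(&mut self, values: &[i64]) {
        for &v in values {
            self.max = Some(self.max.map_or(v, |m| m.max(v)));
        }
    }
    fn evaluate(&self) -> Option<i64> {
        self.max
    }
}

/// Evaluates the aggregate over each row's frame
/// ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING, i.e. rows [i - 1, i + 2].
pub fn window_agg<A: WindowAcc>(values: &[i64]) -> Vec<Option<i64>> {
    // Half-open frame bounds [start, end) for row i, clamped to the partition.
    let frame = |i: usize| (i.saturating_sub(1), (i + 3).min(values.len()));
    let mut out = Vec::with_capacity(values.len());
    let mut acc = A::default();
    let (mut start, mut end) = (0, 0);
    for i in 0..values.len() {
        let (new_start, new_end) = frame(i);
        if acc.supports_retract_batch() {
            // Incremental: drop rows leaving the frame, add rows entering it.
            acc.retract_batch(&values[start..new_start]);
            acc.update_batch(&values[end..new_end]);
        } else {
            // No retraction: rebuild the accumulator from the whole frame.
            acc = A::default();
            acc.update_batch(&values[new_start..new_end]);
        }
        start = new_start;
        end = new_end;
        out.push(acc.evaluate());
    }
    out
}
```

Both strategies produce per-row results for the same frames; the incremental one touches each input row at most twice, while the rebuild touches every row of every frame.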