Trait datafusion::logical_expr::GroupsAccumulator

pub trait GroupsAccumulator: Send {
    // Required methods
    fn update_batch(
        &mut self,
        values: &[Arc<dyn Array>],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<(), DataFusionError>;
    fn evaluate(
        &mut self,
        emit_to: EmitTo,
    ) -> Result<Arc<dyn Array>, DataFusionError>;
    fn state(
        &mut self,
        emit_to: EmitTo,
    ) -> Result<Vec<Arc<dyn Array>>, DataFusionError>;
    fn merge_batch(
        &mut self,
        values: &[Arc<dyn Array>],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<(), DataFusionError>;
    fn size(&self) -> usize;

    // Provided methods
    fn convert_to_state(
        &self,
        _values: &[Arc<dyn Array>],
        _opt_filter: Option<&BooleanArray>,
    ) -> Result<Vec<Arc<dyn Array>>, DataFusionError> { ... }
    fn supports_convert_to_state(&self) -> bool { ... }
}

GroupsAccumulator implements a single aggregate (e.g. AVG) and stores the state for all groups internally.

§Notes on Implementing GroupsAccumulator

All aggregates must first implement the simpler Accumulator trait, which handles state for a single group. Implementing GroupsAccumulator is optional and harder than implementing Accumulator, but it can be much faster for queries with many groups. See the Aggregating Millions of Groups Fast blog post for more background.

§Details

The hash table assigns each group a group_index, and each accumulator manages its aggregate-specific state, one entry per group_index.

Group indices are contiguous (there are no gaps), so each GroupsAccumulator is expected to store its per-group state in something like a Vec<..>.
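
The sketch below makes the contiguity guarantee concrete in plain Rust, without DataFusion or Arrow types. GroupInterner and SumState are hypothetical names. The interner stands in for the operator's hash table and hands out group indices 0, 1, 2, and so on in first-seen order. The state is a Vec indexed directly by group_index.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the grouping hash table: assigns contiguous
/// group indices 0, 1, 2, ... in the order keys are first seen.
#[derive(Default)]
struct GroupInterner {
    map: HashMap<String, usize>,
}

impl GroupInterner {
    /// Returns the group_index for `key`, assigning the next free index if it is new.
    fn intern(&mut self, key: &str) -> usize {
        let next = self.map.len();
        *self.map.entry(key.to_string()).or_insert(next)
    }

    fn num_groups(&self) -> usize {
        self.map.len()
    }
}

/// Hypothetical per-group SUM state: because indices have no gaps,
/// a plain Vec indexed by group_index is enough.
#[derive(Default)]
struct SumState {
    sums: Vec<i64>,
}

impl SumState {
    fn add(&mut self, group_index: usize, value: i64, total_num_groups: usize) {
        // Grow the Vec so every group_index < total_num_groups has a slot.
        if self.sums.len() < total_num_groups {
            self.sums.resize(total_num_groups, 0);
        }
        self.sums[group_index] += value;
    }
}
```

For example, interning the keys a, b, a, c yields the group indices 0, 1, 0, 2.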

Required Methods

fn update_batch(&mut self, values: &[Arc<dyn Array>], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize) -> Result<(), DataFusionError>

Updates the accumulator’s state from its arguments, encoded as a slice of ArrayRefs.

  • values: the input arguments to the accumulator

  • group_indices: the group (group_index) to which each row in values belongs

  • opt_filter: if present, only update aggregate state using values[i] if opt_filter[i] is true

  • total_num_groups: the number of groups (the largest group_index is thus total_num_groups - 1).

Note that subsequent calls to update_batch may have larger total_num_groups as new groups are seen.
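
The sketch below shows what update_batch typically does for a SUM. It uses plain slices in place of Arrow arrays: None models a null, and a &[bool] models the BooleanArray filter. sum_update_batch is a hypothetical helper, not part of the DataFusion API. It grows the state to total_num_groups, skips filtered rows, and ignores nulls.

```rust
/// Hypothetical, simplified `update_batch` for a SUM over nullable i64 values.
/// Slices stand in for Arrow arrays; `None` models a null value.
fn sum_update_batch(
    sums: &mut Vec<Option<i64>>,
    values: &[Option<i64>],
    group_indices: &[usize],
    opt_filter: Option<&[bool]>,
    total_num_groups: usize,
) {
    assert_eq!(values.len(), group_indices.len());
    // New groups may have appeared since the last call: make room for them.
    // A group stays None (null) until it sees a non-null value.
    sums.resize(total_num_groups, None);

    for (row, (&group_index, value)) in group_indices.iter().zip(values).enumerate() {
        // Skip rows the filter rejects.
        if let Some(filter) = opt_filter {
            if !filter[row] {
                continue;
            }
        }
        // Nulls do not contribute to SUM.
        if let Some(v) = value {
            let slot = &mut sums[group_index];
            *slot = Some(slot.unwrap_or(0) + v);
        }
    }
}
```

A second call may pass a larger total_num_groups. The state then grows, and existing groups keep their values.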

fn evaluate(&mut self, emit_to: EmitTo) -> Result<Arc<dyn Array>, DataFusionError>

Returns the final aggregate value for each group as a single array, resetting the internal state for the emitted groups.

The rows returned must be in group_index order: the value for group_index 0, followed by 1, and so on. Any group_index that did not have values should be null.

For example, a SUM accumulator maintains a running sum for each group, and evaluate produces that running sum as its output for all groups, in group_index order.

If emit_to is EmitTo::All, the accumulator should return all groups and release / reset its internal state equivalent to when it was first created.

If emit_to is EmitTo::First(n), only the first n groups should be emitted and the state for those groups removed. State for the remaining groups must be retained for future use. The group_indices on subsequent calls to update_batch or merge_batch will be shifted down by n. See EmitTo::First for more details.
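
The sketch below models the emit_to handling with a local enum. It assumes per-group state is kept in a Vec. Emit and take_groups are hypothetical stand-ins for EmitTo and the accumulator's internal bookkeeping. Emitting the first n groups splits them off, so the group previously at index n becomes index 0.

```rust
/// Hypothetical local mirror of EmitTo, so the sketch is self-contained.
#[derive(Debug, Clone, Copy)]
enum Emit {
    /// Emit every group and reset the state.
    All,
    /// Emit only the first `n` groups; the rest are kept.
    First(usize),
}

/// Removes and returns the per-group states to emit, in group_index order.
/// After `Emit::First(n)`, the retained groups are renumbered starting at 0,
/// which is why later group_indices are shifted down by `n`.
fn take_groups<T>(state: &mut Vec<T>, emit_to: Emit) -> Vec<T> {
    match emit_to {
        Emit::All => std::mem::take(state),
        Emit::First(n) => {
            // `split_off` leaves the first n in `state` and returns the rest;
            // swap so the rest stays behind and the first n are returned.
            let rest = state.split_off(n);
            std::mem::replace(state, rest)
        }
    }
}
```

With SUM state stored as Vec<Option<i64>>, the returned Vec already follows the required layout: one entry per group in group_index order, with None (null) for groups that saw no values.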

fn state(&mut self, emit_to: EmitTo) -> Result<Vec<Arc<dyn Array>>, DataFusionError>

Returns the intermediate aggregate state for this accumulator, used for multi-phase grouping, resetting its internal state.

See Accumulator::state for more information on multi-phase aggregation.

For example, AVG might return two arrays, SUM and COUNT, but the MIN aggregate would return just a single array.

Note that more sophisticated internal state can be passed as a single StructArray rather than as multiple arrays.

See Self::evaluate for details on the required output order and emit_to.
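
The sketch below illustrates multi-column intermediate state with a hypothetical AVG state. It keeps SUM and COUNT as two parallel vectors, which stand in for two Arrow arrays. For brevity it always behaves like EmitTo::All.

```rust
/// Hypothetical per-group state for AVG over f64 values.
#[derive(Default)]
struct AvgState {
    sums: Vec<f64>,
    counts: Vec<u64>,
}

impl AvgState {
    /// Intermediate state: two parallel columns (SUM, COUNT) in group_index
    /// order. Emits all groups and resets, like EmitTo::All.
    fn state(&mut self) -> (Vec<f64>, Vec<u64>) {
        (std::mem::take(&mut self.sums), std::mem::take(&mut self.counts))
    }

    /// Final value per group: SUM / COUNT, or None (null) for a group with no rows.
    fn evaluate(&mut self) -> Vec<Option<f64>> {
        let (sums, counts) = self.state();
        sums.into_iter()
            .zip(counts)
            .map(|(s, c)| if c == 0 { None } else { Some(s / c as f64) })
            .collect()
    }
}
```

Emitting SUM and COUNT separately, rather than the average itself, lets a later phase combine partial results exactly.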

fn merge_batch(&mut self, values: &[Arc<dyn Array>], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize) -> Result<(), DataFusionError>

Merges intermediate state (the output from Self::state) into this accumulator’s current state.

For some aggregates (such as SUM), merge_batch is the same as update_batch, but for others (such as COUNT, where the partial counts must be summed) the operations differ. See Self::state for more details on how state is used and merged.

  • values: arrays previously produced by calling state on an accumulator

Other arguments are the same as for Self::update_batch.
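
The difference between update_batch and merge_batch is easiest to see for COUNT. CountState below is hypothetical and uses plain slices instead of Arrow arrays, with filters omitted. Updating adds 1 per non-null input row. Merging adds the partial counts that state produced.

```rust
/// Hypothetical COUNT accumulator contrasting update_batch and merge_batch.
#[derive(Default)]
struct CountState {
    counts: Vec<u64>,
}

impl CountState {
    /// update_batch: each non-null input row adds 1 to its group.
    fn update_batch(&mut self, values: &[Option<i64>], group_indices: &[usize], total_num_groups: usize) {
        self.counts.resize(total_num_groups, 0);
        for (&g, v) in group_indices.iter().zip(values) {
            if v.is_some() {
                self.counts[g] += 1;
            }
        }
    }

    /// merge_batch: each input row is a partial count produced by `state`,
    /// so it is added to the group's total rather than counted as 1.
    fn merge_batch(&mut self, partial_counts: &[u64], group_indices: &[usize], total_num_groups: usize) {
        self.counts.resize(total_num_groups, 0);
        for (&g, &c) in group_indices.iter().zip(partial_counts) {
            self.counts[g] += c;
        }
    }
}
```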

fn size(&self) -> usize

Amount of memory used to store the state of this accumulator, in bytes.

This function is called once per batch, so it should be cheap to compute (for example, derived from the capacities of the state buffers) and should not be O(num_groups).
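
One way to keep size cheap is to derive it from the capacities of the state vectors instead of visiting each group. AvgGroupState is a hypothetical type. Whether to include the size of the struct itself is a judgment call.

```rust
/// Hypothetical AVG state used to illustrate a cheap `size` computation.
struct AvgGroupState {
    sums: Vec<f64>,
    counts: Vec<u64>,
}

impl AvgGroupState {
    /// Bytes used by the state, computed in O(1) from Vec capacities
    /// (capacity, not length, reflects what is actually allocated).
    fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.sums.capacity() * std::mem::size_of::<f64>()
            + self.counts.capacity() * std::mem::size_of::<u64>()
    }
}
```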

Provided Methods

fn convert_to_state(&self, _values: &[Arc<dyn Array>], _opt_filter: Option<&BooleanArray>) -> Result<Vec<Arc<dyn Array>>, DataFusionError>

Converts an input batch directly to the intermediate aggregate state.

This is equivalent to treating each input row as its own group. It is invoked when the Partial phase of a multi-phase aggregation is not reducing the cardinality enough to warrant spending more effort on pre-aggregation (see the Background section below), in which case the operator switches to passing intermediate state directly to the next aggregation phase.

Examples:

  • COUNT: an array of 1s for each row in the input batch.
  • SUM/MIN/MAX: the input values themselves.

§Arguments

  • values: the input arguments to the accumulator
  • opt_filter: if present, any row where opt_filter[i] is false should be ignored
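
The sketch below shows convert_to_state for COUNT and SUM, using plain slices instead of Arrow arrays (None models null). Both helpers are hypothetical. Treating a null or filtered-out row as a partial count of 0 (COUNT) or as null (SUM) is an assumption of this sketch. Either way, the output has exactly one state row per input row.

```rust
// Hypothetical convert_to_state helpers: each input row becomes its own
// "group", so the output has exactly one state row per input row.

/// COUNT: 1 for each counted row; null or filtered-out rows become 0 (assumption).
fn count_convert_to_state(values: &[Option<i64>], opt_filter: Option<&[bool]>) -> Vec<u64> {
    values
        .iter()
        .enumerate()
        .map(|(i, v)| {
            let keep = opt_filter.map_or(true, |f| f[i]);
            if keep && v.is_some() { 1 } else { 0 }
        })
        .collect()
}

/// SUM (likewise MIN/MAX): the input value itself, or null if filtered out.
fn sum_convert_to_state(values: &[Option<i64>], opt_filter: Option<&[bool]>) -> Vec<Option<i64>> {
    values
        .iter()
        .enumerate()
        .map(|(i, v)| if opt_filter.map_or(true, |f| f[i]) { *v } else { None })
        .collect()
}
```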

§Background

In a multi-phase aggregation (see Accumulator::state), the initial Partial phase reduces the cardinality of the input data as soon as possible in the plan.

This strategy is very effective for queries with a small number of groups, as most of the data is aggregated immediately and only a small amount of data must be repartitioned (see Accumulator::state for background).

However, for queries with a large number of groups, the Partial phase often does not reduce the cardinality enough to warrant the memory and CPU cost of actually performing the aggregation. For such cases, the HashAggregate operator will dynamically switch to passing intermediate state directly to the next aggregation phase with minimal processing using this method.
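
The sketch below illustrates why skipping pre-aggregation is safe for SUM. The Partial phase can either aggregate its input or forward the convert_to_state output, which for SUM is just the input values. In both cases the final merge produces the same sums. All functions are hypothetical. For simplicity the sketch reuses the same group indices in both phases, whereas a real final phase regroups rows by their group keys.

```rust
/// Hypothetical final-phase merge for SUM: add each partial sum into its group.
/// For SUM the same logic also serves as the partial update.
fn merge_sums(partials: &[i64], group_indices: &[usize], total_num_groups: usize) -> Vec<i64> {
    let mut sums = vec![0; total_num_groups];
    for (&g, &p) in group_indices.iter().zip(partials) {
        sums[g] += p;
    }
    sums
}

/// Partial phase that pre-aggregates: one state row per distinct group.
fn partial_aggregate(values: &[i64], group_indices: &[usize], total_num_groups: usize) -> (Vec<i64>, Vec<usize>) {
    let state = merge_sums(values, group_indices, total_num_groups);
    (state, (0..total_num_groups).collect())
}

/// Partial phase that skips aggregation: one state row per input row.
fn skip_partial(values: &[i64], group_indices: &[usize]) -> (Vec<i64>, Vec<usize>) {
    (values.to_vec(), group_indices.to_vec())
}
```

The trade-off is visible in the state sizes. Pre-aggregation shrinks the data only when there are far fewer groups than rows. Otherwise it spends CPU and memory for little reduction.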

fn supports_convert_to_state(&self) -> bool

Returns true if Self::convert_to_state is implemented to support intermediate aggregate state conversion.

Implementations on Foreign Types

impl<F> GroupsAccumulator for BooleanGroupsAccumulator<F>
where F: Fn(bool, bool) -> bool + Send + Sync,

fn update_batch(&mut self, values: &[Arc<dyn Array>], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize) -> Result<(), DataFusionError>

fn evaluate(&mut self, emit_to: EmitTo) -> Result<Arc<dyn Array>, DataFusionError>

fn state(&mut self, emit_to: EmitTo) -> Result<Vec<Arc<dyn Array>>, DataFusionError>

fn merge_batch(&mut self, values: &[Arc<dyn Array>], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize) -> Result<(), DataFusionError>

fn size(&self) -> usize

fn convert_to_state(&self, values: &[Arc<dyn Array>], opt_filter: Option<&BooleanArray>) -> Result<Vec<Arc<dyn Array>>, DataFusionError>

fn supports_convert_to_state(&self) -> bool

impl<T, F> GroupsAccumulator for PrimitiveGroupsAccumulator<T, F>

fn convert_to_state(&self, values: &[Arc<dyn Array>], opt_filter: Option<&BooleanArray>) -> Result<Vec<Arc<dyn Array>>, DataFusionError>

Converts an input batch directly to a state batch.

The state is:

  • self.prim_fn applied to each non-null, non-filtered value
  • null otherwise

fn update_batch(&mut self, values: &[Arc<dyn Array>], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize) -> Result<(), DataFusionError>

fn evaluate(&mut self, emit_to: EmitTo) -> Result<Arc<dyn Array>, DataFusionError>

fn state(&mut self, emit_to: EmitTo) -> Result<Vec<Arc<dyn Array>>, DataFusionError>

fn merge_batch(&mut self, values: &[Arc<dyn Array>], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize) -> Result<(), DataFusionError>

fn supports_convert_to_state(&self) -> bool

fn size(&self) -> usize

Implementors