pub trait AggregateUDFImpl:
Debug
+ DynEq
+ DynHash
+ Send
+ Sync
+ Any {
Show 29 methods
// Required methods
fn name(&self) -> &str;
fn signature(&self) -> &Signature;
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
fn accumulator(
&self,
acc_args: AccumulatorArgs<'_>,
) -> Result<Box<dyn Accumulator>>;
// Provided methods
fn aliases(&self) -> &[String] { ... }
fn schema_name(&self, params: &AggregateFunctionParams) -> Result<String> { ... }
fn human_display(&self, params: &AggregateFunctionParams) -> Result<String> { ... }
fn window_function_schema_name(
&self,
params: &WindowFunctionParams,
) -> Result<String> { ... }
fn display_name(&self, params: &AggregateFunctionParams) -> Result<String> { ... }
fn window_function_display_name(
&self,
params: &WindowFunctionParams,
) -> Result<String> { ... }
fn return_field(&self, arg_fields: &[FieldRef]) -> Result<FieldRef> { ... }
fn is_nullable(&self) -> bool { ... }
fn state_fields(&self, args: StateFieldsArgs<'_>) -> Result<Vec<FieldRef>> { ... }
fn groups_accumulator_supported(&self, _args: AccumulatorArgs<'_>) -> bool { ... }
fn create_groups_accumulator(
&self,
_args: AccumulatorArgs<'_>,
) -> Result<Box<dyn GroupsAccumulator>> { ... }
fn create_sliding_accumulator(
&self,
args: AccumulatorArgs<'_>,
) -> Result<Box<dyn Accumulator>> { ... }
fn with_beneficial_ordering(
self: Arc<Self>,
_beneficial_ordering: bool,
) -> Result<Option<Arc<dyn AggregateUDFImpl>>> { ... }
fn order_sensitivity(&self) -> AggregateOrderSensitivity { ... }
fn simplify(&self) -> Option<AggregateFunctionSimplification> { ... }
fn simplify_expr_op_literal(
&self,
_agg_function: &AggregateFunction,
_arg: &Expr,
_op: Operator,
_lit: &Expr,
_arg_is_left: bool,
) -> Result<Option<Expr>> { ... }
fn reverse_expr(&self) -> ReversedUDAF { ... }
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> { ... }
fn is_descending(&self) -> Option<bool> { ... }
fn value_from_stats(
&self,
_statistics_args: &StatisticsArgs<'_>,
) -> Option<ScalarValue> { ... }
fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> { ... }
fn supports_null_handling_clause(&self) -> bool { ... }
fn supports_within_group_clause(&self) -> bool { ... }
fn documentation(&self) -> Option<&Documentation> { ... }
fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity { ... }
}
Expand description
Trait for implementing AggregateUDF.
This trait exposes the full API for implementing user-defined aggregate functions and can be used to implement any function.
See advanced_udaf.rs for a full example with complete implementation and
AggregateUDF for other available options.
§Basic Example
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct GeoMeanUdf {
signature: Signature,
}
impl GeoMeanUdf {
fn new() -> Self {
Self {
signature: Signature::uniform(1, vec![DataType::Float64], Volatility::Immutable),
}
}
}
static DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
Documentation::builder(DOC_SECTION_AGGREGATE, "calculates a geometric mean", "geo_mean(2.0)")
.with_argument("arg1", "The Float64 number for the geometric mean")
.build()
});
fn get_doc() -> &'static Documentation {
&DOCUMENTATION
}
/// Implement the AggregateUDFImpl trait for GeoMeanUdf
impl AggregateUDFImpl for GeoMeanUdf {
fn name(&self) -> &str { "geo_mean" }
fn signature(&self) -> &Signature { &self.signature }
fn return_type(&self, args: &[DataType]) -> Result<DataType> {
if !matches!(args.get(0), Some(&DataType::Float64)) {
return plan_err!("geo_mean only accepts Float64 arguments");
}
Ok(DataType::Float64)
}
// This is the accumulator factory; DataFusion uses it to create new accumulators.
fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> { unimplemented!() }
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
Ok(vec![
Arc::new(args.return_field.as_ref().clone().with_name("value")),
Arc::new(Field::new("ordering", DataType::UInt32, true))
])
}
fn documentation(&self) -> Option<&Documentation> {
Some(get_doc())
}
}
// Create a new AggregateUDF from the implementation
let geometric_mean = AggregateUDF::from(GeoMeanUdf::new());
// Call the function `geo_mean(col)`
let expr = geometric_mean.call(vec![col("a")]);
Required Methods§
Sourcefn signature(&self) -> &Signature
fn signature(&self) -> &Signature
Returns the function’s Signature for information about what input
types are accepted and the function’s Volatility.
Sourcefn return_type(&self, arg_types: &[DataType]) -> Result<DataType>
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>
What DataType will be returned by this function, given the types of
the arguments?
Sourcefn accumulator(
&self,
acc_args: AccumulatorArgs<'_>,
) -> Result<Box<dyn Accumulator>>
fn accumulator( &self, acc_args: AccumulatorArgs<'_>, ) -> Result<Box<dyn Accumulator>>
Return a new Accumulator that aggregates values for a specific
group during query execution.
acc_args: AccumulatorArgs contains information about how the
aggregate function was called.
Provided Methods§
Sourcefn aliases(&self) -> &[String]
fn aliases(&self) -> &[String]
Returns any aliases (alternate names) for this function.
Note: aliases should only include names other than Self::name.
Defaults to [] (no aliases)
Sourcefn schema_name(&self, params: &AggregateFunctionParams) -> Result<String>
fn schema_name(&self, params: &AggregateFunctionParams) -> Result<String>
Returns the name of the column this expression would create
See Expr::schema_name for details
Example of schema_name: count(DISTINCT column1) FILTER (WHERE column2 > 10) ORDER BY [..]
Sourcefn human_display(&self, params: &AggregateFunctionParams) -> Result<String>
fn human_display(&self, params: &AggregateFunctionParams) -> Result<String>
Returns a human readable expression.
See Expr::human_display for details.
Sourcefn window_function_schema_name(
&self,
params: &WindowFunctionParams,
) -> Result<String>
fn window_function_schema_name( &self, params: &WindowFunctionParams, ) -> Result<String>
Returns the name of the column this expression would create
See Expr::schema_name for details
Different from schema_name in that it is used for window aggregate functions.
Example of schema_name: count(DISTINCT column1) FILTER (WHERE column2 > 10) [PARTITION BY [..]] [ORDER BY [..]]
Sourcefn display_name(&self, params: &AggregateFunctionParams) -> Result<String>
fn display_name(&self, params: &AggregateFunctionParams) -> Result<String>
Returns the user-defined display name of the function, given the arguments.
This can be used to customize the output column name generated by this function.
Defaults to function_name([DISTINCT] column1, column2, ..) [null_treatment] [filter] [order_by [..]]
Sourcefn window_function_display_name(
&self,
params: &WindowFunctionParams,
) -> Result<String>
fn window_function_display_name( &self, params: &WindowFunctionParams, ) -> Result<String>
Returns the user-defined display name of the function, given the arguments.
This can be used to customize the output column name generated by this function.
Different from display_name in that it is used for window aggregate functions.
Defaults to function_name([DISTINCT] column1, column2, ..) [null_treatment] [partition by [..]] [order_by [..]]
Sourcefn return_field(&self, arg_fields: &[FieldRef]) -> Result<FieldRef>
fn return_field(&self, arg_fields: &[FieldRef]) -> Result<FieldRef>
What type will be returned by this function, given the arguments?
By default, this function calls Self::return_type with the
types of each argument.
§Notes
Most UDFs should implement Self::return_type and not this
function as the output type for most functions only depends on the types
of their inputs (e.g. sum(f64) is always f64).
This function can be used for more advanced cases such as:
- specifying nullability
- return types based on the values of the arguments (rather than their types).
- return types based on metadata within the fields of the inputs
Sourcefn is_nullable(&self) -> bool
fn is_nullable(&self) -> bool
Whether the aggregate function is nullable.
Nullable means that the function could return null for any inputs.
For example, aggregate functions like COUNT always return a non-null value,
but others like MIN will return NULL if there is nullable input.
Note that if the function is declared as not nullable, make sure the AggregateUDFImpl::default_value is non-null.
Sourcefn state_fields(&self, args: StateFieldsArgs<'_>) -> Result<Vec<FieldRef>>
fn state_fields(&self, args: StateFieldsArgs<'_>) -> Result<Vec<FieldRef>>
Return the fields used to store the intermediate state of this accumulator.
See Accumulator::state for background information.
args: StateFieldsArgs contains arguments passed to the
aggregate function’s accumulator.
§Notes:
The default implementation returns a single state field named name
with the same type as value_type. This is suitable for aggregates such
as SUM or MIN where partial state can be combined by applying the
same aggregate.
For aggregates such as AVG where the partial state is more complex
(e.g. a COUNT and a SUM), this method is used to define the additional
fields.
The names of the fields must be unique within the query and thus should
be derived from name. See format_state_name for a utility function
to generate a unique name.
Sourcefn groups_accumulator_supported(&self, _args: AccumulatorArgs<'_>) -> bool
fn groups_accumulator_supported(&self, _args: AccumulatorArgs<'_>) -> bool
Returns true if the aggregate expression has a specialized
GroupsAccumulator implementation. If this returns true,
Self::create_groups_accumulator will be called.
§Notes
Even if this function returns true, DataFusion will still use
Self::accumulator for certain queries, such as when this aggregate is
used as a window function or when there are no GROUP BY columns in the
query.
Sourcefn create_groups_accumulator(
&self,
_args: AccumulatorArgs<'_>,
) -> Result<Box<dyn GroupsAccumulator>>
fn create_groups_accumulator( &self, _args: AccumulatorArgs<'_>, ) -> Result<Box<dyn GroupsAccumulator>>
Return a specialized GroupsAccumulator that manages state
for all groups.
For maximum performance, a GroupsAccumulator should be
implemented in addition to Accumulator.
Sourcefn create_sliding_accumulator(
&self,
args: AccumulatorArgs<'_>,
) -> Result<Box<dyn Accumulator>>
fn create_sliding_accumulator( &self, args: AccumulatorArgs<'_>, ) -> Result<Box<dyn Accumulator>>
A sliding accumulator is an alternative accumulator that can be used for window functions. It has a retract method to revert a previous update.
See retract_batch for more details.
Sourcefn with_beneficial_ordering(
self: Arc<Self>,
_beneficial_ordering: bool,
) -> Result<Option<Arc<dyn AggregateUDFImpl>>>
fn with_beneficial_ordering( self: Arc<Self>, _beneficial_ordering: bool, ) -> Result<Option<Arc<dyn AggregateUDFImpl>>>
Sets the indicator of whether the ordering requirements of the AggregateUDFImpl are
satisfied by its input. If this is not the case, UDFs with order
sensitivity AggregateOrderSensitivity::Beneficial can still produce
the correct result with possibly more work internally.
§Returns
Returns Ok(Some(updated_udf)) if the process completes successfully.
If the expression can benefit from existing input ordering, but does
not implement the method, returns an error. Order insensitive and hard
requirement aggregators return Ok(None).
Sourcefn order_sensitivity(&self) -> AggregateOrderSensitivity
fn order_sensitivity(&self) -> AggregateOrderSensitivity
Gets the order sensitivity of the UDF. See AggregateOrderSensitivity
for possible options.
Sourcefn simplify(&self) -> Option<AggregateFunctionSimplification>
fn simplify(&self) -> Option<AggregateFunctionSimplification>
Returns an optional hook for simplifying this user-defined aggregate.
Use this hook to apply function-specific rewrites during optimization.
The default implementation returns None.
For example, percentile_cont(x, 0.0) and percentile_cont(x, 1.0) can
be rewritten to MIN(x) or MAX(x) depending on the ORDER BY
direction.
DataFusion already simplifies arguments and performs constant folding
(for example, my_add(1, 2) -> 3). For nested expressions, the optimizer
runs simplification in multiple passes, so arguments are typically
simplified before this hook is invoked. As a result, UDF implementations
usually do not need to handle argument simplification themselves.
See configuration datafusion.optimizer.max_passes for details on how many
optimization passes may be applied.
§Returns
None if simplify is not defined.
Or, a closure (AggregateFunctionSimplification) invoked with:
- aggregate_function: AggregateFunction with already simplified arguments
- info: crate::simplify::SimplifyContext
The closure returns a simplified Expr or an error.
§Notes
The returned expression must have the same schema as the original expression, including both the data type and nullability. For example, if the original expression is nullable, the returned expression must also be nullable, otherwise it may lead to schema verification errors later in query planning.
Sourcefn simplify_expr_op_literal(
&self,
_agg_function: &AggregateFunction,
_arg: &Expr,
_op: Operator,
_lit: &Expr,
_arg_is_left: bool,
) -> Result<Option<Expr>>
fn simplify_expr_op_literal( &self, _agg_function: &AggregateFunction, _arg: &Expr, _op: Operator, _lit: &Expr, _arg_is_left: bool, ) -> Result<Option<Expr>>
Rewrite the aggregate to have simpler arguments
This query pattern is not common in most real workloads, and most aggregate implementations can safely ignore it. This API is included in DataFusion because it is important for ClickBench Q29. See backstory on https://github.com/apache/datafusion/issues/15524
§Rewrite Overview
The idea is to rewrite multiple aggregates with “complex arguments” into ones with simpler arguments that can be optimized by common subexpression elimination (CSE). At a high level the rewrite looks like
Aggregate(SUM(x + 1), SUM(x + 2), ...)
Into
Aggregate(SUM(x) + 1 * COUNT(x), SUM(x) + 2 * COUNT(x), ...)
While this rewrite may seem worse (slower) than the original as it computes more aggregate expressions, the common subexpression elimination (CSE) can then reduce the number of distinct aggregates the query actually needs to compute with a rewrite like
Projection(_A + 1*_B, _A + 2*_B)
  Aggregate(_A = SUM(x), _B = COUNT(x))
This optimization is extremely important for ClickBench Q29, which has 90
such expressions for some reason, and so this optimization results in
only two aggregates being needed. The DataFusion optimizer will invoke
this method when it detects multiple aggregates in a query that share
arguments of the form <arg> <op> <literal>.
§API
If agg_function supports the rewrite, it should return a semantically
equivalent expression (likely with more aggregate expressions, but
simpler arguments)
This is only called when:
- There are no “special” aggregate params (filters, null handling, etc.)
- The aggregate function has exactly one Expr argument
- There are no volatile expressions
Arguments
- agg_function: the original aggregate function detected with complex arguments.
- arg: The common argument shared across multiple aggregates (e.g. x in the example above).
- op: the operator between the common argument and the literal (e.g. + in x + 1 or 1 + x).
- lit: the literal argument (e.g. 1 or 2 in the example above).
- arg_is_left: whether the common argument is on the left or right of the operator (e.g. true for x + 1 and false for 1 + x).
The default implementation returns None, which is what most aggregates
should do.
Sourcefn reverse_expr(&self) -> ReversedUDAF
fn reverse_expr(&self) -> ReversedUDAF
Returns the reverse expression of the aggregate function.
Sourcefn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>>
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>>
Coerce arguments of a function call to types that the function can evaluate.
This function is only called if AggregateUDFImpl::signature returns crate::TypeSignature::UserDefined. Most
UDAFs should return one of the other variants of TypeSignature which handle common
cases
See the type coercion module documentation for more details on type coercion
For example, if your function requires a floating point argument, but the user calls
it like my_func(1::int) (aka with 1 as an integer), coerce_types could return [DataType::Float64]
to ensure the argument was cast to 1::double
§Parameters
arg_types: The argument types of the arguments this function was called with
§Return value
A Vec the same length as arg_types. DataFusion will CAST the function call
arguments to these specific types.
Sourcefn is_descending(&self) -> Option<bool>
fn is_descending(&self) -> Option<bool>
If this function is max, return true. If the function is min, return false. Otherwise, return None (the default).
Note: this is used to use special aggregate implementations in certain conditions
Sourcefn value_from_stats(
&self,
_statistics_args: &StatisticsArgs<'_>,
) -> Option<ScalarValue>
fn value_from_stats( &self, _statistics_args: &StatisticsArgs<'_>, ) -> Option<ScalarValue>
Return the value of this aggregate function if it can be determined entirely from statistics and arguments.
Using a ScalarValue rather than a runtime computation can significantly
improve query performance.
For example, if the minimum value of column x is known to be 42 from
statistics, then the aggregate MIN(x) should return Some(ScalarValue(42))
Sourcefn default_value(&self, data_type: &DataType) -> Result<ScalarValue>
fn default_value(&self, data_type: &DataType) -> Result<ScalarValue>
Returns the default value of the function when the input is all null.
Most aggregate functions return Null if the input is Null,
while count returns 0 if the input is Null.
Sourcefn supports_null_handling_clause(&self) -> bool
fn supports_null_handling_clause(&self) -> bool
If this function supports [IGNORE NULLS | RESPECT NULLS] SQL clause,
return true. Otherwise, return false which will cause an error to be
raised during SQL parsing if these clauses are detected for this function.
Functions which implement this as true are expected to handle the resulting
null handling config present in AccumulatorArgs, ignore_nulls.
Sourcefn supports_within_group_clause(&self) -> bool
fn supports_within_group_clause(&self) -> bool
If this function supports the WITHIN GROUP (ORDER BY column [ASC|DESC])
SQL syntax, return true. Otherwise, return false (default) which will
cause an error when parsing SQL where this syntax is detected for this
function.
This function should return true for ordered-set aggregate functions
only.
§Ordered-set aggregate functions
Ordered-set aggregate functions allow specifying a sort order that affects
how the function calculates its result, unlike other aggregate functions
like sum or count. For example, percentile_cont is an ordered-set
aggregate function that calculates the exact percentile value from a list
of values; the output of calculating the 0.75 percentile depends on if
you’re calculating on an ascending or descending list of values.
An example of how an ordered-set aggregate function is called with the
WITHIN GROUP SQL syntax:
-- Ascending
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY c1 ASC) FROM table;
-- Default ordering is ascending if not explicitly specified
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY c1) FROM table;
-- Descending
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY c1 DESC) FROM table;
This calculates the 0.75 percentile of the column c1 from table,
according to the specific ordering. The column specified in the WITHIN GROUP
ordering clause is taken as the column to calculate values on; specifying
the WITHIN GROUP clause is optional so these queries are equivalent:
-- If no WITHIN GROUP is specified then default ordering is implementation
-- dependent; in this case ascending for percentile_cont
SELECT percentile_cont(c1, 0.75) FROM table;
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY c1 ASC) FROM table;
Aggregate UDFs can define their default ordering if the function is called
without the WITHIN GROUP clause, though a default of ascending is the
standard practice.
Ordered-set aggregate function implementations are responsible for handling
the input sort order themselves (e.g. percentile_cont must buffer and
sort the values internally). That is, DataFusion does not introduce any
kind of sort into the plan for these functions with this syntax.
Sourcefn documentation(&self) -> Option<&Documentation>
fn documentation(&self) -> Option<&Documentation>
Returns the documentation for this Aggregate UDF.
Documentation can be accessed programmatically as well as generating publicly facing documentation.
Sourcefn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity
fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity
Indicates whether the aggregation function is monotonic as a set
function. See SetMonotonicity for details.
Implementations§
Source§impl dyn AggregateUDFImpl
impl dyn AggregateUDFImpl
Sourcepub fn is<T: AggregateUDFImpl>(&self) -> bool
pub fn is<T: AggregateUDFImpl>(&self) -> bool
Returns true if the implementation is of type T.
Works correctly when called on Arc<dyn AggregateUDFImpl> via auto-deref.
Sourcepub fn downcast_ref<T: AggregateUDFImpl>(&self) -> Option<&T>
pub fn downcast_ref<T: AggregateUDFImpl>(&self) -> Option<&T>
Attempts to downcast to a concrete type T, returning None if the
implementation is not of that type.
Works correctly when called on Arc<dyn AggregateUDFImpl> via auto-deref,
unlike (&arc as &dyn Any).downcast_ref::<T>() which would attempt to
downcast the Arc itself.
Trait Implementations§
Source§impl PartialEq for dyn AggregateUDFImpl
impl PartialEq for dyn AggregateUDFImpl
Source§impl PartialOrd for dyn AggregateUDFImpl
impl PartialOrd for dyn AggregateUDFImpl
Dyn Compatibility§
This trait is dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety".