Skip to main content

vortex_array/aggregate_fn/
erased.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Type-erased aggregate function ([`AggregateFnRef`]).
5
6use std::fmt::Debug;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::hash::Hash;
10use std::hash::Hasher;
11use std::sync::Arc;
12
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_utils::debug_with::DebugWith;
16
17use crate::aggregate_fn::AccumulatorRef;
18use crate::aggregate_fn::AggregateFnId;
19use crate::aggregate_fn::AggregateFnVTable;
20use crate::aggregate_fn::GroupedAccumulatorRef;
21use crate::aggregate_fn::options::AggregateFnOptions;
22use crate::aggregate_fn::typed::AggregateFnInner;
23use crate::aggregate_fn::typed::DynAggregateFn;
24use crate::dtype::DType;
25
26/// A type-erased aggregate function, pairing a vtable with bound options behind a trait object.
27///
28/// This stores an [`AggregateFnVTable`] and its options behind an `Arc<dyn DynAggregateFn>`,
29/// allowing heterogeneous storage and dispatch.
30///
31/// Use [`super::AggregateFn::new()`] to construct, and [`super::AggregateFn::erased()`] to
32/// obtain an [`AggregateFnRef`].
33#[derive(Clone)]
34pub struct AggregateFnRef(pub(super) Arc<dyn DynAggregateFn>);
35
36impl AggregateFnRef {
37    /// Returns the ID of this aggregate function.
38    pub fn id(&self) -> AggregateFnId {
39        self.0.id()
40    }
41
42    /// Returns whether the aggregate function is of the given vtable type.
43    pub fn is<V: AggregateFnVTable>(&self) -> bool {
44        self.0.as_any().is::<AggregateFnInner<V>>()
45    }
46
47    /// Returns the typed options for this aggregate function if it matches the given vtable type.
48    pub fn as_opt<V: AggregateFnVTable>(&self) -> Option<&V::Options> {
49        self.downcast_inner::<V>().map(|inner| &inner.options)
50    }
51
52    /// Returns a reference to the typed vtable if it matches the given vtable type.
53    pub fn vtable_ref<V: AggregateFnVTable>(&self) -> Option<&V> {
54        self.downcast_inner::<V>().map(|inner| &inner.vtable)
55    }
56
57    /// Downcast the inner to the concrete `AggregateFnInner<V>`.
58    fn downcast_inner<V: AggregateFnVTable>(&self) -> Option<&AggregateFnInner<V>> {
59        self.0.as_any().downcast_ref::<AggregateFnInner<V>>()
60    }
61
62    /// Returns the typed options for this aggregate function if it matches the given vtable type.
63    ///
64    /// # Panics
65    ///
66    /// Panics if the vtable type does not match.
67    pub fn as_<V: AggregateFnVTable>(&self) -> &V::Options {
68        self.as_opt::<V>()
69            .vortex_expect("Aggregate function options type mismatch")
70    }
71
72    /// The type-erased options for this aggregate function.
73    pub fn options(&self) -> AggregateFnOptions<'_> {
74        AggregateFnOptions { inner: &*self.0 }
75    }
76
77    /// Coerce the input type for this aggregate function.
78    pub fn coerce_args(&self, input_dtype: &DType) -> VortexResult<DType> {
79        self.0.coerce_args(input_dtype)
80    }
81
82    /// Compute the return [`DType`] per group given the input element type.
83    ///
84    /// Returns `None` if the input dtype is not supported by the aggregate function.
85    pub fn return_dtype(&self, input_dtype: &DType) -> Option<DType> {
86        self.0.return_dtype(input_dtype)
87    }
88
89    /// DType of the intermediate accumulator state.
90    ///
91    /// Returns `None` if the input dtype is not supported by the aggregate function.
92    pub fn state_dtype(&self, input_dtype: &DType) -> Option<DType> {
93        self.0.state_dtype(input_dtype)
94    }
95
96    /// Create an accumulator for streaming aggregation.
97    pub fn accumulator(&self, input_dtype: &DType) -> VortexResult<AccumulatorRef> {
98        self.0.accumulator(input_dtype)
99    }
100
101    /// Create a grouped accumulator for grouped streaming aggregation.
102    pub fn accumulator_grouped(&self, input_dtype: &DType) -> VortexResult<GroupedAccumulatorRef> {
103        self.0.accumulator_grouped(input_dtype)
104    }
105}
106
107impl Debug for AggregateFnRef {
108    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct("AggregateFnRef")
110            .field("vtable", &self.0.id())
111            .field("options", &DebugWith(|fmt| self.0.options_debug(fmt)))
112            .finish()
113    }
114}
115
116impl Display for AggregateFnRef {
117    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
118        write!(f, "{}(", self.0.id())?;
119        self.0.options_display(f)?;
120        write!(f, ")")
121    }
122}
123
124impl PartialEq for AggregateFnRef {
125    fn eq(&self, other: &Self) -> bool {
126        self.0.id() == other.0.id() && self.0.options_eq(other.0.options_any())
127    }
128}
129impl Eq for AggregateFnRef {}
130
131impl Hash for AggregateFnRef {
132    fn hash<H: Hasher>(&self, state: &mut H) {
133        self.0.id().hash(state);
134        self.0.options_hash(state);
135    }
136}