vortex_array/aggregate_fn/fns/null_count/
mod.rs1use vortex_error::VortexExpect;
5use vortex_error::VortexResult;
6use vortex_error::vortex_err;
7use vortex_session::VortexSession;
8use vortex_session::registry::CachedId;
9
10use crate::ArrayRef;
11use crate::Columnar;
12use crate::ExecutionCtx;
13use crate::IntoArray;
14use crate::aggregate_fn::Accumulator;
15use crate::aggregate_fn::AggregateFnId;
16use crate::aggregate_fn::AggregateFnVTable;
17use crate::aggregate_fn::DynAccumulator;
18use crate::aggregate_fn::EmptyOptions;
19use crate::dtype::DType;
20use crate::dtype::Nullability::NonNullable;
21use crate::dtype::PType;
22use crate::expr::stats::Precision;
23use crate::expr::stats::Stat;
24use crate::expr::stats::StatsProvider;
25use crate::scalar::Scalar;
26use crate::scalar::ScalarValue;
27
28pub fn null_count(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
30 if let Precision::Exact(null_count_scalar) = array.statistics().get(Stat::NullCount) {
31 return usize::try_from(&null_count_scalar)
32 .map_err(|e| vortex_err!("Failed to convert null count stat to usize: {e}"));
33 }
34
35 let mut acc = Accumulator::try_new(NullCount, EmptyOptions, array.dtype().clone())?;
36 acc.accumulate(array, ctx)?;
37 let result = acc.finish()?;
38
39 let count = result
40 .as_primitive()
41 .typed_value::<u64>()
42 .vortex_expect("null_count result should not be null");
43 let count_usize = usize::try_from(count).vortex_expect("Cannot be more nulls than usize::MAX");
44
45 array
46 .statistics()
47 .set(Stat::NullCount, Precision::Exact(ScalarValue::from(count)));
48
49 Ok(count_usize)
50}
51
/// Aggregate function that counts the null (invalid) elements of its input.
///
/// Stateless marker type; the running count lives in the accumulator's partial state.
#[derive(Debug, Clone)]
pub struct NullCount;
57
58impl AggregateFnVTable for NullCount {
59 type Options = EmptyOptions;
60 type Partial = u64;
61
62 fn id(&self) -> AggregateFnId {
63 static ID: CachedId = CachedId::new("vortex.null_count");
64 *ID
65 }
66
67 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
68 Ok(Some(vec![]))
69 }
70
71 fn deserialize(
72 &self,
73 _metadata: &[u8],
74 _session: &VortexSession,
75 ) -> VortexResult<Self::Options> {
76 Ok(EmptyOptions)
77 }
78
79 fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
80 Some(DType::Primitive(PType::U64, NonNullable))
81 }
82
83 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
84 self.return_dtype(options, input_dtype)
85 }
86
87 fn empty_partial(
88 &self,
89 _options: &Self::Options,
90 _input_dtype: &DType,
91 ) -> VortexResult<Self::Partial> {
92 Ok(0)
93 }
94
95 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
96 let count = other
97 .as_primitive()
98 .typed_value::<u64>()
99 .vortex_expect("null_count partial should not be null");
100 *partial += count;
101 Ok(())
102 }
103
104 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
105 Ok(Scalar::primitive(*partial, NonNullable))
106 }
107
108 fn reset(&self, partial: &mut Self::Partial) {
109 *partial = 0;
110 }
111
112 #[inline]
113 fn is_saturated(&self, _partial: &Self::Partial) -> bool {
114 false
115 }
116
117 fn try_accumulate(
118 &self,
119 state: &mut Self::Partial,
120 batch: &ArrayRef,
121 ctx: &mut ExecutionCtx,
122 ) -> VortexResult<bool> {
123 *state += batch.invalid_count(ctx)? as u64;
124 Ok(true)
125 }
126
127 fn accumulate(
128 &self,
129 partial: &mut Self::Partial,
130 batch: &Columnar,
131 ctx: &mut ExecutionCtx,
132 ) -> VortexResult<()> {
133 *partial += match batch {
134 Columnar::Constant(c) => {
135 if c.scalar().is_null() {
136 c.len() as u64
137 } else {
138 0
139 }
140 }
141 Columnar::Canonical(c) => c.clone().into_array().invalid_count(ctx)? as u64,
142 };
143 Ok(())
144 }
145
146 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
147 Ok(partials)
148 }
149
150 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
151 self.to_scalar(partial)
152 }
153}
154
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::null_count::NullCount;
    use crate::aggregate_fn::fns::null_count::null_count;
    use crate::array_session;
    use crate::arrays::PrimitiveArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::expr::stats::Precision;
    use crate::expr::stats::Stat;
    use crate::expr::stats::StatsProviderExt;

    /// `null_count` returns the number of nulls and caches it as an exact statistic.
    #[test]
    fn null_count_with_nulls() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let values = [Some(1i32), None, Some(3), None];
        let input = PrimitiveArray::from_option_iter(values).into_array();

        let counted = null_count(&input, &mut ctx)?;
        assert_eq!(counted, 2);

        let cached = input.statistics().get_as::<u64>(Stat::NullCount);
        assert_eq!(cached, Precision::exact(2u64));
        Ok(())
    }

    /// Nulls from several batches fed into one accumulator are summed.
    #[test]
    fn null_count_multi_batch() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let mut accumulator = Accumulator::try_new(
            NullCount,
            EmptyOptions,
            DType::Primitive(PType::I32, Nullability::Nullable),
        )?;

        let batches = [
            PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array(),
            PrimitiveArray::from_option_iter([None, Some(5i32), None]).into_array(),
        ];
        for batch in &batches {
            accumulator.accumulate(batch, &mut ctx)?;
        }

        let total = accumulator.finish()?;
        assert_eq!(total.as_primitive().typed_value::<u64>(), Some(3));
        Ok(())
    }
}