vortex_array/aggregate_fn/fns/null_count/
mod.rs1use vortex_error::VortexExpect;
5use vortex_error::VortexResult;
6use vortex_error::vortex_err;
7
8use crate::ArrayRef;
9use crate::Columnar;
10use crate::ExecutionCtx;
11use crate::IntoArray;
12use crate::aggregate_fn::Accumulator;
13use crate::aggregate_fn::AggregateFnId;
14use crate::aggregate_fn::AggregateFnVTable;
15use crate::aggregate_fn::DynAccumulator;
16use crate::aggregate_fn::EmptyOptions;
17use crate::dtype::DType;
18use crate::dtype::Nullability::NonNullable;
19use crate::dtype::PType;
20use crate::expr::stats::Precision;
21use crate::expr::stats::Stat;
22use crate::expr::stats::StatsProvider;
23use crate::scalar::Scalar;
24use crate::scalar::ScalarValue;
25
26pub fn null_count(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
28 if let Some(Precision::Exact(null_count_scalar)) = array.statistics().get(Stat::NullCount) {
29 return usize::try_from(&null_count_scalar)
30 .map_err(|e| vortex_err!("Failed to convert null count stat to usize: {e}"));
31 }
32
33 let mut acc = Accumulator::try_new(NullCount, EmptyOptions, array.dtype().clone())?;
34 acc.accumulate(array, ctx)?;
35 let result = acc.finish()?;
36
37 let count = result
38 .as_primitive()
39 .typed_value::<u64>()
40 .vortex_expect("null_count result should not be null");
41 let count_usize = usize::try_from(count).vortex_expect("Cannot be more nulls than usize::MAX");
42
43 array
44 .statistics()
45 .set(Stat::NullCount, Precision::Exact(ScalarValue::from(count)));
46
47 Ok(count_usize)
48}
49
/// Aggregate function that counts the null (invalid) elements of its input.
///
/// Its aggregate-function id is `vortex.null_count`, and its result is a non-nullable `u64`.
#[derive(Debug, Clone)]
pub struct NullCount;

56impl AggregateFnVTable for NullCount {
57 type Options = EmptyOptions;
58 type Partial = u64;
59
60 fn id(&self) -> AggregateFnId {
61 AggregateFnId::new("vortex.null_count")
62 }
63
64 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
65 Ok(None)
66 }
67
68 fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
69 Some(DType::Primitive(PType::U64, NonNullable))
70 }
71
72 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
73 self.return_dtype(options, input_dtype)
74 }
75
76 fn empty_partial(
77 &self,
78 _options: &Self::Options,
79 _input_dtype: &DType,
80 ) -> VortexResult<Self::Partial> {
81 Ok(0)
82 }
83
84 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
85 let count = other
86 .as_primitive()
87 .typed_value::<u64>()
88 .vortex_expect("null_count partial should not be null");
89 *partial += count;
90 Ok(())
91 }
92
93 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
94 Ok(Scalar::primitive(*partial, NonNullable))
95 }
96
97 fn reset(&self, partial: &mut Self::Partial) {
98 *partial = 0;
99 }
100
101 #[inline]
102 fn is_saturated(&self, _partial: &Self::Partial) -> bool {
103 false
104 }
105
106 fn try_accumulate(
107 &self,
108 state: &mut Self::Partial,
109 batch: &ArrayRef,
110 ctx: &mut ExecutionCtx,
111 ) -> VortexResult<bool> {
112 *state += batch.invalid_count(ctx)? as u64;
113 Ok(true)
114 }
115
116 fn accumulate(
117 &self,
118 partial: &mut Self::Partial,
119 batch: &Columnar,
120 ctx: &mut ExecutionCtx,
121 ) -> VortexResult<()> {
122 *partial += match batch {
123 Columnar::Constant(c) => {
124 if c.scalar().is_null() {
125 c.len() as u64
126 } else {
127 0
128 }
129 }
130 Columnar::Canonical(c) => c.clone().into_array().invalid_count(ctx)? as u64,
131 };
132 Ok(())
133 }
134
135 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
136 Ok(partials)
137 }
138
139 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
140 self.to_scalar(partial)
141 }
142}
143
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::null_count::NullCount;
    use crate::aggregate_fn::fns::null_count::null_count;
    use crate::arrays::PrimitiveArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::expr::stats::Precision;
    use crate::expr::stats::Stat;
    use crate::expr::stats::StatsProviderExt;

    /// Counts nulls in a mixed array and caches the result as an exact statistic.
    #[test]
    fn null_count_with_nulls() -> VortexResult<()> {
        let array =
            PrimitiveArray::from_option_iter([Some(1i32), None, Some(3), None]).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        assert_eq!(null_count(&array, &mut ctx)?, 2);
        assert_eq!(
            array.statistics().get_as::<u64>(Stat::NullCount),
            Some(Precision::exact(2u64))
        );
        Ok(())
    }

    /// An array without nulls reports zero, both when computed and on a repeat call.
    #[test]
    fn null_count_without_nulls() -> VortexResult<()> {
        let array =
            PrimitiveArray::from_option_iter([Some(1i32), Some(2), Some(3)]).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        assert_eq!(null_count(&array, &mut ctx)?, 0);
        assert_eq!(
            array.statistics().get_as::<u64>(Stat::NullCount),
            Some(Precision::exact(0u64))
        );
        // The repeat call is answered from the exact statistic cached by the first call.
        assert_eq!(null_count(&array, &mut ctx)?, 0);
        Ok(())
    }

    /// An array whose every element is null counts all of them.
    #[test]
    fn null_count_all_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::from_option_iter([None::<i32>, None, None]).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        assert_eq!(null_count(&array, &mut ctx)?, 3);
        Ok(())
    }

    /// Counts from several batches are summed by a single accumulator.
    #[test]
    fn null_count_multi_batch() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
        let mut acc = Accumulator::try_new(NullCount, EmptyOptions, dtype)?;

        let batch1 = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array();
        acc.accumulate(&batch1, &mut ctx)?;

        let batch2 = PrimitiveArray::from_option_iter([None, Some(5i32), None]).into_array();
        acc.accumulate(&batch2, &mut ctx)?;

        let result = acc.finish()?;
        assert_eq!(result.as_primitive().typed_value::<u64>(), Some(3));
        Ok(())
    }

    /// An empty batch contributes nothing, leaving the count at zero.
    #[test]
    fn null_count_empty_batch() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
        let mut acc = Accumulator::try_new(NullCount, EmptyOptions, dtype)?;

        let empty = PrimitiveArray::from_option_iter(Vec::<Option<i32>>::new()).into_array();
        acc.accumulate(&empty, &mut ctx)?;

        let result = acc.finish()?;
        assert_eq!(result.as_primitive().typed_value::<u64>(), Some(0));
        Ok(())
    }
}