Skip to main content

vortex_array/aggregate_fn/fns/null_count/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_error::VortexExpect;
5use vortex_error::VortexResult;
6use vortex_error::vortex_err;
7use vortex_session::VortexSession;
8use vortex_session::registry::CachedId;
9
10use crate::ArrayRef;
11use crate::Columnar;
12use crate::ExecutionCtx;
13use crate::IntoArray;
14use crate::aggregate_fn::Accumulator;
15use crate::aggregate_fn::AggregateFnId;
16use crate::aggregate_fn::AggregateFnVTable;
17use crate::aggregate_fn::DynAccumulator;
18use crate::aggregate_fn::EmptyOptions;
19use crate::dtype::DType;
20use crate::dtype::Nullability::NonNullable;
21use crate::dtype::PType;
22use crate::expr::stats::Precision;
23use crate::expr::stats::Stat;
24use crate::expr::stats::StatsProvider;
25use crate::scalar::Scalar;
26use crate::scalar::ScalarValue;
27
28/// Return the number of null values in an array.
29pub fn null_count(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
30    if let Precision::Exact(null_count_scalar) = array.statistics().get(Stat::NullCount) {
31        return usize::try_from(&null_count_scalar)
32            .map_err(|e| vortex_err!("Failed to convert null count stat to usize: {e}"));
33    }
34
35    let mut acc = Accumulator::try_new(NullCount, EmptyOptions, array.dtype().clone())?;
36    acc.accumulate(array, ctx)?;
37    let result = acc.finish()?;
38
39    let count = result
40        .as_primitive()
41        .typed_value::<u64>()
42        .vortex_expect("null_count result should not be null");
43    let count_usize = usize::try_from(count).vortex_expect("Cannot be more nulls than usize::MAX");
44
45    array
46        .statistics()
47        .set(Stat::NullCount, Precision::Exact(ScalarValue::from(count)));
48
49    Ok(count_usize)
50}
51
/// Aggregate function that counts the null values of its input.
///
/// It accepts any input dtype and always produces a non-nullable `u64`.
#[derive(Debug, Clone)]
pub struct NullCount;
57
58impl AggregateFnVTable for NullCount {
59    type Options = EmptyOptions;
60    type Partial = u64;
61
62    fn id(&self) -> AggregateFnId {
63        static ID: CachedId = CachedId::new("vortex.null_count");
64        *ID
65    }
66
67    fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
68        Ok(Some(vec![]))
69    }
70
71    fn deserialize(
72        &self,
73        _metadata: &[u8],
74        _session: &VortexSession,
75    ) -> VortexResult<Self::Options> {
76        Ok(EmptyOptions)
77    }
78
79    fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
80        Some(DType::Primitive(PType::U64, NonNullable))
81    }
82
83    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
84        self.return_dtype(options, input_dtype)
85    }
86
87    fn empty_partial(
88        &self,
89        _options: &Self::Options,
90        _input_dtype: &DType,
91    ) -> VortexResult<Self::Partial> {
92        Ok(0)
93    }
94
95    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
96        let count = other
97            .as_primitive()
98            .typed_value::<u64>()
99            .vortex_expect("null_count partial should not be null");
100        *partial += count;
101        Ok(())
102    }
103
104    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
105        Ok(Scalar::primitive(*partial, NonNullable))
106    }
107
108    fn reset(&self, partial: &mut Self::Partial) {
109        *partial = 0;
110    }
111
112    #[inline]
113    fn is_saturated(&self, _partial: &Self::Partial) -> bool {
114        false
115    }
116
117    fn try_accumulate(
118        &self,
119        state: &mut Self::Partial,
120        batch: &ArrayRef,
121        ctx: &mut ExecutionCtx,
122    ) -> VortexResult<bool> {
123        *state += batch.invalid_count(ctx)? as u64;
124        Ok(true)
125    }
126
127    fn accumulate(
128        &self,
129        partial: &mut Self::Partial,
130        batch: &Columnar,
131        ctx: &mut ExecutionCtx,
132    ) -> VortexResult<()> {
133        *partial += match batch {
134            Columnar::Constant(c) => {
135                if c.scalar().is_null() {
136                    c.len() as u64
137                } else {
138                    0
139                }
140            }
141            Columnar::Canonical(c) => c.clone().into_array().invalid_count(ctx)? as u64,
142        };
143        Ok(())
144    }
145
146    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
147        Ok(partials)
148    }
149
150    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
151        self.to_scalar(partial)
152    }
153}
154
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::null_count::NullCount;
    use crate::aggregate_fn::fns::null_count::null_count;
    use crate::array_session;
    use crate::arrays::PrimitiveArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::expr::stats::Precision;
    use crate::expr::stats::Stat;
    use crate::expr::stats::StatsProviderExt;

    /// `null_count` reports the nulls of a single array and caches the exact statistic on it.
    #[test]
    fn null_count_with_nulls() -> VortexResult<()> {
        let values = [Some(1i32), None, Some(3), None];
        let array = PrimitiveArray::from_option_iter(values).into_array();
        let mut ctx = array_session().create_execution_ctx();

        let counted = null_count(&array, &mut ctx)?;
        assert_eq!(counted, 2);

        // The computed count must now be available as an exact statistic.
        let cached = array.statistics().get_as::<u64>(Stat::NullCount);
        assert_eq!(cached, Precision::exact(2u64));
        Ok(())
    }

    /// The accumulator sums null counts across several batches.
    #[test]
    fn null_count_multi_batch() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let input_dtype = DType::Primitive(PType::I32, Nullability::Nullable);
        let mut accumulator = Accumulator::try_new(NullCount, EmptyOptions, input_dtype)?;

        let batches = [
            PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array(),
            PrimitiveArray::from_option_iter([None, Some(5i32), None]).into_array(),
        ];
        for batch in &batches {
            accumulator.accumulate(batch, &mut ctx)?;
        }

        let total = accumulator.finish()?;
        assert_eq!(total.as_primitive().typed_value::<u64>(), Some(3));
        Ok(())
    }
}