vortex_array/aggregate_fn/fns/all_non_null/
mod.rs1use vortex_error::VortexResult;
5use vortex_session::VortexSession;
6use vortex_session::registry::CachedId;
7
8use crate::ArrayRef;
9use crate::Columnar;
10use crate::ExecutionCtx;
11use crate::IntoArray;
12use crate::aggregate_fn::AggregateFnId;
13use crate::aggregate_fn::AggregateFnVTable;
14use crate::aggregate_fn::EmptyOptions;
15use crate::dtype::DType;
16use crate::dtype::Nullability;
17use crate::scalar::Scalar;
18
19#[derive(Clone, Debug)]
23pub struct AllNonNull;
24
25impl AggregateFnVTable for AllNonNull {
26 type Options = EmptyOptions;
27 type Partial = bool;
28
29 fn id(&self) -> AggregateFnId {
30 static ID: CachedId = CachedId::new("vortex.all_non_null");
31 *ID
32 }
33
34 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
35 Ok(Some(vec![]))
36 }
37
38 fn deserialize(
39 &self,
40 _metadata: &[u8],
41 _session: &VortexSession,
42 ) -> VortexResult<Self::Options> {
43 Ok(EmptyOptions)
44 }
45
46 fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
47 Some(DType::Bool(Nullability::NonNullable))
48 }
49
50 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
51 self.return_dtype(options, input_dtype)
52 }
53
54 fn empty_partial(
55 &self,
56 _options: &Self::Options,
57 _input_dtype: &DType,
58 ) -> VortexResult<Self::Partial> {
59 Ok(true)
60 }
61
62 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
63 *partial &= bool::try_from(&other)?;
64 Ok(())
65 }
66
67 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
68 Ok(Scalar::bool(*partial, Nullability::NonNullable))
69 }
70
71 fn reset(&self, partial: &mut Self::Partial) {
72 *partial = true;
73 }
74
75 fn is_saturated(&self, partial: &Self::Partial) -> bool {
76 !*partial
77 }
78
79 fn try_accumulate(
80 &self,
81 state: &mut Self::Partial,
82 batch: &ArrayRef,
83 ctx: &mut ExecutionCtx,
84 ) -> VortexResult<bool> {
85 *state &= batch.invalid_count(ctx)? == 0;
86 Ok(true)
87 }
88
89 fn accumulate(
90 &self,
91 partial: &mut Self::Partial,
92 batch: &Columnar,
93 ctx: &mut ExecutionCtx,
94 ) -> VortexResult<()> {
95 *partial &= match batch {
98 Columnar::Constant(c) => c.is_empty() || !c.scalar().is_null(),
99 Columnar::Canonical(c) => c.clone().into_array().invalid_count(ctx)? == 0,
100 };
101 Ok(())
102 }
103
104 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
105 Ok(partials)
106 }
107
108 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
109 self.to_scalar(partial)
110 }
111}
112
113#[cfg(test)]
114mod tests {
115 use vortex_error::VortexResult;
116
117 use crate::IntoArray;
118 use crate::VortexSessionExecute;
119 use crate::aggregate_fn::Accumulator;
120 use crate::aggregate_fn::DynAccumulator;
121 use crate::aggregate_fn::EmptyOptions;
122 use crate::aggregate_fn::fns::all_non_null::AllNonNull;
123 use crate::array_session;
124 use crate::arrays::PrimitiveArray;
125 use crate::dtype::DType;
126 use crate::dtype::Nullability;
127 use crate::dtype::PType;
128
129 #[test]
130 fn all_non_null_aggregate_fn() -> VortexResult<()> {
131 let mut ctx = array_session().create_execution_ctx();
132 let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
133 let mut acc = Accumulator::try_new(AllNonNull, EmptyOptions, dtype)?;
134
135 let batch = PrimitiveArray::from_option_iter([Some(1i32), Some(2), Some(3)]).into_array();
136 acc.accumulate(&batch, &mut ctx)?;
137
138 assert!(bool::try_from(&acc.finish()?)?);
139 Ok(())
140 }
141
142 #[test]
143 fn all_non_null_false_with_nulls() -> VortexResult<()> {
144 let mut ctx = array_session().create_execution_ctx();
145 let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
146 let mut acc = Accumulator::try_new(AllNonNull, EmptyOptions, dtype)?;
147
148 let batch = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array();
149 acc.accumulate(&batch, &mut ctx)?;
150
151 assert!(!bool::try_from(&acc.finish()?)?);
152 Ok(())
153 }
154
155 #[test]
156 fn all_non_null_true_for_empty_input() -> VortexResult<()> {
157 let mut ctx = array_session().create_execution_ctx();
158 let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
159 let mut acc = Accumulator::try_new(AllNonNull, EmptyOptions, dtype)?;
160
161 let batch = PrimitiveArray::empty::<i32>(Nullability::Nullable).into_array();
162 acc.accumulate(&batch, &mut ctx)?;
163
164 assert!(bool::try_from(&acc.finish()?)?);
165 Ok(())
166 }
167}