vortex_array/aggregate_fn/fns/all_non_null/
mod.rs1use vortex_error::VortexResult;
5
6use crate::ArrayRef;
7use crate::Columnar;
8use crate::ExecutionCtx;
9use crate::IntoArray;
10use crate::aggregate_fn::AggregateFnId;
11use crate::aggregate_fn::AggregateFnVTable;
12use crate::aggregate_fn::EmptyOptions;
13use crate::dtype::DType;
14use crate::dtype::Nullability;
15use crate::scalar::Scalar;
16
/// Aggregate function marker whose result is a boolean telling whether every
/// value it was fed was non-null.
///
/// The type carries no state of its own; the running answer lives in the
/// `bool` partial managed through its `AggregateFnVTable` implementation.
#[derive(Debug, Clone)]
pub struct AllNonNull;
22
23impl AggregateFnVTable for AllNonNull {
24 type Options = EmptyOptions;
25 type Partial = bool;
26
27 fn id(&self) -> AggregateFnId {
28 AggregateFnId::new("vortex.all_non_null")
29 }
30
31 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
32 Ok(None)
33 }
34
35 fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
36 Some(DType::Bool(Nullability::NonNullable))
37 }
38
39 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
40 self.return_dtype(options, input_dtype)
41 }
42
43 fn empty_partial(
44 &self,
45 _options: &Self::Options,
46 _input_dtype: &DType,
47 ) -> VortexResult<Self::Partial> {
48 Ok(true)
49 }
50
51 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
52 *partial &= bool::try_from(&other)?;
53 Ok(())
54 }
55
56 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
57 Ok(Scalar::bool(*partial, Nullability::NonNullable))
58 }
59
60 fn reset(&self, partial: &mut Self::Partial) {
61 *partial = true;
62 }
63
64 fn is_saturated(&self, partial: &Self::Partial) -> bool {
65 !*partial
66 }
67
68 fn try_accumulate(
69 &self,
70 state: &mut Self::Partial,
71 batch: &ArrayRef,
72 ctx: &mut ExecutionCtx,
73 ) -> VortexResult<bool> {
74 *state &= batch.invalid_count(ctx)? == 0;
75 Ok(true)
76 }
77
78 fn accumulate(
79 &self,
80 partial: &mut Self::Partial,
81 batch: &Columnar,
82 ctx: &mut ExecutionCtx,
83 ) -> VortexResult<()> {
84 *partial &= match batch {
87 Columnar::Constant(c) => c.is_empty() || !c.scalar().is_null(),
88 Columnar::Canonical(c) => c.clone().into_array().invalid_count(ctx)? == 0,
89 };
90 Ok(())
91 }
92
93 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
94 Ok(partials)
95 }
96
97 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
98 self.to_scalar(partial)
99 }
100}
101
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::all_non_null::AllNonNull;
    use crate::arrays::PrimitiveArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;

    /// Feeds a single batch through a fresh `AllNonNull` accumulator over
    /// nullable `i32` input and returns the finished boolean.
    fn all_non_null_of(values: impl IntoArray) -> VortexResult<bool> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input_dtype = DType::Primitive(PType::I32, Nullability::Nullable);
        let mut accumulator = Accumulator::try_new(AllNonNull, EmptyOptions, input_dtype)?;

        let batch = values.into_array();
        accumulator.accumulate(&batch, &mut ctx)?;

        let outcome = bool::try_from(&accumulator.finish()?)?;
        Ok(outcome)
    }

    /// A batch containing only valid values aggregates to `true`.
    #[test]
    fn all_non_null_aggregate_fn() -> VortexResult<()> {
        let values = PrimitiveArray::from_option_iter([Some(1i32), Some(2), Some(3)]);
        assert!(all_non_null_of(values)?);
        Ok(())
    }

    /// A single null in the batch makes the aggregate `false`.
    #[test]
    fn all_non_null_false_with_nulls() -> VortexResult<()> {
        let values = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]);
        assert!(!all_non_null_of(values)?);
        Ok(())
    }

    /// An empty batch leaves the aggregate at `true`.
    #[test]
    fn all_non_null_true_for_empty_input() -> VortexResult<()> {
        let values = PrimitiveArray::empty::<i32>(Nullability::Nullable);
        assert!(all_non_null_of(values)?);
        Ok(())
    }
}