vortex_array/aggregate_fn/fns/first/
mod.rs1use vortex_error::VortexResult;
5
6use crate::ArrayRef;
7use crate::Columnar;
8use crate::ExecutionCtx;
9use crate::aggregate_fn::Accumulator;
10use crate::aggregate_fn::AggregateFnId;
11use crate::aggregate_fn::AggregateFnVTable;
12use crate::aggregate_fn::DynAccumulator;
13use crate::aggregate_fn::EmptyOptions;
14use crate::dtype::DType;
15use crate::scalar::Scalar;
16
17pub fn first(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
21 let mut acc = Accumulator::try_new(First, EmptyOptions, array.dtype().clone())?;
22 acc.accumulate(array, ctx)?;
23 acc.finish()
24}
25
/// Aggregate function that selects the first non-null value of its input.
///
/// The result dtype is always the input dtype made nullable, so empty or all-null
/// inputs produce a null scalar.
#[derive(Debug, Clone)]
pub struct First;
29
30pub struct FirstPartial {
32 return_dtype: DType,
34 value: Option<Scalar>,
36}
37
38impl AggregateFnVTable for First {
39 type Options = EmptyOptions;
40 type Partial = FirstPartial;
41
42 fn id(&self) -> AggregateFnId {
43 AggregateFnId::new_ref("vortex.first")
44 }
45
46 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
47 unimplemented!("First is not yet serializable");
48 }
49
50 fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
51 Some(input_dtype.as_nullable())
52 }
53
54 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
55 self.return_dtype(options, input_dtype)
56 }
57
58 fn empty_partial(
59 &self,
60 _options: &Self::Options,
61 input_dtype: &DType,
62 ) -> VortexResult<Self::Partial> {
63 Ok(FirstPartial {
64 return_dtype: input_dtype.as_nullable(),
65 value: None,
66 })
67 }
68
69 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
70 if partial.value.is_none() && !other.is_null() {
72 partial.value = Some(other);
73 }
74 Ok(())
75 }
76
77 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
78 Ok(match &partial.value {
79 Some(v) => v.clone(),
80 None => Scalar::null(partial.return_dtype.clone()),
81 })
82 }
83
84 fn reset(&self, partial: &mut Self::Partial) {
85 partial.value = None;
86 }
87
88 #[inline]
89 fn is_saturated(&self, partial: &Self::Partial) -> bool {
90 partial.value.is_some()
91 }
92
93 fn try_accumulate(
94 &self,
95 partial: &mut Self::Partial,
96 batch: &ArrayRef,
97 _ctx: &mut ExecutionCtx,
98 ) -> VortexResult<bool> {
99 if partial.value.is_some() {
100 return Ok(true);
101 }
102 if let Some(idx) = batch.validity_mask()?.first() {
103 let scalar = batch.scalar_at(idx)?;
104 partial.value = Some(scalar.into_nullable());
105 }
106 Ok(true)
107 }
108
109 fn accumulate(
110 &self,
111 _partial: &mut Self::Partial,
112 _batch: &Columnar,
113 _ctx: &mut ExecutionCtx,
114 ) -> VortexResult<()> {
115 unreachable!("First::try_accumulate handles all arrays")
116 }
117
118 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
119 Ok(partials)
120 }
121
122 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
123 self.to_scalar(partial)
124 }
125}
126
127#[cfg(test)]
128mod tests {
129 use vortex_buffer::buffer;
130 use vortex_error::VortexResult;
131
132 use crate::IntoArray;
133 use crate::LEGACY_SESSION;
134 use crate::VortexSessionExecute;
135 use crate::aggregate_fn::Accumulator;
136 use crate::aggregate_fn::AggregateFnVTable;
137 use crate::aggregate_fn::DynAccumulator;
138 use crate::aggregate_fn::EmptyOptions;
139 use crate::aggregate_fn::fns::first::First;
140 use crate::aggregate_fn::fns::first::first;
141 use crate::arrays::ChunkedArray;
142 use crate::arrays::ConstantArray;
143 use crate::arrays::PrimitiveArray;
144 use crate::arrays::VarBinArray;
145 use crate::dtype::DType;
146 use crate::dtype::Nullability;
147 use crate::dtype::Nullability::Nullable;
148 use crate::dtype::PType;
149 use crate::scalar::Scalar;
150 use crate::validity::Validity;
151
152 #[test]
153 fn first_non_null() -> VortexResult<()> {
154 let array = PrimitiveArray::new(buffer![10i32, 20, 30], Validity::NonNullable).into_array();
155 let mut ctx = LEGACY_SESSION.create_execution_ctx();
156 assert_eq!(first(&array, &mut ctx)?, Scalar::primitive(10i32, Nullable));
157 Ok(())
158 }
159
160 #[test]
161 fn first_skips_leading_nulls() -> VortexResult<()> {
162 let array =
163 PrimitiveArray::from_option_iter([None, None, Some(7i32), Some(8)]).into_array();
164 let mut ctx = LEGACY_SESSION.create_execution_ctx();
165 assert_eq!(first(&array, &mut ctx)?, Scalar::primitive(7i32, Nullable));
166 Ok(())
167 }
168
169 #[test]
170 fn first_all_null() -> VortexResult<()> {
171 let array = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]).into_array();
172 let mut ctx = LEGACY_SESSION.create_execution_ctx();
173 let dtype = DType::Primitive(PType::I32, Nullable);
174 assert_eq!(first(&array, &mut ctx)?, Scalar::null(dtype));
175 Ok(())
176 }
177
178 #[test]
179 fn first_empty() -> VortexResult<()> {
180 let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
181 let mut acc = Accumulator::try_new(First, EmptyOptions, dtype)?;
182 let result = acc.finish()?;
183 assert_eq!(result, Scalar::null(DType::Primitive(PType::I32, Nullable)));
184 Ok(())
185 }
186
187 #[test]
188 fn first_constant() -> VortexResult<()> {
189 let array = ConstantArray::new(42i32, 10).into_array();
190 let mut ctx = LEGACY_SESSION.create_execution_ctx();
191 assert_eq!(first(&array, &mut ctx)?, Scalar::primitive(42i32, Nullable));
192 Ok(())
193 }
194
195 #[test]
196 fn first_constant_null() -> VortexResult<()> {
197 let dtype = DType::Primitive(PType::I32, Nullable);
198 let array = ConstantArray::new(Scalar::null(dtype.clone()), 10).into_array();
199 let mut ctx = LEGACY_SESSION.create_execution_ctx();
200 assert_eq!(first(&array, &mut ctx)?, Scalar::null(dtype));
201 Ok(())
202 }
203
204 #[test]
205 fn first_varbin() -> VortexResult<()> {
206 let array = VarBinArray::from_iter(
207 vec![None, Some("hello"), Some("world")],
208 DType::Utf8(Nullable),
209 )
210 .into_array();
211 let mut ctx = LEGACY_SESSION.create_execution_ctx();
212 assert_eq!(first(&array, &mut ctx)?, Scalar::utf8("hello", Nullable));
213 Ok(())
214 }
215
216 #[test]
217 fn first_multi_batch_picks_earliest_non_null() -> VortexResult<()> {
218 let mut ctx = LEGACY_SESSION.create_execution_ctx();
219 let dtype = DType::Primitive(PType::I32, Nullable);
220 let mut acc = Accumulator::try_new(First, EmptyOptions, dtype)?;
221
222 let batch1 = PrimitiveArray::from_option_iter::<i32, _>([None, None]).into_array();
224 acc.accumulate(&batch1, &mut ctx)?;
225 assert!(!acc.is_saturated());
226
227 let batch2 = PrimitiveArray::from_option_iter([None, Some(99i32), Some(100)]).into_array();
229 acc.accumulate(&batch2, &mut ctx)?;
230 assert!(acc.is_saturated());
231
232 let batch3 = PrimitiveArray::from_option_iter([Some(1i32)]).into_array();
234 acc.accumulate(&batch3, &mut ctx)?;
235
236 let result = acc.finish()?;
237 assert_eq!(result, Scalar::primitive(99i32, Nullable));
238 Ok(())
239 }
240
241 #[test]
242 fn first_finish_resets_state() -> VortexResult<()> {
243 let mut ctx = LEGACY_SESSION.create_execution_ctx();
244 let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
245 let mut acc = Accumulator::try_new(First, EmptyOptions, dtype)?;
246
247 let batch1 = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
248 acc.accumulate(&batch1, &mut ctx)?;
249 assert_eq!(acc.finish()?, Scalar::primitive(10i32, Nullable));
250
251 let batch2 = PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
252 acc.accumulate(&batch2, &mut ctx)?;
253 assert_eq!(acc.finish()?, Scalar::primitive(3i32, Nullable));
254 Ok(())
255 }
256
257 #[test]
258 fn first_state_merge() -> VortexResult<()> {
259 let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
260 let mut state = First.empty_partial(&EmptyOptions, &dtype)?;
261
262 First.combine_partials(&mut state, Scalar::null(dtype.as_nullable()))?;
264 assert!(!First.is_saturated(&state));
265
266 First.combine_partials(&mut state, Scalar::primitive(5i32, Nullable))?;
267 assert!(First.is_saturated(&state));
268
269 First.combine_partials(&mut state, Scalar::primitive(7i32, Nullable))?;
271 assert_eq!(First.to_scalar(&state)?, Scalar::primitive(5i32, Nullable));
272 Ok(())
273 }
274
275 #[test]
276 fn first_chunked() -> VortexResult<()> {
277 let chunk1 = PrimitiveArray::from_option_iter::<i32, _>([None, None]);
278 let chunk2 = PrimitiveArray::from_option_iter([None, Some(42i32), Some(100)]);
279 let dtype = chunk1.dtype().clone();
280 let chunked = ChunkedArray::try_new(vec![chunk1.into_array(), chunk2.into_array()], dtype)?;
281 let mut ctx = LEGACY_SESSION.create_execution_ctx();
282 assert_eq!(
283 first(&chunked.into_array(), &mut ctx)?,
284 Scalar::primitive(42i32, Nullable)
285 );
286 Ok(())
287 }
288}