vortex_array/aggregate_fn/fns/first/
mod.rs1use vortex_error::VortexResult;
5use vortex_session::registry::CachedId;
6
7use crate::ArrayRef;
8use crate::Columnar;
9use crate::ExecutionCtx;
10use crate::aggregate_fn::Accumulator;
11use crate::aggregate_fn::AggregateFnId;
12use crate::aggregate_fn::AggregateFnVTable;
13use crate::aggregate_fn::DynAccumulator;
14use crate::aggregate_fn::EmptyOptions;
15use crate::dtype::DType;
16use crate::scalar::Scalar;
17
18pub fn first(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
22 let mut acc = Accumulator::try_new(First, EmptyOptions, array.dtype().clone())?;
23 acc.accumulate(array, ctx)?;
24 acc.finish()
25}
26
/// Aggregate function that yields the first non-null value of its input.
///
/// It is identified as `vortex.first` and takes no options ([`EmptyOptions`]).
#[derive(Debug, Clone)]
pub struct First;
30
31pub struct FirstPartial {
33 return_dtype: DType,
35 value: Option<Scalar>,
37}
38
39impl AggregateFnVTable for First {
40 type Options = EmptyOptions;
41 type Partial = FirstPartial;
42
43 fn id(&self) -> AggregateFnId {
44 static ID: CachedId = CachedId::new("vortex.first");
45 *ID
46 }
47
48 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
49 unimplemented!("First is not yet serializable");
50 }
51
52 fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
53 Some(input_dtype.as_nullable())
54 }
55
56 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
57 self.return_dtype(options, input_dtype)
58 }
59
60 fn empty_partial(
61 &self,
62 _options: &Self::Options,
63 input_dtype: &DType,
64 ) -> VortexResult<Self::Partial> {
65 Ok(FirstPartial {
66 return_dtype: input_dtype.as_nullable(),
67 value: None,
68 })
69 }
70
71 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
72 if partial.value.is_none() && !other.is_null() {
74 partial.value = Some(other);
75 }
76 Ok(())
77 }
78
79 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
80 Ok(match &partial.value {
81 Some(v) => v.clone(),
82 None => Scalar::null(partial.return_dtype.clone()),
83 })
84 }
85
86 fn reset(&self, partial: &mut Self::Partial) {
87 partial.value = None;
88 }
89
90 #[inline]
91 fn is_saturated(&self, partial: &Self::Partial) -> bool {
92 partial.value.is_some()
93 }
94
95 fn try_accumulate(
96 &self,
97 partial: &mut Self::Partial,
98 batch: &ArrayRef,
99 ctx: &mut ExecutionCtx,
100 ) -> VortexResult<bool> {
101 if partial.value.is_some() {
102 return Ok(true);
103 }
104 if let Some(idx) = batch.validity()?.execute_mask(batch.len(), ctx)?.first() {
105 let scalar = batch.execute_scalar(idx, ctx)?;
106 partial.value = Some(scalar.into_nullable());
107 }
108 Ok(true)
109 }
110
111 fn accumulate(
112 &self,
113 _partial: &mut Self::Partial,
114 _batch: &Columnar,
115 _ctx: &mut ExecutionCtx,
116 ) -> VortexResult<()> {
117 unreachable!("First::try_accumulate handles all arrays")
118 }
119
120 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
121 Ok(partials)
122 }
123
124 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
125 self.to_scalar(partial)
126 }
127}
128
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::first::First;
    use crate::aggregate_fn::fns::first::first;
    use crate::array_session;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::PType;
    use crate::scalar::Scalar;
    use crate::validity::Validity;

    /// Nullable `i32` dtype, the result dtype expected by the integer tests.
    fn nullable_i32() -> DType {
        DType::Primitive(PType::I32, Nullable)
    }

    /// Non-nullable `i32` dtype, used as accumulator input dtype.
    fn non_nullable_i32() -> DType {
        DType::Primitive(PType::I32, Nullability::NonNullable)
    }

    /// Nullable `i32` scalar holding `value`.
    fn nullable_i32_scalar(value: i32) -> Scalar {
        Scalar::primitive(value, Nullable)
    }

    /// A non-nullable array yields its first element, reported as nullable.
    #[test]
    fn first_non_null() -> VortexResult<()> {
        let values = PrimitiveArray::new(buffer![10i32, 20, 30], Validity::NonNullable);
        let array = values.into_array();
        let mut ctx = array_session().create_execution_ctx();

        let result = first(&array, &mut ctx)?;

        assert_eq!(result, nullable_i32_scalar(10));
        Ok(())
    }

    /// Leading nulls are skipped in favour of the first valid element.
    #[test]
    fn first_skips_leading_nulls() -> VortexResult<()> {
        let values = PrimitiveArray::from_option_iter([None, None, Some(7i32), Some(8)]);
        let array = values.into_array();
        let mut ctx = array_session().create_execution_ctx();

        let result = first(&array, &mut ctx)?;

        assert_eq!(result, nullable_i32_scalar(7));
        Ok(())
    }

    /// An all-null array yields a typed null.
    #[test]
    fn first_all_null() -> VortexResult<()> {
        let values = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]);
        let array = values.into_array();
        let mut ctx = array_session().create_execution_ctx();

        let result = first(&array, &mut ctx)?;

        assert_eq!(result, Scalar::null(nullable_i32()));
        Ok(())
    }

    /// Finishing an accumulator that saw no input yields a nullable typed null.
    #[test]
    fn first_empty() -> VortexResult<()> {
        let mut accumulator = Accumulator::try_new(First, EmptyOptions, non_nullable_i32())?;

        let result = accumulator.finish()?;

        assert_eq!(result, Scalar::null(nullable_i32()));
        Ok(())
    }

    /// A constant array yields its constant.
    #[test]
    fn first_constant() -> VortexResult<()> {
        let array = ConstantArray::new(42i32, 10).into_array();
        let mut ctx = array_session().create_execution_ctx();

        let result = first(&array, &mut ctx)?;

        assert_eq!(result, nullable_i32_scalar(42));
        Ok(())
    }

    /// A constant-null array yields a typed null.
    #[test]
    fn first_constant_null() -> VortexResult<()> {
        let null_dtype = nullable_i32();
        let null_constant = Scalar::null(null_dtype.clone());
        let array = ConstantArray::new(null_constant, 10).into_array();
        let mut ctx = array_session().create_execution_ctx();

        let result = first(&array, &mut ctx)?;

        assert_eq!(result, Scalar::null(null_dtype));
        Ok(())
    }

    /// Works for UTF-8 data as well, skipping the leading null.
    #[test]
    fn first_varbin() -> VortexResult<()> {
        let strings = vec![None, Some("hello"), Some("world")];
        let array = VarBinArray::from_iter(strings, DType::Utf8(Nullable)).into_array();
        let mut ctx = array_session().create_execution_ctx();

        let result = first(&array, &mut ctx)?;

        assert_eq!(result, Scalar::utf8("hello", Nullable));
        Ok(())
    }

    /// Across several batches the earliest non-null value wins and later batches are ignored.
    #[test]
    fn first_multi_batch_picks_earliest_non_null() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let mut accumulator = Accumulator::try_new(First, EmptyOptions, nullable_i32())?;

        // Only nulls: nothing is captured yet.
        let all_null = PrimitiveArray::from_option_iter::<i32, _>([None, None]).into_array();
        accumulator.accumulate(&all_null, &mut ctx)?;
        assert!(!accumulator.is_saturated());

        // First valid value appears here and saturates the accumulator.
        let with_value =
            PrimitiveArray::from_option_iter([None, Some(99i32), Some(100)]).into_array();
        accumulator.accumulate(&with_value, &mut ctx)?;
        assert!(accumulator.is_saturated());

        // A later batch must not override the captured value.
        let later = PrimitiveArray::from_option_iter([Some(1i32)]).into_array();
        accumulator.accumulate(&later, &mut ctx)?;

        assert_eq!(accumulator.finish()?, nullable_i32_scalar(99));
        Ok(())
    }

    /// `finish` resets the accumulator so it can be reused for a new aggregation.
    #[test]
    fn first_finish_resets_state() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let mut accumulator = Accumulator::try_new(First, EmptyOptions, non_nullable_i32())?;

        let rounds = [(buffer![10i32, 20], 10i32), (buffer![3i32, 6, 9], 3i32)];
        for (values, expected) in rounds {
            let batch = PrimitiveArray::new(values, Validity::NonNullable).into_array();
            accumulator.accumulate(&batch, &mut ctx)?;
            assert_eq!(accumulator.finish()?, nullable_i32_scalar(expected));
        }
        Ok(())
    }

    /// Merging partials keeps the first non-null partial and ignores nulls and later values.
    #[test]
    fn first_state_merge() -> VortexResult<()> {
        let input_dtype = non_nullable_i32();
        let mut partial = First.empty_partial(&EmptyOptions, &input_dtype)?;

        // A null partial leaves the state empty.
        let null_partial = Scalar::null(input_dtype.as_nullable());
        First.combine_partials(&mut partial, null_partial)?;
        assert!(!First.is_saturated(&partial));

        // The first non-null partial is captured.
        First.combine_partials(&mut partial, nullable_i32_scalar(5))?;
        assert!(First.is_saturated(&partial));

        // A subsequent partial does not replace it.
        First.combine_partials(&mut partial, nullable_i32_scalar(7))?;
        assert_eq!(First.to_scalar(&partial)?, nullable_i32_scalar(5));
        Ok(())
    }

    /// Chunked input: an all-null first chunk is skipped and the value comes from the second.
    #[test]
    fn first_chunked() -> VortexResult<()> {
        let null_chunk = PrimitiveArray::from_option_iter::<i32, _>([None, None]);
        let value_chunk = PrimitiveArray::from_option_iter([None, Some(42i32), Some(100)]);
        let chunk_dtype = null_chunk.dtype().clone();
        let chunks = vec![null_chunk.into_array(), value_chunk.into_array()];
        let array = ChunkedArray::try_new(chunks, chunk_dtype)?.into_array();
        let mut ctx = array_session().create_execution_ctx();

        let result = first(&array, &mut ctx)?;

        assert_eq!(result, nullable_i32_scalar(42));
        Ok(())
    }
}