Skip to main content

vortex_array/aggregate_fn/fns/first/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_error::VortexResult;
5
6use crate::ArrayRef;
7use crate::Columnar;
8use crate::ExecutionCtx;
9use crate::aggregate_fn::Accumulator;
10use crate::aggregate_fn::AggregateFnId;
11use crate::aggregate_fn::AggregateFnVTable;
12use crate::aggregate_fn::DynAccumulator;
13use crate::aggregate_fn::EmptyOptions;
14use crate::dtype::DType;
15use crate::scalar::Scalar;
16
17/// Return the first non-null value of an array.
18///
19/// See [`First`] for details.
20pub fn first(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
21    let mut acc = Accumulator::try_new(First, EmptyOptions, array.dtype().clone())?;
22    acc.accumulate(array, ctx)?;
23    acc.finish()
24}
25
/// Aggregate function yielding the earliest non-null value across every batch it is fed.
///
/// Once a non-null value has been captured, the aggregate reports itself as saturated and
/// ignores all later input. The result dtype is the nullable form of the input dtype. The
/// result is null when no non-null value was ever observed.
#[derive(Debug, Clone)]
pub struct First;
29
30/// Partial accumulator state for the [`First`] aggregate.
31pub struct FirstPartial {
32    /// The nullable version of the input dtype, used for the result and for empty/all-null inputs.
33    return_dtype: DType,
34    /// The first non-null value seen so far, or `None` if no non-null value has been observed.
35    value: Option<Scalar>,
36}
37
38impl AggregateFnVTable for First {
39    type Options = EmptyOptions;
40    type Partial = FirstPartial;
41
42    fn id(&self) -> AggregateFnId {
43        AggregateFnId::new_ref("vortex.first")
44    }
45
46    fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
47        unimplemented!("First is not yet serializable");
48    }
49
50    fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
51        Some(input_dtype.as_nullable())
52    }
53
54    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
55        self.return_dtype(options, input_dtype)
56    }
57
58    fn empty_partial(
59        &self,
60        _options: &Self::Options,
61        input_dtype: &DType,
62    ) -> VortexResult<Self::Partial> {
63        Ok(FirstPartial {
64            return_dtype: input_dtype.as_nullable(),
65            value: None,
66        })
67    }
68
69    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
70        // Only the first non-null partial wins; later ones are ignored.
71        if partial.value.is_none() && !other.is_null() {
72            partial.value = Some(other);
73        }
74        Ok(())
75    }
76
77    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
78        Ok(match &partial.value {
79            Some(v) => v.clone(),
80            None => Scalar::null(partial.return_dtype.clone()),
81        })
82    }
83
84    fn reset(&self, partial: &mut Self::Partial) {
85        partial.value = None;
86    }
87
88    #[inline]
89    fn is_saturated(&self, partial: &Self::Partial) -> bool {
90        partial.value.is_some()
91    }
92
93    fn try_accumulate(
94        &self,
95        partial: &mut Self::Partial,
96        batch: &ArrayRef,
97        _ctx: &mut ExecutionCtx,
98    ) -> VortexResult<bool> {
99        if partial.value.is_some() {
100            return Ok(true);
101        }
102        if let Some(idx) = batch.validity_mask()?.first() {
103            let scalar = batch.scalar_at(idx)?;
104            partial.value = Some(scalar.into_nullable());
105        }
106        Ok(true)
107    }
108
109    fn accumulate(
110        &self,
111        _partial: &mut Self::Partial,
112        _batch: &Columnar,
113        _ctx: &mut ExecutionCtx,
114    ) -> VortexResult<()> {
115        unreachable!("First::try_accumulate handles all arrays")
116    }
117
118    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
119        Ok(partials)
120    }
121
122    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
123        self.to_scalar(partial)
124    }
125}
126
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::ArrayRef;
    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::first::First;
    use crate::aggregate_fn::fns::first::first;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::PType;
    use crate::scalar::Scalar;
    use crate::validity::Validity;

    /// Nullable `i32` dtype: the result dtype of `First` over any `i32` input.
    fn nullable_i32() -> DType {
        DType::Primitive(PType::I32, Nullable)
    }

    /// Non-nullable `i32` dtype, used as an input dtype.
    fn non_nullable_i32() -> DType {
        DType::Primitive(PType::I32, Nullability::NonNullable)
    }

    /// Run the one-shot [`first`] helper over `array` with a fresh execution context.
    fn first_of(array: &ArrayRef) -> VortexResult<Scalar> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        first(array, &mut ctx)
    }

    #[test]
    fn first_non_null() -> VortexResult<()> {
        let values = PrimitiveArray::new(buffer![10i32, 20, 30], Validity::NonNullable);
        assert_eq!(first_of(&values.into_array())?, Scalar::primitive(10i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_skips_leading_nulls() -> VortexResult<()> {
        let values = PrimitiveArray::from_option_iter([None, None, Some(7i32), Some(8)]);
        assert_eq!(first_of(&values.into_array())?, Scalar::primitive(7i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_all_null() -> VortexResult<()> {
        let values = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]);
        assert_eq!(first_of(&values.into_array())?, Scalar::null(nullable_i32()));
        Ok(())
    }

    #[test]
    fn first_empty() -> VortexResult<()> {
        let mut acc = Accumulator::try_new(First, EmptyOptions, non_nullable_i32())?;
        assert_eq!(acc.finish()?, Scalar::null(nullable_i32()));
        Ok(())
    }

    #[test]
    fn first_constant() -> VortexResult<()> {
        let constant = ConstantArray::new(42i32, 10).into_array();
        assert_eq!(first_of(&constant)?, Scalar::primitive(42i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_constant_null() -> VortexResult<()> {
        let constant = ConstantArray::new(Scalar::null(nullable_i32()), 10).into_array();
        assert_eq!(first_of(&constant)?, Scalar::null(nullable_i32()));
        Ok(())
    }

    #[test]
    fn first_varbin() -> VortexResult<()> {
        let strings = VarBinArray::from_iter(
            vec![None, Some("hello"), Some("world")],
            DType::Utf8(Nullable),
        );
        assert_eq!(first_of(&strings.into_array())?, Scalar::utf8("hello", Nullable));
        Ok(())
    }

    #[test]
    fn first_multi_batch_picks_earliest_non_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let mut acc = Accumulator::try_new(First, EmptyOptions, nullable_i32())?;

        // An all-null batch leaves the accumulator unsaturated.
        let all_null = PrimitiveArray::from_option_iter::<i32, _>([None, None]).into_array();
        acc.accumulate(&all_null, &mut ctx)?;
        assert!(!acc.is_saturated());

        // The earliest valid value (99) saturates the accumulator.
        let with_value =
            PrimitiveArray::from_option_iter([None, Some(99i32), Some(100)]).into_array();
        acc.accumulate(&with_value, &mut ctx)?;
        assert!(acc.is_saturated());

        // A batch arriving after saturation has no effect on the result.
        let late = PrimitiveArray::from_option_iter([Some(1i32)]).into_array();
        acc.accumulate(&late, &mut ctx)?;

        assert_eq!(acc.finish()?, Scalar::primitive(99i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_finish_resets_state() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let mut acc = Accumulator::try_new(First, EmptyOptions, non_nullable_i32())?;

        // Each `finish` must clear the captured value so the next round starts fresh.
        for (values, expected) in [(buffer![10i32, 20], 10i32), (buffer![3i32, 6, 9], 3)] {
            let batch = PrimitiveArray::new(values, Validity::NonNullable).into_array();
            acc.accumulate(&batch, &mut ctx)?;
            assert_eq!(acc.finish()?, Scalar::primitive(expected, Nullable));
        }
        Ok(())
    }

    #[test]
    fn first_state_merge() -> VortexResult<()> {
        let input = non_nullable_i32();
        let mut state = First.empty_partial(&EmptyOptions, &input)?;

        // A null partial carries no information and must not saturate the state.
        First.combine_partials(&mut state, Scalar::null(input.as_nullable()))?;
        assert!(!First.is_saturated(&state));

        First.combine_partials(&mut state, Scalar::primitive(5i32, Nullable))?;
        assert!(First.is_saturated(&state));

        // Once saturated, later valid partials are ignored.
        First.combine_partials(&mut state, Scalar::primitive(7i32, Nullable))?;
        assert_eq!(First.to_scalar(&state)?, Scalar::primitive(5i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_chunked() -> VortexResult<()> {
        let leading_nulls = PrimitiveArray::from_option_iter::<i32, _>([None, None]);
        let with_value = PrimitiveArray::from_option_iter([None, Some(42i32), Some(100)]);
        let dtype = leading_nulls.dtype().clone();
        let chunked = ChunkedArray::try_new(
            vec![leading_nulls.into_array(), with_value.into_array()],
            dtype,
        )?;
        assert_eq!(first_of(&chunked.into_array())?, Scalar::primitive(42i32, Nullable));
        Ok(())
    }
}