Skip to main content

vortex_array/aggregate_fn/fns/first/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_error::VortexResult;
5use vortex_session::registry::CachedId;
6
7use crate::ArrayRef;
8use crate::Columnar;
9use crate::ExecutionCtx;
10use crate::aggregate_fn::Accumulator;
11use crate::aggregate_fn::AggregateFnId;
12use crate::aggregate_fn::AggregateFnVTable;
13use crate::aggregate_fn::DynAccumulator;
14use crate::aggregate_fn::EmptyOptions;
15use crate::dtype::DType;
16use crate::scalar::Scalar;
17
18/// Return the first non-null value of an array.
19///
20/// See [`First`] for details.
21pub fn first(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
22    let mut acc = Accumulator::try_new(First, EmptyOptions, array.dtype().clone())?;
23    acc.accumulate(array, ctx)?;
24    acc.finish()
25}
26
/// Aggregate function yielding the first non-null value observed, in batch order.
///
/// Once a non-null value has been captured, the aggregate reports itself saturated and later
/// batches are skipped. The result dtype is the nullable form of the input dtype. The result
/// is null when no non-null value was ever observed.
#[derive(Debug, Clone)]
pub struct First;
30
31/// Partial accumulator state for the [`First`] aggregate.
32pub struct FirstPartial {
33    /// The nullable version of the input dtype, used for the result and for empty/all-null inputs.
34    return_dtype: DType,
35    /// The first non-null value seen so far, or `None` if no non-null value has been observed.
36    value: Option<Scalar>,
37}
38
39impl AggregateFnVTable for First {
40    type Options = EmptyOptions;
41    type Partial = FirstPartial;
42
43    fn id(&self) -> AggregateFnId {
44        static ID: CachedId = CachedId::new("vortex.first");
45        *ID
46    }
47
48    fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
49        unimplemented!("First is not yet serializable");
50    }
51
52    fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
53        Some(input_dtype.as_nullable())
54    }
55
56    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
57        self.return_dtype(options, input_dtype)
58    }
59
60    fn empty_partial(
61        &self,
62        _options: &Self::Options,
63        input_dtype: &DType,
64    ) -> VortexResult<Self::Partial> {
65        Ok(FirstPartial {
66            return_dtype: input_dtype.as_nullable(),
67            value: None,
68        })
69    }
70
71    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
72        // Only the first non-null partial wins; later ones are ignored.
73        if partial.value.is_none() && !other.is_null() {
74            partial.value = Some(other);
75        }
76        Ok(())
77    }
78
79    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
80        Ok(match &partial.value {
81            Some(v) => v.clone(),
82            None => Scalar::null(partial.return_dtype.clone()),
83        })
84    }
85
86    fn reset(&self, partial: &mut Self::Partial) {
87        partial.value = None;
88    }
89
90    #[inline]
91    fn is_saturated(&self, partial: &Self::Partial) -> bool {
92        partial.value.is_some()
93    }
94
95    fn try_accumulate(
96        &self,
97        partial: &mut Self::Partial,
98        batch: &ArrayRef,
99        ctx: &mut ExecutionCtx,
100    ) -> VortexResult<bool> {
101        if partial.value.is_some() {
102            return Ok(true);
103        }
104        if let Some(idx) = batch.validity()?.execute_mask(batch.len(), ctx)?.first() {
105            let scalar = batch.execute_scalar(idx, ctx)?;
106            partial.value = Some(scalar.into_nullable());
107        }
108        Ok(true)
109    }
110
111    fn accumulate(
112        &self,
113        _partial: &mut Self::Partial,
114        _batch: &Columnar,
115        _ctx: &mut ExecutionCtx,
116    ) -> VortexResult<()> {
117        unreachable!("First::try_accumulate handles all arrays")
118    }
119
120    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
121        Ok(partials)
122    }
123
124    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
125        self.to_scalar(partial)
126    }
127}
128
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::first::First;
    use crate::aggregate_fn::fns::first::first;
    use crate::array_session;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::PType;
    use crate::scalar::Scalar;
    use crate::validity::Validity;

    #[test]
    fn first_non_null() -> VortexResult<()> {
        let array = PrimitiveArray::new(buffer![10i32, 20, 30], Validity::NonNullable).into_array();
        let mut ctx = array_session().create_execution_ctx();
        assert_eq!(first(&array, &mut ctx)?, Scalar::primitive(10i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_skips_leading_nulls() -> VortexResult<()> {
        let array =
            PrimitiveArray::from_option_iter([None, None, Some(7i32), Some(8)]).into_array();
        let mut ctx = array_session().create_execution_ctx();
        assert_eq!(first(&array, &mut ctx)?, Scalar::primitive(7i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_all_null() -> VortexResult<()> {
        let array = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]).into_array();
        let mut ctx = array_session().create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullable);
        assert_eq!(first(&array, &mut ctx)?, Scalar::null(dtype));
        Ok(())
    }

    #[test]
    fn first_empty() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(First, EmptyOptions, dtype)?;
        let result = acc.finish()?;
        assert_eq!(result, Scalar::null(DType::Primitive(PType::I32, Nullable)));
        Ok(())
    }

    #[test]
    fn first_constant() -> VortexResult<()> {
        let array = ConstantArray::new(42i32, 10).into_array();
        let mut ctx = array_session().create_execution_ctx();
        assert_eq!(first(&array, &mut ctx)?, Scalar::primitive(42i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_constant_null() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullable);
        let array = ConstantArray::new(Scalar::null(dtype.clone()), 10).into_array();
        let mut ctx = array_session().create_execution_ctx();
        assert_eq!(first(&array, &mut ctx)?, Scalar::null(dtype));
        Ok(())
    }

    #[test]
    fn first_varbin() -> VortexResult<()> {
        let array = VarBinArray::from_iter(
            vec![None, Some("hello"), Some("world")],
            DType::Utf8(Nullable),
        )
        .into_array();
        let mut ctx = array_session().create_execution_ctx();
        assert_eq!(first(&array, &mut ctx)?, Scalar::utf8("hello", Nullable));
        Ok(())
    }

    #[test]
    fn first_multi_batch_picks_earliest_non_null() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullable);
        let mut acc = Accumulator::try_new(First, EmptyOptions, dtype)?;

        // First batch is all null - should not saturate.
        let batch1 = PrimitiveArray::from_option_iter::<i32, _>([None, None]).into_array();
        acc.accumulate(&batch1, &mut ctx)?;
        assert!(!acc.is_saturated());

        // Second batch contains the first non-null value.
        let batch2 = PrimitiveArray::from_option_iter([None, Some(99i32), Some(100)]).into_array();
        acc.accumulate(&batch2, &mut ctx)?;
        assert!(acc.is_saturated());

        // Third batch must be ignored - First is already saturated.
        let batch3 = PrimitiveArray::from_option_iter([Some(1i32)]).into_array();
        acc.accumulate(&batch3, &mut ctx)?;

        let result = acc.finish()?;
        assert_eq!(result, Scalar::primitive(99i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_empty_batch_does_not_saturate() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullable);
        let mut acc = Accumulator::try_new(First, EmptyOptions, dtype)?;

        // An empty batch has no first element, so the accumulator must stay open.
        let empty = PrimitiveArray::from_option_iter(Vec::<Option<i32>>::new()).into_array();
        acc.accumulate(&empty, &mut ctx)?;
        assert!(!acc.is_saturated());

        // A later batch still supplies the result.
        let batch = PrimitiveArray::from_option_iter([Some(5i32), None]).into_array();
        acc.accumulate(&batch, &mut ctx)?;
        assert!(acc.is_saturated());
        assert_eq!(acc.finish()?, Scalar::primitive(5i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_finish_resets_state() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(First, EmptyOptions, dtype)?;

        let batch1 = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
        acc.accumulate(&batch1, &mut ctx)?;
        assert_eq!(acc.finish()?, Scalar::primitive(10i32, Nullable));

        let batch2 = PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
        acc.accumulate(&batch2, &mut ctx)?;
        assert_eq!(acc.finish()?, Scalar::primitive(3i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_state_merge() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut state = First.empty_partial(&EmptyOptions, &dtype)?;

        // A null partial means the sub-accumulator saw nothing valid - should be ignored.
        First.combine_partials(&mut state, Scalar::null(dtype.as_nullable()))?;
        assert!(!First.is_saturated(&state));

        First.combine_partials(&mut state, Scalar::primitive(5i32, Nullable))?;
        assert!(First.is_saturated(&state));

        // Subsequent valid partials are dropped.
        First.combine_partials(&mut state, Scalar::primitive(7i32, Nullable))?;
        assert_eq!(First.to_scalar(&state)?, Scalar::primitive(5i32, Nullable));
        Ok(())
    }

    #[test]
    fn first_chunked() -> VortexResult<()> {
        let chunk1 = PrimitiveArray::from_option_iter::<i32, _>([None, None]);
        let chunk2 = PrimitiveArray::from_option_iter([None, Some(42i32), Some(100)]);
        let dtype = chunk1.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![chunk1.into_array(), chunk2.into_array()], dtype)?;
        let mut ctx = array_session().create_execution_ctx();
        assert_eq!(
            first(&chunked.into_array(), &mut ctx)?,
            Scalar::primitive(42i32, Nullable)
        );
        Ok(())
    }
}