Skip to main content

vortex_array/aggregate_fn/fns/last/mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_error::VortexResult;
5
6use crate::ArrayRef;
7use crate::Columnar;
8use crate::ExecutionCtx;
9use crate::aggregate_fn::Accumulator;
10use crate::aggregate_fn::AggregateFnId;
11use crate::aggregate_fn::AggregateFnVTable;
12use crate::aggregate_fn::DynAccumulator;
13use crate::aggregate_fn::EmptyOptions;
14use crate::dtype::DType;
15use crate::scalar::Scalar;
16
17/// Return the last non-null value of an array.
18///
19/// See [`Last`] for details.
20pub fn last(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
21    let mut acc = Accumulator::try_new(Last, EmptyOptions, array.dtype().clone())?;
22    acc.accumulate(array, ctx)?;
23    acc.finish()
24}
25
/// Aggregate function yielding the most recent non-null value observed across every batch.
///
/// The type carries no data of its own; it only selects the aggregate's behaviour.
#[derive(Debug, Clone)]
pub struct Last;
29
30/// Partial accumulator state for the [`Last`] aggregate.
31pub struct LastPartial {
32    /// The nullable version of the input dtype, used for the result and for empty/all-null inputs.
33    return_dtype: DType,
34    /// The last non-null value seen so far, or `None` if no non-null value has been observed.
35    value: Option<Scalar>,
36}
37
38impl AggregateFnVTable for Last {
39    type Options = EmptyOptions;
40    type Partial = LastPartial;
41
42    fn id(&self) -> AggregateFnId {
43        AggregateFnId::new_ref("vortex.last")
44    }
45
46    fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
47        unimplemented!("Last is not yet serializable");
48    }
49
50    fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
51        Some(input_dtype.as_nullable())
52    }
53
54    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
55        self.return_dtype(options, input_dtype)
56    }
57
58    fn empty_partial(
59        &self,
60        _options: &Self::Options,
61        input_dtype: &DType,
62    ) -> VortexResult<Self::Partial> {
63        Ok(LastPartial {
64            return_dtype: input_dtype.as_nullable(),
65            value: None,
66        })
67    }
68
69    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
70        // Each new non-null partial replaces the previous one; nulls are ignored.
71        if !other.is_null() {
72            partial.value = Some(other);
73        }
74        Ok(())
75    }
76
77    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
78        Ok(match &partial.value {
79            Some(v) => v.clone(),
80            None => Scalar::null(partial.return_dtype.clone()),
81        })
82    }
83
84    fn reset(&self, partial: &mut Self::Partial) {
85        partial.value = None;
86    }
87
88    #[inline]
89    fn is_saturated(&self, _partial: &Self::Partial) -> bool {
90        // Last can never short-circuit: a later batch can always supersede the current value.
91        false
92    }
93
94    fn try_accumulate(
95        &self,
96        partial: &mut Self::Partial,
97        batch: &ArrayRef,
98        _ctx: &mut ExecutionCtx,
99    ) -> VortexResult<bool> {
100        if let Some(idx) = batch.validity_mask()?.last() {
101            let scalar = batch.scalar_at(idx)?;
102            partial.value = Some(scalar.into_nullable());
103        }
104        Ok(true)
105    }
106
107    fn accumulate(
108        &self,
109        _partial: &mut Self::Partial,
110        _batch: &Columnar,
111        _ctx: &mut ExecutionCtx,
112    ) -> VortexResult<()> {
113        unreachable!("Last::try_accumulate handles all arrays")
114    }
115
116    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
117        Ok(partials)
118    }
119
120    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
121        self.to_scalar(partial)
122    }
123}
124
/// Unit tests for the [`Last`] aggregate and the [`last`] convenience function.
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::last::Last;
    use crate::aggregate_fn::fns::last::last;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::PType;
    use crate::scalar::Scalar;
    use crate::validity::Validity;

    /// A non-nullable input yields its final element, reported with a nullable dtype.
    #[test]
    fn last_non_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = PrimitiveArray::new(buffer![10i32, 20, 30], Validity::NonNullable).into_array();
        let expected = Scalar::primitive(30i32, Nullable);
        assert_eq!(last(&input, &mut ctx)?, expected);
        Ok(())
    }

    /// Nulls at the tail of the array are skipped.
    #[test]
    fn last_skips_trailing_nulls() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input =
            PrimitiveArray::from_option_iter([Some(7i32), Some(8), None, None]).into_array();
        let expected = Scalar::primitive(8i32, Nullable);
        assert_eq!(last(&input, &mut ctx)?, expected);
        Ok(())
    }

    /// An all-null input produces a null scalar of the nullable dtype.
    #[test]
    fn last_all_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]).into_array();
        let expected = Scalar::null(DType::Primitive(PType::I32, Nullable));
        assert_eq!(last(&input, &mut ctx)?, expected);
        Ok(())
    }

    /// Finishing without accumulating anything produces a null scalar of the nullable dtype.
    #[test]
    fn last_empty() -> VortexResult<()> {
        let input_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut accumulator = Accumulator::try_new(Last, EmptyOptions, input_dtype)?;
        let expected = Scalar::null(DType::Primitive(PType::I32, Nullable));
        assert_eq!(accumulator.finish()?, expected);
        Ok(())
    }

    /// A constant array yields its constant.
    #[test]
    fn last_constant() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = ConstantArray::new(42i32, 10).into_array();
        let expected = Scalar::primitive(42i32, Nullable);
        assert_eq!(last(&input, &mut ctx)?, expected);
        Ok(())
    }

    /// A constant-null array yields null.
    #[test]
    fn last_constant_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let nullable_i32 = DType::Primitive(PType::I32, Nullable);
        let input = ConstantArray::new(Scalar::null(nullable_i32.clone()), 10).into_array();
        assert_eq!(last(&input, &mut ctx)?, Scalar::null(nullable_i32));
        Ok(())
    }

    /// Works for variable-length string data as well.
    #[test]
    fn last_varbin() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let strings = vec![Some("hello"), Some("world"), None];
        let input = VarBinArray::from_iter(strings, DType::Utf8(Nullable)).into_array();
        let expected = Scalar::utf8("world", Nullable);
        assert_eq!(last(&input, &mut ctx)?, expected);
        Ok(())
    }

    /// Across several batches the latest non-null value wins.
    #[test]
    fn last_multi_batch_picks_latest_non_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input_dtype = DType::Primitive(PType::I32, Nullable);
        let mut accumulator = Accumulator::try_new(Last, EmptyOptions, input_dtype)?;

        let batches = [
            PrimitiveArray::from_option_iter([Some(1i32), Some(2)]).into_array(),
            // All-null batch must not clobber the previously-stored value.
            PrimitiveArray::from_option_iter::<i32, _>([None, None]).into_array(),
            PrimitiveArray::from_option_iter([Some(99i32), None]).into_array(),
        ];
        for batch in &batches {
            accumulator.accumulate(batch, &mut ctx)?;
        }

        // Last is never saturated; later batches keep updating it.
        assert!(!accumulator.is_saturated());

        let expected = Scalar::primitive(99i32, Nullable);
        assert_eq!(accumulator.finish()?, expected);
        Ok(())
    }

    /// `finish` leaves the accumulator ready for an independent second aggregation.
    #[test]
    fn last_finish_resets_state() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut accumulator = Accumulator::try_new(Last, EmptyOptions, input_dtype)?;

        let first_run = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
        accumulator.accumulate(&first_run, &mut ctx)?;
        let first_result = accumulator.finish()?;
        assert_eq!(first_result, Scalar::primitive(20i32, Nullable));

        let second_run =
            PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
        accumulator.accumulate(&second_run, &mut ctx)?;
        let second_result = accumulator.finish()?;
        assert_eq!(second_result, Scalar::primitive(9i32, Nullable));
        Ok(())
    }

    /// Exercises `combine_partials` directly on the partial state.
    #[test]
    fn last_state_merge() -> VortexResult<()> {
        let input_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut partial = Last.empty_partial(&EmptyOptions, &input_dtype)?;

        Last.combine_partials(&mut partial, Scalar::primitive(5i32, Nullable))?;
        let after_first = Last.to_scalar(&partial)?;
        assert_eq!(after_first, Scalar::primitive(5i32, Nullable));

        // A later non-null partial replaces the prior value.
        Last.combine_partials(&mut partial, Scalar::primitive(7i32, Nullable))?;
        let after_second = Last.to_scalar(&partial)?;
        assert_eq!(after_second, Scalar::primitive(7i32, Nullable));

        // A null partial must not clobber the stored value.
        Last.combine_partials(&mut partial, Scalar::null(input_dtype.as_nullable()))?;
        let after_null = Last.to_scalar(&partial)?;
        assert_eq!(after_null, Scalar::primitive(7i32, Nullable));
        Ok(())
    }

    /// A chunked array whose final chunk is all-null falls back to the earlier chunk.
    #[test]
    fn last_chunked() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let populated = PrimitiveArray::from_option_iter([Some(42i32), Some(100)]);
        let all_null = PrimitiveArray::from_option_iter::<i32, _>([None, None]);
        let chunk_dtype = populated.dtype().clone();
        let chunks = vec![populated.into_array(), all_null.into_array()];
        let input = ChunkedArray::try_new(chunks, chunk_dtype)?.into_array();
        let expected = Scalar::primitive(100i32, Nullable);
        assert_eq!(last(&input, &mut ctx)?, expected);
        Ok(())
    }
}