Skip to main content

vortex_array/aggregate_fn/fns/last/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_error::VortexResult;
5use vortex_session::registry::CachedId;
6
7use crate::ArrayRef;
8use crate::Columnar;
9use crate::ExecutionCtx;
10use crate::aggregate_fn::Accumulator;
11use crate::aggregate_fn::AggregateFnId;
12use crate::aggregate_fn::AggregateFnVTable;
13use crate::aggregate_fn::DynAccumulator;
14use crate::aggregate_fn::EmptyOptions;
15use crate::dtype::DType;
16use crate::scalar::Scalar;
17
18/// Return the last non-null value of an array.
19///
20/// See [`Last`] for details.
21pub fn last(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
22    let mut acc = Accumulator::try_new(Last, EmptyOptions, array.dtype().clone())?;
23    acc.accumulate(array, ctx)?;
24    acc.finish()
25}
26
/// Aggregate function yielding the last non-null value observed across all batches.
///
/// Nulls never overwrite an earlier non-null value, and neither do batches made up only of
/// nulls. The result dtype is the nullable form of the input dtype.
#[derive(Clone, Debug)]
pub struct Last;
30
31/// Partial accumulator state for the [`Last`] aggregate.
32pub struct LastPartial {
33    /// The nullable version of the input dtype, used for the result and for empty/all-null inputs.
34    return_dtype: DType,
35    /// The last non-null value seen so far, or `None` if no non-null value has been observed.
36    value: Option<Scalar>,
37}
38
39impl AggregateFnVTable for Last {
40    type Options = EmptyOptions;
41    type Partial = LastPartial;
42
43    fn id(&self) -> AggregateFnId {
44        static ID: CachedId = CachedId::new("vortex.last");
45        *ID
46    }
47
48    fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
49        unimplemented!("Last is not yet serializable");
50    }
51
52    fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
53        Some(input_dtype.as_nullable())
54    }
55
56    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
57        self.return_dtype(options, input_dtype)
58    }
59
60    fn empty_partial(
61        &self,
62        _options: &Self::Options,
63        input_dtype: &DType,
64    ) -> VortexResult<Self::Partial> {
65        Ok(LastPartial {
66            return_dtype: input_dtype.as_nullable(),
67            value: None,
68        })
69    }
70
71    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
72        // Each new non-null partial replaces the previous one; nulls are ignored.
73        if !other.is_null() {
74            partial.value = Some(other);
75        }
76        Ok(())
77    }
78
79    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
80        Ok(match &partial.value {
81            Some(v) => v.clone(),
82            None => Scalar::null(partial.return_dtype.clone()),
83        })
84    }
85
86    fn reset(&self, partial: &mut Self::Partial) {
87        partial.value = None;
88    }
89
90    #[inline]
91    fn is_saturated(&self, _partial: &Self::Partial) -> bool {
92        // Last can never short-circuit: a later batch can always supersede the current value.
93        false
94    }
95
96    fn try_accumulate(
97        &self,
98        partial: &mut Self::Partial,
99        batch: &ArrayRef,
100        ctx: &mut ExecutionCtx,
101    ) -> VortexResult<bool> {
102        if let Some(idx) = batch.validity()?.execute_mask(batch.len(), ctx)?.last() {
103            let scalar = batch.execute_scalar(idx, ctx)?;
104            partial.value = Some(scalar.into_nullable());
105        }
106        Ok(true)
107    }
108
109    fn accumulate(
110        &self,
111        _partial: &mut Self::Partial,
112        _batch: &Columnar,
113        _ctx: &mut ExecutionCtx,
114    ) -> VortexResult<()> {
115        unreachable!("Last::try_accumulate handles all arrays")
116    }
117
118    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
119        Ok(partials)
120    }
121
122    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
123        self.to_scalar(partial)
124    }
125}
126
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::last::Last;
    use crate::aggregate_fn::fns::last::last;
    use crate::array_session;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::PType;
    use crate::scalar::Scalar;
    use crate::validity::Validity;

    /// A fully valid input yields its final element as a nullable scalar.
    #[test]
    fn last_non_null() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let input = PrimitiveArray::new(buffer![10i32, 20, 30], Validity::NonNullable).into_array();
        let actual = last(&input, &mut ctx)?;
        assert_eq!(actual, Scalar::primitive(30i32, Nullable));
        Ok(())
    }

    /// Trailing nulls are skipped in favour of the last valid element.
    #[test]
    fn last_skips_trailing_nulls() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let input =
            PrimitiveArray::from_option_iter([Some(7i32), Some(8), None, None]).into_array();
        let expected = Scalar::primitive(8i32, Nullable);
        assert_eq!(last(&input, &mut ctx)?, expected);
        Ok(())
    }

    /// An input made up only of nulls yields a typed null.
    #[test]
    fn last_all_null() -> VortexResult<()> {
        let expected_dtype = DType::Primitive(PType::I32, Nullable);
        let input = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]).into_array();
        let mut ctx = array_session().create_execution_ctx();
        let actual = last(&input, &mut ctx)?;
        assert_eq!(actual, Scalar::null(expected_dtype));
        Ok(())
    }

    /// Finishing without any input yields a null of the nullable input dtype.
    #[test]
    fn last_empty() -> VortexResult<()> {
        let input_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let expected = Scalar::null(DType::Primitive(PType::I32, Nullable));
        let mut acc = Accumulator::try_new(Last, EmptyOptions, input_dtype)?;
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    /// A constant array yields its constant.
    #[test]
    fn last_constant() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let input = ConstantArray::new(42i32, 10).into_array();
        let expected = Scalar::primitive(42i32, Nullable);
        assert_eq!(last(&input, &mut ctx)?, expected);
        Ok(())
    }

    /// A constant-null array yields a null.
    #[test]
    fn last_constant_null() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullable);
        let input = ConstantArray::new(Scalar::null(dtype.clone()), 10).into_array();
        let mut ctx = array_session().create_execution_ctx();
        let expected = Scalar::null(dtype);
        assert_eq!(last(&input, &mut ctx)?, expected);
        Ok(())
    }

    /// Variable-length strings are supported; a trailing null is skipped.
    #[test]
    fn last_varbin() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let values = vec![Some("hello"), Some("world"), None];
        let input = VarBinArray::from_iter(values, DType::Utf8(Nullable)).into_array();
        assert_eq!(last(&input, &mut ctx)?, Scalar::utf8("world", Nullable));
        Ok(())
    }

    /// Across several batches the most recent non-null value wins.
    #[test]
    fn last_multi_batch_picks_latest_non_null() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let input_dtype = DType::Primitive(PType::I32, Nullable);
        let mut acc = Accumulator::try_new(Last, EmptyOptions, input_dtype)?;

        let batches = [
            PrimitiveArray::from_option_iter([Some(1i32), Some(2)]).into_array(),
            // All-null batch must not clobber the previously-stored value.
            PrimitiveArray::from_option_iter::<i32, _>([None, None]).into_array(),
            PrimitiveArray::from_option_iter([Some(99i32), None]).into_array(),
        ];
        for batch in &batches {
            acc.accumulate(batch, &mut ctx)?;
        }

        // Last is never saturated; later batches keep updating it.
        assert!(!acc.is_saturated());

        assert_eq!(acc.finish()?, Scalar::primitive(99i32, Nullable));
        Ok(())
    }

    /// `finish` resets the accumulator, so each round reports only its own input.
    #[test]
    fn last_finish_resets_state() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let input_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(Last, EmptyOptions, input_dtype)?;

        let rounds = [
            (PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array(), 20i32),
            (PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array(), 9i32),
        ];
        for (batch, expected) in rounds {
            acc.accumulate(&batch, &mut ctx)?;
            assert_eq!(acc.finish()?, Scalar::primitive(expected, Nullable));
        }
        Ok(())
    }

    /// Combining partials keeps the latest non-null partial and ignores nulls.
    #[test]
    fn last_state_merge() -> VortexResult<()> {
        let input_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut partial = Last.empty_partial(&EmptyOptions, &input_dtype)?;

        let five = Scalar::primitive(5i32, Nullable);
        Last.combine_partials(&mut partial, five.clone())?;
        assert_eq!(Last.to_scalar(&partial)?, five);

        // A later non-null partial replaces the prior value.
        let seven = Scalar::primitive(7i32, Nullable);
        Last.combine_partials(&mut partial, seven.clone())?;
        assert_eq!(Last.to_scalar(&partial)?, seven);

        // A null partial must not clobber the stored value.
        Last.combine_partials(&mut partial, Scalar::null(input_dtype.as_nullable()))?;
        assert_eq!(Last.to_scalar(&partial)?, seven);
        Ok(())
    }

    /// A trailing all-null chunk does not hide the last value of an earlier chunk.
    #[test]
    fn last_chunked() -> VortexResult<()> {
        let head = PrimitiveArray::from_option_iter([Some(42i32), Some(100)]);
        let tail = PrimitiveArray::from_option_iter::<i32, _>([None, None]);
        let dtype = head.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![head.into_array(), tail.into_array()], dtype)?
            .into_array();
        let mut ctx = array_session().create_execution_ctx();
        assert_eq!(
            last(&chunked, &mut ctx)?,
            Scalar::primitive(100i32, Nullable)
        );
        Ok(())
    }
}