vortex_array/aggregate_fn/fns/last/
mod.rs1use vortex_error::VortexResult;
5
6use crate::ArrayRef;
7use crate::Columnar;
8use crate::ExecutionCtx;
9use crate::aggregate_fn::Accumulator;
10use crate::aggregate_fn::AggregateFnId;
11use crate::aggregate_fn::AggregateFnVTable;
12use crate::aggregate_fn::DynAccumulator;
13use crate::aggregate_fn::EmptyOptions;
14use crate::dtype::DType;
15use crate::scalar::Scalar;
16
17pub fn last(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
21 let mut acc = Accumulator::try_new(Last, EmptyOptions, array.dtype().clone())?;
22 acc.accumulate(array, ctx)?;
23 acc.finish()
24}
25
26#[derive(Clone, Debug)]
28pub struct Last;
29
30pub struct LastPartial {
32 return_dtype: DType,
34 value: Option<Scalar>,
36}
37
38impl AggregateFnVTable for Last {
39 type Options = EmptyOptions;
40 type Partial = LastPartial;
41
42 fn id(&self) -> AggregateFnId {
43 AggregateFnId::new_ref("vortex.last")
44 }
45
46 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
47 unimplemented!("Last is not yet serializable");
48 }
49
50 fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
51 Some(input_dtype.as_nullable())
52 }
53
54 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
55 self.return_dtype(options, input_dtype)
56 }
57
58 fn empty_partial(
59 &self,
60 _options: &Self::Options,
61 input_dtype: &DType,
62 ) -> VortexResult<Self::Partial> {
63 Ok(LastPartial {
64 return_dtype: input_dtype.as_nullable(),
65 value: None,
66 })
67 }
68
69 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
70 if !other.is_null() {
72 partial.value = Some(other);
73 }
74 Ok(())
75 }
76
77 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
78 Ok(match &partial.value {
79 Some(v) => v.clone(),
80 None => Scalar::null(partial.return_dtype.clone()),
81 })
82 }
83
84 fn reset(&self, partial: &mut Self::Partial) {
85 partial.value = None;
86 }
87
88 #[inline]
89 fn is_saturated(&self, _partial: &Self::Partial) -> bool {
90 false
92 }
93
94 fn try_accumulate(
95 &self,
96 partial: &mut Self::Partial,
97 batch: &ArrayRef,
98 _ctx: &mut ExecutionCtx,
99 ) -> VortexResult<bool> {
100 if let Some(idx) = batch.validity_mask()?.last() {
101 let scalar = batch.scalar_at(idx)?;
102 partial.value = Some(scalar.into_nullable());
103 }
104 Ok(true)
105 }
106
107 fn accumulate(
108 &self,
109 _partial: &mut Self::Partial,
110 _batch: &Columnar,
111 _ctx: &mut ExecutionCtx,
112 ) -> VortexResult<()> {
113 unreachable!("Last::try_accumulate handles all arrays")
114 }
115
116 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
117 Ok(partials)
118 }
119
120 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
121 self.to_scalar(partial)
122 }
123}
124
125#[cfg(test)]
126mod tests {
127 use vortex_buffer::buffer;
128 use vortex_error::VortexResult;
129
130 use crate::IntoArray;
131 use crate::LEGACY_SESSION;
132 use crate::VortexSessionExecute;
133 use crate::aggregate_fn::Accumulator;
134 use crate::aggregate_fn::AggregateFnVTable;
135 use crate::aggregate_fn::DynAccumulator;
136 use crate::aggregate_fn::EmptyOptions;
137 use crate::aggregate_fn::fns::last::Last;
138 use crate::aggregate_fn::fns::last::last;
139 use crate::arrays::ChunkedArray;
140 use crate::arrays::ConstantArray;
141 use crate::arrays::PrimitiveArray;
142 use crate::arrays::VarBinArray;
143 use crate::dtype::DType;
144 use crate::dtype::Nullability;
145 use crate::dtype::Nullability::Nullable;
146 use crate::dtype::PType;
147 use crate::scalar::Scalar;
148 use crate::validity::Validity;
149
150 #[test]
151 fn last_non_null() -> VortexResult<()> {
152 let array = PrimitiveArray::new(buffer![10i32, 20, 30], Validity::NonNullable).into_array();
153 let mut ctx = LEGACY_SESSION.create_execution_ctx();
154 assert_eq!(last(&array, &mut ctx)?, Scalar::primitive(30i32, Nullable));
155 Ok(())
156 }
157
158 #[test]
159 fn last_skips_trailing_nulls() -> VortexResult<()> {
160 let array =
161 PrimitiveArray::from_option_iter([Some(7i32), Some(8), None, None]).into_array();
162 let mut ctx = LEGACY_SESSION.create_execution_ctx();
163 assert_eq!(last(&array, &mut ctx)?, Scalar::primitive(8i32, Nullable));
164 Ok(())
165 }
166
167 #[test]
168 fn last_all_null() -> VortexResult<()> {
169 let array = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]).into_array();
170 let mut ctx = LEGACY_SESSION.create_execution_ctx();
171 let dtype = DType::Primitive(PType::I32, Nullable);
172 assert_eq!(last(&array, &mut ctx)?, Scalar::null(dtype));
173 Ok(())
174 }
175
176 #[test]
177 fn last_empty() -> VortexResult<()> {
178 let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
179 let mut acc = Accumulator::try_new(Last, EmptyOptions, dtype)?;
180 let result = acc.finish()?;
181 assert_eq!(result, Scalar::null(DType::Primitive(PType::I32, Nullable)));
182 Ok(())
183 }
184
185 #[test]
186 fn last_constant() -> VortexResult<()> {
187 let array = ConstantArray::new(42i32, 10).into_array();
188 let mut ctx = LEGACY_SESSION.create_execution_ctx();
189 assert_eq!(last(&array, &mut ctx)?, Scalar::primitive(42i32, Nullable));
190 Ok(())
191 }
192
193 #[test]
194 fn last_constant_null() -> VortexResult<()> {
195 let dtype = DType::Primitive(PType::I32, Nullable);
196 let array = ConstantArray::new(Scalar::null(dtype.clone()), 10).into_array();
197 let mut ctx = LEGACY_SESSION.create_execution_ctx();
198 assert_eq!(last(&array, &mut ctx)?, Scalar::null(dtype));
199 Ok(())
200 }
201
202 #[test]
203 fn last_varbin() -> VortexResult<()> {
204 let array = VarBinArray::from_iter(
205 vec![Some("hello"), Some("world"), None],
206 DType::Utf8(Nullable),
207 )
208 .into_array();
209 let mut ctx = LEGACY_SESSION.create_execution_ctx();
210 assert_eq!(last(&array, &mut ctx)?, Scalar::utf8("world", Nullable));
211 Ok(())
212 }
213
214 #[test]
215 fn last_multi_batch_picks_latest_non_null() -> VortexResult<()> {
216 let mut ctx = LEGACY_SESSION.create_execution_ctx();
217 let dtype = DType::Primitive(PType::I32, Nullable);
218 let mut acc = Accumulator::try_new(Last, EmptyOptions, dtype)?;
219
220 let batch1 = PrimitiveArray::from_option_iter([Some(1i32), Some(2)]).into_array();
221 acc.accumulate(&batch1, &mut ctx)?;
222
223 let batch2 = PrimitiveArray::from_option_iter::<i32, _>([None, None]).into_array();
225 acc.accumulate(&batch2, &mut ctx)?;
226
227 let batch3 = PrimitiveArray::from_option_iter([Some(99i32), None]).into_array();
228 acc.accumulate(&batch3, &mut ctx)?;
229
230 assert!(!acc.is_saturated());
232
233 let result = acc.finish()?;
234 assert_eq!(result, Scalar::primitive(99i32, Nullable));
235 Ok(())
236 }
237
238 #[test]
239 fn last_finish_resets_state() -> VortexResult<()> {
240 let mut ctx = LEGACY_SESSION.create_execution_ctx();
241 let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
242 let mut acc = Accumulator::try_new(Last, EmptyOptions, dtype)?;
243
244 let batch1 = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
245 acc.accumulate(&batch1, &mut ctx)?;
246 assert_eq!(acc.finish()?, Scalar::primitive(20i32, Nullable));
247
248 let batch2 = PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
249 acc.accumulate(&batch2, &mut ctx)?;
250 assert_eq!(acc.finish()?, Scalar::primitive(9i32, Nullable));
251 Ok(())
252 }
253
254 #[test]
255 fn last_state_merge() -> VortexResult<()> {
256 let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
257 let mut state = Last.empty_partial(&EmptyOptions, &dtype)?;
258
259 Last.combine_partials(&mut state, Scalar::primitive(5i32, Nullable))?;
260 assert_eq!(Last.to_scalar(&state)?, Scalar::primitive(5i32, Nullable));
261
262 Last.combine_partials(&mut state, Scalar::primitive(7i32, Nullable))?;
264 assert_eq!(Last.to_scalar(&state)?, Scalar::primitive(7i32, Nullable));
265
266 Last.combine_partials(&mut state, Scalar::null(dtype.as_nullable()))?;
268 assert_eq!(Last.to_scalar(&state)?, Scalar::primitive(7i32, Nullable));
269 Ok(())
270 }
271
272 #[test]
273 fn last_chunked() -> VortexResult<()> {
274 let chunk1 = PrimitiveArray::from_option_iter([Some(42i32), Some(100)]);
275 let chunk2 = PrimitiveArray::from_option_iter::<i32, _>([None, None]);
276 let dtype = chunk1.dtype().clone();
277 let chunked = ChunkedArray::try_new(vec![chunk1.into_array(), chunk2.into_array()], dtype)?;
278 let mut ctx = LEGACY_SESSION.create_execution_ctx();
279 assert_eq!(
280 last(&chunked.into_array(), &mut ctx)?,
281 Scalar::primitive(100i32, Nullable)
282 );
283 Ok(())
284 }
285}