datafusion_functions_aggregate/
first_last.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the FIRST_VALUE/LAST_VALUE aggregations.
19
20use std::any::Any;
21use std::fmt::Debug;
22use std::hash::{DefaultHasher, Hash, Hasher};
23use std::mem::size_of_val;
24use std::sync::Arc;
25
26use arrow::array::{
27    Array, ArrayRef, ArrowPrimitiveType, AsArray, BooleanArray, BooleanBufferBuilder,
28    PrimitiveArray,
29};
30use arrow::buffer::{BooleanBuffer, NullBuffer};
31use arrow::compute::{self, LexicographicalComparator, SortColumn, SortOptions};
32use arrow::datatypes::{
33    DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field, FieldRef,
34    Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
35    Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
36    TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
37    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
38    UInt8Type,
39};
40use datafusion_common::cast::as_boolean_array;
41use datafusion_common::utils::{compare_rows, extract_row_at_idx_to_buf, get_row_at_idx};
42use datafusion_common::{
43    arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
44};
45use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
46use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity};
47use datafusion_expr::{
48    Accumulator, AggregateUDFImpl, Documentation, EmitTo, Expr, ExprFunctionExt,
49    GroupsAccumulator, ReversedUDAF, Signature, SortExpr, Volatility,
50};
51use datafusion_functions_aggregate_common::utils::get_sort_options;
52use datafusion_macros::user_doc;
53use datafusion_physical_expr_common::sort_expr::LexOrdering;
54
55create_func!(FirstValue, first_value_udaf);
56create_func!(LastValue, last_value_udaf);
57
58/// Returns the first value in a group of values.
59pub fn first_value(expression: Expr, order_by: Vec<SortExpr>) -> Expr {
60    first_value_udaf()
61        .call(vec![expression])
62        .order_by(order_by)
63        .build()
64        // guaranteed to be `Expr::AggregateFunction`
65        .unwrap()
66}
67
68/// Returns the last value in a group of values.
69pub fn last_value(expression: Expr, order_by: Vec<SortExpr>) -> Expr {
70    last_value_udaf()
71        .call(vec![expression])
72        .order_by(order_by)
73        .build()
74        // guaranteed to be `Expr::AggregateFunction`
75        .unwrap()
76}
77
78#[user_doc(
79    doc_section(label = "General Functions"),
80    description = "Returns the first element in an aggregation group according to the requested ordering. If no ordering is given, returns an arbitrary element from the group.",
81    syntax_example = "first_value(expression [ORDER BY expression])",
82    sql_example = r#"```sql
83> SELECT first_value(column_name ORDER BY other_column) FROM table_name;
84+-----------------------------------------------+
85| first_value(column_name ORDER BY other_column)|
86+-----------------------------------------------+
87| first_element                                 |
88+-----------------------------------------------+
89```"#,
90    standard_argument(name = "expression",)
91)]
92pub struct FirstValue {
93    signature: Signature,
94    is_input_pre_ordered: bool,
95}
96
97impl Debug for FirstValue {
98    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
99        f.debug_struct("FirstValue")
100            .field("name", &self.name())
101            .field("signature", &self.signature)
102            .field("accumulator", &"<FUNC>")
103            .finish()
104    }
105}
106
107impl Default for FirstValue {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
113impl FirstValue {
114    pub fn new() -> Self {
115        Self {
116            signature: Signature::any(1, Volatility::Immutable),
117            is_input_pre_ordered: false,
118        }
119    }
120}
121
122impl AggregateUDFImpl for FirstValue {
123    fn as_any(&self) -> &dyn Any {
124        self
125    }
126
127    fn name(&self) -> &str {
128        "first_value"
129    }
130
131    fn signature(&self) -> &Signature {
132        &self.signature
133    }
134
135    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
136        Ok(arg_types[0].clone())
137    }
138
139    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
140        let Some(ordering) = LexOrdering::new(acc_args.order_bys.to_vec()) else {
141            return TrivialFirstValueAccumulator::try_new(
142                acc_args.return_field.data_type(),
143                acc_args.ignore_nulls,
144            )
145            .map(|acc| Box::new(acc) as _);
146        };
147        let ordering_dtypes = ordering
148            .iter()
149            .map(|e| e.expr.data_type(acc_args.schema))
150            .collect::<Result<Vec<_>>>()?;
151        Ok(Box::new(FirstValueAccumulator::try_new(
152            acc_args.return_field.data_type(),
153            &ordering_dtypes,
154            ordering,
155            self.is_input_pre_ordered,
156            acc_args.ignore_nulls,
157        )?))
158    }
159
160    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
161        let mut fields = vec![Field::new(
162            format_state_name(args.name, "first_value"),
163            args.return_type().clone(),
164            true,
165        )
166        .into()];
167        fields.extend(args.ordering_fields.iter().cloned());
168        fields.push(Field::new("is_set", DataType::Boolean, true).into());
169        Ok(fields)
170    }
171
172    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
173        use DataType::*;
174        !args.order_bys.is_empty()
175            && matches!(
176                args.return_field.data_type(),
177                Int8 | Int16
178                    | Int32
179                    | Int64
180                    | UInt8
181                    | UInt16
182                    | UInt32
183                    | UInt64
184                    | Float16
185                    | Float32
186                    | Float64
187                    | Decimal128(_, _)
188                    | Decimal256(_, _)
189                    | Date32
190                    | Date64
191                    | Time32(_)
192                    | Time64(_)
193                    | Timestamp(_, _)
194            )
195    }
196
197    fn create_groups_accumulator(
198        &self,
199        args: AccumulatorArgs,
200    ) -> Result<Box<dyn GroupsAccumulator>> {
201        fn create_accumulator<T: ArrowPrimitiveType + Send>(
202            args: AccumulatorArgs,
203        ) -> Result<Box<dyn GroupsAccumulator>> {
204            let Some(ordering) = LexOrdering::new(args.order_bys.to_vec()) else {
205                return internal_err!("Groups accumulator must have an ordering.");
206            };
207
208            let ordering_dtypes = ordering
209                .iter()
210                .map(|e| e.expr.data_type(args.schema))
211                .collect::<Result<Vec<_>>>()?;
212
213            FirstPrimitiveGroupsAccumulator::<T>::try_new(
214                ordering,
215                args.ignore_nulls,
216                args.return_field.data_type(),
217                &ordering_dtypes,
218                true,
219            )
220            .map(|acc| Box::new(acc) as _)
221        }
222
223        match args.return_field.data_type() {
224            DataType::Int8 => create_accumulator::<Int8Type>(args),
225            DataType::Int16 => create_accumulator::<Int16Type>(args),
226            DataType::Int32 => create_accumulator::<Int32Type>(args),
227            DataType::Int64 => create_accumulator::<Int64Type>(args),
228            DataType::UInt8 => create_accumulator::<UInt8Type>(args),
229            DataType::UInt16 => create_accumulator::<UInt16Type>(args),
230            DataType::UInt32 => create_accumulator::<UInt32Type>(args),
231            DataType::UInt64 => create_accumulator::<UInt64Type>(args),
232            DataType::Float16 => create_accumulator::<Float16Type>(args),
233            DataType::Float32 => create_accumulator::<Float32Type>(args),
234            DataType::Float64 => create_accumulator::<Float64Type>(args),
235
236            DataType::Decimal128(_, _) => create_accumulator::<Decimal128Type>(args),
237            DataType::Decimal256(_, _) => create_accumulator::<Decimal256Type>(args),
238
239            DataType::Timestamp(TimeUnit::Second, _) => {
240                create_accumulator::<TimestampSecondType>(args)
241            }
242            DataType::Timestamp(TimeUnit::Millisecond, _) => {
243                create_accumulator::<TimestampMillisecondType>(args)
244            }
245            DataType::Timestamp(TimeUnit::Microsecond, _) => {
246                create_accumulator::<TimestampMicrosecondType>(args)
247            }
248            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
249                create_accumulator::<TimestampNanosecondType>(args)
250            }
251
252            DataType::Date32 => create_accumulator::<Date32Type>(args),
253            DataType::Date64 => create_accumulator::<Date64Type>(args),
254            DataType::Time32(TimeUnit::Second) => {
255                create_accumulator::<Time32SecondType>(args)
256            }
257            DataType::Time32(TimeUnit::Millisecond) => {
258                create_accumulator::<Time32MillisecondType>(args)
259            }
260
261            DataType::Time64(TimeUnit::Microsecond) => {
262                create_accumulator::<Time64MicrosecondType>(args)
263            }
264            DataType::Time64(TimeUnit::Nanosecond) => {
265                create_accumulator::<Time64NanosecondType>(args)
266            }
267
268            _ => internal_err!(
269                "GroupsAccumulator not supported for first_value({})",
270                args.return_field.data_type()
271            ),
272        }
273    }
274
275    fn with_beneficial_ordering(
276        self: Arc<Self>,
277        beneficial_ordering: bool,
278    ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
279        Ok(Some(Arc::new(Self {
280            signature: self.signature.clone(),
281            is_input_pre_ordered: beneficial_ordering,
282        })))
283    }
284
285    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
286        AggregateOrderSensitivity::Beneficial
287    }
288
289    fn reverse_expr(&self) -> ReversedUDAF {
290        ReversedUDAF::Reversed(last_value_udaf())
291    }
292
293    fn documentation(&self) -> Option<&Documentation> {
294        self.doc()
295    }
296
297    fn equals(&self, other: &dyn AggregateUDFImpl) -> bool {
298        let Some(other) = other.as_any().downcast_ref::<Self>() else {
299            return false;
300        };
301        let Self {
302            signature,
303            is_input_pre_ordered,
304        } = self;
305        signature == &other.signature
306            && is_input_pre_ordered == &other.is_input_pre_ordered
307    }
308
309    fn hash_value(&self) -> u64 {
310        let Self {
311            signature,
312            is_input_pre_ordered,
313        } = self;
314        let mut hasher = DefaultHasher::new();
315        std::any::type_name::<Self>().hash(&mut hasher);
316        signature.hash(&mut hasher);
317        is_input_pre_ordered.hash(&mut hasher);
318        hasher.finish()
319    }
320}
321
322// TODO: rename to PrimitiveGroupsAccumulator
323struct FirstPrimitiveGroupsAccumulator<T>
324where
325    T: ArrowPrimitiveType + Send,
326{
327    // ================ state ===========
328    vals: Vec<T::Native>,
329    // Stores ordering values, of the aggregator requirement corresponding to first value
330    // of the aggregator.
331    // The `orderings` are stored row-wise, meaning that `orderings[group_idx]`
332    // represents the ordering values corresponding to the `group_idx`-th group.
333    orderings: Vec<Vec<ScalarValue>>,
334    // At the beginning, `is_sets[group_idx]` is false, which means `first` is not seen yet.
335    // Once we see the first value, we set the `is_sets[group_idx]` flag
336    is_sets: BooleanBufferBuilder,
337    // null_builder[group_idx] == false => vals[group_idx] is null
338    null_builder: BooleanBufferBuilder,
339    // size of `self.orderings`
340    // Calculating the memory usage of `self.orderings` using `ScalarValue::size_of_vec` is quite costly.
341    // Therefore, we cache it and compute `size_of` only after each update
342    // to avoid calling `ScalarValue::size_of_vec` by Self.size.
343    size_of_orderings: usize,
344
345    // buffer for `get_filtered_min_of_each_group`
346    // filter_min_of_each_group_buf.0[group_idx] -> idx_in_val
347    // only valid if filter_min_of_each_group_buf.1[group_idx] == true
348    // TODO: rename to extreme_of_each_group_buf
349    min_of_each_group_buf: (Vec<usize>, BooleanBufferBuilder),
350
351    // =========== option ============
352
353    // Stores the applicable ordering requirement.
354    ordering_req: LexOrdering,
355    // true: take first element in an aggregation group according to the requested ordering.
356    // false: take last element in an aggregation group according to the requested ordering.
357    pick_first_in_group: bool,
358    // derived from `ordering_req`.
359    sort_options: Vec<SortOptions>,
360    // Ignore null values.
361    ignore_nulls: bool,
362    /// The output type
363    data_type: DataType,
364    default_orderings: Vec<ScalarValue>,
365}
366
367impl<T> FirstPrimitiveGroupsAccumulator<T>
368where
369    T: ArrowPrimitiveType + Send,
370{
371    fn try_new(
372        ordering_req: LexOrdering,
373        ignore_nulls: bool,
374        data_type: &DataType,
375        ordering_dtypes: &[DataType],
376        pick_first_in_group: bool,
377    ) -> Result<Self> {
378        let default_orderings = ordering_dtypes
379            .iter()
380            .map(ScalarValue::try_from)
381            .collect::<Result<_>>()?;
382
383        let sort_options = get_sort_options(&ordering_req);
384
385        Ok(Self {
386            null_builder: BooleanBufferBuilder::new(0),
387            ordering_req,
388            sort_options,
389            ignore_nulls,
390            default_orderings,
391            data_type: data_type.clone(),
392            vals: Vec::new(),
393            orderings: Vec::new(),
394            is_sets: BooleanBufferBuilder::new(0),
395            size_of_orderings: 0,
396            min_of_each_group_buf: (Vec::new(), BooleanBufferBuilder::new(0)),
397            pick_first_in_group,
398        })
399    }
400
401    fn should_update_state(
402        &self,
403        group_idx: usize,
404        new_ordering_values: &[ScalarValue],
405    ) -> Result<bool> {
406        if !self.is_sets.get_bit(group_idx) {
407            return Ok(true);
408        }
409
410        assert!(new_ordering_values.len() == self.ordering_req.len());
411        let current_ordering = &self.orderings[group_idx];
412        compare_rows(current_ordering, new_ordering_values, &self.sort_options).map(|x| {
413            if self.pick_first_in_group {
414                x.is_gt()
415            } else {
416                x.is_lt()
417            }
418        })
419    }
420
421    fn take_orderings(&mut self, emit_to: EmitTo) -> Vec<Vec<ScalarValue>> {
422        let result = emit_to.take_needed(&mut self.orderings);
423
424        match emit_to {
425            EmitTo::All => self.size_of_orderings = 0,
426            EmitTo::First(_) => {
427                self.size_of_orderings -=
428                    result.iter().map(ScalarValue::size_of_vec).sum::<usize>()
429            }
430        }
431
432        result
433    }
434
435    fn take_need(
436        bool_buf_builder: &mut BooleanBufferBuilder,
437        emit_to: EmitTo,
438    ) -> BooleanBuffer {
439        let bool_buf = bool_buf_builder.finish();
440        match emit_to {
441            EmitTo::All => bool_buf,
442            EmitTo::First(n) => {
443                // split off the first N values in seen_values
444                //
445                // TODO make this more efficient rather than two
446                // copies and bitwise manipulation
447                let first_n: BooleanBuffer = bool_buf.iter().take(n).collect();
448                // reset the existing buffer
449                for b in bool_buf.iter().skip(n) {
450                    bool_buf_builder.append(b);
451                }
452                first_n
453            }
454        }
455    }
456
457    fn resize_states(&mut self, new_size: usize) {
458        self.vals.resize(new_size, T::default_value());
459
460        self.null_builder.resize(new_size);
461
462        if self.orderings.len() < new_size {
463            let current_len = self.orderings.len();
464
465            self.orderings
466                .resize(new_size, self.default_orderings.clone());
467
468            self.size_of_orderings += (new_size - current_len)
469                * ScalarValue::size_of_vec(
470                    // Note: In some cases (such as in the unit test below)
471                    // ScalarValue::size_of_vec(&self.default_orderings) != ScalarValue::size_of_vec(&self.default_orderings.clone())
472                    // This may be caused by the different vec.capacity() values?
473                    self.orderings.last().unwrap(),
474                );
475        }
476
477        self.is_sets.resize(new_size);
478
479        self.min_of_each_group_buf.0.resize(new_size, 0);
480        self.min_of_each_group_buf.1.resize(new_size);
481    }
482
483    fn update_state(
484        &mut self,
485        group_idx: usize,
486        orderings: &[ScalarValue],
487        new_val: T::Native,
488        is_null: bool,
489    ) {
490        self.vals[group_idx] = new_val;
491        self.is_sets.set_bit(group_idx, true);
492
493        self.null_builder.set_bit(group_idx, !is_null);
494
495        assert!(orderings.len() == self.ordering_req.len());
496        let old_size = ScalarValue::size_of_vec(&self.orderings[group_idx]);
497        self.orderings[group_idx].clear();
498        self.orderings[group_idx].extend_from_slice(orderings);
499        let new_size = ScalarValue::size_of_vec(&self.orderings[group_idx]);
500        self.size_of_orderings = self.size_of_orderings - old_size + new_size;
501    }
502
503    fn take_state(
504        &mut self,
505        emit_to: EmitTo,
506    ) -> (ArrayRef, Vec<Vec<ScalarValue>>, BooleanBuffer) {
507        emit_to.take_needed(&mut self.min_of_each_group_buf.0);
508        self.min_of_each_group_buf
509            .1
510            .truncate(self.min_of_each_group_buf.0.len());
511
512        (
513            self.take_vals_and_null_buf(emit_to),
514            self.take_orderings(emit_to),
515            Self::take_need(&mut self.is_sets, emit_to),
516        )
517    }
518
519    // should be used in test only
520    #[cfg(test)]
521    fn compute_size_of_orderings(&self) -> usize {
522        self.orderings
523            .iter()
524            .map(ScalarValue::size_of_vec)
525            .sum::<usize>()
526    }
527    /// Returns a vector of tuples `(group_idx, idx_in_val)` representing the index of the
528    /// minimum value in `orderings` for each group, using lexicographical comparison.
529    /// Values are filtered using `opt_filter` and `is_set_arr` if provided.
530    /// TODO: rename to get_filtered_extreme_of_each_group
531    fn get_filtered_min_of_each_group(
532        &mut self,
533        orderings: &[ArrayRef],
534        group_indices: &[usize],
535        opt_filter: Option<&BooleanArray>,
536        vals: &PrimitiveArray<T>,
537        is_set_arr: Option<&BooleanArray>,
538    ) -> Result<Vec<(usize, usize)>> {
539        // Set all values in min_of_each_group_buf.1 to false.
540        self.min_of_each_group_buf.1.truncate(0);
541        self.min_of_each_group_buf
542            .1
543            .append_n(self.vals.len(), false);
544
545        // No need to call `clear` since `self.min_of_each_group_buf.0[group_idx]`
546        // is only valid when `self.min_of_each_group_buf.1[group_idx] == true`.
547
548        let comparator = {
549            assert_eq!(orderings.len(), self.ordering_req.len());
550            let sort_columns = orderings
551                .iter()
552                .zip(self.ordering_req.iter())
553                .map(|(array, req)| SortColumn {
554                    values: Arc::clone(array),
555                    options: Some(req.options),
556                })
557                .collect::<Vec<_>>();
558
559            LexicographicalComparator::try_new(&sort_columns)?
560        };
561
562        for (idx_in_val, group_idx) in group_indices.iter().enumerate() {
563            let group_idx = *group_idx;
564
565            let passed_filter = opt_filter.is_none_or(|x| x.value(idx_in_val));
566            let is_set = is_set_arr.is_none_or(|x| x.value(idx_in_val));
567
568            if !passed_filter || !is_set {
569                continue;
570            }
571
572            if self.ignore_nulls && vals.is_null(idx_in_val) {
573                continue;
574            }
575
576            let is_valid = self.min_of_each_group_buf.1.get_bit(group_idx);
577
578            if !is_valid {
579                self.min_of_each_group_buf.1.set_bit(group_idx, true);
580                self.min_of_each_group_buf.0[group_idx] = idx_in_val;
581            } else {
582                let ordering = comparator
583                    .compare(self.min_of_each_group_buf.0[group_idx], idx_in_val);
584
585                if (ordering.is_gt() && self.pick_first_in_group)
586                    || (ordering.is_lt() && !self.pick_first_in_group)
587                {
588                    self.min_of_each_group_buf.0[group_idx] = idx_in_val;
589                }
590            }
591        }
592
593        Ok(self
594            .min_of_each_group_buf
595            .0
596            .iter()
597            .enumerate()
598            .filter(|(group_idx, _)| self.min_of_each_group_buf.1.get_bit(*group_idx))
599            .map(|(group_idx, idx_in_val)| (group_idx, *idx_in_val))
600            .collect::<Vec<_>>())
601    }
602
603    fn take_vals_and_null_buf(&mut self, emit_to: EmitTo) -> ArrayRef {
604        let r = emit_to.take_needed(&mut self.vals);
605
606        let null_buf = NullBuffer::new(Self::take_need(&mut self.null_builder, emit_to));
607
608        let values = PrimitiveArray::<T>::new(r.into(), Some(null_buf)) // no copy
609            .with_data_type(self.data_type.clone());
610        Arc::new(values)
611    }
612}
613
614impl<T> GroupsAccumulator for FirstPrimitiveGroupsAccumulator<T>
615where
616    T: ArrowPrimitiveType + Send,
617{
618    fn update_batch(
619        &mut self,
620        // e.g. first_value(a order by b): values_and_order_cols will be [a, b]
621        values_and_order_cols: &[ArrayRef],
622        group_indices: &[usize],
623        opt_filter: Option<&BooleanArray>,
624        total_num_groups: usize,
625    ) -> Result<()> {
626        self.resize_states(total_num_groups);
627
628        let vals = values_and_order_cols[0].as_primitive::<T>();
629
630        let mut ordering_buf = Vec::with_capacity(self.ordering_req.len());
631
632        // The overhead of calling `extract_row_at_idx_to_buf` is somewhat high, so we need to minimize its calls as much as possible.
633        for (group_idx, idx) in self
634            .get_filtered_min_of_each_group(
635                &values_and_order_cols[1..],
636                group_indices,
637                opt_filter,
638                vals,
639                None,
640            )?
641            .into_iter()
642        {
643            extract_row_at_idx_to_buf(
644                &values_and_order_cols[1..],
645                idx,
646                &mut ordering_buf,
647            )?;
648
649            if self.should_update_state(group_idx, &ordering_buf)? {
650                self.update_state(
651                    group_idx,
652                    &ordering_buf,
653                    vals.value(idx),
654                    vals.is_null(idx),
655                );
656            }
657        }
658
659        Ok(())
660    }
661
662    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
663        Ok(self.take_state(emit_to).0)
664    }
665
666    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
667        let (val_arr, orderings, is_sets) = self.take_state(emit_to);
668        let mut result = Vec::with_capacity(self.orderings.len() + 2);
669
670        result.push(val_arr);
671
672        let ordering_cols = {
673            let mut ordering_cols = Vec::with_capacity(self.ordering_req.len());
674            for _ in 0..self.ordering_req.len() {
675                ordering_cols.push(Vec::with_capacity(self.orderings.len()));
676            }
677            for row in orderings.into_iter() {
678                assert_eq!(row.len(), self.ordering_req.len());
679                for (col_idx, ordering) in row.into_iter().enumerate() {
680                    ordering_cols[col_idx].push(ordering);
681                }
682            }
683
684            ordering_cols
685        };
686        for ordering_col in ordering_cols {
687            result.push(ScalarValue::iter_to_array(ordering_col)?);
688        }
689
690        result.push(Arc::new(BooleanArray::new(is_sets, None)));
691
692        Ok(result)
693    }
694
695    fn merge_batch(
696        &mut self,
697        values: &[ArrayRef],
698        group_indices: &[usize],
699        opt_filter: Option<&BooleanArray>,
700        total_num_groups: usize,
701    ) -> Result<()> {
702        self.resize_states(total_num_groups);
703
704        let mut ordering_buf = Vec::with_capacity(self.ordering_req.len());
705
706        let (is_set_arr, val_and_order_cols) = match values.split_last() {
707            Some(result) => result,
708            None => return internal_err!("Empty row in FIRST_VALUE"),
709        };
710
711        let is_set_arr = as_boolean_array(is_set_arr)?;
712
713        let vals = values[0].as_primitive::<T>();
714        // The overhead of calling `extract_row_at_idx_to_buf` is somewhat high, so we need to minimize its calls as much as possible.
715        let groups = self.get_filtered_min_of_each_group(
716            &val_and_order_cols[1..],
717            group_indices,
718            opt_filter,
719            vals,
720            Some(is_set_arr),
721        )?;
722
723        for (group_idx, idx) in groups.into_iter() {
724            extract_row_at_idx_to_buf(&val_and_order_cols[1..], idx, &mut ordering_buf)?;
725
726            if self.should_update_state(group_idx, &ordering_buf)? {
727                self.update_state(
728                    group_idx,
729                    &ordering_buf,
730                    vals.value(idx),
731                    vals.is_null(idx),
732                );
733            }
734        }
735
736        Ok(())
737    }
738
739    fn size(&self) -> usize {
740        self.vals.capacity() * size_of::<T::Native>()
741            + self.null_builder.capacity() / 8 // capacity is in bits, so convert to bytes
742            + self.is_sets.capacity() / 8
743            + self.size_of_orderings
744            + self.min_of_each_group_buf.0.capacity() * size_of::<usize>()
745            + self.min_of_each_group_buf.1.capacity() / 8
746    }
747
748    fn supports_convert_to_state(&self) -> bool {
749        true
750    }
751
752    fn convert_to_state(
753        &self,
754        values: &[ArrayRef],
755        opt_filter: Option<&BooleanArray>,
756    ) -> Result<Vec<ArrayRef>> {
757        let mut result = values.to_vec();
758        match opt_filter {
759            Some(f) => {
760                result.push(Arc::new(f.clone()));
761                Ok(result)
762            }
763            None => {
764                result.push(Arc::new(BooleanArray::from(vec![true; values[0].len()])));
765                Ok(result)
766            }
767        }
768    }
769}
770
771/// This accumulator is used when there is no ordering specified for the
772/// `FIRST_VALUE` aggregation. It simply returns the first value it sees
773/// according to the pre-existing ordering of the input data, and provides
774/// a fast path for this case without needing to maintain any ordering state.
775#[derive(Debug)]
776pub struct TrivialFirstValueAccumulator {
777    first: ScalarValue,
778    // Whether we have seen the first value yet.
779    is_set: bool,
780    // Ignore null values.
781    ignore_nulls: bool,
782}
783
784impl TrivialFirstValueAccumulator {
785    /// Creates a new `TrivialFirstValueAccumulator` for the given `data_type`.
786    pub fn try_new(data_type: &DataType, ignore_nulls: bool) -> Result<Self> {
787        ScalarValue::try_from(data_type).map(|first| Self {
788            first,
789            is_set: false,
790            ignore_nulls,
791        })
792    }
793}
794
795impl Accumulator for TrivialFirstValueAccumulator {
796    fn state(&mut self) -> Result<Vec<ScalarValue>> {
797        Ok(vec![self.first.clone(), ScalarValue::from(self.is_set)])
798    }
799
800    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
801        if !self.is_set {
802            // Get first entry according to the pre-existing ordering (0th index):
803            let value = &values[0];
804            let mut first_idx = None;
805            if self.ignore_nulls {
806                // If ignoring nulls, find the first non-null value.
807                for i in 0..value.len() {
808                    if !value.is_null(i) {
809                        first_idx = Some(i);
810                        break;
811                    }
812                }
813            } else if !value.is_empty() {
814                // If not ignoring nulls, return the first value if it exists.
815                first_idx = Some(0);
816            }
817            if let Some(first_idx) = first_idx {
818                let mut row = get_row_at_idx(values, first_idx)?;
819                self.first = row.swap_remove(0);
820                self.first.compact();
821                self.is_set = true;
822            }
823        }
824        Ok(())
825    }
826
827    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
828        // FIRST_VALUE(first1, first2, first3, ...)
829        // Second index contains is_set flag.
830        if !self.is_set {
831            let flags = states[1].as_boolean();
832            let filtered_states =
833                filter_states_according_to_is_set(&states[0..1], flags)?;
834            if let Some(first) = filtered_states.first() {
835                if !first.is_empty() {
836                    self.first = ScalarValue::try_from_array(first, 0)?;
837                    self.is_set = true;
838                }
839            }
840        }
841        Ok(())
842    }
843
844    fn evaluate(&mut self) -> Result<ScalarValue> {
845        Ok(self.first.clone())
846    }
847
848    fn size(&self) -> usize {
849        size_of_val(self) - size_of_val(&self.first) + self.first.size()
850    }
851}
852
853#[derive(Debug)]
854pub struct FirstValueAccumulator {
855    first: ScalarValue,
856    // Whether we have seen the first value yet.
857    is_set: bool,
858    // Stores values of the ordering columns corresponding to the first value.
859    // These values are used during merging of multiple partitions.
860    orderings: Vec<ScalarValue>,
861    // Stores the applicable ordering requirement.
862    ordering_req: LexOrdering,
863    // Stores whether incoming data already satisfies the ordering requirement.
864    is_input_pre_ordered: bool,
865    // Ignore null values.
866    ignore_nulls: bool,
867}
868
869impl FirstValueAccumulator {
870    /// Creates a new `FirstValueAccumulator` for the given `data_type`.
871    pub fn try_new(
872        data_type: &DataType,
873        ordering_dtypes: &[DataType],
874        ordering_req: LexOrdering,
875        is_input_pre_ordered: bool,
876        ignore_nulls: bool,
877    ) -> Result<Self> {
878        let orderings = ordering_dtypes
879            .iter()
880            .map(ScalarValue::try_from)
881            .collect::<Result<_>>()?;
882        ScalarValue::try_from(data_type).map(|first| Self {
883            first,
884            is_set: false,
885            orderings,
886            ordering_req,
887            is_input_pre_ordered,
888            ignore_nulls,
889        })
890    }
891
892    // Updates state with the values in the given row.
893    fn update_with_new_row(&mut self, mut row: Vec<ScalarValue>) {
894        // Ensure any Array based scalars hold have a single value to reduce memory pressure
895        for s in row.iter_mut() {
896            s.compact();
897        }
898        self.first = row.remove(0);
899        self.orderings = row;
900        self.is_set = true;
901    }
902
903    fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
904        let [value, ordering_values @ ..] = values else {
905            return internal_err!("Empty row in FIRST_VALUE");
906        };
907        if self.is_input_pre_ordered {
908            // Get first entry according to the pre-existing ordering (0th index):
909            if self.ignore_nulls {
910                // If ignoring nulls, find the first non-null value.
911                for i in 0..value.len() {
912                    if !value.is_null(i) {
913                        return Ok(Some(i));
914                    }
915                }
916                return Ok(None);
917            } else {
918                // If not ignoring nulls, return the first value if it exists.
919                return Ok((!value.is_empty()).then_some(0));
920            }
921        }
922
923        let sort_columns = ordering_values
924            .iter()
925            .zip(self.ordering_req.iter())
926            .map(|(values, req)| SortColumn {
927                values: Arc::clone(values),
928                options: Some(req.options),
929            })
930            .collect::<Vec<_>>();
931
932        let comparator = LexicographicalComparator::try_new(&sort_columns)?;
933
934        let min_index = if self.ignore_nulls {
935            (0..value.len())
936                .filter(|&index| !value.is_null(index))
937                .min_by(|&a, &b| comparator.compare(a, b))
938        } else {
939            (0..value.len()).min_by(|&a, &b| comparator.compare(a, b))
940        };
941
942        Ok(min_index)
943    }
944}
945
946impl Accumulator for FirstValueAccumulator {
947    fn state(&mut self) -> Result<Vec<ScalarValue>> {
948        let mut result = vec![self.first.clone()];
949        result.extend(self.orderings.iter().cloned());
950        result.push(ScalarValue::from(self.is_set));
951        Ok(result)
952    }
953
954    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
955        if let Some(first_idx) = self.get_first_idx(values)? {
956            let row = get_row_at_idx(values, first_idx)?;
957            if !self.is_set
958                || (!self.is_input_pre_ordered
959                    && compare_rows(
960                        &self.orderings,
961                        &row[1..],
962                        &get_sort_options(&self.ordering_req),
963                    )?
964                    .is_gt())
965            {
966                self.update_with_new_row(row);
967            }
968        }
969        Ok(())
970    }
971
972    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
973        // FIRST_VALUE(first1, first2, first3, ...)
974        // last index contains is_set flag.
975        let is_set_idx = states.len() - 1;
976        let flags = states[is_set_idx].as_boolean();
977        let filtered_states =
978            filter_states_according_to_is_set(&states[0..is_set_idx], flags)?;
979        // 1..is_set_idx range corresponds to ordering section
980        let sort_columns =
981            convert_to_sort_cols(&filtered_states[1..is_set_idx], &self.ordering_req);
982
983        let comparator = LexicographicalComparator::try_new(&sort_columns)?;
984        let min = (0..filtered_states[0].len()).min_by(|&a, &b| comparator.compare(a, b));
985
986        if let Some(first_idx) = min {
987            let mut first_row = get_row_at_idx(&filtered_states, first_idx)?;
988            // When collecting orderings, we exclude the is_set flag from the state.
989            let first_ordering = &first_row[1..is_set_idx];
990            let sort_options = get_sort_options(&self.ordering_req);
991            // Either there is no existing value, or there is an earlier version in new data.
992            if !self.is_set
993                || compare_rows(&self.orderings, first_ordering, &sort_options)?.is_gt()
994            {
995                // Update with first value in the state. Note that we should exclude the
996                // is_set flag from the state. Otherwise, we will end up with a state
997                // containing two is_set flags.
998                assert!(is_set_idx <= first_row.len());
999                first_row.resize(is_set_idx, ScalarValue::Null);
1000                self.update_with_new_row(first_row);
1001            }
1002        }
1003        Ok(())
1004    }
1005
1006    fn evaluate(&mut self) -> Result<ScalarValue> {
1007        Ok(self.first.clone())
1008    }
1009
1010    fn size(&self) -> usize {
1011        size_of_val(self) - size_of_val(&self.first)
1012            + self.first.size()
1013            + ScalarValue::size_of_vec(&self.orderings)
1014            - size_of_val(&self.orderings)
1015    }
1016}
1017
1018#[user_doc(
1019    doc_section(label = "General Functions"),
1020    description = "Returns the last element in an aggregation group according to the requested ordering. If no ordering is given, returns an arbitrary element from the group.",
1021    syntax_example = "last_value(expression [ORDER BY expression])",
1022    sql_example = r#"```sql
1023> SELECT last_value(column_name ORDER BY other_column) FROM table_name;
1024+-----------------------------------------------+
1025| last_value(column_name ORDER BY other_column) |
1026+-----------------------------------------------+
1027| last_element                                  |
1028+-----------------------------------------------+
1029```"#,
1030    standard_argument(name = "expression",)
1031)]
1032pub struct LastValue {
1033    signature: Signature,
1034    is_input_pre_ordered: bool,
1035}
1036
1037impl Debug for LastValue {
1038    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1039        f.debug_struct("LastValue")
1040            .field("name", &self.name())
1041            .field("signature", &self.signature)
1042            .field("accumulator", &"<FUNC>")
1043            .finish()
1044    }
1045}
1046
1047impl Default for LastValue {
1048    fn default() -> Self {
1049        Self::new()
1050    }
1051}
1052
1053impl LastValue {
1054    pub fn new() -> Self {
1055        Self {
1056            signature: Signature::any(1, Volatility::Immutable),
1057            is_input_pre_ordered: false,
1058        }
1059    }
1060}
1061
1062impl AggregateUDFImpl for LastValue {
1063    fn as_any(&self) -> &dyn Any {
1064        self
1065    }
1066
1067    fn name(&self) -> &str {
1068        "last_value"
1069    }
1070
1071    fn signature(&self) -> &Signature {
1072        &self.signature
1073    }
1074
1075    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
1076        Ok(arg_types[0].clone())
1077    }
1078
1079    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
1080        let Some(ordering) = LexOrdering::new(acc_args.order_bys.to_vec()) else {
1081            return TrivialLastValueAccumulator::try_new(
1082                acc_args.return_field.data_type(),
1083                acc_args.ignore_nulls,
1084            )
1085            .map(|acc| Box::new(acc) as _);
1086        };
1087        let ordering_dtypes = ordering
1088            .iter()
1089            .map(|e| e.expr.data_type(acc_args.schema))
1090            .collect::<Result<Vec<_>>>()?;
1091        Ok(Box::new(LastValueAccumulator::try_new(
1092            acc_args.return_field.data_type(),
1093            &ordering_dtypes,
1094            ordering,
1095            self.is_input_pre_ordered,
1096            acc_args.ignore_nulls,
1097        )?))
1098    }
1099
1100    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
1101        let mut fields = vec![Field::new(
1102            format_state_name(args.name, "last_value"),
1103            args.return_field.data_type().clone(),
1104            true,
1105        )
1106        .into()];
1107        fields.extend(args.ordering_fields.iter().cloned());
1108        fields.push(Field::new("is_set", DataType::Boolean, true).into());
1109        Ok(fields)
1110    }
1111
1112    fn with_beneficial_ordering(
1113        self: Arc<Self>,
1114        beneficial_ordering: bool,
1115    ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
1116        Ok(Some(Arc::new(Self {
1117            signature: self.signature.clone(),
1118            is_input_pre_ordered: beneficial_ordering,
1119        })))
1120    }
1121
1122    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
1123        AggregateOrderSensitivity::Beneficial
1124    }
1125
1126    fn reverse_expr(&self) -> ReversedUDAF {
1127        ReversedUDAF::Reversed(first_value_udaf())
1128    }
1129
1130    fn documentation(&self) -> Option<&Documentation> {
1131        self.doc()
1132    }
1133
1134    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
1135        use DataType::*;
1136        !args.order_bys.is_empty()
1137            && matches!(
1138                args.return_field.data_type(),
1139                Int8 | Int16
1140                    | Int32
1141                    | Int64
1142                    | UInt8
1143                    | UInt16
1144                    | UInt32
1145                    | UInt64
1146                    | Float16
1147                    | Float32
1148                    | Float64
1149                    | Decimal128(_, _)
1150                    | Decimal256(_, _)
1151                    | Date32
1152                    | Date64
1153                    | Time32(_)
1154                    | Time64(_)
1155                    | Timestamp(_, _)
1156            )
1157    }
1158
1159    fn create_groups_accumulator(
1160        &self,
1161        args: AccumulatorArgs,
1162    ) -> Result<Box<dyn GroupsAccumulator>> {
1163        fn create_accumulator<T>(
1164            args: AccumulatorArgs,
1165        ) -> Result<Box<dyn GroupsAccumulator>>
1166        where
1167            T: ArrowPrimitiveType + Send,
1168        {
1169            let Some(ordering) = LexOrdering::new(args.order_bys.to_vec()) else {
1170                return internal_err!("Groups accumulator must have an ordering.");
1171            };
1172
1173            let ordering_dtypes = ordering
1174                .iter()
1175                .map(|e| e.expr.data_type(args.schema))
1176                .collect::<Result<Vec<_>>>()?;
1177
1178            Ok(Box::new(FirstPrimitiveGroupsAccumulator::<T>::try_new(
1179                ordering,
1180                args.ignore_nulls,
1181                args.return_field.data_type(),
1182                &ordering_dtypes,
1183                false,
1184            )?))
1185        }
1186
1187        match args.return_field.data_type() {
1188            DataType::Int8 => create_accumulator::<Int8Type>(args),
1189            DataType::Int16 => create_accumulator::<Int16Type>(args),
1190            DataType::Int32 => create_accumulator::<Int32Type>(args),
1191            DataType::Int64 => create_accumulator::<Int64Type>(args),
1192            DataType::UInt8 => create_accumulator::<UInt8Type>(args),
1193            DataType::UInt16 => create_accumulator::<UInt16Type>(args),
1194            DataType::UInt32 => create_accumulator::<UInt32Type>(args),
1195            DataType::UInt64 => create_accumulator::<UInt64Type>(args),
1196            DataType::Float16 => create_accumulator::<Float16Type>(args),
1197            DataType::Float32 => create_accumulator::<Float32Type>(args),
1198            DataType::Float64 => create_accumulator::<Float64Type>(args),
1199
1200            DataType::Decimal128(_, _) => create_accumulator::<Decimal128Type>(args),
1201            DataType::Decimal256(_, _) => create_accumulator::<Decimal256Type>(args),
1202
1203            DataType::Timestamp(TimeUnit::Second, _) => {
1204                create_accumulator::<TimestampSecondType>(args)
1205            }
1206            DataType::Timestamp(TimeUnit::Millisecond, _) => {
1207                create_accumulator::<TimestampMillisecondType>(args)
1208            }
1209            DataType::Timestamp(TimeUnit::Microsecond, _) => {
1210                create_accumulator::<TimestampMicrosecondType>(args)
1211            }
1212            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
1213                create_accumulator::<TimestampNanosecondType>(args)
1214            }
1215
1216            DataType::Date32 => create_accumulator::<Date32Type>(args),
1217            DataType::Date64 => create_accumulator::<Date64Type>(args),
1218            DataType::Time32(TimeUnit::Second) => {
1219                create_accumulator::<Time32SecondType>(args)
1220            }
1221            DataType::Time32(TimeUnit::Millisecond) => {
1222                create_accumulator::<Time32MillisecondType>(args)
1223            }
1224
1225            DataType::Time64(TimeUnit::Microsecond) => {
1226                create_accumulator::<Time64MicrosecondType>(args)
1227            }
1228            DataType::Time64(TimeUnit::Nanosecond) => {
1229                create_accumulator::<Time64NanosecondType>(args)
1230            }
1231
1232            _ => {
1233                internal_err!(
1234                    "GroupsAccumulator not supported for last_value({})",
1235                    args.return_field.data_type()
1236                )
1237            }
1238        }
1239    }
1240
1241    fn equals(&self, other: &dyn AggregateUDFImpl) -> bool {
1242        let Some(other) = other.as_any().downcast_ref::<Self>() else {
1243            return false;
1244        };
1245        let Self {
1246            signature,
1247            is_input_pre_ordered,
1248        } = self;
1249        signature == &other.signature
1250            && is_input_pre_ordered == &other.is_input_pre_ordered
1251    }
1252
1253    fn hash_value(&self) -> u64 {
1254        let Self {
1255            signature,
1256            is_input_pre_ordered,
1257        } = self;
1258        let mut hasher = DefaultHasher::new();
1259        std::any::type_name::<Self>().hash(&mut hasher);
1260        signature.hash(&mut hasher);
1261        is_input_pre_ordered.hash(&mut hasher);
1262        hasher.finish()
1263    }
1264}
1265
1266/// This accumulator is used when there is no ordering specified for the
1267/// `LAST_VALUE` aggregation. It simply updates the last value it sees
1268/// according to the pre-existing ordering of the input data, and provides
1269/// a fast path for this case without needing to maintain any ordering state.
1270#[derive(Debug)]
1271pub struct TrivialLastValueAccumulator {
1272    last: ScalarValue,
1273    // The `is_set` flag keeps track of whether the last value is finalized.
1274    // This information is used to discriminate genuine NULLs and NULLS that
1275    // occur due to empty partitions.
1276    is_set: bool,
1277    // Ignore null values.
1278    ignore_nulls: bool,
1279}
1280
1281impl TrivialLastValueAccumulator {
1282    /// Creates a new `TrivialLastValueAccumulator` for the given `data_type`.
1283    pub fn try_new(data_type: &DataType, ignore_nulls: bool) -> Result<Self> {
1284        ScalarValue::try_from(data_type).map(|last| Self {
1285            last,
1286            is_set: false,
1287            ignore_nulls,
1288        })
1289    }
1290}
1291
1292impl Accumulator for TrivialLastValueAccumulator {
1293    fn state(&mut self) -> Result<Vec<ScalarValue>> {
1294        Ok(vec![self.last.clone(), ScalarValue::from(self.is_set)])
1295    }
1296
1297    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1298        // Get last entry according to the pre-existing ordering (0th index):
1299        let value = &values[0];
1300        let mut last_idx = None;
1301        if self.ignore_nulls {
1302            // If ignoring nulls, find the last non-null value.
1303            for i in (0..value.len()).rev() {
1304                if !value.is_null(i) {
1305                    last_idx = Some(i);
1306                    break;
1307                }
1308            }
1309        } else if !value.is_empty() {
1310            // If not ignoring nulls, return the last value if it exists.
1311            last_idx = Some(value.len() - 1);
1312        }
1313        if let Some(last_idx) = last_idx {
1314            let mut row = get_row_at_idx(values, last_idx)?;
1315            self.last = row.swap_remove(0);
1316            self.last.compact();
1317            self.is_set = true;
1318        }
1319        Ok(())
1320    }
1321
1322    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1323        // LAST_VALUE(last1, last2, last3, ...)
1324        // Second index contains is_set flag.
1325        let flags = states[1].as_boolean();
1326        let filtered_states = filter_states_according_to_is_set(&states[0..1], flags)?;
1327        if let Some(last) = filtered_states.last() {
1328            if !last.is_empty() {
1329                self.last = ScalarValue::try_from_array(last, 0)?;
1330                self.is_set = true;
1331            }
1332        }
1333        Ok(())
1334    }
1335
1336    fn evaluate(&mut self) -> Result<ScalarValue> {
1337        Ok(self.last.clone())
1338    }
1339
1340    fn size(&self) -> usize {
1341        size_of_val(self) - size_of_val(&self.last) + self.last.size()
1342    }
1343}
1344
1345#[derive(Debug)]
1346struct LastValueAccumulator {
1347    last: ScalarValue,
1348    // The `is_set` flag keeps track of whether the last value is finalized.
1349    // This information is used to discriminate genuine NULLs and NULLS that
1350    // occur due to empty partitions.
1351    is_set: bool,
1352    // Stores values of the ordering columns corresponding to the first value.
1353    // These values are used during merging of multiple partitions.
1354    orderings: Vec<ScalarValue>,
1355    // Stores the applicable ordering requirement.
1356    ordering_req: LexOrdering,
1357    // Stores whether incoming data already satisfies the ordering requirement.
1358    is_input_pre_ordered: bool,
1359    // Ignore null values.
1360    ignore_nulls: bool,
1361}
1362
1363impl LastValueAccumulator {
1364    /// Creates a new `LastValueAccumulator` for the given `data_type`.
1365    pub fn try_new(
1366        data_type: &DataType,
1367        ordering_dtypes: &[DataType],
1368        ordering_req: LexOrdering,
1369        is_input_pre_ordered: bool,
1370        ignore_nulls: bool,
1371    ) -> Result<Self> {
1372        let orderings = ordering_dtypes
1373            .iter()
1374            .map(ScalarValue::try_from)
1375            .collect::<Result<_>>()?;
1376        ScalarValue::try_from(data_type).map(|last| Self {
1377            last,
1378            is_set: false,
1379            orderings,
1380            ordering_req,
1381            is_input_pre_ordered,
1382            ignore_nulls,
1383        })
1384    }
1385
1386    // Updates state with the values in the given row.
1387    fn update_with_new_row(&mut self, mut row: Vec<ScalarValue>) {
1388        // Ensure any Array based scalars hold have a single value to reduce memory pressure
1389        for s in row.iter_mut() {
1390            s.compact();
1391        }
1392        self.last = row.remove(0);
1393        self.orderings = row;
1394        self.is_set = true;
1395    }
1396
1397    fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
1398        let [value, ordering_values @ ..] = values else {
1399            return internal_err!("Empty row in LAST_VALUE");
1400        };
1401        if self.is_input_pre_ordered {
1402            // Get last entry according to the order of data:
1403            if self.ignore_nulls {
1404                // If ignoring nulls, find the last non-null value.
1405                for i in (0..value.len()).rev() {
1406                    if !value.is_null(i) {
1407                        return Ok(Some(i));
1408                    }
1409                }
1410                return Ok(None);
1411            } else {
1412                return Ok((!value.is_empty()).then_some(value.len() - 1));
1413            }
1414        }
1415
1416        let sort_columns = ordering_values
1417            .iter()
1418            .zip(self.ordering_req.iter())
1419            .map(|(values, req)| SortColumn {
1420                values: Arc::clone(values),
1421                options: Some(req.options),
1422            })
1423            .collect::<Vec<_>>();
1424
1425        let comparator = LexicographicalComparator::try_new(&sort_columns)?;
1426        let max_ind = if self.ignore_nulls {
1427            (0..value.len())
1428                .filter(|&index| !(value.is_null(index)))
1429                .max_by(|&a, &b| comparator.compare(a, b))
1430        } else {
1431            (0..value.len()).max_by(|&a, &b| comparator.compare(a, b))
1432        };
1433
1434        Ok(max_ind)
1435    }
1436}
1437
1438impl Accumulator for LastValueAccumulator {
1439    fn state(&mut self) -> Result<Vec<ScalarValue>> {
1440        let mut result = vec![self.last.clone()];
1441        result.extend(self.orderings.clone());
1442        result.push(ScalarValue::from(self.is_set));
1443        Ok(result)
1444    }
1445
1446    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1447        if let Some(last_idx) = self.get_last_idx(values)? {
1448            let row = get_row_at_idx(values, last_idx)?;
1449            let orderings = &row[1..];
1450            // Update when there is a more recent entry
1451            if !self.is_set
1452                || self.is_input_pre_ordered
1453                || compare_rows(
1454                    &self.orderings,
1455                    orderings,
1456                    &get_sort_options(&self.ordering_req),
1457                )?
1458                .is_lt()
1459            {
1460                self.update_with_new_row(row);
1461            }
1462        }
1463        Ok(())
1464    }
1465
1466    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1467        // LAST_VALUE(last1, last2, last3, ...)
1468        // last index contains is_set flag.
1469        let is_set_idx = states.len() - 1;
1470        let flags = states[is_set_idx].as_boolean();
1471        let filtered_states =
1472            filter_states_according_to_is_set(&states[0..is_set_idx], flags)?;
1473        // 1..is_set_idx range corresponds to ordering section
1474        let sort_columns =
1475            convert_to_sort_cols(&filtered_states[1..is_set_idx], &self.ordering_req);
1476
1477        let comparator = LexicographicalComparator::try_new(&sort_columns)?;
1478        let max = (0..filtered_states[0].len()).max_by(|&a, &b| comparator.compare(a, b));
1479
1480        if let Some(last_idx) = max {
1481            let mut last_row = get_row_at_idx(&filtered_states, last_idx)?;
1482            // When collecting orderings, we exclude the is_set flag from the state.
1483            let last_ordering = &last_row[1..is_set_idx];
1484            let sort_options = get_sort_options(&self.ordering_req);
1485            // Either there is no existing value, or there is a newer (latest)
1486            // version in the new data:
1487            if !self.is_set
1488                || self.is_input_pre_ordered
1489                || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt()
1490            {
1491                // Update with last value in the state. Note that we should exclude the
1492                // is_set flag from the state. Otherwise, we will end up with a state
1493                // containing two is_set flags.
1494                assert!(is_set_idx <= last_row.len());
1495                last_row.resize(is_set_idx, ScalarValue::Null);
1496                self.update_with_new_row(last_row);
1497            }
1498        }
1499        Ok(())
1500    }
1501
1502    fn evaluate(&mut self) -> Result<ScalarValue> {
1503        Ok(self.last.clone())
1504    }
1505
1506    fn size(&self) -> usize {
1507        size_of_val(self) - size_of_val(&self.last)
1508            + self.last.size()
1509            + ScalarValue::size_of_vec(&self.orderings)
1510            - size_of_val(&self.orderings)
1511    }
1512}
1513
1514/// Filters states according to the `is_set` flag at the last column and returns
1515/// the resulting states.
1516fn filter_states_according_to_is_set(
1517    states: &[ArrayRef],
1518    flags: &BooleanArray,
1519) -> Result<Vec<ArrayRef>> {
1520    states
1521        .iter()
1522        .map(|state| compute::filter(state, flags).map_err(|e| arrow_datafusion_err!(e)))
1523        .collect()
1524}
1525
1526/// Combines array refs and their corresponding orderings to construct `SortColumn`s.
1527fn convert_to_sort_cols(arrs: &[ArrayRef], sort_exprs: &LexOrdering) -> Vec<SortColumn> {
1528    arrs.iter()
1529        .zip(sort_exprs.iter())
1530        .map(|(item, sort_expr)| SortColumn {
1531            values: Arc::clone(item),
1532            options: Some(sort_expr.options),
1533        })
1534        .collect()
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539    use std::iter::repeat_with;
1540
1541    use arrow::{
1542        array::{Int64Array, ListArray},
1543        compute::SortOptions,
1544        datatypes::Schema,
1545    };
1546    use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
1547
1548    use super::*;
1549
1550    #[test]
1551    fn test_first_last_value_value() -> Result<()> {
1552        let mut first_accumulator =
1553            TrivialFirstValueAccumulator::try_new(&DataType::Int64, false)?;
1554        let mut last_accumulator =
1555            TrivialLastValueAccumulator::try_new(&DataType::Int64, false)?;
1556        // first value in the tuple is start of the range (inclusive),
1557        // second value in the tuple is end of the range (exclusive)
1558        let ranges: Vec<(i64, i64)> = vec![(0, 10), (1, 11), (2, 13)];
1559        // create 3 ArrayRefs between each interval e.g from 0 to 9, 1 to 10, 2 to 12
1560        let arrs = ranges
1561            .into_iter()
1562            .map(|(start, end)| {
1563                Arc::new(Int64Array::from((start..end).collect::<Vec<_>>())) as ArrayRef
1564            })
1565            .collect::<Vec<_>>();
1566        for arr in arrs {
1567            // Once first_value is set, accumulator should remember it.
1568            // It shouldn't update first_value for each new batch
1569            first_accumulator.update_batch(&[Arc::clone(&arr)])?;
1570            // last_value should be updated for each new batch.
1571            last_accumulator.update_batch(&[arr])?;
1572        }
1573        // First Value comes from the first value of the first batch which is 0
1574        assert_eq!(first_accumulator.evaluate()?, ScalarValue::Int64(Some(0)));
1575        // Last value comes from the last value of the last batch which is 12
1576        assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(12)));
1577        Ok(())
1578    }
1579
1580    #[test]
1581    fn test_first_last_state_after_merge() -> Result<()> {
1582        let ranges: Vec<(i64, i64)> = vec![(0, 10), (1, 11), (2, 13)];
1583        // create 3 ArrayRefs between each interval e.g from 0 to 9, 1 to 10, 2 to 12
1584        let arrs = ranges
1585            .into_iter()
1586            .map(|(start, end)| {
1587                Arc::new((start..end).collect::<Int64Array>()) as ArrayRef
1588            })
1589            .collect::<Vec<_>>();
1590
1591        // FirstValueAccumulator
1592        let mut first_accumulator =
1593            TrivialFirstValueAccumulator::try_new(&DataType::Int64, false)?;
1594
1595        first_accumulator.update_batch(&[Arc::clone(&arrs[0])])?;
1596        let state1 = first_accumulator.state()?;
1597
1598        let mut first_accumulator =
1599            TrivialFirstValueAccumulator::try_new(&DataType::Int64, false)?;
1600        first_accumulator.update_batch(&[Arc::clone(&arrs[1])])?;
1601        let state2 = first_accumulator.state()?;
1602
1603        assert_eq!(state1.len(), state2.len());
1604
1605        let mut states = vec![];
1606
1607        for idx in 0..state1.len() {
1608            states.push(compute::concat(&[
1609                &state1[idx].to_array()?,
1610                &state2[idx].to_array()?,
1611            ])?);
1612        }
1613
1614        let mut first_accumulator =
1615            TrivialFirstValueAccumulator::try_new(&DataType::Int64, false)?;
1616        first_accumulator.merge_batch(&states)?;
1617
1618        let merged_state = first_accumulator.state()?;
1619        assert_eq!(merged_state.len(), state1.len());
1620
1621        // LastValueAccumulator
1622        let mut last_accumulator =
1623            TrivialLastValueAccumulator::try_new(&DataType::Int64, false)?;
1624
1625        last_accumulator.update_batch(&[Arc::clone(&arrs[0])])?;
1626        let state1 = last_accumulator.state()?;
1627
1628        let mut last_accumulator =
1629            TrivialLastValueAccumulator::try_new(&DataType::Int64, false)?;
1630        last_accumulator.update_batch(&[Arc::clone(&arrs[1])])?;
1631        let state2 = last_accumulator.state()?;
1632
1633        assert_eq!(state1.len(), state2.len());
1634
1635        let mut states = vec![];
1636
1637        for idx in 0..state1.len() {
1638            states.push(compute::concat(&[
1639                &state1[idx].to_array()?,
1640                &state2[idx].to_array()?,
1641            ])?);
1642        }
1643
1644        let mut last_accumulator =
1645            TrivialLastValueAccumulator::try_new(&DataType::Int64, false)?;
1646        last_accumulator.merge_batch(&states)?;
1647
1648        let merged_state = last_accumulator.state()?;
1649        assert_eq!(merged_state.len(), state1.len());
1650
1651        Ok(())
1652    }
1653
1654    #[test]
1655    fn test_first_group_acc() -> Result<()> {
1656        let schema = Arc::new(Schema::new(vec![
1657            Field::new("a", DataType::Int64, true),
1658            Field::new("b", DataType::Int64, true),
1659            Field::new("c", DataType::Int64, true),
1660            Field::new("d", DataType::Int32, true),
1661            Field::new("e", DataType::Boolean, true),
1662        ]));
1663
1664        let sort_keys = [PhysicalSortExpr {
1665            expr: col("c", &schema).unwrap(),
1666            options: SortOptions::default(),
1667        }];
1668
1669        let mut group_acc = FirstPrimitiveGroupsAccumulator::<Int64Type>::try_new(
1670            sort_keys.into(),
1671            true,
1672            &DataType::Int64,
1673            &[DataType::Int64],
1674            true,
1675        )?;
1676
1677        let mut val_with_orderings = {
1678            let mut val_with_orderings = Vec::<ArrayRef>::new();
1679
1680            let vals = Arc::new(Int64Array::from(vec![Some(1), None, Some(3), Some(-6)]));
1681            let orderings = Arc::new(Int64Array::from(vec![1, -9, 3, -6]));
1682
1683            val_with_orderings.push(vals);
1684            val_with_orderings.push(orderings);
1685
1686            val_with_orderings
1687        };
1688
1689        group_acc.update_batch(
1690            &val_with_orderings,
1691            &[0, 1, 2, 1],
1692            Some(&BooleanArray::from(vec![true, true, false, true])),
1693            3,
1694        )?;
1695        assert_eq!(
1696            group_acc.size_of_orderings,
1697            group_acc.compute_size_of_orderings()
1698        );
1699
1700        let state = group_acc.state(EmitTo::All)?;
1701
1702        let expected_state: Vec<Arc<dyn Array>> = vec![
1703            Arc::new(Int64Array::from(vec![Some(1), Some(-6), None])),
1704            Arc::new(Int64Array::from(vec![Some(1), Some(-6), None])),
1705            Arc::new(BooleanArray::from(vec![true, true, false])),
1706        ];
1707        assert_eq!(state, expected_state);
1708
1709        assert_eq!(
1710            group_acc.size_of_orderings,
1711            group_acc.compute_size_of_orderings()
1712        );
1713
1714        group_acc.merge_batch(
1715            &state,
1716            &[0, 1, 2],
1717            Some(&BooleanArray::from(vec![true, false, false])),
1718            3,
1719        )?;
1720
1721        assert_eq!(
1722            group_acc.size_of_orderings,
1723            group_acc.compute_size_of_orderings()
1724        );
1725
1726        val_with_orderings.clear();
1727        val_with_orderings.push(Arc::new(Int64Array::from(vec![6, 6])));
1728        val_with_orderings.push(Arc::new(Int64Array::from(vec![6, 6])));
1729
1730        group_acc.update_batch(&val_with_orderings, &[1, 2], None, 4)?;
1731
1732        let binding = group_acc.evaluate(EmitTo::All)?;
1733        let eval_result = binding.as_any().downcast_ref::<Int64Array>().unwrap();
1734
1735        let expect: PrimitiveArray<Int64Type> =
1736            Int64Array::from(vec![Some(1), Some(6), Some(6), None]);
1737
1738        assert_eq!(eval_result, &expect);
1739
1740        assert_eq!(
1741            group_acc.size_of_orderings,
1742            group_acc.compute_size_of_orderings()
1743        );
1744
1745        Ok(())
1746    }
1747
1748    #[test]
1749    fn test_group_acc_size_of_ordering() -> Result<()> {
1750        let schema = Arc::new(Schema::new(vec![
1751            Field::new("a", DataType::Int64, true),
1752            Field::new("b", DataType::Int64, true),
1753            Field::new("c", DataType::Int64, true),
1754            Field::new("d", DataType::Int32, true),
1755            Field::new("e", DataType::Boolean, true),
1756        ]));
1757
1758        let sort_keys = [PhysicalSortExpr {
1759            expr: col("c", &schema).unwrap(),
1760            options: SortOptions::default(),
1761        }];
1762
1763        let mut group_acc = FirstPrimitiveGroupsAccumulator::<Int64Type>::try_new(
1764            sort_keys.into(),
1765            true,
1766            &DataType::Int64,
1767            &[DataType::Int64],
1768            true,
1769        )?;
1770
1771        let val_with_orderings = {
1772            let mut val_with_orderings = Vec::<ArrayRef>::new();
1773
1774            let vals = Arc::new(Int64Array::from(vec![Some(1), None, Some(3), Some(-6)]));
1775            let orderings = Arc::new(Int64Array::from(vec![1, -9, 3, -6]));
1776
1777            val_with_orderings.push(vals);
1778            val_with_orderings.push(orderings);
1779
1780            val_with_orderings
1781        };
1782
1783        for _ in 0..10 {
1784            group_acc.update_batch(
1785                &val_with_orderings,
1786                &[0, 1, 2, 1],
1787                Some(&BooleanArray::from(vec![true, true, false, true])),
1788                100,
1789            )?;
1790            assert_eq!(
1791                group_acc.size_of_orderings,
1792                group_acc.compute_size_of_orderings()
1793            );
1794
1795            group_acc.state(EmitTo::First(2))?;
1796            assert_eq!(
1797                group_acc.size_of_orderings,
1798                group_acc.compute_size_of_orderings()
1799            );
1800
1801            let s = group_acc.state(EmitTo::All)?;
1802            assert_eq!(
1803                group_acc.size_of_orderings,
1804                group_acc.compute_size_of_orderings()
1805            );
1806
1807            group_acc.merge_batch(&s, &Vec::from_iter(0..s[0].len()), None, 100)?;
1808            assert_eq!(
1809                group_acc.size_of_orderings,
1810                group_acc.compute_size_of_orderings()
1811            );
1812
1813            group_acc.evaluate(EmitTo::First(2))?;
1814            assert_eq!(
1815                group_acc.size_of_orderings,
1816                group_acc.compute_size_of_orderings()
1817            );
1818
1819            group_acc.evaluate(EmitTo::All)?;
1820            assert_eq!(
1821                group_acc.size_of_orderings,
1822                group_acc.compute_size_of_orderings()
1823            );
1824        }
1825
1826        Ok(())
1827    }
1828
1829    #[test]
1830    fn test_last_group_acc() -> Result<()> {
1831        let schema = Arc::new(Schema::new(vec![
1832            Field::new("a", DataType::Int64, true),
1833            Field::new("b", DataType::Int64, true),
1834            Field::new("c", DataType::Int64, true),
1835            Field::new("d", DataType::Int32, true),
1836            Field::new("e", DataType::Boolean, true),
1837        ]));
1838
1839        let sort_keys = [PhysicalSortExpr {
1840            expr: col("c", &schema).unwrap(),
1841            options: SortOptions::default(),
1842        }];
1843
1844        let mut group_acc = FirstPrimitiveGroupsAccumulator::<Int64Type>::try_new(
1845            sort_keys.into(),
1846            true,
1847            &DataType::Int64,
1848            &[DataType::Int64],
1849            false,
1850        )?;
1851
1852        let mut val_with_orderings = {
1853            let mut val_with_orderings = Vec::<ArrayRef>::new();
1854
1855            let vals = Arc::new(Int64Array::from(vec![Some(1), None, Some(3), Some(-6)]));
1856            let orderings = Arc::new(Int64Array::from(vec![1, -9, 3, -6]));
1857
1858            val_with_orderings.push(vals);
1859            val_with_orderings.push(orderings);
1860
1861            val_with_orderings
1862        };
1863
1864        group_acc.update_batch(
1865            &val_with_orderings,
1866            &[0, 1, 2, 1],
1867            Some(&BooleanArray::from(vec![true, true, false, true])),
1868            3,
1869        )?;
1870
1871        let state = group_acc.state(EmitTo::All)?;
1872
1873        let expected_state: Vec<Arc<dyn Array>> = vec![
1874            Arc::new(Int64Array::from(vec![Some(1), Some(-6), None])),
1875            Arc::new(Int64Array::from(vec![Some(1), Some(-6), None])),
1876            Arc::new(BooleanArray::from(vec![true, true, false])),
1877        ];
1878        assert_eq!(state, expected_state);
1879
1880        group_acc.merge_batch(
1881            &state,
1882            &[0, 1, 2],
1883            Some(&BooleanArray::from(vec![true, false, false])),
1884            3,
1885        )?;
1886
1887        val_with_orderings.clear();
1888        val_with_orderings.push(Arc::new(Int64Array::from(vec![66, 6])));
1889        val_with_orderings.push(Arc::new(Int64Array::from(vec![66, 6])));
1890
1891        group_acc.update_batch(&val_with_orderings, &[1, 2], None, 4)?;
1892
1893        let binding = group_acc.evaluate(EmitTo::All)?;
1894        let eval_result = binding.as_any().downcast_ref::<Int64Array>().unwrap();
1895
1896        let expect: PrimitiveArray<Int64Type> =
1897            Int64Array::from(vec![Some(1), Some(66), Some(6), None]);
1898
1899        assert_eq!(eval_result, &expect);
1900
1901        Ok(())
1902    }
1903
1904    #[test]
1905    fn test_first_list_acc_size() -> Result<()> {
1906        fn size_after_batch(values: &[ArrayRef]) -> Result<usize> {
1907            let mut first_accumulator = TrivialFirstValueAccumulator::try_new(
1908                &DataType::List(Arc::new(Field::new_list_field(DataType::Int64, false))),
1909                false,
1910            )?;
1911
1912            first_accumulator.update_batch(values)?;
1913
1914            Ok(first_accumulator.size())
1915        }
1916
1917        let batch1 = ListArray::from_iter_primitive::<Int32Type, _, _>(
1918            repeat_with(|| Some(vec![Some(1)])).take(10000),
1919        );
1920        let batch2 =
1921            ListArray::from_iter_primitive::<Int32Type, _, _>([Some(vec![Some(1)])]);
1922
1923        let size1 = size_after_batch(&[Arc::new(batch1)])?;
1924        let size2 = size_after_batch(&[Arc::new(batch2)])?;
1925        assert_eq!(size1, size2);
1926
1927        Ok(())
1928    }
1929
1930    #[test]
1931    fn test_last_list_acc_size() -> Result<()> {
1932        fn size_after_batch(values: &[ArrayRef]) -> Result<usize> {
1933            let mut last_accumulator = TrivialLastValueAccumulator::try_new(
1934                &DataType::List(Arc::new(Field::new_list_field(DataType::Int64, false))),
1935                false,
1936            )?;
1937
1938            last_accumulator.update_batch(values)?;
1939
1940            Ok(last_accumulator.size())
1941        }
1942
1943        let batch1 = ListArray::from_iter_primitive::<Int32Type, _, _>(
1944            repeat_with(|| Some(vec![Some(1)])).take(10000),
1945        );
1946        let batch2 =
1947            ListArray::from_iter_primitive::<Int32Type, _, _>([Some(vec![Some(1)])]);
1948
1949        let size1 = size_after_batch(&[Arc::new(batch1)])?;
1950        let size2 = size_after_batch(&[Arc::new(batch2)])?;
1951        assert_eq!(size1, size2);
1952
1953        Ok(())
1954    }
1955}