1use std::any::Any;
21use std::fmt::Debug;
22use std::hash::{DefaultHasher, Hash, Hasher};
23use std::mem::size_of_val;
24use std::sync::Arc;
25
26use arrow::array::{
27 Array, ArrayRef, ArrowPrimitiveType, AsArray, BooleanArray, BooleanBufferBuilder,
28 PrimitiveArray,
29};
30use arrow::buffer::{BooleanBuffer, NullBuffer};
31use arrow::compute::{self, LexicographicalComparator, SortColumn, SortOptions};
32use arrow::datatypes::{
33 DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field, FieldRef,
34 Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
35 Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
36 TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
37 TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
38 UInt8Type,
39};
40use datafusion_common::cast::as_boolean_array;
41use datafusion_common::utils::{compare_rows, extract_row_at_idx_to_buf, get_row_at_idx};
42use datafusion_common::{
43 arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
44};
45use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
46use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity};
47use datafusion_expr::{
48 Accumulator, AggregateUDFImpl, Documentation, EmitTo, Expr, ExprFunctionExt,
49 GroupsAccumulator, ReversedUDAF, Signature, SortExpr, Volatility,
50};
51use datafusion_functions_aggregate_common::utils::get_sort_options;
52use datafusion_macros::user_doc;
53use datafusion_physical_expr_common::sort_expr::LexOrdering;
54
// Instantiate the helper functions for the two aggregates defined in this file.
// NOTE(review): `create_func!` is declared outside this chunk; judging by the
// calls to `first_value_udaf()` / `last_value_udaf()` further down, it
// presumably generates the accessor named by its second argument — confirm
// against the macro definition.
create_func!(FirstValue, first_value_udaf);
create_func!(LastValue, last_value_udaf);
57
58pub fn first_value(expression: Expr, order_by: Vec<SortExpr>) -> Expr {
60 first_value_udaf()
61 .call(vec![expression])
62 .order_by(order_by)
63 .build()
64 .unwrap()
66}
67
68pub fn last_value(expression: Expr, order_by: Vec<SortExpr>) -> Expr {
70 last_value_udaf()
71 .call(vec![expression])
72 .order_by(order_by)
73 .build()
74 .unwrap()
76}
77
#[user_doc(
    doc_section(label = "General Functions"),
    description = "Returns the first element in an aggregation group according to the requested ordering. If no ordering is given, returns an arbitrary element from the group.",
    syntax_example = "first_value(expression [ORDER BY expression])",
    sql_example = r#"```sql
> SELECT first_value(column_name ORDER BY other_column) FROM table_name;
+-----------------------------------------------+
| first_value(column_name ORDER BY other_column)|
+-----------------------------------------------+
| first_element |
+-----------------------------------------------+
```"#,
    standard_argument(name = "expression",)
)]
/// The `first_value` aggregate function.
pub struct FirstValue {
    /// Argument signature; `new()` sets it to
    /// `Signature::any(1, Volatility::Immutable)`.
    signature: Signature,
    /// Whether the accumulator may rely on the input already being ordered.
    /// Starts as `false`; `with_beneficial_ordering` produces a copy with the
    /// requested value, and `accumulator` forwards it to
    /// `FirstValueAccumulator::try_new`.
    is_input_pre_ordered: bool,
}
96
97impl Debug for FirstValue {
98 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
99 f.debug_struct("FirstValue")
100 .field("name", &self.name())
101 .field("signature", &self.signature)
102 .field("accumulator", &"<FUNC>")
103 .finish()
104 }
105}
106
107impl Default for FirstValue {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112
113impl FirstValue {
114 pub fn new() -> Self {
115 Self {
116 signature: Signature::any(1, Volatility::Immutable),
117 is_input_pre_ordered: false,
118 }
119 }
120}
121
122impl AggregateUDFImpl for FirstValue {
123 fn as_any(&self) -> &dyn Any {
124 self
125 }
126
127 fn name(&self) -> &str {
128 "first_value"
129 }
130
131 fn signature(&self) -> &Signature {
132 &self.signature
133 }
134
135 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
136 Ok(arg_types[0].clone())
137 }
138
139 fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
140 let Some(ordering) = LexOrdering::new(acc_args.order_bys.to_vec()) else {
141 return TrivialFirstValueAccumulator::try_new(
142 acc_args.return_field.data_type(),
143 acc_args.ignore_nulls,
144 )
145 .map(|acc| Box::new(acc) as _);
146 };
147 let ordering_dtypes = ordering
148 .iter()
149 .map(|e| e.expr.data_type(acc_args.schema))
150 .collect::<Result<Vec<_>>>()?;
151 Ok(Box::new(FirstValueAccumulator::try_new(
152 acc_args.return_field.data_type(),
153 &ordering_dtypes,
154 ordering,
155 self.is_input_pre_ordered,
156 acc_args.ignore_nulls,
157 )?))
158 }
159
160 fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
161 let mut fields = vec![Field::new(
162 format_state_name(args.name, "first_value"),
163 args.return_type().clone(),
164 true,
165 )
166 .into()];
167 fields.extend(args.ordering_fields.iter().cloned());
168 fields.push(Field::new("is_set", DataType::Boolean, true).into());
169 Ok(fields)
170 }
171
172 fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
173 use DataType::*;
174 !args.order_bys.is_empty()
175 && matches!(
176 args.return_field.data_type(),
177 Int8 | Int16
178 | Int32
179 | Int64
180 | UInt8
181 | UInt16
182 | UInt32
183 | UInt64
184 | Float16
185 | Float32
186 | Float64
187 | Decimal128(_, _)
188 | Decimal256(_, _)
189 | Date32
190 | Date64
191 | Time32(_)
192 | Time64(_)
193 | Timestamp(_, _)
194 )
195 }
196
197 fn create_groups_accumulator(
198 &self,
199 args: AccumulatorArgs,
200 ) -> Result<Box<dyn GroupsAccumulator>> {
201 fn create_accumulator<T: ArrowPrimitiveType + Send>(
202 args: AccumulatorArgs,
203 ) -> Result<Box<dyn GroupsAccumulator>> {
204 let Some(ordering) = LexOrdering::new(args.order_bys.to_vec()) else {
205 return internal_err!("Groups accumulator must have an ordering.");
206 };
207
208 let ordering_dtypes = ordering
209 .iter()
210 .map(|e| e.expr.data_type(args.schema))
211 .collect::<Result<Vec<_>>>()?;
212
213 FirstPrimitiveGroupsAccumulator::<T>::try_new(
214 ordering,
215 args.ignore_nulls,
216 args.return_field.data_type(),
217 &ordering_dtypes,
218 true,
219 )
220 .map(|acc| Box::new(acc) as _)
221 }
222
223 match args.return_field.data_type() {
224 DataType::Int8 => create_accumulator::<Int8Type>(args),
225 DataType::Int16 => create_accumulator::<Int16Type>(args),
226 DataType::Int32 => create_accumulator::<Int32Type>(args),
227 DataType::Int64 => create_accumulator::<Int64Type>(args),
228 DataType::UInt8 => create_accumulator::<UInt8Type>(args),
229 DataType::UInt16 => create_accumulator::<UInt16Type>(args),
230 DataType::UInt32 => create_accumulator::<UInt32Type>(args),
231 DataType::UInt64 => create_accumulator::<UInt64Type>(args),
232 DataType::Float16 => create_accumulator::<Float16Type>(args),
233 DataType::Float32 => create_accumulator::<Float32Type>(args),
234 DataType::Float64 => create_accumulator::<Float64Type>(args),
235
236 DataType::Decimal128(_, _) => create_accumulator::<Decimal128Type>(args),
237 DataType::Decimal256(_, _) => create_accumulator::<Decimal256Type>(args),
238
239 DataType::Timestamp(TimeUnit::Second, _) => {
240 create_accumulator::<TimestampSecondType>(args)
241 }
242 DataType::Timestamp(TimeUnit::Millisecond, _) => {
243 create_accumulator::<TimestampMillisecondType>(args)
244 }
245 DataType::Timestamp(TimeUnit::Microsecond, _) => {
246 create_accumulator::<TimestampMicrosecondType>(args)
247 }
248 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
249 create_accumulator::<TimestampNanosecondType>(args)
250 }
251
252 DataType::Date32 => create_accumulator::<Date32Type>(args),
253 DataType::Date64 => create_accumulator::<Date64Type>(args),
254 DataType::Time32(TimeUnit::Second) => {
255 create_accumulator::<Time32SecondType>(args)
256 }
257 DataType::Time32(TimeUnit::Millisecond) => {
258 create_accumulator::<Time32MillisecondType>(args)
259 }
260
261 DataType::Time64(TimeUnit::Microsecond) => {
262 create_accumulator::<Time64MicrosecondType>(args)
263 }
264 DataType::Time64(TimeUnit::Nanosecond) => {
265 create_accumulator::<Time64NanosecondType>(args)
266 }
267
268 _ => internal_err!(
269 "GroupsAccumulator not supported for first_value({})",
270 args.return_field.data_type()
271 ),
272 }
273 }
274
275 fn with_beneficial_ordering(
276 self: Arc<Self>,
277 beneficial_ordering: bool,
278 ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
279 Ok(Some(Arc::new(Self {
280 signature: self.signature.clone(),
281 is_input_pre_ordered: beneficial_ordering,
282 })))
283 }
284
285 fn order_sensitivity(&self) -> AggregateOrderSensitivity {
286 AggregateOrderSensitivity::Beneficial
287 }
288
289 fn reverse_expr(&self) -> ReversedUDAF {
290 ReversedUDAF::Reversed(last_value_udaf())
291 }
292
293 fn documentation(&self) -> Option<&Documentation> {
294 self.doc()
295 }
296
297 fn equals(&self, other: &dyn AggregateUDFImpl) -> bool {
298 let Some(other) = other.as_any().downcast_ref::<Self>() else {
299 return false;
300 };
301 let Self {
302 signature,
303 is_input_pre_ordered,
304 } = self;
305 signature == &other.signature
306 && is_input_pre_ordered == &other.is_input_pre_ordered
307 }
308
309 fn hash_value(&self) -> u64 {
310 let Self {
311 signature,
312 is_input_pre_ordered,
313 } = self;
314 let mut hasher = DefaultHasher::new();
315 std::any::type_name::<Self>().hash(&mut hasher);
316 signature.hash(&mut hasher);
317 is_input_pre_ordered.hash(&mut hasher);
318 hasher.finish()
319 }
320}
321
/// Groups accumulator for primitive-typed values that keeps, for every group,
/// the single value whose ordering columns sort first (or last) under
/// `ordering_req`.
///
/// It is constructed by `FirstValue::create_groups_accumulator` with
/// `pick_first_in_group = true` and by `LastValue::create_groups_accumulator`
/// with `pick_first_in_group = false`.
struct FirstPrimitiveGroupsAccumulator<T>
where
    T: ArrowPrimitiveType + Send,
{
    /// Currently selected value of each group, indexed by group index.
    vals: Vec<T::Native>,
    /// Ordering-column values that accompanied the selected value of each group.
    orderings: Vec<Vec<ScalarValue>>,
    /// Bit `g` is set once `update_state` has stored a value for group `g`.
    is_sets: BooleanBufferBuilder,
    /// Validity bits for `vals`: `update_state` writes `!is_null` for the group.
    null_builder: BooleanBufferBuilder,
    /// Running total of `ScalarValue::size_of_vec` over `orderings`; part of
    /// the value reported by `size()`.
    size_of_orderings: usize,

    /// Scratch space reused by `get_filtered_min_of_each_group`: `.0[g]` is the
    /// batch row currently winning for group `g` and is only meaningful while
    /// bit `g` of `.1` is set.
    min_of_each_group_buf: (Vec<usize>, BooleanBufferBuilder),

    /// Ordering requirement used to compare rows.
    ordering_req: LexOrdering,
    /// `true`: keep the row that compares smallest (first value);
    /// `false`: keep the row that compares largest (last value).
    pick_first_in_group: bool,
    /// Result of `get_sort_options(&ordering_req)`, computed once in `try_new`
    /// and passed to `compare_rows`.
    sort_options: Vec<SortOptions>,
    /// When set, rows whose value is null are never selected.
    ignore_nulls: bool,
    /// Data type applied to the emitted value array via `with_data_type`.
    data_type: DataType,
    /// Ordering row cloned into every newly created group slot by
    /// `resize_states`; built with `ScalarValue::try_from` from the ordering
    /// data types.
    default_orderings: Vec<ScalarValue>,
}
366
367impl<T> FirstPrimitiveGroupsAccumulator<T>
368where
369 T: ArrowPrimitiveType + Send,
370{
371 fn try_new(
372 ordering_req: LexOrdering,
373 ignore_nulls: bool,
374 data_type: &DataType,
375 ordering_dtypes: &[DataType],
376 pick_first_in_group: bool,
377 ) -> Result<Self> {
378 let default_orderings = ordering_dtypes
379 .iter()
380 .map(ScalarValue::try_from)
381 .collect::<Result<_>>()?;
382
383 let sort_options = get_sort_options(&ordering_req);
384
385 Ok(Self {
386 null_builder: BooleanBufferBuilder::new(0),
387 ordering_req,
388 sort_options,
389 ignore_nulls,
390 default_orderings,
391 data_type: data_type.clone(),
392 vals: Vec::new(),
393 orderings: Vec::new(),
394 is_sets: BooleanBufferBuilder::new(0),
395 size_of_orderings: 0,
396 min_of_each_group_buf: (Vec::new(), BooleanBufferBuilder::new(0)),
397 pick_first_in_group,
398 })
399 }
400
401 fn should_update_state(
402 &self,
403 group_idx: usize,
404 new_ordering_values: &[ScalarValue],
405 ) -> Result<bool> {
406 if !self.is_sets.get_bit(group_idx) {
407 return Ok(true);
408 }
409
410 assert!(new_ordering_values.len() == self.ordering_req.len());
411 let current_ordering = &self.orderings[group_idx];
412 compare_rows(current_ordering, new_ordering_values, &self.sort_options).map(|x| {
413 if self.pick_first_in_group {
414 x.is_gt()
415 } else {
416 x.is_lt()
417 }
418 })
419 }
420
421 fn take_orderings(&mut self, emit_to: EmitTo) -> Vec<Vec<ScalarValue>> {
422 let result = emit_to.take_needed(&mut self.orderings);
423
424 match emit_to {
425 EmitTo::All => self.size_of_orderings = 0,
426 EmitTo::First(_) => {
427 self.size_of_orderings -=
428 result.iter().map(ScalarValue::size_of_vec).sum::<usize>()
429 }
430 }
431
432 result
433 }
434
435 fn take_need(
436 bool_buf_builder: &mut BooleanBufferBuilder,
437 emit_to: EmitTo,
438 ) -> BooleanBuffer {
439 let bool_buf = bool_buf_builder.finish();
440 match emit_to {
441 EmitTo::All => bool_buf,
442 EmitTo::First(n) => {
443 let first_n: BooleanBuffer = bool_buf.iter().take(n).collect();
448 for b in bool_buf.iter().skip(n) {
450 bool_buf_builder.append(b);
451 }
452 first_n
453 }
454 }
455 }
456
457 fn resize_states(&mut self, new_size: usize) {
458 self.vals.resize(new_size, T::default_value());
459
460 self.null_builder.resize(new_size);
461
462 if self.orderings.len() < new_size {
463 let current_len = self.orderings.len();
464
465 self.orderings
466 .resize(new_size, self.default_orderings.clone());
467
468 self.size_of_orderings += (new_size - current_len)
469 * ScalarValue::size_of_vec(
470 self.orderings.last().unwrap(),
474 );
475 }
476
477 self.is_sets.resize(new_size);
478
479 self.min_of_each_group_buf.0.resize(new_size, 0);
480 self.min_of_each_group_buf.1.resize(new_size);
481 }
482
483 fn update_state(
484 &mut self,
485 group_idx: usize,
486 orderings: &[ScalarValue],
487 new_val: T::Native,
488 is_null: bool,
489 ) {
490 self.vals[group_idx] = new_val;
491 self.is_sets.set_bit(group_idx, true);
492
493 self.null_builder.set_bit(group_idx, !is_null);
494
495 assert!(orderings.len() == self.ordering_req.len());
496 let old_size = ScalarValue::size_of_vec(&self.orderings[group_idx]);
497 self.orderings[group_idx].clear();
498 self.orderings[group_idx].extend_from_slice(orderings);
499 let new_size = ScalarValue::size_of_vec(&self.orderings[group_idx]);
500 self.size_of_orderings = self.size_of_orderings - old_size + new_size;
501 }
502
503 fn take_state(
504 &mut self,
505 emit_to: EmitTo,
506 ) -> (ArrayRef, Vec<Vec<ScalarValue>>, BooleanBuffer) {
507 emit_to.take_needed(&mut self.min_of_each_group_buf.0);
508 self.min_of_each_group_buf
509 .1
510 .truncate(self.min_of_each_group_buf.0.len());
511
512 (
513 self.take_vals_and_null_buf(emit_to),
514 self.take_orderings(emit_to),
515 Self::take_need(&mut self.is_sets, emit_to),
516 )
517 }
518
519 #[cfg(test)]
521 fn compute_size_of_orderings(&self) -> usize {
522 self.orderings
523 .iter()
524 .map(ScalarValue::size_of_vec)
525 .sum::<usize>()
526 }
527 fn get_filtered_min_of_each_group(
532 &mut self,
533 orderings: &[ArrayRef],
534 group_indices: &[usize],
535 opt_filter: Option<&BooleanArray>,
536 vals: &PrimitiveArray<T>,
537 is_set_arr: Option<&BooleanArray>,
538 ) -> Result<Vec<(usize, usize)>> {
539 self.min_of_each_group_buf.1.truncate(0);
541 self.min_of_each_group_buf
542 .1
543 .append_n(self.vals.len(), false);
544
545 let comparator = {
549 assert_eq!(orderings.len(), self.ordering_req.len());
550 let sort_columns = orderings
551 .iter()
552 .zip(self.ordering_req.iter())
553 .map(|(array, req)| SortColumn {
554 values: Arc::clone(array),
555 options: Some(req.options),
556 })
557 .collect::<Vec<_>>();
558
559 LexicographicalComparator::try_new(&sort_columns)?
560 };
561
562 for (idx_in_val, group_idx) in group_indices.iter().enumerate() {
563 let group_idx = *group_idx;
564
565 let passed_filter = opt_filter.is_none_or(|x| x.value(idx_in_val));
566 let is_set = is_set_arr.is_none_or(|x| x.value(idx_in_val));
567
568 if !passed_filter || !is_set {
569 continue;
570 }
571
572 if self.ignore_nulls && vals.is_null(idx_in_val) {
573 continue;
574 }
575
576 let is_valid = self.min_of_each_group_buf.1.get_bit(group_idx);
577
578 if !is_valid {
579 self.min_of_each_group_buf.1.set_bit(group_idx, true);
580 self.min_of_each_group_buf.0[group_idx] = idx_in_val;
581 } else {
582 let ordering = comparator
583 .compare(self.min_of_each_group_buf.0[group_idx], idx_in_val);
584
585 if (ordering.is_gt() && self.pick_first_in_group)
586 || (ordering.is_lt() && !self.pick_first_in_group)
587 {
588 self.min_of_each_group_buf.0[group_idx] = idx_in_val;
589 }
590 }
591 }
592
593 Ok(self
594 .min_of_each_group_buf
595 .0
596 .iter()
597 .enumerate()
598 .filter(|(group_idx, _)| self.min_of_each_group_buf.1.get_bit(*group_idx))
599 .map(|(group_idx, idx_in_val)| (group_idx, *idx_in_val))
600 .collect::<Vec<_>>())
601 }
602
603 fn take_vals_and_null_buf(&mut self, emit_to: EmitTo) -> ArrayRef {
604 let r = emit_to.take_needed(&mut self.vals);
605
606 let null_buf = NullBuffer::new(Self::take_need(&mut self.null_builder, emit_to));
607
608 let values = PrimitiveArray::<T>::new(r.into(), Some(null_buf)) .with_data_type(self.data_type.clone());
610 Arc::new(values)
611 }
612}
613
614impl<T> GroupsAccumulator for FirstPrimitiveGroupsAccumulator<T>
615where
616 T: ArrowPrimitiveType + Send,
617{
618 fn update_batch(
619 &mut self,
620 values_and_order_cols: &[ArrayRef],
622 group_indices: &[usize],
623 opt_filter: Option<&BooleanArray>,
624 total_num_groups: usize,
625 ) -> Result<()> {
626 self.resize_states(total_num_groups);
627
628 let vals = values_and_order_cols[0].as_primitive::<T>();
629
630 let mut ordering_buf = Vec::with_capacity(self.ordering_req.len());
631
632 for (group_idx, idx) in self
634 .get_filtered_min_of_each_group(
635 &values_and_order_cols[1..],
636 group_indices,
637 opt_filter,
638 vals,
639 None,
640 )?
641 .into_iter()
642 {
643 extract_row_at_idx_to_buf(
644 &values_and_order_cols[1..],
645 idx,
646 &mut ordering_buf,
647 )?;
648
649 if self.should_update_state(group_idx, &ordering_buf)? {
650 self.update_state(
651 group_idx,
652 &ordering_buf,
653 vals.value(idx),
654 vals.is_null(idx),
655 );
656 }
657 }
658
659 Ok(())
660 }
661
662 fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
663 Ok(self.take_state(emit_to).0)
664 }
665
666 fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
667 let (val_arr, orderings, is_sets) = self.take_state(emit_to);
668 let mut result = Vec::with_capacity(self.orderings.len() + 2);
669
670 result.push(val_arr);
671
672 let ordering_cols = {
673 let mut ordering_cols = Vec::with_capacity(self.ordering_req.len());
674 for _ in 0..self.ordering_req.len() {
675 ordering_cols.push(Vec::with_capacity(self.orderings.len()));
676 }
677 for row in orderings.into_iter() {
678 assert_eq!(row.len(), self.ordering_req.len());
679 for (col_idx, ordering) in row.into_iter().enumerate() {
680 ordering_cols[col_idx].push(ordering);
681 }
682 }
683
684 ordering_cols
685 };
686 for ordering_col in ordering_cols {
687 result.push(ScalarValue::iter_to_array(ordering_col)?);
688 }
689
690 result.push(Arc::new(BooleanArray::new(is_sets, None)));
691
692 Ok(result)
693 }
694
695 fn merge_batch(
696 &mut self,
697 values: &[ArrayRef],
698 group_indices: &[usize],
699 opt_filter: Option<&BooleanArray>,
700 total_num_groups: usize,
701 ) -> Result<()> {
702 self.resize_states(total_num_groups);
703
704 let mut ordering_buf = Vec::with_capacity(self.ordering_req.len());
705
706 let (is_set_arr, val_and_order_cols) = match values.split_last() {
707 Some(result) => result,
708 None => return internal_err!("Empty row in FIRST_VALUE"),
709 };
710
711 let is_set_arr = as_boolean_array(is_set_arr)?;
712
713 let vals = values[0].as_primitive::<T>();
714 let groups = self.get_filtered_min_of_each_group(
716 &val_and_order_cols[1..],
717 group_indices,
718 opt_filter,
719 vals,
720 Some(is_set_arr),
721 )?;
722
723 for (group_idx, idx) in groups.into_iter() {
724 extract_row_at_idx_to_buf(&val_and_order_cols[1..], idx, &mut ordering_buf)?;
725
726 if self.should_update_state(group_idx, &ordering_buf)? {
727 self.update_state(
728 group_idx,
729 &ordering_buf,
730 vals.value(idx),
731 vals.is_null(idx),
732 );
733 }
734 }
735
736 Ok(())
737 }
738
739 fn size(&self) -> usize {
740 self.vals.capacity() * size_of::<T::Native>()
741 + self.null_builder.capacity() / 8 + self.is_sets.capacity() / 8
743 + self.size_of_orderings
744 + self.min_of_each_group_buf.0.capacity() * size_of::<usize>()
745 + self.min_of_each_group_buf.1.capacity() / 8
746 }
747
748 fn supports_convert_to_state(&self) -> bool {
749 true
750 }
751
752 fn convert_to_state(
753 &self,
754 values: &[ArrayRef],
755 opt_filter: Option<&BooleanArray>,
756 ) -> Result<Vec<ArrayRef>> {
757 let mut result = values.to_vec();
758 match opt_filter {
759 Some(f) => {
760 result.push(Arc::new(f.clone()));
761 Ok(result)
762 }
763 None => {
764 result.push(Arc::new(BooleanArray::from(vec![true; values[0].len()])));
765 Ok(result)
766 }
767 }
768 }
769}
770
/// Accumulator for `first_value` used when the aggregate has no ordering
/// (see `FirstValue::accumulator`): it keeps the first value it encounters.
#[derive(Debug)]
pub struct TrivialFirstValueAccumulator {
    /// Stored result; initialised by `ScalarValue::try_from(data_type)` in
    /// `try_new` and overwritten when a value is picked.
    first: ScalarValue,
    /// Set once a value has been stored; afterwards `update_batch` and
    /// `merge_batch` leave the state untouched.
    is_set: bool,
    /// When true, `update_batch` picks the first non-null entry instead of the
    /// first entry.
    ignore_nulls: bool,
}
783
784impl TrivialFirstValueAccumulator {
785 pub fn try_new(data_type: &DataType, ignore_nulls: bool) -> Result<Self> {
787 ScalarValue::try_from(data_type).map(|first| Self {
788 first,
789 is_set: false,
790 ignore_nulls,
791 })
792 }
793}
794
795impl Accumulator for TrivialFirstValueAccumulator {
796 fn state(&mut self) -> Result<Vec<ScalarValue>> {
797 Ok(vec![self.first.clone(), ScalarValue::from(self.is_set)])
798 }
799
800 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
801 if !self.is_set {
802 let value = &values[0];
804 let mut first_idx = None;
805 if self.ignore_nulls {
806 for i in 0..value.len() {
808 if !value.is_null(i) {
809 first_idx = Some(i);
810 break;
811 }
812 }
813 } else if !value.is_empty() {
814 first_idx = Some(0);
816 }
817 if let Some(first_idx) = first_idx {
818 let mut row = get_row_at_idx(values, first_idx)?;
819 self.first = row.swap_remove(0);
820 self.first.compact();
821 self.is_set = true;
822 }
823 }
824 Ok(())
825 }
826
827 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
828 if !self.is_set {
831 let flags = states[1].as_boolean();
832 let filtered_states =
833 filter_states_according_to_is_set(&states[0..1], flags)?;
834 if let Some(first) = filtered_states.first() {
835 if !first.is_empty() {
836 self.first = ScalarValue::try_from_array(first, 0)?;
837 self.is_set = true;
838 }
839 }
840 }
841 Ok(())
842 }
843
844 fn evaluate(&mut self) -> Result<ScalarValue> {
845 Ok(self.first.clone())
846 }
847
848 fn size(&self) -> usize {
849 size_of_val(self) - size_of_val(&self.first) + self.first.size()
850 }
851}
852
/// Accumulator for `first_value` with an ordering requirement: it keeps the
/// value of the row whose ordering columns compare smallest under
/// `ordering_req`.
#[derive(Debug)]
pub struct FirstValueAccumulator {
    /// Stored result; initialised by `ScalarValue::try_from(data_type)` in
    /// `try_new`.
    first: ScalarValue,
    /// Set once a row has been stored via `update_with_new_row`.
    is_set: bool,
    /// Ordering-column values of the stored row.
    orderings: Vec<ScalarValue>,
    /// Ordering requirement used to compare rows.
    ordering_req: LexOrdering,
    /// When true, `get_first_idx` takes the earliest eligible row of a batch
    /// without comparing ordering columns, and `update_batch` never replaces
    /// a value that is already set.
    is_input_pre_ordered: bool,
    /// When true, rows whose value is null are not considered by
    /// `get_first_idx`.
    ignore_nulls: bool,
}
868
869impl FirstValueAccumulator {
870 pub fn try_new(
872 data_type: &DataType,
873 ordering_dtypes: &[DataType],
874 ordering_req: LexOrdering,
875 is_input_pre_ordered: bool,
876 ignore_nulls: bool,
877 ) -> Result<Self> {
878 let orderings = ordering_dtypes
879 .iter()
880 .map(ScalarValue::try_from)
881 .collect::<Result<_>>()?;
882 ScalarValue::try_from(data_type).map(|first| Self {
883 first,
884 is_set: false,
885 orderings,
886 ordering_req,
887 is_input_pre_ordered,
888 ignore_nulls,
889 })
890 }
891
892 fn update_with_new_row(&mut self, mut row: Vec<ScalarValue>) {
894 for s in row.iter_mut() {
896 s.compact();
897 }
898 self.first = row.remove(0);
899 self.orderings = row;
900 self.is_set = true;
901 }
902
903 fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
904 let [value, ordering_values @ ..] = values else {
905 return internal_err!("Empty row in FIRST_VALUE");
906 };
907 if self.is_input_pre_ordered {
908 if self.ignore_nulls {
910 for i in 0..value.len() {
912 if !value.is_null(i) {
913 return Ok(Some(i));
914 }
915 }
916 return Ok(None);
917 } else {
918 return Ok((!value.is_empty()).then_some(0));
920 }
921 }
922
923 let sort_columns = ordering_values
924 .iter()
925 .zip(self.ordering_req.iter())
926 .map(|(values, req)| SortColumn {
927 values: Arc::clone(values),
928 options: Some(req.options),
929 })
930 .collect::<Vec<_>>();
931
932 let comparator = LexicographicalComparator::try_new(&sort_columns)?;
933
934 let min_index = if self.ignore_nulls {
935 (0..value.len())
936 .filter(|&index| !value.is_null(index))
937 .min_by(|&a, &b| comparator.compare(a, b))
938 } else {
939 (0..value.len()).min_by(|&a, &b| comparator.compare(a, b))
940 };
941
942 Ok(min_index)
943 }
944}
945
946impl Accumulator for FirstValueAccumulator {
947 fn state(&mut self) -> Result<Vec<ScalarValue>> {
948 let mut result = vec![self.first.clone()];
949 result.extend(self.orderings.iter().cloned());
950 result.push(ScalarValue::from(self.is_set));
951 Ok(result)
952 }
953
954 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
955 if let Some(first_idx) = self.get_first_idx(values)? {
956 let row = get_row_at_idx(values, first_idx)?;
957 if !self.is_set
958 || (!self.is_input_pre_ordered
959 && compare_rows(
960 &self.orderings,
961 &row[1..],
962 &get_sort_options(&self.ordering_req),
963 )?
964 .is_gt())
965 {
966 self.update_with_new_row(row);
967 }
968 }
969 Ok(())
970 }
971
972 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
973 let is_set_idx = states.len() - 1;
976 let flags = states[is_set_idx].as_boolean();
977 let filtered_states =
978 filter_states_according_to_is_set(&states[0..is_set_idx], flags)?;
979 let sort_columns =
981 convert_to_sort_cols(&filtered_states[1..is_set_idx], &self.ordering_req);
982
983 let comparator = LexicographicalComparator::try_new(&sort_columns)?;
984 let min = (0..filtered_states[0].len()).min_by(|&a, &b| comparator.compare(a, b));
985
986 if let Some(first_idx) = min {
987 let mut first_row = get_row_at_idx(&filtered_states, first_idx)?;
988 let first_ordering = &first_row[1..is_set_idx];
990 let sort_options = get_sort_options(&self.ordering_req);
991 if !self.is_set
993 || compare_rows(&self.orderings, first_ordering, &sort_options)?.is_gt()
994 {
995 assert!(is_set_idx <= first_row.len());
999 first_row.resize(is_set_idx, ScalarValue::Null);
1000 self.update_with_new_row(first_row);
1001 }
1002 }
1003 Ok(())
1004 }
1005
1006 fn evaluate(&mut self) -> Result<ScalarValue> {
1007 Ok(self.first.clone())
1008 }
1009
1010 fn size(&self) -> usize {
1011 size_of_val(self) - size_of_val(&self.first)
1012 + self.first.size()
1013 + ScalarValue::size_of_vec(&self.orderings)
1014 - size_of_val(&self.orderings)
1015 }
1016}
1017
#[user_doc(
    doc_section(label = "General Functions"),
    description = "Returns the last element in an aggregation group according to the requested ordering. If no ordering is given, returns an arbitrary element from the group.",
    syntax_example = "last_value(expression [ORDER BY expression])",
    sql_example = r#"```sql
> SELECT last_value(column_name ORDER BY other_column) FROM table_name;
+-----------------------------------------------+
| last_value(column_name ORDER BY other_column) |
+-----------------------------------------------+
| last_element |
+-----------------------------------------------+
```"#,
    standard_argument(name = "expression",)
)]
/// The `last_value` aggregate function.
pub struct LastValue {
    /// Argument signature; `new()` sets it to
    /// `Signature::any(1, Volatility::Immutable)`.
    signature: Signature,
    /// Whether the accumulator may rely on the input already being ordered.
    /// Starts as `false`; `with_beneficial_ordering` produces a copy with the
    /// requested value, and `accumulator` forwards it to
    /// `LastValueAccumulator::try_new`.
    is_input_pre_ordered: bool,
}
1036
1037impl Debug for LastValue {
1038 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1039 f.debug_struct("LastValue")
1040 .field("name", &self.name())
1041 .field("signature", &self.signature)
1042 .field("accumulator", &"<FUNC>")
1043 .finish()
1044 }
1045}
1046
1047impl Default for LastValue {
1048 fn default() -> Self {
1049 Self::new()
1050 }
1051}
1052
1053impl LastValue {
1054 pub fn new() -> Self {
1055 Self {
1056 signature: Signature::any(1, Volatility::Immutable),
1057 is_input_pre_ordered: false,
1058 }
1059 }
1060}
1061
1062impl AggregateUDFImpl for LastValue {
1063 fn as_any(&self) -> &dyn Any {
1064 self
1065 }
1066
1067 fn name(&self) -> &str {
1068 "last_value"
1069 }
1070
1071 fn signature(&self) -> &Signature {
1072 &self.signature
1073 }
1074
1075 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
1076 Ok(arg_types[0].clone())
1077 }
1078
1079 fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
1080 let Some(ordering) = LexOrdering::new(acc_args.order_bys.to_vec()) else {
1081 return TrivialLastValueAccumulator::try_new(
1082 acc_args.return_field.data_type(),
1083 acc_args.ignore_nulls,
1084 )
1085 .map(|acc| Box::new(acc) as _);
1086 };
1087 let ordering_dtypes = ordering
1088 .iter()
1089 .map(|e| e.expr.data_type(acc_args.schema))
1090 .collect::<Result<Vec<_>>>()?;
1091 Ok(Box::new(LastValueAccumulator::try_new(
1092 acc_args.return_field.data_type(),
1093 &ordering_dtypes,
1094 ordering,
1095 self.is_input_pre_ordered,
1096 acc_args.ignore_nulls,
1097 )?))
1098 }
1099
1100 fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
1101 let mut fields = vec![Field::new(
1102 format_state_name(args.name, "last_value"),
1103 args.return_field.data_type().clone(),
1104 true,
1105 )
1106 .into()];
1107 fields.extend(args.ordering_fields.iter().cloned());
1108 fields.push(Field::new("is_set", DataType::Boolean, true).into());
1109 Ok(fields)
1110 }
1111
1112 fn with_beneficial_ordering(
1113 self: Arc<Self>,
1114 beneficial_ordering: bool,
1115 ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
1116 Ok(Some(Arc::new(Self {
1117 signature: self.signature.clone(),
1118 is_input_pre_ordered: beneficial_ordering,
1119 })))
1120 }
1121
1122 fn order_sensitivity(&self) -> AggregateOrderSensitivity {
1123 AggregateOrderSensitivity::Beneficial
1124 }
1125
1126 fn reverse_expr(&self) -> ReversedUDAF {
1127 ReversedUDAF::Reversed(first_value_udaf())
1128 }
1129
1130 fn documentation(&self) -> Option<&Documentation> {
1131 self.doc()
1132 }
1133
1134 fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
1135 use DataType::*;
1136 !args.order_bys.is_empty()
1137 && matches!(
1138 args.return_field.data_type(),
1139 Int8 | Int16
1140 | Int32
1141 | Int64
1142 | UInt8
1143 | UInt16
1144 | UInt32
1145 | UInt64
1146 | Float16
1147 | Float32
1148 | Float64
1149 | Decimal128(_, _)
1150 | Decimal256(_, _)
1151 | Date32
1152 | Date64
1153 | Time32(_)
1154 | Time64(_)
1155 | Timestamp(_, _)
1156 )
1157 }
1158
1159 fn create_groups_accumulator(
1160 &self,
1161 args: AccumulatorArgs,
1162 ) -> Result<Box<dyn GroupsAccumulator>> {
1163 fn create_accumulator<T>(
1164 args: AccumulatorArgs,
1165 ) -> Result<Box<dyn GroupsAccumulator>>
1166 where
1167 T: ArrowPrimitiveType + Send,
1168 {
1169 let Some(ordering) = LexOrdering::new(args.order_bys.to_vec()) else {
1170 return internal_err!("Groups accumulator must have an ordering.");
1171 };
1172
1173 let ordering_dtypes = ordering
1174 .iter()
1175 .map(|e| e.expr.data_type(args.schema))
1176 .collect::<Result<Vec<_>>>()?;
1177
1178 Ok(Box::new(FirstPrimitiveGroupsAccumulator::<T>::try_new(
1179 ordering,
1180 args.ignore_nulls,
1181 args.return_field.data_type(),
1182 &ordering_dtypes,
1183 false,
1184 )?))
1185 }
1186
1187 match args.return_field.data_type() {
1188 DataType::Int8 => create_accumulator::<Int8Type>(args),
1189 DataType::Int16 => create_accumulator::<Int16Type>(args),
1190 DataType::Int32 => create_accumulator::<Int32Type>(args),
1191 DataType::Int64 => create_accumulator::<Int64Type>(args),
1192 DataType::UInt8 => create_accumulator::<UInt8Type>(args),
1193 DataType::UInt16 => create_accumulator::<UInt16Type>(args),
1194 DataType::UInt32 => create_accumulator::<UInt32Type>(args),
1195 DataType::UInt64 => create_accumulator::<UInt64Type>(args),
1196 DataType::Float16 => create_accumulator::<Float16Type>(args),
1197 DataType::Float32 => create_accumulator::<Float32Type>(args),
1198 DataType::Float64 => create_accumulator::<Float64Type>(args),
1199
1200 DataType::Decimal128(_, _) => create_accumulator::<Decimal128Type>(args),
1201 DataType::Decimal256(_, _) => create_accumulator::<Decimal256Type>(args),
1202
1203 DataType::Timestamp(TimeUnit::Second, _) => {
1204 create_accumulator::<TimestampSecondType>(args)
1205 }
1206 DataType::Timestamp(TimeUnit::Millisecond, _) => {
1207 create_accumulator::<TimestampMillisecondType>(args)
1208 }
1209 DataType::Timestamp(TimeUnit::Microsecond, _) => {
1210 create_accumulator::<TimestampMicrosecondType>(args)
1211 }
1212 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
1213 create_accumulator::<TimestampNanosecondType>(args)
1214 }
1215
1216 DataType::Date32 => create_accumulator::<Date32Type>(args),
1217 DataType::Date64 => create_accumulator::<Date64Type>(args),
1218 DataType::Time32(TimeUnit::Second) => {
1219 create_accumulator::<Time32SecondType>(args)
1220 }
1221 DataType::Time32(TimeUnit::Millisecond) => {
1222 create_accumulator::<Time32MillisecondType>(args)
1223 }
1224
1225 DataType::Time64(TimeUnit::Microsecond) => {
1226 create_accumulator::<Time64MicrosecondType>(args)
1227 }
1228 DataType::Time64(TimeUnit::Nanosecond) => {
1229 create_accumulator::<Time64NanosecondType>(args)
1230 }
1231
1232 _ => {
1233 internal_err!(
1234 "GroupsAccumulator not supported for last_value({})",
1235 args.return_field.data_type()
1236 )
1237 }
1238 }
1239 }
1240
1241 fn equals(&self, other: &dyn AggregateUDFImpl) -> bool {
1242 let Some(other) = other.as_any().downcast_ref::<Self>() else {
1243 return false;
1244 };
1245 let Self {
1246 signature,
1247 is_input_pre_ordered,
1248 } = self;
1249 signature == &other.signature
1250 && is_input_pre_ordered == &other.is_input_pre_ordered
1251 }
1252
1253 fn hash_value(&self) -> u64 {
1254 let Self {
1255 signature,
1256 is_input_pre_ordered,
1257 } = self;
1258 let mut hasher = DefaultHasher::new();
1259 std::any::type_name::<Self>().hash(&mut hasher);
1260 signature.hash(&mut hasher);
1261 is_input_pre_ordered.hash(&mut hasher);
1262 hasher.finish()
1263 }
1264}
1265
1266#[derive(Debug)]
1271pub struct TrivialLastValueAccumulator {
1272 last: ScalarValue,
1273 is_set: bool,
1277 ignore_nulls: bool,
1279}
1280
1281impl TrivialLastValueAccumulator {
1282 pub fn try_new(data_type: &DataType, ignore_nulls: bool) -> Result<Self> {
1284 ScalarValue::try_from(data_type).map(|last| Self {
1285 last,
1286 is_set: false,
1287 ignore_nulls,
1288 })
1289 }
1290}
1291
1292impl Accumulator for TrivialLastValueAccumulator {
1293 fn state(&mut self) -> Result<Vec<ScalarValue>> {
1294 Ok(vec![self.last.clone(), ScalarValue::from(self.is_set)])
1295 }
1296
1297 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1298 let value = &values[0];
1300 let mut last_idx = None;
1301 if self.ignore_nulls {
1302 for i in (0..value.len()).rev() {
1304 if !value.is_null(i) {
1305 last_idx = Some(i);
1306 break;
1307 }
1308 }
1309 } else if !value.is_empty() {
1310 last_idx = Some(value.len() - 1);
1312 }
1313 if let Some(last_idx) = last_idx {
1314 let mut row = get_row_at_idx(values, last_idx)?;
1315 self.last = row.swap_remove(0);
1316 self.last.compact();
1317 self.is_set = true;
1318 }
1319 Ok(())
1320 }
1321
1322 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1323 let flags = states[1].as_boolean();
1326 let filtered_states = filter_states_according_to_is_set(&states[0..1], flags)?;
1327 if let Some(last) = filtered_states.last() {
1328 if !last.is_empty() {
1329 self.last = ScalarValue::try_from_array(last, 0)?;
1330 self.is_set = true;
1331 }
1332 }
1333 Ok(())
1334 }
1335
1336 fn evaluate(&mut self) -> Result<ScalarValue> {
1337 Ok(self.last.clone())
1338 }
1339
1340 fn size(&self) -> usize {
1341 size_of_val(self) - size_of_val(&self.last) + self.last.size()
1342 }
1343}
1344
1345#[derive(Debug)]
1346struct LastValueAccumulator {
1347 last: ScalarValue,
1348 is_set: bool,
1352 orderings: Vec<ScalarValue>,
1355 ordering_req: LexOrdering,
1357 is_input_pre_ordered: bool,
1359 ignore_nulls: bool,
1361}
1362
1363impl LastValueAccumulator {
1364 pub fn try_new(
1366 data_type: &DataType,
1367 ordering_dtypes: &[DataType],
1368 ordering_req: LexOrdering,
1369 is_input_pre_ordered: bool,
1370 ignore_nulls: bool,
1371 ) -> Result<Self> {
1372 let orderings = ordering_dtypes
1373 .iter()
1374 .map(ScalarValue::try_from)
1375 .collect::<Result<_>>()?;
1376 ScalarValue::try_from(data_type).map(|last| Self {
1377 last,
1378 is_set: false,
1379 orderings,
1380 ordering_req,
1381 is_input_pre_ordered,
1382 ignore_nulls,
1383 })
1384 }
1385
1386 fn update_with_new_row(&mut self, mut row: Vec<ScalarValue>) {
1388 for s in row.iter_mut() {
1390 s.compact();
1391 }
1392 self.last = row.remove(0);
1393 self.orderings = row;
1394 self.is_set = true;
1395 }
1396
1397 fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
1398 let [value, ordering_values @ ..] = values else {
1399 return internal_err!("Empty row in LAST_VALUE");
1400 };
1401 if self.is_input_pre_ordered {
1402 if self.ignore_nulls {
1404 for i in (0..value.len()).rev() {
1406 if !value.is_null(i) {
1407 return Ok(Some(i));
1408 }
1409 }
1410 return Ok(None);
1411 } else {
1412 return Ok((!value.is_empty()).then_some(value.len() - 1));
1413 }
1414 }
1415
1416 let sort_columns = ordering_values
1417 .iter()
1418 .zip(self.ordering_req.iter())
1419 .map(|(values, req)| SortColumn {
1420 values: Arc::clone(values),
1421 options: Some(req.options),
1422 })
1423 .collect::<Vec<_>>();
1424
1425 let comparator = LexicographicalComparator::try_new(&sort_columns)?;
1426 let max_ind = if self.ignore_nulls {
1427 (0..value.len())
1428 .filter(|&index| !(value.is_null(index)))
1429 .max_by(|&a, &b| comparator.compare(a, b))
1430 } else {
1431 (0..value.len()).max_by(|&a, &b| comparator.compare(a, b))
1432 };
1433
1434 Ok(max_ind)
1435 }
1436}
1437
1438impl Accumulator for LastValueAccumulator {
1439 fn state(&mut self) -> Result<Vec<ScalarValue>> {
1440 let mut result = vec![self.last.clone()];
1441 result.extend(self.orderings.clone());
1442 result.push(ScalarValue::from(self.is_set));
1443 Ok(result)
1444 }
1445
1446 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1447 if let Some(last_idx) = self.get_last_idx(values)? {
1448 let row = get_row_at_idx(values, last_idx)?;
1449 let orderings = &row[1..];
1450 if !self.is_set
1452 || self.is_input_pre_ordered
1453 || compare_rows(
1454 &self.orderings,
1455 orderings,
1456 &get_sort_options(&self.ordering_req),
1457 )?
1458 .is_lt()
1459 {
1460 self.update_with_new_row(row);
1461 }
1462 }
1463 Ok(())
1464 }
1465
1466 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1467 let is_set_idx = states.len() - 1;
1470 let flags = states[is_set_idx].as_boolean();
1471 let filtered_states =
1472 filter_states_according_to_is_set(&states[0..is_set_idx], flags)?;
1473 let sort_columns =
1475 convert_to_sort_cols(&filtered_states[1..is_set_idx], &self.ordering_req);
1476
1477 let comparator = LexicographicalComparator::try_new(&sort_columns)?;
1478 let max = (0..filtered_states[0].len()).max_by(|&a, &b| comparator.compare(a, b));
1479
1480 if let Some(last_idx) = max {
1481 let mut last_row = get_row_at_idx(&filtered_states, last_idx)?;
1482 let last_ordering = &last_row[1..is_set_idx];
1484 let sort_options = get_sort_options(&self.ordering_req);
1485 if !self.is_set
1488 || self.is_input_pre_ordered
1489 || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt()
1490 {
1491 assert!(is_set_idx <= last_row.len());
1495 last_row.resize(is_set_idx, ScalarValue::Null);
1496 self.update_with_new_row(last_row);
1497 }
1498 }
1499 Ok(())
1500 }
1501
1502 fn evaluate(&mut self) -> Result<ScalarValue> {
1503 Ok(self.last.clone())
1504 }
1505
1506 fn size(&self) -> usize {
1507 size_of_val(self) - size_of_val(&self.last)
1508 + self.last.size()
1509 + ScalarValue::size_of_vec(&self.orderings)
1510 - size_of_val(&self.orderings)
1511 }
1512}
1513
1514fn filter_states_according_to_is_set(
1517 states: &[ArrayRef],
1518 flags: &BooleanArray,
1519) -> Result<Vec<ArrayRef>> {
1520 states
1521 .iter()
1522 .map(|state| compute::filter(state, flags).map_err(|e| arrow_datafusion_err!(e)))
1523 .collect()
1524}
1525
1526fn convert_to_sort_cols(arrs: &[ArrayRef], sort_exprs: &LexOrdering) -> Vec<SortColumn> {
1528 arrs.iter()
1529 .zip(sort_exprs.iter())
1530 .map(|(item, sort_expr)| SortColumn {
1531 values: Arc::clone(item),
1532 options: Some(sort_expr.options),
1533 })
1534 .collect()
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539 use std::iter::repeat_with;
1540
1541 use arrow::{
1542 array::{Int64Array, ListArray},
1543 compute::SortOptions,
1544 datatypes::Schema,
1545 };
1546 use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
1547
1548 use super::*;
1549
/// Feeds three consecutive batches to the trivial accumulators and checks that
/// they report the first value of the first batch and the last value of the
/// last batch.
#[test]
fn test_first_last_value_value() -> Result<()> {
    let mut first_acc = TrivialFirstValueAccumulator::try_new(&DataType::Int64, false)?;
    let mut last_acc = TrivialLastValueAccumulator::try_new(&DataType::Int64, false)?;

    // Half-open ranges: 0..10, 1..11 and 2..13.
    let ranges: [(i64, i64); 3] = [(0, 10), (1, 11), (2, 13)];
    for (start, end) in ranges {
        let batch: ArrayRef =
            Arc::new(Int64Array::from((start..end).collect::<Vec<_>>()));
        first_acc.update_batch(&[Arc::clone(&batch)])?;
        last_acc.update_batch(&[batch])?;
    }

    // First value of the first range.
    assert_eq!(first_acc.evaluate()?, ScalarValue::Int64(Some(0)));
    // Last value of the final range (2..13 ends at 12).
    assert_eq!(last_acc.evaluate()?, ScalarValue::Int64(Some(12)));
    Ok(())
}
1579
/// Checks that merging two partial states yields a state with the same number
/// of entries as each input state, for both trivial accumulators.
#[test]
fn test_first_last_state_after_merge() -> Result<()> {
    /// Concatenates two scalar states entry by entry into two-row arrays.
    fn stack_states(lhs: &[ScalarValue], rhs: &[ScalarValue]) -> Result<Vec<ArrayRef>> {
        let mut stacked = Vec::new();
        for idx in 0..lhs.len() {
            stacked.push(compute::concat(&[
                &lhs[idx].to_array()?,
                &rhs[idx].to_array()?,
            ])?);
        }
        Ok(stacked)
    }

    let arrs: Vec<ArrayRef> = [(0_i64, 10_i64), (1, 11), (2, 13)]
        .into_iter()
        .map(|(start, end)| Arc::new((start..end).collect::<Int64Array>()) as ArrayRef)
        .collect();

    // FIRST_VALUE: two independent partial states, then a merge.
    let mut first_acc = TrivialFirstValueAccumulator::try_new(&DataType::Int64, false)?;
    first_acc.update_batch(&[Arc::clone(&arrs[0])])?;
    let first_state_a = first_acc.state()?;

    let mut first_acc = TrivialFirstValueAccumulator::try_new(&DataType::Int64, false)?;
    first_acc.update_batch(&[Arc::clone(&arrs[1])])?;
    let first_state_b = first_acc.state()?;

    assert_eq!(first_state_a.len(), first_state_b.len());
    let first_states = stack_states(&first_state_a, &first_state_b)?;

    let mut first_acc = TrivialFirstValueAccumulator::try_new(&DataType::Int64, false)?;
    first_acc.merge_batch(&first_states)?;
    let merged_first = first_acc.state()?;
    assert_eq!(merged_first.len(), first_state_a.len());

    // LAST_VALUE: the same procedure.
    let mut last_acc = TrivialLastValueAccumulator::try_new(&DataType::Int64, false)?;
    last_acc.update_batch(&[Arc::clone(&arrs[0])])?;
    let last_state_a = last_acc.state()?;

    let mut last_acc = TrivialLastValueAccumulator::try_new(&DataType::Int64, false)?;
    last_acc.update_batch(&[Arc::clone(&arrs[1])])?;
    let last_state_b = last_acc.state()?;

    assert_eq!(last_state_a.len(), last_state_b.len());
    let last_states = stack_states(&last_state_a, &last_state_b)?;

    let mut last_acc = TrivialLastValueAccumulator::try_new(&DataType::Int64, false)?;
    last_acc.merge_batch(&last_states)?;
    let merged_last = last_acc.state()?;
    assert_eq!(merged_last.len(), last_state_a.len());

    Ok(())
}
1653
/// Drives a `FirstPrimitiveGroupsAccumulator` (with its trailing flag set to
/// `true`) through update, state, merge, update and evaluate, checking the
/// emitted arrays and that the cached orderings size stays in sync.
#[test]
fn test_first_group_acc() -> Result<()> {
    /// Compares the cached orderings size with a fresh recomputation.
    fn assert_orderings_size_in_sync(acc: &FirstPrimitiveGroupsAccumulator<Int64Type>) {
        assert_eq!(acc.size_of_orderings, acc.compute_size_of_orderings());
    }

    let fields: Vec<Field> = [
        ("a", DataType::Int64),
        ("b", DataType::Int64),
        ("c", DataType::Int64),
        ("d", DataType::Int32),
        ("e", DataType::Boolean),
    ]
    .into_iter()
    .map(|(name, data_type)| Field::new(name, data_type, true))
    .collect();
    let schema = Arc::new(Schema::new(fields));

    let sort_keys = [PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];

    let mut group_acc = FirstPrimitiveGroupsAccumulator::<Int64Type>::try_new(
        sort_keys.into(),
        true,
        &DataType::Int64,
        &[DataType::Int64],
        true,
    )?;

    // Column 0 holds the values, column 1 the ordering keys.
    let first_batch: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![Some(1), None, Some(3), Some(-6)])),
        Arc::new(Int64Array::from(vec![1, -9, 3, -6])),
    ];
    let row_filter = BooleanArray::from(vec![true, true, false, true]);

    group_acc.update_batch(&first_batch, &[0, 1, 2, 1], Some(&row_filter), 3)?;
    assert_orderings_size_in_sync(&group_acc);

    let state = group_acc.state(EmitTo::All)?;

    let expected_state: Vec<Arc<dyn Array>> = vec![
        Arc::new(Int64Array::from(vec![Some(1), Some(-6), None])),
        Arc::new(Int64Array::from(vec![Some(1), Some(-6), None])),
        Arc::new(BooleanArray::from(vec![true, true, false])),
    ];
    assert_eq!(state, expected_state);
    assert_orderings_size_in_sync(&group_acc);

    let merge_filter = BooleanArray::from(vec![true, false, false]);
    group_acc.merge_batch(&state, &[0, 1, 2], Some(&merge_filter), 3)?;
    assert_orderings_size_in_sync(&group_acc);

    let second_batch: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![6, 6])),
        Arc::new(Int64Array::from(vec![6, 6])),
    ];
    group_acc.update_batch(&second_batch, &[1, 2], None, 4)?;

    let evaluated = group_acc.evaluate(EmitTo::All)?;
    let eval_result = evaluated.as_any().downcast_ref::<Int64Array>().unwrap();

    let expect: PrimitiveArray<Int64Type> =
        Int64Array::from(vec![Some(1), Some(6), Some(6), None]);
    assert_eq!(eval_result, &expect);
    assert_orderings_size_in_sync(&group_acc);

    Ok(())
}
1747
/// Repeats an update/state/merge/evaluate cycle ten times and checks after
/// every step that the cached orderings size equals a fresh recomputation.
#[test]
fn test_group_acc_size_of_ordering() -> Result<()> {
    /// Compares the cached orderings size with a fresh recomputation.
    fn assert_orderings_size_in_sync(acc: &FirstPrimitiveGroupsAccumulator<Int64Type>) {
        assert_eq!(acc.size_of_orderings, acc.compute_size_of_orderings());
    }

    let fields: Vec<Field> = [
        ("a", DataType::Int64),
        ("b", DataType::Int64),
        ("c", DataType::Int64),
        ("d", DataType::Int32),
        ("e", DataType::Boolean),
    ]
    .into_iter()
    .map(|(name, data_type)| Field::new(name, data_type, true))
    .collect();
    let schema = Arc::new(Schema::new(fields));

    let sort_keys = [PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];

    let mut group_acc = FirstPrimitiveGroupsAccumulator::<Int64Type>::try_new(
        sort_keys.into(),
        true,
        &DataType::Int64,
        &[DataType::Int64],
        true,
    )?;

    // Column 0 holds the values, column 1 the ordering keys.
    let val_with_orderings: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![Some(1), None, Some(3), Some(-6)])),
        Arc::new(Int64Array::from(vec![1, -9, 3, -6])),
    ];

    for _round in 0..10 {
        let row_filter = BooleanArray::from(vec![true, true, false, true]);
        group_acc.update_batch(
            &val_with_orderings,
            &[0, 1, 2, 1],
            Some(&row_filter),
            100,
        )?;
        assert_orderings_size_in_sync(&group_acc);

        // Partial emit of the first two groups.
        group_acc.state(EmitTo::First(2))?;
        assert_orderings_size_in_sync(&group_acc);

        let emitted = group_acc.state(EmitTo::All)?;
        assert_orderings_size_in_sync(&group_acc);

        // Merge the emitted state back, one group per emitted row.
        let group_indices: Vec<usize> = (0..emitted[0].len()).collect();
        group_acc.merge_batch(&emitted, &group_indices, None, 100)?;
        assert_orderings_size_in_sync(&group_acc);

        group_acc.evaluate(EmitTo::First(2))?;
        assert_orderings_size_in_sync(&group_acc);

        group_acc.evaluate(EmitTo::All)?;
        assert_orderings_size_in_sync(&group_acc);
    }

    Ok(())
}
1828
/// Drives a `FirstPrimitiveGroupsAccumulator` constructed with its trailing
/// flag set to `false` through update, state, merge, update and evaluate, and
/// checks the emitted arrays.
#[test]
fn test_last_group_acc() -> Result<()> {
    let fields: Vec<Field> = [
        ("a", DataType::Int64),
        ("b", DataType::Int64),
        ("c", DataType::Int64),
        ("d", DataType::Int32),
        ("e", DataType::Boolean),
    ]
    .into_iter()
    .map(|(name, data_type)| Field::new(name, data_type, true))
    .collect();
    let schema = Arc::new(Schema::new(fields));

    let sort_keys = [PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];

    let mut group_acc = FirstPrimitiveGroupsAccumulator::<Int64Type>::try_new(
        sort_keys.into(),
        true,
        &DataType::Int64,
        &[DataType::Int64],
        false,
    )?;

    // Column 0 holds the values, column 1 the ordering keys.
    let first_batch: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![Some(1), None, Some(3), Some(-6)])),
        Arc::new(Int64Array::from(vec![1, -9, 3, -6])),
    ];
    let row_filter = BooleanArray::from(vec![true, true, false, true]);

    group_acc.update_batch(&first_batch, &[0, 1, 2, 1], Some(&row_filter), 3)?;

    let state = group_acc.state(EmitTo::All)?;

    let expected_state: Vec<Arc<dyn Array>> = vec![
        Arc::new(Int64Array::from(vec![Some(1), Some(-6), None])),
        Arc::new(Int64Array::from(vec![Some(1), Some(-6), None])),
        Arc::new(BooleanArray::from(vec![true, true, false])),
    ];
    assert_eq!(state, expected_state);

    let merge_filter = BooleanArray::from(vec![true, false, false]);
    group_acc.merge_batch(&state, &[0, 1, 2], Some(&merge_filter), 3)?;

    let second_batch: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![66, 6])),
        Arc::new(Int64Array::from(vec![66, 6])),
    ];
    group_acc.update_batch(&second_batch, &[1, 2], None, 4)?;

    let evaluated = group_acc.evaluate(EmitTo::All)?;
    let eval_result = evaluated.as_any().downcast_ref::<Int64Array>().unwrap();

    let expect: PrimitiveArray<Int64Type> =
        Int64Array::from(vec![Some(1), Some(66), Some(6), None]);
    assert_eq!(eval_result, &expect);

    Ok(())
}
1903
/// The reported size of the FIRST_VALUE accumulator must not depend on how
/// many rows the batch it was updated from contained.
#[test]
fn test_first_list_acc_size() -> Result<()> {
    /// Updates a fresh accumulator with `values` and reports its size.
    fn size_after_batch(values: &[ArrayRef]) -> Result<usize> {
        // NOTE(review): the declared element type is Int64 while the batches
        // built below are Int32 lists; the comparison only relies on both runs
        // using the same declaration.
        let item_field = Arc::new(Field::new_list_field(DataType::Int64, false));
        let mut first_accumulator =
            TrivialFirstValueAccumulator::try_new(&DataType::List(item_field), false)?;

        first_accumulator.update_batch(values)?;

        Ok(first_accumulator.size())
    }

    // 10000 identical single-element lists versus exactly one such list.
    let large_batch = ListArray::from_iter_primitive::<Int32Type, _, _>(
        repeat_with(|| Some(vec![Some(1)])).take(10000),
    );
    let small_batch =
        ListArray::from_iter_primitive::<Int32Type, _, _>([Some(vec![Some(1)])]);

    let size_from_large = size_after_batch(&[Arc::new(large_batch)])?;
    let size_from_small = size_after_batch(&[Arc::new(small_batch)])?;
    assert_eq!(size_from_large, size_from_small);

    Ok(())
}
1929
/// The reported size of the LAST_VALUE accumulator must not depend on how
/// many rows the batch it was updated from contained.
#[test]
fn test_last_list_acc_size() -> Result<()> {
    /// Updates a fresh accumulator with `values` and reports its size.
    fn size_after_batch(values: &[ArrayRef]) -> Result<usize> {
        // NOTE(review): the declared element type is Int64 while the batches
        // built below are Int32 lists; the comparison only relies on both runs
        // using the same declaration.
        let item_field = Arc::new(Field::new_list_field(DataType::Int64, false));
        let mut last_accumulator =
            TrivialLastValueAccumulator::try_new(&DataType::List(item_field), false)?;

        last_accumulator.update_batch(values)?;

        Ok(last_accumulator.size())
    }

    // 10000 identical single-element lists versus exactly one such list.
    let large_batch = ListArray::from_iter_primitive::<Int32Type, _, _>(
        repeat_with(|| Some(vec![Some(1)])).take(10000),
    );
    let small_batch =
        ListArray::from_iter_primitive::<Int32Type, _, _>([Some(vec![Some(1)])]);

    let size_from_large = size_after_batch(&[Arc::new(large_batch)])?;
    let size_from_small = size_after_batch(&[Arc::new(small_batch)])?;
    assert_eq!(size_from_large, size_from_small);

    Ok(())
}
1955}