use arrow::array::{ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, SortColumn, SortOptions};
use arrow::datatypes::{DataType, Field};
use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
use datafusion_common::{
arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::function::AccumulatorArgs;
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{
Accumulator, AggregateUDFImpl, ArrayFunctionSignature, Expr, Signature,
TypeSignature, Volatility,
};
use datafusion_physical_expr_common::aggregate::utils::{
down_cast_any_ref, get_sort_options, ordering_fields,
};
use datafusion_physical_expr_common::aggregate::AggregateExpr;
use datafusion_physical_expr_common::expressions;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr_common::utils::reverse_order_bys;
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
// Generates the fluent `first_value` expression builder and the
// `first_value_udaf` singleton accessor from the `FirstValue` UDAF below.
make_udaf_expr_and_func!(
    FirstValue,
    first_value,
    "Returns the first value in a group of values.",
    first_value_udaf
);
/// FIRST_VALUE user-defined aggregate function: returns the first value in a
/// group, optionally with respect to an `ORDER BY` requirement.
pub struct FirstValue {
    // Accepted argument types (any single array argument, or one numeric).
    signature: Signature,
    // Alternate names this UDAF can be invoked by.
    aliases: Vec<String>,
}
impl Debug for FirstValue {
    /// Manual `Debug`: the accumulator factory is not `Debug`, so it is
    /// rendered as the placeholder `"<FUNC>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.debug_struct("FirstValue")
            .field("name", &self.name())
            .field("signature", &self.signature)
            .field("accumulator", &"<FUNC>")
            .finish()
    }
}
impl Default for FirstValue {
    // Delegates to `FirstValue::new`.
    fn default() -> Self {
        Self::new()
    }
}
impl FirstValue {
    /// Creates a FIRST_VALUE UDAF accepting either any single array argument
    /// or exactly one numeric argument.
    pub fn new() -> Self {
        let accepted = vec![
            TypeSignature::ArraySignature(ArrayFunctionSignature::Array),
            TypeSignature::Uniform(1, NUMERICS.to_vec()),
        ];
        Self {
            signature: Signature::one_of(accepted, Volatility::Immutable),
            aliases: vec!["FIRST_VALUE".to_string()],
        }
    }
}
impl AggregateUDFImpl for FirstValue {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "FIRST_VALUE"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// FIRST_VALUE returns the same type as its (single) input argument.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        Ok(arg_types[0].clone())
    }

    /// Builds a `FirstValueAccumulator`, translating the logical `ORDER BY`
    /// expressions into physical sort expressions over the input schema.
    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
        let mut all_sort_orders = vec![];
        let mut sort_exprs = vec![];
        // NOTE(review): only `Expr::Sort` over a plain column is translated;
        // any other sort expression is silently skipped — confirm callers
        // never pass compound sort keys here.
        for expr in acc_args.sort_exprs {
            if let Expr::Sort(sort) = expr {
                if let Expr::Column(col) = sort.expr.as_ref() {
                    let name = &col.name;
                    let e = expressions::column::col(name, acc_args.schema)?;
                    sort_exprs.push(PhysicalSortExpr {
                        expr: e,
                        options: SortOptions {
                            descending: !sort.asc,
                            nulls_first: sort.nulls_first,
                        },
                    });
                }
            }
        }
        if !sort_exprs.is_empty() {
            all_sort_orders.extend(sort_exprs);
        }
        let ordering_req = all_sort_orders;
        let ordering_dtypes = ordering_req
            .iter()
            .map(|e| e.expr.data_type(acc_args.schema))
            .collect::<Result<Vec<_>>>()?;
        // With no ordering requirement any row qualifies, so the requirement
        // is trivially satisfied.
        let requirement_satisfied = ordering_req.is_empty();
        FirstValueAccumulator::try_new(
            acc_args.data_type,
            &ordering_dtypes,
            ordering_req,
            acc_args.ignore_nulls,
        )
        .map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
    }

    /// Intermediate state schema: the current first value, one field per
    /// ordering expression, then an `is_set` flag recording whether any row
    /// has been observed. Layout must match `FirstValueAccumulator::state`.
    fn state_fields(
        &self,
        name: &str,
        value_type: DataType,
        ordering_fields: Vec<Field>,
    ) -> Result<Vec<Field>> {
        let mut fields = vec![Field::new(
            format_state_name(name, "first_value"),
            value_type,
            true,
        )];
        fields.extend(ordering_fields);
        // Qualify the flag with `format_state_name` for consistency with
        // `FirstValuePhysicalExpr::state_fields` and the LAST_VALUE variant;
        // consumers read this column positionally, not by name.
        fields.push(Field::new(
            format_state_name(name, "is_set"),
            DataType::Boolean,
            true,
        ));
        Ok(fields)
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }
}
#[derive(Debug)]
pub struct FirstValueAccumulator {
    // Current first value; a typed NULL until the first row is observed.
    first: ScalarValue,
    // Whether `first` has been populated from an actual row.
    is_set: bool,
    // Ordering values associated with `first` (one per ordering expression).
    orderings: Vec<ScalarValue>,
    // The `ORDER BY` requirement defining which row counts as "first".
    ordering_req: LexOrdering,
    // True when input batches already arrive in the required order.
    requirement_satisfied: bool,
    // When true, NULL values in the data column are skipped.
    ignore_nulls: bool,
}
impl FirstValueAccumulator {
    /// Creates a new accumulator. `data_type` is the aggregated value's type;
    /// `ordering_dtypes` are the types of the `ORDER BY` expressions (one per
    /// entry in `ordering_req`). Value and ordering slots start as typed
    /// NULL scalars.
    pub fn try_new(
        data_type: &DataType,
        ordering_dtypes: &[DataType],
        ordering_req: LexOrdering,
        ignore_nulls: bool,
    ) -> Result<Self> {
        let orderings = ordering_dtypes
            .iter()
            .map(ScalarValue::try_from)
            .collect::<Result<Vec<_>>>()?;
        // No ordering requirement means any incoming row is acceptable.
        let requirement_satisfied = ordering_req.is_empty();
        ScalarValue::try_from(data_type).map(|first| Self {
            first,
            is_set: false,
            orderings,
            ordering_req,
            requirement_satisfied,
            ignore_nulls,
        })
    }

    /// Overrides whether the input is already known to satisfy the ordering.
    pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self {
        self.requirement_satisfied = requirement_satisfied;
        self
    }

    /// Adopts `row` as the new first value; `row[0]` is the data value and
    /// `row[1..]` are its ordering values.
    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
        self.first = row[0].clone();
        self.orderings = row[1..].to_vec();
        self.is_set = true;
    }

    /// Finds the index of the "first" row in `values` (`values[0]` is the
    /// data column, the rest are ordering columns), or `None` when no
    /// eligible row exists.
    fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
        let [value, ordering_values @ ..] = values else {
            return internal_err!("Empty row in FIRST_VALUE");
        };
        if self.requirement_satisfied {
            // Input already arrives in the required order: take the first
            // (non-null, if requested) element.
            if self.ignore_nulls {
                for i in 0..value.len() {
                    if !value.is_null(i) {
                        return Ok(Some(i));
                    }
                }
                return Ok(None);
            } else {
                return Ok((!value.is_empty()).then_some(0));
            }
        }
        // Otherwise sort lexicographically by the ordering columns to locate
        // the minimal row under the requested sort options.
        let sort_columns = ordering_values
            .iter()
            .zip(self.ordering_req.iter())
            .map(|(values, req)| SortColumn {
                values: values.clone(),
                options: Some(req.options),
            })
            .collect::<Vec<_>>();
        if self.ignore_nulls {
            // Full sort (no limit): an unknown number of leading rows may
            // have NULL data values that must be skipped.
            let indices = lexsort_to_indices(&sort_columns, None)?;
            for index in indices.iter().flatten() {
                if !value.is_null(index as usize) {
                    return Ok(Some(index as usize));
                }
            }
            Ok(None)
        } else {
            // Only the top-1 row is needed.
            let indices = lexsort_to_indices(&sort_columns, Some(1))?;
            Ok((!indices.is_empty()).then_some(indices.value(0) as _))
        }
    }
}
impl Accumulator for FirstValueAccumulator {
    /// Serializes the state as `[first, orderings..., is_set]`; this layout
    /// must match the fields declared by the corresponding `state_fields`.
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        let mut result = vec![self.first.clone()];
        result.extend(self.orderings.iter().cloned());
        result.push(ScalarValue::Boolean(Some(self.is_set)));
        Ok(result)
    }

    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        if !self.is_set {
            // No candidate yet: adopt this batch's first eligible row.
            if let Some(first_idx) = self.get_first_idx(values)? {
                let row = get_row_at_idx(values, first_idx)?;
                self.update_with_new_row(&row);
            }
        } else if !self.requirement_satisfied {
            // Batches may arrive out of order: keep whichever candidate has
            // the smaller ordering key.
            if let Some(first_idx) = self.get_first_idx(values)? {
                let row = get_row_at_idx(values, first_idx)?;
                let orderings = &row[1..];
                if compare_rows(
                    &self.orderings,
                    orderings,
                    &get_sort_options(&self.ordering_req),
                )?
                .is_gt()
                {
                    self.update_with_new_row(&row);
                }
            }
        }
        Ok(())
    }

    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        // State layout is [value, orderings..., is_set]; the flag is last.
        let is_set_idx = states.len() - 1;
        let flags = states[is_set_idx].as_boolean();
        // Drop partial states that never observed a row.
        let filtered_states = filter_states_according_to_is_set(states, flags)?;
        let sort_cols =
            convert_to_sort_cols(&filtered_states[1..is_set_idx], &self.ordering_req);
        let ordered_states = if sort_cols.is_empty() {
            // No ordering columns: take the states as-is.
            filtered_states
        } else {
            let indices = lexsort_to_indices(&sort_cols, None)?;
            get_arrayref_at_indices(&filtered_states, &indices)?
        };
        if !ordered_states[0].is_empty() {
            let first_row = get_row_at_idx(&ordered_states, 0)?;
            // Exclude the trailing is_set column from the ordering slice.
            let first_ordering = &first_row[1..is_set_idx];
            let sort_options = get_sort_options(&self.ordering_req);
            // Adopt the merged candidate when none exists yet, or when it
            // sorts before the current one.
            if !self.is_set
                || compare_rows(&self.orderings, first_ordering, &sort_options)?.is_gt()
            {
                self.update_with_new_row(&first_row[0..is_set_idx]);
            }
        }
        Ok(())
    }

    fn evaluate(&mut self) -> Result<ScalarValue> {
        Ok(self.first.clone())
    }

    /// Memory estimate: swap the shallow sizes of `first`/`orderings` for
    /// their deep (heap-inclusive) sizes to avoid double counting.
    fn size(&self) -> usize {
        std::mem::size_of_val(self) - std::mem::size_of_val(&self.first)
            + self.first.size()
            + ScalarValue::size_of_vec(&self.orderings)
            - std::mem::size_of_val(&self.orderings)
    }
}
#[derive(Debug, Clone)]
/// Physical-plan FIRST_VALUE aggregate expression.
pub struct FirstValuePhysicalExpr {
    // Display name, e.g. "FIRST_VALUE(a) ORDER BY [...]".
    name: String,
    // Type of the aggregated input expression.
    input_data_type: DataType,
    // Types of the ordering expressions, parallel to `ordering_req`.
    order_by_data_types: Vec<DataType>,
    // The expression being aggregated.
    expr: Arc<dyn PhysicalExpr>,
    // The `ORDER BY` requirement defining which row is "first".
    ordering_req: LexOrdering,
    // True when input is already known to satisfy `ordering_req`.
    requirement_satisfied: bool,
    // When true, NULL values in the input are skipped.
    ignore_nulls: bool,
    // Pre-computed state fields; when non-empty they override the default
    // schema produced by `state_fields`.
    state_fields: Vec<Field>,
}
impl FirstValuePhysicalExpr {
    /// Creates a new FIRST_VALUE expression over `expr`, ordered by
    /// `ordering_req` (whose types are `order_by_data_types`).
    pub fn new(
        expr: Arc<dyn PhysicalExpr>,
        name: impl Into<String>,
        input_data_type: DataType,
        ordering_req: LexOrdering,
        order_by_data_types: Vec<DataType>,
        state_fields: Vec<Field>,
    ) -> Self {
        // An empty requirement is trivially satisfied.
        let requirement_satisfied = ordering_req.is_empty();
        Self {
            name: name.into(),
            input_data_type,
            order_by_data_types,
            expr,
            ordering_req,
            requirement_satisfied,
            ignore_nulls: false,
            state_fields,
        }
    }

    /// Sets whether NULL input values should be skipped.
    pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
        self.ignore_nulls = ignore_nulls;
        self
    }

    /// Returns the display name of this expression.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the input expression's data type.
    pub fn input_data_type(&self) -> &DataType {
        &self.input_data_type
    }

    /// Returns the ordering expressions' data types.
    pub fn order_by_data_types(&self) -> &Vec<DataType> {
        &self.order_by_data_types
    }

    /// Returns the aggregated input expression.
    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
        &self.expr
    }

    /// Returns the ordering requirement.
    pub fn ordering_req(&self) -> &LexOrdering {
        &self.ordering_req
    }

    /// Overrides whether the input is known to satisfy the ordering.
    pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self {
        self.requirement_satisfied = requirement_satisfied;
        self
    }

    /// Converts to the equivalent LAST_VALUE over the reversed ordering.
    /// NOTE(review): the name rewrite byte-slices off a 5-character "FIRST"
    /// prefix — assumes `self.name` always starts with "FIRST"; confirm.
    pub fn convert_to_last(self) -> LastValuePhysicalExpr {
        let mut name = format!("LAST{}", &self.name[5..]);
        // Flip the rendered ORDER BY direction to match the reversal below.
        replace_order_by_clause(&mut name);
        let FirstValuePhysicalExpr {
            expr,
            input_data_type,
            ordering_req,
            order_by_data_types,
            ..
        } = self;
        LastValuePhysicalExpr::new(
            expr,
            name,
            input_data_type,
            reverse_order_bys(&ordering_req),
            order_by_data_types,
        )
    }
}
impl AggregateExpr for FirstValuePhysicalExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Output field: same type as the input, always nullable.
    fn field(&self) -> Result<Field> {
        Ok(Field::new(&self.name, self.input_data_type.clone(), true))
    }

    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
        FirstValueAccumulator::try_new(
            &self.input_data_type,
            &self.order_by_data_types,
            self.ordering_req.clone(),
            self.ignore_nulls,
        )
        .map(|acc| {
            Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
        })
    }

    /// State schema: `[first_value, orderings..., is_set]`; pre-computed
    /// `state_fields` supplied at construction take precedence.
    fn state_fields(&self) -> Result<Vec<Field>> {
        if !self.state_fields.is_empty() {
            return Ok(self.state_fields.clone());
        }
        let mut fields = vec![Field::new(
            format_state_name(&self.name, "first_value"),
            self.input_data_type.clone(),
            true,
        )];
        fields.extend(ordering_fields(
            &self.ordering_req,
            &self.order_by_data_types,
        ));
        fields.push(Field::new(
            format_state_name(&self.name, "is_set"),
            DataType::Boolean,
            true,
        ));
        Ok(fields)
    }

    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        vec![self.expr.clone()]
    }

    /// Returns the ordering requirement, or `None` when there is none.
    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
        (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
    }

    fn name(&self) -> &str {
        &self.name
    }

    /// FIRST_VALUE reversed is LAST_VALUE over the reversed ordering.
    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
        Some(Arc::new(self.clone().convert_to_last()))
    }

    // Same accumulator as `create_accumulator`: it supports sliding use.
    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
        FirstValueAccumulator::try_new(
            &self.input_data_type,
            &self.order_by_data_types,
            self.ordering_req.clone(),
            self.ignore_nulls,
        )
        .map(|acc| {
            Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
        })
    }
}
impl PartialEq<dyn Any> for FirstValuePhysicalExpr {
    /// Two FIRST_VALUE expressions are equal when their name, input type,
    /// ordering types, and input expression all match.
    fn eq(&self, other: &dyn Any) -> bool {
        down_cast_any_ref(other)
            .downcast_ref::<Self>()
            .map_or(false, |rhs| {
                self.name == rhs.name
                    && self.input_data_type == rhs.input_data_type
                    && self.order_by_data_types == rhs.order_by_data_types
                    && self.expr.eq(&rhs.expr)
            })
    }
}
#[derive(Debug, Clone)]
/// Physical-plan LAST_VALUE aggregate expression.
pub struct LastValuePhysicalExpr {
    // Display name, e.g. "LAST_VALUE(a) ORDER BY [...]".
    name: String,
    // Type of the aggregated input expression.
    input_data_type: DataType,
    // Types of the ordering expressions, parallel to `ordering_req`.
    order_by_data_types: Vec<DataType>,
    // The expression being aggregated.
    expr: Arc<dyn PhysicalExpr>,
    // The `ORDER BY` requirement defining which row is "last".
    ordering_req: LexOrdering,
    // True when input is already known to satisfy `ordering_req`.
    requirement_satisfied: bool,
    // When true, NULL values in the input are skipped.
    ignore_nulls: bool,
}
impl LastValuePhysicalExpr {
    /// Creates a new LAST_VALUE expression over `expr`, ordered by
    /// `ordering_req` (whose types are `order_by_data_types`).
    pub fn new(
        expr: Arc<dyn PhysicalExpr>,
        name: impl Into<String>,
        input_data_type: DataType,
        ordering_req: LexOrdering,
        order_by_data_types: Vec<DataType>,
    ) -> Self {
        // An empty requirement is trivially satisfied.
        let requirement_satisfied = ordering_req.is_empty();
        Self {
            name: name.into(),
            input_data_type,
            order_by_data_types,
            expr,
            ordering_req,
            requirement_satisfied,
            ignore_nulls: false,
        }
    }

    /// Sets whether NULL input values should be skipped.
    pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
        self.ignore_nulls = ignore_nulls;
        self
    }

    /// Returns the display name of this expression.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the input expression's data type.
    pub fn input_data_type(&self) -> &DataType {
        &self.input_data_type
    }

    /// Returns the ordering expressions' data types.
    pub fn order_by_data_types(&self) -> &Vec<DataType> {
        &self.order_by_data_types
    }

    /// Returns the aggregated input expression.
    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
        &self.expr
    }

    /// Returns the ordering requirement.
    pub fn ordering_req(&self) -> &LexOrdering {
        &self.ordering_req
    }

    /// Overrides whether the input is known to satisfy the ordering.
    pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self {
        self.requirement_satisfied = requirement_satisfied;
        self
    }

    /// Converts to the equivalent FIRST_VALUE over the reversed ordering.
    /// NOTE(review): the name rewrite byte-slices off a 4-character "LAST"
    /// prefix — assumes `self.name` always starts with "LAST"; confirm.
    pub fn convert_to_first(self) -> FirstValuePhysicalExpr {
        let mut name = format!("FIRST{}", &self.name[4..]);
        // Flip the rendered ORDER BY direction to match the reversal below.
        replace_order_by_clause(&mut name);
        let LastValuePhysicalExpr {
            expr,
            input_data_type,
            ordering_req,
            order_by_data_types,
            ..
        } = self;
        FirstValuePhysicalExpr::new(
            expr,
            name,
            input_data_type,
            reverse_order_bys(&ordering_req),
            order_by_data_types,
            vec![],
        )
    }
}
impl AggregateExpr for LastValuePhysicalExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Output field: same type as the input, always nullable.
    fn field(&self) -> Result<Field> {
        Ok(Field::new(&self.name, self.input_data_type.clone(), true))
    }

    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
        LastValueAccumulator::try_new(
            &self.input_data_type,
            &self.order_by_data_types,
            self.ordering_req.clone(),
            self.ignore_nulls,
        )
        .map(|acc| {
            Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
        })
    }

    /// State schema: `[last_value, orderings..., is_set]`.
    fn state_fields(&self) -> Result<Vec<Field>> {
        let mut fields = vec![Field::new(
            format_state_name(&self.name, "last_value"),
            self.input_data_type.clone(),
            true,
        )];
        fields.extend(ordering_fields(
            &self.ordering_req,
            &self.order_by_data_types,
        ));
        fields.push(Field::new(
            format_state_name(&self.name, "is_set"),
            DataType::Boolean,
            true,
        ));
        Ok(fields)
    }

    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        vec![self.expr.clone()]
    }

    /// Returns the ordering requirement, or `None` when there is none.
    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
        (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
    }

    fn name(&self) -> &str {
        &self.name
    }

    /// LAST_VALUE reversed is FIRST_VALUE over the reversed ordering.
    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
        Some(Arc::new(self.clone().convert_to_first()))
    }

    // Same accumulator as `create_accumulator`: it supports sliding use.
    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
        LastValueAccumulator::try_new(
            &self.input_data_type,
            &self.order_by_data_types,
            self.ordering_req.clone(),
            self.ignore_nulls,
        )
        .map(|acc| {
            Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
        })
    }
}
impl PartialEq<dyn Any> for LastValuePhysicalExpr {
    /// Two LAST_VALUE expressions are equal when their name, input type,
    /// ordering types, and input expression all match.
    fn eq(&self, other: &dyn Any) -> bool {
        down_cast_any_ref(other)
            .downcast_ref::<Self>()
            .map_or(false, |rhs| {
                self.name == rhs.name
                    && self.input_data_type == rhs.input_data_type
                    && self.order_by_data_types == rhs.order_by_data_types
                    && self.expr.eq(&rhs.expr)
            })
    }
}
#[derive(Debug)]
struct LastValueAccumulator {
    // Current last value; a typed NULL until the first row is observed.
    last: ScalarValue,
    // Whether `last` has been populated from an actual row.
    is_set: bool,
    // Ordering values associated with `last` (one per ordering expression).
    orderings: Vec<ScalarValue>,
    // The `ORDER BY` requirement defining which row counts as "last".
    ordering_req: LexOrdering,
    // True when input batches already arrive in the required order.
    requirement_satisfied: bool,
    // When true, NULL values in the data column are skipped.
    ignore_nulls: bool,
}
impl LastValueAccumulator {
    /// Creates a new accumulator. `data_type` is the aggregated value's type;
    /// `ordering_dtypes` are the types of the `ORDER BY` expressions (one per
    /// entry in `ordering_req`). Value and ordering slots start as typed
    /// NULL scalars.
    pub fn try_new(
        data_type: &DataType,
        ordering_dtypes: &[DataType],
        ordering_req: LexOrdering,
        ignore_nulls: bool,
    ) -> Result<Self> {
        let orderings = ordering_dtypes
            .iter()
            .map(ScalarValue::try_from)
            .collect::<Result<Vec<_>>>()?;
        // No ordering requirement means any incoming row is acceptable.
        let requirement_satisfied = ordering_req.is_empty();
        ScalarValue::try_from(data_type).map(|last| Self {
            last,
            is_set: false,
            orderings,
            ordering_req,
            requirement_satisfied,
            ignore_nulls,
        })
    }

    /// Adopts `row` as the new last value; `row[0]` is the data value and
    /// `row[1..]` are its ordering values.
    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
        self.last = row[0].clone();
        self.orderings = row[1..].to_vec();
        self.is_set = true;
    }

    /// Finds the index of the "last" row in `values` (`values[0]` is the
    /// data column, the rest are ordering columns), or `None` when no
    /// eligible row exists.
    fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
        let [value, ordering_values @ ..] = values else {
            return internal_err!("Empty row in LAST_VALUE");
        };
        if self.requirement_satisfied {
            // Input already arrives in the required order: scan backwards
            // for the last (non-null, if requested) element.
            if self.ignore_nulls {
                for i in (0..value.len()).rev() {
                    if !value.is_null(i) {
                        return Ok(Some(i));
                    }
                }
                return Ok(None);
            } else {
                return Ok((!value.is_empty()).then_some(value.len() - 1));
            }
        }
        // Sort with *reversed* options (`!req.options`) so the maximal row
        // under the requested ordering appears first in the sorted indices.
        let sort_columns = ordering_values
            .iter()
            .zip(self.ordering_req.iter())
            .map(|(values, req)| {
                SortColumn {
                    values: values.clone(),
                    options: Some(!req.options),
                }
            })
            .collect::<Vec<_>>();
        if self.ignore_nulls {
            // Full sort (no limit): an unknown number of leading rows may
            // have NULL data values that must be skipped.
            let indices = lexsort_to_indices(&sort_columns, None)?;
            for index in indices.iter().flatten() {
                if !value.is_null(index as usize) {
                    return Ok(Some(index as usize));
                }
            }
            Ok(None)
        } else {
            // Only the top-1 row (under reversed order) is needed.
            let indices = lexsort_to_indices(&sort_columns, Some(1))?;
            Ok((!indices.is_empty()).then_some(indices.value(0) as _))
        }
    }

    /// Overrides whether the input is already known to satisfy the ordering.
    fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self {
        self.requirement_satisfied = requirement_satisfied;
        self
    }
}
impl Accumulator for LastValueAccumulator {
    /// Serializes the state as `[last, orderings..., is_set]`; this layout
    /// must match the fields declared by the corresponding `state_fields`.
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        let mut result = vec![self.last.clone()];
        result.extend(self.orderings.clone());
        result.push(ScalarValue::Boolean(Some(self.is_set)));
        Ok(result)
    }

    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        if !self.is_set || self.requirement_satisfied {
            // No candidate yet, or batches arrive in order (so every later
            // batch supersedes): adopt this batch's last eligible row.
            if let Some(last_idx) = self.get_last_idx(values)? {
                let row = get_row_at_idx(values, last_idx)?;
                self.update_with_new_row(&row);
            }
        } else if let Some(last_idx) = self.get_last_idx(values)? {
            // Batches may arrive out of order: keep whichever candidate has
            // the larger ordering key.
            let row = get_row_at_idx(values, last_idx)?;
            let orderings = &row[1..];
            if compare_rows(
                &self.orderings,
                orderings,
                &get_sort_options(&self.ordering_req),
            )?
            .is_lt()
            {
                self.update_with_new_row(&row);
            }
        }
        Ok(())
    }

    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        // State layout is [value, orderings..., is_set]; the flag is last.
        let is_set_idx = states.len() - 1;
        let flags = states[is_set_idx].as_boolean();
        // Drop partial states that never observed a row.
        let filtered_states = filter_states_according_to_is_set(states, flags)?;
        let sort_cols =
            convert_to_sort_cols(&filtered_states[1..is_set_idx], &self.ordering_req);
        let ordered_states = if sort_cols.is_empty() {
            // No ordering columns: take the states as-is.
            filtered_states
        } else {
            let indices = lexsort_to_indices(&sort_cols, None)?;
            get_arrayref_at_indices(&filtered_states, &indices)?
        };
        if !ordered_states[0].is_empty() {
            // Sorted ascending by the requested options, so the candidate
            // "last" row is at the end.
            let last_idx = ordered_states[0].len() - 1;
            let last_row = get_row_at_idx(&ordered_states, last_idx)?;
            // Exclude the trailing is_set column from the ordering slice.
            let last_ordering = &last_row[1..is_set_idx];
            let sort_options = get_sort_options(&self.ordering_req);
            // Adopt the merged candidate when none exists yet, or when it
            // sorts after the current one.
            if !self.is_set
                || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt()
            {
                self.update_with_new_row(&last_row[0..is_set_idx]);
            }
        }
        Ok(())
    }

    fn evaluate(&mut self) -> Result<ScalarValue> {
        Ok(self.last.clone())
    }

    /// Memory estimate: swap the shallow sizes of `last`/`orderings` for
    /// their deep (heap-inclusive) sizes to avoid double counting.
    fn size(&self) -> usize {
        std::mem::size_of_val(self) - std::mem::size_of_val(&self.last)
            + self.last.size()
            + ScalarValue::size_of_vec(&self.orderings)
            - std::mem::size_of_val(&self.orderings)
    }
}
/// Keeps only the state rows whose `is_set` flag is true, so partial states
/// that never observed a row do not participate in a merge.
fn filter_states_according_to_is_set(
    states: &[ArrayRef],
    flags: &BooleanArray,
) -> Result<Vec<ArrayRef>> {
    let mut filtered = Vec::with_capacity(states.len());
    for state in states {
        let kept = compute::filter(state, flags).map_err(|e| arrow_datafusion_err!(e))?;
        filtered.push(kept);
    }
    Ok(filtered)
}
/// Pairs each ordering-state column with its sort options, producing the
/// `SortColumn`s used for lexicographic sorting during merges.
fn convert_to_sort_cols(
    arrs: &[ArrayRef],
    sort_exprs: &[PhysicalSortExpr],
) -> Vec<SortColumn> {
    let mut sort_columns = Vec::with_capacity(arrs.len().min(sort_exprs.len()));
    for (array, sort_expr) in arrs.iter().zip(sort_exprs) {
        sort_columns.push(SortColumn {
            values: array.clone(),
            options: Some(sort_expr.options),
        });
    }
    sort_columns
}
/// Flips the direction rendered inside an `ORDER BY [...]` clause of a
/// display name (e.g. ` ASC NULLS LAST]` becomes ` DESC NULLS FIRST]`),
/// used when converting FIRST_VALUE <-> LAST_VALUE. Names without an
/// `ORDER BY [` clause, or without a recognized suffix, are left unchanged.
fn replace_order_by_clause(order_by: &mut String) {
    // Each (from, to) pair maps a rendered direction to its reversal.
    const SUFFIXES: [(&str, &str); 4] = [
        (" DESC NULLS FIRST]", " ASC NULLS LAST]"),
        (" ASC NULLS FIRST]", " DESC NULLS LAST]"),
        (" DESC NULLS LAST]", " ASC NULLS FIRST]"),
        (" ASC NULLS LAST]", " DESC NULLS FIRST]"),
    ];
    let Some(start) = order_by.find("ORDER BY [") else {
        return;
    };
    let Some(rel_end) = order_by[start..].find(']') else {
        return;
    };
    // `start + 9` is the index of '[' ("ORDER BY " is 9 bytes long).
    let clause_start = start + 9;
    let clause_end = start + rel_end;
    let clause = &order_by[clause_start..=clause_end];
    for &(from, to) in SUFFIXES.iter() {
        if clause.ends_with(from) {
            let rewritten = clause.replace(from, to);
            order_by.replace_range(clause_start..=clause_end, &rewritten);
            return;
        }
    }
}
#[cfg(test)]
mod tests {
    use arrow::array::Int64Array;

    use super::*;

    /// Feeding overlapping ranges batch-by-batch: FIRST keeps the very first
    /// element seen (0), LAST keeps the very last (12).
    #[test]
    fn test_first_last_value_value() -> Result<()> {
        let mut first_accumulator =
            FirstValueAccumulator::try_new(&DataType::Int64, &[], vec![], false)?;
        let mut last_accumulator =
            LastValueAccumulator::try_new(&DataType::Int64, &[], vec![], false)?;
        // [0..10), [1..11), [2..13)
        let ranges: Vec<(i64, i64)> = vec![(0, 10), (1, 11), (2, 13)];
        let arrs = ranges
            .into_iter()
            .map(|(start, end)| {
                Arc::new(Int64Array::from((start..end).collect::<Vec<_>>())) as ArrayRef
            })
            .collect::<Vec<_>>();
        for arr in arrs {
            first_accumulator.update_batch(&[arr.clone()])?;
            last_accumulator.update_batch(&[arr])?;
        }
        assert_eq!(first_accumulator.evaluate()?, ScalarValue::Int64(Some(0)));
        assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(12)));
        Ok(())
    }

    /// Verifies that merging serialized states preserves the state layout
    /// (same number of columns before and after the merge).
    #[test]
    fn test_first_last_state_after_merge() -> Result<()> {
        let ranges: Vec<(i64, i64)> = vec![(0, 10), (1, 11), (2, 13)];
        let arrs = ranges
            .into_iter()
            .map(|(start, end)| {
                Arc::new((start..end).collect::<Int64Array>()) as ArrayRef
            })
            .collect::<Vec<_>>();

        // FIRST: build two independent partial states...
        let mut first_accumulator =
            FirstValueAccumulator::try_new(&DataType::Int64, &[], vec![], false)?;
        first_accumulator.update_batch(&[arrs[0].clone()])?;
        let state1 = first_accumulator.state()?;

        let mut first_accumulator =
            FirstValueAccumulator::try_new(&DataType::Int64, &[], vec![], false)?;
        first_accumulator.update_batch(&[arrs[1].clone()])?;
        let state2 = first_accumulator.state()?;

        assert_eq!(state1.len(), state2.len());

        // ...concatenate them column-wise into merge input...
        let mut states = vec![];
        for idx in 0..state1.len() {
            states.push(arrow::compute::concat(&[
                &state1[idx].to_array()?,
                &state2[idx].to_array()?,
            ])?);
        }

        // ...and merge into a fresh accumulator.
        let mut first_accumulator =
            FirstValueAccumulator::try_new(&DataType::Int64, &[], vec![], false)?;
        first_accumulator.merge_batch(&states)?;

        let merged_state = first_accumulator.state()?;
        assert_eq!(merged_state.len(), state1.len());

        // Same round-trip for LAST.
        let mut last_accumulator =
            LastValueAccumulator::try_new(&DataType::Int64, &[], vec![], false)?;
        last_accumulator.update_batch(&[arrs[0].clone()])?;
        let state1 = last_accumulator.state()?;

        let mut last_accumulator =
            LastValueAccumulator::try_new(&DataType::Int64, &[], vec![], false)?;
        last_accumulator.update_batch(&[arrs[1].clone()])?;
        let state2 = last_accumulator.state()?;

        assert_eq!(state1.len(), state2.len());

        let mut states = vec![];
        for idx in 0..state1.len() {
            states.push(arrow::compute::concat(&[
                &state1[idx].to_array()?,
                &state2[idx].to_array()?,
            ])?);
        }

        let mut last_accumulator =
            LastValueAccumulator::try_new(&DataType::Int64, &[], vec![], false)?;
        last_accumulator.merge_batch(&states)?;

        let merged_state = last_accumulator.state()?;
        assert_eq!(merged_state.len(), state1.len());

        Ok(())
    }
}