use std::any::Any;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::sync::Arc;
use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
use crate::expressions::format_state_name;
use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field};
use arrow_array::{Array, ListArray};
use arrow_schema::{Fields, SortOptions};
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
use itertools::izip;
/// Aggregate expression that collects its input into a list while tracking
/// the ordering requirement the collected elements must follow.
#[derive(Debug)]
pub struct OrderSensitiveArrayAgg {
/// Name of the output field; also the prefix of the state field names.
name: String,
/// Data type of the aggregated expression (the list's item type).
input_data_type: DataType,
/// Data types of the ordering expressions, handed to the accumulator and
/// used to build the ordering state field.
order_by_data_types: Vec<DataType>,
/// Expression whose values are aggregated.
expr: Arc<dyn PhysicalExpr>,
/// Ordering requirement of the aggregate; exposed through `order_bys()`
/// when it is non-empty.
ordering_req: LexOrdering,
}
impl OrderSensitiveArrayAgg {
pub fn new(
expr: Arc<dyn PhysicalExpr>,
name: impl Into<String>,
input_data_type: DataType,
order_by_data_types: Vec<DataType>,
ordering_req: LexOrdering,
) -> Self {
Self {
name: name.into(),
expr,
input_data_type,
order_by_data_types,
ordering_req,
}
}
}
impl AggregateExpr for OrderSensitiveArrayAgg {
    /// Returns `self` as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Output field: a non-nullable list named `self.name` whose nullable
    /// `"item"` entries have the input data type.
    fn field(&self) -> Result<Field> {
        let item = Field::new("item", self.input_data_type.clone(), true);
        Ok(Field::new_list(&self.name, item, false))
    }

    /// Creates a fresh accumulator configured with this aggregate's input
    /// type, ordering types and ordering requirement.
    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
        let accumulator = OrderSensitiveArrayAggAccumulator::try_new(
            &self.input_data_type,
            &self.order_by_data_types,
            self.ordering_req.clone(),
        )?;
        Ok(Box::new(accumulator))
    }

    /// Describes the two intermediate state columns, in order:
    /// 1. the list of aggregated values,
    /// 2. the list of structs holding each value's ordering columns.
    fn state_fields(&self) -> Result<Vec<Field>> {
        let values_state = Field::new_list(
            format_state_name(&self.name, "array_agg"),
            Field::new("item", self.input_data_type.clone(), true),
            false,
        );

        let ordering_struct = DataType::Struct(Fields::from(ordering_fields(
            &self.ordering_req,
            &self.order_by_data_types,
        )));
        let orderings_state = Field::new_list(
            format_state_name(&self.name, "array_agg_orderings"),
            Field::new("item", ordering_struct, true),
            false,
        );

        Ok(vec![values_state, orderings_state])
    }

    /// The single expression whose values are aggregated.
    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        vec![Arc::clone(&self.expr)]
    }

    /// The ordering requirement, or `None` when there is none.
    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
        let requirement: &[PhysicalSortExpr] = &self.ordering_req;
        match requirement {
            [] => None,
            non_empty => Some(non_empty),
        }
    }

    /// Name of this aggregate expression.
    fn name(&self) -> &str {
        self.name.as_str()
    }
}
impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
    /// Two `OrderSensitiveArrayAgg`s are equal when `other` downcasts to
    /// `Self` and the name, input type, ordering types, aggregated expression
    /// and ordering requirement all match.
    ///
    /// The ordering requirement is part of the comparison because two
    /// aggregates that differ only in their ORDER BY (e.g. ascending versus
    /// descending) produce different results and must not be treated as
    /// interchangeable.
    fn eq(&self, other: &dyn Any) -> bool {
        down_cast_any_ref(other)
            .downcast_ref::<Self>()
            .map(|x| {
                self.name == x.name
                    && self.input_data_type == x.input_data_type
                    && self.order_by_data_types == x.order_by_data_types
                    && self.expr.eq(&x.expr)
                    && self.ordering_req == x.ordering_req
            })
            .unwrap_or(false)
    }
}
/// Accumulator that stores every aggregated value together with the ordering
/// columns of the row it came from.
#[derive(Debug)]
pub(crate) struct OrderSensitiveArrayAggAccumulator {
/// Aggregated values collected so far.
values: Vec<ScalarValue>,
/// Ordering columns for each entry of `values` (same index, same length).
ordering_values: Vec<Vec<ScalarValue>>,
/// Element 0 is the data type of the aggregated value; the remaining
/// elements are the data types of the ordering columns.
datatypes: Vec<DataType>,
/// Ordering requirement; its sort options drive the merge of partial states.
ordering_req: LexOrdering,
}
impl OrderSensitiveArrayAggAccumulator {
    /// Creates an empty accumulator.
    ///
    /// `datatype` is the type of the aggregated value and `ordering_dtypes`
    /// are the types of the ordering columns; they are stored back to back,
    /// value type first. `ordering_req` is the ordering requirement used when
    /// merging states.
    pub fn try_new(
        datatype: &DataType,
        ordering_dtypes: &[DataType],
        ordering_req: LexOrdering,
    ) -> Result<Self> {
        let mut datatypes = Vec::with_capacity(1 + ordering_dtypes.len());
        datatypes.push(datatype.clone());
        datatypes.extend_from_slice(ordering_dtypes);

        Ok(Self {
            values: Vec::new(),
            ordering_values: Vec::new(),
            datatypes,
            ordering_req,
        })
    }
}
impl Accumulator for OrderSensitiveArrayAggAccumulator {
    /// Appends every row of `values` to the accumulator.
    ///
    /// Column 0 of each row is stored as the aggregated value and the
    /// remaining columns are stored as that row's ordering key. An empty
    /// `values` slice is a no-op.
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        if values.is_empty() {
            return Ok(());
        }
        let n_row = values[0].len();
        for index in 0..n_row {
            let mut row = get_row_at_idx(values, index)?;
            // The row is owned, so move its scalars instead of cloning them:
            // everything after column 0 is the ordering key, and what is left
            // in `row` is the single aggregated value.
            let ordering = row.split_off(1);
            self.values.extend(row);
            self.ordering_values.push(ordering);
        }
        Ok(())
    }

    /// Merges partial states into this accumulator.
    ///
    /// `states[0]` holds the aggregated value lists and `states[1]` the
    /// matching ordering lists (a list of structs). Each row of these arrays
    /// is treated as one partition and merged, together with the values
    /// already accumulated here, according to the sort options of the
    /// ordering requirement.
    ///
    /// # Errors
    ///
    /// Returns an error when only one state column is supplied, when the
    /// ordering state is not a `ListArray`, when a state entry cannot be
    /// converted to the expected scalar shape, or when the merge itself fails.
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        let (array_agg_values, agg_orderings) = match states {
            [] => return Ok(()),
            [values, orderings, ..] => (values, orderings),
            // Previously this case panicked on the `states[1]` index.
            [_] => {
                return internal_err!(
                    "ARRAY_AGG state must contain both values and orderings"
                );
            }
        };
        if !agg_orderings.as_any().is::<ListArray>() {
            return Err(DataFusionError::Execution(
                "Expects to receive a list array".to_string(),
            ));
        }

        // Partition 0 is the current content of this accumulator; every row
        // of the incoming state adds one more partition.
        let n_partitions = agg_orderings.len() + 1;
        let mut partition_values = Vec::with_capacity(n_partitions);
        let mut partition_ordering_values = Vec::with_capacity(n_partitions);
        partition_values.push(self.values.clone());
        partition_ordering_values.push(self.ordering_values.clone());

        for index in 0..agg_orderings.len() {
            let ordering = ScalarValue::try_from_array(agg_orderings, index)?;
            let other_ordering_values = self.convert_array_agg_to_orderings(ordering)?;
            let array_agg_res = ScalarValue::try_from_array(array_agg_values, index)?;
            if let ScalarValue::List(Some(other_values), _) = array_agg_res {
                partition_values.push(other_values);
                partition_ordering_values.push(other_ordering_values);
            } else {
                return internal_err!("ARRAY_AGG state must be list!");
            }
        }

        let sort_options = self
            .ordering_req
            .iter()
            .map(|sort_expr| sort_expr.options)
            .collect::<Vec<_>>();
        let (new_values, new_orderings) = merge_ordered_arrays(
            &partition_values,
            &partition_ordering_values,
            &sort_options,
        )?;
        self.values = new_values;
        self.ordering_values = new_orderings;
        Ok(())
    }

    /// Returns the intermediate state: the aggregated value list followed by
    /// the ordering list.
    fn state(&self) -> Result<Vec<ScalarValue>> {
        Ok(vec![self.evaluate()?, self.evaluate_orderings()?])
    }

    /// Returns the accumulated values as a list scalar whose element type is
    /// the aggregated value's data type.
    fn evaluate(&self) -> Result<ScalarValue> {
        Ok(ScalarValue::new_list(
            Some(self.values.clone()),
            self.datatypes[0].clone(),
        ))
    }

    /// Size estimate of this accumulator in bytes.
    ///
    /// Starts from the size of the struct itself and adds, for each field,
    /// the size reported for its contents minus the inline part that the
    /// struct size already covers.
    fn size(&self) -> usize {
        let mut total = std::mem::size_of_val(self)
            + ScalarValue::size_of_vec(&self.values)
            - std::mem::size_of_val(&self.values);

        // Outer vector of ordering rows, then the contents of each row.
        total +=
            std::mem::size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
        for row in &self.ordering_values {
            total += ScalarValue::size_of_vec(row) - std::mem::size_of_val(row);
        }

        // Data type vector, then whatever each data type reports beyond its
        // inline size.
        total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
        for dtype in &self.datatypes {
            total += dtype.size() - std::mem::size_of_val(dtype);
        }

        // Only the slots of the ordering requirement are counted here.
        total += std::mem::size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
        total
    }
}
impl OrderSensitiveArrayAggAccumulator {
    /// Unpacks a list-of-structs scalar into one `Vec<ScalarValue>` per
    /// struct, preserving the list order.
    ///
    /// # Errors
    ///
    /// Fails when `in_data` is not `ScalarValue::List(Some(..), _)` or when
    /// any list element is not `ScalarValue::Struct(Some(..), _)`.
    fn convert_array_agg_to_orderings(
        &self,
        in_data: ScalarValue,
    ) -> Result<Vec<Vec<ScalarValue>>> {
        let list_vals = match in_data {
            ScalarValue::List(Some(list_vals), _) => list_vals,
            other => {
                return Err(DataFusionError::Execution(format!(
                    "Expects to receive ScalarValue::List(Some(..), _) but got:{:?}",
                    other.get_datatype()
                )));
            }
        };

        let mut converted = Vec::with_capacity(list_vals.len());
        for struct_vals in list_vals {
            match struct_vals {
                ScalarValue::Struct(Some(orderings), _) => converted.push(orderings),
                other => {
                    return Err(DataFusionError::Execution(format!(
                        "Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
                        other.get_datatype()
                    )));
                }
            }
        }
        Ok(converted)
    }

    /// Packs the stored ordering rows into a list scalar of structs, one
    /// struct per accumulated value. The struct fields come from
    /// `ordering_fields` applied to the ordering requirement and the ordering
    /// data types (`datatypes[1..]`).
    fn evaluate_orderings(&self) -> Result<ScalarValue> {
        let struct_fields =
            Fields::from(ordering_fields(&self.ordering_req, &self.datatypes[1..]));

        let mut orderings = Vec::with_capacity(self.ordering_values.len());
        for ordering in &self.ordering_values {
            orderings.push(ScalarValue::Struct(
                Some(ordering.clone()),
                struct_fields.clone(),
            ));
        }

        Ok(ScalarValue::new_list(
            Some(orderings),
            DataType::Struct(struct_fields),
        ))
    }
}
/// Heap entry used while merging ordered branches: one value, its ordering
/// key, and the branch it was read from.
///
/// NOTE(review): the derived `PartialEq`/`Eq` compare every field, whereas
/// the `Ord` implementation for this type compares only `ordering`, so
/// `a == b` and `a.cmp(&b) == Ordering::Equal` are not equivalent.
#[derive(Debug, PartialEq, Eq)]
struct CustomElement<'a> {
/// Index of the branch this element was taken from.
branch_idx: usize,
/// The value carried through the merge.
value: ScalarValue,
/// Ordering key of `value`; the only field that `Ord` compares.
ordering: Vec<ScalarValue>,
/// Sort options passed to `compare_rows` when comparing ordering keys.
sort_options: &'a [SortOptions],
}
impl<'a> CustomElement<'a> {
fn new(
branch_idx: usize,
value: ScalarValue,
ordering: Vec<ScalarValue>,
sort_options: &'a [SortOptions],
) -> Self {
Self {
branch_idx,
value,
ordering,
sort_options,
}
}
fn ordering(
&self,
current: &[ScalarValue],
target: &[ScalarValue],
) -> Result<Ordering> {
compare_rows(current, target, self.sort_options)
}
}
impl<'a> Ord for CustomElement<'a> {
    /// Orders elements by their ordering keys, reversed.
    ///
    /// `BinaryHeap` pops its greatest element, so reversing the comparison
    /// makes the heap hand out the element that comes first under the sort
    /// options.
    ///
    /// # Panics
    ///
    /// Panics if comparing the two ordering keys returns an error.
    fn cmp(&self, other: &Self) -> Ordering {
        let natural = self.ordering(&self.ordering, &other.ordering).unwrap();
        natural.reverse()
    }
}
impl<'a> PartialOrd for CustomElement<'a> {
/// Delegates to `Ord::cmp`, so the partial order always agrees with the
/// total order and never returns `None`.
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
fn merge_ordered_arrays(
values: &[Vec<ScalarValue>],
ordering_values: &[Vec<Vec<ScalarValue>>],
sort_options: &[SortOptions],
) -> Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
let mut heap: BinaryHeap<CustomElement> = BinaryHeap::new();
if !(values.len() == ordering_values.len()
&& values
.iter()
.zip(ordering_values.iter())
.all(|(vals, ordering_vals)| vals.len() == ordering_vals.len()))
{
return Err(DataFusionError::Execution(
"Expects values arguments and/or ordering_values arguments to have same size"
.to_string(),
));
}
let n_branch = values.len();
let mut indices = vec![0_usize; n_branch];
let end_indices = (0..n_branch)
.map(|idx| values[idx].len())
.collect::<Vec<_>>();
let mut merged_values = vec![];
let mut merged_orderings = vec![];
loop {
let min_elem = if let Some(min_elem) = heap.pop() {
min_elem
} else {
for (idx, end_idx, ordering, branch_index) in izip!(
indices.iter(),
end_indices.iter(),
ordering_values.iter(),
0..n_branch
) {
if idx == end_idx {
continue;
}
let elem = CustomElement::new(
branch_index,
values[branch_index][*idx].clone(),
ordering[*idx].to_vec(),
sort_options,
);
heap.push(elem);
}
if let Some(min_elem) = heap.pop() {
min_elem
} else {
break;
}
};
let branch_idx = min_elem.branch_idx;
indices[branch_idx] += 1;
let row_idx = indices[branch_idx];
merged_values.push(min_elem.value.clone());
merged_orderings.push(min_elem.ordering.clone());
if row_idx < end_indices[branch_idx] {
let value = values[branch_idx][row_idx].clone();
let ordering_row = ordering_values[branch_idx][row_idx].to_vec();
let elem = CustomElement::new(branch_idx, value, ordering_row, sort_options);
heap.push(elem);
}
}
Ok((merged_values, merged_orderings))
}
#[cfg(test)]
mod tests {
    use crate::aggregate::array_agg_ordered::merge_ordered_arrays;
    use arrow_array::{Array, ArrayRef, Int64Array};
    use arrow_schema::SortOptions;
    use datafusion_common::utils::get_row_at_idx;
    use datafusion_common::{Result, ScalarValue};
    use std::sync::Arc;

    /// Wraps the given integers in an `Int64Array` behind an `ArrayRef`.
    fn int64_array(data: Vec<i64>) -> ArrayRef {
        Arc::new(Int64Array::from(data))
    }

    /// Reads `columns` row by row, producing one `Vec<ScalarValue>` per row.
    fn to_rows(columns: &[ArrayRef]) -> Result<Vec<Vec<ScalarValue>>> {
        let n_row = columns[0].len();
        (0..n_row)
            .map(|idx| get_row_at_idx(columns, idx))
            .collect::<Result<Vec<_>>>()
    }

    /// Converts every element of `column` into a `ScalarValue`.
    fn to_scalars(column: &ArrayRef) -> Result<Vec<ScalarValue>> {
        (0..column.len())
            .map(|idx| ScalarValue::try_from_array(column, idx))
            .collect::<Result<Vec<_>>>()
    }

    /// Transposes row-oriented scalars back into one array per column.
    fn to_columns(rows: &[Vec<ScalarValue>]) -> Result<Vec<ArrayRef>> {
        (0..rows[0].len())
            .map(|col_idx| {
                ScalarValue::iter_to_array(
                    (0..rows.len()).map(|row_idx| rows[row_idx][col_idx].clone()),
                )
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Sort options for two ordering columns sharing the same direction, with
    /// nulls last.
    fn two_column_options(descending: bool) -> Vec<SortOptions> {
        let option = || SortOptions {
            descending,
            nulls_first: false,
        };
        vec![option(), option()]
    }

    #[test]
    fn test_merge_asc() -> Result<()> {
        let lhs_arrays = vec![
            int64_array(vec![0, 0, 1, 1, 2]),
            int64_array(vec![0, 1, 2, 3, 4]),
        ];
        let rhs_arrays = vec![
            int64_array(vec![0, 0, 1, 1, 2]),
            int64_array(vec![0, 1, 2, 3, 4]),
        ];
        let lhs_orderings = to_rows(&lhs_arrays)?;
        let rhs_orderings = to_rows(&rhs_arrays)?;
        let sort_options = two_column_options(false);

        let lhs_vals = to_scalars(&int64_array(vec![0, 1, 2, 3, 4]))?;
        let rhs_vals = to_scalars(&int64_array(vec![0, 1, 2, 3, 4]))?;

        let expected = int64_array(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]);
        let expected_ts = vec![
            int64_array(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2]),
            int64_array(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]),
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &[lhs_vals, rhs_vals],
            &[lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
        let merged_ts = to_columns(&merged_ts)?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);
        Ok(())
    }

    #[test]
    fn test_merge_desc() -> Result<()> {
        let lhs_arrays = vec![
            int64_array(vec![2, 1, 1, 0, 0]),
            int64_array(vec![4, 3, 2, 1, 0]),
        ];
        let rhs_arrays = vec![
            int64_array(vec![2, 1, 1, 0, 0]),
            int64_array(vec![4, 3, 2, 1, 0]),
        ];
        let lhs_orderings = to_rows(&lhs_arrays)?;
        let rhs_orderings = to_rows(&rhs_arrays)?;
        let sort_options = two_column_options(true);

        let lhs_vals = to_scalars(&int64_array(vec![0, 1, 2, 1, 2]))?;
        let rhs_vals = to_scalars(&int64_array(vec![0, 1, 2, 1, 2]))?;

        let expected = int64_array(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2]);
        let expected_ts = vec![
            int64_array(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0]),
            int64_array(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0]),
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &[lhs_vals, rhs_vals],
            &[lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
        let merged_ts = to_columns(&merged_ts)?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);
        Ok(())
    }
}