use arrow::compute::SortOptions;
use datafusion_common::utils::compare_rows;
use datafusion_common::{exec_err, ScalarValue};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
/// A single heap entry used while merging ordered branches in
/// `merge_ordered_arrays`.
///
/// NOTE(review): the derived `PartialEq`/`Eq` compare every field, whereas the
/// `Ord` implementation in this file compares only `ordering`. Two elements can
/// therefore be `Ordering::Equal` without being `==`.
#[derive(Debug, PartialEq, Eq)]
struct CustomElement<'a> {
/// Index of the branch (input deque) this entry was popped from.
branch_idx: usize,
/// The value that is emitted into the merged output for this entry.
value: ScalarValue,
/// The sort key of `value`; this is what entries are compared by.
ordering: Vec<ScalarValue>,
/// Sort options handed to `compare_rows` when two sort keys are compared.
sort_options: &'a [SortOptions],
}
impl<'a> CustomElement<'a> {
fn new(
branch_idx: usize,
value: ScalarValue,
ordering: Vec<ScalarValue>,
sort_options: &'a [SortOptions],
) -> Self {
Self {
branch_idx,
value,
ordering,
sort_options,
}
}
fn ordering(
&self,
current: &[ScalarValue],
target: &[ScalarValue],
) -> datafusion_common::Result<Ordering> {
compare_rows(current, target, self.sort_options)
}
}
impl Ord for CustomElement<'_> {
    /// Orders elements by their `ordering` keys, with the result *reversed*.
    ///
    /// `std::collections::BinaryHeap` is a max-heap; reversing the comparison
    /// makes the element whose key sorts first (under `sort_options`) the one
    /// that `pop` returns.
    ///
    /// # Panics
    ///
    /// Panics if `compare_rows` returns an error, because `Ord::cmp` has no way
    /// to report failure. Presumably this only happens when the two keys are
    /// not comparable (e.g. different scalar types) - confirm against
    /// `compare_rows`.
    fn cmp(&self, other: &Self) -> Ordering {
        self.ordering(&self.ordering, &other.ordering)
            .map(Ordering::reverse)
            .expect("ordering keys of elements in the same merge must be comparable")
    }
}
// `PartialOrd` is defined in terms of `Ord` so that both orderings always agree.
impl PartialOrd for CustomElement<'_> {
// Always returns `Some`: the result is whatever `Ord::cmp` yields.
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// Merges several branches into one output ordered by their sort keys.
///
/// Branch `i` is described by `values[i]` together with `ordering_values[i]`:
/// the `k`-th entry of `ordering_values[i]` is the sort key of the `k`-th entry
/// of `values[i]`. Keys are compared with `compare_rows` under `sort_options`.
///
/// NOTE(review): each branch is assumed to already be sorted consistently with
/// `sort_options`; this is not checked here.
///
/// Both inputs are consumed: on success every deque has been drained.
///
/// # Returns
///
/// A pair `(merged_values, merged_orderings)` where `merged_orderings[k]` is
/// the sort key of `merged_values[k]`.
///
/// # Errors
///
/// Returns an execution error when `values` and `ordering_values` do not have
/// the same number of branches, or when a branch holds a different number of
/// values and sort keys.
///
/// # Panics
///
/// Panics if `compare_rows` fails while the heap orders two sort keys (see the
/// `Ord` implementation of `CustomElement`).
pub fn merge_ordered_arrays(
    values: &mut [VecDeque<ScalarValue>],
    ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
    sort_options: &[SortOptions],
) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
    // Validate first so that nothing is allocated for malformed input.
    let shapes_match = values.len() == ordering_values.len()
        && values
            .iter()
            .zip(ordering_values.iter())
            .all(|(vals, ordering_vals)| vals.len() == ordering_vals.len());
    if !shapes_match {
        return exec_err!(
            "Expects values arguments and/or ordering_values arguments to have same size"
        );
    }

    // Every input entry ends up in the output exactly once, so the final size
    // is known up front.
    let n_total: usize = values.iter().map(VecDeque::len).sum();
    let mut merged_values = Vec::with_capacity(n_total);
    let mut merged_orderings = Vec::with_capacity(n_total);

    // The heap holds at most one entry per branch: the current head of every
    // branch that is not exhausted yet.
    let mut heap = BinaryHeap::<CustomElement>::with_capacity(values.len());
    let branches = values.iter_mut().zip(ordering_values.iter_mut());
    for (branch_idx, (branch_values, branch_orderings)) in branches.enumerate() {
        if let Some(head) =
            pop_branch_head(branch_idx, branch_values, branch_orderings, sort_options)
        {
            heap.push(head);
        }
    }

    // Repeatedly emit the entry that sorts first and replace it with the next
    // entry of the branch it came from. Once the heap is empty every branch is
    // exhausted and the merge is complete.
    while let Some(CustomElement {
        branch_idx,
        value,
        ordering,
        ..
    }) = heap.pop()
    {
        merged_values.push(value);
        merged_orderings.push(ordering);
        if let Some(next) = pop_branch_head(
            branch_idx,
            &mut values[branch_idx],
            &mut ordering_values[branch_idx],
            sort_options,
        ) {
            heap.push(next);
        }
    }

    Ok((merged_values, merged_orderings))
}

/// Pops the front entry of one branch and wraps it as a heap element; returns
/// `None` once the branch is exhausted.
fn pop_branch_head<'a>(
    branch_idx: usize,
    branch_values: &mut VecDeque<ScalarValue>,
    branch_orderings: &mut VecDeque<Vec<ScalarValue>>,
    sort_options: &'a [SortOptions],
) -> Option<CustomElement<'a>> {
    let ordering = branch_orderings.pop_front()?;
    // Both deques were validated to have equal length and are only ever popped
    // together, so a value is always available here.
    let value = branch_values
        .pop_front()
        .expect("values and ordering_values of a branch have the same length");
    Some(CustomElement::new(
        branch_idx,
        value,
        ordering,
        sort_options,
    ))
}
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};
    use datafusion_common::utils::get_row_at_idx;
    use datafusion_common::{Result, ScalarValue};

    /// Wraps `data` into an `Int64Array` behind an `ArrayRef`.
    fn int64_array(data: Vec<i64>) -> ArrayRef {
        Arc::new(Int64Array::from(data))
    }

    /// Turns `columns` into a queue of rows, one `Vec<ScalarValue>` per row.
    /// The row count is taken from the first column.
    fn rows_of(columns: &[ArrayRef]) -> Result<VecDeque<Vec<ScalarValue>>> {
        (0..columns[0].len())
            .map(|row_idx| get_row_at_idx(columns, row_idx))
            .collect()
    }

    /// Turns a single column into a queue of scalars, one per row.
    fn scalars_of(column: &ArrayRef) -> Result<VecDeque<ScalarValue>> {
        (0..column.len())
            .map(|row_idx| ScalarValue::try_from_array(column, row_idx))
            .collect()
    }

    /// Transposes `rows` back into one array per column. The column count is
    /// taken from the first row.
    fn columns_of(rows: &[Vec<ScalarValue>]) -> Result<Vec<ArrayRef>> {
        (0..rows[0].len())
            .map(|col_idx| {
                ScalarValue::iter_to_array(rows.iter().map(|row| row[col_idx].clone()))
            })
            .collect()
    }

    /// Merging two identical branches sorted ascending on both key columns.
    #[test]
    fn test_merge_asc() -> Result<()> {
        let lhs_orderings = rows_of(&[
            int64_array(vec![0, 0, 1, 1, 2]),
            int64_array(vec![0, 1, 2, 3, 4]),
        ])?;
        let rhs_orderings = rows_of(&[
            int64_array(vec![0, 0, 1, 1, 2]),
            int64_array(vec![0, 1, 2, 3, 4]),
        ])?;

        let sort_options = vec![
            SortOptions {
                descending: false,
                nulls_first: false,
            };
            2
        ];

        let lhs_vals = scalars_of(&int64_array(vec![0, 1, 2, 3, 4]))?;
        let rhs_vals = scalars_of(&int64_array(vec![0, 1, 2, 3, 4]))?;

        let expected = int64_array(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]);
        let expected_ts = vec![
            int64_array(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2]),
            int64_array(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]),
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals)?;
        let merged_ts = columns_of(&merged_ts)?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);
        Ok(())
    }

    /// Merging two identical branches sorted descending on both key columns.
    #[test]
    fn test_merge_desc() -> Result<()> {
        let lhs_orderings = rows_of(&[
            int64_array(vec![2, 1, 1, 0, 0]),
            int64_array(vec![4, 3, 2, 1, 0]),
        ])?;
        let rhs_orderings = rows_of(&[
            int64_array(vec![2, 1, 1, 0, 0]),
            int64_array(vec![4, 3, 2, 1, 0]),
        ])?;

        let sort_options = vec![
            SortOptions {
                descending: true,
                nulls_first: false,
            };
            2
        ];

        // The values are deliberately not monotonic: the merge must follow the
        // ordering keys, not the values themselves.
        let lhs_vals = scalars_of(&int64_array(vec![0, 1, 2, 1, 2]))?;
        let rhs_vals = scalars_of(&int64_array(vec![0, 1, 2, 1, 2]))?;

        let expected = int64_array(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2]);
        let expected_ts = vec![
            int64_array(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0]),
            int64_array(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0]),
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals)?;
        let merged_ts = columns_of(&merged_ts)?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);
        Ok(())
    }
}