use arrow::compute::SortOptions;
use datafusion_common::utils::compare_rows;
use datafusion_common::{exec_err, ScalarValue};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
/// A pending entry of one branch in a k-way merge (see `merge_ordered_arrays`).
///
/// Entries are compared only by their `ordering` row (via `compare_rows` and
/// `sort_options`), with `branch_idx` as a tie-breaker, and the result is
/// reversed. [`BinaryHeap`] is a max-heap, so the reversal makes it pop the
/// entry with the *smallest* ordering first; among equal orderings it pops the
/// one from the lowest branch. `PartialEq`/`Eq` are defined through the same
/// comparison so that all comparison traits agree, as the [`Ord`] contract
/// requires.
#[derive(Debug)]
struct CustomElement<'a> {
    /// Index of the branch this entry was taken from.
    branch_idx: usize,
    /// Payload value carried along with the entry; not used for comparison.
    value: ScalarValue,
    /// Row of sort-key values that determines this entry's merge position.
    ordering: Vec<ScalarValue>,
    /// Sort options passed to `compare_rows` when comparing `ordering` rows.
    sort_options: &'a [SortOptions],
}

impl<'a> CustomElement<'a> {
    /// Creates an entry taken from branch `branch_idx`.
    fn new(
        branch_idx: usize,
        value: ScalarValue,
        ordering: Vec<ScalarValue>,
        sort_options: &'a [SortOptions],
    ) -> Self {
        Self {
            branch_idx,
            value,
            ordering,
            sort_options,
        }
    }

    /// Compares two ordering rows using this entry's `sort_options`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `compare_rows`.
    fn ordering(
        &self,
        current: &[ScalarValue],
        target: &[ScalarValue],
    ) -> datafusion_common::Result<Ordering> {
        compare_rows(current, target, self.sort_options)
    }
}

impl Ord for CustomElement<'_> {
    /// Reversed comparison of (`ordering`, `branch_idx`), so that
    /// [`BinaryHeap`] behaves as a min-heap over ordering rows.
    ///
    /// # Panics
    ///
    /// Panics if `compare_rows` fails for the two ordering rows.
    fn cmp(&self, other: &Self) -> Ordering {
        self.ordering(&self.ordering, &other.ordering)
            // `Ord::cmp` cannot return an error, so a comparison failure is a broken
            // invariant of the merge. NOTE(review): presumably this only happens when
            // the rows hold incomparable values (e.g. mismatched types) — confirm.
            .expect("ordering rows of merged branches must be comparable")
            // Break ties by branch so equal keys come out in branch order, making the
            // merge deterministic and stable instead of relying on heap internals.
            .then_with(|| self.branch_idx.cmp(&other.branch_idx))
            // `BinaryHeap` is a max-heap; reverse so the smallest entry pops first.
            .reverse()
    }
}

impl PartialOrd for CustomElement<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for CustomElement<'_> {
    /// Equal exactly when [`Ord::cmp`] reports `Equal`, i.e. same branch and
    /// equal ordering rows; `value` is ignored.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for CustomElement<'_> {}
/// Merges several individually sorted branches into one sequence (a k-way merge).
///
/// Branch `i` is given as two parallel queues:
/// - `values[i]` holds the payload values.
/// - `ordering_values[i]` holds, for each payload, the row of sort-key values
///   it is ordered by.
///
/// At every step the pending entry whose ordering row is smallest under
/// `sort_options` is emitted next. The output is therefore globally sorted
/// provided each branch is already sorted by `sort_options`; this is not
/// checked. Entries are popped from the front of the queues as they are
/// merged, so on success every queue is left empty.
///
/// # Returns
///
/// `(merged_values, merged_orderings)`, where `merged_values[k]` is the value
/// whose ordering row is `merged_orderings[k]`.
///
/// # Errors
///
/// Returns an execution error, without consuming anything, in either case:
/// - `values` and `ordering_values` contain a different number of branches.
/// - Any branch holds a different number of values than ordering rows.
///
/// # Panics
///
/// Panics if comparing two ordering rows fails. The heap comparison in
/// `CustomElement`'s `Ord` impl cannot propagate an error from `compare_rows`.
pub fn merge_ordered_arrays(
    // Entries are consumed from the front of each branch, hence `VecDeque`.
    values: &mut [VecDeque<ScalarValue>],
    ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
    sort_options: &[SortOptions],
) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
    /// Pops the front `(value, ordering)` pair of branch `branch_idx`, or
    /// returns `None` once that branch is exhausted.
    fn pop_front_entry(
        values: &mut [VecDeque<ScalarValue>],
        ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
        branch_idx: usize,
    ) -> Option<(ScalarValue, Vec<ScalarValue>)> {
        let ordering = ordering_values[branch_idx].pop_front()?;
        // The length validation guarantees both queues of a branch shrink in lockstep.
        let value = values[branch_idx]
            .pop_front()
            .expect("value and ordering queues of a branch have equal lengths");
        Some((value, ordering))
    }

    if values.len() != ordering_values.len()
        || values
            .iter()
            .zip(ordering_values.iter())
            .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
    {
        return exec_err!(
            "Expects values arguments and/or ordering_values arguments to have same size"
        );
    }

    let n_branch = values.len();
    // Every entry ends up in the output, so the final size is known upfront.
    let total_len: usize = values.iter().map(VecDeque::len).sum();
    let mut merged_values = Vec::with_capacity(total_len);
    let mut merged_orderings = Vec::with_capacity(total_len);

    // The heap holds at most one pending entry per branch: that branch's
    // current head. Seed it with the head of every non-empty branch.
    let mut heap = BinaryHeap::<CustomElement>::with_capacity(n_branch);
    for branch_idx in 0..n_branch {
        if let Some((value, ordering)) = pop_front_entry(values, ordering_values, branch_idx) {
            heap.push(CustomElement::new(branch_idx, value, ordering, sort_options));
        }
    }

    // Repeatedly emit the smallest head, then refill the heap from the branch
    // it came from. The heap empties only once every branch is exhausted.
    while let Some(CustomElement {
        branch_idx,
        value,
        ordering,
        ..
    }) = heap.pop()
    {
        merged_values.push(value);
        merged_orderings.push(ordering);
        if let Some((value, ordering)) = pop_front_entry(values, ordering_values, branch_idx) {
            heap.push(CustomElement::new(branch_idx, value, ordering, sort_options));
        }
    }

    Ok((merged_values, merged_orderings))
}