use super::super::super::ast::{NullsPlacement, OrderItem};
use super::super::super::result::ResultRow;
use super::super::CypherExecutor;
use super::RowStream;
use crate::datatypes::values::Value;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
#[derive(Clone, Copy)]
struct SortSpec {
ascending: bool,
nulls: NullsPlacement,
}
struct HeapEntry {
sort_keys: Vec<Value>,
specs: std::sync::Arc<[SortSpec]>,
row: ResultRow,
}
impl HeapEntry {
fn cmp_better_first(&self, other: &Self) -> Ordering {
for (i, spec) in self.specs.iter().enumerate() {
let a = self.sort_keys.get(i).unwrap_or(&Value::Null);
let b = other.sort_keys.get(i).unwrap_or(&Value::Null);
let a_null = matches!(a, Value::Null);
let b_null = matches!(b, Value::Null);
match (a_null, b_null) {
(true, true) => continue,
(true, false) => {
return match spec.nulls {
NullsPlacement::First => Ordering::Less,
NullsPlacement::Last => Ordering::Greater,
};
}
(false, true) => {
return match spec.nulls {
NullsPlacement::First => Ordering::Greater,
NullsPlacement::Last => Ordering::Less,
};
}
(false, false) => {}
}
let raw =
crate::graph::core::filtering::compare_values(a, b).unwrap_or(Ordering::Equal);
let oriented = if spec.ascending { raw } else { raw.reverse() };
if oriented != Ordering::Equal {
return oriented;
}
}
Ordering::Equal
}
}
impl PartialEq for HeapEntry {
fn eq(&self, other: &Self) -> bool {
self.cmp_better_first(other) == Ordering::Equal
}
}
impl Eq for HeapEntry {}
impl PartialOrd for HeapEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for HeapEntry {
fn cmp(&self, other: &Self) -> Ordering {
self.cmp_better_first(other)
}
}
pub fn apply<'q>(
executor: &'q CypherExecutor<'q>,
upstream: RowStream<'q>,
order_items: &[OrderItem],
limit: usize,
) -> Result<RowStream<'q>, String> {
let columns = upstream.columns_owned();
if limit == 0 {
for row in upstream {
row?;
}
return Ok(RowStream::from_vec(Vec::new(), columns));
}
let folded_exprs: Vec<_> = order_items
.iter()
.map(|item| executor.fold_constants_expr(&item.expression))
.collect();
let specs: std::sync::Arc<[SortSpec]> = order_items
.iter()
.map(|item| SortSpec {
ascending: item.ascending,
nulls: item.effective_nulls(),
})
.collect::<Vec<_>>()
.into();
let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::with_capacity(limit + 1);
for row in upstream {
let row = row?;
let sort_keys: Vec<Value> = folded_exprs
.iter()
.map(|expr| {
executor
.evaluate_expression(expr, &row)
.unwrap_or(Value::Null)
})
.collect();
let entry = HeapEntry {
sort_keys,
specs: specs.clone(),
row,
};
if heap.len() < limit {
heap.push(entry);
} else {
if let Some(root) = heap.peek() {
if entry.cmp(root) == Ordering::Less {
heap.push(entry);
heap.pop();
}
}
}
}
let entries = heap.into_sorted_vec();
let rows: Vec<ResultRow> = entries.into_iter().map(|e| e.row).collect();
Ok(RowStream::from_vec(rows, columns))
}