use crate::{Result, Tuple, Schema, Value};
use super::{PhysicalOperator, TimeoutContext, compare_values};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;
/// One candidate row held in the top-k heap, together with everything needed
/// to order it against other candidates.
struct HeapEntry {
/// Evaluated sort-key values, one per sort expression, in expression order.
key: Vec<Value>,
/// Per-key direction flags shared by all entries: `true` keeps the natural
/// comparison, `false` reverses it. A missing flag is treated as `true`.
asc: Arc<Vec<bool>>,
/// The input tuple this entry was built from; it is not part of the ordering.
tuple: Tuple,
}
// Equality is derived from the ordering so that `==` and `cmp` can never
// disagree: two entries are equal exactly when `cmp` ranks them the same.
// Only the sort keys (and direction flags) take part; the tuple is ignored.
impl PartialEq for HeapEntry {
    fn eq(&self, other: &Self) -> bool {
        matches!(Ord::cmp(self, other), Ordering::Equal)
    }
}
// Marker impl required by `Ord`. It adds no methods; it only asserts that the
// `cmp`-based equality is a full equivalence relation. That holds as long as
// `compare_values` (defined outside this file) is a consistent total order —
// NOTE(review): confirm.
impl Eq for HeapEntry {}
// The partial order is simply the total order from `Ord`, so this never
// returns `None`.
impl PartialOrd for HeapEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
// Lexicographic comparison over the sort keys.
//
// Keys are compared pairwise, left to right, with `compare_values`. The
// result for position `i` is reversed when `asc[i]` is `false`; a missing
// flag means "not reversed". The first non-equal result decides the ordering.
// If every compared pair is equal (or there are no keys), the entries are
// `Equal`. Only the positions present in both keys are compared.
impl Ord for HeapEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key
            .iter()
            .zip(other.key.iter())
            .enumerate()
            .map(|(idx, (lhs, rhs))| {
                let ordering = compare_values(lhs, rhs);
                match self.asc.get(idx).copied() {
                    Some(false) => ordering.reverse(),
                    _ => ordering,
                }
            })
            // The chain is lazy, so later keys are only compared while all
            // earlier ones tie.
            .find(|ordering| *ordering != Ordering::Equal)
            .unwrap_or(Ordering::Equal)
    }
}
/// Physical operator that yields at most `k` input tuples in sort order.
///
/// All work happens in [`TopKOperator::new`], which consumes the input and
/// materializes the result; `next` then replays the stored tuples.
pub struct TopKOperator {
/// Result tuples, already in output order.
sorted: Vec<Tuple>,
/// Index into `sorted` of the next tuple to return.
cursor: usize,
/// Schema taken from the input operator; output tuples are unmodified input
/// tuples.
schema: Arc<Schema>,
}
impl TopKOperator {
pub fn new(
mut input: Box<dyn PhysicalOperator>,
exprs: Vec<crate::sql::LogicalExpr>,
asc: Vec<bool>,
k: usize,
timeout_ctx: Option<TimeoutContext>,
) -> Result<Self> {
let schema = input.schema();
let evaluator = crate::sql::Evaluator::new(schema.clone());
let asc = Arc::new(asc);
if k == 0 {
return Ok(Self { sorted: Vec::new(), cursor: 0, schema });
}
let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::with_capacity(k + 1);
while let Some(tuple) = input.next()? {
if let Some(ref ctx) = timeout_ctx {
ctx.check_timeout()?;
}
let mut key = Vec::with_capacity(exprs.len());
for expr in &exprs {
match evaluator.evaluate(expr, &tuple) {
Ok(v) => key.push(v),
Err(_) => key.push(Value::Null),
}
}
let entry = HeapEntry { key, asc: Arc::clone(&asc), tuple };
if heap.len() < k {
heap.push(entry);
} else {
let replace = heap.peek().map(|top| entry < *top).unwrap_or(false);
if replace {
heap.pop();
heap.push(entry);
}
}
}
let mut entries: Vec<HeapEntry> = heap.into_sorted_vec();
let sorted: Vec<Tuple> = entries.drain(..).map(|e| e.tuple).collect();
Ok(Self { sorted, cursor: 0, schema })
}
pub fn with_timeout(self, _: Option<TimeoutContext>) -> Self { self }
}
impl PhysicalOperator for TopKOperator {
    /// Returns a clone of the next stored tuple, or `None` once all stored
    /// tuples have been returned. This implementation never returns an error.
    fn next(&mut self) -> Result<Option<Tuple>> {
        match self.sorted.get(self.cursor) {
            Some(stored) => {
                let row: Tuple = stored.clone();
                self.cursor += 1;
                Ok(Some(row))
            }
            // Exhausted: the cursor stays put, so further calls keep
            // returning `None`.
            None => Ok(None),
        }
    }

    /// Returns the schema captured from the input operator.
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }
}