use super::eval::{compare_values, eval_expr};
use super::node::PlanNode;
use super::types::{Row, Schema};
use crate::sql::ast::Expr;
use sochdb_core::Result;
pub struct SortKey {
pub expr: Expr,
pub ascending: bool,
pub nulls_first: bool,
}
pub struct SortNode {
input: Box<dyn PlanNode>,
sort_keys: Vec<SortKey>,
buffer: Option<Vec<Row>>,
pos: usize,
}
impl SortNode {
pub fn new(input: Box<dyn PlanNode>, sort_keys: Vec<SortKey>) -> Self {
Self {
input,
sort_keys,
buffer: None,
pos: 0,
}
}
fn materialize_and_sort(&mut self) -> Result<()> {
if self.buffer.is_some() {
return Ok(());
}
let mut rows = self.input.collect_all()?;
let schema = self.input.schema().clone();
let mut keyed: Vec<(Vec<crate::soch_ql::SochValue>, Row)> = rows
.drain(..)
.map(|row| {
let keys: Vec<crate::soch_ql::SochValue> = self
.sort_keys
.iter()
.map(|sk| {
eval_expr(&sk.expr, &row, &schema)
.unwrap_or(crate::soch_ql::SochValue::Null)
})
.collect();
(keys, row)
})
.collect();
let sort_keys = &self.sort_keys;
keyed.sort_by(|(ka, _), (kb, _)| {
for (i, sk) in sort_keys.iter().enumerate() {
let a = &ka[i];
let b = &kb[i];
let a_null = matches!(a, crate::soch_ql::SochValue::Null);
let b_null = matches!(b, crate::soch_ql::SochValue::Null);
if a_null && b_null {
continue;
}
if a_null {
return if sk.nulls_first {
std::cmp::Ordering::Less
} else {
std::cmp::Ordering::Greater
};
}
if b_null {
return if sk.nulls_first {
std::cmp::Ordering::Greater
} else {
std::cmp::Ordering::Less
};
}
if let Some(ord) = compare_values(a, b) {
let ord = if sk.ascending { ord } else { ord.reverse() };
if ord != std::cmp::Ordering::Equal {
return ord;
}
}
}
std::cmp::Ordering::Equal
});
self.buffer = Some(keyed.into_iter().map(|(_, row)| row).collect());
Ok(())
}
}
impl PlanNode for SortNode {
fn schema(&self) -> &Schema {
self.input.schema()
}
fn next(&mut self) -> Result<Option<Row>> {
self.materialize_and_sort()?;
if let Some(buf) = &self.buffer {
if self.pos < buf.len() {
let row = buf[self.pos].clone();
self.pos += 1;
Ok(Some(row))
} else {
Ok(None)
}
} else {
Ok(None)
}
}
fn reset(&mut self) -> Result<()> {
self.pos = 0;
Ok(())
}
}