use std::cmp::Ordering;
use super::*;
use crate::array::{DataChunk, DataChunkBuilder, RowRef};
use crate::types::DataType;
/// Executor that buffers every chunk produced by its child, sorts all rows
/// by the values of `order_keys`, and re-emits the rows in sorted order.
pub struct OrderExecutor {
/// Expression evaluated against each input chunk to obtain the sort key
/// columns. Its last node is read as a list whose length is taken as the
/// number of sort keys.
pub order_keys: RecExpr,
/// Column types handed to the `DataChunkBuilder` that assembles the output
/// chunks (presumably the child's output schema — confirm against the
/// planner).
pub types: Vec<DataType>,
}
impl OrderExecutor {
    /// Sorts the whole output of `child` by the order keys and streams it back
    /// as `DataChunk`s.
    ///
    /// The child is drained completely before anything is yielded, because
    /// sorting needs to see every row.
    ///
    /// # Errors
    ///
    /// Propagates any error produced by the child stream or by evaluating the
    /// order keys on a chunk.
    ///
    /// # Panics
    ///
    /// Panics if `order_keys` has no last node to read the key list from.
    #[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
    pub async fn execute(self, child: BoxedExecutor) {
        // Phase 1: for every input chunk, evaluate the key columns and
        // concatenate them row-wise with the chunk itself.
        let mut keyed_chunks = Vec::new();
        #[for_await]
        for input in child {
            let input = input?;
            let key_columns = Evaluator::new(&self.order_keys).eval_list(&input)?;
            keyed_chunks.push(key_columns.row_concat(input));
        }

        // Phase 2: collect row references over all buffered chunks and sort
        // them using the per-key direction flags.
        let descending = Evaluator::new(&self.order_keys).orders();
        let mut sorted_rows = gen_row_array(&keyed_chunks);
        sorted_rows.sort_unstable_by(|lhs, rhs| cmp(lhs, rhs, &descending));

        // Phase 3: rebuild output chunks in sorted order. The first
        // `key_count` values of each row are the evaluated keys, so they are
        // skipped and only the remaining values are emitted.
        let key_count = self.order_keys.as_ref().last().unwrap().as_list().len();
        let mut builder = DataChunkBuilder::new(&self.types, PROCESSING_WINDOW_SIZE);
        for row in sorted_rows {
            let payload = row.values().skip(key_count);
            if let Some(completed) = builder.push_row(payload) {
                yield completed;
            }
        }
        // Emit whatever the builder still holds after the last row.
        if let Some(remainder) = builder.take() {
            yield remainder;
        }
    }
}
/// Compares two rows key by key, honouring the direction of each key.
///
/// Values from both rows are paired positionally with the entries of
/// `orders`; an entry of `true` reverses the comparison for that position
/// (descending), `false` keeps it as is (ascending). The first position that
/// does not compare equal decides the result. Returns `Ordering::Equal` when
/// every compared position is equal or when there is nothing to compare.
fn cmp(row1: &RowRef, row2: &RowRef, orders: &[bool]) -> Ordering {
    row1.values()
        .zip(row2.values())
        .zip(orders)
        .map(|((lhs, rhs), &descending)| {
            let ordering = lhs.cmp(&rhs);
            if descending {
                ordering.reverse()
            } else {
                ordering
            }
        })
        // The chain is lazy, so comparison stops at the first deciding key.
        .find(|ordering| *ordering != Ordering::Equal)
        .unwrap_or(Ordering::Equal)
}
/// Collects references to every row of every chunk into one vector, keeping
/// chunk order and, within a chunk, the order produced by `DataChunk::rows`.
fn gen_row_array(chunks: &[DataChunk]) -> Vec<RowRef<'_>> {
    let mut rows = Vec::new();
    for chunk in chunks {
        rows.extend(chunk.rows());
    }
    rows
}