sochdb_query/executor/
sort.rs1use super::eval::{eval_expr, compare_values};
6use super::node::PlanNode;
7use super::types::{Row, Schema};
8use crate::sql::ast::Expr;
9use sochdb_core::Result;
10
11pub struct SortKey {
13 pub expr: Expr,
14 pub ascending: bool,
16 pub nulls_first: bool,
18}
19
20pub struct SortNode {
30 input: Box<dyn PlanNode>,
31 sort_keys: Vec<SortKey>,
32 buffer: Option<Vec<Row>>,
34 pos: usize,
35}
36
37impl SortNode {
38 pub fn new(input: Box<dyn PlanNode>, sort_keys: Vec<SortKey>) -> Self {
39 Self {
40 input,
41 sort_keys,
42 buffer: None,
43 pos: 0,
44 }
45 }
46
47 fn materialize_and_sort(&mut self) -> Result<()> {
48 if self.buffer.is_some() {
49 return Ok(());
50 }
51
52 let mut rows = self.input.collect_all()?;
54 let schema = self.input.schema().clone();
55
56 let mut keyed: Vec<(Vec<crate::soch_ql::SochValue>, Row)> = rows
58 .drain(..)
59 .map(|row| {
60 let keys: Vec<crate::soch_ql::SochValue> = self
61 .sort_keys
62 .iter()
63 .map(|sk| eval_expr(&sk.expr, &row, &schema).unwrap_or(crate::soch_ql::SochValue::Null))
64 .collect();
65 (keys, row)
66 })
67 .collect();
68
69 let sort_keys = &self.sort_keys;
71 keyed.sort_by(|(ka, _), (kb, _)| {
72 for (i, sk) in sort_keys.iter().enumerate() {
73 let a = &ka[i];
74 let b = &kb[i];
75
76 let a_null = matches!(a, crate::soch_ql::SochValue::Null);
78 let b_null = matches!(b, crate::soch_ql::SochValue::Null);
79
80 if a_null && b_null {
81 continue;
82 }
83 if a_null {
84 return if sk.nulls_first {
85 std::cmp::Ordering::Less
86 } else {
87 std::cmp::Ordering::Greater
88 };
89 }
90 if b_null {
91 return if sk.nulls_first {
92 std::cmp::Ordering::Greater
93 } else {
94 std::cmp::Ordering::Less
95 };
96 }
97
98 if let Some(ord) = compare_values(a, b) {
99 let ord = if sk.ascending { ord } else { ord.reverse() };
100 if ord != std::cmp::Ordering::Equal {
101 return ord;
102 }
103 }
104 }
105 std::cmp::Ordering::Equal
106 });
107
108 self.buffer = Some(keyed.into_iter().map(|(_, row)| row).collect());
109 Ok(())
110 }
111}
112
113impl PlanNode for SortNode {
114 fn schema(&self) -> &Schema {
115 self.input.schema()
116 }
117
118 fn next(&mut self) -> Result<Option<Row>> {
119 self.materialize_and_sort()?;
120
121 if let Some(buf) = &self.buffer {
122 if self.pos < buf.len() {
123 let row = buf[self.pos].clone();
124 self.pos += 1;
125 Ok(Some(row))
126 } else {
127 Ok(None)
128 }
129 } else {
130 Ok(None)
131 }
132 }
133
134 fn reset(&mut self) -> Result<()> {
135 self.pos = 0;
136 Ok(())
138 }
139}