quill_sql/execution/physical_plan/
sort.rs1use std::cell::RefCell;
4use std::cmp::Ordering as CmpOrdering;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::Arc;
7
8use crate::catalog::SchemaRef;
9use crate::error::QuillSQLError;
10use crate::plan::logical_plan::OrderByExpr;
11use crate::utils::scalar::ScalarValue;
12use crate::{
13 error::QuillSQLResult,
14 execution::{ExecutionContext, VolcanoExecutor},
15 storage::tuple::Tuple,
16};
17
18use super::PhysicalPlan;
19
20#[derive(Debug)]
21pub struct PhysicalSort {
22 pub order_bys: Vec<OrderByExpr>,
23 pub input: Arc<PhysicalPlan>,
24
25 all_tuples: RefCell<Vec<Tuple>>,
26 cursor: AtomicUsize,
27}
28impl PhysicalSort {
29 pub fn new(order_bys: Vec<OrderByExpr>, input: Arc<PhysicalPlan>) -> Self {
30 PhysicalSort {
31 order_bys,
32 input,
33 all_tuples: RefCell::new(Vec::new()),
34 cursor: AtomicUsize::new(0),
35 }
36 }
37
38 fn compare_keys(
39 &self,
40 left: &[ScalarValue],
41 right: &[ScalarValue],
42 ) -> QuillSQLResult<CmpOrdering> {
43 for (idx, order) in self.order_bys.iter().enumerate() {
44 let ordering = if order.asc {
45 left[idx].partial_cmp(&right[idx])
46 } else {
47 right[idx].partial_cmp(&left[idx])
48 }
49 .ok_or_else(|| {
50 QuillSQLError::Execution(format!(
51 "Can not compare {:?} and {:?}",
52 left[idx], right[idx]
53 ))
54 })?;
55 if ordering != CmpOrdering::Equal {
56 return Ok(ordering);
57 }
58 }
59 Ok(CmpOrdering::Equal)
60 }
61}
62
63impl VolcanoExecutor for PhysicalSort {
64 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
65 self.input.init(context)?;
66 self.all_tuples.borrow_mut().clear();
67 self.cursor.store(0, Ordering::SeqCst);
68 Ok(())
69 }
70
71 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
72 if self.all_tuples.borrow().is_empty() {
73 let mut keyed_rows: Vec<(Tuple, Vec<ScalarValue>)> = Vec::new();
74 while let Some(tuple) = self.input.next(context)? {
75 let mut keys = Vec::with_capacity(self.order_bys.len());
76 for order in &self.order_bys {
77 keys.push(context.eval_expr(&order.expr, &tuple)?);
78 }
79 keyed_rows.push((tuple, keys));
80 }
81
82 let mut error = None;
83 keyed_rows.sort_by(|(_, left_keys), (_, right_keys)| {
84 match self.compare_keys(left_keys, right_keys) {
85 Ok(ord) => ord,
86 Err(e) => {
87 error = Some(e);
88 CmpOrdering::Equal
89 }
90 }
91 });
92 if let Some(err) = error {
93 return Err(err);
94 }
95
96 let tuples = keyed_rows
97 .into_iter()
98 .map(|(tuple, _)| tuple)
99 .collect::<Vec<_>>();
100 *self.all_tuples.borrow_mut() = tuples;
101 }
102
103 let cursor = self.cursor.fetch_add(1, Ordering::SeqCst);
104 let tuples_ref = self.all_tuples.borrow();
105 if cursor >= tuples_ref.len() {
106 Ok(None)
107 } else {
108 Ok(tuples_ref.get(cursor).cloned())
109 }
110 }
111
112 fn output_schema(&self) -> SchemaRef {
113 self.input.output_schema()
114 }
115}
116
117impl std::fmt::Display for PhysicalSort {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 write!(
120 f,
121 "Sort: {}",
122 self.order_bys
123 .iter()
124 .map(|e| format!("{e}"))
125 .collect::<Vec<_>>()
126 .join(", ")
127 )
128 }
129}