quill_sql/execution/physical_plan/
sort.rs1use std::cmp::Ordering as CmpOrdering;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4
5use crate::catalog::SchemaRef;
6use crate::error::QuillSQLError;
7use crate::plan::logical_plan::OrderByExpr;
8use crate::utils::scalar::ScalarValue;
9use crate::{
10 error::QuillSQLResult,
11 execution::{ExecutionContext, VolcanoExecutor},
12 storage::tuple::Tuple,
13};
14
15use super::PhysicalPlan;
16
17#[derive(Debug)]
18pub struct PhysicalSort {
19 pub order_bys: Vec<OrderByExpr>,
20 pub input: Arc<PhysicalPlan>,
21
22 all_tuples: Mutex<Vec<Tuple>>,
23 cursor: AtomicUsize,
24}
25impl PhysicalSort {
26 pub fn new(order_bys: Vec<OrderByExpr>, input: Arc<PhysicalPlan>) -> Self {
27 PhysicalSort {
28 order_bys,
29 input,
30 all_tuples: Mutex::new(Vec::new()),
31 cursor: AtomicUsize::new(0),
32 }
33 }
34
35 fn compare_keys(
36 &self,
37 left: &[ScalarValue],
38 right: &[ScalarValue],
39 ) -> QuillSQLResult<CmpOrdering> {
40 for (idx, order) in self.order_bys.iter().enumerate() {
41 let ordering = if order.asc {
42 left[idx].partial_cmp(&right[idx])
43 } else {
44 right[idx].partial_cmp(&left[idx])
45 }
46 .ok_or_else(|| {
47 QuillSQLError::Execution(format!(
48 "Can not compare {:?} and {:?}",
49 left[idx], right[idx]
50 ))
51 })?;
52 if ordering != CmpOrdering::Equal {
53 return Ok(ordering);
54 }
55 }
56 Ok(CmpOrdering::Equal)
57 }
58}
59
60impl VolcanoExecutor for PhysicalSort {
61 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
62 self.input.init(context)?;
63 *self.all_tuples.lock().unwrap() = vec![];
64 self.cursor.store(0, Ordering::SeqCst);
65 Ok(())
66 }
67
68 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
69 if self.all_tuples.lock().unwrap().is_empty() {
70 let mut keyed_rows: Vec<(Tuple, Vec<ScalarValue>)> = Vec::new();
71 while let Some(tuple) = self.input.next(context)? {
72 let mut keys = Vec::with_capacity(self.order_bys.len());
73 for order in &self.order_bys {
74 keys.push(context.eval_expr(&order.expr, &tuple)?);
75 }
76 keyed_rows.push((tuple, keys));
77 }
78
79 let mut error = None;
80 keyed_rows.sort_by(|(_, left_keys), (_, right_keys)| {
81 match self.compare_keys(left_keys, right_keys) {
82 Ok(ord) => ord,
83 Err(e) => {
84 error = Some(e);
85 CmpOrdering::Equal
86 }
87 }
88 });
89 if let Some(err) = error {
90 return Err(err);
91 }
92
93 let tuples = keyed_rows
94 .into_iter()
95 .map(|(tuple, _)| tuple)
96 .collect::<Vec<_>>();
97 *self.all_tuples.lock().unwrap() = tuples;
98 }
99
100 let cursor = self.cursor.fetch_add(1, Ordering::SeqCst);
101 if cursor >= self.all_tuples.lock().unwrap().len() {
102 Ok(None)
103 } else {
104 Ok(self.all_tuples.lock().unwrap().get(cursor).cloned())
105 }
106 }
107
108 fn output_schema(&self) -> SchemaRef {
109 self.input.output_schema()
110 }
111}
112
113impl std::fmt::Display for PhysicalSort {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 write!(
116 f,
117 "Sort: {}",
118 self.order_bys
119 .iter()
120 .map(|e| format!("{e}"))
121 .collect::<Vec<_>>()
122 .join(", ")
123 )
124 }
125}