quill_sql/execution/physical_plan/
sort.rs

1//! Full in-memory sort operator (teaching-friendly implementation).
2
3use std::cell::RefCell;
4use std::cmp::Ordering as CmpOrdering;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::Arc;
7
8use crate::catalog::SchemaRef;
9use crate::error::QuillSQLError;
10use crate::plan::logical_plan::OrderByExpr;
11use crate::utils::scalar::ScalarValue;
12use crate::{
13    error::QuillSQLResult,
14    execution::{ExecutionContext, VolcanoExecutor},
15    storage::tuple::Tuple,
16};
17
18use super::PhysicalPlan;
19
20#[derive(Debug)]
21pub struct PhysicalSort {
22    pub order_bys: Vec<OrderByExpr>,
23    pub input: Arc<PhysicalPlan>,
24
25    all_tuples: RefCell<Vec<Tuple>>,
26    cursor: AtomicUsize,
27}
28impl PhysicalSort {
29    pub fn new(order_bys: Vec<OrderByExpr>, input: Arc<PhysicalPlan>) -> Self {
30        PhysicalSort {
31            order_bys,
32            input,
33            all_tuples: RefCell::new(Vec::new()),
34            cursor: AtomicUsize::new(0),
35        }
36    }
37
38    fn compare_keys(
39        &self,
40        left: &[ScalarValue],
41        right: &[ScalarValue],
42    ) -> QuillSQLResult<CmpOrdering> {
43        for (idx, order) in self.order_bys.iter().enumerate() {
44            let ordering = if order.asc {
45                left[idx].partial_cmp(&right[idx])
46            } else {
47                right[idx].partial_cmp(&left[idx])
48            }
49            .ok_or_else(|| {
50                QuillSQLError::Execution(format!(
51                    "Can not compare {:?} and {:?}",
52                    left[idx], right[idx]
53                ))
54            })?;
55            if ordering != CmpOrdering::Equal {
56                return Ok(ordering);
57            }
58        }
59        Ok(CmpOrdering::Equal)
60    }
61}
62
63impl VolcanoExecutor for PhysicalSort {
64    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
65        self.input.init(context)?;
66        self.all_tuples.borrow_mut().clear();
67        self.cursor.store(0, Ordering::SeqCst);
68        Ok(())
69    }
70
71    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
72        if self.all_tuples.borrow().is_empty() {
73            let mut keyed_rows: Vec<(Tuple, Vec<ScalarValue>)> = Vec::new();
74            while let Some(tuple) = self.input.next(context)? {
75                let mut keys = Vec::with_capacity(self.order_bys.len());
76                for order in &self.order_bys {
77                    keys.push(context.eval_expr(&order.expr, &tuple)?);
78                }
79                keyed_rows.push((tuple, keys));
80            }
81
82            let mut error = None;
83            keyed_rows.sort_by(|(_, left_keys), (_, right_keys)| {
84                match self.compare_keys(left_keys, right_keys) {
85                    Ok(ord) => ord,
86                    Err(e) => {
87                        error = Some(e);
88                        CmpOrdering::Equal
89                    }
90                }
91            });
92            if let Some(err) = error {
93                return Err(err);
94            }
95
96            let tuples = keyed_rows
97                .into_iter()
98                .map(|(tuple, _)| tuple)
99                .collect::<Vec<_>>();
100            *self.all_tuples.borrow_mut() = tuples;
101        }
102
103        let cursor = self.cursor.fetch_add(1, Ordering::SeqCst);
104        let tuples_ref = self.all_tuples.borrow();
105        if cursor >= tuples_ref.len() {
106            Ok(None)
107        } else {
108            Ok(tuples_ref.get(cursor).cloned())
109        }
110    }
111
112    fn output_schema(&self) -> SchemaRef {
113        self.input.output_schema()
114    }
115}
116
117impl std::fmt::Display for PhysicalSort {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        write!(
120            f,
121            "Sort: {}",
122            self.order_bys
123                .iter()
124                .map(|e| format!("{e}"))
125                .collect::<Vec<_>>()
126                .join(", ")
127        )
128    }
129}