quill_sql/execution/physical_plan/
sort.rs

1use std::cmp::Ordering as CmpOrdering;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4
5use crate::catalog::SchemaRef;
6use crate::error::QuillSQLError;
7use crate::plan::logical_plan::OrderByExpr;
8use crate::utils::scalar::ScalarValue;
9use crate::{
10    error::QuillSQLResult,
11    execution::{ExecutionContext, VolcanoExecutor},
12    storage::tuple::Tuple,
13};
14
15use super::PhysicalPlan;
16
17#[derive(Debug)]
18pub struct PhysicalSort {
19    pub order_bys: Vec<OrderByExpr>,
20    pub input: Arc<PhysicalPlan>,
21
22    all_tuples: Mutex<Vec<Tuple>>,
23    cursor: AtomicUsize,
24}
25impl PhysicalSort {
26    pub fn new(order_bys: Vec<OrderByExpr>, input: Arc<PhysicalPlan>) -> Self {
27        PhysicalSort {
28            order_bys,
29            input,
30            all_tuples: Mutex::new(Vec::new()),
31            cursor: AtomicUsize::new(0),
32        }
33    }
34
35    fn compare_keys(
36        &self,
37        left: &[ScalarValue],
38        right: &[ScalarValue],
39    ) -> QuillSQLResult<CmpOrdering> {
40        for (idx, order) in self.order_bys.iter().enumerate() {
41            let ordering = if order.asc {
42                left[idx].partial_cmp(&right[idx])
43            } else {
44                right[idx].partial_cmp(&left[idx])
45            }
46            .ok_or_else(|| {
47                QuillSQLError::Execution(format!(
48                    "Can not compare {:?} and {:?}",
49                    left[idx], right[idx]
50                ))
51            })?;
52            if ordering != CmpOrdering::Equal {
53                return Ok(ordering);
54            }
55        }
56        Ok(CmpOrdering::Equal)
57    }
58}
59
60impl VolcanoExecutor for PhysicalSort {
61    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
62        self.input.init(context)?;
63        *self.all_tuples.lock().unwrap() = vec![];
64        self.cursor.store(0, Ordering::SeqCst);
65        Ok(())
66    }
67
68    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
69        if self.all_tuples.lock().unwrap().is_empty() {
70            let mut keyed_rows: Vec<(Tuple, Vec<ScalarValue>)> = Vec::new();
71            while let Some(tuple) = self.input.next(context)? {
72                let mut keys = Vec::with_capacity(self.order_bys.len());
73                for order in &self.order_bys {
74                    keys.push(context.eval_expr(&order.expr, &tuple)?);
75                }
76                keyed_rows.push((tuple, keys));
77            }
78
79            let mut error = None;
80            keyed_rows.sort_by(|(_, left_keys), (_, right_keys)| {
81                match self.compare_keys(left_keys, right_keys) {
82                    Ok(ord) => ord,
83                    Err(e) => {
84                        error = Some(e);
85                        CmpOrdering::Equal
86                    }
87                }
88            });
89            if let Some(err) = error {
90                return Err(err);
91            }
92
93            let tuples = keyed_rows
94                .into_iter()
95                .map(|(tuple, _)| tuple)
96                .collect::<Vec<_>>();
97            *self.all_tuples.lock().unwrap() = tuples;
98        }
99
100        let cursor = self.cursor.fetch_add(1, Ordering::SeqCst);
101        if cursor >= self.all_tuples.lock().unwrap().len() {
102            Ok(None)
103        } else {
104            Ok(self.all_tuples.lock().unwrap().get(cursor).cloned())
105        }
106    }
107
108    fn output_schema(&self) -> SchemaRef {
109        self.input.output_schema()
110    }
111}
112
113impl std::fmt::Display for PhysicalSort {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        write!(
116            f,
117            "Sort: {}",
118            self.order_bys
119                .iter()
120                .map(|e| format!("{e}"))
121                .collect::<Vec<_>>()
122                .join(", ")
123        )
124    }
125}