Skip to main content

quill_sql/execution/physical_plan/
values.rs

1use std::sync::atomic::{AtomicU32, Ordering};
2
3use crate::catalog::SchemaRef;
4use crate::expression::Expr;
5use crate::storage::tuple::{Tuple, EMPTY_TUPLE};
6use crate::utils::scalar::ScalarValue;
7use crate::{
8    error::QuillSQLResult,
9    execution::{ExecutionContext, VolcanoExecutor},
10};
11
12#[derive(Debug)]
13pub struct PhysicalValues {
14    pub schema: SchemaRef,
15    pub rows: Vec<Vec<Expr>>,
16
17    cursor: AtomicU32,
18}
19impl PhysicalValues {
20    pub fn new(schema: SchemaRef, rows: Vec<Vec<Expr>>) -> Self {
21        PhysicalValues {
22            schema,
23            rows,
24            cursor: AtomicU32::new(0),
25        }
26    }
27}
28impl VolcanoExecutor for PhysicalValues {
29    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
30        let cursor = self.cursor.fetch_add(1, Ordering::SeqCst) as usize;
31        if cursor < self.rows.len() {
32            let values = self.rows[cursor]
33                .iter()
34                .map(|e| context.eval_expr(e, &EMPTY_TUPLE))
35                .collect::<QuillSQLResult<Vec<ScalarValue>>>()?;
36            debug_assert_eq!(self.schema.column_count(), values.len());
37
38            let casted_values = values
39                .iter()
40                .zip(self.schema.columns.iter())
41                .map(|(val, col)| val.cast_to(&col.data_type))
42                .collect::<QuillSQLResult<Vec<ScalarValue>>>()?;
43
44            Ok(Some(Tuple::new(self.output_schema(), casted_values)))
45        } else {
46            Ok(None)
47        }
48    }
49
50    fn output_schema(&self) -> SchemaRef {
51        self.schema.clone()
52    }
53}
54
55impl std::fmt::Display for PhysicalValues {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        write!(f, "Values: rows={}", self.rows.len())
58    }
59}