quill_sql/execution/physical_plan/
values.rs1use std::sync::atomic::{AtomicU32, Ordering};
2
3use crate::catalog::SchemaRef;
4use crate::expression::Expr;
5use crate::storage::tuple::{Tuple, EMPTY_TUPLE};
6use crate::utils::scalar::ScalarValue;
7use crate::{
8 error::QuillSQLResult,
9 execution::{ExecutionContext, VolcanoExecutor},
10};
11
12#[derive(Debug)]
13pub struct PhysicalValues {
14 pub schema: SchemaRef,
15 pub rows: Vec<Vec<Expr>>,
16
17 cursor: AtomicU32,
18}
19impl PhysicalValues {
20 pub fn new(schema: SchemaRef, rows: Vec<Vec<Expr>>) -> Self {
21 PhysicalValues {
22 schema,
23 rows,
24 cursor: AtomicU32::new(0),
25 }
26 }
27}
28impl VolcanoExecutor for PhysicalValues {
29 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
30 let cursor = self.cursor.fetch_add(1, Ordering::SeqCst) as usize;
31 if cursor < self.rows.len() {
32 let values = self.rows[cursor]
33 .iter()
34 .map(|e| context.eval_expr(e, &EMPTY_TUPLE))
35 .collect::<QuillSQLResult<Vec<ScalarValue>>>()?;
36 debug_assert_eq!(self.schema.column_count(), values.len());
37
38 let casted_values = values
39 .iter()
40 .zip(self.schema.columns.iter())
41 .map(|(val, col)| val.cast_to(&col.data_type))
42 .collect::<QuillSQLResult<Vec<ScalarValue>>>()?;
43
44 Ok(Some(Tuple::new(self.output_schema(), casted_values)))
45 } else {
46 Ok(None)
47 }
48 }
49
50 fn output_schema(&self) -> SchemaRef {
51 self.schema.clone()
52 }
53}
54
55impl std::fmt::Display for PhysicalValues {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 write!(f, "Values: rows={}", self.rows.len())
58 }
59}