quill_sql/execution/physical_plan/
insert.rs

1use log::debug;
2use std::sync::atomic::Ordering;
3use std::sync::{atomic::AtomicU32, Arc, OnceLock};
4
5use crate::catalog::{SchemaRef, INSERT_OUTPUT_SCHEMA_REF};
6use crate::error::QuillSQLError;
7use crate::storage::{engine::TableBinding, tuple::Tuple};
8use crate::transaction::LockMode;
9use crate::utils::scalar::ScalarValue;
10use crate::utils::table_ref::TableReference;
11use crate::{
12    error::QuillSQLResult,
13    execution::{ExecutionContext, VolcanoExecutor},
14};
15
16use super::PhysicalPlan;
17
18#[derive(Debug)]
19pub struct PhysicalInsert {
20    pub table: TableReference,
21    pub table_schema: SchemaRef,
22    pub projected_schema: SchemaRef,
23    pub input: Arc<PhysicalPlan>,
24
25    insert_rows: AtomicU32,
26    table_binding: OnceLock<TableBinding>,
27}
28impl PhysicalInsert {
29    pub fn new(
30        table: TableReference,
31        table_schema: SchemaRef,
32        projected_schema: SchemaRef,
33        input: Arc<PhysicalPlan>,
34    ) -> Self {
35        Self {
36            table,
37            table_schema,
38            projected_schema,
39            input,
40            insert_rows: AtomicU32::new(0),
41            table_binding: OnceLock::new(),
42        }
43    }
44}
45impl VolcanoExecutor for PhysicalInsert {
46    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
47        debug!("init insert executor");
48        self.input.init(context)?;
49        self.insert_rows.store(0, Ordering::SeqCst);
50        context.txn_ctx().ensure_writable(&self.table, "INSERT")?;
51        if context
52            .txn_ctx_mut()
53            .lock_table(self.table.clone(), LockMode::IntentionExclusive)
54            .is_err()
55        {
56            return Err(QuillSQLError::Execution(format!(
57                "failed to acquire IX lock on table {}",
58                self.table
59            )));
60        }
61        if self.table_binding.get().is_none() {
62            let binding = context.table(&self.table)?;
63            let _ = self.table_binding.set(binding);
64        }
65        Ok(())
66    }
67    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
68        loop {
69            let next_tuple = self.input.next(context)?;
70            if next_tuple.is_none() {
71                // only return insert_rows when input exhausted
72                return if self.insert_rows.load(Ordering::SeqCst) == 0 {
73                    Ok(None)
74                } else {
75                    let insert_rows = self.insert_rows.swap(0, Ordering::SeqCst);
76                    Ok(Some(Tuple::new(
77                        self.output_schema(),
78                        vec![ScalarValue::Int32(Some(insert_rows as i32))],
79                    )))
80                };
81            }
82            let tuple = next_tuple.unwrap();
83
84            // cast values
85            let mut casted_data = vec![];
86            for (idx, value) in tuple.data.iter().enumerate() {
87                let target_type = self.projected_schema.column_with_index(idx)?.data_type;
88                casted_data.push(value.cast_to(&target_type)?);
89            }
90
91            // fill default values
92            let mut full_data = vec![];
93            for col in self.table_schema.columns.iter() {
94                if let Ok(idx) = self
95                    .projected_schema
96                    .index_of(col.relation.as_ref(), &col.name)
97                {
98                    full_data.push(casted_data[idx].clone());
99                } else {
100                    full_data.push(col.default.clone())
101                }
102            }
103
104            let tuple = Tuple::new(self.table_schema.clone(), full_data);
105
106            let binding = self
107                .table_binding
108                .get()
109                .expect("table binding not initialized");
110            binding.insert(context.txn_ctx_mut(), &tuple)?;
111
112            self.insert_rows.fetch_add(1, Ordering::SeqCst);
113        }
114    }
115
116    fn output_schema(&self) -> SchemaRef {
117        INSERT_OUTPUT_SCHEMA_REF.clone()
118    }
119}
120
121impl std::fmt::Display for PhysicalInsert {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        write!(f, "Insert")
124    }
125}