Skip to main content

quill_sql/execution/physical_plan/
insert.rs

1use log::debug;
2use std::rc::Rc;
3use std::sync::atomic::Ordering;
4use std::sync::{atomic::AtomicU32, OnceLock};
5
6use crate::catalog::{SchemaRef, INSERT_OUTPUT_SCHEMA_REF};
7use crate::error::QuillSQLError;
8use crate::storage::{engine::TableBinding, tuple::Tuple};
9use crate::transaction::LockMode;
10use crate::utils::scalar::ScalarValue;
11use crate::utils::table_ref::TableReference;
12use crate::{
13    error::QuillSQLResult,
14    execution::{ExecutionContext, VolcanoExecutor},
15};
16
17use super::PhysicalPlan;
18
19#[derive(Debug)]
20pub struct PhysicalInsert {
21    pub table: TableReference,
22    pub table_schema: SchemaRef,
23    pub projected_schema: SchemaRef,
24    pub input: Rc<PhysicalPlan>,
25
26    insert_rows: AtomicU32,
27    table_binding: OnceLock<TableBinding>,
28}
29impl PhysicalInsert {
30    pub fn new(
31        table: TableReference,
32        table_schema: SchemaRef,
33        projected_schema: SchemaRef,
34        input: Rc<PhysicalPlan>,
35    ) -> Self {
36        Self {
37            table,
38            table_schema,
39            projected_schema,
40            input,
41            insert_rows: AtomicU32::new(0),
42            table_binding: OnceLock::new(),
43        }
44    }
45}
46impl VolcanoExecutor for PhysicalInsert {
47    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
48        debug!("init insert executor");
49        self.input.init(context)?;
50        self.insert_rows.store(0, Ordering::SeqCst);
51        context.txn_ctx().ensure_writable(&self.table, "INSERT")?;
52        if context
53            .txn_ctx_mut()
54            .lock_table(self.table.clone(), LockMode::IntentionExclusive)
55            .is_err()
56        {
57            return Err(QuillSQLError::Execution(format!(
58                "failed to acquire IX lock on table {}",
59                self.table
60            )));
61        }
62        if self.table_binding.get().is_none() {
63            let binding = context.table(&self.table)?;
64            let _ = self.table_binding.set(binding);
65        }
66        Ok(())
67    }
68    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
69        loop {
70            let next_tuple = self.input.next(context)?;
71            if next_tuple.is_none() {
72                // only return insert_rows when input exhausted
73                return if self.insert_rows.load(Ordering::SeqCst) == 0 {
74                    Ok(None)
75                } else {
76                    let insert_rows = self.insert_rows.swap(0, Ordering::SeqCst);
77                    Ok(Some(Tuple::new(
78                        self.output_schema(),
79                        vec![ScalarValue::Int32(Some(insert_rows as i32))],
80                    )))
81                };
82            }
83            let tuple = next_tuple.unwrap();
84
85            // cast values
86            let mut casted_data = vec![];
87            for (idx, value) in tuple.data.iter().enumerate() {
88                let target_type = self.projected_schema.column_with_index(idx)?.data_type;
89                casted_data.push(value.cast_to(&target_type)?);
90            }
91
92            // fill default values
93            let mut full_data = vec![];
94            for col in self.table_schema.columns.iter() {
95                if let Ok(idx) = self
96                    .projected_schema
97                    .index_of(col.relation.as_ref(), &col.name)
98                {
99                    full_data.push(casted_data[idx].clone());
100                } else {
101                    full_data.push(col.default.clone())
102                }
103            }
104
105            let tuple = Tuple::new(self.table_schema.clone(), full_data);
106
107            let binding = self
108                .table_binding
109                .get()
110                .expect("table binding not initialized");
111            binding.insert(context.txn_ctx_mut(), &tuple)?;
112
113            self.insert_rows.fetch_add(1, Ordering::SeqCst);
114        }
115    }
116
117    fn output_schema(&self) -> SchemaRef {
118        INSERT_OUTPUT_SCHEMA_REF.clone()
119    }
120}
121
122impl std::fmt::Display for PhysicalInsert {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        write!(f, "Insert")
125    }
126}