quill_sql/execution/physical_plan/
insert.rs

1use log::debug;
2use std::sync::atomic::Ordering;
3use std::sync::{atomic::AtomicU32, Arc};
4
5use crate::catalog::{SchemaRef, INSERT_OUTPUT_SCHEMA_REF};
6use crate::error::QuillSQLError;
7use crate::storage::tuple::Tuple;
8use crate::transaction::LockMode;
9use crate::utils::scalar::ScalarValue;
10use crate::utils::table_ref::TableReference;
11use crate::{
12    error::QuillSQLResult,
13    execution::{ExecutionContext, VolcanoExecutor},
14};
15
16use super::PhysicalPlan;
17
18#[derive(Debug)]
19pub struct PhysicalInsert {
20    pub table: TableReference,
21    pub table_schema: SchemaRef,
22    pub projected_schema: SchemaRef,
23    pub input: Arc<PhysicalPlan>,
24
25    insert_rows: AtomicU32,
26}
27impl PhysicalInsert {
28    pub fn new(
29        table: TableReference,
30        table_schema: SchemaRef,
31        projected_schema: SchemaRef,
32        input: Arc<PhysicalPlan>,
33    ) -> Self {
34        Self {
35            table,
36            table_schema,
37            projected_schema,
38            input,
39            insert_rows: AtomicU32::new(0),
40        }
41    }
42}
43impl VolcanoExecutor for PhysicalInsert {
44    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
45        debug!("init insert executor");
46        self.input.init(context)?;
47        self.insert_rows.store(0, Ordering::SeqCst);
48        context.ensure_writable(&self.table, "INSERT")?;
49        if context
50            .lock_table(self.table.clone(), LockMode::IntentionExclusive)
51            .is_err()
52        {
53            return Err(QuillSQLError::Execution(format!(
54                "failed to acquire IX lock on table {}",
55                self.table
56            )));
57        }
58        Ok(())
59    }
60    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
61        loop {
62            let next_tuple = self.input.next(context)?;
63            if next_tuple.is_none() {
64                // only return insert_rows when input exhausted
65                return if self.insert_rows.load(Ordering::SeqCst) == 0 {
66                    Ok(None)
67                } else {
68                    let insert_rows = self.insert_rows.swap(0, Ordering::SeqCst);
69                    Ok(Some(Tuple::new(
70                        self.output_schema(),
71                        vec![ScalarValue::Int32(Some(insert_rows as i32))],
72                    )))
73                };
74            }
75            let tuple = next_tuple.unwrap();
76
77            // cast values
78            let mut casted_data = vec![];
79            for (idx, value) in tuple.data.iter().enumerate() {
80                let target_type = self.projected_schema.column_with_index(idx)?.data_type;
81                casted_data.push(value.cast_to(&target_type)?);
82            }
83
84            // fill default values
85            let mut full_data = vec![];
86            for col in self.table_schema.columns.iter() {
87                if let Ok(idx) = self
88                    .projected_schema
89                    .index_of(col.relation.as_ref(), &col.name)
90                {
91                    full_data.push(casted_data[idx].clone());
92                } else {
93                    full_data.push(col.default.clone())
94                }
95            }
96
97            let tuple = Tuple::new(self.table_schema.clone(), full_data);
98
99            context.insert_tuple_with_indexes(&self.table, &tuple)?;
100
101            self.insert_rows.fetch_add(1, Ordering::SeqCst);
102        }
103    }
104
105    fn output_schema(&self) -> SchemaRef {
106        INSERT_OUTPUT_SCHEMA_REF.clone()
107    }
108}
109
110impl std::fmt::Display for PhysicalInsert {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        write!(f, "Insert")
113    }
114}