quill_sql/execution/physical_plan/
insert.rs

1use log::debug;
2use std::sync::atomic::Ordering;
3use std::sync::{atomic::AtomicU32, Arc};
4
5use crate::catalog::{SchemaRef, INSERT_OUTPUT_SCHEMA_REF};
6use crate::error::QuillSQLError;
7use crate::storage::page::TupleMeta;
8use crate::storage::tuple::Tuple;
9use crate::transaction::LockMode;
10use crate::utils::scalar::ScalarValue;
11use crate::utils::table_ref::TableReference;
12use crate::{
13    error::QuillSQLResult,
14    execution::{ExecutionContext, VolcanoExecutor},
15};
16
17use super::PhysicalPlan;
18
19#[derive(Debug)]
20pub struct PhysicalInsert {
21    pub table: TableReference,
22    pub table_schema: SchemaRef,
23    pub projected_schema: SchemaRef,
24    pub input: Arc<PhysicalPlan>,
25
26    insert_rows: AtomicU32,
27}
28impl PhysicalInsert {
29    pub fn new(
30        table: TableReference,
31        table_schema: SchemaRef,
32        projected_schema: SchemaRef,
33        input: Arc<PhysicalPlan>,
34    ) -> Self {
35        Self {
36            table,
37            table_schema,
38            projected_schema,
39            input,
40            insert_rows: AtomicU32::new(0),
41        }
42    }
43}
44impl VolcanoExecutor for PhysicalInsert {
45    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
46        debug!("init insert executor");
47        self.input.init(context)?;
48        self.insert_rows.store(0, Ordering::SeqCst);
49        context.ensure_writable(&self.table, "INSERT")?;
50        if context
51            .txn_mgr
52            .acquire_table_lock(
53                context.txn,
54                self.table.clone(),
55                LockMode::IntentionExclusive,
56            )
57            .is_err()
58        {
59            return Err(QuillSQLError::Execution(format!(
60                "failed to acquire IX lock on table {}",
61                self.table
62            )));
63        }
64        Ok(())
65    }
66    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
67        loop {
68            let next_tuple = self.input.next(context)?;
69            if next_tuple.is_none() {
70                // only return insert_rows when input exhausted
71                return if self.insert_rows.load(Ordering::SeqCst) == 0 {
72                    Ok(None)
73                } else {
74                    let insert_rows = self.insert_rows.swap(0, Ordering::SeqCst);
75                    Ok(Some(Tuple::new(
76                        self.output_schema(),
77                        vec![ScalarValue::Int32(Some(insert_rows as i32))],
78                    )))
79                };
80            }
81            let tuple = next_tuple.unwrap();
82
83            // cast values
84            let mut casted_data = vec![];
85            for (idx, value) in tuple.data.iter().enumerate() {
86                let target_type = self.projected_schema.column_with_index(idx)?.data_type;
87                casted_data.push(value.cast_to(&target_type)?);
88            }
89
90            // fill default values
91            let mut full_data = vec![];
92            for col in self.table_schema.columns.iter() {
93                if let Ok(idx) = self
94                    .projected_schema
95                    .index_of(col.relation.as_ref(), &col.name)
96                {
97                    full_data.push(casted_data[idx].clone());
98                } else {
99                    full_data.push(col.default.clone())
100                }
101            }
102
103            let tuple = Tuple::new(self.table_schema.clone(), full_data);
104
105            let table_heap = context.catalog.table_heap(&self.table)?;
106            let meta = TupleMeta {
107                insert_txn_id: context.txn.id(),
108                delete_txn_id: 0,
109                is_deleted: false,
110            };
111            let rid = table_heap.insert_tuple(&meta, &tuple)?;
112            let mut index_links = Vec::new();
113
114            let indexes = context.catalog.table_indexes(&self.table)?;
115            for index in indexes {
116                if let Ok(key_tuple) = tuple.project_with_schema(index.key_schema.clone()) {
117                    let root_page_id = index.get_root_page_id()?;
118                    index.insert(&key_tuple, rid)?;
119                    index_links.push((index.clone(), key_tuple));
120                    let new_root_page_id = index.get_root_page_id()?;
121                    if new_root_page_id != root_page_id {
122                        // root change comment
123                    }
124                }
125            }
126
127            context
128                .txn
129                .push_insert_undo(table_heap.clone(), rid, index_links);
130
131            self.insert_rows.fetch_add(1, Ordering::SeqCst);
132        }
133    }
134
135    fn output_schema(&self) -> SchemaRef {
136        INSERT_OUTPUT_SCHEMA_REF.clone()
137    }
138}
139
140impl std::fmt::Display for PhysicalInsert {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        write!(f, "Insert")
143    }
144}