quill_sql/execution/physical_plan/insert.rs

use log::debug;
use std::sync::atomic::Ordering;
use std::sync::{atomic::AtomicU32, Arc};

use crate::catalog::{SchemaRef, INSERT_OUTPUT_SCHEMA_REF};
use crate::error::QuillSQLError;
use crate::storage::page::TupleMeta;
use crate::storage::tuple::Tuple;
use crate::transaction::LockMode;
use crate::utils::scalar::ScalarValue;
use crate::utils::table_ref::TableReference;
use crate::{
    error::QuillSQLResult,
    execution::{ExecutionContext, VolcanoExecutor},
};

use super::PhysicalPlan;

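/// Physical operator for `INSERT`: pulls rows from its child plan, writes them
/// into the target table and its indexes, and finally reports the number of
/// rows inserted (tracked in `insert_rows`) as a single output tuple.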
#[derive(Debug)]
pub struct PhysicalInsert {
    pub table: TableReference,
    pub table_schema: SchemaRef,
    pub projected_schema: SchemaRef,
    pub input: Arc<PhysicalPlan>,

    insert_rows: AtomicU32,
}

impl PhysicalInsert {
    pub fn new(
        table: TableReference,
        table_schema: SchemaRef,
        projected_schema: SchemaRef,
        input: Arc<PhysicalPlan>,
    ) -> Self {
        Self {
            table,
            table_schema,
            projected_schema,
            input,
            insert_rows: AtomicU32::new(0),
        }
    }
}
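
// Volcano-style (iterator) executor: `init` prepares state and acquires locks,
// `next` is then called repeatedly until it returns `Ok(None)`.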
impl VolcanoExecutor for PhysicalInsert {
    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
        debug!("init insert executor");
        self.input.init(context)?;
        // Reset the per-execution row counter.
        self.insert_rows.store(0, Ordering::SeqCst);
        // The table must be writable, and an intention-exclusive (IX) table lock
        // is taken before any rows are inserted.
        context.ensure_writable(&self.table, "INSERT")?;
        context
            .txn_mgr
            .acquire_table_lock(
                context.txn,
                self.table.clone(),
                LockMode::IntentionExclusive,
            )
            .map_err(|_| {
                QuillSQLError::Execution(format!(
                    "failed to acquire IX lock on table {}",
                    self.table
                ))
            })?;
        Ok(())
    }
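
    // `next` drives the child plan one tuple at a time: each child row is cast to
    // the projected column types, missing columns are filled with defaults, and the
    // row is written to the table heap and every index. Once the child is exhausted,
    // a single tuple carrying the number of inserted rows is returned.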
    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
        loop {
            let next_tuple = self.input.next(context)?;
            if next_tuple.is_none() {
                return if self.insert_rows.load(Ordering::SeqCst) == 0 {
                    Ok(None)
                } else {
                    let insert_rows = self.insert_rows.swap(0, Ordering::SeqCst);
                    Ok(Some(Tuple::new(
                        self.output_schema(),
                        vec![ScalarValue::Int32(Some(insert_rows as i32))],
                    )))
                };
            }
            let tuple = next_tuple.unwrap();

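            // Cast each incoming value to the data type of its target column.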
            let mut casted_data = vec![];
            for (idx, value) in tuple.data.iter().enumerate() {
                let target_type = self.projected_schema.column_with_index(idx)?.data_type;
                casted_data.push(value.cast_to(&target_type)?);
            }

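            // Assemble the full row in table-column order: columns present in the
            // projected schema take the casted value, all others take their default.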
            let mut full_data = vec![];
            for col in self.table_schema.columns.iter() {
                if let Ok(idx) = self
                    .projected_schema
                    .index_of(col.relation.as_ref(), &col.name)
                {
                    full_data.push(casted_data[idx].clone());
                } else {
                    full_data.push(col.default.clone());
                }
            }

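            // Build the tuple against the full table schema and write it to the table
            // heap, stamped with the inserting transaction's id.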
            let tuple = Tuple::new(self.table_schema.clone(), full_data);

            let table_heap = context.catalog.table_heap(&self.table)?;
            let meta = TupleMeta {
                insert_txn_id: context.txn.id(),
                delete_txn_id: 0,
                is_deleted: false,
            };
            let rid = table_heap.insert_tuple(&meta, &tuple)?;
            let mut index_links = Vec::new();

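            // Maintain every index on the table: project the row onto each index's key
            // schema and insert the key with the new tuple's RID.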
            let indexes = context.catalog.table_indexes(&self.table)?;
            for index in indexes {
                if let Ok(key_tuple) = tuple.project_with_schema(index.key_schema.clone()) {
                    let root_page_id = index.get_root_page_id()?;
                    index.insert(&key_tuple, rid)?;
                    index_links.push((index.clone(), key_tuple));
                    let new_root_page_id = index.get_root_page_id()?;
                    if new_root_page_id != root_page_id {
                        // The index root page changed during this insert (e.g. a root
                        // split); no additional bookkeeping is performed here.
                    }
                }
            }

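            // Record undo information (heap RID plus the index entries just added) so
            // the whole insert can be rolled back if the transaction aborts.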
            context
                .txn
                .push_insert_undo(table_heap.clone(), rid, index_links);

            self.insert_rows.fetch_add(1, Ordering::SeqCst);
        }
    }

    fn output_schema(&self) -> SchemaRef {
        INSERT_OUTPUT_SCHEMA_REF.clone()
    }
}

impl std::fmt::Display for PhysicalInsert {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Insert")
    }
}