use log::debug;
use std::sync::atomic::Ordering;
use std::sync::{atomic::AtomicU32, Arc};
use crate::catalog::{SchemaRef, INSERT_OUTPUT_SCHEMA_REF};
use crate::common::TableReference;
use crate::storage::EMPTY_TUPLE_META;
use crate::{
common::ScalarValue,
execution::{ExecutionContext, VolcanoExecutor},
storage::Tuple,
BustubxResult,
};
use super::PhysicalPlan;
#[derive(Debug)]
pub struct PhysicalInsert {
pub table: TableReference,
pub table_schema: SchemaRef,
pub projected_schema: SchemaRef,
pub input: Arc<PhysicalPlan>,
insert_rows: AtomicU32,
}
impl PhysicalInsert {
pub fn new(
table: TableReference,
table_schema: SchemaRef,
projected_schema: SchemaRef,
input: Arc<PhysicalPlan>,
) -> Self {
Self {
table,
table_schema,
projected_schema,
input,
insert_rows: AtomicU32::new(0),
}
}
}
impl VolcanoExecutor for PhysicalInsert {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
debug!("init insert executor");
self.input.init(context)?;
self.insert_rows.store(0, Ordering::SeqCst);
Ok(())
}
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
loop {
let next_tuple = self.input.next(context)?;
if next_tuple.is_none() {
return if self.insert_rows.load(Ordering::SeqCst) == 0 {
Ok(None)
} else {
let insert_rows = self.insert_rows.swap(0, Ordering::SeqCst);
Ok(Some(Tuple::new(
self.output_schema(),
vec![ScalarValue::Int32(Some(insert_rows as i32))],
)))
};
}
let tuple = next_tuple.unwrap();
let mut casted_data = vec![];
for (idx, value) in tuple.data.iter().enumerate() {
let target_type = self.projected_schema.column_with_index(idx)?.data_type;
if target_type == value.data_type() {
casted_data.push(value.clone());
} else {
casted_data.push(value.cast_to(&target_type)?);
}
}
let tuple = Tuple::new(self.table_schema.clone(), casted_data);
let table_heap = context.catalog.table_heap(&self.table)?;
let rid = table_heap.insert_tuple(&EMPTY_TUPLE_META, &tuple)?;
let indexes = context.catalog.table_indexes(&self.table)?;
for index in indexes {
if let Ok(key_tuple) = tuple.project_with_schema(index.key_schema.clone()) {
let root_page_id = index.root_page_id.load(Ordering::SeqCst);
index.insert(&key_tuple, rid)?;
let new_root_page_id = index.root_page_id.load(Ordering::SeqCst);
if new_root_page_id != root_page_id {
}
}
}
self.insert_rows.fetch_add(1, Ordering::SeqCst);
}
}
fn output_schema(&self) -> SchemaRef {
INSERT_OUTPUT_SCHEMA_REF.clone()
}
}
impl std::fmt::Display for PhysicalInsert {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Insert")
}
}