use crate::ast::InsertStmt;
use crate::catalog::TableDef;
use crate::error::DbError;
use crate::storage::StoredRow;
use crate::types::{DataType, Value};
use super::super::context::ExecutionContext;
use super::super::evaluator::eval_expr_to_type_constant;
use super::super::model::single_row_context;
use super::super::result::QueryResult;
use super::super::string_norm::normalize_identifier;
use super::output::build_output_result;
use super::validation::{apply_ansi_padding, enforce_string_length};
use super::MutationExecutor;
impl<'a> MutationExecutor<'a> {
pub(crate) fn execute_insert_with_context(
&mut self,
mut stmt: InsertStmt,
ctx: &mut ExecutionContext<'_>,
) -> Result<Option<QueryResult>, DbError> {
if let Some(mapped) = ctx.resolve_table_name(&stmt.table.name) {
stmt.table.name = mapped;
if stmt.table.schema.is_none() {
stmt.table.schema = Some("dbo".to_string());
}
}
let schema = stmt.table.schema_or_dbo().to_string();
let table_name = stmt.table.name.clone();
let table = self
.catalog
.find_table(&schema, &table_name)
.ok_or_else(|| DbError::table_not_found(&schema, &table_name))?;
let table = table.clone();
let table_id = table.id;
let instead_of_triggers = if ctx.frame.skip_instead_of {
vec![]
} else {
self.find_triggers(&table, crate::ast::TriggerEvent::Insert)
.into_iter()
.filter(|t| t.is_instead_of)
.collect::<Vec<_>>()
};
let insert_columns = self.get_insert_columns(&table, &stmt.columns);
let rowcount_limit = if ctx.options.rowcount == 0 {
None
} else {
Some(ctx.options.rowcount as usize)
};
let rows =
self.collect_insert_rows(&table, &insert_columns, &stmt.source, ctx, rowcount_limit)?;
if !instead_of_triggers.is_empty() {
self.execute_triggers(
&table,
crate::ast::TriggerEvent::Insert,
true,
&rows,
&[],
ctx,
)?;
if let Some(output) = stmt.output {
let inserted: Vec<&crate::storage::StoredRow> = rows.iter().collect();
let result = build_output_result(&output, &table, &inserted, &[])?;
if let Some(target) = stmt.output_into {
if let Some(result) = result.as_ref() {
self.insert_output_into(&target, result, ctx)?;
} else {
return Err(DbError::Execution("OUTPUT INTO produced no result".into()));
}
return Ok(None);
}
return Ok(result);
}
return Ok(None);
}
let has_after_triggers = !self
.find_triggers(&table, crate::ast::TriggerEvent::Insert)
.into_iter()
.filter(|t| !t.is_instead_of)
.collect::<Vec<_>>()
.is_empty();
let collect_rows = stmt.output.is_some() || has_after_triggers;
let inserted_rows_for_output =
self.commit_insert_rows(&table, table_id, rows, ctx, collect_rows)?;
self.execute_triggers(
&table,
crate::ast::TriggerEvent::Insert,
false,
&inserted_rows_for_output,
&[],
ctx,
)?;
if let Some(output) = stmt.output {
let inserted: Vec<&crate::storage::StoredRow> =
inserted_rows_for_output.iter().collect();
let result = build_output_result(&output, &table, &inserted, &[])?;
if let Some(target) = stmt.output_into {
if let Some(result) = result.as_ref() {
self.insert_output_into(&target, result, ctx)?;
} else {
return Err(DbError::Execution("OUTPUT INTO produced no result".into()));
}
return Ok(None);
}
return Ok(result);
}
Ok(None)
}
fn get_insert_columns(
&self,
table: &TableDef,
stmt_columns: &Option<Vec<String>>,
) -> Vec<String> {
if let Some(cols) = stmt_columns.clone() {
cols
} else {
table
.columns
.iter()
.filter(|c| c.computed_expr.is_none())
.map(|c| c.name.clone())
.collect::<Vec<_>>()
}
}
pub(crate) fn build_row_from_values(
&mut self,
table: &TableDef,
insert_columns: &[String],
row_values: Vec<Value>,
ctx: &mut ExecutionContext<'_>,
) -> Result<StoredRow, DbError> {
let mut final_values = vec![crate::types::Value::Null; table.columns.len()];
for (input_col, val) in insert_columns.iter().zip(row_values.iter()) {
let col_idx = table
.columns
.iter()
.position(|c| c.name.eq_ignore_ascii_case(input_col))
.ok_or_else(|| DbError::column_not_found(input_col))?;
let col = &table.columns[col_idx];
if col.computed_expr.is_some() {
return Err(DbError::Execution(format!(
"cannot insert explicit value for computed column '{}'",
col.name
)));
}
let mut value = val.clone();
apply_ansi_padding(&mut value, &col.data_type, col.ansi_padding_on);
enforce_string_length(&col.data_type, &value, &col.name)?;
final_values[col_idx] = value;
}
let mut temp_row = crate::storage::StoredRow {
values: final_values,
deleted: false,
};
self.apply_missing_values(table, &mut temp_row.values, ctx)?;
for (col, value) in table.columns.iter().zip(temp_row.values.iter_mut()) {
apply_ansi_padding(value, &col.data_type, col.ansi_padding_on);
enforce_string_length(&col.data_type, value, &col.name)?;
}
Ok(temp_row)
}
pub(crate) fn build_insert_row(
&mut self,
table: &TableDef,
insert_columns: &[String],
values: Vec<crate::ast::Expr>,
ctx: &mut ExecutionContext<'_>,
) -> Result<StoredRow, DbError> {
if insert_columns.len() != values.len() {
return Err(DbError::Execution(
"insert column count does not match values count".to_string(),
));
}
let mut final_values = vec![Value::Null; table.columns.len()];
for (input_col, expr) in insert_columns.iter().zip(values.iter()) {
let col_idx = table
.columns
.iter()
.position(|c| c.name.eq_ignore_ascii_case(input_col))
.ok_or_else(|| DbError::column_not_found(input_col))?;
let col = &table.columns[col_idx];
if col.computed_expr.is_some() {
return Err(DbError::Execution(format!(
"cannot insert explicit value for computed column '{}'",
col.name
)));
}
let value = eval_expr_to_type_constant(
expr,
&col.data_type,
ctx,
self.catalog,
self.storage,
self.clock,
)?;
let mut value = value;
apply_ansi_padding(&mut value, &col.data_type, col.ansi_padding_on);
enforce_string_length(&col.data_type, &value, &col.name)?;
final_values[col_idx] = value;
}
self.apply_missing_values(table, &mut final_values, ctx)?;
for (col, value) in table.columns.iter().zip(final_values.iter_mut()) {
apply_ansi_padding(value, &col.data_type, col.ansi_padding_on);
enforce_string_length(&col.data_type, value, &col.name)?;
}
Ok(StoredRow {
values: final_values,
deleted: false,
})
}
pub(crate) fn apply_missing_values(
&mut self,
table: &TableDef,
final_values: &mut [Value],
ctx: &mut ExecutionContext<'_>,
) -> Result<(), DbError> {
for (idx, col) in table.columns.iter().enumerate() {
if col.computed_expr.is_some() {
continue;
}
if matches!(final_values[idx], Value::Null) {
if col.identity.is_some() {
let next_val = self.catalog.next_identity_value(table.id, &col.name)?;
ctx.set_last_identity(next_val);
final_values[idx] = match &col.data_type {
DataType::TinyInt => Value::TinyInt(next_val as u8),
DataType::SmallInt => Value::SmallInt(next_val as i16),
DataType::Int => Value::Int(next_val as i32),
DataType::BigInt => Value::BigInt(next_val),
DataType::Decimal { scale, .. } => {
let raw = next_val as i128 * 10i128.pow(*scale as u32);
Value::Decimal(raw, *scale)
}
_ => {
return Err(DbError::Execution(format!(
"identity not supported for column type {:?}",
col.data_type
)))
}
};
continue;
}
if let Some(default_expr) = &col.default {
final_values[idx] = eval_expr_to_type_constant(
default_expr,
&col.data_type,
ctx,
self.catalog,
self.storage,
self.clock,
)?;
continue;
}
if !col.nullable {
return Err(DbError::Execution(format!(
"column '{}' does not allow NULL",
col.name
)));
}
} else if col.identity.is_some() {
let table_upper = normalize_identifier(&table.name);
if !ctx.session.identity_insert.contains(&table_upper) {
return Err(DbError::Execution(format!(
"Cannot insert explicit value for identity column '{}' in table '{}' when IDENTITY_INSERT is set to OFF.",
col.name, table.name
)));
}
}
}
for (idx, col) in table.columns.iter().enumerate() {
if let Some(computed) = &col.computed_expr {
let snapshot = StoredRow {
values: final_values.to_vec(),
deleted: false,
};
let joined = single_row_context(table, snapshot);
let value = super::super::evaluator::eval_expr(
computed,
&joined,
ctx,
self.catalog,
self.storage,
self.clock,
)?;
final_values[idx] = value;
}
}
for (col, value) in table.columns.iter().zip(final_values.iter_mut()) {
apply_ansi_padding(value, &col.data_type, col.ansi_padding_on);
}
Ok(())
}
}