// iridium_core 0.1.10
//
// SQL Server-compatible Rust engine core for Iridium SQL
// Documentation
use crate::ast::InsertStmt;
use crate::catalog::TableDef;
use crate::error::DbError;
use crate::storage::StoredRow;
use crate::types::{DataType, Value};

use super::super::context::ExecutionContext;
use super::super::evaluator::eval_expr_to_type_constant;
use super::super::model::single_row_context;
use super::super::result::QueryResult;
use super::super::string_norm::normalize_identifier;

use super::output::build_output_result;
use super::validation::{apply_ansi_padding, enforce_string_length};
use super::MutationExecutor;

impl<'a> MutationExecutor<'a> {
    pub(crate) fn execute_insert_with_context(
        &mut self,
        mut stmt: InsertStmt,
        ctx: &mut ExecutionContext<'_>,
    ) -> Result<Option<QueryResult>, DbError> {
        if let Some(mapped) = ctx.resolve_table_name(&stmt.table.name) {
            stmt.table.name = mapped;
            if stmt.table.schema.is_none() {
                stmt.table.schema = Some("dbo".to_string());
            }
        }
        let schema = stmt.table.schema_or_dbo().to_string();
        let table_name = stmt.table.name.clone();

        let table = self
            .catalog
            .find_table(&schema, &table_name)
            .ok_or_else(|| DbError::table_not_found(&schema, &table_name))?;
        let table = table.clone();

        let table_id = table.id;

        // Check for INSTEAD OF INSERT trigger
        let instead_of_triggers = if ctx.frame.skip_instead_of {
            vec![]
        } else {
            self.find_triggers(&table, crate::ast::TriggerEvent::Insert)
                .into_iter()
                .filter(|t| t.is_instead_of)
                .collect::<Vec<_>>()
        };

        let insert_columns = self.get_insert_columns(&table, &stmt.columns);
        let rowcount_limit = if ctx.options.rowcount == 0 {
            None
        } else {
            Some(ctx.options.rowcount as usize)
        };
        let rows =
            self.collect_insert_rows(&table, &insert_columns, &stmt.source, ctx, rowcount_limit)?;

        if !instead_of_triggers.is_empty() {
            self.execute_triggers(
                &table,
                crate::ast::TriggerEvent::Insert,
                true,
                &rows,
                &[],
                ctx,
            )?;

            if let Some(output) = stmt.output {
                let inserted: Vec<&crate::storage::StoredRow> = rows.iter().collect();
                let result = build_output_result(&output, &table, &inserted, &[])?;
                if let Some(target) = stmt.output_into {
                    if let Some(result) = result.as_ref() {
                        self.insert_output_into(&target, result, ctx)?;
                    } else {
                        return Err(DbError::Execution("OUTPUT INTO produced no result".into()));
                    }
                    return Ok(None);
                }
                return Ok(result);
            }
            return Ok(None);
        }

        let has_after_triggers = !self
            .find_triggers(&table, crate::ast::TriggerEvent::Insert)
            .into_iter()
            .filter(|t| !t.is_instead_of)
            .collect::<Vec<_>>()
            .is_empty();

        let collect_rows = stmt.output.is_some() || has_after_triggers;
        let inserted_rows_for_output =
            self.commit_insert_rows(&table, table_id, rows, ctx, collect_rows)?;

        self.execute_triggers(
            &table,
            crate::ast::TriggerEvent::Insert,
            false,
            &inserted_rows_for_output,
            &[],
            ctx,
        )?;

        if let Some(output) = stmt.output {
            let inserted: Vec<&crate::storage::StoredRow> =
                inserted_rows_for_output.iter().collect();
            let result = build_output_result(&output, &table, &inserted, &[])?;
            if let Some(target) = stmt.output_into {
                if let Some(result) = result.as_ref() {
                    self.insert_output_into(&target, result, ctx)?;
                } else {
                    return Err(DbError::Execution("OUTPUT INTO produced no result".into()));
                }
                return Ok(None);
            }
            return Ok(result);
        }

        Ok(None)
    }

    /// Determines which columns an INSERT targets, in order.
    ///
    /// An explicit column list from the statement is returned as a copy. Without one, the
    /// result is the name of every column in `table` that has no computed expression, in
    /// the table's column order.
    fn get_insert_columns(
        &self,
        table: &TableDef,
        stmt_columns: &Option<Vec<String>>,
    ) -> Vec<String> {
        match stmt_columns {
            Some(explicit) => explicit.clone(),
            None => {
                let mut names = Vec::new();
                for column in table.columns.iter() {
                    if column.computed_expr.is_none() {
                        names.push(column.name.clone());
                    }
                }
                names
            }
        }
    }

    pub(crate) fn build_row_from_values(
        &mut self,
        table: &TableDef,
        insert_columns: &[String],
        row_values: Vec<Value>,
        ctx: &mut ExecutionContext<'_>,
    ) -> Result<StoredRow, DbError> {
        let mut final_values = vec![crate::types::Value::Null; table.columns.len()];
        for (input_col, val) in insert_columns.iter().zip(row_values.iter()) {
            let col_idx = table
                .columns
                .iter()
                .position(|c| c.name.eq_ignore_ascii_case(input_col))
                .ok_or_else(|| DbError::column_not_found(input_col))?;

            let col = &table.columns[col_idx];
            if col.computed_expr.is_some() {
                return Err(DbError::Execution(format!(
                    "cannot insert explicit value for computed column '{}'",
                    col.name
                )));
            }
            let mut value = val.clone();
            apply_ansi_padding(&mut value, &col.data_type, col.ansi_padding_on);
            enforce_string_length(&col.data_type, &value, &col.name)?;
            final_values[col_idx] = value;
        }
        let mut temp_row = crate::storage::StoredRow {
            values: final_values,
            deleted: false,
        };
        self.apply_missing_values(table, &mut temp_row.values, ctx)?;
        for (col, value) in table.columns.iter().zip(temp_row.values.iter_mut()) {
            apply_ansi_padding(value, &col.data_type, col.ansi_padding_on);
            enforce_string_length(&col.data_type, value, &col.name)?;
        }
        Ok(temp_row)
    }

    /// Builds a full table row from the expressions of one `VALUES` tuple.
    ///
    /// Each expression is evaluated against the data type of its positionally paired
    /// target column, ANSI-padded, length-checked and stored in that column's slot.
    /// Columns that were not targeted start as `Value::Null` and are completed by
    /// `apply_missing_values`; afterwards every column is padded and length-checked again.
    ///
    /// # Errors
    ///
    /// Returns a `DbError::Execution` when the number of expressions differs from the
    /// number of target columns or when a computed column is targeted explicitly,
    /// `DbError::column_not_found` for an unknown column name, and propagates errors from
    /// expression evaluation, the length check and `apply_missing_values`.
    pub(crate) fn build_insert_row(
        &mut self,
        table: &TableDef,
        insert_columns: &[String],
        values: Vec<crate::ast::Expr>,
        ctx: &mut ExecutionContext<'_>,
    ) -> Result<StoredRow, DbError> {
        if values.len() != insert_columns.len() {
            return Err(DbError::Execution(
                "insert column count does not match values count".to_string(),
            ));
        }

        let column_count = table.columns.len();
        let mut row_values = vec![Value::Null; column_count];

        for (target_name, source_expr) in insert_columns.iter().zip(&values) {
            let slot = table
                .columns
                .iter()
                .position(|candidate| candidate.name.eq_ignore_ascii_case(target_name))
                .ok_or_else(|| DbError::column_not_found(target_name))?;
            let target = &table.columns[slot];

            if target.computed_expr.is_some() {
                return Err(DbError::Execution(format!(
                    "cannot insert explicit value for computed column '{}'",
                    target.name
                )));
            }

            let mut evaluated = eval_expr_to_type_constant(
                source_expr,
                &target.data_type,
                ctx,
                self.catalog,
                self.storage,
                self.clock,
            )?;
            apply_ansi_padding(&mut evaluated, &target.data_type, target.ansi_padding_on);
            enforce_string_length(&target.data_type, &evaluated, &target.name)?;
            row_values[slot] = evaluated;
        }

        self.apply_missing_values(table, &mut row_values, ctx)?;

        // Re-check every slot, including those filled in by `apply_missing_values`.
        for (slot, column) in table.columns.iter().enumerate() {
            let cell = &mut row_values[slot];
            apply_ansi_padding(cell, &column.data_type, column.ansi_padding_on);
            enforce_string_length(&column.data_type, cell, &column.name)?;
        }

        Ok(StoredRow {
            deleted: false,
            values: row_values,
        })
    }

    /// Completes a partially filled row in place.
    ///
    /// `final_values` holds one slot per table column, in column order. The work is done
    /// in three passes:
    ///
    /// 1. For every non-computed column whose slot is `Value::Null`: an identity column
    ///    gets the next identity value from the catalog (also recorded through
    ///    `ctx.set_last_identity`); otherwise a column with a default gets the evaluated
    ///    default; otherwise the NULL is kept if the column is nullable. A non-NULL value
    ///    in an identity column is only accepted when the normalized table name is present
    ///    in `ctx.options.identity_insert`.
    /// 2. Every computed column is evaluated against a snapshot of the row as filled so far.
    /// 3. ANSI padding is applied to every slot.
    ///
    /// # Errors
    ///
    /// Returns a `DbError::Execution` when
    /// - a generated identity value does not fit the column's data type,
    /// - the identity column has a data type other than tinyint, smallint, int, bigint or
    ///   decimal,
    /// - a NULL ends up in a column that is not nullable,
    /// - an explicit identity value is supplied while identity insert is not enabled for
    ///   the table.
    ///
    /// Errors from identity generation and expression evaluation are propagated.
    pub(crate) fn apply_missing_values(
        &mut self,
        table: &TableDef,
        final_values: &mut [Value],
        ctx: &mut ExecutionContext<'_>,
    ) -> Result<(), DbError> {
        for (idx, col) in table.columns.iter().enumerate() {
            if col.computed_expr.is_some() {
                continue;
            }
            if matches!(final_values[idx], Value::Null) {
                if col.identity.is_some() {
                    let next_val = self.catalog.next_identity_value(table.id, &col.name)?;
                    ctx.set_last_identity(next_val);

                    let identity_overflow = |type_name: &str| {
                        DbError::Execution(format!(
                            "Arithmetic overflow error converting IDENTITY to data type {}.",
                            type_name
                        ))
                    };

                    // A plain `as` cast would wrap silently once the identity counter leaves
                    // the column type's range and store a wrong value, so every narrowing
                    // conversion is checked and reported as an overflow instead.
                    final_values[idx] = match &col.data_type {
                        DataType::TinyInt => Value::TinyInt(
                            <u8 as std::convert::TryFrom<_>>::try_from(next_val)
                                .map_err(|_| identity_overflow("tinyint"))?,
                        ),
                        DataType::SmallInt => Value::SmallInt(
                            <i16 as std::convert::TryFrom<_>>::try_from(next_val)
                                .map_err(|_| identity_overflow("smallint"))?,
                        ),
                        DataType::Int => Value::Int(
                            <i32 as std::convert::TryFrom<_>>::try_from(next_val)
                                .map_err(|_| identity_overflow("int"))?,
                        ),
                        DataType::BigInt => Value::BigInt(next_val),
                        DataType::Decimal { scale, .. } => {
                            // Scale the integer identity value to the column's fixed-point
                            // representation without risking a panic or wrap-around.
                            let raw = 10i128
                                .checked_pow(*scale as u32)
                                .and_then(|factor| (next_val as i128).checked_mul(factor))
                                .ok_or_else(|| identity_overflow("decimal"))?;
                            Value::Decimal(raw, *scale)
                        }
                        _ => {
                            return Err(DbError::Execution(format!(
                                "identity not supported for column type {:?}",
                                col.data_type
                            )))
                        }
                    };
                    continue;
                }

                if let Some(default_expr) = &col.default {
                    final_values[idx] = eval_expr_to_type_constant(
                        default_expr,
                        &col.data_type,
                        ctx,
                        self.catalog,
                        self.storage,
                        self.clock,
                    )?;
                    continue;
                }

                if !col.nullable {
                    return Err(DbError::Execution(format!(
                        "column '{}' does not allow NULL",
                        col.name
                    )));
                }
            } else if col.identity.is_some() {
                // An explicit value for an identity column requires identity insert to be
                // enabled for this table.
                let table_upper = normalize_identifier(&table.name);
                if !ctx.options.identity_insert.contains(&table_upper) {
                    return Err(DbError::Execution(format!(
                        "Cannot insert explicit value for identity column '{}' in table '{}' when IDENTITY_INSERT is set to OFF.",
                        col.name, table.name
                    )));
                }
            }
        }

        // Computed columns are evaluated after all stored columns are known. Each one sees
        // a snapshot that already contains the computed columns that precede it.
        for (idx, col) in table.columns.iter().enumerate() {
            if let Some(computed) = &col.computed_expr {
                let snapshot = StoredRow {
                    values: final_values.to_vec(),
                    deleted: false,
                };
                let joined = single_row_context(table, snapshot);
                let value = super::super::evaluator::eval_expr(
                    computed,
                    &joined,
                    ctx,
                    self.catalog,
                    self.storage,
                    self.clock,
                )?;
                final_values[idx] = value;
            }
        }

        for (col, value) in table.columns.iter().zip(final_values.iter_mut()) {
            apply_ansi_padding(value, &col.data_type, col.ansi_padding_on);
        }
        Ok(())
    }
}