//! iridium_core 0.1.8
//!
//! SQL Server-compatible Rust engine core for Iridium SQL.
use std::collections::HashSet;

use crate::ast::{DdlStatement, DmlStatement, SessionStatement, Statement, TransactionStatement};
use crate::error::DbError;

use super::conflict::detect_conflicts;
use super::database::{EngineCatalog, EngineStorage};
use super::journal::{Journal, JournalEvent, WriteKind};
use super::locks::{SessionId, TxWorkspace};
use super::session::SharedState;
use super::table_util::{collect_read_tables, collect_write_tables};
use super::transaction::{TransactionManager, WriteIntentKind};

#[allow(clippy::too_many_arguments)]
pub(crate) fn execute_transaction_statement<C, S>(
    state: &SharedState<C, S>,
    session_id: SessionId,
    tx_manager: &mut TransactionManager<C, S, super::session::SessionSnapshot>,
    journal: &mut dyn Journal,
    workspace_slot: &mut Option<TxWorkspace<C, S>>,
    ctx: &mut super::context::ExecutionContext,
    session_options: &mut super::tooling::SessionOptions,
    stmt: Statement,
) -> Result<Option<super::result::QueryResult>, DbError>
where
    C: EngineCatalog,
    S: EngineStorage,
{
    match stmt {
        Statement::Transaction(TransactionStatement::Begin(name)) => {
            if tx_manager.depth == 0 {
                let (workspace_catalog, workspace_storage, commit_ts, table_versions) = {
                    let storage_guard = state.storage.read();
                    (
                        storage_guard.catalog.clone(),
                        storage_guard.storage.clone(),
                        storage_guard.commit_ts,
                        storage_guard.table_versions.clone(),
                    )
                };
                tx_manager.commit_ts = commit_ts;
                let extra = ctx.create_snapshot(session_options);
                let tx_id = state.allocate_tx_id();
                tx_manager.begin(name.clone(), commit_ts, tx_id, extra)?;
                *workspace_slot = Some(TxWorkspace {
                    catalog: workspace_catalog,
                    storage: workspace_storage,
                    base_table_versions: table_versions,
                    read_tables: HashSet::new(),
                    write_tables: HashSet::new(),
                    acquired_locks: Vec::new(),
                });
                state.wal_begin(tx_id, &format!("{:?}", tx_manager.session_isolation_level));
                journal.record(JournalEvent::Begin {
                    isolation_level: tx_manager.session_isolation_level,
                    name,
                });
            } else {
                tx_manager.depth += 1;
            }
            Ok(None)
        }
        Statement::Transaction(TransactionStatement::Commit(_)) => {
            if tx_manager.depth == 0 {
                return Err(DbError::Execution(
                    "COMMIT without active transaction".into(),
                ));
            }
            if tx_manager.xact_state == -1 {
                return Err(DbError::Execution(
                    "The current transaction cannot be committed and cannot support operations that write to the log file. Roll back the transaction.".into(),
                ));
            }
            tx_manager.depth -= 1;
            if tx_manager.depth > 0 {
                return Ok(None);
            }
            let tx = tx_manager
                .active
                .as_ref()
                .ok_or_else(|| DbError::Execution("COMMIT without active transaction".into()))?;
            let tx_id = tx.id;
            let workspace = workspace_slot.as_ref().ok_or_else(|| {
                DbError::Execution("internal error: missing transaction workspace".into())
            })?;

            let mut storage_guard = state.storage.write();

            let conflicts = detect_conflicts(
                tx.isolation_level,
                &workspace.base_table_versions,
                &workspace.read_tables,
                &workspace.write_tables,
                &storage_guard.table_versions,
            );
            if conflicts {
                return Err(DbError::Execution(
                    "transaction conflict detected during COMMIT".into(),
                ));
            }

            let next_commit_ts = storage_guard.commit_ts + 1;
            let mut next_table_versions = storage_guard.table_versions.clone();
            for table in &workspace.write_tables {
                next_table_versions.insert(table.clone(), next_commit_ts);
            }
            let checkpoint = super::durability::RecoveryCheckpoint {
                catalog: workspace.catalog.clone(),
                storage_data: workspace.storage.get_checkpoint_data(),
                commit_ts: next_commit_ts,
                table_versions: next_table_versions.clone(),
            };
            state.durability.lock().persist_checkpoint(&checkpoint)?;
            state.wal_commit(tx_id);

            storage_guard.catalog = workspace.catalog.clone();
            storage_guard.storage = workspace.storage.clone();
            storage_guard.commit_ts = next_commit_ts;
            for table in &workspace.write_tables {
                storage_guard
                    .table_versions
                    .insert(table.clone(), next_commit_ts);
            }
            state
                .table_locks
                .lock()
                .release_workspace_locks(session_id, workspace_slot, 0);
            state.dirty_buffer.lock().clear_session(session_id);
            if session_options.cursor_close_on_commit {
                ctx.session.cursors.clear();
                *ctx.session.fetch_status = -1;
            }
            tx_manager.active = None;
            tx_manager.commit_ts = storage_guard.commit_ts;
            tx_manager.xact_state = 0;
            *workspace_slot = None;
            journal.record(JournalEvent::Commit);
            Ok(None)
        }
        Statement::Transaction(TransactionStatement::Rollback(savepoint)) => {
            {
                let tx_id = tx_manager.active.as_ref().map(|tx| tx.id).unwrap_or(0);
                let workspace = workspace_slot.as_mut().ok_or_else(|| {
                    DbError::Execution("ROLLBACK without active transaction".into())
                })?;
                let mut extra = ctx.create_snapshot(session_options);
                let full_rollback = tx_manager.rollback(
                    savepoint.clone(),
                    &mut workspace.catalog,
                    &mut workspace.storage,
                    &mut extra,
                )?;
                ctx.restore_snapshot(extra, session_options);

                if full_rollback {
                    state.wal_rollback(tx_id);
                    {
                        let durability = state.durability.lock();
                        if let Some(checkpoint) = durability.latest_checkpoint() {
                            drop(durability);
                            let mut storage_guard = state.storage.write();
                            storage_guard.catalog = checkpoint.catalog;
                            let _ = storage_guard
                                .storage
                                .restore_from_checkpoint(checkpoint.storage_data);
                            storage_guard.commit_ts = checkpoint.commit_ts;
                            storage_guard.table_versions = checkpoint.table_versions;
                            drop(storage_guard);
                        }
                    }
                    tx_manager.active = None;
                    tx_manager.depth = 0;
                    tx_manager.xact_state = 0;
                    state
                        .table_locks
                        .lock()
                        .release_workspace_locks(session_id, workspace_slot, 0);
                    state.dirty_buffer.lock().clear_session(session_id);
                    if session_options.cursor_close_on_commit {
                        ctx.session.cursors.clear();
                        *ctx.session.fetch_status = -1;
                    }
                    *workspace_slot = None;
                } else {
                    if let Some(ref sp_name) = savepoint {
                        state.wal_savepoint(tx_id, sp_name);
                    }
                    if let Some(ref active_tx) = tx_manager.active {
                        let keep = active_tx.write_set.len();
                        if workspace.write_tables.len() > keep {
                            let mut names: Vec<_> =
                                workspace.write_tables.iter().cloned().collect();
                            names.sort();
                            names.truncate(keep);
                            workspace.write_tables = names.into_iter().collect();
                        }
                        let keep_depth = active_tx.savepoints.len();
                        state.table_locks.lock().release_workspace_locks(
                            session_id,
                            workspace_slot,
                            keep_depth,
                        );
                    }
                }
            }
            journal.record(JournalEvent::Rollback { savepoint });
            Ok(None)
        }
        Statement::Transaction(TransactionStatement::Save(name)) => {
            let tx_id = tx_manager.active.as_ref().map(|tx| tx.id).unwrap_or(0);
            let workspace = workspace_slot.as_ref().ok_or_else(|| {
                DbError::Execution("SAVE TRANSACTION without active transaction".into())
            })?;
            let extra = ctx.create_snapshot(session_options);
            tx_manager.save(name.clone(), &workspace.catalog, &workspace.storage, &extra)?;
            state.wal_savepoint(tx_id, &name);
            journal.record(JournalEvent::Savepoint { name });
            Ok(None)
        }
        Statement::Session(SessionStatement::SetTransactionIsolationLevel(level)) => {
            tx_manager.set_isolation_level(level)?;
            journal.record(JournalEvent::SetIsolationLevel {
                isolation_level: level,
            });
            Ok(None)
        }
        _ => Err(DbError::Execution(
            "internal error while executing transaction statement".into(),
        )),
    }
}

/// Forcibly aborts the session's active transaction, if there is one.
///
/// Does nothing when `tx_manager.active` is `None`. Otherwise it rolls the
/// workspace back (ignoring any rollback error), writes a WAL rollback
/// record, releases the session's table locks and dirty-buffer entries,
/// optionally closes cursors, clears the workspace, resets the transaction
/// manager's state and journals a full rollback.
pub(crate) fn force_xact_abort<C, S>(
    state: &SharedState<C, S>,
    session_id: SessionId,
    tx_manager: &mut TransactionManager<C, S, super::session::SessionSnapshot>,
    journal: &mut dyn Journal,
    workspace_slot: &mut Option<TxWorkspace<C, S>>,
    ctx: &mut super::context::ExecutionContext,
    session_options: &mut super::tooling::SessionOptions,
) where
    C: EngineCatalog,
    S: EngineStorage,
{
    // No open transaction means there is nothing to abort.
    let tx_id = match tx_manager.active.as_ref() {
        Some(active_tx) => active_tx.id,
        None => return,
    };

    if let Some(ws) = workspace_slot.as_mut() {
        let mut snapshot = ctx.create_snapshot(session_options);
        // The outcome is discarded on purpose of the original design: the
        // cleanup below runs regardless of whether the rollback succeeded.
        let _ = tx_manager.rollback(None, &mut ws.catalog, &mut ws.storage, &mut snapshot);
        ctx.restore_snapshot(snapshot, session_options);
    }

    state.wal_rollback(tx_id);

    // Drop everything the session was holding on behalf of the transaction.
    state
        .table_locks
        .lock()
        .release_workspace_locks(session_id, workspace_slot, 0);
    state.dirty_buffer.lock().clear_session(session_id);

    if session_options.cursor_close_on_commit {
        ctx.session.cursors.clear();
        *ctx.session.fetch_status = -1;
    }

    *workspace_slot = None;

    // Return the transaction manager to its idle state, resynchronised with
    // the shared commit timestamp.
    tx_manager.active = None;
    tx_manager.depth = 0;
    let shared_commit_ts = state.storage.read().commit_ts;
    tx_manager.commit_ts = shared_commit_ts;
    tx_manager.xact_state = 0;

    journal.record(JournalEvent::Rollback { savepoint: None });
}

/// Adds every table read by `stmt` to the workspace's read set.
///
/// Does nothing (and does not inspect `stmt`) when no workspace is present.
pub(crate) fn register_read_tables<C, S>(
    workspace_slot: &mut Option<TxWorkspace<C, S>>,
    stmt: &Statement,
) {
    if let Some(ws) = workspace_slot {
        ws.read_tables.extend(collect_read_tables(stmt));
    }
}

/// Adds every table written by `stmt` to the workspace's write set.
///
/// Does nothing (and does not inspect `stmt`) when no workspace is present.
pub(crate) fn register_workspace_write_tables<C, S>(
    workspace_slot: &mut Option<TxWorkspace<C, S>>,
    stmt: &Statement,
) {
    if let Some(ws) = workspace_slot {
        ws.write_tables.extend(collect_write_tables(stmt));
    }
}

pub(crate) fn register_write_intent<C, S>(
    tx_manager: &mut TransactionManager<C, S, super::session::SessionSnapshot>,
    journal: &mut dyn Journal,
    stmt: &Statement,
) where
    C: EngineCatalog,
    S: EngineStorage,
{
    if tx_manager.active.is_none() {
        return;
    }

    let (kind, table) = match stmt {
        Statement::Dml(DmlStatement::Insert(s)) => {
            (WriteIntentKind::Insert, Some(s.table.name.clone()))
        }
        Statement::Dml(DmlStatement::Update(s)) => {
            (WriteIntentKind::Update, Some(s.table.name.clone()))
        }
        Statement::Dml(DmlStatement::Delete(s)) => {
            (WriteIntentKind::Delete, Some(s.table.name.clone()))
        }
        Statement::Ddl(DdlStatement::CreateTable(s)) => {
            (WriteIntentKind::Ddl, Some(s.name.name.clone()))
        }
        Statement::Ddl(DdlStatement::DropTable(s)) => {
            (WriteIntentKind::Ddl, Some(s.name.name.clone()))
        }
        Statement::Ddl(DdlStatement::AlterTable(s)) => {
            (WriteIntentKind::Ddl, Some(s.table.name.clone()))
        }
        Statement::Ddl(DdlStatement::TruncateTable(s)) => {
            (WriteIntentKind::Ddl, Some(s.name.name.clone()))
        }
        Statement::Ddl(DdlStatement::CreateIndex(s)) => {
            (WriteIntentKind::Ddl, Some(s.table.name.clone()))
        }
        Statement::Ddl(DdlStatement::DropIndex(s)) => {
            (WriteIntentKind::Ddl, Some(s.table.name.clone()))
        }
        Statement::Ddl(DdlStatement::CreateSchema(_))
        | Statement::Ddl(DdlStatement::DropSchema(_)) => (WriteIntentKind::Ddl, None),
        _ => return,
    };

    tx_manager.register_write_intent(kind, table.clone());
    journal.record(JournalEvent::WriteIntent {
        kind: map_write_kind(kind),
        table,
    });
}

fn map_write_kind(kind: WriteIntentKind) -> WriteKind {
    match kind {
        WriteIntentKind::Insert => WriteKind::Insert,
        WriteIntentKind::Update => WriteKind::Update,
        WriteIntentKind::Delete => WriteKind::Delete,
        WriteIntentKind::Ddl => WriteKind::Ddl,
    }
}