quill-sql 0.3.1

An educational Rust relational database (RDBMS) inspired by CMU 15445
Documentation
use crate::error::QuillSQLResult;
use crate::storage::record::{RecordId, TupleMeta};
use crate::storage::tuple::Tuple;
use crate::storage::{IndexHandle, TableHandle};
use sqlparser::ast::TransactionAccessMode;
use std::str::FromStr;
use std::sync::Arc;

use super::TransactionSnapshot;

pub type TransactionId = u64;
pub type CommandId = u32;

pub const INVALID_COMMAND_ID: CommandId = CommandId::MAX;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}

impl IsolationLevel {
    pub fn as_str(&self) -> &'static str {
        match self {
            IsolationLevel::ReadUncommitted => "read-uncommitted",
            IsolationLevel::ReadCommitted => "read-committed",
            IsolationLevel::RepeatableRead => "repeatable-read",
            IsolationLevel::Serializable => "serializable",
        }
    }
}

impl FromStr for IsolationLevel {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.trim().to_ascii_lowercase().as_str() {
            "read-uncommitted" | "ru" => Ok(IsolationLevel::ReadUncommitted),
            "read-committed" | "rc" => Ok(IsolationLevel::ReadCommitted),
            "repeatable-read" | "rr" => Ok(IsolationLevel::RepeatableRead),
            "serializable" | "sr" | "serial" => Ok(IsolationLevel::Serializable),
            other => Err(format!("unknown isolation level '{}'", other)),
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    Running,
    Tainted,
    Committed,
    Aborted,
}

#[derive(Clone)]
pub enum UndoAction {
    Insert {
        table: Arc<dyn TableHandle>,
        rid: RecordId,
        indexes: Vec<IndexUndoLink>,
    },
    Delete {
        table: Arc<dyn TableHandle>,
        rid: RecordId,
        prev_meta: TupleMeta,
        prev_tuple: Tuple,
        indexes: Vec<IndexUndoLink>,
    },
}

#[derive(Clone)]
pub struct IndexUndoLink {
    pub index: Arc<dyn IndexHandle>,
    pub key: Tuple,
    pub rid: RecordId,
}

pub(crate) type IndexUndoEntries = Vec<(Arc<dyn IndexHandle>, Tuple, RecordId)>;

pub(crate) struct UpdateUndo {
    pub old_rid: RecordId,
    pub new_rid: RecordId,
    pub prev_meta: TupleMeta,
    pub prev_tuple: Tuple,
    pub new_keys: IndexUndoEntries,
    pub old_keys: IndexUndoEntries,
}

impl std::fmt::Debug for IndexUndoLink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndexUndoLink")
            .field("index", &self.index.name())
            .field("key", &self.key)
            .field("rid", &self.rid)
            .finish()
    }
}

impl std::fmt::Debug for UndoAction {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Insert {
                table,
                rid,
                indexes,
            } => f
                .debug_struct("Insert")
                .field("table", &table.table_ref())
                .field("rid", rid)
                .field("indexes", indexes)
                .finish(),
            Self::Delete {
                table,
                rid,
                prev_meta,
                prev_tuple,
                indexes,
            } => f
                .debug_struct("Delete")
                .field("table", &table.table_ref())
                .field("rid", rid)
                .field("prev_meta", prev_meta)
                .field("prev_tuple", prev_tuple)
                .field("indexes", indexes)
                .finish(),
        }
    }
}

impl UndoAction {
    pub fn undo(self, txn_id: TransactionId) -> QuillSQLResult<()> {
        match self {
            UndoAction::Insert {
                table,
                rid,
                indexes,
            } => {
                for link in indexes.into_iter() {
                    link.index.delete(&link.key, link.rid, txn_id)?;
                }
                table.undo_insert(rid, txn_id)?;
                Ok(())
            }
            UndoAction::Delete {
                table,
                rid,
                prev_meta,
                prev_tuple,
                indexes,
            } => {
                for link in indexes {
                    link.index.insert(&link.key, link.rid, txn_id)?;
                }
                table.undo_delete(rid, prev_meta, prev_tuple)?;
                Ok(())
            }
        }
    }
}

pub struct Transaction {
    id: TransactionId,
    isolation_level: IsolationLevel,
    access_mode: TransactionAccessMode,
    state: TransactionState,
    undo_actions: Vec<UndoAction>,
    current_command_id: CommandId,
    next_command_id: CommandId,
    snapshot: Option<TransactionSnapshot>,
}

impl Transaction {
    pub fn new(
        id: TransactionId,
        isolation_level: IsolationLevel,
        access_mode: TransactionAccessMode,
    ) -> Self {
        Self {
            id,
            isolation_level,
            access_mode,
            state: TransactionState::Running,
            undo_actions: Vec::new(),
            current_command_id: INVALID_COMMAND_ID,
            next_command_id: 0,
            snapshot: None,
        }
    }

    pub fn id(&self) -> TransactionId {
        self.id
    }

    pub fn isolation_level(&self) -> IsolationLevel {
        self.isolation_level
    }

    pub fn access_mode(&self) -> TransactionAccessMode {
        self.access_mode
    }

    pub fn set_isolation_level(&mut self, isolation_level: IsolationLevel) {
        self.isolation_level = isolation_level;
        if matches!(
            isolation_level,
            IsolationLevel::ReadCommitted | IsolationLevel::ReadUncommitted
        ) {
            self.clear_snapshot();
        }
    }

    pub fn state(&self) -> TransactionState {
        self.state
    }

    pub fn begin_command(&mut self) -> CommandId {
        let cid = self.next_command_id;
        self.current_command_id = cid;
        self.next_command_id = self.next_command_id.wrapping_add(1);
        cid
    }

    pub fn current_command_id(&self) -> CommandId {
        self.current_command_id
    }

    pub(crate) fn set_state(&mut self, state: TransactionState) {
        self.state = state;
    }

    pub fn update_access_mode(&mut self, access_mode: TransactionAccessMode) {
        self.access_mode = access_mode;
    }

    pub(crate) fn push_insert_undo(
        &mut self,
        table: Arc<dyn TableHandle>,
        rid: RecordId,
        indexes: IndexUndoEntries,
    ) {
        self.undo_actions.push(UndoAction::Insert {
            table,
            rid,
            indexes: indexes
                .into_iter()
                .map(|(index, key, rid)| IndexUndoLink { index, key, rid })
                .collect(),
        });
    }

    pub(crate) fn push_update_undo(&mut self, table: Arc<dyn TableHandle>, undo: UpdateUndo) {
        self.undo_actions.push(UndoAction::Insert {
            table: table.clone(),
            rid: undo.new_rid,
            indexes: undo
                .new_keys
                .into_iter()
                .map(|(index, key, rid)| IndexUndoLink { index, key, rid })
                .collect(),
        });
        self.undo_actions.push(UndoAction::Delete {
            table: table.clone(),
            rid: undo.old_rid,
            prev_meta: undo.prev_meta,
            prev_tuple: undo.prev_tuple,
            indexes: undo
                .old_keys
                .into_iter()
                .map(|(index, key, rid)| IndexUndoLink { index, key, rid })
                .collect(),
        });
    }

    pub(crate) fn push_delete_undo(
        &mut self,
        table: Arc<dyn TableHandle>,
        rid: RecordId,
        prev_meta: TupleMeta,
        prev_tuple: Tuple,
        indexes: IndexUndoEntries,
    ) {
        self.undo_actions.push(UndoAction::Delete {
            table,
            rid,
            prev_meta,
            prev_tuple,
            indexes: indexes
                .into_iter()
                .map(|(index, key, rid)| IndexUndoLink { index, key, rid })
                .collect(),
        });
    }

    pub(crate) fn pop_undo_action(&mut self) -> Option<UndoAction> {
        self.undo_actions.pop()
    }

    pub(crate) fn clear_undo(&mut self) {
        self.undo_actions.clear();
    }

    pub fn snapshot(&self) -> Option<&TransactionSnapshot> {
        self.snapshot.as_ref()
    }

    pub fn set_snapshot(&mut self, snapshot: TransactionSnapshot) {
        self.snapshot = Some(snapshot);
    }

    pub fn clear_snapshot(&mut self) {
        self.snapshot = None;
    }
}