quill-sql 0.2.1

An educational Rust relational database (RDBMS) inspired by CMU 15445
Documentation
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::buffer::BufferManager;
use crate::error::QuillSQLResult;
use crate::recovery::resource_manager::{
    ensure_default_resource_managers_registered, get_resource_manager, UndoContext,
};
use crate::recovery::wal::codec::{
    decode_clr, decode_transaction, ClrPayload, ResourceManagerId, TransactionRecordKind, WalFrame,
};
use crate::recovery::wal_record::WalRecordPayload;
use crate::recovery::{Lsn, WalManager};
use crate::storage::disk_scheduler::DiskScheduler;

#[derive(Default)]
struct UndoRecord {
    next: Option<Lsn>,
    payload: Option<WalFrame>,
}

#[derive(Default)]
struct UndoIndex {
    heads: HashMap<u64, Option<Lsn>>,
    entries: HashMap<Lsn, UndoRecord>,
    active: HashSet<u64>,
}

impl UndoIndex {
    fn observe(&mut self, frame: &WalFrame) -> QuillSQLResult<()> {
        match frame.rmid {
            ResourceManagerId::Clr => {
                let clr = decode_clr(&frame.body)?;
                let next = if clr.undo_next_lsn == 0 {
                    None
                } else {
                    Some(clr.undo_next_lsn)
                };
                self.entries.insert(
                    frame.lsn,
                    UndoRecord {
                        next,
                        payload: None,
                    },
                );
                self.heads.insert(clr.txn_id, next);
                self.active.insert(clr.txn_id);
            }
            ResourceManagerId::Transaction => {
                let tx = decode_transaction(&frame.body, frame.info)?;
                match tx.marker {
                    TransactionRecordKind::Begin => {
                        self.active.insert(tx.txn_id);
                        self.heads.entry(tx.txn_id).or_insert(None);
                    }
                    TransactionRecordKind::Commit | TransactionRecordKind::Abort => {
                        self.active.remove(&tx.txn_id);
                        self.heads.insert(tx.txn_id, None);
                    }
                }
            }
            _ => {
                if let Some(manager) = get_resource_manager(frame.rmid) {
                    if let Some(txn_id) = manager.transaction_id(frame) {
                        let prev = self.head_for(txn_id);
                        self.entries.insert(
                            frame.lsn,
                            UndoRecord {
                                next: prev,
                                payload: Some(frame.clone()),
                            },
                        );
                        self.heads.insert(txn_id, Some(frame.lsn));
                        self.active.insert(txn_id);
                    }
                }
            }
        }
        Ok(())
    }

    fn head_for(&self, txn_id: u64) -> Option<Lsn> {
        self.heads.get(&txn_id).copied().flatten()
    }

    fn entry(&self, lsn: Lsn) -> Option<&UndoRecord> {
        self.entries.get(&lsn)
    }

    fn active_transactions(&self) -> Vec<u64> {
        let mut txns: Vec<u64> = self.active.iter().copied().collect();
        txns.sort_unstable();
        txns
    }
}

#[derive(Default)]
pub struct UndoOutcome {
    pub loser_transactions: Vec<u64>,
    pub max_clr_lsn: Lsn,
}

pub struct UndoExecutor {
    wal: Arc<WalManager>,
    disk_scheduler: Arc<DiskScheduler>,
    buffer_pool: Option<Arc<BufferManager>>,
    index: UndoIndex,
}

impl UndoExecutor {
    pub fn new(
        wal: Arc<WalManager>,
        disk_scheduler: Arc<DiskScheduler>,
        buffer_pool: Option<Arc<BufferManager>>,
    ) -> Self {
        ensure_default_resource_managers_registered();
        Self {
            wal,
            disk_scheduler,
            buffer_pool,
            index: UndoIndex::default(),
        }
    }

    pub fn observe(&mut self, frame: &WalFrame) -> QuillSQLResult<()> {
        self.index.observe(frame)
    }

    pub fn finalize(self) -> QuillSQLResult<UndoOutcome> {
        let losers = self.index.active_transactions();
        let mut max_clr_lsn = 0;

        for txn_id in losers.iter().copied() {
            let mut cursor = self.index.head_for(txn_id);
            while let Some(lsn) = cursor {
                let Some(entry) = self.index.entry(lsn) else {
                    break;
                };
                if let Some(rec) = &entry.payload {
                    if let Some(manager) = get_resource_manager(rec.rmid) {
                        let ctx = UndoContext {
                            disk_scheduler: self.disk_scheduler.clone(),
                            buffer_pool: self.buffer_pool.clone(),
                        };
                        manager.undo(rec, &ctx)?;
                    }
                    let undo_next = entry.next.unwrap_or(0);
                    let clr = self.wal.append_record_with(|_| {
                        WalRecordPayload::Clr(ClrPayload {
                            txn_id,
                            undone_lsn: lsn,
                            undo_next_lsn: undo_next,
                        })
                    })?;
                    if clr.end_lsn > max_clr_lsn {
                        max_clr_lsn = clr.end_lsn;
                    }
                }
                cursor = entry.next;
            }
        }

        Ok(UndoOutcome {
            loser_transactions: losers,
            max_clr_lsn,
        })
    }
}