quill-sql 0.2.1

An educational Rust relational database (RDBMS) inspired by CMU 15445
Documentation
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use once_cell::sync::Lazy;
use std::sync::OnceLock;

use crate::buffer::BufferManager;
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::recovery::wal::codec::{decode_page_write, ResourceManagerId, WalFrame};
use crate::recovery::Lsn;
use crate::storage::disk_scheduler::DiskScheduler;

#[derive(Clone)]
pub struct RedoContext {
    pub disk_scheduler: Arc<DiskScheduler>,
    pub buffer_pool: Option<Arc<BufferManager>>,
}

#[derive(Clone)]
pub struct UndoContext {
    pub disk_scheduler: Arc<DiskScheduler>,
    pub buffer_pool: Option<Arc<BufferManager>>,
}

pub trait ResourceManager: Send + Sync {
    fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize>;
    fn undo(&self, frame: &WalFrame, ctx: &UndoContext) -> QuillSQLResult<()>;

    fn transaction_id(&self, _frame: &WalFrame) -> Option<u64> {
        None
    }
}

static REGISTRY: Lazy<RwLock<HashMap<ResourceManagerId, Arc<dyn ResourceManager>>>> =
    Lazy::new(|| RwLock::new(HashMap::new()));

pub fn register_resource_manager(id: ResourceManagerId, manager: Arc<dyn ResourceManager>) {
    let mut guard = REGISTRY
        .write()
        .expect("resource manager registry poisoned");
    guard.insert(id, manager);
}

pub fn get_resource_manager(id: ResourceManagerId) -> Option<Arc<dyn ResourceManager>> {
    let guard = REGISTRY.read().expect("resource manager registry poisoned");
    guard.get(&id).cloned()
}

#[derive(Default)]
struct PageResourceManager;

impl PageResourceManager {
    fn page_requires_redo(
        &self,
        ctx: &RedoContext,
        page_id: u32,
        record_lsn: Lsn,
    ) -> QuillSQLResult<bool> {
        if let Some(bpm) = &ctx.buffer_pool {
            match bpm.fetch_page_read(page_id) {
                Ok(guard) => Ok(guard.lsn() < record_lsn),
                Err(_) => Ok(true),
            }
        } else {
            Ok(true)
        }
    }

    fn redo_page_write(
        &self,
        ctx: &RedoContext,
        payload: crate::recovery::wal::codec::PageWritePayload,
    ) -> QuillSQLResult<()> {
        debug_assert_eq!(payload.page_image.len(), crate::buffer::PAGE_SIZE);
        let bytes = bytes::Bytes::from(payload.page_image);
        let rx = ctx.disk_scheduler.schedule_write(payload.page_id, bytes)?;
        rx.recv().map_err(|e| {
            QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
        })??;
        Ok(())
    }
}

impl ResourceManager for PageResourceManager {
    fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize> {
        if frame.info != 0 {
            return Err(QuillSQLError::Internal(format!(
                "Unknown Page info kind: {}",
                frame.info
            )));
        }
        let payload = decode_page_write(&frame.body)?;
        if !self.page_requires_redo(ctx, payload.page_id, frame.lsn)? {
            return Ok(0);
        }
        self.redo_page_write(ctx, payload)?;
        Ok(1)
    }

    fn undo(&self, _frame: &WalFrame, _ctx: &UndoContext) -> QuillSQLResult<()> {
        Ok(())
    }
}

static DEFAULT_RESOURCE_MANAGERS: OnceLock<()> = OnceLock::new();

pub fn ensure_default_resource_managers_registered() {
    DEFAULT_RESOURCE_MANAGERS.get_or_init(|| {
        register_resource_manager(
            ResourceManagerId::Page,
            Arc::new(PageResourceManager::default()),
        );
        crate::storage::heap_recovery::ensure_heap_resource_manager_registered();
        crate::storage::index::index_recovery::ensure_index_resource_manager_registered();
    });
}