// quill-sql 0.2.0
//
// A tiny yet serious SQL database in Rust with ARIES-style WAL, 2PL, and B+Tree indexes.
// Documentation
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use once_cell::sync::Lazy;
use std::sync::OnceLock;

use crate::buffer::BufferManager;
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::recovery::wal::codec::{
    decode_page_delta, decode_page_write, ResourceManagerId, WalFrame,
};
use crate::recovery::Lsn;
use crate::storage::disk_scheduler::DiskScheduler;

/// Shared handles passed to [`ResourceManager::redo`] while WAL records are replayed.
#[derive(Clone)]
pub struct RedoContext {
    /// Scheduler used to issue the page reads and writes that redo performs.
    pub disk_scheduler: Arc<DiskScheduler>,
    /// Optional buffer pool. When present, the page resource manager in this file
    /// fetches the page through it to compare the page LSN with the record LSN.
    pub buffer_pool: Option<Arc<BufferManager>>,
}

/// Shared handles passed to [`ResourceManager::undo`].
#[derive(Clone)]
pub struct UndoContext {
    /// Disk scheduler handle made available to undo implementations.
    pub disk_scheduler: Arc<DiskScheduler>,
    /// Optional buffer pool handle made available to undo implementations.
    pub buffer_pool: Option<Arc<BufferManager>>,
}

/// Recovery hooks for one class of WAL records.
///
/// Implementations are stored as `Arc<dyn ResourceManager>` in a global registry,
/// hence the `Send + Sync` bound.
pub trait ResourceManager: Send + Sync {
    /// Re-applies the change described by `frame`.
    ///
    /// Returns a count on success; the page implementation in this file returns `1`
    /// when the record was applied and `0` when it was skipped.
    fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize>;

    /// Reverts the change described by `frame`.
    fn undo(&self, frame: &WalFrame, ctx: &UndoContext) -> QuillSQLResult<()>;

    /// Returns the transaction id associated with `frame`, if any.
    ///
    /// The default implementation returns `None`.
    fn transaction_id(&self, _frame: &WalFrame) -> Option<u64> {
        None
    }
}

/// Process-wide map from resource manager id to its registered implementation.
///
/// Lazily created as an empty map on first access and guarded by an `RwLock`.
static REGISTRY: Lazy<RwLock<HashMap<ResourceManagerId, Arc<dyn ResourceManager>>>> =
    Lazy::new(|| RwLock::new(HashMap::new()));

/// Registers `manager` as the handler for records tagged with `id`.
///
/// Any manager previously stored under the same id is replaced.
///
/// # Panics
///
/// Panics if the registry lock has been poisoned.
pub fn register_resource_manager(id: ResourceManagerId, manager: Arc<dyn ResourceManager>) {
    REGISTRY
        .write()
        .expect("resource manager registry poisoned")
        .insert(id, manager);
}

/// Looks up the manager registered for `id`.
///
/// Returns a new `Arc` handle to the manager, or `None` when nothing is registered
/// under that id.
///
/// # Panics
///
/// Panics if the registry lock has been poisoned.
pub fn get_resource_manager(id: ResourceManagerId) -> Option<Arc<dyn ResourceManager>> {
    let registry = REGISTRY.read().expect("resource manager registry poisoned");
    let found = registry.get(&id);
    found.map(Arc::clone)
}

/// Stateless resource manager that replays page-level WAL records
/// (full page writes and page deltas).
#[derive(Default)]
struct PageResourceManager;

impl PageResourceManager {
    /// Decides whether the record with `record_lsn` still has to be applied to `page_id`.
    ///
    /// When a buffer pool is available the page is fetched through it and redo is
    /// required only if the page's LSN is strictly lower than `record_lsn`. Without a
    /// buffer pool, or when the fetch fails, the answer is `true` so the record is
    /// replayed rather than skipped.
    fn page_requires_redo(
        &self,
        ctx: &RedoContext,
        page_id: u32,
        record_lsn: Lsn,
    ) -> QuillSQLResult<bool> {
        let Some(bpm) = &ctx.buffer_pool else {
            return Ok(true);
        };
        match bpm.fetch_page_read(page_id) {
            Ok(guard) => Ok(guard.lsn() < record_lsn),
            // Deliberately best-effort: an unreadable page is treated as needing redo.
            Err(_) => Ok(true),
        }
    }

    /// Validates a delta's target span and returns it as a byte range within a page.
    ///
    /// # Errors
    ///
    /// Returns `QuillSQLError::Internal` when `offset` is at or past the end of the
    /// page, or when `offset + len` overflows or extends past the end of the page.
    fn delta_range(offset: usize, len: usize) -> QuillSQLResult<std::ops::Range<usize>> {
        let page_size = crate::buffer::PAGE_SIZE;
        if offset >= page_size {
            return Err(QuillSQLError::Internal(format!(
                "PageDelta start out of bounds: offset={} page_size={}",
                offset, page_size
            )));
        }
        match offset.checked_add(len) {
            Some(end) if end <= page_size => Ok(offset..end),
            _ => Err(QuillSQLError::Internal(format!(
                "PageDelta out of bounds: offset={} len={} page_size={}",
                offset, len, page_size
            ))),
        }
    }

    /// Replays a full page image by writing it to disk through the disk scheduler.
    ///
    /// # Errors
    ///
    /// Returns `QuillSQLError::Internal` when the image is not exactly one page long
    /// or when the write completion cannot be received, and propagates any error
    /// reported by the disk scheduler.
    fn redo_page_write(
        &self,
        ctx: &RedoContext,
        payload: crate::recovery::wal::codec::PageWritePayload,
    ) -> QuillSQLResult<()> {
        // Checked at runtime (not only in debug builds) so a malformed record is
        // rejected instead of being written out as a short or oversized page.
        let image_len = payload.page_image.len();
        if image_len != crate::buffer::PAGE_SIZE {
            return Err(QuillSQLError::Internal(format!(
                "Unexpected page image size {} while applying page write",
                image_len
            )));
        }
        let bytes = bytes::Bytes::from(payload.page_image);
        let rx = ctx.disk_scheduler.schedule_write(payload.page_id, bytes)?;
        rx.recv().map_err(|e| {
            QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
        })??;
        Ok(())
    }

    /// Replays a page delta: reads the page, overwrites the bytes starting at
    /// `payload.offset` with `payload.data`, and writes the page back.
    ///
    /// # Errors
    ///
    /// Returns `QuillSQLError::Internal` when the delta does not fit inside a page,
    /// when the page read back is not exactly one page long, or when a read/write
    /// completion cannot be received; propagates any error reported by the disk
    /// scheduler.
    fn redo_page_delta(
        &self,
        ctx: &RedoContext,
        payload: crate::recovery::wal::codec::PageDeltaPayload,
    ) -> QuillSQLResult<()> {
        // Validate the record before touching the disk so bad deltas fail without I/O.
        let target = Self::delta_range(payload.offset as usize, payload.data.len())?;

        let rx = ctx.disk_scheduler.schedule_read(payload.page_id)?;
        let mut page: bytes::BytesMut = rx.recv().map_err(|e| {
            QuillSQLError::Internal(format!("WAL recovery read recv failed: {}", e))
        })??;
        if page.len() != crate::buffer::PAGE_SIZE {
            return Err(QuillSQLError::Internal(format!(
                "Unexpected page size {} while applying delta",
                page.len()
            )));
        }

        // Patch the read buffer in place and hand it back without an extra copy.
        page[target].copy_from_slice(&payload.data);
        let rxw = ctx
            .disk_scheduler
            .schedule_write(payload.page_id, page.freeze())?;
        rxw.recv().map_err(|e| {
            QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
        })??;
        Ok(())
    }
}

impl ResourceManager for PageResourceManager {
    /// Replays a page record, dispatching on `frame.info`:
    /// `0` is a full page write and `1` is a page delta.
    ///
    /// Returns `Ok(1)` when the record was applied and `Ok(0)` when the page did not
    /// require redo. Any other `info` value yields `QuillSQLError::Internal`.
    fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize> {
        match frame.info {
            0 => {
                let write = decode_page_write(&frame.body)?;
                if self.page_requires_redo(ctx, write.page_id, frame.lsn)? {
                    self.redo_page_write(ctx, write)?;
                    Ok(1)
                } else {
                    Ok(0)
                }
            }
            1 => {
                let delta = decode_page_delta(&frame.body)?;
                if self.page_requires_redo(ctx, delta.page_id, frame.lsn)? {
                    self.redo_page_delta(ctx, delta)?;
                    Ok(1)
                } else {
                    Ok(0)
                }
            }
            unknown => {
                let message = format!("Unknown Page info kind: {}", unknown);
                Err(QuillSQLError::Internal(message))
            }
        }
    }

    /// Undo is a no-op for page records: this always returns `Ok(())`.
    fn undo(&self, _frame: &WalFrame, _ctx: &UndoContext) -> QuillSQLResult<()> {
        Ok(())
    }
}

/// One-time guard ensuring the built-in resource managers are registered only once.
static DEFAULT_RESOURCE_MANAGERS: OnceLock<()> = OnceLock::new();

pub fn ensure_default_resource_managers_registered() {
    DEFAULT_RESOURCE_MANAGERS.get_or_init(|| {
        register_resource_manager(
            ResourceManagerId::Page,
            Arc::new(PageResourceManager::default()),
        );
        crate::storage::heap_recovery::ensure_heap_resource_manager_registered();
    });
}