quill-sql 0.2.1

An educational relational database management system (RDBMS) written in Rust, inspired by CMU 15-445
Documentation
use std::fs;
use std::path::Path;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use tempfile::TempDir;

use crate::buffer::{AtomicPageId, BufferManager};
use crate::catalog::{Column, DataType, Schema, SchemaRef};
use crate::config::WalConfig;
use crate::error::QuillSQLResult;
use crate::recovery::control_file::ControlFileManager;
use crate::recovery::{RecoveryManager, WalManager};
use crate::storage::disk_manager::DiskManager;
use crate::storage::disk_scheduler::DiskScheduler;
use crate::storage::heap::table_heap::TableHeap;
use crate::storage::heap::MvccHeap;
use crate::storage::tuple::Tuple;
use crate::transaction::{CommandId, TransactionId};
use crate::utils::scalar::ScalarValue;

/// Assembles the `WalConfig` shared by the tests in this file.
///
/// Only the WAL directory and the segment size vary between callers; every
/// other field is pinned to a fixed value here.
fn wal_config(dir: &Path, segment_size: u64) -> WalConfig {
    let directory = dir.to_owned();
    WalConfig {
        // Caller-controlled settings.
        directory,
        segment_size,
        // Both interval knobs are left unset and both sync-style flags are off.
        // NOTE(review): presumably this keeps flushing under explicit test
        // control — confirm against `WalConfig`'s documentation.
        writer_interval_ms: None,
        checkpoint_interval_ms: None,
        sync_on_flush: false,
        synchronous_commit: false,
        persist_control_file_on_flush: false,
        // Fixed sizing parameters.
        buffer_capacity: 1_024,
        flush_coalesce_bytes: 64 * 1_024,
        retain_segments: 8,
    }
}

/// Wires up the storage stack used by the tests: a disk manager over
/// `db_path`, a disk scheduler, a buffer manager, and a WAL manager whose
/// control file lives in `cfg.directory`.
///
/// The WAL manager is registered on the buffer manager before returning.
/// Returns `(buffer manager, WAL manager, disk scheduler)`.
///
/// # Panics
///
/// Panics if the WAL directory cannot be created or if any component fails
/// to initialise.
fn build_runtime(
    db_path: &Path,
    cfg: &WalConfig,
) -> (Arc<BufferManager>, Arc<WalManager>, Arc<DiskScheduler>) {
    let wal_dir = &cfg.directory;
    fs::create_dir_all(wal_dir).expect("wal dir");

    // Storage layer: disk manager -> scheduler -> buffer manager.
    let disk = DiskManager::try_new(db_path).expect("disk manager");
    let scheduler = Arc::new(DiskScheduler::new(Arc::new(disk)));
    // NOTE(review): 512 is presumably the pool size in pages — confirm
    // against `BufferManager::new`.
    let pool = Arc::new(BufferManager::new(512, scheduler.clone()));

    // WAL layer: load (or create) the control file, then start the manager
    // from the state it reports.
    let (control, bootstrap) =
        ControlFileManager::load_or_init(wal_dir, cfg.segment_size).expect("control file");
    let wal = WalManager::new_with_scheduler(
        cfg.clone(),
        Some(bootstrap),
        Some(Arc::new(control)),
        scheduler.clone(),
    )
    .expect("wal manager");
    let wal = Arc::new(wal);

    pool.set_wal_manager(wal.clone());
    (pool, wal, scheduler)
}

/// Inserts `rows` two-column tuples `(i, i * 2)` for `i` in `0..rows` into
/// `heap` through an `MvccHeap` wrapper, using the default transaction and
/// command ids.
///
/// # Panics
///
/// Panics if any insert fails.
fn insert_rows(heap: Arc<TableHeap>, schema: &SchemaRef, rows: usize) {
    let writer = MvccHeap::new(heap);
    (0..rows).for_each(|row| {
        let id = ScalarValue::Int32(Some(row as i32));
        let val = ScalarValue::Int32(Some((row * 2) as i32));
        let record = Tuple::new(schema.clone(), vec![id, val]);
        // The value returned by a successful insert is not needed here.
        let _ = writer
            .insert(&record, TransactionId::default(), CommandId::default())
            .expect("insert tuple");
    });
}

/// Counts the records reachable from `heap` by starting at `get_first_rid`
/// and following `get_next_rid` until it yields `None`.
///
/// # Errors
///
/// Propagates any error returned by the heap while looking up a record id.
fn count_rows(heap: &TableHeap) -> QuillSQLResult<usize> {
    let mut cursor = heap.get_first_rid()?;
    let mut seen = 0usize;
    loop {
        match cursor {
            None => return Ok(seen),
            Some(rid) => {
                seen += 1;
                cursor = heap.get_next_rid(rid)?;
            }
        }
    }
}

/// Writes 128 rows, flushes the heap pages and the WAL up to the highest
/// assigned LSN, drops the runtime, then rebuilds it over the same files,
/// replays the WAL, and checks that all 128 rows are still reachable.
#[test]
fn wal_force_flush_recovers_heap() {
    let workspace = TempDir::new().expect("tempdir");
    let root = workspace.path();
    let db_path = root.join("heap_force.db");
    let cfg = wal_config(&root.join("wal_force"), 16 * 1024);

    let columns = vec![
        Column::new("id", DataType::Int32, false),
        Column::new("val", DataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(columns));

    // Phase 1: populate the heap. The runtime is scoped to this block so it is
    // dropped before the second runtime is built over the same files.
    let (head_page, tail_page) = {
        let (pool, wal, _scheduler) = build_runtime(&db_path, &cfg);
        let heap = Arc::new(TableHeap::try_new(schema.clone(), pool.clone()).expect("heap"));
        insert_rows(heap.clone(), &schema, 128);
        pool.flush_all_pages().expect("flush heap pages");
        wal.flush_until(wal.max_assigned_lsn())
            .expect("force wal flush");
        let head = heap.first_page_id.load(Ordering::SeqCst);
        let tail = heap.last_page_id.load(Ordering::SeqCst);
        (head, tail)
    };

    // Phase 2: fresh runtime over the same database file and WAL directory.
    let (pool, wal, scheduler) = build_runtime(&db_path, &cfg);
    let outcome = RecoveryManager::new(wal.clone(), scheduler.clone())
        .with_buffer_pool(pool.clone())
        .replay()
        .expect("replay");
    assert!(outcome.redo_count > 0);

    // Rebuild a heap handle from the page ids captured in phase 1.
    let reopened = TableHeap {
        schema: schema.clone(),
        buffer_pool: pool.clone(),
        first_page_id: AtomicPageId::new(head_page),
        last_page_id: AtomicPageId::new(tail_page),
    };
    let recovered = count_rows(&reopened).expect("row count");
    assert_eq!(recovered, 128);
}

/// Writes 512 rows with a 4 KiB WAL segment size, verifies that more than one
/// `wal_*.log` file exists in the WAL directory, then rebuilds the runtime,
/// replays the WAL, and checks that all 512 rows are still reachable.
#[test]
fn wal_redo_handles_segment_rotation() {
    let workspace = TempDir::new().expect("tempdir");
    let root = workspace.path();
    let db_path = root.join("heap_rotate.db");
    let cfg = wal_config(&root.join("wal_rotate"), 4 * 1024);

    let columns = vec![
        Column::new("id", DataType::Int32, false),
        Column::new("val", DataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(columns));

    // Phase 1: populate the heap. The runtime is scoped to this block so it is
    // dropped before the second runtime is built over the same files.
    let (head_page, tail_page) = {
        let (pool, wal, _scheduler) = build_runtime(&db_path, &cfg);
        let heap = Arc::new(TableHeap::try_new(schema.clone(), pool.clone()).expect("heap"));
        insert_rows(heap.clone(), &schema, 512);
        pool.flush_all_pages().expect("flush heap pages");
        wal.flush_until(wal.max_assigned_lsn()).expect("wal flush");
        let head = heap.first_page_id.load(Ordering::SeqCst);
        let tail = heap.last_page_id.load(Ordering::SeqCst);
        (head, tail)
    };

    // Collect every readable directory entry named `wal_*.log`.
    let segments: Vec<_> = fs::read_dir(&cfg.directory)
        .expect("wal dir")
        .filter_map(|entry| entry.ok())
        .filter(|entry| {
            let raw_name = entry.file_name();
            let name = raw_name.to_string_lossy();
            name.starts_with("wal_") && name.ends_with(".log")
        })
        .map(|entry| entry.path())
        .collect();
    let segment_count = segments.len();
    assert!(
        segment_count > 1,
        "expected multiple WAL segments, found {}",
        segment_count
    );

    // Phase 2: fresh runtime over the same database file and WAL directory.
    let (pool, wal, scheduler) = build_runtime(&db_path, &cfg);
    let outcome = RecoveryManager::new(wal.clone(), scheduler.clone())
        .with_buffer_pool(pool.clone())
        .replay()
        .expect("replay");
    assert!(outcome.redo_count > 0);

    // Rebuild a heap handle from the page ids captured in phase 1.
    let reopened = TableHeap {
        schema,
        buffer_pool: pool.clone(),
        first_page_id: AtomicPageId::new(head_page),
        last_page_id: AtomicPageId::new(tail_page),
    };
    let recovered = count_rows(&reopened).expect("row count");
    assert_eq!(recovered, 512);
}