quill-sql 0.2.1

An educational Rust relational database (RDBMS) inspired by CMU 15445
Documentation
#![cfg(not(target_os = "linux"))]

use crate::config::IOSchedulerConfig;
use crate::error::QuillSQLResult;
use crate::storage::disk_manager::DiskManager;
use crate::storage::disk_scheduler::{DiskRequest, RequestQueue, RequestReceiver, RequestSender};
use bytes::{Bytes, BytesMut};
use std::fs::{create_dir_all, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;

pub fn spawn_runtime(
    disk_manager: Arc<DiskManager>,
    config: IOSchedulerConfig,
) -> (RequestSender, Vec<thread::JoinHandle<()>>) {
    let queue = Arc::new(RequestQueue::new());
    let sender = RequestSender::new(queue.clone());

    let mut worker_threads = Vec::with_capacity(config.workers);
    for i in 0..config.workers {
        let dm = disk_manager.clone();
        let rx = RequestReceiver::new(queue.clone());
        let handle = thread::Builder::new()
            .name(format!("disk-scheduler-blocking-worker-{}", i))
            .spawn(move || worker_loop(rx, dm))
            .expect("Failed to spawn blocking disk scheduler worker");
        worker_threads.push(handle);
    }

    (sender, worker_threads)
}

fn worker_loop(receiver: RequestReceiver, disk_manager: Arc<DiskManager>) {
    while let Some(request) = receiver.recv() {
        match request {
            DiskRequest::ReadPage {
                page_id,
                result_sender,
            } => {
                let result = disk_manager
                    .read_page(page_id)
                    .map(|page| BytesMut::from(&page[..]));
                let _ = result_sender.send(result);
            }
            DiskRequest::ReadPages {
                page_ids,
                result_sender,
            } => {
                let mut buffers = Vec::with_capacity(page_ids.len());
                let mut error = None;
                for pid in page_ids {
                    match disk_manager.read_page(pid) {
                        Ok(page) => buffers.push(BytesMut::from(&page[..])),
                        Err(err) => {
                            error = Some(err);
                            break;
                        }
                    }
                }
                let result = match error {
                    Some(err) => Err(err),
                    None => Ok(buffers),
                };
                let _ = result_sender.send(result);
            }
            DiskRequest::WritePage {
                page_id,
                data,
                result_sender,
            } => {
                let result = disk_manager.write_page(page_id, data.as_ref());
                let _ = result_sender.send(result);
            }
            DiskRequest::WriteWal {
                path,
                offset,
                data,
                sync,
                result_sender,
            } => {
                let result = write_wal_blocking(&path, offset, &data, sync);
                let _ = result_sender.send(result);
            }
            DiskRequest::FsyncWal {
                path,
                result_sender,
            } => {
                let result = fsync_wal_blocking(&path);
                let _ = result_sender.send(result);
            }
            DiskRequest::AllocatePage { result_sender } => {
                let result = disk_manager.allocate_page();
                let _ = result_sender.send(result);
            }
            DiskRequest::DeallocatePage {
                page_id,
                result_sender,
            } => {
                let result = disk_manager.deallocate_page(page_id);
                let _ = result_sender.send(result);
            }
            DiskRequest::Shutdown => break,
        }
    }
}

fn write_wal_blocking(path: &PathBuf, offset: u64, data: &Bytes, sync: bool) -> QuillSQLResult<()> {
    if let Some(parent) = path.parent() {
        if !parent.exists() {
            create_dir_all(parent)?;
        }
    }
    let mut file = OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .open(path)?;
    file.seek(SeekFrom::Start(offset))?;
    file.write_all(data.as_ref())?;
    if sync {
        file.sync_data()?;
    }
    Ok(())
}

fn fsync_wal_blocking(path: &PathBuf) -> QuillSQLResult<()> {
    let mut file = OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .open(path)?;
    file.sync_all()?;
    Ok(())
}