quill-sql 0.2.0

A tiny yet serious SQL database in Rust with ARIES-style WAL, 2PL, and B+Tree indexes.
Documentation
use std::collections::VecDeque;

use parking_lot::Mutex;

use crate::catalog::SchemaRef;
use crate::error::QuillSQLError;
use crate::storage::page::{RecordId, TupleMeta};
use crate::storage::table_heap::TableIterator;
use crate::transaction::LockMode;
use crate::utils::table_ref::TableReference;
use crate::{
    error::QuillSQLResult,
    execution::{ExecutionContext, VolcanoExecutor},
    storage::tuple::Tuple,
};

const PREFETCH_BATCH: usize = 64;

#[derive(Debug)]
pub struct PhysicalSeqScan {
    pub table: TableReference,
    pub table_schema: SchemaRef,
    pub streaming_hint: Option<bool>,

    iterator: Mutex<Option<TableIterator>>,
    prefetch: Mutex<VecDeque<(RecordId, TupleMeta, Tuple)>>,
}

impl PhysicalSeqScan {
    pub fn new(table: TableReference, table_schema: SchemaRef) -> Self {
        PhysicalSeqScan {
            table,
            table_schema,
            streaming_hint: None,
            iterator: Mutex::new(None),
            prefetch: Mutex::new(VecDeque::new()),
        }
    }

    fn refill_buffer(&self) -> QuillSQLResult<bool> {
        let mut fetched = VecDeque::with_capacity(PREFETCH_BATCH);
        {
            let mut guard = self.iterator.lock();
            let iterator = guard.as_mut().ok_or_else(|| {
                QuillSQLError::Execution("table iterator not created".to_string())
            })?;
            for _ in 0..PREFETCH_BATCH {
                match iterator.next()? {
                    Some(entry) => fetched.push_back(entry),
                    None => break,
                }
            }
        }
        if fetched.is_empty() {
            return Ok(false);
        }
        let mut buffer = self.prefetch.lock();
        buffer.extend(fetched);
        Ok(true)
    }

    fn consume_row(
        &self,
        context: &mut ExecutionContext,
        rid: RecordId,
        meta: TupleMeta,
        tuple: Tuple,
    ) -> QuillSQLResult<Option<Tuple>> {
        context.read_visible_tuple(&self.table, rid, &meta, tuple)
    }
}

impl VolcanoExecutor for PhysicalSeqScan {
    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
        context.lock_table(self.table.clone(), LockMode::IntentionShared)?;
        let table_heap = context.table_heap(&self.table)?;
        let iter = if let Some(h) = self.streaming_hint {
            TableIterator::new_with_hint(table_heap, .., Some(h))
        } else {
            TableIterator::new(table_heap, ..)
        };
        {
            let mut guard = self.iterator.lock();
            *guard = Some(iter);
        }
        self.prefetch.lock().clear();
        Ok(())
    }

    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
        loop {
            if let Some((rid, meta, tuple)) = {
                let mut buffer = self.prefetch.lock();
                buffer.pop_front()
            } {
                if let Some(result) = self.consume_row(context, rid, meta, tuple)? {
                    return Ok(Some(result));
                }
                continue;
            }

            if !self.refill_buffer()? {
                return Ok(None);
            }
        }
    }

    fn output_schema(&self) -> SchemaRef {
        self.table_schema.clone()
    }
}

impl std::fmt::Display for PhysicalSeqScan {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "SeqScan")
    }
}