quill-sql 0.2.0

A tiny yet serious SQL database in Rust with ARIES-style WAL, 2PL, and B+Tree indexes.
Documentation
use std::collections::VecDeque;
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;

use parking_lot::Mutex;

use crate::catalog::SchemaRef;
use crate::error::QuillSQLError;
use crate::execution::{ExecutionContext, VolcanoExecutor};
use crate::storage::index::btree_index::TreeIndexIterator;
use crate::storage::page::{RecordId, TupleMeta};
use crate::storage::table_heap::TableHeap;
use crate::transaction::{IsolationLevel, LockMode};
use crate::utils::table_ref::TableReference;
use crate::{error::QuillSQLResult, storage::tuple::Tuple};

const INDEX_PREFETCH_BATCH: usize = 64;
const INVISIBLE_THRESHOLD: usize = 2048;

#[derive(Debug)]
pub struct PhysicalIndexScan {
    table_ref: TableReference,
    index_name: String,
    table_schema: SchemaRef,
    start_bound: Bound<Tuple>,
    end_bound: Bound<Tuple>,
    iterator: Mutex<Option<TreeIndexIterator>>,
    table_heap: Mutex<Option<Arc<TableHeap>>>,
    buffer: Mutex<VecDeque<(RecordId, TupleMeta, Tuple)>>,
    invisible_hits: Mutex<usize>,
}

impl PhysicalIndexScan {
    pub fn new<R: RangeBounds<Tuple>>(
        table_ref: TableReference,
        index_name: String,
        table_schema: SchemaRef,
        range: R,
    ) -> Self {
        Self {
            table_ref,
            index_name,
            table_schema,
            start_bound: range.start_bound().cloned(),
            end_bound: range.end_bound().cloned(),
            iterator: Mutex::new(None),
            table_heap: Mutex::new(None),
            buffer: Mutex::new(VecDeque::new()),
            invisible_hits: Mutex::new(0),
        }
    }

    fn refill_buffer(&self) -> QuillSQLResult<bool> {
        let table_heap = {
            let guard = self.table_heap.lock();
            guard
                .clone()
                .ok_or_else(|| QuillSQLError::Execution("table heap not initialized".to_string()))?
        };

        let mut fetched = VecDeque::with_capacity(INDEX_PREFETCH_BATCH);
        {
            let mut iter_guard = self.iterator.lock();
            let iterator = iter_guard.as_mut().ok_or_else(|| {
                QuillSQLError::Execution("index iterator not created".to_string())
            })?;
            for _ in 0..INDEX_PREFETCH_BATCH {
                match iterator.next()? {
                    Some(rid) => {
                        let (meta, tuple) = table_heap.full_tuple(rid)?;
                        fetched.push_back((rid, meta, tuple));
                    }
                    None => break,
                }
            }
        }

        if fetched.is_empty() {
            return Ok(false);
        }

        let mut buffer = self.buffer.lock();
        buffer.extend(fetched);
        Ok(true)
    }

    fn handle_invisible(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
        let mut cnt = self.invisible_hits.lock();
        *cnt += 1;
        if *cnt >= INVISIBLE_THRESHOLD {
            *cnt = 0;
            if let Some(index_arc) = context.catalog.index(&self.table_ref, &self.index_name)? {
                index_arc.note_potential_garbage(INVISIBLE_THRESHOLD);
            }
        }
        Ok(())
    }

    fn consume_row(
        &self,
        context: &mut ExecutionContext,
        rid: RecordId,
        meta: TupleMeta,
        tuple: Tuple,
    ) -> QuillSQLResult<Option<Tuple>> {
        context.read_visible_tuple(&self.table_ref, rid, &meta, tuple)
    }
}

impl VolcanoExecutor for PhysicalIndexScan {
    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
        if matches!(
            context.txn().isolation_level(),
            IsolationLevel::ReadCommitted
                | IsolationLevel::RepeatableRead
                | IsolationLevel::Serializable
        ) {
            context.lock_table(self.table_ref.clone(), LockMode::IntentionShared)?;
        }

        let table_heap = context.table_heap(&self.table_ref)?;
        let index = context
            .catalog
            .index(&self.table_ref, &self.index_name)?
            .unwrap();

        {
            let mut iter_guard = self.iterator.lock();
            *iter_guard = Some(TreeIndexIterator::new(
                index,
                (self.start_bound.clone(), self.end_bound.clone()),
            ));
        }

        {
            let mut heap_guard = self.table_heap.lock();
            *heap_guard = Some(table_heap);
        }

        self.buffer.lock().clear();
        *self.invisible_hits.lock() = 0;

        Ok(())
    }

    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
        loop {
            if let Some((rid, meta, tuple)) = {
                let mut buffer = self.buffer.lock();
                buffer.pop_front()
            } {
                if meta.is_deleted {
                    self.handle_invisible(context)?;
                    continue;
                }
                if let Some(result) = self.consume_row(context, rid, meta, tuple)? {
                    return Ok(Some(result));
                }
                continue;
            }

            if !self.refill_buffer()? {
                return Ok(None);
            }
        }
    }

    fn output_schema(&self) -> SchemaRef {
        self.table_schema.clone()
    }
}

impl std::fmt::Display for PhysicalIndexScan {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "IndexScan: {}", self.index_name)
    }
}