quill-sql 0.3.1

An educational Rust relational database (RDBMS) inspired by CMU 15445
Documentation
use crate::catalog::{SchemaRef, EMPTY_SCHEMA_REF};
use crate::error::QuillSQLError;
use crate::expression::{ColumnExpr, Expr};
use crate::plan::logical_plan::OrderByExpr;
use crate::storage::tuple::Tuple;
use crate::utils::table_ref::TableReference;
use crate::{
    error::QuillSQLResult,
    execution::{ExecutionContext, VolcanoExecutor},
};
use std::sync::Arc;

#[derive(Debug, derive_new::new)]
pub struct PhysicalCreateIndex {
    pub name: String,
    pub table: TableReference,
    pub table_schema: SchemaRef,
    pub columns: Vec<OrderByExpr>,
}

impl VolcanoExecutor for PhysicalCreateIndex {
    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
        let mut key_indices = vec![];
        for col in self.columns.iter() {
            match col.expr.as_ref() {
                Expr::Column(ColumnExpr { name, .. }) => {
                    key_indices.push(self.table_schema.index_of(None, name)?);
                }
                _ => {
                    return Err(QuillSQLError::Execution(format!(
                        "The expr should be column instead of {}",
                        col.expr
                    )))
                }
            }
        }
        let key_schema = Arc::new(self.table_schema.project(&key_indices)?);
        context
            .catalog
            .create_index(self.name.clone(), &self.table, key_schema)?;
        backfill_index(context, &self.table, &self.name)?;
        Ok(None)
    }
    fn output_schema(&self) -> SchemaRef {
        EMPTY_SCHEMA_REF.clone()
    }
}

fn backfill_index(
    context: &mut ExecutionContext,
    table: &TableReference,
    index_name: &str,
) -> QuillSQLResult<()> {
    let binding = context.table(table)?;
    let index = binding
        .indexes()
        .iter()
        .find(|index| index.name() == index_name)
        .cloned()
        .ok_or_else(|| QuillSQLError::Execution(format!("index {index_name} not found")))?;
    let mut stream = binding.scan()?;
    while let Some((rid, meta, tuple)) = stream.next()? {
        if meta.is_deleted || !context.txn_ctx().is_visible(&meta) {
            continue;
        }
        if let Ok(key) = tuple.project_with_schema(index.key_schema()) {
            index.insert(&key, rid, context.txn_ctx().txn_id())?;
        }
    }
    Ok(())
}

impl std::fmt::Display for PhysicalCreateIndex {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "CreateIndex: {}", self.name)
    }
}