quill-sql 0.3.1

An educational Rust relational database (RDBMS) inspired by CMU 15445
Documentation
use std::collections::HashMap;
use std::sync::{Arc, LazyLock};

use crate::catalog::{
    Catalog, CatalogSchema, CatalogTable, Column, DataType, Schema, SchemaRef,
    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use crate::database::Database;
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::storage::engine::TableHandle;
use crate::storage::holt::HoltTableHandle;
use crate::storage::tuple::Tuple;
use crate::utils::scalar::ScalarValue;
use crate::utils::table_ref::TableReference;

pub static INFORMATION_SCHEMA_NAME: &str = "information_schema";
pub static INFORMATION_SCHEMA_SCHEMAS: &str = "schemas";
pub static INFORMATION_SCHEMA_TABLES: &str = "tables";
pub static INFORMATION_SCHEMA_COLUMNS: &str = "columns";
pub static INFORMATION_SCHEMA_INDEXES: &str = "indexes";

pub static SCHEMAS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(Schema::new(vec![
        Column::new("catalog", DataType::Varchar(None), false),
        Column::new("schema", DataType::Varchar(None), false),
    ]))
});

pub static TABLES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(Schema::new(vec![
        Column::new("table_catalog", DataType::Varchar(None), false),
        Column::new("table_schema", DataType::Varchar(None), false),
        Column::new("table_name", DataType::Varchar(None), false),
        Column::new("first_page_id", DataType::UInt32, false),
    ]))
});

pub static COLUMNS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(Schema::new(vec![
        Column::new("table_catalog", DataType::Varchar(None), false),
        Column::new("table_schema", DataType::Varchar(None), false),
        Column::new("table_name", DataType::Varchar(None), false),
        Column::new("column_name", DataType::Varchar(None), false),
        Column::new("data_type", DataType::Varchar(None), false),
        Column::new("nullable", DataType::Boolean, false),
        Column::new("default", DataType::Varchar(None), false),
    ]))
});

pub static INDEXES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(Schema::new(vec![
        Column::new("table_catalog", DataType::Varchar(None), false),
        Column::new("table_schema", DataType::Varchar(None), false),
        Column::new("table_name", DataType::Varchar(None), false),
        Column::new("index_name", DataType::Varchar(None), false),
        Column::new("key_schema", DataType::Varchar(None), false),
        Column::new("internal_max_size", DataType::UInt32, false),
        Column::new("leaf_max_size", DataType::UInt32, false),
        Column::new("header_page_id", DataType::UInt32, false),
    ]))
});

pub fn load_catalog_data(db: &mut Database) -> QuillSQLResult<()> {
    load_information_schema(&mut db.catalog)?;
    load_schemas(db)?;
    create_default_schema_if_not_exists(&mut db.catalog)?;
    load_user_tables(db)?;
    load_descriptor_tables(db)?;
    load_user_indexes(db)?;
    load_descriptor_indexes(db)?;
    db.catalog.refresh_information_schema_projection()?;
    Ok(())
}

fn create_default_schema_if_not_exists(catalog: &mut Catalog) -> QuillSQLResult<()> {
    if !catalog.schemas.contains_key(DEFAULT_SCHEMA_NAME) {
        catalog.create_schema(DEFAULT_SCHEMA_NAME)?;
    }
    Ok(())
}

fn load_information_schema(catalog: &mut Catalog) -> QuillSQLResult<()> {
    let holt = catalog.holt_store.clone();
    let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME);

    for (table_name, schema) in [
        (INFORMATION_SCHEMA_SCHEMAS, SCHEMAS_SCHEMA.clone()),
        (INFORMATION_SCHEMA_TABLES, TABLES_SCHEMA.clone()),
        (INFORMATION_SCHEMA_COLUMNS, COLUMNS_SCHEMA.clone()),
        (INFORMATION_SCHEMA_INDEXES, INDEXES_SCHEMA.clone()),
    ] {
        let table_ref = TableReference::Full {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: INFORMATION_SCHEMA_NAME.to_string(),
            table: table_name.to_string(),
        };
        let table_id = match holt.table_descriptor(&table_ref)? {
            Some(table_id) => table_id,
            None => holt.create_table_descriptor(&table_ref, schema.as_ref())?,
        };
        information_schema.tables.insert(
            table_name.to_string(),
            CatalogTable::new(table_name, schema, table_id),
        );
    }

    catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema);
    Ok(())
}

fn load_schemas(db: &mut Database) -> QuillSQLResult<()> {
    for tuple in information_schema_rows(db, INFORMATION_SCHEMA_SCHEMAS)? {
        let ScalarValue::Varchar(Some(schema_name)) = tuple.value(1)? else {
            return Err(QuillSQLError::Internal(
                "invalid schema value in information_schema.schemas".to_string(),
            ));
        };
        db.catalog
            .load_schema(schema_name.clone(), CatalogSchema::new(schema_name));
    }
    Ok(())
}

fn load_user_tables(db: &mut Database) -> QuillSQLResult<()> {
    let mut column_map: HashMap<(String, String, String), Vec<Column>> = HashMap::new();
    for tuple in information_schema_rows(db, INFORMATION_SCHEMA_COLUMNS)? {
        let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
            return Err(QuillSQLError::Internal(
                "invalid catalog in information_schema.columns".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
            return Err(QuillSQLError::Internal(
                "invalid schema in information_schema.columns".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
            return Err(QuillSQLError::Internal(
                "invalid table in information_schema.columns".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(column_name)) = tuple.value(3)? else {
            return Err(QuillSQLError::Internal(
                "invalid column name in information_schema.columns".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(data_type_str)) = tuple.value(4)? else {
            return Err(QuillSQLError::Internal(
                "invalid data type in information_schema.columns".to_string(),
            ));
        };
        let ScalarValue::Boolean(Some(nullable)) = tuple.value(5)? else {
            return Err(QuillSQLError::Internal(
                "invalid nullable flag in information_schema.columns".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(default)) = tuple.value(6)? else {
            return Err(QuillSQLError::Internal(
                "invalid default in information_schema.columns".to_string(),
            ));
        };

        let data_type: DataType = data_type_str.as_str().try_into()?;
        let default_value = ScalarValue::from_string(default, data_type)?;
        column_map
            .entry((catalog.clone(), schema.clone(), table.clone()))
            .or_default()
            .push(
                Column::new(column_name.clone(), data_type, *nullable).with_default(default_value),
            );
    }

    for tuple in information_schema_rows(db, INFORMATION_SCHEMA_TABLES)? {
        let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
            return Err(QuillSQLError::Internal(
                "invalid catalog in information_schema.tables".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
            return Err(QuillSQLError::Internal(
                "invalid schema in information_schema.tables".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
            return Err(QuillSQLError::Internal(
                "invalid table name in information_schema.tables".to_string(),
            ));
        };
        let table_ref = TableReference::Full {
            catalog: catalog.to_string(),
            schema: schema.to_string(),
            table: table.to_string(),
        };
        let Some(table_id) = db.catalog.holt_store.table_descriptor(&table_ref)? else {
            continue;
        };
        let columns = column_map
            .remove(&(catalog.clone(), schema.clone(), table.clone()))
            .unwrap_or_default();
        db.catalog.load_table(
            table_ref,
            table.clone(),
            Arc::new(Schema::new(columns)),
            table_id,
        )?;
    }
    Ok(())
}

fn load_user_indexes(db: &mut Database) -> QuillSQLResult<()> {
    for tuple in information_schema_rows(db, INFORMATION_SCHEMA_INDEXES)? {
        let ScalarValue::Varchar(Some(catalog_name)) = tuple.value(0)? else {
            return Err(QuillSQLError::Internal(
                "invalid catalog in information_schema.indexes".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(table_schema_name)) = tuple.value(1)? else {
            return Err(QuillSQLError::Internal(
                "invalid schema in information_schema.indexes".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(table_name)) = tuple.value(2)? else {
            return Err(QuillSQLError::Internal(
                "invalid table in information_schema.indexes".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(index_name)) = tuple.value(3)? else {
            return Err(QuillSQLError::Internal(
                "invalid index name in information_schema.indexes".to_string(),
            ));
        };
        let ScalarValue::Varchar(Some(key_schema_str)) = tuple.value(4)? else {
            return Err(QuillSQLError::Internal(
                "invalid key schema in information_schema.indexes".to_string(),
            ));
        };

        let table_ref = TableReference::Full {
            catalog: catalog_name.clone(),
            schema: table_schema_name.clone(),
            table: table_name.clone(),
        };
        let Some(index_id) = db
            .catalog
            .holt_store
            .index_descriptor(&table_ref, index_name)?
        else {
            continue;
        };
        let table_schema = db.catalog.table_schema(&table_ref)?;
        let key_schema = Arc::new(parse_key_schema_from_varchar(
            key_schema_str.as_str(),
            table_schema,
        )?);
        db.catalog
            .load_index(table_ref, index_name, key_schema, index_id)?;
    }
    Ok(())
}

fn load_descriptor_tables(db: &mut Database) -> QuillSQLResult<()> {
    for descriptor in db.catalog.holt_store.table_descriptors()? {
        if descriptor.table_ref.schema() == Some(INFORMATION_SCHEMA_NAME) {
            continue;
        }
        if db.catalog.try_table_schema(&descriptor.table_ref).is_some() {
            continue;
        }
        let schema_name = descriptor
            .table_ref
            .schema()
            .unwrap_or(DEFAULT_SCHEMA_NAME)
            .to_string();
        if !db.catalog.schemas.contains_key(&schema_name) {
            db.catalog
                .load_schema(schema_name.clone(), CatalogSchema::new(schema_name));
        }
        db.catalog.load_table(
            descriptor.table_ref,
            descriptor.table_name,
            descriptor.schema,
            descriptor.table_id,
        )?;
    }
    Ok(())
}

fn load_descriptor_indexes(db: &mut Database) -> QuillSQLResult<()> {
    for descriptor in db.catalog.holt_store.index_descriptors()? {
        if descriptor.table_ref.schema() == Some(INFORMATION_SCHEMA_NAME) {
            continue;
        }
        if db.catalog.try_table_schema(&descriptor.table_ref).is_none() {
            continue;
        }
        if db
            .catalog
            .table_indexes(&descriptor.table_ref)
            .map(|indexes| {
                indexes
                    .iter()
                    .any(|index| index.name == descriptor.index_name)
            })
            .unwrap_or(false)
        {
            continue;
        }
        db.catalog.load_index(
            descriptor.table_ref,
            descriptor.index_name,
            descriptor.key_schema,
            descriptor.index_id,
        )?;
    }
    Ok(())
}

fn information_schema_rows(db: &Database, table_name: &str) -> QuillSQLResult<Vec<Tuple>> {
    let information_schema = db
        .catalog
        .schemas
        .get(INFORMATION_SCHEMA_NAME)
        .ok_or_else(|| QuillSQLError::Internal("information_schema not initialized".to_string()))?;
    let table = information_schema.tables.get(table_name).ok_or_else(|| {
        QuillSQLError::Internal(format!("information_schema.{table_name} missing"))
    })?;
    let table_ref = TableReference::Full {
        catalog: DEFAULT_CATALOG_NAME.to_string(),
        schema: INFORMATION_SCHEMA_NAME.to_string(),
        table: table_name.to_string(),
    };
    let handle = HoltTableHandle::new(
        table_ref,
        table.schema.clone(),
        table.table_id,
        db.catalog.holt_store.clone(),
    );
    let mut stream = handle.full_scan()?;
    let mut rows = Vec::new();
    while let Some((_rid, meta, tuple)) = stream.next()? {
        if !meta.is_deleted {
            rows.push(tuple);
        }
    }
    Ok(rows)
}

pub fn key_schema_to_varchar(key_schema: &Schema) -> String {
    key_schema
        .columns
        .iter()
        .map(|col| col.name.as_str())
        .collect::<Vec<_>>()
        .join(", ")
}

fn parse_key_schema_from_varchar(varchar: &str, table_schema: SchemaRef) -> QuillSQLResult<Schema> {
    let column_names = varchar
        .split(',')
        .map(|name| name.trim())
        .collect::<Vec<&str>>();
    let indices = column_names
        .into_iter()
        .map(|name| table_schema.index_of(None, name))
        .collect::<QuillSQLResult<Vec<usize>>>()?;
    table_schema.project(&indices)
}