quill-sql 0.3.1

An educational Rust relational database (RDBMS) inspired by CMU 15445
Documentation
mod aggregate;
mod analyze;
mod create_index;
mod create_table;
mod delete;
mod drop_index;
mod drop_table;
mod empty_relation;
mod filter;
mod insert;
mod join;
mod limit;
mod project;
mod sort;
mod table_scan;
mod update;
mod util;
mod values;

pub use aggregate::Aggregate;
pub use analyze::Analyze;
pub use create_index::CreateIndex;
pub use create_table::CreateTable;
pub use delete::Delete;
pub use drop_index::DropIndex;
pub use drop_table::DropTable;
pub use empty_relation::EmptyRelation;
pub use filter::Filter;
pub use insert::Insert;
pub use join::{Join, JoinType};
pub use limit::Limit;
pub use project::Project;
pub use sort::{OrderByExpr, Sort};
pub use table_scan::TableScan;
pub use update::Update;
pub use util::*;
pub use values::Values;

use crate::catalog::{
    SchemaRef, DELETE_OUTPUT_SCHEMA_REF, EMPTY_SCHEMA_REF, INSERT_OUTPUT_SCHEMA_REF,
    UPDATE_OUTPUT_SCHEMA_REF,
};
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::transaction::IsolationLevel;
use sqlparser::ast::{TransactionAccessMode, TransactionMode};
use std::sync::Arc;

#[derive(Debug, Clone)]
pub enum LogicalPlan {
    CreateTable(CreateTable),
    CreateIndex(CreateIndex),
    DropTable(DropTable),
    DropIndex(DropIndex),
    Filter(Filter),
    Insert(Insert),
    Join(Join),
    Limit(Limit),
    Project(Project),
    TableScan(TableScan),
    Sort(Sort),
    Values(Values),
    EmptyRelation(EmptyRelation),
    Aggregate(Aggregate),
    Update(Update),
    Delete(Delete),
    Analyze(Analyze),
    BeginTransaction(TransactionModes),
    CommitTransaction,
    RollbackTransaction,
    SetTransaction {
        scope: TransactionScope,
        modes: TransactionModes,
    },
}

impl LogicalPlan {
    pub fn schema(&self) -> &SchemaRef {
        match self {
            LogicalPlan::CreateTable(_) => &EMPTY_SCHEMA_REF,
            LogicalPlan::CreateIndex(_) => &EMPTY_SCHEMA_REF,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Insert(_) => &INSERT_OUTPUT_SCHEMA_REF,
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Project(Project { schema, .. }) => schema,
            LogicalPlan::TableScan(TableScan { table_schema, .. }) => table_schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Values(Values { schema, .. }) => schema,
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Update(_) => &UPDATE_OUTPUT_SCHEMA_REF,
            LogicalPlan::Delete(_) => &DELETE_OUTPUT_SCHEMA_REF,
            LogicalPlan::DropTable(_) => &EMPTY_SCHEMA_REF,
            LogicalPlan::DropIndex(_) => &EMPTY_SCHEMA_REF,
            LogicalPlan::Analyze(_) => &EMPTY_SCHEMA_REF,
            LogicalPlan::BeginTransaction(_)
            | LogicalPlan::CommitTransaction
            | LogicalPlan::RollbackTransaction
            | LogicalPlan::SetTransaction { .. } => &EMPTY_SCHEMA_REF,
        }
    }

    pub fn inputs(&self) -> Vec<&LogicalPlan> {
        match self {
            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
            LogicalPlan::Insert(Insert { input, .. }) => vec![input],
            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
            LogicalPlan::Project(Project { input, .. }) => vec![input],
            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
            LogicalPlan::CreateTable(_)
            | LogicalPlan::CreateIndex(_)
            | LogicalPlan::DropTable(_)
            | LogicalPlan::DropIndex(_)
            | LogicalPlan::TableScan(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Update(_)
            | LogicalPlan::Delete(_)
            | LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::BeginTransaction(_)
            | LogicalPlan::CommitTransaction
            | LogicalPlan::RollbackTransaction
            | LogicalPlan::SetTransaction { .. } => vec![],
        }
    }

    pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> QuillSQLResult<LogicalPlan> {
        match self {
            LogicalPlan::Filter(Filter { predicate, .. }) => Ok(LogicalPlan::Filter(Filter {
                predicate: predicate.clone(),
                input: expect_single_input(inputs)?,
            })),
            LogicalPlan::Insert(Insert {
                table,
                table_schema,
                projected_schema,
                ..
            }) => Ok(LogicalPlan::Insert(Insert {
                table: table.clone(),
                table_schema: table_schema.clone(),
                projected_schema: projected_schema.clone(),
                input: expect_single_input(inputs)?,
            })),
            LogicalPlan::Join(Join {
                join_type,
                condition,
                schema,
                ..
            }) => {
                let (left, right) = expect_two_inputs(inputs)?;
                Ok(LogicalPlan::Join(Join {
                    left,
                    right,
                    join_type: *join_type,
                    condition: condition.clone(),
                    schema: schema.clone(),
                }))
            }
            LogicalPlan::Limit(Limit { limit, offset, .. }) => Ok(LogicalPlan::Limit(Limit {
                limit: *limit,
                offset: *offset,
                input: expect_single_input(inputs)?,
            })),
            LogicalPlan::Project(Project { exprs, schema, .. }) => {
                Ok(LogicalPlan::Project(Project {
                    exprs: exprs.clone(),
                    schema: schema.clone(),
                    input: expect_single_input(inputs)?,
                }))
            }
            LogicalPlan::Sort(Sort {
                order_by, limit, ..
            }) => Ok(LogicalPlan::Sort(Sort {
                order_by: order_by.clone(),
                limit: *limit,
                input: expect_single_input(inputs)?,
            })),
            LogicalPlan::Aggregate(Aggregate {
                group_exprs,
                aggr_exprs,
                schema,
                ..
            }) => Ok(LogicalPlan::Aggregate(Aggregate {
                group_exprs: group_exprs.clone(),
                aggr_exprs: aggr_exprs.clone(),
                schema: schema.clone(),
                input: expect_single_input(inputs)?,
            })),
            LogicalPlan::CreateTable(_)
            | LogicalPlan::CreateIndex(_)
            | LogicalPlan::DropTable(_)
            | LogicalPlan::DropIndex(_)
            | LogicalPlan::TableScan(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Update(_)
            | LogicalPlan::Delete(_)
            | LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::BeginTransaction(_)
            | LogicalPlan::CommitTransaction
            | LogicalPlan::RollbackTransaction
            | LogicalPlan::SetTransaction { .. } => Ok(self.clone()),
        }
    }
}

fn expect_single_input(inputs: &[LogicalPlan]) -> QuillSQLResult<Arc<LogicalPlan>> {
    expect_input_at(inputs, 0, "should have at least one input")
}

fn expect_two_inputs(
    inputs: &[LogicalPlan],
) -> QuillSQLResult<(Arc<LogicalPlan>, Arc<LogicalPlan>)> {
    let left = expect_input_at(inputs, 0, "should have at least two inputs")?;
    let right = expect_input_at(inputs, 1, "should have at least two inputs")?;
    Ok((left, right))
}

fn expect_input_at(
    inputs: &[LogicalPlan],
    index: usize,
    expectation: &str,
) -> QuillSQLResult<Arc<LogicalPlan>> {
    inputs
        .get(index)
        .cloned()
        .map(Arc::new)
        .ok_or_else(|| QuillSQLError::Internal(format!("inputs {:?} {}", inputs, expectation)))
}

impl std::fmt::Display for LogicalPlan {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            LogicalPlan::CreateTable(v) => write!(f, "{v}"),
            LogicalPlan::CreateIndex(v) => write!(f, "{v}"),
            LogicalPlan::DropTable(v) => write!(f, "{v}"),
            LogicalPlan::DropIndex(v) => write!(f, "{v}"),
            LogicalPlan::Filter(v) => write!(f, "{v}"),
            LogicalPlan::Insert(v) => write!(f, "{v}"),
            LogicalPlan::Join(v) => write!(f, "{v}"),
            LogicalPlan::Limit(v) => write!(f, "{v}"),
            LogicalPlan::Project(v) => write!(f, "{v}"),
            LogicalPlan::TableScan(v) => write!(f, "{v}"),
            LogicalPlan::Sort(v) => write!(f, "{v}"),
            LogicalPlan::Values(v) => write!(f, "{v}"),
            LogicalPlan::EmptyRelation(v) => write!(f, "{v}"),
            LogicalPlan::Aggregate(v) => write!(f, "{v}"),
            LogicalPlan::Update(v) => write!(f, "{v}"),
            LogicalPlan::Delete(v) => write!(f, "{v}"),
            LogicalPlan::Analyze(v) => write!(f, "Analyze: {}", v.table),
            LogicalPlan::BeginTransaction(_) => write!(f, "BeginTransaction"),
            LogicalPlan::CommitTransaction => write!(f, "CommitTransaction"),
            LogicalPlan::RollbackTransaction => write!(f, "RollbackTransaction"),
            LogicalPlan::SetTransaction { .. } => write!(f, "SetTransaction"),
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum TransactionScope {
    Session,
    Transaction,
}

#[derive(Debug, Clone, Default)]
pub struct TransactionModes {
    pub isolation_level: Option<IsolationLevel>,
    pub access_mode: Option<TransactionAccessMode>,
}

impl TransactionModes {
    pub fn from_modes(modes: &[TransactionMode]) -> Self {
        let mut result = TransactionModes::default();
        for mode in modes {
            match mode {
                TransactionMode::IsolationLevel(level) => {
                    let isolation = match level {
                        sqlparser::ast::TransactionIsolationLevel::ReadUncommitted => {
                            IsolationLevel::ReadUncommitted
                        }
                        sqlparser::ast::TransactionIsolationLevel::ReadCommitted => {
                            IsolationLevel::ReadCommitted
                        }
                        sqlparser::ast::TransactionIsolationLevel::RepeatableRead => {
                            IsolationLevel::RepeatableRead
                        }
                        sqlparser::ast::TransactionIsolationLevel::Serializable => {
                            IsolationLevel::Serializable
                        }
                    };
                    result.isolation_level = Some(isolation);
                }
                TransactionMode::AccessMode(mode) => {
                    result.access_mode = Some(*mode);
                }
            }
        }
        result
    }

    pub fn unwrap_effective_isolation(&self, fallback: IsolationLevel) -> IsolationLevel {
        self.isolation_level.unwrap_or(fallback)
    }

    pub fn unwrap_effective_access_mode(
        &self,
        fallback: TransactionAccessMode,
    ) -> TransactionAccessMode {
        self.access_mode.unwrap_or(fallback)
    }
}