//! Query engine entry point.
//!
//! Declares the parser, analyzer, rewriter, planner and executor submodules and
//! ties them together in [`Engine`], which runs a SQL string through those
//! stages and returns a `QueryResult`.

pub mod analyzer;
pub use analyzer::Analyzer;
pub use analyzer::AnalyzerError;

pub mod executor;
pub use executor::Executor;
pub use executor::ExecutorError;

pub mod io;
use futures::pin_mut;
use io::{IOManager, RowManager, VisibleRowManager};
pub mod objects;
use objects::ParseTree;

pub mod planner;
pub use planner::Planner;
pub use planner::PlannerError;

pub mod rewriter;
pub use rewriter::Rewriter;
pub use rewriter::RewriterError;

pub mod sql_parser;
pub use sql_parser::SqlParser;
pub use sql_parser::SqlParserError;

pub mod transactions;
use transactions::{TransactionId, TransactionManager};

use self::io::ConstraintManager;
use self::objects::QueryResult;
use std::ops::Deref;
use thiserror::Error;
use tokio_stream::StreamExt;

/// Front door for running SQL text: owns the analyzer and executor that
/// `process_query` drives.
///
/// The type is `Clone`; `process_query` itself clones the executor before
/// streaming results from it.
#[derive(Clone, Debug)]
pub struct Engine {
    /// Turns a parse tree into a query tree; constructed over the visible-row manager.
    analyzer: Analyzer,
    /// Runs utility statements and planned statements; constructed over the constraint manager.
    executor: Executor,
}

impl Engine {
    /// Builds an engine on top of the supplied I/O and transaction managers.
    ///
    /// One `VisibleRowManager` is created (a `RowManager` over `io_manager`,
    /// paired with `tran_manager`). The analyzer takes ownership of it, while the
    /// executor receives a `ConstraintManager` wrapping a clone of it.
    pub fn new(io_manager: IOManager, tran_manager: TransactionManager) -> Engine {
        let vis_row_man = VisibleRowManager::new(RowManager::new(io_manager), tran_manager);
        let con_man = ConstraintManager::new(vis_row_man.clone());
        Engine {
            analyzer: Analyzer::new(vis_row_man),
            executor: Executor::new(con_man),
        }
    }

    /// Runs a single SQL string inside transaction `tran_id` and collects every
    /// output row into memory.
    ///
    /// Statements for which [`Engine::should_bypass_planning`] returns `true` are
    /// handed straight to the executor's utility path and produce a result with
    /// no columns. Everything else goes through analyze -> rewrite -> plan ->
    /// execute, and the result columns are the first element of each analyzed
    /// target.
    ///
    /// # Errors
    ///
    /// Returns the first failure from any stage (parse, analyze, rewrite, plan,
    /// or execute) converted into [`EngineError`]. A failing row aborts the
    /// whole query; rows gathered so far are discarded.
    pub async fn process_query(
        &mut self,
        tran_id: TransactionId,
        query: String,
    ) -> Result<QueryResult, EngineError> {
        //Parse it - statement splitting is not handled here (open question in the original)
        let parse_tree = SqlParser::parse(&query)?;

        if Engine::should_bypass_planning(&parse_tree) {
            let rows = self.executor.execute_utility(tran_id, parse_tree).await?;
            return Ok(QueryResult {
                columns: vec![],
                rows,
            });
        }

        //Analyze it
        let query_tree = self.analyzer.analyze(tran_id, parse_tree).await?;

        // Grab the output column names up front so the query tree can be moved
        // into the rewriter instead of being deep-cloned just to read `targets`
        // after execution.
        let columns = query_tree.targets.iter().map(|t| t.0.clone()).collect();

        //Rewrite it - noop for right now
        let rewrite_tree = Rewriter::rewrite(query_tree)?;

        //Plan it
        let planned_stmt = Planner::plan(rewrite_tree)?;

        //Execute it, single shot for now
        let mut rows = vec![];
        // The executor is cloned for the call; the stream is pinned on the stack
        // so `next()` can be polled.
        let execute_stream = self.executor.clone().execute(tran_id, planned_stmt);
        pin_mut!(execute_stream);

        while let Some(value) = execute_stream.next().await {
            rows.push(value?);
        }

        Ok(QueryResult { columns, rows })
    }

    /// Reports whether a statement skips analysis/planning and goes directly to
    /// the executor's utility path. Only `CREATE TABLE` does today.
    fn should_bypass_planning(parse_tree: &ParseTree) -> bool {
        matches!(parse_tree.deref(), ParseTree::CreateTable(_))
    }
}

/// Any failure surfaced by [`Engine`].
///
/// Every variant is a transparent wrapper: the message and source come from the
/// wrapped error, and the `#[from]` conversions are what let `process_query`
/// use `?` on each stage's result.
#[derive(Debug, Error)]
pub enum EngineError {
    /// Analysis of the parse tree failed.
    #[error(transparent)]
    AnalyzerError(#[from] AnalyzerError),
    /// The executor failed, either on a utility statement or while running a plan.
    #[error(transparent)]
    ExecutorError(#[from] ExecutorError),
    /// A byte buffer could not be decoded as UTF-8.
    // NOTE(review): nothing in this file produces this variant; presumably it is
    // for callers that decode raw query bytes before calling the engine - confirm.
    #[error(transparent)]
    QueryNotUtf8(#[from] std::string::FromUtf8Error),
    /// The rewrite stage failed.
    #[error(transparent)]
    RewriterError(#[from] RewriterError),
    /// The SQL text could not be parsed.
    #[error(transparent)]
    ParseError(#[from] SqlParserError),
    /// The planner could not produce a plan.
    #[error(transparent)]
    PlannerError(#[from] PlannerError),
}

#[cfg(test)]
mod tests {
    use super::io::IOManager;
    use super::transactions::TransactionManager;
    use super::*;

    // Blocks on a future so ordinary `#[test]` functions can call async APIs.
    macro_rules! aw {
        ($e:expr) => {
            tokio_test::block_on($e)
        };
    }

    /// End-to-end smoke test: create a table, insert one row, select it back.
    /// Only checks that each step succeeds; returned rows are not inspected.
    #[test]
    fn create_insert_select() -> Result<(), Box<dyn std::error::Error>> {
        let mut tran_manager = TransactionManager::new();
        let mut engine = Engine::new(IOManager::new(), tran_manager.clone());

        let tran_id = aw!(tran_manager.start_trans())?;

        // Create the table, then commit before touching any rows.
        aw!(engine.process_query(tran_id, String::from("create table foo (bar text)")))?;
        aw!(tran_manager.commit_trans(tran_id))?;

        // NOTE(review): the already-committed transaction id is reused for the
        // remaining statements - confirm this is intended.
        for sql in ["insert into foo values('test text')", "select bar from foo"] {
            aw!(engine.process_query(tran_id, String::from(sql)))?;
        }

        Ok(())
    }
}