feophantlib/
engine.rs

1pub mod analyzer;
2pub use analyzer::Analyzer;
3pub use analyzer::AnalyzerError;
4
5pub mod executor;
6pub use executor::Executor;
7pub use executor::ExecutorError;
8
9pub mod io;
10use futures::pin_mut;
11use io::{RowManager, VisibleRowManager};
12pub mod objects;
13use objects::ParseTree;
14
15pub mod planner;
16pub use planner::Planner;
17pub use planner::PlannerError;
18
19pub mod rewriter;
20pub use rewriter::Rewriter;
21pub use rewriter::RewriterError;
22
23pub mod sql_parser;
24pub use sql_parser::SqlParser;
25pub use sql_parser::SqlParserError;
26
//This module exists solely to reduce code duplication in tests.
//I was conditionally compiling it, but that breaks benchmarks and integration tests.
29pub mod test_objects;
30pub use test_objects::get_row;
31pub use test_objects::get_table;
32
33pub mod transactions;
34use transactions::{TransactionId, TransactionManager};
35
36use self::io::block_layer::file_manager2::FileManager2;
37use self::io::block_layer::free_space_manager::FreeSpaceManager;
38use self::io::ConstraintManager;
39use self::io::IndexManager;
40use self::objects::QueryResult;
41use std::ops::Deref;
42use std::sync::Arc;
43use thiserror::Error;
44use tokio_stream::StreamExt;
45
46#[derive(Clone)]
47pub struct Engine {
48    analyzer: Analyzer,
49    executor: Executor,
50}
51
52impl Engine {
53    pub fn new(file_manager: Arc<FileManager2>, tran_manager: TransactionManager) -> Engine {
54        let fsm = FreeSpaceManager::new(file_manager.clone());
55        let vis_row_man =
56            VisibleRowManager::new(RowManager::new(file_manager.clone(), fsm), tran_manager);
57        let index_manager = IndexManager::new(file_manager);
58        let con_man = ConstraintManager::new(index_manager, vis_row_man.clone());
59        Engine {
60            analyzer: Analyzer::new(vis_row_man),
61            executor: Executor::new(con_man),
62        }
63    }
64
65    pub async fn process_query(
66        &mut self,
67        tran_id: TransactionId,
68        query: String,
69    ) -> Result<QueryResult, EngineError> {
70        //Parse it - I need to figure out if I should do statement splitting here
71        let parse_tree = SqlParser::parse(&query)?;
72
73        if Engine::should_bypass_planning(&parse_tree) {
74            let output_rows = self.executor.execute_utility(tran_id, parse_tree).await?;
75            return Ok(QueryResult {
76                columns: vec![],
77                rows: output_rows,
78            });
79        }
80
81        //Analyze it
82        let query_tree = self.analyzer.analyze(tran_id, parse_tree).await?;
83
84        //Rewrite it - noop for right now
85        let rewrite_tree = Rewriter::rewrite(query_tree.clone())?;
86
87        //Plan it
88        let planned_stmt = Planner::plan(rewrite_tree)?;
89
90        //Execute it, single shot for now
91        let mut result = vec![];
92        let execute_stream = self.executor.clone().execute(tran_id, planned_stmt);
93        pin_mut!(execute_stream);
94
95        while let Some(value) = execute_stream.next().await {
96            result.push(value?);
97        }
98
99        let output_columns = query_tree.targets.iter().map(|t| t.0.clone()).collect();
100
101        Ok(QueryResult {
102            columns: output_columns,
103            rows: result,
104        })
105    }
106
107    fn should_bypass_planning(parse_tree: &ParseTree) -> bool {
108        matches!(parse_tree.deref(), ParseTree::CreateTable(_))
109    }
110}
111
112#[derive(Debug, Error)]
113pub enum EngineError {
114    #[error(transparent)]
115    AnalyzerError(#[from] AnalyzerError),
116    #[error(transparent)]
117    ExecutorError(#[from] ExecutorError),
118    #[error(transparent)]
119    QueryNotUtf8(#[from] std::string::FromUtf8Error),
120    #[error(transparent)]
121    RewriterError(#[from] RewriterError),
122    #[error(transparent)]
123    ParseError(#[from] SqlParserError),
124    #[error(transparent)]
125    PlannerError(#[from] PlannerError),
126}
127
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::transactions::TransactionManager;
    use super::*;

    /// Smoke test: create a table, insert one row and select it back through
    /// the engine, using a throwaway directory as storage.
    ///
    /// Only success of each statement is checked; the rows returned by the
    /// select are not inspected.
    #[tokio::test]
    async fn create_insert_select() -> Result<(), Box<dyn std::error::Error>> {
        let scratch = TempDir::new()?;
        let data_dir = scratch.path().as_os_str().to_os_string();

        let mut tran_manager = TransactionManager::new();
        let file_manager = Arc::new(FileManager2::new(data_dir)?);
        let mut engine = Engine::new(file_manager, tran_manager.clone());

        let tran_id = tran_manager.start_trans().await?;
        engine
            .process_query(tran_id, String::from("create table foo (bar text)"))
            .await?;
        tran_manager.commit_trans(tran_id).await?;

        // NOTE(review): these statements reuse the transaction id that was
        // just committed above - confirm that this is intended.
        for statement in ["insert into foo values('test text')", "select bar from foo"] {
            engine.process_query(tran_id, statement.to_string()).await?;
        }

        Ok(())
    }
}