1pub mod analyzer;
2pub use analyzer::Analyzer;
3pub use analyzer::AnalyzerError;
4
5pub mod executor;
6pub use executor::Executor;
7pub use executor::ExecutorError;
8
9pub mod io;
10use futures::pin_mut;
11use io::{RowManager, VisibleRowManager};
12pub mod objects;
13use objects::ParseTree;
14
15pub mod planner;
16pub use planner::Planner;
17pub use planner::PlannerError;
18
19pub mod rewriter;
20pub use rewriter::Rewriter;
21pub use rewriter::RewriterError;
22
23pub mod sql_parser;
24pub use sql_parser::SqlParser;
25pub use sql_parser::SqlParserError;
26
27pub mod test_objects;
30pub use test_objects::get_row;
31pub use test_objects::get_table;
32
33pub mod transactions;
34use transactions::{TransactionId, TransactionManager};
35
36use self::io::block_layer::file_manager2::FileManager2;
37use self::io::block_layer::free_space_manager::FreeSpaceManager;
38use self::io::ConstraintManager;
39use self::io::IndexManager;
40use self::objects::QueryResult;
41use std::ops::Deref;
42use std::sync::Arc;
43use thiserror::Error;
44use tokio_stream::StreamExt;
45
46#[derive(Clone)]
47pub struct Engine {
48 analyzer: Analyzer,
49 executor: Executor,
50}
51
52impl Engine {
53 pub fn new(file_manager: Arc<FileManager2>, tran_manager: TransactionManager) -> Engine {
54 let fsm = FreeSpaceManager::new(file_manager.clone());
55 let vis_row_man =
56 VisibleRowManager::new(RowManager::new(file_manager.clone(), fsm), tran_manager);
57 let index_manager = IndexManager::new(file_manager);
58 let con_man = ConstraintManager::new(index_manager, vis_row_man.clone());
59 Engine {
60 analyzer: Analyzer::new(vis_row_man),
61 executor: Executor::new(con_man),
62 }
63 }
64
65 pub async fn process_query(
66 &mut self,
67 tran_id: TransactionId,
68 query: String,
69 ) -> Result<QueryResult, EngineError> {
70 let parse_tree = SqlParser::parse(&query)?;
72
73 if Engine::should_bypass_planning(&parse_tree) {
74 let output_rows = self.executor.execute_utility(tran_id, parse_tree).await?;
75 return Ok(QueryResult {
76 columns: vec![],
77 rows: output_rows,
78 });
79 }
80
81 let query_tree = self.analyzer.analyze(tran_id, parse_tree).await?;
83
84 let rewrite_tree = Rewriter::rewrite(query_tree.clone())?;
86
87 let planned_stmt = Planner::plan(rewrite_tree)?;
89
90 let mut result = vec![];
92 let execute_stream = self.executor.clone().execute(tran_id, planned_stmt);
93 pin_mut!(execute_stream);
94
95 while let Some(value) = execute_stream.next().await {
96 result.push(value?);
97 }
98
99 let output_columns = query_tree.targets.iter().map(|t| t.0.clone()).collect();
100
101 Ok(QueryResult {
102 columns: output_columns,
103 rows: result,
104 })
105 }
106
107 fn should_bypass_planning(parse_tree: &ParseTree) -> bool {
108 matches!(parse_tree.deref(), ParseTree::CreateTable(_))
109 }
110}
111
112#[derive(Debug, Error)]
113pub enum EngineError {
114 #[error(transparent)]
115 AnalyzerError(#[from] AnalyzerError),
116 #[error(transparent)]
117 ExecutorError(#[from] ExecutorError),
118 #[error(transparent)]
119 QueryNotUtf8(#[from] std::string::FromUtf8Error),
120 #[error(transparent)]
121 RewriterError(#[from] RewriterError),
122 #[error(transparent)]
123 ParseError(#[from] SqlParserError),
124 #[error(transparent)]
125 PlannerError(#[from] PlannerError),
126}
127
128#[cfg(test)]
129mod tests {
130 use tempfile::TempDir;
131
132 use super::transactions::TransactionManager;
133 use super::*;
134
135 #[tokio::test]
136 async fn create_insert_select() -> Result<(), Box<dyn std::error::Error>> {
137 let tmp = TempDir::new()?;
138 let tmp_dir = tmp.path().as_os_str().to_os_string();
139
140 let create_test = "create table foo (bar text)".to_string();
141 let insert_test = "insert into foo values('test text')".to_string();
142 let select_test = "select bar from foo".to_string();
143
144 let mut transaction_manager = TransactionManager::new();
145 let mut engine = Engine::new(
146 Arc::new(FileManager2::new(tmp_dir)?),
147 transaction_manager.clone(),
148 );
149
150 let tran = transaction_manager.start_trans().await?;
151 engine.process_query(tran, create_test).await?;
152 transaction_manager.commit_trans(tran).await?;
153
154 engine.process_query(tran, insert_test).await?;
155 engine.process_query(tran, select_test).await?;
156
157 Ok(())
158 }
159}