1use crate::constants::SystemTables;
2use crate::engine::objects::types::BaseSqlTypes;
3use crate::engine::objects::{ConstraintMapper, SqlTuple};
4
5use super::io::{ConstraintManager, ConstraintManagerError};
6use super::objects::types::SqlTypeDefinition;
7use super::objects::{ParseTree, Plan, PlannedStatement, SqlTupleError, Table};
8use super::transactions::TransactionId;
9use async_stream::try_stream;
10use futures::stream::Stream;
11use std::convert::TryFrom;
12use std::num::TryFromIntError;
13use std::pin::Pin;
14use std::sync::Arc;
15use thiserror::Error;
16use uuid::Uuid;
17
18#[derive(Clone)]
21pub struct Executor {
22 cons_man: ConstraintManager,
23}
24
25impl Executor {
26 pub fn new(cons_man: ConstraintManager) -> Executor {
27 Executor { cons_man }
28 }
29
30 pub fn execute(
31 self,
32 tran_id: TransactionId,
33 plan_tree: PlannedStatement,
34 ) -> Pin<Box<dyn Stream<Item = Result<SqlTuple, ExecutorError>> + Send>> {
35 self.execute_plans(tran_id, plan_tree.plan)
36 }
37
38 fn execute_plans(
39 self,
40 tran_id: TransactionId,
41 plan: Arc<Plan>,
42 ) -> Pin<Box<dyn Stream<Item = Result<SqlTuple, ExecutorError>> + Send>> {
43 match plan.as_ref() {
44 Plan::CartesianJoin(cp) => {
45 self.cartesian_join(tran_id, cp.left.clone(), cp.right.clone())
46 }
47 Plan::FullTableScan(fts) => {
48 self.full_table_scan(tran_id, fts.src_table.clone(), fts.target_type.clone())
49 }
50 Plan::ModifyTable(mt) => self.modify_table(tran_id, &mt.table, mt.source.clone()),
51 Plan::StaticData(sd) => self.static_data(sd.clone()),
52 }
53 }
54
55 fn cartesian_join(
56 self,
57 tran_id: TransactionId,
58 left: Arc<Plan>,
59 right: Arc<Plan>,
60 ) -> Pin<Box<impl Stream<Item = Result<SqlTuple, ExecutorError>>>> {
61 let s = try_stream! {
62 for await left_data in self.clone().execute_plans(tran_id, left) {
63 let left_data = left_data?;
64
65 for await right_data in self.clone().execute_plans(tran_id, right.clone()) {
66 let right_data = right_data?;
67
68 yield SqlTuple::merge(&left_data, &right_data);
69 }
70 }
71 };
72 Box::pin(s)
73 }
74
75 fn full_table_scan(
76 self,
77 tran_id: TransactionId,
78 src_table: Arc<Table>,
79 target_type: Arc<SqlTypeDefinition>,
80 ) -> Pin<Box<impl Stream<Item = Result<SqlTuple, ExecutorError>>>> {
81 let s = try_stream! {
82 let vis = self.cons_man;
83
84 for await row in vis.get_stream(tran_id, src_table.clone()) {
85 let data = row?.user_data.clone();
86
87 let requested_row = data.filter_map(&src_table.sql_type, &target_type)?;
89
90 yield requested_row;
91 }
92 };
93 Box::pin(s)
94 }
95
96 fn modify_table(
97 self,
98 tran_id: TransactionId,
99 table: &Arc<Table>,
100 source: Arc<Plan>,
101 ) -> Pin<Box<impl Stream<Item = Result<SqlTuple, ExecutorError>>>> {
102 let vis = self.clone().cons_man;
103 let table = table.clone();
104
105 let s = try_stream! {
106 for await val in self.clone().execute_plans(tran_id, source) {
107 let unwrapped_val = val?;
108 vis.clone()
109 .insert_row(tran_id, &table, unwrapped_val.clone())
110 .await?;
111 yield unwrapped_val;
112 }
113 };
114 Box::pin(s)
115 }
116
117 fn static_data(
118 self,
119 rows: Arc<Vec<SqlTuple>>,
120 ) -> Pin<Box<impl Stream<Item = Result<SqlTuple, ExecutorError>>>> {
121 let s = try_stream! {
122 for row in rows.as_ref().iter() {
123 yield row.clone();
124 }
125 };
126 Box::pin(s)
127 }
128
129 pub async fn execute_utility(
131 &mut self,
132 tran_id: TransactionId,
133 parse_tree: ParseTree,
134 ) -> Result<Vec<SqlTuple>, ExecutorError> {
135 let mut cm = self.cons_man.clone();
136
137 let create_table = match parse_tree {
138 ParseTree::CreateTable(t) => t,
139 _ => return Err(ExecutorError::NotUtility()),
140 };
141
142 let table_id = Uuid::new_v4();
143 let pg_class = SystemTables::PgClass.value();
144 let table_row = SqlTuple(vec![
145 Some(BaseSqlTypes::Uuid(table_id)),
146 Some(BaseSqlTypes::Text(create_table.table_name.clone())),
147 ]);
148
149 cm.insert_row(tran_id, &pg_class, table_row).await?;
150
151 let mut primary_key_cols = vec![];
152
153 let pg_attribute = SystemTables::PgAttribute.value();
154 for i in 0..create_table.provided_columns.len() {
155 let cm = self.cons_man.clone();
156 let i_u32 = u32::try_from(i).map_err(ExecutorError::ConversionError)?;
157 let table_row = SqlTuple(vec![
158 Some(BaseSqlTypes::Uuid(table_id)),
159 Some(BaseSqlTypes::Text(
160 create_table.provided_columns[i].name.clone(),
161 )),
162 Some(BaseSqlTypes::Text(
163 create_table.provided_columns[i].sql_type.clone(),
165 )),
166 Some(BaseSqlTypes::Integer(i_u32)),
167 Some(BaseSqlTypes::Bool(create_table.provided_columns[i].null)),
168 ]);
169 cm.clone()
170 .insert_row(tran_id, &pg_attribute, table_row)
171 .await?;
172
173 if create_table.provided_columns[i].primary_key {
174 primary_key_cols.push(BaseSqlTypes::Integer(i_u32));
175 }
176 }
177
178 if !primary_key_cols.is_empty() {
179 let pk_id = Uuid::new_v4();
181 let primary_key_index = SqlTuple(vec![
182 Some(BaseSqlTypes::Uuid(pk_id)),
183 Some(BaseSqlTypes::Uuid(table_id)),
184 Some(BaseSqlTypes::Text(format!(
185 "{}_primary_key_index",
186 create_table.table_name
187 ))),
188 Some(BaseSqlTypes::Array(primary_key_cols)),
189 Some(BaseSqlTypes::Bool(true)),
190 ]);
191 let pg_index = SystemTables::PgIndex.value();
192 self.cons_man
193 .clone()
194 .insert_row(tran_id, &pg_index, primary_key_index)
195 .await?;
196
197 let primary_key_constraint = SqlTuple(vec![
199 Some(BaseSqlTypes::Uuid(Uuid::new_v4())),
200 Some(BaseSqlTypes::Uuid(table_id)),
201 Some(BaseSqlTypes::Uuid(pk_id)),
202 Some(BaseSqlTypes::Text(format!(
203 "{}_primary_key",
204 create_table.table_name
205 ))),
206 Some(BaseSqlTypes::Text(ConstraintMapper::PrimaryKey.to_string())),
207 ]);
208 let pg_constraint = SystemTables::PgConstraint.value();
209 self.cons_man
210 .clone()
211 .insert_row(tran_id, &pg_constraint, primary_key_constraint)
212 .await?;
213 }
214
215 Ok(vec![])
216 }
217}
218
219#[derive(Debug, Error)]
220pub enum ExecutorError {
221 #[error("Not a utility statement")]
222 NotUtility(),
223 #[error(transparent)]
224 SqlTupleError(#[from] SqlTupleError),
225 #[error(transparent)]
226 ConstraintManagerError(#[from] ConstraintManagerError),
227 #[error("Unable to convert usize to u32")]
228 ConversionError(#[from] TryFromIntError),
229 #[error("Recursive Plans Not Allowed")]
230 RecursionNotAllowed(),
231 #[error("Unknown")]
232 Unknown(),
233}