feophantlib/engine/
executor.rs

1use crate::constants::SystemTables;
2use crate::engine::objects::types::BaseSqlTypes;
3use crate::engine::objects::{ConstraintMapper, SqlTuple};
4
5use super::io::{ConstraintManager, ConstraintManagerError};
6use super::objects::types::SqlTypeDefinition;
7use super::objects::{ParseTree, Plan, PlannedStatement, SqlTupleError, Table};
8use super::transactions::TransactionId;
9use async_stream::try_stream;
10use futures::stream::Stream;
11use std::convert::TryFrom;
12use std::num::TryFromIntError;
13use std::pin::Pin;
14use std::sync::Arc;
15use thiserror::Error;
16use uuid::Uuid;
17
//TODO: There are too many clones and too much Arc juggling in this file; investigate whether borrowing could replace some of them.
19
20#[derive(Clone)]
21pub struct Executor {
22    cons_man: ConstraintManager,
23}
24
25impl Executor {
26    pub fn new(cons_man: ConstraintManager) -> Executor {
27        Executor { cons_man }
28    }
29
30    pub fn execute(
31        self,
32        tran_id: TransactionId,
33        plan_tree: PlannedStatement,
34    ) -> Pin<Box<dyn Stream<Item = Result<SqlTuple, ExecutorError>> + Send>> {
35        self.execute_plans(tran_id, plan_tree.plan)
36    }
37
38    fn execute_plans(
39        self,
40        tran_id: TransactionId,
41        plan: Arc<Plan>,
42    ) -> Pin<Box<dyn Stream<Item = Result<SqlTuple, ExecutorError>> + Send>> {
43        match plan.as_ref() {
44            Plan::CartesianJoin(cp) => {
45                self.cartesian_join(tran_id, cp.left.clone(), cp.right.clone())
46            }
47            Plan::FullTableScan(fts) => {
48                self.full_table_scan(tran_id, fts.src_table.clone(), fts.target_type.clone())
49            }
50            Plan::ModifyTable(mt) => self.modify_table(tran_id, &mt.table, mt.source.clone()),
51            Plan::StaticData(sd) => self.static_data(sd.clone()),
52        }
53    }
54
55    fn cartesian_join(
56        self,
57        tran_id: TransactionId,
58        left: Arc<Plan>,
59        right: Arc<Plan>,
60    ) -> Pin<Box<impl Stream<Item = Result<SqlTuple, ExecutorError>>>> {
61        let s = try_stream! {
62            for await left_data in self.clone().execute_plans(tran_id, left) {
63                let left_data = left_data?;
64
65                for await right_data in self.clone().execute_plans(tran_id, right.clone()) {
66                    let right_data = right_data?;
67
68                    yield SqlTuple::merge(&left_data, &right_data);
69                }
70            }
71        };
72        Box::pin(s)
73    }
74
75    fn full_table_scan(
76        self,
77        tran_id: TransactionId,
78        src_table: Arc<Table>,
79        target_type: Arc<SqlTypeDefinition>,
80    ) -> Pin<Box<impl Stream<Item = Result<SqlTuple, ExecutorError>>>> {
81        let s = try_stream! {
82            let vis = self.cons_man;
83
84            for await row in vis.get_stream(tran_id, src_table.clone()) {
85                let data = row?.user_data.clone();
86
87                //Need to rewrite to the column / order needed
88                let requested_row = data.filter_map(&src_table.sql_type, &target_type)?;
89
90                yield requested_row;
91            }
92        };
93        Box::pin(s)
94    }
95
96    fn modify_table(
97        self,
98        tran_id: TransactionId,
99        table: &Arc<Table>,
100        source: Arc<Plan>,
101    ) -> Pin<Box<impl Stream<Item = Result<SqlTuple, ExecutorError>>>> {
102        let vis = self.clone().cons_man;
103        let table = table.clone();
104
105        let s = try_stream! {
106            for await val in self.clone().execute_plans(tran_id, source) {
107                let unwrapped_val = val?;
108                vis.clone()
109                    .insert_row(tran_id, &table, unwrapped_val.clone())
110                    .await?;
111                yield unwrapped_val;
112            }
113        };
114        Box::pin(s)
115    }
116
117    fn static_data(
118        self,
119        rows: Arc<Vec<SqlTuple>>,
120    ) -> Pin<Box<impl Stream<Item = Result<SqlTuple, ExecutorError>>>> {
121        let s = try_stream! {
122            for row in rows.as_ref().iter() {
123                yield row.clone();
124            }
125        };
126        Box::pin(s)
127    }
128
129    //Bypass planning since there isn't anything optimize
130    pub async fn execute_utility(
131        &mut self,
132        tran_id: TransactionId,
133        parse_tree: ParseTree,
134    ) -> Result<Vec<SqlTuple>, ExecutorError> {
135        let mut cm = self.cons_man.clone();
136
137        let create_table = match parse_tree {
138            ParseTree::CreateTable(t) => t,
139            _ => return Err(ExecutorError::NotUtility()),
140        };
141
142        let table_id = Uuid::new_v4();
143        let pg_class = SystemTables::PgClass.value();
144        let table_row = SqlTuple(vec![
145            Some(BaseSqlTypes::Uuid(table_id)),
146            Some(BaseSqlTypes::Text(create_table.table_name.clone())),
147        ]);
148
149        cm.insert_row(tran_id, &pg_class, table_row).await?;
150
151        let mut primary_key_cols = vec![];
152
153        let pg_attribute = SystemTables::PgAttribute.value();
154        for i in 0..create_table.provided_columns.len() {
155            let cm = self.cons_man.clone();
156            let i_u32 = u32::try_from(i).map_err(ExecutorError::ConversionError)?;
157            let table_row = SqlTuple(vec![
158                Some(BaseSqlTypes::Uuid(table_id)),
159                Some(BaseSqlTypes::Text(
160                    create_table.provided_columns[i].name.clone(),
161                )),
162                Some(BaseSqlTypes::Text(
163                    //TODO we did not validate that it is a real type
164                    create_table.provided_columns[i].sql_type.clone(),
165                )),
166                Some(BaseSqlTypes::Integer(i_u32)),
167                Some(BaseSqlTypes::Bool(create_table.provided_columns[i].null)),
168            ]);
169            cm.clone()
170                .insert_row(tran_id, &pg_attribute, table_row)
171                .await?;
172
173            if create_table.provided_columns[i].primary_key {
174                primary_key_cols.push(BaseSqlTypes::Integer(i_u32));
175            }
176        }
177
178        if !primary_key_cols.is_empty() {
179            //We assume the the order that columns with primary key were defined are the order desired
180            let pk_id = Uuid::new_v4();
181            let primary_key_index = SqlTuple(vec![
182                Some(BaseSqlTypes::Uuid(pk_id)),
183                Some(BaseSqlTypes::Uuid(table_id)),
184                Some(BaseSqlTypes::Text(format!(
185                    "{}_primary_key_index",
186                    create_table.table_name
187                ))),
188                Some(BaseSqlTypes::Array(primary_key_cols)),
189                Some(BaseSqlTypes::Bool(true)),
190            ]);
191            let pg_index = SystemTables::PgIndex.value();
192            self.cons_man
193                .clone()
194                .insert_row(tran_id, &pg_index, primary_key_index)
195                .await?;
196
197            //Now we can insert the constraint
198            let primary_key_constraint = SqlTuple(vec![
199                Some(BaseSqlTypes::Uuid(Uuid::new_v4())),
200                Some(BaseSqlTypes::Uuid(table_id)),
201                Some(BaseSqlTypes::Uuid(pk_id)),
202                Some(BaseSqlTypes::Text(format!(
203                    "{}_primary_key",
204                    create_table.table_name
205                ))),
206                Some(BaseSqlTypes::Text(ConstraintMapper::PrimaryKey.to_string())),
207            ]);
208            let pg_constraint = SystemTables::PgConstraint.value();
209            self.cons_man
210                .clone()
211                .insert_row(tran_id, &pg_constraint, primary_key_constraint)
212                .await?;
213        }
214
215        Ok(vec![])
216    }
217}
218
219#[derive(Debug, Error)]
220pub enum ExecutorError {
221    #[error("Not a utility statement")]
222    NotUtility(),
223    #[error(transparent)]
224    SqlTupleError(#[from] SqlTupleError),
225    #[error(transparent)]
226    ConstraintManagerError(#[from] ConstraintManagerError),
227    #[error("Unable to convert usize to u32")]
228    ConversionError(#[from] TryFromIntError),
229    #[error("Recursive Plans Not Allowed")]
230    RecursionNotAllowed(),
231    #[error("Unknown")]
232    Unknown(),
233}