Skip to main content

tempest_engine/
lib.rs

1use std::{ops::Bound, path::PathBuf};
2
3use bytes::{Bytes, BytesMut};
4use derive_more::{Display, Error, From};
5use itertools::Itertools;
6use tempest_io::Io;
7use tempest_kv::{Storage, StorageError, StorageHandle, batch::WriteBatch};
8use tempest_rt::{JoinHandle, now};
9use tempest_tql::{ParseError, parse};
10
11use crate::{
12    base::EngineStorageStrategy,
13    catalog::{Catalog, CatalogError, CatalogState},
14    config::EngineConfig,
15    ctrl::hlc::HlcGenerator,
16    query::{
17        QueryResult,
18        eval::{CompiledExpr, eval_compiled},
19        physical::PhysicalPlanNode,
20        plan::{LogicalPlanNode, LogicalPlanner, PlanError},
21    },
22    row::{
23        decoder::{RowDecodeError, RowDecoder},
24        encoder::RowEncoder,
25        resolved::ResolvedTable,
26    },
27    types::TempestValue,
28};
29
30#[macro_use]
31extern crate tracing;
32
33mod base;
34mod ctrl;
35mod row;
36
37pub mod catalog;
38pub mod config;
39pub mod query;
40pub mod types;
41
42#[cfg(test)]
43mod tests;
44
45#[derive(Debug, Display, Error, From)]
46pub enum EngineError {
47    #[display("catalog error: {}", _0)]
48    Catalog(CatalogError),
49
50    #[display("plan error: {}", _0)]
51    Plan(PlanError),
52
53    #[display("parse query errors: {}", _0.iter().join(", "))]
54    Parse(#[error(not(source))] Vec<ParseError>),
55
56    #[display("storage error: {}", _0)]
57    Storage(StorageError),
58
59    #[display("row decode error: {}", _0)]
60    RowDecode(RowDecodeError),
61}
62
63pub struct Engine<I: Io> {
64    _dir: PathBuf,
65
66    hlc: HlcGenerator,
67    catalog: Catalog<I>,
68    storage: StorageHandle,
69    storage_join: JoinHandle<()>,
70
71    _config: EngineConfig,
72}
73
74impl<I: Io> Engine<I> {
75    pub async fn open(dir: PathBuf, config: EngineConfig) -> Result<Self, EngineError> {
76        let hlc = HlcGenerator::new();
77        let catalog = Catalog::open(dir.join("catalog"), config.catalog.clone()).await?;
78        let (storage, storage_join) =
79            Storage::<I, EngineStorageStrategy>::init(dir.join("storage"), config.storage.clone());
80        Ok(Self {
81            _dir: dir,
82
83            hlc,
84            catalog,
85            storage,
86            storage_join,
87
88            _config: config,
89        })
90    }
91
92    async fn scan_rows(
93        &self,
94        table: &ResolvedTable<'_>,
95        start: Bound<Bytes>,
96        end: Bound<Bytes>,
97        predicate: Option<CompiledExpr>,
98    ) -> Result<Vec<Vec<TempestValue<'static>>>, EngineError> {
99        let decoder = RowDecoder::new(&table, &self.catalog);
100
101        // start at the right prefix, to skip other tables
102        let mut rx = self.storage.scan(start, end).await?;
103        let mut rows = Vec::new();
104        while let Ok(result) = rx.recv().await {
105            let (mut key, mut value) = result?;
106
107            let row = decoder.decode_row(&mut key, &mut value)?;
108
109            if let Some(predicate) = &predicate {
110                let TempestValue::Bool(b) = eval_compiled(predicate, &row) else {
111                    unreachable!("predicates always evaluate to a boolean")
112                };
113                if !b {
114                    continue;
115                }
116            }
117
118            rows.push(row);
119        }
120
121        Ok(rows)
122    }
123
124    async fn execute_physical_plan(
125        &mut self,
126        plan: PhysicalPlanNode,
127    ) -> Result<QueryResult, EngineError> {
128        match plan {
129            PhysicalPlanNode::Insert { key, value } => {
130                // dispatch write command
131                let mut batch = WriteBatch::new();
132                batch.put(&key, &value);
133                self.storage.write(batch).await?;
134
135                // TODO: number of lines modified
136                Ok(QueryResult::Empty)
137            }
138            PhysicalPlanNode::Scan {
139                start,
140                end,
141                columns,
142                table_id,
143                predicate,
144            } => {
145                let table = self.catalog.resolved_table(table_id);
146
147                let rows = self.scan_rows(&table, start, end, predicate).await?;
148
149                let col_indices: Vec<usize> = columns.iter().map(|(pos, _)| *pos).collect();
150                let col_names: Vec<_> = columns.into_iter().map(|(_, name)| name).collect();
151
152                let mut projected_rows = Vec::with_capacity(rows.len());
153                for row in rows {
154                    // TODO: get around this clone by removing sparsely from the decoded values
155                    let projected_row = col_indices.iter().map(|&i| row[i].clone()).collect();
156                    projected_rows.push(projected_row);
157                }
158
159                Ok(QueryResult::Rows {
160                    columns: col_names,
161                    rows: projected_rows,
162                })
163            }
164            PhysicalPlanNode::Delete {
165                start,
166                end,
167                table_id,
168                predicate,
169            } => {
170                let table = self.catalog.resolved_table(table_id);
171
172                let rows = self.scan_rows(&table, start, end, predicate).await?;
173                let deleted = rows.len();
174
175                let encoder = RowEncoder::new(&table);
176                let hlc = self.hlc.generate(now::<I>().as_millis() as u64);
177
178                let mut batch = WriteBatch::new();
179                for row in rows {
180                    // re-encode key with new HLC, then insert into delete batch
181                    let mut key = BytesMut::new();
182                    encoder.encode_row_key(&row, hlc, &mut key);
183                    batch.delete(&key);
184                }
185
186                self.storage.write(batch).await?;
187
188                Ok(QueryResult::RowsChanged(deleted))
189            }
190        }
191    }
192
193    async fn execute_plan(&mut self, plan: LogicalPlanNode) -> Result<QueryResult, EngineError> {
194        match plan {
195            LogicalPlanNode::CreateDatabase(schema) => {
196                self.catalog.create_database(schema).await?;
197                Ok(QueryResult::Empty)
198            }
199            LogicalPlanNode::CreateType(schema) => {
200                self.catalog.create_type(schema).await?;
201                Ok(QueryResult::Empty)
202            }
203            LogicalPlanNode::CreateEnum(schema) => {
204                self.catalog.create_enum(schema).await?;
205                Ok(QueryResult::Empty)
206            }
207            LogicalPlanNode::CreateTable(schema) => {
208                self.catalog.create_table(schema).await?;
209                Ok(QueryResult::Empty)
210            }
211            LogicalPlanNode::Insert { table_id, row } => {
212                let physical = self.plan_physical_insert(table_id, row);
213                self.execute_physical_plan(physical).await
214            }
215            LogicalPlanNode::Select {
216                table_id,
217                columns,
218                predicate,
219            } => {
220                let physical = self.plan_physical_select(table_id, columns, predicate);
221                self.execute_physical_plan(physical).await
222            }
223            LogicalPlanNode::Delete {
224                table_id,
225                predicate,
226            } => {
227                let physical = self.plan_physical_delete(table_id, predicate);
228                self.execute_physical_plan(physical).await
229            }
230        }
231    }
232
233    #[instrument(skip(self), level = "debug")]
234    pub async fn execute(&mut self, query: &str) -> Result<Vec<QueryResult>, EngineError> {
235        let (stmts, errors) = parse(query);
236        if !errors.is_empty() {
237            return Err(EngineError::Parse(errors));
238        }
239
240        // TODO: group transactions here or in ast?
241        // => ast as `transaction { ... }` seems good
242        let mut results = Vec::new();
243        for stmt in stmts {
244            let plan = LogicalPlanner::new(&self.catalog).plan(stmt)?;
245            results.push(self.execute_plan(plan).await?);
246        }
247        Ok(results)
248    }
249
250    pub fn catalog(&self) -> &CatalogState {
251        &self.catalog
252    }
253
254    pub async fn shutdown(self) -> Result<(), EngineError> {
255        drop(self.storage);
256        let _ = self.storage_join.await;
257        if let Err(err) = self.catalog.shutdown().await {
258            error!("failed to shut down catalog: {}", err);
259        }
260        Ok(())
261    }
262}