1use std::{ops::Bound, path::PathBuf};
2
3use bytes::{Bytes, BytesMut};
4use derive_more::{Display, Error, From};
5use itertools::Itertools;
6use tempest_io::Io;
7use tempest_kv::{Storage, StorageError, StorageHandle, batch::WriteBatch};
8use tempest_rt::{JoinHandle, now};
9use tempest_tql::{ParseError, parse};
10
11use crate::{
12 base::EngineStorageStrategy,
13 catalog::{Catalog, CatalogError, CatalogState},
14 config::EngineConfig,
15 ctrl::hlc::HlcGenerator,
16 query::{
17 QueryResult,
18 eval::{CompiledExpr, eval_compiled},
19 physical::PhysicalPlanNode,
20 plan::{LogicalPlanNode, LogicalPlanner, PlanError},
21 },
22 row::{
23 decoder::{RowDecodeError, RowDecoder},
24 encoder::RowEncoder,
25 resolved::ResolvedTable,
26 },
27 types::TempestValue,
28};
29
30#[macro_use]
31extern crate tracing;
32
33mod base;
34mod ctrl;
35mod row;
36
37pub mod catalog;
38pub mod config;
39pub mod query;
40pub mod types;
41
42#[cfg(test)]
43mod tests;
44
45#[derive(Debug, Display, Error, From)]
46pub enum EngineError {
47 #[display("catalog error: {}", _0)]
48 Catalog(CatalogError),
49
50 #[display("plan error: {}", _0)]
51 Plan(PlanError),
52
53 #[display("parse query errors: {}", _0.iter().join(", "))]
54 Parse(#[error(not(source))] Vec<ParseError>),
55
56 #[display("storage error: {}", _0)]
57 Storage(StorageError),
58
59 #[display("row decode error: {}", _0)]
60 RowDecode(RowDecodeError),
61}
62
63pub struct Engine<I: Io> {
64 _dir: PathBuf,
65
66 hlc: HlcGenerator,
67 catalog: Catalog<I>,
68 storage: StorageHandle,
69 storage_join: JoinHandle<()>,
70
71 _config: EngineConfig,
72}
73
74impl<I: Io> Engine<I> {
75 pub async fn open(dir: PathBuf, config: EngineConfig) -> Result<Self, EngineError> {
76 let hlc = HlcGenerator::new();
77 let catalog = Catalog::open(dir.join("catalog"), config.catalog.clone()).await?;
78 let (storage, storage_join) =
79 Storage::<I, EngineStorageStrategy>::init(dir.join("storage"), config.storage.clone());
80 Ok(Self {
81 _dir: dir,
82
83 hlc,
84 catalog,
85 storage,
86 storage_join,
87
88 _config: config,
89 })
90 }
91
92 async fn scan_rows(
93 &self,
94 table: &ResolvedTable<'_>,
95 start: Bound<Bytes>,
96 end: Bound<Bytes>,
97 predicate: Option<CompiledExpr>,
98 ) -> Result<Vec<Vec<TempestValue<'static>>>, EngineError> {
99 let decoder = RowDecoder::new(&table, &self.catalog);
100
101 let mut rx = self.storage.scan(start, end).await?;
103 let mut rows = Vec::new();
104 while let Ok(result) = rx.recv().await {
105 let (mut key, mut value) = result?;
106
107 let row = decoder.decode_row(&mut key, &mut value)?;
108
109 if let Some(predicate) = &predicate {
110 let TempestValue::Bool(b) = eval_compiled(predicate, &row) else {
111 unreachable!("predicates always evaluate to a boolean")
112 };
113 if !b {
114 continue;
115 }
116 }
117
118 rows.push(row);
119 }
120
121 Ok(rows)
122 }
123
124 async fn execute_physical_plan(
125 &mut self,
126 plan: PhysicalPlanNode,
127 ) -> Result<QueryResult, EngineError> {
128 match plan {
129 PhysicalPlanNode::Insert { key, value } => {
130 let mut batch = WriteBatch::new();
132 batch.put(&key, &value);
133 self.storage.write(batch).await?;
134
135 Ok(QueryResult::Empty)
137 }
138 PhysicalPlanNode::Scan {
139 start,
140 end,
141 columns,
142 table_id,
143 predicate,
144 } => {
145 let table = self.catalog.resolved_table(table_id);
146
147 let rows = self.scan_rows(&table, start, end, predicate).await?;
148
149 let col_indices: Vec<usize> = columns.iter().map(|(pos, _)| *pos).collect();
150 let col_names: Vec<_> = columns.into_iter().map(|(_, name)| name).collect();
151
152 let mut projected_rows = Vec::with_capacity(rows.len());
153 for row in rows {
154 let projected_row = col_indices.iter().map(|&i| row[i].clone()).collect();
156 projected_rows.push(projected_row);
157 }
158
159 Ok(QueryResult::Rows {
160 columns: col_names,
161 rows: projected_rows,
162 })
163 }
164 PhysicalPlanNode::Delete {
165 start,
166 end,
167 table_id,
168 predicate,
169 } => {
170 let table = self.catalog.resolved_table(table_id);
171
172 let rows = self.scan_rows(&table, start, end, predicate).await?;
173 let deleted = rows.len();
174
175 let encoder = RowEncoder::new(&table);
176 let hlc = self.hlc.generate(now::<I>().as_millis() as u64);
177
178 let mut batch = WriteBatch::new();
179 for row in rows {
180 let mut key = BytesMut::new();
182 encoder.encode_row_key(&row, hlc, &mut key);
183 batch.delete(&key);
184 }
185
186 self.storage.write(batch).await?;
187
188 Ok(QueryResult::RowsChanged(deleted))
189 }
190 }
191 }
192
193 async fn execute_plan(&mut self, plan: LogicalPlanNode) -> Result<QueryResult, EngineError> {
194 match plan {
195 LogicalPlanNode::CreateDatabase(schema) => {
196 self.catalog.create_database(schema).await?;
197 Ok(QueryResult::Empty)
198 }
199 LogicalPlanNode::CreateType(schema) => {
200 self.catalog.create_type(schema).await?;
201 Ok(QueryResult::Empty)
202 }
203 LogicalPlanNode::CreateEnum(schema) => {
204 self.catalog.create_enum(schema).await?;
205 Ok(QueryResult::Empty)
206 }
207 LogicalPlanNode::CreateTable(schema) => {
208 self.catalog.create_table(schema).await?;
209 Ok(QueryResult::Empty)
210 }
211 LogicalPlanNode::Insert { table_id, row } => {
212 let physical = self.plan_physical_insert(table_id, row);
213 self.execute_physical_plan(physical).await
214 }
215 LogicalPlanNode::Select {
216 table_id,
217 columns,
218 predicate,
219 } => {
220 let physical = self.plan_physical_select(table_id, columns, predicate);
221 self.execute_physical_plan(physical).await
222 }
223 LogicalPlanNode::Delete {
224 table_id,
225 predicate,
226 } => {
227 let physical = self.plan_physical_delete(table_id, predicate);
228 self.execute_physical_plan(physical).await
229 }
230 }
231 }
232
233 #[instrument(skip(self), level = "debug")]
234 pub async fn execute(&mut self, query: &str) -> Result<Vec<QueryResult>, EngineError> {
235 let (stmts, errors) = parse(query);
236 if !errors.is_empty() {
237 return Err(EngineError::Parse(errors));
238 }
239
240 let mut results = Vec::new();
243 for stmt in stmts {
244 let plan = LogicalPlanner::new(&self.catalog).plan(stmt)?;
245 results.push(self.execute_plan(plan).await?);
246 }
247 Ok(results)
248 }
249
250 pub fn catalog(&self) -> &CatalogState {
251 &self.catalog
252 }
253
254 pub async fn shutdown(self) -> Result<(), EngineError> {
255 drop(self.storage);
256 let _ = self.storage_join.await;
257 if let Err(err) = self.catalog.shutdown().await {
258 error!("failed to shut down catalog: {}", err);
259 }
260 Ok(())
261 }
262}