use std::{ops::Bound, path::PathBuf};
use bytes::{Bytes, BytesMut};
use derive_more::{Display, Error, From};
use itertools::Itertools;
use tempest_io::Io;
use tempest_kv::{Storage, StorageError, StorageHandle, batch::WriteBatch};
use tempest_rt::{JoinHandle, now};
use tempest_tql::{ParseError, parse};
use crate::{
base::EngineStorageStrategy,
catalog::{Catalog, CatalogError, CatalogState},
config::EngineConfig,
ctrl::hlc::HlcGenerator,
query::{
QueryResult,
eval::{CompiledExpr, eval_compiled},
physical::PhysicalPlanNode,
plan::{LogicalPlanNode, LogicalPlanner, PlanError},
},
row::{
decoder::{RowDecodeError, RowDecoder},
encoder::RowEncoder,
resolved::ResolvedTable,
},
types::TempestValue,
};
#[macro_use]
extern crate tracing;
mod base;
mod ctrl;
mod row;
pub mod catalog;
pub mod config;
pub mod query;
pub mod types;
#[cfg(test)]
mod tests;
/// Errors that can be returned by [`Engine`] operations.
///
/// Each variant wraps the error produced by one subsystem. The `From` derive
/// lets those underlying errors be propagated with `?`.
#[derive(Debug, Display, Error, From)]
pub enum EngineError {
/// A catalog operation (open, create database/type/enum/table) failed.
#[display("catalog error: {}", _0)]
Catalog(CatalogError),
/// The logical planner rejected a parsed statement.
#[display("plan error: {}", _0)]
Plan(PlanError),
/// The query text failed to parse; carries every reported parse error.
/// Displayed as a comma-separated list. The vector is explicitly marked as
/// not being the error `source`.
#[display("parse query errors: {}", _0.iter().join(", "))]
Parse(#[error(not(source))] Vec<ParseError>),
/// The storage layer failed while scanning or writing.
#[display("storage error: {}", _0)]
Storage(StorageError),
/// A stored key/value pair could not be decoded into a row.
#[display("row decode error: {}", _0)]
RowDecode(RowDecodeError),
}
/// The query engine: ties together the catalog, the key-value storage and the
/// query planner/executor for a single on-disk directory.
///
/// `I` selects the [`Io`] implementation used by the catalog, the storage and
/// the clock (`now::<I>()`).
pub struct Engine<I: Io> {
// Root directory passed to `open`; currently only retained, not read.
_dir: PathBuf,
// Generator for the hybrid-logical-clock timestamps used when deleting rows.
hlc: HlcGenerator,
// Schema catalog, opened from `<dir>/catalog`.
catalog: Catalog<I>,
// Handle used to issue scans and write batches to the storage.
storage: StorageHandle,
// Join handle returned by `Storage::init`; awaited in `shutdown`.
storage_join: JoinHandle<()>,
// Configuration passed to `open`; currently only retained, not read.
_config: EngineConfig,
}
impl<I: Io> Engine<I> {
/// Opens an engine rooted at `dir`.
///
/// The catalog is opened from `<dir>/catalog` with `config.catalog`, and the
/// storage is initialised at `<dir>/storage` with `config.storage`.
///
/// # Errors
///
/// Returns an [`EngineError`] if the catalog cannot be opened.
pub async fn open(dir: PathBuf, config: EngineConfig) -> Result<Self, EngineError> {
    let hlc = HlcGenerator::new();

    let catalog_dir = dir.join("catalog");
    let catalog = Catalog::open(catalog_dir, config.catalog.clone()).await?;

    let storage_dir = dir.join("storage");
    let (storage, storage_join) =
        Storage::<I, EngineStorageStrategy>::init(storage_dir, config.storage.clone());

    let engine = Self {
        _dir: dir,
        hlc,
        catalog,
        storage,
        storage_join,
        _config: config,
    };
    Ok(engine)
}
/// Scans the storage between `start` and `end` and decodes every entry into
/// a row of `table`.
///
/// When `predicate` is present, only rows for which it evaluates to `true`
/// are kept. All matching rows are collected into memory before returning.
///
/// # Errors
///
/// Fails if the scan cannot be started, if the scan yields an error entry,
/// or if an entry cannot be decoded.
///
/// # Panics
///
/// Panics if `predicate` evaluates to anything other than a boolean.
async fn scan_rows(
    &self,
    table: &ResolvedTable<'_>,
    start: Bound<Bytes>,
    end: Bound<Bytes>,
    predicate: Option<CompiledExpr>,
) -> Result<Vec<Vec<TempestValue<'static>>>, EngineError> {
    let decoder = RowDecoder::new(&table, &self.catalog);
    let mut rx = self.storage.scan(start, end).await?;

    let mut matched = Vec::new();
    loop {
        // The first `Err` from `recv` ends the scan (presumably the stream
        // has been closed by the storage side).
        let Ok(entry) = rx.recv().await else {
            break;
        };
        let (mut key, mut value) = entry?;
        let row = decoder.decode_row(&mut key, &mut value)?;

        let rejected = match &predicate {
            Some(expr) => {
                let TempestValue::Bool(passes) = eval_compiled(expr, &row) else {
                    unreachable!("predicates always evaluate to a boolean")
                };
                !passes
            }
            None => false,
        };
        if !rejected {
            matched.push(row);
        }
    }
    Ok(matched)
}
/// Runs a single physical plan node against the storage.
///
/// * `Insert` writes one key/value pair and yields [`QueryResult::Empty`].
/// * `Scan` reads the matching rows and projects them onto the requested
///   column positions, yielding [`QueryResult::Rows`].
/// * `Delete` reads the matching rows, issues one delete per row in a single
///   batch and yields [`QueryResult::RowsChanged`] with the row count.
///
/// # Errors
///
/// Propagates storage and row-decoding failures.
///
/// # Panics
///
/// Panics if a `Scan` column position is out of bounds for a decoded row.
async fn execute_physical_plan(
    &mut self,
    plan: PhysicalPlanNode,
) -> Result<QueryResult, EngineError> {
    match plan {
        PhysicalPlanNode::Insert { key, value } => {
            let mut writes = WriteBatch::new();
            writes.put(&key, &value);
            self.storage.write(writes).await?;
            Ok(QueryResult::Empty)
        }
        PhysicalPlanNode::Scan {
            start,
            end,
            columns,
            table_id,
            predicate,
        } => {
            let table = self.catalog.resolved_table(table_id);
            let scanned = self.scan_rows(&table, start, end, predicate).await?;

            // Split `(position, name)` pairs into the indices used for
            // projection and the header returned to the caller.
            let (positions, names): (Vec<usize>, Vec<_>) = columns.into_iter().unzip();

            Ok(QueryResult::Rows {
                columns: names,
                rows: scanned
                    .into_iter()
                    .map(|row| positions.iter().map(|&idx| row[idx].clone()).collect())
                    .collect(),
            })
        }
        PhysicalPlanNode::Delete {
            start,
            end,
            table_id,
            predicate,
        } => {
            let table = self.catalog.resolved_table(table_id);
            let victims = self.scan_rows(&table, start, end, predicate).await?;
            let deleted = victims.len();

            let encoder = RowEncoder::new(&table);
            // One timestamp is generated from the current wall-clock
            // milliseconds and shared by every key in this delete.
            let hlc = self.hlc.generate(now::<I>().as_millis() as u64);

            let mut batch = WriteBatch::new();
            for row in &victims {
                let mut key = BytesMut::new();
                encoder.encode_row_key(row, hlc, &mut key);
                batch.delete(&key);
            }
            self.storage.write(batch).await?;
            Ok(QueryResult::RowsChanged(deleted))
        }
    }
}
/// Executes one logical plan node.
///
/// Schema statements (`CreateDatabase`, `CreateType`, `CreateEnum`,
/// `CreateTable`) are applied directly to the catalog and yield
/// [`QueryResult::Empty`]. Data statements (`Insert`, `Select`, `Delete`)
/// are first lowered to a physical plan and then executed.
///
/// # Errors
///
/// Propagates catalog failures and any error from physical execution.
async fn execute_plan(&mut self, plan: LogicalPlanNode) -> Result<QueryResult, EngineError> {
    let physical = match plan {
        // Catalog-only statements finish here.
        LogicalPlanNode::CreateDatabase(schema) => {
            self.catalog.create_database(schema).await?;
            return Ok(QueryResult::Empty);
        }
        LogicalPlanNode::CreateType(schema) => {
            self.catalog.create_type(schema).await?;
            return Ok(QueryResult::Empty);
        }
        LogicalPlanNode::CreateEnum(schema) => {
            self.catalog.create_enum(schema).await?;
            return Ok(QueryResult::Empty);
        }
        LogicalPlanNode::CreateTable(schema) => {
            self.catalog.create_table(schema).await?;
            return Ok(QueryResult::Empty);
        }
        // Data statements are lowered to a physical plan first.
        LogicalPlanNode::Insert { table_id, row } => self.plan_physical_insert(table_id, row),
        LogicalPlanNode::Select {
            table_id,
            columns,
            predicate,
        } => self.plan_physical_select(table_id, columns, predicate),
        LogicalPlanNode::Delete {
            table_id,
            predicate,
        } => self.plan_physical_delete(table_id, predicate),
    };
    self.execute_physical_plan(physical).await
}
/// Parses `query` and executes every statement it contains, in order.
///
/// Returns one [`QueryResult`] per statement. Nothing is executed if the
/// query has any parse error. Execution stops at the first statement that
/// fails to plan or run; results of earlier statements are not returned in
/// that case.
///
/// # Errors
///
/// * [`EngineError::Parse`] with all parse errors if parsing fails.
/// * Any planning or execution error of the first failing statement.
#[instrument(skip(self), level = "debug")]
pub async fn execute(&mut self, query: &str) -> Result<Vec<QueryResult>, EngineError> {
    let (statements, parse_errors) = parse(query);
    if !parse_errors.is_empty() {
        return Err(EngineError::Parse(parse_errors));
    }

    let mut outcomes = Vec::new();
    for statement in statements {
        // Each statement is planned against the catalog as it stands after
        // the previous statements have run.
        let logical = LogicalPlanner::new(&self.catalog).plan(statement)?;
        let outcome = self.execute_plan(logical).await?;
        outcomes.push(outcome);
    }
    Ok(outcomes)
}
/// Returns a shared reference to the catalog state held by this engine.
pub fn catalog(&self) -> &CatalogState {
    let state: &CatalogState = &self.catalog;
    state
}
/// Shuts the engine down, consuming it.
///
/// The storage handle is dropped first, then the storage join handle is
/// awaited (its outcome is ignored), and finally the catalog is shut down.
///
/// A catalog shutdown failure is logged rather than returned, so this
/// currently always returns `Ok(())`.
pub async fn shutdown(self) -> Result<(), EngineError> {
    // Release the handle before waiting on the storage join handle.
    drop(self.storage);
    let _ = self.storage_join.await;

    match self.catalog.shutdown().await {
        Ok(_) => {}
        Err(err) => {
            error!("failed to shut down catalog: {}", err);
        }
    }
    Ok(())
}
}