//! tempest-engine 0.0.2
//!
//! Relational database engine for TempestDB.
use std::{ops::Bound, path::PathBuf};

use bytes::{Bytes, BytesMut};
use derive_more::{Display, Error, From};
use itertools::Itertools;
use tempest_io::Io;
use tempest_kv::{Storage, StorageError, StorageHandle, batch::WriteBatch};
use tempest_rt::{JoinHandle, now};
use tempest_tql::{ParseError, parse};

use crate::{
    base::EngineStorageStrategy,
    catalog::{Catalog, CatalogError, CatalogState},
    config::EngineConfig,
    ctrl::hlc::HlcGenerator,
    query::{
        QueryResult,
        eval::{CompiledExpr, eval_compiled},
        physical::PhysicalPlanNode,
        plan::{LogicalPlanNode, LogicalPlanner, PlanError},
    },
    row::{
        decoder::{RowDecodeError, RowDecoder},
        encoder::RowEncoder,
        resolved::ResolvedTable,
    },
    types::TempestValue,
};

#[macro_use]
extern crate tracing;

mod base;
mod ctrl;
mod row;

pub mod catalog;
pub mod config;
pub mod query;
pub mod types;

#[cfg(test)]
mod tests;

#[derive(Debug, Display, Error, From)]
pub enum EngineError {
    #[display("catalog error: {}", _0)]
    Catalog(CatalogError),

    #[display("plan error: {}", _0)]
    Plan(PlanError),

    #[display("parse query errors: {}", _0.iter().join(", "))]
    Parse(#[error(not(source))] Vec<ParseError>),

    #[display("storage error: {}", _0)]
    Storage(StorageError),

    #[display("row decode error: {}", _0)]
    RowDecode(RowDecodeError),
}

/// A TempestDB engine instance: a schema catalog plus key-value storage, both rooted in
/// one directory (see `Engine::open`).
///
/// NOTE: fields are dropped in declaration order; keep that in mind before reordering them.
pub struct Engine<I: Io> {
    /// Root directory passed to `open`; not read anywhere in this file.
    _dir: PathBuf,

    /// HLC generator (presumably a hybrid logical clock); stamps the row keys built by deletes.
    hlc: HlcGenerator,
    /// Schema catalog, opened from `<dir>/catalog`.
    catalog: Catalog<I>,
    /// Handle used to issue scans and write batches against storage (`<dir>/storage`).
    storage: StorageHandle,
    /// Join handle returned by `Storage::init`; awaited in `shutdown`.
    storage_join: JoinHandle<()>,

    /// Configuration passed to `open`; not read anywhere in this file.
    _config: EngineConfig,
}

impl<I: Io> Engine<I> {
    /// Opens an engine rooted at `dir`.
    ///
    /// The catalog is opened from `dir/catalog` with `config.catalog`. The key-value storage
    /// is initialised from `dir/storage` with `config.storage`.
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog cannot be opened. Storage initialisation itself is
    /// not fallible here.
    pub async fn open(dir: PathBuf, config: EngineConfig) -> Result<Self, EngineError> {
        let hlc = HlcGenerator::new();
        let catalog = Catalog::open(dir.join("catalog"), config.catalog.clone()).await?;
        let (storage, storage_join) =
            Storage::<I, EngineStorageStrategy>::init(dir.join("storage"), config.storage.clone());
        Ok(Self {
            _dir: dir,

            hlc,
            catalog,
            storage,
            storage_join,

            _config: config,
        })
    }

    /// Scans the storage key range `(start, end)` and decodes every entry as a row of `table`.
    ///
    /// When `predicate` is given, only rows for which it evaluates to `true` are kept. The
    /// bounds come from the caller, presumably already restricted to `table`'s key range by
    /// the physical planner so that other tables' entries are not visited.
    ///
    /// # Errors
    ///
    /// Propagates storage errors (when starting the scan and for individual entries) and row
    /// decode errors.
    ///
    /// # Panics
    ///
    /// Panics if `predicate` evaluates to anything other than a boolean.
    async fn scan_rows(
        &self,
        table: &ResolvedTable<'_>,
        start: Bound<Bytes>,
        end: Bound<Bytes>,
        predicate: Option<CompiledExpr>,
    ) -> Result<Vec<Vec<TempestValue<'static>>>, EngineError> {
        // `table` is already a reference; pass it through rather than borrowing it again.
        let decoder = RowDecoder::new(table, &self.catalog);

        let mut rx = self.storage.scan(start, end).await?;
        let mut rows = Vec::new();
        // The loop stops as soon as `recv` fails, presumably once the scan has produced all
        // entries and closed the channel.
        while let Ok(entry) = rx.recv().await {
            let (mut key, mut value) = entry?;

            let row = decoder.decode_row(&mut key, &mut value)?;

            if let Some(predicate) = &predicate {
                let TempestValue::Bool(keep) = eval_compiled(predicate, &row) else {
                    unreachable!("predicates always evaluate to a boolean")
                };
                if !keep {
                    continue;
                }
            }

            rows.push(row);
        }

        Ok(rows)
    }

    /// Executes a physical plan against storage.
    ///
    /// - `Insert` writes a single key/value pair and returns `QueryResult::Empty`.
    /// - `Scan` returns the matching rows projected onto the requested columns.
    /// - `Delete` deletes every matching row and returns how many rows matched.
    async fn execute_physical_plan(
        &mut self,
        plan: PhysicalPlanNode,
    ) -> Result<QueryResult, EngineError> {
        match plan {
            PhysicalPlanNode::Insert { key, value } => {
                // dispatch write command
                let mut batch = WriteBatch::new();
                batch.put(&key, &value);
                self.storage.write(batch).await?;

                // TODO: number of lines modified
                Ok(QueryResult::Empty)
            }
            PhysicalPlanNode::Scan {
                start,
                end,
                columns,
                table_id,
                predicate,
            } => {
                let table = self.catalog.resolved_table(table_id);

                let rows = self.scan_rows(&table, start, end, predicate).await?;

                // `columns` pairs each selected column's position in the decoded row with the
                // name it is reported under.
                let col_indices: Vec<usize> = columns.iter().map(|(pos, _)| *pos).collect();
                let col_names: Vec<_> = columns.into_iter().map(|(_, name)| name).collect();

                // TODO: get around this clone by removing sparsely from the decoded values
                // (cloning also keeps repeated columns such as `SELECT a, a` working).
                let projected_rows: Vec<Vec<_>> = rows
                    .into_iter()
                    .map(|row| col_indices.iter().map(|&i| row[i].clone()).collect())
                    .collect();

                Ok(QueryResult::Rows {
                    columns: col_names,
                    rows: projected_rows,
                })
            }
            PhysicalPlanNode::Delete {
                start,
                end,
                table_id,
                predicate,
            } => {
                let table = self.catalog.resolved_table(table_id);

                let rows = self.scan_rows(&table, start, end, predicate).await?;
                let deleted = rows.len();

                // One HLC timestamp is generated per statement and shared by all its deletions.
                let encoder = RowEncoder::new(&table);
                let hlc = self.hlc.generate(now::<I>().as_millis() as u64);

                let mut batch = WriteBatch::new();
                for row in rows {
                    // Re-encode the key with the new HLC, then add it to the delete batch.
                    // NOTE(review): the scanned keys are not reused; presumably the storage
                    // strategy treats this as a newer version/tombstone of the row — confirm.
                    let mut key = BytesMut::new();
                    encoder.encode_row_key(&row, hlc, &mut key);
                    batch.delete(&key);
                }

                self.storage.write(batch).await?;

                Ok(QueryResult::RowsChanged(deleted))
            }
        }
    }

    /// Executes one logical plan.
    ///
    /// DDL plans go to the catalog and return `QueryResult::Empty`. DML plans are lowered to a
    /// physical plan and executed via `execute_physical_plan`.
    async fn execute_plan(&mut self, plan: LogicalPlanNode) -> Result<QueryResult, EngineError> {
        match plan {
            LogicalPlanNode::CreateDatabase(schema) => {
                self.catalog.create_database(schema).await?;
                Ok(QueryResult::Empty)
            }
            LogicalPlanNode::CreateType(schema) => {
                self.catalog.create_type(schema).await?;
                Ok(QueryResult::Empty)
            }
            LogicalPlanNode::CreateEnum(schema) => {
                self.catalog.create_enum(schema).await?;
                Ok(QueryResult::Empty)
            }
            LogicalPlanNode::CreateTable(schema) => {
                self.catalog.create_table(schema).await?;
                Ok(QueryResult::Empty)
            }
            LogicalPlanNode::Insert { table_id, row } => {
                let physical = self.plan_physical_insert(table_id, row);
                self.execute_physical_plan(physical).await
            }
            LogicalPlanNode::Select {
                table_id,
                columns,
                predicate,
            } => {
                let physical = self.plan_physical_select(table_id, columns, predicate);
                self.execute_physical_plan(physical).await
            }
            LogicalPlanNode::Delete {
                table_id,
                predicate,
            } => {
                let physical = self.plan_physical_delete(table_id, predicate);
                self.execute_physical_plan(physical).await
            }
        }
    }

    /// Parses `query` and executes its statements in order, returning one result per statement.
    ///
    /// Each statement is planned only after the previous one has executed, so later statements
    /// see catalog changes made by earlier ones (e.g. a table created in the same query).
    ///
    /// # Errors
    ///
    /// - If parsing reports any error, returns `EngineError::Parse` with all of them; nothing
    ///   is executed.
    /// - Otherwise, returns the first planning or execution error. Statements that already ran
    ///   are NOT rolled back; there is no transaction grouping yet.
    #[instrument(skip(self), level = "debug")]
    pub async fn execute(&mut self, query: &str) -> Result<Vec<QueryResult>, EngineError> {
        let (stmts, errors) = parse(query);
        if !errors.is_empty() {
            return Err(EngineError::Parse(errors));
        }

        // TODO: group transactions here or in ast?
        // => ast as `transaction { ... }` seems good
        let mut results = Vec::new();
        for stmt in stmts {
            let plan = LogicalPlanner::new(&self.catalog).plan(stmt)?;
            results.push(self.execute_plan(plan).await?);
        }
        Ok(results)
    }

    /// Returns a read-only view of the catalog state.
    pub fn catalog(&self) -> &CatalogState {
        &self.catalog
    }

    /// Shuts the engine down, consuming it.
    ///
    /// The shutdown runs in three steps:
    ///
    /// 1. The storage handle is dropped. Presumably this lets the storage task observe that
    ///    no handle remains and exit.
    /// 2. The storage join handle is awaited; its outcome is ignored.
    /// 3. The catalog is shut down.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`: a failing catalog shutdown is only logged.
    pub async fn shutdown(self) -> Result<(), EngineError> {
        drop(self.storage);
        let _ = self.storage_join.await;
        if let Err(err) = self.catalog.shutdown().await {
            error!("failed to shut down catalog: {}", err);
        }
        Ok(())
    }
}