// tempest-engine 0.0.2
//
// Relational database engine for TempestDB
// Documentation
use std::ops::Bound;

use bytes::{Bytes, BytesMut};
use tempest_core::tempest_str::TempestStr;
use tempest_core::utils::next_prefix;
use tempest_io::Io;
use tempest_rt::now;

use crate::{
    Engine,
    catalog::schema::TableId,
    query::eval::CompiledExpr,
    row::encoder::RowEncoder,
    types::TempestValue,
};

/// A physical plan node, as built by the `plan_physical_*` methods on `Engine`.
///
/// Every variant carries already-encoded key material: either a single
/// key/value pair or a pair of range bounds over encoded keys.
pub(crate) enum PhysicalPlanNode {
    /// A single encoded row to be written.
    Insert {
        /// Encoded key bytes of the row.
        key: Bytes,
        /// Encoded value bytes of the row.
        value: Bytes,
    },
    /// A read over a range of encoded keys.
    Scan {
        /// Lower bound of the key range.
        start: Bound<Bytes>,
        /// Upper bound of the key range.
        end: Bound<Bytes>,
        /// (flat position, column name) pairs in output order
        columns: Vec<(usize, TempestStr<'static>)>,
        /// Identifier of the table the range was derived from.
        table_id: TableId,
        /// Optional compiled expression attached to the scan.
        /// NOTE(review): presumably evaluated per row as a filter — confirm against the executor.
        predicate: Option<CompiledExpr>,
    },
    /// A delete over a range of encoded keys.
    Delete {
        /// Lower bound of the key range.
        start: Bound<Bytes>,
        /// Upper bound of the key range.
        end: Bound<Bytes>,
        /// Identifier of the table the range was derived from.
        table_id: TableId,
        /// Optional compiled expression attached to the delete.
        /// NOTE(review): presumably restricts which rows are removed — confirm against the executor.
        predicate: Option<CompiledExpr>,
    },
}

impl<I: Io> Engine<I> {
    pub(crate) fn plan_physical_insert(
        &mut self,
        table_id: TableId,
        row: Vec<TempestValue<'static>>,
    ) -> PhysicalPlanNode {
        let resolved = self.catalog.resolved_table(table_id);

        let hlc = self.hlc.generate(now::<I>().as_millis() as u64);
        let encoder = RowEncoder::new(&resolved);
        let mut key_buf = BytesMut::new();
        let mut value_buf = BytesMut::new();
        encoder.encode_row(&row, hlc, &mut key_buf, &mut value_buf);

        PhysicalPlanNode::Insert {
            key: key_buf.freeze(),
            value: value_buf.freeze(),
        }
    }

    pub(crate) fn plan_physical_select(
        &self,
        table_id: TableId,
        columns: Vec<(usize, TempestStr<'static>)>,
        predicate: Option<CompiledExpr>,
    ) -> PhysicalPlanNode {
        let resolved = self.catalog.resolved_table(table_id);

        let encoder = RowEncoder::new(&resolved);
        let seek_prefix = encoder.search_prefix();
        let end_bound = Bound::Excluded(
            next_prefix(&seek_prefix).expect("table prefix should always have a successor prefix"),
        );
        let start_bound = Bound::Included(seek_prefix);
        PhysicalPlanNode::Scan {
            start: start_bound,
            end: end_bound,
            columns,
            table_id,
            predicate,
        }
    }

    pub(crate) fn plan_physical_delete(
        &self,
        table_id: TableId,
        predicate: Option<CompiledExpr>,
    ) -> PhysicalPlanNode {
        let resolved = self.catalog.resolved_table(table_id);

        let encoder = RowEncoder::new(&resolved);
        let seek_prefix = encoder.search_prefix();
        let end_bound = Bound::Excluded(
            next_prefix(&seek_prefix).expect("table prefix should always have a successor prefix"),
        );
        let start_bound = Bound::Included(seek_prefix);
        PhysicalPlanNode::Delete {
            start: start_bound,
            end: end_bound,
            table_id,
            predicate,
        }
    }
}