use std::ops::Bound;
use bytes::{Bytes, BytesMut};
use tempest_core::tempest_str::TempestStr;
use tempest_core::utils::next_prefix;
use tempest_io::Io;
use tempest_rt::now;
use crate::{
Engine,
catalog::schema::TableId,
query::eval::CompiledExpr,
row::encoder::RowEncoder,
types::TempestValue,
};
pub(crate) enum PhysicalPlanNode {
Insert {
key: Bytes,
value: Bytes,
},
Scan {
start: Bound<Bytes>,
end: Bound<Bytes>,
columns: Vec<(usize, TempestStr<'static>)>,
table_id: TableId,
predicate: Option<CompiledExpr>,
},
Delete {
start: Bound<Bytes>,
end: Bound<Bytes>,
table_id: TableId,
predicate: Option<CompiledExpr>,
},
}
impl<I: Io> Engine<I> {
pub(crate) fn plan_physical_insert(
&mut self,
table_id: TableId,
row: Vec<TempestValue<'static>>,
) -> PhysicalPlanNode {
let resolved = self.catalog.resolved_table(table_id);
let hlc = self.hlc.generate(now::<I>().as_millis() as u64);
let encoder = RowEncoder::new(&resolved);
let mut key_buf = BytesMut::new();
let mut value_buf = BytesMut::new();
encoder.encode_row(&row, hlc, &mut key_buf, &mut value_buf);
PhysicalPlanNode::Insert {
key: key_buf.freeze(),
value: value_buf.freeze(),
}
}
pub(crate) fn plan_physical_select(
&self,
table_id: TableId,
columns: Vec<(usize, TempestStr<'static>)>,
predicate: Option<CompiledExpr>,
) -> PhysicalPlanNode {
let resolved = self.catalog.resolved_table(table_id);
let encoder = RowEncoder::new(&resolved);
let seek_prefix = encoder.search_prefix();
let end_bound = Bound::Excluded(
next_prefix(&seek_prefix).expect("table prefix should always have a successor prefix"),
);
let start_bound = Bound::Included(seek_prefix);
PhysicalPlanNode::Scan {
start: start_bound,
end: end_bound,
columns,
table_id,
predicate,
}
}
pub(crate) fn plan_physical_delete(
&self,
table_id: TableId,
predicate: Option<CompiledExpr>,
) -> PhysicalPlanNode {
let resolved = self.catalog.resolved_table(table_id);
let encoder = RowEncoder::new(&resolved);
let seek_prefix = encoder.search_prefix();
let end_bound = Bound::Excluded(
next_prefix(&seek_prefix).expect("table prefix should always have a successor prefix"),
);
let start_bound = Bound::Included(seek_prefix);
PhysicalPlanNode::Delete {
start: start_bound,
end: end_bound,
table_id,
predicate,
}
}
}