use crate::engine::error::EngineError;
use crate::engine::query::{query_inner, space_key};
use crate::engine::session::VersionVector;
use crate::infinitedb_core::{
address::{DimensionVector, RevisionId, SpaceId},
block::Record,
branch::BranchId,
query::Query,
};
use crate::InfiniteDb;
#[derive(Debug, Clone)]
enum ReadVisibility {
Vector(VersionVector),
Scalar(RevisionId),
}
pub struct ReadTxn<'a> {
db: &'a InfiniteDb,
branch: BranchId,
visibility: ReadVisibility,
captured_vector: VersionVector,
}
impl<'a> ReadTxn<'a> {
pub fn new(db: &'a InfiniteDb) -> Self {
let captured_vector = db.capture_version_vector();
Self {
db,
branch: BranchId::MAIN,
visibility: ReadVisibility::Vector(captured_vector.clone()),
captured_vector,
}
}
pub fn version_vector(&self) -> &VersionVector {
&self.captured_vector
}
pub fn on_branch(mut self, branch: BranchId) -> Self {
self.branch = branch;
self
}
pub fn as_of(mut self, rev: RevisionId) -> Self {
self.visibility = ReadVisibility::Scalar(rev);
self
}
fn pinned_vector(&self) -> Option<&VersionVector> {
match &self.visibility {
ReadVisibility::Vector(v) => Some(v),
ReadVisibility::Scalar(_) => None,
}
}
fn scalar_as_of(&self) -> Option<RevisionId> {
match &self.visibility {
ReadVisibility::Vector(_) => None,
ReadVisibility::Scalar(r) => Some(*r),
}
}
pub fn query(&self, space: SpaceId) -> Result<Vec<Record>, EngineError> {
self.db.query_on_branch_pinned(
self.branch,
space,
self.scalar_as_of(),
self.pinned_vector(),
)
}
pub fn query_bbox(
&self,
space: SpaceId,
min: DimensionVector,
max: DimensionVector,
) -> Result<Vec<Record>, EngineError> {
self.db.query_bbox_on_branch_pinned(
self.branch,
space,
min,
max,
self.scalar_as_of(),
self.pinned_vector(),
)
}
pub fn execute(&self, q: &Query) -> Result<Vec<Record>, EngineError> {
let as_of = q.as_of.or(self.scalar_as_of());
let pinned = self.pinned_vector();
let spaces = self.db.spaces.read();
let key_range = match q.key_range {
Some(kr) => Some(kr),
None => q.range.as_ref().map(|r| {
let ka = space_key(&spaces, q.space, &r.min);
let kb = space_key(&spaces, q.space, &r.max);
if ka <= kb {
(ka, kb)
} else {
(kb, ka)
}
}),
};
let ctx = self.db.query_ctx();
let branch_id = if self.branch == BranchId::MAIN {
None
} else {
Some(self.branch)
};
let mut results = query_inner(
&self.db.store,
&self.db.snapshots,
None,
&spaces,
&self.db.session_watermarks,
q.space,
key_range,
as_of,
pinned,
q.include_tombstones,
Some(ctx.hilbert_tails),
Some(&self.db.branch_overlays),
branch_id,
)
.map_err(EngineError::from)?;
if let Some(range) = &q.range {
results.retain(|r| r.address.point.within(&range.min, &range.max));
}
Ok(results)
}
}