Skip to main content

infinite_db/concurrent/
read_txn.rs

1//! Point-in-time read transaction.
2//!
3//! Pin a concurrent read view at a revision ceiling so queries do not observe
4//! in-flight writes. Obtain one from [`InfiniteDb::read`].
5//!
6//! ```no_run
7//! use infinite_db::InfiniteDb;
8//! use infinite_db::infinitedb_core::address::SpaceId;
9//!
10//! # fn example(db: &InfiniteDb) -> std::io::Result<()> {
11//! let txn = db.read();
12//! let rows = txn.query(SpaceId(1))?;
13//! # Ok(())
14//! # }
15//! ```
16
17use crate::engine::query::{query_inner, space_key};
18use crate::infinitedb_core::{
19    address::{DimensionVector, RevisionId, SpaceId},
20    block::Record,
21    branch::BranchId,
22    query::Query,
23};
24use crate::InfiniteDb;
25
26/// Concurrent read view pinned at a revision ceiling.
27pub struct ReadTxn<'a> {
28    db: &'a InfiniteDb,
29    branch: BranchId,
30    as_of: Option<RevisionId>,
31}
32
33impl<'a> ReadTxn<'a> {
34    /// Pin reads at the current database revision on `main`.
35    pub fn new(db: &'a InfiniteDb) -> Self {
36        let as_of = Some(RevisionId(db.revision()));
37        Self {
38            db,
39            branch: BranchId::MAIN,
40            as_of,
41        }
42    }
43
44    /// Read through a branch overlay instead of `main`.
45    pub fn on_branch(mut self, branch: BranchId) -> Self {
46        self.branch = branch;
47        self
48    }
49
50    /// Override the revision ceiling for this read view.
51    pub fn as_of(mut self, rev: RevisionId) -> Self {
52        self.as_of = Some(rev);
53        self
54    }
55
56    /// Query all live records in `space` within this read view.
57    pub fn query(&self, space: SpaceId) -> std::io::Result<Vec<Record>> {
58        self.db.query_on_branch(self.branch, space, self.as_of)
59    }
60
61    /// Bounding-box query within this read view.
62    pub fn query_bbox(
63        &self,
64        space: SpaceId,
65        min: DimensionVector,
66        max: DimensionVector,
67    ) -> std::io::Result<Vec<Record>> {
68        self.db
69            .query_bbox_on_branch(self.branch, space, min, max, self.as_of)
70    }
71
72    /// Execute a [`Query`] descriptor against this read view.
73    ///
74    /// When `q.range` is set, an exact per-record coordinate filter is applied
75    /// so results match [`InfiniteDb::query_bbox`].
76    pub fn execute(&self, q: &Query) -> std::io::Result<Vec<Record>> {
77        let as_of = q.as_of.or(self.as_of);
78        let spaces = self.db.spaces.read();
79        let key_range = match q.key_range {
80            Some(kr) => Some(kr),
81            None => q.range.as_ref().map(|r| {
82                let ka = space_key(&spaces, q.space, &r.min);
83                let kb = space_key(&spaces, q.space, &r.max);
84                if ka <= kb {
85                    (ka, kb)
86                } else {
87                    (kb, ka)
88                }
89            }),
90        };
91
92        let ctx = self.db.query_ctx();
93        let branch_id = if self.branch == BranchId::MAIN {
94            None
95        } else {
96            Some(self.branch)
97        };
98        let mut results = query_inner(
99            &self.db.store,
100            &self.db.snapshots,
101            ctx.live_tail,
102            ctx.space_tails,
103            &spaces,
104            &self.db.revision,
105            q.space,
106            key_range,
107            as_of,
108            q.include_tombstones,
109            ctx.hilbert_tails,
110            Some(&self.db.branch_overlays),
111            branch_id,
112        )?;
113
114        if let Some(range) = &q.range {
115            results.retain(|r| r.address.point.within(&range.min, &range.max));
116        }
117
118        Ok(results)
119    }
120}