1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// ── ReadTx ────────────────────────────────────────────────────────────────────
use crate::types::DbInner;
use sparrowdb_common::{NodeId, TxnId};
use sparrowdb_execution::{Engine, QueryResult};
use sparrowdb_storage::node_store::{NodeStore, Value};
use std::sync::Arc;
use tracing::info_span;
/// A read-only snapshot transaction.
///
/// Pinned at the `txn_id` current when this handle was opened; immune to
/// subsequent writer commits for the lifetime of this handle.
///
/// Dropping the handle unregisters its snapshot from the shared
/// active-readers map so that GC can advance its watermark past it.
pub struct ReadTx {
    /// The committed `txn_id` this reader is pinned to.
    pub snapshot_txn_id: u64,
    /// Node store used by [`ReadTx::get_node`] for on-disk property reads.
    /// (`query` opens its own `NodeStore` and does not use this one.)
    pub(crate) store: NodeStore,
    /// Shared database state consulted by this handle: version chains,
    /// catalog / CSR / row-count / property-index / edge-property caches,
    /// the database path, and the active-readers registry.
    pub(crate) inner: Arc<DbInner>,
}
impl ReadTx {
    /// Read the requested property columns of a node as seen by the pinned
    /// snapshot.
    ///
    /// For each column returned by the node store, the version chain is
    /// consulted first. If it holds a value committed at or before
    /// `snapshot_txn_id`, that value shadows the on-disk one. Otherwise the
    /// raw on-disk value is decoded.
    ///
    /// NOTE(review): this method was previously documented as returning
    /// `Int64` values, but the return type is the general [`Value`]. Confirm
    /// which variants the store can actually yield here.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying node-store read.
    ///
    /// # Panics
    ///
    /// Panics if the version-chain lock is poisoned.
    pub fn get_node(&self, node_id: NodeId, col_ids: &[u32]) -> crate::Result<Vec<(u32, Value)>> {
        // NOTE(review): the version-chain read lock is taken before, and held
        // across, the on-disk read. Presumably this keeps both views consistent
        // with a concurrent commit. Confirm before narrowing the lock scope.
        let versions = self.inner.versions.read().expect("version lock poisoned");
        let raw = self.store.get_node_raw(node_id, col_ids)?;
        let result = raw
            .into_iter()
            .map(|(col_id, raw_val)| {
                // A snapshot-visible version wins; decode the raw value only
                // when no such version exists.
                let value = versions
                    .get_at(node_id, col_id, self.snapshot_txn_id)
                    .unwrap_or_else(|| self.store.decode_raw_value(raw_val));
                (col_id, value)
            })
            .collect();
        Ok(result)
    }

    /// Return the snapshot [`TxnId`] this reader is pinned to.
    pub fn snapshot(&self) -> TxnId {
        TxnId(self.snapshot_txn_id)
    }

    /// Execute a read-only Cypher query against the pinned snapshot.
    ///
    /// ## Snapshot isolation
    ///
    /// The query is intended to see the committed state at the moment
    /// [`begin_read`](crate::GraphDb::begin_read) was called. Any writes
    /// committed after that point should be invisible until a new `ReadTx`
    /// is opened.
    ///
    /// NOTE(review): the engine is built from the *current* shared catalog,
    /// CSR map and label row counts, plus a freshly opened `NodeStore`.
    /// `snapshot_txn_id` is not passed to it, so isolation from later commits
    /// depends on those sources being snapshot-consistent. Confirm.
    ///
    /// ## Concurrency
    ///
    /// Multiple `ReadTx` handles may run `query` concurrently. The catalog,
    /// CSR map and label row counts are only read: each is cloned under a
    /// short-lived read guard.
    ///
    /// The property-index cache is the exception. After a successful
    /// execution, columns loaded lazily by the engine are written back to the
    /// shared cache and `persist_prop_index` is invoked (SPA-286). A query can
    /// therefore update that shared cache and its persisted form.
    ///
    /// ## Mutation statements rejected
    ///
    /// Passing a mutation statement (`CREATE`, `MERGE`, `MATCH … SET`,
    /// `MATCH … DELETE`, `CHECKPOINT`, `OPTIMIZE`, etc.) returns
    /// [`Error::ReadOnly`](crate::Error::ReadOnly). Use
    /// [`GraphDb::execute`](crate::GraphDb::execute) for mutations.
    ///
    /// # Errors
    ///
    /// Returns parse and bind errors, `Error::ReadOnly` for rejected
    /// statements, and any error from opening the node store or executing
    /// the statement.
    ///
    /// # Panics
    ///
    /// Panics if the catalog, CSR-map or label-row-count lock is poisoned.
    ///
    /// # Example
    /// ```no_run
    /// use sparrowdb::GraphDb;
    ///
    /// let db = GraphDb::open(std::path::Path::new("/tmp/g.sparrow")).unwrap();
    /// let tx = db.begin_read().unwrap();
    /// let result = tx.query("MATCH (n:Person) RETURN n.name").unwrap();
    /// println!("{} rows", result.rows.len());
    /// ```
    pub fn query(&self, cypher: &str) -> crate::Result<QueryResult> {
        use sparrowdb_cypher::ast::Statement;
        use sparrowdb_cypher::{bind, parse};

        // Entered first so that parse/bind time and early rejections are
        // recorded inside this query's span rather than outside it.
        let _span = info_span!("sparrowdb.readtx.query").entered();

        let stmt = parse(cypher)?;
        // Take a snapshot of the catalog from the shared cache (no disk I/O if
        // the catalog is already warm). The read guard is released at the end
        // of this statement.
        let catalog_snap = self
            .inner
            .catalog
            .read()
            .expect("catalog RwLock poisoned")
            .clone();
        let bound = bind(stmt, &catalog_snap)?;

        // ReadTx is read-only. Reject data mutations, and also reject DDL and
        // maintenance statements explicitly rather than relying on
        // `Engine::is_mutation` to classify them.
        let rejected = Engine::is_mutation(&bound.inner)
            || matches!(
                &bound.inner,
                Statement::Checkpoint | Statement::Optimize | Statement::CreateConstraint { .. }
            );
        if rejected {
            return Err(crate::Error::ReadOnly);
        }

        let csrs = self
            .inner
            .csr_map
            .read()
            .expect("csr_map RwLock poisoned")
            .clone();
        let mut engine = {
            let _open_span = info_span!("sparrowdb.readtx.open_engine").entered();
            let row_counts = self
                .inner
                .label_row_counts
                .read()
                .expect("label_row_counts RwLock poisoned")
                .clone();
            Engine::new_with_all_caches(
                NodeStore::open(&self.inner.path)?,
                catalog_snap,
                csrs,
                &self.inner.path,
                Some(&self.inner.prop_index),
                Some(row_counts),
                Some(Arc::clone(&self.inner.edge_props_cache)),
            )
        };
        let result = {
            let _exec_span = info_span!("sparrowdb.readtx.execute").entered();
            engine.execute_statement(bound.inner)?
        };
        // Write lazily loaded columns back to the shared property-index cache
        // so that subsequent queries benefit from warm column data.
        engine.write_back_prop_index(&self.inner.prop_index);
        // SPA-286: persist the updated index. This is called unconditionally;
        // presumably `persist_prop_index` skips the write when no new columns
        // were loaded. TODO confirm.
        self.inner.persist_prop_index();
        tracing::debug!(
            rows = result.rows.len(),
            snapshot_txn_id = self.snapshot_txn_id,
            "readtx query complete"
        );
        Ok(result)
    }
}
impl Drop for ReadTx {
    /// Unregister this reader's snapshot from the shared active-readers map.
    ///
    /// The per-snapshot reader count is decremented. When it would drop to
    /// zero, the entry is removed so that GC can advance the watermark past
    /// this snapshot. Dropping a handle whose snapshot has no entry is a no-op.
    fn drop(&mut self) {
        // A poisoned mutex only means another thread panicked while holding
        // it; the map itself is still usable. Recover the guard instead of
        // skipping the unregistration. Skipping would leave this snapshot
        // registered forever and GC could never advance past it. Panicking is
        // not acceptable either: a panic in `drop` during unwinding aborts
        // the process.
        let mut readers = self
            .inner
            .active_readers
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let std::collections::btree_map::Entry::Occupied(mut entry) =
            readers.entry(self.snapshot_txn_id)
        {
            if *entry.get() <= 1 {
                entry.remove();
            } else {
                *entry.get_mut() -= 1;
            }
        }
    }
}