// sparrowdb/lib.rs
1// Suppress rustdoc warnings for Markdown links that are valid on GitHub/docs.rs
2// but cannot be resolved as Rust item paths (e.g., [LICENSE], [docs/...]).
3#![allow(rustdoc::bare_urls)]
4#![allow(rustdoc::broken_intra_doc_links)]
5//! SparrowDB — top-level public API.
6//!
7//! ## Transaction model (Phase 5 — SWMR)
8//!
9//! * **Single Writer**: at most one [`WriteTx`] may be active at a time.
10//! [`GraphDb::begin_write`] returns [`Error::WriterBusy`] if a writer is
11//! already open.
12//!
13//! * **Multiple Readers**: any number of [`ReadTx`] handles may coexist with
14//! an active writer. Each reader pins the committed `txn_id` at open time
15//! and sees only data committed at or before that snapshot.
16//!
17//! ## Snapshot isolation for `set_node_col`
18//!
19//! Property updates (`set_node_col`) are recorded in an **in-memory version
20//! chain** keyed by `(NodeId, col_id)`. Each entry is a `(txn_id, Value)`
21//! pair. When a `ReadTx` reads a property it first consults the version chain
22//! and returns the most-recent value with `txn_id <= snapshot_txn_id`, falling
23//! back to the on-disk value written by `create_node` if no such entry exists.
24//!
25//! ## Quick start
26//!
27//! ```no_run
28//! use sparrowdb::GraphDb;
29//! let db = GraphDb::open(std::path::Path::new("/tmp/my.sparrow")).unwrap();
30//! db.checkpoint().unwrap();
31//! db.optimize().unwrap();
32//! ```
33
34use sparrowdb_catalog::catalog::{Catalog, LabelId};
35use sparrowdb_common::{col_id_of, TxnId};
36use sparrowdb_execution::Engine;
37
38// ── Export / import module ────────────────────────────────────────────────────
39pub mod export;
40pub use export::{EdgeDump, GraphDump, NodeDump};
41
42// ── Public re-exports ─────────────────────────────────────────────────────────
43//
44// Re-export the types that consumers of the top-level `sparrowdb` crate need
45// without also having to depend on the sub-crates directly.
46
47/// Query result returned by [`GraphDb::execute`] and [`GraphDb::execute_write`].
48/// Contains column names and rows of scalar [`Value`] cells.
49pub use sparrowdb_execution::QueryResult;
50
51/// Scalar value type used in query results and node property reads/writes.
52pub use sparrowdb_storage::node_store::Value;
53
54/// Opaque 64-bit node identifier.
55pub use sparrowdb_common::NodeId;
56
57/// Opaque 64-bit edge identifier.
58pub use sparrowdb_common::EdgeId;
59
60/// Error type returned by all SparrowDB operations.
61pub use sparrowdb_common::Error;
62
63/// Convenience alias: `std::result::Result<T, sparrowdb::Error>`.
64pub use sparrowdb_common::Result;
65
/// Per-rel-table edge property cache (SPA-261).
///
/// Nesting, outermost → innermost:
/// `rel_table_id (u32) → (u64, u64) edge key → [(col_id, encoded u64 value)]`
/// — the `(u64, u64)` key is presumably `(src.0, dst.0)`, matching the
/// encoded `(col_id, value_u64)` pairs staged by `PendingOp::EdgeCreate`;
/// confirm against the population site. Shared via `Arc` between `DbInner`
/// and each read `Engine` (see `build_read_engine`); the `RwLock` lets
/// readers consult it while commits/maintenance clear it.
type EdgePropsCache = Arc<RwLock<HashMap<u32, HashMap<(u64, u64), Vec<(u32, u64)>>>>>;
68
69// ── DbStats ───────────────────────────────────────────────────────────────────
70
/// Storage-size snapshot returned by [`GraphDb::stats`] (SPA-171).
///
/// All figures are collected best-effort: unreadable filesystem entries are
/// silently skipped, so every value should be read as a lower bound (see the
/// "Best-effort semantics" note on [`GraphDb::stats`]).
#[derive(Debug, Clone, Default)]
pub struct DbStats {
    /// Total bytes on disk across all database files.
    pub total_bytes: u64,
    /// Bytes consumed by node column files for each label.
    pub bytes_per_label: std::collections::HashMap<String, u64>,
    /// High-water mark (HWM) per label — the highest slot index allocated for
    /// that label, plus one. Soft-deleted nodes whose slots have not yet been
    /// reclaimed by compaction are still included in this count. The value
    /// converges to the true live-node count once compaction / GC runs.
    pub node_count_per_label: std::collections::HashMap<String, u64>,
    /// Total edges across all relationship types (delta log + CSR).
    pub edge_count: u64,
    /// Bytes used by all WAL segment files under `wal/`.
    pub wal_bytes: u64,
}
88
89use sparrowdb_storage::csr::CsrForward;
90use sparrowdb_storage::edge_store::{EdgeStore, RelTableId};
91use sparrowdb_storage::maintenance::MaintenanceEngine;
92use sparrowdb_storage::node_store::NodeStore;
93use sparrowdb_storage::wal::codec::{WalPayload, WalRecordKind};
94use sparrowdb_storage::wal::writer::WalWriter;
95use std::collections::{HashMap, HashSet};
96use std::path::{Path, PathBuf};
97use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
98use std::sync::{Arc, Mutex, RwLock};
99use tracing::info_span;
100
101// ── Version chain ─────────────────────────────────────────────────────────────
102
/// Key for the version map: (packed NodeId value, column id).
///
/// The `u64` is the raw `NodeId.0`, matching how `VersionStore::insert` and
/// `VersionStore::get_at` build their keys.
type VersionKey = (u64, u32);
105
/// One committed version of a property value.
#[derive(Clone)]
struct Version {
    /// The `txn_id` at which this value was committed.
    committed_at: u64,
    /// The committed property value itself (cloned out on snapshot reads).
    value: Value,
}
113
/// In-memory MVCC version store for property updates.
///
/// A `Vec<Version>` per `(NodeId, col_id)` key, sorted ascending by
/// `committed_at`. Readers binary-search for the latest version ≤ their
/// snapshot.
#[derive(Default)]
struct VersionStore {
    /// `(node_id.0, col_id)` → versions, kept sorted ascending by
    /// `committed_at` (the invariant `get_at`'s binary search relies on).
    map: HashMap<VersionKey, Vec<Version>>,
}
123
124impl VersionStore {
125 /// Record a committed property update.
126 fn insert(&mut self, node_id: NodeId, col_id: u32, committed_at: u64, value: Value) {
127 let versions = self.map.entry((node_id.0, col_id)).or_default();
128 versions.push(Version {
129 committed_at,
130 value,
131 });
132 // Keep sorted — writers are serialized so this is always appended in
133 // order, but sort to be safe.
134 versions.sort_by_key(|v| v.committed_at);
135 }
136
137 /// Return the most-recent value committed at or before `snapshot_txn_id`,
138 /// or `None` if no update exists within the snapshot window.
139 fn get_at(&self, node_id: NodeId, col_id: u32, snapshot_txn_id: u64) -> Option<Value> {
140 let versions = self.map.get(&(node_id.0, col_id))?;
141 // Binary search for the rightmost version with committed_at ≤ snapshot.
142 let idx = versions.partition_point(|v| v.committed_at <= snapshot_txn_id);
143 if idx == 0 {
144 None
145 } else {
146 Some(versions[idx - 1].value.clone())
147 }
148 }
149}
150
151// ── Pending write buffer ──────────────────────────────────────────────────────
152
/// One staged update: the new value plus the before-image read from disk/
/// version-chain at the time the update was staged.
///
/// Held in a [`WriteBuffer`] until the owning write transaction commits.
struct StagedUpdate {
    /// Before-image — the value a reader with `snapshot ≤ prev_txn_id` should
    /// see. Stored at `prev_txn_id` in the version chain on commit so that
    /// pre-existing readers retain correct snapshot access even after the
    /// on-disk column has been overwritten.
    ///
    /// `None` if the key already had an entry in the version chain (so the
    /// chain already carries the correct before-image).
    before_image: Option<(u64, Value)>, // (txn_id_to_store_at, value)
    /// The new value to commit.
    new_value: Value,
    /// Human-readable property key name used for WAL emission.
    /// For columns staged by col_id directly (e.g. from create_node), this is
    /// the synthesized form `"col_{col_id}"`.
    key_name: String,
}
171
/// Staged (uncommitted) property updates for an active `WriteTx`.
///
/// Keyed by `(NodeId, col_id)`; only the latest staged value for each key
/// matters.
#[derive(Default)]
struct WriteBuffer {
    /// Latest staged update per key — re-staging the same `(node, col)`
    /// within one transaction overwrites the earlier entry (last-write-wins).
    updates: HashMap<VersionKey, StagedUpdate>,
}
180
181// ── WAL mutation log entries ──────────────────────────────────────────────────
182
/// A mutation staged in a `WriteTx` for WAL emission at commit time.
enum WalMutation {
    /// A node was created (SPA-123, SPA-127).
    NodeCreate {
        node_id: NodeId,
        /// Label the node belongs to.
        label_id: u32,
        /// Property data as `(col_id, value)` pairs used for on-disk writes.
        props: Vec<(u32, Value)>,
        /// Human-readable property names parallel to `props`.
        ///
        /// When property names are available (e.g. from a Cypher literal) they
        /// are recorded here so the WAL can store them for schema introspection
        /// (`CALL db.schema()`). Empty when names are not known (e.g. low-level
        /// `create_node` calls that only have col_ids).
        prop_names: Vec<String>,
    },
    /// A node was deleted (SPA-125, SPA-127).
    NodeDelete { node_id: NodeId },
    /// An edge was created (SPA-126, SPA-127).
    EdgeCreate {
        edge_id: EdgeId,
        src: NodeId,
        dst: NodeId,
        /// Relationship type name for this edge.
        rel_type: String,
        /// Human-readable (name, value) pairs for WAL observability and schema introspection.
        prop_entries: Vec<(String, Value)>,
    },
    /// A specific directed edge was deleted.
    ///
    /// Identified structurally by `(src, dst, rel_type)` rather than by
    /// `EdgeId`.
    EdgeDelete {
        src: NodeId,
        dst: NodeId,
        rel_type: String,
    },
}
217
218// ── Pending structural operations (SPA-181 buffering) ────────────────────────
219
/// A buffered structural mutation that will be applied to storage on commit.
///
/// Property updates (`set_node_col`) already go through [`WriteBuffer`] and
/// are flushed by the existing commit machinery. These `PendingOp` entries
/// cover the remaining mutations that previously wrote directly to disk
/// before commit.
///
/// Note: catalog schema changes (`create_label`, `get_or_create_rel_type_id`)
/// are still written immediately because labels are idempotent metadata.
/// Full schema-change atomicity is deferred to a future phase.
enum PendingOp {
    /// Create a node: write to the node-store at the pre-reserved `slot` and
    /// advance the on-disk HWM.
    NodeCreate {
        label_id: u32,
        /// Slot index reserved when the create was staged.
        slot: u32,
        /// `(col_id, value)` pairs to persist for the new node.
        props: Vec<(u32, Value)>,
    },
    /// Delete a node: tombstone col_0 with `u64::MAX`.
    NodeDelete { node_id: NodeId },
    /// Create an edge: append to the edge delta log.
    EdgeCreate {
        src: NodeId,
        dst: NodeId,
        rel_table_id: RelTableId,
        /// Encoded (col_id, value_u64) pairs to persist in edge_props.bin.
        props: Vec<(u32, u64)>,
    },
    /// Delete an edge: rewrite the delta log excluding this record.
    EdgeDelete {
        src: NodeId,
        dst: NodeId,
        rel_table_id: RelTableId,
    },
}
255
256// ── Node version tracker (for MVCC conflict detection, SPA-128) ───────────────
257
/// Tracks the `txn_id` of the last committed write to each node.
#[derive(Default)]
struct NodeVersions {
    /// `node_id.0 → last_committed_txn_id`. A missing entry reads as 0
    /// ("never written") — see `NodeVersions::get`.
    map: HashMap<u64, u64>,
}
264
265impl NodeVersions {
266 fn set(&mut self, node_id: NodeId, txn_id: u64) {
267 self.map.insert(node_id.0, txn_id);
268 }
269
270 fn get(&self, node_id: NodeId) -> u64 {
271 self.map.get(&node_id.0).copied().unwrap_or(0)
272 }
273}
274
275// ── Shared inner state ────────────────────────────────────────────────────────
276
/// Shared mutable state behind every [`GraphDb`] handle.
///
/// Held in an `Arc` by [`GraphDb`] clones, the [`WriteGuard`], and open
/// read/write transactions alike, so it outlives all of them.
struct DbInner {
    path: PathBuf,
    /// Monotonically increasing; starts at 0 (no writes committed yet).
    /// Incremented atomically after each successful `WriteTx` commit.
    current_txn_id: AtomicU64,
    /// Ensures at most one writer exists at a time.
    ///
    /// `false` = no active writer; `true` = writer active.
    /// Use `compare_exchange(false, true)` for try-lock semantics.
    write_locked: AtomicBool,
    /// MVCC version chains for property updates (snapshot-isolation support).
    versions: RwLock<VersionStore>,
    /// Per-node last-committed txn_id (write-write conflict detection, SPA-128).
    node_versions: RwLock<NodeVersions>,
    /// Optional 32-byte encryption key (SPA-98). Retained for future use
    /// (e.g., reopening the WAL writer after a full segment rotation triggered
    /// by an external checkpoint). The key is encoded into the WAL writer at
    /// open time via [`WalWriter::open_encrypted`].
    #[allow(dead_code)]
    encryption_key: Option<[u8; 32]>,
    /// Registered UNIQUE constraints as `(label_id, col_id)` pairs —
    /// presumably; confirm against `register_unique_constraint`.
    unique_constraints: RwLock<HashSet<(u32, u32)>>,
    /// Shared property-index cache (SPA-187).
    ///
    /// Read queries clone from this at `Engine::new_with_cached_index` time and
    /// write their lazily-populated index back after execution. Write
    /// transactions invalidate it via `clear()` on commit so the next read
    /// re-populates from the updated column files.
    prop_index: RwLock<sparrowdb_storage::property_index::PropertyIndex>,
    /// Shared catalog cache (SPA-188).
    ///
    /// Read queries clone from this to avoid re-parsing the TLV catalog file
    /// on every query. DDL operations (label/rel-type creation, constraints)
    /// refresh the cache via `invalidate_catalog()`.
    catalog: RwLock<Catalog>,
    /// Cached per-type CSR forward map (SPA-189).
    ///
    /// Populated at `GraphDb::open` time and cloned into each `Engine`
    /// instance. Invalidated after CHECKPOINT / OPTIMIZE since those compact
    /// the delta log into new CSR base files.
    csr_map: RwLock<HashMap<u32, CsrForward>>,
    /// Cached label row counts (SPA-190).
    ///
    /// Maps `LabelId → node count` (high-water mark). Built once at open
    /// time and refreshed after any write that creates or deletes nodes.
    /// Passed to `Engine::new_with_all_caches` to avoid per-label HWM disk
    /// reads on every read query.
    label_row_counts: RwLock<HashMap<LabelId, usize>>,
    /// Persistent WAL writer (SPA-210).
    ///
    /// Reused across transaction commits to avoid paying segment-scan and
    /// file-open overhead on every write. Protected by a `Mutex` because
    /// only one writer exists at a time (the `write_locked` flag enforces
    /// SWMR, so contention is zero in practice).
    wal_writer: Mutex<WalWriter>,
    /// Shared edge-property cache (SPA-261).
    edge_props_cache: EdgePropsCache,
}
334
335// ── Write-lock guard (SPA-181: replaces 'static transmute UB) ────────────────
336
/// RAII guard that holds the exclusive writer lock for a [`DbInner`].
///
/// Acquired by [`GraphDb::begin_write`] via an atomic compare-exchange
/// (`false → true`). Released on `Drop` by resetting the flag to `false`.
/// Because the guard holds an [`Arc<DbInner>`] the `DbInner` (and the
/// `AtomicBool` it contains) is guaranteed to outlive the guard — no unsafe
/// lifetime extension needed.
struct WriteGuard {
    /// Keeps the shared state alive for exactly as long as the lock is held.
    inner: Arc<DbInner>,
}
347
348impl WriteGuard {
349 /// Try to acquire the write lock. Returns `None` if already held.
350 fn try_acquire(inner: &Arc<DbInner>) -> Option<Self> {
351 inner
352 .write_locked
353 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
354 .ok()
355 .map(|_| WriteGuard {
356 inner: Arc::clone(inner),
357 })
358 }
359}
360
impl Drop for WriteGuard {
    fn drop(&mut self) {
        // Release the lock. `Release` ordering pairs with the `Acquire`
        // compare-exchange in `try_acquire`, so the next writer observes
        // everything done while the lock was held.
        self.inner.write_locked.store(false, Ordering::Release);
    }
}
367
368// ── GraphDb ───────────────────────────────────────────────────────────────────
369
/// The top-level SparrowDB handle.
///
/// Cheaply cloneable — all clones share the same underlying state.
/// Obtain an instance via [`GraphDb::open`] or the free [`open`] function.
#[derive(Clone)]
pub struct GraphDb {
    /// Shared state; cloning the handle only bumps this `Arc`'s refcount.
    inner: Arc<DbInner>,
}
378
379impl GraphDb {
380 /// Return the filesystem path of this database.
381 pub fn path(&self) -> &Path {
382 &self.inner.path
383 }
384
385 /// Open (or create) a SparrowDB database at `path`.
386 pub fn open(path: &Path) -> Result<Self> {
387 std::fs::create_dir_all(path)?;
388 let catalog = Catalog::open(path)?;
389 let wal_dir = path.join("wal");
390 let wal_writer = WalWriter::open(&wal_dir)?;
391 let label_row_counts = RwLock::new(build_label_row_counts_from_disk(&catalog, path));
392 Ok(GraphDb {
393 inner: Arc::new(DbInner {
394 path: path.to_path_buf(),
395 current_txn_id: AtomicU64::new(0),
396 write_locked: AtomicBool::new(false),
397 versions: RwLock::new(VersionStore::default()),
398 node_versions: RwLock::new(NodeVersions::default()),
399 encryption_key: None,
400 unique_constraints: RwLock::new(HashSet::new()),
401 prop_index: RwLock::new(sparrowdb_storage::property_index::PropertyIndex::new()),
402 catalog: RwLock::new(catalog),
403 csr_map: RwLock::new(open_csr_map(path)),
404 label_row_counts,
405 wal_writer: Mutex::new(wal_writer),
406 edge_props_cache: Arc::new(RwLock::new(HashMap::new())),
407 }),
408 })
409 }
410
411 /// Open (or create) an encrypted SparrowDB database at `path` (SPA-98).
412 ///
413 /// WAL payloads are encrypted with XChaCha20-Poly1305 using `key`.
414 /// The same 32-byte key must be supplied on every subsequent open of this
415 /// database — opening with a wrong or missing key will cause WAL replay to
416 /// fail with [`Error::EncryptionAuthFailed`].
417 ///
418 /// # Errors
419 /// Returns an error if the directory cannot be created.
420 pub fn open_encrypted(path: &Path, key: [u8; 32]) -> Result<Self> {
421 std::fs::create_dir_all(path)?;
422 let catalog = Catalog::open(path)?;
423 let wal_dir = path.join("wal");
424 let wal_writer = WalWriter::open_encrypted(&wal_dir, key)?;
425 let label_row_counts = RwLock::new(build_label_row_counts_from_disk(&catalog, path));
426 Ok(GraphDb {
427 inner: Arc::new(DbInner {
428 path: path.to_path_buf(),
429 current_txn_id: AtomicU64::new(0),
430 write_locked: AtomicBool::new(false),
431 versions: RwLock::new(VersionStore::default()),
432 node_versions: RwLock::new(NodeVersions::default()),
433 encryption_key: Some(key),
434 unique_constraints: RwLock::new(HashSet::new()),
435 prop_index: RwLock::new(sparrowdb_storage::property_index::PropertyIndex::new()),
436 catalog: RwLock::new(catalog),
437 csr_map: RwLock::new(open_csr_map(path)),
438 label_row_counts,
439 wal_writer: Mutex::new(wal_writer),
440 edge_props_cache: Arc::new(RwLock::new(HashMap::new())),
441 }),
442 })
443 }
444
445 /// Invalidate the shared property-index cache (SPA-187).
446 ///
447 /// Called after every write-transaction commit so the next read query
448 /// re-populates from the updated column files on disk.
449 fn invalidate_prop_index(&self) {
450 self.inner
451 .prop_index
452 .write()
453 .expect("prop_index RwLock poisoned")
454 .clear();
455 }
456
457 /// Refresh the shared catalog cache from disk (SPA-188) and simultaneously
458 /// rebuild the cached label row counts (SPA-190).
459 ///
460 /// Called after DDL operations and any write commit so the next read query
461 /// sees the updated schema and fresh node counts without extra disk reads.
462 fn invalidate_catalog(&self) {
463 if let Ok(fresh) = Catalog::open(&self.inner.path) {
464 // Rebuild row counts while we have the fresh catalog in hand —
465 // no extra I/O to open it again.
466 let new_counts = build_label_row_counts_from_disk(&fresh, &self.inner.path);
467 *self.inner.catalog.write().expect("catalog RwLock poisoned") = fresh;
468 *self
469 .inner
470 .label_row_counts
471 .write()
472 .expect("label_row_counts RwLock poisoned") = new_counts;
473 }
474 }
475
    /// Build a read-optimised `Engine` using all available shared caches
    /// (PropertyIndex, CSR map, label row counts) — SPA-190.
    ///
    /// All read query paths should use this instead of calling
    /// `Engine::new_with_cached_index` directly.
    ///
    /// `store`, `catalog`, and `csrs` are per-query snapshots (see
    /// `catalog_snapshot` / `cached_csr_map`); the prop-index and edge-props
    /// caches are passed as shared handles so the engine can write
    /// lazily-loaded data back (see `write_back_prop_index` in `execute`).
    fn build_read_engine(
        &self,
        store: NodeStore,
        catalog: Catalog,
        csrs: HashMap<u32, CsrForward>,
    ) -> Engine {
        Engine::new_with_all_caches(
            store,
            catalog,
            csrs,
            &self.inner.path,
            Some(&self.inner.prop_index),
            Some(self.cached_label_row_counts()),
            Some(Arc::clone(&self.inner.edge_props_cache)),
        )
    }
497
498 /// Return a clone of the cached label row counts (SPA-190).
499 fn cached_label_row_counts(&self) -> HashMap<LabelId, usize> {
500 self.inner
501 .label_row_counts
502 .read()
503 .expect("label_row_counts RwLock poisoned")
504 .clone()
505 }
506
507 /// Clone the cached catalog for use in a single query (SPA-188).
508 fn catalog_snapshot(&self) -> Catalog {
509 self.inner
510 .catalog
511 .read()
512 .expect("catalog RwLock poisoned")
513 .clone()
514 }
515
    /// Clear the entire edge-props cache (SPA-261).
    ///
    /// Invoked alongside `invalidate_csr_map` after CHECKPOINT / OPTIMIZE,
    /// which rewrite the on-disk edge files this cache mirrors.
    fn invalidate_edge_props_cache(&self) {
        self.inner
            .edge_props_cache
            .write()
            .expect("edge_props_cache poisoned")
            .clear();
    }

    /// Refresh the cached CSR forward map from disk (SPA-189).
    ///
    /// Called after CHECKPOINT / OPTIMIZE since those compact the delta log
    /// into new CSR base files. Regular writes do NOT need this because
    /// the delta log is read separately by the engine.
    ///
    /// Only replaces the cache when `try_open_csr_map` succeeds in loading the
    /// catalog; a failed reload (e.g. transient I/O error) leaves the
    /// existing in-memory map intact rather than replacing it with an empty
    /// map that would cause subsequent queries to miss all checkpointed edges.
    fn invalidate_csr_map(&self) {
        if let Ok(fresh) = try_open_csr_map(&self.inner.path) {
            *self.inner.csr_map.write().expect("csr_map RwLock poisoned") = fresh;
        }
    }
540
541 /// Clone the cached CSR map for use in a single query (SPA-189).
542 fn cached_csr_map(&self) -> HashMap<u32, CsrForward> {
543 self.inner
544 .csr_map
545 .read()
546 .expect("csr_map RwLock poisoned")
547 .clone()
548 }
549
    /// Open a read-only snapshot transaction.
    ///
    /// The returned [`ReadTx`] sees all data committed at or before the
    /// current `txn_id` at the moment of this call.
    pub fn begin_read(&self) -> Result<ReadTx> {
        // Pin the snapshot id first; property reads are filtered through the
        // version chain by `snapshot_txn_id` (see module docs on snapshot
        // isolation), so the store opened below may safely contain newer data.
        let snapshot_txn_id = self.inner.current_txn_id.load(Ordering::Acquire);
        let store = NodeStore::open(&self.inner.path)?;
        Ok(ReadTx {
            snapshot_txn_id,
            store,
            inner: Arc::clone(&self.inner),
        })
    }
563
    /// Open a write transaction.
    ///
    /// Returns [`Error::WriterBusy`] immediately if another [`WriteTx`] is
    /// already active (try-lock semantics — does not block).
    pub fn begin_write(&self) -> Result<WriteTx> {
        // Atomically acquire the write lock (false → true).
        // No unsafe transmute — WriteGuard holds Arc<DbInner> and resets
        // the AtomicBool on Drop, so the lock flag always outlives the guard.
        let guard = WriteGuard::try_acquire(&self.inner).ok_or(Error::WriterBusy)?;
        // Snapshot pinned while the lock is held: no other writer can advance
        // `current_txn_id` before this transaction finishes.
        let snapshot_txn_id = self.inner.current_txn_id.load(Ordering::Acquire);
        let store = NodeStore::open(&self.inner.path)?;
        // WriteTx opens a fresh catalog from disk because it may create labels
        // or rel types, and needs the latest on-disk state to avoid ID conflicts.
        let catalog = Catalog::open(&self.inner.path)?;
        Ok(WriteTx {
            inner: Arc::clone(&self.inner),
            store,
            catalog,
            write_buf: WriteBuffer::default(), // staged set_node_col updates
            wal_mutations: Vec::new(),         // WAL records emitted at commit
            dirty_nodes: HashSet::new(),       // touched nodes — presumably for SPA-128 conflict checks
            snapshot_txn_id,
            _guard: guard, // releases the write lock on drop
            committed: false,
            fulltext_pending: HashMap::new(),
            pending_ops: Vec::new(), // buffered structural ops (SPA-181)
        })
    }
592
593 /// Run a CHECKPOINT: fold the delta log into CSR base files, emit WAL
594 /// records, and publish the new `wal_checkpoint_lsn` to the metapage.
595 ///
596 /// Acquires the writer lock for the duration — no concurrent writes.
597 ///
598 /// Returns [`Error::WriterBusy`] immediately if a [`WriteTx`] is currently
599 /// active, rather than blocking indefinitely. This prevents deadlocks on
600 /// single-threaded callers and avoids unbounded waits on multi-threaded ones.
601 pub fn checkpoint(&self) -> Result<()> {
602 let _guard = WriteGuard::try_acquire(&self.inner).ok_or(Error::WriterBusy)?;
603 let catalog = self.catalog_snapshot();
604 let node_store = NodeStore::open(&self.inner.path)?;
605 let (rel_table_ids, n_nodes) =
606 collect_maintenance_params(&catalog, &node_store, &self.inner.path);
607 let engine = MaintenanceEngine::new(&self.inner.path);
608 engine.checkpoint(&rel_table_ids, n_nodes)?;
609 self.invalidate_csr_map();
610 self.invalidate_edge_props_cache();
611 Ok(())
612 }
613
614 /// Run an OPTIMIZE: same as CHECKPOINT but additionally sorts each source
615 /// node's neighbor list by `(dst_node_id)` ascending.
616 ///
617 /// Acquires the writer lock for the duration — no concurrent writes.
618 ///
619 /// Returns [`Error::WriterBusy`] immediately if a [`WriteTx`] is currently
620 /// active, rather than blocking indefinitely. This prevents deadlocks on
621 /// single-threaded callers and avoids unbounded waits on multi-threaded ones.
622 pub fn optimize(&self) -> Result<()> {
623 let _guard = WriteGuard::try_acquire(&self.inner).ok_or(Error::WriterBusy)?;
624 let catalog = self.catalog_snapshot();
625 let node_store = NodeStore::open(&self.inner.path)?;
626 let (rel_table_ids, n_nodes) =
627 collect_maintenance_params(&catalog, &node_store, &self.inner.path);
628 let engine = MaintenanceEngine::new(&self.inner.path);
629 engine.optimize(&rel_table_ids, n_nodes)?;
630 self.invalidate_csr_map();
631 self.invalidate_edge_props_cache();
632 Ok(())
633 }
634
635 /// Return a storage-size snapshot for this database (SPA-171).
636 ///
637 /// Pure read — no locks acquired, no writes performed.
638 ///
639 /// # Best-effort semantics
640 ///
641 /// Filesystem entries that cannot be read (missing directories, permission
642 /// errors, partially corrupt files) are silently skipped. The returned
643 /// snapshot may therefore undercount bytes or edges if the database
644 /// directory is in an inconsistent state. Callers that need exact
645 /// accounting should treat the reported values as a lower bound.
646 pub fn stats(&self) -> Result<DbStats> {
647 let db_root = &self.inner.path;
648 let catalog = self.catalog_snapshot();
649 let node_store = NodeStore::open(db_root)?;
650 let mut stats = DbStats::default();
651
652 for (label_id, label_name) in catalog.list_labels()? {
653 let lid = label_id as u32;
654 stats.node_count_per_label.insert(
655 label_name.clone(),
656 node_store.hwm_for_label(lid).unwrap_or(0),
657 );
658 let mut lb: u64 = 0;
659 if let Ok(es) = std::fs::read_dir(db_root.join("nodes").join(lid.to_string())) {
660 for e in es.flatten() {
661 if let Ok(m) = e.metadata() {
662 lb += m.len();
663 }
664 }
665 }
666 stats.bytes_per_label.insert(label_name, lb);
667 }
668
669 if let Ok(es) = std::fs::read_dir(db_root.join("wal")) {
670 for e in es.flatten() {
671 let n = e.file_name();
672 let ns = n.to_string_lossy();
673 if ns.starts_with("segment-") && ns.ends_with(".wal") {
674 if let Ok(m) = e.metadata() {
675 stats.wal_bytes += m.len();
676 }
677 }
678 }
679 }
680
681 const DR: u64 = 20; // DeltaRecord: src(8) + dst(8) + rel_id(4) = 20 bytes
682 if let Ok(ts) = std::fs::read_dir(db_root.join("edges")) {
683 for t in ts.flatten() {
684 if !t.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
685 continue;
686 }
687 let rd = t.path();
688 if let Ok(m) = std::fs::metadata(rd.join("delta.log")) {
689 stats.edge_count += m.len().checked_div(DR).unwrap_or(0);
690 }
691 let fp = rd.join("base.fwd.csr");
692 if fp.exists() {
693 if let Ok(b) = std::fs::read(&fp) {
694 if let Ok(csr) = CsrForward::decode(&b) {
695 stats.edge_count += csr.n_edges();
696 }
697 }
698 }
699 }
700 }
701
702 stats.total_bytes = dir_size_bytes(db_root);
703 Ok(stats)
704 }
705
    /// Execute a Cypher query.
    ///
    /// Read-only statements (`MATCH … RETURN`) are executed against a
    /// point-in-time snapshot. Mutation statements (`MERGE`, `MATCH … SET`,
    /// `MATCH … DELETE`) open a write transaction internally and commit on
    /// success. `CREATE` statements auto-register labels and write nodes (SPA-156).
    /// `CHECKPOINT` and `OPTIMIZE` delegate to the maintenance engine (SPA-189).
    pub fn execute(&self, cypher: &str) -> Result<QueryResult> {
        use sparrowdb_cypher::ast::Statement;
        use sparrowdb_cypher::{bind, parse};

        // Parse, then bind names against the cached catalog snapshot (SPA-188).
        let stmt = parse(cypher)?;
        let catalog_snap = self.catalog_snapshot();
        let bound = bind(stmt, &catalog_snap)?;

        // SPA-189: wire CHECKPOINT and OPTIMIZE to their real implementations
        // before entering the mutation / read-only dispatch below.
        match &bound.inner {
            Statement::Checkpoint => {
                self.checkpoint()?;
                return Ok(QueryResult::empty(vec![]));
            }
            Statement::Optimize => {
                self.optimize()?;
                return Ok(QueryResult::empty(vec![]));
            }
            Statement::CreateConstraint { label, property } => {
                return self.register_unique_constraint(label, property);
            }
            _ => {}
        }

        if Engine::is_mutation(&bound.inner) {
            // NOTE: these arms must cover every variant `Engine::is_mutation`
            // classifies as a mutation — an uncovered variant panics on the
            // `unreachable!()` below.
            match bound.inner {
                Statement::Merge(ref m) => self.execute_merge(m),
                Statement::MatchMergeRel(ref mm) => self.execute_match_merge_rel(mm),
                Statement::MatchMutate(ref mm) => self.execute_match_mutate(mm),
                Statement::MatchCreate(ref mc) => self.execute_match_create(mc),
                // Standalone CREATE with edges — must go through WriteTx so
                // create_edge can register the rel type and write the WAL.
                Statement::Create(ref c) => self.execute_create_standalone(c),
                _ => unreachable!(),
            }
        } else {
            let _span = info_span!("sparrowdb.query").entered();

            // Read path: build an engine over the shared caches (SPA-190).
            let mut engine = {
                let _open_span = info_span!("sparrowdb.open_engine").entered();
                let csrs = self.cached_csr_map();
                self.build_read_engine(NodeStore::open(&self.inner.path)?, catalog_snap, csrs)
            };

            let result = {
                let _exec_span = info_span!("sparrowdb.execute").entered();
                engine.execute_statement(bound.inner)?
            };

            // Write lazily-loaded columns back to the shared cache.
            engine.write_back_prop_index(&self.inner.prop_index);

            tracing::debug!(rows = result.rows.len(), "query complete");
            Ok(result)
        }
    }
770
771 /// Execute a Cypher query with a per-query timeout (SPA-254).
772 ///
773 /// Identical to [`execute`](Self::execute) but sets a deadline of
774 /// `Instant::now() + timeout` before dispatching to the engine. The
775 /// engine checks the deadline at the top of every hot scan / traversal
776 /// loop. If the deadline passes before the query completes,
777 /// [`Error::QueryTimeout`] is returned.
778 ///
779 /// `execute_with_timeout` only supports read-only (`MATCH … RETURN`)
780 /// statements today; mutation statements are forwarded to the existing
781 /// write-transaction code path **without** a timeout (they typically
782 /// complete in O(1) writes and are not the source of runaway queries).
783 ///
784 /// # Example
785 /// ```no_run
786 /// use sparrowdb::GraphDb;
787 /// use std::time::Duration;
788 ///
789 /// let db = GraphDb::open(std::path::Path::new("/tmp/g.sparrow")).unwrap();
790 /// let result = db.execute_with_timeout(
791 /// "MATCH (n:Person) RETURN n.name",
792 /// Duration::from_secs(5),
793 /// );
794 /// match result {
795 /// Ok(qr) => println!("{} rows", qr.rows.len()),
796 /// Err(sparrowdb::Error::QueryTimeout) => eprintln!("query timed out"),
797 /// Err(e) => eprintln!("error: {e}"),
798 /// }
799 /// ```
    pub fn execute_with_timeout(
        &self,
        cypher: &str,
        timeout: std::time::Duration,
    ) -> Result<QueryResult> {
        use sparrowdb_cypher::ast::Statement;
        use sparrowdb_cypher::{bind, parse};

        // Parse and bind against one catalog snapshot; the same snapshot is
        // reused when the read engine is built further down.
        let stmt = parse(cypher)?;
        let catalog_snap = self.catalog_snapshot();
        let bound = bind(stmt, &catalog_snap)?;

        // Delegate CHECKPOINT/OPTIMIZE immediately — no timeout needed.
        match &bound.inner {
            Statement::Checkpoint => {
                self.checkpoint()?;
                return Ok(QueryResult::empty(vec![]));
            }
            Statement::Optimize => {
                self.optimize()?;
                return Ok(QueryResult::empty(vec![]));
            }
            Statement::CreateConstraint { label, property } => {
                return self.register_unique_constraint(label, property);
            }
            _ => {}
        }

        if Engine::is_mutation(&bound.inner) {
            // Mutation statements go through the standard write-transaction
            // path; they are not the source of runaway queries.
            return match bound.inner {
                Statement::Merge(ref m) => self.execute_merge(m),
                Statement::MatchMutate(ref mm) => self.execute_match_mutate(mm),
                Statement::MatchCreate(ref mc) => self.execute_match_create(mc),
                Statement::Create(ref c) => self.execute_create_standalone(c),
                // Reachable only if `is_mutation` accepts a statement kind
                // not listed above — that would be a dispatch bug.
                _ => unreachable!(),
            };
        }

        // Read-only path. The deadline clock starts here, after parse/bind,
        // so only engine execution time counts against the caller's budget.
        let _span = info_span!("sparrowdb.query_with_timeout").entered();
        let deadline = std::time::Instant::now() + timeout;

        let mut engine = {
            let _open_span = info_span!("sparrowdb.open_engine").entered();
            let csrs = self.cached_csr_map();
            self.build_read_engine(NodeStore::open(&self.inner.path)?, catalog_snap, csrs)
                .with_deadline(deadline)
        };

        let result = {
            let _exec_span = info_span!("sparrowdb.execute").entered();
            engine.execute_statement(bound.inner)?
        };

        // Push any index columns the engine lazily loaded back into the
        // shared cache so subsequent queries can reuse them.
        engine.write_back_prop_index(&self.inner.prop_index);

        tracing::debug!(rows = result.rows.len(), "query_with_timeout complete");
        Ok(result)
    }
860
861 /// Internal: execute a standalone `CREATE (a)-[:R]->(b)` statement.
862 ///
863 /// Creates all declared nodes, then for each edge in the pattern looks up
864 /// the freshly-created node IDs by variable name and calls
865 /// `WriteTx::create_edge`. This is needed when the CREATE clause contains
866 /// relationship patterns, because edge creation must be routed through a
867 /// write transaction so the relationship type is registered in the catalog
868 /// and the edge is appended to the WAL (SPA-182).
869 fn register_unique_constraint(&self, label: &str, property: &str) -> Result<QueryResult> {
870 // SPA-234: auto-create label if absent so the constraint is registered
871 // even before any nodes of this label exist. Subsequent CREATE
872 // statements look up the label_id from the persisted catalog and compare
873 // against the same unique_constraints set.
874 //
875 // Acquire the writer lock so that catalog mutations here cannot race
876 // with an active WriteTx that also holds an open Catalog handle.
877 // Both paths derive next_label_id from their own in-memory counter, so
878 // concurrent catalog writes without this guard can assign duplicate IDs.
879 let _guard = WriteGuard::try_acquire(&self.inner).ok_or(Error::WriterBusy)?;
880 let mut catalog = sparrowdb_catalog::catalog::Catalog::open(&self.inner.path)?;
881 let label_id: u32 = match catalog.get_label(label)? {
882 Some(id) => id as u32,
883 None => {
884 let id = catalog.create_label(label)? as u32;
885 self.invalidate_catalog();
886 id
887 }
888 };
889 let col_id = sparrowdb_common::col_id_of(property);
890 self.inner
891 .unique_constraints
892 .write()
893 .unwrap()
894 .insert((label_id, col_id));
895 Ok(QueryResult::empty(vec![]))
896 }
897
    /// Internal: execute a standalone `CREATE (a)-[:R]->(b)` statement.
    ///
    /// Creates all declared nodes, then for each edge in the pattern looks up
    /// the freshly-created node IDs by variable name and calls
    /// `WriteTx::create_edge`. Edge creation is routed through a write
    /// transaction so the relationship type is registered in the catalog
    /// and the edge is appended to the WAL (SPA-182).
    ///
    /// Errors (all before commit, so nothing is persisted on failure):
    /// undeclared edge variables, reserved `__SO_` names, null/parameter/
    /// non-literal property values, and UNIQUE constraint violations.
    fn execute_create_standalone(
        &self,
        create: &sparrowdb_cypher::ast::CreateStatement,
    ) -> Result<QueryResult> {
        // Pre-flight: verify that every variable referenced by an edge is
        // declared as a named node in this CREATE clause. Doing this before
        // opening the write transaction ensures that a malformed CREATE fails
        // cleanly without leaving orphaned nodes on disk.
        let declared_vars: HashSet<&str> = create
            .nodes
            .iter()
            .filter(|n| !n.var.is_empty())
            .map(|n| n.var.as_str())
            .collect();

        for (left_var, _, right_var) in &create.edges {
            if !left_var.is_empty() && !declared_vars.contains(left_var.as_str()) {
                return Err(sparrowdb_common::Error::InvalidArgument(format!(
                    "CREATE edge references undeclared variable '{left_var}'"
                )));
            }
            if !right_var.is_empty() && !declared_vars.contains(right_var.as_str()) {
                return Err(sparrowdb_common::Error::InvalidArgument(format!(
                    "CREATE edge references undeclared variable '{right_var}'"
                )));
            }
        }

        // SPA-208: reject reserved __SO_ label prefix on nodes.
        for node in &create.nodes {
            if let Some(label) = node.labels.first() {
                if is_reserved_label(label) {
                    return Err(reserved_label_error(label));
                }
            }
        }

        // SPA-208: reject reserved __SO_ rel-type prefix on edges.
        for (_, rel_pat, _) in &create.edges {
            if is_reserved_label(&rel_pat.rel_type) {
                return Err(reserved_label_error(&rel_pat.rel_type));
            }
        }

        let mut tx = self.begin_write()?;

        // Map variable name → NodeId for all newly created nodes.
        let mut var_to_node: HashMap<String, NodeId> = HashMap::new();

        for node in &create.nodes {
            // Only the first label of the pattern is used; auto-create it
            // in the catalog if it does not exist yet.
            let label = node.labels.first().cloned().unwrap_or_default();
            let label_id: u32 = match tx.catalog.get_label(&label)? {
                Some(id) => id as u32,
                None => tx.catalog.create_label(&label)? as u32,
            };

            // Decode the pattern's property expressions; only concrete
            // literals are accepted — null, $params, and arbitrary
            // expressions are rejected with a specific error each.
            let named_props: Vec<(String, Value)> = node
                .props
                .iter()
                .map(|entry| {
                    let val = match &entry.value {
                        sparrowdb_cypher::ast::Expr::Literal(
                            sparrowdb_cypher::ast::Literal::Null,
                        ) => Err(sparrowdb_common::Error::InvalidArgument(format!(
                            "CREATE property '{}' is null; use a concrete value",
                            entry.key
                        ))),
                        sparrowdb_cypher::ast::Expr::Literal(
                            sparrowdb_cypher::ast::Literal::Param(p),
                        ) => Err(sparrowdb_common::Error::InvalidArgument(format!(
                            "CREATE property '{}' references parameter ${p}; runtime parameters are not yet supported in standalone CREATE",
                            entry.key
                        ))),
                        sparrowdb_cypher::ast::Expr::Literal(lit) => {
                            Ok(literal_to_value(lit))
                        }
                        _ => Err(sparrowdb_common::Error::InvalidArgument(format!(
                            "CREATE property '{}' must be a literal value (int, float, bool, or string)",
                            entry.key
                        ))),
                    }?;
                    Ok((entry.key.clone(), val))
                })
                .collect::<Result<Vec<_>>>()?;

            // SPA-234: enforce UNIQUE constraints before writing.
            //
            // We compare decoded Values rather than raw u64 heap pointers.
            // Strings longer than 7 bytes are stored as store-specific heap
            // pointers: the same string written by two different NodeStore
            // handles produces two different raw u64 values. Decoding each
            // stored raw u64 back to Value and comparing is correct for all
            // value types (inline integers/short strings AND heap-overflow strings).
            //
            // We check both the committed on-disk values (via check_store) AND
            // nodes already buffered in tx.pending_ops (but not yet committed).
            // Without the pending-ops check, two nodes with the same constrained
            // value in one CREATE statement would both pass the on-disk check and
            // then be committed together, violating the constraint.
            {
                let constraints = self.inner.unique_constraints.read().unwrap();
                if !constraints.is_empty() {
                    let check_store = NodeStore::open(&self.inner.path)?;
                    for (prop_name, val) in &named_props {
                        let col_id = sparrowdb_common::col_id_of(prop_name);
                        if constraints.contains(&(label_id, col_id)) {
                            // Check committed on-disk values.
                            // Propagate I/O errors — silencing them would disable
                            // the constraint check on read failure.
                            let existing_raws = check_store.read_col_all(label_id, col_id)?;
                            let conflict_on_disk = existing_raws.iter().any(|&raw| {
                                // 0 / u64::MAX are sentinel raws, never real values.
                                if raw == 0 || raw == u64::MAX {
                                    return false;
                                }
                                check_store.decode_raw_value(raw) == *val
                            });
                            // Check nodes buffered earlier in this same statement.
                            let conflict_in_tx = tx.pending_ops.iter().any(|op| match op {
                                PendingOp::NodeCreate {
                                    label_id: pending_label_id,
                                    props,
                                    ..
                                } => {
                                    *pending_label_id == label_id
                                        && props.iter().any(|(existing_col_id, existing_val)| {
                                            *existing_col_id == col_id && *existing_val == *val
                                        })
                                }
                                _ => false,
                            });
                            if conflict_on_disk || conflict_in_tx {
                                return Err(sparrowdb_common::Error::InvalidArgument(format!(
                                    "UNIQUE constraint violation: label \"{label}\" already has a node with {prop_name} = {:?}", val
                                )));
                            }
                        }
                    }
                }
            }

            let node_id = tx.create_node_named(label_id, &named_props)?;

            // Record the binding so edge patterns can resolve (src_var, dst_var).
            if !node.var.is_empty() {
                var_to_node.insert(node.var.clone(), node_id);
            }
        }

        // Create edges between the freshly-created nodes.
        for (left_var, rel_pat, right_var) in &create.edges {
            let src = var_to_node.get(left_var).copied().ok_or_else(|| {
                sparrowdb_common::Error::InvalidArgument(format!(
                    "CREATE edge references unresolved variable '{left_var}'"
                ))
            })?;
            let dst = var_to_node.get(right_var).copied().ok_or_else(|| {
                sparrowdb_common::Error::InvalidArgument(format!(
                    "CREATE edge references unresolved variable '{right_var}'"
                ))
            })?;
            // Convert rel_pat.props (AST PropEntries) to HashMap<String, Value>.
            let edge_props: HashMap<String, Value> = rel_pat
                .props
                .iter()
                .map(|pe| {
                    let val = match &pe.value {
                        sparrowdb_cypher::ast::Expr::Literal(lit) => literal_to_value(lit),
                        _ => {
                            return Err(sparrowdb_common::Error::InvalidArgument(format!(
                                "CREATE edge property '{}' must be a literal value",
                                pe.key
                            )))
                        }
                    };
                    Ok((pe.key.clone(), val))
                })
                .collect::<Result<HashMap<_, _>>>()?;
            tx.create_edge(src, dst, &rel_pat.rel_type, edge_props)?;
        }

        tx.commit()?;
        self.invalidate_prop_index();
        self.invalidate_catalog();
        Ok(QueryResult::empty(vec![]))
    }
1083
1084 /// Execute a Cypher query with runtime parameter bindings.
1085 ///
1086 /// This is the parameterised variant of [`execute`]. Parameters are
1087 /// supplied as a `HashMap<String, sparrowdb_execution::Value>` and are
1088 /// available inside the query as `$name` expressions.
1089 ///
1090 /// # Example
1091 ///
1092 /// ```no_run
1093 /// use sparrowdb::GraphDb;
1094 /// use sparrowdb_execution::Value;
1095 /// use std::collections::HashMap;
1096 ///
1097 /// let db = GraphDb::open(std::path::Path::new("/tmp/my.sparrow")).unwrap();
1098 /// let mut params = HashMap::new();
1099 /// params.insert("names".into(), Value::List(vec![
1100 /// Value::String("Alice".into()),
1101 /// Value::String("Bob".into()),
1102 /// ]));
1103 /// let result = db.execute_with_params("UNWIND $names AS name RETURN name", params).unwrap();
1104 /// ```
1105 pub fn execute_with_params(
1106 &self,
1107 cypher: &str,
1108 params: HashMap<String, sparrowdb_execution::Value>,
1109 ) -> Result<QueryResult> {
1110 use sparrowdb_cypher::{bind, parse};
1111
1112 let stmt = parse(cypher)?;
1113 let catalog_snap = self.catalog_snapshot();
1114 let bound = bind(stmt, &catalog_snap)?;
1115
1116 use sparrowdb_cypher::ast::Statement;
1117 if let Statement::CreateConstraint { label, property } = &bound.inner {
1118 return self.register_unique_constraint(label, property);
1119 }
1120 if Engine::is_mutation(&bound.inner) {
1121 // Route mutations through params-aware helpers (SPA-218).
1122 use sparrowdb_cypher::ast::Statement as Stmt;
1123 return match bound.inner {
1124 Stmt::Merge(ref m) => self.execute_merge_with_params(m, ¶ms),
1125 Stmt::MatchMutate(ref mm) => self.execute_match_mutate_with_params(mm, ¶ms),
1126 _ => Err(Error::InvalidArgument(
1127 "execute_with_params: parameterized MATCH...CREATE and standalone CREATE \
1128 are not yet supported; use MERGE or MATCH...SET with $params"
1129 .into(),
1130 )),
1131 };
1132 }
1133
1134 let _span = info_span!("sparrowdb.query_with_params").entered();
1135
1136 let mut engine = {
1137 let _open_span = info_span!("sparrowdb.open_engine").entered();
1138 let csrs = self.cached_csr_map();
1139 self.build_read_engine(NodeStore::open(&self.inner.path)?, catalog_snap, csrs)
1140 .with_params(params)
1141 };
1142
1143 let result = {
1144 let _exec_span = info_span!("sparrowdb.execute").entered();
1145 engine.execute_statement(bound.inner)?
1146 };
1147
1148 engine.write_back_prop_index(&self.inner.prop_index);
1149
1150 tracing::debug!(rows = result.rows.len(), "query_with_params complete");
1151 Ok(result)
1152 }
1153
1154 /// Internal: execute a MERGE statement by opening a write transaction.
1155 fn execute_merge(&self, m: &sparrowdb_cypher::ast::MergeStatement) -> Result<QueryResult> {
1156 let props: HashMap<String, Value> = m
1157 .props
1158 .iter()
1159 .map(|pe| (pe.key.clone(), expr_to_value(&pe.value)))
1160 .collect();
1161 let mut tx = self.begin_write()?;
1162 let node_id = tx.merge_node(&m.label, props.clone())?;
1163 tx.commit()?;
1164 self.invalidate_prop_index();
1165 self.invalidate_catalog();
1166
1167 // If the statement has a RETURN clause, project the merged node's properties.
1168 if let Some(ref ret) = m.return_clause {
1169 use sparrowdb_cypher::ast::Expr;
1170 // Use the execution-layer Value type for building QueryResult rows.
1171 type ExecValue = sparrowdb_execution::Value;
1172
1173 let var = if m.var.is_empty() {
1174 "n"
1175 } else {
1176 m.var.as_str()
1177 };
1178
1179 // Collect all property names referenced in the RETURN clause so we
1180 // can look them up by col_id from the actual on-disk node state.
1181 // This is correct even when the node already existed with extra
1182 // properties not present in the merge pattern (SPA-215 / CodeAnt bug).
1183 let return_props: Vec<String> = ret
1184 .items
1185 .iter()
1186 .filter_map(|item| match &item.expr {
1187 Expr::PropAccess { var: v, prop } if v.as_str() == var => Some(prop.clone()),
1188 _ => None,
1189 })
1190 .collect();
1191
1192 // Derive col_ids for every property name referenced in RETURN.
1193 let return_col_ids: Vec<u32> =
1194 return_props.iter().map(|name| fnv1a_col_id(name)).collect();
1195
1196 // Read the actual on-disk node state via NodeStore::get_node, which
1197 // uses Value::from_u64 (type-tag-aware decoding) to correctly
1198 // reconstruct Bytes/String values rather than misinterpreting them as
1199 // raw integers. We open a fresh NodeStore after the write committed
1200 // so we see the fully-merged node state.
1201 let store = NodeStore::open(&self.inner.path)?;
1202 let stored = store.get_node(node_id, &return_col_ids).unwrap_or_default();
1203
1204 // Build an eval map: "{var}.{prop_name}" -> ExecValue from the
1205 // actual stored values rather than the input pattern props.
1206 let mut row_vals: HashMap<String, ExecValue> = HashMap::new();
1207 for (prop_name, col_id) in return_props.iter().zip(return_col_ids.iter()) {
1208 if let Some((_, val)) = stored.iter().find(|(c, _)| c == col_id) {
1209 let exec_val = storage_value_to_exec(val);
1210 row_vals.insert(format!("{var}.{prop_name}"), exec_val);
1211 }
1212 }
1213
1214 // Derive column names from the RETURN items.
1215 let columns: Vec<String> = ret
1216 .items
1217 .iter()
1218 .map(|item| {
1219 item.alias.clone().unwrap_or_else(|| match &item.expr {
1220 Expr::PropAccess { var: v, prop } => format!("{v}.{prop}"),
1221 Expr::Var(v) => v.clone(),
1222 _ => "?".to_string(),
1223 })
1224 })
1225 .collect();
1226
1227 // Evaluate each RETURN expression using the actual-state prop map.
1228 let row: Vec<ExecValue> = ret
1229 .items
1230 .iter()
1231 .map(|item| eval_expr_merge(&item.expr, &row_vals))
1232 .collect();
1233
1234 return Ok(QueryResult {
1235 columns,
1236 rows: vec![row],
1237 });
1238 }
1239
1240 Ok(QueryResult::empty(vec![]))
1241 }
1242
1243 /// Params-aware MERGE with $param support (SPA-218).
1244 fn execute_merge_with_params(
1245 &self,
1246 m: &sparrowdb_cypher::ast::MergeStatement,
1247 params: &HashMap<String, sparrowdb_execution::Value>,
1248 ) -> Result<QueryResult> {
1249 let props: HashMap<String, Value> = m
1250 .props
1251 .iter()
1252 .map(|pe| {
1253 let val = expr_to_value_with_params(&pe.value, params)?;
1254 Ok((pe.key.clone(), val))
1255 })
1256 .collect::<Result<HashMap<_, _>>>()?;
1257 let mut tx = self.begin_write()?;
1258 let node_id = tx.merge_node(&m.label, props.clone())?;
1259 tx.commit()?;
1260 self.invalidate_prop_index();
1261 self.invalidate_catalog();
1262 if let Some(ref ret) = m.return_clause {
1263 use sparrowdb_cypher::ast::Expr;
1264 type ExecValue = sparrowdb_execution::Value;
1265 let var = if m.var.is_empty() {
1266 "n"
1267 } else {
1268 m.var.as_str()
1269 };
1270 let return_props: Vec<String> = ret
1271 .items
1272 .iter()
1273 .filter_map(|item| match &item.expr {
1274 Expr::PropAccess { var: v, prop } if v.as_str() == var => Some(prop.clone()),
1275 _ => None,
1276 })
1277 .collect();
1278 let return_col_ids: Vec<u32> =
1279 return_props.iter().map(|name| fnv1a_col_id(name)).collect();
1280 let store = NodeStore::open(&self.inner.path)?;
1281 let stored = store.get_node(node_id, &return_col_ids).unwrap_or_default();
1282 let mut row_vals: HashMap<String, ExecValue> = HashMap::new();
1283 for (prop_name, col_id) in return_props.iter().zip(return_col_ids.iter()) {
1284 if let Some((_, val)) = stored.iter().find(|(c, _)| c == col_id) {
1285 row_vals.insert(format!("{var}.{prop_name}"), storage_value_to_exec(val));
1286 }
1287 }
1288 let columns: Vec<String> = ret
1289 .items
1290 .iter()
1291 .map(|item| {
1292 item.alias.clone().unwrap_or_else(|| match &item.expr {
1293 Expr::PropAccess { var: v, prop } => format!("{v}.{prop}"),
1294 Expr::Var(v) => v.clone(),
1295 _ => "?".to_string(),
1296 })
1297 })
1298 .collect();
1299 let row: Vec<ExecValue> = ret
1300 .items
1301 .iter()
1302 .map(|item| eval_expr_merge(&item.expr, &row_vals))
1303 .collect();
1304 return Ok(QueryResult {
1305 columns,
1306 rows: vec![row],
1307 });
1308 }
1309 Ok(QueryResult::empty(vec![]))
1310 }
1311
    /// Params-aware MATCH...SET with $param support (SPA-218).
    ///
    /// Mirrors [`execute_match_mutate`]: the write lock is acquired **before**
    /// the scan so no concurrent writer can commit between the scan and the
    /// mutations; `$param` values in SET are resolved from `params`.
    fn execute_match_mutate_with_params(
        &self,
        mm: &sparrowdb_cypher::ast::MatchMutateStatement,
        params: &HashMap<String, sparrowdb_execution::Value>,
    ) -> Result<QueryResult> {
        // Write lock first — holds off concurrent writers until commit/drop.
        let mut tx = self.begin_write()?;
        // Scan engine over the same on-disk snapshot the transaction was
        // opened against; the held write lock keeps it unchanged.
        let csrs = self.cached_csr_map();
        let engine = Engine::new(
            NodeStore::open(&self.inner.path)?,
            self.catalog_snapshot(),
            csrs,
            &self.inner.path,
        );

        // SPA-219: `MATCH (a)-[r:REL]->(b) DELETE r` — edge delete path.
        if is_edge_delete_mutation(mm) {
            let edges = engine.scan_match_mutate_edges(mm)?;
            for (src, dst, rel_type) in edges {
                tx.delete_edge(src, dst, &rel_type)?;
            }
            tx.commit()?;
            // Edges changed → the cached CSRs are stale as well.
            self.invalidate_csr_map();
            self.invalidate_prop_index();
            self.invalidate_catalog();
            return Ok(QueryResult::empty(vec![]));
        }

        let matching_ids = engine.scan_match_mutate(mm)?;
        if matching_ids.is_empty() {
            // Nothing matched: `tx` is dropped without commit (no ops buffered).
            return Ok(QueryResult::empty(vec![]));
        }
        match &mm.mutation {
            sparrowdb_cypher::ast::Mutation::Set { prop, value, .. } => {
                // Resolve the SET value (possibly a $param) once, apply to all.
                let sv = expr_to_value_with_params(value, params)?;
                for node_id in matching_ids {
                    tx.set_property(node_id, prop, sv.clone())?;
                }
            }
            sparrowdb_cypher::ast::Mutation::Delete { .. } => {
                for node_id in matching_ids {
                    tx.delete_node(node_id)?;
                }
            }
        }
        tx.commit()?;
        self.invalidate_prop_index();
        self.invalidate_catalog();
        Ok(QueryResult::empty(vec![]))
    }
1362
1363 /// Internal: execute a MATCH … SET / DELETE by scanning then writing.
1364 ///
1365 /// The write lock is acquired **before** the scan so that no concurrent
1366 /// writer can commit between the scan and the mutation, preventing stale
1367 /// matches from being mutated.
1368 fn execute_match_mutate(
1369 &self,
1370 mm: &sparrowdb_cypher::ast::MatchMutateStatement,
1371 ) -> Result<QueryResult> {
1372 // Acquire the write lock first. From this point on no other writer
1373 // can commit until we call tx.commit() or drop tx.
1374 let mut tx = self.begin_write()?;
1375
1376 // Build an Engine that reads from the same on-disk snapshot the write
1377 // transaction was opened against. Because we hold the write lock,
1378 // the data on disk cannot change between this scan and the mutations
1379 // below.
1380 let csrs = self.cached_csr_map();
1381 let engine = Engine::new(
1382 NodeStore::open(&self.inner.path)?,
1383 self.catalog_snapshot(),
1384 csrs,
1385 &self.inner.path,
1386 );
1387
1388 // SPA-219: `MATCH (a)-[r:REL]->(b) DELETE r` — edge delete path.
1389 if is_edge_delete_mutation(mm) {
1390 let edges = engine.scan_match_mutate_edges(mm)?;
1391 for (src, dst, rel_type) in edges {
1392 tx.delete_edge(src, dst, &rel_type)?;
1393 }
1394 tx.commit()?;
1395 self.invalidate_csr_map();
1396 self.invalidate_prop_index();
1397 self.invalidate_catalog();
1398 return Ok(QueryResult::empty(vec![]));
1399 }
1400
1401 // Collect matching node ids via the engine's scan (lock already held).
1402 let matching_ids = engine.scan_match_mutate(mm)?;
1403
1404 if matching_ids.is_empty() {
1405 return Ok(QueryResult::empty(vec![]));
1406 }
1407
1408 match &mm.mutation {
1409 sparrowdb_cypher::ast::Mutation::Set { prop, value, .. } => {
1410 let sv = expr_to_value(value);
1411 for node_id in matching_ids {
1412 tx.set_property(node_id, prop, sv.clone())?;
1413 }
1414 }
1415 sparrowdb_cypher::ast::Mutation::Delete { .. } => {
1416 for node_id in matching_ids {
1417 tx.delete_node(node_id)?;
1418 }
1419 }
1420 }
1421
1422 tx.commit()?;
1423 self.invalidate_prop_index();
1424 self.invalidate_catalog();
1425 Ok(QueryResult::empty(vec![]))
1426 }
1427
    /// Internal: execute a `MATCH … CREATE (a)-[:R]->(b)` statement.
    ///
    /// 1. Acquires the write lock (preventing concurrent writes).
    /// 2. Opens a read-capable Engine to execute the MATCH clause, producing
    ///    one correlated binding row per match result (`var → NodeId`).
    /// 3. For each result row, calls `WriteTx::create_edge` using the exact
    ///    node IDs bound by that row — never re-scans or builds a Cartesian
    ///    product across all node candidates (SPA-183).
    /// 4. Commits the write transaction.
    ///
    /// If the MATCH finds no rows the CREATE is a no-op (no edges created,
    /// no error — SPA-168).
    fn execute_match_create(
        &self,
        mc: &sparrowdb_cypher::ast::MatchCreateStatement,
    ) -> Result<QueryResult> {
        // Acquire the write lock first so no concurrent writer can commit
        // between the scan and the edge creation.
        let mut tx = self.begin_write()?;

        // Build an Engine for the read-scan phase.
        //
        // Use build_read_engine so that the lazy property index and cached
        // label row counts are reused. MATCH…CREATE only creates edges — it
        // never changes node property columns — so the cached data remains
        // valid across calls.
        let csrs = self.cached_csr_map();
        let engine = self.build_read_engine(
            NodeStore::open(&self.inner.path)?,
            self.catalog_snapshot(),
            csrs,
        );

        // SPA-208: reject reserved __SO_ rel-type prefix on edges in the CREATE clause.
        // Check before the Unimplemented guard so the reserved-name error takes priority.
        for (_, rel_pat, _) in &mc.create.edges {
            if is_reserved_label(&rel_pat.rel_type) {
                return Err(reserved_label_error(&rel_pat.rel_type));
            }
        }

        // MATCH…CREATE only supports edge creation from matched bindings.
        // Standalone node creation (CREATE clause with no edges) is not yet
        // implemented. Note: the parser includes edge-endpoint nodes in
        // mc.create.nodes even for pure edge patterns, so we guard on
        // mc.create.edges being empty rather than mc.create.nodes being
        // non-empty (SPA-183 had the wrong guard).
        if mc.create.edges.is_empty() {
            return Err(Error::Unimplemented);
        }

        // Execute the MATCH clause to get correlated binding rows.
        // Each row is a HashMap<variable_name, NodeId> representing one
        // matched combination. This replaces the old `scan_match_create`
        // approach which collected candidates per variable independently and
        // then took a full Cartesian product — causing N² edge creation when
        // multiple nodes of the same label existed (SPA-183).

        let matched_rows = engine.scan_match_create_rows(mc)?;

        // Write any newly-loaded index columns back to the shared cache so
        // subsequent MATCH…CREATE calls can reuse them without re-reading disk.
        // Node property columns are NOT changed by this operation (only edges
        // are created), so the cache remains valid.
        engine.write_back_prop_index(&self.inner.prop_index);

        if matched_rows.is_empty() {
            // No matches → no-op, per the SPA-168 contract in the doc above.
            return Ok(QueryResult::empty(vec![]));
        }

        // For each matched row, create every edge declared in the CREATE clause
        // using the NodeIds bound in that specific row.
        for row in &matched_rows {
            for (left_var, rel_pat, right_var) in &mc.create.edges {
                let src = row.get(left_var).copied().ok_or_else(|| {
                    Error::InvalidArgument(format!(
                        "CREATE references unbound variable: {left_var}"
                    ))
                })?;
                let dst = row.get(right_var).copied().ok_or_else(|| {
                    Error::InvalidArgument(format!(
                        "CREATE references unbound variable: {right_var}"
                    ))
                })?;
                // Convert rel_pat.props (AST PropEntries) to HashMap<String, Value>.
                // Only literal edge-property values are accepted.
                let edge_props: HashMap<String, Value> = rel_pat
                    .props
                    .iter()
                    .map(|pe| {
                        let val = match &pe.value {
                            sparrowdb_cypher::ast::Expr::Literal(lit) => literal_to_value(lit),
                            _ => {
                                return Err(Error::InvalidArgument(format!(
                                    "CREATE edge property '{}' must be a literal value",
                                    pe.key
                                )))
                            }
                        };
                        Ok((pe.key.clone(), val))
                    })
                    .collect::<Result<HashMap<_, _>>>()?;
                tx.create_edge(src, dst, &rel_pat.rel_type, edge_props)?;
            }
        }

        tx.commit()?;
        // Note: do NOT call invalidate_prop_index() here — MATCH…CREATE only
        // creates edges, not node properties, so the cached column data remains
        // valid. Only invalidate the catalog in case a new rel-type was registered.
        self.invalidate_catalog();
        Ok(QueryResult::empty(vec![]))
    }
1539
    /// Execute `MATCH … MERGE (a)-[r:TYPE]->(b)`: find-or-create a relationship.
    ///
    /// Algorithm (SPA-233):
    /// 1. MATCH the node patterns to get correlated (src, dst) NodeId pairs.
    /// 2. For each pair, look up the rel-type in the catalog. If the rel table
    ///    does not exist yet, the relationship cannot exist → create it.
    /// 3. If the rel table exists, scan the delta log and (if checkpointed) the
    ///    CSR forward file to check whether a (src_slot, dst_slot) edge already
    ///    exists for this rel type.
    /// 4. If no existing edge is found, call `create_edge` to create it.
    ///
    /// The write lock is acquired before the scan so that no concurrent writer
    /// can race between the existence check and the edge creation.
    fn execute_match_merge_rel(
        &self,
        mm: &sparrowdb_cypher::ast::MatchMergeRelStatement,
    ) -> Result<QueryResult> {
        use sparrowdb_storage::edge_store::EdgeStore;

        // Acquire the write lock first.
        let mut tx = self.begin_write()?;

        // Build an Engine for the read-scan phase.
        let csrs = self.cached_csr_map();
        let engine = Engine::new(
            NodeStore::open(&self.inner.path)?,
            self.catalog_snapshot(),
            csrs,
            &self.inner.path,
        );

        // Guard: reject reserved rel-type prefix used by internal system edges.
        if mm.rel_type.starts_with("__SO_") {
            return Err(Error::InvalidArgument(format!(
                "relationship type '{}' is reserved",
                mm.rel_type
            )));
        }

        // Scan MATCH patterns to get correlated (src_var, dst_var) NodeId rows.
        let matched_rows = engine.scan_match_merge_rel_rows(mm)?;
        if matched_rows.is_empty() {
            // No matched nodes → nothing to merge; this is a no-op (not an error).
            return Ok(QueryResult::empty(vec![]));
        }

        for row in &matched_rows {
            // Resolve the bound node IDs for this row.
            let src = if mm.src_var.is_empty() {
                // Anonymous source — cannot merge without a variable binding.
                return Err(Error::InvalidArgument(
                    "MERGE relationship pattern source variable is anonymous; \
                     use a named variable bound by the MATCH clause"
                        .into(),
                ));
            } else {
                *row.get(&mm.src_var).ok_or_else(|| {
                    Error::InvalidArgument(format!(
                        "MERGE references unbound source variable: {}",
                        mm.src_var
                    ))
                })?
            };

            let dst = if mm.dst_var.is_empty() {
                return Err(Error::InvalidArgument(
                    "MERGE relationship pattern destination variable is anonymous; \
                     use a named variable bound by the MATCH clause"
                        .into(),
                ));
            } else {
                *row.get(&mm.dst_var).ok_or_else(|| {
                    Error::InvalidArgument(format!(
                        "MERGE references unbound destination variable: {}",
                        mm.dst_var
                    ))
                })?
            };

            // Derive label IDs from the packed NodeIds: the shifts/masks below
            // treat the upper 32 bits as the label and the lower 32 bits as
            // the row slot.
            // NOTE(review): the `as u16` casts assume label ids fit in 16
            // bits — confirm against the catalog's label-id range.
            let src_label_id = (src.0 >> 32) as u16;
            let dst_label_id = (dst.0 >> 32) as u16;
            let src_slot = src.0 & 0xFFFF_FFFF;
            let dst_slot = dst.0 & 0xFFFF_FFFF;

            // Look up whether a rel table for this type already exists in the catalog.
            // `get_rel_table` returns `Ok(None)` if no such table is registered yet,
            // meaning no edge of this type can exist.
            let catalog_rel_id_opt =
                tx.catalog
                    .get_rel_table(src_label_id, dst_label_id, &mm.rel_type)?;

            let edge_already_exists = if let Some(catalog_rel_id) = catalog_rel_id_opt {
                let rel_table_id = RelTableId(catalog_rel_id as u32);

                if let Ok(store) = EdgeStore::open(&self.inner.path, rel_table_id) {
                    // Check delta log.
                    let in_delta = store
                        .read_delta()?
                        .iter()
                        .any(|rec| rec.src.0 == src.0 && rec.dst.0 == dst.0);

                    // Check CSR base (post-checkpoint edges).
                    let in_csr = if let Ok(csr) = store.open_fwd() {
                        csr.neighbors(src_slot).contains(&dst_slot)
                    } else {
                        false
                    };

                    // Also check pending (uncommitted in this same tx) edge creates,
                    // so two identical pairs in one statement merge to one edge.
                    let in_pending = tx.pending_ops.iter().any(|op| {
                        matches!(
                            op,
                            PendingOp::EdgeCreate {
                                src: ps,
                                dst: pd,
                                rel_table_id: prt,
                                ..
                            } if *ps == src && *pd == dst && *prt == rel_table_id
                        )
                    });

                    in_delta || in_csr || in_pending
                } else {
                    false
                }
            } else {
                false
            };

            if !edge_already_exists {
                tx.create_edge(src, dst, &mm.rel_type, HashMap::new())?;
            }
        }

        tx.commit()?;
        self.invalidate_prop_index();
        self.invalidate_catalog();
        Ok(QueryResult::empty(vec![]))
    }
1680
1681 /// Create (or overwrite) a named full-text index.
1682 ///
1683 /// Creates the on-disk backing file for the named index so that
1684 /// `CALL db.index.fulltext.queryNodes(name, query)` can find it.
1685 /// If an index with the same name already exists its contents are cleared
1686 /// (overwrite semantics). Document ingestion happens separately via
1687 /// [`WriteTx::add_to_fulltext_index`].
1688 ///
1689 /// Acquires the writer lock — returns [`Error::WriterBusy`] if a
1690 /// [`WriteTx`] is already active.
1691 ///
1692 /// # Example
1693 /// ```no_run
1694 /// # use sparrowdb::GraphDb;
1695 /// # let db = GraphDb::open(std::path::Path::new("/tmp/test")).unwrap();
1696 /// db.create_fulltext_index("searchIndex")?;
1697 /// # Ok::<(), sparrowdb_common::Error>(())
1698 /// ```
1699 pub fn create_fulltext_index(&self, name: &str) -> Result<()> {
1700 use sparrowdb_storage::fulltext_index::FulltextIndex;
1701 // Acquire the writer lock so this cannot race with an active WriteTx
1702 // that is reading or flushing the same index file.
1703 let _guard = WriteGuard::try_acquire(&self.inner).ok_or(Error::WriterBusy)?;
1704 FulltextIndex::create(&self.inner.path, name)?;
1705 Ok(())
1706 }
1707
1708 /// Execute multiple Cypher queries as a single atomic batch.
1709 ///
1710 /// All queries share one [`WriteTx`] and a **single WAL fsync** at the end,
1711 /// making this significantly faster than N individual [`execute`] calls when
1712 /// ingesting many nodes (e.g. Neo4j import, bulk data load).
1713 ///
1714 /// ## Semantics
1715 ///
1716 /// * **Atomicity** — if any query in the batch fails, the entire batch is
1717 /// rolled back (the [`WriteTx`] is dropped without committing).
1718 /// * **Ordering** — queries are executed in slice order; each query sees the
1719 /// mutations of all preceding queries in the batch.
1720 /// * **Single fsync** — the WAL is synced exactly once, after the last
1721 /// query in the batch has been applied successfully.
1722 ///
1723 /// ## Restrictions
1724 ///
1725 /// * Only **write** statements are accepted (CREATE, MERGE, MATCH…SET,
1726 /// MATCH…DELETE, MATCH…CREATE). Read-only `MATCH … RETURN` queries are
1727 /// allowed but their results are included in the returned `Vec`.
1728 /// * `CHECKPOINT` and `OPTIMIZE` inside a batch are executed immediately
1729 /// and do **not** participate in the shared transaction; they act as
1730 /// no-ops for atomicity purposes.
1731 /// * Parameters are not supported — use [`execute_with_params`] for that.
1732 ///
1733 /// ## Example
1734 ///
1735 /// ```no_run
1736 /// use sparrowdb::GraphDb;
1737 ///
1738 /// let db = GraphDb::open(std::path::Path::new("/tmp/batch.sparrow")).unwrap();
1739 ///
1740 /// let queries: Vec<&str> = (0..100)
1741 /// .map(|i| Box::leak(format!("CREATE (n:Person {{id: {i}}})", i = i).into_boxed_str()) as &str)
1742 /// .collect();
1743 ///
1744 /// let results = db.execute_batch(&queries).unwrap();
1745 /// assert_eq!(results.len(), 100);
1746 /// ```
1747 ///
1748 /// [`execute`]: GraphDb::execute
1749 /// [`execute_with_params`]: GraphDb::execute_with_params
1750 pub fn execute_batch(&self, queries: &[&str]) -> Result<Vec<QueryResult>> {
1751 use sparrowdb_cypher::ast::Statement;
1752 use sparrowdb_cypher::{bind, parse};
1753
1754 if queries.is_empty() {
1755 return Ok(Vec::new());
1756 }
1757
1758 // Pre-parse and bind all queries before acquiring the writer lock.
1759 // A syntax or bind error fails fast without ever locking.
1760 let catalog_snap = self.catalog_snapshot();
1761 let bound_stmts: Vec<_> = queries
1762 .iter()
1763 .map(|q| {
1764 let stmt = parse(q)?;
1765 bind(stmt, &catalog_snap)
1766 })
1767 .collect::<Result<Vec<_>>>()?;
1768
1769 // CHECKPOINT and OPTIMIZE cannot run inside a WriteTx (they acquire
1770 // the writer lock themselves). Execute them eagerly in document order
1771 // and record placeholder results; they do not participate in the
1772 // shared transaction's atomicity.
1773 let mut early_results: Vec<Option<QueryResult>> = vec![None; bound_stmts.len()];
1774 for (i, bound) in bound_stmts.iter().enumerate() {
1775 match &bound.inner {
1776 Statement::Checkpoint => {
1777 self.checkpoint()?;
1778 early_results[i] = Some(QueryResult::empty(vec![]));
1779 }
1780 Statement::Optimize => {
1781 self.optimize()?;
1782 early_results[i] = Some(QueryResult::empty(vec![]));
1783 }
1784 Statement::CreateConstraint { label, property } => {
1785 early_results[i] = Some(self.register_unique_constraint(label, property)?);
1786 }
1787 _ => {}
1788 }
1789 }
1790
1791 // Acquire a single WriteTx for all structural + property mutations.
1792 let mut tx = self.begin_write()?;
1793 let mut results: Vec<Option<QueryResult>> = vec![None; bound_stmts.len()];
1794
1795 for (i, bound) in bound_stmts.into_iter().enumerate() {
1796 if early_results[i].is_some() {
1797 continue;
1798 }
1799
1800 let result = if Engine::is_mutation(&bound.inner) {
1801 self.execute_batch_mutation(bound.inner, &mut tx)?
1802 } else {
1803 // Read-only statement: execute against current snapshot.
1804 let csrs = self.cached_csr_map();
1805 let batch_catalog = self.catalog_snapshot();
1806 let mut engine = Engine::new(
1807 NodeStore::open(&self.inner.path)?,
1808 batch_catalog,
1809 csrs,
1810 &self.inner.path,
1811 );
1812 engine.execute_statement(bound.inner)?
1813 };
1814 results[i] = Some(result);
1815 }
1816
1817 // Single fsync commit — all mutations land in one WAL transaction.
1818 tx.commit()?;
1819 self.invalidate_prop_index();
1820 self.invalidate_catalog();
1821
1822 // Merge early (CHECKPOINT/OPTIMIZE) and mutation results in order.
1823 let final_results = results
1824 .into_iter()
1825 .enumerate()
1826 .map(|(i, r)| {
1827 r.or_else(|| early_results[i].take())
1828 .unwrap_or_else(|| QueryResult::empty(vec![]))
1829 })
1830 .collect();
1831
1832 Ok(final_results)
1833 }
1834
    /// Internal: apply a single mutation statement to an already-open [`WriteTx`].
    ///
    /// Called by [`execute_batch`] to accumulate multiple mutations into one
    /// transaction (and therefore one WAL fsync on commit).
    ///
    /// Handles `CREATE`, `MERGE`, `MATCH … SET/DELETE`, `MATCH … CREATE`, and
    /// `MATCH … MERGE` (relationship); any other statement kind returns
    /// [`Error::Unimplemented`]. MATCH-based arms scan the committed on-disk
    /// state via a fresh [`Engine`], so nodes/edges staged earlier in the same
    /// batch are generally not visible to later MATCH clauses (see the
    /// per-arm notes below).
    ///
    /// [`execute_batch`]: GraphDb::execute_batch
    fn execute_batch_mutation(
        &self,
        stmt: sparrowdb_cypher::ast::Statement,
        tx: &mut WriteTx,
    ) -> Result<QueryResult> {
        use sparrowdb_cypher::ast::Statement;

        match stmt {
            Statement::Create(ref c) => {
                // Inline the logic from execute_create_standalone using the
                // shared tx rather than opening a new WriteTx.
                //
                // Validation pass: every edge endpoint must name a variable
                // declared by a node pattern, and no reserved labels/rel types
                // may be used. All checks run before anything is staged.
                let declared_vars: HashSet<&str> = c
                    .nodes
                    .iter()
                    .filter(|n| !n.var.is_empty())
                    .map(|n| n.var.as_str())
                    .collect();
                for (left_var, _, right_var) in &c.edges {
                    if !left_var.is_empty() && !declared_vars.contains(left_var.as_str()) {
                        return Err(sparrowdb_common::Error::InvalidArgument(format!(
                            "CREATE edge references undeclared variable '{left_var}'"
                        )));
                    }
                    if !right_var.is_empty() && !declared_vars.contains(right_var.as_str()) {
                        return Err(sparrowdb_common::Error::InvalidArgument(format!(
                            "CREATE edge references undeclared variable '{right_var}'"
                        )));
                    }
                }
                for node in &c.nodes {
                    // Only the first label is checked — multi-label nodes
                    // appear to collapse to their first label here and below.
                    if let Some(label) = node.labels.first() {
                        if is_reserved_label(label) {
                            return Err(reserved_label_error(label));
                        }
                    }
                }
                for (_, rel_pat, _) in &c.edges {
                    if is_reserved_label(&rel_pat.rel_type) {
                        return Err(reserved_label_error(&rel_pat.rel_type));
                    }
                }

                // Stage every node, remembering var → NodeId bindings so the
                // edge pass below can resolve endpoints.
                let mut var_to_node: HashMap<String, NodeId> = HashMap::new();
                for node in &c.nodes {
                    // Unlabeled nodes get the empty-string label.
                    let label = node.labels.first().cloned().unwrap_or_default();
                    // Find-or-create the label id through the tx's catalog.
                    let label_id: u32 = match tx.catalog.get_label(&label)? {
                        Some(id) => id as u32,
                        None => tx.catalog.create_label(&label)? as u32,
                    };
                    // Property values must be non-null, non-parameter literals;
                    // anything else aborts the whole batch.
                    let named_props: Vec<(String, Value)> = node
                        .props
                        .iter()
                        .map(|entry| {
                            let val = match &entry.value {
                                sparrowdb_cypher::ast::Expr::Literal(
                                    sparrowdb_cypher::ast::Literal::Null,
                                ) => Err(sparrowdb_common::Error::InvalidArgument(format!(
                                    "CREATE property '{}' is null",
                                    entry.key
                                ))),
                                sparrowdb_cypher::ast::Expr::Literal(
                                    sparrowdb_cypher::ast::Literal::Param(p),
                                ) => Err(sparrowdb_common::Error::InvalidArgument(format!(
                                    "CREATE property '{}' references parameter ${p}",
                                    entry.key
                                ))),
                                sparrowdb_cypher::ast::Expr::Literal(lit) => {
                                    Ok(literal_to_value(lit))
                                }
                                _ => Err(sparrowdb_common::Error::InvalidArgument(format!(
                                    "CREATE property '{}' must be a literal",
                                    entry.key
                                ))),
                            }?;
                            Ok((entry.key.clone(), val))
                        })
                        .collect::<Result<Vec<_>>>()?;
                    let node_id = tx.create_node_named(label_id, &named_props)?;
                    if !node.var.is_empty() {
                        var_to_node.insert(node.var.clone(), node_id);
                    }
                }
                // Edge pass: endpoints resolve against the bindings made above.
                for (left_var, rel_pat, right_var) in &c.edges {
                    let src = var_to_node.get(left_var).copied().ok_or_else(|| {
                        sparrowdb_common::Error::InvalidArgument(format!(
                            "CREATE edge references unresolved variable '{left_var}'"
                        ))
                    })?;
                    let dst = var_to_node.get(right_var).copied().ok_or_else(|| {
                        sparrowdb_common::Error::InvalidArgument(format!(
                            "CREATE edge references unresolved variable '{right_var}'"
                        ))
                    })?;
                    tx.create_edge(src, dst, &rel_pat.rel_type, HashMap::new())?;
                }
                Ok(QueryResult::empty(vec![]))
            }

            Statement::Merge(ref m) => {
                // Plain MERGE: delegate find-or-create to the transaction.
                let props: HashMap<String, Value> = m
                    .props
                    .iter()
                    .map(|pe| (pe.key.clone(), expr_to_value(&pe.value)))
                    .collect();
                tx.merge_node(&m.label, props)?;
                Ok(QueryResult::empty(vec![]))
            }

            Statement::MatchMutate(ref mm) => {
                // Scan runs against committed storage; the mutation itself is
                // staged into the shared tx.
                let csrs = self.cached_csr_map();
                let engine = Engine::new(
                    NodeStore::open(&self.inner.path)?,
                    self.catalog_snapshot(),
                    csrs,
                    &self.inner.path,
                );
                // SPA-219: edge delete path.
                if is_edge_delete_mutation(mm) {
                    let edges = engine.scan_match_mutate_edges(mm)?;
                    for (src, dst, rel_type) in edges {
                        tx.delete_edge(src, dst, &rel_type)?;
                    }
                } else {
                    // Node path: apply SET or DELETE to every matching node.
                    let matching_ids = engine.scan_match_mutate(mm)?;
                    for node_id in matching_ids {
                        match &mm.mutation {
                            sparrowdb_cypher::ast::Mutation::Set { prop, value, .. } => {
                                let sv = expr_to_value(value);
                                tx.set_property(node_id, prop, sv)?;
                            }
                            sparrowdb_cypher::ast::Mutation::Delete { .. } => {
                                tx.delete_node(node_id)?;
                            }
                        }
                    }
                }
                Ok(QueryResult::empty(vec![]))
            }

            Statement::MatchCreate(ref mc) => {
                for (_, rel_pat, _) in &mc.create.edges {
                    if is_reserved_label(&rel_pat.rel_type) {
                        return Err(reserved_label_error(&rel_pat.rel_type));
                    }
                }
                // Only the edge-creating form of MATCH…CREATE is supported.
                if mc.create.edges.is_empty() {
                    return Err(Error::Unimplemented);
                }
                let csrs = self.cached_csr_map();
                let engine = Engine::new(
                    NodeStore::open(&self.inner.path)?,
                    self.catalog_snapshot(),
                    csrs,
                    &self.inner.path,
                );
                // One edge per CREATE pattern per matched row.
                let matched_rows = engine.scan_match_create_rows(mc)?;
                for row in &matched_rows {
                    for (left_var, rel_pat, right_var) in &mc.create.edges {
                        let src = row.get(left_var).copied().ok_or_else(|| {
                            Error::InvalidArgument(format!(
                                "CREATE references unbound variable: {left_var}"
                            ))
                        })?;
                        let dst = row.get(right_var).copied().ok_or_else(|| {
                            Error::InvalidArgument(format!(
                                "CREATE references unbound variable: {right_var}"
                            ))
                        })?;
                        tx.create_edge(src, dst, &rel_pat.rel_type, HashMap::new())?;
                    }
                }
                Ok(QueryResult::empty(vec![]))
            }

            Statement::MatchMergeRel(ref mm) => {
                // Find-or-create relationship batch variant (SPA-233).
                //
                // NOTE: MATCH...MERGE in a batch reads committed on-disk state only;
                // nodes/edges created earlier in the same batch are not visible to
                // the MERGE existence check. If in-batch visibility is required,
                // flush the transaction to disk before issuing MATCH...MERGE.
                let csrs = self.cached_csr_map();
                let engine = Engine::new(
                    NodeStore::open(&self.inner.path)?,
                    self.catalog_snapshot(),
                    csrs,
                    &self.inner.path,
                );
                // Guard: reject reserved rel-type prefix used by internal system edges.
                if mm.rel_type.starts_with("__SO_") {
                    return Err(Error::InvalidArgument(format!(
                        "relationship type '{}' is reserved",
                        mm.rel_type
                    )));
                }
                let matched_rows = engine.scan_match_merge_rel_rows(mm)?;
                for row in &matched_rows {
                    let src = *row.get(&mm.src_var).ok_or_else(|| {
                        Error::InvalidArgument(format!(
                            "MERGE references unbound source variable: {}",
                            mm.src_var
                        ))
                    })?;
                    let dst = *row.get(&mm.dst_var).ok_or_else(|| {
                        Error::InvalidArgument(format!(
                            "MERGE references unbound destination variable: {}",
                            mm.dst_var
                        ))
                    })?;
                    // Unpack NodeId: (label_id << 32) | slot — same encoding
                    // create_node uses.
                    // NOTE(review): label ids are u32 elsewhere in this file
                    // (`(label_id as u64) << 32`), but here they are truncated
                    // to u16 for the catalog lookup — confirm that label ids
                    // are guaranteed to fit in 16 bits or that
                    // `Catalog::get_rel_table` expects u16 keys.
                    let src_label_id = (src.0 >> 32) as u16;
                    let dst_label_id = (dst.0 >> 32) as u16;
                    let src_slot = src.0 & 0xFFFF_FFFF;
                    let dst_slot = dst.0 & 0xFFFF_FFFF;

                    let catalog_rel_id_opt =
                        tx.catalog
                            .get_rel_table(src_label_id, dst_label_id, &mm.rel_type)?;

                    // The edge "exists" if it is found in any of the three
                    // places an edge can live: the on-disk delta log, the
                    // checkpointed CSR, or this tx's still-unflushed pending ops.
                    let edge_exists = if let Some(crid) = catalog_rel_id_opt {
                        let rtid = RelTableId(crid as u32);
                        if let Ok(store) = EdgeStore::open(&self.inner.path, rtid) {
                            let in_delta = store
                                .read_delta()?
                                .iter()
                                .any(|rec| rec.src.0 == src.0 && rec.dst.0 == dst.0);
                            let in_csr = if let Ok(csr) = store.open_fwd() {
                                csr.neighbors(src_slot).contains(&dst_slot)
                            } else {
                                false
                            };
                            let in_pending = tx.pending_ops.iter().any(|op| {
                                matches!(
                                    op,
                                    PendingOp::EdgeCreate {
                                        src: ps, dst: pd, rel_table_id: prt, ..
                                    } if *ps == src && *pd == dst && *prt == rtid
                                )
                            });
                            in_delta || in_csr || in_pending
                        } else {
                            false
                        }
                    } else {
                        // No rel table registered yet — the edge cannot exist.
                        false
                    };

                    if !edge_exists {
                        tx.create_edge(src, dst, &mm.rel_type, HashMap::new())?;
                    }
                }
                Ok(QueryResult::empty(vec![]))
            }

            _ => Err(Error::Unimplemented),
        }
    }
2095
2096 /// Return `(node_count, edge_count)` by summing the high-water marks
2097 /// across all catalog labels (nodes) and delta-log record counts across
2098 /// all registered relationship tables (edges).
2099 ///
2100 /// Both counts reflect committed storage state; in-flight write
2101 /// transactions are not visible.
2102 ///
2103 /// **Note:** `node_count` is based on per-label high-water marks and
2104 /// therefore includes soft-deleted nodes whose slots have not yet been
2105 /// reclaimed. The count will converge to the true live-node count once
2106 /// compaction / GC is implemented.
2107 pub fn db_counts(&self) -> Result<(u64, u64)> {
2108 let path = &self.inner.path;
2109 let catalog = self.catalog_snapshot();
2110 let node_store = NodeStore::open(path)?;
2111
2112 // Node count: sum hwm_for_label across every registered label.
2113 let node_count: u64 = catalog
2114 .list_labels()
2115 .unwrap_or_default()
2116 .iter()
2117 .map(|(label_id, _name)| node_store.hwm_for_label(*label_id as u32).unwrap_or(0))
2118 .sum();
2119
2120 // Edge count: for each registered rel table, sum CSR base edges (post-checkpoint)
2121 // plus delta-log records (pre-checkpoint / unflushed).
2122 let rel_table_ids = catalog.list_rel_table_ids();
2123 let ids_to_scan: Vec<u64> = if rel_table_ids.is_empty() {
2124 // No rel tables in catalog yet — fall back to the default table (id=0).
2125 vec![0]
2126 } else {
2127 rel_table_ids.iter().map(|(id, _, _, _)| *id).collect()
2128 };
2129 let edge_count: u64 = ids_to_scan
2130 .iter()
2131 .map(|&id| {
2132 let Ok(store) = EdgeStore::open(path, RelTableId(id as u32)) else {
2133 return 0;
2134 };
2135 let csr_edges = store.open_fwd().map(|csr| csr.n_edges()).unwrap_or(0);
2136 let delta_edges = store
2137 .read_delta()
2138 .map(|records| records.len() as u64)
2139 .unwrap_or(0);
2140 csr_edges + delta_edges
2141 })
2142 .sum();
2143
2144 Ok((node_count, edge_count))
2145 }
2146
2147 /// Return all node label names currently registered in the catalog (SPA-209).
2148 ///
2149 /// Labels are registered automatically the first time a node with that label
2150 /// is created. The returned list is ordered by insertion (i.e. by `LabelId`).
2151 pub fn labels(&self) -> Result<Vec<String>> {
2152 let catalog = self.catalog_snapshot();
2153 Ok(catalog
2154 .list_labels()?
2155 .into_iter()
2156 .map(|(_id, name)| name)
2157 .collect())
2158 }
2159
2160 /// Return all relationship type names currently registered in the catalog (SPA-209).
2161 ///
2162 /// The returned list is deduplicated and sorted alphabetically.
2163 pub fn relationship_types(&self) -> Result<Vec<String>> {
2164 let catalog = self.catalog_snapshot();
2165 let mut types: Vec<String> = catalog
2166 .list_rel_tables()?
2167 .into_iter()
2168 .map(|(_src, _dst, rel_type)| rel_type)
2169 .collect();
2170 types.sort();
2171 types.dedup();
2172 Ok(types)
2173 }
2174
2175 /// Return the top-`limit` nodes of the given `label` ordered by out-degree
2176 /// descending (SPA-168).
2177 ///
2178 /// Each element of the returned `Vec` is `(node_id, out_degree)`. Ties are
2179 /// broken by ascending node id for determinism.
2180 ///
2181 /// This is an O(N log k) operation backed by the pre-computed
2182 /// [`DegreeCache`](sparrowdb_execution::DegreeCache) — no edge scan is
2183 /// performed at query time.
2184 ///
2185 /// Returns an empty `Vec` when `limit == 0`, the label is unknown, or the
2186 /// label has no nodes.
2187 ///
2188 /// # Errors
2189 /// Propagates I/O errors from opening the node store, CSR files, or
2190 /// catalog.
2191 pub fn top_degree_nodes(&self, label: &str, limit: usize) -> Result<Vec<(u64, u32)>> {
2192 if limit == 0 {
2193 return Ok(vec![]);
2194 }
2195 let path = &self.inner.path;
2196 let catalog = self.catalog_snapshot();
2197 let label_id: u32 = match catalog.get_label(label)? {
2198 Some(id) => id as u32,
2199 None => return Ok(vec![]),
2200 };
2201 let csrs = open_csr_map(path);
2202 let engine = Engine::new(NodeStore::open(path)?, catalog, csrs, path);
2203 engine.top_k_by_degree(label_id, limit)
2204 }
2205
2206 /// Export the graph as a DOT (Graphviz) string for visualization.
2207 ///
2208 /// Queries all nodes via Cypher and reads edges directly from storage so
2209 /// that the output is correct regardless of whether the database has been
2210 /// checkpointed (CSR) or not (delta log). The result can be piped through
2211 /// `dot -Tsvg` to produce an SVG, or written to a `.dot` file for offline
2212 /// rendering.
2213 ///
2214 /// ## Example
2215 ///
2216 /// ```no_run
2217 /// # use sparrowdb::GraphDb;
2218 /// # let db = GraphDb::open(std::path::Path::new("/tmp/my.sparrow")).unwrap();
2219 /// let dot = db.export_dot().unwrap();
2220 /// std::fs::write("graph.dot", &dot).unwrap();
2221 /// // Then: dot -Tsvg graph.dot -o graph.svg
2222 /// ```
2223 pub fn export_dot(&self) -> Result<String> {
2224 /// Escape a string for use inside a DOT label (backslash + double-quote).
2225 fn dot_escape(s: &str) -> String {
2226 s.replace('\\', "\\\\").replace('"', "\\\"")
2227 }
2228
2229 /// Format a `Value` as a human-readable string for DOT labels / edge
2230 /// labels. Falls back to the `Display` impl for most variants.
2231 fn fmt_val(v: &sparrowdb_execution::types::Value) -> String {
2232 match v {
2233 sparrowdb_execution::types::Value::String(s) => s.clone(),
2234 sparrowdb_execution::types::Value::Int64(i) => i.to_string(),
2235 sparrowdb_execution::types::Value::List(items) => {
2236 // labels(n) returns a List of String items.
2237 items
2238 .iter()
2239 .filter_map(|it| {
2240 if let sparrowdb_execution::types::Value::String(s) = it {
2241 Some(s.as_str())
2242 } else {
2243 None
2244 }
2245 })
2246 .collect::<Vec<_>>()
2247 .join(":")
2248 }
2249 other => format!("{other}"),
2250 }
2251 }
2252
2253 let path = &self.inner.path;
2254 let catalog = self.catalog_snapshot();
2255
2256 // Query all nodes via Cypher — id(n) is reliably Int64 from node scans.
2257 let nodes =
2258 self.execute("MATCH (n) RETURN id(n) AS nid, labels(n) AS lbls, n.name AS nm")?;
2259
2260 // Read edges directly from storage.
2261 //
2262 // The Cypher engine's project_hop_row does not expose id(a)/id(b) in
2263 // hop patterns (SPA-149 follow-up), so we read the delta log
2264 // (pre-checkpoint) and CSR (post-checkpoint) directly.
2265 //
2266 // NodeId encoding: (label_id as u64) << 32 | slot
2267 // This is the same as the value returned by id(n) for node scans.
2268 let rel_tables = catalog.list_rel_tables_with_ids();
2269 let mut edge_triples: Vec<(i64, String, i64)> = Vec::new();
2270
2271 for (catalog_id, src_label_id, dst_label_id, rel_type) in &rel_tables {
2272 let storage_rel_id = sparrowdb_storage::edge_store::RelTableId(*catalog_id as u32);
2273
2274 if let Ok(store) = sparrowdb_storage::edge_store::EdgeStore::open(path, storage_rel_id)
2275 {
2276 // Delta log: stores full NodeId pairs directly.
2277 if let Ok(records) = store.read_delta() {
2278 for rec in records {
2279 edge_triples.push((rec.src.0 as i64, rel_type.clone(), rec.dst.0 as i64));
2280 }
2281 }
2282
2283 // CSR: (src_slot, dst_slot) relative to label IDs.
2284 if let Ok(csr) = store.open_fwd() {
2285 let n_nodes = csr.n_nodes();
2286 for src_slot in 0..n_nodes {
2287 let src_id = ((*src_label_id as u64) << 32 | src_slot) as i64;
2288 for &dst_slot in csr.neighbors(src_slot) {
2289 let dst_id = ((*dst_label_id as u64) << 32 | dst_slot) as i64;
2290 edge_triples.push((src_id, rel_type.clone(), dst_id));
2291 }
2292 }
2293 }
2294 }
2295 }
2296
2297 // Build DOT output.
2298 let mut dot =
2299 String::from("digraph SparrowDB {\n rankdir=LR;\n node [shape=ellipse];\n\n");
2300
2301 for row in &nodes.rows {
2302 let node_id = match &row[0] {
2303 sparrowdb_execution::types::Value::Int64(i) => *i,
2304 _ => continue,
2305 };
2306 let label_str = fmt_val(&row[1]);
2307 let name_str = match &row[2] {
2308 sparrowdb_execution::types::Value::Null => String::new(),
2309 v => fmt_val(v),
2310 };
2311 let display_label = if name_str.is_empty() {
2312 format!("{}\\nid={}", dot_escape(&label_str), node_id)
2313 } else {
2314 format!("{}\\n{}", dot_escape(&label_str), dot_escape(&name_str))
2315 };
2316 dot.push_str(&format!(" n{node_id} [label=\"{display_label}\"];\n"));
2317 }
2318
2319 dot.push('\n');
2320
2321 for (src_id, rel_type, dst_id) in &edge_triples {
2322 dot.push_str(&format!(
2323 " n{src_id} -> n{dst_id} [label=\"{}\"];\n",
2324 dot_escape(rel_type)
2325 ));
2326 }
2327
2328 dot.push_str("}\n");
2329 Ok(dot)
2330 }
2331
2332 /// SPA-247: Convenience wrapper for upsert — find an existing node with
2333 /// `(label, match_key=match_value)` and update its properties, or create a
2334 /// new one if none exists.
2335 ///
2336 /// Opens a single-operation write transaction, calls
2337 /// [`WriteTx::merge_node_by_property`], and commits.
2338 ///
2339 /// Returns `(NodeId, created)`.
2340 pub fn merge_node_by_property(
2341 &self,
2342 label: &str,
2343 match_key: &str,
2344 match_value: &Value,
2345 properties: HashMap<String, Value>,
2346 ) -> Result<(NodeId, bool)> {
2347 let mut tx = self.begin_write()?;
2348 let result = tx.merge_node_by_property(label, match_key, match_value, properties)?;
2349 tx.commit()?;
2350 self.invalidate_prop_index();
2351 self.invalidate_catalog();
2352 Ok(result)
2353 }
2354
2355 /// Convenience wrapper: remove the directed edge `src → dst` of `rel_type`.
2356 ///
2357 /// Opens a single-operation write transaction, calls
2358 /// [`WriteTx::delete_edge`], and commits. Suitable for callers that need
2359 /// to remove a specific edge without managing a transaction explicitly.
2360 ///
2361 /// Returns [`Error::WriterBusy`] if another write transaction is already
2362 /// active; returns [`Error::InvalidArgument`] if the rel type or edge is
2363 /// not found.
2364 pub fn delete_edge(&self, src: NodeId, dst: NodeId, rel_type: &str) -> Result<()> {
2365 let mut tx = self.begin_write()?;
2366 tx.delete_edge(src, dst, rel_type)?;
2367 tx.commit()?;
2368 self.invalidate_csr_map();
2369 Ok(())
2370 }
2371}
2372
/// Convenience wrapper — equivalent to [`GraphDb::open`].
///
/// Lets callers write `sparrowdb::open(path)` without importing [`GraphDb`]
/// first; see [`GraphDb::open`] for behavior and error conditions.
pub fn open(path: &Path) -> Result<GraphDb> {
    GraphDb::open(path)
}
2377
2378/// Migrate WAL segments from legacy v21 (CRC32 IEEE) to v2 (CRC32C Castagnoli).
2379///
2380/// Call this on a database path **before** opening it with [`GraphDb::open`].
2381/// The database must not be open by any other process.
2382///
2383/// This is safe to run on a database that is already at v2 — those segments are
2384/// simply skipped. The migration is idempotent.
2385///
2386/// # Example
2387///
2388/// ```no_run
2389/// use std::path::Path;
2390/// let result = sparrowdb::migrate_wal(Path::new("/path/to/my.sparrow")).unwrap();
2391/// println!("Converted {} segments", result.segments_converted);
2392/// ```
2393pub fn migrate_wal(db_path: &Path) -> Result<sparrowdb_storage::wal::migrate::MigrationResult> {
2394 let wal_dir = db_path.join("wal");
2395 if !wal_dir.exists() {
2396 return Ok(sparrowdb_storage::wal::migrate::MigrationResult {
2397 segments_inspected: 0,
2398 segments_converted: 0,
2399 segments_skipped: 0,
2400 records_converted: 0,
2401 });
2402 }
2403 sparrowdb_storage::wal::migrate::migrate_wal(&wal_dir)
2404}
2405
// ── Legacy alias ──────────────────────────────────────────────────────────────

/// Legacy alias kept for backward compatibility with Phase 0 tests.
///
/// New code should use [`GraphDb`] directly; the alias adds no behavior.
pub type SparrowDB = GraphDb;
2410
// ── ReadTx ────────────────────────────────────────────────────────────────────

/// A read-only snapshot transaction.
///
/// Pinned at the `txn_id` current when this handle was opened; immune to
/// subsequent writer commits for the lifetime of this handle.
///
/// Obtained via [`GraphDb::begin_read`]; multiple `ReadTx` handles may coexist
/// with one active writer (SWMR model — see the crate-level docs).
pub struct ReadTx {
    /// The committed `txn_id` this reader is pinned to.
    pub snapshot_txn_id: u64,
    /// On-disk node/property storage, used for fallback reads when the
    /// version chain has no entry at or before the snapshot.
    store: NodeStore,
    /// Shared database state (version chains, catalog/CSR/prop-index caches,
    /// database path).
    inner: Arc<DbInner>,
}
2423
impl ReadTx {
    /// Read the `Int64` property values of a node at the pinned snapshot.
    ///
    /// For each column the version chain is consulted first; if a value was
    /// committed at or before `snapshot_txn_id` it shadows the on-disk value.
    ///
    /// Returns one `(col_id, value)` pair per column yielded by the raw read.
    pub fn get_node(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, Value)>> {
        // Hold the version-chain read lock across the whole merge so the
        // chain cannot change between the raw read and the per-column lookups.
        let versions = self.inner.versions.read().expect("version lock poisoned");
        let raw = self.store.get_node_raw(node_id, col_ids)?;
        let result = raw
            .into_iter()
            .map(|(col_id, raw_val)| {
                // Check version chain first.
                if let Some(v) = versions.get_at(node_id, col_id, self.snapshot_txn_id) {
                    (col_id, v)
                } else {
                    // No versioned entry visible at this snapshot — fall back
                    // to decoding the on-disk value.
                    (col_id, self.store.decode_raw_value(raw_val))
                }
            })
            .collect();
        Ok(result)
    }

    /// Return the snapshot `TxnId` this reader is pinned to.
    ///
    /// Typed wrapper around the public `snapshot_txn_id` field.
    pub fn snapshot(&self) -> TxnId {
        TxnId(self.snapshot_txn_id)
    }

    /// Execute a read-only Cypher query against the pinned snapshot.
    ///
    /// ## Snapshot isolation
    ///
    /// The query sees exactly the committed state at the moment
    /// [`begin_read`](GraphDb::begin_read) was called. Any writes committed
    /// after that point — even fully committed ones — are invisible until a
    /// new `ReadTx` is opened.
    ///
    /// ## Concurrency
    ///
    /// Multiple `ReadTx` handles may run `query` concurrently. No write lock
    /// is acquired; only the shared read-paths of the catalog, CSR, and
    /// property-index caches are accessed.
    ///
    /// ## Mutation statements rejected
    ///
    /// Passing a mutation statement (`CREATE`, `MERGE`, `MATCH … SET`,
    /// `MATCH … DELETE`, `CHECKPOINT`, `OPTIMIZE`, etc.) returns
    /// [`Error::ReadOnly`]. Use [`GraphDb::execute`] for mutations.
    ///
    /// # Example
    /// ```no_run
    /// use sparrowdb::GraphDb;
    ///
    /// let db = GraphDb::open(std::path::Path::new("/tmp/g.sparrow")).unwrap();
    /// let tx = db.begin_read().unwrap();
    /// let result = tx.query("MATCH (n:Person) RETURN n.name").unwrap();
    /// println!("{} rows", result.rows.len());
    /// ```
    pub fn query(&self, cypher: &str) -> crate::Result<QueryResult> {
        use sparrowdb_cypher::{bind, parse};

        // Parse before touching any shared state so syntax errors are cheap.
        let stmt = parse(cypher)?;

        // Take a snapshot of the catalog from the shared cache (no disk I/O if
        // the catalog is already warm).
        let catalog_snap = self
            .inner
            .catalog
            .read()
            .expect("catalog RwLock poisoned")
            .clone();

        let bound = bind(stmt, &catalog_snap)?;

        // Reject any statement that would mutate state — ReadTx is read-only.
        if Engine::is_mutation(&bound.inner) {
            return Err(crate::Error::ReadOnly);
        }

        // Also reject DDL / maintenance statements.
        use sparrowdb_cypher::ast::Statement;
        match &bound.inner {
            Statement::Checkpoint | Statement::Optimize | Statement::CreateConstraint { .. } => {
                return Err(crate::Error::ReadOnly);
            }
            _ => {}
        }

        // Outer span stays entered until the end of the function; the two
        // nested spans below scope the engine-open and execute phases.
        let _span = info_span!("sparrowdb.readtx.query").entered();

        // Clone the CSR map out of the lock so the read guard is released
        // before the (potentially long-running) execution below.
        let csrs = self
            .inner
            .csr_map
            .read()
            .expect("csr_map RwLock poisoned")
            .clone();

        let mut engine = {
            let _open_span = info_span!("sparrowdb.readtx.open_engine").entered();
            let row_counts = self
                .inner
                .label_row_counts
                .read()
                .expect("label_row_counts RwLock poisoned")
                .clone();
            Engine::new_with_all_caches(
                NodeStore::open(&self.inner.path)?,
                catalog_snap,
                csrs,
                &self.inner.path,
                Some(&self.inner.prop_index),
                Some(row_counts),
                Some(Arc::clone(&self.inner.edge_props_cache)),
            )
        };

        let result = {
            let _exec_span = info_span!("sparrowdb.readtx.execute").entered();
            engine.execute_statement(bound.inner)?
        };

        // Write lazily-loaded columns back to the shared property-index cache
        // so subsequent queries benefit from warm column data.
        engine.write_back_prop_index(&self.inner.prop_index);

        tracing::debug!(
            rows = result.rows.len(),
            snapshot_txn_id = self.snapshot_txn_id,
            "readtx query complete"
        );
        Ok(result)
    }
}
2556
// ── WriteTx ───────────────────────────────────────────────────────────────────

/// A write transaction.
///
/// Only one may be active at a time (writer-lock held for the lifetime of
/// this struct). Commit by calling [`WriteTx::commit`]; uncommitted changes
/// are discarded on drop.
///
/// # Atomicity guarantee (SPA-181)
///
/// All mutations (`create_node`, `create_edge`, `delete_node`, `create_label`,
/// `merge_node`) are buffered in memory until [`commit`] is called. If the
/// transaction is dropped without committing, **no changes are persisted** —
/// the database remains in the state it was at the time [`begin_write`] was
/// called.
///
/// [`begin_write`]: GraphDb::begin_write
/// [`commit`]: WriteTx::commit
#[must_use = "call commit() to persist changes, or drop to discard"]
pub struct WriteTx {
    /// Shared database state (writer flag, caches, database path).
    inner: Arc<DbInner>,
    /// Node/property storage handle; used to peek slot allocations and read
    /// before-images while staging.
    store: NodeStore,
    /// Transaction-local catalog; label and rel-table registrations made by
    /// this transaction go through it.
    catalog: Catalog,
    /// Staged property updates (not yet visible to readers).
    write_buf: WriteBuffer,
    /// Staged WAL mutation records to emit on commit.
    wal_mutations: Vec<WalMutation>,
    /// Set of node IDs written by this transaction (for MVCC conflict detection).
    dirty_nodes: HashSet<u64>,
    /// The committed txn_id at the time this WriteTx was opened (MVCC snapshot).
    snapshot_txn_id: u64,
    /// Held for the lifetime of this WriteTx; released on drop.
    /// Uses AtomicBool-based guard — no unsafe lifetime extension needed (SPA-181).
    _guard: WriteGuard,
    /// Whether `commit()` completed; uncommitted changes are discarded on drop.
    committed: bool,
    /// In-flight fulltext index updates — flushed to disk on commit.
    ///
    /// Caching open indexes here avoids one open+flush per `add_to_fulltext_index`
    /// call; instead we batch all additions and flush each index exactly once.
    fulltext_pending: HashMap<String, sparrowdb_storage::fulltext_index::FulltextIndex>,
    /// Buffered structural mutations (create_node, delete_node, create_edge,
    /// create_label) not yet written to disk. Flushed atomically on commit.
    pending_ops: Vec<PendingOp>,
}
2601
2602impl WriteTx {
2603 // ── Core node/property API (pre-Phase 7) ─────────────────────────────────
2604
2605 /// Create a new node under `label_id` with the given properties.
2606 ///
2607 /// Returns the packed [`NodeId`].
2608 ///
2609 /// The node is **not** written to disk until [`commit`] is called.
2610 /// Dropping the transaction without committing discards this operation.
2611 ///
2612 /// [`commit`]: WriteTx::commit
2613 pub fn create_node(&mut self, label_id: u32, props: &[(u32, Value)]) -> Result<NodeId> {
2614 // Allocate a node ID by consulting the in-memory HWM. We peek at
2615 // the current HWM to compute the future NodeId without actually
2616 // writing anything to disk yet.
2617 let slot = self.store.peek_next_slot(label_id)?;
2618 let node_id = NodeId((label_id as u64) << 32 | slot as u64);
2619 self.dirty_nodes.insert(node_id.0);
2620 self.pending_ops.push(PendingOp::NodeCreate {
2621 label_id,
2622 slot,
2623 props: props.to_vec(),
2624 });
2625 self.wal_mutations.push(WalMutation::NodeCreate {
2626 node_id,
2627 label_id,
2628 props: props.to_vec(),
2629 // Low-level create_node: no property names available.
2630 prop_names: Vec::new(),
2631 });
2632 Ok(node_id)
2633 }
2634
2635 /// Create a new node with named properties, recording names in the WAL.
2636 ///
2637 /// Like [`create_node`] but accepts `(name, value)` pairs so that the
2638 /// property names are preserved in the WAL record for schema introspection
2639 /// (`CALL db.schema()`). Col-ids are derived via [`col_id_of`].
2640 pub fn create_node_named(
2641 &mut self,
2642 label_id: u32,
2643 named_props: &[(String, Value)],
2644 ) -> Result<NodeId> {
2645 let props: Vec<(u32, Value)> = named_props
2646 .iter()
2647 .map(|(name, v)| (col_id_of(name), v.clone()))
2648 .collect();
2649 let prop_names: Vec<String> = named_props.iter().map(|(n, _)| n.clone()).collect();
2650 let slot = self.store.peek_next_slot(label_id)?;
2651 let node_id = NodeId((label_id as u64) << 32 | slot as u64);
2652 self.dirty_nodes.insert(node_id.0);
2653 self.pending_ops.push(PendingOp::NodeCreate {
2654 label_id,
2655 slot,
2656 props: props.clone(),
2657 });
2658 self.wal_mutations.push(WalMutation::NodeCreate {
2659 node_id,
2660 label_id,
2661 props,
2662 prop_names,
2663 });
2664 Ok(node_id)
2665 }
2666
2667 /// Stage a property update.
2668 ///
2669 /// The new value is not visible to readers until [`commit`] is called.
2670 ///
2671 /// On the first update to a `(node_id, col_id)` key during this
2672 /// transaction, the current on-disk value is read and stored as the
2673 /// before-image so that readers with older snapshots continue to see the
2674 /// correct value after commit overwrites the column file.
2675 pub fn set_node_col(&mut self, node_id: NodeId, col_id: u32, value: Value) {
2676 self.set_node_col_named(node_id, col_id, format!("col_{col_id}"), value);
2677 }
2678
2679 /// Stage a property update with an explicit human-readable key name for WAL.
2680 fn set_node_col_named(&mut self, node_id: NodeId, col_id: u32, key_name: String, value: Value) {
2681 let key = (node_id.0, col_id);
2682 self.dirty_nodes.insert(node_id.0);
2683
2684 if self.write_buf.updates.contains_key(&key) {
2685 // Already staged this key — just update the new_value.
2686 let entry = self.write_buf.updates.get_mut(&key).unwrap();
2687 entry.new_value = value;
2688 // Keep the key_name from the first staging (it's the same column).
2689 return;
2690 }
2691
2692 // First update to this key in this transaction. Capture the
2693 // before-image so readers pinned before our commit retain access.
2694 let prev_txn_id = self.inner.current_txn_id.load(Ordering::Acquire);
2695
2696 // Check whether the version chain already has an entry for this key.
2697 // If so, the chain is already correct and we don't need to add the
2698 // before-image separately.
2699 let already_in_chain = {
2700 let vs = self.inner.versions.read().expect("version lock");
2701 vs.map.contains_key(&key)
2702 };
2703
2704 let before_image = if already_in_chain {
2705 None
2706 } else {
2707 // Read the current on-disk value as the before-image.
2708 let disk_val = self
2709 .store
2710 .get_node_raw(node_id, &[col_id])
2711 .ok()
2712 .and_then(|mut v| v.pop())
2713 .map(|(_, raw)| self.store.decode_raw_value(raw));
2714 disk_val.map(|v| (prev_txn_id, v))
2715 };
2716
2717 self.write_buf.updates.insert(
2718 key,
2719 StagedUpdate {
2720 before_image,
2721 new_value: value,
2722 key_name,
2723 },
2724 );
2725 }
2726
2727 /// Create a label in the schema catalog.
2728 ///
2729 /// # Note on atomicity
2730 ///
2731 /// Schema changes (label creation) are written to the catalog file
2732 /// immediately and are not rolled back if the transaction is later
2733 /// dropped without committing. Label creation is idempotent at the
2734 /// catalog level: a duplicate name returns `Error::AlreadyExists`.
2735 /// Full schema-change atomicity is deferred to a future phase.
2736 pub fn create_label(&mut self, name: &str) -> Result<u16> {
2737 self.catalog.create_label(name)
2738 }
2739
2740 /// Look up `name` in the catalog, creating it if it does not yet exist.
2741 ///
2742 /// Returns the `label_id` as a `u32` (upper 32 bits of a packed NodeId).
2743 /// Unlike [`create_label`], this method is idempotent: calling it multiple
2744 /// times with the same name always returns the same id.
2745 ///
2746 /// Primarily used by the bulk-import path (SPA-148) where labels may be
2747 /// seen for the first time on any row.
2748 ///
2749 /// [`create_label`]: WriteTx::create_label
2750 pub fn get_or_create_label_id(&mut self, name: &str) -> Result<u32> {
2751 match self.catalog.get_label(name)? {
2752 Some(id) => Ok(id as u32),
2753 None => Ok(self.catalog.create_label(name)? as u32),
2754 }
2755 }
2756
2757 // ── Phase 7 mutation API (SPA-123 … SPA-126) ─────────────────────────────
2758
    /// SPA-123: Find or create a node matching `label` + `props`.
    ///
    /// Scans the node store for a slot whose columns match every key→value
    /// pair in `props` (using [`fnv1a_col_id`] to derive column IDs from
    /// key strings). Returns the existing [`NodeId`] if found, or creates a
    /// new node and returns the new id.
    ///
    /// The label is resolved (or created) in the catalog. The match scan
    /// covers both nodes buffered in this transaction (`pending_ops`) and
    /// committed on-disk nodes, so repeated `merge_node` calls within one
    /// transaction are idempotent. A newly created node is buffered and
    /// only reaches disk at commit time.
    pub fn merge_node(&mut self, label: &str, props: HashMap<String, Value>) -> Result<NodeId> {
        // Resolve / create label.
        let label_id: u32 = match self.catalog.get_label(label)? {
            Some(id) => id as u32,
            None => self.catalog.create_label(label)? as u32,
        };

        // Build col list from props keys. Each tuple keeps the original key
        // name (needed later for WAL introspection), the derived col_id, and
        // the value.
        let col_kv: Vec<(String, u32, Value)> = props
            .into_iter()
            .map(|(k, v)| {
                let col_id = fnv1a_col_id(&k);
                (k, col_id, v)
            })
            .collect();
        let col_ids: Vec<u32> = col_kv.iter().map(|&(_, col_id, _)| col_id).collect();

        // First, check buffered (not-yet-committed) node creates in pending_ops.
        // This ensures merge_node is idempotent within a single transaction.
        for op in &self.pending_ops {
            if let PendingOp::NodeCreate {
                label_id: op_label_id,
                slot: op_slot,
                props: op_props,
            } = op
            {
                if *op_label_id == label_id {
                    let candidate = NodeId((label_id as u64) << 32 | *op_slot as u64);
                    // A candidate matches only if EVERY requested column is
                    // present with an equal value.
                    let matches = col_kv.iter().all(|(_, col_id, want_val)| {
                        op_props
                            .iter()
                            .find(|&&(c, _)| c == *col_id)
                            // Compare in-memory Value objects directly so long
                            // strings (> 7 bytes) are not truncated (SPA-212).
                            .map(|(_, v)| v == want_val)
                            .unwrap_or(false)
                    });
                    if matches {
                        return Ok(candidate);
                    }
                }
            }
        }

        // Scan on-disk slots for a match (only checks committed/on-disk nodes).
        // Use disk_hwm_for_label to avoid scanning slots that were only reserved
        // in-memory by peek_next_slot but not yet flushed to disk.
        let disk_hwm = self.store.disk_hwm_for_label(label_id)?;
        for slot in 0..disk_hwm {
            let candidate = NodeId((label_id as u64) << 32 | slot);
            // get_node_raw may fail for tombstoned/absent slots; those are
            // simply skipped.
            if let Ok(stored) = self.store.get_node_raw(candidate, &col_ids) {
                let matches = col_kv.iter().all(|(_, col_id, want_val)| {
                    stored
                        .iter()
                        .find(|&&(c, _)| c == *col_id)
                        .map(|&(_, raw)| {
                            // Compare decoded values so overflow strings (> 7 bytes)
                            // match correctly (SPA-212).
                            self.store.decode_raw_value(raw) == *want_val
                        })
                        .unwrap_or(false)
                });
                if matches {
                    return Ok(candidate);
                }
            }
        }

        // Not found — create a new node (buffered, same as create_node).
        let disk_props: Vec<(u32, Value)> = col_kv
            .iter()
            .map(|(_, col_id, v)| (*col_id, v.clone()))
            .collect();
        // Preserve property names from the col_kv tuples (key, col_id, value).
        let disk_prop_names: Vec<String> = col_kv.iter().map(|(k, _, _)| k.clone()).collect();
        let slot = self.store.peek_next_slot(label_id)?;
        let node_id = NodeId((label_id as u64) << 32 | slot as u64);
        self.dirty_nodes.insert(node_id.0);
        self.pending_ops.push(PendingOp::NodeCreate {
            label_id,
            slot,
            props: disk_props.clone(),
        });
        self.wal_mutations.push(WalMutation::NodeCreate {
            node_id,
            label_id,
            props: disk_props,
            prop_names: disk_prop_names,
        });
        Ok(node_id)
    }
2858
2859 /// SPA-247: Upsert a node — find an existing node with `(label, match_key=match_value)`
2860 /// and update its properties, or create a new one if none exists.
2861 ///
2862 /// Returns `(NodeId, created)` where `created` is `true` when a new node
2863 /// was inserted and `false` when an existing node was found and updated.
2864 ///
2865 /// The lookup scans both pending (in-transaction) nodes and committed
2866 /// on-disk nodes for a slot whose label matches and whose `match_key`
2867 /// column equals `match_value`. On a hit the remaining `properties` are
2868 /// applied via [`set_property`]; on a miss a new node is created via
2869 /// [`merge_node`] with `match_key=match_value` merged into `properties`.
2870 pub fn merge_node_by_property(
2871 &mut self,
2872 label: &str,
2873 match_key: &str,
2874 match_value: &Value,
2875 properties: HashMap<String, Value>,
2876 ) -> Result<(NodeId, bool)> {
2877 let label_id: u32 = match self.catalog.get_label(label)? {
2878 Some(id) => id as u32,
2879 None => {
2880 // Label doesn't exist yet — no node can match. Create it.
2881 let mut full_props = properties;
2882 full_props.insert(match_key.to_string(), match_value.clone());
2883 let node_id = self.merge_node(label, full_props)?;
2884 return Ok((node_id, true));
2885 }
2886 };
2887
2888 let match_col_id = fnv1a_col_id(match_key);
2889 let match_col_ids = vec![match_col_id];
2890
2891 // Step 1: Check pending (in-transaction) nodes.
2892 for op in &self.pending_ops {
2893 if let PendingOp::NodeCreate {
2894 label_id: op_label_id,
2895 slot: op_slot,
2896 props: op_props,
2897 } = op
2898 {
2899 if *op_label_id == label_id {
2900 let matches = op_props
2901 .iter()
2902 .find(|&&(c, _)| c == match_col_id)
2903 .map(|(_, v)| v == match_value)
2904 .unwrap_or(false);
2905 if matches {
2906 let node_id = NodeId((label_id as u64) << 32 | *op_slot as u64);
2907 // Update remaining properties on the existing node.
2908 for (k, v) in &properties {
2909 self.set_property(node_id, k, v.clone())?;
2910 }
2911 return Ok((node_id, false));
2912 }
2913 }
2914 }
2915 }
2916
2917 // Step 2: Scan on-disk committed nodes.
2918 let disk_hwm = self.store.disk_hwm_for_label(label_id)?;
2919 for slot in 0..disk_hwm {
2920 let candidate = NodeId((label_id as u64) << 32 | slot);
2921 if let Ok(stored) = self.store.get_node_raw(candidate, &match_col_ids) {
2922 let matches = stored
2923 .iter()
2924 .find(|&&(c, _)| c == match_col_id)
2925 .map(|&(_, raw)| self.store.decode_raw_value(raw) == *match_value)
2926 .unwrap_or(false);
2927 if matches {
2928 // Update remaining properties on the existing node.
2929 for (k, v) in &properties {
2930 self.set_property(candidate, k, v.clone())?;
2931 }
2932 return Ok((candidate, false));
2933 }
2934 }
2935 }
2936
2937 // Step 3: Not found — create new node with match_key included.
2938 let mut full_props = properties;
2939 full_props.insert(match_key.to_string(), match_value.clone());
2940 let node_id = self.merge_node(label, full_props)?;
2941 Ok((node_id, true))
2942 }
2943
2944 /// SPA-124: Update a named property on a node.
2945 ///
2946 /// Derives a stable column ID from `key` via [`fnv1a_col_id`] and stages
2947 /// the update through [`set_node_col`] (which records the before-image in
2948 /// the write buffer). WAL emission happens once at commit time via the
2949 /// `updates` loop in `write_mutation_wal`.
2950 pub fn set_property(&mut self, node_id: NodeId, key: &str, val: Value) -> Result<()> {
2951 let col_id = fnv1a_col_id(key);
2952 self.dirty_nodes.insert(node_id.0);
2953
2954 // Stage the update through the write buffer (records before-image for
2955 // WAL and MVCC). WAL emission happens exactly once at commit time via
2956 // the `updates` loop in `write_mutation_wal`. Pass the human-readable
2957 // key name so the WAL record carries it (not the synthesized col_{id}).
2958 self.set_node_col_named(node_id, col_id, key.to_string(), val);
2959
2960 Ok(())
2961 }
2962
2963 /// SPA-125: Delete a node, with edge-attachment check.
2964 ///
2965 /// Returns [`Error::NodeHasEdges`] if the node is referenced by any edge
2966 /// in the delta log. On success, queues a `NodeDelete` WAL record and
2967 /// buffers the tombstone write; the on-disk tombstone is only applied when
2968 /// [`commit`] is called.
2969 ///
2970 /// [`commit`]: WriteTx::commit
2971 pub fn delete_node(&mut self, node_id: NodeId) -> Result<()> {
2972 // SPA-185: check ALL per-type delta logs for attached edges, not just
2973 // the hardcoded RelTableId(0). Always include table-0 so that any
2974 // edges written before the catalog had entries are still detected.
2975 let rel_entries = self.catalog.list_rel_table_ids();
2976 let mut rel_ids_to_check: Vec<u32> =
2977 rel_entries.iter().map(|(id, _, _, _)| *id as u32).collect();
2978 // Always include the legacy table-0 slot. If it is already in the
2979 // catalog list this dedup prevents a double-read.
2980 if !rel_ids_to_check.contains(&0u32) {
2981 rel_ids_to_check.push(0u32);
2982 }
2983 for rel_id in rel_ids_to_check {
2984 let delta = EdgeStore::open(&self.inner.path, RelTableId(rel_id))
2985 .and_then(|s| s.read_delta())
2986 .unwrap_or_default();
2987 if delta.iter().any(|r| r.src == node_id || r.dst == node_id) {
2988 return Err(Error::NodeHasEdges { node_id: node_id.0 });
2989 }
2990 }
2991
2992 // Also check buffered (not-yet-committed) edge creates in this
2993 // transaction — a node that has already been connected in the current
2994 // transaction cannot be deleted before commit.
2995 let has_buffered_edge = self.pending_ops.iter().any(|op| {
2996 matches!(op, PendingOp::EdgeCreate { src, dst, .. } if *src == node_id || *dst == node_id)
2997 });
2998 if has_buffered_edge {
2999 return Err(Error::NodeHasEdges { node_id: node_id.0 });
3000 }
3001
3002 // Buffer the tombstone — do NOT write to disk yet.
3003 self.dirty_nodes.insert(node_id.0);
3004 self.pending_ops.push(PendingOp::NodeDelete { node_id });
3005 self.wal_mutations.push(WalMutation::NodeDelete { node_id });
3006 Ok(())
3007 }
3008
    /// SPA-126: Create a directed edge `src → dst` with the given type.
    ///
    /// Buffers the edge creation; the delta-log append and WAL record are only
    /// written to disk when [`commit`] is called. If the transaction is dropped
    /// without committing, no edge is persisted.
    ///
    /// Registers the relationship type name in the catalog so that queries
    /// like `MATCH (a)-[:REL]->(b)` can resolve the type (SPA-158). Note
    /// that the catalog registration itself happens immediately and is NOT
    /// rolled back on drop (see the comment below).
    /// Returns the new [`EdgeId`].
    ///
    /// [`commit`]: WriteTx::commit
    pub fn create_edge(
        &mut self,
        src: NodeId,
        dst: NodeId,
        rel_type: &str,
        props: HashMap<String, Value>,
    ) -> Result<EdgeId> {
        // Derive label IDs from the packed node IDs (upper 32 bits).
        let src_label_id = (src.0 >> 32) as u16;
        let dst_label_id = (dst.0 >> 32) as u16;

        // Register (or retrieve) the rel type in the catalog.
        // Catalog mutation is immediate and not transactional. This is acceptable
        // for now as rel type creation is idempotent. Full schema-change
        // atomicity is deferred to a future phase.
        let catalog_rel_id =
            self.catalog
                .get_or_create_rel_type_id(src_label_id, dst_label_id, rel_type)?;
        let rel_table_id = RelTableId(catalog_rel_id as u32);

        // Compute the edge ID from the on-disk delta log size, offset by the
        // number of edges already buffered in this transaction for the same
        // rel_table_id. Without this offset, multiple create_edge calls in the
        // same transaction would all derive the same on-disk base and collide.
        let base_edge_id = EdgeStore::peek_next_edge_id(&self.inner.path, rel_table_id)?;
        let buffered_count = self
            .pending_ops
            .iter()
            .filter(|op| {
                matches!(
                    op,
                    PendingOp::EdgeCreate {
                        rel_table_id: pending_rel_table_id,
                        ..
                    } if *pending_rel_table_id == rel_table_id
                )
            })
            .count() as u64;
        let edge_id = EdgeId(base_edge_id.0 + buffered_count);

        // Convert HashMap<String, Value> props to (col_id, value_u64) pairs
        // using the canonical FNV-1a col_id derivation so read and write agree.
        // SPA-229: use NodeStore::encode_value (not val.to_u64()) so that
        // Value::Float is stored via f64::to_bits() in the heap rather than
        // panicking with "cannot be inline-encoded".
        let encoded_props: Vec<(u32, u64)> = props
            .iter()
            .map(|(name, val)| -> Result<(u32, u64)> {
                Ok((col_id_of(name), self.store.encode_value(val)?))
            })
            .collect::<Result<Vec<_>>>()?;

        // Human-readable entries for WAL schema introspection.
        // (Consumes `props` — the encoded form above was built from borrows.)
        let prop_entries: Vec<(String, Value)> = props.into_iter().collect();

        // Buffer the edge append — do NOT write to disk yet. The matching
        // WAL record is emitted at commit time by write_mutation_wal.
        self.pending_ops.push(PendingOp::EdgeCreate {
            src,
            dst,
            rel_table_id,
            props: encoded_props,
        });
        self.wal_mutations.push(WalMutation::EdgeCreate {
            edge_id,
            src,
            dst,
            rel_type: rel_type.to_string(),
            prop_entries,
        });
        Ok(edge_id)
    }
3091
3092 /// Delete the directed edge `src → dst` with the given relationship type.
3093 ///
3094 /// Resolves the relationship type to a `RelTableId` via the catalog, then
3095 /// buffers an `EdgeDelete` operation. At commit time the edge is excised
3096 /// from the on-disk delta log.
3097 ///
3098 /// Returns [`Error::InvalidArgument`] if the relationship type is not
3099 /// registered in the catalog, or if no matching edge record exists in the
3100 /// delta log for the resolved table.
3101 ///
3102 /// Unblocks `SparrowOntology::init(force=true)` which needs to remove all
3103 /// existing edges before re-seeding the ontology graph.
3104 ///
3105 /// [`commit`]: WriteTx::commit
3106 pub fn delete_edge(&mut self, src: NodeId, dst: NodeId, rel_type: &str) -> Result<()> {
3107 let src_label_id = (src.0 >> 32) as u16;
3108 let dst_label_id = (dst.0 >> 32) as u16;
3109
3110 // Resolve the rel type in the catalog. We do not create a new entry —
3111 // deleting a non-existent rel type is always an error.
3112 // The catalog's RelTableId is u64; EdgeStore's is RelTableId(u32).
3113 let catalog_rel_id = self
3114 .catalog
3115 .get_rel_table(src_label_id, dst_label_id, rel_type)?
3116 .ok_or_else(|| {
3117 Error::InvalidArgument(format!(
3118 "relationship type '{}' not found in catalog for labels ({src_label_id}, {dst_label_id})",
3119 rel_type
3120 ))
3121 })?;
3122 let rel_table_id = RelTableId(catalog_rel_id as u32);
3123
3124 // If the edge was created in this same transaction (not yet committed),
3125 // cancel the create rather than scheduling a delete.
3126 let buffered_pos = self.pending_ops.iter().position(|op| {
3127 matches!(
3128 op,
3129 PendingOp::EdgeCreate {
3130 src: os,
3131 dst: od,
3132 rel_table_id: ort,
3133 ..
3134 } if *os == src && *od == dst && *ort == rel_table_id
3135 )
3136 });
3137
3138 if let Some(pos) = buffered_pos {
3139 // Remove the buffered create and its corresponding WAL mutation.
3140 self.pending_ops.remove(pos);
3141 // The WalMutation vec is parallel to pending_ops only for structural
3142 // ops; find and remove the matching EdgeCreate entry.
3143 if let Some(wpos) = self.wal_mutations.iter().position(|m| {
3144 matches!(m, WalMutation::EdgeCreate { src: ms, dst: md, .. } if *ms == src && *md == dst)
3145 }) {
3146 self.wal_mutations.remove(wpos);
3147 }
3148 return Ok(());
3149 }
3150
3151 // Edge is on disk — schedule the deletion for commit time.
3152 self.pending_ops.push(PendingOp::EdgeDelete {
3153 src,
3154 dst,
3155 rel_table_id,
3156 });
3157 self.wal_mutations.push(WalMutation::EdgeDelete {
3158 src,
3159 dst,
3160 rel_type: rel_type.to_string(),
3161 });
3162 Ok(())
3163 }
3164
3165 // ── MVCC conflict detection (SPA-128) ────────────────────────────────────
3166
3167 /// Check for write-write conflicts before committing.
3168 ///
3169 /// A conflict is detected when another `WriteTx` has committed a change
3170 /// to a node that this transaction also dirtied, at a `txn_id` greater
3171 /// than our snapshot.
3172 fn detect_conflicts(&self) -> Result<()> {
3173 let nv = self.inner.node_versions.read().expect("node_versions lock");
3174 for &raw in &self.dirty_nodes {
3175 let last_write = nv.get(NodeId(raw));
3176 if last_write > self.snapshot_txn_id {
3177 return Err(Error::WriteWriteConflict { node_id: raw });
3178 }
3179 }
3180 Ok(())
3181 }
3182
3183 // ── Full-text index maintenance ──────────────────────────────────────────
3184
3185 /// Add a node document to a named full-text index.
3186 ///
3187 /// Call after creating or updating a node to keep the index current.
3188 /// The `text` should be the concatenated string value(s) of the indexed
3189 /// properties. Changes are flushed to disk immediately (no WAL for v1).
3190 ///
3191 /// # Example
3192 /// ```no_run
3193 /// # use sparrowdb::GraphDb;
3194 /// # use sparrowdb_common::NodeId;
3195 /// # let db = GraphDb::open(std::path::Path::new("/tmp/test")).unwrap();
3196 /// # let mut tx = db.begin_write().unwrap();
3197 /// # let node_id = NodeId(0);
3198 /// tx.add_to_fulltext_index("searchIndex", node_id, "some searchable text")?;
3199 /// # Ok::<(), sparrowdb_common::Error>(())
3200 /// ```
3201 pub fn add_to_fulltext_index(
3202 &mut self,
3203 index_name: &str,
3204 node_id: NodeId,
3205 text: &str,
3206 ) -> Result<()> {
3207 use sparrowdb_storage::fulltext_index::FulltextIndex;
3208 // Lazily open the index and cache it for the lifetime of this
3209 // transaction. All additions are batched and flushed once on commit,
3210 // avoiding an open+flush round-trip per document.
3211 let idx = match self.fulltext_pending.get_mut(index_name) {
3212 Some(existing) => existing,
3213 None => {
3214 let opened = FulltextIndex::open(&self.inner.path, index_name)?;
3215 self.fulltext_pending.insert(index_name.to_owned(), opened);
3216 self.fulltext_pending.get_mut(index_name).unwrap()
3217 }
3218 };
3219 idx.add_document(node_id.0, text);
3220 Ok(())
3221 }
3222
3223 // ── Commit ───────────────────────────────────────────────────────────────
3224
    /// Commit the transaction.
    ///
    /// WAL-first protocol (SPA-184):
    ///
    /// 1. Detects write-write conflicts (SPA-128).
    /// 2. Drains staged property updates (does NOT apply to disk yet).
    /// 3. Atomically increments the global `current_txn_id` to obtain the new
    ///    transaction ID that will label all WAL records.
    /// 4. **Writes WAL records and fsyncs** (Begin + all structural mutations +
    ///    all property updates + Commit) so that the intent is durable before
    ///    any data page is touched (SPA-184).
    /// 5. Applies all buffered structural operations to storage (SPA-181):
    ///    node creates, node deletes, edge creates.
    /// 6. Flushes all staged `set_node_col` updates to disk.
    /// 7. Records before-images in the version chain at the previous `txn_id`,
    ///    preserving snapshot access for currently-open readers.
    /// 8. Records the new values in the version chain at the new `txn_id`.
    /// 9. Updates per-node version table for future conflict detection.
    /// 10. Flushes pending fulltext indexes (best-effort, never fails commit).
    /// 11. Refreshes the shared catalog + label-row-count caches (SPA-188/190).
    /// 12. Invalidates the shared property-index cache (SPA-259).
    ///
    /// Returns the new [`TxnId`] on success. Any `Err` before the WAL write
    /// means nothing was persisted at all.
    #[must_use = "check the Result; a failed commit means nothing was written"]
    pub fn commit(mut self) -> Result<TxnId> {
        // Step 1: MVCC conflict abort (SPA-128).
        self.detect_conflicts()?;

        // Step 2: Drain staged property updates — collect but do NOT write to
        // disk yet. We need them in hand to emit WAL records before touching
        // any data page.
        let updates: Vec<((u64, u32), StagedUpdate)> = self.write_buf.updates.drain().collect();

        // Step 3: Increment txn_id with Release ordering. We need the ID now
        // so that WAL records (emitted next) carry the correct txn_id.
        let new_id = self.inner.current_txn_id.fetch_add(1, Ordering::Release) + 1;

        // Step 4: WAL-first — write all mutation records and fsync before
        // touching any data page (SPA-184). A crash between here and the end
        // of Step 6 is recoverable: WAL replay will re-apply the ops.
        write_mutation_wal(
            &self.inner.wal_writer,
            new_id,
            &updates,
            &self.wal_mutations,
        )?;

        // Step 5: Apply buffered structural operations to disk (SPA-181).
        // WAL is already durable at this point; a crash here is safe.
        //
        // SPA-212 (write-amplification fix): NodeCreate ops are collected into a
        // single batch and written via `batch_write_node_creates`, which opens
        // each (label_id, col_id) file only once regardless of how many nodes
        // share that column. This reduces file-open syscalls from O(nodes×cols)
        // to O(labels×cols) per transaction commit.
        let mut col_writes: Vec<(u32, u32, u32, u64, bool)> = Vec::new();
        // All (label_id, slot) pairs for created nodes — needed for HWM
        // advancement, even when a node has zero properties.
        let mut node_slots: Vec<(u32, u32)> = Vec::new();

        for op in self.pending_ops.drain(..) {
            match op {
                PendingOp::NodeCreate {
                    label_id,
                    slot,
                    props,
                } => {
                    // Track this node's (label_id, slot) for HWM advancement.
                    node_slots.push((label_id, slot));
                    // Encode each property value and push (label_id, col_id,
                    // slot, raw_u64, is_present) into the batch buffer.
                    for (col_id, ref val) in props {
                        let raw = self.store.encode_value(val)?;
                        col_writes.push((label_id, col_id, slot, raw, true));
                    }
                }
                PendingOp::NodeDelete { node_id } => {
                    self.store.tombstone_node(node_id)?;
                }
                PendingOp::EdgeCreate {
                    src,
                    dst,
                    rel_table_id,
                    props,
                } => {
                    let mut es = EdgeStore::open(&self.inner.path, rel_table_id)?;
                    es.create_edge(src, rel_table_id, dst)?;
                    // Persist edge properties keyed by (src_slot, dst_slot) so
                    // that reads work correctly after CHECKPOINT (SPA-240).
                    if !props.is_empty() {
                        let src_slot = src.0 & 0xFFFF_FFFF;
                        let dst_slot = dst.0 & 0xFFFF_FFFF;
                        for (col_id, value) in &props {
                            es.set_edge_prop(src_slot, dst_slot, *col_id, *value)?;
                        }
                        // SPA-261: invalidate cached edge props for this rel table.
                        self.inner
                            .edge_props_cache
                            .write()
                            .expect("edge_props_cache poisoned")
                            .remove(&rel_table_id.0);
                    }
                }
                PendingOp::EdgeDelete {
                    src,
                    dst,
                    rel_table_id,
                } => {
                    let mut es = EdgeStore::open(&self.inner.path, rel_table_id)?;
                    es.delete_edge(src, dst)?;
                }
            }
        }

        // Flush all NodeCreate column writes in one batched call.
        // O(labels × cols) file opens instead of O(nodes × cols).
        self.store
            .batch_write_node_creates(col_writes, &node_slots)?;

        // Step 5b: Persist HWMs for all labels that received new nodes in Step 5.
        //
        // batch_write_node_creates() advances the in-memory HWM and marks
        // labels dirty (hwm_dirty). We flush all dirty HWMs here — once per
        // commit — avoiding an fsync storm during bulk imports (SPA-217
        // regression fix). Crash safety is preserved: the WAL record written
        // in Step 4 is already durable; on crash-recovery, the WAL replayer
        // re-applies all NodeCreate ops and re-advances the HWM.
        self.store.flush_hwms()?;

        // Step 6: Flush property updates to disk.
        // Use `upsert_node_col` so that columns added by `set_property` (which
        // may not have been initialised during `create_node`) are created and
        // zero-padded automatically.
        for ((node_raw, col_id), ref staged) in &updates {
            self.store
                .upsert_node_col(NodeId(*node_raw), *col_id, &staged.new_value)?;
        }

        // Step 7+8: Publish versions (before-image at the old txn_id, new
        // value at the new txn_id) under a single write-lock acquisition.
        {
            let mut vs = self.inner.versions.write().expect("version lock poisoned");
            for ((node_raw, col_id), ref staged) in &updates {
                // Publish before-image at the previous txn_id so that readers
                // pinned at that snapshot continue to see the correct value.
                if let Some((prev_txn_id, ref before_val)) = staged.before_image {
                    vs.insert(NodeId(*node_raw), *col_id, prev_txn_id, before_val.clone());
                }
                // Publish new value at the current txn_id.
                vs.insert(NodeId(*node_raw), *col_id, new_id, staged.new_value.clone());
            }
        }

        // Step 9: Advance per-node version table.
        {
            let mut nv = self
                .inner
                .node_versions
                .write()
                .expect("node_versions lock");
            for &raw in &self.dirty_nodes {
                nv.set(NodeId(raw), new_id);
            }
        }

        // Step 10: Flush any pending fulltext index updates.
        // The primary DB mutations above are already durable (WAL written,
        // txn_id advanced). A flush failure here must NOT return Err and
        // cause the caller to retry an already-committed transaction. Log the
        // error so operators can investigate but treat the commit as successful.
        for (name, mut idx) in self.fulltext_pending.drain() {
            if let Err(e) = idx.flush() {
                tracing::error!(index = %name, error = %e, "fulltext index flush failed post-commit; index may be stale until next write");
            }
        }

        // Step 11: Refresh the shared catalog cache so subsequent reads see
        // any labels / rel types created in this transaction (SPA-188).
        // Also rebuild the label-row-count cache (SPA-190) from the freshly
        // opened catalog to avoid a second I/O trip. A failed re-open is
        // silently skipped: the commit itself is already complete.
        if let Ok(fresh) = Catalog::open(&self.inner.path) {
            let new_counts = build_label_row_counts_from_disk(&fresh, &self.inner.path);
            *self.inner.catalog.write().expect("catalog RwLock poisoned") = fresh;
            *self
                .inner
                .label_row_counts
                .write()
                .expect("label_row_counts RwLock poisoned") = new_counts;
        }

        // Step 12: Invalidate the shared property-index cache (SPA-259).
        //
        // GraphDb-level execute functions (execute_create_standalone, etc.)
        // call invalidate_prop_index() again after this — the extra clear
        // is harmless (just bumps the generation counter). The critical
        // case is when a caller uses WriteTx directly without going through
        // a GraphDb::execute path, which previously left the cache stale.
        self.inner
            .prop_index
            .write()
            .expect("prop_index RwLock poisoned")
            .clear();

        // Mark success so Drop does not treat this as a rollback.
        self.committed = true;
        Ok(TxnId(new_id))
    }
3425}
3426
3427impl Drop for WriteTx {
3428 fn drop(&mut self) {
3429 // Uncommitted staged updates are discarded here — no writes to disk.
3430 // The write lock is released by dropping `_guard` (WriteGuard).
3431 let _ = self.committed;
3432 }
3433}
3434
3435// ── WAL mutation emission (SPA-127) ───────────────────────────────────────────
3436
/// Append mutation WAL records for a committed transaction.
///
/// Uses the persistent `wal_writer` cached in [`DbInner`] — no segment scan
/// or file-open overhead on each call (SPA-210). Emits:
/// - `Begin`
/// - `NodeUpdate` for each staged property change in `updates`
/// - `NodeCreate` / `NodeDelete` / `EdgeCreate` / `EdgeDelete` for each
///   structural mutation
/// - `Commit`
/// - fsync
///
/// Returns the first append/fsync error encountered; on error the WAL may
/// contain a `Begin` without a matching `Commit` (recovery must tolerate
/// incomplete transactions — NOTE(review): confirm against the replay path).
fn write_mutation_wal(
    wal_writer: &Mutex<WalWriter>,
    txn_id: u64,
    updates: &[((u64, u32), StagedUpdate)],
    mutations: &[WalMutation],
) -> Result<()> {
    // Nothing to record if there were no mutations or property updates.
    if updates.is_empty() && mutations.is_empty() {
        return Ok(());
    }

    // A poisoned mutex means some thread panicked while holding it; the
    // writer state is still usable, so recover the guard rather than panic.
    let mut wal = wal_writer.lock().unwrap_or_else(|e| e.into_inner());
    let txn = TxnId(txn_id);

    wal.append(WalRecordKind::Begin, txn, WalPayload::Empty)?;

    // NodeUpdate records for each property change in the write buffer.
    for ((node_raw, col_id), staged) in updates {
        // A missing before-image (property did not previously exist) is
        // encoded as eight zero bytes rather than an absent field.
        let before_bytes = staged
            .before_image
            .as_ref()
            .map(|(_, v)| value_to_wal_bytes(v))
            .unwrap_or_else(|| 0u64.to_le_bytes().to_vec());
        let after_bytes = value_to_wal_bytes(&staged.new_value);
        wal.append(
            WalRecordKind::NodeUpdate,
            txn,
            WalPayload::NodeUpdate {
                node_id: *node_raw,
                key: staged.key_name.clone(),
                col_id: *col_id,
                before: before_bytes,
                after: after_bytes,
            },
        )?;
    }

    // Structural mutations.
    for m in mutations {
        match m {
            WalMutation::NodeCreate {
                node_id,
                label_id,
                props,
                prop_names,
            } => {
                // Use human-readable names when available; fall back to
                // "col_{hash}" when only the col_id is known (low-level API).
                // `props` and `prop_names` are parallel, position-matched lists.
                let wal_props: Vec<(String, Vec<u8>)> = props
                    .iter()
                    .enumerate()
                    .map(|(i, (col_id, v))| {
                        let name = prop_names
                            .get(i)
                            .filter(|n| !n.is_empty())
                            .cloned()
                            .unwrap_or_else(|| format!("col_{col_id}"));
                        (name, value_to_wal_bytes(v))
                    })
                    .collect();
                wal.append(
                    WalRecordKind::NodeCreate,
                    txn,
                    WalPayload::NodeCreate {
                        node_id: node_id.0,
                        label_id: *label_id,
                        props: wal_props,
                    },
                )?;
            }
            WalMutation::NodeDelete { node_id } => {
                wal.append(
                    WalRecordKind::NodeDelete,
                    txn,
                    WalPayload::NodeDelete { node_id: node_id.0 },
                )?;
            }
            WalMutation::EdgeCreate {
                edge_id,
                src,
                dst,
                rel_type,
                prop_entries,
            } => {
                // Edge properties already carry their names — no fallback needed.
                let wal_props: Vec<(String, Vec<u8>)> = prop_entries
                    .iter()
                    .map(|(name, val)| (name.clone(), value_to_wal_bytes(val)))
                    .collect();
                wal.append(
                    WalRecordKind::EdgeCreate,
                    txn,
                    WalPayload::EdgeCreate {
                        edge_id: edge_id.0,
                        src: src.0,
                        dst: dst.0,
                        rel_type: rel_type.clone(),
                        props: wal_props,
                    },
                )?;
            }
            WalMutation::EdgeDelete { src, dst, rel_type } => {
                // Edge deletes are keyed by (src, dst, rel_type) — the payload
                // carries no edge_id.
                wal.append(
                    WalRecordKind::EdgeDelete,
                    txn,
                    WalPayload::EdgeDelete {
                        src: src.0,
                        dst: dst.0,
                        rel_type: rel_type.clone(),
                    },
                )?;
            }
        }
    }

    wal.append(WalRecordKind::Commit, txn, WalPayload::Empty)?;
    // Durability point: records above are guaranteed on disk only after this.
    wal.fsync()?;
    Ok(())
}
3564
3565// ── FNV-1a col_id derivation ─────────────────────────────────────────────────
3566
/// Derive a stable `u32` column ID from a property key name.
///
/// Delegates to [`sparrowdb_common::col_id_of`] — the single canonical
/// FNV-1a implementation shared by storage and execution (SPA-160).
/// This public shim lets callers of the top-level crate derive column IDs
/// without depending on `sparrowdb_common` directly.
pub fn fnv1a_col_id(key: &str) -> u32 {
    col_id_of(key)
}
3574
3575// ── Cypher string utilities ────────────────────────────────────────────────────
3576
/// Escape a Rust `&str` so it can be safely interpolated inside a single-quoted
/// Cypher string literal.
///
/// Exactly two characters need escaping in Cypher single-quoted strings:
/// * `\` → `\\` (so a literal backslash is not read as an escape introducer)
/// * `'` → `\'` (prevents premature termination of the string literal)
///
/// # Example
///
/// ```
/// use sparrowdb::cypher_escape_string;
/// let safe = cypher_escape_string("O'Reilly");
/// let cypher = format!("MATCH (n {{name: '{safe}'}}) RETURN n");
/// assert_eq!(cypher, "MATCH (n {name: 'O\\'Reilly'}) RETURN n");
/// ```
///
/// **Prefer parameterized queries** (`execute_with_params`) over string
/// interpolation whenever possible — this function is provided for the cases
/// where dynamic query construction cannot be avoided (SPA-218).
pub fn cypher_escape_string(s: &str) -> String {
    // A single character-by-character pass is equivalent to performing
    // `replace('\\', "\\\\")` followed by `replace('\'', "\\'")`: the
    // backslash expansion never produces an apostrophe, so the two
    // rewrites cannot interfere.
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\'' => escaped.push_str("\\'"),
            other => escaped.push(other),
        }
    }
    escaped
}
3602
3603// ── Mutation value helpers ─────────────────────────────────────────────────────
3604
3605/// Convert a Cypher [`Literal`] to a storage [`Value`].
3606fn literal_to_value(lit: &sparrowdb_cypher::ast::Literal) -> Value {
3607 use sparrowdb_cypher::ast::Literal;
3608 match lit {
3609 Literal::Int(n) => Value::Int64(*n),
3610 // Float stored as Value::Float — NodeStore::encode_value writes the full
3611 // 8 IEEE-754 bytes to the overflow heap (SPA-267).
3612 Literal::Float(f) => Value::Float(*f),
3613 Literal::Bool(b) => Value::Int64(if *b { 1 } else { 0 }),
3614 Literal::String(s) => Value::Bytes(s.as_bytes().to_vec()),
3615 Literal::Null | Literal::Param(_) => Value::Int64(0),
3616 }
3617}
3618
/// Returns `true` when the `DELETE` clause variable in a `MatchMutateStatement`
/// refers to a relationship pattern variable rather than a node variable.
///
/// Used to route `MATCH (a)-[r:REL]->(b) DELETE r` to the edge-delete path
/// instead of the node-delete path.
fn is_edge_delete_mutation(mm: &sparrowdb_cypher::ast::MatchMutateStatement) -> bool {
    // Only a Delete mutation can possibly be an edge delete.
    let sparrowdb_cypher::ast::Mutation::Delete { var } = &mm.mutation else {
        return false;
    };
    // Edge delete iff some relationship pattern binds the same variable name.
    mm.match_patterns
        .iter()
        .any(|p| p.rels.iter().any(|r| !r.var.is_empty() && &r.var == var))
}
3634
3635/// Only literal expressions are supported for SET values at this stage.
3636fn expr_to_value(expr: &sparrowdb_cypher::ast::Expr) -> Value {
3637 use sparrowdb_cypher::ast::Expr;
3638 match expr {
3639 Expr::Literal(lit) => literal_to_value(lit),
3640 _ => Value::Int64(0),
3641 }
3642}
3643
3644fn literal_to_value_with_params(
3645 lit: &sparrowdb_cypher::ast::Literal,
3646 params: &HashMap<String, sparrowdb_execution::Value>,
3647) -> Result<Value> {
3648 use sparrowdb_cypher::ast::Literal;
3649 match lit {
3650 Literal::Int(n) => Ok(Value::Int64(*n)),
3651 Literal::Float(f) => Ok(Value::Float(*f)),
3652 Literal::Bool(b) => Ok(Value::Int64(if *b { 1 } else { 0 })),
3653 Literal::String(s) => Ok(Value::Bytes(s.as_bytes().to_vec())),
3654 Literal::Null => Ok(Value::Int64(0)),
3655 Literal::Param(p) => match params.get(p.as_str()) {
3656 Some(v) => Ok(exec_value_to_storage(v)),
3657 None => Err(Error::InvalidArgument(format!(
3658 "parameter ${p} was referenced in the query but not supplied"
3659 ))),
3660 },
3661 }
3662}
3663
3664fn expr_to_value_with_params(
3665 expr: &sparrowdb_cypher::ast::Expr,
3666 params: &HashMap<String, sparrowdb_execution::Value>,
3667) -> Result<Value> {
3668 use sparrowdb_cypher::ast::Expr;
3669 match expr {
3670 Expr::Literal(lit) => literal_to_value_with_params(lit, params),
3671 _ => Err(Error::InvalidArgument(
3672 "property value must be a literal or $parameter".into(),
3673 )),
3674 }
3675}
3676
3677fn exec_value_to_storage(v: &sparrowdb_execution::Value) -> Value {
3678 use sparrowdb_execution::Value as EV;
3679 match v {
3680 EV::Int64(n) => Value::Int64(*n),
3681 EV::Float64(f) => Value::Float(*f),
3682 EV::Bool(b) => Value::Int64(if *b { 1 } else { 0 }),
3683 EV::String(s) => Value::Bytes(s.as_bytes().to_vec()),
3684 _ => Value::Int64(0),
3685 }
3686}
3687
3688/// Encode a storage [`Value`] to a raw little-endian byte vector for WAL
3689/// logging. Unlike [`Value::to_u64`], this never panics for `Float` values —
3690/// it emits the full 8 IEEE-754 bytes so the WAL payload is lossless.
3691///
3692/// The bytes are for observability only (schema introspection uses only the
3693/// property *name*, not the value); correctness of on-disk storage is
3694/// handled separately via [`NodeStore::encode_value`].
3695fn value_to_wal_bytes(v: &Value) -> Vec<u8> {
3696 match v {
3697 Value::Float(f) => f.to_bits().to_le_bytes().to_vec(),
3698 // For Int64 and Bytes the existing to_u64() encoding is fine.
3699 other => other.to_u64().to_le_bytes().to_vec(),
3700 }
3701}
3702
3703/// Convert a storage-layer `Value` (Int64 / Bytes / Float) to the execution-layer
3704/// `Value` (Int64 / String / Float64 / Null / …) used in `QueryResult` rows.
3705fn storage_value_to_exec(val: &Value) -> sparrowdb_execution::Value {
3706 match val {
3707 Value::Int64(n) => sparrowdb_execution::Value::Int64(*n),
3708 Value::Bytes(b) => {
3709 sparrowdb_execution::Value::String(String::from_utf8_lossy(b).into_owned())
3710 }
3711 Value::Float(f) => sparrowdb_execution::Value::Float64(*f),
3712 }
3713}
3714
3715/// Evaluate a RETURN expression against a simple name→ExecValue map built
3716/// from the merged node's properties. Used exclusively by `execute_merge`.
3717///
3718/// Supports `PropAccess` (e.g. `n.name`) and `Literal`; everything else
3719/// falls back to `Null`.
3720fn eval_expr_merge(
3721 expr: &sparrowdb_cypher::ast::Expr,
3722 vals: &HashMap<String, sparrowdb_execution::Value>,
3723) -> sparrowdb_execution::Value {
3724 use sparrowdb_cypher::ast::{Expr, Literal};
3725 match expr {
3726 Expr::PropAccess { var, prop } => {
3727 let key = format!("{var}.{prop}");
3728 vals.get(&key)
3729 .cloned()
3730 .unwrap_or(sparrowdb_execution::Value::Null)
3731 }
3732 Expr::Literal(lit) => match lit {
3733 Literal::Int(n) => sparrowdb_execution::Value::Int64(*n),
3734 Literal::Float(f) => sparrowdb_execution::Value::Float64(*f),
3735 Literal::Bool(b) => sparrowdb_execution::Value::Bool(*b),
3736 Literal::String(s) => sparrowdb_execution::Value::String(s.clone()),
3737 Literal::Null | Literal::Param(_) => sparrowdb_execution::Value::Null,
3738 },
3739 Expr::Var(v) => vals
3740 .get(v.as_str())
3741 .cloned()
3742 .unwrap_or(sparrowdb_execution::Value::Null),
3743 _ => sparrowdb_execution::Value::Null,
3744 }
3745}
3746
3747// ── Private helpers ───────────────────────────────────────────────────────────
3748
3749fn collect_maintenance_params(
3750 catalog: &Catalog,
3751 node_store: &NodeStore,
3752 db_root: &Path,
3753) -> (Vec<u32>, u64) {
3754 // SPA-185: collect all registered rel table IDs from the catalog instead
3755 // of hardcoding [0]. This ensures every per-type edge store is checkpointed.
3756 // Always include table-0 so that any edges written before the catalog had
3757 // entries (legacy data or pre-SPA-185 databases) are also checkpointed.
3758 let rel_table_entries = catalog.list_rel_table_ids();
3759 let mut rel_table_ids: Vec<u32> = rel_table_entries
3760 .iter()
3761 .map(|(id, _, _, _)| *id as u32)
3762 .collect();
3763 // Always include the legacy table-0 slot. Dedup if already present.
3764 if !rel_table_ids.contains(&0u32) {
3765 rel_table_ids.push(0u32);
3766 }
3767
3768 // n_nodes must cover the highest node-store HWM AND the highest node ID
3769 // present in any delta record, so that the CSR bounds check passes even
3770 // when edges were inserted without going through the node-store API.
3771 let hwm_n_nodes: u64 = catalog
3772 .list_labels()
3773 .unwrap_or_default()
3774 .iter()
3775 .map(|(label_id, _name)| node_store.hwm_for_label(*label_id as u32).unwrap_or(0))
3776 .sum::<u64>();
3777
3778 // Also scan delta records for the maximum *slot* index used.
3779 // NodeIds encode `(label_id << 32) | slot`; the CSR is indexed by slot,
3780 // so we must strip the label bits before computing n_nodes. Using the
3781 // full NodeId here would inflate n_nodes into the billions (e.g. label_id=1
3782 // → slot 0 appears as 0x0000_0001_0000_0000) and the CSR would be built
3783 // with nonsensical degree arrays — the SPA-186 root cause.
3784 let delta_max: u64 = rel_table_ids
3785 .iter()
3786 .filter_map(|&rel_id| {
3787 EdgeStore::open(db_root, RelTableId(rel_id))
3788 .ok()
3789 .and_then(|s| s.read_delta().ok())
3790 })
3791 .flat_map(|records| {
3792 records.into_iter().flat_map(|r| {
3793 // Strip label bits — CSR needs slot indices only.
3794 let src_slot = r.src.0 & 0xFFFF_FFFF;
3795 let dst_slot = r.dst.0 & 0xFFFF_FFFF;
3796 [src_slot, dst_slot].into_iter()
3797 })
3798 })
3799 .max()
3800 .map(|max_slot| max_slot + 1)
3801 .unwrap_or(0);
3802
3803 let n_nodes = hwm_n_nodes.max(delta_max).max(1);
3804
3805 (rel_table_ids, n_nodes)
3806}
3807
/// Build a `LabelId → node count` map by reading each label's HWM from disk
/// (SPA-190). Called at `GraphDb::open()` and after node-mutating writes.
///
/// Returns an empty map when the node store cannot be opened; labels whose
/// HWM is zero are omitted entirely, so an absent key means "no rows".
fn build_label_row_counts_from_disk(catalog: &Catalog, db_root: &Path) -> HashMap<LabelId, usize> {
    let store = match NodeStore::open(db_root) {
        Ok(s) => s,
        Err(_) => return HashMap::new(),
    };
    catalog
        .list_labels()
        .unwrap_or_default()
        .into_iter()
        .filter_map(|(lid, _name)| {
            let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
            if hwm > 0 {
                Some((lid, hwm as usize))
            } else {
                None
            }
        })
        .collect()
}
3833
3834fn open_csr_map(path: &Path) -> HashMap<u32, CsrForward> {
3835 let catalog = match Catalog::open(path) {
3836 Ok(c) => c,
3837 Err(_) => return HashMap::new(),
3838 };
3839 let mut map = HashMap::new();
3840
3841 // Collect rel IDs from catalog.
3842 let mut rel_ids: Vec<u32> = catalog
3843 .list_rel_table_ids()
3844 .into_iter()
3845 .map(|(id, _, _, _)| id as u32)
3846 .collect();
3847
3848 // Always include the legacy table-0 slot so that checkpointed CSRs
3849 // written before the catalog had entries (pre-SPA-185 data) are loaded.
3850 if !rel_ids.contains(&0u32) {
3851 rel_ids.push(0u32);
3852 }
3853
3854 for rid in rel_ids {
3855 if let Ok(store) = EdgeStore::open(path, RelTableId(rid)) {
3856 if let Ok(csr) = store.open_fwd() {
3857 map.insert(rid, csr);
3858 }
3859 }
3860 }
3861 map
3862}
3863
3864/// Like [`open_csr_map`] but surfaces the catalog-open error so callers can
3865/// decide whether to replace an existing cache. Used by
3866/// [`GraphDb::invalidate_csr_map`] to avoid clobbering a valid in-memory map
3867/// with an empty one when the catalog is transiently unreadable.
3868fn try_open_csr_map(path: &Path) -> Result<HashMap<u32, CsrForward>> {
3869 let catalog = Catalog::open(path)?;
3870 let mut map = HashMap::new();
3871
3872 let mut rel_ids: Vec<u32> = catalog
3873 .list_rel_table_ids()
3874 .into_iter()
3875 .map(|(id, _, _, _)| id as u32)
3876 .collect();
3877
3878 if !rel_ids.contains(&0u32) {
3879 rel_ids.push(0u32);
3880 }
3881
3882 for rid in rel_ids {
3883 if let Ok(store) = EdgeStore::open(path, RelTableId(rid)) {
3884 if let Ok(csr) = store.open_fwd() {
3885 map.insert(rid, csr);
3886 }
3887 }
3888 }
3889 Ok(map)
3890}
3891
3892// ── Storage-size helpers (SPA-171) ────────────────────────────────────────────
3893
/// Recursively sum the byte sizes of all regular files under `dir`.
/// An unreadable directory contributes 0; unreadable entries are skipped.
///
/// NOTE(review): `is_dir()` follows symlinks, so a symlink cycle inside the
/// DB directory would recurse indefinitely — presumed absent here; confirm.
fn dir_size_bytes(dir: &Path) -> u64 {
    let entries = match std::fs::read_dir(dir) {
        Ok(iter) => iter,
        Err(_) => return 0,
    };
    entries
        .flatten()
        .map(|entry| {
            let path = entry.path();
            if path.is_dir() {
                dir_size_bytes(&path)
            } else {
                std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0)
            }
        })
        .sum()
}
3909
3910// ── Reserved label/type protection (SPA-208) ──────────────────────────────────
3911
/// Returns `true` if `label` starts with the reserved `__SO_` prefix.
///
/// The `__SO_` namespace is reserved for internal SparrowDB system objects.
/// Any attempt to CREATE a node or relationship using a label/type in this
/// namespace is rejected with an [`Error::InvalidArgument`] (SPA-208).
#[inline]
fn is_reserved_label(label: &str) -> bool {
    // Byte-wise prefix check — identical to str::starts_with for this
    // ASCII-only prefix.
    label.as_bytes().starts_with(b"__SO_")
}
3921
3922/// Return an [`Error::InvalidArgument`] for a reserved label/type.
3923fn reserved_label_error(label: &str) -> sparrowdb_common::Error {
3924 sparrowdb_common::Error::InvalidArgument(format!(
3925 "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
3926 ))
3927}
3928
3929// ── Tests ─────────────────────────────────────────────────────────────────────
3930
3931#[cfg(test)]
3932mod tests {
3933 use super::*;
3934
    /// Phase 0 gate: create a DB directory, open it, and let it drop cleanly.
    #[test]
    fn open_and_close_empty_db() {
        let dir = tempfile::tempdir().expect("tempdir");
        let db_path = dir.path().join("test.sparrow");

        // open() is expected to create the directory itself.
        assert!(!db_path.exists());
        let db = GraphDb::open(&db_path).expect("open must succeed");
        assert!(db_path.is_dir());
        drop(db);
    }

    /// Opening a DB path whose directory already exists must also succeed.
    #[test]
    fn open_existing_dir_succeeds() {
        let dir = tempfile::tempdir().expect("tempdir");
        let db_path = dir.path().join("existing.sparrow");
        std::fs::create_dir_all(&db_path).unwrap();
        let db = GraphDb::open(&db_path).expect("open existing must succeed");
        drop(db);
    }

    /// Compile-time check that the free `open` function keeps the
    /// `fn(&Path) -> Result<GraphDb>` signature.
    #[test]
    fn open_fn_signature() {
        let _: fn(&Path) -> Result<GraphDb> = open;
    }
3960
    /// A reader opened before any commit pins snapshot txn id 0.
    #[test]
    fn begin_read_returns_snapshot_zero_before_any_write() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        let rx = db.begin_read().unwrap();
        assert_eq!(rx.snapshot_txn_id, 0);
    }

    /// Each committed write transaction bumps the txn id by exactly one.
    #[test]
    fn begin_write_increments_txn_id_on_commit() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        let tx = db.begin_write().unwrap();
        let committed = tx.commit().unwrap();
        assert_eq!(committed.0, 1);

        let tx2 = db.begin_write().unwrap();
        let committed2 = tx2.commit().unwrap();
        assert_eq!(committed2.0, 2);
    }

    /// SWMR: a second begin_write while a WriteTx is open must fail fast
    /// with WriterBusy rather than block.
    #[test]
    fn second_begin_write_returns_writer_busy() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        let _tx1 = db.begin_write().unwrap();
        let err = db.begin_write().err().expect("expected Err");
        assert!(
            matches!(err, Error::WriterBusy),
            "expected WriterBusy, got {err}"
        );
    }
3995
    /// Committing a WriteTx must release the single-writer lock.
    #[test]
    fn write_lock_released_after_commit() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        let tx = db.begin_write().unwrap();
        tx.commit().unwrap();

        // Should succeed because the lock was released on commit/drop.
        let tx2 = db.begin_write().unwrap();
        tx2.commit().unwrap();
    }

    /// checkpoint() on a freshly-opened, empty DB is a no-op that must not fail.
    #[test]
    fn graphdb_checkpoint_on_empty_db() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        // Checkpoint on empty DB must not panic or error.
        db.checkpoint()
            .expect("checkpoint must succeed on empty DB");
    }

    /// optimize() on an empty DB must likewise succeed without error.
    #[test]
    fn graphdb_optimize_on_empty_db() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        db.optimize().expect("optimize must succeed on empty DB");
    }
4024
    /// SPA-162: checkpoint() must not deadlock after a write transaction has
    /// been committed and dropped.
    #[test]
    fn checkpoint_does_not_deadlock_after_write() {
        use std::sync::Arc;

        let dir = tempfile::tempdir().unwrap();
        let db = Arc::new(GraphDb::open(dir.path()).unwrap());

        // Run a write transaction and commit it.
        let mut tx = db.begin_write().unwrap();
        tx.create_node(0, &[]).unwrap();
        tx.commit().unwrap();

        // checkpoint() must complete without hanging — run it on a thread so
        // the test runner can time out rather than block the whole suite.
        let db2 = Arc::clone(&db);
        let handle = std::thread::spawn(move || {
            db2.checkpoint().unwrap();
        });
        handle
            .join()
            .expect("checkpoint thread must complete without panic");
    }

    /// SPA-162: checkpoint() must return WriterBusy (not deadlock) when a
    /// WriteTx is currently active. Before the fix, calling lock() from
    /// checkpoint() while the same thread held the mutex would hang forever.
    #[test]
    fn checkpoint_returns_writer_busy_while_write_tx_active() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        // Hold an active write transaction without committing it.
        let tx = db.begin_write().unwrap();

        // checkpoint() must return WriterBusy immediately — not deadlock.
        let result = db.checkpoint();
        assert!(
            matches!(result, Err(Error::WriterBusy)),
            "expected WriterBusy while WriteTx active, got: {result:?}"
        );

        // Drop the transaction; checkpoint must now succeed.
        drop(tx);
        db.checkpoint()
            .expect("checkpoint must succeed after WriteTx dropped");
    }
4073
    /// SPA-181: Dropping a WriteTx without calling commit() must not persist
    /// any mutations. The node-store HWM must remain at 0 after a dropped tx.
    #[test]
    fn dropped_write_tx_persists_no_nodes() {
        use sparrowdb_storage::node_store::NodeStore;

        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        {
            let mut tx = db.begin_write().unwrap();
            // Stage a node creation — should NOT be written to disk.
            let _node_id = tx.create_node(0, &[]).unwrap();
            // Drop WITHOUT calling commit().
        }

        // Verify no node was persisted — reopen the node store directly to
        // inspect what actually reached disk.
        let store = NodeStore::open(dir.path()).unwrap();
        let hwm = store.hwm_for_label(0).unwrap();
        assert_eq!(hwm, 0, "dropped tx must not persist any nodes (SPA-181)");

        // A subsequent write transaction must be obtainable (lock released).
        let tx2 = db
            .begin_write()
            .expect("write lock must be released after drop");
        tx2.commit().unwrap();
    }

    /// SPA-181: A sequence of create_node + create_edge where the whole
    /// transaction is dropped leaves the store entirely unchanged.
    #[test]
    fn dropped_write_tx_persists_no_edges() {
        use sparrowdb_storage::edge_store::EdgeStore;

        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        // First, commit two nodes so we have valid src/dst IDs.
        let (src, dst) = {
            let mut tx = db.begin_write().unwrap();
            let src = tx.create_node(0, &[]).unwrap();
            let dst = tx.create_node(0, &[]).unwrap();
            tx.commit().unwrap();
            (src, dst)
        };

        {
            let mut tx = db.begin_write().unwrap();
            // Stage an edge — should NOT be written to disk.
            tx.create_edge(src, dst, "KNOWS", std::collections::HashMap::new())
                .unwrap();
            // Drop WITHOUT calling commit().
        }

        // Verify no edge was persisted (delta log must be empty or absent).
        let delta = EdgeStore::open(dir.path(), sparrowdb_storage::edge_store::RelTableId(0))
            .and_then(|s| s.read_delta())
            .unwrap_or_default();
        assert_eq!(
            delta.len(),
            0,
            "dropped tx must not persist any edges (SPA-181)"
        );
    }
4138
4139 /// SPA-181: The old UB transmute is gone. Verify the write lock cycles
4140 /// correctly: acquire → use → drop → acquire again.
4141 #[test]
4142 fn write_guard_releases_lock_on_drop() {
4143 let dir = tempfile::tempdir().unwrap();
4144 let db = GraphDb::open(dir.path()).unwrap();
4145
4146 for _ in 0..5 {
4147 let tx = db.begin_write().expect("lock must be free");
4148 assert!(matches!(db.begin_write(), Err(Error::WriterBusy)));
4149 drop(tx);
4150 }
4151 }
4152
    // ── SPA-171: GraphDb::stats() ─────────────────────────────────────────────

    /// stats() on an empty DB: zero edges and no per-label counts.
    #[test]
    fn stats_empty_db() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        let stats = db.stats().unwrap();
        assert_eq!(stats.edge_count, 0);
        assert_eq!(stats.node_count_per_label.len(), 0);
        // SPA-210: WalWriter is opened eagerly at GraphDb::open time, creating
        // the initial segment file (1-byte version header). wal_bytes may be
        // a small non-zero value before any commits.
        // (Previously the WAL directory was only created on first commit.)
        let _ = stats.wal_bytes; // accept any value — presence is fine
    }
4168
    /// SPA-171: stats() must reflect committed nodes, edges, WAL bytes and
    /// per-label byte usage after a mix of low-level and Cypher writes.
    #[test]
    fn stats_after_writes() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        // Create two nodes and one edge via low-level API.
        let (src, dst) = {
            let mut tx = db.begin_write().unwrap();
            let a = tx.create_node(0, &[]).unwrap();
            let b = tx.create_node(0, &[]).unwrap();
            tx.commit().unwrap();
            (a, b)
        };
        {
            let mut tx = db.begin_write().unwrap();
            tx.create_edge(src, dst, "KNOWS", std::collections::HashMap::new())
                .unwrap();
            tx.commit().unwrap();
        }
        // Also create a labeled node via Cypher so catalog has a label entry.
        db.execute("CREATE (n:Person {name: 'Alice'})").unwrap();
        let stats = db.stats().unwrap();
        assert!(
            stats
                .node_count_per_label
                .get("Person")
                .copied()
                .unwrap_or(0)
                >= 1
        );
        assert_eq!(
            stats.edge_count, 1,
            "expected 1 edge, got {}",
            stats.edge_count
        );
        assert!(stats.wal_bytes > 0);
        assert!(stats.total_bytes >= stats.wal_bytes);
        assert!(stats.bytes_per_label.get("Person").copied().unwrap_or(0) > 0);
    }
4207
    /// `CALL db.stats() YIELD metric, value` surfaces the stats() metrics
    /// through Cypher as (String, Int64) rows.
    #[test]
    fn call_db_stats_cypher() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        db.execute("CREATE (n:Widget {name: 'w1'})").unwrap();
        let result = db.execute("CALL db.stats() YIELD metric, value").unwrap();
        assert_eq!(result.columns, vec!["metric", "value"]);
        assert!(!result.rows.is_empty());
        // Fold the (String, Int64) rows into a map for keyed assertions;
        // rows of any other shape are ignored.
        let metrics: std::collections::HashMap<_, _> = result
            .rows
            .iter()
            .filter_map(|row| match (&row[0], &row[1]) {
                (sparrowdb_execution::Value::String(m), sparrowdb_execution::Value::Int64(v)) => {
                    Some((m.clone(), *v))
                }
                _ => None,
            })
            .collect();
        assert!(metrics.contains_key("total_bytes"));
        assert!(metrics.contains_key("wal_bytes"));
        assert!(metrics.contains_key("edge_count"));
        assert!(metrics.contains_key("nodes.Widget"));
        assert!(metrics.contains_key("label_bytes.Widget"));
        assert!(metrics["total_bytes"] > 0);
    }
4233
4234 #[test]
4235 fn stats_edge_count_after_checkpoint() {
4236 let dir = tempfile::tempdir().unwrap();
4237 let db = GraphDb::open(dir.path()).unwrap();
4238 let (n1, n2) = {
4239 let mut tx = db.begin_write().unwrap();
4240 let a = tx.create_node(0, &[]).unwrap();
4241 let b = tx.create_node(0, &[]).unwrap();
4242 tx.commit().unwrap();
4243 (a, b)
4244 };
4245 {
4246 let mut tx = db.begin_write().unwrap();
4247 tx.create_edge(n1, n2, "LINK", std::collections::HashMap::new())
4248 .unwrap();
4249 tx.commit().unwrap();
4250 }
4251 let before = db.stats().unwrap();
4252 assert!(
4253 before.edge_count > 0,
4254 "edge_count must be > 0 before checkpoint"
4255 );
4256 db.checkpoint().unwrap();
4257 let after = db.stats().unwrap();
4258 assert_eq!(after.edge_count, before.edge_count);
4259 }
4260}