// sparrowdb crate root (lib.rs)

1// Suppress rustdoc warnings for Markdown links that are valid on GitHub/docs.rs
2// but cannot be resolved as Rust item paths (e.g., [LICENSE], [docs/...]).
3#![allow(rustdoc::bare_urls)]
4#![allow(rustdoc::broken_intra_doc_links)]
5//! SparrowDB — top-level public API.
6//!
7//! ## Transaction model (Phase 5 — SWMR)
8//!
9//! * **Single Writer**: at most one [`WriteTx`] may be active at a time.
10//!   [`GraphDb::begin_write`] returns [`Error::WriterBusy`] if a writer is
11//!   already open.
12//!
13//! * **Multiple Readers**: any number of [`ReadTx`] handles may coexist with
14//!   an active writer.  Each reader pins the committed `txn_id` at open time
15//!   and sees only data committed at or before that snapshot.
16//!
17//! ## Snapshot isolation for `set_node_col`
18//!
19//! Property updates (`set_node_col`) are recorded in an **in-memory version
20//! chain** keyed by `(NodeId, col_id)`.  Each entry is a `(txn_id, Value)`
21//! pair.  When a `ReadTx` reads a property it first consults the version chain
22//! and returns the most-recent value with `txn_id <= snapshot_txn_id`, falling
23//! back to the on-disk value written by `create_node` if no such entry exists.
24//!
25//! ## Quick start
26//!
27//! ```no_run
28//! use sparrowdb::GraphDb;
29//! let db = GraphDb::open(std::path::Path::new("/tmp/my.sparrow")).unwrap();
30//! db.checkpoint().unwrap();
31//! db.optimize().unwrap();
32//! ```
33
34use sparrowdb_catalog::catalog::{Catalog, LabelId};
35use sparrowdb_common::{col_id_of, TxnId};
36use sparrowdb_execution::Engine;
37
38// ── Export / import module ────────────────────────────────────────────────────
39pub mod export;
40pub use export::{EdgeDump, GraphDump, NodeDump};
41
42// ── Public re-exports ─────────────────────────────────────────────────────────
43//
44// Re-export the types that consumers of the top-level `sparrowdb` crate need
45// without also having to depend on the sub-crates directly.
46
47/// Query result returned by [`GraphDb::execute`] and [`GraphDb::execute_write`].
48/// Contains column names and rows of scalar [`Value`] cells.
49pub use sparrowdb_execution::QueryResult;
50
51/// Scalar value type used in query results and node property reads/writes.
52pub use sparrowdb_storage::node_store::Value;
53
54/// Opaque 64-bit node identifier.
55pub use sparrowdb_common::NodeId;
56
57/// Opaque 64-bit edge identifier.
58pub use sparrowdb_common::EdgeId;
59
60/// Error type returned by all SparrowDB operations.
61pub use sparrowdb_common::Error;
62
63/// Convenience alias: `std::result::Result<T, sparrowdb::Error>`.
64pub use sparrowdb_common::Result;
65
/// Per-rel-table edge property cache (SPA-261).
///
/// Nested map: `rel_table_id → (edge key → encoded (col_id, value_u64) pairs)`.
/// The `(u64, u64)` key presumably packs the `(src, dst)` node ids — TODO
/// confirm against the edge-store read path.  Shared behind an `Arc` so each
/// read `Engine` can reuse the same cache instance; cleared by
/// `invalidate_edge_props_cache` after CHECKPOINT / OPTIMIZE.
type EdgePropsCache = Arc<RwLock<HashMap<u32, HashMap<(u64, u64), Vec<(u32, u64)>>>>>;
68
69// ── DbStats ───────────────────────────────────────────────────────────────────
70
/// Storage-size snapshot returned by [`GraphDb::stats`] (SPA-171).
///
/// All values are best-effort: filesystem entries that cannot be read are
/// skipped during collection, so treat every field as a lower bound (see
/// [`GraphDb::stats`] for details).
#[derive(Debug, Clone, Default)]
pub struct DbStats {
    /// Total bytes on disk across all database files.
    pub total_bytes: u64,
    /// Bytes consumed by node column files for each label.
    pub bytes_per_label: std::collections::HashMap<String, u64>,
    /// High-water mark (HWM) per label — the highest slot index allocated for
    /// that label, plus one.  Soft-deleted nodes whose slots have not yet been
    /// reclaimed by compaction are still included in this count.  The value
    /// converges to the true live-node count once compaction / GC runs.
    pub node_count_per_label: std::collections::HashMap<String, u64>,
    /// Total edges across all relationship types (delta log + CSR).
    pub edge_count: u64,
    /// Bytes used by all WAL segment files (`segment-*.wal`) under `wal/`.
    /// Other files in the `wal/` directory are not counted.
    pub wal_bytes: u64,
}
88
89use sparrowdb_storage::csr::CsrForward;
90use sparrowdb_storage::edge_store::{EdgeStore, RelTableId};
91use sparrowdb_storage::maintenance::MaintenanceEngine;
92use sparrowdb_storage::node_store::NodeStore;
93use sparrowdb_storage::wal::codec::{WalPayload, WalRecordKind};
94use sparrowdb_storage::wal::writer::WalWriter;
95use std::collections::{HashMap, HashSet};
96use std::path::{Path, PathBuf};
97use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
98use std::sync::{Arc, Mutex, RwLock};
99use tracing::info_span;
100
101// ── Version chain ─────────────────────────────────────────────────────────────
102
/// Key for the version map: (packed NodeId value, column id).
type VersionKey = (u64, u32);

/// One committed version of a property value.
#[derive(Clone)]
struct Version {
    /// The `txn_id` at which this value was committed.
    committed_at: u64,
    /// The committed property value itself.
    value: Value,
}
113
/// In-memory MVCC version store for property updates.
///
/// A `Vec<Version>` per `(NodeId, col_id)` key, sorted ascending by
/// `committed_at`.  Readers binary-search for the latest version ≤ their
/// snapshot.
///
/// NOTE(review): within this file entries are only ever appended — no
/// eviction or truncation is visible, so chains grow with every committed
/// update.  Confirm whether GC happens elsewhere (e.g. at checkpoint).
#[derive(Default)]
struct VersionStore {
    /// `(node_id, col_id) → ascending version chain`.
    map: HashMap<VersionKey, Vec<Version>>,
}
123
124impl VersionStore {
125    /// Record a committed property update.
126    fn insert(&mut self, node_id: NodeId, col_id: u32, committed_at: u64, value: Value) {
127        let versions = self.map.entry((node_id.0, col_id)).or_default();
128        versions.push(Version {
129            committed_at,
130            value,
131        });
132        // Keep sorted — writers are serialized so this is always appended in
133        // order, but sort to be safe.
134        versions.sort_by_key(|v| v.committed_at);
135    }
136
137    /// Return the most-recent value committed at or before `snapshot_txn_id`,
138    /// or `None` if no update exists within the snapshot window.
139    fn get_at(&self, node_id: NodeId, col_id: u32, snapshot_txn_id: u64) -> Option<Value> {
140        let versions = self.map.get(&(node_id.0, col_id))?;
141        // Binary search for the rightmost version with committed_at ≤ snapshot.
142        let idx = versions.partition_point(|v| v.committed_at <= snapshot_txn_id);
143        if idx == 0 {
144            None
145        } else {
146            Some(versions[idx - 1].value.clone())
147        }
148    }
149}
150
151// ── Pending write buffer ──────────────────────────────────────────────────────
152
/// One staged update: the new value plus the before-image read from disk/
/// version-chain at the time the update was staged.
struct StagedUpdate {
    /// Before-image — the value a reader with `snapshot ≤ prev_txn_id` should
    /// see.  Stored at `prev_txn_id` in the version chain on commit so that
    /// pre-existing readers retain correct snapshot access even after the
    /// on-disk column has been overwritten.
    ///
    /// `None` if the key already had an entry in the version chain (so the
    /// chain already carries the correct before-image).
    before_image: Option<(u64, Value)>, // (txn_id_to_store_at, value)
    /// The new value to commit (overwrites the on-disk column at commit time).
    new_value: Value,
    /// Human-readable property key name used for WAL emission.
    /// For columns staged by col_id directly (e.g. from create_node), this is
    /// the synthesized form `"col_{col_id}"`.
    key_name: String,
}
171
/// Staged (uncommitted) property updates for an active `WriteTx`.
///
/// Keyed by `(NodeId, col_id)`; only the latest staged value for each key
/// matters — a later stage for the same key replaces the earlier entry, so
/// the buffer holds at most one [`StagedUpdate`] per property.
#[derive(Default)]
struct WriteBuffer {
    /// `(node_id, col_id) → latest staged update`.
    updates: HashMap<VersionKey, StagedUpdate>,
}
180
181// ── WAL mutation log entries ──────────────────────────────────────────────────
182
/// A mutation staged in a `WriteTx` for WAL emission at commit time.
///
/// These records describe *what* happened for the write-ahead log; the
/// corresponding storage-side application is buffered separately as
/// [`PendingOp`] entries.
enum WalMutation {
    /// A node was created (SPA-123, SPA-127).
    NodeCreate {
        node_id: NodeId,
        label_id: u32,
        /// Property data as `(col_id, value)` pairs used for on-disk writes.
        props: Vec<(u32, Value)>,
        /// Human-readable property names parallel to `props`.
        ///
        /// When property names are available (e.g. from a Cypher literal) they
        /// are recorded here so the WAL can store them for schema introspection
        /// (`CALL db.schema()`).  Empty when names are not known (e.g. low-level
        /// `create_node` calls that only have col_ids).
        prop_names: Vec<String>,
    },
    /// A node was deleted (SPA-125, SPA-127).
    NodeDelete { node_id: NodeId },
    /// An edge was created (SPA-126, SPA-127).
    EdgeCreate {
        edge_id: EdgeId,
        src: NodeId,
        dst: NodeId,
        rel_type: String,
        /// Human-readable (name, value) pairs for WAL observability and schema introspection.
        prop_entries: Vec<(String, Value)>,
    },
    /// A specific directed edge was deleted.
    ///
    /// Identified by `(src, dst, rel_type)` rather than by an [`EdgeId`].
    EdgeDelete {
        src: NodeId,
        dst: NodeId,
        rel_type: String,
    },
}
217
218// ── Pending structural operations (SPA-181 buffering) ────────────────────────
219
/// A buffered structural mutation that will be applied to storage on commit.
///
/// Property updates (`set_node_col`) already go through [`WriteBuffer`] and
/// are flushed by the existing commit machinery.  These `PendingOp` entries
/// cover the remaining mutations that previously wrote directly to disk
/// before commit.
///
/// Note: catalog schema changes (`create_label`, `get_or_create_rel_type_id`)
/// are still written immediately because labels are idempotent metadata.
/// Full schema-change atomicity is deferred to a future phase.
enum PendingOp {
    /// Create a node: write to the node-store at the pre-reserved `slot` and
    /// advance the on-disk HWM.
    NodeCreate {
        label_id: u32,
        /// Slot index reserved for this node at staging time.
        slot: u32,
        /// `(col_id, value)` pairs to write into the label's column files.
        props: Vec<(u32, Value)>,
    },
    /// Delete a node: tombstone col_0 with `u64::MAX`.
    NodeDelete { node_id: NodeId },
    /// Create an edge: append to the edge delta log.
    EdgeCreate {
        src: NodeId,
        dst: NodeId,
        rel_table_id: RelTableId,
        /// Encoded (col_id, value_u64) pairs to persist in edge_props.bin.
        props: Vec<(u32, u64)>,
    },
    /// Delete an edge: rewrite the delta log excluding this record.
    EdgeDelete {
        src: NodeId,
        dst: NodeId,
        rel_table_id: RelTableId,
    },
}
255
256// ── Node version tracker (for MVCC conflict detection, SPA-128) ───────────────
257
/// Tracks the `txn_id` of the last committed write to each node.
///
/// A node absent from the map is reported as version 0 ("never written") —
/// see [`NodeVersions::get`].
#[derive(Default)]
struct NodeVersions {
    /// `node_id.0 → last_committed_txn_id`.
    map: HashMap<u64, u64>,
}
264
265impl NodeVersions {
266    fn set(&mut self, node_id: NodeId, txn_id: u64) {
267        self.map.insert(node_id.0, txn_id);
268    }
269
270    fn get(&self, node_id: NodeId) -> u64 {
271        self.map.get(&node_id.0).copied().unwrap_or(0)
272    }
273}
274
275// ── Shared inner state ────────────────────────────────────────────────────────
276
/// Shared state behind every [`GraphDb`] clone and transaction handle.
struct DbInner {
    /// Filesystem root of the database directory.
    path: PathBuf,
    /// Monotonically increasing; starts at 0 (no writes committed yet).
    /// Incremented atomically after each successful `WriteTx` commit.
    current_txn_id: AtomicU64,
    /// Ensures at most one writer exists at a time.
    ///
    /// `false` = no active writer; `true` = writer active.
    /// Use `compare_exchange(false, true)` for try-lock semantics.
    write_locked: AtomicBool,
    /// MVCC version chains for property updates (snapshot-isolation support).
    versions: RwLock<VersionStore>,
    /// Per-node last-committed txn_id (write-write conflict detection, SPA-128).
    node_versions: RwLock<NodeVersions>,
    /// Optional 32-byte encryption key (SPA-98).  Retained for future use
    /// (e.g., reopening the WAL writer after a full segment rotation triggered
    /// by an external checkpoint).  The key is encoded into the WAL writer at
    /// open time via [`WalWriter::open_encrypted`].
    #[allow(dead_code)]
    encryption_key: Option<[u8; 32]>,
    /// Registered unique constraints as `(label_id, col_id)` pairs.
    unique_constraints: RwLock<HashSet<(u32, u32)>>,
    /// Shared property-index cache (SPA-187).
    ///
    /// Read queries clone from this at `Engine::new_with_cached_index` time and
    /// write their lazily-populated index back after execution.  Write
    /// transactions invalidate it via `clear()` on commit so the next read
    /// re-populates from the updated column files.
    prop_index: RwLock<sparrowdb_storage::property_index::PropertyIndex>,
    /// Shared catalog cache (SPA-188).
    ///
    /// Read queries clone from this to avoid re-parsing the TLV catalog file
    /// on every query.  DDL operations (label/rel-type creation, constraints)
    /// refresh the cache via `invalidate_catalog()`.
    catalog: RwLock<Catalog>,
    /// Cached per-type CSR forward map (SPA-189).
    ///
    /// Populated at `GraphDb::open` time and cloned into each `Engine`
    /// instance.  Invalidated after CHECKPOINT / OPTIMIZE since those compact
    /// the delta log into new CSR base files.
    csr_map: RwLock<HashMap<u32, CsrForward>>,
    /// Cached label row counts (SPA-190).
    ///
    /// Maps `LabelId → node count` (high-water mark).  Built once at open
    /// time and refreshed after any write that creates or deletes nodes.
    /// Passed to `Engine::new_with_all_caches` to avoid per-label HWM disk
    /// reads on every read query.
    label_row_counts: RwLock<HashMap<LabelId, usize>>,
    /// Persistent WAL writer (SPA-210).
    ///
    /// Reused across transaction commits to avoid paying segment-scan and
    /// file-open overhead on every write.  Protected by a `Mutex` because
    /// only one writer exists at a time (the `write_locked` flag enforces
    /// SWMR, so contention is zero in practice).
    wal_writer: Mutex<WalWriter>,
    /// Shared edge-property cache (SPA-261).
    edge_props_cache: EdgePropsCache,
}
334
335// ── Write-lock guard (SPA-181: replaces 'static transmute UB) ────────────────
336
/// RAII guard that holds the exclusive writer lock for a [`DbInner`].
///
/// Acquired by [`GraphDb::begin_write`] via an atomic compare-exchange
/// (`false → true`).  Released on `Drop` by resetting the flag to `false`.
/// Because the guard holds an [`Arc<DbInner>`] the `DbInner` (and the
/// `AtomicBool` it contains) is guaranteed to outlive the guard — no unsafe
/// lifetime extension needed.
struct WriteGuard {
    /// Keeps the owning `DbInner` (and its `write_locked` flag) alive for
    /// the guard's whole lifetime.
    inner: Arc<DbInner>,
}
347
348impl WriteGuard {
349    /// Try to acquire the write lock.  Returns `None` if already held.
350    fn try_acquire(inner: &Arc<DbInner>) -> Option<Self> {
351        inner
352            .write_locked
353            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
354            .ok()
355            .map(|_| WriteGuard {
356                inner: Arc::clone(inner),
357            })
358    }
359}
360
impl Drop for WriteGuard {
    fn drop(&mut self) {
        // Release the lock.  The `Release` store pairs with the `Acquire`
        // compare-exchange in `try_acquire`, so writes made while holding
        // the lock are visible to the next writer that acquires it.
        self.inner.write_locked.store(false, Ordering::Release);
    }
}
367
368// ── GraphDb ───────────────────────────────────────────────────────────────────
369
/// The top-level SparrowDB handle.
///
/// Cheaply cloneable — all clones share the same underlying state.
/// Obtain an instance via [`GraphDb::open`] or the free [`open`] function.
#[derive(Clone)]
pub struct GraphDb {
    /// Shared state; every clone points at the same `DbInner`.
    inner: Arc<DbInner>,
}
378
379impl GraphDb {
380    /// Return the filesystem path of this database.
381    pub fn path(&self) -> &Path {
382        &self.inner.path
383    }
384
385    /// Open (or create) a SparrowDB database at `path`.
386    pub fn open(path: &Path) -> Result<Self> {
387        std::fs::create_dir_all(path)?;
388        let catalog = Catalog::open(path)?;
389        let wal_dir = path.join("wal");
390        let wal_writer = WalWriter::open(&wal_dir)?;
391        let label_row_counts = RwLock::new(build_label_row_counts_from_disk(&catalog, path));
392        Ok(GraphDb {
393            inner: Arc::new(DbInner {
394                path: path.to_path_buf(),
395                current_txn_id: AtomicU64::new(0),
396                write_locked: AtomicBool::new(false),
397                versions: RwLock::new(VersionStore::default()),
398                node_versions: RwLock::new(NodeVersions::default()),
399                encryption_key: None,
400                unique_constraints: RwLock::new(HashSet::new()),
401                prop_index: RwLock::new(sparrowdb_storage::property_index::PropertyIndex::new()),
402                catalog: RwLock::new(catalog),
403                csr_map: RwLock::new(open_csr_map(path)),
404                label_row_counts,
405                wal_writer: Mutex::new(wal_writer),
406                edge_props_cache: Arc::new(RwLock::new(HashMap::new())),
407            }),
408        })
409    }
410
411    /// Open (or create) an encrypted SparrowDB database at `path` (SPA-98).
412    ///
413    /// WAL payloads are encrypted with XChaCha20-Poly1305 using `key`.
414    /// The same 32-byte key must be supplied on every subsequent open of this
415    /// database — opening with a wrong or missing key will cause WAL replay to
416    /// fail with [`Error::EncryptionAuthFailed`].
417    ///
418    /// # Errors
419    /// Returns an error if the directory cannot be created.
420    pub fn open_encrypted(path: &Path, key: [u8; 32]) -> Result<Self> {
421        std::fs::create_dir_all(path)?;
422        let catalog = Catalog::open(path)?;
423        let wal_dir = path.join("wal");
424        let wal_writer = WalWriter::open_encrypted(&wal_dir, key)?;
425        let label_row_counts = RwLock::new(build_label_row_counts_from_disk(&catalog, path));
426        Ok(GraphDb {
427            inner: Arc::new(DbInner {
428                path: path.to_path_buf(),
429                current_txn_id: AtomicU64::new(0),
430                write_locked: AtomicBool::new(false),
431                versions: RwLock::new(VersionStore::default()),
432                node_versions: RwLock::new(NodeVersions::default()),
433                encryption_key: Some(key),
434                unique_constraints: RwLock::new(HashSet::new()),
435                prop_index: RwLock::new(sparrowdb_storage::property_index::PropertyIndex::new()),
436                catalog: RwLock::new(catalog),
437                csr_map: RwLock::new(open_csr_map(path)),
438                label_row_counts,
439                wal_writer: Mutex::new(wal_writer),
440                edge_props_cache: Arc::new(RwLock::new(HashMap::new())),
441            }),
442        })
443    }
444
445    /// Invalidate the shared property-index cache (SPA-187).
446    ///
447    /// Called after every write-transaction commit so the next read query
448    /// re-populates from the updated column files on disk.
449    fn invalidate_prop_index(&self) {
450        self.inner
451            .prop_index
452            .write()
453            .expect("prop_index RwLock poisoned")
454            .clear();
455    }
456
457    /// Refresh the shared catalog cache from disk (SPA-188) and simultaneously
458    /// rebuild the cached label row counts (SPA-190).
459    ///
460    /// Called after DDL operations and any write commit so the next read query
461    /// sees the updated schema and fresh node counts without extra disk reads.
462    fn invalidate_catalog(&self) {
463        if let Ok(fresh) = Catalog::open(&self.inner.path) {
464            // Rebuild row counts while we have the fresh catalog in hand —
465            // no extra I/O to open it again.
466            let new_counts = build_label_row_counts_from_disk(&fresh, &self.inner.path);
467            *self.inner.catalog.write().expect("catalog RwLock poisoned") = fresh;
468            *self
469                .inner
470                .label_row_counts
471                .write()
472                .expect("label_row_counts RwLock poisoned") = new_counts;
473        }
474    }
475
476    /// Build a read-optimised `Engine` using all available shared caches
477    /// (PropertyIndex, CSR map, label row counts) — SPA-190.
478    ///
479    /// All read query paths should use this instead of calling
480    /// `Engine::new_with_cached_index` directly.
481    fn build_read_engine(
482        &self,
483        store: NodeStore,
484        catalog: Catalog,
485        csrs: HashMap<u32, CsrForward>,
486    ) -> Engine {
487        Engine::new_with_all_caches(
488            store,
489            catalog,
490            csrs,
491            &self.inner.path,
492            Some(&self.inner.prop_index),
493            Some(self.cached_label_row_counts()),
494            Some(Arc::clone(&self.inner.edge_props_cache)),
495        )
496    }
497
498    /// Return a clone of the cached label row counts (SPA-190).
499    fn cached_label_row_counts(&self) -> HashMap<LabelId, usize> {
500        self.inner
501            .label_row_counts
502            .read()
503            .expect("label_row_counts RwLock poisoned")
504            .clone()
505    }
506
507    /// Clone the cached catalog for use in a single query (SPA-188).
508    fn catalog_snapshot(&self) -> Catalog {
509        self.inner
510            .catalog
511            .read()
512            .expect("catalog RwLock poisoned")
513            .clone()
514    }
515
516    /// Refresh the cached CSR forward map from disk (SPA-189).
517    ///
518    /// Called after CHECKPOINT / OPTIMIZE since those compact the delta log
519    /// into new CSR base files.  Regular writes do NOT need this because
520    /// the delta log is read separately by the engine.
521    ///
522    /// Only replaces the cache when `open_csr_map` succeeds in loading the
523    /// catalog; a failed reload (e.g. transient I/O error) leaves the
524    /// existing in-memory map intact rather than replacing it with an empty
525    /// map that would cause subsequent queries to miss all checkpointed edges.
526    /// Clear the entire edge-props cache (SPA-261).
527    fn invalidate_edge_props_cache(&self) {
528        self.inner
529            .edge_props_cache
530            .write()
531            .expect("edge_props_cache poisoned")
532            .clear();
533    }
534
535    fn invalidate_csr_map(&self) {
536        if let Ok(fresh) = try_open_csr_map(&self.inner.path) {
537            *self.inner.csr_map.write().expect("csr_map RwLock poisoned") = fresh;
538        }
539    }
540
541    /// Clone the cached CSR map for use in a single query (SPA-189).
542    fn cached_csr_map(&self) -> HashMap<u32, CsrForward> {
543        self.inner
544            .csr_map
545            .read()
546            .expect("csr_map RwLock poisoned")
547            .clone()
548    }
549
    /// Open a read-only snapshot transaction.
    ///
    /// The returned [`ReadTx`] sees all data committed at or before the
    /// current `txn_id` at the moment of this call.
    pub fn begin_read(&self) -> Result<ReadTx> {
        // Pin the snapshot id first, then open the store.  A commit landing
        // between these two steps can leave newer data on disk than the pinned
        // snapshot; per the module docs, property reads resolve through the
        // version chain first, which carries the correct before-images.
        let snapshot_txn_id = self.inner.current_txn_id.load(Ordering::Acquire);
        let store = NodeStore::open(&self.inner.path)?;
        Ok(ReadTx {
            snapshot_txn_id,
            store,
            inner: Arc::clone(&self.inner),
        })
    }
563
    /// Open a write transaction.
    ///
    /// Returns [`Error::WriterBusy`] immediately if another [`WriteTx`] is
    /// already active (try-lock semantics — does not block).
    pub fn begin_write(&self) -> Result<WriteTx> {
        // Atomically acquire the write lock (false → true).
        // No unsafe transmute — WriteGuard holds Arc<DbInner> and resets
        // the AtomicBool on Drop, so the lock flag always outlives the guard.
        // If any of the fallible opens below fail, the guard is dropped and
        // the lock is released automatically.
        let guard = WriteGuard::try_acquire(&self.inner).ok_or(Error::WriterBusy)?;
        let snapshot_txn_id = self.inner.current_txn_id.load(Ordering::Acquire);
        let store = NodeStore::open(&self.inner.path)?;
        // WriteTx opens a fresh catalog from disk because it may create labels
        // or rel types, and needs the latest on-disk state to avoid ID conflicts.
        let catalog = Catalog::open(&self.inner.path)?;
        Ok(WriteTx {
            inner: Arc::clone(&self.inner),
            store,
            catalog,
            write_buf: WriteBuffer::default(),
            wal_mutations: Vec::new(),
            dirty_nodes: HashSet::new(),
            snapshot_txn_id,
            _guard: guard,
            committed: false, // presumably flipped on successful commit — confirm in WriteTx
            fulltext_pending: HashMap::new(),
            pending_ops: Vec::new(),
        })
    }
592
593    /// Run a CHECKPOINT: fold the delta log into CSR base files, emit WAL
594    /// records, and publish the new `wal_checkpoint_lsn` to the metapage.
595    ///
596    /// Acquires the writer lock for the duration — no concurrent writes.
597    ///
598    /// Returns [`Error::WriterBusy`] immediately if a [`WriteTx`] is currently
599    /// active, rather than blocking indefinitely.  This prevents deadlocks on
600    /// single-threaded callers and avoids unbounded waits on multi-threaded ones.
601    pub fn checkpoint(&self) -> Result<()> {
602        let _guard = WriteGuard::try_acquire(&self.inner).ok_or(Error::WriterBusy)?;
603        let catalog = self.catalog_snapshot();
604        let node_store = NodeStore::open(&self.inner.path)?;
605        let (rel_table_ids, n_nodes) =
606            collect_maintenance_params(&catalog, &node_store, &self.inner.path);
607        let engine = MaintenanceEngine::new(&self.inner.path);
608        engine.checkpoint(&rel_table_ids, n_nodes)?;
609        self.invalidate_csr_map();
610        self.invalidate_edge_props_cache();
611        Ok(())
612    }
613
614    /// Run an OPTIMIZE: same as CHECKPOINT but additionally sorts each source
615    /// node's neighbor list by `(dst_node_id)` ascending.
616    ///
617    /// Acquires the writer lock for the duration — no concurrent writes.
618    ///
619    /// Returns [`Error::WriterBusy`] immediately if a [`WriteTx`] is currently
620    /// active, rather than blocking indefinitely.  This prevents deadlocks on
621    /// single-threaded callers and avoids unbounded waits on multi-threaded ones.
622    pub fn optimize(&self) -> Result<()> {
623        let _guard = WriteGuard::try_acquire(&self.inner).ok_or(Error::WriterBusy)?;
624        let catalog = self.catalog_snapshot();
625        let node_store = NodeStore::open(&self.inner.path)?;
626        let (rel_table_ids, n_nodes) =
627            collect_maintenance_params(&catalog, &node_store, &self.inner.path);
628        let engine = MaintenanceEngine::new(&self.inner.path);
629        engine.optimize(&rel_table_ids, n_nodes)?;
630        self.invalidate_csr_map();
631        self.invalidate_edge_props_cache();
632        Ok(())
633    }
634
    /// Return a storage-size snapshot for this database (SPA-171).
    ///
    /// Pure read — no locks acquired, no writes performed.
    ///
    /// # Best-effort semantics
    ///
    /// Filesystem entries that cannot be read (missing directories, permission
    /// errors, partially corrupt files) are silently skipped.  The returned
    /// snapshot may therefore undercount bytes or edges if the database
    /// directory is in an inconsistent state.  Callers that need exact
    /// accounting should treat the reported values as a lower bound.
    pub fn stats(&self) -> Result<DbStats> {
        let db_root = &self.inner.path;
        let catalog = self.catalog_snapshot();
        let node_store = NodeStore::open(db_root)?;
        let mut stats = DbStats::default();

        // Per-label node counts (HWM — includes soft-deleted slots) and the
        // on-disk size of each label's column directory under `nodes/<id>/`.
        for (label_id, label_name) in catalog.list_labels()? {
            let lid = label_id as u32;
            stats.node_count_per_label.insert(
                label_name.clone(),
                node_store.hwm_for_label(lid).unwrap_or(0),
            );
            let mut lb: u64 = 0;
            if let Ok(es) = std::fs::read_dir(db_root.join("nodes").join(lid.to_string())) {
                for e in es.flatten() {
                    if let Ok(m) = e.metadata() {
                        lb += m.len();
                    }
                }
            }
            stats.bytes_per_label.insert(label_name, lb);
        }

        // WAL usage: only `segment-*.wal` files are counted; any other files
        // in the wal/ directory are ignored.
        if let Ok(es) = std::fs::read_dir(db_root.join("wal")) {
            for e in es.flatten() {
                let n = e.file_name();
                let ns = n.to_string_lossy();
                if ns.starts_with("segment-") && ns.ends_with(".wal") {
                    if let Ok(m) = e.metadata() {
                        stats.wal_bytes += m.len();
                    }
                }
            }
        }

        // Edge counts: fixed-size delta-log records plus the checkpointed CSR
        // base file for each rel-table directory under `edges/`.
        const DR: u64 = 20; // DeltaRecord: src(8) + dst(8) + rel_id(4) = 20 bytes
        if let Ok(ts) = std::fs::read_dir(db_root.join("edges")) {
            for t in ts.flatten() {
                // Skip stray files at the edges/ top level.
                if !t.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
                    continue;
                }
                let rd = t.path();
                if let Ok(m) = std::fs::metadata(rd.join("delta.log")) {
                    stats.edge_count += m.len().checked_div(DR).unwrap_or(0);
                }
                let fp = rd.join("base.fwd.csr");
                if fp.exists() {
                    if let Ok(b) = std::fs::read(&fp) {
                        if let Ok(csr) = CsrForward::decode(&b) {
                            stats.edge_count += csr.n_edges();
                        }
                    }
                }
            }
        }

        // Grand total across the whole directory tree (includes catalog, WAL,
        // node columns, edge files — everything under the db root).
        stats.total_bytes = dir_size_bytes(db_root);
        Ok(stats)
    }
705
    /// Execute a Cypher query.
    ///
    /// Read-only statements (`MATCH … RETURN`) are executed against a
    /// point-in-time snapshot.  Mutation statements (`MERGE`, `MATCH … SET`,
    /// `MATCH … DELETE`) open a write transaction internally and commit on
    /// success.  `CREATE` statements auto-register labels and write nodes (SPA-156).
    /// `CHECKPOINT` and `OPTIMIZE` delegate to the maintenance engine (SPA-189).
    pub fn execute(&self, cypher: &str) -> Result<QueryResult> {
        use sparrowdb_cypher::ast::Statement;
        use sparrowdb_cypher::{bind, parse};

        // Parse, then bind names against the cached catalog snapshot (SPA-188).
        let stmt = parse(cypher)?;
        let catalog_snap = self.catalog_snapshot();
        let bound = bind(stmt, &catalog_snap)?;

        // SPA-189: wire CHECKPOINT and OPTIMIZE to their real implementations
        // before entering the mutation / read-only dispatch below.
        match &bound.inner {
            Statement::Checkpoint => {
                self.checkpoint()?;
                return Ok(QueryResult::empty(vec![]));
            }
            Statement::Optimize => {
                self.optimize()?;
                return Ok(QueryResult::empty(vec![]));
            }
            Statement::CreateConstraint { label, property } => {
                return self.register_unique_constraint(label, property);
            }
            _ => {}
        }

        if Engine::is_mutation(&bound.inner) {
            // Mutation path: each handler opens its own internal WriteTx.
            match bound.inner {
                Statement::Merge(ref m) => self.execute_merge(m),
                Statement::MatchMergeRel(ref mm) => self.execute_match_merge_rel(mm),
                Statement::MatchMutate(ref mm) => self.execute_match_mutate(mm),
                Statement::MatchCreate(ref mc) => self.execute_match_create(mc),
                // Standalone CREATE with edges — must go through WriteTx so
                // create_edge can register the rel type and write the WAL.
                Statement::Create(ref c) => self.execute_create_standalone(c),
                // `is_mutation` returned true, so no other variant reaches here.
                _ => unreachable!(),
            }
        } else {
            // Read-only path: span-instrumented engine build + execution.
            let _span = info_span!("sparrowdb.query").entered();

            let mut engine = {
                let _open_span = info_span!("sparrowdb.open_engine").entered();
                let csrs = self.cached_csr_map();
                self.build_read_engine(NodeStore::open(&self.inner.path)?, catalog_snap, csrs)
            };

            let result = {
                let _exec_span = info_span!("sparrowdb.execute").entered();
                engine.execute_statement(bound.inner)?
            };

            // Write lazily-loaded columns back to the shared cache.
            engine.write_back_prop_index(&self.inner.prop_index);

            tracing::debug!(rows = result.rows.len(), "query complete");
            Ok(result)
        }
    }
770
771    /// Execute a Cypher query with a per-query timeout (SPA-254).
772    ///
773    /// Identical to [`execute`](Self::execute) but sets a deadline of
774    /// `Instant::now() + timeout` before dispatching to the engine.  The
775    /// engine checks the deadline at the top of every hot scan / traversal
776    /// loop.  If the deadline passes before the query completes,
777    /// [`Error::QueryTimeout`] is returned.
778    ///
779    /// `execute_with_timeout` only supports read-only (`MATCH … RETURN`)
780    /// statements today; mutation statements are forwarded to the existing
781    /// write-transaction code path **without** a timeout (they typically
782    /// complete in O(1) writes and are not the source of runaway queries).
783    ///
784    /// # Example
785    /// ```no_run
786    /// use sparrowdb::GraphDb;
787    /// use std::time::Duration;
788    ///
789    /// let db = GraphDb::open(std::path::Path::new("/tmp/g.sparrow")).unwrap();
790    /// let result = db.execute_with_timeout(
791    ///     "MATCH (n:Person) RETURN n.name",
792    ///     Duration::from_secs(5),
793    /// );
794    /// match result {
795    ///     Ok(qr) => println!("{} rows", qr.rows.len()),
796    ///     Err(sparrowdb::Error::QueryTimeout) => eprintln!("query timed out"),
797    ///     Err(e) => eprintln!("error: {e}"),
798    /// }
799    /// ```
800    pub fn execute_with_timeout(
801        &self,
802        cypher: &str,
803        timeout: std::time::Duration,
804    ) -> Result<QueryResult> {
805        use sparrowdb_cypher::ast::Statement;
806        use sparrowdb_cypher::{bind, parse};
807
808        let stmt = parse(cypher)?;
809        let catalog_snap = self.catalog_snapshot();
810        let bound = bind(stmt, &catalog_snap)?;
811
812        // Delegate CHECKPOINT/OPTIMIZE immediately — no timeout needed.
813        match &bound.inner {
814            Statement::Checkpoint => {
815                self.checkpoint()?;
816                return Ok(QueryResult::empty(vec![]));
817            }
818            Statement::Optimize => {
819                self.optimize()?;
820                return Ok(QueryResult::empty(vec![]));
821            }
822            Statement::CreateConstraint { label, property } => {
823                return self.register_unique_constraint(label, property);
824            }
825            _ => {}
826        }
827
828        if Engine::is_mutation(&bound.inner) {
829            // Mutation statements go through the standard write-transaction
830            // path; they are not the source of runaway queries.
831            return match bound.inner {
832                Statement::Merge(ref m) => self.execute_merge(m),
833                Statement::MatchMutate(ref mm) => self.execute_match_mutate(mm),
834                Statement::MatchCreate(ref mc) => self.execute_match_create(mc),
835                Statement::Create(ref c) => self.execute_create_standalone(c),
836                _ => unreachable!(),
837            };
838        }
839
840        let _span = info_span!("sparrowdb.query_with_timeout").entered();
841        let deadline = std::time::Instant::now() + timeout;
842
843        let mut engine = {
844            let _open_span = info_span!("sparrowdb.open_engine").entered();
845            let csrs = self.cached_csr_map();
846            self.build_read_engine(NodeStore::open(&self.inner.path)?, catalog_snap, csrs)
847                .with_deadline(deadline)
848        };
849
850        let result = {
851            let _exec_span = info_span!("sparrowdb.execute").entered();
852            engine.execute_statement(bound.inner)?
853        };
854
855        engine.write_back_prop_index(&self.inner.prop_index);
856
857        tracing::debug!(rows = result.rows.len(), "query_with_timeout complete");
858        Ok(result)
859    }
860
861    /// Internal: execute a standalone `CREATE (a)-[:R]->(b)` statement.
862    ///
863    /// Creates all declared nodes, then for each edge in the pattern looks up
864    /// the freshly-created node IDs by variable name and calls
865    /// `WriteTx::create_edge`.  This is needed when the CREATE clause contains
866    /// relationship patterns, because edge creation must be routed through a
867    /// write transaction so the relationship type is registered in the catalog
868    /// and the edge is appended to the WAL (SPA-182).
869    fn register_unique_constraint(&self, label: &str, property: &str) -> Result<QueryResult> {
870        // SPA-234: auto-create label if absent so the constraint is registered
871        // even before any nodes of this label exist.  Subsequent CREATE
872        // statements look up the label_id from the persisted catalog and compare
873        // against the same unique_constraints set.
874        //
875        // Acquire the writer lock so that catalog mutations here cannot race
876        // with an active WriteTx that also holds an open Catalog handle.
877        // Both paths derive next_label_id from their own in-memory counter, so
878        // concurrent catalog writes without this guard can assign duplicate IDs.
879        let _guard = WriteGuard::try_acquire(&self.inner).ok_or(Error::WriterBusy)?;
880        let mut catalog = sparrowdb_catalog::catalog::Catalog::open(&self.inner.path)?;
881        let label_id: u32 = match catalog.get_label(label)? {
882            Some(id) => id as u32,
883            None => {
884                let id = catalog.create_label(label)? as u32;
885                self.invalidate_catalog();
886                id
887            }
888        };
889        let col_id = sparrowdb_common::col_id_of(property);
890        self.inner
891            .unique_constraints
892            .write()
893            .unwrap()
894            .insert((label_id, col_id));
895        Ok(QueryResult::empty(vec![]))
896    }
897
    /// Internal: execute a standalone `CREATE (a)-[:R]->(b)` statement.
    ///
    /// Creates all declared nodes, then for each edge in the pattern looks up
    /// the freshly-created node IDs by variable name and calls
    /// `WriteTx::create_edge`.  Edge creation is routed through a write
    /// transaction so the relationship type is registered in the catalog and
    /// the edge is appended to the WAL (SPA-182).
    fn execute_create_standalone(
        &self,
        create: &sparrowdb_cypher::ast::CreateStatement,
    ) -> Result<QueryResult> {
        // Pre-flight: verify that every variable referenced by an edge is
        // declared as a named node in this CREATE clause.  Doing this before
        // opening the write transaction ensures that a malformed CREATE fails
        // cleanly without leaving orphaned nodes on disk.
        let declared_vars: HashSet<&str> = create
            .nodes
            .iter()
            .filter(|n| !n.var.is_empty())
            .map(|n| n.var.as_str())
            .collect();

        for (left_var, _, right_var) in &create.edges {
            if !left_var.is_empty() && !declared_vars.contains(left_var.as_str()) {
                return Err(sparrowdb_common::Error::InvalidArgument(format!(
                    "CREATE edge references undeclared variable '{left_var}'"
                )));
            }
            if !right_var.is_empty() && !declared_vars.contains(right_var.as_str()) {
                return Err(sparrowdb_common::Error::InvalidArgument(format!(
                    "CREATE edge references undeclared variable '{right_var}'"
                )));
            }
        }

        // SPA-208: reject reserved __SO_ label prefix on nodes.
        for node in &create.nodes {
            if let Some(label) = node.labels.first() {
                if is_reserved_label(label) {
                    return Err(reserved_label_error(label));
                }
            }
        }

        // SPA-208: reject reserved __SO_ rel-type prefix on edges.
        for (_, rel_pat, _) in &create.edges {
            if is_reserved_label(&rel_pat.rel_type) {
                return Err(reserved_label_error(&rel_pat.rel_type));
            }
        }

        let mut tx = self.begin_write()?;

        // Map variable name → NodeId for all newly created nodes.
        let mut var_to_node: HashMap<String, NodeId> = HashMap::new();

        for node in &create.nodes {
            // First label wins; an unlabeled node gets the empty-string label.
            // Labels are auto-registered in the catalog on first use.
            let label = node.labels.first().cloned().unwrap_or_default();
            let label_id: u32 = match tx.catalog.get_label(&label)? {
                Some(id) => id as u32,
                None => tx.catalog.create_label(&label)? as u32,
            };

            // Decode property values.  Only literal values are accepted in a
            // standalone CREATE: null, $params and computed expressions are
            // rejected up front with a descriptive error.
            let named_props: Vec<(String, Value)> = node
                .props
                .iter()
                .map(|entry| {
                    let val = match &entry.value {
                        sparrowdb_cypher::ast::Expr::Literal(
                            sparrowdb_cypher::ast::Literal::Null,
                        ) => Err(sparrowdb_common::Error::InvalidArgument(format!(
                            "CREATE property '{}' is null; use a concrete value",
                            entry.key
                        ))),
                        sparrowdb_cypher::ast::Expr::Literal(
                            sparrowdb_cypher::ast::Literal::Param(p),
                        ) => Err(sparrowdb_common::Error::InvalidArgument(format!(
                            "CREATE property '{}' references parameter ${p}; runtime parameters are not yet supported in standalone CREATE",
                            entry.key
                        ))),
                        sparrowdb_cypher::ast::Expr::Literal(lit) => {
                            Ok(literal_to_value(lit))
                        }
                        _ => Err(sparrowdb_common::Error::InvalidArgument(format!(
                            "CREATE property '{}' must be a literal value (int, float, bool, or string)",
                            entry.key
                        ))),
                    }?;
                    Ok((entry.key.clone(), val))
                })
                .collect::<Result<Vec<_>>>()?;

            // SPA-234: enforce UNIQUE constraints before writing.
            //
            // We compare decoded Values rather than raw u64 heap pointers.
            // Strings longer than 7 bytes are stored as store-specific heap
            // pointers: the same string written by two different NodeStore
            // handles produces two different raw u64 values.  Decoding each
            // stored raw u64 back to Value and comparing is correct for all
            // value types (inline integers/short strings AND heap-overflow strings).
            //
            // We check both the committed on-disk values (via check_store) AND
            // nodes already buffered in tx.pending_ops (but not yet committed).
            // Without the pending-ops check, two nodes with the same constrained
            // value in one CREATE statement would both pass the on-disk check and
            // then be committed together, violating the constraint.
            {
                let constraints = self.inner.unique_constraints.read().unwrap();
                if !constraints.is_empty() {
                    let check_store = NodeStore::open(&self.inner.path)?;
                    for (prop_name, val) in &named_props {
                        let col_id = sparrowdb_common::col_id_of(prop_name);
                        if constraints.contains(&(label_id, col_id)) {
                            // Check committed on-disk values.
                            // Propagate I/O errors — silencing them would disable
                            // the constraint check on read failure.
                            let existing_raws = check_store.read_col_all(label_id, col_id)?;
                            let conflict_on_disk = existing_raws.iter().any(|&raw| {
                                // 0 and u64::MAX appear to be sentinel slots,
                                // never real values — TODO confirm against the
                                // NodeStore encoding.
                                if raw == 0 || raw == u64::MAX {
                                    return false;
                                }
                                check_store.decode_raw_value(raw) == *val
                            });
                            // Check nodes buffered earlier in this same statement.
                            let conflict_in_tx = tx.pending_ops.iter().any(|op| match op {
                                PendingOp::NodeCreate {
                                    label_id: pending_label_id,
                                    props,
                                    ..
                                } => {
                                    *pending_label_id == label_id
                                        && props.iter().any(|(existing_col_id, existing_val)| {
                                            *existing_col_id == col_id && *existing_val == *val
                                        })
                                }
                                _ => false,
                            });
                            if conflict_on_disk || conflict_in_tx {
                                return Err(sparrowdb_common::Error::InvalidArgument(format!(
                                    "UNIQUE constraint violation: label \"{label}\" already has a node with {prop_name} = {:?}", val
                                )));
                            }
                        }
                    }
                }
            }

            let node_id = tx.create_node_named(label_id, &named_props)?;

            // Record the binding so edge patterns can resolve (src_var, dst_var).
            if !node.var.is_empty() {
                var_to_node.insert(node.var.clone(), node_id);
            }
        }

        // Create edges between the freshly-created nodes.
        for (left_var, rel_pat, right_var) in &create.edges {
            let src = var_to_node.get(left_var).copied().ok_or_else(|| {
                sparrowdb_common::Error::InvalidArgument(format!(
                    "CREATE edge references unresolved variable '{left_var}'"
                ))
            })?;
            let dst = var_to_node.get(right_var).copied().ok_or_else(|| {
                sparrowdb_common::Error::InvalidArgument(format!(
                    "CREATE edge references unresolved variable '{right_var}'"
                ))
            })?;
            // Convert rel_pat.props (AST PropEntries) to HashMap<String, Value>.
            let edge_props: HashMap<String, Value> = rel_pat
                .props
                .iter()
                .map(|pe| {
                    let val = match &pe.value {
                        sparrowdb_cypher::ast::Expr::Literal(lit) => literal_to_value(lit),
                        _ => {
                            return Err(sparrowdb_common::Error::InvalidArgument(format!(
                                "CREATE edge property '{}' must be a literal value",
                                pe.key
                            )))
                        }
                    };
                    Ok((pe.key.clone(), val))
                })
                .collect::<Result<HashMap<_, _>>>()?;
            tx.create_edge(src, dst, &rel_pat.rel_type, edge_props)?;
        }

        // Commit atomically, then invalidate caches that may now be stale.
        tx.commit()?;
        self.invalidate_prop_index();
        self.invalidate_catalog();
        Ok(QueryResult::empty(vec![]))
    }
1083
1084    /// Execute a Cypher query with runtime parameter bindings.
1085    ///
1086    /// This is the parameterised variant of [`execute`].  Parameters are
1087    /// supplied as a `HashMap<String, sparrowdb_execution::Value>` and are
1088    /// available inside the query as `$name` expressions.
1089    ///
1090    /// # Example
1091    ///
1092    /// ```no_run
1093    /// use sparrowdb::GraphDb;
1094    /// use sparrowdb_execution::Value;
1095    /// use std::collections::HashMap;
1096    ///
1097    /// let db = GraphDb::open(std::path::Path::new("/tmp/my.sparrow")).unwrap();
1098    /// let mut params = HashMap::new();
1099    /// params.insert("names".into(), Value::List(vec![
1100    ///     Value::String("Alice".into()),
1101    ///     Value::String("Bob".into()),
1102    /// ]));
1103    /// let result = db.execute_with_params("UNWIND $names AS name RETURN name", params).unwrap();
1104    /// ```
1105    pub fn execute_with_params(
1106        &self,
1107        cypher: &str,
1108        params: HashMap<String, sparrowdb_execution::Value>,
1109    ) -> Result<QueryResult> {
1110        use sparrowdb_cypher::{bind, parse};
1111
1112        let stmt = parse(cypher)?;
1113        let catalog_snap = self.catalog_snapshot();
1114        let bound = bind(stmt, &catalog_snap)?;
1115
1116        use sparrowdb_cypher::ast::Statement;
1117        if let Statement::CreateConstraint { label, property } = &bound.inner {
1118            return self.register_unique_constraint(label, property);
1119        }
1120        if Engine::is_mutation(&bound.inner) {
1121            // Route mutations through params-aware helpers (SPA-218).
1122            use sparrowdb_cypher::ast::Statement as Stmt;
1123            return match bound.inner {
1124                Stmt::Merge(ref m) => self.execute_merge_with_params(m, &params),
1125                Stmt::MatchMutate(ref mm) => self.execute_match_mutate_with_params(mm, &params),
1126                _ => Err(Error::InvalidArgument(
1127                    "execute_with_params: parameterized MATCH...CREATE and standalone CREATE \
1128                     are not yet supported; use MERGE or MATCH...SET with $params"
1129                        .into(),
1130                )),
1131            };
1132        }
1133
1134        let _span = info_span!("sparrowdb.query_with_params").entered();
1135
1136        let mut engine = {
1137            let _open_span = info_span!("sparrowdb.open_engine").entered();
1138            let csrs = self.cached_csr_map();
1139            self.build_read_engine(NodeStore::open(&self.inner.path)?, catalog_snap, csrs)
1140                .with_params(params)
1141        };
1142
1143        let result = {
1144            let _exec_span = info_span!("sparrowdb.execute").entered();
1145            engine.execute_statement(bound.inner)?
1146        };
1147
1148        engine.write_back_prop_index(&self.inner.prop_index);
1149
1150        tracing::debug!(rows = result.rows.len(), "query_with_params complete");
1151        Ok(result)
1152    }
1153
1154    /// Internal: execute a MERGE statement by opening a write transaction.
1155    fn execute_merge(&self, m: &sparrowdb_cypher::ast::MergeStatement) -> Result<QueryResult> {
1156        let props: HashMap<String, Value> = m
1157            .props
1158            .iter()
1159            .map(|pe| (pe.key.clone(), expr_to_value(&pe.value)))
1160            .collect();
1161        let mut tx = self.begin_write()?;
1162        let node_id = tx.merge_node(&m.label, props.clone())?;
1163        tx.commit()?;
1164        self.invalidate_prop_index();
1165        self.invalidate_catalog();
1166
1167        // If the statement has a RETURN clause, project the merged node's properties.
1168        if let Some(ref ret) = m.return_clause {
1169            use sparrowdb_cypher::ast::Expr;
1170            // Use the execution-layer Value type for building QueryResult rows.
1171            type ExecValue = sparrowdb_execution::Value;
1172
1173            let var = if m.var.is_empty() {
1174                "n"
1175            } else {
1176                m.var.as_str()
1177            };
1178
1179            // Collect all property names referenced in the RETURN clause so we
1180            // can look them up by col_id from the actual on-disk node state.
1181            // This is correct even when the node already existed with extra
1182            // properties not present in the merge pattern (SPA-215 / CodeAnt bug).
1183            let return_props: Vec<String> = ret
1184                .items
1185                .iter()
1186                .filter_map(|item| match &item.expr {
1187                    Expr::PropAccess { var: v, prop } if v.as_str() == var => Some(prop.clone()),
1188                    _ => None,
1189                })
1190                .collect();
1191
1192            // Derive col_ids for every property name referenced in RETURN.
1193            let return_col_ids: Vec<u32> =
1194                return_props.iter().map(|name| fnv1a_col_id(name)).collect();
1195
1196            // Read the actual on-disk node state via NodeStore::get_node, which
1197            // uses Value::from_u64 (type-tag-aware decoding) to correctly
1198            // reconstruct Bytes/String values rather than misinterpreting them as
1199            // raw integers.  We open a fresh NodeStore after the write committed
1200            // so we see the fully-merged node state.
1201            let store = NodeStore::open(&self.inner.path)?;
1202            let stored = store.get_node(node_id, &return_col_ids).unwrap_or_default();
1203
1204            // Build an eval map: "{var}.{prop_name}" -> ExecValue from the
1205            // actual stored values rather than the input pattern props.
1206            let mut row_vals: HashMap<String, ExecValue> = HashMap::new();
1207            for (prop_name, col_id) in return_props.iter().zip(return_col_ids.iter()) {
1208                if let Some((_, val)) = stored.iter().find(|(c, _)| c == col_id) {
1209                    let exec_val = storage_value_to_exec(val);
1210                    row_vals.insert(format!("{var}.{prop_name}"), exec_val);
1211                }
1212            }
1213
1214            // Derive column names from the RETURN items.
1215            let columns: Vec<String> = ret
1216                .items
1217                .iter()
1218                .map(|item| {
1219                    item.alias.clone().unwrap_or_else(|| match &item.expr {
1220                        Expr::PropAccess { var: v, prop } => format!("{v}.{prop}"),
1221                        Expr::Var(v) => v.clone(),
1222                        _ => "?".to_string(),
1223                    })
1224                })
1225                .collect();
1226
1227            // Evaluate each RETURN expression using the actual-state prop map.
1228            let row: Vec<ExecValue> = ret
1229                .items
1230                .iter()
1231                .map(|item| eval_expr_merge(&item.expr, &row_vals))
1232                .collect();
1233
1234            return Ok(QueryResult {
1235                columns,
1236                rows: vec![row],
1237            });
1238        }
1239
1240        Ok(QueryResult::empty(vec![]))
1241    }
1242
1243    /// Params-aware MERGE with $param support (SPA-218).
1244    fn execute_merge_with_params(
1245        &self,
1246        m: &sparrowdb_cypher::ast::MergeStatement,
1247        params: &HashMap<String, sparrowdb_execution::Value>,
1248    ) -> Result<QueryResult> {
1249        let props: HashMap<String, Value> = m
1250            .props
1251            .iter()
1252            .map(|pe| {
1253                let val = expr_to_value_with_params(&pe.value, params)?;
1254                Ok((pe.key.clone(), val))
1255            })
1256            .collect::<Result<HashMap<_, _>>>()?;
1257        let mut tx = self.begin_write()?;
1258        let node_id = tx.merge_node(&m.label, props.clone())?;
1259        tx.commit()?;
1260        self.invalidate_prop_index();
1261        self.invalidate_catalog();
1262        if let Some(ref ret) = m.return_clause {
1263            use sparrowdb_cypher::ast::Expr;
1264            type ExecValue = sparrowdb_execution::Value;
1265            let var = if m.var.is_empty() {
1266                "n"
1267            } else {
1268                m.var.as_str()
1269            };
1270            let return_props: Vec<String> = ret
1271                .items
1272                .iter()
1273                .filter_map(|item| match &item.expr {
1274                    Expr::PropAccess { var: v, prop } if v.as_str() == var => Some(prop.clone()),
1275                    _ => None,
1276                })
1277                .collect();
1278            let return_col_ids: Vec<u32> =
1279                return_props.iter().map(|name| fnv1a_col_id(name)).collect();
1280            let store = NodeStore::open(&self.inner.path)?;
1281            let stored = store.get_node(node_id, &return_col_ids).unwrap_or_default();
1282            let mut row_vals: HashMap<String, ExecValue> = HashMap::new();
1283            for (prop_name, col_id) in return_props.iter().zip(return_col_ids.iter()) {
1284                if let Some((_, val)) = stored.iter().find(|(c, _)| c == col_id) {
1285                    row_vals.insert(format!("{var}.{prop_name}"), storage_value_to_exec(val));
1286                }
1287            }
1288            let columns: Vec<String> = ret
1289                .items
1290                .iter()
1291                .map(|item| {
1292                    item.alias.clone().unwrap_or_else(|| match &item.expr {
1293                        Expr::PropAccess { var: v, prop } => format!("{v}.{prop}"),
1294                        Expr::Var(v) => v.clone(),
1295                        _ => "?".to_string(),
1296                    })
1297                })
1298                .collect();
1299            let row: Vec<ExecValue> = ret
1300                .items
1301                .iter()
1302                .map(|item| eval_expr_merge(&item.expr, &row_vals))
1303                .collect();
1304            return Ok(QueryResult {
1305                columns,
1306                rows: vec![row],
1307            });
1308        }
1309        Ok(QueryResult::empty(vec![]))
1310    }
1311
1312    /// Params-aware MATCH...SET with $param support (SPA-218).
1313    fn execute_match_mutate_with_params(
1314        &self,
1315        mm: &sparrowdb_cypher::ast::MatchMutateStatement,
1316        params: &HashMap<String, sparrowdb_execution::Value>,
1317    ) -> Result<QueryResult> {
1318        let mut tx = self.begin_write()?;
1319        let csrs = self.cached_csr_map();
1320        let engine = Engine::new(
1321            NodeStore::open(&self.inner.path)?,
1322            self.catalog_snapshot(),
1323            csrs,
1324            &self.inner.path,
1325        );
1326
1327        // SPA-219: `MATCH (a)-[r:REL]->(b) DELETE r` — edge delete path.
1328        if is_edge_delete_mutation(mm) {
1329            let edges = engine.scan_match_mutate_edges(mm)?;
1330            for (src, dst, rel_type) in edges {
1331                tx.delete_edge(src, dst, &rel_type)?;
1332            }
1333            tx.commit()?;
1334            self.invalidate_csr_map();
1335            self.invalidate_prop_index();
1336            self.invalidate_catalog();
1337            return Ok(QueryResult::empty(vec![]));
1338        }
1339
1340        let matching_ids = engine.scan_match_mutate(mm)?;
1341        if matching_ids.is_empty() {
1342            return Ok(QueryResult::empty(vec![]));
1343        }
1344        match &mm.mutation {
1345            sparrowdb_cypher::ast::Mutation::Set { prop, value, .. } => {
1346                let sv = expr_to_value_with_params(value, params)?;
1347                for node_id in matching_ids {
1348                    tx.set_property(node_id, prop, sv.clone())?;
1349                }
1350            }
1351            sparrowdb_cypher::ast::Mutation::Delete { .. } => {
1352                for node_id in matching_ids {
1353                    tx.delete_node(node_id)?;
1354                }
1355            }
1356        }
1357        tx.commit()?;
1358        self.invalidate_prop_index();
1359        self.invalidate_catalog();
1360        Ok(QueryResult::empty(vec![]))
1361    }
1362
1363    /// Internal: execute a MATCH … SET / DELETE by scanning then writing.
1364    ///
1365    /// The write lock is acquired **before** the scan so that no concurrent
1366    /// writer can commit between the scan and the mutation, preventing stale
1367    /// matches from being mutated.
1368    fn execute_match_mutate(
1369        &self,
1370        mm: &sparrowdb_cypher::ast::MatchMutateStatement,
1371    ) -> Result<QueryResult> {
1372        // Acquire the write lock first.  From this point on no other writer
1373        // can commit until we call tx.commit() or drop tx.
1374        let mut tx = self.begin_write()?;
1375
1376        // Build an Engine that reads from the same on-disk snapshot the write
1377        // transaction was opened against.  Because we hold the write lock,
1378        // the data on disk cannot change between this scan and the mutations
1379        // below.
1380        let csrs = self.cached_csr_map();
1381        let engine = Engine::new(
1382            NodeStore::open(&self.inner.path)?,
1383            self.catalog_snapshot(),
1384            csrs,
1385            &self.inner.path,
1386        );
1387
1388        // SPA-219: `MATCH (a)-[r:REL]->(b) DELETE r` — edge delete path.
1389        if is_edge_delete_mutation(mm) {
1390            let edges = engine.scan_match_mutate_edges(mm)?;
1391            for (src, dst, rel_type) in edges {
1392                tx.delete_edge(src, dst, &rel_type)?;
1393            }
1394            tx.commit()?;
1395            self.invalidate_csr_map();
1396            self.invalidate_prop_index();
1397            self.invalidate_catalog();
1398            return Ok(QueryResult::empty(vec![]));
1399        }
1400
1401        // Collect matching node ids via the engine's scan (lock already held).
1402        let matching_ids = engine.scan_match_mutate(mm)?;
1403
1404        if matching_ids.is_empty() {
1405            return Ok(QueryResult::empty(vec![]));
1406        }
1407
1408        match &mm.mutation {
1409            sparrowdb_cypher::ast::Mutation::Set { prop, value, .. } => {
1410                let sv = expr_to_value(value);
1411                for node_id in matching_ids {
1412                    tx.set_property(node_id, prop, sv.clone())?;
1413                }
1414            }
1415            sparrowdb_cypher::ast::Mutation::Delete { .. } => {
1416                for node_id in matching_ids {
1417                    tx.delete_node(node_id)?;
1418                }
1419            }
1420        }
1421
1422        tx.commit()?;
1423        self.invalidate_prop_index();
1424        self.invalidate_catalog();
1425        Ok(QueryResult::empty(vec![]))
1426    }
1427
1428    /// Internal: execute a `MATCH … CREATE (a)-[:R]->(b)` statement.
1429    ///
1430    /// 1. Acquires the write lock (preventing concurrent writes).
1431    /// 2. Opens a read-capable Engine to execute the MATCH clause, producing
1432    ///    one correlated binding row per match result (`var → NodeId`).
1433    /// 3. For each result row, calls `WriteTx::create_edge` using the exact
1434    ///    node IDs bound by that row — never re-scans or builds a Cartesian
1435    ///    product across all node candidates (SPA-183).
1436    /// 4. Commits the write transaction.
1437    ///
1438    /// If the MATCH finds no rows the CREATE is a no-op (no edges created,
1439    /// no error — SPA-168).
1440    fn execute_match_create(
1441        &self,
1442        mc: &sparrowdb_cypher::ast::MatchCreateStatement,
1443    ) -> Result<QueryResult> {
1444        // Acquire the write lock first so no concurrent writer can commit
1445        // between the scan and the edge creation.
1446        let mut tx = self.begin_write()?;
1447
1448        // Build an Engine for the read-scan phase.
1449        //
1450        // Use build_read_engine so that the lazy property index and cached
1451        // label row counts are reused.  MATCH…CREATE only creates edges — it
1452        // never changes node property columns — so the cached data remains
1453        // valid across calls.
1454        let csrs = self.cached_csr_map();
1455        let engine = self.build_read_engine(
1456            NodeStore::open(&self.inner.path)?,
1457            self.catalog_snapshot(),
1458            csrs,
1459        );
1460
1461        // SPA-208: reject reserved __SO_ rel-type prefix on edges in the CREATE clause.
1462        // Check before the Unimplemented guard so the reserved-name error takes priority.
1463        for (_, rel_pat, _) in &mc.create.edges {
1464            if is_reserved_label(&rel_pat.rel_type) {
1465                return Err(reserved_label_error(&rel_pat.rel_type));
1466            }
1467        }
1468
1469        // MATCH…CREATE only supports edge creation from matched bindings.
1470        // Standalone node creation (CREATE clause with no edges) is not yet
1471        // implemented.  Note: the parser includes edge-endpoint nodes in
1472        // mc.create.nodes even for pure edge patterns, so we guard on
1473        // mc.create.edges being empty rather than mc.create.nodes being
1474        // non-empty (SPA-183 had the wrong guard).
1475        if mc.create.edges.is_empty() {
1476            return Err(Error::Unimplemented);
1477        }
1478
1479        // Execute the MATCH clause to get correlated binding rows.
1480        // Each row is a HashMap<variable_name, NodeId> representing one
1481        // matched combination.  This replaces the old `scan_match_create`
1482        // approach which collected candidates per variable independently and
1483        // then took a full Cartesian product — causing N² edge creation when
1484        // multiple nodes of the same label existed (SPA-183).
1485
1486        let matched_rows = engine.scan_match_create_rows(mc)?;
1487
1488        // Write any newly-loaded index columns back to the shared cache so
1489        // subsequent MATCH…CREATE calls can reuse them without re-reading disk.
1490        // Node property columns are NOT changed by this operation (only edges
1491        // are created), so the cache remains valid.
1492        engine.write_back_prop_index(&self.inner.prop_index);
1493
1494        if matched_rows.is_empty() {
1495            return Ok(QueryResult::empty(vec![]));
1496        }
1497
1498        // For each matched row, create every edge declared in the CREATE clause
1499        // using the NodeIds bound in that specific row.
1500        for row in &matched_rows {
1501            for (left_var, rel_pat, right_var) in &mc.create.edges {
1502                let src = row.get(left_var).copied().ok_or_else(|| {
1503                    Error::InvalidArgument(format!(
1504                        "CREATE references unbound variable: {left_var}"
1505                    ))
1506                })?;
1507                let dst = row.get(right_var).copied().ok_or_else(|| {
1508                    Error::InvalidArgument(format!(
1509                        "CREATE references unbound variable: {right_var}"
1510                    ))
1511                })?;
1512                let edge_props: HashMap<String, Value> = rel_pat
1513                    .props
1514                    .iter()
1515                    .map(|pe| {
1516                        let val = match &pe.value {
1517                            sparrowdb_cypher::ast::Expr::Literal(lit) => literal_to_value(lit),
1518                            _ => {
1519                                return Err(Error::InvalidArgument(format!(
1520                                    "CREATE edge property '{}' must be a literal value",
1521                                    pe.key
1522                                )))
1523                            }
1524                        };
1525                        Ok((pe.key.clone(), val))
1526                    })
1527                    .collect::<Result<HashMap<_, _>>>()?;
1528                tx.create_edge(src, dst, &rel_pat.rel_type, edge_props)?;
1529            }
1530        }
1531
1532        tx.commit()?;
1533        // Note: do NOT call invalidate_prop_index() here — MATCH…CREATE only
1534        // creates edges, not node properties, so the cached column data remains
1535        // valid.  Only invalidate the catalog in case a new rel-type was registered.
1536        self.invalidate_catalog();
1537        Ok(QueryResult::empty(vec![]))
1538    }
1539
    /// Execute `MATCH … MERGE (a)-[r:TYPE]->(b)`: find-or-create a relationship.
    ///
    /// Algorithm (SPA-233):
    /// 1. MATCH the node patterns to get correlated (src, dst) NodeId pairs.
    /// 2. For each pair, look up the rel-type in the catalog.  If the rel table
    ///    does not exist yet, the relationship cannot exist → create it.
    /// 3. If the rel table exists, scan the delta log and (if checkpointed) the
    ///    CSR forward file to check whether a (src_slot, dst_slot) edge already
    ///    exists for this rel type.  Edges created earlier in this same
    ///    transaction (`pending_ops`) are also counted as existing, so MERGE
    ///    never duplicates an edge it just created.
    /// 4. If no existing edge is found, call `create_edge` to create it.
    ///
    /// The write lock is acquired before the scan so that no concurrent writer
    /// can race between the existence check and the edge creation.
    ///
    /// An empty MATCH result is a no-op (returns an empty `QueryResult`,
    /// never an error).
    fn execute_match_merge_rel(
        &self,
        mm: &sparrowdb_cypher::ast::MatchMergeRelStatement,
    ) -> Result<QueryResult> {
        use sparrowdb_storage::edge_store::EdgeStore;

        // Acquire the write lock first.
        let mut tx = self.begin_write()?;

        // Build an Engine for the read-scan phase.
        let csrs = self.cached_csr_map();
        let engine = Engine::new(
            NodeStore::open(&self.inner.path)?,
            self.catalog_snapshot(),
            csrs,
            &self.inner.path,
        );

        // Guard: reject reserved rel-type prefix used by internal system edges.
        if mm.rel_type.starts_with("__SO_") {
            return Err(Error::InvalidArgument(format!(
                "relationship type '{}' is reserved",
                mm.rel_type
            )));
        }

        // Scan MATCH patterns to get correlated (src_var, dst_var) NodeId rows.
        let matched_rows = engine.scan_match_merge_rel_rows(mm)?;
        if matched_rows.is_empty() {
            // No matched nodes → nothing to merge; this is a no-op (not an error).
            return Ok(QueryResult::empty(vec![]));
        }

        for row in &matched_rows {
            // Resolve the bound node IDs for this row.
            //
            // The anonymous-variable checks below do not depend on `row`
            // (loop-invariant); keeping them inside the loop means an empty
            // match set above stays a silent no-op instead of erroring.
            let src = if mm.src_var.is_empty() {
                // Anonymous source — cannot merge without a variable binding.
                return Err(Error::InvalidArgument(
                    "MERGE relationship pattern source variable is anonymous; \
                     use a named variable bound by the MATCH clause"
                        .into(),
                ));
            } else {
                *row.get(&mm.src_var).ok_or_else(|| {
                    Error::InvalidArgument(format!(
                        "MERGE references unbound source variable: {}",
                        mm.src_var
                    ))
                })?
            };

            let dst = if mm.dst_var.is_empty() {
                // Anonymous destination — same restriction as the source.
                return Err(Error::InvalidArgument(
                    "MERGE relationship pattern destination variable is anonymous; \
                     use a named variable bound by the MATCH clause"
                        .into(),
                ));
            } else {
                *row.get(&mm.dst_var).ok_or_else(|| {
                    Error::InvalidArgument(format!(
                        "MERGE references unbound destination variable: {}",
                        mm.dst_var
                    ))
                })?
            };

            // Derive label IDs from the packed NodeIds: high 32 bits carry the
            // label id, low 32 bits the row slot within that label.
            // NOTE(review): the label id is narrowed to u16 here — assumes
            // label ids fit in 16 bits; confirm against NodeId packing.
            let src_label_id = (src.0 >> 32) as u16;
            let dst_label_id = (dst.0 >> 32) as u16;
            let src_slot = src.0 & 0xFFFF_FFFF;
            let dst_slot = dst.0 & 0xFFFF_FFFF;

            // Look up whether a rel table for this type already exists in the catalog.
            // `get_rel_table` returns `Ok(None)` if no such table is registered yet,
            // meaning no edge of this type can exist.
            let catalog_rel_id_opt =
                tx.catalog
                    .get_rel_table(src_label_id, dst_label_id, &mm.rel_type)?;

            // Existence check across all three places an edge can live:
            // the unflushed delta log, the checkpointed CSR base, and edges
            // created earlier inside this same (uncommitted) transaction.
            let edge_already_exists = if let Some(catalog_rel_id) = catalog_rel_id_opt {
                let rel_table_id = RelTableId(catalog_rel_id as u32);

                // An unopenable store is treated as "no edges" (best-effort).
                if let Ok(store) = EdgeStore::open(&self.inner.path, rel_table_id) {
                    // Check delta log.
                    let in_delta = store
                        .read_delta()?
                        .iter()
                        .any(|rec| rec.src.0 == src.0 && rec.dst.0 == dst.0);

                    // Check CSR base (post-checkpoint edges).
                    let in_csr = if let Ok(csr) = store.open_fwd() {
                        csr.neighbors(src_slot).contains(&dst_slot)
                    } else {
                        false
                    };

                    // Also check pending (uncommitted in this same tx) edge creates.
                    let in_pending = tx.pending_ops.iter().any(|op| {
                        matches!(
                            op,
                            PendingOp::EdgeCreate {
                                src: ps,
                                dst: pd,
                                rel_table_id: prt,
                                ..
                            } if *ps == src && *pd == dst && *prt == rel_table_id
                        )
                    });

                    in_delta || in_csr || in_pending
                } else {
                    false
                }
            } else {
                false
            };

            if !edge_already_exists {
                tx.create_edge(src, dst, &mm.rel_type, HashMap::new())?;
            }
        }

        tx.commit()?;
        // NOTE(review): MERGE creates edges only — it never writes node
        // property columns — so invalidating the prop index here may be
        // unnecessary (execute_match_create deliberately skips it).  Confirm
        // before removing; invalidation is safe, merely a cache miss.
        self.invalidate_prop_index();
        self.invalidate_catalog();
        Ok(QueryResult::empty(vec![]))
    }
1680
1681    /// Create (or overwrite) a named full-text index.
1682    ///
1683    /// Creates the on-disk backing file for the named index so that
1684    /// `CALL db.index.fulltext.queryNodes(name, query)` can find it.
1685    /// If an index with the same name already exists its contents are cleared
1686    /// (overwrite semantics).  Document ingestion happens separately via
1687    /// [`WriteTx::add_to_fulltext_index`].
1688    ///
1689    /// Acquires the writer lock — returns [`Error::WriterBusy`] if a
1690    /// [`WriteTx`] is already active.
1691    ///
1692    /// # Example
1693    /// ```no_run
1694    /// # use sparrowdb::GraphDb;
1695    /// # let db = GraphDb::open(std::path::Path::new("/tmp/test")).unwrap();
1696    /// db.create_fulltext_index("searchIndex")?;
1697    /// # Ok::<(), sparrowdb_common::Error>(())
1698    /// ```
1699    pub fn create_fulltext_index(&self, name: &str) -> Result<()> {
1700        use sparrowdb_storage::fulltext_index::FulltextIndex;
1701        // Acquire the writer lock so this cannot race with an active WriteTx
1702        // that is reading or flushing the same index file.
1703        let _guard = WriteGuard::try_acquire(&self.inner).ok_or(Error::WriterBusy)?;
1704        FulltextIndex::create(&self.inner.path, name)?;
1705        Ok(())
1706    }
1707
1708    /// Execute multiple Cypher queries as a single atomic batch.
1709    ///
1710    /// All queries share one [`WriteTx`] and a **single WAL fsync** at the end,
1711    /// making this significantly faster than N individual [`execute`] calls when
1712    /// ingesting many nodes (e.g. Neo4j import, bulk data load).
1713    ///
1714    /// ## Semantics
1715    ///
1716    /// * **Atomicity** — if any query in the batch fails, the entire batch is
1717    ///   rolled back (the [`WriteTx`] is dropped without committing).
1718    /// * **Ordering** — queries are executed in slice order; each query sees the
1719    ///   mutations of all preceding queries in the batch.
1720    /// * **Single fsync** — the WAL is synced exactly once, after the last
1721    ///   query in the batch has been applied successfully.
1722    ///
1723    /// ## Restrictions
1724    ///
1725    /// * Only **write** statements are accepted (CREATE, MERGE, MATCH…SET,
1726    ///   MATCH…DELETE, MATCH…CREATE).  Read-only `MATCH … RETURN` queries are
1727    ///   allowed but their results are included in the returned `Vec`.
1728    /// * `CHECKPOINT` and `OPTIMIZE` inside a batch are executed immediately
1729    ///   and do **not** participate in the shared transaction; they act as
1730    ///   no-ops for atomicity purposes.
1731    /// * Parameters are not supported — use [`execute_with_params`] for that.
1732    ///
1733    /// ## Example
1734    ///
1735    /// ```no_run
1736    /// use sparrowdb::GraphDb;
1737    ///
1738    /// let db = GraphDb::open(std::path::Path::new("/tmp/batch.sparrow")).unwrap();
1739    ///
1740    /// let queries: Vec<&str> = (0..100)
1741    ///     .map(|i| Box::leak(format!("CREATE (n:Person {{id: {i}}})", i = i).into_boxed_str()) as &str)
1742    ///     .collect();
1743    ///
1744    /// let results = db.execute_batch(&queries).unwrap();
1745    /// assert_eq!(results.len(), 100);
1746    /// ```
1747    ///
1748    /// [`execute`]: GraphDb::execute
1749    /// [`execute_with_params`]: GraphDb::execute_with_params
1750    pub fn execute_batch(&self, queries: &[&str]) -> Result<Vec<QueryResult>> {
1751        use sparrowdb_cypher::ast::Statement;
1752        use sparrowdb_cypher::{bind, parse};
1753
1754        if queries.is_empty() {
1755            return Ok(Vec::new());
1756        }
1757
1758        // Pre-parse and bind all queries before acquiring the writer lock.
1759        // A syntax or bind error fails fast without ever locking.
1760        let catalog_snap = self.catalog_snapshot();
1761        let bound_stmts: Vec<_> = queries
1762            .iter()
1763            .map(|q| {
1764                let stmt = parse(q)?;
1765                bind(stmt, &catalog_snap)
1766            })
1767            .collect::<Result<Vec<_>>>()?;
1768
1769        // CHECKPOINT and OPTIMIZE cannot run inside a WriteTx (they acquire
1770        // the writer lock themselves).  Execute them eagerly in document order
1771        // and record placeholder results; they do not participate in the
1772        // shared transaction's atomicity.
1773        let mut early_results: Vec<Option<QueryResult>> = vec![None; bound_stmts.len()];
1774        for (i, bound) in bound_stmts.iter().enumerate() {
1775            match &bound.inner {
1776                Statement::Checkpoint => {
1777                    self.checkpoint()?;
1778                    early_results[i] = Some(QueryResult::empty(vec![]));
1779                }
1780                Statement::Optimize => {
1781                    self.optimize()?;
1782                    early_results[i] = Some(QueryResult::empty(vec![]));
1783                }
1784                Statement::CreateConstraint { label, property } => {
1785                    early_results[i] = Some(self.register_unique_constraint(label, property)?);
1786                }
1787                _ => {}
1788            }
1789        }
1790
1791        // Acquire a single WriteTx for all structural + property mutations.
1792        let mut tx = self.begin_write()?;
1793        let mut results: Vec<Option<QueryResult>> = vec![None; bound_stmts.len()];
1794
1795        for (i, bound) in bound_stmts.into_iter().enumerate() {
1796            if early_results[i].is_some() {
1797                continue;
1798            }
1799
1800            let result = if Engine::is_mutation(&bound.inner) {
1801                self.execute_batch_mutation(bound.inner, &mut tx)?
1802            } else {
1803                // Read-only statement: execute against current snapshot.
1804                let csrs = self.cached_csr_map();
1805                let batch_catalog = self.catalog_snapshot();
1806                let mut engine = Engine::new(
1807                    NodeStore::open(&self.inner.path)?,
1808                    batch_catalog,
1809                    csrs,
1810                    &self.inner.path,
1811                );
1812                engine.execute_statement(bound.inner)?
1813            };
1814            results[i] = Some(result);
1815        }
1816
1817        // Single fsync commit — all mutations land in one WAL transaction.
1818        tx.commit()?;
1819        self.invalidate_prop_index();
1820        self.invalidate_catalog();
1821
1822        // Merge early (CHECKPOINT/OPTIMIZE) and mutation results in order.
1823        let final_results = results
1824            .into_iter()
1825            .enumerate()
1826            .map(|(i, r)| {
1827                r.or_else(|| early_results[i].take())
1828                    .unwrap_or_else(|| QueryResult::empty(vec![]))
1829            })
1830            .collect();
1831
1832        Ok(final_results)
1833    }
1834
1835    /// Internal: apply a single mutation statement to an already-open [`WriteTx`].
1836    ///
1837    /// Called by [`execute_batch`] to accumulate multiple mutations into one
1838    /// transaction (and therefore one WAL fsync on commit).
1839    fn execute_batch_mutation(
1840        &self,
1841        stmt: sparrowdb_cypher::ast::Statement,
1842        tx: &mut WriteTx,
1843    ) -> Result<QueryResult> {
1844        use sparrowdb_cypher::ast::Statement;
1845
1846        match stmt {
1847            Statement::Create(ref c) => {
1848                // Inline the logic from execute_create_standalone using the
1849                // shared tx rather than opening a new WriteTx.
1850                let declared_vars: HashSet<&str> = c
1851                    .nodes
1852                    .iter()
1853                    .filter(|n| !n.var.is_empty())
1854                    .map(|n| n.var.as_str())
1855                    .collect();
1856                for (left_var, _, right_var) in &c.edges {
1857                    if !left_var.is_empty() && !declared_vars.contains(left_var.as_str()) {
1858                        return Err(sparrowdb_common::Error::InvalidArgument(format!(
1859                            "CREATE edge references undeclared variable '{left_var}'"
1860                        )));
1861                    }
1862                    if !right_var.is_empty() && !declared_vars.contains(right_var.as_str()) {
1863                        return Err(sparrowdb_common::Error::InvalidArgument(format!(
1864                            "CREATE edge references undeclared variable '{right_var}'"
1865                        )));
1866                    }
1867                }
1868                for node in &c.nodes {
1869                    if let Some(label) = node.labels.first() {
1870                        if is_reserved_label(label) {
1871                            return Err(reserved_label_error(label));
1872                        }
1873                    }
1874                }
1875                for (_, rel_pat, _) in &c.edges {
1876                    if is_reserved_label(&rel_pat.rel_type) {
1877                        return Err(reserved_label_error(&rel_pat.rel_type));
1878                    }
1879                }
1880
1881                let mut var_to_node: HashMap<String, NodeId> = HashMap::new();
1882                for node in &c.nodes {
1883                    let label = node.labels.first().cloned().unwrap_or_default();
1884                    let label_id: u32 = match tx.catalog.get_label(&label)? {
1885                        Some(id) => id as u32,
1886                        None => tx.catalog.create_label(&label)? as u32,
1887                    };
1888                    let named_props: Vec<(String, Value)> = node
1889                        .props
1890                        .iter()
1891                        .map(|entry| {
1892                            let val = match &entry.value {
1893                                sparrowdb_cypher::ast::Expr::Literal(
1894                                    sparrowdb_cypher::ast::Literal::Null,
1895                                ) => Err(sparrowdb_common::Error::InvalidArgument(format!(
1896                                    "CREATE property '{}' is null",
1897                                    entry.key
1898                                ))),
1899                                sparrowdb_cypher::ast::Expr::Literal(
1900                                    sparrowdb_cypher::ast::Literal::Param(p),
1901                                ) => Err(sparrowdb_common::Error::InvalidArgument(format!(
1902                                    "CREATE property '{}' references parameter ${p}",
1903                                    entry.key
1904                                ))),
1905                                sparrowdb_cypher::ast::Expr::Literal(lit) => {
1906                                    Ok(literal_to_value(lit))
1907                                }
1908                                _ => Err(sparrowdb_common::Error::InvalidArgument(format!(
1909                                    "CREATE property '{}' must be a literal",
1910                                    entry.key
1911                                ))),
1912                            }?;
1913                            Ok((entry.key.clone(), val))
1914                        })
1915                        .collect::<Result<Vec<_>>>()?;
1916                    let node_id = tx.create_node_named(label_id, &named_props)?;
1917                    if !node.var.is_empty() {
1918                        var_to_node.insert(node.var.clone(), node_id);
1919                    }
1920                }
1921                for (left_var, rel_pat, right_var) in &c.edges {
1922                    let src = var_to_node.get(left_var).copied().ok_or_else(|| {
1923                        sparrowdb_common::Error::InvalidArgument(format!(
1924                            "CREATE edge references unresolved variable '{left_var}'"
1925                        ))
1926                    })?;
1927                    let dst = var_to_node.get(right_var).copied().ok_or_else(|| {
1928                        sparrowdb_common::Error::InvalidArgument(format!(
1929                            "CREATE edge references unresolved variable '{right_var}'"
1930                        ))
1931                    })?;
1932                    tx.create_edge(src, dst, &rel_pat.rel_type, HashMap::new())?;
1933                }
1934                Ok(QueryResult::empty(vec![]))
1935            }
1936
1937            Statement::Merge(ref m) => {
1938                let props: HashMap<String, Value> = m
1939                    .props
1940                    .iter()
1941                    .map(|pe| (pe.key.clone(), expr_to_value(&pe.value)))
1942                    .collect();
1943                tx.merge_node(&m.label, props)?;
1944                Ok(QueryResult::empty(vec![]))
1945            }
1946
1947            Statement::MatchMutate(ref mm) => {
1948                let csrs = self.cached_csr_map();
1949                let engine = Engine::new(
1950                    NodeStore::open(&self.inner.path)?,
1951                    self.catalog_snapshot(),
1952                    csrs,
1953                    &self.inner.path,
1954                );
1955                // SPA-219: edge delete path.
1956                if is_edge_delete_mutation(mm) {
1957                    let edges = engine.scan_match_mutate_edges(mm)?;
1958                    for (src, dst, rel_type) in edges {
1959                        tx.delete_edge(src, dst, &rel_type)?;
1960                    }
1961                } else {
1962                    let matching_ids = engine.scan_match_mutate(mm)?;
1963                    for node_id in matching_ids {
1964                        match &mm.mutation {
1965                            sparrowdb_cypher::ast::Mutation::Set { prop, value, .. } => {
1966                                let sv = expr_to_value(value);
1967                                tx.set_property(node_id, prop, sv)?;
1968                            }
1969                            sparrowdb_cypher::ast::Mutation::Delete { .. } => {
1970                                tx.delete_node(node_id)?;
1971                            }
1972                        }
1973                    }
1974                }
1975                Ok(QueryResult::empty(vec![]))
1976            }
1977
1978            Statement::MatchCreate(ref mc) => {
1979                for (_, rel_pat, _) in &mc.create.edges {
1980                    if is_reserved_label(&rel_pat.rel_type) {
1981                        return Err(reserved_label_error(&rel_pat.rel_type));
1982                    }
1983                }
1984                if mc.create.edges.is_empty() {
1985                    return Err(Error::Unimplemented);
1986                }
1987                let csrs = self.cached_csr_map();
1988                let engine = Engine::new(
1989                    NodeStore::open(&self.inner.path)?,
1990                    self.catalog_snapshot(),
1991                    csrs,
1992                    &self.inner.path,
1993                );
1994                let matched_rows = engine.scan_match_create_rows(mc)?;
1995                for row in &matched_rows {
1996                    for (left_var, rel_pat, right_var) in &mc.create.edges {
1997                        let src = row.get(left_var).copied().ok_or_else(|| {
1998                            Error::InvalidArgument(format!(
1999                                "CREATE references unbound variable: {left_var}"
2000                            ))
2001                        })?;
2002                        let dst = row.get(right_var).copied().ok_or_else(|| {
2003                            Error::InvalidArgument(format!(
2004                                "CREATE references unbound variable: {right_var}"
2005                            ))
2006                        })?;
2007                        tx.create_edge(src, dst, &rel_pat.rel_type, HashMap::new())?;
2008                    }
2009                }
2010                Ok(QueryResult::empty(vec![]))
2011            }
2012
2013            Statement::MatchMergeRel(ref mm) => {
2014                // Find-or-create relationship batch variant (SPA-233).
2015                //
2016                // NOTE: MATCH...MERGE in a batch reads committed on-disk state only;
2017                // nodes/edges created earlier in the same batch are not visible to
2018                // the MERGE existence check.  If in-batch visibility is required,
2019                // flush the transaction to disk before issuing MATCH...MERGE.
2020                let csrs = self.cached_csr_map();
2021                let engine = Engine::new(
2022                    NodeStore::open(&self.inner.path)?,
2023                    self.catalog_snapshot(),
2024                    csrs,
2025                    &self.inner.path,
2026                );
2027                // Guard: reject reserved rel-type prefix used by internal system edges.
2028                if mm.rel_type.starts_with("__SO_") {
2029                    return Err(Error::InvalidArgument(format!(
2030                        "relationship type '{}' is reserved",
2031                        mm.rel_type
2032                    )));
2033                }
2034                let matched_rows = engine.scan_match_merge_rel_rows(mm)?;
2035                for row in &matched_rows {
2036                    let src = *row.get(&mm.src_var).ok_or_else(|| {
2037                        Error::InvalidArgument(format!(
2038                            "MERGE references unbound source variable: {}",
2039                            mm.src_var
2040                        ))
2041                    })?;
2042                    let dst = *row.get(&mm.dst_var).ok_or_else(|| {
2043                        Error::InvalidArgument(format!(
2044                            "MERGE references unbound destination variable: {}",
2045                            mm.dst_var
2046                        ))
2047                    })?;
2048                    let src_label_id = (src.0 >> 32) as u16;
2049                    let dst_label_id = (dst.0 >> 32) as u16;
2050                    let src_slot = src.0 & 0xFFFF_FFFF;
2051                    let dst_slot = dst.0 & 0xFFFF_FFFF;
2052
2053                    let catalog_rel_id_opt =
2054                        tx.catalog
2055                            .get_rel_table(src_label_id, dst_label_id, &mm.rel_type)?;
2056
2057                    let edge_exists = if let Some(crid) = catalog_rel_id_opt {
2058                        let rtid = RelTableId(crid as u32);
2059                        if let Ok(store) = EdgeStore::open(&self.inner.path, rtid) {
2060                            let in_delta = store
2061                                .read_delta()?
2062                                .iter()
2063                                .any(|rec| rec.src.0 == src.0 && rec.dst.0 == dst.0);
2064                            let in_csr = if let Ok(csr) = store.open_fwd() {
2065                                csr.neighbors(src_slot).contains(&dst_slot)
2066                            } else {
2067                                false
2068                            };
2069                            let in_pending = tx.pending_ops.iter().any(|op| {
2070                                matches!(
2071                                    op,
2072                                    PendingOp::EdgeCreate {
2073                                        src: ps, dst: pd, rel_table_id: prt, ..
2074                                    } if *ps == src && *pd == dst && *prt == rtid
2075                                )
2076                            });
2077                            in_delta || in_csr || in_pending
2078                        } else {
2079                            false
2080                        }
2081                    } else {
2082                        false
2083                    };
2084
2085                    if !edge_exists {
2086                        tx.create_edge(src, dst, &mm.rel_type, HashMap::new())?;
2087                    }
2088                }
2089                Ok(QueryResult::empty(vec![]))
2090            }
2091
2092            _ => Err(Error::Unimplemented),
2093        }
2094    }
2095
2096    /// Return `(node_count, edge_count)` by summing the high-water marks
2097    /// across all catalog labels (nodes) and delta-log record counts across
2098    /// all registered relationship tables (edges).
2099    ///
2100    /// Both counts reflect committed storage state; in-flight write
2101    /// transactions are not visible.
2102    ///
2103    /// **Note:** `node_count` is based on per-label high-water marks and
2104    /// therefore includes soft-deleted nodes whose slots have not yet been
2105    /// reclaimed.  The count will converge to the true live-node count once
2106    /// compaction / GC is implemented.
2107    pub fn db_counts(&self) -> Result<(u64, u64)> {
2108        let path = &self.inner.path;
2109        let catalog = self.catalog_snapshot();
2110        let node_store = NodeStore::open(path)?;
2111
2112        // Node count: sum hwm_for_label across every registered label.
2113        let node_count: u64 = catalog
2114            .list_labels()
2115            .unwrap_or_default()
2116            .iter()
2117            .map(|(label_id, _name)| node_store.hwm_for_label(*label_id as u32).unwrap_or(0))
2118            .sum();
2119
2120        // Edge count: for each registered rel table, sum CSR base edges (post-checkpoint)
2121        // plus delta-log records (pre-checkpoint / unflushed).
2122        let rel_table_ids = catalog.list_rel_table_ids();
2123        let ids_to_scan: Vec<u64> = if rel_table_ids.is_empty() {
2124            // No rel tables in catalog yet — fall back to the default table (id=0).
2125            vec![0]
2126        } else {
2127            rel_table_ids.iter().map(|(id, _, _, _)| *id).collect()
2128        };
2129        let edge_count: u64 = ids_to_scan
2130            .iter()
2131            .map(|&id| {
2132                let Ok(store) = EdgeStore::open(path, RelTableId(id as u32)) else {
2133                    return 0;
2134                };
2135                let csr_edges = store.open_fwd().map(|csr| csr.n_edges()).unwrap_or(0);
2136                let delta_edges = store
2137                    .read_delta()
2138                    .map(|records| records.len() as u64)
2139                    .unwrap_or(0);
2140                csr_edges + delta_edges
2141            })
2142            .sum();
2143
2144        Ok((node_count, edge_count))
2145    }
2146
2147    /// Return all node label names currently registered in the catalog (SPA-209).
2148    ///
2149    /// Labels are registered automatically the first time a node with that label
2150    /// is created. The returned list is ordered by insertion (i.e. by `LabelId`).
2151    pub fn labels(&self) -> Result<Vec<String>> {
2152        let catalog = self.catalog_snapshot();
2153        Ok(catalog
2154            .list_labels()?
2155            .into_iter()
2156            .map(|(_id, name)| name)
2157            .collect())
2158    }
2159
2160    /// Return all relationship type names currently registered in the catalog (SPA-209).
2161    ///
2162    /// The returned list is deduplicated and sorted alphabetically.
2163    pub fn relationship_types(&self) -> Result<Vec<String>> {
2164        let catalog = self.catalog_snapshot();
2165        let mut types: Vec<String> = catalog
2166            .list_rel_tables()?
2167            .into_iter()
2168            .map(|(_src, _dst, rel_type)| rel_type)
2169            .collect();
2170        types.sort();
2171        types.dedup();
2172        Ok(types)
2173    }
2174
2175    /// Return the top-`limit` nodes of the given `label` ordered by out-degree
2176    /// descending (SPA-168).
2177    ///
2178    /// Each element of the returned `Vec` is `(node_id, out_degree)`.  Ties are
2179    /// broken by ascending node id for determinism.
2180    ///
2181    /// This is an O(N log k) operation backed by the pre-computed
2182    /// [`DegreeCache`](sparrowdb_execution::DegreeCache) — no edge scan is
2183    /// performed at query time.
2184    ///
2185    /// Returns an empty `Vec` when `limit == 0`, the label is unknown, or the
2186    /// label has no nodes.
2187    ///
2188    /// # Errors
2189    /// Propagates I/O errors from opening the node store, CSR files, or
2190    /// catalog.
2191    pub fn top_degree_nodes(&self, label: &str, limit: usize) -> Result<Vec<(u64, u32)>> {
2192        if limit == 0 {
2193            return Ok(vec![]);
2194        }
2195        let path = &self.inner.path;
2196        let catalog = self.catalog_snapshot();
2197        let label_id: u32 = match catalog.get_label(label)? {
2198            Some(id) => id as u32,
2199            None => return Ok(vec![]),
2200        };
2201        let csrs = open_csr_map(path);
2202        let engine = Engine::new(NodeStore::open(path)?, catalog, csrs, path);
2203        engine.top_k_by_degree(label_id, limit)
2204    }
2205
2206    /// Export the graph as a DOT (Graphviz) string for visualization.
2207    ///
2208    /// Queries all nodes via Cypher and reads edges directly from storage so
2209    /// that the output is correct regardless of whether the database has been
2210    /// checkpointed (CSR) or not (delta log).  The result can be piped through
2211    /// `dot -Tsvg` to produce an SVG, or written to a `.dot` file for offline
2212    /// rendering.
2213    ///
2214    /// ## Example
2215    ///
2216    /// ```no_run
2217    /// # use sparrowdb::GraphDb;
2218    /// # let db = GraphDb::open(std::path::Path::new("/tmp/my.sparrow")).unwrap();
2219    /// let dot = db.export_dot().unwrap();
2220    /// std::fs::write("graph.dot", &dot).unwrap();
2221    /// // Then: dot -Tsvg graph.dot -o graph.svg
2222    /// ```
2223    pub fn export_dot(&self) -> Result<String> {
2224        /// Escape a string for use inside a DOT label (backslash + double-quote).
2225        fn dot_escape(s: &str) -> String {
2226            s.replace('\\', "\\\\").replace('"', "\\\"")
2227        }
2228
2229        /// Format a `Value` as a human-readable string for DOT labels / edge
2230        /// labels.  Falls back to the `Display` impl for most variants.
2231        fn fmt_val(v: &sparrowdb_execution::types::Value) -> String {
2232            match v {
2233                sparrowdb_execution::types::Value::String(s) => s.clone(),
2234                sparrowdb_execution::types::Value::Int64(i) => i.to_string(),
2235                sparrowdb_execution::types::Value::List(items) => {
2236                    // labels(n) returns a List of String items.
2237                    items
2238                        .iter()
2239                        .filter_map(|it| {
2240                            if let sparrowdb_execution::types::Value::String(s) = it {
2241                                Some(s.as_str())
2242                            } else {
2243                                None
2244                            }
2245                        })
2246                        .collect::<Vec<_>>()
2247                        .join(":")
2248                }
2249                other => format!("{other}"),
2250            }
2251        }
2252
2253        let path = &self.inner.path;
2254        let catalog = self.catalog_snapshot();
2255
2256        // Query all nodes via Cypher — id(n) is reliably Int64 from node scans.
2257        let nodes =
2258            self.execute("MATCH (n) RETURN id(n) AS nid, labels(n) AS lbls, n.name AS nm")?;
2259
2260        // Read edges directly from storage.
2261        //
2262        // The Cypher engine's project_hop_row does not expose id(a)/id(b) in
2263        // hop patterns (SPA-149 follow-up), so we read the delta log
2264        // (pre-checkpoint) and CSR (post-checkpoint) directly.
2265        //
2266        // NodeId encoding: (label_id as u64) << 32 | slot
2267        // This is the same as the value returned by id(n) for node scans.
2268        let rel_tables = catalog.list_rel_tables_with_ids();
2269        let mut edge_triples: Vec<(i64, String, i64)> = Vec::new();
2270
2271        for (catalog_id, src_label_id, dst_label_id, rel_type) in &rel_tables {
2272            let storage_rel_id = sparrowdb_storage::edge_store::RelTableId(*catalog_id as u32);
2273
2274            if let Ok(store) = sparrowdb_storage::edge_store::EdgeStore::open(path, storage_rel_id)
2275            {
2276                // Delta log: stores full NodeId pairs directly.
2277                if let Ok(records) = store.read_delta() {
2278                    for rec in records {
2279                        edge_triples.push((rec.src.0 as i64, rel_type.clone(), rec.dst.0 as i64));
2280                    }
2281                }
2282
2283                // CSR: (src_slot, dst_slot) relative to label IDs.
2284                if let Ok(csr) = store.open_fwd() {
2285                    let n_nodes = csr.n_nodes();
2286                    for src_slot in 0..n_nodes {
2287                        let src_id = ((*src_label_id as u64) << 32 | src_slot) as i64;
2288                        for &dst_slot in csr.neighbors(src_slot) {
2289                            let dst_id = ((*dst_label_id as u64) << 32 | dst_slot) as i64;
2290                            edge_triples.push((src_id, rel_type.clone(), dst_id));
2291                        }
2292                    }
2293                }
2294            }
2295        }
2296
2297        // Build DOT output.
2298        let mut dot =
2299            String::from("digraph SparrowDB {\n  rankdir=LR;\n  node [shape=ellipse];\n\n");
2300
2301        for row in &nodes.rows {
2302            let node_id = match &row[0] {
2303                sparrowdb_execution::types::Value::Int64(i) => *i,
2304                _ => continue,
2305            };
2306            let label_str = fmt_val(&row[1]);
2307            let name_str = match &row[2] {
2308                sparrowdb_execution::types::Value::Null => String::new(),
2309                v => fmt_val(v),
2310            };
2311            let display_label = if name_str.is_empty() {
2312                format!("{}\\nid={}", dot_escape(&label_str), node_id)
2313            } else {
2314                format!("{}\\n{}", dot_escape(&label_str), dot_escape(&name_str))
2315            };
2316            dot.push_str(&format!("  n{node_id} [label=\"{display_label}\"];\n"));
2317        }
2318
2319        dot.push('\n');
2320
2321        for (src_id, rel_type, dst_id) in &edge_triples {
2322            dot.push_str(&format!(
2323                "  n{src_id} -> n{dst_id} [label=\"{}\"];\n",
2324                dot_escape(rel_type)
2325            ));
2326        }
2327
2328        dot.push_str("}\n");
2329        Ok(dot)
2330    }
2331
2332    /// SPA-247: Convenience wrapper for upsert — find an existing node with
2333    /// `(label, match_key=match_value)` and update its properties, or create a
2334    /// new one if none exists.
2335    ///
2336    /// Opens a single-operation write transaction, calls
2337    /// [`WriteTx::merge_node_by_property`], and commits.
2338    ///
2339    /// Returns `(NodeId, created)`.
2340    pub fn merge_node_by_property(
2341        &self,
2342        label: &str,
2343        match_key: &str,
2344        match_value: &Value,
2345        properties: HashMap<String, Value>,
2346    ) -> Result<(NodeId, bool)> {
2347        let mut tx = self.begin_write()?;
2348        let result = tx.merge_node_by_property(label, match_key, match_value, properties)?;
2349        tx.commit()?;
2350        self.invalidate_prop_index();
2351        self.invalidate_catalog();
2352        Ok(result)
2353    }
2354
2355    /// Convenience wrapper: remove the directed edge `src → dst` of `rel_type`.
2356    ///
2357    /// Opens a single-operation write transaction, calls
2358    /// [`WriteTx::delete_edge`], and commits.  Suitable for callers that need
2359    /// to remove a specific edge without managing a transaction explicitly.
2360    ///
2361    /// Returns [`Error::WriterBusy`] if another write transaction is already
2362    /// active; returns [`Error::InvalidArgument`] if the rel type or edge is
2363    /// not found.
2364    pub fn delete_edge(&self, src: NodeId, dst: NodeId, rel_type: &str) -> Result<()> {
2365        let mut tx = self.begin_write()?;
2366        tx.delete_edge(src, dst, rel_type)?;
2367        tx.commit()?;
2368        self.invalidate_csr_map();
2369        Ok(())
2370    }
2371}
2372
/// Convenience free-function wrapper — equivalent to [`GraphDb::open`].
///
/// Opens the database at `path` and returns a [`GraphDb`] handle.
pub fn open(path: &Path) -> Result<GraphDb> {
    GraphDb::open(path)
}
2377
2378/// Migrate WAL segments from legacy v21 (CRC32 IEEE) to v2 (CRC32C Castagnoli).
2379///
2380/// Call this on a database path **before** opening it with [`GraphDb::open`].
2381/// The database must not be open by any other process.
2382///
2383/// This is safe to run on a database that is already at v2 — those segments are
2384/// simply skipped.  The migration is idempotent.
2385///
2386/// # Example
2387///
2388/// ```no_run
2389/// use std::path::Path;
2390/// let result = sparrowdb::migrate_wal(Path::new("/path/to/my.sparrow")).unwrap();
2391/// println!("Converted {} segments", result.segments_converted);
2392/// ```
2393pub fn migrate_wal(db_path: &Path) -> Result<sparrowdb_storage::wal::migrate::MigrationResult> {
2394    let wal_dir = db_path.join("wal");
2395    if !wal_dir.exists() {
2396        return Ok(sparrowdb_storage::wal::migrate::MigrationResult {
2397            segments_inspected: 0,
2398            segments_converted: 0,
2399            segments_skipped: 0,
2400            records_converted: 0,
2401        });
2402    }
2403    sparrowdb_storage::wal::migrate::migrate_wal(&wal_dir)
2404}
2405
2406// ── Legacy alias ──────────────────────────────────────────────────────────────
2407
/// Legacy alias kept for backward compatibility with Phase 0 tests.
///
/// New code should prefer [`GraphDb`] directly.
pub type SparrowDB = GraphDb;
2410
2411// ── ReadTx ────────────────────────────────────────────────────────────────────
2412
/// A read-only snapshot transaction.
///
/// Pinned at the `txn_id` current when this handle was opened; immune to
/// subsequent writer commits for the lifetime of this handle.
pub struct ReadTx {
    /// The committed `txn_id` this reader is pinned to.
    pub snapshot_txn_id: u64,
    /// Node store handle used to read on-disk column values.
    store: NodeStore,
    /// Shared database state (catalog, CSR / prop-index caches, version chains).
    inner: Arc<DbInner>,
}
2423
impl ReadTx {
    /// Read the `Int64` property values of a node at the pinned snapshot.
    ///
    /// For each column the version chain is consulted first; if a value was
    /// committed at or before `snapshot_txn_id` it shadows the on-disk value.
    ///
    /// # Errors
    /// Propagates I/O errors from the underlying node-store read.
    pub fn get_node(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, Value)>> {
        // Hold the version-chain read lock for the whole merge so the chain
        // cannot change between the lookup and the on-disk fallback.
        let versions = self.inner.versions.read().expect("version lock poisoned");
        let raw = self.store.get_node_raw(node_id, col_ids)?;
        let result = raw
            .into_iter()
            .map(|(col_id, raw_val)| {
                // Check version chain first.
                if let Some(v) = versions.get_at(node_id, col_id, self.snapshot_txn_id) {
                    (col_id, v)
                } else {
                    // No versioned entry at or before our snapshot — decode
                    // the raw on-disk value instead.
                    (col_id, self.store.decode_raw_value(raw_val))
                }
            })
            .collect();
        Ok(result)
    }

    /// Return the snapshot `TxnId` this reader is pinned to.
    pub fn snapshot(&self) -> TxnId {
        TxnId(self.snapshot_txn_id)
    }

    /// Execute a read-only Cypher query against the pinned snapshot.
    ///
    /// ## Snapshot isolation
    ///
    /// The query sees exactly the committed state at the moment
    /// [`begin_read`](GraphDb::begin_read) was called.  Any writes committed
    /// after that point — even fully committed ones — are invisible until a
    /// new `ReadTx` is opened.
    ///
    /// ## Concurrency
    ///
    /// Multiple `ReadTx` handles may run `query` concurrently.  No write lock
    /// is acquired; only the shared read-paths of the catalog, CSR, and
    /// property-index caches are accessed.
    ///
    /// ## Mutation statements rejected
    ///
    /// Passing a mutation statement (`CREATE`, `MERGE`, `MATCH … SET`,
    /// `MATCH … DELETE`, `CHECKPOINT`, `OPTIMIZE`, etc.) returns
    /// [`Error::ReadOnly`].  Use [`GraphDb::execute`] for mutations.
    ///
    /// # Example
    /// ```no_run
    /// use sparrowdb::GraphDb;
    ///
    /// let db = GraphDb::open(std::path::Path::new("/tmp/g.sparrow")).unwrap();
    /// let tx = db.begin_read().unwrap();
    /// let result = tx.query("MATCH (n:Person) RETURN n.name").unwrap();
    /// println!("{} rows", result.rows.len());
    /// ```
    pub fn query(&self, cypher: &str) -> crate::Result<QueryResult> {
        use sparrowdb_cypher::{bind, parse};

        let stmt = parse(cypher)?;

        // Take a snapshot of the catalog from the shared cache (no disk I/O if
        // the catalog is already warm).  Cloning releases the lock before any
        // further work.
        let catalog_snap = self
            .inner
            .catalog
            .read()
            .expect("catalog RwLock poisoned")
            .clone();

        let bound = bind(stmt, &catalog_snap)?;

        // Reject any statement that would mutate state — ReadTx is read-only.
        if Engine::is_mutation(&bound.inner) {
            return Err(crate::Error::ReadOnly);
        }

        // Also reject DDL / maintenance statements.
        use sparrowdb_cypher::ast::Statement;
        match &bound.inner {
            Statement::Checkpoint | Statement::Optimize | Statement::CreateConstraint { .. } => {
                return Err(crate::Error::ReadOnly);
            }
            _ => {}
        }

        let _span = info_span!("sparrowdb.readtx.query").entered();

        // Clone the CSR map out of the shared cache — again, lock held only
        // for the clone.
        let csrs = self
            .inner
            .csr_map
            .read()
            .expect("csr_map RwLock poisoned")
            .clone();

        let mut engine = {
            let _open_span = info_span!("sparrowdb.readtx.open_engine").entered();
            let row_counts = self
                .inner
                .label_row_counts
                .read()
                .expect("label_row_counts RwLock poisoned")
                .clone();
            Engine::new_with_all_caches(
                NodeStore::open(&self.inner.path)?,
                catalog_snap,
                csrs,
                &self.inner.path,
                Some(&self.inner.prop_index),
                Some(row_counts),
                Some(Arc::clone(&self.inner.edge_props_cache)),
            )
        };

        let result = {
            let _exec_span = info_span!("sparrowdb.readtx.execute").entered();
            engine.execute_statement(bound.inner)?
        };

        // Write lazily-loaded columns back to the shared property-index cache
        // so subsequent queries benefit from warm column data.
        engine.write_back_prop_index(&self.inner.prop_index);

        tracing::debug!(
            rows = result.rows.len(),
            snapshot_txn_id = self.snapshot_txn_id,
            "readtx query complete"
        );
        Ok(result)
    }
}
2556
2557// ── WriteTx ───────────────────────────────────────────────────────────────────
2558
/// A write transaction.
///
/// Only one may be active at a time (writer-lock held for the lifetime of
/// this struct).  Commit by calling [`WriteTx::commit`]; uncommitted changes
/// are discarded on drop.
///
/// # Atomicity guarantee (SPA-181)
///
/// All mutations (`create_node`, `create_edge`, `delete_node`, `create_label`,
/// `merge_node`) are buffered in memory until [`commit`] is called.  If the
/// transaction is dropped without committing, **no changes are persisted** —
/// the database remains in the state it was at the time [`begin_write`] was
/// called.
///
/// NOTE(review): [`WriteTx::create_label`] documents that label creation is
/// written to the catalog immediately and is *not* rolled back on drop, which
/// contradicts the buffered-ops list above — confirm which behavior is
/// current and reconcile the two doc comments.
///
/// [`begin_write`]: GraphDb::begin_write
/// [`commit`]: WriteTx::commit
#[must_use = "call commit() to persist changes, or drop to discard"]
pub struct WriteTx {
    /// Shared database state (version chains, txn counter, caches).
    inner: Arc<DbInner>,
    /// Node store handle used for slot allocation and on-disk reads.
    store: NodeStore,
    /// Private catalog handle; schema lookups and creation go through this.
    catalog: Catalog,
    /// Staged property updates (not yet visible to readers).
    write_buf: WriteBuffer,
    /// Staged WAL mutation records to emit on commit.
    wal_mutations: Vec<WalMutation>,
    /// Set of node IDs written by this transaction (for MVCC conflict detection).
    dirty_nodes: HashSet<u64>,
    /// The committed txn_id at the time this WriteTx was opened (MVCC snapshot).
    snapshot_txn_id: u64,
    /// Held for the lifetime of this WriteTx; released on drop.
    /// Uses AtomicBool-based guard — no unsafe lifetime extension needed (SPA-181).
    _guard: WriteGuard,
    /// Set to `true` by a successful commit; presumably consulted by `Drop`
    /// to decide whether staged state must be discarded — confirm in commit/Drop.
    committed: bool,
    /// In-flight fulltext index updates — flushed to disk on commit.
    ///
    /// Caching open indexes here avoids one open+flush per `add_to_fulltext_index`
    /// call; instead we batch all additions and flush each index exactly once.
    fulltext_pending: HashMap<String, sparrowdb_storage::fulltext_index::FulltextIndex>,
    /// Buffered structural mutations (create_node, delete_node, create_edge,
    /// create_label) not yet written to disk.  Flushed atomically on commit.
    pending_ops: Vec<PendingOp>,
}
2601
2602impl WriteTx {
2603    // ── Core node/property API (pre-Phase 7) ─────────────────────────────────
2604
2605    /// Create a new node under `label_id` with the given properties.
2606    ///
2607    /// Returns the packed [`NodeId`].
2608    ///
2609    /// The node is **not** written to disk until [`commit`] is called.
2610    /// Dropping the transaction without committing discards this operation.
2611    ///
2612    /// [`commit`]: WriteTx::commit
2613    pub fn create_node(&mut self, label_id: u32, props: &[(u32, Value)]) -> Result<NodeId> {
2614        // Allocate a node ID by consulting the in-memory HWM.  We peek at
2615        // the current HWM to compute the future NodeId without actually
2616        // writing anything to disk yet.
2617        let slot = self.store.peek_next_slot(label_id)?;
2618        let node_id = NodeId((label_id as u64) << 32 | slot as u64);
2619        self.dirty_nodes.insert(node_id.0);
2620        self.pending_ops.push(PendingOp::NodeCreate {
2621            label_id,
2622            slot,
2623            props: props.to_vec(),
2624        });
2625        self.wal_mutations.push(WalMutation::NodeCreate {
2626            node_id,
2627            label_id,
2628            props: props.to_vec(),
2629            // Low-level create_node: no property names available.
2630            prop_names: Vec::new(),
2631        });
2632        Ok(node_id)
2633    }
2634
2635    /// Create a new node with named properties, recording names in the WAL.
2636    ///
2637    /// Like [`create_node`] but accepts `(name, value)` pairs so that the
2638    /// property names are preserved in the WAL record for schema introspection
2639    /// (`CALL db.schema()`).  Col-ids are derived via [`col_id_of`].
2640    pub fn create_node_named(
2641        &mut self,
2642        label_id: u32,
2643        named_props: &[(String, Value)],
2644    ) -> Result<NodeId> {
2645        let props: Vec<(u32, Value)> = named_props
2646            .iter()
2647            .map(|(name, v)| (col_id_of(name), v.clone()))
2648            .collect();
2649        let prop_names: Vec<String> = named_props.iter().map(|(n, _)| n.clone()).collect();
2650        let slot = self.store.peek_next_slot(label_id)?;
2651        let node_id = NodeId((label_id as u64) << 32 | slot as u64);
2652        self.dirty_nodes.insert(node_id.0);
2653        self.pending_ops.push(PendingOp::NodeCreate {
2654            label_id,
2655            slot,
2656            props: props.clone(),
2657        });
2658        self.wal_mutations.push(WalMutation::NodeCreate {
2659            node_id,
2660            label_id,
2661            props,
2662            prop_names,
2663        });
2664        Ok(node_id)
2665    }
2666
2667    /// Stage a property update.
2668    ///
2669    /// The new value is not visible to readers until [`commit`] is called.
2670    ///
2671    /// On the first update to a `(node_id, col_id)` key during this
2672    /// transaction, the current on-disk value is read and stored as the
2673    /// before-image so that readers with older snapshots continue to see the
2674    /// correct value after commit overwrites the column file.
2675    pub fn set_node_col(&mut self, node_id: NodeId, col_id: u32, value: Value) {
2676        self.set_node_col_named(node_id, col_id, format!("col_{col_id}"), value);
2677    }
2678
2679    /// Stage a property update with an explicit human-readable key name for WAL.
2680    fn set_node_col_named(&mut self, node_id: NodeId, col_id: u32, key_name: String, value: Value) {
2681        let key = (node_id.0, col_id);
2682        self.dirty_nodes.insert(node_id.0);
2683
2684        if self.write_buf.updates.contains_key(&key) {
2685            // Already staged this key — just update the new_value.
2686            let entry = self.write_buf.updates.get_mut(&key).unwrap();
2687            entry.new_value = value;
2688            // Keep the key_name from the first staging (it's the same column).
2689            return;
2690        }
2691
2692        // First update to this key in this transaction.  Capture the
2693        // before-image so readers pinned before our commit retain access.
2694        let prev_txn_id = self.inner.current_txn_id.load(Ordering::Acquire);
2695
2696        // Check whether the version chain already has an entry for this key.
2697        // If so, the chain is already correct and we don't need to add the
2698        // before-image separately.
2699        let already_in_chain = {
2700            let vs = self.inner.versions.read().expect("version lock");
2701            vs.map.contains_key(&key)
2702        };
2703
2704        let before_image = if already_in_chain {
2705            None
2706        } else {
2707            // Read the current on-disk value as the before-image.
2708            let disk_val = self
2709                .store
2710                .get_node_raw(node_id, &[col_id])
2711                .ok()
2712                .and_then(|mut v| v.pop())
2713                .map(|(_, raw)| self.store.decode_raw_value(raw));
2714            disk_val.map(|v| (prev_txn_id, v))
2715        };
2716
2717        self.write_buf.updates.insert(
2718            key,
2719            StagedUpdate {
2720                before_image,
2721                new_value: value,
2722                key_name,
2723            },
2724        );
2725    }
2726
    /// Create a label in the schema catalog, returning its numeric id.
    ///
    /// # Note on atomicity
    ///
    /// Schema changes (label creation) are written to the catalog file
    /// immediately and are not rolled back if the transaction is later
    /// dropped without committing.  Label creation is idempotent at the
    /// catalog level: a duplicate name returns `Error::AlreadyExists`.
    /// Full schema-change atomicity is deferred to a future phase.
    pub fn create_label(&mut self, name: &str) -> Result<u16> {
        self.catalog.create_label(name)
    }
2739
2740    /// Look up `name` in the catalog, creating it if it does not yet exist.
2741    ///
2742    /// Returns the `label_id` as a `u32` (upper 32 bits of a packed NodeId).
2743    /// Unlike [`create_label`], this method is idempotent: calling it multiple
2744    /// times with the same name always returns the same id.
2745    ///
2746    /// Primarily used by the bulk-import path (SPA-148) where labels may be
2747    /// seen for the first time on any row.
2748    ///
2749    /// [`create_label`]: WriteTx::create_label
2750    pub fn get_or_create_label_id(&mut self, name: &str) -> Result<u32> {
2751        match self.catalog.get_label(name)? {
2752            Some(id) => Ok(id as u32),
2753            None => Ok(self.catalog.create_label(name)? as u32),
2754        }
2755    }
2756
2757    // ── Phase 7 mutation API (SPA-123 … SPA-126) ─────────────────────────────
2758
    /// SPA-123: Find or create a node matching `label` + `props`.
    ///
    /// Scans the node store for a slot whose columns match every key→value
    /// pair in `props` (using [`fnv1a_col_id`] to derive column IDs from
    /// key strings).  Returns the existing [`NodeId`] if found, or creates a
    /// new node and returns the new id.
    ///
    /// The label is resolved (or created) in the catalog.
    ///
    /// Match order matters: nodes staged earlier in this same transaction
    /// are checked before committed on-disk nodes, making repeated
    /// `merge_node` calls idempotent within one transaction.
    pub fn merge_node(&mut self, label: &str, props: HashMap<String, Value>) -> Result<NodeId> {
        // Resolve / create label.
        let label_id: u32 = match self.catalog.get_label(label)? {
            Some(id) => id as u32,
            None => self.catalog.create_label(label)? as u32,
        };

        // Build col list from props keys.  Each entry keeps the original key
        // string alongside its derived column id and value.
        let col_kv: Vec<(String, u32, Value)> = props
            .into_iter()
            .map(|(k, v)| {
                let col_id = fnv1a_col_id(&k);
                (k, col_id, v)
            })
            .collect();
        let col_ids: Vec<u32> = col_kv.iter().map(|&(_, col_id, _)| col_id).collect();

        // First, check buffered (not-yet-committed) node creates in pending_ops.
        // This ensures merge_node is idempotent within a single transaction.
        for op in &self.pending_ops {
            if let PendingOp::NodeCreate {
                label_id: op_label_id,
                slot: op_slot,
                props: op_props,
            } = op
            {
                if *op_label_id == label_id {
                    let candidate = NodeId((label_id as u64) << 32 | *op_slot as u64);
                    // Every requested key must be present with an equal value.
                    let matches = col_kv.iter().all(|(_, col_id, want_val)| {
                        op_props
                            .iter()
                            .find(|&&(c, _)| c == *col_id)
                            // Compare in-memory Value objects directly so long
                            // strings (> 7 bytes) are not truncated (SPA-212).
                            .map(|(_, v)| v == want_val)
                            .unwrap_or(false)
                    });
                    if matches {
                        return Ok(candidate);
                    }
                }
            }
        }

        // Scan on-disk slots for a match (only checks committed/on-disk nodes).
        // Use disk_hwm_for_label to avoid scanning slots that were only reserved
        // in-memory by peek_next_slot but not yet flushed to disk.
        let disk_hwm = self.store.disk_hwm_for_label(label_id)?;
        for slot in 0..disk_hwm {
            let candidate = NodeId((label_id as u64) << 32 | slot);
            if let Ok(stored) = self.store.get_node_raw(candidate, &col_ids) {
                let matches = col_kv.iter().all(|(_, col_id, want_val)| {
                    stored
                        .iter()
                        .find(|&&(c, _)| c == *col_id)
                        .map(|&(_, raw)| {
                            // Compare decoded values so overflow strings (> 7 bytes)
                            // match correctly (SPA-212).
                            self.store.decode_raw_value(raw) == *want_val
                        })
                        .unwrap_or(false)
                });
                if matches {
                    return Ok(candidate);
                }
            }
        }

        // Not found — create a new node (buffered, same as create_node).
        let disk_props: Vec<(u32, Value)> = col_kv
            .iter()
            .map(|(_, col_id, v)| (*col_id, v.clone()))
            .collect();
        // Preserve property names from the col_kv tuples (key, col_id, value).
        let disk_prop_names: Vec<String> = col_kv.iter().map(|(k, _, _)| k.clone()).collect();
        let slot = self.store.peek_next_slot(label_id)?;
        let node_id = NodeId((label_id as u64) << 32 | slot as u64);
        self.dirty_nodes.insert(node_id.0);
        self.pending_ops.push(PendingOp::NodeCreate {
            label_id,
            slot,
            props: disk_props.clone(),
        });
        self.wal_mutations.push(WalMutation::NodeCreate {
            node_id,
            label_id,
            props: disk_props,
            prop_names: disk_prop_names,
        });
        Ok(node_id)
    }
2858
2859    /// SPA-247: Upsert a node — find an existing node with `(label, match_key=match_value)`
2860    /// and update its properties, or create a new one if none exists.
2861    ///
2862    /// Returns `(NodeId, created)` where `created` is `true` when a new node
2863    /// was inserted and `false` when an existing node was found and updated.
2864    ///
2865    /// The lookup scans both pending (in-transaction) nodes and committed
2866    /// on-disk nodes for a slot whose label matches and whose `match_key`
2867    /// column equals `match_value`.  On a hit the remaining `properties` are
2868    /// applied via [`set_property`]; on a miss a new node is created via
2869    /// [`merge_node`] with `match_key=match_value` merged into `properties`.
2870    pub fn merge_node_by_property(
2871        &mut self,
2872        label: &str,
2873        match_key: &str,
2874        match_value: &Value,
2875        properties: HashMap<String, Value>,
2876    ) -> Result<(NodeId, bool)> {
2877        let label_id: u32 = match self.catalog.get_label(label)? {
2878            Some(id) => id as u32,
2879            None => {
2880                // Label doesn't exist yet — no node can match. Create it.
2881                let mut full_props = properties;
2882                full_props.insert(match_key.to_string(), match_value.clone());
2883                let node_id = self.merge_node(label, full_props)?;
2884                return Ok((node_id, true));
2885            }
2886        };
2887
2888        let match_col_id = fnv1a_col_id(match_key);
2889        let match_col_ids = vec![match_col_id];
2890
2891        // Step 1: Check pending (in-transaction) nodes.
2892        for op in &self.pending_ops {
2893            if let PendingOp::NodeCreate {
2894                label_id: op_label_id,
2895                slot: op_slot,
2896                props: op_props,
2897            } = op
2898            {
2899                if *op_label_id == label_id {
2900                    let matches = op_props
2901                        .iter()
2902                        .find(|&&(c, _)| c == match_col_id)
2903                        .map(|(_, v)| v == match_value)
2904                        .unwrap_or(false);
2905                    if matches {
2906                        let node_id = NodeId((label_id as u64) << 32 | *op_slot as u64);
2907                        // Update remaining properties on the existing node.
2908                        for (k, v) in &properties {
2909                            self.set_property(node_id, k, v.clone())?;
2910                        }
2911                        return Ok((node_id, false));
2912                    }
2913                }
2914            }
2915        }
2916
2917        // Step 2: Scan on-disk committed nodes.
2918        let disk_hwm = self.store.disk_hwm_for_label(label_id)?;
2919        for slot in 0..disk_hwm {
2920            let candidate = NodeId((label_id as u64) << 32 | slot);
2921            if let Ok(stored) = self.store.get_node_raw(candidate, &match_col_ids) {
2922                let matches = stored
2923                    .iter()
2924                    .find(|&&(c, _)| c == match_col_id)
2925                    .map(|&(_, raw)| self.store.decode_raw_value(raw) == *match_value)
2926                    .unwrap_or(false);
2927                if matches {
2928                    // Update remaining properties on the existing node.
2929                    for (k, v) in &properties {
2930                        self.set_property(candidate, k, v.clone())?;
2931                    }
2932                    return Ok((candidate, false));
2933                }
2934            }
2935        }
2936
2937        // Step 3: Not found — create new node with match_key included.
2938        let mut full_props = properties;
2939        full_props.insert(match_key.to_string(), match_value.clone());
2940        let node_id = self.merge_node(label, full_props)?;
2941        Ok((node_id, true))
2942    }
2943
2944    /// SPA-124: Update a named property on a node.
2945    ///
2946    /// Derives a stable column ID from `key` via [`fnv1a_col_id`] and stages
2947    /// the update through [`set_node_col`] (which records the before-image in
2948    /// the write buffer).  WAL emission happens once at commit time via the
2949    /// `updates` loop in `write_mutation_wal`.
2950    pub fn set_property(&mut self, node_id: NodeId, key: &str, val: Value) -> Result<()> {
2951        let col_id = fnv1a_col_id(key);
2952        self.dirty_nodes.insert(node_id.0);
2953
2954        // Stage the update through the write buffer (records before-image for
2955        // WAL and MVCC). WAL emission happens exactly once at commit time via
2956        // the `updates` loop in `write_mutation_wal`.  Pass the human-readable
2957        // key name so the WAL record carries it (not the synthesized col_{id}).
2958        self.set_node_col_named(node_id, col_id, key.to_string(), val);
2959
2960        Ok(())
2961    }
2962
2963    /// SPA-125: Delete a node, with edge-attachment check.
2964    ///
2965    /// Returns [`Error::NodeHasEdges`] if the node is referenced by any edge
2966    /// in the delta log.  On success, queues a `NodeDelete` WAL record and
2967    /// buffers the tombstone write; the on-disk tombstone is only applied when
2968    /// [`commit`] is called.
2969    ///
2970    /// [`commit`]: WriteTx::commit
2971    pub fn delete_node(&mut self, node_id: NodeId) -> Result<()> {
2972        // SPA-185: check ALL per-type delta logs for attached edges, not just
2973        // the hardcoded RelTableId(0).  Always include table-0 so that any
2974        // edges written before the catalog had entries are still detected.
2975        let rel_entries = self.catalog.list_rel_table_ids();
2976        let mut rel_ids_to_check: Vec<u32> =
2977            rel_entries.iter().map(|(id, _, _, _)| *id as u32).collect();
2978        // Always include the legacy table-0 slot.  If it is already in the
2979        // catalog list this dedup prevents a double-read.
2980        if !rel_ids_to_check.contains(&0u32) {
2981            rel_ids_to_check.push(0u32);
2982        }
2983        for rel_id in rel_ids_to_check {
2984            let delta = EdgeStore::open(&self.inner.path, RelTableId(rel_id))
2985                .and_then(|s| s.read_delta())
2986                .unwrap_or_default();
2987            if delta.iter().any(|r| r.src == node_id || r.dst == node_id) {
2988                return Err(Error::NodeHasEdges { node_id: node_id.0 });
2989            }
2990        }
2991
2992        // Also check buffered (not-yet-committed) edge creates in this
2993        // transaction — a node that has already been connected in the current
2994        // transaction cannot be deleted before commit.
2995        let has_buffered_edge = self.pending_ops.iter().any(|op| {
2996            matches!(op, PendingOp::EdgeCreate { src, dst, .. } if *src == node_id || *dst == node_id)
2997        });
2998        if has_buffered_edge {
2999            return Err(Error::NodeHasEdges { node_id: node_id.0 });
3000        }
3001
3002        // Buffer the tombstone — do NOT write to disk yet.
3003        self.dirty_nodes.insert(node_id.0);
3004        self.pending_ops.push(PendingOp::NodeDelete { node_id });
3005        self.wal_mutations.push(WalMutation::NodeDelete { node_id });
3006        Ok(())
3007    }
3008
3009    /// SPA-126: Create a directed edge `src → dst` with the given type.
3010    ///
3011    /// Buffers the edge creation; the delta-log append and WAL record are only
3012    /// written to disk when [`commit`] is called.  If the transaction is dropped
3013    /// without committing, no edge is persisted.
3014    ///
3015    /// Registers the relationship type name in the catalog so that queries
3016    /// like `MATCH (a)-[:REL]->(b)` can resolve the type (SPA-158).
3017    /// Returns the new [`EdgeId`].
3018    ///
3019    /// [`commit`]: WriteTx::commit
3020    pub fn create_edge(
3021        &mut self,
3022        src: NodeId,
3023        dst: NodeId,
3024        rel_type: &str,
3025        props: HashMap<String, Value>,
3026    ) -> Result<EdgeId> {
3027        // Derive label IDs from the packed node IDs (upper 32 bits).
3028        let src_label_id = (src.0 >> 32) as u16;
3029        let dst_label_id = (dst.0 >> 32) as u16;
3030
3031        // Register (or retrieve) the rel type in the catalog.
3032        // Catalog mutation is immediate and not transactional. This is acceptable
3033        // for now as rel type creation is idempotent. Full schema-change
3034        // atomicity is deferred to a future phase.
3035        let catalog_rel_id =
3036            self.catalog
3037                .get_or_create_rel_type_id(src_label_id, dst_label_id, rel_type)?;
3038        let rel_table_id = RelTableId(catalog_rel_id as u32);
3039
3040        // Compute the edge ID from the on-disk delta log size, offset by the
3041        // number of edges already buffered in this transaction for the same
3042        // rel_table_id.  Without this offset, multiple create_edge calls in the
3043        // same transaction would all derive the same on-disk base and collide.
3044        let base_edge_id = EdgeStore::peek_next_edge_id(&self.inner.path, rel_table_id)?;
3045        let buffered_count = self
3046            .pending_ops
3047            .iter()
3048            .filter(|op| {
3049                matches!(
3050                    op,
3051                    PendingOp::EdgeCreate {
3052                        rel_table_id: pending_rel_table_id,
3053                        ..
3054                    } if *pending_rel_table_id == rel_table_id
3055                )
3056            })
3057            .count() as u64;
3058        let edge_id = EdgeId(base_edge_id.0 + buffered_count);
3059
3060        // Convert HashMap<String, Value> props to (col_id, value_u64) pairs
3061        // using the canonical FNV-1a col_id derivation so read and write agree.
3062        // SPA-229: use NodeStore::encode_value (not val.to_u64()) so that
3063        // Value::Float is stored via f64::to_bits() in the heap rather than
3064        // panicking with "cannot be inline-encoded".
3065        let encoded_props: Vec<(u32, u64)> = props
3066            .iter()
3067            .map(|(name, val)| -> Result<(u32, u64)> {
3068                Ok((col_id_of(name), self.store.encode_value(val)?))
3069            })
3070            .collect::<Result<Vec<_>>>()?;
3071
3072        // Human-readable entries for WAL schema introspection.
3073        let prop_entries: Vec<(String, Value)> = props.into_iter().collect();
3074
3075        // Buffer the edge append — do NOT write to disk yet.
3076        self.pending_ops.push(PendingOp::EdgeCreate {
3077            src,
3078            dst,
3079            rel_table_id,
3080            props: encoded_props,
3081        });
3082        self.wal_mutations.push(WalMutation::EdgeCreate {
3083            edge_id,
3084            src,
3085            dst,
3086            rel_type: rel_type.to_string(),
3087            prop_entries,
3088        });
3089        Ok(edge_id)
3090    }
3091
3092    /// Delete the directed edge `src → dst` with the given relationship type.
3093    ///
3094    /// Resolves the relationship type to a `RelTableId` via the catalog, then
3095    /// buffers an `EdgeDelete` operation.  At commit time the edge is excised
3096    /// from the on-disk delta log.
3097    ///
3098    /// Returns [`Error::InvalidArgument`] if the relationship type is not
3099    /// registered in the catalog, or if no matching edge record exists in the
3100    /// delta log for the resolved table.
3101    ///
3102    /// Unblocks `SparrowOntology::init(force=true)` which needs to remove all
3103    /// existing edges before re-seeding the ontology graph.
3104    ///
3105    /// [`commit`]: WriteTx::commit
3106    pub fn delete_edge(&mut self, src: NodeId, dst: NodeId, rel_type: &str) -> Result<()> {
3107        let src_label_id = (src.0 >> 32) as u16;
3108        let dst_label_id = (dst.0 >> 32) as u16;
3109
3110        // Resolve the rel type in the catalog.  We do not create a new entry —
3111        // deleting a non-existent rel type is always an error.
3112        // The catalog's RelTableId is u64; EdgeStore's is RelTableId(u32).
3113        let catalog_rel_id = self
3114            .catalog
3115            .get_rel_table(src_label_id, dst_label_id, rel_type)?
3116            .ok_or_else(|| {
3117                Error::InvalidArgument(format!(
3118                    "relationship type '{}' not found in catalog for labels ({src_label_id}, {dst_label_id})",
3119                    rel_type
3120                ))
3121            })?;
3122        let rel_table_id = RelTableId(catalog_rel_id as u32);
3123
3124        // If the edge was created in this same transaction (not yet committed),
3125        // cancel the create rather than scheduling a delete.
3126        let buffered_pos = self.pending_ops.iter().position(|op| {
3127            matches!(
3128                op,
3129                PendingOp::EdgeCreate {
3130                    src: os,
3131                    dst: od,
3132                    rel_table_id: ort,
3133                    ..
3134                } if *os == src && *od == dst && *ort == rel_table_id
3135            )
3136        });
3137
3138        if let Some(pos) = buffered_pos {
3139            // Remove the buffered create and its corresponding WAL mutation.
3140            self.pending_ops.remove(pos);
3141            // The WalMutation vec is parallel to pending_ops only for structural
3142            // ops; find and remove the matching EdgeCreate entry.
3143            if let Some(wpos) = self.wal_mutations.iter().position(|m| {
3144                matches!(m, WalMutation::EdgeCreate { src: ms, dst: md, .. } if *ms == src && *md == dst)
3145            }) {
3146                self.wal_mutations.remove(wpos);
3147            }
3148            return Ok(());
3149        }
3150
3151        // Edge is on disk — schedule the deletion for commit time.
3152        self.pending_ops.push(PendingOp::EdgeDelete {
3153            src,
3154            dst,
3155            rel_table_id,
3156        });
3157        self.wal_mutations.push(WalMutation::EdgeDelete {
3158            src,
3159            dst,
3160            rel_type: rel_type.to_string(),
3161        });
3162        Ok(())
3163    }
3164
3165    // ── MVCC conflict detection (SPA-128) ────────────────────────────────────
3166
3167    /// Check for write-write conflicts before committing.
3168    ///
3169    /// A conflict is detected when another `WriteTx` has committed a change
3170    /// to a node that this transaction also dirtied, at a `txn_id` greater
3171    /// than our snapshot.
3172    fn detect_conflicts(&self) -> Result<()> {
3173        let nv = self.inner.node_versions.read().expect("node_versions lock");
3174        for &raw in &self.dirty_nodes {
3175            let last_write = nv.get(NodeId(raw));
3176            if last_write > self.snapshot_txn_id {
3177                return Err(Error::WriteWriteConflict { node_id: raw });
3178            }
3179        }
3180        Ok(())
3181    }
3182
3183    // ── Full-text index maintenance ──────────────────────────────────────────
3184
3185    /// Add a node document to a named full-text index.
3186    ///
3187    /// Call after creating or updating a node to keep the index current.
3188    /// The `text` should be the concatenated string value(s) of the indexed
3189    /// properties.  Changes are flushed to disk immediately (no WAL for v1).
3190    ///
3191    /// # Example
3192    /// ```no_run
3193    /// # use sparrowdb::GraphDb;
3194    /// # use sparrowdb_common::NodeId;
3195    /// # let db = GraphDb::open(std::path::Path::new("/tmp/test")).unwrap();
3196    /// # let mut tx = db.begin_write().unwrap();
3197    /// # let node_id = NodeId(0);
3198    /// tx.add_to_fulltext_index("searchIndex", node_id, "some searchable text")?;
3199    /// # Ok::<(), sparrowdb_common::Error>(())
3200    /// ```
3201    pub fn add_to_fulltext_index(
3202        &mut self,
3203        index_name: &str,
3204        node_id: NodeId,
3205        text: &str,
3206    ) -> Result<()> {
3207        use sparrowdb_storage::fulltext_index::FulltextIndex;
3208        // Lazily open the index and cache it for the lifetime of this
3209        // transaction.  All additions are batched and flushed once on commit,
3210        // avoiding an open+flush round-trip per document.
3211        let idx = match self.fulltext_pending.get_mut(index_name) {
3212            Some(existing) => existing,
3213            None => {
3214                let opened = FulltextIndex::open(&self.inner.path, index_name)?;
3215                self.fulltext_pending.insert(index_name.to_owned(), opened);
3216                self.fulltext_pending.get_mut(index_name).unwrap()
3217            }
3218        };
3219        idx.add_document(node_id.0, text);
3220        Ok(())
3221    }
3222
3223    // ── Commit ───────────────────────────────────────────────────────────────
3224
    /// Commit the transaction.
    ///
    /// Consumes `self` by value so a transaction cannot be used after commit.
    ///
    /// WAL-first protocol (SPA-184):
    ///
    /// 1. Detects write-write conflicts (SPA-128).
    /// 2. Drains staged property updates (does NOT apply to disk yet).
    /// 3. Atomically increments the global `current_txn_id` to obtain the new
    ///    transaction ID that will label all WAL records.
    /// 4. **Writes WAL records and fsyncs** (Begin + all structural mutations +
    ///    all property updates + Commit) so that the intent is durable before
    ///    any data page is touched (SPA-184).
    /// 5. Applies all buffered structural operations to storage (SPA-181):
    ///    node creates, node deletes, edge creates.
    /// 6. Flushes all staged `set_node_col` updates to disk.
    /// 7. Records before-images in the version chain at the previous `txn_id`,
    ///    preserving snapshot access for currently-open readers.
    /// 8. Records the new values in the version chain at the new `txn_id`.
    /// 9. Updates per-node version table for future conflict detection.
    ///
    /// # Errors
    ///
    /// Returns [`Error::WriteWriteConflict`] if another transaction committed
    /// a change to a node this transaction also dirtied (step 1); any storage
    /// or WAL I/O error aborts the commit.
    #[must_use = "check the Result; a failed commit means nothing was written"]
    pub fn commit(mut self) -> Result<TxnId> {
        // Step 1: MVCC conflict abort (SPA-128).
        self.detect_conflicts()?;

        // Step 2: Drain staged property updates — collect but do NOT write to
        // disk yet.  We need them in hand to emit WAL records before touching
        // any data page.
        let updates: Vec<((u64, u32), StagedUpdate)> = self.write_buf.updates.drain().collect();

        // Step 3: Increment txn_id with Release ordering.  We need the ID now
        // so that WAL records (emitted next) carry the correct txn_id.
        // (`fetch_add` returns the PREVIOUS value — hence the `+ 1`.)
        let new_id = self.inner.current_txn_id.fetch_add(1, Ordering::Release) + 1;

        // Step 4: WAL-first — write all mutation records and fsync before
        // touching any data page (SPA-184).  A crash between here and the end
        // of Step 6 is recoverable: WAL replay will re-apply the ops.
        write_mutation_wal(
            &self.inner.wal_writer,
            new_id,
            &updates,
            &self.wal_mutations,
        )?;

        // Step 5: Apply buffered structural operations to disk (SPA-181).
        // WAL is already durable at this point; a crash here is safe.
        //
        // SPA-212 (write-amplification fix): NodeCreate ops are collected into a
        // single batch and written via `batch_write_node_creates`, which opens
        // each (label_id, col_id) file only once regardless of how many nodes
        // share that column.  This reduces file-open syscalls from O(nodes×cols)
        // to O(labels×cols) per transaction commit.
        let mut col_writes: Vec<(u32, u32, u32, u64, bool)> = Vec::new();
        // All (label_id, slot) pairs for created nodes — needed for HWM
        // advancement, even when a node has zero properties.
        let mut node_slots: Vec<(u32, u32)> = Vec::new();

        for op in self.pending_ops.drain(..) {
            match op {
                PendingOp::NodeCreate {
                    label_id,
                    slot,
                    props,
                } => {
                    // Track this node's (label_id, slot) for HWM advancement.
                    node_slots.push((label_id, slot));
                    // Encode each property value and push (label_id, col_id,
                    // slot, raw_u64, is_present) into the batch buffer.
                    for (col_id, ref val) in props {
                        let raw = self.store.encode_value(val)?;
                        col_writes.push((label_id, col_id, slot, raw, true));
                    }
                }
                PendingOp::NodeDelete { node_id } => {
                    self.store.tombstone_node(node_id)?;
                }
                PendingOp::EdgeCreate {
                    src,
                    dst,
                    rel_table_id,
                    props,
                } => {
                    let mut es = EdgeStore::open(&self.inner.path, rel_table_id)?;
                    es.create_edge(src, rel_table_id, dst)?;
                    // Persist edge properties keyed by (src_slot, dst_slot) so
                    // that reads work correctly after CHECKPOINT (SPA-240).
                    if !props.is_empty() {
                        // Slots are the lower 32 bits of the packed NodeIds.
                        let src_slot = src.0 & 0xFFFF_FFFF;
                        let dst_slot = dst.0 & 0xFFFF_FFFF;
                        for (col_id, value) in &props {
                            es.set_edge_prop(src_slot, dst_slot, *col_id, *value)?;
                        }
                        // SPA-261: invalidate cached edge props for this rel table.
                        self.inner
                            .edge_props_cache
                            .write()
                            .expect("edge_props_cache poisoned")
                            .remove(&rel_table_id.0);
                    }
                }
                PendingOp::EdgeDelete {
                    src,
                    dst,
                    rel_table_id,
                } => {
                    let mut es = EdgeStore::open(&self.inner.path, rel_table_id)?;
                    es.delete_edge(src, dst)?;
                }
            }
        }

        // Flush all NodeCreate column writes in one batched call.
        // O(labels × cols) file opens instead of O(nodes × cols).
        self.store
            .batch_write_node_creates(col_writes, &node_slots)?;

        // Step 5b: Persist HWMs for all labels that received new nodes in Step 5.
        //
        // batch_write_node_creates() advances the in-memory HWM and marks
        // labels dirty (hwm_dirty).  We flush all dirty HWMs here — once per
        // commit — avoiding an fsync storm during bulk imports (SPA-217
        // regression fix).  Crash safety is preserved: the WAL record written
        // in Step 4 is already durable; on crash-recovery, the WAL replayer
        // re-applies all NodeCreate ops and re-advances the HWM.
        self.store.flush_hwms()?;

        // Step 6: Flush property updates to disk.
        // Use `upsert_node_col` so that columns added by `set_property` (which
        // may not have been initialised during `create_node`) are created and
        // zero-padded automatically.
        for ((node_raw, col_id), ref staged) in &updates {
            self.store
                .upsert_node_col(NodeId(*node_raw), *col_id, &staged.new_value)?;
        }

        // Step 7+8: Publish versions.
        {
            let mut vs = self.inner.versions.write().expect("version lock poisoned");
            for ((node_raw, col_id), ref staged) in &updates {
                // Publish before-image at the previous txn_id so that readers
                // pinned at that snapshot continue to see the correct value.
                if let Some((prev_txn_id, ref before_val)) = staged.before_image {
                    vs.insert(NodeId(*node_raw), *col_id, prev_txn_id, before_val.clone());
                }
                // Publish new value at the current txn_id.
                vs.insert(NodeId(*node_raw), *col_id, new_id, staged.new_value.clone());
            }
        }

        // Step 9: Advance per-node version table.
        {
            let mut nv = self
                .inner
                .node_versions
                .write()
                .expect("node_versions lock");
            for &raw in &self.dirty_nodes {
                nv.set(NodeId(raw), new_id);
            }
        }

        // Step 10: Flush any pending fulltext index updates.
        // The primary DB mutations above are already durable (WAL written,
        // txn_id advanced).  A flush failure here must NOT return Err and
        // cause the caller to retry an already-committed transaction.  Log the
        // error so operators can investigate but treat the commit as successful.
        for (name, mut idx) in self.fulltext_pending.drain() {
            if let Err(e) = idx.flush() {
                tracing::error!(index = %name, error = %e, "fulltext index flush failed post-commit; index may be stale until next write");
            }
        }

        // Step 11: Refresh the shared catalog cache so subsequent reads see
        // any labels / rel types created in this transaction (SPA-188).
        // Also rebuild the label-row-count cache (SPA-190) from the freshly
        // opened catalog to avoid a second I/O trip.
        if let Ok(fresh) = Catalog::open(&self.inner.path) {
            let new_counts = build_label_row_counts_from_disk(&fresh, &self.inner.path);
            *self.inner.catalog.write().expect("catalog RwLock poisoned") = fresh;
            *self
                .inner
                .label_row_counts
                .write()
                .expect("label_row_counts RwLock poisoned") = new_counts;
        }

        // Step 12: Invalidate the shared property-index cache (SPA-259).
        //
        // GraphDb-level execute functions (execute_create_standalone, etc.)
        // call invalidate_prop_index() again after this — the extra clear
        // is harmless (just bumps the generation counter).  The critical
        // case is when a caller uses WriteTx directly without going through
        // a GraphDb::execute path, which previously left the cache stale.
        self.inner
            .prop_index
            .write()
            .expect("prop_index RwLock poisoned")
            .clear();

        // Mark committed so Drop knows nothing needs discarding.
        self.committed = true;
        Ok(TxnId(new_id))
    }
3425}
3426
impl Drop for WriteTx {
    fn drop(&mut self) {
        // Rollback is a no-op: every mutation in this transaction lives only
        // in memory (pending_ops, wal_mutations, write_buf) until commit, so
        // dropping an uncommitted transaction simply discards them — nothing
        // was written to disk.  The SWMR write lock is released when the
        // `_guard` field (WriteGuard) is dropped during normal field teardown.
        let _ = self.committed;
    }
}
3434
3435// ── WAL mutation emission (SPA-127) ───────────────────────────────────────────
3436
3437/// Append mutation WAL records for a committed transaction.
3438///
3439/// Uses the persistent `wal_writer` cached in [`DbInner`] — no segment scan
3440/// or file-open overhead on each call (SPA-210).  Emits:
3441/// - `Begin`
3442/// - `NodeUpdate` for each staged property change in `updates`
/// - `NodeCreate` / `NodeDelete` / `EdgeCreate` / `EdgeDelete` for each structural mutation
3444/// - `Commit`
3445/// - fsync
// Append one logical transaction to the WAL: a Begin record, one NodeUpdate
// per staged property change, one record per structural mutation, and a
// Commit record, followed by an fsync so the transaction is durable before
// the caller proceeds.  Record order matters for recovery — Begin must come
// first and Commit last, so this function must not be reordered.
//
// `updates` carries `((raw NodeId, col_id), StagedUpdate)` pairs from the
// write buffer; `mutations` carries node/edge create/delete operations.
fn write_mutation_wal(
    wal_writer: &Mutex<WalWriter>,
    txn_id: u64,
    updates: &[((u64, u32), StagedUpdate)],
    mutations: &[WalMutation],
) -> Result<()> {
    // Nothing to record if there were no mutations or property updates.
    if updates.is_empty() && mutations.is_empty() {
        return Ok(());
    }

    // Recover from a poisoned mutex instead of propagating the panic — the
    // writer handle itself holds no invariant that poisoning invalidates here.
    let mut wal = wal_writer.lock().unwrap_or_else(|e| e.into_inner());
    let txn = TxnId(txn_id);

    wal.append(WalRecordKind::Begin, txn, WalPayload::Empty)?;

    // NodeUpdate records for each property change in the write buffer.
    for ((node_raw, col_id), staged) in updates {
        // A missing before-image (property created in this txn) is encoded
        // as eight zero bytes so the payload layout stays fixed-width.
        let before_bytes = staged
            .before_image
            .as_ref()
            .map(|(_, v)| value_to_wal_bytes(v))
            .unwrap_or_else(|| 0u64.to_le_bytes().to_vec());
        let after_bytes = value_to_wal_bytes(&staged.new_value);
        wal.append(
            WalRecordKind::NodeUpdate,
            txn,
            WalPayload::NodeUpdate {
                node_id: *node_raw,
                key: staged.key_name.clone(),
                col_id: *col_id,
                before: before_bytes,
                after: after_bytes,
            },
        )?;
    }

    // Structural mutations.
    for m in mutations {
        match m {
            WalMutation::NodeCreate {
                node_id,
                label_id,
                props,
                prop_names,
            } => {
                // Use human-readable names when available; fall back to
                // "col_{hash}" when only the col_id is known (low-level API).
                let wal_props: Vec<(String, Vec<u8>)> = props
                    .iter()
                    .enumerate()
                    .map(|(i, (col_id, v))| {
                        let name = prop_names
                            .get(i)
                            .filter(|n| !n.is_empty())
                            .cloned()
                            .unwrap_or_else(|| format!("col_{col_id}"));
                        (name, value_to_wal_bytes(v))
                    })
                    .collect();
                wal.append(
                    WalRecordKind::NodeCreate,
                    txn,
                    WalPayload::NodeCreate {
                        node_id: node_id.0,
                        label_id: *label_id,
                        props: wal_props,
                    },
                )?;
            }
            WalMutation::NodeDelete { node_id } => {
                wal.append(
                    WalRecordKind::NodeDelete,
                    txn,
                    WalPayload::NodeDelete { node_id: node_id.0 },
                )?;
            }
            WalMutation::EdgeCreate {
                edge_id,
                src,
                dst,
                rel_type,
                prop_entries,
            } => {
                // Edge properties are already keyed by name; just encode values.
                let wal_props: Vec<(String, Vec<u8>)> = prop_entries
                    .iter()
                    .map(|(name, val)| (name.clone(), value_to_wal_bytes(val)))
                    .collect();
                wal.append(
                    WalRecordKind::EdgeCreate,
                    txn,
                    WalPayload::EdgeCreate {
                        edge_id: edge_id.0,
                        src: src.0,
                        dst: dst.0,
                        rel_type: rel_type.clone(),
                        props: wal_props,
                    },
                )?;
            }
            WalMutation::EdgeDelete { src, dst, rel_type } => {
                wal.append(
                    WalRecordKind::EdgeDelete,
                    txn,
                    WalPayload::EdgeDelete {
                        src: src.0,
                        dst: dst.0,
                        rel_type: rel_type.clone(),
                    },
                )?;
            }
        }
    }

    // Commit seals the transaction; fsync makes the whole batch durable.
    wal.append(WalRecordKind::Commit, txn, WalPayload::Empty)?;
    wal.fsync()?;
    Ok(())
}
3564
3565// ── FNV-1a col_id derivation ─────────────────────────────────────────────────
3566
/// Derive a stable `u32` column ID from a property key name.
///
/// Delegates to [`sparrowdb_common::col_id_of`] — the single canonical
/// FNV-1a implementation shared by storage and execution (SPA-160).
/// The same key always yields the same ID, so derived IDs may be persisted.
pub fn fnv1a_col_id(key: &str) -> u32 {
    col_id_of(key)
}
3574
3575// ── Cypher string utilities ────────────────────────────────────────────────────
3576
/// Escape a Rust `&str` so it can be safely interpolated inside a single-quoted
/// Cypher string literal.
///
/// Two characters are special inside Cypher single-quoted strings:
/// * `\` becomes `\\` (an input backslash must never be read as starting an
///   escape sequence)
/// * `'` becomes `\'` (the literal must not be terminated early)
///
/// # Example
///
/// ```
/// use sparrowdb::cypher_escape_string;
/// let safe = cypher_escape_string("O'Reilly");
/// let cypher = format!("MATCH (n {{name: '{safe}'}}) RETURN n");
/// assert_eq!(cypher, "MATCH (n {name: 'O\\'Reilly'}) RETURN n");
/// ```
///
/// **Prefer parameterized queries** (`execute_with_params`) over string
/// interpolation whenever possible — this function is provided for the cases
/// where dynamic query construction cannot be avoided (SPA-218).
pub fn cypher_escape_string(s: &str) -> String {
    // Single pass over the input: each special character is expanded as it is
    // met, so a newly-emitted backslash is never re-examined.  This gives the
    // same guarantee the previous two-step replace() version got from ordering
    // the backslash pass before the apostrophe pass.
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\'' => escaped.push_str("\\'"),
            other => escaped.push(other),
        }
    }
    escaped
}
3602
3603// ── Mutation value helpers ─────────────────────────────────────────────────────
3604
3605/// Convert a Cypher [`Literal`] to a storage [`Value`].
3606fn literal_to_value(lit: &sparrowdb_cypher::ast::Literal) -> Value {
3607    use sparrowdb_cypher::ast::Literal;
3608    match lit {
3609        Literal::Int(n) => Value::Int64(*n),
3610        // Float stored as Value::Float — NodeStore::encode_value writes the full
3611        // 8 IEEE-754 bytes to the overflow heap (SPA-267).
3612        Literal::Float(f) => Value::Float(*f),
3613        Literal::Bool(b) => Value::Int64(if *b { 1 } else { 0 }),
3614        Literal::String(s) => Value::Bytes(s.as_bytes().to_vec()),
3615        Literal::Null | Literal::Param(_) => Value::Int64(0),
3616    }
3617}
3618
3619/// Convert a Cypher [`Expr`] to a storage [`Value`].
3620///
3621/// Returns `true` when the `DELETE` clause variable in a `MatchMutateStatement`
3622/// refers to a relationship pattern variable rather than a node variable.
3623///
3624/// Used to route `MATCH (a)-[r:REL]->(b) DELETE r` to the edge-delete path
3625/// instead of the node-delete path.
3626fn is_edge_delete_mutation(mm: &sparrowdb_cypher::ast::MatchMutateStatement) -> bool {
3627    let sparrowdb_cypher::ast::Mutation::Delete { var } = &mm.mutation else {
3628        return false;
3629    };
3630    mm.match_patterns
3631        .iter()
3632        .any(|p| p.rels.iter().any(|r| !r.var.is_empty() && &r.var == var))
3633}
3634
3635/// Only literal expressions are supported for SET values at this stage.
3636fn expr_to_value(expr: &sparrowdb_cypher::ast::Expr) -> Value {
3637    use sparrowdb_cypher::ast::Expr;
3638    match expr {
3639        Expr::Literal(lit) => literal_to_value(lit),
3640        _ => Value::Int64(0),
3641    }
3642}
3643
3644fn literal_to_value_with_params(
3645    lit: &sparrowdb_cypher::ast::Literal,
3646    params: &HashMap<String, sparrowdb_execution::Value>,
3647) -> Result<Value> {
3648    use sparrowdb_cypher::ast::Literal;
3649    match lit {
3650        Literal::Int(n) => Ok(Value::Int64(*n)),
3651        Literal::Float(f) => Ok(Value::Float(*f)),
3652        Literal::Bool(b) => Ok(Value::Int64(if *b { 1 } else { 0 })),
3653        Literal::String(s) => Ok(Value::Bytes(s.as_bytes().to_vec())),
3654        Literal::Null => Ok(Value::Int64(0)),
3655        Literal::Param(p) => match params.get(p.as_str()) {
3656            Some(v) => Ok(exec_value_to_storage(v)),
3657            None => Err(Error::InvalidArgument(format!(
3658                "parameter ${p} was referenced in the query but not supplied"
3659            ))),
3660        },
3661    }
3662}
3663
3664fn expr_to_value_with_params(
3665    expr: &sparrowdb_cypher::ast::Expr,
3666    params: &HashMap<String, sparrowdb_execution::Value>,
3667) -> Result<Value> {
3668    use sparrowdb_cypher::ast::Expr;
3669    match expr {
3670        Expr::Literal(lit) => literal_to_value_with_params(lit, params),
3671        _ => Err(Error::InvalidArgument(
3672            "property value must be a literal or $parameter".into(),
3673        )),
3674    }
3675}
3676
3677fn exec_value_to_storage(v: &sparrowdb_execution::Value) -> Value {
3678    use sparrowdb_execution::Value as EV;
3679    match v {
3680        EV::Int64(n) => Value::Int64(*n),
3681        EV::Float64(f) => Value::Float(*f),
3682        EV::Bool(b) => Value::Int64(if *b { 1 } else { 0 }),
3683        EV::String(s) => Value::Bytes(s.as_bytes().to_vec()),
3684        _ => Value::Int64(0),
3685    }
3686}
3687
3688/// Encode a storage [`Value`] to a raw little-endian byte vector for WAL
3689/// logging.  Unlike [`Value::to_u64`], this never panics for `Float` values —
3690/// it emits the full 8 IEEE-754 bytes so the WAL payload is lossless.
3691///
3692/// The bytes are for observability only (schema introspection uses only the
3693/// property *name*, not the value); correctness of on-disk storage is
3694/// handled separately via [`NodeStore::encode_value`].
3695fn value_to_wal_bytes(v: &Value) -> Vec<u8> {
3696    match v {
3697        Value::Float(f) => f.to_bits().to_le_bytes().to_vec(),
3698        // For Int64 and Bytes the existing to_u64() encoding is fine.
3699        other => other.to_u64().to_le_bytes().to_vec(),
3700    }
3701}
3702
3703/// Convert a storage-layer `Value` (Int64 / Bytes / Float) to the execution-layer
3704/// `Value` (Int64 / String / Float64 / Null / …) used in `QueryResult` rows.
3705fn storage_value_to_exec(val: &Value) -> sparrowdb_execution::Value {
3706    match val {
3707        Value::Int64(n) => sparrowdb_execution::Value::Int64(*n),
3708        Value::Bytes(b) => {
3709            sparrowdb_execution::Value::String(String::from_utf8_lossy(b).into_owned())
3710        }
3711        Value::Float(f) => sparrowdb_execution::Value::Float64(*f),
3712    }
3713}
3714
3715/// Evaluate a RETURN expression against a simple name→ExecValue map built
3716/// from the merged node's properties.  Used exclusively by `execute_merge`.
3717///
3718/// Supports `PropAccess` (e.g. `n.name`) and `Literal`; everything else
3719/// falls back to `Null`.
3720fn eval_expr_merge(
3721    expr: &sparrowdb_cypher::ast::Expr,
3722    vals: &HashMap<String, sparrowdb_execution::Value>,
3723) -> sparrowdb_execution::Value {
3724    use sparrowdb_cypher::ast::{Expr, Literal};
3725    match expr {
3726        Expr::PropAccess { var, prop } => {
3727            let key = format!("{var}.{prop}");
3728            vals.get(&key)
3729                .cloned()
3730                .unwrap_or(sparrowdb_execution::Value::Null)
3731        }
3732        Expr::Literal(lit) => match lit {
3733            Literal::Int(n) => sparrowdb_execution::Value::Int64(*n),
3734            Literal::Float(f) => sparrowdb_execution::Value::Float64(*f),
3735            Literal::Bool(b) => sparrowdb_execution::Value::Bool(*b),
3736            Literal::String(s) => sparrowdb_execution::Value::String(s.clone()),
3737            Literal::Null | Literal::Param(_) => sparrowdb_execution::Value::Null,
3738        },
3739        Expr::Var(v) => vals
3740            .get(v.as_str())
3741            .cloned()
3742            .unwrap_or(sparrowdb_execution::Value::Null),
3743        _ => sparrowdb_execution::Value::Null,
3744    }
3745}
3746
3747// ── Private helpers ───────────────────────────────────────────────────────────
3748
/// Compute the inputs maintenance (checkpoint/optimize) needs: the set of
/// rel-table IDs whose edge stores must be processed, and the CSR node
/// capacity `n_nodes` (always ≥ 1).
fn collect_maintenance_params(
    catalog: &Catalog,
    node_store: &NodeStore,
    db_root: &Path,
) -> (Vec<u32>, u64) {
    // SPA-185: collect all registered rel table IDs from the catalog instead
    // of hardcoding [0].  This ensures every per-type edge store is checkpointed.
    // Always include table-0 so that any edges written before the catalog had
    // entries (legacy data or pre-SPA-185 databases) are also checkpointed.
    let rel_table_entries = catalog.list_rel_table_ids();
    let mut rel_table_ids: Vec<u32> = rel_table_entries
        .iter()
        .map(|(id, _, _, _)| *id as u32)
        .collect();
    // Always include the legacy table-0 slot.  Dedup if already present.
    if !rel_table_ids.contains(&0u32) {
        rel_table_ids.push(0u32);
    }

    // n_nodes must cover the highest node-store HWM AND the highest node ID
    // present in any delta record, so that the CSR bounds check passes even
    // when edges were inserted without going through the node-store API.
    let hwm_n_nodes: u64 = catalog
        .list_labels()
        .unwrap_or_default()
        .iter()
        .map(|(label_id, _name)| node_store.hwm_for_label(*label_id as u32).unwrap_or(0))
        .sum::<u64>();

    // Also scan delta records for the maximum *slot* index used.
    // NodeIds encode `(label_id << 32) | slot`; the CSR is indexed by slot,
    // so we must strip the label bits before computing n_nodes.  Using the
    // full NodeId here would inflate n_nodes into the billions (e.g. label_id=1
    // → slot 0 appears as 0x0000_0001_0000_0000) and the CSR would be built
    // with nonsensical degree arrays — the SPA-186 root cause.
    let delta_max: u64 = rel_table_ids
        .iter()
        .filter_map(|&rel_id| {
            // Stores that fail to open or have no readable delta are simply
            // skipped — maintenance is best-effort per table.
            EdgeStore::open(db_root, RelTableId(rel_id))
                .ok()
                .and_then(|s| s.read_delta().ok())
        })
        .flat_map(|records| {
            records.into_iter().flat_map(|r| {
                // Strip label bits — CSR needs slot indices only.
                let src_slot = r.src.0 & 0xFFFF_FFFF;
                let dst_slot = r.dst.0 & 0xFFFF_FFFF;
                [src_slot, dst_slot].into_iter()
            })
        })
        .max()
        .map(|max_slot| max_slot + 1)
        .unwrap_or(0);

    // Floor of 1 so an empty database still builds a (trivial) CSR.
    let n_nodes = hwm_n_nodes.max(delta_max).max(1);

    (rel_table_ids, n_nodes)
}
3807
3808/// Open all per-type CSR forward files that exist on disk, keyed by rel_table_id.
3809///
3810/// SPA-185: replaces the old `open_csr_forward` that only opened `RelTableId(0)`.
3811/// The catalog is used to discover all registered rel types.
3812/// Build a `LabelId → node count` map by reading each label's HWM from disk
3813/// (SPA-190).  Called at `GraphDb::open()` and after node-mutating writes.
3814fn build_label_row_counts_from_disk(catalog: &Catalog, db_root: &Path) -> HashMap<LabelId, usize> {
3815    let store = match NodeStore::open(db_root) {
3816        Ok(s) => s,
3817        Err(_) => return HashMap::new(),
3818    };
3819    catalog
3820        .list_labels()
3821        .unwrap_or_default()
3822        .into_iter()
3823        .filter_map(|(lid, _name)| {
3824            let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
3825            if hwm > 0 {
3826                Some((lid, hwm as usize))
3827            } else {
3828                None
3829            }
3830        })
3831        .collect()
3832}
3833
3834fn open_csr_map(path: &Path) -> HashMap<u32, CsrForward> {
3835    let catalog = match Catalog::open(path) {
3836        Ok(c) => c,
3837        Err(_) => return HashMap::new(),
3838    };
3839    let mut map = HashMap::new();
3840
3841    // Collect rel IDs from catalog.
3842    let mut rel_ids: Vec<u32> = catalog
3843        .list_rel_table_ids()
3844        .into_iter()
3845        .map(|(id, _, _, _)| id as u32)
3846        .collect();
3847
3848    // Always include the legacy table-0 slot so that checkpointed CSRs
3849    // written before the catalog had entries (pre-SPA-185 data) are loaded.
3850    if !rel_ids.contains(&0u32) {
3851        rel_ids.push(0u32);
3852    }
3853
3854    for rid in rel_ids {
3855        if let Ok(store) = EdgeStore::open(path, RelTableId(rid)) {
3856            if let Ok(csr) = store.open_fwd() {
3857                map.insert(rid, csr);
3858            }
3859        }
3860    }
3861    map
3862}
3863
3864/// Like [`open_csr_map`] but surfaces the catalog-open error so callers can
3865/// decide whether to replace an existing cache.  Used by
3866/// [`GraphDb::invalidate_csr_map`] to avoid clobbering a valid in-memory map
3867/// with an empty one when the catalog is transiently unreadable.
3868fn try_open_csr_map(path: &Path) -> Result<HashMap<u32, CsrForward>> {
3869    let catalog = Catalog::open(path)?;
3870    let mut map = HashMap::new();
3871
3872    let mut rel_ids: Vec<u32> = catalog
3873        .list_rel_table_ids()
3874        .into_iter()
3875        .map(|(id, _, _, _)| id as u32)
3876        .collect();
3877
3878    if !rel_ids.contains(&0u32) {
3879        rel_ids.push(0u32);
3880    }
3881
3882    for rid in rel_ids {
3883        if let Ok(store) = EdgeStore::open(path, RelTableId(rid)) {
3884            if let Ok(csr) = store.open_fwd() {
3885                map.insert(rid, csr);
3886            }
3887        }
3888    }
3889    Ok(map)
3890}
3891
3892// ── Storage-size helpers (SPA-171) ────────────────────────────────────────────
3893
/// Recursively sum the sizes (in bytes) of all files under `dir`.
///
/// Best-effort: an unreadable directory or entry contributes 0 instead of
/// failing the walk.  Symlinks are *not* followed — `DirEntry::file_type()`
/// reports the link itself, so a symlink cycle can no longer cause unbounded
/// recursion (the previous `Path::is_dir()` check followed links), and each
/// entry is stat'ed once instead of twice.
fn dir_size_bytes(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    let mut total: u64 = 0;
    for e in entries.flatten() {
        // file_type() comes from the directory entry and does not traverse
        // symlinks; entries we cannot classify are skipped.
        let Ok(ft) = e.file_type() else { continue };
        if ft.is_dir() {
            total += dir_size_bytes(&e.path());
        } else if let Ok(m) = e.metadata() {
            total += m.len();
        }
    }
    total
}
3909
3910// ── Reserved label/type protection (SPA-208) ──────────────────────────────────
3911
/// Returns `true` if `label` starts with the reserved `__SO_` prefix.
///
/// The `__SO_` namespace is reserved for internal SparrowDB system objects.
/// Any attempt to CREATE a node or relationship using a label/type in this
/// namespace is rejected with an [`Error::InvalidArgument`].
#[inline]
fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
3921
3922/// Return an [`Error::InvalidArgument`] for a reserved label/type.
3923fn reserved_label_error(label: &str) -> sparrowdb_common::Error {
3924    sparrowdb_common::Error::InvalidArgument(format!(
3925        "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
3926    ))
3927}
3928
3929// ── Tests ─────────────────────────────────────────────────────────────────────
3930
#[cfg(test)]
mod tests {
    use super::*;

    /// Phase 0 gate: create a DB directory, open it, and let it drop cleanly.
    #[test]
    fn open_and_close_empty_db() {
        let dir = tempfile::tempdir().expect("tempdir");
        let db_path = dir.path().join("test.sparrow");

        assert!(!db_path.exists());
        let db = GraphDb::open(&db_path).expect("open must succeed");
        assert!(db_path.is_dir());
        drop(db);
    }

    /// Opening a path that already exists as a directory must succeed too.
    #[test]
    fn open_existing_dir_succeeds() {
        let dir = tempfile::tempdir().expect("tempdir");
        let db_path = dir.path().join("existing.sparrow");
        std::fs::create_dir_all(&db_path).unwrap();
        let db = GraphDb::open(&db_path).expect("open existing must succeed");
        drop(db);
    }

    /// Compile-time pin of the free `open` function's public signature.
    #[test]
    fn open_fn_signature() {
        let _: fn(&Path) -> Result<GraphDb> = open;
    }

    /// Before any commit, a reader's pinned snapshot must be txn 0.
    #[test]
    fn begin_read_returns_snapshot_zero_before_any_write() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        let rx = db.begin_read().unwrap();
        assert_eq!(rx.snapshot_txn_id, 0);
    }

    /// Each commit must advance the committed txn_id by exactly one.
    #[test]
    fn begin_write_increments_txn_id_on_commit() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        let tx = db.begin_write().unwrap();
        let committed = tx.commit().unwrap();
        assert_eq!(committed.0, 1);

        let tx2 = db.begin_write().unwrap();
        let committed2 = tx2.commit().unwrap();
        assert_eq!(committed2.0, 2);
    }

    /// SWMR: a second concurrent writer must be rejected, not blocked.
    #[test]
    fn second_begin_write_returns_writer_busy() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        let _tx1 = db.begin_write().unwrap();
        let err = db.begin_write().err().expect("expected Err");
        assert!(
            matches!(err, Error::WriterBusy),
            "expected WriterBusy, got {err}"
        );
    }

    /// Committing must release the single-writer lock for the next writer.
    #[test]
    fn write_lock_released_after_commit() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        let tx = db.begin_write().unwrap();
        tx.commit().unwrap();

        // Should succeed because the lock was released on commit/drop.
        let tx2 = db.begin_write().unwrap();
        tx2.commit().unwrap();
    }

    /// Checkpoint must be a no-op (not an error) on a freshly created DB.
    #[test]
    fn graphdb_checkpoint_on_empty_db() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        // Checkpoint on empty DB must not panic or error.
        db.checkpoint()
            .expect("checkpoint must succeed on empty DB");
    }

    /// Optimize must likewise succeed on an empty database.
    #[test]
    fn graphdb_optimize_on_empty_db() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        db.optimize().expect("optimize must succeed on empty DB");
    }

    /// SPA-162: checkpoint() must not deadlock after a write transaction has
    /// been committed and dropped.
    #[test]
    fn checkpoint_does_not_deadlock_after_write() {
        use std::sync::Arc;

        let dir = tempfile::tempdir().unwrap();
        let db = Arc::new(GraphDb::open(dir.path()).unwrap());

        // Run a write transaction and commit it.
        let mut tx = db.begin_write().unwrap();
        tx.create_node(0, &[]).unwrap();
        tx.commit().unwrap();

        // checkpoint() must complete without hanging — run it on a thread so
        // the test runner can time out rather than block the whole suite.
        let db2 = Arc::clone(&db);
        let handle = std::thread::spawn(move || {
            db2.checkpoint().unwrap();
        });
        handle
            .join()
            .expect("checkpoint thread must complete without panic");
    }

    /// SPA-162: checkpoint() must return WriterBusy (not deadlock) when a
    /// WriteTx is currently active.  Before the fix, calling lock() from
    /// checkpoint() while the same thread held the mutex would hang forever.
    #[test]
    fn checkpoint_returns_writer_busy_while_write_tx_active() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        // Hold an active write transaction without committing it.
        let tx = db.begin_write().unwrap();

        // checkpoint() must return WriterBusy immediately — not deadlock.
        let result = db.checkpoint();
        assert!(
            matches!(result, Err(Error::WriterBusy)),
            "expected WriterBusy while WriteTx active, got: {result:?}"
        );

        // Drop the transaction; checkpoint must now succeed.
        drop(tx);
        db.checkpoint()
            .expect("checkpoint must succeed after WriteTx dropped");
    }

    /// SPA-181: Dropping a WriteTx without calling commit() must not persist
    /// any mutations.  The node-store HWM must remain at 0 after a dropped tx.
    #[test]
    fn dropped_write_tx_persists_no_nodes() {
        use sparrowdb_storage::node_store::NodeStore;

        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        {
            let mut tx = db.begin_write().unwrap();
            // Stage a node creation — should NOT be written to disk.
            let _node_id = tx.create_node(0, &[]).unwrap();
            // Drop WITHOUT calling commit().
        }

        // Verify no node was persisted.
        let store = NodeStore::open(dir.path()).unwrap();
        let hwm = store.hwm_for_label(0).unwrap();
        assert_eq!(hwm, 0, "dropped tx must not persist any nodes (SPA-181)");

        // A subsequent write transaction must be obtainable (lock released).
        let tx2 = db
            .begin_write()
            .expect("write lock must be released after drop");
        tx2.commit().unwrap();
    }

    /// SPA-181: A sequence of create_node + create_edge where the whole
    /// transaction is dropped leaves the store entirely unchanged.
    #[test]
    fn dropped_write_tx_persists_no_edges() {
        use sparrowdb_storage::edge_store::EdgeStore;

        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        // First, commit two nodes so we have valid src/dst IDs.
        let (src, dst) = {
            let mut tx = db.begin_write().unwrap();
            let src = tx.create_node(0, &[]).unwrap();
            let dst = tx.create_node(0, &[]).unwrap();
            tx.commit().unwrap();
            (src, dst)
        };

        {
            let mut tx = db.begin_write().unwrap();
            // Stage an edge — should NOT be written to disk.
            tx.create_edge(src, dst, "KNOWS", std::collections::HashMap::new())
                .unwrap();
            // Drop WITHOUT calling commit().
        }

        // Verify no edge was persisted (delta log must be empty or absent).
        let delta = EdgeStore::open(dir.path(), sparrowdb_storage::edge_store::RelTableId(0))
            .and_then(|s| s.read_delta())
            .unwrap_or_default();
        assert_eq!(
            delta.len(),
            0,
            "dropped tx must not persist any edges (SPA-181)"
        );
    }

    /// SPA-181: The old UB transmute is gone.  Verify the write lock cycles
    /// correctly: acquire → use → drop → acquire again.
    #[test]
    fn write_guard_releases_lock_on_drop() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();

        // Five acquire/contend/release cycles exercise repeated lock reuse.
        for _ in 0..5 {
            let tx = db.begin_write().expect("lock must be free");
            assert!(matches!(db.begin_write(), Err(Error::WriterBusy)));
            drop(tx);
        }
    }

    // ── SPA-171: GraphDb::stats() ─────────────────────────────────────────────

    /// stats() on an empty DB: zero edges/labels; wal_bytes unconstrained.
    #[test]
    fn stats_empty_db() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        let stats = db.stats().unwrap();
        assert_eq!(stats.edge_count, 0);
        assert_eq!(stats.node_count_per_label.len(), 0);
        // SPA-210: WalWriter is opened eagerly at GraphDb::open time, creating
        // the initial segment file (1-byte version header).  wal_bytes may be
        // a small non-zero value before any commits.
        // (Previously the WAL directory was only created on first commit.)
        let _ = stats.wal_bytes; // accept any value — presence is fine
    }

    /// stats() must reflect nodes/edges created through both the low-level
    /// API and Cypher, including per-label byte accounting.
    #[test]
    fn stats_after_writes() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        // Create two nodes and one edge via low-level API.
        let (src, dst) = {
            let mut tx = db.begin_write().unwrap();
            let a = tx.create_node(0, &[]).unwrap();
            let b = tx.create_node(0, &[]).unwrap();
            tx.commit().unwrap();
            (a, b)
        };
        {
            let mut tx = db.begin_write().unwrap();
            tx.create_edge(src, dst, "KNOWS", std::collections::HashMap::new())
                .unwrap();
            tx.commit().unwrap();
        }
        // Also create a labeled node via Cypher so catalog has a label entry.
        db.execute("CREATE (n:Person {name: 'Alice'})").unwrap();
        let stats = db.stats().unwrap();
        assert!(
            stats
                .node_count_per_label
                .get("Person")
                .copied()
                .unwrap_or(0)
                >= 1
        );
        assert_eq!(
            stats.edge_count, 1,
            "expected 1 edge, got {}",
            stats.edge_count
        );
        assert!(stats.wal_bytes > 0);
        assert!(stats.total_bytes >= stats.wal_bytes);
        assert!(stats.bytes_per_label.get("Person").copied().unwrap_or(0) > 0);
    }

    /// The Cypher surface `CALL db.stats()` must yield the same metrics as
    /// the programmatic stats() API, as (metric, value) rows.
    #[test]
    fn call_db_stats_cypher() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        db.execute("CREATE (n:Widget {name: 'w1'})").unwrap();
        let result = db.execute("CALL db.stats() YIELD metric, value").unwrap();
        assert_eq!(result.columns, vec!["metric", "value"]);
        assert!(!result.rows.is_empty());
        // Collect only (String, Int64) rows into a name → value map.
        let metrics: std::collections::HashMap<_, _> = result
            .rows
            .iter()
            .filter_map(|row| match (&row[0], &row[1]) {
                (sparrowdb_execution::Value::String(m), sparrowdb_execution::Value::Int64(v)) => {
                    Some((m.clone(), *v))
                }
                _ => None,
            })
            .collect();
        assert!(metrics.contains_key("total_bytes"));
        assert!(metrics.contains_key("wal_bytes"));
        assert!(metrics.contains_key("edge_count"));
        assert!(metrics.contains_key("nodes.Widget"));
        assert!(metrics.contains_key("label_bytes.Widget"));
        assert!(metrics["total_bytes"] > 0);
    }

    /// edge_count must be stable across a checkpoint (delta merged into CSR).
    #[test]
    fn stats_edge_count_after_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let db = GraphDb::open(dir.path()).unwrap();
        let (n1, n2) = {
            let mut tx = db.begin_write().unwrap();
            let a = tx.create_node(0, &[]).unwrap();
            let b = tx.create_node(0, &[]).unwrap();
            tx.commit().unwrap();
            (a, b)
        };
        {
            let mut tx = db.begin_write().unwrap();
            tx.create_edge(n1, n2, "LINK", std::collections::HashMap::new())
                .unwrap();
            tx.commit().unwrap();
        }
        let before = db.stats().unwrap();
        assert!(
            before.edge_count > 0,
            "edge_count must be > 0 before checkpoint"
        );
        db.checkpoint().unwrap();
        let after = db.stats().unwrap();
        assert_eq!(after.edge_count, before.edge_count);
    }
}