Skip to main content

lora_database/
database.rs

1use std::collections::BTreeMap;
2use std::fs::{File, OpenOptions};
3use std::io::{BufReader, BufWriter};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use anyhow::{anyhow, Result};
8use lora_analyzer::Analyzer;
9use lora_ast::Document;
10use lora_compiler::{CompiledQuery, Compiler};
11use lora_executor::{
12    ExecuteOptions, LoraValue, MutableExecutionContext, MutableExecutor, QueryResult,
13};
14use lora_parser::parse_query;
15use lora_store::{
16    GraphStorage, GraphStorageMut, InMemoryGraph, MutationEvent, MutationRecorder, SnapshotMeta,
17    Snapshotable,
18};
19use lora_wal::{replay_dir, Lsn, Wal, WalConfig, WalRecorder, WroteCommit};
20
/// Minimal abstraction any transport can depend on to run Lora queries.
///
/// The `Send + Sync + 'static` supertraits mean implementors can be shared
/// across threads, e.g. behind an `Arc<dyn QueryRunner>`.
pub trait QueryRunner: Send + Sync + 'static {
    /// Run `query` with the optional execution `options`, returning the query
    /// result or the error that stopped it.
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult>;
}
25
/// Owns the graph store and orchestrates parse → analyze → compile → execute.
///
/// Optionally drives a write-ahead log: when constructed via
/// [`Database::open_with_wal`] or [`Database::recover`] the database
/// holds a [`WalRecorder`] (shared through an `Arc`) that brackets every
/// query with `begin → mutations → commit/abort → flush` while the engine
/// mutex is held, so the WAL order is exactly the in-memory commit order.
/// When constructed via [`Database::in_memory`] / [`Database::from_graph`]
/// the WAL handle is `None` and the engine pays only the existing
/// `MutationRecorder::record` null-pointer check per mutation.
pub struct Database<S> {
    /// The graph store; every query and admin helper locks this mutex.
    store: Arc<Mutex<S>>,
    /// WAL recorder, present only for WAL-enabled constructors.
    wal: Option<Arc<WalRecorder>>,
}
40
41impl Database<InMemoryGraph> {
42    /// Convenience constructor: a fresh, empty in-memory graph database.
43    pub fn in_memory() -> Self {
44        Self::from_graph(InMemoryGraph::new())
45    }
46
47    /// Open or create a WAL-enabled in-memory database from a fresh
48    /// graph.
49    ///
50    /// `WalConfig::Disabled` falls back to [`Database::in_memory`].
51    /// Otherwise, opens the WAL directory, replays any committed
52    /// events into a fresh graph, installs a [`WalRecorder`] on the
53    /// graph, and returns a database ready to serve queries.
54    ///
55    /// To restore from a snapshot in addition to the WAL, use
56    /// [`Database::recover`] instead.
57    pub fn open_with_wal(wal_config: WalConfig) -> Result<Self> {
58        match wal_config {
59            WalConfig::Disabled => Ok(Self::in_memory()),
60            WalConfig::Enabled {
61                dir,
62                sync_mode,
63                segment_target_bytes,
64            } => {
65                let mut graph = InMemoryGraph::new();
66                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, Lsn::ZERO)?;
67                replay_into(&mut graph, events)?;
68                let recorder = Arc::new(WalRecorder::new(wal));
69                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
70                Ok(Self {
71                    store: Arc::new(Mutex::new(graph)),
72                    wal: Some(recorder),
73                })
74            }
75        }
76    }
77
78    /// Restore from a snapshot file then replay any WAL records past
79    /// it.
80    ///
81    /// The snapshot's `wal_lsn` (when set) becomes the replay fence —
82    /// events at or below that LSN are already represented in the
83    /// loaded snapshot and are skipped. A missing snapshot file is
84    /// treated as "fresh start" so operators can pass the same path
85    /// on every boot.
86    ///
87    /// If the WAL contains a checkpoint marker newer than the
88    /// snapshot's `wal_lsn`, a one-line warning is printed to stderr
89    /// — the snapshot is stale relative to a more recent checkpoint
90    /// the operator is presumably aware of. Recovery still proceeds
91    /// from the snapshot's fence (replay re-applies every record
92    /// above it, which is conservative-correct); a tighter contract
93    /// is deferred to v2 because verifying that the marker's
94    /// snapshot file actually exists and is loadable is a separate
95    /// observability concern.
96    pub fn recover(snapshot_path: impl AsRef<Path>, wal_config: WalConfig) -> Result<Self> {
97        let snapshot_path = snapshot_path.as_ref();
98        let mut graph = InMemoryGraph::new();
99        let snapshot_lsn = match File::open(snapshot_path) {
100            Ok(f) => {
101                let reader = BufReader::new(f);
102                let meta = graph.load_snapshot(reader)?;
103                meta.wal_lsn.map(Lsn::new).unwrap_or(Lsn::ZERO)
104            }
105            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Lsn::ZERO,
106            Err(e) => return Err(e.into()),
107        };
108
109        match wal_config {
110            WalConfig::Disabled => Ok(Self::from_graph(graph)),
111            WalConfig::Enabled {
112                dir,
113                sync_mode,
114                segment_target_bytes,
115            } => {
116                // Diagnostic peek at the WAL's newest checkpoint
117                // marker so we can warn the operator about a stale
118                // snapshot before we start replaying. Treat any error
119                // as "no marker" — the subsequent `Wal::open` will
120                // surface the real failure if there is one.
121                if dir.exists() {
122                    if let Ok(outcome) = replay_dir(&dir, Lsn::ZERO) {
123                        if let Some(marker) = outcome.checkpoint_lsn_observed {
124                            if marker > snapshot_lsn {
125                                eprintln!(
126                                    "lora-wal: snapshot at LSN {} is older than the newest \
127                                     checkpoint marker on disk (LSN {}). Replaying every WAL \
128                                     record above LSN {}; consider passing the more recent \
129                                     snapshot to --restore-from.",
130                                    snapshot_lsn.raw(),
131                                    marker.raw(),
132                                    snapshot_lsn.raw()
133                                );
134                            }
135                        }
136                    }
137                }
138
139                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, snapshot_lsn)?;
140                replay_into(&mut graph, events)?;
141                let recorder = Arc::new(WalRecorder::new(wal));
142                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
143                Ok(Self {
144                    store: Arc::new(Mutex::new(graph)),
145                    wal: Some(recorder),
146                })
147            }
148        }
149    }
150}
151
152impl<S> Database<S>
153where
154    S: GraphStorage + GraphStorageMut,
155{
156    /// Build a database from a pre-wrapped, shared store.
157    pub fn new(store: Arc<Mutex<S>>) -> Self {
158        Self { store, wal: None }
159    }
160
161    /// Build a database by taking ownership of a bare graph store.
162    pub fn from_graph(graph: S) -> Self {
163        Self::new(Arc::new(Mutex::new(graph)))
164    }
165
    /// Handle to the installed WAL recorder, if any. Exposed for
    /// admin paths (checkpoint, truncate, observability) that need
    /// to drive the WAL outside the standard query lifecycle.
    ///
    /// Returns `None` when the database was built without a WAL.
    pub fn wal(&self) -> Option<&Arc<WalRecorder>> {
        self.wal.as_ref()
    }
172
    /// Handle to the underlying shared store — useful for callers that need
    /// to snapshot or share the graph across multiple databases.
    ///
    /// The returned `Arc` is the same mutex the database itself locks for
    /// every query.
    pub fn store(&self) -> &Arc<Mutex<S>> {
        &self.store
    }
178
179    /// Parse a query string into an AST without executing it.
180    pub fn parse(&self, query: &str) -> Result<Document> {
181        Ok(parse_query(query)?)
182    }
183
184    fn lock_store(&self) -> MutexGuard<'_, S> {
185        self.store
186            .lock()
187            .unwrap_or_else(|poisoned| poisoned.into_inner())
188    }
189
190    fn compile_query(&self, query: &str) -> Result<(MutexGuard<'_, S>, CompiledQuery)> {
191        let document = self.parse(query)?;
192        let store = self.lock_store();
193
194        let resolved = {
195            let mut analyzer = Analyzer::new(&*store);
196            analyzer.analyze(&document)?
197        };
198
199        let compiled = Compiler::compile(&resolved);
200        Ok((store, compiled))
201    }
202
203    /// Execute a query and return its result.
204    pub fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
205        self.execute_with_params(query, options, BTreeMap::new())
206    }
207
208    /// Execute a query with bound parameters.
209    ///
210    /// When a WAL is attached the call is bracketed by a transaction:
211    ///
212    /// 1. `recorder.arm()` after analyze + compile (so a parse /
213    ///    semantic / compile error never opens a tx that has to be
214    ///    immediately aborted). Arming is *cheap*: no record is
215    ///    appended to the WAL yet, so a pure read query that
216    ///    completes here pays nothing for the WAL hot path.
217    /// 2. The executor runs; every primitive mutation fires
218    ///    `MutationRecorder::record`, which on its first call
219    ///    lazily issues `Wal::begin` and from then on forwards
220    ///    every event to `Wal::append`.
221    /// 3. On Ok, `recorder.commit()` writes a `TxCommit` only when a
222    ///    `TxBegin` was actually allocated; the surrounding
223    ///    `recorder.flush()` runs only in that case so a read-only
224    ///    query never pays an `fsync`.
225    /// 4. On Err, `recorder.abort()` marks the (lazily-issued) tx
226    ///    for replay-time discard; if no `TxBegin` was issued,
227    ///    abort is a no-op on the WAL. The engine has no rollback,
228    ///    so the in-memory state may already be partially mutated;
229    ///    the abort marker is what gives the *durable* layer
230    ///    per-query atomicity.
231    /// 5. The recorder's poisoned flag is polled once (it also
232    ///    surfaces background-flusher fsync failures from
233    ///    `SyncMode::Group`). If set, the query fails loudly with the
234    ///    durability error so the caller can act on it; the WAL
235    ///    refuses further appends until the operator restarts the
236    ///    database, which recovers from the last consistent
237    ///    snapshot + WAL.
238    pub fn execute_with_params(
239        &self,
240        query: &str,
241        options: Option<ExecuteOptions>,
242        params: BTreeMap<String, LoraValue>,
243    ) -> Result<QueryResult> {
244        let (mut store, compiled) = self.compile_query(query)?;
245
246        if let Some(rec) = &self.wal {
247            rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
248        }
249
250        let exec_result: Result<QueryResult> = (|| {
251            let mut executor = MutableExecutor::new(MutableExecutionContext {
252                storage: &mut *store,
253                params,
254            });
255            Ok(executor.execute_compiled(&compiled, options)?)
256        })();
257
258        if let Some(rec) = &self.wal {
259            match &exec_result {
260                Ok(_) => match rec.commit() {
261                    Ok(WroteCommit::Yes) => {
262                        rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
263                    }
264                    Ok(WroteCommit::No) => {
265                        // Read-only query: no records were written
266                        // and there is nothing to fsync. Skip flush
267                        // entirely so PerCommit pays zero fsyncs on
268                        // pure reads.
269                    }
270                    Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
271                },
272                Err(_) => {
273                    // Best-effort abort. If the WAL saw mutations, durable
274                    // recovery will discard them but the live in-memory store
275                    // may already be ahead of durable state. Quarantine this
276                    // handle so callers restart instead of serving from a
277                    // potentially divergent graph.
278                    if matches!(rec.abort(), Ok(true)) {
279                        rec.poison(
280                            "query mutated the live graph before failing; restart from snapshot + WAL required",
281                        );
282                    }
283                }
284            }
285            if let Some(reason) = rec.poisoned() {
286                return Err(anyhow!("WAL poisoned: {reason}"));
287            }
288        }
289
290        exec_result
291    }
292
293    // ---------- Storage-agnostic utility helpers ----------
294    //
295    // Bindings previously reached into `Arc<Mutex<InMemoryGraph>>` to answer
296    // stat / admin calls; these helpers let them depend on `Database<S>`
297    // instead, so swapping in a new backend only requires changing one type
298    // parameter.
299
300    /// Drop every node and relationship.
301    ///
302    /// When a WAL is attached, `clear()` is wrapped in `arm`/`commit`
303    /// so the `MutationEvent::Clear` fired by the store reaches the
304    /// log inside a transaction (without arming, the recorder would
305    /// poison itself on the first event). WAL failures here are
306    /// best-effort: the in-memory state is still cleared so the
307    /// caller's contract holds, but the recorder's poisoned flag
308    /// will surface to the next query.
309    pub fn clear(&self) {
310        let mut guard = self.lock_store();
311        match &self.wal {
312            None => guard.clear(),
313            Some(rec) => {
314                let armed = rec.arm();
315                guard.clear();
316                if armed.is_ok() {
317                    // `clear()` always emits a `MutationEvent::Clear`,
318                    // so commit returns `WroteCommit::Yes` and we
319                    // flush. If that order ever changes, the worst
320                    // case is one redundant flush call.
321                    let _ = rec.commit();
322                    let _ = rec.flush();
323                }
324            }
325        }
326    }
327
328    /// Number of nodes currently in the graph.
329    pub fn node_count(&self) -> usize {
330        let guard = self.lock_store();
331        guard.node_count()
332    }
333
334    /// Number of relationships currently in the graph.
335    pub fn relationship_count(&self) -> usize {
336        let guard = self.lock_store();
337        guard.relationship_count()
338    }
339
340    /// Run a closure with a shared borrow of the underlying store. Used by
341    /// bindings to answer ad-hoc queries without locking the mutex themselves.
342    pub fn with_store<R>(&self, f: impl FnOnce(&S) -> R) -> R {
343        let guard = self.lock_store();
344        f(&*guard)
345    }
346
347    /// Run a closure with an exclusive borrow of the underlying store. Reserved
348    /// for admin paths (restore, bulk load); regular mutation goes through
349    /// `execute_with_params`.
350    pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
351        let mut guard = self.lock_store();
352        f(&mut *guard)
353    }
354}
355
356// ---------------------------------------------------------------------------
357// Snapshot helpers
358//
359// A second impl block so the `Snapshotable` bound only constrains backends
360// that actually need it. `Database<InMemoryGraph>` picks these up
361// automatically; hypothetical backends that don't implement `Snapshotable`
362// still get the core query API above.
363// ---------------------------------------------------------------------------
364
365impl<S> Database<S>
366where
367    S: GraphStorage + GraphStorageMut + Snapshotable,
368{
369    /// Serialize the current graph state to the given path. Writes are
370    /// atomic: the payload goes to `<path>.tmp`, is `fsync`'d, and then
371    /// renamed over the target; a torn write can never leave a half-written
372    /// file at `path`. If any step before the rename fails, the stale
373    /// `<path>.tmp` is removed so a crashed save never leaks scratch files.
374    ///
375    /// Holds the store mutex for the duration of the save so concurrent
376    /// queries see a consistent point-in-time snapshot.
377    pub fn save_snapshot_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
378        let path = path.as_ref();
379        let tmp = snapshot_tmp_path(path);
380
381        // Acquire the lock once so the snapshot is point-in-time consistent.
382        let guard = self.lock_store();
383
384        let file = OpenOptions::new()
385            .write(true)
386            .create(true)
387            .truncate(true)
388            .open(&tmp)?;
389        // Arm cleanup immediately after `open` succeeds: every early return
390        // below must either surface an error *and* unlink the tmp, or commit
391        // the guard once the rename takes effect.
392        let tmp_guard = TempFileGuard::new(tmp.clone());
393        let mut writer = BufWriter::new(file);
394
395        let meta = guard.save_snapshot(&mut writer)?;
396
397        // Flush the BufWriter before fsync; otherwise we fsync an empty
398        // underlying file.
399        use std::io::Write;
400        writer.flush()?;
401        let file = writer.into_inner().map_err(|e| e.into_error())?;
402        file.sync_all()?;
403        drop(file);
404
405        std::fs::rename(&tmp, path)?;
406        // The tmp path no longer has a file behind it — disarm the guard so
407        // it doesn't try to remove the just-renamed target by name race.
408        tmp_guard.commit();
409
410        // Best-effort parent-dir fsync so the rename itself is durable on
411        // power loss. Non-fatal if the parent can't be opened.
412        if let Some(parent) = path.parent() {
413            if let Ok(dir) = File::open(parent) {
414                let _ = dir.sync_all();
415            }
416        }
417
418        Ok(meta)
419    }
420
421    /// Replace the current graph state with a snapshot loaded from `path`.
422    /// Holds the store mutex for the duration of the load; concurrent
423    /// queries block until restore completes.
424    pub fn load_snapshot_from(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
425        let file = File::open(path.as_ref())?;
426        let reader = BufReader::new(file);
427
428        let mut guard = self.lock_store();
429        Ok(guard.load_snapshot(reader)?)
430    }
431}
432
433impl Database<InMemoryGraph> {
434    /// Convenience constructor: open (or create) an empty in-memory database
435    /// and immediately restore it from `path`. Errors if the file cannot be
436    /// opened or the snapshot is malformed.
437    pub fn in_memory_from_snapshot(path: impl AsRef<Path>) -> Result<Self> {
438        let db = Self::in_memory();
439        db.load_snapshot_from(path)?;
440        Ok(db)
441    }
442
443    /// Take a checkpoint: snapshot the current state with the WAL's
444    /// `durable_lsn` stamped into the header, append a `Checkpoint`
445    /// marker to the WAL, then drop sealed segments at or below the
446    /// fence.
447    ///
448    /// Errors with "checkpoint requires WAL enabled" when called on a
449    /// database constructed without a WAL — operators that just want
450    /// a fence-less dump should use [`save_snapshot_to`] instead.
451    ///
452    /// The mutex-held window covers snapshot serialization plus the
453    /// checkpoint marker append. Truncation runs after the rename
454    /// but still under the mutex; making it concurrent with queries
455    /// is a v2 concern (see `docs/decisions/0004-wal.md`).
456    pub fn checkpoint_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
457        let recorder = self
458            .wal
459            .as_ref()
460            .ok_or_else(|| anyhow!("checkpoint requires WAL enabled"))?;
461        let path = path.as_ref();
462        let tmp = snapshot_tmp_path(path);
463
464        let guard = self.lock_store();
465
466        // Make every record appended so far durable, then capture
467        // the LSN that becomes the snapshot fence.
468        recorder
469            .force_fsync()
470            .map_err(|e| anyhow!("WAL fsync before checkpoint failed: {e}"))?;
471        let snapshot_lsn = recorder.wal().durable_lsn();
472
473        let file = OpenOptions::new()
474            .write(true)
475            .create(true)
476            .truncate(true)
477            .open(&tmp)?;
478        let tmp_guard = TempFileGuard::new(tmp.clone());
479        let mut writer = BufWriter::new(file);
480        let meta = guard.save_checkpoint(&mut writer, snapshot_lsn.raw())?;
481
482        use std::io::Write;
483        writer.flush()?;
484        let file = writer.into_inner().map_err(|e| e.into_error())?;
485        file.sync_all()?;
486        drop(file);
487
488        std::fs::rename(&tmp, path)?;
489        tmp_guard.commit();
490
491        if let Some(parent) = path.parent() {
492            if let Ok(dir) = File::open(parent) {
493                let _ = dir.sync_all();
494            }
495        }
496
497        // Append the checkpoint marker AFTER the rename succeeds —
498        // this preserves the invariant that a `Checkpoint` record
499        // in the WAL implies the snapshot it points at exists.
500        recorder
501            .checkpoint_marker(snapshot_lsn)
502            .map_err(|e| anyhow!("WAL checkpoint marker failed: {e}"))?;
503        recorder
504            .force_fsync()
505            .map_err(|e| anyhow!("WAL fsync after checkpoint marker failed: {e}"))?;
506
507        // Best-effort segment truncation. Failure here doesn't undo
508        // the checkpoint — the next call will retry.
509        let _ = recorder.truncate_up_to(snapshot_lsn);
510
511        Ok(meta)
512    }
513}
514
/// Scratch-file path for an atomic save: `target` with `.tmp` appended to
/// its full name (nothing is replaced, so `snap.bin` becomes
/// `snap.bin.tmp`).
fn snapshot_tmp_path(target: &Path) -> PathBuf {
    let mut raw = target.to_path_buf().into_os_string();
    raw.push(".tmp");
    raw.into()
}
520
/// Drop guard for a scratch file: removes its path when dropped, unless
/// [`commit`] was called first.
///
/// The snapshot save path creates `<target>.tmp` before any payload is
/// written. Should a later step fail before the final rename (or the
/// thread unwind), dropping the guard deletes the scratch file so a failed
/// save leaves nothing behind.
///
/// [`commit`]: Self::commit
struct TempFileGuard {
    /// `Some` while the guard is armed; `None` once committed or dropped.
    path: Option<PathBuf>,
}

impl TempFileGuard {
    /// Arm a guard for `path`.
    fn new(path: PathBuf) -> Self {
        TempFileGuard { path: Some(path) }
    }

    /// Disarm the guard. Call this once the scratch file has been handed
    /// off (for example renamed to its final destination) so that `Drop`
    /// leaves the path alone.
    fn commit(mut self) {
        self.path = None;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        let Some(path) = self.path.take() else {
            return;
        };
        // Best-effort: a failed cleanup is not worth surfacing — the worst
        // case is a leaked scratch file that the next save overwrites via
        // `OpenOptions::truncate(true)`.
        let _ = std::fs::remove_file(path);
    }
}
556
/// Storage-agnostic admin surface for HTTP / binding callers that want to
/// drive snapshot operations without naming the backend type parameter.
///
/// `Database<S>` picks up a blanket impl when `S: Snapshotable + 'static`.
/// Transports (e.g. `lora-server`) type-erase on `Arc<dyn SnapshotAdmin>`.
pub trait SnapshotAdmin: Send + Sync + 'static {
    /// Write the current graph state to `path` and return the snapshot's
    /// metadata.
    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
    /// Replace the current graph state with the snapshot at `path` and
    /// return its metadata.
    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
}
566
567impl<S> SnapshotAdmin for Database<S>
568where
569    S: GraphStorage + GraphStorageMut + Snapshotable + Send + 'static,
570{
571    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
572        self.save_snapshot_to(path)
573    }
574
575    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
576        self.load_snapshot_from(path)
577    }
578}
579
/// Storage-agnostic admin surface for the WAL.
///
/// `Database<InMemoryGraph>` implements this trait; its methods return an
/// error when the database has no WAL attached. Transports (e.g.
/// `lora-server`) type-erase on `Arc<dyn WalAdmin>` so they don't need to
/// name the backend type parameter.
///
/// All LSNs cross the trait boundary as raw `u64` so callers don't
/// need a dependency on `lora-wal`.
pub trait WalAdmin: Send + Sync + 'static {
    /// Take a checkpoint at `path`. The snapshot's header is stamped
    /// with the WAL's `durable_lsn`; older sealed segments are then
    /// dropped.
    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta>;

    /// Snapshot of the WAL's current state — durable / next LSN,
    /// active / oldest segment id. Cheap; a single mutex acquisition.
    fn wal_status(&self) -> Result<WalStatus>;

    /// Drop sealed segments at or below `fence_lsn`. Idempotent.
    fn wal_truncate(&self, fence_lsn: u64) -> Result<()>;
}
602
/// Snapshot of WAL state returned by [`WalAdmin::wal_status`].
///
/// `bg_failure` is the latched fsync error from the background flusher
/// (only meaningful under `SyncMode::Group`). When `Some`, the WAL is
/// poisoned and every subsequent commit will fail loudly until the
/// operator restarts from the last consistent snapshot + WAL.
#[derive(Debug, Clone)]
pub struct WalStatus {
    /// Raw value of the WAL's durable LSN.
    pub durable_lsn: u64,
    /// Raw value of the WAL's next LSN.
    pub next_lsn: u64,
    /// Id of the WAL's active segment.
    pub active_segment_id: u64,
    /// Id of the WAL's oldest segment.
    pub oldest_segment_id: u64,
    /// Latched background-flusher failure, if any.
    pub bg_failure: Option<String>,
}
617
618impl WalAdmin for Database<InMemoryGraph> {
619    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta> {
620        self.checkpoint_to(path)
621    }
622
623    fn wal_status(&self) -> Result<WalStatus> {
624        let recorder = self
625            .wal
626            .as_ref()
627            .ok_or_else(|| anyhow!("WAL not enabled"))?;
628        let wal = recorder.wal();
629        Ok(WalStatus {
630            durable_lsn: wal.durable_lsn().raw(),
631            next_lsn: wal.next_lsn().raw(),
632            active_segment_id: wal.active_segment_id(),
633            oldest_segment_id: wal.oldest_segment_id(),
634            bg_failure: wal.bg_failure(),
635        })
636    }
637
638    fn wal_truncate(&self, fence_lsn: u64) -> Result<()> {
639        let recorder = self
640            .wal
641            .as_ref()
642            .ok_or_else(|| anyhow!("WAL not enabled"))?;
643        recorder.truncate_up_to(Lsn::new(fence_lsn))?;
644        Ok(())
645    }
646}
647
648impl<S> QueryRunner for Database<S>
649where
650    S: GraphStorage + GraphStorageMut + Send + 'static,
651{
652    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
653        Database::execute(self, query, options)
654    }
655}
656
657// ---------------------------------------------------------------------------
658// Replay
659// ---------------------------------------------------------------------------
660
661/// Apply a `MutationEvent` stream to an in-memory graph by dispatching
662/// each variant to the matching store operation.
663///
664/// Creation events are replayed through id-preserving paths, not the
665/// normal allocator-backed mutation methods. That matters after aborted
666/// transactions: an aborted create can consume id `N` in the original
667/// process, be dropped by replay, and leave the next committed create at
668/// id `N + 1`. Reusing the regular allocator would shift ids downward.
669///
670/// Replay must be invoked **before** the `WalRecorder` is installed
671/// on the graph. Otherwise the replay's own mutations would fire the
672/// recorder and re-write the same events to the WAL, doubling them on
673/// the next recovery.
674fn replay_into(graph: &mut InMemoryGraph, events: Vec<MutationEvent>) -> Result<()> {
675    for (idx, event) in events.into_iter().enumerate() {
676        match event {
677            MutationEvent::CreateNode {
678                id,
679                labels,
680                properties,
681            } => {
682                graph
683                    .replay_create_node(id, labels, properties)
684                    .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
685            }
686            MutationEvent::CreateRelationship {
687                id,
688                src,
689                dst,
690                rel_type,
691                properties,
692            } => {
693                graph
694                    .replay_create_relationship(id, src, dst, &rel_type, properties)
695                    .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
696            }
697            MutationEvent::SetNodeProperty {
698                node_id,
699                key,
700                value,
701            } => {
702                if !graph.set_node_property(node_id, key, value) {
703                    return Err(anyhow!(
704                        "WAL replay failed at event {idx}: missing node {node_id} for property set"
705                    ));
706                }
707            }
708            MutationEvent::RemoveNodeProperty { node_id, key } => {
709                if !graph.remove_node_property(node_id, &key) {
710                    return Err(anyhow!(
711                        "WAL replay failed at event {idx}: missing node {node_id} for property removal"
712                    ));
713                }
714            }
715            MutationEvent::AddNodeLabel { node_id, label } => {
716                if !graph.add_node_label(node_id, &label) {
717                    return Err(anyhow!(
718                        "WAL replay failed at event {idx}: missing node {node_id} for label add"
719                    ));
720                }
721            }
722            MutationEvent::RemoveNodeLabel { node_id, label } => {
723                if !graph.remove_node_label(node_id, &label) {
724                    return Err(anyhow!(
725                        "WAL replay failed at event {idx}: missing node {node_id} for label removal"
726                    ));
727                }
728            }
729            MutationEvent::SetRelationshipProperty { rel_id, key, value } => {
730                if !graph.set_relationship_property(rel_id, key, value) {
731                    return Err(anyhow!(
732                        "WAL replay failed at event {idx}: missing relationship {rel_id} for property set"
733                    ));
734                }
735            }
736            MutationEvent::RemoveRelationshipProperty { rel_id, key } => {
737                if !graph.remove_relationship_property(rel_id, &key) {
738                    return Err(anyhow!(
739                        "WAL replay failed at event {idx}: missing relationship {rel_id} for property removal"
740                    ));
741                }
742            }
743            MutationEvent::DeleteRelationship { rel_id } => {
744                if !graph.delete_relationship(rel_id) {
745                    return Err(anyhow!(
746                        "WAL replay failed at event {idx}: missing relationship {rel_id} for delete"
747                    ));
748                }
749            }
750            MutationEvent::DeleteNode { node_id } => {
751                if !graph.delete_node(node_id) {
752                    return Err(anyhow!(
753                        "WAL replay failed at event {idx}: missing or attached node {node_id} for delete"
754                    ));
755                }
756            }
757            MutationEvent::DetachDeleteNode { node_id } => {
758                // After the cascading DeleteRelationship +
759                // DeleteNode events have already replayed, the node
760                // is gone and this becomes a no-op. Calling it
761                // anyway is harmless.
762                graph.detach_delete_node(node_id);
763            }
764            MutationEvent::Clear => {
765                graph.clear();
766            }
767        }
768    }
769    Ok(())
770}