lora_database/database.rs
use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, MutexGuard};

use anyhow::{anyhow, Result};
use lora_analyzer::Analyzer;
use lora_ast::Document;
use lora_compiler::{CompiledQuery, Compiler};
use lora_executor::{
    ExecuteOptions, LoraValue, MutableExecutionContext, MutableExecutor, QueryResult,
};
use lora_parser::parse_query;
use lora_store::{
    GraphStorage, GraphStorageMut, InMemoryGraph, MutationEvent, MutationRecorder, SnapshotMeta,
    Snapshotable,
};
use lora_wal::{replay_dir, Lsn, Wal, WalConfig, WalRecorder, WroteCommit};

/// Minimal abstraction any transport can depend on to run Lora queries.
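///
/// A minimal usage sketch, not compiled here: a transport can hold the runner
/// behind a trait object so the concrete store type never appears in its
/// signatures. `query_text` stands in for any Lora query string.
///
/// ```ignore
/// use std::sync::Arc;
///
/// // `Database<S>` implements `QueryRunner` via the impl near the bottom of
/// // this file, so the concrete backend can be type-erased like this.
/// let runner: Arc<dyn QueryRunner> = Arc::new(Database::in_memory());
/// let result = runner.execute(query_text, None)?;
/// ```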
pub trait QueryRunner: Send + Sync + 'static {
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult>;
}

/// Owns the graph store and orchestrates parse → analyze → compile → execute.
///
/// Optionally drives a write-ahead log: when constructed via
/// [`Database::open_with_wal`] or [`Database::recover`] the database
/// holds an [`Arc<WalRecorder>`] that brackets every query with
/// `begin → mutations → commit/abort → flush` while the engine mutex
/// is held, so the WAL order is exactly the in-memory commit order.
/// When constructed via [`Database::in_memory`] / [`Database::from_graph`]
/// the WAL handle is `None` and the engine pays only the existing
/// `MutationRecorder::record` null-pointer check per mutation.
pub struct Database<S> {
    store: Arc<Mutex<S>>,
    wal: Option<Arc<WalRecorder>>,
}

impl Database<InMemoryGraph> {
    /// Convenience constructor: a fresh, empty in-memory graph database.
    pub fn in_memory() -> Self {
        Self::from_graph(InMemoryGraph::new())
    }

    /// Open or create a WAL-enabled in-memory database from a fresh
    /// graph.
    ///
    /// `WalConfig::Disabled` falls back to [`Database::in_memory`].
    /// Otherwise, opens the WAL directory, replays any committed
    /// events into a fresh graph, installs a [`WalRecorder`] on the
    /// graph, and returns a database ready to serve queries.
    ///
    /// To restore from a snapshot in addition to the WAL, use
    /// [`Database::recover`] instead.
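    ///
    /// A minimal sketch, assuming `dir` is a path-like field and
    /// `segment_target_bytes` is numeric (both are defined in `lora-wal`;
    /// only the field names and `SyncMode::Group` are taken from this file):
    ///
    /// ```ignore
    /// let db = Database::open_with_wal(WalConfig::Enabled {
    ///     dir: "data/wal".into(),
    ///     sync_mode: SyncMode::Group,
    ///     segment_target_bytes: 64 * 1024 * 1024, // illustrative value
    /// })?;
    /// // `WalConfig::Disabled` degrades to a plain in-memory database.
    /// ```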
    pub fn open_with_wal(wal_config: WalConfig) -> Result<Self> {
        match wal_config {
            WalConfig::Disabled => Ok(Self::in_memory()),
            WalConfig::Enabled {
                dir,
                sync_mode,
                segment_target_bytes,
            } => {
                let mut graph = InMemoryGraph::new();
                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, Lsn::ZERO)?;
                replay_into(&mut graph, events)?;
                let recorder = Arc::new(WalRecorder::new(wal));
                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
                Ok(Self {
                    store: Arc::new(Mutex::new(graph)),
                    wal: Some(recorder),
                })
            }
        }
    }

    /// Restore from a snapshot file then replay any WAL records past
    /// it.
    ///
    /// The snapshot's `wal_lsn` (when set) becomes the replay fence —
    /// events at or below that LSN are already represented in the
    /// loaded snapshot and are skipped. A missing snapshot file is
    /// treated as "fresh start" so operators can pass the same path
    /// on every boot.
    ///
    /// If the WAL contains a checkpoint marker newer than the
    /// snapshot's `wal_lsn`, a one-line warning is printed to stderr
    /// — the snapshot is stale relative to a more recent checkpoint
    /// the operator is presumably aware of. Recovery still proceeds
    /// from the snapshot's fence (replay re-applies every record
    /// above it, which is conservative-correct); a tighter contract
    /// is deferred to v2 because verifying that the marker's
    /// snapshot file actually exists and is loadable is a separate
    /// observability concern.
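    ///
    /// Typical boot sequence, as a sketch (path and `wal_config` are
    /// illustrative):
    ///
    /// ```ignore
    /// // Safe to run on every start: a missing snapshot means "fresh start",
    /// // and only WAL records above the snapshot's fence are replayed on top.
    /// let db = Database::recover("data/graph.snapshot", wal_config)?;
    /// ```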
    pub fn recover(snapshot_path: impl AsRef<Path>, wal_config: WalConfig) -> Result<Self> {
        let snapshot_path = snapshot_path.as_ref();
        let mut graph = InMemoryGraph::new();
        let snapshot_lsn = match File::open(snapshot_path) {
            Ok(f) => {
                let reader = BufReader::new(f);
                let meta = graph.load_snapshot(reader)?;
                meta.wal_lsn.map(Lsn::new).unwrap_or(Lsn::ZERO)
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Lsn::ZERO,
            Err(e) => return Err(e.into()),
        };

        match wal_config {
            WalConfig::Disabled => Ok(Self::from_graph(graph)),
            WalConfig::Enabled {
                dir,
                sync_mode,
                segment_target_bytes,
            } => {
                // Diagnostic peek at the WAL's newest checkpoint
                // marker so we can warn the operator about a stale
                // snapshot before we start replaying. Treat any error
                // as "no marker" — the subsequent `Wal::open` will
                // surface the real failure if there is one.
                if dir.exists() {
                    if let Ok(outcome) = replay_dir(&dir, Lsn::ZERO) {
                        if let Some(marker) = outcome.checkpoint_lsn_observed {
                            if marker > snapshot_lsn {
                                eprintln!(
                                    "lora-wal: snapshot at LSN {} is older than the newest \
                                     checkpoint marker on disk (LSN {}). Replaying every WAL \
                                     record above LSN {}; consider passing the more recent \
                                     snapshot to --restore-from.",
                                    snapshot_lsn.raw(),
                                    marker.raw(),
                                    snapshot_lsn.raw()
                                );
                            }
                        }
                    }
                }

                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, snapshot_lsn)?;
                replay_into(&mut graph, events)?;
                let recorder = Arc::new(WalRecorder::new(wal));
                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
                Ok(Self {
                    store: Arc::new(Mutex::new(graph)),
                    wal: Some(recorder),
                })
            }
        }
    }
}

impl<S> Database<S>
where
    S: GraphStorage + GraphStorageMut,
{
    /// Build a database from a pre-wrapped, shared store.
    pub fn new(store: Arc<Mutex<S>>) -> Self {
        Self { store, wal: None }
    }

    /// Build a database by taking ownership of a bare graph store.
    pub fn from_graph(graph: S) -> Self {
        Self::new(Arc::new(Mutex::new(graph)))
    }

    /// Handle to the installed WAL recorder, if any. Exposed for
    /// admin paths (checkpoint, truncate, observability) that need
    /// to drive the WAL outside the standard query lifecycle.
    pub fn wal(&self) -> Option<&Arc<WalRecorder>> {
        self.wal.as_ref()
    }

    /// Handle to the underlying shared store — useful for callers that need
    /// to snapshot or share the graph across multiple databases.
    pub fn store(&self) -> &Arc<Mutex<S>> {
        &self.store
    }

    /// Parse a query string into an AST without executing it.
    pub fn parse(&self, query: &str) -> Result<Document> {
        Ok(parse_query(query)?)
    }

    fn lock_store(&self) -> MutexGuard<'_, S> {
        self.store
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    fn compile_query(&self, query: &str) -> Result<(MutexGuard<'_, S>, CompiledQuery)> {
        let document = self.parse(query)?;
        let store = self.lock_store();

        let resolved = {
            let mut analyzer = Analyzer::new(&*store);
            analyzer.analyze(&document)?
        };

        let compiled = Compiler::compile(&resolved);
        Ok((store, compiled))
    }

    /// Execute a query and return its result.
    pub fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
        self.execute_with_params(query, options, BTreeMap::new())
    }

    /// Execute a query with bound parameters.
    ///
    /// When a WAL is attached the call is bracketed by a transaction:
    ///
    /// 1. `recorder.arm()` after analyze + compile (so a parse /
    ///    semantic / compile error never opens a tx that has to be
    ///    immediately aborted). Arming is *cheap*: no record is
    ///    appended to the WAL yet, so a pure read query that
    ///    completes here pays nothing for the WAL hot path.
    /// 2. The executor runs; every primitive mutation fires
    ///    `MutationRecorder::record`, which on its first call
    ///    lazily issues `Wal::begin` and from then on forwards
    ///    every event to `Wal::append`.
    /// 3. On Ok, `recorder.commit()` writes a `TxCommit` only when a
    ///    `TxBegin` was actually allocated; the surrounding
    ///    `recorder.flush()` runs only in that case so a read-only
    ///    query never pays an `fsync`.
    /// 4. On Err, `recorder.abort()` marks the (lazily-issued) tx
    ///    for replay-time discard; if no `TxBegin` was issued,
    ///    abort is a no-op on the WAL. The engine has no rollback,
    ///    so the in-memory state may already be partially mutated;
    ///    the abort marker is what gives the *durable* layer
    ///    per-query atomicity.
    /// 5. The recorder's poisoned flag is polled once (it also
    ///    surfaces background-flusher fsync failures from
    ///    `SyncMode::Group`). If set, the query fails loudly with the
    ///    durability error so the caller can act on it; the WAL
    ///    refuses further appends until the operator restarts the
    ///    database, which recovers from the last consistent
    ///    snapshot + WAL.
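    ///
    /// Parameterized call, as a sketch (how a `LoraValue` is constructed is
    /// up to `lora-executor` and not shown; `query_text` and `name_value`
    /// are placeholders):
    ///
    /// ```ignore
    /// let mut params = BTreeMap::new();
    /// params.insert("name".to_string(), name_value); // name_value: LoraValue
    /// let result = db.execute_with_params(query_text, None, params)?;
    /// ```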
    pub fn execute_with_params(
        &self,
        query: &str,
        options: Option<ExecuteOptions>,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryResult> {
        let (mut store, compiled) = self.compile_query(query)?;

        if let Some(rec) = &self.wal {
            rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
        }

        let exec_result: Result<QueryResult> = (|| {
            let mut executor = MutableExecutor::new(MutableExecutionContext {
                storage: &mut *store,
                params,
            });
            Ok(executor.execute_compiled(&compiled, options)?)
        })();

        if let Some(rec) = &self.wal {
            match &exec_result {
                Ok(_) => match rec.commit() {
                    Ok(WroteCommit::Yes) => {
                        rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
                    }
                    Ok(WroteCommit::No) => {
                        // Read-only query: no records were written
                        // and there is nothing to fsync. Skip flush
                        // entirely so PerCommit pays zero fsyncs on
                        // pure reads.
                    }
                    Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
                },
                Err(_) => {
                    // Best-effort abort. If the WAL saw mutations, durable
                    // recovery will discard them but the live in-memory store
                    // may already be ahead of durable state. Quarantine this
                    // handle so callers restart instead of serving from a
                    // potentially divergent graph.
                    if matches!(rec.abort(), Ok(true)) {
                        rec.poison(
                            "query mutated the live graph before failing; restart from snapshot + WAL required",
                        );
                    }
                }
            }
            if let Some(reason) = rec.poisoned() {
                return Err(anyhow!("WAL poisoned: {reason}"));
            }
        }

        exec_result
    }

    // ---------- Storage-agnostic utility helpers ----------
    //
    // Bindings previously reached into `Arc<Mutex<InMemoryGraph>>` to answer
    // stat / admin calls; these helpers let them depend on `Database<S>`
    // instead, so swapping in a new backend only requires changing one type
    // parameter.

    /// Drop every node and relationship.
    ///
    /// When a WAL is attached, `clear()` is wrapped in `arm`/`commit`
    /// so the `MutationEvent::Clear` fired by the store reaches the
    /// log inside a transaction (without arming, the recorder would
    /// poison itself on the first event). WAL failures here are
    /// best-effort: the in-memory state is still cleared so the
    /// caller's contract holds, but the recorder's poisoned flag
    /// will surface to the next query.
    pub fn clear(&self) {
        let mut guard = self.lock_store();
        match &self.wal {
            None => guard.clear(),
            Some(rec) => {
                let armed = rec.arm();
                guard.clear();
                if armed.is_ok() {
                    // `clear()` always emits a `MutationEvent::Clear`,
                    // so commit returns `WroteCommit::Yes` and we
                    // flush. If that order ever changes, the worst
                    // case is one redundant flush call.
                    let _ = rec.commit();
                    let _ = rec.flush();
                }
            }
        }
    }

    /// Number of nodes currently in the graph.
    pub fn node_count(&self) -> usize {
        let guard = self.lock_store();
        guard.node_count()
    }

    /// Number of relationships currently in the graph.
    pub fn relationship_count(&self) -> usize {
        let guard = self.lock_store();
        guard.relationship_count()
    }

    /// Run a closure with a shared borrow of the underlying store. Used by
    /// bindings to answer ad-hoc queries without locking the mutex themselves.
    pub fn with_store<R>(&self, f: impl FnOnce(&S) -> R) -> R {
        let guard = self.lock_store();
        f(&*guard)
    }

    /// Run a closure with an exclusive borrow of the underlying store. Reserved
    /// for admin paths (restore, bulk load); regular mutation goes through
    /// `execute_with_params`.
    pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
        let mut guard = self.lock_store();
        f(&mut *guard)
    }
}

// ---------------------------------------------------------------------------
// Snapshot helpers
//
// A second impl block so the `Snapshotable` bound only constrains backends
// that actually need it. `Database<InMemoryGraph>` picks these up
// automatically; hypothetical backends that don't implement `Snapshotable`
// still get the core query API above.
// ---------------------------------------------------------------------------

impl<S> Database<S>
where
    S: GraphStorage + GraphStorageMut + Snapshotable,
{
    /// Serialize the current graph state to the given path. Writes are
    /// atomic: the payload goes to `<path>.tmp`, is `fsync`'d, and then
    /// renamed over the target; a torn write can never leave a half-written
    /// file at `path`. If any step before the rename fails, the stale
    /// `<path>.tmp` is removed so a crashed save never leaks scratch files.
    ///
    /// Holds the store mutex for the duration of the save so concurrent
    /// queries see a consistent point-in-time snapshot.
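    ///
    /// Save/restore round trip, as a sketch (paths are illustrative):
    ///
    /// ```ignore
    /// let meta = db.save_snapshot_to("data/graph.snapshot")?;
    /// // Later, possibly in a new process:
    /// let restored = Database::in_memory_from_snapshot("data/graph.snapshot")?;
    /// ```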
    pub fn save_snapshot_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);

        // Acquire the lock once so the snapshot is point-in-time consistent.
        let guard = self.lock_store();

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        // Arm cleanup immediately after `open` succeeds: every early return
        // below must either surface an error *and* unlink the tmp, or commit
        // the guard once the rename takes effect.
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);

        let meta = guard.save_snapshot(&mut writer)?;

        // Flush the BufWriter before fsync; otherwise bytes still sitting in
        // its buffer never reach the file we are about to sync.
        use std::io::Write;
        writer.flush()?;
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        file.sync_all()?;
        drop(file);

        std::fs::rename(&tmp, path)?;
        // The tmp path no longer has a file behind it — disarm the guard so
        // its `Drop` doesn't attempt a now-pointless (and potentially racy)
        // removal under that name.
        tmp_guard.commit();

        // Best-effort parent-dir fsync so the rename itself is durable on
        // power loss. Non-fatal if the parent can't be opened.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        Ok(meta)
    }

    /// Replace the current graph state with a snapshot loaded from `path`.
    /// Holds the store mutex for the duration of the load; concurrent
    /// queries block until restore completes.
    pub fn load_snapshot_from(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let file = File::open(path.as_ref())?;
        let reader = BufReader::new(file);

        let mut guard = self.lock_store();
        Ok(guard.load_snapshot(reader)?)
    }
}

impl Database<InMemoryGraph> {
    /// Convenience constructor: create an empty in-memory database and
    /// immediately restore it from `path`. Errors if the file cannot be
    /// opened or the snapshot is malformed.
    pub fn in_memory_from_snapshot(path: impl AsRef<Path>) -> Result<Self> {
        let db = Self::in_memory();
        db.load_snapshot_from(path)?;
        Ok(db)
    }

    /// Take a checkpoint: snapshot the current state with the WAL's
    /// `durable_lsn` stamped into the header, append a `Checkpoint`
    /// marker to the WAL, then drop sealed segments at or below the
    /// fence.
    ///
    /// Errors with "checkpoint requires WAL enabled" when called on a
    /// database constructed without a WAL — operators that just want
    /// a fence-less dump should use [`Database::save_snapshot_to`] instead.
    ///
    /// The mutex-held window covers snapshot serialization plus the
    /// checkpoint marker append. Truncation runs after the rename
    /// but still under the mutex; making it concurrent with queries
    /// is a v2 concern (see `docs/decisions/0004-wal.md`).
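    ///
    /// Checkpoint-then-recover cycle, as a sketch (path is illustrative;
    /// `wal_config` is the same [`WalConfig`] the database was opened with):
    ///
    /// ```ignore
    /// let meta = db.checkpoint_to("data/checkpoint.snapshot")?;
    /// // On the next boot, recovery loads the checkpoint and replays only
    /// // the WAL records above its fence:
    /// let db = Database::recover("data/checkpoint.snapshot", wal_config)?;
    /// ```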
    pub fn checkpoint_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let recorder = self
            .wal
            .as_ref()
            .ok_or_else(|| anyhow!("checkpoint requires WAL enabled"))?;
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);

        let guard = self.lock_store();

        // Make every record appended so far durable, then capture
        // the LSN that becomes the snapshot fence.
        recorder
            .force_fsync()
            .map_err(|e| anyhow!("WAL fsync before checkpoint failed: {e}"))?;
        let snapshot_lsn = recorder.wal().durable_lsn();

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);
        let meta = guard.save_checkpoint(&mut writer, snapshot_lsn.raw())?;

        use std::io::Write;
        writer.flush()?;
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        file.sync_all()?;
        drop(file);

        std::fs::rename(&tmp, path)?;
        tmp_guard.commit();

        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        // Append the checkpoint marker AFTER the rename succeeds —
        // this preserves the invariant that a `Checkpoint` record
        // in the WAL implies the snapshot it points at exists.
        recorder
            .checkpoint_marker(snapshot_lsn)
            .map_err(|e| anyhow!("WAL checkpoint marker failed: {e}"))?;
        recorder
            .force_fsync()
            .map_err(|e| anyhow!("WAL fsync after checkpoint marker failed: {e}"))?;

        // Best-effort segment truncation. Failure here doesn't undo
        // the checkpoint — the next call will retry.
        let _ = recorder.truncate_up_to(snapshot_lsn);

        Ok(meta)
    }
}

fn snapshot_tmp_path(target: &Path) -> PathBuf {
    let mut tmp = target.as_os_str().to_owned();
    tmp.push(".tmp");
    PathBuf::from(tmp)
}

/// RAII handle that deletes its path on drop unless [`commit`] is called.
///
/// The snapshot save path creates `<target>.tmp` before the payload is
/// written; if any step between then and the final rename fails (or the
/// thread unwinds), the guard's `Drop` removes the scratch file so a crashed
/// save never leaves leftovers on disk.
///
/// [`commit`]: Self::commit
struct TempFileGuard {
    path: Option<PathBuf>,
}

impl TempFileGuard {
    fn new(path: PathBuf) -> Self {
        Self { path: Some(path) }
    }

    /// Disarm the guard. Call this once the tmp file's contents have been
    /// handed off (e.g. renamed to their final destination) so the `Drop`
    /// impl does not try to remove them.
    fn commit(mut self) {
        self.path.take();
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        if let Some(path) = self.path.take() {
            // Best-effort: cleanup failure is not worth surfacing — the
            // worst case is a leaked scratch file that the next save
            // overwrites via `OpenOptions::truncate(true)`.
            let _ = std::fs::remove_file(path);
        }
    }
}

/// Storage-agnostic admin surface for HTTP / binding callers that want to
/// drive snapshot operations without naming the backend type parameter.
///
/// `Database<S>` picks up a blanket impl when `S: Snapshotable + 'static`.
/// Transports (e.g. `lora-server`) type-erase on `Arc<dyn SnapshotAdmin>`.
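///
/// Sketch of the intended type erasure (illustrative only):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let admin: Arc<dyn SnapshotAdmin> = Arc::new(Database::in_memory());
/// admin.save_snapshot(Path::new("data/graph.snapshot"))?;
/// ```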
pub trait SnapshotAdmin: Send + Sync + 'static {
    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
}

impl<S> SnapshotAdmin for Database<S>
where
    S: GraphStorage + GraphStorageMut + Snapshotable + Send + 'static,
{
    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
        self.save_snapshot_to(path)
    }

    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
        self.load_snapshot_from(path)
    }
}

/// Storage-agnostic admin surface for the WAL.
///
/// `Database<InMemoryGraph>` implements this via the impl below; when the
/// database was constructed without a WAL, the methods fail at runtime with
/// "WAL not enabled". Transports (e.g. `lora-server`) type-erase on
/// `Arc<dyn WalAdmin>` so they don't need to name the backend type
/// parameter.
///
/// All LSNs cross the trait boundary as raw `u64` so callers don't
/// need a dependency on `lora-wal`.
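///
/// Sketch of how a transport might poll WAL health (illustrative; `db` is a
/// `Database<InMemoryGraph>` opened with a WAL):
///
/// ```ignore
/// let admin: Arc<dyn WalAdmin> = Arc::new(db);
/// let status = admin.wal_status()?;
/// if let Some(err) = status.bg_failure {
///     // Background fsync failed: the WAL is poisoned and commits will error.
///     eprintln!("wal poisoned: {err}");
/// }
/// ```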
pub trait WalAdmin: Send + Sync + 'static {
    /// Take a checkpoint at `path`. The snapshot's header is stamped
    /// with the WAL's `durable_lsn`; older sealed segments are then
    /// dropped.
    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta>;

    /// Snapshot of the WAL's current state — durable / next LSN,
    /// active / oldest segment id. Cheap; a single mutex acquisition.
    fn wal_status(&self) -> Result<WalStatus>;

    /// Drop sealed segments at or below `fence_lsn`. Idempotent.
    fn wal_truncate(&self, fence_lsn: u64) -> Result<()>;
}

/// Snapshot of WAL state returned by [`WalAdmin::wal_status`].
///
/// `bg_failure` is the latched fsync error from the background flusher
/// (only meaningful under `SyncMode::Group`). When `Some`, the WAL is
/// poisoned and every subsequent commit will fail loudly until the
/// operator restarts from the last consistent snapshot + WAL.
#[derive(Debug, Clone)]
pub struct WalStatus {
    pub durable_lsn: u64,
    pub next_lsn: u64,
    pub active_segment_id: u64,
    pub oldest_segment_id: u64,
    pub bg_failure: Option<String>,
}

impl WalAdmin for Database<InMemoryGraph> {
    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta> {
        self.checkpoint_to(path)
    }

    fn wal_status(&self) -> Result<WalStatus> {
        let recorder = self
            .wal
            .as_ref()
            .ok_or_else(|| anyhow!("WAL not enabled"))?;
        let wal = recorder.wal();
        Ok(WalStatus {
            durable_lsn: wal.durable_lsn().raw(),
            next_lsn: wal.next_lsn().raw(),
            active_segment_id: wal.active_segment_id(),
            oldest_segment_id: wal.oldest_segment_id(),
            bg_failure: wal.bg_failure(),
        })
    }

    fn wal_truncate(&self, fence_lsn: u64) -> Result<()> {
        let recorder = self
            .wal
            .as_ref()
            .ok_or_else(|| anyhow!("WAL not enabled"))?;
        recorder.truncate_up_to(Lsn::new(fence_lsn))?;
        Ok(())
    }
}

impl<S> QueryRunner for Database<S>
where
    S: GraphStorage + GraphStorageMut + Send + 'static,
{
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
        Database::execute(self, query, options)
    }
}

// ---------------------------------------------------------------------------
// Replay
// ---------------------------------------------------------------------------

/// Apply a `MutationEvent` stream to an in-memory graph by dispatching
/// each variant to the matching store operation.
///
/// Creation events are replayed through id-preserving paths, not the
/// normal allocator-backed mutation methods. That matters after aborted
/// transactions: an aborted create can consume id `N` in the original
/// process, be dropped by replay, and leave the next committed create at
/// id `N + 1`. Reusing the regular allocator would shift ids downward.
///
/// Replay must be invoked **before** the `WalRecorder` is installed
/// on the graph. Otherwise the replay's own mutations would fire the
/// recorder and re-write the same events to the WAL, doubling them on
/// the next recovery.
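///
/// Worked example of the id argument above (record names and ids are
/// illustrative, not the exact WAL encoding):
///
/// ```text
/// begin(tx 1)  CreateNode { id: 7 }  abort(tx 1)    // id 7 consumed, then discarded
/// begin(tx 2)  CreateNode { id: 8 }  commit(tx 2)   // durable
/// ```
///
/// Replay drops the aborted transaction, so the committed node must come back
/// as id 8; handing out "next free id" instead would recreate it as 7 and
/// shift every later id.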
fn replay_into(graph: &mut InMemoryGraph, events: Vec<MutationEvent>) -> Result<()> {
    for (idx, event) in events.into_iter().enumerate() {
        match event {
            MutationEvent::CreateNode {
                id,
                labels,
                properties,
            } => {
                graph
                    .replay_create_node(id, labels, properties)
                    .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
            }
            MutationEvent::CreateRelationship {
                id,
                src,
                dst,
                rel_type,
                properties,
            } => {
                graph
                    .replay_create_relationship(id, src, dst, &rel_type, properties)
                    .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
            }
            MutationEvent::SetNodeProperty {
                node_id,
                key,
                value,
            } => {
                if !graph.set_node_property(node_id, key, value) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing node {node_id} for property set"
                    ));
                }
            }
            MutationEvent::RemoveNodeProperty { node_id, key } => {
                if !graph.remove_node_property(node_id, &key) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing node {node_id} for property removal"
                    ));
                }
            }
            MutationEvent::AddNodeLabel { node_id, label } => {
                if !graph.add_node_label(node_id, &label) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing node {node_id} for label add"
                    ));
                }
            }
            MutationEvent::RemoveNodeLabel { node_id, label } => {
                if !graph.remove_node_label(node_id, &label) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing node {node_id} for label removal"
                    ));
                }
            }
            MutationEvent::SetRelationshipProperty { rel_id, key, value } => {
                if !graph.set_relationship_property(rel_id, key, value) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing relationship {rel_id} for property set"
                    ));
                }
            }
            MutationEvent::RemoveRelationshipProperty { rel_id, key } => {
                if !graph.remove_relationship_property(rel_id, &key) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing relationship {rel_id} for property removal"
                    ));
                }
            }
            MutationEvent::DeleteRelationship { rel_id } => {
                if !graph.delete_relationship(rel_id) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing relationship {rel_id} for delete"
                    ));
                }
            }
            MutationEvent::DeleteNode { node_id } => {
                if !graph.delete_node(node_id) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing or attached node {node_id} for delete"
                    ));
                }
            }
            MutationEvent::DetachDeleteNode { node_id } => {
                // After the cascading DeleteRelationship +
                // DeleteNode events have already replayed, the node
                // is gone and this becomes a no-op. Calling it
                // anyway is harmless.
                graph.detach_delete_node(node_id);
            }
            MutationEvent::Clear => {
                graph.clear();
            }
        }
    }
    Ok(())
}