lora_database/database.rs
1use std::collections::BTreeMap;
2use std::fs::{File, OpenOptions};
3use std::io::{BufReader, BufWriter};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError};
6use std::time::{Duration, Instant};
7
8use anyhow::{anyhow, Result};
9use lora_analyzer::Analyzer;
10use lora_ast::Document;
11use lora_compiler::{CompiledQuery, Compiler};
12use lora_executor::{
13 classify_stream, compiled_result_columns, project_rows, ExecuteOptions, LoraValue,
14 MutableExecutionContext, MutableExecutor, QueryResult, Row, StreamShape,
15};
16use lora_parser::parse_query;
17use lora_store::{
18 GraphStorage, GraphStorageMut, InMemoryGraph, MutationEvent, MutationRecorder, SnapshotMeta,
19 Snapshotable,
20};
21use lora_wal::{replay_dir, Lsn, Wal, WalConfig, WalMirror, WalRecorder, WroteCommit};
22
23use crate::archive::WalArchive;
24use crate::named::{DatabaseName, DatabaseOpenOptions};
25use crate::stream::{AutoCommitGuard, LiveCursor, QueryStream};
26use crate::transaction::{LiveStoreGuard, Transaction, TransactionMode};
27
/// Minimal abstraction any transport can depend on to run Lora queries.
///
/// Transports can type-erase on `Arc<dyn QueryRunner>` so they never need
/// to name a concrete database/storage type.
pub trait QueryRunner: Send + Sync + 'static {
    /// Execute `query` and return its materialized result; `options`
    /// controls final result-format projection.
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult>;
}
32
/// Owns the graph store and orchestrates parse → analyze → compile → execute.
///
/// Optionally drives a write-ahead log: when constructed via
/// [`Database::open_with_wal`] or [`Database::recover`] the database
/// holds an [`Arc<WalRecorder>`] that brackets every query with
/// `begin → mutations → commit/abort → flush` while the store write
/// lock is held, so the WAL order is exactly the in-memory commit order.
/// When constructed via [`Database::in_memory`] / [`Database::from_graph`]
/// the WAL handle is `None` and the engine pays only the existing
/// `MutationRecorder::record` null-pointer check per mutation.
pub struct Database<S> {
    /// Shared, lock-guarded graph store. Clones of this `Arc` are handed
    /// to live cursors (see `stream_with_params`) so they can keep the
    /// store alive for their own lifetime.
    pub(crate) store: Arc<RwLock<S>>,
    /// Installed WAL recorder, or `None` for purely in-memory databases.
    pub(crate) wal: Option<Arc<WalRecorder>>,
}
47
48impl Database<InMemoryGraph> {
49 /// Convenience constructor: a fresh, empty in-memory graph database.
50 pub fn in_memory() -> Self {
51 Self::from_graph(InMemoryGraph::new())
52 }
53
54 /// Open or create a WAL-enabled in-memory database from a fresh
55 /// graph.
56 ///
57 /// `WalConfig::Disabled` falls back to [`Database::in_memory`].
58 /// Otherwise, opens the WAL directory, replays any committed
59 /// events into a fresh graph, installs a [`WalRecorder`] on the
60 /// graph, and returns a database ready to serve queries.
61 ///
62 /// To restore from a snapshot in addition to the WAL, use
63 /// [`Database::recover`] instead.
64 pub fn open_with_wal(wal_config: WalConfig) -> Result<Self> {
65 match wal_config {
66 WalConfig::Disabled => Ok(Self::in_memory()),
67 WalConfig::Enabled {
68 dir,
69 sync_mode,
70 segment_target_bytes,
71 } => {
72 let mut graph = InMemoryGraph::new();
73 let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, Lsn::ZERO)?;
74 replay_into(&mut graph, events)?;
75 let recorder = Arc::new(WalRecorder::new(wal));
76 graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
77 Ok(Self {
78 store: Arc::new(RwLock::new(graph)),
79 wal: Some(recorder),
80 })
81 }
82 }
83 }
84
    /// Open or create a named portable database rooted under
    /// `options.database_dir`.
    ///
    /// The database name may be either a portable basename (`app` or
    /// `app.loradb`) or a safe relative path (`tenant/app`). It is resolved
    /// under `options.database_dir` before the WAL archive backend opens.
    ///
    /// # Errors
    ///
    /// Fails if the name does not parse, the archive cannot be opened,
    /// WAL replay fails, or the initial archive persist fails.
    pub fn open_named(
        database_name: impl AsRef<str>,
        options: DatabaseOpenOptions,
    ) -> Result<Self> {
        let name = DatabaseName::parse(database_name.as_ref())?;
        let archive = Arc::new(WalArchive::open(
            options.database_path_for(&name),
            options.max_database_bytes,
        )?);
        let mut graph = InMemoryGraph::new();
        // Fence at LSN zero: replay every committed event found in the
        // archive's working directory into the fresh graph.
        let (wal, events) = Wal::open(
            archive.work_dir(),
            options.sync_mode,
            options.segment_target_bytes,
            Lsn::ZERO,
        )?;
        replay_into(&mut graph, events)?;
        // The archive doubles as the WAL mirror, so appended records are
        // reflected into the portable database file.
        let mirror: Arc<dyn WalMirror> = archive;
        let recorder = Arc::new(WalRecorder::new_with_mirror(wal, Some(mirror)));
        graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
        // Mark the archive dirty so a fresh named database is materialized as
        // a portable ZIP. The archive writer coalesces this with any immediate
        // follow-up writes and flushes it in the background, with a final flush
        // on database drop.
        recorder
            .flush()
            .map_err(|e| anyhow!("initial database archive persist failed: {e}"))?;
        Ok(Self {
            store: Arc::new(RwLock::new(graph)),
            wal: Some(recorder),
        })
    }
123
124 /// Start an explicit transaction.
125 ///
126 /// Read-only transactions hold a shared read lock for their
127 /// lifetime; read-write transactions hold the write lock. The
128 /// staging clone is **lazy** — it only happens when a
129 /// [`TransactionMode::ReadWrite`] transaction sees its first
130 /// mutating statement. Materialized read-only statements run
131 /// straight against the live graph; tx-bound streams may still
132 /// clone so their cursors can own a stable view. ReadWrite
133 /// transactions that perform only materialized reads (or commit
134 /// empty) pay nothing for staging.
135 pub fn begin_transaction(&self, mode: TransactionMode) -> Result<Transaction<'_>> {
136 let live = match mode {
137 TransactionMode::ReadOnly => LiveStoreGuard::Read(self.read_store()),
138 TransactionMode::ReadWrite => LiveStoreGuard::Write(self.write_store()),
139 };
140 Ok(Transaction::new(live, self.wal.clone(), mode))
141 }
142
143 /// Force any pending WAL bytes to durable storage and, for archive-backed
144 /// databases, refresh the portable `.loradb` file before returning.
145 pub fn sync(&self) -> Result<()> {
146 if let Some(wal) = &self.wal {
147 wal.force_fsync()?;
148 }
149 Ok(())
150 }
151
    /// Restore from a snapshot file then replay any WAL records past
    /// it.
    ///
    /// The snapshot's `wal_lsn` (when set) becomes the replay fence —
    /// events at or below that LSN are already represented in the
    /// loaded snapshot and are skipped. A missing snapshot file is
    /// treated as "fresh start" so operators can pass the same path
    /// on every boot.
    ///
    /// If the WAL contains a checkpoint marker newer than the
    /// snapshot's `wal_lsn`, a one-line warning is printed to stderr
    /// — the snapshot is stale relative to a more recent checkpoint
    /// the operator is presumably aware of. Recovery still proceeds
    /// from the snapshot's fence (replay re-applies every record
    /// above it, which is conservative-correct); a tighter contract
    /// is deferred to v2 because verifying that the marker's
    /// snapshot file actually exists and is loadable is a separate
    /// observability concern.
    pub fn recover(snapshot_path: impl AsRef<Path>, wal_config: WalConfig) -> Result<Self> {
        let snapshot_path = snapshot_path.as_ref();
        let mut graph = InMemoryGraph::new();
        // Load the snapshot if present. Its embedded `wal_lsn` (when set)
        // becomes the replay fence; a missing file or absent LSN degrades to
        // `Lsn::ZERO`, i.e. "replay every committed WAL record".
        let snapshot_lsn = match File::open(snapshot_path) {
            Ok(f) => {
                let reader = BufReader::new(f);
                let meta = graph.load_snapshot(reader)?;
                meta.wal_lsn.map(Lsn::new).unwrap_or(Lsn::ZERO)
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Lsn::ZERO,
            Err(e) => return Err(e.into()),
        };

        match wal_config {
            WalConfig::Disabled => Ok(Self::from_graph(graph)),
            WalConfig::Enabled {
                dir,
                sync_mode,
                segment_target_bytes,
            } => {
                // Diagnostic peek at the WAL's newest checkpoint
                // marker so we can warn the operator about a stale
                // snapshot before we start replaying. Treat any error
                // as "no marker" — the subsequent `Wal::open` will
                // surface the real failure if there is one.
                if dir.exists() {
                    if let Ok(outcome) = replay_dir(&dir, Lsn::ZERO) {
                        if let Some(marker) = outcome.checkpoint_lsn_observed {
                            if marker > snapshot_lsn {
                                eprintln!(
                                    "lora-wal: snapshot at LSN {} is older than the newest \
                                     checkpoint marker on disk (LSN {}). Replaying every WAL \
                                     record above LSN {}; consider passing the more recent \
                                     snapshot to --restore-from.",
                                    snapshot_lsn.raw(),
                                    marker.raw(),
                                    snapshot_lsn.raw()
                                );
                            }
                        }
                    }
                }

                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, snapshot_lsn)?;
                replay_into(&mut graph, events)?;
                // The recorder is installed after replay, so recovered events
                // are applied without passing back through the recorder.
                let recorder = Arc::new(WalRecorder::new(wal));
                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
                Ok(Self {
                    store: Arc::new(RwLock::new(graph)),
                    wal: Some(recorder),
                })
            }
        }
    }
224
225 /// Execute a query and return an owning row stream.
226 pub fn stream(&self, query: &str) -> Result<QueryStream<'_>> {
227 self.stream_with_params(query, BTreeMap::new())
228 }
229
    /// Execute a parameterised query and return an owning row stream.
    ///
    /// `params` carries the query's named bind values.
    ///
    /// The compiled plan is classified at open time. Read-only
    /// queries run directly off the live store and yield a
    /// buffered cursor with plan-derived columns. Mutating queries
    /// are routed through a hidden read-write [`Transaction`]:
    /// full cursor exhaustion calls `tx.commit` (publishing staged
    /// changes and replaying the tx-local WAL buffer); a premature
    /// drop or any error from `next_row` calls `tx.rollback` so
    /// the live store and the WAL stay untouched.
    pub fn stream_with_params(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryStream<'_>> {
        // Classify by compiling once against the live store. The
        // mutating branch then re-compiles inside the hidden
        // transaction (against a staged graph that's identical to
        // live at clone time, so the second plan matches the
        // first); the cost is one extra parse+analyze+compile per
        // mutating stream, paid in exchange for a tiny
        // classify-stream surface.
        let document = parse_query(query)?;
        let store_guard = self.read_store();
        let resolved = {
            let mut analyzer = Analyzer::new(&*store_guard);
            analyzer.analyze(&document)?
        };
        let compiled = Compiler::compile(&resolved);
        // Columns come from the plan, not from any result rows, so they are
        // available before the first row is pulled.
        let columns = compiled_result_columns(&compiled);
        let shape = classify_stream(&compiled);
        // Release the analyzer's lock before either branch
        // re-acquires (read-only path keeps it; mutating path
        // delegates to begin_transaction which takes its own).
        drop(store_guard);

        match shape {
            StreamShape::ReadOnly => {
                // True pull-shaped streaming. `LiveCursor` holds
                // the live store lock and the cursor that
                // borrows from it; its `Drop` releases them in
                // the right order so the caller observes pure
                // pull semantics with no intermediate
                // materialization.
                let live = LiveCursor::open(self.store.clone(), compiled, params)?;
                Ok(QueryStream::live(live, columns))
            }
            StreamShape::Mutating => {
                // Hidden auto-commit transaction. The transaction
                // owns staging, the buffering recorder, savepoint
                // management, and the WAL replay-on-commit logic;
                // we just pick commit-on-exhaustion vs
                // rollback-on-drop based on cursor state.
                //
                // The cursor returned by `open_streaming_compiled_autocommit`
                // may be a real per-row `StreamingWriteCursor`,
                // a mutable UNION cursor, or a buffered leaf for
                // operators that still need full materialization.
                // Either way the AutoCommit guard's
                // drop/exhaustion semantics are identical. The
                // compiled plan is wrapped in an `Arc` so the
                // cursor's `'static` borrows into it remain valid
                // for the cursor's lifetime.
                let mut tx = self.begin_transaction(TransactionMode::ReadWrite)?;
                let compiled_arc = Arc::new(compiled);
                let cursor =
                    match tx.open_streaming_compiled_autocommit(compiled_arc.clone(), params) {
                        Ok(c) => c,
                        Err(err) => {
                            // Tx rolls back implicitly on drop here.
                            return Err(err);
                        }
                    };
                let guard = AutoCommitGuard {
                    tx: Some(tx),
                    finalized: false,
                };
                Ok(QueryStream::auto_commit(cursor, columns, guard))
            }
        }
    }
311
    /// Open a stream whose lifetime can be carried by an outer owner that
    /// also retains an `Arc<Database>`.
    ///
    /// # Safety
    ///
    /// The returned stream may contain lock guards that borrow from the
    /// database's internal `RwLock`. The caller must keep this exact `Arc`
    /// alive until the stream is dropped. This is intended for language
    /// bindings that store both the `Arc<Database>` and the `QueryStream` in
    /// the same opaque stream handle.
    pub unsafe fn stream_with_params_owned(
        self: &Arc<Self>,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryStream<'static>> {
        let stream = self.stream_with_params(query, params)?;
        // SAFETY: only the lifetime parameter is rewritten. Per the contract
        // above, the caller keeps this `Arc<Database>` alive for as long as
        // the returned stream exists, so any guards inside the stream that
        // borrow from the database's `RwLock` never dangle.
        Ok(std::mem::transmute::<QueryStream<'_>, QueryStream<'static>>(stream))
    }
330}
331
332impl<S> Database<S>
333where
334 S: GraphStorage + GraphStorageMut,
335{
336 /// Build a database from a pre-wrapped, shared store.
337 pub fn new(store: Arc<RwLock<S>>) -> Self {
338 Self { store, wal: None }
339 }
340
341 /// Build a database by taking ownership of a bare graph store.
342 pub fn from_graph(graph: S) -> Self {
343 Self::new(Arc::new(RwLock::new(graph)))
344 }
345
    /// Handle to the installed WAL recorder, if any. Exposed for
    /// admin paths (checkpoint, truncate, observability) that need
    /// to drive the WAL outside the standard query lifecycle.
    ///
    /// Returns `None` for databases built without a WAL (e.g. via
    /// [`Database::new`] / [`Database::from_graph`]).
    pub fn wal(&self) -> Option<&Arc<WalRecorder>> {
        self.wal.as_ref()
    }
352
    /// Handle to the underlying shared store — useful for callers that need
    /// to snapshot or share the graph across multiple databases.
    ///
    /// Prefer [`Self::with_store`] / [`Self::with_store_mut`] for one-off
    /// access; those take and release the lock for you.
    pub fn store(&self) -> &Arc<RwLock<S>> {
        &self.store
    }
358
359 /// Parse a query string into an AST without executing it.
360 pub fn parse(&self, query: &str) -> Result<Document> {
361 Ok(parse_query(query)?)
362 }
363
364 pub(crate) fn read_store(&self) -> RwLockReadGuard<'_, S> {
365 self.store
366 .read()
367 .unwrap_or_else(|poisoned| poisoned.into_inner())
368 }
369
370 pub(crate) fn write_store(&self) -> RwLockWriteGuard<'_, S> {
371 self.store
372 .write()
373 .unwrap_or_else(|poisoned| poisoned.into_inner())
374 }
375
376 fn read_store_deadline(&self, deadline: Option<Instant>) -> Result<RwLockReadGuard<'_, S>> {
377 let Some(deadline) = deadline else {
378 return Ok(self.read_store());
379 };
380
381 loop {
382 match self.store.try_read() {
383 Ok(guard) => return Ok(guard),
384 Err(TryLockError::Poisoned(poisoned)) => return Ok(poisoned.into_inner()),
385 Err(TryLockError::WouldBlock) if Instant::now() >= deadline => {
386 return Err(anyhow!("query deadline exceeded"));
387 }
388 Err(TryLockError::WouldBlock) => {
389 std::thread::sleep(Duration::from_millis(1));
390 }
391 }
392 }
393 }
394
395 fn write_store_deadline(&self, deadline: Option<Instant>) -> Result<RwLockWriteGuard<'_, S>> {
396 let Some(deadline) = deadline else {
397 return Ok(self.write_store());
398 };
399
400 loop {
401 match self.store.try_write() {
402 Ok(guard) => return Ok(guard),
403 Err(TryLockError::Poisoned(poisoned)) => return Ok(poisoned.into_inner()),
404 Err(TryLockError::WouldBlock) if Instant::now() >= deadline => {
405 return Err(anyhow!("query deadline exceeded"));
406 }
407 Err(TryLockError::WouldBlock) => {
408 std::thread::sleep(Duration::from_millis(1));
409 }
410 }
411 }
412 }
413
414 fn compile_document_against(&self, document: &Document, store: &S) -> Result<CompiledQuery> {
415 let resolved = {
416 let mut analyzer = Analyzer::new(store);
417 analyzer.analyze(document)?
418 };
419
420 Ok(Compiler::compile(&resolved))
421 }
422
423 /// Execute a query and return its result.
424 pub fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
425 self.execute_with_params(query, options, BTreeMap::new())
426 }
427
428 /// Execute a query with a cooperative deadline. The timeout is checked at
429 /// executor operator boundaries and hot scan loops; if it fires, the query
430 /// returns an error and any WAL-backed mutating query is aborted through
431 /// the existing failure path.
432 pub fn execute_with_timeout(
433 &self,
434 query: &str,
435 options: Option<ExecuteOptions>,
436 timeout: Duration,
437 ) -> Result<QueryResult> {
438 let deadline = Instant::now()
439 .checked_add(timeout)
440 .unwrap_or_else(Instant::now);
441 let rows =
442 self.execute_rows_with_params_deadline(query, BTreeMap::new(), Some(deadline))?;
443 Ok(project_rows(rows, options.unwrap_or_default()))
444 }
445
    /// Execute a query with bound parameters.
    ///
    /// `params` carries the query's named bind values; `options` controls
    /// final result-format projection (defaults when `None`).
    ///
    /// When a WAL is attached the call is bracketed by a transaction:
    ///
    /// 1. `recorder.arm()` after analyze + compile (so a parse /
    ///    semantic / compile error never opens a tx that has to be
    ///    immediately aborted). Arming is *cheap*: no record is
    ///    appended to the WAL yet, so a pure read query that
    ///    completes here pays nothing for the WAL hot path.
    /// 2. The executor runs; every primitive mutation fires
    ///    `MutationRecorder::record`, which buffers events in memory.
    /// 3. On Ok, `recorder.commit()` writes `TxBegin`, one batched
    ///    mutation record, and `TxCommit` only when mutations occurred;
    ///    the surrounding `recorder.flush()` runs only in that case so
    ///    a read-only query never pays an `fsync`.
    /// 4. On Err, `recorder.abort()` clears the pending batch. The
    ///    engine has no rollback, so the in-memory state may already
    ///    be partially mutated; the live handle is quarantined while
    ///    durable recovery stays atomic because no committed batch was
    ///    written.
    /// 5. The recorder's poisoned flag is polled once (it also
    ///    surfaces background-flusher fsync failures from
    ///    `SyncMode::Group`). If set, the query fails loudly with the
    ///    durability error so the caller can act on it; the WAL
    ///    refuses further appends until the operator restarts the
    ///    database, which recovers from the last consistent
    ///    snapshot + WAL.
    pub fn execute_with_params(
        &self,
        query: &str,
        options: Option<ExecuteOptions>,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryResult> {
        // `None` deadline: the query may block on the store locks
        // indefinitely.
        let rows = self.execute_rows_with_params_deadline(query, params, None)?;
        Ok(project_rows(rows, options.unwrap_or_default()))
    }
482
483 /// Execute a parameterised query with a cooperative deadline.
484 pub fn execute_with_params_timeout(
485 &self,
486 query: &str,
487 options: Option<ExecuteOptions>,
488 params: BTreeMap<String, LoraValue>,
489 timeout: Duration,
490 ) -> Result<QueryResult> {
491 let deadline = Instant::now()
492 .checked_add(timeout)
493 .unwrap_or_else(Instant::now);
494 let rows = self.execute_rows_with_params_deadline(query, params, Some(deadline))?;
495 Ok(project_rows(rows, options.unwrap_or_default()))
496 }
497
498 /// Execute a query and return hydrated rows before final result-format
499 /// projection.
500 pub fn execute_rows(&self, query: &str) -> Result<Vec<Row>> {
501 self.execute_rows_with_params(query, BTreeMap::new())
502 }
503
504 /// Execute a query with parameters and return hydrated rows before final
505 /// result-format projection.
506 pub fn execute_rows_with_params(
507 &self,
508 query: &str,
509 params: BTreeMap<String, LoraValue>,
510 ) -> Result<Vec<Row>> {
511 self.execute_rows_with_params_deadline(query, params, None)
512 }
513
    /// Core execution path: parse, classify, then run either the read-only
    /// or the mutating pipeline, with an optional cooperative `deadline`.
    ///
    /// Read-only plans execute under the shared read lock and return early.
    /// Mutating plans release the read lock, take the write lock, and
    /// re-compile against the (possibly changed) store before executing, so
    /// the plan that runs always matches the graph it mutates. When a WAL is
    /// attached the mutating path is bracketed by `arm`/`commit`/`abort` as
    /// documented on [`Self::execute_with_params`].
    fn execute_rows_with_params_deadline(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
        deadline: Option<Instant>,
    ) -> Result<Vec<Row>> {
        let document = self.parse(query)?;
        let shape = {
            let store = self.read_store_deadline(deadline)?;
            let compiled = self.compile_document_against(&document, &*store)?;

            if matches!(classify_stream(&compiled), StreamShape::ReadOnly) {
                // Even pure reads fail loudly if a prior query poisoned the
                // WAL — the live graph may have diverged from durable state.
                if let Some(rec) = &self.wal {
                    if let Some(reason) = rec.poisoned() {
                        return Err(anyhow!("WAL arm failed: WAL poisoned: {reason}"));
                    }
                }
                let executor = lora_executor::Executor::with_deadline(
                    lora_executor::ExecutionContext {
                        storage: &*store,
                        params,
                    },
                    deadline,
                );
                return executor
                    .execute_compiled_rows(&compiled)
                    .map_err(|e| anyhow!(e));
            }

            classify_stream(&compiled)
        };
        // The read guard is dropped here (end of the block above) before the
        // write lock is requested, so this method never holds both.

        debug_assert!(shape.is_mutating());

        let mut store = self.write_store_deadline(deadline)?;
        // The store may have changed between releasing the read lock and
        // acquiring the write lock, so compile again under the write lock.
        let compiled = self.compile_document_against(&document, &*store)?;

        if let Some(rec) = &self.wal {
            rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
        }

        // Immediately-invoked closure so `?` inside converts the executor
        // error while the WAL commit/abort decision below sees the result.
        let exec_result: Result<Vec<Row>> = (|| {
            let mut executor = MutableExecutor::with_deadline(
                MutableExecutionContext {
                    storage: &mut *store,
                    params,
                },
                deadline,
            );
            Ok(executor.execute_compiled_rows(&compiled)?)
        })();

        if let Some(rec) = &self.wal {
            match &exec_result {
                Ok(_) => match rec.commit() {
                    Ok(WroteCommit::Yes) => {
                        rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
                    }
                    Ok(WroteCommit::No) => {
                        // Read-only query: no records were written
                        // and there is nothing to fsync. Skip flush
                        // entirely so PerCommit pays zero fsyncs on
                        // pure reads.
                    }
                    Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
                },
                Err(_) => {
                    // Best-effort abort. If the WAL saw mutations, durable
                    // recovery will discard them but the live in-memory store
                    // may already be ahead of durable state. Quarantine this
                    // handle so callers restart instead of serving from a
                    // potentially divergent graph.
                    if matches!(rec.abort(), Ok(true)) {
                        rec.poison(
                            "query mutated the live graph before failing; restart from snapshot + WAL required",
                        );
                    }
                }
            }
            if let Some(reason) = rec.poisoned() {
                return Err(anyhow!("WAL poisoned: {reason}"));
            }
        }

        exec_result
    }
600
601 // ---------- Storage-agnostic utility helpers ----------
602 //
603 // Bindings previously reached into the shared store lock to answer
604 // stat / admin calls; these helpers let them depend on `Database<S>`
605 // instead, so swapping in a new backend only requires changing one type
606 // parameter.
607
    /// Drop every node and relationship, returning WAL/archive errors to the
    /// caller.
    ///
    /// When a WAL is attached, the clear is wrapped in `arm`/`commit` so the
    /// `MutationEvent::Clear` fired by the store reaches the log inside a
    /// transaction. If a failure happens after the in-memory graph has been
    /// cleared, the recorder is poisoned by the failing WAL path and future
    /// writes fail until the database is reopened from durable state.
    pub fn try_clear(&self) -> Result<()> {
        let mut guard = self.write_store();
        // No WAL: clear in memory and return — nothing to log or fsync.
        let Some(rec) = &self.wal else {
            guard.clear();
            return Ok(());
        };

        // Clear while the recorder is armed so the store's clear event lands
        // in the pending batch that `commit` writes below.
        rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
        guard.clear();
        match rec.commit() {
            Ok(WroteCommit::Yes) => {
                rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
            }
            Ok(WroteCommit::No) => {}
            Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
        }
        if let Some(reason) = rec.poisoned() {
            return Err(anyhow!("WAL poisoned: {reason}"));
        }
        Ok(())
    }
637
638 /// Drop every node and relationship.
639 ///
640 /// This compatibility helper keeps the historical infallible Rust API.
641 /// Bindings that can report errors should call [`Self::try_clear`].
642 pub fn clear(&self) {
643 let _ = self.try_clear();
644 }
645
646 /// Number of nodes currently in the graph.
647 pub fn node_count(&self) -> usize {
648 let guard = self.read_store();
649 guard.node_count()
650 }
651
652 /// Number of relationships currently in the graph.
653 pub fn relationship_count(&self) -> usize {
654 let guard = self.read_store();
655 guard.relationship_count()
656 }
657
658 /// Run a closure with a shared borrow of the underlying store. Used by
659 /// bindings to answer ad-hoc queries without locking the RwLock themselves.
660 pub fn with_store<R>(&self, f: impl FnOnce(&S) -> R) -> R {
661 let guard = self.read_store();
662 f(&*guard)
663 }
664
665 /// Run a closure with an exclusive borrow of the underlying store. Reserved
666 /// for admin paths (restore, bulk load); regular mutation goes through
667 /// `execute_with_params`.
668 pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
669 let mut guard = self.write_store();
670 f(&mut *guard)
671 }
672}
673
674// ---------------------------------------------------------------------------
675// Snapshot helpers
676//
677// A second impl block so the `Snapshotable` bound only constrains backends
678// that actually need it. `Database<InMemoryGraph>` picks these up
679// automatically; hypothetical backends that don't implement `Snapshotable`
680// still get the core query API above.
681// ---------------------------------------------------------------------------
682
683impl<S> Database<S>
684where
685 S: GraphStorage + GraphStorageMut + Snapshotable,
686{
    /// Serialize the current graph state to the given path. Writes are
    /// atomic: the payload goes to `<path>.tmp`, is `fsync`'d, and then
    /// renamed over the target; a torn write can never leave a half-written
    /// file at `path`. If any step before the rename fails, the stale
    /// `<path>.tmp` is removed so a crashed save never leaks scratch files.
    ///
    /// Holds a store read lock for the duration of the save so concurrent
    /// readers can proceed and writers wait behind a consistent snapshot.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from creating, writing, syncing, or renaming
    /// the snapshot file, or any error from the store's serializer.
    pub fn save_snapshot_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);

        // Acquire the lock once so the snapshot is point-in-time consistent.
        let guard = self.read_store();

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        // Arm cleanup immediately after `open` succeeds: every early return
        // below must either surface an error *and* unlink the tmp, or commit
        // the guard once the rename takes effect.
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);

        let meta = guard.save_snapshot(&mut writer)?;

        // Flush the BufWriter before fsync; otherwise we fsync an empty
        // underlying file.
        use std::io::Write;
        writer.flush()?;
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        file.sync_all()?;
        drop(file);

        std::fs::rename(&tmp, path)?;
        // The tmp path no longer has a file behind it — disarm the guard so
        // it doesn't try to remove the just-renamed target by name race.
        tmp_guard.commit();

        // Best-effort parent-dir fsync so the rename itself is durable on
        // power loss. Non-fatal if the parent can't be opened.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        Ok(meta)
    }
738
739 /// Replace the current graph state with a snapshot loaded from `path`.
740 /// Holds the store write lock for the duration of the load; concurrent
741 /// queries block until restore completes.
742 pub fn load_snapshot_from(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
743 let file = File::open(path.as_ref())?;
744 let reader = BufReader::new(file);
745
746 let mut guard = self.write_store();
747 Ok(guard.load_snapshot(reader)?)
748 }
749}
750
751impl Database<InMemoryGraph> {
752 /// Convenience constructor: open (or create) an empty in-memory database
753 /// and immediately restore it from `path`. Errors if the file cannot be
754 /// opened or the snapshot is malformed.
755 pub fn in_memory_from_snapshot(path: impl AsRef<Path>) -> Result<Self> {
756 let db = Self::in_memory();
757 db.load_snapshot_from(path)?;
758 Ok(db)
759 }
760
    /// Take a checkpoint: snapshot the current state with the WAL's
    /// `durable_lsn` stamped into the header, append a `Checkpoint`
    /// marker to the WAL, then drop sealed segments at or below the
    /// fence.
    ///
    /// Errors with "checkpoint requires WAL enabled" when called on a
    /// database constructed without a WAL — operators that just want
    /// a fence-less dump should use [`save_snapshot_to`] instead.
    ///
    /// The write-lock-held window covers snapshot serialization plus the
    /// checkpoint marker append. Truncation runs after the rename
    /// but still under the write lock; making it concurrent with queries
    /// is a v2 concern (see `docs/decisions/0004-wal.md`).
    pub fn checkpoint_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let recorder = self
            .wal
            .as_ref()
            .ok_or_else(|| anyhow!("checkpoint requires WAL enabled"))?;
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);

        // Exclusive lock: all queries block for the whole checkpoint.
        let guard = self.write_store();

        // Make every record appended so far durable, then capture
        // the LSN that becomes the snapshot fence.
        recorder
            .force_fsync()
            .map_err(|e| anyhow!("WAL fsync before checkpoint failed: {e}"))?;
        let snapshot_lsn = recorder.wal().durable_lsn();

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        // Same atomic tmp-write-then-rename dance as `save_snapshot_to`;
        // the guard unlinks the scratch file on any early return.
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);
        let meta = guard.save_checkpoint(&mut writer, snapshot_lsn.raw())?;

        use std::io::Write;
        writer.flush()?;
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        file.sync_all()?;
        drop(file);

        std::fs::rename(&tmp, path)?;
        tmp_guard.commit();

        // Best-effort parent-dir fsync so the rename is durable on power
        // loss; non-fatal if the parent can't be opened.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        // Append the checkpoint marker AFTER the rename succeeds —
        // this preserves the invariant that a `Checkpoint` record
        // in the WAL implies the snapshot it points at exists.
        recorder
            .checkpoint_marker(snapshot_lsn)
            .map_err(|e| anyhow!("WAL checkpoint marker failed: {e}"))?;
        recorder
            .force_fsync()
            .map_err(|e| anyhow!("WAL fsync after checkpoint marker failed: {e}"))?;

        // Best-effort segment truncation. Failure here doesn't undo
        // the checkpoint — the next call will retry.
        let _ = recorder.truncate_up_to(snapshot_lsn);

        Ok(meta)
    }
831}
832
/// Scratch path for atomic snapshot writes: `<target>.tmp`.
///
/// Appends to the full file name rather than swapping the extension, so
/// `db.snap` becomes `db.snap.tmp` and the final rename back over `target`
/// is unambiguous.
fn snapshot_tmp_path(target: &Path) -> PathBuf {
    let mut scratch = target.as_os_str().to_os_string();
    scratch.push(".tmp");
    scratch.into()
}
838
/// RAII handle that deletes its path on drop unless [`commit`] is called.
///
/// The snapshot save path creates `<target>.tmp` before the payload is
/// written; if anything between creation and the final rename fails (or the
/// thread unwinds), the guard's `Drop` unlinks the scratch file so a crashed
/// save leaves nothing behind on disk.
///
/// [`commit`]: Self::commit
struct TempFileGuard {
    // `Some` while cleanup is still armed; `None` once committed.
    path: Option<PathBuf>,
}

impl TempFileGuard {
    fn new(path: PathBuf) -> Self {
        TempFileGuard { path: Some(path) }
    }

    /// Disarm the guard. Call this once the tmp file's contents have been
    /// handed off (e.g. renamed to their final destination) so `Drop`
    /// leaves the path alone.
    fn commit(mut self) {
        self.path = None;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        let Some(path) = self.path.take() else { return };
        // Best-effort: a failed cleanup just leaks a scratch file that the
        // next save truncates and overwrites via `OpenOptions::truncate`.
        let _ = std::fs::remove_file(path);
    }
}
874
/// Storage-agnostic admin surface for HTTP / binding callers that want to
/// drive snapshot operations without naming the backend type parameter.
///
/// `Database<S>` picks up a blanket impl when `S: Snapshotable + 'static`.
/// Transports (e.g. `lora-server`) type-erase on `Arc<dyn SnapshotAdmin>`.
pub trait SnapshotAdmin: Send + Sync + 'static {
    /// Write a snapshot of the store to `path` (the blanket impl forwards
    /// to `Database::save_snapshot_to`), returning the snapshot's metadata.
    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
    /// Load the snapshot at `path` into the database (the blanket impl
    /// forwards to `Database::load_snapshot_from`), returning its metadata.
    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
}
884
885impl<S> SnapshotAdmin for Database<S>
886where
887 S: GraphStorage + GraphStorageMut + Snapshotable + Send + Sync + 'static,
888{
889 fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
890 self.save_snapshot_to(path)
891 }
892
893 fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
894 self.load_snapshot_from(path)
895 }
896}
897
/// Storage-agnostic admin surface for the WAL.
///
/// `Database<InMemoryGraph>` picks up the blanket impl below when a
/// WAL is attached. Transports (e.g. `lora-server`) type-erase on
/// `Arc<dyn WalAdmin>` so they don't need to name the backend type
/// parameter.
///
/// All LSNs cross the trait boundary as raw `u64` so callers don't
/// need a dependency on `lora-wal`.
pub trait WalAdmin: Send + Sync + 'static {
    /// Take a checkpoint at `path`. The snapshot's header is stamped
    /// with the WAL's `durable_lsn`; older sealed segments are then
    /// dropped.
    ///
    /// NOTE(review): presumably also errors when no WAL is attached, like
    /// the other methods — confirm in `Database::checkpoint_to`.
    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta>;

    /// Snapshot of the WAL's current state — durable / next LSN,
    /// active / oldest segment id. Cheap; a single WAL mutex acquisition.
    ///
    /// Errors with "WAL not enabled" when the database has no WAL attached.
    fn wal_status(&self) -> Result<WalStatus>;

    /// Drop sealed segments at or below `fence_lsn`. Idempotent.
    ///
    /// Errors with "WAL not enabled" when the database has no WAL attached.
    fn wal_truncate(&self, fence_lsn: u64) -> Result<()>;
}
920
/// Snapshot of WAL state returned by [`WalAdmin::wal_status`].
///
/// `bg_failure` is the latched fsync error from the background flusher
/// (only meaningful under `SyncMode::Group`). When `Some`, the WAL is
/// poisoned and every subsequent commit will fail loudly until the
/// operator restarts from the last consistent snapshot + WAL.
#[derive(Debug, Clone)]
pub struct WalStatus {
    /// Raw `u64` of the WAL's durable LSN at status time.
    pub durable_lsn: u64,
    /// Raw `u64` of the WAL's next (yet-to-be-assigned) LSN at status time.
    pub next_lsn: u64,
    /// The WAL's active segment id as reported by the WAL.
    pub active_segment_id: u64,
    /// The WAL's oldest segment id as reported by the WAL.
    pub oldest_segment_id: u64,
    /// Latched background-flusher failure message, if any (see above).
    pub bg_failure: Option<String>,
}
935
936impl WalAdmin for Database<InMemoryGraph> {
937 fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta> {
938 self.checkpoint_to(path)
939 }
940
941 fn wal_status(&self) -> Result<WalStatus> {
942 let recorder = self
943 .wal
944 .as_ref()
945 .ok_or_else(|| anyhow!("WAL not enabled"))?;
946 let wal = recorder.wal();
947 Ok(WalStatus {
948 durable_lsn: wal.durable_lsn().raw(),
949 next_lsn: wal.next_lsn().raw(),
950 active_segment_id: wal.active_segment_id(),
951 oldest_segment_id: wal.oldest_segment_id(),
952 bg_failure: wal.bg_failure(),
953 })
954 }
955
956 fn wal_truncate(&self, fence_lsn: u64) -> Result<()> {
957 let recorder = self
958 .wal
959 .as_ref()
960 .ok_or_else(|| anyhow!("WAL not enabled"))?;
961 recorder.truncate_up_to(Lsn::new(fence_lsn))?;
962 Ok(())
963 }
964}
965
impl<S> QueryRunner for Database<S>
where
    S: GraphStorage + GraphStorageMut + Send + Sync + 'static,
{
    /// Trait-object entry point: forwards to the inherent
    /// `Database::execute`. The fully-qualified call makes the delegation
    /// target explicit and guards against this trait method accidentally
    /// recursing into itself.
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
        Database::execute(self, query, options)
    }
}
974
975// ---------------------------------------------------------------------------
976// Replay
977// ---------------------------------------------------------------------------
978
979/// Apply a `MutationEvent` stream to an in-memory graph by dispatching
980/// each variant to the matching store operation.
981///
982/// Creation events are replayed through id-preserving paths, not the
983/// normal allocator-backed mutation methods. That matters after aborted
984/// transactions: an aborted create can consume id `N` in the original
985/// process, be dropped by replay, and leave the next committed create at
986/// id `N + 1`. Reusing the regular allocator would shift ids downward.
987///
988/// Replay must be invoked **before** the `WalRecorder` is installed
989/// on the graph. Otherwise the replay's own mutations would fire the
990/// recorder and re-write the same events to the WAL, doubling them on
991/// the next recovery.
992fn replay_into(graph: &mut InMemoryGraph, events: Vec<MutationEvent>) -> Result<()> {
993 for (idx, event) in events.into_iter().enumerate() {
994 match event {
995 MutationEvent::CreateNode {
996 id,
997 labels,
998 properties,
999 } => {
1000 graph
1001 .replay_create_node(id, labels, properties)
1002 .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
1003 }
1004 MutationEvent::CreateRelationship {
1005 id,
1006 src,
1007 dst,
1008 rel_type,
1009 properties,
1010 } => {
1011 graph
1012 .replay_create_relationship(id, src, dst, &rel_type, properties)
1013 .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
1014 }
1015 MutationEvent::SetNodeProperty {
1016 node_id,
1017 key,
1018 value,
1019 } => {
1020 if !graph.set_node_property(node_id, key, value) {
1021 return Err(anyhow!(
1022 "WAL replay failed at event {idx}: missing node {node_id} for property set"
1023 ));
1024 }
1025 }
1026 MutationEvent::RemoveNodeProperty { node_id, key } => {
1027 if !graph.remove_node_property(node_id, &key) {
1028 return Err(anyhow!(
1029 "WAL replay failed at event {idx}: missing node {node_id} for property removal"
1030 ));
1031 }
1032 }
1033 MutationEvent::AddNodeLabel { node_id, label } => {
1034 if !graph.add_node_label(node_id, &label) {
1035 return Err(anyhow!(
1036 "WAL replay failed at event {idx}: missing node {node_id} for label add"
1037 ));
1038 }
1039 }
1040 MutationEvent::RemoveNodeLabel { node_id, label } => {
1041 if !graph.remove_node_label(node_id, &label) {
1042 return Err(anyhow!(
1043 "WAL replay failed at event {idx}: missing node {node_id} for label removal"
1044 ));
1045 }
1046 }
1047 MutationEvent::SetRelationshipProperty { rel_id, key, value } => {
1048 if !graph.set_relationship_property(rel_id, key, value) {
1049 return Err(anyhow!(
1050 "WAL replay failed at event {idx}: missing relationship {rel_id} for property set"
1051 ));
1052 }
1053 }
1054 MutationEvent::RemoveRelationshipProperty { rel_id, key } => {
1055 if !graph.remove_relationship_property(rel_id, &key) {
1056 return Err(anyhow!(
1057 "WAL replay failed at event {idx}: missing relationship {rel_id} for property removal"
1058 ));
1059 }
1060 }
1061 MutationEvent::DeleteRelationship { rel_id } => {
1062 if !graph.delete_relationship(rel_id) {
1063 return Err(anyhow!(
1064 "WAL replay failed at event {idx}: missing relationship {rel_id} for delete"
1065 ));
1066 }
1067 }
1068 MutationEvent::DeleteNode { node_id } => {
1069 if !graph.delete_node(node_id) {
1070 return Err(anyhow!(
1071 "WAL replay failed at event {idx}: missing or attached node {node_id} for delete"
1072 ));
1073 }
1074 }
1075 MutationEvent::DetachDeleteNode { node_id } => {
1076 // After the cascading DeleteRelationship +
1077 // DeleteNode events have already replayed, the node
1078 // is gone and this becomes a no-op. Calling it
1079 // anyway is harmless.
1080 graph.detach_delete_node(node_id);
1081 }
1082 MutationEvent::Clear => {
1083 graph.clear();
1084 }
1085 }
1086 }
1087 Ok(())
1088}