// lora_database/database.rs
1use std::collections::BTreeMap;
2use std::fs::{File, OpenOptions};
3use std::io::{BufReader, BufWriter};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError};
6use std::time::{Duration, Instant};
7
8use anyhow::{anyhow, Result};
9use lora_analyzer::Analyzer;
10use lora_ast::Document;
11use lora_compiler::{CompiledQuery, Compiler};
12use lora_executor::{
13 classify_stream, compiled_result_columns, project_rows, ExecuteOptions, LoraValue,
14 MutableExecutionContext, MutableExecutor, QueryResult, Row, StreamShape,
15};
16use lora_parser::parse_query;
17use lora_store::{
18 GraphStorage, GraphStorageMut, InMemoryGraph, MutationEvent, MutationRecorder, SnapshotMeta,
19 Snapshotable,
20};
21use lora_wal::{replay_dir, Lsn, Wal, WalConfig, WalMirror, WalRecorder, WroteCommit};
22
23use crate::archive::WalArchive;
24use crate::named::{DatabaseName, DatabaseOpenOptions};
25use crate::stream::{AutoCommitGuard, LiveCursor, QueryStream};
26use crate::transaction::{LiveStoreGuard, Transaction, TransactionMode};
27
/// Minimal abstraction any transport can depend on to run Lora queries.
///
/// Transports can type-erase on `dyn QueryRunner` instead of naming the
/// database's backend type parameter.
pub trait QueryRunner: Send + Sync + 'static {
    /// Execute `query` and return the materialized result; `options`
    /// shapes the final projection (defaults apply when `None`).
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult>;
}
32
/// Owns the graph store and orchestrates parse → analyze → compile → execute.
///
/// Optionally drives a write-ahead log: when constructed via
/// [`Database::open_with_wal`] or [`Database::recover`] the database
/// holds an [`Arc<WalRecorder>`] that brackets every query with
/// `begin → mutations → commit/abort → flush` while the store write
/// lock is held, so the WAL order is exactly the in-memory commit order.
/// When constructed via [`Database::in_memory`] / [`Database::from_graph`]
/// the WAL handle is `None` and the engine pays only the existing
/// `MutationRecorder::record` null-pointer check per mutation.
pub struct Database<S> {
    /// Shared, lock-guarded graph store. Cloned `Arc`s let streams and
    /// cursors hold the same graph alive beyond a single call.
    pub(crate) store: Arc<RwLock<S>>,
    /// WAL recorder when durability is enabled; `None` for purely
    /// in-memory databases.
    pub(crate) wal: Option<Arc<WalRecorder>>,
}
47
impl Database<InMemoryGraph> {
    /// Convenience constructor: a fresh, empty in-memory graph database.
    ///
    /// No WAL is attached, so mutations are not durable across restarts.
    pub fn in_memory() -> Self {
        Self::from_graph(InMemoryGraph::new())
    }

    /// Open or create a WAL-enabled in-memory database from a fresh
    /// graph.
    ///
    /// `WalConfig::Disabled` falls back to [`Database::in_memory`].
    /// Otherwise, opens the WAL directory, replays any committed
    /// events into a fresh graph, installs a [`WalRecorder`] on the
    /// graph, and returns a database ready to serve queries.
    ///
    /// To restore from a snapshot in addition to the WAL, use
    /// [`Database::recover`] instead.
    pub fn open_with_wal(wal_config: WalConfig) -> Result<Self> {
        match wal_config {
            WalConfig::Disabled => Ok(Self::in_memory()),
            WalConfig::Enabled {
                dir,
                sync_mode,
                segment_target_bytes,
            } => {
                let mut graph = InMemoryGraph::new();
                // Replay fence is zero: with no snapshot, every committed
                // WAL record must be re-applied to rebuild state.
                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, Lsn::ZERO)?;
                replay_into(&mut graph, events)?;
                let recorder = Arc::new(WalRecorder::new(wal));
                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
                Ok(Self {
                    store: Arc::new(RwLock::new(graph)),
                    wal: Some(recorder),
                })
            }
        }
    }

    /// Open or create a named portable database rooted under
    /// `options.database_dir`.
    ///
    /// The database name is a logical identifier, not a path. It must contain
    /// only ASCII letters, digits, `_`, `-`, and `.`, and is resolved to
    /// `<database_dir>/<database_name>.lora` before the WAL backend opens.
    pub fn open_named(
        database_name: impl AsRef<str>,
        options: DatabaseOpenOptions,
    ) -> Result<Self> {
        let name = DatabaseName::parse(database_name.as_ref())?;
        let archive = Arc::new(WalArchive::open(
            options.database_path_for(&name),
            options.max_database_bytes,
        )?);
        let mut graph = InMemoryGraph::new();
        // The WAL lives inside the archive's working directory so the
        // archive can mirror segment writes into the portable file.
        let (wal, events) = Wal::open(
            archive.work_dir(),
            options.sync_mode,
            options.segment_target_bytes,
            Lsn::ZERO,
        )?;
        replay_into(&mut graph, events)?;
        let mirror: Arc<dyn WalMirror> = archive;
        let recorder = Arc::new(WalRecorder::new_with_mirror(wal, Some(mirror)));
        graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
        // Mark the archive dirty so a fresh named database is materialized as
        // a portable ZIP. The archive writer coalesces this with any immediate
        // follow-up writes and flushes it in the background, with a final flush
        // on database drop.
        recorder
            .flush()
            .map_err(|e| anyhow!("initial database archive persist failed: {e}"))?;
        Ok(Self {
            store: Arc::new(RwLock::new(graph)),
            wal: Some(recorder),
        })
    }

    /// Start an explicit transaction.
    ///
    /// Read-only transactions hold a shared read lock for their
    /// lifetime; read-write transactions hold the write lock. The
    /// staging clone is **lazy** — it only happens when a
    /// [`TransactionMode::ReadWrite`] transaction sees its first
    /// mutating statement. Materialized read-only statements run
    /// straight against the live graph; tx-bound streams may still
    /// clone so their cursors can own a stable view. ReadWrite
    /// transactions that perform only materialized reads (or commit
    /// empty) pay nothing for staging.
    pub fn begin_transaction(&self, mode: TransactionMode) -> Result<Transaction<'_>> {
        let live = match mode {
            TransactionMode::ReadOnly => LiveStoreGuard::Read(self.read_store()),
            TransactionMode::ReadWrite => LiveStoreGuard::Write(self.write_store()),
        };
        Ok(Transaction::new(live, self.wal.clone(), mode))
    }

    /// Restore from a snapshot file then replay any WAL records past
    /// it.
    ///
    /// The snapshot's `wal_lsn` (when set) becomes the replay fence —
    /// events at or below that LSN are already represented in the
    /// loaded snapshot and are skipped. A missing snapshot file is
    /// treated as "fresh start" so operators can pass the same path
    /// on every boot.
    ///
    /// If the WAL contains a checkpoint marker newer than the
    /// snapshot's `wal_lsn`, a one-line warning is printed to stderr
    /// — the snapshot is stale relative to a more recent checkpoint
    /// the operator is presumably aware of. Recovery still proceeds
    /// from the snapshot's fence (replay re-applies every record
    /// above it, which is conservative-correct); a tighter contract
    /// is deferred to v2 because verifying that the marker's
    /// snapshot file actually exists and is loadable is a separate
    /// observability concern.
    pub fn recover(snapshot_path: impl AsRef<Path>, wal_config: WalConfig) -> Result<Self> {
        let snapshot_path = snapshot_path.as_ref();
        let mut graph = InMemoryGraph::new();
        let snapshot_lsn = match File::open(snapshot_path) {
            Ok(f) => {
                let reader = BufReader::new(f);
                let meta = graph.load_snapshot(reader)?;
                // Older snapshots without an LSN stamp replay the
                // whole WAL (fence zero) — conservative but correct.
                meta.wal_lsn.map(Lsn::new).unwrap_or(Lsn::ZERO)
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Lsn::ZERO,
            Err(e) => return Err(e.into()),
        };

        match wal_config {
            WalConfig::Disabled => Ok(Self::from_graph(graph)),
            WalConfig::Enabled {
                dir,
                sync_mode,
                segment_target_bytes,
            } => {
                // Diagnostic peek at the WAL's newest checkpoint
                // marker so we can warn the operator about a stale
                // snapshot before we start replaying. Treat any error
                // as "no marker" — the subsequent `Wal::open` will
                // surface the real failure if there is one.
                if dir.exists() {
                    if let Ok(outcome) = replay_dir(&dir, Lsn::ZERO) {
                        if let Some(marker) = outcome.checkpoint_lsn_observed {
                            if marker > snapshot_lsn {
                                eprintln!(
                                    "lora-wal: snapshot at LSN {} is older than the newest \
                                     checkpoint marker on disk (LSN {}). Replaying every WAL \
                                     record above LSN {}; consider passing the more recent \
                                     snapshot to --restore-from.",
                                    snapshot_lsn.raw(),
                                    marker.raw(),
                                    snapshot_lsn.raw()
                                );
                            }
                        }
                    }
                }

                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, snapshot_lsn)?;
                replay_into(&mut graph, events)?;
                let recorder = Arc::new(WalRecorder::new(wal));
                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
                Ok(Self {
                    store: Arc::new(RwLock::new(graph)),
                    wal: Some(recorder),
                })
            }
        }
    }

    /// Execute a query and return an owning row stream.
    pub fn stream(&self, query: &str) -> Result<QueryStream<'_>> {
        self.stream_with_params(query, BTreeMap::new())
    }

    /// Execute a parameterised query and return an owning row stream.
    ///
    /// The compiled plan is classified at open time. Read-only
    /// queries run directly off the live store and yield a
    /// buffered cursor with plan-derived columns. Mutating queries
    /// are routed through a hidden read-write [`Transaction`]:
    /// full cursor exhaustion calls `tx.commit` (publishing staged
    /// changes and replaying the tx-local WAL buffer); a premature
    /// drop or any error from `next_row` calls `tx.rollback` so
    /// the live store and the WAL stay untouched.
    pub fn stream_with_params(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryStream<'_>> {
        // Classify by compiling once against the live store. The
        // mutating branch then re-compiles inside the hidden
        // transaction (against a staged graph that's identical to
        // live at clone time, so the second plan matches the
        // first); the cost is one extra parse+analyze+compile per
        // mutating stream, paid in exchange for a tiny
        // classify-stream surface.
        let document = parse_query(query)?;
        let store_guard = self.read_store();
        let resolved = {
            let mut analyzer = Analyzer::new(&*store_guard);
            analyzer.analyze(&document)?
        };
        let compiled = Compiler::compile(&resolved);
        let columns = compiled_result_columns(&compiled);
        let shape = classify_stream(&compiled);
        // Release the analyzer's lock before either branch
        // re-acquires (read-only path keeps it; mutating path
        // delegates to begin_transaction which takes its own).
        drop(store_guard);

        match shape {
            StreamShape::ReadOnly => {
                // True pull-shaped streaming. `LiveCursor` holds
                // the live store lock and the cursor that
                // borrows from it; its `Drop` releases them in
                // the right order so the caller observes pure
                // pull semantics with no intermediate
                // materialization.
                let live = LiveCursor::open(self.store.clone(), compiled, params)?;
                Ok(QueryStream::live(live, columns))
            }
            StreamShape::Mutating => {
                // Hidden auto-commit transaction. The transaction
                // owns staging, the buffering recorder, savepoint
                // management, and the WAL replay-on-commit logic;
                // we just pick commit-on-exhaustion vs
                // rollback-on-drop based on cursor state.
                //
                // The cursor returned by `open_streaming_compiled_autocommit`
                // may be a real per-row `StreamingWriteCursor`,
                // a mutable UNION cursor, or a buffered leaf for
                // operators that still need full materialization.
                // Either way the AutoCommit guard's
                // drop/exhaustion semantics are identical. The
                // compiled plan is wrapped in an `Arc` so the
                // cursor's `'static` borrows into it remain valid
                // for the cursor's lifetime.
                let mut tx = self.begin_transaction(TransactionMode::ReadWrite)?;
                let compiled_arc = Arc::new(compiled);
                let cursor =
                    match tx.open_streaming_compiled_autocommit(compiled_arc.clone(), params) {
                        Ok(c) => c,
                        Err(err) => {
                            // Tx rolls back implicitly on drop here.
                            return Err(err);
                        }
                    };
                let guard = AutoCommitGuard {
                    tx: Some(tx),
                    finalized: false,
                };
                Ok(QueryStream::auto_commit(cursor, columns, guard))
            }
        }
    }

    /// Open a stream whose lifetime can be carried by an outer owner that
    /// also retains an `Arc<Database>`.
    ///
    /// # Safety
    ///
    /// The returned stream may contain lock guards that borrow from the
    /// database's internal `RwLock`. The caller must keep this exact `Arc`
    /// alive until the stream is dropped. This is intended for language
    /// bindings that store both the `Arc<Database>` and the `QueryStream` in
    /// the same opaque stream handle.
    pub unsafe fn stream_with_params_owned(
        self: &Arc<Self>,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryStream<'static>> {
        let stream = self.stream_with_params(query, params)?;
        // SAFETY: per this function's contract the caller keeps the
        // owning `Arc<Database>` alive until the stream is dropped, so
        // any guards borrowing the database's `RwLock` never dangle
        // despite the `'static` lifetime stamped on here.
        Ok(std::mem::transmute::<QueryStream<'_>, QueryStream<'static>>(stream))
    }
}
322
impl<S> Database<S>
where
    S: GraphStorage + GraphStorageMut,
{
    /// Build a database from a pre-wrapped, shared store.
    ///
    /// No WAL is attached; use the `InMemoryGraph`-specific
    /// constructors when durability is required.
    pub fn new(store: Arc<RwLock<S>>) -> Self {
        Self { store, wal: None }
    }

    /// Build a database by taking ownership of a bare graph store.
    pub fn from_graph(graph: S) -> Self {
        Self::new(Arc::new(RwLock::new(graph)))
    }

    /// Handle to the installed WAL recorder, if any. Exposed for
    /// admin paths (checkpoint, truncate, observability) that need
    /// to drive the WAL outside the standard query lifecycle.
    pub fn wal(&self) -> Option<&Arc<WalRecorder>> {
        self.wal.as_ref()
    }

    /// Handle to the underlying shared store — useful for callers that need
    /// to snapshot or share the graph across multiple databases.
    pub fn store(&self) -> &Arc<RwLock<S>> {
        &self.store
    }

    /// Parse a query string into an AST without executing it.
    pub fn parse(&self, query: &str) -> Result<Document> {
        Ok(parse_query(query)?)
    }

    /// Shared read guard over the store.
    ///
    /// A poisoned lock is adopted via `into_inner` rather than
    /// propagated. NOTE(review): this assumes a panicking lock holder
    /// cannot leave the graph observably half-mutated — confirm the
    /// store's panic-safety before relying on it.
    pub(crate) fn read_store(&self) -> RwLockReadGuard<'_, S> {
        self.store
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Exclusive write guard over the store (poison-adopting; same
    /// caveat as [`Self::read_store`]).
    pub(crate) fn write_store(&self) -> RwLockWriteGuard<'_, S> {
        self.store
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Like [`Self::read_store`] but gives up once `deadline` passes.
    /// `None` degrades to an unbounded blocking acquire.
    fn read_store_deadline(&self, deadline: Option<Instant>) -> Result<RwLockReadGuard<'_, S>> {
        let Some(deadline) = deadline else {
            return Ok(self.read_store());
        };

        loop {
            match self.store.try_read() {
                Ok(guard) => return Ok(guard),
                Err(TryLockError::Poisoned(poisoned)) => return Ok(poisoned.into_inner()),
                Err(TryLockError::WouldBlock) if Instant::now() >= deadline => {
                    return Err(anyhow!("query deadline exceeded"));
                }
                Err(TryLockError::WouldBlock) => {
                    // 1 ms poll keeps the spin cheap without adding
                    // meaningful latency at query timescales.
                    std::thread::sleep(Duration::from_millis(1));
                }
            }
        }
    }

    /// Like [`Self::write_store`] but gives up once `deadline` passes.
    /// `None` degrades to an unbounded blocking acquire.
    fn write_store_deadline(&self, deadline: Option<Instant>) -> Result<RwLockWriteGuard<'_, S>> {
        let Some(deadline) = deadline else {
            return Ok(self.write_store());
        };

        loop {
            match self.store.try_write() {
                Ok(guard) => return Ok(guard),
                Err(TryLockError::Poisoned(poisoned)) => return Ok(poisoned.into_inner()),
                Err(TryLockError::WouldBlock) if Instant::now() >= deadline => {
                    return Err(anyhow!("query deadline exceeded"));
                }
                Err(TryLockError::WouldBlock) => {
                    std::thread::sleep(Duration::from_millis(1));
                }
            }
        }
    }

    /// Analyze + compile `document` against `store` (the caller holds
    /// whatever lock makes `store` stable for the duration).
    fn compile_document_against(&self, document: &Document, store: &S) -> Result<CompiledQuery> {
        let resolved = {
            let mut analyzer = Analyzer::new(store);
            analyzer.analyze(document)?
        };

        Ok(Compiler::compile(&resolved))
    }

    /// Execute a query and return its result.
    pub fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
        self.execute_with_params(query, options, BTreeMap::new())
    }

    /// Execute a query with a cooperative deadline. The timeout is checked at
    /// executor operator boundaries and hot scan loops; if it fires, the query
    /// returns an error and any WAL-backed mutating query is aborted through
    /// the existing failure path.
    pub fn execute_with_timeout(
        &self,
        query: &str,
        options: Option<ExecuteOptions>,
        timeout: Duration,
    ) -> Result<QueryResult> {
        // A timeout so large it overflows `Instant` arithmetic
        // degrades to "now", i.e. an immediately-expiring deadline.
        let deadline = Instant::now()
            .checked_add(timeout)
            .unwrap_or_else(Instant::now);
        let rows =
            self.execute_rows_with_params_deadline(query, BTreeMap::new(), Some(deadline))?;
        Ok(project_rows(rows, options.unwrap_or_default()))
    }

    /// Execute a query with bound parameters.
    ///
    /// When a WAL is attached the call is bracketed by a transaction:
    ///
    /// 1. `recorder.arm()` after analyze + compile (so a parse /
    ///    semantic / compile error never opens a tx that has to be
    ///    immediately aborted). Arming is *cheap*: no record is
    ///    appended to the WAL yet, so a pure read query that
    ///    completes here pays nothing for the WAL hot path.
    /// 2. The executor runs; every primitive mutation fires
    ///    `MutationRecorder::record`, which buffers events in memory.
    /// 3. On Ok, `recorder.commit()` writes `TxBegin`, one batched
    ///    mutation record, and `TxCommit` only when mutations occurred;
    ///    the surrounding `recorder.flush()` runs only in that case so
    ///    a read-only query never pays an `fsync`.
    /// 4. On Err, `recorder.abort()` clears the pending batch. The
    ///    engine has no rollback, so the in-memory state may already
    ///    be partially mutated; the live handle is quarantined while
    ///    durable recovery stays atomic because no committed batch was
    ///    written.
    /// 5. The recorder's poisoned flag is polled once (it also
    ///    surfaces background-flusher fsync failures from
    ///    `SyncMode::Group`). If set, the query fails loudly with the
    ///    durability error so the caller can act on it; the WAL
    ///    refuses further appends until the operator restarts the
    ///    database, which recovers from the last consistent
    ///    snapshot + WAL.
    pub fn execute_with_params(
        &self,
        query: &str,
        options: Option<ExecuteOptions>,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryResult> {
        let rows = self.execute_rows_with_params_deadline(query, params, None)?;
        Ok(project_rows(rows, options.unwrap_or_default()))
    }

    /// Execute a parameterised query with a cooperative deadline.
    pub fn execute_with_params_timeout(
        &self,
        query: &str,
        options: Option<ExecuteOptions>,
        params: BTreeMap<String, LoraValue>,
        timeout: Duration,
    ) -> Result<QueryResult> {
        let deadline = Instant::now()
            .checked_add(timeout)
            .unwrap_or_else(Instant::now);
        let rows = self.execute_rows_with_params_deadline(query, params, Some(deadline))?;
        Ok(project_rows(rows, options.unwrap_or_default()))
    }

    /// Execute a query and return hydrated rows before final result-format
    /// projection.
    pub fn execute_rows(&self, query: &str) -> Result<Vec<Row>> {
        self.execute_rows_with_params(query, BTreeMap::new())
    }

    /// Execute a query with parameters and return hydrated rows before final
    /// result-format projection.
    pub fn execute_rows_with_params(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<Vec<Row>> {
        self.execute_rows_with_params_deadline(query, params, None)
    }

    /// Shared execution core: classify under the read lock; run
    /// read-only plans there and return early; otherwise upgrade to
    /// the write lock, re-compile, and bracket execution with the WAL
    /// (see [`Self::execute_with_params`] for the full protocol).
    fn execute_rows_with_params_deadline(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
        deadline: Option<Instant>,
    ) -> Result<Vec<Row>> {
        let document = self.parse(query)?;
        let shape = {
            let store = self.read_store_deadline(deadline)?;
            let compiled = self.compile_document_against(&document, &*store)?;

            if matches!(classify_stream(&compiled), StreamShape::ReadOnly) {
                // Even pure reads fail fast when the WAL is poisoned:
                // the handle is quarantined until restart.
                if let Some(rec) = &self.wal {
                    if let Some(reason) = rec.poisoned() {
                        return Err(anyhow!("WAL arm failed: WAL poisoned: {reason}"));
                    }
                }
                let executor = lora_executor::Executor::with_deadline(
                    lora_executor::ExecutionContext {
                        storage: &*store,
                        params,
                    },
                    deadline,
                );
                return executor
                    .execute_compiled_rows(&compiled)
                    .map_err(|e| anyhow!(e));
            }

            classify_stream(&compiled)
        };

        debug_assert!(shape.is_mutating());

        // Mutating path: drop the read lock (end of the block above),
        // take the write lock, and re-compile against it — the second
        // compile keeps the plan's borrows tied to the write guard.
        let mut store = self.write_store_deadline(deadline)?;
        let compiled = self.compile_document_against(&document, &*store)?;

        if let Some(rec) = &self.wal {
            rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
        }

        let exec_result: Result<Vec<Row>> = (|| {
            let mut executor = MutableExecutor::with_deadline(
                MutableExecutionContext {
                    storage: &mut *store,
                    params,
                },
                deadline,
            );
            Ok(executor.execute_compiled_rows(&compiled)?)
        })();

        if let Some(rec) = &self.wal {
            match &exec_result {
                Ok(_) => match rec.commit() {
                    Ok(WroteCommit::Yes) => {
                        rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
                    }
                    Ok(WroteCommit::No) => {
                        // Read-only query: no records were written
                        // and there is nothing to fsync. Skip flush
                        // entirely so PerCommit pays zero fsyncs on
                        // pure reads.
                    }
                    Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
                },
                Err(_) => {
                    // Best-effort abort. If the WAL saw mutations, durable
                    // recovery will discard them but the live in-memory store
                    // may already be ahead of durable state. Quarantine this
                    // handle so callers restart instead of serving from a
                    // potentially divergent graph.
                    if matches!(rec.abort(), Ok(true)) {
                        rec.poison(
                            "query mutated the live graph before failing; restart from snapshot + WAL required",
                        );
                    }
                }
            }
            if let Some(reason) = rec.poisoned() {
                return Err(anyhow!("WAL poisoned: {reason}"));
            }
        }

        exec_result
    }

    // ---------- Storage-agnostic utility helpers ----------
    //
    // Bindings previously reached into the shared store lock to answer
    // stat / admin calls; these helpers let them depend on `Database<S>`
    // instead, so swapping in a new backend only requires changing one type
    // parameter.

    /// Drop every node and relationship.
    ///
    /// When a WAL is attached, `clear()` is wrapped in `arm`/`commit`
    /// so the `MutationEvent::Clear` fired by the store reaches the
    /// log inside a transaction (without arming, the recorder would
    /// poison itself on the first event). WAL failures here are
    /// best-effort: the in-memory state is still cleared so the
    /// caller's contract holds, but the recorder's poisoned flag
    /// will surface to the next query.
    pub fn clear(&self) {
        let mut guard = self.write_store();
        match &self.wal {
            None => guard.clear(),
            Some(rec) => {
                let armed = rec.arm();
                guard.clear();
                if armed.is_ok() {
                    // `clear()` always emits a `MutationEvent::Clear`,
                    // so commit returns `WroteCommit::Yes` and we
                    // flush. If that order ever changes, the worst
                    // case is one redundant flush call.
                    let _ = rec.commit();
                    let _ = rec.flush();
                }
            }
        }
    }

    /// Number of nodes currently in the graph.
    pub fn node_count(&self) -> usize {
        let guard = self.read_store();
        guard.node_count()
    }

    /// Number of relationships currently in the graph.
    pub fn relationship_count(&self) -> usize {
        let guard = self.read_store();
        guard.relationship_count()
    }

    /// Run a closure with a shared borrow of the underlying store. Used by
    /// bindings to answer ad-hoc queries without locking the RwLock themselves.
    pub fn with_store<R>(&self, f: impl FnOnce(&S) -> R) -> R {
        let guard = self.read_store();
        f(&*guard)
    }

    /// Run a closure with an exclusive borrow of the underlying store. Reserved
    /// for admin paths (restore, bulk load); regular mutation goes through
    /// `execute_with_params`.
    ///
    /// NOTE(review): mutations made here bypass the WAL bracket —
    /// callers are responsible for durability of bulk loads.
    pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
        let mut guard = self.write_store();
        f(&mut *guard)
    }
}
654
655// ---------------------------------------------------------------------------
656// Snapshot helpers
657//
658// A second impl block so the `Snapshotable` bound only constrains backends
659// that actually need it. `Database<InMemoryGraph>` picks these up
660// automatically; hypothetical backends that don't implement `Snapshotable`
661// still get the core query API above.
662// ---------------------------------------------------------------------------
663
impl<S> Database<S>
where
    S: GraphStorage + GraphStorageMut + Snapshotable,
{
    /// Serialize the current graph state to the given path. Writes are
    /// atomic: the payload goes to `<path>.tmp`, is `fsync`'d, and then
    /// renamed over the target; a torn write can never leave a half-written
    /// file at `path`. If any step before the rename fails, the stale
    /// `<path>.tmp` is removed so a crashed save never leaks scratch files.
    ///
    /// Holds a store read lock for the duration of the save so concurrent
    /// readers can proceed and writers wait behind a consistent snapshot.
    pub fn save_snapshot_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);

        // Acquire the lock once so the snapshot is point-in-time consistent.
        let guard = self.read_store();

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        // Arm cleanup immediately after `open` succeeds: every early return
        // below must either surface an error *and* unlink the tmp, or commit
        // the guard once the rename takes effect.
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);

        let meta = guard.save_snapshot(&mut writer)?;

        // Flush the buffered payload before fsync. (`into_inner` also
        // flushes, but the explicit call keeps a flush failure on the
        // plain `?` path with a plain `io::Error`.)
        use std::io::Write;
        writer.flush()?;
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        file.sync_all()?;
        drop(file);

        std::fs::rename(&tmp, path)?;
        // The tmp path no longer has a file behind it — disarm the guard so
        // it doesn't try to remove the just-renamed target by name race.
        tmp_guard.commit();

        // Best-effort parent-dir fsync so the rename itself is durable on
        // power loss. Non-fatal if the parent can't be opened.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        Ok(meta)
    }

    /// Replace the current graph state with a snapshot loaded from `path`.
    /// Holds the store write lock for the duration of the load; concurrent
    /// queries block until restore completes.
    pub fn load_snapshot_from(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let file = File::open(path.as_ref())?;
        let reader = BufReader::new(file);

        let mut guard = self.write_store();
        Ok(guard.load_snapshot(reader)?)
    }
}
731
impl Database<InMemoryGraph> {
    /// Convenience constructor: open (or create) an empty in-memory database
    /// and immediately restore it from `path`. Errors if the file cannot be
    /// opened or the snapshot is malformed.
    pub fn in_memory_from_snapshot(path: impl AsRef<Path>) -> Result<Self> {
        let db = Self::in_memory();
        db.load_snapshot_from(path)?;
        Ok(db)
    }

    /// Take a checkpoint: snapshot the current state with the WAL's
    /// `durable_lsn` stamped into the header, append a `Checkpoint`
    /// marker to the WAL, then drop sealed segments at or below the
    /// fence.
    ///
    /// Errors with "checkpoint requires WAL enabled" when called on a
    /// database constructed without a WAL — operators that just want
    /// a fence-less dump should use [`save_snapshot_to`] instead.
    ///
    /// The write-lock-held window covers snapshot serialization plus the
    /// checkpoint marker append. Truncation runs after the rename
    /// but still under the write lock; making it concurrent with queries
    /// is a v2 concern (see `docs/decisions/0004-wal.md`).
    pub fn checkpoint_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let recorder = self
            .wal
            .as_ref()
            .ok_or_else(|| anyhow!("checkpoint requires WAL enabled"))?;
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);

        // Write lock (not read) so no mutation can slip between the
        // fsync that fixes `durable_lsn` and the snapshot that claims
        // to contain everything up to it.
        let guard = self.write_store();

        // Make every record appended so far durable, then capture
        // the LSN that becomes the snapshot fence.
        recorder
            .force_fsync()
            .map_err(|e| anyhow!("WAL fsync before checkpoint failed: {e}"))?;
        let snapshot_lsn = recorder.wal().durable_lsn();

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        // Same atomic tmp-then-rename dance as `save_snapshot_to`.
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);
        let meta = guard.save_checkpoint(&mut writer, snapshot_lsn.raw())?;

        use std::io::Write;
        writer.flush()?;
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        file.sync_all()?;
        drop(file);

        std::fs::rename(&tmp, path)?;
        tmp_guard.commit();

        // Best-effort parent-dir fsync; non-fatal if the parent can't
        // be opened.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        // Append the checkpoint marker AFTER the rename succeeds —
        // this preserves the invariant that a `Checkpoint` record
        // in the WAL implies the snapshot it points at exists.
        recorder
            .checkpoint_marker(snapshot_lsn)
            .map_err(|e| anyhow!("WAL checkpoint marker failed: {e}"))?;
        recorder
            .force_fsync()
            .map_err(|e| anyhow!("WAL fsync after checkpoint marker failed: {e}"))?;

        // Best-effort segment truncation. Failure here doesn't undo
        // the checkpoint — the next call will retry.
        let _ = recorder.truncate_up_to(snapshot_lsn);

        Ok(meta)
    }
}
813
/// Scratch path used for atomic snapshot writes: `<target>.tmp`.
///
/// Appends to the complete file name (extension included) rather than
/// swapping the extension, so `db.snap` maps to `db.snap.tmp` and can
/// never collide with a sibling snapshot target.
fn snapshot_tmp_path(target: &Path) -> PathBuf {
    let mut scratch = target.as_os_str().to_os_string();
    scratch.push(".tmp");
    PathBuf::from(scratch)
}
819
/// RAII handle that deletes its path on drop unless [`commit`] is called.
///
/// The snapshot save path creates `<target>.tmp` before the payload is
/// written; if any step between then and the final rename fails (or the
/// thread unwinds), the guard's `Drop` removes the scratch file so a
/// crashed save never leaves leftovers on disk.
///
/// [`commit`]: Self::commit
struct TempFileGuard {
    // `Some(path)` while armed; `None` once committed (or once `Drop`
    // has already consumed it).
    path: Option<PathBuf>,
}

impl TempFileGuard {
    /// Arm the guard over `path`.
    fn new(path: PathBuf) -> Self {
        Self { path: Some(path) }
    }

    /// Disarm the guard. Call this once the tmp file's contents have been
    /// handed off (e.g. renamed to their final destination) so the `Drop`
    /// impl does not try to remove them.
    fn commit(mut self) {
        self.path = None;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        let Some(path) = self.path.take() else {
            return;
        };
        // Best-effort: cleanup failure is not worth surfacing — the
        // worst case is a leaked scratch file that the next save
        // overwrites via `OpenOptions::truncate(true)`.
        let _ = std::fs::remove_file(path);
    }
}
855
/// Storage-agnostic admin surface for HTTP / binding callers that want to
/// drive snapshot operations without naming the backend type parameter.
///
/// `Database<S>` picks up a blanket impl when `S: Snapshotable + 'static`.
/// Transports (e.g. `lora-server`) type-erase on `Arc<dyn SnapshotAdmin>`.
pub trait SnapshotAdmin: Send + Sync + 'static {
    /// Atomically serialize the current graph state to `path`.
    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
    /// Replace the current graph state with the snapshot at `path`.
    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
}
865
// Blanket impl: any snapshot-capable backend gets the type-erased
// admin surface for free by delegating to the inherent methods.
impl<S> SnapshotAdmin for Database<S>
where
    S: GraphStorage + GraphStorageMut + Snapshotable + Send + Sync + 'static,
{
    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
        self.save_snapshot_to(path)
    }

    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
        self.load_snapshot_from(path)
    }
}
878
/// Storage-agnostic admin surface for the WAL.
///
/// `Database<InMemoryGraph>` picks up the blanket impl below when a
/// WAL is attached. Transports (e.g. `lora-server`) type-erase on
/// `Arc<dyn WalAdmin>` so they don't need to name the backend type
/// parameter.
///
/// All LSNs cross the trait boundary as raw `u64` so callers don't
/// need a dependency on `lora-wal`.
pub trait WalAdmin: Send + Sync + 'static {
    /// Take a checkpoint at `path`. The snapshot's header is stamped
    /// with the WAL's `durable_lsn`; older sealed segments are then
    /// dropped.
    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta>;

    /// Snapshot of the WAL's current state — durable / next LSN,
    /// active / oldest segment id. Cheap; a single WAL mutex acquisition.
    ///
    /// The blanket impl errors with "WAL not enabled" when the database
    /// has no WAL attached.
    fn wal_status(&self) -> Result<WalStatus>;

    /// Drop sealed segments at or below `fence_lsn`. Idempotent.
    ///
    /// The blanket impl errors with "WAL not enabled" when the database
    /// has no WAL attached.
    fn wal_truncate(&self, fence_lsn: u64) -> Result<()>;
}
901
/// Snapshot of WAL state returned by [`WalAdmin::wal_status`].
///
/// `bg_failure` is the latched fsync error from the background flusher
/// (only meaningful under `SyncMode::Group`). When `Some`, the WAL is
/// poisoned and every subsequent commit will fail loudly until the
/// operator restarts from the last consistent snapshot + WAL.
#[derive(Debug, Clone)]
pub struct WalStatus {
    /// Raw value of the WAL's durable LSN.
    pub durable_lsn: u64,
    /// Raw value of the next LSN the WAL will hand out.
    pub next_lsn: u64,
    /// Id of the WAL's active (currently written) segment.
    pub active_segment_id: u64,
    /// Id of the oldest segment the WAL still retains.
    pub oldest_segment_id: u64,
    /// Latched background-flush error, if any (see struct docs above).
    pub bg_failure: Option<String>,
}
916
917impl WalAdmin for Database<InMemoryGraph> {
918 fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta> {
919 self.checkpoint_to(path)
920 }
921
922 fn wal_status(&self) -> Result<WalStatus> {
923 let recorder = self
924 .wal
925 .as_ref()
926 .ok_or_else(|| anyhow!("WAL not enabled"))?;
927 let wal = recorder.wal();
928 Ok(WalStatus {
929 durable_lsn: wal.durable_lsn().raw(),
930 next_lsn: wal.next_lsn().raw(),
931 active_segment_id: wal.active_segment_id(),
932 oldest_segment_id: wal.oldest_segment_id(),
933 bg_failure: wal.bg_failure(),
934 })
935 }
936
937 fn wal_truncate(&self, fence_lsn: u64) -> Result<()> {
938 let recorder = self
939 .wal
940 .as_ref()
941 .ok_or_else(|| anyhow!("WAL not enabled"))?;
942 recorder.truncate_up_to(Lsn::new(fence_lsn))?;
943 Ok(())
944 }
945}
946
impl<S> QueryRunner for Database<S>
where
    S: GraphStorage + GraphStorageMut + Send + Sync + 'static,
{
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
        // Delegate to the inherent `Database::execute`; the fully-qualified
        // form makes the non-recursive dispatch explicit.
        Database::execute(self, query, options)
    }
}
955
956// ---------------------------------------------------------------------------
957// Replay
958// ---------------------------------------------------------------------------
959
960/// Apply a `MutationEvent` stream to an in-memory graph by dispatching
961/// each variant to the matching store operation.
962///
963/// Creation events are replayed through id-preserving paths, not the
964/// normal allocator-backed mutation methods. That matters after aborted
965/// transactions: an aborted create can consume id `N` in the original
966/// process, be dropped by replay, and leave the next committed create at
967/// id `N + 1`. Reusing the regular allocator would shift ids downward.
968///
969/// Replay must be invoked **before** the `WalRecorder` is installed
970/// on the graph. Otherwise the replay's own mutations would fire the
971/// recorder and re-write the same events to the WAL, doubling them on
972/// the next recovery.
973fn replay_into(graph: &mut InMemoryGraph, events: Vec<MutationEvent>) -> Result<()> {
974 for (idx, event) in events.into_iter().enumerate() {
975 match event {
976 MutationEvent::CreateNode {
977 id,
978 labels,
979 properties,
980 } => {
981 graph
982 .replay_create_node(id, labels, properties)
983 .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
984 }
985 MutationEvent::CreateRelationship {
986 id,
987 src,
988 dst,
989 rel_type,
990 properties,
991 } => {
992 graph
993 .replay_create_relationship(id, src, dst, &rel_type, properties)
994 .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
995 }
996 MutationEvent::SetNodeProperty {
997 node_id,
998 key,
999 value,
1000 } => {
1001 if !graph.set_node_property(node_id, key, value) {
1002 return Err(anyhow!(
1003 "WAL replay failed at event {idx}: missing node {node_id} for property set"
1004 ));
1005 }
1006 }
1007 MutationEvent::RemoveNodeProperty { node_id, key } => {
1008 if !graph.remove_node_property(node_id, &key) {
1009 return Err(anyhow!(
1010 "WAL replay failed at event {idx}: missing node {node_id} for property removal"
1011 ));
1012 }
1013 }
1014 MutationEvent::AddNodeLabel { node_id, label } => {
1015 if !graph.add_node_label(node_id, &label) {
1016 return Err(anyhow!(
1017 "WAL replay failed at event {idx}: missing node {node_id} for label add"
1018 ));
1019 }
1020 }
1021 MutationEvent::RemoveNodeLabel { node_id, label } => {
1022 if !graph.remove_node_label(node_id, &label) {
1023 return Err(anyhow!(
1024 "WAL replay failed at event {idx}: missing node {node_id} for label removal"
1025 ));
1026 }
1027 }
1028 MutationEvent::SetRelationshipProperty { rel_id, key, value } => {
1029 if !graph.set_relationship_property(rel_id, key, value) {
1030 return Err(anyhow!(
1031 "WAL replay failed at event {idx}: missing relationship {rel_id} for property set"
1032 ));
1033 }
1034 }
1035 MutationEvent::RemoveRelationshipProperty { rel_id, key } => {
1036 if !graph.remove_relationship_property(rel_id, &key) {
1037 return Err(anyhow!(
1038 "WAL replay failed at event {idx}: missing relationship {rel_id} for property removal"
1039 ));
1040 }
1041 }
1042 MutationEvent::DeleteRelationship { rel_id } => {
1043 if !graph.delete_relationship(rel_id) {
1044 return Err(anyhow!(
1045 "WAL replay failed at event {idx}: missing relationship {rel_id} for delete"
1046 ));
1047 }
1048 }
1049 MutationEvent::DeleteNode { node_id } => {
1050 if !graph.delete_node(node_id) {
1051 return Err(anyhow!(
1052 "WAL replay failed at event {idx}: missing or attached node {node_id} for delete"
1053 ));
1054 }
1055 }
1056 MutationEvent::DetachDeleteNode { node_id } => {
1057 // After the cascading DeleteRelationship +
1058 // DeleteNode events have already replayed, the node
1059 // is gone and this becomes a no-op. Calling it
1060 // anyway is harmless.
1061 graph.detach_delete_node(node_id);
1062 }
1063 MutationEvent::Clear => {
1064 graph.clear();
1065 }
1066 }
1067 }
1068 Ok(())
1069}