// lora_database/database.rs
1use std::collections::BTreeMap;
2use std::fs::{File, OpenOptions};
3use std::io::{BufReader, BufWriter};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError};
6use std::time::{Duration, Instant};
7
8use anyhow::{anyhow, Result};
9use lora_analyzer::Analyzer;
10use lora_ast::Document;
11use lora_compiler::{CompiledQuery, Compiler};
12use lora_executor::{
13    classify_stream, compiled_result_columns, project_rows, ExecuteOptions, LoraValue,
14    MutableExecutionContext, MutableExecutor, QueryResult, Row, StreamShape,
15};
16use lora_parser::parse_query;
17use lora_store::{
18    GraphStorage, GraphStorageMut, InMemoryGraph, MutationEvent, MutationRecorder, SnapshotMeta,
19    Snapshotable,
20};
21use lora_wal::{replay_dir, Lsn, Wal, WalConfig, WalMirror, WalRecorder, WroteCommit};
22
23use crate::archive::WalArchive;
24use crate::named::{DatabaseName, DatabaseOpenOptions};
25use crate::stream::{AutoCommitGuard, LiveCursor, QueryStream};
26use crate::transaction::{LiveStoreGuard, Transaction, TransactionMode};
27
/// Minimal abstraction any transport can depend on to run Lora queries.
pub trait QueryRunner: Send + Sync + 'static {
    /// Execute `query` against the backing database, applying `options`
    /// (result projection / formatting) when provided. Returns the fully
    /// materialized [`QueryResult`] or the first error encountered.
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult>;
}
32
/// Owns the graph store and orchestrates parse → analyze → compile → execute.
///
/// Optionally drives a write-ahead log: when constructed via
/// [`Database::open_with_wal`] or [`Database::recover`] the database
/// holds an [`Arc<WalRecorder>`] that brackets every query with
/// `begin → mutations → commit/abort → flush` while the store write
/// lock is held, so the WAL order is exactly the in-memory commit order.
/// When constructed via [`Database::in_memory`] / [`Database::from_graph`]
/// the WAL handle is `None` and the engine pays only the existing
/// `MutationRecorder::record` null-pointer check per mutation.
pub struct Database<S> {
    /// Shared graph store; every query path takes a read or write lock here.
    pub(crate) store: Arc<RwLock<S>>,
    /// Installed WAL recorder; `None` for non-durable constructors.
    pub(crate) wal: Option<Arc<WalRecorder>>,
}
47
impl Database<InMemoryGraph> {
    /// Convenience constructor: a fresh, empty in-memory graph database.
    pub fn in_memory() -> Self {
        Self::from_graph(InMemoryGraph::new())
    }

    /// Open or create a WAL-enabled in-memory database from a fresh
    /// graph.
    ///
    /// `WalConfig::Disabled` falls back to [`Database::in_memory`].
    /// Otherwise, opens the WAL directory, replays any committed
    /// events into a fresh graph, installs a [`WalRecorder`] on the
    /// graph, and returns a database ready to serve queries.
    ///
    /// To restore from a snapshot in addition to the WAL, use
    /// [`Database::recover`] instead.
    pub fn open_with_wal(wal_config: WalConfig) -> Result<Self> {
        match wal_config {
            WalConfig::Disabled => Ok(Self::in_memory()),
            WalConfig::Enabled {
                dir,
                sync_mode,
                segment_target_bytes,
            } => {
                let mut graph = InMemoryGraph::new();
                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, Lsn::ZERO)?;
                // Ordering matters: replay runs before the recorder is
                // installed, so replayed events are not re-appended to the WAL.
                replay_into(&mut graph, events)?;
                let recorder = Arc::new(WalRecorder::new(wal));
                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
                Ok(Self {
                    store: Arc::new(RwLock::new(graph)),
                    wal: Some(recorder),
                })
            }
        }
    }

    /// Open or create a named portable database rooted under
    /// `options.database_dir`.
    ///
    /// The database name may be either a portable basename (`app` or
    /// `app.loradb`) or a safe relative path (`tenant/app`). It is resolved
    /// under `options.database_dir` before the WAL archive backend opens.
    pub fn open_named(
        database_name: impl AsRef<str>,
        options: DatabaseOpenOptions,
    ) -> Result<Self> {
        let name = DatabaseName::parse(database_name.as_ref())?;
        let archive = Arc::new(WalArchive::open(
            options.database_path_for(&name),
            options.max_database_bytes,
        )?);
        let mut graph = InMemoryGraph::new();
        // The WAL lives inside the archive's working directory; replay
        // happens before the recorder is installed (see `open_with_wal`).
        let (wal, events) = Wal::open(
            archive.work_dir(),
            options.sync_mode,
            options.segment_target_bytes,
            Lsn::ZERO,
        )?;
        replay_into(&mut graph, events)?;
        let mirror: Arc<dyn WalMirror> = archive;
        let recorder = Arc::new(WalRecorder::new_with_mirror(wal, Some(mirror)));
        graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
        // Mark the archive dirty so a fresh named database is materialized as
        // a portable ZIP. The archive writer coalesces this with any immediate
        // follow-up writes and flushes it in the background, with a final flush
        // on database drop.
        recorder
            .flush()
            .map_err(|e| anyhow!("initial database archive persist failed: {e}"))?;
        Ok(Self {
            store: Arc::new(RwLock::new(graph)),
            wal: Some(recorder),
        })
    }

    /// Start an explicit transaction.
    ///
    /// Read-only transactions hold a shared read lock for their
    /// lifetime; read-write transactions hold the write lock. The
    /// staging clone is **lazy** — it only happens when a
    /// [`TransactionMode::ReadWrite`] transaction sees its first
    /// mutating statement. Materialized read-only statements run
    /// straight against the live graph; tx-bound streams may still
    /// clone so their cursors can own a stable view. ReadWrite
    /// transactions that perform only materialized reads (or commit
    /// empty) pay nothing for staging.
    pub fn begin_transaction(&self, mode: TransactionMode) -> Result<Transaction<'_>> {
        let live = match mode {
            TransactionMode::ReadOnly => LiveStoreGuard::Read(self.read_store()),
            TransactionMode::ReadWrite => LiveStoreGuard::Write(self.write_store()),
        };
        Ok(Transaction::new(live, self.wal.clone(), mode))
    }

    /// Restore from a snapshot file then replay any WAL records past
    /// it.
    ///
    /// The snapshot's `wal_lsn` (when set) becomes the replay fence —
    /// events at or below that LSN are already represented in the
    /// loaded snapshot and are skipped. A missing snapshot file is
    /// treated as "fresh start" so operators can pass the same path
    /// on every boot.
    ///
    /// If the WAL contains a checkpoint marker newer than the
    /// snapshot's `wal_lsn`, a one-line warning is printed to stderr
    /// — the snapshot is stale relative to a more recent checkpoint
    /// the operator is presumably aware of. Recovery still proceeds
    /// from the snapshot's fence (replay re-applies every record
    /// above it, which is conservative-correct); a tighter contract
    /// is deferred to v2 because verifying that the marker's
    /// snapshot file actually exists and is loadable is a separate
    /// observability concern.
    pub fn recover(snapshot_path: impl AsRef<Path>, wal_config: WalConfig) -> Result<Self> {
        let snapshot_path = snapshot_path.as_ref();
        let mut graph = InMemoryGraph::new();
        // Load the snapshot if present; NotFound means fresh start with a
        // zero replay fence. Any other I/O error is surfaced to the caller.
        let snapshot_lsn = match File::open(snapshot_path) {
            Ok(f) => {
                let reader = BufReader::new(f);
                let meta = graph.load_snapshot(reader)?;
                meta.wal_lsn.map(Lsn::new).unwrap_or(Lsn::ZERO)
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Lsn::ZERO,
            Err(e) => return Err(e.into()),
        };

        match wal_config {
            WalConfig::Disabled => Ok(Self::from_graph(graph)),
            WalConfig::Enabled {
                dir,
                sync_mode,
                segment_target_bytes,
            } => {
                // Diagnostic peek at the WAL's newest checkpoint
                // marker so we can warn the operator about a stale
                // snapshot before we start replaying. Treat any error
                // as "no marker" — the subsequent `Wal::open` will
                // surface the real failure if there is one.
                if dir.exists() {
                    if let Ok(outcome) = replay_dir(&dir, Lsn::ZERO) {
                        if let Some(marker) = outcome.checkpoint_lsn_observed {
                            if marker > snapshot_lsn {
                                eprintln!(
                                    "lora-wal: snapshot at LSN {} is older than the newest \
                                     checkpoint marker on disk (LSN {}). Replaying every WAL \
                                     record above LSN {}; consider passing the more recent \
                                     snapshot to --restore-from.",
                                    snapshot_lsn.raw(),
                                    marker.raw(),
                                    snapshot_lsn.raw()
                                );
                            }
                        }
                    }
                }

                // Open with the snapshot LSN as the fence so replay skips
                // records already represented in the loaded snapshot.
                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, snapshot_lsn)?;
                replay_into(&mut graph, events)?;
                let recorder = Arc::new(WalRecorder::new(wal));
                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
                Ok(Self {
                    store: Arc::new(RwLock::new(graph)),
                    wal: Some(recorder),
                })
            }
        }
    }

    /// Execute a query and return an owning row stream.
    pub fn stream(&self, query: &str) -> Result<QueryStream<'_>> {
        self.stream_with_params(query, BTreeMap::new())
    }

    /// Execute a parameterised query and return an owning row stream.
    ///
    /// The compiled plan is classified at open time. Read-only
    /// queries run directly off the live store and yield a
    /// buffered cursor with plan-derived columns. Mutating queries
    /// are routed through a hidden read-write [`Transaction`]:
    /// full cursor exhaustion calls `tx.commit` (publishing staged
    /// changes and replaying the tx-local WAL buffer); a premature
    /// drop or any error from `next_row` calls `tx.rollback` so
    /// the live store and the WAL stay untouched.
    pub fn stream_with_params(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryStream<'_>> {
        // Classify by compiling once against the live store. The
        // mutating branch then re-compiles inside the hidden
        // transaction (against a staged graph that's identical to
        // live at clone time, so the second plan matches the
        // first); the cost is one extra parse+analyze+compile per
        // mutating stream, paid in exchange for a tiny
        // classify-stream surface.
        let document = parse_query(query)?;
        let store_guard = self.read_store();
        let resolved = {
            let mut analyzer = Analyzer::new(&*store_guard);
            analyzer.analyze(&document)?
        };
        let compiled = Compiler::compile(&resolved);
        let columns = compiled_result_columns(&compiled);
        let shape = classify_stream(&compiled);
        // Release the analyzer's lock before either branch
        // re-acquires (read-only path keeps it; mutating path
        // delegates to begin_transaction which takes its own).
        drop(store_guard);

        match shape {
            StreamShape::ReadOnly => {
                // True pull-shaped streaming. `LiveCursor` holds
                // the live store lock and the cursor that
                // borrows from it; its `Drop` releases them in
                // the right order so the caller observes pure
                // pull semantics with no intermediate
                // materialization.
                let live = LiveCursor::open(self.store.clone(), compiled, params)?;
                Ok(QueryStream::live(live, columns))
            }
            StreamShape::Mutating => {
                // Hidden auto-commit transaction. The transaction
                // owns staging, the buffering recorder, savepoint
                // management, and the WAL replay-on-commit logic;
                // we just pick commit-on-exhaustion vs
                // rollback-on-drop based on cursor state.
                //
                // The cursor returned by `open_streaming_compiled_autocommit`
                // may be a real per-row `StreamingWriteCursor`,
                // a mutable UNION cursor, or a buffered leaf for
                // operators that still need full materialization.
                // Either way the AutoCommit guard's
                // drop/exhaustion semantics are identical. The
                // compiled plan is wrapped in an `Arc` so the
                // cursor's `'static` borrows into it remain valid
                // for the cursor's lifetime.
                let mut tx = self.begin_transaction(TransactionMode::ReadWrite)?;
                let compiled_arc = Arc::new(compiled);
                let cursor =
                    match tx.open_streaming_compiled_autocommit(compiled_arc.clone(), params) {
                        Ok(c) => c,
                        Err(err) => {
                            // Tx rolls back implicitly on drop here.
                            return Err(err);
                        }
                    };
                let guard = AutoCommitGuard {
                    tx: Some(tx),
                    finalized: false,
                };
                Ok(QueryStream::auto_commit(cursor, columns, guard))
            }
        }
    }

    /// Open a stream whose lifetime can be carried by an outer owner that
    /// also retains an `Arc<Database>`.
    ///
    /// # Safety
    ///
    /// The returned stream may contain lock guards that borrow from the
    /// database's internal `RwLock`. The caller must keep this exact `Arc`
    /// alive until the stream is dropped. This is intended for language
    /// bindings that store both the `Arc<Database>` and the `QueryStream` in
    /// the same opaque stream handle.
    pub unsafe fn stream_with_params_owned(
        self: &Arc<Self>,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryStream<'static>> {
        let stream = self.stream_with_params(query, params)?;
        // SAFETY: only the lifetime parameter is transmuted; the caller
        // guarantees (per the contract above) that this `Arc` outlives the
        // returned stream, so the erased borrows stay valid.
        Ok(std::mem::transmute::<QueryStream<'_>, QueryStream<'static>>(stream))
    }
}
322
323impl<S> Database<S>
324where
325    S: GraphStorage + GraphStorageMut,
326{
327    /// Build a database from a pre-wrapped, shared store.
328    pub fn new(store: Arc<RwLock<S>>) -> Self {
329        Self { store, wal: None }
330    }
331
332    /// Build a database by taking ownership of a bare graph store.
333    pub fn from_graph(graph: S) -> Self {
334        Self::new(Arc::new(RwLock::new(graph)))
335    }
336
337    /// Handle to the installed WAL recorder, if any. Exposed for
338    /// admin paths (checkpoint, truncate, observability) that need
339    /// to drive the WAL outside the standard query lifecycle.
340    pub fn wal(&self) -> Option<&Arc<WalRecorder>> {
341        self.wal.as_ref()
342    }
343
344    /// Handle to the underlying shared store — useful for callers that need
345    /// to snapshot or share the graph across multiple databases.
346    pub fn store(&self) -> &Arc<RwLock<S>> {
347        &self.store
348    }
349
350    /// Parse a query string into an AST without executing it.
351    pub fn parse(&self, query: &str) -> Result<Document> {
352        Ok(parse_query(query)?)
353    }
354
355    pub(crate) fn read_store(&self) -> RwLockReadGuard<'_, S> {
356        self.store
357            .read()
358            .unwrap_or_else(|poisoned| poisoned.into_inner())
359    }
360
361    pub(crate) fn write_store(&self) -> RwLockWriteGuard<'_, S> {
362        self.store
363            .write()
364            .unwrap_or_else(|poisoned| poisoned.into_inner())
365    }
366
367    fn read_store_deadline(&self, deadline: Option<Instant>) -> Result<RwLockReadGuard<'_, S>> {
368        let Some(deadline) = deadline else {
369            return Ok(self.read_store());
370        };
371
372        loop {
373            match self.store.try_read() {
374                Ok(guard) => return Ok(guard),
375                Err(TryLockError::Poisoned(poisoned)) => return Ok(poisoned.into_inner()),
376                Err(TryLockError::WouldBlock) if Instant::now() >= deadline => {
377                    return Err(anyhow!("query deadline exceeded"));
378                }
379                Err(TryLockError::WouldBlock) => {
380                    std::thread::sleep(Duration::from_millis(1));
381                }
382            }
383        }
384    }
385
386    fn write_store_deadline(&self, deadline: Option<Instant>) -> Result<RwLockWriteGuard<'_, S>> {
387        let Some(deadline) = deadline else {
388            return Ok(self.write_store());
389        };
390
391        loop {
392            match self.store.try_write() {
393                Ok(guard) => return Ok(guard),
394                Err(TryLockError::Poisoned(poisoned)) => return Ok(poisoned.into_inner()),
395                Err(TryLockError::WouldBlock) if Instant::now() >= deadline => {
396                    return Err(anyhow!("query deadline exceeded"));
397                }
398                Err(TryLockError::WouldBlock) => {
399                    std::thread::sleep(Duration::from_millis(1));
400                }
401            }
402        }
403    }
404
405    fn compile_document_against(&self, document: &Document, store: &S) -> Result<CompiledQuery> {
406        let resolved = {
407            let mut analyzer = Analyzer::new(store);
408            analyzer.analyze(document)?
409        };
410
411        Ok(Compiler::compile(&resolved))
412    }
413
414    /// Execute a query and return its result.
415    pub fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
416        self.execute_with_params(query, options, BTreeMap::new())
417    }
418
419    /// Execute a query with a cooperative deadline. The timeout is checked at
420    /// executor operator boundaries and hot scan loops; if it fires, the query
421    /// returns an error and any WAL-backed mutating query is aborted through
422    /// the existing failure path.
423    pub fn execute_with_timeout(
424        &self,
425        query: &str,
426        options: Option<ExecuteOptions>,
427        timeout: Duration,
428    ) -> Result<QueryResult> {
429        let deadline = Instant::now()
430            .checked_add(timeout)
431            .unwrap_or_else(Instant::now);
432        let rows =
433            self.execute_rows_with_params_deadline(query, BTreeMap::new(), Some(deadline))?;
434        Ok(project_rows(rows, options.unwrap_or_default()))
435    }
436
437    /// Execute a query with bound parameters.
438    ///
439    /// When a WAL is attached the call is bracketed by a transaction:
440    ///
441    /// 1. `recorder.arm()` after analyze + compile (so a parse /
442    ///    semantic / compile error never opens a tx that has to be
443    ///    immediately aborted). Arming is *cheap*: no record is
444    ///    appended to the WAL yet, so a pure read query that
445    ///    completes here pays nothing for the WAL hot path.
446    /// 2. The executor runs; every primitive mutation fires
447    ///    `MutationRecorder::record`, which buffers events in memory.
448    /// 3. On Ok, `recorder.commit()` writes `TxBegin`, one batched
449    ///    mutation record, and `TxCommit` only when mutations occurred;
450    ///    the surrounding `recorder.flush()` runs only in that case so
451    ///    a read-only query never pays an `fsync`.
452    /// 4. On Err, `recorder.abort()` clears the pending batch. The
453    ///    engine has no rollback, so the in-memory state may already
454    ///    be partially mutated; the live handle is quarantined while
455    ///    durable recovery stays atomic because no committed batch was
456    ///    written.
457    /// 5. The recorder's poisoned flag is polled once (it also
458    ///    surfaces background-flusher fsync failures from
459    ///    `SyncMode::Group`). If set, the query fails loudly with the
460    ///    durability error so the caller can act on it; the WAL
461    ///    refuses further appends until the operator restarts the
462    ///    database, which recovers from the last consistent
463    ///    snapshot + WAL.
464    pub fn execute_with_params(
465        &self,
466        query: &str,
467        options: Option<ExecuteOptions>,
468        params: BTreeMap<String, LoraValue>,
469    ) -> Result<QueryResult> {
470        let rows = self.execute_rows_with_params_deadline(query, params, None)?;
471        Ok(project_rows(rows, options.unwrap_or_default()))
472    }
473
474    /// Execute a parameterised query with a cooperative deadline.
475    pub fn execute_with_params_timeout(
476        &self,
477        query: &str,
478        options: Option<ExecuteOptions>,
479        params: BTreeMap<String, LoraValue>,
480        timeout: Duration,
481    ) -> Result<QueryResult> {
482        let deadline = Instant::now()
483            .checked_add(timeout)
484            .unwrap_or_else(Instant::now);
485        let rows = self.execute_rows_with_params_deadline(query, params, Some(deadline))?;
486        Ok(project_rows(rows, options.unwrap_or_default()))
487    }
488
489    /// Execute a query and return hydrated rows before final result-format
490    /// projection.
491    pub fn execute_rows(&self, query: &str) -> Result<Vec<Row>> {
492        self.execute_rows_with_params(query, BTreeMap::new())
493    }
494
495    /// Execute a query with parameters and return hydrated rows before final
496    /// result-format projection.
497    pub fn execute_rows_with_params(
498        &self,
499        query: &str,
500        params: BTreeMap<String, LoraValue>,
501    ) -> Result<Vec<Row>> {
502        self.execute_rows_with_params_deadline(query, params, None)
503    }
504
505    fn execute_rows_with_params_deadline(
506        &self,
507        query: &str,
508        params: BTreeMap<String, LoraValue>,
509        deadline: Option<Instant>,
510    ) -> Result<Vec<Row>> {
511        let document = self.parse(query)?;
512        let shape = {
513            let store = self.read_store_deadline(deadline)?;
514            let compiled = self.compile_document_against(&document, &*store)?;
515
516            if matches!(classify_stream(&compiled), StreamShape::ReadOnly) {
517                if let Some(rec) = &self.wal {
518                    if let Some(reason) = rec.poisoned() {
519                        return Err(anyhow!("WAL arm failed: WAL poisoned: {reason}"));
520                    }
521                }
522                let executor = lora_executor::Executor::with_deadline(
523                    lora_executor::ExecutionContext {
524                        storage: &*store,
525                        params,
526                    },
527                    deadline,
528                );
529                return executor
530                    .execute_compiled_rows(&compiled)
531                    .map_err(|e| anyhow!(e));
532            }
533
534            classify_stream(&compiled)
535        };
536
537        debug_assert!(shape.is_mutating());
538
539        let mut store = self.write_store_deadline(deadline)?;
540        let compiled = self.compile_document_against(&document, &*store)?;
541
542        if let Some(rec) = &self.wal {
543            rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
544        }
545
546        let exec_result: Result<Vec<Row>> = (|| {
547            let mut executor = MutableExecutor::with_deadline(
548                MutableExecutionContext {
549                    storage: &mut *store,
550                    params,
551                },
552                deadline,
553            );
554            Ok(executor.execute_compiled_rows(&compiled)?)
555        })();
556
557        if let Some(rec) = &self.wal {
558            match &exec_result {
559                Ok(_) => match rec.commit() {
560                    Ok(WroteCommit::Yes) => {
561                        rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
562                    }
563                    Ok(WroteCommit::No) => {
564                        // Read-only query: no records were written
565                        // and there is nothing to fsync. Skip flush
566                        // entirely so PerCommit pays zero fsyncs on
567                        // pure reads.
568                    }
569                    Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
570                },
571                Err(_) => {
572                    // Best-effort abort. If the WAL saw mutations, durable
573                    // recovery will discard them but the live in-memory store
574                    // may already be ahead of durable state. Quarantine this
575                    // handle so callers restart instead of serving from a
576                    // potentially divergent graph.
577                    if matches!(rec.abort(), Ok(true)) {
578                        rec.poison(
579                            "query mutated the live graph before failing; restart from snapshot + WAL required",
580                        );
581                    }
582                }
583            }
584            if let Some(reason) = rec.poisoned() {
585                return Err(anyhow!("WAL poisoned: {reason}"));
586            }
587        }
588
589        exec_result
590    }
591
592    // ---------- Storage-agnostic utility helpers ----------
593    //
594    // Bindings previously reached into the shared store lock to answer
595    // stat / admin calls; these helpers let them depend on `Database<S>`
596    // instead, so swapping in a new backend only requires changing one type
597    // parameter.
598
599    /// Drop every node and relationship.
600    ///
601    /// When a WAL is attached, `clear()` is wrapped in `arm`/`commit`
602    /// so the `MutationEvent::Clear` fired by the store reaches the
603    /// log inside a transaction (without arming, the recorder would
604    /// poison itself on the first event). WAL failures here are
605    /// best-effort: the in-memory state is still cleared so the
606    /// caller's contract holds, but the recorder's poisoned flag
607    /// will surface to the next query.
608    pub fn clear(&self) {
609        let mut guard = self.write_store();
610        match &self.wal {
611            None => guard.clear(),
612            Some(rec) => {
613                let armed = rec.arm();
614                guard.clear();
615                if armed.is_ok() {
616                    // `clear()` always emits a `MutationEvent::Clear`,
617                    // so commit returns `WroteCommit::Yes` and we
618                    // flush. If that order ever changes, the worst
619                    // case is one redundant flush call.
620                    let _ = rec.commit();
621                    let _ = rec.flush();
622                }
623            }
624        }
625    }
626
627    /// Number of nodes currently in the graph.
628    pub fn node_count(&self) -> usize {
629        let guard = self.read_store();
630        guard.node_count()
631    }
632
633    /// Number of relationships currently in the graph.
634    pub fn relationship_count(&self) -> usize {
635        let guard = self.read_store();
636        guard.relationship_count()
637    }
638
639    /// Run a closure with a shared borrow of the underlying store. Used by
640    /// bindings to answer ad-hoc queries without locking the RwLock themselves.
641    pub fn with_store<R>(&self, f: impl FnOnce(&S) -> R) -> R {
642        let guard = self.read_store();
643        f(&*guard)
644    }
645
646    /// Run a closure with an exclusive borrow of the underlying store. Reserved
647    /// for admin paths (restore, bulk load); regular mutation goes through
648    /// `execute_with_params`.
649    pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
650        let mut guard = self.write_store();
651        f(&mut *guard)
652    }
653}
654
655// ---------------------------------------------------------------------------
656// Snapshot helpers
657//
658// A second impl block so the `Snapshotable` bound only constrains backends
659// that actually need it. `Database<InMemoryGraph>` picks these up
660// automatically; hypothetical backends that don't implement `Snapshotable`
661// still get the core query API above.
662// ---------------------------------------------------------------------------
663
impl<S> Database<S>
where
    S: GraphStorage + GraphStorageMut + Snapshotable,
{
    /// Serialize the current graph state to the given path. Writes are
    /// atomic: the payload goes to `<path>.tmp`, is `fsync`'d, and then
    /// renamed over the target; a torn write can never leave a half-written
    /// file at `path`. If any step before the rename fails, the stale
    /// `<path>.tmp` is removed so a crashed save never leaks scratch files.
    ///
    /// Holds a store read lock for the duration of the save so concurrent
    /// readers can proceed and writers wait behind a consistent snapshot.
    pub fn save_snapshot_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);

        // Acquire the lock once so the snapshot is point-in-time consistent.
        let guard = self.read_store();

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        // Arm cleanup immediately after `open` succeeds: every early return
        // below must either surface an error *and* unlink the tmp, or commit
        // the guard once the rename takes effect.
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);

        let meta = guard.save_snapshot(&mut writer)?;

        // Flush the BufWriter before fsync; otherwise we fsync an empty
        // underlying file.
        use std::io::Write;
        writer.flush()?;
        // `into_inner` recovers the File so `sync_all` can force the payload
        // to stable storage before the rename publishes it.
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        file.sync_all()?;
        drop(file);

        std::fs::rename(&tmp, path)?;
        // The tmp path no longer has a file behind it — disarm the guard so
        // it doesn't try to remove the just-renamed target by name race.
        tmp_guard.commit();

        // Best-effort parent-dir fsync so the rename itself is durable on
        // power loss. Non-fatal if the parent can't be opened.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        Ok(meta)
    }

    /// Replace the current graph state with a snapshot loaded from `path`.
    /// Holds the store write lock for the duration of the load; concurrent
    /// queries block until restore completes.
    pub fn load_snapshot_from(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        // Open the file before taking the write lock so a bad path fails
        // without ever blocking concurrent queries.
        let file = File::open(path.as_ref())?;
        let reader = BufReader::new(file);

        let mut guard = self.write_store();
        Ok(guard.load_snapshot(reader)?)
    }
}
731
732impl Database<InMemoryGraph> {
733    /// Convenience constructor: open (or create) an empty in-memory database
734    /// and immediately restore it from `path`. Errors if the file cannot be
735    /// opened or the snapshot is malformed.
736    pub fn in_memory_from_snapshot(path: impl AsRef<Path>) -> Result<Self> {
737        let db = Self::in_memory();
738        db.load_snapshot_from(path)?;
739        Ok(db)
740    }
741
742    /// Take a checkpoint: snapshot the current state with the WAL's
743    /// `durable_lsn` stamped into the header, append a `Checkpoint`
744    /// marker to the WAL, then drop sealed segments at or below the
745    /// fence.
746    ///
747    /// Errors with "checkpoint requires WAL enabled" when called on a
748    /// database constructed without a WAL — operators that just want
749    /// a fence-less dump should use [`save_snapshot_to`] instead.
750    ///
751    /// The write-lock-held window covers snapshot serialization plus the
752    /// checkpoint marker append. Truncation runs after the rename
753    /// but still under the write lock; making it concurrent with queries
754    /// is a v2 concern (see `docs/decisions/0004-wal.md`).
755    pub fn checkpoint_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
756        let recorder = self
757            .wal
758            .as_ref()
759            .ok_or_else(|| anyhow!("checkpoint requires WAL enabled"))?;
760        let path = path.as_ref();
761        let tmp = snapshot_tmp_path(path);
762
763        let guard = self.write_store();
764
765        // Make every record appended so far durable, then capture
766        // the LSN that becomes the snapshot fence.
767        recorder
768            .force_fsync()
769            .map_err(|e| anyhow!("WAL fsync before checkpoint failed: {e}"))?;
770        let snapshot_lsn = recorder.wal().durable_lsn();
771
772        let file = OpenOptions::new()
773            .write(true)
774            .create(true)
775            .truncate(true)
776            .open(&tmp)?;
777        let tmp_guard = TempFileGuard::new(tmp.clone());
778        let mut writer = BufWriter::new(file);
779        let meta = guard.save_checkpoint(&mut writer, snapshot_lsn.raw())?;
780
781        use std::io::Write;
782        writer.flush()?;
783        let file = writer.into_inner().map_err(|e| e.into_error())?;
784        file.sync_all()?;
785        drop(file);
786
787        std::fs::rename(&tmp, path)?;
788        tmp_guard.commit();
789
790        if let Some(parent) = path.parent() {
791            if let Ok(dir) = File::open(parent) {
792                let _ = dir.sync_all();
793            }
794        }
795
796        // Append the checkpoint marker AFTER the rename succeeds —
797        // this preserves the invariant that a `Checkpoint` record
798        // in the WAL implies the snapshot it points at exists.
799        recorder
800            .checkpoint_marker(snapshot_lsn)
801            .map_err(|e| anyhow!("WAL checkpoint marker failed: {e}"))?;
802        recorder
803            .force_fsync()
804            .map_err(|e| anyhow!("WAL fsync after checkpoint marker failed: {e}"))?;
805
806        // Best-effort segment truncation. Failure here doesn't undo
807        // the checkpoint — the next call will retry.
808        let _ = recorder.truncate_up_to(snapshot_lsn);
809
810        Ok(meta)
811    }
812}
813
/// Derive the scratch path used by atomic snapshot writes by appending
/// `.tmp` to the target's full name. `Path::with_extension` is deliberately
/// not used — it would *replace* an existing extension rather than append.
fn snapshot_tmp_path(target: &Path) -> PathBuf {
    let mut scratch = target.as_os_str().to_os_string();
    scratch.push(".tmp");
    scratch.into()
}
819
/// RAII handle that deletes its path on drop unless [`commit`] is called.
///
/// The snapshot save path creates `<target>.tmp` before the payload is
/// written; if any step between then and the final rename fails (or the
/// thread unwinds), the guard's `Drop` removes the scratch file so a crashed
/// save never leaves leftovers on disk.
///
/// [`commit`]: Self::commit
struct TempFileGuard {
    /// `Some(path)` while the guard is armed; `None` once `commit` has
    /// taken it (or `Drop` already ran), making cleanup idempotent.
    path: Option<PathBuf>,
}
831
832impl TempFileGuard {
833    fn new(path: PathBuf) -> Self {
834        Self { path: Some(path) }
835    }
836
837    /// Disarm the guard. Call this once the tmp file's contents have been
838    /// handed off (e.g. renamed to their final destination) so the `Drop`
839    /// impl does not try to remove them.
840    fn commit(mut self) {
841        self.path.take();
842    }
843}
844
845impl Drop for TempFileGuard {
846    fn drop(&mut self) {
847        if let Some(path) = self.path.take() {
848            // Best-effort: cleanup failure is not worth surfacing — the
849            // worst case is a leaked scratch file that the next save
850            // overwrites via `OpenOptions::truncate(true)`.
851            let _ = std::fs::remove_file(path);
852        }
853    }
854}
855
/// Storage-agnostic admin surface for HTTP / binding callers that want to
/// drive snapshot operations without naming the backend type parameter.
///
/// `Database<S>` picks up a blanket impl when `S: Snapshotable + 'static`.
/// Transports (e.g. `lora-server`) type-erase on `Arc<dyn SnapshotAdmin>`.
pub trait SnapshotAdmin: Send + Sync + 'static {
    /// Serialize the current graph state to `path`.
    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
    /// Replace the current graph state with the snapshot loaded from `path`.
    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
}
865
866impl<S> SnapshotAdmin for Database<S>
867where
868    S: GraphStorage + GraphStorageMut + Snapshotable + Send + Sync + 'static,
869{
870    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
871        self.save_snapshot_to(path)
872    }
873
874    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
875        self.load_snapshot_from(path)
876    }
877}
878
/// Storage-agnostic admin surface for the WAL.
///
/// `Database<InMemoryGraph>` picks up the blanket impl below when a
/// WAL is attached. Transports (e.g. `lora-server`) type-erase on
/// `Arc<dyn WalAdmin>` so they don't need to name the backend type
/// parameter.
///
/// All LSNs cross the trait boundary as raw `u64` so callers don't
/// need a dependency on `lora-wal`.
pub trait WalAdmin: Send + Sync + 'static {
    /// Take a checkpoint at `path`. The snapshot's header is stamped
    /// with the WAL's `durable_lsn`; older sealed segments are then
    /// dropped. Errors when the database has no WAL attached.
    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta>;

    /// Snapshot of the WAL's current state — durable / next LSN,
    /// active / oldest segment id. Cheap; a single WAL mutex acquisition.
    /// Errors when the database has no WAL attached.
    fn wal_status(&self) -> Result<WalStatus>;

    /// Drop sealed segments at or below `fence_lsn`. Idempotent.
    /// Errors when the database has no WAL attached.
    fn wal_truncate(&self, fence_lsn: u64) -> Result<()>;
}
901
/// Snapshot of WAL state returned by [`WalAdmin::wal_status`].
///
/// `bg_failure` is the latched fsync error from the background flusher
/// (only meaningful under `SyncMode::Group`). When `Some`, the WAL is
/// poisoned and every subsequent commit will fail loudly until the
/// operator restarts from the last consistent snapshot + WAL.
#[derive(Debug, Clone)]
pub struct WalStatus {
    /// The WAL's `durable_lsn` at the time of the call, as a raw `u64`.
    pub durable_lsn: u64,
    /// The WAL's `next_lsn` at the time of the call, as a raw `u64`.
    pub next_lsn: u64,
    /// Id of the WAL's active segment.
    pub active_segment_id: u64,
    /// Id of the WAL's oldest segment still on disk.
    pub oldest_segment_id: u64,
    /// Latched background-flusher fsync error, if any (see type docs).
    pub bg_failure: Option<String>,
}
916
917impl WalAdmin for Database<InMemoryGraph> {
918    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta> {
919        self.checkpoint_to(path)
920    }
921
922    fn wal_status(&self) -> Result<WalStatus> {
923        let recorder = self
924            .wal
925            .as_ref()
926            .ok_or_else(|| anyhow!("WAL not enabled"))?;
927        let wal = recorder.wal();
928        Ok(WalStatus {
929            durable_lsn: wal.durable_lsn().raw(),
930            next_lsn: wal.next_lsn().raw(),
931            active_segment_id: wal.active_segment_id(),
932            oldest_segment_id: wal.oldest_segment_id(),
933            bg_failure: wal.bg_failure(),
934        })
935    }
936
937    fn wal_truncate(&self, fence_lsn: u64) -> Result<()> {
938        let recorder = self
939            .wal
940            .as_ref()
941            .ok_or_else(|| anyhow!("WAL not enabled"))?;
942        recorder.truncate_up_to(Lsn::new(fence_lsn))?;
943        Ok(())
944    }
945}
946
impl<S> QueryRunner for Database<S>
where
    S: GraphStorage + GraphStorageMut + Send + Sync + 'static,
{
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
        // Path-qualified call so this unambiguously targets the *inherent*
        // `Database::execute` (which shares this method's name) rather
        // than reading as a re-entry into the trait method.
        Database::execute(self, query, options)
    }
}
955
956// ---------------------------------------------------------------------------
957// Replay
958// ---------------------------------------------------------------------------
959
960/// Apply a `MutationEvent` stream to an in-memory graph by dispatching
961/// each variant to the matching store operation.
962///
963/// Creation events are replayed through id-preserving paths, not the
964/// normal allocator-backed mutation methods. That matters after aborted
965/// transactions: an aborted create can consume id `N` in the original
966/// process, be dropped by replay, and leave the next committed create at
967/// id `N + 1`. Reusing the regular allocator would shift ids downward.
968///
969/// Replay must be invoked **before** the `WalRecorder` is installed
970/// on the graph. Otherwise the replay's own mutations would fire the
971/// recorder and re-write the same events to the WAL, doubling them on
972/// the next recovery.
973fn replay_into(graph: &mut InMemoryGraph, events: Vec<MutationEvent>) -> Result<()> {
974    for (idx, event) in events.into_iter().enumerate() {
975        match event {
976            MutationEvent::CreateNode {
977                id,
978                labels,
979                properties,
980            } => {
981                graph
982                    .replay_create_node(id, labels, properties)
983                    .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
984            }
985            MutationEvent::CreateRelationship {
986                id,
987                src,
988                dst,
989                rel_type,
990                properties,
991            } => {
992                graph
993                    .replay_create_relationship(id, src, dst, &rel_type, properties)
994                    .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
995            }
996            MutationEvent::SetNodeProperty {
997                node_id,
998                key,
999                value,
1000            } => {
1001                if !graph.set_node_property(node_id, key, value) {
1002                    return Err(anyhow!(
1003                        "WAL replay failed at event {idx}: missing node {node_id} for property set"
1004                    ));
1005                }
1006            }
1007            MutationEvent::RemoveNodeProperty { node_id, key } => {
1008                if !graph.remove_node_property(node_id, &key) {
1009                    return Err(anyhow!(
1010                        "WAL replay failed at event {idx}: missing node {node_id} for property removal"
1011                    ));
1012                }
1013            }
1014            MutationEvent::AddNodeLabel { node_id, label } => {
1015                if !graph.add_node_label(node_id, &label) {
1016                    return Err(anyhow!(
1017                        "WAL replay failed at event {idx}: missing node {node_id} for label add"
1018                    ));
1019                }
1020            }
1021            MutationEvent::RemoveNodeLabel { node_id, label } => {
1022                if !graph.remove_node_label(node_id, &label) {
1023                    return Err(anyhow!(
1024                        "WAL replay failed at event {idx}: missing node {node_id} for label removal"
1025                    ));
1026                }
1027            }
1028            MutationEvent::SetRelationshipProperty { rel_id, key, value } => {
1029                if !graph.set_relationship_property(rel_id, key, value) {
1030                    return Err(anyhow!(
1031                        "WAL replay failed at event {idx}: missing relationship {rel_id} for property set"
1032                    ));
1033                }
1034            }
1035            MutationEvent::RemoveRelationshipProperty { rel_id, key } => {
1036                if !graph.remove_relationship_property(rel_id, &key) {
1037                    return Err(anyhow!(
1038                        "WAL replay failed at event {idx}: missing relationship {rel_id} for property removal"
1039                    ));
1040                }
1041            }
1042            MutationEvent::DeleteRelationship { rel_id } => {
1043                if !graph.delete_relationship(rel_id) {
1044                    return Err(anyhow!(
1045                        "WAL replay failed at event {idx}: missing relationship {rel_id} for delete"
1046                    ));
1047                }
1048            }
1049            MutationEvent::DeleteNode { node_id } => {
1050                if !graph.delete_node(node_id) {
1051                    return Err(anyhow!(
1052                        "WAL replay failed at event {idx}: missing or attached node {node_id} for delete"
1053                    ));
1054                }
1055            }
1056            MutationEvent::DetachDeleteNode { node_id } => {
1057                // After the cascading DeleteRelationship +
1058                // DeleteNode events have already replayed, the node
1059                // is gone and this becomes a no-op. Calling it
1060                // anyway is harmless.
1061                graph.detach_delete_node(node_id);
1062            }
1063            MutationEvent::Clear => {
1064                graph.clear();
1065            }
1066        }
1067    }
1068    Ok(())
1069}