// lora_database/database.rs

1use std::collections::BTreeMap;
2use std::fs::{File, OpenOptions};
3use std::io::{BufReader, BufWriter};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError};
6use std::time::{Duration, Instant};
7
8use anyhow::{anyhow, Result};
9use lora_analyzer::Analyzer;
10use lora_ast::Document;
11use lora_compiler::{CompiledQuery, Compiler};
12use lora_executor::{
13    classify_stream, compiled_result_columns, project_rows, ExecuteOptions, LoraValue,
14    MutableExecutionContext, MutableExecutor, QueryResult, Row, StreamShape,
15};
16use lora_parser::parse_query;
17use lora_store::{
18    GraphStorage, GraphStorageMut, InMemoryGraph, MutationEvent, MutationRecorder, SnapshotMeta,
19    Snapshotable,
20};
21use lora_wal::{replay_dir, Lsn, Wal, WalConfig, WalMirror, WalRecorder, WroteCommit};
22
23use crate::archive::WalArchive;
24use crate::named::{DatabaseName, DatabaseOpenOptions};
25use crate::stream::{AutoCommitGuard, LiveCursor, QueryStream};
26use crate::transaction::{LiveStoreGuard, Transaction, TransactionMode};
27
/// Minimal abstraction any transport can depend on to run Lora queries.
pub trait QueryRunner: Send + Sync + 'static {
    /// Execute `query` and return its materialized result.
    ///
    /// `options` controls final result-format projection; `None` uses the
    /// implementation's defaults.
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult>;
}
32
/// Owns the graph store and orchestrates parse → analyze → compile → execute.
///
/// Optionally drives a write-ahead log: when constructed via
/// [`Database::open_with_wal`] or [`Database::recover`] the database
/// holds an [`Arc<WalRecorder>`] that brackets every query with
/// `begin → mutations → commit/abort → flush` while the store write
/// lock is held, so the WAL order is exactly the in-memory commit order.
/// When constructed via [`Database::in_memory`] / [`Database::from_graph`]
/// the WAL handle is `None` and the engine pays only the existing
/// `MutationRecorder::record` null-pointer check per mutation.
pub struct Database<S> {
    /// Shared, lock-protected graph store. Live streaming cursors clone
    /// this `Arc` so they can hold the store beyond a single method call.
    pub(crate) store: Arc<RwLock<S>>,
    /// WAL recorder installed at construction time; `None` for purely
    /// in-memory databases.
    pub(crate) wal: Option<Arc<WalRecorder>>,
}
47
48impl Database<InMemoryGraph> {
49    /// Convenience constructor: a fresh, empty in-memory graph database.
50    pub fn in_memory() -> Self {
51        Self::from_graph(InMemoryGraph::new())
52    }
53
54    /// Open or create a WAL-enabled in-memory database from a fresh
55    /// graph.
56    ///
57    /// `WalConfig::Disabled` falls back to [`Database::in_memory`].
58    /// Otherwise, opens the WAL directory, replays any committed
59    /// events into a fresh graph, installs a [`WalRecorder`] on the
60    /// graph, and returns a database ready to serve queries.
61    ///
62    /// To restore from a snapshot in addition to the WAL, use
63    /// [`Database::recover`] instead.
64    pub fn open_with_wal(wal_config: WalConfig) -> Result<Self> {
65        match wal_config {
66            WalConfig::Disabled => Ok(Self::in_memory()),
67            WalConfig::Enabled {
68                dir,
69                sync_mode,
70                segment_target_bytes,
71            } => {
72                let mut graph = InMemoryGraph::new();
73                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, Lsn::ZERO)?;
74                replay_into(&mut graph, events)?;
75                let recorder = Arc::new(WalRecorder::new(wal));
76                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
77                Ok(Self {
78                    store: Arc::new(RwLock::new(graph)),
79                    wal: Some(recorder),
80                })
81            }
82        }
83    }
84
    /// Open or create a named portable database rooted under
    /// `options.database_dir`.
    ///
    /// The database name may be either a portable basename (`app` or
    /// `app.loradb`) or a safe relative path (`tenant/app`). It is resolved
    /// under `options.database_dir` before the WAL archive backend opens.
    pub fn open_named(
        database_name: impl AsRef<str>,
        options: DatabaseOpenOptions,
    ) -> Result<Self> {
        // Validate/normalize the caller-supplied name before touching the
        // filesystem.
        let name = DatabaseName::parse(database_name.as_ref())?;
        let archive = Arc::new(WalArchive::open(
            options.database_path_for(&name),
            options.max_database_bytes,
        )?);
        let mut graph = InMemoryGraph::new();
        // Replay starts at LSN zero — named databases have no snapshot fence.
        let (wal, events) = Wal::open(
            archive.work_dir(),
            options.sync_mode,
            options.segment_target_bytes,
            Lsn::ZERO,
        )?;
        replay_into(&mut graph, events)?;
        // The archive also serves as the WAL mirror (see `WalMirror`).
        let mirror: Arc<dyn WalMirror> = archive;
        let recorder = Arc::new(WalRecorder::new_with_mirror(wal, Some(mirror)));
        graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
        // Mark the archive dirty so a fresh named database is materialized as
        // a portable ZIP. The archive writer coalesces this with any immediate
        // follow-up writes and flushes it in the background, with a final flush
        // on database drop.
        recorder
            .flush()
            .map_err(|e| anyhow!("initial database archive persist failed: {e}"))?;
        Ok(Self {
            store: Arc::new(RwLock::new(graph)),
            wal: Some(recorder),
        })
    }
123
124    /// Start an explicit transaction.
125    ///
126    /// Read-only transactions hold a shared read lock for their
127    /// lifetime; read-write transactions hold the write lock. The
128    /// staging clone is **lazy** — it only happens when a
129    /// [`TransactionMode::ReadWrite`] transaction sees its first
130    /// mutating statement. Materialized read-only statements run
131    /// straight against the live graph; tx-bound streams may still
132    /// clone so their cursors can own a stable view. ReadWrite
133    /// transactions that perform only materialized reads (or commit
134    /// empty) pay nothing for staging.
135    pub fn begin_transaction(&self, mode: TransactionMode) -> Result<Transaction<'_>> {
136        let live = match mode {
137            TransactionMode::ReadOnly => LiveStoreGuard::Read(self.read_store()),
138            TransactionMode::ReadWrite => LiveStoreGuard::Write(self.write_store()),
139        };
140        Ok(Transaction::new(live, self.wal.clone(), mode))
141    }
142
143    /// Force any pending WAL bytes to durable storage and, for archive-backed
144    /// databases, refresh the portable `.loradb` file before returning.
145    pub fn sync(&self) -> Result<()> {
146        if let Some(wal) = &self.wal {
147            wal.force_fsync()?;
148        }
149        Ok(())
150    }
151
    /// Restore from a snapshot file then replay any WAL records past
    /// it.
    ///
    /// The snapshot's `wal_lsn` (when set) becomes the replay fence —
    /// events at or below that LSN are already represented in the
    /// loaded snapshot and are skipped. A missing snapshot file is
    /// treated as "fresh start" so operators can pass the same path
    /// on every boot.
    ///
    /// If the WAL contains a checkpoint marker newer than the
    /// snapshot's `wal_lsn`, a one-line warning is printed to stderr
    /// — the snapshot is stale relative to a more recent checkpoint
    /// the operator is presumably aware of. Recovery still proceeds
    /// from the snapshot's fence (replay re-applies every record
    /// above it, which is conservative-correct); a tighter contract
    /// is deferred to v2 because verifying that the marker's
    /// snapshot file actually exists and is loadable is a separate
    /// observability concern.
    pub fn recover(snapshot_path: impl AsRef<Path>, wal_config: WalConfig) -> Result<Self> {
        let snapshot_path = snapshot_path.as_ref();
        let mut graph = InMemoryGraph::new();
        // Load the snapshot (if present) and learn the replay fence from
        // its header. `NotFound` means fresh start; any other I/O error is
        // fatal.
        let snapshot_lsn = match File::open(snapshot_path) {
            Ok(f) => {
                let reader = BufReader::new(f);
                let meta = graph.load_snapshot(reader)?;
                // A snapshot with no recorded `wal_lsn` provides no fence:
                // replay everything.
                meta.wal_lsn.map(Lsn::new).unwrap_or(Lsn::ZERO)
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Lsn::ZERO,
            Err(e) => return Err(e.into()),
        };

        match wal_config {
            WalConfig::Disabled => Ok(Self::from_graph(graph)),
            WalConfig::Enabled {
                dir,
                sync_mode,
                segment_target_bytes,
            } => {
                // Diagnostic peek at the WAL's newest checkpoint
                // marker so we can warn the operator about a stale
                // snapshot before we start replaying. Treat any error
                // as "no marker" — the subsequent `Wal::open` will
                // surface the real failure if there is one.
                if dir.exists() {
                    if let Ok(outcome) = replay_dir(&dir, Lsn::ZERO) {
                        if let Some(marker) = outcome.checkpoint_lsn_observed {
                            if marker > snapshot_lsn {
                                eprintln!(
                                    "lora-wal: snapshot at LSN {} is older than the newest \
                                     checkpoint marker on disk (LSN {}). Replaying every WAL \
                                     record above LSN {}; consider passing the more recent \
                                     snapshot to --restore-from.",
                                    snapshot_lsn.raw(),
                                    marker.raw(),
                                    snapshot_lsn.raw()
                                );
                            }
                        }
                    }
                }

                // Replay everything above the fence, then install the
                // recorder so subsequent mutations are logged.
                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, snapshot_lsn)?;
                replay_into(&mut graph, events)?;
                let recorder = Arc::new(WalRecorder::new(wal));
                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
                Ok(Self {
                    store: Arc::new(RwLock::new(graph)),
                    wal: Some(recorder),
                })
            }
        }
    }
224
225    /// Execute a query and return an owning row stream.
226    pub fn stream(&self, query: &str) -> Result<QueryStream<'_>> {
227        self.stream_with_params(query, BTreeMap::new())
228    }
229
    /// Execute a parameterised query and return an owning row stream.
    ///
    /// The compiled plan is classified at open time. Read-only
    /// queries run directly off the live store and yield a
    /// buffered cursor with plan-derived columns. Mutating queries
    /// are routed through a hidden read-write [`Transaction`]:
    /// full cursor exhaustion calls `tx.commit` (publishing staged
    /// changes and replaying the tx-local WAL buffer); a premature
    /// drop or any error from `next_row` calls `tx.rollback` so
    /// the live store and the WAL stay untouched.
    pub fn stream_with_params(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryStream<'_>> {
        // Classify by compiling once against the live store. The
        // mutating branch then re-compiles inside the hidden
        // transaction (against a staged graph that's identical to
        // live at clone time, so the second plan matches the
        // first); the cost is one extra parse+analyze+compile per
        // mutating stream, paid in exchange for a tiny
        // classify-stream surface.
        let document = parse_query(query)?;
        let store_guard = self.read_store();
        let resolved = {
            let mut analyzer = Analyzer::new(&*store_guard);
            analyzer.analyze(&document)?
        };
        let compiled = Compiler::compile(&resolved);
        // Columns and shape are derived from the plan alone, so they stay
        // valid after the guard below is dropped.
        let columns = compiled_result_columns(&compiled);
        let shape = classify_stream(&compiled);
        // Release the analyzer's lock before either branch
        // re-acquires (read-only path keeps it; mutating path
        // delegates to begin_transaction which takes its own).
        drop(store_guard);

        match shape {
            StreamShape::ReadOnly => {
                // True pull-shaped streaming. `LiveCursor` holds
                // the live store lock and the cursor that
                // borrows from it; its `Drop` releases them in
                // the right order so the caller observes pure
                // pull semantics with no intermediate
                // materialization.
                let live = LiveCursor::open(self.store.clone(), compiled, params)?;
                Ok(QueryStream::live(live, columns))
            }
            StreamShape::Mutating => {
                // Hidden auto-commit transaction. The transaction
                // owns staging, the buffering recorder, savepoint
                // management, and the WAL replay-on-commit logic;
                // we just pick commit-on-exhaustion vs
                // rollback-on-drop based on cursor state.
                //
                // The cursor returned by `open_streaming_compiled_autocommit`
                // may be a real per-row `StreamingWriteCursor`,
                // a mutable UNION cursor, or a buffered leaf for
                // operators that still need full materialization.
                // Either way the AutoCommit guard's
                // drop/exhaustion semantics are identical. The
                // compiled plan is wrapped in an `Arc` so the
                // cursor's `'static` borrows into it remain valid
                // for the cursor's lifetime.
                let mut tx = self.begin_transaction(TransactionMode::ReadWrite)?;
                let compiled_arc = Arc::new(compiled);
                let cursor =
                    match tx.open_streaming_compiled_autocommit(compiled_arc.clone(), params) {
                        Ok(c) => c,
                        Err(err) => {
                            // Tx rolls back implicitly on drop here.
                            return Err(err);
                        }
                    };
                let guard = AutoCommitGuard {
                    tx: Some(tx),
                    finalized: false,
                };
                Ok(QueryStream::auto_commit(cursor, columns, guard))
            }
        }
    }
311
    /// Open a stream whose lifetime can be carried by an outer owner that
    /// also retains an `Arc<Database>`.
    ///
    /// # Safety
    ///
    /// The returned stream may contain lock guards that borrow from the
    /// database's internal `RwLock`. The caller must keep this exact `Arc`
    /// alive until the stream is dropped. This is intended for language
    /// bindings that store both the `Arc<Database>` and the `QueryStream` in
    /// the same opaque stream handle.
    pub unsafe fn stream_with_params_owned(
        self: &Arc<Self>,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryStream<'static>> {
        let stream = self.stream_with_params(query, params)?;
        // SAFETY: only the lifetime parameter changes here. The caller
        // upholds the contract above — this `Arc` (and therefore the
        // `RwLock` any internal guards borrow from) outlives the stream.
        Ok(std::mem::transmute::<QueryStream<'_>, QueryStream<'static>>(stream))
    }
330}
331
332impl<S> Database<S>
333where
334    S: GraphStorage + GraphStorageMut,
335{
336    /// Build a database from a pre-wrapped, shared store.
337    pub fn new(store: Arc<RwLock<S>>) -> Self {
338        Self { store, wal: None }
339    }
340
341    /// Build a database by taking ownership of a bare graph store.
342    pub fn from_graph(graph: S) -> Self {
343        Self::new(Arc::new(RwLock::new(graph)))
344    }
345
346    /// Handle to the installed WAL recorder, if any. Exposed for
347    /// admin paths (checkpoint, truncate, observability) that need
348    /// to drive the WAL outside the standard query lifecycle.
349    pub fn wal(&self) -> Option<&Arc<WalRecorder>> {
350        self.wal.as_ref()
351    }
352
353    /// Handle to the underlying shared store — useful for callers that need
354    /// to snapshot or share the graph across multiple databases.
355    pub fn store(&self) -> &Arc<RwLock<S>> {
356        &self.store
357    }
358
359    /// Parse a query string into an AST without executing it.
360    pub fn parse(&self, query: &str) -> Result<Document> {
361        Ok(parse_query(query)?)
362    }
363
364    pub(crate) fn read_store(&self) -> RwLockReadGuard<'_, S> {
365        self.store
366            .read()
367            .unwrap_or_else(|poisoned| poisoned.into_inner())
368    }
369
370    pub(crate) fn write_store(&self) -> RwLockWriteGuard<'_, S> {
371        self.store
372            .write()
373            .unwrap_or_else(|poisoned| poisoned.into_inner())
374    }
375
376    fn read_store_deadline(&self, deadline: Option<Instant>) -> Result<RwLockReadGuard<'_, S>> {
377        let Some(deadline) = deadline else {
378            return Ok(self.read_store());
379        };
380
381        loop {
382            match self.store.try_read() {
383                Ok(guard) => return Ok(guard),
384                Err(TryLockError::Poisoned(poisoned)) => return Ok(poisoned.into_inner()),
385                Err(TryLockError::WouldBlock) if Instant::now() >= deadline => {
386                    return Err(anyhow!("query deadline exceeded"));
387                }
388                Err(TryLockError::WouldBlock) => {
389                    std::thread::sleep(Duration::from_millis(1));
390                }
391            }
392        }
393    }
394
395    fn write_store_deadline(&self, deadline: Option<Instant>) -> Result<RwLockWriteGuard<'_, S>> {
396        let Some(deadline) = deadline else {
397            return Ok(self.write_store());
398        };
399
400        loop {
401            match self.store.try_write() {
402                Ok(guard) => return Ok(guard),
403                Err(TryLockError::Poisoned(poisoned)) => return Ok(poisoned.into_inner()),
404                Err(TryLockError::WouldBlock) if Instant::now() >= deadline => {
405                    return Err(anyhow!("query deadline exceeded"));
406                }
407                Err(TryLockError::WouldBlock) => {
408                    std::thread::sleep(Duration::from_millis(1));
409                }
410            }
411        }
412    }
413
414    fn compile_document_against(&self, document: &Document, store: &S) -> Result<CompiledQuery> {
415        let resolved = {
416            let mut analyzer = Analyzer::new(store);
417            analyzer.analyze(document)?
418        };
419
420        Ok(Compiler::compile(&resolved))
421    }
422
423    /// Execute a query and return its result.
424    pub fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
425        self.execute_with_params(query, options, BTreeMap::new())
426    }
427
428    /// Execute a query with a cooperative deadline. The timeout is checked at
429    /// executor operator boundaries and hot scan loops; if it fires, the query
430    /// returns an error and any WAL-backed mutating query is aborted through
431    /// the existing failure path.
432    pub fn execute_with_timeout(
433        &self,
434        query: &str,
435        options: Option<ExecuteOptions>,
436        timeout: Duration,
437    ) -> Result<QueryResult> {
438        let deadline = Instant::now()
439            .checked_add(timeout)
440            .unwrap_or_else(Instant::now);
441        let rows =
442            self.execute_rows_with_params_deadline(query, BTreeMap::new(), Some(deadline))?;
443        Ok(project_rows(rows, options.unwrap_or_default()))
444    }
445
    /// Execute a query with bound parameters.
    ///
    /// When a WAL is attached the call is bracketed by a transaction:
    ///
    /// 1. `recorder.arm()` after analyze + compile (so a parse /
    ///    semantic / compile error never opens a tx that has to be
    ///    immediately aborted). Arming is *cheap*: no record is
    ///    appended to the WAL yet, so a pure read query that
    ///    completes here pays nothing for the WAL hot path.
    /// 2. The executor runs; every primitive mutation fires
    ///    `MutationRecorder::record`, which buffers events in memory.
    /// 3. On Ok, `recorder.commit()` writes `TxBegin`, one batched
    ///    mutation record, and `TxCommit` only when mutations occurred;
    ///    the surrounding `recorder.flush()` runs only in that case so
    ///    a read-only query never pays an `fsync`.
    /// 4. On Err, `recorder.abort()` clears the pending batch. The
    ///    engine has no rollback, so the in-memory state may already
    ///    be partially mutated; the live handle is quarantined while
    ///    durable recovery stays atomic because no committed batch was
    ///    written.
    /// 5. The recorder's poisoned flag is polled once (it also
    ///    surfaces background-flusher fsync failures from
    ///    `SyncMode::Group`). If set, the query fails loudly with the
    ///    durability error so the caller can act on it; the WAL
    ///    refuses further appends until the operator restarts the
    ///    database, which recovers from the last consistent
    ///    snapshot + WAL.
    pub fn execute_with_params(
        &self,
        query: &str,
        options: Option<ExecuteOptions>,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryResult> {
        // No deadline: run to completion, then apply final result-format
        // projection.
        let rows = self.execute_rows_with_params_deadline(query, params, None)?;
        Ok(project_rows(rows, options.unwrap_or_default()))
    }
482
483    /// Execute a parameterised query with a cooperative deadline.
484    pub fn execute_with_params_timeout(
485        &self,
486        query: &str,
487        options: Option<ExecuteOptions>,
488        params: BTreeMap<String, LoraValue>,
489        timeout: Duration,
490    ) -> Result<QueryResult> {
491        let deadline = Instant::now()
492            .checked_add(timeout)
493            .unwrap_or_else(Instant::now);
494        let rows = self.execute_rows_with_params_deadline(query, params, Some(deadline))?;
495        Ok(project_rows(rows, options.unwrap_or_default()))
496    }
497
498    /// Execute a query and return hydrated rows before final result-format
499    /// projection.
500    pub fn execute_rows(&self, query: &str) -> Result<Vec<Row>> {
501        self.execute_rows_with_params(query, BTreeMap::new())
502    }
503
504    /// Execute a query with parameters and return hydrated rows before final
505    /// result-format projection.
506    pub fn execute_rows_with_params(
507        &self,
508        query: &str,
509        params: BTreeMap<String, LoraValue>,
510    ) -> Result<Vec<Row>> {
511        self.execute_rows_with_params_deadline(query, params, None)
512    }
513
    /// Core execution path shared by every `execute*` entry point.
    ///
    /// Parses and classifies `query`, then runs it: read-only plans
    /// execute under a shared read lock with no WAL interaction, while
    /// mutating plans are re-compiled under the write lock and bracketed
    /// by the WAL recorder (`arm` → execute → `commit`/`abort` →
    /// poisoned check) as documented on [`Self::execute_with_params`].
    fn execute_rows_with_params_deadline(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
        deadline: Option<Instant>,
    ) -> Result<Vec<Row>> {
        let document = self.parse(query)?;
        let shape = {
            // First compile under the read lock, primarily to classify
            // the plan as read-only vs mutating.
            let store = self.read_store_deadline(deadline)?;
            let compiled = self.compile_document_against(&document, &*store)?;

            if matches!(classify_stream(&compiled), StreamShape::ReadOnly) {
                // Fail fast if the WAL is quarantined — even pure reads
                // refuse to serve from a handle flagged as divergent.
                if let Some(rec) = &self.wal {
                    if let Some(reason) = rec.poisoned() {
                        return Err(anyhow!("WAL arm failed: WAL poisoned: {reason}"));
                    }
                }
                let executor = lora_executor::Executor::with_deadline(
                    lora_executor::ExecutionContext {
                        storage: &*store,
                        params,
                    },
                    deadline,
                );
                return executor
                    .execute_compiled_rows(&compiled)
                    .map_err(|e| anyhow!(e));
            }

            classify_stream(&compiled)
        };
        // Read lock dropped at the end of the block above; the mutating
        // path takes the write lock next.

        debug_assert!(shape.is_mutating());

        // Re-compile under the write lock — another writer may have
        // changed the graph in the window where no lock was held.
        let mut store = self.write_store_deadline(deadline)?;
        let compiled = self.compile_document_against(&document, &*store)?;

        // Arm only after a successful compile so parse/analyze/compile
        // errors never open a WAL transaction.
        if let Some(rec) = &self.wal {
            rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
        }

        // Closure so the `?` inside can't skip the WAL commit/abort
        // handling below.
        let exec_result: Result<Vec<Row>> = (|| {
            let mut executor = MutableExecutor::with_deadline(
                MutableExecutionContext {
                    storage: &mut *store,
                    params,
                },
                deadline,
            );
            Ok(executor.execute_compiled_rows(&compiled)?)
        })();

        if let Some(rec) = &self.wal {
            match &exec_result {
                Ok(_) => match rec.commit() {
                    Ok(WroteCommit::Yes) => {
                        rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
                    }
                    Ok(WroteCommit::No) => {
                        // Read-only query: no records were written
                        // and there is nothing to fsync. Skip flush
                        // entirely so PerCommit pays zero fsyncs on
                        // pure reads.
                    }
                    Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
                },
                Err(_) => {
                    // Best-effort abort. If the WAL saw mutations, durable
                    // recovery will discard them but the live in-memory store
                    // may already be ahead of durable state. Quarantine this
                    // handle so callers restart instead of serving from a
                    // potentially divergent graph.
                    if matches!(rec.abort(), Ok(true)) {
                        rec.poison(
                            "query mutated the live graph before failing; restart from snapshot + WAL required",
                        );
                    }
                }
            }
            if let Some(reason) = rec.poisoned() {
                return Err(anyhow!("WAL poisoned: {reason}"));
            }
        }

        exec_result
    }
600
601    // ---------- Storage-agnostic utility helpers ----------
602    //
603    // Bindings previously reached into the shared store lock to answer
604    // stat / admin calls; these helpers let them depend on `Database<S>`
605    // instead, so swapping in a new backend only requires changing one type
606    // parameter.
607
608    /// Drop every node and relationship, returning WAL/archive errors to the
609    /// caller.
610    ///
611    /// When a WAL is attached, the clear is wrapped in `arm`/`commit` so the
612    /// `MutationEvent::Clear` fired by the store reaches the log inside a
613    /// transaction. If a failure happens after the in-memory graph has been
614    /// cleared, the recorder is poisoned by the failing WAL path and future
615    /// writes fail until the database is reopened from durable state.
616    pub fn try_clear(&self) -> Result<()> {
617        let mut guard = self.write_store();
618        let Some(rec) = &self.wal else {
619            guard.clear();
620            return Ok(());
621        };
622
623        rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
624        guard.clear();
625        match rec.commit() {
626            Ok(WroteCommit::Yes) => {
627                rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
628            }
629            Ok(WroteCommit::No) => {}
630            Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
631        }
632        if let Some(reason) = rec.poisoned() {
633            return Err(anyhow!("WAL poisoned: {reason}"));
634        }
635        Ok(())
636    }
637
638    /// Drop every node and relationship.
639    ///
640    /// This compatibility helper keeps the historical infallible Rust API.
641    /// Bindings that can report errors should call [`Self::try_clear`].
642    pub fn clear(&self) {
643        let _ = self.try_clear();
644    }
645
646    /// Number of nodes currently in the graph.
647    pub fn node_count(&self) -> usize {
648        let guard = self.read_store();
649        guard.node_count()
650    }
651
652    /// Number of relationships currently in the graph.
653    pub fn relationship_count(&self) -> usize {
654        let guard = self.read_store();
655        guard.relationship_count()
656    }
657
658    /// Run a closure with a shared borrow of the underlying store. Used by
659    /// bindings to answer ad-hoc queries without locking the RwLock themselves.
660    pub fn with_store<R>(&self, f: impl FnOnce(&S) -> R) -> R {
661        let guard = self.read_store();
662        f(&*guard)
663    }
664
665    /// Run a closure with an exclusive borrow of the underlying store. Reserved
666    /// for admin paths (restore, bulk load); regular mutation goes through
667    /// `execute_with_params`.
668    pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
669        let mut guard = self.write_store();
670        f(&mut *guard)
671    }
672}
673
674// ---------------------------------------------------------------------------
675// Snapshot helpers
676//
677// A second impl block so the `Snapshotable` bound only constrains backends
678// that actually need it. `Database<InMemoryGraph>` picks these up
679// automatically; hypothetical backends that don't implement `Snapshotable`
680// still get the core query API above.
681// ---------------------------------------------------------------------------
682
683impl<S> Database<S>
684where
685    S: GraphStorage + GraphStorageMut + Snapshotable,
686{
    /// Serialize the current graph state to the given path. Writes are
    /// atomic: the payload goes to `<path>.tmp`, is `fsync`'d, and then
    /// renamed over the target; a torn write can never leave a half-written
    /// file at `path`. If any step before the rename fails, the stale
    /// `<path>.tmp` is removed so a crashed save never leaks scratch files.
    ///
    /// Holds a store read lock for the duration of the save so concurrent
    /// readers can proceed and writers wait behind a consistent snapshot.
    ///
    /// Returns the [`SnapshotMeta`] produced by the store's serializer.
    pub fn save_snapshot_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);

        // Acquire the lock once so the snapshot is point-in-time consistent.
        let guard = self.read_store();

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        // Arm cleanup immediately after `open` succeeds: every early return
        // below must either surface an error *and* unlink the tmp, or commit
        // the guard once the rename takes effect.
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);

        let meta = guard.save_snapshot(&mut writer)?;

        // Flush the BufWriter before fsync; otherwise we fsync an empty
        // underlying file.
        use std::io::Write;
        writer.flush()?;
        // `into_inner` hands back the File; its wrapper error is mapped to
        // the underlying io::Error.
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        file.sync_all()?;
        drop(file);

        std::fs::rename(&tmp, path)?;
        // The tmp path no longer has a file behind it — disarm the guard so
        // it doesn't try to remove the just-renamed target by name race.
        tmp_guard.commit();

        // Best-effort parent-dir fsync so the rename itself is durable on
        // power loss. Non-fatal if the parent can't be opened.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        Ok(meta)
    }
738
739    /// Replace the current graph state with a snapshot loaded from `path`.
740    /// Holds the store write lock for the duration of the load; concurrent
741    /// queries block until restore completes.
742    pub fn load_snapshot_from(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
743        let file = File::open(path.as_ref())?;
744        let reader = BufReader::new(file);
745
746        let mut guard = self.write_store();
747        Ok(guard.load_snapshot(reader)?)
748    }
749}
750
impl Database<InMemoryGraph> {
    /// Convenience constructor: open (or create) an empty in-memory database
    /// and immediately restore it from `path`. Errors if the file cannot be
    /// opened or the snapshot is malformed.
    pub fn in_memory_from_snapshot(path: impl AsRef<Path>) -> Result<Self> {
        let db = Self::in_memory();
        // On failure the freshly built (empty) database is simply dropped.
        db.load_snapshot_from(path)?;
        Ok(db)
    }

    /// Take a checkpoint: snapshot the current state with the WAL's
    /// `durable_lsn` stamped into the header, append a `Checkpoint`
    /// marker to the WAL, then drop sealed segments at or below the
    /// fence.
    ///
    /// Errors with "checkpoint requires WAL enabled" when called on a
    /// database constructed without a WAL — operators that just want
    /// a fence-less dump should use [`save_snapshot_to`] instead.
    ///
    /// The write-lock-held window covers snapshot serialization plus the
    /// checkpoint marker append. Truncation runs after the rename
    /// but still under the write lock; making it concurrent with queries
    /// is a v2 concern (see `docs/decisions/0004-wal.md`).
    ///
    /// [`save_snapshot_to`]: Database::save_snapshot_to
    pub fn checkpoint_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let recorder = self
            .wal
            .as_ref()
            .ok_or_else(|| anyhow!("checkpoint requires WAL enabled"))?;
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);

        // Write lock (not read, unlike `save_snapshot_to`): the fence LSN
        // captured below and the state serialized after it must both sit
        // inside the same no-mutation window.
        let guard = self.write_store();

        // Make every record appended so far durable, then capture
        // the LSN that becomes the snapshot fence.
        recorder
            .force_fsync()
            .map_err(|e| anyhow!("WAL fsync before checkpoint failed: {e}"))?;
        let snapshot_lsn = recorder.wal().durable_lsn();

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        // RAII: unlink `<path>.tmp` on any early return before the rename.
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);
        let meta = guard.save_checkpoint(&mut writer, snapshot_lsn.raw())?;

        // Flush buffered bytes before fsync; otherwise the fsync covers an
        // empty underlying file.
        use std::io::Write;
        writer.flush()?;
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        file.sync_all()?;
        drop(file);

        std::fs::rename(&tmp, path)?;
        tmp_guard.commit();

        // Best-effort parent-dir fsync so the rename survives power loss.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        // Append the checkpoint marker AFTER the rename succeeds —
        // this preserves the invariant that a `Checkpoint` record
        // in the WAL implies the snapshot it points at exists.
        recorder
            .checkpoint_marker(snapshot_lsn)
            .map_err(|e| anyhow!("WAL checkpoint marker failed: {e}"))?;
        recorder
            .force_fsync()
            .map_err(|e| anyhow!("WAL fsync after checkpoint marker failed: {e}"))?;

        // Best-effort segment truncation. Failure here doesn't undo
        // the checkpoint — the next call will retry.
        let _ = recorder.truncate_up_to(snapshot_lsn);

        Ok(meta)
    }
}
832
/// Derive the scratch path used for atomic snapshot writes: `<target>.tmp`.
/// Works on the raw `OsStr` so non-UTF-8 paths survive unchanged.
fn snapshot_tmp_path(target: &Path) -> PathBuf {
    let mut scratch = target.as_os_str().to_os_string();
    scratch.push(".tmp");
    scratch.into()
}
838
/// RAII handle that deletes its path on drop unless [`commit`] is called.
///
/// The snapshot save path creates `<target>.tmp` before the payload is
/// written; if any step between then and the final rename fails (or the
/// thread unwinds), the guard's `Drop` removes the scratch file so a crashed
/// save never leaves leftovers on disk.
///
/// [`commit`]: Self::commit
struct TempFileGuard {
    // `Some(path)` while armed; `None` once `commit` has run (or after
    // `Drop` has already taken the path).
    path: Option<PathBuf>,
}
850
851impl TempFileGuard {
852    fn new(path: PathBuf) -> Self {
853        Self { path: Some(path) }
854    }
855
856    /// Disarm the guard. Call this once the tmp file's contents have been
857    /// handed off (e.g. renamed to their final destination) so the `Drop`
858    /// impl does not try to remove them.
859    fn commit(mut self) {
860        self.path.take();
861    }
862}
863
864impl Drop for TempFileGuard {
865    fn drop(&mut self) {
866        if let Some(path) = self.path.take() {
867            // Best-effort: cleanup failure is not worth surfacing — the
868            // worst case is a leaked scratch file that the next save
869            // overwrites via `OpenOptions::truncate(true)`.
870            let _ = std::fs::remove_file(path);
871        }
872    }
873}
874
/// Storage-agnostic admin surface for HTTP / binding callers that want to
/// drive snapshot operations without naming the backend type parameter.
///
/// `Database<S>` picks up a blanket impl when `S: Snapshotable + 'static`.
/// Transports (e.g. `lora-server`) type-erase on `Arc<dyn SnapshotAdmin>`.
pub trait SnapshotAdmin: Send + Sync + 'static {
    /// Serialize the current graph state to `path`. The blanket impl
    /// delegates to [`Database::save_snapshot_to`] (atomic tmp-and-rename).
    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;

    /// Replace the current graph state with the snapshot at `path`. The
    /// blanket impl delegates to [`Database::load_snapshot_from`].
    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
}
884
885impl<S> SnapshotAdmin for Database<S>
886where
887    S: GraphStorage + GraphStorageMut + Snapshotable + Send + Sync + 'static,
888{
889    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
890        self.save_snapshot_to(path)
891    }
892
893    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
894        self.load_snapshot_from(path)
895    }
896}
897
/// Storage-agnostic admin surface for the WAL.
///
/// `Database<InMemoryGraph>` picks up the blanket impl below when a
/// WAL is attached. Transports (e.g. `lora-server`) type-erase on
/// `Arc<dyn WalAdmin>` so they don't need to name the backend type
/// parameter.
///
/// All LSNs cross the trait boundary as raw `u64` so callers don't
/// need a dependency on `lora-wal`.
///
/// Every method errors when the database was constructed without a WAL.
pub trait WalAdmin: Send + Sync + 'static {
    /// Take a checkpoint at `path`. The snapshot's header is stamped
    /// with the WAL's `durable_lsn`; older sealed segments are then
    /// dropped.
    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta>;

    /// Snapshot of the WAL's current state — durable / next LSN,
    /// active / oldest segment id. Cheap; a single WAL mutex acquisition.
    fn wal_status(&self) -> Result<WalStatus>;

    /// Drop sealed segments at or below `fence_lsn`. Idempotent.
    fn wal_truncate(&self, fence_lsn: u64) -> Result<()>;
}
920
/// Snapshot of WAL state returned by [`WalAdmin::wal_status`].
///
/// `bg_failure` is the latched fsync error from the background flusher
/// (only meaningful under `SyncMode::Group`). When `Some`, the WAL is
/// poisoned and every subsequent commit will fail loudly until the
/// operator restarts from the last consistent snapshot + WAL.
///
/// Derives `PartialEq`/`Eq` so tests and monitoring code can diff or
/// assert on statuses directly; the struct is a plain data carrier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalStatus {
    /// Raw value of the WAL's `durable_lsn`.
    pub durable_lsn: u64,
    /// Raw value of the WAL's `next_lsn`.
    pub next_lsn: u64,
    /// Raw `active_segment_id` reported by the WAL.
    pub active_segment_id: u64,
    /// Raw `oldest_segment_id` reported by the WAL.
    pub oldest_segment_id: u64,
    /// Latched background-flusher failure message, if any (see type docs).
    pub bg_failure: Option<String>,
}
935
936impl WalAdmin for Database<InMemoryGraph> {
937    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta> {
938        self.checkpoint_to(path)
939    }
940
941    fn wal_status(&self) -> Result<WalStatus> {
942        let recorder = self
943            .wal
944            .as_ref()
945            .ok_or_else(|| anyhow!("WAL not enabled"))?;
946        let wal = recorder.wal();
947        Ok(WalStatus {
948            durable_lsn: wal.durable_lsn().raw(),
949            next_lsn: wal.next_lsn().raw(),
950            active_segment_id: wal.active_segment_id(),
951            oldest_segment_id: wal.oldest_segment_id(),
952            bg_failure: wal.bg_failure(),
953        })
954    }
955
956    fn wal_truncate(&self, fence_lsn: u64) -> Result<()> {
957        let recorder = self
958            .wal
959            .as_ref()
960            .ok_or_else(|| anyhow!("WAL not enabled"))?;
961        recorder.truncate_up_to(Lsn::new(fence_lsn))?;
962        Ok(())
963    }
964}
965
impl<S> QueryRunner for Database<S>
where
    S: GraphStorage + GraphStorageMut + Send + Sync + 'static,
{
    /// Thin adapter so transports can hold `Arc<dyn QueryRunner>` without
    /// naming the storage type parameter.
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
        // Fully-qualified call: makes it explicit that this dispatches to
        // the inherent `Database::execute`, not back into this trait method.
        Database::execute(self, query, options)
    }
}
974
975// ---------------------------------------------------------------------------
976// Replay
977// ---------------------------------------------------------------------------
978
979/// Apply a `MutationEvent` stream to an in-memory graph by dispatching
980/// each variant to the matching store operation.
981///
982/// Creation events are replayed through id-preserving paths, not the
983/// normal allocator-backed mutation methods. That matters after aborted
984/// transactions: an aborted create can consume id `N` in the original
985/// process, be dropped by replay, and leave the next committed create at
986/// id `N + 1`. Reusing the regular allocator would shift ids downward.
987///
988/// Replay must be invoked **before** the `WalRecorder` is installed
989/// on the graph. Otherwise the replay's own mutations would fire the
990/// recorder and re-write the same events to the WAL, doubling them on
991/// the next recovery.
992fn replay_into(graph: &mut InMemoryGraph, events: Vec<MutationEvent>) -> Result<()> {
993    for (idx, event) in events.into_iter().enumerate() {
994        match event {
995            MutationEvent::CreateNode {
996                id,
997                labels,
998                properties,
999            } => {
1000                graph
1001                    .replay_create_node(id, labels, properties)
1002                    .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
1003            }
1004            MutationEvent::CreateRelationship {
1005                id,
1006                src,
1007                dst,
1008                rel_type,
1009                properties,
1010            } => {
1011                graph
1012                    .replay_create_relationship(id, src, dst, &rel_type, properties)
1013                    .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
1014            }
1015            MutationEvent::SetNodeProperty {
1016                node_id,
1017                key,
1018                value,
1019            } => {
1020                if !graph.set_node_property(node_id, key, value) {
1021                    return Err(anyhow!(
1022                        "WAL replay failed at event {idx}: missing node {node_id} for property set"
1023                    ));
1024                }
1025            }
1026            MutationEvent::RemoveNodeProperty { node_id, key } => {
1027                if !graph.remove_node_property(node_id, &key) {
1028                    return Err(anyhow!(
1029                        "WAL replay failed at event {idx}: missing node {node_id} for property removal"
1030                    ));
1031                }
1032            }
1033            MutationEvent::AddNodeLabel { node_id, label } => {
1034                if !graph.add_node_label(node_id, &label) {
1035                    return Err(anyhow!(
1036                        "WAL replay failed at event {idx}: missing node {node_id} for label add"
1037                    ));
1038                }
1039            }
1040            MutationEvent::RemoveNodeLabel { node_id, label } => {
1041                if !graph.remove_node_label(node_id, &label) {
1042                    return Err(anyhow!(
1043                        "WAL replay failed at event {idx}: missing node {node_id} for label removal"
1044                    ));
1045                }
1046            }
1047            MutationEvent::SetRelationshipProperty { rel_id, key, value } => {
1048                if !graph.set_relationship_property(rel_id, key, value) {
1049                    return Err(anyhow!(
1050                        "WAL replay failed at event {idx}: missing relationship {rel_id} for property set"
1051                    ));
1052                }
1053            }
1054            MutationEvent::RemoveRelationshipProperty { rel_id, key } => {
1055                if !graph.remove_relationship_property(rel_id, &key) {
1056                    return Err(anyhow!(
1057                        "WAL replay failed at event {idx}: missing relationship {rel_id} for property removal"
1058                    ));
1059                }
1060            }
1061            MutationEvent::DeleteRelationship { rel_id } => {
1062                if !graph.delete_relationship(rel_id) {
1063                    return Err(anyhow!(
1064                        "WAL replay failed at event {idx}: missing relationship {rel_id} for delete"
1065                    ));
1066                }
1067            }
1068            MutationEvent::DeleteNode { node_id } => {
1069                if !graph.delete_node(node_id) {
1070                    return Err(anyhow!(
1071                        "WAL replay failed at event {idx}: missing or attached node {node_id} for delete"
1072                    ));
1073                }
1074            }
1075            MutationEvent::DetachDeleteNode { node_id } => {
1076                // After the cascading DeleteRelationship +
1077                // DeleteNode events have already replayed, the node
1078                // is gone and this becomes a no-op. Calling it
1079                // anyway is harmless.
1080                graph.detach_delete_node(node_id);
1081            }
1082            MutationEvent::Clear => {
1083                graph.clear();
1084            }
1085        }
1086    }
1087    Ok(())
1088}