Skip to main content

lora_database/
database.rs

1use std::collections::BTreeMap;
2use std::fs::{File, OpenOptions};
3use std::io::{BufReader, BufWriter};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use anyhow::Result;
8use lora_analyzer::Analyzer;
9use lora_ast::Document;
10use lora_compiler::{CompiledQuery, Compiler};
11use lora_executor::{
12    ExecuteOptions, LoraValue, MutableExecutionContext, MutableExecutor, QueryResult,
13};
14use lora_parser::parse_query;
15use lora_store::{GraphStorage, GraphStorageMut, InMemoryGraph, SnapshotMeta, Snapshotable};
16
17/// Minimal abstraction any transport can depend on to run Lora queries.
18pub trait QueryRunner: Send + Sync + 'static {
19    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult>;
20}
21
/// A Lora database: holds the graph store and drives every query through the
/// parse → analyze → compile → execute pipeline.
pub struct Database<S> {
    /// Storage backend, shared behind a mutex so handles can be passed around.
    store: Arc<Mutex<S>>,
}
26
27impl Database<InMemoryGraph> {
28    /// Convenience constructor: a fresh, empty in-memory graph database.
29    pub fn in_memory() -> Self {
30        Self::from_graph(InMemoryGraph::new())
31    }
32}
33
34impl<S> Database<S>
35where
36    S: GraphStorage + GraphStorageMut,
37{
38    /// Build a database from a pre-wrapped, shared store.
39    pub fn new(store: Arc<Mutex<S>>) -> Self {
40        Self { store }
41    }
42
43    /// Build a database by taking ownership of a bare graph store.
44    pub fn from_graph(graph: S) -> Self {
45        Self::new(Arc::new(Mutex::new(graph)))
46    }
47
48    /// Handle to the underlying shared store — useful for callers that need
49    /// to snapshot or share the graph across multiple databases.
50    pub fn store(&self) -> &Arc<Mutex<S>> {
51        &self.store
52    }
53
54    /// Parse a query string into an AST without executing it.
55    pub fn parse(&self, query: &str) -> Result<Document> {
56        Ok(parse_query(query)?)
57    }
58
59    fn lock_store(&self) -> MutexGuard<'_, S> {
60        self.store
61            .lock()
62            .unwrap_or_else(|poisoned| poisoned.into_inner())
63    }
64
65    fn compile_query(&self, query: &str) -> Result<(MutexGuard<'_, S>, CompiledQuery)> {
66        let document = self.parse(query)?;
67        let store = self.lock_store();
68
69        let resolved = {
70            let mut analyzer = Analyzer::new(&*store);
71            analyzer.analyze(&document)?
72        };
73
74        let compiled = Compiler::compile(&resolved);
75        Ok((store, compiled))
76    }
77
78    /// Execute a query and return its result.
79    pub fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
80        self.execute_with_params(query, options, BTreeMap::new())
81    }
82
83    /// Execute a query with bound parameters.
84    pub fn execute_with_params(
85        &self,
86        query: &str,
87        options: Option<ExecuteOptions>,
88        params: BTreeMap<String, LoraValue>,
89    ) -> Result<QueryResult> {
90        let (mut store, compiled) = self.compile_query(query)?;
91
92        let mut executor = MutableExecutor::new(MutableExecutionContext {
93            storage: &mut *store,
94            params,
95        });
96
97        Ok(executor.execute_compiled(&compiled, options)?)
98    }
99
100    // ---------- Storage-agnostic utility helpers ----------
101    //
102    // Bindings previously reached into `Arc<Mutex<InMemoryGraph>>` to answer
103    // stat / admin calls; these helpers let them depend on `Database<S>`
104    // instead, so swapping in a new backend only requires changing one type
105    // parameter.
106
107    /// Drop every node and relationship.
108    pub fn clear(&self) {
109        let mut guard = self.lock_store();
110        guard.clear();
111    }
112
113    /// Number of nodes currently in the graph.
114    pub fn node_count(&self) -> usize {
115        let guard = self.lock_store();
116        guard.node_count()
117    }
118
119    /// Number of relationships currently in the graph.
120    pub fn relationship_count(&self) -> usize {
121        let guard = self.lock_store();
122        guard.relationship_count()
123    }
124
125    /// Run a closure with a shared borrow of the underlying store. Used by
126    /// bindings to answer ad-hoc queries without locking the mutex themselves.
127    pub fn with_store<R>(&self, f: impl FnOnce(&S) -> R) -> R {
128        let guard = self.lock_store();
129        f(&*guard)
130    }
131
132    /// Run a closure with an exclusive borrow of the underlying store. Reserved
133    /// for admin paths (restore, bulk load); regular mutation goes through
134    /// `execute_with_params`.
135    pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
136        let mut guard = self.lock_store();
137        f(&mut *guard)
138    }
139}
140
141// ---------------------------------------------------------------------------
142// Snapshot helpers
143//
144// A second impl block so the `Snapshotable` bound only constrains backends
145// that actually need it. `Database<InMemoryGraph>` picks these up
146// automatically; hypothetical backends that don't implement `Snapshotable`
147// still get the core query API above.
148// ---------------------------------------------------------------------------
149
150impl<S> Database<S>
151where
152    S: GraphStorage + GraphStorageMut + Snapshotable,
153{
154    /// Serialize the current graph state to the given path. Writes are
155    /// atomic: the payload goes to `<path>.tmp`, is `fsync`'d, and then
156    /// renamed over the target; a torn write can never leave a half-written
157    /// file at `path`. If any step before the rename fails, the stale
158    /// `<path>.tmp` is removed so a crashed save never leaks scratch files.
159    ///
160    /// Holds the store mutex for the duration of the save so concurrent
161    /// queries see a consistent point-in-time snapshot.
162    pub fn save_snapshot_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
163        let path = path.as_ref();
164        let tmp = snapshot_tmp_path(path);
165
166        // Acquire the lock once so the snapshot is point-in-time consistent.
167        let guard = self.lock_store();
168
169        let file = OpenOptions::new()
170            .write(true)
171            .create(true)
172            .truncate(true)
173            .open(&tmp)?;
174        // Arm cleanup immediately after `open` succeeds: every early return
175        // below must either surface an error *and* unlink the tmp, or commit
176        // the guard once the rename takes effect.
177        let tmp_guard = TempFileGuard::new(tmp.clone());
178        let mut writer = BufWriter::new(file);
179
180        let meta = guard.save_snapshot(&mut writer)?;
181
182        // Flush the BufWriter before fsync; otherwise we fsync an empty
183        // underlying file.
184        use std::io::Write;
185        writer.flush()?;
186        let file = writer.into_inner().map_err(|e| e.into_error())?;
187        file.sync_all()?;
188        drop(file);
189
190        std::fs::rename(&tmp, path)?;
191        // The tmp path no longer has a file behind it — disarm the guard so
192        // it doesn't try to remove the just-renamed target by name race.
193        tmp_guard.commit();
194
195        // Best-effort parent-dir fsync so the rename itself is durable on
196        // power loss. Non-fatal if the parent can't be opened.
197        if let Some(parent) = path.parent() {
198            if let Ok(dir) = File::open(parent) {
199                let _ = dir.sync_all();
200            }
201        }
202
203        Ok(meta)
204    }
205
206    /// Replace the current graph state with a snapshot loaded from `path`.
207    /// Holds the store mutex for the duration of the load; concurrent
208    /// queries block until restore completes.
209    pub fn load_snapshot_from(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
210        let file = File::open(path.as_ref())?;
211        let reader = BufReader::new(file);
212
213        let mut guard = self.lock_store();
214        Ok(guard.load_snapshot(reader)?)
215    }
216}
217
218impl Database<InMemoryGraph> {
219    /// Convenience constructor: open (or create) an empty in-memory database
220    /// and immediately restore it from `path`. Errors if the file cannot be
221    /// opened or the snapshot is malformed.
222    pub fn in_memory_from_snapshot(path: impl AsRef<Path>) -> Result<Self> {
223        let db = Self::in_memory();
224        db.load_snapshot_from(path)?;
225        Ok(db)
226    }
227}
228
/// Scratch-file path for a snapshot save: `target` with `.tmp` appended to
/// its full name (any existing extension is kept, not replaced).
fn snapshot_tmp_path(target: &Path) -> PathBuf {
    let mut scratch = target.to_path_buf().into_os_string();
    scratch.push(".tmp");
    scratch.into()
}
234
/// Cleanup handle for a scratch file: removes its path when dropped, unless
/// [`commit`] was called first.
///
/// The snapshot save path creates `<target>.tmp` before writing the payload.
/// Should any step between that point and the final rename fail (or the
/// thread unwind), dropping the guard deletes the scratch file so nothing is
/// left behind on disk.
///
/// [`commit`]: Self::commit
struct TempFileGuard {
    /// Path to delete on drop; `None` once the guard has been disarmed.
    path: Option<PathBuf>,
}

impl TempFileGuard {
    /// Arm a guard for `path`.
    fn new(path: PathBuf) -> Self {
        Self { path: Some(path) }
    }

    /// Disarm the guard. Call once the tmp file's contents have been handed
    /// off (e.g. renamed to their final destination) so that dropping the
    /// guard removes nothing.
    fn commit(mut self) {
        self.path = None;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        if let Some(scratch) = self.path.as_deref() {
            // Best-effort: a failed cleanup is not worth surfacing — at
            // worst a scratch file is leaked, and the next save overwrites
            // it via `OpenOptions::truncate(true)`.
            let _ = std::fs::remove_file(scratch);
        }
    }
}
270
271/// Storage-agnostic admin surface for HTTP / binding callers that want to
272/// drive snapshot operations without naming the backend type parameter.
273///
274/// `Database<S>` picks up a blanket impl when `S: Snapshotable + 'static`.
275/// Transports (e.g. `lora-server`) type-erase on `Arc<dyn SnapshotAdmin>`.
276pub trait SnapshotAdmin: Send + Sync + 'static {
277    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
278    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
279}
280
281impl<S> SnapshotAdmin for Database<S>
282where
283    S: GraphStorage + GraphStorageMut + Snapshotable + Send + 'static,
284{
285    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
286        self.save_snapshot_to(path)
287    }
288
289    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
290        self.load_snapshot_from(path)
291    }
292}
293
294impl<S> QueryRunner for Database<S>
295where
296    S: GraphStorage + GraphStorageMut + Send + 'static,
297{
298    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
299        Database::execute(self, query, options)
300    }
301}