lora_database/
database.rs1use std::collections::BTreeMap;
2use std::fs::{File, OpenOptions};
3use std::io::{BufReader, BufWriter};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use anyhow::Result;
8use lora_analyzer::Analyzer;
9use lora_ast::Document;
10use lora_compiler::{CompiledQuery, Compiler};
11use lora_executor::{
12 ExecuteOptions, LoraValue, MutableExecutionContext, MutableExecutor, QueryResult,
13};
14use lora_parser::parse_query;
15use lora_store::{GraphStorage, GraphStorageMut, InMemoryGraph, SnapshotMeta, Snapshotable};
16
17pub trait QueryRunner: Send + Sync + 'static {
19 fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult>;
20}
21
/// Database handle over a storage backend `S` kept behind a shared mutex.
///
/// The store lives in an `Arc<Mutex<S>>`, so other holders of the same `Arc`
/// observe the same data.
pub struct Database<S> {
    /// The storage backend; locked by the methods that read or modify it.
    store: Arc<Mutex<S>>,
}
26
27impl Database<InMemoryGraph> {
28 pub fn in_memory() -> Self {
30 Self::from_graph(InMemoryGraph::new())
31 }
32}
33
34impl<S> Database<S>
35where
36 S: GraphStorage + GraphStorageMut,
37{
38 pub fn new(store: Arc<Mutex<S>>) -> Self {
40 Self { store }
41 }
42
43 pub fn from_graph(graph: S) -> Self {
45 Self::new(Arc::new(Mutex::new(graph)))
46 }
47
48 pub fn store(&self) -> &Arc<Mutex<S>> {
51 &self.store
52 }
53
54 pub fn parse(&self, query: &str) -> Result<Document> {
56 Ok(parse_query(query)?)
57 }
58
59 fn lock_store(&self) -> MutexGuard<'_, S> {
60 self.store
61 .lock()
62 .unwrap_or_else(|poisoned| poisoned.into_inner())
63 }
64
65 fn compile_query(&self, query: &str) -> Result<(MutexGuard<'_, S>, CompiledQuery)> {
66 let document = self.parse(query)?;
67 let store = self.lock_store();
68
69 let resolved = {
70 let mut analyzer = Analyzer::new(&*store);
71 analyzer.analyze(&document)?
72 };
73
74 let compiled = Compiler::compile(&resolved);
75 Ok((store, compiled))
76 }
77
78 pub fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
80 self.execute_with_params(query, options, BTreeMap::new())
81 }
82
83 pub fn execute_with_params(
85 &self,
86 query: &str,
87 options: Option<ExecuteOptions>,
88 params: BTreeMap<String, LoraValue>,
89 ) -> Result<QueryResult> {
90 let (mut store, compiled) = self.compile_query(query)?;
91
92 let mut executor = MutableExecutor::new(MutableExecutionContext {
93 storage: &mut *store,
94 params,
95 });
96
97 Ok(executor.execute_compiled(&compiled, options)?)
98 }
99
100 pub fn clear(&self) {
109 let mut guard = self.lock_store();
110 guard.clear();
111 }
112
113 pub fn node_count(&self) -> usize {
115 let guard = self.lock_store();
116 guard.node_count()
117 }
118
119 pub fn relationship_count(&self) -> usize {
121 let guard = self.lock_store();
122 guard.relationship_count()
123 }
124
125 pub fn with_store<R>(&self, f: impl FnOnce(&S) -> R) -> R {
128 let guard = self.lock_store();
129 f(&*guard)
130 }
131
132 pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
136 let mut guard = self.lock_store();
137 f(&mut *guard)
138 }
139}
140
141impl<S> Database<S>
151where
152 S: GraphStorage + GraphStorageMut + Snapshotable,
153{
154 pub fn save_snapshot_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
163 let path = path.as_ref();
164 let tmp = snapshot_tmp_path(path);
165
166 let guard = self.lock_store();
168
169 let file = OpenOptions::new()
170 .write(true)
171 .create(true)
172 .truncate(true)
173 .open(&tmp)?;
174 let tmp_guard = TempFileGuard::new(tmp.clone());
178 let mut writer = BufWriter::new(file);
179
180 let meta = guard.save_snapshot(&mut writer)?;
181
182 use std::io::Write;
185 writer.flush()?;
186 let file = writer.into_inner().map_err(|e| e.into_error())?;
187 file.sync_all()?;
188 drop(file);
189
190 std::fs::rename(&tmp, path)?;
191 tmp_guard.commit();
194
195 if let Some(parent) = path.parent() {
198 if let Ok(dir) = File::open(parent) {
199 let _ = dir.sync_all();
200 }
201 }
202
203 Ok(meta)
204 }
205
206 pub fn load_snapshot_from(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
210 let file = File::open(path.as_ref())?;
211 let reader = BufReader::new(file);
212
213 let mut guard = self.lock_store();
214 Ok(guard.load_snapshot(reader)?)
215 }
216}
217
218impl Database<InMemoryGraph> {
219 pub fn in_memory_from_snapshot(path: impl AsRef<Path>) -> Result<Self> {
223 let db = Self::in_memory();
224 db.load_snapshot_from(path)?;
225 Ok(db)
226 }
227}
228
/// Returns `target` with `.tmp` appended to its full text.
///
/// The suffix is added to the whole path string rather than replacing the
/// extension, so `a/b.snap` becomes `a/b.snap.tmp`.
fn snapshot_tmp_path(target: &Path) -> PathBuf {
    let mut raw = target.to_path_buf().into_os_string();
    raw.push(".tmp");
    raw.into()
}
234
/// Deletes a file when dropped unless `commit` was called first.
struct TempFileGuard {
    /// Path to delete on drop; `None` once the guard has been committed.
    path: Option<PathBuf>,
}

impl TempFileGuard {
    /// Arms a guard for `path`.
    fn new(path: PathBuf) -> Self {
        TempFileGuard { path: Some(path) }
    }

    /// Disarms the guard so that dropping it leaves the file alone.
    fn commit(mut self) {
        self.path = None;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        if let Some(leftover) = self.path.as_deref() {
            // Removal errors (for example, the file is already gone) are ignored.
            let _ = std::fs::remove_file(leftover);
        }
    }
}
270
271pub trait SnapshotAdmin: Send + Sync + 'static {
277 fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
278 fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
279}
280
281impl<S> SnapshotAdmin for Database<S>
282where
283 S: GraphStorage + GraphStorageMut + Snapshotable + Send + 'static,
284{
285 fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
286 self.save_snapshot_to(path)
287 }
288
289 fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
290 self.load_snapshot_from(path)
291 }
292}
293
294impl<S> QueryRunner for Database<S>
295where
296 S: GraphStorage + GraphStorageMut + Send + 'static,
297{
298 fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
299 Database::execute(self, query, options)
300 }
301}