use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, MutexGuard};
use anyhow::Result;
use lora_analyzer::Analyzer;
use lora_ast::Document;
use lora_compiler::{CompiledQuery, Compiler};
use lora_executor::{
ExecuteOptions, LoraValue, MutableExecutionContext, MutableExecutor, QueryResult,
};
use lora_parser::parse_query;
use lora_store::{GraphStorage, GraphStorageMut, InMemoryGraph, SnapshotMeta, Snapshotable};
pub trait QueryRunner: Send + Sync + 'static {
fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult>;
}
pub struct Database<S> {
store: Arc<Mutex<S>>,
}
impl Database<InMemoryGraph> {
pub fn in_memory() -> Self {
Self::from_graph(InMemoryGraph::new())
}
}
impl<S> Database<S>
where
S: GraphStorage + GraphStorageMut,
{
pub fn new(store: Arc<Mutex<S>>) -> Self {
Self { store }
}
pub fn from_graph(graph: S) -> Self {
Self::new(Arc::new(Mutex::new(graph)))
}
pub fn store(&self) -> &Arc<Mutex<S>> {
&self.store
}
pub fn parse(&self, query: &str) -> Result<Document> {
Ok(parse_query(query)?)
}
fn lock_store(&self) -> MutexGuard<'_, S> {
self.store
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
}
fn compile_query(&self, query: &str) -> Result<(MutexGuard<'_, S>, CompiledQuery)> {
let document = self.parse(query)?;
let store = self.lock_store();
let resolved = {
let mut analyzer = Analyzer::new(&*store);
analyzer.analyze(&document)?
};
let compiled = Compiler::compile(&resolved);
Ok((store, compiled))
}
pub fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
self.execute_with_params(query, options, BTreeMap::new())
}
pub fn execute_with_params(
&self,
query: &str,
options: Option<ExecuteOptions>,
params: BTreeMap<String, LoraValue>,
) -> Result<QueryResult> {
let (mut store, compiled) = self.compile_query(query)?;
let mut executor = MutableExecutor::new(MutableExecutionContext {
storage: &mut *store,
params,
});
Ok(executor.execute_compiled(&compiled, options)?)
}
pub fn clear(&self) {
let mut guard = self.lock_store();
guard.clear();
}
pub fn node_count(&self) -> usize {
let guard = self.lock_store();
guard.node_count()
}
pub fn relationship_count(&self) -> usize {
let guard = self.lock_store();
guard.relationship_count()
}
pub fn with_store<R>(&self, f: impl FnOnce(&S) -> R) -> R {
let guard = self.lock_store();
f(&*guard)
}
pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
let mut guard = self.lock_store();
f(&mut *guard)
}
}
impl<S> Database<S>
where
S: GraphStorage + GraphStorageMut + Snapshotable,
{
pub fn save_snapshot_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
let path = path.as_ref();
let tmp = snapshot_tmp_path(path);
let guard = self.lock_store();
let file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&tmp)?;
let tmp_guard = TempFileGuard::new(tmp.clone());
let mut writer = BufWriter::new(file);
let meta = guard.save_snapshot(&mut writer)?;
use std::io::Write;
writer.flush()?;
let file = writer.into_inner().map_err(|e| e.into_error())?;
file.sync_all()?;
drop(file);
std::fs::rename(&tmp, path)?;
tmp_guard.commit();
if let Some(parent) = path.parent() {
if let Ok(dir) = File::open(parent) {
let _ = dir.sync_all();
}
}
Ok(meta)
}
pub fn load_snapshot_from(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
let file = File::open(path.as_ref())?;
let reader = BufReader::new(file);
let mut guard = self.lock_store();
Ok(guard.load_snapshot(reader)?)
}
}
impl Database<InMemoryGraph> {
pub fn in_memory_from_snapshot(path: impl AsRef<Path>) -> Result<Self> {
let db = Self::in_memory();
db.load_snapshot_from(path)?;
Ok(db)
}
}
fn snapshot_tmp_path(target: &Path) -> PathBuf {
let mut tmp = target.as_os_str().to_owned();
tmp.push(".tmp");
PathBuf::from(tmp)
}
struct TempFileGuard {
path: Option<PathBuf>,
}
impl TempFileGuard {
fn new(path: PathBuf) -> Self {
Self { path: Some(path) }
}
fn commit(mut self) {
self.path.take();
}
}
impl Drop for TempFileGuard {
fn drop(&mut self) {
if let Some(path) = self.path.take() {
let _ = std::fs::remove_file(path);
}
}
}
pub trait SnapshotAdmin: Send + Sync + 'static {
fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
}
impl<S> SnapshotAdmin for Database<S>
where
S: GraphStorage + GraphStorageMut + Snapshotable + Send + 'static,
{
fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
self.save_snapshot_to(path)
}
fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
self.load_snapshot_from(path)
}
}
impl<S> QueryRunner for Database<S>
where
S: GraphStorage + GraphStorageMut + Send + 'static,
{
fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
Database::execute(self, query, options)
}
}