use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError};
use std::time::{Duration, Instant};
use anyhow::{anyhow, Result};
use lora_analyzer::Analyzer;
use lora_ast::Document;
use lora_compiler::{CompiledQuery, Compiler};
use lora_executor::{
classify_stream, compiled_result_columns, project_rows, ExecuteOptions, LoraValue,
MutableExecutionContext, MutableExecutor, QueryResult, Row, StreamShape,
};
use lora_parser::parse_query;
use lora_store::{
GraphStorage, GraphStorageMut, InMemoryGraph, MutationEvent, MutationRecorder, SnapshotMeta,
Snapshotable,
};
use lora_wal::{replay_dir, Lsn, Wal, WalConfig, WalMirror, WalRecorder, WroteCommit};
use crate::archive::WalArchive;
use crate::named::{DatabaseName, DatabaseOpenOptions};
use crate::stream::{AutoCommitGuard, LiveCursor, QueryStream};
use crate::transaction::{LiveStoreGuard, Transaction, TransactionMode};
/// Object-safe query entry point, letting callers hold any `Database<S>`
/// behind a trait object regardless of the concrete storage type `S`.
pub trait QueryRunner: Send + Sync + 'static {
    /// Parses and runs `query`; `options` (defaulted when `None`) control
    /// how raw rows are projected into the returned [`QueryResult`].
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult>;
}
/// Graph database frontend over a storage backend `S`.
///
/// The store sits behind an `RwLock` (shared readers / exclusive writers);
/// an optional WAL recorder provides durability for mutating queries.
pub struct Database<S> {
    // Shared, lock-protected live graph store.
    pub(crate) store: Arc<RwLock<S>>,
    // Present only when the database was opened with a WAL; mutating query
    // paths bracket execution with arm/commit/abort on this recorder.
    pub(crate) wal: Option<Arc<WalRecorder>>,
}
impl Database<InMemoryGraph> {
    /// Creates an empty, purely in-memory database with no WAL attached.
    pub fn in_memory() -> Self {
        Self::from_graph(InMemoryGraph::new())
    }

    /// Opens a database according to `wal_config`.
    ///
    /// With the WAL enabled, any records already present in the WAL
    /// directory are replayed into a fresh graph first; only then is the
    /// recorder attached, so replayed events are not re-logged.
    pub fn open_with_wal(wal_config: WalConfig) -> Result<Self> {
        match wal_config {
            WalConfig::Disabled => Ok(Self::in_memory()),
            WalConfig::Enabled {
                dir,
                sync_mode,
                segment_target_bytes,
            } => {
                let mut graph = InMemoryGraph::new();
                // No snapshot here, so replay starts from LSN zero.
                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, Lsn::ZERO)?;
                replay_into(&mut graph, events)?;
                let recorder = Arc::new(WalRecorder::new(wal));
                // From this point on every mutation is recorded to the WAL.
                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
                Ok(Self {
                    store: Arc::new(RwLock::new(graph)),
                    wal: Some(recorder),
                })
            }
        }
    }

    /// Opens (or creates) a named database whose WAL is mirrored into a
    /// size-capped archive directory derived from `options`.
    pub fn open_named(
        database_name: impl AsRef<str>,
        options: DatabaseOpenOptions,
    ) -> Result<Self> {
        let name = DatabaseName::parse(database_name.as_ref())?;
        let archive = Arc::new(WalArchive::open(
            options.database_path_for(&name),
            options.max_database_bytes,
        )?);
        let mut graph = InMemoryGraph::new();
        let (wal, events) = Wal::open(
            archive.work_dir(),
            options.sync_mode,
            options.segment_target_bytes,
            Lsn::ZERO,
        )?;
        replay_into(&mut graph, events)?;
        // The archive doubles as a WAL mirror, so segments are persisted
        // into the named database's directory as they are written.
        let mirror: Arc<dyn WalMirror> = archive;
        let recorder = Arc::new(WalRecorder::new_with_mirror(wal, Some(mirror)));
        graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
        // Flush once up front so the archive is materialized on disk even
        // before the first mutation arrives.
        recorder
            .flush()
            .map_err(|e| anyhow!("initial database archive persist failed: {e}"))?;
        Ok(Self {
            store: Arc::new(RwLock::new(graph)),
            wal: Some(recorder),
        })
    }

    /// Starts a transaction, taking the read or write lock on the store
    /// (held for the transaction's lifetime) depending on `mode`.
    pub fn begin_transaction(&self, mode: TransactionMode) -> Result<Transaction<'_>> {
        let live = match mode {
            TransactionMode::ReadOnly => LiveStoreGuard::Read(self.read_store()),
            TransactionMode::ReadWrite => LiveStoreGuard::Write(self.write_store()),
        };
        Ok(Transaction::new(live, self.wal.clone(), mode))
    }

    /// Forces an fsync of the WAL if one is attached; a no-op otherwise.
    pub fn sync(&self) -> Result<()> {
        if let Some(wal) = &self.wal {
            wal.force_fsync()?;
        }
        Ok(())
    }

    /// Rebuilds a database from an optional snapshot file plus the WAL.
    ///
    /// A missing snapshot file is not an error: replay then starts from
    /// LSN zero. When a snapshot is loaded, only WAL records above the
    /// snapshot's recorded LSN are replayed. If a checkpoint marker newer
    /// than the snapshot is found in the WAL, a warning is printed (the
    /// snapshot is stale) but recovery still proceeds.
    pub fn recover(snapshot_path: impl AsRef<Path>, wal_config: WalConfig) -> Result<Self> {
        let snapshot_path = snapshot_path.as_ref();
        let mut graph = InMemoryGraph::new();
        let snapshot_lsn = match File::open(snapshot_path) {
            Ok(f) => {
                let reader = BufReader::new(f);
                let meta = graph.load_snapshot(reader)?;
                // Snapshots without a recorded WAL LSN replay everything.
                meta.wal_lsn.map(Lsn::new).unwrap_or(Lsn::ZERO)
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Lsn::ZERO,
            Err(e) => return Err(e.into()),
        };
        match wal_config {
            WalConfig::Disabled => Ok(Self::from_graph(graph)),
            WalConfig::Enabled {
                dir,
                sync_mode,
                segment_target_bytes,
            } => {
                if dir.exists() {
                    // Best-effort staleness check: scan for the newest
                    // checkpoint marker and warn if the snapshot predates it.
                    if let Ok(outcome) = replay_dir(&dir, Lsn::ZERO) {
                        if let Some(marker) = outcome.checkpoint_lsn_observed {
                            if marker > snapshot_lsn {
                                eprintln!(
                                    "lora-wal: snapshot at LSN {} is older than the newest \
                                     checkpoint marker on disk (LSN {}). Replaying every WAL \
                                     record above LSN {}; consider passing the more recent \
                                     snapshot to --restore-from.",
                                    snapshot_lsn.raw(),
                                    marker.raw(),
                                    snapshot_lsn.raw()
                                );
                            }
                        }
                    }
                }
                let (wal, events) = Wal::open(dir, sync_mode, segment_target_bytes, snapshot_lsn)?;
                replay_into(&mut graph, events)?;
                let recorder = Arc::new(WalRecorder::new(wal));
                graph.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
                Ok(Self {
                    store: Arc::new(RwLock::new(graph)),
                    wal: Some(recorder),
                })
            }
        }
    }

    /// Opens a streaming cursor for `query` with no bound parameters.
    pub fn stream(&self, query: &str) -> Result<QueryStream<'_>> {
        self.stream_with_params(query, BTreeMap::new())
    }

    /// Opens a streaming cursor for `query` with bound parameters.
    ///
    /// Read-only queries stream through a [`LiveCursor`] that owns a clone
    /// of the shared store handle; mutating queries run inside an
    /// auto-commit read-write transaction finalized by the stream's guard.
    pub fn stream_with_params(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryStream<'_>> {
        let document = parse_query(query)?;
        let store_guard = self.read_store();
        let resolved = {
            let mut analyzer = Analyzer::new(&*store_guard);
            analyzer.analyze(&document)?
        };
        let compiled = Compiler::compile(&resolved);
        let columns = compiled_result_columns(&compiled);
        let shape = classify_stream(&compiled);
        // Release the read lock before opening cursors/transactions:
        // both take their own locks on the same store.
        drop(store_guard);
        match shape {
            StreamShape::ReadOnly => {
                let live = LiveCursor::open(self.store.clone(), compiled, params)?;
                Ok(QueryStream::live(live, columns))
            }
            StreamShape::Mutating => {
                let mut tx = self.begin_transaction(TransactionMode::ReadWrite)?;
                let compiled_arc = Arc::new(compiled);
                let cursor =
                    match tx.open_streaming_compiled_autocommit(compiled_arc.clone(), params) {
                        Ok(c) => c,
                        Err(err) => {
                            return Err(err);
                        }
                    };
                // The guard commits (or rolls back) the transaction when
                // the stream is finished or dropped.
                let guard = AutoCommitGuard {
                    tx: Some(tx),
                    finalized: false,
                };
                Ok(QueryStream::auto_commit(cursor, columns, guard))
            }
        }
    }

    /// Like [`Self::stream_with_params`], but erases the stream's borrow
    /// of `self` so the stream can be stored without a lifetime parameter.
    ///
    /// # Safety
    ///
    /// The returned stream is lifetime-transmuted to `'static` even though
    /// it may still borrow from this database. The caller must keep this
    /// `Arc<Self>` alive for at least as long as the returned stream is
    /// used; dropping the database while the stream is live is undefined
    /// behavior.
    pub unsafe fn stream_with_params_owned(
        self: &Arc<Self>,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryStream<'static>> {
        let stream = self.stream_with_params(query, params)?;
        // SAFETY (caller contract): the Arc the caller holds outlives the
        // stream, so the borrowed data remains valid.
        Ok(std::mem::transmute::<QueryStream<'_>, QueryStream<'static>>(stream))
    }
}
impl<S> Database<S>
where
    S: GraphStorage + GraphStorageMut,
{
    /// Wraps an already-shared, lock-protected store. No WAL is attached.
    pub fn new(store: Arc<RwLock<S>>) -> Self {
        Self { store, wal: None }
    }

    /// Takes ownership of `graph` and wraps it for shared access.
    pub fn from_graph(graph: S) -> Self {
        Self::new(Arc::new(RwLock::new(graph)))
    }

    /// The WAL recorder, if this database was opened with a WAL.
    pub fn wal(&self) -> Option<&Arc<WalRecorder>> {
        self.wal.as_ref()
    }

    /// Shared handle to the underlying lock-protected store.
    pub fn store(&self) -> &Arc<RwLock<S>> {
        &self.store
    }

    /// Parses `query` into an AST document without executing it.
    pub fn parse(&self, query: &str) -> Result<Document> {
        Ok(parse_query(query)?)
    }

    /// Acquires the read lock, recovering the guard if a writer panicked
    /// while holding the lock (lock poisoning is deliberately ignored).
    pub(crate) fn read_store(&self) -> RwLockReadGuard<'_, S> {
        self.store
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Acquires the write lock; poisoning is ignored as in `read_store`.
    pub(crate) fn write_store(&self) -> RwLockWriteGuard<'_, S> {
        self.store
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Read-lock acquisition that gives up once `deadline` passes, polling
    /// the lock every millisecond. `None` waits indefinitely.
    fn read_store_deadline(&self, deadline: Option<Instant>) -> Result<RwLockReadGuard<'_, S>> {
        let Some(deadline) = deadline else {
            return Ok(self.read_store());
        };
        loop {
            match self.store.try_read() {
                Ok(guard) => return Ok(guard),
                Err(TryLockError::Poisoned(poisoned)) => return Ok(poisoned.into_inner()),
                Err(TryLockError::WouldBlock) if Instant::now() >= deadline => {
                    return Err(anyhow!("query deadline exceeded"));
                }
                Err(TryLockError::WouldBlock) => {
                    std::thread::sleep(Duration::from_millis(1));
                }
            }
        }
    }

    /// Write-lock counterpart of `read_store_deadline`.
    fn write_store_deadline(&self, deadline: Option<Instant>) -> Result<RwLockWriteGuard<'_, S>> {
        let Some(deadline) = deadline else {
            return Ok(self.write_store());
        };
        loop {
            match self.store.try_write() {
                Ok(guard) => return Ok(guard),
                Err(TryLockError::Poisoned(poisoned)) => return Ok(poisoned.into_inner()),
                Err(TryLockError::WouldBlock) if Instant::now() >= deadline => {
                    return Err(anyhow!("query deadline exceeded"));
                }
                Err(TryLockError::WouldBlock) => {
                    std::thread::sleep(Duration::from_millis(1));
                }
            }
        }
    }

    /// Analyzes and compiles `document` against a specific store state.
    fn compile_document_against(&self, document: &Document, store: &S) -> Result<CompiledQuery> {
        let resolved = {
            let mut analyzer = Analyzer::new(store);
            analyzer.analyze(document)?
        };
        Ok(Compiler::compile(&resolved))
    }

    /// Executes `query` with no parameters and no deadline; `options`
    /// (defaulted when `None`) control the result projection.
    pub fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
        self.execute_with_params(query, options, BTreeMap::new())
    }

    /// Executes `query` under a wall-clock `timeout` covering both lock
    /// acquisition and evaluation.
    ///
    /// A `timeout` so large that `now + timeout` is unrepresentable is
    /// treated as "no deadline" rather than as an already-expired one.
    pub fn execute_with_timeout(
        &self,
        query: &str,
        options: Option<ExecuteOptions>,
        timeout: Duration,
    ) -> Result<QueryResult> {
        // Fix: `checked_add` yields `None` on overflow. The previous code
        // fell back to `Instant::now()`, which made effectively-infinite
        // timeouts expire immediately with "query deadline exceeded".
        let deadline = Instant::now().checked_add(timeout);
        let rows = self.execute_rows_with_params_deadline(query, BTreeMap::new(), deadline)?;
        Ok(project_rows(rows, options.unwrap_or_default()))
    }

    /// Executes `query` with bound parameters and no deadline.
    pub fn execute_with_params(
        &self,
        query: &str,
        options: Option<ExecuteOptions>,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<QueryResult> {
        let rows = self.execute_rows_with_params_deadline(query, params, None)?;
        Ok(project_rows(rows, options.unwrap_or_default()))
    }

    /// Parameterized execution with a timeout; an overflowing deadline is
    /// treated as "no deadline" (see [`Self::execute_with_timeout`]).
    pub fn execute_with_params_timeout(
        &self,
        query: &str,
        options: Option<ExecuteOptions>,
        params: BTreeMap<String, LoraValue>,
        timeout: Duration,
    ) -> Result<QueryResult> {
        // Same overflow fix as in `execute_with_timeout`.
        let deadline = Instant::now().checked_add(timeout);
        let rows = self.execute_rows_with_params_deadline(query, params, deadline)?;
        Ok(project_rows(rows, options.unwrap_or_default()))
    }

    /// Executes `query` and returns the raw, unprojected rows.
    pub fn execute_rows(&self, query: &str) -> Result<Vec<Row>> {
        self.execute_rows_with_params(query, BTreeMap::new())
    }

    /// Raw-row execution with bound parameters and no deadline.
    pub fn execute_rows_with_params(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
    ) -> Result<Vec<Row>> {
        self.execute_rows_with_params_deadline(query, params, None)
    }

    /// Core execution path.
    ///
    /// Read-only queries run entirely under the read lock. Mutating queries
    /// drop the read lock, re-acquire the write lock, and re-compile against
    /// the (possibly changed) store before executing. Mutations are
    /// bracketed by WAL arm/commit; on failure after the graph was touched,
    /// the WAL is poisoned so the operator must recover from snapshot + WAL.
    fn execute_rows_with_params_deadline(
        &self,
        query: &str,
        params: BTreeMap<String, LoraValue>,
        deadline: Option<Instant>,
    ) -> Result<Vec<Row>> {
        let document = self.parse(query)?;
        let shape = {
            let store = self.read_store_deadline(deadline)?;
            let compiled = self.compile_document_against(&document, &*store)?;
            if matches!(classify_stream(&compiled), StreamShape::ReadOnly) {
                // Refuse even reads once the WAL is poisoned; the message
                // deliberately matches the mutating path's arm-failure text.
                if let Some(rec) = &self.wal {
                    if let Some(reason) = rec.poisoned() {
                        return Err(anyhow!("WAL arm failed: WAL poisoned: {reason}"));
                    }
                }
                let executor = lora_executor::Executor::with_deadline(
                    lora_executor::ExecutionContext {
                        storage: &*store,
                        params,
                    },
                    deadline,
                );
                return executor
                    .execute_compiled_rows(&compiled)
                    .map_err(|e| anyhow!(e));
            }
            classify_stream(&compiled)
        };
        debug_assert!(shape.is_mutating());
        // Mutating path: upgrade to the write lock and re-compile, since a
        // concurrent writer may have changed the store between the locks.
        let mut store = self.write_store_deadline(deadline)?;
        let compiled = self.compile_document_against(&document, &*store)?;
        if let Some(rec) = &self.wal {
            rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
        }
        let exec_result: Result<Vec<Row>> = (|| {
            let mut executor = MutableExecutor::with_deadline(
                MutableExecutionContext {
                    storage: &mut *store,
                    params,
                },
                deadline,
            );
            Ok(executor.execute_compiled_rows(&compiled)?)
        })();
        if let Some(rec) = &self.wal {
            match &exec_result {
                Ok(_) => match rec.commit() {
                    Ok(WroteCommit::Yes) => {
                        rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
                    }
                    Ok(WroteCommit::No) => {
                        // Nothing was written to the WAL; no flush needed.
                    }
                    Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
                },
                Err(_) => {
                    // abort() == Ok(true) means some mutation already hit the
                    // live graph, so memory and WAL now disagree: poison.
                    if matches!(rec.abort(), Ok(true)) {
                        rec.poison(
                            "query mutated the live graph before failing; restart from snapshot + WAL required",
                        );
                    }
                }
            }
            if let Some(reason) = rec.poisoned() {
                return Err(anyhow!("WAL poisoned: {reason}"));
            }
        }
        exec_result
    }

    /// Clears the graph, recording the clear through the WAL when present.
    pub fn try_clear(&self) -> Result<()> {
        let mut guard = self.write_store();
        let Some(rec) = &self.wal else {
            guard.clear();
            return Ok(());
        };
        rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
        guard.clear();
        match rec.commit() {
            Ok(WroteCommit::Yes) => {
                rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
            }
            Ok(WroteCommit::No) => {}
            Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
        }
        if let Some(reason) = rec.poisoned() {
            return Err(anyhow!("WAL poisoned: {reason}"));
        }
        Ok(())
    }

    /// Infallible clear: WAL errors from `try_clear` are swallowed.
    pub fn clear(&self) {
        let _ = self.try_clear();
    }

    /// Number of nodes currently in the graph.
    pub fn node_count(&self) -> usize {
        let guard = self.read_store();
        guard.node_count()
    }

    /// Number of relationships currently in the graph.
    pub fn relationship_count(&self) -> usize {
        let guard = self.read_store();
        guard.relationship_count()
    }

    /// Runs `f` with shared (read-locked) access to the store.
    pub fn with_store<R>(&self, f: impl FnOnce(&S) -> R) -> R {
        let guard = self.read_store();
        f(&*guard)
    }

    /// Runs `f` with exclusive (write-locked) access to the store.
    ///
    /// NOTE(review): unlike `execute`, mutations made here are not bracketed
    /// by WAL arm/commit — confirm callers rely on the store's own mutation
    /// recorder for durability.
    pub fn with_store_mut<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
        let mut guard = self.write_store();
        f(&mut *guard)
    }
}
impl<S> Database<S>
where
    S: GraphStorage + GraphStorageMut + Snapshotable,
{
    /// Writes a point-in-time snapshot of the graph to `path` atomically:
    /// data goes to a sibling `<path>.tmp` file, is flushed and fsynced,
    /// then renamed over the target. On failure the temp file is removed.
    pub fn save_snapshot_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);
        // Hold the read lock for the entire write so the snapshot is a
        // consistent image of the graph.
        let guard = self.read_store();
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        // Removes the temp file if anything below fails before the rename.
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);
        let meta = guard.save_snapshot(&mut writer)?;
        use std::io::Write;
        writer.flush()?;
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        // Make the contents durable before exposing them under the final
        // name.
        file.sync_all()?;
        drop(file);
        std::fs::rename(&tmp, path)?;
        tmp_guard.commit();
        // Best-effort fsync of the parent directory so the rename itself
        // survives a crash; errors here are deliberately ignored.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }
        Ok(meta)
    }

    /// Replaces the live graph's contents from the snapshot at `path`,
    /// holding the write lock for the duration of the load.
    pub fn load_snapshot_from(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let file = File::open(path.as_ref())?;
        let reader = BufReader::new(file);
        let mut guard = self.write_store();
        Ok(guard.load_snapshot(reader)?)
    }
}
impl Database<InMemoryGraph> {
    /// Builds an in-memory database pre-populated from a snapshot file.
    pub fn in_memory_from_snapshot(path: impl AsRef<Path>) -> Result<Self> {
        let db = Self::in_memory();
        db.load_snapshot_from(path)?;
        Ok(db)
    }

    /// Writes a checkpoint snapshot to `path` and fences the WAL to it.
    ///
    /// Ordering: fsync the WAL, capture its durable LSN, write the snapshot
    /// to a temp file and atomically rename it into place, append a
    /// checkpoint marker at that LSN, fsync again, then attempt (best
    /// effort) to truncate WAL data up to the marker. The write lock is held
    /// throughout so no mutations can interleave with the captured LSN.
    ///
    /// Fails if the database has no WAL attached.
    pub fn checkpoint_to(&self, path: impl AsRef<Path>) -> Result<SnapshotMeta> {
        let recorder = self
            .wal
            .as_ref()
            .ok_or_else(|| anyhow!("checkpoint requires WAL enabled"))?;
        let path = path.as_ref();
        let tmp = snapshot_tmp_path(path);
        // Exclusive lock: the snapshot must match `snapshot_lsn` exactly.
        let guard = self.write_store();
        recorder
            .force_fsync()
            .map_err(|e| anyhow!("WAL fsync before checkpoint failed: {e}"))?;
        let snapshot_lsn = recorder.wal().durable_lsn();
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        // Removes the temp file if anything below fails before the rename.
        let tmp_guard = TempFileGuard::new(tmp.clone());
        let mut writer = BufWriter::new(file);
        let meta = guard.save_checkpoint(&mut writer, snapshot_lsn.raw())?;
        use std::io::Write;
        writer.flush()?;
        let file = writer.into_inner().map_err(|e| e.into_error())?;
        file.sync_all()?;
        drop(file);
        std::fs::rename(&tmp, path)?;
        tmp_guard.commit();
        // Best-effort parent-directory fsync so the rename is durable.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }
        recorder
            .checkpoint_marker(snapshot_lsn)
            .map_err(|e| anyhow!("WAL checkpoint marker failed: {e}"))?;
        recorder
            .force_fsync()
            .map_err(|e| anyhow!("WAL fsync after checkpoint marker failed: {e}"))?;
        // Truncation is best-effort: the checkpoint itself is already
        // durable, so a failure here only delays space reclamation.
        let _ = recorder.truncate_up_to(snapshot_lsn);
        Ok(meta)
    }
}
/// Derives the temporary sibling path used for atomic snapshot writes:
/// the target path with ".tmp" appended to its full file name (keeping any
/// existing extension, e.g. `db.snap` -> `db.snap.tmp`).
fn snapshot_tmp_path(target: &Path) -> PathBuf {
    let mut name = target.as_os_str().to_owned();
    name.push(".tmp");
    name.into()
}
/// Drop-based cleanup for a temporary snapshot file.
///
/// While armed, dropping the guard deletes the file at `path` (removal
/// errors are ignored). Calling [`TempFileGuard::commit`] disarms the
/// guard so a file that was successfully renamed into place is left alone.
struct TempFileGuard {
    // `Some(path)` while armed; `None` once committed.
    path: Option<PathBuf>,
}

impl TempFileGuard {
    /// Arms a guard over `path`.
    fn new(path: PathBuf) -> Self {
        TempFileGuard { path: Some(path) }
    }

    /// Disarms the guard, consuming it and leaving the file in place.
    fn commit(mut self) {
        self.path = None;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        let Some(stale) = self.path.take() else {
            return;
        };
        // Best-effort removal; failure during cleanup is irrelevant.
        let _ = std::fs::remove_file(stale);
    }
}
/// Object-safe snapshot administration: saving and loading whole-graph
/// snapshots by filesystem path.
pub trait SnapshotAdmin: Send + Sync + 'static {
    /// Writes a snapshot of the current graph to `path`.
    fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
    /// Replaces the graph's contents from the snapshot at `path`.
    fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta>;
}
impl<S> SnapshotAdmin for Database<S>
where
S: GraphStorage + GraphStorageMut + Snapshotable + Send + Sync + 'static,
{
fn save_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
self.save_snapshot_to(path)
}
fn load_snapshot(&self, path: &Path) -> Result<SnapshotMeta> {
self.load_snapshot_from(path)
}
}
/// Object-safe WAL administration surface: checkpointing, status
/// inspection, and manual truncation.
pub trait WalAdmin: Send + Sync + 'static {
    /// Writes a checkpoint snapshot to `path` and fences the WAL to it.
    fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta>;
    /// Reports current WAL positions, segment ids, and any background
    /// failure.
    fn wal_status(&self) -> Result<WalStatus>;
    /// Truncates WAL data up to `fence_lsn` (forwarded to
    /// `WalRecorder::truncate_up_to`).
    fn wal_truncate(&self, fence_lsn: u64) -> Result<()>;
}
/// Point-in-time WAL report returned by [`WalAdmin::wal_status`].
///
/// Each field is a direct copy of the corresponding `Wal` accessor at the
/// time of the call.
#[derive(Debug, Clone)]
pub struct WalStatus {
    /// Value of `Wal::durable_lsn()` (raw).
    pub durable_lsn: u64,
    /// Value of `Wal::next_lsn()` (raw).
    pub next_lsn: u64,
    /// Value of `Wal::active_segment_id()`.
    pub active_segment_id: u64,
    /// Value of `Wal::oldest_segment_id()`.
    pub oldest_segment_id: u64,
    /// Background failure description from `Wal::bg_failure()`, if any.
    pub bg_failure: Option<String>,
}
impl WalAdmin for Database<InMemoryGraph> {
fn checkpoint(&self, path: &Path) -> Result<SnapshotMeta> {
self.checkpoint_to(path)
}
fn wal_status(&self) -> Result<WalStatus> {
let recorder = self
.wal
.as_ref()
.ok_or_else(|| anyhow!("WAL not enabled"))?;
let wal = recorder.wal();
Ok(WalStatus {
durable_lsn: wal.durable_lsn().raw(),
next_lsn: wal.next_lsn().raw(),
active_segment_id: wal.active_segment_id(),
oldest_segment_id: wal.oldest_segment_id(),
bg_failure: wal.bg_failure(),
})
}
fn wal_truncate(&self, fence_lsn: u64) -> Result<()> {
let recorder = self
.wal
.as_ref()
.ok_or_else(|| anyhow!("WAL not enabled"))?;
recorder.truncate_up_to(Lsn::new(fence_lsn))?;
Ok(())
}
}
/// Object-safe execution entry point; forwards to the inherent
/// [`Database::execute`] so trait-object callers behave identically to
/// direct callers.
impl<S> QueryRunner for Database<S>
where
    S: GraphStorage + GraphStorageMut + Send + Sync + 'static,
{
    fn execute(&self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
        // UFCS keeps this unambiguous between the trait method and the
        // inherent method of the same name (no accidental recursion).
        <Database<S>>::execute(self, query, options)
    }
}
/// Replays a sequence of recorded mutation events into `graph`, in order.
///
/// This is the recovery path: events come from WAL segments and are applied
/// verbatim. Any event that targets a missing node or relationship aborts
/// replay with an error naming the zero-based event index, since that means
/// the log and the reconstructed graph state disagree (corruption or a
/// mismatched snapshot/WAL pair).
fn replay_into(graph: &mut InMemoryGraph, events: Vec<MutationEvent>) -> Result<()> {
    for (idx, event) in events.into_iter().enumerate() {
        match event {
            // Structural creations go through dedicated replay entry points
            // so the originally assigned ids are preserved.
            MutationEvent::CreateNode {
                id,
                labels,
                properties,
            } => {
                graph
                    .replay_create_node(id, labels, properties)
                    .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
            }
            MutationEvent::CreateRelationship {
                id,
                src,
                dst,
                rel_type,
                properties,
            } => {
                graph
                    .replay_create_relationship(id, src, dst, &rel_type, properties)
                    .map_err(|e| anyhow!("WAL replay failed at event {idx}: {e}"))?;
            }
            // The remaining mutators return `false` when their target does
            // not exist; replay treats that as a hard error.
            MutationEvent::SetNodeProperty {
                node_id,
                key,
                value,
            } => {
                if !graph.set_node_property(node_id, key, value) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing node {node_id} for property set"
                    ));
                }
            }
            MutationEvent::RemoveNodeProperty { node_id, key } => {
                if !graph.remove_node_property(node_id, &key) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing node {node_id} for property removal"
                    ));
                }
            }
            MutationEvent::AddNodeLabel { node_id, label } => {
                if !graph.add_node_label(node_id, &label) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing node {node_id} for label add"
                    ));
                }
            }
            MutationEvent::RemoveNodeLabel { node_id, label } => {
                if !graph.remove_node_label(node_id, &label) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing node {node_id} for label removal"
                    ));
                }
            }
            MutationEvent::SetRelationshipProperty { rel_id, key, value } => {
                if !graph.set_relationship_property(rel_id, key, value) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing relationship {rel_id} for property set"
                    ));
                }
            }
            MutationEvent::RemoveRelationshipProperty { rel_id, key } => {
                if !graph.remove_relationship_property(rel_id, &key) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing relationship {rel_id} for property removal"
                    ));
                }
            }
            MutationEvent::DeleteRelationship { rel_id } => {
                if !graph.delete_relationship(rel_id) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing relationship {rel_id} for delete"
                    ));
                }
            }
            MutationEvent::DeleteNode { node_id } => {
                if !graph.delete_node(node_id) {
                    return Err(anyhow!(
                        "WAL replay failed at event {idx}: missing or attached node {node_id} for delete"
                    ));
                }
            }
            // Detach-delete is applied unconditionally; its result is not
            // checked here, mirroring the original recording semantics.
            MutationEvent::DetachDeleteNode { node_id } => {
                graph.detach_delete_node(node_id);
            }
            MutationEvent::Clear => {
                graph.clear();
            }
        }
    }
    Ok(())
}