use std::collections::BTreeMap;
use std::sync::{Arc, Mutex, MutexGuard, RwLockReadGuard, RwLockWriteGuard};
use std::time::{Duration, Instant};
use anyhow::{anyhow, Result};
use lora_analyzer::Analyzer;
use lora_compiler::{CompiledQuery, Compiler};
use lora_executor::{
classify_stream, compiled_result_columns, project_rows, ExecuteOptions, ExecutionContext,
Executor, LoraValue, MutableExecutionContext, MutableExecutor, MutablePullExecutor,
PullExecutor, QueryResult, Row, RowSource,
};
use lora_parser::parse_query;
use lora_store::{InMemoryGraph, MutationEvent, MutationRecorder};
use lora_wal::{WalRecorder, WroteCommit};
use crate::stream::QueryStream;
/// Access mode a [`Transaction`] is opened with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionMode {
/// Read-only access; `stream_compiled` rejects mutating queries in this mode.
ReadOnly,
/// Read-write access; on commit the staged graph replaces the live graph.
ReadWrite,
}
/// Lock guard over the live graph held for the duration of a transaction.
///
/// Wraps either flavor of `RwLock` guard so the transaction can store one field
/// regardless of which lock was acquired.
pub(crate) enum LiveStoreGuard<'db> {
/// Shared read guard; gives immutable access only.
Read(RwLockReadGuard<'db, InMemoryGraph>),
/// Exclusive write guard; also gives mutable access.
Write(RwLockWriteGuard<'db, InMemoryGraph>),
}
impl LiveStoreGuard<'_> {
    /// Borrows the guarded graph immutably, whichever lock flavor is held.
    fn as_graph(&self) -> &InMemoryGraph {
        match self {
            Self::Read(read_guard) => &**read_guard,
            Self::Write(write_guard) => &**write_guard,
        }
    }

    /// Borrows the guarded graph mutably.
    ///
    /// Returns `None` when only a read guard is held.
    fn as_graph_mut(&mut self) -> Option<&mut InMemoryGraph> {
        if let Self::Write(write_guard) = self {
            Some(&mut **write_guard)
        } else {
            None
        }
    }
}
/// Snapshot of transaction state that `apply_savepoint` can roll back to.
pub(crate) struct Savepoint {
/// Copy of the staged graph at capture time. When `None`, applying the
/// savepoint clears the transaction's staged graph.
staged: Option<InMemoryGraph>,
/// Length of the mutation buffer at capture time; applying the savepoint
/// truncates the buffer back to this length.
buffer_len: usize,
}
/// `MutationRecorder` that appends every recorded event to a shared in-memory buffer.
struct BufferingRecorder {
/// Shared event buffer; the same `Arc` is also stored in `TxInner::buffer`.
buffer: Arc<Mutex<Vec<MutationEvent>>>,
}
impl BufferingRecorder {
fn new(buffer: Arc<Mutex<Vec<MutationEvent>>>) -> Self {
Self { buffer }
}
}
impl MutationRecorder for BufferingRecorder {
    /// Appends a clone of `event` to the shared buffer.
    ///
    /// If the buffer mutex is poisoned the event is silently skipped.
    fn record(&self, event: &MutationEvent) {
        let Ok(mut pending) = self.buffer.lock() else {
            return;
        };
        pending.push(event.clone());
    }
}
/// Mutable transaction state shared (behind `Arc<Mutex<_>>`) between a
/// `Transaction` and any streaming cursor it has opened.
pub(crate) struct TxInner {
/// Working copy of the graph. Starts as `None` and is created by cloning the
/// live graph the first time a staged copy is required.
pub(crate) staged: Option<InMemoryGraph>,
/// Mutation events captured from the staged graph; replayed into the WAL on commit.
pub(crate) buffer: Arc<Mutex<Vec<MutationEvent>>>,
/// Savepoint taken when a mutating stream is opened; applied if that stream
/// is finalized without being exhausted.
pub(crate) pending_savepoint: Option<Savepoint>,
/// `true` while a streaming cursor is open. New statements and `commit` are
/// refused while it is set.
pub(crate) cursor_active: bool,
/// When set, the next statement or commit first applies `pending_savepoint`.
/// NOTE(review): nothing in this file sets it to `true`; presumably the stream
/// module does — confirm.
pub(crate) cursor_dropped_dirty: bool,
/// `true` once the transaction has been committed, rolled back, or discarded.
pub(crate) closed: bool,
/// Access mode the transaction was opened with.
pub(crate) mode: TransactionMode,
/// Whether a buffering recorder is installed on the staged graph; initialised
/// to `true` exactly when the transaction has a WAL recorder.
pub(crate) buffer_mutations: bool,
}
/// A transaction over the live graph.
///
/// Mutating statements run against a staged copy of the graph, which replaces
/// the live graph on a successful read-write `commit`.
pub struct Transaction<'db> {
/// Guard on the live graph; set to `None` once the transaction finishes so
/// the lock is released.
pub(crate) live: Option<LiveStoreGuard<'db>>,
/// Shared mutable state; a clone of this handle is given to streaming cursors.
pub(crate) inner: Arc<Mutex<TxInner>>,
/// Optional write-ahead-log recorder that buffered events are replayed into on commit.
pub(crate) wal: Option<Arc<WalRecorder>>,
/// Copy of the access mode, readable without locking `inner`.
mode: TransactionMode,
}
impl<'db> Transaction<'db> {
/// Builds an open transaction over the given live-graph guard.
///
/// No staged graph is created yet. Mutation buffering is enabled only when a
/// WAL recorder is supplied.
pub(crate) fn new(
    live: LiveStoreGuard<'db>,
    wal: Option<Arc<WalRecorder>>,
    mode: TransactionMode,
) -> Self {
    let shared_state = Arc::new(Mutex::new(TxInner {
        staged: None,
        buffer: Arc::new(Mutex::new(Vec::new())),
        pending_savepoint: None,
        cursor_active: false,
        cursor_dropped_dirty: false,
        closed: false,
        mode,
        // Events only need to be buffered when there is a WAL to replay them into.
        buffer_mutations: wal.is_some(),
    }));
    Self {
        live: Some(live),
        inner: shared_state,
        wal,
        mode,
    }
}
/// Returns the access mode this transaction was opened with.
pub fn mode(&self) -> TransactionMode {
    self.mode
}
/// Runs `query` with no parameters and no deadline, projecting the rows with
/// `options` (or the default options when `None`).
pub fn execute(&mut self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
    let no_params = BTreeMap::new();
    self.execute_with_params(query, options, no_params)
}
/// Runs `query` with no parameters, giving up once `timeout` has elapsed.
///
/// The deadline is `Instant::now() + timeout`. If that instant cannot be
/// represented (for example `Duration::MAX`), the query runs without a
/// deadline. Previously the overflow fell back to "now", which made the
/// largest timeouts expire immediately.
///
/// # Errors
/// Propagates compilation and execution errors.
pub fn execute_with_timeout(
    &mut self,
    query: &str,
    options: Option<ExecuteOptions>,
    timeout: Duration,
) -> Result<QueryResult> {
    // `None` on overflow means "no deadline", matching the caller's intent of
    // an effectively unbounded timeout.
    let deadline = Instant::now().checked_add(timeout);
    let rows = self.execute_rows_with_params_deadline(query, BTreeMap::new(), deadline)?;
    Ok(project_rows(rows, options.unwrap_or_default()))
}
pub fn execute_with_params(
&mut self,
query: &str,
options: Option<ExecuteOptions>,
params: BTreeMap<String, LoraValue>,
) -> Result<QueryResult> {
let rows = self.execute_rows_with_params_deadline(query, params, None)?;
Ok(project_rows(rows, options.unwrap_or_default()))
}
/// Runs `query` with the given `params`, giving up once `timeout` has elapsed.
///
/// The deadline is `Instant::now() + timeout`. If that instant cannot be
/// represented (for example `Duration::MAX`), the query runs without a
/// deadline. Previously the overflow fell back to "now", which made the
/// largest timeouts expire immediately.
///
/// # Errors
/// Propagates compilation and execution errors.
pub fn execute_with_params_timeout(
    &mut self,
    query: &str,
    options: Option<ExecuteOptions>,
    params: BTreeMap<String, LoraValue>,
    timeout: Duration,
) -> Result<QueryResult> {
    // `None` on overflow means "no deadline", matching the caller's intent of
    // an effectively unbounded timeout.
    let deadline = Instant::now().checked_add(timeout);
    let rows = self.execute_rows_with_params_deadline(query, params, deadline)?;
    Ok(project_rows(rows, options.unwrap_or_default()))
}
/// Runs `query` with no parameters and returns the raw, unprojected rows.
pub fn execute_rows(&mut self, query: &str) -> Result<Vec<Row>> {
    let no_params = BTreeMap::new();
    self.execute_rows_with_params(query, no_params)
}
pub fn execute_rows_with_params(
&mut self,
query: &str,
params: BTreeMap<String, LoraValue>,
) -> Result<Vec<Row>> {
self.execute_rows_with_params_deadline(query, params, None)
}
/// Compiles `query` against the transaction's current view of the graph and
/// then executes the compiled plan with the optional `deadline`.
fn execute_rows_with_params_deadline(
    &mut self,
    query: &str,
    params: BTreeMap<String, LoraValue>,
    deadline: Option<Instant>,
) -> Result<Vec<Row>> {
    match self.compile_in_tx(query) {
        Ok(plan) => self.execute_rows_compiled_deadline(&plan, params, deadline),
        Err(compile_err) => Err(compile_err),
    }
}
/// Executes an already-compiled query inside this transaction and returns every row.
///
/// Three paths are taken depending on the transaction mode and the query:
/// * read-only transaction: runs on the live graph with the immutable `Executor`;
/// * read-write transaction, non-mutating query: runs on the staged graph when
///   one exists, otherwise on the live graph;
/// * read-write transaction, mutating query: runs on the staged graph with a
///   `MutableExecutor`, restoring a savepoint if execution fails.
///
/// `deadline` is forwarded unchanged to the executor.
///
/// # Errors
/// Fails when the transaction is closed, a streaming cursor is still active,
/// the live guard is missing, or the executor reports an error.
fn execute_rows_compiled_deadline(
&mut self,
compiled: &CompiledQuery,
params: BTreeMap<String, LoraValue>,
deadline: Option<Instant>,
) -> Result<Vec<Row>> {
// Read-only path: only verifies the transaction is open with no active
// cursor; it never creates a staged graph.
if self.is_read_only_unchecked() {
self.precheck_open_no_savepoint()?;
let live = self
.live
.as_ref()
.ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
let storage = live.as_graph();
let executor = Executor::with_deadline(ExecutionContext { storage, params }, deadline);
return executor
.execute_compiled_rows(compiled)
.map_err(|e| anyhow!(e));
}
// Read-write path: `begin_statement` also applies any pending savepoint
// left behind by a dropped cursor.
let mut inner = self.begin_statement()?;
let is_mutating = classify_stream(compiled).is_mutating();
if !is_mutating {
return match inner.staged.as_ref() {
// Earlier statements already staged changes, so reads must see them.
// `inner` stays locked because `staged` borrows from it.
Some(staged) => {
let executor = Executor::with_deadline(
ExecutionContext {
storage: staged,
params,
},
deadline,
);
executor
.execute_compiled_rows(compiled)
.map_err(|e| anyhow!(e))
}
// Nothing staged yet: read straight from the live graph without
// holding the state lock.
None => {
drop(inner);
let live = self
.live
.as_ref()
.ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
let storage = live.as_graph();
let executor =
Executor::with_deadline(ExecutionContext { storage, params }, deadline);
executor
.execute_compiled_rows(compiled)
.map_err(|e| anyhow!(e))
}
};
}
// Only clone the staged graph into the savepoint when one already existed.
// Otherwise the savepoint carries `None`, and restoring it simply clears the
// staged graph created just below.
let clone_savepoint_graph = inner.staged.is_some();
self.ensure_staged_locked(&mut inner)?;
let savepoint = Some(take_savepoint(&inner, clone_savepoint_graph));
let exec_result: ExecResultRows = {
let staged = inner.staged_mut()?;
let mut executor = MutableExecutor::with_deadline(
MutableExecutionContext {
storage: staged,
params,
},
deadline,
);
executor
.execute_compiled_rows(compiled)
.map_err(|e| anyhow!(e))
};
match exec_result {
Ok(rows) => Ok(rows),
// A failed mutating statement is undone so the transaction stays usable.
Err(err) => {
restore_savepoint(&mut inner, savepoint);
Err(err)
}
}
}
/// Opens a mutating pull cursor over the staged graph of a read-write transaction.
///
/// Ensures a staged graph exists, marks a cursor as active, and returns a
/// `'static` row source that keeps the compiled query alive through its `Arc`.
/// No savepoint is taken on this path.
///
/// # Errors
/// Fails for read-only transactions, when a statement cannot be started, or
/// when the cursor cannot be opened. In the last case all transaction state is
/// discarded (closing the transaction) and the live guard is released.
pub(crate) fn open_streaming_compiled_autocommit(
&mut self,
compiled: Arc<CompiledQuery>,
params: BTreeMap<String, LoraValue>,
) -> Result<Box<dyn RowSource + 'static>> {
if self.is_read_only_unchecked() {
return Err(anyhow!(
"streaming write cursor requires a ReadWrite transaction"
));
}
let mut inner = self.begin_statement()?;
self.ensure_staged_locked(&mut inner)?;
// Set before unlocking so `begin_statement` and `commit` refuse to run
// while the cursor holds a reference into the staged graph.
inner.cursor_active = true;
let staged_ptr: *mut InMemoryGraph = inner
.staged
.as_mut()
.expect("ensure_staged_locked guarantees Some")
as *mut _;
drop(inner);
// SAFETY (NOTE(review)): the pointer targets the graph stored in
// `TxInner::staged`, which lives inside the `Arc<Mutex<TxInner>>` allocation.
// It is valid only while that slot is neither replaced nor cleared and the
// `Arc` is alive. The returned cursor does not hold a clone of `self.inner`
// itself — confirm the caller keeps the shared state alive and untouched for
// the cursor's whole lifetime.
let storage_static: &'static mut InMemoryGraph = unsafe { &mut *staged_ptr };
// SAFETY: the `Arc<CompiledQuery>` is moved into the returned
// `StreamingCursorWithArc`, so the referent lives as long as the wrapper.
let compiled_static: &'static CompiledQuery =
unsafe { std::mem::transmute::<&CompiledQuery, _>(compiled.as_ref()) };
let cursor = MutablePullExecutor::new(storage_static, params)
.open_compiled(compiled_static)
.map_err(|e| {
// No cursor exists on this path, so dropping the staged graph is fine.
if let Ok(mut inner) = self.inner.lock() {
discard_transaction_state(&mut inner);
}
self.live.take();
anyhow!(e)
})?;
Ok(Box::new(StreamingCursorWithArc {
cursor,
_compiled: compiled,
}))
}
/// Parses, analyzes, and compiles `query` against the graph this transaction
/// currently sees: the staged graph when one exists, otherwise the live graph.
///
/// # Errors
/// Propagates parse and analysis errors, and fails when no staged graph exists
/// and the live guard has already been released.
fn compile_in_tx(&self, query: &str) -> Result<CompiledQuery> {
    let document = parse_query(query)?;
    let resolved = {
        let state = self.lock_inner_unchecked();
        match state.staged.as_ref() {
            Some(staged_graph) => {
                let mut analyzer = Analyzer::new(staged_graph);
                analyzer.analyze(&document)?
            }
            None => {
                // The live graph is guarded separately, so release the state lock first.
                drop(state);
                let Some(live_guard) = self.live.as_ref() else {
                    return Err(anyhow!("transaction has no live graph guard"));
                };
                let mut analyzer = Analyzer::new(live_guard.as_graph());
                analyzer.analyze(&document)?
            }
        }
    };
    Ok(Compiler::compile(&resolved))
}
/// Makes sure `inner.staged` holds a graph, cloning the live graph on first use.
///
/// The caller passes in the already-locked state. For read-write transactions
/// with mutation buffering enabled, the fresh copy gets a `BufferingRecorder`
/// that writes into `inner.buffer`.
///
/// # Errors
/// Fails when a copy is needed but the live guard has already been released.
fn ensure_staged_locked(&self, inner: &mut MutexGuard<'_, TxInner>) -> Result<()> {
    if inner.staged.is_none() {
        let live_guard = self
            .live
            .as_ref()
            .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
        let mut working_copy: InMemoryGraph = live_guard.as_graph().clone();
        let wants_recorder =
            matches!(inner.mode, TransactionMode::ReadWrite) && inner.buffer_mutations;
        if wants_recorder {
            let recorder: Arc<dyn MutationRecorder> =
                Arc::new(BufferingRecorder::new(Arc::clone(&inner.buffer)));
            working_copy.set_mutation_recorder(Some(recorder));
        }
        inner.staged = Some(working_copy);
    }
    Ok(())
}
/// Opens a streaming cursor for `query` with no parameters.
pub fn stream(&mut self, query: &str) -> Result<QueryStream<'static>> {
    let no_params = BTreeMap::new();
    self.stream_with_params(query, no_params)
}
/// Compiles `query` in this transaction and opens a streaming cursor over it
/// with the given `params`.
pub fn stream_with_params(
    &mut self,
    query: &str,
    params: BTreeMap<String, LoraValue>,
) -> Result<QueryStream<'static>> {
    let plan = self.compile_in_tx(query).map(Arc::new)?;
    let result_columns = compiled_result_columns(&plan);
    self.stream_compiled(plan, result_columns, params)
}
/// Opens a streaming cursor over a compiled query and wraps it in a `QueryStream`.
///
/// The cursor always runs against the staged graph (created on demand). For
/// mutating queries a savepoint is stored in `pending_savepoint` so the
/// statement can be undone if the stream is finalized without being exhausted.
///
/// # Errors
/// Fails when a statement cannot be started, when a mutating query is used in
/// a read-only transaction, or when the cursor cannot be opened. In the last
/// case the stream is finalized immediately, which rolls back a mutating
/// statement.
pub(crate) fn stream_compiled(
&mut self,
compiled: Arc<CompiledQuery>,
columns: Vec<String>,
params: BTreeMap<String, LoraValue>,
) -> Result<QueryStream<'static>> {
let mut inner = self.begin_statement()?;
let is_mutating = classify_stream(&compiled).is_mutating();
if matches!(inner.mode, TransactionMode::ReadOnly) && is_mutating {
return Err(anyhow!(
"cannot execute mutating query in read-only transaction"
));
}
// Only clone the staged graph into the savepoint when one already existed;
// otherwise restoring the savepoint just clears the staged graph.
let clone_savepoint_graph = inner.staged.is_some();
self.ensure_staged_locked(&mut inner)?;
// Set before unlocking so `begin_statement` and `commit` refuse to run
// while the cursor holds a reference into the staged graph.
inner.cursor_active = true;
let rollback_on_drop = is_mutating;
if rollback_on_drop {
inner.pending_savepoint = Some(take_savepoint(&inner, clone_savepoint_graph));
} else {
inner.pending_savepoint = None;
}
let staged_ptr: *mut InMemoryGraph = inner
.staged
.as_mut()
.expect("ensure_staged_locked guarantees Some")
as *mut _;
drop(inner);
// SAFETY: every cursor built below is wrapped in `StreamingCursorWithArc`,
// which holds a clone of the `Arc<CompiledQuery>` for as long as the cursor lives.
let compiled_static: &'static CompiledQuery =
unsafe { std::mem::transmute::<&CompiledQuery, _>(compiled.as_ref()) };
let cursor: Result<Box<dyn RowSource + 'static>> = if is_mutating {
// SAFETY (NOTE(review)): the pointer targets the graph in `TxInner::staged`.
// It is valid only while that slot is neither replaced nor cleared and the
// shared state is alive. The `QueryStream` built below receives a clone of
// `self.inner`; `cursor_active` blocks new statements and commit — confirm
// that no other path clears `staged` while the cursor is open.
let storage_static: &'static mut InMemoryGraph = unsafe { &mut *staged_ptr };
MutablePullExecutor::new(storage_static, params)
.open_compiled(compiled_static)
.map(|cursor| {
Box::new(StreamingCursorWithArc {
cursor,
_compiled: compiled.clone(),
}) as Box<dyn RowSource + 'static>
})
.map_err(|e| anyhow!(e))
} else {
// SAFETY (NOTE(review)): same validity requirements as the mutable branch
// above, but only shared access is taken.
let storage_static: &'static InMemoryGraph = unsafe { &*staged_ptr };
PullExecutor::new(storage_static, params)
.open_compiled(compiled_static)
.map(|cursor| {
Box::new(StreamingCursorWithArc {
cursor,
_compiled: compiled.clone(),
}) as Box<dyn RowSource + 'static>
})
.map_err(|e| anyhow!(e))
};
match cursor {
Ok(cursor) => Ok(QueryStream::for_tx_cursor(
cursor,
columns,
self.inner.clone(),
rollback_on_drop,
)),
// Opening failed: clear `cursor_active` and, for a mutating query, apply
// the pending savepoint (`exhausted = false`).
Err(err) => {
finalize_tx_stream(&self.inner, false, rollback_on_drop);
Err(err)
}
}
}
/// Commits the transaction, consuming it.
///
/// Steps, in order:
/// 1. Refuse if a streaming cursor is active or the transaction is closed.
/// 2. Apply a pending savepoint left behind by a dropped cursor.
/// 3. Take the staged graph and the buffered events and mark the transaction closed.
/// 4. For a read-write transaction with a WAL and buffered events: arm the WAL,
///    replay every event, commit, and flush if a commit record was written.
/// 5. For a read-write transaction with a staged graph: attach the WAL recorder
///    (if any) and replace the live graph with the staged graph.
/// 6. Release the live guard.
///
/// # Errors
/// Returns an error for an active cursor, a closed transaction, any WAL
/// failure, or a missing or read-only live guard when publishing. Because the
/// staged graph is taken and `closed` is set before the WAL is written, an
/// error after that point drops the staged changes.
///
/// # Panics
/// Panics if the state mutex or the buffer mutex is poisoned.
pub fn commit(mut self) -> Result<()> {
let (staged, buffer_events, mode) = {
let mut inner = self.inner.lock().unwrap();
if inner.cursor_active {
return Err(anyhow!(
"cannot commit transaction while a streaming cursor is still active"
));
}
if inner.cursor_dropped_dirty {
if let Some(sp) = inner.pending_savepoint.take() {
apply_savepoint(&mut inner, sp);
}
inner.cursor_dropped_dirty = false;
}
if inner.closed {
return Err(anyhow!("transaction is already closed"));
}
let mode = inner.mode;
let staged = inner.staged.take();
// Drain the buffer so the events are owned here and the shared buffer is empty.
let buffer_events = std::mem::take(&mut *inner.buffer.lock().unwrap());
inner.closed = true;
(staged, buffer_events, mode)
};
// Write the WAL before publishing, so a WAL failure leaves the live graph untouched.
if let Some(rec) = &self.wal {
if matches!(mode, TransactionMode::ReadWrite) && !buffer_events.is_empty() {
rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
for event in &buffer_events {
rec.record(event);
// `record` returns nothing, so failure is detected through the poison flag.
if let Some(reason) = rec.poisoned() {
return Err(anyhow!("WAL poisoned during commit replay: {reason}"));
}
}
match rec.commit() {
Ok(WroteCommit::Yes) => {
rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
}
Ok(WroteCommit::No) => {}
Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
}
if let Some(reason) = rec.poisoned() {
return Err(anyhow!("WAL poisoned: {reason}"));
}
}
}
if matches!(mode, TransactionMode::ReadWrite) {
if let Some(mut staged) = staged {
// Swap the transaction-local buffering recorder for the WAL recorder
// (or none) before the graph becomes the live graph.
staged.set_mutation_recorder(None);
if let Some(rec) = &self.wal {
staged.set_mutation_recorder(Some(rec.clone() as Arc<dyn MutationRecorder>));
}
let live = self
.live
.as_mut()
.ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
let live = live
.as_graph_mut()
.ok_or_else(|| anyhow!("read-only transaction cannot publish staged graph"))?;
*live = staged;
}
}
// Release the lock on the live graph.
self.live.take();
Ok(())
}
/// Rolls the transaction back, consuming it and releasing the live guard.
///
/// All staged changes and buffered mutation events are discarded. If a
/// streaming cursor is still active, the discard is deferred: the transaction
/// is only marked closed here, and `finalize_tx_stream` discards the state when
/// the cursor is finalized. The cursor holds a `'static` reference into the
/// staged graph, so dropping that graph while the cursor is open would leave
/// the reference dangling. This mirrors what `Drop` does for an un-finished
/// transaction.
///
/// # Errors
/// Returns an error if the transaction is already closed.
///
/// # Panics
/// Panics if the state mutex is poisoned.
pub fn rollback(mut self) -> Result<()> {
    {
        let mut inner = self.inner.lock().unwrap();
        if inner.closed {
            return Err(anyhow!("transaction is already closed"));
        }
        if inner.cursor_active {
            // The open cursor still points into `inner.staged`; leave it in place
            // and let the cursor's finalization discard the closed transaction.
            inner.closed = true;
        } else {
            discard_transaction_state(&mut inner);
        }
    }
    // The cursor, if any, reads the staged copy rather than the live graph, so
    // the lock on the live graph can be released right away.
    self.live.take();
    Ok(())
}
/// Locks the shared state and prepares it for a new statement.
///
/// Rejects closed transactions and transactions with an active streaming
/// cursor. If a cursor was dropped dirty, its pending savepoint is applied
/// first. Returns the held lock guard.
///
/// # Panics
/// Panics if the state mutex is poisoned.
fn begin_statement(&self) -> Result<MutexGuard<'_, TxInner>> {
    let mut state = self.inner.lock().unwrap();
    match (state.closed, state.cursor_active) {
        (true, _) => return Err(anyhow!("transaction is already closed")),
        (false, true) => {
            return Err(anyhow!(
                "cannot start a new statement while a streaming cursor is still active"
            ));
        }
        (false, false) => {}
    }
    if state.cursor_dropped_dirty {
        if let Some(pending) = state.pending_savepoint.take() {
            apply_savepoint(&mut state, pending);
        }
        state.cursor_dropped_dirty = false;
    }
    Ok(state)
}
/// Checks that a statement may start, without touching any savepoint state.
///
/// Fails when the transaction is closed or a streaming cursor is active.
///
/// # Panics
/// Panics if the state mutex is poisoned.
fn precheck_open_no_savepoint(&self) -> Result<()> {
    let state = self.inner.lock().unwrap();
    if state.closed {
        Err(anyhow!("transaction is already closed"))
    } else if state.cursor_active {
        Err(anyhow!(
            "cannot start a new statement while a streaming cursor is still active"
        ))
    } else {
        Ok(())
    }
}
/// Reports whether the transaction was opened read-only.
///
/// Reads the locally cached mode; the shared state is not locked.
fn is_read_only_unchecked(&self) -> bool {
    self.mode == TransactionMode::ReadOnly
}
/// Locks the shared state without checking `closed` or `cursor_active`.
///
/// A poisoned mutex is recovered rather than treated as an error.
fn lock_inner_unchecked(&self) -> MutexGuard<'_, TxInner> {
    match self.inner.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
}
/// Result of executing a statement into fully materialized rows.
type ExecResultRows = Result<Vec<Row>>;
impl TxInner {
    /// Mutably borrows the staged graph.
    ///
    /// # Errors
    /// Fails when no staged graph has been created.
    fn staged_mut(&mut self) -> Result<&mut InMemoryGraph> {
        match self.staged.as_mut() {
            Some(graph) => Ok(graph),
            None => Err(anyhow!("transaction has no staged graph")),
        }
    }
}
/// Row source that bundles a cursor with the compiled query it borrows from.
///
/// The cursor is built from a `&'static CompiledQuery` obtained by extending
/// the lifetime of a reference into the `Arc`. `cursor` is declared before
/// `_compiled`, so it is dropped first and never outlives the query.
struct StreamingCursorWithArc {
/// The wrapped cursor; all row requests are forwarded to it.
cursor: Box<dyn RowSource + 'static>,
/// Keeps the compiled query alive; never read directly.
_compiled: Arc<CompiledQuery>,
}
impl RowSource for StreamingCursorWithArc {
    /// Forwards to the wrapped cursor and returns its result unchanged.
    fn next_row(&mut self) -> lora_executor::ExecResult<Option<Row>> {
        let outcome = self.cursor.next_row();
        outcome
    }
}
/// Updates shared transaction state when a streaming cursor ends.
///
/// Always clears `cursor_active`. After that:
/// * if the transaction was closed in the meantime, all state is discarded;
/// * if the stream was `exhausted`, or does not need rollback, the pending
///   savepoint is dropped and the statement's effects stay;
/// * otherwise the pending savepoint is applied, undoing the statement.
///
/// Does nothing when the state mutex is poisoned.
pub(crate) fn finalize_tx_stream(
    handle: &Arc<Mutex<TxInner>>,
    exhausted: bool,
    rollback_on_drop: bool,
) {
    let Ok(mut state) = handle.lock() else {
        return;
    };
    state.cursor_active = false;
    if state.closed {
        discard_transaction_state(&mut state);
        return;
    }
    let keep_effects = exhausted || !rollback_on_drop;
    if keep_effects {
        state.pending_savepoint = None;
    } else if let Some(pending) = state.pending_savepoint.take() {
        apply_savepoint(&mut state, pending);
    }
    state.cursor_dropped_dirty = false;
}
/// Throws away everything the transaction has accumulated and marks it closed.
///
/// Drops the pending savepoint and the staged graph, resets the cursor flags,
/// and empties the mutation buffer (skipped if its mutex is poisoned).
fn discard_transaction_state(inner: &mut TxInner) {
    drop(inner.pending_savepoint.take());
    inner.cursor_dropped_dirty = false;
    inner.cursor_active = false;
    drop(inner.staged.take());
    if let Ok(mut events) = inner.buffer.lock() {
        events.clear();
    }
    inner.closed = true;
}
/// Captures a savepoint of the current transaction state.
///
/// Records the current mutation-buffer length (0 if the buffer mutex is
/// poisoned). The staged graph is cloned into the savepoint only when
/// `clone_staged` is `true`.
fn take_savepoint(inner: &TxInner, clone_staged: bool) -> Savepoint {
    let buffer_len = match inner.buffer.lock() {
        Ok(events) => events.len(),
        Err(_) => 0,
    };
    let staged = if clone_staged {
        inner.staged.clone()
    } else {
        None
    };
    Savepoint { staged, buffer_len }
}
/// Applies `savepoint` to `inner` when one is present; otherwise does nothing.
fn restore_savepoint(inner: &mut TxInner, savepoint: Option<Savepoint>) {
    let Some(snapshot) = savepoint else {
        return;
    };
    apply_savepoint(inner, snapshot);
}
fn apply_savepoint(inner: &mut TxInner, sp: Savepoint) {
if let Ok(mut buf) = inner.buffer.lock() {
buf.truncate(sp.buffer_len);
}
let Some(mut graph) = sp.staged else {
inner.staged = None;
return;
};
if matches!(inner.mode, TransactionMode::ReadWrite) && inner.buffer_mutations {
graph.set_mutation_recorder(Some(
Arc::new(BufferingRecorder::new(inner.buffer.clone())) as Arc<dyn MutationRecorder>
));
}
inner.staged = Some(graph);
}
impl Drop for Transaction<'_> {
    /// Implicitly rolls back a transaction that was neither committed nor rolled back.
    ///
    /// With an active streaming cursor the transaction is only marked closed,
    /// leaving the staged graph in place. Otherwise all state is discarded
    /// right away. Does nothing if the state mutex is poisoned.
    fn drop(&mut self) {
        let Ok(mut state) = self.inner.lock() else {
            return;
        };
        if state.closed {
            return;
        }
        if state.cursor_active {
            state.closed = true;
        } else {
            discard_transaction_state(&mut state);
        }
    }
}