use std::collections::BTreeMap;
use std::mem::ManuallyDrop;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
use anyhow::{anyhow, Result};
use lora_compiler::CompiledQuery;
use lora_executor::{ExecResult, LoraValue, PullExecutor, Row, RowSource};
use lora_store::InMemoryGraph;
use crate::transaction::{finalize_tx_stream, Transaction, TxInner};
/// A pull-based stream of result rows for one query.
///
/// Rows are fetched one at a time with `next_row` or through the `Iterator`
/// implementation. The stream is backed by one of three modes (transaction
/// cursor, live cursor, auto-commit transaction), selected by `inner`.
pub struct QueryStream<'a> {
// Labels handed to the constructor and exposed by `columns()`
// (presumably the names of the result columns — confirm against callers).
columns: Vec<String>,
// Mode-specific cursor, progress state, and cleanup bookkeeping.
inner: StreamInner<'a>,
}
/// Backing implementation of a `QueryStream`, one variant per execution mode.
///
/// Every variant carries its own `StreamState` so that `next_row` can keep
/// answering consistently after the stream has ended or failed.
enum StreamInner<'a> {
// Cursor bound to a transaction that is shared through `tx_handle`.
Tx {
// Row source; set to `None` (dropping the cursor) when the stream is finalized.
cursor: Option<Box<dyn RowSource + 'static>>,
// Progress of the stream: active, exhausted, or errored.
state: StreamState,
// Shared transaction state handed to `finalize_tx_stream`.
tx_handle: Arc<Mutex<TxInner>>,
// Forwarded unchanged to `finalize_tx_stream`; its exact effect is defined there.
rollback_on_drop: bool,
},
// Cursor that owns its own read lock on the store (see `LiveCursor`).
Live {
cursor: LiveCursor,
// Progress of the stream: active, exhausted, or errored.
state: StreamState,
// This variant stores no `'a` data; the marker only carries the lifetime parameter.
_phantom: std::marker::PhantomData<&'a ()>,
},
// Cursor running inside a transaction owned by the stream itself.
AutoCommit {
// Row source; set to `None` (dropping the cursor) when the stream is finalized.
cursor: Option<Box<dyn lora_executor::RowSource + 'static>>,
// Progress of the stream: active, exhausted, or errored.
state: StreamState,
// Owns the transaction: committed on exhaustion, rolled back on error or early drop.
guard: AutoCommitGuard<'a>,
},
}
/// A cursor bundled with everything it borrows from: the store's read guard,
/// the store handle itself, and the compiled query.
///
/// `cursor` holds references whose lifetimes were erased to `'static` in
/// `LiveCursor::open`; they really point into `guard` and `_compiled`. The
/// `ManuallyDrop` wrappers let the `Drop` impl release `cursor` first and
/// `guard` second, before the remaining fields are dropped.
pub(crate) struct LiveCursor {
// Row source borrowing from `guard` and `_compiled`; must be dropped before either.
cursor: ManuallyDrop<Box<dyn RowSource + 'static>>,
// Read lock on the store, held for the whole life of the cursor.
guard: ManuallyDrop<RwLockReadGuard<'static, InMemoryGraph>>,
// Keeps the `RwLock` that `guard` points into alive.
_store: Arc<RwLock<InMemoryGraph>>,
// Heap-allocated compiled query that `cursor` references.
_compiled: Box<CompiledQuery>,
}
impl LiveCursor {
/// Opens a cursor for `compiled` over `store`, executing with `params`.
///
/// Takes a read lock on `store` and keeps it inside the returned value, so
/// the lock stays held for as long as the `LiveCursor` exists. A poisoned
/// lock is not treated as an error: the guard is recovered and used anyway.
///
/// # Errors
///
/// Returns the error produced by `PullExecutor::open_compiled`, wrapped via
/// `anyhow!`. On that path the read guard is an ordinary local and is
/// released when the function returns.
pub(crate) fn open(
store: Arc<RwLock<InMemoryGraph>>,
compiled: CompiledQuery,
params: BTreeMap<String, LoraValue>,
) -> Result<Self> {
// Boxed so the query lives at a heap address that does not change when
// the box itself is later moved into `Self`.
let compiled = Box::new(compiled);
let guard = store
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner());
// SAFETY: the guard borrows the `RwLock` stored inside the `Arc`
// allocation, not the local `store` handle. That allocation stays alive
// because `store` is moved into `_store`, and `Drop for LiveCursor`
// releases the guard before `_store` is dropped. The `'static` lifetime is
// therefore never observed past the real lifetime of the lock.
let guard: RwLockReadGuard<'static, InMemoryGraph> = unsafe { std::mem::transmute(guard) };
// SAFETY: the reference points at the graph protected by `guard`; it is
// only reachable through `cursor`, which is dropped before `guard`.
let storage_ref: &'static InMemoryGraph =
unsafe { std::mem::transmute::<&InMemoryGraph, _>(&*guard) };
// SAFETY: the reference points into the heap allocation owned by
// `compiled`, which is stored in `_compiled` and outlives `cursor`.
// NOTE(review): the `Box` is moved into `Self` while this reference is
// live. The address is stable, but whether moving a `Box` with outstanding
// borrows is accepted by Rust's aliasing model should be confirmed
// (e.g. with Miri).
let compiled_ref: &'static CompiledQuery =
unsafe { std::mem::transmute::<&CompiledQuery, _>(&*compiled) };
let cursor = PullExecutor::new(storage_ref, params)
.open_compiled(compiled_ref)
.map_err(|e| anyhow!(e))?;
Ok(Self {
cursor: ManuallyDrop::new(cursor),
guard: ManuallyDrop::new(guard),
_store: store,
_compiled: compiled,
})
}
/// Pulls the next row from the wrapped cursor, passing its result through unchanged.
fn next_row(&mut self) -> ExecResult<Option<Row>> {
self.cursor.next_row()
}
}
impl Drop for LiveCursor {
/// Tears the cursor down in dependency order: the cursor first, then the
/// read guard. `_store` and `_compiled` are dropped afterwards as ordinary
/// fields.
fn drop(&mut self) {
// SAFETY: each `ManuallyDrop` field is dropped exactly once, here, and is
// not touched again afterwards. The cursor goes first because it holds
// references into the data behind `guard` and into `_compiled`; the guard
// goes second so the lock is released while `_store` still keeps the
// `RwLock` alive.
unsafe {
ManuallyDrop::drop(&mut self.cursor);
ManuallyDrop::drop(&mut self.guard);
}
}
}
/// Owns the transaction behind an auto-commit stream and makes sure it is
/// committed or rolled back at most once.
pub(crate) struct AutoCommitGuard<'a> {
// The transaction; taken (left as `None`) by the first `commit` or `rollback`.
pub(crate) tx: Option<Transaction<'a>>,
// Set by the first `commit` or `rollback`; later calls become no-ops.
pub(crate) finalized: bool,
}
/// Progress of a `QueryStream`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
// Rows may still be available; `next_row` pulls from the cursor.
Active,
// The cursor reported the end of the results; `next_row` returns `Ok(None)`.
Exhausted,
// A pull, a commit, or a missing cursor failed; `next_row` keeps returning an error.
Errored,
}
impl<'a> std::fmt::Debug for QueryStream<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = match &self.inner {
StreamInner::Tx { state, .. }
| StreamInner::AutoCommit { state, .. }
| StreamInner::Live { state, .. } => *state,
};
f.debug_struct("QueryStream")
.field("columns", &self.columns)
.field("state", &state)
.finish()
}
}
impl<'a> QueryStream<'a> {
pub(crate) fn for_tx_cursor(
cursor: Box<dyn RowSource + 'static>,
columns: Vec<String>,
tx_handle: Arc<Mutex<TxInner>>,
rollback_on_drop: bool,
) -> Self {
Self {
columns,
inner: StreamInner::Tx {
cursor: Some(cursor),
state: StreamState::Active,
tx_handle,
rollback_on_drop,
},
}
}
pub(crate) fn auto_commit(
cursor: Box<dyn lora_executor::RowSource + 'static>,
columns: Vec<String>,
guard: AutoCommitGuard<'a>,
) -> Self {
Self {
columns,
inner: StreamInner::AutoCommit {
cursor: Some(cursor),
state: StreamState::Active,
guard,
},
}
}
pub(crate) fn live(cursor: LiveCursor, columns: Vec<String>) -> Self {
Self {
columns,
inner: StreamInner::Live {
cursor,
state: StreamState::Active,
_phantom: std::marker::PhantomData,
},
}
}
pub fn columns(&self) -> &[String] {
&self.columns
}
pub fn next_row(&mut self) -> Result<Option<Row>> {
match &mut self.inner {
StreamInner::Live { state, cursor, .. } => match *state {
StreamState::Errored => Err(anyhow!("query stream errored")),
StreamState::Exhausted => Ok(None),
StreamState::Active => match cursor.next_row() {
Ok(Some(row)) => Ok(Some(row)),
Ok(None) => {
*state = StreamState::Exhausted;
Ok(None)
}
Err(e) => {
*state = StreamState::Errored;
Err(anyhow!(e))
}
},
},
StreamInner::Tx {
state,
cursor,
tx_handle,
rollback_on_drop,
} => match *state {
StreamState::Errored => Err(anyhow!("query stream errored")),
StreamState::Exhausted => Ok(None),
StreamState::Active => {
let pull = match cursor.as_mut() {
Some(c) => c.next_row(),
None => {
*state = StreamState::Errored;
return Err(anyhow!("transaction cursor missing"));
}
};
match pull {
Ok(Some(row)) => Ok(Some(row)),
Ok(None) => {
cursor.take();
finalize_tx_stream(tx_handle, true, *rollback_on_drop);
*state = StreamState::Exhausted;
Ok(None)
}
Err(e) => {
cursor.take();
finalize_tx_stream(tx_handle, false, *rollback_on_drop);
*state = StreamState::Errored;
Err(anyhow!(e))
}
}
}
},
StreamInner::AutoCommit {
state,
cursor,
guard,
} => match *state {
StreamState::Errored => Err(anyhow!("query stream errored")),
StreamState::Exhausted => Ok(None),
StreamState::Active => {
let pull = match cursor.as_mut() {
Some(c) => c.next_row(),
None => {
*state = StreamState::Errored;
return Err(anyhow!("auto-commit cursor missing"));
}
};
match pull {
Ok(Some(row)) => Ok(Some(row)),
Ok(None) => {
cursor.take();
match guard.commit() {
Ok(()) => {
*state = StreamState::Exhausted;
Ok(None)
}
Err(e) => {
*state = StreamState::Errored;
Err(e)
}
}
}
Err(e) => {
cursor.take();
guard.rollback();
*state = StreamState::Errored;
Err(anyhow!(e))
}
}
}
},
}
}
fn is_exhausted(&self) -> bool {
match &self.inner {
StreamInner::Tx { state, .. }
| StreamInner::AutoCommit { state, .. }
| StreamInner::Live { state, .. } => matches!(state, StreamState::Exhausted),
}
}
}
impl<'a> Iterator for QueryStream<'a> {
type Item = Row;
fn next(&mut self) -> Option<Self::Item> {
match self.next_row() {
Ok(Some(row)) => Some(row),
Ok(None) => None,
Err(_) => None,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match &self.inner {
StreamInner::Live { .. } | StreamInner::Tx { .. } | StreamInner::AutoCommit { .. } => {
(0, None)
}
}
}
}
impl<'a> Drop for QueryStream<'a> {
    /// Releases the cursor and settles any transaction work still pending.
    ///
    /// * `Tx`: drops the cursor and calls `finalize_tx_stream`, but only if
    ///   `next_row` has not already finalized the stream.
    /// * `Live`: nothing to do here; `LiveCursor` cleans up in its own `Drop`.
    /// * `AutoCommit`: drops the cursor and rolls the transaction back unless
    ///   it was already committed or rolled back.
    fn drop(&mut self) {
        let exhausted = self.is_exhausted();
        match &mut self.inner {
            StreamInner::Tx {
                cursor,
                tx_handle,
                rollback_on_drop,
                ..
            } => {
                // `next_row` takes the cursor at the moment it finalizes the
                // stream (on exhaustion or on a failed pull), so an absent
                // cursor means `finalize_tx_stream` has already run with these
                // same arguments. Running it a second time is at best
                // redundant, and could disturb transaction state that changed
                // since the first call, so it is skipped.
                // NOTE(review): `finalize_tx_stream` is defined elsewhere;
                // confirm nothing relies on the repeated call.
                let still_open = cursor.take().is_some();
                if still_open {
                    finalize_tx_stream(tx_handle, exhausted, *rollback_on_drop);
                }
            }
            StreamInner::Live { .. } => {}
            StreamInner::AutoCommit {
                state,
                cursor,
                guard,
            } => {
                // Drop the cursor before touching the transaction.
                cursor.take();
                if !guard.finalized && !matches!(state, StreamState::Exhausted) {
                    guard.rollback();
                }
            }
        }
    }
}
impl<'a> AutoCommitGuard<'a> {
fn commit(&mut self) -> Result<()> {
if self.finalized {
return Ok(());
}
self.finalized = true;
match self.tx.take() {
Some(tx) => {
if let Ok(mut inner) = tx.inner.lock() {
inner.cursor_active = false;
}
tx.commit()
}
None => Ok(()),
}
}
fn rollback(&mut self) {
if self.finalized {
return;
}
self.finalized = true;
if let Some(tx) = self.tx.take() {
if let Ok(mut inner) = tx.inner.lock() {
inner.cursor_active = false;
}
let _ = tx.rollback();
}
}
}