lora_database/stream.rs
1use std::collections::BTreeMap;
2use std::mem::ManuallyDrop;
3use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
4
5use anyhow::{anyhow, Result};
6use lora_compiler::CompiledQuery;
7use lora_executor::{ExecResult, LoraValue, PullExecutor, Row, RowSource};
8use lora_store::InMemoryGraph;
9
10use crate::transaction::{finalize_tx_stream, Transaction, TxInner};
11
12/// Owning row stream returned by [`crate::Database::stream`] and transaction
13/// streaming methods.
14///
15/// The cursor is fallible (`next_row()` surfaces execution errors)
16/// and exposes plan-derived column names populated even for empty
17/// results. The lifetime parameter `'a` is bound to the source the
18/// cursor borrows from — typically the database for auto-commit
19/// write streams that hold the live write guard until exhaustion or
20/// drop. Read-only and transaction-bound streams need no live
21/// borrow and use `'static` (the buffered variant).
22pub struct QueryStream<'a> {
23 columns: Vec<String>,
24 inner: StreamInner<'a>,
25}
26
27enum StreamInner<'a> {
28 /// Transaction-bound streaming cursor. The cursor borrows from
29 /// the transaction's staged graph, which is kept alive by
30 /// `tx_handle`; finalization releases the cursor token and
31 /// either clears or restores the pending statement savepoint.
32 Tx {
33 cursor: Option<Box<dyn RowSource + 'static>>,
34 state: StreamState,
35 tx_handle: Arc<Mutex<TxInner>>,
36 rollback_on_drop: bool,
37 },
38 /// True pull-based read-only stream. Holds a live store read
39 /// lock through the cursor's lifetime and emits rows as the
40 /// caller pulls them, without any intermediate
41 /// materialization. Backed by a [`LiveCursor`] which uses
42 /// `self_cell` to safely co-own the lock guard and the
43 /// borrowing cursor.
44 Live {
45 cursor: LiveCursor,
46 state: StreamState,
47 // The 'a parameter is unused for this variant — the
48 // self-cell hides the borrow. We carry a phantom to keep
49 // the enum's lifetime parameter consistent with the
50 // other variants.
51 _phantom: std::marker::PhantomData<&'a ()>,
52 },
53 /// Auto-commit write stream backed by a hidden staged
54 /// transaction. The graph is mutated on a clone held in
55 /// `guard.tx.inner.staged`; the live store write lock stays locked
56 /// through the tx's `live` guard so no other writer races. On
57 /// full exhaustion the staged graph is published and the WAL
58 /// replays the buffered events; on premature drop or error the
59 /// staged graph and buffer are discarded and the live store is
60 /// untouched.
61 ///
62 /// `cursor` is a streaming `RowSource` that may apply mutations
63 /// row-by-row (via `StreamingWriteCursor`) or yield from a
64 /// pre-materialized buffer (via `BufferedRowSource`); see
65 /// `Transaction::open_streaming_compiled_autocommit`. It is
66 /// taken and dropped before the guard's commit/rollback so any
67 /// borrows back into the staged graph are released first.
68 AutoCommit {
69 cursor: Option<Box<dyn lora_executor::RowSource + 'static>>,
70 state: StreamState,
71 guard: AutoCommitGuard<'a>,
72 },
73}
74
75/// Self-referential cursor that pulls rows directly from the live
76/// store. The `Arc<RwLock<...>>` keeps the storage alive, the
77/// `RwLockReadGuard` keeps it locked, and the boxed `RowSource`
78/// borrows from the locked storage. Drop releases them in
79/// declaration order — `cursor` first, then `guard`, finally the
80/// `Arc` — so the cursor never sees a dropped guard and the
81/// guard never sees a dropped lock.
82///
83/// `self_cell` can't model this because the cursor borrows from
84/// the guard's deref while the guard itself borrows from the
85/// owner — two levels of nested borrow inside the dependent. The
86/// unsafe scope is small (one constructor, one drop) and
87/// fully encapsulated; nothing outside this module can observe
88/// the lifetime extension.
89pub(crate) struct LiveCursor {
90 /// SAFETY invariant: borrows from `*guard`. Must drop before
91 /// `guard`.
92 cursor: ManuallyDrop<Box<dyn RowSource + 'static>>,
93 /// SAFETY invariant: borrows from the `RwLock` inside `_store`.
94 /// Must drop before `_store`.
95 guard: ManuallyDrop<RwLockReadGuard<'static, InMemoryGraph>>,
96 /// Keeps the underlying `RwLock` alive. Dropped after `guard`.
97 _store: Arc<RwLock<InMemoryGraph>>,
98 /// Keeps the compiled plan alive — operator sources hold
99 /// references into it (e.g. predicate `ResolvedExpr`s). Boxed
100 /// so the plan address is stable across the move into the
101 /// struct.
102 _compiled: Box<CompiledQuery>,
103}
104
105impl LiveCursor {
106 /// Lock the live store and open a streaming cursor against
107 /// the given compiled query. Internal helper for
108 /// `Database::stream_with_params` — never expose the
109 /// constructed `LiveCursor` to callers without the
110 /// surrounding `QueryStream`, which makes the `'static`
111 /// transmutes invisible.
112 pub(crate) fn open(
113 store: Arc<RwLock<InMemoryGraph>>,
114 compiled: CompiledQuery,
115 params: BTreeMap<String, LoraValue>,
116 ) -> Result<Self> {
117 let compiled = Box::new(compiled);
118
119 let guard = store
120 .read()
121 .unwrap_or_else(|poisoned| poisoned.into_inner());
122
123 // SAFETY: We extend the lifetime of the guard and the
124 // borrows into `*guard` / `*compiled` to `'static`. This
125 // is sound because the surrounding `LiveCursor` keeps
126 // (a) the `Arc<RwLock<...>>` alive while the guard is
127 // alive — the RwLock behind the guard never gets freed —
128 // and (b) the `Box<CompiledQuery>` alive while the
129 // cursor is alive. The `Drop` impl below releases
130 // `cursor` before `guard` before `_store`, so neither
131 // borrow can outlive its backing storage.
132 let guard: RwLockReadGuard<'static, InMemoryGraph> = unsafe { std::mem::transmute(guard) };
133 let storage_ref: &'static InMemoryGraph =
134 unsafe { std::mem::transmute::<&InMemoryGraph, _>(&*guard) };
135 let compiled_ref: &'static CompiledQuery =
136 unsafe { std::mem::transmute::<&CompiledQuery, _>(&*compiled) };
137
138 let cursor = PullExecutor::new(storage_ref, params)
139 .open_compiled(compiled_ref)
140 .map_err(|e| anyhow!(e))?;
141
142 Ok(Self {
143 cursor: ManuallyDrop::new(cursor),
144 guard: ManuallyDrop::new(guard),
145 _store: store,
146 _compiled: compiled,
147 })
148 }
149
150 fn next_row(&mut self) -> ExecResult<Option<Row>> {
151 self.cursor.next_row()
152 }
153}
154
155impl Drop for LiveCursor {
156 fn drop(&mut self) {
157 // SAFETY: drop in the documented order — cursor first
158 // (releases its borrow into `*guard`), then guard
159 // (releases the read lock). After these calls we never touch
160 // `cursor` or `guard` again. `_store` and `_compiled`
161 // drop naturally afterwards via the normal field-drop
162 // sequence.
163 unsafe {
164 ManuallyDrop::drop(&mut self.cursor);
165 ManuallyDrop::drop(&mut self.guard);
166 }
167 }
168}
169
170/// Per-stream state held only by auto-commit write streams.
171///
172/// The auto-commit guard is a thin wrapper around an explicit
173/// [`Transaction`]: full cursor exhaustion calls `commit`,
174/// premature drop or error calls `rollback`. All staged-graph,
175/// savepoint, and WAL replay logic lives on `Transaction` itself,
176/// so the guard contributes no behavior of its own beyond the
177/// commit-vs-rollback decision.
178pub(crate) struct AutoCommitGuard<'a> {
179 /// The hidden transaction. `None` once the guard has finalized
180 /// (commit consumes the tx; rollback consumes it; both leave
181 /// `None` behind).
182 pub(crate) tx: Option<Transaction<'a>>,
183 /// Set once a finalization (commit or rollback) has run so
184 /// duplicate calls — including the `Drop` path after a
185 /// successful `next_row`-driven commit — are no-ops.
186 pub(crate) finalized: bool,
187}
188
/// Lifecycle of a [`QueryStream`] as tracked by `next_row`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// No terminal event yet; the next pull goes to the cursor.
    Active,
    /// The cursor reported end-of-stream (and, for auto-commit
    /// streams, the commit succeeded). Further pulls yield `Ok(None)`.
    Exhausted,
    /// A pull or the auto-commit failed. Further pulls yield an error.
    Errored,
}
195
196impl<'a> std::fmt::Debug for QueryStream<'a> {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 let state = match &self.inner {
199 StreamInner::Tx { state, .. }
200 | StreamInner::AutoCommit { state, .. }
201 | StreamInner::Live { state, .. } => *state,
202 };
203 f.debug_struct("QueryStream")
204 .field("columns", &self.columns)
205 .field("state", &state)
206 .finish()
207 }
208}
209
210impl<'a> QueryStream<'a> {
211 pub(crate) fn for_tx_cursor(
212 cursor: Box<dyn RowSource + 'static>,
213 columns: Vec<String>,
214 tx_handle: Arc<Mutex<TxInner>>,
215 rollback_on_drop: bool,
216 ) -> Self {
217 Self {
218 columns,
219 inner: StreamInner::Tx {
220 cursor: Some(cursor),
221 state: StreamState::Active,
222 tx_handle,
223 rollback_on_drop,
224 },
225 }
226 }
227
228 pub(crate) fn auto_commit(
229 cursor: Box<dyn lora_executor::RowSource + 'static>,
230 columns: Vec<String>,
231 guard: AutoCommitGuard<'a>,
232 ) -> Self {
233 Self {
234 columns,
235 inner: StreamInner::AutoCommit {
236 cursor: Some(cursor),
237 state: StreamState::Active,
238 guard,
239 },
240 }
241 }
242
243 pub(crate) fn live(cursor: LiveCursor, columns: Vec<String>) -> Self {
244 Self {
245 columns,
246 inner: StreamInner::Live {
247 cursor,
248 state: StreamState::Active,
249 _phantom: std::marker::PhantomData,
250 },
251 }
252 }
253
254 /// Plan-derived column names. Populated even when the result is
255 /// empty so callers can drive a row-arrays format off this list
256 /// without first peeking at a materialized row.
257 pub fn columns(&self) -> &[String] {
258 &self.columns
259 }
260
261 /// Pull the next row. Returns `Ok(None)` once the cursor is
262 /// exhausted, `Ok(Some(row))` for the next hydrated row, or an
263 /// error if the underlying execution failed. Once an error has
264 /// been observed, subsequent calls keep returning that terminal
265 /// state — the cursor never tries to recover or re-execute.
266 pub fn next_row(&mut self) -> Result<Option<Row>> {
267 match &mut self.inner {
268 StreamInner::Live { state, cursor, .. } => match *state {
269 StreamState::Errored => Err(anyhow!("query stream errored")),
270 StreamState::Exhausted => Ok(None),
271 StreamState::Active => match cursor.next_row() {
272 Ok(Some(row)) => Ok(Some(row)),
273 Ok(None) => {
274 *state = StreamState::Exhausted;
275 Ok(None)
276 }
277 Err(e) => {
278 *state = StreamState::Errored;
279 Err(anyhow!(e))
280 }
281 },
282 },
283 StreamInner::Tx {
284 state,
285 cursor,
286 tx_handle,
287 rollback_on_drop,
288 } => match *state {
289 StreamState::Errored => Err(anyhow!("query stream errored")),
290 StreamState::Exhausted => Ok(None),
291 StreamState::Active => {
292 let pull = match cursor.as_mut() {
293 Some(c) => c.next_row(),
294 None => {
295 *state = StreamState::Errored;
296 return Err(anyhow!("transaction cursor missing"));
297 }
298 };
299 match pull {
300 Ok(Some(row)) => Ok(Some(row)),
301 Ok(None) => {
302 cursor.take();
303 finalize_tx_stream(tx_handle, true, *rollback_on_drop);
304 *state = StreamState::Exhausted;
305 Ok(None)
306 }
307 Err(e) => {
308 cursor.take();
309 finalize_tx_stream(tx_handle, false, *rollback_on_drop);
310 *state = StreamState::Errored;
311 Err(anyhow!(e))
312 }
313 }
314 }
315 },
316 StreamInner::AutoCommit {
317 state,
318 cursor,
319 guard,
320 } => match *state {
321 StreamState::Errored => Err(anyhow!("query stream errored")),
322 StreamState::Exhausted => Ok(None),
323 StreamState::Active => {
324 let pull = match cursor.as_mut() {
325 Some(c) => c.next_row(),
326 None => {
327 *state = StreamState::Errored;
328 return Err(anyhow!("auto-commit cursor missing"));
329 }
330 };
331 match pull {
332 Ok(Some(row)) => Ok(Some(row)),
333 Ok(None) => {
334 // Drop the cursor first so its borrows
335 // into the staged graph release before
336 // commit moves staged out of inner.
337 cursor.take();
338 match guard.commit() {
339 Ok(()) => {
340 *state = StreamState::Exhausted;
341 Ok(None)
342 }
343 Err(e) => {
344 *state = StreamState::Errored;
345 Err(e)
346 }
347 }
348 }
349 Err(e) => {
350 cursor.take();
351 guard.rollback();
352 *state = StreamState::Errored;
353 Err(anyhow!(e))
354 }
355 }
356 }
357 },
358 }
359 }
360
361 /// True once the stream has produced its last row.
362 fn is_exhausted(&self) -> bool {
363 match &self.inner {
364 StreamInner::Tx { state, .. }
365 | StreamInner::AutoCommit { state, .. }
366 | StreamInner::Live { state, .. } => matches!(state, StreamState::Exhausted),
367 }
368 }
369}
370
371impl<'a> Iterator for QueryStream<'a> {
372 type Item = Row;
373
374 fn next(&mut self) -> Option<Self::Item> {
375 match self.next_row() {
376 Ok(Some(row)) => Some(row),
377 Ok(None) => None,
378 Err(_) => None,
379 }
380 }
381
382 fn size_hint(&self) -> (usize, Option<usize>) {
383 match &self.inner {
384 // Live and AutoCommit (now backed by a streaming cursor)
385 // don't know their length until drained.
386 StreamInner::Live { .. } | StreamInner::Tx { .. } | StreamInner::AutoCommit { .. } => {
387 (0, None)
388 }
389 }
390 }
391}
392
393// Note: `ExactSizeIterator` intentionally not implemented. The
394// `Live` variant produces rows lazily and can't report an exact
395// remaining count.
396
397impl<'a> Drop for QueryStream<'a> {
398 fn drop(&mut self) {
399 let exhausted = self.is_exhausted();
400 match &mut self.inner {
401 StreamInner::Tx {
402 cursor,
403 tx_handle,
404 rollback_on_drop,
405 ..
406 } => {
407 cursor.take();
408 finalize_tx_stream(tx_handle, exhausted, *rollback_on_drop);
409 }
410 StreamInner::Live { .. } => {
411 // Drop releases the cursor, then the read guard,
412 // which releases the live store read lock. No
413 // additional cleanup needed — live streams never
414 // mutate, so there is nothing to commit or roll back.
415 }
416 StreamInner::AutoCommit {
417 state,
418 cursor,
419 guard,
420 } => {
421 // Drop the cursor first so its borrows into the
422 // staged graph release before the guard rolls back
423 // (which moves staged to None).
424 cursor.take();
425 // Premature drop = rollback. Successful exhaustion
426 // already finalized the guard via `commit()` in
427 // `next_row`, so this path is a no-op for the
428 // exhausted case.
429 if !guard.finalized && !matches!(state, StreamState::Exhausted) {
430 guard.rollback();
431 }
432 }
433 }
434 }
435}
436
437impl<'a> AutoCommitGuard<'a> {
438 /// Publish the staged graph as the live store. Delegates to
439 /// [`Transaction::commit`] which owns the WAL replay + swap
440 /// logic. Idempotent — subsequent calls are no-ops once
441 /// finalized, regardless of whether the previous attempt
442 /// succeeded or failed.
443 fn commit(&mut self) -> Result<()> {
444 if self.finalized {
445 return Ok(());
446 }
447 // Mark finalized before consuming the tx so a commit
448 // failure still prevents Drop from later trying to roll
449 // back a tx that no longer exists.
450 self.finalized = true;
451 match self.tx.take() {
452 Some(tx) => {
453 // The streaming auto-commit cursor sets
454 // `cursor_active = true` at construction; it must
455 // be cleared before `tx.commit` (which rejects on
456 // an active cursor). The cursor itself was already
457 // dropped by the caller in `next_row` — its
458 // borrows back into staged are gone, so we can
459 // safely flip the flag here. For the buffered
460 // fallback path the flag was never set, so this
461 // assignment is a no-op.
462 if let Ok(mut inner) = tx.inner.lock() {
463 inner.cursor_active = false;
464 }
465 tx.commit()
466 }
467 None => Ok(()),
468 }
469 }
470
471 /// Discard the staged graph. Delegates to
472 /// [`Transaction::rollback`]; failures are swallowed because
473 /// the rollback path runs from `Drop` and has nowhere to
474 /// surface an error.
475 fn rollback(&mut self) {
476 if self.finalized {
477 return;
478 }
479 self.finalized = true;
480 if let Some(tx) = self.tx.take() {
481 // Clear the streaming-cursor flag before delegating to
482 // tx.rollback so the rollback can finalize without
483 // stumbling over a stale `cursor_active = true`.
484 if let Ok(mut inner) = tx.inner.lock() {
485 inner.cursor_active = false;
486 }
487 let _ = tx.rollback();
488 }
489 }
490}