Skip to main content

lora_database/
stream.rs

1use std::collections::BTreeMap;
2use std::mem::ManuallyDrop;
3use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
4
5use anyhow::{anyhow, Result};
6use lora_compiler::CompiledQuery;
7use lora_executor::{ExecResult, LoraValue, PullExecutor, Row, RowSource};
8use lora_store::InMemoryGraph;
9
10use crate::transaction::{finalize_tx_stream, Transaction, TxInner};
11
12/// Owning row stream returned by [`crate::Database::stream`] and transaction
13/// streaming methods.
14///
15/// The cursor is fallible (`next_row()` surfaces execution errors)
16/// and exposes plan-derived column names populated even for empty
17/// results. The lifetime parameter `'a` is bound to the source the
18/// cursor borrows from — typically the database for auto-commit
19/// write streams that hold the live write guard until exhaustion or
20/// drop. Read-only and transaction-bound streams need no live
21/// borrow and use `'static` (the buffered variant).
22pub struct QueryStream<'a> {
23    columns: Vec<String>,
24    inner: StreamInner<'a>,
25}
26
27enum StreamInner<'a> {
28    /// Transaction-bound streaming cursor. The cursor borrows from
29    /// the transaction's staged graph, which is kept alive by
30    /// `tx_handle`; finalization releases the cursor token and
31    /// either clears or restores the pending statement savepoint.
32    Tx {
33        cursor: Option<Box<dyn RowSource + 'static>>,
34        state: StreamState,
35        tx_handle: Arc<Mutex<TxInner>>,
36        rollback_on_drop: bool,
37    },
38    /// True pull-based read-only stream. Holds a live store read
39    /// lock through the cursor's lifetime and emits rows as the
40    /// caller pulls them, without any intermediate
41    /// materialization. Backed by a [`LiveCursor`] which uses
42    /// `self_cell` to safely co-own the lock guard and the
43    /// borrowing cursor.
44    Live {
45        cursor: LiveCursor,
46        state: StreamState,
47        // The 'a parameter is unused for this variant — the
48        // self-cell hides the borrow. We carry a phantom to keep
49        // the enum's lifetime parameter consistent with the
50        // other variants.
51        _phantom: std::marker::PhantomData<&'a ()>,
52    },
53    /// Auto-commit write stream backed by a hidden staged
54    /// transaction. The graph is mutated on a clone held in
55    /// `guard.tx.inner.staged`; the live store write lock stays locked
56    /// through the tx's `live` guard so no other writer races. On
57    /// full exhaustion the staged graph is published and the WAL
58    /// replays the buffered events; on premature drop or error the
59    /// staged graph and buffer are discarded and the live store is
60    /// untouched.
61    ///
62    /// `cursor` is a streaming `RowSource` that may apply mutations
63    /// row-by-row (via `StreamingWriteCursor`) or yield from a
64    /// pre-materialized buffer (via `BufferedRowSource`); see
65    /// `Transaction::open_streaming_compiled_autocommit`. It is
66    /// taken and dropped before the guard's commit/rollback so any
67    /// borrows back into the staged graph are released first.
68    AutoCommit {
69        cursor: Option<Box<dyn lora_executor::RowSource + 'static>>,
70        state: StreamState,
71        guard: AutoCommitGuard<'a>,
72    },
73}
74
75/// Self-referential cursor that pulls rows directly from the live
76/// store. The `Arc<RwLock<...>>` keeps the storage alive, the
77/// `RwLockReadGuard` keeps it locked, and the boxed `RowSource`
78/// borrows from the locked storage. Drop releases them in
79/// declaration order — `cursor` first, then `guard`, finally the
80/// `Arc` — so the cursor never sees a dropped guard and the
81/// guard never sees a dropped lock.
82///
83/// `self_cell` can't model this because the cursor borrows from
84/// the guard's deref while the guard itself borrows from the
85/// owner — two levels of nested borrow inside the dependent. The
86/// unsafe scope is small (one constructor, one drop) and
87/// fully encapsulated; nothing outside this module can observe
88/// the lifetime extension.
89pub(crate) struct LiveCursor {
90    /// SAFETY invariant: borrows from `*guard`. Must drop before
91    /// `guard`.
92    cursor: ManuallyDrop<Box<dyn RowSource + 'static>>,
93    /// SAFETY invariant: borrows from the `RwLock` inside `_store`.
94    /// Must drop before `_store`.
95    guard: ManuallyDrop<RwLockReadGuard<'static, InMemoryGraph>>,
96    /// Keeps the underlying `RwLock` alive. Dropped after `guard`.
97    _store: Arc<RwLock<InMemoryGraph>>,
98    /// Keeps the compiled plan alive — operator sources hold
99    /// references into it (e.g. predicate `ResolvedExpr`s). Boxed
100    /// so the plan address is stable across the move into the
101    /// struct.
102    _compiled: Box<CompiledQuery>,
103}
104
105impl LiveCursor {
106    /// Lock the live store and open a streaming cursor against
107    /// the given compiled query. Internal helper for
108    /// `Database::stream_with_params` — never expose the
109    /// constructed `LiveCursor` to callers without the
110    /// surrounding `QueryStream`, which makes the `'static`
111    /// transmutes invisible.
112    pub(crate) fn open(
113        store: Arc<RwLock<InMemoryGraph>>,
114        compiled: CompiledQuery,
115        params: BTreeMap<String, LoraValue>,
116    ) -> Result<Self> {
117        let compiled = Box::new(compiled);
118
119        let guard = store
120            .read()
121            .unwrap_or_else(|poisoned| poisoned.into_inner());
122
123        // SAFETY: We extend the lifetime of the guard and the
124        // borrows into `*guard` / `*compiled` to `'static`. This
125        // is sound because the surrounding `LiveCursor` keeps
126        // (a) the `Arc<RwLock<...>>` alive while the guard is
127        // alive — the RwLock behind the guard never gets freed —
128        // and (b) the `Box<CompiledQuery>` alive while the
129        // cursor is alive. The `Drop` impl below releases
130        // `cursor` before `guard` before `_store`, so neither
131        // borrow can outlive its backing storage.
132        let guard: RwLockReadGuard<'static, InMemoryGraph> = unsafe { std::mem::transmute(guard) };
133        let storage_ref: &'static InMemoryGraph =
134            unsafe { std::mem::transmute::<&InMemoryGraph, _>(&*guard) };
135        let compiled_ref: &'static CompiledQuery =
136            unsafe { std::mem::transmute::<&CompiledQuery, _>(&*compiled) };
137
138        let cursor = PullExecutor::new(storage_ref, params)
139            .open_compiled(compiled_ref)
140            .map_err(|e| anyhow!(e))?;
141
142        Ok(Self {
143            cursor: ManuallyDrop::new(cursor),
144            guard: ManuallyDrop::new(guard),
145            _store: store,
146            _compiled: compiled,
147        })
148    }
149
150    fn next_row(&mut self) -> ExecResult<Option<Row>> {
151        self.cursor.next_row()
152    }
153}
154
155impl Drop for LiveCursor {
156    fn drop(&mut self) {
157        // SAFETY: drop in the documented order — cursor first
158        // (releases its borrow into `*guard`), then guard
159        // (releases the read lock). After these calls we never touch
160        // `cursor` or `guard` again. `_store` and `_compiled`
161        // drop naturally afterwards via the normal field-drop
162        // sequence.
163        unsafe {
164            ManuallyDrop::drop(&mut self.cursor);
165            ManuallyDrop::drop(&mut self.guard);
166        }
167    }
168}
169
170/// Per-stream state held only by auto-commit write streams.
171///
172/// The auto-commit guard is a thin wrapper around an explicit
173/// [`Transaction`]: full cursor exhaustion calls `commit`,
174/// premature drop or error calls `rollback`. All staged-graph,
175/// savepoint, and WAL replay logic lives on `Transaction` itself,
176/// so the guard contributes no behavior of its own beyond the
177/// commit-vs-rollback decision.
178pub(crate) struct AutoCommitGuard<'a> {
179    /// The hidden transaction. `None` once the guard has finalized
180    /// (commit consumes the tx; rollback consumes it; both leave
181    /// `None` behind).
182    pub(crate) tx: Option<Transaction<'a>>,
183    /// Set once a finalization (commit or rollback) has run so
184    /// duplicate calls — including the `Drop` path after a
185    /// successful `next_row`-driven commit — are no-ops.
186    pub(crate) finalized: bool,
187}
188
/// Lifecycle of a stream cursor: starts `Active` and ends in exactly one
/// of the two terminal states.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StreamState {
    /// Rows may still be pulled from the underlying cursor.
    Active,
    /// The cursor reported its end; further pulls return `Ok(None)`.
    Exhausted,
    /// A pull failed; further pulls return an error without re-executing.
    Errored,
}
195
196impl<'a> std::fmt::Debug for QueryStream<'a> {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        let state = match &self.inner {
199            StreamInner::Tx { state, .. }
200            | StreamInner::AutoCommit { state, .. }
201            | StreamInner::Live { state, .. } => *state,
202        };
203        f.debug_struct("QueryStream")
204            .field("columns", &self.columns)
205            .field("state", &state)
206            .finish()
207    }
208}
209
210impl<'a> QueryStream<'a> {
211    pub(crate) fn for_tx_cursor(
212        cursor: Box<dyn RowSource + 'static>,
213        columns: Vec<String>,
214        tx_handle: Arc<Mutex<TxInner>>,
215        rollback_on_drop: bool,
216    ) -> Self {
217        Self {
218            columns,
219            inner: StreamInner::Tx {
220                cursor: Some(cursor),
221                state: StreamState::Active,
222                tx_handle,
223                rollback_on_drop,
224            },
225        }
226    }
227
228    pub(crate) fn auto_commit(
229        cursor: Box<dyn lora_executor::RowSource + 'static>,
230        columns: Vec<String>,
231        guard: AutoCommitGuard<'a>,
232    ) -> Self {
233        Self {
234            columns,
235            inner: StreamInner::AutoCommit {
236                cursor: Some(cursor),
237                state: StreamState::Active,
238                guard,
239            },
240        }
241    }
242
243    pub(crate) fn live(cursor: LiveCursor, columns: Vec<String>) -> Self {
244        Self {
245            columns,
246            inner: StreamInner::Live {
247                cursor,
248                state: StreamState::Active,
249                _phantom: std::marker::PhantomData,
250            },
251        }
252    }
253
254    /// Plan-derived column names. Populated even when the result is
255    /// empty so callers can drive a row-arrays format off this list
256    /// without first peeking at a materialized row.
257    pub fn columns(&self) -> &[String] {
258        &self.columns
259    }
260
261    /// Pull the next row. Returns `Ok(None)` once the cursor is
262    /// exhausted, `Ok(Some(row))` for the next hydrated row, or an
263    /// error if the underlying execution failed. Once an error has
264    /// been observed, subsequent calls keep returning that terminal
265    /// state — the cursor never tries to recover or re-execute.
266    pub fn next_row(&mut self) -> Result<Option<Row>> {
267        match &mut self.inner {
268            StreamInner::Live { state, cursor, .. } => match *state {
269                StreamState::Errored => Err(anyhow!("query stream errored")),
270                StreamState::Exhausted => Ok(None),
271                StreamState::Active => match cursor.next_row() {
272                    Ok(Some(row)) => Ok(Some(row)),
273                    Ok(None) => {
274                        *state = StreamState::Exhausted;
275                        Ok(None)
276                    }
277                    Err(e) => {
278                        *state = StreamState::Errored;
279                        Err(anyhow!(e))
280                    }
281                },
282            },
283            StreamInner::Tx {
284                state,
285                cursor,
286                tx_handle,
287                rollback_on_drop,
288            } => match *state {
289                StreamState::Errored => Err(anyhow!("query stream errored")),
290                StreamState::Exhausted => Ok(None),
291                StreamState::Active => {
292                    let pull = match cursor.as_mut() {
293                        Some(c) => c.next_row(),
294                        None => {
295                            *state = StreamState::Errored;
296                            return Err(anyhow!("transaction cursor missing"));
297                        }
298                    };
299                    match pull {
300                        Ok(Some(row)) => Ok(Some(row)),
301                        Ok(None) => {
302                            cursor.take();
303                            finalize_tx_stream(tx_handle, true, *rollback_on_drop);
304                            *state = StreamState::Exhausted;
305                            Ok(None)
306                        }
307                        Err(e) => {
308                            cursor.take();
309                            finalize_tx_stream(tx_handle, false, *rollback_on_drop);
310                            *state = StreamState::Errored;
311                            Err(anyhow!(e))
312                        }
313                    }
314                }
315            },
316            StreamInner::AutoCommit {
317                state,
318                cursor,
319                guard,
320            } => match *state {
321                StreamState::Errored => Err(anyhow!("query stream errored")),
322                StreamState::Exhausted => Ok(None),
323                StreamState::Active => {
324                    let pull = match cursor.as_mut() {
325                        Some(c) => c.next_row(),
326                        None => {
327                            *state = StreamState::Errored;
328                            return Err(anyhow!("auto-commit cursor missing"));
329                        }
330                    };
331                    match pull {
332                        Ok(Some(row)) => Ok(Some(row)),
333                        Ok(None) => {
334                            // Drop the cursor first so its borrows
335                            // into the staged graph release before
336                            // commit moves staged out of inner.
337                            cursor.take();
338                            match guard.commit() {
339                                Ok(()) => {
340                                    *state = StreamState::Exhausted;
341                                    Ok(None)
342                                }
343                                Err(e) => {
344                                    *state = StreamState::Errored;
345                                    Err(e)
346                                }
347                            }
348                        }
349                        Err(e) => {
350                            cursor.take();
351                            guard.rollback();
352                            *state = StreamState::Errored;
353                            Err(anyhow!(e))
354                        }
355                    }
356                }
357            },
358        }
359    }
360
361    /// True once the stream has produced its last row.
362    fn is_exhausted(&self) -> bool {
363        match &self.inner {
364            StreamInner::Tx { state, .. }
365            | StreamInner::AutoCommit { state, .. }
366            | StreamInner::Live { state, .. } => matches!(state, StreamState::Exhausted),
367        }
368    }
369}
370
371impl<'a> Iterator for QueryStream<'a> {
372    type Item = Row;
373
374    fn next(&mut self) -> Option<Self::Item> {
375        match self.next_row() {
376            Ok(Some(row)) => Some(row),
377            Ok(None) => None,
378            Err(_) => None,
379        }
380    }
381
382    fn size_hint(&self) -> (usize, Option<usize>) {
383        match &self.inner {
384            // Live and AutoCommit (now backed by a streaming cursor)
385            // don't know their length until drained.
386            StreamInner::Live { .. } | StreamInner::Tx { .. } | StreamInner::AutoCommit { .. } => {
387                (0, None)
388            }
389        }
390    }
391}
392
// Note: `ExactSizeIterator` is intentionally not implemented. Every
// variant pulls rows through a `RowSource` cursor that does not expose
// a remaining-row count, so no exact length can be reported.
396
397impl<'a> Drop for QueryStream<'a> {
398    fn drop(&mut self) {
399        let exhausted = self.is_exhausted();
400        match &mut self.inner {
401            StreamInner::Tx {
402                cursor,
403                tx_handle,
404                rollback_on_drop,
405                ..
406            } => {
407                cursor.take();
408                finalize_tx_stream(tx_handle, exhausted, *rollback_on_drop);
409            }
410            StreamInner::Live { .. } => {
411                // Drop releases the cursor, then the read guard,
412                // which releases the live store read lock. No
413                // additional cleanup needed — live streams never
414                // mutate, so there is nothing to commit or roll back.
415            }
416            StreamInner::AutoCommit {
417                state,
418                cursor,
419                guard,
420            } => {
421                // Drop the cursor first so its borrows into the
422                // staged graph release before the guard rolls back
423                // (which moves staged to None).
424                cursor.take();
425                // Premature drop = rollback. Successful exhaustion
426                // already finalized the guard via `commit()` in
427                // `next_row`, so this path is a no-op for the
428                // exhausted case.
429                if !guard.finalized && !matches!(state, StreamState::Exhausted) {
430                    guard.rollback();
431                }
432            }
433        }
434    }
435}
436
437impl<'a> AutoCommitGuard<'a> {
438    /// Publish the staged graph as the live store. Delegates to
439    /// [`Transaction::commit`] which owns the WAL replay + swap
440    /// logic. Idempotent — subsequent calls are no-ops once
441    /// finalized, regardless of whether the previous attempt
442    /// succeeded or failed.
443    fn commit(&mut self) -> Result<()> {
444        if self.finalized {
445            return Ok(());
446        }
447        // Mark finalized before consuming the tx so a commit
448        // failure still prevents Drop from later trying to roll
449        // back a tx that no longer exists.
450        self.finalized = true;
451        match self.tx.take() {
452            Some(tx) => {
453                // The streaming auto-commit cursor sets
454                // `cursor_active = true` at construction; it must
455                // be cleared before `tx.commit` (which rejects on
456                // an active cursor). The cursor itself was already
457                // dropped by the caller in `next_row` — its
458                // borrows back into staged are gone, so we can
459                // safely flip the flag here. For the buffered
460                // fallback path the flag was never set, so this
461                // assignment is a no-op.
462                if let Ok(mut inner) = tx.inner.lock() {
463                    inner.cursor_active = false;
464                }
465                tx.commit()
466            }
467            None => Ok(()),
468        }
469    }
470
471    /// Discard the staged graph. Delegates to
472    /// [`Transaction::rollback`]; failures are swallowed because
473    /// the rollback path runs from `Drop` and has nowhere to
474    /// surface an error.
475    fn rollback(&mut self) {
476        if self.finalized {
477            return;
478        }
479        self.finalized = true;
480        if let Some(tx) = self.tx.take() {
481            // Clear the streaming-cursor flag before delegating to
482            // tx.rollback so the rollback can finalize without
483            // stumbling over a stale `cursor_active = true`.
484            if let Ok(mut inner) = tx.inner.lock() {
485                inner.cursor_active = false;
486            }
487            let _ = tx.rollback();
488        }
489    }
490}