Skip to main content

lora_database/
transaction.rs

1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex, MutexGuard, RwLockReadGuard, RwLockWriteGuard};
3use std::time::{Duration, Instant};
4
5use anyhow::{anyhow, Result};
6use lora_analyzer::Analyzer;
7use lora_compiler::{CompiledQuery, Compiler};
8use lora_executor::{
9    classify_stream, compiled_result_columns, project_rows, ExecuteOptions, ExecutionContext,
10    Executor, LoraValue, MutableExecutionContext, MutableExecutor, MutablePullExecutor,
11    PullExecutor, QueryResult, Row, RowSource,
12};
13use lora_parser::parse_query;
14use lora_store::{InMemoryGraph, MutationEvent, MutationRecorder};
15use lora_wal::{WalRecorder, WroteCommit};
16
17use crate::stream::QueryStream;
18
/// Execution mode selected when a transaction is begun.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionMode {
    /// Statements run through the read-only executor; any write operator
    /// fails with a read-only error.
    ReadOnly,
    /// Statements run against a staged copy of the graph, which is
    /// published to the live store on commit.
    ReadWrite,
}
27
28pub(crate) enum LiveStoreGuard<'db> {
29    Read(RwLockReadGuard<'db, InMemoryGraph>),
30    Write(RwLockWriteGuard<'db, InMemoryGraph>),
31}
32
33impl LiveStoreGuard<'_> {
34    fn as_graph(&self) -> &InMemoryGraph {
35        match self {
36            Self::Read(guard) => guard,
37            Self::Write(guard) => guard,
38        }
39    }
40
41    fn as_graph_mut(&mut self) -> Option<&mut InMemoryGraph> {
42        match self {
43            Self::Read(_) => None,
44            Self::Write(guard) => Some(guard),
45        }
46    }
47}
48
49/// Captures the staged graph and tx-local mutation buffer at the point
50/// a statement is opened, so a failed/dropped statement can be rolled
51/// back to that point without affecting earlier work in the same
52/// transaction.
53pub(crate) struct Savepoint {
54    staged: Option<InMemoryGraph>,
55    buffer_len: usize,
56}
57
58/// Buffers `MutationEvent`s emitted by the staged graph while a
59/// transaction is in progress. The buffer replaces direct WAL writes
60/// during the transaction body; on commit the host replays the
61/// buffer into the real `WalRecorder` as a single durable
62/// transaction. Statement rollback truncates the buffer back to its
63/// pre-statement length; transaction rollback drops it entirely.
64struct BufferingRecorder {
65    buffer: Arc<Mutex<Vec<MutationEvent>>>,
66}
67
68impl BufferingRecorder {
69    fn new(buffer: Arc<Mutex<Vec<MutationEvent>>>) -> Self {
70        Self { buffer }
71    }
72}
73
74impl MutationRecorder for BufferingRecorder {
75    fn record(&self, event: &MutationEvent) {
76        if let Ok(mut buf) = self.buffer.lock() {
77            buf.push(event.clone());
78        }
79    }
80}
81
82/// Shared transaction state. Wrapped in `Arc<Mutex<>>` so a
83/// `QueryStream` opened against the transaction can release its
84/// cursor token and signal savepoint-rollback intent on drop without
85/// borrowing the [`Transaction`] handle.
86pub(crate) struct TxInner {
87    /// The cloned staging graph. Mutated by write statements through
88    /// the [`MutableExecutor`]; read by read-only statements through
89    /// [`PullExecutor`]. `None` once the transaction has been closed.
90    pub(crate) staged: Option<InMemoryGraph>,
91    /// Tx-local mutation log, populated by the [`BufferingRecorder`]
92    /// installed on `staged`. Replayed into the real WAL exactly once
93    /// at commit time.
94    pub(crate) buffer: Arc<Mutex<Vec<MutationEvent>>>,
95    /// Per-statement savepoint snapshot. Set when a statement opens,
96    /// cleared on successful completion, restored on
97    /// failure/premature drop.
98    pub(crate) pending_savepoint: Option<Savepoint>,
99    /// True while a `QueryStream` opened against this transaction is
100    /// alive. Blocks new statements and prevents commit until the
101    /// cursor is released.
102    pub(crate) cursor_active: bool,
103    /// Set by the cursor's `Drop` impl when the cursor was released
104    /// without exhausting all rows. The next transaction operation
105    /// applies the pending savepoint before doing anything else.
106    pub(crate) cursor_dropped_dirty: bool,
107    /// True after `commit` or `rollback` has run, regardless of
108    /// outcome. Subsequent operations fail loudly instead of silently
109    /// running on stale state.
110    pub(crate) closed: bool,
111    /// Transaction execution mode chosen at `begin_transaction` time.
112    pub(crate) mode: TransactionMode,
113    /// Whether this transaction needs a mutation buffer for durable WAL
114    /// replay. Databases without a WAL can skip recorder installation and
115    /// avoid cloning mutation payloads into an unused buffer.
116    pub(crate) buffer_mutations: bool,
117}
118
119/// Explicit transaction over the in-memory graph.
120///
121/// The implementation is conservative: read-only transactions hold a
122/// database read lock, and read-write transactions hold the database
123/// write lock. Read-write transactions lazily create a cloned staging
124/// graph on the first mutating statement, then either swap that graph
125/// into place on commit or drop it on rollback. Explicit mutating
126/// statements capture a graph +
127/// WAL-buffer savepoint so a failed or dropped streaming statement
128/// only rolls back its own effects, not the transaction as a whole.
129///
130/// When a WAL is attached, mutation events fire into a tx-local
131/// buffer rather than the durable log. The buffer is replayed into
132/// the WAL exactly once at commit, so recovery never observes
133/// partial / aborted / dropped statements.
134pub struct Transaction<'db> {
135    pub(crate) live: Option<LiveStoreGuard<'db>>,
136    pub(crate) inner: Arc<Mutex<TxInner>>,
137    pub(crate) wal: Option<Arc<WalRecorder>>,
138    mode: TransactionMode,
139}
140
141impl<'db> Transaction<'db> {
142    /// Build a fresh transaction. Used by `Database::begin_transaction`.
143    pub(crate) fn new(
144        live: LiveStoreGuard<'db>,
145        wal: Option<Arc<WalRecorder>>,
146        mode: TransactionMode,
147    ) -> Self {
148        let buffer_mutations = wal.is_some();
149        let inner = TxInner {
150            staged: None,
151            buffer: Arc::new(Mutex::new(Vec::new())),
152            pending_savepoint: None,
153            cursor_active: false,
154            cursor_dropped_dirty: false,
155            closed: false,
156            mode,
157            buffer_mutations,
158        };
159        Self {
160            live: Some(live),
161            inner: Arc::new(Mutex::new(inner)),
162            wal,
163            mode,
164        }
165    }
166
167    /// Transaction mode chosen at begin time.
168    pub fn mode(&self) -> TransactionMode {
169        self.mode
170    }
171
172    /// Execute a query inside the transaction and return a materialized
173    /// `QueryResult`.
174    pub fn execute(&mut self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
175        self.execute_with_params(query, options, BTreeMap::new())
176    }
177
178    /// Execute a query inside the transaction with a cooperative deadline.
179    pub fn execute_with_timeout(
180        &mut self,
181        query: &str,
182        options: Option<ExecuteOptions>,
183        timeout: Duration,
184    ) -> Result<QueryResult> {
185        let deadline = Instant::now()
186            .checked_add(timeout)
187            .unwrap_or_else(Instant::now);
188        let rows =
189            self.execute_rows_with_params_deadline(query, BTreeMap::new(), Some(deadline))?;
190        Ok(project_rows(rows, options.unwrap_or_default()))
191    }
192
193    /// Execute a parameterised query inside the transaction.
194    pub fn execute_with_params(
195        &mut self,
196        query: &str,
197        options: Option<ExecuteOptions>,
198        params: BTreeMap<String, LoraValue>,
199    ) -> Result<QueryResult> {
200        let rows = self.execute_rows_with_params_deadline(query, params, None)?;
201        Ok(project_rows(rows, options.unwrap_or_default()))
202    }
203
204    /// Execute a parameterised query inside the transaction with a cooperative
205    /// deadline.
206    pub fn execute_with_params_timeout(
207        &mut self,
208        query: &str,
209        options: Option<ExecuteOptions>,
210        params: BTreeMap<String, LoraValue>,
211        timeout: Duration,
212    ) -> Result<QueryResult> {
213        let deadline = Instant::now()
214            .checked_add(timeout)
215            .unwrap_or_else(Instant::now);
216        let rows = self.execute_rows_with_params_deadline(query, params, Some(deadline))?;
217        Ok(project_rows(rows, options.unwrap_or_default()))
218    }
219
220    /// Execute a query inside the transaction and return hydrated rows before
221    /// final result-format projection.
222    pub fn execute_rows(&mut self, query: &str) -> Result<Vec<Row>> {
223        self.execute_rows_with_params(query, BTreeMap::new())
224    }
225
226    /// Execute a parameterised query inside the transaction and return hydrated
227    /// rows before final result-format projection.
228    pub fn execute_rows_with_params(
229        &mut self,
230        query: &str,
231        params: BTreeMap<String, LoraValue>,
232    ) -> Result<Vec<Row>> {
233        self.execute_rows_with_params_deadline(query, params, None)
234    }
235
236    fn execute_rows_with_params_deadline(
237        &mut self,
238        query: &str,
239        params: BTreeMap<String, LoraValue>,
240        deadline: Option<Instant>,
241    ) -> Result<Vec<Row>> {
242        let compiled = self.compile_in_tx(query)?;
243        self.execute_rows_compiled_deadline(&compiled, params, deadline)
244    }
245
246    fn execute_rows_compiled_deadline(
247        &mut self,
248        compiled: &CompiledQuery,
249        params: BTreeMap<String, LoraValue>,
250        deadline: Option<Instant>,
251    ) -> Result<Vec<Row>> {
252        // ReadOnly tx: never clones, runs straight against live.
253        if self.is_read_only_unchecked() {
254            self.precheck_open_no_savepoint()?;
255            let live = self
256                .live
257                .as_ref()
258                .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
259            let storage = live.as_graph();
260            let executor = Executor::with_deadline(ExecutionContext { storage, params }, deadline);
261            return executor
262                .execute_compiled_rows(compiled)
263                .map_err(|e| anyhow!(e));
264        }
265
266        // ReadWrite tx, lazy-clone aware.
267        let mut inner = self.begin_statement()?;
268        let is_mutating = classify_stream(compiled).is_mutating();
269
270        if !is_mutating {
271            // Read-only statement in a ReadWrite tx. Run against
272            // staged if it has been materialized (so the read
273            // sees prior in-tx writes), otherwise straight off
274            // the live graph — which equals staged-as-it-would-be
275            // because no writes have happened yet.
276            return match inner.staged.as_ref() {
277                Some(staged) => {
278                    let executor = Executor::with_deadline(
279                        ExecutionContext {
280                            storage: staged,
281                            params,
282                        },
283                        deadline,
284                    );
285                    executor
286                        .execute_compiled_rows(compiled)
287                        .map_err(|e| anyhow!(e))
288                }
289                None => {
290                    drop(inner);
291                    let live = self
292                        .live
293                        .as_ref()
294                        .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
295                    let storage = live.as_graph();
296                    let executor =
297                        Executor::with_deadline(ExecutionContext { storage, params }, deadline);
298                    executor
299                        .execute_compiled_rows(compiled)
300                        .map_err(|e| anyhow!(e))
301                }
302            };
303        }
304
305        // Mutating statement: lazy-clone the live graph if this
306        // is the first write in the tx, then capture a savepoint
307        // and run the mutable executor.
308        let clone_savepoint_graph = inner.staged.is_some();
309        self.ensure_staged_locked(&mut inner)?;
310        let savepoint = Some(take_savepoint(&inner, clone_savepoint_graph));
311
312        let exec_result: ExecResultRows = {
313            let staged = inner.staged_mut()?;
314            let mut executor = MutableExecutor::with_deadline(
315                MutableExecutionContext {
316                    storage: staged,
317                    params,
318                },
319                deadline,
320            );
321            executor
322                .execute_compiled_rows(compiled)
323                .map_err(|e| anyhow!(e))
324        };
325
326        match exec_result {
327            Ok(rows) => Ok(rows),
328            Err(err) => {
329                restore_savepoint(&mut inner, savepoint);
330                Err(err)
331            }
332        }
333    }
334
335    /// Open a streaming write cursor over the staged graph for a
336    /// pre-compiled mutating plan, used by the hidden auto-commit
337    /// stream path in `Database::stream_with_params`.
338    ///
339    /// The returned `Box<dyn RowSource + 'static>` may be either a
340    /// real per-row [`StreamingWriteCursor`][lora_executor::StreamingWriteCursor],
341    /// a mutable UNION cursor, or a [`BufferedRowSource`][lora_executor::BufferedRowSource]
342    /// for the remaining materialized leaves. Either way:
343    ///
344    /// * The cursor mutates the *staged* graph, never the live store.
345    /// * Mutations fire the [`BufferingRecorder`] installed on staged
346    ///   by [`Self::ensure_staged_locked`], which accumulates into
347    ///   `inner.buffer` and is replayed into the WAL on commit.
348    /// * `cursor_active` is set to `true` here. The caller MUST clear
349    ///   it before invoking [`Self::commit`] or [`Self::rollback`] —
350    ///   the cursor itself does not.
351    ///
352    /// # Safety
353    ///
354    /// The cursor is `'static` because it owns its compiled query (via
355    /// the supplied `Arc`) and aliases the staged graph through a raw
356    /// pointer. Soundness depends on the invariant that
357    /// `inner.staged` remains `Some(_)` at a stable address for the
358    /// cursor's lifetime. That invariant holds while
359    /// `cursor_active = true` blocks every other path that could
360    /// move or drop staged: explicit statements (`begin_statement`
361    /// rejects), `commit` and `rollback` (rejected until the caller
362    /// clears `cursor_active`).
363    pub(crate) fn open_streaming_compiled_autocommit(
364        &mut self,
365        compiled: Arc<CompiledQuery>,
366        params: BTreeMap<String, LoraValue>,
367    ) -> Result<Box<dyn RowSource + 'static>> {
368        if self.is_read_only_unchecked() {
369            return Err(anyhow!(
370                "streaming write cursor requires a ReadWrite transaction"
371            ));
372        }
373
374        let mut inner = self.begin_statement()?;
375        self.ensure_staged_locked(&mut inner)?;
376        inner.cursor_active = true;
377
378        // SAFETY: `inner.staged` is `Some` after `ensure_staged_locked`,
379        // and stays at the same address while `cursor_active = true`
380        // (see method-level safety note).
381        let staged_ptr: *mut InMemoryGraph = inner
382            .staged
383            .as_mut()
384            .expect("ensure_staged_locked guarantees Some")
385            as *mut _;
386        drop(inner);
387
388        // SAFETY: `compiled` (Arc held by the caller / AutoCommit guard)
389        // keeps the plan alive; `staged_ptr` is valid for the cursor's
390        // lifetime per the invariant above. We extend both lifetimes
391        // to `'static` so the resulting cursor can sit inside the
392        // `'static`-shaped AutoCommit variant of `QueryStream`.
393        let storage_static: &'static mut InMemoryGraph = unsafe { &mut *staged_ptr };
394        let compiled_static: &'static CompiledQuery =
395            unsafe { std::mem::transmute::<&CompiledQuery, _>(compiled.as_ref()) };
396
397        // `MutablePullExecutor::open_compiled` picks the narrowest
398        // cursor shape it can: per-row write cursor, branch-wise
399        // mutable UNION cursor, or a buffered materialized leaf.
400        let cursor = MutablePullExecutor::new(storage_static, params)
401            .open_compiled(compiled_static)
402            .map_err(|e| {
403                // Roll back: the cursor build never happened, so the
404                // tx is in a clean-but-poisoned state. Discard
405                // everything and let the caller bubble the error.
406                if let Ok(mut inner) = self.inner.lock() {
407                    discard_transaction_state(&mut inner);
408                }
409                self.live.take();
410                anyhow!(e)
411            })?;
412
413        // The Arc<CompiledQuery> is the safety anchor for the
414        // `'static` plan reference. Keep it alive for the cursor's
415        // lifetime by leaking a clone into the cursor's owned data.
416        // We can't store it on the cursor itself (it's a Box<dyn>),
417        // so we wrap the cursor in a guard that owns the Arc.
418        Ok(Box::new(StreamingCursorWithArc {
419            cursor,
420            _compiled: compiled,
421        }))
422    }
423
424    /// Compile a query in this transaction's view of the world:
425    /// against `staged` if it has been materialized, otherwise
426    /// straight against `live`. The two are equivalent before the
427    /// first mutating statement, so the resulting plan is valid
428    /// either way.
429    fn compile_in_tx(&self, query: &str) -> Result<CompiledQuery> {
430        let document = parse_query(query)?;
431        let resolved = {
432            let inner = self.lock_inner_unchecked();
433            if let Some(staged) = &inner.staged {
434                let mut analyzer = Analyzer::new(staged);
435                analyzer.analyze(&document)?
436            } else {
437                drop(inner);
438                let live = self
439                    .live
440                    .as_ref()
441                    .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
442                let mut analyzer = Analyzer::new(live.as_graph());
443                analyzer.analyze(&document)?
444            }
445        };
446        Ok(Compiler::compile(&resolved))
447    }
448
449    /// Materialize `inner.staged` if it doesn't exist yet —
450    /// ReadWrite transactions defer this clone until the first
451    /// mutating statement.
452    fn ensure_staged_locked(&self, inner: &mut MutexGuard<'_, TxInner>) -> Result<()> {
453        if inner.staged.is_some() {
454            return Ok(());
455        }
456        let live = self
457            .live
458            .as_ref()
459            .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
460        let mut staged: InMemoryGraph = live.as_graph().clone();
461        if matches!(inner.mode, TransactionMode::ReadWrite) && inner.buffer_mutations {
462            staged.set_mutation_recorder(Some(
463                Arc::new(BufferingRecorder::new(inner.buffer.clone())) as Arc<dyn MutationRecorder>,
464            ));
465        }
466        inner.staged = Some(staged);
467        Ok(())
468    }
469
470    /// Execute a query inside the transaction and return an owning row stream.
471    pub fn stream(&mut self, query: &str) -> Result<QueryStream<'static>> {
472        self.stream_with_params(query, BTreeMap::new())
473    }
474
475    /// Execute a parameterised query inside the transaction and return an
476    /// owning row stream.
477    pub fn stream_with_params(
478        &mut self,
479        query: &str,
480        params: BTreeMap<String, LoraValue>,
481    ) -> Result<QueryStream<'static>> {
482        let compiled = Arc::new(self.compile_in_tx(query)?);
483        let columns = compiled_result_columns(&compiled);
484        self.stream_compiled(compiled, columns, params)
485    }
486
487    /// Open a tx-bound stream for an already-compiled plan. Lets
488    /// `Database::stream_with_params` reuse the plan it built for
489    /// classification.
490    pub(crate) fn stream_compiled(
491        &mut self,
492        compiled: Arc<CompiledQuery>,
493        columns: Vec<String>,
494        params: BTreeMap<String, LoraValue>,
495    ) -> Result<QueryStream<'static>> {
496        let mut inner = self.begin_statement()?;
497        let is_mutating = classify_stream(&compiled).is_mutating();
498        if matches!(inner.mode, TransactionMode::ReadOnly) && is_mutating {
499            return Err(anyhow!(
500                "cannot execute mutating query in read-only transaction"
501            ));
502        }
503
504        // Transaction streams borrow from the staged graph. Even
505        // read-only streams materialize staging when needed so the
506        // cursor can outlive the `&mut Transaction` borrow without
507        // borrowing from the transaction-owned live write guard.
508        let clone_savepoint_graph = inner.staged.is_some();
509        self.ensure_staged_locked(&mut inner)?;
510        inner.cursor_active = true;
511
512        let rollback_on_drop = is_mutating;
513        if rollback_on_drop {
514            inner.pending_savepoint = Some(take_savepoint(&inner, clone_savepoint_graph));
515        } else {
516            inner.pending_savepoint = None;
517        }
518
519        let staged_ptr: *mut InMemoryGraph = inner
520            .staged
521            .as_mut()
522            .expect("ensure_staged_locked guarantees Some")
523            as *mut _;
524        drop(inner);
525
526        let compiled_static: &'static CompiledQuery =
527            unsafe { std::mem::transmute::<&CompiledQuery, _>(compiled.as_ref()) };
528        let cursor: Result<Box<dyn RowSource + 'static>> = if is_mutating {
529            let storage_static: &'static mut InMemoryGraph = unsafe { &mut *staged_ptr };
530            MutablePullExecutor::new(storage_static, params)
531                .open_compiled(compiled_static)
532                .map(|cursor| {
533                    Box::new(StreamingCursorWithArc {
534                        cursor,
535                        _compiled: compiled.clone(),
536                    }) as Box<dyn RowSource + 'static>
537                })
538                .map_err(|e| anyhow!(e))
539        } else {
540            let storage_static: &'static InMemoryGraph = unsafe { &*staged_ptr };
541            PullExecutor::new(storage_static, params)
542                .open_compiled(compiled_static)
543                .map(|cursor| {
544                    Box::new(StreamingCursorWithArc {
545                        cursor,
546                        _compiled: compiled.clone(),
547                    }) as Box<dyn RowSource + 'static>
548                })
549                .map_err(|e| anyhow!(e))
550        };
551
552        match cursor {
553            Ok(cursor) => Ok(QueryStream::for_tx_cursor(
554                cursor,
555                columns,
556                self.inner.clone(),
557                rollback_on_drop,
558            )),
559            Err(err) => {
560                finalize_tx_stream(&self.inner, false, rollback_on_drop);
561                Err(err)
562            }
563        }
564    }
565
566    /// Commit the transaction and publish staged changes.
567    ///
568    /// When WAL is attached the buffered tx-local mutation log is
569    /// replayed into the durable WAL as a single committed
570    /// transaction; recovery therefore observes either every write
571    /// in this transaction or none.
572    pub fn commit(mut self) -> Result<()> {
573        // Apply any pending statement rollback first (cursor was
574        // dropped pre-exhaustion in a previous step). After that we
575        // hold the staged graph and buffer in their final shape.
576        let (staged, buffer_events, mode) = {
577            let mut inner = self.inner.lock().unwrap();
578            if inner.cursor_active {
579                return Err(anyhow!(
580                    "cannot commit transaction while a streaming cursor is still active"
581                ));
582            }
583            if inner.cursor_dropped_dirty {
584                if let Some(sp) = inner.pending_savepoint.take() {
585                    apply_savepoint(&mut inner, sp);
586                }
587                inner.cursor_dropped_dirty = false;
588            }
589            if inner.closed {
590                return Err(anyhow!("transaction is already closed"));
591            }
592            let mode = inner.mode;
593            // Both modes can have `staged = None`: ReadOnly never
594            // clones, and ReadWrite tx that performed no writes
595            // (or where every write was rolled back via a
596            // savepoint) leaves it unmaterialized too.
597            let staged = inner.staged.take();
598            let buffer_events = std::mem::take(&mut *inner.buffer.lock().unwrap());
599            inner.closed = true;
600            (staged, buffer_events, mode)
601        };
602
603        // Replay the tx-local mutation buffer into the real WAL as
604        // one committed transaction. Read-only transactions never
605        // touch the WAL — `arm` is only called when there is durable
606        // work to commit.
607        if let Some(rec) = &self.wal {
608            if matches!(mode, TransactionMode::ReadWrite) && !buffer_events.is_empty() {
609                rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
610                for event in &buffer_events {
611                    rec.record(event);
612                    if let Some(reason) = rec.poisoned() {
613                        return Err(anyhow!("WAL poisoned during commit replay: {reason}"));
614                    }
615                }
616                match rec.commit() {
617                    Ok(WroteCommit::Yes) => {
618                        rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
619                    }
620                    Ok(WroteCommit::No) => {}
621                    Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
622                }
623                if let Some(reason) = rec.poisoned() {
624                    return Err(anyhow!("WAL poisoned: {reason}"));
625                }
626            }
627        }
628
629        if matches!(mode, TransactionMode::ReadWrite) {
630            if let Some(mut staged) = staged {
631                // Strip the buffering recorder from the staged graph
632                // before publishing it as the live store; the live store
633                // either has the durable WAL recorder reinstalled below
634                // or no recorder at all (for non-WAL databases).
635                staged.set_mutation_recorder(None);
636                if let Some(rec) = &self.wal {
637                    staged.set_mutation_recorder(Some(rec.clone() as Arc<dyn MutationRecorder>));
638                }
639                let live = self
640                    .live
641                    .as_mut()
642                    .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
643                let live = live
644                    .as_graph_mut()
645                    .ok_or_else(|| anyhow!("read-only transaction cannot publish staged graph"))?;
646                *live = staged;
647            }
648        }
649
650        self.live.take();
651        Ok(())
652    }
653
654    /// Roll back the transaction. Staged graph changes and buffered
655    /// mutations are discarded; the WAL is never armed.
656    pub fn rollback(mut self) -> Result<()> {
657        let mut inner = self.inner.lock().unwrap();
658        if inner.closed {
659            return Err(anyhow!("transaction is already closed"));
660        }
661        discard_transaction_state(&mut inner);
662        drop(inner);
663        self.live.take();
664        Ok(())
665    }
666
667    /// Acquire the inner state for a new statement. Validates that
668    /// the transaction is still open and no cursor is active, and
669    /// applies any pending savepoint left behind by a dropped
670    /// cursor. The staged graph is *not* required: ReadWrite
671    /// transactions defer the staging clone until the first
672    /// mutating statement (see [`Transaction::ensure_staged_locked`]).
673    fn begin_statement(&self) -> Result<MutexGuard<'_, TxInner>> {
674        let mut inner = self.inner.lock().unwrap();
675        if inner.closed {
676            return Err(anyhow!("transaction is already closed"));
677        }
678        if inner.cursor_active {
679            return Err(anyhow!(
680                "cannot start a new statement while a streaming cursor is still active"
681            ));
682        }
683        if inner.cursor_dropped_dirty {
684            if let Some(sp) = inner.pending_savepoint.take() {
685                apply_savepoint(&mut inner, sp);
686            }
687            inner.cursor_dropped_dirty = false;
688        }
689        Ok(inner)
690    }
691
692    /// Cheap state check for the ReadOnly fast path: closed +
693    /// cursor_active. No staged-graph check — ReadOnly tx has no
694    /// staged graph by construction.
695    fn precheck_open_no_savepoint(&self) -> Result<()> {
696        let inner = self.inner.lock().unwrap();
697        if inner.closed {
698            return Err(anyhow!("transaction is already closed"));
699        }
700        if inner.cursor_active {
701            return Err(anyhow!(
702                "cannot start a new statement while a streaming cursor is still active"
703            ));
704        }
705        Ok(())
706    }
707
708    /// True if the transaction was begun in `ReadOnly` mode. Cheap
709    /// — `mode` doesn't change after `begin_transaction`, so we
710    /// pay one small state-lock acquisition.
711    fn is_read_only_unchecked(&self) -> bool {
712        matches!(self.mode, TransactionMode::ReadOnly)
713    }
714
715    fn lock_inner_unchecked(&self) -> MutexGuard<'_, TxInner> {
716        self.inner
717            .lock()
718            .unwrap_or_else(|poisoned| poisoned.into_inner())
719    }
720}
721
722type ExecResultRows = Result<Vec<Row>>;
723
724impl TxInner {
725    fn staged_mut(&mut self) -> Result<&mut InMemoryGraph> {
726        self.staged
727            .as_mut()
728            .ok_or_else(|| anyhow!("transaction has no staged graph"))
729    }
730}
731
732/// `RowSource` adapter that owns an `Arc<CompiledQuery>` so the
733/// inner cursor's `'static` borrows into the plan stay valid for the
734/// life of the wrapper. The inner cursor is stored first so it drops
735/// before the Arc, releasing any borrows back into the plan.
736struct StreamingCursorWithArc {
737    cursor: Box<dyn RowSource + 'static>,
738    _compiled: Arc<CompiledQuery>,
739}
740
741impl RowSource for StreamingCursorWithArc {
742    fn next_row(&mut self) -> lora_executor::ExecResult<Option<Row>> {
743        self.cursor.next_row()
744    }
745}
746
747pub(crate) fn finalize_tx_stream(
748    handle: &Arc<Mutex<TxInner>>,
749    exhausted: bool,
750    rollback_on_drop: bool,
751) {
752    if let Ok(mut inner) = handle.lock() {
753        inner.cursor_active = false;
754
755        if inner.closed {
756            discard_transaction_state(&mut inner);
757            return;
758        }
759
760        if exhausted || !rollback_on_drop {
761            inner.pending_savepoint = None;
762            inner.cursor_dropped_dirty = false;
763            return;
764        }
765
766        if let Some(sp) = inner.pending_savepoint.take() {
767            apply_savepoint(&mut inner, sp);
768        }
769        inner.cursor_dropped_dirty = false;
770    }
771}
772
773fn discard_transaction_state(inner: &mut TxInner) {
774    // A full transaction rollback supersedes any pending cursor savepoint.
775    inner.pending_savepoint = None;
776    inner.cursor_dropped_dirty = false;
777    inner.cursor_active = false;
778    inner.staged = None;
779    if let Ok(mut buf) = inner.buffer.lock() {
780        buf.clear();
781    }
782    inner.closed = true;
783}
784
785fn take_savepoint(inner: &TxInner, clone_staged: bool) -> Savepoint {
786    let buffer_len = inner.buffer.lock().ok().map(|b| b.len()).unwrap_or(0);
787    Savepoint {
788        staged: if clone_staged {
789            inner.staged.as_ref().cloned()
790        } else {
791            None
792        },
793        buffer_len,
794    }
795}
796
797fn restore_savepoint(inner: &mut TxInner, savepoint: Option<Savepoint>) {
798    if let Some(sp) = savepoint {
799        apply_savepoint(inner, sp);
800    }
801}
802
803fn apply_savepoint(inner: &mut TxInner, sp: Savepoint) {
804    if let Ok(mut buf) = inner.buffer.lock() {
805        buf.truncate(sp.buffer_len);
806    }
807
808    let Some(mut graph) = sp.staged else {
809        inner.staged = None;
810        return;
811    };
812
813    // Rebuild the staged graph from the snapshot and re-install the
814    // buffering recorder. `InMemoryGraph::clone` deliberately drops
815    // recorders, so the snapshot has none until we put it back.
816    if matches!(inner.mode, TransactionMode::ReadWrite) && inner.buffer_mutations {
817        graph.set_mutation_recorder(Some(
818            Arc::new(BufferingRecorder::new(inner.buffer.clone())) as Arc<dyn MutationRecorder>
819        ));
820    }
821    inner.staged = Some(graph);
822}
823
824impl Drop for Transaction<'_> {
825    fn drop(&mut self) {
826        // If the user never called commit/rollback, treat it as a
827        // rollback: drop staged changes and the buffered mutations.
828        // The live RwLock guard is released as part of dropping `self.live`.
829        if let Ok(mut inner) = self.inner.lock() {
830            if !inner.closed {
831                if inner.cursor_active {
832                    // A tx-bound stream may still be borrowing the
833                    // staged graph through `inner`. Leave that graph
834                    // in place until the stream drops, but mark the
835                    // transaction closed so finalization discards it
836                    // instead of making it commit-eligible.
837                    inner.closed = true;
838                } else {
839                    discard_transaction_state(&mut inner);
840                }
841            }
842        }
843    }
844}