1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex, MutexGuard, RwLockReadGuard, RwLockWriteGuard};
3use std::time::{Duration, Instant};
4
5use anyhow::{anyhow, Result};
6use lora_analyzer::Analyzer;
7use lora_compiler::{CompiledQuery, Compiler};
8use lora_executor::{
9 classify_stream, compiled_result_columns, project_rows, ExecuteOptions, ExecutionContext,
10 Executor, LoraValue, MutableExecutionContext, MutableExecutor, MutablePullExecutor,
11 PullExecutor, QueryResult, Row, RowSource,
12};
13use lora_parser::parse_query;
14use lora_store::{InMemoryGraph, MutationEvent, MutationRecorder};
15use lora_wal::{WalRecorder, WroteCommit};
16
17use crate::stream::QueryStream;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum TransactionMode {
22 ReadOnly,
24 ReadWrite,
26}
27
28pub(crate) enum LiveStoreGuard<'db> {
29 Read(RwLockReadGuard<'db, InMemoryGraph>),
30 Write(RwLockWriteGuard<'db, InMemoryGraph>),
31}
32
33impl LiveStoreGuard<'_> {
34 fn as_graph(&self) -> &InMemoryGraph {
35 match self {
36 Self::Read(guard) => guard,
37 Self::Write(guard) => guard,
38 }
39 }
40
41 fn as_graph_mut(&mut self) -> Option<&mut InMemoryGraph> {
42 match self {
43 Self::Read(_) => None,
44 Self::Write(guard) => Some(guard),
45 }
46 }
47}
48
49pub(crate) struct Savepoint {
54 staged: Option<InMemoryGraph>,
55 buffer_len: usize,
56}
57
58struct BufferingRecorder {
65 buffer: Arc<Mutex<Vec<MutationEvent>>>,
66}
67
68impl BufferingRecorder {
69 fn new(buffer: Arc<Mutex<Vec<MutationEvent>>>) -> Self {
70 Self { buffer }
71 }
72}
73
74impl MutationRecorder for BufferingRecorder {
75 fn record(&self, event: &MutationEvent) {
76 if let Ok(mut buf) = self.buffer.lock() {
77 buf.push(event.clone());
78 }
79 }
80}
81
82pub(crate) struct TxInner {
87 pub(crate) staged: Option<InMemoryGraph>,
91 pub(crate) buffer: Arc<Mutex<Vec<MutationEvent>>>,
95 pub(crate) pending_savepoint: Option<Savepoint>,
99 pub(crate) cursor_active: bool,
103 pub(crate) cursor_dropped_dirty: bool,
107 pub(crate) closed: bool,
111 pub(crate) mode: TransactionMode,
113 pub(crate) buffer_mutations: bool,
117}
118
119pub struct Transaction<'db> {
135 pub(crate) live: Option<LiveStoreGuard<'db>>,
136 pub(crate) inner: Arc<Mutex<TxInner>>,
137 pub(crate) wal: Option<Arc<WalRecorder>>,
138 mode: TransactionMode,
139}
140
141impl<'db> Transaction<'db> {
142 pub(crate) fn new(
144 live: LiveStoreGuard<'db>,
145 wal: Option<Arc<WalRecorder>>,
146 mode: TransactionMode,
147 ) -> Self {
148 let buffer_mutations = wal.is_some();
149 let inner = TxInner {
150 staged: None,
151 buffer: Arc::new(Mutex::new(Vec::new())),
152 pending_savepoint: None,
153 cursor_active: false,
154 cursor_dropped_dirty: false,
155 closed: false,
156 mode,
157 buffer_mutations,
158 };
159 Self {
160 live: Some(live),
161 inner: Arc::new(Mutex::new(inner)),
162 wal,
163 mode,
164 }
165 }
166
167 pub fn mode(&self) -> TransactionMode {
169 self.mode
170 }
171
172 pub fn execute(&mut self, query: &str, options: Option<ExecuteOptions>) -> Result<QueryResult> {
175 self.execute_with_params(query, options, BTreeMap::new())
176 }
177
178 pub fn execute_with_timeout(
180 &mut self,
181 query: &str,
182 options: Option<ExecuteOptions>,
183 timeout: Duration,
184 ) -> Result<QueryResult> {
185 let deadline = Instant::now()
186 .checked_add(timeout)
187 .unwrap_or_else(Instant::now);
188 let rows =
189 self.execute_rows_with_params_deadline(query, BTreeMap::new(), Some(deadline))?;
190 Ok(project_rows(rows, options.unwrap_or_default()))
191 }
192
193 pub fn execute_with_params(
195 &mut self,
196 query: &str,
197 options: Option<ExecuteOptions>,
198 params: BTreeMap<String, LoraValue>,
199 ) -> Result<QueryResult> {
200 let rows = self.execute_rows_with_params_deadline(query, params, None)?;
201 Ok(project_rows(rows, options.unwrap_or_default()))
202 }
203
204 pub fn execute_with_params_timeout(
207 &mut self,
208 query: &str,
209 options: Option<ExecuteOptions>,
210 params: BTreeMap<String, LoraValue>,
211 timeout: Duration,
212 ) -> Result<QueryResult> {
213 let deadline = Instant::now()
214 .checked_add(timeout)
215 .unwrap_or_else(Instant::now);
216 let rows = self.execute_rows_with_params_deadline(query, params, Some(deadline))?;
217 Ok(project_rows(rows, options.unwrap_or_default()))
218 }
219
220 pub fn execute_rows(&mut self, query: &str) -> Result<Vec<Row>> {
223 self.execute_rows_with_params(query, BTreeMap::new())
224 }
225
226 pub fn execute_rows_with_params(
229 &mut self,
230 query: &str,
231 params: BTreeMap<String, LoraValue>,
232 ) -> Result<Vec<Row>> {
233 self.execute_rows_with_params_deadline(query, params, None)
234 }
235
236 fn execute_rows_with_params_deadline(
237 &mut self,
238 query: &str,
239 params: BTreeMap<String, LoraValue>,
240 deadline: Option<Instant>,
241 ) -> Result<Vec<Row>> {
242 let compiled = self.compile_in_tx(query)?;
243 self.execute_rows_compiled_deadline(&compiled, params, deadline)
244 }
245
246 fn execute_rows_compiled_deadline(
247 &mut self,
248 compiled: &CompiledQuery,
249 params: BTreeMap<String, LoraValue>,
250 deadline: Option<Instant>,
251 ) -> Result<Vec<Row>> {
252 if self.is_read_only_unchecked() {
254 self.precheck_open_no_savepoint()?;
255 let live = self
256 .live
257 .as_ref()
258 .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
259 let storage = live.as_graph();
260 let executor = Executor::with_deadline(ExecutionContext { storage, params }, deadline);
261 return executor
262 .execute_compiled_rows(compiled)
263 .map_err(|e| anyhow!(e));
264 }
265
266 let mut inner = self.begin_statement()?;
268 let is_mutating = classify_stream(compiled).is_mutating();
269
270 if !is_mutating {
271 return match inner.staged.as_ref() {
277 Some(staged) => {
278 let executor = Executor::with_deadline(
279 ExecutionContext {
280 storage: staged,
281 params,
282 },
283 deadline,
284 );
285 executor
286 .execute_compiled_rows(compiled)
287 .map_err(|e| anyhow!(e))
288 }
289 None => {
290 drop(inner);
291 let live = self
292 .live
293 .as_ref()
294 .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
295 let storage = live.as_graph();
296 let executor =
297 Executor::with_deadline(ExecutionContext { storage, params }, deadline);
298 executor
299 .execute_compiled_rows(compiled)
300 .map_err(|e| anyhow!(e))
301 }
302 };
303 }
304
305 let clone_savepoint_graph = inner.staged.is_some();
309 self.ensure_staged_locked(&mut inner)?;
310 let savepoint = Some(take_savepoint(&inner, clone_savepoint_graph));
311
312 let exec_result: ExecResultRows = {
313 let staged = inner.staged_mut()?;
314 let mut executor = MutableExecutor::with_deadline(
315 MutableExecutionContext {
316 storage: staged,
317 params,
318 },
319 deadline,
320 );
321 executor
322 .execute_compiled_rows(compiled)
323 .map_err(|e| anyhow!(e))
324 };
325
326 match exec_result {
327 Ok(rows) => Ok(rows),
328 Err(err) => {
329 restore_savepoint(&mut inner, savepoint);
330 Err(err)
331 }
332 }
333 }
334
335 pub(crate) fn open_streaming_compiled_autocommit(
364 &mut self,
365 compiled: Arc<CompiledQuery>,
366 params: BTreeMap<String, LoraValue>,
367 ) -> Result<Box<dyn RowSource + 'static>> {
368 if self.is_read_only_unchecked() {
369 return Err(anyhow!(
370 "streaming write cursor requires a ReadWrite transaction"
371 ));
372 }
373
374 let mut inner = self.begin_statement()?;
375 self.ensure_staged_locked(&mut inner)?;
376 inner.cursor_active = true;
377
378 let staged_ptr: *mut InMemoryGraph = inner
382 .staged
383 .as_mut()
384 .expect("ensure_staged_locked guarantees Some")
385 as *mut _;
386 drop(inner);
387
388 let storage_static: &'static mut InMemoryGraph = unsafe { &mut *staged_ptr };
394 let compiled_static: &'static CompiledQuery =
395 unsafe { std::mem::transmute::<&CompiledQuery, _>(compiled.as_ref()) };
396
397 let cursor = MutablePullExecutor::new(storage_static, params)
401 .open_compiled(compiled_static)
402 .map_err(|e| {
403 if let Ok(mut inner) = self.inner.lock() {
407 discard_transaction_state(&mut inner);
408 }
409 self.live.take();
410 anyhow!(e)
411 })?;
412
413 Ok(Box::new(StreamingCursorWithArc {
419 cursor,
420 _compiled: compiled,
421 }))
422 }
423
424 fn compile_in_tx(&self, query: &str) -> Result<CompiledQuery> {
430 let document = parse_query(query)?;
431 let resolved = {
432 let inner = self.lock_inner_unchecked();
433 if let Some(staged) = &inner.staged {
434 let mut analyzer = Analyzer::new(staged);
435 analyzer.analyze(&document)?
436 } else {
437 drop(inner);
438 let live = self
439 .live
440 .as_ref()
441 .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
442 let mut analyzer = Analyzer::new(live.as_graph());
443 analyzer.analyze(&document)?
444 }
445 };
446 Ok(Compiler::compile(&resolved))
447 }
448
449 fn ensure_staged_locked(&self, inner: &mut MutexGuard<'_, TxInner>) -> Result<()> {
453 if inner.staged.is_some() {
454 return Ok(());
455 }
456 let live = self
457 .live
458 .as_ref()
459 .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
460 let mut staged: InMemoryGraph = live.as_graph().clone();
461 if matches!(inner.mode, TransactionMode::ReadWrite) && inner.buffer_mutations {
462 staged.set_mutation_recorder(Some(
463 Arc::new(BufferingRecorder::new(inner.buffer.clone())) as Arc<dyn MutationRecorder>,
464 ));
465 }
466 inner.staged = Some(staged);
467 Ok(())
468 }
469
470 pub fn stream(&mut self, query: &str) -> Result<QueryStream<'static>> {
472 self.stream_with_params(query, BTreeMap::new())
473 }
474
475 pub fn stream_with_params(
478 &mut self,
479 query: &str,
480 params: BTreeMap<String, LoraValue>,
481 ) -> Result<QueryStream<'static>> {
482 let compiled = Arc::new(self.compile_in_tx(query)?);
483 let columns = compiled_result_columns(&compiled);
484 self.stream_compiled(compiled, columns, params)
485 }
486
487 pub(crate) fn stream_compiled(
491 &mut self,
492 compiled: Arc<CompiledQuery>,
493 columns: Vec<String>,
494 params: BTreeMap<String, LoraValue>,
495 ) -> Result<QueryStream<'static>> {
496 let mut inner = self.begin_statement()?;
497 let is_mutating = classify_stream(&compiled).is_mutating();
498 if matches!(inner.mode, TransactionMode::ReadOnly) && is_mutating {
499 return Err(anyhow!(
500 "cannot execute mutating query in read-only transaction"
501 ));
502 }
503
504 let clone_savepoint_graph = inner.staged.is_some();
509 self.ensure_staged_locked(&mut inner)?;
510 inner.cursor_active = true;
511
512 let rollback_on_drop = is_mutating;
513 if rollback_on_drop {
514 inner.pending_savepoint = Some(take_savepoint(&inner, clone_savepoint_graph));
515 } else {
516 inner.pending_savepoint = None;
517 }
518
519 let staged_ptr: *mut InMemoryGraph = inner
520 .staged
521 .as_mut()
522 .expect("ensure_staged_locked guarantees Some")
523 as *mut _;
524 drop(inner);
525
526 let compiled_static: &'static CompiledQuery =
527 unsafe { std::mem::transmute::<&CompiledQuery, _>(compiled.as_ref()) };
528 let cursor: Result<Box<dyn RowSource + 'static>> = if is_mutating {
529 let storage_static: &'static mut InMemoryGraph = unsafe { &mut *staged_ptr };
530 MutablePullExecutor::new(storage_static, params)
531 .open_compiled(compiled_static)
532 .map(|cursor| {
533 Box::new(StreamingCursorWithArc {
534 cursor,
535 _compiled: compiled.clone(),
536 }) as Box<dyn RowSource + 'static>
537 })
538 .map_err(|e| anyhow!(e))
539 } else {
540 let storage_static: &'static InMemoryGraph = unsafe { &*staged_ptr };
541 PullExecutor::new(storage_static, params)
542 .open_compiled(compiled_static)
543 .map(|cursor| {
544 Box::new(StreamingCursorWithArc {
545 cursor,
546 _compiled: compiled.clone(),
547 }) as Box<dyn RowSource + 'static>
548 })
549 .map_err(|e| anyhow!(e))
550 };
551
552 match cursor {
553 Ok(cursor) => Ok(QueryStream::for_tx_cursor(
554 cursor,
555 columns,
556 self.inner.clone(),
557 rollback_on_drop,
558 )),
559 Err(err) => {
560 finalize_tx_stream(&self.inner, false, rollback_on_drop);
561 Err(err)
562 }
563 }
564 }
565
566 pub fn commit(mut self) -> Result<()> {
573 let (staged, buffer_events, mode) = {
577 let mut inner = self.inner.lock().unwrap();
578 if inner.cursor_active {
579 return Err(anyhow!(
580 "cannot commit transaction while a streaming cursor is still active"
581 ));
582 }
583 if inner.cursor_dropped_dirty {
584 if let Some(sp) = inner.pending_savepoint.take() {
585 apply_savepoint(&mut inner, sp);
586 }
587 inner.cursor_dropped_dirty = false;
588 }
589 if inner.closed {
590 return Err(anyhow!("transaction is already closed"));
591 }
592 let mode = inner.mode;
593 let staged = inner.staged.take();
598 let buffer_events = std::mem::take(&mut *inner.buffer.lock().unwrap());
599 inner.closed = true;
600 (staged, buffer_events, mode)
601 };
602
603 if let Some(rec) = &self.wal {
608 if matches!(mode, TransactionMode::ReadWrite) && !buffer_events.is_empty() {
609 rec.arm().map_err(|e| anyhow!("WAL arm failed: {e}"))?;
610 for event in &buffer_events {
611 rec.record(event);
612 if let Some(reason) = rec.poisoned() {
613 return Err(anyhow!("WAL poisoned during commit replay: {reason}"));
614 }
615 }
616 match rec.commit() {
617 Ok(WroteCommit::Yes) => {
618 rec.flush().map_err(|e| anyhow!("WAL flush failed: {e}"))?;
619 }
620 Ok(WroteCommit::No) => {}
621 Err(e) => return Err(anyhow!("WAL commit failed: {e}")),
622 }
623 if let Some(reason) = rec.poisoned() {
624 return Err(anyhow!("WAL poisoned: {reason}"));
625 }
626 }
627 }
628
629 if matches!(mode, TransactionMode::ReadWrite) {
630 if let Some(mut staged) = staged {
631 staged.set_mutation_recorder(None);
636 if let Some(rec) = &self.wal {
637 staged.set_mutation_recorder(Some(rec.clone() as Arc<dyn MutationRecorder>));
638 }
639 let live = self
640 .live
641 .as_mut()
642 .ok_or_else(|| anyhow!("transaction has no live graph guard"))?;
643 let live = live
644 .as_graph_mut()
645 .ok_or_else(|| anyhow!("read-only transaction cannot publish staged graph"))?;
646 *live = staged;
647 }
648 }
649
650 self.live.take();
651 Ok(())
652 }
653
654 pub fn rollback(mut self) -> Result<()> {
657 let mut inner = self.inner.lock().unwrap();
658 if inner.closed {
659 return Err(anyhow!("transaction is already closed"));
660 }
661 discard_transaction_state(&mut inner);
662 drop(inner);
663 self.live.take();
664 Ok(())
665 }
666
667 fn begin_statement(&self) -> Result<MutexGuard<'_, TxInner>> {
674 let mut inner = self.inner.lock().unwrap();
675 if inner.closed {
676 return Err(anyhow!("transaction is already closed"));
677 }
678 if inner.cursor_active {
679 return Err(anyhow!(
680 "cannot start a new statement while a streaming cursor is still active"
681 ));
682 }
683 if inner.cursor_dropped_dirty {
684 if let Some(sp) = inner.pending_savepoint.take() {
685 apply_savepoint(&mut inner, sp);
686 }
687 inner.cursor_dropped_dirty = false;
688 }
689 Ok(inner)
690 }
691
692 fn precheck_open_no_savepoint(&self) -> Result<()> {
696 let inner = self.inner.lock().unwrap();
697 if inner.closed {
698 return Err(anyhow!("transaction is already closed"));
699 }
700 if inner.cursor_active {
701 return Err(anyhow!(
702 "cannot start a new statement while a streaming cursor is still active"
703 ));
704 }
705 Ok(())
706 }
707
708 fn is_read_only_unchecked(&self) -> bool {
712 matches!(self.mode, TransactionMode::ReadOnly)
713 }
714
715 fn lock_inner_unchecked(&self) -> MutexGuard<'_, TxInner> {
716 self.inner
717 .lock()
718 .unwrap_or_else(|poisoned| poisoned.into_inner())
719 }
720}
721
722type ExecResultRows = Result<Vec<Row>>;
723
724impl TxInner {
725 fn staged_mut(&mut self) -> Result<&mut InMemoryGraph> {
726 self.staged
727 .as_mut()
728 .ok_or_else(|| anyhow!("transaction has no staged graph"))
729 }
730}
731
732struct StreamingCursorWithArc {
737 cursor: Box<dyn RowSource + 'static>,
738 _compiled: Arc<CompiledQuery>,
739}
740
741impl RowSource for StreamingCursorWithArc {
742 fn next_row(&mut self) -> lora_executor::ExecResult<Option<Row>> {
743 self.cursor.next_row()
744 }
745}
746
747pub(crate) fn finalize_tx_stream(
748 handle: &Arc<Mutex<TxInner>>,
749 exhausted: bool,
750 rollback_on_drop: bool,
751) {
752 if let Ok(mut inner) = handle.lock() {
753 inner.cursor_active = false;
754
755 if inner.closed {
756 discard_transaction_state(&mut inner);
757 return;
758 }
759
760 if exhausted || !rollback_on_drop {
761 inner.pending_savepoint = None;
762 inner.cursor_dropped_dirty = false;
763 return;
764 }
765
766 if let Some(sp) = inner.pending_savepoint.take() {
767 apply_savepoint(&mut inner, sp);
768 }
769 inner.cursor_dropped_dirty = false;
770 }
771}
772
773fn discard_transaction_state(inner: &mut TxInner) {
774 inner.pending_savepoint = None;
776 inner.cursor_dropped_dirty = false;
777 inner.cursor_active = false;
778 inner.staged = None;
779 if let Ok(mut buf) = inner.buffer.lock() {
780 buf.clear();
781 }
782 inner.closed = true;
783}
784
785fn take_savepoint(inner: &TxInner, clone_staged: bool) -> Savepoint {
786 let buffer_len = inner.buffer.lock().ok().map(|b| b.len()).unwrap_or(0);
787 Savepoint {
788 staged: if clone_staged {
789 inner.staged.as_ref().cloned()
790 } else {
791 None
792 },
793 buffer_len,
794 }
795}
796
797fn restore_savepoint(inner: &mut TxInner, savepoint: Option<Savepoint>) {
798 if let Some(sp) = savepoint {
799 apply_savepoint(inner, sp);
800 }
801}
802
803fn apply_savepoint(inner: &mut TxInner, sp: Savepoint) {
804 if let Ok(mut buf) = inner.buffer.lock() {
805 buf.truncate(sp.buffer_len);
806 }
807
808 let Some(mut graph) = sp.staged else {
809 inner.staged = None;
810 return;
811 };
812
813 if matches!(inner.mode, TransactionMode::ReadWrite) && inner.buffer_mutations {
817 graph.set_mutation_recorder(Some(
818 Arc::new(BufferingRecorder::new(inner.buffer.clone())) as Arc<dyn MutationRecorder>
819 ));
820 }
821 inner.staged = Some(graph);
822}
823
824impl Drop for Transaction<'_> {
825 fn drop(&mut self) {
826 if let Ok(mut inner) = self.inner.lock() {
830 if !inner.closed {
831 if inner.cursor_active {
832 inner.closed = true;
838 } else {
839 discard_transaction_state(&mut inner);
840 }
841 }
842 }
843 }
844}