pub mod mvcc;

use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use arrow::array::RecordBatch;
use arrow::datatypes::Schema;

pub use mvcc::{
    RowVersion, TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionSnapshot, TxnId, TxnIdManager,
};

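/// Identifier for a session created by [`TransactionManager::create_session`].
/// Each session can hold at most one active transaction at a time.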
pub type SessionId = u64;

use llkv_expr::expr::Expr as LlkvExpr;
use llkv_plan::plans::{
    ColumnSpec, CreateIndexPlan, CreateTablePlan, DeletePlan, InsertPlan, PlanOperation, PlanValue,
    SelectPlan, UpdatePlan,
};
use llkv_result::{Error, Result as LlkvResult};
use llkv_storage::pager::Pager;
use simd_r_drive_entry_handle::EntryHandle;

use llkv_executor::SelectExecution;

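/// Rows exported from a table as plan values, paired with their column names.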
pub struct RowBatch {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<PlanValue>>,
}

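/// The transaction-control statement that produced a
/// [`TransactionResult::Transaction`] value.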
#[derive(Clone, Debug)]
pub enum TransactionKind {
    Begin,
    Commit,
    Rollback,
}

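/// Returns the qualified name of the single table referenced by a SELECT plan,
/// or `None` if the plan references zero or multiple tables.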
fn select_plan_table_name(plan: &SelectPlan) -> Option<String> {
    if plan.tables.len() == 1 {
        Some(plan.tables[0].qualified_name())
    } else {
        None
    }
}

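/// Result of executing a plan operation, parameterized over the pager type
/// backing any returned SELECT execution.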
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub enum TransactionResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    CreateTable {
        table_name: String,
    },
    Insert {
        rows_inserted: usize,
    },
    Update {
        rows_matched: usize,
        rows_updated: usize,
    },
    Delete {
        rows_deleted: usize,
    },
    CreateIndex {
        table_name: String,
        index_name: Option<String>,
    },
    Select {
        table_name: String,
        schema: Arc<Schema>,
        execution: SelectExecution<P>,
    },
    Transaction {
        kind: TransactionKind,
    },
}

impl<P> TransactionResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
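    /// Rebuilds this result for a different pager type `P2`.
    ///
    /// Every variant converts except [`TransactionResult::Select`], whose
    /// execution is tied to the original pager and therefore yields an error.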
    pub fn convert_pager_type<P2>(self) -> LlkvResult<TransactionResult<P2>>
    where
        P2: Pager<Blob = EntryHandle> + Send + Sync + 'static,
    {
        match self {
            TransactionResult::CreateTable { table_name } => {
                Ok(TransactionResult::CreateTable { table_name })
            }
            TransactionResult::Insert { rows_inserted } => {
                Ok(TransactionResult::Insert { rows_inserted })
            }
            TransactionResult::Update {
                rows_matched,
                rows_updated,
            } => Ok(TransactionResult::Update {
                rows_matched,
                rows_updated,
            }),
            TransactionResult::Delete { rows_deleted } => {
                Ok(TransactionResult::Delete { rows_deleted })
            }
            TransactionResult::CreateIndex {
                table_name,
                index_name,
            } => Ok(TransactionResult::CreateIndex {
                table_name,
                index_name,
            }),
            TransactionResult::Transaction { kind } => Ok(TransactionResult::Transaction { kind }),
            TransactionResult::Select { .. } => Err(Error::Internal(
                "cannot convert SELECT TransactionResult between pager types".into(),
            )),
        }
    }
}

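/// Storage-backend interface used by the transaction layer.
///
/// Implemented by both the base context and the staging context; it exposes
/// the catalog, DML, and MVCC snapshot hooks that [`SessionTransaction`]
/// routes operations through.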
pub trait TransactionContext: Send + Sync {
    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;

    fn set_snapshot(&self, snapshot: mvcc::TransactionSnapshot);

    fn snapshot(&self) -> mvcc::TransactionSnapshot;

    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<ColumnSpec>>;

    fn export_table_rows(&self, table_name: &str) -> LlkvResult<RowBatch>;

    fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> LlkvResult<Vec<RecordBatch>>;

    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;

    fn create_table_plan(
        &self,
        plan: CreateTablePlan,
    ) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn create_index(&self, plan: CreateIndexPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> LlkvResult<usize>;

    fn table_names(&self) -> Vec<String>;

    fn table_id(&self, table_name: &str) -> LlkvResult<llkv_table::types::TableId>;

    fn catalog_snapshot(&self) -> llkv_table::catalog::TableCatalogSnapshot;

    fn validate_commit_constraints(&self, _txn_id: TxnId) -> LlkvResult<()> {
        Ok(())
    }

    fn clear_transaction_state(&self, _txn_id: TxnId) {}
}

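/// State for a single in-progress transaction.
///
/// Tables created inside the transaction live in the staging context and have
/// their operations recorded for replay at commit; DML against pre-existing
/// tables is applied directly to the base context under this transaction's
/// MVCC snapshot.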
pub struct SessionTransaction<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    snapshot: mvcc::TransactionSnapshot,
    staging: Arc<StagingCtx>,
    operations: Vec<PlanOperation>,
    staged_tables: HashSet<String>,
    new_tables: HashSet<String>,
    missing_tables: HashSet<String>,
    catalog_snapshot: llkv_table::catalog::TableCatalogSnapshot,
    base_context: Arc<BaseCtx>,
    is_aborted: bool,
    txn_manager: Arc<TxnIdManager>,
    accessed_tables: HashSet<String>,
}

impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    pub fn new(
        base_context: Arc<BaseCtx>,
        staging: Arc<StagingCtx>,
        txn_manager: Arc<TxnIdManager>,
    ) -> Self {
        let catalog_snapshot = base_context.catalog_snapshot();

        let snapshot = txn_manager.begin_transaction();
        tracing::debug!(
            "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
            snapshot.txn_id,
            snapshot.snapshot_id
        );
        TransactionContext::set_snapshot(&*base_context, snapshot);
        TransactionContext::set_snapshot(&*staging, snapshot);

        Self {
            staging,
            operations: Vec::new(),
            staged_tables: HashSet::new(),
            new_tables: HashSet::new(),
            missing_tables: HashSet::new(),
            catalog_snapshot,
            base_context,
            is_aborted: false,
            accessed_tables: HashSet::new(),
            snapshot,
            txn_manager,
        }
    }

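    /// Verifies that `table_name` is visible to this transaction, caching
    /// positive answers in `staged_tables` and recording unknown tables in
    /// `missing_tables` so repeated lookups fail fast.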
    fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
        tracing::trace!(
            "[ENSURE] ensure_table_exists called for table='{}'",
            table_name
        );

        if self.staged_tables.contains(table_name) {
            tracing::trace!("[ENSURE] table already verified to exist");
            return Ok(());
        }

        if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
        {
            self.missing_tables.insert(table_name.to_string());
            return Err(Error::CatalogError(format!(
                "Catalog Error: Table '{table_name}' does not exist"
            )));
        }

        if self.missing_tables.contains(table_name) {
            return Err(Error::CatalogError(format!(
                "Catalog Error: Table '{table_name}' does not exist"
            )));
        }

        if self.new_tables.contains(table_name) {
            tracing::trace!("[ENSURE] Table was created in this transaction");
            match self.staging.table_column_specs(table_name) {
                Ok(_) => {
                    self.staged_tables.insert(table_name.to_string());
                    return Ok(());
                }
                Err(_) => {
                    return Err(Error::CatalogError(format!(
                        "Catalog Error: Table '{table_name}' was created but not found in staging"
                    )));
                }
            }
        }

        tracing::trace!(
            "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
        );
        self.staged_tables.insert(table_name.to_string());
        Ok(())
    }

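    /// Runs a single-table SELECT inside the transaction. Tables created in
    /// this transaction are read from staging; pre-existing tables are read
    /// from the base context under the MVCC snapshot and materialized into a
    /// single batch.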
    pub fn execute_select(
        &mut self,
        plan: SelectPlan,
    ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
        let table_name = select_plan_table_name(&plan).ok_or_else(|| {
            Error::InvalidArgumentError(
                "Transaction execute_select requires single-table query".into(),
            )
        })?;

        self.ensure_table_exists(&table_name)?;

        if self.new_tables.contains(&table_name) {
            tracing::trace!(
                "[SELECT] Reading from staging for new table '{}'",
                table_name
            );
            return self.staging.execute_select(plan);
        }

        self.accessed_tables.insert(table_name.clone());

        tracing::trace!(
            "[SELECT] Reading from BASE with MVCC for existing table '{}'",
            table_name
        );
        self.base_context.execute_select(plan).and_then(|exec| {
            let schema = exec.schema();
            let batches = exec.collect().unwrap_or_default();
            let combined = if batches.is_empty() {
                RecordBatch::new_empty(Arc::clone(&schema))
            } else if batches.len() == 1 {
                batches.into_iter().next().unwrap()
            } else {
                let refs: Vec<&RecordBatch> = batches.iter().collect();
                arrow::compute::concat_batches(&schema, refs).map_err(|err| {
                    Error::Internal(format!("failed to concatenate batches: {err}"))
                })?
            };
            Ok(SelectExecution::from_batch(
                table_name,
                Arc::clone(&schema),
                combined,
            ))
        })
    }

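    /// Applies a DDL or DML operation inside the transaction, routing it to
    /// staging for tables created here or to the base context otherwise, and
    /// marks the transaction aborted when a write operation fails.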
    pub fn execute_operation(
        &mut self,
        operation: PlanOperation,
    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
        tracing::trace!(
            "[TX] SessionTransaction::execute_operation called, operation={:?}",
            match &operation {
                PlanOperation::Insert(p) => format!("INSERT({})", p.table),
                PlanOperation::Update(p) => format!("UPDATE({})", p.table),
                PlanOperation::Delete(p) => format!("DELETE({})", p.table),
                PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
                _ => "OTHER".to_string(),
            }
        );
        if self.is_aborted {
            return Err(Error::TransactionContextError(
                "TransactionContext Error: transaction is aborted".into(),
            ));
        }

        let result = match operation {
            PlanOperation::CreateTable(ref plan) => {
                match self.staging.create_table_plan(plan.clone()) {
                    Ok(result) => {
                        self.new_tables.insert(plan.name.clone());
                        self.missing_tables.remove(&plan.name);
                        self.staged_tables.insert(plan.name.clone());
                        self.operations
                            .push(PlanOperation::CreateTable(plan.clone()));
                        result.convert_pager_type()?
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Insert(ref plan) => {
                tracing::trace!(
                    "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
                    plan.table
                );
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                if !is_new_table {
                    self.accessed_tables.insert(plan.table.clone());
                }
                let result = if is_new_table {
                    tracing::trace!("[TX] INSERT into staging for new table");
                    self.staging.insert(plan.clone())
                } else {
                    tracing::trace!(
                        "[TX] INSERT directly into BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    self.base_context
                        .insert(plan.clone())
                        .and_then(|r| r.convert_pager_type())
                };

                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] INSERT to new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Insert(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] INSERT to existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        tracing::trace!("[TX] INSERT failed: {:?}", e);
                        tracing::trace!("[TX] marking transaction as aborted");
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Update(ref plan) => {
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                if !is_new_table {
                    self.accessed_tables.insert(plan.table.clone());
                }
                let result = if is_new_table {
                    tracing::trace!("[TX] UPDATE in staging for new table");
                    self.staging.update(plan.clone())
                } else {
                    tracing::trace!(
                        "[TX] UPDATE directly in BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    self.base_context
                        .update(plan.clone())
                        .and_then(|r| r.convert_pager_type())
                };

                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] UPDATE to new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Update(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] UPDATE to existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Delete(ref plan) => {
                tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                tracing::debug!("[DELETE] is_new_table={}", is_new_table);
                if !is_new_table {
                    tracing::debug!(
                        "[DELETE] Tracking access to existing table '{}'",
                        plan.table
                    );
                    self.accessed_tables.insert(plan.table.clone());
                }
                let result = if is_new_table {
                    tracing::debug!("[DELETE] Deleting from staging for new table");
                    self.staging.delete(plan.clone())
                } else {
                    tracing::debug!(
                        "[DELETE] Deleting from BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    self.base_context
                        .delete(plan.clone())
                        .and_then(|r| r.convert_pager_type())
                };

                tracing::debug!(
                    "[DELETE] Result: {:?}",
                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
                );
                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] DELETE from new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Delete(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] DELETE from existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Select(ref plan) => {
                let table_name = select_plan_table_name(plan).unwrap_or_default();
                match self.execute_select(plan.clone()) {
                    Ok(staging_execution) => {
                        let schema = staging_execution.schema();
                        let batches = staging_execution.collect().unwrap_or_default();

                        let combined = if batches.is_empty() {
                            RecordBatch::new_empty(Arc::clone(&schema))
                        } else if batches.len() == 1 {
                            batches.into_iter().next().unwrap()
                        } else {
                            let refs: Vec<&RecordBatch> = batches.iter().collect();
                            arrow::compute::concat_batches(&schema, refs).map_err(|err| {
                                Error::Internal(format!("failed to concatenate batches: {err}"))
                            })?
                        };

                        let execution = SelectExecution::from_batch(
                            table_name.clone(),
                            Arc::clone(&schema),
                            combined,
                        );

                        TransactionResult::Select {
                            table_name,
                            schema,
                            execution,
                        }
                    }
                    Err(e) => {
                        return Err(e);
                    }
                }
            }
        };

        Ok(result)
    }

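    /// Operations recorded for replay against the base context on commit;
    /// only statements that touched tables created in this transaction are
    /// tracked here.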
    pub fn operations(&self) -> &[PlanOperation] {
        &self.operations
    }
}

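/// Per-session handle for beginning, executing, committing, and rolling back
/// transactions. Clones created with [`TransactionSession::clone_session`]
/// share the same session id and transaction registry.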
pub struct TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    context: Arc<BaseCtx>,
    session_id: SessionId,
    transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    txn_manager: Arc<TxnIdManager>,
}

impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    pub fn new(
        context: Arc<BaseCtx>,
        session_id: SessionId,
        transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
        txn_manager: Arc<TxnIdManager>,
    ) -> Self {
        Self {
            context,
            session_id,
            transactions,
            txn_manager,
        }
    }

    pub fn clone_session(&self) -> Self {
        Self {
            context: Arc::clone(&self.context),
            session_id: self.session_id,
            transactions: Arc::clone(&self.transactions),
            txn_manager: Arc::clone(&self.txn_manager),
        }
    }

    pub fn session_id(&self) -> SessionId {
        self.session_id
    }

    pub fn context(&self) -> &Arc<BaseCtx> {
        &self.context
    }

    pub fn has_active_transaction(&self) -> bool {
        self.transactions
            .lock()
            .expect("transactions lock poisoned")
            .contains_key(&self.session_id)
    }

    pub fn is_aborted(&self) -> bool {
        self.transactions
            .lock()
            .expect("transactions lock poisoned")
            .get(&self.session_id)
            .map(|tx| tx.is_aborted)
            .unwrap_or(false)
    }

    pub fn abort_transaction(&self) {
        let mut guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");
        if let Some(tx) = guard.get_mut(&self.session_id) {
            tx.is_aborted = true;
        }
    }

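    /// Begins a transaction for this session, using `staging` as the scratch
    /// context for tables created within it. Fails if the session already has
    /// a transaction in progress.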
    pub fn begin_transaction(
        &self,
        staging: Arc<StagingCtx>,
    ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
        tracing::debug!(
            "[BEGIN] begin_transaction called for session_id={}",
            self.session_id
        );
        let mut guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");
        tracing::debug!(
            "[BEGIN] session_id={}, transactions map has {} entries",
            self.session_id,
            guard.len()
        );
        if guard.contains_key(&self.session_id) {
            return Err(Error::InvalidArgumentError(
                "a transaction is already in progress in this session".into(),
            ));
        }
        guard.insert(
            self.session_id,
            SessionTransaction::new(
                Arc::clone(&self.context),
                staging,
                Arc::clone(&self.txn_manager),
            ),
        );
        tracing::debug!(
            "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
            self.session_id,
            guard.len()
        );
        Ok(TransactionResult::Transaction {
            kind: TransactionKind::Begin,
        })
    }

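    /// Commits the session's active transaction.
    ///
    /// Runs drop-conflict and constraint checks first; on success returns the
    /// commit result along with the operations to replay against the base
    /// context. If the transaction was already aborted, it is rolled back and
    /// a `Rollback` result is returned instead.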
    pub fn commit_transaction(
        &self,
    ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
        tracing::trace!(
            "[COMMIT] commit_transaction called for session {:?}",
            self.session_id
        );
        let mut guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");
        tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
        let tx_opt = guard.remove(&self.session_id);
        tracing::trace!(
            "[COMMIT] commit_transaction remove returned: {}",
            tx_opt.is_some()
        );
        let tx = tx_opt.ok_or_else(|| {
            tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
            Error::InvalidArgumentError(
                "no transaction is currently in progress in this session".into(),
            )
        })?;
        tracing::trace!("[COMMIT] is_aborted={}", tx.is_aborted);

        if tx.is_aborted {
            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
            let auto_commit_snapshot = TransactionSnapshot {
                txn_id: TXN_ID_AUTO_COMMIT,
                snapshot_id: tx.txn_manager.last_committed(),
            };
            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
            tracing::trace!("[COMMIT] returning Rollback with 0 operations");
            return Ok((
                TransactionResult::Transaction {
                    kind: TransactionKind::Rollback,
                },
                Vec::new(),
            ));
        }

        tracing::debug!(
            "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
            tx.snapshot.txn_id,
            tx.accessed_tables.len()
        );
        for accessed_table_name in &tx.accessed_tables {
            tracing::debug!(
                "[COMMIT CONFLICT CHECK] Checking table '{}'",
                accessed_table_name
            );
            if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
                match self.context.table_id(accessed_table_name) {
                    Ok(current_table_id) => {
                        if current_table_id != snapshot_table_id {
                            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
                            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
                            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
                            let auto_commit_snapshot = TransactionSnapshot {
                                txn_id: TXN_ID_AUTO_COMMIT,
                                snapshot_id: tx.txn_manager.last_committed(),
                            };
                            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
                            return Err(Error::TransactionContextError(
                                "another transaction has dropped this table".into(),
                            ));
                        }
                    }
                    Err(_) => {
                        tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
                        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
                        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
                        let auto_commit_snapshot = TransactionSnapshot {
                            txn_id: TXN_ID_AUTO_COMMIT,
                            snapshot_id: tx.txn_manager.last_committed(),
                        };
                        TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
                        return Err(Error::TransactionContextError(
                            "another transaction has dropped this table".into(),
                        ));
                    }
                }
            }
        }

        if let Err(err) = tx
            .base_context
            .validate_commit_constraints(tx.snapshot.txn_id)
        {
            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
            let auto_commit_snapshot = TransactionSnapshot {
                txn_id: TXN_ID_AUTO_COMMIT,
                snapshot_id: tx.txn_manager.last_committed(),
            };
            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
            let wrapped = match err {
                Error::ConstraintError(msg) => Error::TransactionContextError(format!(
                    "TransactionContext Error: constraint violation: {msg}"
                )),
                other => other,
            };
            return Err(wrapped);
        }

        let operations = tx.operations;
        tracing::trace!(
            "[COMMIT] returning Commit with {} operations",
            operations.len()
        );

        tx.txn_manager.mark_committed(tx.snapshot.txn_id);
        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
        TransactionContext::set_snapshot(&*self.context, tx.snapshot);

        Ok((
            TransactionResult::Transaction {
                kind: TransactionKind::Commit,
            },
            operations,
        ))
    }

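    /// Rolls back the session's active transaction, marking it aborted and
    /// restoring the auto-commit snapshot on the base context.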
    pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
        let mut guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");
        if let Some(tx) = guard.remove(&self.session_id) {
            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
            let auto_commit_snapshot = TransactionSnapshot {
                txn_id: TXN_ID_AUTO_COMMIT,
                snapshot_id: tx.txn_manager.last_committed(),
            };
            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
        } else {
            return Err(Error::InvalidArgumentError(
                "no transaction is currently in progress in this session".into(),
            ));
        }
        Ok(TransactionResult::Transaction {
            kind: TransactionKind::Rollback,
        })
    }

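    /// Executes an operation within the session's active transaction; errors
    /// if no transaction is in progress.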
    pub fn execute_operation(
        &self,
        operation: PlanOperation,
    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
        tracing::debug!(
            "[EXECUTE_OP] execute_operation called for session_id={}",
            self.session_id
        );
        if !self.has_active_transaction() {
            return Err(Error::InvalidArgumentError(
                "execute_operation called without active transaction".into(),
            ));
        }

        let mut guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");
        tracing::debug!(
            "[EXECUTE_OP] session_id={}, transactions map has {} entries",
            self.session_id,
            guard.len()
        );
        let tx = guard
            .get_mut(&self.session_id)
            .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
        tracing::debug!(
            "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
            self.session_id,
            tx.snapshot.txn_id,
            tx.accessed_tables.len()
        );

        let result = tx.execute_operation(operation);
        if let Err(ref e) = result {
            tracing::trace!("[EXECUTE_OP] execute_operation error: {:?}", e);
            tracing::trace!("[EXECUTE_OP] transaction is_aborted={}", tx.is_aborted);
        }
        result
    }
}
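
// Illustrative sketch (not part of the public API): the intended BEGIN -> execute ->
// COMMIT flow through `TransactionSession`, assuming concrete `TransactionContext`
// implementations are supplied by the caller. The function name and signature below
// are hypothetical and exist only to document the call sequence.
#[allow(dead_code)]
fn example_transactional_insert<BaseCtx, StagingCtx>(
    session: &TransactionSession<BaseCtx, StagingCtx>,
    staging: Arc<StagingCtx>,
    insert: InsertPlan,
) -> LlkvResult<Vec<PlanOperation>>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    // BEGIN registers a SessionTransaction keyed by this session's id.
    session.begin_transaction(staging)?;
    // Routed to staging if the target table was created inside this transaction,
    // otherwise written to the base context under the transaction's MVCC snapshot.
    session.execute_operation(PlanOperation::Insert(insert))?;
    // COMMIT returns the operations that must be replayed against the base context
    // (statements that touched tables created inside the transaction).
    let (_result, replay_operations) = session.commit_transaction()?;
    Ok(replay_operations)
}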

impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext,
    StagingCtx: TransactionContext,
{
    fn drop(&mut self) {
        match self.transactions.lock() {
            Ok(mut guard) => {
                if guard.remove(&self.session_id).is_some() {
                    eprintln!(
                        "Warning: TransactionSession dropped with active transaction - auto-rolling back"
                    );
                }
            }
            Err(_) => {
                tracing::trace!(
                    "Warning: TransactionSession dropped with poisoned transaction mutex"
                );
            }
        }
    }
}

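/// Owns the shared transaction registry and transaction-id manager, and hands
/// out per-session [`TransactionSession`] handles.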
pub struct TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    next_session_id: AtomicU64,
    txn_manager: Arc<TxnIdManager>,
}

impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    pub fn new() -> Self {
        Self {
            transactions: Arc::new(Mutex::new(HashMap::new())),
            next_session_id: AtomicU64::new(1),
            txn_manager: Arc::new(TxnIdManager::new()),
        }
    }

    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
        Self {
            transactions: Arc::new(Mutex::new(HashMap::new())),
            next_session_id: AtomicU64::new(1),
            txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
        }
    }

    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
        Self {
            transactions: Arc::new(Mutex::new(HashMap::new())),
            next_session_id: AtomicU64::new(1),
            txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
                next_txn_id,
                last_committed,
            )),
        }
    }

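    /// Allocates a fresh session id and returns a handle that shares this
    /// manager's transaction registry and txn-id manager.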
    pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
        let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
        tracing::debug!(
            "[TX_MANAGER] create_session: allocated session_id={}",
            session_id
        );
        TransactionSession::new(
            context,
            session_id,
            Arc::clone(&self.transactions),
            Arc::clone(&self.txn_manager),
        )
    }

    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
        Arc::clone(&self.txn_manager)
    }

    pub fn has_active_transaction(&self) -> bool {
        !self
            .transactions
            .lock()
            .expect("transactions lock poisoned")
            .is_empty()
    }
}

impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    fn default() -> Self {
        Self::new()
    }
}