1pub mod mvcc;
33
34use std::collections::{HashMap, HashSet};
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::{Arc, Mutex};
37
38use arrow::array::RecordBatch;
39use arrow::datatypes::Schema;
40
41pub use mvcc::{
42 RowVersion, TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionSnapshot, TxnId, TxnIdManager,
43};
44
/// Identifier of a session.
///
/// Used in this file as the key of the shared map that holds each session's
/// in-flight [`SessionTransaction`].
pub type SessionId = u64;
50
51use llkv_expr::expr::Expr as LlkvExpr;
52use llkv_plan::plans::{
53 ColumnSpec, CreateTablePlan, DeletePlan, InsertPlan, PlanOperation, PlanValue, SelectPlan,
54 UpdatePlan,
55};
56use llkv_result::{Error, Result as LlkvResult};
57use llkv_storage::pager::Pager;
58use simd_r_drive_entry_handle::EntryHandle;
59
60use llkv_executor::SelectExecution;
61
/// A set of rows expressed as plan-level values, together with their column names.
///
/// Returned by [`TransactionContext::export_table_rows`].
pub struct RowBatch {
    /// Names of the columns.
    pub columns: Vec<String>,
    /// Row data, one inner vector per row.
    /// NOTE(review): presumably each row is ordered like `columns` — confirm with producers.
    pub rows: Vec<Vec<PlanValue>>,
}
71
/// Which transaction-control outcome a [`TransactionResult::Transaction`] reports.
#[derive(Clone, Debug)]
pub enum TransactionKind {
    /// A transaction was started.
    Begin,
    /// The transaction was committed.
    Commit,
    /// The transaction was rolled back. `commit_transaction` also reports this
    /// kind when the transaction had already been aborted.
    Rollback,
}
79
/// Outcome of a single operation executed through a transaction context.
///
/// The type parameter `P` is the pager type of the [`SelectExecution`] carried
/// by the `Select` variant; no other variant stores a `P`.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub enum TransactionResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// A table was created.
    CreateTable {
        /// Name of the created table.
        table_name: String,
    },
    /// Rows were inserted.
    Insert {
        /// Number of rows inserted.
        rows_inserted: usize,
    },
    /// An UPDATE ran.
    Update {
        /// Number of rows matched.
        rows_matched: usize,
        /// Number of rows updated.
        rows_updated: usize,
    },
    /// Rows were deleted.
    Delete {
        /// Number of rows deleted.
        rows_deleted: usize,
    },
    /// A SELECT ran; the result is available through `execution`.
    Select {
        /// Name of the table that was queried.
        table_name: String,
        /// Arrow schema of the result.
        schema: Arc<Schema>,
        /// Handle used to collect the result batches.
        execution: SelectExecution<P>,
    },
    /// A transaction-control statement (BEGIN / COMMIT / ROLLBACK) completed.
    Transaction {
        /// Which control outcome is being reported.
        kind: TransactionKind,
    },
}
109
110impl<P> TransactionResult<P>
111where
112 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
113{
114 pub fn convert_pager_type<P2>(self) -> LlkvResult<TransactionResult<P2>>
116 where
117 P2: Pager<Blob = EntryHandle> + Send + Sync + 'static,
118 {
119 match self {
120 TransactionResult::CreateTable { table_name } => {
121 Ok(TransactionResult::CreateTable { table_name })
122 }
123 TransactionResult::Insert { rows_inserted } => {
124 Ok(TransactionResult::Insert { rows_inserted })
125 }
126 TransactionResult::Update {
127 rows_matched,
128 rows_updated,
129 } => Ok(TransactionResult::Update {
130 rows_matched,
131 rows_updated,
132 }),
133 TransactionResult::Delete { rows_deleted } => {
134 Ok(TransactionResult::Delete { rows_deleted })
135 }
136 TransactionResult::Transaction { kind } => Ok(TransactionResult::Transaction { kind }),
137 TransactionResult::Select { .. } => Err(Error::Internal(
138 "cannot convert SELECT TransactionResult between pager types".into(),
139 )),
140 }
141 }
142}
143
/// Operations a transaction needs from an execution context.
///
/// In this file the trait bounds both the base context and the staging
/// context of a [`SessionTransaction`]. The method docs below describe only
/// what the signatures and the call sites in this file show; exact semantics
/// are defined by the implementors.
pub trait TransactionContext: Send + Sync {
    /// Pager type of the [`SelectExecution`] / [`TransactionResult`] values
    /// produced by this context.
    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;

    /// Installs the MVCC snapshot for this context.
    ///
    /// Called when a transaction begins and again when it commits or rolls back.
    fn set_snapshot(&self, snapshot: mvcc::TransactionSnapshot);

    /// Returns the context's MVCC snapshot.
    /// NOTE(review): presumably the value last passed to `set_snapshot` — confirm with implementors.
    fn snapshot(&self) -> mvcc::TransactionSnapshot;

    /// Returns the column specifications of `table_name`, or an error.
    ///
    /// `SessionTransaction` uses a successful return as proof that a table
    /// created inside the transaction exists in staging.
    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<ColumnSpec>>;

    /// Exports the rows of `table_name` as a [`RowBatch`].
    fn export_table_rows(&self, table_name: &str) -> LlkvResult<RowBatch>;

    /// Returns record batches for `table_name`, optionally restricted by `filter`.
    /// NOTE(review): the name suggests each batch carries a row-id column — confirm with implementors.
    fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> LlkvResult<Vec<RecordBatch>>;

    /// Executes a SELECT plan and returns a handle to its result.
    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;

    /// Executes a CREATE TABLE plan.
    fn create_table_plan(
        &self,
        plan: CreateTablePlan,
    ) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Executes an INSERT plan.
    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Executes an UPDATE plan.
    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Executes a DELETE plan.
    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Appends `batches` to `table_name` and returns a count.
    /// NOTE(review): presumably the number of rows appended, with row ids taken from the batches — confirm.
    fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> LlkvResult<usize>;

    /// Returns the names of the tables known to this context.
    fn table_names(&self) -> Vec<String>;

    /// Returns the id of `table_name`, or an error.
    ///
    /// At commit time the session compares this against the id recorded in
    /// the transaction's catalog snapshot to detect a changed or missing table.
    fn table_id(&self, table_name: &str) -> LlkvResult<llkv_table::types::TableId>;

    /// Returns a snapshot of the table catalog.
    ///
    /// `SessionTransaction::new` captures one when the transaction begins.
    fn catalog_snapshot(&self) -> llkv_table::catalog::TableCatalogSnapshot;
}
208
/// State of one in-flight transaction belonging to a session.
///
/// Operations on tables that already exist go to `base_context`; tables
/// created inside the transaction live in `staging`, and the operations
/// against them are recorded in `operations`.
pub struct SessionTransaction<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Snapshot obtained from `txn_manager.begin_transaction()` when the
    /// transaction was created.
    snapshot: mvcc::TransactionSnapshot,
    /// Context that receives CREATE TABLE and all operations on tables
    /// created inside this transaction.
    staging: Arc<StagingCtx>,
    /// Recorded operations: CREATE TABLE plus DML against tables created in
    /// this transaction. Handed back to the caller on commit.
    operations: Vec<PlanOperation>,
    /// Tables whose existence has already been verified (lookup cache).
    staged_tables: HashSet<String>,
    /// Tables created inside this transaction.
    new_tables: HashSet<String>,
    /// Tables that were looked up and found not to exist.
    missing_tables: HashSet<String>,
    /// Catalog snapshot captured from `base_context` when the transaction began.
    catalog_snapshot: llkv_table::catalog::TableCatalogSnapshot,
    /// Context used for operations on tables that existed before the transaction.
    base_context: Arc<BaseCtx>,
    /// Set once an operation fails; further operations are rejected and
    /// commit turns into a rollback.
    is_aborted: bool,
    /// Allocator and status tracker for transaction ids.
    txn_manager: Arc<TxnIdManager>,
    /// Pre-existing tables this transaction read or wrote; checked at commit
    /// for a changed or missing table id.
    accessed_tables: HashSet<String>,
}
239
240impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
241where
242 BaseCtx: TransactionContext + 'static,
243 StagingCtx: TransactionContext + 'static,
244{
245 pub fn new(
246 base_context: Arc<BaseCtx>,
247 staging: Arc<StagingCtx>,
248 txn_manager: Arc<TxnIdManager>,
249 ) -> Self {
250 let catalog_snapshot = base_context.catalog_snapshot();
253
254 let snapshot = txn_manager.begin_transaction();
255 tracing::debug!(
256 "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
257 snapshot.txn_id,
258 snapshot.snapshot_id
259 );
260 TransactionContext::set_snapshot(&*base_context, snapshot);
261 TransactionContext::set_snapshot(&*staging, snapshot);
262
263 Self {
264 staging,
265 operations: Vec::new(),
266 staged_tables: HashSet::new(),
267 new_tables: HashSet::new(),
268 missing_tables: HashSet::new(),
269 catalog_snapshot,
270 base_context,
271 is_aborted: false,
272 accessed_tables: HashSet::new(),
273 snapshot,
274 txn_manager,
275 }
276 }
277
278 fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
281 tracing::trace!(
282 "[ENSURE] ensure_table_exists called for table='{}'",
283 table_name
284 );
285
286 if self.staged_tables.contains(table_name) {
288 tracing::trace!("[ENSURE] table already verified to exist");
289 return Ok(());
290 }
291
292 if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
294 {
295 self.missing_tables.insert(table_name.to_string());
296 return Err(Error::CatalogError(format!(
297 "Catalog Error: Table '{table_name}' does not exist"
298 )));
299 }
300
301 if self.missing_tables.contains(table_name) {
302 return Err(Error::CatalogError(format!(
303 "Catalog Error: Table '{table_name}' does not exist"
304 )));
305 }
306
307 if self.new_tables.contains(table_name) {
309 tracing::trace!("[ENSURE] Table was created in this transaction");
310 match self.staging.table_column_specs(table_name) {
312 Ok(_) => {
313 self.staged_tables.insert(table_name.to_string());
314 return Ok(());
315 }
316 Err(_) => {
317 return Err(Error::CatalogError(format!(
318 "Catalog Error: Table '{table_name}' was created but not found in staging"
319 )));
320 }
321 }
322 }
323
324 tracing::trace!(
326 "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
327 );
328 self.staged_tables.insert(table_name.to_string());
329 Ok(())
330 }
331
332 pub fn execute_select(
336 &mut self,
337 plan: SelectPlan,
338 ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
339 self.ensure_table_exists(&plan.table)?;
341
342 if self.new_tables.contains(&plan.table) {
344 tracing::trace!(
345 "[SELECT] Reading from staging for new table '{}'",
346 plan.table
347 );
348 return self.staging.execute_select(plan);
349 }
350
351 self.accessed_tables.insert(plan.table.clone());
353
354 tracing::trace!(
357 "[SELECT] Reading from BASE with MVCC for existing table '{}'",
358 plan.table
359 );
360 let table_name = plan.table.clone();
361 self.base_context.execute_select(plan).and_then(|exec| {
362 let schema = exec.schema();
366 let batches = exec.collect().unwrap_or_default();
367 let combined = if batches.is_empty() {
368 RecordBatch::new_empty(Arc::clone(&schema))
369 } else if batches.len() == 1 {
370 batches.into_iter().next().unwrap()
371 } else {
372 let refs: Vec<&RecordBatch> = batches.iter().collect();
373 arrow::compute::concat_batches(&schema, refs).map_err(|err| {
374 Error::Internal(format!("failed to concatenate batches: {err}"))
375 })?
376 };
377 Ok(SelectExecution::from_batch(
378 table_name,
379 Arc::clone(&schema),
380 combined,
381 ))
382 })
383 }
384
385 pub fn execute_operation(
387 &mut self,
388 operation: PlanOperation,
389 ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
390 tracing::trace!(
391 "[TX] SessionTransaction::execute_operation called, operation={:?}",
392 match &operation {
393 PlanOperation::Insert(p) => format!("INSERT({})", p.table),
394 PlanOperation::Update(p) => format!("UPDATE({})", p.table),
395 PlanOperation::Delete(p) => format!("DELETE({})", p.table),
396 PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
397 _ => "OTHER".to_string(),
398 }
399 );
400 if self.is_aborted {
402 return Err(Error::TransactionContextError(
403 "TransactionContext Error: transaction is aborted".into(),
404 ));
405 }
406
407 let result = match operation {
409 PlanOperation::CreateTable(ref plan) => {
410 match self.staging.create_table_plan(plan.clone()) {
411 Ok(result) => {
412 self.new_tables.insert(plan.name.clone());
414 self.missing_tables.remove(&plan.name);
415 self.staged_tables.insert(plan.name.clone());
416 self.operations
418 .push(PlanOperation::CreateTable(plan.clone()));
419 result.convert_pager_type()?
420 }
421 Err(e) => {
422 self.is_aborted = true;
423 return Err(e);
424 }
425 }
426 }
427 PlanOperation::Insert(ref plan) => {
428 tracing::trace!(
429 "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
430 plan.table
431 );
432 if let Err(e) = self.ensure_table_exists(&plan.table) {
434 self.is_aborted = true;
435 return Err(e);
436 }
437
438 let is_new_table = self.new_tables.contains(&plan.table);
441 if !is_new_table {
443 self.accessed_tables.insert(plan.table.clone());
444 }
445 let result = if is_new_table {
446 tracing::trace!("[TX] INSERT into staging for new table");
447 self.staging.insert(plan.clone())
448 } else {
449 tracing::trace!(
450 "[TX] INSERT directly into BASE with txn_id={}",
451 self.snapshot.txn_id
452 );
453 self.base_context
455 .insert(plan.clone())
456 .and_then(|r| r.convert_pager_type())
457 };
458
459 match result {
460 Ok(result) => {
461 if is_new_table {
464 tracing::trace!(
465 "[TX] INSERT to new table - tracking for commit replay"
466 );
467 self.operations.push(PlanOperation::Insert(plan.clone()));
468 } else {
469 tracing::trace!(
470 "[TX] INSERT to existing table - already in BASE, no replay needed"
471 );
472 }
473 result
474 }
475 Err(e) => {
476 tracing::trace!(
477 "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
478 e
479 );
480 tracing::trace!("DEBUG setting is_aborted=true");
481 self.is_aborted = true;
482 return Err(e);
483 }
484 }
485 }
486 PlanOperation::Update(ref plan) => {
487 if let Err(e) = self.ensure_table_exists(&plan.table) {
488 self.is_aborted = true;
489 return Err(e);
490 }
491
492 let is_new_table = self.new_tables.contains(&plan.table);
495 if !is_new_table {
497 self.accessed_tables.insert(plan.table.clone());
498 }
499 let result = if is_new_table {
500 tracing::trace!("[TX] UPDATE in staging for new table");
501 self.staging.update(plan.clone())
502 } else {
503 tracing::trace!(
504 "[TX] UPDATE directly in BASE with txn_id={}",
505 self.snapshot.txn_id
506 );
507 self.base_context
508 .update(plan.clone())
509 .and_then(|r| r.convert_pager_type())
510 };
511
512 match result {
513 Ok(result) => {
514 if is_new_table {
516 tracing::trace!(
517 "[TX] UPDATE to new table - tracking for commit replay"
518 );
519 self.operations.push(PlanOperation::Update(plan.clone()));
520 } else {
521 tracing::trace!(
522 "[TX] UPDATE to existing table - already in BASE, no replay needed"
523 );
524 }
525 result
526 }
527 Err(e) => {
528 self.is_aborted = true;
529 return Err(e);
530 }
531 }
532 }
533 PlanOperation::Delete(ref plan) => {
534 tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
535 if let Err(e) = self.ensure_table_exists(&plan.table) {
536 tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
537 self.is_aborted = true;
538 return Err(e);
539 }
540
541 let is_new_table = self.new_tables.contains(&plan.table);
544 tracing::debug!("[DELETE] is_new_table={}", is_new_table);
545 if !is_new_table {
547 tracing::debug!(
548 "[DELETE] Tracking access to existing table '{}'",
549 plan.table
550 );
551 self.accessed_tables.insert(plan.table.clone());
552 }
553 let result = if is_new_table {
554 tracing::debug!("[DELETE] Deleting from staging for new table");
555 self.staging.delete(plan.clone())
556 } else {
557 tracing::debug!(
558 "[DELETE] Deleting from BASE with txn_id={}",
559 self.snapshot.txn_id
560 );
561 self.base_context
562 .delete(plan.clone())
563 .and_then(|r| r.convert_pager_type())
564 };
565
566 tracing::debug!(
567 "[DELETE] Result: {:?}",
568 result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
569 );
570 match result {
571 Ok(result) => {
572 if is_new_table {
574 tracing::trace!(
575 "[TX] DELETE from new table - tracking for commit replay"
576 );
577 self.operations.push(PlanOperation::Delete(plan.clone()));
578 } else {
579 tracing::trace!(
580 "[TX] DELETE from existing table - already in BASE, no replay needed"
581 );
582 }
583 result
584 }
585 Err(e) => {
586 self.is_aborted = true;
587 return Err(e);
588 }
589 }
590 }
591 PlanOperation::Select(ref plan) => {
592 let table_name = plan.table.clone();
595 match self.execute_select(plan.clone()) {
596 Ok(staging_execution) => {
597 let schema = staging_execution.schema();
599 let batches = staging_execution.collect().unwrap_or_default();
600
601 let combined = if batches.is_empty() {
603 RecordBatch::new_empty(Arc::clone(&schema))
604 } else if batches.len() == 1 {
605 batches.into_iter().next().unwrap()
606 } else {
607 let refs: Vec<&RecordBatch> = batches.iter().collect();
608 arrow::compute::concat_batches(&schema, refs).map_err(|err| {
609 Error::Internal(format!("failed to concatenate batches: {err}"))
610 })?
611 };
612
613 let execution = SelectExecution::from_batch(
615 table_name.clone(),
616 Arc::clone(&schema),
617 combined,
618 );
619
620 TransactionResult::Select {
621 table_name,
622 schema,
623 execution,
624 }
625 }
626 Err(e) => {
627 return Err(e);
630 }
631 }
632 }
633 };
634
635 Ok(result)
636 }
637
638 pub fn operations(&self) -> &[PlanOperation] {
640 &self.operations
641 }
642}
643
/// Handle through which one session begins, runs, commits and rolls back its
/// transaction.
///
/// The transaction state itself lives in the shared `transactions` map, keyed
/// by `session_id`.
pub struct TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Base context handed to every transaction this session starts.
    context: Arc<BaseCtx>,
    /// Key of this session's entry in `transactions`.
    session_id: SessionId,
    /// Map of in-flight transactions, shared with the manager that created
    /// this session.
    transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Shared transaction-id manager.
    txn_manager: Arc<TxnIdManager>,
}
656
657impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
658where
659 BaseCtx: TransactionContext + 'static,
660 StagingCtx: TransactionContext + 'static,
661{
/// Builds a session handle from its parts.
///
/// `transactions` and `txn_manager` are stored as given; no transaction is
/// started.
pub fn new(
    context: Arc<BaseCtx>,
    session_id: SessionId,
    transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    txn_manager: Arc<TxnIdManager>,
) -> Self {
    Self {
        context,
        session_id,
        transactions,
        txn_manager,
    }
}
675
676 pub fn clone_session(&self) -> Self {
679 Self {
680 context: Arc::clone(&self.context),
681 session_id: self.session_id,
682 transactions: Arc::clone(&self.transactions),
683 txn_manager: Arc::clone(&self.txn_manager),
684 }
685 }
686
/// Returns this session's id.
pub fn session_id(&self) -> SessionId {
    self.session_id
}
691
/// Returns the base context this session operates on.
pub fn context(&self) -> &Arc<BaseCtx> {
    &self.context
}
696
697 pub fn has_active_transaction(&self) -> bool {
699 self.transactions
700 .lock()
701 .expect("transactions lock poisoned")
702 .contains_key(&self.session_id)
703 }
704
705 pub fn is_aborted(&self) -> bool {
707 self.transactions
708 .lock()
709 .expect("transactions lock poisoned")
710 .get(&self.session_id)
711 .map(|tx| tx.is_aborted)
712 .unwrap_or(false)
713 }
714
715 pub fn abort_transaction(&self) {
718 let mut guard = self
719 .transactions
720 .lock()
721 .expect("transactions lock poisoned");
722 if let Some(tx) = guard.get_mut(&self.session_id) {
723 tx.is_aborted = true;
724 }
725 }
726
727 pub fn begin_transaction(
729 &self,
730 staging: Arc<StagingCtx>,
731 ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
732 tracing::debug!(
733 "[BEGIN] begin_transaction called for session_id={}",
734 self.session_id
735 );
736 let mut guard = self
737 .transactions
738 .lock()
739 .expect("transactions lock poisoned");
740 tracing::debug!(
741 "[BEGIN] session_id={}, transactions map has {} entries",
742 self.session_id,
743 guard.len()
744 );
745 if guard.contains_key(&self.session_id) {
746 return Err(Error::InvalidArgumentError(
747 "a transaction is already in progress in this session".into(),
748 ));
749 }
750 guard.insert(
751 self.session_id,
752 SessionTransaction::new(
753 Arc::clone(&self.context),
754 staging,
755 Arc::clone(&self.txn_manager),
756 ),
757 );
758 tracing::debug!(
759 "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
760 self.session_id,
761 guard.len()
762 );
763 Ok(TransactionResult::Transaction {
764 kind: TransactionKind::Begin,
765 })
766 }
767
768 pub fn commit_transaction(
771 &self,
772 ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
773 tracing::trace!(
774 "[COMMIT] commit_transaction called for session {:?}",
775 self.session_id
776 );
777 let mut guard = self
778 .transactions
779 .lock()
780 .expect("transactions lock poisoned");
781 tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
782 let tx_opt = guard.remove(&self.session_id);
783 tracing::trace!(
784 "[COMMIT] commit_transaction remove returned: {}",
785 tx_opt.is_some()
786 );
787 let tx = tx_opt.ok_or_else(|| {
788 tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
789 Error::InvalidArgumentError(
790 "no transaction is currently in progress in this session".into(),
791 )
792 })?;
793 tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);
794
795 if tx.is_aborted {
797 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
798 let auto_commit_snapshot = TransactionSnapshot {
800 txn_id: TXN_ID_AUTO_COMMIT,
801 snapshot_id: tx.txn_manager.last_committed(),
802 };
803 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
804 tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
805 return Ok((
806 TransactionResult::Transaction {
807 kind: TransactionKind::Rollback,
808 },
809 Vec::new(),
810 ));
811 }
812
813 tracing::debug!(
816 "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
817 tx.snapshot.txn_id,
818 tx.accessed_tables.len()
819 );
820 for accessed_table_name in &tx.accessed_tables {
821 tracing::debug!(
822 "[COMMIT CONFLICT CHECK] Checking table '{}'",
823 accessed_table_name
824 );
825 if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
827 match self.context.table_id(accessed_table_name) {
829 Ok(current_table_id) => {
830 if current_table_id != snapshot_table_id {
832 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
833 let auto_commit_snapshot = TransactionSnapshot {
834 txn_id: TXN_ID_AUTO_COMMIT,
835 snapshot_id: tx.txn_manager.last_committed(),
836 };
837 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
838 return Err(Error::TransactionContextError(
839 "another transaction has dropped this table".into(),
840 ));
841 }
842 }
843 Err(_) => {
844 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
846 let auto_commit_snapshot = TransactionSnapshot {
847 txn_id: TXN_ID_AUTO_COMMIT,
848 snapshot_id: tx.txn_manager.last_committed(),
849 };
850 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
851 return Err(Error::TransactionContextError(
852 "another transaction has dropped this table".into(),
853 ));
854 }
855 }
856 }
857 }
858
859 let operations = tx.operations;
860 tracing::trace!(
861 "DEBUG commit_transaction: returning Commit with {} operations",
862 operations.len()
863 );
864
865 tx.txn_manager.mark_committed(tx.snapshot.txn_id);
866 TransactionContext::set_snapshot(&*self.context, tx.snapshot);
867
868 Ok((
869 TransactionResult::Transaction {
870 kind: TransactionKind::Commit,
871 },
872 operations,
873 ))
874 }
875
876 pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
878 let mut guard = self
879 .transactions
880 .lock()
881 .expect("transactions lock poisoned");
882 if let Some(tx) = guard.remove(&self.session_id) {
883 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
884 let auto_commit_snapshot = TransactionSnapshot {
886 txn_id: TXN_ID_AUTO_COMMIT,
887 snapshot_id: tx.txn_manager.last_committed(),
888 };
889 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
890 } else {
891 return Err(Error::InvalidArgumentError(
892 "no transaction is currently in progress in this session".into(),
893 ));
894 }
895 Ok(TransactionResult::Transaction {
896 kind: TransactionKind::Rollback,
897 })
898 }
899
900 pub fn execute_operation(
902 &self,
903 operation: PlanOperation,
904 ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
905 tracing::debug!(
906 "[EXECUTE_OP] execute_operation called for session_id={}",
907 self.session_id
908 );
909 if !self.has_active_transaction() {
910 return Err(Error::InvalidArgumentError(
912 "execute_operation called without active transaction".into(),
913 ));
914 }
915
916 let mut guard = self
918 .transactions
919 .lock()
920 .expect("transactions lock poisoned");
921 tracing::debug!(
922 "[EXECUTE_OP] session_id={}, transactions map has {} entries",
923 self.session_id,
924 guard.len()
925 );
926 let tx = guard
927 .get_mut(&self.session_id)
928 .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
929 tracing::debug!(
930 "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
931 self.session_id,
932 tx.snapshot.txn_id,
933 tx.accessed_tables.len()
934 );
935
936 let result = tx.execute_operation(operation);
937 if let Err(ref e) = result {
938 tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
939 tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
940 }
941 result
942 }
943}
944
945impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
946where
947 BaseCtx: TransactionContext,
948 StagingCtx: TransactionContext,
949{
950 fn drop(&mut self) {
951 match self.transactions.lock() {
954 Ok(mut guard) => {
955 if guard.remove(&self.session_id).is_some() {
956 eprintln!(
957 "Warning: TransactionSession dropped with active transaction - auto-rolling back"
958 );
959 }
960 }
961 Err(_) => {
962 tracing::trace!(
965 "Warning: TransactionSession dropped with poisoned transaction mutex"
966 );
967 }
968 }
969 }
970}
971
/// Creates sessions and owns the state they share.
pub struct TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// In-flight transactions of all sessions created by this manager, keyed
    /// by session id.
    transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Next session id to hand out; starts at 1.
    next_session_id: AtomicU64,
    /// Transaction-id manager shared with every session.
    txn_manager: Arc<TxnIdManager>,
}
982
983impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
984where
985 BaseCtx: TransactionContext + 'static,
986 StagingCtx: TransactionContext + 'static,
987{
988 pub fn new() -> Self {
989 Self {
990 transactions: Arc::new(Mutex::new(HashMap::new())),
991 next_session_id: AtomicU64::new(1),
992 txn_manager: Arc::new(TxnIdManager::new()),
993 }
994 }
995
996 pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
998 Self {
999 transactions: Arc::new(Mutex::new(HashMap::new())),
1000 next_session_id: AtomicU64::new(1),
1001 txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
1002 }
1003 }
1004
1005 pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
1007 Self {
1008 transactions: Arc::new(Mutex::new(HashMap::new())),
1009 next_session_id: AtomicU64::new(1),
1010 txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
1011 next_txn_id,
1012 last_committed,
1013 )),
1014 }
1015 }
1016
1017 pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
1019 let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
1020 tracing::debug!(
1021 "[TX_MANAGER] create_session: allocated session_id={}",
1022 session_id
1023 );
1024 TransactionSession::new(
1025 context,
1026 session_id,
1027 Arc::clone(&self.transactions),
1028 Arc::clone(&self.txn_manager),
1029 )
1030 }
1031
1032 pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1034 Arc::clone(&self.txn_manager)
1035 }
1036
1037 pub fn has_active_transaction(&self) -> bool {
1039 !self
1040 .transactions
1041 .lock()
1042 .expect("transactions lock poisoned")
1043 .is_empty()
1044 }
1045}
1046
1047impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
1048where
1049 BaseCtx: TransactionContext + 'static,
1050 StagingCtx: TransactionContext + 'static,
1051{
1052 fn default() -> Self {
1053 Self::new()
1054 }
1055}