1use std::collections::{HashMap, HashSet};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, Mutex};
12
13use arrow::array::RecordBatch;
14use llkv_column_map::types::TableId;
15use llkv_executor::{ExecutorRowBatch, SelectExecution};
16use llkv_expr::expr::Expr as LlkvExpr;
17use llkv_plan::plans::{
18 CreateIndexPlan, CreateTablePlan, DeletePlan, DropTablePlan, InsertPlan, PlanColumnSpec,
19 PlanOperation, SelectPlan, UpdatePlan,
20};
21use llkv_result::{Error, Result as LlkvResult};
22use llkv_storage::pager::Pager;
23use llkv_table::CatalogDdl;
24use simd_r_drive_entry_handle::EntryHandle;
25
26use crate::mvcc::{TXN_ID_AUTO_COMMIT, TransactionSnapshot, TxnId, TxnIdManager};
27use crate::types::{TransactionCatalogSnapshot, TransactionKind, TransactionResult};
28
29pub type TransactionSessionId = u64;
34
35fn select_plan_table_name(plan: &SelectPlan) -> Option<String> {
37 if plan.tables.len() == 1 {
38 Some(plan.tables[0].qualified_name())
39 } else {
40 None
41 }
42}
43
/// Operations a storage/catalog context must provide so that it can take part
/// in a session transaction.
///
/// In this file the trait is implemented by two roles: a "base" context that
/// holds existing tables, and a "staging" context that holds tables created
/// inside a still-open transaction.
pub trait TransactionContext: CatalogDdl + Send + Sync {
    /// Pager type that parameterizes the executions and results produced by
    /// this context.
    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;

    /// Catalog snapshot type returned by [`TransactionContext::catalog_snapshot`].
    type Snapshot: TransactionCatalogSnapshot;

    /// Installs the MVCC snapshot this context should operate under.
    ///
    /// Called when a transaction begins, and again when it ends to install an
    /// auto-commit snapshot (or, on successful commit, the committed
    /// transaction's snapshot).
    fn set_snapshot(&self, snapshot: TransactionSnapshot);

    /// Returns the snapshot currently associated with this context
    /// (presumably the one last passed to `set_snapshot` — confirm against
    /// implementors).
    fn snapshot(&self) -> TransactionSnapshot;

    /// Returns the column specifications of `table_name`.
    ///
    /// Callers in this file treat an `Err` as "table not present in this
    /// context".
    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<PlanColumnSpec>>;

    /// Exports the rows of `table_name` as an executor row batch.
    /// NOTE(review): not exercised in this file; exact semantics are defined
    /// by implementors.
    fn export_table_rows(&self, table_name: &str) -> LlkvResult<ExecutorRowBatch>;

    /// Returns record batches for `table_name`, optionally restricted by
    /// `filter`.
    /// NOTE(review): the name suggests the batches carry row ids — confirm
    /// against implementors; not exercised in this file.
    fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> LlkvResult<Vec<RecordBatch>>;

    /// Executes a `SELECT` plan against this context.
    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;

    /// Applies a `CREATE TABLE` plan to this context.
    fn apply_create_table_plan(
        &self,
        plan: CreateTablePlan,
    ) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Drops the table described by `plan` from this context.
    fn drop_table(&self, plan: DropTablePlan) -> LlkvResult<()>;

    /// Executes an `INSERT` plan against this context.
    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Executes an `UPDATE` plan against this context.
    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Executes a `DELETE` plan against this context.
    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Executes a `CREATE INDEX` plan against this context.
    fn create_index(&self, plan: CreateIndexPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Appends `batches` to `table_name`.
    /// NOTE(review): the returned `usize` is presumably the number of rows
    /// appended — confirm against implementors; not exercised in this file.
    fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> LlkvResult<usize>;

    /// Returns the names of the tables known to this context.
    fn table_names(&self) -> Vec<String>;

    /// Resolves `table_name` to its table id.
    ///
    /// Used at commit time to detect that a table accessed by the transaction
    /// was dropped or replaced in the meantime.
    fn table_id(&self, table_name: &str) -> LlkvResult<TableId>;

    /// Captures a snapshot of the catalog; taken once when a transaction starts.
    fn catalog_snapshot(&self) -> Self::Snapshot;

    /// Hook invoked during commit, before the transaction is marked committed.
    ///
    /// Returning an error makes the commit fail and the transaction abort.
    /// The default implementation accepts every commit.
    fn validate_commit_constraints(&self, _txn_id: TxnId) -> LlkvResult<()> {
        Ok(())
    }

    /// Hook invoked when a transaction ends (commit, abort, or rollback) so the
    /// context can discard any per-transaction state. The default is a no-op.
    fn clear_transaction_state(&self, _txn_id: TxnId) {}
}
122
/// State of a single in-flight transaction owned by one session.
///
/// Operations on tables that already exist go straight to the base context
/// under the transaction's snapshot; operations on tables created inside the
/// transaction go to the staging context and are recorded for later replay.
pub struct SessionTransaction<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// MVCC snapshot obtained from the transaction id manager when the
    /// transaction began.
    snapshot: TransactionSnapshot,
    /// Context that holds tables created within this transaction.
    staging: Arc<StagingCtx>,
    /// Operations recorded during the transaction and handed back to the
    /// caller on commit.
    operations: Vec<PlanOperation>,
    /// Tables whose existence has already been verified in this transaction.
    staged_tables: HashSet<String>,
    /// Tables created within this transaction.
    new_tables: HashSet<String>,
    /// Tables known not to exist (never existed, or dropped in this transaction).
    missing_tables: HashSet<String>,
    /// Lowercased names of tables created or dropped by this transaction;
    /// other sessions consult this set before running DDL on the same name.
    locked_table_names: HashSet<String>,
    /// Catalog snapshot captured from the base context when the transaction
    /// started.
    catalog_snapshot: BaseCtx::Snapshot,
    /// Context that holds the pre-existing tables.
    base_context: Arc<BaseCtx>,
    /// Set once an operation fails; further operations are rejected and a
    /// commit turns into a rollback.
    is_aborted: bool,
    /// Shared transaction id manager used to begin/commit/abort the transaction.
    txn_manager: Arc<TxnIdManager>,
    /// Pre-existing tables read or written by this transaction; checked for
    /// conflicts at commit time.
    accessed_tables: HashSet<String>,
    /// Maps a lowercased referenced table name to the lowercased names of
    /// tables created in this transaction that declare a foreign key to it.
    transactional_foreign_keys: HashMap<String, Vec<String>>,
}
166
167impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
168where
169 BaseCtx: TransactionContext + 'static,
170 StagingCtx: TransactionContext + 'static,
171{
172 pub fn new(
173 base_context: Arc<BaseCtx>,
174 staging: Arc<StagingCtx>,
175 txn_manager: Arc<TxnIdManager>,
176 ) -> Self {
177 let catalog_snapshot = base_context.catalog_snapshot();
180
181 let snapshot = txn_manager.begin_transaction();
182 tracing::debug!(
183 "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
184 snapshot.txn_id,
185 snapshot.snapshot_id
186 );
187 TransactionContext::set_snapshot(&*base_context, snapshot);
188 TransactionContext::set_snapshot(&*staging, snapshot);
189
190 Self {
191 staging,
192 operations: Vec::new(),
193 staged_tables: HashSet::new(),
194 new_tables: HashSet::new(),
195 missing_tables: HashSet::new(),
196 locked_table_names: HashSet::new(),
197 catalog_snapshot,
198 base_context,
199 is_aborted: false,
200 accessed_tables: HashSet::new(),
201 snapshot,
202 txn_manager,
203 transactional_foreign_keys: HashMap::new(),
204 }
205 }
206
207 fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
210 tracing::trace!(
211 "[ENSURE] ensure_table_exists called for table='{}'",
212 table_name
213 );
214
215 if self.staged_tables.contains(table_name) {
217 tracing::trace!("[ENSURE] table already verified to exist");
218 return Ok(());
219 }
220
221 if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
223 {
224 self.missing_tables.insert(table_name.to_string());
225 return Err(Error::CatalogError(format!(
226 "Catalog Error: Table '{table_name}' does not exist"
227 )));
228 }
229
230 if self.missing_tables.contains(table_name) {
231 return Err(Error::CatalogError(format!(
232 "Catalog Error: Table '{table_name}' does not exist"
233 )));
234 }
235
236 if self.new_tables.contains(table_name) {
238 tracing::trace!("[ENSURE] Table was created in this transaction");
239 match self.staging.table_column_specs(table_name) {
241 Ok(_) => {
242 self.staged_tables.insert(table_name.to_string());
243 return Ok(());
244 }
245 Err(_) => {
246 return Err(Error::CatalogError(format!(
247 "Catalog Error: Table '{table_name}' was created but not found in staging"
248 )));
249 }
250 }
251 }
252
253 tracing::trace!(
255 "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
256 );
257 self.staged_tables.insert(table_name.to_string());
258 Ok(())
259 }
260
261 pub fn execute_select(
265 &mut self,
266 plan: SelectPlan,
267 ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
268 let table_name = select_plan_table_name(&plan).ok_or_else(|| {
270 Error::InvalidArgumentError(
271 "Transaction execute_select requires single-table query".into(),
272 )
273 })?;
274
275 self.ensure_table_exists(&table_name)?;
277
278 if self.new_tables.contains(&table_name) {
280 tracing::trace!(
281 "[SELECT] Reading from staging for new table '{}'",
282 table_name
283 );
284 return self.staging.execute_select(plan);
285 }
286
287 self.accessed_tables.insert(table_name.clone());
289
290 tracing::trace!(
293 "[SELECT] Reading from BASE with MVCC for existing table '{}'",
294 table_name
295 );
296 self.base_context.execute_select(plan).and_then(|exec| {
297 let schema = exec.schema();
301 let batches = exec.collect().unwrap_or_default();
302 let combined = if batches.is_empty() {
303 RecordBatch::new_empty(Arc::clone(&schema))
304 } else if batches.len() == 1 {
305 batches.into_iter().next().unwrap()
306 } else {
307 let refs: Vec<&RecordBatch> = batches.iter().collect();
308 arrow::compute::concat_batches(&schema, refs).map_err(|err| {
309 Error::Internal(format!("failed to concatenate batches: {err}"))
310 })?
311 };
312 Ok(SelectExecution::from_batch(
313 table_name,
314 Arc::clone(&schema),
315 combined,
316 ))
317 })
318 }
319
320 pub fn execute_operation(
322 &mut self,
323 operation: PlanOperation,
324 ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
325 tracing::trace!(
326 "[TX] SessionTransaction::execute_operation called, operation={:?}",
327 match &operation {
328 PlanOperation::Insert(p) => format!("INSERT({})", p.table),
329 PlanOperation::Update(p) => format!("UPDATE({})", p.table),
330 PlanOperation::Delete(p) => format!("DELETE({})", p.table),
331 PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
332 _ => "OTHER".to_string(),
333 }
334 );
335 if self.is_aborted {
337 return Err(Error::TransactionContextError(
338 "TransactionContext Error: transaction is aborted".into(),
339 ));
340 }
341
342 let result = match operation {
344 PlanOperation::CreateTable(ref plan) => {
345 for fk in &plan.foreign_keys {
349 let canonical_ref_table = fk.referenced_table.to_ascii_lowercase();
350 if !self.new_tables.contains(&canonical_ref_table)
352 && !self.catalog_snapshot.table_exists(&canonical_ref_table)
353 {
354 self.is_aborted = true;
355 return Err(Error::CatalogError(format!(
356 "Catalog Error: referenced table '{}' does not exist",
357 fk.referenced_table
358 )));
359 }
360 }
361
362 let mut staging_plan = plan.clone();
366 staging_plan.foreign_keys.clear();
367
368 match self.staging.apply_create_table_plan(staging_plan) {
369 Ok(result) => {
370 self.new_tables.insert(plan.name.clone());
372 self.missing_tables.remove(&plan.name);
373 self.staged_tables.insert(plan.name.clone());
374 self.locked_table_names
376 .insert(plan.name.to_ascii_lowercase());
377
378 for fk in &plan.foreign_keys {
380 let referenced_table = fk.referenced_table.to_ascii_lowercase();
381 self.transactional_foreign_keys
382 .entry(referenced_table)
383 .or_default()
384 .push(plan.name.to_ascii_lowercase());
385 }
386
387 self.operations
389 .push(PlanOperation::CreateTable(plan.clone()));
390 result.convert_pager_type()?
391 }
392 Err(e) => {
393 self.is_aborted = true;
394 return Err(e);
395 }
396 }
397 }
398 PlanOperation::DropTable(ref plan) => {
399 let canonical_name = plan.name.to_ascii_lowercase();
400
401 self.locked_table_names.insert(canonical_name.clone());
403
404 if self.new_tables.contains(&canonical_name) {
406 TransactionContext::drop_table(self.staging.as_ref(), plan.clone())?;
409 self.new_tables.remove(&canonical_name);
410 self.staged_tables.remove(&canonical_name);
411
412 self.transactional_foreign_keys.iter_mut().for_each(
414 |(_, referencing_tables)| {
415 referencing_tables.retain(|t| t != &canonical_name);
416 },
417 );
418 self.transactional_foreign_keys
420 .retain(|_, referencing_tables| !referencing_tables.is_empty());
421
422 self.operations.retain(|op| {
424 !matches!(op, PlanOperation::CreateTable(p) if p.name.to_ascii_lowercase() == canonical_name)
425 });
426 TransactionResult::NoOp
429 } else {
430 if !self.catalog_snapshot.table_exists(&canonical_name) && !plan.if_exists {
433 self.is_aborted = true;
434 return Err(Error::InvalidArgumentError(format!(
435 "table '{}' does not exist",
436 plan.name
437 )));
438 }
439
440 if self.catalog_snapshot.table_exists(&canonical_name) {
441 self.missing_tables.insert(canonical_name.clone());
443 self.staged_tables.remove(&canonical_name);
444 self.operations.push(PlanOperation::DropTable(plan.clone()));
446 }
447 TransactionResult::NoOp
448 }
449 }
450 PlanOperation::Insert(ref plan) => {
451 tracing::trace!(
452 "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
453 plan.table
454 );
455 if let Err(e) = self.ensure_table_exists(&plan.table) {
457 self.is_aborted = true;
458 return Err(e);
459 }
460
461 let is_new_table = self.new_tables.contains(&plan.table);
464 if !is_new_table {
466 self.accessed_tables.insert(plan.table.clone());
467 }
468 let result = if is_new_table {
469 tracing::trace!("[TX] INSERT into staging for new table");
470 self.staging.insert(plan.clone())
471 } else {
472 tracing::trace!(
473 "[TX] INSERT directly into BASE with txn_id={}",
474 self.snapshot.txn_id
475 );
476 self.base_context
478 .insert(plan.clone())
479 .and_then(|r| r.convert_pager_type())
480 };
481
482 match result {
483 Ok(result) => {
484 if is_new_table {
487 tracing::trace!(
488 "[TX] INSERT to new table - tracking for commit replay"
489 );
490 self.operations.push(PlanOperation::Insert(plan.clone()));
491 } else {
492 tracing::trace!(
493 "[TX] INSERT to existing table - already in BASE, no replay needed"
494 );
495 }
496 result
497 }
498 Err(e) => {
499 tracing::trace!(
500 "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
501 e
502 );
503 tracing::trace!("DEBUG setting is_aborted=true");
504 self.is_aborted = true;
505 return Err(e);
506 }
507 }
508 }
509 PlanOperation::Update(ref plan) => {
510 if let Err(e) = self.ensure_table_exists(&plan.table) {
511 self.is_aborted = true;
512 return Err(e);
513 }
514
515 let is_new_table = self.new_tables.contains(&plan.table);
518 if !is_new_table {
520 self.accessed_tables.insert(plan.table.clone());
521 }
522 let result = if is_new_table {
523 tracing::trace!("[TX] UPDATE in staging for new table");
524 self.staging.update(plan.clone())
525 } else {
526 tracing::trace!(
527 "[TX] UPDATE directly in BASE with txn_id={}",
528 self.snapshot.txn_id
529 );
530 self.base_context
531 .update(plan.clone())
532 .and_then(|r| r.convert_pager_type())
533 };
534
535 match result {
536 Ok(result) => {
537 if is_new_table {
539 tracing::trace!(
540 "[TX] UPDATE to new table - tracking for commit replay"
541 );
542 self.operations.push(PlanOperation::Update(plan.clone()));
543 } else {
544 tracing::trace!(
545 "[TX] UPDATE to existing table - already in BASE, no replay needed"
546 );
547 }
548 result
549 }
550 Err(e) => {
551 self.is_aborted = true;
552 return Err(e);
553 }
554 }
555 }
556 PlanOperation::Delete(ref plan) => {
557 tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
558 if let Err(e) = self.ensure_table_exists(&plan.table) {
559 tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
560 self.is_aborted = true;
561 return Err(e);
562 }
563
564 let is_new_table = self.new_tables.contains(&plan.table);
567 tracing::debug!("[DELETE] is_new_table={}", is_new_table);
568 if !is_new_table {
570 tracing::debug!(
571 "[DELETE] Tracking access to existing table '{}'",
572 plan.table
573 );
574 self.accessed_tables.insert(plan.table.clone());
575 }
576 let result = if is_new_table {
577 tracing::debug!("[DELETE] Deleting from staging for new table");
578 self.staging.delete(plan.clone())
579 } else {
580 tracing::debug!(
581 "[DELETE] Deleting from BASE with txn_id={}",
582 self.snapshot.txn_id
583 );
584 self.base_context
585 .delete(plan.clone())
586 .and_then(|r| r.convert_pager_type())
587 };
588
589 tracing::debug!(
590 "[DELETE] Result: {:?}",
591 result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
592 );
593 match result {
594 Ok(result) => {
595 if is_new_table {
597 tracing::trace!(
598 "[TX] DELETE from new table - tracking for commit replay"
599 );
600 self.operations.push(PlanOperation::Delete(plan.clone()));
601 } else {
602 tracing::trace!(
603 "[TX] DELETE from existing table - already in BASE, no replay needed"
604 );
605 }
606 result
607 }
608 Err(e) => {
609 self.is_aborted = true;
610 return Err(e);
611 }
612 }
613 }
614 PlanOperation::Select(ref plan) => {
615 let table_name = select_plan_table_name(plan).unwrap_or_default();
618 match self.execute_select(plan.clone()) {
619 Ok(staging_execution) => {
620 let schema = staging_execution.schema();
622 let batches = staging_execution.collect().unwrap_or_default();
623
624 let combined = if batches.is_empty() {
626 RecordBatch::new_empty(Arc::clone(&schema))
627 } else if batches.len() == 1 {
628 batches.into_iter().next().unwrap()
629 } else {
630 let refs: Vec<&RecordBatch> = batches.iter().collect();
631 arrow::compute::concat_batches(&schema, refs).map_err(|err| {
632 Error::Internal(format!("failed to concatenate batches: {err}"))
633 })?
634 };
635
636 let execution = SelectExecution::from_batch(
638 table_name.clone(),
639 Arc::clone(&schema),
640 combined,
641 );
642
643 TransactionResult::Select {
644 table_name,
645 schema,
646 execution,
647 }
648 }
649 Err(e) => {
650 return Err(e);
653 }
654 }
655 }
656 };
657
658 Ok(result)
659 }
660
661 pub fn operations(&self) -> &[PlanOperation] {
663 &self.operations
664 }
665}
666
/// Handle through which one session begins, drives, and ends its transaction.
///
/// The transaction state itself lives in a map shared between sessions and
/// keyed by session id; this handle holds only references to shared state.
pub struct TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Base context this session operates on.
    context: Arc<BaseCtx>,
    /// Identifier of this session; the key into `transactions`.
    session_id: TransactionSessionId,
    /// Active transactions of all sessions that share this map.
    transactions:
        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Shared transaction id manager.
    txn_manager: Arc<TxnIdManager>,
}
681
682impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
683where
684 BaseCtx: TransactionContext + 'static,
685 StagingCtx: TransactionContext + 'static,
686{
687 pub fn new(
688 context: Arc<BaseCtx>,
689 session_id: TransactionSessionId,
690 transactions: Arc<
691 Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>,
692 >,
693 txn_manager: Arc<TxnIdManager>,
694 ) -> Self {
695 Self {
696 context,
697 session_id,
698 transactions,
699 txn_manager,
700 }
701 }
702
703 pub fn clone_session(&self) -> Self {
706 Self {
707 context: Arc::clone(&self.context),
708 session_id: self.session_id,
709 transactions: Arc::clone(&self.transactions),
710 txn_manager: Arc::clone(&self.txn_manager),
711 }
712 }
713
714 pub fn session_id(&self) -> TransactionSessionId {
716 self.session_id
717 }
718
719 pub fn context(&self) -> &Arc<BaseCtx> {
721 &self.context
722 }
723
724 pub fn has_active_transaction(&self) -> bool {
726 self.transactions
727 .lock()
728 .expect("transactions lock poisoned")
729 .contains_key(&self.session_id)
730 }
731
732 pub fn is_aborted(&self) -> bool {
734 self.transactions
735 .lock()
736 .expect("transactions lock poisoned")
737 .get(&self.session_id)
738 .map(|tx| tx.is_aborted)
739 .unwrap_or(false)
740 }
741
742 pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
745 self.transactions
746 .lock()
747 .expect("transactions lock poisoned")
748 .get(&self.session_id)
749 .map(|tx| tx.new_tables.contains(table_name))
750 .unwrap_or(false)
751 }
752
753 pub fn table_column_specs_from_transaction(
756 &self,
757 table_name: &str,
758 ) -> Option<Vec<PlanColumnSpec>> {
759 let guard = self
760 .transactions
761 .lock()
762 .expect("transactions lock poisoned");
763
764 let tx = guard.get(&self.session_id)?;
765 if !tx.new_tables.contains(table_name) {
766 return None;
767 }
768
769 tx.staging.table_column_specs(table_name).ok()
771 }
772
773 pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
776 let canonical = referenced_table.to_ascii_lowercase();
777 let guard = self
778 .transactions
779 .lock()
780 .expect("transactions lock poisoned");
781
782 let tx = match guard.get(&self.session_id) {
783 Some(tx) => tx,
784 None => return Vec::new(),
785 };
786
787 tx.transactional_foreign_keys
788 .get(&canonical)
789 .cloned()
790 .unwrap_or_else(Vec::new)
791 }
792
793 pub fn has_table_locked_by_other_session(&self, table_name: &str) -> bool {
796 let canonical = table_name.to_ascii_lowercase();
797 let guard = self
798 .transactions
799 .lock()
800 .expect("transactions lock poisoned");
801
802 for (session_id, tx) in guard.iter() {
803 if *session_id == self.session_id {
805 continue;
806 }
807
808 if tx.locked_table_names.contains(&canonical) {
810 return true;
811 }
812 }
813
814 false
815 }
816
817 pub fn abort_transaction(&self) {
820 let mut guard = self
821 .transactions
822 .lock()
823 .expect("transactions lock poisoned");
824 if let Some(tx) = guard.get_mut(&self.session_id) {
825 tx.is_aborted = true;
826 }
827 }
828
829 pub fn begin_transaction(
831 &self,
832 staging: Arc<StagingCtx>,
833 ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
834 tracing::debug!(
835 "[BEGIN] begin_transaction called for session_id={}",
836 self.session_id
837 );
838 let mut guard = self
839 .transactions
840 .lock()
841 .expect("transactions lock poisoned");
842 tracing::debug!(
843 "[BEGIN] session_id={}, transactions map has {} entries",
844 self.session_id,
845 guard.len()
846 );
847 if guard.contains_key(&self.session_id) {
848 return Err(Error::InvalidArgumentError(
849 "a transaction is already in progress in this session".into(),
850 ));
851 }
852 guard.insert(
853 self.session_id,
854 SessionTransaction::new(
855 Arc::clone(&self.context),
856 staging,
857 Arc::clone(&self.txn_manager),
858 ),
859 );
860 tracing::debug!(
861 "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
862 self.session_id,
863 guard.len()
864 );
865 Ok(TransactionResult::Transaction {
866 kind: TransactionKind::Begin,
867 })
868 }
869
870 pub fn commit_transaction(
873 &self,
874 ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
875 tracing::trace!(
876 "[COMMIT] commit_transaction called for session {:?}",
877 self.session_id
878 );
879 let mut guard = self
880 .transactions
881 .lock()
882 .expect("transactions lock poisoned");
883 tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
884 let tx_opt = guard.remove(&self.session_id);
885 tracing::trace!(
886 "[COMMIT] commit_transaction remove returned: {}",
887 tx_opt.is_some()
888 );
889 let tx = tx_opt.ok_or_else(|| {
890 tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
891 Error::InvalidArgumentError(
892 "no transaction is currently in progress in this session".into(),
893 )
894 })?;
895 tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);
896
897 if tx.is_aborted {
899 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
900 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
901 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
902 let auto_commit_snapshot = TransactionSnapshot {
904 txn_id: TXN_ID_AUTO_COMMIT,
905 snapshot_id: tx.txn_manager.last_committed(),
906 };
907 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
908 tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
909 return Ok((
910 TransactionResult::Transaction {
911 kind: TransactionKind::Rollback,
912 },
913 Vec::new(),
914 ));
915 }
916
917 tracing::debug!(
920 "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
921 tx.snapshot.txn_id,
922 tx.accessed_tables.len()
923 );
924 for accessed_table_name in &tx.accessed_tables {
925 tracing::debug!(
926 "[COMMIT CONFLICT CHECK] Checking table '{}'",
927 accessed_table_name
928 );
929 if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
931 match self.context.table_id(accessed_table_name) {
933 Ok(current_table_id) => {
934 if current_table_id != snapshot_table_id {
936 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
937 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
938 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
939 let auto_commit_snapshot = TransactionSnapshot {
940 txn_id: TXN_ID_AUTO_COMMIT,
941 snapshot_id: tx.txn_manager.last_committed(),
942 };
943 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
944 return Err(Error::TransactionContextError(
945 "another transaction has dropped this table".into(),
946 ));
947 }
948 }
949 Err(_) => {
950 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
952 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
953 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
954 let auto_commit_snapshot = TransactionSnapshot {
955 txn_id: TXN_ID_AUTO_COMMIT,
956 snapshot_id: tx.txn_manager.last_committed(),
957 };
958 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
959 return Err(Error::TransactionContextError(
960 "another transaction has dropped this table".into(),
961 ));
962 }
963 }
964 }
965 }
966
967 if let Err(err) = tx
968 .base_context
969 .validate_commit_constraints(tx.snapshot.txn_id)
970 {
971 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
972 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
973 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
974 let auto_commit_snapshot = TransactionSnapshot {
975 txn_id: TXN_ID_AUTO_COMMIT,
976 snapshot_id: tx.txn_manager.last_committed(),
977 };
978 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
979 let wrapped = match err {
980 Error::ConstraintError(msg) => Error::TransactionContextError(format!(
981 "TransactionContext Error: constraint violation: {msg}"
982 )),
983 other => other,
984 };
985 return Err(wrapped);
986 }
987
988 let operations = tx.operations;
989 tracing::trace!(
990 "DEBUG commit_transaction: returning Commit with {} operations",
991 operations.len()
992 );
993
994 tx.txn_manager.mark_committed(tx.snapshot.txn_id);
995 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
996 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
997 TransactionContext::set_snapshot(&*self.context, tx.snapshot);
998
999 Ok((
1000 TransactionResult::Transaction {
1001 kind: TransactionKind::Commit,
1002 },
1003 operations,
1004 ))
1005 }
1006
1007 pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
1009 let mut guard = self
1010 .transactions
1011 .lock()
1012 .expect("transactions lock poisoned");
1013 if let Some(tx) = guard.remove(&self.session_id) {
1014 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1015 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1016 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1017 let auto_commit_snapshot = TransactionSnapshot {
1019 txn_id: TXN_ID_AUTO_COMMIT,
1020 snapshot_id: tx.txn_manager.last_committed(),
1021 };
1022 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1023 } else {
1024 return Err(Error::InvalidArgumentError(
1025 "no transaction is currently in progress in this session".into(),
1026 ));
1027 }
1028 Ok(TransactionResult::Transaction {
1029 kind: TransactionKind::Rollback,
1030 })
1031 }
1032
1033 pub fn execute_operation(
1035 &self,
1036 operation: PlanOperation,
1037 ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
1038 tracing::debug!(
1039 "[EXECUTE_OP] execute_operation called for session_id={}",
1040 self.session_id
1041 );
1042 if !self.has_active_transaction() {
1043 return Err(Error::InvalidArgumentError(
1045 "execute_operation called without active transaction".into(),
1046 ));
1047 }
1048
1049 if let PlanOperation::CreateTable(ref plan) = operation {
1051 let guard = self
1052 .transactions
1053 .lock()
1054 .expect("transactions lock poisoned");
1055
1056 let canonical_name = plan.name.to_ascii_lowercase();
1057
1058 for (other_session_id, other_tx) in guard.iter() {
1060 if *other_session_id != self.session_id
1061 && other_tx.locked_table_names.contains(&canonical_name)
1062 {
1063 return Err(Error::TransactionContextError(format!(
1064 "table '{}' is locked by another active transaction",
1065 plan.name
1066 )));
1067 }
1068 }
1069 drop(guard); }
1071
1072 if let PlanOperation::DropTable(ref plan) = operation {
1074 let guard = self
1075 .transactions
1076 .lock()
1077 .expect("transactions lock poisoned");
1078
1079 let canonical_name = plan.name.to_ascii_lowercase();
1080
1081 for (other_session_id, other_tx) in guard.iter() {
1083 if *other_session_id != self.session_id
1084 && other_tx.locked_table_names.contains(&canonical_name)
1085 {
1086 return Err(Error::TransactionContextError(format!(
1087 "table '{}' is locked by another active transaction",
1088 plan.name
1089 )));
1090 }
1091 }
1092 drop(guard); }
1094
1095 let mut guard = self
1097 .transactions
1098 .lock()
1099 .expect("transactions lock poisoned");
1100 tracing::debug!(
1101 "[EXECUTE_OP] session_id={}, transactions map has {} entries",
1102 self.session_id,
1103 guard.len()
1104 );
1105 let tx = guard
1106 .get_mut(&self.session_id)
1107 .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
1108 tracing::debug!(
1109 "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
1110 self.session_id,
1111 tx.snapshot.txn_id,
1112 tx.accessed_tables.len()
1113 );
1114
1115 let result = tx.execute_operation(operation);
1116 if let Err(ref e) = result {
1117 tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
1118 tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
1119 }
1120 result
1121 }
1122}
1123
1124impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
1125where
1126 BaseCtx: TransactionContext,
1127 StagingCtx: TransactionContext,
1128{
1129 fn drop(&mut self) {
1130 match self.transactions.lock() {
1133 Ok(mut guard) => {
1134 if guard.remove(&self.session_id).is_some() {
1135 eprintln!(
1136 "Warning: TransactionSession dropped with active transaction - auto-rolling back"
1137 );
1138 }
1139 }
1140 Err(_) => {
1141 tracing::trace!(
1144 "Warning: TransactionSession dropped with poisoned transaction mutex"
1145 );
1146 }
1147 }
1148 }
1149}
1150
/// Creates sessions and owns the state they share: the map of active
/// transactions, the session id counter, and the transaction id manager.
pub struct TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Active transactions keyed by session id; shared with every session
    /// created by this manager.
    transactions:
        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Next session id to hand out; starts at 1.
    next_session_id: AtomicU64,
    /// Transaction id manager shared with every session created by this manager.
    txn_manager: Arc<TxnIdManager>,
}
1165
1166impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
1167where
1168 BaseCtx: TransactionContext + 'static,
1169 StagingCtx: TransactionContext + 'static,
1170{
1171 pub fn new() -> Self {
1172 Self {
1173 transactions: Arc::new(Mutex::new(HashMap::new())),
1174 next_session_id: AtomicU64::new(1),
1175 txn_manager: Arc::new(TxnIdManager::new()),
1176 }
1177 }
1178
1179 pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
1181 Self {
1182 transactions: Arc::new(Mutex::new(HashMap::new())),
1183 next_session_id: AtomicU64::new(1),
1184 txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
1185 }
1186 }
1187
1188 pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
1190 Self {
1191 transactions: Arc::new(Mutex::new(HashMap::new())),
1192 next_session_id: AtomicU64::new(1),
1193 txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
1194 next_txn_id,
1195 last_committed,
1196 )),
1197 }
1198 }
1199
1200 pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
1202 let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
1203 tracing::debug!(
1204 "[TX_MANAGER] create_session: allocated session_id={}",
1205 session_id
1206 );
1207 TransactionSession::new(
1208 context,
1209 session_id,
1210 Arc::clone(&self.transactions),
1211 Arc::clone(&self.txn_manager),
1212 )
1213 }
1214
1215 pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1217 Arc::clone(&self.txn_manager)
1218 }
1219
1220 pub fn has_active_transaction(&self) -> bool {
1222 !self
1223 .transactions
1224 .lock()
1225 .expect("transactions lock poisoned")
1226 .is_empty()
1227 }
1228}
1229
1230impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
1231where
1232 BaseCtx: TransactionContext + 'static,
1233 StagingCtx: TransactionContext + 'static,
1234{
1235 fn default() -> Self {
1236 Self::new()
1237 }
1238}