1use std::collections::{HashMap, HashSet};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, Mutex};
12
13use arrow::array::RecordBatch;
14use llkv_column_map::types::TableId;
15use llkv_executor::{ExecutorRowBatch, SelectExecution};
16use llkv_expr::expr::Expr as LlkvExpr;
17use llkv_plan::plans::{
18 CreateIndexPlan, CreateTablePlan, DeletePlan, DropTablePlan, InsertPlan, PlanColumnSpec,
19 PlanOperation, SelectPlan, TruncatePlan, UpdatePlan,
20};
21use llkv_result::{Error, Result as LlkvResult};
22use llkv_storage::pager::Pager;
23use llkv_table::CatalogDdl;
24use simd_r_drive_entry_handle::EntryHandle;
25
26use crate::mvcc::{TXN_ID_AUTO_COMMIT, TransactionSnapshot, TxnId, TxnIdManager};
27use crate::types::{TransactionCatalogSnapshot, TransactionKind, TransactionResult};
28
/// Identifier of a [`TransactionSession`].
///
/// Allocated by [`TransactionManager::create_session`] and used as the key of
/// the transactions map shared by all sessions of one manager.
pub type TransactionSessionId = u64;
34
35fn select_plan_table_name(plan: &SelectPlan) -> Option<String> {
37 if plan.tables.len() == 1 {
38 Some(plan.tables[0].qualified_name())
39 } else {
40 None
41 }
42}
43
/// Operations a transaction needs from an execution context.
///
/// [`SessionTransaction`] is generic over two implementors of this trait: a
/// base context holding the tables that existed before the transaction began,
/// and a staging context holding tables created inside the transaction.
pub trait TransactionContext: CatalogDdl + Send + Sync {
    /// Pager type backing this context; it parameterises the results returned.
    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;

    /// Catalog snapshot type returned by [`TransactionContext::catalog_snapshot`].
    type Snapshot: TransactionCatalogSnapshot;

    /// Sets the transaction snapshot on this context.
    ///
    /// In this file it is called when a transaction begins, and on the base
    /// context again when the transaction commits, aborts, or rolls back.
    fn set_snapshot(&self, snapshot: TransactionSnapshot);

    /// Returns the context's transaction snapshot.
    /// NOTE(review): presumably the one last passed to `set_snapshot` — confirm.
    fn snapshot(&self) -> TransactionSnapshot;

    /// Returns the column specifications of `table_name`.
    ///
    /// Also used in this file as an existence probe for tables created in the
    /// current transaction (an `Err` is treated as "not found").
    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<PlanColumnSpec>>;

    /// Exports the rows of `table_name` as an executor row batch.
    /// NOTE(review): not called in this file; exact contents unverified.
    fn export_table_rows(&self, table_name: &str) -> LlkvResult<ExecutorRowBatch>;

    /// Returns record batches of `table_name`, optionally restricted by `filter`.
    /// NOTE(review): the name suggests row ids are included — confirm.
    fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> LlkvResult<Vec<RecordBatch>>;

    /// Executes a SELECT plan against this context.
    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;

    /// Creates the table described by `plan`.
    fn apply_create_table_plan(
        &self,
        plan: CreateTablePlan,
    ) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Drops the table described by `plan`.
    fn drop_table(&self, plan: DropTablePlan) -> LlkvResult<()>;

    /// Executes an INSERT plan.
    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Executes an UPDATE plan.
    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Executes a DELETE plan.
    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Executes a TRUNCATE plan.
    fn truncate(&self, plan: TruncatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Executes a CREATE INDEX plan.
    fn create_index(&self, plan: CreateIndexPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Appends `batches` to `table_name` and returns a count.
    /// NOTE(review): presumably the number of rows appended — confirm.
    fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> LlkvResult<usize>;

    /// Returns the names of the tables known to this context.
    fn table_names(&self) -> Vec<String>;

    /// Returns the id of `table_name`.
    ///
    /// At commit time an `Err` here, or an id that differs from the one in the
    /// BEGIN-time catalog snapshot, is treated as a concurrent drop.
    fn table_id(&self, table_name: &str) -> LlkvResult<TableId>;

    /// Captures the current catalog state; taken once when a transaction begins.
    fn catalog_snapshot(&self) -> Self::Snapshot;

    /// Checks deferred constraints before `_txn_id` commits.
    ///
    /// An `Err` aborts the commit. The default implementation accepts everything.
    fn validate_commit_constraints(&self, _txn_id: TxnId) -> LlkvResult<()> {
        Ok(())
    }

    /// Discards any per-transaction state kept for `_txn_id`.
    ///
    /// Called after commit, abort, and rollback. The default is a no-op.
    fn clear_transaction_state(&self, _txn_id: TxnId) {}
}
125
/// State of one in-progress transaction belonging to a session.
///
/// DML on tables that existed before the transaction runs directly on the base
/// context, which carries this transaction's snapshot. Tables created inside
/// the transaction live in the staging context, and the operations on them are
/// recorded in `operations` so the caller can replay them at commit.
pub struct SessionTransaction<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Snapshot returned by `TxnIdManager::begin_transaction` when this transaction started.
    snapshot: TransactionSnapshot,
    /// Context that holds tables created inside this transaction.
    staging: Arc<StagingCtx>,
    /// Operations returned from a successful commit for replay: CREATE TABLE,
    /// DROP TABLE of pre-existing tables, and DML on tables created here.
    operations: Vec<PlanOperation>,
    /// Tables already verified to exist; a cache for `ensure_table_exists`.
    staged_tables: HashSet<String>,
    /// Tables created inside this transaction.
    new_tables: HashSet<String>,
    /// Tables known not to exist: failed lookups and pre-existing tables dropped here.
    missing_tables: HashSet<String>,
    /// Lower-cased names of tables this transaction created or dropped; other
    /// sessions are refused CREATE/DROP on them while this transaction is open.
    locked_table_names: HashSet<String>,
    /// Base-context catalog captured at BEGIN; used for existence checks and
    /// for the commit-time table-id conflict check.
    catalog_snapshot: BaseCtx::Snapshot,
    /// Context holding the tables that existed before the transaction.
    base_context: Arc<BaseCtx>,
    /// Set when an operation fails; commit then turns into a rollback.
    is_aborted: bool,
    /// Transaction id allocator; told whether this transaction commits or aborts.
    txn_manager: Arc<TxnIdManager>,
    /// Pre-existing tables this transaction read or modified; checked at commit
    /// for concurrent drops.
    accessed_tables: HashSet<String>,
    /// Lower-cased referenced table -> lower-cased tables created in this
    /// transaction that declare a foreign key to it.
    transactional_foreign_keys: HashMap<String, Vec<String>>,
}
169
170impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
171where
172 BaseCtx: TransactionContext + 'static,
173 StagingCtx: TransactionContext + 'static,
174{
175 pub fn new(
176 base_context: Arc<BaseCtx>,
177 staging: Arc<StagingCtx>,
178 txn_manager: Arc<TxnIdManager>,
179 ) -> Self {
180 let catalog_snapshot = base_context.catalog_snapshot();
183
184 let snapshot = txn_manager.begin_transaction();
185 tracing::debug!(
186 "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
187 snapshot.txn_id,
188 snapshot.snapshot_id
189 );
190 TransactionContext::set_snapshot(&*base_context, snapshot);
191 TransactionContext::set_snapshot(&*staging, snapshot);
192
193 Self {
194 staging,
195 operations: Vec::new(),
196 staged_tables: HashSet::new(),
197 new_tables: HashSet::new(),
198 missing_tables: HashSet::new(),
199 locked_table_names: HashSet::new(),
200 catalog_snapshot,
201 base_context,
202 is_aborted: false,
203 accessed_tables: HashSet::new(),
204 snapshot,
205 txn_manager,
206 transactional_foreign_keys: HashMap::new(),
207 }
208 }
209
210 fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
213 tracing::trace!(
214 "[ENSURE] ensure_table_exists called for table='{}'",
215 table_name
216 );
217
218 if self.staged_tables.contains(table_name) {
220 tracing::trace!("[ENSURE] table already verified to exist");
221 return Ok(());
222 }
223
224 if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
226 {
227 self.missing_tables.insert(table_name.to_string());
228 return Err(Error::CatalogError(format!(
229 "Catalog Error: Table '{table_name}' does not exist"
230 )));
231 }
232
233 if self.missing_tables.contains(table_name) {
234 return Err(Error::CatalogError(format!(
235 "Catalog Error: Table '{table_name}' does not exist"
236 )));
237 }
238
239 if self.new_tables.contains(table_name) {
241 tracing::trace!("[ENSURE] Table was created in this transaction");
242 match self.staging.table_column_specs(table_name) {
244 Ok(_) => {
245 self.staged_tables.insert(table_name.to_string());
246 return Ok(());
247 }
248 Err(_) => {
249 return Err(Error::CatalogError(format!(
250 "Catalog Error: Table '{table_name}' was created but not found in staging"
251 )));
252 }
253 }
254 }
255
256 tracing::trace!(
258 "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
259 );
260 self.staged_tables.insert(table_name.to_string());
261 Ok(())
262 }
263
264 pub fn execute_select(
268 &mut self,
269 plan: SelectPlan,
270 ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
271 let table_name = select_plan_table_name(&plan).ok_or_else(|| {
273 Error::InvalidArgumentError(
274 "Transaction execute_select requires single-table query".into(),
275 )
276 })?;
277
278 self.ensure_table_exists(&table_name)?;
280
281 if self.new_tables.contains(&table_name) {
283 tracing::trace!(
284 "[SELECT] Reading from staging for new table '{}'",
285 table_name
286 );
287 return self.staging.execute_select(plan);
288 }
289
290 self.accessed_tables.insert(table_name.clone());
292
293 tracing::trace!(
296 "[SELECT] Reading from BASE with MVCC for existing table '{}'",
297 table_name
298 );
299 self.base_context.execute_select(plan).and_then(|exec| {
300 let schema = exec.schema();
304 let batches = exec.collect().unwrap_or_default();
305 let combined = if batches.is_empty() {
306 RecordBatch::new_empty(Arc::clone(&schema))
307 } else if batches.len() == 1 {
308 batches.into_iter().next().unwrap()
309 } else {
310 let refs: Vec<&RecordBatch> = batches.iter().collect();
311 arrow::compute::concat_batches(&schema, refs).map_err(|err| {
312 Error::Internal(format!("failed to concatenate batches: {err}"))
313 })?
314 };
315 Ok(SelectExecution::from_batch(
316 table_name,
317 Arc::clone(&schema),
318 combined,
319 ))
320 })
321 }
322
323 pub fn execute_operation(
325 &mut self,
326 operation: PlanOperation,
327 ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
328 tracing::trace!(
329 "[TX] SessionTransaction::execute_operation called, operation={:?}",
330 match &operation {
331 PlanOperation::Insert(p) => format!("INSERT({})", p.table),
332 PlanOperation::Update(p) => format!("UPDATE({})", p.table),
333 PlanOperation::Delete(p) => format!("DELETE({})", p.table),
334 PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
335 _ => "OTHER".to_string(),
336 }
337 );
338 if self.is_aborted {
340 return Err(Error::TransactionContextError(
341 "TransactionContext Error: transaction is aborted".into(),
342 ));
343 }
344
345 let result = match operation {
347 PlanOperation::CreateTable(ref plan) => {
348 for fk in &plan.foreign_keys {
352 let canonical_ref_table = fk.referenced_table.to_ascii_lowercase();
353 if !self.new_tables.contains(&canonical_ref_table)
355 && !self.catalog_snapshot.table_exists(&canonical_ref_table)
356 {
357 self.is_aborted = true;
358 return Err(Error::CatalogError(format!(
359 "Catalog Error: referenced table '{}' does not exist",
360 fk.referenced_table
361 )));
362 }
363 }
364
365 let mut staging_plan = plan.clone();
369 staging_plan.foreign_keys.clear();
370
371 match self.staging.apply_create_table_plan(staging_plan) {
372 Ok(result) => {
373 self.new_tables.insert(plan.name.clone());
375 self.missing_tables.remove(&plan.name);
376 self.staged_tables.insert(plan.name.clone());
377 self.locked_table_names
379 .insert(plan.name.to_ascii_lowercase());
380
381 for fk in &plan.foreign_keys {
383 let referenced_table = fk.referenced_table.to_ascii_lowercase();
384 self.transactional_foreign_keys
385 .entry(referenced_table)
386 .or_default()
387 .push(plan.name.to_ascii_lowercase());
388 }
389
390 self.operations
392 .push(PlanOperation::CreateTable(plan.clone()));
393 result.convert_pager_type()?
394 }
395 Err(e) => {
396 self.is_aborted = true;
397 return Err(e);
398 }
399 }
400 }
401 PlanOperation::DropTable(ref plan) => {
402 let canonical_name = plan.name.to_ascii_lowercase();
403
404 self.locked_table_names.insert(canonical_name.clone());
406
407 if self.new_tables.contains(&canonical_name) {
409 TransactionContext::drop_table(self.staging.as_ref(), plan.clone())?;
412 self.new_tables.remove(&canonical_name);
413 self.staged_tables.remove(&canonical_name);
414
415 self.transactional_foreign_keys.iter_mut().for_each(
417 |(_, referencing_tables)| {
418 referencing_tables.retain(|t| t != &canonical_name);
419 },
420 );
421 self.transactional_foreign_keys
423 .retain(|_, referencing_tables| !referencing_tables.is_empty());
424
425 self.operations.retain(|op| {
427 !matches!(op, PlanOperation::CreateTable(p) if p.name.to_ascii_lowercase() == canonical_name)
428 });
429 TransactionResult::NoOp
432 } else {
433 if !self.catalog_snapshot.table_exists(&canonical_name) && !plan.if_exists {
436 self.is_aborted = true;
437 return Err(Error::InvalidArgumentError(format!(
438 "table '{}' does not exist",
439 plan.name
440 )));
441 }
442
443 if self.catalog_snapshot.table_exists(&canonical_name) {
444 self.missing_tables.insert(canonical_name.clone());
446 self.staged_tables.remove(&canonical_name);
447 self.operations.push(PlanOperation::DropTable(plan.clone()));
449 }
450 TransactionResult::NoOp
451 }
452 }
453 PlanOperation::Insert(ref plan) => {
454 tracing::trace!(
455 "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
456 plan.table
457 );
458 if let Err(e) = self.ensure_table_exists(&plan.table) {
460 self.is_aborted = true;
461 return Err(e);
462 }
463
464 let is_new_table = self.new_tables.contains(&plan.table);
467 if !is_new_table {
469 self.accessed_tables.insert(plan.table.clone());
470 }
471 let result = if is_new_table {
472 tracing::trace!("[TX] INSERT into staging for new table");
473 self.staging.insert(plan.clone())
474 } else {
475 tracing::trace!(
476 "[TX] INSERT directly into BASE with txn_id={}",
477 self.snapshot.txn_id
478 );
479 self.base_context
481 .insert(plan.clone())
482 .and_then(|r| r.convert_pager_type())
483 };
484
485 match result {
486 Ok(result) => {
487 if is_new_table {
490 tracing::trace!(
491 "[TX] INSERT to new table - tracking for commit replay"
492 );
493 self.operations.push(PlanOperation::Insert(plan.clone()));
494 } else {
495 tracing::trace!(
496 "[TX] INSERT to existing table - already in BASE, no replay needed"
497 );
498 }
499 result
500 }
501 Err(e) => {
502 tracing::trace!(
503 "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
504 e
505 );
506 tracing::trace!("DEBUG setting is_aborted=true");
507 self.is_aborted = true;
508 return Err(e);
509 }
510 }
511 }
512 PlanOperation::Update(ref plan) => {
513 if let Err(e) = self.ensure_table_exists(&plan.table) {
514 self.is_aborted = true;
515 return Err(e);
516 }
517
518 let is_new_table = self.new_tables.contains(&plan.table);
521 if !is_new_table {
523 self.accessed_tables.insert(plan.table.clone());
524 }
525 let result = if is_new_table {
526 tracing::trace!("[TX] UPDATE in staging for new table");
527 self.staging.update(plan.clone())
528 } else {
529 tracing::trace!(
530 "[TX] UPDATE directly in BASE with txn_id={}",
531 self.snapshot.txn_id
532 );
533 self.base_context
534 .update(plan.clone())
535 .and_then(|r| r.convert_pager_type())
536 };
537
538 match result {
539 Ok(result) => {
540 if is_new_table {
542 tracing::trace!(
543 "[TX] UPDATE to new table - tracking for commit replay"
544 );
545 self.operations.push(PlanOperation::Update(plan.clone()));
546 } else {
547 tracing::trace!(
548 "[TX] UPDATE to existing table - already in BASE, no replay needed"
549 );
550 }
551 result
552 }
553 Err(e) => {
554 self.is_aborted = true;
555 return Err(e);
556 }
557 }
558 }
559 PlanOperation::Delete(ref plan) => {
560 tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
561 if let Err(e) = self.ensure_table_exists(&plan.table) {
562 tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
563 self.is_aborted = true;
564 return Err(e);
565 }
566
567 let is_new_table = self.new_tables.contains(&plan.table);
570 tracing::debug!("[DELETE] is_new_table={}", is_new_table);
571 if !is_new_table {
573 tracing::debug!(
574 "[DELETE] Tracking access to existing table '{}'",
575 plan.table
576 );
577 self.accessed_tables.insert(plan.table.clone());
578 }
579 let result = if is_new_table {
580 tracing::debug!("[DELETE] Deleting from staging for new table");
581 self.staging.delete(plan.clone())
582 } else {
583 tracing::debug!(
584 "[DELETE] Deleting from BASE with txn_id={}",
585 self.snapshot.txn_id
586 );
587 self.base_context
588 .delete(plan.clone())
589 .and_then(|r| r.convert_pager_type())
590 };
591
592 tracing::debug!(
593 "[DELETE] Result: {:?}",
594 result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
595 );
596 match result {
597 Ok(result) => {
598 if is_new_table {
600 tracing::trace!(
601 "[TX] DELETE from new table - tracking for commit replay"
602 );
603 self.operations.push(PlanOperation::Delete(plan.clone()));
604 } else {
605 tracing::trace!(
606 "[TX] DELETE from existing table - already in BASE, no replay needed"
607 );
608 }
609 result
610 }
611 Err(e) => {
612 self.is_aborted = true;
613 return Err(e);
614 }
615 }
616 }
617 PlanOperation::Truncate(ref plan) => {
618 tracing::debug!("[TRUNCATE] Starting truncate for table '{}'", plan.table);
619 if let Err(e) = self.ensure_table_exists(&plan.table) {
620 tracing::debug!("[TRUNCATE] ensure_table_exists failed: {}", e);
621 self.is_aborted = true;
622 return Err(e);
623 }
624
625 let is_new_table = self.new_tables.contains(&plan.table);
627 tracing::debug!("[TRUNCATE] is_new_table={}", is_new_table);
628 if !is_new_table {
630 tracing::debug!(
631 "[TRUNCATE] Tracking access to existing table '{}'",
632 plan.table
633 );
634 self.accessed_tables.insert(plan.table.clone());
635 }
636 let result = if is_new_table {
637 tracing::debug!("[TRUNCATE] Truncating staging for new table");
638 self.staging.truncate(plan.clone())
639 } else {
640 tracing::debug!(
641 "[TRUNCATE] Truncating BASE with txn_id={}",
642 self.snapshot.txn_id
643 );
644 self.base_context
645 .truncate(plan.clone())
646 .and_then(|r| r.convert_pager_type())
647 };
648
649 tracing::debug!(
650 "[TRUNCATE] Result: {:?}",
651 result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
652 );
653 match result {
654 Ok(result) => {
655 if is_new_table {
657 tracing::trace!(
658 "[TX] TRUNCATE on new table - tracking for commit replay"
659 );
660 self.operations.push(PlanOperation::Truncate(plan.clone()));
661 } else {
662 tracing::trace!(
663 "[TX] TRUNCATE on existing table - already in BASE, no replay needed"
664 );
665 }
666 result
667 }
668 Err(e) => {
669 self.is_aborted = true;
670 return Err(e);
671 }
672 }
673 }
674 PlanOperation::Select(ref plan) => {
675 let table_name = select_plan_table_name(plan).unwrap_or_default();
678 match self.execute_select(plan.clone()) {
679 Ok(staging_execution) => {
680 let schema = staging_execution.schema();
682 let batches = staging_execution.collect().unwrap_or_default();
683
684 let combined = if batches.is_empty() {
686 RecordBatch::new_empty(Arc::clone(&schema))
687 } else if batches.len() == 1 {
688 batches.into_iter().next().unwrap()
689 } else {
690 let refs: Vec<&RecordBatch> = batches.iter().collect();
691 arrow::compute::concat_batches(&schema, refs).map_err(|err| {
692 Error::Internal(format!("failed to concatenate batches: {err}"))
693 })?
694 };
695
696 let execution = SelectExecution::from_batch(
698 table_name.clone(),
699 Arc::clone(&schema),
700 combined,
701 );
702
703 TransactionResult::Select {
704 table_name,
705 schema,
706 execution,
707 }
708 }
709 Err(e) => {
710 return Err(e);
713 }
714 }
715 }
716 };
717
718 Ok(result)
719 }
720
721 pub fn operations(&self) -> &[PlanOperation] {
723 &self.operations
724 }
725}
726
/// Handle through which one session begins, runs, and ends transactions.
///
/// All sessions created by the same [`TransactionManager`] share the
/// `transactions` map, which holds at most one open transaction per session id.
pub struct TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Base context on which this session's transactions are started.
    context: Arc<BaseCtx>,
    /// Key of this session's entry in `transactions`.
    session_id: TransactionSessionId,
    /// Open transactions of every session sharing this map, keyed by session id.
    transactions:
        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Transaction id allocator shared with the manager and other sessions.
    txn_manager: Arc<TxnIdManager>,
}
741
742impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
743where
744 BaseCtx: TransactionContext + 'static,
745 StagingCtx: TransactionContext + 'static,
746{
747 pub fn new(
748 context: Arc<BaseCtx>,
749 session_id: TransactionSessionId,
750 transactions: Arc<
751 Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>,
752 >,
753 txn_manager: Arc<TxnIdManager>,
754 ) -> Self {
755 Self {
756 context,
757 session_id,
758 transactions,
759 txn_manager,
760 }
761 }
762
763 pub fn clone_session(&self) -> Self {
766 Self {
767 context: Arc::clone(&self.context),
768 session_id: self.session_id,
769 transactions: Arc::clone(&self.transactions),
770 txn_manager: Arc::clone(&self.txn_manager),
771 }
772 }
773
774 pub fn session_id(&self) -> TransactionSessionId {
776 self.session_id
777 }
778
779 pub fn context(&self) -> &Arc<BaseCtx> {
781 &self.context
782 }
783
784 pub fn has_active_transaction(&self) -> bool {
786 self.transactions
787 .lock()
788 .expect("transactions lock poisoned")
789 .contains_key(&self.session_id)
790 }
791
792 pub fn is_aborted(&self) -> bool {
794 self.transactions
795 .lock()
796 .expect("transactions lock poisoned")
797 .get(&self.session_id)
798 .map(|tx| tx.is_aborted)
799 .unwrap_or(false)
800 }
801
802 pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
805 self.transactions
806 .lock()
807 .expect("transactions lock poisoned")
808 .get(&self.session_id)
809 .map(|tx| tx.new_tables.contains(table_name))
810 .unwrap_or(false)
811 }
812
813 pub fn table_column_specs_from_transaction(
816 &self,
817 table_name: &str,
818 ) -> Option<Vec<PlanColumnSpec>> {
819 let guard = self
820 .transactions
821 .lock()
822 .expect("transactions lock poisoned");
823
824 let tx = guard.get(&self.session_id)?;
825 if !tx.new_tables.contains(table_name) {
826 return None;
827 }
828
829 tx.staging.table_column_specs(table_name).ok()
831 }
832
833 pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
836 let canonical = referenced_table.to_ascii_lowercase();
837 let guard = self
838 .transactions
839 .lock()
840 .expect("transactions lock poisoned");
841
842 let tx = match guard.get(&self.session_id) {
843 Some(tx) => tx,
844 None => return Vec::new(),
845 };
846
847 tx.transactional_foreign_keys
848 .get(&canonical)
849 .cloned()
850 .unwrap_or_else(Vec::new)
851 }
852
853 pub fn has_table_locked_by_other_session(&self, table_name: &str) -> bool {
856 let canonical = table_name.to_ascii_lowercase();
857 let guard = self
858 .transactions
859 .lock()
860 .expect("transactions lock poisoned");
861
862 for (session_id, tx) in guard.iter() {
863 if *session_id == self.session_id {
865 continue;
866 }
867
868 if tx.locked_table_names.contains(&canonical) {
870 return true;
871 }
872 }
873
874 false
875 }
876
877 pub fn abort_transaction(&self) {
880 let mut guard = self
881 .transactions
882 .lock()
883 .expect("transactions lock poisoned");
884 if let Some(tx) = guard.get_mut(&self.session_id) {
885 tx.is_aborted = true;
886 }
887 }
888
889 pub fn begin_transaction(
891 &self,
892 staging: Arc<StagingCtx>,
893 ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
894 tracing::debug!(
895 "[BEGIN] begin_transaction called for session_id={}",
896 self.session_id
897 );
898 let mut guard = self
899 .transactions
900 .lock()
901 .expect("transactions lock poisoned");
902 tracing::debug!(
903 "[BEGIN] session_id={}, transactions map has {} entries",
904 self.session_id,
905 guard.len()
906 );
907 if guard.contains_key(&self.session_id) {
908 return Err(Error::InvalidArgumentError(
909 "a transaction is already in progress in this session".into(),
910 ));
911 }
912 guard.insert(
913 self.session_id,
914 SessionTransaction::new(
915 Arc::clone(&self.context),
916 staging,
917 Arc::clone(&self.txn_manager),
918 ),
919 );
920 tracing::debug!(
921 "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
922 self.session_id,
923 guard.len()
924 );
925 Ok(TransactionResult::Transaction {
926 kind: TransactionKind::Begin,
927 })
928 }
929
930 pub fn commit_transaction(
933 &self,
934 ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
935 tracing::trace!(
936 "[COMMIT] commit_transaction called for session {:?}",
937 self.session_id
938 );
939 let mut guard = self
940 .transactions
941 .lock()
942 .expect("transactions lock poisoned");
943 tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
944 let tx_opt = guard.remove(&self.session_id);
945 tracing::trace!(
946 "[COMMIT] commit_transaction remove returned: {}",
947 tx_opt.is_some()
948 );
949 let tx = tx_opt.ok_or_else(|| {
950 tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
951 Error::InvalidArgumentError(
952 "no transaction is currently in progress in this session".into(),
953 )
954 })?;
955 tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);
956
957 if tx.is_aborted {
959 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
960 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
961 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
962 let auto_commit_snapshot = TransactionSnapshot {
964 txn_id: TXN_ID_AUTO_COMMIT,
965 snapshot_id: tx.txn_manager.last_committed(),
966 };
967 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
968 tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
969 return Ok((
970 TransactionResult::Transaction {
971 kind: TransactionKind::Rollback,
972 },
973 Vec::new(),
974 ));
975 }
976
977 tracing::debug!(
980 "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
981 tx.snapshot.txn_id,
982 tx.accessed_tables.len()
983 );
984 for accessed_table_name in &tx.accessed_tables {
985 tracing::debug!(
986 "[COMMIT CONFLICT CHECK] Checking table '{}'",
987 accessed_table_name
988 );
989 if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
991 match self.context.table_id(accessed_table_name) {
993 Ok(current_table_id) => {
994 if current_table_id != snapshot_table_id {
996 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
997 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
998 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
999 let auto_commit_snapshot = TransactionSnapshot {
1000 txn_id: TXN_ID_AUTO_COMMIT,
1001 snapshot_id: tx.txn_manager.last_committed(),
1002 };
1003 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1004 return Err(Error::TransactionContextError(
1005 "another transaction has dropped this table".into(),
1006 ));
1007 }
1008 }
1009 Err(_) => {
1010 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1012 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1013 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1014 let auto_commit_snapshot = TransactionSnapshot {
1015 txn_id: TXN_ID_AUTO_COMMIT,
1016 snapshot_id: tx.txn_manager.last_committed(),
1017 };
1018 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1019 return Err(Error::TransactionContextError(
1020 "another transaction has dropped this table".into(),
1021 ));
1022 }
1023 }
1024 }
1025 }
1026
1027 if let Err(err) = tx
1028 .base_context
1029 .validate_commit_constraints(tx.snapshot.txn_id)
1030 {
1031 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1032 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1033 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1034 let auto_commit_snapshot = TransactionSnapshot {
1035 txn_id: TXN_ID_AUTO_COMMIT,
1036 snapshot_id: tx.txn_manager.last_committed(),
1037 };
1038 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1039 let wrapped = match err {
1040 Error::ConstraintError(msg) => Error::TransactionContextError(format!(
1041 "TransactionContext Error: constraint violation: {msg}"
1042 )),
1043 other => other,
1044 };
1045 return Err(wrapped);
1046 }
1047
1048 let operations = tx.operations;
1049 tracing::trace!(
1050 "DEBUG commit_transaction: returning Commit with {} operations",
1051 operations.len()
1052 );
1053
1054 tx.txn_manager.mark_committed(tx.snapshot.txn_id);
1055 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1056 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1057 TransactionContext::set_snapshot(&*self.context, tx.snapshot);
1058
1059 Ok((
1060 TransactionResult::Transaction {
1061 kind: TransactionKind::Commit,
1062 },
1063 operations,
1064 ))
1065 }
1066
1067 pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
1069 let mut guard = self
1070 .transactions
1071 .lock()
1072 .expect("transactions lock poisoned");
1073 if let Some(tx) = guard.remove(&self.session_id) {
1074 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1075 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1076 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1077 let auto_commit_snapshot = TransactionSnapshot {
1079 txn_id: TXN_ID_AUTO_COMMIT,
1080 snapshot_id: tx.txn_manager.last_committed(),
1081 };
1082 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1083 } else {
1084 return Err(Error::InvalidArgumentError(
1085 "no transaction is currently in progress in this session".into(),
1086 ));
1087 }
1088 Ok(TransactionResult::Transaction {
1089 kind: TransactionKind::Rollback,
1090 })
1091 }
1092
1093 pub fn execute_operation(
1095 &self,
1096 operation: PlanOperation,
1097 ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
1098 tracing::debug!(
1099 "[EXECUTE_OP] execute_operation called for session_id={}",
1100 self.session_id
1101 );
1102 if !self.has_active_transaction() {
1103 return Err(Error::InvalidArgumentError(
1105 "execute_operation called without active transaction".into(),
1106 ));
1107 }
1108
1109 if let PlanOperation::CreateTable(ref plan) = operation {
1111 let guard = self
1112 .transactions
1113 .lock()
1114 .expect("transactions lock poisoned");
1115
1116 let canonical_name = plan.name.to_ascii_lowercase();
1117
1118 for (other_session_id, other_tx) in guard.iter() {
1120 if *other_session_id != self.session_id
1121 && other_tx.locked_table_names.contains(&canonical_name)
1122 {
1123 return Err(Error::TransactionContextError(format!(
1124 "table '{}' is locked by another active transaction",
1125 plan.name
1126 )));
1127 }
1128 }
1129 drop(guard); }
1131
1132 if let PlanOperation::DropTable(ref plan) = operation {
1134 let guard = self
1135 .transactions
1136 .lock()
1137 .expect("transactions lock poisoned");
1138
1139 let canonical_name = plan.name.to_ascii_lowercase();
1140
1141 for (other_session_id, other_tx) in guard.iter() {
1143 if *other_session_id != self.session_id
1144 && other_tx.locked_table_names.contains(&canonical_name)
1145 {
1146 return Err(Error::TransactionContextError(format!(
1147 "table '{}' is locked by another active transaction",
1148 plan.name
1149 )));
1150 }
1151 }
1152 drop(guard); }
1154
1155 let mut guard = self
1157 .transactions
1158 .lock()
1159 .expect("transactions lock poisoned");
1160 tracing::debug!(
1161 "[EXECUTE_OP] session_id={}, transactions map has {} entries",
1162 self.session_id,
1163 guard.len()
1164 );
1165 let tx = guard
1166 .get_mut(&self.session_id)
1167 .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
1168 tracing::debug!(
1169 "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
1170 self.session_id,
1171 tx.snapshot.txn_id,
1172 tx.accessed_tables.len()
1173 );
1174
1175 let result = tx.execute_operation(operation);
1176 if let Err(ref e) = result {
1177 tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
1178 tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
1179 }
1180 result
1181 }
1182}
1183
1184impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
1185where
1186 BaseCtx: TransactionContext,
1187 StagingCtx: TransactionContext,
1188{
1189 fn drop(&mut self) {
1190 match self.transactions.lock() {
1193 Ok(mut guard) => {
1194 if guard.remove(&self.session_id).is_some() {
1195 eprintln!(
1196 "Warning: TransactionSession dropped with active transaction - auto-rolling back"
1197 );
1198 }
1199 }
1200 Err(_) => {
1201 tracing::trace!(
1204 "Warning: TransactionSession dropped with poisoned transaction mutex"
1205 );
1206 }
1207 }
1208 }
1209}
1210
/// Creates transaction sessions and owns the state they share.
pub struct TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Open transactions keyed by session id; shared with every session created here.
    transactions:
        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Next session id to hand out; every constructor starts it at 1.
    next_session_id: AtomicU64,
    /// Transaction id allocator shared with every session created here.
    txn_manager: Arc<TxnIdManager>,
}
1225
1226impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
1227where
1228 BaseCtx: TransactionContext + 'static,
1229 StagingCtx: TransactionContext + 'static,
1230{
1231 pub fn new() -> Self {
1232 Self {
1233 transactions: Arc::new(Mutex::new(HashMap::new())),
1234 next_session_id: AtomicU64::new(1),
1235 txn_manager: Arc::new(TxnIdManager::new()),
1236 }
1237 }
1238
1239 pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
1241 Self {
1242 transactions: Arc::new(Mutex::new(HashMap::new())),
1243 next_session_id: AtomicU64::new(1),
1244 txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
1245 }
1246 }
1247
1248 pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
1250 Self {
1251 transactions: Arc::new(Mutex::new(HashMap::new())),
1252 next_session_id: AtomicU64::new(1),
1253 txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
1254 next_txn_id,
1255 last_committed,
1256 )),
1257 }
1258 }
1259
1260 pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
1262 let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
1263 tracing::debug!(
1264 "[TX_MANAGER] create_session: allocated session_id={}",
1265 session_id
1266 );
1267 TransactionSession::new(
1268 context,
1269 session_id,
1270 Arc::clone(&self.transactions),
1271 Arc::clone(&self.txn_manager),
1272 )
1273 }
1274
1275 pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1277 Arc::clone(&self.txn_manager)
1278 }
1279
1280 pub fn has_active_transaction(&self) -> bool {
1282 !self
1283 .transactions
1284 .lock()
1285 .expect("transactions lock poisoned")
1286 .is_empty()
1287 }
1288}
1289
1290impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
1291where
1292 BaseCtx: TransactionContext + 'static,
1293 StagingCtx: TransactionContext + 'static,
1294{
1295 fn default() -> Self {
1296 Self::new()
1297 }
1298}