use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use arrow::array::RecordBatch;
use llkv_column_map::types::TableId;
use llkv_executor::{ExecutorRowBatch, SelectExecution};
use llkv_expr::expr::Expr as LlkvExpr;
use llkv_plan::plans::{
    CreateIndexPlan, CreateTablePlan, DeletePlan, DropTablePlan, InsertPlan, PlanColumnSpec,
    PlanOperation, SelectPlan, TruncatePlan, UpdatePlan,
};
use llkv_result::{Error, Result as LlkvResult};
use llkv_storage::pager::Pager;
use llkv_table::CatalogDdl;
use simd_r_drive_entry_handle::EntryHandle;

use crate::mvcc::{TXN_ID_AUTO_COMMIT, TransactionSnapshot, TxnId, TxnIdManager};
use crate::types::{TransactionCatalogSnapshot, TransactionKind, TransactionResult};

pub type TransactionSessionId = u64;

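/// Returns the qualified name of the single table referenced by `plan`, or
/// `None` if the plan references zero or more than one table.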
fn select_plan_table_name(plan: &SelectPlan) -> Option<String> {
    if plan.tables.len() == 1 {
        Some(plan.tables[0].qualified_name())
    } else {
        None
    }
}

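/// Abstraction over a catalog/storage context that transactional sessions drive:
/// it exposes snapshot management, DDL/DML entry points, and the commit hooks
/// (`validate_commit_constraints`, `clear_transaction_state`).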
pub trait TransactionContext: CatalogDdl + Send + Sync {
    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;

    type Snapshot: TransactionCatalogSnapshot;

    fn set_snapshot(&self, snapshot: TransactionSnapshot);

    fn snapshot(&self) -> TransactionSnapshot;

    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<PlanColumnSpec>>;

    fn export_table_rows(&self, table_name: &str) -> LlkvResult<ExecutorRowBatch>;

    fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> LlkvResult<Vec<RecordBatch>>;

    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;

    fn apply_create_table_plan(
        &self,
        plan: CreateTablePlan,
    ) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn drop_table(&self, plan: DropTablePlan) -> LlkvResult<()>;

    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn truncate(&self, plan: TruncatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn create_index(&self, plan: CreateIndexPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> LlkvResult<usize>;

    fn table_names(&self) -> Vec<String>;

    fn table_id(&self, table_name: &str) -> LlkvResult<TableId>;

    fn catalog_snapshot(&self) -> Self::Snapshot;

    fn validate_commit_constraints(&self, _txn_id: TxnId) -> LlkvResult<()> {
        Ok(())
    }

    fn clear_transaction_state(&self, _txn_id: TxnId) {}
}

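/// State for one in-flight transaction: its MVCC snapshot, the staging context
/// holding tables created inside the transaction, the operations recorded for
/// commit replay, and bookkeeping sets used for lock and conflict checks.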
pub struct SessionTransaction<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// MVCC snapshot (transaction id plus snapshot id) assigned at BEGIN.
    snapshot: TransactionSnapshot,
    /// Scratch context holding tables created inside this transaction.
    staging: Arc<StagingCtx>,
    /// Operations recorded for replay against the base context on commit.
    operations: Vec<PlanOperation>,
    /// Tables already verified as visible to this transaction.
    staged_tables: HashSet<String>,
    /// Tables created inside this transaction.
    new_tables: HashSet<String>,
    /// Tables known to be absent (never existed, or dropped in this transaction).
    missing_tables: HashSet<String>,
    /// Canonical (lowercased) table names locked by this transaction's DDL.
    locked_table_names: HashSet<String>,
    /// Catalog state captured when the transaction began.
    catalog_snapshot: BaseCtx::Snapshot,
    /// Shared base context that existing tables are read from and written to.
    base_context: Arc<BaseCtx>,
    /// Set when an operation fails; further operations are rejected.
    is_aborted: bool,
    /// Shared allocator/tracker for transaction ids and commit status.
    txn_manager: Arc<TxnIdManager>,
    /// Pre-existing tables touched by this transaction, checked for conflicts at commit.
    accessed_tables: HashSet<String>,
    /// Referenced table name -> tables created in this transaction that reference it.
    transactional_foreign_keys: HashMap<String, Vec<String>>,
}

impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    pub fn new(
        base_context: Arc<BaseCtx>,
        staging: Arc<StagingCtx>,
        txn_manager: Arc<TxnIdManager>,
    ) -> Self {
        let catalog_snapshot = base_context.catalog_snapshot();

        let snapshot = txn_manager.begin_transaction();
        tracing::debug!(
            "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
            snapshot.txn_id,
            snapshot.snapshot_id
        );
        TransactionContext::set_snapshot(&*base_context, snapshot);
        TransactionContext::set_snapshot(&*staging, snapshot);

        Self {
            staging,
            operations: Vec::new(),
            staged_tables: HashSet::new(),
            new_tables: HashSet::new(),
            missing_tables: HashSet::new(),
            locked_table_names: HashSet::new(),
            catalog_snapshot,
            base_context,
            is_aborted: false,
            accessed_tables: HashSet::new(),
            snapshot,
            txn_manager,
            transactional_foreign_keys: HashMap::new(),
        }
    }

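    /// Verifies that `table_name` is visible to this transaction, consulting the
    /// catalog snapshot, tables created in this transaction, and tables already
    /// marked missing (e.g. dropped earlier in the transaction).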
    fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
        tracing::trace!(
            "[ENSURE] ensure_table_exists called for table='{}'",
            table_name
        );

        if self.staged_tables.contains(table_name) {
            tracing::trace!("[ENSURE] table already verified to exist");
            return Ok(());
        }

        if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
        {
            self.missing_tables.insert(table_name.to_string());
            return Err(Error::CatalogError(format!(
                "Catalog Error: Table '{table_name}' does not exist"
            )));
        }

        if self.missing_tables.contains(table_name) {
            return Err(Error::CatalogError(format!(
                "Catalog Error: Table '{table_name}' does not exist"
            )));
        }

        if self.new_tables.contains(table_name) {
            tracing::trace!("[ENSURE] Table was created in this transaction");
            match self.staging.table_column_specs(table_name) {
                Ok(_) => {
                    self.staged_tables.insert(table_name.to_string());
                    return Ok(());
                }
                Err(_) => {
                    return Err(Error::CatalogError(format!(
                        "Catalog Error: Table '{table_name}' was created but not found in staging"
                    )));
                }
            }
        }

        tracing::trace!(
            "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
        );
        self.staged_tables.insert(table_name.to_string());
        Ok(())
    }

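    /// Executes a single-table SELECT. Tables created in this transaction are
    /// read from the staging context; existing tables are read from the base
    /// context, whose MVCC snapshot filters out rows from other transactions.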
    pub fn execute_select(
        &mut self,
        plan: SelectPlan,
    ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
        let table_name = select_plan_table_name(&plan).ok_or_else(|| {
            Error::InvalidArgumentError(
                "Transaction execute_select requires single-table query".into(),
            )
        })?;

        self.ensure_table_exists(&table_name)?;

        if self.new_tables.contains(&table_name) {
            tracing::trace!(
                "[SELECT] Reading from staging for new table '{}'",
                table_name
            );
            return self.staging.execute_select(plan);
        }

        self.accessed_tables.insert(table_name.clone());

        tracing::trace!(
            "[SELECT] Reading from BASE with MVCC for existing table '{}'",
            table_name
        );
        self.base_context.execute_select(plan).and_then(|exec| {
            let schema = exec.schema();
            let batches = exec.collect().unwrap_or_default();
            let combined = if batches.is_empty() {
                RecordBatch::new_empty(Arc::clone(&schema))
            } else if batches.len() == 1 {
                batches.into_iter().next().unwrap()
            } else {
                let refs: Vec<&RecordBatch> = batches.iter().collect();
                arrow::compute::concat_batches(&schema, refs).map_err(|err| {
                    Error::Internal(format!("failed to concatenate batches: {err}"))
                })?
            };
            Ok(SelectExecution::from_batch(
                table_name,
                Arc::clone(&schema),
                combined,
            ))
        })
    }

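    /// Applies one plan operation inside the transaction, routing it to the
    /// staging or base context and recording whatever must be replayed on commit.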
    pub fn execute_operation(
        &mut self,
        operation: PlanOperation,
    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
        tracing::trace!(
            "[TX] SessionTransaction::execute_operation called, operation={:?}",
            match &operation {
                PlanOperation::Insert(p) => format!("INSERT({})", p.table),
                PlanOperation::Update(p) => format!("UPDATE({})", p.table),
                PlanOperation::Delete(p) => format!("DELETE({})", p.table),
                PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
                _ => "OTHER".to_string(),
            }
        );
        if self.is_aborted {
            return Err(Error::TransactionContextError(
                "TransactionContext Error: transaction is aborted".into(),
            ));
        }

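        // Route each operation: tables created inside this transaction live in the
        // staging context and are replayed on commit, while existing tables are
        // written directly to the base context under this transaction's txn_id.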
        let result = match operation {
            PlanOperation::CreateTable(ref plan) => {
                for fk in &plan.foreign_keys {
                    let canonical_ref_table = fk.referenced_table.to_ascii_lowercase();
                    if !self.new_tables.contains(&canonical_ref_table)
                        && !self.catalog_snapshot.table_exists(&canonical_ref_table)
                    {
                        self.is_aborted = true;
                        return Err(Error::CatalogError(format!(
                            "Catalog Error: referenced table '{}' does not exist",
                            fk.referenced_table
                        )));
                    }
                }

                let mut staging_plan = plan.clone();
                staging_plan.foreign_keys.clear();

                match self.staging.apply_create_table_plan(staging_plan) {
                    Ok(result) => {
                        self.new_tables.insert(plan.name.clone());
                        self.missing_tables.remove(&plan.name);
                        self.staged_tables.insert(plan.name.clone());
                        self.locked_table_names
                            .insert(plan.name.to_ascii_lowercase());

                        for fk in &plan.foreign_keys {
                            let referenced_table = fk.referenced_table.to_ascii_lowercase();
                            self.transactional_foreign_keys
                                .entry(referenced_table)
                                .or_default()
                                .push(plan.name.to_ascii_lowercase());
                        }

                        self.operations
                            .push(PlanOperation::CreateTable(plan.clone()));
                        result.convert_pager_type()?
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::DropTable(ref plan) => {
                let canonical_name = plan.name.to_ascii_lowercase();

                self.locked_table_names.insert(canonical_name.clone());

                if self.new_tables.contains(&canonical_name) {
                    TransactionContext::drop_table(self.staging.as_ref(), plan.clone())?;
                    self.new_tables.remove(&canonical_name);
                    self.staged_tables.remove(&canonical_name);

                    self.transactional_foreign_keys.iter_mut().for_each(
                        |(_, referencing_tables)| {
                            referencing_tables.retain(|t| t != &canonical_name);
                        },
                    );
                    self.transactional_foreign_keys
                        .retain(|_, referencing_tables| !referencing_tables.is_empty());

                    self.operations.retain(|op| {
                        !matches!(op, PlanOperation::CreateTable(p) if p.name.to_ascii_lowercase() == canonical_name)
                    });
                    TransactionResult::NoOp
                } else {
                    if !self.catalog_snapshot.table_exists(&canonical_name) && !plan.if_exists {
                        self.is_aborted = true;
                        return Err(Error::InvalidArgumentError(format!(
                            "table '{}' does not exist",
                            plan.name
                        )));
                    }

                    if self.catalog_snapshot.table_exists(&canonical_name) {
                        self.missing_tables.insert(canonical_name.clone());
                        self.staged_tables.remove(&canonical_name);
                        self.operations.push(PlanOperation::DropTable(plan.clone()));
                    }
                    TransactionResult::NoOp
                }
            }
            PlanOperation::Insert(ref plan) => {
                tracing::trace!(
                    "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
                    plan.table
                );
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                if !is_new_table {
                    self.accessed_tables.insert(plan.table.clone());
                }
                let result = if is_new_table {
                    tracing::trace!("[TX] INSERT into staging for new table");
                    self.staging.insert(plan.clone())
                } else {
                    tracing::trace!(
                        "[TX] INSERT directly into BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    self.base_context
                        .insert(plan.clone())
                        .and_then(|r| r.convert_pager_type())
                };

                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] INSERT to new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Insert(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] INSERT to existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        tracing::trace!(
                            "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
                            e
                        );
                        tracing::trace!("DEBUG setting is_aborted=true");
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Update(ref plan) => {
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                if !is_new_table {
                    self.accessed_tables.insert(plan.table.clone());
                }
                let result = if is_new_table {
                    tracing::trace!("[TX] UPDATE in staging for new table");
                    self.staging.update(plan.clone())
                } else {
                    tracing::trace!(
                        "[TX] UPDATE directly in BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    self.base_context
                        .update(plan.clone())
                        .and_then(|r| r.convert_pager_type())
                };

                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] UPDATE to new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Update(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] UPDATE to existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Delete(ref plan) => {
                tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                tracing::debug!("[DELETE] is_new_table={}", is_new_table);
                if !is_new_table {
                    tracing::debug!(
                        "[DELETE] Tracking access to existing table '{}'",
                        plan.table
                    );
                    self.accessed_tables.insert(plan.table.clone());
                }
                let result = if is_new_table {
                    tracing::debug!("[DELETE] Deleting from staging for new table");
                    self.staging.delete(plan.clone())
                } else {
                    tracing::debug!(
                        "[DELETE] Deleting from BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    self.base_context
                        .delete(plan.clone())
                        .and_then(|r| r.convert_pager_type())
                };

                tracing::debug!(
                    "[DELETE] Result: {:?}",
                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
                );
                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] DELETE from new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Delete(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] DELETE from existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Truncate(ref plan) => {
                tracing::debug!("[TRUNCATE] Starting truncate for table '{}'", plan.table);
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    tracing::debug!("[TRUNCATE] ensure_table_exists failed: {}", e);
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                tracing::debug!("[TRUNCATE] is_new_table={}", is_new_table);
                if !is_new_table {
                    tracing::debug!(
                        "[TRUNCATE] Tracking access to existing table '{}'",
                        plan.table
                    );
                    self.accessed_tables.insert(plan.table.clone());
                }
                let result = if is_new_table {
                    tracing::debug!("[TRUNCATE] Truncating staging for new table");
                    self.staging.truncate(plan.clone())
                } else {
                    tracing::debug!(
                        "[TRUNCATE] Truncating BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    self.base_context
                        .truncate(plan.clone())
                        .and_then(|r| r.convert_pager_type())
                };

                tracing::debug!(
                    "[TRUNCATE] Result: {:?}",
                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
                );
                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] TRUNCATE on new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Truncate(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] TRUNCATE on existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Select(plan) => {
                let table_name = select_plan_table_name(plan.as_ref()).unwrap_or_default();
                let plan = *plan;
                match self.execute_select(plan) {
                    Ok(staging_execution) => {
                        let schema = staging_execution.schema();
                        let batches = staging_execution.collect().unwrap_or_default();

                        let combined = if batches.is_empty() {
                            RecordBatch::new_empty(Arc::clone(&schema))
                        } else if batches.len() == 1 {
                            batches.into_iter().next().unwrap()
                        } else {
                            let refs: Vec<&RecordBatch> = batches.iter().collect();
                            arrow::compute::concat_batches(&schema, refs).map_err(|err| {
                                Error::Internal(format!("failed to concatenate batches: {err}"))
                            })?
                        };

                        let execution = SelectExecution::from_batch(
                            table_name.clone(),
                            Arc::clone(&schema),
                            combined,
                        );

                        TransactionResult::Select {
                            table_name,
                            schema,
                            execution,
                        }
                    }
                    Err(e) => {
                        return Err(e);
                    }
                }
            }
        };

        Ok(result)
    }

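    /// Operations recorded so far that must be replayed against the base
    /// context when the transaction commits.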
    pub fn operations(&self) -> &[PlanOperation] {
        &self.operations
    }
}

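/// Per-session handle used to begin, drive, and finish transactions. All
/// sessions share one map of active transactions keyed by session id, plus a
/// shared `TxnIdManager` that allocates transaction ids and snapshots.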
pub struct TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    context: Arc<BaseCtx>,
    session_id: TransactionSessionId,
    transactions:
        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    txn_manager: Arc<TxnIdManager>,
}

impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    pub fn new(
        context: Arc<BaseCtx>,
        session_id: TransactionSessionId,
        transactions: Arc<
            Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>,
        >,
        txn_manager: Arc<TxnIdManager>,
    ) -> Self {
        Self {
            context,
            session_id,
            transactions,
            txn_manager,
        }
    }

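    /// Returns a new handle for the same session, sharing the underlying
    /// transaction map and txn-id manager.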
    pub fn clone_session(&self) -> Self {
        Self {
            context: Arc::clone(&self.context),
            session_id: self.session_id,
            transactions: Arc::clone(&self.transactions),
            txn_manager: Arc::clone(&self.txn_manager),
        }
    }

    pub fn session_id(&self) -> TransactionSessionId {
        self.session_id
    }

    pub fn context(&self) -> &Arc<BaseCtx> {
        &self.context
    }

    pub fn has_active_transaction(&self) -> bool {
        self.transactions
            .lock()
            .expect("transactions lock poisoned")
            .contains_key(&self.session_id)
    }

    pub fn is_aborted(&self) -> bool {
        self.transactions
            .lock()
            .expect("transactions lock poisoned")
            .get(&self.session_id)
            .map(|tx| tx.is_aborted)
            .unwrap_or(false)
    }

    pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
        self.transactions
            .lock()
            .expect("transactions lock poisoned")
            .get(&self.session_id)
            .map(|tx| tx.new_tables.contains(table_name))
            .unwrap_or(false)
    }

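    /// Column specs for a table created inside the active transaction, if any;
    /// returns `None` when there is no active transaction or the table already
    /// existed before it began.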
    pub fn table_column_specs_from_transaction(
        &self,
        table_name: &str,
    ) -> Option<Vec<PlanColumnSpec>> {
        let guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");

        let tx = guard.get(&self.session_id)?;
        if !tx.new_tables.contains(table_name) {
            return None;
        }

        tx.staging.table_column_specs(table_name).ok()
    }

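    /// Tables created in the active transaction that declare a foreign key
    /// referencing `referenced_table` (matched case-insensitively).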
    pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
        let canonical = referenced_table.to_ascii_lowercase();
        let guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");

        let tx = match guard.get(&self.session_id) {
            Some(tx) => tx,
            None => return Vec::new(),
        };

        tx.transactional_foreign_keys
            .get(&canonical)
            .cloned()
            .unwrap_or_default()
    }

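    /// Whether another session's active transaction has locked `table_name`
    /// (by creating or dropping it).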
    pub fn has_table_locked_by_other_session(&self, table_name: &str) -> bool {
        let canonical = table_name.to_ascii_lowercase();
        let guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");

        for (session_id, tx) in guard.iter() {
            if *session_id == self.session_id {
                continue;
            }

            if tx.locked_table_names.contains(&canonical) {
                return true;
            }
        }

        false
    }

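    /// Marks the active transaction as aborted; subsequent operations fail and
    /// an eventual COMMIT is turned into a rollback.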
    pub fn abort_transaction(&self) {
        let mut guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");
        if let Some(tx) = guard.get_mut(&self.session_id) {
            tx.is_aborted = true;
        }
    }

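    /// Starts a transaction for this session using `staging` as the scratch
    /// context for tables created inside the transaction. Fails if a
    /// transaction is already in progress for this session.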
    pub fn begin_transaction(
        &self,
        staging: Arc<StagingCtx>,
    ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
        tracing::debug!(
            "[BEGIN] begin_transaction called for session_id={}",
            self.session_id
        );
        let mut guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");
        tracing::debug!(
            "[BEGIN] session_id={}, transactions map has {} entries",
            self.session_id,
            guard.len()
        );
        if guard.contains_key(&self.session_id) {
            return Err(Error::InvalidArgumentError(
                "a transaction is already in progress in this session".into(),
            ));
        }
        guard.insert(
            self.session_id,
            SessionTransaction::new(
                Arc::clone(&self.context),
                staging,
                Arc::clone(&self.txn_manager),
            ),
        );
        tracing::debug!(
            "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
            self.session_id,
            guard.len()
        );
        Ok(TransactionResult::Transaction {
            kind: TransactionKind::Begin,
        })
    }

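    /// Finishes the active transaction. Aborted transactions are turned into a
    /// rollback; otherwise the commit performs conflict checks on accessed
    /// tables, validates deferred constraints, marks the transaction committed,
    /// and returns the recorded operations so the caller can replay them.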
    pub fn commit_transaction(
        &self,
    ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
        tracing::trace!(
            "[COMMIT] commit_transaction called for session {:?}",
            self.session_id
        );
        let mut guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");
        tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
        let tx_opt = guard.remove(&self.session_id);
        tracing::trace!(
            "[COMMIT] commit_transaction remove returned: {}",
            tx_opt.is_some()
        );
        let tx = tx_opt.ok_or_else(|| {
            tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
            Error::InvalidArgumentError(
                "no transaction is currently in progress in this session".into(),
            )
        })?;
        tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);

        if tx.is_aborted {
            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
            let auto_commit_snapshot = TransactionSnapshot {
                txn_id: TXN_ID_AUTO_COMMIT,
                snapshot_id: tx.txn_manager.last_committed(),
            };
            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
            tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
            return Ok((
                TransactionResult::Transaction {
                    kind: TransactionKind::Rollback,
                },
                Vec::new(),
            ));
        }

        tracing::debug!(
            "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
            tx.snapshot.txn_id,
            tx.accessed_tables.len()
        );
        for accessed_table_name in &tx.accessed_tables {
            tracing::debug!(
                "[COMMIT CONFLICT CHECK] Checking table '{}'",
                accessed_table_name
            );
            if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
                match self.context.table_id(accessed_table_name) {
                    Ok(current_table_id) => {
                        if current_table_id != snapshot_table_id {
                            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
                            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
                            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
                            let auto_commit_snapshot = TransactionSnapshot {
                                txn_id: TXN_ID_AUTO_COMMIT,
                                snapshot_id: tx.txn_manager.last_committed(),
                            };
                            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
                            return Err(Error::TransactionContextError(
                                "another transaction has dropped this table".into(),
                            ));
                        }
                    }
                    Err(_) => {
                        tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
                        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
                        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
                        let auto_commit_snapshot = TransactionSnapshot {
                            txn_id: TXN_ID_AUTO_COMMIT,
                            snapshot_id: tx.txn_manager.last_committed(),
                        };
                        TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
                        return Err(Error::TransactionContextError(
                            "another transaction has dropped this table".into(),
                        ));
                    }
                }
            }
        }

        if let Err(err) = tx
            .base_context
            .validate_commit_constraints(tx.snapshot.txn_id)
        {
            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
            let auto_commit_snapshot = TransactionSnapshot {
                txn_id: TXN_ID_AUTO_COMMIT,
                snapshot_id: tx.txn_manager.last_committed(),
            };
            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
            let wrapped = match err {
                Error::ConstraintError(msg) => Error::TransactionContextError(format!(
                    "TransactionContext Error: constraint violation: {msg}"
                )),
                other => other,
            };
            return Err(wrapped);
        }

        let operations = tx.operations;
        tracing::trace!(
            "DEBUG commit_transaction: returning Commit with {} operations",
            operations.len()
        );

        tx.txn_manager.mark_committed(tx.snapshot.txn_id);
        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
        TransactionContext::set_snapshot(&*self.context, tx.snapshot);

        Ok((
            TransactionResult::Transaction {
                kind: TransactionKind::Commit,
            },
            operations,
        ))
    }

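    /// Rolls back the active transaction, marking it aborted in the txn-id
    /// manager, clearing per-transaction state, and restoring the auto-commit
    /// snapshot on the base context.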
    pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
        let mut guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");
        if let Some(tx) = guard.remove(&self.session_id) {
            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
            let auto_commit_snapshot = TransactionSnapshot {
                txn_id: TXN_ID_AUTO_COMMIT,
                snapshot_id: tx.txn_manager.last_committed(),
            };
            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
        } else {
            return Err(Error::InvalidArgumentError(
                "no transaction is currently in progress in this session".into(),
            ));
        }
        Ok(TransactionResult::Transaction {
            kind: TransactionKind::Rollback,
        })
    }

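    /// Executes one plan operation in the session's active transaction. CREATE
    /// TABLE and DROP TABLE first check that no other session's transaction
    /// holds a lock on the target table name.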
    pub fn execute_operation(
        &self,
        operation: PlanOperation,
    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
        tracing::debug!(
            "[EXECUTE_OP] execute_operation called for session_id={}",
            self.session_id
        );
        if !self.has_active_transaction() {
            return Err(Error::InvalidArgumentError(
                "execute_operation called without active transaction".into(),
            ));
        }

        if let PlanOperation::CreateTable(ref plan) = operation {
            let guard = self
                .transactions
                .lock()
                .expect("transactions lock poisoned");

            let canonical_name = plan.name.to_ascii_lowercase();

            for (other_session_id, other_tx) in guard.iter() {
                if *other_session_id != self.session_id
                    && other_tx.locked_table_names.contains(&canonical_name)
                {
                    return Err(Error::TransactionContextError(format!(
                        "table '{}' is locked by another active transaction",
                        plan.name
                    )));
                }
            }
            drop(guard);
        }

        if let PlanOperation::DropTable(ref plan) = operation {
            let guard = self
                .transactions
                .lock()
                .expect("transactions lock poisoned");

            let canonical_name = plan.name.to_ascii_lowercase();

            for (other_session_id, other_tx) in guard.iter() {
                if *other_session_id != self.session_id
                    && other_tx.locked_table_names.contains(&canonical_name)
                {
                    return Err(Error::TransactionContextError(format!(
                        "table '{}' is locked by another active transaction",
                        plan.name
                    )));
                }
            }
            drop(guard);
        }

        let mut guard = self
            .transactions
            .lock()
            .expect("transactions lock poisoned");
        tracing::debug!(
            "[EXECUTE_OP] session_id={}, transactions map has {} entries",
            self.session_id,
            guard.len()
        );
        let tx = guard
            .get_mut(&self.session_id)
            .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
        tracing::debug!(
            "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
            self.session_id,
            tx.snapshot.txn_id,
            tx.accessed_tables.len()
        );

        let result = tx.execute_operation(operation);
        if let Err(ref e) = result {
            tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
            tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
        }
        result
    }
}

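// Dropping a session while a transaction is still active discards that
// transaction so its entry does not linger in the shared map.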
impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext,
    StagingCtx: TransactionContext,
{
    fn drop(&mut self) {
        match self.transactions.lock() {
            Ok(mut guard) => {
                if guard.remove(&self.session_id).is_some() {
                    eprintln!(
                        "Warning: TransactionSession dropped with active transaction - auto-rolling back"
                    );
                }
            }
            Err(_) => {
                tracing::trace!(
                    "Warning: TransactionSession dropped with poisoned transaction mutex"
                );
            }
        }
    }
}

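/// Owns the shared transaction map and `TxnIdManager`, and hands out
/// `TransactionSession` handles with unique session ids.
///
/// A minimal usage sketch. `MyBaseContext` and `MyStagingContext` are
/// placeholder `TransactionContext` implementations, and `base_ctx`,
/// `staging_ctx`, and `op` are assumed to be provided by the caller:
///
/// ```ignore
/// let manager: TransactionManager<MyBaseContext, MyStagingContext> = TransactionManager::new();
/// let session = manager.create_session(Arc::clone(&base_ctx));
///
/// session.begin_transaction(Arc::clone(&staging_ctx))?;
/// session.execute_operation(op)?;
/// let (_result, replay_ops) = session.commit_transaction()?;
/// // On error, `session.rollback_transaction()?` discards the transaction instead.
/// ```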
pub struct TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    transactions:
        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    next_session_id: AtomicU64,
    txn_manager: Arc<TxnIdManager>,
}

impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    pub fn new() -> Self {
        Self {
            transactions: Arc::new(Mutex::new(HashMap::new())),
            next_session_id: AtomicU64::new(1),
            txn_manager: Arc::new(TxnIdManager::new()),
        }
    }

    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
        Self {
            transactions: Arc::new(Mutex::new(HashMap::new())),
            next_session_id: AtomicU64::new(1),
            txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
        }
    }

    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
        Self {
            transactions: Arc::new(Mutex::new(HashMap::new())),
            next_session_id: AtomicU64::new(1),
            txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
                next_txn_id,
                last_committed,
            )),
        }
    }

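    /// Creates a session bound to `context`, assigning it the next session id.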
    pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
        let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
        tracing::debug!(
            "[TX_MANAGER] create_session: allocated session_id={}",
            session_id
        );
        TransactionSession::new(
            context,
            session_id,
            Arc::clone(&self.transactions),
            Arc::clone(&self.txn_manager),
        )
    }

    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
        Arc::clone(&self.txn_manager)
    }

    pub fn has_active_transaction(&self) -> bool {
        !self
            .transactions
            .lock()
            .expect("transactions lock poisoned")
            .is_empty()
    }
}

impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    fn default() -> Self {
        Self::new()
    }
}