1use std::collections::{HashMap, HashSet};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13
14use arrow::array::RecordBatch;
15use llkv_executor::{ExecutorRowBatch, SelectExecution};
16use llkv_expr::expr::Expr as LlkvExpr;
17use llkv_plan::plans::{
18 CreateIndexPlan, CreateTablePlan, DeletePlan, DropTablePlan, InsertPlan, PlanColumnSpec,
19 PlanOperation, SelectPlan, TruncatePlan, UpdatePlan,
20};
21use llkv_result::{Error, Result as LlkvResult};
22use llkv_storage::pager::Pager;
23use llkv_table::CatalogDdl;
24use llkv_types::TableId;
25use simd_r_drive_entry_handle::EntryHandle;
26
27use crate::mvcc::{TXN_ID_AUTO_COMMIT, TransactionSnapshot, TxnId, TxnIdManager};
28use crate::types::{TransactionCatalogSnapshot, TransactionKind, TransactionResult};
29
30pub type TransactionSessionId = u64;
35
36fn select_plan_table_name(plan: &SelectPlan) -> Option<String> {
38 if plan.tables.len() == 1 {
39 Some(plan.tables[0].qualified_name())
40 } else {
41 None
42 }
43}
44
/// Contract a database context must fulfil to take part in session transactions.
///
/// `SessionTransaction` is generic over two implementors of this trait: a base
/// context and a staging context. Implementors must also provide `CatalogDdl`
/// and be shareable across threads (`Send + Sync`).
pub trait TransactionContext: CatalogDdl + Send + Sync {
    /// Pager type parameterising the results produced by this context.
    /// Its blobs are `EntryHandle`s.
    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;

    /// Catalog snapshot type returned by `catalog_snapshot`.
    type Snapshot: TransactionCatalogSnapshot;

    /// Replaces the transaction snapshot this context operates under.
    fn set_snapshot(&self, snapshot: TransactionSnapshot);

    /// Returns the transaction snapshot currently associated with this context.
    fn snapshot(&self) -> TransactionSnapshot;

    /// Returns the column specifications of `table_name`, or an error if they
    /// cannot be obtained (callers in this file treat an error as "table not
    /// present in this context").
    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<PlanColumnSpec>>;

    /// Exports the rows of `table_name` as an `ExecutorRowBatch`.
    fn export_table_rows(&self, table_name: &str) -> LlkvResult<ExecutorRowBatch>;

    /// Returns record batches for `table_name`, optionally restricted by `filter`.
    // NOTE(review): presumably the batches carry a row-id column, as the name
    // suggests — confirm against the implementors.
    fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> LlkvResult<Vec<RecordBatch>>;

    /// Executes a SELECT plan and returns its (not yet collected) execution.
    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;

    /// Applies a CREATE TABLE plan to this context.
    fn apply_create_table_plan(
        &self,
        plan: CreateTablePlan,
    ) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Applies a DROP TABLE plan to this context.
    fn drop_table(&self, plan: DropTablePlan) -> LlkvResult<()>;

    /// Applies an INSERT plan to this context.
    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Applies an UPDATE plan to this context.
    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Applies a DELETE plan to this context.
    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Applies a TRUNCATE plan to this context.
    fn truncate(&self, plan: TruncatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Applies a CREATE INDEX plan to this context.
    fn create_index(&self, plan: CreateIndexPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Appends `batches` to `table_name` and returns a count.
    // NOTE(review): the returned `usize` is presumably the number of rows
    // appended — confirm against the implementors.
    fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> LlkvResult<usize>;

    /// Returns the names of the tables known to this context.
    fn table_names(&self) -> Vec<String>;

    /// Resolves `table_name` to its table id, or an error if it cannot be resolved.
    fn table_id(&self, table_name: &str) -> LlkvResult<TableId>;

    /// Captures the current catalog state as a snapshot.
    fn catalog_snapshot(&self) -> Self::Snapshot;

    /// Hook run at commit time, before the transaction is marked committed.
    ///
    /// The default implementation accepts every transaction.
    fn validate_commit_constraints(&self, _txn_id: TxnId) -> LlkvResult<()> {
        Ok(())
    }

    /// Hook run when a transaction ends (commit, rollback or abort) so the
    /// context can discard state kept for `_txn_id`.
    ///
    /// The default implementation does nothing.
    fn clear_transaction_state(&self, _txn_id: TxnId) {}
}
126
/// State of one in-progress transaction belonging to a session.
///
/// Operations on tables created inside the transaction go to the staging
/// context; operations on pre-existing tables go to the base context.
pub struct SessionTransaction<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Snapshot handed out by the transaction id manager when the transaction began.
    snapshot: TransactionSnapshot,
    /// Context that hosts tables created inside this transaction.
    staging: Arc<StagingCtx>,
    /// Operations recorded during the transaction and handed back to the caller on commit.
    operations: Vec<PlanOperation>,
    /// Table names whose existence has already been verified by `ensure_table_exists`.
    staged_tables: HashSet<String>,
    /// Names of tables created inside this transaction.
    new_tables: HashSet<String>,
    /// Table names found to be absent, or dropped inside this transaction.
    missing_tables: HashSet<String>,
    /// Lower-cased names of tables created or dropped by this transaction;
    /// other sessions are refused CREATE/DROP on these names.
    locked_table_names: HashSet<String>,
    /// Catalog of the base context as captured when the transaction began.
    catalog_snapshot: BaseCtx::Snapshot,
    /// Context that hosts tables that existed before the transaction began.
    base_context: Arc<BaseCtx>,
    /// Set once an operation fails; further operations are rejected and
    /// commit turns into a rollback.
    is_aborted: bool,
    /// Transaction id manager used to mark this transaction committed or aborted.
    txn_manager: Arc<TxnIdManager>,
    /// Pre-existing tables read or written by this transaction; re-checked at
    /// commit to detect tables that were dropped or replaced meanwhile.
    accessed_tables: HashSet<String>,
    /// Maps a referenced table (lower-cased) to the tables created in this
    /// transaction (lower-cased) that declare a foreign key to it.
    transactional_foreign_keys: HashMap<String, Vec<String>>,
}
170
171impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
172where
173 BaseCtx: TransactionContext + 'static,
174 StagingCtx: TransactionContext + 'static,
175{
176 pub fn new(
177 base_context: Arc<BaseCtx>,
178 staging: Arc<StagingCtx>,
179 txn_manager: Arc<TxnIdManager>,
180 ) -> Self {
181 let catalog_snapshot = base_context.catalog_snapshot();
184
185 let snapshot = txn_manager.begin_transaction();
186 tracing::debug!(
187 "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
188 snapshot.txn_id,
189 snapshot.snapshot_id
190 );
191 TransactionContext::set_snapshot(&*base_context, snapshot);
192 TransactionContext::set_snapshot(&*staging, snapshot);
193
194 Self {
195 staging,
196 operations: Vec::new(),
197 staged_tables: HashSet::new(),
198 new_tables: HashSet::new(),
199 missing_tables: HashSet::new(),
200 locked_table_names: HashSet::new(),
201 catalog_snapshot,
202 base_context,
203 is_aborted: false,
204 accessed_tables: HashSet::new(),
205 snapshot,
206 txn_manager,
207 transactional_foreign_keys: HashMap::new(),
208 }
209 }
210
211 fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
214 tracing::trace!(
215 "[ENSURE] ensure_table_exists called for table='{}'",
216 table_name
217 );
218
219 if self.staged_tables.contains(table_name) {
221 tracing::trace!("[ENSURE] table already verified to exist");
222 return Ok(());
223 }
224
225 if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
227 {
228 self.missing_tables.insert(table_name.to_string());
229 return Err(Error::CatalogError(format!(
230 "Catalog Error: Table '{table_name}' does not exist"
231 )));
232 }
233
234 if self.missing_tables.contains(table_name) {
235 return Err(Error::CatalogError(format!(
236 "Catalog Error: Table '{table_name}' does not exist"
237 )));
238 }
239
240 if self.new_tables.contains(table_name) {
242 tracing::trace!("[ENSURE] Table was created in this transaction");
243 match self.staging.table_column_specs(table_name) {
245 Ok(_) => {
246 self.staged_tables.insert(table_name.to_string());
247 return Ok(());
248 }
249 Err(_) => {
250 return Err(Error::CatalogError(format!(
251 "Catalog Error: Table '{table_name}' was created but not found in staging"
252 )));
253 }
254 }
255 }
256
257 tracing::trace!(
259 "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
260 );
261 self.staged_tables.insert(table_name.to_string());
262 Ok(())
263 }
264
265 pub fn execute_select(
269 &mut self,
270 plan: SelectPlan,
271 ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
272 let table_name = select_plan_table_name(&plan).ok_or_else(|| {
274 Error::InvalidArgumentError(
275 "Transaction execute_select requires single-table query".into(),
276 )
277 })?;
278
279 self.ensure_table_exists(&table_name)?;
281
282 if self.new_tables.contains(&table_name) {
284 tracing::trace!(
285 "[SELECT] Reading from staging for new table '{}'",
286 table_name
287 );
288 return self.staging.execute_select(plan);
289 }
290
291 self.accessed_tables.insert(table_name.clone());
293
294 tracing::trace!(
297 "[SELECT] Reading from BASE with MVCC for existing table '{}'",
298 table_name
299 );
300 match self.base_context.execute_select(plan) {
301 Ok(exec) => {
302 let schema = exec.schema();
306 let batches = exec.collect().unwrap_or_default();
307 let combined = if batches.is_empty() {
308 RecordBatch::new_empty(Arc::clone(&schema))
309 } else if batches.len() == 1 {
310 batches.into_iter().next().unwrap()
311 } else {
312 let refs: Vec<&RecordBatch> = batches.iter().collect();
313 arrow::compute::concat_batches(&schema, refs).map_err(|err| {
314 Error::Internal(format!("failed to concatenate batches: {err}"))
315 })?
316 };
317 Ok(SelectExecution::from_batch(
318 table_name,
319 Arc::clone(&schema),
320 combined,
321 ))
322 }
323 Err(Error::NotFound) if self.catalog_snapshot.table_exists(&table_name) => {
324 tracing::debug!(
327 "[SELECT] Table '{}' not found in current catalog but exists in snapshot; returning empty result, deferring conflict to commit",
328 table_name
329 );
330 let schema = Arc::new(arrow::datatypes::Schema::empty());
332 Ok(SelectExecution::from_batch(
333 table_name,
334 schema.clone(),
335 RecordBatch::new_empty(schema),
336 ))
337 }
338 Err(e) => Err(e),
339 }
340 }
341
    /// Executes one plan operation inside this transaction.
    ///
    /// Routing rules visible in this method:
    /// * CREATE TABLE is applied to the staging context and recorded for commit.
    /// * DROP TABLE of a table created in this transaction is applied to
    ///   staging and cancels the recorded CREATE; DROP of a pre-existing table
    ///   is only recorded.
    /// * INSERT / UPDATE / DELETE / TRUNCATE go to staging (and are recorded)
    ///   for tables created in this transaction, otherwise they are applied
    ///   directly to the base context and not recorded.
    /// * SELECT is delegated to `execute_select` and materialised.
    ///
    /// # Errors
    ///
    /// Returns `Error::TransactionContextError` if the transaction is already
    /// aborted. Most failures of write operations set `is_aborted` before the
    /// error is returned; see the review notes below for the exceptions.
    pub fn execute_operation(
        &mut self,
        operation: PlanOperation,
    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
        tracing::trace!(
            "[TX] SessionTransaction::execute_operation called, operation={:?}",
            match &operation {
                PlanOperation::Insert(p) => format!("INSERT({})", p.table),
                PlanOperation::Update(p) => format!("UPDATE({})", p.table),
                PlanOperation::Delete(p) => format!("DELETE({})", p.table),
                PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
                _ => "OTHER".to_string(),
            }
        );
        // An aborted transaction refuses every further operation.
        if self.is_aborted {
            return Err(Error::TransactionContextError(
                "TransactionContext Error: transaction is aborted".into(),
            ));
        }

        let result = match operation {
            PlanOperation::CreateTable(ref plan) => {
                // Every foreign key must point at a table that either exists in
                // the snapshot or was created earlier in this transaction.
                for fk in &plan.foreign_keys {
                    let canonical_ref_table = fk.referenced_table.to_ascii_lowercase();
                    if !self.new_tables.contains(&canonical_ref_table)
                        && !self.catalog_snapshot.table_exists(&canonical_ref_table)
                    {
                        self.is_aborted = true;
                        return Err(Error::CatalogError(format!(
                            "Catalog Error: referenced table '{}' does not exist",
                            fk.referenced_table
                        )));
                    }
                }

                // The staging copy is created without its foreign keys; the
                // recorded operation below keeps the original plan.
                let mut staging_plan = plan.clone();
                staging_plan.foreign_keys.clear();

                match self.staging.apply_create_table_plan(staging_plan) {
                    Ok(result) => {
                        // NOTE(review): `new_tables`/`staged_tables` store the
                        // name exactly as given, while the DROP TABLE arm and
                        // the foreign-key check above look names up in lower
                        // case. Presumably plan names arrive already
                        // canonicalised — confirm, otherwise mixed-case names
                        // will not be found.
                        self.new_tables.insert(plan.name.clone());
                        self.missing_tables.remove(&plan.name);
                        self.staged_tables.insert(plan.name.clone());
                        self.locked_table_names
                            .insert(plan.name.to_ascii_lowercase());

                        // Index the new table under each table it references.
                        for fk in &plan.foreign_keys {
                            let referenced_table = fk.referenced_table.to_ascii_lowercase();
                            self.transactional_foreign_keys
                                .entry(referenced_table)
                                .or_default()
                                .push(plan.name.to_ascii_lowercase());
                        }

                        self.operations
                            .push(PlanOperation::CreateTable(plan.clone()));
                        // NOTE(review): a conversion failure here returns
                        // without setting `is_aborted`, although the table has
                        // already been registered above.
                        result.convert_pager_type()?
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::DropTable(ref plan) => {
                let canonical_name = plan.name.to_ascii_lowercase();

                // Lock the name so other sessions cannot CREATE/DROP it meanwhile.
                self.locked_table_names.insert(canonical_name.clone());

                if self.new_tables.contains(&canonical_name) {
                    // Table was created in this transaction: drop it from
                    // staging and forget everything recorded about it.
                    // NOTE(review): unlike the other arms, a failure here
                    // propagates without setting `is_aborted`.
                    TransactionContext::drop_table(self.staging.as_ref(), plan.clone())?;
                    self.new_tables.remove(&canonical_name);
                    self.staged_tables.remove(&canonical_name);

                    // Remove the table from every foreign-key list, then drop
                    // lists that became empty.
                    self.transactional_foreign_keys.iter_mut().for_each(
                        |(_, referencing_tables)| {
                            referencing_tables.retain(|t| t != &canonical_name);
                        },
                    );
                    self.transactional_foreign_keys
                        .retain(|_, referencing_tables| !referencing_tables.is_empty());

                    // Cancel the recorded CREATE TABLE for this table.
                    // NOTE(review): other recorded operations on the table
                    // (INSERT/UPDATE/...) are not removed here.
                    self.operations.retain(|op| {
                        !matches!(op, PlanOperation::CreateTable(p) if p.name.to_ascii_lowercase() == canonical_name)
                    });
                    TransactionResult::NoOp
                } else {
                    // Dropping an unknown table is an error unless IF EXISTS was given.
                    if !self.catalog_snapshot.table_exists(&canonical_name) && !plan.if_exists {
                        self.is_aborted = true;
                        return Err(Error::InvalidArgumentError(format!(
                            "table '{}' does not exist",
                            plan.name
                        )));
                    }

                    if self.catalog_snapshot.table_exists(&canonical_name) {
                        // Hide the table from the rest of this transaction and
                        // record the drop for commit.
                        self.missing_tables.insert(canonical_name.clone());
                        self.staged_tables.remove(&canonical_name);
                        self.operations.push(PlanOperation::DropTable(plan.clone()));
                    }
                    TransactionResult::NoOp
                }
            }
            PlanOperation::Insert(ref plan) => {
                tracing::trace!(
                    "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
                    plan.table
                );
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                // Only pre-existing tables take part in the commit conflict check.
                if !is_new_table {
                    self.accessed_tables.insert(plan.table.clone());
                }
                let result = if is_new_table {
                    tracing::trace!("[TX] INSERT into staging for new table");
                    self.staging.insert(plan.clone())
                } else {
                    tracing::trace!(
                        "[TX] INSERT directly into BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    self.base_context
                        .insert(plan.clone())
                        .and_then(|r| r.convert_pager_type())
                };

                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] INSERT to new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Insert(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] INSERT to existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        tracing::trace!(
                            "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
                            e
                        );
                        tracing::trace!("DEBUG setting is_aborted=true");
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Update(ref plan) => {
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                // Only pre-existing tables take part in the commit conflict check.
                if !is_new_table {
                    self.accessed_tables.insert(plan.table.clone());
                }
                let result = if is_new_table {
                    tracing::trace!("[TX] UPDATE in staging for new table");
                    self.staging.update(plan.clone())
                } else {
                    tracing::trace!(
                        "[TX] UPDATE directly in BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    match self.base_context.update(plan.clone()) {
                        Ok(r) => r.convert_pager_type(),
                        // The table vanished from the live catalog but is still
                        // in our snapshot: report zero rows now and let commit
                        // raise the conflict.
                        Err(Error::NotFound) if self.catalog_snapshot.table_exists(&plan.table) => {
                            tracing::debug!(
                                "[UPDATE] Table '{}' not found in current catalog but exists in snapshot; deferring conflict to commit",
                                plan.table
                            );
                            Ok(TransactionResult::Update {
                                rows_matched: 0,
                                rows_updated: 0,
                            })
                        }
                        Err(e) => Err(e),
                    }
                };

                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] UPDATE to new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Update(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] UPDATE to existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Delete(ref plan) => {
                tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                tracing::debug!("[DELETE] is_new_table={}", is_new_table);
                // Only pre-existing tables take part in the commit conflict check.
                if !is_new_table {
                    tracing::debug!(
                        "[DELETE] Tracking access to existing table '{}'",
                        plan.table
                    );
                    self.accessed_tables.insert(plan.table.clone());
                }
                let result = if is_new_table {
                    tracing::debug!("[DELETE] Deleting from staging for new table");
                    self.staging.delete(plan.clone())
                } else {
                    tracing::debug!(
                        "[DELETE] Deleting from BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    match self.base_context.delete(plan.clone()) {
                        Ok(r) => r.convert_pager_type(),
                        // Same deferral as UPDATE: zero rows now, conflict at commit.
                        Err(Error::NotFound) if self.catalog_snapshot.table_exists(&plan.table) => {
                            tracing::debug!(
                                "[DELETE] Table '{}' not found in current catalog but exists in snapshot; deferring conflict to commit",
                                plan.table
                            );
                            Ok(TransactionResult::Delete { rows_deleted: 0 })
                        }
                        Err(e) => Err(e),
                    }
                };

                tracing::debug!(
                    "[DELETE] Result: {:?}",
                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
                );
                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] DELETE from new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Delete(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] DELETE from existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Truncate(ref plan) => {
                tracing::debug!("[TRUNCATE] Starting truncate for table '{}'", plan.table);
                if let Err(e) = self.ensure_table_exists(&plan.table) {
                    tracing::debug!("[TRUNCATE] ensure_table_exists failed: {}", e);
                    self.is_aborted = true;
                    return Err(e);
                }

                let is_new_table = self.new_tables.contains(&plan.table);
                tracing::debug!("[TRUNCATE] is_new_table={}", is_new_table);
                // Only pre-existing tables take part in the commit conflict check.
                if !is_new_table {
                    tracing::debug!(
                        "[TRUNCATE] Tracking access to existing table '{}'",
                        plan.table
                    );
                    self.accessed_tables.insert(plan.table.clone());
                }
                // NOTE(review): unlike UPDATE/DELETE there is no NotFound
                // deferral on the base path here.
                let result = if is_new_table {
                    tracing::debug!("[TRUNCATE] Truncating staging for new table");
                    self.staging.truncate(plan.clone())
                } else {
                    tracing::debug!(
                        "[TRUNCATE] Truncating BASE with txn_id={}",
                        self.snapshot.txn_id
                    );
                    self.base_context
                        .truncate(plan.clone())
                        .and_then(|r| r.convert_pager_type())
                };

                tracing::debug!(
                    "[TRUNCATE] Result: {:?}",
                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
                );
                match result {
                    Ok(result) => {
                        if is_new_table {
                            tracing::trace!(
                                "[TX] TRUNCATE on new table - tracking for commit replay"
                            );
                            self.operations.push(PlanOperation::Truncate(plan.clone()));
                        } else {
                            tracing::trace!(
                                "[TX] TRUNCATE on existing table - already in BASE, no replay needed"
                            );
                        }
                        result
                    }
                    Err(e) => {
                        self.is_aborted = true;
                        return Err(e);
                    }
                }
            }
            PlanOperation::Select(plan) => {
                // Multi-table plans yield an empty name here; `execute_select`
                // rejects them below.
                let table_name = select_plan_table_name(plan.as_ref()).unwrap_or_default();
                let plan = *plan;
                match self.execute_select(plan) {
                    Ok(staging_execution) => {
                        let schema = staging_execution.schema();
                        // NOTE(review): a collection error is discarded here
                        // and shows up as an empty result set.
                        let batches = staging_execution.collect().unwrap_or_default();

                        // Materialise the result as exactly one batch.
                        let combined = if batches.is_empty() {
                            RecordBatch::new_empty(Arc::clone(&schema))
                        } else if batches.len() == 1 {
                            batches.into_iter().next().unwrap()
                        } else {
                            let refs: Vec<&RecordBatch> = batches.iter().collect();
                            arrow::compute::concat_batches(&schema, refs).map_err(|err| {
                                Error::Internal(format!("failed to concatenate batches: {err}"))
                            })?
                        };

                        let execution = SelectExecution::from_batch(
                            table_name.clone(),
                            Arc::clone(&schema),
                            combined,
                        );

                        TransactionResult::Select {
                            table_name,
                            schema,
                            execution,
                        }
                    }
                    // A failed SELECT is reported but does not abort the transaction.
                    Err(e) => {
                        return Err(e);
                    }
                }
            }
        };

        Ok(result)
    }
763
764 pub fn operations(&self) -> &[PlanOperation] {
766 &self.operations
767 }
768}
769
/// Handle through which one session begins, drives and ends its transaction.
///
/// The transaction itself lives in the shared `transactions` map, keyed by
/// this session's id.
pub struct TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Base context used by transactions started from this session.
    context: Arc<BaseCtx>,
    /// Key of this session in the `transactions` map.
    session_id: TransactionSessionId,
    /// Active transactions of all sessions sharing this map, keyed by session id.
    transactions:
        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Transaction id manager handed to every transaction this session begins.
    txn_manager: Arc<TxnIdManager>,
}
784
785impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
786where
787 BaseCtx: TransactionContext + 'static,
788 StagingCtx: TransactionContext + 'static,
789{
790 pub fn new(
791 context: Arc<BaseCtx>,
792 session_id: TransactionSessionId,
793 transactions: Arc<
794 Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>,
795 >,
796 txn_manager: Arc<TxnIdManager>,
797 ) -> Self {
798 Self {
799 context,
800 session_id,
801 transactions,
802 txn_manager,
803 }
804 }
805
806 pub fn clone_session(&self) -> Self {
809 Self {
810 context: Arc::clone(&self.context),
811 session_id: self.session_id,
812 transactions: Arc::clone(&self.transactions),
813 txn_manager: Arc::clone(&self.txn_manager),
814 }
815 }
816
817 pub fn session_id(&self) -> TransactionSessionId {
819 self.session_id
820 }
821
822 pub fn context(&self) -> &Arc<BaseCtx> {
824 &self.context
825 }
826
827 pub fn has_active_transaction(&self) -> bool {
829 self.transactions
830 .lock()
831 .expect("transactions lock poisoned")
832 .contains_key(&self.session_id)
833 }
834
835 pub fn is_aborted(&self) -> bool {
837 self.transactions
838 .lock()
839 .expect("transactions lock poisoned")
840 .get(&self.session_id)
841 .map(|tx| tx.is_aborted)
842 .unwrap_or(false)
843 }
844
845 pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
848 self.transactions
849 .lock()
850 .expect("transactions lock poisoned")
851 .get(&self.session_id)
852 .map(|tx| tx.new_tables.contains(table_name))
853 .unwrap_or(false)
854 }
855
856 pub fn table_column_specs_from_transaction(
859 &self,
860 table_name: &str,
861 ) -> Option<Vec<PlanColumnSpec>> {
862 let guard = self
863 .transactions
864 .lock()
865 .expect("transactions lock poisoned");
866
867 let tx = guard.get(&self.session_id)?;
868 if !tx.new_tables.contains(table_name) {
869 return None;
870 }
871
872 tx.staging.table_column_specs(table_name).ok()
874 }
875
876 pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
879 let canonical = referenced_table.to_ascii_lowercase();
880 let guard = self
881 .transactions
882 .lock()
883 .expect("transactions lock poisoned");
884
885 let tx = match guard.get(&self.session_id) {
886 Some(tx) => tx,
887 None => return Vec::new(),
888 };
889
890 tx.transactional_foreign_keys
891 .get(&canonical)
892 .cloned()
893 .unwrap_or_else(Vec::new)
894 }
895
896 pub fn has_table_locked_by_other_session(&self, table_name: &str) -> bool {
899 let canonical = table_name.to_ascii_lowercase();
900 let guard = self
901 .transactions
902 .lock()
903 .expect("transactions lock poisoned");
904
905 for (session_id, tx) in guard.iter() {
906 if *session_id == self.session_id {
908 continue;
909 }
910
911 if tx.locked_table_names.contains(&canonical) {
913 return true;
914 }
915 }
916
917 false
918 }
919
920 pub fn abort_transaction(&self) {
923 let mut guard = self
924 .transactions
925 .lock()
926 .expect("transactions lock poisoned");
927 if let Some(tx) = guard.get_mut(&self.session_id) {
928 tx.is_aborted = true;
929 }
930 }
931
932 pub fn begin_transaction(
934 &self,
935 staging: Arc<StagingCtx>,
936 ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
937 tracing::debug!(
938 "[BEGIN] begin_transaction called for session_id={}",
939 self.session_id
940 );
941 let mut guard = self
942 .transactions
943 .lock()
944 .expect("transactions lock poisoned");
945 tracing::debug!(
946 "[BEGIN] session_id={}, transactions map has {} entries",
947 self.session_id,
948 guard.len()
949 );
950 if guard.contains_key(&self.session_id) {
951 return Err(Error::InvalidArgumentError(
952 "a transaction is already in progress in this session".into(),
953 ));
954 }
955 guard.insert(
956 self.session_id,
957 SessionTransaction::new(
958 Arc::clone(&self.context),
959 staging,
960 Arc::clone(&self.txn_manager),
961 ),
962 );
963 tracing::debug!(
964 "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
965 self.session_id,
966 guard.len()
967 );
968 Ok(TransactionResult::Transaction {
969 kind: TransactionKind::Begin,
970 })
971 }
972
973 pub fn commit_transaction(
976 &self,
977 ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
978 tracing::trace!(
979 "[COMMIT] commit_transaction called for session {:?}",
980 self.session_id
981 );
982 let mut guard = self
983 .transactions
984 .lock()
985 .expect("transactions lock poisoned");
986 tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
987 let tx_opt = guard.remove(&self.session_id);
988 tracing::trace!(
989 "[COMMIT] commit_transaction remove returned: {}",
990 tx_opt.is_some()
991 );
992 let tx = tx_opt.ok_or_else(|| {
993 tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
994 Error::InvalidArgumentError(
995 "no transaction is currently in progress in this session".into(),
996 )
997 })?;
998 tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);
999
1000 if tx.is_aborted {
1002 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1003 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1004 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1005 let auto_commit_snapshot = TransactionSnapshot {
1007 txn_id: TXN_ID_AUTO_COMMIT,
1008 snapshot_id: tx.txn_manager.last_committed(),
1009 };
1010 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1011 tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
1012 return Ok((
1013 TransactionResult::Transaction {
1014 kind: TransactionKind::Rollback,
1015 },
1016 Vec::new(),
1017 ));
1018 }
1019
1020 tracing::debug!(
1023 "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
1024 tx.snapshot.txn_id,
1025 tx.accessed_tables.len()
1026 );
1027 for accessed_table_name in &tx.accessed_tables {
1028 tracing::debug!(
1029 "[COMMIT CONFLICT CHECK] Checking table '{}'",
1030 accessed_table_name
1031 );
1032 if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
1034 match self.context.table_id(accessed_table_name) {
1036 Ok(current_table_id) => {
1037 if current_table_id != snapshot_table_id {
1039 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1040 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1041 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1042 let auto_commit_snapshot = TransactionSnapshot {
1043 txn_id: TXN_ID_AUTO_COMMIT,
1044 snapshot_id: tx.txn_manager.last_committed(),
1045 };
1046 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1047 return Err(Error::TransactionContextError(
1048 "another transaction has dropped this table".into(),
1049 ));
1050 }
1051 }
1052 Err(_) => {
1053 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1055 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1056 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1057 let auto_commit_snapshot = TransactionSnapshot {
1058 txn_id: TXN_ID_AUTO_COMMIT,
1059 snapshot_id: tx.txn_manager.last_committed(),
1060 };
1061 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1062 return Err(Error::TransactionContextError(
1063 "another transaction has dropped this table".into(),
1064 ));
1065 }
1066 }
1067 }
1068 }
1069
1070 if let Err(err) = tx
1071 .base_context
1072 .validate_commit_constraints(tx.snapshot.txn_id)
1073 {
1074 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1075 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1076 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1077 let auto_commit_snapshot = TransactionSnapshot {
1078 txn_id: TXN_ID_AUTO_COMMIT,
1079 snapshot_id: tx.txn_manager.last_committed(),
1080 };
1081 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1082 let wrapped = match err {
1083 Error::ConstraintError(msg) => Error::TransactionContextError(format!(
1084 "TransactionContext Error: constraint violation: {msg}"
1085 )),
1086 other => other,
1087 };
1088 return Err(wrapped);
1089 }
1090
1091 let operations = tx.operations;
1092 tracing::trace!(
1093 "DEBUG commit_transaction: returning Commit with {} operations",
1094 operations.len()
1095 );
1096
1097 tx.txn_manager.mark_committed(tx.snapshot.txn_id);
1098 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1099 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1100 TransactionContext::set_snapshot(&*self.context, tx.snapshot);
1101
1102 Ok((
1103 TransactionResult::Transaction {
1104 kind: TransactionKind::Commit,
1105 },
1106 operations,
1107 ))
1108 }
1109
1110 pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
1112 let mut guard = self
1113 .transactions
1114 .lock()
1115 .expect("transactions lock poisoned");
1116 if let Some(tx) = guard.remove(&self.session_id) {
1117 tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1118 tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1119 tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1120 let auto_commit_snapshot = TransactionSnapshot {
1122 txn_id: TXN_ID_AUTO_COMMIT,
1123 snapshot_id: tx.txn_manager.last_committed(),
1124 };
1125 TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1126 } else {
1127 return Err(Error::InvalidArgumentError(
1128 "no transaction is currently in progress in this session".into(),
1129 ));
1130 }
1131 Ok(TransactionResult::Transaction {
1132 kind: TransactionKind::Rollback,
1133 })
1134 }
1135
1136 pub fn execute_operation(
1138 &self,
1139 operation: PlanOperation,
1140 ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
1141 tracing::debug!(
1142 "[EXECUTE_OP] execute_operation called for session_id={}",
1143 self.session_id
1144 );
1145 if !self.has_active_transaction() {
1146 return Err(Error::InvalidArgumentError(
1148 "execute_operation called without active transaction".into(),
1149 ));
1150 }
1151
1152 if let PlanOperation::CreateTable(ref plan) = operation {
1154 let guard = self
1155 .transactions
1156 .lock()
1157 .expect("transactions lock poisoned");
1158
1159 let canonical_name = plan.name.to_ascii_lowercase();
1160
1161 for (other_session_id, other_tx) in guard.iter() {
1163 if *other_session_id != self.session_id
1164 && other_tx.locked_table_names.contains(&canonical_name)
1165 {
1166 return Err(Error::TransactionContextError(format!(
1167 "table '{}' is locked by another active transaction",
1168 plan.name
1169 )));
1170 }
1171 }
1172 drop(guard); }
1174
1175 if let PlanOperation::DropTable(ref plan) = operation {
1177 let guard = self
1178 .transactions
1179 .lock()
1180 .expect("transactions lock poisoned");
1181
1182 let canonical_name = plan.name.to_ascii_lowercase();
1183
1184 for (other_session_id, other_tx) in guard.iter() {
1186 if *other_session_id != self.session_id
1187 && other_tx.locked_table_names.contains(&canonical_name)
1188 {
1189 return Err(Error::TransactionContextError(format!(
1190 "table '{}' is locked by another active transaction",
1191 plan.name
1192 )));
1193 }
1194 }
1195 drop(guard); }
1197
1198 let mut guard = self
1200 .transactions
1201 .lock()
1202 .expect("transactions lock poisoned");
1203 tracing::debug!(
1204 "[EXECUTE_OP] session_id={}, transactions map has {} entries",
1205 self.session_id,
1206 guard.len()
1207 );
1208 let tx = guard
1209 .get_mut(&self.session_id)
1210 .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
1211 tracing::debug!(
1212 "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
1213 self.session_id,
1214 tx.snapshot.txn_id,
1215 tx.accessed_tables.len()
1216 );
1217
1218 let result = tx.execute_operation(operation);
1219 if let Err(ref e) = result {
1220 tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
1221 tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
1222 }
1223 result
1224 }
1225}
1226
1227impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
1228where
1229 BaseCtx: TransactionContext,
1230 StagingCtx: TransactionContext,
1231{
1232 fn drop(&mut self) {
1233 match self.transactions.lock() {
1236 Ok(mut guard) => {
1237 if guard.remove(&self.session_id).is_some() {
1238 tracing::warn!(
1239 "TransactionSession dropped with active transaction - auto-rolling back"
1240 );
1241 }
1242 }
1243 Err(_) => {
1244 tracing::warn!("TransactionSession dropped with poisoned transaction mutex");
1247 }
1248 }
1249 }
1250}
1251
/// Factory for `TransactionSession`s that share one transaction map and one
/// transaction id manager.
pub struct TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Active transactions of all sessions created by this manager, keyed by session id.
    transactions:
        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Id handed to the next session; starts at 1 and is incremented per session.
    next_session_id: AtomicU64,
    /// Transaction id manager shared with every session.
    txn_manager: Arc<TxnIdManager>,
}
1266
1267impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
1268where
1269 BaseCtx: TransactionContext + 'static,
1270 StagingCtx: TransactionContext + 'static,
1271{
1272 pub fn new() -> Self {
1273 Self {
1274 transactions: Arc::new(Mutex::new(HashMap::new())),
1275 next_session_id: AtomicU64::new(1),
1276 txn_manager: Arc::new(TxnIdManager::new()),
1277 }
1278 }
1279
1280 pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
1282 Self {
1283 transactions: Arc::new(Mutex::new(HashMap::new())),
1284 next_session_id: AtomicU64::new(1),
1285 txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
1286 }
1287 }
1288
1289 pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
1291 Self {
1292 transactions: Arc::new(Mutex::new(HashMap::new())),
1293 next_session_id: AtomicU64::new(1),
1294 txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
1295 next_txn_id,
1296 last_committed,
1297 )),
1298 }
1299 }
1300
1301 pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
1303 let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
1304 tracing::debug!(
1305 "[TX_MANAGER] create_session: allocated session_id={}",
1306 session_id
1307 );
1308 TransactionSession::new(
1309 context,
1310 session_id,
1311 Arc::clone(&self.transactions),
1312 Arc::clone(&self.txn_manager),
1313 )
1314 }
1315
1316 pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1318 Arc::clone(&self.txn_manager)
1319 }
1320
1321 pub fn has_active_transaction(&self) -> bool {
1323 !self
1324 .transactions
1325 .lock()
1326 .expect("transactions lock poisoned")
1327 .is_empty()
1328 }
1329}
1330
1331impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
1332where
1333 BaseCtx: TransactionContext + 'static,
1334 StagingCtx: TransactionContext + 'static,
1335{
1336 fn default() -> Self {
1337 Self::new()
1338 }
1339}