1use crate::{Snapshot, VersionStore};
4use featherdb_core::{Error, Lsn, PageId, Result, TransactionConfig, TransactionId};
5use featherdb_storage::Wal;
6use parking_lot::{Mutex, RwLock};
7use std::collections::{HashMap, HashSet};
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::time::{Duration, Instant};
10
/// Access mode a transaction is started with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionMode {
    /// The transaction may only read; `Transaction::check_write` rejects it.
    ReadOnly,
    /// The transaction may read and write.
    ReadWrite,
}
19
/// Lifecycle stage of a transaction tracked by the manager.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionStatus {
    /// Started and not yet finished; the only state from which commit or
    /// abort is accepted.
    Active,
    /// Commit is in progress.
    Committing,
    /// Commit finished successfully.
    Committed,
    /// The transaction was rolled back.
    Aborted,
}
32
/// Summary of one active transaction, as reported by
/// `TransactionManager::long_running_transactions`.
#[derive(Debug, Clone)]
pub struct TransactionInfo {
    /// Raw numeric transaction ID.
    pub id: u64,
    /// Time elapsed since the transaction started.
    pub age: Duration,
}
41
/// Report on whether version garbage collection is currently held back.
#[derive(Debug, Clone)]
pub struct GcStatus {
    /// Raw ID of the oldest active transaction, if any.
    pub oldest_active_txn: Option<u64>,
    /// `true` while at least one transaction is active.
    pub blocked: bool,
    /// Human-readable explanation when `blocked` is set.
    pub blocked_reason: Option<String>,
}
52
/// Atomic counters describing transaction activity.
///
/// All updates use `Ordering::Relaxed`; a snapshot reads each counter
/// independently, so the fields of one snapshot are not taken at a single
/// instant.
#[derive(Debug, Default)]
pub struct TransactionMetrics {
    /// Transactions begun and not yet committed or rolled back.
    active_count: AtomicU64,
    /// Number of recorded commits.
    total_commits: AtomicU64,
    /// Number of recorded rollbacks.
    total_rollbacks: AtomicU64,
    /// Last value passed to `update_long_running_count`.
    long_running_count: AtomicU64,
    /// Sum of the durations of all finished transactions, in microseconds.
    total_duration_us: AtomicU64,
    /// Number of finished transactions (commits plus rollbacks).
    total_transactions: AtomicU64,
}

impl TransactionMetrics {
    /// Creates a metrics set with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records that a transaction has started.
    pub fn record_begin(&self) {
        self.active_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a commit of a transaction that ran for `duration`.
    pub fn record_commit(&self, duration: Duration) {
        self.record_finished(&self.total_commits, duration);
    }

    /// Records a rollback of a transaction that ran for `duration`.
    pub fn record_rollback(&self, duration: Duration) {
        self.record_finished(&self.total_rollbacks, duration);
    }

    /// Shared bookkeeping for a finished transaction: one fewer active,
    /// one more in `outcome` and in the overall total, plus its duration.
    fn record_finished(&self, outcome: &AtomicU64, duration: Duration) {
        self.active_count.fetch_sub(1, Ordering::Relaxed);
        outcome.fetch_add(1, Ordering::Relaxed);
        self.total_transactions.fetch_add(1, Ordering::Relaxed);
        self.total_duration_us
            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
    }

    /// Overwrites the long-running transaction gauge with `count`.
    pub fn update_long_running_count(&self, count: u64) {
        self.long_running_count.store(count, Ordering::Relaxed);
    }

    /// Reads every counter and derives the average transaction duration.
    ///
    /// The average is `0` while no transaction has finished.
    pub fn snapshot(&self) -> TransactionMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        let active_count = read(&self.active_count);
        let total_commits = read(&self.total_commits);
        let total_rollbacks = read(&self.total_rollbacks);
        let long_running_count = read(&self.long_running_count);
        let duration_sum_us = read(&self.total_duration_us);
        let total_transactions = read(&self.total_transactions);

        TransactionMetricsSnapshot {
            active_count,
            total_commits,
            total_rollbacks,
            long_running_count,
            // `checked_div` yields `None` only for a zero divisor.
            avg_duration_us: duration_sum_us
                .checked_div(total_transactions)
                .unwrap_or(0),
            total_transactions,
        }
    }
}

/// Plain-value copy of [`TransactionMetrics`] produced by `snapshot()`.
#[derive(Debug, Clone)]
pub struct TransactionMetricsSnapshot {
    /// Transactions active when the counter was read.
    pub active_count: u64,
    /// Commits recorded so far.
    pub total_commits: u64,
    /// Rollbacks recorded so far.
    pub total_rollbacks: u64,
    /// Most recently stored long-running transaction count.
    pub long_running_count: u64,
    /// Mean duration of finished transactions, in microseconds.
    pub avg_duration_us: u64,
    /// Finished transactions (commits plus rollbacks).
    pub total_transactions: u64,
}

impl TransactionMetricsSnapshot {
    /// Fraction of finished transactions that committed, or `0.0` when
    /// none have finished.
    pub fn commit_rate(&self) -> f64 {
        match self.total_transactions {
            0 => 0.0,
            finished => self.total_commits as f64 / finished as f64,
        }
    }
}
149
150#[allow(dead_code)]
152struct TransactionState {
153 id: TransactionId,
154 mode: TransactionMode,
155 status: TransactionStatus,
156 snapshot: Snapshot,
157 start_time: Instant,
158 last_lsn: Lsn,
159 write_set: HashSet<(String, Vec<u8>)>,
161}
162
163pub struct TransactionManager {
165 next_txn_id: AtomicU64,
167 active_txns: RwLock<HashMap<TransactionId, TransactionState>>,
169 write_lock: Mutex<()>,
171 oldest_active: AtomicU64,
173 version_store: RwLock<VersionStore>,
175 aborted_txns: RwLock<HashSet<TransactionId>>,
177 has_aborted: AtomicBool,
179 metrics: TransactionMetrics,
181 dirty_pages: RwLock<HashMap<TransactionId, HashSet<PageId>>>,
183 #[allow(clippy::type_complexity)]
186 committed_writes: RwLock<HashMap<TransactionId, HashSet<(String, Vec<u8>)>>>,
187}
188
189impl TransactionManager {
190 pub fn new() -> Self {
195 TransactionManager {
196 next_txn_id: AtomicU64::new(1),
197 active_txns: RwLock::new(HashMap::new()),
198 write_lock: Mutex::new(()),
199 oldest_active: AtomicU64::new(u64::MAX),
200 version_store: RwLock::new(VersionStore::new()),
201 aborted_txns: RwLock::new(HashSet::new()),
202 has_aborted: AtomicBool::new(false),
203 metrics: TransactionMetrics::new(),
204 dirty_pages: RwLock::new(HashMap::new()),
205 committed_writes: RwLock::new(HashMap::new()),
206 }
207 }
208
209 pub fn with_start_txn_id(start_txn_id: u64) -> Self {
217 TransactionManager {
218 next_txn_id: AtomicU64::new(start_txn_id),
219 active_txns: RwLock::new(HashMap::new()),
220 write_lock: Mutex::new(()),
221 oldest_active: AtomicU64::new(u64::MAX),
222 version_store: RwLock::new(VersionStore::new()),
223 aborted_txns: RwLock::new(HashSet::new()),
224 has_aborted: AtomicBool::new(false),
225 metrics: TransactionMetrics::new(),
226 dirty_pages: RwLock::new(HashMap::new()),
227 committed_writes: RwLock::new(HashMap::new()),
228 }
229 }
230
231 pub fn next_txn_id(&self) -> u64 {
236 self.next_txn_id.load(Ordering::SeqCst)
237 }
238
239 pub fn begin(&self, mode: TransactionMode) -> Transaction<'_> {
241 self.begin_with_config(mode, TransactionConfig::default())
242 }
243
244 pub fn begin_with_config(
246 &self,
247 mode: TransactionMode,
248 config: TransactionConfig,
249 ) -> Transaction<'_> {
250 let txn_id = TransactionId(self.next_txn_id.fetch_add(1, Ordering::SeqCst));
251 let created_at = Instant::now();
252
253 let snapshot = {
255 let active = self.active_txns.read();
256 let in_progress: HashSet<_> = active.keys().copied().collect();
257 let hwm = TransactionId(self.next_txn_id.load(Ordering::SeqCst));
258 Snapshot::new(txn_id, hwm, in_progress)
259 };
260
261 let state = TransactionState {
262 id: txn_id,
263 mode,
264 status: TransactionStatus::Active,
265 snapshot: snapshot.clone(),
266 start_time: created_at,
267 last_lsn: Lsn::ZERO,
268 write_set: HashSet::new(),
269 };
270
271 {
273 let mut active = self.active_txns.write();
274 active.insert(txn_id, state);
275
276 let oldest = active.keys().min().map(|t| t.0).unwrap_or(u64::MAX);
278 self.oldest_active.store(oldest, Ordering::SeqCst);
279 }
280
281 self.metrics.record_begin();
283
284 Transaction {
285 id: txn_id,
286 mode,
287 snapshot,
288 manager: self,
289 created_at,
290 config,
291 warned: false,
292 }
293 }
294
295 pub fn begin_read_only(&self) -> Transaction<'_> {
297 self.begin(TransactionMode::ReadOnly)
298 }
299
300 pub fn begin_read_write(&self) -> Transaction<'_> {
302 self.begin(TransactionMode::ReadWrite)
303 }
304
305 pub fn commit_txn(&self, txn_id: TransactionId) -> Result<()> {
307 self.commit(txn_id, None)
308 }
309
310 pub fn abort_txn(&self, txn_id: TransactionId) -> Result<()> {
312 self.abort(txn_id, None)
313 }
314
315 pub fn add_snapshot_ancestor(&self, txn_id: TransactionId, ancestor: TransactionId) {
326 let mut active = self.active_txns.write();
327 if let Some(state) = active.get_mut(&txn_id) {
328 state.snapshot.add_ancestor(ancestor);
329 }
330 }
331
332 pub fn record_write(
337 &self,
338 txn_id: TransactionId,
339 table: String,
340 pk_bytes: Vec<u8>,
341 ) -> Result<()> {
342 let entry = (table.clone(), pk_bytes);
343
344 let snapshot = {
346 let active = self.active_txns.read();
347 match active.get(&txn_id) {
348 Some(state) => state.snapshot.clone(),
349 None => return Ok(()), }
351 };
352
353 {
355 let committed = self.committed_writes.read();
356 for (&committed_txn_id, committed_ws) in committed.iter() {
357 if !snapshot.can_see(committed_txn_id) && committed_ws.contains(&entry) {
358 return Err(Error::WriteConflict { table });
359 }
360 }
361 }
362
363 {
366 let active = self.active_txns.read();
367 for (&other_txn_id, other_state) in active.iter() {
368 if other_txn_id != txn_id
369 && !snapshot.is_ancestor(other_txn_id)
370 && other_state.write_set.contains(&entry)
371 {
372 return Err(Error::WriteConflict { table });
373 }
374 }
375 }
376
377 let mut active = self.active_txns.write();
379 if let Some(state) = active.get_mut(&txn_id) {
380 state.write_set.insert(entry);
381 }
382
383 Ok(())
384 }
385
386 fn commit(&self, txn_id: TransactionId, wal: Option<&mut Wal>) -> Result<()> {
388 let (duration, result) = {
389 let mut active = self.active_txns.write();
390
391 let state = active.get_mut(&txn_id).ok_or(Error::TransactionEnded)?;
392
393 if state.status != TransactionStatus::Active {
394 return Err(Error::TransactionEnded);
395 }
396
397 let start_time = state.start_time;
398
399 let write_set = std::mem::take(&mut state.write_set);
402 let snapshot = state.snapshot.clone();
403
404 if !write_set.is_empty() {
405 let committed = self.committed_writes.read();
406 let mut conflict_table: Option<String> = None;
407
408 'outer: for (&committed_txn_id, committed_ws) in committed.iter() {
409 if !snapshot.can_see(committed_txn_id) {
412 for entry in &write_set {
413 if committed_ws.contains(entry) {
414 conflict_table = Some(entry.0.clone());
415 break 'outer;
416 }
417 }
418 }
419 }
420 drop(committed);
421
422 if let Some(table) = conflict_table {
423 if let Some(s) = active.get_mut(&txn_id) {
425 s.status = TransactionStatus::Aborted;
426 }
427 active.remove(&txn_id);
428 let oldest = active.keys().min().map(|t| t.0).unwrap_or(u64::MAX);
429 self.oldest_active.store(oldest, Ordering::SeqCst);
430 drop(active);
431 self.aborted_txns.write().insert(txn_id);
432 self.has_aborted.store(true, Ordering::Release);
433 self.metrics.record_rollback(start_time.elapsed());
434 return Err(Error::WriteConflict { table });
435 }
436 }
437
438 state.status = TransactionStatus::Committing;
439
440 if let Some(wal) = wal {
442 wal.log_commit(txn_id, state.last_lsn)?;
443 }
444
445 state.status = TransactionStatus::Committed;
446
447 if !write_set.is_empty() {
449 self.committed_writes.write().insert(txn_id, write_set);
450 }
451
452 active.remove(&txn_id);
454
455 let oldest = active.keys().min().map(|t| t.0).unwrap_or(u64::MAX);
457 self.oldest_active.store(oldest, Ordering::SeqCst);
458
459 let oldest_val = self.oldest_active.load(Ordering::SeqCst);
463 if oldest_val == u64::MAX {
464 self.committed_writes.write().clear();
466 }
467
468 (start_time.elapsed(), Ok(()))
469 };
470
471 self.metrics.record_commit(duration);
473
474 result
475 }
476
477 fn abort(&self, txn_id: TransactionId, wal: Option<&mut Wal>) -> Result<()> {
479 let (duration, result) = {
480 let mut active = self.active_txns.write();
481
482 let state = active.get_mut(&txn_id).ok_or(Error::TransactionEnded)?;
483
484 if state.status != TransactionStatus::Active {
485 return Err(Error::TransactionEnded);
486 }
487
488 let start_time = state.start_time;
489
490 if let Some(wal) = wal {
492 wal.log_abort(txn_id, state.last_lsn)?;
493 }
494
495 state.status = TransactionStatus::Aborted;
496
497 active.remove(&txn_id);
499
500 self.aborted_txns.write().insert(txn_id);
502 self.has_aborted.store(true, Ordering::Release);
503
504 let oldest = active.keys().min().map(|t| t.0).unwrap_or(u64::MAX);
506 self.oldest_active.store(oldest, Ordering::SeqCst);
507
508 (start_time.elapsed(), Ok(()))
509 };
510
511 self.metrics.record_rollback(duration);
513
514 result
515 }
516
517 pub fn acquire_write_lock(&self) -> impl Drop + '_ {
519 self.write_lock.lock()
520 }
521
522 #[allow(dead_code)]
524 fn update_last_lsn(&self, txn_id: TransactionId, lsn: Lsn) {
525 let mut active = self.active_txns.write();
526 if let Some(state) = active.get_mut(&txn_id) {
527 state.last_lsn = lsn;
528 }
529 }
530
531 pub fn oldest_active_txn(&self) -> Option<TransactionId> {
533 let oldest = self.oldest_active.load(Ordering::SeqCst);
534 if oldest == u64::MAX {
535 None
536 } else {
537 Some(TransactionId(oldest))
538 }
539 }
540
541 pub fn version_store(&self) -> &RwLock<VersionStore> {
543 &self.version_store
544 }
545
546 pub fn active_count(&self) -> usize {
548 self.active_txns.read().len()
549 }
550
551 #[inline]
554 pub fn has_aborted_transactions(&self) -> bool {
555 self.has_aborted.load(Ordering::Acquire)
556 }
557
558 #[inline]
560 pub fn is_aborted(&self, txn_id: TransactionId) -> bool {
561 if !self.has_aborted.load(Ordering::Acquire) {
563 return false;
564 }
565 self.aborted_txns.read().contains(&txn_id)
566 }
567
568 #[inline]
576 pub fn is_row_visible(
577 &self,
578 snapshot: &Snapshot,
579 created_by: TransactionId,
580 deleted_by: Option<TransactionId>,
581 ) -> bool {
582 if self.is_aborted(created_by) {
585 return false;
586 }
587
588 let effective_deleted_by = match deleted_by {
591 Some(del_by) if self.is_aborted(del_by) => None, other => other,
593 };
594
595 snapshot.is_visible(created_by, effective_deleted_by)
597 }
598
599 pub fn aborted_count(&self) -> usize {
601 self.aborted_txns.read().len()
602 }
603
604 pub fn cleanup_aborted_txns(&self, older_than: TransactionId) {
611 let mut aborted = self.aborted_txns.write();
612 aborted.retain(|&txn_id| txn_id >= older_than);
613 if aborted.is_empty() {
614 self.has_aborted.store(false, Ordering::Release);
615 }
616 }
617
618 pub fn register_dirty_page(&self, txn_id: TransactionId, page_id: PageId) {
626 let mut dirty = self.dirty_pages.write();
627 dirty.entry(txn_id).or_default().insert(page_id);
628 }
629
630 pub fn get_dirty_pages(&self, txn_id: TransactionId) -> HashSet<PageId> {
632 self.dirty_pages
633 .read()
634 .get(&txn_id)
635 .cloned()
636 .unwrap_or_default()
637 }
638
639 pub fn clear_dirty_pages(&self, txn_id: TransactionId) {
641 self.dirty_pages.write().remove(&txn_id);
642 }
643
644 pub fn dirty_page_count(&self, txn_id: TransactionId) -> usize {
646 self.dirty_pages
647 .read()
648 .get(&txn_id)
649 .map(|s| s.len())
650 .unwrap_or(0)
651 }
652
653 pub fn run_gc(&self) -> crate::version::GcStats {
658 let oldest = self.oldest_active_txn();
659 let mut version_store = self.version_store.write();
660 version_store.gc(oldest)
661 }
662
663 pub fn version_count(&self) -> usize {
665 self.version_store.read().len()
666 }
667
668 pub fn long_running_transactions(&self, threshold: Duration) -> Vec<TransactionInfo> {
673 let active = self.active_txns.read();
674 let now = Instant::now();
675
676 let mut results: Vec<TransactionInfo> = active
677 .iter()
678 .filter_map(|(id, state)| {
679 let elapsed = now.duration_since(state.start_time);
680 if elapsed >= threshold {
681 Some(TransactionInfo {
682 id: id.0,
683 age: elapsed,
684 })
685 } else {
686 None
687 }
688 })
689 .collect();
690
691 results.sort_by(|a, b| b.age.cmp(&a.age));
693 results
694 }
695
696 pub fn gc_status(&self) -> GcStatus {
701 let active = self.active_txns.read();
702 let oldest = active.keys().min().copied();
703
704 GcStatus {
705 oldest_active_txn: oldest.map(|id| id.0),
706 blocked: oldest.is_some(),
707 blocked_reason: oldest
708 .map(|id| format!("Waiting for transaction {} to complete", id.0)),
709 }
710 }
711
712 pub fn metrics(&self) -> TransactionMetricsSnapshot {
717 let long_running = self.long_running_transactions(Duration::from_secs(30));
719 self.metrics
720 .update_long_running_count(long_running.len() as u64);
721
722 self.metrics.snapshot()
723 }
724}
725
726impl Default for TransactionManager {
727 fn default() -> Self {
728 Self::new()
729 }
730}
731
732pub struct Transaction<'a> {
734 id: TransactionId,
736 mode: TransactionMode,
738 snapshot: Snapshot,
740 manager: &'a TransactionManager,
742 created_at: Instant,
744 config: TransactionConfig,
746 warned: bool,
748}
749
750impl<'a> Transaction<'a> {
751 pub fn id(&self) -> TransactionId {
753 self.id
754 }
755
756 pub fn mode(&self) -> TransactionMode {
758 self.mode
759 }
760
761 pub fn snapshot(&self) -> &Snapshot {
763 &self.snapshot
764 }
765
766 pub fn is_read_only(&self) -> bool {
768 self.mode == TransactionMode::ReadOnly
769 }
770
771 pub fn commit(self) -> Result<()> {
773 self.check_timeout()?;
774 self.manager.commit(self.id, None)
775 }
776
777 pub fn commit_with_wal(self, wal: &mut Wal) -> Result<()> {
779 self.check_timeout()?;
780 self.manager.commit(self.id, Some(wal))
781 }
782
783 pub fn abort(self) -> Result<()> {
785 self.manager.abort(self.id, None)
786 }
787
788 pub fn abort_with_wal(self, wal: &mut Wal) -> Result<()> {
790 self.manager.abort(self.id, Some(wal))
791 }
792
793 pub fn can_see(&self, created_by: TransactionId) -> bool {
795 self.snapshot.can_see(created_by)
796 }
797
798 pub fn check_write(&self) -> Result<()> {
800 self.check_timeout()?;
801 if self.mode == TransactionMode::ReadOnly {
802 return Err(Error::ReadOnly);
803 }
804 Ok(())
805 }
806
807 pub fn check_read(&self) -> Result<()> {
809 self.check_timeout()
810 }
811
812 pub fn is_expired(&self) -> bool {
814 if let Some(timeout_ms) = self.config.timeout_ms {
815 self.created_at.elapsed().as_millis() as u64 > timeout_ms
816 } else {
817 false
818 }
819 }
820
821 pub fn remaining_time(&self) -> Option<Duration> {
823 let timeout_ms = self.config.timeout_ms?;
824 let elapsed = self.created_at.elapsed();
825 let timeout = Duration::from_millis(timeout_ms);
826 if elapsed >= timeout {
827 None
828 } else {
829 Some(timeout - elapsed)
830 }
831 }
832
833 pub fn elapsed(&self) -> Duration {
835 self.created_at.elapsed()
836 }
837
838 pub fn should_warn(&mut self) -> bool {
841 if self.warned {
842 return false;
843 }
844 if let Some(warn_ms) = self.config.warn_after_ms {
845 if self.created_at.elapsed().as_millis() as u64 > warn_ms {
846 self.warned = true;
847 return true;
848 }
849 }
850 false
851 }
852
853 fn check_timeout(&self) -> Result<()> {
855 if self.is_expired() {
856 return Err(Error::transaction_timeout(
857 self.id.0,
858 self.created_at.elapsed().as_millis() as u64,
859 self.config.timeout_ms.unwrap_or(0),
860 ));
861 }
862 Ok(())
863 }
864}
865
866impl<'a> Drop for Transaction<'a> {
867 fn drop(&mut self) {
868 let _ = self.manager.abort(self.id, None);
870 }
871}
872
873#[cfg(test)]
874mod tests {
875 use super::*;
876
/// IDs are handed out sequentially from 1 and the active count tracks
/// begin, commit and abort.
#[test]
fn test_transaction_manager() {
    let manager = TransactionManager::new();

    let writer = manager.begin_read_write();
    assert_eq!(writer.id(), TransactionId(1));

    let reader = manager.begin_read_only();
    assert_eq!(reader.id(), TransactionId(2));

    assert_eq!(manager.active_count(), 2);

    writer.commit().unwrap();
    assert_eq!(manager.active_count(), 1);

    reader.abort().unwrap();
    assert_eq!(manager.active_count(), 0);
}
895
/// A later transaction cannot see an earlier, still-active one, while
/// every transaction can see itself.
#[test]
fn test_snapshot_isolation() {
    let manager = TransactionManager::new();

    let first = manager.begin_read_write();
    let second = manager.begin_read_only();

    assert!(!second.can_see(first.id()));

    assert!(first.can_see(first.id()));
    assert!(second.can_see(second.id()));
}
910
/// `check_write` rejects a read-only transaction.
#[test]
fn test_read_only_cannot_write() {
    let manager = TransactionManager::new();
    let reader = manager.begin_read_only();

    let verdict = reader.check_write();
    assert!(verdict.is_err());
}
918
/// Dropping a transaction handle removes it from the active set.
#[test]
fn test_auto_abort_on_drop() {
    let manager = TransactionManager::new();

    {
        let _scoped = manager.begin_read_write();
        assert_eq!(manager.active_count(), 1);
    }

    assert_eq!(manager.active_count(), 0);
}
931
/// With no active transaction, GC removes every stored version.
#[test]
fn test_run_gc() {
    use featherdb_core::Value;

    let manager = TransactionManager::new();

    {
        let mut store = manager.version_store().write();

        store.insert(crate::version::OldVersion {
            data: vec![Value::Integer(42)],
            created_by: TransactionId(1),
            deleted_by: Some(TransactionId(2)),
            prev_version: None,
        });

        store.insert(crate::version::OldVersion {
            data: vec![Value::Integer(100)],
            created_by: TransactionId(3),
            deleted_by: Some(TransactionId(100)),
            prev_version: None,
        });
    }

    assert_eq!(manager.version_count(), 2);

    let gc_stats = manager.run_gc();

    assert_eq!(gc_stats.versions_removed, 2);
    assert_eq!(manager.version_count(), 0);
}
969
/// A version is kept while an older transaction is still active and is
/// collected once that transaction commits.
#[test]
fn test_gc_preserves_versions_for_active_transactions() {
    let manager = TransactionManager::new();

    {
        let mut store = manager.version_store().write();
        store.insert(crate::version::OldVersion {
            data: vec![featherdb_core::Value::Integer(42)],
            created_by: TransactionId(1),
            deleted_by: Some(TransactionId(5)),
            prev_version: None,
        });
    }

    // Stays open while ten younger transactions come and go.
    let pinned = manager.begin_read_write();

    for _round in 0..10 {
        let short_lived = manager.begin_read_write();
        short_lived.commit().unwrap();
    }

    let while_pinned = manager.run_gc();

    assert_eq!(while_pinned.versions_removed, 0);
    assert_eq!(manager.version_count(), 1);

    pinned.commit().unwrap();

    let after_release = manager.run_gc();
    assert_eq!(after_release.versions_removed, 1);
    assert_eq!(manager.version_count(), 0);
}
1018
/// A transaction begun with defaults has a timeout with at least 29
/// seconds remaining right after it starts.
#[test]
fn test_transaction_default_has_timeout() {
    let manager = TransactionManager::new();
    let txn = manager.begin_read_write();

    assert!(!txn.is_expired());
    let left = txn.remaining_time();
    assert!(left.is_some());
    assert!(left.unwrap().as_secs() >= 29);
}
1031
/// With `TransactionConfig::no_limits()` a transaction never expires
/// and reports no remaining time.
#[test]
fn test_transaction_no_limits() {
    let manager = TransactionManager::new();
    let unlimited = TransactionConfig::no_limits();
    let txn = manager.begin_with_config(TransactionMode::ReadWrite, unlimited);

    assert!(!txn.is_expired());
    assert_eq!(txn.remaining_time(), None);
}
1042
/// A 100 ms timeout is not reached immediately but is after 150 ms.
#[test]
fn test_transaction_with_timeout() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let cfg = TransactionConfig::new().with_timeout(100);
    let txn = manager.begin_with_config(TransactionMode::ReadWrite, cfg);

    assert!(!txn.is_expired());
    assert!(txn.remaining_time().is_some());

    std::thread::sleep(Duration::from_millis(150));

    assert!(txn.is_expired());
    assert_eq!(txn.remaining_time(), None);
}
1063
/// `elapsed` starts near zero and grows by at least the time slept.
#[test]
fn test_transaction_elapsed_time() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let txn = manager.begin_read_write();

    assert!(txn.elapsed().as_millis() < 10);

    std::thread::sleep(Duration::from_millis(50));

    assert!(txn.elapsed().as_millis() >= 50);
}
1080
/// `should_warn` fires once after the 50 ms threshold and never again.
#[test]
fn test_transaction_warning_threshold() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let cfg = TransactionConfig::new().with_warning(50);
    let mut txn = manager.begin_with_config(TransactionMode::ReadWrite, cfg);

    assert!(!txn.should_warn());

    std::thread::sleep(Duration::from_millis(70));

    assert!(txn.should_warn());

    assert!(!txn.should_warn());
}
1101
/// With a 200 ms timeout and 50 ms warning, the warning fires first
/// and expiry follows later.
#[test]
fn test_transaction_timeout_and_warning() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let cfg = TransactionConfig::with_timeout_and_warning(200, 50);
    let mut txn = manager.begin_with_config(TransactionMode::ReadWrite, cfg);

    assert!(!txn.is_expired());
    assert!(!txn.should_warn());

    std::thread::sleep(Duration::from_millis(70));

    assert!(txn.should_warn());
    assert!(!txn.is_expired());
    assert!(txn.remaining_time().is_some());

    std::thread::sleep(Duration::from_millis(150));

    assert!(txn.is_expired());
    assert_eq!(txn.remaining_time(), None);
}
1128
/// The remaining time shrinks as the transaction ages.
#[test]
fn test_transaction_remaining_time_decreases() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let cfg = TransactionConfig::new().with_timeout(500);
    let txn = manager.begin_with_config(TransactionMode::ReadWrite, cfg);

    let earlier = txn.remaining_time().unwrap();
    std::thread::sleep(Duration::from_millis(100));
    let later = txn.remaining_time().unwrap();

    assert!(later < earlier);
    assert!(later.as_millis() < earlier.as_millis());
}
1146
/// An explicitly passed default config leaves at least 29 seconds
/// before the timeout.
#[test]
fn test_default_transaction_config() {
    let manager = TransactionManager::new();
    let defaults = TransactionConfig::default();
    let txn = manager.begin_with_config(TransactionMode::ReadWrite, defaults);

    assert!(!txn.is_expired());
    let left = txn.remaining_time();
    assert!(left.is_some());
    assert!(left.unwrap().as_secs() >= 29);
}
1159
/// Committing after the timeout fails with `TransactionTimeout`
/// carrying the ID, elapsed time and limit.
#[test]
fn test_timeout_prevents_commit() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let cfg = TransactionConfig::new().with_timeout(50);
    let txn = manager.begin_with_config(TransactionMode::ReadWrite, cfg);

    std::thread::sleep(Duration::from_millis(70));

    let outcome = txn.commit();
    assert!(outcome.is_err());
    if let Err(Error::TransactionTimeout {
        transaction_id,
        elapsed_ms,
        timeout_ms,
    }) = outcome
    {
        assert_eq!(transaction_id, 1);
        assert!(elapsed_ms >= 70);
        assert_eq!(timeout_ms, 50);
    } else {
        panic!("Expected TransactionTimeout error");
    }
}
1188
/// `check_write` after the timeout fails with `TransactionTimeout`.
#[test]
fn test_timeout_prevents_write_operations() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let cfg = TransactionConfig::new().with_timeout(50);
    let txn = manager.begin_with_config(TransactionMode::ReadWrite, cfg);

    std::thread::sleep(Duration::from_millis(70));

    let outcome = txn.check_write();
    assert!(outcome.is_err());
    if let Err(Error::TransactionTimeout {
        transaction_id,
        elapsed_ms,
        timeout_ms,
    }) = outcome
    {
        assert_eq!(transaction_id, 1);
        assert!(elapsed_ms >= 70);
        assert_eq!(timeout_ms, 50);
    } else {
        panic!("Expected TransactionTimeout error");
    }
}
1217
/// `check_read` after the timeout fails with `TransactionTimeout`,
/// even for a read-only transaction.
#[test]
fn test_timeout_prevents_read_operations() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let cfg = TransactionConfig::new().with_timeout(50);
    let txn = manager.begin_with_config(TransactionMode::ReadOnly, cfg);

    std::thread::sleep(Duration::from_millis(70));

    let outcome = txn.check_read();
    assert!(outcome.is_err());
    if let Err(Error::TransactionTimeout {
        transaction_id,
        elapsed_ms,
        timeout_ms,
    }) = outcome
    {
        assert_eq!(transaction_id, 1);
        assert!(elapsed_ms >= 70);
        assert_eq!(timeout_ms, 50);
    } else {
        panic!("Expected TransactionTimeout error");
    }
}
1246
/// A default-configured transaction still allows reads, writes and
/// commit after a 50 ms pause.
#[test]
fn test_no_timeout_allows_operations() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let txn = manager.begin_read_write();

    std::thread::sleep(Duration::from_millis(50));

    assert!(txn.check_write().is_ok());
    assert!(txn.check_read().is_ok());
    assert!(txn.commit().is_ok());
}
1263
/// Operations succeed when performed 50 ms into a 200 ms timeout.
#[test]
fn test_operations_succeed_within_timeout() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let cfg = TransactionConfig::new().with_timeout(200);
    let txn = manager.begin_with_config(TransactionMode::ReadWrite, cfg);

    std::thread::sleep(Duration::from_millis(50));

    assert!(txn.check_write().is_ok());
    assert!(txn.check_read().is_ok());
    assert!(txn.commit().is_ok());
}
1281
/// The timeout error's display text mentions the transaction, the
/// exceeded timeout and the configured limit.
#[test]
fn test_timeout_error_message() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let cfg = TransactionConfig::new().with_timeout(30);
    let txn = manager.begin_with_config(TransactionMode::ReadWrite, cfg);

    std::thread::sleep(Duration::from_millis(50));

    let outcome = txn.commit();
    assert!(outcome.is_err());

    let rendered = outcome.unwrap_err().to_string();
    assert!(rendered.contains("Transaction"));
    assert!(rendered.contains("exceeded timeout"));
    assert!(rendered.contains("30ms"));
}
1301
/// A manager with no transactions reports no long-running ones.
#[test]
fn test_long_running_transactions_empty() {
    use std::time::Duration;

    let manager = TransactionManager::new();

    let slow = manager.long_running_transactions(Duration::from_secs(1));
    assert!(slow.is_empty());
}
1312
/// Freshly started transactions are below a one-second threshold.
#[test]
fn test_long_running_transactions_below_threshold() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let _writer = manager.begin_read_write();
    let _reader = manager.begin_read_only();

    let slow = manager.long_running_transactions(Duration::from_secs(1));
    assert!(slow.is_empty());
}
1325
/// Only the transaction older than the threshold is reported.
#[test]
fn test_long_running_transactions_above_threshold() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let _old = manager.begin_read_write();

    std::thread::sleep(Duration::from_millis(100));

    let _fresh = manager.begin_read_only();

    let slow = manager.long_running_transactions(Duration::from_millis(50));

    assert_eq!(slow.len(), 1);
    assert_eq!(slow[0].id, 1);
    assert!(slow[0].age >= Duration::from_millis(100));
}
1348
/// Results are ordered from oldest to youngest transaction.
#[test]
fn test_long_running_transactions_sorted_by_age() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let _first = manager.begin_read_write();

    std::thread::sleep(Duration::from_millis(50));
    let _second = manager.begin_read_only();

    std::thread::sleep(Duration::from_millis(50));
    let _third = manager.begin_read_write();

    std::thread::sleep(Duration::from_millis(30));

    let slow = manager.long_running_transactions(Duration::from_millis(20));

    assert_eq!(slow.len(), 3);
    assert_eq!(slow[0].id, 1);
    assert_eq!(slow[1].id, 2);
    assert_eq!(slow[2].id, 3);
    assert!(slow[0].age >= slow[1].age);
    assert!(slow[1].age >= slow[2].age);
}
1379
/// A committed transaction no longer shows up as long-running.
#[test]
fn test_long_running_transactions_after_commit() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let txn = manager.begin_read_write();

    std::thread::sleep(Duration::from_millis(50));

    let before = manager.long_running_transactions(Duration::from_millis(20));
    assert_eq!(before.len(), 1);
    assert_eq!(before[0].id, 1);

    txn.commit().unwrap();

    let after = manager.long_running_transactions(Duration::from_millis(20));
    assert!(after.is_empty());
}
1402
/// With nothing active, GC is reported as unblocked.
#[test]
fn test_gc_status_no_active_transactions() {
    let manager = TransactionManager::new();

    let report = manager.gc_status();
    assert_eq!(report.oldest_active_txn, None);
    assert!(!report.blocked);
    assert_eq!(report.blocked_reason, None);
}
1412
/// One active transaction blocks GC and is named in the reason.
#[test]
fn test_gc_status_with_active_transaction() {
    let manager = TransactionManager::new();
    let _open = manager.begin_read_write();

    let report = manager.gc_status();
    assert_eq!(report.oldest_active_txn, Some(1));
    assert!(report.blocked);
    assert!(report.blocked_reason.is_some());
    assert!(report.blocked_reason.unwrap().contains("transaction 1"));
}
1424
/// With several active transactions the oldest one is reported.
#[test]
fn test_gc_status_multiple_transactions() {
    let manager = TransactionManager::new();
    let _first = manager.begin_read_write();
    let _second = manager.begin_read_only();
    let _third = manager.begin_read_write();

    let report = manager.gc_status();
    assert_eq!(report.oldest_active_txn, Some(1));
    assert!(report.blocked);
    assert!(report.blocked_reason.unwrap().contains("transaction 1"));
}
1438
/// After the oldest transaction commits, the next oldest takes over as
/// the blocker.
#[test]
fn test_gc_status_after_oldest_commits() {
    let manager = TransactionManager::new();
    let first = manager.begin_read_write();
    let _second = manager.begin_read_only();
    let _third = manager.begin_read_write();

    let before = manager.gc_status();
    assert_eq!(before.oldest_active_txn, Some(1));
    assert!(before.blocked);

    first.commit().unwrap();

    let after = manager.gc_status();
    assert_eq!(after.oldest_active_txn, Some(2));
    assert!(after.blocked);
    assert!(after.blocked_reason.unwrap().contains("transaction 2"));
}
1460
/// GC is blocked while transactions are open and becomes unblocked, with no
/// oldest transaction and no reason, after every one of them has committed.
#[test]
fn test_gc_status_after_all_commit() {
    let manager = TransactionManager::new();
    let writer = manager.begin_read_write();
    let reader = manager.begin_read_only();

    let while_open = manager.gc_status();
    assert!(while_open.blocked);

    writer.commit().unwrap();
    reader.commit().unwrap();

    let drained = manager.gc_status();
    assert!(drained.oldest_active_txn.is_none());
    assert!(!drained.blocked);
    assert!(drained.blocked_reason.is_none());
}
1481
/// The long-running report and the GC status agree on which transaction is
/// the old one: only the transaction started before the pause is long-running,
/// and it is the same transaction the GC status names as oldest.
#[test]
fn test_long_running_and_gc_status_integration() {
    use std::time::Duration;

    let manager = TransactionManager::new();

    let _stale = manager.begin_read_write();

    // Age the first transaction past the 50 ms threshold before opening more.
    std::thread::sleep(Duration::from_millis(100));

    let _fresh_reader = manager.begin_read_only();
    let _fresh_writer = manager.begin_read_write();

    let aged = manager.long_running_transactions(Duration::from_millis(50));
    assert_eq!(aged.len(), 1);
    assert_eq!(aged[0].id, 1);

    let gc = manager.gc_status();
    assert_eq!(gc.oldest_active_txn, Some(1));
    assert!(gc.blocked);
    assert!(gc.blocked_reason.unwrap().contains("transaction 1"));

    // Both views must identify the same transaction.
    let oldest_id = gc.oldest_active_txn.unwrap();
    assert_eq!(aged[0].id, oldest_id);
}
1512
/// Inspects the `id` and `age` fields of the entry reported for a single
/// open transaction that has aged for about 50 ms.
#[test]
fn test_transaction_info_structure() {
    use std::time::Duration;

    let manager = TransactionManager::new();
    let _open = manager.begin_read_write();

    let slept = Duration::from_millis(50);
    std::thread::sleep(slept);

    let aged = manager.long_running_transactions(Duration::from_millis(20));
    assert_eq!(aged.len(), 1);

    let entry = &aged[0];
    assert_eq!(entry.id, 1);
    // The age is at least the time slept and well under a second.
    assert!(entry.age >= slept);
    assert!(entry.age < Duration::from_secs(1));
}
1531
/// Checks the fields of the GC status for one open transaction, including
/// the wording fragments expected in the blocked reason.
#[test]
fn test_gc_status_structure() {
    let manager = TransactionManager::new();
    let _open = manager.begin_read_write();

    let report = manager.gc_status();
    assert_eq!(report.oldest_active_txn, Some(1));
    assert!(report.blocked);

    let reason = report.blocked_reason.unwrap();
    // Each fragment must appear somewhere in the reason text.
    for fragment in ["Waiting for", "transaction 1", "complete"].iter() {
        assert!(reason.contains(*fragment));
    }
}
1546
/// A freshly created manager reports all-zero metrics and a commit rate of 0.
#[test]
fn test_transaction_metrics_initial_state() {
    let snapshot = TransactionManager::new().metrics();

    // Counters.
    assert_eq!(snapshot.active_count, 0);
    assert_eq!(snapshot.total_commits, 0);
    assert_eq!(snapshot.total_rollbacks, 0);
    assert_eq!(snapshot.long_running_count, 0);
    assert_eq!(snapshot.total_transactions, 0);

    // Derived values.
    assert_eq!(snapshot.avg_duration_us, 0);
    assert_eq!(snapshot.commit_rate(), 0.0);
}
1560
/// Beginning two transactions raises the active count to 2 without touching
/// the commit or rollback totals.
#[test]
fn test_transaction_metrics_begin() {
    let manager = TransactionManager::new();
    // Both stay open so they are still counted as active below.
    let _writer = manager.begin_read_write();
    let _reader = manager.begin_read_only();

    let snapshot = manager.metrics();
    assert_eq!(snapshot.active_count, 2);
    assert_eq!(snapshot.total_commits, 0);
    assert_eq!(snapshot.total_rollbacks, 0);
}
1572
/// Committing one transaction is reflected as one commit, zero rollbacks, a
/// non-zero average duration, and a commit rate of 1.0.
#[test]
fn test_transaction_metrics_commit() {
    let manager = TransactionManager::new();
    let writer = manager.begin_read_write();

    // Give the transaction a measurable lifetime before committing.
    std::thread::sleep(Duration::from_millis(10));

    writer.commit().unwrap();

    let snapshot = manager.metrics();
    assert_eq!(snapshot.active_count, 0);
    assert_eq!(snapshot.total_commits, 1);
    assert_eq!(snapshot.total_rollbacks, 0);
    assert_eq!(snapshot.total_transactions, 1);
    assert!(snapshot.avg_duration_us > 0);
    assert_eq!(snapshot.commit_rate(), 1.0);
}
1593
/// Aborting one transaction is reflected as one rollback, zero commits, a
/// non-zero average duration, and a commit rate of 0.0.
#[test]
fn test_transaction_metrics_rollback() {
    let manager = TransactionManager::new();
    let writer = manager.begin_read_write();

    // Give the transaction a measurable lifetime before aborting.
    std::thread::sleep(Duration::from_millis(10));

    writer.abort().unwrap();

    let snapshot = manager.metrics();
    assert_eq!(snapshot.active_count, 0);
    assert_eq!(snapshot.total_commits, 0);
    assert_eq!(snapshot.total_rollbacks, 1);
    assert_eq!(snapshot.total_transactions, 1);
    assert!(snapshot.avg_duration_us > 0);
    assert_eq!(snapshot.commit_rate(), 0.0);
}
1613
/// Three commits followed by one abort yield totals of 3 / 1 / 4 and a commit
/// rate of 0.75.
#[test]
fn test_transaction_metrics_mixed() {
    let pause = Duration::from_millis(5);
    let manager = TransactionManager::new();

    // Three transactions that commit...
    for _round in 0..3 {
        let committed = manager.begin_read_write();
        std::thread::sleep(pause);
        committed.commit().unwrap();
    }

    // ...and one that is rolled back.
    let aborted = manager.begin_read_write();
    std::thread::sleep(pause);
    aborted.abort().unwrap();

    let snapshot = manager.metrics();
    assert_eq!(snapshot.active_count, 0);
    assert_eq!(snapshot.total_commits, 3);
    assert_eq!(snapshot.total_rollbacks, 1);
    assert_eq!(snapshot.total_transactions, 4);
    assert!(snapshot.avg_duration_us > 0);
    assert_eq!(snapshot.commit_rate(), 0.75);
}
1640
/// Two open transactions, one of them 10 ms old, report an active count of 2
/// and a long-running count of 0.
// NOTE(review): presumably 10 ms is below the manager's long-running
// threshold, which is why the count stays at zero; confirm against the
// configuration used by `TransactionManager::new`.
#[test]
fn test_transaction_metrics_long_running() {
    let manager = TransactionManager::new();
    let _older = manager.begin_read_write();

    std::thread::sleep(Duration::from_millis(10));

    let _newer = manager.begin_read_only();

    let snapshot = manager.metrics();
    assert_eq!(snapshot.active_count, 2);
    assert_eq!(snapshot.long_running_count, 0);
}
1660
/// Checks `commit_rate` on a hand-built snapshot with 7 commits out of 10
/// finished transactions, which should come out at roughly 0.7.
#[test]
fn test_transaction_metrics_snapshot_commit_rate() {
    let snapshot = TransactionMetricsSnapshot {
        active_count: 2,
        total_commits: 7,
        total_rollbacks: 3,
        long_running_count: 1,
        avg_duration_us: 1000,
        total_transactions: 10,
    };

    // 0.7 has no exact binary floating-point representation, so an exact
    // `assert_eq!` only holds if the implementation happens to be a single
    // correctly rounded division. Compare with a tolerance so the test pins
    // the value rather than the rounding path.
    let rate = snapshot.commit_rate();
    assert!(
        (rate - 0.7).abs() < 1e-9,
        "expected a commit rate of about 0.7, got {}",
        rate
    );
}
1674
/// A snapshot with no finished transactions reports a commit rate of exactly
/// 0.0.
#[test]
fn test_transaction_metrics_snapshot_commit_rate_zero_transactions() {
    // Field order in a struct literal is irrelevant; totals are listed first.
    let empty = TransactionMetricsSnapshot {
        total_transactions: 0,
        total_commits: 0,
        total_rollbacks: 0,
        active_count: 0,
        long_running_count: 0,
        avg_duration_us: 0,
    };

    let rate = empty.commit_rate();
    assert_eq!(rate, 0.0);
}
1688
/// Two committed transactions that lived for about 10 ms and 20 ms produce an
/// average duration of at least 10 000 µs.
#[test]
fn test_transaction_metrics_average_duration() {
    let manager = TransactionManager::new();

    // Run the transactions back to back, each held open for its own pause.
    for &pause_ms in [10u64, 20].iter() {
        let writer = manager.begin_read_write();
        std::thread::sleep(Duration::from_millis(pause_ms));
        writer.commit().unwrap();
    }

    let snapshot = manager.metrics();
    assert_eq!(snapshot.total_transactions, 2);
    assert!(snapshot.avg_duration_us >= 10_000);
}
1710
/// A transaction that is dropped without an explicit commit or abort is
/// counted as a rollback in the metrics.
#[test]
fn test_transaction_metrics_auto_abort() {
    let manager = TransactionManager::new();

    // Begin a transaction and let it go out of scope immediately.
    drop(manager.begin_read_write());

    let snapshot = manager.metrics();
    assert_eq!(snapshot.active_count, 0);
    assert_eq!(snapshot.total_commits, 0);
    assert_eq!(snapshot.total_rollbacks, 1);
    assert_eq!(snapshot.total_transactions, 1);
}
1726}