1use aegis_common::{AegisError, Result, TransactionId};
17use parking_lot::RwLock;
18use std::collections::{HashMap, HashSet};
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::time::{Duration, Instant};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum TransactionState {
29 Active,
30 Preparing,
31 Committed,
32 Aborted,
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
37pub enum IsolationLevel {
38 ReadUncommitted,
39 ReadCommitted,
40 #[default]
41 RepeatableRead,
42 Serializable,
43}
44
45#[derive(Debug)]
51pub struct Transaction {
52 pub id: TransactionId,
53 pub state: TransactionState,
54 pub isolation_level: IsolationLevel,
55 pub start_timestamp: u64,
56 pub commit_timestamp: Option<u64>,
57 pub snapshot: Snapshot,
58 pub write_set: HashSet<VersionKey>,
59 pub read_set: HashSet<VersionKey>,
60 pub locks_held: Vec<LockRequest>,
61 pub started_at: Instant,
62}
63
64impl Transaction {
65 pub fn new(
66 id: TransactionId,
67 isolation_level: IsolationLevel,
68 start_timestamp: u64,
69 active_transactions: HashSet<TransactionId>,
70 ) -> Self {
71 Self {
72 id,
73 state: TransactionState::Active,
74 isolation_level,
75 start_timestamp,
76 commit_timestamp: None,
77 snapshot: Snapshot {
78 timestamp: start_timestamp,
79 active_transactions,
80 },
81 write_set: HashSet::new(),
82 read_set: HashSet::new(),
83 locks_held: Vec::new(),
84 started_at: Instant::now(),
85 }
86 }
87
88 pub fn is_active(&self) -> bool {
89 self.state == TransactionState::Active
90 }
91
92 pub fn duration(&self) -> Duration {
93 self.started_at.elapsed()
94 }
95
96 pub fn add_to_write_set(&mut self, key: VersionKey) {
97 self.write_set.insert(key);
98 }
99
100 pub fn add_to_read_set(&mut self, key: VersionKey) {
101 self.read_set.insert(key);
102 }
103}
104
105#[derive(Debug, Clone)]
111pub struct Snapshot {
112 pub timestamp: u64,
113 pub active_transactions: HashSet<TransactionId>,
114}
115
116impl Snapshot {
117 pub fn is_visible(&self, version: &Version) -> bool {
119 match version.state {
120 VersionState::Committed(commit_ts) => {
121 commit_ts <= self.timestamp
122 && !self.active_transactions.contains(&version.created_by)
123 }
124 VersionState::Active => false,
125 VersionState::Aborted => false,
126 }
127 }
128}
129
130#[derive(Debug, Clone, PartialEq, Eq, Hash)]
136pub struct VersionKey {
137 pub table_id: u32,
138 pub row_id: u64,
139}
140
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
143pub enum VersionState {
144 Active,
145 Committed(u64),
146 Aborted,
147}
148
149#[derive(Debug, Clone)]
151pub struct Version {
152 pub key: VersionKey,
153 pub created_by: TransactionId,
154 pub state: VersionState,
155 pub data: Vec<u8>,
156 pub prev_version: Option<Box<Version>>,
157}
158
159impl Version {
160 pub fn new(key: VersionKey, created_by: TransactionId, data: Vec<u8>) -> Self {
161 Self {
162 key,
163 created_by,
164 state: VersionState::Active,
165 data,
166 prev_version: None,
167 }
168 }
169
170 pub fn commit(&mut self, commit_timestamp: u64) {
171 self.state = VersionState::Committed(commit_timestamp);
172 }
173
174 pub fn abort(&mut self) {
175 self.state = VersionState::Aborted;
176 }
177}
178
179#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185pub enum LockMode {
186 Shared,
187 Exclusive,
188 IntentShared,
189 IntentExclusive,
190 Update,
191}
192
193impl LockMode {
194 pub fn is_compatible(&self, other: &LockMode) -> bool {
196 use LockMode::*;
197 matches!(
198 (self, other),
199 (Shared, Shared)
200 | (Shared, IntentShared)
201 | (IntentShared, Shared)
202 | (IntentShared, IntentShared)
203 | (IntentShared, IntentExclusive)
204 | (IntentExclusive, IntentShared)
205 | (IntentExclusive, IntentExclusive)
206 )
207 }
208}
209
210#[derive(Debug, Clone)]
212pub struct LockRequest {
213 pub tx_id: TransactionId,
214 pub key: VersionKey,
215 pub mode: LockMode,
216 pub granted: bool,
217}
218
219#[derive(Debug, Default)]
221struct LockEntry {
222 holders: Vec<LockRequest>,
223 waiters: Vec<LockRequest>,
224}
225
226pub struct LockManager {
228 locks: RwLock<HashMap<VersionKey, LockEntry>>,
229 timeout: Duration,
230}
231
232impl LockManager {
233 pub fn new(timeout: Duration) -> Self {
234 Self {
235 locks: RwLock::new(HashMap::new()),
236 timeout,
237 }
238 }
239
240 pub fn acquire(&self, request: LockRequest) -> Result<()> {
242 let start = Instant::now();
243
244 loop {
245 {
246 let mut locks = self.locks.write();
247 let entry = locks.entry(request.key.clone()).or_default();
248
249 let can_grant = entry
250 .holders
251 .iter()
252 .all(|h| h.tx_id == request.tx_id || h.mode.is_compatible(&request.mode));
253
254 if can_grant {
255 entry.holders.push(LockRequest {
256 granted: true,
257 ..request.clone()
258 });
259 return Ok(());
260 }
261
262 if !entry.waiters.iter().any(|w| w.tx_id == request.tx_id) {
263 entry.waiters.push(request.clone());
264 }
265 }
266
267 if start.elapsed() > self.timeout {
268 self.release_waiter(&request);
269 return Err(AegisError::LockTimeout);
270 }
271
272 std::thread::sleep(Duration::from_millis(1));
273 }
274 }
275
276 pub fn try_acquire(&self, request: LockRequest) -> Result<bool> {
278 let mut locks = self.locks.write();
279 let entry = locks.entry(request.key.clone()).or_default();
280
281 let can_grant = entry
282 .holders
283 .iter()
284 .all(|h| h.tx_id == request.tx_id || h.mode.is_compatible(&request.mode));
285
286 if can_grant {
287 entry.holders.push(LockRequest {
288 granted: true,
289 ..request
290 });
291 Ok(true)
292 } else {
293 Ok(false)
294 }
295 }
296
297 pub fn release(&self, tx_id: TransactionId, key: &VersionKey) {
299 let mut locks = self.locks.write();
300
301 if let Some(entry) = locks.get_mut(key) {
302 entry.holders.retain(|h| h.tx_id != tx_id);
303
304 while !entry.waiters.is_empty() {
305 let waiter = entry.waiters.remove(0);
306 let can_grant = entry
307 .holders
308 .iter()
309 .all(|h| h.mode.is_compatible(&waiter.mode));
310
311 if can_grant {
312 entry.holders.push(LockRequest {
313 granted: true,
314 ..waiter
315 });
316 } else {
317 entry.waiters.insert(0, waiter);
318 break;
319 }
320 }
321
322 if entry.holders.is_empty() && entry.waiters.is_empty() {
323 locks.remove(key);
324 }
325 }
326 }
327
328 pub fn release_all(&self, tx_id: TransactionId) {
330 let mut locks = self.locks.write();
331 let keys: Vec<_> = locks.keys().cloned().collect();
332
333 for key in keys {
334 if let Some(entry) = locks.get_mut(&key) {
335 entry.holders.retain(|h| h.tx_id != tx_id);
336 entry.waiters.retain(|w| w.tx_id != tx_id);
337
338 if entry.holders.is_empty() && entry.waiters.is_empty() {
339 locks.remove(&key);
340 }
341 }
342 }
343 }
344
345 fn release_waiter(&self, request: &LockRequest) {
346 let mut locks = self.locks.write();
347 if let Some(entry) = locks.get_mut(&request.key) {
348 entry.waiters.retain(|w| w.tx_id != request.tx_id);
349 }
350 }
351}
352
353pub struct TransactionManager {
359 transactions: RwLock<HashMap<TransactionId, Transaction>>,
360 next_tx_id: AtomicU64,
361 next_timestamp: AtomicU64,
362 lock_manager: LockManager,
363 versions: RwLock<HashMap<VersionKey, Version>>,
364}
365
366impl TransactionManager {
367 pub fn new() -> Self {
368 Self {
369 transactions: RwLock::new(HashMap::new()),
370 next_tx_id: AtomicU64::new(1),
371 next_timestamp: AtomicU64::new(1),
372 lock_manager: LockManager::new(Duration::from_secs(30)),
373 versions: RwLock::new(HashMap::new()),
374 }
375 }
376
377 pub fn begin(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
379 let tx_id = TransactionId(self.next_tx_id.fetch_add(1, Ordering::SeqCst));
380 let start_ts = self.next_timestamp.fetch_add(1, Ordering::SeqCst);
381
382 let active_txs: HashSet<_> = self
383 .transactions
384 .read()
385 .iter()
386 .filter(|(_, tx)| tx.is_active())
387 .map(|(id, _)| *id)
388 .collect();
389
390 let transaction = Transaction::new(tx_id, isolation_level, start_ts, active_txs);
391
392 self.transactions.write().insert(tx_id, transaction);
393
394 Ok(tx_id)
395 }
396
397 pub fn commit(&self, tx_id: TransactionId) -> Result<()> {
399 let commit_ts = self.next_timestamp.fetch_add(1, Ordering::SeqCst);
400
401 {
402 let mut txs = self.transactions.write();
403 let tx = txs
404 .get_mut(&tx_id)
405 .ok_or_else(|| AegisError::Transaction("Transaction not found".to_string()))?;
406
407 if tx.state != TransactionState::Active {
408 return Err(AegisError::Transaction(
409 "Transaction not active".to_string(),
410 ));
411 }
412
413 if tx.isolation_level == IsolationLevel::Serializable {
414 self.validate_serializable(tx)?;
415 }
416
417 tx.state = TransactionState::Preparing;
418 tx.commit_timestamp = Some(commit_ts);
419 }
420
421 {
422 let mut versions = self.versions.write();
423 let txs = self.transactions.read();
424 let tx = txs.get(&tx_id).ok_or_else(|| {
425 AegisError::Transaction("Transaction disappeared during commit".to_string())
426 })?;
427
428 for key in &tx.write_set {
429 if let Some(version) = versions.get_mut(key) {
430 if version.created_by == tx_id {
431 version.commit(commit_ts);
432 }
433 }
434 }
435 }
436
437 {
438 let mut txs = self.transactions.write();
439 if let Some(tx) = txs.get_mut(&tx_id) {
440 tx.state = TransactionState::Committed;
441 }
442 }
443
444 self.lock_manager.release_all(tx_id);
445
446 Ok(())
447 }
448
449 pub fn abort(&self, tx_id: TransactionId) -> Result<()> {
451 {
452 let mut txs = self.transactions.write();
453 let tx = txs
454 .get_mut(&tx_id)
455 .ok_or_else(|| AegisError::Transaction("Transaction not found".to_string()))?;
456
457 tx.state = TransactionState::Aborted;
458 }
459
460 {
461 let mut versions = self.versions.write();
462 let txs = self.transactions.read();
463 let tx = txs.get(&tx_id).ok_or_else(|| {
464 AegisError::Transaction("Transaction disappeared during abort".to_string())
465 })?;
466
467 for key in &tx.write_set {
468 if let Some(version) = versions.get_mut(key) {
469 if version.created_by == tx_id {
470 version.abort();
471 }
472 }
473 }
474 }
475
476 self.lock_manager.release_all(tx_id);
477
478 Ok(())
479 }
480
481 pub fn read(&self, tx_id: TransactionId, key: &VersionKey) -> Result<Option<Vec<u8>>> {
483 let txs = self.transactions.read();
484 let tx = txs
485 .get(&tx_id)
486 .ok_or_else(|| AegisError::Transaction("Transaction not found".to_string()))?;
487
488 if !tx.is_active() {
489 return Err(AegisError::Transaction(
490 "Transaction not active".to_string(),
491 ));
492 }
493
494 let versions = self.versions.read();
495 if let Some(version) = versions.get(key) {
496 if tx.snapshot.is_visible(version) {
497 return Ok(Some(version.data.clone()));
498 }
499
500 let mut current = version.prev_version.as_ref();
501 while let Some(v) = current {
502 if tx.snapshot.is_visible(v) {
503 return Ok(Some(v.data.clone()));
504 }
505 current = v.prev_version.as_ref();
506 }
507 }
508
509 Ok(None)
510 }
511
512 pub fn write(&self, tx_id: TransactionId, key: VersionKey, data: Vec<u8>) -> Result<()> {
514 {
515 let mut txs = self.transactions.write();
516 let tx = txs
517 .get_mut(&tx_id)
518 .ok_or_else(|| AegisError::Transaction("Transaction not found".to_string()))?;
519
520 if !tx.is_active() {
521 return Err(AegisError::Transaction(
522 "Transaction not active".to_string(),
523 ));
524 }
525
526 tx.add_to_write_set(key.clone());
527 }
528
529 let lock_request = LockRequest {
530 tx_id,
531 key: key.clone(),
532 mode: LockMode::Exclusive,
533 granted: false,
534 };
535 self.lock_manager.acquire(lock_request)?;
536
537 {
538 let txs = self.transactions.read();
539 let tx = txs.get(&tx_id).ok_or_else(|| {
540 AegisError::Transaction("Transaction disappeared during write".to_string())
541 })?;
542 tx.locks_held.len(); }
544
545 let mut versions = self.versions.write();
546 let new_version = Version::new(key.clone(), tx_id, data);
547
548 if let Some(existing) = versions.remove(&key) {
549 let mut new_v = new_version;
550 new_v.prev_version = Some(Box::new(existing));
551 versions.insert(key, new_v);
552 } else {
553 versions.insert(key, new_version);
554 }
555
556 Ok(())
557 }
558
559 pub fn delete(&self, tx_id: TransactionId, key: &VersionKey) -> Result<()> {
561 self.write(tx_id, key.clone(), Vec::new())
562 }
563
564 pub fn stats(&self) -> TransactionStats {
566 let txs = self.transactions.read();
567 let versions = self.versions.read();
568 let mut active = 0;
569 let mut committed = 0;
570 let mut aborted = 0;
571
572 for tx in txs.values() {
573 match tx.state {
574 TransactionState::Active | TransactionState::Preparing => active += 1,
575 TransactionState::Committed => committed += 1,
576 TransactionState::Aborted => aborted += 1,
577 }
578 }
579
580 let mut version_count = 0;
582 for version in versions.values() {
583 version_count += 1;
584 let mut prev = version.prev_version.as_ref();
585 while let Some(v) = prev {
586 version_count += 1;
587 prev = v.prev_version.as_ref();
588 }
589 }
590
591 TransactionStats {
592 active,
593 committed,
594 aborted,
595 total: txs.len(),
596 version_count,
597 }
598 }
599
600 fn get_min_active_timestamp(&self) -> u64 {
607 let txs = self.transactions.read();
608 txs.values()
609 .filter(|tx| tx.is_active())
610 .map(|tx| tx.start_timestamp)
611 .min()
612 .unwrap_or(u64::MAX)
613 }
614
615 pub fn run_gc(&self) -> GcStats {
618 let min_ts = self.get_min_active_timestamp();
619 let mut versions_collected = 0;
620
621 {
623 let mut versions = self.versions.write();
624 for version in versions.values_mut() {
625 versions_collected += Self::gc_version_chain(version, min_ts);
626 }
627 }
628
629 let transactions_cleaned = {
631 let mut txs = self.transactions.write();
632 let to_remove: Vec<TransactionId> = txs
633 .iter()
634 .filter(|(_, tx)| {
635 match tx.state {
636 TransactionState::Committed | TransactionState::Aborted => {
637 tx.commit_timestamp.unwrap_or(tx.start_timestamp) < min_ts
639 }
640 _ => false,
641 }
642 })
643 .map(|(id, _)| *id)
644 .collect();
645
646 let count = to_remove.len();
647 for id in to_remove {
648 txs.remove(&id);
649 }
650 count
651 };
652
653 GcStats {
654 versions_collected,
655 transactions_cleaned,
656 min_active_timestamp: min_ts,
657 }
658 }
659
660 fn gc_version_chain(version: &mut Version, min_ts: u64) -> usize {
669 let mut collected = 0;
670
671 if let Some(ref mut prev) = version.prev_version {
673 collected += Self::gc_version_chain(prev, min_ts);
675
676 if prev.state == VersionState::Aborted {
678 version.prev_version = prev.prev_version.take();
679 collected += 1;
680 return collected;
681 }
682
683 if let VersionState::Committed(curr_commit_ts) = version.state {
688 if let VersionState::Committed(prev_commit_ts) = prev.state {
689 if prev_commit_ts < min_ts && curr_commit_ts < min_ts {
692 version.prev_version = None;
693 collected += 1;
694 }
695 }
696 }
697 }
698
699 collected
700 }
701
702 pub fn run_gc_if_needed(&self, threshold: usize) -> Option<GcStats> {
705 let stats = self.stats();
706 if stats.version_count > threshold {
707 Some(self.run_gc())
708 } else {
709 None
710 }
711 }
712
713 fn validate_serializable(&self, tx: &Transaction) -> Result<()> {
714 let txs = self.transactions.read();
715
716 for other_tx in txs.values() {
717 if other_tx.id == tx.id {
718 continue;
719 }
720
721 if other_tx.state != TransactionState::Committed {
722 continue;
723 }
724
725 if let Some(commit_ts) = other_tx.commit_timestamp {
726 if commit_ts > tx.start_timestamp {
727 for read_key in &tx.read_set {
728 if other_tx.write_set.contains(read_key) {
729 return Err(AegisError::SerializationFailure);
730 }
731 }
732 }
733 }
734 }
735
736 Ok(())
737 }
738}
739
740impl Default for TransactionManager {
741 fn default() -> Self {
742 Self::new()
743 }
744}
745
746#[derive(Debug, Clone)]
752pub struct TransactionStats {
753 pub active: usize,
754 pub committed: usize,
755 pub aborted: usize,
756 pub total: usize,
757 pub version_count: usize,
759}
760
761#[derive(Debug, Clone)]
763pub struct GcStats {
764 pub versions_collected: usize,
766 pub transactions_cleaned: usize,
768 pub min_active_timestamp: u64,
770}
771
772#[cfg(test)]
777mod tests {
778 use super::*;
779
780 #[test]
781 fn test_transaction_lifecycle() {
782 let tm = TransactionManager::new();
783
784 let tx_id = tm.begin(IsolationLevel::RepeatableRead).unwrap();
785 assert!(tm.transactions.read().get(&tx_id).unwrap().is_active());
786
787 tm.commit(tx_id).unwrap();
788 assert_eq!(
789 tm.transactions.read().get(&tx_id).unwrap().state,
790 TransactionState::Committed
791 );
792 }
793
794 #[test]
795 fn test_transaction_abort() {
796 let tm = TransactionManager::new();
797
798 let tx_id = tm.begin(IsolationLevel::RepeatableRead).unwrap();
799 tm.abort(tx_id).unwrap();
800
801 assert_eq!(
802 tm.transactions.read().get(&tx_id).unwrap().state,
803 TransactionState::Aborted
804 );
805 }
806
807 #[test]
808 fn test_mvcc_read_write() {
809 let tm = TransactionManager::new();
810
811 let tx1 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
812 let key = VersionKey {
813 table_id: 1,
814 row_id: 1,
815 };
816
817 tm.write(tx1, key.clone(), b"hello".to_vec()).unwrap();
818 tm.commit(tx1).unwrap();
819
820 let tx2 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
821 let data = tm.read(tx2, &key).unwrap();
822 assert_eq!(data, Some(b"hello".to_vec()));
823 tm.commit(tx2).unwrap();
824 }
825
826 #[test]
827 fn test_snapshot_isolation() {
828 let tm = TransactionManager::new();
829
830 let key = VersionKey {
831 table_id: 1,
832 row_id: 1,
833 };
834
835 let tx1 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
836 tm.write(tx1, key.clone(), b"v1".to_vec()).unwrap();
837 tm.commit(tx1).unwrap();
838
839 let tx2 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
840
841 let tx3 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
842 tm.write(tx3, key.clone(), b"v2".to_vec()).unwrap();
843 tm.commit(tx3).unwrap();
844
845 let data = tm.read(tx2, &key).unwrap();
846 assert_eq!(data, Some(b"v1".to_vec()));
847
848 tm.commit(tx2).unwrap();
849 }
850
851 #[test]
852 fn test_lock_compatibility() {
853 assert!(LockMode::Shared.is_compatible(&LockMode::Shared));
854 assert!(!LockMode::Shared.is_compatible(&LockMode::Exclusive));
855 assert!(!LockMode::Exclusive.is_compatible(&LockMode::Exclusive));
856 assert!(!LockMode::Exclusive.is_compatible(&LockMode::Shared));
857 }
858
859 #[test]
860 fn test_lock_manager() {
861 let lm = LockManager::new(Duration::from_secs(1));
862 let key = VersionKey {
863 table_id: 1,
864 row_id: 1,
865 };
866
867 let req1 = LockRequest {
868 tx_id: TransactionId(1),
869 key: key.clone(),
870 mode: LockMode::Shared,
871 granted: false,
872 };
873
874 assert!(lm.try_acquire(req1).unwrap());
875
876 let req2 = LockRequest {
877 tx_id: TransactionId(2),
878 key: key.clone(),
879 mode: LockMode::Shared,
880 granted: false,
881 };
882 assert!(lm.try_acquire(req2).unwrap());
883
884 let req3 = LockRequest {
885 tx_id: TransactionId(3),
886 key: key.clone(),
887 mode: LockMode::Exclusive,
888 granted: false,
889 };
890 assert!(!lm.try_acquire(req3).unwrap());
891
892 lm.release(TransactionId(1), &key);
893 lm.release(TransactionId(2), &key);
894
895 let req4 = LockRequest {
896 tx_id: TransactionId(3),
897 key: key.clone(),
898 mode: LockMode::Exclusive,
899 granted: false,
900 };
901 assert!(lm.try_acquire(req4).unwrap());
902 }
903
904 #[test]
905 fn test_gc_cleans_old_transactions() {
906 let tm = TransactionManager::new();
907
908 let tx1 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
910 tm.commit(tx1).unwrap();
911
912 let tx2 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
913 tm.commit(tx2).unwrap();
914
915 let tx3 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
916 tm.abort(tx3).unwrap();
917
918 assert_eq!(tm.stats().total, 3);
920 assert_eq!(tm.stats().committed, 2);
921 assert_eq!(tm.stats().aborted, 1);
922
923 let gc_stats = tm.run_gc();
925 assert_eq!(gc_stats.transactions_cleaned, 3);
926
927 assert_eq!(tm.stats().total, 0);
929 }
930
931 #[test]
932 fn test_gc_preserves_active_transaction_visible_versions() {
933 let tm = TransactionManager::new();
934 let key = VersionKey {
935 table_id: 1,
936 row_id: 1,
937 };
938
939 let tx1 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
941 tm.write(tx1, key.clone(), b"v1".to_vec()).unwrap();
942 tm.commit(tx1).unwrap();
943
944 let tx_long = tm.begin(IsolationLevel::RepeatableRead).unwrap();
946
947 let tx2 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
949 tm.write(tx2, key.clone(), b"v2".to_vec()).unwrap();
950 tm.commit(tx2).unwrap();
951
952 let data = tm.read(tx_long, &key).unwrap();
954 assert_eq!(data, Some(b"v1".to_vec()));
955
956 let _gc_stats = tm.run_gc();
958 let data = tm.read(tx_long, &key).unwrap();
963 assert_eq!(data, Some(b"v1".to_vec()));
964
965 tm.commit(tx_long).unwrap();
966 }
967
968 #[test]
969 fn test_gc_removes_aborted_versions() {
970 let tm = TransactionManager::new();
971 let key = VersionKey {
972 table_id: 1,
973 row_id: 1,
974 };
975
976 let tx1 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
978 tm.write(tx1, key.clone(), b"v1".to_vec()).unwrap();
979 tm.commit(tx1).unwrap();
980
981 let tx2 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
983 tm.write(tx2, key.clone(), b"v2_aborted".to_vec()).unwrap();
984 tm.abort(tx2).unwrap();
985
986 let tx3 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
988 tm.write(tx3, key.clone(), b"v3".to_vec()).unwrap();
989 tm.commit(tx3).unwrap();
990
991 let gc_stats = tm.run_gc();
993 assert!(gc_stats.versions_collected > 0 || gc_stats.transactions_cleaned > 0);
994
995 let tx4 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
997 let data = tm.read(tx4, &key).unwrap();
998 assert_eq!(data, Some(b"v3".to_vec()));
999 tm.commit(tx4).unwrap();
1000 }
1001
1002 #[test]
1003 fn test_stats_includes_version_count() {
1004 let tm = TransactionManager::new();
1005 let key = VersionKey {
1006 table_id: 1,
1007 row_id: 1,
1008 };
1009
1010 assert_eq!(tm.stats().version_count, 0);
1011
1012 let tx1 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
1013 tm.write(tx1, key.clone(), b"v1".to_vec()).unwrap();
1014 tm.commit(tx1).unwrap();
1015
1016 assert_eq!(tm.stats().version_count, 1);
1017
1018 let tx2 = tm.begin(IsolationLevel::RepeatableRead).unwrap();
1019 tm.write(tx2, key.clone(), b"v2".to_vec()).unwrap();
1020 tm.commit(tx2).unwrap();
1021
1022 assert_eq!(tm.stats().version_count, 2);
1024 }
1025
1026 #[test]
1027 fn test_run_gc_if_needed() {
1028 let tm = TransactionManager::new();
1029 let key = VersionKey {
1030 table_id: 1,
1031 row_id: 1,
1032 };
1033
1034 assert!(tm.run_gc_if_needed(100).is_none());
1036
1037 for i in 0..5 {
1039 let tx = tm.begin(IsolationLevel::RepeatableRead).unwrap();
1040 tm.write(tx, key.clone(), format!("v{}", i).into_bytes())
1041 .unwrap();
1042 tm.commit(tx).unwrap();
1043 }
1044
1045 assert_eq!(tm.stats().version_count, 5);
1047
1048 let result = tm.run_gc_if_needed(3);
1050 assert!(result.is_some());
1051 }
1052}