Skip to main content

aegis_storage/
transaction.rs

1//! Aegis Transaction - MVCC Transaction Management
2//!
3//! Multi-Version Concurrency Control implementation providing snapshot isolation
4//! and serializable transactions. Manages transaction lifecycles, version chains,
5//! and conflict detection.
6//!
7//! Key Features:
8//! - Snapshot isolation with consistent reads
9//! - Optimistic concurrency control
10//! - Transaction version tracking
11//! - Conflict detection and resolution
12//!
13//! @version 0.1.0
14//! @author AutomataNexus Development Team
15
16use aegis_common::{AegisError, Result, TransactionId};
17use parking_lot::RwLock;
18use std::collections::{HashMap, HashSet};
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::time::{Duration, Instant};
21
22// =============================================================================
23// Transaction State
24// =============================================================================
25
/// Lifecycle phase of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// Open: reads and writes are accepted.
    Active,
    /// Commit has been requested and a commit timestamp assigned, but the
    /// transaction has not yet been marked `Committed`.
    Preparing,
    /// Finished successfully.
    Committed,
    /// Rolled back.
    Aborted,
}
34
/// Isolation level requested for a transaction.
///
/// `RepeatableRead` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Weakest level.
    ReadUncommitted,
    /// Reads observe committed data only.
    ReadCommitted,
    /// Default level.
    #[default]
    RepeatableRead,
    /// Strongest level; subject to conflict validation at commit time.
    Serializable,
}
44
45// =============================================================================
46// Transaction
47// =============================================================================
48
49/// A database transaction with MVCC support.
50#[derive(Debug)]
51pub struct Transaction {
52    pub id: TransactionId,
53    pub state: TransactionState,
54    pub isolation_level: IsolationLevel,
55    pub start_timestamp: u64,
56    pub commit_timestamp: Option<u64>,
57    pub snapshot: Snapshot,
58    pub write_set: HashSet<VersionKey>,
59    pub read_set: HashSet<VersionKey>,
60    pub locks_held: Vec<LockRequest>,
61    pub started_at: Instant,
62}
63
64impl Transaction {
65    pub fn new(
66        id: TransactionId,
67        isolation_level: IsolationLevel,
68        start_timestamp: u64,
69        active_transactions: HashSet<TransactionId>,
70    ) -> Self {
71        Self {
72            id,
73            state: TransactionState::Active,
74            isolation_level,
75            start_timestamp,
76            commit_timestamp: None,
77            snapshot: Snapshot {
78                timestamp: start_timestamp,
79                active_transactions,
80            },
81            write_set: HashSet::new(),
82            read_set: HashSet::new(),
83            locks_held: Vec::new(),
84            started_at: Instant::now(),
85        }
86    }
87
88    pub fn is_active(&self) -> bool {
89        self.state == TransactionState::Active
90    }
91
92    pub fn duration(&self) -> Duration {
93        self.started_at.elapsed()
94    }
95
96    pub fn add_to_write_set(&mut self, key: VersionKey) {
97        self.write_set.insert(key);
98    }
99
100    pub fn add_to_read_set(&mut self, key: VersionKey) {
101        self.read_set.insert(key);
102    }
103}
104
105// =============================================================================
106// Snapshot
107// =============================================================================
108
109/// A consistent snapshot for transaction reads.
110#[derive(Debug, Clone)]
111pub struct Snapshot {
112    pub timestamp: u64,
113    pub active_transactions: HashSet<TransactionId>,
114}
115
116impl Snapshot {
117    /// Check if a version is visible to this snapshot.
118    pub fn is_visible(&self, version: &Version) -> bool {
119        match version.state {
120            VersionState::Committed(commit_ts) => {
121                commit_ts <= self.timestamp
122                    && !self.active_transactions.contains(&version.created_by)
123            }
124            VersionState::Active => false,
125            VersionState::Aborted => false,
126        }
127    }
128}
129
130// =============================================================================
131// Version Management
132// =============================================================================
133
/// Identifies a row: the table it lives in plus its row id.
///
/// Used as the map key for version chains and lock entries.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct VersionKey {
    /// Table the row belongs to.
    pub table_id: u32,
    /// Row identifier within that table.
    pub row_id: u64,
}
140
/// Commit status of a single row version.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionState {
    /// Written by a transaction that has not finished yet.
    Active,
    /// Committed; the payload is the commit timestamp.
    Committed(u64),
    /// Written by a transaction that was rolled back.
    Aborted,
}
148
149/// A single version in the version chain.
150#[derive(Debug, Clone)]
151pub struct Version {
152    pub key: VersionKey,
153    pub created_by: TransactionId,
154    pub state: VersionState,
155    pub data: Vec<u8>,
156    pub prev_version: Option<Box<Version>>,
157}
158
159impl Version {
160    pub fn new(key: VersionKey, created_by: TransactionId, data: Vec<u8>) -> Self {
161        Self {
162            key,
163            created_by,
164            state: VersionState::Active,
165            data,
166            prev_version: None,
167        }
168    }
169
170    pub fn commit(&mut self, commit_timestamp: u64) {
171        self.state = VersionState::Committed(commit_timestamp);
172    }
173
174    pub fn abort(&mut self) {
175        self.state = VersionState::Aborted;
176    }
177}
178
179// =============================================================================
180// Lock Management
181// =============================================================================
182
/// Mode in which a lock is requested or held.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockMode {
    Shared,
    Exclusive,
    IntentShared,
    IntentExclusive,
    Update,
}

impl LockMode {
    /// Returns `true` when a lock held in mode `self` can coexist with one in
    /// mode `other`.
    ///
    /// `Exclusive` and `Update` are compatible with nothing (including
    /// themselves). `Shared` coexists with `Shared` and `IntentShared`, and
    /// the two intent modes coexist with each other and themselves.
    pub fn is_compatible(&self, other: &LockMode) -> bool {
        use LockMode::*;
        match (self, other) {
            (Shared, Shared | IntentShared) => true,
            (IntentShared, Shared | IntentShared | IntentExclusive) => true,
            (IntentExclusive, IntentShared | IntentExclusive) => true,
            (Shared | IntentShared | IntentExclusive, _) => false,
            (Exclusive | Update, _) => false,
        }
    }
}
209
210/// A lock request.
211#[derive(Debug, Clone)]
212pub struct LockRequest {
213    pub tx_id: TransactionId,
214    pub key: VersionKey,
215    pub mode: LockMode,
216    pub granted: bool,
217}
218
219/// Entry in the lock table.
220#[derive(Debug, Default)]
221struct LockEntry {
222    holders: Vec<LockRequest>,
223    waiters: Vec<LockRequest>,
224}
225
226/// Lock manager for concurrency control.
227pub struct LockManager {
228    locks: RwLock<HashMap<VersionKey, LockEntry>>,
229    timeout: Duration,
230}
231
232impl LockManager {
233    pub fn new(timeout: Duration) -> Self {
234        Self {
235            locks: RwLock::new(HashMap::new()),
236            timeout,
237        }
238    }
239
240    /// Acquire a lock, blocking if necessary.
241    pub fn acquire(&self, request: LockRequest) -> Result<()> {
242        let start = Instant::now();
243
244        loop {
245            {
246                let mut locks = self.locks.write();
247                let entry = locks.entry(request.key.clone()).or_default();
248
249                let can_grant = entry
250                    .holders
251                    .iter()
252                    .all(|h| h.tx_id == request.tx_id || h.mode.is_compatible(&request.mode));
253
254                if can_grant {
255                    entry.holders.push(LockRequest {
256                        granted: true,
257                        ..request.clone()
258                    });
259                    return Ok(());
260                }
261
262                if !entry.waiters.iter().any(|w| w.tx_id == request.tx_id) {
263                    entry.waiters.push(request.clone());
264                }
265            }
266
267            if start.elapsed() > self.timeout {
268                self.release_waiter(&request);
269                return Err(AegisError::LockTimeout);
270            }
271
272            std::thread::sleep(Duration::from_millis(1));
273        }
274    }
275
276    /// Try to acquire a lock without blocking.
277    pub fn try_acquire(&self, request: LockRequest) -> Result<bool> {
278        let mut locks = self.locks.write();
279        let entry = locks.entry(request.key.clone()).or_default();
280
281        let can_grant = entry
282            .holders
283            .iter()
284            .all(|h| h.tx_id == request.tx_id || h.mode.is_compatible(&request.mode));
285
286        if can_grant {
287            entry.holders.push(LockRequest {
288                granted: true,
289                ..request
290            });
291            Ok(true)
292        } else {
293            Ok(false)
294        }
295    }
296
297    /// Release a lock.
298    pub fn release(&self, tx_id: TransactionId, key: &VersionKey) {
299        let mut locks = self.locks.write();
300
301        if let Some(entry) = locks.get_mut(key) {
302            entry.holders.retain(|h| h.tx_id != tx_id);
303
304            while !entry.waiters.is_empty() {
305                let waiter = entry.waiters.remove(0);
306                let can_grant = entry
307                    .holders
308                    .iter()
309                    .all(|h| h.mode.is_compatible(&waiter.mode));
310
311                if can_grant {
312                    entry.holders.push(LockRequest {
313                        granted: true,
314                        ..waiter
315                    });
316                } else {
317                    entry.waiters.insert(0, waiter);
318                    break;
319                }
320            }
321
322            if entry.holders.is_empty() && entry.waiters.is_empty() {
323                locks.remove(key);
324            }
325        }
326    }
327
328    /// Release all locks held by a transaction.
329    pub fn release_all(&self, tx_id: TransactionId) {
330        let mut locks = self.locks.write();
331        let keys: Vec<_> = locks.keys().cloned().collect();
332
333        for key in keys {
334            if let Some(entry) = locks.get_mut(&key) {
335                entry.holders.retain(|h| h.tx_id != tx_id);
336                entry.waiters.retain(|w| w.tx_id != tx_id);
337
338                if entry.holders.is_empty() && entry.waiters.is_empty() {
339                    locks.remove(&key);
340                }
341            }
342        }
343    }
344
345    fn release_waiter(&self, request: &LockRequest) {
346        let mut locks = self.locks.write();
347        if let Some(entry) = locks.get_mut(&request.key) {
348            entry.waiters.retain(|w| w.tx_id != request.tx_id);
349        }
350    }
351}
352
353// =============================================================================
354// Transaction Manager
355// =============================================================================
356
357/// Manages all active transactions.
358pub struct TransactionManager {
359    transactions: RwLock<HashMap<TransactionId, Transaction>>,
360    next_tx_id: AtomicU64,
361    next_timestamp: AtomicU64,
362    lock_manager: LockManager,
363    versions: RwLock<HashMap<VersionKey, Version>>,
364}
365
366impl TransactionManager {
367    pub fn new() -> Self {
368        Self {
369            transactions: RwLock::new(HashMap::new()),
370            next_tx_id: AtomicU64::new(1),
371            next_timestamp: AtomicU64::new(1),
372            lock_manager: LockManager::new(Duration::from_secs(30)),
373            versions: RwLock::new(HashMap::new()),
374        }
375    }
376
377    /// Begin a new transaction.
378    pub fn begin(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
379        let tx_id = TransactionId(self.next_tx_id.fetch_add(1, Ordering::SeqCst));
380        let start_ts = self.next_timestamp.fetch_add(1, Ordering::SeqCst);
381
382        let active_txs: HashSet<_> = self
383            .transactions
384            .read()
385            .iter()
386            .filter(|(_, tx)| tx.is_active())
387            .map(|(id, _)| *id)
388            .collect();
389
390        let transaction = Transaction::new(tx_id, isolation_level, start_ts, active_txs);
391
392        self.transactions.write().insert(tx_id, transaction);
393
394        Ok(tx_id)
395    }
396
397    /// Commit a transaction.
398    pub fn commit(&self, tx_id: TransactionId) -> Result<()> {
399        let commit_ts = self.next_timestamp.fetch_add(1, Ordering::SeqCst);
400
401        {
402            let mut txs = self.transactions.write();
403            let tx = txs
404                .get_mut(&tx_id)
405                .ok_or_else(|| AegisError::Transaction("Transaction not found".to_string()))?;
406
407            if tx.state != TransactionState::Active {
408                return Err(AegisError::Transaction(
409                    "Transaction not active".to_string(),
410                ));
411            }
412
413            if tx.isolation_level == IsolationLevel::Serializable {
414                self.validate_serializable(tx)?;
415            }
416
417            tx.state = TransactionState::Preparing;
418            tx.commit_timestamp = Some(commit_ts);
419        }
420
421        {
422            let mut versions = self.versions.write();
423            let txs = self.transactions.read();
424            let tx = txs.get(&tx_id).ok_or_else(|| {
425                AegisError::Transaction("Transaction disappeared during commit".to_string())
426            })?;
427
428            for key in &tx.write_set {
429                if let Some(version) = versions.get_mut(key) {
430                    if version.created_by == tx_id {
431                        version.commit(commit_ts);
432                    }
433                }
434            }
435        }
436
437        {
438            let mut txs = self.transactions.write();
439            if let Some(tx) = txs.get_mut(&tx_id) {
440                tx.state = TransactionState::Committed;
441            }
442        }
443
444        self.lock_manager.release_all(tx_id);
445
446        Ok(())
447    }
448
449    /// Abort a transaction.
450    pub fn abort(&self, tx_id: TransactionId) -> Result<()> {
451        {
452            let mut txs = self.transactions.write();
453            let tx = txs
454                .get_mut(&tx_id)
455                .ok_or_else(|| AegisError::Transaction("Transaction not found".to_string()))?;
456
457            tx.state = TransactionState::Aborted;
458        }
459
460        {
461            let mut versions = self.versions.write();
462            let txs = self.transactions.read();
463            let tx = txs.get(&tx_id).ok_or_else(|| {
464                AegisError::Transaction("Transaction disappeared during abort".to_string())
465            })?;
466
467            for key in &tx.write_set {
468                if let Some(version) = versions.get_mut(key) {
469                    if version.created_by == tx_id {
470                        version.abort();
471                    }
472                }
473            }
474        }
475
476        self.lock_manager.release_all(tx_id);
477
478        Ok(())
479    }
480
481    /// Read a version visible to the transaction.
482    pub fn read(&self, tx_id: TransactionId, key: &VersionKey) -> Result<Option<Vec<u8>>> {
483        let txs = self.transactions.read();
484        let tx = txs
485            .get(&tx_id)
486            .ok_or_else(|| AegisError::Transaction("Transaction not found".to_string()))?;
487
488        if !tx.is_active() {
489            return Err(AegisError::Transaction(
490                "Transaction not active".to_string(),
491            ));
492        }
493
494        let versions = self.versions.read();
495        if let Some(version) = versions.get(key) {
496            if tx.snapshot.is_visible(version) {
497                return Ok(Some(version.data.clone()));
498            }
499
500            let mut current = version.prev_version.as_ref();
501            while let Some(v) = current {
502                if tx.snapshot.is_visible(v) {
503                    return Ok(Some(v.data.clone()));
504                }
505                current = v.prev_version.as_ref();
506            }
507        }
508
509        Ok(None)
510    }
511
512    /// Write a new version.
513    pub fn write(&self, tx_id: TransactionId, key: VersionKey, data: Vec<u8>) -> Result<()> {
514        {
515            let mut txs = self.transactions.write();
516            let tx = txs
517                .get_mut(&tx_id)
518                .ok_or_else(|| AegisError::Transaction("Transaction not found".to_string()))?;
519
520            if !tx.is_active() {
521                return Err(AegisError::Transaction(
522                    "Transaction not active".to_string(),
523                ));
524            }
525
526            tx.add_to_write_set(key.clone());
527        }
528
529        let lock_request = LockRequest {
530            tx_id,
531            key: key.clone(),
532            mode: LockMode::Exclusive,
533            granted: false,
534        };
535        self.lock_manager.acquire(lock_request)?;
536
537        {
538            let txs = self.transactions.read();
539            let tx = txs.get(&tx_id).ok_or_else(|| {
540                AegisError::Transaction("Transaction disappeared during write".to_string())
541            })?;
542            tx.locks_held.len(); // Just to use tx
543        }
544
545        let mut versions = self.versions.write();
546        let new_version = Version::new(key.clone(), tx_id, data);
547
548        if let Some(existing) = versions.remove(&key) {
549            let mut new_v = new_version;
550            new_v.prev_version = Some(Box::new(existing));
551            versions.insert(key, new_v);
552        } else {
553            versions.insert(key, new_version);
554        }
555
556        Ok(())
557    }
558
559    /// Delete a version.
560    pub fn delete(&self, tx_id: TransactionId, key: &VersionKey) -> Result<()> {
561        self.write(tx_id, key.clone(), Vec::new())
562    }
563
564    /// Get transaction statistics.
565    pub fn stats(&self) -> TransactionStats {
566        let txs = self.transactions.read();
567        let versions = self.versions.read();
568        let mut active = 0;
569        let mut committed = 0;
570        let mut aborted = 0;
571
572        for tx in txs.values() {
573            match tx.state {
574                TransactionState::Active | TransactionState::Preparing => active += 1,
575                TransactionState::Committed => committed += 1,
576                TransactionState::Aborted => aborted += 1,
577            }
578        }
579
580        // Count total versions (including history)
581        let mut version_count = 0;
582        for version in versions.values() {
583            version_count += 1;
584            let mut prev = version.prev_version.as_ref();
585            while let Some(v) = prev {
586                version_count += 1;
587                prev = v.prev_version.as_ref();
588            }
589        }
590
591        TransactionStats {
592            active,
593            committed,
594            aborted,
595            total: txs.len(),
596            version_count,
597        }
598    }
599
600    // ==========================================================================
601    // Garbage Collection
602    // ==========================================================================
603
604    /// Get the minimum active timestamp (low watermark).
605    /// Versions older than this are potentially garbage.
606    fn get_min_active_timestamp(&self) -> u64 {
607        let txs = self.transactions.read();
608        txs.values()
609            .filter(|tx| tx.is_active())
610            .map(|tx| tx.start_timestamp)
611            .min()
612            .unwrap_or(u64::MAX)
613    }
614
615    /// Run garbage collection to remove old versions that are no longer visible.
616    /// Returns the number of versions collected.
617    pub fn run_gc(&self) -> GcStats {
618        let min_ts = self.get_min_active_timestamp();
619        let mut versions_collected = 0;
620
621        // Clean up old version chains
622        {
623            let mut versions = self.versions.write();
624            for version in versions.values_mut() {
625                versions_collected += Self::gc_version_chain(version, min_ts);
626            }
627        }
628
629        // Clean up old completed transactions
630        let transactions_cleaned = {
631            let mut txs = self.transactions.write();
632            let to_remove: Vec<TransactionId> = txs
633                .iter()
634                .filter(|(_, tx)| {
635                    match tx.state {
636                        TransactionState::Committed | TransactionState::Aborted => {
637                            // Remove if older than min active timestamp
638                            tx.commit_timestamp.unwrap_or(tx.start_timestamp) < min_ts
639                        }
640                        _ => false,
641                    }
642                })
643                .map(|(id, _)| *id)
644                .collect();
645
646            let count = to_remove.len();
647            for id in to_remove {
648                txs.remove(&id);
649            }
650            count
651        };
652
653        GcStats {
654            versions_collected,
655            transactions_cleaned,
656            min_active_timestamp: min_ts,
657        }
658    }
659
660    /// Recursively garbage collect a version chain.
661    /// Returns the number of versions removed.
662    ///
663    /// We can only remove a version if:
664    /// 1. The current version is committed (has a valid successor)
665    /// 2. The previous version's commit_ts is less than min_ts
666    /// 3. The current version's commit_ts is also less than min_ts
667    ///    (meaning no active transaction could need to see the previous version)
668    fn gc_version_chain(version: &mut Version, min_ts: u64) -> usize {
669        let mut collected = 0;
670
671        // Check if we can truncate the version chain
672        if let Some(ref mut prev) = version.prev_version {
673            // First, recursively GC the older versions
674            collected += Self::gc_version_chain(prev, min_ts);
675
676            // Handle aborted versions - they can always be removed
677            if prev.state == VersionState::Aborted {
678                version.prev_version = prev.prev_version.take();
679                collected += 1;
680                return collected;
681            }
682
683            // For committed versions, we can only remove the previous version if:
684            // - The current version is committed
685            // - Both versions have commit timestamps less than min_ts
686            // This ensures no active transaction needs to see the older version
687            if let VersionState::Committed(curr_commit_ts) = version.state {
688                if let VersionState::Committed(prev_commit_ts) = prev.state {
689                    // Only remove if the current version is also old enough
690                    // that any transaction needing to read would see the current version
691                    if prev_commit_ts < min_ts && curr_commit_ts < min_ts {
692                        version.prev_version = None;
693                        collected += 1;
694                    }
695                }
696            }
697        }
698
699        collected
700    }
701
702    /// Run garbage collection with a threshold.
703    /// Only runs if there are more than `threshold` versions.
704    pub fn run_gc_if_needed(&self, threshold: usize) -> Option<GcStats> {
705        let stats = self.stats();
706        if stats.version_count > threshold {
707            Some(self.run_gc())
708        } else {
709            None
710        }
711    }
712
713    fn validate_serializable(&self, tx: &Transaction) -> Result<()> {
714        let txs = self.transactions.read();
715
716        for other_tx in txs.values() {
717            if other_tx.id == tx.id {
718                continue;
719            }
720
721            if other_tx.state != TransactionState::Committed {
722                continue;
723            }
724
725            if let Some(commit_ts) = other_tx.commit_timestamp {
726                if commit_ts > tx.start_timestamp {
727                    for read_key in &tx.read_set {
728                        if other_tx.write_set.contains(read_key) {
729                            return Err(AegisError::SerializationFailure);
730                        }
731                    }
732                }
733            }
734        }
735
736        Ok(())
737    }
738}
739
740impl Default for TransactionManager {
741    fn default() -> Self {
742        Self::new()
743    }
744}
745
746// =============================================================================
747// Statistics
748// =============================================================================
749
/// Point-in-time counters reported by the transaction manager.
#[derive(Debug, Clone)]
pub struct TransactionStats {
    /// Transactions still in flight.
    pub active: usize,
    /// Transactions that committed and have not been garbage collected.
    pub committed: usize,
    /// Transactions that aborted and have not been garbage collected.
    pub aborted: usize,
    /// Number of transaction records currently tracked.
    pub total: usize,
    /// Total number of versions stored (including historical versions)
    pub version_count: usize,
}
760
/// Outcome of one garbage-collection pass.
#[derive(Debug, Clone)]
pub struct GcStats {
    /// How many old versions were removed from version chains.
    pub versions_collected: usize,
    /// How many finished transaction records were dropped.
    pub transactions_cleaned: usize,
    /// Low watermark (minimum active start timestamp) the pass used.
    pub min_active_timestamp: u64,
}
771
772// =============================================================================
773// Tests
774// =============================================================================
775
#[cfg(test)]
mod tests {
    use super::*;

    /// The single row (table 1, row 1) every test reads and writes.
    fn row() -> VersionKey {
        VersionKey {
            table_id: 1,
            row_id: 1,
        }
    }

    /// Begins a transaction at `RepeatableRead`.
    fn start(tm: &TransactionManager) -> TransactionId {
        tm.begin(IsolationLevel::RepeatableRead).unwrap()
    }

    /// Begins a transaction, writes `bytes` to `key`, and commits it.
    fn commit_write(tm: &TransactionManager, key: &VersionKey, bytes: &[u8]) -> TransactionId {
        let tx = start(tm);
        tm.write(tx, key.clone(), bytes.to_vec()).unwrap();
        tm.commit(tx).unwrap();
        tx
    }

    /// Current state of a tracked transaction.
    fn state_of(tm: &TransactionManager, tx: TransactionId) -> TransactionState {
        tm.transactions.read().get(&tx).unwrap().state
    }

    /// An ungranted lock request by transaction `tx` on `key`.
    fn lock_request(tx: u64, key: &VersionKey, mode: LockMode) -> LockRequest {
        LockRequest {
            tx_id: TransactionId(tx),
            key: key.clone(),
            mode,
            granted: false,
        }
    }

    #[test]
    fn test_transaction_lifecycle() {
        let tm = TransactionManager::new();

        let tx_id = start(&tm);
        assert_eq!(state_of(&tm, tx_id), TransactionState::Active);

        tm.commit(tx_id).unwrap();
        assert_eq!(state_of(&tm, tx_id), TransactionState::Committed);
    }

    #[test]
    fn test_transaction_abort() {
        let tm = TransactionManager::new();

        let tx_id = start(&tm);
        tm.abort(tx_id).unwrap();

        assert_eq!(state_of(&tm, tx_id), TransactionState::Aborted);
    }

    #[test]
    fn test_mvcc_read_write() {
        let tm = TransactionManager::new();
        let key = row();

        commit_write(&tm, &key, b"hello");

        let reader = start(&tm);
        assert_eq!(tm.read(reader, &key).unwrap(), Some(b"hello".to_vec()));
        tm.commit(reader).unwrap();
    }

    #[test]
    fn test_snapshot_isolation() {
        let tm = TransactionManager::new();
        let key = row();

        commit_write(&tm, &key, b"v1");

        // The reader's snapshot is taken before v2 is written.
        let reader = start(&tm);
        commit_write(&tm, &key, b"v2");

        assert_eq!(tm.read(reader, &key).unwrap(), Some(b"v1".to_vec()));

        tm.commit(reader).unwrap();
    }

    #[test]
    fn test_lock_compatibility() {
        let shared = LockMode::Shared;
        let exclusive = LockMode::Exclusive;

        assert!(shared.is_compatible(&shared));
        assert!(!shared.is_compatible(&exclusive));
        assert!(!exclusive.is_compatible(&exclusive));
        assert!(!exclusive.is_compatible(&shared));
    }

    #[test]
    fn test_lock_manager() {
        let lm = LockManager::new(Duration::from_secs(1));
        let key = row();

        // Two shared holders coexist.
        assert!(lm
            .try_acquire(lock_request(1, &key, LockMode::Shared))
            .unwrap());
        assert!(lm
            .try_acquire(lock_request(2, &key, LockMode::Shared))
            .unwrap());

        // An exclusive request is refused while they hold the row.
        assert!(!lm
            .try_acquire(lock_request(3, &key, LockMode::Exclusive))
            .unwrap());

        lm.release(TransactionId(1), &key);
        lm.release(TransactionId(2), &key);

        // Once both are gone the exclusive request succeeds.
        assert!(lm
            .try_acquire(lock_request(3, &key, LockMode::Exclusive))
            .unwrap());
    }

    #[test]
    fn test_gc_cleans_old_transactions() {
        let tm = TransactionManager::new();

        // Two commits and one abort, none of them touching data.
        for _ in 0..2 {
            let tx = start(&tm);
            tm.commit(tx).unwrap();
        }
        let doomed = start(&tm);
        tm.abort(doomed).unwrap();

        // All completed transactions should be present
        assert_eq!(tm.stats().total, 3);
        assert_eq!(tm.stats().committed, 2);
        assert_eq!(tm.stats().aborted, 1);

        // No active transactions, so GC should clean all completed ones
        let gc_stats = tm.run_gc();
        assert_eq!(gc_stats.transactions_cleaned, 3);

        // Transactions should be cleaned up
        assert_eq!(tm.stats().total, 0);
    }

    #[test]
    fn test_gc_preserves_active_transaction_visible_versions() {
        let tm = TransactionManager::new();
        let key = row();

        // First version, then a long-running reader, then a second version.
        commit_write(&tm, &key, b"v1");
        let tx_long = start(&tm);
        commit_write(&tm, &key, b"v2");

        // Long transaction should still see v1
        assert_eq!(tm.read(tx_long, &key).unwrap(), Some(b"v1".to_vec()));

        // GC may drop finished transaction records, but must keep the chain
        // entry tx_long still depends on.
        let _gc_stats = tm.run_gc();

        // Long transaction should still be able to read v1
        assert_eq!(tm.read(tx_long, &key).unwrap(), Some(b"v1".to_vec()));

        tm.commit(tx_long).unwrap();
    }

    #[test]
    fn test_gc_removes_aborted_versions() {
        let tm = TransactionManager::new();
        let key = row();

        // Committed v1, an aborted write on top, then committed v3.
        commit_write(&tm, &key, b"v1");

        let aborted = start(&tm);
        tm.write(aborted, key.clone(), b"v2_aborted".to_vec())
            .unwrap();
        tm.abort(aborted).unwrap();

        commit_write(&tm, &key, b"v3");

        // GC should clean up aborted versions
        let gc_stats = tm.run_gc();
        assert!(gc_stats.versions_collected > 0 || gc_stats.transactions_cleaned > 0);

        // New transaction should see v3
        let reader = start(&tm);
        assert_eq!(tm.read(reader, &key).unwrap(), Some(b"v3".to_vec()));
        tm.commit(reader).unwrap();
    }

    #[test]
    fn test_stats_includes_version_count() {
        let tm = TransactionManager::new();
        let key = row();

        assert_eq!(tm.stats().version_count, 0);

        commit_write(&tm, &key, b"v1");
        assert_eq!(tm.stats().version_count, 1);

        commit_write(&tm, &key, b"v2");
        // Now we have 2 versions in the chain
        assert_eq!(tm.stats().version_count, 2);
    }

    #[test]
    fn test_run_gc_if_needed() {
        let tm = TransactionManager::new();
        let key = row();

        // Should not run GC with high threshold
        assert!(tm.run_gc_if_needed(100).is_none());

        // Create some versions
        for i in 0..5 {
            commit_write(&tm, &key, format!("v{}", i).as_bytes());
        }

        // Should now have 5 versions in the chain
        assert_eq!(tm.stats().version_count, 5);

        // Should run GC with low threshold
        assert!(tm.run_gc_if_needed(3).is_some());
    }
}