rustlite_core/
transaction.rs

1//! Transaction management module with MVCC.
2//!
3//! Provides ACID transaction support using Multi-Version Concurrency Control (MVCC).
4//! Implements snapshot isolation with timestamp-based versioning.
5
6use crate::{Error, Result};
7use std::collections::{BTreeMap, HashMap};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10use std::time::{SystemTime, UNIX_EPOCH};
11
/// Isolation guarantees a transaction can request.
///
/// Variants are declared from the weakest level to the strongest.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IsolationLevel {
    /// Weakest level: read uncommitted.
    ReadUncommitted,
    /// Read committed.
    ReadCommitted,
    /// Repeatable read (snapshot isolation). This is the default level.
    #[default]
    RepeatableRead,
    /// Strongest level: serializable.
    Serializable,
}
25
/// Identifier assigned to a transaction; values increase monotonically.
pub type TransactionId = u64;

/// Logical timestamp used to order MVCC versions.
pub type Timestamp = u64;
31
32/// A versioned value in MVCC
33#[derive(Debug, Clone)]
34pub struct VersionedValue {
35    /// The actual value
36    pub value: Option<Vec<u8>>,
37    /// Transaction ID that created this version
38    pub txn_id: TransactionId,
39    /// Timestamp when this version was created
40    pub created_at: Timestamp,
41    /// Timestamp when this version was deleted (None if still valid)
42    pub deleted_at: Option<Timestamp>,
43    /// Whether this version is committed
44    pub committed: bool,
45}
46
47impl VersionedValue {
48    /// Check if this version is visible to a transaction with given snapshot timestamp
49    pub fn is_visible(&self, snapshot_ts: Timestamp, current_txn_id: TransactionId) -> bool {
50        // Version must be committed (or created by current transaction)
51        if !self.committed && self.txn_id != current_txn_id {
52            return false;
53        }
54
55        // Version must be created before snapshot
56        if self.created_at > snapshot_ts && self.txn_id != current_txn_id {
57            return false;
58        }
59
60        // Version must not be deleted before snapshot
61        if let Some(deleted_ts) = self.deleted_at {
62            if deleted_ts <= snapshot_ts {
63                return false;
64            }
65        }
66
67        // Value must exist (not a delete marker for uncommitted txn)
68        self.value.is_some()
69    }
70}
71
72/// MVCC version chain for a key
73#[derive(Debug, Clone)]
74pub struct VersionChain {
75    /// All versions of this key, sorted by timestamp (newest first)
76    versions: Vec<VersionedValue>,
77}
78
79impl VersionChain {
80    /// Create a new version chain
81    pub fn new() -> Self {
82        Self {
83            versions: Vec::new(),
84        }
85    }
86
87    /// Add a new version (inserts at front)
88    pub fn add_version(&mut self, version: VersionedValue) {
89        self.versions.insert(0, version);
90    }
91
92    /// Get the visible version for a transaction
93    pub fn get_visible(
94        &self,
95        snapshot_ts: Timestamp,
96        current_txn_id: TransactionId,
97    ) -> Option<Vec<u8>> {
98        for version in &self.versions {
99            if version.is_visible(snapshot_ts, current_txn_id) {
100                return version.value.clone();
101            }
102        }
103        None
104    }
105
106    /// Mark all versions created by a transaction as committed
107    pub fn commit_transaction(&mut self, txn_id: TransactionId) {
108        for version in &mut self.versions {
109            if version.txn_id == txn_id {
110                version.committed = true;
111            }
112        }
113    }
114
115    /// Remove all versions created by a transaction (for rollback)
116    pub fn rollback_transaction(&mut self, txn_id: TransactionId) {
117        self.versions.retain(|v| v.txn_id != txn_id);
118    }
119
120    /// Garbage collect versions older than the oldest active snapshot
121    pub fn gc(&mut self, min_active_ts: Timestamp) {
122        // Keep only the first committed version visible to oldest snapshot
123        let mut found_visible = false;
124        self.versions.retain(|v| {
125            if found_visible && v.committed && v.created_at < min_active_ts {
126                false
127            } else {
128                if v.committed && v.created_at <= min_active_ts {
129                    found_visible = true;
130                }
131                true
132            }
133        });
134    }
135}
136
137impl Default for VersionChain {
138    fn default() -> Self {
139        Self::new()
140    }
141}
142
143/// MVCC storage for versioned data
144pub struct MVCCStorage {
145    /// Version chains for each key
146    data: RwLock<HashMap<Vec<u8>, VersionChain>>,
147}
148
149impl MVCCStorage {
150    /// Create new MVCC storage
151    pub fn new() -> Self {
152        Self {
153            data: RwLock::new(HashMap::new()),
154        }
155    }
156
157    /// Read a key with MVCC snapshot isolation
158    pub fn read(
159        &self,
160        key: &[u8],
161        snapshot_ts: Timestamp,
162        txn_id: TransactionId,
163    ) -> Result<Option<Vec<u8>>> {
164        let data = self.data.read().map_err(|_| Error::LockPoisoned)?;
165
166        if let Some(chain) = data.get(key) {
167            Ok(chain.get_visible(snapshot_ts, txn_id))
168        } else {
169            Ok(None)
170        }
171    }
172
173    /// Write a key (creates a new version)
174    pub fn write(
175        &self,
176        key: Vec<u8>,
177        value: Vec<u8>,
178        txn_id: TransactionId,
179        timestamp: Timestamp,
180    ) -> Result<()> {
181        let mut data = self.data.write().map_err(|_| Error::LockPoisoned)?;
182
183        let chain = data.entry(key).or_insert_with(VersionChain::new);
184
185        chain.add_version(VersionedValue {
186            value: Some(value),
187            txn_id,
188            created_at: timestamp,
189            deleted_at: None,
190            committed: false,
191        });
192
193        Ok(())
194    }
195
196    /// Delete a key (creates a delete marker)
197    pub fn delete(&self, key: &[u8], txn_id: TransactionId, timestamp: Timestamp) -> Result<()> {
198        let mut data = self.data.write().map_err(|_| Error::LockPoisoned)?;
199
200        let chain = data.entry(key.to_vec()).or_insert_with(VersionChain::new);
201
202        // Mark previous version as deleted
203        if let Some(prev) = chain.versions.first_mut() {
204            if prev.txn_id != txn_id {
205                prev.deleted_at = Some(timestamp);
206            }
207        }
208
209        // Add delete marker for this transaction
210        chain.add_version(VersionedValue {
211            value: None,
212            txn_id,
213            created_at: timestamp,
214            deleted_at: None,
215            committed: false,
216        });
217
218        Ok(())
219    }
220
221    /// Commit all versions for a transaction
222    pub fn commit(&self, txn_id: TransactionId) -> Result<()> {
223        let mut data = self.data.write().map_err(|_| Error::LockPoisoned)?;
224
225        for chain in data.values_mut() {
226            chain.commit_transaction(txn_id);
227        }
228
229        Ok(())
230    }
231
232    /// Rollback all versions for a transaction
233    pub fn rollback(&self, txn_id: TransactionId) -> Result<()> {
234        let mut data = self.data.write().map_err(|_| Error::LockPoisoned)?;
235
236        for chain in data.values_mut() {
237            chain.rollback_transaction(txn_id);
238        }
239
240        // Remove empty chains
241        data.retain(|_, chain| !chain.versions.is_empty());
242
243        Ok(())
244    }
245
246    /// Garbage collect old versions
247    pub fn gc(&self, min_active_ts: Timestamp) -> Result<()> {
248        let mut data = self.data.write().map_err(|_| Error::LockPoisoned)?;
249
250        for chain in data.values_mut() {
251            chain.gc(min_active_ts);
252        }
253
254        Ok(())
255    }
256
257    /// Scan keys with prefix (for range queries)
258    pub fn scan_prefix(
259        &self,
260        prefix: &[u8],
261        snapshot_ts: Timestamp,
262        txn_id: TransactionId,
263    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
264        let data = self.data.read().map_err(|_| Error::LockPoisoned)?;
265
266        let mut results = Vec::new();
267        for (key, chain) in data.iter() {
268            if key.starts_with(prefix) {
269                if let Some(value) = chain.get_visible(snapshot_ts, txn_id) {
270                    results.push((key.clone(), value));
271                }
272            }
273        }
274
275        results.sort_by(|a, b| a.0.cmp(&b.0));
276        Ok(results)
277    }
278}
279
280impl Default for MVCCStorage {
281    fn default() -> Self {
282        Self::new()
283    }
284}
285
286/// Active transaction information
287#[derive(Debug, Clone)]
288#[allow(dead_code)]
289struct ActiveTransaction {
290    txn_id: TransactionId,
291    snapshot_ts: Timestamp,
292    isolation: IsolationLevel,
293}
294
295/// Transaction Manager for MVCC
296pub struct TransactionManager {
297    /// Next transaction ID
298    next_txn_id: AtomicU64,
299    /// Next timestamp
300    next_timestamp: AtomicU64,
301    /// Active transactions
302    active_txns: RwLock<BTreeMap<TransactionId, ActiveTransaction>>,
303    /// MVCC storage
304    storage: Arc<MVCCStorage>,
305    /// Self reference for creating transactions
306    self_ref: RwLock<Option<std::sync::Weak<TransactionManager>>>,
307}
308
309impl TransactionManager {
310    /// Create a new transaction manager
311    pub fn new(storage: Arc<MVCCStorage>) -> Arc<Self> {
312        let manager = Arc::new(Self {
313            next_txn_id: AtomicU64::new(1),
314            next_timestamp: AtomicU64::new(Self::current_timestamp()),
315            active_txns: RwLock::new(BTreeMap::new()),
316            storage,
317            self_ref: RwLock::new(None),
318        });
319
320        // Store weak self-reference
321        *manager.self_ref.write().unwrap() = Some(Arc::downgrade(&manager));
322        manager
323    }
324
325    /// Get current timestamp (milliseconds since UNIX epoch)
326    fn current_timestamp() -> u64 {
327        SystemTime::now()
328            .duration_since(UNIX_EPOCH)
329            .unwrap()
330            .as_millis() as u64
331    }
332
333    /// Generate next transaction ID
334    fn next_txn_id(&self) -> TransactionId {
335        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
336    }
337
338    /// Generate next timestamp
339    fn next_timestamp(&self) -> Timestamp {
340        self.next_timestamp.fetch_add(1, Ordering::SeqCst)
341    }
342
343    /// Begin a new transaction
344    pub fn begin(self: &Arc<Self>, isolation: IsolationLevel) -> Result<Transaction> {
345        let txn_id = self.next_txn_id();
346        let snapshot_ts = self.next_timestamp();
347
348        let active_txn = ActiveTransaction {
349            txn_id,
350            snapshot_ts,
351            isolation,
352        };
353
354        {
355            let mut active = self.active_txns.write().map_err(|_| Error::LockPoisoned)?;
356            active.insert(txn_id, active_txn.clone());
357        }
358
359        Ok(Transaction {
360            txn_id,
361            snapshot_ts,
362            isolation,
363            storage: Arc::clone(&self.storage),
364            manager: Some(Arc::clone(self)),
365            write_set: RwLock::new(HashMap::new()),
366            committed: false,
367        })
368    }
369
370    /// Commit a transaction
371    pub fn commit(&self, txn_id: TransactionId) -> Result<()> {
372        // Validate no conflicts (simplified - just check write-write conflicts)
373        // In a full implementation, we'd do serializability validation here
374
375        // Commit in storage
376        self.storage.commit(txn_id)?;
377
378        // Remove from active transactions
379        {
380            let mut active = self.active_txns.write().map_err(|_| Error::LockPoisoned)?;
381            active.remove(&txn_id);
382        }
383
384        Ok(())
385    }
386
387    /// Rollback a transaction
388    pub fn rollback(&self, txn_id: TransactionId) -> Result<()> {
389        // Rollback in storage
390        self.storage.rollback(txn_id)?;
391
392        // Remove from active transactions
393        {
394            let mut active = self.active_txns.write().map_err(|_| Error::LockPoisoned)?;
395            active.remove(&txn_id);
396        }
397
398        Ok(())
399    }
400
401    /// Perform garbage collection
402    pub fn gc(&self) -> Result<()> {
403        // Find oldest active snapshot
404        let min_active_ts = {
405            let active = self.active_txns.read().map_err(|_| Error::LockPoisoned)?;
406            active
407                .values()
408                .map(|txn| txn.snapshot_ts)
409                .min()
410                .unwrap_or(self.next_timestamp())
411        };
412
413        self.storage.gc(min_active_ts)
414    }
415}
416
417/// A database transaction with MVCC support
418pub struct Transaction {
419    /// Transaction ID
420    pub txn_id: TransactionId,
421    /// Snapshot timestamp
422    snapshot_ts: Timestamp,
423    /// Isolation level
424    isolation: IsolationLevel,
425    /// Reference to MVCC storage
426    storage: Arc<MVCCStorage>,
427    /// Reference to transaction manager (for commit/rollback)
428    manager: Option<Arc<TransactionManager>>,
429    /// Write set for validation
430    write_set: RwLock<HashMap<Vec<u8>, Vec<u8>>>,
431    /// Whether transaction is committed
432    committed: bool,
433}
434
435impl Transaction {
436    /// Read a value with snapshot isolation
437    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
438        // Check write set first (read your own writes)
439        {
440            let write_set = self.write_set.read().map_err(|_| Error::LockPoisoned)?;
441            if let Some(value) = write_set.get(key) {
442                return Ok(Some(value.clone()));
443            }
444        }
445
446        // Read from MVCC storage with snapshot isolation
447        self.storage.read(key, self.snapshot_ts, self.txn_id)
448    }
449
450    /// Write a value (buffered until commit)
451    pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
452        // Add to write set
453        {
454            let mut write_set = self.write_set.write().map_err(|_| Error::LockPoisoned)?;
455            write_set.insert(key.clone(), value.clone());
456        }
457
458        // Write to MVCC storage (creates uncommitted version with snapshot timestamp)
459        self.storage
460            .write(key, value, self.txn_id, self.snapshot_ts)
461    }
462
463    /// Delete a key
464    pub fn delete(&mut self, key: &[u8]) -> Result<()> {
465        // Remove from write set if present
466        {
467            let mut write_set = self.write_set.write().map_err(|_| Error::LockPoisoned)?;
468            write_set.remove(key);
469        }
470
471        self.storage.delete(key, self.txn_id, self.snapshot_ts)
472    }
473
474    /// Scan keys with prefix
475    pub fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
476        self.storage
477            .scan_prefix(prefix, self.snapshot_ts, self.txn_id)
478    }
479
480    /// Commit the transaction
481    pub fn commit(mut self) -> Result<()> {
482        if self.committed {
483            return Err(Error::Transaction("Transaction already committed".into()));
484        }
485
486        if let Some(manager) = &self.manager {
487            manager.commit(self.txn_id)?;
488        } else {
489            self.storage.commit(self.txn_id)?;
490        }
491
492        self.committed = true;
493        Ok(())
494    }
495
496    /// Rollback the transaction
497    pub fn rollback(self) -> Result<()> {
498        if self.committed {
499            return Err(Error::Transaction("Transaction already committed".into()));
500        }
501
502        if let Some(manager) = &self.manager {
503            manager.rollback(self.txn_id)
504        } else {
505            self.storage.rollback(self.txn_id)
506        }
507    }
508
509    /// Get transaction ID
510    pub fn id(&self) -> TransactionId {
511        self.txn_id
512    }
513
514    /// Get isolation level
515    pub fn isolation_level(&self) -> IsolationLevel {
516        self.isolation
517    }
518}