Skip to main content

oxirs_core/storage/
mvcc.rs

1//! Multi-Version Concurrency Control (MVCC) for RDF storage
2//!
3//! This module implements MVCC to enable high-concurrency read/write operations
4//! by maintaining multiple timestamped versions of RDF triples.
5
6use crate::model::{Object, Predicate, Subject, Triple};
7use anyhow::{anyhow, Result};
8use dashmap::DashMap;
9use parking_lot::{Mutex, RwLock};
10use std::collections::{BTreeMap, HashMap, HashSet};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
/// Logical timestamp drawn from the store's counter, which only moves forward
/// (`fetch_add(1)`). Used for transaction start times, commit times and
/// version creation times.
pub type Timestamp = u64;

/// Identifier of a transaction, allocated from its own counter when the
/// transaction begins.
pub type TransactionId = u64;

/// Identifier of a single stored version. `commit_transaction` currently
/// draws these from the timestamp counter as well.
pub type VersionId = u64;
23
/// Tuning knobs for an `MvccStore`.
#[derive(Debug, Clone)]
pub struct MvccConfig {
    /// Upper bound on versions a triple keeps after a garbage-collection pass
    /// (the newest ones are retained).
    pub max_versions_per_triple: usize,

    /// Minimum time between automatic garbage-collection passes triggered on commit.
    pub gc_interval: Duration,

    /// Minimum age before a version may be cleaned up.
    /// NOTE(review): not read by any code in this module's visible portion.
    pub min_version_age: Duration,

    /// Enable snapshot isolation.
    /// NOTE(review): not read by any code in this module's visible portion.
    pub enable_snapshot_isolation: bool,

    /// When set, a transaction's queries also see its own uncommitted writes.
    pub enable_read_your_writes: bool,

    /// Strategy used to detect conflicts between concurrent transactions.
    pub conflict_detection: ConflictDetection,
}

impl Default for MvccConfig {
    /// Optimistic concurrency with snapshot isolation and read-your-writes on.
    /// GC is considered at most once a minute, and chains are capped at 100
    /// versions.
    fn default() -> Self {
        let gc_interval = Duration::from_secs(60);
        let min_version_age = Duration::from_secs(5 * 60);

        Self {
            conflict_detection: ConflictDetection::Optimistic,
            enable_read_your_writes: true,
            enable_snapshot_isolation: true,
            min_version_age,
            gc_interval,
            max_versions_per_triple: 100,
        }
    }
}

/// How conflicts between concurrent transactions are detected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictDetection {
    /// Validate the read and write sets against newer versions at commit time.
    Optimistic,
    /// Optimistic validation split into a read phase and a write phase.
    OptimisticTwoPhase,
    /// Pessimistic locking: conflicts are meant to be checked on each write.
    /// NOTE(review): the per-write check is currently a no-op stub.
    Pessimistic,
    /// Reject a commit if any written key gained a newer committed version
    /// from another transaction.
    TimestampOrdering,
}
71
/// Multi-version store for RDF triples.
///
/// Writes are staged in a per-transaction write set and turned into
/// timestamped versions on commit. Reads resolve each triple to the newest
/// version committed at or before the reader's start timestamp.
pub struct MvccStore {
    /// Configuration
    config: MvccConfig,

    /// Version chains, keyed by the string form of each triple.
    versions: Arc<DashMap<TripleKey, VersionChain>>,

    /// Every transaction known to the store, keyed by ID. Entries remain
    /// after commit/abort, carrying their final status.
    transactions: Arc<DashMap<TransactionId, TransactionState>>,

    /// Logical clock shared by transaction starts, commit timestamps and
    /// version IDs; starts at 1.
    timestamp_counter: Arc<AtomicU64>,

    /// Source of transaction IDs; starts at 1.
    transaction_counter: Arc<AtomicU64>,

    /// Registered snapshots keyed by the owning transaction's start
    /// timestamp. The oldest key bounds the garbage-collection horizon.
    snapshots: Arc<RwLock<BTreeMap<Timestamp, SnapshotInfo>>>,

    /// Bookkeeping for automatic garbage collection (last run, counters).
    gc_state: Arc<Mutex<GarbageCollectionState>>,

    /// Secondary indexes from term string to triple keys, used to narrow
    /// query candidates.
    indexes: Arc<MvccIndexes>,
}
98
99/// Key for identifying a triple
100#[derive(Debug, Clone, Hash, Eq, PartialEq)]
101pub struct TripleKey {
102    subject: String,
103    predicate: String,
104    object: String,
105}
106
107impl TripleKey {
108    fn from_triple(triple: &Triple) -> Self {
109        Self {
110            subject: triple.subject().to_string(),
111            predicate: triple.predicate().to_string(),
112            object: triple.object().to_string(),
113        }
114    }
115}
116
117/// Version chain for a triple
118#[derive(Debug, Clone)]
119pub struct VersionChain {
120    /// Versions sorted by timestamp (newest first)
121    versions: Vec<Version>,
122}
123
124impl VersionChain {
125    fn new() -> Self {
126        Self {
127            versions: Vec::new(),
128        }
129    }
130
131    /// Add a new version
132    fn add_version(&mut self, version: Version) {
133        // Insert in sorted order (newest first)
134        let pos = self
135            .versions
136            .binary_search_by_key(&std::cmp::Reverse(version.timestamp), |v| {
137                std::cmp::Reverse(v.timestamp)
138            })
139            .unwrap_or_else(|pos| pos);
140        self.versions.insert(pos, version);
141    }
142
143    /// Get visible version for a timestamp
144    fn get_visible_version(&self, timestamp: Timestamp) -> Option<&Version> {
145        self.versions
146            .iter()
147            .find(|v| v.timestamp <= timestamp && v.is_visible_at(timestamp))
148    }
149
150    /// Garbage collect old versions
151    fn gc_versions(&mut self, min_timestamp: Timestamp, max_versions: usize) {
152        // Keep at least one version
153        if self.versions.len() <= 1 {
154            return;
155        }
156
157        // Remove old deleted versions
158        self.versions
159            .retain(|v| !(v.deleted && v.timestamp < min_timestamp));
160
161        // Limit number of versions
162        if self.versions.len() > max_versions {
163            self.versions.truncate(max_versions);
164        }
165    }
166}
167
168/// A version of a triple
169#[derive(Debug, Clone)]
170pub struct Version {
171    /// Version ID
172    pub id: VersionId,
173
174    /// Creation timestamp
175    pub timestamp: Timestamp,
176
177    /// Transaction that created this version
178    pub transaction_id: TransactionId,
179
180    /// Whether this version represents a deletion
181    pub deleted: bool,
182
183    /// The triple data (None if deleted)
184    pub triple: Option<Triple>,
185
186    /// Commit timestamp (None if not yet committed)
187    pub commit_timestamp: Option<Timestamp>,
188}
189
190impl Version {
191    /// Check if this version is visible at a given timestamp
192    fn is_visible_at(&self, timestamp: Timestamp) -> bool {
193        if let Some(commit_ts) = self.commit_timestamp {
194            commit_ts <= timestamp
195        } else {
196            false // Uncommitted versions are not visible
197        }
198    }
199}
200
/// Bookkeeping for one transaction, stored in `MvccStore::transactions`.
#[derive(Debug, Clone)]
pub struct TransactionState {
    /// Transaction ID
    pub id: TransactionId,

    /// Timestamp taken when the transaction began. Queries read the versions
    /// committed at or before it.
    pub start_timestamp: Timestamp,

    /// Commit timestamp, set when the transaction commits.
    pub commit_timestamp: Option<Timestamp>,

    /// Current lifecycle status.
    pub status: TransactionStatus,

    /// Keys examined by queries; checked at commit time by the optimistic
    /// validation strategies.
    pub read_set: HashSet<TripleKey>,

    /// Staged writes keyed by triple, applied as new versions on commit.
    /// A later write to the same key replaces the earlier one.
    pub write_set: HashMap<TripleKey, WriteOperation>,

    /// Isolation level requested when the transaction began.
    pub isolation_level: IsolationLevel,
}
225
/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionStatus {
    /// Accepting operations; reads, writes and commit all require this state.
    Active,
    /// Set at the start of `commit_transaction`, before validation.
    Preparing,
    /// Staged writes have been applied as committed versions.
    Committed,
    /// Aborted; an aborted transaction can no longer commit.
    Aborted,
}
234
/// A staged write in a transaction's write set.
#[derive(Debug, Clone)]
pub enum WriteOperation {
    /// Insert this triple.
    Insert(Triple),
    /// Delete the triple identified by the write-set key.
    Delete,
}
241
/// Isolation level requested when a transaction begins.
///
/// NOTE(review): `MvccStore::query` always reads at the transaction's start
/// timestamp and never consults this value. In the visible code the level only
/// decides whether `begin_transaction` registers a snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// Read uncommitted (dirty reads allowed)
    ReadUncommitted,
    /// Read committed (no dirty reads)
    ReadCommitted,
    /// Repeatable read (no dirty or non-repeatable reads)
    RepeatableRead,
    /// Serializable (full isolation)
    Serializable,
    /// Snapshot isolation
    Snapshot,
    /// Snapshot isolation (alias for Snapshot)
    SnapshotIsolation,
}
258
/// A registered snapshot, held in `MvccStore::snapshots`.
#[derive(Debug, Clone)]
pub struct SnapshotInfo {
    /// Timestamp the snapshot was taken at (the creating transaction's start
    /// timestamp, as passed by `begin_transaction`).
    pub timestamp: Timestamp,

    /// IDs of transactions whose status was `Active` when the snapshot was
    /// created. This includes the creating transaction, which is registered
    /// before the snapshot.
    pub active_transactions: HashSet<TransactionId>,

    /// Reference count; set to 1 on creation.
    pub ref_count: usize,
}
271
/// Bookkeeping for automatic garbage collection.
#[derive(Debug)]
pub struct GarbageCollectionState {
    /// When `maybe_run_gc` last started a pass (or store creation time).
    last_gc: Instant,

    /// Number of versions collected.
    /// NOTE(review): no code path visible in this file increments this.
    versions_collected: u64,

    /// Number of automatic GC passes started by `maybe_run_gc`.
    gc_runs: u64,
}
284
/// Secondary indexes mapping a term's string form to the keys of triples that
/// use it in that position.
///
/// Entries are added when an insert is staged (before commit). NOTE(review):
/// nothing visible in this file removes entries, so these sets only grow.
/// Queries re-check version visibility, so stale entries cost time, not
/// correctness.
pub struct MvccIndexes {
    /// Subject index
    subject_index: DashMap<String, HashSet<TripleKey>>,

    /// Predicate index
    predicate_index: DashMap<String, HashSet<TripleKey>>,

    /// Object index
    object_index: DashMap<String, HashSet<TripleKey>>,
}
296
297impl MvccStore {
298    /// Create a new MVCC store
299    pub fn new(config: MvccConfig) -> Self {
300        Self {
301            config,
302            versions: Arc::new(DashMap::new()),
303            transactions: Arc::new(DashMap::new()),
304            timestamp_counter: Arc::new(AtomicU64::new(1)),
305            transaction_counter: Arc::new(AtomicU64::new(1)),
306            snapshots: Arc::new(RwLock::new(BTreeMap::new())),
307            gc_state: Arc::new(Mutex::new(GarbageCollectionState {
308                last_gc: Instant::now(),
309                versions_collected: 0,
310                gc_runs: 0,
311            })),
312            indexes: Arc::new(MvccIndexes {
313                subject_index: DashMap::new(),
314                predicate_index: DashMap::new(),
315                object_index: DashMap::new(),
316            }),
317        }
318    }
319
320    /// Begin a new transaction
321    pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
322        let tx_id = self.transaction_counter.fetch_add(1, Ordering::SeqCst);
323        let start_timestamp = self.get_next_timestamp();
324
325        let tx_state = TransactionState {
326            id: tx_id,
327            start_timestamp,
328            commit_timestamp: None,
329            status: TransactionStatus::Active,
330            read_set: HashSet::new(),
331            write_set: HashMap::new(),
332            isolation_level,
333        };
334
335        self.transactions.insert(tx_id, tx_state);
336
337        // Create snapshot if needed
338        if isolation_level == IsolationLevel::Snapshot {
339            self.create_snapshot(start_timestamp)?;
340        }
341
342        Ok(tx_id)
343    }
344
345    /// Insert a triple in a transaction
346    pub fn insert(&self, tx_id: TransactionId, triple: Triple) -> Result<()> {
347        let mut tx = self.get_active_transaction(tx_id)?;
348
349        let key = TripleKey::from_triple(&triple);
350
351        // Check for write-write conflicts
352        if self.config.conflict_detection == ConflictDetection::Pessimistic {
353            self.check_write_conflict(&key, tx_id)?;
354        }
355
356        // Add to write set
357        tx.write_set
358            .insert(key.clone(), WriteOperation::Insert(triple.clone()));
359
360        // Update indexes
361        self.update_indexes_for_insert(&key);
362
363        Ok(())
364    }
365
366    /// Delete a triple in a transaction
367    pub fn delete(&self, tx_id: TransactionId, triple: &Triple) -> Result<()> {
368        let mut tx = self.get_active_transaction(tx_id)?;
369
370        let key = TripleKey::from_triple(triple);
371
372        // Check if triple exists
373        if !self.exists_at_timestamp(&key, tx.start_timestamp)? {
374            return Err(anyhow!("Triple does not exist"));
375        }
376
377        // Add to write set
378        tx.write_set.insert(key.clone(), WriteOperation::Delete);
379
380        // Update indexes
381        self.update_indexes_for_delete(&key);
382
383        Ok(())
384    }
385
386    /// Query triples at transaction's snapshot
387    pub fn query(
388        &self,
389        tx_id: TransactionId,
390        subject: Option<&Subject>,
391        predicate: Option<&Predicate>,
392        object: Option<&Object>,
393    ) -> Result<Vec<Triple>> {
394        let mut tx = self.get_active_transaction(tx_id)?;
395        let timestamp = tx.start_timestamp;
396
397        // Get candidate keys from indexes
398        let candidates = self.get_candidate_keys(subject, predicate, object);
399
400        let mut results = Vec::new();
401        let mut processed_keys = HashSet::new();
402
403        for key in candidates {
404            processed_keys.insert(key.clone());
405
406            // Add to read set for conflict detection
407            tx.read_set.insert(key.clone());
408
409            // Check if visible at timestamp
410            if let Some(version_chain) = self.versions.get(&key) {
411                if let Some(version) = version_chain.get_visible_version(timestamp) {
412                    if !version.deleted {
413                        if let Some(triple) = &version.triple {
414                            // Apply predicate filters
415                            if self.matches_pattern(triple, subject, predicate, object) {
416                                results.push(triple.clone());
417                            }
418                        }
419                    }
420                }
421            }
422
423            // Check uncommitted writes in same transaction
424            if self.config.enable_read_your_writes {
425                if let Some(write_op) = tx.write_set.get(&key) {
426                    match write_op {
427                        WriteOperation::Insert(triple) => {
428                            if self.matches_pattern(triple, subject, predicate, object) {
429                                results.push(triple.clone());
430                            }
431                        }
432                        WriteOperation::Delete => {
433                            // Remove from results if it was added
434                            results.retain(|t| TripleKey::from_triple(t) != key);
435                        }
436                    }
437                }
438            }
439        }
440
441        // Also check write set for new keys not in main indexes (read-your-writes)
442        if self.config.enable_read_your_writes {
443            for (key, write_op) in &tx.write_set {
444                if !processed_keys.contains(key) {
445                    match write_op {
446                        WriteOperation::Insert(triple) => {
447                            if self.matches_pattern(triple, subject, predicate, object) {
448                                results.push(triple.clone());
449                            }
450                        }
451                        WriteOperation::Delete => {
452                            // Already handled above
453                        }
454                    }
455                }
456            }
457        }
458
459        Ok(results)
460    }
461
462    /// Commit a transaction
463    pub fn commit_transaction(&self, tx_id: TransactionId) -> Result<()> {
464        let mut tx = self.get_active_transaction(tx_id)?;
465
466        // Change status to preparing
467        tx.status = TransactionStatus::Preparing;
468
469        // Validate transaction
470        self.validate_transaction(&tx)?;
471
472        // Get commit timestamp
473        let commit_timestamp = self.get_next_timestamp();
474
475        // Apply writes
476        for (key, operation) in &tx.write_set {
477            let version = match operation {
478                WriteOperation::Insert(triple) => Version {
479                    id: self.get_next_timestamp(), // Use timestamp as version ID
480                    timestamp: commit_timestamp,
481                    transaction_id: tx_id,
482                    deleted: false,
483                    triple: Some(triple.clone()),
484                    commit_timestamp: Some(commit_timestamp),
485                },
486                WriteOperation::Delete => Version {
487                    id: self.get_next_timestamp(),
488                    timestamp: commit_timestamp,
489                    transaction_id: tx_id,
490                    deleted: true,
491                    triple: None,
492                    commit_timestamp: Some(commit_timestamp),
493                },
494            };
495
496            self.versions
497                .entry(key.clone())
498                .or_insert_with(VersionChain::new)
499                .add_version(version);
500        }
501
502        // Update transaction state
503        tx.commit_timestamp = Some(commit_timestamp);
504        tx.status = TransactionStatus::Committed;
505
506        // Trigger GC if needed
507        self.maybe_run_gc();
508
509        Ok(())
510    }
511
512    /// Abort a transaction
513    pub fn abort_transaction(&self, tx_id: TransactionId) -> Result<()> {
514        if let Some(mut tx) = self.transactions.get_mut(&tx_id) {
515            tx.status = TransactionStatus::Aborted;
516
517            // Clean up any locks or resources
518            // In optimistic mode, no cleanup needed
519        }
520
521        Ok(())
522    }
523
524    /// Validate transaction for conflicts
525    fn validate_transaction(&self, tx: &TransactionState) -> Result<()> {
526        match self.config.conflict_detection {
527            ConflictDetection::Optimistic => {
528                // Check read set for modifications
529                for key in &tx.read_set {
530                    if let Some(version_chain) = self.versions.get(key) {
531                        if let Some(latest) = version_chain.versions.first() {
532                            if latest.timestamp > tx.start_timestamp {
533                                return Err(anyhow!("Read conflict detected"));
534                            }
535                        }
536                    }
537                }
538
539                // Check write-write conflicts
540                for key in tx.write_set.keys() {
541                    if let Some(version_chain) = self.versions.get(key) {
542                        if let Some(latest) = version_chain.versions.first() {
543                            if latest.timestamp > tx.start_timestamp
544                                && latest.transaction_id != tx.id
545                            {
546                                return Err(anyhow!("Write conflict detected"));
547                            }
548                        }
549                    }
550                }
551            }
552
553            ConflictDetection::Pessimistic => {
554                // Conflicts already prevented during operations
555            }
556
557            ConflictDetection::TimestampOrdering => {
558                // Ensure timestamp ordering is maintained
559                for key in tx.write_set.keys() {
560                    if let Some(version_chain) = self.versions.get(key) {
561                        for version in &version_chain.versions {
562                            if version.transaction_id != tx.id
563                                && version.timestamp > tx.start_timestamp
564                                && version.commit_timestamp.is_some()
565                            {
566                                return Err(anyhow!("Timestamp ordering violation"));
567                            }
568                        }
569                    }
570                }
571            }
572            ConflictDetection::OptimisticTwoPhase => {
573                // Two-phase optimistic validation
574                // Phase 1: Read validation (similar to optimistic)
575                for key in &tx.read_set {
576                    if let Some(version_chain) = self.versions.get(key) {
577                        if let Some(latest) = version_chain.versions.first() {
578                            if latest.timestamp > tx.start_timestamp {
579                                return Err(anyhow!("Read conflict detected in phase 1"));
580                            }
581                        }
582                    }
583                }
584
585                // Phase 2: Write validation
586                for key in tx.write_set.keys() {
587                    if let Some(version_chain) = self.versions.get(key) {
588                        for version in &version_chain.versions {
589                            if version.transaction_id != tx.id
590                                && version.timestamp > tx.start_timestamp
591                                && version.commit_timestamp.is_some()
592                            {
593                                return Err(anyhow!("Write conflict detected in phase 2"));
594                            }
595                        }
596                    }
597                }
598            }
599        }
600
601        Ok(())
602    }
603
604    /// Get active transaction
605    fn get_active_transaction(
606        &self,
607        tx_id: TransactionId,
608    ) -> Result<dashmap::mapref::one::RefMut<'_, TransactionId, TransactionState>> {
609        let tx = self
610            .transactions
611            .get_mut(&tx_id)
612            .ok_or_else(|| anyhow!("Transaction not found"))?;
613
614        if tx.status != TransactionStatus::Active {
615            return Err(anyhow!("Transaction is not active"));
616        }
617
618        Ok(tx)
619    }
620
621    /// Check if triple exists at timestamp
622    fn exists_at_timestamp(&self, key: &TripleKey, timestamp: Timestamp) -> Result<bool> {
623        if let Some(version_chain) = self.versions.get(key) {
624            if let Some(version) = version_chain.get_visible_version(timestamp) {
625                return Ok(!version.deleted);
626            }
627        }
628        Ok(false)
629    }
630
631    /// Get candidate keys from indexes
632    fn get_candidate_keys(
633        &self,
634        subject: Option<&Subject>,
635        predicate: Option<&Predicate>,
636        object: Option<&Object>,
637    ) -> HashSet<TripleKey> {
638        let mut candidates = HashSet::new();
639
640        // Use most selective index
641        if let Some(subj) = subject {
642            if let Some(keys) = self.indexes.subject_index.get(&subj.to_string()) {
643                candidates.extend(keys.iter().cloned());
644            }
645        } else if let Some(pred) = predicate {
646            if let Some(keys) = self.indexes.predicate_index.get(&pred.to_string()) {
647                candidates.extend(keys.iter().cloned());
648            }
649        } else if let Some(obj) = object {
650            if let Some(keys) = self.indexes.object_index.get(&obj.to_string()) {
651                candidates.extend(keys.iter().cloned());
652            }
653        } else {
654            // Full scan
655            for entry in self.versions.iter() {
656                candidates.insert(entry.key().clone());
657            }
658        }
659
660        candidates
661    }
662
663    /// Check if triple matches pattern
664    fn matches_pattern(
665        &self,
666        triple: &Triple,
667        subject: Option<&Subject>,
668        predicate: Option<&Predicate>,
669        object: Option<&Object>,
670    ) -> bool {
671        if let Some(s) = subject {
672            if triple.subject() != s {
673                return false;
674            }
675        }
676        if let Some(p) = predicate {
677            if triple.predicate() != p {
678                return false;
679            }
680        }
681        if let Some(o) = object {
682            if triple.object() != o {
683                return false;
684            }
685        }
686        true
687    }
688
689    /// Update indexes for insert
690    fn update_indexes_for_insert(&self, key: &TripleKey) {
691        self.indexes
692            .subject_index
693            .entry(key.subject.clone())
694            .or_default()
695            .insert(key.clone());
696
697        self.indexes
698            .predicate_index
699            .entry(key.predicate.clone())
700            .or_default()
701            .insert(key.clone());
702
703        self.indexes
704            .object_index
705            .entry(key.object.clone())
706            .or_default()
707            .insert(key.clone());
708    }
709
    /// Index-maintenance hook for a staged deletion; intentionally a no-op.
    ///
    /// Index entries stay in place. Queries re-check version visibility, so a
    /// stale entry never produces a result by itself.
    fn update_indexes_for_delete(&self, _key: &TripleKey) {
        // Entries are not removed during the transaction.
        // NOTE(review): `run_gc_internal` only prunes version chains, so in the
        // code visible in this file index entries are never removed at all.
    }
715
    /// Per-write conflict check, called from `insert` when
    /// `ConflictDetection::Pessimistic` is configured.
    ///
    /// NOTE(review): this is a stub that always succeeds and takes no locks.
    /// Validation also skips the pessimistic strategy at commit, so
    /// pessimistic mode currently detects no conflicts.
    fn check_write_conflict(&self, _key: &TripleKey, _tx_id: TransactionId) -> Result<()> {
        // In a real implementation, would check locks
        Ok(())
    }
721
722    /// Create a snapshot
723    fn create_snapshot(&self, timestamp: Timestamp) -> Result<()> {
724        let active_txs: HashSet<TransactionId> = self
725            .transactions
726            .iter()
727            .filter(|entry| entry.value().status == TransactionStatus::Active)
728            .map(|entry| *entry.key())
729            .collect();
730
731        let snapshot = SnapshotInfo {
732            timestamp,
733            active_transactions: active_txs,
734            ref_count: 1,
735        };
736
737        self.snapshots.write().insert(timestamp, snapshot);
738
739        Ok(())
740    }
741
742    /// Get current timestamp
743    fn get_current_timestamp(&self) -> Timestamp {
744        self.timestamp_counter.load(Ordering::SeqCst)
745    }
746
747    /// Get next timestamp
748    fn get_next_timestamp(&self) -> Timestamp {
749        self.timestamp_counter.fetch_add(1, Ordering::SeqCst)
750    }
751
752    /// Maybe run garbage collection
753    fn maybe_run_gc(&self) {
754        let mut gc_state = self.gc_state.lock();
755
756        if gc_state.last_gc.elapsed() >= self.config.gc_interval {
757            // Run GC in background
758            let versions = self.versions.clone();
759            let config = self.config.clone();
760            let min_timestamp = self.calculate_min_timestamp();
761
762            std::thread::spawn(move || {
763                Self::run_gc_internal(versions, config, min_timestamp);
764            });
765
766            gc_state.last_gc = Instant::now();
767            gc_state.gc_runs += 1;
768        }
769    }
770
771    /// Run garbage collection
772    fn run_gc_internal(
773        versions: Arc<DashMap<TripleKey, VersionChain>>,
774        config: MvccConfig,
775        min_timestamp: Timestamp,
776    ) {
777        for mut entry in versions.iter_mut() {
778            entry
779                .value_mut()
780                .gc_versions(min_timestamp, config.max_versions_per_triple);
781        }
782    }
783
784    /// Calculate minimum timestamp to keep
785    fn calculate_min_timestamp(&self) -> Timestamp {
786        // Keep versions needed by active transactions
787        let min_active = self
788            .transactions
789            .iter()
790            .filter(|entry| entry.value().status == TransactionStatus::Active)
791            .map(|entry| entry.value().start_timestamp)
792            .min()
793            .unwrap_or(self.get_current_timestamp());
794
795        // Keep versions needed by snapshots
796        let min_snapshot = self
797            .snapshots
798            .read()
799            .keys()
800            .next()
801            .copied()
802            .unwrap_or(self.get_current_timestamp());
803
804        min_active.min(min_snapshot)
805    }
806
807    /// Run garbage collection manually
808    pub fn garbage_collect(&self) -> Result<()> {
809        let min_timestamp = self.calculate_min_timestamp();
810        Self::run_gc_internal(self.versions.clone(), self.config.clone(), min_timestamp);
811        Ok(())
812    }
813
814    /// Get store statistics
815    pub fn get_stats(&self) -> MvccStats {
816        let total_versions = self
817            .versions
818            .iter()
819            .map(|entry| entry.value().versions.len())
820            .sum();
821
822        let gc_state = self.gc_state.lock();
823
824        MvccStats {
825            total_triples: self.versions.len(),
826            total_versions,
827            active_transactions: self
828                .transactions
829                .iter()
830                .filter(|entry| entry.value().status == TransactionStatus::Active)
831                .count(),
832            gc_runs: gc_state.gc_runs,
833            versions_collected: gc_state.versions_collected,
834        }
835    }
836}
837
/// Point-in-time counters returned by `MvccStore::get_stats`.
#[derive(Debug, Clone)]
pub struct MvccStats {
    /// Number of distinct triple keys that have a version chain.
    pub total_triples: usize,
    /// Total number of versions across all chains.
    pub total_versions: usize,
    /// Transactions currently in the `Active` state.
    pub active_transactions: usize,
    /// Automatic GC passes started by `maybe_run_gc` (manual
    /// `garbage_collect` calls are not counted).
    pub gc_runs: u64,
    /// Versions reported as collected.
    /// NOTE(review): no code path visible in this file increments this.
    pub versions_collected: u64,
}
847
848#[cfg(test)]
849mod tests {
850    use super::*;
851    use crate::model::{Literal, NamedNode};
852
853    #[test]
854    fn test_basic_mvcc_operations() {
855        let config = MvccConfig::default();
856        let store = MvccStore::new(config);
857
858        // Begin transaction
859        let tx1 = store
860            .begin_transaction(IsolationLevel::Snapshot)
861            .expect("store operation should succeed");
862
863        // Insert triple
864        let triple = Triple::new(
865            NamedNode::new("http://example.org/s").expect("valid IRI"),
866            NamedNode::new("http://example.org/p").expect("valid IRI"),
867            Literal::new("value"),
868        );
869
870        store
871            .insert(tx1, triple.clone())
872            .expect("MVCC insert should succeed");
873
874        // Query should see the triple (read-your-writes)
875        let results = store
876            .query(tx1, None, None, None)
877            .expect("store operation should succeed");
878        assert_eq!(results.len(), 1);
879
880        // Commit transaction
881        store
882            .commit_transaction(tx1)
883            .expect("store operation should succeed");
884
885        // New transaction should see committed data
886        let tx2 = store
887            .begin_transaction(IsolationLevel::Snapshot)
888            .expect("store operation should succeed");
889        let results = store
890            .query(tx2, None, None, None)
891            .expect("store operation should succeed");
892        assert_eq!(results.len(), 1);
893    }
894
895    #[test]
896    fn test_concurrent_transactions() {
897        let config = MvccConfig::default();
898        let store = MvccStore::new(config);
899
900        // Insert initial data
901        let tx0 = store
902            .begin_transaction(IsolationLevel::Snapshot)
903            .expect("store operation should succeed");
904        let triple = Triple::new(
905            NamedNode::new("http://example.org/s").expect("valid IRI"),
906            NamedNode::new("http://example.org/p").expect("valid IRI"),
907            Literal::new("initial"),
908        );
909        store
910            .insert(tx0, triple.clone())
911            .expect("MVCC insert should succeed");
912        store
913            .commit_transaction(tx0)
914            .expect("store operation should succeed");
915
916        // Start two concurrent transactions
917        let tx1 = store
918            .begin_transaction(IsolationLevel::Snapshot)
919            .expect("store operation should succeed");
920        let tx2 = store
921            .begin_transaction(IsolationLevel::Snapshot)
922            .expect("store operation should succeed");
923
924        // Both should see initial data
925        assert_eq!(
926            store
927                .query(tx1, None, None, None)
928                .expect("store operation should succeed")
929                .len(),
930            1
931        );
932        assert_eq!(
933            store
934                .query(tx2, None, None, None)
935                .expect("store operation should succeed")
936                .len(),
937            1
938        );
939
940        // TX1 modifies data
941        store
942            .delete(tx1, &triple)
943            .expect("store operation should succeed");
944        let new_triple = Triple::new(
945            NamedNode::new("http://example.org/s").expect("valid IRI"),
946            NamedNode::new("http://example.org/p").expect("valid IRI"),
947            Literal::new("modified"),
948        );
949        store
950            .insert(tx1, new_triple)
951            .expect("store operation should succeed");
952
953        // TX2 shouldn't see TX1's changes yet
954        let tx2_results = store
955            .query(tx2, None, None, None)
956            .expect("store operation should succeed");
957        assert_eq!(tx2_results.len(), 1);
958        assert_eq!(tx2_results[0].object().to_string(), "\"initial\"");
959
960        // Commit TX1
961        store
962            .commit_transaction(tx1)
963            .expect("store operation should succeed");
964
965        // TX2 still sees snapshot
966        let tx2_results = store
967            .query(tx2, None, None, None)
968            .expect("store operation should succeed");
969        assert_eq!(tx2_results.len(), 1);
970        assert_eq!(tx2_results[0].object().to_string(), "\"initial\"");
971
972        // New transaction sees committed changes
973        let tx3 = store
974            .begin_transaction(IsolationLevel::Snapshot)
975            .expect("store operation should succeed");
976        let tx3_results = store
977            .query(tx3, None, None, None)
978            .expect("store operation should succeed");
979        assert_eq!(tx3_results.len(), 1);
980        assert_eq!(tx3_results[0].object().to_string(), "\"modified\"");
981    }
982
983    #[test]
984    fn test_write_conflict_detection() {
985        let config = MvccConfig {
986            conflict_detection: ConflictDetection::Optimistic,
987            ..Default::default()
988        };
989        let store = MvccStore::new(config);
990
991        // Insert initial data
992        let tx0 = store
993            .begin_transaction(IsolationLevel::Snapshot)
994            .expect("store operation should succeed");
995        let triple = Triple::new(
996            NamedNode::new("http://example.org/s").expect("valid IRI"),
997            NamedNode::new("http://example.org/p").expect("valid IRI"),
998            Literal::new("initial"),
999        );
1000        store
1001            .insert(tx0, triple.clone())
1002            .expect("MVCC insert should succeed");
1003        store
1004            .commit_transaction(tx0)
1005            .expect("store operation should succeed");
1006
1007        // Start two transactions
1008        let tx1 = store
1009            .begin_transaction(IsolationLevel::Snapshot)
1010            .expect("store operation should succeed");
1011        let tx2 = store
1012            .begin_transaction(IsolationLevel::Snapshot)
1013            .expect("store operation should succeed");
1014
1015        // Both modify the same triple
1016        store
1017            .delete(tx1, &triple)
1018            .expect("store operation should succeed");
1019        store
1020            .delete(tx2, &triple)
1021            .expect("store operation should succeed");
1022
1023        // First commit succeeds
1024        assert!(store.commit_transaction(tx1).is_ok());
1025
1026        // Second commit should fail due to conflict
1027        assert!(store.commit_transaction(tx2).is_err());
1028    }
1029
1030    #[test]
1031    fn test_version_chain() {
1032        let mut chain = VersionChain::new();
1033
1034        // Add versions
1035        for i in 0..5 {
1036            let version = Version {
1037                id: i,
1038                timestamp: i * 10,
1039                transaction_id: i,
1040                deleted: false,
1041                triple: None,
1042                commit_timestamp: Some(i * 10 + 5),
1043            };
1044            chain.add_version(version);
1045        }
1046
1047        // Test visibility
1048        assert_eq!(
1049            chain
1050                .get_visible_version(25)
1051                .expect("operation should succeed")
1052                .id,
1053            2
1054        );
1055        assert_eq!(
1056            chain
1057                .get_visible_version(45)
1058                .expect("operation should succeed")
1059                .id,
1060            4
1061        );
1062
1063        // Test GC
1064        chain.gc_versions(20, 3);
1065        assert!(chain.versions.len() <= 3);
1066    }
1067}