1use crate::model::{Object, Predicate, Subject, Triple};
7use anyhow::{anyhow, Result};
8use dashmap::DashMap;
9use parking_lot::{Mutex, RwLock};
10use std::collections::{BTreeMap, HashMap, HashSet};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15pub type Timestamp = u64;
17
18pub type TransactionId = u64;
20
21pub type VersionId = u64;
23
24#[derive(Debug, Clone)]
26pub struct MvccConfig {
27 pub max_versions_per_triple: usize,
29
30 pub gc_interval: Duration,
32
33 pub min_version_age: Duration,
35
36 pub enable_snapshot_isolation: bool,
38
39 pub enable_read_your_writes: bool,
41
42 pub conflict_detection: ConflictDetection,
44}
45
46impl Default for MvccConfig {
47 fn default() -> Self {
48 Self {
49 max_versions_per_triple: 100,
50 gc_interval: Duration::from_secs(60),
51 min_version_age: Duration::from_secs(300), enable_snapshot_isolation: true,
53 enable_read_your_writes: true,
54 conflict_detection: ConflictDetection::Optimistic,
55 }
56 }
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum ConflictDetection {
62 Optimistic,
64 OptimisticTwoPhase,
66 Pessimistic,
68 TimestampOrdering,
70}
71
72pub struct MvccStore {
74 config: MvccConfig,
76
77 versions: Arc<DashMap<TripleKey, VersionChain>>,
79
80 transactions: Arc<DashMap<TransactionId, TransactionState>>,
82
83 timestamp_counter: Arc<AtomicU64>,
85
86 transaction_counter: Arc<AtomicU64>,
88
89 snapshots: Arc<RwLock<BTreeMap<Timestamp, SnapshotInfo>>>,
91
92 gc_state: Arc<Mutex<GarbageCollectionState>>,
94
95 indexes: Arc<MvccIndexes>,
97}
98
99#[derive(Debug, Clone, Hash, Eq, PartialEq)]
101pub struct TripleKey {
102 subject: String,
103 predicate: String,
104 object: String,
105}
106
107impl TripleKey {
108 fn from_triple(triple: &Triple) -> Self {
109 Self {
110 subject: triple.subject().to_string(),
111 predicate: triple.predicate().to_string(),
112 object: triple.object().to_string(),
113 }
114 }
115}
116
117#[derive(Debug, Clone)]
119pub struct VersionChain {
120 versions: Vec<Version>,
122}
123
124impl VersionChain {
125 fn new() -> Self {
126 Self {
127 versions: Vec::new(),
128 }
129 }
130
131 fn add_version(&mut self, version: Version) {
133 let pos = self
135 .versions
136 .binary_search_by_key(&std::cmp::Reverse(version.timestamp), |v| {
137 std::cmp::Reverse(v.timestamp)
138 })
139 .unwrap_or_else(|pos| pos);
140 self.versions.insert(pos, version);
141 }
142
143 fn get_visible_version(&self, timestamp: Timestamp) -> Option<&Version> {
145 self.versions
146 .iter()
147 .find(|v| v.timestamp <= timestamp && v.is_visible_at(timestamp))
148 }
149
150 fn gc_versions(&mut self, min_timestamp: Timestamp, max_versions: usize) {
152 if self.versions.len() <= 1 {
154 return;
155 }
156
157 self.versions
159 .retain(|v| !(v.deleted && v.timestamp < min_timestamp));
160
161 if self.versions.len() > max_versions {
163 self.versions.truncate(max_versions);
164 }
165 }
166}
167
168#[derive(Debug, Clone)]
170pub struct Version {
171 pub id: VersionId,
173
174 pub timestamp: Timestamp,
176
177 pub transaction_id: TransactionId,
179
180 pub deleted: bool,
182
183 pub triple: Option<Triple>,
185
186 pub commit_timestamp: Option<Timestamp>,
188}
189
190impl Version {
191 fn is_visible_at(&self, timestamp: Timestamp) -> bool {
193 if let Some(commit_ts) = self.commit_timestamp {
194 commit_ts <= timestamp
195 } else {
196 false }
198 }
199}
200
201#[derive(Debug, Clone)]
203pub struct TransactionState {
204 pub id: TransactionId,
206
207 pub start_timestamp: Timestamp,
209
210 pub commit_timestamp: Option<Timestamp>,
212
213 pub status: TransactionStatus,
215
216 pub read_set: HashSet<TripleKey>,
218
219 pub write_set: HashMap<TripleKey, WriteOperation>,
221
222 pub isolation_level: IsolationLevel,
224}
225
226#[derive(Debug, Clone, Copy, PartialEq, Eq)]
228pub enum TransactionStatus {
229 Active,
230 Preparing,
231 Committed,
232 Aborted,
233}
234
235#[derive(Debug, Clone)]
237pub enum WriteOperation {
238 Insert(Triple),
239 Delete,
240}
241
242#[derive(Debug, Clone, Copy, PartialEq, Eq)]
244pub enum IsolationLevel {
245 ReadUncommitted,
247 ReadCommitted,
249 RepeatableRead,
251 Serializable,
253 Snapshot,
255 SnapshotIsolation,
257}
258
259#[derive(Debug, Clone)]
261pub struct SnapshotInfo {
262 pub timestamp: Timestamp,
264
265 pub active_transactions: HashSet<TransactionId>,
267
268 pub ref_count: usize,
270}
271
272#[derive(Debug)]
274pub struct GarbageCollectionState {
275 last_gc: Instant,
277
278 versions_collected: u64,
280
281 gc_runs: u64,
283}
284
285pub struct MvccIndexes {
287 subject_index: DashMap<String, HashSet<TripleKey>>,
289
290 predicate_index: DashMap<String, HashSet<TripleKey>>,
292
293 object_index: DashMap<String, HashSet<TripleKey>>,
295}
296
297impl MvccStore {
298 pub fn new(config: MvccConfig) -> Self {
300 Self {
301 config,
302 versions: Arc::new(DashMap::new()),
303 transactions: Arc::new(DashMap::new()),
304 timestamp_counter: Arc::new(AtomicU64::new(1)),
305 transaction_counter: Arc::new(AtomicU64::new(1)),
306 snapshots: Arc::new(RwLock::new(BTreeMap::new())),
307 gc_state: Arc::new(Mutex::new(GarbageCollectionState {
308 last_gc: Instant::now(),
309 versions_collected: 0,
310 gc_runs: 0,
311 })),
312 indexes: Arc::new(MvccIndexes {
313 subject_index: DashMap::new(),
314 predicate_index: DashMap::new(),
315 object_index: DashMap::new(),
316 }),
317 }
318 }
319
320 pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
322 let tx_id = self.transaction_counter.fetch_add(1, Ordering::SeqCst);
323 let start_timestamp = self.get_next_timestamp();
324
325 let tx_state = TransactionState {
326 id: tx_id,
327 start_timestamp,
328 commit_timestamp: None,
329 status: TransactionStatus::Active,
330 read_set: HashSet::new(),
331 write_set: HashMap::new(),
332 isolation_level,
333 };
334
335 self.transactions.insert(tx_id, tx_state);
336
337 if isolation_level == IsolationLevel::Snapshot {
339 self.create_snapshot(start_timestamp)?;
340 }
341
342 Ok(tx_id)
343 }
344
345 pub fn insert(&self, tx_id: TransactionId, triple: Triple) -> Result<()> {
347 let mut tx = self.get_active_transaction(tx_id)?;
348
349 let key = TripleKey::from_triple(&triple);
350
351 if self.config.conflict_detection == ConflictDetection::Pessimistic {
353 self.check_write_conflict(&key, tx_id)?;
354 }
355
356 tx.write_set
358 .insert(key.clone(), WriteOperation::Insert(triple.clone()));
359
360 self.update_indexes_for_insert(&key);
362
363 Ok(())
364 }
365
366 pub fn delete(&self, tx_id: TransactionId, triple: &Triple) -> Result<()> {
368 let mut tx = self.get_active_transaction(tx_id)?;
369
370 let key = TripleKey::from_triple(triple);
371
372 if !self.exists_at_timestamp(&key, tx.start_timestamp)? {
374 return Err(anyhow!("Triple does not exist"));
375 }
376
377 tx.write_set.insert(key.clone(), WriteOperation::Delete);
379
380 self.update_indexes_for_delete(&key);
382
383 Ok(())
384 }
385
386 pub fn query(
388 &self,
389 tx_id: TransactionId,
390 subject: Option<&Subject>,
391 predicate: Option<&Predicate>,
392 object: Option<&Object>,
393 ) -> Result<Vec<Triple>> {
394 let mut tx = self.get_active_transaction(tx_id)?;
395 let timestamp = tx.start_timestamp;
396
397 let candidates = self.get_candidate_keys(subject, predicate, object);
399
400 let mut results = Vec::new();
401 let mut processed_keys = HashSet::new();
402
403 for key in candidates {
404 processed_keys.insert(key.clone());
405
406 tx.read_set.insert(key.clone());
408
409 if let Some(version_chain) = self.versions.get(&key) {
411 if let Some(version) = version_chain.get_visible_version(timestamp) {
412 if !version.deleted {
413 if let Some(triple) = &version.triple {
414 if self.matches_pattern(triple, subject, predicate, object) {
416 results.push(triple.clone());
417 }
418 }
419 }
420 }
421 }
422
423 if self.config.enable_read_your_writes {
425 if let Some(write_op) = tx.write_set.get(&key) {
426 match write_op {
427 WriteOperation::Insert(triple) => {
428 if self.matches_pattern(triple, subject, predicate, object) {
429 results.push(triple.clone());
430 }
431 }
432 WriteOperation::Delete => {
433 results.retain(|t| TripleKey::from_triple(t) != key);
435 }
436 }
437 }
438 }
439 }
440
441 if self.config.enable_read_your_writes {
443 for (key, write_op) in &tx.write_set {
444 if !processed_keys.contains(key) {
445 match write_op {
446 WriteOperation::Insert(triple) => {
447 if self.matches_pattern(triple, subject, predicate, object) {
448 results.push(triple.clone());
449 }
450 }
451 WriteOperation::Delete => {
452 }
454 }
455 }
456 }
457 }
458
459 Ok(results)
460 }
461
462 pub fn commit_transaction(&self, tx_id: TransactionId) -> Result<()> {
464 let mut tx = self.get_active_transaction(tx_id)?;
465
466 tx.status = TransactionStatus::Preparing;
468
469 self.validate_transaction(&tx)?;
471
472 let commit_timestamp = self.get_next_timestamp();
474
475 for (key, operation) in &tx.write_set {
477 let version = match operation {
478 WriteOperation::Insert(triple) => Version {
479 id: self.get_next_timestamp(), timestamp: commit_timestamp,
481 transaction_id: tx_id,
482 deleted: false,
483 triple: Some(triple.clone()),
484 commit_timestamp: Some(commit_timestamp),
485 },
486 WriteOperation::Delete => Version {
487 id: self.get_next_timestamp(),
488 timestamp: commit_timestamp,
489 transaction_id: tx_id,
490 deleted: true,
491 triple: None,
492 commit_timestamp: Some(commit_timestamp),
493 },
494 };
495
496 self.versions
497 .entry(key.clone())
498 .or_insert_with(VersionChain::new)
499 .add_version(version);
500 }
501
502 tx.commit_timestamp = Some(commit_timestamp);
504 tx.status = TransactionStatus::Committed;
505
506 self.maybe_run_gc();
508
509 Ok(())
510 }
511
512 pub fn abort_transaction(&self, tx_id: TransactionId) -> Result<()> {
514 if let Some(mut tx) = self.transactions.get_mut(&tx_id) {
515 tx.status = TransactionStatus::Aborted;
516
517 }
520
521 Ok(())
522 }
523
524 fn validate_transaction(&self, tx: &TransactionState) -> Result<()> {
526 match self.config.conflict_detection {
527 ConflictDetection::Optimistic => {
528 for key in &tx.read_set {
530 if let Some(version_chain) = self.versions.get(key) {
531 if let Some(latest) = version_chain.versions.first() {
532 if latest.timestamp > tx.start_timestamp {
533 return Err(anyhow!("Read conflict detected"));
534 }
535 }
536 }
537 }
538
539 for key in tx.write_set.keys() {
541 if let Some(version_chain) = self.versions.get(key) {
542 if let Some(latest) = version_chain.versions.first() {
543 if latest.timestamp > tx.start_timestamp
544 && latest.transaction_id != tx.id
545 {
546 return Err(anyhow!("Write conflict detected"));
547 }
548 }
549 }
550 }
551 }
552
553 ConflictDetection::Pessimistic => {
554 }
556
557 ConflictDetection::TimestampOrdering => {
558 for key in tx.write_set.keys() {
560 if let Some(version_chain) = self.versions.get(key) {
561 for version in &version_chain.versions {
562 if version.transaction_id != tx.id
563 && version.timestamp > tx.start_timestamp
564 && version.commit_timestamp.is_some()
565 {
566 return Err(anyhow!("Timestamp ordering violation"));
567 }
568 }
569 }
570 }
571 }
572 ConflictDetection::OptimisticTwoPhase => {
573 for key in &tx.read_set {
576 if let Some(version_chain) = self.versions.get(key) {
577 if let Some(latest) = version_chain.versions.first() {
578 if latest.timestamp > tx.start_timestamp {
579 return Err(anyhow!("Read conflict detected in phase 1"));
580 }
581 }
582 }
583 }
584
585 for key in tx.write_set.keys() {
587 if let Some(version_chain) = self.versions.get(key) {
588 for version in &version_chain.versions {
589 if version.transaction_id != tx.id
590 && version.timestamp > tx.start_timestamp
591 && version.commit_timestamp.is_some()
592 {
593 return Err(anyhow!("Write conflict detected in phase 2"));
594 }
595 }
596 }
597 }
598 }
599 }
600
601 Ok(())
602 }
603
604 fn get_active_transaction(
606 &self,
607 tx_id: TransactionId,
608 ) -> Result<dashmap::mapref::one::RefMut<'_, TransactionId, TransactionState>> {
609 let tx = self
610 .transactions
611 .get_mut(&tx_id)
612 .ok_or_else(|| anyhow!("Transaction not found"))?;
613
614 if tx.status != TransactionStatus::Active {
615 return Err(anyhow!("Transaction is not active"));
616 }
617
618 Ok(tx)
619 }
620
621 fn exists_at_timestamp(&self, key: &TripleKey, timestamp: Timestamp) -> Result<bool> {
623 if let Some(version_chain) = self.versions.get(key) {
624 if let Some(version) = version_chain.get_visible_version(timestamp) {
625 return Ok(!version.deleted);
626 }
627 }
628 Ok(false)
629 }
630
631 fn get_candidate_keys(
633 &self,
634 subject: Option<&Subject>,
635 predicate: Option<&Predicate>,
636 object: Option<&Object>,
637 ) -> HashSet<TripleKey> {
638 let mut candidates = HashSet::new();
639
640 if let Some(subj) = subject {
642 if let Some(keys) = self.indexes.subject_index.get(&subj.to_string()) {
643 candidates.extend(keys.iter().cloned());
644 }
645 } else if let Some(pred) = predicate {
646 if let Some(keys) = self.indexes.predicate_index.get(&pred.to_string()) {
647 candidates.extend(keys.iter().cloned());
648 }
649 } else if let Some(obj) = object {
650 if let Some(keys) = self.indexes.object_index.get(&obj.to_string()) {
651 candidates.extend(keys.iter().cloned());
652 }
653 } else {
654 for entry in self.versions.iter() {
656 candidates.insert(entry.key().clone());
657 }
658 }
659
660 candidates
661 }
662
663 fn matches_pattern(
665 &self,
666 triple: &Triple,
667 subject: Option<&Subject>,
668 predicate: Option<&Predicate>,
669 object: Option<&Object>,
670 ) -> bool {
671 if let Some(s) = subject {
672 if triple.subject() != s {
673 return false;
674 }
675 }
676 if let Some(p) = predicate {
677 if triple.predicate() != p {
678 return false;
679 }
680 }
681 if let Some(o) = object {
682 if triple.object() != o {
683 return false;
684 }
685 }
686 true
687 }
688
689 fn update_indexes_for_insert(&self, key: &TripleKey) {
691 self.indexes
692 .subject_index
693 .entry(key.subject.clone())
694 .or_default()
695 .insert(key.clone());
696
697 self.indexes
698 .predicate_index
699 .entry(key.predicate.clone())
700 .or_default()
701 .insert(key.clone());
702
703 self.indexes
704 .object_index
705 .entry(key.object.clone())
706 .or_default()
707 .insert(key.clone());
708 }
709
710 fn update_indexes_for_delete(&self, _key: &TripleKey) {
712 }
715
716 fn check_write_conflict(&self, _key: &TripleKey, _tx_id: TransactionId) -> Result<()> {
718 Ok(())
720 }
721
722 fn create_snapshot(&self, timestamp: Timestamp) -> Result<()> {
724 let active_txs: HashSet<TransactionId> = self
725 .transactions
726 .iter()
727 .filter(|entry| entry.value().status == TransactionStatus::Active)
728 .map(|entry| *entry.key())
729 .collect();
730
731 let snapshot = SnapshotInfo {
732 timestamp,
733 active_transactions: active_txs,
734 ref_count: 1,
735 };
736
737 self.snapshots.write().insert(timestamp, snapshot);
738
739 Ok(())
740 }
741
742 fn get_current_timestamp(&self) -> Timestamp {
744 self.timestamp_counter.load(Ordering::SeqCst)
745 }
746
747 fn get_next_timestamp(&self) -> Timestamp {
749 self.timestamp_counter.fetch_add(1, Ordering::SeqCst)
750 }
751
752 fn maybe_run_gc(&self) {
754 let mut gc_state = self.gc_state.lock();
755
756 if gc_state.last_gc.elapsed() >= self.config.gc_interval {
757 let versions = self.versions.clone();
759 let config = self.config.clone();
760 let min_timestamp = self.calculate_min_timestamp();
761
762 std::thread::spawn(move || {
763 Self::run_gc_internal(versions, config, min_timestamp);
764 });
765
766 gc_state.last_gc = Instant::now();
767 gc_state.gc_runs += 1;
768 }
769 }
770
771 fn run_gc_internal(
773 versions: Arc<DashMap<TripleKey, VersionChain>>,
774 config: MvccConfig,
775 min_timestamp: Timestamp,
776 ) {
777 for mut entry in versions.iter_mut() {
778 entry
779 .value_mut()
780 .gc_versions(min_timestamp, config.max_versions_per_triple);
781 }
782 }
783
784 fn calculate_min_timestamp(&self) -> Timestamp {
786 let min_active = self
788 .transactions
789 .iter()
790 .filter(|entry| entry.value().status == TransactionStatus::Active)
791 .map(|entry| entry.value().start_timestamp)
792 .min()
793 .unwrap_or(self.get_current_timestamp());
794
795 let min_snapshot = self
797 .snapshots
798 .read()
799 .keys()
800 .next()
801 .copied()
802 .unwrap_or(self.get_current_timestamp());
803
804 min_active.min(min_snapshot)
805 }
806
807 pub fn garbage_collect(&self) -> Result<()> {
809 let min_timestamp = self.calculate_min_timestamp();
810 Self::run_gc_internal(self.versions.clone(), self.config.clone(), min_timestamp);
811 Ok(())
812 }
813
814 pub fn get_stats(&self) -> MvccStats {
816 let total_versions = self
817 .versions
818 .iter()
819 .map(|entry| entry.value().versions.len())
820 .sum();
821
822 let gc_state = self.gc_state.lock();
823
824 MvccStats {
825 total_triples: self.versions.len(),
826 total_versions,
827 active_transactions: self
828 .transactions
829 .iter()
830 .filter(|entry| entry.value().status == TransactionStatus::Active)
831 .count(),
832 gc_runs: gc_state.gc_runs,
833 versions_collected: gc_state.versions_collected,
834 }
835 }
836}
837
838#[derive(Debug, Clone)]
840pub struct MvccStats {
841 pub total_triples: usize,
842 pub total_versions: usize,
843 pub active_transactions: usize,
844 pub gc_runs: u64,
845 pub versions_collected: u64,
846}
847
848#[cfg(test)]
849mod tests {
850 use super::*;
851 use crate::model::{Literal, NamedNode};
852
853 #[test]
854 fn test_basic_mvcc_operations() {
855 let config = MvccConfig::default();
856 let store = MvccStore::new(config);
857
858 let tx1 = store
860 .begin_transaction(IsolationLevel::Snapshot)
861 .expect("store operation should succeed");
862
863 let triple = Triple::new(
865 NamedNode::new("http://example.org/s").expect("valid IRI"),
866 NamedNode::new("http://example.org/p").expect("valid IRI"),
867 Literal::new("value"),
868 );
869
870 store
871 .insert(tx1, triple.clone())
872 .expect("MVCC insert should succeed");
873
874 let results = store
876 .query(tx1, None, None, None)
877 .expect("store operation should succeed");
878 assert_eq!(results.len(), 1);
879
880 store
882 .commit_transaction(tx1)
883 .expect("store operation should succeed");
884
885 let tx2 = store
887 .begin_transaction(IsolationLevel::Snapshot)
888 .expect("store operation should succeed");
889 let results = store
890 .query(tx2, None, None, None)
891 .expect("store operation should succeed");
892 assert_eq!(results.len(), 1);
893 }
894
895 #[test]
896 fn test_concurrent_transactions() {
897 let config = MvccConfig::default();
898 let store = MvccStore::new(config);
899
900 let tx0 = store
902 .begin_transaction(IsolationLevel::Snapshot)
903 .expect("store operation should succeed");
904 let triple = Triple::new(
905 NamedNode::new("http://example.org/s").expect("valid IRI"),
906 NamedNode::new("http://example.org/p").expect("valid IRI"),
907 Literal::new("initial"),
908 );
909 store
910 .insert(tx0, triple.clone())
911 .expect("MVCC insert should succeed");
912 store
913 .commit_transaction(tx0)
914 .expect("store operation should succeed");
915
916 let tx1 = store
918 .begin_transaction(IsolationLevel::Snapshot)
919 .expect("store operation should succeed");
920 let tx2 = store
921 .begin_transaction(IsolationLevel::Snapshot)
922 .expect("store operation should succeed");
923
924 assert_eq!(
926 store
927 .query(tx1, None, None, None)
928 .expect("store operation should succeed")
929 .len(),
930 1
931 );
932 assert_eq!(
933 store
934 .query(tx2, None, None, None)
935 .expect("store operation should succeed")
936 .len(),
937 1
938 );
939
940 store
942 .delete(tx1, &triple)
943 .expect("store operation should succeed");
944 let new_triple = Triple::new(
945 NamedNode::new("http://example.org/s").expect("valid IRI"),
946 NamedNode::new("http://example.org/p").expect("valid IRI"),
947 Literal::new("modified"),
948 );
949 store
950 .insert(tx1, new_triple)
951 .expect("store operation should succeed");
952
953 let tx2_results = store
955 .query(tx2, None, None, None)
956 .expect("store operation should succeed");
957 assert_eq!(tx2_results.len(), 1);
958 assert_eq!(tx2_results[0].object().to_string(), "\"initial\"");
959
960 store
962 .commit_transaction(tx1)
963 .expect("store operation should succeed");
964
965 let tx2_results = store
967 .query(tx2, None, None, None)
968 .expect("store operation should succeed");
969 assert_eq!(tx2_results.len(), 1);
970 assert_eq!(tx2_results[0].object().to_string(), "\"initial\"");
971
972 let tx3 = store
974 .begin_transaction(IsolationLevel::Snapshot)
975 .expect("store operation should succeed");
976 let tx3_results = store
977 .query(tx3, None, None, None)
978 .expect("store operation should succeed");
979 assert_eq!(tx3_results.len(), 1);
980 assert_eq!(tx3_results[0].object().to_string(), "\"modified\"");
981 }
982
983 #[test]
984 fn test_write_conflict_detection() {
985 let config = MvccConfig {
986 conflict_detection: ConflictDetection::Optimistic,
987 ..Default::default()
988 };
989 let store = MvccStore::new(config);
990
991 let tx0 = store
993 .begin_transaction(IsolationLevel::Snapshot)
994 .expect("store operation should succeed");
995 let triple = Triple::new(
996 NamedNode::new("http://example.org/s").expect("valid IRI"),
997 NamedNode::new("http://example.org/p").expect("valid IRI"),
998 Literal::new("initial"),
999 );
1000 store
1001 .insert(tx0, triple.clone())
1002 .expect("MVCC insert should succeed");
1003 store
1004 .commit_transaction(tx0)
1005 .expect("store operation should succeed");
1006
1007 let tx1 = store
1009 .begin_transaction(IsolationLevel::Snapshot)
1010 .expect("store operation should succeed");
1011 let tx2 = store
1012 .begin_transaction(IsolationLevel::Snapshot)
1013 .expect("store operation should succeed");
1014
1015 store
1017 .delete(tx1, &triple)
1018 .expect("store operation should succeed");
1019 store
1020 .delete(tx2, &triple)
1021 .expect("store operation should succeed");
1022
1023 assert!(store.commit_transaction(tx1).is_ok());
1025
1026 assert!(store.commit_transaction(tx2).is_err());
1028 }
1029
1030 #[test]
1031 fn test_version_chain() {
1032 let mut chain = VersionChain::new();
1033
1034 for i in 0..5 {
1036 let version = Version {
1037 id: i,
1038 timestamp: i * 10,
1039 transaction_id: i,
1040 deleted: false,
1041 triple: None,
1042 commit_timestamp: Some(i * 10 + 5),
1043 };
1044 chain.add_version(version);
1045 }
1046
1047 assert_eq!(
1049 chain
1050 .get_visible_version(25)
1051 .expect("operation should succeed")
1052 .id,
1053 2
1054 );
1055 assert_eq!(
1056 chain
1057 .get_visible_version(45)
1058 .expect("operation should succeed")
1059 .id,
1060 4
1061 );
1062
1063 chain.gc_versions(20, 3);
1065 assert!(chain.versions.len() <= 3);
1066 }
1067}