1use std::collections::BTreeMap;
63use std::sync::Arc;
64use std::sync::atomic::{AtomicU64, Ordering};
65
66use parking_lot::RwLock;
67
/// Default epoch length in milliseconds; used by `EpochManager::new`.
pub const DEFAULT_EPOCH_DURATION_MS: u64 = 10;

/// Presumably a cap on how many epochs are retained in memory (inferred from the name).
/// NOTE(review): not referenced by the code in this file — confirm where it is enforced.
pub const MAX_EPOCHS_IN_MEMORY: usize = 100;

/// Presumably a minimum batch size per epoch (inferred from the name).
/// NOTE(review): not referenced by the code in this file — confirm where it is enforced.
pub const MIN_ENTRIES_PER_EPOCH: usize = 1_000;
76
77pub struct EpochManager {
83 current_epoch: AtomicU64,
85 epoch_start_time: AtomicU64,
87 epoch_duration_ns: u64,
89 active_readers: RwLock<BTreeMap<u64, u64>>,
91 min_safe_epoch: AtomicU64,
93}
94
95impl EpochManager {
96 pub fn new() -> Self {
97 Self::with_duration_ms(DEFAULT_EPOCH_DURATION_MS)
98 }
99
100 pub fn with_duration_ms(duration_ms: u64) -> Self {
101 let now = Self::current_time_ns();
102 Self {
103 current_epoch: AtomicU64::new(1),
104 epoch_start_time: AtomicU64::new(now),
105 epoch_duration_ns: duration_ms * 1_000_000,
106 active_readers: RwLock::new(BTreeMap::new()),
107 min_safe_epoch: AtomicU64::new(1),
108 }
109 }
110
111 #[inline]
113 pub fn current_epoch(&self) -> u64 {
114 self.current_epoch.load(Ordering::Acquire)
115 }
116
117 #[inline]
119 pub fn min_safe_epoch(&self) -> u64 {
120 self.min_safe_epoch.load(Ordering::Acquire)
121 }
122
123 pub fn should_advance(&self) -> bool {
125 let now = Self::current_time_ns();
126 let start = self.epoch_start_time.load(Ordering::Relaxed);
127 now.saturating_sub(start) >= self.epoch_duration_ns
128 }
129
130 pub fn advance_epoch(&self) -> u64 {
132 let new_epoch = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
133 self.epoch_start_time
134 .store(Self::current_time_ns(), Ordering::Relaxed);
135 new_epoch
136 }
137
138 pub fn register_reader(&self, epoch: u64) {
140 let mut readers = self.active_readers.write();
141 *readers.entry(epoch).or_insert(0) += 1;
142 }
143
144 pub fn unregister_reader(&self, epoch: u64) {
146 let mut readers = self.active_readers.write();
147 if let Some(count) = readers.get_mut(&epoch) {
148 *count = count.saturating_sub(1);
149 if *count == 0 {
150 readers.remove(&epoch);
151 if let Some(&min_epoch) = readers.keys().next() {
153 self.min_safe_epoch.store(min_epoch, Ordering::Release);
154 } else {
155 self.min_safe_epoch.store(
156 self.current_epoch.load(Ordering::Relaxed),
157 Ordering::Release,
158 );
159 }
160 }
161 }
162 }
163
164 pub fn gc_eligible_epochs(&self) -> Vec<u64> {
166 let min_safe = self.min_safe_epoch.load(Ordering::Acquire);
167 let readers = self.active_readers.read();
168
169 readers.keys().filter(|&&e| e < min_safe).copied().collect()
171 }
172
173 #[inline]
174 fn current_time_ns() -> u64 {
175 std::time::SystemTime::now()
176 .duration_since(std::time::UNIX_EPOCH)
177 .map(|d| d.as_nanos() as u64)
178 .unwrap_or(0)
179 }
180}
181
182impl Default for EpochManager {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
/// One committed version of a key's value.
///
/// A tombstone (`is_delete == true`) records a deletion. Its `value` is only a
/// `V::default()` placeholder.
#[derive(Debug, Clone)]
pub struct VersionEntry<V> {
    /// The stored value (a placeholder for tombstones).
    pub value: V,
    /// Epoch in which this version was committed.
    pub epoch: u64,
    /// Identifier of the transaction that wrote this version.
    pub txn_id: u64,
    /// `true` if this version marks the key as deleted.
    pub is_delete: bool,
}

impl<V> VersionEntry<V> {
    /// Builds a live (non-deleted) version.
    pub fn new(value: V, epoch: u64, txn_id: u64) -> Self {
        let is_delete = false;
        Self {
            value,
            epoch,
            txn_id,
            is_delete,
        }
    }

    /// Builds a deletion marker for `epoch`, filling `value` with `V::default()`.
    pub fn tombstone(epoch: u64, txn_id: u64) -> Self
    where
        V: Default,
    {
        Self {
            is_delete: true,
            ..Self::new(V::default(), epoch, txn_id)
        }
    }
}
227
228pub struct EpochVersionChain<V> {
240 versions: RwLock<BTreeMap<u64, VersionEntry<V>>>,
243 latest_epoch: AtomicU64,
245}
246
247impl<V: Clone> EpochVersionChain<V> {
248 pub fn new() -> Self {
249 Self {
250 versions: RwLock::new(BTreeMap::new()),
251 latest_epoch: AtomicU64::new(0),
252 }
253 }
254
255 pub fn add_version(&self, epoch: u64, entry: VersionEntry<V>) {
257 let mut versions = self.versions.write();
258 versions.insert(epoch, entry);
259
260 let current = self.latest_epoch.load(Ordering::Relaxed);
262 if epoch > current {
263 self.latest_epoch.store(epoch, Ordering::Release);
264 }
265 }
266
267 pub fn read_at_epoch(&self, target_epoch: u64) -> Option<V> {
271 let latest = self.latest_epoch.load(Ordering::Acquire);
273 if target_epoch >= latest {
274 let versions = self.versions.read();
275 return versions.get(&latest).and_then(|v| {
276 if v.is_delete {
277 None
278 } else {
279 Some(v.value.clone())
280 }
281 });
282 }
283
284 let versions = self.versions.read();
286
287 versions
289 .range(..=target_epoch)
290 .next_back()
291 .and_then(|(_, v)| {
292 if v.is_delete {
293 None
294 } else {
295 Some(v.value.clone())
296 }
297 })
298 }
299
300 pub fn all_versions(&self) -> Vec<(u64, VersionEntry<V>)> {
302 self.versions
303 .read()
304 .iter()
305 .map(|(&e, v)| (e, v.clone()))
306 .collect()
307 }
308
309 pub fn gc_before_epoch(&self, epoch: u64) -> usize {
311 let mut versions = self.versions.write();
312 let old_len = versions.len();
313 versions.retain(|&e, _| e >= epoch);
314 old_len - versions.len()
315 }
316
317 pub fn version_count(&self) -> usize {
319 self.versions.read().len()
320 }
321
322 pub fn is_empty(&self) -> bool {
324 self.versions.read().is_empty()
325 }
326}
327
328impl<V: Clone> Default for EpochVersionChain<V> {
329 fn default() -> Self {
330 Self::new()
331 }
332}
333
334pub type Key = Vec<u8>;
340
341pub struct EpochMvccStore<V> {
343 data: dashmap::DashMap<Key, EpochVersionChain<V>>,
345 epoch_manager: Arc<EpochManager>,
347 next_txn_id: AtomicU64,
349}
350
351impl<V: Clone + Send + Sync + 'static> EpochMvccStore<V> {
352 pub fn new() -> Self {
353 Self::with_epoch_manager(Arc::new(EpochManager::new()))
354 }
355
356 pub fn with_epoch_manager(epoch_manager: Arc<EpochManager>) -> Self {
357 Self {
358 data: dashmap::DashMap::new(),
359 epoch_manager,
360 next_txn_id: AtomicU64::new(1),
361 }
362 }
363
364 pub fn epoch_manager(&self) -> &Arc<EpochManager> {
366 &self.epoch_manager
367 }
368
369 pub fn begin_txn(&self) -> EpochTransaction<'_, V> {
371 let epoch = self.epoch_manager.current_epoch();
372 let txn_id = self.next_txn_id.fetch_add(1, Ordering::Relaxed);
373
374 self.epoch_manager.register_reader(epoch);
375
376 EpochTransaction {
377 txn_id,
378 read_epoch: epoch,
379 write_buffer: Vec::new(),
380 store: self,
381 }
382 }
383
384 fn write(&self, key: Key, value: V, epoch: u64, txn_id: u64) {
386 let chain = self.data.entry(key).or_insert_with(EpochVersionChain::new);
387 chain.add_version(epoch, VersionEntry::new(value, epoch, txn_id));
388 }
389
390 fn delete(&self, key: Key, epoch: u64, txn_id: u64)
392 where
393 V: Default,
394 {
395 let chain = self.data.entry(key).or_insert_with(EpochVersionChain::new);
396 chain.add_version(epoch, VersionEntry::tombstone(epoch, txn_id));
397 }
398
399 pub fn read_at_epoch(&self, key: &[u8], epoch: u64) -> Option<V> {
401 self.data
402 .get(key)
403 .and_then(|chain| chain.read_at_epoch(epoch))
404 }
405
406 pub fn maybe_advance_epoch(&self) -> Option<u64> {
408 if self.epoch_manager.should_advance() {
409 Some(self.epoch_manager.advance_epoch())
410 } else {
411 None
412 }
413 }
414
415 pub fn gc(&self) -> GcStats {
417 let min_safe = self.epoch_manager.min_safe_epoch();
418 let mut stats = GcStats::default();
419
420 for mut entry in self.data.iter_mut() {
421 let removed = entry.value_mut().gc_before_epoch(min_safe);
422 stats.versions_removed += removed;
423 if entry.value().is_empty() {
424 stats.chains_emptied += 1;
425 }
426 }
427
428 self.data.retain(|_, chain| !chain.is_empty());
430
431 stats
432 }
433
434 pub fn stats(&self) -> StoreStats {
436 let mut total_versions = 0;
437 let mut max_versions_per_key = 0;
438
439 for entry in self.data.iter() {
440 let count = entry.value().version_count();
441 total_versions += count;
442 max_versions_per_key = max_versions_per_key.max(count);
443 }
444
445 StoreStats {
446 key_count: self.data.len(),
447 total_versions,
448 max_versions_per_key,
449 current_epoch: self.epoch_manager.current_epoch(),
450 min_safe_epoch: self.epoch_manager.min_safe_epoch(),
451 }
452 }
453}
454
455impl<V: Clone + Send + Sync + 'static> Default for EpochMvccStore<V> {
456 fn default() -> Self {
457 Self::new()
458 }
459}
460
461pub struct EpochTransaction<'a, V> {
467 txn_id: u64,
469 read_epoch: u64,
471 write_buffer: Vec<WriteOp<V>>,
473 store: &'a EpochMvccStore<V>,
475}
476
477enum WriteOp<V> {
479 Put(Key, V),
480 Delete(Key),
481}
482
483impl<'a, V: Clone + Send + Sync + Default + 'static> EpochTransaction<'a, V> {
484 pub fn txn_id(&self) -> u64 {
486 self.txn_id
487 }
488
489 pub fn read_epoch(&self) -> u64 {
491 self.read_epoch
492 }
493
494 pub fn get(&self, key: &[u8]) -> Option<V> {
496 for op in self.write_buffer.iter().rev() {
498 match op {
499 WriteOp::Put(k, v) if k == key => return Some(v.clone()),
500 WriteOp::Delete(k) if k == key => return None,
501 _ => {}
502 }
503 }
504
505 self.store.read_at_epoch(key, self.read_epoch)
507 }
508
509 pub fn put(&mut self, key: Key, value: V) {
511 self.write_buffer.push(WriteOp::Put(key, value));
512 }
513
514 pub fn delete(&mut self, key: Key) {
516 self.write_buffer.push(WriteOp::Delete(key));
517 }
518
519 pub fn commit(mut self) -> CommitResult {
521 let commit_epoch = self.store.epoch_manager.current_epoch();
522 let write_count = self.write_buffer.len();
523
524 for op in self.write_buffer.drain(..) {
525 match op {
526 WriteOp::Put(key, value) => {
527 self.store.write(key, value, commit_epoch, self.txn_id);
528 }
529 WriteOp::Delete(key) => {
530 self.store.delete(key, commit_epoch, self.txn_id);
531 }
532 }
533 }
534
535 self.store.epoch_manager.unregister_reader(self.read_epoch);
537
538 CommitResult {
539 txn_id: self.txn_id,
540 commit_epoch,
541 write_count,
542 }
543 }
544
545 pub fn abort(self) {
547 self.store.epoch_manager.unregister_reader(self.read_epoch);
549 }
550}
551
552impl<'a, V> Drop for EpochTransaction<'a, V> {
553 fn drop(&mut self) {
554 }
557}
558
/// Outcome of a successful `EpochTransaction::commit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitResult {
    /// Id of the committed transaction.
    pub txn_id: u64,
    /// Epoch stamped on every version the commit wrote.
    pub commit_epoch: u64,
    /// Number of buffered operations applied (puts and deletes, repeats included).
    pub write_count: usize,
}
566
/// Counters reported by `EpochMvccStore::gc`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct GcStats {
    /// Versions discarded across all chains.
    pub versions_removed: usize,
    /// Keys removed because their version chain became empty.
    pub chains_emptied: usize,
}
573
/// Point-in-time summary produced by `EpochMvccStore::stats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StoreStats {
    /// Number of keys that currently have a version chain.
    pub key_count: usize,
    /// Versions stored across all keys.
    pub total_versions: usize,
    /// Length of the longest version chain.
    pub max_versions_per_key: usize,
    /// Epoch manager's current epoch when the stats were taken.
    pub current_epoch: u64,
    /// Epoch manager's GC watermark when the stats were taken.
    pub min_safe_epoch: u64,
}
583
584pub struct EpochSnapshot<'a, V> {
590 epoch: u64,
591 store: &'a EpochMvccStore<V>,
592}
593
594impl<'a, V: Clone + Send + Sync + 'static> EpochSnapshot<'a, V> {
595 pub fn new(store: &'a EpochMvccStore<V>) -> Self {
597 let epoch = store.epoch_manager.current_epoch();
598 store.epoch_manager.register_reader(epoch);
599 Self { epoch, store }
600 }
601
602 pub fn at_epoch(store: &'a EpochMvccStore<V>, epoch: u64) -> Self {
604 store.epoch_manager.register_reader(epoch);
605 Self { epoch, store }
606 }
607
608 pub fn epoch(&self) -> u64 {
610 self.epoch
611 }
612
613 pub fn get(&self, key: &[u8]) -> Option<V> {
615 self.store.read_at_epoch(key, self.epoch)
616 }
617
618 pub fn keys(&self) -> Vec<Key> {
620 self.store
621 .data
622 .iter()
623 .filter(|e| e.value().read_at_epoch(self.epoch).is_some())
624 .map(|e| e.key().clone())
625 .collect()
626 }
627}
628
629impl<'a, V> Drop for EpochSnapshot<'a, V> {
630 fn drop(&mut self) {
631 self.store.epoch_manager.unregister_reader(self.epoch);
632 }
633}
634
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_epoch_manager_basics() {
        let manager = EpochManager::with_duration_ms(1);
        assert_eq!(manager.current_epoch(), 1);

        std::thread::sleep(std::time::Duration::from_millis(2));
        assert!(manager.should_advance());

        let new_epoch = manager.advance_epoch();
        assert_eq!(new_epoch, 2);
        assert_eq!(manager.current_epoch(), 2);
    }

    #[test]
    fn test_epoch_reader_tracking() {
        let manager = EpochManager::new();

        manager.register_reader(1);
        manager.register_reader(1);
        manager.register_reader(2);
        assert_eq!(manager.min_safe_epoch(), 1);

        // Epoch 1 still has one reader, so the watermark stays put.
        manager.unregister_reader(1);
        assert_eq!(manager.min_safe_epoch(), 1);

        // The last epoch-1 reader is gone: the watermark moves to the oldest remaining reader.
        manager.unregister_reader(1);
        assert_eq!(manager.min_safe_epoch(), 2);
    }

    #[test]
    fn test_version_chain_read_at_epoch() {
        let chain: EpochVersionChain<String> = EpochVersionChain::new();

        chain.add_version(1, VersionEntry::new("v1".to_string(), 1, 1));
        chain.add_version(3, VersionEntry::new("v3".to_string(), 3, 3));
        chain.add_version(5, VersionEntry::new("v5".to_string(), 5, 5));

        assert_eq!(chain.read_at_epoch(0), None);
        assert_eq!(chain.read_at_epoch(1), Some("v1".to_string()));
        assert_eq!(chain.read_at_epoch(2), Some("v1".to_string()));
        assert_eq!(chain.read_at_epoch(3), Some("v3".to_string()));
        assert_eq!(chain.read_at_epoch(4), Some("v3".to_string()));
        assert_eq!(chain.read_at_epoch(5), Some("v5".to_string()));
        assert_eq!(chain.read_at_epoch(100), Some("v5".to_string()));
    }

    #[test]
    fn test_version_chain_delete() {
        let chain: EpochVersionChain<String> = EpochVersionChain::new();

        chain.add_version(1, VersionEntry::new("value".to_string(), 1, 1));
        chain.add_version(2, VersionEntry::tombstone(2, 2));
        chain.add_version(3, VersionEntry::new("resurrected".to_string(), 3, 3));

        assert_eq!(chain.read_at_epoch(1), Some("value".to_string()));
        assert_eq!(chain.read_at_epoch(2), None);
        assert_eq!(chain.read_at_epoch(3), Some("resurrected".to_string()));
    }

    #[test]
    fn test_version_chain_gc() {
        let chain: EpochVersionChain<i32> = EpochVersionChain::new();

        for i in 1..=10 {
            chain.add_version(i, VersionEntry::new(i as i32, i, i));
        }

        assert_eq!(chain.version_count(), 10);

        let removed = chain.gc_before_epoch(5);
        assert_eq!(removed, 4);
        assert_eq!(chain.version_count(), 6);

        assert_eq!(chain.read_at_epoch(4), None);
        assert_eq!(chain.read_at_epoch(5), Some(5));
    }

    #[test]
    fn test_mvcc_store_basic() {
        let store: EpochMvccStore<String> = EpochMvccStore::new();

        let mut txn = store.begin_txn();
        txn.put(b"key1".to_vec(), "value1".to_string());
        txn.put(b"key2".to_vec(), "value2".to_string());
        let result = txn.commit();

        assert_eq!(result.write_count, 2);

        let txn2 = store.begin_txn();
        assert_eq!(txn2.get(b"key1"), Some("value1".to_string()));
        assert_eq!(txn2.get(b"key2"), Some("value2".to_string()));
        txn2.abort();
    }

    #[test]
    fn test_mvcc_store_snapshot_isolation() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        let mut txn1 = store.begin_txn();
        txn1.put(b"x".to_vec(), 1);
        txn1.commit();

        store.epoch_manager().advance_epoch();

        let snapshot = EpochSnapshot::new(&store);
        assert_eq!(snapshot.get(b"x"), Some(1));

        // Commit the overwrite in a later epoch than the snapshot.
        store.epoch_manager().advance_epoch();
        let mut txn2 = store.begin_txn();
        txn2.put(b"x".to_vec(), 2);
        txn2.commit();

        assert_eq!(snapshot.get(b"x"), Some(1));

        let txn3 = store.begin_txn();
        assert_eq!(txn3.get(b"x"), Some(2));
        txn3.abort();
    }

    #[test]
    fn test_mvcc_store_delete() {
        let store: EpochMvccStore<String> = EpochMvccStore::new();

        let mut txn1 = store.begin_txn();
        txn1.put(b"key".to_vec(), "value".to_string());
        txn1.commit();

        store.epoch_manager().advance_epoch();

        let snap = EpochSnapshot::new(&store);

        store.epoch_manager().advance_epoch();

        let mut txn2 = store.begin_txn();
        txn2.delete(b"key".to_vec());
        txn2.commit();

        assert_eq!(snap.get(b"key"), Some("value".to_string()));

        let txn3 = store.begin_txn();
        assert_eq!(txn3.get(b"key"), None);
        txn3.abort();
    }

    #[test]
    fn test_mvcc_store_write_buffer() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        let mut txn = store.begin_txn();

        txn.put(b"a".to_vec(), 1);
        txn.put(b"b".to_vec(), 2);

        assert_eq!(txn.get(b"a"), Some(1));
        assert_eq!(txn.get(b"b"), Some(2));

        txn.put(b"a".to_vec(), 10);
        assert_eq!(txn.get(b"a"), Some(10));

        txn.delete(b"b".to_vec());
        assert_eq!(txn.get(b"b"), None);

        txn.commit();
    }

    #[test]
    fn test_mvcc_store_gc() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        // One version per epoch: values 0..5 committed at epochs 1..=5.
        for i in 0..5 {
            let mut txn = store.begin_txn();
            txn.put(b"key".to_vec(), i);
            txn.commit();
            store.epoch_manager().advance_epoch();
        }

        let stats = store.stats();
        assert_eq!(stats.key_count, 1);
        assert_eq!(stats.total_versions, 5);
        assert_eq!(stats.current_epoch, 6);
        // The last commit released the only reader while epoch 5 was current.
        assert_eq!(stats.min_safe_epoch, 5);

        let gc_stats = store.gc();
        // Epochs 1..=4 are superseded by the version written at the watermark.
        assert_eq!(gc_stats.versions_removed, 4);
        assert_eq!(gc_stats.chains_emptied, 0);
        assert_eq!(store.stats().total_versions, 1);

        // The surviving version is still readable.
        let txn = store.begin_txn();
        assert_eq!(txn.get(b"key"), Some(4));
        txn.abort();
    }

    #[test]
    fn test_epoch_snapshot_keys() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        let mut txn = store.begin_txn();
        txn.put(b"a".to_vec(), 1);
        txn.put(b"b".to_vec(), 2);
        txn.put(b"c".to_vec(), 3);
        txn.commit();

        let snap = EpochSnapshot::new(&store);
        let keys = snap.keys();

        assert_eq!(keys.len(), 3);
        assert!(keys.contains(&b"a".to_vec()));
        assert!(keys.contains(&b"b".to_vec()));
        assert!(keys.contains(&b"c".to_vec()));
    }
}