1use std::collections::BTreeMap;
63use std::sync::atomic::{AtomicU64, Ordering};
64use std::sync::Arc;
65
66use parking_lot::RwLock;
67
/// Epoch length, in milliseconds, used by [`EpochManager::new`].
pub const DEFAULT_EPOCH_DURATION_MS: u64 = 10;

/// Cap on the number of epochs kept in memory.
///
/// NOTE(review): not referenced in this module — presumably consumed elsewhere; confirm.
pub const MAX_EPOCHS_IN_MEMORY: usize = 100;

/// Minimum number of entries per epoch.
///
/// NOTE(review): not referenced in this module — presumably consumed elsewhere; confirm.
pub const MIN_ENTRIES_PER_EPOCH: usize = 1_000;
76
77pub struct EpochManager {
83 current_epoch: AtomicU64,
85 epoch_start_time: AtomicU64,
87 epoch_duration_ns: u64,
89 active_readers: RwLock<BTreeMap<u64, u64>>,
91 min_safe_epoch: AtomicU64,
93}
94
95impl EpochManager {
96 pub fn new() -> Self {
97 Self::with_duration_ms(DEFAULT_EPOCH_DURATION_MS)
98 }
99
100 pub fn with_duration_ms(duration_ms: u64) -> Self {
101 let now = Self::current_time_ns();
102 Self {
103 current_epoch: AtomicU64::new(1),
104 epoch_start_time: AtomicU64::new(now),
105 epoch_duration_ns: duration_ms * 1_000_000,
106 active_readers: RwLock::new(BTreeMap::new()),
107 min_safe_epoch: AtomicU64::new(1),
108 }
109 }
110
111 #[inline]
113 pub fn current_epoch(&self) -> u64 {
114 self.current_epoch.load(Ordering::Acquire)
115 }
116
117 #[inline]
119 pub fn min_safe_epoch(&self) -> u64 {
120 self.min_safe_epoch.load(Ordering::Acquire)
121 }
122
123 pub fn should_advance(&self) -> bool {
125 let now = Self::current_time_ns();
126 let start = self.epoch_start_time.load(Ordering::Relaxed);
127 now.saturating_sub(start) >= self.epoch_duration_ns
128 }
129
130 pub fn advance_epoch(&self) -> u64 {
132 let new_epoch = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
133 self.epoch_start_time.store(Self::current_time_ns(), Ordering::Relaxed);
134 new_epoch
135 }
136
137 pub fn register_reader(&self, epoch: u64) {
139 let mut readers = self.active_readers.write();
140 *readers.entry(epoch).or_insert(0) += 1;
141 }
142
143 pub fn unregister_reader(&self, epoch: u64) {
145 let mut readers = self.active_readers.write();
146 if let Some(count) = readers.get_mut(&epoch) {
147 *count = count.saturating_sub(1);
148 if *count == 0 {
149 readers.remove(&epoch);
150 if let Some(&min_epoch) = readers.keys().next() {
152 self.min_safe_epoch.store(min_epoch, Ordering::Release);
153 } else {
154 self.min_safe_epoch.store(
155 self.current_epoch.load(Ordering::Relaxed),
156 Ordering::Release,
157 );
158 }
159 }
160 }
161 }
162
163 pub fn gc_eligible_epochs(&self) -> Vec<u64> {
165 let min_safe = self.min_safe_epoch.load(Ordering::Acquire);
166 let readers = self.active_readers.read();
167
168 readers
170 .keys()
171 .filter(|&&e| e < min_safe)
172 .copied()
173 .collect()
174 }
175
176 #[inline]
177 fn current_time_ns() -> u64 {
178 std::time::SystemTime::now()
179 .duration_since(std::time::UNIX_EPOCH)
180 .map(|d| d.as_nanos() as u64)
181 .unwrap_or(0)
182 }
183}
184
185impl Default for EpochManager {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
/// One committed version of a key's value.
#[derive(Debug, Clone)]
pub struct VersionEntry<V> {
    /// Stored value; `V::default()` for tombstones.
    pub value: V,
    /// Epoch the version was committed at.
    pub epoch: u64,
    /// Transaction that wrote this version.
    pub txn_id: u64,
    /// `true` when this version records a deletion.
    pub is_delete: bool,
}

impl<V> VersionEntry<V> {
    /// Builds a live (non-deleted) version holding `value`.
    pub fn new(value: V, epoch: u64, txn_id: u64) -> Self {
        let is_delete = false;
        VersionEntry {
            value,
            epoch,
            txn_id,
            is_delete,
        }
    }

    /// Builds a deletion marker whose payload is `V::default()`.
    pub fn tombstone(epoch: u64, txn_id: u64) -> Self
    where
        V: Default,
    {
        VersionEntry {
            is_delete: true,
            ..VersionEntry::new(V::default(), epoch, txn_id)
        }
    }
}
230
231pub struct EpochVersionChain<V> {
243 versions: RwLock<BTreeMap<u64, VersionEntry<V>>>,
246 latest_epoch: AtomicU64,
248}
249
250impl<V: Clone> EpochVersionChain<V> {
251 pub fn new() -> Self {
252 Self {
253 versions: RwLock::new(BTreeMap::new()),
254 latest_epoch: AtomicU64::new(0),
255 }
256 }
257
258 pub fn add_version(&self, epoch: u64, entry: VersionEntry<V>) {
260 let mut versions = self.versions.write();
261 versions.insert(epoch, entry);
262
263 let current = self.latest_epoch.load(Ordering::Relaxed);
265 if epoch > current {
266 self.latest_epoch.store(epoch, Ordering::Release);
267 }
268 }
269
270 pub fn read_at_epoch(&self, target_epoch: u64) -> Option<V> {
274 let latest = self.latest_epoch.load(Ordering::Acquire);
276 if target_epoch >= latest {
277 let versions = self.versions.read();
278 return versions.get(&latest).and_then(|v| {
279 if v.is_delete {
280 None
281 } else {
282 Some(v.value.clone())
283 }
284 });
285 }
286
287 let versions = self.versions.read();
289
290 versions
292 .range(..=target_epoch)
293 .next_back()
294 .and_then(|(_, v)| {
295 if v.is_delete {
296 None
297 } else {
298 Some(v.value.clone())
299 }
300 })
301 }
302
303 pub fn all_versions(&self) -> Vec<(u64, VersionEntry<V>)> {
305 self.versions
306 .read()
307 .iter()
308 .map(|(&e, v)| (e, v.clone()))
309 .collect()
310 }
311
312 pub fn gc_before_epoch(&self, epoch: u64) -> usize {
314 let mut versions = self.versions.write();
315 let old_len = versions.len();
316 versions.retain(|&e, _| e >= epoch);
317 old_len - versions.len()
318 }
319
320 pub fn version_count(&self) -> usize {
322 self.versions.read().len()
323 }
324
325 pub fn is_empty(&self) -> bool {
327 self.versions.read().is_empty()
328 }
329}
330
331impl<V: Clone> Default for EpochVersionChain<V> {
332 fn default() -> Self {
333 Self::new()
334 }
335}
336
/// Keys are owned byte strings.
pub type Key = Vec<u8>;
343
344pub struct EpochMvccStore<V> {
346 data: dashmap::DashMap<Key, EpochVersionChain<V>>,
348 epoch_manager: Arc<EpochManager>,
350 next_txn_id: AtomicU64,
352}
353
354impl<V: Clone + Send + Sync + 'static> EpochMvccStore<V> {
355 pub fn new() -> Self {
356 Self::with_epoch_manager(Arc::new(EpochManager::new()))
357 }
358
359 pub fn with_epoch_manager(epoch_manager: Arc<EpochManager>) -> Self {
360 Self {
361 data: dashmap::DashMap::new(),
362 epoch_manager,
363 next_txn_id: AtomicU64::new(1),
364 }
365 }
366
367 pub fn epoch_manager(&self) -> &Arc<EpochManager> {
369 &self.epoch_manager
370 }
371
372 pub fn begin_txn(&self) -> EpochTransaction<'_, V> {
374 let epoch = self.epoch_manager.current_epoch();
375 let txn_id = self.next_txn_id.fetch_add(1, Ordering::Relaxed);
376
377 self.epoch_manager.register_reader(epoch);
378
379 EpochTransaction {
380 txn_id,
381 read_epoch: epoch,
382 write_buffer: Vec::new(),
383 store: self,
384 }
385 }
386
387 fn write(&self, key: Key, value: V, epoch: u64, txn_id: u64) {
389 let chain = self.data.entry(key).or_insert_with(EpochVersionChain::new);
390 chain.add_version(epoch, VersionEntry::new(value, epoch, txn_id));
391 }
392
393 fn delete(&self, key: Key, epoch: u64, txn_id: u64)
395 where
396 V: Default,
397 {
398 let chain = self.data.entry(key).or_insert_with(EpochVersionChain::new);
399 chain.add_version(epoch, VersionEntry::tombstone(epoch, txn_id));
400 }
401
402 pub fn read_at_epoch(&self, key: &[u8], epoch: u64) -> Option<V> {
404 self.data.get(key).and_then(|chain| chain.read_at_epoch(epoch))
405 }
406
407 pub fn maybe_advance_epoch(&self) -> Option<u64> {
409 if self.epoch_manager.should_advance() {
410 Some(self.epoch_manager.advance_epoch())
411 } else {
412 None
413 }
414 }
415
416 pub fn gc(&self) -> GcStats {
418 let min_safe = self.epoch_manager.min_safe_epoch();
419 let mut stats = GcStats::default();
420
421 for mut entry in self.data.iter_mut() {
422 let removed = entry.value_mut().gc_before_epoch(min_safe);
423 stats.versions_removed += removed;
424 if entry.value().is_empty() {
425 stats.chains_emptied += 1;
426 }
427 }
428
429 self.data.retain(|_, chain| !chain.is_empty());
431
432 stats
433 }
434
435 pub fn stats(&self) -> StoreStats {
437 let mut total_versions = 0;
438 let mut max_versions_per_key = 0;
439
440 for entry in self.data.iter() {
441 let count = entry.value().version_count();
442 total_versions += count;
443 max_versions_per_key = max_versions_per_key.max(count);
444 }
445
446 StoreStats {
447 key_count: self.data.len(),
448 total_versions,
449 max_versions_per_key,
450 current_epoch: self.epoch_manager.current_epoch(),
451 min_safe_epoch: self.epoch_manager.min_safe_epoch(),
452 }
453 }
454}
455
456impl<V: Clone + Send + Sync + 'static> Default for EpochMvccStore<V> {
457 fn default() -> Self {
458 Self::new()
459 }
460}
461
462pub struct EpochTransaction<'a, V> {
468 txn_id: u64,
470 read_epoch: u64,
472 write_buffer: Vec<WriteOp<V>>,
474 store: &'a EpochMvccStore<V>,
476}
477
478enum WriteOp<V> {
480 Put(Key, V),
481 Delete(Key),
482}
483
484impl<'a, V: Clone + Send + Sync + Default + 'static> EpochTransaction<'a, V> {
485 pub fn txn_id(&self) -> u64 {
487 self.txn_id
488 }
489
490 pub fn read_epoch(&self) -> u64 {
492 self.read_epoch
493 }
494
495 pub fn get(&self, key: &[u8]) -> Option<V> {
497 for op in self.write_buffer.iter().rev() {
499 match op {
500 WriteOp::Put(k, v) if k == key => return Some(v.clone()),
501 WriteOp::Delete(k) if k == key => return None,
502 _ => {}
503 }
504 }
505
506 self.store.read_at_epoch(key, self.read_epoch)
508 }
509
510 pub fn put(&mut self, key: Key, value: V) {
512 self.write_buffer.push(WriteOp::Put(key, value));
513 }
514
515 pub fn delete(&mut self, key: Key) {
517 self.write_buffer.push(WriteOp::Delete(key));
518 }
519
520 pub fn commit(mut self) -> CommitResult {
522 let commit_epoch = self.store.epoch_manager.current_epoch();
523 let write_count = self.write_buffer.len();
524
525 for op in self.write_buffer.drain(..) {
526 match op {
527 WriteOp::Put(key, value) => {
528 self.store.write(key, value, commit_epoch, self.txn_id);
529 }
530 WriteOp::Delete(key) => {
531 self.store.delete(key, commit_epoch, self.txn_id);
532 }
533 }
534 }
535
536 self.store.epoch_manager.unregister_reader(self.read_epoch);
538
539 CommitResult {
540 txn_id: self.txn_id,
541 commit_epoch,
542 write_count,
543 }
544 }
545
546 pub fn abort(self) {
548 self.store.epoch_manager.unregister_reader(self.read_epoch);
550 }
551}
552
553impl<'a, V> Drop for EpochTransaction<'a, V> {
554 fn drop(&mut self) {
555 }
558}
559
/// Outcome of a successful [`EpochTransaction::commit`].
#[derive(Debug)]
pub struct CommitResult {
    /// Id of the committed transaction.
    pub txn_id: u64,
    /// Epoch all of the transaction's writes were stamped with.
    pub commit_epoch: u64,
    /// Number of buffered operations (puts and deletes) applied.
    pub write_count: usize,
}
567
/// Counters reported by one `EpochMvccStore::gc` pass.
#[derive(Debug, Default)]
pub struct GcStats {
    /// Versions reclaimed across all chains.
    pub versions_removed: usize,
    /// Chains left empty (and therefore dropped from the store).
    pub chains_emptied: usize,
}
574
/// Point-in-time summary produced by `EpochMvccStore::stats`.
#[derive(Debug)]
pub struct StoreStats {
    /// Number of keys with a version chain.
    pub key_count: usize,
    /// Sum of retained versions over all keys.
    pub total_versions: usize,
    /// Longest version chain.
    pub max_versions_per_key: usize,
    /// Epoch manager's current epoch.
    pub current_epoch: u64,
    /// Epoch manager's GC watermark.
    pub min_safe_epoch: u64,
}
584
585pub struct EpochSnapshot<'a, V> {
591 epoch: u64,
592 store: &'a EpochMvccStore<V>,
593}
594
595impl<'a, V: Clone + Send + Sync + 'static> EpochSnapshot<'a, V> {
596 pub fn new(store: &'a EpochMvccStore<V>) -> Self {
598 let epoch = store.epoch_manager.current_epoch();
599 store.epoch_manager.register_reader(epoch);
600 Self { epoch, store }
601 }
602
603 pub fn at_epoch(store: &'a EpochMvccStore<V>, epoch: u64) -> Self {
605 store.epoch_manager.register_reader(epoch);
606 Self { epoch, store }
607 }
608
609 pub fn epoch(&self) -> u64 {
611 self.epoch
612 }
613
614 pub fn get(&self, key: &[u8]) -> Option<V> {
616 self.store.read_at_epoch(key, self.epoch)
617 }
618
619 pub fn keys(&self) -> Vec<Key> {
621 self.store
622 .data
623 .iter()
624 .filter(|e| e.value().read_at_epoch(self.epoch).is_some())
625 .map(|e| e.key().clone())
626 .collect()
627 }
628}
629
630impl<'a, V> Drop for EpochSnapshot<'a, V> {
631 fn drop(&mut self) {
632 self.store.epoch_manager.unregister_reader(self.epoch);
633 }
634}
635
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_epoch_manager_basics() {
        let manager = EpochManager::with_duration_ms(1);
        assert_eq!(manager.current_epoch(), 1);

        // Let the 1 ms epoch expire.
        std::thread::sleep(std::time::Duration::from_millis(2));
        assert!(manager.should_advance());

        let new_epoch = manager.advance_epoch();
        assert_eq!(new_epoch, 2);
        assert_eq!(manager.current_epoch(), 2);
    }

    #[test]
    fn test_epoch_reader_tracking() {
        let manager = EpochManager::new();

        manager.register_reader(1);
        manager.register_reader(1);
        manager.register_reader(2);

        assert_eq!(manager.min_safe_epoch(), 1);

        // One reader still pins epoch 1.
        manager.unregister_reader(1);
        assert_eq!(manager.min_safe_epoch(), 1);

        // Epoch 1 is released; epoch 2 is now the oldest pinned epoch.
        manager.unregister_reader(1);
        assert_eq!(manager.min_safe_epoch(), 2);
    }

    #[test]
    fn test_version_chain_read_at_epoch() {
        let chain: EpochVersionChain<String> = EpochVersionChain::new();

        chain.add_version(1, VersionEntry::new("v1".to_string(), 1, 1));
        chain.add_version(3, VersionEntry::new("v3".to_string(), 3, 3));
        chain.add_version(5, VersionEntry::new("v5".to_string(), 5, 5));

        // Nothing was committed at or before epoch 0.
        assert_eq!(chain.read_at_epoch(0), None);
        assert_eq!(chain.read_at_epoch(1), Some("v1".to_string()));
        assert_eq!(chain.read_at_epoch(2), Some("v1".to_string()));
        assert_eq!(chain.read_at_epoch(3), Some("v3".to_string()));
        assert_eq!(chain.read_at_epoch(4), Some("v3".to_string()));
        assert_eq!(chain.read_at_epoch(5), Some("v5".to_string()));
        assert_eq!(chain.read_at_epoch(100), Some("v5".to_string()));
    }

    #[test]
    fn test_version_chain_delete() {
        let chain: EpochVersionChain<String> = EpochVersionChain::new();

        chain.add_version(1, VersionEntry::new("value".to_string(), 1, 1));
        chain.add_version(2, VersionEntry::tombstone(2, 2));
        chain.add_version(3, VersionEntry::new("resurrected".to_string(), 3, 3));

        assert_eq!(chain.read_at_epoch(1), Some("value".to_string()));
        // The tombstone hides the value...
        assert_eq!(chain.read_at_epoch(2), None);
        // ...until a later write brings the key back.
        assert_eq!(chain.read_at_epoch(3), Some("resurrected".to_string()));
    }

    #[test]
    fn test_version_chain_gc() {
        let chain: EpochVersionChain<i32> = EpochVersionChain::new();

        for i in 1..=10 {
            chain.add_version(i, VersionEntry::new(i as i32, i, i));
        }

        assert_eq!(chain.version_count(), 10);

        let removed = chain.gc_before_epoch(5);
        assert_eq!(removed, 4);
        assert_eq!(chain.version_count(), 6);

        // History below the GC horizon is gone...
        assert_eq!(chain.read_at_epoch(4), None);
        // ...but the horizon itself is still readable.
        assert_eq!(chain.read_at_epoch(5), Some(5));
    }

    #[test]
    fn test_mvcc_store_basic() {
        let store: EpochMvccStore<String> = EpochMvccStore::new();

        let mut txn = store.begin_txn();
        txn.put(b"key1".to_vec(), "value1".to_string());
        txn.put(b"key2".to_vec(), "value2".to_string());
        let result = txn.commit();

        assert_eq!(result.write_count, 2);

        // Same epoch: the committed writes are visible to a new transaction.
        let txn2 = store.begin_txn();
        assert_eq!(txn2.get(b"key1"), Some("value1".to_string()));
        assert_eq!(txn2.get(b"key2"), Some("value2".to_string()));
        txn2.abort();
    }

    #[test]
    fn test_mvcc_store_snapshot_isolation() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        let mut txn1 = store.begin_txn();
        txn1.put(b"x".to_vec(), 1);
        txn1.commit();

        // Seal epoch 1 so the snapshot below is stable.
        store.epoch_manager().advance_epoch();

        let snapshot = EpochSnapshot::new(&store);
        assert_eq!(snapshot.get(b"x"), Some(1));

        // Write the new value in a later epoch than the snapshot's.
        store.epoch_manager().advance_epoch();
        let mut txn2 = store.begin_txn();
        txn2.put(b"x".to_vec(), 2);
        txn2.commit();

        // The snapshot still sees its own epoch's value.
        assert_eq!(snapshot.get(b"x"), Some(1));

        let txn3 = store.begin_txn();
        assert_eq!(txn3.get(b"x"), Some(2));
        txn3.abort();
    }

    #[test]
    fn test_mvcc_store_delete() {
        let store: EpochMvccStore<String> = EpochMvccStore::new();

        let mut txn1 = store.begin_txn();
        txn1.put(b"key".to_vec(), "value".to_string());
        txn1.commit();

        store.epoch_manager().advance_epoch();

        // Pin the epoch that still contains the value.
        let snap = EpochSnapshot::new(&store);

        store.epoch_manager().advance_epoch();

        let mut txn2 = store.begin_txn();
        txn2.delete(b"key".to_vec());
        txn2.commit();

        // The older snapshot is unaffected by the delete.
        assert_eq!(snap.get(b"key"), Some("value".to_string()));

        let txn3 = store.begin_txn();
        assert_eq!(txn3.get(b"key"), None);
        txn3.abort();
    }

    #[test]
    fn test_mvcc_store_write_buffer() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        let mut txn = store.begin_txn();

        txn.put(b"a".to_vec(), 1);
        txn.put(b"b".to_vec(), 2);

        // Uncommitted writes are visible to their own transaction.
        assert_eq!(txn.get(b"a"), Some(1));
        assert_eq!(txn.get(b"b"), Some(2));

        // The latest buffered write wins.
        txn.put(b"a".to_vec(), 10);
        assert_eq!(txn.get(b"a"), Some(10));

        // A buffered delete hides the key.
        txn.delete(b"b".to_vec());
        assert_eq!(txn.get(b"b"), None);

        txn.commit();
    }

    #[test]
    fn test_mvcc_store_gc() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        // Versions 0..5 of one key, one per epoch (epochs 1..=5).
        for i in 0..5 {
            let mut txn = store.begin_txn();
            txn.put(b"key".to_vec(), i);
            txn.commit();
            store.epoch_manager().advance_epoch();
        }

        let stats = store.stats();
        assert_eq!(stats.total_versions, 5);

        let gc_stats = store.gc();

        // The watermark is epoch 5: the four older versions are reclaimed and the
        // epoch-5 version (the one visible at the watermark) survives.
        assert_eq!(gc_stats.versions_removed, 4);
        assert_eq!(gc_stats.chains_emptied, 0);
        assert_eq!(store.stats().total_versions, 1);

        let txn = store.begin_txn();
        assert_eq!(txn.get(b"key"), Some(4));
        txn.abort();
    }

    #[test]
    fn test_epoch_snapshot_keys() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        let mut txn = store.begin_txn();
        txn.put(b"a".to_vec(), 1);
        txn.put(b"b".to_vec(), 2);
        txn.put(b"c".to_vec(), 3);
        txn.commit();

        let snap = EpochSnapshot::new(&store);
        let keys = snap.keys();

        assert_eq!(keys.len(), 3);
        assert!(keys.contains(&b"a".to_vec()));
        assert!(keys.contains(&b"b".to_vec()));
        assert!(keys.contains(&b"c".to_vec()));
    }
}