1use std::collections::BTreeMap;
60use std::sync::atomic::{AtomicU64, Ordering};
61use std::sync::Arc;
62
63use parking_lot::RwLock;
64
/// Default length of one epoch, in milliseconds, used by [`EpochManager::new`].
pub const DEFAULT_EPOCH_DURATION_MS: u64 = 10;

/// Upper bound on the number of epochs intended to be kept in memory.
///
/// NOTE(review): not referenced anywhere in this file; presumably consumed by callers or
/// reserved for a retention policy — confirm before relying on it.
pub const MAX_EPOCHS_IN_MEMORY: usize = 100;

/// Minimum number of entries intended per epoch.
///
/// NOTE(review): not referenced anywhere in this file; presumably a tuning hint for callers —
/// confirm its intended use.
pub const MIN_ENTRIES_PER_EPOCH: usize = 1_000;
73
74pub struct EpochManager {
80 current_epoch: AtomicU64,
82 epoch_start_time: AtomicU64,
84 epoch_duration_ns: u64,
86 active_readers: RwLock<BTreeMap<u64, u64>>,
88 min_safe_epoch: AtomicU64,
90}
91
92impl EpochManager {
93 pub fn new() -> Self {
94 Self::with_duration_ms(DEFAULT_EPOCH_DURATION_MS)
95 }
96
97 pub fn with_duration_ms(duration_ms: u64) -> Self {
98 let now = Self::current_time_ns();
99 Self {
100 current_epoch: AtomicU64::new(1),
101 epoch_start_time: AtomicU64::new(now),
102 epoch_duration_ns: duration_ms * 1_000_000,
103 active_readers: RwLock::new(BTreeMap::new()),
104 min_safe_epoch: AtomicU64::new(1),
105 }
106 }
107
108 #[inline]
110 pub fn current_epoch(&self) -> u64 {
111 self.current_epoch.load(Ordering::Acquire)
112 }
113
114 #[inline]
116 pub fn min_safe_epoch(&self) -> u64 {
117 self.min_safe_epoch.load(Ordering::Acquire)
118 }
119
120 pub fn should_advance(&self) -> bool {
122 let now = Self::current_time_ns();
123 let start = self.epoch_start_time.load(Ordering::Relaxed);
124 now.saturating_sub(start) >= self.epoch_duration_ns
125 }
126
127 pub fn advance_epoch(&self) -> u64 {
129 let new_epoch = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
130 self.epoch_start_time.store(Self::current_time_ns(), Ordering::Relaxed);
131 new_epoch
132 }
133
134 pub fn register_reader(&self, epoch: u64) {
136 let mut readers = self.active_readers.write();
137 *readers.entry(epoch).or_insert(0) += 1;
138 }
139
140 pub fn unregister_reader(&self, epoch: u64) {
142 let mut readers = self.active_readers.write();
143 if let Some(count) = readers.get_mut(&epoch) {
144 *count = count.saturating_sub(1);
145 if *count == 0 {
146 readers.remove(&epoch);
147 if let Some(&min_epoch) = readers.keys().next() {
149 self.min_safe_epoch.store(min_epoch, Ordering::Release);
150 } else {
151 self.min_safe_epoch.store(
152 self.current_epoch.load(Ordering::Relaxed),
153 Ordering::Release,
154 );
155 }
156 }
157 }
158 }
159
160 pub fn gc_eligible_epochs(&self) -> Vec<u64> {
162 let min_safe = self.min_safe_epoch.load(Ordering::Acquire);
163 let readers = self.active_readers.read();
164
165 readers
167 .keys()
168 .filter(|&&e| e < min_safe)
169 .copied()
170 .collect()
171 }
172
173 #[inline]
174 fn current_time_ns() -> u64 {
175 std::time::SystemTime::now()
176 .duration_since(std::time::UNIX_EPOCH)
177 .map(|d| d.as_nanos() as u64)
178 .unwrap_or(0)
179 }
180}
181
182impl Default for EpochManager {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
/// One version of a value, stamped with the epoch and transaction that wrote it.
///
/// A tombstone (`is_delete == true`) records a deletion. Its `value` is a `V::default()`
/// placeholder and carries no meaning.
#[derive(Debug, Clone)]
pub struct VersionEntry<V> {
    /// The stored value (a `V::default()` placeholder for tombstones).
    pub value: V,
    /// Epoch at which this version was written.
    pub epoch: u64,
    /// Identifier of the writing transaction.
    pub txn_id: u64,
    /// `true` when this version marks the key as deleted.
    pub is_delete: bool,
}

impl<V> VersionEntry<V> {
    /// Builds a live (non-deleted) version holding `value`.
    pub fn new(value: V, epoch: u64, txn_id: u64) -> Self {
        Self::build(value, epoch, txn_id, false)
    }

    /// Builds a deletion marker whose payload is `V::default()`.
    pub fn tombstone(epoch: u64, txn_id: u64) -> Self
    where
        V: Default,
    {
        Self::build(V::default(), epoch, txn_id, true)
    }

    /// Shared constructor for live entries and tombstones.
    fn build(value: V, epoch: u64, txn_id: u64, is_delete: bool) -> Self {
        Self {
            value,
            epoch,
            txn_id,
            is_delete,
        }
    }
}
227
228pub struct EpochVersionChain<V> {
240 versions: RwLock<BTreeMap<u64, VersionEntry<V>>>,
243 latest_epoch: AtomicU64,
245}
246
247impl<V: Clone> EpochVersionChain<V> {
248 pub fn new() -> Self {
249 Self {
250 versions: RwLock::new(BTreeMap::new()),
251 latest_epoch: AtomicU64::new(0),
252 }
253 }
254
255 pub fn add_version(&self, epoch: u64, entry: VersionEntry<V>) {
257 let mut versions = self.versions.write();
258 versions.insert(epoch, entry);
259
260 let current = self.latest_epoch.load(Ordering::Relaxed);
262 if epoch > current {
263 self.latest_epoch.store(epoch, Ordering::Release);
264 }
265 }
266
267 pub fn read_at_epoch(&self, target_epoch: u64) -> Option<V> {
271 let latest = self.latest_epoch.load(Ordering::Acquire);
273 if target_epoch >= latest {
274 let versions = self.versions.read();
275 return versions.get(&latest).and_then(|v| {
276 if v.is_delete {
277 None
278 } else {
279 Some(v.value.clone())
280 }
281 });
282 }
283
284 let versions = self.versions.read();
286
287 versions
289 .range(..=target_epoch)
290 .next_back()
291 .and_then(|(_, v)| {
292 if v.is_delete {
293 None
294 } else {
295 Some(v.value.clone())
296 }
297 })
298 }
299
300 pub fn all_versions(&self) -> Vec<(u64, VersionEntry<V>)> {
302 self.versions
303 .read()
304 .iter()
305 .map(|(&e, v)| (e, v.clone()))
306 .collect()
307 }
308
309 pub fn gc_before_epoch(&self, epoch: u64) -> usize {
311 let mut versions = self.versions.write();
312 let old_len = versions.len();
313 versions.retain(|&e, _| e >= epoch);
314 old_len - versions.len()
315 }
316
317 pub fn version_count(&self) -> usize {
319 self.versions.read().len()
320 }
321
322 pub fn is_empty(&self) -> bool {
324 self.versions.read().is_empty()
325 }
326}
327
328impl<V: Clone> Default for EpochVersionChain<V> {
329 fn default() -> Self {
330 Self::new()
331 }
332}
333
/// Store keys are owned byte strings.
pub type Key = Vec<u8>;
340
341pub struct EpochMvccStore<V> {
343 data: dashmap::DashMap<Key, EpochVersionChain<V>>,
345 epoch_manager: Arc<EpochManager>,
347 next_txn_id: AtomicU64,
349}
350
351impl<V: Clone + Send + Sync + 'static> EpochMvccStore<V> {
352 pub fn new() -> Self {
353 Self::with_epoch_manager(Arc::new(EpochManager::new()))
354 }
355
356 pub fn with_epoch_manager(epoch_manager: Arc<EpochManager>) -> Self {
357 Self {
358 data: dashmap::DashMap::new(),
359 epoch_manager,
360 next_txn_id: AtomicU64::new(1),
361 }
362 }
363
364 pub fn epoch_manager(&self) -> &Arc<EpochManager> {
366 &self.epoch_manager
367 }
368
369 pub fn begin_txn(&self) -> EpochTransaction<'_, V> {
371 let epoch = self.epoch_manager.current_epoch();
372 let txn_id = self.next_txn_id.fetch_add(1, Ordering::Relaxed);
373
374 self.epoch_manager.register_reader(epoch);
375
376 EpochTransaction {
377 txn_id,
378 read_epoch: epoch,
379 write_buffer: Vec::new(),
380 store: self,
381 }
382 }
383
384 fn write(&self, key: Key, value: V, epoch: u64, txn_id: u64) {
386 let chain = self.data.entry(key).or_insert_with(EpochVersionChain::new);
387 chain.add_version(epoch, VersionEntry::new(value, epoch, txn_id));
388 }
389
390 fn delete(&self, key: Key, epoch: u64, txn_id: u64)
392 where
393 V: Default,
394 {
395 let chain = self.data.entry(key).or_insert_with(EpochVersionChain::new);
396 chain.add_version(epoch, VersionEntry::tombstone(epoch, txn_id));
397 }
398
399 pub fn read_at_epoch(&self, key: &[u8], epoch: u64) -> Option<V> {
401 self.data.get(key).and_then(|chain| chain.read_at_epoch(epoch))
402 }
403
404 pub fn maybe_advance_epoch(&self) -> Option<u64> {
406 if self.epoch_manager.should_advance() {
407 Some(self.epoch_manager.advance_epoch())
408 } else {
409 None
410 }
411 }
412
413 pub fn gc(&self) -> GcStats {
415 let min_safe = self.epoch_manager.min_safe_epoch();
416 let mut stats = GcStats::default();
417
418 for mut entry in self.data.iter_mut() {
419 let removed = entry.value_mut().gc_before_epoch(min_safe);
420 stats.versions_removed += removed;
421 if entry.value().is_empty() {
422 stats.chains_emptied += 1;
423 }
424 }
425
426 self.data.retain(|_, chain| !chain.is_empty());
428
429 stats
430 }
431
432 pub fn stats(&self) -> StoreStats {
434 let mut total_versions = 0;
435 let mut max_versions_per_key = 0;
436
437 for entry in self.data.iter() {
438 let count = entry.value().version_count();
439 total_versions += count;
440 max_versions_per_key = max_versions_per_key.max(count);
441 }
442
443 StoreStats {
444 key_count: self.data.len(),
445 total_versions,
446 max_versions_per_key,
447 current_epoch: self.epoch_manager.current_epoch(),
448 min_safe_epoch: self.epoch_manager.min_safe_epoch(),
449 }
450 }
451}
452
453impl<V: Clone + Send + Sync + 'static> Default for EpochMvccStore<V> {
454 fn default() -> Self {
455 Self::new()
456 }
457}
458
459pub struct EpochTransaction<'a, V> {
465 txn_id: u64,
467 read_epoch: u64,
469 write_buffer: Vec<WriteOp<V>>,
471 store: &'a EpochMvccStore<V>,
473}
474
475enum WriteOp<V> {
477 Put(Key, V),
478 Delete(Key),
479}
480
481impl<'a, V: Clone + Send + Sync + Default + 'static> EpochTransaction<'a, V> {
482 pub fn txn_id(&self) -> u64 {
484 self.txn_id
485 }
486
487 pub fn read_epoch(&self) -> u64 {
489 self.read_epoch
490 }
491
492 pub fn get(&self, key: &[u8]) -> Option<V> {
494 for op in self.write_buffer.iter().rev() {
496 match op {
497 WriteOp::Put(k, v) if k == key => return Some(v.clone()),
498 WriteOp::Delete(k) if k == key => return None,
499 _ => {}
500 }
501 }
502
503 self.store.read_at_epoch(key, self.read_epoch)
505 }
506
507 pub fn put(&mut self, key: Key, value: V) {
509 self.write_buffer.push(WriteOp::Put(key, value));
510 }
511
512 pub fn delete(&mut self, key: Key) {
514 self.write_buffer.push(WriteOp::Delete(key));
515 }
516
517 pub fn commit(mut self) -> CommitResult {
519 let commit_epoch = self.store.epoch_manager.current_epoch();
520 let write_count = self.write_buffer.len();
521
522 for op in self.write_buffer.drain(..) {
523 match op {
524 WriteOp::Put(key, value) => {
525 self.store.write(key, value, commit_epoch, self.txn_id);
526 }
527 WriteOp::Delete(key) => {
528 self.store.delete(key, commit_epoch, self.txn_id);
529 }
530 }
531 }
532
533 self.store.epoch_manager.unregister_reader(self.read_epoch);
535
536 CommitResult {
537 txn_id: self.txn_id,
538 commit_epoch,
539 write_count,
540 }
541 }
542
543 pub fn abort(self) {
545 self.store.epoch_manager.unregister_reader(self.read_epoch);
547 }
548}
549
550impl<'a, V> Drop for EpochTransaction<'a, V> {
551 fn drop(&mut self) {
552 }
555}
556
/// Summary returned by a successful transaction commit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitResult {
    /// Identifier of the committed transaction.
    pub txn_id: u64,
    /// Epoch every write of the transaction was stamped with.
    pub commit_epoch: u64,
    /// Number of buffered operations applied; repeated operations on one key each count.
    pub write_count: usize,
}
564
/// Counters reported by one garbage-collection pass.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct GcStats {
    /// Total versions discarded across all keys.
    pub versions_removed: usize,
    /// Keys whose version chains ended up empty and were removed.
    pub chains_emptied: usize,
}
571
/// Point-in-time summary of a store's contents and epoch state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StoreStats {
    /// Number of keys with a version chain.
    pub key_count: usize,
    /// Total stored versions across all keys, tombstones included.
    pub total_versions: usize,
    /// Length of the longest version chain.
    pub max_versions_per_key: usize,
    /// Epoch at the time the stats were taken.
    pub current_epoch: u64,
    /// GC watermark at the time the stats were taken.
    pub min_safe_epoch: u64,
}
581
582pub struct EpochSnapshot<'a, V> {
588 epoch: u64,
589 store: &'a EpochMvccStore<V>,
590}
591
592impl<'a, V: Clone + Send + Sync + 'static> EpochSnapshot<'a, V> {
593 pub fn new(store: &'a EpochMvccStore<V>) -> Self {
595 let epoch = store.epoch_manager.current_epoch();
596 store.epoch_manager.register_reader(epoch);
597 Self { epoch, store }
598 }
599
600 pub fn at_epoch(store: &'a EpochMvccStore<V>, epoch: u64) -> Self {
602 store.epoch_manager.register_reader(epoch);
603 Self { epoch, store }
604 }
605
606 pub fn epoch(&self) -> u64 {
608 self.epoch
609 }
610
611 pub fn get(&self, key: &[u8]) -> Option<V> {
613 self.store.read_at_epoch(key, self.epoch)
614 }
615
616 pub fn keys(&self) -> Vec<Key> {
618 self.store
619 .data
620 .iter()
621 .filter(|e| e.value().read_at_epoch(self.epoch).is_some())
622 .map(|e| e.key().clone())
623 .collect()
624 }
625}
626
627impl<'a, V> Drop for EpochSnapshot<'a, V> {
628 fn drop(&mut self) {
629 self.store.epoch_manager.unregister_reader(self.epoch);
630 }
631}
632
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_epoch_manager_basics() {
        let manager = EpochManager::with_duration_ms(1);
        assert_eq!(manager.current_epoch(), 1);

        // Outlast the 1 ms epoch so the manager reports it is due to advance.
        std::thread::sleep(std::time::Duration::from_millis(2));
        assert!(manager.should_advance());

        let new_epoch = manager.advance_epoch();
        assert_eq!(new_epoch, 2);
        assert_eq!(manager.current_epoch(), 2);
    }

    #[test]
    fn test_epoch_reader_tracking() {
        let manager = EpochManager::new();

        manager.register_reader(1);
        manager.register_reader(1);
        manager.register_reader(2);
        assert_eq!(manager.min_safe_epoch(), 1);

        // Epoch 1 is still pinned by its second reader.
        manager.unregister_reader(1);
        assert_eq!(manager.min_safe_epoch(), 1);

        // Releasing the last epoch-1 reader moves the watermark to the oldest remaining pin.
        manager.unregister_reader(1);
        assert_eq!(manager.min_safe_epoch(), 2);

        // With no readers left, the watermark catches up with the current epoch.
        manager.unregister_reader(2);
        assert_eq!(manager.min_safe_epoch(), manager.current_epoch());
    }

    #[test]
    fn test_version_chain_read_at_epoch() {
        let chain: EpochVersionChain<String> = EpochVersionChain::new();

        chain.add_version(1, VersionEntry::new("v1".to_string(), 1, 1));
        chain.add_version(3, VersionEntry::new("v3".to_string(), 3, 3));
        chain.add_version(5, VersionEntry::new("v5".to_string(), 5, 5));

        // Before the first write nothing is visible.
        assert_eq!(chain.read_at_epoch(0), None);
        assert_eq!(chain.read_at_epoch(1), Some("v1".to_string()));
        assert_eq!(chain.read_at_epoch(2), Some("v1".to_string()));
        assert_eq!(chain.read_at_epoch(3), Some("v3".to_string()));
        assert_eq!(chain.read_at_epoch(4), Some("v3".to_string()));
        assert_eq!(chain.read_at_epoch(5), Some("v5".to_string()));
        assert_eq!(chain.read_at_epoch(100), Some("v5".to_string()));
    }

    #[test]
    fn test_version_chain_delete() {
        let chain: EpochVersionChain<String> = EpochVersionChain::new();

        chain.add_version(1, VersionEntry::new("value".to_string(), 1, 1));
        chain.add_version(2, VersionEntry::tombstone(2, 2));
        chain.add_version(3, VersionEntry::new("resurrected".to_string(), 3, 3));

        assert_eq!(chain.read_at_epoch(1), Some("value".to_string()));
        // The tombstone hides the key at epoch 2.
        assert_eq!(chain.read_at_epoch(2), None);
        assert_eq!(chain.read_at_epoch(3), Some("resurrected".to_string()));
    }

    #[test]
    fn test_version_chain_gc() {
        let chain: EpochVersionChain<i32> = EpochVersionChain::new();

        for i in 1..=10 {
            chain.add_version(i, VersionEntry::new(i as i32, i, i));
        }
        assert_eq!(chain.version_count(), 10);

        let removed = chain.gc_before_epoch(5);
        assert_eq!(removed, 4);
        assert_eq!(chain.version_count(), 6);

        // Versions older than the GC epoch are gone; the one at the GC epoch survives.
        assert_eq!(chain.read_at_epoch(4), None);
        assert_eq!(chain.read_at_epoch(5), Some(5));
        assert_eq!(chain.read_at_epoch(10), Some(10));
    }

    #[test]
    fn test_mvcc_store_basic() {
        let store: EpochMvccStore<String> = EpochMvccStore::new();

        let mut txn = store.begin_txn();
        txn.put(b"key1".to_vec(), "value1".to_string());
        txn.put(b"key2".to_vec(), "value2".to_string());
        let result = txn.commit();
        assert_eq!(result.write_count, 2);

        // A later transaction sees the committed writes.
        let txn2 = store.begin_txn();
        assert_eq!(txn2.get(b"key1"), Some("value1".to_string()));
        assert_eq!(txn2.get(b"key2"), Some("value2".to_string()));
        txn2.abort();
    }

    #[test]
    fn test_mvcc_store_snapshot_isolation() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        // Epoch 1: x = 1.
        let mut txn1 = store.begin_txn();
        txn1.put(b"x".to_vec(), 1);
        txn1.commit();

        store.epoch_manager().advance_epoch();

        // Snapshot at epoch 2 sees x = 1.
        let snapshot = EpochSnapshot::new(&store);
        assert_eq!(snapshot.get(b"x"), Some(1));

        // Epoch 3: x = 2.
        store.epoch_manager().advance_epoch();
        let mut txn2 = store.begin_txn();
        txn2.put(b"x".to_vec(), 2);
        txn2.commit();

        // The snapshot is unaffected by the later write.
        assert_eq!(snapshot.get(b"x"), Some(1));

        // A fresh transaction sees the new value.
        let txn3 = store.begin_txn();
        assert_eq!(txn3.get(b"x"), Some(2));
        txn3.abort();
    }

    #[test]
    fn test_mvcc_store_delete() {
        let store: EpochMvccStore<String> = EpochMvccStore::new();

        // Epoch 1: insert.
        let mut txn1 = store.begin_txn();
        txn1.put(b"key".to_vec(), "value".to_string());
        txn1.commit();

        store.epoch_manager().advance_epoch();

        // Snapshot at epoch 2, before the delete.
        let snap = EpochSnapshot::new(&store);

        store.epoch_manager().advance_epoch();

        // Epoch 3: delete.
        let mut txn2 = store.begin_txn();
        txn2.delete(b"key".to_vec());
        txn2.commit();

        // The snapshot still sees the value.
        assert_eq!(snap.get(b"key"), Some("value".to_string()));

        // A fresh transaction sees the deletion.
        let txn3 = store.begin_txn();
        assert_eq!(txn3.get(b"key"), None);
        txn3.abort();
    }

    #[test]
    fn test_mvcc_store_write_buffer() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        let mut txn = store.begin_txn();

        // Uncommitted writes are visible to the writing transaction.
        txn.put(b"a".to_vec(), 1);
        txn.put(b"b".to_vec(), 2);
        assert_eq!(txn.get(b"a"), Some(1));
        assert_eq!(txn.get(b"b"), Some(2));

        // The most recent buffered write wins.
        txn.put(b"a".to_vec(), 10);
        assert_eq!(txn.get(b"a"), Some(10));

        // A buffered delete hides the key.
        txn.delete(b"b".to_vec());
        assert_eq!(txn.get(b"b"), None);

        txn.commit();
    }

    #[test]
    fn test_mvcc_store_gc() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        // One version per epoch 1..=5; the last commit leaves the watermark at 5.
        for i in 0..5 {
            let mut txn = store.begin_txn();
            txn.put(b"key".to_vec(), i);
            txn.commit();
            store.epoch_manager().advance_epoch();
        }

        let stats = store.stats();
        assert_eq!(stats.total_versions, 5);
        assert_eq!(stats.current_epoch, 6);
        assert_eq!(stats.min_safe_epoch, 5);

        let gc_stats = store.gc();

        // Epochs 1..=4 are reclaimed; epoch 5 still holds the live value.
        assert_eq!(gc_stats.versions_removed, 4);
        assert_eq!(gc_stats.chains_emptied, 0);
        assert_eq!(store.stats().total_versions, 1);
        let now = store.epoch_manager().current_epoch();
        assert_eq!(store.read_at_epoch(b"key", now), Some(4));
    }

    #[test]
    fn test_epoch_snapshot_keys() {
        let store: EpochMvccStore<i32> = EpochMvccStore::new();

        let mut txn = store.begin_txn();
        txn.put(b"a".to_vec(), 1);
        txn.put(b"b".to_vec(), 2);
        txn.put(b"c".to_vec(), 3);
        txn.commit();

        let snap = EpochSnapshot::new(&store);
        let keys = snap.keys();

        assert_eq!(keys.len(), 3);
        assert!(keys.contains(&b"a".to_vec()));
        assert!(keys.contains(&b"b".to_vec()));
        assert!(keys.contains(&b"c".to_vec()));
    }
}