// atomr_core/util/snapshot.rs
//! `Snapshot<T>` — read-mostly immutable-snapshot container.
//!
//! Phase 13.B/C of `docs/full-port-plan.md`. Akka.NET's hot-path
//! shared state (cluster gossip, sharding allocation tables) is
//! read by every dispatcher pump but updated rarely; we mirror the
//! "swap an Arc snapshot" idiom from `arc-swap` so readers never
//! block writers and vice versa.
//!
//! Implementation: `parking_lot::RwLock<Arc<T>>`. Reads acquire the
//! read lock briefly to clone the `Arc`, then drop it — the actual
//! data is read through the `Arc` for as long as the borrower needs
//! it. Writes swap the inner `Arc` under the write lock; existing
//! readers continue to see the old snapshot until they release it.
//!
//! Dep-free; we don't pull in `arc-swap` for one type.
17use std::sync::Arc;
18
19use parking_lot::RwLock;
20
/// Lock-light snapshot container.
///
/// Readers obtain an `Arc<T>` handle via [`Snapshot::load`]; writers
/// publish a replacement via [`Snapshot::store`] or [`Snapshot::rcu`].
/// A reader's handle stays valid (pointing at the old data) across any
/// number of subsequent writes.
pub struct Snapshot<T> {
    // The lock guards only the pointer swap; the pointed-to `T` is
    // never mutated after publication, so readers need no lock while
    // using their `Arc`.
    inner: RwLock<Arc<T>>,
}
25
26impl<T> Snapshot<T> {
27    pub fn new(value: T) -> Self {
28        Self { inner: RwLock::new(Arc::new(value)) }
29    }
30
31    /// Cheap clone of the current snapshot. Holds the read lock only
32    /// long enough to clone the `Arc`.
33    pub fn load(&self) -> Arc<T> {
34        self.inner.read().clone()
35    }
36
37    /// Replace the snapshot wholesale. Existing readers keep their
38    /// old `Arc` until they drop it.
39    pub fn store(&self, value: T) {
40        *self.inner.write() = Arc::new(value);
41    }
42
43    /// Compute a new value from the current and store it. Equivalent
44    /// to `store(f(load()))` but holds the write lock for the whole
45    /// duration so readers see a consistent transition.
46    pub fn rcu<F>(&self, f: F)
47    where
48        F: FnOnce(&T) -> T,
49    {
50        let mut g = self.inner.write();
51        let next = f(&g);
52        *g = Arc::new(next);
53    }
54}
55
56impl<T: Default> Default for Snapshot<T> {
57    fn default() -> Self {
58        Self::new(T::default())
59    }
60}
61
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc as StdArc;

    #[test]
    fn load_and_store_round_trip() {
        let snap = Snapshot::new(vec![1, 2, 3]);
        let before = snap.load();
        assert_eq!(*before, vec![1, 2, 3]);

        snap.store(vec![10, 20]);
        assert_eq!(*snap.load(), vec![10, 20]);
        // The handle taken before the store still sees the old data.
        assert_eq!(*before, vec![1, 2, 3]);
    }

    #[test]
    fn rcu_mutates_atomically() {
        let counter = Snapshot::new(0u32);
        (0..10).for_each(|_| counter.rcu(|cur| cur + 1));
        assert_eq!(*counter.load(), 10);
    }

    #[test]
    fn many_readers_no_blocking() {
        let shared = StdArc::new(Snapshot::new(0u64));
        let loads = StdArc::new(AtomicU32::new(0));

        let workers: Vec<_> = (0..8)
            .map(|_| {
                let shared = StdArc::clone(&shared);
                let loads = StdArc::clone(&loads);
                std::thread::spawn(move || {
                    for _ in 0..1000 {
                        let _ = shared.load();
                        loads.fetch_add(1, Ordering::Relaxed);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
        assert_eq!(loads.load(Ordering::Relaxed), 8000);
    }

    #[test]
    fn default_constructs_via_t_default() {
        let empty: Snapshot<Vec<u32>> = Snapshot::default();
        assert!(empty.load().is_empty());
    }
}
115}