Skip to main content

typhoon/cache/
value.rs

1#[cfg(test)]
2#[path = "../../tests/cache/value.rs"]
3mod tests;
4
5use std::sync::Arc;
6
7use cfg_if::cfg_if;
8
9cfg_if! {
10    if #[cfg(any(feature = "client", all(test, feature = "server")))] {
11        use std::cell::UnsafeCell;
12        use std::marker::PhantomData;
13        use std::sync::Weak;
14        use arc_swap::ArcSwap;
15        use crate::cache::common::CacheError;
16    }
17}
18
/// A mutable value shared between sibling handles, with lock-free reads.
///
/// Every sibling holds the same [`ArcSwap`]; [`SharedValue::set`] atomically publishes a new
/// value, which the other siblings pick up on their next access. Each handle tracks two pointers:
///
/// * `shared` — the last pointer this handle observed in the `ArcSwap`. Staleness is detected by
///   comparing it (by address) with the currently published pointer, so the check is independent
///   of any local edits made through [`SharedValue::get_mut`].
/// * `local` — the working copy handed out to callers. It may diverge from `shared` after a
///   `get_mut` call and is reset the next time a newer published value is observed.
///
/// `!Sync` by design: each instance must be driven from exactly one task at a time.
#[cfg(any(feature = "client", all(test, feature = "server")))]
pub(crate) struct SharedValue<T: Clone + Send + Sync> {
    /// Publication point shared by all siblings (caches hold it weakly).
    state: Arc<ArcSwap<T>>,
    /// Last pointer observed in `state`; used for staleness detection only.
    shared: Arc<T>,
    /// Working copy; may differ from `shared` after a `get_mut` call.
    local: Arc<T>,
    /// `UnsafeCell` is `!Sync`, so this marker opts the whole struct out of `Sync`.
    _not_sync: PhantomData<UnsafeCell<()>>,
}

#[cfg(any(feature = "client", all(test, feature = "server")))]
impl<T: Clone + Send + Sync> SharedValue<T> {
    /// Wrap `value` in a brand-new shared state and return the first handle to it.
    #[inline]
    pub(crate) fn new(value: T) -> Self {
        let initial = Arc::new(value);
        let state = Arc::new(ArcSwap::from(Arc::clone(&initial)));
        Self {
            state,
            shared: Arc::clone(&initial),
            local: initial,
            _not_sync: PhantomData,
        }
    }

    /// Borrow the current value.
    ///
    /// The local copy is first replaced by the published value if a newer one has been
    /// published since this handle last looked.
    #[inline]
    pub(crate) fn get(&mut self) -> &T {
        self.refresh();
        &*self.local
    }

    /// Mutably borrow this handle's local copy, after first syncing with any newer published
    /// value.
    ///
    /// Changes made through the returned reference stay local to this handle; they are not
    /// published to siblings.
    #[inline]
    pub(crate) fn get_mut(&mut self) -> &mut T {
        self.refresh();
        let working_copy = &mut self.local;
        Arc::make_mut(working_copy)
    }

    /// Atomically publish `value` to every sibling and adopt it as this handle's local copy.
    #[inline]
    pub(crate) fn set(&mut self, value: T) {
        let published = Arc::new(value);
        // Record the new pointer locally before storing it, so this handle never considers
        // its own publication "stale".
        self.shared = Arc::clone(&published);
        self.local = Arc::clone(&published);
        self.state.store(published);
    }

    /// Create another `SharedValue` attached to the same shared state.
    ///
    /// The sibling starts from the currently *published* value, not from this handle's local
    /// copy.
    #[inline]
    pub(crate) fn create_sibling(&self) -> SharedValue<T> {
        let snapshot = self.state.load_full();
        Self {
            state: Arc::clone(&self.state),
            shared: Arc::clone(&snapshot),
            local: snapshot,
            _not_sync: PhantomData,
        }
    }

    /// Create a [`CachedValue`] that follows this shared state through a weak reference, so it
    /// can detect when the state has been dropped.
    #[inline]
    pub(crate) fn create_cache(&self) -> CachedValue<T> {
        let snapshot = self.state.load_full();
        let source = Arc::downgrade(&self.state);
        CachedValue {
            source,
            shared: Arc::clone(&snapshot),
            local: snapshot,
            _not_sync: PhantomData,
        }
    }

    /// Adopt the published value if it differs (by pointer) from the one last observed.
    ///
    /// When nothing new has been published this is a no-op, so local mutations survive.
    #[inline]
    fn refresh(&mut self) {
        let latest = self.state.load();
        if Arc::ptr_eq(&self.shared, &latest) {
            return;
        }
        self.shared = Arc::clone(&latest);
        self.local = Arc::clone(&self.shared);
    }
}
113
/// Non-publishing cache of a [`SharedValue`]'s state, attached through a weak reference.
///
/// Operations fail with [`CacheError::SourceDropped`] once the shared state can no longer be
/// upgraded, i.e. after every strong reference to it has been dropped.
///
/// Staleness detection uses the same `shared` / `local` split as [`SharedValue`].
/// `!Sync` by design.
#[cfg(any(feature = "client", all(test, feature = "server")))]
pub(crate) struct CachedValue<T: Clone + Send + Sync> {
    /// Weak handle to the shared publication point.
    source: Weak<ArcSwap<T>>,
    /// Last pointer observed in the source; used for staleness detection only.
    shared: Arc<T>,
    /// Working copy; may differ from `shared` after a `get_mut` call.
    local: Arc<T>,
    /// `UnsafeCell` is `!Sync`, so this marker opts the whole struct out of `Sync`.
    _not_sync: PhantomData<UnsafeCell<()>>,
}

#[cfg(any(feature = "client", all(test, feature = "server")))]
impl<T: Clone + Send + Sync> CachedValue<T> {
    /// Mutably borrow the local copy, after syncing with any newer published value.
    ///
    /// Changes made through the returned reference stay local to this instance.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::SourceDropped`] if the source has been dropped.
    #[inline]
    pub(crate) fn get_mut(&mut self) -> Result<&mut T, CacheError> {
        self.refresh()?;
        let working_copy = &mut self.local;
        Ok(Arc::make_mut(working_copy))
    }

    /// Project the source's *published* value into a read-only, `Send + Sync`
    /// [`DerivedValue`].
    ///
    /// Every read of the result loads the currently published value and applies `f` to it;
    /// local mutations of this cache are never visible through it. The returned value holds a
    /// strong reference to the shared state, so the state stays alive for as long as the
    /// `DerivedValue` (or any clone of it) exists.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::SourceDropped`] if the source has been dropped.
    #[inline]
    pub(crate) fn derive<R, F>(&self, f: F) -> Result<DerivedValue<R>, CacheError>
    where
        T: 'static,
        R: 'static,
        F: Fn(&T) -> R + Send + Sync + 'static,
    {
        let state = self.live_source()?;
        let read = move || {
            let published = state.load();
            f(&published)
        };
        Ok(DerivedValue {
            read: Arc::new(read),
        })
    }

    /// Create another [`CachedValue`] attached to the same source.
    ///
    /// The sibling starts from the currently *published* value, not from this cache's local
    /// copy.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::SourceDropped`] if the source has been dropped.
    #[inline]
    pub(crate) fn create_sibling(&self) -> Result<CachedValue<T>, CacheError> {
        let snapshot = self.live_source()?.load_full();
        Ok(Self {
            source: Weak::clone(&self.source),
            shared: Arc::clone(&snapshot),
            local: snapshot,
            _not_sync: PhantomData,
        })
    }

    /// Adopt the published value if it differs (by pointer) from the one last observed.
    ///
    /// Fails with [`CacheError::SourceDropped`] if the source has been dropped.
    #[inline]
    fn refresh(&mut self) -> Result<(), CacheError> {
        let source = self.live_source()?;
        let latest = source.load();
        if !Arc::ptr_eq(&self.shared, &latest) {
            self.shared = Arc::clone(&latest);
            self.local = Arc::clone(&self.shared);
        }
        Ok(())
    }

    /// Upgrade the weak source handle, or report that the shared state is gone.
    #[inline]
    fn live_source(&self) -> Result<Arc<ArcSwap<T>>, CacheError> {
        self.source.upgrade().ok_or(CacheError::SourceDropped)
    }
}
181
/// Read-only, `Send + Sync`, live projection of a [`SharedValue`]'s published value.
///
/// Internally this is a reference-counted closure that is re-run on every
/// [`DerivedValue::get`], so each read reflects whatever the closure observes at that moment.
/// Cloning is cheap: clones share the same closure.
pub struct DerivedValue<R> {
    /// Produces the current projected value each time it is called.
    read: Arc<dyn Fn() -> R + Send + Sync>,
}

impl<R> DerivedValue<R> {
    /// A `DerivedValue` that always yields `value` — for fixed sources such as a server-side
    /// per-user identity that never rotates.
    ///
    /// Each [`DerivedValue::get`] returns a fresh clone of `value`.
    #[cfg(any(feature = "server", test))]
    #[inline]
    pub(crate) fn constant(value: R) -> Self
    where
        R: Clone + Send + Sync + 'static,
    {
        Self {
            read: Arc::new(move || value.clone()),
        }
    }

    /// Read the current projected value by invoking the underlying closure.
    #[inline]
    pub fn get(&self) -> R {
        (self.read)()
    }
}

impl<R> Clone for DerivedValue<R> {
    /// Return a handle sharing the same underlying closure; no bound on `R` is required.
    #[inline]
    fn clone(&self) -> Self {
        Self {
            read: Arc::clone(&self.read),
        }
    }
}

/// Opaque `Debug` output so that types embedding a `DerivedValue` can derive `Debug`.
///
/// The wrapped closure cannot be inspected, and formatting deliberately does not invoke it,
/// so no bound on `R` is required.
impl<R> std::fmt::Debug for DerivedValue<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DerivedValue").finish_non_exhaustive()
    }
}