actify/
cache.rs

1use std::fmt::Debug;
2use thiserror::Error;
3use tokio::sync::broadcast::{
4    self, Receiver,
5    error::{RecvError, TryRecvError},
6};
7
8use crate::{Frequency, Throttle, Throttled};
9
10/// A simple caching struct that can be used to locally maintain a synchronized state with an actor
11#[derive(Debug)]
12pub struct Cache<T> {
13    inner: T,
14    rx: broadcast::Receiver<T>,
15    first_request: bool,
16}
17
18impl<T> Clone for Cache<T>
19where
20    T: Clone + Send + Sync + 'static,
21{
22    fn clone(&self) -> Self {
23        Cache {
24            inner: self.inner.clone(),
25            rx: self.rx.resubscribe(),
26            first_request: self.first_request,
27        }
28    }
29}
30
31impl<T> Cache<T>
32where
33    T: Clone + Send + Sync + 'static,
34{
35    pub(crate) fn new(rx: Receiver<T>, initial_value: T) -> Self {
36        Self {
37            inner: initial_value,
38            rx,
39            first_request: true,
40        }
41    }
42
43    /// Returns if any new updates are received
44    pub fn has_updates(&self) -> bool {
45        !self.rx.is_empty()
46    }
47
48    /// Returns the newest value available, even if the channel is closed
49    /// Note that when the cache is initialized with a default value, this might return the default while the actor has a different value
50    pub fn get_newest(&mut self) -> &T {
51        _ = self.try_recv_newest(); // Update if possible
52        self.get_current()
53    }
54
55    /// Returns the current value held by the cache, without synchronizing with the actor
56    pub fn get_current(&self) -> &T {
57        &self.inner
58    }
59
60    /// Receive the newest updated value broadcasted by the actor, discarding any older messages.
61    /// The first time it will return its current value immediately
62    /// After that, it might wait indefinitely for a new update
63    /// Note that when the cache is initialized with a default value, this might return the default while the actor has a different value
64    pub async fn recv_newest(&mut self) -> Result<&T, CacheRecvNewestError> {
65        // If requesting a value for the first time, it returns immediately
66        if self.first_request {
67            self.first_request = false;
68            self.try_recv_newest()?; // Update inner to newest if possible
69            return Ok(self.get_current());
70        }
71
72        loop {
73            match self.rx.recv().await {
74                Ok(val) => {
75                    self.inner = val;
76
77                    // If only interested in the newest, and more updates are available, process those first
78                    if !self.rx.is_empty() {
79                        continue;
80                    }
81                    return Ok(self.get_current());
82                }
83                Err(e) => match e {
84                    RecvError::Closed => return Err(CacheRecvNewestError::Closed),
85                    RecvError::Lagged(nr) => log::debug!(
86                        "Cache of actor type {} lagged {nr:?} messages",
87                        std::any::type_name::<T>()
88                    ),
89                },
90            }
91        }
92    }
93
94    /// Receive the last updated value broadcasted by the actor (FIFO).
95    /// The first time it will return its current value immediately
96    /// After that, it might wait indefinitely for a new update
97    /// Note that when the cache is initialized with a default value, this might return the default while the actor has a different value
98    pub async fn recv(&mut self) -> Result<&T, CacheRecvError> {
99        // If requesting a value for the first time, it returns immediately
100        if self.first_request {
101            self.first_request = false;
102            return Ok(self.get_current());
103        }
104
105        match self.rx.recv().await {
106            Ok(val) => {
107                self.inner = val;
108                Ok(self.get_current())
109            }
110            Err(e) => match e {
111                RecvError::Closed => Err(CacheRecvError::Closed),
112                RecvError::Lagged(nr) => Err(CacheRecvError::Lagged(nr)),
113            },
114        }
115    }
116
117    /// Try to receive the newest updated value broadcasted by the actor, discarding any older messages.
118    /// The first time it will return its initialized value, even if no updates are present.
119    /// After that, lacking updates will return None.
120    /// Note that when the cache is initialized with a default value, this might return None while the actor has a value
121    pub fn try_recv_newest(&mut self) -> Result<Option<&T>, CacheRecvNewestError> {
122        loop {
123            match self.rx.try_recv() {
124                Ok(val) => {
125                    self.first_request = false;
126                    self.inner = val;
127
128                    // If more updates are available, process those first
129                    if !self.rx.is_empty() {
130                        continue;
131                    }
132                    return Ok(Some(self.get_current()));
133                }
134                Err(e) => match e {
135                    TryRecvError::Closed => return Err(CacheRecvNewestError::Closed),
136                    TryRecvError::Empty => {
137                        // If no new updates are present when listening for the newest value the first time
138                        // Then simply exit with the initialized value if present
139                        if self.first_request {
140                            self.first_request = false;
141                            return Ok(Some(self.get_current()));
142                        } else {
143                            return Ok(None);
144                        }
145                    }
146                    TryRecvError::Lagged(nr) => log::debug!(
147                        "Cache of actor type {} lagged {nr:?} messages",
148                        std::any::type_name::<T>()
149                    ),
150                },
151            }
152        }
153    }
154
155    /// Try to receive the last updated value broadcasted by the actor once (FIFO).
156    /// The first time it will return its initialized value, even if no updates are present.
157    /// After that, lacking updates will return None.
158    /// Note that when the cache is initialized with a default value, this might return None while the actor has a value
159    pub fn try_recv(&mut self) -> Result<Option<&T>, CacheRecvError> {
160        // If requesting a value for the first time, it returns immediately
161        if self.first_request {
162            self.first_request = false;
163            return Ok(Some(self.get_current()));
164        }
165
166        match self.rx.try_recv() {
167            Ok(val) => {
168                self.inner = val;
169                Ok(Some(self.get_current()))
170            }
171            Err(e) => match e {
172                TryRecvError::Closed => Err(CacheRecvError::Closed),
173                TryRecvError::Empty => Ok(None),
174                TryRecvError::Lagged(nr) => Err(CacheRecvError::Lagged(nr)),
175            },
176        }
177    }
178
179    /// Spawns a throttle that fires given a specificed [Frequency], given any broadcasted updates by the actor.
180    /// Does not first update the cache to the newest value, since then the user of the cache might miss the update
181    pub fn spawn_throttle<C, F>(&self, client: C, call: fn(&C, F), freq: Frequency)
182    where
183        C: Send + Sync + 'static,
184        T: Throttled<F>,
185        F: Clone + Send + Sync + 'static,
186    {
187        let current = self.inner.clone();
188        let receiver = self.rx.resubscribe();
189        Throttle::spawn_from_receiver(client, call, freq, receiver, Some(current));
190    }
191}
192
193#[derive(Error, Debug, PartialEq, Clone)]
194pub enum CacheRecvError {
195    #[error("Cache channel closed")]
196    Closed,
197    #[error("Cache channel lagged by {0}")]
198    Lagged(u64),
199}
200
201impl From<RecvError> for CacheRecvError {
202    fn from(err: RecvError) -> Self {
203        match err {
204            RecvError::Closed => CacheRecvError::Closed,
205            RecvError::Lagged(nr) => CacheRecvError::Lagged(nr),
206        }
207    }
208}
209
210#[derive(Error, Debug, PartialEq, Clone)]
211pub enum CacheRecvNewestError {
212    #[error("Cache channel closed")]
213    Closed,
214}
215
/// Behavioral tests for the cache, driven through an actor `Handle`.
#[cfg(test)]
mod tests {
    use crate::Handle;
    use tokio::time::{Duration, sleep};

    #[tokio::test]
    async fn test_get_newest() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache_from_default();
        assert_eq!(cache.get_newest(), &0); // Not initialized, so default although value is set
        handle.set(2).await;
        assert_eq!(cache.get_newest(), &2); // The new value is broadcasted and processed
    }

    #[tokio::test]
    async fn test_has_updates() {
        let handle = Handle::new(1);
        let cache = handle.create_cache().await;
        assert!(!cache.has_updates());
        handle.set(2).await;
        assert!(cache.has_updates());
    }

    #[tokio::test]
    async fn test_recv_cache() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        assert_eq!(cache.recv().await.unwrap(), &1);
        handle.set(2).await;
        handle.set(3).await; // Not returned yet, as returning oldest value first
        assert_eq!(cache.recv().await.unwrap(), &2);
    }

    #[tokio::test]
    async fn test_recv_cache_newest() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        assert_eq!(cache.recv_newest().await.unwrap(), &1);
        handle.set(2).await;
        handle.set(3).await;
        assert_eq!(cache.recv_newest().await.unwrap(), &3);
    }

    #[tokio::test]
    async fn test_immediate_cache_return() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        handle.set(2).await; // Not returned yet, as returning oldest value first
        assert_eq!(cache.recv().await.unwrap(), &1);
    }

    #[tokio::test]
    async fn test_immediate_cache_return_with_newest() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        handle.set(2).await;
        assert_eq!(cache.recv_newest().await.unwrap(), &2);
    }

    #[tokio::test]
    async fn test_delayed_cache_return() {
        let handle = Handle::new(2);
        let mut cache = handle.create_cache().await;

        cache.recv().await.unwrap(); // First listen exits immediately

        tokio::select! {
            _ = async {
                sleep(Duration::from_millis(200)).await;
                handle.set(10).await;
                sleep(Duration::from_millis(200)).await; // Allow recv to exit
            } => panic!("Timeout"),
            res = cache.recv() => assert_eq!(res.unwrap(), &10)
        };
    }

    #[tokio::test]
    async fn test_try_recv() {
        let handle = Handle::new(2);
        let mut cache = handle.create_cache().await;
        assert_eq!(cache.try_recv().unwrap(), Some(&2));
        assert!(cache.try_recv().unwrap().is_none());
    }

    #[tokio::test]
    async fn test_try_recv_default() {
        let handle = Handle::new(2);
        let mut cache = handle.create_cache_from_default();
        assert_eq!(cache.try_recv().unwrap(), Some(&0));
    }

    #[tokio::test]
    async fn test_try_recv_newest() {
        let handle = Handle::new(2);
        let mut cache = handle.create_cache().await;
        assert_eq!(cache.try_recv_newest().unwrap(), Some(&2)); // Returns the initialized value directly
        assert!(cache.try_recv_newest().unwrap().is_none());
    }

    #[tokio::test]
    async fn test_try_recv_newest_default() {
        let handle = Handle::new(2);
        let mut cache = handle.create_cache_from_default();
        assert_eq!(cache.try_recv_newest().unwrap(), Some(&0)); // Returns the default directly
    }

    #[tokio::test]
    async fn test_try_recv_some() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        assert_eq!(cache.try_recv().unwrap(), Some(&1));
        handle.set(2).await;
        handle.set(3).await; // Not returned, as returning oldest value first
        assert_eq!(cache.try_recv().unwrap(), Some(&2));
    }

    #[tokio::test]
    async fn test_try_recv_some_newest() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        assert_eq!(cache.try_recv_newest().unwrap(), Some(&1));
        handle.set(2).await;
        handle.set(3).await; // Returned, as newest value first
        assert_eq!(cache.try_recv_newest().unwrap(), Some(&3));
    }

    #[tokio::test]
    async fn test_try_set_if_changed() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        assert_eq!(cache.try_recv_newest().unwrap(), Some(&1));
        handle.set_if_changed(1).await;
        assert!(cache.try_recv_newest().unwrap().is_none());
        handle.set_if_changed(2).await;
        assert_eq!(cache.try_recv_newest().unwrap(), Some(&2));
    }
}