Skip to main content

actify/
cache.rs

1use std::fmt::Debug;
2use thiserror::Error;
3use tokio::sync::broadcast::{
4    self, Receiver,
5    error::{RecvError, TryRecvError},
6};
7
8use crate::{Frequency, Throttle, Throttled};
9
10/// A simple caching struct that can be used to locally maintain a synchronized state with an actor.
11///
12/// Create one via [`Handle::create_cache`](crate::Handle::create_cache) (initialized with the
13/// current actor value), [`Handle::create_cache_from`](crate::Handle::create_cache_from) (custom
14/// initial value), or [`Handle::create_cache_from_default`](crate::Handle::create_cache_from_default)
15/// (starts from `T::default()`).
16#[derive(Debug)]
17pub struct Cache<T> {
18    inner: T,
19    rx: broadcast::Receiver<T>,
20    first_request: bool,
21}
22
23impl<T> Clone for Cache<T>
24where
25    T: Clone + Send + Sync + 'static,
26{
27    fn clone(&self) -> Self {
28        Cache {
29            inner: self.inner.clone(),
30            rx: self.rx.resubscribe(),
31            first_request: self.first_request,
32        }
33    }
34}
35
36impl<T> Cache<T>
37where
38    T: Clone + Send + Sync + 'static,
39{
40    pub(crate) fn new(rx: Receiver<T>, initial_value: T) -> Self {
41        Self {
42            inner: initial_value,
43            rx,
44            first_request: true,
45        }
46    }
47
48    fn is_first_request(&mut self) -> bool {
49        let first = self.first_request;
50        self.first_request = false;
51        first
52    }
53
54    fn store(&mut self, val: T) -> &T {
55        self.inner = val;
56        &self.inner
57    }
58
59    /// Drains all buffered messages from the channel, keeping only the newest value.
60    /// Returns `true` if any value was stored.
61    fn drain_to_newest(&mut self) -> Result<bool, CacheRecvNewestError> {
62        let mut received = false;
63        loop {
64            match self.rx.try_recv() {
65                Ok(val) => {
66                    self.inner = val;
67                    received = true;
68                }
69                Err(TryRecvError::Empty) => return Ok(received),
70                Err(TryRecvError::Closed) => return Err(CacheRecvNewestError::Closed),
71                Err(TryRecvError::Lagged(nr)) => log_lag::<T>(nr),
72            }
73        }
74    }
75
76    /// Returns `true` if there are pending updates from the actor that haven't been received yet.
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// # use actify::Handle;
82    /// # #[tokio::main]
83    /// # async fn main() {
84    /// let handle = Handle::new(1);
85    /// let cache = handle.create_cache().await;
86    /// assert!(!cache.has_updates());
87    ///
88    /// handle.set(2).await;
89    /// assert!(cache.has_updates());
90    /// # }
91    /// ```
92    pub fn has_updates(&self) -> bool {
93        !self.rx.is_empty()
94    }
95
96    /// Returns the newest value available, draining any pending updates from the channel.
97    /// If the channel is closed, returns the last known value without error.
98    ///
99    /// Note: when the cache is initialized with a default value (e.g. via
100    /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
101    /// the returned value may differ from the actor's actual value until a broadcast occurs.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// # use actify::Handle;
107    /// # #[tokio::main]
108    /// # async fn main() {
109    /// let handle = Handle::new(1);
110    /// let mut cache = handle.create_cache_from_default();
111    /// assert_eq!(cache.get_newest(), &0); // Not initialized, returns default
112    ///
113    /// handle.set(2).await;
114    /// handle.set(3).await;
115    /// assert_eq!(cache.get_newest(), &3); // Synchronizes with latest value
116    /// # }
117    /// ```
118    pub fn get_newest(&mut self) -> &T {
119        _ = self.try_recv_newest(); // Update if possible
120        self.get_current()
121    }
122
123    /// Returns the current cached value without synchronizing with the actor.
124    ///
125    /// Note: when the cache is initialized with a default value (e.g. via
126    /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
127    /// the returned value may differ from the actor's actual value until a broadcast occurs.
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// # use actify::Handle;
133    /// # #[tokio::main]
134    /// # async fn main() {
135    /// let handle = Handle::new(1);
136    /// let cache = handle.create_cache().await;
137    /// assert_eq!(cache.get_current(), &1);
138    ///
139    /// handle.set(2).await;
140    /// // Still returns the cached value, not the updated actor value
141    /// assert_eq!(cache.get_current(), &1);
142    /// # }
143    /// ```
144    pub fn get_current(&self) -> &T {
145        &self.inner
146    }
147
148    /// Receives the newest broadcasted value from the actor, discarding any older messages.
149    ///
    /// On the first call, returns the newest available value immediately (draining any pending
    /// updates), even if the channel is closed. On subsequent calls, waits until an update is
    /// available.
152    ///
153    /// Note: when the cache is initialized with a default value (e.g. via
154    /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
155    /// the first call may return the default while the actor holds a different value.
156    ///
157    /// # Errors
158    ///
159    /// Returns [`CacheRecvNewestError::Closed`] if the actor is dropped (after the first call).
160    ///
161    /// # Examples
162    ///
163    /// ```
164    /// # use actify::Handle;
165    /// # #[tokio::main]
166    /// # async fn main() {
167    /// let handle = Handle::new(1);
168    /// let mut cache = handle.create_cache().await;
169    ///
170    /// // First call returns the initialized value immediately
171    /// assert_eq!(cache.recv_newest().await.unwrap(), &1);
172    ///
173    /// handle.set(2).await;
174    /// handle.set(3).await;
175    /// // Skips to newest value, discarding older updates
176    /// assert_eq!(cache.recv_newest().await.unwrap(), &3);
177    /// # }
178    /// ```
179    pub async fn recv_newest(&mut self) -> Result<&T, CacheRecvNewestError> {
180        if self.is_first_request() {
181            return Ok(self.get_newest());
182        }
183
184        loop {
185            match self.rx.recv().await {
186                Ok(val) => {
187                    self.inner = val;
188                    break;
189                }
190                Err(RecvError::Closed) => return Err(CacheRecvNewestError::Closed),
191                Err(RecvError::Lagged(nr)) => log_lag::<T>(nr),
192            }
193        }
194        _ = self.drain_to_newest();
195        Ok(&self.inner)
196    }
197
198    /// Receives the next broadcasted value from the actor (FIFO).
199    ///
200    /// On the first call, returns the current cached value immediately, even if the channel is
201    /// closed. On subsequent calls, waits until an update is available.
202    ///
203    /// Note: when the cache is initialized with a default value (e.g. via
204    /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
205    /// the first call may return the default while the actor holds a different value.
206    ///
207    /// # Errors
208    ///
209    /// Returns [`CacheRecvError::Closed`] if the actor is dropped, or
210    /// [`CacheRecvError::Lagged`] if the cache fell behind and messages were dropped.
211    ///
212    /// # Examples
213    ///
214    /// ```
215    /// # use actify::Handle;
216    /// # #[tokio::main]
217    /// # async fn main() {
218    /// let handle = Handle::new(1);
219    /// let mut cache = handle.create_cache().await;
220    ///
221    /// // First call returns the initialized value immediately
222    /// assert_eq!(cache.recv().await.unwrap(), &1);
223    ///
224    /// handle.set(2).await;
225    /// handle.set(3).await;
226    /// // Returns oldest update first (FIFO)
227    /// assert_eq!(cache.recv().await.unwrap(), &2);
228    /// # }
229    /// ```
230    pub async fn recv(&mut self) -> Result<&T, CacheRecvError> {
231        if self.is_first_request() {
232            return Ok(self.get_current());
233        }
234
235        let val = self.rx.recv().await?;
236        Ok(self.store(val))
237    }
238
239    /// Tries to receive the newest broadcasted value from the actor, discarding any older
240    /// messages. Returns immediately without waiting.
241    ///
242    /// On the first call, returns `Some` with the current cached value, even if no updates are
243    /// present. On subsequent calls, returns `None` if no new updates are available.
244    ///
245    /// Note: when the cache is initialized with a default value (e.g. via
246    /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
247    /// the first call may return the default while the actor holds a different value.
248    ///
249    /// # Errors
250    ///
251    /// Returns [`CacheRecvNewestError::Closed`] if the actor is dropped.
252    ///
253    /// # Examples
254    ///
255    /// ```
256    /// # use actify::Handle;
257    /// # #[tokio::main]
258    /// # async fn main() {
259    /// let handle = Handle::new(1);
260    /// let mut cache = handle.create_cache().await;
261    ///
262    /// // First call returns the initialized value
263    /// assert_eq!(cache.try_recv_newest().unwrap(), Some(&1));
264    /// // No new updates available
265    /// assert_eq!(cache.try_recv_newest().unwrap(), None);
266    ///
267    /// handle.set(2).await;
268    /// handle.set(3).await;
269    /// // Skips to newest value
270    /// assert_eq!(cache.try_recv_newest().unwrap(), Some(&3));
271    /// # }
272    /// ```
273    ///
274    /// When the cache is created from a default value, the actor's actual value is never
275    /// received unless a broadcast occurs:
276    ///
277    /// ```
278    /// # use actify::Handle;
279    /// # #[tokio::main]
280    /// # async fn main() {
281    /// let handle = Handle::new(5);
282    /// let mut cache = handle.create_cache_from_default();
283    ///
284    /// // Returns the default, not the actor's actual value (5)
285    /// assert_eq!(cache.try_recv_newest().unwrap(), Some(&0));
286    /// // No broadcasts arrived, so None — the actor's value (5) is never seen
287    /// assert_eq!(cache.try_recv_newest().unwrap(), None);
288    /// # }
289    /// ```
290    pub fn try_recv_newest(&mut self) -> Result<Option<&T>, CacheRecvNewestError> {
291        let first = self.is_first_request();
292        let received = self.drain_to_newest()?;
293        if received || first {
294            Ok(Some(&self.inner))
295        } else {
296            Ok(None)
297        }
298    }
299
300    /// Tries to receive the next broadcasted value from the actor (FIFO). Returns immediately
301    /// without waiting.
302    ///
303    /// On the first call, returns `Some` with the current cached value, even if no updates are
304    /// present or the channel is closed. On subsequent calls, returns `None` if no new updates
305    /// are available.
306    ///
307    /// Note: when the cache is initialized with a default value (e.g. via
308    /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
309    /// the first call may return the default while the actor holds a different value.
310    ///
311    /// # Errors
312    ///
313    /// Returns [`CacheRecvError::Closed`] if the actor is dropped, or
314    /// [`CacheRecvError::Lagged`] if the cache fell behind and messages were dropped.
315    ///
316    /// # Examples
317    ///
318    /// ```
319    /// # use actify::Handle;
320    /// # #[tokio::main]
321    /// # async fn main() {
322    /// let handle = Handle::new(1);
323    /// let mut cache = handle.create_cache().await;
324    ///
325    /// // First call returns the initialized value
326    /// assert_eq!(cache.try_recv().unwrap(), Some(&1));
327    /// // No new updates available
328    /// assert_eq!(cache.try_recv().unwrap(), None);
329    ///
330    /// handle.set(2).await;
331    /// handle.set(3).await;
332    /// // Returns oldest update first (FIFO)
333    /// assert_eq!(cache.try_recv().unwrap(), Some(&2));
334    /// # }
335    /// ```
336    ///
337    /// When the cache is created from a default value, the actor's actual value is never
338    /// received unless a broadcast occurs:
339    ///
340    /// ```
341    /// # use actify::Handle;
342    /// # #[tokio::main]
343    /// # async fn main() {
344    /// let handle = Handle::new(5);
345    /// let mut cache = handle.create_cache_from_default();
346    ///
347    /// // Returns the default, not the actor's actual value (5)
348    /// assert_eq!(cache.try_recv().unwrap(), Some(&0));
349    /// // No broadcasts arrived, so None — the actor's value (5) is never seen
350    /// assert_eq!(cache.try_recv().unwrap(), None);
351    /// # }
352    /// ```
353    pub fn try_recv(&mut self) -> Result<Option<&T>, CacheRecvError> {
354        if self.is_first_request() {
355            return Ok(Some(self.get_current()));
356        }
357
358        match self.rx.try_recv() {
359            Ok(val) => Ok(Some(self.store(val))),
360            Err(TryRecvError::Empty) => Ok(None),
361            Err(TryRecvError::Closed) => Err(CacheRecvError::Closed),
362            Err(TryRecvError::Lagged(nr)) => Err(CacheRecvError::Lagged(nr)),
363        }
364    }
365
366    /// Blocking version of [`recv`](Self::recv). Receives the next broadcasted value (FIFO).
367    /// Must not be called from an async context.
368    ///
369    /// On the first call, returns the current cached value immediately, even if the channel is
370    /// closed. On subsequent calls, blocks until an update is available.
371    ///
372    /// Note: when the cache is initialized with a default value (e.g. via
373    /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
374    /// the first call may return the default while the actor holds a different value.
375    ///
376    /// # Errors
377    ///
378    /// Returns [`CacheRecvError::Closed`] if the actor is dropped, or
379    /// [`CacheRecvError::Lagged`] if the cache fell behind and messages were dropped.
380    ///
381    /// # Examples
382    ///
383    /// ```
384    /// # use actify::Handle;
385    /// # #[tokio::main]
386    /// # async fn main() {
387    /// let handle = Handle::new(1);
388    /// let mut cache = handle.create_cache().await;
389    /// handle.set(2).await;
390    ///
391    /// std::thread::spawn(move || {
392    ///     // First call returns the initialized value immediately
393    ///     assert_eq!(cache.blocking_recv().unwrap(), &1);
394    ///     // Subsequent call receives the update
395    ///     assert_eq!(cache.blocking_recv().unwrap(), &2);
396    /// }).join().unwrap();
397    /// # }
398    /// ```
399    pub fn blocking_recv(&mut self) -> Result<&T, CacheRecvError> {
400        if self.is_first_request() {
401            return Ok(self.get_current());
402        }
403
404        let val = self.rx.blocking_recv()?;
405        Ok(self.store(val))
406    }
407
408    /// Blocking version of [`recv_newest`](Self::recv_newest). Receives the newest broadcasted
409    /// value, discarding any older messages. Must not be called from an async context.
410    ///
411    /// On the first call, returns the newest available value immediately, even if the channel is
412    /// closed. On subsequent calls, blocks until an update is available.
413    ///
414    /// Note: when the cache is initialized with a default value (e.g. via
415    /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
416    /// the first call may return the default while the actor holds a different value.
417    ///
418    /// # Errors
419    ///
420    /// Returns [`CacheRecvNewestError::Closed`] if the actor is dropped (after the first call).
421    ///
422    /// # Examples
423    ///
424    /// ```
425    /// # use actify::Handle;
426    /// # #[tokio::main]
427    /// # async fn main() {
428    /// let handle = Handle::new(1);
429    /// let mut cache = handle.create_cache().await;
430    /// handle.set(2).await;
431    /// handle.set(3).await;
432    ///
433    /// std::thread::spawn(move || {
434    ///     // First call skips to the newest available value
435    ///     assert_eq!(cache.blocking_recv_newest().unwrap(), &3);
436    /// }).join().unwrap();
437    /// # }
438    /// ```
439    pub fn blocking_recv_newest(&mut self) -> Result<&T, CacheRecvNewestError> {
440        if self.is_first_request() {
441            return Ok(self.get_newest());
442        }
443
444        loop {
445            match self.rx.blocking_recv() {
446                Ok(val) => {
447                    self.inner = val;
448                    if self.rx.is_empty() {
449                        return Ok(&self.inner);
450                    }
451                }
452                Err(RecvError::Closed) => return Err(CacheRecvNewestError::Closed),
453                Err(RecvError::Lagged(nr)) => log_lag::<T>(nr),
454            }
455        }
456    }
457
458    /// Spawns a [`Throttle`] that fires given a specified [`Frequency`], given any broadcasted updates by the actor.
459    /// Does not first update the cache to the newest value, since then the user of the cache might miss the update.
460    /// See [`Handle::spawn_throttle`](crate::Handle::spawn_throttle) for an example.
461    pub fn spawn_throttle<C, F>(&self, client: C, call: fn(&C, F), freq: Frequency)
462    where
463        C: Send + Sync + 'static,
464        T: Throttled<F>,
465        F: Clone + Send + Sync + 'static,
466    {
467        let current = self.inner.clone();
468        let receiver = self.rx.resubscribe();
469        Throttle::spawn_from_receiver(client, call, freq, receiver, Some(current));
470    }
471}
472
473fn log_lag<T>(nr: u64) {
474    log::debug!(
475        "Cache of actor type {} lagged {nr:?} messages",
476        std::any::type_name::<T>()
477    );
478}
479
480/// Error returned by [`Cache::recv`] and [`Cache::try_recv`].
481#[derive(Error, Debug, PartialEq, Clone)]
482pub enum CacheRecvError {
483    #[error("Cache channel closed")]
484    Closed,
485    #[error("Cache channel lagged by {0}")]
486    Lagged(u64),
487}
488
489impl From<RecvError> for CacheRecvError {
490    fn from(err: RecvError) -> Self {
491        match err {
492            RecvError::Closed => CacheRecvError::Closed,
493            RecvError::Lagged(nr) => CacheRecvError::Lagged(nr),
494        }
495    }
496}
497
498/// Error returned by [`Cache::recv_newest`] and [`Cache::try_recv_newest`].
499#[derive(Error, Debug, PartialEq, Clone)]
500pub enum CacheRecvNewestError {
501    #[error("Cache channel closed")]
502    Closed,
503}
504
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Handle;
    use tokio::time::{Duration, sleep};

    /// After the initial call, `recv` must stay pending until the actor broadcasts.
    #[tokio::test(start_paused = true)]
    async fn test_recv_waits_for_update() {
        let actor = Handle::new(2);
        let mut mirror = actor.create_cache().await;

        // The initial call resolves at once with the seeded value.
        assert_eq!(mirror.recv().await.unwrap(), &2);

        // Broadcast after 200ms, then idle for another 200ms; `recv` has to win the race.
        let slow_path = async {
            sleep(Duration::from_millis(200)).await;
            actor.set(10).await;
            sleep(Duration::from_millis(200)).await;
        };
        tokio::select! {
            _ = slow_path => panic!("Timeout"),
            update = mirror.recv() => assert_eq!(update.unwrap(), &10)
        };
    }

    /// `recv` hands out queued updates oldest-first.
    #[tokio::test(start_paused = true)]
    async fn test_recv_fifo_ordering() {
        let actor = Handle::new(0);
        let mut mirror = actor.create_cache().await;

        // Initial call yields the seeded value.
        assert_eq!(mirror.recv().await.unwrap(), &0);

        for value in 1..=3 {
            actor.set(value).await;
        }

        for expected in 1..=3 {
            assert_eq!(mirror.recv().await.unwrap(), &expected);
        }
    }

    /// `recv_newest` jumps straight to the latest queued update.
    #[tokio::test(start_paused = true)]
    async fn test_recv_newest_skips_intermediate() {
        let actor = Handle::new(0);
        let mut mirror = actor.create_cache().await;

        // Initial call yields the seeded value.
        assert_eq!(mirror.recv_newest().await.unwrap(), &0);

        for value in 1..=3 {
            actor.set(value).await;
        }
        // Give the broadcasts a moment to land.
        sleep(Duration::from_millis(1)).await;

        // Values 1 and 2 are skipped.
        assert_eq!(mirror.recv_newest().await.unwrap(), &3);
    }

    /// `try_recv` yields `None` whenever nothing new is queued.
    #[tokio::test(start_paused = true)]
    async fn test_try_recv_returns_none_when_empty() {
        let actor = Handle::new(1);
        let mut mirror = actor.create_cache().await;

        // Initial call yields the seeded value, after which the queue is empty.
        assert_eq!(mirror.try_recv().unwrap(), Some(&1));
        assert_eq!(mirror.try_recv().unwrap(), None);

        actor.set(2).await;
        assert_eq!(mirror.try_recv().unwrap(), Some(&2));
        assert_eq!(mirror.try_recv().unwrap(), None);
    }

    /// `try_recv_newest` yields `None` once the queue has been drained.
    #[tokio::test(start_paused = true)]
    async fn test_try_recv_newest_returns_none_when_empty() {
        let actor = Handle::new(1);
        let mut mirror = actor.create_cache().await;

        actor.set(2).await;
        // The initial call already picks up the pending update.
        assert_eq!(mirror.try_recv_newest().unwrap(), Some(&2));
        assert_eq!(mirror.try_recv_newest().unwrap(), None);
    }

    /// `set_if_changed` only produces an update when the value actually differs.
    #[tokio::test(start_paused = true)]
    async fn test_try_set_if_changed() {
        let actor = Handle::new(1);
        let mut mirror = actor.create_cache().await;
        assert_eq!(mirror.try_recv_newest().unwrap(), Some(&1));

        // Same value: nothing arrives.
        actor.set_if_changed(1).await;
        assert_eq!(mirror.try_recv_newest().unwrap(), None);

        // Different value: the update shows up.
        actor.set_if_changed(2).await;
        assert_eq!(mirror.try_recv_newest().unwrap(), Some(&2));
    }

    /// `get_current` never reads from the channel.
    #[tokio::test(start_paused = true)]
    async fn test_get_current_does_not_sync() {
        let actor = Handle::new(1);
        let mirror = actor.create_cache().await;

        actor.set(99).await;
        // The stale value is still reported.
        assert_eq!(mirror.get_current(), &1);
    }

    /// `get_newest` drains pending updates before answering.
    #[tokio::test(start_paused = true)]
    async fn test_get_newest_syncs() {
        let actor = Handle::new(1);
        let mut mirror = actor.create_cache().await;

        for value in 2..=3 {
            actor.set(value).await;
        }
        assert_eq!(mirror.get_newest(), &3);
    }

    /// `has_updates` flips to `true` once a broadcast is queued.
    #[tokio::test(start_paused = true)]
    async fn test_has_updates() {
        let actor = Handle::new(1);
        let mirror = actor.create_cache().await;

        assert!(!mirror.has_updates());
        actor.set(2).await;
        assert!(mirror.has_updates());
    }

    /// A cache created from the default ignores the actor's value until a broadcast.
    #[tokio::test(start_paused = true)]
    async fn test_create_cache_from_default() {
        let actor = Handle::new(42);
        let mut mirror = actor.create_cache_from_default();

        // Seeded with the default rather than the actor's 42.
        assert_eq!(mirror.get_current(), &0);
        // The initial call reports that default.
        assert_eq!(mirror.try_recv().unwrap(), Some(&0));

        // A broadcast is what finally brings in a real actor value.
        actor.set(99).await;
        assert_eq!(mirror.try_recv().unwrap(), Some(&99));
    }

    /// Every receive flavour reports `Closed` once the handle is gone.
    #[tokio::test(start_paused = true)]
    async fn test_closed_channel() {
        let actor = Handle::new(1);
        let mut mirror = actor.create_cache().await;
        // Use up the first-request shortcut.
        mirror.recv().await.unwrap();

        drop(actor);
        assert_eq!(mirror.recv().await, Err(CacheRecvError::Closed));
        assert_eq!(
            mirror.recv_newest().await,
            Err(CacheRecvNewestError::Closed)
        );
        assert_eq!(mirror.try_recv(), Err(CacheRecvError::Closed));
        assert_eq!(mirror.try_recv_newest(), Err(CacheRecvNewestError::Closed));
    }

    /// `blocking_recv` works from a plain thread and keeps FIFO order.
    #[tokio::test(start_paused = true)]
    async fn test_blocking_recv() {
        let actor = Handle::new(1);
        let mut mirror = actor.create_cache().await;
        actor.set(2).await;

        let worker = std::thread::spawn(move || {
            assert_eq!(mirror.blocking_recv().unwrap(), &1);
            assert_eq!(mirror.blocking_recv().unwrap(), &2);
        });
        worker.join().unwrap();
    }

    /// `blocking_recv_newest` works from a plain thread and skips to the latest value.
    #[tokio::test(start_paused = true)]
    async fn test_blocking_recv_newest() {
        let actor = Handle::new(1);
        let mut mirror = actor.create_cache().await;
        for value in 2..=3 {
            actor.set(value).await;
        }

        let worker = std::thread::spawn(move || {
            // The initial call drains straight to the newest value.
            assert_eq!(mirror.blocking_recv_newest().unwrap(), &3);
        });
        worker.join().unwrap();
    }
}