actify/cache.rs
1use std::fmt::Debug;
2use thiserror::Error;
3use tokio::sync::broadcast::{
4 self, Receiver,
5 error::{RecvError, TryRecvError},
6};
7
8use crate::{Frequency, Throttle, Throttled};
9
10/// A simple caching struct that can be used to locally maintain a synchronized state with an actor.
11///
12/// Create one via [`Handle::create_cache`](crate::Handle::create_cache) (initialized with the
13/// current actor value), [`Handle::create_cache_from`](crate::Handle::create_cache_from) (custom
14/// initial value), or [`Handle::create_cache_from_default`](crate::Handle::create_cache_from_default)
15/// (starts from `T::default()`).
16#[derive(Debug)]
17pub struct Cache<T> {
18 inner: T,
19 rx: broadcast::Receiver<T>,
20 first_request: bool,
21}
22
23impl<T> Clone for Cache<T>
24where
25 T: Clone + Send + Sync + 'static,
26{
27 fn clone(&self) -> Self {
28 Cache {
29 inner: self.inner.clone(),
30 rx: self.rx.resubscribe(),
31 first_request: self.first_request,
32 }
33 }
34}
35
36impl<T> Cache<T>
37where
38 T: Clone + Send + Sync + 'static,
39{
40 pub(crate) fn new(rx: Receiver<T>, initial_value: T) -> Self {
41 Self {
42 inner: initial_value,
43 rx,
44 first_request: true,
45 }
46 }
47
48 fn is_first_request(&mut self) -> bool {
49 let first = self.first_request;
50 self.first_request = false;
51 first
52 }
53
54 fn store(&mut self, val: T) -> &T {
55 self.inner = val;
56 &self.inner
57 }
58
59 /// Drains all buffered messages from the channel, keeping only the newest value.
60 /// Returns `true` if any value was stored.
61 fn drain_to_newest(&mut self) -> Result<bool, CacheRecvNewestError> {
62 let mut received = false;
63 loop {
64 match self.rx.try_recv() {
65 Ok(val) => {
66 self.inner = val;
67 received = true;
68 }
69 Err(TryRecvError::Empty) => return Ok(received),
70 Err(TryRecvError::Closed) => return Err(CacheRecvNewestError::Closed),
71 Err(TryRecvError::Lagged(nr)) => log_lag::<T>(nr),
72 }
73 }
74 }
75
76 /// Returns `true` if there are pending updates from the actor that haven't been received yet.
77 ///
78 /// # Examples
79 ///
80 /// ```
81 /// # use actify::Handle;
82 /// # #[tokio::main]
83 /// # async fn main() {
84 /// let handle = Handle::new(1);
85 /// let cache = handle.create_cache().await;
86 /// assert!(!cache.has_updates());
87 ///
88 /// handle.set(2).await;
89 /// assert!(cache.has_updates());
90 /// # }
91 /// ```
92 pub fn has_updates(&self) -> bool {
93 !self.rx.is_empty()
94 }
95
96 /// Returns the newest value available, draining any pending updates from the channel.
97 /// If the channel is closed, returns the last known value without error.
98 ///
99 /// Note: when the cache is initialized with a default value (e.g. via
100 /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
101 /// the returned value may differ from the actor's actual value until a broadcast occurs.
102 ///
103 /// # Examples
104 ///
105 /// ```
106 /// # use actify::Handle;
107 /// # #[tokio::main]
108 /// # async fn main() {
109 /// let handle = Handle::new(1);
110 /// let mut cache = handle.create_cache_from_default();
111 /// assert_eq!(cache.get_newest(), &0); // Not initialized, returns default
112 ///
113 /// handle.set(2).await;
114 /// handle.set(3).await;
115 /// assert_eq!(cache.get_newest(), &3); // Synchronizes with latest value
116 /// # }
117 /// ```
118 pub fn get_newest(&mut self) -> &T {
119 _ = self.try_recv_newest(); // Update if possible
120 self.get_current()
121 }
122
123 /// Returns the current cached value without synchronizing with the actor.
124 ///
125 /// Note: when the cache is initialized with a default value (e.g. via
126 /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
127 /// the returned value may differ from the actor's actual value until a broadcast occurs.
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// # use actify::Handle;
133 /// # #[tokio::main]
134 /// # async fn main() {
135 /// let handle = Handle::new(1);
136 /// let cache = handle.create_cache().await;
137 /// assert_eq!(cache.get_current(), &1);
138 ///
139 /// handle.set(2).await;
140 /// // Still returns the cached value, not the updated actor value
141 /// assert_eq!(cache.get_current(), &1);
142 /// # }
143 /// ```
144 pub fn get_current(&self) -> &T {
145 &self.inner
146 }
147
148 /// Receives the newest broadcasted value from the actor, discarding any older messages.
149 ///
150 /// On the first call, returns the current cached value immediately, even if the channel is
151 /// closed. On subsequent calls, waits until an update is available.
152 ///
153 /// Note: when the cache is initialized with a default value (e.g. via
154 /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
155 /// the first call may return the default while the actor holds a different value.
156 ///
157 /// # Errors
158 ///
159 /// Returns [`CacheRecvNewestError::Closed`] if the actor is dropped (after the first call).
160 ///
161 /// # Examples
162 ///
163 /// ```
164 /// # use actify::Handle;
165 /// # #[tokio::main]
166 /// # async fn main() {
167 /// let handle = Handle::new(1);
168 /// let mut cache = handle.create_cache().await;
169 ///
170 /// // First call returns the initialized value immediately
171 /// assert_eq!(cache.recv_newest().await.unwrap(), &1);
172 ///
173 /// handle.set(2).await;
174 /// handle.set(3).await;
175 /// // Skips to newest value, discarding older updates
176 /// assert_eq!(cache.recv_newest().await.unwrap(), &3);
177 /// # }
178 /// ```
179 pub async fn recv_newest(&mut self) -> Result<&T, CacheRecvNewestError> {
180 if self.is_first_request() {
181 return Ok(self.get_newest());
182 }
183
184 loop {
185 match self.rx.recv().await {
186 Ok(val) => {
187 self.inner = val;
188 break;
189 }
190 Err(RecvError::Closed) => return Err(CacheRecvNewestError::Closed),
191 Err(RecvError::Lagged(nr)) => log_lag::<T>(nr),
192 }
193 }
194 _ = self.drain_to_newest();
195 Ok(&self.inner)
196 }
197
198 /// Receives the next broadcasted value from the actor (FIFO).
199 ///
200 /// On the first call, returns the current cached value immediately, even if the channel is
201 /// closed. On subsequent calls, waits until an update is available.
202 ///
203 /// Note: when the cache is initialized with a default value (e.g. via
204 /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
205 /// the first call may return the default while the actor holds a different value.
206 ///
207 /// # Errors
208 ///
209 /// Returns [`CacheRecvError::Closed`] if the actor is dropped, or
210 /// [`CacheRecvError::Lagged`] if the cache fell behind and messages were dropped.
211 ///
212 /// # Examples
213 ///
214 /// ```
215 /// # use actify::Handle;
216 /// # #[tokio::main]
217 /// # async fn main() {
218 /// let handle = Handle::new(1);
219 /// let mut cache = handle.create_cache().await;
220 ///
221 /// // First call returns the initialized value immediately
222 /// assert_eq!(cache.recv().await.unwrap(), &1);
223 ///
224 /// handle.set(2).await;
225 /// handle.set(3).await;
226 /// // Returns oldest update first (FIFO)
227 /// assert_eq!(cache.recv().await.unwrap(), &2);
228 /// # }
229 /// ```
230 pub async fn recv(&mut self) -> Result<&T, CacheRecvError> {
231 if self.is_first_request() {
232 return Ok(self.get_current());
233 }
234
235 let val = self.rx.recv().await?;
236 Ok(self.store(val))
237 }
238
239 /// Tries to receive the newest broadcasted value from the actor, discarding any older
240 /// messages. Returns immediately without waiting.
241 ///
242 /// On the first call, returns `Some` with the current cached value, even if no updates are
243 /// present. On subsequent calls, returns `None` if no new updates are available.
244 ///
245 /// Note: when the cache is initialized with a default value (e.g. via
246 /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
247 /// the first call may return the default while the actor holds a different value.
248 ///
249 /// # Errors
250 ///
251 /// Returns [`CacheRecvNewestError::Closed`] if the actor is dropped.
252 ///
253 /// # Examples
254 ///
255 /// ```
256 /// # use actify::Handle;
257 /// # #[tokio::main]
258 /// # async fn main() {
259 /// let handle = Handle::new(1);
260 /// let mut cache = handle.create_cache().await;
261 ///
262 /// // First call returns the initialized value
263 /// assert_eq!(cache.try_recv_newest().unwrap(), Some(&1));
264 /// // No new updates available
265 /// assert_eq!(cache.try_recv_newest().unwrap(), None);
266 ///
267 /// handle.set(2).await;
268 /// handle.set(3).await;
269 /// // Skips to newest value
270 /// assert_eq!(cache.try_recv_newest().unwrap(), Some(&3));
271 /// # }
272 /// ```
273 ///
274 /// When the cache is created from a default value, the actor's actual value is never
275 /// received unless a broadcast occurs:
276 ///
277 /// ```
278 /// # use actify::Handle;
279 /// # #[tokio::main]
280 /// # async fn main() {
281 /// let handle = Handle::new(5);
282 /// let mut cache = handle.create_cache_from_default();
283 ///
284 /// // Returns the default, not the actor's actual value (5)
285 /// assert_eq!(cache.try_recv_newest().unwrap(), Some(&0));
286 /// // No broadcasts arrived, so None — the actor's value (5) is never seen
287 /// assert_eq!(cache.try_recv_newest().unwrap(), None);
288 /// # }
289 /// ```
290 pub fn try_recv_newest(&mut self) -> Result<Option<&T>, CacheRecvNewestError> {
291 let first = self.is_first_request();
292 let received = self.drain_to_newest()?;
293 if received || first {
294 Ok(Some(&self.inner))
295 } else {
296 Ok(None)
297 }
298 }
299
300 /// Tries to receive the next broadcasted value from the actor (FIFO). Returns immediately
301 /// without waiting.
302 ///
303 /// On the first call, returns `Some` with the current cached value, even if no updates are
304 /// present or the channel is closed. On subsequent calls, returns `None` if no new updates
305 /// are available.
306 ///
307 /// Note: when the cache is initialized with a default value (e.g. via
308 /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
309 /// the first call may return the default while the actor holds a different value.
310 ///
311 /// # Errors
312 ///
313 /// Returns [`CacheRecvError::Closed`] if the actor is dropped, or
314 /// [`CacheRecvError::Lagged`] if the cache fell behind and messages were dropped.
315 ///
316 /// # Examples
317 ///
318 /// ```
319 /// # use actify::Handle;
320 /// # #[tokio::main]
321 /// # async fn main() {
322 /// let handle = Handle::new(1);
323 /// let mut cache = handle.create_cache().await;
324 ///
325 /// // First call returns the initialized value
326 /// assert_eq!(cache.try_recv().unwrap(), Some(&1));
327 /// // No new updates available
328 /// assert_eq!(cache.try_recv().unwrap(), None);
329 ///
330 /// handle.set(2).await;
331 /// handle.set(3).await;
332 /// // Returns oldest update first (FIFO)
333 /// assert_eq!(cache.try_recv().unwrap(), Some(&2));
334 /// # }
335 /// ```
336 ///
337 /// When the cache is created from a default value, the actor's actual value is never
338 /// received unless a broadcast occurs:
339 ///
340 /// ```
341 /// # use actify::Handle;
342 /// # #[tokio::main]
343 /// # async fn main() {
344 /// let handle = Handle::new(5);
345 /// let mut cache = handle.create_cache_from_default();
346 ///
347 /// // Returns the default, not the actor's actual value (5)
348 /// assert_eq!(cache.try_recv().unwrap(), Some(&0));
349 /// // No broadcasts arrived, so None — the actor's value (5) is never seen
350 /// assert_eq!(cache.try_recv().unwrap(), None);
351 /// # }
352 /// ```
353 pub fn try_recv(&mut self) -> Result<Option<&T>, CacheRecvError> {
354 if self.is_first_request() {
355 return Ok(Some(self.get_current()));
356 }
357
358 match self.rx.try_recv() {
359 Ok(val) => Ok(Some(self.store(val))),
360 Err(TryRecvError::Empty) => Ok(None),
361 Err(TryRecvError::Closed) => Err(CacheRecvError::Closed),
362 Err(TryRecvError::Lagged(nr)) => Err(CacheRecvError::Lagged(nr)),
363 }
364 }
365
366 /// Blocking version of [`recv`](Self::recv). Receives the next broadcasted value (FIFO).
367 /// Must not be called from an async context.
368 ///
369 /// On the first call, returns the current cached value immediately, even if the channel is
370 /// closed. On subsequent calls, blocks until an update is available.
371 ///
372 /// Note: when the cache is initialized with a default value (e.g. via
373 /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
374 /// the first call may return the default while the actor holds a different value.
375 ///
376 /// # Errors
377 ///
378 /// Returns [`CacheRecvError::Closed`] if the actor is dropped, or
379 /// [`CacheRecvError::Lagged`] if the cache fell behind and messages were dropped.
380 ///
381 /// # Examples
382 ///
383 /// ```
384 /// # use actify::Handle;
385 /// # #[tokio::main]
386 /// # async fn main() {
387 /// let handle = Handle::new(1);
388 /// let mut cache = handle.create_cache().await;
389 /// handle.set(2).await;
390 ///
391 /// std::thread::spawn(move || {
392 /// // First call returns the initialized value immediately
393 /// assert_eq!(cache.blocking_recv().unwrap(), &1);
394 /// // Subsequent call receives the update
395 /// assert_eq!(cache.blocking_recv().unwrap(), &2);
396 /// }).join().unwrap();
397 /// # }
398 /// ```
399 pub fn blocking_recv(&mut self) -> Result<&T, CacheRecvError> {
400 if self.is_first_request() {
401 return Ok(self.get_current());
402 }
403
404 let val = self.rx.blocking_recv()?;
405 Ok(self.store(val))
406 }
407
408 /// Blocking version of [`recv_newest`](Self::recv_newest). Receives the newest broadcasted
409 /// value, discarding any older messages. Must not be called from an async context.
410 ///
411 /// On the first call, returns the newest available value immediately, even if the channel is
412 /// closed. On subsequent calls, blocks until an update is available.
413 ///
414 /// Note: when the cache is initialized with a default value (e.g. via
415 /// [`create_cache_from_default`](crate::Handle::create_cache_from_default)),
416 /// the first call may return the default while the actor holds a different value.
417 ///
418 /// # Errors
419 ///
420 /// Returns [`CacheRecvNewestError::Closed`] if the actor is dropped (after the first call).
421 ///
422 /// # Examples
423 ///
424 /// ```
425 /// # use actify::Handle;
426 /// # #[tokio::main]
427 /// # async fn main() {
428 /// let handle = Handle::new(1);
429 /// let mut cache = handle.create_cache().await;
430 /// handle.set(2).await;
431 /// handle.set(3).await;
432 ///
433 /// std::thread::spawn(move || {
434 /// // First call skips to the newest available value
435 /// assert_eq!(cache.blocking_recv_newest().unwrap(), &3);
436 /// }).join().unwrap();
437 /// # }
438 /// ```
439 pub fn blocking_recv_newest(&mut self) -> Result<&T, CacheRecvNewestError> {
440 if self.is_first_request() {
441 return Ok(self.get_newest());
442 }
443
444 loop {
445 match self.rx.blocking_recv() {
446 Ok(val) => {
447 self.inner = val;
448 if self.rx.is_empty() {
449 return Ok(&self.inner);
450 }
451 }
452 Err(RecvError::Closed) => return Err(CacheRecvNewestError::Closed),
453 Err(RecvError::Lagged(nr)) => log_lag::<T>(nr),
454 }
455 }
456 }
457
458 /// Spawns a [`Throttle`] that fires given a specified [`Frequency`], given any broadcasted updates by the actor.
459 /// Does not first update the cache to the newest value, since then the user of the cache might miss the update.
460 /// See [`Handle::spawn_throttle`](crate::Handle::spawn_throttle) for an example.
461 pub fn spawn_throttle<C, F>(&self, client: C, call: fn(&C, F), freq: Frequency)
462 where
463 C: Send + Sync + 'static,
464 T: Throttled<F>,
465 F: Clone + Send + Sync + 'static,
466 {
467 let current = self.inner.clone();
468 let receiver = self.rx.resubscribe();
469 Throttle::spawn_from_receiver(client, call, freq, receiver, Some(current));
470 }
471}
472
473fn log_lag<T>(nr: u64) {
474 log::debug!(
475 "Cache of actor type {} lagged {nr:?} messages",
476 std::any::type_name::<T>()
477 );
478}
479
480/// Error returned by [`Cache::recv`] and [`Cache::try_recv`].
481#[derive(Error, Debug, PartialEq, Clone)]
482pub enum CacheRecvError {
483 #[error("Cache channel closed")]
484 Closed,
485 #[error("Cache channel lagged by {0}")]
486 Lagged(u64),
487}
488
489impl From<RecvError> for CacheRecvError {
490 fn from(err: RecvError) -> Self {
491 match err {
492 RecvError::Closed => CacheRecvError::Closed,
493 RecvError::Lagged(nr) => CacheRecvError::Lagged(nr),
494 }
495 }
496}
497
498/// Error returned by [`Cache::recv_newest`] and [`Cache::try_recv_newest`].
499#[derive(Error, Debug, PartialEq, Clone)]
500pub enum CacheRecvNewestError {
501 #[error("Cache channel closed")]
502 Closed,
503}
504
// Unit tests for `Cache`. All tests run on a tokio runtime with paused time
// (`start_paused = true`), so the `sleep` calls below do not wait in real time.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Handle;
    use tokio::time::{Duration, sleep};

    /// After the first request, `recv` must wait until the actor broadcasts an update.
    #[tokio::test(start_paused = true)]
    async fn test_recv_waits_for_update() {
        let handle = Handle::new(2);
        let mut cache = handle.create_cache().await;

        assert_eq!(cache.recv().await.unwrap(), &2); // First call returns immediately

        // Race the pending `recv` against a branch that sets the value after 200 ms and then
        // sleeps again; `recv` has to finish before that branch does.
        tokio::select! {
            _ = async {
                sleep(Duration::from_millis(200)).await;
                handle.set(10).await;
                sleep(Duration::from_millis(200)).await;
            } => panic!("Timeout"),
            res = cache.recv() => assert_eq!(res.unwrap(), &10)
        };
    }

    /// `recv` hands out buffered updates one at a time, oldest first.
    #[tokio::test(start_paused = true)]
    async fn test_recv_fifo_ordering() {
        let handle = Handle::new(0);
        let mut cache = handle.create_cache().await;

        assert_eq!(cache.recv().await.unwrap(), &0); // First call

        handle.set(1).await;
        handle.set(2).await;
        handle.set(3).await;

        assert_eq!(cache.recv().await.unwrap(), &1);
        assert_eq!(cache.recv().await.unwrap(), &2);
        assert_eq!(cache.recv().await.unwrap(), &3);
    }

    /// `recv_newest` jumps straight to the latest buffered update.
    #[tokio::test(start_paused = true)]
    async fn test_recv_newest_skips_intermediate() {
        let handle = Handle::new(0);
        let mut cache = handle.create_cache().await;

        assert_eq!(cache.recv_newest().await.unwrap(), &0); // First call

        handle.set(1).await;
        handle.set(2).await;
        handle.set(3).await;
        sleep(Duration::from_millis(1)).await; // Let broadcasts arrive

        assert_eq!(cache.recv_newest().await.unwrap(), &3); // Skips 1 and 2
    }

    /// `try_recv` yields `None` when nothing is buffered, both before and after an update.
    #[tokio::test(start_paused = true)]
    async fn test_try_recv_returns_none_when_empty() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;

        assert_eq!(cache.try_recv().unwrap(), Some(&1)); // First call
        assert_eq!(cache.try_recv().unwrap(), None); // No updates

        handle.set(2).await;
        assert_eq!(cache.try_recv().unwrap(), Some(&2));
        assert_eq!(cache.try_recv().unwrap(), None);
    }

    /// The first `try_recv_newest` already picks up a pending update; the next call is empty.
    #[tokio::test(start_paused = true)]
    async fn test_try_recv_newest_returns_none_when_empty() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;

        handle.set(2).await;
        assert_eq!(cache.try_recv_newest().unwrap(), Some(&2)); // First call
        assert_eq!(cache.try_recv_newest().unwrap(), None);
    }

    /// `set_if_changed` with an unchanged value produces no update visible to the cache,
    /// while a changed value does.
    #[tokio::test(start_paused = true)]
    async fn test_try_set_if_changed() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        assert_eq!(cache.try_recv_newest().unwrap(), Some(&1));
        handle.set_if_changed(1).await;
        assert!(cache.try_recv_newest().unwrap().is_none());
        handle.set_if_changed(2).await;
        assert_eq!(cache.try_recv_newest().unwrap(), Some(&2));
    }

    /// `get_current` never consumes pending updates.
    #[tokio::test(start_paused = true)]
    async fn test_get_current_does_not_sync() {
        let handle = Handle::new(1);
        let cache = handle.create_cache().await;

        handle.set(99).await;
        assert_eq!(cache.get_current(), &1); // Still the old value
    }

    /// `get_newest` drains pending updates and returns the latest one.
    #[tokio::test(start_paused = true)]
    async fn test_get_newest_syncs() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;

        handle.set(2).await;
        handle.set(3).await;
        assert_eq!(cache.get_newest(), &3);
    }

    /// `has_updates` flips to `true` once the actor has broadcast a value.
    #[tokio::test(start_paused = true)]
    async fn test_has_updates() {
        let handle = Handle::new(1);
        let cache = handle.create_cache().await;

        assert!(!cache.has_updates());
        handle.set(2).await;
        assert!(cache.has_updates());
    }

    /// A cache created from the default value ignores the actor's value until a broadcast.
    #[tokio::test(start_paused = true)]
    async fn test_create_cache_from_default() {
        let handle = Handle::new(42);
        let mut cache = handle.create_cache_from_default();

        // Starts from default, not the actor's value
        assert_eq!(cache.get_current(), &0);
        assert_eq!(cache.try_recv().unwrap(), Some(&0)); // First call returns default

        // Only sees actor value after a broadcast
        handle.set(99).await;
        assert_eq!(cache.try_recv().unwrap(), Some(&99));
    }

    /// Once the handle is dropped, every receive method reports `Closed`.
    #[tokio::test(start_paused = true)]
    async fn test_closed_channel() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        cache.recv().await.unwrap(); // Consume first request

        drop(handle);
        assert_eq!(cache.recv().await, Err(CacheRecvError::Closed));
        assert_eq!(cache.recv_newest().await, Err(CacheRecvNewestError::Closed));
        assert_eq!(cache.try_recv(), Err(CacheRecvError::Closed));
        assert_eq!(cache.try_recv_newest(), Err(CacheRecvNewestError::Closed));
    }

    /// `blocking_recv` returns the cached value first and then the buffered update.
    #[tokio::test(start_paused = true)]
    async fn test_blocking_recv() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        handle.set(2).await;

        // The blocking calls run on a plain OS thread; the update is already buffered,
        // so neither call has to wait.
        std::thread::spawn(move || {
            assert_eq!(cache.blocking_recv().unwrap(), &1);
            assert_eq!(cache.blocking_recv().unwrap(), &2);
        })
        .join()
        .unwrap();
    }

    /// The first `blocking_recv_newest` drains straight to the latest buffered update.
    #[tokio::test(start_paused = true)]
    async fn test_blocking_recv_newest() {
        let handle = Handle::new(1);
        let mut cache = handle.create_cache().await;
        handle.set(2).await;
        handle.set(3).await;

        std::thread::spawn(move || {
            // First call drains to newest
            assert_eq!(cache.blocking_recv_newest().unwrap(), &3);
        })
        .join()
        .unwrap();
    }
}