1use std::fmt::Debug;
2use thiserror::Error;
3use tokio::sync::broadcast::{
4 self, Receiver,
5 error::{RecvError, TryRecvError},
6};
7
8use crate::{Frequency, Throttle, Throttled};
9
10#[derive(Debug)]
12pub struct Cache<T> {
13 inner: T,
14 rx: broadcast::Receiver<T>,
15 first_request: bool,
16}
17
18impl<T> Clone for Cache<T>
19where
20 T: Clone + Send + Sync + 'static,
21{
22 fn clone(&self) -> Self {
23 Cache {
24 inner: self.inner.clone(),
25 rx: self.rx.resubscribe(),
26 first_request: self.first_request,
27 }
28 }
29}
30
31impl<T> Cache<T>
32where
33 T: Clone + Send + Sync + 'static,
34{
35 pub(crate) fn new(rx: Receiver<T>, initial_value: T) -> Self {
36 Self {
37 inner: initial_value,
38 rx,
39 first_request: true,
40 }
41 }
42
43 pub fn has_updates(&self) -> bool {
45 !self.rx.is_empty()
46 }
47
48 pub fn get_newest(&mut self) -> &T {
51 _ = self.try_recv_newest(); self.get_current()
53 }
54
55 pub fn get_current(&self) -> &T {
57 &self.inner
58 }
59
60 pub async fn recv_newest(&mut self) -> Result<&T, CacheRecvNewestError> {
65 if self.first_request {
67 self.first_request = false;
68 self.try_recv_newest()?; return Ok(self.get_current());
70 }
71
72 loop {
73 match self.rx.recv().await {
74 Ok(val) => {
75 self.inner = val;
76
77 if !self.rx.is_empty() {
79 continue;
80 }
81 return Ok(self.get_current());
82 }
83 Err(e) => match e {
84 RecvError::Closed => return Err(CacheRecvNewestError::Closed),
85 RecvError::Lagged(nr) => log::debug!(
86 "Cache of actor type {} lagged {nr:?} messages",
87 std::any::type_name::<T>()
88 ),
89 },
90 }
91 }
92 }
93
94 pub async fn recv(&mut self) -> Result<&T, CacheRecvError> {
99 if self.first_request {
101 self.first_request = false;
102 return Ok(self.get_current());
103 }
104
105 match self.rx.recv().await {
106 Ok(val) => {
107 self.inner = val;
108 Ok(self.get_current())
109 }
110 Err(e) => match e {
111 RecvError::Closed => Err(CacheRecvError::Closed),
112 RecvError::Lagged(nr) => Err(CacheRecvError::Lagged(nr)),
113 },
114 }
115 }
116
117 pub fn try_recv_newest(&mut self) -> Result<Option<&T>, CacheRecvNewestError> {
122 loop {
123 match self.rx.try_recv() {
124 Ok(val) => {
125 self.first_request = false;
126 self.inner = val;
127
128 if !self.rx.is_empty() {
130 continue;
131 }
132 return Ok(Some(self.get_current()));
133 }
134 Err(e) => match e {
135 TryRecvError::Closed => return Err(CacheRecvNewestError::Closed),
136 TryRecvError::Empty => {
137 if self.first_request {
140 self.first_request = false;
141 return Ok(Some(self.get_current()));
142 } else {
143 return Ok(None);
144 }
145 }
146 TryRecvError::Lagged(nr) => log::debug!(
147 "Cache of actor type {} lagged {nr:?} messages",
148 std::any::type_name::<T>()
149 ),
150 },
151 }
152 }
153 }
154
155 pub fn try_recv(&mut self) -> Result<Option<&T>, CacheRecvError> {
160 if self.first_request {
162 self.first_request = false;
163 return Ok(Some(self.get_current()));
164 }
165
166 match self.rx.try_recv() {
167 Ok(val) => {
168 self.inner = val;
169 Ok(Some(self.get_current()))
170 }
171 Err(e) => match e {
172 TryRecvError::Closed => Err(CacheRecvError::Closed),
173 TryRecvError::Empty => Ok(None),
174 TryRecvError::Lagged(nr) => Err(CacheRecvError::Lagged(nr)),
175 },
176 }
177 }
178
179 pub fn spawn_throttle<C, F>(&self, client: C, call: fn(&C, F), freq: Frequency)
182 where
183 C: Send + Sync + 'static,
184 T: Throttled<F>,
185 F: Clone + Send + Sync + 'static,
186 {
187 let current = self.inner.clone();
188 let receiver = self.rx.resubscribe();
189 Throttle::spawn_from_receiver(client, call, freq, receiver, Some(current));
190 }
191}
192
193#[derive(Error, Debug, PartialEq, Clone)]
194pub enum CacheRecvError {
195 #[error("Cache channel closed")]
196 Closed,
197 #[error("Cache channel lagged by {0}")]
198 Lagged(u64),
199}
200
201impl From<RecvError> for CacheRecvError {
202 fn from(err: RecvError) -> Self {
203 match err {
204 RecvError::Closed => CacheRecvError::Closed,
205 RecvError::Lagged(nr) => CacheRecvError::Lagged(nr),
206 }
207 }
208}
209
210#[derive(Error, Debug, PartialEq, Clone)]
211pub enum CacheRecvNewestError {
212 #[error("Cache channel closed")]
213 Closed,
214}
215
216#[cfg(test)]
217mod tests {
218 use crate::Handle;
219 use tokio::time::{Duration, sleep};
220
221 #[tokio::test]
222 async fn test_get_newest() {
223 let handle = Handle::new(1);
224 let mut cache = handle.create_cache_from_default();
225 assert_eq!(cache.get_newest(), &0); handle.set(2).await;
227 assert_eq!(cache.get_newest(), &2); }
229
230 #[tokio::test]
231 async fn test_has_updates() {
232 let handle = Handle::new(1);
233 let cache = handle.create_cache().await;
234 assert_eq!(cache.has_updates(), false);
235 handle.set(2).await;
236 assert!(cache.has_updates());
237 }
238
239 #[tokio::test]
240 async fn test_recv_cache() {
241 let handle = Handle::new(1);
242 let mut cache = handle.create_cache().await;
243 assert_eq!(cache.recv().await.unwrap(), &1);
244 handle.set(2).await;
245 handle.set(3).await; assert_eq!(cache.recv().await.unwrap(), &2)
247 }
248
249 #[tokio::test]
250 async fn test_recv_cache_newest() {
251 let handle = Handle::new(1);
252 let mut cache = handle.create_cache().await;
253 assert_eq!(cache.recv_newest().await.unwrap(), &1);
254 handle.set(2).await;
255 handle.set(3).await;
256 assert_eq!(cache.recv_newest().await.unwrap(), &3)
257 }
258
259 #[tokio::test]
260 async fn test_immediate_cache_return() {
261 let handle = Handle::new(1);
262 let mut cache = handle.create_cache().await;
263 handle.set(2).await; assert_eq!(cache.recv().await.unwrap(), &1)
265 }
266
267 #[tokio::test]
268 async fn test_immediate_cache_return_with_newest() {
269 let handle = Handle::new(1);
270 let mut cache = handle.create_cache().await;
271 handle.set(2).await;
272 assert_eq!(cache.recv_newest().await.unwrap(), &2)
273 }
274
275 #[tokio::test]
276 async fn test_delayed_cache_return() {
277 let handle = Handle::new(2);
278 let mut cache = handle.create_cache().await;
279
280 cache.recv().await.unwrap(); tokio::select! {
283 _ = async {
284 sleep(Duration::from_millis(200)).await;
285 handle.set(10).await;
286 sleep(Duration::from_millis(200)).await; } => panic!("Timeout"),
288 res = cache.recv() => assert_eq!(res.unwrap(), &10)
289 };
290 }
291
292 #[tokio::test]
293 async fn test_try_recv() {
294 let handle = Handle::new(2);
295 let mut cache = handle.create_cache().await;
296 assert_eq!(cache.try_recv().unwrap(), Some(&2));
297 assert!(cache.try_recv().unwrap().is_none())
298 }
299
300 #[tokio::test]
301 async fn test_try_recv_default() {
302 let handle = Handle::new(2);
303 let mut cache = handle.create_cache_from_default();
304 assert_eq!(cache.try_recv().unwrap(), Some(&0))
305 }
306
307 #[tokio::test]
308 async fn test_try_recv_newest() {
309 let handle = Handle::new(2);
310 let mut cache = handle.create_cache().await;
311 assert_eq!(cache.try_recv_newest().unwrap(), Some(&2)); assert!(cache.try_recv_newest().unwrap().is_none())
313 }
314
315 #[tokio::test]
316 async fn test_try_recv_newest_default() {
317 let handle = Handle::new(2);
318 let mut cache = handle.create_cache_from_default();
319 assert_eq!(cache.try_recv_newest().unwrap(), Some(&0)) }
321
322 #[tokio::test]
323 async fn test_try_recv_some() {
324 let handle = Handle::new(1);
325 let mut cache = handle.create_cache().await;
326 assert_eq!(cache.try_recv().unwrap(), Some(&1));
327 handle.set(2).await;
328 handle.set(3).await; assert_eq!(cache.try_recv().unwrap(), Some(&2))
330 }
331
332 #[tokio::test]
333 async fn test_try_recv_some_newest() {
334 let handle = Handle::new(1);
335 let mut cache = handle.create_cache().await;
336 assert_eq!(cache.try_recv_newest().unwrap(), Some(&1));
337 handle.set(2).await;
338 handle.set(3).await; assert_eq!(cache.try_recv_newest().unwrap(), Some(&3))
340 }
341
342 #[tokio::test]
343 async fn test_try_set_if_changed() {
344 let handle = Handle::new(1);
345 let mut cache = handle.create_cache().await;
346 assert_eq!(cache.try_recv_newest().unwrap(), Some(&1));
347 handle.set_if_changed(1).await;
348 assert!(cache.try_recv_newest().unwrap().is_none());
349 handle.set_if_changed(2).await;
350 assert_eq!(cache.try_recv_newest().unwrap(), Some(&2))
351 }
352}