fusen_common/utils/
cache.rs

1use tokio::sync::mpsc::{self, UnboundedSender};
2use tokio::sync::oneshot;
3
4enum CacheSender<V> {
5    Get,
6    Insert(V),
7    Clean,
8}
9
10enum CacheReceiver<V> {
11    Get(Option<V>),
12    Insert(Option<V>),
13    Clean(Option<V>),
14}
15type AsyncCacheSender<V> = UnboundedSender<(CacheSender<V>, oneshot::Sender<CacheReceiver<V>>)>;
16
17#[derive(Clone)]
18pub struct AsyncCache<V> {
19    sender: AsyncCacheSender<V>,
20}
21
22impl<V> Default for AsyncCache<V>
23where
24    V: std::marker::Send + Sync + 'static + Clone,
25{
26    fn default() -> Self {
27        Self::new()
28    }
29}
30
31impl<V> AsyncCache<V>
32where
33    V: std::marker::Send + Sync + 'static + Clone,
34{
35    pub fn new() -> Self {
36        let (sender, mut receiver) =
37            mpsc::unbounded_channel::<(CacheSender<V>, oneshot::Sender<CacheReceiver<V>>)>();
38        tokio::spawn(async move {
39            let mut cache = None;
40            while let Some(msg) = receiver.recv().await {
41                match msg.0 {
42                    CacheSender::Get => {
43                        let _ = msg.1.send(CacheReceiver::Get(cache.as_ref().cloned()));
44                    }
45                    CacheSender::Insert(value) => {
46                        let old_value = cache.take();
47                        let _ = cache.insert(value);
48                        let _ = msg.1.send(CacheReceiver::Insert(old_value));
49                    }
50                    CacheSender::Clean => {
51                        let old_value = cache.take();
52                        let _ = msg.1.send(CacheReceiver::Clean(old_value));
53                    }
54                }
55            }
56        });
57        Self { sender }
58    }
59
60    pub async fn get(&self) -> Option<V> {
61        let oneshot = oneshot::channel();
62        let _ = self.sender.send((CacheSender::Get, oneshot.0));
63        match oneshot.1.await.unwrap() {
64            CacheReceiver::Get(value) => value,
65            _ => panic!("err receiver"),
66        }
67    }
68
69    pub async fn insert(&self, value: V) -> Option<V> {
70        let oneshot = oneshot::channel();
71        let _ = self.sender.send((CacheSender::Insert(value), oneshot.0));
72        match oneshot.1.await.unwrap() {
73            CacheReceiver::Insert(value) => value,
74            _ => panic!("err receiver"),
75        }
76    }
77
78    pub async fn remove(&self) -> Option<V> {
79        let oneshot = oneshot::channel();
80        let _ = self.sender.send((CacheSender::Clean, oneshot.0));
81        match oneshot.1.await.unwrap() {
82            CacheReceiver::Clean(value) => value,
83            _ => panic!("err receiver"),
84        }
85    }
86}