fusen_common/utils/
cache.rs1use tokio::sync::mpsc::{self, UnboundedSender};
2use tokio::sync::oneshot;
3
4enum CacheSender<V> {
5 Get,
6 Insert(V),
7 Clean,
8}
9
10enum CacheReceiver<V> {
11 Get(Option<V>),
12 Insert(Option<V>),
13 Clean(Option<V>),
14}
15type AsyncCacheSender<V> = UnboundedSender<(CacheSender<V>, oneshot::Sender<CacheReceiver<V>>)>;
16
17#[derive(Clone)]
18pub struct AsyncCache<V> {
19 sender: AsyncCacheSender<V>,
20}
21
22impl<V> Default for AsyncCache<V>
23where
24 V: std::marker::Send + Sync + 'static + Clone,
25{
26 fn default() -> Self {
27 Self::new()
28 }
29}
30
31impl<V> AsyncCache<V>
32where
33 V: std::marker::Send + Sync + 'static + Clone,
34{
35 pub fn new() -> Self {
36 let (sender, mut receiver) =
37 mpsc::unbounded_channel::<(CacheSender<V>, oneshot::Sender<CacheReceiver<V>>)>();
38 tokio::spawn(async move {
39 let mut cache = None;
40 while let Some(msg) = receiver.recv().await {
41 match msg.0 {
42 CacheSender::Get => {
43 let _ = msg.1.send(CacheReceiver::Get(cache.as_ref().cloned()));
44 }
45 CacheSender::Insert(value) => {
46 let old_value = cache.take();
47 let _ = cache.insert(value);
48 let _ = msg.1.send(CacheReceiver::Insert(old_value));
49 }
50 CacheSender::Clean => {
51 let old_value = cache.take();
52 let _ = msg.1.send(CacheReceiver::Clean(old_value));
53 }
54 }
55 }
56 });
57 Self { sender }
58 }
59
60 pub async fn get(&self) -> Option<V> {
61 let oneshot = oneshot::channel();
62 let _ = self.sender.send((CacheSender::Get, oneshot.0));
63 match oneshot.1.await.unwrap() {
64 CacheReceiver::Get(value) => value,
65 _ => panic!("err receiver"),
66 }
67 }
68
69 pub async fn insert(&self, value: V) -> Option<V> {
70 let oneshot = oneshot::channel();
71 let _ = self.sender.send((CacheSender::Insert(value), oneshot.0));
72 match oneshot.1.await.unwrap() {
73 CacheReceiver::Insert(value) => value,
74 _ => panic!("err receiver"),
75 }
76 }
77
78 pub async fn remove(&self) -> Option<V> {
79 let oneshot = oneshot::channel();
80 let _ = self.sender.send((CacheSender::Clean, oneshot.0));
81 match oneshot.1.await.unwrap() {
82 CacheReceiver::Clean(value) => value,
83 _ => panic!("err receiver"),
84 }
85 }
86}