fusen_common/utils/
map.rs

1use std::collections::HashMap;
2use std::hash::Hash;
3use tokio::sync::mpsc::{self, UnboundedSender};
4use tokio::sync::oneshot;
5
/// Request sent from an `AsyncMap` handle to the background task that owns
/// the underlying `HashMap`.
enum CacheSender<K, V> {
    /// Look up a key; the task replies with a clone of the stored value, if any.
    Get(K),
    /// Insert a key/value pair; the task replies with the value previously
    /// stored under that key, if any.
    Insert((K, V)),
    /// Remove a key; the task replies with the removed value, if any.
    Remove(K),
    /// Ask for a clone of the entire map.
    Clone,
}
12
/// Reply sent by the background task; each variant answers the
/// `CacheSender` variant of the same name.
enum CacheReceiver<K, V> {
    /// Clone of the value found for the requested key, or `None`.
    Get(Option<V>),
    /// Value that was replaced by the insert, or `None` if the key was new.
    Insert(Option<V>),
    /// Value that was removed, or `None` if the key was absent.
    Remove(Option<V>),
    /// Clone of the whole map taken when the request was processed.
    Clone(HashMap<K, V>),
}
/// Sending half of the unbounded request channel: every message pairs a
/// request with the oneshot sender on which the reply must be delivered.
type AsyncMapSender<K, V> =
    UnboundedSender<(CacheSender<K, V>, oneshot::Sender<CacheReceiver<K, V>>)>;
21
/// Handle to a `HashMap` that lives inside a spawned Tokio task and is
/// accessed by exchanging messages with that task.
///
/// Cloning the handle clones only the channel sender, so all clones talk to
/// the same task and therefore see the same map.
#[derive(Clone)]
pub struct AsyncMap<K, V> {
    /// Channel used to submit requests to the task that owns the map.
    sender: AsyncMapSender<K, V>,
}
26
27impl<K, V> Default for AsyncMap<K, V>
28where
29    K: Hash + Eq + std::marker::Send + Sync + 'static + Clone,
30    V: std::marker::Send + Sync + 'static + Clone,
31{
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl<K, V> AsyncMap<K, V>
38where
39    K: Hash + Eq + std::marker::Send + Sync + 'static + Clone,
40    V: std::marker::Send + Sync + 'static + Clone,
41{
42    pub fn new() -> Self {
43        let (sender, mut receiver) =
44            mpsc::unbounded_channel::<(CacheSender<K, V>, oneshot::Sender<CacheReceiver<K, V>>)>();
45        tokio::spawn(async move {
46            let mut map = HashMap::<K, V>::new();
47            while let Some(msg) = receiver.recv().await {
48                match msg.0 {
49                    CacheSender::Get(key) => {
50                        let _ = msg.1.send(CacheReceiver::Get(map.get(&key).cloned()));
51                    }
52                    CacheSender::Insert((key, value)) => {
53                        let value = map.insert(key, value);
54                        let _ = msg.1.send(CacheReceiver::Insert(value));
55                    }
56                    CacheSender::Remove(key) => {
57                        let value = map.remove(&key);
58                        let _ = msg.1.send(CacheReceiver::Remove(value));
59                    }
60                    CacheSender::Clone => {
61                        let value = map.clone();
62                        let _ = msg.1.send(CacheReceiver::Clone(value));
63                    }
64                }
65            }
66        });
67        Self { sender }
68    }
69
70    pub async fn get(&self, key: K) -> Option<V> {
71        let oneshot = oneshot::channel();
72        let _ = self.sender.send((CacheSender::Get(key), oneshot.0));
73        match oneshot.1.await.unwrap() {
74            CacheReceiver::Get(value) => value,
75            _ => panic!("err receiver"),
76        }
77    }
78
79    pub async fn insert(&self, key: K, value: V) -> Option<V> {
80        let oneshot = oneshot::channel();
81        let _ = self
82            .sender
83            .send((CacheSender::Insert((key, value)), oneshot.0));
84        match oneshot.1.await.unwrap() {
85            CacheReceiver::Insert(value) => value,
86            _ => panic!("err receiver"),
87        }
88    }
89
90    pub async fn remove(&self, key: K) -> Option<V> {
91        let oneshot = oneshot::channel();
92        let _ = self.sender.send((CacheSender::Remove(key), oneshot.0));
93        match oneshot.1.await.unwrap() {
94            CacheReceiver::Remove(value) => value,
95            _ => panic!("err receiver"),
96        }
97    }
98
99    pub async fn map_clone(&self) -> HashMap<K, V> {
100        let oneshot = oneshot::channel();
101        let _ = self.sender.send((CacheSender::Clone, oneshot.0));
102        match oneshot.1.await.unwrap() {
103            CacheReceiver::Clone(value) => value,
104            _ => panic!("err receiver"),
105        }
106    }
107}