fusen_common/utils/
map.rs1use std::collections::HashMap;
2use std::hash::Hash;
3use tokio::sync::mpsc::{self, UnboundedSender};
4use tokio::sync::oneshot;
5
/// Command sent from an `AsyncMap` handle to the task that owns the map.
enum CacheSender<K, V> {
    /// Look up the value stored under the key.
    Get(K),
    /// Store the `(key, value)` pair.
    Insert((K, V)),
    /// Delete the entry stored under the key.
    Remove(K),
    /// Request a copy of the whole map.
    Clone,
}
12
/// Reply sent back by the task that owns the map, one variant per command.
enum CacheReceiver<K, V> {
    /// Result of a lookup: a clone of the stored value, if any.
    Get(Option<V>),
    /// Result of an insert: the value previously stored under the key, if any.
    Insert(Option<V>),
    /// Result of a removal: the value that was removed, if any.
    Remove(Option<V>),
    /// A copy of the whole map.
    Clone(HashMap<K, V>),
}
19type AsyncMapSender<K, V> =
20 UnboundedSender<(CacheSender<K, V>, oneshot::Sender<CacheReceiver<K, V>>)>;
21
22#[derive(Clone)]
23pub struct AsyncMap<K, V> {
24 sender: AsyncMapSender<K, V>,
25}
26
27impl<K, V> Default for AsyncMap<K, V>
28where
29 K: Hash + Eq + std::marker::Send + Sync + 'static + Clone,
30 V: std::marker::Send + Sync + 'static + Clone,
31{
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl<K, V> AsyncMap<K, V>
38where
39 K: Hash + Eq + std::marker::Send + Sync + 'static + Clone,
40 V: std::marker::Send + Sync + 'static + Clone,
41{
42 pub fn new() -> Self {
43 let (sender, mut receiver) =
44 mpsc::unbounded_channel::<(CacheSender<K, V>, oneshot::Sender<CacheReceiver<K, V>>)>();
45 tokio::spawn(async move {
46 let mut map = HashMap::<K, V>::new();
47 while let Some(msg) = receiver.recv().await {
48 match msg.0 {
49 CacheSender::Get(key) => {
50 let _ = msg.1.send(CacheReceiver::Get(map.get(&key).cloned()));
51 }
52 CacheSender::Insert((key, value)) => {
53 let value = map.insert(key, value);
54 let _ = msg.1.send(CacheReceiver::Insert(value));
55 }
56 CacheSender::Remove(key) => {
57 let value = map.remove(&key);
58 let _ = msg.1.send(CacheReceiver::Remove(value));
59 }
60 CacheSender::Clone => {
61 let value = map.clone();
62 let _ = msg.1.send(CacheReceiver::Clone(value));
63 }
64 }
65 }
66 });
67 Self { sender }
68 }
69
70 pub async fn get(&self, key: K) -> Option<V> {
71 let oneshot = oneshot::channel();
72 let _ = self.sender.send((CacheSender::Get(key), oneshot.0));
73 match oneshot.1.await.unwrap() {
74 CacheReceiver::Get(value) => value,
75 _ => panic!("err receiver"),
76 }
77 }
78
79 pub async fn insert(&self, key: K, value: V) -> Option<V> {
80 let oneshot = oneshot::channel();
81 let _ = self
82 .sender
83 .send((CacheSender::Insert((key, value)), oneshot.0));
84 match oneshot.1.await.unwrap() {
85 CacheReceiver::Insert(value) => value,
86 _ => panic!("err receiver"),
87 }
88 }
89
90 pub async fn remove(&self, key: K) -> Option<V> {
91 let oneshot = oneshot::channel();
92 let _ = self.sender.send((CacheSender::Remove(key), oneshot.0));
93 match oneshot.1.await.unwrap() {
94 CacheReceiver::Remove(value) => value,
95 _ => panic!("err receiver"),
96 }
97 }
98
99 pub async fn map_clone(&self) -> HashMap<K, V> {
100 let oneshot = oneshot::channel();
101 let _ = self.sender.send((CacheSender::Clone, oneshot.0));
102 match oneshot.1.await.unwrap() {
103 CacheReceiver::Clone(value) => value,
104 _ => panic!("err receiver"),
105 }
106 }
107}