gateway_common/utils/
async_map.rs1use std::collections::HashMap;
2use std::hash::Hash;
3use std::marker::PhantomData;
4use tokio::sync::mpsc::{self, UnboundedSender};
5use tokio::sync::oneshot;
6
7use crate::error::BoxError;
8
9enum CacheSender<K, V> {
10 Get(K),
11 Insert((K, V)),
12 Remove(K),
13}
14
15enum CacheReceiver<V> {
16 Get(Option<V>),
17 Insert(Option<V>),
18 Remove(Option<V>),
19}
20type AsyncMapSender<K, V> =
21 UnboundedSender<(CacheSender<K, V>, oneshot::Sender<CacheReceiver<V>>)>;
22
23#[derive(Clone)]
24pub struct AsyncMap<K, V> {
25 _k: PhantomData<K>,
26 _v: PhantomData<V>,
27 sender: AsyncMapSender<K, V>,
28}
29
30impl<K, V> Default for AsyncMap<K, V>
31where
32 K: Hash + Eq + std::marker::Send + Sync + 'static,
33 V: std::marker::Send + Sync + 'static + Clone,
34{
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl<K, V> AsyncMap<K, V>
41where
42 K: Hash + Eq + std::marker::Send + Sync + 'static,
43 V: std::marker::Send + Sync + 'static + Clone,
44{
45 pub fn new() -> Self {
46 let (sender, mut receiver) =
47 mpsc::unbounded_channel::<(CacheSender<K, V>, oneshot::Sender<CacheReceiver<V>>)>();
48 tokio::spawn(async move {
49 let mut map = HashMap::<K, V>::new();
50 while let Some(msg) = receiver.recv().await {
51 match msg.0 {
52 CacheSender::Get(key) => {
53 let _ = msg.1.send(CacheReceiver::Get(map.get(&key).cloned()));
54 }
55 CacheSender::Insert((key, value)) => {
56 let value = map.insert(key, value);
57 let _ = msg.1.send(CacheReceiver::Insert(value));
58 }
59 CacheSender::Remove(key) => {
60 let value = map.remove(&key);
61 let _ = msg.1.send(CacheReceiver::Remove(value));
62 }
63 }
64 }
65 });
66 Self {
67 _k: PhantomData,
68 _v: PhantomData,
69 sender,
70 }
71 }
72
73 pub async fn get(&self, key: K) -> Result<Option<V>, BoxError> {
74 let oneshot = oneshot::channel();
75 let _ = self.sender.send((CacheSender::Get(key), oneshot.0));
76 match oneshot.1.await? {
77 CacheReceiver::Get(value) => Ok(value),
78 _ => Err("err receiver".into()),
79 }
80 }
81
82 pub async fn insert(&self, key: K, value: V) -> Result<Option<V>, BoxError> {
83 let oneshot = oneshot::channel();
84 let _ = self
85 .sender
86 .send((CacheSender::Insert((key, value)), oneshot.0));
87 match oneshot.1.await? {
88 CacheReceiver::Insert(value) => Ok(value),
89 _ => Err("err receiver".into()),
90 }
91 }
92
93 pub async fn remove(&self, key: K) -> Result<Option<V>, BoxError> {
94 let oneshot = oneshot::channel();
95 let _ = self.sender.send((CacheSender::Remove(key), oneshot.0));
96 match oneshot.1.await? {
97 CacheReceiver::Remove(value) => Ok(value),
98 _ => Err("err receiver".into()),
99 }
100 }
101}
102
103
104
105