gateway_common/utils/
async_map.rs

1use std::collections::HashMap;
2use std::hash::Hash;
3use std::marker::PhantomData;
4use tokio::sync::mpsc::{self, UnboundedSender};
5use tokio::sync::oneshot;
6
7use crate::error::BoxError;
8
9enum CacheSender<K, V> {
10    Get(K),
11    Insert((K, V)),
12    Remove(K),
13}
14
15enum CacheReceiver<V> {
16    Get(Option<V>),
17    Insert(Option<V>),
18    Remove(Option<V>),
19}
20type AsyncMapSender<K, V> =
21    UnboundedSender<(CacheSender<K, V>, oneshot::Sender<CacheReceiver<V>>)>;
22
23#[derive(Clone)]
24pub struct AsyncMap<K, V> {
25    _k: PhantomData<K>,
26    _v: PhantomData<V>,
27    sender: AsyncMapSender<K, V>,
28}
29
30impl<K, V> Default for AsyncMap<K, V>
31where
32    K: Hash + Eq + std::marker::Send + Sync + 'static,
33    V: std::marker::Send + Sync + 'static + Clone,
34{
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl<K, V> AsyncMap<K, V>
41where
42    K: Hash + Eq + std::marker::Send + Sync + 'static,
43    V: std::marker::Send + Sync + 'static + Clone,
44{
45    pub fn new() -> Self {
46        let (sender, mut receiver) =
47            mpsc::unbounded_channel::<(CacheSender<K, V>, oneshot::Sender<CacheReceiver<V>>)>();
48        tokio::spawn(async move {
49            let mut map = HashMap::<K, V>::new();
50            while let Some(msg) = receiver.recv().await {
51                match msg.0 {
52                    CacheSender::Get(key) => {
53                        let _ = msg.1.send(CacheReceiver::Get(map.get(&key).cloned()));
54                    }
55                    CacheSender::Insert((key, value)) => {
56                        let value = map.insert(key, value);
57                        let _ = msg.1.send(CacheReceiver::Insert(value));
58                    }
59                    CacheSender::Remove(key) => {
60                        let value = map.remove(&key);
61                        let _ = msg.1.send(CacheReceiver::Remove(value));
62                    }
63                }
64            }
65        });
66        Self {
67            _k: PhantomData,
68            _v: PhantomData,
69            sender,
70        }
71    }
72
73    pub async fn get(&self, key: K) -> Result<Option<V>, BoxError> {
74        let oneshot = oneshot::channel();
75        let _ = self.sender.send((CacheSender::Get(key), oneshot.0));
76        match oneshot.1.await? {
77            CacheReceiver::Get(value) => Ok(value),
78            _ => Err("err receiver".into()),
79        }
80    }
81
82    pub async fn insert(&self, key: K, value: V) -> Result<Option<V>, BoxError> {
83        let oneshot = oneshot::channel();
84        let _ = self
85            .sender
86            .send((CacheSender::Insert((key, value)), oneshot.0));
87        match oneshot.1.await? {
88            CacheReceiver::Insert(value) => Ok(value),
89            _ => Err("err receiver".into()),
90        }
91    }
92
93    pub async fn remove(&self, key: K) -> Result<Option<V>, BoxError> {
94        let oneshot = oneshot::channel();
95        let _ = self.sender.send((CacheSender::Remove(key), oneshot.0));
96        match oneshot.1.await? {
97            CacheReceiver::Remove(value) => Ok(value),
98            _ => Err("err receiver".into()),
99        }
100    }
101}
102
103
104
105