// gateway_common/utils/async_cache.rs

use crate::error::BoxError;
use std::marker::PhantomData;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::oneshot;
5
/// Command sent from an [`AsyncCache`] handle to the worker task that owns
/// the cached value.
enum CacheSender<V> {
    /// Ask for a clone of the currently stored value.
    Get,
    /// Store the given value, replacing whatever was stored before.
    Insert(V),
    /// Remove the stored value, leaving the cache empty.
    Clean,
}
11
/// Reply sent by the worker task back to the caller, one variant per
/// [`CacheSender`] command.
enum CacheReceiver<V> {
    /// Reply to `Get`: a clone of the stored value, or `None` if empty.
    Get(Option<V>),
    /// Reply to `Insert`: the value that was stored before the insert, if any.
    Insert(Option<V>),
    /// Reply to `Clean`: the value that was removed, if any.
    Clean(Option<V>),
}
17type AsyncCacheSender<V> = UnboundedSender<(CacheSender<V>, oneshot::Sender<CacheReceiver<V>>)>;
18
19#[derive(Clone)]
20pub struct AsyncCache<V> {
21    _v: PhantomData<V>,
22    sender: AsyncCacheSender<V>,
23}
24
25impl<V> Default for AsyncCache<V>
26where
27    V: std::marker::Send + Sync + 'static + Clone,
28{
29    fn default() -> Self {
30        Self::new()
31    }
32}
33
34impl<V> AsyncCache<V>
35where
36    V: std::marker::Send + Sync + 'static + Clone,
37{
38    pub fn new() -> Self {
39        let (sender, mut receiver) =
40            mpsc::unbounded_channel::<(CacheSender<V>, oneshot::Sender<CacheReceiver<V>>)>();
41        tokio::spawn(async move {
42            let mut cache = None;
43            while let Some(msg) = receiver.recv().await {
44                match msg.0 {
45                    CacheSender::Get => {
46                        let _ = msg.1.send(CacheReceiver::Get(cache.as_ref().cloned()));
47                    }
48                    CacheSender::Insert(value) => {
49                        let old_value = cache.take();
50                        let _ = cache.insert(value);
51                        let _ = msg.1.send(CacheReceiver::Insert(old_value));
52                    }
53                    CacheSender::Clean => {
54                        let old_value = cache.take();
55                        let _ = msg.1.send(CacheReceiver::Clean(old_value));
56                    }
57                }
58            }
59        });
60        Self {
61            _v: PhantomData,
62            sender,
63        }
64    }
65
66    pub async fn get(&self) -> Result<Option<V>, BoxError> {
67        let oneshot = oneshot::channel();
68        let _ = self.sender.send((CacheSender::Get, oneshot.0));
69        match oneshot.1.await? {
70            CacheReceiver::Get(value) => Ok(value),
71            _ => Err("err receiver".into()),
72        }
73    }
74
75    pub async fn insert(&self, value: V) -> Result<Option<V>, BoxError> {
76        let oneshot = oneshot::channel();
77        let _ = self.sender.send((CacheSender::Insert(value), oneshot.0));
78        match oneshot.1.await? {
79            CacheReceiver::Insert(value) => Ok(value),
80            _ => Err("err receiver".into()),
81        }
82    }
83
84    pub async fn remove(&self) -> Result<Option<V>, BoxError> {
85        let oneshot = oneshot::channel();
86        let _ = self.sender.send((CacheSender::Clean, oneshot.0));
87        match oneshot.1.await? {
88            CacheReceiver::Clean(value) => Ok(value),
89            _ => Err("err receiver".into()),
90        }
91    }
92}