gateway_common/utils/
async_cache.rs1use crate::error::BoxError;
2use std::marker::PhantomData;
3use tokio::sync::mpsc::{self, UnboundedSender};
4use tokio::sync::oneshot;
5
/// Command sent from an [`AsyncCache`] handle to the background task that owns the value.
enum CacheSender<V> {
    /// Ask for a clone of the currently stored value, if any.
    Get,
    /// Store the given value; the task replies with the value it replaced, if any.
    Insert(V),
    /// Remove the stored value; the task replies with the value it removed, if any.
    Clean,
}
11
/// Reply sent back by the background task; each variant answers the [`CacheSender`]
/// command of the same name.
enum CacheReceiver<V> {
    /// Clone of the value stored when the `Get` command was handled.
    Get(Option<V>),
    /// Value that was stored before the `Insert` command replaced it.
    Insert(Option<V>),
    /// Value that the `Clean` command removed.
    Clean(Option<V>),
}
17type AsyncCacheSender<V> = UnboundedSender<(CacheSender<V>, oneshot::Sender<CacheReceiver<V>>)>;
18
19#[derive(Clone)]
20pub struct AsyncCache<V> {
21 _v: PhantomData<V>,
22 sender: AsyncCacheSender<V>,
23}
24
25impl<V> Default for AsyncCache<V>
26where
27 V: std::marker::Send + Sync + 'static + Clone,
28{
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34impl<V> AsyncCache<V>
35where
36 V: std::marker::Send + Sync + 'static + Clone,
37{
38 pub fn new() -> Self {
39 let (sender, mut receiver) =
40 mpsc::unbounded_channel::<(CacheSender<V>, oneshot::Sender<CacheReceiver<V>>)>();
41 tokio::spawn(async move {
42 let mut cache = None;
43 while let Some(msg) = receiver.recv().await {
44 match msg.0 {
45 CacheSender::Get => {
46 let _ = msg.1.send(CacheReceiver::Get(cache.as_ref().cloned()));
47 }
48 CacheSender::Insert(value) => {
49 let old_value = cache.take();
50 let _ = cache.insert(value);
51 let _ = msg.1.send(CacheReceiver::Insert(old_value));
52 }
53 CacheSender::Clean => {
54 let old_value = cache.take();
55 let _ = msg.1.send(CacheReceiver::Clean(old_value));
56 }
57 }
58 }
59 });
60 Self {
61 _v: PhantomData,
62 sender,
63 }
64 }
65
66 pub async fn get(&self) -> Result<Option<V>, BoxError> {
67 let oneshot = oneshot::channel();
68 let _ = self.sender.send((CacheSender::Get, oneshot.0));
69 match oneshot.1.await? {
70 CacheReceiver::Get(value) => Ok(value),
71 _ => Err("err receiver".into()),
72 }
73 }
74
75 pub async fn insert(&self, value: V) -> Result<Option<V>, BoxError> {
76 let oneshot = oneshot::channel();
77 let _ = self.sender.send((CacheSender::Insert(value), oneshot.0));
78 match oneshot.1.await? {
79 CacheReceiver::Insert(value) => Ok(value),
80 _ => Err("err receiver".into()),
81 }
82 }
83
84 pub async fn remove(&self) -> Result<Option<V>, BoxError> {
85 let oneshot = oneshot::channel();
86 let _ = self.sender.send((CacheSender::Clean, oneshot.0));
87 match oneshot.1.await? {
88 CacheReceiver::Clean(value) => Ok(value),
89 _ => Err("err receiver".into()),
90 }
91 }
92}