Skip to main content

moka_cache/
lib.rs

1use anyhow::{anyhow, Result};
2use bincode::config;
3pub use moka;
4use moka::{notification::RemovalCause, sync::Cache, Expiry};
5use serde::{de::DeserializeOwned, Serialize};
6use std::{
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
/// Lifetime requested for a cache entry.
///
/// Each non-`Never` variant carries a count expressed in the unit named by the
/// variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Expiration {
    /// The entry has no time limit.
    Never,
    /// Lifetime in milliseconds.
    Millis(u64),
    /// Lifetime in seconds.
    Second(u64),
    /// Lifetime in minutes.
    Minute(u64),
    /// Lifetime in hours.
    Hour(u64),
}

impl Expiration {
    /// Converts the expiration into a [`Duration`].
    ///
    /// Returns `None` for [`Expiration::Never`] and `Some(duration)` otherwise.
    /// Minute and hour counts are converted to seconds with saturating
    /// arithmetic, so an absurdly large count clamps to `u64::MAX` seconds
    /// instead of panicking (debug builds) or wrapping around to a short
    /// lifetime (release builds).
    pub fn as_duration(&self) -> Option<Duration> {
        const SECS_PER_MINUTE: u64 = 60;
        const SECS_PER_HOUR: u64 = 60 * 60;

        match *self {
            Expiration::Never => None,
            Expiration::Millis(millis) => Some(Duration::from_millis(millis)),
            Expiration::Second(secs) => Some(Duration::from_secs(secs)),
            Expiration::Minute(minutes) => {
                Some(Duration::from_secs(minutes.saturating_mul(SECS_PER_MINUTE)))
            }
            Expiration::Hour(hours) => {
                Some(Duration::from_secs(hours.saturating_mul(SECS_PER_HOUR)))
            }
        }
    }
}
31
32pub type MokaCacheData = (Expiration, Vec<u8>);
33pub struct MokaCache(pub Cache<String, (Expiration, Vec<u8>)>);
34pub type MokaCacheHandler = Arc<MokaCache>;
35
36pub struct MokaCacheExpiry;
37impl Expiry<String, (Expiration, Vec<u8>)> for MokaCacheExpiry {
38    fn expire_after_create(
39        &self,
40        _key: &String,
41        value: &(Expiration, Vec<u8>),
42        _current_time: Instant,
43    ) -> Option<Duration> {
44        value.0.as_duration()
45    }
46}
47
48impl MokaCache {
49    //初始化缓存
50    pub fn new_default(
51        callback: Option<fn(Arc<String>, MokaCacheData, RemovalCause)>,
52        max_cap: u64,
53    ) -> MokaCache {
54        let mut c = Cache::builder()
55            .max_capacity(max_cap)
56            .expire_after(MokaCacheExpiry {});
57        if let Some(callback) = callback {
58            c = c.eviction_listener(callback);
59        }
60        MokaCache(c.build())
61    }
62
63    pub fn insert<K, V>(&self, key: K, value: V, exp: Expiration) -> Result<()>
64    where
65        K: AsRef<str>,
66        V: Serialize + Sync + Send,
67    {
68        let k = key.as_ref();
69        let b = bincode::serde::encode_to_vec(&value, config::standard())?;
70        self.0.insert(k.into(), (exp, b));
71        Ok(())
72    }
73
74    pub fn get<K, V>(&self, key: K) -> Option<(Expiration, V)>
75    where
76        K: AsRef<str>,
77        V: DeserializeOwned + Sync + Send,
78    {
79        let v = self.0.get(key.as_ref())?;
80        let c = config::standard();
81        let b = bincode::serde::decode_from_slice::<V, _>(v.1.as_ref(), c);
82        if let Ok((value, _)) = b {
83            return Some((v.0, value));
84        }
85        if let Err(e) = b {
86            log::error!("cache deserialize error: {}", e.to_string());
87        }
88        None
89    }
90
91    pub fn deserialize<V>(d: &[u8]) -> Option<V>
92    where
93        V: DeserializeOwned + Sync + Send,
94    {
95        let c = config::standard();
96        let b = bincode::serde::decode_from_slice::<V, _>(d, c);
97        if let Ok((value, _)) = b {
98            return Some(value);
99        }
100        if let Err(e) = b {
101            log::error!("deserialize error: {}", e.to_string());
102        }
103        return None;
104    }
105
106    pub fn get_exp<K>(&self, key: K) -> Option<Expiration>
107    where
108        K: AsRef<str>,
109    {
110        let value = self.0.get(key.as_ref());
111        if let Some(v) = value {
112            return Some(v.0);
113        }
114        None
115    }
116
117    pub fn remove<K>(&self, key: K)
118    where
119        K: AsRef<str>,
120    {
121        self.0.invalidate(key.as_ref());
122    }
123
124    pub fn contains_key<K>(&self, key: K) -> bool
125    where
126        K: AsRef<str>,
127    {
128        self.0.contains_key(key.as_ref())
129    }
130
131    //每隔10检查缓存是否过期
132    pub fn check_exp_interval(&self) {
133        self.0.run_pending_tasks();
134    }
135
136    // 刷新key ttl
137    pub fn refresh<K>(&self, key: K) -> Result<()>
138    where
139        K: AsRef<str>,
140    {
141        let k = key.as_ref();
142        let v = self.0.get(k);
143        let Some(v) = v else {
144            return Err(anyhow!("key: {} not found", k));
145        };
146        if v.0 == Expiration::Never {
147            return Ok(());
148        }
149        self.0.invalidate(k);
150        self.0.insert(k.into(), v);
151        return Ok(());
152    }
153}
154
#[cfg(test)]
// `test_cache_delete` is intentionally not registered as a test (see its
// comment), so the dead-code lint is silenced for this module.
#[allow(dead_code)]
mod test {
    use super::*;
    use serde::{Deserialize, Serialize};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Mutex, Once};
    use std::thread::{self, sleep};
    use toolkit_rs::logger::{self, LogConfig};

    /// Number of eviction notifications received so far.
    static COUNT: Mutex<u32> = Mutex::new(0);
    /// Guards logger initialization so it happens once per test process.
    static LOGGER_INIT: Once = Once::new();

    /// Eviction listener used by the tests: logs each removal with a
    /// zero-based sequence number.
    fn cache_key_expired(key: Arc<String>, _value: MokaCacheData, cause: RemovalCause) {
        let index = {
            // A poisoned lock still holds a usable counter; keep counting
            // rather than panicking inside the listener.
            let mut count = COUNT.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            let current = *count;
            *count += 1;
            current
        };
        log::debug!("{}. 过期 key-----> {key}. Cause: {cause:?}", index);
    }

    /// Sets up logging (once) and returns a fresh cache with the test
    /// eviction listener and a capacity of 512.
    fn new() -> MokaCacheHandler {
        LOGGER_INIT.call_once(|| {
            let lcfg = LogConfig {
                style: logger::LogStyle::Default,
                ..LogConfig::default()
            };
            // Panic instead of exiting the process so the test harness can
            // report the failure.
            if let Err(e) = logger::setup(lcfg) {
                panic!("log setup err:{}", e);
            }
        });

        Arc::new(MokaCache::new_default(Some(cache_key_expired), 512))
    }

    #[test]
    fn test_cache_callback() {
        let m = new();

        let key = "key_0001";

        // Background thread that periodically runs cache maintenance; it is
        // stopped and joined before the test returns.
        let stop = Arc::new(AtomicBool::new(false));
        let worker = {
            let cache = Arc::clone(&m);
            let stop = Arc::clone(&stop);
            thread::spawn(move || {
                while !stop.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_millis(50));
                    cache.check_exp_interval();
                }
            })
        };

        for i in 0..10 {
            thread::sleep(Duration::from_millis(50));

            let exists = m.contains_key(key);
            // Start at 1, or continue from the previously stored counter.
            let next = m.get::<_, u32>(key).map_or(1, |(_, prev)| prev + 1);
            m.remove(key);

            log::debug!("{}. key exists:{} , get value:{}", i, exists, next);

            m.insert(key, next, Expiration::Millis(100)).unwrap();
        }

        thread::sleep(Duration::from_secs(2));
        let v = m.get::<_, u32>(key);
        log::debug!("ok test_cache_get_u1622-->{:?}", v);

        stop.store(true, Ordering::SeqCst);
        worker.join().expect("maintenance thread panicked");

        // The last insert used a 100ms lifetime and 2s have passed.
        assert!(v.is_none(), "entry should have expired, got {:?}", v);
    }

    #[test]
    fn test_encode_decode() {
        let value: i32 = 1000;
        let cfg = config::standard().with_little_endian();
        let b = bincode::encode_to_vec(&value, cfg).unwrap();
        println!("b-->{:?}", b);
        let (decoded, _) = bincode::decode_from_slice::<i32, _>(b.as_ref(), cfg).unwrap();
        println!("value-->{}", decoded);
        assert_eq!(decoded, value);
    }

    #[test]
    fn test_cache_u16() {
        let m = new();
        m.remove("test_cache_get_u1622");
        // Store a `u32` explicitly: an unsuffixed literal would be serialized
        // as `i32`, whose varint encoding differs from the `u32` read below.
        m.insert("test_cache_get_u1622", 1000u32, Expiration::Never)
            .unwrap();
        let v = m.get::<_, u32>("test_cache_get_u1622");
        println!("test_cache_get_u1622-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, 1000)));
    }

    #[test]
    fn test_cache_byte() {
        let m = new();
        let b = b"hello world".to_vec();
        m.insert("test_cache_get_byte", b, Expiration::Never)
            .unwrap();
        let v = m.get::<_, Vec<u8>>("test_cache_get_byte");
        println!("test_cache_get_byte-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, b"hello world".to_vec())));
    }

    #[test]
    fn test_cache_struct() {
        #[derive(Debug, Clone, Serialize, Deserialize)]
        struct Config {
            pub path: String,
            pub cache_capacity: u32,
            pub len: usize,
        }
        let m = new();
        let b = Config {
            path: "test".to_string(),
            cache_capacity: 1024,
            len: 1024,
        };
        m.insert("test_cache_struct", b, Expiration::Never).unwrap();

        let v = m.get::<_, Config>("test_cache_struct");
        println!("test_cache_struct-->{:?}", v);

        let (exp, got) = v.expect("struct entry should be present");
        assert_eq!(exp, Expiration::Never);
        assert_eq!(got.path, "test");
        assert_eq!(got.cache_capacity, 1024);
        assert_eq!(got.len, 1024);
    }

    #[test]
    fn test_cache_get() {
        let m = new();

        // String payload.
        m.insert("test_cache_get", "hello world", Expiration::Never)
            .unwrap();
        let v = m.get::<_, String>("test_cache_get");
        println!("test_cache_get--->: {:?}", v);
        assert_eq!(v, Some((Expiration::Never, "hello world".to_string())));

        // Boolean payloads.
        m.insert("test_cache_get_bool", true, Expiration::Never)
            .unwrap();
        let v = m.get::<_, bool>("test_cache_get_bool");
        println!("test_cache_get_bool-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, true)));

        m.insert("test_cache_get_bool_false", false, Expiration::Never)
            .unwrap();
        let v = m.get::<_, bool>("test_cache_get_bool_false");
        println!("test_cache_get_bool_false-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, false)));

        // Integer payload.
        m.insert("test_cache_get_i32", 1000, Expiration::Never)
            .unwrap();
        let v = m.get::<_, i32>("test_cache_get_i32");
        println!("test_cache_get_i32-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, 1000)));

        // Byte-vector payload.
        m.insert(
            "test_cache_get_byte",
            b"hello world".to_vec(),
            Expiration::Never,
        )
        .unwrap();
        let v = m.get::<_, Vec<u8>>("test_cache_get_byte");
        println!("test_cache_get_byte-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, b"hello world".to_vec())));
    }

    // NOTE(review): not annotated with `#[test]`; this looks like a leftover
    // manual experiment (it never inserts the key it inspects) — confirm
    // before enabling it.
    fn test_cache_delete() {
        let m = new();
        let key = "key_u64";

        println!("sleep 3s");
        sleep(Duration::from_secs(3));
        println!("get_exp:{:?}", m.get_exp(key));

        println!("update:");
        m.remove(key);
        sleep(Duration::from_secs(1));

        println!("get_exp:{:?}", m.get_exp(key));
    }

    #[test]
    fn test_cache_expire() {
        let m = new();
        let key = "key_i32";
        m.insert(key, 555, Expiration::Second(6)).unwrap();

        println!("sleep 3s");
        sleep(Duration::from_secs(3));
        // Halfway through the 6s lifetime the entry must still be there.
        let exp_at = m
            .get_exp(key)
            .expect("entry should still be alive 3s into a 6s lifetime");
        println!("get_exp:{:?}", exp_at);
        assert_eq!(exp_at, Expiration::Second(6));
        let v = m.get::<_, i32>(key);
        println!("get_i32:{:?}", v);
        assert_eq!(v, Some((Expiration::Second(6), 555)));

        println!("sleep 2s");
        sleep(Duration::from_secs(2));
        println!("get_exp:{:?}", m.get_exp(key));

        println!("sleep 2s");
        sleep(Duration::from_secs(2));
        // 7s have elapsed, past the 6s lifetime.
        let v = m.get::<_, i32>(key);
        println!("get_i32:{:?}", v);
        assert!(v.is_none(), "entry should have expired, got {:?}", v);

        let c = m.contains_key(key);
        println!("contains_key:{:?}", c);
        assert!(!c, "expired key should not be reported as present");
    }

    #[test]
    fn test_cache_refresh() {
        let m = new();
        let key = "key_i32".to_string();
        m.insert(&key, 555, Expiration::Second(6)).unwrap();
        let v = m.get::<_, i32>(&key);
        println!("get_i32:{:?}", v);
        assert_eq!(v, Some((Expiration::Second(6), 555)));

        sleep(Duration::from_secs(2));
        let exp_at = m
            .get_exp(&key)
            .expect("entry should still be alive 2s into a 6s lifetime");
        println!("get_exp:{:?}", exp_at);

        m.refresh(&key).expect("refreshing a live key should succeed");
        println!("refresh get_exp:{:?}", m.get_exp(&key));

        println!("sleep 7s");
        sleep(Duration::from_secs(7));
        // 7s after the refresh the 6s lifetime has elapsed again.
        let v = m.get::<_, i32>(key);
        println!("get_i32:{:?}", v);
        assert!(v.is_none(), "entry should have expired, got {:?}", v);
    }
}