1use anyhow::{anyhow, Result};
2use bincode::config;
3pub use moka;
4use moka::{notification::RemovalCause, sync::Cache, Expiry};
5use serde::{de::DeserializeOwned, Serialize};
6use std::{
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
/// Time-to-live policy attached to a cache entry.
///
/// Every variant except [`Expiration::Never`] carries a count expressed in the
/// unit named by the variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Expiration {
    /// The entry has no time-based expiration.
    Never,
    /// Expire after this many milliseconds.
    Millis(u64),
    /// Expire after this many seconds.
    Second(u64),
    /// Expire after this many minutes.
    Minute(u64),
    /// Expire after this many hours.
    Hour(u64),
}

impl Expiration {
    /// Converts the policy into a concrete [`Duration`].
    ///
    /// Returns `None` for [`Expiration::Never`] and `Some(duration)` otherwise.
    /// Minute and hour counts are converted to seconds with a saturating
    /// multiplication, so an out-of-range count clamps to `u64::MAX` seconds
    /// instead of panicking (debug builds) or wrapping around to a short
    /// lifetime (release builds).
    pub fn as_duration(&self) -> Option<Duration> {
        match *self {
            Expiration::Never => None,
            Expiration::Millis(millis) => Some(Duration::from_millis(millis)),
            Expiration::Second(secs) => Some(Duration::from_secs(secs)),
            Expiration::Minute(mins) => Some(Duration::from_secs(mins.saturating_mul(60))),
            Expiration::Hour(hours) => Some(Duration::from_secs(hours.saturating_mul(60 * 60))),
        }
    }
}
31
32pub type MokaCacheData = (Expiration, Vec<u8>);
33pub struct MokaCache(pub Cache<String, (Expiration, Vec<u8>)>);
34pub type MokaCacheHandler = Arc<MokaCache>;
35
36pub struct MokaCacheExpiry;
37impl Expiry<String, (Expiration, Vec<u8>)> for MokaCacheExpiry {
38 fn expire_after_create(
39 &self,
40 _key: &String,
41 value: &(Expiration, Vec<u8>),
42 _current_time: Instant,
43 ) -> Option<Duration> {
44 value.0.as_duration()
45 }
46}
47
48impl MokaCache {
49 pub fn new_default(
51 callback: Option<fn(Arc<String>, MokaCacheData, RemovalCause)>,
52 max_cap: u64,
53 ) -> MokaCache {
54 let mut c = Cache::builder()
55 .max_capacity(max_cap)
56 .expire_after(MokaCacheExpiry {});
57 if let Some(callback) = callback {
58 c = c.eviction_listener(callback);
59 }
60 MokaCache(c.build())
61 }
62
63 pub fn insert<K, V>(&self, key: K, value: V, exp: Expiration) -> Result<()>
64 where
65 K: AsRef<str>,
66 V: Serialize + Sync + Send,
67 {
68 let k = key.as_ref();
69 let b = bincode::serde::encode_to_vec(&value, config::standard())?;
70 self.0.insert(k.into(), (exp, b));
71 Ok(())
72 }
73
74 pub fn get<K, V>(&self, key: K) -> Option<(Expiration, V)>
75 where
76 K: AsRef<str>,
77 V: DeserializeOwned + Sync + Send,
78 {
79 let v = self.0.get(key.as_ref())?;
80 let c = config::standard();
81 let b = bincode::serde::decode_from_slice::<V, _>(v.1.as_ref(), c);
82 if let Ok((value, _)) = b {
83 return Some((v.0, value));
84 }
85 if let Err(e) = b {
86 log::error!("cache deserialize error: {}", e.to_string());
87 }
88 None
89 }
90
91 pub fn deserialize<V>(d: &[u8]) -> Option<V>
92 where
93 V: DeserializeOwned + Sync + Send,
94 {
95 let c = config::standard();
96 let b = bincode::serde::decode_from_slice::<V, _>(d, c);
97 if let Ok((value, _)) = b {
98 return Some(value);
99 }
100 if let Err(e) = b {
101 log::error!("deserialize error: {}", e.to_string());
102 }
103 return None;
104 }
105
106 pub fn get_exp<K>(&self, key: K) -> Option<Expiration>
107 where
108 K: AsRef<str>,
109 {
110 let value = self.0.get(key.as_ref());
111 if let Some(v) = value {
112 return Some(v.0);
113 }
114 None
115 }
116
117 pub fn remove<K>(&self, key: K)
118 where
119 K: AsRef<str>,
120 {
121 self.0.invalidate(key.as_ref());
122 }
123
124 pub fn contains_key<K>(&self, key: K) -> bool
125 where
126 K: AsRef<str>,
127 {
128 self.0.contains_key(key.as_ref())
129 }
130
131 pub fn check_exp_interval(&self) {
133 self.0.run_pending_tasks();
134 }
135
136 pub fn refresh<K>(&self, key: K) -> Result<()>
138 where
139 K: AsRef<str>,
140 {
141 let k = key.as_ref();
142 let v = self.0.get(k);
143 let Some(v) = v else {
144 return Err(anyhow!("key: {} not found", k));
145 };
146 if v.0 == Expiration::Never {
147 return Ok(());
148 }
149 self.0.invalidate(k);
150 self.0.insert(k.into(), v);
151 return Ok(());
152 }
153}
154
#[cfg(test)]
// `test_cache_delete` is deliberately not registered as a test, so it is unused.
#[allow(dead_code)]
mod test {
    use super::*;
    use serde::{Deserialize, Serialize};
    use std::process;
    use std::sync::{Mutex, Once};
    use std::thread::{self, sleep};
    use toolkit_rs::logger::{self, LogConfig};

    /// Number of times the eviction listener has been invoked.
    static COUNT: Mutex<u32> = Mutex::new(0);

    /// Ensures the logger is configured once per test process, no matter how
    /// many tests call [`new`].
    // NOTE(review): presumably a second `logger::setup` call could fail and
    // take the whole test binary down via `process::exit` - confirm.
    static LOGGER_INIT: Once = Once::new();

    /// Eviction listener used by the tests: logs the removed key, the removal
    /// cause and a zero-based sequence number.
    fn cache_key_expired(key: Arc<String>, _value: MokaCacheData, cause: RemovalCause) {
        let seq = {
            // A poisoned lock only means another test panicked; the counter
            // itself is still usable.
            let mut count = COUNT.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            let seq = *count;
            *count += 1;
            seq
        };
        log::debug!("{}. 过期 key-----> {key}. Cause: {cause:?}", seq);
    }

    /// Creates a fresh cache (capacity 512) wired to [`cache_key_expired`],
    /// initialising logging on first use.
    fn new() -> MokaCacheHandler {
        LOGGER_INIT.call_once(|| {
            let lcfg = LogConfig {
                style: logger::LogStyle::Default,
                ..LogConfig::default()
            };
            logger::setup(lcfg).unwrap_or_else(|e| {
                println!("log setup err:{}", e);
                process::exit(1)
            });
        });

        Arc::new(MokaCache::new_default(Some(cache_key_expired), 512))
    }

    /// Repeatedly replaces a short-lived key while a background thread runs
    /// the cache's pending tasks, then checks the key has expired.
    #[test]
    fn test_cache_callback() {
        let m = new();

        let key = "key_0001";

        let mclone = m.clone();
        thread::spawn(move || loop {
            thread::sleep(Duration::from_millis(50));
            mclone.check_exp_interval();
        });

        for i in 0..10 {
            thread::sleep(Duration::from_millis(50));

            let c = m.contains_key(key);
            // Start at 1, otherwise continue from the previously stored value.
            let v = m.get::<_, u32>(key).map_or(1, |(_, prev)| prev + 1);
            m.remove(key);

            log::debug!("{}. key exists:{} , get value:{}", i, c, v);

            m.insert(key, v, Expiration::Millis(100)).unwrap();
        }

        thread::sleep(Duration::from_secs(2));
        let v = m.get::<_, u32>(key);
        log::debug!("ok test_cache_get_u1622-->{:?}", v);
        assert!(v.is_none(), "a 100ms entry must be gone after 2s");
    }

    /// Round-trips an `i32` through bincode directly.
    #[test]
    fn test_encode_decode() {
        let value: i32 = 1000;
        let config = config::standard().with_little_endian();
        let b = bincode::encode_to_vec(&value, config).unwrap();
        println!("b-->{:?}", b);
        let (decoded, _) = bincode::decode_from_slice::<i32, _>(b.as_ref(), config).unwrap();
        println!("value-->{}", decoded);
        assert_eq!(decoded, value);
    }

    /// Stores and reads back a `u32`.
    #[test]
    fn test_cache_u16() {
        let m = new();
        m.remove("test_cache_get_u1622");
        // The literal must be a `u32`: the payload is untyped, and an `i32`
        // is encoded differently from the `u32` that is read back below.
        m.insert("test_cache_get_u1622", 1000_u32, Expiration::Never)
            .unwrap();
        let v = m.get::<_, u32>("test_cache_get_u1622");
        println!("test_cache_get_u1622-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, 1000)));
    }

    /// Stores and reads back a byte vector.
    #[test]
    fn test_cache_byte() {
        let m = new();
        let b = b"hello world".to_vec();
        m.insert("test_cache_get_byte", b.clone(), Expiration::Never)
            .unwrap();
        let v = m.get::<_, Vec<u8>>("test_cache_get_byte");
        println!("test_cache_get_byte-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, b)));
    }

    /// Stores and reads back a user-defined struct.
    #[test]
    fn test_cache_struct() {
        #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
        struct Config {
            pub path: String,
            pub cache_capacity: u32,
            pub len: usize,
        }
        let m = new();
        let b = Config {
            path: "test".to_string(),
            cache_capacity: 1024,
            len: 1024,
        };
        m.insert("test_cache_struct", b.clone(), Expiration::Never)
            .unwrap();

        let v = m.get::<_, Config>("test_cache_struct");
        println!("test_cache_struct-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, b)));
    }

    /// Stores and reads back several primitive types.
    #[test]
    fn test_cache_get() {
        let m = new();

        m.insert("test_cache_get", "hello world", Expiration::Never)
            .unwrap();
        let v = m.get::<_, String>("test_cache_get");
        println!("test_cache_get--->: {:?}", v);
        assert_eq!(v, Some((Expiration::Never, "hello world".to_string())));

        m.insert("test_cache_get_bool", true, Expiration::Never)
            .unwrap();
        let v = m.get::<_, bool>("test_cache_get_bool");
        println!("test_cache_get_bool-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, true)));

        m.insert("test_cache_get_bool_false", false, Expiration::Never)
            .unwrap();
        let v = m.get::<_, bool>("test_cache_get_bool_false");
        println!("test_cache_get_bool_false-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, false)));

        m.insert("test_cache_get_i32", 1000_i32, Expiration::Never)
            .unwrap();
        let v = m.get::<_, i32>("test_cache_get_i32");
        println!("test_cache_get_i32-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, 1000)));

        m.insert(
            "test_cache_get_byte",
            b"hello world".to_vec(),
            Expiration::Never,
        )
        .unwrap();
        let v = m.get::<_, Vec<u8>>("test_cache_get_byte");
        println!("test_cache_get_byte-->{:?}", v);
        assert_eq!(v, Some((Expiration::Never, b"hello world".to_vec())));
    }

    /// Manual scenario around `remove`; it has no `#[test]` attribute, so the
    /// harness never runs it.
    fn test_cache_delete() {
        let m = new();
        let key = "key_u64";
        println!("sleep 3s");
        sleep(Duration::from_secs(3));
        println!("get_exp:{:?}", m.get_exp(key));
        println!("update:");
        m.remove(key);
        sleep(Duration::from_secs(1));

        println!("get_exp:{:?}", m.get_exp(key));
    }

    /// A 6-second entry is readable after 3s and gone after 7s.
    #[test]
    fn test_cache_expire() {
        let m = new();
        let key = "key_i32";
        m.insert(key, 555, Expiration::Second(6)).unwrap();

        println!("sleep 3s");
        sleep(Duration::from_secs(3));
        let exp_at = m.get_exp(key);
        println!("get_exp:{:?}", exp_at);
        assert_eq!(exp_at, Some(Expiration::Second(6)));
        let v = m.get::<_, i32>(key);
        println!("get_i32:{:?}", v);
        assert_eq!(v, Some((Expiration::Second(6), 555)));

        // 5s elapsed in total: only printed, because this is too close to the
        // 6s deadline to assert on reliably.
        println!("sleep 2s");
        sleep(Duration::from_secs(2));
        println!("get_exp:{:?}", m.get_exp(key));

        // 7s elapsed in total: past the deadline.
        println!("sleep 2s");
        sleep(Duration::from_secs(2));
        let v = m.get::<_, i32>(key);
        println!("get_i32:{:?}", v);
        assert!(v.is_none());

        let c = m.contains_key(key);
        println!("contains_key:{:?}", c);
        assert!(!c);
    }

    /// Refreshing keeps the entry and its policy; it still expires once the
    /// full lifetime has passed after the refresh.
    #[test]
    fn test_cache_refresh() {
        let m = new();
        let key = "key_i32".to_string();
        m.insert(&key, 555, Expiration::Second(6)).unwrap();
        let v = m.get::<_, i32>(&key);
        println!("get_i32:{:?}", v);
        assert_eq!(v, Some((Expiration::Second(6), 555)));

        sleep(Duration::from_secs(2));
        let exp_at = m.get_exp(&key);
        println!("get_exp:{:?}", exp_at);
        assert_eq!(exp_at, Some(Expiration::Second(6)));

        m.refresh(&key).expect("refresh of an existing key must succeed");
        let refreshed = m.get_exp(&key);
        println!("refresh get_exp:{:?}", refreshed);
        assert_eq!(refreshed, Some(Expiration::Second(6)));

        println!("sleep 7s");
        sleep(Duration::from_secs(7));
        let v = m.get::<_, i32>(key);
        println!("get_i32:{:?}", v);
        assert!(v.is_none());
    }
}