use anyhow::{anyhow, Result};
use bincode::config;
pub use moka;
use moka::{notification::RemovalCause, sync::Cache, Expiry};
use serde::{de::DeserializeOwned, Serialize};
use std::{
sync::Arc,
time::{Duration, Instant},
};
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Expiration {
Never,
Millis(u64),
Second(u64),
Minute(u64),
Hour(u64),
}
impl Expiration {
pub fn as_duration(&self) -> Option<Duration> {
match self {
Expiration::Never => None,
Expiration::Millis(v) => Some(Duration::from_millis(v.clone())),
Expiration::Second(v) => Some(Duration::from_secs(v.clone())),
Expiration::Minute(v) => Some(Duration::from_secs(v.clone() * 60)),
Expiration::Hour(v) => Some(Duration::from_secs(v.clone() * 60 * 60)),
}
}
}
pub type MokaCacheData = (Expiration, Vec<u8>);
pub struct MokaCache(pub Cache<String, (Expiration, Vec<u8>)>);
pub type MokaCacheHandler = Arc<MokaCache>;
pub struct MokaCacheExpiry;
impl Expiry<String, (Expiration, Vec<u8>)> for MokaCacheExpiry {
fn expire_after_create(
&self,
_key: &String,
value: &(Expiration, Vec<u8>),
_current_time: Instant,
) -> Option<Duration> {
value.0.as_duration()
}
}
impl MokaCache {
pub fn new_default(
callback: Option<fn(Arc<String>, MokaCacheData, RemovalCause)>,
max_cap: u64,
) -> MokaCache {
let mut c = Cache::builder()
.max_capacity(max_cap)
.expire_after(MokaCacheExpiry {});
if let Some(callback) = callback {
c = c.eviction_listener(callback);
}
MokaCache(c.build())
}
pub fn insert<K, V>(&self, key: K, value: V, exp: Expiration) -> Result<()>
where
K: AsRef<str>,
V: Serialize + Sync + Send,
{
let k = key.as_ref();
let b = bincode::serde::encode_to_vec(&value, config::standard())?;
self.0.insert(k.into(), (exp, b));
Ok(())
}
pub fn get<K, V>(&self, key: K) -> Option<(Expiration, V)>
where
K: AsRef<str>,
V: DeserializeOwned + Sync + Send,
{
let v = self.0.get(key.as_ref())?;
let c = config::standard();
let b = bincode::serde::decode_from_slice::<V, _>(v.1.as_ref(), c);
if let Ok((value, _)) = b {
return Some((v.0, value));
}
if let Err(e) = b {
log::error!("cache deserialize error: {}", e.to_string());
}
None
}
pub fn deserialize<V>(d: &[u8]) -> Option<V>
where
V: DeserializeOwned + Sync + Send,
{
let c = config::standard();
let b = bincode::serde::decode_from_slice::<V, _>(d, c);
if let Ok((value, _)) = b {
return Some(value);
}
if let Err(e) = b {
log::error!("deserialize error: {}", e.to_string());
}
return None;
}
pub fn get_exp<K>(&self, key: K) -> Option<Expiration>
where
K: AsRef<str>,
{
let value = self.0.get(key.as_ref());
if let Some(v) = value {
return Some(v.0);
}
None
}
pub fn remove<K>(&self, key: K)
where
K: AsRef<str>,
{
self.0.invalidate(key.as_ref());
}
pub fn contains_key<K>(&self, key: K) -> bool
where
K: AsRef<str>,
{
self.0.contains_key(key.as_ref())
}
pub fn check_exp_interval(&self) {
self.0.run_pending_tasks();
}
pub fn refresh<K>(&self, key: K) -> Result<()>
where
K: AsRef<str>,
{
let k = key.as_ref();
let v = self.0.get(k);
let Some(v) = v else {
return Err(anyhow!("key: {} not found", k));
};
if v.0 == Expiration::Never {
return Ok(());
}
self.0.invalidate(k);
self.0.insert(k.into(), v);
return Ok(());
}
}
#[cfg(test)]
#[allow(dead_code)]
mod test {
use super::*;
use serde::{Deserialize, Serialize};
use std::process;
use std::sync::Mutex;
use std::thread::{self, sleep};
use toolkit_rs::logger::{self, LogConfig};
static COUNT: Mutex<u32> = Mutex::new(0);
fn cache_key_expired(key: Arc<String>, _value: MokaCacheData, cause: RemovalCause) {
let mut k = 0;
if let Ok(mut c) = COUNT.lock() {
*c += 1;
k = *c;
}
k = k - 1;
log::debug!("{}. 过期 key-----> {key}. Cause: {cause:?}", k);
}
fn new() -> MokaCacheHandler {
let lcfg = LogConfig {
style: logger::LogStyle::Default,
..LogConfig::default()
};
logger::setup(lcfg).unwrap_or_else(|e| {
println!("log setup err:{}", e);
process::exit(1);
});
Arc::new(MokaCache::new_default(Some(cache_key_expired), 512))
}
#[test]
fn test_cache_callback() {
let m = new();
let key = "key_0001";
let mclone = m.clone();
let mut k = 0;
thread::spawn(move || loop {
thread::sleep(Duration::from_millis(50));
mclone.check_exp_interval();
k = k + 1;
});
for i in 0..10 {
thread::sleep(Duration::from_millis(50));
let c = m.contains_key(key);
let g = m.get::<_, u32>(key);
let mut v = 1;
if let Some(k) = g {
v = v + k.1;
}
m.remove(key);
log::debug!("{}. key exists:{} , get value:{}", i, c, v);
m.insert(key, v, Expiration::Millis(100)).unwrap();
}
thread::sleep(Duration::from_secs(2));
let v = m.get::<_, u32>(key);
log::debug!("ok test_cache_get_u1622-->{:?}", v);
}
#[test]
fn test_encode_decode() {
let value: i32 = 1000;
let config = config::standard().with_little_endian();
let b = bincode::encode_to_vec(&value, config).unwrap();
println!("b-->{:?}", b);
let (value, _) = bincode::decode_from_slice::<i32, _>(b.as_ref(), config).unwrap();
println!("value-->{}", value);
}
#[test]
fn test_cache_u16() {
let m = new();
m.remove("test_cache_get_u1622");
m.insert("test_cache_get_u1622", 1000, Expiration::Never)
.unwrap();
let v = m.get::<_, u32>("test_cache_get_u1622");
println!("test_cache_get_u1622-->{:?}", v);
}
#[test]
fn test_cache_byte() {
let m = new();
let b = b"hello world".to_vec();
m.insert("test_cache_get_byte", b, Expiration::Never)
.unwrap();
let v = m.get::<_, Vec<u8>>("test_cache_get_byte");
println!("test_cache_get_byte-->{:?}", v);
}
#[test]
fn test_cache_struct() {
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Config {
pub path: String,
pub cache_capacity: u32,
pub len: usize,
}
let m = new();
let b = Config {
path: "test".to_string(),
cache_capacity: 1024,
len: 1024,
};
m.insert("test_cache_struct", b, Expiration::Never).unwrap();
let v = m.get::<_, Config>("test_cache_struct");
println!("test_cache_struct-->{:?}", v);
}
#[test]
fn test_cache_get() {
let m = new();
m.insert("test_cache_get", "hello world", Expiration::Never)
.unwrap();
let v = m.get::<_, String>("test_cache_get");
println!("test_cache_get--->: {:?}", v);
m.insert("test_cache_get_bool", true, Expiration::Never)
.unwrap();
let v = m.get::<_, bool>("test_cache_get_bool");
println!("test_cache_get_bool-->{:?}", v);
m.insert("test_cache_get_bool_false", false, Expiration::Never)
.unwrap();
let v = m.get::<_, bool>("test_cache_get_bool_false");
println!("test_cache_get_bool_false-->{:?}", v);
m.insert("test_cache_get_i32", 1000, Expiration::Never)
.unwrap();
let v = m.get::<_, i32>("test_cache_get_i32");
println!("test_cache_get_i32-->{:?}", v);
m.insert(
"test_cache_get_byte",
b"hello world".to_vec(),
Expiration::Never,
)
.unwrap();
let v = m.get::<_, Vec<u8>>("test_cache_get_byte");
println!("test_cache_get_byte-->{:?}", v);
}
fn test_cache_delete() {
let m = new();
let key = "key_u64";
println!("sleep 3s");
sleep(Duration::from_secs(3));
println!("get_exp:{:?}", m.get_exp(key));
println!("update:");
m.remove(key);
sleep(Duration::from_secs(1));
println!("get_exp:{:?}", m.get_exp(key));
}
#[test]
fn test_cache_expire() {
let m = new();
let key = "key_i32";
m.insert("key_i32", 555, Expiration::Second(6)).unwrap();
println!("sleep 3s");
sleep(Duration::from_secs(3));
let Some(exp_at) = m.get_exp(key) else {
return;
};
println!("get_exp:{:?}", exp_at);
let v = m.get::<_, i32>(key);
println!("get_i32:{:?}", v);
println!("sleep 3s");
sleep(Duration::from_secs(2));
println!("get_exp:{:?}", m.get_exp(key));
println!("sleep 5s");
sleep(Duration::from_secs(2));
let v = m.get::<_, i32>(key);
println!("get_i32:{:?}", v);
let c = m.contains_key(key);
println!("contains_key:{:?}", c);
}
#[test]
fn test_cache_refresh() {
let m = new();
let key = "key_i32".to_string();
m.insert(&key, 555, Expiration::Second(6)).unwrap();
let v = m.get::<_, i32>(&key);
println!("get_i32:{:?}", v);
sleep(Duration::from_secs(2));
let Some(exp_at) = m.get_exp(&key) else {
return;
};
println!("get_exp:{:?}", exp_at);
if let Err(e) = m.refresh(&key) {
println!("refresh error:{:?}", e);
return;
}
println!("refresh get_exp:{:?}", m.get_exp(&key));
println!("sleep 7s");
sleep(Duration::from_secs(7));
let v = m.get::<_, i32>(key);
println!("get_i32:{:?}", v);
}
}