use crate::cache_service::ICacheService;
use async_trait::async_trait;
use genies_core::Result;
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::ops::Sub;
use std::sync::Mutex;
use std::time::{Duration, Instant};
pub struct MemService {
pub cache: Mutex<HashMap<String, (String, Option<(Instant, Duration)>), RandomState>>,
}
impl MemService {
pub fn recycling(&self) {
if let Ok(mut map_lock_guard) = self.cache.lock() {
let mut need_removed = vec![];
for (k, v) in map_lock_guard.iter() {
match v.1 {
None => {}
Some((i, d)) => {
if i.elapsed() >= d {
need_removed.push(k.to_string());
}
}
}
}
for x in need_removed {
map_lock_guard.remove(&x);
}
}
}
}
impl Default for MemService {
fn default() -> Self {
Self {
cache: Default::default(),
}
}
}
#[async_trait]
impl ICacheService for MemService {
async fn set_string(&self, k: &str, v: &str) -> Result<String> {
self.recycling();
let mut guard = self.cache.lock()?;
guard.insert(k.to_string(), (v.to_string(), None));
return Ok(v.to_string());
}
async fn get_string(&self, k: &str) -> Result<String> {
self.recycling();
let guard = self.cache.lock()?;
let v = guard.get(k);
match v {
Some((v, _)) => {
return Ok(v.to_string());
}
_ => {
return Ok("".to_string());
}
}
}
async fn del_string(&self, k: &str) -> Result<String> {
self.recycling();
let mut guard = self.cache.lock()?;
let v = guard.remove(k);
match v {
Some((v, _)) => {
return Ok(v.to_string());
}
_ => {
return Ok("".to_string());
}
}
}
async fn set_string_ex(&self, k: &str, v: &str, t: Option<Duration>) -> Result<String> {
self.recycling();
let mut locked = self.cache.lock()?;
let mut e = Option::None;
if let Some(ex) = t {
e = Some((Instant::now(), ex));
}
let inserted = locked.insert(k.to_string(), (v.to_string(), e));
if inserted.is_some() {
return Ok(v.to_string());
}
return Result::Err(genies_core::error::Error::E(format!(
"[mem_service]insert fail!"
)));
}
async fn set_string_ex_nx(&self, k: &str, v: &str, t: Option<Duration>) -> Result<bool> {
self.recycling();
let mut locked = self.cache.lock()?;
if locked.contains_key(k) {
return Ok(false);
}
let e = t.map(|ex| (Instant::now(), ex));
locked.insert(k.to_string(), (v.to_string(), e));
Ok(true)
}
async fn set_value(&self, _k: &str, _v: &[u8]) -> Result<String> {
todo!();
}
async fn get_value(&self, _k: &str) -> Result<Vec<u8>> {
todo!();
}
async fn set_value_ex(&self, _k: &str, _v: &[u8], _t: Option<Duration>) -> Result<String> {
todo!();
}
async fn ttl(&self, k: &str) -> Result<i64> {
self.recycling();
let locked = self.cache.lock()?;
let v = locked.get(k).cloned();
drop(locked);
return match v {
None => Ok(-2),
Some((_r, o)) => match o {
None => Ok(-1),
Some((i, d)) => {
let use_time = i.elapsed();
if d > use_time {
return Ok(d.sub(use_time).as_secs() as i64);
}
Ok(0)
}
},
};
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn test_nx_first_set_returns_true() {
let service = MemService::default();
let result = service
.set_string_ex_nx("key1", "value1", Some(Duration::from_secs(60)))
.await
.unwrap();
assert!(result, "首次 NX 设置应返回 true");
}
#[tokio::test]
async fn test_nx_existing_key_returns_false() {
let service = MemService::default();
let first = service
.set_string_ex_nx("key2", "value1", Some(Duration::from_secs(60)))
.await
.unwrap();
assert!(first, "首次设置应成功");
let second = service
.set_string_ex_nx("key2", "value2", Some(Duration::from_secs(60)))
.await
.unwrap();
assert!(!second, "key 已存在时 NX 应返回 false");
}
#[tokio::test]
async fn test_nx_then_get_returns_correct_value() {
let service = MemService::default();
let _ = service
.set_string_ex_nx("key3", "expected_value", Some(Duration::from_secs(60)))
.await
.unwrap();
let value = service.get_string("key3").await.unwrap();
assert_eq!(value, "expected_value", "get_string 应返回正确的值");
}
#[tokio::test]
async fn test_full_idempotent_flow_success() {
let service = MemService::default();
let key = "idempotent_key_success";
let ttl = Some(Duration::from_secs(300));
let lock_acquired = service
.set_string_ex_nx(key, "CONSUMING", ttl)
.await
.unwrap();
assert!(lock_acquired, "NX 抢锁应成功");
let _ = service
.set_string_ex(key, "CONSUMED", ttl)
.await
.unwrap();
let retry = service
.set_string_ex_nx(key, "CONSUMING", ttl)
.await
.unwrap();
assert!(!retry, "key 已存在,再次 NX 应返回 false");
let status = service.get_string(key).await.unwrap();
assert_eq!(status, "CONSUMED", "状态应为 CONSUMED");
}
#[tokio::test]
async fn test_idempotent_flow_handler_failure() {
let service = MemService::default();
let key = "idempotent_key_failure";
let ttl = Some(Duration::from_secs(300));
let lock_acquired = service
.set_string_ex_nx(key, "CONSUMING", ttl)
.await
.unwrap();
assert!(lock_acquired, "NX 抢锁应成功");
let _ = service.del_string(key).await.unwrap();
let retry = service
.set_string_ex_nx(key, "CONSUMING", ttl)
.await
.unwrap();
assert!(retry, "key 删除后,再次 NX 应成功");
}
#[tokio::test]
async fn test_nx_ttl_expiry() {
let service = MemService::default();
let key = "ttl_expiry_key";
let first = service
.set_string_ex_nx(key, "value", Some(Duration::from_millis(50)))
.await
.unwrap();
assert!(first, "首次 NX 应成功");
sleep(Duration::from_millis(100)).await;
let after_expiry = service
.set_string_ex_nx(key, "new_value", Some(Duration::from_secs(60)))
.await
.unwrap();
assert!(after_expiry, "TTL 过期后,NX 应再次成功");
}
#[tokio::test]
async fn test_nx_concurrent_only_one_wins() {
let service = Arc::new(MemService::default());
let key = "concurrent_key";
let num_tasks = 10;
let mut handles = vec![];
for _ in 0..num_tasks {
let svc = Arc::clone(&service);
let k = key.to_string();
let handle = tokio::spawn(async move {
svc.set_string_ex_nx(&k, "winner", Some(Duration::from_secs(60)))
.await
.unwrap()
});
handles.push(handle);
}
let mut results = vec![];
for handle in handles {
results.push(handle.await.unwrap());
}
let winners: Vec<_> = results.iter().filter(|&&r| r).collect();
assert_eq!(
winners.len(),
1,
"只有一个 task 应该返回 true,实际有 {} 个",
winners.len()
);
let losers: Vec<_> = results.iter().filter(|&&r| !r).collect();
assert_eq!(
losers.len(),
num_tasks - 1,
"其余 {} 个 task 应返回 false",
num_tasks - 1
);
}
#[tokio::test]
async fn test_nx_without_ttl() {
let service = MemService::default();
let key = "no_ttl_key";
let first = service
.set_string_ex_nx(key, "value_no_ttl", None)
.await
.unwrap();
assert!(first, "首次 NX(无 TTL)应成功");
let second = service
.set_string_ex_nx(key, "another_value", None)
.await
.unwrap();
assert!(!second, "key 已存在,NX 应返回 false");
let value = service.get_string(key).await.unwrap();
assert_eq!(value, "value_no_ttl", "值应保持不变");
}
#[tokio::test]
async fn test_nx_different_keys_independent() {
let service = MemService::default();
let ttl = Some(Duration::from_secs(60));
let key_a = service
.set_string_ex_nx("key_a", "value_a", ttl)
.await
.unwrap();
let key_b = service
.set_string_ex_nx("key_b", "value_b", ttl)
.await
.unwrap();
let key_c = service
.set_string_ex_nx("key_c", "value_c", ttl)
.await
.unwrap();
assert!(key_a, "key_a 首次 NX 应成功");
assert!(key_b, "key_b 首次 NX 应成功");
assert!(key_c, "key_c 首次 NX 应成功");
assert_eq!(service.get_string("key_a").await.unwrap(), "value_a");
assert_eq!(service.get_string("key_b").await.unwrap(), "value_b");
assert_eq!(service.get_string("key_c").await.unwrap(), "value_c");
let key_a_retry = service
.set_string_ex_nx("key_a", "new_value", ttl)
.await
.unwrap();
assert!(!key_a_retry, "key_a 已存在,NX 应返回 false");
assert_eq!(service.get_string("key_b").await.unwrap(), "value_b");
assert_eq!(service.get_string("key_c").await.unwrap(), "value_c");
}
}