use anyhow::{Result, anyhow};
use bytes::Bytes;
use futures_util::future::BoxFuture;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tracing::{debug, info};
/// Cache backend that stores values through a `memcache::Client` and keeps
/// simple operation counters alongside it.
pub struct MemcachedCache {
/// Client used for every Memcached operation issued by this backend.
client: memcache::Client,
/// Number of `get` calls that returned a value.
hits: Arc<AtomicU64>,
/// Number of `get` calls that returned nothing; backend errors are counted
/// here as well.
misses: Arc<AtomicU64>,
/// Number of `set_with_ttl` calls that were stored successfully.
sets: Arc<AtomicU64>,
}
impl MemcachedCache {
    /// Connects to Memcached and verifies the connection before returning.
    ///
    /// The server URL is read from the `MEMCACHED_URL` environment variable and
    /// falls back to `memcache://127.0.0.1:11211` when the variable is unset or
    /// is not valid Unicode. All counters start at zero.
    ///
    /// # Errors
    ///
    /// Returns an error if the client cannot be created, or if the `version`
    /// round-trip used as a connectivity probe fails.
    pub fn new() -> Result<Self> {
        info!("Initializing Memcached Cache");
        let memcached_url = std::env::var("MEMCACHED_URL")
            .unwrap_or_else(|_| "memcache://127.0.0.1:11211".to_string());
        let client = memcache::connect(memcached_url.as_str())
            .map_err(|e| anyhow!("Failed to connect to Memcached: {e}"))?;
        // `version()` needs a reply from the configured server(s), so it
        // doubles as a connection test.
        let versions = client
            .version()
            .map_err(|e| anyhow!("Memcached connection test failed: {e}"))?;
        info!(
            // The URL may embed credentials (`user:pass@host`); never log them.
            url = %Self::redact_url_credentials(&memcached_url),
            server_count = versions.len(),
            "Memcached Cache connected successfully"
        );
        Ok(Self {
            client,
            hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            sets: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Returns the raw statistics reported by the underlying client.
    ///
    /// Each entry pairs a string with a map of stat names to values
    /// (presumably one entry per configured server; confirm against the
    /// `memcache` crate documentation).
    ///
    /// # Errors
    ///
    /// Returns an error if the client's `stats()` call fails.
    pub fn get_server_stats(
        &self,
    ) -> Result<Vec<(String, std::collections::HashMap<String, String>)>> {
        self.client
            .stats()
            .map_err(|e| anyhow!("Failed to get Memcached stats: {e}"))
    }

    /// Returns `url` with any userinfo (`user:pass@`) in its authority part
    /// replaced by `***`, so the result is safe to write to logs.
    ///
    /// URLs without userinfo are returned unchanged.
    fn redact_url_credentials(url: &str) -> String {
        let (scheme, rest) = match url.split_once("://") {
            Some((scheme, rest)) => (Some(scheme), rest),
            None => (None, url),
        };
        // The authority ends at the first path, query or fragment delimiter;
        // an `@` after that point is not a userinfo separator.
        let authority_end = rest
            .find(|c: char| matches!(c, '/' | '?' | '#'))
            .unwrap_or(rest.len());
        let (authority, tail) = rest.split_at(authority_end);
        match (authority.rsplit_once('@'), scheme) {
            (None, _) => url.to_string(),
            (Some((_, host)), Some(scheme)) => format!("{scheme}://***@{host}{tail}"),
            (Some((_, host)), None) => format!("***@{host}{tail}"),
        }
    }
}
use crate::error::CacheResult;
use crate::traits::CacheBackend;
impl CacheBackend for MemcachedCache {
fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
Box::pin(async move {
if let Ok(Some(bytes_vec)) = self.client.get::<Vec<u8>>(key) {
self.hits.fetch_add(1, Ordering::Relaxed);
Some(Bytes::from(bytes_vec))
} else {
self.misses.fetch_add(1, Ordering::Relaxed);
None
}
})
}
fn set_with_ttl<'a>(
&'a self,
key: &'a str,
value: Bytes,
ttl: Duration,
) -> BoxFuture<'a, CacheResult<()>> {
Box::pin(async move {
self.client
.set(
key,
value.as_ref(),
u32::try_from(ttl.as_secs()).unwrap_or(u32::MAX),
)
.map_err(|e| {
crate::error::CacheError::BackendError(format!(
"Memcached operation failed: {e}"
))
})?;
self.sets.fetch_add(1, Ordering::Relaxed);
debug!(key = %key, ttl_secs = %ttl.as_secs(), "[Memcached] Cached key with TTL");
Ok(())
})
}
fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
Box::pin(async move {
self.client.delete(key).map_err(|e| {
crate::error::CacheError::BackendError(format!("Memcached operation failed: {e}"))
})?;
Ok(())
})
}
fn health_check(&self) -> BoxFuture<'_, bool> {
Box::pin(async move {
let test_key = "health_check_memcached";
let test_value = Bytes::from_static(b"health_check");
match self
.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10))
.await
{
Ok(()) => match self.get(test_key).await {
Some(retrieved) => {
let _ = self.remove(test_key).await;
retrieved == test_value
}
None => false,
},
Err(_) => false,
}
})
}
fn name(&self) -> &'static str {
"Memcached"
}
}