multi_tier_cache/backends/
memcached_cache.rs1use anyhow::{Result, anyhow};
6use bytes::Bytes;
7use futures_util::future::BoxFuture;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Duration;
11use tracing::{debug, info};
12
13pub struct MemcachedCache {
26 client: memcache::Client,
28 hits: Arc<AtomicU64>,
30 misses: Arc<AtomicU64>,
32 sets: Arc<AtomicU64>,
34}
35
36impl MemcachedCache {
37 pub fn new() -> Result<Self> {
47 info!("Initializing Memcached Cache");
48
49 let memcached_url = std::env::var("MEMCACHED_URL")
51 .unwrap_or_else(|_| "memcache://127.0.0.1:11211".to_string());
52
53 let client = memcache::connect(memcached_url.as_str())
55 .map_err(|e| anyhow!("Failed to connect to Memcached: {e}"))?;
56
57 match client.version() {
59 Ok(versions) => {
60 info!(
61 url = %memcached_url,
62 server_count = versions.len(),
63 "Memcached Cache connected successfully"
64 );
65 }
66 Err(e) => {
67 return Err(anyhow!("Memcached connection test failed: {e}"));
68 }
69 }
70
71 Ok(Self {
72 client,
73 hits: Arc::new(AtomicU64::new(0)),
74 misses: Arc::new(AtomicU64::new(0)),
75 sets: Arc::new(AtomicU64::new(0)),
76 })
77 }
78
79 pub fn get_server_stats(
88 &self,
89 ) -> Result<Vec<(String, std::collections::HashMap<String, String>)>> {
90 self.client
91 .stats()
92 .map_err(|e| anyhow!("Failed to get Memcached stats: {e}"))
93 }
94}
95
96use crate::error::CacheResult;
99use crate::traits::CacheBackend;
100
101impl CacheBackend for MemcachedCache {
103 fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
104 Box::pin(async move {
105 if let Ok(Some(bytes_vec)) = self.client.get::<Vec<u8>>(key) {
106 self.hits.fetch_add(1, Ordering::Relaxed);
107 Some(Bytes::from(bytes_vec))
108 } else {
109 self.misses.fetch_add(1, Ordering::Relaxed);
110 None
111 }
112 })
113 }
114
115 fn set_with_ttl<'a>(
116 &'a self,
117 key: &'a str,
118 value: Bytes,
119 ttl: Duration,
120 ) -> BoxFuture<'a, CacheResult<()>> {
121 Box::pin(async move {
122 self.client
123 .set(
124 key,
125 value.as_ref(),
126 u32::try_from(ttl.as_secs()).unwrap_or(u32::MAX),
127 )
128 .map_err(|e| {
129 crate::error::CacheError::BackendError(format!(
130 "Memcached operation failed: {e}"
131 ))
132 })?;
133
134 self.sets.fetch_add(1, Ordering::Relaxed);
135 debug!(key = %key, ttl_secs = %ttl.as_secs(), "[Memcached] Cached key with TTL");
136 Ok(())
137 })
138 }
139
140 fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
141 Box::pin(async move {
142 self.client.delete(key).map_err(|e| {
143 crate::error::CacheError::BackendError(format!("Memcached operation failed: {e}"))
144 })?;
145 Ok(())
146 })
147 }
148
149 fn health_check(&self) -> BoxFuture<'_, bool> {
150 Box::pin(async move {
151 let test_key = "health_check_memcached";
152 let test_value = Bytes::from_static(b"health_check");
153
154 match self
155 .set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10))
156 .await
157 {
158 Ok(()) => match self.get(test_key).await {
159 Some(retrieved) => {
160 let _ = self.remove(test_key).await;
161 retrieved == test_value
162 }
163 None => false,
164 },
165 Err(_) => false,
166 }
167 })
168 }
169
170 fn name(&self) -> &'static str {
171 "Memcached"
172 }
173}