multi_tier_cache/
l2_cache.rs1use std::sync::Arc;
6use std::time::Duration;
7use anyhow::Result;
8use redis::{Client, AsyncCommands};
9use serde_json;
10use std::sync::atomic::{AtomicU64, Ordering};
11
12pub struct L2Cache {
14 client: Client,
16 hits: Arc<AtomicU64>,
19 misses: Arc<AtomicU64>,
21 sets: Arc<AtomicU64>,
23}
24
25impl L2Cache {
26 pub async fn new() -> Result<Self> {
28 println!(" 🔴 Initializing L2 Cache (Redis)...");
29
30 let redis_url = std::env::var("REDIS_URL")
32 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33
34 let client = Client::open(redis_url.as_str())?;
35
36 let mut conn = client.get_multiplexed_async_connection().await?;
38 let _: String = redis::cmd("PING").query_async(&mut conn).await?;
39
40 println!(" ✅ L2 Cache connected to Redis at {}", redis_url);
41
42 Ok(Self {
43 client,
44 hits: Arc::new(AtomicU64::new(0)),
45 misses: Arc::new(AtomicU64::new(0)),
46 sets: Arc::new(AtomicU64::new(0)),
47 })
48 }
49
50 pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
52 match self.client.get_multiplexed_async_connection().await {
53 Ok(mut conn) => {
54 match conn.get::<_, String>(key).await {
55 Ok(json_str) => {
56 match serde_json::from_str(&json_str) {
57 Ok(value) => {
58 self.hits.fetch_add(1, Ordering::Relaxed);
59 Some(value)
60 }
61 Err(_) => {
62 self.misses.fetch_add(1, Ordering::Relaxed);
63 None
64 }
65 }
66 }
67 Err(_) => {
68 self.misses.fetch_add(1, Ordering::Relaxed);
69 None
70 }
71 }
72 }
73 Err(_) => {
74 self.misses.fetch_add(1, Ordering::Relaxed);
75 None
76 }
77 }
78 }
79
80 pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
82 let json_str = serde_json::to_string(&value)?;
83 let mut conn = self.client.get_multiplexed_async_connection().await?;
84
85 let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
86 self.sets.fetch_add(1, Ordering::Relaxed);
87 println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
88 Ok(())
89 }
90
91 pub async fn remove(&self, key: &str) -> Result<()> {
93 let mut conn = self.client.get_multiplexed_async_connection().await?;
94 let _: () = conn.del(key).await?;
95 Ok(())
96 }
97
98 pub async fn health_check(&self) -> bool {
100 let test_key = "health_check_l2";
101 let timestamp = std::time::SystemTime::now()
102 .duration_since(std::time::UNIX_EPOCH)
103 .unwrap_or(Duration::from_secs(0))
104 .as_secs();
105 let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
106
107 match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
108 Ok(_) => {
109 match self.get(test_key).await {
110 Some(retrieved) => {
111 let _ = self.remove(test_key).await;
112 retrieved["test"].as_bool().unwrap_or(false)
113 }
114 None => false
115 }
116 }
117 Err(_) => false
118 }
119 }
120
121 pub async fn stream_add(
131 &self,
132 stream_key: &str,
133 fields: Vec<(String, String)>,
134 maxlen: Option<usize>
135 ) -> Result<String> {
136 let mut conn = self.client.get_multiplexed_async_connection().await?;
137
138 let mut cmd = redis::cmd("XADD");
140 cmd.arg(stream_key);
141
142 if let Some(max) = maxlen {
143 cmd.arg("MAXLEN").arg("~").arg(max);
145 }
146
147 cmd.arg("*"); for (field, value) in &fields {
151 cmd.arg(field).arg(value);
152 }
153
154 let entry_id: String = cmd.query_async(&mut conn).await?;
155
156 println!("📤 [L2 Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
157 Ok(entry_id)
158 }
159
160 pub async fn stream_read_latest(
169 &self,
170 stream_key: &str,
171 count: usize
172 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
173 let mut conn = self.client.get_multiplexed_async_connection().await?;
174
175 let raw_result: redis::Value = redis::cmd("XREVRANGE")
177 .arg(stream_key)
178 .arg("+") .arg("-") .arg("COUNT")
181 .arg(count)
182 .query_async(&mut conn)
183 .await?;
184
185 let entries = Self::parse_stream_response(raw_result)?;
186
187 println!("📥 [L2 Stream] Read {} entries from '{}'", entries.len(), stream_key);
188 Ok(entries)
189 }
190
191 pub async fn stream_read(
202 &self,
203 stream_key: &str,
204 last_id: &str,
205 count: usize,
206 block_ms: Option<usize>
207 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
208 let mut conn = self.client.get_multiplexed_async_connection().await?;
209
210 let mut cmd = redis::cmd("XREAD");
211
212 if let Some(block) = block_ms {
214 cmd.arg("BLOCK").arg(block);
215 }
216
217 cmd.arg("COUNT").arg(count);
218 cmd.arg("STREAMS").arg(stream_key).arg(last_id);
219
220 let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
221
222 let entries = Self::parse_xread_response(raw_result)?;
224
225 println!("📥 [L2 Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
226 Ok(entries)
227 }
228
229 fn parse_stream_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
232 match value {
233 redis::Value::Array(entries) => {
234 let mut result = Vec::new();
235
236 for entry in entries {
237 if let redis::Value::Array(entry_parts) = entry {
238 if entry_parts.len() >= 2 {
239 let id = Self::value_to_string(&entry_parts[0]);
241
242 if let redis::Value::Array(field_values) = &entry_parts[1] {
244 let mut fields = Vec::new();
245
246 for chunk in field_values.chunks(2) {
248 if chunk.len() == 2 {
249 let field = Self::value_to_string(&chunk[0]);
250 let value = Self::value_to_string(&chunk[1]);
251 fields.push((field, value));
252 }
253 }
254
255 result.push((id, fields));
256 }
257 }
258 }
259 }
260
261 Ok(result)
262 }
263 redis::Value::Nil => Ok(Vec::new()),
264 _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
265 }
266 }
267
268 fn parse_xread_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
271 match value {
272 redis::Value::Array(streams) => {
273 let mut all_entries = Vec::new();
274
275 for stream in streams {
276 if let redis::Value::Array(stream_parts) = stream {
277 if stream_parts.len() >= 2 {
278 if let redis::Value::Array(entries) = &stream_parts[1] {
280 for entry in entries {
281 if let redis::Value::Array(entry_parts) = entry {
282 if entry_parts.len() >= 2 {
283 let id = Self::value_to_string(&entry_parts[0]);
284
285 if let redis::Value::Array(field_values) = &entry_parts[1] {
286 let mut fields = Vec::new();
287
288 for chunk in field_values.chunks(2) {
289 if chunk.len() == 2 {
290 let field = Self::value_to_string(&chunk[0]);
291 let value = Self::value_to_string(&chunk[1]);
292 fields.push((field, value));
293 }
294 }
295
296 all_entries.push((id, fields));
297 }
298 }
299 }
300 }
301 }
302 }
303 }
304 }
305
306 Ok(all_entries)
307 }
308 redis::Value::Nil => Ok(Vec::new()),
309 _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
310 }
311 }
312
313 fn value_to_string(value: &redis::Value) -> String {
315 match value {
316 redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
317 redis::Value::SimpleString(s) => s.clone(),
318 redis::Value::Int(i) => i.to_string(),
319 redis::Value::Okay => "OK".to_string(),
320 redis::Value::Nil => String::new(),
321 _ => String::new(),
322 }
323 }
324}