1use crate::l1::CachedResponse;
7use crate::metrics::{CacheMetrics, CacheOperation, CacheTier, LatencyTimer};
8use redis::{AsyncCommands, RedisError};
9use std::time::Duration;
10use thiserror::Error;
11use tracing::{debug, error, info, warn};
12
/// Errors produced by the L2 (Redis) cache tier.
#[derive(Debug, Error)]
pub enum L2Error {
    /// Underlying Redis client or connection failure (connect, protocol, command).
    #[error("Redis connection error: {0}")]
    Connection(#[from] RedisError),

    /// Failed to serialize or deserialize a `CachedResponse` as JSON.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// An operation exceeded the configured `operation_timeout_ms` budget.
    #[error("Cache operation timeout")]
    Timeout,

    /// The cache backend could not be used at all.
    /// NOTE(review): never constructed in this file — presumably reserved for
    /// callers or other modules; confirm before removing.
    #[error("Cache unavailable")]
    Unavailable,
}
28
/// Configuration for the Redis-backed L2 cache tier.
#[derive(Debug, Clone)]
pub struct L2Config {
    /// Redis connection URL, e.g. `redis://127.0.0.1:6379`.
    pub redis_url: String,
    /// Default time-to-live applied to entries written via `set`, in seconds.
    pub ttl_seconds: u64,
    /// Budget for establishing the initial connection, in milliseconds.
    /// NOTE(review): not read anywhere in this file — confirm a caller applies
    /// it, or wire it into connection setup.
    pub connection_timeout_ms: u64,
    /// Per-operation (GET/SET) timeout budget, in milliseconds.
    pub operation_timeout_ms: u64,
    /// Prefix prepended to every key to namespace this service's entries.
    pub key_prefix: String,
}
43
44impl Default for L2Config {
45 fn default() -> Self {
46 Self {
47 redis_url: "redis://127.0.0.1:6379".to_string(),
48 ttl_seconds: 3600,
49 connection_timeout_ms: 1000,
50 operation_timeout_ms: 100,
51 key_prefix: "llm_cache:".to_string(),
52 }
53 }
54}
55
/// Redis-backed second-tier cache for LLM responses.
///
/// NOTE(review): `Clone` assumes `redis::Client` and `CacheMetrics` are cheap
/// shared handles — `redis::Client` is documented as such; confirm for
/// `CacheMetrics` in `crate::metrics`.
#[derive(Clone)]
pub struct L2Cache {
    // Client from which each operation obtains a multiplexed async connection.
    client: redis::Client,
    // TTL, timeout, and key-prefix settings.
    config: L2Config,
    // Hit/miss/latency recorder shared with the other cache tiers.
    metrics: CacheMetrics,
}
63
64impl L2Cache {
65 pub async fn new(metrics: CacheMetrics) -> Result<Self, L2Error> {
67 Self::with_config(L2Config::default(), metrics).await
68 }
69
70 pub async fn with_config(config: L2Config, metrics: CacheMetrics) -> Result<Self, L2Error> {
72 info!(
73 "Initializing L2 cache: url={}, ttl={}s",
74 config.redis_url, config.ttl_seconds
75 );
76
77 let client = redis::Client::open(config.redis_url.as_str())?;
78
79 let mut conn = client.get_multiplexed_async_connection().await?;
81 let _: () = redis::cmd("PING").query_async(&mut conn).await?;
82
83 info!("L2 cache connected to Redis successfully");
84
85 Ok(Self {
86 client,
87 config,
88 metrics,
89 })
90 }
91
92 pub async fn get(&self, key: &str) -> Result<Option<CachedResponse>, L2Error> {
97 let _timer = LatencyTimer::new(CacheTier::L2, self.metrics.clone());
98
99 let prefixed_key = self.prefixed_key(key);
100
101 let result = tokio::time::timeout(
103 Duration::from_millis(self.config.operation_timeout_ms),
104 self.get_internal(&prefixed_key),
105 )
106 .await;
107
108 match result {
109 Ok(Ok(Some(value))) => {
110 debug!("L2 cache HIT: key={}", &key[..16.min(key.len())]);
111 self.metrics
112 .record_operation(CacheTier::L2, CacheOperation::Hit);
113 Ok(Some(value))
114 }
115 Ok(Ok(None)) => {
116 debug!("L2 cache MISS: key={}", &key[..16.min(key.len())]);
117 self.metrics
118 .record_operation(CacheTier::L2, CacheOperation::Miss);
119 Ok(None)
120 }
121 Ok(Err(e)) => {
122 warn!("L2 cache GET error: {}", e);
123 self.metrics
124 .record_operation(CacheTier::L2, CacheOperation::Miss);
125 Err(e)
126 }
127 Err(_) => {
128 warn!("L2 cache GET timeout");
129 self.metrics
130 .record_operation(CacheTier::L2, CacheOperation::Miss);
131 Err(L2Error::Timeout)
132 }
133 }
134 }
135
136 async fn get_internal(&self, key: &str) -> Result<Option<CachedResponse>, L2Error> {
138 let mut conn = self.client.get_multiplexed_async_connection().await?;
139 let data: Option<String> = conn.get(key).await?;
140
141 match data {
142 Some(json) => {
143 let response: CachedResponse = serde_json::from_str(&json)?;
144 Ok(Some(response))
145 }
146 None => Ok(None),
147 }
148 }
149
150 pub async fn set(&self, key: String, value: CachedResponse) -> Result<(), L2Error> {
156 self.set_with_ttl(key, value, self.config.ttl_seconds).await
157 }
158
159 pub async fn set_with_ttl(
161 &self,
162 key: String,
163 value: CachedResponse,
164 ttl_seconds: u64,
165 ) -> Result<(), L2Error> {
166 let _timer = LatencyTimer::new(CacheTier::L2, self.metrics.clone());
167
168 let prefixed_key = self.prefixed_key(&key);
169
170 let result = tokio::time::timeout(
172 Duration::from_millis(self.config.operation_timeout_ms),
173 self.set_internal(prefixed_key, value, ttl_seconds),
174 )
175 .await;
176
177 match result {
178 Ok(Ok(())) => {
179 debug!("L2 cache WRITE: key={}", &key[..16.min(key.len())]);
180 self.metrics
181 .record_operation(CacheTier::L2, CacheOperation::Write);
182 Ok(())
183 }
184 Ok(Err(e)) => {
185 warn!("L2 cache SET error: {}", e);
186 Err(e)
187 }
188 Err(_) => {
189 warn!("L2 cache SET timeout");
190 Err(L2Error::Timeout)
191 }
192 }
193 }
194
195 async fn set_internal(
197 &self,
198 key: String,
199 value: CachedResponse,
200 ttl_seconds: u64,
201 ) -> Result<(), L2Error> {
202 let json = serde_json::to_string(&value)?;
203 let mut conn = self.client.get_multiplexed_async_connection().await?;
204
205 let _: () = conn.set_ex(&key, json, ttl_seconds).await?;
207
208 Ok(())
209 }
210
211 pub async fn remove(&self, key: &str) -> Result<(), L2Error> {
213 let prefixed_key = self.prefixed_key(key);
214 let mut conn = self.client.get_multiplexed_async_connection().await?;
215
216 let _: () = conn.del(&prefixed_key).await?;
217 self.metrics
218 .record_operation(CacheTier::L2, CacheOperation::Delete);
219
220 Ok(())
221 }
222
223 pub async fn clear(&self) -> Result<(), L2Error> {
225 info!("Clearing L2 cache with prefix: {}", self.config.key_prefix);
226
227 let mut conn = self.client.get_multiplexed_async_connection().await?;
228 let pattern = format!("{}*", self.config.key_prefix);
229
230 let keys: Vec<String> = conn.keys(&pattern).await?;
232
233 if !keys.is_empty() {
234 let _: () = conn.del(&keys).await?;
235 info!("Cleared {} keys from L2 cache", keys.len());
236 }
237
238 Ok(())
239 }
240
241 pub async fn health_check(&self) -> bool {
243 match self.client.get_multiplexed_async_connection().await {
244 Ok(mut conn) => {
245 let result: Result<String, RedisError> =
246 redis::cmd("PING").query_async(&mut conn).await;
247 result.is_ok()
248 }
249 Err(_) => false,
250 }
251 }
252
253 pub async fn approximate_size(&self) -> Result<usize, L2Error> {
255 let mut conn = self.client.get_multiplexed_async_connection().await?;
256 let pattern = format!("{}*", self.config.key_prefix);
257 let keys: Vec<String> = conn.keys(&pattern).await?;
258
259 Ok(keys.len())
260 }
261
262 fn prefixed_key(&self, key: &str) -> String {
264 format!("{}{}", self.config.key_prefix, key)
265 }
266
267 pub fn config(&self) -> &L2Config {
269 &self.config
270 }
271}
272
273pub async fn create_l2_cache_optional(config: L2Config, metrics: CacheMetrics) -> Option<L2Cache> {
278 match L2Cache::with_config(config, metrics).await {
279 Ok(cache) => Some(cache),
280 Err(e) => {
281 error!("Failed to initialize L2 cache: {}", e);
282 warn!("Continuing with L1-only caching");
283 None
284 }
285 }
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use crate::l1::TokenUsage;
    use chrono::Utc;

    /// Builds a small fixture response with fixed token accounting.
    fn create_test_response(content: &str) -> CachedResponse {
        let tokens = TokenUsage {
            prompt_tokens: 10,
            completion_tokens: 20,
            total_tokens: 30,
        };
        CachedResponse {
            content: content.to_string(),
            tokens: Some(tokens),
            model: "gpt-4".to_string(),
            cached_at: Utc::now().timestamp(),
        }
    }

    // Every test below needs a live Redis instance, so they are all
    // `#[ignore]`d by default; run with `cargo test -- --ignored`.

    #[tokio::test]
    #[ignore]
    async fn test_l2_basic_get_set() {
        let cache = L2Cache::new(CacheMetrics::new())
            .await
            .expect("Redis not available");
        let key = "test_key".to_string();

        // An empty cache must miss.
        assert!(cache.get(&key).await.unwrap().is_none());

        // Round-trip a value through Redis.
        cache
            .set(key.clone(), create_test_response("Hello, Redis!"))
            .await
            .unwrap();
        let fetched = cache.get(&key).await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().content, "Hello, Redis!");

        // Clean up so reruns start from a known state.
        cache.remove(&key).await.unwrap();
    }

    #[tokio::test]
    #[ignore]
    async fn test_l2_ttl_expiration() {
        let cache = L2Cache::new(CacheMetrics::new())
            .await
            .expect("Redis not available");
        let key = "test_ttl_key".to_string();

        // Store with a 1-second TTL; it must be visible immediately...
        cache
            .set_with_ttl(key.clone(), create_test_response("Will expire soon"), 1)
            .await
            .unwrap();
        assert!(cache.get(&key).await.unwrap().is_some());

        // ...and gone once the TTL has elapsed.
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(cache.get(&key).await.unwrap().is_none());
    }

    #[tokio::test]
    #[ignore]
    async fn test_l2_health_check() {
        let cache = L2Cache::new(CacheMetrics::new())
            .await
            .expect("Redis not available");
        assert!(cache.health_check().await);
    }

    #[tokio::test]
    #[ignore]
    async fn test_l2_metrics_recording() {
        let metrics = CacheMetrics::new();
        let cache = L2Cache::new(metrics.clone())
            .await
            .expect("Redis not available");
        let key = "test_metrics_key".to_string();

        // A lookup on an absent key counts as a miss.
        let _ = cache.get(&key).await;
        assert_eq!(metrics.snapshot().l2_misses, 1);

        // A successful set counts as a write.
        cache
            .set(key.clone(), create_test_response("test"))
            .await
            .unwrap();
        assert_eq!(metrics.snapshot().l2_writes, 1);

        // A lookup on the now-present key counts as a hit.
        let _ = cache.get(&key).await;
        assert_eq!(metrics.snapshot().l2_hits, 1);

        cache.remove(&key).await.unwrap();
    }

    #[tokio::test]
    #[ignore]
    async fn test_l2_key_prefix() {
        let config = L2Config {
            key_prefix: "test_prefix:".to_string(),
            ..Default::default()
        };
        let cache = L2Cache::with_config(config, CacheMetrics::new())
            .await
            .expect("Redis not available");

        let key = "my_key".to_string();
        cache
            .set(key.clone(), create_test_response("test"))
            .await
            .unwrap();

        // Keys stored in Redis must carry the configured namespace prefix.
        assert!(cache.prefixed_key(&key).starts_with("test_prefix:"));

        cache.remove(&key).await.unwrap();
    }
}