llm_edge_cache/l2.rs

//! L2 Distributed Cache using Redis
//!
//! Distributed cache layer with persistence and multi-instance sharing.
//! Target latency: 1-2ms for get/set operations.
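//!
//! # Example
//!
//! A minimal usage sketch (module paths and the placeholder key are
//! assumptions based on this crate's layout; requires a reachable Redis):
//!
//! ```no_run
//! use llm_edge_cache::l2::{L2Cache, L2Config};
//! use llm_edge_cache::metrics::CacheMetrics;
//!
//! # async fn demo() -> Result<(), llm_edge_cache::l2::L2Error> {
//! let cache = L2Cache::with_config(L2Config::default(), CacheMetrics::new()).await?;
//! let hit = cache.get("prompt_hash").await?;
//! println!("L2 hit: {}", hit.is_some());
//! # Ok(())
//! # }
//! ```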
use crate::l1::CachedResponse;
use crate::metrics::{CacheMetrics, CacheOperation, CacheTier, LatencyTimer};
use redis::{AsyncCommands, RedisError};
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, error, info, warn};

/// L2 cache errors
#[derive(Debug, Error)]
pub enum L2Error {
    #[error("Redis connection error: {0}")]
    Connection(#[from] RedisError),

    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    #[error("Cache operation timeout")]
    Timeout,

    #[error("Cache unavailable")]
    Unavailable,
}

/// Configuration for L2 cache
#[derive(Debug, Clone)]
pub struct L2Config {
    /// Redis connection string (e.g., "redis://127.0.0.1:6379")
    pub redis_url: String,
    /// Default TTL in seconds (default: 3600 = 1 hour)
    pub ttl_seconds: u64,
    /// Connection timeout in milliseconds (default: 1000)
    pub connection_timeout_ms: u64,
    /// Operation timeout in milliseconds (default: 100)
    pub operation_timeout_ms: u64,
    /// Key prefix for namespacing (default: "llm_cache:")
    pub key_prefix: String,
}

impl Default for L2Config {
    fn default() -> Self {
        Self {
            redis_url: "redis://127.0.0.1:6379".to_string(),
            ttl_seconds: 3600,
            connection_timeout_ms: 1000,
            operation_timeout_ms: 100,
            key_prefix: "llm_cache:".to_string(),
        }
    }
}

/// L2 cache implementation using Redis
#[derive(Clone)]
pub struct L2Cache {
    client: redis::Client,
    config: L2Config,
    metrics: CacheMetrics,
}

impl L2Cache {
    /// Create a new L2 cache with default configuration
    pub async fn new(metrics: CacheMetrics) -> Result<Self, L2Error> {
        Self::with_config(L2Config::default(), metrics).await
    }

    /// Create a new L2 cache with custom configuration
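    ///
    /// # Example
    ///
    /// A sketch of overriding selected fields (values shown are illustrative):
    ///
    /// ```no_run
    /// # use llm_edge_cache::l2::{L2Cache, L2Config};
    /// # use llm_edge_cache::metrics::CacheMetrics;
    /// # async fn demo() -> Result<(), llm_edge_cache::l2::L2Error> {
    /// let config = L2Config {
    ///     ttl_seconds: 600,
    ///     key_prefix: "my_service:".to_string(),
    ///     ..Default::default()
    /// };
    /// let _cache = L2Cache::with_config(config, CacheMetrics::new()).await?;
    /// # Ok(())
    /// # }
    /// ```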
    pub async fn with_config(config: L2Config, metrics: CacheMetrics) -> Result<Self, L2Error> {
        info!(
            "Initializing L2 cache: url={}, ttl={}s",
            config.redis_url, config.ttl_seconds
        );

        let client = redis::Client::open(config.redis_url.as_str())?;

        // Verify connectivity with a PING, bounded by the configured connection
        // timeout so startup cannot hang on an unreachable Redis
        let mut conn = tokio::time::timeout(
            Duration::from_millis(config.connection_timeout_ms),
            client.get_multiplexed_async_connection(),
        )
        .await
        .map_err(|_| L2Error::Timeout)??;
        let _: () = redis::cmd("PING").query_async(&mut conn).await?;

        info!("L2 cache connected to Redis successfully");

        Ok(Self {
            client,
            config,
            metrics,
        })
    }

    /// Get a value from the cache
    ///
    /// # Performance
    /// Target: 1-2ms (network round-trip)
    pub async fn get(&self, key: &str) -> Result<Option<CachedResponse>, L2Error> {
        let _timer = LatencyTimer::new(CacheTier::L2, self.metrics.clone());

        let prefixed_key = self.prefixed_key(key);

        // Use a timeout so a slow Redis cannot stall the request path
        let result = tokio::time::timeout(
            Duration::from_millis(self.config.operation_timeout_ms),
            self.get_internal(&prefixed_key),
        )
        .await;

        match result {
            Ok(Ok(Some(value))) => {
                debug!("L2 cache HIT: key={}", key.get(..16).unwrap_or(key));
                self.metrics
                    .record_operation(CacheTier::L2, CacheOperation::Hit);
                Ok(Some(value))
            }
            Ok(Ok(None)) => {
                debug!("L2 cache MISS: key={}", key.get(..16).unwrap_or(key));
                self.metrics
                    .record_operation(CacheTier::L2, CacheOperation::Miss);
                Ok(None)
            }
            Ok(Err(e)) => {
                warn!("L2 cache GET error: {}", e);
                self.metrics
                    .record_operation(CacheTier::L2, CacheOperation::Miss);
                Err(e)
            }
            Err(_) => {
                warn!("L2 cache GET timeout");
                self.metrics
                    .record_operation(CacheTier::L2, CacheOperation::Miss);
                Err(L2Error::Timeout)
            }
        }
    }

    /// Internal get implementation
    async fn get_internal(&self, key: &str) -> Result<Option<CachedResponse>, L2Error> {
        let mut conn = self.client.get_multiplexed_async_connection().await?;
        let data: Option<String> = conn.get(key).await?;

        match data {
            Some(json) => {
                let response: CachedResponse = serde_json::from_str(&json)?;
                Ok(Some(response))
            }
            None => Ok(None),
        }
    }

    /// Set a value in the cache
    ///
    /// # Performance
    /// Target: 1-2ms (network round-trip)
    ///
    /// Designed to be called from a spawned task so the write does not block
    /// the main request; see the sketch below.
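    ///
    /// A minimal sketch of that pattern (the wrapper function and module paths
    /// are illustrative assumptions, not part of this crate's API):
    ///
    /// ```no_run
    /// # use llm_edge_cache::l1::CachedResponse;
    /// # use llm_edge_cache::l2::L2Cache;
    /// # fn background_write(cache: &L2Cache, key: String, response: CachedResponse) {
    /// // L2Cache is Clone, so a cheap handle can move into the task
    /// let cache = cache.clone();
    /// tokio::spawn(async move {
    ///     if let Err(e) = cache.set(key, response).await {
    ///         tracing::warn!("background L2 write failed: {}", e);
    ///     }
    /// });
    /// # }
    /// ```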
    pub async fn set(&self, key: String, value: CachedResponse) -> Result<(), L2Error> {
        self.set_with_ttl(key, value, self.config.ttl_seconds).await
    }

    /// Set a value in the cache with custom TTL
    pub async fn set_with_ttl(
        &self,
        key: String,
        value: CachedResponse,
        ttl_seconds: u64,
    ) -> Result<(), L2Error> {
        let _timer = LatencyTimer::new(CacheTier::L2, self.metrics.clone());

        let prefixed_key = self.prefixed_key(&key);

        // Use a timeout so a slow Redis cannot stall the caller
        let result = tokio::time::timeout(
            Duration::from_millis(self.config.operation_timeout_ms),
            self.set_internal(prefixed_key, value, ttl_seconds),
        )
        .await;

        match result {
            Ok(Ok(())) => {
                debug!("L2 cache WRITE: key={}", key.get(..16).unwrap_or(key.as_str()));
                self.metrics
                    .record_operation(CacheTier::L2, CacheOperation::Write);
                Ok(())
            }
            Ok(Err(e)) => {
                warn!("L2 cache SET error: {}", e);
                Err(e)
            }
            Err(_) => {
                warn!("L2 cache SET timeout");
                Err(L2Error::Timeout)
            }
        }
    }

    /// Internal set implementation
    async fn set_internal(
        &self,
        key: String,
        value: CachedResponse,
        ttl_seconds: u64,
    ) -> Result<(), L2Error> {
        let json = serde_json::to_string(&value)?;
        let mut conn = self.client.get_multiplexed_async_connection().await?;

        // Use SETEX to set value and expiration atomically
        let _: () = conn.set_ex(&key, json, ttl_seconds).await?;

        Ok(())
    }

    /// Remove a value from the cache
    pub async fn remove(&self, key: &str) -> Result<(), L2Error> {
        let prefixed_key = self.prefixed_key(key);
        let mut conn = self.client.get_multiplexed_async_connection().await?;

        let _: () = conn.del(&prefixed_key).await?;
        self.metrics
            .record_operation(CacheTier::L2, CacheOperation::Delete);

        Ok(())
    }

    /// Clear all cache entries under the configured prefix (use with caution!)
    ///
    /// Backed by the blocking `KEYS` command, which scans the entire keyspace;
    /// avoid running it against large production instances.
    pub async fn clear(&self) -> Result<(), L2Error> {
        info!("Clearing L2 cache with prefix: {}", self.config.key_prefix);

        let mut conn = self.client.get_multiplexed_async_connection().await?;
        let pattern = format!("{}*", self.config.key_prefix);

        // Get all keys matching the pattern
        let keys: Vec<String> = conn.keys(&pattern).await?;

        if !keys.is_empty() {
            let _: () = conn.del(&keys).await?;
            info!("Cleared {} keys from L2 cache", keys.len());
        }

        Ok(())
    }

    /// Check if Redis connection is healthy
    pub async fn health_check(&self) -> bool {
        match self.client.get_multiplexed_async_connection().await {
            Ok(mut conn) => {
                let result: Result<String, RedisError> =
                    redis::cmd("PING").query_async(&mut conn).await;
                result.is_ok()
            }
            Err(_) => false,
        }
    }

    /// Get the current number of cached entries under the prefix (approximate)
    ///
    /// Also backed by the blocking `KEYS` command; intended for diagnostics
    /// rather than hot-path use.
    pub async fn approximate_size(&self) -> Result<usize, L2Error> {
        let mut conn = self.client.get_multiplexed_async_connection().await?;
        let pattern = format!("{}*", self.config.key_prefix);
        let keys: Vec<String> = conn.keys(&pattern).await?;

        Ok(keys.len())
    }

    /// Add key prefix for namespacing
    fn prefixed_key(&self, key: &str) -> String {
        format!("{}{}", self.config.key_prefix, key)
    }

    /// Get cache configuration
    pub fn config(&self) -> &L2Config {
        &self.config
    }
}

/// Helper function to create L2 cache with graceful fallback
///
/// If Redis is unavailable, returns `None` and logs a warning.
/// This allows the application to continue with L1-only caching.
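///
/// # Example
///
/// A sketch of degraded-mode startup (the calling context is assumed):
///
/// ```no_run
/// # use llm_edge_cache::l2::{create_l2_cache_optional, L2Config};
/// # use llm_edge_cache::metrics::CacheMetrics;
/// # async fn startup() {
/// let l2 = create_l2_cache_optional(L2Config::default(), CacheMetrics::new()).await;
/// // `None` here means Redis was unreachable; serve from L1 only.
/// if let Some(cache) = &l2 {
///     let _healthy = cache.health_check().await;
/// }
/// # }
/// ```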
pub async fn create_l2_cache_optional(config: L2Config, metrics: CacheMetrics) -> Option<L2Cache> {
    match L2Cache::with_config(config, metrics).await {
        Ok(cache) => Some(cache),
        Err(e) => {
            error!("Failed to initialize L2 cache: {}", e);
            warn!("Continuing with L1-only caching");
            None
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::l1::TokenUsage;
    use chrono::Utc;

    fn create_test_response(content: &str) -> CachedResponse {
        CachedResponse {
            content: content.to_string(),
            tokens: Some(TokenUsage {
                prompt_tokens: 10,
                completion_tokens: 20,
                total_tokens: 30,
            }),
            model: "gpt-4".to_string(),
            cached_at: Utc::now().timestamp(),
        }
    }

    // Note: These tests require a running Redis instance
    // Run with: docker run -d -p 6379:6379 redis:7-alpine

    #[tokio::test]
    #[ignore] // Requires Redis
    async fn test_l2_basic_get_set() {
        let metrics = CacheMetrics::new();
        let cache = L2Cache::new(metrics).await.expect("Redis not available");

        let key = "test_key".to_string();
        let response = create_test_response("Hello, Redis!");

        // Should miss initially
        let result = cache.get(&key).await.unwrap();
        assert!(result.is_none());

        // Set value
        cache.set(key.clone(), response.clone()).await.unwrap();

        // Should hit now
        let cached = cache.get(&key).await.unwrap();
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().content, "Hello, Redis!");

        // Cleanup
        cache.remove(&key).await.unwrap();
    }

    #[tokio::test]
    #[ignore] // Requires Redis
    async fn test_l2_ttl_expiration() {
        let metrics = CacheMetrics::new();
        let cache = L2Cache::new(metrics).await.expect("Redis not available");

        let key = "test_ttl_key".to_string();
        let response = create_test_response("Will expire soon");

        // Set with 1 second TTL
        cache.set_with_ttl(key.clone(), response, 1).await.unwrap();

        // Should exist immediately
        assert!(cache.get(&key).await.unwrap().is_some());

        // Wait for expiration
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Should be expired
        assert!(cache.get(&key).await.unwrap().is_none());
    }

    #[tokio::test]
    #[ignore] // Requires Redis
    async fn test_l2_health_check() {
        let metrics = CacheMetrics::new();
        let cache = L2Cache::new(metrics).await.expect("Redis not available");

        assert!(cache.health_check().await);
    }

    #[tokio::test]
    #[ignore] // Requires Redis
    async fn test_l2_metrics_recording() {
        let metrics = CacheMetrics::new();
        let cache = L2Cache::new(metrics.clone())
            .await
            .expect("Redis not available");

        let key = "test_metrics_key".to_string();

        // Miss
        let _ = cache.get(&key).await;
        assert_eq!(metrics.snapshot().l2_misses, 1);

        // Write
        cache
            .set(key.clone(), create_test_response("test"))
            .await
            .unwrap();
        assert_eq!(metrics.snapshot().l2_writes, 1);

        // Hit
        let _ = cache.get(&key).await;
        assert_eq!(metrics.snapshot().l2_hits, 1);

        // Cleanup
        cache.remove(&key).await.unwrap();
    }

    #[tokio::test]
    #[ignore] // Requires Redis
    async fn test_l2_key_prefix() {
        let metrics = CacheMetrics::new();
        let config = L2Config {
            key_prefix: "test_prefix:".to_string(),
            ..Default::default()
        };
        let cache = L2Cache::with_config(config, metrics)
            .await
            .expect("Redis not available");

        let key = "my_key".to_string();
        cache
            .set(key.clone(), create_test_response("test"))
            .await
            .unwrap();

        // The actual Redis key should be prefixed
        let prefixed = cache.prefixed_key(&key);
        assert!(prefixed.starts_with("test_prefix:"));

        // Cleanup
        cache.remove(&key).await.unwrap();
    }
}