// kotoba_cache — crate root (lib.rs)

1//! `kotoba-cache`
2//!
3//! Redis-based distributed cache layer for KotobaDB.
4//! Provides high-performance caching with distributed invalidation.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::RwLock;
10use redis::{Client, Connection, AsyncCommands, RedisResult};
11use dashmap::DashMap;
12use serde::{Deserialize, Serialize};
13use tracing::{info, warn, error, instrument};
14use metrics::{counter, histogram};
15use chrono::{DateTime, Utc};
16
/// Cache layer configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheConfig {
    /// Redis connection URLs (cluster support planned; only the first URL
    /// is currently used by `CacheLayer::new`)
    pub redis_urls: Vec<String>,
    /// Connection timeout in seconds
    pub connection_timeout_seconds: u64,
    /// Default TTL for cache entries in seconds (a TTL of 0 stores values
    /// without expiry)
    pub default_ttl_seconds: u64,
    /// Maximum cache size (approximate, based on locally tracked
    /// uncompressed entry sizes)
    pub max_size_bytes: u64,
    /// Enable compression for large values
    pub enable_compression: bool,
    /// Compression threshold in bytes (values at or below this size are
    /// stored uncompressed even when compression is enabled)
    pub compression_threshold_bytes: usize,
    /// Enable metrics collection
    pub enable_metrics: bool,
    /// Cache key prefix; keys are namespaced as `"{prefix}:{key}"`
    pub key_prefix: String,
}
37
38impl Default for CacheConfig {
39    fn default() -> Self {
40        Self {
41            redis_urls: vec!["redis://127.0.0.1:6379".to_string()],
42            connection_timeout_seconds: 30,
43            default_ttl_seconds: 3600, // 1 hour
44            max_size_bytes: 1024 * 1024 * 1024, // 1GB
45            enable_compression: true,
46            compression_threshold_bytes: 1024, // 1KB
47            enable_metrics: true,
48            key_prefix: "kotoba:cache".to_string(),
49        }
50    }
51}
52
/// Main cache layer implementation.
///
/// Falls back to a "mock" mode (all operations succeed with empty
/// results) when no Redis connection could be established at
/// construction time.
pub struct CacheLayer {
    /// Configuration
    config: CacheConfig,
    /// Redis client, used to (re)create connections on demand
    client: Client,
    /// Single shared connection slot (simplified stand-in for a pool);
    /// `None` while no connection is available or one is checked out
    connection: Arc<RwLock<Option<redis::aio::Connection>>>,
    /// Cache statistics
    stats: Arc<RwLock<CacheStats>>,
    /// Active cache entries (local tracking used for LRU-style eviction)
    active_entries: Arc<DashMap<String, CacheEntry>>,
}
66
/// Cache entry metadata, tracked locally (not stored in Redis).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheEntry {
    /// Cache key (without the configured prefix)
    pub key: String,
    /// Serialized (uncompressed) entry size in bytes
    pub size_bytes: usize,
    /// Creation timestamp
    pub created_at: DateTime<Utc>,
    /// Last access timestamp (updated on cache hits)
    pub last_accessed: DateTime<Utc>,
    /// Access count (number of hits since creation)
    pub access_count: u64,
    /// TTL in seconds (`Some(0)` means stored without expiry)
    pub ttl_seconds: Option<u64>,
}
83
/// Cache statistics.
///
/// Counters are updated by `CacheLayer` when metrics are enabled.
/// NOTE(review): `total_size_bytes` and `entries_count` are never
/// written by any code in this file — confirm before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheStats {
    /// Number of successful gets
    pub hits: u64,
    /// Number of gets that found nothing (also incremented on get errors)
    pub misses: u64,
    /// Number of successful sets
    pub sets: u64,
    /// Number of deletes (including keys removed by `clear`)
    pub deletes: u64,
    /// Number of entries evicted by the size limit
    pub evictions: u64,
    /// hits / (hits + misses); stays 0.0 until there is any traffic
    pub hit_ratio: f64,
    /// Total cached bytes — see struct-level note
    pub total_size_bytes: u64,
    /// Number of cached entries — see struct-level note
    pub entries_count: u64,
}
96
97impl CacheLayer {
98    /// Create a new cache layer
99    pub async fn new(config: CacheConfig) -> Result<Self, CacheError> {
100        info!("Initializing Redis cache layer with config: {:?}", config);
101
102        // Create Redis client (use first URL for now)
103        let client = Client::open(config.redis_urls.first().unwrap_or(&"redis://127.0.0.1:6379".to_string()).clone())
104            .map_err(|e| CacheError::ConnectionError(e.to_string()))?;
105
106        // Create connection
107        let connection = match client.get_async_connection().await {
108            Ok(conn) => Some(conn),
109            Err(e) => {
110                warn!("Failed to establish Redis connection: {}. Using mock cache.", e);
111                None // Allow mock operation for testing
112            }
113        };
114
115        if connection.is_some() {
116            info!("Redis connection established successfully");
117        }
118
119        let cache = Self {
120            config,
121            client,
122            connection: Arc::new(RwLock::new(connection)),
123            stats: Arc::new(RwLock::new(CacheStats::default())),
124            active_entries: Arc::new(DashMap::new()),
125        };
126
127        Ok(cache)
128    }
129
130    /// Get a value from cache
131    #[instrument(skip(self))]
132    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>, CacheError> {
133        // Check if we have a Redis connection
134        let conn_opt = self.connection.read().await;
135        if conn_opt.is_none() {
136            // No Redis connection, simulate cache miss
137            if self.config.enable_metrics {
138                self.record_miss().await;
139            }
140            return Ok(None);
141        }
142        drop(conn_opt);
143
144        let cache_key = self.make_cache_key(key);
145        let mut connection = self.get_connection().await?;
146
147        let result = connection.get::<_, Option<Vec<u8>>>(&cache_key).await;
148        self.return_connection(connection);
149
150        match result {
151            Ok(Some(data)) => {
152                // Decompress if needed
153                let decompressed_data = if self.config.enable_compression {
154                    self.decompress_data(&data)?
155                } else {
156                    data
157                };
158
159                // Deserialize
160                let value: serde_json::Value = serde_json::from_slice(&decompressed_data)
161                    .map_err(|e| CacheError::SerializationError(e.to_string()))?;
162
163                // Update statistics
164                if self.config.enable_metrics {
165                    self.record_hit().await;
166                }
167
168                // Update access metadata
169                self.update_access_metadata(key).await?;
170
171                Ok(Some(value))
172            }
173            Ok(None) => {
174                if self.config.enable_metrics {
175                    self.record_miss().await;
176                }
177                Ok(None)
178            }
179            Err(e) => {
180                error!("Cache get error for key {}: {}", cache_key, e);
181                if self.config.enable_metrics {
182                    self.record_miss().await;
183                }
184                Err(CacheError::RedisError(e.to_string()))
185            }
186        }
187    }
188
189    /// Set a value in cache
190    #[instrument(skip(self, value))]
191    pub async fn set(
192        &self,
193        key: &str,
194        value: serde_json::Value,
195        ttl_seconds: Option<u64>,
196    ) -> Result<(), CacheError> {
197        // Check if we have a Redis connection
198        let conn_opt = self.connection.read().await;
199        if conn_opt.is_none() {
200            // No Redis connection, just return success for mock operation
201            if self.config.enable_metrics {
202                self.record_set().await;
203            }
204            return Ok(());
205        }
206        drop(conn_opt);
207
208        let cache_key = self.make_cache_key(key);
209        let mut connection = self.get_connection().await?;
210
211        // Serialize value
212        let serialized_data = serde_json::to_vec(&value)
213            .map_err(|e| CacheError::SerializationError(e.to_string()))?;
214
215        // Compress if enabled and above threshold
216        let final_data = if self.config.enable_compression && serialized_data.len() > self.config.compression_threshold_bytes {
217            self.compress_data(&serialized_data)?
218        } else {
219            serialized_data.clone()
220        };
221
222        // Set TTL
223        let ttl = ttl_seconds.unwrap_or(self.config.default_ttl_seconds);
224
225        let result = if ttl > 0 {
226            connection.set_ex(&cache_key, final_data, ttl as u64).await
227        } else {
228            connection.set(&cache_key, final_data).await
229        };
230
231        self.return_connection(connection);
232
233        match result {
234            Ok(()) => {
235                // Update statistics
236                if self.config.enable_metrics {
237                    self.record_set().await;
238                }
239
240                // Track cache entry
241                let entry = CacheEntry {
242                    key: key.to_string(),
243                    size_bytes: serialized_data.len(),
244                    created_at: Utc::now(),
245                    last_accessed: Utc::now(),
246                    access_count: 0,
247                    ttl_seconds: Some(ttl),
248                };
249                self.active_entries.insert(key.to_string(), entry);
250
251                // Check cache size limit
252                self.enforce_size_limit().await?;
253
254                Ok(())
255            }
256            Err(e) => {
257                error!("Cache set error for key {}: {}", cache_key, e);
258                Err(CacheError::RedisError(e.to_string()))
259            }
260        }
261    }
262
263    /// Delete a value from cache
264    #[instrument(skip(self))]
265    pub async fn delete(&self, key: &str) -> Result<bool, CacheError> {
266        // Check if we have a Redis connection
267        let conn_opt = self.connection.read().await;
268        if conn_opt.is_none() {
269            // No Redis connection, simulate successful delete
270            if self.config.enable_metrics {
271                self.record_delete().await;
272            }
273            self.active_entries.remove(key);
274            return Ok(true);
275        }
276        drop(conn_opt);
277
278        let cache_key = self.make_cache_key(key);
279        let mut connection = self.get_connection().await?;
280
281        let result = connection.del::<_, i64>(&cache_key).await;
282        self.return_connection(connection);
283
284        match result {
285            Ok(count) => {
286                let deleted = count > 0;
287                if deleted {
288                    if self.config.enable_metrics {
289                        self.record_delete().await;
290                    }
291                    self.active_entries.remove(key);
292                }
293                Ok(deleted)
294            }
295            Err(e) => {
296                error!("Cache delete error for key {}: {}", cache_key, e);
297                Err(CacheError::RedisError(e.to_string()))
298            }
299        }
300    }
301
302    /// Check if key exists in cache
303    #[instrument(skip(self))]
304    pub async fn exists(&self, key: &str) -> Result<bool, CacheError> {
305        // Check if we have a Redis connection
306        let conn_opt = self.connection.read().await;
307        if conn_opt.is_none() {
308            // No Redis connection, simulate non-existence
309            return Ok(false);
310        }
311        drop(conn_opt);
312
313        let cache_key = self.make_cache_key(key);
314        let mut connection = self.get_connection().await?;
315
316        let result = connection.exists::<_, bool>(&cache_key).await;
317        self.return_connection(connection);
318
319        match result {
320            Ok(exists) => Ok(exists),
321            Err(e) => {
322                error!("Cache exists error for key {}: {}", cache_key, e);
323                Err(CacheError::RedisError(e.to_string()))
324            }
325        }
326    }
327
328    /// Get time to live for a key
329    #[instrument(skip(self))]
330    pub async fn ttl(&self, key: &str) -> Result<Option<i64>, CacheError> {
331        // Check if we have a Redis connection
332        let conn_opt = self.connection.read().await;
333        if conn_opt.is_none() {
334            return Ok(None);
335        }
336        drop(conn_opt);
337
338        let cache_key = self.make_cache_key(key);
339        let mut connection = self.get_connection().await?;
340
341        let result = connection.ttl::<_, i64>(&cache_key).await;
342        self.return_connection(connection);
343
344        match result {
345            Ok(ttl) => {
346                if ttl < 0 {
347                    Ok(None)
348                } else {
349                    Ok(Some(ttl))
350                }
351            }
352            Err(e) => {
353                error!("Cache TTL error for key {}: {}", cache_key, e);
354                Err(CacheError::RedisError(e.to_string()))
355            }
356        }
357    }
358
359    /// Increment a numeric value
360    #[instrument(skip(self))]
361    pub async fn increment(&self, key: &str, amount: i64) -> Result<i64, CacheError> {
362        // Check if we have a Redis connection
363        let conn_opt = self.connection.read().await;
364        if conn_opt.is_none() {
365            // No Redis connection, simulate increment
366            return Ok(amount);
367        }
368        drop(conn_opt);
369
370        let cache_key = self.make_cache_key(key);
371        let mut connection = self.get_connection().await?;
372
373        let result = connection.incr::<_, _, i64>(&cache_key, amount).await;
374        self.return_connection(connection);
375
376        match result {
377            Ok(value) => Ok(value),
378            Err(e) => {
379                error!("Cache increment error for key {}: {}", cache_key, e);
380                Err(CacheError::RedisError(e.to_string()))
381            }
382        }
383    }
384
385    /// Clear all cache entries
386    #[instrument(skip(self))]
387    pub async fn clear(&self) -> Result<(), CacheError> {
388        // Check if we have a Redis connection
389        let conn_opt = self.connection.read().await;
390        let keys_deleted = if conn_opt.is_some() {
391            drop(conn_opt);
392
393            let pattern = format!("{}:*", self.config.key_prefix);
394            let mut connection = self.get_connection().await?;
395
396            // Get all keys matching the pattern
397            let keys: Vec<String> = connection.keys::<_, Vec<String>>(&pattern).await
398                .map_err(|e| CacheError::RedisError(e.to_string()))?;
399
400            let keys_count = keys.len();
401
402            if !keys.is_empty() {
403                // Delete all keys
404                let _: () = connection.del::<_, ()>(&keys).await
405                    .map_err(|e| CacheError::RedisError(e.to_string()))?;
406            }
407
408            self.return_connection(connection);
409            keys_count
410        } else {
411            0
412        };
413
414        // Clear local tracking
415        self.active_entries.clear();
416
417        if self.config.enable_metrics {
418            let mut stats = self.stats.write().await;
419            stats.deletes += keys_deleted as u64;
420        }
421
422        info!("Cache cleared, {} keys deleted", keys_deleted);
423        Ok(())
424    }
425
426    /// Get cache statistics
427    pub async fn get_statistics(&self) -> CacheStats {
428        self.stats.read().await.clone()
429    }
430
431    /// Get cache information
432    #[instrument(skip(self))]
433    pub async fn get_info(&self) -> Result<HashMap<String, String>, CacheError> {
434        // Check if we have a Redis connection
435        let conn_opt = self.connection.read().await;
436        if conn_opt.is_none() {
437            // No Redis connection, return mock info
438            let mut info_map = HashMap::new();
439            info_map.insert("status".to_string(), "mock".to_string());
440            info_map.insert("version".to_string(), "0.0.0".to_string());
441            return Ok(info_map);
442        }
443        drop(conn_opt);
444
445        let mut connection = self.get_connection().await?;
446
447        let info_result: Result<String, _> = redis::cmd("INFO").query_async(&mut connection).await;
448        self.return_connection(connection);
449
450        match info_result {
451            Ok(info) => {
452                // Parse INFO command output
453                let mut info_map = HashMap::new();
454                for line in info.lines() {
455                    if let Some((key, value)) = line.split_once(':') {
456                        info_map.insert(key.to_string(), value.to_string());
457                    }
458                }
459                Ok(info_map)
460            }
461            Err(e) => {
462                error!("Cache INFO error: {}", e);
463                Err(CacheError::RedisError(e.to_string()))
464            }
465        }
466    }
467
468    /// Get a Redis connection
469    async fn get_connection(&self) -> Result<redis::aio::Connection, CacheError> {
470        let mut conn_opt = self.connection.write().await;
471        match conn_opt.take() {
472            Some(conn) => Ok(conn),
473            None => {
474                // Try to create new connection
475                self.client.get_async_connection().await
476                    .map_err(|e| CacheError::ConnectionError(e.to_string()))
477            }
478        }
479    }
480
481    /// Return a connection
482    async fn return_connection(&self, connection: redis::aio::Connection) {
483        let mut conn_opt = self.connection.write().await;
484        *conn_opt = Some(connection);
485    }
486
487    /// Make cache key with prefix
488    fn make_cache_key(&self, key: &str) -> String {
489        format!("{}:{}", self.config.key_prefix, key)
490    }
491
492    /// Compress data using LZ4
493    fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>, CacheError> {
494        lz4::block::compress(data, None, true)
495            .map_err(|e| CacheError::CompressionError(e.to_string()))
496    }
497
498    /// Decompress data using LZ4
499    fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>, CacheError> {
500        lz4::block::decompress(data, None)
501            .map_err(|e| CacheError::CompressionError(e.to_string()))
502    }
503
504    /// Update access metadata for a cache entry
505    async fn update_access_metadata(&self, key: &str) -> Result<(), CacheError> {
506        if let Some(mut entry) = self.active_entries.get_mut(key) {
507            entry.last_accessed = Utc::now();
508            entry.access_count += 1;
509        }
510        Ok(())
511    }
512
513    /// Enforce cache size limit by evicting least recently used entries
514    async fn enforce_size_limit(&self) -> Result<(), CacheError> {
515        let mut total_size = 0u64;
516        let mut entries: Vec<_> = self.active_entries.iter().collect();
517
518        // Sort by last accessed time (oldest first)
519        entries.sort_by(|a, b| a.last_accessed.cmp(&b.last_accessed));
520
521        for entry in entries {
522            total_size += entry.size_bytes as u64;
523
524            if total_size > self.config.max_size_bytes {
525                // Evict this entry
526                let key = entry.key.clone();
527                if let Err(e) = self.delete(&key).await {
528                    warn!("Failed to evict cache entry {}: {}", key, e);
529                } else {
530                    if self.config.enable_metrics {
531                        self.record_eviction().await;
532                    }
533                }
534            }
535        }
536
537        Ok(())
538    }
539
540    /// Record cache hit
541    async fn record_hit(&self) {
542        // Use simple counter without labels to avoid version compatibility issues
543        let mut stats = self.stats.write().await;
544        stats.hits += 1;
545        self.update_hit_ratio(&mut stats);
546    }
547
548    /// Record cache miss
549    async fn record_miss(&self) {
550        let mut stats = self.stats.write().await;
551        stats.misses += 1;
552        self.update_hit_ratio(&mut stats);
553    }
554
555    /// Record cache set
556    async fn record_set(&self) {
557        let mut stats = self.stats.write().await;
558        stats.sets += 1;
559    }
560
561    /// Record cache delete
562    async fn record_delete(&self) {
563        let mut stats = self.stats.write().await;
564        stats.deletes += 1;
565    }
566
567    /// Record cache eviction
568    async fn record_eviction(&self) {
569        let mut stats = self.stats.write().await;
570        stats.evictions += 1;
571    }
572
573    /// Update hit ratio in statistics
574    fn update_hit_ratio(&self, stats: &mut CacheStats) {
575        let total = stats.hits + stats.misses;
576        if total > 0 {
577            stats.hit_ratio = stats.hits as f64 / total as f64;
578        }
579    }
580}
581
/// Cache error types.
#[derive(thiserror::Error, Debug)]
pub enum CacheError {
    /// Failure establishing a Redis connection.
    #[error("Redis connection error: {0}")]
    ConnectionError(String),

    /// A Redis command failed.
    #[error("Redis operation error: {0}")]
    RedisError(String),

    /// JSON (de)serialization of a cached value failed.
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// LZ4 compression or decompression failed.
    #[error("Compression error: {0}")]
    CompressionError(String),

    /// Invalid configuration.
    #[error("Invalid configuration: {0}")]
    ConfigError(String),
}
600
601impl Default for CacheStats {
602    fn default() -> Self {
603        Self {
604            hits: 0,
605            misses: 0,
606            sets: 0,
607            deletes: 0,
608            evictions: 0,
609            hit_ratio: 0.0,
610            total_size_bytes: 0,
611            entries_count: 0,
612        }
613    }
614}
615
616#[cfg(test)]
617mod tests {
618    use super::*;
619    use std::time::Duration;
620
621    #[test]
622    fn test_cache_config_default() {
623        let config = CacheConfig::default();
624
625        assert_eq!(config.redis_urls, vec!["redis://127.0.0.1:6379".to_string()]);
626        assert_eq!(config.connection_timeout_seconds, 30);
627        assert_eq!(config.default_ttl_seconds, 3600);
628        assert_eq!(config.max_size_bytes, 1024 * 1024 * 1024);
629        assert!(config.enable_compression);
630        assert_eq!(config.compression_threshold_bytes, 1024);
631        assert!(config.enable_metrics);
632        assert_eq!(config.key_prefix, "kotoba:cache");
633    }
634
635    #[test]
636    fn test_cache_config_custom() {
637        let config = CacheConfig {
638            redis_urls: vec!["redis://localhost:6380".to_string()],
639            connection_timeout_seconds: 60,
640            default_ttl_seconds: 1800,
641            max_size_bytes: 512 * 1024 * 1024,
642            enable_compression: false,
643            compression_threshold_bytes: 2048,
644            enable_metrics: false,
645            key_prefix: "test:cache".to_string(),
646        };
647
648        assert_eq!(config.redis_urls, vec!["redis://localhost:6380".to_string()]);
649        assert_eq!(config.connection_timeout_seconds, 60);
650        assert_eq!(config.default_ttl_seconds, 1800);
651        assert_eq!(config.max_size_bytes, 512 * 1024 * 1024);
652        assert!(!config.enable_compression);
653        assert_eq!(config.compression_threshold_bytes, 2048);
654        assert!(!config.enable_metrics);
655        assert_eq!(config.key_prefix, "test:cache");
656    }
657
658    #[test]
659    fn test_cache_stats_default() {
660        let stats = CacheStats::default();
661
662        assert_eq!(stats.hits, 0);
663        assert_eq!(stats.misses, 0);
664        assert_eq!(stats.sets, 0);
665        assert_eq!(stats.deletes, 0);
666        assert_eq!(stats.evictions, 0);
667        assert_eq!(stats.hit_ratio, 0.0);
668        assert_eq!(stats.total_size_bytes, 0);
669        assert_eq!(stats.entries_count, 0);
670    }
671
672    #[test]
673    fn test_cache_stats_hit_ratio_calculation() {
674        let mut stats = CacheStats::default();
675
676        // Test with no requests
677        CacheStats::update_hit_ratio(&stats, &mut stats);
678        assert_eq!(stats.hit_ratio, 0.0);
679
680        // Test with hits
681        stats.hits = 3;
682        stats.misses = 2;
683        CacheStats::update_hit_ratio(&stats, &mut stats);
684        assert_eq!(stats.hit_ratio, 0.6);
685
686        // Test with only hits
687        stats.hits = 5;
688        stats.misses = 0;
689        CacheStats::update_hit_ratio(&stats, &mut stats);
690        assert_eq!(stats.hit_ratio, 1.0);
691
692        // Test with only misses
693        stats.hits = 0;
694        stats.misses = 3;
695        CacheStats::update_hit_ratio(&stats, &mut stats);
696        assert_eq!(stats.hit_ratio, 0.0);
697    }
698
699    #[test]
700    fn test_cache_entry_creation() {
701        let now = Utc::now();
702        let entry = CacheEntry {
703            key: "test_key".to_string(),
704            size_bytes: 1024,
705            created_at: now,
706            last_accessed: now,
707            access_count: 0,
708            ttl_seconds: Some(3600),
709        };
710
711        assert_eq!(entry.key, "test_key");
712        assert_eq!(entry.size_bytes, 1024);
713        assert_eq!(entry.created_at, now);
714        assert_eq!(entry.last_accessed, now);
715        assert_eq!(entry.access_count, 0);
716        assert_eq!(entry.ttl_seconds, Some(3600));
717    }
718
719    #[test]
720    fn test_cache_key_generation() {
721        let config = CacheConfig {
722            key_prefix: "test:cache".to_string(),
723            ..Default::default()
724        };
725
726        let cache = CacheLayer {
727            config,
728            client: Client::open("redis://127.0.0.1:6379").unwrap(),
729            connection: Arc::new(RwLock::new(None)),
730            stats: Arc::new(RwLock::new(CacheStats::default())),
731            active_entries: Arc::new(DashMap::new()),
732        };
733
734        assert_eq!(cache.make_cache_key("my_key"), "test:cache:my_key");
735        assert_eq!(cache.make_cache_key("another/key"), "test:cache:another/key");
736        assert_eq!(cache.make_cache_key(""), "test:cache:");
737    }
738
739    #[test]
740    fn test_cache_config_multiple_redis_urls() {
741        let config = CacheConfig {
742            redis_urls: vec![
743                "redis://127.0.0.1:6379".to_string(),
744                "redis://127.0.0.1:6380".to_string(),
745                "redis://127.0.0.1:6381".to_string(),
746            ],
747            ..Default::default()
748        };
749
750        assert_eq!(config.redis_urls.len(), 3);
751        assert_eq!(config.redis_urls[0], "redis://127.0.0.1:6379");
752        assert_eq!(config.redis_urls[1], "redis://127.0.0.1:6380");
753        assert_eq!(config.redis_urls[2], "redis://127.0.0.1:6381");
754    }
755
756    #[test]
757    fn test_cache_error_types() {
758        let conn_err = CacheError::ConnectionError("connection failed".to_string());
759        assert!(format!("{}", conn_err).contains("connection failed"));
760
761        let redis_err = CacheError::RedisError("redis error".to_string());
762        assert!(format!("{}", redis_err).contains("redis error"));
763
764        let ser_err = CacheError::SerializationError("serialization failed".to_string());
765        assert!(format!("{}", ser_err).contains("serialization failed"));
766
767        let comp_err = CacheError::CompressionError("compression failed".to_string());
768        assert!(format!("{}", comp_err).contains("compression failed"));
769
770        let config_err = CacheError::ConfigError("invalid config".to_string());
771        assert!(format!("{}", config_err).contains("invalid config"));
772    }
773
774    #[test]
775    fn test_json_serialization_roundtrip() {
776        let config = CacheConfig::default();
777        let json_str = serde_json::to_string(&config).unwrap();
778        let deserialized: CacheConfig = serde_json::from_str(&json_str).unwrap();
779        assert_eq!(config.redis_urls, deserialized.redis_urls);
780        assert_eq!(config.key_prefix, deserialized.key_prefix);
781    }
782
783    #[test]
784    fn test_cache_stats_serialization() {
785        let stats = CacheStats {
786            hits: 100,
787            misses: 50,
788            sets: 75,
789            deletes: 25,
790            evictions: 10,
791            hit_ratio: 0.667,
792            total_size_bytes: 1024 * 1024,
793            entries_count: 100,
794        };
795
796        let json_str = serde_json::to_string(&stats).unwrap();
797        let deserialized: CacheStats = serde_json::from_str(&json_str).unwrap();
798
799        assert_eq!(stats.hits, deserialized.hits);
800        assert_eq!(stats.misses, deserialized.misses);
801        assert_eq!(stats.sets, deserialized.sets);
802        assert_eq!(stats.deletes, deserialized.deletes);
803        assert_eq!(stats.evictions, deserialized.evictions);
804        assert!((stats.hit_ratio - deserialized.hit_ratio).abs() < 0.001);
805        assert_eq!(stats.total_size_bytes, deserialized.total_size_bytes);
806        assert_eq!(stats.entries_count, deserialized.entries_count);
807    }
808
809    #[tokio::test]
810    async fn test_cache_layer_creation_mock() {
811        // Test with invalid Redis URL to force mock mode
812        let config = CacheConfig {
813            redis_urls: vec!["redis://invalid.host:9999".to_string()],
814            key_prefix: "test:cache".to_string(),
815            ..Default::default()
816        };
817
818        // This should succeed even with invalid Redis URL (mock mode)
819        let cache_result = CacheLayer::new(config).await;
820        assert!(cache_result.is_ok(), "Cache layer should create successfully in mock mode");
821
822        let cache = cache_result.unwrap();
823        assert_eq!(cache.config.key_prefix, "test:cache");
824    }
825
    #[tokio::test]
    async fn test_cache_mock_operations() {
        // Every public operation must succeed (with empty/placeholder
        // results) when the layer is running in mock mode, i.e. without
        // a reachable Redis.
        let config = CacheConfig {
            redis_urls: vec!["redis://invalid.host:9999".to_string()],
            key_prefix: "test:mock".to_string(),
            enable_metrics: true,
            ..Default::default()
        };

        let cache = CacheLayer::new(config).await.unwrap();

        // Test mock operations (should work without Redis)
        let test_value = serde_json::json!({"message": "mock test", "number": 123});

        // Set operation
        let set_result = cache.set("mock_key", test_value.clone(), Some(60)).await;
        assert!(set_result.is_ok(), "Mock set should succeed");

        // Get operation (should return None in mock mode since we can't actually store)
        let get_result = cache.get("mock_key").await;
        assert!(get_result.is_ok(), "Mock get should succeed");
        assert_eq!(get_result.unwrap(), None, "Mock get should return None");

        // Exists operation
        let exists_result = cache.exists("mock_key").await;
        assert!(exists_result.is_ok(), "Mock exists should succeed");
        assert!(!exists_result.unwrap(), "Mock exists should return false");

        // Delete operation (mock mode always reports a successful delete)
        let delete_result = cache.delete("mock_key").await;
        assert!(delete_result.is_ok(), "Mock delete should succeed");
        assert!(delete_result.unwrap(), "Mock delete should return true");

        // TTL operation
        let ttl_result = cache.ttl("mock_key").await;
        assert!(ttl_result.is_ok(), "Mock TTL should succeed");
        assert_eq!(ttl_result.unwrap(), None, "Mock TTL should return None");

        // Increment operation (mock mode simulates incrementing from 0)
        let incr_result = cache.increment("mock_key", 5).await;
        assert!(incr_result.is_ok(), "Mock increment should succeed");
        assert_eq!(incr_result.unwrap(), 5, "Mock increment should return amount");

        // Clear operation
        let clear_result = cache.clear().await;
        assert!(clear_result.is_ok(), "Mock clear should succeed");

        // Info operation (mock mode returns a placeholder info map)
        let info_result = cache.get_info().await;
        assert!(info_result.is_ok(), "Mock info should succeed");
        let info = info_result.unwrap();
        assert_eq!(info.get("status"), Some(&"mock".to_string()));
    }
879
880    #[tokio::test]
881    async fn test_cache_mock_statistics() {
882        let config = CacheConfig {
883            redis_urls: vec!["redis://invalid.host:9999".to_string()],
884            key_prefix: "test:stats".to_string(),
885            enable_metrics: true,
886            ..Default::default()
887        };
888
889        let cache = CacheLayer::new(config).await.unwrap();
890
891        // Get initial stats
892        let initial_stats = cache.get_statistics().await;
893        assert_eq!(initial_stats.hits, 0);
894        assert_eq!(initial_stats.misses, 0);
895        assert_eq!(initial_stats.sets, 0);
896
897        // Perform mock operations that should update stats
898        let test_value = serde_json::json!({"test": true});
899
900        // Set operation (should record in stats)
901        cache.set("stats_test", test_value.clone(), None).await.unwrap();
902
903        // Get operation (should be miss in mock mode)
904        cache.get("stats_test").await.unwrap();
905
906        // Get non-existent (should be miss)
907        cache.get("non_existent").await.unwrap();
908
909        // Delete operation (should record in stats)
910        cache.delete("stats_test").await.unwrap();
911
912        // Check updated stats
913        let updated_stats = cache.get_statistics().await;
914        assert_eq!(updated_stats.sets, 1);
915        assert_eq!(updated_stats.misses, 2); // get on existing and non-existent
916        assert_eq!(updated_stats.deletes, 1);
917    }
918
919    #[tokio::test]
920    async fn test_cache_large_value_compression() {
921        let config = CacheConfig {
922            redis_urls: vec!["redis://invalid.host:9999".to_string()],
923            key_prefix: "test:compress".to_string(),
924            enable_compression: true,
925            compression_threshold_bytes: 100, // Low threshold for testing
926            ..Default::default()
927        };
928
929        let cache = CacheLayer::new(config).await.unwrap();
930
931        // Create a large value that should be compressed
932        let large_string = "x".repeat(200); // 200 characters
933        let large_value = serde_json::json!({"data": large_string});
934
935        // Set operation (should work in mock mode)
936        let set_result = cache.set("large_key", large_value, None).await;
937        assert!(set_result.is_ok(), "Setting large value should succeed");
938    }
939
940    #[tokio::test]
941    async fn test_cache_concurrent_operations() {
942        let config = CacheConfig {
943            redis_urls: vec!["redis://invalid.host:9999".to_string()],
944            key_prefix: "test:concurrent".to_string(),
945            ..Default::default()
946        };
947
948        let cache = Arc::new(CacheLayer::new(config).await.unwrap());
949        let mut handles = vec![];
950
951        // Spawn multiple concurrent operations
952        for i in 0..10 {
953            let cache_clone = Arc::clone(&cache);
954            let handle = tokio::spawn(async move {
955                let key = format!("concurrent_key_{}", i);
956                let value = serde_json::json!({"index": i, "thread": "test"});
957
958                // Perform operations
959                let set_result = cache_clone.set(&key, value, Some(300)).await;
960                assert!(set_result.is_ok());
961
962                let get_result = cache_clone.get(&key).await;
963                assert!(get_result.is_ok());
964
965                let exists_result = cache_clone.exists(&key).await;
966                assert!(exists_result.is_ok());
967
968                let delete_result = cache_clone.delete(&key).await;
969                assert!(delete_result.is_ok());
970            });
971            handles.push(handle);
972        }
973
974        // Wait for all operations to complete
975        for handle in handles {
976            handle.await.unwrap();
977        }
978    }
979
980    #[tokio::test]
981    async fn test_cache_ttl_operations() {
982        let config = CacheConfig {
983            redis_urls: vec!["redis://invalid.host:9999".to_string()],
984            key_prefix: "test:ttl".to_string(),
985            default_ttl_seconds: 120,
986            ..Default::default()
987        };
988
989        let cache = CacheLayer::new(config).await.unwrap();
990
991        // Test setting with TTL
992        let test_value = serde_json::json!({"ttl_test": true});
993        let set_result = cache.set("ttl_key", test_value, Some(60)).await;
994        assert!(set_result.is_ok());
995
996        // Test TTL retrieval (should return None in mock mode)
997        let ttl_result = cache.ttl("ttl_key").await;
998        assert!(ttl_result.is_ok());
999        assert_eq!(ttl_result.unwrap(), None);
1000    }
1001
1002    #[tokio::test]
1003    async fn test_cache_increment_operations() {
1004        let config = CacheConfig {
1005            redis_urls: vec!["redis://invalid.host:9999".to_string()],
1006            key_prefix: "test:incr".to_string(),
1007            ..Default::default()
1008        };
1009
1010        let cache = CacheLayer::new(config).await.unwrap();
1011
1012        // Test increment operations
1013        let incr1 = cache.increment("counter", 5).await;
1014        assert!(incr1.is_ok());
1015        assert_eq!(incr1.unwrap(), 5);
1016
1017        let incr2 = cache.increment("counter", 3).await;
1018        assert!(incr2.is_ok());
1019        assert_eq!(incr2.unwrap(), 3);
1020
1021        let incr3 = cache.increment("new_counter", -2).await;
1022        assert!(incr3.is_ok());
1023        assert_eq!(incr3.unwrap(), -2);
1024    }
1025
1026    #[tokio::test]
1027    async fn test_cache_info_mock() {
1028        let config = CacheConfig {
1029            redis_urls: vec!["redis://invalid.host:9999".to_string()],
1030            key_prefix: "test:info".to_string(),
1031            ..Default::default()
1032        };
1033
1034        let cache = CacheLayer::new(config).await.unwrap();
1035
1036        let info_result = cache.get_info().await;
1037        assert!(info_result.is_ok());
1038
1039        let info = info_result.unwrap();
1040        assert_eq!(info.get("status"), Some(&"mock".to_string()));
1041        assert_eq!(info.get("version"), Some(&"0.0.0".to_string()));
1042    }
1043
1044    #[test]
1045    fn test_cache_config_validation() {
1046        // Test valid config
1047        let valid_config = CacheConfig {
1048            redis_urls: vec!["redis://localhost:6379".to_string()],
1049            connection_timeout_seconds: 30,
1050            default_ttl_seconds: 3600,
1051            max_size_bytes: 1024 * 1024 * 1024,
1052            enable_compression: true,
1053            compression_threshold_bytes: 1024,
1054            enable_metrics: true,
1055            key_prefix: "valid:prefix".to_string(),
1056        };
1057
1058        assert!(!valid_config.redis_urls.is_empty());
1059        assert!(valid_config.connection_timeout_seconds > 0);
1060        assert!(valid_config.default_ttl_seconds > 0);
1061        assert!(valid_config.max_size_bytes > 0);
1062        assert!(valid_config.compression_threshold_bytes > 0);
1063        assert!(!valid_config.key_prefix.is_empty());
1064    }
1065
1066    #[test]
1067    fn test_cache_config_edge_cases() {
1068        // Test config with empty values
1069        let empty_config = CacheConfig {
1070            redis_urls: vec![],
1071            key_prefix: "".to_string(),
1072            ..Default::default()
1073        };
1074
1075        assert!(empty_config.redis_urls.is_empty());
1076        assert!(empty_config.key_prefix.is_empty());
1077
1078        // Test config with extreme values
1079        let extreme_config = CacheConfig {
1080            redis_urls: vec!["redis://test".to_string()],
1081            connection_timeout_seconds: u64::MAX,
1082            default_ttl_seconds: u64::MAX,
1083            max_size_bytes: u64::MAX,
1084            compression_threshold_bytes: usize::MAX,
1085            key_prefix: "a".repeat(1000), // Very long prefix
1086            ..Default::default()
1087        };
1088
1089        assert_eq!(extreme_config.connection_timeout_seconds, u64::MAX);
1090        assert_eq!(extreme_config.max_size_bytes, u64::MAX);
1091        assert_eq!(extreme_config.compression_threshold_bytes, usize::MAX);
1092        assert_eq!(extreme_config.key_prefix.len(), 1000);
1093    }
1094}