1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::RwLock;
10use redis::{Client, Connection, AsyncCommands, RedisResult};
11use dashmap::DashMap;
12use serde::{Deserialize, Serialize};
13use tracing::{info, warn, error, instrument};
14use metrics::{counter, histogram};
15use chrono::{DateTime, Utc};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct CacheConfig {
20 pub redis_urls: Vec<String>,
22 pub connection_timeout_seconds: u64,
24 pub default_ttl_seconds: u64,
26 pub max_size_bytes: u64,
28 pub enable_compression: bool,
30 pub compression_threshold_bytes: usize,
32 pub enable_metrics: bool,
34 pub key_prefix: String,
36}
37
38impl Default for CacheConfig {
39 fn default() -> Self {
40 Self {
41 redis_urls: vec!["redis://127.0.0.1:6379".to_string()],
42 connection_timeout_seconds: 30,
43 default_ttl_seconds: 3600, max_size_bytes: 1024 * 1024 * 1024, enable_compression: true,
46 compression_threshold_bytes: 1024, enable_metrics: true,
48 key_prefix: "kotoba:cache".to_string(),
49 }
50 }
51}
52
53pub struct CacheLayer {
55 config: CacheConfig,
57 client: Client,
59 connection: Arc<RwLock<Option<redis::aio::Connection>>>,
61 stats: Arc<RwLock<CacheStats>>,
63 active_entries: Arc<DashMap<String, CacheEntry>>,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct CacheEntry {
70 pub key: String,
72 pub size_bytes: usize,
74 pub created_at: DateTime<Utc>,
76 pub last_accessed: DateTime<Utc>,
78 pub access_count: u64,
80 pub ttl_seconds: Option<u64>,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct CacheStats {
87 pub hits: u64,
88 pub misses: u64,
89 pub sets: u64,
90 pub deletes: u64,
91 pub evictions: u64,
92 pub hit_ratio: f64,
93 pub total_size_bytes: u64,
94 pub entries_count: u64,
95}
96
97impl CacheLayer {
98 pub async fn new(config: CacheConfig) -> Result<Self, CacheError> {
100 info!("Initializing Redis cache layer with config: {:?}", config);
101
102 let client = Client::open(config.redis_urls.first().unwrap_or(&"redis://127.0.0.1:6379".to_string()).clone())
104 .map_err(|e| CacheError::ConnectionError(e.to_string()))?;
105
106 let connection = match client.get_async_connection().await {
108 Ok(conn) => Some(conn),
109 Err(e) => {
110 warn!("Failed to establish Redis connection: {}. Using mock cache.", e);
111 None }
113 };
114
115 if connection.is_some() {
116 info!("Redis connection established successfully");
117 }
118
119 let cache = Self {
120 config,
121 client,
122 connection: Arc::new(RwLock::new(connection)),
123 stats: Arc::new(RwLock::new(CacheStats::default())),
124 active_entries: Arc::new(DashMap::new()),
125 };
126
127 Ok(cache)
128 }
129
130 #[instrument(skip(self))]
132 pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>, CacheError> {
133 let conn_opt = self.connection.read().await;
135 if conn_opt.is_none() {
136 if self.config.enable_metrics {
138 self.record_miss().await;
139 }
140 return Ok(None);
141 }
142 drop(conn_opt);
143
144 let cache_key = self.make_cache_key(key);
145 let mut connection = self.get_connection().await?;
146
147 let result = connection.get::<_, Option<Vec<u8>>>(&cache_key).await;
148 self.return_connection(connection);
149
150 match result {
151 Ok(Some(data)) => {
152 let decompressed_data = if self.config.enable_compression {
154 self.decompress_data(&data)?
155 } else {
156 data
157 };
158
159 let value: serde_json::Value = serde_json::from_slice(&decompressed_data)
161 .map_err(|e| CacheError::SerializationError(e.to_string()))?;
162
163 if self.config.enable_metrics {
165 self.record_hit().await;
166 }
167
168 self.update_access_metadata(key).await?;
170
171 Ok(Some(value))
172 }
173 Ok(None) => {
174 if self.config.enable_metrics {
175 self.record_miss().await;
176 }
177 Ok(None)
178 }
179 Err(e) => {
180 error!("Cache get error for key {}: {}", cache_key, e);
181 if self.config.enable_metrics {
182 self.record_miss().await;
183 }
184 Err(CacheError::RedisError(e.to_string()))
185 }
186 }
187 }
188
189 #[instrument(skip(self, value))]
191 pub async fn set(
192 &self,
193 key: &str,
194 value: serde_json::Value,
195 ttl_seconds: Option<u64>,
196 ) -> Result<(), CacheError> {
197 let conn_opt = self.connection.read().await;
199 if conn_opt.is_none() {
200 if self.config.enable_metrics {
202 self.record_set().await;
203 }
204 return Ok(());
205 }
206 drop(conn_opt);
207
208 let cache_key = self.make_cache_key(key);
209 let mut connection = self.get_connection().await?;
210
211 let serialized_data = serde_json::to_vec(&value)
213 .map_err(|e| CacheError::SerializationError(e.to_string()))?;
214
215 let final_data = if self.config.enable_compression && serialized_data.len() > self.config.compression_threshold_bytes {
217 self.compress_data(&serialized_data)?
218 } else {
219 serialized_data.clone()
220 };
221
222 let ttl = ttl_seconds.unwrap_or(self.config.default_ttl_seconds);
224
225 let result = if ttl > 0 {
226 connection.set_ex(&cache_key, final_data, ttl as u64).await
227 } else {
228 connection.set(&cache_key, final_data).await
229 };
230
231 self.return_connection(connection);
232
233 match result {
234 Ok(()) => {
235 if self.config.enable_metrics {
237 self.record_set().await;
238 }
239
240 let entry = CacheEntry {
242 key: key.to_string(),
243 size_bytes: serialized_data.len(),
244 created_at: Utc::now(),
245 last_accessed: Utc::now(),
246 access_count: 0,
247 ttl_seconds: Some(ttl),
248 };
249 self.active_entries.insert(key.to_string(), entry);
250
251 self.enforce_size_limit().await?;
253
254 Ok(())
255 }
256 Err(e) => {
257 error!("Cache set error for key {}: {}", cache_key, e);
258 Err(CacheError::RedisError(e.to_string()))
259 }
260 }
261 }
262
263 #[instrument(skip(self))]
265 pub async fn delete(&self, key: &str) -> Result<bool, CacheError> {
266 let conn_opt = self.connection.read().await;
268 if conn_opt.is_none() {
269 if self.config.enable_metrics {
271 self.record_delete().await;
272 }
273 self.active_entries.remove(key);
274 return Ok(true);
275 }
276 drop(conn_opt);
277
278 let cache_key = self.make_cache_key(key);
279 let mut connection = self.get_connection().await?;
280
281 let result = connection.del::<_, i64>(&cache_key).await;
282 self.return_connection(connection);
283
284 match result {
285 Ok(count) => {
286 let deleted = count > 0;
287 if deleted {
288 if self.config.enable_metrics {
289 self.record_delete().await;
290 }
291 self.active_entries.remove(key);
292 }
293 Ok(deleted)
294 }
295 Err(e) => {
296 error!("Cache delete error for key {}: {}", cache_key, e);
297 Err(CacheError::RedisError(e.to_string()))
298 }
299 }
300 }
301
302 #[instrument(skip(self))]
304 pub async fn exists(&self, key: &str) -> Result<bool, CacheError> {
305 let conn_opt = self.connection.read().await;
307 if conn_opt.is_none() {
308 return Ok(false);
310 }
311 drop(conn_opt);
312
313 let cache_key = self.make_cache_key(key);
314 let mut connection = self.get_connection().await?;
315
316 let result = connection.exists::<_, bool>(&cache_key).await;
317 self.return_connection(connection);
318
319 match result {
320 Ok(exists) => Ok(exists),
321 Err(e) => {
322 error!("Cache exists error for key {}: {}", cache_key, e);
323 Err(CacheError::RedisError(e.to_string()))
324 }
325 }
326 }
327
328 #[instrument(skip(self))]
330 pub async fn ttl(&self, key: &str) -> Result<Option<i64>, CacheError> {
331 let conn_opt = self.connection.read().await;
333 if conn_opt.is_none() {
334 return Ok(None);
335 }
336 drop(conn_opt);
337
338 let cache_key = self.make_cache_key(key);
339 let mut connection = self.get_connection().await?;
340
341 let result = connection.ttl::<_, i64>(&cache_key).await;
342 self.return_connection(connection);
343
344 match result {
345 Ok(ttl) => {
346 if ttl < 0 {
347 Ok(None)
348 } else {
349 Ok(Some(ttl))
350 }
351 }
352 Err(e) => {
353 error!("Cache TTL error for key {}: {}", cache_key, e);
354 Err(CacheError::RedisError(e.to_string()))
355 }
356 }
357 }
358
359 #[instrument(skip(self))]
361 pub async fn increment(&self, key: &str, amount: i64) -> Result<i64, CacheError> {
362 let conn_opt = self.connection.read().await;
364 if conn_opt.is_none() {
365 return Ok(amount);
367 }
368 drop(conn_opt);
369
370 let cache_key = self.make_cache_key(key);
371 let mut connection = self.get_connection().await?;
372
373 let result = connection.incr::<_, _, i64>(&cache_key, amount).await;
374 self.return_connection(connection);
375
376 match result {
377 Ok(value) => Ok(value),
378 Err(e) => {
379 error!("Cache increment error for key {}: {}", cache_key, e);
380 Err(CacheError::RedisError(e.to_string()))
381 }
382 }
383 }
384
385 #[instrument(skip(self))]
387 pub async fn clear(&self) -> Result<(), CacheError> {
388 let conn_opt = self.connection.read().await;
390 let keys_deleted = if conn_opt.is_some() {
391 drop(conn_opt);
392
393 let pattern = format!("{}:*", self.config.key_prefix);
394 let mut connection = self.get_connection().await?;
395
396 let keys: Vec<String> = connection.keys::<_, Vec<String>>(&pattern).await
398 .map_err(|e| CacheError::RedisError(e.to_string()))?;
399
400 let keys_count = keys.len();
401
402 if !keys.is_empty() {
403 let _: () = connection.del::<_, ()>(&keys).await
405 .map_err(|e| CacheError::RedisError(e.to_string()))?;
406 }
407
408 self.return_connection(connection);
409 keys_count
410 } else {
411 0
412 };
413
414 self.active_entries.clear();
416
417 if self.config.enable_metrics {
418 let mut stats = self.stats.write().await;
419 stats.deletes += keys_deleted as u64;
420 }
421
422 info!("Cache cleared, {} keys deleted", keys_deleted);
423 Ok(())
424 }
425
426 pub async fn get_statistics(&self) -> CacheStats {
428 self.stats.read().await.clone()
429 }
430
431 #[instrument(skip(self))]
433 pub async fn get_info(&self) -> Result<HashMap<String, String>, CacheError> {
434 let conn_opt = self.connection.read().await;
436 if conn_opt.is_none() {
437 let mut info_map = HashMap::new();
439 info_map.insert("status".to_string(), "mock".to_string());
440 info_map.insert("version".to_string(), "0.0.0".to_string());
441 return Ok(info_map);
442 }
443 drop(conn_opt);
444
445 let mut connection = self.get_connection().await?;
446
447 let info_result: Result<String, _> = redis::cmd("INFO").query_async(&mut connection).await;
448 self.return_connection(connection);
449
450 match info_result {
451 Ok(info) => {
452 let mut info_map = HashMap::new();
454 for line in info.lines() {
455 if let Some((key, value)) = line.split_once(':') {
456 info_map.insert(key.to_string(), value.to_string());
457 }
458 }
459 Ok(info_map)
460 }
461 Err(e) => {
462 error!("Cache INFO error: {}", e);
463 Err(CacheError::RedisError(e.to_string()))
464 }
465 }
466 }
467
468 async fn get_connection(&self) -> Result<redis::aio::Connection, CacheError> {
470 let mut conn_opt = self.connection.write().await;
471 match conn_opt.take() {
472 Some(conn) => Ok(conn),
473 None => {
474 self.client.get_async_connection().await
476 .map_err(|e| CacheError::ConnectionError(e.to_string()))
477 }
478 }
479 }
480
481 async fn return_connection(&self, connection: redis::aio::Connection) {
483 let mut conn_opt = self.connection.write().await;
484 *conn_opt = Some(connection);
485 }
486
487 fn make_cache_key(&self, key: &str) -> String {
489 format!("{}:{}", self.config.key_prefix, key)
490 }
491
492 fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>, CacheError> {
494 lz4::block::compress(data, None, true)
495 .map_err(|e| CacheError::CompressionError(e.to_string()))
496 }
497
498 fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>, CacheError> {
500 lz4::block::decompress(data, None)
501 .map_err(|e| CacheError::CompressionError(e.to_string()))
502 }
503
504 async fn update_access_metadata(&self, key: &str) -> Result<(), CacheError> {
506 if let Some(mut entry) = self.active_entries.get_mut(key) {
507 entry.last_accessed = Utc::now();
508 entry.access_count += 1;
509 }
510 Ok(())
511 }
512
513 async fn enforce_size_limit(&self) -> Result<(), CacheError> {
515 let mut total_size = 0u64;
516 let mut entries: Vec<_> = self.active_entries.iter().collect();
517
518 entries.sort_by(|a, b| a.last_accessed.cmp(&b.last_accessed));
520
521 for entry in entries {
522 total_size += entry.size_bytes as u64;
523
524 if total_size > self.config.max_size_bytes {
525 let key = entry.key.clone();
527 if let Err(e) = self.delete(&key).await {
528 warn!("Failed to evict cache entry {}: {}", key, e);
529 } else {
530 if self.config.enable_metrics {
531 self.record_eviction().await;
532 }
533 }
534 }
535 }
536
537 Ok(())
538 }
539
540 async fn record_hit(&self) {
542 let mut stats = self.stats.write().await;
544 stats.hits += 1;
545 self.update_hit_ratio(&mut stats);
546 }
547
548 async fn record_miss(&self) {
550 let mut stats = self.stats.write().await;
551 stats.misses += 1;
552 self.update_hit_ratio(&mut stats);
553 }
554
555 async fn record_set(&self) {
557 let mut stats = self.stats.write().await;
558 stats.sets += 1;
559 }
560
561 async fn record_delete(&self) {
563 let mut stats = self.stats.write().await;
564 stats.deletes += 1;
565 }
566
567 async fn record_eviction(&self) {
569 let mut stats = self.stats.write().await;
570 stats.evictions += 1;
571 }
572
573 fn update_hit_ratio(&self, stats: &mut CacheStats) {
575 let total = stats.hits + stats.misses;
576 if total > 0 {
577 stats.hit_ratio = stats.hits as f64 / total as f64;
578 }
579 }
580}
581
582#[derive(thiserror::Error, Debug)]
584pub enum CacheError {
585 #[error("Redis connection error: {0}")]
586 ConnectionError(String),
587
588 #[error("Redis operation error: {0}")]
589 RedisError(String),
590
591 #[error("Serialization error: {0}")]
592 SerializationError(String),
593
594 #[error("Compression error: {0}")]
595 CompressionError(String),
596
597 #[error("Invalid configuration: {0}")]
598 ConfigError(String),
599}
600
601impl Default for CacheStats {
602 fn default() -> Self {
603 Self {
604 hits: 0,
605 misses: 0,
606 sets: 0,
607 deletes: 0,
608 evictions: 0,
609 hit_ratio: 0.0,
610 total_size_bytes: 0,
611 entries_count: 0,
612 }
613 }
614}
615
616#[cfg(test)]
617mod tests {
618 use super::*;
619 use std::time::Duration;
620
621 #[test]
622 fn test_cache_config_default() {
623 let config = CacheConfig::default();
624
625 assert_eq!(config.redis_urls, vec!["redis://127.0.0.1:6379".to_string()]);
626 assert_eq!(config.connection_timeout_seconds, 30);
627 assert_eq!(config.default_ttl_seconds, 3600);
628 assert_eq!(config.max_size_bytes, 1024 * 1024 * 1024);
629 assert!(config.enable_compression);
630 assert_eq!(config.compression_threshold_bytes, 1024);
631 assert!(config.enable_metrics);
632 assert_eq!(config.key_prefix, "kotoba:cache");
633 }
634
635 #[test]
636 fn test_cache_config_custom() {
637 let config = CacheConfig {
638 redis_urls: vec!["redis://localhost:6380".to_string()],
639 connection_timeout_seconds: 60,
640 default_ttl_seconds: 1800,
641 max_size_bytes: 512 * 1024 * 1024,
642 enable_compression: false,
643 compression_threshold_bytes: 2048,
644 enable_metrics: false,
645 key_prefix: "test:cache".to_string(),
646 };
647
648 assert_eq!(config.redis_urls, vec!["redis://localhost:6380".to_string()]);
649 assert_eq!(config.connection_timeout_seconds, 60);
650 assert_eq!(config.default_ttl_seconds, 1800);
651 assert_eq!(config.max_size_bytes, 512 * 1024 * 1024);
652 assert!(!config.enable_compression);
653 assert_eq!(config.compression_threshold_bytes, 2048);
654 assert!(!config.enable_metrics);
655 assert_eq!(config.key_prefix, "test:cache");
656 }
657
658 #[test]
659 fn test_cache_stats_default() {
660 let stats = CacheStats::default();
661
662 assert_eq!(stats.hits, 0);
663 assert_eq!(stats.misses, 0);
664 assert_eq!(stats.sets, 0);
665 assert_eq!(stats.deletes, 0);
666 assert_eq!(stats.evictions, 0);
667 assert_eq!(stats.hit_ratio, 0.0);
668 assert_eq!(stats.total_size_bytes, 0);
669 assert_eq!(stats.entries_count, 0);
670 }
671
672 #[test]
673 fn test_cache_stats_hit_ratio_calculation() {
674 let mut stats = CacheStats::default();
675
676 CacheStats::update_hit_ratio(&stats, &mut stats);
678 assert_eq!(stats.hit_ratio, 0.0);
679
680 stats.hits = 3;
682 stats.misses = 2;
683 CacheStats::update_hit_ratio(&stats, &mut stats);
684 assert_eq!(stats.hit_ratio, 0.6);
685
686 stats.hits = 5;
688 stats.misses = 0;
689 CacheStats::update_hit_ratio(&stats, &mut stats);
690 assert_eq!(stats.hit_ratio, 1.0);
691
692 stats.hits = 0;
694 stats.misses = 3;
695 CacheStats::update_hit_ratio(&stats, &mut stats);
696 assert_eq!(stats.hit_ratio, 0.0);
697 }
698
699 #[test]
700 fn test_cache_entry_creation() {
701 let now = Utc::now();
702 let entry = CacheEntry {
703 key: "test_key".to_string(),
704 size_bytes: 1024,
705 created_at: now,
706 last_accessed: now,
707 access_count: 0,
708 ttl_seconds: Some(3600),
709 };
710
711 assert_eq!(entry.key, "test_key");
712 assert_eq!(entry.size_bytes, 1024);
713 assert_eq!(entry.created_at, now);
714 assert_eq!(entry.last_accessed, now);
715 assert_eq!(entry.access_count, 0);
716 assert_eq!(entry.ttl_seconds, Some(3600));
717 }
718
719 #[test]
720 fn test_cache_key_generation() {
721 let config = CacheConfig {
722 key_prefix: "test:cache".to_string(),
723 ..Default::default()
724 };
725
726 let cache = CacheLayer {
727 config,
728 client: Client::open("redis://127.0.0.1:6379").unwrap(),
729 connection: Arc::new(RwLock::new(None)),
730 stats: Arc::new(RwLock::new(CacheStats::default())),
731 active_entries: Arc::new(DashMap::new()),
732 };
733
734 assert_eq!(cache.make_cache_key("my_key"), "test:cache:my_key");
735 assert_eq!(cache.make_cache_key("another/key"), "test:cache:another/key");
736 assert_eq!(cache.make_cache_key(""), "test:cache:");
737 }
738
739 #[test]
740 fn test_cache_config_multiple_redis_urls() {
741 let config = CacheConfig {
742 redis_urls: vec![
743 "redis://127.0.0.1:6379".to_string(),
744 "redis://127.0.0.1:6380".to_string(),
745 "redis://127.0.0.1:6381".to_string(),
746 ],
747 ..Default::default()
748 };
749
750 assert_eq!(config.redis_urls.len(), 3);
751 assert_eq!(config.redis_urls[0], "redis://127.0.0.1:6379");
752 assert_eq!(config.redis_urls[1], "redis://127.0.0.1:6380");
753 assert_eq!(config.redis_urls[2], "redis://127.0.0.1:6381");
754 }
755
756 #[test]
757 fn test_cache_error_types() {
758 let conn_err = CacheError::ConnectionError("connection failed".to_string());
759 assert!(format!("{}", conn_err).contains("connection failed"));
760
761 let redis_err = CacheError::RedisError("redis error".to_string());
762 assert!(format!("{}", redis_err).contains("redis error"));
763
764 let ser_err = CacheError::SerializationError("serialization failed".to_string());
765 assert!(format!("{}", ser_err).contains("serialization failed"));
766
767 let comp_err = CacheError::CompressionError("compression failed".to_string());
768 assert!(format!("{}", comp_err).contains("compression failed"));
769
770 let config_err = CacheError::ConfigError("invalid config".to_string());
771 assert!(format!("{}", config_err).contains("invalid config"));
772 }
773
774 #[test]
775 fn test_json_serialization_roundtrip() {
776 let config = CacheConfig::default();
777 let json_str = serde_json::to_string(&config).unwrap();
778 let deserialized: CacheConfig = serde_json::from_str(&json_str).unwrap();
779 assert_eq!(config.redis_urls, deserialized.redis_urls);
780 assert_eq!(config.key_prefix, deserialized.key_prefix);
781 }
782
783 #[test]
784 fn test_cache_stats_serialization() {
785 let stats = CacheStats {
786 hits: 100,
787 misses: 50,
788 sets: 75,
789 deletes: 25,
790 evictions: 10,
791 hit_ratio: 0.667,
792 total_size_bytes: 1024 * 1024,
793 entries_count: 100,
794 };
795
796 let json_str = serde_json::to_string(&stats).unwrap();
797 let deserialized: CacheStats = serde_json::from_str(&json_str).unwrap();
798
799 assert_eq!(stats.hits, deserialized.hits);
800 assert_eq!(stats.misses, deserialized.misses);
801 assert_eq!(stats.sets, deserialized.sets);
802 assert_eq!(stats.deletes, deserialized.deletes);
803 assert_eq!(stats.evictions, deserialized.evictions);
804 assert!((stats.hit_ratio - deserialized.hit_ratio).abs() < 0.001);
805 assert_eq!(stats.total_size_bytes, deserialized.total_size_bytes);
806 assert_eq!(stats.entries_count, deserialized.entries_count);
807 }
808
809 #[tokio::test]
810 async fn test_cache_layer_creation_mock() {
811 let config = CacheConfig {
813 redis_urls: vec!["redis://invalid.host:9999".to_string()],
814 key_prefix: "test:cache".to_string(),
815 ..Default::default()
816 };
817
818 let cache_result = CacheLayer::new(config).await;
820 assert!(cache_result.is_ok(), "Cache layer should create successfully in mock mode");
821
822 let cache = cache_result.unwrap();
823 assert_eq!(cache.config.key_prefix, "test:cache");
824 }
825
826 #[tokio::test]
827 async fn test_cache_mock_operations() {
828 let config = CacheConfig {
829 redis_urls: vec!["redis://invalid.host:9999".to_string()],
830 key_prefix: "test:mock".to_string(),
831 enable_metrics: true,
832 ..Default::default()
833 };
834
835 let cache = CacheLayer::new(config).await.unwrap();
836
837 let test_value = serde_json::json!({"message": "mock test", "number": 123});
839
840 let set_result = cache.set("mock_key", test_value.clone(), Some(60)).await;
842 assert!(set_result.is_ok(), "Mock set should succeed");
843
844 let get_result = cache.get("mock_key").await;
846 assert!(get_result.is_ok(), "Mock get should succeed");
847 assert_eq!(get_result.unwrap(), None, "Mock get should return None");
848
849 let exists_result = cache.exists("mock_key").await;
851 assert!(exists_result.is_ok(), "Mock exists should succeed");
852 assert!(!exists_result.unwrap(), "Mock exists should return false");
853
854 let delete_result = cache.delete("mock_key").await;
856 assert!(delete_result.is_ok(), "Mock delete should succeed");
857 assert!(delete_result.unwrap(), "Mock delete should return true");
858
859 let ttl_result = cache.ttl("mock_key").await;
861 assert!(ttl_result.is_ok(), "Mock TTL should succeed");
862 assert_eq!(ttl_result.unwrap(), None, "Mock TTL should return None");
863
864 let incr_result = cache.increment("mock_key", 5).await;
866 assert!(incr_result.is_ok(), "Mock increment should succeed");
867 assert_eq!(incr_result.unwrap(), 5, "Mock increment should return amount");
868
869 let clear_result = cache.clear().await;
871 assert!(clear_result.is_ok(), "Mock clear should succeed");
872
873 let info_result = cache.get_info().await;
875 assert!(info_result.is_ok(), "Mock info should succeed");
876 let info = info_result.unwrap();
877 assert_eq!(info.get("status"), Some(&"mock".to_string()));
878 }
879
880 #[tokio::test]
881 async fn test_cache_mock_statistics() {
882 let config = CacheConfig {
883 redis_urls: vec!["redis://invalid.host:9999".to_string()],
884 key_prefix: "test:stats".to_string(),
885 enable_metrics: true,
886 ..Default::default()
887 };
888
889 let cache = CacheLayer::new(config).await.unwrap();
890
891 let initial_stats = cache.get_statistics().await;
893 assert_eq!(initial_stats.hits, 0);
894 assert_eq!(initial_stats.misses, 0);
895 assert_eq!(initial_stats.sets, 0);
896
897 let test_value = serde_json::json!({"test": true});
899
900 cache.set("stats_test", test_value.clone(), None).await.unwrap();
902
903 cache.get("stats_test").await.unwrap();
905
906 cache.get("non_existent").await.unwrap();
908
909 cache.delete("stats_test").await.unwrap();
911
912 let updated_stats = cache.get_statistics().await;
914 assert_eq!(updated_stats.sets, 1);
915 assert_eq!(updated_stats.misses, 2); assert_eq!(updated_stats.deletes, 1);
917 }
918
919 #[tokio::test]
920 async fn test_cache_large_value_compression() {
921 let config = CacheConfig {
922 redis_urls: vec!["redis://invalid.host:9999".to_string()],
923 key_prefix: "test:compress".to_string(),
924 enable_compression: true,
925 compression_threshold_bytes: 100, ..Default::default()
927 };
928
929 let cache = CacheLayer::new(config).await.unwrap();
930
931 let large_string = "x".repeat(200); let large_value = serde_json::json!({"data": large_string});
934
935 let set_result = cache.set("large_key", large_value, None).await;
937 assert!(set_result.is_ok(), "Setting large value should succeed");
938 }
939
940 #[tokio::test]
941 async fn test_cache_concurrent_operations() {
942 let config = CacheConfig {
943 redis_urls: vec!["redis://invalid.host:9999".to_string()],
944 key_prefix: "test:concurrent".to_string(),
945 ..Default::default()
946 };
947
948 let cache = Arc::new(CacheLayer::new(config).await.unwrap());
949 let mut handles = vec![];
950
951 for i in 0..10 {
953 let cache_clone = Arc::clone(&cache);
954 let handle = tokio::spawn(async move {
955 let key = format!("concurrent_key_{}", i);
956 let value = serde_json::json!({"index": i, "thread": "test"});
957
958 let set_result = cache_clone.set(&key, value, Some(300)).await;
960 assert!(set_result.is_ok());
961
962 let get_result = cache_clone.get(&key).await;
963 assert!(get_result.is_ok());
964
965 let exists_result = cache_clone.exists(&key).await;
966 assert!(exists_result.is_ok());
967
968 let delete_result = cache_clone.delete(&key).await;
969 assert!(delete_result.is_ok());
970 });
971 handles.push(handle);
972 }
973
974 for handle in handles {
976 handle.await.unwrap();
977 }
978 }
979
980 #[tokio::test]
981 async fn test_cache_ttl_operations() {
982 let config = CacheConfig {
983 redis_urls: vec!["redis://invalid.host:9999".to_string()],
984 key_prefix: "test:ttl".to_string(),
985 default_ttl_seconds: 120,
986 ..Default::default()
987 };
988
989 let cache = CacheLayer::new(config).await.unwrap();
990
991 let test_value = serde_json::json!({"ttl_test": true});
993 let set_result = cache.set("ttl_key", test_value, Some(60)).await;
994 assert!(set_result.is_ok());
995
996 let ttl_result = cache.ttl("ttl_key").await;
998 assert!(ttl_result.is_ok());
999 assert_eq!(ttl_result.unwrap(), None);
1000 }
1001
1002 #[tokio::test]
1003 async fn test_cache_increment_operations() {
1004 let config = CacheConfig {
1005 redis_urls: vec!["redis://invalid.host:9999".to_string()],
1006 key_prefix: "test:incr".to_string(),
1007 ..Default::default()
1008 };
1009
1010 let cache = CacheLayer::new(config).await.unwrap();
1011
1012 let incr1 = cache.increment("counter", 5).await;
1014 assert!(incr1.is_ok());
1015 assert_eq!(incr1.unwrap(), 5);
1016
1017 let incr2 = cache.increment("counter", 3).await;
1018 assert!(incr2.is_ok());
1019 assert_eq!(incr2.unwrap(), 3);
1020
1021 let incr3 = cache.increment("new_counter", -2).await;
1022 assert!(incr3.is_ok());
1023 assert_eq!(incr3.unwrap(), -2);
1024 }
1025
1026 #[tokio::test]
1027 async fn test_cache_info_mock() {
1028 let config = CacheConfig {
1029 redis_urls: vec!["redis://invalid.host:9999".to_string()],
1030 key_prefix: "test:info".to_string(),
1031 ..Default::default()
1032 };
1033
1034 let cache = CacheLayer::new(config).await.unwrap();
1035
1036 let info_result = cache.get_info().await;
1037 assert!(info_result.is_ok());
1038
1039 let info = info_result.unwrap();
1040 assert_eq!(info.get("status"), Some(&"mock".to_string()));
1041 assert_eq!(info.get("version"), Some(&"0.0.0".to_string()));
1042 }
1043
1044 #[test]
1045 fn test_cache_config_validation() {
1046 let valid_config = CacheConfig {
1048 redis_urls: vec!["redis://localhost:6379".to_string()],
1049 connection_timeout_seconds: 30,
1050 default_ttl_seconds: 3600,
1051 max_size_bytes: 1024 * 1024 * 1024,
1052 enable_compression: true,
1053 compression_threshold_bytes: 1024,
1054 enable_metrics: true,
1055 key_prefix: "valid:prefix".to_string(),
1056 };
1057
1058 assert!(!valid_config.redis_urls.is_empty());
1059 assert!(valid_config.connection_timeout_seconds > 0);
1060 assert!(valid_config.default_ttl_seconds > 0);
1061 assert!(valid_config.max_size_bytes > 0);
1062 assert!(valid_config.compression_threshold_bytes > 0);
1063 assert!(!valid_config.key_prefix.is_empty());
1064 }
1065
1066 #[test]
1067 fn test_cache_config_edge_cases() {
1068 let empty_config = CacheConfig {
1070 redis_urls: vec![],
1071 key_prefix: "".to_string(),
1072 ..Default::default()
1073 };
1074
1075 assert!(empty_config.redis_urls.is_empty());
1076 assert!(empty_config.key_prefix.is_empty());
1077
1078 let extreme_config = CacheConfig {
1080 redis_urls: vec!["redis://test".to_string()],
1081 connection_timeout_seconds: u64::MAX,
1082 default_ttl_seconds: u64::MAX,
1083 max_size_bytes: u64::MAX,
1084 compression_threshold_bytes: usize::MAX,
1085 key_prefix: "a".repeat(1000), ..Default::default()
1087 };
1088
1089 assert_eq!(extreme_config.connection_timeout_seconds, u64::MAX);
1090 assert_eq!(extreme_config.max_size_bytes, u64::MAX);
1091 assert_eq!(extreme_config.compression_threshold_bytes, usize::MAX);
1092 assert_eq!(extreme_config.key_prefix.len(), 1000);
1093 }
1094}