1use crate::backend::CacheBackend;
4use crate::entity::CacheEntity;
5use crate::error::{Error, Result};
6use crate::feed::CacheFeed;
7use crate::key::CacheKeyBuilder;
8use crate::observability::{CacheMetrics, NoOpMetrics, TtlPolicy};
9use crate::repository::DataRepository;
10use crate::strategy::CacheStrategy;
11use std::str::FromStr;
12use std::time::{Duration, Instant};
13
14#[derive(Clone, Debug, Default)]
40pub struct OperationConfig {
41 pub ttl_override: Option<Duration>,
70
71 pub retry_count: u32,
76}
77
78impl OperationConfig {
79 pub fn with_ttl(mut self, ttl: Duration) -> Self {
88 self.ttl_override = Some(ttl);
89 self
90 }
91
92 pub fn with_retry(mut self, count: u32) -> Self {
101 self.retry_count = count;
102 self
103 }
104}
105
106pub struct CacheExpander<B: CacheBackend> {
120 backend: B,
121 metrics: Box<dyn CacheMetrics>,
122 pub(crate) ttl_policy: TtlPolicy,
123}
124
125impl<B: CacheBackend> CacheExpander<B> {
126 pub fn new(backend: B) -> Self {
128 CacheExpander {
129 backend,
130 metrics: Box::new(NoOpMetrics),
131 ttl_policy: TtlPolicy::default(),
132 }
133 }
134
135 pub fn with_metrics(mut self, metrics: Box<dyn CacheMetrics>) -> Self {
137 self.metrics = metrics;
138 self
139 }
140
141 pub fn with_ttl_policy(mut self, policy: TtlPolicy) -> Self {
143 self.ttl_policy = policy;
144 self
145 }
146
147 pub async fn with<T, F, R>(
183 &self,
184 feeder: &mut F,
185 repository: &R,
186 strategy: CacheStrategy,
187 ) -> Result<()>
188 where
189 T: CacheEntity,
190 F: CacheFeed<T>,
191 R: DataRepository<T>,
192 T::Key: FromStr,
193 {
194 self.with_config::<T, F, R>(feeder, repository, strategy, OperationConfig::default())
196 .await
197 }
198
199 pub async fn with_config<T, F, R>(
242 &self,
243 feeder: &mut F,
244 repository: &R,
245 strategy: CacheStrategy,
246 config: OperationConfig,
247 ) -> Result<()>
248 where
249 T: CacheEntity,
250 F: CacheFeed<T>,
251 R: DataRepository<T>,
252 T::Key: FromStr,
253 {
254 let mut attempts = 0;
256 let max_attempts = config.retry_count + 1; loop {
259 attempts += 1;
260
261 let result = self
262 .execute_operation::<T, F, R>(feeder, repository, strategy.clone(), &config)
263 .await;
264
265 match result {
266 Ok(()) => return Ok(()),
267 Err(e) => {
268 if attempts >= max_attempts {
269 return Err(e);
270 }
271
272 debug!(
273 "Cache operation failed (attempt {}/{}), retrying...",
274 attempts, max_attempts
275 );
276
277 if config.retry_count > 0 {
279 let delay =
280 tokio::time::Duration::from_millis(100 * 2_u64.pow(attempts - 1));
281 tokio::time::sleep(delay).await;
282 }
283 }
284 }
285 }
286 }
287
288 async fn execute_operation<T, F, R>(
290 &self,
291 feeder: &mut F,
292 repository: &R,
293 strategy: CacheStrategy,
294 config: &OperationConfig,
295 ) -> Result<()>
296 where
297 T: CacheEntity,
298 F: CacheFeed<T>,
299 R: DataRepository<T>,
300 T::Key: FromStr,
301 {
302 let timer = Instant::now();
303
304 feeder.validate()?;
306
307 let entity_id = feeder.entity_id();
309 let cache_key = CacheKeyBuilder::build::<T>(&entity_id);
310
311 debug!(
312 "» Cache operation for key: {} (strategy: {})",
313 cache_key, strategy
314 );
315
316 let result = match strategy {
318 CacheStrategy::Fresh => {
319 self.strategy_fresh::<T, R>(&cache_key, repository, config)
320 .await
321 }
322 CacheStrategy::Refresh => {
323 self.strategy_refresh::<T, R>(&cache_key, repository, config)
324 .await
325 }
326 CacheStrategy::Invalidate => {
327 self.strategy_invalidate::<T, R>(&cache_key, repository, config)
328 .await
329 }
330 CacheStrategy::Bypass => {
331 self.strategy_bypass::<T, R>(&cache_key, repository, config)
332 .await
333 }
334 };
335
336 match result {
338 Ok(Some(entity)) => {
339 entity.validate()?;
340 feeder.on_hit(&cache_key)?;
341 feeder.on_loaded(&entity)?;
342 feeder.feed(Some(entity));
343 self.metrics.record_hit(&cache_key, timer.elapsed());
344 info!("✓ Cache operation succeeded in {:?}", timer.elapsed());
345 }
346 Ok(None) => {
347 feeder.on_miss(&cache_key)?;
348 feeder.feed(None);
349 self.metrics.record_miss(&cache_key, timer.elapsed());
350 debug!("Entity not found after cache operation for {}", cache_key);
351 }
352 Err(e) => {
353 self.metrics.record_error(&cache_key, &e.to_string());
354 return Err(e);
355 }
356 }
357
358 Ok(())
359 }
360
361 async fn strategy_fresh<T: CacheEntity, R: DataRepository<T>>(
363 &self,
364 cache_key: &str,
365 _repository: &R,
366 _config: &OperationConfig,
367 ) -> Result<Option<T>> {
368 debug!("Executing Fresh strategy for {}", cache_key);
369
370 match self.backend.get(cache_key).await? {
371 Some(bytes) => {
372 debug!("✓ Cache hit (Fresh strategy)");
373 T::deserialize_from_cache(&bytes).map(Some)
374 }
375 None => {
376 debug!("✗ Cache miss (Fresh strategy) - no fallback");
377 Ok(None)
378 }
379 }
380 }
381
382 async fn strategy_refresh<T: CacheEntity, R: DataRepository<T>>(
384 &self,
385 cache_key: &str,
386 repository: &R,
387 config: &OperationConfig,
388 ) -> Result<Option<T>>
389 where
390 T::Key: FromStr,
391 {
392 debug!("Executing Refresh strategy for {}", cache_key);
393
394 if let Some(bytes) = self.backend.get(cache_key).await? {
396 debug!("✓ Cache hit (Refresh strategy)");
397 return T::deserialize_from_cache(&bytes).map(Some);
398 }
399
400 debug!("Cache miss, falling back to database");
401
402 let id = self.extract_id_from_key::<T>(cache_key)?;
404 match repository.fetch_by_id(&id).await? {
405 Some(entity) => {
406 let ttl = config
409 .ttl_override
410 .or_else(|| self.ttl_policy.get_ttl(T::cache_prefix()));
411 let bytes = entity.serialize_for_cache()?;
412 let _ = self.backend.set(cache_key, bytes, ttl).await;
413 Ok(Some(entity))
414 }
415 None => Ok(None),
416 }
417 }
418
419 async fn strategy_invalidate<T: CacheEntity, R: DataRepository<T>>(
421 &self,
422 cache_key: &str,
423 repository: &R,
424 config: &OperationConfig,
425 ) -> Result<Option<T>>
426 where
427 T::Key: FromStr,
428 {
429 debug!("Executing Invalidate strategy for {}", cache_key);
430
431 self.backend.delete(cache_key).await?;
433 debug!("✓ Cache invalidated for {}", cache_key);
434
435 let id = self.extract_id_from_key::<T>(cache_key)?;
437 match repository.fetch_by_id(&id).await? {
438 Some(entity) => {
439 let ttl = config
442 .ttl_override
443 .or_else(|| self.ttl_policy.get_ttl(T::cache_prefix()));
444 let bytes = entity.serialize_for_cache()?;
445 let _ = self.backend.set(cache_key, bytes, ttl).await;
446 Ok(Some(entity))
447 }
448 None => Ok(None),
449 }
450 }
451
452 async fn strategy_bypass<T: CacheEntity, R: DataRepository<T>>(
454 &self,
455 cache_key: &str,
456 repository: &R,
457 config: &OperationConfig,
458 ) -> Result<Option<T>>
459 where
460 T::Key: FromStr,
461 {
462 debug!("Executing Bypass strategy for {}", cache_key);
463 debug!("Bypassing cache entirely for {}", cache_key);
464
465 let id = self.extract_id_from_key::<T>(cache_key)?;
467 match repository.fetch_by_id(&id).await? {
468 Some(entity) => {
469 let ttl = config
472 .ttl_override
473 .or_else(|| self.ttl_policy.get_ttl(T::cache_prefix()));
474 let bytes = entity.serialize_for_cache()?;
475 let _ = self.backend.set(cache_key, bytes, ttl).await;
476 Ok(Some(entity))
477 }
478 None => Ok(None),
479 }
480 }
481
482 fn extract_id_from_key<T: CacheEntity>(&self, cache_key: &str) -> Result<T::Key>
485 where
486 T::Key: FromStr,
487 {
488 let parts: Vec<&str> = cache_key.split(':').collect();
489 if parts.len() > 1 {
490 let id_str = parts[1..].join(":");
491 id_str.parse().ok().ok_or_else(|| {
492 Error::ValidationError(format!("Failed to parse ID from cache key: {}", cache_key))
493 })
494 } else {
495 Err(Error::ValidationError(format!(
496 "Invalid cache key format: {}",
497 cache_key
498 )))
499 }
500 }
501
502 pub fn backend(&self) -> &B {
504 &self.backend
505 }
506
507 pub fn backend_mut(&mut self) -> &mut B {
509 &mut self.backend
510 }
511}
512
513#[cfg(test)]
514mod tests {
515 use super::*;
516 use crate::backend::InMemoryBackend;
517 use crate::feed::GenericFeeder;
518 use crate::repository::InMemoryRepository;
519 use serde::{Deserialize, Serialize};
520
521 #[derive(Clone, Serialize, Deserialize)]
522 struct TestEntity {
523 id: String,
524 value: String,
525 }
526
527 impl CacheEntity for TestEntity {
528 type Key = String;
529
530 fn cache_key(&self) -> Self::Key {
531 self.id.clone()
532 }
533
534 fn cache_prefix() -> &'static str {
535 "test"
536 }
537 }
538
539 #[tokio::test]
540 async fn test_expander_with_fresh_strategy_hit() {
541 let backend = InMemoryBackend::new();
542 let expander = CacheExpander::new(backend.clone());
543
544 let entity = TestEntity {
546 id: "1".to_string(),
547 value: "data".to_string(),
548 };
549 let bytes = entity.serialize_for_cache().expect("Failed to serialize");
550 backend
551 .clone()
552 .set("test:1", bytes, None)
553 .await
554 .expect("Failed to set");
555
556 let mut feeder = GenericFeeder::new("1".to_string());
558 let repo = InMemoryRepository::new();
559
560 expander
562 .with::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Fresh)
563 .await
564 .expect("Failed to execute");
565
566 assert!(feeder.data.is_some());
567 }
568
569 #[tokio::test]
570 async fn test_expander_with_fresh_strategy_miss() {
571 let backend = InMemoryBackend::new();
572 let expander = CacheExpander::new(backend);
573
574 let mut feeder = GenericFeeder::new("1".to_string());
575 let repo = InMemoryRepository::new();
576
577 expander
578 .with::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Fresh)
579 .await
580 .expect("Failed to execute");
581
582 assert!(feeder.data.is_none());
583 }
584
585 #[tokio::test]
586 async fn test_expander_refresh_strategy_cache_hit() {
587 let backend = InMemoryBackend::new();
588 let expander = CacheExpander::new(backend.clone());
589
590 let entity = TestEntity {
592 id: "1".to_string(),
593 value: "cached_data".to_string(),
594 };
595 let bytes = entity.serialize_for_cache().expect("Failed to serialize");
596 backend
597 .clone()
598 .set("test:1", bytes, None)
599 .await
600 .expect("Failed to set");
601
602 let mut feeder = GenericFeeder::new("1".to_string());
603 let repo = InMemoryRepository::new();
604
605 expander
606 .with::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Refresh)
607 .await
608 .expect("Failed to execute");
609
610 assert!(feeder.data.is_some());
611 assert_eq!(feeder.data.expect("Data not found").value, "cached_data");
612 }
613
614 #[tokio::test]
615 async fn test_expander_refresh_strategy_cache_miss_db_hit() {
616 let backend = InMemoryBackend::new();
617 let expander = CacheExpander::new(backend.clone());
618
619 let mut repo = InMemoryRepository::new();
621 repo.insert(
622 "1".to_string(),
623 TestEntity {
624 id: "1".to_string(),
625 value: "db_data".to_string(),
626 },
627 );
628
629 let mut feeder = GenericFeeder::new("1".to_string());
630
631 expander
632 .with::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Refresh)
633 .await
634 .expect("Failed to execute");
635
636 assert!(feeder.data.is_some());
637 assert_eq!(feeder.data.expect("Data not found").value, "db_data");
638
639 let cached = backend
641 .clone()
642 .get("test:1")
643 .await
644 .expect("Failed to get from cache");
645 assert!(cached.is_some());
646 }
647
648 #[tokio::test]
649 async fn test_expander_refresh_strategy_complete_miss() {
650 let backend = InMemoryBackend::new();
651 let expander = CacheExpander::new(backend);
652
653 let mut feeder = GenericFeeder::new("nonexistent".to_string());
654 let repo = InMemoryRepository::new();
655
656 expander
657 .with::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Refresh)
658 .await
659 .expect("Failed to execute");
660
661 assert!(feeder.data.is_none());
662 }
663
664 #[tokio::test]
665 async fn test_expander_invalidate_strategy() {
666 let backend = InMemoryBackend::new();
667 let expander = CacheExpander::new(backend.clone());
668
669 let stale_entity = TestEntity {
671 id: "1".to_string(),
672 value: "stale_data".to_string(),
673 };
674 let bytes = stale_entity
675 .serialize_for_cache()
676 .expect("Failed to serialize");
677 backend
678 .clone()
679 .set("test:1", bytes, None)
680 .await
681 .expect("Failed to set");
682
683 let mut repo = InMemoryRepository::new();
685 repo.insert(
686 "1".to_string(),
687 TestEntity {
688 id: "1".to_string(),
689 value: "fresh_data".to_string(),
690 },
691 );
692
693 let mut feeder = GenericFeeder::new("1".to_string());
694
695 expander
696 .with::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Invalidate)
697 .await
698 .expect("Failed to execute");
699
700 assert!(feeder.data.is_some());
701 assert_eq!(feeder.data.expect("Data not found").value, "fresh_data");
702
703 let cached_bytes = backend
705 .clone()
706 .get("test:1")
707 .await
708 .expect("Failed to get")
709 .expect("Cache is empty");
710 let cached_entity =
711 TestEntity::deserialize_from_cache(&cached_bytes).expect("Failed to deserialize");
712 assert_eq!(cached_entity.value, "fresh_data");
713 }
714
715 #[tokio::test]
716 async fn test_expander_bypass_strategy() {
717 let backend = InMemoryBackend::new();
718 let expander = CacheExpander::new(backend.clone());
719
720 let cached_entity = TestEntity {
722 id: "1".to_string(),
723 value: "cached_data".to_string(),
724 };
725 let bytes = cached_entity
726 .serialize_for_cache()
727 .expect("Failed to serialize");
728 backend
729 .clone()
730 .set("test:1", bytes, None)
731 .await
732 .expect("Failed to set");
733
734 let mut repo = InMemoryRepository::new();
736 repo.insert(
737 "1".to_string(),
738 TestEntity {
739 id: "1".to_string(),
740 value: "db_data".to_string(),
741 },
742 );
743
744 let mut feeder = GenericFeeder::new("1".to_string());
745
746 expander
747 .with::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Bypass)
748 .await
749 .expect("Failed to execute");
750
751 assert!(feeder.data.is_some());
753 assert_eq!(feeder.data.expect("Data not found").value, "db_data");
754 }
755
756 #[tokio::test]
757 async fn test_expander_with_ttl_policy() {
758 use crate::observability::TtlPolicy;
759 use std::time::Duration;
760
761 let backend = InMemoryBackend::new();
762 let expander = CacheExpander::new(backend.clone())
763 .with_ttl_policy(TtlPolicy::Fixed(Duration::from_secs(300)));
764
765 let mut repo = InMemoryRepository::new();
766 repo.insert(
767 "1".to_string(),
768 TestEntity {
769 id: "1".to_string(),
770 value: "data".to_string(),
771 },
772 );
773
774 let mut feeder = GenericFeeder::new("1".to_string());
775
776 expander
777 .with::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Refresh)
778 .await
779 .expect("Failed to execute");
780
781 assert!(feeder.data.is_some());
782 }
783
784 #[tokio::test]
785 async fn test_expander_with_custom_metrics() {
786 use crate::observability::CacheMetrics;
787 use std::sync::{Arc, Mutex};
788 use std::time::Duration;
789
790 #[derive(Clone)]
791 struct TestMetrics {
792 hits: Arc<Mutex<usize>>,
793 misses: Arc<Mutex<usize>>,
794 }
795
796 impl CacheMetrics for TestMetrics {
797 fn record_hit(&self, _key: &str, _duration: Duration) {
798 *self.hits.lock().expect("Failed to lock hits") += 1;
799 }
800
801 fn record_miss(&self, _key: &str, _duration: Duration) {
802 *self.misses.lock().expect("Failed to lock misses") += 1;
803 }
804 }
805
806 let metrics = TestMetrics {
807 hits: Arc::new(Mutex::new(0)),
808 misses: Arc::new(Mutex::new(0)),
809 };
810
811 let backend = InMemoryBackend::new();
812 let expander = CacheExpander::new(backend.clone()).with_metrics(Box::new(metrics.clone()));
813
814 let mut repo = InMemoryRepository::new();
816 repo.insert(
817 "1".to_string(),
818 TestEntity {
819 id: "1".to_string(),
820 value: "data".to_string(),
821 },
822 );
823
824 let mut feeder = GenericFeeder::new("1".to_string());
825
826 expander
828 .with::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Refresh)
829 .await
830 .expect("Failed to execute");
831
832 assert_eq!(*metrics.hits.lock().expect("Failed to lock hits"), 1); let mut feeder2 = GenericFeeder::new("1".to_string());
836 expander
837 .with::<TestEntity, _, _>(&mut feeder2, &repo, CacheStrategy::Refresh)
838 .await
839 .expect("Failed to execute");
840
841 assert_eq!(*metrics.hits.lock().expect("Failed to lock hits"), 2);
842 }
843
844 #[tokio::test]
845 async fn test_expander_error_on_missing_data() {
846 let backend = InMemoryBackend::new();
847 let expander = CacheExpander::new(backend);
848
849 let mut feeder = GenericFeeder::new("nonexistent".to_string());
850 let repo = InMemoryRepository::new();
851
852 let result = expander
854 .with::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Fresh)
855 .await;
856 assert!(result.is_ok());
857 assert!(feeder.data.is_none());
858 }
859
860 #[tokio::test]
861 async fn test_expander_backend_reference() {
862 let backend = InMemoryBackend::new();
863 let expander = CacheExpander::new(backend.clone());
864
865 let _backend_ref = expander.backend();
867
868 assert_eq!(backend.len().await, 0);
870 }
871
872 #[tokio::test]
873 async fn test_expander_with_config() {
874 let backend = InMemoryBackend::new();
875 let expander = CacheExpander::new(backend.clone())
876 .with_ttl_policy(TtlPolicy::Fixed(Duration::from_secs(60)));
877
878 let mut repo = InMemoryRepository::new();
879 repo.insert(
880 "1".to_string(),
881 TestEntity {
882 id: "1".to_string(),
883 value: "test_value".to_string(),
884 },
885 );
886
887 let mut feeder = GenericFeeder::new("1".to_string());
888
889 let config = OperationConfig::default()
891 .with_ttl(Duration::from_secs(300))
892 .with_retry(2);
893
894 expander
895 .with_config::<TestEntity, _, _>(&mut feeder, &repo, CacheStrategy::Refresh, config)
896 .await
897 .expect("Failed to execute with config");
898
899 assert!(feeder.data.is_some());
900 assert_eq!(feeder.data.expect("Data not found").value, "test_value");
901
902 match &expander.ttl_policy {
904 TtlPolicy::Fixed(duration) => assert_eq!(*duration, Duration::from_secs(60)),
905 _ => panic!("Expected Fixed TTL policy"),
906 }
907 }
908}