1use async_trait::async_trait;
2use distributed_cache::{DistributedCache, RedisConnection};
3use failsafe::{FailSafeCache, FailSafeConfiguration, FailSafeResult};
4use moka::{Expiry, future::Cache};
5use serde::{Serialize, de::DeserializeOwned};
6use std::{
7 collections::HashMap,
8 fmt::{Debug, Display},
9 hash::Hash,
10 marker::PhantomData,
11 sync::Arc,
12 time::Duration,
13};
14use tokio::{
15 select,
16 sync::{Mutex, broadcast},
17 task::{self, JoinHandle},
18};
19use tracing::{debug, error, info, warn};
20
21mod distributed_cache;
22mod failsafe;
23
/// Value passed as `target:` to the `tracing` macro calls in this module.
const LOG_TARGET: &str = "fusioncache";
25
/// Records which path produced a value that is about to be returned to the caller.
pub(crate) enum Source<TValue: Clone + Send + Sync + 'static> {
    /// The value came from a factory call.
    Factory(TValue),
    /// The value came from the fail-safe cache.
    Failsafe(TValue),
}

impl<TValue: Clone + Send + Sync + 'static> Source<TValue> {
    /// Returns a clone of the wrapped value, regardless of the variant holding it.
    pub(crate) fn value(&self) -> TValue {
        let (Self::Factory(inner) | Self::Failsafe(inner)) = self;
        inner.clone()
    }
}
39
/// A cached value stored together with its own optional expiration settings.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct CacheValue<TValue: Clone + Send + Sync + 'static> {
    /// The payload itself.
    value: TValue,
    /// Time-to-live for this entry, if any.
    time_to_live: Option<Duration>,
    /// Time-to-idle for this entry, if any.
    time_to_idle: Option<Duration>,
}

impl<TValue: Clone + Send + Sync + 'static> CacheValue<TValue> {
    /// Bundles `value` with the given expiration settings.
    pub fn new(
        value: TValue,
        time_to_live: Option<Duration>,
        time_to_idle: Option<Duration>,
    ) -> Self {
        CacheValue {
            value,
            time_to_live,
            time_to_idle,
        }
    }
}
60
61pub(crate) struct CacheValueExpiry;
62
63impl<TKey: Clone + Send + Sync + 'static, TValue: Clone + Send + Sync + 'static>
64 Expiry<TKey, CacheValue<TValue>> for CacheValueExpiry
65{
66 fn expire_after_create(
67 &self,
68 _key: &TKey,
69 value: &CacheValue<TValue>,
70 _created_at: std::time::Instant,
71 ) -> Option<Duration> {
72 value.time_to_live
73 }
74
75 fn expire_after_read(
76 &self,
77 _key: &TKey,
78 value: &CacheValue<TValue>,
79 read_at: std::time::Instant,
80 _duration_until_expiry: Option<Duration>,
81 last_modified_at: std::time::Instant,
82 ) -> Option<Duration> {
83 value.time_to_idle.map(|time_to_idle| {
84 let time_since_last_modified = read_at.duration_since(last_modified_at);
85 time_to_idle - time_since_last_modified
86 })
87 }
88
89 fn expire_after_update(
90 &self,
91 _key: &TKey,
92 value: &CacheValue<TValue>,
93 _updated_at: std::time::Instant,
94 _duration_until_expiry: Option<Duration>,
95 ) -> Option<Duration> {
96 value.time_to_live
97 }
98}
99
100#[derive(Clone, Debug)]
101pub struct FusionCacheOptions {
102 entry_ttl: Option<std::time::Duration>,
103 ignore_ttl: bool,
104 entry_tti: Option<std::time::Duration>,
105 ignore_tti: bool,
106 fail_safe_configuration: Option<FailSafeConfiguration>,
107 skip_fail_safe_cache: bool,
108 hard_timeout: Option<std::time::Duration>,
109 skip_hard_timeout: bool,
110 skip_distributed_cache: bool,
111 should_redis_writes_happen_in_background: Option<bool>,
112}
113
114pub struct FusionCacheOptionsBuilder {
115 entry_ttl: Option<std::time::Duration>,
116 ignore_ttl: bool,
117 entry_tti: Option<std::time::Duration>,
118 ignore_tti: bool,
119 fail_safe_configuration: Option<FailSafeConfiguration>,
120 skip_fail_safe_cache: bool,
121 hard_timeout: Option<std::time::Duration>,
122 skip_hard_timeout: bool,
123 skip_distributed_cache: bool,
124 should_redis_writes_happen_in_background: Option<bool>,
125}
126
127impl FusionCacheOptionsBuilder {
128 pub fn new() -> Self {
129 Self {
130 entry_ttl: None,
131 ignore_ttl: false,
132 entry_tti: None,
133 ignore_tti: false,
134 fail_safe_configuration: None,
135 skip_fail_safe_cache: false,
136 hard_timeout: None,
137 skip_hard_timeout: false,
138 skip_distributed_cache: false,
139 should_redis_writes_happen_in_background: None,
140 }
141 }
142
143 pub fn with_time_to_live(mut self, time_to_live: std::time::Duration) -> Self {
149 self.entry_ttl = Some(time_to_live);
150 self
151 }
152
153 pub fn ignore_time_to_live(mut self) -> Self {
155 self.ignore_ttl = true;
156 self
157 }
158
159 pub fn with_time_to_idle(mut self, time_to_idle: std::time::Duration) -> Self {
165 self.entry_tti = Some(time_to_idle);
166 self
167 }
168
169 pub fn ignore_time_to_idle(mut self) -> Self {
171 self.ignore_tti = true;
172 self
173 }
174
175 pub fn with_fail_safe(
184 mut self,
185 entry_ttl: std::time::Duration,
186 failsafe_cycle_ttl: std::time::Duration,
187 max_cycles: Option<u64>,
188 soft_timeout: Option<std::time::Duration>,
189 ) -> Self {
190 self.fail_safe_configuration = Some(FailSafeConfiguration::new(
191 entry_ttl,
192 failsafe_cycle_ttl,
193 max_cycles,
194 soft_timeout,
195 ));
196 self
197 }
198
199 pub fn skip_fail_safe_cache(mut self) -> Self {
201 self.skip_fail_safe_cache = true;
202 self
203 }
204
205 pub fn with_hard_timeout(mut self, hard_timeout: std::time::Duration) -> Self {
211 self.hard_timeout = Some(hard_timeout);
212 self
213 }
214
215 pub fn skip_hard_timeout(mut self) -> Self {
217 self.skip_hard_timeout = true;
218 self
219 }
220
221 pub fn skip_distributed_cache(mut self) -> Self {
223 self.skip_distributed_cache = true;
224 self
225 }
226
227 pub fn build(self) -> FusionCacheOptions {
228 FusionCacheOptions {
229 entry_ttl: self.entry_ttl,
230 ignore_ttl: self.ignore_ttl,
231 entry_tti: self.entry_tti,
232 ignore_tti: self.ignore_tti,
233 fail_safe_configuration: self.fail_safe_configuration,
234 skip_fail_safe_cache: self.skip_fail_safe_cache,
235 hard_timeout: self.hard_timeout,
236 skip_hard_timeout: self.skip_hard_timeout,
237 skip_distributed_cache: self.skip_distributed_cache,
238 should_redis_writes_happen_in_background: self.should_redis_writes_happen_in_background,
239 }
240 }
241}
242
243impl FusionCacheOptions {
244 fn with_overrides(self, overrides: Option<FusionCacheOptions>) -> FusionCacheOptions {
245 if let Some(overrides) = overrides {
246 FusionCacheOptions {
247 fail_safe_configuration: if let Some(fsc_override) =
248 overrides.fail_safe_configuration
249 {
250 Some(fsc_override)
251 } else if !overrides.skip_fail_safe_cache {
252 self.fail_safe_configuration
253 } else {
254 None
255 },
256 hard_timeout: if let Some(ht_override) = overrides.hard_timeout {
257 Some(ht_override)
258 } else if !overrides.skip_hard_timeout {
259 self.hard_timeout
260 } else {
261 None
262 },
263 entry_ttl: if let Some(ttl_override) = overrides.entry_ttl {
264 Some(ttl_override)
265 } else if !overrides.ignore_ttl {
266 self.entry_ttl
267 } else {
268 None
269 },
270 entry_tti: if let Some(tti_override) = overrides.entry_tti {
271 Some(tti_override)
272 } else if !overrides.ignore_tti {
273 self.entry_tti
274 } else {
275 None
276 },
277 skip_distributed_cache: overrides.skip_distributed_cache,
278 ignore_ttl: overrides.ignore_ttl,
279 ignore_tti: overrides.ignore_tti,
280 should_redis_writes_happen_in_background: if let Some(
281 should_redis_writes_happen_in_background,
282 ) =
283 overrides.should_redis_writes_happen_in_background
284 {
285 Some(should_redis_writes_happen_in_background)
286 } else {
287 self.should_redis_writes_happen_in_background
288 },
289 skip_fail_safe_cache: overrides.skip_fail_safe_cache,
290 skip_hard_timeout: overrides.skip_hard_timeout,
291 }
292 } else {
293 FusionCacheOptions { ..self }
294 }
295 }
296}
297
298#[async_trait]
299pub trait Factory<
309 TKey: Clone + Send + Sync + 'static,
310 TValue: Clone + Send + Sync + 'static,
311 TError: Clone + Send + Sync + 'static,
312>: Send + Sync + Clone + Debug + 'static
313{
314 async fn get(&self, key: &TKey) -> Result<TValue, TError>;
321}
322
323pub struct FusionCacheBuilder<
324 TKey: Hash + Eq + Send + Sync + Clone + Serialize + DeserializeOwned + 'static,
325 TValue: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
326> {
327 capacity: u64,
328 fail_safe_configuration: Option<FailSafeConfiguration>,
329 hard_timeout: Option<std::time::Duration>,
330 redis_address: Option<String>,
331 application_name: Option<String>,
332 should_factory_execute_in_background: bool,
333 should_redis_writes_happen_in_background: bool,
334 phantom_t_key: PhantomData<TKey>,
335 phantom_t_value: PhantomData<TValue>,
336 entry_ttl: Option<std::time::Duration>,
337 entry_tti: Option<std::time::Duration>,
338 distributed_cache_entry_ttl: Option<Duration>,
339}
340
341impl<
342 TKey: Hash + Eq + Send + Sync + Clone + Serialize + DeserializeOwned + Debug + 'static,
343 TValue: Clone + Send + Sync + Serialize + DeserializeOwned + Debug + 'static,
344> FusionCacheBuilder<TKey, TValue>
345{
346 pub fn new() -> Self {
347 Self {
348 capacity: 1000,
349 fail_safe_configuration: None,
350 should_factory_execute_in_background: false,
351 phantom_t_key: PhantomData,
352 phantom_t_value: PhantomData,
353 hard_timeout: None,
354 redis_address: None,
355 application_name: None,
356 should_redis_writes_happen_in_background: false,
357 entry_ttl: None,
358 entry_tti: None,
359 distributed_cache_entry_ttl: None,
360 }
361 }
362
363 pub fn with_capacity(mut self, capacity: u64) -> Self {
369 self.capacity = capacity;
370 self
371 }
372
373 pub fn with_time_to_live(mut self, time_to_live: std::time::Duration) -> Self {
377 self.entry_ttl = Some(time_to_live);
378 self
379 }
380
381 pub fn with_time_to_idle(mut self, time_to_idle: std::time::Duration) -> Self {
385 self.entry_tti = Some(time_to_idle);
386 self
387 }
388
389 pub fn with_fail_safe(
397 mut self,
398 entry_ttl: std::time::Duration,
399 failsafe_cycle_ttl: std::time::Duration,
400 max_cycles: Option<u64>,
401 soft_timeout: Option<std::time::Duration>,
402 ) -> Self {
403 self.fail_safe_configuration = Some(FailSafeConfiguration::new(
404 entry_ttl,
405 failsafe_cycle_ttl,
406 max_cycles,
407 soft_timeout,
408 ));
409 self
410 }
411
412 pub fn with_hard_timeout(mut self, timeout: std::time::Duration) -> Self {
418 self.hard_timeout = Some(timeout);
419 self
420 }
421
422 pub fn set_factory_background_execution(mut self, should_execute_in_background: bool) -> Self {
428 self.should_factory_execute_in_background = should_execute_in_background;
429 self
430 }
431
432 pub fn with_redis(
441 mut self,
442 address: String,
443 application_name: String,
444 should_writes_happen_in_background: bool,
445 entry_ttl: Option<std::time::Duration>,
446 ) -> Self {
447 self.redis_address = Some(address);
448 self.application_name = Some(application_name);
449 self.should_redis_writes_happen_in_background = should_writes_happen_in_background;
450 self.distributed_cache_entry_ttl = entry_ttl;
451 self
452 }
453
454 pub async fn build(self) -> Result<FusionCache<TKey, TValue>, FusionCacheError> {
455 let (distributed_cache_eviction_sender, mut distributed_cache_eviction_receiver) =
456 tokio::sync::mpsc::channel(1000000);
457 let distributed_cache = if let Some(address) = self.redis_address {
458 debug!(target: LOG_TARGET, "Building distributed cache with Redis address: {}", address);
459 let redis_client = redis::Client::open(address).unwrap();
460 let inner_redis_connection = redis_client
461 .get_multiplexed_async_connection()
462 .await
463 .map_err(|e| FusionCacheError::InitializationError(e.to_string()))?;
464 let redis_connection = RedisConnection::new(inner_redis_connection);
465 let mut distributed_cache = DistributedCache::new(
466 redis_connection,
467 redis_client,
468 distributed_cache_eviction_sender,
469 self.application_name.unwrap(),
470 self.distributed_cache_entry_ttl,
471 );
472 distributed_cache
473 .start_synchronization()
474 .await
475 .map_err(|e| FusionCacheError::InitializationError(e.to_string()))?;
476 Some(distributed_cache)
477 } else {
478 debug!(target: LOG_TARGET, "No distributed cache configured");
479 None
480 };
481 let cache_builder = Cache::builder()
482 .expire_after(CacheValueExpiry)
483 .max_capacity(self.capacity);
484 let cache: Cache<TKey, CacheValue<TValue>> = cache_builder.build();
485 let fail_safe_cache: Option<FailSafeCache<TKey, CacheValue<TValue>>> =
486 if let Some(fail_safe_configuration) = self.fail_safe_configuration {
487 debug!(
488 target: LOG_TARGET,
489 "Setting fail-safe cache with configuration: {:?}",
490 fail_safe_configuration
491 );
492 Some(FailSafeCache::new(fail_safe_configuration))
493 } else {
494 debug!(target: LOG_TARGET, "No fail-safe cache configuration set for cache");
495 None
496 };
497 let _cache = cache.clone();
498 let mut _fail_safe_cache = fail_safe_cache.clone();
499 let cache_synchronization_task = if let Some(_) = &distributed_cache {
500 Some(task::spawn(async move {
501 while let Some(key) = distributed_cache_eviction_receiver.recv().await {
502 if let Some(_) = _cache.get(&key).await {
503 _cache.invalidate(&key).await;
504 }
505 if let Some(fsc) = &mut _fail_safe_cache {
506 fsc.invalidate(&key).await;
507 }
508 }
509 }))
510 } else {
511 None
512 };
513 Ok(FusionCache {
514 cache,
515 time_to_live: self.entry_ttl,
516 time_to_idle: self.entry_tti,
517 should_factory_execute_in_background: self.should_factory_execute_in_background,
518 hard_timeout: self.hard_timeout,
519 fail_safe_cache: fail_safe_cache.clone(),
520 in_flight_factory_requests: Arc::new(Mutex::new(HashMap::new())),
521 distributed_cache,
522 should_redis_writes_happen_in_background: self.should_redis_writes_happen_in_background,
523 _cache_synchronization_task: cache_synchronization_task
524 .map(|t| Arc::new(Mutex::new(t))),
525 background_factory_tasks: Arc::new(Mutex::new(HashMap::new())),
526 })
527 }
528}
529
/// Errors reported by the cache.
#[derive(Debug, Clone)]
pub enum FusionCacheError {
    /// Unspecified failure.
    Other,
    /// An in-flight result was lost and the value could not be found in the cache.
    SystemCorruption,
    /// The factory (or the fail-safe path replaying its error) failed; carries the message.
    FactoryError(String),
    /// The factory did not finish in time.
    FactoryTimeout,
    /// Building the cache failed; carries the message.
    InitializationError(String),
    /// A Redis operation failed; carries the message.
    RedisError(String),
}

impl Display for FusionCacheError {
    /// Writes the variant name for unit variants and the carried message otherwise.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Other => "Other",
            Self::SystemCorruption => "SystemCorruption",
            Self::FactoryTimeout => "FactoryTimeout",
            Self::FactoryError(message)
            | Self::InitializationError(message)
            | Self::RedisError(message) => message.as_str(),
        };
        f.write_str(text)
    }
}
563
564#[derive(Clone, Debug)]
568pub struct FusionCache<
569 TKey: Hash + Eq + Send + Sync + Clone + Serialize + DeserializeOwned + 'static,
570 TValue: Clone + Send + Sync + Clone + Serialize + DeserializeOwned + 'static,
571> {
572 cache: Cache<TKey, CacheValue<TValue>>,
573 time_to_live: Option<std::time::Duration>,
574 time_to_idle: Option<std::time::Duration>,
575 hard_timeout: Option<std::time::Duration>,
576 should_factory_execute_in_background: bool,
577 fail_safe_cache: Option<FailSafeCache<TKey, CacheValue<TValue>>>,
578 in_flight_factory_requests:
580 Arc<Mutex<HashMap<TKey, broadcast::Sender<Result<TValue, FusionCacheError>>>>>,
581 distributed_cache: Option<DistributedCache<TKey, TValue>>,
582 _cache_synchronization_task: Option<Arc<Mutex<JoinHandle<()>>>>,
583 should_redis_writes_happen_in_background: bool,
584 background_factory_tasks: Arc<
585 Mutex<
586 HashMap<
587 TKey,
588 (
589 JoinHandle<()>,
590 broadcast::Sender<Result<TValue, FusionCacheError>>,
591 ),
592 >,
593 >,
594 >,
595}
596
597impl<
598 TKey: Hash + Eq + Send + Sync + Clone + Serialize + DeserializeOwned + Debug + 'static,
599 TValue: Clone + Send + Sync + Clone + Serialize + DeserializeOwned + Debug + 'static,
600> FusionCache<TKey, TValue>
601{
602 #[tracing::instrument(name = "FusionCache::get_or_set", skip(self))]
618 pub async fn get_or_set<
619 TError: Clone + Send + Sync + Display + 'static,
620 F: Factory<TKey, TValue, TError>,
621 >(
622 &mut self,
623 key: TKey,
624 factory: F,
625 options: Option<FusionCacheOptions>,
626 ) -> Result<TValue, FusionCacheError> {
627 let options = self.get_options().with_overrides(options);
628 info!(target: LOG_TARGET, "Cache lookup for key: {:?}", key);
629 let maybe_entry = self.cache.get(&key).await;
630 match maybe_entry {
631 Some(entry) => {
632 info!(target: LOG_TARGET, "Cache hit for key: {:?}", key);
633 Ok(entry.value)
634 }
635 None => {
636 let maybe_value_from_redis =
637 if let Some(distributed_cache) = &mut self.distributed_cache {
638 distributed_cache.get(&key).await?
639 } else {
640 None
641 };
642 if let Some(value_from_redis) = maybe_value_from_redis {
643 info!(target: LOG_TARGET, "Cache miss for key: {:?}, hit in Redis", key);
644 self.cache
645 .insert(key.clone(), value_from_redis.clone())
646 .await;
647 Ok(value_from_redis.value)
648 } else {
649 info!(target: LOG_TARGET, "Cache miss for key: {:?}. Retrieving from factory", key);
650 self.request_key(key, factory, options).await
651 }
652 }
653 }
654 }
655
656 #[tracing::instrument(name = "FusionCache::request_key", skip(self))]
657 async fn request_key<
658 TError: Clone + Send + Sync + Display + 'static,
659 F: Factory<TKey, TValue, TError>,
660 >(
661 &mut self,
662 key: TKey,
663 factory: F,
664 options: FusionCacheOptions,
665 ) -> Result<TValue, FusionCacheError> {
666 let mut in_flight_factory_requests = self.in_flight_factory_requests.lock().await;
667 let maybe_factory_sender = in_flight_factory_requests.get(&key).cloned();
668 match maybe_factory_sender {
669 Some(factory_sender) => {
670 info!(target: LOG_TARGET, "Factory already in flight for key: {:?}, waiting", key);
671 drop(in_flight_factory_requests);
672 let mut factory_receiver = factory_sender.subscribe();
673 let factory_receiver_result = factory_receiver.recv().await;
674 if let Ok(factory_result) = factory_receiver_result {
675 info!(target: LOG_TARGET, "Factory result received for key: {:?}", key);
676 factory_result
677 } else {
678 if let Some(value) = self.cache.get(&key).await.map(|entry| entry.value) {
681 info!(target: LOG_TARGET, "Factory result received for key: {:?}, but receiver was dropped. Value was in cache. Returning value", key);
682 Ok(value)
683 } else {
684 error!(target: LOG_TARGET, "There's a very high probablity that your system is corrupted. Factory result received for key: {:?}, but receiver was dropped, and the value was not in cache. Returning error.", key);
685 Err(FusionCacheError::SystemCorruption)
686 }
687 }
688 }
689 None => {
690 info!(target: LOG_TARGET, "No factory in flight for key: {:?}, creating new factory", key);
691 let (factory_sender, _) = broadcast::channel(1);
692 in_flight_factory_requests.insert(key.clone(), factory_sender.clone());
693 drop(in_flight_factory_requests);
694 let factory_value_or_failsafe = self
695 .get_from_factory_or_fail_safe(key.clone(), factory, options.clone())
696 .await;
697
698 match &factory_value_or_failsafe {
699 Ok(Source::Factory(value)) => {
700 info!(target: LOG_TARGET, "Factory result received for key: {:?}, setting value in cache", key);
701 self.set_internal(key.clone(), value.clone(), options).await;
702 }
703 Ok(Source::Failsafe(_)) => {
704 info!(target: LOG_TARGET, "The factory errored out, however failsafe result received for key: {:?}, setting value in cache. Returning failsafe value.", key);
705 }
706 Err(e) => {
707 error!(target: LOG_TARGET, "Error received from factory: {:?}. Returning error.", e);
708 }
709 }
710
711 let retrieval_result = factory_value_or_failsafe.map(|s| s.value());
712 let _ = factory_sender.send(retrieval_result.clone());
713 self.remove_from_in_flight_factory_requests(&key).await;
714 retrieval_result
715 }
716 }
717 }
718
719 #[tracing::instrument(name = "FusionCache::get_from_factory_or_fail_safe", skip(self))]
720 async fn get_from_factory_or_fail_safe<
721 TError: Clone + Send + Sync + Display + 'static,
722 F: Factory<TKey, TValue, TError>,
723 >(
724 &mut self,
725 key: TKey,
726 factory: F,
727 options: FusionCacheOptions,
728 ) -> Result<Source<TValue>, FusionCacheError> {
729 if let Some(fail_safe_cache) = &mut self.fail_safe_cache {
730 let fail_safe_result = fail_safe_cache.get(&key).await;
731 match fail_safe_result {
732 FailSafeResult::NotInFailSafeMode | FailSafeResult::CurrentCycleEnded => {
733 info!(target: LOG_TARGET, "Not in fail-safe mode for key: {:?}, getting value from factory", key);
734 self.do_get_from_factory_or_failsafe(key.clone(), factory, options)
735 .await
736 }
737 FailSafeResult::Hit(value) => {
738 warn!(target: LOG_TARGET, "Currently in fail-safe mode. Fail-safe cache hit for key: {:?}, returning failsafe value", key);
739 Ok(Source::Failsafe(value.value))
740 }
741 FailSafeResult::Miss(error_message) => {
742 error!(target: LOG_TARGET, "Fail-safe cache miss for key: {:?}, exiting fail-safe mode", key);
743 fail_safe_cache.exit_failsafe_mode(key.clone()).await;
744 Err(FusionCacheError::FactoryError(error_message))
745 }
746 FailSafeResult::TooManyCycles(error_message) => {
747 error!(target: LOG_TARGET, "Fail-safe cache too many cycles for key: {:?}, exiting fail-safe mode", key);
748 fail_safe_cache.exit_failsafe_mode(key.clone()).await;
749 fail_safe_cache.invalidate(&key).await;
750 Err(FusionCacheError::FactoryError(error_message))
751 }
752 }
753 } else {
754 if let Some(hard_timeout) = self.hard_timeout {
755 let result_or_timeout = tokio::time::timeout(hard_timeout, factory.get(&key)).await;
756 match result_or_timeout {
757 Ok(factory_value) => factory_value
758 .map_err(|e| FusionCacheError::FactoryError(e.to_string()))
759 .map(Source::Factory),
760 Err(_) => {
761 error!(target: LOG_TARGET, "Factory operation timed out for key: {:?} on hard timeout, returning error", key);
762 Err(FusionCacheError::FactoryTimeout)
763 }
764 }
765 } else {
766 info!(target: LOG_TARGET, "No timeouts configured, getting value from factory for key: {:?}", key);
767 factory
768 .get(&key)
769 .await
770 .map_err(|e| FusionCacheError::FactoryError(e.to_string()))
771 .map(Source::Factory)
772 }
773 }
774 }
775
776 async fn remove_from_in_flight_factory_requests(&self, key: &TKey) {
777 let mut in_flight_factory_requests = self.in_flight_factory_requests.lock().await;
778 if let Some(_) = in_flight_factory_requests.get(key) {
779 in_flight_factory_requests.remove(key);
780 }
781 drop(in_flight_factory_requests);
782 }
783
784 #[tracing::instrument(name = "FusionCache::get", skip(self))]
793 pub async fn get(&mut self, key: TKey) -> Option<TValue> {
794 if let Some(distributed_cache) = &mut self.distributed_cache {
795 let value = distributed_cache.get(&key).await;
796 if let Ok(value) = value {
797 if let Some(v) = value {
798 self.cache.insert(key, v.clone()).await;
799 return Some(v.value);
800 }
801 }
802 }
803 self.cache.get(&key).await.map(|entry| entry.value)
804 }
805
806 #[tracing::instrument(name = "FusionCache::set", skip(self))]
813 pub async fn set(&mut self, key: TKey, value: TValue, options: Option<FusionCacheOptions>) {
814 let opts = options.unwrap_or(self.get_options());
815 self.set_internal(key, value, opts).await;
816 }
817
818 #[tracing::instrument(name = "FusionCache::set_internal", skip(self))]
819 async fn set_internal(&mut self, key: TKey, value: TValue, options: FusionCacheOptions) {
820 let cache_value = CacheValue::new(value, options.entry_ttl, options.entry_tti);
821 if let Some(distributed_cache) = &mut self.distributed_cache {
822 if !options.skip_distributed_cache {
823 if !options
824 .should_redis_writes_happen_in_background
825 .is_some_and(|bw| bw)
826 {
827 info!(target: LOG_TARGET, "Setting value in distributed cache for key: {:?}", key);
828 let _ = distributed_cache.set(&key, &cache_value).await;
829 } else {
830 let mut _distributed_cache = distributed_cache.clone();
831 let _cache_value = cache_value.clone();
832 let _key = key.clone();
833 info!(target: LOG_TARGET, "Spawning task to set value in distributed cache for key: {:?}", key);
834 task::spawn(async move {
835 let _ = _distributed_cache.set(&_key, &_cache_value).await;
836 });
837 }
838 } else {
839 info!(target: LOG_TARGET, "Skipping distributed cache write for key: {:?}", key);
840 }
841 } else {
842 info!(target: LOG_TARGET, "No distributed cache configured, skipping distributed cache write for key: {:?}", key);
843 }
844 if let Some(fail_safe_cache) = &mut self.fail_safe_cache {
845 info!(target: LOG_TARGET, "Setting value in fail-safe cache for key: {:?}", key);
846 fail_safe_cache
847 .insert(key.clone(), cache_value.clone())
848 .await;
849 fail_safe_cache.exit_failsafe_mode(key.clone()).await;
850 }
851 info!(target: LOG_TARGET, "Setting value in local cache for key: {:?}", key);
852 self.cache.insert(key, cache_value).await;
853 }
854
855 pub async fn evict(&mut self, key: TKey) {
860 self.cache.invalidate(&key).await;
861 if let Some(distributed_cache) = &mut self.distributed_cache {
862 distributed_cache.evict(&key).await;
863 }
864 }
865 #[tracing::instrument(name = "FusionCache::do_get_from_factory_or_failsafe", skip(self))]
867 async fn do_get_from_factory_or_failsafe<
868 TError: Clone + Send + Sync + Display + 'static,
869 F: Factory<TKey, TValue, TError>,
870 >(
871 &mut self,
872 key: TKey,
873 factory: F,
874 options: FusionCacheOptions,
875 ) -> Result<Source<TValue>, FusionCacheError> {
876 if let Some(_) = options
877 .fail_safe_configuration
878 .as_ref()
879 .and_then(|c| c.soft_timeout)
880 {
881 info!(target: LOG_TARGET, "Getting value from factory or failsafe cache for key: {:?}", key);
882 self.factory_result_with_soft_timeout(&key, factory, options)
883 .await
884 } else if let Some(hard_timeout) = self.hard_timeout {
885 let result = select! {
886 factory_result = factory.get(&key) => {
887 factory_result.map_err(|e| FusionCacheError::FactoryError(e.to_string()))
888 }
889 _ = tokio::time::sleep(hard_timeout) => {
890 Err(FusionCacheError::FactoryTimeout)
891 }
892 };
893
894 self.process_factory_result_or_timeout(&key, result).await
895 } else {
896 let result = factory
897 .get(&key)
898 .await
899 .map_err(|e| FusionCacheError::FactoryError(e.to_string()));
900
901 self.process_factory_result_or_timeout(&key, result).await
902 }
903 }
904
905 #[tracing::instrument(name = "FusionCache::factory_result_with_soft_timeout", skip(self))]
906 async fn factory_result_with_soft_timeout<
907 TError: Clone + Send + Sync + Display + 'static,
908 F: Factory<TKey, TValue, TError>,
909 >(
910 &mut self,
911 key: &TKey,
912 factory: F,
913 options: FusionCacheOptions,
914 ) -> Result<Source<TValue>, FusionCacheError> {
915 let soft_timeout = options
916 .fail_safe_configuration
917 .as_ref()
918 .unwrap()
919 .soft_timeout
920 .unwrap();
921 let local_factory_result_sender = self
922 .get_or_create_background_factory_task(key, factory, options)
923 .await;
924 let mut local_factory_result_receiver = local_factory_result_sender.subscribe();
925 select! {
926 factory_result = local_factory_result_receiver.recv() => {
927 if let Ok(r) = factory_result {
928 if let Ok(value) = r {
929 Ok(Source::Factory(value))
930 } else {
931 let fail_safe_cache = self.fail_safe_cache.as_mut().unwrap();
932 fail_safe_cache.start_failsafe_cycle(key.clone(), r.unwrap_err().to_string()).await;
933 let fail_safe_result = fail_safe_cache.get(&key).await;
934 match fail_safe_result {
935 FailSafeResult::Hit(value)=> {
936 Ok(Source::Failsafe(value.value))
937 }
938 FailSafeResult::Miss(error_message)=>{
939 fail_safe_cache.exit_failsafe_mode(key.clone()).await;
940 Err(FusionCacheError::FactoryError(error_message))
941 }
942 FailSafeResult::TooManyCycles(error_message)=>{
943 fail_safe_cache.exit_failsafe_mode(key.clone()).await;
944 fail_safe_cache.invalidate(&key).await;
945 Err(FusionCacheError::FactoryError(error_message))
946 }
947 FailSafeResult::CurrentCycleEnded=>{
948 panic!("Got FailSafeResult::CurrentCycleEnded, but we just started a cycle");
949 }
950 FailSafeResult::NotInFailSafeMode => {
951 panic!("Got FailSafeResult::NotInFailSafeMode, but we just entered fail safe mode");
952 }
953 }
954 }
955 } else {
956 Err(FusionCacheError::SystemCorruption)
957 }
958 }
959 _ = tokio::time::sleep(soft_timeout) => {
960 let fail_safe_cache = self.fail_safe_cache.as_mut().unwrap();
961 fail_safe_cache.start_failsafe_cycle(key.clone(), "timeout".to_string()).await;
962 let fail_safe_result = fail_safe_cache.get(&key).await;
963 match fail_safe_result {
964 FailSafeResult::Hit(value) => {
965 if !self.should_factory_execute_in_background {
966 let mut background_factory_tasks = self.background_factory_tasks.lock().await;
967 let task = background_factory_tasks.remove(&key);
968 drop(background_factory_tasks);
969 if let Some((task, _)) = task {
970 task.abort();
971 }
972 }
973 Ok(Source::Failsafe(value.value))
974 }
975 FailSafeResult::Miss(_) => {
976 fail_safe_cache.exit_failsafe_mode(key.clone()).await;
977 if let Some(hard_timeout) = self.hard_timeout {
978 tokio::time::sleep(hard_timeout - soft_timeout).await;
979 let mut background_factory_tasks = self.background_factory_tasks.lock().await;
980 let task = background_factory_tasks.remove(&key);
981 drop(background_factory_tasks);
982 if let Some((task, _)) = task {
983 task.abort();
984 }
985 Err(FusionCacheError::FactoryTimeout)
986 } else {
987 Err(FusionCacheError::FactoryTimeout)
988 }
989 }
990 FailSafeResult::TooManyCycles(error_message) => {
991 fail_safe_cache.exit_failsafe_mode(key.clone()).await;
992 if let Some(hard_timeout) = self.hard_timeout {
993 tokio::time::sleep(hard_timeout - soft_timeout).await;
994 let mut background_factory_tasks = self.background_factory_tasks.lock().await;
995 let task = background_factory_tasks.remove(&key);
996 drop(background_factory_tasks);
997 if let Some((task, _)) = task {
998 task.abort();
999 }
1000 Err(FusionCacheError::FactoryTimeout)
1001 } else {
1002 Err(FusionCacheError::FactoryTimeout)
1003 }
1004 }
1005 FailSafeResult::CurrentCycleEnded => {
1006 panic!(
1007 "Got FailSafeResult::CurrentCycleEnded, but we just started a cycle"
1008 );
1009 }
1010 FailSafeResult::NotInFailSafeMode => {
1011 panic!(
1012 "Got FailSafeResult::NotInFailSafeMode, but we just entered fail safe mode"
1013 );
1014 }
1015 }
1016 }
1017 }
1018 }
1019
1020 async fn get_or_create_background_factory_task<
1024 TError: Clone + Send + Sync + Display + 'static,
1025 F: Factory<TKey, TValue, TError>,
1026 >(
1027 &mut self,
1028 key: &TKey,
1029 factory: F,
1030 options: FusionCacheOptions,
1031 ) -> broadcast::Sender<Result<TValue, FusionCacheError>> {
1032 let mut background_factory_tasks = self.background_factory_tasks.lock().await;
1033 let background_factory_in_execution = background_factory_tasks.remove(&key);
1034
1035 let (factory_task, local_factory_result_sender) =
1036 if let Some((f, local_factory_result_sender)) = background_factory_in_execution {
1037 (f, local_factory_result_sender.clone())
1038 } else {
1039 let (local_factory_result_sender, _) =
1040 tokio::sync::broadcast::channel::<Result<TValue, FusionCacheError>>(1);
1041 (
1042 self.start_factory_in_background(
1043 &key,
1044 &factory,
1045 local_factory_result_sender.clone(),
1046 options,
1047 ),
1048 local_factory_result_sender.clone(),
1049 )
1050 };
1051 background_factory_tasks.insert(
1052 key.clone(),
1053 (factory_task, local_factory_result_sender.clone()),
1054 );
1055 drop(background_factory_tasks);
1056 local_factory_result_sender
1057 }
1058
1059 async fn process_factory_result_or_timeout(
1060 &mut self,
1061 key: &TKey,
1062 result: Result<TValue, FusionCacheError>,
1063 ) -> Result<Source<TValue>, FusionCacheError> {
1064 match result {
1065 Ok(factory_value) => Ok(Source::Factory(factory_value)),
1066 Err(e) => {
1067 let fail_safe_cache = self.fail_safe_cache.as_mut().unwrap();
1068 fail_safe_cache
1069 .start_failsafe_cycle(key.clone(), e.to_string())
1070 .await;
1071 let fail_safe_result = fail_safe_cache.get(key).await;
1072 match fail_safe_result {
1073 FailSafeResult::Hit(value) => Ok(Source::Failsafe(value.value)),
1074 FailSafeResult::Miss(error_message) => {
1075 fail_safe_cache.exit_failsafe_mode(key.clone()).await;
1076 Err(FusionCacheError::FactoryError(error_message))
1077 }
1078 FailSafeResult::TooManyCycles(error_message) => {
1079 fail_safe_cache.exit_failsafe_mode(key.clone()).await;
1080 Err(FusionCacheError::FactoryError(error_message))
1081 }
1082 FailSafeResult::CurrentCycleEnded => {
1083 panic!(
1084 "Got FailSafeResult::CurrentCycleEnded, but we just started a cycle"
1085 );
1086 }
1087 FailSafeResult::NotInFailSafeMode => {
1088 panic!(
1089 "Got FailSafeResult::NotInFailSafeMode, but we just entered fail safe mode"
1090 );
1091 }
1092 }
1093 }
1094 }
1095 }
1096
1097 fn start_factory_in_background<
1098 TError: Clone + Send + Sync + Display + 'static,
1099 F: Factory<TKey, TValue, TError>,
1100 >(
1101 &self,
1102 key: &TKey,
1103 factory: &F,
1104 tx: tokio::sync::broadcast::Sender<Result<TValue, FusionCacheError>>,
1105 options: FusionCacheOptions,
1106 ) -> task::JoinHandle<()> {
1107 let _key = key.clone();
1108 let _factory = factory.clone();
1109 let mut _self = self.clone();
1110 task::spawn(async move {
1111 let hard_timeout = _self.hard_timeout.unwrap_or(Duration::MAX);
1112 let factory_result = select! {
1113 result = _factory.get(&_key) => {
1114 result.map_err(|e| FusionCacheError::FactoryError(e.to_string()))
1115 },
1116 _ = tokio::time::sleep(hard_timeout) => {
1117 Err(FusionCacheError::FactoryTimeout)
1118 }
1119 };
1120
1121 match factory_result {
1122 Ok(factory_value) => {
1123 let _ = tx.send(Ok(factory_value.clone()));
1124 _self
1125 .set_internal(_key.clone(), factory_value, options)
1126 .await;
1127 }
1128 Err(e) => {
1129 _self.remove_from_in_flight_factory_requests(&_key).await;
1130 let _ = tx.send(Err(e.into()));
1131 }
1132 };
1133 let mut background_factory_tasks = _self.background_factory_tasks.lock().await;
1134 background_factory_tasks.remove(&_key);
1135 drop(background_factory_tasks);
1136 })
1137 }
1138
1139 fn get_options(&self) -> FusionCacheOptions {
1140 FusionCacheOptions {
1141 entry_ttl: self.time_to_live,
1142 entry_tti: self.time_to_idle,
1143 fail_safe_configuration: self
1144 .fail_safe_cache
1145 .as_ref()
1146 .map(|c| c.configuration.clone()),
1147 hard_timeout: self.hard_timeout.clone(),
1148 skip_distributed_cache: self.distributed_cache.is_none(),
1149 ignore_ttl: self.time_to_live.is_none(),
1150 ignore_tti: self.time_to_idle.is_none(),
1151 skip_fail_safe_cache: self.fail_safe_cache.is_none(),
1152 skip_hard_timeout: self.hard_timeout.is_none(),
1153 should_redis_writes_happen_in_background: Some(
1154 self.should_redis_writes_happen_in_background,
1155 ),
1156 }
1157 }
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162 use core::fmt;
1163 use std::time::Duration;
1164
1165 use super::*;
1166
1167 use futures::future::join_all;
1168 use tracing::{Event, Subscriber};
1169 use tracing_subscriber::{
1170 fmt::{FmtContext, FormatEvent, format},
1171 registry::LookupSpan,
1172 };
1173
1174 #[tokio::test]
1175 async fn test_factory_only_gets_called_once_if_multiple_threads_request_the_same_key() {
1176 let factory = TestFactory::new();
1177 let cache = FusionCacheBuilder::new().build().await.unwrap();
1178 let key = 1;
1179 let mut handles = vec![];
1180 for _ in 0..100000 {
1181 let factory = factory.clone();
1182 let mut cache = cache.clone();
1183 handles.push(tokio::spawn(async move {
1184 let value = cache.get_or_set(key, factory.clone(), None).await.unwrap();
1185 assert_eq!(value, 1);
1186 }));
1187 }
1188 join_all(handles).await;
1189 }
1190
1191 #[tokio::test]
1192 async fn test_failsafe_hits_when_there_is_an_entry_in_failsafe_cache() {
1193 let factory = FallibleTestFactory::new();
1194 let mut cache = FusionCacheBuilder::new()
1195 .with_fail_safe(
1196 Duration::from_secs(10),
1197 Duration::from_secs(5),
1198 Some(3),
1199 None,
1200 )
1201 .build()
1202 .await
1203 .unwrap();
1204
1205 let value = cache.get_or_set(1, factory.clone(), None).await;
1206 assert_eq!(1, value.unwrap());
1207
1208 cache.evict(1).await;
1209
1210 let failsafe_value = cache.get_or_set(1, factory.clone(), None).await;
1211 assert_eq!(1, failsafe_value.unwrap())
1212 }
1213
1214 #[tokio::test]
1215 async fn test_failsafe_hits_when_there_is_an_entry_in_failsafe_cache_multithreaded() {
1216 let factory = FallibleTestFactory::new();
1217 let mut cache = FusionCacheBuilder::new()
1218 .with_fail_safe(
1219 Duration::from_secs(10),
1220 Duration::from_secs(5),
1221 Some(3),
1222 None,
1223 )
1224 .build()
1225 .await
1226 .unwrap();
1227
1228 let value = cache.get_or_set(1, factory.clone(), None).await;
1229 assert_eq!(1, value.unwrap());
1230
1231 cache.evict(1).await;
1232
1233 let mut handles = vec![];
1234 for _ in 0..100000 {
1235 let factory = factory.clone();
1236 let mut cache = cache.clone();
1237 handles.push(tokio::spawn(async move {
1238 let value = cache.get_or_set(1, factory.clone(), None).await.unwrap();
1239 assert_eq!(value, 1);
1240 }));
1241 }
1242 join_all(handles).await;
1243 }
1244
1245 #[tokio::test]
1246 async fn test_failsafe_misses_when_there_is_no_entry_in_failsafe_cache() {
1247 let factory = FallibleTestFactory::new();
1248 let mut cache = FusionCacheBuilder::new()
1249 .with_fail_safe(
1250 Duration::from_secs(2),
1251 Duration::from_secs(1),
1252 Some(3),
1253 None,
1254 )
1255 .build()
1256 .await
1257 .unwrap();
1258
1259 let value = cache.get_or_set(1, factory.clone(), None).await;
1260 assert_eq!(1, value.unwrap());
1261
1262 cache.evict(1).await;
1263 tokio::time::sleep(Duration::from_secs(3)).await;
1264
1265 let failsafe_value = cache.get_or_set(1, factory.clone(), None).await;
1266 assert!(failsafe_value.is_err());
1267 }
1268
1269 #[tokio::test]
1270 async fn test_failsafe_misses_when_there_is_no_entry_in_failsafe_cache_multithreaded() {
1271 let factory = FallibleTestFactory::new();
1272 let mut cache = FusionCacheBuilder::new()
1273 .with_fail_safe(
1274 Duration::from_secs(2),
1275 Duration::from_secs(1),
1276 Some(3),
1277 None,
1278 )
1279 .build()
1280 .await
1281 .unwrap();
1282
1283 let value = cache.get_or_set(1, factory.clone(), None).await;
1284 assert_eq!(1, value.unwrap());
1285
1286 cache.evict(1).await;
1287 tokio::time::sleep(Duration::from_secs(3)).await;
1288
1289 let mut handles = vec![];
1290 for _ in 0..100000 {
1291 let factory = factory.clone();
1292 let mut cache = cache.clone();
1293 handles.push(tokio::spawn(async move {
1294 let value = cache.get_or_set(1, factory.clone(), None).await;
1295 assert!(value.is_err());
1296 }));
1297 }
1298 join_all(handles).await;
1299 }
1300
1301 #[tokio::test]
1302 async fn test_failsafe_misses_when_the_maximum_number_of_cycles_is_reached() {
1303 let factory = FallibleTestFactory::new();
1304 let mut cache = FusionCacheBuilder::new()
1305 .with_fail_safe(
1306 Duration::from_secs(15),
1307 Duration::from_secs(1),
1308 Some(2),
1309 None,
1310 )
1311 .build()
1312 .await
1313 .unwrap();
1314
1315 let value = cache.get_or_set(1, factory.clone(), None).await;
1316 assert_eq!(1, value.unwrap());
1317
1318 cache.evict(1).await;
1319
1320 let failsafe_value = cache.get_or_set(1, factory.clone(), None).await;
1321 assert_eq!(1, failsafe_value.unwrap());
1322
1323 tokio::time::sleep(Duration::from_secs(2)).await;
1324
1325 let failsafe_value = cache.get_or_set(1, factory.clone(), None).await;
1326 assert_eq!(1, failsafe_value.unwrap());
1327
1328 tokio::time::sleep(Duration::from_secs(2)).await;
1329
1330 let failsafe_value = cache.get_or_set(1, factory.clone(), None).await;
1331 assert!(failsafe_value.is_err());
1332 }
1333
1334 #[tokio::test]
1335 async fn test_failsafe_misses_when_the_maximum_number_of_cycles_is_reached_multithreaded() {
1336 let factory = FallibleTestFactory::new();
1337 let mut cache = FusionCacheBuilder::new()
1338 .with_fail_safe(
1339 Duration::from_secs(15),
1340 Duration::from_secs(1),
1341 Some(2),
1342 None,
1343 )
1344 .build()
1345 .await
1346 .unwrap();
1347
1348 let value = cache.get_or_set(1, factory.clone(), None).await;
1349 assert_eq!(1, value.unwrap());
1350
1351 cache.evict(1).await;
1352
1353 let mut handles = vec![];
1354 for _ in 0..100000 {
1355 let factory = factory.clone();
1356 let mut cache = cache.clone();
1357 handles.push(tokio::spawn(async move {
1358 let value = cache.get_or_set(1, factory.clone(), None).await;
1359 assert_eq!(1, value.unwrap());
1360 }));
1361 }
1362 join_all(handles).await;
1363
1364 tokio::time::sleep(Duration::from_secs(2)).await;
1365
1366 let mut handles = vec![];
1367 for _ in 0..100000 {
1368 let factory = factory.clone();
1369 let mut cache = cache.clone();
1370 handles.push(tokio::spawn(async move {
1371 let value = cache.get_or_set(1, factory.clone(), None).await;
1372 assert_eq!(1, value.unwrap());
1373 }));
1374 }
1375 join_all(handles).await;
1376
1377 let mut handles = vec![];
1378 for _ in 0..100000 {
1379 let factory = factory.clone();
1380 let mut cache = cache.clone();
1381 handles.push(tokio::spawn(async move {
1382 let value = cache.get_or_set(1, factory.clone(), None).await;
1383 assert!(value.is_err());
1384 }));
1385 }
1386 join_all(handles).await;
1387 }
1388
1389 #[tokio::test]
1390 async fn test_soft_timeout_with_failsafe_hit() {
1391 let factory = SlowTestFactory::new(Duration::from_secs(5), 1);
1392 let mut cache = FusionCacheBuilder::new()
1393 .with_fail_safe(
1394 Duration::from_secs(10), Duration::from_secs(5), Some(3), Some(Duration::from_secs(1)), )
1399 .build()
1400 .await
1401 .unwrap();
1402
1403 let value = cache.get_or_set(1, factory.clone(), None).await;
1405 assert_eq!(1, value.unwrap());
1406
1407 let start = std::time::Instant::now();
1409 let value = cache.get_or_set(1, factory.clone(), None).await;
1410 let elapsed = start.elapsed();
1411
1412 assert_eq!(1, value.unwrap());
1413 assert!(
1414 elapsed < Duration::from_secs(5),
1415 "Should return before factory completes"
1416 );
1417 }
1418
1419 #[tokio::test]
1420 async fn test_hard_timeout_triggers() {
1421 let factory = SlowTestFactory::new(Duration::from_secs(5), 0);
1422 let mut cache = FusionCacheBuilder::new()
1423 .with_hard_timeout(Duration::from_secs(2))
1424 .build()
1425 .await
1426 .unwrap();
1427
1428 let result = cache.get_or_set(1, factory, None).await;
1429 assert!(matches!(result, Err(FusionCacheError::FactoryTimeout)));
1430 }
1431
1432 #[tokio::test]
1433 async fn test_background_execution_continues_after_failsafe() {
1434 let factory = SlowTestFactory::new(Duration::from_secs(3), 1);
1435 let mut cache = FusionCacheBuilder::new()
1436 .with_fail_safe(
1437 Duration::from_secs(10),
1438 Duration::from_secs(5),
1439 Some(3),
1440 Some(Duration::from_secs(1)),
1441 )
1442 .set_factory_background_execution(true)
1443 .build()
1444 .await
1445 .unwrap();
1446
1447 let value = cache.get_or_set(1, factory.clone(), None).await;
1449 assert_eq!(1, value.unwrap());
1450
1451 cache.evict(1).await;
1453 let value = cache.get_or_set(1, factory.clone(), None).await;
1454 assert_eq!(1, value.unwrap());
1455
1456 tokio::time::sleep(Duration::from_secs(4)).await;
1458
1459 let cached_value = cache.get(1).await;
1461 assert_eq!(
1462 Some(2),
1463 cached_value,
1464 "Cache should be updated by background task"
1465 );
1466 }
1467
1468 #[tokio::test]
1469 async fn test_background_execution_does_not_continue_after_failsafe() {
1470 let factory = SlowTestFactory::new(Duration::from_secs(3), 1);
1471 let mut cache = FusionCacheBuilder::new()
1472 .with_fail_safe(
1473 Duration::from_secs(10),
1474 Duration::from_secs(5),
1475 Some(3),
1476 Some(Duration::from_secs(1)),
1477 )
1478 .set_factory_background_execution(false)
1479 .build()
1480 .await
1481 .unwrap();
1482
1483 let value = cache.get_or_set(1, factory.clone(), None).await;
1485 assert_eq!(1, value.unwrap());
1486
1487 cache.evict(1).await;
1489 let value = cache.get_or_set(1, factory.clone(), None).await;
1490 assert_eq!(1, value.unwrap());
1491
1492 tokio::time::sleep(Duration::from_secs(4)).await;
1494
1495 let cached_value = cache.get(1).await;
1497 assert_eq!(
1498 None, cached_value,
1499 "Cache should not be updated by background task"
1500 );
1501 }
1502
1503 #[tokio::test]
1504 async fn test_entry_is_evicted_after_cache_ttl_expires() {
1505 let factory = TestFactory::new();
1506 let mut cache: FusionCache<u32, u32> = FusionCacheBuilder::new()
1507 .with_time_to_live(Duration::from_secs(2))
1508 .build()
1509 .await
1510 .unwrap();
1511
1512 let value = cache.get_or_set(1, factory.clone(), None).await;
1513 assert_eq!(1, value.unwrap());
1514
1515 tokio::time::sleep(Duration::from_secs(3)).await;
1516
1517 let cached_value = cache.get(1).await;
1518 assert_eq!(None, cached_value);
1519 }
1520
1521 #[tokio::test]
1522 async fn test_entry_is_evicted_after_entry_ttl_expires() {
1523 let factory = TestFactory::new();
1524 let mut cache: FusionCache<u32, u32> = FusionCacheBuilder::new()
1525 .with_time_to_live(Duration::from_secs(10))
1526 .build()
1527 .await
1528 .unwrap();
1529
1530 let options = FusionCacheOptionsBuilder::new()
1531 .with_time_to_live(Duration::from_secs(2))
1532 .build();
1533
1534 let value = cache.get_or_set(1, factory.clone(), Some(options)).await;
1535 assert_eq!(1, value.unwrap());
1536
1537 tokio::time::sleep(Duration::from_secs(3)).await;
1538
1539 let cached_value = cache.get(1).await;
1540 assert_eq!(None, cached_value);
1541 }
1542
1543 #[derive(Clone, Debug)]
1544 struct SlowTestFactory {
1545 counter: Arc<Mutex<u32>>,
1546 delay: Duration,
1547 delay_after_n_invocations: u32,
1548 }
1549
1550 impl SlowTestFactory {
1551 fn new(delay: Duration, delay_after_n_invocations: u32) -> Self {
1552 Self {
1553 counter: Arc::new(Mutex::new(0)),
1554 delay_after_n_invocations,
1555 delay,
1556 }
1557 }
1558 }
1559
1560 #[async_trait]
1561 impl Factory<u32, u32, TestFactoryError> for SlowTestFactory {
1562 async fn get(&self, _: &u32) -> Result<u32, TestFactoryError> {
1563 let mut counter = self.counter.lock().await;
1564 if *counter >= self.delay_after_n_invocations {
1565 tokio::time::sleep(self.delay).await;
1566 }
1567 *counter += 1;
1568 Ok(*counter)
1569 }
1570 }
1571
1572 #[derive(Clone, Debug)]
1573 struct TestFactory {
1574 counter: Arc<Mutex<u32>>,
1575 }
1576
1577 impl TestFactory {
1578 fn new() -> Self {
1579 Self {
1580 counter: Arc::new(Mutex::new(0)),
1581 }
1582 }
1583 }
1584
1585 #[async_trait]
1586 impl Factory<u32, u32, TestFactoryError> for TestFactory {
1587 async fn get(&self, _: &u32) -> Result<u32, TestFactoryError> {
1588 let mut counter = self.counter.lock().await;
1589 tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
1590 *counter += 1;
1591 Ok(*counter)
1592 }
1593 }
1594
1595 #[derive(Clone, Debug)]
1596 struct TestFactoryError;
1597
1598 impl Display for TestFactoryError {
1599 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1600 write!(f, "TestFactoryError")
1601 }
1602 }
1603
1604 impl Into<FusionCacheError> for TestFactoryError {
1605 fn into(self) -> FusionCacheError {
1606 FusionCacheError::Other
1607 }
1608 }
1609
1610 #[derive(Clone, Debug)]
1611 struct FallibleTestFactory {
1612 counter: Arc<Mutex<u32>>,
1613 }
1614
1615 impl FallibleTestFactory {
1616 pub fn new() -> Self {
1617 Self {
1618 counter: Arc::new(Mutex::new(0)),
1619 }
1620 }
1621 }
1622
1623 #[async_trait]
1624 impl Factory<u32, u32, TestFactoryError> for FallibleTestFactory {
1625 async fn get(&self, _: &u32) -> Result<u32, TestFactoryError> {
1626 let mut counter = self.counter.lock().await;
1627 *counter += 1;
1628 if *counter == 1 {
1629 Ok(1)
1630 } else {
1631 Err(TestFactoryError)
1632 }
1633 }
1634 }
1635
1636 #[tokio::test]
1637 async fn test_distributed_cache_with_redis() {
1638 let factory = TestFactory::new();
1639 let mut cache: FusionCache<u32, u32> = FusionCacheBuilder::new()
1640 .with_redis(
1641 "redis://127.0.0.1/".to_string(),
1642 "test_distributed_cache_with_redis".to_string(),
1643 false,
1644 None,
1645 )
1646 .build()
1647 .await
1648 .unwrap();
1649
1650 let key = 1;
1652 let value = cache.get_or_set(key, factory.clone(), None).await.unwrap();
1653 assert_eq!(value, 1);
1654
1655 let value = cache.get_or_set(key, factory.clone(), None).await.unwrap();
1657 assert_eq!(value, 1);
1658 }
1659
1660 #[tokio::test]
1661 async fn test_distributed_cache_synchronization() {
1662 let factory = TestFactory::new();
1663 let mut cache1: FusionCache<u32, u32> = FusionCacheBuilder::new()
1664 .with_redis(
1665 "redis://127.0.0.1/".to_string(),
1666 "test_distributed_cache_synchronization".to_string(),
1667 false,
1668 None,
1669 )
1670 .build()
1671 .await
1672 .unwrap();
1673
1674 let mut cache2: FusionCache<u32, u32> = FusionCacheBuilder::new()
1675 .with_redis(
1676 "redis://127.0.0.1/".to_string(),
1677 "test_distributed_cache_synchronization".to_string(),
1678 false,
1679 None,
1680 )
1681 .build()
1682 .await
1683 .unwrap();
1684
1685 let key = 1;
1687 let value = cache1.get_or_set(key, factory.clone(), None).await.unwrap();
1688 assert_eq!(value, 1);
1689
1690 tokio::time::sleep(Duration::from_millis(100)).await;
1692
1693 let value = cache2.get(key).await.unwrap();
1695 assert_eq!(value, 1);
1696 }
1697
1698 #[tokio::test]
1699 async fn test_distributed_cache_with_redis_connection_failure() {
1700 let factory = TestFactory::new();
1701 let mut cache: FusionCache<u32, u32> = FusionCacheBuilder::new()
1702 .with_redis(
1703 "redis://127.0.0.1/".to_string(),
1704 "test_distributed_cache_with_redis_connection_failure".to_string(),
1705 false,
1706 None,
1707 )
1708 .build()
1709 .await
1710 .unwrap();
1711
1712 let key = 1;
1714 let value = cache.get_or_set(key, factory.clone(), None).await.unwrap();
1715 assert_eq!(value, 1);
1716
1717 if let Some(distributed_cache) = &mut cache.distributed_cache {
1719 distributed_cache.break_connection();
1720 }
1721
1722 cache.set(key, 2, None).await;
1723
1724 if let Some(distributed_cache) = &mut cache.distributed_cache {
1726 distributed_cache.restore_connection();
1727 }
1728
1729 tokio::time::sleep(Duration::from_secs(2)).await;
1731
1732 let value = cache.get(key).await;
1734 assert_eq!(value, None);
1735 }
1736
1737 #[tokio::test]
1738 async fn test_distributed_cache_with_redis_and_failsafe() {
1739 let factory = FallibleTestFactory::new();
1740 let mut cache: FusionCache<u32, u32> = FusionCacheBuilder::new()
1741 .with_redis(
1742 "redis://127.0.0.1/".to_string(),
1743 "test_distributed_cache_with_redis_and_failsafe".to_string(),
1744 false,
1745 None,
1746 )
1747 .with_fail_safe(
1748 Duration::from_secs(10),
1749 Duration::from_secs(5),
1750 Some(3),
1751 None,
1752 )
1753 .build()
1754 .await
1755 .unwrap();
1756
1757 let key = 1;
1759 let value = cache.get_or_set(key, factory.clone(), None).await.unwrap();
1760 assert_eq!(value, 1);
1761
1762 if let Some(distributed_cache) = &mut cache.distributed_cache {
1764 distributed_cache.break_connection();
1765 }
1766
1767 cache.set(key, 2, None).await;
1769
1770 if let Some(distributed_cache) = &mut cache.distributed_cache {
1772 distributed_cache.restore_connection();
1773 }
1774
1775 tokio::time::sleep(Duration::from_secs(2)).await;
1777
1778 let value = cache.get(key).await;
1780 assert_eq!(value, None);
1781 }
1782 struct TraceIdWrapper<F> {
1783 inner: F,
1784 }
1785
1786 impl<S, N, F> FormatEvent<S, N> for TraceIdWrapper<F>
1787 where
1788 S: Subscriber + for<'a> LookupSpan<'a>,
1789 N: for<'a> format::FormatFields<'a> + 'static,
1790 F: FormatEvent<S, N>,
1791 {
1792 fn format_event(
1793 &self,
1794 ctx: &FmtContext<'_, S, N>,
1795 mut writer: format::Writer<'_>,
1796 event: &Event<'_>,
1797 ) -> std::fmt::Result {
1798 let trace_id = ctx.lookup_current().and_then(|span| {
1800 span.scope()
1801 .from_root()
1802 .next() .map(|s| s.id())
1804 });
1805
1806 if let Some(id) = trace_id {
1807 write!(writer, "trace_id={} ", id.into_u64())?;
1809 }
1810
1811 self.inner.format_event(ctx, writer, event)
1813 }
1814 }
1815}