fusioncache_rs/
lib.rs

use async_trait::async_trait;
use distributed_cache::{DistributedCache, RedisConnection};
use failsafe::{FailSafeCache, FailSafeConfiguration, FailSafeResult};
use moka::{Expiry, future::Cache};
use serde::{Serialize, de::DeserializeOwned};
use std::{
    collections::HashMap,
    fmt::{Debug, Display},
    hash::Hash,
    marker::PhantomData,
    sync::Arc,
    time::Duration,
};
use tokio::{
    select,
    sync::{Mutex, broadcast},
    task::{self, JoinHandle},
};
use tracing::{debug, error, info, warn};

mod distributed_cache;
mod failsafe;

const LOG_TARGET: &str = "fusioncache";

pub(crate) enum Source<TValue: Clone + Send + Sync + 'static> {
    Factory(TValue),
    Failsafe(TValue),
}

impl<TValue: Clone + Send + Sync + 'static> Source<TValue> {
    pub(crate) fn value(&self) -> TValue {
        match self {
            Self::Factory(value) => value.clone(),
            Self::Failsafe(value) => value.clone(),
        }
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct CacheValue<TValue: Clone + Send + Sync + 'static> {
    value: TValue,
    time_to_live: Option<std::time::Duration>,
    time_to_idle: Option<std::time::Duration>,
}

impl<TValue: Clone + Send + Sync + 'static> CacheValue<TValue> {
    pub fn new(
        value: TValue,
        time_to_live: Option<std::time::Duration>,
        time_to_idle: Option<std::time::Duration>,
    ) -> Self {
        Self {
            value,
            time_to_live,
            time_to_idle,
        }
    }
}

pub(crate) struct CacheValueExpiry;

impl<TKey: Clone + Send + Sync + 'static, TValue: Clone + Send + Sync + 'static>
    Expiry<TKey, CacheValue<TValue>> for CacheValueExpiry
{
    fn expire_after_create(
        &self,
        _key: &TKey,
        value: &CacheValue<TValue>,
        _created_at: std::time::Instant,
    ) -> Option<Duration> {
        value.time_to_live
    }

    fn expire_after_read(
        &self,
        _key: &TKey,
        value: &CacheValue<TValue>,
        read_at: std::time::Instant,
        _duration_until_expiry: Option<Duration>,
        last_modified_at: std::time::Instant,
    ) -> Option<Duration> {
        value.time_to_idle.map(|time_to_idle| {
            let time_since_last_modified = read_at.duration_since(last_modified_at);
            // Saturate instead of subtracting directly: if the idle period has already
            // elapsed, return a zero duration so the entry expires immediately instead
            // of panicking on a negative `Duration`.
            time_to_idle.saturating_sub(time_since_last_modified)
        })
    }

    fn expire_after_update(
        &self,
        _key: &TKey,
        value: &CacheValue<TValue>,
        _updated_at: std::time::Instant,
        _duration_until_expiry: Option<Duration>,
    ) -> Option<Duration> {
        value.time_to_live
    }
}

#[derive(Clone, Debug)]
pub struct FusionCacheOptions {
    entry_ttl: Option<std::time::Duration>,
    ignore_ttl: bool,
    entry_tti: Option<std::time::Duration>,
    ignore_tti: bool,
    fail_safe_configuration: Option<FailSafeConfiguration>,
    skip_fail_safe_cache: bool,
    hard_timeout: Option<std::time::Duration>,
    skip_hard_timeout: bool,
    skip_distributed_cache: bool,
    should_redis_writes_happen_in_background: Option<bool>,
}

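/// Builder for [`FusionCacheOptions`], which can be passed to `get_or_set`/`set` to
/// override the cache-wide defaults for a single entry.
///
/// A minimal sketch of how these options might be built and used; the key, factory,
/// and durations here are illustrative, not something the library prescribes:
///
/// ```ignore
/// let options = FusionCacheOptionsBuilder::new()
///     .with_time_to_live(Duration::from_secs(30))
///     .skip_distributed_cache()
///     .build();
/// let value = cache.get_or_set(key, factory, Some(options)).await?;
/// ```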
pub struct FusionCacheOptionsBuilder {
    entry_ttl: Option<std::time::Duration>,
    ignore_ttl: bool,
    entry_tti: Option<std::time::Duration>,
    ignore_tti: bool,
    fail_safe_configuration: Option<FailSafeConfiguration>,
    skip_fail_safe_cache: bool,
    hard_timeout: Option<std::time::Duration>,
    skip_hard_timeout: bool,
    skip_distributed_cache: bool,
    should_redis_writes_happen_in_background: Option<bool>,
}

impl FusionCacheOptionsBuilder {
    pub fn new() -> Self {
        Self {
            entry_ttl: None,
            ignore_ttl: false,
            entry_tti: None,
            ignore_tti: false,
            fail_safe_configuration: None,
            skip_fail_safe_cache: false,
            hard_timeout: None,
            skip_hard_timeout: false,
            skip_distributed_cache: false,
            should_redis_writes_happen_in_background: None,
        }
    }

    /// Sets the time-to-live for entries created with these options.
    ///
    /// # Arguments
    ///
    /// * `time_to_live` - The time-to-live for the entry.
    pub fn with_time_to_live(mut self, time_to_live: std::time::Duration) -> Self {
        self.entry_ttl = Some(time_to_live);
        self
    }

    /// Ignores the cache-wide time-to-live for entries created with these options.
    pub fn ignore_time_to_live(mut self) -> Self {
        self.ignore_ttl = true;
        self
    }

    /// Sets the time-to-idle for entries created with these options.
    ///
    /// # Arguments
    ///
    /// * `time_to_idle` - The time-to-idle for the entry.
    pub fn with_time_to_idle(mut self, time_to_idle: std::time::Duration) -> Self {
        self.entry_tti = Some(time_to_idle);
        self
    }

    /// Ignores the cache-wide time-to-idle for entries created with these options.
    pub fn ignore_time_to_idle(mut self) -> Self {
        self.ignore_tti = true;
        self
    }

    /// Sets the fail-safe configuration for entries created with these options.
    ///
    /// # Arguments
    ///
    /// * `entry_ttl` - The time-to-live for entries in the fail-safe cache.
    /// * `failsafe_cycle_ttl` - The time-to-live of a single fail-safe cycle.
    /// * `max_cycles` - The maximum number of fail-safe cycles before giving up.
    /// * `soft_timeout` - The soft timeout for factory operations, after which the fail-safe cache is consulted.
    pub fn with_fail_safe(
        mut self,
        entry_ttl: std::time::Duration,
        failsafe_cycle_ttl: std::time::Duration,
        max_cycles: Option<u64>,
        soft_timeout: Option<std::time::Duration>,
    ) -> Self {
        self.fail_safe_configuration = Some(FailSafeConfiguration::new(
            entry_ttl,
            failsafe_cycle_ttl,
            max_cycles,
            soft_timeout,
        ));
        self
    }

    /// Skips the fail-safe cache for entries created with these options.
    pub fn skip_fail_safe_cache(mut self) -> Self {
        self.skip_fail_safe_cache = true;
        self
    }

    /// Sets the hard timeout for factory operations for entries created with these options.
    ///
    /// # Arguments
    ///
    /// * `hard_timeout` - The hard timeout for the factory operation.
    pub fn with_hard_timeout(mut self, hard_timeout: std::time::Duration) -> Self {
        self.hard_timeout = Some(hard_timeout);
        self
    }

    /// Skips the hard timeout for entries created with these options.
    pub fn skip_hard_timeout(mut self) -> Self {
        self.skip_hard_timeout = true;
        self
    }

    /// Skips the distributed cache for entries created with these options.
    pub fn skip_distributed_cache(mut self) -> Self {
        self.skip_distributed_cache = true;
        self
    }

    pub fn build(self) -> FusionCacheOptions {
        FusionCacheOptions {
            entry_ttl: self.entry_ttl,
            ignore_ttl: self.ignore_ttl,
            entry_tti: self.entry_tti,
            ignore_tti: self.ignore_tti,
            fail_safe_configuration: self.fail_safe_configuration,
            skip_fail_safe_cache: self.skip_fail_safe_cache,
            hard_timeout: self.hard_timeout,
            skip_hard_timeout: self.skip_hard_timeout,
            skip_distributed_cache: self.skip_distributed_cache,
            should_redis_writes_happen_in_background: self.should_redis_writes_happen_in_background,
        }
    }
}

impl FusionCacheOptions {
    fn with_overrides(self, overrides: Option<FusionCacheOptions>) -> FusionCacheOptions {
        if let Some(overrides) = overrides {
            FusionCacheOptions {
                fail_safe_configuration: if let Some(fsc_override) =
                    overrides.fail_safe_configuration
                {
                    Some(fsc_override)
                } else if !overrides.skip_fail_safe_cache {
                    self.fail_safe_configuration
                } else {
                    None
                },
                hard_timeout: if let Some(ht_override) = overrides.hard_timeout {
                    Some(ht_override)
                } else if !overrides.skip_hard_timeout {
                    self.hard_timeout
                } else {
                    None
                },
                entry_ttl: if let Some(ttl_override) = overrides.entry_ttl {
                    Some(ttl_override)
                } else if !overrides.ignore_ttl {
                    self.entry_ttl
                } else {
                    None
                },
                entry_tti: if let Some(tti_override) = overrides.entry_tti {
                    Some(tti_override)
                } else if !overrides.ignore_tti {
                    self.entry_tti
                } else {
                    None
                },
                skip_distributed_cache: overrides.skip_distributed_cache,
                ignore_ttl: overrides.ignore_ttl,
                ignore_tti: overrides.ignore_tti,
                should_redis_writes_happen_in_background: if let Some(
                    should_redis_writes_happen_in_background,
                ) =
                    overrides.should_redis_writes_happen_in_background
                {
                    Some(should_redis_writes_happen_in_background)
                } else {
                    self.should_redis_writes_happen_in_background
                },
                skip_fail_safe_cache: overrides.skip_fail_safe_cache,
                skip_hard_timeout: overrides.skip_hard_timeout,
            }
        } else {
            FusionCacheOptions { ..self }
        }
    }
}

#[async_trait]
/// A trait for implementing value factories that can be used with `FusionCache`.
///
/// The factory is responsible for retrieving or generating values when they're not in the cache.
/// It must be cloneable and thread-safe since it might be called from multiple tasks.
///
/// # Type Parameters
/// * `TKey` - The type of keys used in the cache
/// * `TValue` - The type of values stored in the cache
/// * `TError` - The type of errors that can occur during value generation
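///
/// # Example
///
/// A minimal sketch of a factory that loads values from an in-memory map; the
/// `MapFactory` type, its `HashMap`-backed store, and the `String` error type are
/// illustrative choices, not part of the library:
///
/// ```ignore
/// #[derive(Clone, Debug)]
/// struct MapFactory {
///     store: std::collections::HashMap<u64, String>,
/// }
///
/// #[async_trait]
/// impl Factory<u64, String, String> for MapFactory {
///     async fn get(&self, key: &u64) -> Result<String, String> {
///         self.store
///             .get(key)
///             .cloned()
///             .ok_or_else(|| format!("no value for key {key}"))
///     }
/// }
/// ```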
pub trait Factory<
    TKey: Clone + Send + Sync + 'static,
    TValue: Clone + Send + Sync + 'static,
    TError: Clone + Send + Sync + 'static,
>: Send + Sync + Clone + Debug + 'static
{
    /// Retrieves or generates a value for the given key.
    ///
    /// This method is called when a value is not found in the cache.
    /// It should implement the logic to fetch or generate the value.
    /// # Arguments
    /// * `key` - The key for which to retrieve or generate the value
    async fn get(&self, key: &TKey) -> Result<TValue, TError>;
}

pub struct FusionCacheBuilder<
    TKey: Hash + Eq + Send + Sync + Clone + Serialize + DeserializeOwned + 'static,
    TValue: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
> {
    capacity: u64,
    fail_safe_configuration: Option<FailSafeConfiguration>,
    hard_timeout: Option<std::time::Duration>,
    redis_address: Option<String>,
    application_name: Option<String>,
    should_factory_execute_in_background: bool,
    should_redis_writes_happen_in_background: bool,
    phantom_t_key: PhantomData<TKey>,
    phantom_t_value: PhantomData<TValue>,
    entry_ttl: Option<std::time::Duration>,
    entry_tti: Option<std::time::Duration>,
    distributed_cache_entry_ttl: Option<Duration>,
}

impl<
    TKey: Hash + Eq + Send + Sync + Clone + Serialize + DeserializeOwned + Debug + 'static,
    TValue: Clone + Send + Sync + Serialize + DeserializeOwned + Debug + 'static,
> FusionCacheBuilder<TKey, TValue>
{
    pub fn new() -> Self {
        Self {
            capacity: 1000,
            fail_safe_configuration: None,
            should_factory_execute_in_background: false,
            phantom_t_key: PhantomData,
            phantom_t_value: PhantomData,
            hard_timeout: None,
            redis_address: None,
            application_name: None,
            should_redis_writes_happen_in_background: false,
            entry_ttl: None,
            entry_tti: None,
            distributed_cache_entry_ttl: None,
        }
    }

    /// Sets the maximum capacity of the cache.
    /// This is the maximum number of entries the cache can hold before it starts evicting old entries.
    /// The default capacity is 1000 entries.
    /// # Arguments
    /// * `capacity` - The maximum number of entries the cache can hold
    pub fn with_capacity(mut self, capacity: u64) -> Self {
        self.capacity = capacity;
        self
    }

    /// Sets the time-to-live for entries in the cache.
    /// # Arguments
    /// * `time_to_live` - The time-to-live for entries in the cache
    pub fn with_time_to_live(mut self, time_to_live: std::time::Duration) -> Self {
        self.entry_ttl = Some(time_to_live);
        self
    }

    /// Sets the time-to-idle for entries in the cache.
    /// # Arguments
    /// * `time_to_idle` - The time-to-idle for entries in the cache
    pub fn with_time_to_idle(mut self, time_to_idle: std::time::Duration) -> Self {
        self.entry_tti = Some(time_to_idle);
        self
    }

    /// Configures the fail-safe mechanism for the cache.
    /// This allows the cache to handle situations where the factory is slow or fails to respond.
    /// # Arguments
    /// * `entry_ttl` - The time-to-live for entries in the fail-safe cache
    /// * `failsafe_cycle_ttl` - The time-to-live for a fail-safe cycle
    /// * `max_cycles` - The maximum number of cycles the fail-safe cache can go through before giving up
    /// * `soft_timeout` - An optional soft timeout for factory operations, after which the cache will try to use the fail-safe cache
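    ///
    /// A sketch of a possible configuration; the concrete durations below are illustrative:
    ///
    /// ```ignore
    /// let cache = FusionCacheBuilder::<u64, String>::new()
    ///     .with_fail_safe(
    ///         Duration::from_secs(60),          // keep fail-safe entries for a minute
    ///         Duration::from_secs(5),           // each fail-safe cycle lasts five seconds
    ///         Some(3),                          // give up after three cycles
    ///         Some(Duration::from_millis(100)), // fall back if the factory takes >100ms
    ///     )
    ///     .build()
    ///     .await?;
    /// ```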
    pub fn with_fail_safe(
        mut self,
        entry_ttl: std::time::Duration,
        failsafe_cycle_ttl: std::time::Duration,
        max_cycles: Option<u64>,
        soft_timeout: Option<std::time::Duration>,
    ) -> Self {
        self.fail_safe_configuration = Some(FailSafeConfiguration::new(
            entry_ttl,
            failsafe_cycle_ttl,
            max_cycles,
            soft_timeout,
        ));
        self
    }

    /// Sets a hard timeout for factory operations.
    /// If the factory does not return a value within this duration, it will return a `FusionCacheError::FactoryTimeout`.
    ///
    /// # Arguments
    /// * `timeout` - The maximum duration to wait for the factory to return a value
    pub fn with_hard_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.hard_timeout = Some(timeout);
        self
    }

    /// Configures whether the factory should execute in the background after a soft timeout.
    /// If set to `true`, the factory will continue to run in the background even after a soft timeout.
    ///
    /// # Arguments
    /// * `should_execute_in_background` - Whether the factory should execute in the background
    pub fn set_factory_background_execution(mut self, should_execute_in_background: bool) -> Self {
        self.should_factory_execute_in_background = should_execute_in_background;
        self
    }

    /// Configures Redis as a level 2 (distributed) cache.
    /// Redis is also used as a backplane, so evictions are propagated to the local caches of other application instances.
    ///
    /// # Arguments
    /// * `address` - The address of the Redis server.
    /// * `application_name` - The name of the application.
    /// * `should_writes_happen_in_background` - Whether writes to Redis should happen in the background.
    /// * `entry_ttl` - The time-to-live for entries in the Redis cache.
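    ///
    /// A sketch of wiring Redis in; the address, application name, and TTL shown are placeholders:
    ///
    /// ```ignore
    /// let cache = FusionCacheBuilder::<u64, String>::new()
    ///     .with_redis(
    ///         "redis://127.0.0.1:6379".to_string(),
    ///         "my-service".to_string(),
    ///         true,                           // write to Redis in the background
    ///         Some(Duration::from_secs(300)), // Redis entries live for five minutes
    ///     )
    ///     .build()
    ///     .await?;
    /// ```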
    pub fn with_redis(
        mut self,
        address: String,
        application_name: String,
        should_writes_happen_in_background: bool,
        entry_ttl: Option<std::time::Duration>,
    ) -> Self {
        self.redis_address = Some(address);
        self.application_name = Some(application_name);
        self.should_redis_writes_happen_in_background = should_writes_happen_in_background;
        self.distributed_cache_entry_ttl = entry_ttl;
        self
    }

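    /// Builds the [`FusionCache`], connecting to Redis and starting the cache
    /// synchronization task if a distributed cache was configured.
    ///
    /// A sketch of a typical build; the types and durations are illustrative:
    ///
    /// ```ignore
    /// let cache: FusionCache<u64, String> = FusionCacheBuilder::new()
    ///     .with_capacity(10_000)
    ///     .with_time_to_live(Duration::from_secs(60))
    ///     .with_hard_timeout(Duration::from_secs(2))
    ///     .build()
    ///     .await?;
    /// ```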
    pub async fn build(self) -> Result<FusionCache<TKey, TValue>, FusionCacheError> {
        let (distributed_cache_eviction_sender, mut distributed_cache_eviction_receiver) =
            tokio::sync::mpsc::channel(1000000);
        let distributed_cache = if let Some(address) = self.redis_address {
            debug!(target: LOG_TARGET, "Building distributed cache with Redis address: {}", address);
            // Avoid panicking on a malformed address: surface it as an initialization error.
            let redis_client = redis::Client::open(address)
                .map_err(|e| FusionCacheError::InitializationError(e.to_string()))?;
            let inner_redis_connection = redis_client
                .get_multiplexed_async_connection()
                .await
                .map_err(|e| FusionCacheError::InitializationError(e.to_string()))?;
            let redis_connection = RedisConnection::new(inner_redis_connection);
            let mut distributed_cache = DistributedCache::new(
                redis_connection,
                redis_client,
                distributed_cache_eviction_sender,
                self.application_name.unwrap(),
                self.distributed_cache_entry_ttl,
            );
            distributed_cache
                .start_synchronization()
                .await
                .map_err(|e| FusionCacheError::InitializationError(e.to_string()))?;
            Some(distributed_cache)
        } else {
            debug!(target: LOG_TARGET, "No distributed cache configured");
            None
        };
        let cache_builder = Cache::builder()
            .expire_after(CacheValueExpiry)
            .max_capacity(self.capacity);
        let cache: Cache<TKey, CacheValue<TValue>> = cache_builder.build();
        let fail_safe_cache: Option<FailSafeCache<TKey, CacheValue<TValue>>> =
            if let Some(fail_safe_configuration) = self.fail_safe_configuration {
                debug!(
                    target: LOG_TARGET,
                    "Setting fail-safe cache with configuration: {:?}",
                    fail_safe_configuration
                );
                Some(FailSafeCache::new(fail_safe_configuration))
            } else {
                debug!(target: LOG_TARGET, "No fail-safe cache configuration set for cache");
                None
            };
        let _cache = cache.clone();
        let mut _fail_safe_cache = fail_safe_cache.clone();
        let cache_synchronization_task = if let Some(_) = &distributed_cache {
            Some(task::spawn(async move {
                while let Some(key) = distributed_cache_eviction_receiver.recv().await {
                    if let Some(_) = _cache.get(&key).await {
                        _cache.invalidate(&key).await;
                    }
                    if let Some(fsc) = &mut _fail_safe_cache {
                        fsc.invalidate(&key).await;
                    }
                }
            }))
        } else {
            None
        };
        Ok(FusionCache {
            cache,
            time_to_live: self.entry_ttl,
            time_to_idle: self.entry_tti,
            should_factory_execute_in_background: self.should_factory_execute_in_background,
            hard_timeout: self.hard_timeout,
            fail_safe_cache: fail_safe_cache.clone(),
            in_flight_factory_requests: Arc::new(Mutex::new(HashMap::new())),
            distributed_cache,
            should_redis_writes_happen_in_background: self.should_redis_writes_happen_in_background,
            _cache_synchronization_task: cache_synchronization_task
                .map(|t| Arc::new(Mutex::new(t))),
            background_factory_tasks: Arc::new(Mutex::new(HashMap::new())),
        })
    }
}

/// Errors that can occur during cache operations.
#[derive(Debug, Clone)]
pub enum FusionCacheError {
    /// A generic error occurred
    Other,
    /// System is in an inconsistent state (e.g., in-flight request tracking corrupted)
    SystemCorruption,
    /// The factory failed to generate a value
    FactoryError(String),
    /// The factory operation timed out
    FactoryTimeout,
    /// An error occurred during initialization of the cache
    InitializationError(String),
    /// An error occurred during Redis operations
    RedisError(String),
}

impl Display for FusionCacheError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FusionCacheError::Other => write!(f, "Other"),
            FusionCacheError::SystemCorruption => write!(f, "SystemCorruption"),
            FusionCacheError::FactoryError(error_message) => write!(f, "{}", error_message),
            FusionCacheError::FactoryTimeout => write!(f, "FactoryTimeout"),
            FusionCacheError::InitializationError(error_message) => {
                write!(f, "{}", error_message)
            }
            FusionCacheError::RedisError(error_message) => {
                write!(f, "{}", error_message)
            }
        }
    }
}

/// The main cache type of the fusioncache-rs library.
///
/// It stores and retrieves values in an in-memory cache, optionally backed by a
/// fail-safe cache and a Redis-based distributed cache.
#[derive(Clone, Debug)]
pub struct FusionCache<
    TKey: Hash + Eq + Send + Sync + Clone + Serialize + DeserializeOwned + 'static,
    TValue: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
> {
    cache: Cache<TKey, CacheValue<TValue>>,
    time_to_live: Option<std::time::Duration>,
    time_to_idle: Option<std::time::Duration>,
    hard_timeout: Option<std::time::Duration>,
    should_factory_execute_in_background: bool,
    fail_safe_cache: Option<FailSafeCache<TKey, CacheValue<TValue>>>,
    // This map holds the in-flight factory requests for each key.
    in_flight_factory_requests:
        Arc<Mutex<HashMap<TKey, broadcast::Sender<Result<TValue, FusionCacheError>>>>>,
    distributed_cache: Option<DistributedCache<TKey, TValue>>,
    _cache_synchronization_task: Option<Arc<Mutex<JoinHandle<()>>>>,
    should_redis_writes_happen_in_background: bool,
    background_factory_tasks: Arc<
        Mutex<
            HashMap<
                TKey,
                (
                    JoinHandle<()>,
                    broadcast::Sender<Result<TValue, FusionCacheError>>,
                ),
            >,
        >,
    >,
}

impl<
    TKey: Hash + Eq + Send + Sync + Clone + Serialize + DeserializeOwned + Debug + 'static,
    TValue: Clone + Send + Sync + Serialize + DeserializeOwned + Debug + 'static,
> FusionCache<TKey, TValue>
{
    /// Retrieves a value from the cache, or generates it using the provided factory if not found.
    /// If a distributed cache is configured, it is consulted on a local miss, and newly generated values are written to it as well.
    ///
    /// This method implements a sophisticated cache stampede protection mechanism:
    /// - If the value exists in cache, returns it immediately
    /// - If the value is being generated by another request, waits for and returns that result
    /// - If no request is in flight, generates the value using the factory
    ///
    /// # Arguments
    /// * `key` - The key to look up in the cache
    /// * `factory` - The factory to use for generating the value if not found
    /// * `options` - The options to use for this specific entry. If not provided, the default options will be used.
    /// # Returns
    /// * `Ok(TValue)` - The cached or newly generated value
    /// * `Err(FusionCacheError)` - If value generation failed or timed out
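    ///
    /// A sketch of a typical call; `MapFactory` is the illustrative factory from the
    /// [`Factory`] trait docs, not part of the library:
    ///
    /// ```ignore
    /// let store = std::collections::HashMap::from([(42, "hello".to_string())]);
    /// let value = cache
    ///     .get_or_set(42, MapFactory { store }, None)
    ///     .await?;
    /// ```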
    #[tracing::instrument(name = "FusionCache::get_or_set", skip(self))]
    pub async fn get_or_set<
        TError: Clone + Send + Sync + Display + 'static,
        F: Factory<TKey, TValue, TError>,
    >(
        &mut self,
        key: TKey,
        factory: F,
        options: Option<FusionCacheOptions>,
    ) -> Result<TValue, FusionCacheError> {
        let options = self.get_options().with_overrides(options);
        info!(target: LOG_TARGET, "Cache lookup for key: {:?}", key);
        let maybe_entry = self.cache.get(&key).await;
        match maybe_entry {
            Some(entry) => {
                info!(target: LOG_TARGET, "Cache hit for key: {:?}", key);
                Ok(entry.value)
            }
            None => {
                let maybe_value_from_redis =
                    if let Some(distributed_cache) = &mut self.distributed_cache {
                        distributed_cache.get(&key).await?
                    } else {
                        None
                    };
                if let Some(value_from_redis) = maybe_value_from_redis {
                    info!(target: LOG_TARGET, "Cache miss for key: {:?}, hit in Redis", key);
                    self.cache
                        .insert(key.clone(), value_from_redis.clone())
                        .await;
                    Ok(value_from_redis.value)
                } else {
                    info!(target: LOG_TARGET, "Cache miss for key: {:?}. Retrieving from factory", key);
                    self.request_key(key, factory, options).await
                }
            }
        }
    }

    #[tracing::instrument(name = "FusionCache::request_key", skip(self))]
    async fn request_key<
        TError: Clone + Send + Sync + Display + 'static,
        F: Factory<TKey, TValue, TError>,
    >(
        &mut self,
        key: TKey,
        factory: F,
        options: FusionCacheOptions,
    ) -> Result<TValue, FusionCacheError> {
        let mut in_flight_factory_requests = self.in_flight_factory_requests.lock().await;
        let maybe_factory_sender = in_flight_factory_requests.get(&key).cloned();
        match maybe_factory_sender {
            Some(factory_sender) => {
                info!(target: LOG_TARGET, "Factory already in flight for key: {:?}, waiting", key);
                drop(in_flight_factory_requests);
                let mut factory_receiver = factory_sender.subscribe();
                let factory_receiver_result = factory_receiver.recv().await;
                if let Ok(factory_result) = factory_receiver_result {
                    info!(target: LOG_TARGET, "Factory result received for key: {:?}", key);
                    factory_result
                } else {
                    // The value might still be in the cache: the receiver returns an error when
                    // the sender was dropped, so fall back to a direct cache lookup.
                    if let Some(value) = self.cache.get(&key).await.map(|entry| entry.value) {
                        info!(target: LOG_TARGET, "Factory sender was dropped for key: {:?}, but the value was found in cache. Returning cached value", key);
                        Ok(value)
                    } else {
                        error!(target: LOG_TARGET, "There's a very high probability that your system is corrupted. The factory sender was dropped for key: {:?} and the value was not in cache. Returning error.", key);
                        Err(FusionCacheError::SystemCorruption)
                    }
                }
            }
            None => {
                info!(target: LOG_TARGET, "No factory in flight for key: {:?}, creating new factory", key);
                let (factory_sender, _) = broadcast::channel(1);
                in_flight_factory_requests.insert(key.clone(), factory_sender.clone());
                drop(in_flight_factory_requests);
                let factory_value_or_failsafe = self
                    .get_from_factory_or_fail_safe(key.clone(), factory, options.clone())
                    .await;

                match &factory_value_or_failsafe {
                    Ok(Source::Factory(value)) => {
                        info!(target: LOG_TARGET, "Factory result received for key: {:?}, setting value in cache", key);
                        self.set_internal(key.clone(), value.clone(), options).await;
                    }
                    Ok(Source::Failsafe(_)) => {
                        info!(target: LOG_TARGET, "The factory errored out, but a fail-safe value was found for key: {:?}. Returning fail-safe value.", key);
                    }
                    Err(e) => {
                        error!(target: LOG_TARGET, "Error received from factory: {:?}. Returning error.", e);
                    }
                }

                let retrieval_result = factory_value_or_failsafe.map(|s| s.value());
                let _ = factory_sender.send(retrieval_result.clone());
                self.remove_from_in_flight_factory_requests(&key).await;
                retrieval_result
            }
        }
    }

    #[tracing::instrument(name = "FusionCache::get_from_factory_or_fail_safe", skip(self))]
    async fn get_from_factory_or_fail_safe<
        TError: Clone + Send + Sync + Display + 'static,
        F: Factory<TKey, TValue, TError>,
    >(
        &mut self,
        key: TKey,
        factory: F,
        options: FusionCacheOptions,
    ) -> Result<Source<TValue>, FusionCacheError> {
        if let Some(fail_safe_cache) = &mut self.fail_safe_cache {
            let fail_safe_result = fail_safe_cache.get(&key).await;
            match fail_safe_result {
                FailSafeResult::NotInFailSafeMode | FailSafeResult::CurrentCycleEnded => {
                    info!(target: LOG_TARGET, "Not in fail-safe mode for key: {:?}, getting value from factory", key);
                    self.do_get_from_factory_or_failsafe(key.clone(), factory, options)
                        .await
                }
                FailSafeResult::Hit(value) => {
                    warn!(target: LOG_TARGET, "Currently in fail-safe mode. Fail-safe cache hit for key: {:?}, returning failsafe value", key);
                    Ok(Source::Failsafe(value.value))
                }
                FailSafeResult::Miss(error_message) => {
                    error!(target: LOG_TARGET, "Fail-safe cache miss for key: {:?}, exiting fail-safe mode", key);
                    fail_safe_cache.exit_failsafe_mode(key.clone()).await;
                    Err(FusionCacheError::FactoryError(error_message))
                }
                FailSafeResult::TooManyCycles(error_message) => {
                    error!(target: LOG_TARGET, "Fail-safe cache too many cycles for key: {:?}, exiting fail-safe mode", key);
                    fail_safe_cache.exit_failsafe_mode(key.clone()).await;
                    fail_safe_cache.invalidate(&key).await;
                    Err(FusionCacheError::FactoryError(error_message))
                }
            }
        } else {
            if let Some(hard_timeout) = self.hard_timeout {
                let result_or_timeout = tokio::time::timeout(hard_timeout, factory.get(&key)).await;
                match result_or_timeout {
                    Ok(factory_value) => factory_value
                        .map_err(|e| FusionCacheError::FactoryError(e.to_string()))
                        .map(Source::Factory),
                    Err(_) => {
                        error!(target: LOG_TARGET, "Factory operation timed out for key: {:?} on hard timeout, returning error", key);
                        Err(FusionCacheError::FactoryTimeout)
                    }
                }
            } else {
                info!(target: LOG_TARGET, "No timeouts configured, getting value from factory for key: {:?}", key);
                factory
                    .get(&key)
                    .await
                    .map_err(|e| FusionCacheError::FactoryError(e.to_string()))
                    .map(Source::Factory)
            }
        }
    }

    async fn remove_from_in_flight_factory_requests(&self, key: &TKey) {
        let mut in_flight_factory_requests = self.in_flight_factory_requests.lock().await;
        if let Some(_) = in_flight_factory_requests.get(key) {
            in_flight_factory_requests.remove(key);
        }
        drop(in_flight_factory_requests);
    }

    /// Retrieves a value from the cache without invoking the factory.
    ///
    /// # Arguments
    /// * `key` - The key to look up in the cache
    ///
    /// # Returns
    /// * `Some(TValue)` - The cached value if it exists
    /// * `None` - If the value is not in the cache
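    ///
    /// A sketch of a plain lookup; the key is illustrative:
    ///
    /// ```ignore
    /// if let Some(value) = cache.get(42).await {
    ///     println!("cached: {value:?}");
    /// }
    /// ```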
    #[tracing::instrument(name = "FusionCache::get", skip(self))]
    pub async fn get(&mut self, key: TKey) -> Option<TValue> {
        if let Some(distributed_cache) = &mut self.distributed_cache {
            let value = distributed_cache.get(&key).await;
            if let Ok(value) = value {
                if let Some(v) = value {
                    self.cache.insert(key, v.clone()).await;
                    return Some(v.value);
                }
            }
        }
        self.cache.get(&key).await.map(|entry| entry.value)
    }

    /// Directly sets a value in the cache.
    ///
    /// # Arguments
    /// * `key` - The key under which to store the value
    /// * `value` - The value to store
    /// * `options` - The options to use for this specific entry. If not provided, the default options will be used.
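    ///
    /// A sketch of an explicit write with a per-entry TTL override; the values are illustrative:
    ///
    /// ```ignore
    /// let options = FusionCacheOptionsBuilder::new()
    ///     .with_time_to_live(Duration::from_secs(5))
    ///     .build();
    /// cache.set(42, "hello".to_string(), Some(options)).await;
    /// ```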
    #[tracing::instrument(name = "FusionCache::set", skip(self))]
    pub async fn set(&mut self, key: TKey, value: TValue, options: Option<FusionCacheOptions>) {
        let opts = options.unwrap_or(self.get_options());
        self.set_internal(key, value, opts).await;
    }

    #[tracing::instrument(name = "FusionCache::set_internal", skip(self))]
    async fn set_internal(&mut self, key: TKey, value: TValue, options: FusionCacheOptions) {
        let cache_value = CacheValue::new(value, options.entry_ttl, options.entry_tti);
        if let Some(distributed_cache) = &mut self.distributed_cache {
            if !options.skip_distributed_cache {
                if !options
                    .should_redis_writes_happen_in_background
                    .is_some_and(|bw| bw)
                {
                    info!(target: LOG_TARGET, "Setting value in distributed cache for key: {:?}", key);
                    let _ = distributed_cache.set(&key, &cache_value).await;
                } else {
                    let mut _distributed_cache = distributed_cache.clone();
                    let _cache_value = cache_value.clone();
                    let _key = key.clone();
                    info!(target: LOG_TARGET, "Spawning task to set value in distributed cache for key: {:?}", key);
                    task::spawn(async move {
                        let _ = _distributed_cache.set(&_key, &_cache_value).await;
                    });
                }
            } else {
                info!(target: LOG_TARGET, "Skipping distributed cache write for key: {:?}", key);
            }
        } else {
            info!(target: LOG_TARGET, "No distributed cache configured, skipping distributed cache write for key: {:?}", key);
        }
        if let Some(fail_safe_cache) = &mut self.fail_safe_cache {
            info!(target: LOG_TARGET, "Setting value in fail-safe cache for key: {:?}", key);
            fail_safe_cache
                .insert(key.clone(), cache_value.clone())
                .await;
            fail_safe_cache.exit_failsafe_mode(key.clone()).await;
        }
        info!(target: LOG_TARGET, "Setting value in local cache for key: {:?}", key);
        self.cache.insert(key, cache_value).await;
    }

    /// Removes a value from the cache.
    ///
    /// # Arguments
    /// * `key` - The key to remove from the cache
    pub async fn evict(&mut self, key: TKey) {
        self.cache.invalidate(&key).await;
        if let Some(distributed_cache) = &mut self.distributed_cache {
            distributed_cache.evict(&key).await;
        }
    }
    // This method only gets called when the cache is created with fail-safe enabled.
    #[tracing::instrument(name = "FusionCache::do_get_from_factory_or_failsafe", skip(self))]
    async fn do_get_from_factory_or_failsafe<
        TError: Clone + Send + Sync + Display + 'static,
        F: Factory<TKey, TValue, TError>,
    >(
        &mut self,
        key: TKey,
        factory: F,
        options: FusionCacheOptions,
    ) -> Result<Source<TValue>, FusionCacheError> {
        if let Some(_) = options
            .fail_safe_configuration
            .as_ref()
            .and_then(|c| c.soft_timeout)
        {
            info!(target: LOG_TARGET, "Getting value from factory or failsafe cache for key: {:?}", key);
            self.factory_result_with_soft_timeout(&key, factory, options)
                .await
        } else if let Some(hard_timeout) = self.hard_timeout {
            let result = select! {
                factory_result = factory.get(&key) => {
                    factory_result.map_err(|e| FusionCacheError::FactoryError(e.to_string()))
                }
                _ = tokio::time::sleep(hard_timeout) => {
                    Err(FusionCacheError::FactoryTimeout)
                }
            };

            self.process_factory_result_or_timeout(&key, result).await
        } else {
            let result = factory
                .get(&key)
                .await
                .map_err(|e| FusionCacheError::FactoryError(e.to_string()));

            self.process_factory_result_or_timeout(&key, result).await
        }
    }

    #[tracing::instrument(name = "FusionCache::factory_result_with_soft_timeout", skip(self))]
    async fn factory_result_with_soft_timeout<
        TError: Clone + Send + Sync + Display + 'static,
        F: Factory<TKey, TValue, TError>,
    >(
        &mut self,
        key: &TKey,
        factory: F,
        options: FusionCacheOptions,
    ) -> Result<Source<TValue>, FusionCacheError> {
        let soft_timeout = options
            .fail_safe_configuration
            .as_ref()
            .unwrap()
            .soft_timeout
            .unwrap();
        let local_factory_result_sender = self
            .get_or_create_background_factory_task(key, factory, options)
            .await;
        let mut local_factory_result_receiver = local_factory_result_sender.subscribe();
        select! {
            factory_result = local_factory_result_receiver.recv() => {
                if let Ok(r) = factory_result {
                    if let Ok(value) = r {
                        Ok(Source::Factory(value))
                    } else {
                        let fail_safe_cache = self.fail_safe_cache.as_mut().unwrap();
                        fail_safe_cache.start_failsafe_cycle(key.clone(), r.unwrap_err().to_string()).await;
                        let fail_safe_result = fail_safe_cache.get(&key).await;
                        match fail_safe_result {
                            FailSafeResult::Hit(value) => {
                                Ok(Source::Failsafe(value.value))
                            }
                            FailSafeResult::Miss(error_message) => {
                                fail_safe_cache.exit_failsafe_mode(key.clone()).await;
                                Err(FusionCacheError::FactoryError(error_message))
                            }
                            FailSafeResult::TooManyCycles(error_message) => {
                                fail_safe_cache.exit_failsafe_mode(key.clone()).await;
                                fail_safe_cache.invalidate(&key).await;
                                Err(FusionCacheError::FactoryError(error_message))
                            }
                            FailSafeResult::CurrentCycleEnded => {
                                panic!("Got FailSafeResult::CurrentCycleEnded, but we just started a cycle");
                            }
                            FailSafeResult::NotInFailSafeMode => {
                                panic!("Got FailSafeResult::NotInFailSafeMode, but we just entered fail safe mode");
                            }
                        }
                    }
                } else {
                    Err(FusionCacheError::SystemCorruption)
                }
            }
            _ = tokio::time::sleep(soft_timeout) => {
                let fail_safe_cache = self.fail_safe_cache.as_mut().unwrap();
                fail_safe_cache.start_failsafe_cycle(key.clone(), "timeout".to_string()).await;
                let fail_safe_result = fail_safe_cache.get(&key).await;
                match fail_safe_result {
                    FailSafeResult::Hit(value) => {
                        if !self.should_factory_execute_in_background {
                            let mut background_factory_tasks = self.background_factory_tasks.lock().await;
                            let task = background_factory_tasks.remove(key);
                            drop(background_factory_tasks);
                            if let Some((task, _)) = task {
                                task.abort();
                            }
                        }
                        Ok(Source::Failsafe(value.value))
                    }
                    FailSafeResult::Miss(_) => {
                        fail_safe_cache.exit_failsafe_mode(key.clone()).await;
                        if let Some(hard_timeout) = self.hard_timeout {
                            tokio::time::sleep(hard_timeout - soft_timeout).await;
                            let mut background_factory_tasks = self.background_factory_tasks.lock().await;
                            let task = background_factory_tasks.remove(key);
                            drop(background_factory_tasks);
                            if let Some((task, _)) = task {
                                task.abort();
                            }
                            Err(FusionCacheError::FactoryTimeout)
                        } else {
                            Err(FusionCacheError::FactoryTimeout)
                        }
                    }
                    FailSafeResult::TooManyCycles(_) => {
                        fail_safe_cache.exit_failsafe_mode(key.clone()).await;
                        if let Some(hard_timeout) = self.hard_timeout {
                            tokio::time::sleep(hard_timeout - soft_timeout).await;
                            let mut background_factory_tasks = self.background_factory_tasks.lock().await;
                            let task = background_factory_tasks.remove(key);
                            drop(background_factory_tasks);
                            if let Some((task, _)) = task {
                                task.abort();
                            }
                            Err(FusionCacheError::FactoryTimeout)
                        } else {
                            Err(FusionCacheError::FactoryTimeout)
                        }
                    }
                    FailSafeResult::CurrentCycleEnded => {
                        panic!(
                            "Got FailSafeResult::CurrentCycleEnded, but we just started a cycle"
                        );
                    }
                    FailSafeResult::NotInFailSafeMode => {
                        panic!(
                            "Got FailSafeResult::NotInFailSafeMode, but we just entered fail safe mode"
                        );
                    }
                }
            }
        }
    }

    // If the factory is already in execution, we use the existing sender and task.
    // Otherwise, we create a new sender and task.
    // This is because we want to avoid creating a new task for the same key if it's already in execution.
    async fn get_or_create_background_factory_task<
        TError: Clone + Send + Sync + Display + 'static,
        F: Factory<TKey, TValue, TError>,
    >(
        &mut self,
        key: &TKey,
        factory: F,
        options: FusionCacheOptions,
    ) -> broadcast::Sender<Result<TValue, FusionCacheError>> {
        let mut background_factory_tasks = self.background_factory_tasks.lock().await;
        let background_factory_in_execution = background_factory_tasks.remove(key);

        let (factory_task, local_factory_result_sender) =
            if let Some((f, local_factory_result_sender)) = background_factory_in_execution {
                (f, local_factory_result_sender.clone())
            } else {
                let (local_factory_result_sender, _) =
                    tokio::sync::broadcast::channel::<Result<TValue, FusionCacheError>>(1);
                (
                    self.start_factory_in_background(
                        &key,
                        &factory,
                        local_factory_result_sender.clone(),
                        options,
                    ),
                    local_factory_result_sender.clone(),
                )
            };
        background_factory_tasks.insert(
            key.clone(),
            (factory_task, local_factory_result_sender.clone()),
        );
        drop(background_factory_tasks);
        local_factory_result_sender
    }

    async fn process_factory_result_or_timeout(
        &mut self,
        key: &TKey,
        result: Result<TValue, FusionCacheError>,
    ) -> Result<Source<TValue>, FusionCacheError> {
        match result {
            Ok(factory_value) => Ok(Source::Factory(factory_value)),
            Err(e) => {
                let fail_safe_cache = self.fail_safe_cache.as_mut().unwrap();
                fail_safe_cache
                    .start_failsafe_cycle(key.clone(), e.to_string())
                    .await;
                let fail_safe_result = fail_safe_cache.get(key).await;
                match fail_safe_result {
                    FailSafeResult::Hit(value) => Ok(Source::Failsafe(value.value)),
                    FailSafeResult::Miss(error_message) => {
                        fail_safe_cache.exit_failsafe_mode(key.clone()).await;
                        Err(FusionCacheError::FactoryError(error_message))
                    }
                    FailSafeResult::TooManyCycles(error_message) => {
                        fail_safe_cache.exit_failsafe_mode(key.clone()).await;
                        Err(FusionCacheError::FactoryError(error_message))
                    }
                    FailSafeResult::CurrentCycleEnded => {
                        panic!(
                            "Got FailSafeResult::CurrentCycleEnded, but we just started a cycle"
                        );
                    }
                    FailSafeResult::NotInFailSafeMode => {
                        panic!(
                            "Got FailSafeResult::NotInFailSafeMode, but we just entered fail safe mode"
                        );
                    }
                }
            }
        }
    }

    fn start_factory_in_background<
        TError: Clone + Send + Sync + Display + 'static,
        F: Factory<TKey, TValue, TError>,
    >(
        &self,
        key: &TKey,
        factory: &F,
        tx: tokio::sync::broadcast::Sender<Result<TValue, FusionCacheError>>,
        options: FusionCacheOptions,
    ) -> task::JoinHandle<()> {
        let _key = key.clone();
        let _factory = factory.clone();
        let mut _self = self.clone();
        task::spawn(async move {
            let hard_timeout = _self.hard_timeout.unwrap_or(Duration::MAX);
            let factory_result = select! {
                result = _factory.get(&_key) => {
                    result.map_err(|e| FusionCacheError::FactoryError(e.to_string()))
                },
                _ = tokio::time::sleep(hard_timeout) => {
                    Err(FusionCacheError::FactoryTimeout)
                }
            };

            match factory_result {
                Ok(factory_value) => {
                    let _ = tx.send(Ok(factory_value.clone()));
                    _self
                        .set_internal(_key.clone(), factory_value, options)
                        .await;
                }
                Err(e) => {
                    _self.remove_from_in_flight_factory_requests(&_key).await;
                    let _ = tx.send(Err(e.into()));
                }
            };
            let mut background_factory_tasks = _self.background_factory_tasks.lock().await;
            background_factory_tasks.remove(&_key);
            drop(background_factory_tasks);
        })
    }

    fn get_options(&self) -> FusionCacheOptions {
        FusionCacheOptions {
            entry_ttl: self.time_to_live,
            entry_tti: self.time_to_idle,
            fail_safe_configuration: self
                .fail_safe_cache
                .as_ref()
                .map(|c| c.configuration.clone()),
            hard_timeout: self.hard_timeout.clone(),
            skip_distributed_cache: self.distributed_cache.is_none(),
            ignore_ttl: self.time_to_live.is_none(),
            ignore_tti: self.time_to_idle.is_none(),
            skip_fail_safe_cache: self.fail_safe_cache.is_none(),
            skip_hard_timeout: self.hard_timeout.is_none(),
            should_redis_writes_happen_in_background: Some(
                self.should_redis_writes_happen_in_background,
            ),
        }
    }
}
1159
1160#[cfg(test)]
1161mod tests {
1162    use core::fmt;
1163    use std::time::Duration;
1164
1165    use super::*;
1166
1167    use futures::future::join_all;
1168    use tracing::{Event, Subscriber};
1169    use tracing_subscriber::{
1170        fmt::{FmtContext, FormatEvent, format},
1171        registry::LookupSpan,
1172    };
1173
1174    #[tokio::test]
1175    async fn test_factory_only_gets_called_once_if_multiple_threads_request_the_same_key() {
1176        let factory = TestFactory::new();
1177        let cache = FusionCacheBuilder::new().build().await.unwrap();
1178        let key = 1;
1179        let mut handles = vec![];
1180        for _ in 0..100000 {
1181            let factory = factory.clone();
1182            let mut cache = cache.clone();
1183            handles.push(tokio::spawn(async move {
1184                let value = cache.get_or_set(key, factory.clone(), None).await.unwrap();
1185                assert_eq!(value, 1);
1186            }));
1187        }
1188        join_all(handles).await;
1189    }
1190
1191    #[tokio::test]
1192    async fn test_failsafe_hits_when_there_is_an_entry_in_failsafe_cache() {
1193        let factory = FallibleTestFactory::new();
1194        let mut cache = FusionCacheBuilder::new()
1195            .with_fail_safe(
1196                Duration::from_secs(10),
1197                Duration::from_secs(5),
1198                Some(3),
1199                None,
1200            )
1201            .build()
1202            .await
1203            .unwrap();
1204
1205        let value = cache.get_or_set(1, factory.clone(), None).await;
1206        assert_eq!(1, value.unwrap());
1207
1208        cache.evict(1).await;
1209
1210        let failsafe_value = cache.get_or_set(1, factory.clone(), None).await;
1211        assert_eq!(1, failsafe_value.unwrap())
1212    }
1213
1214    #[tokio::test]
1215    async fn test_failsafe_hits_when_there_is_an_entry_in_failsafe_cache_multithreaded() {
1216        let factory = FallibleTestFactory::new();
1217        let mut cache = FusionCacheBuilder::new()
1218            .with_fail_safe(
1219                Duration::from_secs(10),
1220                Duration::from_secs(5),
1221                Some(3),
1222                None,
1223            )
1224            .build()
1225            .await
1226            .unwrap();
1227
1228        let value = cache.get_or_set(1, factory.clone(), None).await;
1229        assert_eq!(1, value.unwrap());
1230
1231        cache.evict(1).await;
1232
1233        let mut handles = vec![];
1234        for _ in 0..100000 {
1235            let factory = factory.clone();
1236            let mut cache = cache.clone();
1237            handles.push(tokio::spawn(async move {
1238                let value = cache.get_or_set(1, factory.clone(), None).await.unwrap();
1239                assert_eq!(value, 1);
1240            }));
1241        }
1242        join_all(handles).await;
1243    }
1244
1245    #[tokio::test]
1246    async fn test_failsafe_misses_when_there_is_no_entry_in_failsafe_cache() {
1247        let factory = FallibleTestFactory::new();
1248        let mut cache = FusionCacheBuilder::new()
1249            .with_fail_safe(
1250                Duration::from_secs(2),
1251                Duration::from_secs(1),
1252                Some(3),
1253                None,
1254            )
1255            .build()
1256            .await
1257            .unwrap();
1258
1259        let value = cache.get_or_set(1, factory.clone(), None).await;
1260        assert_eq!(1, value.unwrap());
1261
1262        cache.evict(1).await;
1263        tokio::time::sleep(Duration::from_secs(3)).await;
1264
1265        let failsafe_value = cache.get_or_set(1, factory.clone(), None).await;
1266        assert!(failsafe_value.is_err());
1267    }
1268
1269    #[tokio::test]
1270    async fn test_failsafe_misses_when_there_is_no_entry_in_failsafe_cache_multithreaded() {
1271        let factory = FallibleTestFactory::new();
1272        let mut cache = FusionCacheBuilder::new()
1273            .with_fail_safe(
1274                Duration::from_secs(2),
1275                Duration::from_secs(1),
1276                Some(3),
1277                None,
1278            )
1279            .build()
1280            .await
1281            .unwrap();
1282
1283        let value = cache.get_or_set(1, factory.clone(), None).await;
1284        assert_eq!(1, value.unwrap());
1285
1286        cache.evict(1).await;
1287        tokio::time::sleep(Duration::from_secs(3)).await;
1288
1289        let mut handles = vec![];
1290        for _ in 0..100000 {
1291            let factory = factory.clone();
1292            let mut cache = cache.clone();
1293            handles.push(tokio::spawn(async move {
1294                let value = cache.get_or_set(1, factory.clone(), None).await;
1295                assert!(value.is_err());
1296            }));
1297        }
1298        join_all(handles).await;
1299    }
1300
1301    #[tokio::test]
1302    async fn test_failsafe_misses_when_the_maximum_number_of_cycles_is_reached() {
1303        let factory = FallibleTestFactory::new();
1304        let mut cache = FusionCacheBuilder::new()
1305            .with_fail_safe(
1306                Duration::from_secs(15), // entry ttl
1307                Duration::from_secs(1),  // failsafe ttl
1308                Some(2),                 // max cycles
1309                None,                    // soft timeout
1310            )
1311            .build()
1312            .await
1313            .unwrap();
1314
1315        let value = cache.get_or_set(1, factory.clone(), None).await;
1316        assert_eq!(1, value.unwrap());
1317
1318        cache.evict(1).await;
1319
1320        let failsafe_value = cache.get_or_set(1, factory.clone(), None).await;
1321        assert_eq!(1, failsafe_value.unwrap());
1322
1323        tokio::time::sleep(Duration::from_secs(2)).await;
1324
1325        let failsafe_value = cache.get_or_set(1, factory.clone(), None).await;
1326        assert_eq!(1, failsafe_value.unwrap());
1327
1328        tokio::time::sleep(Duration::from_secs(2)).await;
1329
1330        let failsafe_value = cache.get_or_set(1, factory.clone(), None).await;
1331        assert!(failsafe_value.is_err());
1332    }
1333
1334    #[tokio::test]
1335    async fn test_failsafe_misses_when_the_maximum_number_of_cycles_is_reached_multithreaded() {
1336        let factory = FallibleTestFactory::new();
1337        let mut cache = FusionCacheBuilder::new()
1338            .with_fail_safe(
1339                Duration::from_secs(15), // entry ttl
1340                Duration::from_secs(1),  // failsafe ttl
1341                Some(2),                 // max cycles
1342                None,                    // soft timeout
1343            )
1344            .build()
1345            .await
1346            .unwrap();
1347
1348        let value = cache.get_or_set(1, factory.clone(), None).await;
1349        assert_eq!(1, value.unwrap());
1350
1351        cache.evict(1).await;
1352
1353        let mut handles = vec![];
1354        for _ in 0..100000 {
1355            let factory = factory.clone();
1356            let mut cache = cache.clone();
1357            handles.push(tokio::spawn(async move {
1358                let value = cache.get_or_set(1, factory.clone(), None).await;
1359                assert_eq!(1, value.unwrap());
1360            }));
1361        }
1362        join_all(handles).await;
1363
1364        tokio::time::sleep(Duration::from_secs(2)).await;
1365
1366        let mut handles = vec![];
1367        for _ in 0..100000 {
1368            let factory = factory.clone();
1369            let mut cache = cache.clone();
1370            handles.push(tokio::spawn(async move {
1371                let value = cache.get_or_set(1, factory.clone(), None).await;
1372                assert_eq!(1, value.unwrap());
1373            }));
1374        }
1375        join_all(handles).await;
1376
1377        let mut handles = vec![];
1378        for _ in 0..100000 {
1379            let factory = factory.clone();
1380            let mut cache = cache.clone();
1381            handles.push(tokio::spawn(async move {
1382                let value = cache.get_or_set(1, factory.clone(), None).await;
1383                assert!(value.is_err());
1384            }));
1385        }
1386        join_all(handles).await;
1387    }
1388
1389    #[tokio::test]
1390    async fn test_soft_timeout_with_failsafe_hit() {
1391        let factory = SlowTestFactory::new(Duration::from_secs(5), 1);
1392        let mut cache = FusionCacheBuilder::new()
1393            .with_fail_safe(
1394                Duration::from_secs(10),      // entry ttl
1395                Duration::from_secs(5),       // failsafe ttl
1396                Some(3),                      // max cycles
1397                Some(Duration::from_secs(1)), // soft timeout
1398            )
1399            .build()
1400            .await
1401            .unwrap();
1402
1403        // First call to populate both caches
1404        let value = cache.get_or_set(1, factory.clone(), None).await;
1405        assert_eq!(1, value.unwrap());
1406
1407        // Second call should hit failsafe before factory completes
1408        let start = std::time::Instant::now();
1409        let value = cache.get_or_set(1, factory.clone(), None).await;
1410        let elapsed = start.elapsed();
1411
1412        assert_eq!(1, value.unwrap());
1413        assert!(
1414            elapsed < Duration::from_secs(5),
1415            "Should return before factory completes"
1416        );
1417    }
1418
1419    #[tokio::test]
1420    async fn test_hard_timeout_triggers() {
1421        let factory = SlowTestFactory::new(Duration::from_secs(5), 0);
1422        let mut cache = FusionCacheBuilder::new()
1423            .with_hard_timeout(Duration::from_secs(2))
1424            .build()
1425            .await
1426            .unwrap();
1427
1428        let result = cache.get_or_set(1, factory, None).await;
1429        assert!(matches!(result, Err(FusionCacheError::FactoryTimeout)));
1430    }
1431
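    // A minimal sketch (not a test) of combining the two timeout knobs exercised
    // above: the fourth `with_fail_safe` argument is the soft timeout, after which a
    // stale failsafe entry is returned instead of waiting on the factory, while
    // `with_hard_timeout` bounds the whole call and fails it with
    // `FusionCacheError::FactoryTimeout`. Chaining the two builder calls together is
    // assumed to work; the durations are illustrative only.
    #[allow(dead_code)]
    async fn build_cache_with_soft_and_hard_timeouts() -> FusionCache<u32, u32> {
        FusionCacheBuilder::new()
            .with_fail_safe(
                Duration::from_secs(10),      // entry ttl
                Duration::from_secs(5),       // failsafe ttl
                Some(3),                      // max cycles
                Some(Duration::from_secs(1)), // soft timeout
            )
            .with_hard_timeout(Duration::from_secs(2))
            .build()
            .await
            .unwrap()
    }
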
1432    #[tokio::test]
1433    async fn test_background_execution_continues_after_failsafe() {
1434        let factory = SlowTestFactory::new(Duration::from_secs(3), 1);
1435        let mut cache = FusionCacheBuilder::new()
1436            .with_fail_safe(
1437                Duration::from_secs(10),      // entry ttl
1438                Duration::from_secs(5),       // failsafe ttl
1439                Some(3),                      // max cycles
1440                Some(Duration::from_secs(1)), // soft timeout
1441            )
1442            .set_factory_background_execution(true)
1443            .build()
1444            .await
1445            .unwrap();
1446
1447        // Populate the caches
1448        let value = cache.get_or_set(1, factory.clone(), None).await;
1449        assert_eq!(1, value.unwrap());
1450
1451        // Trigger failsafe hit and background execution
1452        cache.evict(1).await;
1453        let value = cache.get_or_set(1, factory.clone(), None).await;
1454        assert_eq!(1, value.unwrap());
1455
1456        // Wait for background execution to complete
1457        tokio::time::sleep(Duration::from_secs(4)).await;
1458
1459        // Value should be updated in main cache
1460        let cached_value = cache.get(1).await;
1461        assert_eq!(
1462            Some(2),
1463            cached_value,
1464            "Cache should be updated by background task"
1465        );
1466    }
1467
1468    #[tokio::test]
1469    async fn test_background_execution_does_not_continue_after_failsafe() {
1470        let factory = SlowTestFactory::new(Duration::from_secs(3), 1);
1471        let mut cache = FusionCacheBuilder::new()
1472            .with_fail_safe(
1473                Duration::from_secs(10),      // entry ttl
1474                Duration::from_secs(5),       // failsafe ttl
1475                Some(3),                      // max cycles
1476                Some(Duration::from_secs(1)), // soft timeout
1477            )
1478            .set_factory_background_execution(false)
1479            .build()
1480            .await
1481            .unwrap();
1482
1483        // Populate the caches
1484        let value = cache.get_or_set(1, factory.clone(), None).await;
1485        assert_eq!(1, value.unwrap());
1486
1487        // Trigger a failsafe hit; background execution is disabled for this test
1488        cache.evict(1).await;
1489        let value = cache.get_or_set(1, factory.clone(), None).await;
1490        assert_eq!(1, value.unwrap());
1491
1492        // Wait long enough for a background refresh to have finished, had one been scheduled
1493        tokio::time::sleep(Duration::from_secs(4)).await;
1494
1495        // The value should not be in the main cache.
1496        let cached_value = cache.get(1).await;
1497        assert_eq!(
1498            None, cached_value,
1499            "Cache should not be updated by background task"
1500        );
1501    }
1502
1503    #[tokio::test]
1504    async fn test_entry_is_evicted_after_cache_ttl_expires() {
1505        let factory = TestFactory::new();
1506        let mut cache: FusionCache<u32, u32> = FusionCacheBuilder::new()
1507            .with_time_to_live(Duration::from_secs(2))
1508            .build()
1509            .await
1510            .unwrap();
1511
1512        let value = cache.get_or_set(1, factory.clone(), None).await;
1513        assert_eq!(1, value.unwrap());
1514
1515        tokio::time::sleep(Duration::from_secs(3)).await;
1516
1517        let cached_value = cache.get(1).await;
1518        assert_eq!(None, cached_value);
1519    }
1520
1521    #[tokio::test]
1522    async fn test_entry_is_evicted_after_entry_ttl_expires() {
1523        let factory = TestFactory::new();
1524        let mut cache: FusionCache<u32, u32> = FusionCacheBuilder::new()
1525            .with_time_to_live(Duration::from_secs(10))
1526            .build()
1527            .await
1528            .unwrap();
1529
1530        let options = FusionCacheOptionsBuilder::new()
1531            .with_time_to_live(Duration::from_secs(2))
1532            .build();
1533
1534        let value = cache.get_or_set(1, factory.clone(), Some(options)).await;
1535        assert_eq!(1, value.unwrap());
1536
1537        tokio::time::sleep(Duration::from_secs(3)).await;
1538
1539        let cached_value = cache.get(1).await;
1540        assert_eq!(None, cached_value);
1541    }
1542
1543    #[derive(Clone, Debug)]
1544    struct SlowTestFactory {
1545        counter: Arc<Mutex<u32>>,
1546        delay: Duration,
1547        delay_after_n_invocations: u32,
1548    }
1549
1550    impl SlowTestFactory {
1551        fn new(delay: Duration, delay_after_n_invocations: u32) -> Self {
1552            Self {
1553                counter: Arc::new(Mutex::new(0)),
1554                delay_after_n_invocations,
1555                delay,
1556            }
1557        }
1558    }
1559
1560    #[async_trait]
1561    impl Factory<u32, u32, TestFactoryError> for SlowTestFactory {
1562        async fn get(&self, _: &u32) -> Result<u32, TestFactoryError> {
1563            let mut counter = self.counter.lock().await; // guard held across the sleep, so concurrent calls are serialized
1564            if *counter >= self.delay_after_n_invocations {
1565                tokio::time::sleep(self.delay).await; // delay only kicks in after the first `delay_after_n_invocations` calls
1566            }
1567            *counter += 1;
1568            Ok(*counter)
1569        }
1570    }
1571
1572    #[derive(Clone, Debug)]
1573    struct TestFactory {
1574        counter: Arc<Mutex<u32>>,
1575    }
1576
1577    impl TestFactory {
1578        fn new() -> Self {
1579            Self {
1580                counter: Arc::new(Mutex::new(0)),
1581            }
1582        }
1583    }
1584
1585    #[async_trait]
1586    impl Factory<u32, u32, TestFactoryError> for TestFactory {
1587        async fn get(&self, _: &u32) -> Result<u32, TestFactoryError> {
1588            let mut counter = self.counter.lock().await;
1589            tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
1590            *counter += 1;
1591            Ok(*counter)
1592        }
1593    }
1594
1595    #[derive(Clone, Debug)]
1596    struct TestFactoryError;
1597
1598    impl Display for TestFactoryError {
1599        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1600            write!(f, "TestFactoryError")
1601        }
1602    }
1603
1604    impl Into<FusionCacheError> for TestFactoryError {
1605        fn into(self) -> FusionCacheError {
1606            FusionCacheError::Other
1607        }
1608    }
1609
1610    #[derive(Clone, Debug)]
1611    struct FallibleTestFactory {
1612        counter: Arc<Mutex<u32>>,
1613    }
1614
1615    impl FallibleTestFactory {
1616        pub fn new() -> Self {
1617            Self {
1618                counter: Arc::new(Mutex::new(0)),
1619            }
1620        }
1621    }
1622
1623    #[async_trait]
1624    impl Factory<u32, u32, TestFactoryError> for FallibleTestFactory {
1625        async fn get(&self, _: &u32) -> Result<u32, TestFactoryError> {
1626            let mut counter = self.counter.lock().await;
1627            *counter += 1;
1628            if *counter == 1 {
1629                Ok(1) // the first call succeeds
1630            } else {
1631                Err(TestFactoryError) // every later call fails, so subsequent reads must be served by the failsafe cache
1632            }
1633        }
1634    }
1635
1636    #[tokio::test]
1637    async fn test_distributed_cache_with_redis() {
1638        let factory = TestFactory::new();
1639        let mut cache: FusionCache<u32, u32> = FusionCacheBuilder::new()
1640            .with_redis(
1641                "redis://127.0.0.1/".to_string(),
1642                "test_distributed_cache_with_redis".to_string(),
1643                false,
1644                None,
1645            )
1646            .build()
1647            .await
1648            .unwrap();
1649
1650        // Test setting and getting a value
1651        let key = 1;
1652        let value = cache.get_or_set(key, factory.clone(), None).await.unwrap();
1653        assert_eq!(value, 1);
1654
1655        // Test getting the value again (should be cached)
1656        let value = cache.get_or_set(key, factory.clone(), None).await.unwrap();
1657        assert_eq!(value, 1);
1658    }
1659
1660    #[tokio::test]
1661    async fn test_distributed_cache_synchronization() {
1662        let factory = TestFactory::new();
1663        let mut cache1: FusionCache<u32, u32> = FusionCacheBuilder::new()
1664            .with_redis(
1665                "redis://127.0.0.1/".to_string(),
1666                "test_distributed_cache_synchronization".to_string(),
1667                false,
1668                None,
1669            )
1670            .build()
1671            .await
1672            .unwrap();
1673
1674        let mut cache2: FusionCache<u32, u32> = FusionCacheBuilder::new()
1675            .with_redis(
1676                "redis://127.0.0.1/".to_string(),
1677                "test_distributed_cache_synchronization".to_string(),
1678                false,
1679                None,
1680            )
1681            .build()
1682            .await
1683            .unwrap();
1684
1685        // Set value in cache1
1686        let key = 1;
1687        let value = cache1.get_or_set(key, factory.clone(), None).await.unwrap();
1688        assert_eq!(value, 1);
1689
1690        // Wait for synchronization
1691        tokio::time::sleep(Duration::from_millis(100)).await;
1692
1693        // Value should be in cache2
1694        let value = cache2.get(key).await.unwrap();
1695        assert_eq!(value, 1);
1696    }
1697
1698    #[tokio::test]
1699    async fn test_distributed_cache_with_redis_connection_failure() {
1700        let factory = TestFactory::new();
1701        let mut cache: FusionCache<u32, u32> = FusionCacheBuilder::new()
1702            .with_redis(
1703                "redis://127.0.0.1/".to_string(),
1704                "test_distributed_cache_with_redis_connection_failure".to_string(),
1705                false,
1706                None,
1707            )
1708            .build()
1709            .await
1710            .unwrap();
1711
1712        // Set initial value
1713        let key = 1;
1714        let value = cache.get_or_set(key, factory.clone(), None).await.unwrap();
1715        assert_eq!(value, 1);
1716
1717        // Break the Redis connection
1718        if let Some(distributed_cache) = &mut cache.distributed_cache {
1719            distributed_cache.break_connection();
1720        }
1721
1722        cache.set(key, 2, None).await;
1723
1724        // Restore the connection
1725        if let Some(distributed_cache) = &mut cache.distributed_cache {
1726            distributed_cache.restore_connection();
1727        }
1728
1729        // Wait for auto-recovery
1730        tokio::time::sleep(Duration::from_secs(2)).await;
1731
1732        // Value should be evicted
1733        let value = cache.get(key).await;
1734        assert_eq!(value, None);
1735    }
1736
1737    #[tokio::test]
1738    async fn test_distributed_cache_with_redis_and_failsafe() {
1739        let factory = FallibleTestFactory::new();
1740        let mut cache: FusionCache<u32, u32> = FusionCacheBuilder::new()
1741            .with_redis(
1742                "redis://127.0.0.1/".to_string(),
1743                "test_distributed_cache_with_redis_and_failsafe".to_string(),
1744                false,
1745                None,
1746            )
1747            .with_fail_safe(
1748                Duration::from_secs(10),
1749                Duration::from_secs(5),
1750                Some(3),
1751                None,
1752            )
1753            .build()
1754            .await
1755            .unwrap();
1756
1757        // Set initial value
1758        let key = 1;
1759        let value = cache.get_or_set(key, factory.clone(), None).await.unwrap();
1760        assert_eq!(value, 1);
1761
1762        // Break the Redis connection
1763        if let Some(distributed_cache) = &mut cache.distributed_cache {
1764            distributed_cache.break_connection();
1765        }
1766
1767        // Update the value while the Redis connection is broken
1768        cache.set(key, 2, None).await;
1769
1770        // Restore the connection
1771        if let Some(distributed_cache) = &mut cache.distributed_cache {
1772            distributed_cache.restore_connection();
1773        }
1774
1775        // Wait for auto-recovery
1776        tokio::time::sleep(Duration::from_secs(2)).await;
1777
1778        // Value should be evicted
1779        let value = cache.get(key).await;
1780        assert_eq!(value, None);
1781    }
1782    struct TraceIdWrapper<F> {
1783        inner: F,
1784    }
1785
1786    impl<S, N, F> FormatEvent<S, N> for TraceIdWrapper<F>
1787    where
1788        S: Subscriber + for<'a> LookupSpan<'a>,
1789        N: for<'a> format::FormatFields<'a> + 'static,
1790        F: FormatEvent<S, N>,
1791    {
1792        fn format_event(
1793            &self,
1794            ctx: &FmtContext<'_, S, N>,
1795            mut writer: format::Writer<'_>,
1796            event: &Event<'_>,
1797        ) -> std::fmt::Result {
1798            // Get the root span's ID, which we use as the trace_id.
1799            let trace_id = ctx.lookup_current().and_then(|span| {
1800                span.scope()
1801                    .from_root()
1802                    .next() // The root span in the hierarchy
1803                    .map(|s| s.id())
1804            });
1805
1806            if let Some(id) = trace_id {
1807                // You can customize the format here, e.g., "[trace_id={...}]"
1808                write!(writer, "trace_id={} ", id.into_u64())?;
1809            }
1810
1811            // Let the inner, default formatter do the rest.
1812            self.inner.format_event(ctx, writer, event)
1813        }
1814    }
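
    // A minimal sketch of how `TraceIdWrapper` might be wired up: wrap the default
    // fmt formatter and hand it to a `tracing_subscriber` builder via `event_format`.
    // This helper only illustrates the intended usage with standard
    // tracing-subscriber builder APIs (`tracing_subscriber::fmt()`, `event_format`,
    // `init`); it is not relied on by the tests.
    #[allow(dead_code)]
    fn init_tracing_with_trace_ids() {
        let formatter = TraceIdWrapper {
            inner: format::Format::default(),
        };
        tracing_subscriber::fmt().event_format(formatter).init();
    }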
1815}