Skip to main content

llmix_rs/
response_cache.rs

1use crate::canonical_json;
2use crate::error::{LlmixError, LlmixResult};
3use crate::types::{CacheHitTier, CachingStrategy, ResponseCacheStats, ResponseCacheStrategy};
4use async_trait::async_trait;
5use lru::LruCache;
6#[cfg(feature = "redis")]
7use redis::{aio::MultiplexedConnection, AsyncCommands, Client};
8use serde::{Deserialize, Serialize};
9use serde_json::{Map, Number, Value};
10use sha2::{Digest, Sha256};
11use std::collections::HashMap;
12use std::num::NonZeroUsize;
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15use tokio::sync::Mutex;
16
/// Prefix carried by every generated response-cache key; `TwoTierCache::clear` sweeps the
/// L2 backend by this prefix.
pub const CACHE_KEY_PREFIX: &str = "llmix:resp:v2:";
/// L1 capacity used by `TwoTierCacheConfig::default()`.
const DEFAULT_L1_MAX: usize = 1_000;
/// Entry lifetime, in seconds, used by `TwoTierCacheConfig::default()` (one hour).
const DEFAULT_TTL_SECONDS: u64 = 60 * 60;
/// Names of the fields that may appear in the canonical cache-key object, listed in the
/// order they are inserted into it (alphabetical).
const CACHE_KEY_FIELDS: [&str; 15] = [
    "baseUrl",
    "enableThinking",
    "frequencyPenalty",
    "maxOutputTokens",
    "messages",
    "model",
    "presencePenalty",
    "provider",
    "providerOptions",
    "responseFormat",
    "seed",
    "stopSequences",
    "temperature",
    "topK",
    "topP",
];
37
/// Reports whether `redis_url` is present and contains something other than whitespace.
fn has_redis_url(redis_url: Option<&str>) -> bool {
    matches!(redis_url, Some(candidate) if !candidate.trim().is_empty())
}
41
42#[derive(Debug, Clone, PartialEq, Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub struct CacheKeyParams {
45    pub provider: String,
46    pub model: String,
47    pub messages: Vec<Value>,
48    #[serde(default)]
49    pub base_url: Option<String>,
50    #[serde(default)]
51    pub enable_thinking: Option<bool>,
52    #[serde(default)]
53    pub temperature: Option<f64>,
54    #[serde(default)]
55    pub max_output_tokens: Option<u64>,
56    #[serde(default)]
57    pub response_format: Option<Value>,
58    #[serde(default)]
59    pub provider_options: Option<Value>,
60    #[serde(default)]
61    pub seed: Option<i64>,
62    #[serde(default)]
63    pub top_p: Option<f64>,
64    #[serde(default)]
65    pub top_k: Option<i64>,
66    #[serde(default)]
67    pub presence_penalty: Option<f64>,
68    #[serde(default)]
69    pub frequency_penalty: Option<f64>,
70    #[serde(default)]
71    pub stop_sequences: Option<Vec<String>>,
72}
73
74impl CacheKeyParams {
75    fn to_canonical_value(&self) -> Value {
76        let mut fields = HashMap::new();
77
78        fields.insert("provider", Value::String(self.provider.clone()));
79        fields.insert("model", Value::String(self.model.clone()));
80        fields.insert("messages", Value::Array(self.messages.clone()));
81
82        if let Some(base_url) = self.base_url.as_ref() {
83            fields.insert("baseUrl", Value::String(base_url.clone()));
84        }
85        if let Some(enable_thinking) = self.enable_thinking {
86            fields.insert("enableThinking", Value::Bool(enable_thinking));
87        }
88        if let Some(value) = self.max_output_tokens {
89            fields.insert("maxOutputTokens", Value::Number(Number::from(value)));
90        }
91        if let Some(value) = self
92            .response_format
93            .clone()
94            .filter(|value| !value.is_null())
95        {
96            fields.insert("responseFormat", value);
97        }
98        if let Some(value) = self
99            .provider_options
100            .clone()
101            .filter(|value| !value.is_null())
102        {
103            fields.insert("providerOptions", value);
104        }
105        if let Some(value) = self.seed {
106            fields.insert("seed", Value::Number(Number::from(value)));
107        }
108        if let Some(value) = finite_number(self.temperature) {
109            fields.insert("temperature", Value::Number(value));
110        }
111        if let Some(value) = finite_number(self.top_p) {
112            fields.insert("topP", Value::Number(value));
113        }
114        if let Some(value) = self.top_k {
115            fields.insert("topK", Value::Number(Number::from(value)));
116        }
117        if let Some(value) = finite_number(self.presence_penalty) {
118            fields.insert("presencePenalty", Value::Number(value));
119        }
120        if let Some(value) = finite_number(self.frequency_penalty) {
121            fields.insert("frequencyPenalty", Value::Number(value));
122        }
123        if let Some(stop_sequences) = self.stop_sequences.as_ref() {
124            let array = stop_sequences
125                .iter()
126                .map(|value| Value::String(value.clone()))
127                .collect::<Vec<_>>();
128            fields.insert("stopSequences", Value::Array(array));
129        }
130
131        let mut map = Map::new();
132        for field in CACHE_KEY_FIELDS {
133            if let Some(value) = fields.remove(field) {
134                map.insert(field.to_owned(), value);
135            }
136        }
137        Value::Object(map)
138    }
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct CacheResult {
143    pub value: String,
144    pub tier: CacheHitTier,
145}
146
/// Entry held in the L1 tier.
#[derive(Clone, Debug, PartialEq)]
struct CachedValue {
    /// Cached payload.
    data: String,
    /// Time the payload was cached, in seconds since the Unix epoch.
    cached_at: f64,
}
152
153#[derive(Debug, Clone)]
154pub struct TwoTierCacheConfig {
155    pub max_items: usize,
156    pub ttl_seconds: u64,
157    pub redis_url: Option<String>,
158}
159
160impl Default for TwoTierCacheConfig {
161    fn default() -> Self {
162        Self {
163            max_items: DEFAULT_L1_MAX,
164            ttl_seconds: DEFAULT_TTL_SECONDS,
165            redis_url: None,
166        }
167    }
168}
169
170#[derive(Debug, Serialize)]
171struct RedisPayload<'a> {
172    data: &'a str,
173    cached_at: f64,
174}
175
176#[derive(Debug, Deserialize)]
177struct StoredPayload {
178    data: String,
179    #[serde(default, alias = "cachedAt")]
180    cached_at: Option<f64>,
181}
182
183#[async_trait]
184trait L2CacheBackend: Send + Sync {
185    async fn get(&self, key: &str) -> LlmixResult<Option<String>>;
186    async fn setex(&self, key: &str, ttl_seconds: u64, value: String) -> LlmixResult<()>;
187    async fn clear_prefix(&self, prefix: &str) -> LlmixResult<()>;
188    async fn close(&self) -> LlmixResult<()> {
189        Ok(())
190    }
191}
192
/// Redis-backed L2 store that opens its connection lazily and hands out clones of it.
#[cfg(feature = "redis")]
struct RedisBackend {
    /// Client used to open a connection when none is cached.
    client: Client,
    /// Cached connection; `None` before first use and after any failed command.
    connection: Mutex<Option<MultiplexedConnection>>,
}

#[cfg(feature = "redis")]
impl RedisBackend {
    /// Parses `redis_url` into a client without connecting yet.
    ///
    /// Returns `LlmixError::Redis` when the URL is rejected.
    fn new(redis_url: &str) -> LlmixResult<Self> {
        match Client::open(redis_url) {
            Ok(client) => Ok(Self {
                client,
                connection: Mutex::new(None),
            }),
            Err(error) => Err(LlmixError::Redis(format!("invalid redis url: {error}"))),
        }
    }

    /// Returns a clone of the cached connection, opening and caching one first if needed.
    async fn connection(&self) -> LlmixResult<MultiplexedConnection> {
        let mut slot = self.connection.lock().await;
        match slot.as_ref() {
            Some(existing) => Ok(existing.clone()),
            None => {
                let fresh = self
                    .client
                    .get_multiplexed_async_connection()
                    .await
                    .map_err(map_redis_error)?;
                *slot = Some(fresh.clone());
                Ok(fresh)
            }
        }
    }

    /// Forgets the cached connection so the next command opens a new one.
    async fn reset_connection(&self) {
        let mut slot = self.connection.lock().await;
        *slot = None;
    }

    /// Passes a successful command result through; on failure drops the cached connection
    /// and converts the error.
    async fn settle<T>(&self, outcome: redis::RedisResult<T>) -> LlmixResult<T> {
        match outcome {
            Ok(value) => Ok(value),
            Err(error) => {
                self.reset_connection().await;
                Err(map_redis_error(error))
            }
        }
    }
}

#[cfg(feature = "redis")]
#[async_trait]
impl L2CacheBackend for RedisBackend {
    /// Reads the value stored under `key`.
    async fn get(&self, key: &str) -> LlmixResult<Option<String>> {
        let mut connection = self.connection().await?;
        let outcome: redis::RedisResult<Option<String>> = connection.get(key).await;
        self.settle(outcome).await
    }

    /// Writes `value` under `key` with an expiry of `ttl_seconds`.
    async fn setex(&self, key: &str, ttl_seconds: u64, value: String) -> LlmixResult<()> {
        let mut connection = self.connection().await?;
        let outcome: redis::RedisResult<()> = connection.set_ex(key, value, ttl_seconds).await;
        self.settle(outcome).await
    }

    /// Deletes every key matching `{prefix}*`, walking the keyspace with `SCAN` and issuing
    /// one `DEL` per non-empty page.
    async fn clear_prefix(&self, prefix: &str) -> LlmixResult<()> {
        let mut connection = self.connection().await?;
        let pattern = format!("{prefix}*");
        let mut cursor = 0_u64;

        loop {
            let page: redis::RedisResult<(u64, Vec<String>)> = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(1000_u32)
                .query_async(&mut connection)
                .await;
            let (next_cursor, keys) = self.settle(page).await?;

            if !keys.is_empty() {
                let removal: redis::RedisResult<()> = redis::cmd("DEL")
                    .arg(&keys)
                    .query_async(&mut connection)
                    .await;
                self.settle(removal).await?;
            }

            // A cursor of zero is how SCAN signals that the iteration is complete.
            cursor = next_cursor;
            if cursor == 0 {
                return Ok(());
            }
        }
    }

    /// Drops the cached connection.
    async fn close(&self) -> LlmixResult<()> {
        self.reset_connection().await;
        Ok(())
    }
}
300
301pub struct TwoTierCache {
302    strategy: ResponseCacheStrategy,
303    ttl_seconds: u64,
304    l1_max: usize,
305    l1: Mutex<LruCache<String, CachedValue>>,
306    l2_enabled: bool,
307    l2_healthy: Mutex<bool>,
308    l2_reads_blocked: Mutex<bool>,
309    l2_consecutive_write_failures: Mutex<u32>,
310    backend: Option<Arc<dyn L2CacheBackend>>,
311}
312
313impl TwoTierCache {
314    pub fn new(strategy: ResponseCacheStrategy, config: TwoTierCacheConfig) -> LlmixResult<Self> {
315        let redis_requested = has_redis_url(config.redis_url.as_deref());
316        if strategy == ResponseCacheStrategy::Redis && !redis_requested {
317            return Err(LlmixError::InvalidResponseCacheConfig(
318                "TwoTierCache strategy \"redis\" requires config.redis_url to be set.".to_owned(),
319            ));
320        }
321
322        let max_items = NonZeroUsize::new(config.max_items.max(1)).expect("max(1) is non-zero");
323        #[allow(unused_mut)]
324        let mut backend: Option<Arc<dyn L2CacheBackend>> = None;
325
326        #[cfg(feature = "redis")]
327        if strategy != ResponseCacheStrategy::Memory && redis_requested {
328            let redis_url = config
329                .redis_url
330                .as_deref()
331                .expect("redis_requested implies redis_url is present");
332            backend = Some(Arc::new(RedisBackend::new(redis_url)?));
333        }
334
335        #[cfg(not(feature = "redis"))]
336        if strategy == ResponseCacheStrategy::Redis && redis_requested {
337            return Err(LlmixError::InvalidResponseCacheConfig(
338                "TwoTierCache strategy \"redis\" requires llmix-rs to be built with the `redis` feature.".to_owned(),
339            ));
340        }
341
342        let l2_enabled = strategy != ResponseCacheStrategy::Memory && backend.is_some();
343
344        Ok(Self {
345            strategy,
346            ttl_seconds: config.ttl_seconds.max(1),
347            l1_max: max_items.get(),
348            l1: Mutex::new(LruCache::new(max_items)),
349            l2_enabled,
350            l2_healthy: Mutex::new(true),
351            l2_reads_blocked: Mutex::new(false),
352            l2_consecutive_write_failures: Mutex::new(0),
353            backend,
354        })
355    }
356
357    #[cfg(test)]
358    fn new_with_backend(
359        strategy: ResponseCacheStrategy,
360        config: TwoTierCacheConfig,
361        backend: Arc<dyn L2CacheBackend>,
362    ) -> LlmixResult<Self> {
363        let max_items = NonZeroUsize::new(config.max_items.max(1)).expect("max(1) is non-zero");
364        Ok(Self {
365            strategy,
366            ttl_seconds: config.ttl_seconds.max(1),
367            l1_max: max_items.get(),
368            l1: Mutex::new(LruCache::new(max_items)),
369            l2_enabled: strategy != ResponseCacheStrategy::Memory,
370            l2_healthy: Mutex::new(true),
371            l2_reads_blocked: Mutex::new(false),
372            l2_consecutive_write_failures: Mutex::new(0),
373            backend: Some(backend),
374        })
375    }
376
377    pub async fn get(&self, key: &str) -> Option<CacheResult> {
378        if let Some(hit) = self.get_l1(key).await {
379            return Some(CacheResult {
380                value: hit.data,
381                tier: CacheHitTier::L1,
382            });
383        }
384
385        if !self.l2_enabled {
386            return None;
387        }
388        if *self.l2_reads_blocked.lock().await {
389            return None;
390        }
391
392        let backend = self.backend.as_ref()?;
393        let raw = match backend.get(key).await {
394            Ok(value) => value?,
395            Err(_) => return None,
396        };
397        let parsed = serde_json::from_str::<StoredPayload>(&raw).ok()?;
398        let cached_at = normalize_cached_at_seconds(parsed.cached_at);
399        if cache_age_seconds(cached_at) >= self.ttl_seconds as f64 {
400            return None;
401        }
402
403        let entry = CachedValue {
404            data: parsed.data,
405            cached_at,
406        };
407        self.put_l1(key, entry.clone()).await;
408
409        Some(CacheResult {
410            value: entry.data,
411            tier: CacheHitTier::L2,
412        })
413    }
414
415    pub async fn set(&self, key: &str, value: &str) {
416        let entry = CachedValue {
417            data: value.to_owned(),
418            cached_at: now_seconds_f64(),
419        };
420        self.put_l1(key, entry.clone()).await;
421
422        if !self.l2_enabled {
423            return;
424        }
425
426        let Some(backend) = self.backend.as_ref() else {
427            return;
428        };
429
430        // Constructor clamps ttl_seconds to >= 1, so use it directly.
431        let ttl = self.ttl_seconds;
432        let payload = serde_json::to_string(&RedisPayload {
433            data: &entry.data,
434            cached_at: entry.cached_at,
435        });
436        let Ok(payload) = payload else {
437            return;
438        };
439
440        match backend.setex(key, ttl, payload).await {
441            Ok(()) => {
442                *self.l2_consecutive_write_failures.lock().await = 0;
443                *self.l2_healthy.lock().await = true;
444            }
445            Err(_) => {
446                let mut failures = self.l2_consecutive_write_failures.lock().await;
447                *failures += 1;
448                if *failures >= 3 {
449                    *self.l2_healthy.lock().await = false;
450                }
451            }
452        }
453    }
454
455    pub async fn clear(&self) -> LlmixResult<()> {
456        self.l1.lock().await.clear();
457        if !self.l2_enabled {
458            return Ok(());
459        }
460        let Some(backend) = self.backend.as_ref() else {
461            return Ok(());
462        };
463        match backend.clear_prefix(CACHE_KEY_PREFIX).await {
464            Ok(()) => {
465                *self.l2_reads_blocked.lock().await = false;
466                *self.l2_healthy.lock().await = true;
467                Ok(())
468            }
469            Err(error) => {
470                *self.l2_reads_blocked.lock().await = true;
471                *self.l2_healthy.lock().await = false;
472                Err(error)
473            }
474        }
475    }
476
477    pub async fn close(&self) {
478        if let Some(backend) = self.backend.as_ref() {
479            let _ = backend.close().await;
480        }
481    }
482
483    pub async fn get_stats(&self) -> ResponseCacheStats {
484        let l1_size = self.l1.lock().await.len();
485        ResponseCacheStats {
486            l1_size,
487            l1_max: self.l1_max,
488            l2_enabled: self.l2_enabled,
489            l2_healthy: *self.l2_healthy.lock().await,
490            strategy: self.strategy,
491        }
492    }
493
494    async fn get_l1(&self, key: &str) -> Option<CachedValue> {
495        let mut l1 = self.l1.lock().await;
496        let cached = l1.get(key).cloned()?;
497        if cache_age_seconds(cached.cached_at) >= self.ttl_seconds as f64 {
498            l1.pop(key);
499            return None;
500        }
501        Some(cached)
502    }
503
504    async fn put_l1(&self, key: &str, value: CachedValue) {
505        self.l1.lock().await.put(key.to_owned(), value);
506    }
507}
508
509pub fn is_response_cache_strategy(strategy: CachingStrategy) -> bool {
510    matches!(
511        strategy,
512        CachingStrategy::Redis | CachingStrategy::RedisOrMemory | CachingStrategy::Memory
513    )
514}
515
516pub fn should_skip_cache(strategy: CachingStrategy) -> bool {
517    matches!(
518        strategy,
519        CachingStrategy::Native | CachingStrategy::Gateway | CachingStrategy::Disabled
520    )
521}
522
523pub fn resolve_response_cache_strategy(
524    strategy: CachingStrategy,
525    redis_url: Option<&str>,
526) -> LlmixResult<Option<ResponseCacheStrategy>> {
527    if !is_response_cache_strategy(strategy) {
528        return Ok(None);
529    }
530
531    match strategy {
532        CachingStrategy::Redis => {
533            if !has_redis_url(redis_url) {
534                return Err(LlmixError::InvalidResponseCacheConfig(
535                    "Response cache strategy \"redis\" requires REDIS_URL to be set.".to_owned(),
536                ));
537            }
538            Ok(Some(ResponseCacheStrategy::Redis))
539        }
540        CachingStrategy::RedisOrMemory => {
541            if !has_redis_url(redis_url) {
542                return Ok(Some(ResponseCacheStrategy::Memory));
543            }
544            Ok(Some(ResponseCacheStrategy::RedisOrMemory))
545        }
546        CachingStrategy::Memory => Ok(Some(ResponseCacheStrategy::Memory)),
547        _ => Ok(None),
548    }
549}
550
551pub fn generate_cache_key(params: &CacheKeyParams) -> LlmixResult<String> {
552    let canonical = params.to_canonical_value();
553    let json = canonical_json::to_string(&canonical)?;
554    let mut hasher = Sha256::new();
555    hasher.update(json.as_bytes());
556    let digest = format!("{:x}", hasher.finalize());
557    Ok(format!("{CACHE_KEY_PREFIX}{digest}"))
558}
559
560fn finite_number(value: Option<f64>) -> Option<Number> {
561    let value = value?;
562    canonical_number_from_f64(value)
563}
564
565fn canonical_number_from_f64(value: f64) -> Option<Number> {
566    if !value.is_finite() {
567        return None;
568    }
569
570    if value == 0.0 {
571        return Some(Number::from(0));
572    }
573
574    if value.fract() == 0.0 {
575        if value.is_sign_positive() && value <= u64::MAX as f64 {
576            let integer = value as u64;
577            if integer as f64 == value {
578                return Some(Number::from(integer));
579            }
580        }
581
582        if value >= i64::MIN as f64 && value <= i64::MAX as f64 {
583            let integer = value as i64;
584            if integer as f64 == value {
585                return Some(Number::from(integer));
586            }
587        }
588    }
589
590    Number::from_f64(value)
591}
592
593fn normalize_cached_at_seconds(raw: Option<f64>) -> f64 {
594    let Some(raw) = raw else {
595        return now_seconds_f64();
596    };
597    if !raw.is_finite() || raw <= 0.0 {
598        return now_seconds_f64();
599    }
600    if raw > 1_000_000_000_000.0 {
601        return raw / 1000.0;
602    }
603    raw
604}
605
606fn cache_age_seconds(cached_at: f64) -> f64 {
607    let age = now_seconds_f64() - cached_at;
608    if age.is_sign_negative() {
609        0.0
610    } else {
611        age
612    }
613}
614
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn now_seconds_f64() -> f64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock should be after unix epoch");
    since_epoch.as_secs_f64()
}
621
/// Wraps a Redis client error's display text in `LlmixError::Redis`.
#[cfg(feature = "redis")]
fn map_redis_error(error: redis::RedisError) -> LlmixError {
    let message = error.to_string();
    LlmixError::Redis(message)
}
626
#[cfg(test)]
mod tests {
    use super::{
        generate_cache_key, CacheKeyParams, L2CacheBackend, ResponseCacheStrategy, TwoTierCache,
        TwoTierCacheConfig, CACHE_KEY_PREFIX,
    };
    use crate::error::{LlmixError, LlmixResult};
    use crate::types::{CacheHitTier, CachingStrategy};
    use async_trait::async_trait;
    use serde_json::{json, Value};
    use std::collections::HashMap;
    use std::fmt::Display;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// In-memory L2 stand-in whose writes and clears can be made to fail.
    #[derive(Default)]
    struct MockBackend {
        /// Stored key/value pairs.
        store: Mutex<HashMap<String, String>>,
        /// How many upcoming `setex` calls should fail.
        fail_writes: Mutex<u32>,
        /// While true, `clear_prefix` fails.
        fail_clears: Mutex<bool>,
    }

    #[async_trait]
    impl L2CacheBackend for MockBackend {
        async fn get(&self, key: &str) -> LlmixResult<Option<String>> {
            let store = self.store.lock().await;
            Ok(store.get(key).cloned())
        }

        async fn setex(&self, key: &str, _ttl_seconds: u64, value: String) -> LlmixResult<()> {
            let mut pending_failures = self.fail_writes.lock().await;
            if *pending_failures > 0 {
                *pending_failures -= 1;
                return Err(LlmixError::InvalidResponseCacheConfig(
                    "simulated write failure".to_owned(),
                ));
            }
            self.store.lock().await.insert(key.to_owned(), value);
            Ok(())
        }

        async fn clear_prefix(&self, prefix: &str) -> LlmixResult<()> {
            if *self.fail_clears.lock().await {
                return Err(LlmixError::InvalidResponseCacheConfig(
                    "simulated clear failure".to_owned(),
                ));
            }
            let mut store = self.store.lock().await;
            store.retain(|key, _| !key.starts_with(prefix));
            Ok(())
        }
    }

    /// Builds a cache over `backend` with the settings every test here uses.
    fn cache_over(strategy: ResponseCacheStrategy, backend: Arc<MockBackend>) -> TwoTierCache {
        let config = TwoTierCacheConfig {
            max_items: 10,
            ttl_seconds: 60,
            redis_url: Some("redis://localhost:6379".to_owned()),
        };
        TwoTierCache::new_with_backend(strategy, config, backend).expect("cache should construct")
    }

    /// Writes a raw L2 payload for `key` straight into the mock store.
    async fn seed_l2(backend: &MockBackend, key: &str, data: &str, cached_at: impl Display) {
        let payload = format!(r#"{{"data":"{data}","cached_at":{cached_at}}}"#);
        backend.store.lock().await.insert(key.to_owned(), payload);
    }

    #[tokio::test]
    async fn l2_write_payload_uses_seconds_and_snake_case() {
        let backend = Arc::new(MockBackend::default());
        let cache = cache_over(ResponseCacheStrategy::Redis, backend.clone());

        cache.set("redis-seconds", "value").await;

        let raw = backend
            .store
            .lock()
            .await
            .get("redis-seconds")
            .cloned()
            .expect("mock backend should have stored payload");
        let payload: Value = serde_json::from_str(&raw).expect("payload should be valid json");
        assert_eq!(payload["data"], "value");
        let cached_at = payload["cached_at"]
            .as_f64()
            .expect("cached_at should be seconds");
        assert!(cached_at > 1_000_000_000.0);
        assert!(cached_at < 1_000_000_000_000.0);
    }

    #[tokio::test]
    async fn l2_get_accepts_legacy_millisecond_timestamps() {
        let backend = Arc::new(MockBackend::default());
        let fresh_legacy_ms = (super::now_seconds_f64() as u64).saturating_sub(1) * 1000;
        seed_l2(&backend, "legacy-ms", "legacy-value", fresh_legacy_ms).await;
        let cache = cache_over(ResponseCacheStrategy::RedisOrMemory, backend);

        let hit = cache.get("legacy-ms").await.expect("l2 should hit");
        assert_eq!(hit.value, "legacy-value");
        assert_eq!(hit.tier, CacheHitTier::L2);
    }

    #[tokio::test]
    async fn third_consecutive_l2_write_failure_marks_backend_unhealthy() {
        let backend = Arc::new(MockBackend::default());
        *backend.fail_writes.lock().await = 3;
        let cache = cache_over(ResponseCacheStrategy::RedisOrMemory, backend);

        // Health must survive the first two failures and drop on the third.
        let steps = [
            ("key1", "value1", true),
            ("key2", "value2", true),
            ("key3", "value3", false),
        ];
        for (key, value, healthy_afterwards) in steps {
            cache.set(key, value).await;
            assert_eq!(cache.get_stats().await.l2_healthy, healthy_afterwards);
        }
    }

    #[tokio::test]
    async fn clear_removes_l2_entries_before_they_can_repopulate_l1() {
        let backend = Arc::new(MockBackend::default());
        let cache = cache_over(ResponseCacheStrategy::RedisOrMemory, backend.clone());
        let key = format!("{CACHE_KEY_PREFIX}clear-me");
        seed_l2(&backend, &key, "stale-value", super::now_seconds_f64()).await;

        let hit = cache.get(&key).await.expect("l2 should populate l1");
        assert_eq!(hit.tier, CacheHitTier::L2);

        cache.clear().await.expect("clear should remove l2 entries");

        assert!(cache.get(&key).await.is_none());
        assert!(!backend.store.lock().await.contains_key(&key));
    }

    #[tokio::test]
    async fn failed_clear_blocks_l2_reads_until_a_clear_succeeds() {
        let backend = Arc::new(MockBackend::default());
        let cache = cache_over(ResponseCacheStrategy::RedisOrMemory, backend.clone());
        let key = format!("{CACHE_KEY_PREFIX}stale-after-clear-failure");
        seed_l2(&backend, &key, "stale-value", super::now_seconds_f64()).await;
        *backend.fail_clears.lock().await = true;

        let error = cache
            .clear()
            .await
            .expect_err("clear failure should be surfaced");
        assert!(matches!(error, LlmixError::InvalidResponseCacheConfig(_)));
        assert!(cache.get(&key).await.is_none());

        *backend.fail_clears.lock().await = false;
        cache
            .clear()
            .await
            .expect("later clear should recover l2 reads");
        assert!(cache.get_stats().await.l2_healthy);
    }

    #[test]
    fn non_finite_temperatures_collapse_to_same_cache_key_as_absent() {
        let absent = CacheKeyParams {
            provider: "openai".to_owned(),
            model: "gpt-4.1".to_owned(),
            messages: vec![json!({"role": "user", "content": "Hello"})],
            base_url: None,
            enable_thinking: None,
            temperature: None,
            max_output_tokens: None,
            response_format: None,
            provider_options: None,
            seed: None,
            top_p: None,
            top_k: None,
            presence_penalty: None,
            frequency_penalty: None,
            stop_sequences: None,
        };
        let absent_key = generate_cache_key(&absent).expect("key should serialize");

        let non_finite_cases = [
            (f64::NAN, "nan should be omitted"),
            (f64::INFINITY, "inf should be omitted"),
        ];
        for (temperature, failure_message) in non_finite_cases {
            let params = CacheKeyParams {
                temperature: Some(temperature),
                ..absent.clone()
            };
            let key = generate_cache_key(&params).expect(failure_message);
            assert_eq!(absent_key, key);
        }

        assert!(absent_key.starts_with(CACHE_KEY_PREFIX));
    }

    #[test]
    fn strategy_helpers_match_contract() {
        let cached = [
            CachingStrategy::Redis,
            CachingStrategy::RedisOrMemory,
            CachingStrategy::Memory,
        ];
        for strategy in cached {
            assert!(super::is_response_cache_strategy(strategy));
        }
        assert!(!super::is_response_cache_strategy(
            CachingStrategy::Disabled
        ));

        let skipped = [
            CachingStrategy::Native,
            CachingStrategy::Gateway,
            CachingStrategy::Disabled,
        ];
        for strategy in skipped {
            assert!(super::should_skip_cache(strategy));
        }
        assert!(!super::should_skip_cache(CachingStrategy::Redis));
    }
}