1use crate::canonical_json;
2use crate::error::{LlmixError, LlmixResult};
3use crate::types::{CacheHitTier, CachingStrategy, ResponseCacheStats, ResponseCacheStrategy};
4use async_trait::async_trait;
5use lru::LruCache;
6#[cfg(feature = "redis")]
7use redis::{aio::MultiplexedConnection, AsyncCommands, Client};
8use serde::{Deserialize, Serialize};
9use serde_json::{Map, Number, Value};
10use sha2::{Digest, Sha256};
11use std::collections::HashMap;
12use std::num::NonZeroUsize;
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15use tokio::sync::Mutex;
16
/// Prefix carried by every generated response-cache key; `TwoTierCache::clear`
/// removes L2 entries by scanning for this prefix.
pub const CACHE_KEY_PREFIX: &str = "llmix:resp:v2:";
/// L1 capacity used by `TwoTierCacheConfig::default()`.
const DEFAULT_L1_MAX: usize = 1_000;
/// Entry lifetime (one hour, in seconds) used by `TwoTierCacheConfig::default()`.
const DEFAULT_TTL_SECONDS: u64 = 60 * 60;
/// Names of the request fields that take part in the cache key, listed in the
/// order they are written into the canonical object (sorted by byte value).
const CACHE_KEY_FIELDS: [&str; 15] = [
    "baseUrl",
    "enableThinking",
    "frequencyPenalty",
    "maxOutputTokens",
    "messages",
    "model",
    "presencePenalty",
    "provider",
    "providerOptions",
    "responseFormat",
    "seed",
    "stopSequences",
    "temperature",
    "topK",
    "topP",
];
37
/// Reports whether a usable Redis URL was supplied.
///
/// Returns `false` for `None` and for strings that are empty or contain only
/// whitespace.
fn has_redis_url(redis_url: Option<&str>) -> bool {
    match redis_url {
        Some(url) => !url.trim().is_empty(),
        None => false,
    }
}
41
42#[derive(Debug, Clone, PartialEq, Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub struct CacheKeyParams {
45 pub provider: String,
46 pub model: String,
47 pub messages: Vec<Value>,
48 #[serde(default)]
49 pub base_url: Option<String>,
50 #[serde(default)]
51 pub enable_thinking: Option<bool>,
52 #[serde(default)]
53 pub temperature: Option<f64>,
54 #[serde(default)]
55 pub max_output_tokens: Option<u64>,
56 #[serde(default)]
57 pub response_format: Option<Value>,
58 #[serde(default)]
59 pub provider_options: Option<Value>,
60 #[serde(default)]
61 pub seed: Option<i64>,
62 #[serde(default)]
63 pub top_p: Option<f64>,
64 #[serde(default)]
65 pub top_k: Option<i64>,
66 #[serde(default)]
67 pub presence_penalty: Option<f64>,
68 #[serde(default)]
69 pub frequency_penalty: Option<f64>,
70 #[serde(default)]
71 pub stop_sequences: Option<Vec<String>>,
72}
73
74impl CacheKeyParams {
75 fn to_canonical_value(&self) -> Value {
76 let mut fields = HashMap::new();
77
78 fields.insert("provider", Value::String(self.provider.clone()));
79 fields.insert("model", Value::String(self.model.clone()));
80 fields.insert("messages", Value::Array(self.messages.clone()));
81
82 if let Some(base_url) = self.base_url.as_ref() {
83 fields.insert("baseUrl", Value::String(base_url.clone()));
84 }
85 if let Some(enable_thinking) = self.enable_thinking {
86 fields.insert("enableThinking", Value::Bool(enable_thinking));
87 }
88 if let Some(value) = self.max_output_tokens {
89 fields.insert("maxOutputTokens", Value::Number(Number::from(value)));
90 }
91 if let Some(value) = self
92 .response_format
93 .clone()
94 .filter(|value| !value.is_null())
95 {
96 fields.insert("responseFormat", value);
97 }
98 if let Some(value) = self
99 .provider_options
100 .clone()
101 .filter(|value| !value.is_null())
102 {
103 fields.insert("providerOptions", value);
104 }
105 if let Some(value) = self.seed {
106 fields.insert("seed", Value::Number(Number::from(value)));
107 }
108 if let Some(value) = finite_number(self.temperature) {
109 fields.insert("temperature", Value::Number(value));
110 }
111 if let Some(value) = finite_number(self.top_p) {
112 fields.insert("topP", Value::Number(value));
113 }
114 if let Some(value) = self.top_k {
115 fields.insert("topK", Value::Number(Number::from(value)));
116 }
117 if let Some(value) = finite_number(self.presence_penalty) {
118 fields.insert("presencePenalty", Value::Number(value));
119 }
120 if let Some(value) = finite_number(self.frequency_penalty) {
121 fields.insert("frequencyPenalty", Value::Number(value));
122 }
123 if let Some(stop_sequences) = self.stop_sequences.as_ref() {
124 let array = stop_sequences
125 .iter()
126 .map(|value| Value::String(value.clone()))
127 .collect::<Vec<_>>();
128 fields.insert("stopSequences", Value::Array(array));
129 }
130
131 let mut map = Map::new();
132 for field in CACHE_KEY_FIELDS {
133 if let Some(value) = fields.remove(field) {
134 map.insert(field.to_owned(), value);
135 }
136 }
137 Value::Object(map)
138 }
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct CacheResult {
143 pub value: String,
144 pub tier: CacheHitTier,
145}
146
/// An L1 entry: the payload plus the time it was cached.
#[derive(Clone, Debug, PartialEq)]
struct CachedValue {
    /// The cached payload.
    data: String,
    /// Unix timestamp in seconds at which the payload was cached.
    cached_at: f64,
}
152
/// Construction options for `TwoTierCache`.
#[derive(Clone, Debug)]
pub struct TwoTierCacheConfig {
    /// Maximum number of L1 entries; values below 1 are raised to 1.
    pub max_items: usize,
    /// Entry lifetime in seconds; values below 1 are raised to 1.
    pub ttl_seconds: u64,
    /// Redis connection URL; `None`, empty or whitespace-only means "no Redis".
    pub redis_url: Option<String>,
}
159
160impl Default for TwoTierCacheConfig {
161 fn default() -> Self {
162 Self {
163 max_items: DEFAULT_L1_MAX,
164 ttl_seconds: DEFAULT_TTL_SECONDS,
165 redis_url: None,
166 }
167 }
168}
169
170#[derive(Debug, Serialize)]
171struct RedisPayload<'a> {
172 data: &'a str,
173 cached_at: f64,
174}
175
176#[derive(Debug, Deserialize)]
177struct StoredPayload {
178 data: String,
179 #[serde(default, alias = "cachedAt")]
180 cached_at: Option<f64>,
181}
182
183#[async_trait]
184trait L2CacheBackend: Send + Sync {
185 async fn get(&self, key: &str) -> LlmixResult<Option<String>>;
186 async fn setex(&self, key: &str, ttl_seconds: u64, value: String) -> LlmixResult<()>;
187 async fn clear_prefix(&self, prefix: &str) -> LlmixResult<()>;
188 async fn close(&self) -> LlmixResult<()> {
189 Ok(())
190 }
191}
192
/// Redis-backed L2 tier that creates its connection on first use.
#[cfg(feature = "redis")]
struct RedisBackend {
    /// Client used to open a connection when none is cached.
    client: Client,
    /// Cached multiplexed connection; `None` until first use and after a reset.
    connection: Mutex<Option<MultiplexedConnection>>,
}
198
#[cfg(feature = "redis")]
impl RedisBackend {
    /// Creates a backend for `redis_url` without connecting yet.
    ///
    /// # Errors
    /// Returns `LlmixError::Redis` when `Client::open` rejects the URL.
    fn new(redis_url: &str) -> LlmixResult<Self> {
        match Client::open(redis_url) {
            Ok(client) => Ok(Self {
                client,
                connection: Mutex::new(None),
            }),
            Err(error) => Err(LlmixError::Redis(format!("invalid redis url: {error}"))),
        }
    }

    /// Returns a clone of the cached connection, opening and caching one first
    /// if none is present. The slot lock is held while connecting.
    ///
    /// # Errors
    /// Returns the mapped Redis error when a new connection cannot be opened.
    async fn connection(&self) -> LlmixResult<MultiplexedConnection> {
        let mut slot = self.connection.lock().await;
        if let Some(existing) = slot.as_ref().cloned() {
            return Ok(existing);
        }

        let fresh = match self.client.get_multiplexed_async_connection().await {
            Ok(connection) => connection,
            Err(error) => return Err(map_redis_error(error)),
        };
        *slot = Some(fresh.clone());
        Ok(fresh)
    }

    /// Drops the cached connection so the next call reconnects.
    async fn reset_connection(&self) {
        let mut slot = self.connection.lock().await;
        *slot = None;
    }
}
229
#[cfg(feature = "redis")]
#[async_trait]
impl L2CacheBackend for RedisBackend {
    /// Issues `GET key`. On a command error the cached connection is dropped
    /// before the error is returned.
    async fn get(&self, key: &str) -> LlmixResult<Option<String>> {
        let mut conn = self.connection().await?;
        let fetched: redis::RedisResult<Option<String>> = conn.get(key).await;
        if fetched.is_err() {
            self.reset_connection().await;
        }
        fetched.map_err(map_redis_error)
    }

    /// Stores `value` with an expiry of `ttl_seconds`. On a command error the
    /// cached connection is dropped before the error is returned.
    async fn setex(&self, key: &str, ttl_seconds: u64, value: String) -> LlmixResult<()> {
        let mut conn = self.connection().await?;
        let written: redis::RedisResult<()> = conn.set_ex(key, value, ttl_seconds).await;
        if written.is_err() {
            self.reset_connection().await;
        }
        written.map_err(map_redis_error)
    }

    /// Deletes every key matching `{prefix}*` by paging through `SCAN` and
    /// issuing one `DEL` per non-empty page. Any command error drops the cached
    /// connection and aborts the sweep.
    async fn clear_prefix(&self, prefix: &str) -> LlmixResult<()> {
        let mut conn = self.connection().await?;
        let pattern = format!("{prefix}*");
        let mut cursor: u64 = 0;

        loop {
            let scanned: redis::RedisResult<(u64, Vec<String>)> = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(1000_u32)
                .query_async(&mut conn)
                .await;
            let (next_cursor, keys) = match scanned {
                Ok(page) => page,
                Err(error) => {
                    self.reset_connection().await;
                    return Err(map_redis_error(error));
                }
            };

            if !keys.is_empty() {
                let deleted: redis::RedisResult<()> = redis::cmd("DEL")
                    .arg(&keys)
                    .query_async(&mut conn)
                    .await;
                if let Err(error) = deleted {
                    self.reset_connection().await;
                    return Err(map_redis_error(error));
                }
            }

            // A returned cursor of 0 means the scan has covered the keyspace.
            cursor = next_cursor;
            if cursor == 0 {
                return Ok(());
            }
        }
    }

    /// Drops the cached connection; always succeeds.
    async fn close(&self) -> LlmixResult<()> {
        self.reset_connection().await;
        Ok(())
    }
}
300
301pub struct TwoTierCache {
302 strategy: ResponseCacheStrategy,
303 ttl_seconds: u64,
304 l1_max: usize,
305 l1: Mutex<LruCache<String, CachedValue>>,
306 l2_enabled: bool,
307 l2_healthy: Mutex<bool>,
308 l2_reads_blocked: Mutex<bool>,
309 l2_consecutive_write_failures: Mutex<u32>,
310 backend: Option<Arc<dyn L2CacheBackend>>,
311}
312
313impl TwoTierCache {
314 pub fn new(strategy: ResponseCacheStrategy, config: TwoTierCacheConfig) -> LlmixResult<Self> {
315 let redis_requested = has_redis_url(config.redis_url.as_deref());
316 if strategy == ResponseCacheStrategy::Redis && !redis_requested {
317 return Err(LlmixError::InvalidResponseCacheConfig(
318 "TwoTierCache strategy \"redis\" requires config.redis_url to be set.".to_owned(),
319 ));
320 }
321
322 let max_items = NonZeroUsize::new(config.max_items.max(1)).expect("max(1) is non-zero");
323 #[allow(unused_mut)]
324 let mut backend: Option<Arc<dyn L2CacheBackend>> = None;
325
326 #[cfg(feature = "redis")]
327 if strategy != ResponseCacheStrategy::Memory && redis_requested {
328 let redis_url = config
329 .redis_url
330 .as_deref()
331 .expect("redis_requested implies redis_url is present");
332 backend = Some(Arc::new(RedisBackend::new(redis_url)?));
333 }
334
335 #[cfg(not(feature = "redis"))]
336 if strategy == ResponseCacheStrategy::Redis && redis_requested {
337 return Err(LlmixError::InvalidResponseCacheConfig(
338 "TwoTierCache strategy \"redis\" requires llmix-rs to be built with the `redis` feature.".to_owned(),
339 ));
340 }
341
342 let l2_enabled = strategy != ResponseCacheStrategy::Memory && backend.is_some();
343
344 Ok(Self {
345 strategy,
346 ttl_seconds: config.ttl_seconds.max(1),
347 l1_max: max_items.get(),
348 l1: Mutex::new(LruCache::new(max_items)),
349 l2_enabled,
350 l2_healthy: Mutex::new(true),
351 l2_reads_blocked: Mutex::new(false),
352 l2_consecutive_write_failures: Mutex::new(0),
353 backend,
354 })
355 }
356
357 #[cfg(test)]
358 fn new_with_backend(
359 strategy: ResponseCacheStrategy,
360 config: TwoTierCacheConfig,
361 backend: Arc<dyn L2CacheBackend>,
362 ) -> LlmixResult<Self> {
363 let max_items = NonZeroUsize::new(config.max_items.max(1)).expect("max(1) is non-zero");
364 Ok(Self {
365 strategy,
366 ttl_seconds: config.ttl_seconds.max(1),
367 l1_max: max_items.get(),
368 l1: Mutex::new(LruCache::new(max_items)),
369 l2_enabled: strategy != ResponseCacheStrategy::Memory,
370 l2_healthy: Mutex::new(true),
371 l2_reads_blocked: Mutex::new(false),
372 l2_consecutive_write_failures: Mutex::new(0),
373 backend: Some(backend),
374 })
375 }
376
377 pub async fn get(&self, key: &str) -> Option<CacheResult> {
378 if let Some(hit) = self.get_l1(key).await {
379 return Some(CacheResult {
380 value: hit.data,
381 tier: CacheHitTier::L1,
382 });
383 }
384
385 if !self.l2_enabled {
386 return None;
387 }
388 if *self.l2_reads_blocked.lock().await {
389 return None;
390 }
391
392 let backend = self.backend.as_ref()?;
393 let raw = match backend.get(key).await {
394 Ok(value) => value?,
395 Err(_) => return None,
396 };
397 let parsed = serde_json::from_str::<StoredPayload>(&raw).ok()?;
398 let cached_at = normalize_cached_at_seconds(parsed.cached_at);
399 if cache_age_seconds(cached_at) >= self.ttl_seconds as f64 {
400 return None;
401 }
402
403 let entry = CachedValue {
404 data: parsed.data,
405 cached_at,
406 };
407 self.put_l1(key, entry.clone()).await;
408
409 Some(CacheResult {
410 value: entry.data,
411 tier: CacheHitTier::L2,
412 })
413 }
414
415 pub async fn set(&self, key: &str, value: &str) {
416 let entry = CachedValue {
417 data: value.to_owned(),
418 cached_at: now_seconds_f64(),
419 };
420 self.put_l1(key, entry.clone()).await;
421
422 if !self.l2_enabled {
423 return;
424 }
425
426 let Some(backend) = self.backend.as_ref() else {
427 return;
428 };
429
430 let ttl = self.ttl_seconds;
432 let payload = serde_json::to_string(&RedisPayload {
433 data: &entry.data,
434 cached_at: entry.cached_at,
435 });
436 let Ok(payload) = payload else {
437 return;
438 };
439
440 match backend.setex(key, ttl, payload).await {
441 Ok(()) => {
442 *self.l2_consecutive_write_failures.lock().await = 0;
443 *self.l2_healthy.lock().await = true;
444 }
445 Err(_) => {
446 let mut failures = self.l2_consecutive_write_failures.lock().await;
447 *failures += 1;
448 if *failures >= 3 {
449 *self.l2_healthy.lock().await = false;
450 }
451 }
452 }
453 }
454
455 pub async fn clear(&self) -> LlmixResult<()> {
456 self.l1.lock().await.clear();
457 if !self.l2_enabled {
458 return Ok(());
459 }
460 let Some(backend) = self.backend.as_ref() else {
461 return Ok(());
462 };
463 match backend.clear_prefix(CACHE_KEY_PREFIX).await {
464 Ok(()) => {
465 *self.l2_reads_blocked.lock().await = false;
466 *self.l2_healthy.lock().await = true;
467 Ok(())
468 }
469 Err(error) => {
470 *self.l2_reads_blocked.lock().await = true;
471 *self.l2_healthy.lock().await = false;
472 Err(error)
473 }
474 }
475 }
476
477 pub async fn close(&self) {
478 if let Some(backend) = self.backend.as_ref() {
479 let _ = backend.close().await;
480 }
481 }
482
483 pub async fn get_stats(&self) -> ResponseCacheStats {
484 let l1_size = self.l1.lock().await.len();
485 ResponseCacheStats {
486 l1_size,
487 l1_max: self.l1_max,
488 l2_enabled: self.l2_enabled,
489 l2_healthy: *self.l2_healthy.lock().await,
490 strategy: self.strategy,
491 }
492 }
493
494 async fn get_l1(&self, key: &str) -> Option<CachedValue> {
495 let mut l1 = self.l1.lock().await;
496 let cached = l1.get(key).cloned()?;
497 if cache_age_seconds(cached.cached_at) >= self.ttl_seconds as f64 {
498 l1.pop(key);
499 return None;
500 }
501 Some(cached)
502 }
503
504 async fn put_l1(&self, key: &str, value: CachedValue) {
505 self.l1.lock().await.put(key.to_owned(), value);
506 }
507}
508
509pub fn is_response_cache_strategy(strategy: CachingStrategy) -> bool {
510 matches!(
511 strategy,
512 CachingStrategy::Redis | CachingStrategy::RedisOrMemory | CachingStrategy::Memory
513 )
514}
515
516pub fn should_skip_cache(strategy: CachingStrategy) -> bool {
517 matches!(
518 strategy,
519 CachingStrategy::Native | CachingStrategy::Gateway | CachingStrategy::Disabled
520 )
521}
522
523pub fn resolve_response_cache_strategy(
524 strategy: CachingStrategy,
525 redis_url: Option<&str>,
526) -> LlmixResult<Option<ResponseCacheStrategy>> {
527 if !is_response_cache_strategy(strategy) {
528 return Ok(None);
529 }
530
531 match strategy {
532 CachingStrategy::Redis => {
533 if !has_redis_url(redis_url) {
534 return Err(LlmixError::InvalidResponseCacheConfig(
535 "Response cache strategy \"redis\" requires REDIS_URL to be set.".to_owned(),
536 ));
537 }
538 Ok(Some(ResponseCacheStrategy::Redis))
539 }
540 CachingStrategy::RedisOrMemory => {
541 if !has_redis_url(redis_url) {
542 return Ok(Some(ResponseCacheStrategy::Memory));
543 }
544 Ok(Some(ResponseCacheStrategy::RedisOrMemory))
545 }
546 CachingStrategy::Memory => Ok(Some(ResponseCacheStrategy::Memory)),
547 _ => Ok(None),
548 }
549}
550
551pub fn generate_cache_key(params: &CacheKeyParams) -> LlmixResult<String> {
552 let canonical = params.to_canonical_value();
553 let json = canonical_json::to_string(&canonical)?;
554 let mut hasher = Sha256::new();
555 hasher.update(json.as_bytes());
556 let digest = format!("{:x}", hasher.finalize());
557 Ok(format!("{CACHE_KEY_PREFIX}{digest}"))
558}
559
560fn finite_number(value: Option<f64>) -> Option<Number> {
561 let value = value?;
562 canonical_number_from_f64(value)
563}
564
565fn canonical_number_from_f64(value: f64) -> Option<Number> {
566 if !value.is_finite() {
567 return None;
568 }
569
570 if value == 0.0 {
571 return Some(Number::from(0));
572 }
573
574 if value.fract() == 0.0 {
575 if value.is_sign_positive() && value <= u64::MAX as f64 {
576 let integer = value as u64;
577 if integer as f64 == value {
578 return Some(Number::from(integer));
579 }
580 }
581
582 if value >= i64::MIN as f64 && value <= i64::MAX as f64 {
583 let integer = value as i64;
584 if integer as f64 == value {
585 return Some(Number::from(integer));
586 }
587 }
588 }
589
590 Number::from_f64(value)
591}
592
593fn normalize_cached_at_seconds(raw: Option<f64>) -> f64 {
594 let Some(raw) = raw else {
595 return now_seconds_f64();
596 };
597 if !raw.is_finite() || raw <= 0.0 {
598 return now_seconds_f64();
599 }
600 if raw > 1_000_000_000_000.0 {
601 return raw / 1000.0;
602 }
603 raw
604}
605
606fn cache_age_seconds(cached_at: f64) -> f64 {
607 let age = now_seconds_f64() - cached_at;
608 if age.is_sign_negative() {
609 0.0
610 } else {
611 age
612 }
613}
614
/// Returns the current wall-clock time as fractional seconds since the Unix
/// epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn now_seconds_f64() -> f64 {
    let now = SystemTime::now();
    let since_epoch = now
        .duration_since(UNIX_EPOCH)
        .expect("system clock should be after unix epoch");
    since_epoch.as_secs_f64()
}
621
/// Wraps a Redis client error in `LlmixError::Redis`, keeping only its
/// display text.
#[cfg(feature = "redis")]
fn map_redis_error(error: redis::RedisError) -> LlmixError {
    let message = error.to_string();
    LlmixError::Redis(message)
}
626
#[cfg(test)]
mod tests {
    use super::{
        generate_cache_key, CacheKeyParams, L2CacheBackend, ResponseCacheStrategy, TwoTierCache,
        TwoTierCacheConfig, CACHE_KEY_PREFIX,
    };
    use crate::error::{LlmixError, LlmixResult};
    use crate::types::{CacheHitTier, CachingStrategy};
    use async_trait::async_trait;
    use serde_json::{json, Value};
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// In-memory L2 stand-in whose writes and clears can be made to fail.
    #[derive(Default)]
    struct MockBackend {
        /// Stored raw payloads by key.
        store: Mutex<HashMap<String, String>>,
        /// Number of upcoming `setex` calls that should fail.
        fail_writes: Mutex<u32>,
        /// When `true`, `clear_prefix` fails.
        fail_clears: Mutex<bool>,
    }

    #[async_trait]
    impl L2CacheBackend for MockBackend {
        async fn get(&self, key: &str) -> LlmixResult<Option<String>> {
            Ok(self.store.lock().await.get(key).cloned())
        }

        async fn setex(&self, key: &str, _ttl_seconds: u64, value: String) -> LlmixResult<()> {
            let mut remaining_failures = self.fail_writes.lock().await;
            if *remaining_failures > 0 {
                *remaining_failures -= 1;
                return Err(LlmixError::InvalidResponseCacheConfig(
                    "simulated write failure".to_owned(),
                ));
            }
            self.store.lock().await.insert(key.to_owned(), value);
            Ok(())
        }

        async fn clear_prefix(&self, prefix: &str) -> LlmixResult<()> {
            if *self.fail_clears.lock().await {
                return Err(LlmixError::InvalidResponseCacheConfig(
                    "simulated clear failure".to_owned(),
                ));
            }
            self.store
                .lock()
                .await
                .retain(|key, _| !key.starts_with(prefix));
            Ok(())
        }
    }

    /// Builds a cache over `backend` with a 10-entry L1 and a 60 second TTL.
    fn cache_with_backend(
        strategy: ResponseCacheStrategy,
        backend: Arc<MockBackend>,
    ) -> TwoTierCache {
        TwoTierCache::new_with_backend(
            strategy,
            TwoTierCacheConfig {
                max_items: 10,
                ttl_seconds: 60,
                redis_url: Some("redis://localhost:6379".to_owned()),
            },
            backend,
        )
        .expect("cache should construct")
    }

    /// Stored L2 document holding `"stale-value"` stamped with the current time.
    fn fresh_stale_value_payload() -> String {
        format!(
            r#"{{"data":"stale-value","cached_at":{}}}"#,
            super::now_seconds_f64()
        )
    }

    #[tokio::test]
    async fn l2_write_payload_uses_seconds_and_snake_case() {
        let backend = Arc::new(MockBackend::default());
        let cache = cache_with_backend(ResponseCacheStrategy::Redis, backend.clone());

        cache.set("redis-seconds", "value").await;

        let raw = backend
            .store
            .lock()
            .await
            .get("redis-seconds")
            .cloned()
            .expect("mock backend should have stored payload");
        let payload: Value = serde_json::from_str(&raw).expect("payload should be valid json");
        assert_eq!(payload["data"], "value");
        let cached_at = payload["cached_at"]
            .as_f64()
            .expect("cached_at should be seconds");
        assert!(cached_at > 1_000_000_000.0);
        assert!(cached_at < 1_000_000_000_000.0);
    }

    #[tokio::test]
    async fn l2_get_accepts_legacy_millisecond_timestamps() {
        let backend = Arc::new(MockBackend::default());
        let fresh_legacy_ms = (super::now_seconds_f64() as u64).saturating_sub(1) * 1000;
        backend.store.lock().await.insert(
            "legacy-ms".to_owned(),
            format!(
                r#"{{"data":"legacy-value","cached_at":{}}}"#,
                fresh_legacy_ms
            ),
        );
        let cache = cache_with_backend(ResponseCacheStrategy::RedisOrMemory, backend);

        let hit = cache.get("legacy-ms").await.expect("l2 should hit");
        assert_eq!(hit.value, "legacy-value");
        assert_eq!(hit.tier, CacheHitTier::L2);
    }

    #[tokio::test]
    async fn third_consecutive_l2_write_failure_marks_backend_unhealthy() {
        let backend = Arc::new(MockBackend::default());
        *backend.fail_writes.lock().await = 3;
        let cache = cache_with_backend(ResponseCacheStrategy::RedisOrMemory, backend);

        cache.set("key1", "value1").await;
        assert!(cache.get_stats().await.l2_healthy);
        cache.set("key2", "value2").await;
        assert!(cache.get_stats().await.l2_healthy);
        cache.set("key3", "value3").await;
        assert!(!cache.get_stats().await.l2_healthy);
    }

    #[tokio::test]
    async fn successful_l2_write_after_failures_restores_health() {
        let backend = Arc::new(MockBackend::default());
        *backend.fail_writes.lock().await = 3;
        let cache = cache_with_backend(ResponseCacheStrategy::RedisOrMemory, backend.clone());

        for key in ["key1", "key2", "key3"] {
            cache.set(key, "value").await;
        }
        assert!(!cache.get_stats().await.l2_healthy);

        // The mock has used up its simulated failures, so this write lands.
        cache.set("key4", "value4").await;
        assert!(cache.get_stats().await.l2_healthy);
        assert!(backend.store.lock().await.contains_key("key4"));
    }

    #[tokio::test]
    async fn clear_removes_l2_entries_before_they_can_repopulate_l1() {
        let backend = Arc::new(MockBackend::default());
        let cache = cache_with_backend(ResponseCacheStrategy::RedisOrMemory, backend.clone());
        let key = format!("{CACHE_KEY_PREFIX}clear-me");
        backend
            .store
            .lock()
            .await
            .insert(key.clone(), fresh_stale_value_payload());

        let hit = cache.get(&key).await.expect("l2 should populate l1");
        assert_eq!(hit.tier, CacheHitTier::L2);

        cache.clear().await.expect("clear should remove l2 entries");

        assert!(cache.get(&key).await.is_none());
        assert!(!backend.store.lock().await.contains_key(&key));
    }

    #[tokio::test]
    async fn failed_clear_blocks_l2_reads_until_a_clear_succeeds() {
        let backend = Arc::new(MockBackend::default());
        let cache = cache_with_backend(ResponseCacheStrategy::RedisOrMemory, backend.clone());
        let key = format!("{CACHE_KEY_PREFIX}stale-after-clear-failure");
        backend
            .store
            .lock()
            .await
            .insert(key.clone(), fresh_stale_value_payload());
        *backend.fail_clears.lock().await = true;

        let error = cache
            .clear()
            .await
            .expect_err("clear failure should be surfaced");
        assert!(matches!(error, LlmixError::InvalidResponseCacheConfig(_)));
        assert!(cache.get(&key).await.is_none());
        assert!(!cache.get_stats().await.l2_healthy);

        *backend.fail_clears.lock().await = false;
        cache
            .clear()
            .await
            .expect("later clear should recover l2 reads");
        assert!(cache.get_stats().await.l2_healthy);

        // Prove that reads are really unblocked: an entry that appears in L2
        // after the successful clear must be served again.
        backend
            .store
            .lock()
            .await
            .insert(key.clone(), fresh_stale_value_payload());
        let hit = cache
            .get(&key)
            .await
            .expect("l2 reads should resume after a successful clear");
        assert_eq!(hit.value, "stale-value");
        assert_eq!(hit.tier, CacheHitTier::L2);
    }

    #[test]
    fn non_finite_temperatures_collapse_to_same_cache_key_as_absent() {
        let absent = CacheKeyParams {
            provider: "openai".to_owned(),
            model: "gpt-4.1".to_owned(),
            messages: vec![json!({"role": "user", "content": "Hello"})],
            base_url: None,
            enable_thinking: None,
            temperature: None,
            max_output_tokens: None,
            response_format: None,
            provider_options: None,
            seed: None,
            top_p: None,
            top_k: None,
            presence_penalty: None,
            frequency_penalty: None,
            stop_sequences: None,
        };
        let nan = CacheKeyParams {
            temperature: Some(f64::NAN),
            ..absent.clone()
        };
        let inf = CacheKeyParams {
            temperature: Some(f64::INFINITY),
            ..absent.clone()
        };

        let absent_key = generate_cache_key(&absent).expect("key should serialize");
        let nan_key = generate_cache_key(&nan).expect("nan should be omitted");
        let inf_key = generate_cache_key(&inf).expect("inf should be omitted");

        assert_eq!(absent_key, nan_key);
        assert_eq!(absent_key, inf_key);
        assert!(absent_key.starts_with(CACHE_KEY_PREFIX));
    }

    #[test]
    fn strategy_helpers_match_contract() {
        assert!(super::is_response_cache_strategy(CachingStrategy::Redis));
        assert!(super::is_response_cache_strategy(
            CachingStrategy::RedisOrMemory
        ));
        assert!(super::is_response_cache_strategy(CachingStrategy::Memory));
        assert!(!super::is_response_cache_strategy(
            CachingStrategy::Disabled
        ));

        assert!(super::should_skip_cache(CachingStrategy::Native));
        assert!(super::should_skip_cache(CachingStrategy::Gateway));
        assert!(super::should_skip_cache(CachingStrategy::Disabled));
        assert!(!super::should_skip_cache(CachingStrategy::Redis));
    }
}