reflex/cache/
tiered.rs

1//! Tiered cache: L1 exact + L2 semantic.
2//!
3//! Use [`TieredCache::lookup`] for the default flow, or
4//! [`TieredCache::lookup_with_semantic_query`] when the exact key and semantic query differ.
5
6use std::sync::Arc;
7
8use tokio::sync::RwLock;
9use tracing::{debug, info, instrument};
10
11#[cfg(any(test, feature = "mock"))]
12use super::l2::L2Config;
13#[cfg(any(test, feature = "mock"))]
14use super::l2::MockStorageLoader;
15use super::l2::{
16    BqSearchBackend, L2CacheError, L2CacheResult, L2LookupResult, L2SemanticCache, StorageLoader,
17};
18
19use super::{L1CacheHandle, L1LookupResult, ReflexStatus};
20use crate::storage::mmap::MmapFileHandle;
21#[cfg(any(test, feature = "mock"))]
22use crate::vectordb::bq::MockBqClient;
23
24#[derive(Debug)]
25/// Result of a tiered cache lookup.
26pub enum TieredLookupResult {
27    /// Exact match in L1.
28    HitL1(L1LookupResult),
29    /// Semantic match candidates from L2.
30    HitL2(L2LookupResult),
31    /// No match in any tier.
32    Miss,
33}
34
35impl TieredLookupResult {
36    /// Returns the [`ReflexStatus`] for this result.
37    pub fn status(&self) -> ReflexStatus {
38        match self {
39            TieredLookupResult::HitL1(result) => result.status(),
40            TieredLookupResult::HitL2(_) => ReflexStatus::HitL2Semantic,
41            TieredLookupResult::Miss => ReflexStatus::Miss,
42        }
43    }
44
45    /// Returns `true` if this is not a miss.
46    pub fn is_hit(&self) -> bool {
47        !matches!(self, TieredLookupResult::Miss)
48    }
49
50    /// Returns `true` if this is an L1 hit.
51    pub fn is_l1_hit(&self) -> bool {
52        matches!(self, TieredLookupResult::HitL1(_))
53    }
54
55    /// Returns `true` if this is an L2 hit.
56    pub fn is_l2_hit(&self) -> bool {
57        matches!(self, TieredLookupResult::HitL2(_))
58    }
59}
60
61/// Two-tier cache combining L1 (exact) and L2 (semantic).
62pub struct TieredCache<B: BqSearchBackend, S: StorageLoader> {
63    l1: L1CacheHandle,
64    l2: L2SemanticCache<B, S>,
65}
66
67impl<B: BqSearchBackend, S: StorageLoader> std::fmt::Debug for TieredCache<B, S> {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("TieredCache")
70            .field("l1", &self.l1)
71            .field("l2", &self.l2)
72            .finish()
73    }
74}
75
76impl<B: BqSearchBackend, S: StorageLoader> TieredCache<B, S> {
77    /// Creates a tiered cache from an L1 handle and an L2 cache.
78    pub fn new(l1: L1CacheHandle, l2: L2SemanticCache<B, S>) -> Self {
79        Self { l1, l2 }
80    }
81
82    /// Returns the L1 handle.
83    pub fn l1(&self) -> &L1CacheHandle {
84        &self.l1
85    }
86
87    /// Returns the L2 cache.
88    pub fn l2(&self) -> &L2SemanticCache<B, S> {
89        &self.l2
90    }
91
92    /// Looks up `prompt` in L1 (exact) then L2 (semantic).
93    #[instrument(skip(self, prompt), fields(prompt_len = prompt.len(), tenant_id = tenant_id))]
94    pub async fn lookup(&self, prompt: &str, tenant_id: u64) -> L2CacheResult<TieredLookupResult> {
95        self.lookup_with_semantic_query(prompt, prompt, tenant_id)
96            .await
97    }
98
99    /// Looks up with separate exact key (L1) and semantic query (L2).
100    #[instrument(skip(self, exact_key, semantic_query), fields(key_len = exact_key.len(), query_len = semantic_query.len(), tenant_id = tenant_id))]
101    pub async fn lookup_with_semantic_query(
102        &self,
103        exact_key: &str,
104        semantic_query: &str,
105        tenant_id: u64,
106    ) -> L2CacheResult<TieredLookupResult> {
107        debug!("Checking L1 cache");
108        let l1_key = format!("{}:{}", tenant_id, exact_key);
109        if let Some(result) = self.l1.lookup(&l1_key) {
110            info!("L1 cache hit");
111            return Ok(TieredLookupResult::HitL1(result));
112        }
113
114        debug!("L1 miss, checking L2 cache");
115
116        match self.l2.search(semantic_query, tenant_id).await {
117            Ok(result) => {
118                if result.has_candidates() {
119                    info!(
120                        candidates = result.candidates().len(),
121                        best_score = result.best_candidate().map(|c| c.score),
122                        "L2 cache hit"
123                    );
124                    Ok(TieredLookupResult::HitL2(result))
125                } else {
126                    debug!("L2 returned no candidates");
127                    Ok(TieredLookupResult::Miss)
128                }
129            }
130            Err(L2CacheError::NoCandidates) => {
131                debug!("L2 cache miss");
132                Ok(TieredLookupResult::Miss)
133            }
134            Err(e) => Err(e),
135        }
136    }
137
138    /// Inserts an entry into L1 only (tenant-scoped).
139    pub fn insert_l1(&self, prompt: &str, tenant_id: u64, handle: MmapFileHandle) -> [u8; 32] {
140        let l1_key = format!("{}:{}", tenant_id, prompt);
141        self.l1.insert(&l1_key, handle)
142    }
143
144    /// Indexes an entry into L2 only.
145    pub async fn index_l2(
146        &self,
147        prompt: &str,
148        tenant_id: u64,
149        context_hash: u64,
150        storage_key: &str,
151        timestamp: i64,
152    ) -> L2CacheResult<u64> {
153        self.l2
154            .index(prompt, tenant_id, context_hash, storage_key, timestamp)
155            .await
156    }
157
158    /// Inserts into L1 and indexes into L2.
159    pub async fn insert_both(
160        &self,
161        prompt: &str,
162        tenant_id: u64,
163        context_hash: u64,
164        storage_key: &str,
165        timestamp: i64,
166        handle: MmapFileHandle,
167    ) -> L2CacheResult<([u8; 32], u64)> {
168        let l1_hash = self.insert_l1(prompt, tenant_id, handle);
169
170        let l2_point_id = self
171            .index_l2(prompt, tenant_id, context_hash, storage_key, timestamp)
172            .await?;
173
174        Ok((l1_hash, l2_point_id))
175    }
176
177    /// Removes an entry from L1.
178    pub fn remove_l1(&self, prompt: &str, tenant_id: u64) -> Option<MmapFileHandle> {
179        let l1_key = format!("{}:{}", tenant_id, prompt);
180        self.l1.remove_prompt(&l1_key)
181    }
182
183    /// Returns `true` if L1 contains a key for this tenant+prompt.
184    pub fn contains_l1(&self, prompt: &str, tenant_id: u64) -> bool {
185        let l1_key = format!("{}:{}", tenant_id, prompt);
186        self.l1.contains_prompt(&l1_key)
187    }
188
189    /// Returns the L1 entry count.
190    pub fn l1_len(&self) -> usize {
191        self.l1.len()
192    }
193
194    /// Returns `true` if L1 is empty.
195    pub fn l1_is_empty(&self) -> bool {
196        self.l1.is_empty()
197    }
198
199    /// Clears all L1 entries.
200    pub fn clear_l1(&self) {
201        self.l1.clear();
202    }
203
204    /// Runs any pending L1 maintenance tasks.
205    pub fn run_pending_tasks_l1(&self) {
206        self.l1.run_pending_tasks();
207    }
208
209    /// Returns `true` if the L2 backend reports readiness.
210    pub async fn is_ready(&self) -> bool {
211        self.l2.is_ready().await
212    }
213}
214
/// [`TieredCache`] specialised to the mock BQ client and the mock storage loader.
#[cfg(any(test, feature = "mock"))]
pub type MockTieredCache = TieredCache<MockBqClient, MockStorageLoader>;
218
#[cfg(any(test, feature = "mock"))]
impl TieredCache<MockBqClient, MockStorageLoader> {
    /// Builds a mock-backed cache using `L2Config::default()` for the L2 tier.
    ///
    /// # Errors
    ///
    /// Propagates any error from constructing the mock L2 cache.
    pub async fn new_mock() -> L2CacheResult<Self> {
        Self::new_mock_with_config(L2Config::default()).await
    }

    /// Builds a mock-backed cache whose L2 tier uses `l2_config`.
    ///
    /// # Errors
    ///
    /// Propagates any error from constructing the mock L2 cache.
    pub async fn new_mock_with_config(l2_config: L2Config) -> L2CacheResult<Self> {
        let exact_tier = L1CacheHandle::new();
        let semantic_tier = L2SemanticCache::new_mock(l2_config).await?;
        Ok(Self::new(exact_tier, semantic_tier))
    }

    /// Borrows the mock storage loader owned by the L2 tier.
    pub fn mock_storage(&self) -> &MockStorageLoader {
        self.l2.storage()
    }

    /// Borrows the mock BQ backend owned by the L2 tier.
    pub fn mock_bq_backend(&self) -> &MockBqClient {
        self.l2.bq_backend()
    }
}
245
246#[derive(Clone)]
247/// Shared handle to a [`TieredCache`].
248pub struct TieredCacheHandle<B: BqSearchBackend, S: StorageLoader> {
249    inner: Arc<RwLock<TieredCache<B, S>>>,
250}
251
252impl<B: BqSearchBackend, S: StorageLoader> TieredCacheHandle<B, S> {
253    /// Wraps a cache in an `Arc<RwLock<...>>` for shared async access.
254    pub fn new(cache: TieredCache<B, S>) -> Self {
255        Self {
256            inner: Arc::new(RwLock::new(cache)),
257        }
258    }
259
260    /// Delegates to [`TieredCache::lookup`].
261    pub async fn lookup(&self, prompt: &str, tenant_id: u64) -> L2CacheResult<TieredLookupResult> {
262        self.inner.read().await.lookup(prompt, tenant_id).await
263    }
264
265    /// Delegates to [`TieredCache::lookup_with_semantic_query`].
266    pub async fn lookup_with_semantic_query(
267        &self,
268        exact_key: &str,
269        semantic_query: &str,
270        tenant_id: u64,
271    ) -> L2CacheResult<TieredLookupResult> {
272        self.inner
273            .read()
274            .await
275            .lookup_with_semantic_query(exact_key, semantic_query, tenant_id)
276            .await
277    }
278
279    /// Returns the number of strong references to the underlying handle.
280    pub fn strong_count(&self) -> usize {
281        Arc::strong_count(&self.inner)
282    }
283}
284
285impl<B: BqSearchBackend, S: StorageLoader> std::fmt::Debug for TieredCacheHandle<B, S> {
286    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287        f.debug_struct("TieredCacheHandle")
288            .field("strong_count", &self.strong_count())
289            .finish()
290    }
291}