1use std::sync::Arc;
7
8use tokio::sync::RwLock;
9use tracing::{debug, info, instrument};
10
11#[cfg(any(test, feature = "mock"))]
12use super::l2::L2Config;
13#[cfg(any(test, feature = "mock"))]
14use super::l2::MockStorageLoader;
15use super::l2::{
16 BqSearchBackend, L2CacheError, L2CacheResult, L2LookupResult, L2SemanticCache, StorageLoader,
17};
18
19use super::{L1CacheHandle, L1LookupResult, ReflexStatus};
20use crate::storage::mmap::MmapFileHandle;
21#[cfg(any(test, feature = "mock"))]
22use crate::vectordb::bq::MockBqClient;
23
24#[derive(Debug)]
25pub enum TieredLookupResult {
27 HitL1(L1LookupResult),
29 HitL2(L2LookupResult),
31 Miss,
33}
34
35impl TieredLookupResult {
36 pub fn status(&self) -> ReflexStatus {
38 match self {
39 TieredLookupResult::HitL1(result) => result.status(),
40 TieredLookupResult::HitL2(_) => ReflexStatus::HitL2Semantic,
41 TieredLookupResult::Miss => ReflexStatus::Miss,
42 }
43 }
44
45 pub fn is_hit(&self) -> bool {
47 !matches!(self, TieredLookupResult::Miss)
48 }
49
50 pub fn is_l1_hit(&self) -> bool {
52 matches!(self, TieredLookupResult::HitL1(_))
53 }
54
55 pub fn is_l2_hit(&self) -> bool {
57 matches!(self, TieredLookupResult::HitL2(_))
58 }
59}
60
61pub struct TieredCache<B: BqSearchBackend, S: StorageLoader> {
63 l1: L1CacheHandle,
64 l2: L2SemanticCache<B, S>,
65}
66
67impl<B: BqSearchBackend, S: StorageLoader> std::fmt::Debug for TieredCache<B, S> {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("TieredCache")
70 .field("l1", &self.l1)
71 .field("l2", &self.l2)
72 .finish()
73 }
74}
75
76impl<B: BqSearchBackend, S: StorageLoader> TieredCache<B, S> {
77 pub fn new(l1: L1CacheHandle, l2: L2SemanticCache<B, S>) -> Self {
79 Self { l1, l2 }
80 }
81
82 pub fn l1(&self) -> &L1CacheHandle {
84 &self.l1
85 }
86
87 pub fn l2(&self) -> &L2SemanticCache<B, S> {
89 &self.l2
90 }
91
92 #[instrument(skip(self, prompt), fields(prompt_len = prompt.len(), tenant_id = tenant_id))]
94 pub async fn lookup(&self, prompt: &str, tenant_id: u64) -> L2CacheResult<TieredLookupResult> {
95 self.lookup_with_semantic_query(prompt, prompt, tenant_id)
96 .await
97 }
98
99 #[instrument(skip(self, exact_key, semantic_query), fields(key_len = exact_key.len(), query_len = semantic_query.len(), tenant_id = tenant_id))]
101 pub async fn lookup_with_semantic_query(
102 &self,
103 exact_key: &str,
104 semantic_query: &str,
105 tenant_id: u64,
106 ) -> L2CacheResult<TieredLookupResult> {
107 debug!("Checking L1 cache");
108 let l1_key = format!("{}:{}", tenant_id, exact_key);
109 if let Some(result) = self.l1.lookup(&l1_key) {
110 info!("L1 cache hit");
111 return Ok(TieredLookupResult::HitL1(result));
112 }
113
114 debug!("L1 miss, checking L2 cache");
115
116 match self.l2.search(semantic_query, tenant_id).await {
117 Ok(result) => {
118 if result.has_candidates() {
119 info!(
120 candidates = result.candidates().len(),
121 best_score = result.best_candidate().map(|c| c.score),
122 "L2 cache hit"
123 );
124 Ok(TieredLookupResult::HitL2(result))
125 } else {
126 debug!("L2 returned no candidates");
127 Ok(TieredLookupResult::Miss)
128 }
129 }
130 Err(L2CacheError::NoCandidates) => {
131 debug!("L2 cache miss");
132 Ok(TieredLookupResult::Miss)
133 }
134 Err(e) => Err(e),
135 }
136 }
137
138 pub fn insert_l1(&self, prompt: &str, tenant_id: u64, handle: MmapFileHandle) -> [u8; 32] {
140 let l1_key = format!("{}:{}", tenant_id, prompt);
141 self.l1.insert(&l1_key, handle)
142 }
143
144 pub async fn index_l2(
146 &self,
147 prompt: &str,
148 tenant_id: u64,
149 context_hash: u64,
150 storage_key: &str,
151 timestamp: i64,
152 ) -> L2CacheResult<u64> {
153 self.l2
154 .index(prompt, tenant_id, context_hash, storage_key, timestamp)
155 .await
156 }
157
158 pub async fn insert_both(
160 &self,
161 prompt: &str,
162 tenant_id: u64,
163 context_hash: u64,
164 storage_key: &str,
165 timestamp: i64,
166 handle: MmapFileHandle,
167 ) -> L2CacheResult<([u8; 32], u64)> {
168 let l1_hash = self.insert_l1(prompt, tenant_id, handle);
169
170 let l2_point_id = self
171 .index_l2(prompt, tenant_id, context_hash, storage_key, timestamp)
172 .await?;
173
174 Ok((l1_hash, l2_point_id))
175 }
176
177 pub fn remove_l1(&self, prompt: &str, tenant_id: u64) -> Option<MmapFileHandle> {
179 let l1_key = format!("{}:{}", tenant_id, prompt);
180 self.l1.remove_prompt(&l1_key)
181 }
182
183 pub fn contains_l1(&self, prompt: &str, tenant_id: u64) -> bool {
185 let l1_key = format!("{}:{}", tenant_id, prompt);
186 self.l1.contains_prompt(&l1_key)
187 }
188
189 pub fn l1_len(&self) -> usize {
191 self.l1.len()
192 }
193
194 pub fn l1_is_empty(&self) -> bool {
196 self.l1.is_empty()
197 }
198
199 pub fn clear_l1(&self) {
201 self.l1.clear();
202 }
203
204 pub fn run_pending_tasks_l1(&self) {
206 self.l1.run_pending_tasks();
207 }
208
209 pub async fn is_ready(&self) -> bool {
211 self.l2.is_ready().await
212 }
213}
214
/// Tiered cache wired to the mock BQ backend and mock storage loader.
#[cfg(any(test, feature = "mock"))]
pub type MockTieredCache = TieredCache<MockBqClient, MockStorageLoader>;
218
#[cfg(any(test, feature = "mock"))]
impl TieredCache<MockBqClient, MockStorageLoader> {
    /// Builds a mock-backed tiered cache whose L2 uses `L2Config::default()`.
    ///
    /// # Errors
    /// Propagates any error from constructing the mock L2 cache.
    pub async fn new_mock() -> L2CacheResult<Self> {
        Self::new_mock_with_config(L2Config::default()).await
    }

    /// Builds a mock-backed tiered cache with a fresh L1 and an L2 mock built from `l2_config`.
    ///
    /// # Errors
    /// Propagates any error from constructing the mock L2 cache.
    pub async fn new_mock_with_config(l2_config: L2Config) -> L2CacheResult<Self> {
        let fresh_l1 = L1CacheHandle::new();
        let mock_l2 = L2SemanticCache::new_mock(l2_config).await?;
        Ok(Self::new(fresh_l1, mock_l2))
    }

    /// Returns the mock storage loader behind the L2 cache.
    pub fn mock_storage(&self) -> &MockStorageLoader {
        self.l2.storage()
    }

    /// Returns the mock BQ search backend behind the L2 cache.
    pub fn mock_bq_backend(&self) -> &MockBqClient {
        self.l2.bq_backend()
    }
}
245
246#[derive(Clone)]
247pub struct TieredCacheHandle<B: BqSearchBackend, S: StorageLoader> {
249 inner: Arc<RwLock<TieredCache<B, S>>>,
250}
251
252impl<B: BqSearchBackend, S: StorageLoader> TieredCacheHandle<B, S> {
253 pub fn new(cache: TieredCache<B, S>) -> Self {
255 Self {
256 inner: Arc::new(RwLock::new(cache)),
257 }
258 }
259
260 pub async fn lookup(&self, prompt: &str, tenant_id: u64) -> L2CacheResult<TieredLookupResult> {
262 self.inner.read().await.lookup(prompt, tenant_id).await
263 }
264
265 pub async fn lookup_with_semantic_query(
267 &self,
268 exact_key: &str,
269 semantic_query: &str,
270 tenant_id: u64,
271 ) -> L2CacheResult<TieredLookupResult> {
272 self.inner
273 .read()
274 .await
275 .lookup_with_semantic_query(exact_key, semantic_query, tenant_id)
276 .await
277 }
278
279 pub fn strong_count(&self) -> usize {
281 Arc::strong_count(&self.inner)
282 }
283}
284
285impl<B: BqSearchBackend, S: StorageLoader> std::fmt::Debug for TieredCacheHandle<B, S> {
286 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287 f.debug_struct("TieredCacheHandle")
288 .field("strong_count", &self.strong_count())
289 .finish()
290 }
291}