// do_memory_storage_turso/cache/wrapper.rs

use super::config::{CacheConfig, CacheStats};
use crate::TursoStorage;
use async_trait::async_trait;
use do_memory_core::memory::attribution::{
    RecommendationFeedback, RecommendationSession, RecommendationStats,
};
use do_memory_core::{
    Episode, Error, Heuristic, Pattern, Result, StorageBackend, episode::PatternId,
};
use do_memory_storage_redb::{AdaptiveCache, AdaptiveCacheConfig};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use uuid::Uuid;
16
/// Caching wrapper around [`TursoStorage`].
///
/// Reads consult an in-memory [`AdaptiveCache`] before hitting storage;
/// writes go through to storage and invalidate the corresponding cache
/// entry. Each cache is `None` when disabled via [`CacheConfig`].
pub struct CachedTursoStorage {
    /// Underlying durable storage backend (shared, never mutated directly).
    storage: Arc<TursoStorage>,

    /// Episode cache; `None` when episode caching is disabled.
    episode_cache: Option<AdaptiveCache<Episode>>,

    /// Pattern cache; `None` when pattern caching is disabled.
    pattern_cache: Option<AdaptiveCache<Pattern>>,

    /// Heuristic cache; built from the pattern-cache settings (see `new`).
    heuristic_cache: Option<AdaptiveCache<Heuristic>>,

    /// Configuration this wrapper was constructed with.
    config: CacheConfig,

    /// Lock-free hit/miss counters for `stats()`.
    stats: CacheStatsInner,
}
40
/// Internal hit/miss counters, updated with relaxed atomics (counters only —
/// no ordering is needed beyond eventual visibility for reporting).
#[derive(Default)]
struct CacheStatsInner {
    episode_hits: AtomicU64,
    episode_misses: AtomicU64,
    pattern_hits: AtomicU64,
    pattern_misses: AtomicU64,
    heuristic_hits: AtomicU64,
    heuristic_misses: AtomicU64,
}
51
52impl CachedTursoStorage {
53 pub fn new(storage: TursoStorage, config: CacheConfig) -> Self {
55 let episode_cache = if config.enable_episode_cache {
57 let cache_config = AdaptiveCacheConfig {
58 max_size: config.max_episodes,
59 default_ttl: config.episode_ttl,
60 min_ttl: config.min_ttl,
61 max_ttl: config.max_ttl,
62 hot_threshold: config.hot_threshold,
63 cold_threshold: config.cold_threshold,
64 adaptation_rate: config.adaptation_rate,
65 window_size: 20,
66 cleanup_interval_secs: config.cleanup_interval_secs,
67 enable_background_cleanup: config.enable_background_cleanup,
68 };
69 Some(AdaptiveCache::new(cache_config))
70 } else {
71 None
72 };
73
74 let pattern_cache = if config.enable_pattern_cache {
76 let cache_config = AdaptiveCacheConfig {
77 max_size: config.max_patterns,
78 default_ttl: config.pattern_ttl,
79 min_ttl: config.min_ttl,
80 max_ttl: config.max_ttl,
81 hot_threshold: config.hot_threshold,
82 cold_threshold: config.cold_threshold,
83 adaptation_rate: config.adaptation_rate,
84 window_size: 20,
85 cleanup_interval_secs: config.cleanup_interval_secs,
86 enable_background_cleanup: config.enable_background_cleanup,
87 };
88 Some(AdaptiveCache::new(cache_config))
89 } else {
90 None
91 };
92
93 let heuristic_cache = if config.enable_pattern_cache {
95 let cache_config = AdaptiveCacheConfig {
96 max_size: config.max_patterns / 2, default_ttl: config.pattern_ttl,
98 min_ttl: config.min_ttl,
99 max_ttl: config.max_ttl,
100 hot_threshold: config.hot_threshold,
101 cold_threshold: config.cold_threshold,
102 adaptation_rate: config.adaptation_rate,
103 window_size: 20,
104 cleanup_interval_secs: config.cleanup_interval_secs,
105 enable_background_cleanup: config.enable_background_cleanup,
106 };
107 Some(AdaptiveCache::new(cache_config))
108 } else {
109 None
110 };
111
112 Self {
113 storage: Arc::new(storage),
114 episode_cache,
115 pattern_cache,
116 heuristic_cache,
117 config,
118 stats: CacheStatsInner::default(),
119 }
120 }
121
122 pub fn storage(&self) -> &TursoStorage {
124 &self.storage
125 }
126
127 pub fn config(&self) -> &CacheConfig {
129 &self.config
130 }
131
132 pub fn stats(&self) -> CacheStats {
134 CacheStats {
135 episode_hits: self.stats.episode_hits.load(Ordering::Relaxed),
136 episode_misses: self.stats.episode_misses.load(Ordering::Relaxed),
137 pattern_hits: self.stats.pattern_hits.load(Ordering::Relaxed),
138 pattern_misses: self.stats.pattern_misses.load(Ordering::Relaxed),
139 query_hits: 0, query_misses: 0,
141 evictions: 0, expirations: 0,
143 }
144 }
145
146 pub async fn get_episode_cached(&self, id: Uuid) -> Result<Option<Episode>> {
148 if let Some(ref cache) = self.episode_cache {
150 if let Some(episode) = cache.get_and_record(id).await {
151 self.stats.episode_hits.fetch_add(1, Ordering::Relaxed);
152 return Ok(Some(episode));
153 }
154 }
155
156 self.stats.episode_misses.fetch_add(1, Ordering::Relaxed);
158 let episode = self.storage.get_episode(id).await?;
159
160 if let (Some(ep), Some(cache)) = (&episode, &self.episode_cache) {
162 cache.record_access(id, false, Some(ep.clone())).await;
163 }
164
165 Ok(episode)
166 }
167
168 pub async fn get_pattern_cached(
170 &self,
171 id: do_memory_core::episode::PatternId,
172 ) -> Result<Option<Pattern>> {
173 let cache_key = id;
175
176 if let Some(ref cache) = self.pattern_cache {
178 if let Some(pattern) = cache.get_and_record(cache_key).await {
179 self.stats.pattern_hits.fetch_add(1, Ordering::Relaxed);
180 return Ok(Some(pattern));
181 }
182 }
183
184 self.stats.pattern_misses.fetch_add(1, Ordering::Relaxed);
186 let pattern = self.storage.get_pattern(id).await?;
187
188 if let (Some(pat), Some(cache)) = (&pattern, &self.pattern_cache) {
190 cache
191 .record_access(cache_key, false, Some(pat.clone()))
192 .await;
193 }
194
195 Ok(pattern)
196 }
197
198 pub async fn get_heuristic_cached(&self, id: Uuid) -> Result<Option<Heuristic>> {
200 if let Some(ref cache) = self.heuristic_cache {
202 if let Some(heuristic) = cache.get_and_record(id).await {
203 self.stats.heuristic_hits.fetch_add(1, Ordering::Relaxed);
204 return Ok(Some(heuristic));
205 }
206 }
207
208 self.stats.heuristic_misses.fetch_add(1, Ordering::Relaxed);
210 let heuristic = self.storage.get_heuristic(id).await?;
211
212 if let (Some(h), Some(cache)) = (&heuristic, &self.heuristic_cache) {
214 cache.record_access(id, false, Some(h.clone())).await;
215 }
216
217 Ok(heuristic)
218 }
219
220 pub async fn store_episode_cached(&self, episode: &Episode) -> Result<()> {
222 self.storage.store_episode(episode).await?;
224
225 if let Some(ref cache) = self.episode_cache {
227 cache.remove(episode.episode_id).await;
228 }
229
230 Ok(())
231 }
232
233 pub async fn store_pattern_cached(&self, pattern: &Pattern) -> Result<()> {
235 self.storage.store_pattern(pattern).await?;
237
238 if let Some(ref cache) = self.pattern_cache {
240 cache.remove(pattern.id()).await;
241 }
242
243 Ok(())
244 }
245
246 pub async fn store_heuristic_cached(&self, heuristic: &Heuristic) -> Result<()> {
248 self.storage.store_heuristic(heuristic).await?;
250
251 if let Some(ref cache) = self.heuristic_cache {
253 cache.remove(heuristic.heuristic_id).await;
254 }
255
256 Ok(())
257 }
258
259 pub async fn delete_episode_cached(&self, id: Uuid) -> Result<()> {
261 self.storage.delete_episode(id).await?;
263
264 if let Some(ref cache) = self.episode_cache {
266 cache.remove(id).await;
267 }
268
269 Ok(())
270 }
271
272 pub async fn clear_caches(&self) {
274 if let Some(ref cache) = self.episode_cache {
275 cache.clear().await;
276 }
277 if let Some(ref cache) = self.pattern_cache {
278 cache.clear().await;
279 }
280 if let Some(ref cache) = self.heuristic_cache {
281 cache.clear().await;
282 }
283 }
284
285 pub async fn cache_sizes(&self) -> (usize, usize, usize) {
287 let episode_size = if let Some(ref cache) = self.episode_cache {
288 cache.len().await
289 } else {
290 0
291 };
292
293 let pattern_size = if let Some(ref cache) = self.pattern_cache {
294 cache.len().await
295 } else {
296 0
297 };
298
299 let heuristic_size = if let Some(ref cache) = self.heuristic_cache {
300 cache.len().await
301 } else {
302 0
303 };
304
305 (episode_size, pattern_size, heuristic_size)
306 }
307}
308
309#[async_trait]
310impl StorageBackend for CachedTursoStorage {
311 async fn store_episode(&self, episode: &Episode) -> Result<()> {
312 self.store_episode_cached(episode)
313 .await
314 .map_err(|e| Error::Storage(format!("Cache store error: {}", e)))
315 }
316
317 async fn get_episode(&self, id: Uuid) -> Result<Option<Episode>> {
318 self.get_episode_cached(id)
319 .await
320 .map_err(|e| Error::Storage(format!("Cache get error: {}", e)))
321 }
322
323 async fn delete_episode(&self, id: Uuid) -> Result<()> {
324 self.delete_episode_cached(id)
325 .await
326 .map_err(|e| Error::Storage(format!("Cache delete error: {}", e)))
327 }
328
329 async fn store_pattern(&self, pattern: &Pattern) -> Result<()> {
330 self.store_pattern_cached(pattern)
331 .await
332 .map_err(|e| Error::Storage(format!("Cache store error: {}", e)))
333 }
334
335 async fn get_pattern(&self, id: PatternId) -> Result<Option<Pattern>> {
336 self.get_pattern_cached(id)
337 .await
338 .map_err(|e| Error::Storage(format!("Cache get error: {}", e)))
339 }
340
341 async fn store_heuristic(&self, heuristic: &Heuristic) -> Result<()> {
342 self.store_heuristic_cached(heuristic)
343 .await
344 .map_err(|e| Error::Storage(format!("Cache store error: {}", e)))
345 }
346
347 async fn get_heuristic(&self, id: Uuid) -> Result<Option<Heuristic>> {
348 self.get_heuristic_cached(id)
349 .await
350 .map_err(|e| Error::Storage(format!("Cache get error: {}", e)))
351 }
352
353 async fn query_episodes_since(
354 &self,
355 since: chrono::DateTime<chrono::Utc>,
356 limit: Option<usize>,
357 ) -> Result<Vec<Episode>> {
358 self.storage
361 .query_episodes_since(since, limit)
362 .await
363 .map_err(|e| Error::Storage(format!("Query error: {}", e)))
364 }
365
366 async fn query_episodes_by_metadata(
367 &self,
368 key: &str,
369 value: &str,
370 limit: Option<usize>,
371 ) -> Result<Vec<Episode>> {
372 self.storage
375 .query_episodes_by_metadata(key, value, limit)
376 .await
377 .map_err(|e| Error::Storage(format!("Query error: {}", e)))
378 }
379
380 async fn store_embedding(&self, id: &str, embedding: Vec<f32>) -> Result<()> {
381 self.storage
382 .store_embedding(id, embedding)
383 .await
384 .map_err(|e| Error::Storage(format!("Store embedding error: {}", e)))
385 }
386
387 async fn get_embedding(&self, id: &str) -> Result<Option<Vec<f32>>> {
388 self.storage
389 .get_embedding(id)
390 .await
391 .map_err(|e| Error::Storage(format!("Get embedding error: {}", e)))
392 }
393
394 async fn delete_embedding(&self, id: &str) -> Result<bool> {
395 self.storage
396 .delete_embedding(id)
397 .await
398 .map_err(|e| Error::Storage(format!("Delete embedding error: {}", e)))
399 }
400
401 async fn store_embeddings_batch(&self, embeddings: Vec<(String, Vec<f32>)>) -> Result<()> {
402 self.storage
403 .store_embeddings_batch(embeddings)
404 .await
405 .map_err(|e| Error::Storage(format!("Batch store embeddings error: {}", e)))
406 }
407
408 async fn get_embeddings_batch(&self, ids: &[String]) -> Result<Vec<Option<Vec<f32>>>> {
409 self.storage
410 .get_embeddings_batch(ids)
411 .await
412 .map_err(|e| Error::Storage(format!("Batch get embeddings error: {}", e)))
413 }
414
415 async fn store_recommendation_session(&self, session: &RecommendationSession) -> Result<()> {
416 self.storage
417 .store_recommendation_session(session)
418 .await
419 .map_err(|e| Error::Storage(format!("Store recommendation session error: {}", e)))
420 }
421
422 async fn get_recommendation_session(
423 &self,
424 session_id: Uuid,
425 ) -> Result<Option<RecommendationSession>> {
426 self.storage
427 .get_recommendation_session(session_id)
428 .await
429 .map_err(|e| Error::Storage(format!("Get recommendation session error: {}", e)))
430 }
431
432 async fn get_recommendation_session_for_episode(
433 &self,
434 episode_id: Uuid,
435 ) -> Result<Option<RecommendationSession>> {
436 self.storage
437 .get_recommendation_session_for_episode(episode_id)
438 .await
439 .map_err(|e| {
440 Error::Storage(format!("Get recommendation session (episode) error: {}", e))
441 })
442 }
443
444 async fn store_recommendation_feedback(&self, feedback: &RecommendationFeedback) -> Result<()> {
445 self.storage
446 .store_recommendation_feedback(feedback)
447 .await
448 .map_err(|e| Error::Storage(format!("Store recommendation feedback error: {}", e)))
449 }
450
451 async fn get_recommendation_feedback(
452 &self,
453 session_id: Uuid,
454 ) -> Result<Option<RecommendationFeedback>> {
455 self.storage
456 .get_recommendation_feedback(session_id)
457 .await
458 .map_err(|e| Error::Storage(format!("Get recommendation feedback error: {}", e)))
459 }
460
461 async fn get_recommendation_stats(&self) -> Result<RecommendationStats> {
462 self.storage
463 .get_recommendation_stats()
464 .await
465 .map_err(|e| Error::Storage(format!("Get recommendation stats error: {}", e)))
466 }
467}