Skip to main content

do_memory_storage_turso/cache/
wrapper.rs

1//! Cached wrapper for TursoStorage
2
3use super::config::{CacheConfig, CacheStats};
4use crate::TursoStorage;
5use async_trait::async_trait;
6use do_memory_core::memory::attribution::{
7    RecommendationFeedback, RecommendationSession, RecommendationStats,
8};
9use do_memory_core::{
10    Episode, Error, Heuristic, Pattern, Result, StorageBackend, episode::PatternId,
11};
12use do_memory_storage_redb::{AdaptiveCache, AdaptiveCacheConfig};
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use uuid::Uuid;
16
/// Cached wrapper around TursoStorage
///
/// Provides transparent caching for episodes, patterns, and heuristics
/// using adaptive TTL based on access patterns.
///
/// Each cache is optional: `new` leaves a cache as `None` when the matching
/// feature flag in [`CacheConfig`] is off, in which case reads for that kind
/// always go to the underlying storage.
pub struct CachedTursoStorage {
    /// Underlying Turso storage; every read miss and every write ends up here.
    storage: Arc<TursoStorage>,

    /// Episode cache (`None` when `enable_episode_cache` is off).
    episode_cache: Option<AdaptiveCache<Episode>>,

    /// Pattern cache (`None` when `enable_pattern_cache` is off).
    pattern_cache: Option<AdaptiveCache<Pattern>>,

    /// Heuristic cache. There is no dedicated flag for it: `new` creates it
    /// under the same `enable_pattern_cache` flag as the pattern cache.
    heuristic_cache: Option<AdaptiveCache<Heuristic>>,

    /// Cache configuration this wrapper was built with.
    config: CacheConfig,

    /// Hit/miss counters, updated on every cached lookup.
    stats: CacheStatsInner,
}
40
/// Internal cache statistics with atomic counters
///
/// The counters are plain `AtomicU64`s so they can be bumped through `&self`;
/// the derived `Default` starts every counter at zero.
#[derive(Default)]
struct CacheStatsInner {
    /// Episode lookups answered from the episode cache.
    episode_hits: AtomicU64,
    /// Episode lookups that had to go to storage.
    episode_misses: AtomicU64,
    /// Pattern lookups answered from the pattern cache.
    pattern_hits: AtomicU64,
    /// Pattern lookups that had to go to storage.
    pattern_misses: AtomicU64,
    /// Heuristic lookups answered from the heuristic cache.
    /// NOTE(review): incremented by `get_heuristic_cached`, but `stats()` does
    /// not copy this value into the `CacheStats` snapshot it returns.
    heuristic_hits: AtomicU64,
    /// Heuristic lookups that had to go to storage (same caveat as above).
    heuristic_misses: AtomicU64,
}
51
52impl CachedTursoStorage {
53    /// Create a new cached storage wrapper
54    pub fn new(storage: TursoStorage, config: CacheConfig) -> Self {
55        // Create episode cache if enabled
56        let episode_cache = if config.enable_episode_cache {
57            let cache_config = AdaptiveCacheConfig {
58                max_size: config.max_episodes,
59                default_ttl: config.episode_ttl,
60                min_ttl: config.min_ttl,
61                max_ttl: config.max_ttl,
62                hot_threshold: config.hot_threshold,
63                cold_threshold: config.cold_threshold,
64                adaptation_rate: config.adaptation_rate,
65                window_size: 20,
66                cleanup_interval_secs: config.cleanup_interval_secs,
67                enable_background_cleanup: config.enable_background_cleanup,
68            };
69            Some(AdaptiveCache::new(cache_config))
70        } else {
71            None
72        };
73
74        // Create pattern cache if enabled
75        let pattern_cache = if config.enable_pattern_cache {
76            let cache_config = AdaptiveCacheConfig {
77                max_size: config.max_patterns,
78                default_ttl: config.pattern_ttl,
79                min_ttl: config.min_ttl,
80                max_ttl: config.max_ttl,
81                hot_threshold: config.hot_threshold,
82                cold_threshold: config.cold_threshold,
83                adaptation_rate: config.adaptation_rate,
84                window_size: 20,
85                cleanup_interval_secs: config.cleanup_interval_secs,
86                enable_background_cleanup: config.enable_background_cleanup,
87            };
88            Some(AdaptiveCache::new(cache_config))
89        } else {
90            None
91        };
92
93        // Create heuristic cache (smaller, same config as patterns)
94        let heuristic_cache = if config.enable_pattern_cache {
95            let cache_config = AdaptiveCacheConfig {
96                max_size: config.max_patterns / 2, // Half the size of patterns
97                default_ttl: config.pattern_ttl,
98                min_ttl: config.min_ttl,
99                max_ttl: config.max_ttl,
100                hot_threshold: config.hot_threshold,
101                cold_threshold: config.cold_threshold,
102                adaptation_rate: config.adaptation_rate,
103                window_size: 20,
104                cleanup_interval_secs: config.cleanup_interval_secs,
105                enable_background_cleanup: config.enable_background_cleanup,
106            };
107            Some(AdaptiveCache::new(cache_config))
108        } else {
109            None
110        };
111
112        Self {
113            storage: Arc::new(storage),
114            episode_cache,
115            pattern_cache,
116            heuristic_cache,
117            config,
118            stats: CacheStatsInner::default(),
119        }
120    }
121
122    /// Get the underlying storage (for operations that bypass cache)
123    pub fn storage(&self) -> &TursoStorage {
124        &self.storage
125    }
126
127    /// Get cache configuration
128    pub fn config(&self) -> &CacheConfig {
129        &self.config
130    }
131
132    /// Get cache statistics
133    pub fn stats(&self) -> CacheStats {
134        CacheStats {
135            episode_hits: self.stats.episode_hits.load(Ordering::Relaxed),
136            episode_misses: self.stats.episode_misses.load(Ordering::Relaxed),
137            pattern_hits: self.stats.pattern_hits.load(Ordering::Relaxed),
138            pattern_misses: self.stats.pattern_misses.load(Ordering::Relaxed),
139            query_hits: 0, // Not yet implemented
140            query_misses: 0,
141            evictions: 0, // Requires async access, use cache_sizes() + stats for accurate count
142            expirations: 0,
143        }
144    }
145
146    /// Get episode with caching
147    pub async fn get_episode_cached(&self, id: Uuid) -> Result<Option<Episode>> {
148        // Check cache first
149        if let Some(ref cache) = self.episode_cache {
150            if let Some(episode) = cache.get_and_record(id).await {
151                self.stats.episode_hits.fetch_add(1, Ordering::Relaxed);
152                return Ok(Some(episode));
153            }
154        }
155
156        // Cache miss - fetch from storage
157        self.stats.episode_misses.fetch_add(1, Ordering::Relaxed);
158        let episode = self.storage.get_episode(id).await?;
159
160        // Store in cache if found
161        if let (Some(ep), Some(cache)) = (&episode, &self.episode_cache) {
162            cache.record_access(id, false, Some(ep.clone())).await;
163        }
164
165        Ok(episode)
166    }
167
168    /// Get pattern with caching
169    pub async fn get_pattern_cached(
170        &self,
171        id: do_memory_core::episode::PatternId,
172    ) -> Result<Option<Pattern>> {
173        // PatternId is already a Uuid, use directly
174        let cache_key = id;
175
176        // Check cache first
177        if let Some(ref cache) = self.pattern_cache {
178            if let Some(pattern) = cache.get_and_record(cache_key).await {
179                self.stats.pattern_hits.fetch_add(1, Ordering::Relaxed);
180                return Ok(Some(pattern));
181            }
182        }
183
184        // Cache miss - fetch from storage
185        self.stats.pattern_misses.fetch_add(1, Ordering::Relaxed);
186        let pattern = self.storage.get_pattern(id).await?;
187
188        // Store in cache if found
189        if let (Some(pat), Some(cache)) = (&pattern, &self.pattern_cache) {
190            cache
191                .record_access(cache_key, false, Some(pat.clone()))
192                .await;
193        }
194
195        Ok(pattern)
196    }
197
198    /// Get heuristic with caching
199    pub async fn get_heuristic_cached(&self, id: Uuid) -> Result<Option<Heuristic>> {
200        // Check cache first
201        if let Some(ref cache) = self.heuristic_cache {
202            if let Some(heuristic) = cache.get_and_record(id).await {
203                self.stats.heuristic_hits.fetch_add(1, Ordering::Relaxed);
204                return Ok(Some(heuristic));
205            }
206        }
207
208        // Cache miss - fetch from storage
209        self.stats.heuristic_misses.fetch_add(1, Ordering::Relaxed);
210        let heuristic = self.storage.get_heuristic(id).await?;
211
212        // Store in cache if found
213        if let (Some(h), Some(cache)) = (&heuristic, &self.heuristic_cache) {
214            cache.record_access(id, false, Some(h.clone())).await;
215        }
216
217        Ok(heuristic)
218    }
219
220    /// Store episode (invalidates cache entry)
221    pub async fn store_episode_cached(&self, episode: &Episode) -> Result<()> {
222        // Store in database first
223        self.storage.store_episode(episode).await?;
224
225        // Invalidate cache entry
226        if let Some(ref cache) = self.episode_cache {
227            cache.remove(episode.episode_id).await;
228        }
229
230        Ok(())
231    }
232
233    /// Store pattern (invalidates cache entry)
234    pub async fn store_pattern_cached(&self, pattern: &Pattern) -> Result<()> {
235        // Store in database first
236        self.storage.store_pattern(pattern).await?;
237
238        // Invalidate cache entry
239        if let Some(ref cache) = self.pattern_cache {
240            cache.remove(pattern.id()).await;
241        }
242
243        Ok(())
244    }
245
246    /// Store heuristic (invalidates cache entry)
247    pub async fn store_heuristic_cached(&self, heuristic: &Heuristic) -> Result<()> {
248        // Store in database first
249        self.storage.store_heuristic(heuristic).await?;
250
251        // Invalidate cache entry
252        if let Some(ref cache) = self.heuristic_cache {
253            cache.remove(heuristic.heuristic_id).await;
254        }
255
256        Ok(())
257    }
258
259    /// Delete episode (invalidates cache entry)
260    pub async fn delete_episode_cached(&self, id: Uuid) -> Result<()> {
261        // Delete from database first
262        self.storage.delete_episode(id).await?;
263
264        // Invalidate cache entry
265        if let Some(ref cache) = self.episode_cache {
266            cache.remove(id).await;
267        }
268
269        Ok(())
270    }
271
272    /// Clear all caches
273    pub async fn clear_caches(&self) {
274        if let Some(ref cache) = self.episode_cache {
275            cache.clear().await;
276        }
277        if let Some(ref cache) = self.pattern_cache {
278            cache.clear().await;
279        }
280        if let Some(ref cache) = self.heuristic_cache {
281            cache.clear().await;
282        }
283    }
284
285    /// Get cache sizes
286    pub async fn cache_sizes(&self) -> (usize, usize, usize) {
287        let episode_size = if let Some(ref cache) = self.episode_cache {
288            cache.len().await
289        } else {
290            0
291        };
292
293        let pattern_size = if let Some(ref cache) = self.pattern_cache {
294            cache.len().await
295        } else {
296            0
297        };
298
299        let heuristic_size = if let Some(ref cache) = self.heuristic_cache {
300            cache.len().await
301        } else {
302            0
303        };
304
305        (episode_size, pattern_size, heuristic_size)
306    }
307}
308
309#[async_trait]
310impl StorageBackend for CachedTursoStorage {
311    async fn store_episode(&self, episode: &Episode) -> Result<()> {
312        self.store_episode_cached(episode)
313            .await
314            .map_err(|e| Error::Storage(format!("Cache store error: {}", e)))
315    }
316
317    async fn get_episode(&self, id: Uuid) -> Result<Option<Episode>> {
318        self.get_episode_cached(id)
319            .await
320            .map_err(|e| Error::Storage(format!("Cache get error: {}", e)))
321    }
322
323    async fn delete_episode(&self, id: Uuid) -> Result<()> {
324        self.delete_episode_cached(id)
325            .await
326            .map_err(|e| Error::Storage(format!("Cache delete error: {}", e)))
327    }
328
329    async fn store_pattern(&self, pattern: &Pattern) -> Result<()> {
330        self.store_pattern_cached(pattern)
331            .await
332            .map_err(|e| Error::Storage(format!("Cache store error: {}", e)))
333    }
334
335    async fn get_pattern(&self, id: PatternId) -> Result<Option<Pattern>> {
336        self.get_pattern_cached(id)
337            .await
338            .map_err(|e| Error::Storage(format!("Cache get error: {}", e)))
339    }
340
341    async fn store_heuristic(&self, heuristic: &Heuristic) -> Result<()> {
342        self.store_heuristic_cached(heuristic)
343            .await
344            .map_err(|e| Error::Storage(format!("Cache store error: {}", e)))
345    }
346
347    async fn get_heuristic(&self, id: Uuid) -> Result<Option<Heuristic>> {
348        self.get_heuristic_cached(id)
349            .await
350            .map_err(|e| Error::Storage(format!("Cache get error: {}", e)))
351    }
352
353    async fn query_episodes_since(
354        &self,
355        since: chrono::DateTime<chrono::Utc>,
356        limit: Option<usize>,
357    ) -> Result<Vec<Episode>> {
358        // Query caching not implemented for this method
359        // Fall back to underlying storage
360        self.storage
361            .query_episodes_since(since, limit)
362            .await
363            .map_err(|e| Error::Storage(format!("Query error: {}", e)))
364    }
365
366    async fn query_episodes_by_metadata(
367        &self,
368        key: &str,
369        value: &str,
370        limit: Option<usize>,
371    ) -> Result<Vec<Episode>> {
372        // Query caching not implemented for this method
373        // Fall back to underlying storage
374        self.storage
375            .query_episodes_by_metadata(key, value, limit)
376            .await
377            .map_err(|e| Error::Storage(format!("Query error: {}", e)))
378    }
379
380    async fn store_embedding(&self, id: &str, embedding: Vec<f32>) -> Result<()> {
381        self.storage
382            .store_embedding(id, embedding)
383            .await
384            .map_err(|e| Error::Storage(format!("Store embedding error: {}", e)))
385    }
386
387    async fn get_embedding(&self, id: &str) -> Result<Option<Vec<f32>>> {
388        self.storage
389            .get_embedding(id)
390            .await
391            .map_err(|e| Error::Storage(format!("Get embedding error: {}", e)))
392    }
393
394    async fn delete_embedding(&self, id: &str) -> Result<bool> {
395        self.storage
396            .delete_embedding(id)
397            .await
398            .map_err(|e| Error::Storage(format!("Delete embedding error: {}", e)))
399    }
400
401    async fn store_embeddings_batch(&self, embeddings: Vec<(String, Vec<f32>)>) -> Result<()> {
402        self.storage
403            .store_embeddings_batch(embeddings)
404            .await
405            .map_err(|e| Error::Storage(format!("Batch store embeddings error: {}", e)))
406    }
407
408    async fn get_embeddings_batch(&self, ids: &[String]) -> Result<Vec<Option<Vec<f32>>>> {
409        self.storage
410            .get_embeddings_batch(ids)
411            .await
412            .map_err(|e| Error::Storage(format!("Batch get embeddings error: {}", e)))
413    }
414
415    async fn store_recommendation_session(&self, session: &RecommendationSession) -> Result<()> {
416        self.storage
417            .store_recommendation_session(session)
418            .await
419            .map_err(|e| Error::Storage(format!("Store recommendation session error: {}", e)))
420    }
421
422    async fn get_recommendation_session(
423        &self,
424        session_id: Uuid,
425    ) -> Result<Option<RecommendationSession>> {
426        self.storage
427            .get_recommendation_session(session_id)
428            .await
429            .map_err(|e| Error::Storage(format!("Get recommendation session error: {}", e)))
430    }
431
432    async fn get_recommendation_session_for_episode(
433        &self,
434        episode_id: Uuid,
435    ) -> Result<Option<RecommendationSession>> {
436        self.storage
437            .get_recommendation_session_for_episode(episode_id)
438            .await
439            .map_err(|e| {
440                Error::Storage(format!("Get recommendation session (episode) error: {}", e))
441            })
442    }
443
444    async fn store_recommendation_feedback(&self, feedback: &RecommendationFeedback) -> Result<()> {
445        self.storage
446            .store_recommendation_feedback(feedback)
447            .await
448            .map_err(|e| Error::Storage(format!("Store recommendation feedback error: {}", e)))
449    }
450
451    async fn get_recommendation_feedback(
452        &self,
453        session_id: Uuid,
454    ) -> Result<Option<RecommendationFeedback>> {
455        self.storage
456            .get_recommendation_feedback(session_id)
457            .await
458            .map_err(|e| Error::Storage(format!("Get recommendation feedback error: {}", e)))
459    }
460
461    async fn get_recommendation_stats(&self) -> Result<RecommendationStats> {
462        self.storage
463            .get_recommendation_stats()
464            .await
465            .map_err(|e| Error::Storage(format!("Get recommendation stats error: {}", e)))
466    }
467}