rust_logic_graph/distributed/
cache.rs

1//! Distributed Caching for Contexts
2//!
3//! Provides caching strategies for distributed context sharing.
4
5use crate::distributed::context::DistributedContext;
6use crate::distributed::store::ContextStore;
7use anyhow::Result;
8use std::sync::Arc;
9use std::time::Duration;
10
/// Caching strategy for distributed contexts.
///
/// The strategy is fixed when a `DistributedCache` is constructed and is
/// consulted by its read and write methods. The type is a plain `Copy` value
/// and can be compared and hashed, so it is usable as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheStrategy {
    /// Write-through: write to cache and store simultaneously.
    WriteThrough,

    /// Write-behind: write to cache immediately, async write to store.
    WriteBehind,

    /// Read-through: read from cache, fall back to store.
    ReadThrough,

    /// Cache-aside: application manages cache and store.
    CacheAside,
}
26
27/// Distributed cache for contexts
28pub struct DistributedCache {
29    /// Primary store (e.g., Redis)
30    store: Arc<dyn ContextStore>,
31
32    /// Caching strategy
33    strategy: CacheStrategy,
34
35    /// Default TTL for cached contexts
36    default_ttl: Option<Duration>,
37}
38
39impl DistributedCache {
40    /// Create a new distributed cache
41    pub fn new(store: Arc<dyn ContextStore>) -> Self {
42        Self {
43            store,
44            strategy: CacheStrategy::WriteThrough,
45            default_ttl: Some(Duration::from_secs(3600)), // 1 hour default
46        }
47    }
48
49    /// Create with custom strategy and TTL
50    pub fn with_config(
51        store: Arc<dyn ContextStore>,
52        strategy: CacheStrategy,
53        default_ttl: Option<Duration>,
54    ) -> Self {
55        Self {
56            store,
57            strategy,
58            default_ttl,
59        }
60    }
61
62    /// Get a context from cache
63    pub async fn get(&self, session_id: &str) -> Result<Option<DistributedContext>> {
64        match self.strategy {
65            CacheStrategy::ReadThrough | CacheStrategy::WriteThrough => {
66                self.store.load(session_id).await
67            }
68            CacheStrategy::CacheAside | CacheStrategy::WriteBehind => {
69                self.store.load(session_id).await
70            }
71        }
72    }
73
74    /// Put a context into cache
75    pub async fn put(&self, context: &DistributedContext) -> Result<()> {
76        self.put_with_ttl(context, self.default_ttl).await
77    }
78
79    /// Put a context with custom TTL
80    pub async fn put_with_ttl(
81        &self,
82        context: &DistributedContext,
83        ttl: Option<Duration>,
84    ) -> Result<()> {
85        match self.strategy {
86            CacheStrategy::WriteThrough => {
87                // Write immediately to store
88                self.store.save(context, ttl).await
89            }
90            CacheStrategy::WriteBehind => {
91                // Write to cache immediately, async write to store
92                let store = self.store.clone();
93                let ctx = context.clone();
94                tokio::spawn(async move {
95                    let _ = store.save(&ctx, ttl).await;
96                });
97                Ok(())
98            }
99            CacheStrategy::ReadThrough | CacheStrategy::CacheAside => {
100                self.store.save(context, ttl).await
101            }
102        }
103    }
104
105    /// Delete a context from cache
106    pub async fn delete(&self, session_id: &str) -> Result<()> {
107        self.store.delete(session_id).await
108    }
109
110    /// Check if context exists in cache
111    pub async fn exists(&self, session_id: &str) -> Result<bool> {
112        self.store.exists(session_id).await
113    }
114
115    /// Invalidate (delete) a context
116    pub async fn invalidate(&self, session_id: &str) -> Result<()> {
117        self.delete(session_id).await
118    }
119
120    /// Batch get multiple contexts
121    pub async fn get_many(
122        &self,
123        session_ids: &[String],
124    ) -> Result<Vec<Option<DistributedContext>>> {
125        let mut results = Vec::new();
126
127        for session_id in session_ids {
128            let context = self.get(session_id).await?;
129            results.push(context);
130        }
131
132        Ok(results)
133    }
134
135    /// Batch put multiple contexts
136    pub async fn put_many(&self, contexts: &[DistributedContext]) -> Result<()> {
137        for context in contexts {
138            self.put(context).await?;
139        }
140        Ok(())
141    }
142
143    /// Get cache statistics
144    pub async fn stats(&self) -> CacheStats {
145        CacheStats {
146            total_contexts: self.store.list_sessions().await.unwrap_or_default().len(),
147            strategy: self.strategy,
148            default_ttl: self.default_ttl,
149        }
150    }
151}
152
153/// Cache statistics
154#[derive(Debug, Clone)]
155pub struct CacheStats {
156    pub total_contexts: usize,
157    pub strategy: CacheStrategy,
158    pub default_ttl: Option<Duration>,
159}
160
161/// Cache warming utility
162pub struct CacheWarmer {
163    cache: Arc<DistributedCache>,
164}
165
166impl CacheWarmer {
167    /// Create a new cache warmer
168    pub fn new(cache: Arc<DistributedCache>) -> Self {
169        Self { cache }
170    }
171
172    /// Warm cache with contexts
173    pub async fn warm(&self, contexts: Vec<DistributedContext>) -> Result<()> {
174        self.cache.put_many(&contexts).await
175    }
176
177    /// Warm cache with session IDs (loads from source)
178    pub async fn warm_from_source(
179        &self,
180        session_ids: Vec<String>,
181        source: Arc<dyn ContextStore>,
182    ) -> Result<()> {
183        for session_id in session_ids {
184            if let Some(context) = source.load(&session_id).await? {
185                self.cache.put(&context).await?;
186            }
187        }
188        Ok(())
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distributed::store::InMemoryStore;
    use serde_json::json;

    /// A value stored through `put` is returned by `get`.
    #[tokio::test]
    async fn test_cache_put_and_get() {
        let store = Arc::new(InMemoryStore::new());
        let cache = DistributedCache::new(store);

        let mut ctx = DistributedContext::new("test-session");
        ctx.set("key1", json!("value1"));

        cache.put(&ctx).await.unwrap();

        let loaded = cache.get("test-session").await.unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap().get("key1"), Some(&json!("value1")));
    }

    /// `delete` removes a previously stored context.
    #[tokio::test]
    async fn test_cache_delete() {
        let store = Arc::new(InMemoryStore::new());
        let cache = DistributedCache::new(store);

        let ctx = DistributedContext::new("test-session");
        cache.put(&ctx).await.unwrap();

        assert!(cache.exists("test-session").await.unwrap());

        cache.delete("test-session").await.unwrap();

        assert!(!cache.exists("test-session").await.unwrap());
    }

    /// `put_many` followed by `get_many` round-trips every context.
    #[tokio::test]
    async fn test_batch_operations() {
        let store = Arc::new(InMemoryStore::new());
        let cache = DistributedCache::new(store);

        let ctx1 = DistributedContext::new("session-1");
        let ctx2 = DistributedContext::new("session-2");

        cache.put_many(&[ctx1, ctx2]).await.unwrap();

        let results = cache
            .get_many(&["session-1".to_string(), "session-2".to_string()])
            .await
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results[0].is_some());
        assert!(results[1].is_some());
    }

    /// `warm` makes every supplied context visible through the cache.
    #[tokio::test]
    async fn test_cache_warmer() {
        let store = Arc::new(InMemoryStore::new());
        let cache = Arc::new(DistributedCache::new(store));
        let warmer = CacheWarmer::new(cache.clone());

        let ctx1 = DistributedContext::new("session-1");
        let ctx2 = DistributedContext::new("session-2");

        warmer.warm(vec![ctx1, ctx2]).await.unwrap();

        assert!(cache.exists("session-1").await.unwrap());
        assert!(cache.exists("session-2").await.unwrap());
    }

    /// `warm_from_source` copies contexts the source has and skips IDs it lacks.
    #[tokio::test]
    async fn test_cache_warmer_from_source() {
        let source = Arc::new(InMemoryStore::new());
        let ctx = DistributedContext::new("session-1");
        source.save(&ctx, None).await.unwrap();

        let store = Arc::new(InMemoryStore::new());
        let cache = Arc::new(DistributedCache::new(store));
        let warmer = CacheWarmer::new(cache.clone());

        warmer
            .warm_from_source(
                vec!["session-1".to_string(), "missing".to_string()],
                source,
            )
            .await
            .unwrap();

        assert!(cache.exists("session-1").await.unwrap());
        assert!(!cache.exists("missing").await.unwrap());
    }

    /// `stats` reflects the stored count and the configuration passed to `with_config`.
    #[tokio::test]
    async fn test_cache_stats() {
        let store = Arc::new(InMemoryStore::new());
        let cache = DistributedCache::with_config(
            store,
            CacheStrategy::WriteThrough,
            Some(Duration::from_secs(300)),
        );

        let ctx1 = DistributedContext::new("session-1");
        let ctx2 = DistributedContext::new("session-2");

        cache.put(&ctx1).await.unwrap();
        cache.put(&ctx2).await.unwrap();

        let stats = cache.stats().await;
        assert_eq!(stats.total_contexts, 2);
        assert_eq!(stats.strategy, CacheStrategy::WriteThrough);
        assert_eq!(stats.default_ttl, Some(Duration::from_secs(300)));
    }
}