rust_logic_graph/distributed/
cache.rs1use crate::distributed::context::DistributedContext;
6use crate::distributed::store::ContextStore;
7use anyhow::Result;
8use std::sync::Arc;
9use std::time::Duration;
10
/// Strategy that selects how [`DistributedCache`] talks to its backing store.
///
/// In the current implementation the strategy only affects writes: every
/// variant reads directly from the store, and only [`CacheStrategy::WriteBehind`]
/// writes differently from the others.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheStrategy {
    /// Writes are awaited: `put` returns only after the store has saved the
    /// context, and a save error is returned to the caller.
    WriteThrough,

    /// Writes are handed to a background task and `put` returns `Ok(())`
    /// immediately, without waiting for the store.
    WriteBehind,

    /// Currently handled exactly like [`CacheStrategy::WriteThrough`].
    ReadThrough,

    /// Currently handled exactly like [`CacheStrategy::WriteThrough`].
    CacheAside,
}

impl Default for CacheStrategy {
    /// Returns [`CacheStrategy::WriteThrough`], the same strategy that
    /// `DistributedCache::new` selects.
    fn default() -> Self {
        CacheStrategy::WriteThrough
    }
}
26
27pub struct DistributedCache {
29 store: Arc<dyn ContextStore>,
31
32 strategy: CacheStrategy,
34
35 default_ttl: Option<Duration>,
37}
38
39impl DistributedCache {
40 pub fn new(store: Arc<dyn ContextStore>) -> Self {
42 Self {
43 store,
44 strategy: CacheStrategy::WriteThrough,
45 default_ttl: Some(Duration::from_secs(3600)), }
47 }
48
49 pub fn with_config(
51 store: Arc<dyn ContextStore>,
52 strategy: CacheStrategy,
53 default_ttl: Option<Duration>,
54 ) -> Self {
55 Self {
56 store,
57 strategy,
58 default_ttl,
59 }
60 }
61
62 pub async fn get(&self, session_id: &str) -> Result<Option<DistributedContext>> {
64 match self.strategy {
65 CacheStrategy::ReadThrough | CacheStrategy::WriteThrough => {
66 self.store.load(session_id).await
67 }
68 CacheStrategy::CacheAside | CacheStrategy::WriteBehind => {
69 self.store.load(session_id).await
70 }
71 }
72 }
73
74 pub async fn put(&self, context: &DistributedContext) -> Result<()> {
76 self.put_with_ttl(context, self.default_ttl).await
77 }
78
79 pub async fn put_with_ttl(
81 &self,
82 context: &DistributedContext,
83 ttl: Option<Duration>,
84 ) -> Result<()> {
85 match self.strategy {
86 CacheStrategy::WriteThrough => {
87 self.store.save(context, ttl).await
89 }
90 CacheStrategy::WriteBehind => {
91 let store = self.store.clone();
93 let ctx = context.clone();
94 tokio::spawn(async move {
95 let _ = store.save(&ctx, ttl).await;
96 });
97 Ok(())
98 }
99 CacheStrategy::ReadThrough | CacheStrategy::CacheAside => {
100 self.store.save(context, ttl).await
101 }
102 }
103 }
104
105 pub async fn delete(&self, session_id: &str) -> Result<()> {
107 self.store.delete(session_id).await
108 }
109
110 pub async fn exists(&self, session_id: &str) -> Result<bool> {
112 self.store.exists(session_id).await
113 }
114
115 pub async fn invalidate(&self, session_id: &str) -> Result<()> {
117 self.delete(session_id).await
118 }
119
120 pub async fn get_many(
122 &self,
123 session_ids: &[String],
124 ) -> Result<Vec<Option<DistributedContext>>> {
125 let mut results = Vec::new();
126
127 for session_id in session_ids {
128 let context = self.get(session_id).await?;
129 results.push(context);
130 }
131
132 Ok(results)
133 }
134
135 pub async fn put_many(&self, contexts: &[DistributedContext]) -> Result<()> {
137 for context in contexts {
138 self.put(context).await?;
139 }
140 Ok(())
141 }
142
143 pub async fn stats(&self) -> CacheStats {
145 CacheStats {
146 total_contexts: self.store.list_sessions().await.unwrap_or_default().len(),
147 strategy: self.strategy,
148 default_ttl: self.default_ttl,
149 }
150 }
151}
152
153#[derive(Debug, Clone)]
155pub struct CacheStats {
156 pub total_contexts: usize,
157 pub strategy: CacheStrategy,
158 pub default_ttl: Option<Duration>,
159}
160
161pub struct CacheWarmer {
163 cache: Arc<DistributedCache>,
164}
165
166impl CacheWarmer {
167 pub fn new(cache: Arc<DistributedCache>) -> Self {
169 Self { cache }
170 }
171
172 pub async fn warm(&self, contexts: Vec<DistributedContext>) -> Result<()> {
174 self.cache.put_many(&contexts).await
175 }
176
177 pub async fn warm_from_source(
179 &self,
180 session_ids: Vec<String>,
181 source: Arc<dyn ContextStore>,
182 ) -> Result<()> {
183 for session_id in session_ids {
184 if let Some(context) = source.load(&session_id).await? {
185 self.cache.put(&context).await?;
186 }
187 }
188 Ok(())
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distributed::store::InMemoryStore;
    use serde_json::json;

    /// Builds a default-configured cache on top of a fresh in-memory store.
    fn fresh_cache() -> DistributedCache {
        let backing = Arc::new(InMemoryStore::new());
        DistributedCache::new(backing)
    }

    /// A context written with `put` can be read back with its data intact.
    #[tokio::test]
    async fn test_cache_put_and_get() {
        let cache = fresh_cache();

        let mut context = DistributedContext::new("test-session");
        context.set("key1", json!("value1"));
        cache.put(&context).await.unwrap();

        let fetched = cache.get("test-session").await.unwrap();
        assert!(fetched.is_some());

        let fetched = fetched.unwrap();
        assert_eq!(fetched.get("key1"), Some(&json!("value1")));
    }

    /// `delete` removes an entry that `exists` previously reported.
    #[tokio::test]
    async fn test_cache_delete() {
        let cache = fresh_cache();
        let session = "test-session";

        cache
            .put(&DistributedContext::new(session))
            .await
            .unwrap();
        assert!(cache.exists(session).await.unwrap());

        cache.delete(session).await.unwrap();
        assert!(!cache.exists(session).await.unwrap());
    }

    /// `put_many` followed by `get_many` returns one hit per requested id.
    #[tokio::test]
    async fn test_batch_operations() {
        let cache = fresh_cache();

        let batch = [
            DistributedContext::new("session-1"),
            DistributedContext::new("session-2"),
        ];
        cache.put_many(&batch).await.unwrap();

        let ids = vec!["session-1".to_string(), "session-2".to_string()];
        let found = cache.get_many(&ids).await.unwrap();

        assert_eq!(found.len(), 2);
        assert!(found.iter().all(Option::is_some));
    }

    /// `CacheWarmer::warm` makes every supplied context visible in the cache.
    #[tokio::test]
    async fn test_cache_warmer() {
        let cache = Arc::new(fresh_cache());
        let warmer = CacheWarmer::new(Arc::clone(&cache));

        let seed = vec![
            DistributedContext::new("session-1"),
            DistributedContext::new("session-2"),
        ];
        warmer.warm(seed).await.unwrap();

        for id in ["session-1", "session-2"] {
            assert!(cache.exists(id).await.unwrap());
        }
    }

    /// `stats` reflects the number of stored contexts and the configured strategy.
    #[tokio::test]
    async fn test_cache_stats() {
        let backing = Arc::new(InMemoryStore::new());
        let ttl = Some(Duration::from_secs(300));
        let cache = DistributedCache::with_config(backing, CacheStrategy::WriteThrough, ttl);

        for id in ["session-1", "session-2"] {
            let context = DistributedContext::new(id);
            cache.put(&context).await.unwrap();
        }

        let snapshot = cache.stats().await;
        assert_eq!(snapshot.total_contexts, 2);
        assert_eq!(snapshot.strategy, CacheStrategy::WriteThrough);
    }
}