hyperstack_server/
cache.rs

1//! Entity cache for snapshot-on-subscribe functionality.
2//!
3//! This module provides an `EntityCache` that maintains full projected entities
4//! in memory with LRU eviction. When a new client subscribes, they receive
5//! cached snapshots immediately rather than waiting for the next live mutation.
6
7use lru::LruCache;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::num::NonZeroUsize;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Default number of entities retained per view before LRU eviction starts.
const DEFAULT_MAX_ENTITIES_PER_VIEW: usize = 500;
/// Default cap applied to every JSON array stored inside a cached entity.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;

/// Limits applied by the entity cache.
///
/// Use [`Default::default`] for the stock limits, or build the struct directly
/// to override them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntityCacheConfig {
    /// Maximum number of entities to cache per view.
    ///
    /// Expected to be at least 1: a zero-sized LRU cache cannot hold anything.
    pub max_entities_per_view: usize,
    /// Maximum length of any array inside a cached entity. When an array grows
    /// past this length its oldest (leading) elements are dropped.
    pub max_array_length: usize,
}

impl Default for EntityCacheConfig {
    /// Returns the stock limits: 500 entities per view, 100 elements per array.
    fn default() -> Self {
        Self {
            max_entities_per_view: DEFAULT_MAX_ENTITIES_PER_VIEW,
            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
        }
    }
}
34
35/// Entity cache that maintains full projected entities with LRU eviction.
36///
37/// The cache is populated as mutations flow through the projector, regardless
38/// of subscriber state. When a new subscriber connects, they receive snapshots
39/// of all cached entities for their requested view.
40#[derive(Clone)]
41pub struct EntityCache {
42    /// view_id -> LRU<entity_key, full_projected_entity>
43    caches: Arc<RwLock<HashMap<String, LruCache<String, Value>>>>,
44    config: EntityCacheConfig,
45}
46
47impl EntityCache {
48    /// Create a new entity cache with default configuration
49    pub fn new() -> Self {
50        Self::with_config(EntityCacheConfig::default())
51    }
52
53    /// Create a new entity cache with custom configuration
54    pub fn with_config(config: EntityCacheConfig) -> Self {
55        Self {
56            caches: Arc::new(RwLock::new(HashMap::new())),
57            config,
58        }
59    }
60
61    pub async fn upsert(&self, view_id: &str, key: &str, patch: Value) {
62        self.upsert_with_append(view_id, key, patch, &[]).await;
63    }
64
65    pub async fn upsert_with_append(
66        &self,
67        view_id: &str,
68        key: &str,
69        patch: Value,
70        append_paths: &[String],
71    ) {
72        let mut caches = self.caches.write().await;
73
74        let cache = caches.entry(view_id.to_string()).or_insert_with(|| {
75            LruCache::new(
76                NonZeroUsize::new(self.config.max_entities_per_view)
77                    .expect("max_entities_per_view must be > 0"),
78            )
79        });
80
81        let max_array_length = self.config.max_array_length;
82
83        if let Some(entity) = cache.get_mut(key) {
84            deep_merge_with_append(entity, patch, append_paths, max_array_length);
85        } else {
86            let new_entity = truncate_arrays_if_needed(patch, max_array_length);
87            cache.put(key.to_string(), new_entity);
88        }
89    }
90
91    /// Get all cached entities for a view.
92    ///
93    /// Returns a vector of (key, entity) pairs for sending as snapshots
94    /// to new subscribers.
95    pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> {
96        let caches = self.caches.read().await;
97
98        caches
99            .get(view_id)
100            .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
101            .unwrap_or_default()
102    }
103
104    /// Get a specific entity from the cache
105    pub async fn get(&self, view_id: &str, key: &str) -> Option<Value> {
106        let caches = self.caches.read().await;
107        caches
108            .get(view_id)
109            .and_then(|cache| cache.peek(key).cloned())
110    }
111
112    /// Get the number of cached entities for a view
113    pub async fn len(&self, view_id: &str) -> usize {
114        let caches = self.caches.read().await;
115        caches.get(view_id).map(|c| c.len()).unwrap_or(0)
116    }
117
118    /// Check if the cache for a view is empty
119    pub async fn is_empty(&self, view_id: &str) -> bool {
120        self.len(view_id).await == 0
121    }
122
123    /// Clear all cached entities for a view
124    pub async fn clear(&self, view_id: &str) {
125        let mut caches = self.caches.write().await;
126        if let Some(cache) = caches.get_mut(view_id) {
127            cache.clear();
128        }
129    }
130
131    pub async fn clear_all(&self) {
132        let mut caches = self.caches.write().await;
133        caches.clear();
134    }
135
136    pub async fn stats(&self) -> CacheStats {
137        let caches = self.caches.read().await;
138        let mut total_entities = 0;
139        let mut views = Vec::new();
140
141        for (view_id, cache) in caches.iter() {
142            let count = cache.len();
143            total_entities += count;
144            views.push((view_id.clone(), count));
145        }
146
147        views.sort_by(|a, b| b.1.cmp(&a.1));
148
149        CacheStats {
150            view_count: caches.len(),
151            total_entities,
152            top_views: views.into_iter().take(5).collect(),
153        }
154    }
155}
156
/// Point-in-time summary of the entity cache, as produced by `EntityCache::stats`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CacheStats {
    /// Number of views that currently have a per-view cache (empty ones included).
    pub view_count: usize,
    /// Total number of cached entities across all views.
    pub total_entities: usize,
    /// `(view_id, entity_count)` pairs for the largest views, biggest first.
    pub top_views: Vec<(String, usize)>,
}
163
164impl Default for EntityCache {
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
170fn deep_merge_with_append(
171    base: &mut Value,
172    patch: Value,
173    append_paths: &[String],
174    max_array_length: usize,
175) {
176    deep_merge_with_append_inner(base, patch, append_paths, "", max_array_length);
177}
178
179fn deep_merge_with_append_inner(
180    base: &mut Value,
181    patch: Value,
182    append_paths: &[String],
183    current_path: &str,
184    max_array_length: usize,
185) {
186    match (base, patch) {
187        (Value::Object(base_map), Value::Object(patch_map)) => {
188            for (key, patch_value) in patch_map {
189                let child_path = if current_path.is_empty() {
190                    key.clone()
191                } else {
192                    format!("{}.{}", current_path, key)
193                };
194
195                if let Some(base_value) = base_map.get_mut(&key) {
196                    deep_merge_with_append_inner(
197                        base_value,
198                        patch_value,
199                        append_paths,
200                        &child_path,
201                        max_array_length,
202                    );
203                } else {
204                    base_map.insert(
205                        key,
206                        truncate_arrays_if_needed(patch_value, max_array_length),
207                    );
208                }
209            }
210        }
211
212        (Value::Array(base_arr), Value::Array(patch_arr)) => {
213            let should_append = append_paths.iter().any(|p| p == current_path);
214            if should_append {
215                base_arr.extend(patch_arr);
216                if base_arr.len() > max_array_length {
217                    let excess = base_arr.len() - max_array_length;
218                    base_arr.drain(0..excess);
219                }
220            } else {
221                *base_arr = patch_arr;
222                if base_arr.len() > max_array_length {
223                    let excess = base_arr.len() - max_array_length;
224                    base_arr.drain(0..excess);
225                }
226            }
227        }
228
229        (base, patch_value) => {
230            *base = truncate_arrays_if_needed(patch_value, max_array_length);
231        }
232    }
233}
234
235/// Recursively truncate any arrays in a value to the max length
236fn truncate_arrays_if_needed(value: Value, max_array_length: usize) -> Value {
237    match value {
238        Value::Array(mut arr) => {
239            // Truncate this array if needed
240            if arr.len() > max_array_length {
241                let excess = arr.len() - max_array_length;
242                arr.drain(0..excess);
243            }
244            // Recursively process elements
245            Value::Array(
246                arr.into_iter()
247                    .map(|v| truncate_arrays_if_needed(v, max_array_length))
248                    .collect(),
249            )
250        }
251        Value::Object(map) => Value::Object(
252            map.into_iter()
253                .map(|(k, v)| (k, truncate_arrays_if_needed(v, max_array_length)))
254                .collect(),
255        ),
256        other => other,
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// View used by most tests.
    const TOKENS: &str = "tokens/list";
    /// Entity key used by the single-entity tests.
    const KEY: &str = "abc123";

    /// Builds a cache with explicit entity and array limits.
    fn cache_with_limits(max_entities_per_view: usize, max_array_length: usize) -> EntityCache {
        EntityCache::with_config(EntityCacheConfig {
            max_entities_per_view,
            max_array_length,
        })
    }

    /// Collects the `id` field of every element in `entity["events"]`.
    fn event_ids(entity: &Value) -> Vec<i64> {
        entity["events"]
            .as_array()
            .expect("events should be an array")
            .iter()
            .map(|event| event["id"].as_i64().expect("event id should be an integer"))
            .collect()
    }

    #[tokio::test]
    async fn test_basic_upsert_and_get() {
        let cache = EntityCache::new();

        cache.upsert(TOKENS, KEY, json!({"name": "Test Token"})).await;

        let stored = cache.get(TOKENS, KEY).await;
        assert!(stored.is_some());
        assert_eq!(stored.unwrap()["name"], "Test Token");
    }

    #[tokio::test]
    async fn test_deep_merge_objects() {
        let cache = EntityCache::new();

        let initial = json!({
            "id": "abc123",
            "metrics": {"volume": 100}
        });
        let update = json!({
            "metrics": {"trades": 50}
        });
        cache.upsert(TOKENS, KEY, initial).await;
        cache.upsert(TOKENS, KEY, update).await;

        // Fields from both writes must be present after the merge.
        let merged = cache.get(TOKENS, KEY).await.unwrap();
        assert_eq!(merged["id"], "abc123");
        assert_eq!(merged["metrics"]["volume"], 100);
        assert_eq!(merged["metrics"]["trades"], 50);
    }

    #[tokio::test]
    async fn test_array_append() {
        let cache = EntityCache::new();
        let append_paths = ["events".to_string()];

        cache
            .upsert(
                TOKENS,
                KEY,
                json!({"events": [{"type": "buy", "amount": 100}]}),
            )
            .await;
        cache
            .upsert_with_append(
                TOKENS,
                KEY,
                json!({"events": [{"type": "sell", "amount": 50}]}),
                &append_paths,
            )
            .await;

        let merged = cache.get(TOKENS, KEY).await.unwrap();
        let events = merged["events"].as_array().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0]["type"], "buy");
        assert_eq!(events[1]["type"], "sell");
    }

    #[tokio::test]
    async fn test_array_lru_eviction() {
        let cache = cache_with_limits(1000, 3);

        cache
            .upsert(
                TOKENS,
                KEY,
                json!({
                    "events": [
                        {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}
                    ]
                }),
            )
            .await;

        // Only the three newest elements survive the cap.
        let stored = cache.get(TOKENS, KEY).await.unwrap();
        assert_eq!(event_ids(&stored), vec![3, 4, 5]);
    }

    #[tokio::test]
    async fn test_array_append_with_lru() {
        let cache = cache_with_limits(1000, 3);
        let append_paths = ["events".to_string()];

        cache
            .upsert(TOKENS, KEY, json!({"events": [{"id": 1}, {"id": 2}]}))
            .await;
        cache
            .upsert_with_append(
                TOKENS,
                KEY,
                json!({"events": [{"id": 3}, {"id": 4}]}),
                &append_paths,
            )
            .await;

        // [1,2] + [3,4] = [1,2,3,4] → LRU(3) = [2,3,4]
        let stored = cache.get(TOKENS, KEY).await.unwrap();
        assert_eq!(event_ids(&stored), vec![2, 3, 4]);
    }

    #[tokio::test]
    async fn test_entity_lru_eviction() {
        let cache = cache_with_limits(2, 100);

        for (key, id) in [("key1", 1), ("key2", 2), ("key3", 3)] {
            cache.upsert(TOKENS, key, json!({"id": id})).await;
        }

        // The oldest entity is evicted once the third one arrives.
        assert!(cache.get(TOKENS, "key1").await.is_none());
        assert!(cache.get(TOKENS, "key2").await.is_some());
        assert!(cache.get(TOKENS, "key3").await.is_some());
    }

    #[tokio::test]
    async fn test_get_all() {
        let cache = EntityCache::new();

        cache.upsert(TOKENS, "key1", json!({"id": 1})).await;
        cache.upsert(TOKENS, "key2", json!({"id": 2})).await;

        let snapshot = cache.get_all(TOKENS).await;
        assert_eq!(snapshot.len(), 2);
    }

    #[tokio::test]
    async fn test_separate_views() {
        let cache = EntityCache::new();

        // Same key, different views: the entries must not collide.
        cache.upsert(TOKENS, "key1", json!({"type": "token"})).await;
        cache
            .upsert("games/list", "key1", json!({"type": "game"}))
            .await;

        let token = cache.get(TOKENS, "key1").await.unwrap();
        let game = cache.get("games/list", "key1").await.unwrap();

        assert_eq!(token["type"], "token");
        assert_eq!(game["type"], "game");
    }

    #[test]
    fn test_deep_merge_with_append() {
        let mut merged = json!({
            "a": 1,
            "b": {"c": 2},
            "arr": [1, 2]
        });
        let patch = json!({
            "b": {"d": 3},
            "arr": [3],
            "e": 4
        });

        deep_merge_with_append(&mut merged, patch, &["arr".to_string()], 100);

        assert_eq!(merged["a"], 1);
        assert_eq!(merged["b"]["c"], 2);
        assert_eq!(merged["b"]["d"], 3);
        assert_eq!(merged["arr"].as_array().unwrap().len(), 3);
        assert_eq!(merged["e"], 4);
    }

    #[test]
    fn test_deep_merge_replace_array() {
        let mut merged = json!({"arr": [1, 2, 3]});
        let patch = json!({"arr": [4, 5]});

        // Without an append path the patch array replaces the base array.
        deep_merge_with_append(&mut merged, patch, &[], 100);

        let arr = merged["arr"].as_array().unwrap();
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0], 4);
        assert_eq!(arr[1], 5);
    }

    #[test]
    fn test_deep_merge_nested_append() {
        let mut merged = json!({"stats": {"events": [1, 2]}});
        let patch = json!({"stats": {"events": [3]}});

        deep_merge_with_append(&mut merged, patch, &["stats.events".to_string()], 100);

        assert_eq!(merged["stats"]["events"].as_array().unwrap().len(), 3);
    }
}