hyperstack_server/cache.rs

//! Entity cache for snapshot-on-subscribe functionality.
//!
//! This module provides an `EntityCache` that maintains full projected entities
//! in memory with LRU eviction. When a new client subscribes, they receive
//! cached snapshots immediately rather than waiting for the next live mutation.
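//!
//! A minimal usage sketch, assuming this module is exposed as
//! `hyperstack_server::cache` (the path is not confirmed here, so the example
//! is not compiled as a doctest):
//!
//! ```ignore
//! use hyperstack_server::cache::EntityCache;
//! use serde_json::json;
//!
//! async fn demo() {
//!     let cache = EntityCache::new();
//!
//!     // Mutations flowing through the projector populate the cache.
//!     cache
//!         .upsert("tokens/list", "abc123", json!({"name": "Test Token"}))
//!         .await;
//!
//!     // A newly connected subscriber receives snapshots of the cached entities.
//!     let snapshots = cache.get_all("tokens/list").await;
//!     assert_eq!(snapshots.len(), 1);
//! }
//! ```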

use lru::LruCache;
use serde_json::Value;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::RwLock;

const DEFAULT_MAX_ENTITIES_PER_VIEW: usize = 500;
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
const DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE: usize = 50;
const DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE: usize = 100;

/// Configuration for the entity cache
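///
/// Fields not overridden fall back to their defaults via struct update syntax;
/// a minimal sketch (not compiled as a doctest):
///
/// ```ignore
/// let config = EntityCacheConfig {
///     max_entities_per_view: 1000,
///     max_array_length: 250,
///     ..Default::default()
/// };
/// let cache = EntityCache::with_config(config);
/// ```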
#[derive(Debug, Clone)]
pub struct EntityCacheConfig {
    /// Maximum number of entities to cache per view
    pub max_entities_per_view: usize,
    /// Maximum array length before oldest elements are evicted
    pub max_array_length: usize,
    /// Number of entities to send in the first snapshot batch (for fast initial render)
    pub initial_snapshot_batch_size: usize,
    /// Number of entities to send in subsequent snapshot batches
    pub subsequent_snapshot_batch_size: usize,
}

impl Default for EntityCacheConfig {
    fn default() -> Self {
        Self {
            max_entities_per_view: DEFAULT_MAX_ENTITIES_PER_VIEW,
            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
            initial_snapshot_batch_size: DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE,
            subsequent_snapshot_batch_size: DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE,
        }
    }
}

/// Entity cache that maintains full projected entities with LRU eviction.
///
/// The cache is populated as mutations flow through the projector, regardless
/// of subscriber state. When a new subscriber connects, they receive snapshots
/// of all cached entities for their requested view.
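///
/// Cloning an `EntityCache` is cheap, and all clones share the same underlying
/// storage (the per-view map lives behind an `Arc<RwLock<..>>`), so a clone can
/// be handed to another task. A minimal sketch (not compiled as a doctest):
///
/// ```ignore
/// let cache = EntityCache::new();
/// let writer = cache.clone();
/// tokio::spawn(async move {
///     writer
///         .upsert("tokens/list", "abc123", serde_json::json!({"name": "Test"}))
///         .await;
/// });
/// ```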
#[derive(Clone)]
pub struct EntityCache {
    /// view_id -> LRU<entity_key, full_projected_entity>
    caches: Arc<RwLock<HashMap<String, LruCache<String, Value>>>>,
    config: EntityCacheConfig,
}

impl EntityCache {
    /// Create a new entity cache with default configuration
    pub fn new() -> Self {
        Self::with_config(EntityCacheConfig::default())
    }

    /// Create a new entity cache with custom configuration
    pub fn with_config(config: EntityCacheConfig) -> Self {
        Self {
            caches: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    /// Insert or update an entity, deep-merging `patch` into any cached value.
    ///
    /// Arrays in the patch replace existing arrays; use
    /// [`upsert_with_append`](Self::upsert_with_append) to append instead.
    pub async fn upsert(&self, view_id: &str, key: &str, patch: Value) {
        self.upsert_with_append(view_id, key, patch, &[]).await;
    }

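    /// Insert or update an entity, deep-merging `patch` into any cached value.
    ///
    /// Arrays at the dot-separated paths listed in `append_paths` are appended
    /// to rather than replaced; every array is trimmed to `max_array_length`,
    /// dropping the oldest (front) elements first.
    ///
    /// A minimal sketch of the append behavior (not compiled as a doctest; the
    /// `hyperstack_server::cache` module path is an assumption):
    ///
    /// ```ignore
    /// use hyperstack_server::cache::EntityCache;
    /// use serde_json::json;
    ///
    /// async fn demo(cache: &EntityCache) {
    ///     cache.upsert("tokens/list", "abc", json!({"events": [1, 2]})).await;
    ///     cache
    ///         .upsert_with_append(
    ///             "tokens/list",
    ///             "abc",
    ///             json!({"events": [3]}),
    ///             &["events".to_string()],
    ///         )
    ///         .await;
    ///     // The cached entity now holds {"events": [1, 2, 3]}.
    /// }
    /// ```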
    pub async fn upsert_with_append(
        &self,
        view_id: &str,
        key: &str,
        patch: Value,
        append_paths: &[String],
    ) {
        let mut caches = self.caches.write().await;

        // Create the per-view LRU cache lazily on first use.
        let cache = caches.entry(view_id.to_string()).or_insert_with(|| {
            LruCache::new(
                NonZeroUsize::new(self.config.max_entities_per_view)
                    .expect("max_entities_per_view must be > 0"),
            )
        });

        let max_array_length = self.config.max_array_length;

        // Merge into the existing entity, or insert a truncated copy of the patch.
        if let Some(entity) = cache.get_mut(key) {
            deep_merge_with_append(entity, patch, append_paths, max_array_length);
        } else {
            let new_entity = truncate_arrays_if_needed(patch, max_array_length);
            cache.put(key.to_string(), new_entity);
        }
    }

    /// Get all cached entities for a view.
    ///
    /// Returns a vector of (key, entity) pairs for sending as snapshots
    /// to new subscribers.
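    ///
    /// A sketch of how a subscriber handler might batch these snapshots using
    /// [`snapshot_config`](Self::snapshot_config); `send_snapshot_batch` is a
    /// hypothetical sender, not part of this module (not compiled as a doctest):
    ///
    /// ```ignore
    /// let batching = cache.snapshot_config();
    /// let entities = cache.get_all("tokens/list").await;
    ///
    /// // Small first batch for a fast initial render, larger batches afterwards.
    /// let split = entities.len().min(batching.initial_batch_size);
    /// let (first, rest) = entities.split_at(split);
    /// send_snapshot_batch(first);
    /// for chunk in rest.chunks(batching.subsequent_batch_size) {
    ///     send_snapshot_batch(chunk);
    /// }
    /// ```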
    pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> {
        let caches = self.caches.read().await;

        caches
            .get(view_id)
            .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
            .unwrap_or_default()
    }

    /// Get a specific entity from the cache
    pub async fn get(&self, view_id: &str, key: &str) -> Option<Value> {
        let caches = self.caches.read().await;
        caches
            .get(view_id)
            .and_then(|cache| cache.peek(key).cloned())
    }

    /// Get the number of cached entities for a view
    pub async fn len(&self, view_id: &str) -> usize {
        let caches = self.caches.read().await;
        caches.get(view_id).map(|c| c.len()).unwrap_or(0)
    }

    /// Check if the cache for a view is empty
    pub async fn is_empty(&self, view_id: &str) -> bool {
        self.len(view_id).await == 0
    }

    /// Get the snapshot batch configuration
    pub fn snapshot_config(&self) -> SnapshotBatchConfig {
        SnapshotBatchConfig {
            initial_batch_size: self.config.initial_snapshot_batch_size,
            subsequent_batch_size: self.config.subsequent_snapshot_batch_size,
        }
    }

    /// Clear all cached entities for a view
    pub async fn clear(&self, view_id: &str) {
        let mut caches = self.caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.clear();
        }
    }

    /// Clear all cached entities across every view
    pub async fn clear_all(&self) {
        let mut caches = self.caches.write().await;
        caches.clear();
    }

    /// Summarize cache occupancy: view count, total entities, and the five
    /// views with the most cached entities.
    pub async fn stats(&self) -> CacheStats {
        let caches = self.caches.read().await;
        let mut total_entities = 0;
        let mut views = Vec::new();

        for (view_id, cache) in caches.iter() {
            let count = cache.len();
            total_entities += count;
            views.push((view_id.clone(), count));
        }

        // Largest views first.
        views.sort_by(|a, b| b.1.cmp(&a.1));

        CacheStats {
            view_count: caches.len(),
            total_entities,
            top_views: views.into_iter().take(5).collect(),
        }
    }
}

/// Point-in-time statistics about cache contents, as returned by [`EntityCache::stats`]
#[derive(Debug)]
pub struct CacheStats {
    /// Number of views with a cache
    pub view_count: usize,
    /// Total entities cached across all views
    pub total_entities: usize,
    /// Up to five views with the most cached entities, largest first
    pub top_views: Vec<(String, usize)>,
}

/// Batch sizes used when sending snapshots to a new subscriber
#[derive(Debug, Clone, Copy)]
pub struct SnapshotBatchConfig {
    /// Number of entities in the first batch (kept small for a fast initial render)
    pub initial_batch_size: usize,
    /// Number of entities in each subsequent batch
    pub subsequent_batch_size: usize,
}

impl Default for EntityCache {
    fn default() -> Self {
        Self::new()
    }
}

/// Deep-merge `patch` into `base`, appending arrays at the dot-separated paths
/// listed in `append_paths` and trimming every array to `max_array_length`.
fn deep_merge_with_append(
    base: &mut Value,
    patch: Value,
    append_paths: &[String],
    max_array_length: usize,
) {
    deep_merge_with_append_inner(base, patch, append_paths, "", max_array_length);
}

/// Recursive worker for [`deep_merge_with_append`]; `current_path` is the
/// dot-separated path of `base` within the entity.
fn deep_merge_with_append_inner(
    base: &mut Value,
    patch: Value,
    append_paths: &[String],
    current_path: &str,
    max_array_length: usize,
) {
    match (base, patch) {
        // Objects merge key by key, recursing into keys present on both sides.
        (Value::Object(base_map), Value::Object(patch_map)) => {
            for (key, patch_value) in patch_map {
                let child_path = if current_path.is_empty() {
                    key.clone()
                } else {
                    format!("{}.{}", current_path, key)
                };

                if let Some(base_value) = base_map.get_mut(&key) {
                    deep_merge_with_append_inner(
                        base_value,
                        patch_value,
                        append_paths,
                        &child_path,
                        max_array_length,
                    );
                } else {
                    base_map.insert(
                        key,
                        truncate_arrays_if_needed(patch_value, max_array_length),
                    );
                }
            }
        }

        // Arrays are appended at configured paths, otherwise replaced; either
        // way the result is trimmed to the max length, dropping the oldest
        // (front) elements first.
        (Value::Array(base_arr), Value::Array(patch_arr)) => {
            let should_append = append_paths.iter().any(|p| p == current_path);
            if should_append {
                base_arr.extend(patch_arr);
            } else {
                *base_arr = patch_arr;
            }
            if base_arr.len() > max_array_length {
                let excess = base_arr.len() - max_array_length;
                base_arr.drain(0..excess);
            }
        }

        // Any other combination of types: the patch value wins.
        (base, patch_value) => {
            *base = truncate_arrays_if_needed(patch_value, max_array_length);
        }
    }
}

/// Recursively truncate any arrays in a value to the max length
fn truncate_arrays_if_needed(value: Value, max_array_length: usize) -> Value {
    match value {
        Value::Array(mut arr) => {
            // Truncate this array if needed
            if arr.len() > max_array_length {
                let excess = arr.len() - max_array_length;
                arr.drain(0..excess);
            }
            // Recursively process elements
            Value::Array(
                arr.into_iter()
                    .map(|v| truncate_arrays_if_needed(v, max_array_length))
                    .collect(),
            )
        }
        Value::Object(map) => Value::Object(
            map.into_iter()
                .map(|(k, v)| (k, truncate_arrays_if_needed(v, max_array_length)))
                .collect(),
        ),
        other => other,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[tokio::test]
    async fn test_basic_upsert_and_get() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "abc123", json!({"name": "Test Token"}))
            .await;

        let entity = cache.get("tokens/list", "abc123").await;
        assert!(entity.is_some());
        assert_eq!(entity.unwrap()["name"], "Test Token");
    }

    #[tokio::test]
    async fn test_deep_merge_objects() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "id": "abc123",
                    "metrics": {"volume": 100}
                }),
            )
            .await;

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "metrics": {"trades": 50}
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        assert_eq!(entity["id"], "abc123");
        assert_eq!(entity["metrics"]["volume"], 100);
        assert_eq!(entity["metrics"]["trades"], 50);
    }

    #[tokio::test]
    async fn test_array_append() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"type": "buy", "amount": 100}]
                }),
            )
            .await;

        cache
            .upsert_with_append(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"type": "sell", "amount": 50}]
                }),
                &["events".to_string()],
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0]["type"], "buy");
        assert_eq!(events[1]["type"], "sell");
    }

    #[tokio::test]
    async fn test_array_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [
                        {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}
                    ]
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 3);
        assert_eq!(events[1]["id"], 4);
        assert_eq!(events[2]["id"], 5);
    }

    #[tokio::test]
    async fn test_array_append_with_lru() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"id": 1}, {"id": 2}]
                }),
            )
            .await;

        cache
            .upsert_with_append(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"id": 3}, {"id": 4}]
                }),
                &["events".to_string()],
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        // [1,2] + [3,4] = [1,2,3,4], trimmed to max_array_length = 3 → [2,3,4]
        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 2);
        assert_eq!(events[1]["id"], 3);
        assert_eq!(events[2]["id"], 4);
    }

    #[tokio::test]
    async fn test_entity_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 2,
            max_array_length: 100,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;
        cache.upsert("tokens/list", "key3", json!({"id": 3})).await;

        assert!(cache.get("tokens/list", "key1").await.is_none());
        assert!(cache.get("tokens/list", "key2").await.is_some());
        assert!(cache.get("tokens/list", "key3").await.is_some());
    }

    #[tokio::test]
    async fn test_get_all() {
        let cache = EntityCache::new();

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;

        let all = cache.get_all("tokens/list").await;
        assert_eq!(all.len(), 2);
    }

    #[tokio::test]
    async fn test_separate_views() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "key1", json!({"type": "token"}))
            .await;
        cache
            .upsert("games/list", "key1", json!({"type": "game"}))
            .await;

        let token = cache.get("tokens/list", "key1").await.unwrap();
        let game = cache.get("games/list", "key1").await.unwrap();

        assert_eq!(token["type"], "token");
        assert_eq!(game["type"], "game");
    }

    #[test]
    fn test_deep_merge_with_append() {
        let mut base = json!({
            "a": 1,
            "b": {"c": 2},
            "arr": [1, 2]
        });

        let patch = json!({
            "b": {"d": 3},
            "arr": [3],
            "e": 4
        });

        deep_merge_with_append(&mut base, patch, &["arr".to_string()], 100);

        assert_eq!(base["a"], 1);
        assert_eq!(base["b"]["c"], 2);
        assert_eq!(base["b"]["d"], 3);
        assert_eq!(base["arr"].as_array().unwrap().len(), 3);
        assert_eq!(base["e"], 4);
    }

    #[test]
    fn test_deep_merge_replace_array() {
        let mut base = json!({
            "arr": [1, 2, 3]
        });

        let patch = json!({
            "arr": [4, 5]
        });

        deep_merge_with_append(&mut base, patch, &[], 100);

        assert_eq!(base["arr"].as_array().unwrap().len(), 2);
        assert_eq!(base["arr"][0], 4);
        assert_eq!(base["arr"][1], 5);
    }

    #[test]
    fn test_deep_merge_nested_append() {
        let mut base = json!({
            "stats": {"events": [1, 2]}
        });

        let patch = json!({
            "stats": {"events": [3]}
        });

        deep_merge_with_append(&mut base, patch, &["stats.events".to_string()], 100);

        assert_eq!(base["stats"]["events"].as_array().unwrap().len(), 3);
    }

    #[test]
    fn test_snapshot_config_defaults() {
        let cache = EntityCache::new();
        let config = cache.snapshot_config();

        assert_eq!(config.initial_batch_size, 50);
        assert_eq!(config.subsequent_batch_size, 100);
    }

    #[test]
    fn test_snapshot_config_custom() {
        let config = EntityCacheConfig {
            initial_snapshot_batch_size: 25,
            subsequent_snapshot_batch_size: 75,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);
        let snapshot_config = cache.snapshot_config();

        assert_eq!(snapshot_config.initial_batch_size, 25);
        assert_eq!(snapshot_config.subsequent_batch_size, 75);
    }
}