// hyperstack_server/cache.rs
1//! Entity cache for snapshot-on-subscribe functionality.
2//!
3//! This module provides an `EntityCache` that maintains full projected entities
4//! in memory with LRU eviction. When a new client subscribes, they receive
5//! cached snapshots immediately rather than waiting for the next live mutation.
6
7use lru::LruCache;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::num::NonZeroUsize;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
// Default cap on distinct entities kept per view before LRU eviction kicks in.
// NOTE(review): 5 is very small for a "snapshot all entities" cache — confirm intended.
const DEFAULT_MAX_ENTITIES_PER_VIEW: usize = 5;
// Default cap on array length inside cached entities; oldest (front) elements are dropped.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
// Default entity count for the first snapshot batch sent to a new subscriber.
const DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE: usize = 50;
// Default entity count for every snapshot batch after the first.
const DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE: usize = 100;
18
/// Configuration for the entity cache.
///
/// All knobs are fixed at construction time of the [`EntityCache`].
#[derive(Debug, Clone)]
pub struct EntityCacheConfig {
    /// Maximum number of entities to cache per view.
    /// Must be > 0: a zero value panics on the first upsert
    /// (see the `NonZeroUsize` expect in `EntityCache::upsert_with_append`).
    pub max_entities_per_view: usize,
    /// Maximum array length before oldest elements are evicted
    pub max_array_length: usize,
    /// Number of entities to send in the first snapshot batch (for fast initial render)
    pub initial_snapshot_batch_size: usize,
    /// Number of entities to send in subsequent snapshot batches
    pub subsequent_snapshot_batch_size: usize,
}
31
32impl Default for EntityCacheConfig {
33    fn default() -> Self {
34        Self {
35            max_entities_per_view: DEFAULT_MAX_ENTITIES_PER_VIEW,
36            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
37            initial_snapshot_batch_size: DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE,
38            subsequent_snapshot_batch_size: DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE,
39        }
40    }
41}
42
/// Entity cache that maintains full projected entities with LRU eviction.
///
/// The cache is populated as mutations flow through the projector, regardless
/// of subscriber state. When a new subscriber connects, they receive snapshots
/// of all cached entities for their requested view.
///
/// Cloning is cheap: clones share the same underlying map via `Arc`, so a
/// clone observes (and contributes) the same cached state.
#[derive(Clone)]
pub struct EntityCache {
    /// view_id -> LRU<entity_key, full_projected_entity>
    caches: Arc<RwLock<HashMap<String, LruCache<String, Value>>>>,
    /// Capacity and batching knobs; fixed at construction time.
    config: EntityCacheConfig,
}
54
55impl EntityCache {
56    /// Create a new entity cache with default configuration
57    pub fn new() -> Self {
58        Self::with_config(EntityCacheConfig::default())
59    }
60
61    /// Create a new entity cache with custom configuration
62    pub fn with_config(config: EntityCacheConfig) -> Self {
63        Self {
64            caches: Arc::new(RwLock::new(HashMap::new())),
65            config,
66        }
67    }
68
69    pub async fn upsert(&self, view_id: &str, key: &str, patch: Value) {
70        self.upsert_with_append(view_id, key, patch, &[]).await;
71    }
72
73    pub async fn upsert_with_append(
74        &self,
75        view_id: &str,
76        key: &str,
77        patch: Value,
78        append_paths: &[String],
79    ) {
80        let debug_computed = std::env::var("HYPERSTACK_DEBUG_COMPUTED").is_ok();
81
82        if debug_computed {
83            if let Some(results) = patch.get("results") {
84                if results.get("rng").is_some()
85                    || results.get("winning_square").is_some()
86                    || results.get("did_hit_motherlode").is_some()
87                {
88                    tracing::warn!(
89                        "[CACHE_UPSERT] view={} key={} patch.results: rng={:?} winning_square={:?} did_hit_motherlode={:?}",
90                        view_id,
91                        key,
92                        results.get("rng"),
93                        results.get("winning_square"),
94                        results.get("did_hit_motherlode")
95                    );
96                }
97            }
98        }
99
100        let mut caches = self.caches.write().await;
101
102        let cache = caches.entry(view_id.to_string()).or_insert_with(|| {
103            LruCache::new(
104                NonZeroUsize::new(self.config.max_entities_per_view)
105                    .expect("max_entities_per_view must be > 0"),
106            )
107        });
108
109        let max_array_length = self.config.max_array_length;
110
111        if let Some(entity) = cache.get_mut(key) {
112            if debug_computed {
113                if let Some(results) = entity.get("results") {
114                    if results.get("rng").is_some() {
115                        tracing::warn!(
116                            "[CACHE_UPSERT] view={} key={} BEFORE merge entity.results: rng={:?} winning_square={:?} did_hit_motherlode={:?}",
117                            view_id,
118                            key,
119                            results.get("rng"),
120                            results.get("winning_square"),
121                            results.get("did_hit_motherlode")
122                        );
123                    }
124                }
125            }
126
127            deep_merge_with_append(entity, patch, append_paths, max_array_length);
128
129            if debug_computed {
130                if let Some(results) = entity.get("results") {
131                    if results.get("rng").is_some() {
132                        tracing::warn!(
133                            "[CACHE_UPSERT] view={} key={} AFTER merge entity.results: rng={:?} winning_square={:?} did_hit_motherlode={:?}",
134                            view_id,
135                            key,
136                            results.get("rng"),
137                            results.get("winning_square"),
138                            results.get("did_hit_motherlode")
139                        );
140                    }
141                }
142            }
143        } else {
144            let new_entity = truncate_arrays_if_needed(patch, max_array_length);
145            cache.put(key.to_string(), new_entity);
146        }
147    }
148
149    /// Get all cached entities for a view.
150    ///
151    /// Returns a vector of (key, entity) pairs for sending as snapshots
152    /// to new subscribers.
153    pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> {
154        let caches = self.caches.read().await;
155
156        caches
157            .get(view_id)
158            .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
159            .unwrap_or_default()
160    }
161
162    /// Get a specific entity from the cache
163    pub async fn get(&self, view_id: &str, key: &str) -> Option<Value> {
164        let caches = self.caches.read().await;
165        caches
166            .get(view_id)
167            .and_then(|cache| cache.peek(key).cloned())
168    }
169
170    /// Get the number of cached entities for a view
171    pub async fn len(&self, view_id: &str) -> usize {
172        let caches = self.caches.read().await;
173        caches.get(view_id).map(|c| c.len()).unwrap_or(0)
174    }
175
176    /// Check if the cache for a view is empty
177    pub async fn is_empty(&self, view_id: &str) -> bool {
178        self.len(view_id).await == 0
179    }
180
181    /// Get the snapshot batch configuration
182    pub fn snapshot_config(&self) -> SnapshotBatchConfig {
183        SnapshotBatchConfig {
184            initial_batch_size: self.config.initial_snapshot_batch_size,
185            subsequent_batch_size: self.config.subsequent_snapshot_batch_size,
186        }
187    }
188
189    /// Clear all cached entities for a view
190    pub async fn clear(&self, view_id: &str) {
191        let mut caches = self.caches.write().await;
192        if let Some(cache) = caches.get_mut(view_id) {
193            cache.clear();
194        }
195    }
196
197    pub async fn clear_all(&self) {
198        let mut caches = self.caches.write().await;
199        caches.clear();
200    }
201
202    pub async fn stats(&self) -> CacheStats {
203        let caches = self.caches.read().await;
204        let mut total_entities = 0;
205        let mut views = Vec::new();
206
207        for (view_id, cache) in caches.iter() {
208            let count = cache.len();
209            total_entities += count;
210            views.push((view_id.clone(), count));
211        }
212
213        views.sort_by(|a, b| b.1.cmp(&a.1));
214
215        CacheStats {
216            view_count: caches.len(),
217            total_entities,
218            top_views: views.into_iter().take(5).collect(),
219        }
220    }
221}
222
/// Aggregate statistics over all per-view caches, as produced by
/// [`EntityCache::stats`].
#[derive(Debug)]
pub struct CacheStats {
    /// Number of views that currently have a (possibly empty) cache.
    pub view_count: usize,
    /// Total cached entities summed across all views.
    pub total_entities: usize,
    /// Up to five `(view_id, entity_count)` pairs, largest count first.
    pub top_views: Vec<(String, usize)>,
}
229
/// Batch sizing used when streaming cached snapshots to a new subscriber;
/// obtained via [`EntityCache::snapshot_config`].
#[derive(Debug, Clone, Copy)]
pub struct SnapshotBatchConfig {
    /// Entities in the first batch (kept small for a fast initial render).
    pub initial_batch_size: usize,
    /// Entities in every batch after the first.
    pub subsequent_batch_size: usize,
}
235
236impl Default for EntityCache {
237    fn default() -> Self {
238        Self::new()
239    }
240}
241
/// Deep-merge `patch` into `base`.
///
/// Object fields merge recursively; arrays whose dot-separated path (from the
/// entity root) appears in `append_paths` are appended to, all others are
/// replaced. Merged arrays are bounded to `max_array_length` elements with the
/// oldest (front) elements dropped first.
///
/// NOTE(review): arrays nested *inside the elements* of a merged array are not
/// re-bounded here (the array arm of the inner fn does not recurse into
/// elements), unlike newly inserted object fields which go through
/// `truncate_arrays_if_needed` — confirm this asymmetry is intended.
fn deep_merge_with_append(
    base: &mut Value,
    patch: Value,
    append_paths: &[String],
    max_array_length: usize,
) {
    // Start recursion at the entity root, whose path is the empty string.
    deep_merge_with_append_inner(base, patch, append_paths, "", max_array_length);
}
250
251fn deep_merge_with_append_inner(
252    base: &mut Value,
253    patch: Value,
254    append_paths: &[String],
255    current_path: &str,
256    max_array_length: usize,
257) {
258    match (base, patch) {
259        (Value::Object(base_map), Value::Object(patch_map)) => {
260            for (key, patch_value) in patch_map {
261                let child_path = if current_path.is_empty() {
262                    key.clone()
263                } else {
264                    format!("{}.{}", current_path, key)
265                };
266
267                if let Some(base_value) = base_map.get_mut(&key) {
268                    deep_merge_with_append_inner(
269                        base_value,
270                        patch_value,
271                        append_paths,
272                        &child_path,
273                        max_array_length,
274                    );
275                } else {
276                    base_map.insert(
277                        key,
278                        truncate_arrays_if_needed(patch_value, max_array_length),
279                    );
280                }
281            }
282        }
283
284        (Value::Array(base_arr), Value::Array(patch_arr)) => {
285            let should_append = append_paths.iter().any(|p| p == current_path);
286            if should_append {
287                base_arr.extend(patch_arr);
288                if base_arr.len() > max_array_length {
289                    let excess = base_arr.len() - max_array_length;
290                    base_arr.drain(0..excess);
291                }
292            } else {
293                *base_arr = patch_arr;
294                if base_arr.len() > max_array_length {
295                    let excess = base_arr.len() - max_array_length;
296                    base_arr.drain(0..excess);
297                }
298            }
299        }
300
301        (base, patch_value) => {
302            *base = truncate_arrays_if_needed(patch_value, max_array_length);
303        }
304    }
305}
306
307/// Recursively truncate any arrays in a value to the max length
308fn truncate_arrays_if_needed(value: Value, max_array_length: usize) -> Value {
309    match value {
310        Value::Array(mut arr) => {
311            // Truncate this array if needed
312            if arr.len() > max_array_length {
313                let excess = arr.len() - max_array_length;
314                arr.drain(0..excess);
315            }
316            // Recursively process elements
317            Value::Array(
318                arr.into_iter()
319                    .map(|v| truncate_arrays_if_needed(v, max_array_length))
320                    .collect(),
321            )
322        }
323        Value::Object(map) => Value::Object(
324            map.into_iter()
325                .map(|(k, v)| (k, truncate_arrays_if_needed(v, max_array_length)))
326                .collect(),
327        ),
328        other => other,
329    }
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Smoke test: a single upsert is retrievable via `get`.
    #[tokio::test]
    async fn test_basic_upsert_and_get() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "abc123", json!({"name": "Test Token"}))
            .await;

        let entity = cache.get("tokens/list", "abc123").await;
        assert!(entity.is_some());
        assert_eq!(entity.unwrap()["name"], "Test Token");
    }

    // A second upsert merges into nested objects rather than replacing them:
    // both `metrics.volume` and `metrics.trades` survive.
    #[tokio::test]
    async fn test_deep_merge_objects() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "id": "abc123",
                    "metrics": {"volume": 100}
                }),
            )
            .await;

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "metrics": {"trades": 50}
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        assert_eq!(entity["id"], "abc123");
        assert_eq!(entity["metrics"]["volume"], 100);
        assert_eq!(entity["metrics"]["trades"], 50);
    }

    // Arrays at a registered append path are concatenated in arrival order.
    #[tokio::test]
    async fn test_array_append() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"type": "buy", "amount": 100}]
                }),
            )
            .await;

        cache
            .upsert_with_append(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"type": "sell", "amount": 50}]
                }),
                &["events".to_string()],
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0]["type"], "buy");
        assert_eq!(events[1]["type"], "sell");
    }

    // An over-long array in a *fresh* entity is trimmed from the front
    // on insert (oldest elements dropped, newest kept).
    #[tokio::test]
    async fn test_array_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [
                        {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}
                    ]
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 3);
        assert_eq!(events[1]["id"], 4);
        assert_eq!(events[2]["id"], 5);
    }

    // Appending past max_array_length also trims from the front.
    #[tokio::test]
    async fn test_array_append_with_lru() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"id": 1}, {"id": 2}]
                }),
            )
            .await;

        cache
            .upsert_with_append(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"id": 3}, {"id": 4}]
                }),
                &["events".to_string()],
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        // [1,2] + [3,4] = [1,2,3,4] → LRU(3) = [2,3,4]
        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 2);
        assert_eq!(events[1]["id"], 3);
        assert_eq!(events[2]["id"], 4);
    }

    // Inserting a third key into a capacity-2 view evicts the
    // least-recently-used entity (key1).
    #[tokio::test]
    async fn test_entity_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 2,
            max_array_length: 100,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;
        cache.upsert("tokens/list", "key3", json!({"id": 3})).await;

        assert!(cache.get("tokens/list", "key1").await.is_none());
        assert!(cache.get("tokens/list", "key2").await.is_some());
        assert!(cache.get("tokens/list", "key3").await.is_some());
    }

    // `get_all` returns every cached (key, entity) pair for the view.
    #[tokio::test]
    async fn test_get_all() {
        let cache = EntityCache::new();

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;

        let all = cache.get_all("tokens/list").await;
        assert_eq!(all.len(), 2);
    }

    // The same entity key in different views is stored independently.
    #[tokio::test]
    async fn test_separate_views() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "key1", json!({"type": "token"}))
            .await;
        cache
            .upsert("games/list", "key1", json!({"type": "game"}))
            .await;

        let token = cache.get("tokens/list", "key1").await.unwrap();
        let game = cache.get("games/list", "key1").await.unwrap();

        assert_eq!(token["type"], "token");
        assert_eq!(game["type"], "game");
    }

    // Unit test of the merge helper: objects merge, the registered "arr"
    // path appends ([1,2]+[3] → len 3), new keys are inserted.
    #[test]
    fn test_deep_merge_with_append() {
        let mut base = json!({
            "a": 1,
            "b": {"c": 2},
            "arr": [1, 2]
        });

        let patch = json!({
            "b": {"d": 3},
            "arr": [3],
            "e": 4
        });

        deep_merge_with_append(&mut base, patch, &["arr".to_string()], 100);

        assert_eq!(base["a"], 1);
        assert_eq!(base["b"]["c"], 2);
        assert_eq!(base["b"]["d"], 3);
        assert_eq!(base["arr"].as_array().unwrap().len(), 3);
        assert_eq!(base["e"], 4);
    }

    // With no append paths, a patch array fully replaces the cached one.
    #[test]
    fn test_deep_merge_replace_array() {
        let mut base = json!({
            "arr": [1, 2, 3]
        });

        let patch = json!({
            "arr": [4, 5]
        });

        deep_merge_with_append(&mut base, patch, &[], 100);

        assert_eq!(base["arr"].as_array().unwrap().len(), 2);
        assert_eq!(base["arr"][0], 4);
        assert_eq!(base["arr"][1], 5);
    }

    // Append paths are dot-separated and reach into nested objects.
    #[test]
    fn test_deep_merge_nested_append() {
        let mut base = json!({
            "stats": {"events": [1, 2]}
        });

        let patch = json!({
            "stats": {"events": [3]}
        });

        deep_merge_with_append(&mut base, patch, &["stats.events".to_string()], 100);

        assert_eq!(base["stats"]["events"].as_array().unwrap().len(), 3);
    }

    // Default batch sizes mirror the module-level DEFAULT_* constants.
    #[test]
    fn test_snapshot_config_defaults() {
        let cache = EntityCache::new();
        let config = cache.snapshot_config();

        assert_eq!(config.initial_batch_size, 50);
        assert_eq!(config.subsequent_batch_size, 100);
    }

    // Custom batch sizes flow through to `snapshot_config` unchanged.
    #[test]
    fn test_snapshot_config_custom() {
        let config = EntityCacheConfig {
            initial_snapshot_batch_size: 25,
            subsequent_snapshot_batch_size: 75,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);
        let snapshot_config = cache.snapshot_config();

        assert_eq!(snapshot_config.initial_batch_size, 25);
        assert_eq!(snapshot_config.subsequent_batch_size, 75);
    }
}