hyperstack_server/
cache.rs

1//! Entity cache for snapshot-on-subscribe functionality.
2//!
3//! This module provides an `EntityCache` that maintains full projected entities
4//! in memory with LRU eviction. When a new client subscribes, they receive
5//! cached snapshots immediately rather than waiting for the next live mutation.
6
7use lru::LruCache;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::num::NonZeroUsize;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Default maximum number of entities cached per view before LRU eviction.
const DEFAULT_MAX_ENTITIES_PER_VIEW: usize = 1000;

/// Default maximum array length before the oldest elements are evicted.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
19
/// Configuration for the entity cache.
#[derive(Debug, Clone)]
pub struct EntityCacheConfig {
    /// Maximum number of entities to cache per view.
    /// Must be > 0 — `EntityCache::upsert` panics on a zero capacity.
    pub max_entities_per_view: usize,
    /// Maximum array length before oldest elements are evicted
    pub max_array_length: usize,
}
28
29impl Default for EntityCacheConfig {
30    fn default() -> Self {
31        Self {
32            max_entities_per_view: DEFAULT_MAX_ENTITIES_PER_VIEW,
33            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
34        }
35    }
36}
37
/// Entity cache that maintains full projected entities with LRU eviction.
///
/// The cache is populated as mutations flow through the projector, regardless
/// of subscriber state. When a new subscriber connects, they receive snapshots
/// of all cached entities for their requested view.
///
/// Cloning is cheap: the per-view caches live behind a shared `Arc`.
#[derive(Clone)]
pub struct EntityCache {
    /// view_id -> LRU<entity_key, full_projected_entity>
    caches: Arc<RwLock<HashMap<String, LruCache<String, Value>>>>,
    // Immutable after construction; cloned copies share `caches` but each
    // carries its own copy of the (small) config.
    config: EntityCacheConfig,
}
49
50impl EntityCache {
51    /// Create a new entity cache with default configuration
52    pub fn new() -> Self {
53        Self::with_config(EntityCacheConfig::default())
54    }
55
56    /// Create a new entity cache with custom configuration
57    pub fn with_config(config: EntityCacheConfig) -> Self {
58        Self {
59            caches: Arc::new(RwLock::new(HashMap::new())),
60            config,
61        }
62    }
63
64    /// Upsert a patch into the cache, merging with existing entity data.
65    ///
66    /// This method:
67    /// 1. Gets or creates the LRU cache for the view
68    /// 2. Gets or creates an empty entity for the key
69    /// 3. Deep merges the patch into the entity (appending arrays)
70    /// 4. Updates the LRU cache (promoting the key to most recently used)
71    pub async fn upsert(&self, view_id: &str, key: &str, patch: &Value) {
72        let mut caches = self.caches.write().await;
73
74        let cache = caches.entry(view_id.to_string()).or_insert_with(|| {
75            LruCache::new(
76                NonZeroUsize::new(self.config.max_entities_per_view)
77                    .expect("max_entities_per_view must be > 0"),
78            )
79        });
80
81        // Get existing entity or create empty object
82        let entity = cache
83            .get_mut(key)
84            .map(|v| v.clone())
85            .unwrap_or_else(|| Value::Object(serde_json::Map::new()));
86
87        // Deep merge patch into entity
88        let merged = deep_merge_with_array_append(entity, patch, self.config.max_array_length);
89
90        // Put back into cache (this also promotes to most recently used)
91        cache.put(key.to_string(), merged);
92    }
93
94    /// Get all cached entities for a view.
95    ///
96    /// Returns a vector of (key, entity) pairs for sending as snapshots
97    /// to new subscribers.
98    pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> {
99        let caches = self.caches.read().await;
100
101        caches
102            .get(view_id)
103            .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
104            .unwrap_or_default()
105    }
106
107    /// Get a specific entity from the cache
108    pub async fn get(&self, view_id: &str, key: &str) -> Option<Value> {
109        let caches = self.caches.read().await;
110        caches
111            .get(view_id)
112            .and_then(|cache| cache.peek(key).cloned())
113    }
114
115    /// Get the number of cached entities for a view
116    pub async fn len(&self, view_id: &str) -> usize {
117        let caches = self.caches.read().await;
118        caches.get(view_id).map(|c| c.len()).unwrap_or(0)
119    }
120
121    /// Check if the cache for a view is empty
122    pub async fn is_empty(&self, view_id: &str) -> bool {
123        self.len(view_id).await == 0
124    }
125
126    /// Clear all cached entities for a view
127    pub async fn clear(&self, view_id: &str) {
128        let mut caches = self.caches.write().await;
129        if let Some(cache) = caches.get_mut(view_id) {
130            cache.clear();
131        }
132    }
133
134    /// Clear all caches
135    pub async fn clear_all(&self) {
136        let mut caches = self.caches.write().await;
137        caches.clear();
138    }
139}
140
141impl Default for EntityCache {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147/// Deep merge two JSON values, appending arrays instead of replacing them.
148///
149/// For arrays, new elements are appended to the end. If the array exceeds
150/// `max_array_length`, the oldest elements (from the beginning) are removed.
151fn deep_merge_with_array_append(base: Value, patch: &Value, max_array_length: usize) -> Value {
152    match (base, patch) {
153        // Both are objects: recursively merge
154        (Value::Object(mut base_map), Value::Object(patch_map)) => {
155            for (key, patch_value) in patch_map {
156                let merged = if let Some(base_value) = base_map.remove(key) {
157                    deep_merge_with_array_append(base_value, patch_value, max_array_length)
158                } else {
159                    // Key doesn't exist in base, use patch value (with array truncation if needed)
160                    truncate_arrays_if_needed(patch_value.clone(), max_array_length)
161                };
162                base_map.insert(key.clone(), merged);
163            }
164            Value::Object(base_map)
165        }
166
167        // Both are arrays: append and apply LRU eviction
168        (Value::Array(mut base_arr), Value::Array(patch_arr)) => {
169            // Append new elements
170            base_arr.extend(patch_arr.iter().cloned());
171
172            // LRU eviction: remove oldest elements from beginning if over limit
173            if base_arr.len() > max_array_length {
174                let excess = base_arr.len() - max_array_length;
175                base_arr.drain(0..excess);
176            }
177
178            Value::Array(base_arr)
179        }
180
181        // Patch has array but base doesn't: use patch array (truncated if needed)
182        (_, Value::Array(patch_arr)) => {
183            let mut arr = patch_arr.clone();
184            if arr.len() > max_array_length {
185                let excess = arr.len() - max_array_length;
186                arr.drain(0..excess);
187            }
188            Value::Array(arr)
189        }
190
191        // Default: patch value overwrites base
192        (_, patch_value) => patch_value.clone(),
193    }
194}
195
196/// Recursively truncate any arrays in a value to the max length
197fn truncate_arrays_if_needed(value: Value, max_array_length: usize) -> Value {
198    match value {
199        Value::Array(mut arr) => {
200            // Truncate this array if needed
201            if arr.len() > max_array_length {
202                let excess = arr.len() - max_array_length;
203                arr.drain(0..excess);
204            }
205            // Recursively process elements
206            Value::Array(
207                arr.into_iter()
208                    .map(|v| truncate_arrays_if_needed(v, max_array_length))
209                    .collect(),
210            )
211        }
212        Value::Object(map) => Value::Object(
213            map.into_iter()
214                .map(|(k, v)| (k, truncate_arrays_if_needed(v, max_array_length)))
215                .collect(),
216        ),
217        other => other,
218    }
219}
220
#[cfg(test)]
mod tests {
    //! Behavioral tests for `EntityCache` and the merge helpers.
    //! Async tests run under the tokio runtime because the cache API is async.
    use super::*;
    use serde_json::json;

    /// A single upsert is retrievable by (view, key).
    #[tokio::test]
    async fn test_basic_upsert_and_get() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "abc123", &json!({"name": "Test Token"}))
            .await;

        let entity = cache.get("tokens/list", "abc123").await;
        assert!(entity.is_some());
        assert_eq!(entity.unwrap()["name"], "Test Token");
    }

    /// Nested objects merge recursively: later patches add keys without
    /// clobbering siblings set by earlier patches.
    #[tokio::test]
    async fn test_deep_merge_objects() {
        let cache = EntityCache::new();

        // First patch: set initial data
        cache
            .upsert(
                "tokens/list",
                "abc123",
                &json!({
                    "id": "abc123",
                    "metrics": {"volume": 100}
                }),
            )
            .await;

        // Second patch: add more data
        cache
            .upsert(
                "tokens/list",
                "abc123",
                &json!({
                    "metrics": {"trades": 50}
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        assert_eq!(entity["id"], "abc123");
        // Both metrics keys survive: volume from patch 1, trades from patch 2.
        assert_eq!(entity["metrics"]["volume"], 100);
        assert_eq!(entity["metrics"]["trades"], 50);
    }

    /// Arrays in patches append to cached arrays rather than replacing them.
    #[tokio::test]
    async fn test_array_append() {
        let cache = EntityCache::new();

        // First patch with initial events
        cache
            .upsert(
                "tokens/list",
                "abc123",
                &json!({
                    "events": [{"type": "buy", "amount": 100}]
                }),
            )
            .await;

        // Second patch appends to events
        cache
            .upsert(
                "tokens/list",
                "abc123",
                &json!({
                    "events": [{"type": "sell", "amount": 50}]
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();
        // Order preserved: oldest first.
        assert_eq!(events.len(), 2);
        assert_eq!(events[0]["type"], "buy");
        assert_eq!(events[1]["type"], "sell");
    }

    /// A single oversized patch array is truncated from the front to
    /// `max_array_length`.
    #[tokio::test]
    async fn test_array_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
        };
        let cache = EntityCache::with_config(config);

        // Add 5 events (exceeds max of 3)
        cache
            .upsert(
                "tokens/list",
                "abc123",
                &json!({
                    "events": [
                        {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}
                    ]
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        // Should only have last 3 (oldest evicted)
        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 3);
        assert_eq!(events[1]["id"], 4);
        assert_eq!(events[2]["id"], 5);
    }

    /// Appending past the limit evicts from the front of the merged array.
    #[tokio::test]
    async fn test_array_append_with_lru() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
        };
        let cache = EntityCache::with_config(config);

        // Start with 2 events
        cache
            .upsert(
                "tokens/list",
                "abc123",
                &json!({
                    "events": [{"id": 1}, {"id": 2}]
                }),
            )
            .await;

        // Append 2 more (total 4, exceeds max 3)
        cache
            .upsert(
                "tokens/list",
                "abc123",
                &json!({
                    "events": [{"id": 3}, {"id": 4}]
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        // Should have last 3
        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 2);
        assert_eq!(events[1]["id"], 3);
        assert_eq!(events[2]["id"], 4);
    }

    /// Exceeding `max_entities_per_view` evicts the least recently used key.
    #[tokio::test]
    async fn test_entity_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 2,
            max_array_length: 100,
        };
        let cache = EntityCache::with_config(config);

        // Add 3 entities (exceeds max of 2)
        cache.upsert("tokens/list", "key1", &json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", &json!({"id": 2})).await;
        cache.upsert("tokens/list", "key3", &json!({"id": 3})).await;

        // key1 should be evicted (LRU)
        assert!(cache.get("tokens/list", "key1").await.is_none());
        assert!(cache.get("tokens/list", "key2").await.is_some());
        assert!(cache.get("tokens/list", "key3").await.is_some());
    }

    /// `get_all` returns every cached (key, entity) pair for the view.
    #[tokio::test]
    async fn test_get_all() {
        let cache = EntityCache::new();

        cache.upsert("tokens/list", "key1", &json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", &json!({"id": 2})).await;

        let all = cache.get_all("tokens/list").await;
        assert_eq!(all.len(), 2);
    }

    /// The same entity key under different views is stored independently.
    #[tokio::test]
    async fn test_separate_views() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "key1", &json!({"type": "token"}))
            .await;
        cache
            .upsert("games/list", "key1", &json!({"type": "game"}))
            .await;

        let token = cache.get("tokens/list", "key1").await.unwrap();
        let game = cache.get("games/list", "key1").await.unwrap();

        assert_eq!(token["type"], "token");
        assert_eq!(game["type"], "game");
    }

    /// Direct unit test of the merge helper: object keys merge, arrays
    /// append, new keys are added.
    #[test]
    fn test_deep_merge_function() {
        let base = json!({
            "a": 1,
            "b": {"c": 2},
            "arr": [1, 2]
        });

        let patch = json!({
            "b": {"d": 3},
            "arr": [3],
            "e": 4
        });

        let result = deep_merge_with_array_append(base, &patch, 100);

        assert_eq!(result["a"], 1);
        assert_eq!(result["b"]["c"], 2);
        assert_eq!(result["b"]["d"], 3);
        assert_eq!(result["arr"].as_array().unwrap().len(), 3);
        assert_eq!(result["e"], 4);
    }
}