// hyperstack_server/cache.rs
1//! Entity cache for snapshot-on-subscribe functionality.
2//!
3//! This module provides an `EntityCache` that maintains full projected entities
4//! in memory with LRU eviction. When a new client subscribes, they receive
5//! cached snapshots immediately rather than waiting for the next live mutation.
6
7use lru::LruCache;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::num::NonZeroUsize;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Default per-view entity cap; beyond this the per-view LRU evicts the
/// least-recently-used entity.
const DEFAULT_MAX_ENTITIES_PER_VIEW: usize = 500;
/// Default cap on array lengths inside cached entities; oldest (front)
/// elements are dropped when the cap is exceeded.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
/// Default size of the first snapshot batch sent to a new subscriber
/// (kept small for a fast initial render).
const DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE: usize = 50;
/// Default size of every snapshot batch after the first.
const DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE: usize = 100;
18
/// Compare two `_seq` values numerically.
/// `_seq` format is "{slot}:{offset}" where slot is not zero-padded.
/// This handles digit-count boundaries correctly (e.g., 99999999 < 100000000).
pub fn cmp_seq(a: &str, b: &str) -> std::cmp::Ordering {
    // Decode "{slot}:{offset}" into a numeric pair; any unparseable
    // component (or a missing offset) is treated as 0, matching the
    // lenient behavior callers rely on for malformed cursors.
    fn parse(s: &str) -> (u64, u64) {
        match s.split_once(':') {
            Some((slot, offset)) => (slot.parse().unwrap_or(0), offset.parse().unwrap_or(0)),
            None => (s.parse().unwrap_or(0), 0),
        }
    }
    // Tuple ordering compares slots first, then offsets.
    parse(a).cmp(&parse(b))
}
31
/// Configuration for the entity cache
#[derive(Debug, Clone)]
pub struct EntityCacheConfig {
    /// Maximum number of entities to cache per view.
    /// Must be > 0: `upsert_with_append` builds the per-view `LruCache`
    /// with `NonZeroUsize::new(..).expect(..)` and panics on 0.
    pub max_entities_per_view: usize,
    /// Maximum array length before oldest elements are evicted
    pub max_array_length: usize,
    /// Number of entities to send in the first snapshot batch (for fast initial render)
    pub initial_snapshot_batch_size: usize,
    /// Number of entities to send in subsequent snapshot batches
    pub subsequent_snapshot_batch_size: usize,
}
44
45impl Default for EntityCacheConfig {
46    fn default() -> Self {
47        Self {
48            max_entities_per_view: DEFAULT_MAX_ENTITIES_PER_VIEW,
49            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
50            initial_snapshot_batch_size: DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE,
51            subsequent_snapshot_batch_size: DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE,
52        }
53    }
54}
55
/// Entity cache that maintains full projected entities with LRU eviction.
///
/// The cache is populated as mutations flow through the projector, regardless
/// of subscriber state. When a new subscriber connects, they receive snapshots
/// of all cached entities for their requested view.
///
/// `Clone` is cheap: the cache map is behind an `Arc`, so clones share the
/// same underlying per-view caches (only `config` is copied per clone).
#[derive(Clone)]
pub struct EntityCache {
    /// view_id -> LRU<entity_key, full_projected_entity>
    // Guarded by tokio's async RwLock: many concurrent readers, exclusive writers.
    caches: Arc<RwLock<HashMap<String, LruCache<String, Value>>>>,
    // Size and batching limits, fixed at construction time.
    config: EntityCacheConfig,
}
67
impl EntityCache {
    /// Create a new entity cache with default configuration
    pub fn new() -> Self {
        Self::with_config(EntityCacheConfig::default())
    }

    /// Create a new entity cache with custom configuration
    pub fn with_config(config: EntityCacheConfig) -> Self {
        Self {
            caches: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    /// Insert or deep-merge an entity with no append paths: on merge,
    /// arrays in `patch` replace existing arrays wholesale.
    pub async fn upsert(&self, view_id: &str, key: &str, patch: Value) {
        self.upsert_with_append(view_id, key, patch, &[]).await;
    }

    /// Insert or deep-merge an entity into the per-view cache.
    ///
    /// Arrays whose dot-separated path (e.g. `"stats.events"`) appears in
    /// `append_paths` are appended to instead of replaced. All arrays are
    /// capped at `config.max_array_length`.
    ///
    /// # Panics
    /// Panics if `config.max_entities_per_view` is 0 (see the `expect` below).
    pub async fn upsert_with_append(
        &self,
        view_id: &str,
        key: &str,
        patch: Value,
        append_paths: &[String],
    ) {
        let mut caches = self.caches.write().await;

        // Lazily create the per-view LRU on first write to that view.
        let cache = caches.entry(view_id.to_string()).or_insert_with(|| {
            LruCache::new(
                NonZeroUsize::new(self.config.max_entities_per_view)
                    .expect("max_entities_per_view must be > 0"),
            )
        });

        let max_array_length = self.config.max_array_length;

        // `get_mut` (unlike `peek`) also promotes the entity to
        // most-recently-used, so actively-updated entities resist eviction.
        if let Some(entity) = cache.get_mut(key) {
            deep_merge_with_append(entity, patch, append_paths, max_array_length);
        } else {
            // New entity: enforce the array cap before caching; `put` may
            // evict the least-recently-used entity if the view is full.
            let new_entity = truncate_arrays_if_needed(patch, max_array_length);
            cache.put(key.to_string(), new_entity);
        }
    }

    /// Get all cached entities for a view.
    ///
    /// Returns a vector of (key, entity) pairs for sending as snapshots
    /// to new subscribers.
    //
    // Note: `iter()` does not change LRU recency; entities are cloned out
    // so the read lock is released before the caller uses them.
    pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> {
        let caches = self.caches.read().await;

        caches
            .get(view_id)
            .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
            .unwrap_or_default()
    }

    /// Get entities with _seq greater than the provided cursor.
    ///
    /// Returns entities that have been updated after the given cursor,
    /// sorted by _seq in ascending order. Useful for resuming from
    /// a specific point in the stream.
    //
    // Entities without a string `_seq` field are excluded (filter returns
    // false for them). Full scan + sort: O(n log n) over the view's cache.
    pub async fn get_after(
        &self,
        view_id: &str,
        cursor: &str,
        limit: Option<usize>,
    ) -> Vec<(String, Value)> {
        let caches = self.caches.read().await;

        if let Some(cache) = caches.get(view_id) {
            let mut results: Vec<(String, Value)> = cache
                .iter()
                .filter(|(_, entity)| {
                    entity
                        .get("_seq")
                        .and_then(|s| s.as_str())
                        .map(|seq| cmp_seq(seq, cursor) == std::cmp::Ordering::Greater)
                        .unwrap_or(false)
                })
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect();

            // Sort by _seq (ascending)
            results.sort_by(|a, b| {
                let seq_a = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
                let seq_b = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
                cmp_seq(seq_a, seq_b)
            });

            // Apply limit if provided
            if let Some(limit) = limit {
                results.truncate(limit);
            }

            results
        } else {
            vec![]
        }
    }

    /// Get a specific entity from the cache
    //
    // Uses `peek`, not `get`: a read does NOT promote the entity in the LRU,
    // so pure reads never affect eviction order (only upserts do).
    pub async fn get(&self, view_id: &str, key: &str) -> Option<Value> {
        let caches = self.caches.read().await;
        caches
            .get(view_id)
            .and_then(|cache| cache.peek(key).cloned())
    }

    /// Get the number of cached entities for a view
    pub async fn len(&self, view_id: &str) -> usize {
        let caches = self.caches.read().await;
        caches.get(view_id).map(|c| c.len()).unwrap_or(0)
    }

    /// Check if the cache for a view is empty
    pub async fn is_empty(&self, view_id: &str) -> bool {
        self.len(view_id).await == 0
    }

    /// Get the snapshot batch configuration
    pub fn snapshot_config(&self) -> SnapshotBatchConfig {
        SnapshotBatchConfig {
            initial_batch_size: self.config.initial_snapshot_batch_size,
            subsequent_batch_size: self.config.subsequent_snapshot_batch_size,
        }
    }

    /// Clear all cached entities for a view
    //
    // The per-view LruCache entry itself is kept (emptied, not removed).
    pub async fn clear(&self, view_id: &str) {
        let mut caches = self.caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.clear();
        }
    }

    /// Drop every per-view cache entirely.
    pub async fn clear_all(&self) {
        let mut caches = self.caches.write().await;
        caches.clear();
    }

    /// Snapshot occupancy statistics: total entity count, view count, and
    /// the top 5 views by entity count (descending).
    pub async fn stats(&self) -> CacheStats {
        let caches = self.caches.read().await;
        let mut total_entities = 0;
        let mut views = Vec::new();

        for (view_id, cache) in caches.iter() {
            let count = cache.len();
            total_entities += count;
            views.push((view_id.clone(), count));
        }

        // Sort descending by entity count so `take(5)` keeps the largest views.
        views.sort_by(|a, b| b.1.cmp(&a.1));

        CacheStats {
            view_count: caches.len(),
            total_entities,
            top_views: views.into_iter().take(5).collect(),
        }
    }
}
229
/// Point-in-time summary of cache occupancy, produced by `EntityCache::stats`.
#[derive(Debug)]
pub struct CacheStats {
    /// Number of views that have a per-view cache (possibly empty).
    pub view_count: usize,
    /// Sum of cached entities across all views.
    pub total_entities: usize,
    /// Up to 5 `(view_id, entity_count)` pairs, largest counts first.
    pub top_views: Vec<(String, usize)>,
}
236
/// Batch sizes used when streaming cached snapshots to a new subscriber.
/// Obtained from `EntityCache::snapshot_config`.
#[derive(Debug, Clone, Copy)]
pub struct SnapshotBatchConfig {
    /// Entities in the first batch (small, for a fast initial render).
    pub initial_batch_size: usize,
    /// Entities in each batch after the first.
    pub subsequent_batch_size: usize,
}
242
243impl Default for EntityCache {
244    fn default() -> Self {
245        Self::new()
246    }
247}
248
/// Deep-merge `patch` into `base`.
///
/// Object keys merge recursively; an array whose dot-separated path (e.g.
/// `"stats.events"`) appears in `append_paths` is appended to, any other
/// array is replaced; scalars and type mismatches are overwritten by the
/// patch. Arrays are capped at `max_array_length` elements, dropping the
/// oldest (front) elements first.
fn deep_merge_with_append(
    base: &mut Value,
    patch: Value,
    append_paths: &[String],
    max_array_length: usize,
) {
    // Root path is "" so top-level keys yield paths like "events" and
    // nested keys "parent.child" (see the path building in `_inner`).
    deep_merge_with_append_inner(base, patch, append_paths, "", max_array_length);
}
257
258fn deep_merge_with_append_inner(
259    base: &mut Value,
260    patch: Value,
261    append_paths: &[String],
262    current_path: &str,
263    max_array_length: usize,
264) {
265    match (base, patch) {
266        (Value::Object(base_map), Value::Object(patch_map)) => {
267            for (key, patch_value) in patch_map {
268                let child_path = if current_path.is_empty() {
269                    key.clone()
270                } else {
271                    format!("{}.{}", current_path, key)
272                };
273
274                if let Some(base_value) = base_map.get_mut(&key) {
275                    deep_merge_with_append_inner(
276                        base_value,
277                        patch_value,
278                        append_paths,
279                        &child_path,
280                        max_array_length,
281                    );
282                } else {
283                    base_map.insert(
284                        key,
285                        truncate_arrays_if_needed(patch_value, max_array_length),
286                    );
287                }
288            }
289        }
290
291        (Value::Array(base_arr), Value::Array(patch_arr)) => {
292            let should_append = append_paths.iter().any(|p| p == current_path);
293            if should_append {
294                base_arr.extend(patch_arr);
295                if base_arr.len() > max_array_length {
296                    let excess = base_arr.len() - max_array_length;
297                    base_arr.drain(0..excess);
298                }
299            } else {
300                *base_arr = patch_arr;
301                if base_arr.len() > max_array_length {
302                    let excess = base_arr.len() - max_array_length;
303                    base_arr.drain(0..excess);
304                }
305            }
306        }
307
308        (base, patch_value) => {
309            *base = truncate_arrays_if_needed(patch_value, max_array_length);
310        }
311    }
312}
313
314/// Recursively truncate any arrays in a value to the max length
315fn truncate_arrays_if_needed(value: Value, max_array_length: usize) -> Value {
316    match value {
317        Value::Array(mut arr) => {
318            // Truncate this array if needed
319            if arr.len() > max_array_length {
320                let excess = arr.len() - max_array_length;
321                arr.drain(0..excess);
322            }
323            // Recursively process elements
324            Value::Array(
325                arr.into_iter()
326                    .map(|v| truncate_arrays_if_needed(v, max_array_length))
327                    .collect(),
328            )
329        }
330        Value::Object(map) => Value::Object(
331            map.into_iter()
332                .map(|(k, v)| (k, truncate_arrays_if_needed(v, max_array_length)))
333                .collect(),
334        ),
335        other => other,
336    }
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[tokio::test]
    async fn test_basic_upsert_and_get() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "abc123", json!({"name": "Test Token"}))
            .await;

        let entity = cache.get("tokens/list", "abc123").await;
        assert!(entity.is_some());
        assert_eq!(entity.unwrap()["name"], "Test Token");
    }

    #[tokio::test]
    async fn test_deep_merge_objects() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "id": "abc123",
                    "metrics": {"volume": 100}
                }),
            )
            .await;

        // Second upsert must merge into the nested object, not replace it.
        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "metrics": {"trades": 50}
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        assert_eq!(entity["id"], "abc123");
        assert_eq!(entity["metrics"]["volume"], 100);
        assert_eq!(entity["metrics"]["trades"], 50);
    }

    #[tokio::test]
    async fn test_array_append() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"type": "buy", "amount": 100}]
                }),
            )
            .await;

        // With "events" registered as an append path, the patch array is
        // appended instead of replacing the cached one.
        cache
            .upsert_with_append(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"type": "sell", "amount": 50}]
                }),
                &["events".to_string()],
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0]["type"], "buy");
        assert_eq!(events[1]["type"], "sell");
    }

    // NOTE(review): despite the name, this exercises array truncation on
    // initial insert (max_array_length), not entity-level LRU eviction.
    #[tokio::test]
    async fn test_array_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [
                        {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}
                    ]
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        // The newest 3 elements survive; the oldest are dropped.
        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 3);
        assert_eq!(events[1]["id"], 4);
        assert_eq!(events[2]["id"], 5);
    }

    #[tokio::test]
    async fn test_array_append_with_lru() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"id": 1}, {"id": 2}]
                }),
            )
            .await;

        cache
            .upsert_with_append(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"id": 3}, {"id": 4}]
                }),
                &["events".to_string()],
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        // [1,2] + [3,4] = [1,2,3,4] → LRU(3) = [2,3,4]
        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 2);
        assert_eq!(events[1]["id"], 3);
        assert_eq!(events[2]["id"], 4);
    }

    #[tokio::test]
    async fn test_entity_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 2,
            max_array_length: 100,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;
        cache.upsert("tokens/list", "key3", json!({"id": 3})).await;

        // Capacity 2: inserting key3 evicts key1, the least recently used.
        assert!(cache.get("tokens/list", "key1").await.is_none());
        assert!(cache.get("tokens/list", "key2").await.is_some());
        assert!(cache.get("tokens/list", "key3").await.is_some());
    }

    #[tokio::test]
    async fn test_get_all() {
        let cache = EntityCache::new();

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;

        let all = cache.get_all("tokens/list").await;
        assert_eq!(all.len(), 2);
    }

    #[tokio::test]
    async fn test_separate_views() {
        let cache = EntityCache::new();

        // Same entity key in two different views must stay independent.
        cache
            .upsert("tokens/list", "key1", json!({"type": "token"}))
            .await;
        cache
            .upsert("games/list", "key1", json!({"type": "game"}))
            .await;

        let token = cache.get("tokens/list", "key1").await.unwrap();
        let game = cache.get("games/list", "key1").await.unwrap();

        assert_eq!(token["type"], "token");
        assert_eq!(game["type"], "game");
    }

    #[test]
    fn test_deep_merge_with_append() {
        let mut base = json!({
            "a": 1,
            "b": {"c": 2},
            "arr": [1, 2]
        });

        let patch = json!({
            "b": {"d": 3},
            "arr": [3],
            "e": 4
        });

        deep_merge_with_append(&mut base, patch, &["arr".to_string()], 100);

        assert_eq!(base["a"], 1);
        assert_eq!(base["b"]["c"], 2);
        assert_eq!(base["b"]["d"], 3);
        assert_eq!(base["arr"].as_array().unwrap().len(), 3);
        assert_eq!(base["e"], 4);
    }

    #[test]
    fn test_deep_merge_replace_array() {
        let mut base = json!({
            "arr": [1, 2, 3]
        });

        let patch = json!({
            "arr": [4, 5]
        });

        // "arr" is not in append_paths, so the patch array replaces it.
        deep_merge_with_append(&mut base, patch, &[], 100);

        assert_eq!(base["arr"].as_array().unwrap().len(), 2);
        assert_eq!(base["arr"][0], 4);
        assert_eq!(base["arr"][1], 5);
    }

    #[test]
    fn test_deep_merge_nested_append() {
        let mut base = json!({
            "stats": {"events": [1, 2]}
        });

        let patch = json!({
            "stats": {"events": [3]}
        });

        // Append paths use dot notation for nested arrays.
        deep_merge_with_append(&mut base, patch, &["stats.events".to_string()], 100);

        assert_eq!(base["stats"]["events"].as_array().unwrap().len(), 3);
    }

    #[test]
    fn test_snapshot_config_defaults() {
        let cache = EntityCache::new();
        let config = cache.snapshot_config();

        assert_eq!(config.initial_batch_size, 50);
        assert_eq!(config.subsequent_batch_size, 100);
    }

    #[test]
    fn test_snapshot_config_custom() {
        let config = EntityCacheConfig {
            initial_snapshot_batch_size: 25,
            subsequent_snapshot_batch_size: 75,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);
        let snapshot_config = cache.snapshot_config();

        assert_eq!(snapshot_config.initial_batch_size, 25);
        assert_eq!(snapshot_config.subsequent_batch_size, 75);
    }

    #[tokio::test]
    async fn test_get_after() {
        let cache = EntityCache::new();

        // Insert entities with _seq values
        cache
            .upsert(
                "tokens/list",
                "key1",
                json!({"id": 1, "_seq": "100:000000000001"}),
            )
            .await;
        cache
            .upsert(
                "tokens/list",
                "key2",
                json!({"id": 2, "_seq": "100:000000000002"}),
            )
            .await;
        cache
            .upsert(
                "tokens/list",
                "key3",
                json!({"id": 3, "_seq": "100:000000000003"}),
            )
            .await;
        cache
            .upsert(
                "tokens/list",
                "key4",
                json!({"id": 4, "_seq": "101:000000000001"}),
            )
            .await;

        // Get all entities after "100:000000000002"
        let after = cache
            .get_after("tokens/list", "100:000000000002", None)
            .await;

        // Should return key3 and key4 (sorted by _seq)
        assert_eq!(after.len(), 2);
        assert_eq!(after[0].0, "key3");
        assert_eq!(after[1].0, "key4");
    }

    #[tokio::test]
    async fn test_get_after_with_limit() {
        let cache = EntityCache::new();

        // Insert entities with _seq values
        cache
            .upsert(
                "tokens/list",
                "key1",
                json!({"id": 1, "_seq": "100:000000000001"}),
            )
            .await;
        cache
            .upsert(
                "tokens/list",
                "key2",
                json!({"id": 2, "_seq": "100:000000000002"}),
            )
            .await;
        cache
            .upsert(
                "tokens/list",
                "key3",
                json!({"id": 3, "_seq": "100:000000000003"}),
            )
            .await;

        // Get entities after "100:000000000000" with limit 2
        let after = cache
            .get_after("tokens/list", "100:000000000000", Some(2))
            .await;

        // Should return only first 2 (key1 and key2)
        assert_eq!(after.len(), 2);
        assert_eq!(after[0].0, "key1");
        assert_eq!(after[1].0, "key2");
    }

    #[tokio::test]
    async fn test_get_after_empty_result() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "key1",
                json!({"id": 1, "_seq": "100:000000000001"}),
            )
            .await;

        // Get entities after a future cursor
        let after = cache
            .get_after("tokens/list", "999:000000000000", None)
            .await;

        assert!(after.is_empty());
    }

    #[tokio::test]
    async fn test_get_after_missing_seq() {
        let cache = EntityCache::new();

        // Insert entity without _seq
        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;

        // Get entities after any cursor - entity without _seq should not be included
        let after = cache.get_after("tokens/list", "0:000000000000", None).await;

        assert!(after.is_empty());
    }
}