1use lru::LruCache;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::num::NonZeroUsize;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
// NOTE(review): a default of 5 entities per view looks very small for a cache
// capacity — confirm this is the intended value.
/// Default capacity of each view's LRU cache (see `EntityCacheConfig::max_entities_per_view`).
const DEFAULT_MAX_ENTITIES_PER_VIEW: usize = 5;
/// Default cap on the length of arrays stored in cached entities
/// (see `EntityCacheConfig::max_array_length`).
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
/// Default for `EntityCacheConfig::initial_snapshot_batch_size`.
const DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE: usize = 50;
/// Default for `EntityCacheConfig::subsequent_snapshot_batch_size`.
const DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE: usize = 100;
18
/// Tunable limits for [`EntityCache`].
#[derive(Debug, Clone)]
pub struct EntityCacheConfig {
    /// Capacity of each view's LRU cache. Must be greater than zero: a zero
    /// value panics when a view's cache is first created during an upsert.
    pub max_entities_per_view: usize,
    /// Maximum length applied to arrays when entities are stored or merged;
    /// excess elements are removed from the front of the array.
    pub max_array_length: usize,
    /// Value reported as `initial_batch_size` by `EntityCache::snapshot_config`.
    pub initial_snapshot_batch_size: usize,
    /// Value reported as `subsequent_batch_size` by `EntityCache::snapshot_config`.
    pub subsequent_snapshot_batch_size: usize,
}
31
32impl Default for EntityCacheConfig {
33 fn default() -> Self {
34 Self {
35 max_entities_per_view: DEFAULT_MAX_ENTITIES_PER_VIEW,
36 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
37 initial_snapshot_batch_size: DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE,
38 subsequent_snapshot_batch_size: DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE,
39 }
40 }
41}
42
/// Per-view LRU cache of JSON entities.
///
/// Clones share the same underlying storage because the map lives behind an
/// `Arc`; only the configuration is copied.
#[derive(Clone)]
pub struct EntityCache {
    /// View id -> LRU cache of (entity key -> entity JSON), guarded by an
    /// async read/write lock.
    caches: Arc<RwLock<HashMap<String, LruCache<String, Value>>>>,
    /// Limits applied to every view's cache.
    config: EntityCacheConfig,
}
54
55impl EntityCache {
56 pub fn new() -> Self {
58 Self::with_config(EntityCacheConfig::default())
59 }
60
61 pub fn with_config(config: EntityCacheConfig) -> Self {
63 Self {
64 caches: Arc::new(RwLock::new(HashMap::new())),
65 config,
66 }
67 }
68
69 pub async fn upsert(&self, view_id: &str, key: &str, patch: Value) {
70 self.upsert_with_append(view_id, key, patch, &[]).await;
71 }
72
73 pub async fn upsert_with_append(
74 &self,
75 view_id: &str,
76 key: &str,
77 patch: Value,
78 append_paths: &[String],
79 ) {
80 let debug_computed = std::env::var("HYPERSTACK_DEBUG_COMPUTED").is_ok();
81
82 if debug_computed {
83 if let Some(results) = patch.get("results") {
84 if results.get("rng").is_some()
85 || results.get("winning_square").is_some()
86 || results.get("did_hit_motherlode").is_some()
87 {
88 tracing::warn!(
89 "[CACHE_UPSERT] view={} key={} patch.results: rng={:?} winning_square={:?} did_hit_motherlode={:?}",
90 view_id,
91 key,
92 results.get("rng"),
93 results.get("winning_square"),
94 results.get("did_hit_motherlode")
95 );
96 }
97 }
98 }
99
100 let mut caches = self.caches.write().await;
101
102 let cache = caches.entry(view_id.to_string()).or_insert_with(|| {
103 LruCache::new(
104 NonZeroUsize::new(self.config.max_entities_per_view)
105 .expect("max_entities_per_view must be > 0"),
106 )
107 });
108
109 let max_array_length = self.config.max_array_length;
110
111 if let Some(entity) = cache.get_mut(key) {
112 if debug_computed {
113 if let Some(results) = entity.get("results") {
114 if results.get("rng").is_some() {
115 tracing::warn!(
116 "[CACHE_UPSERT] view={} key={} BEFORE merge entity.results: rng={:?} winning_square={:?} did_hit_motherlode={:?}",
117 view_id,
118 key,
119 results.get("rng"),
120 results.get("winning_square"),
121 results.get("did_hit_motherlode")
122 );
123 }
124 }
125 }
126
127 deep_merge_with_append(entity, patch, append_paths, max_array_length);
128
129 if debug_computed {
130 if let Some(results) = entity.get("results") {
131 if results.get("rng").is_some() {
132 tracing::warn!(
133 "[CACHE_UPSERT] view={} key={} AFTER merge entity.results: rng={:?} winning_square={:?} did_hit_motherlode={:?}",
134 view_id,
135 key,
136 results.get("rng"),
137 results.get("winning_square"),
138 results.get("did_hit_motherlode")
139 );
140 }
141 }
142 }
143 } else {
144 let new_entity = truncate_arrays_if_needed(patch, max_array_length);
145 cache.put(key.to_string(), new_entity);
146 }
147 }
148
149 pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> {
154 let caches = self.caches.read().await;
155
156 caches
157 .get(view_id)
158 .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
159 .unwrap_or_default()
160 }
161
162 pub async fn get(&self, view_id: &str, key: &str) -> Option<Value> {
164 let caches = self.caches.read().await;
165 caches
166 .get(view_id)
167 .and_then(|cache| cache.peek(key).cloned())
168 }
169
170 pub async fn len(&self, view_id: &str) -> usize {
172 let caches = self.caches.read().await;
173 caches.get(view_id).map(|c| c.len()).unwrap_or(0)
174 }
175
176 pub async fn is_empty(&self, view_id: &str) -> bool {
178 self.len(view_id).await == 0
179 }
180
181 pub fn snapshot_config(&self) -> SnapshotBatchConfig {
183 SnapshotBatchConfig {
184 initial_batch_size: self.config.initial_snapshot_batch_size,
185 subsequent_batch_size: self.config.subsequent_snapshot_batch_size,
186 }
187 }
188
189 pub async fn clear(&self, view_id: &str) {
191 let mut caches = self.caches.write().await;
192 if let Some(cache) = caches.get_mut(view_id) {
193 cache.clear();
194 }
195 }
196
197 pub async fn clear_all(&self) {
198 let mut caches = self.caches.write().await;
199 caches.clear();
200 }
201
202 pub async fn stats(&self) -> CacheStats {
203 let caches = self.caches.read().await;
204 let mut total_entities = 0;
205 let mut views = Vec::new();
206
207 for (view_id, cache) in caches.iter() {
208 let count = cache.len();
209 total_entities += count;
210 views.push((view_id.clone(), count));
211 }
212
213 views.sort_by(|a, b| b.1.cmp(&a.1));
214
215 CacheStats {
216 view_count: caches.len(),
217 total_entities,
218 top_views: views.into_iter().take(5).collect(),
219 }
220 }
221}
222
/// Summary of cache occupancy produced by `EntityCache::stats`.
#[derive(Debug)]
pub struct CacheStats {
    /// Number of views currently registered in the cache.
    pub view_count: usize,
    /// Sum of the entity counts of all views.
    pub total_entities: usize,
    /// Up to five `(view id, entity count)` pairs, ordered by entity count descending.
    pub top_views: Vec<(String, usize)>,
}
229
/// Snapshot batch sizes copied out of the cache configuration by
/// `EntityCache::snapshot_config`.
// NOTE(review): how these sizes are consumed is not visible in this file;
// presumably they drive snapshot batching elsewhere — confirm with the caller.
#[derive(Debug, Clone, Copy)]
pub struct SnapshotBatchConfig {
    /// Copied from `EntityCacheConfig::initial_snapshot_batch_size`.
    pub initial_batch_size: usize,
    /// Copied from `EntityCacheConfig::subsequent_snapshot_batch_size`.
    pub subsequent_batch_size: usize,
}
235
236impl Default for EntityCache {
237 fn default() -> Self {
238 Self::new()
239 }
240}
241
242fn deep_merge_with_append(
243 base: &mut Value,
244 patch: Value,
245 append_paths: &[String],
246 max_array_length: usize,
247) {
248 deep_merge_with_append_inner(base, patch, append_paths, "", max_array_length);
249}
250
251fn deep_merge_with_append_inner(
252 base: &mut Value,
253 patch: Value,
254 append_paths: &[String],
255 current_path: &str,
256 max_array_length: usize,
257) {
258 match (base, patch) {
259 (Value::Object(base_map), Value::Object(patch_map)) => {
260 for (key, patch_value) in patch_map {
261 let child_path = if current_path.is_empty() {
262 key.clone()
263 } else {
264 format!("{}.{}", current_path, key)
265 };
266
267 if let Some(base_value) = base_map.get_mut(&key) {
268 deep_merge_with_append_inner(
269 base_value,
270 patch_value,
271 append_paths,
272 &child_path,
273 max_array_length,
274 );
275 } else {
276 base_map.insert(
277 key,
278 truncate_arrays_if_needed(patch_value, max_array_length),
279 );
280 }
281 }
282 }
283
284 (Value::Array(base_arr), Value::Array(patch_arr)) => {
285 let should_append = append_paths.iter().any(|p| p == current_path);
286 if should_append {
287 base_arr.extend(patch_arr);
288 if base_arr.len() > max_array_length {
289 let excess = base_arr.len() - max_array_length;
290 base_arr.drain(0..excess);
291 }
292 } else {
293 *base_arr = patch_arr;
294 if base_arr.len() > max_array_length {
295 let excess = base_arr.len() - max_array_length;
296 base_arr.drain(0..excess);
297 }
298 }
299 }
300
301 (base, patch_value) => {
302 *base = truncate_arrays_if_needed(patch_value, max_array_length);
303 }
304 }
305}
306
307fn truncate_arrays_if_needed(value: Value, max_array_length: usize) -> Value {
309 match value {
310 Value::Array(mut arr) => {
311 if arr.len() > max_array_length {
313 let excess = arr.len() - max_array_length;
314 arr.drain(0..excess);
315 }
316 Value::Array(
318 arr.into_iter()
319 .map(|v| truncate_arrays_if_needed(v, max_array_length))
320 .collect(),
321 )
322 }
323 Value::Object(map) => Value::Object(
324 map.into_iter()
325 .map(|(k, v)| (k, truncate_arrays_if_needed(v, max_array_length)))
326 .collect(),
327 ),
328 other => other,
329 }
330}
331
/// Unit tests for the entity cache and its merge helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A freshly upserted entity can be read back.
    #[tokio::test]
    async fn test_basic_upsert_and_get() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "abc123", json!({"name": "Test Token"}))
            .await;

        let entity = cache.get("tokens/list", "abc123").await;
        assert!(entity.is_some());
        assert_eq!(entity.unwrap()["name"], "Test Token");
    }

    /// A second upsert merges nested objects instead of replacing them.
    #[tokio::test]
    async fn test_deep_merge_objects() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "id": "abc123",
                    "metrics": {"volume": 100}
                }),
            )
            .await;

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "metrics": {"trades": 50}
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        assert_eq!(entity["id"], "abc123");
        assert_eq!(entity["metrics"]["volume"], 100);
        assert_eq!(entity["metrics"]["trades"], 50);
    }

    /// Arrays at an append path grow, keeping existing elements first.
    #[tokio::test]
    async fn test_array_append() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"type": "buy", "amount": 100}]
                }),
            )
            .await;

        cache
            .upsert_with_append(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"type": "sell", "amount": 50}]
                }),
                &["events".to_string()],
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0]["type"], "buy");
        assert_eq!(events[1]["type"], "sell");
    }

    /// Inserting an array longer than `max_array_length` keeps only its tail.
    #[tokio::test]
    async fn test_array_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [
                        {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}
                    ]
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        // Five elements with a cap of three: the two oldest are dropped.
        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 3);
        assert_eq!(events[1]["id"], 4);
        assert_eq!(events[2]["id"], 5);
    }

    /// Appending past `max_array_length` drops elements from the front.
    #[tokio::test]
    async fn test_array_append_with_lru() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"id": 1}, {"id": 2}]
                }),
            )
            .await;

        cache
            .upsert_with_append(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"id": 3}, {"id": 4}]
                }),
                &["events".to_string()],
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        // 2 existing + 2 appended = 4 elements; the cap of 3 removes id 1.
        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 2);
        assert_eq!(events[1]["id"], 3);
        assert_eq!(events[2]["id"], 4);
    }

    /// With a capacity of two, inserting a third key evicts the first one.
    #[tokio::test]
    async fn test_entity_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 2,
            max_array_length: 100,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;
        cache.upsert("tokens/list", "key3", json!({"id": 3})).await;

        assert!(cache.get("tokens/list", "key1").await.is_none());
        assert!(cache.get("tokens/list", "key2").await.is_some());
        assert!(cache.get("tokens/list", "key3").await.is_some());
    }

    /// `get_all` returns every entity of the view.
    #[tokio::test]
    async fn test_get_all() {
        let cache = EntityCache::new();

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;

        let all = cache.get_all("tokens/list").await;
        assert_eq!(all.len(), 2);
    }

    /// The same key in two views refers to two independent entities.
    #[tokio::test]
    async fn test_separate_views() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "key1", json!({"type": "token"}))
            .await;
        cache
            .upsert("games/list", "key1", json!({"type": "game"}))
            .await;

        let token = cache.get("tokens/list", "key1").await.unwrap();
        let game = cache.get("games/list", "key1").await.unwrap();

        assert_eq!(token["type"], "token");
        assert_eq!(game["type"], "game");
    }

    /// Objects merge, the array at the append path grows, new keys are added.
    #[test]
    fn test_deep_merge_with_append() {
        let mut base = json!({
            "a": 1,
            "b": {"c": 2},
            "arr": [1, 2]
        });

        let patch = json!({
            "b": {"d": 3},
            "arr": [3],
            "e": 4
        });

        deep_merge_with_append(&mut base, patch, &["arr".to_string()], 100);

        assert_eq!(base["a"], 1);
        assert_eq!(base["b"]["c"], 2);
        assert_eq!(base["b"]["d"], 3);
        assert_eq!(base["arr"].as_array().unwrap().len(), 3);
        assert_eq!(base["e"], 4);
    }

    /// Without an append path, the patch array replaces the base array.
    #[test]
    fn test_deep_merge_replace_array() {
        let mut base = json!({
            "arr": [1, 2, 3]
        });

        let patch = json!({
            "arr": [4, 5]
        });

        deep_merge_with_append(&mut base, patch, &[], 100);

        assert_eq!(base["arr"].as_array().unwrap().len(), 2);
        assert_eq!(base["arr"][0], 4);
        assert_eq!(base["arr"][1], 5);
    }

    /// Append paths address nested arrays with dot-separated keys.
    #[test]
    fn test_deep_merge_nested_append() {
        let mut base = json!({
            "stats": {"events": [1, 2]}
        });

        let patch = json!({
            "stats": {"events": [3]}
        });

        deep_merge_with_append(&mut base, patch, &["stats.events".to_string()], 100);

        assert_eq!(base["stats"]["events"].as_array().unwrap().len(), 3);
    }

    /// The default configuration reports batch sizes of 50 and 100.
    #[test]
    fn test_snapshot_config_defaults() {
        let cache = EntityCache::new();
        let config = cache.snapshot_config();

        assert_eq!(config.initial_batch_size, 50);
        assert_eq!(config.subsequent_batch_size, 100);
    }

    /// Custom batch sizes are passed through unchanged.
    #[test]
    fn test_snapshot_config_custom() {
        let config = EntityCacheConfig {
            initial_snapshot_batch_size: 25,
            subsequent_snapshot_batch_size: 75,
            ..Default::default()
        };
        let cache = EntityCache::with_config(config);
        let snapshot_config = cache.snapshot_config();

        assert_eq!(snapshot_config.initial_batch_size, 25);
        assert_eq!(snapshot_config.subsequent_batch_size, 75);
    }
}