1use lru::LruCache;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::num::NonZeroUsize;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
14const DEFAULT_MAX_ENTITIES_PER_VIEW: usize = 500;
15const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
16const DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE: usize = 50;
17const DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE: usize = 100;
18
19pub fn cmp_seq(a: &str, b: &str) -> std::cmp::Ordering {
23 fn parse(s: &str) -> (u64, u64) {
24 let mut parts = s.splitn(2, ':');
25 let slot = parts.next().and_then(|p| p.parse().ok()).unwrap_or(0);
26 let offset = parts.next().and_then(|p| p.parse().ok()).unwrap_or(0);
27 (slot, offset)
28 }
29 parse(a).cmp(&parse(b))
30}
31
32#[derive(Debug, Clone)]
34pub struct EntityCacheConfig {
35 pub max_entities_per_view: usize,
37 pub max_array_length: usize,
39 pub initial_snapshot_batch_size: usize,
41 pub subsequent_snapshot_batch_size: usize,
43}
44
45impl Default for EntityCacheConfig {
46 fn default() -> Self {
47 Self {
48 max_entities_per_view: DEFAULT_MAX_ENTITIES_PER_VIEW,
49 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
50 initial_snapshot_batch_size: DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE,
51 subsequent_snapshot_batch_size: DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE,
52 }
53 }
54}
55
56#[derive(Clone)]
62pub struct EntityCache {
63 caches: Arc<RwLock<HashMap<String, LruCache<String, Value>>>>,
65 config: EntityCacheConfig,
66}
67
68impl EntityCache {
69 pub fn new() -> Self {
71 Self::with_config(EntityCacheConfig::default())
72 }
73
74 pub fn with_config(config: EntityCacheConfig) -> Self {
76 Self {
77 caches: Arc::new(RwLock::new(HashMap::new())),
78 config,
79 }
80 }
81
82 pub async fn upsert(&self, view_id: &str, key: &str, patch: Value) {
83 self.upsert_with_append(view_id, key, patch, &[]).await;
84 }
85
86 pub async fn upsert_with_append(
87 &self,
88 view_id: &str,
89 key: &str,
90 patch: Value,
91 append_paths: &[String],
92 ) {
93 let mut caches = self.caches.write().await;
94
95 let cache = caches.entry(view_id.to_string()).or_insert_with(|| {
96 LruCache::new(
97 NonZeroUsize::new(self.config.max_entities_per_view)
98 .expect("max_entities_per_view must be > 0"),
99 )
100 });
101
102 let max_array_length = self.config.max_array_length;
103
104 if let Some(entity) = cache.get_mut(key) {
105 deep_merge_with_append(entity, patch, append_paths, max_array_length);
106 } else {
107 let new_entity = truncate_arrays_if_needed(patch, max_array_length);
108 cache.put(key.to_string(), new_entity);
109 }
110 }
111
112 pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> {
117 let caches = self.caches.read().await;
118
119 caches
120 .get(view_id)
121 .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
122 .unwrap_or_default()
123 }
124
125 pub async fn get_after(
131 &self,
132 view_id: &str,
133 cursor: &str,
134 limit: Option<usize>,
135 ) -> Vec<(String, Value)> {
136 let caches = self.caches.read().await;
137
138 if let Some(cache) = caches.get(view_id) {
139 let mut results: Vec<(String, Value)> = cache
140 .iter()
141 .filter(|(_, entity)| {
142 entity
143 .get("_seq")
144 .and_then(|s| s.as_str())
145 .map(|seq| cmp_seq(seq, cursor) == std::cmp::Ordering::Greater)
146 .unwrap_or(false)
147 })
148 .map(|(k, v)| (k.clone(), v.clone()))
149 .collect();
150
151 results.sort_by(|a, b| {
153 let seq_a = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
154 let seq_b = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
155 cmp_seq(seq_a, seq_b)
156 });
157
158 if let Some(limit) = limit {
160 results.truncate(limit);
161 }
162
163 results
164 } else {
165 vec![]
166 }
167 }
168
169 pub async fn get(&self, view_id: &str, key: &str) -> Option<Value> {
171 let caches = self.caches.read().await;
172 caches
173 .get(view_id)
174 .and_then(|cache| cache.peek(key).cloned())
175 }
176
177 pub async fn len(&self, view_id: &str) -> usize {
179 let caches = self.caches.read().await;
180 caches.get(view_id).map(|c| c.len()).unwrap_or(0)
181 }
182
183 pub async fn is_empty(&self, view_id: &str) -> bool {
185 self.len(view_id).await == 0
186 }
187
188 pub fn snapshot_config(&self) -> SnapshotBatchConfig {
190 SnapshotBatchConfig {
191 initial_batch_size: self.config.initial_snapshot_batch_size,
192 subsequent_batch_size: self.config.subsequent_snapshot_batch_size,
193 }
194 }
195
196 pub async fn clear(&self, view_id: &str) {
198 let mut caches = self.caches.write().await;
199 if let Some(cache) = caches.get_mut(view_id) {
200 cache.clear();
201 }
202 }
203
204 pub async fn clear_all(&self) {
205 let mut caches = self.caches.write().await;
206 caches.clear();
207 }
208
209 pub async fn stats(&self) -> CacheStats {
210 let caches = self.caches.read().await;
211 let mut total_entities = 0;
212 let mut views = Vec::new();
213
214 for (view_id, cache) in caches.iter() {
215 let count = cache.len();
216 total_entities += count;
217 views.push((view_id.clone(), count));
218 }
219
220 views.sort_by(|a, b| b.1.cmp(&a.1));
221
222 CacheStats {
223 view_count: caches.len(),
224 total_entities,
225 top_views: views.into_iter().take(5).collect(),
226 }
227 }
228}
229
230#[derive(Debug)]
231pub struct CacheStats {
232 pub view_count: usize,
233 pub total_entities: usize,
234 pub top_views: Vec<(String, usize)>,
235}
236
237#[derive(Debug, Clone, Copy)]
238pub struct SnapshotBatchConfig {
239 pub initial_batch_size: usize,
240 pub subsequent_batch_size: usize,
241}
242
243impl Default for EntityCache {
244 fn default() -> Self {
245 Self::new()
246 }
247}
248
249fn deep_merge_with_append(
250 base: &mut Value,
251 patch: Value,
252 append_paths: &[String],
253 max_array_length: usize,
254) {
255 deep_merge_with_append_inner(base, patch, append_paths, "", max_array_length);
256}
257
258fn deep_merge_with_append_inner(
259 base: &mut Value,
260 patch: Value,
261 append_paths: &[String],
262 current_path: &str,
263 max_array_length: usize,
264) {
265 match (base, patch) {
266 (Value::Object(base_map), Value::Object(patch_map)) => {
267 for (key, patch_value) in patch_map {
268 let child_path = if current_path.is_empty() {
269 key.clone()
270 } else {
271 format!("{}.{}", current_path, key)
272 };
273
274 if let Some(base_value) = base_map.get_mut(&key) {
275 deep_merge_with_append_inner(
276 base_value,
277 patch_value,
278 append_paths,
279 &child_path,
280 max_array_length,
281 );
282 } else {
283 base_map.insert(
284 key,
285 truncate_arrays_if_needed(patch_value, max_array_length),
286 );
287 }
288 }
289 }
290
291 (Value::Array(base_arr), Value::Array(patch_arr)) => {
292 let should_append = append_paths.iter().any(|p| p == current_path);
293 if should_append {
294 base_arr.extend(patch_arr);
295 if base_arr.len() > max_array_length {
296 let excess = base_arr.len() - max_array_length;
297 base_arr.drain(0..excess);
298 }
299 } else {
300 *base_arr = patch_arr;
301 if base_arr.len() > max_array_length {
302 let excess = base_arr.len() - max_array_length;
303 base_arr.drain(0..excess);
304 }
305 }
306 }
307
308 (base, patch_value) => {
309 *base = truncate_arrays_if_needed(patch_value, max_array_length);
310 }
311 }
312}
313
314fn truncate_arrays_if_needed(value: Value, max_array_length: usize) -> Value {
316 match value {
317 Value::Array(mut arr) => {
318 if arr.len() > max_array_length {
320 let excess = arr.len() - max_array_length;
321 arr.drain(0..excess);
322 }
323 Value::Array(
325 arr.into_iter()
326 .map(|v| truncate_arrays_if_needed(v, max_array_length))
327 .collect(),
328 )
329 }
330 Value::Object(map) => Value::Object(
331 map.into_iter()
332 .map(|(k, v)| (k, truncate_arrays_if_needed(v, max_array_length)))
333 .collect(),
334 ),
335 other => other,
336 }
337}
338
339#[cfg(test)]
340mod tests {
341 use super::*;
342 use serde_json::json;
343
344 #[tokio::test]
345 async fn test_basic_upsert_and_get() {
346 let cache = EntityCache::new();
347
348 cache
349 .upsert("tokens/list", "abc123", json!({"name": "Test Token"}))
350 .await;
351
352 let entity = cache.get("tokens/list", "abc123").await;
353 assert!(entity.is_some());
354 assert_eq!(entity.unwrap()["name"], "Test Token");
355 }
356
357 #[tokio::test]
358 async fn test_deep_merge_objects() {
359 let cache = EntityCache::new();
360
361 cache
362 .upsert(
363 "tokens/list",
364 "abc123",
365 json!({
366 "id": "abc123",
367 "metrics": {"volume": 100}
368 }),
369 )
370 .await;
371
372 cache
373 .upsert(
374 "tokens/list",
375 "abc123",
376 json!({
377 "metrics": {"trades": 50}
378 }),
379 )
380 .await;
381
382 let entity = cache.get("tokens/list", "abc123").await.unwrap();
383 assert_eq!(entity["id"], "abc123");
384 assert_eq!(entity["metrics"]["volume"], 100);
385 assert_eq!(entity["metrics"]["trades"], 50);
386 }
387
388 #[tokio::test]
389 async fn test_array_append() {
390 let cache = EntityCache::new();
391
392 cache
393 .upsert(
394 "tokens/list",
395 "abc123",
396 json!({
397 "events": [{"type": "buy", "amount": 100}]
398 }),
399 )
400 .await;
401
402 cache
403 .upsert_with_append(
404 "tokens/list",
405 "abc123",
406 json!({
407 "events": [{"type": "sell", "amount": 50}]
408 }),
409 &["events".to_string()],
410 )
411 .await;
412
413 let entity = cache.get("tokens/list", "abc123").await.unwrap();
414 let events = entity["events"].as_array().unwrap();
415 assert_eq!(events.len(), 2);
416 assert_eq!(events[0]["type"], "buy");
417 assert_eq!(events[1]["type"], "sell");
418 }
419
420 #[tokio::test]
421 async fn test_array_lru_eviction() {
422 let config = EntityCacheConfig {
423 max_entities_per_view: 1000,
424 max_array_length: 3,
425 ..Default::default()
426 };
427 let cache = EntityCache::with_config(config);
428
429 cache
430 .upsert(
431 "tokens/list",
432 "abc123",
433 json!({
434 "events": [
435 {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}
436 ]
437 }),
438 )
439 .await;
440
441 let entity = cache.get("tokens/list", "abc123").await.unwrap();
442 let events = entity["events"].as_array().unwrap();
443
444 assert_eq!(events.len(), 3);
445 assert_eq!(events[0]["id"], 3);
446 assert_eq!(events[1]["id"], 4);
447 assert_eq!(events[2]["id"], 5);
448 }
449
450 #[tokio::test]
451 async fn test_array_append_with_lru() {
452 let config = EntityCacheConfig {
453 max_entities_per_view: 1000,
454 max_array_length: 3,
455 ..Default::default()
456 };
457 let cache = EntityCache::with_config(config);
458
459 cache
460 .upsert(
461 "tokens/list",
462 "abc123",
463 json!({
464 "events": [{"id": 1}, {"id": 2}]
465 }),
466 )
467 .await;
468
469 cache
470 .upsert_with_append(
471 "tokens/list",
472 "abc123",
473 json!({
474 "events": [{"id": 3}, {"id": 4}]
475 }),
476 &["events".to_string()],
477 )
478 .await;
479
480 let entity = cache.get("tokens/list", "abc123").await.unwrap();
481 let events = entity["events"].as_array().unwrap();
482
483 assert_eq!(events.len(), 3);
485 assert_eq!(events[0]["id"], 2);
486 assert_eq!(events[1]["id"], 3);
487 assert_eq!(events[2]["id"], 4);
488 }
489
490 #[tokio::test]
491 async fn test_entity_lru_eviction() {
492 let config = EntityCacheConfig {
493 max_entities_per_view: 2,
494 max_array_length: 100,
495 ..Default::default()
496 };
497 let cache = EntityCache::with_config(config);
498
499 cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
500 cache.upsert("tokens/list", "key2", json!({"id": 2})).await;
501 cache.upsert("tokens/list", "key3", json!({"id": 3})).await;
502
503 assert!(cache.get("tokens/list", "key1").await.is_none());
504 assert!(cache.get("tokens/list", "key2").await.is_some());
505 assert!(cache.get("tokens/list", "key3").await.is_some());
506 }
507
508 #[tokio::test]
509 async fn test_get_all() {
510 let cache = EntityCache::new();
511
512 cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
513 cache.upsert("tokens/list", "key2", json!({"id": 2})).await;
514
515 let all = cache.get_all("tokens/list").await;
516 assert_eq!(all.len(), 2);
517 }
518
519 #[tokio::test]
520 async fn test_separate_views() {
521 let cache = EntityCache::new();
522
523 cache
524 .upsert("tokens/list", "key1", json!({"type": "token"}))
525 .await;
526 cache
527 .upsert("games/list", "key1", json!({"type": "game"}))
528 .await;
529
530 let token = cache.get("tokens/list", "key1").await.unwrap();
531 let game = cache.get("games/list", "key1").await.unwrap();
532
533 assert_eq!(token["type"], "token");
534 assert_eq!(game["type"], "game");
535 }
536
537 #[test]
538 fn test_deep_merge_with_append() {
539 let mut base = json!({
540 "a": 1,
541 "b": {"c": 2},
542 "arr": [1, 2]
543 });
544
545 let patch = json!({
546 "b": {"d": 3},
547 "arr": [3],
548 "e": 4
549 });
550
551 deep_merge_with_append(&mut base, patch, &["arr".to_string()], 100);
552
553 assert_eq!(base["a"], 1);
554 assert_eq!(base["b"]["c"], 2);
555 assert_eq!(base["b"]["d"], 3);
556 assert_eq!(base["arr"].as_array().unwrap().len(), 3);
557 assert_eq!(base["e"], 4);
558 }
559
560 #[test]
561 fn test_deep_merge_replace_array() {
562 let mut base = json!({
563 "arr": [1, 2, 3]
564 });
565
566 let patch = json!({
567 "arr": [4, 5]
568 });
569
570 deep_merge_with_append(&mut base, patch, &[], 100);
571
572 assert_eq!(base["arr"].as_array().unwrap().len(), 2);
573 assert_eq!(base["arr"][0], 4);
574 assert_eq!(base["arr"][1], 5);
575 }
576
577 #[test]
578 fn test_deep_merge_nested_append() {
579 let mut base = json!({
580 "stats": {"events": [1, 2]}
581 });
582
583 let patch = json!({
584 "stats": {"events": [3]}
585 });
586
587 deep_merge_with_append(&mut base, patch, &["stats.events".to_string()], 100);
588
589 assert_eq!(base["stats"]["events"].as_array().unwrap().len(), 3);
590 }
591
592 #[test]
593 fn test_snapshot_config_defaults() {
594 let cache = EntityCache::new();
595 let config = cache.snapshot_config();
596
597 assert_eq!(config.initial_batch_size, 50);
598 assert_eq!(config.subsequent_batch_size, 100);
599 }
600
601 #[test]
602 fn test_snapshot_config_custom() {
603 let config = EntityCacheConfig {
604 initial_snapshot_batch_size: 25,
605 subsequent_snapshot_batch_size: 75,
606 ..Default::default()
607 };
608 let cache = EntityCache::with_config(config);
609 let snapshot_config = cache.snapshot_config();
610
611 assert_eq!(snapshot_config.initial_batch_size, 25);
612 assert_eq!(snapshot_config.subsequent_batch_size, 75);
613 }
614
615 #[tokio::test]
616 async fn test_get_after() {
617 let cache = EntityCache::new();
618
619 cache
621 .upsert(
622 "tokens/list",
623 "key1",
624 json!({"id": 1, "_seq": "100:000000000001"}),
625 )
626 .await;
627 cache
628 .upsert(
629 "tokens/list",
630 "key2",
631 json!({"id": 2, "_seq": "100:000000000002"}),
632 )
633 .await;
634 cache
635 .upsert(
636 "tokens/list",
637 "key3",
638 json!({"id": 3, "_seq": "100:000000000003"}),
639 )
640 .await;
641 cache
642 .upsert(
643 "tokens/list",
644 "key4",
645 json!({"id": 4, "_seq": "101:000000000001"}),
646 )
647 .await;
648
649 let after = cache
651 .get_after("tokens/list", "100:000000000002", None)
652 .await;
653
654 assert_eq!(after.len(), 2);
656 assert_eq!(after[0].0, "key3");
657 assert_eq!(after[1].0, "key4");
658 }
659
660 #[tokio::test]
661 async fn test_get_after_with_limit() {
662 let cache = EntityCache::new();
663
664 cache
666 .upsert(
667 "tokens/list",
668 "key1",
669 json!({"id": 1, "_seq": "100:000000000001"}),
670 )
671 .await;
672 cache
673 .upsert(
674 "tokens/list",
675 "key2",
676 json!({"id": 2, "_seq": "100:000000000002"}),
677 )
678 .await;
679 cache
680 .upsert(
681 "tokens/list",
682 "key3",
683 json!({"id": 3, "_seq": "100:000000000003"}),
684 )
685 .await;
686
687 let after = cache
689 .get_after("tokens/list", "100:000000000000", Some(2))
690 .await;
691
692 assert_eq!(after.len(), 2);
694 assert_eq!(after[0].0, "key1");
695 assert_eq!(after[1].0, "key2");
696 }
697
698 #[tokio::test]
699 async fn test_get_after_empty_result() {
700 let cache = EntityCache::new();
701
702 cache
703 .upsert(
704 "tokens/list",
705 "key1",
706 json!({"id": 1, "_seq": "100:000000000001"}),
707 )
708 .await;
709
710 let after = cache
712 .get_after("tokens/list", "999:000000000000", None)
713 .await;
714
715 assert!(after.is_empty());
716 }
717
718 #[tokio::test]
719 async fn test_get_after_missing_seq() {
720 let cache = EntityCache::new();
721
722 cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
724
725 let after = cache.get_after("tokens/list", "0:000000000000", None).await;
727
728 assert!(after.is_empty());
729 }
730}