1use lru::LruCache;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::num::NonZeroUsize;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Default for `EntityCacheConfig::max_entities_per_view`.
const DEFAULT_MAX_ENTITIES_PER_VIEW: usize = 500;
/// Default for `EntityCacheConfig::max_array_length`.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
/// Default for `EntityCacheConfig::initial_snapshot_batch_size`.
const DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE: usize = 50;
/// Default for `EntityCacheConfig::subsequent_snapshot_batch_size`.
const DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE: usize = 100;
18
/// Limits and batch sizes used by `EntityCache`.
#[derive(Debug, Clone)]
pub struct EntityCacheConfig {
    /// Capacity of each per-view LRU cache. Must be greater than zero:
    /// creating a view's cache with a zero capacity panics on the first upsert.
    pub max_entities_per_view: usize,
    /// Length cap applied to JSON arrays stored in the cache; when an array
    /// exceeds it, elements are dropped from the front.
    pub max_array_length: usize,
    /// Reported as `SnapshotBatchConfig::initial_batch_size`.
    pub initial_snapshot_batch_size: usize,
    /// Reported as `SnapshotBatchConfig::subsequent_batch_size`.
    pub subsequent_snapshot_batch_size: usize,
}
31
32impl Default for EntityCacheConfig {
33 fn default() -> Self {
34 Self {
35 max_entities_per_view: DEFAULT_MAX_ENTITIES_PER_VIEW,
36 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
37 initial_snapshot_batch_size: DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE,
38 subsequent_snapshot_batch_size: DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE,
39 }
40 }
41}
42
/// Per-view entity store: each view id maps to an LRU cache of
/// entity key -> JSON value.
///
/// The map lives behind an `Arc`, so a clone is another handle to the same
/// underlying storage rather than an independent copy.
#[derive(Clone)]
pub struct EntityCache {
    /// View id -> LRU cache of entity key -> entity, guarded by an async `RwLock`.
    caches: Arc<RwLock<HashMap<String, LruCache<String, Value>>>>,
    /// Limits applied when entities are inserted or merged.
    config: EntityCacheConfig,
}
54
55impl EntityCache {
56 pub fn new() -> Self {
58 Self::with_config(EntityCacheConfig::default())
59 }
60
61 pub fn with_config(config: EntityCacheConfig) -> Self {
63 Self {
64 caches: Arc::new(RwLock::new(HashMap::new())),
65 config,
66 }
67 }
68
69 pub async fn upsert(&self, view_id: &str, key: &str, patch: Value) {
70 self.upsert_with_append(view_id, key, patch, &[]).await;
71 }
72
73 pub async fn upsert_with_append(
74 &self,
75 view_id: &str,
76 key: &str,
77 patch: Value,
78 append_paths: &[String],
79 ) {
80 let mut caches = self.caches.write().await;
81
82 let cache = caches.entry(view_id.to_string()).or_insert_with(|| {
83 LruCache::new(
84 NonZeroUsize::new(self.config.max_entities_per_view)
85 .expect("max_entities_per_view must be > 0"),
86 )
87 });
88
89 let max_array_length = self.config.max_array_length;
90
91 if let Some(entity) = cache.get_mut(key) {
92 deep_merge_with_append(entity, patch, append_paths, max_array_length);
93 } else {
94 let new_entity = truncate_arrays_if_needed(patch, max_array_length);
95 cache.put(key.to_string(), new_entity);
96 }
97 }
98
99 pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> {
104 let caches = self.caches.read().await;
105
106 caches
107 .get(view_id)
108 .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
109 .unwrap_or_default()
110 }
111
112 pub async fn get(&self, view_id: &str, key: &str) -> Option<Value> {
114 let caches = self.caches.read().await;
115 caches
116 .get(view_id)
117 .and_then(|cache| cache.peek(key).cloned())
118 }
119
120 pub async fn len(&self, view_id: &str) -> usize {
122 let caches = self.caches.read().await;
123 caches.get(view_id).map(|c| c.len()).unwrap_or(0)
124 }
125
126 pub async fn is_empty(&self, view_id: &str) -> bool {
128 self.len(view_id).await == 0
129 }
130
131 pub fn snapshot_config(&self) -> SnapshotBatchConfig {
133 SnapshotBatchConfig {
134 initial_batch_size: self.config.initial_snapshot_batch_size,
135 subsequent_batch_size: self.config.subsequent_snapshot_batch_size,
136 }
137 }
138
139 pub async fn clear(&self, view_id: &str) {
141 let mut caches = self.caches.write().await;
142 if let Some(cache) = caches.get_mut(view_id) {
143 cache.clear();
144 }
145 }
146
147 pub async fn clear_all(&self) {
148 let mut caches = self.caches.write().await;
149 caches.clear();
150 }
151
152 pub async fn stats(&self) -> CacheStats {
153 let caches = self.caches.read().await;
154 let mut total_entities = 0;
155 let mut views = Vec::new();
156
157 for (view_id, cache) in caches.iter() {
158 let count = cache.len();
159 total_entities += count;
160 views.push((view_id.clone(), count));
161 }
162
163 views.sort_by(|a, b| b.1.cmp(&a.1));
164
165 CacheStats {
166 view_count: caches.len(),
167 total_entities,
168 top_views: views.into_iter().take(5).collect(),
169 }
170 }
171}
172
/// Summary of cache occupancy as returned by `EntityCache::stats`.
#[derive(Debug)]
pub struct CacheStats {
    /// Number of views that currently have a cache entry.
    pub view_count: usize,
    /// Sum of the entity counts of all views.
    pub total_entities: usize,
    /// Up to five `(view_id, entity_count)` pairs, largest count first.
    pub top_views: Vec<(String, usize)>,
}
179
/// Snapshot batch sizes exposed by `EntityCache::snapshot_config`.
#[derive(Debug, Clone, Copy)]
pub struct SnapshotBatchConfig {
    /// Copied from `EntityCacheConfig::initial_snapshot_batch_size`.
    pub initial_batch_size: usize,
    /// Copied from `EntityCacheConfig::subsequent_snapshot_batch_size`.
    pub subsequent_batch_size: usize,
}
185
186impl Default for EntityCache {
187 fn default() -> Self {
188 Self::new()
189 }
190}
191
/// Merges `patch` into `base` in place, starting at the root of the document.
///
/// Delegates to `deep_merge_with_append_inner` with an empty current path.
/// `append_paths` holds dot-separated paths (e.g. `"stats.events"`) of arrays
/// that are appended to instead of replaced; `max_array_length` is the array
/// length cap applied while merging.
fn deep_merge_with_append(
    base: &mut Value,
    patch: Value,
    append_paths: &[String],
    max_array_length: usize,
) {
    deep_merge_with_append_inner(base, patch, append_paths, "", max_array_length);
}
200
201fn deep_merge_with_append_inner(
202 base: &mut Value,
203 patch: Value,
204 append_paths: &[String],
205 current_path: &str,
206 max_array_length: usize,
207) {
208 match (base, patch) {
209 (Value::Object(base_map), Value::Object(patch_map)) => {
210 for (key, patch_value) in patch_map {
211 let child_path = if current_path.is_empty() {
212 key.clone()
213 } else {
214 format!("{}.{}", current_path, key)
215 };
216
217 if let Some(base_value) = base_map.get_mut(&key) {
218 deep_merge_with_append_inner(
219 base_value,
220 patch_value,
221 append_paths,
222 &child_path,
223 max_array_length,
224 );
225 } else {
226 base_map.insert(
227 key,
228 truncate_arrays_if_needed(patch_value, max_array_length),
229 );
230 }
231 }
232 }
233
234 (Value::Array(base_arr), Value::Array(patch_arr)) => {
235 let should_append = append_paths.iter().any(|p| p == current_path);
236 if should_append {
237 base_arr.extend(patch_arr);
238 if base_arr.len() > max_array_length {
239 let excess = base_arr.len() - max_array_length;
240 base_arr.drain(0..excess);
241 }
242 } else {
243 *base_arr = patch_arr;
244 if base_arr.len() > max_array_length {
245 let excess = base_arr.len() - max_array_length;
246 base_arr.drain(0..excess);
247 }
248 }
249 }
250
251 (base, patch_value) => {
252 *base = truncate_arrays_if_needed(patch_value, max_array_length);
253 }
254 }
255}
256
257fn truncate_arrays_if_needed(value: Value, max_array_length: usize) -> Value {
259 match value {
260 Value::Array(mut arr) => {
261 if arr.len() > max_array_length {
263 let excess = arr.len() - max_array_length;
264 arr.drain(0..excess);
265 }
266 Value::Array(
268 arr.into_iter()
269 .map(|v| truncate_arrays_if_needed(v, max_array_length))
270 .collect(),
271 )
272 }
273 Value::Object(map) => Value::Object(
274 map.into_iter()
275 .map(|(k, v)| (k, truncate_arrays_if_needed(v, max_array_length)))
276 .collect(),
277 ),
278 other => other,
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// View used by most tests.
    const VIEW: &str = "tokens/list";
    /// Entity key used by the single-entity tests.
    const KEY: &str = "abc123";

    /// Builds a cache with explicit entity/array limits and default snapshot sizes.
    fn cache_with_limits(max_entities_per_view: usize, max_array_length: usize) -> EntityCache {
        EntityCache::with_config(EntityCacheConfig {
            max_entities_per_view,
            max_array_length,
            ..Default::default()
        })
    }

    /// Returns the `events` array of the entity stored under `VIEW` / `KEY`.
    async fn stored_events(cache: &EntityCache) -> Vec<Value> {
        let entity = cache.get(VIEW, KEY).await.unwrap();
        entity["events"].as_array().unwrap().clone()
    }

    /// Returns the `id` field of every stored event, in order.
    async fn stored_event_ids(cache: &EntityCache) -> Vec<i64> {
        stored_events(cache)
            .await
            .iter()
            .map(|event| event["id"].as_i64().unwrap())
            .collect()
    }

    /// Length of the JSON array `value`; panics if it is not an array.
    fn array_len(value: &Value) -> usize {
        value.as_array().unwrap().len()
    }

    // A stored entity can be read back.
    #[tokio::test]
    async fn test_basic_upsert_and_get() {
        let cache = EntityCache::new();
        cache.upsert(VIEW, KEY, json!({"name": "Test Token"})).await;

        let entity = cache.get(VIEW, KEY).await;
        assert!(entity.is_some());
        assert_eq!(entity.unwrap()["name"], "Test Token");
    }

    // A second upsert merges into nested objects instead of replacing them.
    #[tokio::test]
    async fn test_deep_merge_objects() {
        let cache = EntityCache::new();
        let first = json!({
            "id": "abc123",
            "metrics": {"volume": 100}
        });
        let second = json!({
            "metrics": {"trades": 50}
        });

        cache.upsert(VIEW, KEY, first).await;
        cache.upsert(VIEW, KEY, second).await;

        let entity = cache.get(VIEW, KEY).await.unwrap();
        assert_eq!(entity["id"], "abc123");
        assert_eq!(entity["metrics"]["volume"], 100);
        assert_eq!(entity["metrics"]["trades"], 50);
    }

    // Arrays listed in the append paths grow instead of being replaced.
    #[tokio::test]
    async fn test_array_append() {
        let cache = EntityCache::new();
        let append_paths = ["events".to_string()];

        cache
            .upsert(
                VIEW,
                KEY,
                json!({
                    "events": [{"type": "buy", "amount": 100}]
                }),
            )
            .await;
        cache
            .upsert_with_append(
                VIEW,
                KEY,
                json!({
                    "events": [{"type": "sell", "amount": 50}]
                }),
                &append_paths,
            )
            .await;

        let events = stored_events(&cache).await;
        let kinds: Vec<&str> = events
            .iter()
            .map(|event| event["type"].as_str().unwrap())
            .collect();
        assert_eq!(kinds, ["buy", "sell"]);
    }

    // A freshly inserted oversized array keeps only its newest elements.
    #[tokio::test]
    async fn test_array_lru_eviction() {
        let cache = cache_with_limits(1000, 3);

        cache
            .upsert(
                VIEW,
                KEY,
                json!({
                    "events": [
                        {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}
                    ]
                }),
            )
            .await;

        assert_eq!(stored_event_ids(&cache).await, [3, 4, 5]);
    }

    // Appending past the cap drops the oldest elements.
    #[tokio::test]
    async fn test_array_append_with_lru() {
        let cache = cache_with_limits(1000, 3);
        let append_paths = ["events".to_string()];

        cache
            .upsert(
                VIEW,
                KEY,
                json!({
                    "events": [{"id": 1}, {"id": 2}]
                }),
            )
            .await;
        cache
            .upsert_with_append(
                VIEW,
                KEY,
                json!({
                    "events": [{"id": 3}, {"id": 4}]
                }),
                &append_paths,
            )
            .await;

        assert_eq!(stored_event_ids(&cache).await, [2, 3, 4]);
    }

    // Inserting beyond the per-view capacity evicts the oldest entity.
    #[tokio::test]
    async fn test_entity_lru_eviction() {
        let cache = cache_with_limits(2, 100);

        for (key, id) in [("key1", 1), ("key2", 2), ("key3", 3)] {
            cache.upsert(VIEW, key, json!({"id": id})).await;
        }

        assert!(cache.get(VIEW, "key1").await.is_none());
        assert!(cache.get(VIEW, "key2").await.is_some());
        assert!(cache.get(VIEW, "key3").await.is_some());
    }

    // `get_all` returns every entity of the view.
    #[tokio::test]
    async fn test_get_all() {
        let cache = EntityCache::new();

        for (key, id) in [("key1", 1), ("key2", 2)] {
            cache.upsert(VIEW, key, json!({"id": id})).await;
        }

        assert_eq!(cache.get_all(VIEW).await.len(), 2);
    }

    // The same key in two views refers to two independent entities.
    #[tokio::test]
    async fn test_separate_views() {
        let cache = EntityCache::new();
        let other_view = "games/list";

        cache.upsert(VIEW, "key1", json!({"type": "token"})).await;
        cache
            .upsert(other_view, "key1", json!({"type": "game"}))
            .await;

        let token = cache.get(VIEW, "key1").await.unwrap();
        let game = cache.get(other_view, "key1").await.unwrap();

        assert_eq!(token["type"], "token");
        assert_eq!(game["type"], "game");
    }

    // Objects merge, listed arrays append, and new keys are added.
    #[test]
    fn test_deep_merge_with_append() {
        let append_paths = ["arr".to_string()];
        let mut base = json!({
            "a": 1,
            "b": {"c": 2},
            "arr": [1, 2]
        });

        deep_merge_with_append(
            &mut base,
            json!({
                "b": {"d": 3},
                "arr": [3],
                "e": 4
            }),
            &append_paths,
            100,
        );

        assert_eq!(base["a"], 1);
        assert_eq!(base["b"]["c"], 2);
        assert_eq!(base["b"]["d"], 3);
        assert_eq!(array_len(&base["arr"]), 3);
        assert_eq!(base["e"], 4);
    }

    // Without append paths an array patch replaces the stored array.
    #[test]
    fn test_deep_merge_replace_array() {
        let mut base = json!({
            "arr": [1, 2, 3]
        });

        deep_merge_with_append(
            &mut base,
            json!({
                "arr": [4, 5]
            }),
            &[],
            100,
        );

        assert_eq!(array_len(&base["arr"]), 2);
        assert_eq!(base["arr"][0], 4);
        assert_eq!(base["arr"][1], 5);
    }

    // Append paths address nested arrays with dot-separated segments.
    #[test]
    fn test_deep_merge_nested_append() {
        let append_paths = ["stats.events".to_string()];
        let mut base = json!({
            "stats": {"events": [1, 2]}
        });

        deep_merge_with_append(
            &mut base,
            json!({
                "stats": {"events": [3]}
            }),
            &append_paths,
            100,
        );

        assert_eq!(array_len(&base["stats"]["events"]), 3);
    }

    // The default configuration reports the default snapshot batch sizes.
    #[test]
    fn test_snapshot_config_defaults() {
        let SnapshotBatchConfig {
            initial_batch_size,
            subsequent_batch_size,
        } = EntityCache::new().snapshot_config();

        assert_eq!(initial_batch_size, 50);
        assert_eq!(subsequent_batch_size, 100);
    }

    // Custom snapshot batch sizes are passed through unchanged.
    #[test]
    fn test_snapshot_config_custom() {
        let cache = EntityCache::with_config(EntityCacheConfig {
            initial_snapshot_batch_size: 25,
            subsequent_snapshot_batch_size: 75,
            ..Default::default()
        });

        let SnapshotBatchConfig {
            initial_batch_size,
            subsequent_batch_size,
        } = cache.snapshot_config();

        assert_eq!(initial_batch_size, 25);
        assert_eq!(subsequent_batch_size, 75);
    }
}