use lru::LruCache;
use serde_json::Value;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::RwLock;

const DEFAULT_MAX_ENTITIES_PER_VIEW: usize = 500;
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;

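/// Limits applied to each view's entity cache: the maximum number of entities
/// retained per view (older entries are LRU-evicted beyond that) and the
/// maximum length of any JSON array stored in an entity (truncated from the
/// front so the most recent elements are kept).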
#[derive(Debug, Clone)]
pub struct EntityCacheConfig {
    pub max_entities_per_view: usize,
    pub max_array_length: usize,
}

impl Default for EntityCacheConfig {
    fn default() -> Self {
        Self {
            max_entities_per_view: DEFAULT_MAX_ENTITIES_PER_VIEW,
            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
        }
    }
}

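/// An in-memory cache of JSON entities, partitioned by view id. Each view holds
/// its own LRU map from entity key to a `serde_json::Value`, and patches are
/// deep-merged into existing entries. Cloning an `EntityCache` is cheap: the
/// underlying storage is shared behind an `Arc<RwLock<...>>`.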
#[derive(Clone)]
pub struct EntityCache {
    caches: Arc<RwLock<HashMap<String, LruCache<String, Value>>>>,
    config: EntityCacheConfig,
}

impl EntityCache {
    pub fn new() -> Self {
        Self::with_config(EntityCacheConfig::default())
    }

    pub fn with_config(config: EntityCacheConfig) -> Self {
        Self {
            caches: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

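    /// Inserts or deep-merges `patch` into the entity identified by `key` in
    /// `view_id`. Arrays in the patch replace existing arrays; use
    /// [`upsert_with_append`](Self::upsert_with_append) to append instead.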
    pub async fn upsert(&self, view_id: &str, key: &str, patch: Value) {
        self.upsert_with_append(view_id, key, patch, &[]).await;
    }

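    /// Inserts or deep-merges `patch` into the entity identified by `key` in
    /// `view_id`, creating the view's LRU cache on first use. Arrays whose
    /// dot-separated path (e.g. `"stats.events"`) appears in `append_paths`
    /// are appended to rather than replaced, and arrays are capped at the
    /// configured `max_array_length`, dropping the oldest elements from the
    /// front.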
    pub async fn upsert_with_append(
        &self,
        view_id: &str,
        key: &str,
        patch: Value,
        append_paths: &[String],
    ) {
        let mut caches = self.caches.write().await;

        let cache = caches.entry(view_id.to_string()).or_insert_with(|| {
            LruCache::new(
                NonZeroUsize::new(self.config.max_entities_per_view)
                    .expect("max_entities_per_view must be > 0"),
            )
        });

        let max_array_length = self.config.max_array_length;

        if let Some(entity) = cache.get_mut(key) {
            deep_merge_with_append(entity, patch, append_paths, max_array_length);
        } else {
            let new_entity = truncate_arrays_if_needed(patch, max_array_length);
            cache.put(key.to_string(), new_entity);
        }
    }

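    /// Returns a cloned snapshot of every `(key, entity)` pair currently cached
    /// for `view_id`, or an empty vector if the view has no cache.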
    pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> {
        let caches = self.caches.read().await;

        caches
            .get(view_id)
            .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
            .unwrap_or_default()
    }

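    /// Returns a clone of the entity stored under `key` in `view_id`, if any.
    /// Uses `peek`, so the lookup does not refresh the entry's LRU recency.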
    pub async fn get(&self, view_id: &str, key: &str) -> Option<Value> {
        let caches = self.caches.read().await;
        caches
            .get(view_id)
            .and_then(|cache| cache.peek(key).cloned())
    }

    pub async fn len(&self, view_id: &str) -> usize {
        let caches = self.caches.read().await;
        caches.get(view_id).map(|c| c.len()).unwrap_or(0)
    }

    pub async fn is_empty(&self, view_id: &str) -> bool {
        self.len(view_id).await == 0
    }

    pub async fn clear(&self, view_id: &str) {
        let mut caches = self.caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.clear();
        }
    }

    pub async fn clear_all(&self) {
        let mut caches = self.caches.write().await;
        caches.clear();
    }

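    /// Summarizes the cache: the number of views, the total entity count, and
    /// the five views holding the most entities, sorted in descending order.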
    pub async fn stats(&self) -> CacheStats {
        let caches = self.caches.read().await;
        let mut total_entities = 0;
        let mut views = Vec::new();

        for (view_id, cache) in caches.iter() {
            let count = cache.len();
            total_entities += count;
            views.push((view_id.clone(), count));
        }

        views.sort_by(|a, b| b.1.cmp(&a.1));

        CacheStats {
            view_count: caches.len(),
            total_entities,
            top_views: views.into_iter().take(5).collect(),
        }
    }
}

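/// Snapshot of cache occupancy returned by [`EntityCache::stats`].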
#[derive(Debug)]
pub struct CacheStats {
    pub view_count: usize,
    pub total_entities: usize,
    pub top_views: Vec<(String, usize)>,
}

impl Default for EntityCache {
    fn default() -> Self {
        Self::new()
    }
}

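/// Recursively merges `patch` into `base`. Objects are merged key by key;
/// arrays whose dot-separated path appears in `append_paths` are extended,
/// while all other arrays (and scalar values) are replaced. Merged arrays are
/// capped at `max_array_length` by dropping elements from the front.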
fn deep_merge_with_append(
    base: &mut Value,
    patch: Value,
    append_paths: &[String],
    max_array_length: usize,
) {
    deep_merge_with_append_inner(base, patch, append_paths, "", max_array_length);
}

fn deep_merge_with_append_inner(
    base: &mut Value,
    patch: Value,
    append_paths: &[String],
    current_path: &str,
    max_array_length: usize,
) {
    match (base, patch) {
        (Value::Object(base_map), Value::Object(patch_map)) => {
            for (key, patch_value) in patch_map {
                let child_path = if current_path.is_empty() {
                    key.clone()
                } else {
                    format!("{}.{}", current_path, key)
                };

                if let Some(base_value) = base_map.get_mut(&key) {
                    deep_merge_with_append_inner(
                        base_value,
                        patch_value,
                        append_paths,
                        &child_path,
                        max_array_length,
                    );
                } else {
                    base_map.insert(
                        key,
                        truncate_arrays_if_needed(patch_value, max_array_length),
                    );
                }
            }
        }

        (Value::Array(base_arr), Value::Array(patch_arr)) => {
            let should_append = append_paths.iter().any(|p| p == current_path);
            if should_append {
                base_arr.extend(patch_arr);
            } else {
                *base_arr = patch_arr;
            }
            if base_arr.len() > max_array_length {
                let excess = base_arr.len() - max_array_length;
                base_arr.drain(0..excess);
            }
        }

        (base, patch_value) => {
            *base = truncate_arrays_if_needed(patch_value, max_array_length);
        }
    }
}

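/// Recursively walks `value`, truncating every array to `max_array_length` by
/// dropping elements from the front so the most recent entries are kept.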
fn truncate_arrays_if_needed(value: Value, max_array_length: usize) -> Value {
    match value {
        Value::Array(mut arr) => {
            if arr.len() > max_array_length {
                let excess = arr.len() - max_array_length;
                arr.drain(0..excess);
            }
            Value::Array(
                arr.into_iter()
                    .map(|v| truncate_arrays_if_needed(v, max_array_length))
                    .collect(),
            )
        }
        Value::Object(map) => Value::Object(
            map.into_iter()
                .map(|(k, v)| (k, truncate_arrays_if_needed(v, max_array_length)))
                .collect(),
        ),
        other => other,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[tokio::test]
    async fn test_basic_upsert_and_get() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "abc123", json!({"name": "Test Token"}))
            .await;

        let entity = cache.get("tokens/list", "abc123").await;
        assert!(entity.is_some());
        assert_eq!(entity.unwrap()["name"], "Test Token");
    }

    #[tokio::test]
    async fn test_deep_merge_objects() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "id": "abc123",
                    "metrics": {"volume": 100}
                }),
            )
            .await;

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "metrics": {"trades": 50}
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        assert_eq!(entity["id"], "abc123");
        assert_eq!(entity["metrics"]["volume"], 100);
        assert_eq!(entity["metrics"]["trades"], 50);
    }

    #[tokio::test]
    async fn test_array_append() {
        let cache = EntityCache::new();

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"type": "buy", "amount": 100}]
                }),
            )
            .await;

        cache
            .upsert_with_append(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"type": "sell", "amount": 50}]
                }),
                &["events".to_string()],
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0]["type"], "buy");
        assert_eq!(events[1]["type"], "sell");
    }

    #[tokio::test]
    async fn test_array_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
        };
        let cache = EntityCache::with_config(config);

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [
                        {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}
                    ]
                }),
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 3);
        assert_eq!(events[1]["id"], 4);
        assert_eq!(events[2]["id"], 5);
    }

    #[tokio::test]
    async fn test_array_append_with_lru() {
        let config = EntityCacheConfig {
            max_entities_per_view: 1000,
            max_array_length: 3,
        };
        let cache = EntityCache::with_config(config);

        cache
            .upsert(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"id": 1}, {"id": 2}]
                }),
            )
            .await;

        cache
            .upsert_with_append(
                "tokens/list",
                "abc123",
                json!({
                    "events": [{"id": 3}, {"id": 4}]
                }),
                &["events".to_string()],
            )
            .await;

        let entity = cache.get("tokens/list", "abc123").await.unwrap();
        let events = entity["events"].as_array().unwrap();

        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 2);
        assert_eq!(events[1]["id"], 3);
        assert_eq!(events[2]["id"], 4);
    }

    #[tokio::test]
    async fn test_entity_lru_eviction() {
        let config = EntityCacheConfig {
            max_entities_per_view: 2,
            max_array_length: 100,
        };
        let cache = EntityCache::with_config(config);

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;
        cache.upsert("tokens/list", "key3", json!({"id": 3})).await;

        assert!(cache.get("tokens/list", "key1").await.is_none());
        assert!(cache.get("tokens/list", "key2").await.is_some());
        assert!(cache.get("tokens/list", "key3").await.is_some());
    }

    #[tokio::test]
    async fn test_get_all() {
        let cache = EntityCache::new();

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;

        let all = cache.get_all("tokens/list").await;
        assert_eq!(all.len(), 2);
    }

    #[tokio::test]
    async fn test_separate_views() {
        let cache = EntityCache::new();

        cache
            .upsert("tokens/list", "key1", json!({"type": "token"}))
            .await;
        cache
            .upsert("games/list", "key1", json!({"type": "game"}))
            .await;

        let token = cache.get("tokens/list", "key1").await.unwrap();
        let game = cache.get("games/list", "key1").await.unwrap();

        assert_eq!(token["type"], "token");
        assert_eq!(game["type"], "game");
    }

    #[test]
    fn test_deep_merge_with_append() {
        let mut base = json!({
            "a": 1,
            "b": {"c": 2},
            "arr": [1, 2]
        });

        let patch = json!({
            "b": {"d": 3},
            "arr": [3],
            "e": 4
        });

        deep_merge_with_append(&mut base, patch, &["arr".to_string()], 100);

        assert_eq!(base["a"], 1);
        assert_eq!(base["b"]["c"], 2);
        assert_eq!(base["b"]["d"], 3);
        assert_eq!(base["arr"].as_array().unwrap().len(), 3);
        assert_eq!(base["e"], 4);
    }

    #[test]
    fn test_deep_merge_replace_array() {
        let mut base = json!({
            "arr": [1, 2, 3]
        });

        let patch = json!({
            "arr": [4, 5]
        });

        deep_merge_with_append(&mut base, patch, &[], 100);

        assert_eq!(base["arr"].as_array().unwrap().len(), 2);
        assert_eq!(base["arr"][0], 4);
        assert_eq!(base["arr"][1], 5);
    }

    #[test]
    fn test_deep_merge_nested_append() {
        let mut base = json!({
            "stats": {"events": [1, 2]}
        });

        let patch = json!({
            "stats": {"events": [3]}
        });

        deep_merge_with_append(&mut base, patch, &["stats.events".to_string()], 100);

        assert_eq!(base["stats"]["events"].as_array().unwrap().len(), 3);
    }
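
    // Not part of the original suite: a small sketch exercising `stats()` and
    // `clear()` on top of the behaviour implemented above.
    #[tokio::test]
    async fn test_stats_and_clear() {
        let cache = EntityCache::new();

        cache.upsert("tokens/list", "key1", json!({"id": 1})).await;
        cache.upsert("tokens/list", "key2", json!({"id": 2})).await;
        cache.upsert("games/list", "key1", json!({"id": 3})).await;

        let stats = cache.stats().await;
        assert_eq!(stats.view_count, 2);
        assert_eq!(stats.total_entities, 3);
        assert_eq!(stats.top_views[0], ("tokens/list".to_string(), 2));

        cache.clear("tokens/list").await;
        assert!(cache.is_empty("tokens/list").await);
        assert_eq!(cache.len("games/list").await, 1);
    }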
}