1use lru::LruCache;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::num::NonZeroUsize;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Default value for [`EntityCacheConfig::max_entities_per_view`].
const DEFAULT_MAX_ENTITIES_PER_VIEW: usize = 1_000;

/// Default value for [`EntityCacheConfig::max_array_length`].
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
19
/// Limits applied by [`EntityCache`].
///
/// The type is plain data: it can be cloned, printed with `{:?}` and compared
/// for equality (useful when checking which configuration is in effect).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntityCacheConfig {
    /// Capacity of the LRU created for each view.
    ///
    /// Must be greater than zero: the cache panics when it has to create a
    /// view's LRU with a capacity of `0`.
    pub max_entities_per_view: usize,
    /// Maximum number of elements kept in an array stored inside an entity.
    ///
    /// When an array grows past this length, elements are dropped from the
    /// front so that the most recently appended ones are kept.
    pub max_array_length: usize,
}
28
29impl Default for EntityCacheConfig {
30 fn default() -> Self {
31 Self {
32 max_entities_per_view: DEFAULT_MAX_ENTITIES_PER_VIEW,
33 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
34 }
35 }
36}
37
38#[derive(Clone)]
44pub struct EntityCache {
45 caches: Arc<RwLock<HashMap<String, LruCache<String, Value>>>>,
47 config: EntityCacheConfig,
48}
49
50impl EntityCache {
51 pub fn new() -> Self {
53 Self::with_config(EntityCacheConfig::default())
54 }
55
56 pub fn with_config(config: EntityCacheConfig) -> Self {
58 Self {
59 caches: Arc::new(RwLock::new(HashMap::new())),
60 config,
61 }
62 }
63
64 pub async fn upsert(&self, view_id: &str, key: &str, patch: &Value) {
72 let mut caches = self.caches.write().await;
73
74 let cache = caches.entry(view_id.to_string()).or_insert_with(|| {
75 LruCache::new(
76 NonZeroUsize::new(self.config.max_entities_per_view)
77 .expect("max_entities_per_view must be > 0"),
78 )
79 });
80
81 let entity = cache
83 .get_mut(key)
84 .map(|v| v.clone())
85 .unwrap_or_else(|| Value::Object(serde_json::Map::new()));
86
87 let merged = deep_merge_with_array_append(entity, patch, self.config.max_array_length);
89
90 cache.put(key.to_string(), merged);
92 }
93
94 pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> {
99 let caches = self.caches.read().await;
100
101 caches
102 .get(view_id)
103 .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
104 .unwrap_or_default()
105 }
106
107 pub async fn get(&self, view_id: &str, key: &str) -> Option<Value> {
109 let caches = self.caches.read().await;
110 caches
111 .get(view_id)
112 .and_then(|cache| cache.peek(key).cloned())
113 }
114
115 pub async fn len(&self, view_id: &str) -> usize {
117 let caches = self.caches.read().await;
118 caches.get(view_id).map(|c| c.len()).unwrap_or(0)
119 }
120
121 pub async fn is_empty(&self, view_id: &str) -> bool {
123 self.len(view_id).await == 0
124 }
125
126 pub async fn clear(&self, view_id: &str) {
128 let mut caches = self.caches.write().await;
129 if let Some(cache) = caches.get_mut(view_id) {
130 cache.clear();
131 }
132 }
133
134 pub async fn clear_all(&self) {
136 let mut caches = self.caches.write().await;
137 caches.clear();
138 }
139}
140
141impl Default for EntityCache {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147fn deep_merge_with_array_append(base: Value, patch: &Value, max_array_length: usize) -> Value {
152 match (base, patch) {
153 (Value::Object(mut base_map), Value::Object(patch_map)) => {
155 for (key, patch_value) in patch_map {
156 let merged = if let Some(base_value) = base_map.remove(key) {
157 deep_merge_with_array_append(base_value, patch_value, max_array_length)
158 } else {
159 truncate_arrays_if_needed(patch_value.clone(), max_array_length)
161 };
162 base_map.insert(key.clone(), merged);
163 }
164 Value::Object(base_map)
165 }
166
167 (Value::Array(mut base_arr), Value::Array(patch_arr)) => {
169 base_arr.extend(patch_arr.iter().cloned());
171
172 if base_arr.len() > max_array_length {
174 let excess = base_arr.len() - max_array_length;
175 base_arr.drain(0..excess);
176 }
177
178 Value::Array(base_arr)
179 }
180
181 (_, Value::Array(patch_arr)) => {
183 let mut arr = patch_arr.clone();
184 if arr.len() > max_array_length {
185 let excess = arr.len() - max_array_length;
186 arr.drain(0..excess);
187 }
188 Value::Array(arr)
189 }
190
191 (_, patch_value) => patch_value.clone(),
193 }
194}
195
196fn truncate_arrays_if_needed(value: Value, max_array_length: usize) -> Value {
198 match value {
199 Value::Array(mut arr) => {
200 if arr.len() > max_array_length {
202 let excess = arr.len() - max_array_length;
203 arr.drain(0..excess);
204 }
205 Value::Array(
207 arr.into_iter()
208 .map(|v| truncate_arrays_if_needed(v, max_array_length))
209 .collect(),
210 )
211 }
212 Value::Object(map) => Value::Object(
213 map.into_iter()
214 .map(|(k, v)| (k, truncate_arrays_if_needed(v, max_array_length)))
215 .collect(),
216 ),
217 other => other,
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// View id used by most tests.
    const TOKENS: &str = "tokens/list";
    /// Entity key used by the single-entity tests.
    const KEY: &str = "abc123";

    /// Builds a cache with explicit entity and array limits.
    fn cache_with_limits(max_entities_per_view: usize, max_array_length: usize) -> EntityCache {
        EntityCache::with_config(EntityCacheConfig {
            max_entities_per_view,
            max_array_length,
        })
    }

    /// Reads the `events` array of the shared test entity.
    async fn stored_events(cache: &EntityCache) -> Vec<Value> {
        let entity = cache.get(TOKENS, KEY).await.unwrap();
        entity["events"].as_array().unwrap().clone()
    }

    #[tokio::test]
    async fn test_basic_upsert_and_get() {
        let cache = EntityCache::new();
        let patch = json!({"name": "Test Token"});

        cache.upsert(TOKENS, KEY, &patch).await;

        let entity = cache.get(TOKENS, KEY).await;
        assert!(entity.is_some());
        assert_eq!(entity.unwrap()["name"], "Test Token");
    }

    #[tokio::test]
    async fn test_deep_merge_objects() {
        let cache = EntityCache::new();
        let first = json!({
            "id": "abc123",
            "metrics": {"volume": 100}
        });
        let second = json!({
            "metrics": {"trades": 50}
        });

        cache.upsert(TOKENS, KEY, &first).await;
        cache.upsert(TOKENS, KEY, &second).await;

        let entity = cache.get(TOKENS, KEY).await.unwrap();
        assert_eq!(entity["id"], "abc123");
        assert_eq!(entity["metrics"]["volume"], 100);
        assert_eq!(entity["metrics"]["trades"], 50);
    }

    #[tokio::test]
    async fn test_array_append() {
        let cache = EntityCache::new();
        let buy = json!({
            "events": [{"type": "buy", "amount": 100}]
        });
        let sell = json!({
            "events": [{"type": "sell", "amount": 50}]
        });

        cache.upsert(TOKENS, KEY, &buy).await;
        cache.upsert(TOKENS, KEY, &sell).await;

        let events = stored_events(&cache).await;
        assert_eq!(events.len(), 2);
        assert_eq!(events[0]["type"], "buy");
        assert_eq!(events[1]["type"], "sell");
    }

    #[tokio::test]
    async fn test_array_lru_eviction() {
        let cache = cache_with_limits(1000, 3);
        let patch = json!({
            "events": [
                {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}
            ]
        });

        cache.upsert(TOKENS, KEY, &patch).await;

        // Only the three newest events survive the cap.
        let events = stored_events(&cache).await;
        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 3);
        assert_eq!(events[1]["id"], 4);
        assert_eq!(events[2]["id"], 5);
    }

    #[tokio::test]
    async fn test_array_append_with_lru() {
        let cache = cache_with_limits(1000, 3);
        let first = json!({
            "events": [{"id": 1}, {"id": 2}]
        });
        let second = json!({
            "events": [{"id": 3}, {"id": 4}]
        });

        cache.upsert(TOKENS, KEY, &first).await;
        cache.upsert(TOKENS, KEY, &second).await;

        // Four events were pushed in total; the oldest one is dropped.
        let events = stored_events(&cache).await;
        assert_eq!(events.len(), 3);
        assert_eq!(events[0]["id"], 2);
        assert_eq!(events[1]["id"], 3);
        assert_eq!(events[2]["id"], 4);
    }

    #[tokio::test]
    async fn test_entity_lru_eviction() {
        let cache = cache_with_limits(2, 100);

        for (key, id) in [("key1", 1), ("key2", 2), ("key3", 3)] {
            cache.upsert(TOKENS, key, &json!({"id": id})).await;
        }

        // Capacity is two, so the first key inserted has been evicted.
        assert!(cache.get(TOKENS, "key1").await.is_none());
        assert!(cache.get(TOKENS, "key2").await.is_some());
        assert!(cache.get(TOKENS, "key3").await.is_some());
    }

    #[tokio::test]
    async fn test_get_all() {
        let cache = EntityCache::new();

        cache.upsert(TOKENS, "key1", &json!({"id": 1})).await;
        cache.upsert(TOKENS, "key2", &json!({"id": 2})).await;

        let all = cache.get_all(TOKENS).await;
        assert_eq!(all.len(), 2);
    }

    #[tokio::test]
    async fn test_separate_views() {
        let cache = EntityCache::new();
        let games = "games/list";

        cache
            .upsert(TOKENS, "key1", &json!({"type": "token"}))
            .await;
        cache.upsert(games, "key1", &json!({"type": "game"})).await;

        // The same key in two views resolves to two independent entities.
        let token = cache.get(TOKENS, "key1").await.unwrap();
        let game = cache.get(games, "key1").await.unwrap();

        assert_eq!(token["type"], "token");
        assert_eq!(game["type"], "game");
    }

    #[test]
    fn test_deep_merge_function() {
        let base = json!({
            "a": 1,
            "b": {"c": 2},
            "arr": [1, 2]
        });
        let patch = json!({
            "b": {"d": 3},
            "arr": [3],
            "e": 4
        });

        let merged = deep_merge_with_array_append(base, &patch, 100);

        assert_eq!(merged["a"], 1);
        assert_eq!(merged["b"]["c"], 2);
        assert_eq!(merged["b"]["d"], 3);
        assert_eq!(merged["arr"].as_array().unwrap().len(), 3);
        assert_eq!(merged["e"], 4);
    }
}