1use crate::frame::{
2 parse_snapshot_entities, Frame, Operation, SortConfig, SortOrder, SubscribedFrame,
3};
4use serde::de::DeserializeOwned;
5use serde_json::Value;
6use std::cmp::Ordering;
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::sync::Arc;
9use tokio::sync::{broadcast, watch, RwLock};
10
/// Entry cap applied to every view when a store is built with
/// [`StoreConfig::default`].
pub const DEFAULT_MAX_ENTRIES_PER_VIEW: usize = 10_000;

/// Tunables for a [`SharedStore`].
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Upper bound on the number of entities kept per view.
    ///
    /// `None` disables the bound; `Some(n)` makes the store evict entries
    /// whenever a view grows beyond `n`.
    pub max_entries_per_view: Option<usize>,
}

impl Default for StoreConfig {
    /// Returns a configuration that caps each view at
    /// [`DEFAULT_MAX_ENTRIES_PER_VIEW`] entries.
    fn default() -> Self {
        StoreConfig {
            max_entries_per_view: Some(DEFAULT_MAX_ENTRIES_PER_VIEW),
        }
    }
}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33struct SortKey {
34 sort_value: SortValue,
35 entity_key: String,
36}
37
38impl PartialOrd for SortKey {
39 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
40 Some(self.cmp(other))
41 }
42}
43
44impl Ord for SortKey {
45 fn cmp(&self, other: &Self) -> Ordering {
46 match self.sort_value.cmp(&other.sort_value) {
47 Ordering::Equal => self.entity_key.cmp(&other.entity_key),
48 other => other,
49 }
50 }
51}
52
/// Scalar extracted from an entity and used as the primary sort component.
///
/// Values of the same variant compare by their payload. Values of different
/// variants compare by variant only, in the order
/// `Null < Bool < Integer < String`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SortValue {
    Null,
    Bool(bool),
    Integer(i64),
    String(String),
}

impl SortValue {
    /// Position of this value's variant in the cross-variant ordering.
    fn variant_rank(&self) -> u8 {
        match self {
            SortValue::Null => 0,
            SortValue::Bool(_) => 1,
            SortValue::Integer(_) => 2,
            SortValue::String(_) => 3,
        }
    }
}

impl Ord for SortValue {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            // Same variant: compare the payloads.
            (SortValue::Bool(lhs), SortValue::Bool(rhs)) => lhs.cmp(rhs),
            (SortValue::Integer(lhs), SortValue::Integer(rhs)) => lhs.cmp(rhs),
            (SortValue::String(lhs), SortValue::String(rhs)) => lhs.cmp(rhs),
            // Null/Null or mixed variants: the variant rank decides.
            _ => self.variant_rank().cmp(&other.variant_rank()),
        }
    }
}

impl PartialOrd for SortValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // The ordering is total, so defer to `Ord`.
        Some(Ord::cmp(self, other))
    }
}
83
84fn extract_sort_value(entity: &Value, field_path: &[String]) -> SortValue {
85 let mut current = entity;
86 for segment in field_path {
87 match current.get(segment) {
88 Some(v) => current = v,
89 None => return SortValue::Null,
90 }
91 }
92
93 match current {
94 Value::Null => SortValue::Null,
95 Value::Bool(b) => SortValue::Bool(*b),
96 Value::Number(n) => {
97 if let Some(i) = n.as_i64() {
98 SortValue::Integer(i)
99 } else if let Some(f) = n.as_f64() {
100 SortValue::Integer(f as i64)
101 } else {
102 SortValue::Null
103 }
104 }
105 Value::String(s) => SortValue::String(s.clone()),
106 _ => SortValue::Null,
107 }
108}
109
110
111
112struct ViewData {
113 entities: HashMap<String, serde_json::Value>,
114 access_order: VecDeque<String>,
115 sort_config: Option<SortConfig>,
116 sorted_keys: BTreeMap<SortKey, ()>,
117}
118
119fn deep_merge_with_append(
120 target: &mut Value,
121 patch: &Value,
122 append_paths: &[String],
123 current_path: &str,
124) {
125 match (target, patch) {
126 (Value::Object(target_map), Value::Object(patch_map)) => {
127 for (key, patch_value) in patch_map {
128 let field_path = if current_path.is_empty() {
129 key.clone()
130 } else {
131 format!("{}.{}", current_path, key)
132 };
133 match target_map.get_mut(key) {
134 Some(target_value) => {
135 deep_merge_with_append(target_value, patch_value, append_paths, &field_path)
136 }
137 None => {
138 target_map.insert(key.clone(), patch_value.clone());
139 }
140 }
141 }
142 }
143 (Value::Array(target_arr), Value::Array(patch_arr))
144 if append_paths.contains(¤t_path.to_string()) =>
145 {
146 target_arr.extend(patch_arr.iter().cloned());
147 }
148 (target, patch) => {
149 *target = patch.clone();
150 }
151 }
152}
153
154#[derive(Debug, Clone)]
155pub struct StoreUpdate {
156 pub view: String,
157 pub key: String,
158 pub operation: Operation,
159 pub data: Option<serde_json::Value>,
160 pub previous: Option<serde_json::Value>,
161 pub patch: Option<serde_json::Value>,
164}
165
166pub struct SharedStore {
167 views: Arc<RwLock<HashMap<String, ViewData>>>,
168 view_configs: Arc<RwLock<HashMap<String, SortConfig>>>,
169 updates_tx: broadcast::Sender<StoreUpdate>,
170 ready_views: Arc<RwLock<HashSet<String>>>,
171 ready_tx: watch::Sender<HashSet<String>>,
172 ready_rx: watch::Receiver<HashSet<String>>,
173 config: StoreConfig,
174}
175
176impl ViewData {
177 fn new() -> Self {
178 Self {
179 entities: HashMap::new(),
180 access_order: VecDeque::new(),
181 sort_config: None,
182 sorted_keys: BTreeMap::new(),
183 }
184 }
185
186 fn with_sort_config(sort_config: SortConfig) -> Self {
187 Self {
188 entities: HashMap::new(),
189 access_order: VecDeque::new(),
190 sort_config: Some(sort_config),
191 sorted_keys: BTreeMap::new(),
192 }
193 }
194
195 fn set_sort_config(&mut self, config: SortConfig) {
196 if self.sort_config.is_some() {
197 return;
198 }
199 self.sort_config = Some(config);
200 self.rebuild_sorted_keys();
201 }
202
203 fn rebuild_sorted_keys(&mut self) {
204 self.sorted_keys.clear();
205 if let Some(ref config) = self.sort_config {
206 for (key, value) in &self.entities {
207 let sort_value = extract_sort_value(value, &config.field);
208 let sort_key = SortKey {
209 sort_value,
210 entity_key: key.clone(),
211 };
212 self.sorted_keys.insert(sort_key, ());
213 }
214 }
215 self.access_order.clear();
216 }
217
218 fn touch(&mut self, key: &str) {
219 if self.sort_config.is_some() {
220 return;
221 }
222 self.access_order.retain(|k| k != key);
223 self.access_order.push_back(key.to_string());
224 }
225
226 fn insert(&mut self, key: String, value: serde_json::Value) {
227 if let Some(ref config) = self.sort_config {
228 if let Some(old_value) = self.entities.get(&key) {
229 let old_sort_value = extract_sort_value(old_value, &config.field);
230 let old_sort_key = SortKey {
231 sort_value: old_sort_value,
232 entity_key: key.clone(),
233 };
234 self.sorted_keys.remove(&old_sort_key);
235 }
236
237 let sort_value = extract_sort_value(&value, &config.field);
238 let sort_key = SortKey {
239 sort_value,
240 entity_key: key.clone(),
241 };
242 self.sorted_keys.insert(sort_key, ());
243 } else if !self.entities.contains_key(&key) {
244 self.access_order.push_back(key.clone());
245 } else {
246 self.touch(&key);
247 }
248 self.entities.insert(key, value);
249 }
250
251 fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
252 if let Some(ref config) = self.sort_config {
253 if let Some(value) = self.entities.get(key) {
254 let sort_value = extract_sort_value(value, &config.field);
255 let sort_key = SortKey {
256 sort_value,
257 entity_key: key.to_string(),
258 };
259 self.sorted_keys.remove(&sort_key);
260 }
261 } else {
262 self.access_order.retain(|k| k != key);
263 }
264 self.entities.remove(key)
265 }
266
267 fn evict_oldest(&mut self) -> Option<String> {
268 if self.sort_config.is_some() {
269 if let Some((sort_key, _)) = self
270 .sorted_keys
271 .iter()
272 .next_back()
273 .map(|(k, v)| (k.clone(), *v))
274 {
275 self.sorted_keys.remove(&sort_key);
276 self.entities.remove(&sort_key.entity_key);
277 return Some(sort_key.entity_key);
278 }
279 return None;
280 }
281
282 if let Some(oldest_key) = self.access_order.pop_front() {
283 self.entities.remove(&oldest_key);
284 Some(oldest_key)
285 } else {
286 None
287 }
288 }
289
290 fn len(&self) -> usize {
291 self.entities.len()
292 }
293
294 #[allow(dead_code)]
295 fn ordered_keys(&self) -> Vec<String> {
296 if let Some(ref config) = self.sort_config {
297 let keys: Vec<String> = self
298 .sorted_keys
299 .keys()
300 .map(|sk| sk.entity_key.clone())
301 .collect();
302 match config.order {
303 SortOrder::Asc => keys,
304 SortOrder::Desc => keys.into_iter().rev().collect(),
305 }
306 } else {
307 self.entities.keys().cloned().collect()
308 }
309 }
310
311 fn ordered_values(&self) -> Vec<serde_json::Value> {
312 if let Some(ref config) = self.sort_config {
313 let values: Vec<serde_json::Value> = self
314 .sorted_keys
315 .keys()
316 .filter_map(|sk| self.entities.get(&sk.entity_key).cloned())
317 .collect();
318 match config.order {
319 SortOrder::Asc => values,
320 SortOrder::Desc => values.into_iter().rev().collect(),
321 }
322 } else {
323 self.entities.values().cloned().collect()
324 }
325 }
326}
327
328impl SharedStore {
329 pub fn new() -> Self {
330 Self::with_config(StoreConfig::default())
331 }
332
333 pub fn with_config(config: StoreConfig) -> Self {
334 let (updates_tx, _) = broadcast::channel(1000);
335 let (ready_tx, ready_rx) = watch::channel(HashSet::new());
336 Self {
337 views: Arc::new(RwLock::new(HashMap::new())),
338 view_configs: Arc::new(RwLock::new(HashMap::new())),
339 updates_tx,
340 ready_views: Arc::new(RwLock::new(HashSet::new())),
341 ready_tx,
342 ready_rx,
343 config,
344 }
345 }
346
347 fn enforce_max_entries(&self, view_data: &mut ViewData) {
348 if let Some(max) = self.config.max_entries_per_view {
349 while view_data.len() > max {
350 if let Some(evicted_key) = view_data.evict_oldest() {
351 tracing::debug!("evicted oldest entry: {}", evicted_key);
352 }
353 }
354 }
355 }
356
357 pub async fn apply_frame(&self, frame: Frame) {
358 let view_path = &frame.entity;
359 tracing::debug!(
360 "apply_frame: view={}, key={}, op={}",
361 view_path,
362 frame.key,
363 frame.op,
364 );
365
366 let operation = frame.operation();
367
368 if operation == Operation::Snapshot {
369 self.apply_snapshot(&frame).await;
370 return;
371 }
372
373 let sort_config = self.view_configs.read().await.get(view_path).cloned();
374
375 let mut views = self.views.write().await;
376 let view_data = views.entry(view_path.to_string()).or_insert_with(|| {
377 if let Some(config) = sort_config {
378 ViewData::with_sort_config(config)
379 } else {
380 ViewData::new()
381 }
382 });
383
384 let previous = view_data.entities.get(&frame.key).cloned();
385
386 let (current, patch) = match operation {
387 Operation::Upsert | Operation::Create => {
388 view_data.insert(frame.key.clone(), frame.data.clone());
389 self.enforce_max_entries(view_data);
390 (Some(frame.data), None)
391 }
392 Operation::Patch => {
393 let raw_patch = frame.data.clone();
394 let entry = view_data
395 .entities
396 .entry(frame.key.clone())
397 .or_insert_with(|| serde_json::json!({}));
398 deep_merge_with_append(entry, &frame.data, &frame.append, "");
399 let merged = entry.clone();
400 view_data.touch(&frame.key);
401 self.enforce_max_entries(view_data);
402 (Some(merged), Some(raw_patch))
403 }
404 Operation::Delete => {
405 view_data.remove(&frame.key);
406 (None, None)
407 }
408 Operation::Snapshot | Operation::Subscribed => unreachable!(),
409 };
410
411 let _ = self.updates_tx.send(StoreUpdate {
412 view: view_path.to_string(),
413 key: frame.key,
414 operation,
415 data: current,
416 previous,
417 patch,
418 });
419
420 self.mark_view_ready(view_path).await;
421 }
422
423 async fn apply_snapshot(&self, frame: &Frame) {
424 let view_path = &frame.entity;
425 let snapshot_entities = parse_snapshot_entities(&frame.data);
426
427 tracing::debug!(
428 "apply_snapshot: view={}, count={}",
429 view_path,
430 snapshot_entities.len()
431 );
432
433 let sort_config = self.view_configs.read().await.get(view_path).cloned();
434
435 let mut views = self.views.write().await;
436 let view_data = views.entry(view_path.to_string()).or_insert_with(|| {
437 if let Some(config) = sort_config {
438 ViewData::with_sort_config(config)
439 } else {
440 ViewData::new()
441 }
442 });
443
444 for entity in snapshot_entities {
445 let previous = view_data.entities.get(&entity.key).cloned();
446 view_data.insert(entity.key.clone(), entity.data.clone());
447
448 let _ = self.updates_tx.send(StoreUpdate {
449 view: view_path.to_string(),
450 key: entity.key,
451 operation: Operation::Upsert,
452 data: Some(entity.data),
453 previous,
454 patch: None,
455 });
456 }
457
458 self.enforce_max_entries(view_data);
459 drop(views);
460 self.mark_view_ready(view_path).await;
461 }
462
463 pub async fn mark_view_ready(&self, view: &str) {
464 let mut ready = self.ready_views.write().await;
465 if ready.insert(view.to_string()) {
466 let _ = self.ready_tx.send(ready.clone());
467 }
468 }
469
470 pub async fn wait_for_view_ready(&self, view: &str, timeout: std::time::Duration) -> bool {
471 if self.ready_views.read().await.contains(view) {
472 return true;
473 }
474
475 let mut rx = self.ready_rx.clone();
476 let deadline = tokio::time::Instant::now() + timeout;
477
478 loop {
479 let timeout_remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
480 if timeout_remaining.is_zero() {
481 return false;
482 }
483
484 tokio::select! {
485 result = rx.changed() => {
486 if result.is_err() {
487 return false;
488 }
489 if rx.borrow().contains(view) {
490 return true;
491 }
492 }
493 _ = tokio::time::sleep(timeout_remaining) => {
494 return false;
495 }
496 }
497 }
498 }
499
500 pub async fn get<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
501 let views = self.views.read().await;
502 views
503 .get(view)?
504 .entities
505 .get(key)
506 .and_then(|v| serde_json::from_value(v.clone()).ok())
507 }
508
509 pub async fn list<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
510 let views = self.views.read().await;
511 views
512 .get(view)
513 .map(|view_data| {
514 view_data
515 .ordered_values()
516 .into_iter()
517 .filter_map(|v| serde_json::from_value(v).ok())
518 .collect()
519 })
520 .unwrap_or_default()
521 }
522
523 pub async fn all_raw(&self, view: &str) -> HashMap<String, serde_json::Value> {
524 let views = self.views.read().await;
525 views
526 .get(view)
527 .map(|view_data| view_data.entities.clone())
528 .unwrap_or_default()
529 }
530
531 pub fn subscribe(&self) -> broadcast::Receiver<StoreUpdate> {
532 self.updates_tx.subscribe()
533 }
534
535 pub async fn apply_subscribed_frame(&self, frame: SubscribedFrame) {
536 let view_path = &frame.view;
537 tracing::debug!(
538 "apply_subscribed_frame: view={}, mode={:?}, sort={:?}",
539 view_path,
540 frame.mode,
541 frame.sort,
542 );
543
544 if let Some(sort_config) = frame.sort {
545 self.view_configs
546 .write()
547 .await
548 .insert(view_path.to_string(), sort_config.clone());
549
550 let mut views = self.views.write().await;
551 if let Some(view_data) = views.get_mut(view_path) {
552 view_data.set_sort_config(sort_config);
553 }
554 }
555 }
556
557 pub async fn get_view_sort_config(&self, view: &str) -> Option<SortConfig> {
558 self.view_configs.read().await.get(view).cloned()
559 }
560
561 pub async fn set_view_sort_config(&self, view: &str, config: SortConfig) {
562 self.view_configs
563 .write()
564 .await
565 .insert(view.to_string(), config.clone());
566
567 let mut views = self.views.write().await;
568 if let Some(view_data) = views.get_mut(view) {
569 view_data.set_sort_config(config);
570 }
571 }
572}
573
574impl Default for SharedStore {
575 fn default() -> Self {
576 Self::new()
577 }
578}
579
580impl Clone for SharedStore {
581 fn clone(&self) -> Self {
582 Self {
583 views: self.views.clone(),
584 view_configs: self.view_configs.clone(),
585 updates_tx: self.updates_tx.clone(),
586 ready_views: self.ready_views.clone(),
587 ready_tx: self.ready_tx.clone(),
588 ready_rx: self.ready_rx.clone(),
589 config: self.config.clone(),
590 }
591 }
592}