1use crate::frame::{
2 parse_snapshot_entities, Frame, Operation, SortConfig, SortOrder, SubscribedFrame,
3};
4use serde::de::DeserializeOwned;
5use serde_json::Value;
6use std::cmp::Ordering;
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::sync::Arc;
9use tokio::sync::{broadcast, watch, RwLock};
10
11pub const DEFAULT_MAX_ENTRIES_PER_VIEW: usize = 10_000;
14
15#[derive(Debug, Clone)]
17pub struct StoreConfig {
18 pub max_entries_per_view: Option<usize>,
22}
23
24impl Default for StoreConfig {
25 fn default() -> Self {
26 Self {
27 max_entries_per_view: Some(DEFAULT_MAX_ENTRIES_PER_VIEW),
28 }
29 }
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33struct SortKey {
34 sort_value: SortValue,
35 entity_key: String,
36}
37
38impl PartialOrd for SortKey {
39 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
40 Some(self.cmp(other))
41 }
42}
43
44impl Ord for SortKey {
45 fn cmp(&self, other: &Self) -> Ordering {
46 match self.sort_value.cmp(&other.sort_value) {
47 Ordering::Equal => self.entity_key.cmp(&other.entity_key),
48 other => other,
49 }
50 }
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54enum SortValue {
55 Null,
56 Bool(bool),
57 Integer(i64),
58 String(String),
59}
60
61impl Ord for SortValue {
62 fn cmp(&self, other: &Self) -> Ordering {
63 match (self, other) {
64 (SortValue::Null, SortValue::Null) => Ordering::Equal,
65 (SortValue::Null, _) => Ordering::Less,
66 (_, SortValue::Null) => Ordering::Greater,
67 (SortValue::Bool(a), SortValue::Bool(b)) => a.cmp(b),
68 (SortValue::Integer(a), SortValue::Integer(b)) => a.cmp(b),
69 (SortValue::String(a), SortValue::String(b)) => a.cmp(b),
70 (SortValue::Integer(_), SortValue::String(_)) => Ordering::Less,
71 (SortValue::String(_), SortValue::Integer(_)) => Ordering::Greater,
72 (SortValue::Bool(_), _) => Ordering::Less,
73 (_, SortValue::Bool(_)) => Ordering::Greater,
74 }
75 }
76}
77
78impl PartialOrd for SortValue {
79 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
80 Some(self.cmp(other))
81 }
82}
83
84fn extract_sort_value(entity: &Value, field_path: &[String]) -> SortValue {
85 let mut current = entity;
86 for segment in field_path {
87 match current.get(segment) {
88 Some(v) => current = v,
89 None => return SortValue::Null,
90 }
91 }
92
93 match current {
94 Value::Null => SortValue::Null,
95 Value::Bool(b) => SortValue::Bool(*b),
96 Value::Number(n) => {
97 if let Some(i) = n.as_i64() {
98 SortValue::Integer(i)
99 } else if let Some(f) = n.as_f64() {
100 SortValue::Integer(f as i64)
101 } else {
102 SortValue::Null
103 }
104 }
105 Value::String(s) => SortValue::String(s.clone()),
106 _ => SortValue::Null,
107 }
108}
109
110struct ViewData {
111 entities: HashMap<String, serde_json::Value>,
112 access_order: VecDeque<String>,
113 sort_config: Option<SortConfig>,
114 sorted_keys: BTreeMap<SortKey, ()>,
115}
116
117pub fn deep_merge_with_append(
118 target: &mut Value,
119 patch: &Value,
120 append_paths: &[String],
121 current_path: &str,
122) {
123 match (target, patch) {
124 (Value::Object(target_map), Value::Object(patch_map)) => {
125 for (key, patch_value) in patch_map {
126 let field_path = if current_path.is_empty() {
127 key.clone()
128 } else {
129 format!("{}.{}", current_path, key)
130 };
131 match target_map.get_mut(key) {
132 Some(target_value) => {
133 deep_merge_with_append(target_value, patch_value, append_paths, &field_path)
134 }
135 None => {
136 target_map.insert(key.clone(), patch_value.clone());
137 }
138 }
139 }
140 }
141 (Value::Array(target_arr), Value::Array(patch_arr))
142 if append_paths.contains(¤t_path.to_string()) =>
143 {
144 target_arr.extend(patch_arr.iter().cloned());
145 }
146 (target, patch) => {
147 *target = patch.clone();
148 }
149 }
150}
151
152#[derive(Debug, Clone)]
153pub struct StoreUpdate {
154 pub view: String,
155 pub key: String,
156 pub operation: Operation,
157 pub data: Option<serde_json::Value>,
158 pub previous: Option<serde_json::Value>,
159 pub patch: Option<serde_json::Value>,
162}
163
164pub struct SharedStore {
165 views: Arc<RwLock<HashMap<String, ViewData>>>,
166 view_configs: Arc<RwLock<HashMap<String, SortConfig>>>,
167 updates_tx: broadcast::Sender<StoreUpdate>,
168 ready_views: Arc<RwLock<HashSet<String>>>,
169 ready_tx: watch::Sender<HashSet<String>>,
170 ready_rx: watch::Receiver<HashSet<String>>,
171 config: StoreConfig,
172}
173
174impl ViewData {
175 fn new() -> Self {
176 Self {
177 entities: HashMap::new(),
178 access_order: VecDeque::new(),
179 sort_config: None,
180 sorted_keys: BTreeMap::new(),
181 }
182 }
183
184 fn with_sort_config(sort_config: SortConfig) -> Self {
185 Self {
186 entities: HashMap::new(),
187 access_order: VecDeque::new(),
188 sort_config: Some(sort_config),
189 sorted_keys: BTreeMap::new(),
190 }
191 }
192
193 fn set_sort_config(&mut self, config: SortConfig) {
194 if self.sort_config.is_some() {
195 return;
196 }
197 self.sort_config = Some(config);
198 self.rebuild_sorted_keys();
199 }
200
201 fn rebuild_sorted_keys(&mut self) {
202 self.sorted_keys.clear();
203 if let Some(ref config) = self.sort_config {
204 for (key, value) in &self.entities {
205 let sort_value = extract_sort_value(value, &config.field);
206 let sort_key = SortKey {
207 sort_value,
208 entity_key: key.clone(),
209 };
210 self.sorted_keys.insert(sort_key, ());
211 }
212 }
213 self.access_order.clear();
214 }
215
216 fn touch(&mut self, key: &str) {
217 if self.sort_config.is_some() {
218 return;
219 }
220 self.access_order.retain(|k| k != key);
221 self.access_order.push_back(key.to_string());
222 }
223
224 fn insert(&mut self, key: String, value: serde_json::Value) {
225 if let Some(ref config) = self.sort_config {
226 if let Some(old_value) = self.entities.get(&key) {
227 let old_sort_value = extract_sort_value(old_value, &config.field);
228 let old_sort_key = SortKey {
229 sort_value: old_sort_value,
230 entity_key: key.clone(),
231 };
232 self.sorted_keys.remove(&old_sort_key);
233 }
234
235 let sort_value = extract_sort_value(&value, &config.field);
236 let sort_key = SortKey {
237 sort_value,
238 entity_key: key.clone(),
239 };
240 self.sorted_keys.insert(sort_key, ());
241 } else if !self.entities.contains_key(&key) {
242 self.access_order.push_back(key.clone());
243 } else {
244 self.touch(&key);
245 }
246 self.entities.insert(key, value);
247 }
248
249 fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
250 if let Some(ref config) = self.sort_config {
251 if let Some(value) = self.entities.get(key) {
252 let sort_value = extract_sort_value(value, &config.field);
253 let sort_key = SortKey {
254 sort_value,
255 entity_key: key.to_string(),
256 };
257 self.sorted_keys.remove(&sort_key);
258 }
259 } else {
260 self.access_order.retain(|k| k != key);
261 }
262 self.entities.remove(key)
263 }
264
265 fn evict_oldest(&mut self) -> Option<String> {
266 if self.sort_config.is_some() {
267 if let Some((sort_key, _)) = self
268 .sorted_keys
269 .iter()
270 .next_back()
271 .map(|(k, v)| (k.clone(), *v))
272 {
273 self.sorted_keys.remove(&sort_key);
274 self.entities.remove(&sort_key.entity_key);
275 return Some(sort_key.entity_key);
276 }
277 return None;
278 }
279
280 if let Some(oldest_key) = self.access_order.pop_front() {
281 self.entities.remove(&oldest_key);
282 Some(oldest_key)
283 } else {
284 None
285 }
286 }
287
288 fn len(&self) -> usize {
289 self.entities.len()
290 }
291
292 #[allow(dead_code)]
293 fn ordered_keys(&self) -> Vec<String> {
294 if let Some(ref config) = self.sort_config {
295 let keys: Vec<String> = self
296 .sorted_keys
297 .keys()
298 .map(|sk| sk.entity_key.clone())
299 .collect();
300 match config.order {
301 SortOrder::Asc => keys,
302 SortOrder::Desc => keys.into_iter().rev().collect(),
303 }
304 } else {
305 self.entities.keys().cloned().collect()
306 }
307 }
308
309 fn ordered_values(&self) -> Vec<serde_json::Value> {
310 if let Some(ref config) = self.sort_config {
311 let values: Vec<serde_json::Value> = self
312 .sorted_keys
313 .keys()
314 .filter_map(|sk| self.entities.get(&sk.entity_key).cloned())
315 .collect();
316 match config.order {
317 SortOrder::Asc => values,
318 SortOrder::Desc => values.into_iter().rev().collect(),
319 }
320 } else {
321 self.entities.values().cloned().collect()
322 }
323 }
324}
325
326impl SharedStore {
327 pub fn new() -> Self {
328 Self::with_config(StoreConfig::default())
329 }
330
331 pub fn with_config(config: StoreConfig) -> Self {
332 let (updates_tx, _) = broadcast::channel(1000);
333 let (ready_tx, ready_rx) = watch::channel(HashSet::new());
334 Self {
335 views: Arc::new(RwLock::new(HashMap::new())),
336 view_configs: Arc::new(RwLock::new(HashMap::new())),
337 updates_tx,
338 ready_views: Arc::new(RwLock::new(HashSet::new())),
339 ready_tx,
340 ready_rx,
341 config,
342 }
343 }
344
345 fn enforce_max_entries(&self, view_data: &mut ViewData) {
346 if let Some(max) = self.config.max_entries_per_view {
347 while view_data.len() > max {
348 if let Some(evicted_key) = view_data.evict_oldest() {
349 tracing::debug!("evicted oldest entry: {}", evicted_key);
350 }
351 }
352 }
353 }
354
355 pub async fn apply_frame(&self, frame: Frame) {
356 let view_path = &frame.entity;
357 tracing::debug!(
358 "apply_frame: view={}, key={}, op={}",
359 view_path,
360 frame.key,
361 frame.op,
362 );
363
364 let operation = frame.operation();
365
366 if operation == Operation::Snapshot {
367 self.apply_snapshot(&frame).await;
368 return;
369 }
370
371 let sort_config = self.view_configs.read().await.get(view_path).cloned();
372
373 let mut views = self.views.write().await;
374 let view_data = views.entry(view_path.to_string()).or_insert_with(|| {
375 if let Some(config) = sort_config {
376 ViewData::with_sort_config(config)
377 } else {
378 ViewData::new()
379 }
380 });
381
382 let previous = view_data.entities.get(&frame.key).cloned();
383
384 let (current, patch) = match operation {
385 Operation::Upsert | Operation::Create => {
386 view_data.insert(frame.key.clone(), frame.data.clone());
387 self.enforce_max_entries(view_data);
388 (Some(frame.data), None)
389 }
390 Operation::Patch => {
391 let raw_patch = frame.data.clone();
392 let entry = view_data
393 .entities
394 .entry(frame.key.clone())
395 .or_insert_with(|| serde_json::json!({}));
396 deep_merge_with_append(entry, &frame.data, &frame.append, "");
397 let merged = entry.clone();
398 view_data.touch(&frame.key);
399 self.enforce_max_entries(view_data);
400 (Some(merged), Some(raw_patch))
401 }
402 Operation::Delete => {
403 view_data.remove(&frame.key);
404 (None, None)
405 }
406 Operation::Snapshot | Operation::Subscribed => unreachable!(),
407 };
408
409 let _ = self.updates_tx.send(StoreUpdate {
410 view: view_path.to_string(),
411 key: frame.key,
412 operation,
413 data: current,
414 previous,
415 patch,
416 });
417
418 self.mark_view_ready(view_path).await;
419 }
420
421 async fn apply_snapshot(&self, frame: &Frame) {
422 let view_path = &frame.entity;
423 let snapshot_entities = parse_snapshot_entities(&frame.data);
424
425 tracing::debug!(
426 "apply_snapshot: view={}, count={}",
427 view_path,
428 snapshot_entities.len()
429 );
430
431 let sort_config = self.view_configs.read().await.get(view_path).cloned();
432
433 let mut views = self.views.write().await;
434 let view_data = views.entry(view_path.to_string()).or_insert_with(|| {
435 if let Some(config) = sort_config {
436 ViewData::with_sort_config(config)
437 } else {
438 ViewData::new()
439 }
440 });
441
442 for entity in snapshot_entities {
443 let previous = view_data.entities.get(&entity.key).cloned();
444 view_data.insert(entity.key.clone(), entity.data.clone());
445
446 let _ = self.updates_tx.send(StoreUpdate {
447 view: view_path.to_string(),
448 key: entity.key,
449 operation: Operation::Upsert,
450 data: Some(entity.data),
451 previous,
452 patch: None,
453 });
454 }
455
456 self.enforce_max_entries(view_data);
457 drop(views);
458 self.mark_view_ready(view_path).await;
459 }
460
461 pub async fn mark_view_ready(&self, view: &str) {
462 let mut ready = self.ready_views.write().await;
463 if ready.insert(view.to_string()) {
464 let _ = self.ready_tx.send(ready.clone());
465 }
466 }
467
468 pub async fn wait_for_view_ready(&self, view: &str, timeout: std::time::Duration) -> bool {
469 if self.ready_views.read().await.contains(view) {
470 return true;
471 }
472
473 let mut rx = self.ready_rx.clone();
474 let deadline = tokio::time::Instant::now() + timeout;
475
476 loop {
477 let timeout_remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
478 if timeout_remaining.is_zero() {
479 return false;
480 }
481
482 tokio::select! {
483 result = rx.changed() => {
484 if result.is_err() {
485 return false;
486 }
487 if rx.borrow().contains(view) {
488 return true;
489 }
490 }
491 _ = tokio::time::sleep(timeout_remaining) => {
492 return false;
493 }
494 }
495 }
496 }
497
498 pub async fn get<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
499 let views = self.views.read().await;
500 views
501 .get(view)?
502 .entities
503 .get(key)
504 .and_then(|v| serde_json::from_value(v.clone()).ok())
505 }
506
507 pub async fn list<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
508 let views = self.views.read().await;
509 views
510 .get(view)
511 .map(|view_data| {
512 view_data
513 .ordered_values()
514 .into_iter()
515 .filter_map(|v| serde_json::from_value(v).ok())
516 .collect()
517 })
518 .unwrap_or_default()
519 }
520
521 pub async fn all_raw(&self, view: &str) -> HashMap<String, serde_json::Value> {
522 let views = self.views.read().await;
523 views
524 .get(view)
525 .map(|view_data| view_data.entities.clone())
526 .unwrap_or_default()
527 }
528
529 pub fn get_sync<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
537 let views = self.views.try_read().ok()?;
538 views
539 .get(view)?
540 .entities
541 .get(key)
542 .and_then(|v| serde_json::from_value(v.clone()).ok())
543 }
544
545 pub fn list_sync<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
553 let Some(views) = self.views.try_read().ok() else {
554 return Vec::new();
555 };
556 views
557 .get(view)
558 .map(|view_data| {
559 view_data
560 .ordered_values()
561 .into_iter()
562 .filter_map(|v| serde_json::from_value(v).ok())
563 .collect()
564 })
565 .unwrap_or_default()
566 }
567
568 pub fn subscribe(&self) -> broadcast::Receiver<StoreUpdate> {
569 self.updates_tx.subscribe()
570 }
571
572 pub async fn apply_subscribed_frame(&self, frame: SubscribedFrame) {
573 let view_path = &frame.view;
574 tracing::debug!(
575 "apply_subscribed_frame: view={}, mode={:?}, sort={:?}",
576 view_path,
577 frame.mode,
578 frame.sort,
579 );
580
581 if let Some(sort_config) = frame.sort {
582 self.view_configs
583 .write()
584 .await
585 .insert(view_path.to_string(), sort_config.clone());
586
587 let mut views = self.views.write().await;
588 if let Some(view_data) = views.get_mut(view_path) {
589 view_data.set_sort_config(sort_config);
590 }
591 }
592 }
593
594 pub async fn get_view_sort_config(&self, view: &str) -> Option<SortConfig> {
595 self.view_configs.read().await.get(view).cloned()
596 }
597
598 pub async fn set_view_sort_config(&self, view: &str, config: SortConfig) {
599 self.view_configs
600 .write()
601 .await
602 .insert(view.to_string(), config.clone());
603
604 let mut views = self.views.write().await;
605 if let Some(view_data) = views.get_mut(view) {
606 view_data.set_sort_config(config);
607 }
608 }
609}
610
611impl Default for SharedStore {
612 fn default() -> Self {
613 Self::new()
614 }
615}
616
617impl Clone for SharedStore {
618 fn clone(&self) -> Self {
619 Self {
620 views: self.views.clone(),
621 view_configs: self.view_configs.clone(),
622 updates_tx: self.updates_tx.clone(),
623 ready_views: self.ready_views.clone(),
624 ready_tx: self.ready_tx.clone(),
625 ready_rx: self.ready_rx.clone(),
626 config: self.config.clone(),
627 }
628 }
629}