1use crate::frame::{Frame, Operation};
2use serde::de::DeserializeOwned;
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::{broadcast, RwLock};
6
7#[derive(Debug, Clone)]
8pub struct StoreUpdate {
9 pub view: String,
10 pub key: String,
11 pub operation: Operation,
12 pub data: Option<serde_json::Value>,
13}
14
15pub struct SharedStore {
16 entities: Arc<RwLock<HashMap<String, HashMap<String, serde_json::Value>>>>,
17 updates_tx: broadcast::Sender<StoreUpdate>,
18}
19
20impl SharedStore {
21 pub fn new() -> Self {
22 let (updates_tx, _) = broadcast::channel(1000);
23 Self {
24 entities: Arc::new(RwLock::new(HashMap::new())),
25 updates_tx,
26 }
27 }
28
29 pub async fn apply_frame(&self, frame: Frame) {
30 let entity_name = extract_entity_name(&frame.entity);
31 let data = unwrap_list_item(&frame.data);
32 tracing::debug!(
33 "apply_frame: entity={}, key={}, op={}, has_item={}",
34 entity_name,
35 frame.key,
36 frame.op,
37 frame.data.get("item").is_some()
38 );
39
40 let mut entities = self.entities.write().await;
41 let view_map = entities
42 .entry(entity_name.to_string())
43 .or_insert_with(HashMap::new);
44
45 let operation = frame.operation();
46
47 match operation {
48 Operation::Upsert | Operation::Create => {
49 view_map.insert(frame.key.clone(), data.clone());
50 }
51 Operation::Patch => {
52 let entry = view_map
53 .entry(frame.key.clone())
54 .or_insert_with(|| serde_json::json!({}));
55 if let (Some(obj), Some(patch)) = (entry.as_object_mut(), data.as_object()) {
56 for (k, v) in patch {
57 obj.insert(k.clone(), v.clone());
58 }
59 }
60 }
61 Operation::Delete => {
62 view_map.remove(&frame.key);
63 }
64 }
65
66 let _ = self.updates_tx.send(StoreUpdate {
67 view: entity_name.to_string(),
68 key: frame.key,
69 operation,
70 data: Some(data),
71 });
72 }
73
74 pub async fn get<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
75 let entities = self.entities.read().await;
76 entities
77 .get(view)?
78 .get(key)
79 .and_then(|v| serde_json::from_value(v.clone()).ok())
80 }
81
82 pub async fn list<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
83 let entities = self.entities.read().await;
84 entities
85 .get(view)
86 .map(|map| {
87 map.values()
88 .filter_map(|v| serde_json::from_value(v.clone()).ok())
89 .collect()
90 })
91 .unwrap_or_default()
92 }
93
94 pub async fn all_raw(&self, view: &str) -> HashMap<String, serde_json::Value> {
95 let entities = self.entities.read().await;
96 entities.get(view).cloned().unwrap_or_default()
97 }
98
99 pub fn subscribe(&self) -> broadcast::Receiver<StoreUpdate> {
100 self.updates_tx.subscribe()
101 }
102}
103
104fn extract_entity_name(view_path: &str) -> &str {
105 view_path.split('/').next().unwrap_or(view_path)
106}
107
108fn unwrap_list_item(data: &serde_json::Value) -> serde_json::Value {
109 if let Some(item) = data.get("item") {
110 item.clone()
111 } else {
112 data.clone()
113 }
114}
115
116impl Default for SharedStore {
117 fn default() -> Self {
118 Self::new()
119 }
120}
121
122impl Clone for SharedStore {
123 fn clone(&self) -> Self {
124 Self {
125 entities: self.entities.clone(),
126 updates_tx: self.updates_tx.clone(),
127 }
128 }
129}