1use crate::frame::{Frame, Operation};
2use serde::de::DeserializeOwned;
3use serde_json::Value;
4use std::collections::{HashMap, HashSet};
5use std::sync::Arc;
6use tokio::sync::{broadcast, watch, RwLock};
7
8fn deep_merge(target: &mut Value, patch: &Value) {
9 match (target, patch) {
10 (Value::Object(target_map), Value::Object(patch_map)) => {
11 for (key, patch_value) in patch_map {
12 match target_map.get_mut(key) {
13 Some(target_value) => deep_merge(target_value, patch_value),
14 None => {
15 target_map.insert(key.clone(), patch_value.clone());
16 }
17 }
18 }
19 }
20 (target, patch) => {
21 *target = patch.clone();
22 }
23 }
24}
25
26#[derive(Debug, Clone)]
27pub struct StoreUpdate {
28 pub view: String,
29 pub key: String,
30 pub operation: Operation,
31 pub data: Option<serde_json::Value>,
32 pub previous: Option<serde_json::Value>,
33 pub patch: Option<serde_json::Value>,
36}
37
38pub struct SharedStore {
39 entities: Arc<RwLock<HashMap<String, HashMap<String, serde_json::Value>>>>,
40 updates_tx: broadcast::Sender<StoreUpdate>,
41 ready_views: Arc<RwLock<HashSet<String>>>,
42 ready_tx: watch::Sender<HashSet<String>>,
43 ready_rx: watch::Receiver<HashSet<String>>,
44}
45
46impl SharedStore {
47 pub fn new() -> Self {
48 let (updates_tx, _) = broadcast::channel(1000);
49 let (ready_tx, ready_rx) = watch::channel(HashSet::new());
50 Self {
51 entities: Arc::new(RwLock::new(HashMap::new())),
52 updates_tx,
53 ready_views: Arc::new(RwLock::new(HashSet::new())),
54 ready_tx,
55 ready_rx,
56 }
57 }
58
59 pub async fn apply_frame(&self, frame: Frame) {
60 let entity_name = extract_entity_name(&frame.entity);
61 tracing::debug!(
62 "apply_frame: entity={}, key={}, op={}",
63 entity_name,
64 frame.key,
65 frame.op,
66 );
67
68 let mut entities = self.entities.write().await;
69 let view_map = entities
70 .entry(entity_name.to_string())
71 .or_insert_with(HashMap::new);
72
73 let operation = frame.operation();
74
75 let previous = view_map.get(&frame.key).cloned();
76
77 let (current, patch) = match operation {
78 Operation::Upsert | Operation::Create => {
79 view_map.insert(frame.key.clone(), frame.data.clone());
80 (Some(frame.data), None)
81 }
82 Operation::Patch => {
83 let raw_patch = frame.data.clone();
84 let entry = view_map
85 .entry(frame.key.clone())
86 .or_insert_with(|| serde_json::json!({}));
87 deep_merge(entry, &frame.data);
88 (Some(entry.clone()), Some(raw_patch))
89 }
90 Operation::Delete => {
91 view_map.remove(&frame.key);
92 (None, None)
93 }
94 };
95
96 let _ = self.updates_tx.send(StoreUpdate {
97 view: entity_name.to_string(),
98 key: frame.key,
99 operation,
100 data: current,
101 previous,
102 patch,
103 });
104
105 self.mark_view_ready(entity_name).await;
106 }
107
108 pub async fn mark_view_ready(&self, view: &str) {
109 let mut ready = self.ready_views.write().await;
110 if ready.insert(view.to_string()) {
111 let _ = self.ready_tx.send(ready.clone());
112 }
113 }
114
115 pub async fn wait_for_view_ready(&self, view: &str, timeout: std::time::Duration) -> bool {
116 let entity_name = extract_entity_name(view);
117
118 if self.ready_views.read().await.contains(entity_name) {
119 return true;
120 }
121
122 let mut rx = self.ready_rx.clone();
123 let deadline = tokio::time::Instant::now() + timeout;
124
125 loop {
126 let timeout_remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
127 if timeout_remaining.is_zero() {
128 return false;
129 }
130
131 tokio::select! {
132 result = rx.changed() => {
133 if result.is_err() {
134 return false;
135 }
136 if rx.borrow().contains(entity_name) {
137 return true;
138 }
139 }
140 _ = tokio::time::sleep(timeout_remaining) => {
141 return false;
142 }
143 }
144 }
145 }
146
147 pub async fn get<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
148 let entities = self.entities.read().await;
149 entities
150 .get(view)?
151 .get(key)
152 .and_then(|v| serde_json::from_value(v.clone()).ok())
153 }
154
155 pub async fn list<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
156 let entities = self.entities.read().await;
157 entities
158 .get(view)
159 .map(|map| {
160 map.values()
161 .filter_map(|v| serde_json::from_value(v.clone()).ok())
162 .collect()
163 })
164 .unwrap_or_default()
165 }
166
167 pub async fn all_raw(&self, view: &str) -> HashMap<String, serde_json::Value> {
168 let entities = self.entities.read().await;
169 entities.get(view).cloned().unwrap_or_default()
170 }
171
172 pub fn subscribe(&self) -> broadcast::Receiver<StoreUpdate> {
173 self.updates_tx.subscribe()
174 }
175}
176
/// Returns the entity-name portion of a view path: everything before the
/// first `/`, or the whole string when it contains no `/`.
fn extract_entity_name(view_path: &str) -> &str {
    match view_path.find('/') {
        Some(slash) => &view_path[..slash],
        None => view_path,
    }
}
180
181impl Default for SharedStore {
182 fn default() -> Self {
183 Self::new()
184 }
185}
186
187impl Clone for SharedStore {
188 fn clone(&self) -> Self {
189 Self {
190 entities: self.entities.clone(),
191 updates_tx: self.updates_tx.clone(),
192 ready_views: self.ready_views.clone(),
193 ready_tx: self.ready_tx.clone(),
194 ready_rx: self.ready_rx.clone(),
195 }
196 }
197}