1use crate::frame::{parse_snapshot_entities, Frame, Operation};
2use serde::de::DeserializeOwned;
3use serde_json::Value;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::sync::Arc;
6use tokio::sync::{broadcast, watch, RwLock};
7
8pub const DEFAULT_MAX_ENTRIES_PER_VIEW: usize = 10_000;
11
12#[derive(Debug, Clone)]
14pub struct StoreConfig {
15 pub max_entries_per_view: Option<usize>,
19}
20
21impl Default for StoreConfig {
22 fn default() -> Self {
23 Self {
24 max_entries_per_view: Some(DEFAULT_MAX_ENTRIES_PER_VIEW),
25 }
26 }
27}
28
29struct ViewData {
31 entities: HashMap<String, serde_json::Value>,
33 access_order: VecDeque<String>,
36}
37
38fn deep_merge_with_append(
39 target: &mut Value,
40 patch: &Value,
41 append_paths: &[String],
42 current_path: &str,
43) {
44 match (target, patch) {
45 (Value::Object(target_map), Value::Object(patch_map)) => {
46 for (key, patch_value) in patch_map {
47 let field_path = if current_path.is_empty() {
48 key.clone()
49 } else {
50 format!("{}.{}", current_path, key)
51 };
52 match target_map.get_mut(key) {
53 Some(target_value) => {
54 deep_merge_with_append(target_value, patch_value, append_paths, &field_path)
55 }
56 None => {
57 target_map.insert(key.clone(), patch_value.clone());
58 }
59 }
60 }
61 }
62 (Value::Array(target_arr), Value::Array(patch_arr))
63 if append_paths.contains(¤t_path.to_string()) =>
64 {
65 target_arr.extend(patch_arr.iter().cloned());
66 }
67 (target, patch) => {
68 *target = patch.clone();
69 }
70 }
71}
72
73#[derive(Debug, Clone)]
74pub struct StoreUpdate {
75 pub view: String,
76 pub key: String,
77 pub operation: Operation,
78 pub data: Option<serde_json::Value>,
79 pub previous: Option<serde_json::Value>,
80 pub patch: Option<serde_json::Value>,
83}
84
85pub struct SharedStore {
86 views: Arc<RwLock<HashMap<String, ViewData>>>,
87 updates_tx: broadcast::Sender<StoreUpdate>,
88 ready_views: Arc<RwLock<HashSet<String>>>,
89 ready_tx: watch::Sender<HashSet<String>>,
90 ready_rx: watch::Receiver<HashSet<String>>,
91 config: StoreConfig,
92}
93
94impl ViewData {
95 fn new() -> Self {
96 Self {
97 entities: HashMap::new(),
98 access_order: VecDeque::new(),
99 }
100 }
101
102 fn touch(&mut self, key: &str) {
103 self.access_order.retain(|k| k != key);
104 self.access_order.push_back(key.to_string());
105 }
106
107 fn insert(&mut self, key: String, value: serde_json::Value) {
108 if !self.entities.contains_key(&key) {
109 self.access_order.push_back(key.clone());
110 } else {
111 self.touch(&key);
112 }
113 self.entities.insert(key, value);
114 }
115
116 fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
117 self.access_order.retain(|k| k != key);
118 self.entities.remove(key)
119 }
120
121 fn evict_oldest(&mut self) -> Option<String> {
122 if let Some(oldest_key) = self.access_order.pop_front() {
123 self.entities.remove(&oldest_key);
124 Some(oldest_key)
125 } else {
126 None
127 }
128 }
129
130 fn len(&self) -> usize {
131 self.entities.len()
132 }
133}
134
135impl SharedStore {
136 pub fn new() -> Self {
137 Self::with_config(StoreConfig::default())
138 }
139
140 pub fn with_config(config: StoreConfig) -> Self {
141 let (updates_tx, _) = broadcast::channel(1000);
142 let (ready_tx, ready_rx) = watch::channel(HashSet::new());
143 Self {
144 views: Arc::new(RwLock::new(HashMap::new())),
145 updates_tx,
146 ready_views: Arc::new(RwLock::new(HashSet::new())),
147 ready_tx,
148 ready_rx,
149 config,
150 }
151 }
152
153 fn enforce_max_entries(&self, view_data: &mut ViewData) {
154 if let Some(max) = self.config.max_entries_per_view {
155 while view_data.len() > max {
156 if let Some(evicted_key) = view_data.evict_oldest() {
157 tracing::debug!("evicted oldest entry: {}", evicted_key);
158 }
159 }
160 }
161 }
162
163 pub async fn apply_frame(&self, frame: Frame) {
164 let entity_name = extract_entity_name(&frame.entity);
165 tracing::debug!(
166 "apply_frame: entity={}, key={}, op={}",
167 entity_name,
168 frame.key,
169 frame.op,
170 );
171
172 let operation = frame.operation();
173
174 if operation == Operation::Snapshot {
175 self.apply_snapshot(&frame).await;
176 return;
177 }
178
179 let mut views = self.views.write().await;
180 let view_data = views
181 .entry(entity_name.to_string())
182 .or_insert_with(ViewData::new);
183
184 let previous = view_data.entities.get(&frame.key).cloned();
185
186 let (current, patch) = match operation {
187 Operation::Upsert | Operation::Create => {
188 view_data.insert(frame.key.clone(), frame.data.clone());
189 self.enforce_max_entries(view_data);
190 (Some(frame.data), None)
191 }
192 Operation::Patch => {
193 let raw_patch = frame.data.clone();
194 let entry = view_data
195 .entities
196 .entry(frame.key.clone())
197 .or_insert_with(|| serde_json::json!({}));
198 deep_merge_with_append(entry, &frame.data, &frame.append, "");
199 let merged = entry.clone();
200 view_data.touch(&frame.key);
201 self.enforce_max_entries(view_data);
202 (Some(merged), Some(raw_patch))
203 }
204 Operation::Delete => {
205 view_data.remove(&frame.key);
206 (None, None)
207 }
208 Operation::Snapshot => unreachable!(),
209 };
210
211 let _ = self.updates_tx.send(StoreUpdate {
212 view: entity_name.to_string(),
213 key: frame.key,
214 operation,
215 data: current,
216 previous,
217 patch,
218 });
219
220 self.mark_view_ready(entity_name).await;
221 }
222
223 async fn apply_snapshot(&self, frame: &Frame) {
224 let entity_name = extract_entity_name(&frame.entity);
225 let snapshot_entities = parse_snapshot_entities(&frame.data);
226
227 tracing::debug!(
228 "apply_snapshot: entity={}, count={}",
229 entity_name,
230 snapshot_entities.len()
231 );
232
233 let mut views = self.views.write().await;
234 let view_data = views
235 .entry(entity_name.to_string())
236 .or_insert_with(ViewData::new);
237
238 for entity in snapshot_entities {
239 let previous = view_data.entities.get(&entity.key).cloned();
240 view_data.insert(entity.key.clone(), entity.data.clone());
241
242 let _ = self.updates_tx.send(StoreUpdate {
243 view: entity_name.to_string(),
244 key: entity.key,
245 operation: Operation::Upsert,
246 data: Some(entity.data),
247 previous,
248 patch: None,
249 });
250 }
251
252 self.enforce_max_entries(view_data);
253 drop(views);
254 self.mark_view_ready(entity_name).await;
255 }
256
257 pub async fn mark_view_ready(&self, view: &str) {
258 let mut ready = self.ready_views.write().await;
259 if ready.insert(view.to_string()) {
260 let _ = self.ready_tx.send(ready.clone());
261 }
262 }
263
264 pub async fn wait_for_view_ready(&self, view: &str, timeout: std::time::Duration) -> bool {
265 let entity_name = extract_entity_name(view);
266
267 if self.ready_views.read().await.contains(entity_name) {
268 return true;
269 }
270
271 let mut rx = self.ready_rx.clone();
272 let deadline = tokio::time::Instant::now() + timeout;
273
274 loop {
275 let timeout_remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
276 if timeout_remaining.is_zero() {
277 return false;
278 }
279
280 tokio::select! {
281 result = rx.changed() => {
282 if result.is_err() {
283 return false;
284 }
285 if rx.borrow().contains(entity_name) {
286 return true;
287 }
288 }
289 _ = tokio::time::sleep(timeout_remaining) => {
290 return false;
291 }
292 }
293 }
294 }
295
296 pub async fn get<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
297 let views = self.views.read().await;
298 views
299 .get(view)?
300 .entities
301 .get(key)
302 .and_then(|v| serde_json::from_value(v.clone()).ok())
303 }
304
305 pub async fn list<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
306 let views = self.views.read().await;
307 views
308 .get(view)
309 .map(|view_data| {
310 view_data
311 .entities
312 .values()
313 .filter_map(|v| serde_json::from_value(v.clone()).ok())
314 .collect()
315 })
316 .unwrap_or_default()
317 }
318
319 pub async fn all_raw(&self, view: &str) -> HashMap<String, serde_json::Value> {
320 let views = self.views.read().await;
321 views
322 .get(view)
323 .map(|view_data| view_data.entities.clone())
324 .unwrap_or_default()
325 }
326
327 pub fn subscribe(&self) -> broadcast::Receiver<StoreUpdate> {
328 self.updates_tx.subscribe()
329 }
330}
331
332fn extract_entity_name(view_path: &str) -> &str {
333 view_path.split('/').next().unwrap_or(view_path)
334}
335
336impl Default for SharedStore {
337 fn default() -> Self {
338 Self::new()
339 }
340}
341
342impl Clone for SharedStore {
343 fn clone(&self) -> Self {
344 Self {
345 views: self.views.clone(),
346 updates_tx: self.updates_tx.clone(),
347 ready_views: self.ready_views.clone(),
348 ready_tx: self.ready_tx.clone(),
349 ready_rx: self.ready_rx.clone(),
350 config: self.config.clone(),
351 }
352 }
353}