hyperstack_sdk/
store.rs

1use crate::frame::{Frame, Operation};
2use serde::de::DeserializeOwned;
3use serde_json::Value;
4use std::collections::{HashMap, HashSet};
5use std::sync::Arc;
6use tokio::sync::{broadcast, watch, RwLock};
7
8fn deep_merge(target: &mut Value, patch: &Value) {
9    match (target, patch) {
10        (Value::Object(target_map), Value::Object(patch_map)) => {
11            for (key, patch_value) in patch_map {
12                match target_map.get_mut(key) {
13                    Some(target_value) => deep_merge(target_value, patch_value),
14                    None => {
15                        target_map.insert(key.clone(), patch_value.clone());
16                    }
17                }
18            }
19        }
20        (target, patch) => {
21            *target = patch.clone();
22        }
23    }
24}
25
26#[derive(Debug, Clone)]
27pub struct StoreUpdate {
28    pub view: String,
29    pub key: String,
30    pub operation: Operation,
31    pub data: Option<serde_json::Value>,
32    pub previous: Option<serde_json::Value>,
33    /// The raw patch data for Patch operations (before merging into full state).
34    /// This allows consumers to see exactly what fields changed without diffing.
35    pub patch: Option<serde_json::Value>,
36}
37
38pub struct SharedStore {
39    entities: Arc<RwLock<HashMap<String, HashMap<String, serde_json::Value>>>>,
40    updates_tx: broadcast::Sender<StoreUpdate>,
41    ready_views: Arc<RwLock<HashSet<String>>>,
42    ready_tx: watch::Sender<HashSet<String>>,
43    ready_rx: watch::Receiver<HashSet<String>>,
44}
45
46impl SharedStore {
47    pub fn new() -> Self {
48        let (updates_tx, _) = broadcast::channel(1000);
49        let (ready_tx, ready_rx) = watch::channel(HashSet::new());
50        Self {
51            entities: Arc::new(RwLock::new(HashMap::new())),
52            updates_tx,
53            ready_views: Arc::new(RwLock::new(HashSet::new())),
54            ready_tx,
55            ready_rx,
56        }
57    }
58
59    pub async fn apply_frame(&self, frame: Frame) {
60        let entity_name = extract_entity_name(&frame.entity);
61        tracing::debug!(
62            "apply_frame: entity={}, key={}, op={}",
63            entity_name,
64            frame.key,
65            frame.op,
66        );
67
68        let mut entities = self.entities.write().await;
69        let view_map = entities
70            .entry(entity_name.to_string())
71            .or_insert_with(HashMap::new);
72
73        let operation = frame.operation();
74
75        let previous = view_map.get(&frame.key).cloned();
76
77        let (current, patch) = match operation {
78            Operation::Upsert | Operation::Create => {
79                view_map.insert(frame.key.clone(), frame.data.clone());
80                (Some(frame.data), None)
81            }
82            Operation::Patch => {
83                let raw_patch = frame.data.clone();
84                let entry = view_map
85                    .entry(frame.key.clone())
86                    .or_insert_with(|| serde_json::json!({}));
87                deep_merge(entry, &frame.data);
88                (Some(entry.clone()), Some(raw_patch))
89            }
90            Operation::Delete => {
91                view_map.remove(&frame.key);
92                (None, None)
93            }
94        };
95
96        let _ = self.updates_tx.send(StoreUpdate {
97            view: entity_name.to_string(),
98            key: frame.key,
99            operation,
100            data: current,
101            previous,
102            patch,
103        });
104
105        self.mark_view_ready(entity_name).await;
106    }
107
108    pub async fn mark_view_ready(&self, view: &str) {
109        let mut ready = self.ready_views.write().await;
110        if ready.insert(view.to_string()) {
111            let _ = self.ready_tx.send(ready.clone());
112        }
113    }
114
115    pub async fn wait_for_view_ready(&self, view: &str, timeout: std::time::Duration) -> bool {
116        let entity_name = extract_entity_name(view);
117
118        if self.ready_views.read().await.contains(entity_name) {
119            return true;
120        }
121
122        let mut rx = self.ready_rx.clone();
123        let deadline = tokio::time::Instant::now() + timeout;
124
125        loop {
126            let timeout_remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
127            if timeout_remaining.is_zero() {
128                return false;
129            }
130
131            tokio::select! {
132                result = rx.changed() => {
133                    if result.is_err() {
134                        return false;
135                    }
136                    if rx.borrow().contains(entity_name) {
137                        return true;
138                    }
139                }
140                _ = tokio::time::sleep(timeout_remaining) => {
141                    return false;
142                }
143            }
144        }
145    }
146
147    pub async fn get<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
148        let entities = self.entities.read().await;
149        entities
150            .get(view)?
151            .get(key)
152            .and_then(|v| serde_json::from_value(v.clone()).ok())
153    }
154
155    pub async fn list<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
156        let entities = self.entities.read().await;
157        entities
158            .get(view)
159            .map(|map| {
160                map.values()
161                    .filter_map(|v| serde_json::from_value(v.clone()).ok())
162                    .collect()
163            })
164            .unwrap_or_default()
165    }
166
167    pub async fn all_raw(&self, view: &str) -> HashMap<String, serde_json::Value> {
168        let entities = self.entities.read().await;
169        entities.get(view).cloned().unwrap_or_default()
170    }
171
172    pub fn subscribe(&self) -> broadcast::Receiver<StoreUpdate> {
173        self.updates_tx.subscribe()
174    }
175}
176
177fn extract_entity_name(view_path: &str) -> &str {
178    view_path.split('/').next().unwrap_or(view_path)
179}
180
181impl Default for SharedStore {
182    fn default() -> Self {
183        Self::new()
184    }
185}
186
187impl Clone for SharedStore {
188    fn clone(&self) -> Self {
189        Self {
190            entities: self.entities.clone(),
191            updates_tx: self.updates_tx.clone(),
192            ready_views: self.ready_views.clone(),
193            ready_tx: self.ready_tx.clone(),
194            ready_rx: self.ready_rx.clone(),
195        }
196    }
197}