hyperstack_sdk/
store.rs

1use crate::frame::{parse_snapshot_entities, Frame, Operation};
2use serde::de::DeserializeOwned;
3use serde_json::Value;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::sync::Arc;
6use tokio::sync::{broadcast, watch, RwLock};
7
/// Default maximum number of entries per view before LRU eviction kicks in.
/// Set to 10,000 to provide a reasonable balance between memory usage and data retention.
pub const DEFAULT_MAX_ENTRIES_PER_VIEW: usize = 10_000;

/// Configuration for the SharedStore.
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Maximum number of entries to keep per view. When exceeded, oldest entries
    /// are evicted using LRU (Least Recently Used) strategy.
    /// Set to `None` to disable size limiting (not recommended for long-running clients).
    pub max_entries_per_view: Option<usize>,
}
20
21impl Default for StoreConfig {
22    fn default() -> Self {
23        Self {
24            max_entries_per_view: Some(DEFAULT_MAX_ENTRIES_PER_VIEW),
25        }
26    }
27}
28
/// Per-view entity storage plus the bookkeeping needed for LRU eviction.
struct ViewData {
    /// The actual entity data keyed by entity key.
    entities: HashMap<String, serde_json::Value>,
    /// Access order queue - front is oldest, back is most recent.
    /// Used for LRU eviction when max_entries is exceeded.
    /// Invariant: contains exactly the keys present in `entities`, each once.
    access_order: VecDeque<String>,
}
37
38fn deep_merge_with_append(
39    target: &mut Value,
40    patch: &Value,
41    append_paths: &[String],
42    current_path: &str,
43) {
44    match (target, patch) {
45        (Value::Object(target_map), Value::Object(patch_map)) => {
46            for (key, patch_value) in patch_map {
47                let field_path = if current_path.is_empty() {
48                    key.clone()
49                } else {
50                    format!("{}.{}", current_path, key)
51                };
52                match target_map.get_mut(key) {
53                    Some(target_value) => {
54                        deep_merge_with_append(target_value, patch_value, append_paths, &field_path)
55                    }
56                    None => {
57                        target_map.insert(key.clone(), patch_value.clone());
58                    }
59                }
60            }
61        }
62        (Value::Array(target_arr), Value::Array(patch_arr))
63            if append_paths.contains(&current_path.to_string()) =>
64        {
65            target_arr.extend(patch_arr.iter().cloned());
66        }
67        (target, patch) => {
68            *target = patch.clone();
69        }
70    }
71}
72
/// A single change notification broadcast to store subscribers.
#[derive(Debug, Clone)]
pub struct StoreUpdate {
    /// Entity (view) name the change belongs to.
    pub view: String,
    /// Key of the affected entity within the view.
    pub key: String,
    /// The operation that produced this update.
    pub operation: Operation,
    /// The entity's full state after the operation; `None` for deletes.
    pub data: Option<serde_json::Value>,
    /// The entity's state before the operation, if it existed.
    pub previous: Option<serde_json::Value>,
    /// The raw patch data for Patch operations (before merging into full state).
    /// This allows consumers to see exactly what fields changed without diffing.
    pub patch: Option<serde_json::Value>,
}

/// Shared entity store fed by incoming frames and queried by consumers.
/// Cheap to clone: all state lives behind `Arc`s and channel handles, so
/// clones observe the same data (see the manual `Clone` impl below).
pub struct SharedStore {
    /// Per-view entity maps with LRU bookkeeping.
    views: Arc<RwLock<HashMap<String, ViewData>>>,
    /// Broadcasts a `StoreUpdate` for every applied change.
    updates_tx: broadcast::Sender<StoreUpdate>,
    /// Views that have received at least one frame.
    ready_views: Arc<RwLock<HashSet<String>>>,
    /// Watch channel mirroring `ready_views`; drives `wait_for_view_ready`.
    ready_tx: watch::Sender<HashSet<String>>,
    ready_rx: watch::Receiver<HashSet<String>>,
    /// Size-limit configuration applied on writes.
    config: StoreConfig,
}
93
94impl ViewData {
95    fn new() -> Self {
96        Self {
97            entities: HashMap::new(),
98            access_order: VecDeque::new(),
99        }
100    }
101
102    fn touch(&mut self, key: &str) {
103        self.access_order.retain(|k| k != key);
104        self.access_order.push_back(key.to_string());
105    }
106
107    fn insert(&mut self, key: String, value: serde_json::Value) {
108        if !self.entities.contains_key(&key) {
109            self.access_order.push_back(key.clone());
110        } else {
111            self.touch(&key);
112        }
113        self.entities.insert(key, value);
114    }
115
116    fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
117        self.access_order.retain(|k| k != key);
118        self.entities.remove(key)
119    }
120
121    fn evict_oldest(&mut self) -> Option<String> {
122        if let Some(oldest_key) = self.access_order.pop_front() {
123            self.entities.remove(&oldest_key);
124            Some(oldest_key)
125        } else {
126            None
127        }
128    }
129
130    fn len(&self) -> usize {
131        self.entities.len()
132    }
133}
134
135impl SharedStore {
136    pub fn new() -> Self {
137        Self::with_config(StoreConfig::default())
138    }
139
140    pub fn with_config(config: StoreConfig) -> Self {
141        let (updates_tx, _) = broadcast::channel(1000);
142        let (ready_tx, ready_rx) = watch::channel(HashSet::new());
143        Self {
144            views: Arc::new(RwLock::new(HashMap::new())),
145            updates_tx,
146            ready_views: Arc::new(RwLock::new(HashSet::new())),
147            ready_tx,
148            ready_rx,
149            config,
150        }
151    }
152
153    fn enforce_max_entries(&self, view_data: &mut ViewData) {
154        if let Some(max) = self.config.max_entries_per_view {
155            while view_data.len() > max {
156                if let Some(evicted_key) = view_data.evict_oldest() {
157                    tracing::debug!("evicted oldest entry: {}", evicted_key);
158                }
159            }
160        }
161    }
162
163    pub async fn apply_frame(&self, frame: Frame) {
164        let entity_name = extract_entity_name(&frame.entity);
165        tracing::debug!(
166            "apply_frame: entity={}, key={}, op={}",
167            entity_name,
168            frame.key,
169            frame.op,
170        );
171
172        let operation = frame.operation();
173
174        if operation == Operation::Snapshot {
175            self.apply_snapshot(&frame).await;
176            return;
177        }
178
179        let mut views = self.views.write().await;
180        let view_data = views
181            .entry(entity_name.to_string())
182            .or_insert_with(ViewData::new);
183
184        let previous = view_data.entities.get(&frame.key).cloned();
185
186        let (current, patch) = match operation {
187            Operation::Upsert | Operation::Create => {
188                view_data.insert(frame.key.clone(), frame.data.clone());
189                self.enforce_max_entries(view_data);
190                (Some(frame.data), None)
191            }
192            Operation::Patch => {
193                let raw_patch = frame.data.clone();
194                let entry = view_data
195                    .entities
196                    .entry(frame.key.clone())
197                    .or_insert_with(|| serde_json::json!({}));
198                deep_merge_with_append(entry, &frame.data, &frame.append, "");
199                let merged = entry.clone();
200                view_data.touch(&frame.key);
201                self.enforce_max_entries(view_data);
202                (Some(merged), Some(raw_patch))
203            }
204            Operation::Delete => {
205                view_data.remove(&frame.key);
206                (None, None)
207            }
208            Operation::Snapshot => unreachable!(),
209        };
210
211        let _ = self.updates_tx.send(StoreUpdate {
212            view: entity_name.to_string(),
213            key: frame.key,
214            operation,
215            data: current,
216            previous,
217            patch,
218        });
219
220        self.mark_view_ready(entity_name).await;
221    }
222
223    async fn apply_snapshot(&self, frame: &Frame) {
224        let entity_name = extract_entity_name(&frame.entity);
225        let snapshot_entities = parse_snapshot_entities(&frame.data);
226
227        tracing::debug!(
228            "apply_snapshot: entity={}, count={}",
229            entity_name,
230            snapshot_entities.len()
231        );
232
233        let mut views = self.views.write().await;
234        let view_data = views
235            .entry(entity_name.to_string())
236            .or_insert_with(ViewData::new);
237
238        for entity in snapshot_entities {
239            let previous = view_data.entities.get(&entity.key).cloned();
240            view_data.insert(entity.key.clone(), entity.data.clone());
241
242            let _ = self.updates_tx.send(StoreUpdate {
243                view: entity_name.to_string(),
244                key: entity.key,
245                operation: Operation::Upsert,
246                data: Some(entity.data),
247                previous,
248                patch: None,
249            });
250        }
251
252        self.enforce_max_entries(view_data);
253        drop(views);
254        self.mark_view_ready(entity_name).await;
255    }
256
257    pub async fn mark_view_ready(&self, view: &str) {
258        let mut ready = self.ready_views.write().await;
259        if ready.insert(view.to_string()) {
260            let _ = self.ready_tx.send(ready.clone());
261        }
262    }
263
264    pub async fn wait_for_view_ready(&self, view: &str, timeout: std::time::Duration) -> bool {
265        let entity_name = extract_entity_name(view);
266
267        if self.ready_views.read().await.contains(entity_name) {
268            return true;
269        }
270
271        let mut rx = self.ready_rx.clone();
272        let deadline = tokio::time::Instant::now() + timeout;
273
274        loop {
275            let timeout_remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
276            if timeout_remaining.is_zero() {
277                return false;
278            }
279
280            tokio::select! {
281                result = rx.changed() => {
282                    if result.is_err() {
283                        return false;
284                    }
285                    if rx.borrow().contains(entity_name) {
286                        return true;
287                    }
288                }
289                _ = tokio::time::sleep(timeout_remaining) => {
290                    return false;
291                }
292            }
293        }
294    }
295
296    pub async fn get<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
297        let views = self.views.read().await;
298        views
299            .get(view)?
300            .entities
301            .get(key)
302            .and_then(|v| serde_json::from_value(v.clone()).ok())
303    }
304
305    pub async fn list<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
306        let views = self.views.read().await;
307        views
308            .get(view)
309            .map(|view_data| {
310                view_data
311                    .entities
312                    .values()
313                    .filter_map(|v| serde_json::from_value(v.clone()).ok())
314                    .collect()
315            })
316            .unwrap_or_default()
317    }
318
319    pub async fn all_raw(&self, view: &str) -> HashMap<String, serde_json::Value> {
320        let views = self.views.read().await;
321        views
322            .get(view)
323            .map(|view_data| view_data.entities.clone())
324            .unwrap_or_default()
325    }
326
327    pub fn subscribe(&self) -> broadcast::Receiver<StoreUpdate> {
328        self.updates_tx.subscribe()
329    }
330}
331
/// Returns the entity-name portion of a view path: everything before the
/// first `/`, or the whole string when no `/` is present.
fn extract_entity_name(view_path: &str) -> &str {
    match view_path.find('/') {
        Some(slash) => &view_path[..slash],
        None => view_path,
    }
}
335
impl Default for SharedStore {
    // Delegates to `new`, i.e. a store with the default `StoreConfig`.
    fn default() -> Self {
        Self::new()
    }
}
341
342impl Clone for SharedStore {
343    fn clone(&self) -> Self {
344        Self {
345            views: self.views.clone(),
346            updates_tx: self.updates_tx.clone(),
347            ready_views: self.ready_views.clone(),
348            ready_tx: self.ready_tx.clone(),
349            ready_rx: self.ready_rx.clone(),
350            config: self.config.clone(),
351        }
352    }
353}