hyperstack_sdk/
store.rs

1use crate::frame::{Frame, Operation};
2use serde::de::DeserializeOwned;
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::{broadcast, RwLock};
6
7#[derive(Debug, Clone)]
8pub struct StoreUpdate {
9    pub view: String,
10    pub key: String,
11    pub operation: Operation,
12    pub data: Option<serde_json::Value>,
13}
14
15pub struct SharedStore {
16    entities: Arc<RwLock<HashMap<String, HashMap<String, serde_json::Value>>>>,
17    updates_tx: broadcast::Sender<StoreUpdate>,
18}
19
20impl SharedStore {
21    pub fn new() -> Self {
22        let (updates_tx, _) = broadcast::channel(1000);
23        Self {
24            entities: Arc::new(RwLock::new(HashMap::new())),
25            updates_tx,
26        }
27    }
28
29    pub async fn apply_frame(&self, frame: Frame) {
30        let entity_name = extract_entity_name(&frame.entity);
31        let data = unwrap_list_item(&frame.data);
32        tracing::debug!(
33            "apply_frame: entity={}, key={}, op={}, has_item={}",
34            entity_name,
35            frame.key,
36            frame.op,
37            frame.data.get("item").is_some()
38        );
39
40        let mut entities = self.entities.write().await;
41        let view_map = entities
42            .entry(entity_name.to_string())
43            .or_insert_with(HashMap::new);
44
45        let operation = frame.operation();
46
47        match operation {
48            Operation::Upsert | Operation::Create => {
49                view_map.insert(frame.key.clone(), data.clone());
50            }
51            Operation::Patch => {
52                let entry = view_map
53                    .entry(frame.key.clone())
54                    .or_insert_with(|| serde_json::json!({}));
55                if let (Some(obj), Some(patch)) = (entry.as_object_mut(), data.as_object()) {
56                    for (k, v) in patch {
57                        obj.insert(k.clone(), v.clone());
58                    }
59                }
60            }
61            Operation::Delete => {
62                view_map.remove(&frame.key);
63            }
64        }
65
66        let _ = self.updates_tx.send(StoreUpdate {
67            view: entity_name.to_string(),
68            key: frame.key,
69            operation,
70            data: Some(data),
71        });
72    }
73
74    pub async fn get<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
75        let entities = self.entities.read().await;
76        entities
77            .get(view)?
78            .get(key)
79            .and_then(|v| serde_json::from_value(v.clone()).ok())
80    }
81
82    pub async fn list<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
83        let entities = self.entities.read().await;
84        entities
85            .get(view)
86            .map(|map| {
87                map.values()
88                    .filter_map(|v| serde_json::from_value(v.clone()).ok())
89                    .collect()
90            })
91            .unwrap_or_default()
92    }
93
94    pub async fn all_raw(&self, view: &str) -> HashMap<String, serde_json::Value> {
95        let entities = self.entities.read().await;
96        entities.get(view).cloned().unwrap_or_default()
97    }
98
99    pub fn subscribe(&self) -> broadcast::Receiver<StoreUpdate> {
100        self.updates_tx.subscribe()
101    }
102}
103
104fn extract_entity_name(view_path: &str) -> &str {
105    view_path.split('/').next().unwrap_or(view_path)
106}
107
108fn unwrap_list_item(data: &serde_json::Value) -> serde_json::Value {
109    if let Some(item) = data.get("item") {
110        item.clone()
111    } else {
112        data.clone()
113    }
114}
115
116impl Default for SharedStore {
117    fn default() -> Self {
118        Self::new()
119    }
120}
121
122impl Clone for SharedStore {
123    fn clone(&self) -> Self {
124        Self {
125            entities: self.entities.clone(),
126            updates_tx: self.updates_tx.clone(),
127        }
128    }
129}