hyperstack_sdk/
state.rs

1use crate::mutation::Mode;
2use serde::{de::DeserializeOwned, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::marker::PhantomData;
6use std::sync::Arc;
7use tokio::sync::{broadcast, RwLock};
8
/// Backing collection of an [`EntityStore`], selected once from the store's `Mode`.
enum StoreData<T> {
    /// Entities addressed by string key; used for `Mode::Kv` and `Mode::State`.
    Kv(HashMap<String, T>),
    /// Entities kept in arrival order; used for `Mode::Append`.
    Append(Vec<T>),
    /// Entities kept in arrival order; used for `Mode::List`.
    // NOTE(review): within this file `Append` and `List` are handled
    // identically (both only ever push) — confirm whether they are meant to differ.
    List(Vec<T>),
}
14
/// A change notification sent to subscribers: the entity key paired with the
/// value that was stored by a successful patch or upsert.
pub type Update<T> = (String, T);
16
/// Shared, asynchronously lockable store of entities of type `T` that
/// broadcasts an [`Update`] whenever a patch or upsert is applied.
pub struct EntityStore<T> {
    /// Mode the store was created with; it determines the backing collection.
    mode: Mode,
    /// The stored entities, behind a shared async read/write lock.
    data: Arc<RwLock<StoreData<T>>>,
    /// Sending half of the broadcast channel used to notify subscribers.
    update_tx: broadcast::Sender<Update<T>>,
    /// Marker for `T`.
    // NOTE(review): looks redundant, since `data` and `update_tx` already
    // mention `T` — confirm before removing.
    _phantom: PhantomData<T>,
}
23
24impl<T> EntityStore<T>
25where
26    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
27{
28    pub fn new(mode: Mode) -> Self {
29        let data = match mode {
30            Mode::Kv | Mode::State => StoreData::Kv(HashMap::new()),
31            Mode::Append => StoreData::Append(Vec::new()),
32            Mode::List => StoreData::List(Vec::new()),
33        };
34
35        let (update_tx, _) = broadcast::channel(1000);
36
37        Self {
38            mode,
39            data: Arc::new(RwLock::new(data)),
40            update_tx,
41            _phantom: PhantomData,
42        }
43    }
44
45    pub fn subscribe(&self) -> broadcast::Receiver<Update<T>> {
46        self.update_tx.subscribe()
47    }
48
49    pub async fn get(&self, key: &str) -> Option<T> {
50        let data = self.data.read().await;
51        match &*data {
52            StoreData::Kv(map) => map.get(key).cloned(),
53            _ => None,
54        }
55    }
56
57    pub async fn all(&self) -> HashMap<String, T> {
58        let data = self.data.read().await;
59        match &*data {
60            StoreData::Kv(map) => map.clone(),
61            _ => HashMap::new(),
62        }
63    }
64
65    pub async fn all_vec(&self) -> Vec<T> {
66        let data = self.data.read().await;
67        match &*data {
68            StoreData::Append(vec) | StoreData::List(vec) => vec.clone(),
69            _ => Vec::new(),
70        }
71    }
72
73    pub fn mode(&self) -> &Mode {
74        &self.mode
75    }
76
77    pub(crate) async fn apply_patch(&self, key: String, patch: Value) {
78        let mut data = self.data.write().await;
79        match &mut *data {
80            StoreData::Kv(map) => {
81                let current = map.get(&key).and_then(|v| serde_json::to_value(v).ok());
82                let mut merged = current.unwrap_or_else(|| serde_json::json!({}));
83
84                if let (Some(obj), Some(patch_obj)) = (merged.as_object_mut(), patch.as_object()) {
85                    for (k, v) in patch_obj {
86                        obj.insert(k.clone(), v.clone());
87                    }
88                }
89
90                if let Ok(typed) = serde_json::from_value::<T>(merged) {
91                    map.insert(key.clone(), typed.clone());
92                    let _ = self.update_tx.send((key, typed));
93                }
94            }
95            StoreData::List(vec) | StoreData::Append(vec) => {
96                let item_data = patch.get("item").cloned().unwrap_or(patch);
97                if let Ok(typed) = serde_json::from_value::<T>(item_data) {
98                    vec.push(typed.clone());
99                    let _ = self.update_tx.send((key, typed));
100                }
101            }
102        }
103    }
104
105    pub(crate) async fn apply_upsert(&self, key: String, value: Value) {
106        let mut data = self.data.write().await;
107        
108        if let Ok(typed) = serde_json::from_value::<T>(value) {
109            match &mut *data {
110                StoreData::Kv(map) => {
111                    map.insert(key.clone(), typed.clone());
112                    let _ = self.update_tx.send((key, typed));
113                }
114                StoreData::Append(vec) | StoreData::List(vec) => {
115                    vec.push(typed.clone());
116                    let _ = self.update_tx.send((key, typed));
117                }
118            }
119        }
120        
121    }
122
123    pub(crate) async fn apply_delete(&self, key: String) {
124        let mut data = self.data.write().await;
125        if let StoreData::Kv(map) = &mut *data {
126            map.remove(&key);
127        }
128    }
129}
130
131impl<T> Default for EntityStore<T>
132where
133    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
134{
135    fn default() -> Self {
136        Self::new(Mode::Kv)
137    }
138}
139
140impl<T> Clone for EntityStore<T>
141where
142    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
143{
144    fn clone(&self) -> Self {
145        Self {
146            mode: self.mode,
147            data: self.data.clone(),
148            update_tx: self.update_tx.clone(),
149            _phantom: PhantomData,
150        }
151    }
152}
153