1use crate::mutation::Mode;
2use serde::{de::DeserializeOwned, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::marker::PhantomData;
6use std::sync::Arc;
7use tokio::sync::{broadcast, RwLock};
8
/// Backing container for an entity store.
///
/// The variant is picked once, at construction time, from the store's mode:
/// keyed modes get a map, sequence modes get a vector.
enum StoreData<T> {
    /// Entities addressed by string key (map-backed modes).
    Kv(HashMap<String, T>),
    /// Entities kept in insertion order for append mode.
    Append(Vec<T>),
    /// Entities kept in insertion order for list mode.
    List(Vec<T>),
}
14
/// A change notification delivered to subscribers: the key supplied to the
/// mutating call, paired with the typed value that was stored.
pub type Update<T> = (String, T);
16
17pub struct EntityStore<T> {
18 mode: Mode,
19 data: Arc<RwLock<StoreData<T>>>,
20 update_tx: broadcast::Sender<Update<T>>,
21 _phantom: PhantomData<T>,
22}
23
24impl<T> EntityStore<T>
25where
26 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
27{
28 pub fn new(mode: Mode) -> Self {
29 let data = match mode {
30 Mode::Kv | Mode::State => StoreData::Kv(HashMap::new()),
31 Mode::Append => StoreData::Append(Vec::new()),
32 Mode::List => StoreData::List(Vec::new()),
33 };
34
35 let (update_tx, _) = broadcast::channel(1000);
36
37 Self {
38 mode,
39 data: Arc::new(RwLock::new(data)),
40 update_tx,
41 _phantom: PhantomData,
42 }
43 }
44
45 pub fn subscribe(&self) -> broadcast::Receiver<Update<T>> {
46 self.update_tx.subscribe()
47 }
48
49 pub async fn get(&self, key: &str) -> Option<T> {
50 let data = self.data.read().await;
51 match &*data {
52 StoreData::Kv(map) => map.get(key).cloned(),
53 _ => None,
54 }
55 }
56
57 pub async fn all(&self) -> HashMap<String, T> {
58 let data = self.data.read().await;
59 match &*data {
60 StoreData::Kv(map) => map.clone(),
61 _ => HashMap::new(),
62 }
63 }
64
65 pub async fn all_vec(&self) -> Vec<T> {
66 let data = self.data.read().await;
67 match &*data {
68 StoreData::Append(vec) | StoreData::List(vec) => vec.clone(),
69 _ => Vec::new(),
70 }
71 }
72
73 pub fn mode(&self) -> &Mode {
74 &self.mode
75 }
76
77 pub(crate) async fn apply_patch(&self, key: String, patch: Value) {
78 let mut data = self.data.write().await;
79 match &mut *data {
80 StoreData::Kv(map) => {
81 let current = map.get(&key).and_then(|v| serde_json::to_value(v).ok());
82 let mut merged = current.unwrap_or_else(|| serde_json::json!({}));
83
84 if let (Some(obj), Some(patch_obj)) = (merged.as_object_mut(), patch.as_object()) {
85 for (k, v) in patch_obj {
86 obj.insert(k.clone(), v.clone());
87 }
88 }
89
90 if let Ok(typed) = serde_json::from_value::<T>(merged) {
91 map.insert(key.clone(), typed.clone());
92 let _ = self.update_tx.send((key, typed));
93 }
94 }
95 StoreData::List(vec) | StoreData::Append(vec) => {
96 let item_data = patch.get("item").cloned().unwrap_or(patch);
97 if let Ok(typed) = serde_json::from_value::<T>(item_data) {
98 vec.push(typed.clone());
99 let _ = self.update_tx.send((key, typed));
100 }
101 }
102 }
103 }
104
105 pub(crate) async fn apply_upsert(&self, key: String, value: Value) {
106 let mut data = self.data.write().await;
107
108 if let Ok(typed) = serde_json::from_value::<T>(value) {
109 match &mut *data {
110 StoreData::Kv(map) => {
111 map.insert(key.clone(), typed.clone());
112 let _ = self.update_tx.send((key, typed));
113 }
114 StoreData::Append(vec) | StoreData::List(vec) => {
115 vec.push(typed.clone());
116 let _ = self.update_tx.send((key, typed));
117 }
118 }
119 }
120 }
121
122 pub(crate) async fn apply_delete(&self, key: String) {
123 let mut data = self.data.write().await;
124 if let StoreData::Kv(map) = &mut *data {
125 map.remove(&key);
126 }
127 }
128}
129
130impl<T> Default for EntityStore<T>
131where
132 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
133{
134 fn default() -> Self {
135 Self::new(Mode::Kv)
136 }
137}
138
139impl<T> Clone for EntityStore<T>
140where
141 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
142{
143 fn clone(&self) -> Self {
144 Self {
145 mode: self.mode,
146 data: self.data.clone(),
147 update_tx: self.update_tx.clone(),
148 _phantom: PhantomData,
149 }
150 }
151}