peat_protocol/storage/
node_store.rs1use crate::models::node::{NodeConfig, NodeState, NodeStateExt};
7use crate::sync::{DataSyncBackend, Document, Query, SyncSubscription, Value};
8use crate::{Error, Result};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tracing::{debug, info, instrument};
12
13const NODE_CONFIG_COLLECTION: &str = "node_configs";
15const NODE_STATE_COLLECTION: &str = "node_states";
16
17pub struct NodeStore<B: DataSyncBackend> {
19 backend: Arc<B>,
20 _config_sync_sub: SyncSubscription,
21 _state_sync_sub: SyncSubscription,
22}
23
24impl<B: DataSyncBackend> NodeStore<B> {
25 pub async fn new(backend: Arc<B>) -> Result<Self> {
27 let query = Query::All;
30 let config_sync_sub = backend
31 .sync_engine()
32 .subscribe(NODE_CONFIG_COLLECTION, &query)
33 .await
34 .map_err(|e| {
35 Error::storage_error(
36 format!("Failed to create sync subscription for node_configs: {}", e),
37 "new",
38 Some(NODE_CONFIG_COLLECTION.to_string()),
39 )
40 })?;
41
42 let state_sync_sub = backend
43 .sync_engine()
44 .subscribe(NODE_STATE_COLLECTION, &query)
45 .await
46 .map_err(|e| {
47 Error::storage_error(
48 format!("Failed to create sync subscription for node_states: {}", e),
49 "new",
50 Some(NODE_STATE_COLLECTION.to_string()),
51 )
52 })?;
53
54 Ok(Self {
55 backend,
56 _config_sync_sub: config_sync_sub,
57 _state_sync_sub: state_sync_sub,
58 })
59 }
60
61 fn config_to_document(config: &NodeConfig) -> Result<Document> {
63 let json_val = serde_json::to_value(config)?;
64 let fields = json_val
65 .as_object()
66 .ok_or_else(|| Error::Internal("Failed to serialize config to object".into()))?
67 .iter()
68 .map(|(k, v)| (k.clone(), v.clone()))
69 .collect::<HashMap<String, Value>>();
70
71 Ok(Document::with_id(&config.id, fields))
73 }
74
75 fn document_to_config(doc: &Document) -> Result<NodeConfig> {
77 let json_val = serde_json::to_value(&doc.fields)?;
78 Ok(serde_json::from_value(json_val)?)
79 }
80
81 fn state_to_document(node_id: &str, state: &NodeState) -> Result<Document> {
83 let json_val = serde_json::to_value(state)?;
84 let mut fields = json_val
85 .as_object()
86 .ok_or_else(|| Error::Internal("Failed to serialize state to object".into()))?
87 .iter()
88 .map(|(k, v)| (k.clone(), v.clone()))
89 .collect::<HashMap<String, Value>>();
90
91 fields.insert("node_id".to_string(), Value::String(node_id.to_string()));
93
94 Ok(Document::with_id(node_id, fields))
96 }
97
98 fn document_to_state(doc: &Document) -> Result<NodeState> {
100 let json_val = serde_json::to_value(&doc.fields)?;
101 Ok(serde_json::from_value(json_val)?)
102 }
103
104 #[instrument(skip(self, config))]
106 pub async fn store_config(&self, config: &NodeConfig) -> Result<String> {
107 info!("Storing node config: {}", config.id);
108
109 let doc = Self::config_to_document(config)?;
110
111 self.backend
112 .document_store()
113 .upsert(NODE_CONFIG_COLLECTION, doc)
114 .await
115 .map_err(|e| {
116 Error::storage_error(
117 format!("Failed to store node config: {}", e),
118 "upsert",
119 Some(NODE_CONFIG_COLLECTION.to_string()),
120 )
121 })
122 }
123
124 #[instrument(skip(self))]
126 pub async fn get_config(&self, node_id: &str) -> Result<Option<NodeConfig>> {
127 debug!("Retrieving node config: {}", node_id);
128
129 let query = Query::Eq {
130 field: "id".to_string(),
131 value: Value::String(node_id.to_string()),
132 };
133 let docs = self
134 .backend
135 .document_store()
136 .query(NODE_CONFIG_COLLECTION, &query)
137 .await?;
138
139 if docs.is_empty() {
140 return Ok(None);
141 }
142
143 let config = Self::document_to_config(&docs[0])?;
144 Ok(Some(config))
145 }
146
147 #[instrument(skip(self, state))]
149 pub async fn store_state(&self, node_id: &str, state: &NodeState) -> Result<String> {
150 info!("Storing node state: {}", node_id);
151
152 let doc = Self::state_to_document(node_id, state)?;
153
154 self.backend
155 .document_store()
156 .upsert(NODE_STATE_COLLECTION, doc)
157 .await
158 .map_err(|e| {
159 Error::storage_error(
160 format!("Failed to store node state: {}", e),
161 "upsert",
162 Some(NODE_STATE_COLLECTION.to_string()),
163 )
164 })
165 }
166
167 #[instrument(skip(self))]
169 pub async fn get_state(&self, node_id: &str) -> Result<Option<NodeState>> {
170 debug!("Retrieving node state: {}", node_id);
171
172 let query = Query::Eq {
173 field: "node_id".to_string(),
174 value: Value::String(node_id.to_string()),
175 };
176 let docs = self
177 .backend
178 .document_store()
179 .query(NODE_STATE_COLLECTION, &query)
180 .await?;
181
182 if docs.is_empty() {
183 return Ok(None);
184 }
185
186 let state = Self::document_to_state(&docs[0])?;
187 Ok(Some(state))
188 }
189
190 #[instrument(skip(self))]
192 pub async fn get_nodes_by_phase(&self, phase: crate::traits::Phase) -> Result<Vec<NodeState>> {
193 use crate::traits::PhaseExt;
194 debug!("Querying nodes by phase: {:?}", phase);
195
196 let phase_str = phase.as_str().to_string();
197 let query = Query::Eq {
198 field: "phase".to_string(),
199 value: Value::String(phase_str),
200 };
201 let docs = self
202 .backend
203 .document_store()
204 .query(NODE_STATE_COLLECTION, &query)
205 .await?;
206
207 let states: Vec<NodeState> = docs
208 .into_iter()
209 .filter_map(|doc| Self::document_to_state(&doc).ok())
210 .collect();
211
212 Ok(states)
213 }
214
215 #[instrument(skip(self))]
217 pub async fn get_nodes_by_cell(&self, squad_id: &str) -> Result<Vec<NodeState>> {
218 debug!("Querying nodes by squad: {}", squad_id);
219
220 let query = Query::Eq {
221 field: "squad_id".to_string(),
222 value: Value::String(squad_id.to_string()),
223 };
224 let docs = self
225 .backend
226 .document_store()
227 .query(NODE_STATE_COLLECTION, &query)
228 .await?;
229
230 let states: Vec<NodeState> = docs
231 .into_iter()
232 .filter_map(|doc| Self::document_to_state(&doc).ok())
233 .collect();
234
235 Ok(states)
236 }
237
238 #[instrument(skip(self))]
240 pub async fn get_operational_nodes(&self) -> Result<Vec<NodeState>> {
241 debug!("Querying operational nodes");
242
243 let query = Query::Gt {
244 field: "fuel_minutes".to_string(),
245 value: serde_json::json!(0),
246 };
247 let docs = self
248 .backend
249 .document_store()
250 .query(NODE_STATE_COLLECTION, &query)
251 .await?;
252
253 let states: Vec<NodeState> = docs
254 .into_iter()
255 .filter_map(|doc| Self::document_to_state(&doc).ok())
256 .filter(|state: &NodeState| state.is_operational())
257 .collect();
258
259 Ok(states)
260 }
261
262 #[instrument(skip(self))]
264 pub async fn delete_config(&self, node_id: &str) -> Result<()> {
265 info!("Deleting node config: {}", node_id);
266
267 self.backend
268 .document_store()
269 .remove(NODE_CONFIG_COLLECTION, &node_id.to_string())
270 .await?;
271 Ok(())
272 }
273
274 #[instrument(skip(self))]
276 pub async fn delete_state(&self, node_id: &str) -> Result<()> {
277 info!("Deleting node state: {}", node_id);
278
279 self.backend
280 .document_store()
281 .remove(NODE_STATE_COLLECTION, &node_id.to_string())
282 .await?;
283 Ok(())
284 }
285
286 pub fn backend(&self) -> &B {
288 &self.backend
289 }
290}