Skip to main content

peat_protocol/storage/
node_store.rs

1//! Node state storage manager
2//!
3//! This module provides a high-level wrapper around data sync backends for managing
4//! node configurations and state using CRDT operations.
5
6use crate::models::node::{NodeConfig, NodeState, NodeStateExt};
7use crate::sync::{DataSyncBackend, Document, Query, SyncSubscription, Value};
8use crate::{Error, Result};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tracing::{debug, info, instrument};
12
13/// Collection names
14const NODE_CONFIG_COLLECTION: &str = "node_configs";
15const NODE_STATE_COLLECTION: &str = "node_states";
16
17/// Node storage manager
18pub struct NodeStore<B: DataSyncBackend> {
19    backend: Arc<B>,
20    _config_sync_sub: SyncSubscription,
21    _state_sync_sub: SyncSubscription,
22}
23
24impl<B: DataSyncBackend> NodeStore<B> {
25    /// Create a new node store with sync subscriptions for P2P replication
26    pub async fn new(backend: Arc<B>) -> Result<Self> {
27        // Create sync subscriptions for both collections
28        // This is REQUIRED for P2P replication - without it, data stays local
29        let query = Query::All;
30        let config_sync_sub = backend
31            .sync_engine()
32            .subscribe(NODE_CONFIG_COLLECTION, &query)
33            .await
34            .map_err(|e| {
35                Error::storage_error(
36                    format!("Failed to create sync subscription for node_configs: {}", e),
37                    "new",
38                    Some(NODE_CONFIG_COLLECTION.to_string()),
39                )
40            })?;
41
42        let state_sync_sub = backend
43            .sync_engine()
44            .subscribe(NODE_STATE_COLLECTION, &query)
45            .await
46            .map_err(|e| {
47                Error::storage_error(
48                    format!("Failed to create sync subscription for node_states: {}", e),
49                    "new",
50                    Some(NODE_STATE_COLLECTION.to_string()),
51                )
52            })?;
53
54        Ok(Self {
55            backend,
56            _config_sync_sub: config_sync_sub,
57            _state_sync_sub: state_sync_sub,
58        })
59    }
60
61    /// Convert NodeConfig to Document
62    fn config_to_document(config: &NodeConfig) -> Result<Document> {
63        let json_val = serde_json::to_value(config)?;
64        let fields = json_val
65            .as_object()
66            .ok_or_else(|| Error::Internal("Failed to serialize config to object".into()))?
67            .iter()
68            .map(|(k, v)| (k.clone(), v.clone()))
69            .collect::<HashMap<String, Value>>();
70
71        // Use the config's id as the document ID to enable proper updates
72        Ok(Document::with_id(&config.id, fields))
73    }
74
75    /// Convert Document to NodeConfig
76    fn document_to_config(doc: &Document) -> Result<NodeConfig> {
77        let json_val = serde_json::to_value(&doc.fields)?;
78        Ok(serde_json::from_value(json_val)?)
79    }
80
81    /// Convert NodeState to Document with node_id
82    fn state_to_document(node_id: &str, state: &NodeState) -> Result<Document> {
83        let json_val = serde_json::to_value(state)?;
84        let mut fields = json_val
85            .as_object()
86            .ok_or_else(|| Error::Internal("Failed to serialize state to object".into()))?
87            .iter()
88            .map(|(k, v)| (k.clone(), v.clone()))
89            .collect::<HashMap<String, Value>>();
90
91        // Add node_id field for querying
92        fields.insert("node_id".to_string(), Value::String(node_id.to_string()));
93
94        // Use the node_id as the document ID to enable proper updates
95        Ok(Document::with_id(node_id, fields))
96    }
97
98    /// Convert Document to NodeState
99    fn document_to_state(doc: &Document) -> Result<NodeState> {
100        let json_val = serde_json::to_value(&doc.fields)?;
101        Ok(serde_json::from_value(json_val)?)
102    }
103
104    /// Store a node configuration (G-Set operation)
105    #[instrument(skip(self, config))]
106    pub async fn store_config(&self, config: &NodeConfig) -> Result<String> {
107        info!("Storing node config: {}", config.id);
108
109        let doc = Self::config_to_document(config)?;
110
111        self.backend
112            .document_store()
113            .upsert(NODE_CONFIG_COLLECTION, doc)
114            .await
115            .map_err(|e| {
116                Error::storage_error(
117                    format!("Failed to store node config: {}", e),
118                    "upsert",
119                    Some(NODE_CONFIG_COLLECTION.to_string()),
120                )
121            })
122    }
123
124    /// Retrieve a node configuration by ID
125    #[instrument(skip(self))]
126    pub async fn get_config(&self, node_id: &str) -> Result<Option<NodeConfig>> {
127        debug!("Retrieving node config: {}", node_id);
128
129        let query = Query::Eq {
130            field: "id".to_string(),
131            value: Value::String(node_id.to_string()),
132        };
133        let docs = self
134            .backend
135            .document_store()
136            .query(NODE_CONFIG_COLLECTION, &query)
137            .await?;
138
139        if docs.is_empty() {
140            return Ok(None);
141        }
142
143        let config = Self::document_to_config(&docs[0])?;
144        Ok(Some(config))
145    }
146
147    /// Store node state (LWW-Register operation)
148    #[instrument(skip(self, state))]
149    pub async fn store_state(&self, node_id: &str, state: &NodeState) -> Result<String> {
150        info!("Storing node state: {}", node_id);
151
152        let doc = Self::state_to_document(node_id, state)?;
153
154        self.backend
155            .document_store()
156            .upsert(NODE_STATE_COLLECTION, doc)
157            .await
158            .map_err(|e| {
159                Error::storage_error(
160                    format!("Failed to store node state: {}", e),
161                    "upsert",
162                    Some(NODE_STATE_COLLECTION.to_string()),
163                )
164            })
165    }
166
167    /// Retrieve node state by ID
168    #[instrument(skip(self))]
169    pub async fn get_state(&self, node_id: &str) -> Result<Option<NodeState>> {
170        debug!("Retrieving node state: {}", node_id);
171
172        let query = Query::Eq {
173            field: "node_id".to_string(),
174            value: Value::String(node_id.to_string()),
175        };
176        let docs = self
177            .backend
178            .document_store()
179            .query(NODE_STATE_COLLECTION, &query)
180            .await?;
181
182        if docs.is_empty() {
183            return Ok(None);
184        }
185
186        let state = Self::document_to_state(&docs[0])?;
187        Ok(Some(state))
188    }
189
190    /// Get all nodes in a specific phase
191    #[instrument(skip(self))]
192    pub async fn get_nodes_by_phase(&self, phase: crate::traits::Phase) -> Result<Vec<NodeState>> {
193        use crate::traits::PhaseExt;
194        debug!("Querying nodes by phase: {:?}", phase);
195
196        let phase_str = phase.as_str().to_string();
197        let query = Query::Eq {
198            field: "phase".to_string(),
199            value: Value::String(phase_str),
200        };
201        let docs = self
202            .backend
203            .document_store()
204            .query(NODE_STATE_COLLECTION, &query)
205            .await?;
206
207        let states: Vec<NodeState> = docs
208            .into_iter()
209            .filter_map(|doc| Self::document_to_state(&doc).ok())
210            .collect();
211
212        Ok(states)
213    }
214
215    /// Get all nodes in a specific squad
216    #[instrument(skip(self))]
217    pub async fn get_nodes_by_cell(&self, squad_id: &str) -> Result<Vec<NodeState>> {
218        debug!("Querying nodes by squad: {}", squad_id);
219
220        let query = Query::Eq {
221            field: "squad_id".to_string(),
222            value: Value::String(squad_id.to_string()),
223        };
224        let docs = self
225            .backend
226            .document_store()
227            .query(NODE_STATE_COLLECTION, &query)
228            .await?;
229
230        let states: Vec<NodeState> = docs
231            .into_iter()
232            .filter_map(|doc| Self::document_to_state(&doc).ok())
233            .collect();
234
235        Ok(states)
236    }
237
238    /// Get all operational nodes (health != Failed && fuel > 0)
239    #[instrument(skip(self))]
240    pub async fn get_operational_nodes(&self) -> Result<Vec<NodeState>> {
241        debug!("Querying operational nodes");
242
243        let query = Query::Gt {
244            field: "fuel_minutes".to_string(),
245            value: serde_json::json!(0),
246        };
247        let docs = self
248            .backend
249            .document_store()
250            .query(NODE_STATE_COLLECTION, &query)
251            .await?;
252
253        let states: Vec<NodeState> = docs
254            .into_iter()
255            .filter_map(|doc| Self::document_to_state(&doc).ok())
256            .filter(|state: &NodeState| state.is_operational())
257            .collect();
258
259        Ok(states)
260    }
261
262    /// Delete a node configuration
263    #[instrument(skip(self))]
264    pub async fn delete_config(&self, node_id: &str) -> Result<()> {
265        info!("Deleting node config: {}", node_id);
266
267        self.backend
268            .document_store()
269            .remove(NODE_CONFIG_COLLECTION, &node_id.to_string())
270            .await?;
271        Ok(())
272    }
273
274    /// Delete a node state
275    #[instrument(skip(self))]
276    pub async fn delete_state(&self, node_id: &str) -> Result<()> {
277        info!("Deleting node state: {}", node_id);
278
279        self.backend
280            .document_store()
281            .remove(NODE_STATE_COLLECTION, &node_id.to_string())
282            .await?;
283        Ok(())
284    }
285
286    /// Get the underlying backend reference
287    pub fn backend(&self) -> &B {
288        &self.backend
289    }
290}