Skip to main content

peat_protocol/storage/
cell_store.rs

1//! Cell state storage manager
2//!
3//! This module provides a high-level wrapper around data sync backends for managing
4//! cell state using CRDT operations.
5
6use crate::models::{
7    cell::{CellState, CellStateExt},
8    Capability,
9};
10use crate::sync::{DataSyncBackend, Document, Query, SyncSubscription, Value};
11use crate::{Error, Result};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tracing::{debug, info, instrument};
15
/// Name of the backend collection that holds cell documents.
///
/// Used for the sync subscription and for every document-store call in this module.
const CELL_COLLECTION: &str = "cells";
18
19/// Cell storage manager
20pub struct CellStore<B: DataSyncBackend> {
21    backend: Arc<B>,
22    _sync_sub: SyncSubscription,
23}
24
25impl<B: DataSyncBackend> CellStore<B> {
26    /// Create a new cell store with sync subscription for P2P replication
27    pub async fn new(backend: Arc<B>) -> Result<Self> {
28        // Create sync subscription for the cells collection
29        // This is REQUIRED for P2P replication - without it, data stays local
30        let query = Query::All;
31        let sync_sub = backend
32            .sync_engine()
33            .subscribe(CELL_COLLECTION, &query)
34            .await
35            .map_err(|e| {
36                Error::storage_error(
37                    format!("Failed to create sync subscription for cells: {}", e),
38                    "new",
39                    Some(CELL_COLLECTION.to_string()),
40                )
41            })?;
42
43        Ok(Self {
44            backend,
45            _sync_sub: sync_sub,
46        })
47    }
48
49    /// Convert CellState to Document
50    fn cell_to_document(cell: &CellState) -> Result<Document> {
51        let json_val = serde_json::to_value(cell)?;
52        let mut fields = json_val
53            .as_object()
54            .ok_or_else(|| Error::Internal("Failed to serialize cell to object".into()))?
55            .iter()
56            .map(|(k, v)| (k.clone(), v.clone()))
57            .collect::<HashMap<String, Value>>();
58
59        // Add cell_id field for querying
60        if let Some(id) = cell.get_id() {
61            fields.insert("cell_id".to_string(), Value::String(id.to_string()));
62            // Use cell_id as document ID to enable proper updates
63            Ok(Document::with_id(id, fields))
64        } else {
65            Ok(Document::new(fields))
66        }
67    }
68
69    /// Convert Document to CellState
70    fn document_to_cell(doc: &Document) -> Result<CellState> {
71        let json_val = serde_json::to_value(&doc.fields)?;
72        Ok(serde_json::from_value(json_val)?)
73    }
74
75    /// Store a cell state (OR-Set + LWW-Register operations)
76    #[instrument(skip(self, cell))]
77    pub async fn store_cell(&self, cell: &CellState) -> Result<String> {
78        info!("Storing cell: {}", cell.get_id().unwrap_or("<unknown>"));
79
80        let doc = Self::cell_to_document(cell)?;
81
82        // Always INSERT - get_cell will query for latest by updated_at timestamp
83        self.backend
84            .document_store()
85            .upsert(CELL_COLLECTION, doc)
86            .await
87            .map_err(|e| {
88                Error::storage_error(
89                    format!("Failed to store cell: {}", e),
90                    "upsert",
91                    Some(CELL_COLLECTION.to_string()),
92                )
93            })
94    }
95
96    /// Retrieve a cell by ID
97    #[instrument(skip(self))]
98    pub async fn get_cell(&self, cell_id: &str) -> Result<Option<CellState>> {
99        debug!("Retrieving cell: {}", cell_id);
100
101        let query = Query::Eq {
102            field: "cell_id".to_string(),
103            value: Value::String(cell_id.to_string()),
104        };
105        let mut docs = self
106            .backend
107            .document_store()
108            .query(CELL_COLLECTION, &query)
109            .await?;
110
111        if docs.is_empty() {
112            return Ok(None);
113        }
114
115        // Sort by updated_at descending to get the latest version
116        // (since we always INSERT new documents rather than updating)
117        docs.sort_by(|a, b| {
118            let a_ts = a.updated_at;
119            let b_ts = b.updated_at;
120            b_ts.cmp(&a_ts) // Descending order
121        });
122
123        let cell = Self::document_to_cell(&docs[0])?;
124        Ok(Some(cell))
125    }
126
127    /// Get all valid cells (meeting minimum size requirements)
128    #[instrument(skip(self))]
129    pub async fn get_valid_cells(&self) -> Result<Vec<CellState>> {
130        debug!("Querying valid cells");
131
132        // Query all cells - we'll filter in code since query abstraction doesn't support array length
133        let query = Query::All;
134        let docs = self
135            .backend
136            .document_store()
137            .query(CELL_COLLECTION, &query)
138            .await?;
139
140        let cells: Vec<CellState> = docs
141            .into_iter()
142            .filter_map(|doc| Self::document_to_cell(&doc).ok())
143            .filter(|cell: &CellState| cell.is_valid())
144            .collect();
145
146        Ok(cells)
147    }
148
149    /// Get all cells in a platoon
150    #[instrument(skip(self))]
151    pub async fn get_cells_by_zone(&self, platoon_id: &str) -> Result<Vec<CellState>> {
152        debug!("Querying cells by platoon: {}", platoon_id);
153
154        let query = Query::Eq {
155            field: "platoon_id".to_string(),
156            value: Value::String(platoon_id.to_string()),
157        };
158        let docs = self
159            .backend
160            .document_store()
161            .query(CELL_COLLECTION, &query)
162            .await?;
163
164        let cells: Vec<CellState> = docs
165            .into_iter()
166            .filter_map(|doc| Self::document_to_cell(&doc).ok())
167            .collect();
168
169        Ok(cells)
170    }
171
172    /// Get cells that have a specific capability type
173    #[instrument(skip(self))]
174    pub async fn get_cells_with_capability(
175        &self,
176        capability_type: crate::models::CapabilityType,
177    ) -> Result<Vec<CellState>> {
178        debug!("Querying cells with capability: {:?}", capability_type);
179
180        // Query all cells - filter by capability in code
181        let query = Query::All;
182        let docs = self
183            .backend
184            .document_store()
185            .query(CELL_COLLECTION, &query)
186            .await?;
187
188        let cells: Vec<CellState> = docs
189            .into_iter()
190            .filter_map(|doc| Self::document_to_cell(&doc).ok())
191            .filter(|cell: &CellState| cell.has_capability_type(capability_type))
192            .collect();
193
194        Ok(cells)
195    }
196
197    /// Get cells that are not full (can accept more members)
198    #[instrument(skip(self))]
199    pub async fn get_available_cells(&self) -> Result<Vec<CellState>> {
200        debug!("Querying available cells");
201
202        let query = Query::All;
203        let docs = self
204            .backend
205            .document_store()
206            .query(CELL_COLLECTION, &query)
207            .await?;
208
209        let cells: Vec<CellState> = docs
210            .into_iter()
211            .filter_map(|doc| Self::document_to_cell(&doc).ok())
212            .filter(|cell: &CellState| !cell.is_full())
213            .collect();
214
215        Ok(cells)
216    }
217
218    /// Add a member to a cell (OR-Set add operation)
219    #[instrument(skip(self))]
220    pub async fn add_member(&self, cell_id: &str, node_id: String) -> Result<()> {
221        info!("Adding member {} to cell {}", node_id, cell_id);
222
223        let mut cell = self
224            .get_cell(cell_id)
225            .await?
226            .ok_or_else(|| Error::NotFound {
227                resource_type: "Cell".to_string(),
228                id: cell_id.to_string(),
229            })?;
230
231        if !cell.add_member(node_id) {
232            return Err(Error::Internal("Failed to add member to cell".to_string()));
233        }
234
235        self.store_cell(&cell).await?;
236        Ok(())
237    }
238
239    /// Remove a member from a cell (OR-Set remove operation)
240    #[instrument(skip(self))]
241    pub async fn remove_member(&self, cell_id: &str, node_id: &str) -> Result<()> {
242        info!("Removing member {} from cell {}", node_id, cell_id);
243
244        let mut cell = self
245            .get_cell(cell_id)
246            .await?
247            .ok_or_else(|| Error::NotFound {
248                resource_type: "Cell".to_string(),
249                id: cell_id.to_string(),
250            })?;
251
252        if !cell.remove_member(node_id) {
253            return Err(Error::Internal(
254                "Failed to remove member from cell".to_string(),
255            ));
256        }
257
258        self.store_cell(&cell).await?;
259        Ok(())
260    }
261
262    /// Set squad leader (LWW-Register operation)
263    #[instrument(skip(self))]
264    pub async fn set_leader(&self, cell_id: &str, node_id: String) -> Result<()> {
265        info!("Setting leader {} for squad {}", node_id, cell_id);
266
267        let mut cell = self
268            .get_cell(cell_id)
269            .await?
270            .ok_or_else(|| Error::NotFound {
271                resource_type: "Cell".to_string(),
272                id: cell_id.to_string(),
273            })?;
274
275        cell.set_leader(node_id)
276            .map_err(|e| Error::Internal(e.to_string()))?;
277
278        self.store_cell(&cell).await?;
279        Ok(())
280    }
281
282    /// Add a capability to a cell (G-Set operation)
283    #[instrument(skip(self, capability))]
284    pub async fn add_capability(&self, cell_id: &str, capability: Capability) -> Result<()> {
285        info!("Adding capability to cell {}", cell_id);
286
287        let mut cell = self
288            .get_cell(cell_id)
289            .await?
290            .ok_or_else(|| Error::NotFound {
291                resource_type: "Cell".to_string(),
292                id: cell_id.to_string(),
293            })?;
294
295        cell.add_capability(capability);
296        self.store_cell(&cell).await?;
297        Ok(())
298    }
299
300    /// Delete a cell
301    #[instrument(skip(self))]
302    pub async fn delete_cell(&self, cell_id: &str) -> Result<()> {
303        info!("Deleting cell: {}", cell_id);
304
305        self.backend
306            .document_store()
307            .remove(CELL_COLLECTION, &cell_id.to_string())
308            .await?;
309        Ok(())
310    }
311
312    /// Get the underlying backend reference
313    pub fn backend(&self) -> &B {
314        &self.backend
315    }
316}