peat_protocol/storage/
cell_store.rs1use crate::models::{
7 cell::{CellState, CellStateExt},
8 Capability,
9};
10use crate::sync::{DataSyncBackend, Document, Query, SyncSubscription, Value};
11use crate::{Error, Result};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tracing::{debug, info, instrument};
15
16const CELL_COLLECTION: &str = "cells";
18
19pub struct CellStore<B: DataSyncBackend> {
21 backend: Arc<B>,
22 _sync_sub: SyncSubscription,
23}
24
25impl<B: DataSyncBackend> CellStore<B> {
26 pub async fn new(backend: Arc<B>) -> Result<Self> {
28 let query = Query::All;
31 let sync_sub = backend
32 .sync_engine()
33 .subscribe(CELL_COLLECTION, &query)
34 .await
35 .map_err(|e| {
36 Error::storage_error(
37 format!("Failed to create sync subscription for cells: {}", e),
38 "new",
39 Some(CELL_COLLECTION.to_string()),
40 )
41 })?;
42
43 Ok(Self {
44 backend,
45 _sync_sub: sync_sub,
46 })
47 }
48
49 fn cell_to_document(cell: &CellState) -> Result<Document> {
51 let json_val = serde_json::to_value(cell)?;
52 let mut fields = json_val
53 .as_object()
54 .ok_or_else(|| Error::Internal("Failed to serialize cell to object".into()))?
55 .iter()
56 .map(|(k, v)| (k.clone(), v.clone()))
57 .collect::<HashMap<String, Value>>();
58
59 if let Some(id) = cell.get_id() {
61 fields.insert("cell_id".to_string(), Value::String(id.to_string()));
62 Ok(Document::with_id(id, fields))
64 } else {
65 Ok(Document::new(fields))
66 }
67 }
68
69 fn document_to_cell(doc: &Document) -> Result<CellState> {
71 let json_val = serde_json::to_value(&doc.fields)?;
72 Ok(serde_json::from_value(json_val)?)
73 }
74
75 #[instrument(skip(self, cell))]
77 pub async fn store_cell(&self, cell: &CellState) -> Result<String> {
78 info!("Storing cell: {}", cell.get_id().unwrap_or("<unknown>"));
79
80 let doc = Self::cell_to_document(cell)?;
81
82 self.backend
84 .document_store()
85 .upsert(CELL_COLLECTION, doc)
86 .await
87 .map_err(|e| {
88 Error::storage_error(
89 format!("Failed to store cell: {}", e),
90 "upsert",
91 Some(CELL_COLLECTION.to_string()),
92 )
93 })
94 }
95
96 #[instrument(skip(self))]
98 pub async fn get_cell(&self, cell_id: &str) -> Result<Option<CellState>> {
99 debug!("Retrieving cell: {}", cell_id);
100
101 let query = Query::Eq {
102 field: "cell_id".to_string(),
103 value: Value::String(cell_id.to_string()),
104 };
105 let mut docs = self
106 .backend
107 .document_store()
108 .query(CELL_COLLECTION, &query)
109 .await?;
110
111 if docs.is_empty() {
112 return Ok(None);
113 }
114
115 docs.sort_by(|a, b| {
118 let a_ts = a.updated_at;
119 let b_ts = b.updated_at;
120 b_ts.cmp(&a_ts) });
122
123 let cell = Self::document_to_cell(&docs[0])?;
124 Ok(Some(cell))
125 }
126
127 #[instrument(skip(self))]
129 pub async fn get_valid_cells(&self) -> Result<Vec<CellState>> {
130 debug!("Querying valid cells");
131
132 let query = Query::All;
134 let docs = self
135 .backend
136 .document_store()
137 .query(CELL_COLLECTION, &query)
138 .await?;
139
140 let cells: Vec<CellState> = docs
141 .into_iter()
142 .filter_map(|doc| Self::document_to_cell(&doc).ok())
143 .filter(|cell: &CellState| cell.is_valid())
144 .collect();
145
146 Ok(cells)
147 }
148
149 #[instrument(skip(self))]
151 pub async fn get_cells_by_zone(&self, platoon_id: &str) -> Result<Vec<CellState>> {
152 debug!("Querying cells by platoon: {}", platoon_id);
153
154 let query = Query::Eq {
155 field: "platoon_id".to_string(),
156 value: Value::String(platoon_id.to_string()),
157 };
158 let docs = self
159 .backend
160 .document_store()
161 .query(CELL_COLLECTION, &query)
162 .await?;
163
164 let cells: Vec<CellState> = docs
165 .into_iter()
166 .filter_map(|doc| Self::document_to_cell(&doc).ok())
167 .collect();
168
169 Ok(cells)
170 }
171
172 #[instrument(skip(self))]
174 pub async fn get_cells_with_capability(
175 &self,
176 capability_type: crate::models::CapabilityType,
177 ) -> Result<Vec<CellState>> {
178 debug!("Querying cells with capability: {:?}", capability_type);
179
180 let query = Query::All;
182 let docs = self
183 .backend
184 .document_store()
185 .query(CELL_COLLECTION, &query)
186 .await?;
187
188 let cells: Vec<CellState> = docs
189 .into_iter()
190 .filter_map(|doc| Self::document_to_cell(&doc).ok())
191 .filter(|cell: &CellState| cell.has_capability_type(capability_type))
192 .collect();
193
194 Ok(cells)
195 }
196
197 #[instrument(skip(self))]
199 pub async fn get_available_cells(&self) -> Result<Vec<CellState>> {
200 debug!("Querying available cells");
201
202 let query = Query::All;
203 let docs = self
204 .backend
205 .document_store()
206 .query(CELL_COLLECTION, &query)
207 .await?;
208
209 let cells: Vec<CellState> = docs
210 .into_iter()
211 .filter_map(|doc| Self::document_to_cell(&doc).ok())
212 .filter(|cell: &CellState| !cell.is_full())
213 .collect();
214
215 Ok(cells)
216 }
217
218 #[instrument(skip(self))]
220 pub async fn add_member(&self, cell_id: &str, node_id: String) -> Result<()> {
221 info!("Adding member {} to cell {}", node_id, cell_id);
222
223 let mut cell = self
224 .get_cell(cell_id)
225 .await?
226 .ok_or_else(|| Error::NotFound {
227 resource_type: "Cell".to_string(),
228 id: cell_id.to_string(),
229 })?;
230
231 if !cell.add_member(node_id) {
232 return Err(Error::Internal("Failed to add member to cell".to_string()));
233 }
234
235 self.store_cell(&cell).await?;
236 Ok(())
237 }
238
239 #[instrument(skip(self))]
241 pub async fn remove_member(&self, cell_id: &str, node_id: &str) -> Result<()> {
242 info!("Removing member {} from cell {}", node_id, cell_id);
243
244 let mut cell = self
245 .get_cell(cell_id)
246 .await?
247 .ok_or_else(|| Error::NotFound {
248 resource_type: "Cell".to_string(),
249 id: cell_id.to_string(),
250 })?;
251
252 if !cell.remove_member(node_id) {
253 return Err(Error::Internal(
254 "Failed to remove member from cell".to_string(),
255 ));
256 }
257
258 self.store_cell(&cell).await?;
259 Ok(())
260 }
261
262 #[instrument(skip(self))]
264 pub async fn set_leader(&self, cell_id: &str, node_id: String) -> Result<()> {
265 info!("Setting leader {} for squad {}", node_id, cell_id);
266
267 let mut cell = self
268 .get_cell(cell_id)
269 .await?
270 .ok_or_else(|| Error::NotFound {
271 resource_type: "Cell".to_string(),
272 id: cell_id.to_string(),
273 })?;
274
275 cell.set_leader(node_id)
276 .map_err(|e| Error::Internal(e.to_string()))?;
277
278 self.store_cell(&cell).await?;
279 Ok(())
280 }
281
282 #[instrument(skip(self, capability))]
284 pub async fn add_capability(&self, cell_id: &str, capability: Capability) -> Result<()> {
285 info!("Adding capability to cell {}", cell_id);
286
287 let mut cell = self
288 .get_cell(cell_id)
289 .await?
290 .ok_or_else(|| Error::NotFound {
291 resource_type: "Cell".to_string(),
292 id: cell_id.to_string(),
293 })?;
294
295 cell.add_capability(capability);
296 self.store_cell(&cell).await?;
297 Ok(())
298 }
299
300 #[instrument(skip(self))]
302 pub async fn delete_cell(&self, cell_id: &str) -> Result<()> {
303 info!("Deleting cell: {}", cell_id);
304
305 self.backend
306 .document_store()
307 .remove(CELL_COLLECTION, &cell_id.to_string())
308 .await?;
309 Ok(())
310 }
311
312 pub fn backend(&self) -> &B {
314 &self.backend
315 }
316}