use crate::models::{
cell::{CellState, CellStateExt},
Capability,
};
use crate::sync::{DataSyncBackend, Document, Query, SyncSubscription, Value};
use crate::{Error, Result};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, info, instrument};
/// Name of the backend collection used for every cell subscription, query,
/// upsert and removal issued by this module.
const CELL_COLLECTION: &str = "cells";
/// Store for [`CellState`] records persisted as documents in the `"cells"`
/// collection of a [`DataSyncBackend`].
pub struct CellStore<B>
where
    B: DataSyncBackend,
{
    /// Shared handle to the backend that serves every read and write.
    backend: Arc<B>,
    /// Subscription returned by the backend's sync engine when the store is
    /// created. It is never read afterwards; presumably it is held only so
    /// the subscription stays active while the store exists — TODO confirm
    /// the drop semantics of `SyncSubscription`.
    _sync_sub: SyncSubscription,
}
impl<B: DataSyncBackend> CellStore<B> {
pub async fn new(backend: Arc<B>) -> Result<Self> {
let query = Query::All;
let sync_sub = backend
.sync_engine()
.subscribe(CELL_COLLECTION, &query)
.await
.map_err(|e| {
Error::storage_error(
format!("Failed to create sync subscription for cells: {}", e),
"new",
Some(CELL_COLLECTION.to_string()),
)
})?;
Ok(Self {
backend,
_sync_sub: sync_sub,
})
}
fn cell_to_document(cell: &CellState) -> Result<Document> {
let json_val = serde_json::to_value(cell)?;
let mut fields = json_val
.as_object()
.ok_or_else(|| Error::Internal("Failed to serialize cell to object".into()))?
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<String, Value>>();
if let Some(id) = cell.get_id() {
fields.insert("cell_id".to_string(), Value::String(id.to_string()));
Ok(Document::with_id(id, fields))
} else {
Ok(Document::new(fields))
}
}
/// Rebuilds a [`CellState`] from the fields of a backend document.
///
/// The field map is first turned into a JSON value and then deserialized.
///
/// # Errors
///
/// Fails when either JSON conversion step fails, e.g. because the fields do
/// not match the shape `CellState` expects.
fn document_to_cell(doc: &Document) -> Result<CellState> {
    let as_json = serde_json::to_value(&doc.fields)?;
    let cell: CellState = serde_json::from_value(as_json)?;
    Ok(cell)
}
#[instrument(skip(self, cell))]
pub async fn store_cell(&self, cell: &CellState) -> Result<String> {
info!("Storing cell: {}", cell.get_id().unwrap_or("<unknown>"));
let doc = Self::cell_to_document(cell)?;
self.backend
.document_store()
.upsert(CELL_COLLECTION, doc)
.await
.map_err(|e| {
Error::storage_error(
format!("Failed to store cell: {}", e),
"upsert",
Some(CELL_COLLECTION.to_string()),
)
})
}
/// Looks up a cell by the value of its `"cell_id"` field.
///
/// Returns `Ok(None)` when no document matches. When several documents
/// match, the one with the greatest `updated_at` is decoded and returned.
///
/// # Errors
///
/// Propagates backend query failures and failures to decode the selected
/// document.
#[instrument(skip(self))]
pub async fn get_cell(&self, cell_id: &str) -> Result<Option<CellState>> {
    debug!("Retrieving cell: {}", cell_id);
    let query = Query::Eq {
        field: "cell_id".to_string(),
        value: Value::String(cell_id.to_string()),
    };
    let docs = self
        .backend
        .document_store()
        .query(CELL_COLLECTION, &query)
        .await?;

    // Single pass instead of sorting the whole result set. The comparator is
    // reversed so the "minimum" is the newest document; `min_by` returns the
    // first of equal elements, which matches what a stable descending sort
    // followed by taking index 0 would select.
    let newest = docs
        .iter()
        .min_by(|a, b| b.updated_at.cmp(&a.updated_at));

    match newest {
        Some(doc) => Ok(Some(Self::document_to_cell(doc)?)),
        None => Ok(None),
    }
}
#[instrument(skip(self))]
pub async fn get_valid_cells(&self) -> Result<Vec<CellState>> {
debug!("Querying valid cells");
let query = Query::All;
let docs = self
.backend
.document_store()
.query(CELL_COLLECTION, &query)
.await?;
let cells: Vec<CellState> = docs
.into_iter()
.filter_map(|doc| Self::document_to_cell(&doc).ok())
.filter(|cell: &CellState| cell.is_valid())
.collect();
Ok(cells)
}
#[instrument(skip(self))]
pub async fn get_cells_by_zone(&self, platoon_id: &str) -> Result<Vec<CellState>> {
debug!("Querying cells by platoon: {}", platoon_id);
let query = Query::Eq {
field: "platoon_id".to_string(),
value: Value::String(platoon_id.to_string()),
};
let docs = self
.backend
.document_store()
.query(CELL_COLLECTION, &query)
.await?;
let cells: Vec<CellState> = docs
.into_iter()
.filter_map(|doc| Self::document_to_cell(&doc).ok())
.collect();
Ok(cells)
}
#[instrument(skip(self))]
pub async fn get_cells_with_capability(
&self,
capability_type: crate::models::CapabilityType,
) -> Result<Vec<CellState>> {
debug!("Querying cells with capability: {:?}", capability_type);
let query = Query::All;
let docs = self
.backend
.document_store()
.query(CELL_COLLECTION, &query)
.await?;
let cells: Vec<CellState> = docs
.into_iter()
.filter_map(|doc| Self::document_to_cell(&doc).ok())
.filter(|cell: &CellState| cell.has_capability_type(capability_type))
.collect();
Ok(cells)
}
#[instrument(skip(self))]
pub async fn get_available_cells(&self) -> Result<Vec<CellState>> {
debug!("Querying available cells");
let query = Query::All;
let docs = self
.backend
.document_store()
.query(CELL_COLLECTION, &query)
.await?;
let cells: Vec<CellState> = docs
.into_iter()
.filter_map(|doc| Self::document_to_cell(&doc).ok())
.filter(|cell: &CellState| !cell.is_full())
.collect();
Ok(cells)
}
#[instrument(skip(self))]
pub async fn add_member(&self, cell_id: &str, node_id: String) -> Result<()> {
info!("Adding member {} to cell {}", node_id, cell_id);
let mut cell = self
.get_cell(cell_id)
.await?
.ok_or_else(|| Error::NotFound {
resource_type: "Cell".to_string(),
id: cell_id.to_string(),
})?;
if !cell.add_member(node_id) {
return Err(Error::Internal("Failed to add member to cell".to_string()));
}
self.store_cell(&cell).await?;
Ok(())
}
#[instrument(skip(self))]
pub async fn remove_member(&self, cell_id: &str, node_id: &str) -> Result<()> {
info!("Removing member {} from cell {}", node_id, cell_id);
let mut cell = self
.get_cell(cell_id)
.await?
.ok_or_else(|| Error::NotFound {
resource_type: "Cell".to_string(),
id: cell_id.to_string(),
})?;
if !cell.remove_member(node_id) {
return Err(Error::Internal(
"Failed to remove member from cell".to_string(),
));
}
self.store_cell(&cell).await?;
Ok(())
}
/// Makes `node_id` the leader of the cell identified by `cell_id` and writes
/// the updated cell back.
///
/// The cell is read with `get_cell` and written with `store_cell` as two
/// separate backend calls.
///
/// # Errors
///
/// * `Error::NotFound` when no cell with that id exists.
/// * `Error::Internal`, carrying the underlying message, when the cell's own
///   `set_leader` rejects the node.
/// * Any error from reading or storing the cell.
#[instrument(skip(self))]
pub async fn set_leader(&self, cell_id: &str, node_id: String) -> Result<()> {
    // The message says "cell" (it used to say "squad") to match the
    // terminology of every other log line in this store.
    info!("Setting leader {} for cell {}", node_id, cell_id);
    let mut cell = self
        .get_cell(cell_id)
        .await?
        .ok_or_else(|| Error::NotFound {
            resource_type: "Cell".to_string(),
            id: cell_id.to_string(),
        })?;
    cell.set_leader(node_id)
        .map_err(|e| Error::Internal(e.to_string()))?;
    self.store_cell(&cell).await?;
    Ok(())
}
#[instrument(skip(self, capability))]
pub async fn add_capability(&self, cell_id: &str, capability: Capability) -> Result<()> {
info!("Adding capability to cell {}", cell_id);
let mut cell = self
.get_cell(cell_id)
.await?
.ok_or_else(|| Error::NotFound {
resource_type: "Cell".to_string(),
id: cell_id.to_string(),
})?;
cell.add_capability(capability);
self.store_cell(&cell).await?;
Ok(())
}
/// Removes the document whose id equals `cell_id` from the cell collection.
///
/// This addresses the document by id rather than by the `"cell_id"` field;
/// `cell_to_document` uses the cell id as the document id whenever the cell
/// has one.
///
/// # Errors
///
/// Propagates backend removal failures.
#[instrument(skip(self))]
pub async fn delete_cell(&self, cell_id: &str) -> Result<()> {
    info!("Deleting cell: {}", cell_id);

    let document_id = cell_id.to_string();
    let store = self.backend.document_store();
    store.remove(CELL_COLLECTION, &document_id).await?;
    Ok(())
}
/// Borrows the backend this store reads from and writes to.
pub fn backend(&self) -> &B {
    &*self.backend
}
}