use crate::models::node::{NodeConfig, NodeState, NodeStateExt};
use crate::sync::{DataSyncBackend, Document, Query, SyncSubscription, Value};
use crate::{Error, Result};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, info, instrument};
const NODE_CONFIG_COLLECTION: &str = "node_configs";
const NODE_STATE_COLLECTION: &str = "node_states";
pub struct NodeStore<B: DataSyncBackend> {
backend: Arc<B>,
_config_sync_sub: SyncSubscription,
_state_sync_sub: SyncSubscription,
}
impl<B: DataSyncBackend> NodeStore<B> {
pub async fn new(backend: Arc<B>) -> Result<Self> {
let query = Query::All;
let config_sync_sub = backend
.sync_engine()
.subscribe(NODE_CONFIG_COLLECTION, &query)
.await
.map_err(|e| {
Error::storage_error(
format!("Failed to create sync subscription for node_configs: {}", e),
"new",
Some(NODE_CONFIG_COLLECTION.to_string()),
)
})?;
let state_sync_sub = backend
.sync_engine()
.subscribe(NODE_STATE_COLLECTION, &query)
.await
.map_err(|e| {
Error::storage_error(
format!("Failed to create sync subscription for node_states: {}", e),
"new",
Some(NODE_STATE_COLLECTION.to_string()),
)
})?;
Ok(Self {
backend,
_config_sync_sub: config_sync_sub,
_state_sync_sub: state_sync_sub,
})
}
fn config_to_document(config: &NodeConfig) -> Result<Document> {
let json_val = serde_json::to_value(config)?;
let fields = json_val
.as_object()
.ok_or_else(|| Error::Internal("Failed to serialize config to object".into()))?
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<String, Value>>();
Ok(Document::with_id(&config.id, fields))
}
fn document_to_config(doc: &Document) -> Result<NodeConfig> {
let json_val = serde_json::to_value(&doc.fields)?;
Ok(serde_json::from_value(json_val)?)
}
fn state_to_document(node_id: &str, state: &NodeState) -> Result<Document> {
let json_val = serde_json::to_value(state)?;
let mut fields = json_val
.as_object()
.ok_or_else(|| Error::Internal("Failed to serialize state to object".into()))?
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<String, Value>>();
fields.insert("node_id".to_string(), Value::String(node_id.to_string()));
Ok(Document::with_id(node_id, fields))
}
fn document_to_state(doc: &Document) -> Result<NodeState> {
let json_val = serde_json::to_value(&doc.fields)?;
Ok(serde_json::from_value(json_val)?)
}
#[instrument(skip(self, config))]
pub async fn store_config(&self, config: &NodeConfig) -> Result<String> {
info!("Storing node config: {}", config.id);
let doc = Self::config_to_document(config)?;
self.backend
.document_store()
.upsert(NODE_CONFIG_COLLECTION, doc)
.await
.map_err(|e| {
Error::storage_error(
format!("Failed to store node config: {}", e),
"upsert",
Some(NODE_CONFIG_COLLECTION.to_string()),
)
})
}
#[instrument(skip(self))]
pub async fn get_config(&self, node_id: &str) -> Result<Option<NodeConfig>> {
debug!("Retrieving node config: {}", node_id);
let query = Query::Eq {
field: "id".to_string(),
value: Value::String(node_id.to_string()),
};
let docs = self
.backend
.document_store()
.query(NODE_CONFIG_COLLECTION, &query)
.await?;
if docs.is_empty() {
return Ok(None);
}
let config = Self::document_to_config(&docs[0])?;
Ok(Some(config))
}
#[instrument(skip(self, state))]
pub async fn store_state(&self, node_id: &str, state: &NodeState) -> Result<String> {
info!("Storing node state: {}", node_id);
let doc = Self::state_to_document(node_id, state)?;
self.backend
.document_store()
.upsert(NODE_STATE_COLLECTION, doc)
.await
.map_err(|e| {
Error::storage_error(
format!("Failed to store node state: {}", e),
"upsert",
Some(NODE_STATE_COLLECTION.to_string()),
)
})
}
#[instrument(skip(self))]
pub async fn get_state(&self, node_id: &str) -> Result<Option<NodeState>> {
debug!("Retrieving node state: {}", node_id);
let query = Query::Eq {
field: "node_id".to_string(),
value: Value::String(node_id.to_string()),
};
let docs = self
.backend
.document_store()
.query(NODE_STATE_COLLECTION, &query)
.await?;
if docs.is_empty() {
return Ok(None);
}
let state = Self::document_to_state(&docs[0])?;
Ok(Some(state))
}
#[instrument(skip(self))]
pub async fn get_nodes_by_phase(&self, phase: crate::traits::Phase) -> Result<Vec<NodeState>> {
use crate::traits::PhaseExt;
debug!("Querying nodes by phase: {:?}", phase);
let phase_str = phase.as_str().to_string();
let query = Query::Eq {
field: "phase".to_string(),
value: Value::String(phase_str),
};
let docs = self
.backend
.document_store()
.query(NODE_STATE_COLLECTION, &query)
.await?;
let states: Vec<NodeState> = docs
.into_iter()
.filter_map(|doc| Self::document_to_state(&doc).ok())
.collect();
Ok(states)
}
#[instrument(skip(self))]
pub async fn get_nodes_by_cell(&self, squad_id: &str) -> Result<Vec<NodeState>> {
debug!("Querying nodes by squad: {}", squad_id);
let query = Query::Eq {
field: "squad_id".to_string(),
value: Value::String(squad_id.to_string()),
};
let docs = self
.backend
.document_store()
.query(NODE_STATE_COLLECTION, &query)
.await?;
let states: Vec<NodeState> = docs
.into_iter()
.filter_map(|doc| Self::document_to_state(&doc).ok())
.collect();
Ok(states)
}
#[instrument(skip(self))]
pub async fn get_operational_nodes(&self) -> Result<Vec<NodeState>> {
debug!("Querying operational nodes");
let query = Query::Gt {
field: "fuel_minutes".to_string(),
value: serde_json::json!(0),
};
let docs = self
.backend
.document_store()
.query(NODE_STATE_COLLECTION, &query)
.await?;
let states: Vec<NodeState> = docs
.into_iter()
.filter_map(|doc| Self::document_to_state(&doc).ok())
.filter(|state: &NodeState| state.is_operational())
.collect();
Ok(states)
}
#[instrument(skip(self))]
pub async fn delete_config(&self, node_id: &str) -> Result<()> {
info!("Deleting node config: {}", node_id);
self.backend
.document_store()
.remove(NODE_CONFIG_COLLECTION, &node_id.to_string())
.await?;
Ok(())
}
#[instrument(skip(self))]
pub async fn delete_state(&self, node_id: &str) -> Result<()> {
info!("Deleting node state: {}", node_id);
self.backend
.document_store()
.remove(NODE_STATE_COLLECTION, &node_id.to_string())
.await?;
Ok(())
}
pub fn backend(&self) -> &B {
&self.backend
}
}