use std::sync::Arc;
use axum::{extract::State, http::StatusCode, response::Json};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use utoipa::ToSchema;
use crate::righgravity_client::{RighGravityClient, RighGravityNodeState};
/// Shared handle to the online-node bookkeeping: a [`ValorOnlineNodesInfo`]
/// behind a tokio `RwLock`, wrapped in an `Arc`.
///
/// `handle_node_event` extracts this as its axum state and takes the write
/// lock to update the set of online nodes.
pub type ValorOnlineNodesCounter = Arc<RwLock<ValorOnlineNodesInfo>>;
/// Bookkeeping for which nodes are currently online, driven by node events.
#[derive(Debug, Clone, Default)]
pub struct ValorOnlineNodesInfo {
/// IDs of nodes currently considered online. `handle_node_event` inserts an
/// ID on `Connected` / `Initialized` events and removes it on `Disconnected`.
pub online_nodes: std::collections::HashSet<String>,
/// High-water mark of `online_nodes.len()`. `handle_node_event` only ever
/// raises this to the current set size, so despite the name it is the largest
/// number of simultaneously online nodes observed, not a count of distinct
/// node IDs ever seen.
pub total_seen: usize,
}
/// Response body of `GET /api/v1/nodes/status`: every node's status plus
/// aggregate counts.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct ValorNodeStatusResponse {
/// Status of each node returned by the RighGravity client.
pub nodes: Vec<ValorNodeStatus>,
/// Number of entries in `nodes`.
pub total_nodes: usize,
/// Number of nodes for which `RighGravityClient::is_node_online` returned true.
pub online_nodes: usize,
}
/// Status of a single node, built from a `RighGravityNodeState`.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct ValorNodeStatus {
/// Node identifier, copied from the node state.
pub id: String,
/// Node IP address as reported in the node state, if any.
pub ip: Option<String>,
/// Whether the node is online, as decided by `RighGravityClient::is_node_online`.
pub online: bool,
/// Onboarding state string, copied verbatim from the node state.
pub onboarding_state: String,
}
impl From<RighGravityNodeState> for ValorNodeStatus {
    /// Convert a RighGravity node state into the API-facing status.
    ///
    /// `id`, `ip` and `onboarding_state` are moved over unchanged; `online` is
    /// derived by asking [`RighGravityClient::is_node_online`].
    fn from(node: RighGravityNodeState) -> Self {
        Self {
            // Evaluated first: it only borrows `node`, and the borrow ends
            // before the fields below are moved out.
            online: RighGravityClient::is_node_online(&node),
            id: node.id,
            ip: node.ip,
            onboarding_state: node.onboarding_state,
        }
    }
}
#[utoipa::path(
get,
path = "/api/v1/nodes/status",
tag = "Master",
responses(
(status = 200, description = "Node status retrieved successfully", body = ValorNodeStatusResponse),
(status = 500, description = "Failed to retrieve node status")
)
)]
pub async fn get_nodes_status(
State(client): State<std::sync::Arc<RighGravityClient>>,
) -> Result<Json<ValorNodeStatusResponse>, StatusCode> {
let nodes = client.fetch_node_states().await.map_err(|e| {
tracing::error!("Failed to fetch node states: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let online_count = nodes
.iter()
.filter(|n| RighGravityClient::is_node_online(n))
.count();
let response = ValorNodeStatusResponse {
total_nodes: nodes.len(),
online_nodes: online_count,
nodes: nodes.into_iter().map(ValorNodeStatus::from).collect(),
};
Ok(Json(response))
}
#[utoipa::path(
get,
path = "/api/v1/nodes/{node_id}/status",
tag = "Master",
params(
("node_id" = String, description = "Node ID to query")
),
responses(
(status = 200, description = "Node status retrieved successfully", body = ValorNodeStatus),
(status = 404, description = "Node not found"),
(status = 500, description = "Failed to retrieve node status")
)
)]
pub async fn get_node_status(
State(client): State<std::sync::Arc<RighGravityClient>>,
axum::extract::Path(node_id): axum::extract::Path<String>,
) -> Result<Json<ValorNodeStatus>, StatusCode> {
let node = client
.get_node_by_id(&node_id)
.await
.map_err(|e| {
tracing::error!("Failed to fetch node {}: {}", node_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(ValorNodeStatus::from(node)))
}
/// Request body of `POST /api/v1/nodes/events`: a lifecycle event for one node.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct ValorNodeStateChangeEvent {
/// ID of the node the event refers to; used as the key in the online-node set.
pub node_id: String,
/// What happened to the node.
pub event_type: ValorNodeEventType,
/// Event timestamp supplied by the sender.
// NOTE(review): unit/epoch is not established anywhere in this file — confirm with the sender.
pub timestamp: u64,
/// Optional extra information about the node.
// `handle_node_event` in this file does not read this field.
pub details: Option<ValorNodeEventDetails>,
}
/// Kind of node lifecycle event carried by a `ValorNodeStateChangeEvent`.
// No serde rename attribute is applied, so serde's default representation of
// unit variants is used on the wire.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub enum ValorNodeEventType {
/// The node connected; it is added to the online set.
Connected,
/// The node disconnected; it is removed from the online set.
Disconnected,
/// The node finished initializing; treated like `Connected` for online tracking.
Initialized,
/// The node's state changed; the online set is left untouched.
StateChanged,
}
/// Optional details attached to a node event. Every field may be absent.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct ValorNodeEventDetails {
/// Node IP address, if provided.
pub ip: Option<String>,
/// Node firmware version, if provided.
pub firmware_version: Option<String>,
/// Node onboarding state, if provided.
pub onboarding_state: Option<String>,
}
/// Record a node lifecycle event and update the online-node bookkeeping.
///
/// `Connected` and `Initialized` events mark the node as online, `Disconnected`
/// marks it as offline, and `StateChanged` is acknowledged without changing the
/// online set. Events whose `node_id` is empty or whitespace-only are rejected
/// with `400 Bad Request`.
#[utoipa::path(
    post,
    path = "/api/v1/nodes/events",
    tag = "Master",
    request_body(
        content = ValorNodeStateChangeEvent,
        content_type = "application/json"
    ),
    responses(
        (status = 200, description = "Event processed successfully"),
        (status = 400, description = "Invalid event data")
    )
)]
pub async fn handle_node_event(
    State(counter): State<ValorOnlineNodesCounter>,
    Json(event): Json<ValorNodeStateChangeEvent>,
) -> Result<StatusCode, StatusCode> {
    // The derived `Debug` output of a unit variant is just its name, so this
    // logs the same text as spelling every variant out by hand.
    tracing::info!(
        "Received node event from RighGravity: node_id={}, event={:?}",
        event.node_id,
        event.event_type
    );

    // A blank ID cannot identify a node; recording it would add a phantom
    // entry to the online set and skew the reported counts.
    if event.node_id.trim().is_empty() {
        tracing::warn!("Rejecting node event with empty node_id");
        return Err(StatusCode::BAD_REQUEST);
    }

    let mut info = counter.write().await;
    match event.event_type {
        ValorNodeEventType::Connected | ValorNodeEventType::Initialized => {
            // `event` is owned and not used afterwards, so the ID is moved
            // into the set instead of being cloned.
            info.online_nodes.insert(event.node_id);
            // `total_seen` tracks the largest online-set size observed so far.
            let online_now = info.online_nodes.len();
            info.total_seen = info.total_seen.max(online_now);
        }
        ValorNodeEventType::Disconnected => {
            info.online_nodes.remove(&event.node_id);
        }
        // State changes do not affect online/offline tracking.
        ValorNodeEventType::StateChanged => {}
    }
    let online_count = info.online_nodes.len();
    let total = info.total_seen;
    // Release the write lock before logging so other handlers are not blocked.
    drop(info);

    tracing::info!("Node status updated: {}/{} online", online_count, total);
    Ok(StatusCode::OK)
}