righvalor 0.1.0

RighValor: AI Infrastructure and Applications Framework for the Far Edge
use std::sync::Arc;

use axum::{extract::State, http::StatusCode, response::Json};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use utoipa::ToSchema;

use crate::righgravity_client::{RighGravityClient, RighGravityNodeState};

/// Simple counter for online nodes
pub type ValorOnlineNodesCounter = Arc<RwLock<ValorOnlineNodesInfo>>;

#[derive(Debug, Clone, Default)]
pub struct ValorOnlineNodesInfo {
    pub online_nodes: std::collections::HashSet<String>,
    pub total_seen: usize,
}

/// Node status response
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct ValorNodeStatusResponse {
    pub nodes: Vec<ValorNodeStatus>,
    pub total_nodes: usize,
    pub online_nodes: usize,
}

/// Individual node status
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct ValorNodeStatus {
    pub id: String,
    pub ip: Option<String>,
    pub online: bool,
    pub onboarding_state: String,
}

impl From<RighGravityNodeState> for ValorNodeStatus {
    fn from(node: RighGravityNodeState) -> Self {
        let online = RighGravityClient::is_node_online(&node);
        Self {
            id: node.id,
            ip: node.ip,
            online,
            onboarding_state: node.onboarding_state,
        }
    }
}

/// Get node status from RighGravity
#[utoipa::path(
    get,
    path = "/api/v1/nodes/status",
    tag = "Master",
    responses(
        (status = 200, description = "Node status retrieved successfully", body = ValorNodeStatusResponse),
        (status = 500, description = "Failed to retrieve node status")
    )
)]
pub async fn get_nodes_status(
    State(client): State<std::sync::Arc<RighGravityClient>>,
) -> Result<Json<ValorNodeStatusResponse>, StatusCode> {
    // Fetch node states from RighGravity
    let nodes = client.fetch_node_states().await.map_err(|e| {
        tracing::error!("Failed to fetch node states: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let online_count = nodes
        .iter()
        .filter(|n| RighGravityClient::is_node_online(n))
        .count();

    let response = ValorNodeStatusResponse {
        total_nodes: nodes.len(),
        online_nodes: online_count,
        nodes: nodes.into_iter().map(ValorNodeStatus::from).collect(),
    };

    Ok(Json(response))
}

/// Get specific node status from RighGravity
#[utoipa::path(
    get,
    path = "/api/v1/nodes/{node_id}/status",
    tag = "Master",
    params(
        ("node_id" = String, description = "Node ID to query")
    ),
    responses(
        (status = 200, description = "Node status retrieved successfully", body = ValorNodeStatus),
        (status = 404, description = "Node not found"),
        (status = 500, description = "Failed to retrieve node status")
    )
)]
pub async fn get_node_status(
    State(client): State<std::sync::Arc<RighGravityClient>>,
    axum::extract::Path(node_id): axum::extract::Path<String>,
) -> Result<Json<ValorNodeStatus>, StatusCode> {
    // Fetch specific node from RighGravity
    let node = client
        .get_node_by_id(&node_id)
        .await
        .map_err(|e| {
            tracing::error!("Failed to fetch node {}: {}", node_id, e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?
        .ok_or(StatusCode::NOT_FOUND)?;

    Ok(Json(ValorNodeStatus::from(node)))
}

/// Node state change event received from RighGravity
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct ValorNodeStateChangeEvent {
    pub node_id: String,
    pub event_type: ValorNodeEventType,
    pub timestamp: u64,
    pub details: Option<ValorNodeEventDetails>,
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub enum ValorNodeEventType {
    Connected,
    Disconnected,
    Initialized,
    StateChanged,
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct ValorNodeEventDetails {
    pub ip: Option<String>,
    pub firmware_version: Option<String>,
    pub onboarding_state: Option<String>,
}

/// Handle node state change events from RighGravity
#[utoipa::path(
    post,
    path = "/api/v1/nodes/events",
    tag = "Master",
    request_body(
        content = ValorNodeStateChangeEvent,
        content_type = "application/json"
    ),
    responses(
        (status = 200, description = "Event processed successfully"),
        (status = 400, description = "Invalid event data")
    )
)]
pub async fn handle_node_event(
    State(counter): State<ValorOnlineNodesCounter>,
    Json(event): Json<ValorNodeStateChangeEvent>,
) -> Result<StatusCode, StatusCode> {
    let event_type_str = match &event.event_type {
        ValorNodeEventType::Connected => "Connected",
        ValorNodeEventType::Disconnected => "Disconnected",
        ValorNodeEventType::Initialized => "Initialized",
        ValorNodeEventType::StateChanged => "StateChanged",
    };

    tracing::info!(
        "Received node event from RighGravity: node_id={}, event={}",
        event.node_id,
        event_type_str
    );

    // Update counter based on event type
    let mut info = counter.write().await;
    match event.event_type {
        ValorNodeEventType::Connected | ValorNodeEventType::Initialized => {
            info.online_nodes.insert(event.node_id.clone());
            if info.total_seen < info.online_nodes.len() {
                info.total_seen = info.online_nodes.len();
            }
        }
        ValorNodeEventType::Disconnected => {
            info.online_nodes.remove(&event.node_id);
        }
        ValorNodeEventType::StateChanged => {
            // State change doesn't affect online status
        }
    }

    let online_count = info.online_nodes.len();
    let total = info.total_seen;
    drop(info); // Release the lock early

    tracing::info!("Node status updated: {}/{} online", online_count, total);

    Ok(StatusCode::OK)
}