Skip to main content

orca_control/
cluster_api.rs

1//! API endpoints for multi-node cluster management.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use axum::extract::State;
7use axum::http::StatusCode;
8use axum::response::IntoResponse;
9use axum::routing::{get, post};
10use axum::{Json, Router};
11use serde::{Deserialize, Serialize};
12use tracing::info;
13
14use crate::cluster_state::ClusterState;
15
/// Shared state for cluster API endpoints.
///
/// This is the value handed to [`cluster_router`] and extracted by each
/// handler through axum's `State` extractor. It is a reference-counted
/// handle to the crate's `ClusterState`.
pub type ClusterApiState = Arc<ClusterState>;
18
19/// Build the cluster management router.
20pub fn cluster_router(state: ClusterApiState) -> Router {
21    Router::new()
22        .route("/api/v1/cluster/info", get(cluster_info))
23        .route("/api/v1/cluster/register", post(register_node))
24        .route("/api/v1/cluster/heartbeat", post(heartbeat))
25        .with_state(state)
26}
27
28/// Cluster info response.
29#[derive(Serialize)]
30struct ClusterInfo {
31    nodes: Vec<NodeInfo>,
32    services: usize,
33    assignments: usize,
34}
35
36#[derive(Serialize)]
37struct NodeInfo {
38    node_id: u64,
39    address: String,
40    status: String,
41}
42
43async fn cluster_info(State(state): State<ClusterApiState>) -> impl IntoResponse {
44    let nodes = state.get_nodes().unwrap_or_default();
45    let services = state.get_services().unwrap_or_default();
46    let assignments = state.get_all_assignments().unwrap_or_default();
47
48    let node_list: Vec<NodeInfo> = nodes
49        .values()
50        .map(|n| NodeInfo {
51            node_id: n.node_id,
52            address: n.address.clone(),
53            status: format!("{:?}", n.status).to_lowercase(),
54        })
55        .collect();
56
57    Json(ClusterInfo {
58        nodes: node_list,
59        services: services.len(),
60        assignments: assignments.len(),
61    })
62}
63
64/// Register node request.
65#[derive(Deserialize)]
66struct RegisterRequest {
67    node_id: u64,
68    address: String,
69    #[serde(default)]
70    labels: HashMap<String, String>,
71}
72
73async fn register_node(
74    State(state): State<ClusterApiState>,
75    Json(req): Json<RegisterRequest>,
76) -> impl IntoResponse {
77    match state
78        .register_node(req.node_id, req.address.clone(), req.labels)
79        .await
80    {
81        Ok(()) => {
82            info!("Node {} registered at {}", req.node_id, req.address);
83            (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))).into_response()
84        }
85        Err(e) => {
86            tracing::error!("Node registration failed: {e}");
87            (
88                StatusCode::INTERNAL_SERVER_ERROR,
89                Json(serde_json::json!({"error": e.to_string()})),
90            )
91                .into_response()
92        }
93    }
94}
95
96/// Heartbeat request from an agent.
97#[derive(Deserialize)]
98struct HeartbeatRequest {
99    node_id: u64,
100    // workloads field accepted but not processed in M2
101    #[serde(default)]
102    _workloads: Vec<serde_json::Value>,
103}
104
105/// Heartbeat response with commands for the agent.
106#[derive(Serialize)]
107struct HeartbeatResponse {
108    commands: Vec<serde_json::Value>,
109}
110
111async fn heartbeat(
112    State(_state): State<ClusterApiState>,
113    Json(req): Json<HeartbeatRequest>,
114) -> impl IntoResponse {
115    // For M2, heartbeats are acknowledged but don't trigger commands yet.
116    // The scheduler assigns workloads proactively, not reactively via heartbeat.
117    tracing::debug!("Heartbeat from node {}", req.node_id);
118
119    Json(HeartbeatResponse {
120        commands: Vec::new(),
121    })
122}