orca_control/
cluster_api.rs1use std::collections::HashMap;
4use std::sync::Arc;
5
6use axum::extract::State;
7use axum::http::StatusCode;
8use axum::response::IntoResponse;
9use axum::routing::{get, post};
10use axum::{Json, Router};
11use serde::{Deserialize, Serialize};
12use tracing::info;
13
14use crate::cluster_state::ClusterState;
15
16pub type ClusterApiState = Arc<ClusterState>;
18
19pub fn cluster_router(state: ClusterApiState) -> Router {
21 Router::new()
22 .route("/api/v1/cluster/info", get(cluster_info))
23 .route("/api/v1/cluster/register", post(register_node))
24 .route("/api/v1/cluster/heartbeat", post(heartbeat))
25 .with_state(state)
26}
27
28#[derive(Serialize)]
30struct ClusterInfo {
31 nodes: Vec<NodeInfo>,
32 services: usize,
33 assignments: usize,
34}
35
36#[derive(Serialize)]
37struct NodeInfo {
38 node_id: u64,
39 address: String,
40 status: String,
41}
42
43async fn cluster_info(State(state): State<ClusterApiState>) -> impl IntoResponse {
44 let nodes = state.get_nodes().unwrap_or_default();
45 let services = state.get_services().unwrap_or_default();
46 let assignments = state.get_all_assignments().unwrap_or_default();
47
48 let node_list: Vec<NodeInfo> = nodes
49 .values()
50 .map(|n| NodeInfo {
51 node_id: n.node_id,
52 address: n.address.clone(),
53 status: format!("{:?}", n.status).to_lowercase(),
54 })
55 .collect();
56
57 Json(ClusterInfo {
58 nodes: node_list,
59 services: services.len(),
60 assignments: assignments.len(),
61 })
62}
63
64#[derive(Deserialize)]
66struct RegisterRequest {
67 node_id: u64,
68 address: String,
69 #[serde(default)]
70 labels: HashMap<String, String>,
71}
72
73async fn register_node(
74 State(state): State<ClusterApiState>,
75 Json(req): Json<RegisterRequest>,
76) -> impl IntoResponse {
77 match state
78 .register_node(req.node_id, req.address.clone(), req.labels)
79 .await
80 {
81 Ok(()) => {
82 info!("Node {} registered at {}", req.node_id, req.address);
83 (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))).into_response()
84 }
85 Err(e) => {
86 tracing::error!("Node registration failed: {e}");
87 (
88 StatusCode::INTERNAL_SERVER_ERROR,
89 Json(serde_json::json!({"error": e.to_string()})),
90 )
91 .into_response()
92 }
93 }
94}
95
96#[derive(Deserialize)]
98struct HeartbeatRequest {
99 node_id: u64,
100 #[serde(default)]
102 _workloads: Vec<serde_json::Value>,
103}
104
105#[derive(Serialize)]
107struct HeartbeatResponse {
108 commands: Vec<serde_json::Value>,
109}
110
111async fn heartbeat(
112 State(_state): State<ClusterApiState>,
113 Json(req): Json<HeartbeatRequest>,
114) -> impl IntoResponse {
115 tracing::debug!("Heartbeat from node {}", req.node_id);
118
119 Json(HeartbeatResponse {
120 commands: Vec::new(),
121 })
122}