use std::sync::Arc;
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use tokio::sync::RwLock;
use tower_http::cors::{Any, CorsLayer};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::{
common::task::{ValorMasterTask, ValorTaskId},
master::{ValorTaskManager, ValorWorkerRegistry},
northbound::{
ValorApiCreateTaskRequest, ValorApiCreateTaskResponse, ValorApiListTasksQuery,
ValorApiMessage, ValorApiRegistryStatus, ValorApiResponse, ValorApiTaskDetail,
ValorApiTaskStats, ValorApiTaskSummary, ValorApiWorkerDetail, ValorApiWorkerSummary,
},
service::ValorServiceRegistry,
types::ValorID,
};
// Response envelope whose payload is a plain `ValorApiMessage`.
// It is registered as a schema component on `NorthboundApiDoc`, and the `#[utoipa::path]`
// annotations in this file name it as the documented 200 body.
// NOTE(review): most handlers actually return a different payload type inside
// `ValorApiResponse<_>`, so the documented body does not match their signatures — confirm
// whether the OpenAPI annotations should be made more specific.
type ApiResponseString = ValorApiResponse<crate::northbound::ValorApiMessage>;
// OpenAPI document definition for the northbound API (utoipa derive).
// `paths(...)` lists the handlers whose `#[utoipa::path]` annotations are included in the
// generated spec, `components(schemas(...))` lists the payload types exported as schemas, and
// `tags(...)` declares the two groups ("Master", "Worker") the handlers are tagged with.
// NOTE(review): the `/api/v1/nodes/...` routes registered in `NorthboundApi::router` are not
// listed under `paths`, and `ValorApiWorkerSummary` / `ValorApiWorkerDetail` are not listed
// under `schemas` — confirm whether these omissions are intentional.
#[derive(OpenApi)]
#[openapi(
paths(
list_workers,
get_worker,
get_registry_status,
list_services,
create_task,
list_tasks,
get_task,
wait_task,
cancel_task,
get_task_stats,
),
components(schemas(
ApiResponseString,
ValorApiMessage,
ValorApiRegistryStatus,
ValorApiCreateTaskRequest,
ValorApiCreateTaskResponse,
ValorApiTaskSummary,
ValorApiTaskDetail,
ValorApiTaskStats,
ValorApiListTasksQuery
)),
tags(
(name = "Master", description = "Master-side APIs: registry, services, tasks"),
(name = "Worker", description = "Worker discovery and inspection"),
)
)]
// Marker type: it carries no data and exists only so `NorthboundApiDoc::openapi()` can be called.
struct NorthboundApiDoc;
pub struct NorthboundApi {
registry: Arc<RwLock<ValorWorkerRegistry>>,
task_manager: Arc<ValorTaskManager>,
master_actor: ractor::ActorRef<crate::master::ValorMasterMessage>,
righgravity_client: Arc<crate::righgravity_client::RighGravityClient>,
online_nodes_counter: crate::northbound::ValorOnlineNodesCounter,
}
impl NorthboundApi {
pub fn new(
registry: Arc<RwLock<ValorWorkerRegistry>>,
_service_registry: Arc<RwLock<ValorServiceRegistry>>,
task_manager: Arc<ValorTaskManager>,
master_actor: ractor::ActorRef<crate::master::ValorMasterMessage>,
righgravity_client: Arc<crate::righgravity_client::RighGravityClient>,
) -> Self {
Self {
registry,
task_manager,
master_actor,
righgravity_client,
online_nodes_counter: Arc::new(RwLock::new(Default::default())),
}
}
pub fn router(self) -> Router {
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any);
let openapi = NorthboundApiDoc::openapi();
let swagger = SwaggerUi::new("/swagger")
.url("/api-docs/openapi.json", openapi.clone())
.config(utoipa_swagger_ui::Config::default().try_it_out_enabled(true));
let state = ApiState {
registry: self.registry,
task_manager: self.task_manager,
master_actor: self.master_actor,
righgravity_client: self.righgravity_client,
online_nodes_counter: self.online_nodes_counter,
};
Router::new()
.route("/api/v1/registry/status", get(get_registry_status))
.route("/api/v1/workers", get(list_workers))
.route("/api/v1/workers/{worker_id}", get(get_worker))
.route("/api/v1/services", get(list_services))
.route("/api/v1/tasks", post(create_task).get(list_tasks))
.route("/api/v1/tasks/{task_id}", get(get_task))
.route("/api/v1/tasks/{task_id}/wait", post(wait_task))
.route("/api/v1/tasks/{task_id}/cancel", post(cancel_task))
.route("/api/v1/tasks/stats", get(get_task_stats))
.route("/api/v1/nodes/status", get(get_nodes_status_handler))
.route(
"/api/v1/nodes/{node_id}/status",
get(get_node_status_handler),
)
.route("/api/v1/nodes/events", post(handle_node_event_handler))
.merge(swagger)
.layer(cors)
.with_state(state)
}
}
// Shared state handed to every handler through axum's `State` extractor.
// It is `Clone`, and it mirrors the fields of `NorthboundApi`, from which `router()` builds it.
#[derive(Clone)]
struct ApiState {
// Worker registry; the handlers in this file only take read locks on it.
registry: Arc<RwLock<ValorWorkerRegistry>>,
// Task store used by the list/get/wait/cancel/stats handlers.
task_manager: Arc<ValorTaskManager>,
// Handle used by `create_task` to send `CreateTask` messages to the master actor.
master_actor: ractor::ActorRef<crate::master::ValorMasterMessage>,
// Client passed through to the `/api/v1/nodes/.../status` handlers.
righgravity_client: Arc<crate::righgravity_client::RighGravityClient>,
// Counter passed through to the `/api/v1/nodes/events` handler.
online_nodes_counter: crate::northbound::ValorOnlineNodesCounter,
}
// Handler for `GET /api/v1/workers`.
// Returns one summary per registry entry: worker id, CPU and memory capacity figures, and the
// number of services the worker record lists.
#[utoipa::path(
    get,
    path = "/api/v1/workers",
    tag = "Worker",
    responses((status = 200, body = ApiResponseString))
)]
async fn list_workers(
    State(state): State<ApiState>,
) -> Result<Json<ValorApiResponse<Vec<ValorApiWorkerSummary>>>, StatusCode> {
    let workers = state.registry.read().await;

    let mut summaries: Vec<ValorApiWorkerSummary> = Vec::new();
    for (id, rec) in workers.iter() {
        let capacity = &rec.capacity;
        summaries.push(ValorApiWorkerSummary {
            worker_id: id.to_string(),
            total_cpu: capacity.total_cpu,
            free_cpu: capacity.free_cpu,
            cpu_used_pct: capacity.cpu_usage_pct,
            total_mem_mb: capacity.total_mem_mb,
            free_mem_mb: capacity.free_mem_mb,
            services: rec.services.len(),
        });
    }
    // Release the read lock before building the response.
    drop(workers);

    Ok(Json(ValorApiResponse::success(summaries)))
}
// Handler for `GET /api/v1/registry/status`.
// Reports the registry's joining / ready / unreachable / total counters, plus a hint message
// when there are no workers at all or none of the registered workers is ready.
#[utoipa::path(
    get,
    path = "/api/v1/registry/status",
    tag = "Master",
    responses((status = 200, body = ApiResponseString))
)]
async fn get_registry_status(
    State(state): State<ApiState>,
) -> Result<Json<ValorApiResponse<ValorApiRegistryStatus>>, StatusCode> {
    let (joining, ready, unreachable, total) = state.registry.read().await.state_counts();

    // The two hint conditions are mutually exclusive, so their order does not matter.
    let hint: Option<&str> = if total == 0 {
        Some("No workers registered")
    } else if ready == 0 && total > 0 {
        Some("Workers registered but not READY yet")
    } else {
        None
    };

    let status = ValorApiRegistryStatus {
        joining,
        ready,
        unreachable,
        total,
        message: hint.map(str::to_string),
    };
    Ok(Json(ValorApiResponse::success(status)))
}
// Handler for `GET /api/v1/workers/{worker_id}`.
// Scans the registry for the worker with the given id and returns its capacity, services and
// missed-heartbeat count. An unknown id still yields a 200 response, with a `None` payload.
#[utoipa::path(
    get,
    path = "/api/v1/workers/{worker_id}",
    tag = "Worker",
    params(("worker_id" = String, Path, description = "Worker id")),
    responses((status = 200, body = ApiResponseString))
)]
async fn get_worker(
    State(state): State<ApiState>,
    Path(worker_id): Path<String>,
) -> Result<Json<ValorApiResponse<Option<ValorApiWorkerDetail>>>, StatusCode> {
    let wanted = ValorID::new(&worker_id);
    let workers = state.registry.read().await;

    let mut detail: Option<ValorApiWorkerDetail> = None;
    for (wid, rec) in workers.iter() {
        if wid == &wanted {
            detail = Some(ValorApiWorkerDetail {
                worker_id: wid.to_string(),
                capacity: rec.capacity,
                services: rec.services.clone(),
                missed_heartbeat_intervals: workers.get_missed_heartbeat_intervals(wid),
            });
            // Only the first match is reported.
            break;
        }
    }

    Ok(Json(ValorApiResponse::success(detail)))
}
#[utoipa::path(
post,
path = "/api/v1/tasks",
tag = "Master",
request_body = ValorApiCreateTaskRequest,
responses((status = 200, body = ApiResponseString))
)]
async fn create_task(
State(state): State<ApiState>,
Json(request): Json<ValorApiCreateTaskRequest>,
) -> Result<Json<ValorApiResponse<ValorApiCreateTaskResponse>>, StatusCode> {
let union = state.registry.read().await.union_services();
let task_type_for_check = request.task_type.clone();
let requested_id = match &task_type_for_check {
crate::common::task::ValorTaskType::ExecuteService { service_id, .. } => service_id,
};
tracing::info!(
"API: request to create task for service_id={}",
requested_id
);
if !union.contains(requested_id) {
if union.is_empty() {
tracing::warn!(
"API: service catalog empty (no READY workers). Accepting task optimistically: {}",
requested_id
);
} else {
tracing::warn!(
"API: rejected task create, unknown service_id={} (catalog_size={})",
requested_id,
union.len()
);
return Err(StatusCode::BAD_REQUEST);
}
}
let task_id = ValorTaskId::generate();
let default_timeout = crate::config::ValorApplicationConfig::load()
.ok()
.map(|c| c.tuning().task_default_timeout_ms)
.unwrap_or(60_000);
let task = ValorMasterTask {
task_id: task_id.clone(),
task_type: task_type_for_check.clone(),
priority: request.priority.unwrap_or_default(),
timeout_ms: request.timeout_ms.or(Some(default_timeout)),
attempt: 0,
status: crate::common::task::ValorTaskStatus::Pending,
assigned_worker: None,
created_at: current_timestamp_ms(),
assigned_at: None,
completed_at: None,
input: request.input,
output: None,
error: None,
};
let flow_span = tracing::info_span!(
"flow.api.create_task",
task_id = %task_id,
service_id = %requested_id
);
tracing::info!(parent: &flow_span, "API: creating task (pending) and dispatching to master");
let _ = state
.master_actor
.cast(crate::master::ValorMasterMessage::CreateTask(task));
Ok(Json(ValorApiResponse::success(
ValorApiCreateTaskResponse {
task_id: task_id.to_string(),
},
)))
}
// Handler for `GET /api/v1/tasks`.
// Lists tasks, optionally restricted by status, then by assigned worker, and finally capped at
// `limit` entries. The filters are applied in that order.
#[utoipa::path(
    get,
    path = "/api/v1/tasks",
    tag = "Master",
    params(ValorApiListTasksQuery),
    responses((status = 200, body = ApiResponseString))
)]
async fn list_tasks(
    State(state): State<ApiState>,
    Query(query): Query<ValorApiListTasksQuery>,
) -> Result<Json<ValorApiResponse<Vec<ValorApiTaskSummary>>>, StatusCode> {
    let tasks = match query.status {
        Some(status) => state.task_manager.list_tasks_by_status(status).await,
        None => state.task_manager.list_tasks().await,
    };

    let wanted_worker = query.worker_id.as_ref();
    // No limit means "keep everything".
    let cap = query.limit.unwrap_or(usize::MAX);

    let summaries: Vec<ValorApiTaskSummary> = tasks
        .into_iter()
        .filter(|task| match wanted_worker {
            Some(worker) => task.assigned_worker.as_ref() == Some(worker),
            None => true,
        })
        .map(Into::into)
        .take(cap)
        .collect();

    Ok(Json(ValorApiResponse::success(summaries)))
}
// Handler for `GET /api/v1/tasks/{task_id}`.
// Looks the task up in the task manager. An unknown id still yields a 200 response, with a
// `None` payload.
#[utoipa::path(
    get,
    path = "/api/v1/tasks/{task_id}",
    tag = "Master",
    params(("task_id" = String, Path, description = "Task ID")),
    responses((status = 200, body = ApiResponseString))
)]
async fn get_task(
    State(state): State<ApiState>,
    Path(task_id): Path<String>,
) -> Result<Json<ValorApiResponse<Option<ValorApiTaskDetail>>>, StatusCode> {
    let id = ValorTaskId::new(task_id);
    let detail = match state.task_manager.get_task(&id).await {
        Some(task) => Some(ValorApiTaskDetail { task }),
        None => None,
    };
    Ok(Json(ValorApiResponse::success(detail)))
}
// Handler for `POST /api/v1/tasks/{task_id}/wait`.
//
// Waits until the task manager reports the task as terminal and returns the task detail.
// The optional `timeout_ms` query parameter is forwarded to the task manager.
//
// Responses:
// - 200 with the task detail.
// - 400 when `timeout_ms` is present but not a valid unsigned integer (it used to be silently
//   ignored, as if no timeout had been supplied), or when the wait fails for any reason other
//   than a timeout.
// - 408 when the task manager reports "Wait timeout".
#[utoipa::path(
    post,
    path = "/api/v1/tasks/{task_id}/wait",
    tag = "Master",
    params(
        ("task_id" = String, Path, description = "Task ID"),
        ("timeout_ms" = Option<u64>, Query, description = "Timeout in milliseconds")
    ),
    responses((status = 200, body = ApiResponseString))
)]
async fn wait_task(
    State(state): State<ApiState>,
    Path(task_id): Path<String>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<Json<ValorApiResponse<ValorApiTaskDetail>>, StatusCode> {
    let task_id = ValorTaskId::new(task_id);

    // Absent parameter -> `None`; present but malformed -> reject the request.
    let timeout_ms = params
        .get("timeout_ms")
        .map(|raw| raw.parse::<u64>())
        .transpose()
        .map_err(|_| StatusCode::BAD_REQUEST)?;

    match state
        .task_manager
        .wait_for_terminal(&task_id, timeout_ms)
        .await
    {
        Ok(task) => Ok(Json(ValorApiResponse::success(ValorApiTaskDetail { task }))),
        // The timeout case is recognised by its error text.
        Err(e) if e == "Wait timeout" => Err(StatusCode::REQUEST_TIMEOUT),
        Err(_e) => Err(StatusCode::BAD_REQUEST),
    }
}
// Handler for `POST /api/v1/tasks/{task_id}/cancel`.
// Asks the task manager to cancel the task. Any failure is logged and reported as 400.
#[utoipa::path(
    post,
    path = "/api/v1/tasks/{task_id}/cancel",
    tag = "Master",
    params(("task_id" = String, Path, description = "Task ID")),
    responses((status = 200, body = ApiResponseString))
)]
async fn cancel_task(
    State(state): State<ApiState>,
    Path(task_id): Path<String>,
) -> Result<Json<ValorApiResponse<crate::northbound::ValorApiMessage>>, StatusCode> {
    let id = ValorTaskId::new(task_id);

    if let Err(e) = state.task_manager.cancel_task(&id).await {
        tracing::error!("Failed to cancel task: {}", e);
        return Err(StatusCode::BAD_REQUEST);
    }

    let confirmation = crate::northbound::ValorApiMessage {
        message: "Task cancelled".to_string(),
    };
    Ok(Json(ValorApiResponse::success(confirmation)))
}
// Handler for `GET /api/v1/tasks/stats`.
// Converts the task manager's statistics into the API representation.
#[utoipa::path(
    get,
    path = "/api/v1/tasks/stats",
    tag = "Master",
    responses((status = 200, body = ApiResponseString))
)]
async fn get_task_stats(
    State(state): State<ApiState>,
) -> Result<Json<ValorApiResponse<ValorApiTaskStats>>, StatusCode> {
    let api_stats: ValorApiTaskStats = state.task_manager.get_stats().await.into();
    Ok(Json(ValorApiResponse::success(api_stats)))
}
// Handler for `GET /api/v1/services`.
// Returns the ids of the services in the registry's service union, rendered as strings.
#[utoipa::path(
    get,
    path = "/api/v1/services",
    tag = "Master",
    responses((status = 200, body = ApiResponseString))
)]
async fn list_services(
    State(state): State<ApiState>,
) -> Result<Json<ValorApiResponse<Vec<String>>>, StatusCode> {
    let catalog = state.registry.read().await.union_services();

    let mut ids: Vec<String> = Vec::with_capacity(catalog.len());
    for service_id in catalog {
        ids.push(service_id.to_string());
    }

    Ok(Json(ValorApiResponse::success(ids)))
}
/// Adapter for `GET /api/v1/nodes/status`: extracts the RighGravity client from the shared
/// state and delegates to [`crate::northbound::get_nodes_status`].
async fn get_nodes_status_handler(
    State(state): State<ApiState>,
) -> Result<Json<crate::northbound::ValorNodeStatusResponse>, StatusCode> {
    let client = State(state.righgravity_client);
    crate::northbound::get_nodes_status(client).await
}
/// Adapter for `GET /api/v1/nodes/{node_id}/status`: forwards the RighGravity client and the
/// untouched path extractor to [`crate::northbound::get_node_status`].
async fn get_node_status_handler(
    State(state): State<ApiState>,
    path: axum::extract::Path<String>,
) -> Result<Json<crate::northbound::ValorNodeStatus>, StatusCode> {
    let client = State(state.righgravity_client);
    crate::northbound::get_node_status(client, path).await
}
/// Adapter for `POST /api/v1/nodes/events`: forwards the online-nodes counter and the decoded
/// event to [`crate::northbound::handle_node_event`].
async fn handle_node_event_handler(
    State(state): State<ApiState>,
    Json(event): Json<crate::northbound::ValorNodeStateChangeEvent>,
) -> Result<StatusCode, StatusCode> {
    let counter = State(state.online_nodes_counter);
    let payload = Json(event);
    crate::northbound::handle_node_event(counter, payload).await
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Never panics: if the system clock reports a time before the epoch the result is `0`, and a
/// millisecond count that does not fit in `u64` saturates at `u64::MAX` instead of being
/// truncated.
fn current_timestamp_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // A clock set before 1970 yields `Err`; treat it as "zero elapsed" rather than
        // panicking inside a request handler.
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
/// Listen address settings for the northbound API server.
#[derive(Debug, Clone)]
pub struct ValorApiServerConfig {
    /// Host or IP address to bind to.
    pub host: String,
    /// TCP port to bind to.
    pub port: u16,
}

impl Default for ValorApiServerConfig {
    /// Defaults to `0.0.0.0:8080`.
    fn default() -> Self {
        ValorApiServerConfig {
            host: String::from("0.0.0.0"),
            port: 8080,
        }
    }
}
/// Owns everything needed to run the northbound API: the listen configuration plus the shared
/// handles that are passed on to [`NorthboundApi`].
pub struct ApiServerManager {
    config: ValorApiServerConfig,
    registry: Arc<RwLock<ValorWorkerRegistry>>,
    task_manager: Arc<ValorTaskManager>,
    master_actor: ractor::ActorRef<crate::master::ValorMasterMessage>,
    righgravity_client: Arc<crate::righgravity_client::RighGravityClient>,
    online_nodes_counter: crate::northbound::ValorOnlineNodesCounter,
}

impl ApiServerManager {
    /// Creates a manager from its dependencies.
    ///
    /// `_service_registry` is accepted but not stored.
    pub fn new(
        config: ValorApiServerConfig,
        registry: Arc<RwLock<ValorWorkerRegistry>>,
        _service_registry: Arc<RwLock<ValorServiceRegistry>>,
        task_manager: Arc<ValorTaskManager>,
        master_actor: ractor::ActorRef<crate::master::ValorMasterMessage>,
        righgravity_client: Arc<crate::righgravity_client::RighGravityClient>,
        online_nodes_counter: crate::northbound::ValorOnlineNodesCounter,
    ) -> Self {
        Self {
            config,
            registry,
            task_manager,
            master_actor,
            righgravity_client,
            online_nodes_counter,
        }
    }

    /// Binds the configured address and serves the northbound API until the server stops.
    ///
    /// # Errors
    ///
    /// Returns an error if the listener cannot be bound to `host:port` (for example an
    /// unresolvable address or a port already in use) or if serving fails.
    pub async fn start(self) -> Result<(), Box<dyn std::error::Error>> {
        let Self {
            config,
            registry,
            task_manager,
            master_actor,
            righgravity_client,
            online_nodes_counter,
        } = self;

        // Build the API once, with the counter supplied by the caller. Going through
        // `NorthboundApi::new` would only create a throwaway service registry and counter
        // that are discarded immediately.
        let app = NorthboundApi {
            registry,
            task_manager,
            master_actor,
            righgravity_client,
            online_nodes_counter,
        }
        .router();

        let addr = format!("{}:{}", config.host, config.port);
        let listener = tokio::net::TcpListener::bind(&addr)
            .await
            // Binding can fail for reasons other than a malformed address, so the message
            // describes the failed operation rather than guessing the cause.
            .map_err(|e| format!("Failed to bind {addr}: {e}"))?;
        tracing::info!("Starting northbound API server on {}", addr);

        axum::serve(listener, app.into_make_service())
            .await
            .map_err(|e| format!("Server error: {e}"))?;
        Ok(())
    }
}