use axum::{
extract::{Json, Path, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Router,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::info;
use uuid::Uuid;
const VALID_ROLES: &[&str] = &[
"orchestrator",
"spec",
"coder",
"tester",
"reviewer",
"architect",
"security_auditor",
"devops",
"document_writer",
];
fn is_valid_role(role: &str) -> bool {
VALID_ROLES.contains(&role) || role.starts_with("custom:")
}
#[derive(Debug)]
pub enum ControlPlaneError {
NotFound(String),
BadRequest(String),
Conflict(String),
Internal(String),
}
impl std::fmt::Display for ControlPlaneError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NotFound(msg) => write!(f, "Not found: {msg}"),
Self::BadRequest(msg) => write!(f, "Bad request: {msg}"),
Self::Conflict(msg) => write!(f, "Conflict: {msg}"),
Self::Internal(msg) => write!(f, "Internal error: {msg}"),
}
}
}
impl IntoResponse for ControlPlaneError {
fn into_response(self) -> axum::response::Response {
let (status, message) = match &self {
Self::NotFound(msg) => (StatusCode::NOT_FOUND, msg.clone()),
Self::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg.clone()),
Self::Conflict(msg) => (StatusCode::CONFLICT, msg.clone()),
Self::Internal(msg) => (StatusCode::INTERNAL_SERVER_ERROR, msg.clone()),
};
let body = serde_json::json!({ "error": message });
(status, Json(body)).into_response()
}
}
pub struct ControlPlaneState {
pub deployments: Arc<RwLock<HashMap<Uuid, DeploymentInfo>>>,
pub agent_definitions: Arc<RwLock<HashMap<Uuid, AgentDefinitionInfo>>>,
pub health_states: Arc<RwLock<HashMap<Uuid, AgentHealthInfo>>>,
pub events: Arc<RwLock<Vec<ControlPlaneEvent>>>,
pub started_at: DateTime<Utc>,
}
impl ControlPlaneState {
pub fn new() -> Self {
Self {
deployments: Arc::new(RwLock::new(HashMap::new())),
agent_definitions: Arc::new(RwLock::new(HashMap::new())),
health_states: Arc::new(RwLock::new(HashMap::new())),
events: Arc::new(RwLock::new(Vec::new())),
started_at: Utc::now(),
}
}
}
impl Default for ControlPlaneState {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployRequest {
pub name: String,
pub role: String,
pub replicas: Option<u32>,
pub auto_restart: Option<bool>,
pub max_concurrent_tasks: Option<u32>,
pub tags: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentInfo {
pub id: Uuid,
pub name: String,
pub role: String,
pub replicas: u32,
pub status: String,
pub auto_restart: bool,
pub instances: Vec<InstanceInfo>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub total_tasks: u64,
pub total_errors: u64,
pub tags: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InstanceInfo {
pub id: Uuid,
pub replica_index: u32,
pub status: String,
pub started_at: DateTime<Utc>,
pub last_heartbeat: Option<DateTime<Utc>>,
pub tasks_completed: u64,
pub errors: u32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ScaleRequest {
pub replicas: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentDefinitionInfo {
pub id: Uuid,
pub name: String,
pub role: String,
pub version: String,
pub description: String,
pub capabilities: Vec<String>,
pub tags: HashMap<String, String>,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RegisterAgentRequest {
pub name: String,
pub role: String,
pub version: Option<String>,
pub description: Option<String>,
pub capabilities: Option<Vec<String>>,
pub tags: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentHealthInfo {
pub agent_id: Uuid,
pub agent_name: String,
pub status: String,
pub last_heartbeat: Option<DateTime<Utc>>,
pub restart_count: u32,
pub uptime_secs: i64,
pub probes: Vec<ProbeInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProbeInfo {
pub name: String,
pub status: String,
pub last_check: Option<DateTime<Utc>>,
pub consecutive_failures: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ControlPlaneEvent {
pub id: Uuid,
pub timestamp: DateTime<Utc>,
pub event_type: String,
pub deployment_id: Option<Uuid>,
pub message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ControlPlaneSummary {
pub total_deployments: usize,
pub running_deployments: usize,
pub total_instances: usize,
pub running_instances: usize,
pub total_registered_agents: usize,
pub healthy_agents: usize,
pub unhealthy_agents: usize,
pub total_tasks_completed: u64,
pub total_errors: u64,
pub uptime_secs: i64,
pub recent_events: Vec<ControlPlaneEvent>,
}
async fn record_event(
events: &RwLock<Vec<ControlPlaneEvent>>,
event_type: &str,
deployment_id: Option<Uuid>,
message: impl Into<String>,
) {
let event = ControlPlaneEvent {
id: Uuid::new_v4(),
timestamp: Utc::now(),
event_type: event_type.to_string(),
deployment_id,
message: message.into(),
};
events.write().await.push(event);
}
fn create_instances(replicas: u32) -> Vec<InstanceInfo> {
let now = Utc::now();
(0..replicas)
.map(|i| InstanceInfo {
id: Uuid::new_v4(),
replica_index: i,
status: "running".to_string(),
started_at: now,
last_heartbeat: Some(now),
tasks_completed: 0,
errors: 0,
})
.collect()
}
pub fn control_plane_router(state: Arc<ControlPlaneState>) -> Router {
Router::new()
.route(
"/api/v1/control-plane/deployments",
get(list_deployments).post(create_deployment),
)
.route(
"/api/v1/control-plane/deployments/{id}",
get(get_deployment).delete(delete_deployment),
)
.route(
"/api/v1/control-plane/deployments/{id}/scale",
post(scale_deployment),
)
.route(
"/api/v1/control-plane/deployments/{id}/restart",
post(restart_deployment),
)
.route(
"/api/v1/control-plane/deployments/{id}/stop",
post(stop_deployment),
)
.route(
"/api/v1/control-plane/deployments/{id}/start",
post(start_deployment),
)
.route(
"/api/v1/control-plane/agents",
get(list_agents).post(register_agent),
)
.route(
"/api/v1/control-plane/agents/{id}",
get(get_agent).delete(delete_agent),
)
.route("/api/v1/control-plane/health", get(get_health_summary))
.route("/api/v1/control-plane/health/{id}", get(get_agent_health))
.route(
"/api/v1/control-plane/health/{id}/heartbeat",
post(record_heartbeat),
)
.route("/api/v1/control-plane/summary", get(get_summary))
.route("/api/v1/control-plane/events", get(list_events))
.with_state(state)
}
async fn list_deployments(
State(state): State<Arc<ControlPlaneState>>,
) -> Result<Json<Vec<DeploymentInfo>>, ControlPlaneError> {
let deployments = state.deployments.read().await;
let list: Vec<DeploymentInfo> = deployments.values().cloned().collect();
Ok(Json(list))
}
async fn create_deployment(
State(state): State<Arc<ControlPlaneState>>,
Json(req): Json<DeployRequest>,
) -> Result<(StatusCode, Json<DeploymentInfo>), ControlPlaneError> {
if req.name.trim().is_empty() {
return Err(ControlPlaneError::BadRequest(
"Deployment name must not be empty".to_string(),
));
}
if !is_valid_role(&req.role) {
return Err(ControlPlaneError::BadRequest(format!(
"Invalid role '{}'. Valid roles: {:?}, or 'custom:<name>'",
req.role, VALID_ROLES
)));
}
let replicas = req.replicas.unwrap_or(1);
if replicas == 0 {
return Err(ControlPlaneError::BadRequest(
"Replicas must be greater than 0".to_string(),
));
}
let now = Utc::now();
let id = Uuid::new_v4();
let instances = create_instances(replicas);
let deployment = DeploymentInfo {
id,
name: req.name.clone(),
role: req.role.clone(),
replicas,
status: "running".to_string(),
auto_restart: req.auto_restart.unwrap_or(true),
instances,
created_at: now,
updated_at: now,
total_tasks: 0,
total_errors: 0,
tags: req.tags.unwrap_or_default(),
};
state
.deployments
.write()
.await
.insert(id, deployment.clone());
record_event(
&state.events,
"deployed",
Some(id),
format!("Deployed '{}' with {} replica(s)", req.name, replicas),
)
.await;
info!(deployment_id = %id, name = %req.name, role = %req.role, replicas, "Deployment created");
Ok((StatusCode::CREATED, Json(deployment)))
}
async fn get_deployment(
State(state): State<Arc<ControlPlaneState>>,
Path(id): Path<Uuid>,
) -> Result<Json<DeploymentInfo>, ControlPlaneError> {
let deployments = state.deployments.read().await;
let deployment = deployments
.get(&id)
.cloned()
.ok_or_else(|| ControlPlaneError::NotFound(format!("Deployment {id} not found")))?;
Ok(Json(deployment))
}
async fn delete_deployment(
State(state): State<Arc<ControlPlaneState>>,
Path(id): Path<Uuid>,
) -> Result<Json<serde_json::Value>, ControlPlaneError> {
let removed = state.deployments.write().await.remove(&id);
match removed {
Some(dep) => {
record_event(
&state.events,
"undeployed",
Some(id),
format!("Undeployed '{}'", dep.name),
)
.await;
info!(deployment_id = %id, name = %dep.name, "Deployment deleted");
Ok(Json(serde_json::json!({
"deleted": true,
"deployment_id": id,
})))
}
None => Err(ControlPlaneError::NotFound(format!(
"Deployment {id} not found"
))),
}
}
async fn scale_deployment(
State(state): State<Arc<ControlPlaneState>>,
Path(id): Path<Uuid>,
Json(req): Json<ScaleRequest>,
) -> Result<Json<DeploymentInfo>, ControlPlaneError> {
if req.replicas == 0 {
return Err(ControlPlaneError::BadRequest(
"Replicas must be greater than 0".to_string(),
));
}
let mut deployments = state.deployments.write().await;
let deployment = deployments
.get_mut(&id)
.ok_or_else(|| ControlPlaneError::NotFound(format!("Deployment {id} not found")))?;
let old_replicas = deployment.replicas;
let now = Utc::now();
if req.replicas > old_replicas {
for i in old_replicas..req.replicas {
deployment.instances.push(InstanceInfo {
id: Uuid::new_v4(),
replica_index: i,
status: "running".to_string(),
started_at: now,
last_heartbeat: Some(now),
tasks_completed: 0,
errors: 0,
});
}
} else if req.replicas < old_replicas {
deployment.instances.truncate(req.replicas as usize);
}
deployment.replicas = req.replicas;
deployment.updated_at = now;
let result = deployment.clone();
let name = deployment.name.clone();
drop(deployments);
record_event(
&state.events,
"scaled",
Some(id),
format!(
"Scaled '{}' from {} to {} replica(s)",
name, old_replicas, req.replicas
),
)
.await;
info!(deployment_id = %id, from = old_replicas, to = req.replicas, "Deployment scaled");
Ok(Json(result))
}
async fn restart_deployment(
State(state): State<Arc<ControlPlaneState>>,
Path(id): Path<Uuid>,
) -> Result<Json<DeploymentInfo>, ControlPlaneError> {
let mut deployments = state.deployments.write().await;
let deployment = deployments
.get_mut(&id)
.ok_or_else(|| ControlPlaneError::NotFound(format!("Deployment {id} not found")))?;
let now = Utc::now();
for instance in &mut deployment.instances {
instance.status = "running".to_string();
instance.started_at = now;
instance.last_heartbeat = Some(now);
}
deployment.status = "running".to_string();
deployment.updated_at = now;
let result = deployment.clone();
let name = deployment.name.clone();
drop(deployments);
record_event(
&state.events,
"restarted",
Some(id),
format!("Restarted all instances of '{name}'"),
)
.await;
info!(deployment_id = %id, "Deployment restarted");
Ok(Json(result))
}
async fn stop_deployment(
State(state): State<Arc<ControlPlaneState>>,
Path(id): Path<Uuid>,
) -> Result<Json<DeploymentInfo>, ControlPlaneError> {
let mut deployments = state.deployments.write().await;
let deployment = deployments
.get_mut(&id)
.ok_or_else(|| ControlPlaneError::NotFound(format!("Deployment {id} not found")))?;
let now = Utc::now();
for instance in &mut deployment.instances {
instance.status = "stopped".to_string();
}
deployment.status = "stopped".to_string();
deployment.updated_at = now;
let result = deployment.clone();
let name = deployment.name.clone();
drop(deployments);
record_event(
&state.events,
"stopped",
Some(id),
format!("Stopped deployment '{name}'"),
)
.await;
info!(deployment_id = %id, "Deployment stopped");
Ok(Json(result))
}
async fn start_deployment(
State(state): State<Arc<ControlPlaneState>>,
Path(id): Path<Uuid>,
) -> Result<Json<DeploymentInfo>, ControlPlaneError> {
let mut deployments = state.deployments.write().await;
let deployment = deployments
.get_mut(&id)
.ok_or_else(|| ControlPlaneError::NotFound(format!("Deployment {id} not found")))?;
let now = Utc::now();
for instance in &mut deployment.instances {
instance.status = "running".to_string();
instance.started_at = now;
instance.last_heartbeat = Some(now);
}
deployment.status = "running".to_string();
deployment.updated_at = now;
let result = deployment.clone();
let name = deployment.name.clone();
drop(deployments);
record_event(
&state.events,
"started",
Some(id),
format!("Started deployment '{name}'"),
)
.await;
info!(deployment_id = %id, "Deployment started");
Ok(Json(result))
}
async fn list_agents(
State(state): State<Arc<ControlPlaneState>>,
) -> Result<Json<Vec<AgentDefinitionInfo>>, ControlPlaneError> {
let agents = state.agent_definitions.read().await;
let list: Vec<AgentDefinitionInfo> = agents.values().cloned().collect();
Ok(Json(list))
}
async fn register_agent(
State(state): State<Arc<ControlPlaneState>>,
Json(req): Json<RegisterAgentRequest>,
) -> Result<(StatusCode, Json<AgentDefinitionInfo>), ControlPlaneError> {
if req.name.trim().is_empty() {
return Err(ControlPlaneError::BadRequest(
"Agent name must not be empty".to_string(),
));
}
if !is_valid_role(&req.role) {
return Err(ControlPlaneError::BadRequest(format!(
"Invalid role '{}'. Valid roles: {:?}, or 'custom:<name>'",
req.role, VALID_ROLES
)));
}
let id = Uuid::new_v4();
let now = Utc::now();
let agent_def = AgentDefinitionInfo {
id,
name: req.name.clone(),
role: req.role.clone(),
version: req.version.unwrap_or_else(|| "0.1.0".to_string()),
description: req.description.unwrap_or_default(),
capabilities: req.capabilities.unwrap_or_default(),
tags: req.tags.unwrap_or_default(),
created_at: now,
};
state
.agent_definitions
.write()
.await
.insert(id, agent_def.clone());
let health = AgentHealthInfo {
agent_id: id,
agent_name: req.name.clone(),
status: "healthy".to_string(),
last_heartbeat: Some(now),
restart_count: 0,
uptime_secs: 0,
probes: vec![ProbeInfo {
name: "liveness".to_string(),
status: "ok".to_string(),
last_check: Some(now),
consecutive_failures: 0,
}],
};
state.health_states.write().await.insert(id, health);
record_event(
&state.events,
"registered",
None,
format!(
"Registered agent definition '{}' (role: {})",
req.name, req.role
),
)
.await;
info!(agent_id = %id, name = %req.name, role = %req.role, "Agent definition registered");
Ok((StatusCode::CREATED, Json(agent_def)))
}
async fn get_agent(
State(state): State<Arc<ControlPlaneState>>,
Path(id): Path<Uuid>,
) -> Result<Json<AgentDefinitionInfo>, ControlPlaneError> {
let agents = state.agent_definitions.read().await;
let agent = agents
.get(&id)
.cloned()
.ok_or_else(|| ControlPlaneError::NotFound(format!("Agent definition {id} not found")))?;
Ok(Json(agent))
}
async fn delete_agent(
State(state): State<Arc<ControlPlaneState>>,
Path(id): Path<Uuid>,
) -> Result<Json<serde_json::Value>, ControlPlaneError> {
let removed = state.agent_definitions.write().await.remove(&id);
match removed {
Some(agent) => {
state.health_states.write().await.remove(&id);
record_event(
&state.events,
"unregistered",
None,
format!("Unregistered agent definition '{}'", agent.name),
)
.await;
info!(agent_id = %id, name = %agent.name, "Agent definition unregistered");
Ok(Json(serde_json::json!({
"deleted": true,
"agent_id": id,
})))
}
None => Err(ControlPlaneError::NotFound(format!(
"Agent definition {id} not found"
))),
}
}
async fn get_health_summary(
State(state): State<Arc<ControlPlaneState>>,
) -> Result<Json<Vec<AgentHealthInfo>>, ControlPlaneError> {
let healths = state.health_states.read().await;
let list: Vec<AgentHealthInfo> = healths.values().cloned().collect();
Ok(Json(list))
}
async fn get_agent_health(
State(state): State<Arc<ControlPlaneState>>,
Path(id): Path<Uuid>,
) -> Result<Json<AgentHealthInfo>, ControlPlaneError> {
let healths = state.health_states.read().await;
let health = healths.get(&id).cloned().ok_or_else(|| {
ControlPlaneError::NotFound(format!("Health info for agent {id} not found"))
})?;
Ok(Json(health))
}
async fn record_heartbeat(
State(state): State<Arc<ControlPlaneState>>,
Path(id): Path<Uuid>,
) -> Result<Json<AgentHealthInfo>, ControlPlaneError> {
let mut healths = state.health_states.write().await;
let health = healths.get_mut(&id).ok_or_else(|| {
ControlPlaneError::NotFound(format!("Health info for agent {id} not found"))
})?;
let now = Utc::now();
health.last_heartbeat = Some(now);
health.status = "healthy".to_string();
for probe in &mut health.probes {
if probe.name == "liveness" {
probe.status = "ok".to_string();
probe.last_check = Some(now);
probe.consecutive_failures = 0;
}
}
health.uptime_secs = now.signed_duration_since(state.started_at).num_seconds();
let result = health.clone();
Ok(Json(result))
}
async fn get_summary(
State(state): State<Arc<ControlPlaneState>>,
) -> Result<Json<ControlPlaneSummary>, ControlPlaneError> {
let deployments = state.deployments.read().await;
let healths = state.health_states.read().await;
let events = state.events.read().await;
let agents = state.agent_definitions.read().await;
let now = Utc::now();
let total_deployments = deployments.len();
let running_deployments = deployments
.values()
.filter(|d| d.status == "running")
.count();
let total_instances: usize = deployments.values().map(|d| d.instances.len()).sum();
let running_instances: usize = deployments
.values()
.flat_map(|d| &d.instances)
.filter(|i| i.status == "running")
.count();
let total_tasks_completed: u64 = deployments.values().map(|d| d.total_tasks).sum();
let total_errors: u64 = deployments.values().map(|d| d.total_errors).sum();
let healthy_agents = healths.values().filter(|h| h.status == "healthy").count();
let unhealthy_agents = healths.values().filter(|h| h.status != "healthy").count();
let uptime_secs = now.signed_duration_since(state.started_at).num_seconds();
let recent_events: Vec<ControlPlaneEvent> = events.iter().rev().take(20).cloned().collect();
Ok(Json(ControlPlaneSummary {
total_deployments,
running_deployments,
total_instances,
running_instances,
total_registered_agents: agents.len(),
healthy_agents,
unhealthy_agents,
total_tasks_completed,
total_errors,
uptime_secs,
recent_events,
}))
}
async fn list_events(
State(state): State<Arc<ControlPlaneState>>,
) -> Result<Json<Vec<ControlPlaneEvent>>, ControlPlaneError> {
let events = state.events.read().await;
Ok(Json(events.clone()))
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use axum::body::Body;
use axum::http::Request;
use tower::ServiceExt;
fn test_state() -> Arc<ControlPlaneState> {
Arc::new(ControlPlaneState::new())
}
async fn body_json<T: serde::de::DeserializeOwned>(resp: axum::http::Response<Body>) -> T {
let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
.await
.unwrap();
serde_json::from_slice(&body).unwrap()
}
fn post_json(uri: &str, body: &impl Serialize) -> Request<Body> {
Request::builder()
.method("POST")
.uri(uri)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(body).unwrap()))
.unwrap()
}
async fn create_test_deployment(state: &Arc<ControlPlaneState>) -> Uuid {
let app = control_plane_router(state.clone());
let req = post_json(
"/api/v1/control-plane/deployments",
&DeployRequest {
name: "test-coder".to_string(),
role: "coder".to_string(),
replicas: Some(2),
auto_restart: None,
max_concurrent_tasks: None,
tags: None,
},
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let dep: DeploymentInfo = body_json(resp).await;
dep.id
}
#[tokio::test]
async fn test_create_deployment() {
let state = test_state();
let app = control_plane_router(state.clone());
let req = post_json(
"/api/v1/control-plane/deployments",
&DeployRequest {
name: "my-coder".to_string(),
role: "coder".to_string(),
replicas: Some(3),
auto_restart: Some(false),
max_concurrent_tasks: Some(5),
tags: Some(HashMap::from([("env".to_string(), "prod".to_string())])),
},
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let dep: DeploymentInfo = body_json(resp).await;
assert_eq!(dep.name, "my-coder");
assert_eq!(dep.role, "coder");
assert_eq!(dep.replicas, 3);
assert!(!dep.auto_restart);
assert_eq!(dep.status, "running");
assert_eq!(dep.instances.len(), 3);
assert_eq!(dep.tags.get("env").unwrap(), "prod");
let deployments = state.deployments.read().await;
assert_eq!(deployments.len(), 1);
}
#[tokio::test]
async fn test_list_deployments() {
let state = test_state();
create_test_deployment(&state).await;
let app = control_plane_router(state.clone());
let req = post_json(
"/api/v1/control-plane/deployments",
&DeployRequest {
name: "test-tester".to_string(),
role: "tester".to_string(),
replicas: None,
auto_restart: None,
max_concurrent_tasks: None,
tags: None,
},
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let app = control_plane_router(state.clone());
let req = Request::builder()
.uri("/api/v1/control-plane/deployments")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let deps: Vec<DeploymentInfo> = body_json(resp).await;
assert_eq!(deps.len(), 2);
}
#[tokio::test]
async fn test_get_deployment_by_id() {
let state = test_state();
let dep_id = create_test_deployment(&state).await;
let app = control_plane_router(state.clone());
let req = Request::builder()
.uri(format!("/api/v1/control-plane/deployments/{dep_id}"))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let dep: DeploymentInfo = body_json(resp).await;
assert_eq!(dep.id, dep_id);
assert_eq!(dep.name, "test-coder");
}
#[tokio::test]
async fn test_delete_deployment() {
let state = test_state();
let dep_id = create_test_deployment(&state).await;
let app = control_plane_router(state.clone());
let req = Request::builder()
.method("DELETE")
.uri(format!("/api/v1/control-plane/deployments/{dep_id}"))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body: serde_json::Value = body_json(resp).await;
assert_eq!(body["deleted"], true);
let deployments = state.deployments.read().await;
assert!(deployments.is_empty());
}
#[tokio::test]
async fn test_scale_deployment() {
let state = test_state();
let dep_id = create_test_deployment(&state).await;
let app = control_plane_router(state.clone());
let req = post_json(
&format!("/api/v1/control-plane/deployments/{dep_id}/scale"),
&ScaleRequest { replicas: 5 },
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let dep: DeploymentInfo = body_json(resp).await;
assert_eq!(dep.replicas, 5);
assert_eq!(dep.instances.len(), 5);
let app = control_plane_router(state.clone());
let req = post_json(
&format!("/api/v1/control-plane/deployments/{dep_id}/scale"),
&ScaleRequest { replicas: 1 },
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let dep: DeploymentInfo = body_json(resp).await;
assert_eq!(dep.replicas, 1);
assert_eq!(dep.instances.len(), 1);
}
#[tokio::test]
async fn test_restart_deployment() {
let state = test_state();
let dep_id = create_test_deployment(&state).await;
let app = control_plane_router(state.clone());
let req = Request::builder()
.method("POST")
.uri(format!("/api/v1/control-plane/deployments/{dep_id}/stop"))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let dep: DeploymentInfo = body_json(resp).await;
assert_eq!(dep.status, "stopped");
let app = control_plane_router(state.clone());
let req = Request::builder()
.method("POST")
.uri(format!(
"/api/v1/control-plane/deployments/{dep_id}/restart"
))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let dep: DeploymentInfo = body_json(resp).await;
assert_eq!(dep.status, "running");
for instance in &dep.instances {
assert_eq!(instance.status, "running");
}
}
#[tokio::test]
async fn test_register_agent() {
let state = test_state();
let app = control_plane_router(state.clone());
let req = post_json(
"/api/v1/control-plane/agents",
&RegisterAgentRequest {
name: "code-writer".to_string(),
role: "coder".to_string(),
version: Some("1.0.0".to_string()),
description: Some("Writes Rust code".to_string()),
capabilities: Some(vec!["file_write".to_string(), "shell".to_string()]),
tags: None,
},
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let agent: AgentDefinitionInfo = body_json(resp).await;
assert_eq!(agent.name, "code-writer");
assert_eq!(agent.role, "coder");
assert_eq!(agent.version, "1.0.0");
assert_eq!(agent.capabilities.len(), 2);
let agents = state.agent_definitions.read().await;
assert_eq!(agents.len(), 1);
let healths = state.health_states.read().await;
assert_eq!(healths.len(), 1);
}
#[tokio::test]
async fn test_list_agents() {
let state = test_state();
let app = control_plane_router(state.clone());
let req = post_json(
"/api/v1/control-plane/agents",
&RegisterAgentRequest {
name: "agent-a".to_string(),
role: "coder".to_string(),
version: None,
description: None,
capabilities: None,
tags: None,
},
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let app = control_plane_router(state.clone());
let req = post_json(
"/api/v1/control-plane/agents",
&RegisterAgentRequest {
name: "agent-b".to_string(),
role: "tester".to_string(),
version: None,
description: None,
capabilities: None,
tags: None,
},
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let app = control_plane_router(state.clone());
let req = Request::builder()
.uri("/api/v1/control-plane/agents")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let agents: Vec<AgentDefinitionInfo> = body_json(resp).await;
assert_eq!(agents.len(), 2);
}
#[tokio::test]
async fn test_get_health_summary_empty() {
let state = test_state();
let app = control_plane_router(state.clone());
let req = Request::builder()
.uri("/api/v1/control-plane/health")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let healths: Vec<AgentHealthInfo> = body_json(resp).await;
assert!(healths.is_empty());
}
#[tokio::test]
async fn test_record_heartbeat() {
let state = test_state();
let app = control_plane_router(state.clone());
let req = post_json(
"/api/v1/control-plane/agents",
&RegisterAgentRequest {
name: "heartbeat-agent".to_string(),
role: "coder".to_string(),
version: None,
description: None,
capabilities: None,
tags: None,
},
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let agent: AgentDefinitionInfo = body_json(resp).await;
let agent_id = agent.id;
let app = control_plane_router(state.clone());
let req = Request::builder()
.method("POST")
.uri(format!("/api/v1/control-plane/health/{agent_id}/heartbeat"))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let health: AgentHealthInfo = body_json(resp).await;
assert_eq!(health.status, "healthy");
assert!(health.last_heartbeat.is_some());
}
#[tokio::test]
async fn test_get_summary() {
let state = test_state();
create_test_deployment(&state).await;
let app = control_plane_router(state.clone());
let req = post_json(
"/api/v1/control-plane/agents",
&RegisterAgentRequest {
name: "summary-agent".to_string(),
role: "tester".to_string(),
version: None,
description: None,
capabilities: None,
tags: None,
},
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let app = control_plane_router(state.clone());
let req = Request::builder()
.uri("/api/v1/control-plane/summary")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let summary: ControlPlaneSummary = body_json(resp).await;
assert_eq!(summary.total_deployments, 1);
assert_eq!(summary.running_deployments, 1);
assert_eq!(summary.total_instances, 2);
assert_eq!(summary.running_instances, 2);
assert_eq!(summary.total_registered_agents, 1);
assert_eq!(summary.healthy_agents, 1);
assert_eq!(summary.unhealthy_agents, 0);
assert!(summary.uptime_secs >= 0);
assert!(!summary.recent_events.is_empty());
}
#[tokio::test]
async fn test_deploy_with_invalid_role_returns_400() {
let state = test_state();
let app = control_plane_router(state.clone());
let req = post_json(
"/api/v1/control-plane/deployments",
&DeployRequest {
name: "bad-deploy".to_string(),
role: "nonexistent_role".to_string(),
replicas: None,
auto_restart: None,
max_concurrent_tasks: None,
tags: None,
},
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let body: serde_json::Value = body_json(resp).await;
assert!(body["error"].as_str().unwrap().contains("Invalid role"));
}
#[tokio::test]
async fn test_delete_nonexistent_returns_404() {
let state = test_state();
let app = control_plane_router(state.clone());
let fake_id = Uuid::new_v4();
let req = Request::builder()
.method("DELETE")
.uri(format!("/api/v1/control-plane/deployments/{fake_id}"))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_list_events() {
let state = test_state();
create_test_deployment(&state).await;
let app = control_plane_router(state.clone());
let req = Request::builder()
.uri("/api/v1/control-plane/events")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let events: Vec<ControlPlaneEvent> = body_json(resp).await;
assert!(!events.is_empty());
assert_eq!(events[0].event_type, "deployed");
}
#[tokio::test]
async fn test_stop_and_start_deployment() {
let state = test_state();
let dep_id = create_test_deployment(&state).await;
let app = control_plane_router(state.clone());
let req = Request::builder()
.method("POST")
.uri(format!("/api/v1/control-plane/deployments/{dep_id}/stop"))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let dep: DeploymentInfo = body_json(resp).await;
assert_eq!(dep.status, "stopped");
for inst in &dep.instances {
assert_eq!(inst.status, "stopped");
}
let app = control_plane_router(state.clone());
let req = Request::builder()
.method("POST")
.uri(format!("/api/v1/control-plane/deployments/{dep_id}/start"))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let dep: DeploymentInfo = body_json(resp).await;
assert_eq!(dep.status, "running");
for inst in &dep.instances {
assert_eq!(inst.status, "running");
}
}
#[tokio::test]
async fn test_scale_deployment_zero_replicas_returns_400() {
let state = test_state();
let dep_id = create_test_deployment(&state).await;
let app = control_plane_router(state.clone());
let req = post_json(
&format!("/api/v1/control-plane/deployments/{dep_id}/scale"),
&ScaleRequest { replicas: 0 },
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_deploy_custom_role() {
let state = test_state();
let app = control_plane_router(state.clone());
let req = post_json(
"/api/v1/control-plane/deployments",
&DeployRequest {
name: "custom-agent".to_string(),
role: "custom:data_pipeline".to_string(),
replicas: Some(1),
auto_restart: None,
max_concurrent_tasks: None,
tags: None,
},
);
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let dep: DeploymentInfo = body_json(resp).await;
assert_eq!(dep.role, "custom:data_pipeline");
}
}