pub mod api_docs;
mod chat_handlers;
mod hitl_handlers;
mod registration_handlers;
mod registry_handlers;
mod status_handlers;
use super::SharedAgentStatus;
use crate::agents::{AgentConfig, ChatCapable};
use crate::orchestrator_registry::OrchestratorRegistry;
use crate::workers::buffer::ResponseBuffer;
use axum::{
Router,
extract::State,
http::header,
response::{Html, IntoResponse, Json},
routing::{get, post, put},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64};
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use utoipa::ToSchema;
/// Shared state handed to every handler mounted by `build_router`.
///
/// The per-agent maps are keyed by agent name: `list_agents` iterates `configs`
/// and uses the same key to look up `statuses`, `buffers` and `chat_agents`.
/// The struct is `Clone`; the global tuning values are `Arc`-wrapped atomics.
#[derive(Clone)]
pub(crate) struct MultiAppState {
/// Status handle per agent, read under its lock when building summaries.
statuses: HashMap<String, SharedAgentStatus>,
/// Agents that expose a chat interface; presence of a key is reported as `has_chat`.
chat_agents: HashMap<String, Arc<dyn ChatCapable>>,
/// Per-agent configuration behind an async `RwLock`.
configs: HashMap<String, Arc<RwLock<AgentConfig>>>,
/// Per-agent response buffer (may be absent for an agent).
buffers: HashMap<String, Arc<ResponseBuffer>>,
/// Per-agent boolean flags — presumably the pause switches toggled by the
/// pause endpoints; confirm in `hitl_handlers`.
pause_handles: HashMap<String, Arc<AtomicBool>>,
/// Optional orchestrator registry; `None` when the server is started via `run`.
orchestrator_registry: Option<OrchestratorRegistry>,
/// Global base hold time in whole seconds; seeded in `run_control_plane` from
/// the largest `base_hold_duration()` among the buffers.
base_hold_secs: Arc<AtomicU64>,
/// Global response SLA in whole seconds; seeded with the same value as `base_hold_secs`.
response_sla_secs: Arc<AtomicU64>,
/// Global buffer floor, seeded with 0 (name suggests a percentage — TODO confirm).
buffer_floor_pct: Arc<AtomicU64>,
/// Optional middleware pipeline; by its name it runs before a buffered
/// response is released — confirm against the release handlers.
before_release_middleware: Option<Arc<crate::middleware::pipeline::MiddlewarePipeline>>,
}
// Serializable snapshot of the three global tuning values. The field names
// mirror the atomics stored on `MultiAppState`.
// NOTE(review): presumably the response body of `GET /api/config` — confirm in
// `registry_handlers::get_global_config`.
// Plain `//` comments are used on purpose: utoipa's `ToSchema` derive turns
// `///` doc comments into schema descriptions in the generated OpenAPI document.
#[derive(Serialize, ToSchema)]
pub(super) struct GlobalConfig {
// Base hold time (the `_secs` suffix suggests whole seconds).
base_hold_secs: u64,
// Response SLA (the `_secs` suffix suggests whole seconds).
response_sla_secs: u64,
// Buffer floor (the `_pct` suffix suggests a percentage — TODO confirm range).
buffer_floor_pct: u64,
}
// Deserializable request payload carrying optional new values for the global
// tuning settings; every field may be omitted.
// NOTE(review): presumably the body of `PUT /api/config`, where an absent field
// leaves the current value untouched — confirm in
// `registry_handlers::update_global_config`.
// Plain `//` comments are used on purpose: utoipa's `ToSchema` derive turns
// `///` doc comments into schema descriptions in the generated OpenAPI document.
#[derive(Deserialize, ToSchema)]
pub(super) struct GlobalConfigUpdate {
// New base hold time, if supplied.
base_hold_secs: Option<u64>,
// New response SLA, if supplied.
response_sla_secs: Option<u64>,
// New buffer floor, if supplied.
buffer_floor_pct: Option<u64>,
}
/// Stateless namespace type for the multi-agent dashboard / control-plane HTTP
/// server; all functionality lives in its associated `run*` functions.
pub struct MultiAgentStatusServer;
/// Resolves the IP address the dashboard listener should bind to.
///
/// `raw` is the optional textual address (the caller passes the value of the
/// `QUORUM_DASHBOARD_BIND` environment variable). Leading and trailing
/// whitespace is ignored, so a value such as `"0.0.0.0\n"` coming from an env
/// file is honoured instead of being silently discarded.
///
/// Returns the parsed IPv4 or IPv6 address, or the IPv4 loopback address
/// (`127.0.0.1`) when `raw` is `None`, empty, or not a valid IP address.
fn resolve_dashboard_bind(raw: Option<&str>) -> std::net::IpAddr {
    raw.map(str::trim)
        .and_then(|candidate| candidate.parse().ok())
        .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST))
}
impl MultiAgentStatusServer {
    /// Starts the dashboard server without an orchestrator registry.
    ///
    /// Convenience wrapper around [`Self::run_with_registry`] that passes
    /// `None` for the registry. Resolves only when the server stops or fails
    /// to start; failures are logged, not returned.
    pub async fn run(
        port: u16,
        statuses: HashMap<String, SharedAgentStatus>,
        chat_agents: HashMap<String, Arc<dyn ChatCapable>>,
        configs: HashMap<String, AgentConfig>,
    ) {
        Self::run_with_registry(port, statuses, chat_agents, configs, None).await;
    }

    /// Starts the dashboard server with an optional orchestrator registry.
    ///
    /// Wraps every plain `AgentConfig` in `Arc<RwLock<_>>` and delegates to
    /// [`Self::run_control_plane`] with empty buffer and pause-handle maps and
    /// no middleware pipeline.
    pub async fn run_with_registry(
        port: u16,
        statuses: HashMap<String, SharedAgentStatus>,
        chat_agents: HashMap<String, Arc<dyn ChatCapable>>,
        configs: HashMap<String, AgentConfig>,
        registry: Option<OrchestratorRegistry>,
    ) {
        let rw_configs = configs
            .into_iter()
            .map(|(name, config)| (name, Arc::new(RwLock::new(config))))
            .collect();
        Self::run_control_plane(
            port,
            statuses,
            chat_agents,
            rw_configs,
            HashMap::new(),
            HashMap::new(),
            registry,
            None,
        )
        .await;
    }

    /// Builds the shared state, binds the listener and serves the control plane.
    ///
    /// * The global base hold time and response SLA are both seeded with the
    ///   largest `base_hold_duration()` among `buffers` (zero when there are
    ///   none), stored in whole seconds; the buffer floor starts at 0.
    /// * Every buffer's response SLA is set to that same duration.
    /// * The bind address comes from the `QUORUM_DASHBOARD_BIND` environment
    ///   variable (loopback when unset or invalid) combined with `port`; a
    ///   warning is logged when the address is not loopback.
    ///
    /// Bind and serve errors are logged and make the function return; nothing
    /// is propagated to the caller.
    // The argument list mirrors the fields of `MultiAppState` one-to-one, so the
    // clippy limit is lifted here instead of adding a builder for a single call site.
    #[allow(clippy::too_many_arguments)]
    pub async fn run_control_plane(
        port: u16,
        statuses: HashMap<String, SharedAgentStatus>,
        chat_agents: HashMap<String, Arc<dyn ChatCapable>>,
        configs: HashMap<String, Arc<RwLock<AgentConfig>>>,
        buffers: HashMap<String, Arc<ResponseBuffer>>,
        pause_handles: HashMap<String, Arc<AtomicBool>>,
        registry: Option<OrchestratorRegistry>,
        middleware: Option<Arc<crate::middleware::pipeline::MiddlewarePipeline>>,
    ) {
        // The longest per-buffer hold time becomes the initial global value.
        let base_hold = buffers
            .values()
            .map(|b| b.base_hold_duration())
            .max()
            .unwrap_or(std::time::Duration::ZERO);
        let base_secs = base_hold.as_secs();
        // The SLA starts out equal to the base hold time.
        let sla_secs = base_secs;
        let agent_count = configs.len();

        let state = MultiAppState {
            statuses,
            chat_agents,
            configs,
            buffers,
            pause_handles,
            orchestrator_registry: registry,
            base_hold_secs: Arc::new(AtomicU64::new(base_secs)),
            response_sla_secs: Arc::new(AtomicU64::new(sla_secs)),
            buffer_floor_pct: Arc::new(AtomicU64::new(0)),
            before_release_middleware: middleware,
        };
        for buf in state.buffers.values() {
            buf.set_response_sla(base_hold);
        }

        let app = build_router(state);
        let ip = resolve_dashboard_bind(std::env::var("QUORUM_DASHBOARD_BIND").ok().as_deref());
        let addr = SocketAddr::from((ip, port));
        if !ip.is_loopback() {
            warn!(
                bind = %ip,
                "dashboard bound to non-loopback address — control plane is reachable \
                 from the network with no built-in authentication. Restrict access via \
                 the host firewall, an external reverse proxy with auth, or revert to \
                 the loopback default."
            );
        }

        let listener = match tokio::net::TcpListener::bind(addr).await {
            Ok(listener) => listener,
            Err(e) => {
                error!("Failed to bind multi-agent server on port {}: {}", port, e);
                return;
            }
        };

        // Announce the dashboard only once the socket is actually bound, and
        // report the address the OS assigned (differs from `addr` for port 0).
        let bound_addr = listener.local_addr().unwrap_or(addr);
        info!(
            "Multi-agent dashboard → http://{}/ ({} agents) Swagger UI → http://{}/swagger-ui/",
            bound_addr, agent_count, bound_addr
        );

        if let Err(e) = axum::serve(listener, app).await {
            error!("Multi-agent server error: {}", e);
        }
    }
}
// One entry of the `GET /api/agents` response, assembled by `list_agents`.
// Unless noted otherwise a field is copied from the agent's status snapshot and
// falls back to `false` / `None` / `0` when the agent has no status entry.
// Plain `//` comments are used on purpose: utoipa's `ToSchema` derive turns
// `///` doc comments into schema descriptions in the generated OpenAPI document.
#[derive(Serialize, ToSchema)]
pub(super) struct AgentSummary {
// Agent name (the key of the config map); the list is sorted by it.
name: String,
// `model_name` from the agent's configuration.
model_name: String,
// `provider_id` from the agent's configuration.
provider_id: String,
nats_connected: bool,
current_job: Option<String>,
current_phase: Option<String>,
// True when a chat-capable handle is registered under this agent's name.
has_chat: bool,
is_paused: bool,
// Live length of the agent's response buffer when one exists, otherwise the
// count recorded in the status snapshot.
buffered_count: u32,
error_rate: f32,
mean_score: Option<f32>,
score_std_dev: Option<f32>,
// Integer mean of `duration_ms` over the snapshot's recent tasks; `None` when
// there are no recent tasks.
avg_response_ms: Option<u64>,
is_flagged: bool,
flag_reason: Option<String>,
// Auto-approve flag reported by the agent's buffer; `true` when it has no buffer.
auto_approve: bool,
// Auto-approve threshold reported by the agent's buffer; `1.0` when it has no buffer.
auto_approve_threshold: f32,
}
// Handler for `GET /api/agents`: returns one `AgentSummary` per configured
// agent, sorted by agent name.
//
// For each agent the summary combines:
//   * `model_name` / `provider_id` from its configuration,
//   * connection, job, score and flag data from its status snapshot
//     (neutral defaults when the agent has no status entry),
//   * the live buffer length and auto-approve settings from its response
//     buffer (snapshot count and `(true, 1.0)` when it has no buffer).
//
// Plain `//` comments are used on purpose: `#[utoipa::path]` turns `///` doc
// comments into the operation summary/description of the OpenAPI document.
#[utoipa::path(
    get,
    path = "/api/agents",
    responses(
        (status = 200, description = "List of all agents with summary status", body = Vec<AgentSummary>)
    ),
    tag = "Agents"
)]
pub(super) async fn list_agents(State(state): State<MultiAppState>) -> Json<Vec<AgentSummary>> {
    let mut agents = Vec::with_capacity(state.configs.len());

    for (name, config) in &state.configs {
        // Copy the two config fields we need and release the read lock right
        // away, so it is never held while waiting on the status lock or buffer.
        let (model_name, provider_id) = {
            let config = config.read().await;
            (config.model_name.clone(), config.provider_id.clone())
        };

        let (
            nats_connected,
            current_job,
            current_phase,
            is_paused,
            snapshot_buffered,
            error_rate,
            mean_score,
            score_std_dev,
            avg_response_ms,
            is_flagged,
            flag_reason,
        ) = if let Some(status) = state.statuses.get(name) {
            let snap = status.read().await;
            // Integer mean of the recent task durations; undefined without tasks.
            let avg_ms = if snap.recent_tasks.is_empty() {
                None
            } else {
                let total: u64 = snap.recent_tasks.iter().map(|t| t.duration_ms).sum();
                Some(total / snap.recent_tasks.len() as u64)
            };
            (
                snap.nats_connected,
                snap.current_job.clone(),
                snap.current_phase.clone(),
                snap.is_paused,
                snap.buffered_count,
                snap.error_rate,
                snap.mean_score,
                snap.score_std_dev,
                avg_ms,
                snap.is_flagged,
                snap.flag_reason.clone(),
            )
        } else {
            (
                false, None, None, false, 0, 0.0, None, None, None, false, None,
            )
        };

        // Single lookup of the agent's buffer, reused for both derived values.
        let buffer = state.buffers.get(name);
        let buffered_count = match buffer {
            // The live length wins over the snapshot; saturate instead of
            // silently wrapping if the length ever exceeds `u32::MAX`.
            Some(buf) => u32::try_from(buf.len().await).unwrap_or(u32::MAX),
            None => snapshot_buffered,
        };
        let (auto_approve, auto_approve_threshold) = buffer
            .map(|b| (b.is_auto_approve(), b.auto_approve_threshold()))
            .unwrap_or((true, 1.0));

        agents.push(AgentSummary {
            name: name.clone(),
            model_name,
            provider_id,
            nats_connected,
            current_job,
            current_phase,
            has_chat: state.chat_agents.contains_key(name),
            is_paused,
            buffered_count,
            error_rate,
            mean_score,
            score_std_dev,
            avg_response_ms,
            is_flagged,
            flag_reason,
            auto_approve,
            auto_approve_threshold,
        });
    }

    // Names are unique map keys, so an unstable sort yields the same order.
    agents.sort_unstable_by(|a, b| a.name.cmp(&b.name));
    Json(agents)
}
/// Assembles the complete axum router for the control plane and attaches
/// `state` to it.
///
/// The router serves the Swagger UI (with the generated OpenAPI JSON), the
/// dashboard page at `/`, and the `/api/config`, `/api/agents/...` and
/// `/api/orchestrators/...` endpoints implemented in the handler submodules.
fn build_router(state: MultiAppState) -> Router {
    use utoipa::OpenApi;

    // API documentation: Swagger UI backed by the generated OpenAPI document.
    let docs = utoipa_swagger_ui::SwaggerUi::new("/swagger-ui")
        .url("/api-docs/openapi.json", api_docs::ApiDoc::openapi());

    // Dashboard page and global configuration.
    let app = Router::new()
        .merge(docs)
        .route("/", get(dashboard_page))
        .route(
            "/api/config",
            get(registry_handlers::get_global_config).put(registry_handlers::update_global_config),
        );

    // Agent collection: listing, registration and fleet-wide switches.
    let app = app
        .route("/api/agents", get(list_agents))
        .route(
            "/api/agents/register",
            post(registration_handlers::register_agent),
        )
        .route(
            "/api/agents/bulk",
            post(registration_handlers::bulk_register),
        )
        .route(
            "/api/agents/pause-all",
            put(hitl_handlers::pause_all_agents),
        )
        .route("/api/agents/auto-all", put(hitl_handlers::auto_all_agents));

    // Single agent: status, configuration, chat and HITL switches.
    let app = app
        .route(
            "/api/agents/{name}/status",
            get(status_handlers::agent_status),
        )
        .route(
            "/api/agents/{name}/config",
            get(status_handlers::agent_config).put(hitl_handlers::agent_config_update),
        )
        .route("/api/agents/{name}/chat", post(chat_handlers::agent_chat))
        .route("/api/agents/{name}/pause", put(hitl_handlers::agent_pause))
        .route(
            "/api/agents/{name}/auto",
            put(hitl_handlers::agent_auto_approve),
        );

    // Single agent: buffered responses and the actions on one buffered item.
    let app = app
        .route(
            "/api/agents/{name}/buffer",
            get(hitl_handlers::agent_buffer_list),
        )
        .route(
            "/api/agents/{name}/buffer/{id}",
            get(hitl_handlers::agent_buffer_detail).put(hitl_handlers::agent_buffer_edit),
        )
        .route(
            "/api/agents/{name}/buffer/{id}/release",
            post(hitl_handlers::agent_buffer_release),
        )
        .route(
            "/api/agents/{name}/buffer/{id}/reject",
            post(hitl_handlers::agent_buffer_reject),
        )
        .route(
            "/api/agents/{name}/buffer/{id}/stop",
            post(hitl_handlers::agent_buffer_stop),
        )
        .route(
            "/api/agents/{name}/buffer/{id}/unstop",
            post(hitl_handlers::agent_buffer_unstop),
        );

    // Single agent: replace / patch / delete the registration.
    let app = app.route(
        "/api/agents/{id}/manage",
        put(registration_handlers::replace_agent)
            .patch(registration_handlers::patch_agent)
            .delete(registration_handlers::delete_agent),
    );

    // Orchestrators: registry, budgets, and proxying to a given orchestrator.
    let app = app
        .route(
            "/api/orchestrators",
            get(registry_handlers::list_orchestrators).post(registry_handlers::add_orchestrator),
        )
        .route(
            "/api/orchestrators/budgets",
            get(registry_handlers::get_orchestrator_budgets),
        )
        .route(
            "/api/orchestrators/{orch_id}/proxy/{*path}",
            get(registry_handlers::proxy_orchestrator_get)
                .post(registry_handlers::proxy_orchestrator_post),
        )
        .route(
            "/api/orchestrators/{orch_id}/stream/{job_id}",
            get(registry_handlers::proxy_orchestrator_sse),
        );

    app.with_state(state)
}
// Handler for `GET /`: serves the dashboard HTML that is embedded into the
// binary at compile time, with `Cache-Control: no-store` set on the response.
//
// Plain `//` comments are used on purpose: `#[utoipa::path]` turns `///` doc
// comments into the operation summary/description of the OpenAPI document.
#[utoipa::path(
    get,
    path = "/",
    responses(
        (status = 200, description = "Multi-agent dashboard HTML page", content_type = "text/html")
    ),
    tag = "Dashboard"
)]
async fn dashboard_page() -> impl IntoResponse {
    let page: &'static str = include_str!("../multi_status.html");
    let cache_headers = [(header::CACHE_CONTROL, "no-store")];
    (cache_headers, Html(page))
}
#[cfg(test)]
mod tests;