use crate::connection::HealthContext;
use crate::peers::prune_stale_peers;
use crate::peers::{AliveTable, AliveTx};
use crate::workers::WorkerTable;
use crate::{load_manager_name, load_profile_host, load_tcp_port};
use serde::Deserialize;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tracing::{error, info, warn};
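/// Shared state handed to every control-socket connection: the health
/// collector, peer and worker tables, and the peer-version/alive channel
/// used to announce state changes.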
#[derive(Clone)]
pub struct ControlContext {
pub health: HealthContext,
pub peers: AliveTable,
pub workers: WorkerTable,
pub profile: String,
pub self_id: String,
pub peer_version: Arc<AtomicU64>,
pub alive_tx: AliveTx,
}
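/// Binds a Unix domain socket at `socket_path` (removing any stale socket
/// file first) and serves newline-delimited JSON control commands, spawning
/// one task per accepted connection. Bind and accept failures are logged.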
#[cfg(unix)]
pub fn maybe_spawn_control_server(socket_path: std::path::PathBuf, ctx: ControlContext) {
use tokio::net::UnixListener;
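    // Remove any stale socket file left behind by a previous run before binding.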
let _ = std::fs::remove_file(&socket_path);
info!(
target = "control",
profile = %ctx.profile,
path = %socket_path.display(),
"starting control server"
);
tokio::spawn(async move {
match UnixListener::bind(&socket_path) {
Ok(listener) => loop {
match listener.accept().await {
Ok((stream, _addr)) => {
let ctx = ctx.clone();
tokio::spawn(async move {
if let Err(e) = handle_conn(ctx.clone(), stream).await {
warn!(
target = "control",
profile = %ctx.profile,
"control connection error: {}",
e
);
}
});
}
Err(e) => {
error!(target = "control", profile = %ctx.profile, "accept failed: {}", e);
break;
}
}
},
Err(e) => {
error!(
target = "control",
profile = %ctx.profile,
path = %socket_path.display(),
"failed to bind control socket: {}",
e
);
}
}
});
}
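/// The control server is only supported on Unix; this stub is a no-op elsewhere.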
#[cfg(not(unix))]
pub fn maybe_spawn_control_server(_socket_path: std::path::PathBuf, _ctx: ControlContext) {}
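/// Commands accepted over the control socket as JSON objects tagged by a
/// snake_case `cmd` field, e.g. `{"cmd":"ping"}`.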
#[cfg(unix)]
#[derive(Debug, Deserialize)]
#[serde(tag = "cmd", rename_all = "snake_case")]
enum ControlCommand {
Ping,
Status,
Workers,
HealthSet {
health_score: Option<f32>,
avg_cpu: Option<f32>,
avg_memory: Option<f32>,
load_percentage: Option<f32>,
current_workers: Option<u32>,
max_workers: Option<u32>,
},
HealthReset,
HealthUnavailable,
PrunePeers {
older_than_secs: Option<u64>,
},
}
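/// Serves a single control connection: reads one JSON command per line and
/// writes a single-line JSON response for each.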
#[cfg(unix)]
async fn handle_conn(ctx: ControlContext, mut stream: tokio::net::UnixStream) -> eyre::Result<()> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
let (r, mut w) = stream.split();
let mut reader = BufReader::new(r);
let mut line = String::new();
while reader.read_line(&mut line).await? > 0 {
let raw = line.trim();
if raw.is_empty() {
line.clear();
continue;
}
match serde_json::from_str::<ControlCommand>(raw) {
Ok(ControlCommand::Ping) => {
w.write_all(b"{\"ok\":true}\n").await?;
}
Ok(ControlCommand::Status) => {
use volli_core::ManagerPeerEntry;
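                // Snapshot current health metrics, skipping (rather than blocking)
                // if the collector mutex is already held.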
let health_json = if let Ok(mut c) = ctx.health.collector.try_lock() {
let m = c.collect_metrics().await;
Some(serde_json::json!({
"health_score": m.health_score,
"load_percentage": m.load_percentage,
"max_workers": m.max_workers,
"current_workers": m.current_workers,
"avg_cpu": m.avg_cpu,
"avg_memory": m.avg_memory,
"last_health_update": m.last_health_update,
}))
} else {
None
};
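                // Worker count falls back to 0 when the collector lock is busy.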
let worker_count = if let Ok(c) = ctx.health.collector.try_lock() {
c.worker_count() as usize
} else {
0
};
let self_name = load_manager_name(&ctx.profile)
.ok()
.flatten()
.unwrap_or_else(|| ctx.profile.to_string());
let self_host = load_profile_host(&ctx.profile)
.ok()
.flatten()
.unwrap_or_default();
let self_tcp = load_tcp_port(&ctx.profile).ok().flatten().unwrap_or(0);
let peer_entries = crate::load_peers_for_gossip(&ctx.profile).unwrap_or_default();
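                // Index persisted peer entries by manager id so the live alive-table
                // state can be joined with each peer's stored name/host/port.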
let mut peer_map: std::collections::HashMap<String, ManagerPeerEntry> =
std::collections::HashMap::new();
for p in peer_entries {
peer_map.insert(p.manager_id.clone(), p);
}
let map = ctx.peers.lock().await;
let mut peers_status = Vec::new();
for (full_id, state) in map.iter() {
let manager_id = full_id.split(':').next_back().unwrap_or(full_id);
let (name, health, host, tcp_port) = peer_map
.get(manager_id)
.map(|p| {
(
p.manager_name.clone(),
p.health.clone(),
p.host.clone(),
p.tcp_port,
)
})
.unwrap_or_else(|| (manager_id.to_string(), None, String::new(), 0));
let conn = match state.conn_state {
crate::ConnectionState::Client => "client",
crate::ConnectionState::Server => "server",
crate::ConnectionState::Inactive => "inactive",
};
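                    // Best-effort RTT probe: time a TCP connect to the peer, capped at
                    // 250 ms. This runs while the peers lock is held, so unreachable
                    // peers add up to 250 ms each to the status response.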
let rtt_ms = if !host.is_empty() && tcp_port != 0 {
let addr = format!("{}:{}", host, tcp_port);
let start = std::time::Instant::now();
match tokio::time::timeout(
std::time::Duration::from_millis(250),
tokio::net::TcpStream::connect(&addr),
)
.await
{
Ok(Ok(_)) => Some(start.elapsed().as_millis() as u64),
_ => None,
}
} else {
None
};
peers_status.push(serde_json::json!({
"id": manager_id,
"name": name,
"state": conn,
"health": health,
"rtt_ms": rtt_ms,
}));
}
let resp = serde_json::json!({
"ok": true,
"profile": ctx.profile,
"workers": worker_count,
"health": health_json,
"self": {"id": ctx.self_id, "name": self_name, "host": self_host, "tcp_port": self_tcp},
"peers": peers_status,
});
w.write_all(resp.to_string().as_bytes()).await?;
w.write_all(b"\n").await?;
}
Ok(ControlCommand::Workers) => {
let list = crate::workers::list_workers(&ctx.workers).await;
let resp = serde_json::json!({
"ok": true,
"profile": ctx.profile,
"self_id": ctx.self_id,
"workers": list,
});
w.write_all(resp.to_string().as_bytes()).await?;
w.write_all(b"\n").await?;
}
Ok(ControlCommand::PrunePeers { older_than_secs }) => {
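                // Prune peers considered stale; defaults to entries older than 24 hours.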
let threshold = older_than_secs.unwrap_or(86_400);
let removed = prune_stale_peers(
&ctx.peers,
&ctx.profile,
&ctx.alive_tx,
&ctx.peer_version,
threshold,
)
.await;
let resp = serde_json::json!({
"ok": true,
"profile": ctx.profile,
"removed": removed,
});
w.write_all(resp.to_string().as_bytes()).await?;
w.write_all(b"\n").await?;
}
Ok(ControlCommand::HealthReset) => {
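                // Clear any manual health override and go back to collected metrics.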
if let Ok(mut c) = ctx.health.collector.try_lock() {
c.clear_override();
}
info!(target = "control", profile = %ctx.profile, "health override reset");
let new_ver = ctx.peer_version.fetch_add(1, Ordering::SeqCst) + 1;
let _ = crate::save_peer_version(&ctx.profile, new_ver);
let _ = ctx.alive_tx.send(new_ver);
w.write_all(b"{\"ok\":true}\n").await?;
}
Ok(ControlCommand::HealthUnavailable) => {
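                // Pin a worst-case override (score 0.0, CPU/memory pegged) so this
                // node reports itself as unavailable.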
if let Ok(mut c) = ctx.health.collector.try_lock() {
c.set_override(Some(crate::health::HealthOverride {
health_score: Some(0.0),
avg_cpu: Some(100.0),
avg_memory: Some(95.0),
load_percentage: None,
current_workers: None,
max_workers: None,
}));
}
info!(target = "control", profile = %ctx.profile, "health set to unavailable");
let new_ver = ctx.peer_version.fetch_add(1, Ordering::SeqCst) + 1;
let _ = crate::save_peer_version(&ctx.profile, new_ver);
let _ = ctx.alive_tx.send(new_ver);
w.write_all(b"{\"ok\":true}\n").await?;
}
Ok(ControlCommand::HealthSet {
health_score,
avg_cpu,
avg_memory,
load_percentage,
current_workers,
max_workers,
}) => {
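                // Install the caller-supplied override; fields left as `None` are not pinned.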
if let Ok(mut c) = ctx.health.collector.try_lock() {
let ov = crate::health::HealthOverride {
health_score,
avg_cpu,
avg_memory,
load_percentage,
current_workers,
max_workers,
};
c.set_override(Some(ov));
}
info!(target = "control", profile = %ctx.profile, "health override applied");
let new_ver = ctx.peer_version.fetch_add(1, Ordering::SeqCst) + 1;
let _ = crate::save_peer_version(&ctx.profile, new_ver);
let _ = ctx.alive_tx.send(new_ver);
w.write_all(b"{\"ok\":true}\n").await?;
}
            Err(e) => {
                // Serialize the parse error through serde_json so quotes in the
                // message cannot produce malformed JSON.
                let resp = serde_json::json!({"ok": false, "error": e.to_string()});
                w.write_all(resp.to_string().as_bytes()).await?;
                w.write_all(b"\n").await?;
            }
}
line.clear();
}
Ok(())
}