//! volli-manager 0.1.12: Manager for volli.
//!
//! Control-socket module: serves a line-delimited JSON command protocol over a
//! Unix domain socket for querying status and workers and for applying health
//! overrides.
use crate::connection::HealthContext;
use crate::peers::prune_stale_peers;
use crate::peers::{AliveTable, AliveTx};
use crate::workers::WorkerTable;
use crate::{load_manager_name, load_profile_host, load_tcp_port};
use serde::Deserialize;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
// Path imports no longer needed after centralizing helpers
use tracing::{error, info, warn};

// hash/socket path helpers available via volli_core::util when needed

/// Shared state handed to the control-socket server and cloned for each accepted connection.
#[derive(Clone)]
pub struct ControlContext {
    /// Handle to the health metrics collector (locked for reads and overrides).
    pub health: HealthContext,
    /// Live peer table keyed by peer id, tracking connection state.
    pub peers: AliveTable,
    /// Table of workers registered with this manager.
    pub workers: WorkerTable,
    /// Profile name this manager instance runs under.
    pub profile: String,
    /// This manager's own identifier, reported in status responses.
    pub self_id: String,
    /// Monotonically increasing peer-table version, bumped when peer or health state changes.
    pub peer_version: Arc<AtomicU64>,
    /// Channel used to broadcast new peer-table versions to listeners.
    pub alive_tx: AliveTx,
}

#[cfg(unix)]
pub fn maybe_spawn_control_server(socket_path: std::path::PathBuf, ctx: ControlContext) {
    use tokio::net::UnixListener;
    // Remove any stale socket file left from a previous run; bind fails if the path exists
    let _ = std::fs::remove_file(&socket_path);
    info!(
        target = "control",
        profile = %ctx.profile,
        path = %socket_path.display(),
        "starting control server"
    );
    tokio::spawn(async move {
        match UnixListener::bind(&socket_path) {
            Ok(listener) => loop {
                match listener.accept().await {
                    Ok((stream, _addr)) => {
                        let ctx = ctx.clone();
                        tokio::spawn(async move {
                            if let Err(e) = handle_conn(ctx.clone(), stream).await {
                                warn!(
                                    target = "control",
                                    profile = %ctx.profile,
                                    "control connection error: {}",
                                    e
                                );
                            }
                        });
                    }
                    Err(e) => {
                        error!(target = "control", profile = %ctx.profile, "accept failed: {}", e);
                        break;
                    }
                }
            },
            Err(e) => {
                error!(
                    target = "control",
                    profile = %ctx.profile,
                    path = %socket_path.display(),
                    "failed to bind control socket: {}",
                    e
                );
            }
        }
    });
}

#[cfg(not(unix))]
pub fn maybe_spawn_control_server(_socket_path: std::path::PathBuf, _ctx: ControlContext) {}

#[cfg(unix)]
#[derive(Debug, Deserialize)]
#[serde(tag = "cmd", rename_all = "snake_case")]
enum ControlCommand {
    Ping,
    Status,
    Workers,
    HealthSet {
        health_score: Option<f32>,
        avg_cpu: Option<f32>,
        avg_memory: Option<f32>,
        load_percentage: Option<f32>,
        current_workers: Option<u32>,
        max_workers: Option<u32>,
    },
    HealthReset,
    HealthUnavailable,
    PrunePeers {
        older_than_secs: Option<u64>,
    },
}
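
// Example wire payloads implied by the serde attributes above (`tag = "cmd"`,
// `rename_all = "snake_case"`): unit variants are plain {"cmd":"ping"}-style objects,
// struct variants add their optional fields. The test module below is an illustrative
// sketch, not part of the original file.
#[cfg(all(unix, test))]
mod control_command_wire_examples {
    use super::ControlCommand;

    #[test]
    fn parses_tagged_snake_case_payloads() {
        // Unit variants: only the tag field is required.
        assert!(serde_json::from_str::<ControlCommand>(r#"{"cmd":"ping"}"#).is_ok());
        assert!(serde_json::from_str::<ControlCommand>(r#"{"cmd":"health_reset"}"#).is_ok());
        // Struct variants: omitted Option fields deserialize to None.
        assert!(serde_json::from_str::<ControlCommand>(
            r#"{"cmd":"health_set","health_score":0.25,"avg_cpu":80.0}"#
        )
        .is_ok());
        assert!(serde_json::from_str::<ControlCommand>(
            r#"{"cmd":"prune_peers","older_than_secs":3600}"#
        )
        .is_ok());
    }
}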

#[cfg(unix)]
async fn handle_conn(ctx: ControlContext, mut stream: tokio::net::UnixStream) -> eyre::Result<()> {
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
    let (r, mut w) = stream.split();
    let mut reader = BufReader::new(r);
    let mut line = String::new();
    while reader.read_line(&mut line).await? > 0 {
        let raw = line.trim();
        if raw.is_empty() {
            line.clear();
            continue;
        }
        match serde_json::from_str::<ControlCommand>(raw) {
            Ok(ControlCommand::Ping) => {
                w.write_all(b"{\"ok\":true}\n").await?;
            }
            Ok(ControlCommand::Status) => {
                // Build a concise status: worker count, health metrics, peer states
                use volli_core::ManagerPeerEntry;
                let health_json = if let Ok(mut c) = ctx.health.collector.try_lock() {
                    let m = c.collect_metrics().await;
                    Some(serde_json::json!({
                        "health_score": m.health_score,
                        "load_percentage": m.load_percentage,
                        "max_workers": m.max_workers,
                        "current_workers": m.current_workers,
                        "avg_cpu": m.avg_cpu,
                        "avg_memory": m.avg_memory,
                        "last_health_update": m.last_health_update,
                    }))
                } else {
                    None
                };
                // Use live local worker count from health collector
                let worker_count = if let Ok(c) = ctx.health.collector.try_lock() {
                    c.worker_count() as usize
                } else {
                    0
                };

                // Self-identification for better matching in clients
                let self_name = load_manager_name(&ctx.profile)
                    .ok()
                    .flatten()
                    .unwrap_or_else(|| ctx.profile.to_string());
                let self_host = load_profile_host(&ctx.profile)
                    .ok()
                    .flatten()
                    .unwrap_or_default();
                let self_tcp = load_tcp_port(&ctx.profile).ok().flatten().unwrap_or(0);

                // Load peer metadata for names + health
                let peer_entries = crate::load_peers_for_gossip(&ctx.profile).unwrap_or_default();
                let mut peer_map: std::collections::HashMap<String, ManagerPeerEntry> =
                    std::collections::HashMap::new();
                for p in peer_entries {
                    peer_map.insert(p.manager_id.clone(), p);
                }
                let map = ctx.peers.lock().await;
                let mut peers_status = Vec::new();
                for (full_id, state) in map.iter() {
                    let manager_id = full_id.split(':').next_back().unwrap_or(full_id);
                    let (name, health, host, tcp_port) = peer_map
                        .get(manager_id)
                        .map(|p| {
                            (
                                p.manager_name.clone(),
                                p.health.clone(),
                                p.host.clone(),
                                p.tcp_port,
                            )
                        })
                        .unwrap_or_else(|| (manager_id.to_string(), None, String::new(), 0));
                    let conn = match state.conn_state {
                        crate::ConnectionState::Client => "client",
                        crate::ConnectionState::Server => "server",
                        crate::ConnectionState::Inactive => "inactive",
                    };
                    // Lightweight RTT via TCP connect (short timeout)
                    let rtt_ms = if !host.is_empty() && tcp_port != 0 {
                        let addr = format!("{}:{}", host, tcp_port);
                        let start = std::time::Instant::now();
                        match tokio::time::timeout(
                            std::time::Duration::from_millis(250),
                            tokio::net::TcpStream::connect(&addr),
                        )
                        .await
                        {
                            Ok(Ok(_)) => Some(start.elapsed().as_millis() as u64),
                            _ => None,
                        }
                    } else {
                        None
                    };
                    peers_status.push(serde_json::json!({
                        "id": manager_id,
                        "name": name,
                        "state": conn,
                        "health": health,
                        "rtt_ms": rtt_ms,
                    }));
                }

                let resp = serde_json::json!({
                    "ok": true,
                    "profile": ctx.profile,
                    "workers": worker_count,
                    "health": health_json,
                    "self": {"id": ctx.self_id, "name": self_name, "host": self_host, "tcp_port": self_tcp},
                    "peers": peers_status,
                });
                w.write_all(resp.to_string().as_bytes()).await?;
                w.write_all(b"\n").await?;
            }
            Ok(ControlCommand::Workers) => {
                let list = crate::workers::list_workers(&ctx.workers).await;
                let resp = serde_json::json!({
                    "ok": true,
                    "profile": ctx.profile,
                    "self_id": ctx.self_id,
                    "workers": list,
                });
                w.write_all(resp.to_string().as_bytes()).await?;
                w.write_all(b"\n").await?;
            }
            Ok(ControlCommand::PrunePeers { older_than_secs }) => {
                let threshold = older_than_secs.unwrap_or(86_400);
                let removed = prune_stale_peers(
                    &ctx.peers,
                    &ctx.profile,
                    &ctx.alive_tx,
                    &ctx.peer_version,
                    threshold,
                )
                .await;
                let resp = serde_json::json!({
                    "ok": true,
                    "profile": ctx.profile,
                    "removed": removed,
                });
                w.write_all(resp.to_string().as_bytes()).await?;
                w.write_all(b"\n").await?;
            }
            Ok(ControlCommand::HealthReset) => {
                if let Ok(mut c) = ctx.health.collector.try_lock() {
                    c.clear_override();
                }
                info!(target = "control", profile = %ctx.profile, "health override reset");
                // Bump peer version immediately to propagate updated health to workers
                let new_ver = ctx.peer_version.fetch_add(1, Ordering::SeqCst) + 1;
                let _ = crate::save_peer_version(&ctx.profile, new_ver);
                let _ = ctx.alive_tx.send(new_ver);
                w.write_all(b"{\"ok\":true}\n").await?;
            }
            Ok(ControlCommand::HealthUnavailable) => {
                if let Ok(mut c) = ctx.health.collector.try_lock() {
                    c.set_override(Some(crate::health::HealthOverride {
                        health_score: Some(0.0),
                        avg_cpu: Some(100.0),
                        avg_memory: Some(95.0),
                        load_percentage: None,
                        current_workers: None,
                        max_workers: None,
                    }));
                }
                info!(target = "control", profile = %ctx.profile, "health set to unavailable");
                let new_ver = ctx.peer_version.fetch_add(1, Ordering::SeqCst) + 1;
                let _ = crate::save_peer_version(&ctx.profile, new_ver);
                let _ = ctx.alive_tx.send(new_ver);
                w.write_all(b"{\"ok\":true}\n").await?;
            }
            Ok(ControlCommand::HealthSet {
                health_score,
                avg_cpu,
                avg_memory,
                load_percentage,
                current_workers,
                max_workers,
            }) => {
                if let Ok(mut c) = ctx.health.collector.try_lock() {
                    let ov = crate::health::HealthOverride {
                        health_score,
                        avg_cpu,
                        avg_memory,
                        load_percentage,
                        current_workers,
                        max_workers,
                    };
                    c.set_override(Some(ov));
                }
                info!(target = "control", profile = %ctx.profile, "health override applied");
                let new_ver = ctx.peer_version.fetch_add(1, Ordering::SeqCst) + 1;
                let _ = crate::save_peer_version(&ctx.profile, new_ver);
                let _ = ctx.alive_tx.send(new_ver);
                w.write_all(b"{\"ok\":true}\n").await?;
            }
            Err(e) => {
                // Serialize via serde_json so quotes or backslashes in the error
                // message cannot produce malformed JSON.
                let resp = serde_json::json!({ "ok": false, "error": e.to_string() });
                w.write_all(resp.to_string().as_bytes()).await?;
                w.write_all(b"\n").await?;
            }
        }
        line.clear();
    }
    Ok(())
}