roboticus 0.11.4

Autonomous agent runtime — HTTP API, CLI, WebSocket push, and migration engine
Documentation
//! Server bootstrap — `cmd_serve` and its supporting helpers.

use std::net::SocketAddr;
use std::path::Path;
use std::time::Instant;

use tracing::info;

use roboticus_core::config::{RoboticusConfig, resolve_config_path};
use roboticus_server::cli;

use crate::banner::{print_banner, step, step_detail, step_warn};
use crate::legacy_proxy::{
    LegacyLoopbackMode, check_internal_proxy_reachability, collect_legacy_loopback_providers,
    legacy_loopback_mode, migrate_legacy_proxy_urls, validate_legacy_loopback_urls_for_mode,
};

/// Built-in TOML configuration used by `cmd_serve` when no configuration file
/// path can be resolved.
///
/// It names the agent, binds the server to `127.0.0.1:18789`, points the
/// database at `:memory:`, and selects `ollama/qwen3:8b` as the primary model.
pub(crate) const FALLBACK_CONFIG: &str = r#"
[agent]
name = "Roboticus"
id = "roboticus-dev"

[server]
port = 18789
bind = "127.0.0.1"

[database]
path = ":memory:"

[models]
primary = "ollama/qwen3:8b"
"#;

/// Boot the server and serve HTTP until a shutdown signal arrives.
///
/// Stages, in order:
/// 1. Resolve and load the configuration (falling back to [`FALLBACK_CONFIG`]),
///    then apply `port_override` / `bind_override` on top of it.
/// 2. Migrate or reject legacy loopback provider URLs (depending on
///    `legacy_loopback_mode()`), validate the configuration, and warn about
///    unreachable local proxies and the deprecated `[mcp].clients` key.
/// 3. Refuse to continue when the bind address is not one of `127.0.0.1`,
///    `localhost`, `::1` and no `[server] api_key` is configured.
/// 4. Build the application through `roboticus_server::bootstrap_with_config_path`
///    and print the remaining startup steps.
/// 5. Bind the TCP listener and run `axum::serve` with graceful shutdown. On
///    Unix, if the address is already in use, other roboticus processes found on
///    the port are sent SIGTERM (then SIGKILL) and the bind is retried once.
///
/// # Arguments
///
/// * `config_path` - explicit configuration file; when set it bypasses
///   profile-aware resolution.
/// * `profile_override` - profile name passed to `resolve_profile_config_path`
///   when no explicit `config_path` is given.
/// * `port_override` - replaces `config.server.port` when set.
/// * `bind_override` - replaces `config.server.bind` when set.
///
/// # Errors
///
/// Returns an error when the configuration cannot be loaded, migrated or
/// validated, when unsupported legacy loopback URLs are present, when binding to
/// a non-localhost address without an API key, when bootstrap fails, when the
/// listener cannot be bound (including after the retry), or when `axum::serve`
/// itself fails.
pub(crate) async fn cmd_serve(
    config_path: Option<String>,
    profile_override: Option<String>,
    port_override: Option<u16>,
    bind_override: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    use roboticus_core::resolve_profile_config_path;

    let t = cli::theme();
    // Started before any work so the final "Ready in …" line covers the whole boot.
    let boot_start = Instant::now();
    print_banner(t);

    // Total passed to every `step`/`step_warn` call below.
    const STEPS: u32 = 12;

    // Config resolution priority:
    //   1. Explicit --config flag
    //   2. Profile-aware resolution (--profile flag or active profile in profiles.toml)
    //   3. Legacy resolve_config_path fallback
    let resolved_path = if config_path.is_some() {
        resolve_config_path(config_path.as_deref()).map(|p| p.to_string_lossy().into_owned())
    } else {
        resolve_profile_config_path(profile_override.as_deref())
            .map(|p| p.to_string_lossy().into_owned())
            .or_else(|| resolve_config_path(None).map(|p| p.to_string_lossy().into_owned()))
    };

    // `mut` because the overrides and the legacy-URL migration below edit it in place.
    let mut config = match resolved_path {
        Some(ref p) => {
            step(t, 1, STEPS, "Loading configuration");
            step_detail(t, "source", p);
            RoboticusConfig::from_file(Path::new(p))?
        }
        None => {
            // No file anywhere: run from the embedded defaults.
            step(t, 1, STEPS, "Using default configuration");
            step_detail(t, "source", "built-in defaults");
            RoboticusConfig::from_str(FALLBACK_CONFIG)?
        }
    };

    // Overrides win over whatever the file (or the fallback) specified.
    if let Some(p) = port_override {
        config.server.port = p;
    }
    if let Some(b) = bind_override {
        config.server.bind = b;
    }

    match legacy_loopback_mode() {
        LegacyLoopbackMode::MigrateDeprecated => {
            // Rewrites legacy URLs in `config`; the resolved file path (if any) is
            // handed to the migration as well.
            let migrations =
                migrate_legacy_proxy_urls(&mut config, resolved_path.as_deref().map(Path::new))?;
            if !migrations.is_empty() {
                step_warn(
                    t,
                    2,
                    STEPS,
                    &format!(
                        "Migrated {} legacy provider URL(s) from loopback proxy to in-process routing",
                        migrations.len()
                    ),
                );
                for m in &migrations {
                    step_detail(t, &format!("providers.{}", m.provider), &m.to_url);
                }
                step_warn(
                    t,
                    2,
                    STEPS,
                    "Legacy loopback provider URLs are deprecated and will be removed in v0.8.0",
                );
            }
        }
        LegacyLoopbackMode::Unsupported => {
            // Hard failure: list every offending provider before returning the error.
            if let Err(msg) =
                validate_legacy_loopback_urls_for_mode(&config, LegacyLoopbackMode::Unsupported)
            {
                let (er, r) = (t.error(), t.reset());
                let err_icon = t.icon_error();
                eprintln!(
                    "  {er}{err_icon}{r} Legacy loopback provider URLs are not supported in v0.8.0+"
                );
                for item in collect_legacy_loopback_providers(&config) {
                    step_detail(t, "update required", &item);
                }
                return Err(msg.into());
            }
        }
    }

    // Print the validation error in the themed style, then propagate it unchanged.
    config.validate().map_err(|e| {
        let (er, r) = (t.error(), t.reset());
        let err_icon = t.icon_error();
        eprintln!("  {er}{err_icon}{r} Configuration validation failed: {e}");
        e
    })?;
    step(t, 2, STEPS, "Configuration validated");

    // Unreachable proxies only produce a warning; startup continues.
    let unreachable_proxies = check_internal_proxy_reachability(&config);
    if !unreachable_proxies.is_empty() {
        let (w, r) = (t.warn(), t.reset());
        let warn_icon = t.icon_warn();
        eprintln!(
            "  {w}{warn_icon}{r} Some local provider proxies are not currently reachable: {}",
            unreachable_proxies.join(", ")
        );
        eprintln!(
            "         These providers will be skipped until reachable (circuit breaker will manage availability)."
        );
    }

    // MCP config migration warning: legacy [mcp].clients → [mcp].servers
    // Only shown when the legacy key is the sole one populated.
    if !config.mcp.clients.is_empty() && config.mcp.servers.is_empty() {
        let (w, r) = (t.warn(), t.reset());
        let warn_icon = t.icon_warn();
        eprintln!(
            "  {w}{warn_icon}{r} [mcp].clients is deprecated — rename to [mcp].servers in roboticus.toml"
        );
        eprintln!("         Both keys are accepted, but [mcp].servers is the canonical form.");
    }

    // Exact string comparison: only these three spellings count as localhost.
    let is_localhost = config.server.bind == "127.0.0.1"
        || config.server.bind == "localhost"
        || config.server.bind == "::1";
    // Any other bind address requires an API key, otherwise startup is refused.
    if !is_localhost && config.server.api_key.is_none() {
        let (er, r) = (t.error(), t.reset());
        eprintln!();
        eprintln!(
            "  {er}ERROR:{r} Server bound to {} without API key.",
            config.server.bind
        );
        eprintln!("         Set [server] api_key = \"your-secret\" in config to secure the API.");
        eprintln!();
        return Err("Refusing to start on non-localhost without API key".into());
    }

    // Bootstrap does all heavy initialization: DB, wallet, LLM, plugins.
    // Phase diagnostics are printed by bootstrap itself as [bootstrap] lines.
    // `config` is cloned because it is still read for the step output below.
    let app = roboticus_server::bootstrap_with_config_path(
        config.clone(),
        resolved_path.clone().map(std::path::PathBuf::from),
    )
    .await?;
    // Steps 3-11 are printed after bootstrap has returned and only echo values
    // read from `config`.
    step(t, 3, STEPS, "Tracing initialized");
    step_detail(t, "level", &config.agent.log_level);

    let db_path = config.database.path.to_string_lossy();
    step(t, 4, STEPS, "Database initialized");
    step_detail(t, "path", &db_path);
    // The "mode" label is derived purely from the configured path string.
    if db_path == ":memory:" {
        step_detail(t, "mode", "in-memory (ephemeral)");
    } else {
        step_detail(t, "mode", "WAL (persistent)");
    }

    step(t, 5, STEPS, "Wallet service ready");
    step_detail(t, "chain", &format!("chain_id={}", config.wallet.chain_id));
    step_detail(t, "rpc", &config.wallet.rpc_url);

    step(t, 6, STEPS, "Identity resolved");
    step_detail(t, "name", &config.agent.name);
    step_detail(t, "id", &config.agent.id);

    // LlmService is already constructed inside bootstrap_with_config_path and
    // stored in AppState.  No need to instantiate a second (dead) copy here.
    step(t, 7, STEPS, "LLM service ready");
    step_detail(t, "primary", &config.models.primary);
    let fallback_str = if config.models.fallbacks.is_empty() {
        "none".to_string()
    } else {
        config.models.fallbacks.join(", ")
    };
    step_detail(t, "fallbacks", &fallback_str);
    step_detail(t, "routing", &config.models.routing.mode);

    step(t, 8, STEPS, "Agent loop initialized");

    // A missing skills directory is reported as a warning, not an error.
    if config.skills.skills_dir.exists() {
        step(t, 9, STEPS, "Skills loaded");
        step_detail(t, "dir", &config.skills.skills_dir.display().to_string());
    } else {
        step_warn(
            t,
            9,
            STEPS,
            &format!(
                "Skills directory not found: {}",
                config.skills.skills_dir.display()
            ),
        );
    }

    // The binding keeps the value alive until this function returns.
    // NOTE(review): what `HeartbeatDaemon::new(60_000)` starts (if anything) is not
    // visible here; the argument is presumably milliseconds, matching the "60s"
    // label printed below — confirm.
    let _heartbeat = roboticus_schedule::HeartbeatDaemon::new(60_000);
    step(t, 10, STEPS, "Scheduler initialized");
    step_detail(t, "heartbeat", "60s");

    // "web" is always listed; the rest depend on which channel sections are configured.
    let mut channels = vec!["web"];
    if config.channels.telegram.is_some() {
        channels.push("telegram");
    }
    if config.channels.whatsapp.is_some() {
        channels.push("whatsapp");
    }
    if config.channels.discord.is_some() {
        channels.push("discord");
    }
    if config.channels.signal.is_some() {
        channels.push("signal");
    }
    if config.a2a.enabled {
        channels.push("a2a");
    }
    step(t, 11, STEPS, "Channel adapters ready");
    step_detail(t, "active", &channels.join(", "));

    // `bind_addr` is what the listener binds to; `display_addr` is only for the
    // dashboard URL, where loopback and wildcard addresses are shown as "localhost".
    let bind_addr = format!("{}:{}", config.server.bind, config.server.port);
    let display_host = match config.server.bind.as_str() {
        "127.0.0.1" | "0.0.0.0" | "::1" | "::" => "localhost",
        other => other,
    };
    let display_addr = format!("{display_host}:{}", config.server.port);

    step(t, 12, STEPS, "HTTP server starting");
    step_detail(t, "bind", &bind_addr);
    step_detail(t, "dashboard", &format!("http://{display_addr}"));
    step_detail(t, "channels", &channels.join(", "));

    // Future that completes on Ctrl-C, or additionally on SIGTERM when built for
    // Unix. It is only polled once handed to `with_graceful_shutdown` below.
    let shutdown_signal = async {
        let ctrl_c = tokio::signal::ctrl_c();

        #[cfg(unix)]
        {
            match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
                Ok(mut sigterm) => {
                    // Whichever signal arrives first triggers shutdown.
                    tokio::select! {
                        _ = ctrl_c => info!("SIGINT received, shutting down gracefully"),
                        _ = sigterm.recv() => info!("SIGTERM received, shutting down gracefully"),
                    }
                }
                Err(e) => {
                    tracing::warn!(error = %e, "failed to install SIGTERM handler, falling back to SIGINT only");
                    // best-effort: signal wait result is irrelevant during shutdown
                    ctrl_c.await.ok();
                    info!("SIGINT received, shutting down gracefully");
                }
            }
        }
        #[cfg(not(unix))]
        {
            // best-effort: signal wait result is irrelevant during shutdown
            ctrl_c.await.ok();
            info!("SIGINT received, shutting down gracefully");
        }
    };

    // Bind the listener. `AddrInUse` gets special treatment: on Unix, other
    // roboticus processes on the port are signalled, then the bind is retried once.
    let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
        Ok(l) => l,
        Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
            let (w, r) = (t.icon_warn(), t.reset());
            eprintln!("  {w} Port {bind_addr} in use, shutting down previous instance...{r}");

            #[cfg(unix)]
            {
                // A failed lookup skips the signalling entirely; the retry below
                // still runs.
                if let Ok(pids) = find_roboticus_listeners(config.server.port) {
                    let own_pid = std::process::id();
                    // Never signal ourselves.
                    for pid in pids.iter().filter(|&&p| p != own_pid) {
                        // PIDs that do not fit in `i32` are skipped.
                        if let Ok(p) = i32::try_from(*pid) {
                            // SAFETY: `libc::kill` takes two plain integers and does not
                            // read or write any memory owned by this process, so the call
                            // cannot violate memory safety. The PID was returned by
                            // `find_roboticus_listeners` moments ago; if that process has
                            // already exited the call fails (ESRCH) and the ignored return
                            // value makes that a no-op here.
                            unsafe {
                                libc::kill(p, libc::SIGTERM);
                            }
                        }
                    }
                    // Grace period for the SIGTERM'd processes to exit.
                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

                    // Escalate to SIGKILL for anything still found on the port.
                    if let Ok(remaining) = find_roboticus_listeners(config.server.port) {
                        for pid in remaining.iter().filter(|&&p| p != own_pid) {
                            if let Ok(p) = i32::try_from(*pid) {
                                // SAFETY: same reasoning as the SIGTERM call above —
                                // `libc::kill` only takes integers and touches no memory of
                                // this process; a PID that has already exited makes the
                                // call fail (ESRCH), which is ignored.
                                unsafe {
                                    libc::kill(p, libc::SIGKILL);
                                }
                            }
                        }
                        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                    }
                }
            }

            // Single retry; a second failure is returned as a descriptive error.
            tokio::net::TcpListener::bind(&bind_addr)
                .await
                .map_err(|e2| {
                    format!("port {bind_addr} still in use after killing previous instance: {e2}")
                })?
        }
        Err(e) => return Err(e.into()),
    };
    // Boot time is measured up to the point the listener is bound.
    let elapsed = boot_start.elapsed();
    let (a, b, r) = (t.accent(), t.bold(), t.reset());
    eprintln!();
    let action_icon = t.icon_action();
    eprint!("  {action_icon} ");
    t.typewrite(&format!("{b}Ready{r} in {a}{:.0?}{r}", elapsed), 25);
    eprintln!();
    eprintln!();

    // Stderr logging is switched on only after the banner/step output is done,
    // and only when stderr is an interactive terminal.
    if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
        roboticus_server::enable_stderr_logging();
    }
    info!("Roboticus listening on http://{bind_addr}");

    // Serve until `shutdown_signal` completes; connect info exposes the peer
    // `SocketAddr` to handlers.
    axum::serve(
        listener,
        app.into_make_service_with_connect_info::<SocketAddr>(),
    )
    .with_graceful_shutdown(shutdown_signal)
    .await?;

    // Semantic cache: in-memory entries are ephemeral (no persistence tier).
    // DB-backed cache entries are written synchronously on store, so no
    // flush is needed at shutdown. When a persistent in-memory cache tier
    // is added, wire its flush() call here.

    info!("Server shut down");
    Ok(())
}

/// Find PIDs of roboticus processes that hold a *listening* TCP socket on `port`.
///
/// Candidates come from `lsof`, restricted to TCP sockets in the `LISTEN` state.
/// Without that restriction `lsof -i :PORT` also reports processes that are
/// merely connected to the port (local or remote side), and callers that signal
/// the returned PIDs would hit those clients too. Each candidate is then checked
/// with `ps`; only PIDs whose command name contains `"roboticus"` are kept, so
/// unrelated processes (e.g. browsers) are never returned.
///
/// # Errors
///
/// Returns an error only when `lsof` cannot be spawned. A non-zero `lsof` exit
/// (its normal result when nothing matches) yields an empty list, and a `ps`
/// failure for a given PID simply drops that PID.
#[cfg(unix)]
pub(crate) fn find_roboticus_listeners(port: u16) -> Result<Vec<u32>, Box<dyn std::error::Error>> {
    let port_filter = format!("-iTCP:{port}");
    let output = std::process::Command::new("lsof")
        // -n / -P: skip host and service name lookups (faster, no DNS).
        // -t: terse output, one PID per line.
        // -sTCP:LISTEN: listening sockets only, not established connections.
        .args(["-nP", "-t", port_filter.as_str(), "-sTCP:LISTEN"])
        .output()?;

    // True when `ps` reports a command name containing "roboticus" for `pid`.
    let is_roboticus = |pid: u32| -> bool {
        let pid_arg = pid.to_string();
        std::process::Command::new("ps")
            .args(["-p", pid_arg.as_str(), "-o", "comm="])
            .output()
            .map(|ps_out| {
                String::from_utf8_lossy(&ps_out.stdout)
                    .trim()
                    .contains("roboticus")
            })
            .unwrap_or(false)
    };

    let pids: Vec<u32> = String::from_utf8_lossy(&output.stdout)
        .lines()
        // Lines that are not a bare PID are ignored.
        .filter_map(|line| line.trim().parse::<u32>().ok())
        .filter(|&pid| is_roboticus(pid))
        .collect();
    Ok(pids)
}