use std::net::SocketAddr;
use std::path::Path;
use std::time::Instant;
use tracing::info;
use roboticus_core::config::{RoboticusConfig, resolve_config_path};
use roboticus_server::cli;
use crate::banner::{print_banner, step, step_detail, step_warn};
use crate::legacy_proxy::{
LegacyLoopbackMode, check_internal_proxy_reachability, collect_legacy_loopback_providers,
legacy_loopback_mode, migrate_legacy_proxy_urls, validate_legacy_loopback_urls_for_mode,
};
pub(crate) const FALLBACK_CONFIG: &str = r#"
[agent]
name = "Roboticus"
id = "roboticus-dev"
[server]
port = 18789
bind = "127.0.0.1"
[database]
path = ":memory:"
[models]
primary = "ollama/qwen3:8b"
"#;
pub(crate) async fn cmd_serve(
config_path: Option<String>,
profile_override: Option<String>,
port_override: Option<u16>,
bind_override: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
use roboticus_core::resolve_profile_config_path;
let t = cli::theme();
let boot_start = Instant::now();
print_banner(t);
const STEPS: u32 = 12;
let resolved_path = if config_path.is_some() {
resolve_config_path(config_path.as_deref()).map(|p| p.to_string_lossy().into_owned())
} else {
resolve_profile_config_path(profile_override.as_deref())
.map(|p| p.to_string_lossy().into_owned())
.or_else(|| resolve_config_path(None).map(|p| p.to_string_lossy().into_owned()))
};
let mut config = match resolved_path {
Some(ref p) => {
step(t, 1, STEPS, "Loading configuration");
step_detail(t, "source", p);
RoboticusConfig::from_file(Path::new(p))?
}
None => {
step(t, 1, STEPS, "Using default configuration");
step_detail(t, "source", "built-in defaults");
RoboticusConfig::from_str(FALLBACK_CONFIG)?
}
};
if let Some(p) = port_override {
config.server.port = p;
}
if let Some(b) = bind_override {
config.server.bind = b;
}
match legacy_loopback_mode() {
LegacyLoopbackMode::MigrateDeprecated => {
let migrations =
migrate_legacy_proxy_urls(&mut config, resolved_path.as_deref().map(Path::new))?;
if !migrations.is_empty() {
step_warn(
t,
2,
STEPS,
&format!(
"Migrated {} legacy provider URL(s) from loopback proxy to in-process routing",
migrations.len()
),
);
for m in &migrations {
step_detail(t, &format!("providers.{}", m.provider), &m.to_url);
}
step_warn(
t,
2,
STEPS,
"Legacy loopback provider URLs are deprecated and will be removed in v0.8.0",
);
}
}
LegacyLoopbackMode::Unsupported => {
if let Err(msg) =
validate_legacy_loopback_urls_for_mode(&config, LegacyLoopbackMode::Unsupported)
{
let (er, r) = (t.error(), t.reset());
let err_icon = t.icon_error();
eprintln!(
" {er}{err_icon}{r} Legacy loopback provider URLs are not supported in v0.8.0+"
);
for item in collect_legacy_loopback_providers(&config) {
step_detail(t, "update required", &item);
}
return Err(msg.into());
}
}
}
config.validate().map_err(|e| {
let (er, r) = (t.error(), t.reset());
let err_icon = t.icon_error();
eprintln!(" {er}{err_icon}{r} Configuration validation failed: {e}");
e
})?;
step(t, 2, STEPS, "Configuration validated");
let unreachable_proxies = check_internal_proxy_reachability(&config);
if !unreachable_proxies.is_empty() {
let (w, r) = (t.warn(), t.reset());
let warn_icon = t.icon_warn();
eprintln!(
" {w}{warn_icon}{r} Some local provider proxies are not currently reachable: {}",
unreachable_proxies.join(", ")
);
eprintln!(
" These providers will be skipped until reachable (circuit breaker will manage availability)."
);
}
if !config.mcp.clients.is_empty() && config.mcp.servers.is_empty() {
let (w, r) = (t.warn(), t.reset());
let warn_icon = t.icon_warn();
eprintln!(
" {w}{warn_icon}{r} [mcp].clients is deprecated — rename to [mcp].servers in roboticus.toml"
);
eprintln!(" Both keys are accepted, but [mcp].servers is the canonical form.");
}
let is_localhost = config.server.bind == "127.0.0.1"
|| config.server.bind == "localhost"
|| config.server.bind == "::1";
if !is_localhost && config.server.api_key.is_none() {
let (er, r) = (t.error(), t.reset());
eprintln!();
eprintln!(
" {er}ERROR:{r} Server bound to {} without API key.",
config.server.bind
);
eprintln!(" Set [server] api_key = \"your-secret\" in config to secure the API.");
eprintln!();
return Err("Refusing to start on non-localhost without API key".into());
}
let app = roboticus_server::bootstrap_with_config_path(
config.clone(),
resolved_path.clone().map(std::path::PathBuf::from),
)
.await?;
step(t, 3, STEPS, "Tracing initialized");
step_detail(t, "level", &config.agent.log_level);
let db_path = config.database.path.to_string_lossy();
step(t, 4, STEPS, "Database initialized");
step_detail(t, "path", &db_path);
if db_path == ":memory:" {
step_detail(t, "mode", "in-memory (ephemeral)");
} else {
step_detail(t, "mode", "WAL (persistent)");
}
step(t, 5, STEPS, "Wallet service ready");
step_detail(t, "chain", &format!("chain_id={}", config.wallet.chain_id));
step_detail(t, "rpc", &config.wallet.rpc_url);
step(t, 6, STEPS, "Identity resolved");
step_detail(t, "name", &config.agent.name);
step_detail(t, "id", &config.agent.id);
step(t, 7, STEPS, "LLM service ready");
step_detail(t, "primary", &config.models.primary);
let fallback_str = if config.models.fallbacks.is_empty() {
"none".to_string()
} else {
config.models.fallbacks.join(", ")
};
step_detail(t, "fallbacks", &fallback_str);
step_detail(t, "routing", &config.models.routing.mode);
step(t, 8, STEPS, "Agent loop initialized");
if config.skills.skills_dir.exists() {
step(t, 9, STEPS, "Skills loaded");
step_detail(t, "dir", &config.skills.skills_dir.display().to_string());
} else {
step_warn(
t,
9,
STEPS,
&format!(
"Skills directory not found: {}",
config.skills.skills_dir.display()
),
);
}
let _heartbeat = roboticus_schedule::HeartbeatDaemon::new(60_000);
step(t, 10, STEPS, "Scheduler initialized");
step_detail(t, "heartbeat", "60s");
let mut channels = vec!["web"];
if config.channels.telegram.is_some() {
channels.push("telegram");
}
if config.channels.whatsapp.is_some() {
channels.push("whatsapp");
}
if config.channels.discord.is_some() {
channels.push("discord");
}
if config.channels.signal.is_some() {
channels.push("signal");
}
if config.a2a.enabled {
channels.push("a2a");
}
step(t, 11, STEPS, "Channel adapters ready");
step_detail(t, "active", &channels.join(", "));
let bind_addr = format!("{}:{}", config.server.bind, config.server.port);
let display_host = match config.server.bind.as_str() {
"127.0.0.1" | "0.0.0.0" | "::1" | "::" => "localhost",
other => other,
};
let display_addr = format!("{display_host}:{}", config.server.port);
step(t, 12, STEPS, "HTTP server starting");
step_detail(t, "bind", &bind_addr);
step_detail(t, "dashboard", &format!("http://{display_addr}"));
step_detail(t, "channels", &channels.join(", "));
let shutdown_signal = async {
let ctrl_c = tokio::signal::ctrl_c();
#[cfg(unix)]
{
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
Ok(mut sigterm) => {
tokio::select! {
_ = ctrl_c => info!("SIGINT received, shutting down gracefully"),
_ = sigterm.recv() => info!("SIGTERM received, shutting down gracefully"),
}
}
Err(e) => {
tracing::warn!(error = %e, "failed to install SIGTERM handler, falling back to SIGINT only");
ctrl_c.await.ok();
info!("SIGINT received, shutting down gracefully");
}
}
}
#[cfg(not(unix))]
{
ctrl_c.await.ok();
info!("SIGINT received, shutting down gracefully");
}
};
let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
Ok(l) => l,
Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
let (w, r) = (t.icon_warn(), t.reset());
eprintln!(" {w} Port {bind_addr} in use, shutting down previous instance...{r}");
#[cfg(unix)]
{
if let Ok(pids) = find_roboticus_listeners(config.server.port) {
let own_pid = std::process::id();
for pid in pids.iter().filter(|&&p| p != own_pid) {
if let Ok(p) = i32::try_from(*pid) {
unsafe {
libc::kill(p, libc::SIGTERM);
}
}
}
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
if let Ok(remaining) = find_roboticus_listeners(config.server.port) {
for pid in remaining.iter().filter(|&&p| p != own_pid) {
if let Ok(p) = i32::try_from(*pid) {
unsafe {
libc::kill(p, libc::SIGKILL);
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
}
tokio::net::TcpListener::bind(&bind_addr)
.await
.map_err(|e2| {
format!("port {bind_addr} still in use after killing previous instance: {e2}")
})?
}
Err(e) => return Err(e.into()),
};
let elapsed = boot_start.elapsed();
let (a, b, r) = (t.accent(), t.bold(), t.reset());
eprintln!();
let action_icon = t.icon_action();
eprint!(" {action_icon} ");
t.typewrite(&format!("{b}Ready{r} in {a}{:.0?}{r}", elapsed), 25);
eprintln!();
eprintln!();
if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
roboticus_server::enable_stderr_logging();
}
info!("Roboticus listening on http://{bind_addr}");
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(shutdown_signal)
.await?;
info!("Server shut down");
Ok(())
}
#[cfg(unix)]
pub(crate) fn find_roboticus_listeners(port: u16) -> Result<Vec<u32>, Box<dyn std::error::Error>> {
let output = std::process::Command::new("lsof")
.args(["-ti", &format!(":{port}")])
.output()?;
let pids: Vec<u32> = String::from_utf8_lossy(&output.stdout)
.lines()
.filter_map(|line| line.trim().parse::<u32>().ok())
.filter(|&pid| {
if let Ok(cmd_output) = std::process::Command::new("ps")
.args(["-p", &pid.to_string(), "-o", "comm="])
.output()
{
let cmd = String::from_utf8_lossy(&cmd_output.stdout);
cmd.trim().contains("roboticus")
} else {
false
}
})
.collect();
Ok(pids)
}