use std::sync::Arc;
use anyhow::{Context, Result};
use tracing::{error, info, warn};
use zeptoclaw::bus::MessageBus;
use zeptoclaw::channels::{register_configured_channels, ChannelManager};
use zeptoclaw::config::{Config, ContainerAgentBackend};
use zeptoclaw::health::{
health_port, start_health_server, start_periodic_usage_flush, UsageMetrics,
};
use zeptoclaw::heartbeat::{ensure_heartbeat_file, HeartbeatService};
use zeptoclaw::providers::{
configured_provider_names, resolve_runtime_provider, RUNTIME_SUPPORTED_PROVIDERS,
};
use super::common::create_agent;
use super::heartbeat::heartbeat_file_path;
pub(crate) async fn cmd_gateway(
containerized_flag: Option<String>,
tunnel_flag: Option<String>,
) -> Result<()> {
println!("Starting ZeptoClaw Gateway...");
let mut config = Config::load().with_context(|| "Failed to load configuration")?;
let containerized = containerized_flag.is_some();
if let Some(ref b) = containerized_flag {
if b != "auto" {
config.container_agent.backend = match b.to_lowercase().as_str() {
"docker" => ContainerAgentBackend::Docker,
#[cfg(target_os = "macos")]
"apple" => ContainerAgentBackend::Apple,
"auto" => ContainerAgentBackend::Auto,
other => {
#[cfg(target_os = "macos")]
return Err(anyhow::anyhow!(
"Unknown backend '{}'. Use: docker or apple",
other
));
#[cfg(not(target_os = "macos"))]
return Err(anyhow::anyhow!("Unknown backend '{}'. Use: docker", other));
}
};
}
}
let mut _tunnel: Option<Box<dyn zeptoclaw::tunnel::TunnelProvider>> = None;
let tunnel_provider = tunnel_flag.or(config.tunnel.provider.clone());
if let Some(ref provider) = tunnel_provider {
let mut tunnel_config = config.tunnel.clone();
tunnel_config.provider = Some(provider.clone());
let mut t = zeptoclaw::tunnel::create_tunnel(&tunnel_config)
.with_context(|| format!("Failed to create {} tunnel", provider))?;
let gateway_port = config.gateway.port;
let tunnel_url = t
.start(gateway_port)
.await
.with_context(|| format!("Failed to start {} tunnel", provider))?;
println!("Tunnel active: {}", tunnel_url);
_tunnel = Some(t);
}
let bus = Arc::new(MessageBus::new());
let metrics = Arc::new(UsageMetrics::new());
let hp = health_port();
let health_handle = match start_health_server(hp, Arc::clone(&metrics)).await {
Ok(handle) => {
info!(
port = hp,
"Health endpoints available at /healthz and /readyz"
);
Some(handle)
}
Err(e) => {
warn!(error = %e, "Failed to start health server (non-fatal)");
None
}
};
let (usage_shutdown_tx, usage_shutdown_rx) = tokio::sync::watch::channel(false);
let usage_flush_handle = start_periodic_usage_flush(Arc::clone(&metrics), usage_shutdown_rx);
let mut proxy = None;
let proxy_handle = if containerized {
info!("Starting gateway with containerized agent mode");
let backend = zeptoclaw::gateway::resolve_backend(&config.container_agent)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
info!("Resolved container backend: {}", backend);
match backend {
zeptoclaw::gateway::ResolvedBackend::Docker => {
validate_docker_available(configured_docker_binary(&config.container_agent))
.await?;
}
#[cfg(target_os = "macos")]
zeptoclaw::gateway::ResolvedBackend::Apple => {
validate_apple_available().await?;
}
}
let image = &config.container_agent.image;
if backend == zeptoclaw::gateway::ResolvedBackend::Docker {
let docker_binary = configured_docker_binary(&config.container_agent);
let image_check = tokio::process::Command::new(docker_binary)
.args(["image", "inspect", image])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.await;
if !image_check.map(|s| s.success()).unwrap_or(false) {
eprintln!(
"Warning: Docker image '{}' not found (checked via '{}').",
image, docker_binary
);
eprintln!("Build it with: {} build -t {} .", docker_binary, image);
return Err(anyhow::anyhow!(
"Docker image '{}' not found (checked via '{}')",
image,
docker_binary
));
}
}
info!("Using container image: {} (backend={})", image, backend);
let proxy_instance = Arc::new(zeptoclaw::gateway::ContainerAgentProxy::new(
config.clone(),
bus.clone(),
backend,
));
proxy_instance.set_usage_metrics(Arc::clone(&metrics));
let proxy_for_task = Arc::clone(&proxy_instance);
let proxy_metrics = Arc::clone(&metrics);
proxy = Some(proxy_instance);
Some(tokio::spawn(async move {
if let Err(e) = proxy_for_task.start().await {
error!("Container agent proxy error: {}", e);
}
proxy_metrics.set_ready(false);
warn!("Container agent proxy stopped; readiness set to false");
}))
} else {
let runtime_provider_name = resolve_runtime_provider(&config).map(|provider| provider.name);
if runtime_provider_name.is_none() {
let configured = configured_provider_names(&config);
if configured.is_empty() {
error!("No AI provider configured. Set ZEPTOCLAW_PROVIDERS_ANTHROPIC_API_KEY");
error!("or add your API key to {:?}", Config::path());
} else {
error!(
"Configured provider(s) are not supported by this runtime: {}",
configured.join(", ")
);
error!(
"Currently supported runtime providers: {}",
RUNTIME_SUPPORTED_PROVIDERS.join(", ")
);
}
std::process::exit(1);
}
None
};
let agent = if !containerized {
let agent = create_agent(config.clone(), bus.clone()).await?;
agent.set_usage_metrics(Arc::clone(&metrics)).await;
Some(agent)
} else {
None
};
let channel_manager = ChannelManager::new(bus.clone(), config.clone());
let channel_count = register_configured_channels(&channel_manager, bus.clone(), &config).await;
if channel_count == 0 {
warn!(
"No channels configured. Enable channels in {:?}",
Config::path()
);
warn!("The agent loop will still run but won't receive messages from external sources.");
} else {
info!("Registered {} channel(s)", channel_count);
}
channel_manager
.start_all()
.await
.with_context(|| "Failed to start channels")?;
let heartbeat_service = if config.heartbeat.enabled {
let hb_path = heartbeat_file_path(&config);
match ensure_heartbeat_file(&hb_path).await {
Ok(true) => info!("Created heartbeat file template at {:?}", hb_path),
Ok(false) => {}
Err(e) => warn!("Failed to initialize heartbeat file {:?}: {}", hb_path, e),
}
let service = Arc::new(HeartbeatService::new(
hb_path,
config.heartbeat.interval_secs,
bus.clone(),
"heartbeat:system",
));
service.start().await?;
Some(service)
} else {
None
};
let agent_handle = if let Some(ref agent) = agent {
let agent_clone = Arc::clone(agent);
let agent_metrics = Arc::clone(&metrics);
Some(tokio::spawn(async move {
if let Err(e) = agent_clone.start().await {
error!("Agent loop error: {}", e);
}
agent_metrics.set_ready(false);
warn!("Agent loop stopped; readiness set to false");
}))
} else {
None
};
metrics.set_ready(true);
println!();
if containerized {
println!("Gateway is running (containerized mode). Press Ctrl+C to stop.");
} else {
println!("Gateway is running. Press Ctrl+C to stop.");
}
println!();
tokio::signal::ctrl_c()
.await
.with_context(|| "Failed to listen for Ctrl+C")?;
println!();
println!("Shutting down...");
metrics.set_ready(false);
let _ = usage_shutdown_tx.send(true);
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), usage_flush_handle).await;
if let Some(service) = &heartbeat_service {
service.stop().await;
}
if let Some(ref agent) = agent {
agent.stop();
}
if let Some(ref proxy) = proxy {
proxy.stop();
}
channel_manager
.stop_all()
.await
.with_context(|| "Failed to stop channels")?;
if let Some(handle) = agent_handle {
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
}
if let Some(handle) = proxy_handle {
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
}
if let Some(handle) = health_handle {
handle.abort();
}
println!("Gateway stopped.");
Ok(())
}
/// Ensures Docker can be reached through the given `docker_binary`.
///
/// # Errors
/// Returns an error naming the binary (and suggesting to install Docker or drop
/// `--containerized`) when the availability probe reports Docker as unusable.
async fn validate_docker_available(docker_binary: &str) -> Result<()> {
    anyhow::ensure!(
        zeptoclaw::gateway::is_docker_available_with_binary(docker_binary).await,
        "Docker is not available via '{}'. Install Docker or run without --containerized.",
        docker_binary
    );
    Ok(())
}
/// Returns the Docker CLI to invoke.
///
/// This is the trimmed `docker_binary` from the container-agent config, or `"docker"`
/// when that setting is unset or blank.
fn configured_docker_binary(config: &zeptoclaw::config::ContainerAgentConfig) -> &str {
    match config.docker_binary.as_deref().map(str::trim) {
        Some(binary) if !binary.is_empty() => binary,
        _ => "docker",
    }
}
/// Ensures the Apple Container runtime is usable (macOS builds only).
///
/// # Errors
/// Returns an error describing the requirements when the runtime probe fails.
#[cfg(target_os = "macos")]
async fn validate_apple_available() -> Result<()> {
    let available = zeptoclaw::gateway::is_apple_container_available().await;
    anyhow::ensure!(
        available,
        "Apple Container is not available. Requires macOS 15+ with `container` CLI installed."
    );
    Ok(())
}