use std::collections::{BTreeSet, HashSet};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use chrono::Utc;
use clap::{Args, Subcommand, ValueEnum};
use croner::Cron;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use stakpak_api::AgentProvider;
use stakpak_shared::utils::normalize_optional_string;
use crate::{
config::{AppConfig, profile_resolver::resolve_profile_run_overrides},
onboarding::{OnboardingMode, run_onboarding},
utils::server_context::{load_remote_skills_context, startup_project_dir},
};
mod presenter;
mod probes;
use self::probes::{
AutopilotProbeContext, ProbeMode, RealProbeEnvironment, run_autopilot_probes,
summarize as summarize_probe_results,
};
const DEFAULT_SYSTEM_PROMPT: &str = include_str!("../../prompts/system_prompt.v1.md");
#[derive(Args, PartialEq, Debug, Clone)]
pub struct StartArgs {
/// Bind address for embedded server runtime
#[arg(long, default_value = "127.0.0.1:4096")]
pub bind: String,
/// Show generated auth token in stdout (local dev only)
#[arg(long, default_value_t = false)]
pub show_token: bool,
/// Disable auth checks for protected routes (local dev only)
#[arg(long, default_value_t = false)]
pub no_auth: bool,
/// Override default model for server runs (provider/model or model id)
#[arg(long)]
pub model: Option<String>,
/// Auto-approve all tools (CI/headless only)
#[arg(long, default_value_t = false)]
pub auto_approve_all: bool,
/// Run in foreground instead of delegating to OS service
#[arg(long, default_value_t = false)]
pub foreground: bool,
/// Do not prompt; require env vars / pre-existing config for setup
#[arg(long, default_value_t = false)]
pub non_interactive: bool,
/// Overwrite existing config (re-initialize from scratch)
#[arg(long, default_value_t = false)]
pub force: bool,
}
#[derive(Args, PartialEq, Debug, Clone)]
pub struct StopArgs {
/// Also remove installed OS service definition
#[arg(long, default_value_t = false)]
pub uninstall: bool,
}
#[derive(Subcommand, PartialEq, Debug, Clone)]
pub enum AutopilotCommands {
/// Start autopilot and install as system service (runs setup on first use)
#[command(name = "up")]
Up {
#[command(flatten)]
args: StartArgs,
/// Internal flag used by service units to avoid recursive delegation
#[arg(long, hide = true, default_value_t = false)]
from_service: bool,
},
/// Stop autopilot and remove system service
#[command(name = "down")]
Down {
#[command(flatten)]
args: StopArgs,
},
/// Show health, uptime, schedule/channel metadata, and recent activity
Status {
/// Emit machine-readable JSON output
#[arg(long, default_value_t = false)]
json: bool,
/// Include recent schedule runs (count)
#[arg(long)]
recent_runs: Option<u32>,
},
/// Stream autopilot logs
Logs {
/// Follow log output
#[arg(short = 'f', long)]
follow: bool,
/// Number of lines to show initially
#[arg(short = 'n', long)]
lines: Option<u32>,
/// Filter logs by component
#[arg(short = 'c', long, value_parser = ["scheduler", "server", "gateway"])]
component: Option<String>,
},
/// Restart autopilot (reload config)
Restart,
/// Manage scheduled tasks
#[command(subcommand)]
Schedule(AutopilotScheduleCommands),
/// Manage messaging channels (Slack, Telegram, Discord)
#[command(subcommand)]
Channel(AutopilotChannelCommands),
/// Run preflight checks for autopilot setup/runtime
Doctor,
}
#[derive(Subcommand, PartialEq, Debug, Clone)]
pub enum AutopilotScheduleCommands {
/// List all schedules
List,
/// Add a schedule
Add {
/// Schedule name
name: String,
/// Cron expression
#[arg(long)]
cron: String,
/// Prompt to run on trigger
#[arg(long)]
prompt: String,
/// Check script path
#[arg(long)]
check: Option<String>,
/// When to trigger after check
#[arg(long, default_value_t = ScheduleTriggerOn::Failure)]
trigger_on: ScheduleTriggerOn,
// /// Working directory for this schedule
// #[arg(long)]
// workdir: Option<String>,
/// Max agent steps
#[arg(long, default_value_t = 50)]
max_steps: u32,
/// Report results to this channel
#[arg(long)]
channel: Option<String>,
/// Profile from config.toml used for this schedule's sessions
#[arg(long)]
profile: Option<String>,
/// Require approval before acting
#[arg(long, default_value_t = false)]
pause_on_approval: bool,
/// Run agent tool calls inside a sandboxed warden container
#[arg(long, default_value_t = false)]
sandbox: bool,
/// Enable immediately
#[arg(long, default_value_t = true)]
enabled: bool,
},
/// Remove a schedule
Remove { name: String },
/// Enable a schedule
Enable { name: String },
/// Disable a schedule
Disable { name: String },
/// Show run history for a schedule
History {
/// Schedule name
name: String,
/// Number of rows to show
#[arg(long, default_value_t = 20, value_parser = clap::value_parser!(u32).range(1..=1000))]
limit: u32,
},
/// Manually trigger a schedule now
Trigger {
/// Schedule name
name: String,
/// Preview what would happen without actually triggering
#[arg(long)]
dry_run: bool,
},
/// Show details of a specific run
Show {
/// Run ID
id: i64,
},
/// Clean up stale runs and optionally prune old history
Clean {
/// Also prune runs older than this many days
#[arg(long)]
older_than_days: Option<u32>,
},
}
#[derive(Subcommand, PartialEq, Debug, Clone)]
pub enum AutopilotChannelCommands {
/// List all channels
List,
/// Add a channel
#[command(
after_long_help = "HOW TO GET TOKENS:\n\n Slack (requires both --bot-token and --app-token):\n\n RECOMMENDED: Use the app manifest for quick setup:\n 1. Go to https://api.slack.com/apps → Create New App → From an app manifest\n 2. Paste the manifest from: https://github.com/stakpak/agent/blob/main/libs/gateway/src/channels/slack-manifest.yaml\n 3. Basic Information → App-Level Tokens → generate token with connections:write scope (xapp-...)\n 4. Install to Workspace → copy Bot User OAuth Token (xoxb-...)\n\n Manual setup (if you already have an app):\n 1. Create app at https://api.slack.com/apps\n 2. Enable Socket Mode → generate app-level token (xapp-...) with connections:write scope\n 3. OAuth & Permissions → add Bot Token Scopes:\n app_mentions:read, channels:history, channels:read, chat:write,\n groups:history, groups:read, im:history, im:read,\n mpim:history, mpim:read, reactions:read, reactions:write\n 4. Event Subscriptions → subscribe to bot events:\n message.channels, message.groups, message.im, app_mention\n 5. Interactivity & Shortcuts → enable\n 6. Install to Workspace → copy Bot User OAuth Token (xoxb-...)\n\n Telegram:\n 1. Message @BotFather on Telegram\n 2. Send /newbot → choose name and username (must end in 'bot')\n 3. Copy the bot token (format: 123456789:ABCdef...)\n\n Discord:\n 1. Create app at https://discord.com/developers/applications\n 2. Bot tab → copy the bot token\n 3. OAuth2 → enable bot scope and required permissions\n\n Optional default notification target:\n --target sets [notifications].channel/chat_id for watch alerts\n Example: --target \"#engineering\" (Slack)\n"
)]
Add {
/// Channel type (slack, telegram, discord)
#[arg(value_enum)]
channel_type: ChannelType,
/// Bot token (Telegram bot token, Discord bot token)
#[arg(long)]
token: Option<String>,
/// Slack bot token (xoxb-...)
#[arg(long)]
bot_token: Option<String>,
/// Slack app token (xapp-...)
#[arg(long)]
app_token: Option<String>,
/// Default notification target (Slack channel, Telegram chat_id, Discord channel_id)
#[arg(long)]
target: Option<String>,
/// Profile from config.toml used for sessions started from this channel
#[arg(long)]
profile: Option<String>,
},
/// Remove a channel
Remove {
/// Channel type (slack, telegram, discord)
#[arg(value_enum)]
channel_type: ChannelType,
},
/// Test channel connectivity
Test,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ValueEnum, Default)]
#[serde(rename_all = "snake_case")]
pub enum ScheduleTriggerOn {
Success,
#[default]
Failure,
Any,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum ChannelType {
Slack,
Telegram,
Discord,
Whatsapp,
Webhook,
}
impl AutopilotCommands {
pub async fn run(self, mut config: AppConfig) -> Result<(), String> {
match self {
AutopilotCommands::Up { args, from_service } => {
start_autopilot(
&mut config,
StartOptions {
bind: args.bind,
show_token: args.show_token,
no_auth: args.no_auth,
model: args.model,
auto_approve_all: args.auto_approve_all,
foreground: args.foreground,
from_service,
non_interactive: args.non_interactive,
force: args.force,
sandbox_mode: stakpak_server::SandboxMode::default(),
},
)
.await
}
AutopilotCommands::Down { args: _ } => stop_autopilot().await,
AutopilotCommands::Status { json, recent_runs } => {
status_autopilot(&config, json, recent_runs).await
}
AutopilotCommands::Logs {
follow,
lines,
component,
} => logs_autopilot(follow, lines, component).await,
AutopilotCommands::Restart => restart_autopilot().await,
AutopilotCommands::Schedule(command) => run_schedule_command(command, &config).await,
AutopilotCommands::Channel(command) => run_channel_command(command, &config).await,
AutopilotCommands::Doctor => doctor_autopilot(&config).await,
}
}
}
#[derive(Debug, Clone)]
struct StartOptions {
bind: String,
show_token: bool,
no_auth: bool,
model: Option<String>,
auto_approve_all: bool,
foreground: bool,
from_service: bool,
non_interactive: bool,
force: bool,
sandbox_mode: stakpak_server::SandboxMode,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct AutopilotConfigFile {
#[serde(default)]
server: AutopilotServerConfig,
#[serde(default)]
schedules: Vec<AutopilotScheduleConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct AutopilotServerConfig {
#[serde(default = "default_server_listen")]
listen: String,
#[serde(default)]
show_token: bool,
#[serde(default)]
no_auth: bool,
#[serde(default)]
model: Option<String>,
#[serde(default)]
auto_approve_all: bool,
/// Controls sandbox container lifecycle: "persistent" (default) spawns once
/// at startup and reuses it; "ephemeral" spawns a new container per session.
#[serde(default)]
sandbox_mode: stakpak_server::SandboxMode,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct AutopilotScheduleConfig {
name: String,
cron: String,
prompt: String,
#[serde(default)]
check: Option<String>,
#[serde(default)]
trigger_on: ScheduleTriggerOn,
// #[serde(default)]
// workdir: Option<String>,
#[serde(default = "default_schedule_max_steps")]
max_steps: u32,
#[serde(default)]
channel: Option<String>,
#[serde(default)]
profile: Option<String>,
#[serde(default)]
pause_on_approval: bool,
#[serde(default)]
sandbox: bool,
#[serde(default = "default_enabled")]
enabled: bool,
}
impl Default for AutopilotServerConfig {
fn default() -> Self {
Self {
listen: default_server_listen(),
show_token: false,
no_auth: false,
model: None,
auto_approve_all: false,
sandbox_mode: stakpak_server::SandboxMode::default(),
}
}
}
impl AutopilotConfigFile {
fn path() -> PathBuf {
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(".stakpak")
.join("autopilot.toml")
}
fn load_or_default() -> Result<Self, String> {
let path = Self::path();
if !path.exists() {
return Ok(Self::default());
}
Self::load_from_path(&path)
}
async fn load_or_default_async() -> Result<Self, String> {
tokio::task::spawn_blocking(Self::load_or_default)
.await
.map_err(|e| format!("Failed to join config load task: {}", e))?
}
fn load_from_path(path: &Path) -> Result<Self, String> {
let content = std::fs::read_to_string(path)
.map_err(|e| format!("Failed to read autopilot config {}: {}", path.display(), e))?;
let config: Self = toml::from_str(&content)
.map_err(|e| format!("Failed to parse autopilot config {}: {}", path.display(), e))?;
Ok(config)
}
fn save(&self) -> Result<PathBuf, String> {
let path = Self::path();
self.save_to_path(&path)?;
Ok(path)
}
fn save_to_path(&self, path: &Path) -> Result<(), String> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("Failed to create autopilot config dir: {}", e))?;
}
let mut root = load_toml_root_table(path)?;
{
let server = ensure_toml_table(&mut root, "server");
server.insert(
"listen".to_string(),
toml::Value::String(self.server.listen.clone()),
);
server.insert(
"show_token".to_string(),
toml::Value::Boolean(self.server.show_token),
);
server.insert(
"no_auth".to_string(),
toml::Value::Boolean(self.server.no_auth),
);
match &self.server.model {
Some(model) => {
server.insert("model".to_string(), toml::Value::String(model.clone()));
}
None => {
server.remove("model");
}
}
server.insert(
"auto_approve_all".to_string(),
toml::Value::Boolean(self.server.auto_approve_all),
);
}
root.insert(
"schedules".to_string(),
toml::Value::try_from(&self.schedules)
.map_err(|e| format!("Failed to serialize schedules: {}", e))?,
);
write_toml_root_table(path, root)
}
fn find_schedule(&self, name: &str) -> Option<&AutopilotScheduleConfig> {
self.schedules.iter().find(|schedule| schedule.name == name)
}
fn find_schedule_mut(&mut self, name: &str) -> Option<&mut AutopilotScheduleConfig> {
self.schedules
.iter_mut()
.find(|schedule| schedule.name == name)
}
}
impl std::fmt::Display for ScheduleTriggerOn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ScheduleTriggerOn::Success => write!(f, "success"),
ScheduleTriggerOn::Failure => write!(f, "failure"),
ScheduleTriggerOn::Any => write!(f, "any"),
}
}
}
impl std::fmt::Display for ChannelType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ChannelType::Slack => write!(f, "slack"),
ChannelType::Telegram => write!(f, "telegram"),
ChannelType::Discord => write!(f, "discord"),
ChannelType::Whatsapp => write!(f, "whatsapp"),
ChannelType::Webhook => write!(f, "webhook"),
}
}
}
fn default_server_listen() -> String {
"127.0.0.1:4096".to_string()
}
fn default_enabled() -> bool {
true
}
fn default_schedule_max_steps() -> u32 {
50
}
fn load_toml_root_table(path: &Path) -> Result<toml::value::Table, String> {
if !path.exists() {
return Ok(toml::value::Table::new());
}
let content = std::fs::read_to_string(path)
.map_err(|e| format!("Failed to read autopilot config {}: {}", path.display(), e))?;
let value: toml::Value = toml::from_str(&content)
.map_err(|e| format!("Failed to parse autopilot config {}: {}", path.display(), e))?;
match value {
toml::Value::Table(table) => Ok(table),
_ => Err(format!(
"Failed to parse autopilot config {}: top-level TOML value must be a table",
path.display()
)),
}
}
fn ensure_toml_table<'a>(
table: &'a mut toml::value::Table,
key: &str,
) -> &'a mut toml::value::Table {
if !matches!(table.get(key), Some(toml::Value::Table(_))) {
table.insert(
key.to_string(),
toml::Value::Table(toml::value::Table::new()),
);
}
match table.get_mut(key) {
Some(toml::Value::Table(subtable)) => subtable,
_ => unreachable!("table key was just initialized"),
}
}
fn write_toml_root_table(path: &Path, root: toml::value::Table) -> Result<(), String> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("Failed to create autopilot config dir: {}", e))?;
}
let content = toml::to_string_pretty(&toml::Value::Table(root))
.map_err(|e| format!("Failed to serialize autopilot config: {}", e))?;
std::fs::write(path, content)
.map_err(|e| format!("Failed to write autopilot config {}: {}", path.display(), e))
}
impl StartOptions {
fn with_server_config(mut self, server: &AutopilotServerConfig) -> Self {
self.bind = server.listen.clone();
self.show_token = server.show_token;
self.no_auth = server.no_auth;
self.model = server.model.clone();
self.auto_approve_all = server.auto_approve_all;
self.sandbox_mode = server.sandbox_mode.clone();
self
}
fn has_runtime_overrides(&self) -> bool {
self.bind != "127.0.0.1:4096"
|| self.show_token
|| self.no_auth
|| self.model.is_some()
|| self.auto_approve_all
}
}
impl AutopilotServerConfig {
fn from_start_options(options: &StartOptions) -> Self {
// Load existing config to preserve fields that are only set via config
// file (not CLI flags). CLI flags at their default value should NOT
// overwrite explicitly-set config values.
let existing = AutopilotConfigFile::load_or_default()
.map(|c| c.server)
.unwrap_or_default();
Self {
listen: options.bind.clone(),
show_token: options.show_token,
no_auth: options.no_auth,
model: options.model.clone(),
// Preserve auto_approve_all from config when CLI flag is at default (false).
// Only override if the user explicitly passed --auto-approve-all.
auto_approve_all: options.auto_approve_all || existing.auto_approve_all,
sandbox_mode: existing.sandbox_mode,
}
}
}
#[derive(Debug, Serialize)]
struct AutopilotStatusJson {
command: &'static str,
ok: bool,
profile: String,
config_path: String,
server_config: AutopilotServerConfig,
server_allowed_tool_count: usize,
service: ServiceStatusJson,
server: EndpointStatusJson,
gateway: EndpointStatusJson,
sandbox: SandboxStatusJson,
scheduler: SchedulerStatusJson,
schedules: Vec<AutopilotScheduleStatusJson>,
channels: Vec<AutopilotChannelStatusJson>,
}
#[derive(Debug, Serialize)]
struct ServiceStatusJson {
installed: bool,
active: bool,
path: String,
}
#[derive(Debug, Serialize)]
struct EndpointStatusJson {
expected_enabled: bool,
reachable: bool,
url: String,
}
#[derive(Debug, Serialize)]
struct SandboxStatusJson {
mode: String,
healthy: Option<bool>,
consecutive_ok: Option<u64>,
consecutive_failures: Option<u64>,
last_ok: Option<String>,
last_error: Option<String>,
}
#[derive(Debug, Serialize)]
struct SchedulerStatusJson {
expected_enabled: bool,
config_path: String,
config_valid: bool,
trigger_count: usize,
running: bool,
pid: Option<i64>,
stale_pid: bool,
db_path: Option<String>,
error: Option<String>,
recent_runs: Vec<ScheduleRunSummaryJson>,
}
#[derive(Debug, Serialize)]
struct ScheduleRunSummaryJson {
id: i64,
schedule_name: String,
status: String,
started_at: String,
finished_at: Option<String>,
error_message: Option<String>,
}
#[derive(Debug, Serialize)]
struct AutopilotScheduleStatusJson {
name: String,
cron: String,
enabled: bool,
sandbox: bool,
next_run: Option<String>,
}
#[derive(Debug, Serialize)]
struct AutopilotChannelStatusJson {
name: String,
channel_type: String,
target: String,
enabled: bool,
alerts_only: bool,
}
#[cfg(unix)]
fn detect_host_user_mapping() -> stakpak_server::SandboxUserMapping {
let uid = read_unix_id_value("-u");
let gid = read_unix_id_value("-g");
match (uid, gid) {
(Some(uid), Some(gid)) => stakpak_server::SandboxUserMapping::HostUser { uid, gid },
_ => stakpak_server::SandboxUserMapping::ImageDefault,
}
}
#[cfg(not(unix))]
fn detect_host_user_mapping() -> stakpak_server::SandboxUserMapping {
stakpak_server::SandboxUserMapping::ImageDefault
}
#[cfg(unix)]
fn read_unix_id_value(flag: &str) -> Option<u32> {
let output = std::process::Command::new("id").arg(flag).output().ok()?;
if !output.status.success() {
return None;
}
String::from_utf8_lossy(&output.stdout)
.trim()
.parse::<u32>()
.ok()
}
async fn run_startup_preflight(config: &AppConfig, bind_addr: &str) -> Result<(), String> {
let base_url = loopback_base_url_from_bind(bind_addr);
let server_health_url = format!("{base_url}/v1/health");
let probe_client = build_probe_http_client();
let server_reachable = if let Some(client) = probe_client.as_ref() {
endpoint_ok(client, &server_health_url).await
} else {
false
};
let env = RealProbeEnvironment;
let ctx = AutopilotProbeContext {
app_config: config,
bind_addr: Some(bind_addr),
server_reachable,
};
let results = run_autopilot_probes(ProbeMode::Startup, &ctx, &env);
presenter::print_probe_report("Preflight checks", &results);
let summary = summarize_probe_results(&results);
if summary.blocking_failures > 0 {
return Err(format!(
"Preflight checks failed with {} blocking issue(s)",
summary.blocking_failures
));
}
Ok(())
}
async fn start_autopilot(config: &mut AppConfig, options: StartOptions) -> Result<(), String> {
let autopilot_config_path = AutopilotConfigFile::path();
let needs_setup = !autopilot_config_path.exists() || options.force;
// ── First-run setup (merged from old `init`) ──────────────────────
if needs_setup {
println!("Stakpak Autopilot setup");
println!("Profile: {}", config.profile_name);
println!();
// Credential check — interactive gets onboarding wizard, non-interactive gets error
let has_stakpak_key = config.get_stakpak_api_key().is_some();
let has_provider_keys = !config.get_llm_provider_config().providers.is_empty();
if !has_stakpak_key && !has_provider_keys {
if options.non_interactive {
return Err(
"No provider credentials configured. Run with credentials in env or run interactive setup without --non-interactive.".to_string(),
);
}
println!("No credentials found. Launching onboarding...");
run_onboarding(config, OnboardingMode::Default).await;
println!();
}
// Write default config template (or overwrite with --force)
write_default_autopilot_config(&autopilot_config_path, options.force).await?;
println!(
"✓ Autopilot config created: {}",
autopilot_config_path.display()
);
// Pick up channel tokens from environment
let telegram_token = std::env::var("TELEGRAM_BOT_TOKEN").ok();
let discord_token = std::env::var("DISCORD_BOT_TOKEN").ok();
let slack_bot_token = std::env::var("SLACK_BOT_TOKEN").ok();
let slack_app_token = std::env::var("SLACK_APP_TOKEN").ok();
let has_env_channels = telegram_token.is_some()
|| discord_token.is_some()
|| (slack_bot_token.is_some() && slack_app_token.is_some());
if has_env_channels {
let mut gateway_config = stakpak_gateway::GatewayConfig::load(
autopilot_config_path.as_path(),
&stakpak_gateway::GatewayCliFlags::default(),
)
.unwrap_or_default();
if let Some(token) = telegram_token {
gateway_config.channels.telegram = Some(stakpak_gateway::config::TelegramConfig {
token,
require_mention: false,
model: None,
auto_approve: None,
profile: Some(config.profile_name.clone()),
});
}
if let Some(token) = discord_token {
gateway_config.channels.discord = Some(stakpak_gateway::config::DiscordConfig {
token,
guilds: Vec::new(),
model: None,
auto_approve: None,
profile: Some(config.profile_name.clone()),
});
}
if let (Some(bot_token), Some(app_token)) = (slack_bot_token, slack_app_token) {
gateway_config.channels.slack = Some(stakpak_gateway::config::SlackConfig {
bot_token,
app_token,
model: None,
auto_approve: None,
profile: Some(config.profile_name.clone()),
});
}
gateway_config
.save(autopilot_config_path.as_path())
.map_err(|e| format!("Failed to save channel config: {e}"))?;
let channels = gateway_config.enabled_channels();
println!(
"✓ Channels configured from environment: {}",
channels.join(", ")
);
} else if !options.non_interactive {
println!();
println!("Channels let autopilot talk to you on Slack, Telegram, or Discord.");
println!("You can add them now or later with: stakpak autopilot channel add");
println!();
println!(" Slack quick setup: use the app manifest at");
println!(
" https://github.com/stakpak/agent/blob/main/libs/gateway/src/channels/slack-manifest.yaml"
);
println!();
}
}
// ── Load config and apply runtime overrides ──────────────────────
let config_path = AutopilotConfigFile::path();
let saved_config = AutopilotConfigFile::load_or_default()?;
let has_runtime_overrides = options.has_runtime_overrides();
let effective_server = if has_runtime_overrides || needs_setup {
let server_config = AutopilotServerConfig::from_start_options(&options);
let mut config_file = saved_config.clone();
config_file.server = server_config.clone();
config_file.save()?;
if has_runtime_overrides && !needs_setup {
println!("✓ Saved server overrides to {}", config_path.display());
}
server_config
} else {
saved_config.server.clone()
};
let effective_options = options.clone().with_server_config(&effective_server);
if effective_options.foreground || effective_options.from_service {
if !effective_options.from_service {
run_startup_preflight(config, &effective_server.listen).await?;
}
return start_foreground_runtime(config, &effective_options).await;
}
// Idempotency: if autopilot is already running, skip start
if let Some(pid) = is_autopilot_running() {
println!("Autopilot is already running (PID {}).", pid);
println!();
println!(" Status stakpak autopilot status");
println!(" Restart stakpak autopilot restart");
println!(" Stop stakpak autopilot down");
return Ok(());
}
run_startup_preflight(config, &effective_server.listen).await?;
if !autopilot_service_installed() {
install_autopilot_service(config)?;
println!("✓ Installed autopilot service");
}
// Ensure the sandbox container image is available locally before starting the
// service. Pulling inside the service process produces no visible output — the
// user just sees a frozen "waiting" message. By pulling here we inherit
// stdout/stderr so Docker's progress bars are visible. This applies to both
// persistent and ephemeral modes since both need the image.
ensure_sandbox_image_available()?;
let expects_sandbox = effective_server.sandbox_mode == stakpak_server::SandboxMode::Persistent;
start_autopilot_service()?;
// Wait for the server (and persistent sandbox if configured) to be ready
// before printing the status summary. This ensures `stakpak up` only returns
// once the autopilot is fully operational.
let base_url = format!("http://{}", effective_server.listen);
let health_url = format!("{base_url}/v1/health");
let max_wait = std::time::Duration::from_secs(120);
let poll_interval = std::time::Duration::from_millis(500);
let start = std::time::Instant::now();
// Spinner frames for the waiting animation
const SPINNER: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
let mut spinner_idx: usize = 0;
let mut last_phase = String::new();
let ready = loop {
if start.elapsed() > max_wait {
break false;
}
tokio::time::sleep(poll_interval).await;
// Determine current phase for display
let phase = match reqwest::get(&health_url).await {
Ok(resp) => match resp.json::<serde_json::Value>().await {
Ok(body) => {
if expects_sandbox {
if let Some(sandbox) = body.get("sandbox")
&& sandbox.get("healthy").and_then(|v| v.as_bool()) == Some(true)
{
// Clear the spinner line and break
print!("\r\x1b[2K");
let _ = std::io::Write::flush(&mut std::io::stdout());
break true;
}
"Starting sandbox container...".to_string()
} else {
// Server is up and no sandbox needed — done
print!("\r\x1b[2K");
let _ = std::io::Write::flush(&mut std::io::stdout());
break true;
}
}
Err(_) => "Starting server...".to_string(),
},
Err(_) => "Starting server...".to_string(),
};
// Update spinner on every tick
let elapsed = start.elapsed().as_secs();
let frame = SPINNER[spinner_idx % SPINNER.len()];
spinner_idx += 1;
// Only log phase transitions (avoid spamming the same message)
if phase != last_phase {
last_phase.clone_from(&phase);
}
print!("\r\x1b[2K {frame} {phase} ({elapsed}s)");
let _ = std::io::Write::flush(&mut std::io::stdout());
};
if ready {
println!(" Autopilot ready ✓");
} else {
// Clear spinner line before printing error
print!("\r\x1b[2K");
let _ = std::io::Write::flush(&mut std::io::stdout());
println!(
" ✗ Timed out waiting for autopilot to become healthy ({}s)",
max_wait.as_secs()
);
println!();
println!(" Troubleshoot:");
println!(" stakpak autopilot logs -c server View server logs");
println!(" stakpak autopilot status Check component health");
if expects_sandbox {
println!(" docker ps Verify sandbox container");
}
}
// Clean post-start status summary
let schedule_count = saved_config.schedules.len();
let channel_list = stakpak_gateway::GatewayConfig::load(
config_path.as_path(),
&stakpak_gateway::GatewayCliFlags::default(),
)
.map(|c| c.enabled_channels().join(", "))
.unwrap_or_default();
let resolved_tool_policy = resolve_server_tool_policy(
config.allowed_tools.as_ref(),
config.auto_approve.as_ref(),
effective_server.auto_approve_all,
);
println!();
println!(" Autopilot is running.");
println!();
println!(" Server http://{}", effective_server.listen);
println!(
" Tools {}",
describe_tool_policy(&resolved_tool_policy)
);
println!(
" Schedules {}",
if schedule_count > 0 {
format!("{} active", schedule_count)
} else {
"none (edit ~/.stakpak/autopilot.toml)".to_string()
}
);
println!(
" Channels {}",
if channel_list.is_empty() {
"none (stakpak autopilot channel add)".to_string()
} else {
channel_list
}
);
println!();
println!(" View logs stakpak autopilot logs");
println!(" Status stakpak autopilot status");
println!(" Stop stakpak down");
Ok(())
}
async fn start_foreground_runtime(
config: &AppConfig,
options: &StartOptions,
) -> Result<(), String> {
// --- Per-component file logging ---
// Each runtime gets its own log file under ~/.stakpak/autopilot/logs/.
// Guards must be held for the lifetime of the runtime to ensure logs are flushed.
let log_dir = autopilot_log_dir();
std::fs::create_dir_all(&log_dir)
.map_err(|e| format!("Failed to create autopilot log directory: {}", e))?;
// TODO: add log rotation (daily or size-based) via tracing_appender::rolling::daily()
let scheduler_appender = tracing_appender::rolling::never(&log_dir, "scheduler.log");
let server_appender = tracing_appender::rolling::never(&log_dir, "server.log");
let gateway_appender = tracing_appender::rolling::never(&log_dir, "gateway.log");
let (scheduler_nb, _scheduler_guard) = tracing_appender::non_blocking(scheduler_appender);
let (server_nb, _server_guard) = tracing_appender::non_blocking(server_appender);
let (gateway_nb, _gateway_guard) = tracing_appender::non_blocking(gateway_appender);
{
use tracing_subscriber::fmt::writer::MakeWriterExt;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
// Each component gets its own fmt layer with a target-based filter.
let scheduler_layer = tracing_subscriber::fmt::layer()
.with_writer(scheduler_nb.with_filter(|meta: &tracing::Metadata<'_>| {
meta.target().starts_with("stakpak::commands::watch")
}))
.with_target(true)
.with_ansi(false);
let server_layer = tracing_subscriber::fmt::layer()
.with_writer(server_nb.with_filter(|meta: &tracing::Metadata<'_>| {
meta.target().starts_with("stakpak_server")
}))
.with_target(true)
.with_ansi(false);
let gateway_layer = tracing_subscriber::fmt::layer()
.with_writer(gateway_nb.with_filter(|meta: &tracing::Metadata<'_>| {
meta.target().starts_with("stakpak_gateway")
}))
.with_target(true)
.with_ansi(false);
let env_filter =
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
"info,stakpak=info,stakpak_server=info,stakpak_gateway=info".into()
});
// Best-effort: if a global subscriber is already set (e.g. --debug), skip.
let _ = tracing_subscriber::registry()
.with(env_filter)
.with(scheduler_layer)
.with(server_layer)
.with(gateway_layer)
.try_init();
}
// --- 1. Server runtime (initialize before scheduler to avoid sqlite3Close/sqlite3_open
// race on libsql's global state when run_scheduler exits early) ---
let bind = options.bind.clone();
let (auth_config, generated_auth_token) = if options.no_auth {
(stakpak_server::AuthConfig::disabled(), None)
} else {
let token = stakpak_shared::utils::generate_password(64, true);
(
stakpak_server::AuthConfig::token(token.clone()),
Some(token),
)
};
let listener = tokio::net::TcpListener::bind(&bind)
.await
.map_err(|e| format!("Failed to bind {}: {}", bind, e))?;
let listener_addr = listener
.local_addr()
.map_err(|e| format!("Failed to inspect listener address: {}", e))?;
let runtime_client = crate::commands::build_agent_client(config).await?;
let storage = runtime_client.session_storage().clone();
let events = Arc::new(stakpak_server::EventLog::new(4096));
let idempotency = Arc::new(stakpak_server::IdempotencyStore::new(
std::time::Duration::from_secs(24 * 60 * 60),
));
let inference = Arc::new(
stakai::Inference::builder()
.with_registry(runtime_client.stakai().registry().clone())
.build()
.map_err(|e| format!("Failed to initialize inference runtime: {}", e))?,
);
let mut models = runtime_client.list_models().await;
let requested_model = options.model.clone().or(config.model.clone());
let resolved_tool_policy = resolve_server_tool_policy(
config.allowed_tools.as_ref(),
config.auto_approve.as_ref(),
options.auto_approve_all,
);
let requested_model_from_catalog = requested_model.as_deref().and_then(|name| {
if let Some((provider, id)) = name.split_once('/') {
return models
.iter()
.find(|model| model.provider == provider && model.id == id)
.cloned();
}
models.iter().find(|model| model.id == name).cloned()
});
let requested_custom_model = requested_model.as_deref().and_then(|name| {
name.split_once('/')
.map(|(provider, id)| stakai::Model::custom(id, provider))
});
let default_model = requested_model_from_catalog
.clone()
.or(requested_custom_model)
.or_else(|| models.first().cloned())
.or_else(|| Some(stakai::Model::custom("gpt-4o-mini", "openai")));
if let Some(requested) = requested_model.as_deref()
&& requested_model_from_catalog.is_none()
{
if requested.contains('/') {
eprintln!(
"⚠ Requested model '{}' is not in the catalog; using it as a custom model id.",
requested
);
} else if let Some(resolved) = default_model.as_ref() {
eprintln!(
"⚠ Requested model '{}' not found in catalog; using fallback '{}/{}'.",
requested, resolved.provider, resolved.id
);
}
}
if models.is_empty()
&& let Some(default_model) = default_model.clone()
{
models.push(default_model);
}
let mcp_allowed_tools =
mcp_allowed_tools_from_policy(&resolved_tool_policy, config.allowed_tools.as_ref());
let mcp_init_config = crate::commands::agent::run::mcp_init::McpInitConfig {
redact_secrets: true, // applied in proxy layer
privacy_mode: false, // applied in proxy layer
enabled_tools: stakpak_mcp_server::EnabledToolsConfig { slack: false },
enable_mtls: true,
enable_subagents: true,
allowed_tools: mcp_allowed_tools,
subagent_config: stakpak_mcp_server::SubagentConfig {
profile_name: Some(config.profile_name.clone()),
config_path: Some(config.config_path.clone()),
},
};
let mcp_init_result = crate::commands::agent::run::mcp_init::initialize_mcp_server_and_tools(
config,
mcp_init_config,
None,
)
.await
.map_err(|e| format!("Failed to initialize MCP stack: {}", e))?;
let mcp_tools = mcp_init_result
.mcp_tools
.iter()
.map(|tool| stakai::Tool {
tool_type: "function".to_string(),
function: stakai::ToolFunction {
name: tool.name.as_ref().to_string(),
description: tool
.description
.as_ref()
.map(|value| value.to_string())
.unwrap_or_default(),
parameters: serde_json::Value::Object((*tool.input_schema).clone()),
},
provider_options: None,
})
.collect();
// Pre-load remote skills context (fetched from rulebooks API) and capture
// startup project directory for gateway/channel sessions.
let startup_project_dir = startup_project_dir();
let startup_remote_skills = match load_remote_skills_context(&runtime_client).await {
Ok(context_files) => {
tracing::info!(
count = context_files.len(),
"Loaded remote skills context for session bootstrap"
);
context_files
}
Err(error) => {
tracing::warn!(error = %error, "Failed to load remote skills context; sessions will start without it");
Vec::new()
}
};
let app_state = stakpak_server::AppState::new(
storage,
events,
idempotency,
inference,
models,
default_model,
resolved_tool_policy.clone(),
)
.with_base_system_prompt(Some(DEFAULT_SYSTEM_PROMPT.trim().to_string()))
.with_project_dir(startup_project_dir)
.with_skills(startup_remote_skills)
.with_mcp(
mcp_init_result.client,
mcp_tools,
Some(mcp_init_result.server_shutdown_tx),
Some(mcp_init_result.proxy_shutdown_tx),
);
// --- 1b. Sandbox configuration (warden + container image) ---
let warden_path = crate::commands::warden::get_warden_plugin_path().await;
let stakpak_image = crate::commands::warden::stakpak_agent_image();
let volumes = crate::commands::warden::prepare_volumes(config, false);
// Pre-create named volumes to prevent race conditions when parallel sandboxes start
stakpak_shared::container::ensure_named_volumes_exist();
let sandbox_mode = &options.sandbox_mode;
let sandbox_config = stakpak_server::SandboxConfig {
warden_path,
image: stakpak_image.clone(),
volumes,
mode: sandbox_mode.clone(),
user_mapping: detect_host_user_mapping(),
};
tracing::info!(image = %stakpak_image, mode = %sandbox_mode, warden = %sandbox_config.warden_path, "Sandbox config initialized");
let app_state = app_state.with_sandbox(sandbox_config.clone());
// If persistent mode, spawn the sandbox now so sessions get near-zero startup overhead.
// This is a hard requirement — if the sandbox fails to start, the server cannot operate.
let app_state = if *sandbox_mode == stakpak_server::SandboxMode::Persistent {
tracing::info!("Persistent sandbox mode: spawning sandbox at startup");
let persistent = stakpak_server::PersistentSandbox::spawn(&sandbox_config)
.await
.map_err(|e| format!("Failed to spawn persistent sandbox: {e}. The server requires a healthy sandbox to operate. Check Docker is running and the image is available."))?;
app_state.with_persistent_sandbox(persistent)
} else {
app_state
};
// --- 2. Loopback connection for schedule + gateway runtimes ---
let loopback_url = loopback_server_url(listener_addr);
let loopback_token = if options.no_auth {
String::new()
} else {
generated_auth_token.clone().unwrap_or_default()
};
// --- 3. Gateway runtime ---
let config_path = AutopilotConfigFile::path();
let gateway_cli = stakpak_gateway::GatewayCliFlags {
url: Some(loopback_url.clone()),
token: Some(loopback_token.clone()),
..Default::default()
};
let mut gateway_cfg = stakpak_gateway::GatewayConfig::load(config_path.as_path(), &gateway_cli)
.unwrap_or_default();
for warning in gateway_cfg.check_deprecations() {
tracing::warn!("{}", warning);
}
apply_gateway_policy_from_resolved_tools(&mut gateway_cfg, &resolved_tool_policy);
let gateway_profile_overrides = stakpak_gateway::runtime::DispatcherProfileOverrides::new(
gateway_cfg.channels.profiles_map(),
Arc::new(ProfileRunOverrideResolver::new(config.config_path.clone())),
);
let gateway_runtime = if gateway_cfg.has_channels() {
match stakpak_gateway::Gateway::new_with_profile_overrides(
gateway_cfg,
gateway_profile_overrides,
)
.await
{
Ok(gw) => Some(Arc::new(gw)),
Err(e) => {
eprintln!(
"⚠ Failed to initialize gateway: {}. Continuing without channels.",
e
);
None
}
}
} else {
None
};
// --- Build HTTP app ---
let refresh_state = app_state.clone();
let refresh_client = runtime_client.clone();
let (refresh_shutdown_tx, mut refresh_shutdown_rx) = tokio::sync::watch::channel(false);
let refresh_task = tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
if let Err(error) = refresh_state.refresh_mcp_tools().await {
eprintln!("[mcp-refresh] {}", error);
}
match load_remote_skills_context(&refresh_client).await {
Ok(context_files) => {
refresh_state.replace_skills(context_files).await;
}
Err(error) => {
eprintln!("[context-refresh] {}", error);
}
}
}
changed = refresh_shutdown_rx.changed() => {
if changed.is_err() || *refresh_shutdown_rx.borrow() {
break;
}
}
}
}
});
let shutdown_state = app_state.clone();
let shutdown_refresh_tx = refresh_shutdown_tx.clone();
let server_model_id = app_state
.default_model
.as_ref()
.map(|m| format!("{}/{}", m.provider, m.id));
let base_app = stakpak_server::router(app_state, auth_config);
let app = if let Some(gateway_runtime) = gateway_runtime.as_ref() {
let gateway_routes = gateway_runtime.api_router();
base_app.nest_service("/v1/gateway", gateway_routes.into_service())
} else {
base_app
};
let gateway_cancel = tokio_util::sync::CancellationToken::new();
let gateway_task = if let Some(gateway_runtime) = gateway_runtime.as_ref() {
let gateway_runtime = gateway_runtime.clone();
let cancel = gateway_cancel.clone();
Some(tokio::spawn(
async move { gateway_runtime.run(cancel).await },
))
} else {
None
};
let gateway_cancel_for_shutdown = gateway_cancel.clone();
// --- 4. Schedule runtime (spawned AFTER all SQLite initialization to avoid
// sqlite3Close/sqlite3_open race in libsql on musl) ---
let watch_allowed_tools: HashSet<String> = approved_tools_from_policy(&resolved_tool_policy)
.into_iter()
.collect();
let schedule_server = crate::commands::watch::AgentServerConnection {
url: loopback_url,
token: loopback_token,
model: server_model_id,
default_allowed_tools: watch_allowed_tools,
boot_profile: config.profile_name.clone(),
config_path: config.config_path.clone(),
};
let schedule_task = tokio::spawn(async move {
if let Err(error) = crate::commands::watch::commands::run_scheduler(schedule_server).await {
eprintln!("Schedule runtime exited: {}", error);
}
});
// --- Print status ---
println!("Autopilot running in foreground. Press Ctrl+C to stop.");
println!();
println!(" Server http://{}", bind);
if let Some(ref token) = generated_auth_token {
if options.show_token {
println!(" Auth token Bearer {}", token);
} else {
println!(" Auth enabled");
}
} else if options.no_auth {
println!(" Auth disabled");
}
if gateway_runtime.is_some() {
println!(" Gateway enabled");
} else {
println!(" Gateway no channels configured");
}
println!(
" Tools {}",
describe_tool_policy(&resolved_tool_policy)
);
// --- Shutdown handler ---
let shutdown = async move {
wait_for_shutdown_signal().await;
gateway_cancel_for_shutdown.cancel();
for (session_id, run_id) in shutdown_state.run_manager.running_runs().await {
let _ = shutdown_state
.run_manager
.cancel_run(session_id, run_id)
.await;
}
let drain_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
loop {
if shutdown_state.run_manager.running_runs().await.is_empty()
|| tokio::time::Instant::now() >= drain_deadline
{
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
let _ = shutdown_refresh_tx.send(true);
if let Some(tx) = shutdown_state.mcp_server_shutdown_tx.as_ref() {
let _ = tx.send(());
}
if let Some(tx) = shutdown_state.mcp_proxy_shutdown_tx.as_ref() {
let _ = tx.send(());
}
// Kill the persistent sandbox container (if one is running)
if let Some(ref sandbox) = shutdown_state.persistent_sandbox {
sandbox.kill().await;
}
};
// --- Run server ---
let serve_result = axum::serve(listener, app)
.with_graceful_shutdown(shutdown)
.await;
// --- Cleanup ---
gateway_cancel.cancel();
if let Some(task) = gateway_task {
match task.await {
Ok(Ok(())) => {}
Ok(Err(e)) => eprintln!("Gateway runtime error: {}", e),
Err(e) => eprintln!("Gateway runtime task failed: {}", e),
}
}
let _ = refresh_shutdown_tx.send(true);
if !refresh_task.is_finished() {
refresh_task.abort();
}
let _ = refresh_task.await;
// Abort the schedule task
schedule_task.abort();
let _ = schedule_task.await;
println!();
println!("Shutting down...");
serve_result.map_err(|e| format!("Server error: {}", e))?;
Ok(())
}
fn loopback_server_url(listener_addr: std::net::SocketAddr) -> String {
let port = listener_addr.port();
if listener_addr.ip().is_ipv6() {
format!("http://[::1]:{port}")
} else {
format!("http://127.0.0.1:{port}")
}
}
fn resolve_server_tool_policy(
allowed_tools: Option<&Vec<String>>,
auto_approve_tools: Option<&Vec<String>>,
auto_approve_all: bool,
) -> stakpak_server::ToolApprovalPolicy {
if auto_approve_all {
return stakpak_server::ToolApprovalPolicy::All;
}
let mut resolved_allowed_tools: Vec<String> = match allowed_tools {
Some(tools) if tools.is_empty() => {
return stakpak_server::ToolApprovalPolicy::All;
}
Some(tools) => tools.clone(),
None => stakpak_server::SAFE_AUTOPILOT_TOOLS
.iter()
.map(|name| (*name).to_string())
.collect(),
};
resolved_allowed_tools.retain(|tool| !tool.trim().is_empty());
let mut policy = stakpak_server::ToolApprovalPolicy::Custom {
rules: std::collections::HashMap::new(),
default: stakpak_server::ToolApprovalAction::Ask,
}
.with_overrides(resolved_allowed_tools.into_iter().filter_map(|tool| {
let trimmed = tool.trim().to_string();
if trimmed.is_empty() {
None
} else {
Some((
stakpak_server::strip_tool_prefix(&trimmed).to_string(),
stakpak_server::ToolApprovalAction::Approve,
))
}
}));
if let Some(overrides) = auto_approve_tools {
policy = policy.with_overrides(overrides.iter().filter_map(|tool| {
let trimmed = tool.trim();
if trimmed.is_empty() {
None
} else {
Some((
stakpak_server::strip_tool_prefix(trimmed).to_string(),
stakpak_server::ToolApprovalAction::Approve,
))
}
}));
}
policy
}
#[derive(Debug, Clone)]
struct ProfileRunOverrideResolver {
config_path: String,
}
impl ProfileRunOverrideResolver {
fn new(config_path: String) -> Self {
Self { config_path }
}
}
impl stakpak_gateway::dispatcher::RunOverrideResolver for ProfileRunOverrideResolver {
fn resolve_run_overrides(
&self,
profile_name: &str,
) -> Option<stakpak_gateway::client::RunOverrides> {
let resolved =
resolve_profile_run_overrides(profile_name, Some(self.config_path.as_str()))?;
let overrides = stakpak_gateway::client::RunOverrides {
model: resolved.model,
auto_approve: resolved
.auto_approve
.map(stakpak_gateway::client::AutoApproveOverride::AllowList),
system_prompt: resolved.system_prompt,
max_turns: resolved.max_turns,
};
if overrides.is_empty() {
None
} else {
Some(overrides)
}
}
}
fn approved_tools_from_policy(policy: &stakpak_server::ToolApprovalPolicy) -> Vec<String> {
match policy {
stakpak_server::ToolApprovalPolicy::Custom { rules, .. } => rules
.iter()
.filter_map(|(name, action)| {
if *action == stakpak_server::ToolApprovalAction::Approve {
let trimmed = name.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
} else {
None
}
})
.collect::<BTreeSet<_>>()
.into_iter()
.collect(),
_ => Vec::new(),
}
}
fn mcp_allowed_tools_from_policy(
policy: &stakpak_server::ToolApprovalPolicy,
configured_allowed_tools: Option<&Vec<String>>,
) -> Option<Vec<String>> {
match policy {
stakpak_server::ToolApprovalPolicy::Custom { default, .. } => {
if *default == stakpak_server::ToolApprovalAction::Ask {
// Ask-by-default policies require full tool visibility so gateway/user
// can approve unlisted tools at runtime.
configured_allowed_tools.cloned()
} else {
let approved = approved_tools_from_policy(policy);
Some(expand_mcp_allowed_tools(&approved))
}
}
stakpak_server::ToolApprovalPolicy::All | stakpak_server::ToolApprovalPolicy::None => {
configured_allowed_tools.cloned()
}
}
}
fn expand_mcp_allowed_tools(tools: &[String]) -> Vec<String> {
let mut normalized = BTreeSet::new();
for tool in tools {
let trimmed = tool.trim();
if trimmed.is_empty() {
continue;
}
normalized.insert(trimmed.to_string());
if !trimmed.starts_with("stakpak__") {
normalized.insert(format!("stakpak__{trimmed}"));
}
}
normalized.into_iter().collect()
}
fn apply_gateway_policy_from_resolved_tools(
gateway_cfg: &mut stakpak_gateway::GatewayConfig,
policy: &stakpak_server::ToolApprovalPolicy,
) {
match policy {
stakpak_server::ToolApprovalPolicy::All => {
gateway_cfg.gateway.approval_mode = stakpak_gateway::ApprovalMode::AllowAll;
gateway_cfg.gateway.approval_allowlist.clear();
}
stakpak_server::ToolApprovalPolicy::Custom { .. } => {
gateway_cfg.gateway.approval_mode = stakpak_gateway::ApprovalMode::Allowlist;
gateway_cfg.gateway.approval_allowlist =
expand_gateway_approval_allowlist(&approved_tools_from_policy(policy));
}
stakpak_server::ToolApprovalPolicy::None => {}
}
}
fn describe_tool_policy(policy: &stakpak_server::ToolApprovalPolicy) -> String {
match policy {
stakpak_server::ToolApprovalPolicy::All => "all tools (auto-approve all)".to_string(),
stakpak_server::ToolApprovalPolicy::Custom { .. } => {
let count = approved_tools_from_policy(policy).len();
format!("{count} tools allowed")
}
stakpak_server::ToolApprovalPolicy::None => "no tools approved".to_string(),
}
}
fn expand_gateway_approval_allowlist(tools: &[String]) -> Vec<String> {
let mut normalized = std::collections::BTreeSet::new();
for tool in tools {
let trimmed = tool.trim();
if trimmed.is_empty() {
continue;
}
normalized.insert(trimmed.to_string());
if !trimmed.starts_with("stakpak__") {
normalized.insert(format!("stakpak__{trimmed}"));
}
}
normalized.into_iter().collect()
}
async fn stop_autopilot() -> Result<(), String> {
let mut stopped = false;
// First, gracefully stop the process via PID file. This triggers the
// shutdown handler which tears down warden + Docker containers cleanly.
// We do this BEFORE uninstalling the service to avoid launchctl/systemd
// force-killing the process tree before cleanup finishes.
if let Some(pid) = is_autopilot_running() {
#[cfg(unix)]
{
// Send SIGTERM to the autopilot process only (not the process group).
// The autopilot's graceful shutdown handler will SIGTERM warden, which
// in turn cleans up its Docker containers before exiting. Killing the
// whole process group would race warden's cleanup.
let _ = std::process::Command::new("kill")
.arg("-TERM")
.arg(pid.to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status();
}
#[cfg(windows)]
{
let _ = std::process::Command::new("taskkill")
.args(["/PID", &pid.to_string()])
.status();
}
// Wait for the process to exit — give enough time for the graceful
// shutdown handler to tear down warden + Docker containers (~10s).
for _ in 0..50 {
if !crate::commands::watch::is_process_running(pid) {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
// If still running after SIGTERM, force kill the process group
if crate::commands::watch::is_process_running(pid) {
#[cfg(unix)]
{
let _ = std::process::Command::new("kill")
.arg("-9")
.arg(format!("-{}", pid))
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status();
let _ = std::process::Command::new("kill")
.arg("-9")
.arg(pid.to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status();
}
#[cfg(windows)]
{
let _ = std::process::Command::new("taskkill")
.args(["/PID", &pid.to_string(), "/T", "/F"])
.status();
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// Clean up PID file if process is gone
if !crate::commands::watch::is_process_running(pid) {
let config = crate::commands::watch::ScheduleConfig::load_default().ok();
if let Some(config) = config {
let pid_file = config
.db_path()
.parent()
.unwrap_or(std::path::Path::new("."))
.join("autopilot.pid");
let _ = std::fs::remove_file(&pid_file);
}
}
stopped = true;
}
// Now that the process has exited cleanly, uninstall the system service.
// This is safe because the process is already gone — launchctl/systemd
// won't force-kill anything.
if autopilot_service_installed() {
// stop_autopilot_service is a no-op if the process is already gone,
// but call it for completeness (systemd may need it).
let _ = stop_autopilot_service();
uninstall_autopilot_service()?;
if !stopped {
stopped = true;
}
println!("✓ Autopilot service stopped and uninstalled");
} else if stopped {
println!("✓ Autopilot process stopped");
}
if !stopped {
println!("Autopilot is not running.");
}
Ok(())
}
async fn restart_autopilot() -> Result<(), String> {
// 1. Validate the autopilot config (server + schedules)
println!("Validating autopilot configuration...");
let autopilot_config = AutopilotConfigFile::load_or_default()?;
for schedule in &autopilot_config.schedules {
validate_schedule(schedule)?;
}
let config_path = AutopilotConfigFile::path();
let channel_count = gateway_channel_count(config_path.as_path())?;
println!(
" ✓ {} schedule(s), {} channel(s), server listen={}",
autopilot_config.schedules.len(),
channel_count,
autopilot_config.server.listen,
);
// 2. Validate the watch/scheduler config (cron parsing, check scripts, db/log paths)
match crate::commands::watch::ScheduleConfig::load_default() {
Ok(config) => {
println!(
" ✓ Scheduler config valid ({} schedules)",
config.schedules.len()
);
}
Err(e) => {
return Err(format!(
"Scheduler configuration error: {}\nFix {} and try again.",
e,
AutopilotConfigFile::path().display(),
));
}
}
// 3. Restart: service path or foreground PID
if autopilot_service_installed() {
println!("\nRestarting autopilot service...");
stop_autopilot_service()?;
// Wait for the old process to fully exit before starting the new one.
// launchctl stop is async — the process may still be shutting down.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
start_autopilot_service()?;
println!("✓ Autopilot service restarted (scheduler, server, gateway)");
} else if let Some(pid) = is_autopilot_running() {
println!("\nAutopilot is running in foreground mode (PID {}).", pid);
println!(
"Stop it with Ctrl-C or `stakpak autopilot down`, then start again with `stakpak up`."
);
println!("Configuration has been validated and will take effect on next start.");
} else {
return Err("Autopilot is not running. Start it with `stakpak up`.".to_string());
}
Ok(())
}
async fn run_schedule_command(
command: AutopilotScheduleCommands,
config: &AppConfig,
) -> Result<(), String> {
match command {
AutopilotScheduleCommands::List => list_schedules().await,
AutopilotScheduleCommands::Add {
name,
cron,
prompt,
check,
trigger_on,
// workdir,
max_steps,
channel,
profile,
pause_on_approval,
sandbox,
enabled,
} => {
let mut autopilot_config = AutopilotConfigFile::load_or_default_async().await?;
let profile = normalize_optional_string(profile);
if let Some(profile_name) = profile.as_deref() {
validate_profile_reference(profile_name, config)?;
}
let schedule = AutopilotScheduleConfig {
name: name.clone(),
cron,
prompt,
check,
trigger_on,
// workdir,
max_steps,
channel,
profile,
pause_on_approval,
sandbox,
enabled,
};
let check_path = schedule.check.clone();
add_schedule_in_config(&mut autopilot_config, schedule)?;
autopilot_config.save()?;
let signaled = signal_scheduler_reload().await;
print_schedule_mutation_feedback(&name, "added", signaled);
if let Some(path) = check_path.as_deref()
&& uses_home_tilde_prefix(path)
{
eprintln!(
"Note: check path '{}' uses '~'. It expands to the HOME of the user running autopilot; for systemd/launchd/containers, prefer an absolute path.",
path
);
}
Ok(())
}
AutopilotScheduleCommands::Remove { name } => {
let mut config = AutopilotConfigFile::load_or_default_async().await?;
remove_schedule_in_config(&mut config, &name)?;
config.save()?;
let signaled = signal_scheduler_reload().await;
print_schedule_mutation_feedback(&name, "removed", signaled);
Ok(())
}
AutopilotScheduleCommands::Enable { name } => {
let mut config = AutopilotConfigFile::load_or_default_async().await?;
set_schedule_enabled_in_config(&mut config, &name, true)?;
config.save()?;
let signaled = signal_scheduler_reload().await;
print_schedule_mutation_feedback(&name, "enabled", signaled);
Ok(())
}
AutopilotScheduleCommands::Disable { name } => {
let mut config = AutopilotConfigFile::load_or_default_async().await?;
set_schedule_enabled_in_config(&mut config, &name, false)?;
config.save()?;
let signaled = signal_scheduler_reload().await;
print_schedule_mutation_feedback(&name, "disabled", signaled);
Ok(())
}
AutopilotScheduleCommands::Trigger { name, dry_run } => {
// Validate the schedule exists in config
let config = AutopilotConfigFile::load_or_default_async().await?;
if config.find_schedule(&name).is_none() {
return Err(format!("Schedule '{}' not found", name));
}
// Delegate to the watch module's fire_schedule
match crate::commands::watch::commands::schedule::fire_schedule(&name, dry_run).await {
Ok(()) => Ok(()),
Err(e) if e.contains("not found") || e.contains("not running") => Err(format!(
"Cannot trigger '{}': autopilot is not running. Start it with: stakpak up",
name
)),
Err(e) => Err(e),
}
}
AutopilotScheduleCommands::History { name, limit } => {
crate::commands::watch::commands::history::show_history(Some(&name), Some(limit)).await
}
AutopilotScheduleCommands::Show { id } => {
crate::commands::watch::commands::history::show_run(id).await
}
AutopilotScheduleCommands::Clean { older_than_days } => {
let config = crate::commands::watch::ScheduleConfig::load_default()
.map_err(|e| format!("Failed to load watch config: {}", e))?;
let db_path = config.db_path();
let db_path_str = db_path
.to_str()
.ok_or_else(|| "Invalid database path".to_string())?;
let db = crate::commands::watch::ScheduleDb::new(db_path_str)
.await
.map_err(|e| format!("Failed to open database: {}", e))?;
// Clean stale running runs
let cleaned = db
.clean_stale_runs()
.await
.map_err(|e| format!("Failed to clean stale runs: {}", e))?;
if cleaned > 0 {
println!(
"✓ Marked {} stale run{} as failed",
cleaned,
if cleaned == 1 { "" } else { "s" }
);
} else {
println!("No stale runs found");
}
// Optionally prune old history
if let Some(days) = older_than_days {
let pruned = db
.prune_runs(days)
.await
.map_err(|e| format!("Failed to prune runs: {}", e))?;
if pruned > 0 {
println!(
"✓ Pruned {} run{} older than {} day{}",
pruned,
if pruned == 1 { "" } else { "s" },
days,
if days == 1 { "" } else { "s" }
);
} else {
println!("No runs older than {} days to prune", days);
}
}
Ok(())
}
}
}
fn require_non_empty_token(token: String, error_message: &str) -> Result<String, String> {
let trimmed = token.trim();
if trimmed.is_empty() {
return Err(error_message.to_string());
}
Ok(trimmed.to_string())
}
fn validate_profile_reference(profile_name: &str, config: &AppConfig) -> Result<(), String> {
let profile_name = profile_name.trim();
if profile_name.is_empty() {
return Err("Profile name cannot be empty".to_string());
}
if profile_name == "all" {
return Err("Profile name 'all' is reserved and cannot be selected directly".to_string());
}
let config_file = AppConfig::load_config_file(Path::new(&config.config_path))
.map_err(|error| format!("Failed to load config.toml: {error}"))?;
if config_file.profiles.contains_key(profile_name) {
return Ok(());
}
let mut available: Vec<String> = config_file
.profiles
.keys()
.filter(|name| name.as_str() != "all")
.cloned()
.collect();
available.sort();
if available.is_empty() {
return Err(format!(
"Profile '{}' not found in config.toml",
profile_name
));
}
Err(format!(
"Profile '{}' not found in config.toml. Available: {}",
profile_name,
available.join(", ")
))
}
fn add_channel_with_optional_target(
config_path: &Path,
channel_type: ChannelType,
token: Option<String>,
bot_token: Option<String>,
app_token: Option<String>,
target: Option<String>,
profile: Option<String>,
) -> Result<Option<String>, String> {
let had_target = target.is_some();
let normalized_target = target
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty());
if had_target && normalized_target.is_none() {
return Err("Target cannot be empty".to_string());
}
let normalized_profile = normalize_optional_string(profile);
let mut root = load_toml_root_table(config_path)?;
{
let channels = ensure_toml_table(&mut root, "channels");
match &channel_type {
ChannelType::Telegram => {
let raw_token = token.or_else(|| std::env::var("TELEGRAM_BOT_TOKEN").ok()).ok_or(
"Telegram token required. Use --token or set TELEGRAM_BOT_TOKEN\n\n To get a token: message @BotFather on Telegram → /newbot → copy the bot token",
)?;
let tok = require_non_empty_token(
raw_token,
"Telegram token cannot be empty. Use --token or set TELEGRAM_BOT_TOKEN",
)?;
let mut telegram = toml::value::Table::new();
telegram.insert("token".to_string(), toml::Value::String(tok));
telegram.insert("require_mention".to_string(), toml::Value::Boolean(false));
if let Some(profile_name) = normalized_profile.as_ref() {
telegram.insert(
"profile".to_string(),
toml::Value::String(profile_name.clone()),
);
}
channels.insert("telegram".to_string(), toml::Value::Table(telegram));
}
ChannelType::Discord => {
let raw_token = token.or_else(|| std::env::var("DISCORD_BOT_TOKEN").ok()).ok_or(
"Discord token required. Use --token or set DISCORD_BOT_TOKEN\n\n To get a token: https://discord.com/developers/applications → create app → Bot tab → copy token",
)?;
let tok = require_non_empty_token(
raw_token,
"Discord token cannot be empty. Use --token or set DISCORD_BOT_TOKEN",
)?;
let mut discord = toml::value::Table::new();
discord.insert("token".to_string(), toml::Value::String(tok));
discord.insert("guilds".to_string(), toml::Value::Array(Vec::new()));
if let Some(profile_name) = normalized_profile.as_ref() {
discord.insert(
"profile".to_string(),
toml::Value::String(profile_name.clone()),
);
}
channels.insert("discord".to_string(), toml::Value::Table(discord));
}
ChannelType::Slack => {
let raw_bot = bot_token.or_else(|| std::env::var("SLACK_BOT_TOKEN").ok()).ok_or(
"Slack bot token required. Use --bot-token or set SLACK_BOT_TOKEN\n\n To get tokens: https://api.slack.com/apps → create app → enable Socket Mode\n → OAuth & Permissions → Install to Workspace → copy Bot User OAuth Token (xoxb-...)",
)?;
let raw_app = app_token.or_else(|| std::env::var("SLACK_APP_TOKEN").ok()).ok_or(
"Slack app token required. Use --app-token or set SLACK_APP_TOKEN\n\n To get tokens: https://api.slack.com/apps → your app → Socket Mode\n → generate app-level token with connections:write scope (xapp-...)",
)?;
let bot = require_non_empty_token(
raw_bot,
"Slack bot token cannot be empty. Use --bot-token or set SLACK_BOT_TOKEN",
)?;
let app = require_non_empty_token(
raw_app,
"Slack app token cannot be empty. Use --app-token or set SLACK_APP_TOKEN",
)?;
let mut slack = toml::value::Table::new();
slack.insert("bot_token".to_string(), toml::Value::String(bot));
slack.insert("app_token".to_string(), toml::Value::String(app));
if let Some(profile_name) = normalized_profile.as_ref() {
slack.insert(
"profile".to_string(),
toml::Value::String(profile_name.clone()),
);
}
channels.insert("slack".to_string(), toml::Value::Table(slack));
}
_ => return Err(format!("{:?} is not supported yet", channel_type)),
}
}
if let Some(target_value) = normalized_target.as_deref() {
apply_default_notification_target(&mut root, &channel_type.to_string(), target_value)?;
}
write_toml_root_table(config_path, root)?;
Ok(normalized_target)
}
fn remove_channel(config_path: &Path, channel_type: ChannelType) -> Result<(), String> {
let mut config = stakpak_gateway::GatewayConfig::load_unvalidated(
config_path,
&stakpak_gateway::GatewayCliFlags::default(),
)
.map_err(|e| format!("Failed to load config: {e}"))?;
match &channel_type {
ChannelType::Telegram => config.channels.telegram = None,
ChannelType::Discord => config.channels.discord = None,
ChannelType::Slack => config.channels.slack = None,
_ => return Err(format!("{:?} is not supported yet", channel_type)),
}
config
.save(config_path)
.map_err(|e| format!("Failed to save config: {e}"))?;
Ok(())
}
async fn run_channel_command(
command: AutopilotChannelCommands,
config: &AppConfig,
) -> Result<(), String> {
let config_path = AutopilotConfigFile::path();
match command {
AutopilotChannelCommands::List => {
let config = load_gateway_config_allowing_no_channels(config_path.as_path())?;
let channels = config.enabled_channels();
if channels.is_empty() {
println!("No channels configured.");
println!(
" Add one: stakpak autopilot channel add slack --bot-token X --app-token Y"
);
return Ok(());
}
println!("{:<15} STATUS", "CHANNEL");
if config.channels.telegram.is_some() {
println!("{:<15} configured", "telegram");
}
if config.channels.discord.is_some() {
println!("{:<15} configured", "discord");
}
if config.channels.slack.is_some() {
println!("{:<15} configured", "slack");
}
Ok(())
}
AutopilotChannelCommands::Add {
channel_type,
token,
bot_token,
app_token,
target,
profile,
} => {
let explicit_profile = normalize_optional_string(profile);
let default_profile = normalize_optional_string(Some(config.profile_name.clone()));
let requested_profile = explicit_profile.clone().or(default_profile);
if let Some(profile_name) = requested_profile.as_deref() {
validate_profile_reference(profile_name, config)?;
if explicit_profile.is_none() {
println!(
"ℹ Using profile '{}' (override with --profile)",
profile_name
);
}
}
let saved_target = add_channel_with_optional_target(
config_path.as_path(),
channel_type.clone(),
token,
bot_token,
app_token,
target,
requested_profile,
)?;
if let Some(target_value) = saved_target {
println!(
"✓ Default notification target set for {}: {}",
channel_type, target_value
);
}
println!("✓ Channel {} added", channel_type);
Ok(())
}
AutopilotChannelCommands::Remove { channel_type } => {
remove_channel(config_path.as_path(), channel_type.clone())?;
println!("✓ Channel {} removed", channel_type);
Ok(())
}
AutopilotChannelCommands::Test => {
let config = stakpak_gateway::GatewayConfig::load(
config_path.as_path(),
&stakpak_gateway::GatewayCliFlags::default(),
)
.map_err(|e| format!("Failed to load config: {e}"))?;
let channels = stakpak_gateway::build_channels(&config)
.map_err(|e| format!("Failed to build channels: {e}"))?;
if channels.is_empty() {
return Err("No channels configured. Add one first: stakpak autopilot channel add slack --bot-token X --app-token Y".to_string());
}
for channel in channels.values() {
match channel.test().await {
Ok(result) => println!(
" ✓ {}: {} ({})",
result.channel, result.identity, result.details
),
Err(error) => println!(" ✗ {}: {}", channel.display_name(), error),
}
}
Ok(())
}
}
}
async fn list_schedules() -> Result<(), String> {
let config = AutopilotConfigFile::load_or_default_async().await?;
if config.schedules.is_empty() {
println!("No schedules configured.");
return Ok(());
}
println!(
"{:<20} {:<16} {:<10} {:<8} {:<24}",
"NAME", "CRON", "STATUS", "SANDBOX", "NEXT RUN"
);
for schedule in &config.schedules {
let next_run =
next_run_for_cron(&schedule.cron, schedule.enabled).unwrap_or_else(|| "-".to_string());
println!(
"{:<20} {:<16} {:<10} {:<8} {:<24}",
truncate_text(&schedule.name, 20),
truncate_text(&schedule.cron, 16),
if schedule.enabled {
"enabled"
} else {
"disabled"
},
if schedule.sandbox { "yes" } else { "no" },
truncate_text(&next_run, 24)
);
}
Ok(())
}
fn validate_schedule(schedule: &AutopilotScheduleConfig) -> Result<(), String> {
if schedule.name.trim().is_empty() {
return Err("Schedule name cannot be empty".to_string());
}
if schedule.name.trim() == crate::commands::watch::RELOAD_SENTINEL {
return Err(format!(
"Schedule name '{}' is reserved",
crate::commands::watch::RELOAD_SENTINEL
));
}
Cron::from_str(&schedule.cron)
.map_err(|e| format!("Invalid cron expression '{}': {}", schedule.cron, e))?;
if schedule.prompt.trim().is_empty() {
return Err("Schedule prompt cannot be empty".to_string());
}
if let Some(profile_name) = schedule.profile.as_deref()
&& profile_name.trim().is_empty()
{
return Err("Schedule profile cannot be empty".to_string());
}
if let Some(check_path) = schedule.check.as_deref() {
let expanded = crate::commands::watch::config::expand_tilde(check_path);
let expanded_str = expanded.to_string_lossy();
if uses_home_tilde_prefix(check_path) && uses_home_tilde_prefix(&expanded_str) {
return Err(format!(
"Cannot expand '~' in check script path for schedule '{}': {}. Home directory for the running user could not be determined; use an absolute path.",
schedule.name, check_path
));
}
if !expanded.exists() {
return Err(format!(
"Check script not found for schedule '{}': {}",
schedule.name, check_path
));
}
if !expanded.is_file() {
return Err(format!(
"Check script path is not a file for schedule '{}': {}",
schedule.name, check_path
));
}
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let metadata = std::fs::metadata(&expanded).map_err(|e| {
format!(
"Cannot read check script metadata for schedule '{}': {}",
schedule.name, e
)
})?;
let permissions = metadata.permissions();
if permissions.mode() & 0o111 == 0 {
return Err(format!(
"Check script is not executable for schedule '{}': {}",
schedule.name, check_path
));
}
}
}
Ok(())
}
fn uses_home_tilde_prefix(path: &str) -> bool {
path == "~" || path.starts_with("~/") || path.starts_with("~\\")
}
fn add_schedule_in_config(
config: &mut AutopilotConfigFile,
schedule: AutopilotScheduleConfig,
) -> Result<(), String> {
validate_schedule(&schedule)?;
if config.find_schedule(&schedule.name).is_some() {
return Err(format!("Schedule '{}' already exists", schedule.name));
}
config.schedules.push(schedule);
Ok(())
}
fn remove_schedule_in_config(config: &mut AutopilotConfigFile, name: &str) -> Result<(), String> {
let initial_len = config.schedules.len();
config.schedules.retain(|schedule| schedule.name != name);
if config.schedules.len() == initial_len {
return Err(format!("Schedule '{}' not found", name));
}
Ok(())
}
fn set_schedule_enabled_in_config(
config: &mut AutopilotConfigFile,
name: &str,
enabled: bool,
) -> Result<(), String> {
let schedule = config
.find_schedule_mut(name)
.ok_or_else(|| format!("Schedule '{}' not found", name))?;
schedule.enabled = enabled;
Ok(())
}
fn print_schedule_mutation_feedback(name: &str, action: &str, signaled: bool) {
if is_autopilot_running().is_some() {
if signaled {
println!("✓ Schedule '{}' {} (takes effect within ~1s)", name, action);
} else {
println!("✓ Schedule '{}' {} (takes effect within ~5s)", name, action);
}
} else {
println!(
"✓ Schedule '{}' {} (takes effect when autopilot starts)",
name, action
);
}
}
async fn signal_scheduler_reload() -> bool {
let db_path = match autopilot_db_path() {
Ok(path) => path,
Err(_) => return false,
};
let db = match crate::commands::watch::ScheduleDb::new(&db_path).await {
Ok(db) => db,
Err(_) => return false,
};
db.request_config_reload().await.is_ok()
}
fn autopilot_db_path() -> Result<String, String> {
let config = crate::commands::watch::ScheduleConfig::load_default()
.map_err(|error| format!("Failed to load watch config: {}", error))?;
let db_path = config.db_path();
db_path
.to_str()
.map(|value| value.to_string())
.ok_or_else(|| "Invalid db path".to_string())
}
#[derive(Debug, Clone)]
struct NotificationDefaults {
channel: String,
chat_id: Option<String>,
}
fn load_notification_defaults(path: &Path) -> Result<NotificationDefaults, String> {
let root = load_toml_root_table(path)?;
let notifications = root
.get("notifications")
.and_then(toml::Value::as_table)
.ok_or_else(|| "Notifications are not configured".to_string())?;
let channel = notifications
.get("channel")
.and_then(toml::Value::as_str)
.ok_or_else(|| "Notifications channel is not configured".to_string())?
.to_string();
let chat_id = notifications
.get("chat_id")
.and_then(toml::Value::as_str)
.map(str::to_string);
Ok(NotificationDefaults { channel, chat_id })
}
fn resolve_default_gateway_url(root: &toml::value::Table) -> String {
root.get("server")
.and_then(toml::Value::as_table)
.and_then(|server| server.get("listen"))
.and_then(toml::Value::as_str)
.map(loopback_base_url_from_bind)
.unwrap_or_else(|| "http://127.0.0.1:4096".to_string())
}
fn apply_default_notification_target(
root: &mut toml::value::Table,
channel: &str,
target: &str,
) -> Result<(), String> {
if channel.trim().is_empty() {
return Err("Channel cannot be empty".to_string());
}
if target.trim().is_empty() {
return Err("Target cannot be empty".to_string());
}
let default_gateway_url = resolve_default_gateway_url(root);
let notifications = ensure_toml_table(root, "notifications");
if !notifications.contains_key("gateway_url") {
notifications.insert(
"gateway_url".to_string(),
toml::Value::String(default_gateway_url),
);
}
notifications.insert(
"channel".to_string(),
toml::Value::String(channel.trim().to_string()),
);
notifications.insert(
"chat_id".to_string(),
toml::Value::String(target.trim().to_string()),
);
Ok(())
}
#[cfg(test)]
fn set_default_notification_target(path: &Path, channel: &str, target: &str) -> Result<(), String> {
let mut root = load_toml_root_table(path)?;
apply_default_notification_target(&mut root, channel, target)?;
write_toml_root_table(path, root)
}
fn load_gateway_config_allowing_no_channels(
config_path: &Path,
) -> Result<stakpak_gateway::GatewayConfig, String> {
let cli_flags = stakpak_gateway::GatewayCliFlags::default();
let config = stakpak_gateway::GatewayConfig::load_unvalidated(config_path, &cli_flags)
.map_err(|e| format!("Failed to load channel config: {e}"))?;
match config.validate_with_error() {
Ok(()) | Err(stakpak_gateway::config::GatewayConfigValidationError::MissingChannels) => {
Ok(config)
}
Err(error) => Err(format!("Channel config invalid: {error}")),
}
}
fn gateway_channel_count(config_path: &Path) -> Result<usize, String> {
let config = load_gateway_config_allowing_no_channels(config_path)?;
Ok(config.enabled_channels().len())
}
async fn status_autopilot(
config: &AppConfig,
json: bool,
recent_runs: Option<u32>,
) -> Result<(), String> {
let autopilot_config = AutopilotConfigFile::load_or_default_async().await?;
let server_config = autopilot_config.server.clone();
let resolved_tool_policy = resolve_server_tool_policy(
config.allowed_tools.as_ref(),
config.auto_approve.as_ref(),
server_config.auto_approve_all,
);
let server_allowed_tool_count = approved_tools_from_policy(&resolved_tool_policy).len();
let config_path = AutopilotConfigFile::path();
let base_url = loopback_base_url_from_bind(&server_config.listen);
let probe_client = build_probe_http_client();
let schedules = build_schedule_statuses(&autopilot_config.schedules);
let gateway_config = load_gateway_config_allowing_no_channels(config_path.as_path())?;
let notification_defaults = load_notification_defaults(config_path.as_path()).ok();
let channels = build_channel_statuses(&gateway_config, notification_defaults.as_ref());
let service_path = autopilot_service_path();
let service = ServiceStatusJson {
installed: autopilot_service_installed(),
active: autopilot_service_active(),
path: service_path.display().to_string(),
};
let server_url = format!("{}/v1/health", base_url);
let (server_reachable, sandbox_health) = if let Some(client) = probe_client.as_ref() {
match client.get(&server_url).send().await {
Ok(resp) if resp.status().is_success() => {
// Try to parse sandbox health from the response body
let sandbox = resp
.json::<serde_json::Value>()
.await
.ok()
.and_then(|v| v.get("sandbox").cloned())
.and_then(|s| {
Some(SandboxStatusJson {
mode: s.get("mode")?.as_str()?.to_string(),
healthy: s.get("healthy")?.as_bool(),
consecutive_ok: s.get("consecutive_ok")?.as_u64(),
consecutive_failures: s.get("consecutive_failures")?.as_u64(),
last_ok: s.get("last_ok").and_then(|v| v.as_str()).map(String::from),
last_error: s
.get("last_error")
.and_then(|v| v.as_str())
.map(String::from),
})
});
(true, sandbox)
}
_ => (false, None),
}
} else {
(false, None)
};
// Build sandbox status: use live health data if available, otherwise fall back to config
let sandbox = sandbox_health.unwrap_or(SandboxStatusJson {
mode: server_config.sandbox_mode.to_string(),
healthy: None,
consecutive_ok: None,
consecutive_failures: None,
last_ok: None,
last_error: if server_reachable {
None
} else {
Some("Server unreachable — cannot determine sandbox health".to_string())
},
});
let server = EndpointStatusJson {
expected_enabled: true,
reachable: server_reachable,
url: server_url,
};
let gateway_url = format!("{}/v1/gateway/status", base_url);
let gateway_reachable = if let Some(client) = probe_client.as_ref() {
endpoint_ok(client, &gateway_url).await
} else {
false
};
let gateway = EndpointStatusJson {
expected_enabled: true,
reachable: gateway_reachable,
url: gateway_url,
};
let scheduler = collect_scheduler_status(recent_runs).await;
if json {
print_json(&AutopilotStatusJson {
command: "autopilot.status",
ok: true,
profile: config.profile_name.clone(),
config_path: config_path.display().to_string(),
server_config: server_config.clone(),
server_allowed_tool_count,
service,
server,
gateway,
sandbox,
scheduler,
schedules,
channels,
})?;
return Ok(());
}
println!("Autopilot status");
println!();
println!(" Profile {}", config.profile_name);
println!(" Config {}", config_path.display());
println!(
" Service {}",
if service.installed {
if service.active {
"active"
} else {
"installed (inactive)"
}
} else {
"not installed"
}
);
println!(
" Server {}",
if server.reachable {
format!("✓ reachable ({})", server.url)
} else {
format!("✗ unreachable ({})", server.url)
}
);
println!(
" Channels {}",
if gateway.reachable {
format!("✓ reachable ({})", gateway.url)
} else {
format!("✗ unreachable ({})", gateway.url)
}
);
println!(
" Tools {}",
describe_tool_policy(&resolved_tool_policy)
);
// Sandbox status
let sandbox_display = match (sandbox.healthy, sandbox.mode.as_str()) {
(Some(true), mode) => format!("✓ healthy ({mode})"),
(Some(false), mode) => {
let err = sandbox.last_error.as_deref().unwrap_or("unknown error");
format!("✗ unhealthy ({mode}) — {err}")
}
(None, mode) if server_reachable => format!("- {mode} (no health data)"),
(None, mode) => format!("- {mode} (server unreachable)"),
};
println!(" Sandbox {sandbox_display}");
// Scheduler status
let config_exists = AutopilotConfigFile::path().exists();
if !config_exists {
println!(" Scheduler not configured (run: stakpak up)");
} else if scheduler.config_valid {
let sched_state = if scheduler.running {
format!("✓ running (pid {})", scheduler.pid.unwrap_or_default())
} else if scheduler.stale_pid {
format!("⚠ stale (pid {})", scheduler.pid.unwrap_or_default())
} else {
"stopped".to_string()
};
println!(
" Scheduler {} — {} schedules",
sched_state, scheduler.trigger_count
);
} else {
println!(
" Scheduler ✗ config error: {}",
scheduler.error.as_deref().unwrap_or("unknown")
);
}
// Schedules table
if !schedules.is_empty() {
println!();
println!(" Schedules:");
println!(
" {:<20} {:<16} {:<10} {:<8} {:<20}",
"NAME", "CRON", "STATUS", "SANDBOX", "NEXT RUN"
);
for schedule in &schedules {
println!(
" {:<20} {:<16} {:<10} {:<8} {:<20}",
truncate_text(&schedule.name, 20),
truncate_text(&schedule.cron, 16),
if schedule.enabled {
"enabled"
} else {
"disabled"
},
if schedule.sandbox { "yes" } else { "no" },
schedule.next_run.as_deref().unwrap_or("-")
);
}
}
// Channels table
if !channels.is_empty() {
println!();
println!(" Channels:");
println!(
" {:<20} {:<10} {:<24} {:<10}",
"NAME", "TYPE", "TARGET", "STATUS"
);
for channel in &channels {
println!(
" {:<20} {:<10} {:<24} {:<10}",
truncate_text(&channel.name, 20),
truncate_text(&channel.channel_type, 10),
truncate_text(&channel.target, 24),
if channel.enabled {
"enabled"
} else {
"disabled"
}
);
}
}
// Recent runs
if !scheduler.recent_runs.is_empty() {
println!();
println!(" Recent runs:");
for run in &scheduler.recent_runs {
println!(
" #{} {:<16} {:<10} {}",
run.id, run.schedule_name, run.status, run.started_at
);
}
}
Ok(())
}
fn tail_log_files(files: &[PathBuf], follow: bool, lines: Option<u32>) -> Result<(), String> {
let mut cmd = std::process::Command::new("tail");
if follow {
cmd.arg("-f");
}
if let Some(n) = lines {
cmd.arg("-n").arg(n.to_string());
}
for file in files {
cmd.arg(file);
}
let status = cmd
.status()
.map_err(|e| format!("Failed to read autopilot logs: {}", e))?;
if !status.success() {
return Err("Failed to read autopilot logs".to_string());
}
Ok(())
}
async fn logs_autopilot(
follow: bool,
lines: Option<u32>,
component: Option<String>,
) -> Result<(), String> {
let log_dir = autopilot_log_dir();
// Resolve which log files to show
let log_files: Vec<PathBuf> = if let Some(ref name) = component {
let file = log_dir.join(format!("{}.log", name));
if !file.exists() {
return Err(format!(
"Component log file not found: {}\nAutopilot may not have run yet.",
file.display()
));
}
vec![file]
} else {
// Show all component logs plus legacy stdout/stderr
[
"scheduler.log",
"server.log",
"gateway.log",
"stdout.log",
"stderr.log",
]
.iter()
.map(|f| log_dir.join(f))
.filter(|p| p.exists())
.collect()
};
if log_files.is_empty() {
return Err(format!(
"No log files found in {}.\nAutopilot may not have run yet.",
log_dir.display()
));
}
match detect_platform() {
Platform::Linux => {
// If a component filter is set, use tail on the specific file instead of journalctl
if component.is_some() {
tail_log_files(&log_files, follow, lines)?;
} else {
let mut cmd = std::process::Command::new("journalctl");
cmd.args(["--user", "-u", AUTOPILOT_SYSTEMD_SERVICE]);
if follow {
cmd.arg("-f");
}
if let Some(lines) = lines {
cmd.arg("-n").arg(lines.to_string());
}
let status = cmd
.status()
.map_err(|e| format!("Failed to run journalctl: {}", e))?;
if !status.success() {
return Err("Failed to read autopilot logs from journalctl".to_string());
}
}
}
Platform::MacOS => {
tail_log_files(&log_files, follow, lines)?;
}
Platform::Windows | Platform::Unknown => {
return Err(
"Autopilot logs are currently supported on Linux (journalctl) and macOS (tail)."
.to_string(),
);
}
}
Ok(())
}
async fn doctor_autopilot(config: &AppConfig) -> Result<(), String> {
println!("Autopilot doctor");
let mut failures = 0usize;
let autopilot_config = match AutopilotConfigFile::load_or_default() {
Ok(cfg) => {
println!("✓ Autopilot config loaded (listen={})", cfg.server.listen);
cfg
}
Err(e) => {
failures += 1;
println!("✗ Autopilot config invalid: {}", e);
AutopilotConfigFile::default()
}
};
let _ = &autopilot_config;
let base_url = loopback_base_url_from_bind(&autopilot_config.server.listen);
let server_health_url = format!("{}/v1/health", base_url);
let probe_client = build_probe_http_client();
let server_reachable = if let Some(client) = probe_client.as_ref() {
endpoint_ok(client, &server_health_url).await
} else {
false
};
let env = RealProbeEnvironment;
let probe_ctx = AutopilotProbeContext {
app_config: config,
bind_addr: Some(&autopilot_config.server.listen),
server_reachable,
};
let probe_results = run_autopilot_probes(ProbeMode::Doctor, &probe_ctx, &env);
presenter::print_probe_report("Deployment readiness", &probe_results);
failures += summarize_probe_results(&probe_results).blocking_failures;
let gateway_path = AutopilotConfigFile::path();
match load_gateway_config_allowing_no_channels(gateway_path.as_path()) {
Ok(cfg) => {
let channels = cfg.enabled_channels();
if channels.is_empty() {
println!("✓ No channels configured (add with: stakpak autopilot channel add)");
} else {
println!("✓ Channel config valid (channels: {})", channels.join(", "));
}
for warning in cfg.check_deprecations() {
println!("⚠ {}", warning);
}
}
Err(e) => {
failures += 1;
println!("✗ Channel config invalid: {}", e);
}
}
let scheduler_status = collect_scheduler_status(None).await;
if scheduler_status.config_valid {
if scheduler_status.trigger_count == 0 {
println!("✓ No schedules configured (edit ~/.stakpak/autopilot.toml to add)");
} else {
println!(
"✓ Schedule config valid ({} schedules)",
scheduler_status.trigger_count
);
}
} else {
failures += 1;
println!(
"✗ Schedule config invalid: {}",
scheduler_status
.error
.unwrap_or_else(|| "unknown configuration error".to_string())
);
}
if autopilot_service_installed() {
println!("✓ Autopilot service installed");
} else {
failures += 1;
println!("✗ Autopilot service not installed");
}
if server_reachable {
println!("✓ Server health endpoint reachable");
} else {
println!("⚠ Server health endpoint not reachable (not running is OK before start)");
}
let resolved_tool_policy = resolve_server_tool_policy(
config.allowed_tools.as_ref(),
config.auto_approve.as_ref(),
autopilot_config.server.auto_approve_all,
);
println!();
println!("Security:");
match &resolved_tool_policy {
stakpak_server::ToolApprovalPolicy::All => {
println!(" ⚠ all tools allowed (auto_approve_all = true)");
}
_ => {
println!(" ✓ {}", describe_tool_policy(&resolved_tool_policy));
}
}
if failures > 0 {
return Err(format!("Doctor found {} blocking issue(s)", failures));
}
println!("✓ Doctor checks passed");
Ok(())
}
fn print_json<T: Serialize>(value: &T) -> Result<(), String> {
let json = serde_json::to_string(value)
.map_err(|e| format!("Failed to serialize JSON output: {}", e))?;
println!("{}", json);
Ok(())
}
async fn write_default_autopilot_config(path: &Path, force: bool) -> Result<(), String> {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent)
.await
.map_err(|e| format!("Failed to create autopilot config directory: {}", e))?;
}
if force || !path.exists() {
let default_config = AutopilotConfigFile::default();
let content = toml::to_string_pretty(&default_config)
.map_err(|e| format!("Failed to serialize default autopilot config: {}", e))?;
let header = "# Stakpak Autopilot Configuration\n\
# Add schedules: stakpak autopilot schedule add <name> --cron '...' --prompt '...'\n\
# Add channels: stakpak autopilot channel add slack --bot-token X --app-token Y\n\
# Start: stakpak up\n\n";
tokio::fs::write(path, format!("{}{}", header, content))
.await
.map_err(|e| format!("Failed to write autopilot config: {}", e))?;
}
Ok(())
}
fn loopback_base_url_from_bind(bind: &str) -> String {
match bind.parse::<SocketAddr>() {
Ok(addr) => {
let port = addr.port();
match addr.ip() {
IpAddr::V4(ip) => {
if ip.is_unspecified() {
format!("http://{}:{}", Ipv4Addr::LOCALHOST, port)
} else {
format!("http://{}:{}", ip, port)
}
}
IpAddr::V6(ip) => {
if ip.is_unspecified() {
format!("http://[{}]:{}", Ipv6Addr::LOCALHOST, port)
} else {
format!("http://[{}]:{}", ip, port)
}
}
}
}
Err(_) => "http://127.0.0.1:4096".to_string(),
}
}
async fn collect_scheduler_status(recent_runs: Option<u32>) -> SchedulerStatusJson {
let config_path = AutopilotConfigFile::path();
let schedule_count = AutopilotConfigFile::load_or_default()
.map(|c| c.schedules.len())
.unwrap_or(0);
let config_valid = config_path.exists();
// Watch runtime uses ~/.stakpak/autopilot/autopilot.db regardless of config format
let db_path = dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(".stakpak/autopilot/autopilot.db");
let db_path_str = db_path.to_string_lossy().to_string();
let db = match db_path.to_str() {
Some(path) => match crate::commands::watch::ScheduleDb::new(path).await {
Ok(db) => db,
Err(error) => {
return SchedulerStatusJson {
expected_enabled: true,
config_path: config_path.display().to_string(),
config_valid,
trigger_count: schedule_count,
running: false,
pid: None,
stale_pid: false,
db_path: Some(db_path_str),
error: Some(error.to_string()),
recent_runs: Vec::new(),
};
}
},
None => {
return SchedulerStatusJson {
expected_enabled: true,
config_path: config_path.display().to_string(),
config_valid,
trigger_count: schedule_count,
running: false,
pid: None,
stale_pid: false,
db_path: Some(db_path_str),
error: Some("Invalid scheduler database path".to_string()),
recent_runs: Vec::new(),
};
}
};
let scheduler_state = db.get_autopilot_state().await.ok().flatten();
let (running, stale_pid, pid) = if let Some(state) = scheduler_state {
let pid = state.pid;
let running = u32::try_from(pid)
.ok()
.map(crate::commands::watch::is_process_running)
.unwrap_or(false);
(running, !running, Some(pid))
} else {
(false, false, None)
};
let recent_runs = if let Some(limit) = recent_runs.filter(|limit| *limit > 0) {
match db
.list_runs(&crate::commands::watch::ListRunsFilter {
schedule_name: None,
status: None,
limit: Some(limit),
offset: None,
})
.await
{
Ok(runs) => runs
.into_iter()
.map(|run| ScheduleRunSummaryJson {
id: run.id,
schedule_name: run.schedule_name,
status: run.status.to_string(),
started_at: run.started_at.to_rfc3339(),
finished_at: run.finished_at.map(|value| value.to_rfc3339()),
error_message: run.error_message,
})
.collect(),
Err(_) => Vec::new(),
}
} else {
Vec::new()
};
SchedulerStatusJson {
expected_enabled: true,
config_path: config_path.display().to_string(),
config_valid,
trigger_count: schedule_count,
running,
pid,
stale_pid,
db_path: Some(db_path_str),
error: None,
recent_runs,
}
}
fn build_schedule_statuses(
schedules: &[AutopilotScheduleConfig],
) -> Vec<AutopilotScheduleStatusJson> {
schedules
.iter()
.map(|schedule| AutopilotScheduleStatusJson {
name: schedule.name.clone(),
cron: schedule.cron.clone(),
enabled: schedule.enabled,
sandbox: schedule.sandbox,
next_run: next_run_for_cron(&schedule.cron, schedule.enabled),
})
.collect()
}
fn build_channel_statuses(
gateway_config: &stakpak_gateway::GatewayConfig,
notification_defaults: Option<&NotificationDefaults>,
) -> Vec<AutopilotChannelStatusJson> {
let mut channels = Vec::new();
if gateway_config.channels.telegram.is_some() {
channels.push(build_single_channel_status(
"telegram",
notification_defaults,
));
}
if gateway_config.channels.discord.is_some() {
channels.push(build_single_channel_status(
"discord",
notification_defaults,
));
}
if gateway_config.channels.slack.is_some() {
channels.push(build_single_channel_status("slack", notification_defaults));
}
channels
}
fn build_single_channel_status(
channel_name: &str,
notification_defaults: Option<&NotificationDefaults>,
) -> AutopilotChannelStatusJson {
let target = notification_defaults
.filter(|defaults| defaults.channel == channel_name)
.and_then(|defaults| defaults.chat_id.clone())
.unwrap_or_else(|| "-".to_string());
AutopilotChannelStatusJson {
name: channel_name.to_string(),
channel_type: channel_name.to_string(),
target,
enabled: true,
alerts_only: false,
}
}
fn next_run_for_cron(cron: &str, enabled: bool) -> Option<String> {
if !enabled {
return None;
}
let expression = Cron::from_str(cron).ok()?;
let next = expression.find_next_occurrence(&Utc::now(), false).ok()?;
Some(next.format("%Y-%m-%d %H:%M").to_string())
}
fn truncate_text(value: &str, max_chars: usize) -> String {
if value.chars().count() <= max_chars {
return value.to_string();
}
let mut truncated = value
.chars()
.take(max_chars.saturating_sub(3))
.collect::<String>();
truncated.push_str("...");
truncated
}
#[cfg(test)]
fn bounded_history_limit(limit: u32) -> u32 {
limit.clamp(1, 1000)
}
/// Ensure the sandbox container image is available locally before starting the
/// autopilot service. If the image isn't cached, runs `docker pull` with
/// inherited stdout/stderr so the user sees Docker's native progress bars
/// (layer downloads, extraction, etc.) instead of a silent wait.
fn ensure_sandbox_image_available() -> Result<(), String> {
let image = crate::commands::warden::stakpak_agent_image();
if stakpak_shared::container::warden_image_exists_locally(&image) {
return Ok(());
}
println!(" Pulling sandbox image: {image}");
println!();
stakpak_shared::container::pull_warden_image(&image).map_err(|e| {
format!(
"{e}\n\n\
Troubleshoot:\n \
docker pull --platform linux/amd64 {image} Pull manually\n \
STAKPAK_AGENT_IMAGE=<img> Override image"
)
})?;
println!();
println!(" ✓ Sandbox image ready");
Ok(())
}
fn build_probe_http_client() -> Option<reqwest::Client> {
reqwest::Client::builder()
.connect_timeout(std::time::Duration::from_secs(2))
.timeout(std::time::Duration::from_secs(3))
.build()
.ok()
}
async fn endpoint_ok(client: &reqwest::Client, url: &str) -> bool {
match client.get(url).send().await {
Ok(resp) => resp.status().is_success(),
Err(_) => false,
}
}
async fn wait_for_shutdown_signal() {
let ctrl_c = async {
let _ = tokio::signal::ctrl_c().await;
};
#[cfg(unix)]
let terminate = async {
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
Ok(mut sig) => {
sig.recv().await;
}
Err(_) => {
std::future::pending::<()>().await;
}
}
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {}
_ = terminate => {}
}
}
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Platform {
MacOS,
Linux,
Windows,
Unknown,
}
fn detect_platform() -> Platform {
#[cfg(target_os = "macos")]
{
return Platform::MacOS;
}
#[cfg(target_os = "linux")]
{
return Platform::Linux;
}
#[cfg(target_os = "windows")]
{
return Platform::Windows;
}
#[allow(unreachable_code)]
Platform::Unknown
}
const AUTOPILOT_SYSTEMD_SERVICE: &str = "stakpak-autopilot";
const AUTOPILOT_LAUNCHD_LABEL: &str = "dev.stakpak.autopilot";
fn autopilot_log_dir() -> PathBuf {
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(".stakpak")
.join("autopilot")
.join("logs")
}
fn autopilot_service_path() -> PathBuf {
match detect_platform() {
Platform::Linux => dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(".config")
.join("systemd")
.join("user")
.join(format!("{}.service", AUTOPILOT_SYSTEMD_SERVICE)),
Platform::MacOS => dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join("Library")
.join("LaunchAgents")
.join(format!("{}.plist", AUTOPILOT_LAUNCHD_LABEL)),
Platform::Windows | Platform::Unknown => PathBuf::new(),
}
}
pub(crate) fn autopilot_service_installed() -> bool {
let path = autopilot_service_path();
!path.as_os_str().is_empty() && path.exists()
}
/// Check if the autopilot process is currently running via PID file + process check.
pub(crate) fn is_autopilot_running() -> Option<u32> {
let config = crate::commands::watch::ScheduleConfig::load_default().ok()?;
let pid_file = config
.db_path()
.parent()
.unwrap_or(std::path::Path::new("."))
.join("autopilot.pid");
let pid_str = std::fs::read_to_string(&pid_file).ok()?;
let pid: u32 = pid_str.trim().parse().ok()?;
if crate::commands::watch::is_process_running(pid) {
Some(pid)
} else {
// Stale PID file — clean it up
let _ = std::fs::remove_file(&pid_file);
None
}
}
fn install_autopilot_service(config: &AppConfig) -> Result<(), String> {
match detect_platform() {
Platform::Linux => install_systemd_service(config),
Platform::MacOS => install_launchd_service(config),
Platform::Windows => Err("Windows autopilot service is not yet supported".to_string()),
Platform::Unknown => Err("Unsupported platform for autopilot service".to_string()),
}
}
fn uninstall_autopilot_service() -> Result<(), String> {
match detect_platform() {
Platform::Linux => uninstall_systemd_service(),
Platform::MacOS => uninstall_launchd_service(),
Platform::Windows => Err("Windows autopilot service is not yet supported".to_string()),
Platform::Unknown => Err("Unsupported platform for autopilot service".to_string()),
}
}
pub(crate) fn start_autopilot_service() -> Result<(), String> {
match detect_platform() {
Platform::Linux => {
run_command(
"systemctl",
&["--user", "daemon-reload"],
"Failed to reload systemd",
)?;
run_command(
"systemctl",
&["--user", "start", AUTOPILOT_SYSTEMD_SERVICE],
"Failed to start autopilot service",
)
}
Platform::MacOS => {
let plist = autopilot_service_path();
let load_output = std::process::Command::new("launchctl")
.args(["load", plist.to_string_lossy().as_ref()])
.output()
.map_err(|e| format!("Failed to load launchd service: {}", e))?;
if !load_output.status.success() {
let stderr = String::from_utf8_lossy(&load_output.stderr);
if !stderr.to_ascii_lowercase().contains("already loaded") {
return Err(format!("Failed to load launchd service: {}", stderr));
}
}
run_command(
"launchctl",
&["start", AUTOPILOT_LAUNCHD_LABEL],
"Failed to start launchd service",
)
}
Platform::Windows => Err("Windows autopilot service is not yet supported".to_string()),
Platform::Unknown => Err("Unsupported platform for autopilot service".to_string()),
}
}
pub(crate) fn stop_autopilot_service() -> Result<(), String> {
match detect_platform() {
Platform::Linux => run_command(
"systemctl",
&["--user", "stop", AUTOPILOT_SYSTEMD_SERVICE],
"Failed to stop autopilot service",
),
Platform::MacOS => {
let output = std::process::Command::new("launchctl")
.args(["stop", AUTOPILOT_LAUNCHD_LABEL])
.output()
.map_err(|e| format!("Failed to stop launchd service: {}", e))?;
if output.status.success() {
Ok(())
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
if stderr
.to_ascii_lowercase()
.contains("could not find service")
{
Ok(())
} else {
Err(format!("Failed to stop launchd service: {}", stderr))
}
}
}
Platform::Windows => Err("Windows autopilot service is not yet supported".to_string()),
Platform::Unknown => Err("Unsupported platform for autopilot service".to_string()),
}
}
pub(crate) fn autopilot_service_active() -> bool {
match detect_platform() {
Platform::Linux => std::process::Command::new("systemctl")
.args(["--user", "is-active", "--quiet", AUTOPILOT_SYSTEMD_SERVICE])
.status()
.map(|status| status.success())
.unwrap_or(false),
Platform::MacOS => std::process::Command::new("launchctl")
.args(["list", AUTOPILOT_LAUNCHD_LABEL])
.output()
.map(|output| output.status.success())
.unwrap_or(false),
Platform::Windows | Platform::Unknown => false,
}
}
fn install_systemd_service(config: &AppConfig) -> Result<(), String> {
let binary = std::env::current_exe()
.map_err(|e| format!("Failed to resolve stakpak binary path: {}", e))?;
let service_path = autopilot_service_path();
if let Some(parent) = service_path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("Failed to create systemd directory: {}", e))?;
}
let log_dir = autopilot_log_dir();
std::fs::create_dir_all(&log_dir)
.map_err(|e| format!("Failed to create autopilot log directory: {}", e))?;
let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
let mut exec_parts = vec![binary.display().to_string()];
if !config.profile_name.is_empty() {
exec_parts.push("--profile".to_string());
exec_parts.push(config.profile_name.clone());
}
if !config.config_path.is_empty() {
exec_parts.push("--config".to_string());
exec_parts.push(config.config_path.clone());
}
exec_parts.extend([
"autopilot".to_string(),
"up".to_string(),
"--foreground".to_string(),
"--from-service".to_string(),
]);
let unit = format!(
"[Unit]\nDescription=Stakpak Autopilot Runtime\nAfter=network.target\n\n[Service]\nType=simple\nExecStart={}\nRestart=on-failure\nRestartSec=5\nWorkingDirectory={}\nEnvironment=HOME={}\nEnvironment=PATH=/usr/local/bin:/usr/bin:/bin\nStandardOutput=append:{}/stdout.log\nStandardError=append:{}/stderr.log\nNoNewPrivileges=true\n\n[Install]\nWantedBy=default.target\n",
shell_join(&exec_parts),
home.display(),
home.display(),
log_dir.display(),
log_dir.display(),
);
std::fs::write(&service_path, unit)
.map_err(|e| format!("Failed to write systemd service file: {}", e))?;
run_command(
"systemctl",
&["--user", "daemon-reload"],
"Failed to reload systemd",
)?;
run_command(
"systemctl",
&["--user", "enable", AUTOPILOT_SYSTEMD_SERVICE],
"Failed to enable autopilot service",
)?;
Ok(())
}
fn uninstall_systemd_service() -> Result<(), String> {
let service_path = autopilot_service_path();
let _ = std::process::Command::new("systemctl")
.args(["--user", "stop", AUTOPILOT_SYSTEMD_SERVICE])
.status();
let _ = std::process::Command::new("systemctl")
.args(["--user", "disable", AUTOPILOT_SYSTEMD_SERVICE])
.status();
if service_path.exists() {
std::fs::remove_file(&service_path)
.map_err(|e| format!("Failed to remove systemd service file: {}", e))?;
}
run_command(
"systemctl",
&["--user", "daemon-reload"],
"Failed to reload systemd",
)?;
Ok(())
}
fn install_launchd_service(config: &AppConfig) -> Result<(), String> {
let binary = std::env::current_exe()
.map_err(|e| format!("Failed to resolve stakpak binary path: {}", e))?;
let plist_path = autopilot_service_path();
if let Some(parent) = plist_path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("Failed to create LaunchAgents directory: {}", e))?;
}
let log_dir = autopilot_log_dir();
std::fs::create_dir_all(&log_dir)
.map_err(|e| format!("Failed to create autopilot log directory: {}", e))?;
let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
let mut args = Vec::new();
if !config.profile_name.is_empty() {
args.push("<string>--profile</string>".to_string());
args.push(format!(
"<string>{}</string>",
xml_escape(&config.profile_name)
));
}
if !config.config_path.is_empty() {
args.push("<string>--config</string>".to_string());
args.push(format!(
"<string>{}</string>",
xml_escape(&config.config_path)
));
}
args.extend([
"<string>autopilot</string>".to_string(),
"<string>up</string>".to_string(),
"<string>--foreground</string>".to_string(),
"<string>--from-service</string>".to_string(),
]);
let plist = format!(
r#"<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>{}</string>
<key>ProgramArguments</key>
<array>
<string>{}</string>
{}
</array>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<dict>
<key>SuccessfulExit</key>
<false/>
</dict>
<key>WorkingDirectory</key>
<string>{}</string>
<key>StandardOutPath</key>
<string>{}/stdout.log</string>
<key>StandardErrorPath</key>
<string>{}/stderr.log</string>
<key>EnvironmentVariables</key>
<dict>
<key>HOME</key>
<string>{}</string>
<key>PATH</key>
<string>/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin</string>
</dict>
</dict>
</plist>
"#,
AUTOPILOT_LAUNCHD_LABEL,
xml_escape(&binary.display().to_string()),
args.join("\n "),
xml_escape(&home.display().to_string()),
xml_escape(&log_dir.display().to_string()),
xml_escape(&log_dir.display().to_string()),
xml_escape(&home.display().to_string()),
);
std::fs::write(&plist_path, plist)
.map_err(|e| format!("Failed to write launchd plist: {}", e))?;
Ok(())
}
fn uninstall_launchd_service() -> Result<(), String> {
let plist_path = autopilot_service_path();
let _ = std::process::Command::new("launchctl")
.args(["stop", AUTOPILOT_LAUNCHD_LABEL])
.status();
let _ = std::process::Command::new("launchctl")
.args(["unload", plist_path.to_string_lossy().as_ref()])
.status();
if plist_path.exists() {
std::fs::remove_file(&plist_path)
.map_err(|e| format!("Failed to remove launchd plist: {}", e))?;
}
Ok(())
}
fn run_command(command: &str, args: &[&str], context: &str) -> Result<(), String> {
let output = std::process::Command::new(command)
.args(args)
.output()
.map_err(|e| format!("{}: {}", context, e))?;
if output.status.success() {
Ok(())
} else {
Err(format!(
"{}: {}",
context,
String::from_utf8_lossy(&output.stderr)
))
}
}
fn shell_join(parts: &[String]) -> String {
parts
.iter()
.map(|part| {
if part
.chars()
.all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '/' | '.' | ':'))
{
part.clone()
} else {
format!("'{}'", part.replace('\'', "'\\''"))
}
})
.collect::<Vec<_>>()
.join(" ")
}
fn xml_escape(value: &str) -> String {
value
.replace('&', "&")
.replace('<', "<")
.replace('>', ">")
.replace('"', """)
.replace('\'', "'")
}
#[cfg(test)]
mod tests {
use super::*;
fn temp_file_path(name: &str) -> PathBuf {
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|value| value.as_nanos())
.unwrap_or(0);
std::env::temp_dir().join(format!(
"stakpak-{}-{}-{}.toml",
name,
std::process::id(),
nanos
))
}
fn write_profile_config(path: &Path, content: &str) {
let write_result = std::fs::write(path, content);
assert!(write_result.is_ok());
}
fn test_app_config(config_path: &Path) -> AppConfig {
AppConfig {
api_endpoint: "https://test".to_string(),
api_key: None,
provider: crate::config::ProviderType::Remote,
mcp_server_host: None,
machine_name: None,
auto_append_gitignore: None,
profile_name: String::new(),
config_path: config_path.to_string_lossy().to_string(),
allowed_tools: None,
auto_approve: None,
rulebooks: None,
warden: None,
providers: std::collections::HashMap::new(),
model: None,
system_prompt: None,
max_turns: None,
anonymous_id: None,
collect_telemetry: None,
editor: None,
recent_models: Vec::new(),
}
}
#[test]
fn config_roundtrip_save_load() {
let path = temp_file_path("autopilot-config");
let mut config = AutopilotConfigFile::default();
config.server.listen = "0.0.0.0:4111".to_string();
config.server.show_token = true;
config.server.no_auth = true;
config.server.model = Some("anthropic/claude-sonnet-4-5".to_string());
config.server.auto_approve_all = true;
let save_result = config.save_to_path(&path);
assert!(save_result.is_ok());
let loaded = AutopilotConfigFile::load_from_path(&path);
assert!(loaded.is_ok());
if let Ok(loaded) = loaded {
assert_eq!(loaded.server.listen, "0.0.0.0:4111");
assert!(loaded.server.show_token);
assert!(loaded.server.no_auth);
assert_eq!(
loaded.server.model.as_deref(),
Some("anthropic/claude-sonnet-4-5")
);
assert!(loaded.server.auto_approve_all);
}
let _ = std::fs::remove_file(path);
}
#[test]
fn loopback_base_url_resolves_unspecified_bind() {
let v4 = loopback_base_url_from_bind("0.0.0.0:4096");
let v6 = loopback_base_url_from_bind("[::]:4096");
assert_eq!(v4, "http://127.0.0.1:4096");
assert_eq!(v6, "http://[::1]:4096");
}
fn sample_schedule(name: &str) -> AutopilotScheduleConfig {
AutopilotScheduleConfig {
name: name.to_string(),
cron: "*/5 * * * *".to_string(),
prompt: "Check infra".to_string(),
check: None,
trigger_on: ScheduleTriggerOn::Failure,
// workdir: None,
max_steps: 50,
channel: None,
profile: None,
pause_on_approval: false,
sandbox: false,
enabled: true,
}
}
#[test]
fn schedule_add_remove_enable_disable_happy_path() {
let mut config = AutopilotConfigFile::default();
let add_result = add_schedule_in_config(&mut config, sample_schedule("health-check"));
assert!(add_result.is_ok());
assert_eq!(config.schedules.len(), 1);
let disable_result = set_schedule_enabled_in_config(&mut config, "health-check", false);
assert!(disable_result.is_ok());
assert!(!config.schedules[0].enabled);
let enable_result = set_schedule_enabled_in_config(&mut config, "health-check", true);
assert!(enable_result.is_ok());
assert!(config.schedules[0].enabled);
let remove_result = remove_schedule_in_config(&mut config, "health-check");
assert!(remove_result.is_ok());
assert!(config.schedules.is_empty());
}
#[test]
fn schedule_duplicate_name_rejected() {
let mut config = AutopilotConfigFile::default();
let first = add_schedule_in_config(&mut config, sample_schedule("drift-detect"));
assert!(first.is_ok());
let duplicate = add_schedule_in_config(&mut config, sample_schedule("drift-detect"));
assert!(duplicate.is_err());
}
#[test]
fn schedule_invalid_cron_rejected() {
let mut config = AutopilotConfigFile::default();
let mut schedule = sample_schedule("broken");
schedule.cron = "invalid cron".to_string();
let result = add_schedule_in_config(&mut config, schedule);
assert!(result.is_err());
}
#[test]
fn schedule_reserved_name_rejected() {
let mut config = AutopilotConfigFile::default();
let schedule = sample_schedule(crate::commands::watch::RELOAD_SENTINEL);
let result = add_schedule_in_config(&mut config, schedule);
assert!(result.is_err());
let message = result.expect_err("reserved schedule name should be rejected");
assert!(message.contains("reserved"));
}
#[test]
fn schedule_missing_check_script_rejected() {
let mut config = AutopilotConfigFile::default();
let mut schedule = sample_schedule("missing-check");
let missing = temp_file_path("autopilot-missing-check-script");
let _ = std::fs::remove_file(&missing);
schedule.check = Some(missing.to_string_lossy().to_string());
let result = add_schedule_in_config(&mut config, schedule);
assert!(result.is_err());
let message = result.expect_err("missing check script should be rejected");
assert!(message.contains("Check script not found"));
}
#[test]
fn schedule_existing_check_script_is_accepted() {
let mut config = AutopilotConfigFile::default();
let mut schedule = sample_schedule("existing-check");
let script_path = temp_file_path("autopilot-existing-check-script");
std::fs::write(&script_path, "#!/bin/sh\necho ok\n").expect("write check script");
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755))
.expect("set executable permission");
}
schedule.check = Some(script_path.to_string_lossy().to_string());
let result = add_schedule_in_config(&mut config, schedule);
assert!(result.is_ok());
assert_eq!(config.schedules.len(), 1);
let _ = std::fs::remove_file(script_path);
}
#[test]
fn history_limit_is_bounded() {
assert_eq!(bounded_history_limit(0), 1);
assert_eq!(bounded_history_limit(20), 20);
assert_eq!(bounded_history_limit(10_000), 1000);
}
#[test]
fn load_ignores_gateway_channel_schema() {
let path = temp_file_path("autopilot-gateway-channels");
let write_result = std::fs::write(
&path,
r##"
[server]
listen = "127.0.0.1:4096"
[channels.slack]
bot_token = "xoxb-test"
app_token = "xapp-test"
"##,
);
assert!(write_result.is_ok());
let loaded = AutopilotConfigFile::load_from_path(&path);
assert!(loaded.is_ok());
let _ = std::fs::remove_file(path);
}
#[test]
fn server_config_save_preserves_gateway_and_notifications_sections() {
let path = temp_file_path("autopilot-preserve");
let write_result = std::fs::write(
&path,
r##"
[server]
listen = "127.0.0.1:4096"
url = "http://127.0.0.1:4096"
token = "gateway-token"
[notifications]
gateway_url = "http://127.0.0.1:4096"
channel = "slack"
chat_id = "#infra"
[channels.slack]
bot_token = "xoxb-old"
app_token = "xapp-old"
"##,
);
assert!(write_result.is_ok());
let load_result = AutopilotConfigFile::load_from_path(&path);
assert!(load_result.is_ok());
let mut loaded = match load_result {
Ok(value) => value,
Err(error) => panic!("failed to load config: {error}"),
};
loaded.server.auto_approve_all = true;
let save_updated = loaded.save_to_path(&path);
assert!(save_updated.is_ok());
let reloaded = std::fs::read_to_string(&path);
assert!(reloaded.is_ok());
let reloaded = match reloaded {
Ok(value) => value,
Err(error) => panic!("failed to read config: {error}"),
};
assert!(reloaded.contains("[channels.slack]"));
assert!(reloaded.contains("bot_token = \"xoxb-old\""));
assert!(reloaded.contains("[notifications]"));
assert!(reloaded.contains("channel = \"slack\""));
assert!(reloaded.contains("chat_id = \"#infra\""));
assert!(reloaded.contains("auto_approve_all = true"));
let _ = std::fs::remove_file(path);
}
#[test]
fn set_default_notification_target_merges_without_overwrite() {
let path = temp_file_path("autopilot-notification-target");
let write_result = std::fs::write(
&path,
r##"
[server]
listen = "127.0.0.1:4096"
[[schedules]]
name = "health-check"
cron = "*/5 * * * *"
prompt = "Check system health"
[channels.slack]
bot_token = "xoxb-test"
app_token = "xapp-test"
"##,
);
assert!(write_result.is_ok());
let set_result = set_default_notification_target(path.as_path(), "slack", "#ops");
assert!(set_result.is_ok());
let reloaded = std::fs::read_to_string(&path);
assert!(reloaded.is_ok());
let reloaded = match reloaded {
Ok(value) => value,
Err(error) => panic!("failed to read config: {error}"),
};
assert!(reloaded.contains("[[schedules]]"));
assert!(reloaded.contains("[channels.slack]"));
assert!(reloaded.contains("[notifications]"));
assert!(reloaded.contains("channel = \"slack\""));
assert!(reloaded.contains("chat_id = \"#ops\""));
let _ = std::fs::remove_file(path);
}
#[test]
fn channel_add_with_target_updates_notifications() {
let path = temp_file_path("autopilot-channel-add-target");
let write_result = std::fs::write(
&path,
r##"
[server]
listen = "127.0.0.1:4096"
[[schedules]]
name = "health-check"
cron = "*/5 * * * *"
prompt = "Check system health"
"##,
);
assert!(write_result.is_ok());
let add_result = add_channel_with_optional_target(
path.as_path(),
ChannelType::Slack,
None,
Some("xoxb-test".to_string()),
Some("xapp-test".to_string()),
Some("#eng".to_string()),
None,
);
assert!(add_result.is_ok());
assert_eq!(add_result.ok(), Some(Some("#eng".to_string())));
let reloaded = std::fs::read_to_string(&path);
assert!(reloaded.is_ok());
let reloaded = match reloaded {
Ok(value) => value,
Err(error) => panic!("failed to read config: {error}"),
};
assert!(reloaded.contains("[channels.slack]"));
assert!(reloaded.contains("bot_token = \"xoxb-test\""));
assert!(reloaded.contains("app_token = \"xapp-test\""));
assert!(reloaded.contains("[notifications]"));
assert!(reloaded.contains("channel = \"slack\""));
assert!(reloaded.contains("chat_id = \"#eng\""));
assert!(reloaded.contains("[[schedules]]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn channel_add_with_invalid_target_is_atomic() {
let path = temp_file_path("autopilot-channel-add-invalid-target");
let write_result = std::fs::write(
&path,
r##"
[server]
listen = "127.0.0.1:4096"
[[schedules]]
name = "health-check"
cron = "*/5 * * * *"
prompt = "Check system health"
"##,
);
assert!(write_result.is_ok());
let add_result = add_channel_with_optional_target(
path.as_path(),
ChannelType::Slack,
None,
Some("xoxb-test".to_string()),
Some("xapp-test".to_string()),
Some(" ".to_string()),
None,
);
assert!(add_result.is_err());
let reloaded = std::fs::read_to_string(&path);
assert!(reloaded.is_ok());
let reloaded = match reloaded {
Ok(value) => value,
Err(error) => panic!("failed to read config: {error}"),
};
assert!(!reloaded.contains("[channels.slack]"));
assert!(!reloaded.contains("[notifications]"));
assert!(reloaded.contains("[[schedules]]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn channel_add_rejects_empty_tokens() {
let path = temp_file_path("autopilot-channel-add-empty-token");
let empty_telegram_result = add_channel_with_optional_target(
path.as_path(),
ChannelType::Telegram,
Some(" ".to_string()),
None,
None,
None,
None,
);
assert!(empty_telegram_result.is_err());
let empty_discord_result = add_channel_with_optional_target(
path.as_path(),
ChannelType::Discord,
Some(" ".to_string()),
None,
None,
None,
None,
);
assert!(empty_discord_result.is_err());
let empty_bot_result = add_channel_with_optional_target(
path.as_path(),
ChannelType::Slack,
None,
Some(" ".to_string()),
Some("xapp-test".to_string()),
None,
None,
);
assert!(empty_bot_result.is_err());
let empty_app_result = add_channel_with_optional_target(
path.as_path(),
ChannelType::Slack,
None,
Some("xoxb-test".to_string()),
Some(" ".to_string()),
None,
None,
);
assert!(empty_app_result.is_err());
let _ = std::fs::remove_file(path);
}
#[test]
fn channel_remove_recovers_from_invalid_channel_config() {
let path = temp_file_path("autopilot-channel-remove-invalid");
let write_result = std::fs::write(
&path,
r##"
[channels.slack]
bot_token = ""
app_token = "xapp-test"
"##,
);
assert!(write_result.is_ok());
let remove_result = remove_channel(path.as_path(), ChannelType::Slack);
assert!(remove_result.is_ok());
let reloaded = std::fs::read_to_string(&path);
assert!(reloaded.is_ok());
let reloaded = match reloaded {
Ok(value) => value,
Err(error) => panic!("failed to read config: {error}"),
};
assert!(!reloaded.contains("[channels.slack]"));
let _ = std::fs::remove_file(path);
}
#[tokio::test]
async fn schedule_add_writes_to_config() {
// Schedule add now works — it writes to the config file.
// We can't easily test it here without a temp config path,
// so just verify the helper functions work correctly.
let mut config = AutopilotConfigFile::default();
let schedule = AutopilotScheduleConfig {
name: "demo".to_string(),
cron: "*/5 * * * *".to_string(),
prompt: "hello".to_string(),
check: None,
trigger_on: ScheduleTriggerOn::Failure,
// workdir: None,
max_steps: 50,
channel: None,
profile: None,
pause_on_approval: false,
sandbox: false,
enabled: true,
};
let result = add_schedule_in_config(&mut config, schedule);
assert!(result.is_ok());
assert!(config.find_schedule("demo").is_some());
}
#[test]
fn gateway_channel_count_surfaces_invalid_channel_config() {
let path = temp_file_path("autopilot-invalid-gateway-channel");
let write_result = std::fs::write(
&path,
r##"
[channels.slack]
bot_token = ""
app_token = "xapp-test"
"##,
);
assert!(write_result.is_ok());
let count_result = gateway_channel_count(path.as_path());
assert!(count_result.is_err());
let _ = std::fs::remove_file(path);
}
#[tokio::test]
async fn channel_add_requires_token() {
// Channel add without token should fail with a helpful message
let profile_path = temp_file_path("autopilot-channel-token-required-profile");
write_profile_config(
&profile_path,
r#"
[settings]
editor = "nano"
[profiles.default]
api_key = "default-key"
"#,
);
let app_config = test_app_config(&profile_path);
let result = run_channel_command(
AutopilotChannelCommands::Add {
channel_type: ChannelType::Telegram,
token: None,
bot_token: None,
app_token: None,
target: None,
profile: None,
},
&app_config,
)
.await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("Telegram token required"));
let _ = std::fs::remove_file(profile_path);
}
#[test]
fn validate_profile_reference_accepts_existing_profile() {
let profile_path = temp_file_path("autopilot-validate-profile-existing");
write_profile_config(
&profile_path,
r#"
[settings]
editor = "nano"
[profiles.default]
api_key = "default-key"
[profiles.ops]
api_key = "ops-key"
"#,
);
let app_config = test_app_config(&profile_path);
let result = validate_profile_reference("ops", &app_config);
assert!(result.is_ok());
let _ = std::fs::remove_file(profile_path);
}
#[test]
fn validate_profile_reference_rejects_reserved_all() {
let app_config = test_app_config(Path::new("/tmp/non-existent-config.toml"));
let result = validate_profile_reference("all", &app_config);
assert!(result.is_err());
assert!(
result
.expect_err("expected reserved profile error")
.contains("reserved")
);
}
#[test]
fn validate_profile_reference_rejects_empty_name() {
let app_config = test_app_config(Path::new("/tmp/non-existent-config.toml"));
let result = validate_profile_reference(" ", &app_config);
assert!(result.is_err());
assert!(
result
.expect_err("expected empty profile name error")
.contains("cannot be empty")
);
}
#[test]
fn validate_profile_reference_lists_available_profiles_on_missing() {
let profile_path = temp_file_path("autopilot-validate-profile-missing");
write_profile_config(
&profile_path,
r#"
[settings]
editor = "nano"
[profiles.default]
api_key = "default-key"
[profiles.ops]
api_key = "ops-key"
[profiles.monitoring]
api_key = "monitoring-key"
"#,
);
let app_config = test_app_config(&profile_path);
let result = validate_profile_reference("missing", &app_config);
assert!(result.is_err());
let message = result.expect_err("expected missing profile error");
assert!(message.contains("missing"));
assert!(message.contains("default"));
assert!(message.contains("monitoring"));
assert!(message.contains("ops"));
let _ = std::fs::remove_file(profile_path);
}
#[test]
fn validate_profile_reference_surfaces_config_load_errors() {
// Write invalid TOML so load_config_file returns a parse error
// (a missing file returns Ok(default_config) with a "default" profile).
let bad_path = temp_file_path("autopilot-validate-profile-bad-config");
std::fs::write(&bad_path, "{{{{invalid toml!!!!").expect("write bad config");
let app_config = test_app_config(&bad_path);
let result = validate_profile_reference("default", &app_config);
assert!(result.is_err());
assert!(
result
.expect_err("expected config load error")
.contains("Failed to load config.toml")
);
let _ = std::fs::remove_file(bad_path);
}
#[test]
fn resolve_policy_none_allowed_tools_falls_back_to_safe_list() {
let policy = resolve_server_tool_policy(None, None, false);
assert_eq!(
policy.action_for("view"),
stakpak_server::ToolApprovalAction::Approve
);
assert_eq!(
policy.action_for("run_command"),
stakpak_server::ToolApprovalAction::Ask
);
}
#[test]
fn resolve_policy_explicit_allowed_tools() {
let allowed_tools = vec!["view".to_string()];
let policy = resolve_server_tool_policy(Some(&allowed_tools), None, false);
assert_eq!(
policy.action_for("view"),
stakpak_server::ToolApprovalAction::Approve
);
assert_eq!(
policy.action_for("run_command"),
stakpak_server::ToolApprovalAction::Ask
);
}
#[test]
fn resolve_policy_auto_approve_all_overrides() {
let policy = resolve_server_tool_policy(None, None, true);
assert_eq!(
policy.action_for("run_command"),
stakpak_server::ToolApprovalAction::Approve
);
assert_eq!(
policy.action_for("some_future_tool"),
stakpak_server::ToolApprovalAction::Approve
);
}
#[test]
fn resolve_policy_auto_approve_extras_promoted() {
let allowed_tools = vec!["view".to_string()];
let auto_approve = vec!["run_command".to_string()];
let policy = resolve_server_tool_policy(Some(&allowed_tools), Some(&auto_approve), false);
assert_eq!(
policy.action_for("run_command"),
stakpak_server::ToolApprovalAction::Approve
);
}
#[test]
fn resolve_profile_run_overrides_loads_profile_values() {
let path = temp_file_path("profile-overrides");
write_profile_config(
&path,
r#"
[settings]
editor = "nano"
[profiles.default]
api_key = "default-key"
model = "openai/gpt-4o-mini"
[profiles.production]
api_key = "prod-key"
model = "anthropic/claude-sonnet-4-5"
allowed_tools = ["stakpak__view", ""]
auto_approve = ["stakpak__run_command", " "]
system_prompt = "production prompt"
max_turns = 32
"#,
);
let resolved =
resolve_profile_run_overrides("production", Some(path.to_string_lossy().as_ref()));
assert!(resolved.is_some());
if let Some(resolved) = resolved {
assert_eq!(
resolved.model.as_deref(),
Some("anthropic/claude-sonnet-4-5")
);
assert_eq!(resolved.allowed_tools, Some(vec!["view".to_string()]));
assert_eq!(resolved.auto_approve, Some(vec!["run_command".to_string()]));
assert_eq!(resolved.system_prompt.as_deref(), Some("production prompt"));
assert_eq!(resolved.max_turns, Some(32));
}
let _ = std::fs::remove_file(path);
}
#[test]
fn resolve_profile_run_overrides_preserves_explicit_empty_tool_lists() {
let path = temp_file_path("profile-overrides-empty-tools");
write_profile_config(
&path,
r#"
[settings]
editor = "nano"
[profiles.default]
api_key = "default-key"
[profiles.ops]
api_key = "ops-key"
allowed_tools = []
auto_approve = []
"#,
);
let resolved = resolve_profile_run_overrides("ops", Some(path.to_string_lossy().as_ref()));
assert!(resolved.is_some());
if let Some(resolved) = resolved {
assert_eq!(resolved.allowed_tools, Some(Vec::new()));
assert_eq!(resolved.auto_approve, Some(Vec::new()));
}
let _ = std::fs::remove_file(path);
}
#[test]
fn profile_run_override_resolver_maps_runtime_fields() {
let path = temp_file_path("gateway-channel-profile");
write_profile_config(
&path,
r#"
[settings]
editor = "nano"
[profiles.default]
api_key = "default-key"
[profiles.ops]
api_key = "ops-key"
model = "anthropic/claude-opus-4-5"
auto_approve = ["view"]
system_prompt = "ops prompt"
max_turns = 12
"#,
);
let resolver = ProfileRunOverrideResolver::new(path.to_string_lossy().to_string());
let resolved = stakpak_gateway::dispatcher::RunOverrideResolver::resolve_run_overrides(
&resolver, "ops",
);
assert!(resolved.is_some());
if let Some(resolved) = resolved {
assert_eq!(resolved.model.as_deref(), Some("anthropic/claude-opus-4-5"));
assert!(matches!(
resolved.auto_approve,
Some(stakpak_gateway::client::AutoApproveOverride::AllowList(ref tools)) if tools == &vec!["view".to_string()]
));
assert_eq!(resolved.system_prompt.as_deref(), Some("ops prompt"));
assert_eq!(resolved.max_turns, Some(12));
}
let _ = std::fs::remove_file(path);
}
#[test]
fn channel_profiles_map_reads_explicit_profiles() {
let mut gateway_cfg = stakpak_gateway::GatewayConfig::default();
gateway_cfg.channels.slack = Some(stakpak_gateway::config::SlackConfig {
bot_token: "xoxb-token".to_string(),
app_token: "xapp-token".to_string(),
model: None,
auto_approve: None,
profile: Some("ops".to_string()),
});
let profiles = gateway_cfg.channels.profiles_map();
assert_eq!(profiles.get("slack").map(String::as_str), Some("ops"));
assert!(!profiles.contains_key("telegram"));
}
#[test]
fn mcp_allowed_tools_unrestricted_when_policy_is_ask_default() {
let policy = resolve_server_tool_policy(None, None, false);
let allowed = mcp_allowed_tools_from_policy(&policy, None);
assert!(allowed.is_none());
}
#[test]
fn test_gateway_gets_allowlist_from_resolved_policy() {
let policy = resolve_server_tool_policy(None, None, false);
let mut gateway_cfg = stakpak_gateway::GatewayConfig::default();
apply_gateway_policy_from_resolved_tools(&mut gateway_cfg, &policy);
assert!(matches!(
gateway_cfg.gateway.approval_mode,
stakpak_gateway::ApprovalMode::Allowlist
));
assert!(!gateway_cfg.gateway.approval_allowlist.is_empty());
assert!(
gateway_cfg
.gateway
.approval_allowlist
.contains(&"view".to_string())
);
assert!(
gateway_cfg
.gateway
.approval_allowlist
.contains(&"stakpak__view".to_string())
);
}
#[test]
fn test_gateway_gets_allow_all_when_auto_approve_all() {
let policy = resolve_server_tool_policy(None, None, true);
let mut gateway_cfg = stakpak_gateway::GatewayConfig::default();
gateway_cfg.gateway.approval_mode = stakpak_gateway::ApprovalMode::Allowlist;
gateway_cfg.gateway.approval_allowlist = vec!["view".to_string()];
apply_gateway_policy_from_resolved_tools(&mut gateway_cfg, &policy);
assert!(matches!(
gateway_cfg.gateway.approval_mode,
stakpak_gateway::ApprovalMode::AllowAll
));
assert!(gateway_cfg.gateway.approval_allowlist.is_empty());
}
#[test]
fn status_json_schema_contains_core_fields() {
let payload = AutopilotStatusJson {
command: "autopilot.status",
ok: true,
profile: "default".to_string(),
config_path: "/tmp/autopilot.toml".to_string(),
server_config: AutopilotServerConfig::default(),
server_allowed_tool_count: 9,
service: ServiceStatusJson {
installed: true,
active: true,
path: "/tmp/service".to_string(),
},
server: EndpointStatusJson {
expected_enabled: true,
reachable: true,
url: "http://127.0.0.1:4096/v1/health".to_string(),
},
gateway: EndpointStatusJson {
expected_enabled: true,
reachable: false,
url: "http://127.0.0.1:4096/v1/gateway/status".to_string(),
},
sandbox: SandboxStatusJson {
mode: "persistent".to_string(),
healthy: Some(true),
consecutive_ok: Some(42),
consecutive_failures: Some(0),
last_ok: Some("2026-01-01T00:00:00Z".to_string()),
last_error: None,
},
scheduler: SchedulerStatusJson {
expected_enabled: true,
config_path: "/tmp/autopilot.toml".to_string(),
config_valid: true,
trigger_count: 2,
running: true,
pid: Some(123),
stale_pid: false,
db_path: Some("/tmp/autopilot.db".to_string()),
error: None,
recent_runs: vec![ScheduleRunSummaryJson {
id: 1,
schedule_name: "example".to_string(),
status: "completed".to_string(),
started_at: "2026-01-01T00:00:00Z".to_string(),
finished_at: Some("2026-01-01T00:00:10Z".to_string()),
error_message: None,
}],
},
schedules: vec![AutopilotScheduleStatusJson {
name: "health-check".to_string(),
cron: "*/5 * * * *".to_string(),
enabled: true,
sandbox: false,
next_run: Some("2026-01-01 00:05".to_string()),
}],
channels: vec![AutopilotChannelStatusJson {
name: "slack".to_string(),
channel_type: "slack".to_string(),
target: "#infra".to_string(),
enabled: true,
alerts_only: false,
}],
};
let json = serde_json::to_value(payload);
assert!(json.is_ok());
if let Ok(value) = json {
assert_eq!(
value.get("command").and_then(|v| v.as_str()),
Some("autopilot.status")
);
assert!(value.get("server_config").is_some());
assert!(value.get("server_allowed_tool_count").is_some());
assert!(value.get("service").is_some());
assert!(value.get("server").is_some());
assert!(value.get("gateway").is_some());
assert!(value.get("sandbox").is_some());
assert!(value.get("scheduler").is_some());
assert!(value.get("schedules").is_some());
assert!(value.get("channels").is_some());
// Verify sandbox fields
let sandbox = value.get("sandbox").expect("sandbox field");
assert_eq!(
sandbox.get("mode").and_then(|v| v.as_str()),
Some("persistent")
);
assert_eq!(sandbox.get("healthy").and_then(|v| v.as_bool()), Some(true));
assert_eq!(
sandbox.get("consecutive_ok").and_then(|v| v.as_u64()),
Some(42)
);
let scheduler_runs = value
.get("scheduler")
.and_then(|s| s.get("recent_runs"))
.and_then(|runs| runs.as_array())
.map(|runs| runs.len())
.unwrap_or_default();
assert_eq!(scheduler_runs, 1);
}
}
}