//! kanade-agent 0.6.1
//!
//! Windows-side resident daemon for the kanade endpoint-management
//! system. Subscribes to `commands.*` over NATS, runs scripts,
//! publishes WMI inventory and heartbeats, and watches for
//! self-updates.
mod commands;
mod config_supervisor;
mod groups;
mod heartbeat;
mod inventory;
mod process;
mod self_update;

#[cfg(target_os = "windows")]
mod service;

use std::path::PathBuf;

use anyhow::{Context, Result};
use clap::Parser;
use kanade_shared::config::load_agent_config;
use kanade_shared::{default_paths, subject};
use tracing::info;

// Crate version baked in at compile time by Cargo. Logged at startup,
// sent with every heartbeat, and handed to the self_update task
// (presumably compared against the fleet-wide target_version — see
// config_supervisor).
const AGENT_VERSION: &str = env!("CARGO_PKG_VERSION");

// Command-line interface, parsed by clap's derive macro.
//
// NOTE: clap turns doc comments on this struct and its fields into
// `--help` text, so the `///` comment below is user-visible output —
// edit with care. The `version` flag in #[command(...)] wires
// `--version` to CARGO_PKG_VERSION automatically.
#[derive(Parser, Debug)]
#[command(
    name = "kanade-agent",
    about = "Windows endpoint management agent (kanade)",
    version
)]
struct Cli {
    /// Path to agent.toml. When unset, the agent looks at
    /// $KANADE_AGENT_CONFIG, then `<config_dir>/agent.toml` (see
    /// kanade_shared::default_paths::config_dir).
    #[arg(long)]
    // Optional so the KANADE_AGENT_CONFIG / default-path fallback in
    // default_paths::find_config can kick in (see run_agent).
    config: Option<PathBuf>,
}

/// Process entry point.
///
/// Windows: first hand control to the Service Control Manager via
/// `service::try_run_as_service()`. When the dispatcher connects we
/// are a real Windows service and service.rs drives the tokio runtime
/// until shutdown. When it fails with
/// `ERROR_FAILED_SERVICE_CONTROLLER_CONNECT` (Win32 1063) the process
/// was launched from a console (e.g. `cargo run`), so we drop into
/// console mode below. Any other dispatcher error is fatal.
///
/// Other targets: console mode only.
fn main() -> Result<()> {
    #[cfg(target_os = "windows")]
    {
        match service::try_run_as_service() {
            Ok(()) => return Ok(()),
            Err(e) => {
                if !service::is_not_under_scm(&e) {
                    return Err(anyhow::anyhow!("service dispatcher failed: {e}"));
                }
                // Win32 1063: not launched by the SCM. Fall through
                // and run as a plain console process instead.
            }
        }
    }

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("build tokio runtime")?;
    rt.block_on(run_agent())
}

/// Run the agent's tokio main loop. Called either from console
/// mode (directly from `main`) or from inside the Windows service
/// entry point (see [`service::run_service`]).
///
/// Startup sequence (order matters):
/// 1. install the tracing subscriber (shared with service mode),
/// 2. sweep `<exe>.old` / `<exe>.new` leftovers from a prior upgrade,
/// 3. parse CLI args + load agent.toml,
/// 4. connect to NATS and subscribe to the broadcast + per-PC
///    command subjects,
/// 5. spawn the config supervisor, then the heartbeat / inventory /
///    self-update / group-membership background tasks,
/// 6. block on the two command loops until both exit.
pub(crate) async fn run_agent() -> Result<()> {
    // tracing subscriber is shared between console + service modes
    // and is idempotent (init() returns Err on second call, but we
    // only ever call once per process).
    let _ = tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "info,kanade_agent=debug".into()),
        )
        .try_init();

    cleanup_stale_upgrade_artifacts();

    // NOTE(review): in service mode this parses whatever argv the SCM
    // handed us; looks safe because `config` is the only (optional)
    // flag, but confirm service.rs never forwards extra args.
    let cli = Cli::parse();
    let cfg_path =
        default_paths::find_config(cli.config.as_deref(), "KANADE_AGENT_CONFIG", "agent.toml")?;
    let cfg =
        load_agent_config(&cfg_path).with_context(|| format!("load config from {cfg_path:?}"))?;
    info!(
        pc_id = %cfg.agent.id,
        nats_url = %cfg.agent.nats_url,
        version = AGENT_VERSION,
        "starting kanade-agent",
    );

    let client = kanade_shared::nats_client::connect(&cfg.agent.nats_url).await?;
    info!("connected to NATS");

    // Two command channels: the fleet-wide broadcast subject and this
    // machine's dedicated subject.
    let cmd_all = client.subscribe(subject::COMMANDS_ALL).await?;
    let cmd_self = client
        .subscribe(subject::commands_pc(&cfg.agent.id))
        .await?;
    info!(
        commands_all = subject::COMMANDS_ALL,
        commands_self = %subject::commands_pc(&cfg.agent.id),
        "subscribed",
    );

    let pc_id = cfg.agent.id.clone();

    // Sprint 6: every fleet-wide knob (heartbeat cadence, inventory
    // cadence / jitter / enabled, target_version) is now sourced
    // from the agent_config KV bucket and watched live. The
    // supervisor publishes the resolved EffectiveConfig on a watch
    // channel; heartbeat / inventory / self_update subscribe.
    let cfg_rx = config_supervisor::spawn(client.clone(), pc_id.clone());

    // Sprint 6: cfg.inventory is parsed for back-compat but the
    // runtime sources cadence / jitter / enabled from the
    // agent_config KV bucket via cfg_rx. Warn the operator when
    // the local toml carries non-default values so they know to
    // migrate via `kanade config set inventory_interval=... ` etc.
    let inv_defaults = kanade_shared::config::InventorySection::default();
    if cfg.inventory.hw_interval != inv_defaults.hw_interval
        || cfg.inventory.jitter != inv_defaults.jitter
        || cfg.inventory.enabled != inv_defaults.enabled
    {
        // BUGFIX: the old message promised removal "in v0.4.0", which
        // is already in the past (crate is at 0.6.x) — don't name a
        // stale version in an operator-facing warning.
        tracing::warn!(
            local_inventory = ?cfg.inventory,
            "agent.toml::[inventory] is deprecated — values now come from the agent_config KV bucket; this section is logged-and-ignored. Use `kanade config set inventory_interval=...` (and friends) to migrate. The section will be removed in a future release.",
        );
    }

    // Background tasks. Each owns a clone of the NATS client and the
    // config watch receiver; they run for the life of the process and
    // are intentionally detached (no JoinHandle kept).
    tokio::spawn(heartbeat::heartbeat_loop(
        client.clone(),
        pc_id.clone(),
        AGENT_VERSION.to_string(),
        cfg_rx.clone(),
    ));
    tokio::spawn(inventory::inventory_loop(
        client.clone(),
        pc_id.clone(),
        cfg_rx.clone(),
    ));
    tokio::spawn(self_update::run(
        client.clone(),
        AGENT_VERSION.to_string(),
        cfg_rx.clone(),
    ));

    // Group membership: Sprint 5 moves this from agent.toml (per-box
    // local config) to a server-managed KV bucket. The manager reads
    // `agent_groups.{pc_id}` from JetStream KV, spawns one
    // `commands.group.<name>` subscriber per current group, and reacts
    // to KV updates by adding / dropping subscriptions live.
    if !cfg.agent.groups.is_empty() {
        tracing::warn!(
            local_groups = ?cfg.agent.groups,
            "agent.toml::[agent] groups is deprecated; use `kanade agent groups set` instead — local value is ignored",
        );
    }
    tokio::spawn(groups::manage(client.clone(), pc_id.clone()));

    // Run both command loops to completion; their results are
    // deliberately discarded — when either loop ends the agent is
    // shutting down anyway.
    let _ = tokio::join!(
        commands::command_loop(client.clone(), pc_id.clone(), cmd_all),
        commands::command_loop(client.clone(), pc_id.clone(), cmd_self),
    );

    Ok(())
}

/// Remove `<exe>.old` / `<exe>.new` left over from the previous
/// self-update cycle. `.old` is the previous-version exe, no longer
/// loaded; `.new` would only exist if a swap was interrupted before
/// the final rename (the in-place exe is still valid in that case).
/// Either way, removal here keeps the install dir tidy and stops
/// stale binaries from accumulating across upgrade cycles.
///
/// Best-effort: every failure mode (no current_exe, non-UTF-8 exe
/// name, unremovable file) is logged or silently skipped — startup
/// must never fail because of leftover artifacts.
fn cleanup_stale_upgrade_artifacts() {
    let Ok(current) = std::env::current_exe() else {
        return;
    };
    let Some(exe_dir) = current.parent() else {
        return;
    };
    let Some(exe_name) = current.file_name().and_then(|n| n.to_str()) else {
        return;
    };
    for suffix in ["old", "new"] {
        let path = exe_dir.join(format!("{exe_name}.{suffix}"));
        // FIX: attempt the removal unconditionally instead of the old
        // `exists()` pre-check — that check was a TOCTOU race (the file
        // could vanish between stat and unlink, producing a spurious
        // warning) and an extra syscall. NotFound is the normal
        // "nothing to clean" case and is skipped quietly.
        match std::fs::remove_file(&path) {
            Ok(()) => tracing::info!(?path, suffix, "removed stale upgrade artifact"),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                tracing::warn!(?path, suffix, error = %e, "couldn't remove stale upgrade artifact")
            }
        }
    }
}