mod commands;
mod config_supervisor;
mod groups;
mod heartbeat;
mod inventory;
mod logs;
mod process;
mod self_update;
#[cfg(target_os = "windows")]
mod service;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use clap::Parser;
use kanade_shared::config::{LogSection, load_agent_config};
use kanade_shared::{default_paths, subject};
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
/// Version string of this agent build, captured at compile time from the
/// crate's `CARGO_PKG_VERSION`. It is logged at startup and handed to the
/// heartbeat and self-update tasks.
const AGENT_VERSION: &str = env!("CARGO_PKG_VERSION");
// Command-line interface of the agent binary, parsed with clap's derive API.
//
// Plain `//` comments are used here on purpose: clap's derive turns `///`
// doc comments into help text, so adding them would change what `--help`
// prints.
#[derive(Parser, Debug)]
#[command(
name = "kanade-agent",
about = "Windows endpoint management agent (kanade)",
version
)]
struct Cli {
// Optional `--config PATH` override. `run_agent` passes it to
// `default_paths::find_config` together with the name
// "KANADE_AGENT_CONFIG" and the file name "agent.toml"
// (presumably env-var and default-location fallbacks; confirm in
// kanade_shared).
#[arg(long)]
config: Option<PathBuf>,
}
/// Process entry point.
///
/// On Windows the service dispatcher is tried first. If it succeeds, the
/// process ran as a service and there is nothing left to do. If it fails
/// with the error that `service::is_not_under_scm` recognises, execution
/// falls through to the foreground path below; any other dispatcher error
/// is returned.
///
/// The foreground path builds a multi-threaded tokio runtime and blocks on
/// [`run_agent`] until it finishes.
///
/// # Errors
///
/// Returns an error when the service dispatcher fails for an unexpected
/// reason, when the tokio runtime cannot be built, or when `run_agent`
/// itself fails.
fn main() -> Result<()> {
    #[cfg(target_os = "windows")]
    {
        if let Err(e) = service::try_run_as_service() {
            // Only the "not started by the SCM" failure is tolerated; it
            // means we were launched from a console and should run inline.
            if !service::is_not_under_scm(&e) {
                return Err(anyhow::anyhow!("service dispatcher failed: {e}"));
            }
        } else {
            // The dispatcher ran the agent as a service to completion.
            return Ok(());
        }
    }

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("build tokio runtime")?;
    rt.block_on(run_agent())
}
/// Runs the agent: parses the CLI, loads the config, initialises logging,
/// connects to NATS, spawns the background tasks and then drives the two
/// command loops until both finish.
///
/// # Errors
///
/// Returns an error if the config file cannot be located or loaded, if
/// tracing cannot be initialised, or if the NATS connection or either
/// command subscription fails. Returns `Ok(())` once both command loops
/// have completed.
pub(crate) async fn run_agent() -> Result<()> {
// NOTE(review): the CLI is parsed here rather than in `main`; since this
// function is `pub(crate)` it is presumably also entered from the Windows
// service module. Confirm.
let cli = Cli::parse();
let cfg_path =
default_paths::find_config(cli.config.as_deref(), "KANADE_AGENT_CONFIG", "agent.toml")?;
let cfg =
load_agent_config(&cfg_path).with_context(|| format!("load config from {cfg_path:?}"))?;
// The guard (present only when file logging is enabled) is bound to a named
// local so it stays alive until this function returns.
let _log_guard = init_tracing(&cfg.log)
.with_context(|| format!("init tracing from [log] in {cfg_path:?}"))?;
// Called after tracing is initialised, so the info/warn events it emits go
// through the subscriber that was just configured.
cleanup_stale_upgrade_artifacts();
info!(
pc_id = %cfg.agent.id,
nats_url = %cfg.agent.nats_url,
version = AGENT_VERSION,
log_path = %cfg.log.path,
log_keep_days = cfg.log.keep_days,
"starting kanade-agent",
);
let client = kanade_shared::nats_client::connect(&cfg.agent.nats_url).await?;
info!("connected to NATS");
// Two command subscriptions: the shared `COMMANDS_ALL` subject and the
// subject derived from this agent's own id.
let cmd_all = client.subscribe(subject::COMMANDS_ALL).await?;
let cmd_self = client
.subscribe(subject::commands_pc(&cfg.agent.id))
.await?;
info!(
commands_all = subject::COMMANDS_ALL,
commands_self = %subject::commands_pc(&cfg.agent.id),
"subscribed",
);
let pc_id = cfg.agent.id.clone();
// `cfg_rx` is cloned into the heartbeat, inventory and self-update tasks
// below (presumably a watch-style receiver for remote config; confirm
// in config_supervisor).
let cfg_rx = config_supervisor::spawn(client.clone(), pc_id.clone());
// Deprecation check: warn only when the local `[inventory]` section differs
// from the defaults in `hw_interval`, `jitter` or `enabled`. The local
// values are not passed to any task in this function.
let inv_defaults = kanade_shared::config::InventorySection::default();
if cfg.inventory.hw_interval != inv_defaults.hw_interval
|| cfg.inventory.jitter != inv_defaults.jitter
|| cfg.inventory.enabled != inv_defaults.enabled
{
tracing::warn!(
local_inventory = ?cfg.inventory,
"agent.toml::[inventory] is deprecated — values now come from the agent_config KV bucket; this section is logged-and-ignored. Use `kanade config set inventory_interval=...` (and friends) to migrate. The field will be removed in v0.4.0.",
);
}
// Background tasks. The join handles returned by `tokio::spawn` are
// discarded, so these tasks are detached and their results (including any
// panic) are not observed here.
tokio::spawn(heartbeat::heartbeat_loop(
client.clone(),
pc_id.clone(),
AGENT_VERSION.to_string(),
cfg_rx.clone(),
));
tokio::spawn(inventory::inventory_loop(
client.clone(),
pc_id.clone(),
cfg_rx.clone(),
));
tokio::spawn(self_update::run(
client.clone(),
AGENT_VERSION.to_string(),
cfg_rx.clone(),
));
tokio::spawn(logs::serve(
client.clone(),
pc_id.clone(),
std::path::PathBuf::from(&cfg.log.path),
));
// Same deprecation treatment for locally configured groups: warn if any are
// set; the local list is not passed to `groups::manage`.
if !cfg.agent.groups.is_empty() {
tracing::warn!(
local_groups = ?cfg.agent.groups,
"agent.toml::[agent] groups is deprecated; use `kanade agent groups set` instead — local value is ignored",
);
}
tokio::spawn(groups::manage(client.clone(), pc_id.clone()));
// Drive both command loops concurrently on this task and wait for both to
// finish.
// NOTE(review): the outputs are discarded; if `command_loop` returns a
// `Result`, its errors are silently dropped here. Confirm that is intended.
let _ = tokio::join!(
commands::command_loop(client.clone(), pc_id.clone(), cmd_all),
commands::command_loop(client.clone(), pc_id.clone(), cmd_self),
);
Ok(())
}
/// Installs the global tracing subscriber described by the `[log]` section.
///
/// The filter is taken from the default environment variable (`RUST_LOG`)
/// when it is set and parses; otherwise it is built from `log.level`.
///
/// * `log.keep_days == 0`: events go to stdout only and `Ok(None)` is
///   returned.
/// * otherwise: events go to stdout **and** to a daily-rotated file in the
///   parent directory of `log.path`. The file name uses the stem of
///   `log.path` as prefix (falling back to `"agent"`) and its extension as
///   suffix (falling back to `"log"`), with at most `log.keep_days` files
///   retained. The returned guard belongs to the non-blocking file writer
///   and must be kept alive by the caller for as long as file logging is
///   wanted.
///
/// The result of `try_init` is ignored in both branches, so a subscriber
/// that was already installed is left in place instead of being treated as
/// an error.
///
/// # Errors
///
/// Fails when `log.path` has no parent component, when the log directory
/// cannot be created, or when the rolling file appender cannot be built.
fn init_tracing(log: &LogSection) -> Result<Option<tracing_appender::non_blocking::WorkerGuard>> {
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| log.level.clone().into());

    if log.keep_days == 0 {
        let _ = tracing_subscriber::registry()
            .with(env_filter)
            .with(tracing_subscriber::fmt::layer().with_writer(std::io::stdout))
            .try_init();
        return Ok(None);
    }

    let path = Path::new(&log.path);
    let parent = path
        .parent()
        .with_context(|| format!("[log] path '{}' has no parent dir", log.path))?;
    // A bare file name such as "agent.log" has the empty path as its parent.
    // The appender must be able to list its directory to enforce
    // `max_log_files`, and an empty path cannot be listed, so name the
    // current directory explicitly. The log files end up in the same place.
    let dir = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("agent");
    let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("log");
    std::fs::create_dir_all(dir).with_context(|| format!("create log dir {dir:?}"))?;

    let appender = tracing_appender::rolling::Builder::new()
        .filename_prefix(stem)
        .filename_suffix(ext)
        .rotation(tracing_appender::rolling::Rotation::DAILY)
        .max_log_files(log.keep_days)
        .build(dir)
        .context("build rolling file appender")?;
    let (file_writer, guard) = tracing_appender::non_blocking(appender);

    let _ = tracing_subscriber::registry()
        .with(env_filter)
        .with(tracing_subscriber::fmt::layer().with_writer(std::io::stdout))
        .with(
            // ANSI colour codes are disabled for the file sink only.
            tracing_subscriber::fmt::layer()
                .with_writer(file_writer)
                .with_ansi(false),
        )
        .try_init();
    Ok(Some(guard))
}
/// Best-effort removal of `<exe>.old` and `<exe>.new` files sitting next to
/// the currently running executable (presumably leftovers of a previous
/// self-update; confirm against `self_update`).
///
/// Never fails: if the executable path, its directory or its UTF-8 file name
/// cannot be determined, the function returns without doing anything. A
/// successful removal is logged at info level, a failed one at warn level,
/// and a file that simply is not there is skipped silently.
fn cleanup_stale_upgrade_artifacts() {
    let Ok(current) = std::env::current_exe() else {
        return;
    };
    let Some(exe_dir) = current.parent() else {
        return;
    };
    let Some(exe_name) = current.file_name().and_then(|n| n.to_str()) else {
        return;
    };
    for suffix in ["old", "new"] {
        let path = exe_dir.join(format!("{exe_name}.{suffix}"));
        // Remove directly instead of probing with `exists()` first: that
        // avoids the check-then-act race and an extra syscall, and it also
        // covers entries `exists()` reports as absent (dangling symlinks,
        // unreadable metadata).
        match std::fs::remove_file(&path) {
            Ok(()) => tracing::info!(?path, suffix, "removed stale upgrade artifact"),
            // Nothing to clean up: the normal case.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                tracing::warn!(?path, suffix, error = %e, "couldn't remove stale upgrade artifact")
            }
        }
    }
}