kanade-agent 0.5.1

Windows-side resident daemon for the kanade endpoint-management system. Subscribes to commands.* over NATS, runs scripts, publishes WMI inventory + heartbeats, and watches for self-updates.
use kanade_shared::subject;
use kanade_shared::wire::{EffectiveConfig, Heartbeat};
use tokio::sync::watch;
use tracing::{info, warn};

/// Heartbeat publisher loop.
///
/// On every tick a [`Heartbeat`] carrying `pc_id`, the current UTC
/// time and `agent_version` is serialized to JSON and published on
/// `subject::heartbeat(&pc_id)`. Serialization and publish failures
/// are logged at `warn` level and never end the loop.
///
/// Cadence is taken from [`EffectiveConfig::heartbeat_duration`] and
/// updates live the moment a new value arrives on the `cfg_rx` watch
/// channel. A freshly created tokio interval completes its first tick
/// immediately, so one heartbeat goes out right at start-up and right
/// after each reschedule.
///
/// If the sending side of `cfg_rx` is dropped, the loop keeps
/// publishing at the last cadence it knew and stops polling the
/// channel.
///
/// This function never returns; cancel it by dropping/aborting the
/// task that runs it.
///
/// # Panics
///
/// `tokio::time::interval` panics on a zero period.
/// NOTE(review): this assumes `heartbeat_duration()` never yields a
/// zero duration — confirm against `EffectiveConfig`.
pub async fn heartbeat_loop(
    client: async_nats::Client,
    pc_id: String,
    agent_version: String,
    mut cfg_rx: watch::Receiver<EffectiveConfig>,
) {
    let mut current_dur = cfg_rx.borrow().heartbeat_duration();
    let mut ticker = heartbeat_ticker(current_dur);
    info!(?current_dur, "heartbeat loop scheduled");

    // Cleared once the config sender is gone. A closed watch channel
    // makes `changed()` resolve to `Err` immediately on every call, so
    // leaving that branch enabled would turn this loop into a busy
    // spin between ticks.
    let mut cfg_open = true;

    loop {
        tokio::select! {
            _ = ticker.tick() => {
                let hb = Heartbeat {
                    pc_id: pc_id.clone(),
                    at: chrono::Utc::now(),
                    agent_version: agent_version.clone(),
                };
                let payload = match serde_json::to_vec(&hb) {
                    Ok(b) => b,
                    Err(e) => {
                        warn!(error = %e, "serialize heartbeat");
                        continue;
                    }
                };
                if let Err(e) = client
                    .publish(subject::heartbeat(&pc_id), payload.into())
                    .await
                {
                    warn!(error = %e, "publish heartbeat");
                }
            }
            res = cfg_rx.changed(), if cfg_open => {
                if res.is_err() {
                    // Supervisor dropped: keep ticking at the last
                    // cadence we knew, and never poll the closed
                    // channel again.
                    warn!("config watch channel closed; keeping last heartbeat cadence");
                    cfg_open = false;
                    continue;
                }
                let new_dur = cfg_rx.borrow().heartbeat_duration();
                if new_dur != current_dur {
                    info!(
                        old = ?current_dur,
                        new = ?new_dur,
                        "heartbeat interval changed; rescheduling ticker",
                    );
                    current_dur = new_dur;
                    ticker = heartbeat_ticker(current_dur);
                }
            }
        }
    }
}

/// Builds the heartbeat ticker for `period`.
///
/// Missed ticks are delayed rather than replayed: after a stall (for
/// example a slow publish) the loop sends one heartbeat and resumes the
/// normal cadence instead of firing every missed tick back-to-back.
fn heartbeat_ticker(period: std::time::Duration) -> tokio::time::Interval {
    let mut ticker = tokio::time::interval(period);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    ticker
}