use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result};
use async_nats::jetstream;
use kanade_shared::kv::OBJECT_AGENT_RELEASES;
use kanade_shared::wire::EffectiveConfig;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tracing::{info, warn};
/// Marker persisted as `last_swap.json` when a self-update attempt starts.
///
/// On the next start it records which label we tried to install and which version was
/// running when we tried, so a swap that comes back up on the same version can be
/// recognized (see `is_loop`).
#[derive(Clone, Debug, Deserialize, Serialize)]
struct LastSwap {
    /// The `target_version` label the attempt tried to install.
    target: String,
    /// The `running_version` of the agent that started the attempt.
    running_before: String,
}
/// Self-update supervisor for the agent binary.
///
/// Opens the `OBJECT_AGENT_RELEASES` Object Store; if it does not exist, logs that
/// self-update is idle and returns. Otherwise it compares the config's `target_version`
/// with `running_version` once at startup and again whenever the watched config changes
/// `target_version`. On drift it waits a random jitter and calls [`attempt_swap`], which on
/// success replaces the executable and exits the process.
///
/// Loop protection: if the marker left by the previous run says we already swapped to this
/// same target from this same running version, the swap is refused. That would mean the
/// published binary reports a different version than its label. The block lasts until
/// `target_version` changes.
///
/// A failed attempt is not retried until `target_version` changes again, because
/// unchanged targets are skipped.
///
/// Returns when the Object Store is unavailable or the config channel closes.
pub async fn run(
    client: async_nats::Client,
    running_version: String,
    mut cfg_rx: watch::Receiver<EffectiveConfig>,
) {
    let js = jetstream::new(client);
    let store = match js.get_object_store(OBJECT_AGENT_RELEASES).await {
        Ok(s) => s,
        Err(_) => {
            info!(
                store = OBJECT_AGENT_RELEASES,
                "agent_releases Object Store missing — self-update idle"
            );
            return;
        }
    };

    let last_swap = read_last_swap();
    if let Some(prev) = &last_swap {
        info!(?prev, "recovered last_swap.json from prior cycle");
    }

    let (mut current_target, jitter) = {
        let cfg = cfg_rx.borrow();
        (
            cfg.target_version.clone(),
            cfg.target_version_jitter_duration(),
        )
    };

    // Only ever set here, at startup, for the then-current target.
    // Invariant: when `Some`, it equals `current_target`.
    let mut loop_blocked_target: Option<String> = None;

    if let Some(target) = current_target.as_deref()
        && target != running_version
    {
        if is_loop(&last_swap, target, &running_version) {
            loop_blocked_target = Some(target.to_string());
            // NOTE(review): republishing under the same label alone does not lift this block;
            // the marker persists until target_version changes (the loop below clears it).
            warn!(
                target,
                running = %running_version,
                "self-update LOOP detected — previous swap to this target produced the same running_version. \
                 Refusing to swap again. The binary under this label has a label/version mismatch; \
                 republish it or clear target_version (`kanade config unset target_version`)."
            );
        } else {
            sleep_jitter(jitter).await;
            if let Err(e) = attempt_swap(&store, target, &running_version).await {
                warn!(error = %e, target, "initial self-update fetch failed");
            }
        }
    } else if last_swap.is_some() {
        // Target unset or already satisfied: the previous swap's marker is no longer needed.
        clear_last_swap();
    }

    loop {
        if cfg_rx.changed().await.is_err() {
            // Sender side of the watch channel is gone; nothing left to react to.
            return;
        }
        let (new_target, jitter) = {
            let cfg = cfg_rx.borrow();
            (
                cfg.target_version.clone(),
                cfg.target_version_jitter_duration(),
            )
        };
        // Any config edit wakes us; only a different target is relevant.
        if new_target == current_target {
            continue;
        }
        current_target = new_target;

        // Per the invariant above, any target change lifts a startup loop block and
        // invalidates the marker that caused it.
        if loop_blocked_target.take().is_some() {
            info!("target_version changed; clearing loop block");
            clear_last_swap();
        }

        match current_target.as_deref() {
            Some(target) if target != running_version => {
                sleep_jitter(jitter).await;
                if let Err(e) = attempt_swap(&store, target, &running_version).await {
                    warn!(error = %e, target, "self-update fetch failed");
                }
            }
            // Target unset or already satisfied: mirror the startup path and drop any
            // leftover marker so it cannot cause a false loop block on a later restart.
            _ => clear_last_swap(),
        }
    }
}
/// Reports whether the recorded previous swap went to `target` while `running` was
/// already the running version.
///
/// Seeing the same `running` version again after swapping to `target` means the swap
/// did not change the reported version. `None` (no marker) is never a loop.
fn is_loop(last: &Option<LastSwap>, target: &str, running: &str) -> bool {
    match last {
        Some(prev) => prev.target == target && prev.running_before == running,
        None => false,
    }
}
fn last_swap_path() -> Option<PathBuf> {
use kanade_shared::default_paths;
Some(default_paths::data_dir().join("last_swap.json"))
}
fn read_last_swap() -> Option<LastSwap> {
let path = last_swap_path()?;
let bytes = std::fs::read(&path).ok()?;
serde_json::from_slice(&bytes).ok()
}
fn write_last_swap(target: &str, running_before: &str) {
let Some(path) = last_swap_path() else {
return;
};
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let payload = LastSwap {
target: target.to_string(),
running_before: running_before.to_string(),
};
match serde_json::to_vec(&payload) {
Ok(b) => {
if let Err(e) = std::fs::write(&path, b) {
warn!(error = %e, ?path, "write last_swap.json");
}
}
Err(e) => warn!(error = %e, "encode last_swap.json"),
}
}
/// Delete the swap marker. Every error, including "not found", is ignored.
fn clear_last_swap() {
    let Some(marker) = last_swap_path() else {
        return;
    };
    std::fs::remove_file(marker).ok();
}
async fn attempt_swap(
store: &jetstream::object_store::ObjectStore,
target: &str,
running: &str,
) -> Result<()> {
write_last_swap(target, running);
maybe_download(store, target, running).await
}
/// Pause for a uniformly random whole number of seconds in `[0, max]` before a download.
///
/// This presumably spreads out agents that react to the same config change.
///
/// - A zero `max` returns immediately without logging.
/// - The window is truncated to whole seconds, so a sub-second `max` produces a logged
///   zero-second pause.
async fn sleep_jitter(max: Duration) {
    if max.is_zero() {
        return;
    }
    let window_secs = max.as_secs();
    let chosen_secs = match window_secs {
        0 => 0,
        upper => {
            use rand::Rng;
            rand::rng().random_range(0..=upper)
        }
    };
    info!(
        jitter_max_secs = window_secs,
        sleep_secs = chosen_secs,
        "self-update jitter — pausing before download"
    );
    tokio::time::sleep(Duration::from_secs(chosen_secs)).await;
}
async fn maybe_download(
store: &jetstream::object_store::ObjectStore,
target: &str,
running: &str,
) -> Result<()> {
if target == running {
info!(target, "target_version matches running — no self-update");
return Ok(());
}
info!(
target,
running, "target_version drift — downloading new binary"
);
let mut object = store
.get(target)
.await
.with_context(|| format!("object store get '{target}'"))?;
let staging = staging_path(target)?;
if let Some(parent) = staging.parent() {
tokio::fs::create_dir_all(parent).await.ok();
}
let mut file = tokio::fs::File::create(&staging)
.await
.with_context(|| format!("create {staging:?}"))?;
let mut hasher = Sha256::new();
let mut buf = [0u8; 64 * 1024];
let mut total: u64 = 0;
loop {
let n = tokio::io::AsyncReadExt::read(&mut object, &mut buf)
.await
.context("read object chunk")?;
if n == 0 {
break;
}
file.write_all(&buf[..n])
.await
.context("write staged exe")?;
hasher.update(&buf[..n]);
total += n as u64;
}
file.flush().await.ok();
let digest = hasher.finalize();
info!(
target,
path = ?staging,
bytes = total,
sha256 = %hex(&digest),
"staged new agent binary — beginning atomic swap",
);
swap_and_restart(&staging, target).await?;
Ok(())
}
async fn swap_and_restart(staged: &Path, target_version: &str) -> Result<()> {
let current = std::env::current_exe().context("current_exe")?;
let exe_dir = current
.parent()
.context("current_exe has no parent directory")?;
let exe_name = current
.file_name()
.and_then(|s| s.to_str())
.context("current_exe has no UTF-8 file name")?
.to_string();
let new_path = exe_dir.join(format!("{exe_name}.new"));
let old_path = exe_dir.join(format!("{exe_name}.old"));
let _ = tokio::fs::remove_file(&new_path).await;
let _ = tokio::fs::remove_file(&old_path).await;
tokio::fs::copy(staged, &new_path)
.await
.with_context(|| format!("copy {staged:?} -> {new_path:?}"))?;
tokio::fs::rename(¤t, &old_path)
.await
.with_context(|| format!("rename {current:?} -> {old_path:?}"))?;
tokio::fs::rename(&new_path, ¤t)
.await
.with_context(|| format!("rename {new_path:?} -> {current:?}"))?;
info!(
target = target_version,
replaced = ?current,
backup = ?old_path,
"swap complete — exiting (code 64); SCM failure-actions take over",
);
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
std::process::exit(64);
}
/// Path of the staging file for `version`.
///
/// The path is `<data_dir>/staging/<exe-stem>.<version>.staged`. The stem falls back to
/// `kanade-agent` when the exe name is not UTF-8.
///
/// `version` comes from remote config. Any character other than ASCII alphanumerics and
/// `. - _ +` is replaced with `_`. This keeps path separators from escaping `staging/` and
/// avoids characters that are not valid in Windows file names (e.g. `:`).
///
/// # Errors
/// Fails only if the current executable path cannot be determined.
fn staging_path(version: &str) -> Result<PathBuf> {
    use kanade_shared::default_paths;
    let exe = std::env::current_exe().context("current_exe")?;
    let stem = exe
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("kanade-agent");
    let safe_version: String = version
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || matches!(c, '.' | '-' | '_' | '+') {
                c
            } else {
                '_'
            }
        })
        .collect();
    Ok(default_paths::data_dir()
        .join("staging")
        .join(format!("{stem}.{safe_version}.staged")))
}
/// Render `bytes` as lower-case hexadecimal: two digits per byte, no separators.
fn hex(bytes: &[u8]) -> String {
    use std::fmt::Write;
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut acc, byte| {
            // Writing into a String cannot fail.
            let _ = write!(acc, "{byte:02x}");
            acc
        })
}