use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result};
use async_nats::jetstream;
use kanade_shared::kv::OBJECT_AGENT_RELEASES;
use kanade_shared::wire::EffectiveConfig;
use sha2::{Digest, Sha256};
use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tracing::{info, warn};
/// Background task that keeps the running agent in line with the configured
/// `target_version`.
///
/// Flow:
/// 1. Open the `OBJECT_AGENT_RELEASES` object store. If that fails, log the
///    cause and return — self-update stays idle for this process.
/// 2. Check the config currently held by `cfg_rx` once.
/// 3. Re-check on every change notification from `cfg_rx`, acting only when
///    the target differs from the last target seen.
///
/// Whenever a target is set and differs from `running_version`, the task
/// sleeps a random jitter and then calls `maybe_download`. A failed download
/// is logged and not retried until the target changes again. On success
/// `maybe_download` ends in `swap_and_restart`, which exits the process.
///
/// Returns when the config sender is dropped.
pub async fn run(
    client: async_nats::Client,
    running_version: String,
    mut cfg_rx: watch::Receiver<EffectiveConfig>,
) {
    let js = jetstream::new(client);
    let store = match js.get_object_store(OBJECT_AGENT_RELEASES).await {
        Ok(s) => s,
        Err(e) => {
            // Surface the underlying cause: a timeout or permission error
            // would otherwise be indistinguishable from a missing bucket.
            info!(
                error = %e,
                store = OBJECT_AGENT_RELEASES,
                "agent_releases Object Store missing — self-update idle"
            );
            return;
        }
    };

    // Copies the two fields we need out of the watch channel so the borrow
    // guard is released before any `.await`.
    let snapshot = |rx: &watch::Receiver<EffectiveConfig>| {
        let cfg = rx.borrow();
        (
            cfg.target_version.clone(),
            cfg.target_version_jitter_duration(),
        )
    };

    let (mut current_target, jitter) = snapshot(&cfg_rx);
    if let Some(target) = current_target.as_deref()
        && target != running_version
    {
        sleep_jitter(jitter).await;
        if let Err(e) = maybe_download(&store, target, &running_version).await {
            warn!(error = %e, target, "initial self-update fetch failed");
        }
    }

    loop {
        if cfg_rx.changed().await.is_err() {
            // Sender dropped: no further config updates will ever arrive.
            return;
        }
        let (new_target, jitter) = snapshot(&cfg_rx);
        if new_target == current_target {
            // Some other config field changed; the target is the same.
            continue;
        }
        current_target = new_target;
        if let Some(target) = current_target.as_deref()
            && target != running_version
        {
            sleep_jitter(jitter).await;
            if let Err(e) = maybe_download(&store, target, &running_version).await {
                warn!(error = %e, target, "self-update fetch failed");
            }
        }
    }
}
/// Pauses for a random whole number of seconds in `0..=max.as_secs()`.
///
/// A zero `max` returns immediately without logging. A non-zero `max` below
/// one second has a ceiling of zero seconds, so it logs and sleeps for zero
/// seconds.
async fn sleep_jitter(max: Duration) {
    if max.is_zero() {
        return;
    }

    let ceiling_secs = max.as_secs();
    let chosen_secs = match ceiling_secs {
        0 => 0,
        upper => {
            use rand::Rng;
            rand::rng().random_range(0..=upper)
        }
    };

    info!(
        jitter_max_secs = ceiling_secs,
        sleep_secs = chosen_secs,
        "self-update jitter — pausing before download"
    );

    let pause = Duration::from_secs(chosen_secs);
    tokio::time::sleep(pause).await;
}
async fn maybe_download(
store: &jetstream::object_store::ObjectStore,
target: &str,
running: &str,
) -> Result<()> {
if target == running {
info!(target, "target_version matches running — no self-update");
return Ok(());
}
info!(
target,
running, "target_version drift — downloading new binary"
);
let mut object = store
.get(target)
.await
.with_context(|| format!("object store get '{target}'"))?;
let staging = staging_path(target)?;
if let Some(parent) = staging.parent() {
tokio::fs::create_dir_all(parent).await.ok();
}
let mut file = tokio::fs::File::create(&staging)
.await
.with_context(|| format!("create {staging:?}"))?;
let mut hasher = Sha256::new();
let mut buf = [0u8; 64 * 1024];
let mut total: u64 = 0;
loop {
let n = tokio::io::AsyncReadExt::read(&mut object, &mut buf)
.await
.context("read object chunk")?;
if n == 0 {
break;
}
file.write_all(&buf[..n])
.await
.context("write staged exe")?;
hasher.update(&buf[..n]);
total += n as u64;
}
file.flush().await.ok();
let digest = hasher.finalize();
info!(
target,
path = ?staging,
bytes = total,
sha256 = %hex(&digest),
"staged new agent binary — beginning atomic swap",
);
swap_and_restart(&staging, target).await?;
Ok(())
}
async fn swap_and_restart(staged: &Path, target_version: &str) -> Result<()> {
let current = std::env::current_exe().context("current_exe")?;
let exe_dir = current
.parent()
.context("current_exe has no parent directory")?;
let exe_name = current
.file_name()
.and_then(|s| s.to_str())
.context("current_exe has no UTF-8 file name")?
.to_string();
let new_path = exe_dir.join(format!("{exe_name}.new"));
let old_path = exe_dir.join(format!("{exe_name}.old"));
let _ = tokio::fs::remove_file(&new_path).await;
let _ = tokio::fs::remove_file(&old_path).await;
tokio::fs::copy(staged, &new_path)
.await
.with_context(|| format!("copy {staged:?} -> {new_path:?}"))?;
tokio::fs::rename(¤t, &old_path)
.await
.with_context(|| format!("rename {current:?} -> {old_path:?}"))?;
tokio::fs::rename(&new_path, ¤t)
.await
.with_context(|| format!("rename {new_path:?} -> {current:?}"))?;
info!(
target = target_version,
replaced = ?current,
backup = ?old_path,
"swap complete — exiting (code 64); SCM failure-actions take over",
);
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
std::process::exit(64);
}
/// Builds the path where the binary for `version` is staged:
/// `<data_dir>/staging/<exe-stem>.<version>.staged`.
///
/// The stem comes from the current executable's file name and falls back to
/// `kanade-agent` when it is missing or not valid UTF-8.
///
/// # Errors
///
/// Returns an error if the current executable path cannot be determined.
fn staging_path(version: &str) -> Result<PathBuf> {
    use kanade_shared::default_paths;

    let current_exe = std::env::current_exe().context("current_exe")?;
    let base_name = match current_exe.file_stem().and_then(|os| os.to_str()) {
        Some(name) => name,
        None => "kanade-agent",
    };
    let file_name = format!("{base_name}.{version}.staged");

    let mut staged = default_paths::data_dir().join("staging");
    staged.push(file_name);
    Ok(staged)
}
/// Encodes `bytes` as a lowercase hexadecimal string, two characters per
/// byte, with no separators or prefix.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}