use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use anyhow::Result;
use async_nats::jetstream::kv::Store;
use futures::StreamExt;
use kanade_shared::ExecResult;
use kanade_shared::default_paths;
use kanade_shared::kv::{BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, SCRIPT_STATUS_REVOKED};
use kanade_shared::wire::Command;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::outbox;
use crate::process::{ExecOutcome, apply_jitter, run_command_with_kill};
use crate::script_cache::ScriptCache;
use crate::staleness::{StalenessDecision, Tracker, decide as staleness_decide};
/// Bounded, insertion-ordered set of request ids used to drop duplicate
/// command deliveries.
///
/// Membership is answered from `seen`; `order` remembers the order in which
/// ids arrived so the oldest one can be forgotten once more than `cap` ids
/// are held.
pub struct DedupCache {
    /// Ids currently remembered.
    seen: HashSet<String>,
    /// The same ids, oldest at the front.
    order: VecDeque<String>,
    /// Maximum number of ids retained once an insert has finished.
    cap: usize,
}

impl DedupCache {
    /// Creates an empty cache that retains at most `cap` ids, pre-allocating
    /// both containers for that many entries.
    pub fn new(cap: usize) -> Self {
        let seen = HashSet::with_capacity(cap);
        let order = VecDeque::with_capacity(cap);
        Self { seen, order, cap }
    }

    /// Records `id` and reports whether it was new.
    ///
    /// Returns `false` (and changes nothing) when `id` is already remembered.
    /// Otherwise the id is stored, the oldest ids are evicted until at most
    /// `cap` remain, and `true` is returned.
    pub fn insert(&mut self, id: String) -> bool {
        // `HashSet::insert` reports `false` when the value was already present.
        if !self.seen.insert(id.clone()) {
            return false;
        }
        self.order.push_back(id);

        // Forget the oldest ids until the cache is back within its bound.
        while self.order.len() > self.cap {
            match self.order.pop_front() {
                Some(oldest) => {
                    self.seen.remove(&oldest);
                }
                None => break,
            }
        }
        true
    }
}
/// Builds a [`DedupCache`] holding up to 1024 ids, wrapped in an
/// `Arc<Mutex<_>>` so the handle can be cloned and locked from several tasks.
pub fn shared_dedup_cache() -> Arc<Mutex<DedupCache>> {
    let cache = DedupCache::new(1024);
    let guarded = Mutex::new(cache);
    Arc::new(guarded)
}
/// Receives commands from `sub` until the subscription ends, spawning one
/// task per accepted command.
///
/// For every message the payload is decoded as a [`Command`]; undecodable
/// payloads are logged and skipped. A command whose `request_id` is already
/// in `dedup` is dropped. Everything else is handed to [`handle_command`] on
/// its own `tokio` task, and a handler error is logged rather than propagated.
///
/// The two KV stores are looked up once, before the loop. A store that cannot
/// be opened is logged and passed on as `None`.
pub async fn command_loop(
    client: async_nats::Client,
    pc_id: String,
    dedup: Arc<Mutex<DedupCache>>,
    staleness: Tracker,
    mut sub: async_nats::Subscriber,
    script_cache: ScriptCache,
    check_sink: crate::check_cache::CheckSink,
) {
    let jetstream = async_nats::jetstream::new(client.clone());
    let script_current = jetstream.get_key_value(BUCKET_SCRIPT_CURRENT).await.ok();
    let script_status = jetstream.get_key_value(BUCKET_SCRIPT_STATUS).await.ok();

    match &script_current {
        Some(_) => {}
        None => warn!(
            bucket = BUCKET_SCRIPT_CURRENT,
            "KV bucket missing — version-pinning skipped (run `kanade jetstream setup`)"
        ),
    }
    match &script_status {
        Some(_) => {}
        None => warn!(
            bucket = BUCKET_SCRIPT_STATUS,
            "KV bucket missing — revoke check skipped (run `kanade jetstream setup`)"
        ),
    }

    loop {
        let Some(msg) = sub.next().await else {
            break;
        };

        let cmd = match serde_json::from_slice::<Command>(&msg.payload) {
            Err(e) => {
                warn!(error = %e, subject = %msg.subject, "deserialize command");
                continue;
            }
            Ok(parsed) => parsed,
        };

        // The lock guard is a temporary of this statement, so it is released
        // before anything else in the iteration runs.
        let first_sighting = dedup.lock().await.insert(cmd.request_id.clone());
        if !first_sighting {
            debug!(
                request_id = %cmd.request_id,
                "core-sub dedup: already seen via replay or earlier delivery",
            );
            continue;
        }

        // Build the (lazy) handler future with its own clones of the shared
        // handles, then run it on a separate task.
        let job = handle_command(
            client.clone(),
            pc_id.clone(),
            cmd,
            script_current.clone(),
            script_status.clone(),
            staleness.clone(),
            script_cache.clone(),
            check_sink.clone(),
        );
        tokio::spawn(async move {
            if let Err(e) = job.await {
                error!(error = %e, "command handler failed");
            }
        });
    }
}
/// Decides whether an attempt's outcome qualifies for another try.
///
/// A remote kill is never retried, a timeout always is, and a completed run
/// is retried only when its exit code is non-zero.
fn outcome_is_retryable(outcome: &ExecOutcome) -> bool {
    match outcome {
        ExecOutcome::Killed { .. } => false,
        ExecOutcome::Timeout { .. } => true,
        ExecOutcome::Completed { exit_code: 0, .. } => false,
        ExecOutcome::Completed { .. } => true,
    }
}
/// Builds the one-line retry summary for a result, or `None` when no retry
/// took place (`attempt == 0`).
///
/// `killed` takes precedence over `exit_code`: a killed run is reported as
/// stopped, an exit code of `0` as an eventual success, and anything else as
/// retries exhausted.
fn retry_note(attempt: u32, exit_code: i32, killed: bool) -> Option<String> {
    if attempt == 0 {
        return None;
    }

    let plural = match attempt {
        1 => "retry",
        _ => "retries",
    };
    let note = match (killed, exit_code) {
        (true, _) => {
            format!("stopped by remote kill after {attempt} {plural} (#418 on_failure.retry)")
        }
        (false, 0) => format!("succeeded after {attempt} {plural} (#418 on_failure.retry)"),
        (false, _) => {
            format!("failed after {attempt} {plural} exhausted (#418 on_failure.retry)")
        }
    };
    Some(note)
}
/// Waits out a retry backoff while listening for a remote kill.
///
/// Returns `true` only when a message arrives on the kill subject for
/// `exec_id` before `backoff` elapses; returns `false` once the full backoff
/// has passed.
///
/// The wait degrades to a plain sleep (always returning `false`) when:
/// * `exec_id` is `None`, so there is no kill subject to listen on;
/// * subscribing to the kill subject fails;
/// * the kill subscription's stream ends (`next()` yields `None`) before a
///   message arrives. A closed stream is not a kill signal, so the remaining
///   backoff is slept out instead of aborting the retries.
async fn wait_or_killed(
    client: &async_nats::Client,
    exec_id: Option<&str>,
    backoff: std::time::Duration,
) -> bool {
    let Some(eid) = exec_id else {
        tokio::time::sleep(backoff).await;
        return false;
    };

    let kill_subject = kanade_shared::subject::kill(eid);
    let mut kill_sub = match client.subscribe(kill_subject.clone()).await {
        Ok(sub) => sub,
        Err(e) => {
            warn!(
                error = %e,
                exec_id = %eid,
                subject = %kill_subject,
                "kill subscribe failed during retry backoff; sleeping deaf to kill",
            );
            tokio::time::sleep(backoff).await;
            return false;
        }
    };

    // Pin the timer so the same deadline can be awaited again below if the
    // subscription ends early.
    let sleep = tokio::time::sleep(backoff);
    tokio::pin!(sleep);

    let kill_received = tokio::select! {
        _ = &mut sleep => return false,
        msg = kill_sub.next() => msg.is_some(),
    };
    if kill_received {
        return true;
    }

    // The stream ended without delivering a message: finish the backoff
    // rather than misreporting this as a remote kill.
    warn!(
        exec_id = %eid,
        subject = %kill_subject,
        "kill subscription closed during retry backoff; sleeping deaf to kill",
    );
    sleep.await;
    false
}
/// Processes a single [`Command`]: runs the pre-flight gates, executes the
/// script (with optional retries) and enqueues the resulting [`ExecResult`]
/// in the on-disk outbox.
///
/// Steps, in order:
/// 1. Staleness gate: `staleness_decide` may ask to skip, in which case a
///    synthetic result is enqueued and the function returns.
/// 2. Version pin: when `script_current` has an entry for `cmd.id` whose
///    value differs from `cmd.version`, the command is skipped.
/// 3. Revocation: when `script_status` has an entry for `cmd.id` equal to
///    `SCRIPT_STATUS_REVOKED`, the command is skipped.
/// 4. Deadline: when `cmd.deadline_at` is set and already passed, the command
///    is skipped.
/// 5. Script body: when `cmd.script` is empty and `cmd.script_object` is set,
///    the body is fetched through `script_cache.resolve`.
/// 6. `apply_jitter`, an optional "started" event, then the run/retry loop.
/// 7. Status and retry notes are appended to stderr, an optional check is
///    recorded, stdout is optionally forwarded as events, and the result is
///    enqueued.
///
/// For the two KV gates only an `Ok(Some(_))` lookup is inspected; a missing
/// store, a missing key or a lookup error lets the command proceed.
///
/// # Errors
///
/// Returns an error when `script_object` is set without
/// `script_object_sha256`, when `script_cache.resolve` fails, when
/// `run_command_with_kill` fails, or when enqueueing a result to the outbox
/// fails. On the first three no `ExecResult` is enqueued by this function.
#[allow(clippy::too_many_arguments)]
pub async fn handle_command(
client: async_nats::Client,
pc_id: String,
mut cmd: Command,
script_current: Option<Store>,
script_status: Option<Store>,
staleness: Tracker,
script_cache: ScriptCache,
check_sink: crate::check_cache::CheckSink,
) -> Result<()> {
// Gate 1: staleness policy carried by the command itself.
let staleness_now = staleness.staleness(&client);
match staleness_decide(&cmd.staleness, staleness_now) {
StalenessDecision::Proceed => {}
StalenessDecision::Skip { observed, allowed } => {
warn!(
cmd_id = %cmd.id,
request_id = %cmd.request_id,
observed_s = observed.as_secs(),
allowed_s = allowed.as_secs(),
"skip: staleness policy (mode=strict) exceeded — broker view too old",
);
return publish_staleness_skipped(&pc_id, &cmd, observed, allowed).await;
}
}
// Gate 2: version pin. Only a successful lookup that finds an entry is
// compared; anything else falls through.
if let Some(cur) = &script_current
&& let Ok(Some(entry)) = cur.get(&cmd.id).await
{
let expected = String::from_utf8_lossy(&entry).to_string();
if expected != cmd.version {
warn!(
cmd_id = %cmd.id,
expected = %expected,
got = %cmd.version,
request_id = %cmd.request_id,
"skip stale command (version mismatch)",
);
return publish_version_mismatch_skipped(&pc_id, &cmd, &expected).await;
}
}
// Gate 3: revocation, with the same "only Ok(Some(_)) counts" rule.
if let Some(sta) = &script_status
&& let Ok(Some(entry)) = sta.get(&cmd.id).await
{
let status = String::from_utf8_lossy(&entry).to_string();
if status == SCRIPT_STATUS_REVOKED {
warn!(
cmd_id = %cmd.id,
request_id = %cmd.request_id,
"skip revoked command",
);
return publish_revoked_skipped(&pc_id, &cmd).await;
}
}
// Gate 4: starting deadline, compared against the current UTC time.
let now = chrono::Utc::now();
if let Some(deadline) = cmd.deadline_at {
if should_skip_for_deadline(deadline, now) {
warn!(
cmd_id = %cmd.id,
request_id = %cmd.request_id,
%deadline,
%now,
"skip: starting deadline expired",
);
return publish_skipped(&client, &pc_id, &cmd, deadline, now).await;
}
}
// The command may carry a reference to the script instead of the body.
// Resolve it into `cmd.script` before running.
if cmd.script.is_empty()
&& let Some(key) = cmd.script_object.as_deref()
{
let sha = cmd.script_object_sha256.as_deref().ok_or_else(|| {
anyhow::anyhow!(
"Command {request_id} has script_object={key} but no script_object_sha256 \
— wire builder bug",
request_id = cmd.request_id,
)
})?;
match script_cache.resolve(key, sha).await {
Ok(body) => {
debug!(
cmd_id = %cmd.id,
request_id = %cmd.request_id,
%key,
sha256 = %sha,
size = body.len(),
"script_object resolved",
);
cmd.script = body;
}
Err(e) => {
warn!(
cmd_id = %cmd.id,
request_id = %cmd.request_id,
%key,
sha256 = %sha,
error = %e,
"script_object resolve failed — aborting run",
);
// NOTE(review): this returns without enqueueing an ExecResult.
return Err(e);
}
}
}
// NOTE(review): presumably delays the start by a per-command jitter —
// confirm in `process::apply_jitter`.
apply_jitter(&cmd).await;
info!(
cmd_id = %cmd.id,
request_id = %cmd.request_id,
version = %cmd.version,
exec_id = ?cmd.exec_id,
"executing command",
);
let started_at = chrono::Utc::now();
// One fresh result id per invocation; it is shared by the live-tail
// registration, the started event and the final ExecResult.
let result_id = Uuid::new_v4().to_string();
let live_handle = crate::live_tail::register(&result_id);
// A "started" event is only emitted for commands that carry an exec_id.
// Failing to enqueue it is logged and does not stop the run.
if let Some(exec_id) = cmd.exec_id.as_deref() {
let event = kanade_shared::wire::EventStarted {
result_id: result_id.clone(),
request_id: cmd.request_id.clone(),
exec_id: exec_id.to_string(),
pc_id: pc_id.clone(),
started_at,
manifest_id: cmd.id.clone(),
version: cmd.version.clone(),
};
let events_outbox_dir = default_paths::data_dir().join("events-outbox");
match crate::events_outbox::enqueue(&events_outbox_dir, &event) {
Ok(p) => debug!(
result_id = %result_id,
events_outbox = %p.display(),
"started event enqueued (drain task delivers via JetStream)",
),
Err(e) => warn!(
error = %e,
result_id = %result_id,
"events_outbox enqueue failed; in-flight view will not show this row until ExecResult lands",
),
}
}
// Without a retry policy there are zero retries and `backoff` is `None`.
let max_retries = cmd.retry.map(|r| r.max).unwrap_or(0);
let backoff = cmd
.retry
.map(|r| std::time::Duration::from_secs(r.backoff_secs));
// `attempt` counts retries performed so far, not total runs.
let mut attempt: u32 = 0;
let outcome = loop {
let outcome = run_command_with_kill(&client, &cmd, Some(live_handle.tail())).await?;
// Stop on a non-retryable outcome or once the retry budget is used up.
if !outcome_is_retryable(&outcome) || attempt >= max_retries {
break outcome;
}
attempt += 1;
warn!(
cmd_id = %cmd.id,
request_id = %cmd.request_id,
attempt,
max_retries,
backoff_secs = backoff.map(|b| b.as_secs()).unwrap_or(0),
"fire failed; retrying after backoff (#418 on_failure.retry)",
);
// A kill received during the backoff ends the loop with a synthetic
// Killed outcome whose stdout/stderr are empty; the failed attempt's
// output is not carried over.
if let Some(b) = backoff
&& wait_or_killed(&client, cmd.exec_id.as_deref(), b).await
{
info!(
cmd_id = %cmd.id,
request_id = %cmd.request_id,
attempt,
"remote kill during retry backoff — aborting retries (#418 on_failure.retry)",
);
break ExecOutcome::Killed {
stdout: String::new(),
stderr: String::new(),
};
}
};
let finished_at = chrono::Utc::now();
// Remember whether the final outcome was a kill before `outcome` is
// consumed by the match below.
let final_killed = matches!(outcome, ExecOutcome::Killed { .. });
// NOTE(review): presumably ends the live-tail registration — confirm in
// `live_tail`.
drop(live_handle);
// Flatten the outcome into result fields. Killed and Timeout both report
// exit code -1 plus a human-readable status note.
let (exit_code, stdout, stderr, status_note) = match outcome {
ExecOutcome::Completed {
exit_code,
stdout,
stderr,
} => (exit_code, stdout, stderr, None),
ExecOutcome::Killed { stdout, stderr } => {
let eid = cmd.exec_id.as_deref().unwrap_or("?");
(
-1,
stdout,
stderr,
Some(format!("killed by remote signal (kill.{eid})")),
)
}
ExecOutcome::Timeout { stdout, stderr } => (
-1,
stdout,
stderr,
Some(format!("timeout after {}s", cmd.timeout_secs)),
),
};
// Append the status note and the retry note (when present) to stderr,
// each on its own line; an empty stderr is replaced by the first note.
let stderr = [status_note, retry_note(attempt, exit_code, final_killed)]
.into_iter()
.flatten()
.fold(stderr, |acc, note| {
if acc.is_empty() {
note
} else {
format!("{acc}\n{note}")
}
});
// Commands with a check hint record a check: built from stdout on exit
// code 0, otherwise a failed check built from the exit code and stderr.
if let Some(check_hint) = &cmd.check {
let check = if exit_code == 0 {
crate::check_cache::build_check(check_hint, &stdout)
} else {
crate::check_cache::build_check_failed(check_hint, exit_code, &stderr)
};
check_sink.record(check);
}
// On success with `emit.kind == Events`, stdout is handed to
// `forward_obs_events` and the ExecResult carries an empty stdout instead.
let stdout = if exit_code == 0
&& matches!(
cmd.emit.as_ref().map(|e| e.kind),
Some(kanade_shared::manifest::EmitKind::Events),
) {
forward_obs_events(stdout, pc_id.clone()).await;
String::new()
} else {
stdout
};
let result = ExecResult {
result_id: result_id.clone(),
request_id: cmd.request_id.clone(),
exec_id: cmd.exec_id.clone(),
pc_id: pc_id.clone(),
exit_code,
stdout,
stderr,
started_at,
finished_at,
stdout_object: None,
stderr_object: None,
manifest_id: Some(cmd.id.clone()),
};
// The result is written to the outbox directory rather than published
// from here.
let outbox_dir = default_paths::data_dir().join("outbox");
let path = outbox::enqueue(&outbox_dir, &result)?;
debug!(
request_id = %cmd.request_id,
exit_code,
outbox = %path.display(),
"result enqueued to outbox (drain task delivers via JetStream)",
);
// No-op use of `client`; nothing is published through it at this point.
let _ = client;
Ok(())
}
/// Treats `stdout` as newline-delimited JSON and enqueues each line as an
/// `ObsEvent` in the `obs-outbox` directory.
///
/// Blank lines are ignored. Every parsed event has its `pc_id` overwritten
/// with the `pc_id` argument before being enqueued. Lines that fail to parse
/// or to enqueue are logged and counted as bad; nothing is returned to the
/// caller. The per-line work runs inside `tokio::task::spawn_blocking`.
async fn forward_obs_events(stdout: String, pc_id: String) {
    use kanade_shared::wire::ObsEvent;

    let obs_outbox_dir = default_paths::data_dir().join("obs-outbox");
    match crate::obs_outbox::ensure_outbox_dir(&obs_outbox_dir) {
        Ok(_) => {}
        Err(e) => {
            warn!(error = %e, "obs: ensure_outbox_dir failed; aborting forward");
            return;
        }
    }

    // `pc_id` itself moves into the blocking closure; keep a copy for the
    // summary log line.
    let pc_id_log = pc_id.clone();

    let worker = tokio::task::spawn_blocking(move || {
        let (mut ok, mut bad) = (0usize, 0usize);

        // 1-based line numbers, whitespace-trimmed, blank lines dropped.
        let candidates = stdout
            .lines()
            .enumerate()
            .map(|(idx, raw)| (idx + 1, raw.trim()))
            .filter(|(_, line)| !line.is_empty());

        for (line_no, line) in candidates {
            let mut event = match serde_json::from_str::<ObsEvent>(line) {
                Ok(parsed) => parsed,
                Err(e) => {
                    warn!(
                        line_no,
                        error = %e,
                        "obs: stdout line is not a valid ObsEvent JSON; skipping",
                    );
                    bad += 1;
                    continue;
                }
            };
            event.pc_id = pc_id.clone();

            match crate::obs_outbox::enqueue(&obs_outbox_dir, &event) {
                Ok(_) => ok += 1,
                Err(e) => {
                    warn!(
                        line_no,
                        error = %e,
                        "obs: enqueue to outbox failed; line dropped",
                    );
                    bad += 1;
                }
            }
        }
        (ok, bad)
    });

    // A failed join is logged and reported as zero counts.
    let (ok, bad) = match worker.await {
        Ok(counts) => counts,
        Err(e) => {
            warn!(error = %e, "obs: forwarder task panicked / cancelled");
            (0, 0)
        }
    };
    debug!(ok, bad, pc_id = %pc_id_log, "obs: forwarded NDJSON stdout to obs-outbox");
}
/// Returns `true` when `now` is strictly later than `deadline`.
///
/// A command observed exactly at its deadline is still allowed to run.
fn should_skip_for_deadline(
    deadline: chrono::DateTime<chrono::Utc>,
    now: chrono::DateTime<chrono::Utc>,
) -> bool {
    deadline < now
}
/// Enqueues a synthetic [`ExecResult`] with exit code 127 for a command that
/// was skipped by the staleness gate.
///
/// The result has empty stdout, a stderr message quoting the `observed` and
/// `allowed` durations, and identical start/finish timestamps.
///
/// # Errors
///
/// Returns an error when writing to the outbox fails.
async fn publish_staleness_skipped(
    pc_id: &str,
    cmd: &Command,
    observed: std::time::Duration,
    allowed: std::time::Duration,
) -> Result<()> {
    const EXIT_CODE: i32 = 127;

    let stamp = chrono::Utc::now();
    let observed_text = humantime::format_duration(observed);
    let allowed_text = humantime::format_duration(allowed);
    let stderr = format!(
        "skipped: staleness policy (mode=strict) exceeded — agent has been disconnected for {}, max allowed {}",
        observed_text, allowed_text,
    );

    let synthetic = ExecResult {
        result_id: Uuid::new_v4().to_string(),
        request_id: cmd.request_id.clone(),
        exec_id: cmd.exec_id.clone(),
        pc_id: pc_id.to_string(),
        exit_code: EXIT_CODE,
        stdout: String::new(),
        stderr,
        started_at: stamp,
        finished_at: stamp,
        stdout_object: None,
        stderr_object: None,
        manifest_id: Some(cmd.id.clone()),
    };

    let outbox_dir = default_paths::data_dir().join("outbox");
    let queued = outbox::enqueue(&outbox_dir, &synthetic)?;
    info!(
        request_id = %cmd.request_id,
        exit_code = EXIT_CODE,
        outbox = %queued.display(),
        "staleness-skip result enqueued to outbox",
    );
    Ok(())
}
/// Enqueues a synthetic [`ExecResult`] with exit code 125 for a command whose
/// starting deadline had already passed.
///
/// The stderr message states how late the command was (`now - deadline`,
/// shown as zero when that difference cannot be converted to a std duration)
/// together with both timestamps. `now` is used as both start and finish
/// time. `_client` is accepted but not used.
///
/// # Errors
///
/// Returns an error when writing to the outbox fails.
async fn publish_skipped(
    _client: &async_nats::Client,
    pc_id: &str,
    cmd: &Command,
    deadline: chrono::DateTime<chrono::Utc>,
    now: chrono::DateTime<chrono::Utc>,
) -> Result<()> {
    const EXIT_CODE: i32 = 125;

    // A negative difference fails `to_std`; fall back to a zero duration.
    let late_by = (now - deadline).to_std().unwrap_or_default();
    let stderr = format!(
        "skipped: starting deadline expired {} ago (deadline {}, received {})",
        humantime::format_duration(late_by),
        deadline,
        now,
    );

    let synthetic = ExecResult {
        result_id: Uuid::new_v4().to_string(),
        request_id: cmd.request_id.clone(),
        exec_id: cmd.exec_id.clone(),
        pc_id: pc_id.to_string(),
        exit_code: EXIT_CODE,
        stdout: String::new(),
        stderr,
        started_at: now,
        finished_at: now,
        stdout_object: None,
        stderr_object: None,
        manifest_id: Some(cmd.id.clone()),
    };

    let outbox_dir = default_paths::data_dir().join("outbox");
    let queued = outbox::enqueue(&outbox_dir, &synthetic)?;
    info!(
        request_id = %cmd.request_id,
        exit_code = EXIT_CODE,
        outbox = %queued.display(),
        "synthetic skipped-result enqueued to outbox",
    );
    Ok(())
}
/// Enqueues a synthetic [`ExecResult`] with exit code 124 for a command whose
/// version differs from the pinned one.
///
/// The stderr message names the command id, the `expected` version and the
/// version the command carried. Start and finish timestamps are identical.
///
/// # Errors
///
/// Returns an error when writing to the outbox fails.
async fn publish_version_mismatch_skipped(
    pc_id: &str,
    cmd: &Command,
    expected: &str,
) -> Result<()> {
    const EXIT_CODE: i32 = 124;

    let stamp = chrono::Utc::now();
    let stderr = format!(
        "skipped: version-pin mismatch — script_current[{}] = {expected}, command brought {}",
        cmd.id, cmd.version,
    );

    let synthetic = ExecResult {
        result_id: Uuid::new_v4().to_string(),
        request_id: cmd.request_id.clone(),
        exec_id: cmd.exec_id.clone(),
        pc_id: pc_id.to_string(),
        exit_code: EXIT_CODE,
        stdout: String::new(),
        stderr,
        started_at: stamp,
        finished_at: stamp,
        stdout_object: None,
        stderr_object: None,
        manifest_id: Some(cmd.id.clone()),
    };

    let outbox_dir = default_paths::data_dir().join("outbox");
    let queued = outbox::enqueue(&outbox_dir, &synthetic)?;
    info!(
        request_id = %cmd.request_id,
        exit_code = EXIT_CODE,
        outbox = %queued.display(),
        "version-mismatch skip result enqueued to outbox",
    );
    Ok(())
}
/// Enqueues a synthetic [`ExecResult`] with exit code 126 for a command whose
/// script status is revoked.
///
/// The stderr message names the command id. Start and finish timestamps are
/// identical.
///
/// # Errors
///
/// Returns an error when writing to the outbox fails.
async fn publish_revoked_skipped(pc_id: &str, cmd: &Command) -> Result<()> {
    const EXIT_CODE: i32 = 126;

    let stamp = chrono::Utc::now();
    let stderr = format!(
        "skipped: command was revoked (script_status[{}] = revoked)",
        cmd.id,
    );

    let synthetic = ExecResult {
        result_id: Uuid::new_v4().to_string(),
        request_id: cmd.request_id.clone(),
        exec_id: cmd.exec_id.clone(),
        pc_id: pc_id.to_string(),
        exit_code: EXIT_CODE,
        stdout: String::new(),
        stderr,
        started_at: stamp,
        finished_at: stamp,
        stdout_object: None,
        stderr_object: None,
        manifest_id: Some(cmd.id.clone()),
    };

    let outbox_dir = default_paths::data_dir().join("outbox");
    let queued = outbox::enqueue(&outbox_dir, &synthetic)?;
    info!(
        request_id = %cmd.request_id,
        exit_code = EXIT_CODE,
        outbox = %queued.display(),
        "revoked skip result enqueued to outbox",
    );
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Fixed UTC instant `secs` seconds after a constant base timestamp, so
    /// the deadline tests never depend on the wall clock.
    fn at(secs: i64) -> chrono::DateTime<chrono::Utc> {
        chrono::Utc
            .timestamp_opt(1_700_000_000 + secs, 0)
            .single()
            .unwrap()
    }

    // --- should_skip_for_deadline -------------------------------------------

    #[test]
    fn now_strictly_before_deadline_runs() {
        // Well ahead of the deadline; the one-second boundary case is
        // covered separately below.
        assert!(!should_skip_for_deadline(at(100), at(0)));
    }

    #[test]
    fn now_one_second_before_deadline_runs() {
        assert!(!should_skip_for_deadline(at(100), at(99)));
    }

    #[test]
    fn now_exactly_at_deadline_still_runs() {
        assert!(!should_skip_for_deadline(at(100), at(100)));
    }

    #[test]
    fn now_one_second_past_deadline_skips() {
        assert!(should_skip_for_deadline(at(100), at(101)));
    }

    #[test]
    fn now_long_past_deadline_skips() {
        assert!(should_skip_for_deadline(at(100), at(86400)));
    }

    // --- outcome_is_retryable -----------------------------------------------

    /// `Completed` outcome with the given exit code and empty output.
    fn completed(exit_code: i32) -> ExecOutcome {
        ExecOutcome::Completed {
            exit_code,
            stdout: String::new(),
            stderr: String::new(),
        }
    }

    #[test]
    fn clean_exit_is_not_retryable() {
        assert!(!outcome_is_retryable(&completed(0)));
    }

    #[test]
    fn nonzero_exit_is_retryable() {
        assert!(outcome_is_retryable(&completed(1)));
        assert!(outcome_is_retryable(&completed(-1)));
    }

    #[test]
    fn timeout_is_retryable() {
        assert!(outcome_is_retryable(&ExecOutcome::Timeout {
            stdout: String::new(),
            stderr: String::new(),
        }));
    }

    #[test]
    fn remote_kill_is_never_retried() {
        assert!(!outcome_is_retryable(&ExecOutcome::Killed {
            stdout: String::new(),
            stderr: String::new(),
        }));
    }

    // --- retry_note ---------------------------------------------------------

    #[test]
    fn no_retry_emits_no_note() {
        assert_eq!(retry_note(0, 0, false), None);
        assert_eq!(retry_note(0, 1, false), None);
    }

    #[test]
    fn retry_note_reports_eventual_success() {
        let note = retry_note(2, 0, false).expect("a retry happened");
        assert!(note.contains("succeeded after 2 retries"), "got: {note}");
    }

    #[test]
    fn retry_note_reports_exhaustion() {
        let note = retry_note(3, 1, false).expect("a retry happened");
        assert!(
            note.contains("failed after 3 retries exhausted"),
            "got: {note}"
        );
    }

    #[test]
    fn retry_note_killed_is_not_exhausted() {
        let note = retry_note(2, -1, true).expect("a retry happened");
        assert!(
            note.contains("stopped by remote kill after 2 retries"),
            "got: {note}"
        );
        assert!(!note.contains("exhausted"), "got: {note}");
    }

    #[test]
    fn retry_note_singular_for_one_retry() {
        let note = retry_note(1, 0, false).expect("a retry happened");
        assert!(note.contains("after 1 retry"), "got: {note}");
        assert!(!note.contains("retries"), "got: {note}");
    }
}