use chrono::Utc;
use futures::TryStreamExt;
use kanade_shared::ipc::envelope::RpcNotification;
use kanade_shared::ipc::error::{ErrorKind, RpcError};
use kanade_shared::ipc::jobs::{
JobProgress, JobsExecuteParams, JobsExecuteResult, JobsKillParams, JobsKillResult,
JobsListParams, JobsListResult, RunStatus, UserInvokableJob,
};
use kanade_shared::ipc::method;
use kanade_shared::kv::BUCKET_JOBS;
use kanade_shared::manifest::Manifest;
use kanade_shared::wire::Command;
use kanade_shared::{ExecResult, default_paths, subject};
use tokio::sync::mpsc;
use tracing::{debug, warn};
use uuid::Uuid;
use super::super::connection::ConnectionState;
use super::system::HandlerResult;
use crate::outbox;
use crate::process::{ExecOutcome, run_command_with_kill};
pub async fn handle_jobs_list(
conn: &ConnectionState,
params: JobsListParams,
) -> HandlerResult<JobsListResult> {
let client = conn.nats.as_ref().ok_or_else(|| {
RpcError::new(
ErrorKind::InternalError,
"jobs.list: NATS client not wired into the connection",
)
})?;
let js = async_nats::jetstream::new(client.clone());
let kv = js.get_key_value(BUCKET_JOBS).await.map_err(|e| {
warn!(error = %e, "jobs.list: failed to open BUCKET_JOBS");
RpcError::new(
ErrorKind::InternalError,
format!("jobs.list: open jobs catalog: {e}"),
)
})?;
let keys = kv.keys().await.map_err(|e| {
warn!(error = %e, "jobs.list: BUCKET_JOBS keys() failed");
RpcError::new(
ErrorKind::InternalError,
format!("jobs.list: scan jobs catalog: {e}"),
)
})?;
let keys: Vec<String> = keys.try_collect().await.map_err(|e| {
warn!(error = %e, "jobs.list: BUCKET_JOBS key stream faulted mid-iteration");
RpcError::new(
ErrorKind::InternalError,
format!("jobs.list: stream jobs catalog: {e}"),
)
})?;
let manifests: Vec<Manifest> = futures::future::join_all(keys.into_iter().map(|k| {
let kv = kv.clone();
async move {
match kv.get(&k).await {
Ok(Some(bytes)) => match serde_json::from_slice::<Manifest>(&bytes) {
Ok(m) => Some(m),
Err(e) => {
warn!(key = %k, error = %e, "jobs.list: skipping unparseable manifest");
None
}
},
Ok(None) => None,
Err(e) => {
warn!(key = %k, error = %e, "jobs.list: skipping unreadable manifest");
None
}
}
}
}))
.await
.into_iter()
.flatten()
.collect();
Ok(build_job_list(&manifests, params.category))
}
pub fn build_job_list(
manifests: &[Manifest],
filter: Option<kanade_shared::ipc::jobs::JobCategory>,
) -> JobsListResult {
let mut items: Vec<UserInvokableJob> = manifests
.iter()
.filter_map(manifest_to_job)
.filter(|j| filter.is_none_or(|c| j.category == c))
.collect();
items.sort_by(|a, b| {
a.display_name
.cmp(&b.display_name)
.then_with(|| a.id.cmp(&b.id))
});
JobsListResult { items }
}
/// Converts a manifest into its user-facing list row.
///
/// Returns `None` when the manifest has no `client` block. `last_run` is
/// always left as `None` here.
fn manifest_to_job(m: &Manifest) -> Option<UserInvokableJob> {
    m.client.as_ref().map(|hint| UserInvokableJob {
        id: m.id.clone(),
        display_name: hint.name.clone(),
        display_description: hint.description.clone(),
        icon: hint.icon.clone(),
        category: hint.category,
        version: m.version.clone(),
        last_run: None,
    })
}
/// Handles the `jobs.execute` RPC: looks up the requested manifest, builds a
/// `Command` from it and starts the run on a background task.
///
/// The freshly minted run id is registered on the connection before the task
/// is spawned and is returned to the caller immediately; the run itself is
/// carried out by `run_job`.
///
/// # Errors
///
/// * `ErrorKind::InternalError` when the connection has no NATS client.
/// * Whatever `fetch_manifest` reports for the requested id.
/// * `ErrorKind::Unauthorized` when the manifest has no `client` block.
/// * Whatever `build_command` reports for a manifest it cannot run.
pub async fn handle_jobs_execute(
    conn: &mut ConnectionState,
    params: JobsExecuteParams,
) -> HandlerResult<JobsExecuteResult> {
    // Own a clone of the client: it is moved into the spawned task below.
    let client = conn
        .nats
        .as_ref()
        .ok_or_else(|| {
            RpcError::new(
                ErrorKind::InternalError,
                "jobs.execute: NATS client not wired into the connection",
            )
        })?
        .clone();
    let manifest = fetch_manifest(&client, &params.id).await?;
    if manifest.client.is_none() {
        return Err(RpcError::new(
            ErrorKind::Unauthorized,
            format!(
                "job '{}' is not user-invokable (no client: block)",
                params.id
            ),
        ));
    }
    let run_id = Uuid::new_v4().to_string();
    let request_id = Uuid::new_v4().to_string();
    let cmd = build_command(&manifest, &run_id, &request_id)?;
    // Register only after the command was built, so a rejected manifest never
    // leaves a run id behind on the connection.
    conn.register_run(run_id.clone());
    let push_tx = conn.push_tx.clone();
    let pc_id = conn.pc_id.clone();
    let spawned_run_id = run_id.clone();
    tokio::spawn(run_job(client, cmd, spawned_run_id, push_tx, pc_id));
    Ok(JobsExecuteResult { run_id })
}
/// Loads and decodes the manifest stored under `id` in the `BUCKET_JOBS`
/// key-value bucket.
///
/// # Errors
///
/// * `ErrorKind::InvalidParams` when `id` fails `valid_job_id`.
/// * `ErrorKind::NotFound` when the bucket holds no entry for `id`.
/// * `ErrorKind::InternalError` when the bucket cannot be opened, the read
///   fails, or the stored bytes do not decode as a `Manifest`.
async fn fetch_manifest(client: &async_nats::Client, id: &str) -> Result<Manifest, RpcError> {
    if !valid_job_id(id) {
        return Err(RpcError::new(
            ErrorKind::InvalidParams,
            format!("job id '{id}' is not a valid job id (expected [A-Za-z0-9_.-])"),
        ));
    }

    let jetstream = async_nats::jetstream::new(client.clone());
    let store = match jetstream.get_key_value(BUCKET_JOBS).await {
        Ok(store) => store,
        Err(e) => {
            warn!(error = %e, "jobs.execute: failed to open BUCKET_JOBS");
            return Err(RpcError::new(
                ErrorKind::InternalError,
                format!("jobs.execute: open jobs catalog: {e}"),
            ));
        }
    };

    let entry = match store.get(id).await {
        Ok(entry) => entry,
        Err(e) => {
            warn!(key = %id, error = %e, "jobs.execute: KV get failed");
            return Err(RpcError::new(
                ErrorKind::InternalError,
                format!("jobs.execute: read job '{id}': {e}"),
            ));
        }
    };
    let Some(bytes) = entry else {
        return Err(RpcError::new(
            ErrorKind::NotFound,
            format!("job '{id}' not found"),
        ));
    };

    match serde_json::from_slice::<Manifest>(&bytes) {
        Ok(manifest) => Ok(manifest),
        Err(e) => {
            warn!(key = %id, error = %e, "jobs.execute: manifest decode failed");
            Err(RpcError::new(
                ErrorKind::InternalError,
                format!("jobs.execute: decode job '{id}': {e}"),
            ))
        }
    }
}
/// Returns `true` when `id` is acceptable as a job id.
///
/// An id must be non-empty, consist only of ASCII letters, digits, `_`, `-`
/// and `.`, and must not start or end with `.`.
///
/// The id is used verbatim as the key for the jobs KV lookup in
/// `fetch_manifest`. A key with a leading or trailing dot is not a valid
/// NATS KV key, so rejecting it here reports it as a bad parameter instead of
/// letting it fail later inside the KV layer.
fn valid_job_id(id: &str) -> bool {
    let charset_ok = id
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'));
    !id.is_empty() && charset_ok && !id.starts_with('.') && !id.ends_with('.')
}
/// Translates a manifest into the `Command` handed to the process runner.
///
/// `run_id` becomes the command's `exec_id` and `request_id` is copied
/// through. Only inline scripts are supported. The manifest timeout is parsed
/// with `humantime`, truncated to whole seconds and raised to at least 1.
///
/// # Errors
///
/// Returns `ErrorKind::InvalidParams` when the manifest uses `script_object`,
/// has no (or an empty) inline script, or carries an unparseable timeout.
pub fn build_command(
    manifest: &Manifest,
    run_id: &str,
    request_id: &str,
) -> Result<Command, RpcError> {
    let exec = &manifest.execute;

    if exec.script_object.is_some() {
        return Err(RpcError::new(
            ErrorKind::InvalidParams,
            format!(
                "job '{}' uses script_object, which is not yet runnable via KLP jobs.execute",
                manifest.id
            ),
        ));
    }

    let script = match exec.script.as_deref() {
        Some(body) if !body.is_empty() => body.to_string(),
        _ => {
            return Err(RpcError::new(
                ErrorKind::InvalidParams,
                format!("job '{}' has no inline script to run", manifest.id),
            ));
        }
    };

    let parsed_timeout = match humantime::parse_duration(&exec.timeout) {
        Ok(duration) => duration,
        Err(e) => {
            return Err(RpcError::new(
                ErrorKind::InvalidParams,
                format!("job '{}' has an invalid timeout: {e}", manifest.id),
            ));
        }
    };
    // Sub-second timeouts truncate to 0 seconds; never hand the runner less than 1.
    let timeout_secs = parsed_timeout.as_secs().max(1);

    Ok(Command {
        id: manifest.id.clone(),
        version: manifest.version.clone(),
        request_id: request_id.to_string(),
        exec_id: Some(run_id.to_string()),
        shell: exec.shell.into(),
        script,
        script_object: None,
        script_object_sha256: None,
        timeout_secs,
        jitter_secs: None,
        run_as: exec.run_as,
        cwd: exec.cwd.clone(),
        deadline_at: None,
        staleness: manifest.staleness.clone(),
        emit: None,
        check: None,
        retry: None,
    })
}
/// Background task behind `jobs.execute`.
///
/// Pushes a `Running` progress notification, runs the command, pushes one
/// terminal progress notification, and finally queues an `ExecResult` built
/// from the same outcome. A command that fails to start is reported as
/// `Failed` with exit code -1 and an explanatory stderr note.
async fn run_job(
    client: async_nats::Client,
    cmd: Command,
    run_id: String,
    push_tx: mpsc::Sender<Vec<u8>>,
    pc_id: String,
) {
    let running = JobProgress {
        run_id: run_id.clone(),
        status: RunStatus::Running,
        stdout_chunk: None,
        stderr_chunk: None,
        exit_code: None,
    };
    push_progress(&push_tx, running).await;

    let started_at = Utc::now();
    let outcome = run_command_with_kill(&client, &cmd, None).await;
    let finished_at = Utc::now();

    let (terminal, exit_code, stdout, stderr) = match &outcome {
        Err(e) => {
            warn!(run_id = %run_id, pc_id = %pc_id, error = %e, "jobs.execute: run failed to start");
            let msg = with_note("", &format!("agent failed to start the job: {e}"));
            let failed = JobProgress {
                run_id: run_id.clone(),
                status: RunStatus::Failed,
                stdout_chunk: None,
                stderr_chunk: Some(msg.clone()),
                exit_code: Some(-1),
            };
            (failed, -1, String::new(), msg)
        }
        Ok(finished) => {
            let (code, out, err) = outcome_to_result_parts(&cmd, finished);
            let progress = outcome_to_progress(run_id.clone(), finished);
            (progress, code, out, err)
        }
    };

    debug!(run_id = %run_id, pc_id = %pc_id, status = ?terminal.status, "jobs.execute: run finished");
    push_progress(&push_tx, terminal).await;

    enqueue_exec_result(build_exec_result(
        &cmd,
        &pc_id,
        exit_code,
        stdout,
        stderr,
        started_at,
        finished_at,
    ));
}
/// Flattens an `ExecOutcome` into `(exit_code, stdout, stderr)` for the
/// `ExecResult` record.
///
/// A completed run is passed through untouched. Killed and timed-out runs get
/// exit code -1 and a `[KLP]` note appended to stderr that says what happened.
fn outcome_to_result_parts(cmd: &Command, outcome: &ExecOutcome) -> (i32, String, String) {
    let (stdout, stderr, note) = match outcome {
        ExecOutcome::Completed {
            exit_code,
            stdout,
            stderr,
        } => return (*exit_code, stdout.clone(), stderr.clone()),
        ExecOutcome::Killed { stdout, stderr } => (
            stdout,
            stderr,
            "killed by the user via KLP jobs.kill".to_string(),
        ),
        ExecOutcome::Timeout { stdout, stderr } => (
            stdout,
            stderr,
            format!("timed out after {}s", cmd.timeout_secs),
        ),
    };
    (-1, stdout.clone(), with_note(stderr, &note))
}
/// Assembles the `ExecResult` record for a finished run.
///
/// A fresh `result_id` is minted on every call. `exec_id` is deliberately
/// left as `None`, while the manifest id is carried in `manifest_id`.
fn build_exec_result(
    cmd: &Command,
    pc_id: &str,
    exit_code: i32,
    stdout: String,
    stderr: String,
    started_at: chrono::DateTime<Utc>,
    finished_at: chrono::DateTime<Utc>,
) -> ExecResult {
    let result_id = Uuid::new_v4().to_string();
    let request_id = cmd.request_id.clone();
    let manifest_id = Some(cmd.id.clone());
    ExecResult {
        result_id,
        request_id,
        exec_id: None,
        pc_id: pc_id.to_string(),
        exit_code,
        stdout,
        stderr,
        started_at,
        finished_at,
        stdout_object: None,
        stderr_object: None,
        manifest_id,
    }
}
/// Queues `result` in the on-disk outbox under the data directory.
///
/// The write is done on tokio's blocking pool and is fire-and-forget: the
/// outcome is only logged, and a failed enqueue does not affect the caller.
fn enqueue_exec_result(result: ExecResult) {
    // Copy the log fields out first; `result` itself moves into the closure.
    let pc_id = result.pc_id.clone();
    let manifest_id = result.manifest_id.clone().unwrap_or_default();

    let write_to_outbox = move || {
        let outbox_dir = default_paths::data_dir().join("outbox");
        match outbox::enqueue(&outbox_dir, &result) {
            Err(e) => warn!(
                manifest_id = %manifest_id,
                pc_id = %pc_id,
                error = %e,
                "jobs.execute: ExecResult outbox enqueue failed (run still completed)",
            ),
            Ok(path) => debug!(
                manifest_id = %manifest_id,
                pc_id = %pc_id,
                outbox = %path.display(),
                "jobs.execute: ExecResult enqueued (operator visibility, #478)",
            ),
        }
    };
    tokio::task::spawn_blocking(write_to_outbox);
}
/// Appends a `[KLP]`-prefixed note to captured stderr.
///
/// Trailing whitespace is stripped from `stderr` first. If nothing remains,
/// the note stands alone; otherwise it goes on a new line after the output.
fn with_note(stderr: &str, note: &str) -> String {
    match stderr.trim_end() {
        "" => format!("[KLP] {note}"),
        body => format!("{body}\n[KLP] {note}"),
    }
}
/// Number of bytes (256 KiB) of a single stdout/stderr chunk that `cap_chunk`
/// keeps before cutting the text and appending a truncation marker.
const MAX_PROGRESS_CHUNK_BYTES: usize = 256 * 1024;
/// Maps a finished run onto the terminal `jobs.progress` notification.
///
/// * `Completed` becomes `RunStatus::Completed` for exit code 0 and
///   `RunStatus::Failed` otherwise, with the real exit code.
/// * `Killed` becomes `RunStatus::Killed` with exit code -1.
/// * `Timeout` becomes `RunStatus::Failed` with exit code -1 and a
///   user-facing timeout note in `stderr_chunk`.
///
/// Empty output streams are reported as `None`; non-empty ones go through
/// `cap_chunk`. For a timeout, the captured stderr is capped *before* the
/// note is appended. `cap_chunk` keeps only the head of oversized text, so
/// capping afterwards would cut the note off exactly when stderr is large.
pub fn outcome_to_progress(run_id: String, outcome: &ExecOutcome) -> JobProgress {
    // `None` for an empty stream, otherwise the size-capped text.
    fn chunk(text: &str) -> Option<String> {
        (!text.is_empty()).then(|| cap_chunk(text))
    }

    let (status, exit_code, stdout_chunk, stderr_chunk) = match outcome {
        ExecOutcome::Completed {
            exit_code,
            stdout,
            stderr,
        } => {
            let status = if *exit_code == 0 {
                RunStatus::Completed
            } else {
                RunStatus::Failed
            };
            (status, *exit_code, chunk(stdout), chunk(stderr))
        }
        ExecOutcome::Killed { stdout, stderr } => {
            (RunStatus::Killed, -1, chunk(stdout), chunk(stderr))
        }
        ExecOutcome::Timeout { stdout, stderr } => {
            const NOTE: &str =
                "⏱ ジョブがタイムアウトしました(manifest の timeout で打ち切られました)";
            let annotated = if stderr.trim().is_empty() {
                NOTE.to_string()
            } else {
                // Cap first, then append, so the note always survives.
                format!("{}\n{NOTE}", cap_chunk(stderr))
            };
            (RunStatus::Failed, -1, chunk(stdout), Some(annotated))
        }
    };

    JobProgress {
        run_id,
        status,
        stdout_chunk,
        stderr_chunk,
        exit_code: Some(exit_code),
    }
}
/// Limits `s` to `MAX_PROGRESS_CHUNK_BYTES` bytes of content.
///
/// Text within the limit is returned unchanged. Longer text is cut at the
/// last character boundary at or below the limit and a truncation marker is
/// appended on a new line.
fn cap_chunk(s: &str) -> String {
    if s.len() <= MAX_PROGRESS_CHUNK_BYTES {
        return s.to_owned();
    }
    // Index 0 is always a char boundary, so the search cannot come up empty.
    let cut = (0..=MAX_PROGRESS_CHUNK_BYTES)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    let head = &s[..cut];
    format!("{head}\n…[truncated: output exceeded 256 KiB]")
}
async fn push_progress(push_tx: &mpsc::Sender<Vec<u8>>, progress: JobProgress) {
let notif = match RpcNotification::new(method::JOBS_PROGRESS, &progress) {
Ok(n) => n,
Err(e) => {
warn!(error = %e, "jobs.progress: failed to encode notification");
return;
}
};
let body = match serde_json::to_vec(¬if) {
Ok(b) => b,
Err(e) => {
warn!(error = %e, "jobs.progress: failed to serialise frame");
return;
}
};
if push_tx.send(body).await.is_err() {
debug!("jobs.progress: push channel closed (client gone)");
}
}
/// Handles the `jobs.kill` RPC: publishes a kill signal for a run that was
/// started on this connection.
///
/// The signal is an empty message on `subject::kill(run_id)`. The returned
/// `requested_at` is the time the request was published, not the time the
/// process stopped.
///
/// # Errors
///
/// * `ErrorKind::Unauthorized` when the connection does not own `run_id`.
///   This check runs first, before the NATS client is looked at.
/// * `ErrorKind::InternalError` when the connection has no NATS client or the
///   publish fails.
pub async fn handle_jobs_kill(
    conn: &ConnectionState,
    params: JobsKillParams,
) -> HandlerResult<JobsKillResult> {
    if !conn.owns_run(&params.run_id) {
        return Err(RpcError::new(
            ErrorKind::Unauthorized,
            format!("run '{}' was not started on this connection", params.run_id),
        ));
    }
    let client = conn.nats.as_ref().ok_or_else(|| {
        RpcError::new(
            ErrorKind::InternalError,
            "jobs.kill: NATS client not wired into the connection",
        )
    })?;
    client
        .publish(subject::kill(&params.run_id), bytes::Bytes::new())
        .await
        .map_err(|e| {
            warn!(run_id = %params.run_id, error = %e, "jobs.kill: publish failed");
            RpcError::new(
                ErrorKind::InternalError,
                format!("jobs.kill: publish kill signal: {e}"),
            )
        })?;
    Ok(JobsKillResult {
        requested_at: Utc::now(),
    })
}
// Unit tests for the jobs handlers. They cover the pure helpers (list
// building, command building, id validation, chunk capping, outcome mapping,
// result assembly) and the authorization path of `handle_jobs_kill`; none of
// them wires a NATS client into the connection.
#[cfg(test)]
mod tests {
use super::*;
use kanade_shared::ipc::jobs::JobCategory;
use kanade_shared::manifest::{ClientHint, Execute, ExecuteShell};
use kanade_shared::wire::{RunAs, Staleness};
// Fixture: a minimal inline-script manifest. Passing `Some((name, category))`
// adds a `client` block; `None` leaves the manifest without one.
fn manifest(id: &str, client: Option<(&str, JobCategory)>) -> Manifest {
Manifest {
id: id.into(),
version: "1.0.0".into(),
description: None,
execute: Execute {
shell: ExecuteShell::Powershell,
script: Some("echo hi".into()),
script_file: None,
script_object: None,
timeout: "30s".into(),
run_as: RunAs::default(),
cwd: None,
},
require_approval: false,
inventory: None,
emit: None,
check: None,
staleness: Staleness::default(),
client: client.map(|(name, category)| ClientHint {
name: name.into(),
description: None,
category,
icon: None,
}),
}
}
// --- build_job_list ---
// Manifests without a `client` block must not appear in the list.
#[test]
fn lists_only_client_jobs() {
let manifests = [
manifest("inv-hw", None),
manifest(
"chrome-update",
Some(("Chrome を更新", JobCategory::SoftwareUpdate)),
),
manifest("check-bitlocker", None),
];
let result = build_job_list(&manifests, None);
assert_eq!(result.items.len(), 1);
assert_eq!(result.items[0].id, "chrome-update");
assert_eq!(result.items[0].display_name, "Chrome を更新");
assert_eq!(result.items[0].category, JobCategory::SoftwareUpdate);
assert!(result.items[0].last_run.is_none());
}
// A category filter keeps only the jobs of that category.
#[test]
fn category_filter_narrows_to_one_tab() {
let manifests = [
manifest(
"chrome-update",
Some(("Chrome", JobCategory::SoftwareUpdate)),
),
manifest("fix-teams", Some(("Teams 修復", JobCategory::Troubleshoot))),
manifest("install-slack", Some(("Slack", JobCategory::Catalog))),
];
let only_troubleshoot = build_job_list(&manifests, Some(JobCategory::Troubleshoot));
assert_eq!(only_troubleshoot.items.len(), 1);
assert_eq!(only_troubleshoot.items[0].id, "fix-teams");
}
// No `client` blocks at all yields an empty list.
#[test]
fn empty_when_no_client_jobs() {
let manifests = [manifest("inv-hw", None), manifest("inv-sw", None)];
let result = build_job_list(&manifests, None);
assert!(result.items.is_empty());
}
// Optional client fields (description, icon) and the version are copied through.
#[test]
fn maps_all_client_fields() {
let mut m = manifest("fix-teams", Some(("Teams 修復", JobCategory::Troubleshoot)));
if let Some(c) = m.client.as_mut() {
c.description = Some("重いとき用".into());
c.icon = Some("brush-cleaning".into());
}
let result = build_job_list(std::slice::from_ref(&m), None);
let row = &result.items[0];
assert_eq!(row.display_description.as_deref(), Some("重いとき用"));
assert_eq!(row.icon.as_deref(), Some("brush-cleaning"));
assert_eq!(row.version, "1.0.0");
}
// Rows come back ordered by display name regardless of input order.
#[test]
fn items_sorted_by_display_name() {
let manifests = [
manifest("z", Some(("Zebra", JobCategory::Catalog))),
manifest("a", Some(("Apple", JobCategory::Catalog))),
manifest("m", Some(("Mango", JobCategory::Catalog))),
];
let result = build_job_list(&manifests, None);
let names: Vec<&str> = result
.items
.iter()
.map(|j| j.display_name.as_str())
.collect();
assert_eq!(names, ["Apple", "Mango", "Zebra"]);
}
// --- build_command ---
// Manifest fields map onto the command; scheduling-only fields stay unset.
#[test]
fn build_command_maps_manifest_fields() {
let mut m = manifest("fix-teams", Some(("Teams 修復", JobCategory::Troubleshoot)));
m.execute.run_as = RunAs::User;
m.execute.cwd = Some("C:/temp".into());
m.execute.timeout = "90s".into();
let cmd = build_command(&m, "run-123", "req-9").expect("build");
assert_eq!(cmd.id, "fix-teams");
assert_eq!(cmd.version, "1.0.0");
assert_eq!(cmd.exec_id.as_deref(), Some("run-123")); assert_eq!(cmd.request_id, "req-9"); assert_eq!(cmd.script, "echo hi");
assert_eq!(cmd.timeout_secs, 90);
assert_eq!(cmd.run_as, RunAs::User);
assert_eq!(cmd.cwd.as_deref(), Some("C:/temp"));
assert!(cmd.jitter_secs.is_none());
assert!(cmd.deadline_at.is_none());
assert!(cmd.emit.is_none() && cmd.check.is_none());
assert!(cmd.script_object.is_none());
}
// `script_object` manifests are rejected as InvalidParams.
#[test]
fn build_command_rejects_script_object() {
let mut m = manifest("obj-job", Some(("Obj", JobCategory::Catalog)));
m.execute.script = None;
m.execute.script_object = Some("cleanup/1.0.0".into());
let err = build_command(&m, "r1", "req-1").expect_err("script_object unsupported");
assert_eq!(err.data.unwrap().kind, ErrorKind::InvalidParams);
}
// A manifest with no inline script is rejected with a descriptive detail.
#[test]
fn build_command_rejects_missing_inline_script() {
let mut m = manifest("empty", Some(("Empty", JobCategory::Catalog)));
m.execute.script = None;
let err = build_command(&m, "r1", "req-1").expect_err("no script");
let data = err.data.unwrap();
assert_eq!(data.kind, ErrorKind::InvalidParams);
assert!(data.detail.contains("no inline script"), "{}", data.detail);
}
// An unparseable timeout string is rejected as InvalidParams.
#[test]
fn build_command_rejects_bad_timeout() {
let mut m = manifest("bad", Some(("Bad", JobCategory::Catalog)));
m.execute.timeout = "not-a-duration".into();
let err = build_command(&m, "r1", "req-1").expect_err("bad timeout");
let data = err.data.unwrap();
assert_eq!(data.kind, ErrorKind::InvalidParams);
assert!(data.detail.contains("timeout"), "{}", data.detail);
}
// A sub-second timeout ends up as 1 second rather than 0.
#[test]
fn build_command_floors_subsecond_timeout_to_one() {
let mut m = manifest("ms", Some(("Ms", JobCategory::Catalog)));
m.execute.timeout = "500ms".into();
let cmd = build_command(&m, "r1", "req-1").expect("build");
assert_eq!(cmd.timeout_secs, 1);
}
// --- valid_job_id ---
#[test]
fn valid_job_id_accepts_slugs_rejects_junk() {
for ok in ["chrome-update", "fix_teams.cache", "Job123", "a"] {
assert!(valid_job_id(ok), "{ok} should be valid");
}
for bad in ["", "has space", "wild*", "a>b", "with/slash", "qu?x"] {
assert!(!valid_job_id(bad), "{bad:?} should be invalid");
}
}
// --- cap_chunk ---
#[test]
fn cap_chunk_passes_through_small_output() {
assert_eq!(cap_chunk("hello"), "hello");
}
// Multi-byte input larger than the cap must shrink and carry the marker.
#[test]
fn cap_chunk_truncates_oversize_on_char_boundary() {
let big = "あ".repeat(MAX_PROGRESS_CHUNK_BYTES); let out = cap_chunk(&big);
assert!(out.len() < big.len(), "must shrink");
assert!(out.contains("truncated"), "must mark truncation");
assert!(out.is_char_boundary(out.len()));
}
// --- outcome_to_progress ---
#[test]
fn outcome_completed_zero_is_completed() {
let p = outcome_to_progress(
"r1".into(),
&ExecOutcome::Completed {
exit_code: 0,
stdout: "done".into(),
stderr: String::new(),
},
);
assert_eq!(p.status, RunStatus::Completed);
assert_eq!(p.exit_code, Some(0));
assert_eq!(p.stdout_chunk.as_deref(), Some("done"));
assert!(p.stderr_chunk.is_none());
}
#[test]
fn outcome_completed_nonzero_is_failed() {
let p = outcome_to_progress(
"r1".into(),
&ExecOutcome::Completed {
exit_code: 3,
stdout: String::new(),
stderr: "boom".into(),
},
);
assert_eq!(p.status, RunStatus::Failed);
assert_eq!(p.exit_code, Some(3));
assert!(p.stdout_chunk.is_none());
assert_eq!(p.stderr_chunk.as_deref(), Some("boom"));
}
#[test]
fn outcome_killed_maps_to_killed_minus_one() {
let p = outcome_to_progress(
"r1".into(),
&ExecOutcome::Killed {
stdout: String::new(),
stderr: String::new(),
},
);
assert_eq!(p.status, RunStatus::Killed);
assert_eq!(p.exit_code, Some(-1));
}
// With empty stderr, the timeout note alone becomes the stderr chunk.
#[test]
fn outcome_timeout_maps_to_failed_with_note() {
let p = outcome_to_progress(
"r1".into(),
&ExecOutcome::Timeout {
stdout: String::new(),
stderr: String::new(),
},
);
assert_eq!(p.status, RunStatus::Failed);
assert_eq!(p.exit_code, Some(-1));
assert!(
p.stderr_chunk
.as_deref()
.is_some_and(|s| s.contains("タイムアウト")),
"stderr_chunk should carry the timeout note: {:?}",
p.stderr_chunk
);
}
// With existing stderr, both the output and the timeout note are present.
#[test]
fn outcome_timeout_appends_note_after_existing_stderr() {
let p = outcome_to_progress(
"r1".into(),
&ExecOutcome::Timeout {
stdout: String::new(),
stderr: "partial output before kill".into(),
},
);
let chunk = p.stderr_chunk.expect("stderr present");
assert!(chunk.contains("partial output before kill"), "{chunk}");
assert!(chunk.contains("タイムアウト"), "{chunk}");
}
// --- handle_jobs_kill ---
// Fixture: a connection built through `ConnectionState::new` with throwaway
// watch/mpsc channels; no NATS client is set on it here.
fn fresh_conn() -> ConnectionState {
use kanade_shared::ipc::state::StateSnapshot;
use kanade_shared::wire::EffectiveConfig;
use std::path::PathBuf;
use tokio::sync::watch;
let (_cfg_tx, cfg_rx) = watch::channel(EffectiveConfig::builtin_defaults());
let snapshot = StateSnapshot {
pc_id: "PC1".into(),
online: true,
vpn: "unknown".into(),
checks: vec![],
agent_version: "0.0.0".into(),
target_version: "0.0.0".into(),
};
let (_state_tx, state_rx) = watch::channel(snapshot);
let (push_tx, _push_rx) = mpsc::channel(8);
ConnectionState::new(
crate::klp::auth::PeerCredentials {
user: "DOMAIN\\alice".into(),
session_id: 2,
},
"PC1".into(),
"0.0.0".into(),
cfg_rx,
state_rx,
PathBuf::from("agent.log"),
push_tx,
)
}
// A run id that was never registered on the connection is refused.
#[tokio::test]
async fn kill_unknown_run_is_unauthorized() {
let conn = fresh_conn();
let err = handle_jobs_kill(
&conn,
JobsKillParams {
run_id: "never-started".into(),
},
)
.await
.expect_err("unknown run must be unauthorized");
assert_eq!(err.data.unwrap().kind, ErrorKind::Unauthorized);
}
// A registered run gets past the ownership check; the call then fails with
// InternalError because the fixture has no NATS client.
#[tokio::test]
async fn kill_owned_run_passes_authorization() {
let mut conn = fresh_conn();
conn.register_run("run-mine".into());
let err = handle_jobs_kill(
&conn,
JobsKillParams {
run_id: "run-mine".into(),
},
)
.await
.expect_err("no nats wired → InternalError after auth passes");
assert_eq!(err.data.unwrap().kind, ErrorKind::InternalError);
}
// --- ExecResult assembly ---
// Fixture: a command built from the default client manifest.
fn cmd_fixture(id: &str) -> Command {
let m = manifest(id, Some(("Job", JobCategory::Catalog)));
build_command(&m, "run-1", "req-1").expect("build_command")
}
// The result carries the manifest id but no exec_id.
#[test]
fn exec_result_is_ad_hoc_with_manifest_id() {
let cmd = cmd_fixture("fix-teams");
let now = Utc::now();
let r = build_exec_result(&cmd, "PC1", 0, "ok".into(), String::new(), now, now);
assert!(r.exec_id.is_none(), "KLP run must be ad-hoc (exec_id None)");
assert_eq!(r.manifest_id.as_deref(), Some("fix-teams"));
assert_eq!(r.pc_id, "PC1");
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "ok");
assert!(!r.result_id.is_empty(), "result_id minted");
}
#[test]
fn result_parts_completed_passes_through() {
let cmd = cmd_fixture("j");
let (code, out, err) = outcome_to_result_parts(
&cmd,
&ExecOutcome::Completed {
exit_code: 2,
stdout: "o".into(),
stderr: "e".into(),
},
);
assert_eq!((code, out.as_str(), err.as_str()), (2, "o", "e"));
}
// Killed and timed-out runs report -1 and explain themselves in stderr.
#[test]
fn result_parts_killed_and_timeout_annotate_stderr() {
let cmd = cmd_fixture("j");
let (code, _out, err) = outcome_to_result_parts(
&cmd,
&ExecOutcome::Killed {
stdout: String::new(),
stderr: String::new(),
},
);
assert_eq!(code, -1);
assert!(err.contains("killed"), "{err}");
let (code, _out, err) = outcome_to_result_parts(
&cmd,
&ExecOutcome::Timeout {
stdout: String::new(),
stderr: "partial".into(),
},
);
assert_eq!(code, -1);
assert!(
err.contains("partial") && err.contains("timed out"),
"{err}"
);
}
// --- with_note ---
// Blank stderr yields the note alone; trailing whitespace is trimmed first.
#[test]
fn with_note_appends_or_stands_alone() {
assert_eq!(with_note("", "x"), "[KLP] x");
assert_eq!(with_note("   ", "x"), "[KLP] x");
assert_eq!(with_note("out", "x"), "out\n[KLP] x");
assert_eq!(with_note("out\n", "x"), "out\n[KLP] x");
}
}