Skip to main content

kanade_shared/wire/
result.rs

1use serde::{Deserialize, Serialize};
2use uuid::Uuid;
3
/// Prefix prepended to the UUIDv5 *name* string when a legacy
/// `result_id` is derived (see `ExecResult::stable_result_id`, which
/// hashes `<prefix><request_id>:<pc_id>` inside the standard
/// `Uuid::NAMESPACE_OID` namespace).
///
/// It is a fixed marker so that two backends (or one backend across
/// restarts) projecting the same legacy payload arrive at the same
/// id. Because the prefix is part of the hashed name, changing it
/// changes every derived id: bumping it would break dedupe of legacy
/// redeliveries crossing the upgrade — don't.
const LEGACY_RESULT_ID_PREFIX: &str = "kanade-issue-19/legacy-result-id:";
11
/// Wire-format record of a single Command run on a single PC.
///
/// (De)serialised via the serde derives below. Fields carrying
/// `#[serde(default)]` may be missing from payloads sent by older
/// agents and then decode to their `Default` value (empty string /
/// `None`); all other fields are required on the wire.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExecResult {
    /// v0.29 / Issue #19: agent-minted UUID, unique per (Command, PC)
    /// run. Replaces `request_id` as the projector's primary key so
    /// broadcast Commands (commands.all / commands.group.X) — where N
    /// PCs share one `request_id` — finally persist all N results
    /// instead of silently dropping all but the first. Pre-v0.29
    /// agents omit this field; it deserialises as the empty string,
    /// and [`Self::stable_result_id`] derives a deterministic UUIDv5
    /// from `(request_id, pc_id)` so legacy payloads (a) get distinct
    /// ids across broadcast PCs (PC #2's row stops being dropped) and
    /// (b) get the SAME id on JetStream redelivery (the new `ON
    /// CONFLICT(result_id) DO NOTHING` path correctly dedupes, so
    /// `executions.success_count` doesn't double-count across retries).
    ///
    /// Always serialised, even when empty (there is no
    /// `skip_serializing_if` on this field).
    #[serde(default)]
    pub result_id: String,
    /// The NATS reply token. Still surfaced for joining back to the
    /// `kanade run` request/reply path. No longer unique across rows
    /// (broadcast Commands share it).
    pub request_id: String,
    /// v0.29 / Issue #19: back-link to `executions.exec_id`. Copied
    /// from `Command.exec_id` by the agent. `None` for ad-hoc
    /// `kanade run` (no deployment) and for results emitted by
    /// pre-v0.29 agents (decoded via `serde(default)`).
    /// Omitted from the serialised output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub exec_id: Option<String>,
    /// Identifier of the PC that produced this result. Together with
    /// `request_id` it is the input to the legacy id derivation in
    /// [`Self::stable_result_id`].
    pub pc_id: String,
    /// Exit code recorded for the run.
    /// NOTE(review): presumably the executed command's process exit
    /// status — confirm against the agent that fills it in.
    pub exit_code: i32,
    /// Standard-output text of the run.
    /// NOTE(review): presumably captured from the executed command —
    /// confirm against the agent.
    pub stdout: String,
    /// Standard-error text of the run (same caveat as `stdout`).
    pub stderr: String,
    /// UTC timestamp; presumably the moment the run started — confirm
    /// against the agent.
    pub started_at: chrono::DateTime<chrono::Utc>,
    /// UTC timestamp; presumably the moment the run finished — confirm
    /// against the agent.
    pub finished_at: chrono::DateTime<chrono::Utc>,
    /// v0.13: the manifest id that produced this result. Sourced
    /// from `Command.id` (which is the YAML `manifest.id`, e.g.
    /// `"inventory-hw"`). Distinct from the per-deploy UUID stored
    /// in `Command.exec_id`. The results projector uses this to
    /// look up the manifest's `inventory:` hint and upsert
    /// `inventory_facts` rows for inventory-tagged jobs.
    /// Absent in older payloads (decodes to `None`) and omitted from
    /// the serialised output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub manifest_id: Option<String>,
}
53
54impl ExecResult {
55    /// Return the `result_id` if the agent supplied one (v0.29+
56    /// payloads always do), otherwise derive a stable UUIDv5 from
57    /// `(request_id, pc_id)`. The projector calls this before INSERT
58    /// so legacy payloads still get a non-empty PK, AND so that
59    /// JetStream redeliveries of the same legacy payload hash to the
60    /// same id and dedupe via `ON CONFLICT`. Per-PC fan-out stays
61    /// distinct (different `pc_id` → different hash).
62    pub fn stable_result_id(&self) -> String {
63        if !self.result_id.is_empty() {
64            return self.result_id.clone();
65        }
66        let name = format!(
67            "{LEGACY_RESULT_ID_PREFIX}{}:{}",
68            self.request_id, self.pc_id
69        );
70        Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string()
71    }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// JSON in the shape the legacy-agent tests need: only the
    /// required fields, with no `result_id`, `exec_id` or
    /// `manifest_id` keys. Only `request_id` and `pc_id` vary between
    /// tests, so they are the only parameters.
    fn legacy_json(request_id: &str, pc_id: &str) -> String {
        format!(
            r#"{{
                "request_id":"{request_id}","pc_id":"{pc_id}","exit_code":0,
                "stdout":"","stderr":"",
                "started_at":"2026-05-16T00:00:00Z",
                "finished_at":"2026-05-16T00:00:00Z"
            }}"#
        )
    }

    /// Decode the legacy fixture for the given ids, failing the test
    /// if it does not deserialise.
    fn decode_legacy(request_id: &str, pc_id: &str) -> ExecResult {
        serde_json::from_str(&legacy_json(request_id, pc_id))
            .expect("legacy payload must deserialise")
    }

    /// Minimal in-memory result with both optional back-links unset.
    fn sample_result(result_id: &str) -> ExecResult {
        let at = chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 0).unwrap();
        ExecResult {
            result_id: result_id.into(),
            request_id: "r".into(),
            exec_id: None,
            pc_id: "x".into(),
            exit_code: 0,
            stdout: String::new(),
            stderr: String::new(),
            started_at: at,
            finished_at: at,
            manifest_id: None,
        }
    }

    #[test]
    fn exec_result_round_trips_through_json() {
        let t0 = chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 0).unwrap();
        let t1 = chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 5).unwrap();
        let r = ExecResult {
            result_id: "result-uuid-1".into(),
            request_id: "req-1".into(),
            exec_id: Some("exec-uuid-1".into()),
            pc_id: "minipc".into(),
            exit_code: 0,
            stdout: "hello\n".into(),
            stderr: String::new(),
            started_at: t0,
            finished_at: t1,
            manifest_id: Some("inventory-hw".into()),
        };
        let json = serde_json::to_string(&r).unwrap();
        let back: ExecResult = serde_json::from_str(&json).unwrap();
        // Every field is checked so that a renamed or dropped field
        // cannot slip through the round trip unnoticed.
        assert_eq!(back.result_id, r.result_id);
        assert_eq!(back.request_id, r.request_id);
        assert_eq!(back.exec_id.as_deref(), Some("exec-uuid-1"));
        assert_eq!(back.pc_id, r.pc_id);
        assert_eq!(back.exit_code, r.exit_code);
        assert_eq!(back.stdout, r.stdout);
        assert_eq!(back.stderr, r.stderr);
        assert_eq!(back.started_at, t0);
        assert_eq!(back.finished_at, t1);
        assert_eq!(back.manifest_id.as_deref(), Some("inventory-hw"));
    }

    #[test]
    fn none_optionals_are_omitted_from_json() {
        // `exec_id` / `manifest_id` carry `skip_serializing_if =
        // "Option::is_none"`, so the keys must be absent (not `null`)
        // when unset. `result_id` has no such attribute and is always
        // written.
        let json = serde_json::to_string(&sample_result("agent-minted-uuid")).unwrap();
        let value: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert!(value.get("exec_id").is_none(), "exec_id key must be omitted");
        assert!(
            value.get("manifest_id").is_none(),
            "manifest_id key must be omitted",
        );
        assert_eq!(
            value.get("result_id").and_then(|v| v.as_str()),
            Some("agent-minted-uuid"),
        );
    }

    #[test]
    fn exec_result_without_manifest_id_decodes() {
        // Older agents (pre-0.13) sent ExecResult with no manifest_id field.
        let r = decode_legacy("r", "x");
        assert_eq!(r.manifest_id, None);
    }

    #[test]
    fn exec_result_without_result_id_decodes_empty() {
        // v0.29 / Issue #19: pre-v0.29 agents don't send `result_id`.
        // `#[serde(default)]` decodes it as the empty string so the
        // projector can detect "legacy payload" and call
        // `stable_result_id()` to derive a deterministic PK.
        let r = decode_legacy("r", "x");
        assert_eq!(r.result_id, "");
        assert!(r.exec_id.is_none());
    }

    #[test]
    fn stable_result_id_is_deterministic_for_legacy_payload() {
        // Gemini #65 medium fix: legacy redeliveries (same request_id +
        // pc_id) must hash to the SAME result_id so the projector's
        // ON CONFLICT(result_id) DO NOTHING dedupes — otherwise
        // `executions.success_count` double-counts on JetStream ack
        // timeouts.
        let a = decode_legacy("r", "x");
        let b = decode_legacy("r", "x");
        assert_eq!(
            a.stable_result_id(),
            b.stable_result_id(),
            "same legacy payload must hash to the same result_id",
        );
    }

    #[test]
    fn stable_result_id_for_legacy_payload_is_uuid_v5() {
        // The derived key must be a well-formed UUID of version 5
        // (name-based, SHA-1), not an arbitrary string.
        let id = decode_legacy("r", "x").stable_result_id();
        let parsed = Uuid::parse_str(&id).expect("derived id must parse as a UUID");
        assert_eq!(parsed.get_version_num(), 5);
    }

    #[test]
    fn stable_result_id_differs_across_pcs_for_broadcast() {
        // The other half: a broadcast Command published to two PCs
        // produces two legacy ExecResults sharing one request_id but
        // with different pc_ids. Each must get its OWN result_id so
        // both rows persist (the whole point of Issue #19).
        let a = decode_legacy("shared", "pc-1");
        let b = decode_legacy("shared", "pc-2");
        assert_ne!(
            a.stable_result_id(),
            b.stable_result_id(),
            "different pc_id must produce a different result_id",
        );
    }

    #[test]
    fn stable_result_id_differs_across_requests_for_same_pc() {
        // Mirror image of the broadcast case: one PC answering two
        // different requests must not collapse into a single row.
        let a = decode_legacy("req-1", "pc-1");
        let b = decode_legacy("req-2", "pc-1");
        assert_ne!(
            a.stable_result_id(),
            b.stable_result_id(),
            "different request_id must produce a different result_id",
        );
    }

    #[test]
    fn stable_result_id_passes_through_explicit_value() {
        // v0.29 agents always supply result_id; the helper must
        // return that as-is (no surprise re-hashing).
        let r = sample_result("agent-minted-uuid");
        assert_eq!(r.stable_result_id(), "agent-minted-uuid");
    }
}