Skip to main content

kanade_shared/wire/
result.rs

use serde::{Deserialize, Serialize};
use uuid::Uuid;
3
/// Marker prepended to the UUIDv5 name string when deriving a legacy
/// `result_id` (see `ExecResult::stable_result_id`).
///
/// It is a fixed constant so that two backends (or one backend across
/// restarts) projecting the same legacy payload arrive at the same id.
/// The name is hashed under the standard `Uuid::NAMESPACE_OID`
/// namespace.
///
/// Do not change this value: bumping it would break dedupe of legacy
/// redeliveries that cross the upgrade.
const LEGACY_RESULT_ID_PREFIX: &str = "kanade-issue-19/legacy-result-id:";
11
12#[derive(Serialize, Deserialize, Debug, Clone)]
13pub struct ExecResult {
14    /// v0.29 / Issue #19: agent-minted UUID, unique per (Command, PC)
15    /// run. Replaces `request_id` as the projector's primary key so
16    /// broadcast Commands (commands.all / commands.group.X) — where N
17    /// PCs share one `request_id` — finally persist all N results
18    /// instead of silently dropping all but the first. Pre-v0.29
19    /// agents omit this field; it deserialises as the empty string,
20    /// and [`Self::stable_result_id`] derives a deterministic UUIDv5
21    /// from `(request_id, pc_id)` so legacy payloads (a) get distinct
22    /// ids across broadcast PCs (PC #2's row stops being dropped) and
23    /// (b) get the SAME id on JetStream redelivery (the new `ON
24    /// CONFLICT(result_id) DO NOTHING` path correctly dedupes, so
25    /// `executions.success_count` doesn't double-count across retries).
26    #[serde(default)]
27    pub result_id: String,
28    /// The NATS reply token. Still surfaced for joining back to the
29    /// `kanade run` request/reply path. No longer unique across rows
30    /// (broadcast Commands share it).
31    pub request_id: String,
32    /// v0.29 / Issue #19: back-link to `executions.exec_id`. Copied
33    /// from `Command.exec_id` by the agent. `None` for ad-hoc
34    /// `kanade run` (no deployment) and for results emitted by
35    /// pre-v0.29 agents (decoded via `serde(default)`).
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub exec_id: Option<String>,
38    pub pc_id: String,
39    pub exit_code: i32,
40    /// stdout. Empty string when [`Self::stdout_object`] is set — the
41    /// agent overflowed the bytes into [`crate::kv::OBJECT_RESULT_OUTPUT`]
42    /// because the inline payload would have exceeded NATS's default
43    /// `max_payload` (#227). The backend projector derefs the pointer
44    /// before inserting; SQLite still stores the full text inline so
45    /// the SPA Activity page reads unchanged.
46    pub stdout: String,
47    pub stderr: String,
48    pub started_at: chrono::DateTime<chrono::Utc>,
49    pub finished_at: chrono::DateTime<chrono::Utc>,
50    /// Object Store key under [`crate::kv::OBJECT_RESULT_OUTPUT`] when
51    /// `stdout` overflowed the agent's inline threshold (#227). Set to
52    /// `Some("<request_id>/stdout")` by the agent's outbox drain; the
53    /// backend projector fetches the bytes from that key and uses them
54    /// in place of the (empty) `stdout` field. `None` for the common
55    /// small-stdout case + every pre-#227 payload (`serde(default)`
56    /// keeps older results decodable).
57    #[serde(default, skip_serializing_if = "Option::is_none")]
58    pub stdout_object: Option<String>,
59    /// Sibling of `stdout_object` for the stderr stream. Same key
60    /// shape (`<request_id>/stderr`).
61    #[serde(default, skip_serializing_if = "Option::is_none")]
62    pub stderr_object: Option<String>,
63    /// v0.13: the manifest id that produced this result. Sourced
64    /// from `Command.id` (which is the YAML `manifest.id`, e.g.
65    /// `"inventory-hw"`). Distinct from the per-deploy UUID stored
66    /// in `Command.exec_id`. The results projector uses this to
67    /// look up the manifest's `inventory:` hint and upsert
68    /// `inventory_facts` rows for inventory-tagged jobs.
69    #[serde(default, skip_serializing_if = "Option::is_none")]
70    pub manifest_id: Option<String>,
71    /// #219: Object Store key under [`crate::kv::OBJECT_COLLECTIONS`] for
72    /// the bundle this run collected, when the job carried a `collect:`
73    /// hint and the run succeeded. Set by the agent to
74    /// `Some("<pc_id>/<job_id>/<rfc3339>.zip")` after it zips the
75    /// script's listed files and uploads the archive. `None` for every
76    /// non-collect job + every pre-#219 payload (`serde(default)` keeps
77    /// older results decodable). The SPA Collect page lists / downloads
78    /// these straight from the bucket.
79    #[serde(default, skip_serializing_if = "Option::is_none")]
80    pub collect_object: Option<String>,
81}
82
83impl ExecResult {
84    /// Return the `result_id` if the agent supplied one (v0.29+
85    /// payloads always do), otherwise derive a stable UUIDv5 from
86    /// `(request_id, pc_id)`. The projector calls this before INSERT
87    /// so legacy payloads still get a non-empty PK, AND so that
88    /// JetStream redeliveries of the same legacy payload hash to the
89    /// same id and dedupe via `ON CONFLICT`. Per-PC fan-out stays
90    /// distinct (different `pc_id` → different hash).
91    pub fn stable_result_id(&self) -> String {
92        if !self.result_id.is_empty() {
93            return self.result_id.clone();
94        }
95        let name = format!(
96            "{LEGACY_RESULT_ID_PREFIX}{}:{}",
97            self.request_id, self.pc_id
98        );
99        Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string()
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// JSON for a legacy payload carrying only the always-present
    /// fields: no `result_id`, `exec_id`, `manifest_id`, or object
    /// pointers on the wire.
    fn legacy_payload(request_id: &str, pc_id: &str) -> String {
        format!(
            r#"{{
                "request_id":"{request_id}","pc_id":"{pc_id}","exit_code":0,
                "stdout":"","stderr":"",
                "started_at":"2026-05-16T00:00:00Z",
                "finished_at":"2026-05-16T00:00:00Z"
            }}"#
        )
    }

    /// Decode a legacy payload for the given `(request_id, pc_id)`.
    fn decode_legacy(request_id: &str, pc_id: &str) -> ExecResult {
        serde_json::from_str(&legacy_payload(request_id, pc_id)).unwrap()
    }

    #[test]
    fn exec_result_round_trips_through_json() {
        let t0 = chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 0).unwrap();
        let t1 = chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 5).unwrap();
        let r = ExecResult {
            result_id: "result-uuid-1".into(),
            request_id: "req-1".into(),
            exec_id: Some("exec-uuid-1".into()),
            pc_id: "pc-01".into(),
            exit_code: 0,
            stdout: "hello\n".into(),
            stderr: String::new(),
            started_at: t0,
            finished_at: t1,
            stdout_object: None,
            stderr_object: None,
            manifest_id: Some("inventory-hw".into()),
            collect_object: None,
        };
        let json = serde_json::to_string(&r).unwrap();
        let back: ExecResult = serde_json::from_str(&json).unwrap();
        assert_eq!(back.result_id, r.result_id);
        assert_eq!(back.request_id, r.request_id);
        assert_eq!(back.exec_id.as_deref(), Some("exec-uuid-1"));
        assert_eq!(back.pc_id, r.pc_id);
        assert_eq!(back.exit_code, r.exit_code);
        assert_eq!(back.stdout, r.stdout);
        assert_eq!(back.stderr, r.stderr);
        assert_eq!(back.started_at, t0);
        assert_eq!(back.finished_at, t1);
        assert_eq!(back.manifest_id.as_deref(), Some("inventory-hw"));
    }

    #[test]
    fn exec_result_without_manifest_id_decodes() {
        // Older agents (pre-0.13) sent ExecResult with no manifest_id field.
        let r = decode_legacy("r", "x");
        assert_eq!(r.manifest_id, None);
    }

    #[test]
    fn exec_result_without_result_id_decodes_empty() {
        // v0.29 / Issue #19: pre-v0.29 agents don't send `result_id`.
        // `#[serde(default)]` decodes it as the empty string so the
        // projector can detect "legacy payload" and call
        // `stable_result_id()` to derive a deterministic PK.
        let r = decode_legacy("r", "x");
        assert_eq!(r.result_id, "");
        assert!(r.exec_id.is_none());
    }

    #[test]
    fn stable_result_id_is_deterministic_for_legacy_payload() {
        // Gemini #65 medium fix: legacy redeliveries (same request_id +
        // pc_id) must hash to the SAME result_id so the projector's
        // ON CONFLICT(result_id) DO NOTHING dedupes; otherwise
        // `executions.success_count` double-counts on JetStream ack
        // timeouts.
        let a = decode_legacy("r", "x");
        let b = decode_legacy("r", "x");
        assert_eq!(
            a.stable_result_id(),
            b.stable_result_id(),
            "same legacy payload must hash to the same result_id",
        );
    }

    #[test]
    fn stable_result_id_differs_across_pcs_for_broadcast() {
        // The other half: a broadcast Command published to two PCs
        // produces two legacy ExecResults sharing one request_id but
        // with different pc_ids. Each must get its OWN result_id so
        // both rows persist (the whole point of Issue #19).
        let a = decode_legacy("shared", "pc-1");
        let b = decode_legacy("shared", "pc-2");
        assert_ne!(
            a.stable_result_id(),
            b.stable_result_id(),
            "different pc_id must produce a different result_id",
        );
    }

    #[test]
    fn stable_result_id_passes_through_explicit_value() {
        // v0.29 agents always supply result_id; the helper must
        // return that as-is (no surprise re-hashing).
        let t0 = chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 0).unwrap();
        let r = ExecResult {
            result_id: "agent-minted-uuid".into(),
            request_id: "r".into(),
            exec_id: None,
            pc_id: "x".into(),
            exit_code: 0,
            stdout: String::new(),
            stderr: String::new(),
            started_at: t0,
            finished_at: t0,
            stdout_object: None,
            stderr_object: None,
            manifest_id: None,
            collect_object: None,
        };
        assert_eq!(r.stable_result_id(), "agent-minted-uuid");
    }

    #[test]
    fn exec_result_collect_object_round_trips_and_omits_when_absent() {
        // #219: collect_object is off the wire when None
        // (skip_serializing_if) so pre-#219 readers stay compatible...
        let t0 = chrono::Utc.with_ymd_and_hms(2026, 6, 15, 0, 0, 0).unwrap();
        let mut r = ExecResult {
            result_id: "r1".into(),
            request_id: "req".into(),
            exec_id: None,
            pc_id: "PC1".into(),
            exit_code: 0,
            stdout: String::new(),
            stderr: String::new(),
            started_at: t0,
            finished_at: t0,
            stdout_object: None,
            stderr_object: None,
            manifest_id: Some("collect-diagnostics".into()),
            collect_object: None,
        };
        let json = serde_json::to_string(&r).unwrap();
        assert!(
            !json.contains("collect_object"),
            "collect_object must be absent when None: {json}"
        );
        // ...and a set key survives the round-trip.
        r.collect_object = Some("PC1/collect-diagnostics/20260615T000000Z.zip".into());
        let encoded = serde_json::to_string(&r).unwrap();
        let back: ExecResult = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            back.collect_object.as_deref(),
            Some("PC1/collect-diagnostics/20260615T000000Z.zip"),
        );
    }
}