kanade_shared/wire/result.rs
1use serde::{Deserialize, Serialize};
2use uuid::Uuid;
3
4/// Prefix injected into the UUIDv5 name string for deriving legacy
5/// `result_id`s. Fixed marker so two backends (or one backend across
6/// restarts) projecting the same legacy payload arrive at the same
7/// id. Tied to the standard `Uuid::NAMESPACE_OID` namespace below.
8/// Bumping this prefix would break dedupe of legacy redeliveries
9/// crossing the upgrade — don't.
10const LEGACY_RESULT_ID_PREFIX: &str = "kanade-issue-19/legacy-result-id:";
11
12#[derive(Serialize, Deserialize, Debug, Clone)]
13pub struct ExecResult {
14 /// v0.29 / Issue #19: agent-minted UUID, unique per (Command, PC)
15 /// run. Replaces `request_id` as the projector's primary key so
16 /// broadcast Commands (commands.all / commands.group.X) — where N
17 /// PCs share one `request_id` — finally persist all N results
18 /// instead of silently dropping all but the first. Pre-v0.29
19 /// agents omit this field; it deserialises as the empty string,
20 /// and [`Self::stable_result_id`] derives a deterministic UUIDv5
21 /// from `(request_id, pc_id)` so legacy payloads (a) get distinct
22 /// ids across broadcast PCs (PC #2's row stops being dropped) and
23 /// (b) get the SAME id on JetStream redelivery (the new `ON
24 /// CONFLICT(result_id) DO NOTHING` path correctly dedupes, so
25 /// `executions.success_count` doesn't double-count across retries).
26 #[serde(default)]
27 pub result_id: String,
28 /// The NATS reply token. Still surfaced for joining back to the
29 /// `kanade run` request/reply path. No longer unique across rows
30 /// (broadcast Commands share it).
31 pub request_id: String,
32 /// v0.29 / Issue #19: back-link to `executions.exec_id`. Copied
33 /// from `Command.exec_id` by the agent. `None` for ad-hoc
34 /// `kanade run` (no deployment) and for results emitted by
35 /// pre-v0.29 agents (decoded via `serde(default)`).
36 #[serde(default, skip_serializing_if = "Option::is_none")]
37 pub exec_id: Option<String>,
38 pub pc_id: String,
39 pub exit_code: i32,
40 /// stdout. Empty string when [`Self::stdout_object`] is set — the
41 /// agent overflowed the bytes into [`crate::kv::OBJECT_RESULT_OUTPUT`]
42 /// because the inline payload would have exceeded NATS's default
43 /// `max_payload` (#227). The backend projector derefs the pointer
44 /// before inserting; SQLite still stores the full text inline so
45 /// the SPA Activity page reads unchanged.
46 pub stdout: String,
47 pub stderr: String,
48 pub started_at: chrono::DateTime<chrono::Utc>,
49 pub finished_at: chrono::DateTime<chrono::Utc>,
50 /// Object Store key under [`crate::kv::OBJECT_RESULT_OUTPUT`] when
51 /// `stdout` overflowed the agent's inline threshold (#227). Set to
52 /// `Some("<request_id>/stdout")` by the agent's outbox drain; the
53 /// backend projector fetches the bytes from that key and uses them
54 /// in place of the (empty) `stdout` field. `None` for the common
55 /// small-stdout case + every pre-#227 payload (`serde(default)`
56 /// keeps older results decodable).
57 #[serde(default, skip_serializing_if = "Option::is_none")]
58 pub stdout_object: Option<String>,
59 /// Sibling of `stdout_object` for the stderr stream. Same key
60 /// shape (`<request_id>/stderr`).
61 #[serde(default, skip_serializing_if = "Option::is_none")]
62 pub stderr_object: Option<String>,
63 /// v0.13: the manifest id that produced this result. Sourced
64 /// from `Command.id` (which is the YAML `manifest.id`, e.g.
65 /// `"inventory-hw"`). Distinct from the per-deploy UUID stored
66 /// in `Command.exec_id`. The results projector uses this to
67 /// look up the manifest's `inventory:` hint and upsert
68 /// `inventory_facts` rows for inventory-tagged jobs.
69 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub manifest_id: Option<String>,
71}
72
73impl ExecResult {
74 /// Return the `result_id` if the agent supplied one (v0.29+
75 /// payloads always do), otherwise derive a stable UUIDv5 from
76 /// `(request_id, pc_id)`. The projector calls this before INSERT
77 /// so legacy payloads still get a non-empty PK, AND so that
78 /// JetStream redeliveries of the same legacy payload hash to the
79 /// same id and dedupe via `ON CONFLICT`. Per-PC fan-out stays
80 /// distinct (different `pc_id` → different hash).
81 pub fn stable_result_id(&self) -> String {
82 if !self.result_id.is_empty() {
83 return self.result_id.clone();
84 }
85 let name = format!(
86 "{LEGACY_RESULT_ID_PREFIX}{}:{}",
87 self.request_id, self.pc_id
88 );
89 Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string()
90 }
91}
92
93#[cfg(test)]
94mod tests {
95 use super::*;
96 use chrono::TimeZone;
97
98 #[test]
99 fn exec_result_round_trips_through_json() {
100 let t0 = chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 0).unwrap();
101 let t1 = chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 5).unwrap();
102 let r = ExecResult {
103 result_id: "result-uuid-1".into(),
104 request_id: "req-1".into(),
105 exec_id: Some("exec-uuid-1".into()),
106 pc_id: "minipc".into(),
107 exit_code: 0,
108 stdout: "hello\n".into(),
109 stderr: String::new(),
110 started_at: t0,
111 finished_at: t1,
112 stdout_object: None,
113 stderr_object: None,
114 manifest_id: Some("inventory-hw".into()),
115 };
116 let json = serde_json::to_string(&r).unwrap();
117 let back: ExecResult = serde_json::from_str(&json).unwrap();
118 assert_eq!(back.result_id, r.result_id);
119 assert_eq!(back.request_id, r.request_id);
120 assert_eq!(back.exec_id.as_deref(), Some("exec-uuid-1"));
121 assert_eq!(back.exit_code, r.exit_code);
122 assert_eq!(back.stdout, r.stdout);
123 assert_eq!(back.started_at, t0);
124 assert_eq!(back.finished_at, t1);
125 assert_eq!(back.manifest_id.as_deref(), Some("inventory-hw"));
126 }
127
128 #[test]
129 fn exec_result_without_manifest_id_decodes() {
130 // Older agents (pre-0.13) sent ExecResult with no manifest_id field.
131 let json = r#"{
132 "request_id":"r","pc_id":"x","exit_code":0,
133 "stdout":"","stderr":"",
134 "started_at":"2026-05-16T00:00:00Z",
135 "finished_at":"2026-05-16T00:00:00Z"
136 }"#;
137 let r: ExecResult = serde_json::from_str(json).unwrap();
138 assert_eq!(r.manifest_id, None);
139 }
140
141 #[test]
142 fn exec_result_without_result_id_decodes_empty() {
143 // v0.29 / Issue #19: pre-v0.29 agents don't send `result_id`.
144 // `#[serde(default)]` decodes it as the empty string so the
145 // projector can detect "legacy payload" and call
146 // `stable_result_id()` to derive a deterministic PK.
147 let json = r#"{
148 "request_id":"r","pc_id":"x","exit_code":0,
149 "stdout":"","stderr":"",
150 "started_at":"2026-05-16T00:00:00Z",
151 "finished_at":"2026-05-16T00:00:00Z"
152 }"#;
153 let r: ExecResult = serde_json::from_str(json).unwrap();
154 assert_eq!(r.result_id, "");
155 assert!(r.exec_id.is_none());
156 }
157
158 #[test]
159 fn stable_result_id_is_deterministic_for_legacy_payload() {
160 // Gemini #65 medium fix: legacy redeliveries (same request_id +
161 // pc_id) must hash to the SAME result_id so the projector's
162 // ON CONFLICT(result_id) DO NOTHING dedupes — otherwise
163 // `executions.success_count` double-counts on JetStream ack
164 // timeouts.
165 let json = r#"{
166 "request_id":"r","pc_id":"x","exit_code":0,
167 "stdout":"","stderr":"",
168 "started_at":"2026-05-16T00:00:00Z",
169 "finished_at":"2026-05-16T00:00:00Z"
170 }"#;
171 let a: ExecResult = serde_json::from_str(json).unwrap();
172 let b: ExecResult = serde_json::from_str(json).unwrap();
173 assert_eq!(
174 a.stable_result_id(),
175 b.stable_result_id(),
176 "same legacy payload must hash to the same result_id",
177 );
178 }
179
180 #[test]
181 fn stable_result_id_differs_across_pcs_for_broadcast() {
182 // The other half: a broadcast Command published to two PCs
183 // produces two legacy ExecResults sharing one request_id but
184 // with different pc_ids. Each must get its OWN result_id so
185 // both rows persist (the whole point of Issue #19).
186 let json_a = r#"{
187 "request_id":"shared","pc_id":"pc-1","exit_code":0,
188 "stdout":"","stderr":"",
189 "started_at":"2026-05-16T00:00:00Z",
190 "finished_at":"2026-05-16T00:00:00Z"
191 }"#;
192 let json_b = r#"{
193 "request_id":"shared","pc_id":"pc-2","exit_code":0,
194 "stdout":"","stderr":"",
195 "started_at":"2026-05-16T00:00:00Z",
196 "finished_at":"2026-05-16T00:00:00Z"
197 }"#;
198 let a: ExecResult = serde_json::from_str(json_a).unwrap();
199 let b: ExecResult = serde_json::from_str(json_b).unwrap();
200 assert_ne!(
201 a.stable_result_id(),
202 b.stable_result_id(),
203 "different pc_id must produce a different result_id",
204 );
205 }
206
207 #[test]
208 fn stable_result_id_passes_through_explicit_value() {
209 // v0.29 agents always supply result_id; the helper must
210 // return that as-is (no surprise re-hashing).
211 let r = ExecResult {
212 result_id: "agent-minted-uuid".into(),
213 request_id: "r".into(),
214 exec_id: None,
215 pc_id: "x".into(),
216 exit_code: 0,
217 stdout: String::new(),
218 stderr: String::new(),
219 started_at: chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 0).unwrap(),
220 finished_at: chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 0).unwrap(),
221 stdout_object: None,
222 stderr_object: None,
223 manifest_id: None,
224 };
225 assert_eq!(r.stable_result_id(), "agent-minted-uuid");
226 }
227}