1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Prefix injected into the UUIDv5 name string for deriving legacy
/// `result_id`s. Fixed marker so two backends (or one backend across
/// restarts) projecting the same legacy payload arrive at the same
/// id. Tied to the standard `Uuid::NAMESPACE_OID` namespace below.
/// Bumping this prefix would break dedupe of legacy redeliveries
/// crossing the upgrade — don't.
const LEGACY_RESULT_ID_PREFIX: &str = "kanade-issue-19/legacy-result-id:";
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExecResult {
/// v0.29 / Issue #19: agent-minted UUID, unique per (Command, PC)
/// run. Replaces `request_id` as the projector's primary key so
/// broadcast Commands (commands.all / commands.group.X) — where N
/// PCs share one `request_id` — finally persist all N results
/// instead of silently dropping all but the first. Pre-v0.29
/// agents omit this field; it deserialises as the empty string,
/// and [`Self::stable_result_id`] derives a deterministic UUIDv5
/// from `(request_id, pc_id)` so legacy payloads (a) get distinct
/// ids across broadcast PCs (PC #2's row stops being dropped) and
/// (b) get the SAME id on JetStream redelivery (the new `ON
/// CONFLICT(result_id) DO NOTHING` path correctly dedupes, so
/// `executions.success_count` doesn't double-count across retries).
#[serde(default)]
pub result_id: String,
/// The NATS reply token. Still surfaced for joining back to the
/// `kanade run` request/reply path. No longer unique across rows
/// (broadcast Commands share it).
pub request_id: String,
/// v0.29 / Issue #19: back-link to `executions.exec_id`. Copied
/// from `Command.exec_id` by the agent. `None` for ad-hoc
/// `kanade run` (no deployment) and for results emitted by
/// pre-v0.29 agents (decoded via `serde(default)`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub exec_id: Option<String>,
pub pc_id: String,
pub exit_code: i32,
pub stdout: String,
pub stderr: String,
pub started_at: chrono::DateTime<chrono::Utc>,
pub finished_at: chrono::DateTime<chrono::Utc>,
/// v0.13: the manifest id that produced this result. Sourced
/// from `Command.id` (which is the YAML `manifest.id`, e.g.
/// `"inventory-hw"`). Distinct from the per-deploy UUID stored
/// in `Command.exec_id`. The results projector uses this to
/// look up the manifest's `inventory:` hint and upsert
/// `inventory_facts` rows for inventory-tagged jobs.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub manifest_id: Option<String>,
}
impl ExecResult {
/// Return the `result_id` if the agent supplied one (v0.29+
/// payloads always do), otherwise derive a stable UUIDv5 from
/// `(request_id, pc_id)`. The projector calls this before INSERT
/// so legacy payloads still get a non-empty PK, AND so that
/// JetStream redeliveries of the same legacy payload hash to the
/// same id and dedupe via `ON CONFLICT`. Per-PC fan-out stays
/// distinct (different `pc_id` → different hash).
pub fn stable_result_id(&self) -> String {
if !self.result_id.is_empty() {
return self.result_id.clone();
}
let name = format!(
"{LEGACY_RESULT_ID_PREFIX}{}:{}",
self.request_id, self.pc_id
);
Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string()
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
#[test]
fn exec_result_round_trips_through_json() {
let t0 = chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 0).unwrap();
let t1 = chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 5).unwrap();
let r = ExecResult {
result_id: "result-uuid-1".into(),
request_id: "req-1".into(),
exec_id: Some("exec-uuid-1".into()),
pc_id: "minipc".into(),
exit_code: 0,
stdout: "hello\n".into(),
stderr: String::new(),
started_at: t0,
finished_at: t1,
manifest_id: Some("inventory-hw".into()),
};
let json = serde_json::to_string(&r).unwrap();
let back: ExecResult = serde_json::from_str(&json).unwrap();
assert_eq!(back.result_id, r.result_id);
assert_eq!(back.request_id, r.request_id);
assert_eq!(back.exec_id.as_deref(), Some("exec-uuid-1"));
assert_eq!(back.exit_code, r.exit_code);
assert_eq!(back.stdout, r.stdout);
assert_eq!(back.started_at, t0);
assert_eq!(back.finished_at, t1);
assert_eq!(back.manifest_id.as_deref(), Some("inventory-hw"));
}
#[test]
fn exec_result_without_manifest_id_decodes() {
// Older agents (pre-0.13) sent ExecResult with no manifest_id field.
let json = r#"{
"request_id":"r","pc_id":"x","exit_code":0,
"stdout":"","stderr":"",
"started_at":"2026-05-16T00:00:00Z",
"finished_at":"2026-05-16T00:00:00Z"
}"#;
let r: ExecResult = serde_json::from_str(json).unwrap();
assert_eq!(r.manifest_id, None);
}
#[test]
fn exec_result_without_result_id_decodes_empty() {
// v0.29 / Issue #19: pre-v0.29 agents don't send `result_id`.
// `#[serde(default)]` decodes it as the empty string so the
// projector can detect "legacy payload" and call
// `stable_result_id()` to derive a deterministic PK.
let json = r#"{
"request_id":"r","pc_id":"x","exit_code":0,
"stdout":"","stderr":"",
"started_at":"2026-05-16T00:00:00Z",
"finished_at":"2026-05-16T00:00:00Z"
}"#;
let r: ExecResult = serde_json::from_str(json).unwrap();
assert_eq!(r.result_id, "");
assert!(r.exec_id.is_none());
}
#[test]
fn stable_result_id_is_deterministic_for_legacy_payload() {
// Gemini #65 medium fix: legacy redeliveries (same request_id +
// pc_id) must hash to the SAME result_id so the projector's
// ON CONFLICT(result_id) DO NOTHING dedupes — otherwise
// `executions.success_count` double-counts on JetStream ack
// timeouts.
let json = r#"{
"request_id":"r","pc_id":"x","exit_code":0,
"stdout":"","stderr":"",
"started_at":"2026-05-16T00:00:00Z",
"finished_at":"2026-05-16T00:00:00Z"
}"#;
let a: ExecResult = serde_json::from_str(json).unwrap();
let b: ExecResult = serde_json::from_str(json).unwrap();
assert_eq!(
a.stable_result_id(),
b.stable_result_id(),
"same legacy payload must hash to the same result_id",
);
}
#[test]
fn stable_result_id_differs_across_pcs_for_broadcast() {
// The other half: a broadcast Command published to two PCs
// produces two legacy ExecResults sharing one request_id but
// with different pc_ids. Each must get its OWN result_id so
// both rows persist (the whole point of Issue #19).
let json_a = r#"{
"request_id":"shared","pc_id":"pc-1","exit_code":0,
"stdout":"","stderr":"",
"started_at":"2026-05-16T00:00:00Z",
"finished_at":"2026-05-16T00:00:00Z"
}"#;
let json_b = r#"{
"request_id":"shared","pc_id":"pc-2","exit_code":0,
"stdout":"","stderr":"",
"started_at":"2026-05-16T00:00:00Z",
"finished_at":"2026-05-16T00:00:00Z"
}"#;
let a: ExecResult = serde_json::from_str(json_a).unwrap();
let b: ExecResult = serde_json::from_str(json_b).unwrap();
assert_ne!(
a.stable_result_id(),
b.stable_result_id(),
"different pc_id must produce a different result_id",
);
}
#[test]
fn stable_result_id_passes_through_explicit_value() {
// v0.29 agents always supply result_id; the helper must
// return that as-is (no surprise re-hashing).
let r = ExecResult {
result_id: "agent-minted-uuid".into(),
request_id: "r".into(),
exec_id: None,
pc_id: "x".into(),
exit_code: 0,
stdout: String::new(),
stderr: String::new(),
started_at: chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 0).unwrap(),
finished_at: chrono::Utc.with_ymd_and_hms(2026, 5, 16, 0, 0, 0).unwrap(),
manifest_id: None,
};
assert_eq!(r.stable_result_id(), "agent-minted-uuid");
}
}