kanade_shared/subject.rs
1pub const COMMANDS_ALL: &str = "commands.all";
2
3pub fn commands_group(name: &str) -> String {
4 format!("commands.group.{name}")
5}
6
7pub fn commands_pc(pc_id: &str) -> String {
8 format!("commands.pc.{pc_id}")
9}
10
11// `commands_exec` (subject `commands.exec.<job_id>`) was removed in
12// v0.22.1. The STREAM_EXEC stream now catches the existing
13// `commands.{all,group.X,pc.Y}` subjects directly, so the dedicated
14// per-exec subject isn't needed any more. See
15// `kanade-agent::command_replay` for how reconnecting agents catch
16// up on missed messages.
17
18pub fn results(request_id: &str) -> String {
19 format!("results.{request_id}")
20}
21
22pub fn heartbeat(pc_id: &str) -> String {
23 format!("heartbeat.{pc_id}")
24}
25
26/// `host_perf.<pc_id>` — Phase 1 of the perf telemetry pipeline. The
27/// agent publishes a whole-host CPU / Memory / Disk I/O / Network
28/// snapshot here on the cadence set by `host_perf_interval` in
29/// agent_config (default 60 s). Distinct subject from `heartbeat.<pc_id>`
30/// so the periodic heartbeat publisher stays untouched and pre-host_perf
31/// backends that don't subscribe simply ignore the new traffic.
32pub fn host_perf(pc_id: &str) -> String {
33 format!("host_perf.{pc_id}")
34}
35
36/// `process_perf.<pc_id>` — Phase 2: top-N per-process snapshot
37/// published only while `process_perf_enabled` is `true` AND the
38/// `process_perf_expires_at` deadline is in the future. Separate
39/// subject from `host_perf.<pc_id>` because process-perf is an
40/// opt-in investigation mode — having its own subject lets the
41/// projector skip the heavy table entirely for hosts that never
42/// turn it on.
43pub fn process_perf(pc_id: &str) -> String {
44 format!("process_perf.{pc_id}")
45}
46
47/// `kill.<exec_id>` — Spec §2.6 Layer 3 abort signal. The exec_id is
48/// the deployment / scheduler-fire UUID (formerly named `job_id`
49/// pre-v0.29; renamed for accuracy — every `Command.exec_id` is a
50/// per-deploy UUID, not a job-catalog id).
51pub fn kill(exec_id: &str) -> String {
52 format!("kill.{exec_id}")
53}
54
55pub fn inventory(pc_id: &str, category: &str) -> String {
56 format!("inventory.{pc_id}.{category}")
57}
58
59/// `events.started.<exec_id>.<pc_id>` — v0.30 / PR α' lifecycle
60/// event published by the agent just before spawning a script's
61/// child process. Lets the backend project an in-flight row into
62/// `execution_results` (with `finished_at = NULL`) so the SPA
63/// Activity table can show running rows alongside finished ones.
64/// Backend subscribes via [`EVENTS_STARTED_FILTER`].
65pub fn events_started(exec_id: &str, pc_id: &str) -> String {
66 format!("events.started.{exec_id}.{pc_id}")
67}
68
69/// Wildcard the backend events projector consumes on STREAM_EVENTS.
70/// Narrow (`events.started.>`) rather than the whole `events.>` so
71/// future event types can carry their own filters without rerouting
72/// the started subset.
73pub const EVENTS_STARTED_FILTER: &str = "events.started.>";
74
75pub const INVENTORY_HW: &str = "hw";
76pub const INVENTORY_SW: &str = "sw";
77pub const INVENTORY_NET: &str = "net";
78
79/// `logs.fetch.<pc_id>` — request/reply: operator (or backend) sends
80/// a `LogsRequest`; the addressed agent replies with the tail of its
81/// local log file. On-demand only, no stream.
82pub fn logs_fetch(pc_id: &str) -> String {
83 format!("logs.fetch.{pc_id}")
84}
85
86/// `agents.<pc_id>.ping` — v0.38 / #133 request/reply for the
87/// active "ping" round-trip. The agent answers with a fresh
88/// `Heartbeat` on demand instead of the backend waiting up to ~30 s
89/// for the next periodic heartbeat tick to land. Distinct subject
90/// from `heartbeat.<pc_id>` so the periodic publisher is unaffected
91/// and old agents that don't subscribe simply time the request out.
92pub fn ping(pc_id: &str) -> String {
93 format!("agents.{pc_id}.ping")
94}
95
96// v0.14: subject::inventory_request was retired alongside the
97// hardcoded inventory loop. On-demand collection now goes through
98// the normal exec path (`kanade exec configs/jobs/inventory-
99// hw.yaml`) — Command + ExecResult + the inventory-fact projector
100// give operators the same effect with no extra subject.
101
102#[cfg(test)]
103mod tests {
104 use super::*;
105
106 #[test]
107 fn commands_all_constant() {
108 assert_eq!(COMMANDS_ALL, "commands.all");
109 }
110
111 #[test]
112 fn commands_group_formats_name() {
113 assert_eq!(commands_group("canary"), "commands.group.canary");
114 assert_eq!(commands_group("wave1"), "commands.group.wave1");
115 }
116
117 #[test]
118 fn commands_pc_formats_id() {
119 assert_eq!(commands_pc("minipc"), "commands.pc.minipc");
120 assert_eq!(commands_pc("PC1234"), "commands.pc.PC1234");
121 }
122
123 #[test]
124 fn results_formats_request_id() {
125 assert_eq!(results("req-1"), "results.req-1");
126 }
127
128 #[test]
129 fn heartbeat_formats_pc_id() {
130 assert_eq!(heartbeat("minipc"), "heartbeat.minipc");
131 }
132
133 #[test]
134 fn host_perf_formats_pc_id() {
135 assert_eq!(host_perf("minipc"), "host_perf.minipc");
136 assert_eq!(host_perf("PC1234"), "host_perf.PC1234");
137 }
138
139 #[test]
140 fn process_perf_formats_pc_id() {
141 assert_eq!(process_perf("minipc"), "process_perf.minipc");
142 assert_eq!(process_perf("PC1234"), "process_perf.PC1234");
143 }
144
145 #[test]
146 fn kill_formats_exec_id() {
147 assert_eq!(kill("exec-uuid-1"), "kill.exec-uuid-1");
148 }
149
150 #[test]
151 fn logs_fetch_formats_pc_id() {
152 assert_eq!(logs_fetch("minipc"), "logs.fetch.minipc");
153 }
154
155 #[test]
156 fn ping_formats_pc_id() {
157 assert_eq!(ping("minipc"), "agents.minipc.ping");
158 }
159
160 #[test]
161 fn events_started_formats_exec_id_and_pc_id() {
162 assert_eq!(
163 events_started("exec-uuid-1", "minipc"),
164 "events.started.exec-uuid-1.minipc",
165 );
166 }
167
168 #[test]
169 fn events_started_filter_is_narrow_wildcard() {
170 assert_eq!(EVENTS_STARTED_FILTER, "events.started.>");
171 }
172
173 #[test]
174 fn inventory_formats_pc_id_and_category() {
175 assert_eq!(inventory("minipc", "hw"), "inventory.minipc.hw");
176 assert_eq!(inventory("minipc", INVENTORY_HW), "inventory.minipc.hw");
177 assert_eq!(inventory("minipc", INVENTORY_SW), "inventory.minipc.sw");
178 assert_eq!(inventory("minipc", INVENTORY_NET), "inventory.minipc.net");
179 }
180}