Skip to main content

kanade_shared/
subject.rs

1pub const COMMANDS_ALL: &str = "commands.all";
2
3pub fn commands_group(name: &str) -> String {
4    format!("commands.group.{name}")
5}
6
7pub fn commands_pc(pc_id: &str) -> String {
8    format!("commands.pc.{pc_id}")
9}
10
11// `commands_exec` (subject `commands.exec.<job_id>`) was removed in
12// v0.22.1. The STREAM_EXEC stream now catches the existing
13// `commands.{all,group.X,pc.Y}` subjects directly, so the dedicated
14// per-exec subject isn't needed any more. See
15// `kanade-agent::command_replay` for how reconnecting agents catch
16// up on missed messages.
17
18pub fn results(request_id: &str) -> String {
19    format!("results.{request_id}")
20}
21
22pub fn heartbeat(pc_id: &str) -> String {
23    format!("heartbeat.{pc_id}")
24}
25
26/// `host_perf.<pc_id>` — Phase 1 of the perf telemetry pipeline. The
27/// agent publishes a whole-host CPU / Memory / Disk I/O / Network
28/// snapshot here on the cadence set by `host_perf_interval` in
29/// agent_config (default 60 s). Distinct subject from `heartbeat.<pc_id>`
30/// so the periodic heartbeat publisher stays untouched and pre-host_perf
31/// backends that don't subscribe simply ignore the new traffic.
32pub fn host_perf(pc_id: &str) -> String {
33    format!("host_perf.{pc_id}")
34}
35
36/// `process_perf.<pc_id>` — Phase 2: top-N per-process snapshot
37/// published only while `process_perf_enabled` is `true` AND the
38/// `process_perf_expires_at` deadline is in the future. Separate
39/// subject from `host_perf.<pc_id>` because process-perf is an
40/// opt-in investigation mode — having its own subject lets the
41/// projector skip the heavy table entirely for hosts that never
42/// turn it on.
43pub fn process_perf(pc_id: &str) -> String {
44    format!("process_perf.{pc_id}")
45}
46
47/// `obs.<pc_id>` — per-PC observability event stream (Issue #246).
48/// The agent publishes one [`crate::wire::ObsEvent`] per timeline
49/// event (sign-in / out, power on / off, sleep / resume, agent
50/// milestones, diagnostic bundle pointers). Distinct from
51/// `events.started.*` (in-flight script lifecycle) and
52/// `host_perf.<pc_id>` (numeric telemetry) — `obs.*` is the
53/// semantic-event stream the SPA Timeline page consumes.
54pub fn obs(pc_id: &str) -> String {
55    format!("obs.{pc_id}")
56}
57
58/// `obs.>` — filter the backend projector subscribes to so a new
59/// PC starts flowing into the timeline without any per-PC SUB
60/// registration. Pairs with [`obs`] for publish.
61pub const OBS_FILTER: &str = "obs.>";
62
63/// `kill.<exec_id>` — Spec §2.6 Layer 3 abort signal. The exec_id is
64/// the deployment / scheduler-fire UUID (formerly named `job_id`
65/// pre-v0.29; renamed for accuracy — every `Command.exec_id` is a
66/// per-deploy UUID, not a job-catalog id).
67pub fn kill(exec_id: &str) -> String {
68    format!("kill.{exec_id}")
69}
70
71pub fn inventory(pc_id: &str, category: &str) -> String {
72    format!("inventory.{pc_id}.{category}")
73}
74
75/// `events.started.<exec_id>.<pc_id>` — v0.30 / PR α' lifecycle
76/// event published by the agent just before spawning a script's
77/// child process. Lets the backend project an in-flight row into
78/// `execution_results` (with `finished_at = NULL`) so the SPA
79/// Activity table can show running rows alongside finished ones.
80/// Backend subscribes via [`EVENTS_STARTED_FILTER`].
81pub fn events_started(exec_id: &str, pc_id: &str) -> String {
82    format!("events.started.{exec_id}.{pc_id}")
83}
84
85/// Wildcard the backend events projector consumes on STREAM_EVENTS.
86/// Narrow (`events.started.>`) rather than the whole `events.>` so
87/// future event types can carry their own filters without rerouting
88/// the started subset.
89pub const EVENTS_STARTED_FILTER: &str = "events.started.>";
90
91pub const INVENTORY_HW: &str = "hw";
92pub const INVENTORY_SW: &str = "sw";
93pub const INVENTORY_NET: &str = "net";
94
95/// `logs.fetch.<pc_id>` — request/reply: operator (or backend) sends
96/// a `LogsRequest`; the addressed agent replies with the tail of its
97/// local log file. On-demand only, no stream.
98pub fn logs_fetch(pc_id: &str) -> String {
99    format!("logs.fetch.{pc_id}")
100}
101
102/// `job.tail.<pc_id>` — request/reply for the live tail of a
103/// still-running job's stdout/stderr. The operator (or backend, on
104/// the SPA's behalf) sends a [`crate::wire::JobTailRequest`] carrying
105/// the `result_id`; the addressed agent replies with the current
106/// ring-buffer tail from its in-memory live registry. On-demand only,
107/// no stream — the SPA polls this every few seconds (same shape as
108/// `logs.fetch.<pc_id>`) while a job is in flight. Distinct subject
109/// from `logs.fetch.<pc_id>` (whole-agent log file) because this is
110/// scoped to a single job's captured output, not the agent's log.
111pub fn job_tail(pc_id: &str) -> String {
112    format!("job.tail.{pc_id}")
113}
114
115/// `agents.<pc_id>.ping` — v0.38 / #133 request/reply for the
116/// active "ping" round-trip. The agent answers with a fresh
117/// `Heartbeat` on demand instead of the backend waiting up to ~30 s
118/// for the next periodic heartbeat tick to land. Distinct subject
119/// from `heartbeat.<pc_id>` so the periodic publisher is unaffected
120/// and old agents that don't subscribe simply time the request out.
121pub fn ping(pc_id: &str) -> String {
122    format!("agents.{pc_id}.ping")
123}
124
125// v0.14: subject::inventory_request was retired alongside the
126// hardcoded inventory loop. On-demand collection now goes through
127// the normal exec path (`kanade exec configs/jobs/inventory-
128// hw.yaml`) — Command + ExecResult + the inventory-fact projector
129// give operators the same effect with no extra subject.
130
131#[cfg(test)]
132mod tests {
133    use super::*;
134
135    #[test]
136    fn commands_all_constant() {
137        assert_eq!(COMMANDS_ALL, "commands.all");
138    }
139
140    #[test]
141    fn commands_group_formats_name() {
142        assert_eq!(commands_group("canary"), "commands.group.canary");
143        assert_eq!(commands_group("wave1"), "commands.group.wave1");
144    }
145
146    #[test]
147    fn commands_pc_formats_id() {
148        assert_eq!(commands_pc("pc-01"), "commands.pc.pc-01");
149        assert_eq!(commands_pc("PC1234"), "commands.pc.PC1234");
150    }
151
152    #[test]
153    fn results_formats_request_id() {
154        assert_eq!(results("req-1"), "results.req-1");
155    }
156
157    #[test]
158    fn heartbeat_formats_pc_id() {
159        assert_eq!(heartbeat("pc-01"), "heartbeat.pc-01");
160    }
161
162    #[test]
163    fn host_perf_formats_pc_id() {
164        assert_eq!(host_perf("pc-01"), "host_perf.pc-01");
165        assert_eq!(host_perf("PC1234"), "host_perf.PC1234");
166    }
167
168    #[test]
169    fn process_perf_formats_pc_id() {
170        assert_eq!(process_perf("pc-01"), "process_perf.pc-01");
171        assert_eq!(process_perf("PC1234"), "process_perf.PC1234");
172    }
173
174    #[test]
175    fn obs_formats_pc_id() {
176        assert_eq!(obs("pc-01"), "obs.pc-01");
177        assert_eq!(obs("PC1234"), "obs.PC1234");
178    }
179
180    #[test]
181    fn obs_filter_constant() {
182        assert_eq!(OBS_FILTER, "obs.>");
183    }
184
185    #[test]
186    fn kill_formats_exec_id() {
187        assert_eq!(kill("exec-uuid-1"), "kill.exec-uuid-1");
188    }
189
190    #[test]
191    fn logs_fetch_formats_pc_id() {
192        assert_eq!(logs_fetch("pc-01"), "logs.fetch.pc-01");
193    }
194
195    #[test]
196    fn ping_formats_pc_id() {
197        assert_eq!(ping("pc-01"), "agents.pc-01.ping");
198    }
199
200    #[test]
201    fn job_tail_formats_pc_id() {
202        assert_eq!(job_tail("pc-01"), "job.tail.pc-01");
203        assert_eq!(job_tail("PC1234"), "job.tail.PC1234");
204    }
205
206    #[test]
207    fn events_started_formats_exec_id_and_pc_id() {
208        assert_eq!(
209            events_started("exec-uuid-1", "pc-01"),
210            "events.started.exec-uuid-1.pc-01",
211        );
212    }
213
214    #[test]
215    fn events_started_filter_is_narrow_wildcard() {
216        assert_eq!(EVENTS_STARTED_FILTER, "events.started.>");
217    }
218
219    #[test]
220    fn inventory_formats_pc_id_and_category() {
221        assert_eq!(inventory("pc-01", "hw"), "inventory.pc-01.hw");
222        assert_eq!(inventory("pc-01", INVENTORY_HW), "inventory.pc-01.hw");
223        assert_eq!(inventory("pc-01", INVENTORY_SW), "inventory.pc-01.sw");
224        assert_eq!(inventory("pc-01", INVENTORY_NET), "inventory.pc-01.net");
225    }
226}