//! kanade_shared/subject.rs — subject-name constants and builder functions.

/// `commands.all` — the fixed member of the `commands.*` subject
/// family; unlike [`commands_group`] and [`commands_pc`] it carries
/// no per-target suffix.
pub const COMMANDS_ALL: &str = "commands.all";

/// `commands.group.<name>` — per-group member of the `commands.*`
/// subject family.
///
/// `name` is inserted verbatim as the final segment; no escaping or
/// validation is applied here.
pub fn commands_group(name: &str) -> String {
    format!("commands.group.{name}")
}

/// `commands.pc.<pc_id>` — per-PC member of the `commands.*` subject
/// family.
///
/// `pc_id` is inserted verbatim as the final segment; no escaping or
/// validation is applied here.
pub fn commands_pc(pc_id: &str) -> String {
    format!("commands.pc.{pc_id}")
}

// `commands_exec` (subject `commands.exec.<job_id>`) was removed in
// v0.22.1. The STREAM_EXEC stream now catches the existing
// `commands.{all,group.X,pc.Y}` subjects directly, so the dedicated
// per-exec subject isn't needed any more. See
// `kanade-agent::command_replay` for how reconnecting agents catch
// up on missed messages.

/// `results.<request_id>` — subject keyed by a request id.
///
/// `request_id` is inserted verbatim as the final segment.
/// NOTE(review): presumably where the result for that request is
/// published — confirm against the publisher/subscriber code.
pub fn results(request_id: &str) -> String {
    format!("results.{request_id}")
}

/// `heartbeat.<pc_id>` — subject of the periodic heartbeat publisher
/// for the PC identified by `pc_id`.
///
/// `pc_id` is inserted verbatim as the final segment.
pub fn heartbeat(pc_id: &str) -> String {
    format!("heartbeat.{pc_id}")
}

/// `host_perf.<pc_id>` — Phase 1 of the perf telemetry pipeline. The
/// agent publishes a whole-host CPU / Memory / Disk I/O / Network
/// snapshot here on the cadence set by `host_perf_interval` in
/// agent_config (default 60 s). Distinct subject from `heartbeat.<pc_id>`
/// so the periodic heartbeat publisher stays untouched and pre-host_perf
/// backends that don't subscribe simply ignore the new traffic.
pub fn host_perf(pc_id: &str) -> String {
    format!("host_perf.{pc_id}")
}

/// `kill.<exec_id>` — Spec §2.6 Layer 3 abort signal. The exec_id is
/// the deployment / scheduler-fire UUID (formerly named `job_id`
/// pre-v0.29; renamed for accuracy — every `Command.exec_id` is a
/// per-deploy UUID, not a job-catalog id).
pub fn kill(exec_id: &str) -> String {
    format!("kill.{exec_id}")
}

/// `inventory.<pc_id>.<category>` — inventory subject for one PC and
/// one category.
///
/// Both arguments are inserted verbatim. The `INVENTORY_HW`,
/// `INVENTORY_SW` and `INVENTORY_NET` constants in this module supply
/// the category tokens used with this builder.
pub fn inventory(pc_id: &str, category: &str) -> String {
    format!("inventory.{pc_id}.{category}")
}

/// `events.started.<exec_id>.<pc_id>` — v0.30 / PR α' lifecycle
/// event published by the agent just before spawning a script's
/// child process. Lets the backend project an in-flight row into
/// `execution_results` (with `finished_at = NULL`) so the SPA
/// Activity table can show running rows alongside finished ones.
/// Backend subscribes via [`EVENTS_STARTED_FILTER`].
pub fn events_started(exec_id: &str, pc_id: &str) -> String {
    format!("events.started.{exec_id}.{pc_id}")
}

/// Wildcard the backend events projector consumes on STREAM_EVENTS.
/// Narrow (`events.started.>`) rather than the whole `events.>` so
/// future event types can carry their own filters without rerouting
/// the started subset.
pub const EVENTS_STARTED_FILTER: &str = "events.started.>";

/// `hw` category token for the last segment of an
/// `inventory.<pc_id>.<category>` subject (presumably hardware —
/// confirm against the inventory jobs).
pub const INVENTORY_HW: &str = "hw";
/// `sw` category token for the last segment of an
/// `inventory.<pc_id>.<category>` subject (presumably software —
/// confirm against the inventory jobs).
pub const INVENTORY_SW: &str = "sw";
/// `net` category token for the last segment of an
/// `inventory.<pc_id>.<category>` subject (presumably network —
/// confirm against the inventory jobs).
pub const INVENTORY_NET: &str = "net";

/// `logs.fetch.<pc_id>` — request/reply: operator (or backend) sends
/// a `LogsRequest`; the addressed agent replies with the tail of its
/// local log file. On-demand only, no stream.
pub fn logs_fetch(pc_id: &str) -> String {
    format!("logs.fetch.{pc_id}")
}

/// `agents.<pc_id>.ping` — v0.38 / #133 request/reply for the
/// active "ping" round-trip. The agent answers with a fresh
/// `Heartbeat` on demand instead of the backend waiting up to ~30 s
/// for the next periodic heartbeat tick to land. Distinct subject
/// from `heartbeat.<pc_id>` so the periodic publisher is unaffected
/// and old agents that don't subscribe simply time the request out.
pub fn ping(pc_id: &str) -> String {
    format!("agents.{pc_id}.ping")
}

// v0.14: subject::inventory_request was retired alongside the
// hardcoded inventory loop. On-demand collection now goes through
// the normal exec path (`kanade exec configs/jobs/inventory-hw.yaml`)
// — Command + ExecResult + the inventory-fact projector give
// operators the same effect with no extra subject.

// Unit tests pinning the exact subject string produced by every
// constant and builder in this module.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn commands_all_constant() {
        assert_eq!(COMMANDS_ALL, "commands.all");
    }

    #[test]
    fn commands_group_formats_name() {
        assert_eq!(commands_group("canary"), "commands.group.canary");
        assert_eq!(commands_group("wave1"), "commands.group.wave1");
    }

    #[test]
    fn commands_pc_formats_id() {
        assert_eq!(commands_pc("minipc"), "commands.pc.minipc");
        assert_eq!(commands_pc("PC1234"), "commands.pc.PC1234");
    }

    #[test]
    fn results_formats_request_id() {
        assert_eq!(results("req-1"), "results.req-1");
    }

    #[test]
    fn heartbeat_formats_pc_id() {
        assert_eq!(heartbeat("minipc"), "heartbeat.minipc");
    }

    #[test]
    fn host_perf_formats_pc_id() {
        assert_eq!(host_perf("minipc"), "host_perf.minipc");
        assert_eq!(host_perf("PC1234"), "host_perf.PC1234");
    }

    #[test]
    fn kill_formats_exec_id() {
        assert_eq!(kill("exec-uuid-1"), "kill.exec-uuid-1");
    }

    #[test]
    fn logs_fetch_formats_pc_id() {
        assert_eq!(logs_fetch("minipc"), "logs.fetch.minipc");
    }

    #[test]
    fn ping_formats_pc_id() {
        assert_eq!(ping("minipc"), "agents.minipc.ping");
    }

    #[test]
    fn events_started_formats_exec_id_and_pc_id() {
        assert_eq!(
            events_started("exec-uuid-1", "minipc"),
            "events.started.exec-uuid-1.minipc",
        );
    }

    #[test]
    fn events_started_filter_is_narrow_wildcard() {
        assert_eq!(EVENTS_STARTED_FILTER, "events.started.>");
    }

    #[test]
    fn inventory_formats_pc_id_and_category() {
        assert_eq!(inventory("minipc", "hw"), "inventory.minipc.hw");
        assert_eq!(inventory("minipc", INVENTORY_HW), "inventory.minipc.hw");
        assert_eq!(inventory("minipc", INVENTORY_SW), "inventory.minipc.sw");
        assert_eq!(inventory("minipc", INVENTORY_NET), "inventory.minipc.net");
    }
}