kanade_shared/subject.rs
1pub const COMMANDS_ALL: &str = "commands.all";
2
3pub fn commands_group(name: &str) -> String {
4 format!("commands.group.{name}")
5}
6
7pub fn commands_pc(pc_id: &str) -> String {
8 format!("commands.pc.{pc_id}")
9}
10
11/// `notifications.all` — broadcast end-user notification (SPEC
12/// §2.2.1 / Phase E). Mirrors [`COMMANDS_ALL`] but on the
13/// notification fan-out plane: the backend publishes here, the
14/// `NOTIFICATIONS` stream retains it, and every agent forwards it to
15/// the Client Apps in matching user sessions.
16pub const NOTIFICATIONS_ALL: &str = "notifications.all";
17
18/// Subject prefix for [`notifications_group`]. Exposed so callers that
19/// parse a subject back into its group name (the backend's notification
20/// audience resolver) strip against the same string the builder emits —
21/// if the format ever changes, both move together instead of the parser
22/// silently failing to match.
23pub const NOTIFICATIONS_GROUP_PREFIX: &str = "notifications.group.";
24
25/// Subject prefix for [`notifications_pc`]. See
26/// [`NOTIFICATIONS_GROUP_PREFIX`].
27pub const NOTIFICATIONS_PC_PREFIX: &str = "notifications.pc.";
28
29/// `notifications.group.{group_name}` — group-scoped end-user
30/// notification. Sibling of [`commands_group`] on the notification
31/// plane.
32pub fn notifications_group(name: &str) -> String {
33 format!("{NOTIFICATIONS_GROUP_PREFIX}{name}")
34}
35
36/// `notifications.pc.{pc_id}` — single-PC end-user notification.
37/// Sibling of [`commands_pc`] on the notification plane.
38pub fn notifications_pc(pc_id: &str) -> String {
39 format!("{NOTIFICATIONS_PC_PREFIX}{pc_id}")
40}
41
42/// `events.notifications.acked.{pc_id}.{user_sid}.{notif_id}` — the
43/// agent publishes this when a user clicks "確認" on a notification
44/// (SPEC §2.2.2 / Phase E). The `{user_sid}` segment distinguishes
45/// concurrent users on a shared PC (Fast User Switching / RDP). Lives
46/// under `events.>` so the existing `EVENTS` stream retains it; the
47/// backend's notification-acks projector consumes the narrowed
48/// [`EVENTS_NOTIFICATIONS_ACKED_FILTER`] to build the SPA's
49/// per-recipient confirmation view.
50///
51/// Subject spelling is fixed by SPEC §2.2.2 / §2.12.8 (`events.>`), so
52/// acks ride the `EVENTS` stream's retention (shorter than the 90-day
53/// `NOTIFICATIONS` history), not the notification stream's. That only
54/// bounds **re-projection** after a `-WipeDb`: the durable source of
55/// truth for ack_status is the `notification_acks` SQLite table, which
56/// persists independently — so a live fleet keeps full ack history;
57/// only a DB wipe truncates re-derivable acks to the EVENTS window,
58/// the same limitation every `events.*`-projected table already has.
59pub fn events_notifications_acked(pc_id: &str, user_sid: &str, notif_id: &str) -> String {
60 format!("events.notifications.acked.{pc_id}.{user_sid}.{notif_id}")
61}
62
63// `commands_exec` (subject `commands.exec.<job_id>`) was removed in
64// v0.22.1. The STREAM_EXEC stream now catches the existing
65// `commands.{all,group.X,pc.Y}` subjects directly, so the dedicated
66// per-exec subject isn't needed any more. See
67// `kanade-agent::command_replay` for how reconnecting agents catch
68// up on missed messages.
69
70pub fn results(request_id: &str) -> String {
71 format!("results.{request_id}")
72}
73
74pub fn heartbeat(pc_id: &str) -> String {
75 format!("heartbeat.{pc_id}")
76}
77
78/// `host_perf.<pc_id>` — Phase 1 of the perf telemetry pipeline. The
79/// agent publishes a whole-host CPU / Memory / Disk I/O / Network
80/// snapshot here on the cadence set by `host_perf_interval` in
81/// agent_config (default 60 s). Distinct subject from `heartbeat.<pc_id>`
82/// so the periodic heartbeat publisher stays untouched and pre-host_perf
83/// backends that don't subscribe simply ignore the new traffic.
84pub fn host_perf(pc_id: &str) -> String {
85 format!("host_perf.{pc_id}")
86}
87
88/// `process_perf.<pc_id>` — Phase 2: top-N per-process snapshot
89/// published only while `process_perf_enabled` is `true` AND the
90/// `process_perf_expires_at` deadline is in the future. Separate
91/// subject from `host_perf.<pc_id>` because process-perf is an
92/// opt-in investigation mode — having its own subject lets the
93/// projector skip the heavy table entirely for hosts that never
94/// turn it on.
95pub fn process_perf(pc_id: &str) -> String {
96 format!("process_perf.{pc_id}")
97}
98
99/// `obs.<pc_id>` — per-PC observability event stream (Issue #246).
100/// The agent publishes one [`crate::wire::ObsEvent`] per timeline
101/// event (sign-in / out, power on / off, sleep / resume, agent
102/// milestones, diagnostic bundle pointers). Distinct from
103/// `events.started.*` (in-flight script lifecycle) and
104/// `host_perf.<pc_id>` (numeric telemetry) — `obs.*` is the
105/// semantic-event stream the SPA Timeline page consumes.
106pub fn obs(pc_id: &str) -> String {
107 format!("obs.{pc_id}")
108}
109
110/// `obs.>` — filter the backend projector subscribes to so a new
111/// PC starts flowing into the timeline without any per-PC SUB
112/// registration. Pairs with [`obs`] for publish.
113pub const OBS_FILTER: &str = "obs.>";
114
115/// `kill.<exec_id>` — Spec §2.6 Layer 3 abort signal. The exec_id is
116/// the deployment / scheduler-fire UUID (formerly named `job_id`
117/// pre-v0.29; renamed for accuracy — every `Command.exec_id` is a
118/// per-deploy UUID, not a job-catalog id).
119pub fn kill(exec_id: &str) -> String {
120 format!("kill.{exec_id}")
121}
122
123pub fn inventory(pc_id: &str, category: &str) -> String {
124 format!("inventory.{pc_id}.{category}")
125}
126
127/// `events.started.<exec_id>.<pc_id>` — v0.30 / PR α' lifecycle
128/// event published by the agent just before spawning a script's
129/// child process. Lets the backend project an in-flight row into
130/// `execution_results` (with `finished_at = NULL`) so the SPA
131/// Activity table can show running rows alongside finished ones.
132/// Backend subscribes via [`EVENTS_STARTED_FILTER`].
133pub fn events_started(exec_id: &str, pc_id: &str) -> String {
134 format!("events.started.{exec_id}.{pc_id}")
135}
136
137/// Wildcard the backend events projector consumes on STREAM_EVENTS.
138/// Narrow (`events.started.>`) rather than the whole `events.>` so
139/// future event types can carry their own filters without rerouting
140/// the started subset.
141pub const EVENTS_STARTED_FILTER: &str = "events.started.>";
142
143/// Wildcard the backend notification-acks projector consumes on
144/// `STREAM_EVENTS`. Narrow (`events.notifications.acked.>`) rather
145/// than the whole `events.>` so the projector only wakes for ack
146/// events and not the high-volume `events.started.*` lifecycle
147/// traffic (which the events projector handles separately).
148pub const EVENTS_NOTIFICATIONS_ACKED_FILTER: &str = "events.notifications.acked.>";
149
150pub const INVENTORY_HW: &str = "hw";
151pub const INVENTORY_SW: &str = "sw";
152pub const INVENTORY_NET: &str = "net";
153
154/// `logs.fetch.<pc_id>` — request/reply: operator (or backend) sends
155/// a `LogsRequest`; the addressed agent replies with the tail of its
156/// local log file. On-demand only, no stream.
157pub fn logs_fetch(pc_id: &str) -> String {
158 format!("logs.fetch.{pc_id}")
159}
160
161/// `job.tail.<pc_id>` — request/reply for the live tail of a
162/// still-running job's stdout/stderr. The operator (or backend, on
163/// the SPA's behalf) sends a [`crate::wire::JobTailRequest`] carrying
164/// the `result_id`; the addressed agent replies with the current
165/// ring-buffer tail from its in-memory live registry. On-demand only,
166/// no stream — the SPA polls this every few seconds (same shape as
167/// `logs.fetch.<pc_id>`) while a job is in flight. Distinct subject
168/// from `logs.fetch.<pc_id>` (whole-agent log file) because this is
169/// scoped to a single job's captured output, not the agent's log.
170pub fn job_tail(pc_id: &str) -> String {
171 format!("job.tail.{pc_id}")
172}
173
174/// `agents.<pc_id>.ping` — v0.38 / #133 request/reply for the
175/// active "ping" round-trip. The agent answers with a fresh
176/// `Heartbeat` on demand instead of the backend waiting up to ~30 s
177/// for the next periodic heartbeat tick to land. Distinct subject
178/// from `heartbeat.<pc_id>` so the periodic publisher is unaffected
179/// and old agents that don't subscribe simply time the request out.
180pub fn ping(pc_id: &str) -> String {
181 format!("agents.{pc_id}.ping")
182}
183
184// v0.14: subject::inventory_request was retired alongside the
185// hardcoded inventory loop. On-demand collection now goes through
186// the normal exec path (`kanade exec configs/jobs/inventory-
187// hw.yaml`) — Command + ExecResult + the inventory-fact projector
188// give operators the same effect with no extra subject.
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193
194 #[test]
195 fn commands_all_constant() {
196 assert_eq!(COMMANDS_ALL, "commands.all");
197 }
198
199 #[test]
200 fn commands_group_formats_name() {
201 assert_eq!(commands_group("canary"), "commands.group.canary");
202 assert_eq!(commands_group("wave1"), "commands.group.wave1");
203 }
204
205 #[test]
206 fn commands_pc_formats_id() {
207 assert_eq!(commands_pc("pc-01"), "commands.pc.pc-01");
208 assert_eq!(commands_pc("PC1234"), "commands.pc.PC1234");
209 }
210
211 #[test]
212 fn notifications_all_constant() {
213 assert_eq!(NOTIFICATIONS_ALL, "notifications.all");
214 }
215
216 #[test]
217 fn notifications_group_formats_name() {
218 assert_eq!(
219 notifications_group("tokyo-office"),
220 "notifications.group.tokyo-office"
221 );
222 }
223
224 #[test]
225 fn notifications_pc_formats_id() {
226 assert_eq!(notifications_pc("PC1234"), "notifications.pc.PC1234");
227 }
228
229 #[test]
230 fn events_notifications_acked_formats_all_segments() {
231 assert_eq!(
232 events_notifications_acked("PC1234", "S-1-5-21-1001", "notif-9f3a"),
233 "events.notifications.acked.PC1234.S-1-5-21-1001.notif-9f3a"
234 );
235 }
236
237 #[test]
238 fn events_notifications_acked_filter_is_narrow_wildcard() {
239 assert_eq!(
240 EVENTS_NOTIFICATIONS_ACKED_FILTER,
241 "events.notifications.acked.>"
242 );
243 // Must stay a strict subset of the EVENTS stream's `events.>`
244 // subjects so STREAM_EVENTS retains it without a config change.
245 assert!(EVENTS_NOTIFICATIONS_ACKED_FILTER.starts_with("events."));
246 }
247
248 #[test]
249 fn results_formats_request_id() {
250 assert_eq!(results("req-1"), "results.req-1");
251 }
252
253 #[test]
254 fn heartbeat_formats_pc_id() {
255 assert_eq!(heartbeat("pc-01"), "heartbeat.pc-01");
256 }
257
258 #[test]
259 fn host_perf_formats_pc_id() {
260 assert_eq!(host_perf("pc-01"), "host_perf.pc-01");
261 assert_eq!(host_perf("PC1234"), "host_perf.PC1234");
262 }
263
264 #[test]
265 fn process_perf_formats_pc_id() {
266 assert_eq!(process_perf("pc-01"), "process_perf.pc-01");
267 assert_eq!(process_perf("PC1234"), "process_perf.PC1234");
268 }
269
270 #[test]
271 fn obs_formats_pc_id() {
272 assert_eq!(obs("pc-01"), "obs.pc-01");
273 assert_eq!(obs("PC1234"), "obs.PC1234");
274 }
275
276 #[test]
277 fn obs_filter_constant() {
278 assert_eq!(OBS_FILTER, "obs.>");
279 }
280
281 #[test]
282 fn kill_formats_exec_id() {
283 assert_eq!(kill("exec-uuid-1"), "kill.exec-uuid-1");
284 }
285
286 #[test]
287 fn logs_fetch_formats_pc_id() {
288 assert_eq!(logs_fetch("pc-01"), "logs.fetch.pc-01");
289 }
290
291 #[test]
292 fn ping_formats_pc_id() {
293 assert_eq!(ping("pc-01"), "agents.pc-01.ping");
294 }
295
296 #[test]
297 fn job_tail_formats_pc_id() {
298 assert_eq!(job_tail("pc-01"), "job.tail.pc-01");
299 assert_eq!(job_tail("PC1234"), "job.tail.PC1234");
300 }
301
302 #[test]
303 fn events_started_formats_exec_id_and_pc_id() {
304 assert_eq!(
305 events_started("exec-uuid-1", "pc-01"),
306 "events.started.exec-uuid-1.pc-01",
307 );
308 }
309
310 #[test]
311 fn events_started_filter_is_narrow_wildcard() {
312 assert_eq!(EVENTS_STARTED_FILTER, "events.started.>");
313 }
314
315 #[test]
316 fn inventory_formats_pc_id_and_category() {
317 assert_eq!(inventory("pc-01", "hw"), "inventory.pc-01.hw");
318 assert_eq!(inventory("pc-01", INVENTORY_HW), "inventory.pc-01.hw");
319 assert_eq!(inventory("pc-01", INVENTORY_SW), "inventory.pc-01.sw");
320 assert_eq!(inventory("pc-01", INVENTORY_NET), "inventory.pc-01.net");
321 }
322}