// kanade_shared/subject.rs

/// `commands.all` — the broadcast member of the
/// `commands.{all,group.<name>,pc.<pc_id>}` subject family (see
/// [`commands_group`] and [`commands_pc`] for the scoped siblings).
pub const COMMANDS_ALL: &str = "commands.all";
2
/// `commands.group.<name>` — group-scoped command subject.
///
/// `name` is interpolated verbatim as the trailing part of the subject;
/// no escaping or validation is performed here.
#[must_use]
pub fn commands_group(name: &str) -> String {
    format!("commands.group.{name}")
}
6
/// `commands.pc.<pc_id>` — single-PC command subject.
///
/// `pc_id` is interpolated verbatim as the trailing part of the subject;
/// no escaping or validation is performed here.
#[must_use]
pub fn commands_pc(pc_id: &str) -> String {
    format!("commands.pc.{pc_id}")
}
10
/// `notifications.all` — broadcast end-user notification (SPEC
/// §2.2.1 / Phase E). Mirrors [`COMMANDS_ALL`] but on the
/// notification fan-out plane: the backend publishes here, the
/// `NOTIFICATIONS` stream retains it, and every agent forwards it to
/// the Client Apps in matching user sessions.
pub const NOTIFICATIONS_ALL: &str = "notifications.all";
17
/// Subject prefix for [`notifications_group`]. Exposed so callers that
/// parse a subject back into its group name (the backend's notification
/// audience resolver) strip against the same string the builder emits —
/// if the format ever changes, both move together instead of the parser
/// silently failing to match.
pub const NOTIFICATIONS_GROUP_PREFIX: &str = "notifications.group.";

/// Subject prefix for [`notifications_pc`]. Exposed for the same reason
/// as [`NOTIFICATIONS_GROUP_PREFIX`]: parsers strip exactly what the
/// builder prepends.
pub const NOTIFICATIONS_PC_PREFIX: &str = "notifications.pc.";

/// `notifications.group.{group_name}` — group-scoped end-user
/// notification. Sibling of [`commands_group`] on the notification
/// plane.
///
/// The result is [`NOTIFICATIONS_GROUP_PREFIX`] followed by `name`
/// verbatim (no escaping or validation), so
/// `subject.strip_prefix(NOTIFICATIONS_GROUP_PREFIX)` recovers `name`.
#[must_use]
pub fn notifications_group(name: &str) -> String {
    format!("{NOTIFICATIONS_GROUP_PREFIX}{name}")
}

/// `notifications.pc.{pc_id}` — single-PC end-user notification.
/// Sibling of [`commands_pc`] on the notification plane.
///
/// The result is [`NOTIFICATIONS_PC_PREFIX`] followed by `pc_id`
/// verbatim (no escaping or validation), so
/// `subject.strip_prefix(NOTIFICATIONS_PC_PREFIX)` recovers `pc_id`.
#[must_use]
pub fn notifications_pc(pc_id: &str) -> String {
    format!("{NOTIFICATIONS_PC_PREFIX}{pc_id}")
}
41
/// `notif-amend` — ephemeral fleet-wide control channel for post-send
/// operations on a notification (currently: recall). Deliberately NOT
/// under `notifications.>` so the `NOTIFICATIONS` JetStream stream doesn't
/// retain it (these are live control pushes, not history). Published via
/// **core NATS** (fire-and-forget); every agent subscribes and forwards a
/// `notifications.amended` push to its clients, each of which applies it
/// only if it holds the referenced id — so a single broadcast needs no
/// per-audience routing (an id a client doesn't have is a no-op). The
/// durable half of a recall is the stream message deletion; this is just
/// the "remove it from screens that are showing it now" half.
pub const NOTIFICATIONS_AMEND_SUBJECT: &str = "notif-amend";
53
/// `events.notifications.acked.{pc_id}.{user_sid}.{notif_id}` — the
/// agent publishes this when a user clicks "確認" ("Confirm") on a
/// notification (SPEC §2.2.2 / Phase E). The `{user_sid}` segment
/// distinguishes concurrent users on a shared PC (Fast User Switching /
/// RDP). Lives under `events.>` so the existing `EVENTS` stream retains
/// it. The backend's notification-acks projector consumes it through
/// [`EVENTS_NOTIFICATIONS_FILTER`] (the ack + unack filter that
/// superseded the ack-only [`EVENTS_NOTIFICATIONS_ACKED_FILTER`]) to
/// build the SPA's per-recipient confirmation view.
///
/// Subject spelling is fixed by SPEC §2.2.2 / §2.12.8 (`events.>`), so
/// acks ride the `EVENTS` stream's retention (shorter than the 90-day
/// `NOTIFICATIONS` history), not the notification stream's. That only
/// bounds **re-projection** after a `-WipeDb`: the durable source of
/// truth for ack_status is the `notification_acks` SQLite table, which
/// persists independently — so a live fleet keeps full ack history;
/// only a DB wipe truncates re-derivable acks to the EVENTS window,
/// the same limitation every `events.*`-projected table already has.
///
/// All three arguments are interpolated verbatim (no escaping or
/// validation), in the order `pc_id`, `user_sid`, `notif_id`.
#[must_use]
pub fn events_notifications_acked(pc_id: &str, user_sid: &str, notif_id: &str) -> String {
    format!("events.notifications.acked.{pc_id}.{user_sid}.{notif_id}")
}
74
/// `events.notifications.unacked.{pc_id}.{user_sid}.{notif_id}` — the
/// agent publishes this when a user *retracts* a prior "確認"
/// ("Confirm") — the read↔unread toggle. Mirror of
/// [`events_notifications_acked`]; the backend's notification-acks
/// projector consumes it on the same consumer (see
/// [`EVENTS_NOTIFICATIONS_FILTER`]) so ack and unack for one recipient
/// stay strictly ordered on the `EVENTS` stream. Layer split: the
/// **agent** tombstones the `notifications_read` KV (so the user's own
/// `notifications.list` goes back to unread); this event is the
/// **projector's** half — it stamps `unacked_at` on the read-model row
/// and appends the revoke to the audit log. Same retention caveat as
/// the acked subject: the durable source of truth is SQLite, the stream
/// only bounds re-projection after a `-WipeDb`.
///
/// All three arguments are interpolated verbatim (no escaping or
/// validation), in the order `pc_id`, `user_sid`, `notif_id`.
#[must_use]
pub fn events_notifications_unacked(pc_id: &str, user_sid: &str, notif_id: &str) -> String {
    format!("events.notifications.unacked.{pc_id}.{user_sid}.{notif_id}")
}
90
// `commands_exec` (subject `commands.exec.<job_id>`) was removed in
// v0.22.1. The STREAM_EXEC stream now catches the existing
// `commands.{all,group.X,pc.Y}` subjects directly, so the dedicated
// per-exec subject isn't needed any more. See
// `kanade-agent::command_replay` for how reconnecting agents catch
// up on missed messages.
97
/// `results.<request_id>` — subject keyed by a request id.
///
/// `request_id` is interpolated verbatim (no escaping or validation).
/// NOTE(review): presumably the subject on which the result for that
/// request is published — confirm against the callers.
#[must_use]
pub fn results(request_id: &str) -> String {
    format!("results.{request_id}")
}
101
/// `heartbeat.<pc_id>` — subject of the periodic heartbeat for one PC.
/// The on-demand counterpart is [`ping`], which deliberately uses a
/// different subject so the periodic publisher is unaffected.
///
/// `pc_id` is interpolated verbatim (no escaping or validation).
#[must_use]
pub fn heartbeat(pc_id: &str) -> String {
    format!("heartbeat.{pc_id}")
}
105
/// `host_perf.<pc_id>` — Phase 1 of the perf telemetry pipeline. The
/// agent publishes a whole-host CPU / Memory / Disk I/O / Network
/// snapshot here on the cadence set by `host_perf_interval` in
/// agent_config (default 60 s). Distinct subject from `heartbeat.<pc_id>`
/// so the periodic heartbeat publisher stays untouched and pre-host_perf
/// backends that don't subscribe simply ignore the new traffic.
///
/// `pc_id` is interpolated verbatim (no escaping or validation).
#[must_use]
pub fn host_perf(pc_id: &str) -> String {
    format!("host_perf.{pc_id}")
}
115
/// `process_perf.<pc_id>` — Phase 2: top-N per-process snapshot
/// published only while `process_perf_enabled` is `true` AND the
/// `process_perf_expires_at` deadline is in the future. Separate
/// subject from `host_perf.<pc_id>` because process-perf is an
/// opt-in investigation mode — having its own subject lets the
/// projector skip the heavy table entirely for hosts that never
/// turn it on.
///
/// `pc_id` is interpolated verbatim (no escaping or validation).
#[must_use]
pub fn process_perf(pc_id: &str) -> String {
    format!("process_perf.{pc_id}")
}
126
/// `obs.<pc_id>` — per-PC observability event stream (Issue #246).
/// The agent publishes one [`crate::wire::ObsEvent`] per timeline
/// event (sign-in / out, power on / off, sleep / resume, agent
/// milestones, diagnostic bundle pointers). Distinct from
/// `events.started.*` (in-flight script lifecycle) and
/// `host_perf.<pc_id>` (numeric telemetry) — `obs.*` is the
/// semantic-event stream the SPA Timeline page consumes.
///
/// `pc_id` is interpolated verbatim (no escaping or validation).
#[must_use]
pub fn obs(pc_id: &str) -> String {
    format!("obs.{pc_id}")
}
137
/// `obs.>` — filter the backend projector subscribes to so a new
/// PC starts flowing into the timeline without any per-PC SUB
/// registration. Pairs with [`obs`] for publish.
pub const OBS_FILTER: &str = "obs.>";
142
/// `kill.<exec_id>` — Spec §2.6 Layer 3 abort signal. The exec_id is
/// the deployment / scheduler-fire UUID (formerly named `job_id`
/// pre-v0.29; renamed for accuracy — every `Command.exec_id` is a
/// per-deploy UUID, not a job-catalog id).
///
/// `exec_id` is interpolated verbatim (no escaping or validation).
#[must_use]
pub fn kill(exec_id: &str) -> String {
    format!("kill.{exec_id}")
}
150
/// `inventory.<pc_id>.<category>` — inventory subject for one PC and one
/// category. The category tokens defined in this module are
/// [`INVENTORY_HW`], [`INVENTORY_SW`] and [`INVENTORY_NET`]; any other
/// string is accepted and interpolated as-is.
///
/// Both arguments are interpolated verbatim (no escaping or validation).
#[must_use]
pub fn inventory(pc_id: &str, category: &str) -> String {
    format!("inventory.{pc_id}.{category}")
}
154
/// `events.started.<exec_id>.<pc_id>` — v0.30 / PR α' lifecycle
/// event published by the agent just before spawning a script's
/// child process. Lets the backend project an in-flight row into
/// `execution_results` (with `finished_at = NULL`) so the SPA
/// Activity table can show running rows alongside finished ones.
/// Backend subscribes via [`EVENTS_STARTED_FILTER`].
///
/// Note the argument order: `exec_id` first, then `pc_id`. Both are
/// interpolated verbatim (no escaping or validation).
#[must_use]
pub fn events_started(exec_id: &str, pc_id: &str) -> String {
    format!("events.started.{exec_id}.{pc_id}")
}
164
/// Wildcard the backend events projector consumes on STREAM_EVENTS.
/// Narrow (`events.started.>`) rather than the whole `events.>` so
/// future event types can carry their own filters without rerouting
/// the started subset.
pub const EVENTS_STARTED_FILTER: &str = "events.started.>";
170
/// `events.notifications.acked.>` — ack-only wildcard on `STREAM_EVENTS`.
///
/// **No longer the active consumer filter.** Since unack landed, the
/// backend notification-acks projector consumes the broader
/// [`EVENTS_NOTIFICATIONS_FILTER`] (ack + unack on one ordered consumer).
/// This constant is kept as the named ack-only subject shape (docs +
/// tests).
///
/// Original rationale, for context: the filter was narrowed to
/// `events.notifications.acked.>` rather than the whole `events.>` so the
/// projector only woke for ack events and not the high-volume
/// `events.started.*` lifecycle traffic (which the events projector
/// handles separately).
pub const EVENTS_NOTIFICATIONS_ACKED_FILTER: &str = "events.notifications.acked.>";
181
/// Wildcard the backend notification-acks projector consumes on
/// `STREAM_EVENTS` once it handles both ack **and** unack. Broader than
/// [`EVENTS_NOTIFICATIONS_ACKED_FILTER`] (`acked.>`) so a single durable
/// consumer sees both `events.notifications.acked.*` and
/// `events.notifications.unacked.*` in stream-sequence order — the only
/// way `ack → unack → re-ack` stays correctly serialised (a second
/// consumer would race the DELETE past a later INSERT). Still a strict
/// subset of `events.>` so STREAM_EVENTS retains it without a config
/// change, and still narrower than the high-volume `events.started.*`
/// traffic the events projector owns.
pub const EVENTS_NOTIFICATIONS_FILTER: &str = "events.notifications.>";
193
/// `hw` category token for the `category` argument of [`inventory`].
pub const INVENTORY_HW: &str = "hw";
/// `sw` category token for the `category` argument of [`inventory`].
pub const INVENTORY_SW: &str = "sw";
/// `net` category token for the `category` argument of [`inventory`].
pub const INVENTORY_NET: &str = "net";
197
/// `logs.fetch.<pc_id>` — request/reply: operator (or backend) sends
/// a `LogsRequest`; the addressed agent replies with the tail of its
/// local log file. On-demand only, no stream.
///
/// `pc_id` is interpolated verbatim (no escaping or validation).
#[must_use]
pub fn logs_fetch(pc_id: &str) -> String {
    format!("logs.fetch.{pc_id}")
}
204
/// `job.tail.<pc_id>` — request/reply for the live tail of a
/// still-running job's stdout/stderr. The operator (or backend, on
/// the SPA's behalf) sends a [`crate::wire::JobTailRequest`] carrying
/// the `result_id`; the addressed agent replies with the current
/// ring-buffer tail from its in-memory live registry. On-demand only,
/// no stream — the SPA polls this every few seconds (same shape as
/// `logs.fetch.<pc_id>`) while a job is in flight. Distinct subject
/// from `logs.fetch.<pc_id>` (whole-agent log file) because this is
/// scoped to a single job's captured output, not the agent's log.
///
/// `pc_id` is interpolated verbatim (no escaping or validation).
#[must_use]
pub fn job_tail(pc_id: &str) -> String {
    format!("job.tail.{pc_id}")
}
217
/// `agents.<pc_id>.ping` — v0.38 / #133 request/reply for the
/// active "ping" round-trip. The agent answers with a fresh
/// `Heartbeat` on demand instead of the backend waiting up to ~30 s
/// for the next periodic heartbeat tick to land. Distinct subject
/// from `heartbeat.<pc_id>` so the periodic publisher is unaffected
/// and old agents that don't subscribe simply time the request out.
///
/// Unlike the other builders in this module, `pc_id` is the *middle*
/// token here. It is interpolated verbatim (no escaping or validation).
#[must_use]
pub fn ping(pc_id: &str) -> String {
    format!("agents.{pc_id}.ping")
}
227
// v0.14: subject::inventory_request was retired alongside the
// hardcoded inventory loop. On-demand collection now goes through
// the normal exec path (`kanade exec configs/jobs/inventory-hw.yaml`)
// — Command + ExecResult + the inventory-fact projector give
// operators the same effect with no extra subject.
233
// Pins the exact spelling of every subject constant and builder in this
// module: publishers and subscribers must agree on these strings
// byte-for-byte, so any change to a format has to show up as a test diff.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn commands_all_constant() {
        assert_eq!(COMMANDS_ALL, "commands.all");
    }

    #[test]
    fn commands_group_formats_name() {
        assert_eq!(commands_group("canary"), "commands.group.canary");
        assert_eq!(commands_group("wave1"), "commands.group.wave1");
    }

    #[test]
    fn commands_pc_formats_id() {
        assert_eq!(commands_pc("pc-01"), "commands.pc.pc-01");
        assert_eq!(commands_pc("PC1234"), "commands.pc.PC1234");
    }

    #[test]
    fn notifications_all_constant() {
        assert_eq!(NOTIFICATIONS_ALL, "notifications.all");
    }

    #[test]
    fn notifications_group_formats_name() {
        assert_eq!(
            notifications_group("tokyo-office"),
            "notifications.group.tokyo-office"
        );
    }

    #[test]
    fn notifications_pc_formats_id() {
        assert_eq!(notifications_pc("PC1234"), "notifications.pc.PC1234");
    }

    #[test]
    fn notifications_group_prefix_round_trips() {
        // The prefix constant exists so parsers strip exactly what the
        // builder prepends; pin both the value and the round trip.
        assert_eq!(NOTIFICATIONS_GROUP_PREFIX, "notifications.group.");
        assert_eq!(
            notifications_group("tokyo-office").strip_prefix(NOTIFICATIONS_GROUP_PREFIX),
            Some("tokyo-office")
        );
    }

    #[test]
    fn notifications_pc_prefix_round_trips() {
        assert_eq!(NOTIFICATIONS_PC_PREFIX, "notifications.pc.");
        assert_eq!(
            notifications_pc("PC1234").strip_prefix(NOTIFICATIONS_PC_PREFIX),
            Some("PC1234")
        );
    }

    #[test]
    fn notifications_amend_subject_constant() {
        // Pin the value: backend publisher and every agent subscriber must
        // agree on it, and it must stay OUTSIDE `notifications.>` so the
        // NOTIFICATIONS stream never retains these ephemeral control messages.
        assert_eq!(NOTIFICATIONS_AMEND_SUBJECT, "notif-amend");
        assert!(!NOTIFICATIONS_AMEND_SUBJECT.starts_with("notifications."));
    }

    #[test]
    fn events_notifications_acked_formats_all_segments() {
        assert_eq!(
            events_notifications_acked("PC1234", "S-1-5-21-1001", "notif-9f3a"),
            "events.notifications.acked.PC1234.S-1-5-21-1001.notif-9f3a"
        );
    }

    #[test]
    fn events_notifications_acked_filter_is_narrow_wildcard() {
        assert_eq!(
            EVENTS_NOTIFICATIONS_ACKED_FILTER,
            "events.notifications.acked.>"
        );
        // Must stay a strict subset of the EVENTS stream's `events.>`
        // subjects so STREAM_EVENTS retains it without a config change.
        assert!(EVENTS_NOTIFICATIONS_ACKED_FILTER.starts_with("events."));
    }

    #[test]
    fn events_notifications_unacked_formats_all_segments() {
        assert_eq!(
            events_notifications_unacked("PC1234", "S-1-5-21-1001", "notif-9f3a"),
            "events.notifications.unacked.PC1234.S-1-5-21-1001.notif-9f3a"
        );
    }

    #[test]
    fn events_notifications_filter_covers_acked_and_unacked() {
        assert_eq!(EVENTS_NOTIFICATIONS_FILTER, "events.notifications.>");
        // The broadened filter must subsume both the acked and unacked
        // subjects so one durable consumer serialises them in order.
        let acked = events_notifications_acked("PC1", "S-1", "n1");
        let unacked = events_notifications_unacked("PC1", "S-1", "n1");
        assert!(acked.starts_with("events.notifications."));
        assert!(unacked.starts_with("events.notifications."));
        // Still a subset of the EVENTS stream's retained subjects.
        assert!(EVENTS_NOTIFICATIONS_FILTER.starts_with("events."));
    }

    #[test]
    fn results_formats_request_id() {
        assert_eq!(results("req-1"), "results.req-1");
    }

    #[test]
    fn heartbeat_formats_pc_id() {
        assert_eq!(heartbeat("pc-01"), "heartbeat.pc-01");
    }

    #[test]
    fn host_perf_formats_pc_id() {
        assert_eq!(host_perf("pc-01"), "host_perf.pc-01");
        assert_eq!(host_perf("PC1234"), "host_perf.PC1234");
    }

    #[test]
    fn process_perf_formats_pc_id() {
        assert_eq!(process_perf("pc-01"), "process_perf.pc-01");
        assert_eq!(process_perf("PC1234"), "process_perf.PC1234");
    }

    #[test]
    fn obs_formats_pc_id() {
        assert_eq!(obs("pc-01"), "obs.pc-01");
        assert_eq!(obs("PC1234"), "obs.PC1234");
    }

    #[test]
    fn obs_filter_constant() {
        assert_eq!(OBS_FILTER, "obs.>");
    }

    #[test]
    fn kill_formats_exec_id() {
        assert_eq!(kill("exec-uuid-1"), "kill.exec-uuid-1");
    }

    #[test]
    fn logs_fetch_formats_pc_id() {
        assert_eq!(logs_fetch("pc-01"), "logs.fetch.pc-01");
    }

    #[test]
    fn ping_formats_pc_id() {
        assert_eq!(ping("pc-01"), "agents.pc-01.ping");
    }

    #[test]
    fn job_tail_formats_pc_id() {
        assert_eq!(job_tail("pc-01"), "job.tail.pc-01");
        assert_eq!(job_tail("PC1234"), "job.tail.PC1234");
    }

    #[test]
    fn events_started_formats_exec_id_and_pc_id() {
        assert_eq!(
            events_started("exec-uuid-1", "pc-01"),
            "events.started.exec-uuid-1.pc-01",
        );
    }

    #[test]
    fn events_started_filter_is_narrow_wildcard() {
        assert_eq!(EVENTS_STARTED_FILTER, "events.started.>");
    }

    #[test]
    fn inventory_formats_pc_id_and_category() {
        assert_eq!(inventory("pc-01", "hw"), "inventory.pc-01.hw");
        assert_eq!(inventory("pc-01", INVENTORY_HW), "inventory.pc-01.hw");
        assert_eq!(inventory("pc-01", INVENTORY_SW), "inventory.pc-01.sw");
        assert_eq!(inventory("pc-01", INVENTORY_NET), "inventory.pc-01.net");
    }
}