Skip to main content

kanade_shared/
kv.rs

//! Names and key builders for the NATS KV buckets, Object Store buckets
//! and JetStream streams used across the fleet (spec §2.3.2).
//!
//! NATS KV bucket names are limited to domain-safe ASCII
//! (a-z, A-Z, 0-9, `_`, `-`). The spec spells some buckets with a dot
//! (`script.current`, `script.status`); here the dot becomes an
//! underscore.

/// `script_current`: the spec's `script.current` bucket, dot flattened.
pub const BUCKET_SCRIPT_CURRENT: &str = "script_current";
/// `script_status`: the spec's `script.status` bucket, dot flattened.
pub const BUCKET_SCRIPT_STATUS: &str = "script_status";
/// `agents_state` KV bucket.
pub const BUCKET_AGENTS_STATE: &str = "agents_state";
/// `agent_config` KV bucket. It holds [`KEY_AGENT_TARGET_VERSION`] and the
/// layered `global` / `groups.<name>` / `pcs.<pc_id>` config scopes.
pub const BUCKET_AGENT_CONFIG: &str = "agent_config";
/// `agent_groups` KV bucket.
pub const BUCKET_AGENT_GROUPS: &str = "agent_groups";
/// `schedules` KV bucket. Schedules refer to jobs in [`BUCKET_JOBS`] by
/// id instead of embedding a Manifest body.
pub const BUCKET_SCHEDULES: &str = "schedules";

/// Job catalog (v0.15): operator-registered Manifests keyed by
/// `manifest.id`.
///
/// Both schedules and ad-hoc `kanade run --job-id ...` resolve jobs from
/// this bucket. As a result, a Schedule never carries an inline Manifest
/// body over the wire again. It also means that editing a job in place
/// retroactively changes what every future schedule firing deploys.
pub const BUCKET_JOBS: &str = "jobs";
20
/// Operator source-of-truth YAML for jobs, keyed exactly like
/// [`BUCKET_JOBS`].
///
/// The agent, scheduler and projector never read this bucket; they keep
/// consuming the JSON in [`BUCKET_JOBS`]. It exists only so the SPA's YAML
/// editor can round-trip operator comments, script indentation and
/// block-scalar style exactly.
///
/// Population is opportunistic:
/// - A `POST` with a `Content-Type: application/yaml` body stores the raw
///   bytes here alongside the parsed JSON.
/// - A JSON-content-type `POST` falls back to a `serde_yaml` dump. This
///   keeps the bucket in lockstep with the JSON store, but the operator
///   loses comments on that path.
pub const BUCKET_JOBS_YAML: &str = "jobs_yaml";

/// Operator source-of-truth YAML for schedules, keyed exactly like
/// [`BUCKET_SCHEDULES`].
///
/// This is the schedule-side twin of [`BUCKET_JOBS_YAML`], with the same
/// role and population rules. The runtime keeps reading the JSON in
/// [`BUCKET_SCHEDULES`]. This bucket only lets the SPA's YAML editor
/// round-trip comments and formatting exactly. YAML `POST`s store raw
/// bytes here; JSON `POST`s fall back to a `serde_yaml` dump.
pub const BUCKET_SCHEDULES_YAML: &str = "schedules_yaml";
34
/// Fleet-wide singleton settings that fit nowhere else:
/// - They are not per-agent, so they are not one of `agent_config`'s
///   layered scopes.
/// - They are not per-schedule, so they are not a row in `schedules`.
///
/// So far the only key is [`KEY_FREEZE`] (#418 Phase 5 global
/// change-freeze). It is intentionally one small bucket, watched by both
/// the backend scheduler and every agent's local scheduler.
pub const BUCKET_FLEET_CONFIG: &str = "fleet_config";

/// `notifications_read`: per-user read/ack state for end-user
/// notifications (SPEC §2.3.2 / Phase E).
///
/// * Key: `{pc_id}.{user_sid}.{notification_id}`, built by
///   [`notifications_read_key`].
/// * Value: JSON `{"acked_at": ..., "acked_by": "<sid>"}`.
///
/// When the agent handles a KLP `notifications.ack`, it writes one row
/// stamped with the connecting user's OS-derived SID. Each user on a
/// shared PC therefore keeps an independent read set. Every row for one
/// user shares the `{pc_id}.{user_sid}.` prefix, so `notifications.list`
/// can fetch that user's read set with a single prefix walk. The bucket
/// uses `history: 1` because only the latest ack per key matters.
pub const BUCKET_NOTIFICATIONS_READ: &str = "notifications_read";
52
/// Builds the [`BUCKET_NOTIFICATIONS_READ`] key recording that `user_sid`
/// on `pc_id` acknowledged `notification_id`:
/// `{pc_id}.{user_sid}.{notification_id}` (SPEC §2.3.2).
///
/// The three parts are joined with `.` to match the spec's key shape. The
/// join is unambiguous only because none of the parts contains a `.` in
/// practice:
/// - `pc_id` is a hostname.
/// - `user_sid` is a hyphen-delimited `S-1-5-…` SID.
/// - `notification_id` is a backend-minted UUID (operator-supplied
///   manifest ids are kebab-case).
///
/// Under that assumption, the `{pc_id}.{user_sid}.` prefix from
/// [`notifications_read_prefix`] selects exactly one user's read set for
/// `notifications.list`.
///
/// NOTE(review): a fully-qualified hostname would contain dots. Confirm
/// that `pc_id` is always the short host name.
pub fn notifications_read_key(pc_id: &str, user_sid: &str, notification_id: &str) -> String {
    // Slice `join` produces exactly `{pc_id}.{user_sid}.{notification_id}`.
    [pc_id, user_sid, notification_id].join(".")
}
67
/// Returns `{pc_id}.{user_sid}.`, the prefix shared by every
/// [`BUCKET_NOTIFICATIONS_READ`] row for one `(pc_id, user_sid)`.
///
/// `notifications.list` walks the bucket keys and keeps those starting
/// with this prefix to compute the caller's unread set. For the same
/// `(pc_id, user_sid)`, this is a prefix of every key that
/// [`notifications_read_key`] builds.
///
/// The trailing `.` stops one SID from matching a longer SID it is a
/// textual prefix of (e.g. `S-1-5-21-100` vs `S-1-5-21-1001`).
pub fn notifications_read_prefix(pc_id: &str, user_sid: &str) -> String {
    // Two separator dots plus the two components.
    let mut prefix = String::with_capacity(pc_id.len() + user_sid.len() + 2);
    prefix.push_str(pc_id);
    prefix.push('.');
    prefix.push_str(user_sid);
    prefix.push('.');
    prefix
}
76
/// Singleton key in [`BUCKET_FLEET_CONFIG`] whose value is the
/// JSON-encoded [`crate::manifest::Freeze`].
///
/// **A missing key means "not frozen".** Lifting the freeze is a KV
/// delete, so readers fire normally when the key is absent. They only
/// evaluate `Freeze::is_active` when the key is present.
pub const KEY_FREEZE: &str = "freeze";

/// KV bucket of **per-(schedule, pc) last-dispatch marks**. The backend
/// scheduler uses them to suppress re-dispatch while a run is in flight.
///
/// The per-pc / per-target dedup ([`crate::manifest::ExecMode`]) only sees
/// *completed* runs (`execution_results`, exit_code = 0). Since #418 the
/// reconcile poll ticks every minute ([`crate::manifest::POLL_CRON`]).
/// A dispatched Command, however, only lands a completion after
/// `jitter (agent-side) + run + outbox drain`, which is often several
/// minutes with a 3–5 min jitter. Without a dispatch record, every tick in
/// that gap would re-fire the same PC (or the whole target).
///
/// Recording "dispatched (schedule, pc) at T" here lets the scheduler hold
/// off re-firing for a bounded window, without waiting on the completion
/// round-trip. Keys come from [`dispatch_mark_pc_key`] /
/// [`dispatch_mark_target_key`].
///
/// Each value is the dispatch instant as an RFC3339 string. A bucket-wide
/// `max_age` garbage-collects marks once they are well past any
/// suppression window, so the bucket cannot grow unbounded. The window
/// check itself lives in `scheduler::policy::suppress_dispatched`.
pub const BUCKET_SCHEDULER_DISPATCH: &str = "scheduler_dispatch";
102
/// Dispatch-mark key for one `(schedule_id, pc_id)` pair (OncePerPc),
/// stored in [`BUCKET_SCHEDULER_DISPATCH`].
///
/// Layout: `pc.<len>.<schedule_id>.<len>.<pc_id>`.
///
/// * The leading `pc.` (vs `target.` in [`dispatch_mark_target_key`])
///   keeps the per-pc and per-target namespaces disjoint.
/// * Each id is preceded by its length. This makes the encoding injective
///   even when an id contains the `.` separator: `("a.b", "c")` becomes
///   `pc.3.a.b.1.c`, while `("a", "b.c")` becomes `pc.1.a.3.b.c`.
///
/// Percent- or base64-encoding is not an option, because NATS KV keys only
/// allow `[-/_=.a-zA-Z0-9]`. A self-delimiting length prefix is the
/// cheapest injective encoding that stays inside that charset.
pub fn dispatch_mark_pc_key(schedule_id: &str, pc_id: &str) -> String {
    // `str::len` is the UTF-8 byte length.
    let schedule_len = schedule_id.len();
    let pc_len = pc_id.len();
    format!("pc.{schedule_len}.{schedule_id}.{pc_len}.{pc_id}")
}
122
/// Whole-target dispatch-mark key (OncePerTarget): `target.<len>.<schedule_id>`.
///
/// There is a single key per schedule. A per-target fire dispatches the
/// entire target at once, so there is no per-pc state to record. The
/// length prefix mirrors [`dispatch_mark_pc_key`] for symmetry.
pub fn dispatch_mark_target_key(schedule_id: &str) -> String {
    let schedule_len = schedule_id.len();
    format!("target.{schedule_len}.{schedule_id}")
}
130
/// Object Store bucket of raw agent binaries, one object per version
/// (e.g. `0.2.0` → that release's file bytes).
pub const OBJECT_AGENT_RELEASES: &str = "agent_releases";

/// Object Store bucket for **generic application packages**: anything the
/// agent or kitting scripts download and install on endpoints.
///
/// The kanade-client app is the first consumer, but the bucket is
/// deliberately generic. Third-party installers (Webex, Teams, custom MSI
/// bundles), upgrade scripts, configuration archives and the like all
/// belong here.
///
/// Objects are keyed `<name>/<version>`. The operator chooses `<name>`
/// once per package family (e.g. `kanade-client`, `webex-meetings`) and a
/// `<version>` per release (e.g. `0.41.0`, `2025.03`). NATS Object Store
/// key rules explicitly allow the slash, and the SPA, CLI and HTTP routes
/// all carry the pair as two path segments.
///
/// Reasons this bucket is separate from [`OBJECT_AGENT_RELEASES`]:
/// - `agent_releases` is fleet-critical because it backs the agent's own
///   self-update path. Keeping it small and audited matters.
/// - `app_packages` is operator-curated user-space content with a
///   different lifecycle. Operators add and remove packages freely,
///   whereas agent releases follow the release.yml pipeline.
pub const OBJECT_APP_PACKAGES: &str = "app_packages";

/// Object Store bucket for **manifest script bodies** referenced by
/// `Execute::script_object`. This is SPEC §2.4.1's alternative to an
/// inline `script:` or a repo-local `script_file:`.
///
/// It was introduced in yukimemi/kanade issue #210 as the sibling of
/// [`OBJECT_APP_PACKAGES`] in the "Plan B 4-bucket layout". Scripts get
/// their own bucket because their lifecycle differs from installer
/// binaries:
///
/// - They are smaller: typically KB to low MB, against MB to hundreds of
///   MB for installers.
/// - They are coupled to manifest versions. A script's lifecycle is its
///   manifest's lifecycle, and the `script_current` / `script_status` KV
///   gates in SPEC §2.6.2 already track manifest versions. A matching
///   dedicated bucket keeps the audit story aligned.
/// - Their access pattern differs: any Command execute may fetch one,
///   whereas an installer is fetched once per fleet deploy.
///
/// Keys use the same `<name>/<version>` shape as `app_packages`, so the
/// SPA and operator tooling stay uniform. For manifest-driven scripts,
/// `<name>` is the manifest id and `<version>` is the manifest version.
/// The bucket itself attaches no meaning to the pair, though: ad-hoc
/// operator uploads may use any `<name>/<version>`.
pub const OBJECT_SCRIPTS: &str = "scripts";
180
/// Object Store bucket for **overflow stdout / stderr blobs** of the
/// `ExecResult` wire kind (#227).
///
/// NATS's default `max_payload` is 1 MB. A result whose stdout or stderr
/// exceeds it would have its publish rejected, pinning the agent's outbox
/// in a reconnect loop. To avoid that:
/// - The agent uploads any stdout / stderr larger than
///   [`STDOUT_INLINE_THRESHOLD`] to this bucket. That threshold is 256 KB,
///   a quarter of the default `max_payload`, so the other ExecResult
///   fields still fit.
/// - It replaces the inline field with a
///   [`crate::wire::ExecResult::stdout_object`] / `stderr_object` pointer.
/// - The backend's results projector dereferences those pointers before
///   INSERT. Downstream consumers (SQLite, SPA Activity, the inventory
///   projector) therefore see the full text just as they always have.
///
/// Keys are `<request_id>/{stdout,stderr}`, so stdout and stderr of one
/// execution share a sibling prefix. `kanade jetstream` listings then group
/// naturally, and the per-key namespace stays tight against duplicate
/// uploads.
///
/// Retention is set per bucket, not as a stream-wide TTL, because the
/// async-nats object_store inherits the stream config. It matches
/// [`STREAM_RESULTS`]'s 30-day retention: an operator who can still query
/// the result row in SQLite can also fetch the original blob, should the
/// inline copy ever need re-projecting.
pub const OBJECT_RESULT_OUTPUT: &str = "result_output";
205
/// Object Store bucket for **collected file bundles** (#219).
///
/// A job whose manifest carries a `collect:` hint prints a JSON list of
/// file paths on stdout. The agent zips those files, uploads the archive
/// here and records its key in [`crate::wire::ExecResult::collect_object`].
/// The SPA Collect page lists and downloads bundles straight from this
/// bucket.
///
/// Keys are `<pc_id>/<job_id>/<rfc3339>.zip`, so a listing groups by host
/// and then by job.
///
/// Per-bucket retention is 30 days. Bundles are debugging / audit
/// artifacts, not curated config like [`OBJECT_APP_PACKAGES`] /
/// [`OBJECT_SCRIPTS`], so they expire automatically. See
/// `kanade_shared::bootstrap` for the setup.
pub const OBJECT_COLLECTIONS: &str = "collections";
217
/// Largest `ExecResult.stdout` / `.stderr` kept inline. Anything bigger
/// overflows into [`OBJECT_RESULT_OUTPUT`].
///
/// 256 KB is a quarter of NATS's default `max_payload` (1 MB). That leaves
/// plenty of headroom for the rest of the ExecResult JSON (request_id,
/// exec_id, …) below the publish-reject ceiling.
///
/// It is defined here, next to the bucket constant, rather than in the
/// agent. That way the SPA and future operator tooling can quote the same
/// threshold when explaining why a result has no inline stdout.
pub const STDOUT_INLINE_THRESHOLD: usize = 1024 * 256;

/// Key in [`BUCKET_AGENT_CONFIG`] holding the broadcast target agent
/// version. Agents watch it and self-update when their running version
/// drifts from it.
pub const KEY_AGENT_TARGET_VERSION: &str = "target_version";
233
/// Key in [`BUCKET_AGENT_CONFIG`] for the fleet-wide default
/// `ConfigScope` JSON.
///
/// This is one of the three Sprint 6 layered-config scopes kept in that
/// bucket:
///
///   * `global`        — fleet-wide default `ConfigScope` (this key)
///   * `groups.<name>` — per-group override (partial `ConfigScope`),
///     see [`PREFIX_AGENT_CONFIG_GROUPS`]
///   * `pcs.<pc_id>`   — per-pc override (partial `ConfigScope`),
///     see [`PREFIX_AGENT_CONFIG_PCS`]
pub const KEY_AGENT_CONFIG_GLOBAL: &str = "global";

/// Key prefix of the per-group override rows (`groups.<name>`) in
/// [`BUCKET_AGENT_CONFIG`]. Filtering a `kv.keys()` walk on it lists just
/// the group scope.
pub const PREFIX_AGENT_CONFIG_GROUPS: &str = "groups.";

/// Key prefix of the per-pc override rows (`pcs.<pc_id>`) in
/// [`BUCKET_AGENT_CONFIG`]. Filtering a `kv.keys()` walk on it lists just
/// the pc scope.
pub const PREFIX_AGENT_CONFIG_PCS: &str = "pcs.";

/// Builds the [`BUCKET_AGENT_CONFIG`] key of `group`'s override row:
/// `groups.<group>`. Inverse of [`parse_agent_config_group_key`].
pub fn agent_config_group_key(group: &str) -> String {
    format!("{PREFIX_AGENT_CONFIG_GROUPS}{group}")
}

/// Builds the [`BUCKET_AGENT_CONFIG`] key of `pc_id`'s override row:
/// `pcs.<pc_id>`. Inverse of [`parse_agent_config_pc_key`].
pub fn agent_config_pc_key(pc_id: &str) -> String {
    format!("{PREFIX_AGENT_CONFIG_PCS}{pc_id}")
}

/// Inverse of [`agent_config_group_key`]. Returns the bare group name when
/// `key` starts with [`PREFIX_AGENT_CONFIG_GROUPS`]. Returns `None` for any
/// other key, such as `global` or a `pcs.` key.
///
/// The bare prefix `"groups."` yields `Some("")`.
pub fn parse_agent_config_group_key(key: &str) -> Option<&str> {
    key.strip_prefix(PREFIX_AGENT_CONFIG_GROUPS)
}

/// Inverse of [`agent_config_pc_key`]. Returns the bare pc id when `key`
/// starts with [`PREFIX_AGENT_CONFIG_PCS`]. Returns `None` for any other
/// key, such as `global` or a `groups.` key.
///
/// The bare prefix `"pcs."` yields `Some("")`.
pub fn parse_agent_config_pc_key(key: &str) -> Option<&str> {
    key.strip_prefix(PREFIX_AGENT_CONFIG_PCS)
}
258
259/// Inverse of [`agent_config_pc_key`].
260pub fn parse_agent_config_pc_key(key: &str) -> Option<&str> {
261    key.strip_prefix(PREFIX_AGENT_CONFIG_PCS)
262}
263
264pub const SCRIPT_STATUS_ACTIVE: &str = "ACTIVE";
265pub const SCRIPT_STATUS_REVOKED: &str = "REVOKED";
266
267pub const STREAM_INVENTORY: &str = "INVENTORY";
268pub const STREAM_RESULTS: &str = "RESULTS";
269pub const STREAM_EXEC: &str = "EXEC";
270pub const STREAM_EVENTS: &str = "EVENTS";
271pub const STREAM_AUDIT: &str = "AUDIT";
272
273/// JetStream stream retaining end-user notification history (SPEC
274/// §2.3.1 / Phase E). Catches every `notifications.{all|group.X|pc.Y}`
275/// publish the backend fans out, so a Client App that connects after
276/// a notification was sent can still fetch it via KLP
277/// `notifications.list`. 90-day window — long enough for "what did I
278/// miss while on leave" without unbounded growth. Unlike `EXEC`,
279/// retains all messages per subject (no `max_messages_per_subject`):
280/// each notification is its own history entry, not a latest-only state.
281pub const STREAM_NOTIFICATIONS: &str = "NOTIFICATIONS";
282
283/// JetStream stream backing the per-PC observability event pipeline
284/// (Issue #246). Distinct from [`STREAM_EVENTS`] (in-flight script
285/// lifecycle) — `STREAM_OBS_EVENTS` carries the timeline data the
286/// SPA's Events page consumes: sign-in/out, power on/off, sleep/
287/// resume, agent milestones, diagnostic bundle pointers. The agent
288/// publishes on `obs.<pc_id>` (see [`crate::subject::obs`]) and
289/// this stream catches everything matching [`crate::subject::OBS_FILTER`]
290/// so a backend that boots after the agent doesn't miss any
291/// already-emitted events.
292pub const STREAM_OBS_EVENTS: &str = "OBS_EVENTS";
293
#[cfg(test)]
mod tests {
    use super::*;

    /// Every KV bucket constant, shared by the charset and uniqueness checks.
    const KV_BUCKETS: &[&str] = &[
        BUCKET_SCRIPT_CURRENT,
        BUCKET_SCRIPT_STATUS,
        BUCKET_AGENTS_STATE,
        BUCKET_AGENT_CONFIG,
        BUCKET_AGENT_GROUPS,
        BUCKET_SCHEDULES,
        BUCKET_JOBS,
        BUCKET_JOBS_YAML,
        BUCKET_SCHEDULES_YAML,
        BUCKET_FLEET_CONFIG,
        BUCKET_NOTIFICATIONS_READ,
        BUCKET_SCHEDULER_DISPATCH,
    ];

    /// Every Object Store bucket constant.
    const OBJECT_BUCKETS: &[&str] = &[
        OBJECT_AGENT_RELEASES,
        OBJECT_APP_PACKAGES,
        OBJECT_SCRIPTS,
        OBJECT_RESULT_OUTPUT,
        OBJECT_COLLECTIONS,
    ];

    /// Every JetStream stream constant.
    const STREAMS: &[&str] = &[
        STREAM_INVENTORY,
        STREAM_RESULTS,
        STREAM_EXEC,
        STREAM_EVENTS,
        STREAM_AUDIT,
        STREAM_OBS_EVENTS,
        STREAM_NOTIFICATIONS,
    ];

    /// Fails, naming `what`, if `names` contains a duplicate entry.
    fn assert_unique(what: &str, names: &[&str]) {
        let mut deduped = names.to_vec();
        deduped.sort_unstable();
        deduped.dedup();
        assert_eq!(
            deduped.len(),
            names.len(),
            "{what} constants collide: {names:?}"
        );
    }

    /// NATS KV bucket names must be non-empty, domain-safe ASCII
    /// (a-z, A-Z, 0-9, _, -). Locking the constants down stops a future
    /// edit from introducing a `.`, which would break create_key_value
    /// silently on the broker side.
    #[test]
    fn bucket_names_are_domain_safe() {
        for name in KV_BUCKETS.iter().chain(OBJECT_BUCKETS) {
            assert!(!name.is_empty(), "bucket names must not be empty");
            assert!(
                !name.contains('.'),
                "bucket name {name:?} contains a dot, which NATS KV rejects"
            );
            assert!(
                name.chars()
                    .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-'),
                "bucket name {name:?} has non-domain-safe characters"
            );
        }
    }

    /// Two constants naming the same bucket would silently share storage.
    /// KV and Object Store buckets are checked separately, because they
    /// live in separate namespaces.
    #[test]
    fn bucket_names_are_unique() {
        assert_unique("KV bucket", KV_BUCKETS);
        assert_unique("object store bucket", OBJECT_BUCKETS);
    }

    #[test]
    fn stream_names_are_unique() {
        assert_unique("stream", STREAMS);
    }

    /// JetStream stream names may not contain whitespace, `.`, `*`, `>` or
    /// path separators. Keep them to the same conservative charset as the
    /// buckets.
    #[test]
    fn stream_names_are_valid() {
        for name in STREAMS {
            assert!(!name.is_empty(), "stream names must not be empty");
            assert!(
                name.chars()
                    .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-'),
                "stream name {name:?} has characters NATS may reject"
            );
        }
    }

    #[test]
    fn notifications_read_key_and_prefix_align() {
        let key = notifications_read_key("PC1234", "S-1-5-21-1001", "notif-9f3a");
        assert_eq!(key, "PC1234.S-1-5-21-1001.notif-9f3a");
        let prefix = notifications_read_prefix("PC1234", "S-1-5-21-1001");
        assert_eq!(prefix, "PC1234.S-1-5-21-1001.");
        // The list path selects a user's read set by this prefix — the
        // key for any of that user's notifications must carry it.
        assert!(key.starts_with(&prefix));
        // A different user's key must NOT match the prefix.
        let other = notifications_read_key("PC1234", "S-1-5-21-1002", "notif-9f3a");
        assert!(!other.starts_with(&prefix));
        // The trailing `.` keeps a SID from matching a longer SID it is a
        // textual prefix of.
        let shorter_sid_prefix = notifications_read_prefix("PC1234", "S-1-5-21-100");
        assert!(!key.starts_with(&shorter_sid_prefix));
    }

    #[test]
    fn script_status_strings() {
        assert_eq!(SCRIPT_STATUS_ACTIVE, "ACTIVE");
        assert_eq!(SCRIPT_STATUS_REVOKED, "REVOKED");
        assert_ne!(SCRIPT_STATUS_ACTIVE, SCRIPT_STATUS_REVOKED);
    }

    #[test]
    fn key_agent_target_version_constant() {
        assert_eq!(KEY_AGENT_TARGET_VERSION, "target_version");
    }

    #[test]
    fn agent_config_group_key_round_trips() {
        let k = agent_config_group_key("canary");
        assert_eq!(k, "groups.canary");
        assert_eq!(parse_agent_config_group_key(&k), Some("canary"));
    }

    #[test]
    fn agent_config_pc_key_round_trips() {
        let k = agent_config_pc_key("PC-01");
        assert_eq!(k, "pcs.PC-01");
        assert_eq!(parse_agent_config_pc_key(&k), Some("PC-01"));
    }

    #[test]
    fn dispatch_mark_keys_are_distinct_by_kind() {
        // The whole-target key for one schedule must never equal the
        // per-pc key for another — the `pc.` / `target.` prefixes keep
        // the two namespaces apart even when ids look alike.
        let per_pc = dispatch_mark_pc_key("collect-winlog-events", "PC-01");
        let target = dispatch_mark_target_key("collect-winlog-events");
        assert_eq!(per_pc, "pc.21.collect-winlog-events.5.PC-01");
        assert_eq!(target, "target.21.collect-winlog-events");
        assert_ne!(per_pc, target);
        // A schedule literally named "collect-winlog-events.PC-01"
        // still can't collide with the per-pc key above.
        assert_ne!(
            dispatch_mark_target_key("collect-winlog-events.PC-01"),
            per_pc,
        );
    }

    #[test]
    fn dispatch_mark_pc_key_has_no_dot_collision() {
        // Length-prefixing makes the encoding injective: a dotted
        // schedule_id can't borrow a leading segment from the pc_id (or
        // vice versa) to forge a colliding key. (CodeRabbit / claude #444.)
        assert_ne!(
            dispatch_mark_pc_key("a.b", "c"),
            dispatch_mark_pc_key("a", "b.c"),
        );
        assert_ne!(
            dispatch_mark_pc_key("x", "y.z"),
            dispatch_mark_pc_key("x.y", "z"),
        );
        // Same components, swapped roles — also distinct.
        assert_ne!(
            dispatch_mark_pc_key("foo", "bar"),
            dispatch_mark_pc_key("bar", "foo"),
        );
        // Pin the worked examples from the function's documentation.
        assert_eq!(dispatch_mark_pc_key("a.b", "c"), "pc.3.a.b.1.c");
        assert_eq!(dispatch_mark_pc_key("a", "b.c"), "pc.1.a.3.b.c");
    }

    #[test]
    fn agent_config_scope_keys_do_not_collide() {
        // Belt + braces: make sure no pc id starting with "groups." would
        // be misparsed (or vice versa). The prefixes are distinct because
        // they each end in `.` and the parent buckets disagree on what
        // comes after — pcs holds host names, groups holds membership
        // names — but locking the invariant in a test stops a future
        // rename from breaking it.
        assert_ne!(PREFIX_AGENT_CONFIG_GROUPS, PREFIX_AGENT_CONFIG_PCS);
        assert!(parse_agent_config_group_key("pcs.someone").is_none());
        assert!(parse_agent_config_pc_key("groups.someone").is_none());
        assert_eq!(parse_agent_config_group_key("global"), None);
        assert_eq!(parse_agent_config_pc_key("global"), None);
    }
}
447}