kanade_shared/manifest.rs
1use serde::{Deserialize, Serialize};
2
3use crate::wire::{RunAs, Shell, Staleness};
4
/// YAML job manifest (= registered "what to run", v0.18.0+).
///
/// Owns only script-intrinsic fields. **Who** (`target`), **how to
/// phase fanout** (`rollout`), and **when to stagger start**
/// (`jitter`) all moved to the Schedule / exec request side — same
/// script can now be fired against different targets / rollouts
/// without copying the script body.
///
/// `deny_unknown_fields` makes operators copy-pasting an older yaml
/// that still has `target:` / `rollout:` see a clear parse error at
/// `kanade job create` time instead of mysteriously losing it.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct Manifest {
    /// Job identifier. Required. The inventory projector described
    /// on `inventory` keys its rows by `(pc_id, manifest.id)`.
    pub id: String,
    /// Manifest version string. Required; stored as free-form text —
    /// [`Manifest::validate`] applies no format check to it.
    pub version: String,
    /// Optional free-text description; `None` when the key is absent.
    #[serde(default)]
    pub description: Option<String>,
    /// What to run and how: shell, script source, timeout, run-as
    /// and working directory. See [`Execute`].
    pub execute: Execute,
    /// Approval flag; `false` when the key is absent.
    /// NOTE(review): the enforcement point is not visible in this
    /// file — presumably the backend gates execution on it; confirm.
    #[serde(default)]
    pub require_approval: bool,
    /// Opt-in marker that this job produces a JSON inventory fact
    /// payload on stdout. When present, the backend's results
    /// projector parses `ExecResult.stdout` as JSON and upserts an
    /// `inventory_facts` row keyed by `(pc_id, manifest.id)`. The
    /// `display` sub-config drives the SPA's Inventory page render.
    #[serde(default)]
    pub inventory: Option<InventoryHint>,
    /// Issue #246: opt-in marker that this job emits per-line
    /// observability events on stdout (one JSON `ObsEvent` per
    /// newline). When present, the agent — after the script exits
    /// successfully — parses each non-empty stdout line as an
    /// `ObsEvent`, publishes it on `obs.<pc_id>` via the
    /// `obs_outbox`, and (intentionally) **omits the stdout from
    /// the `ExecResult`** so the timeline data doesn't double up
    /// in `execution_results.stdout` (which would multiply rows
    /// by ~50/day/PC of noise).
    ///
    /// Distinct from `inventory:` (single JSON object → projector
    /// upsert) — events are append-only timeline points consumed
    /// by the dedicated `obs_events` table.
    #[serde(default)]
    pub emit: Option<EmitConfig>,
    /// v0.26: Layer 2 staleness policy (SPEC.md §2.6.2). Controls
    /// what the agent does at fire time when it can't verify the
    /// `script_current` / `script_status` KV values are fresh —
    /// especially relevant for `runs_on: agent` schedules where
    /// the agent may fire from cache while offline. Defaults to
    /// `Staleness::Cached` (silently use cached values), which
    /// matches every pre-v0.26 Manifest.
    #[serde(default)]
    pub staleness: Staleness,
}
58
/// "Who + how + when-to-stagger" — the fanout-plan side of an exec.
/// Used both as the POST `/api/exec/{job_id}` body and as the embedded
/// `target` / `rollout` / `jitter` slot on [`Schedule`]. Centralising
/// here keeps the validation + serialisation logic in one place.
///
/// Every field is optional on the wire; `FanoutPlan::default()` is an
/// empty target with no rollout, jitter or deadline.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default)]
pub struct FanoutPlan {
    /// Which PCs the exec addresses. Falls back to an empty
    /// [`Target`] (nothing selected, `is_specified()` is `false`)
    /// when the key is absent.
    #[serde(default)]
    pub target: Target,
    /// Optional wave rollout — when present, the backend publishes
    /// each wave's group subject on its own delay schedule instead
    /// of fanning out the `target` block in one go. `target` then
    /// only labels the deploy for the audit log.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub rollout: Option<Rollout>,
    /// Optional humantime jitter; agent uses it to randomise
    /// execution start. Lives here (not on the script) so different
    /// schedules / ad-hoc fires of the same job can pick different
    /// stagger windows.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub jitter: Option<String>,
    /// Absolute time the scheduler stamps on each emitted Command
    /// when this exec was driven by a [`Schedule`] with
    /// `starting_deadline`. Agents receiving a Command after this
    /// instant publish a synthetic skipped-result instead of
    /// running the script. `None` (default) = no deadline / catch
    /// up whenever delivered. Operators don't usually set this
    /// directly — the scheduler computes it from `tick_at +
    /// starting_deadline`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deadline_at: Option<chrono::DateTime<chrono::Utc>>,
}
90
/// Manifest sub-section: how the SPA should render the inventory
/// facts this job produces. Each field name (`field`) is a top-level
/// key in the stdout JSON, e.g. `hostname`, `ram_gb`.
///
/// Two render modes:
///   * `display` — vertical "field / value" per PC, used by the
///     `/inventory?pc=<id>` detail view. ALL columns the operator
///     wants visible on the detail page.
///   * `summary` — horizontal table across the fleet (row = PC,
///     column = field) on `/inventory`. Optional; when omitted the
///     SPA falls back to `display`, but operators usually want a
///     trimmer "hostname / OS / CPU / RAM" set for the fleet view.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct InventoryHint {
    /// Detail-view columns, in order. Required — this is the only
    /// field of the block without a serde default.
    pub display: Vec<DisplayField>,
    /// Optional fleet-list columns (row = PC). Defaults to `display`
    /// when omitted, but operators usually pick a 3-5 column subset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub summary: Option<Vec<DisplayField>>,
    /// v0.31 / #40: payload arrays that should be exploded into
    /// per-element rows of a derived SQLite table. Lets operators
    /// answer cross-PC questions ("which PCs still have Chrome <
    /// 120?", "C: >90% full") with normal SQL filters + indexes
    /// instead of grepping JSON. The projector creates the derived
    /// table on register and replaces this PC's rows on each result
    /// (DELETE WHERE pc_id=? AND job_id=? + bulk INSERT). See
    /// [`ExplodeSpec`] for the per-spec schema.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub explode: Option<Vec<ExplodeSpec>>,
    /// v0.35 / #93: top-level scalar fields whose changes the
    /// projector logs to `inventory_history` (one event per
    /// changed field per scan). Pairs with `explode[].track_history`
    /// — that covers array elements; this covers single-valued
    /// fields like `ram_bytes` / `os_version` / `cpu_model` /
    /// `os_build` that operators want to track for "did the RAM
    /// get upgraded?" / "when did Win 11 land on this PC?" /
    /// "BIOS / firmware bumped?" questions. Field name = `field_path`
    /// in the history row, `identity_json` is NULL, `before_json`
    /// / `after_json` each carry `{"value": <prior or new value>}`.
    /// First-ever observation of a scalar (no prior facts row)
    /// emits `added`; subsequent value changes emit `changed`. No
    /// `removed` events — a scalar disappearing from the payload
    /// is rare and the operator can still see the last value via
    /// the `before_json` of the most recent change.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub history_scalars: Option<Vec<String>>,
}
139
/// Issue #246 — `emit:` manifest block for jobs whose stdout is
/// NDJSON observability events (one `ObsEvent` per line). Parallel
/// to `inventory:` but for the append-only timeline pipeline; see
/// `Manifest::emit` for the full contract.
///
/// `deny_unknown_fields` turns a mistyped key inside the block into
/// a parse error rather than a silently ignored setting.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct EmitConfig {
    /// What kind of payload the agent should expect on stdout. Only
    /// `events` is defined today (parses each non-empty line as
    /// `ObsEvent` and publishes on `obs.<pc_id>`); future variants
    /// (e.g. metrics streams, structured trace events) plug in here.
    ///
    /// Required. Appears under the key `type` on the wire (`type` is
    /// a Rust keyword, hence the `kind` field name plus rename).
    #[serde(rename = "type")]
    pub kind: EmitKind,
    /// Operator hint for where the script keeps its own state — the
    /// watermark file the PowerShell / sh body reads + writes
    /// between runs so it only emits NEW events since the last
    /// poll. The agent doesn't read this; it's documentation that
    /// the SPA (and `kanade job edit`) can surface to operators
    /// reviewing the manifest. Optional; the script is allowed to
    /// keep state anywhere (registry, env, etc.) — the field's
    /// presence makes the convention discoverable.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub watermark_path: Option<String>,
}
164
/// `emit.type` enum. Lowercase serde so manifests read
/// `type: events` rather than `Events`.
///
/// Single-variant today; any other value for `type` fails to parse.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum EmitKind {
    /// Per-line `ObsEvent` JSON. Agent parses + publishes on
    /// `obs.<pc_id>`, drops the stdout from the resulting
    /// `ExecResult`.
    Events,
}
175
/// v0.31 / #40: declarative "flatten this JSON array into a real
/// SQLite table" spec on an inventory manifest. The projector
/// creates the table on first registration (CREATE TABLE IF NOT
/// EXISTS + indexes) and writes a row per element of
/// `payload[field]` on every result, scoped by (pc_id, job_id) so
/// each PC's rows replace cleanly without a per-PC schema.
///
/// `field`, `table`, `primary_key` and `columns` are required;
/// `track_history` is the only key with a default.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct ExplodeSpec {
    /// JSON array key under the payload to explode. E.g. `"apps"`
    /// for `payload: { apps: [{...}, {...}] }`.
    pub field: String,
    /// Derived SQLite table name. Operators choose this — pick
    /// something namespaced + stable (`inventory_sw_apps`, not
    /// `apps`) so multiple inventory manifests don't collide on a
    /// generic name.
    pub table: String,
    /// Element-level fields that uniquely identify a row inside one
    /// PC's payload. The full PK is `(pc_id, job_id) + these
    /// columns`. Required — operators must think about uniqueness
    /// (e.g. `["name", "source"]` for installed apps because the
    /// same name appears in multiple uninstall hives).
    ///
    /// v0.31 / #41: same tuple drives history identity. When
    /// `track_history` is on, the projector serialises these
    /// fields' values into `inventory_history.identity_json` for
    /// every change event, so queries like "every PC that ever
    /// installed Chrome (any source)" filter on identity_json
    /// content without a per-manifest schema.
    pub primary_key: Vec<String>,
    /// Per-element fields that become columns in the derived table.
    /// See [`ExplodeColumn`] for the per-column options.
    pub columns: Vec<ExplodeColumn>,
    /// v0.31 / #41: when true (default false), the projector
    /// diffs each PC's incoming payload against the prior rows
    /// for the same (pc_id, job_id) BEFORE the DELETE-then-INSERT
    /// replace, and writes added / removed / changed events into
    /// `inventory_history`. Lets operators answer time-dimension
    /// questions ("when did Chrome 120 first appear on PC X?",
    /// "what's the Win 11 23H2 rollout curve") without storing
    /// per-scan snapshots. Off by default so operators opt in
    /// per-spec — history has a real storage cost on long-lived
    /// deployments (mitigated by the 90-day default retention
    /// sweeper, see `cleanup` module).
    #[serde(default)]
    pub track_history: bool,
}
221
/// One column in an [`ExplodeSpec`]'s derived table.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct ExplodeColumn {
    /// JSON key under each array element. Becomes the column name
    /// in the derived SQLite table — we don't rename.
    pub field: String,
    /// SQLite affinity: `"text"` (default), `"integer"`, `"real"`.
    /// Storage maps directly via `sqlx::query.bind(...)`; type
    /// mismatches at INSERT-time fail loudly rather than silently
    /// dropping the row.
    ///
    /// Written as the key `type` on the wire and left out of the
    /// serialised form when `None`. Held as a plain string here, so
    /// this type does not itself reject an unrecognised affinity.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[serde(rename = "type")]
    pub kind: Option<String>,
    /// When true, the projector creates a `CREATE INDEX` on this
    /// column at table-creation time. Boost for the common-filter
    /// columns (`name`, `version`) — operators mark them
    /// explicitly, the projector won't guess.
    #[serde(default)]
    pub index: bool,
}
242
/// One rendered column of an inventory view: which payload key to
/// read, the header to show, and an optional render hint. Used by
/// [`InventoryHint::display`] / [`InventoryHint::summary`] and,
/// recursively, for the nested columns of a `"table"` field.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct DisplayField {
    /// Top-level key in the stdout JSON.
    pub field: String,
    /// Human-readable column header.
    pub label: String,
    /// Optional render hint — `"number"`, `"bytes"`, `"timestamp"`,
    /// or `"table"` (#39). Defaults to plain text rendering on the
    /// SPA side. `"table"` expects the field's value to be a JSON
    /// array of objects and renders a nested sub-table on the
    /// per-PC detail page using `columns` as the schema; the fleet
    /// summary view falls back to showing the row count for
    /// `"table"` cells so the wide list stays compact.
    ///
    /// Written as the key `type` on the wire; omitted from the
    /// serialised form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[serde(rename = "type")]
    pub kind: Option<String>,
    /// v0.30 / #39: when `kind == "table"`, the SPA renders the
    /// field's value (an array of objects like
    /// `disks: [{ device_id, size_bytes, ... }]`) as a nested
    /// sub-table using these columns. Each column is itself a
    /// `DisplayField`, so the nested cells reuse the same render
    /// hints (`bytes`, `number`, `timestamp`) — no parallel format
    /// pipeline. Ignored for any other `kind`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub columns: Option<Vec<DisplayField>>,
}
269
/// Wave rollout plan carried in [`FanoutPlan::rollout`]: an ordered
/// list of waves, each naming a group and the delay at which it is
/// published.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct Rollout {
    /// Rollout strategy; falls back to [`RolloutStrategy::Wave`]
    /// when the key is absent.
    #[serde(default)]
    pub strategy: RolloutStrategy,
    /// The waves themselves. Required (no serde default).
    pub waves: Vec<Wave>,
}
276
/// Strategy selector for a [`Rollout`]. Lowercase on the wire
/// (`strategy: wave`). `Wave` is the only variant and the default.
#[derive(
    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
)]
#[serde(rename_all = "lowercase")]
pub enum RolloutStrategy {
    /// Staged rollout, one [`Wave`] at a time.
    #[default]
    Wave,
}
285
/// One step of a [`Rollout`]: a group and the delay after which it
/// is published. Both fields are required.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct Wave {
    /// Name of the group this wave addresses.
    pub group: String,
    /// humantime delay measured from the deploy's publish time. wave[0]
    /// typically has "0s"; subsequent waves use minutes / hours.
    pub delay: String,
}
293
/// Selector for the PCs an exec addresses. Every field is optional
/// on the wire; the default value selects nothing, which
/// [`Target::is_specified`] reports as `false`.
///
/// NOTE(review): how `all`, `groups` and `pcs` combine when more
/// than one is set is decided by the consumer and is not visible in
/// this file.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default)]
pub struct Target {
    /// Group names to address. Empty when the key is absent.
    #[serde(default)]
    pub groups: Vec<String>,
    /// Individual PC ids to address. Empty when the key is absent.
    #[serde(default)]
    pub pcs: Vec<String>,
    /// Fleet-wide flag (`target: { all: true }`); `false` when the
    /// key is absent.
    #[serde(default)]
    pub all: bool,
}
303
304impl Target {
305 /// At least one of all / groups / pcs is set.
306 pub fn is_specified(&self) -> bool {
307 self.all || !self.groups.is_empty() || !self.pcs.is_empty()
308 }
309}
310
/// The `execute:` section of a [`Manifest`]: which shell runs the
/// script, where the script body comes from (`script` /
/// `script_file` / `script_object`), and the timeout, run-as and
/// working-directory settings.
///
/// `deny_unknown_fields` makes a mistyped key (e.g.
/// `script_objectt:`) a parse error instead of a silently ignored
/// setting.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct Execute {
    /// Shell that runs the script. Required; see [`ExecuteShell`]
    /// for the accepted values.
    pub shell: ExecuteShell,
    /// Inline script body. Mutually exclusive with [`script_file`]
    /// and [`script_object`]; exactly one of the three must be set
    /// (enforced by [`Execute::validate_script_source`] at the
    /// write-side parse boundaries — `kanade job create` and
    /// `POST /api/jobs`).
    ///
    /// Empty string is treated as **unset** so operators can swap
    /// to a `script_file:` / `script_object:` alternative just by
    /// commenting out the body, without having to also drop the
    /// `script:` key entirely.
    ///
    /// [`script_file`]: Self::script_file
    /// [`script_object`]: Self::script_object
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub script: Option<String>,
    /// Repo-local file path resolved by the operator-side CLI at
    /// `kanade job create` time. The CLI reads the file, slots its
    /// contents into `script`, and clears this field before
    /// POSTing — so the backend / agents never see `script_file`
    /// in stored manifests. SPEC §2.4.1.
    ///
    /// Resolver lands in a follow-up PR
    /// (yukimemi/kanade#210); today this field passes parse-time
    /// validation but the operator-side CLI bails with "not yet
    /// implemented" until the resolver ships, so manifests that
    /// reach the backend with `script_file` set are treated as a
    /// schema-bug.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub script_file: Option<String>,
    /// Object Store reference (`<name>/<version>`) into the
    /// `scripts` bucket (`OBJECT_SCRIPTS`). Agents fetch the body
    /// at Execute time via `/api/script-objects/{name}/{version}`
    /// and cache it locally. SPEC §2.4.1.
    ///
    /// Resolver lands in the same follow-up PR as `script_file`;
    /// today this field passes parse-time validation but the
    /// backend / agent exec paths bail with "not yet implemented"
    /// when they see it.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub script_object: Option<String>,
    /// humantime duration string (e.g. "30s", "10m"). Script-intrinsic
    /// — represents how long this script reasonably takes to run.
    /// Required, and kept as the raw string: nothing in this type
    /// parses or range-checks it.
    pub timeout: String,
    /// Token + session combination the agent uses to launch the
    /// script (v0.21). Default = [`RunAs::System`] (Session 0,
    /// LocalSystem privileges, no GUI) — matches pre-v0.21 behavior.
    #[serde(default)]
    pub run_as: RunAs,
    /// Working directory for the spawned child (v0.21.1). When
    /// unset, the child inherits the agent's cwd — on Windows that
    /// means `%SystemRoot%\System32` for the prod service, which is
    /// almost never what operators actually want. Use an absolute
    /// path; relative paths are passed through to the OS verbatim.
    /// `%PROGRAMDATA%` works for `run_as: system`; for `run_as: user`
    /// you'd want `%USERPROFILE%` (but expansion happens in the
    /// shell, so write `$env:USERPROFILE` for PowerShell, or set
    /// it via teravars before `kanade job create`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
}
375
376impl Execute {
377 /// Treat an empty `script:` body as "intentionally unset". Operators
378 /// commenting out a block-scalar tend to leave the key behind, and
379 /// failing the validator on `script: ""` would surprise them.
380 fn has_inline_script(&self) -> bool {
381 matches!(&self.script, Some(s) if !s.is_empty())
382 }
383
384 /// Enforce that exactly one of `script` / `script_file` /
385 /// `script_object` is set. Called at the write-side parse
386 /// boundaries (CLI `kanade job create` + backend
387 /// `POST /api/jobs`) so ambiguous YAML is rejected before it
388 /// reaches the JOBS KV. Read paths (projector, agent
389 /// scheduler, list endpoints) skip this check — they only ever
390 /// see what the write path already validated.
391 pub fn validate_script_source(&self) -> Result<(), String> {
392 let inline = self.has_inline_script();
393 let file = self.script_file.is_some();
394 let obj = self.script_object.is_some();
395 let set = [inline, file, obj].into_iter().filter(|b| *b).count();
396 match set {
397 1 => Ok(()),
398 0 => Err("execute: one of `script`, `script_file`, `script_object` must be set".into()),
399 _ => Err(format!(
400 "execute: only one of `script` / `script_file` / `script_object` may be set \
401 (got script={inline}, script_file={file}, script_object={obj})"
402 )),
403 }
404 }
405}
406
407impl Manifest {
408 /// Cross-field semantic checks that don't fit into pure serde
409 /// derive. Currently delegates to
410 /// [`Execute::validate_script_source`] — see that method's
411 /// docs for the rationale on which call sites should run this.
412 pub fn validate(&self) -> Result<(), String> {
413 self.execute.validate_script_source()?;
414 Ok(())
415 }
416}
417
/// Shell selected by `execute.shell` in a manifest. Lowercase on the
/// wire (`shell: powershell` / `shell: cmd`); any other value fails
/// to parse. Converts into the wire-level [`Shell`] via `From`.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ExecuteShell {
    /// Run the script with PowerShell.
    Powershell,
    /// Run the script with `cmd`.
    Cmd,
}
424
425impl From<ExecuteShell> for Shell {
426 fn from(s: ExecuteShell) -> Self {
427 match s {
428 ExecuteShell::Powershell => Shell::Powershell,
429 ExecuteShell::Cmd => Shell::Cmd,
430 }
431 }
432}
433
434#[cfg(test)]
435mod tests {
436 use super::*;
437
#[test]
fn target_is_specified_requires_at_least_one_field() {
    // A default target selects nothing.
    assert!(!Target::default().is_specified());

    // Any one selector on its own is enough.
    let everyone = Target {
        all: true,
        ..Target::default()
    };
    let by_group = Target {
        groups: vec!["canary".into()],
        ..Target::default()
    };
    let by_pc = Target {
        pcs: vec!["pc-01".into()],
        ..Target::default()
    };
    for candidate in [everyone, by_group, by_pc] {
        assert!(candidate.is_specified());
    }
}
461
#[test]
fn manifest_deserialises_minimal_yaml() {
    // Same shape as jobs/echo-test.yaml. Since v0.18 the manifest
    // carries no target / rollout / jitter; those belong to the
    // schedule / exec request.
    let src = r#"
id: echo-test
version: 0.0.1
execute:
 shell: powershell
 script: "echo 'kanade'"
 timeout: 30s
"#;
    let manifest: Manifest = serde_yaml::from_str(src).expect("parse");

    // Top-level fields.
    assert_eq!(manifest.id, "echo-test");
    assert_eq!(manifest.version, "0.0.1");

    // Execute block: inline body only, neither alternative source.
    let exec = &manifest.execute;
    assert!(matches!(exec.shell, ExecuteShell::Powershell));
    assert_eq!(
        exec.script.as_deref().map(str::trim),
        Some("echo 'kanade'")
    );
    assert!(exec.script_file.is_none());
    assert!(exec.script_object.is_none());
    assert_eq!(exec.timeout, "30s");

    // Omitted flag falls back to its default.
    assert!(!manifest.require_approval);

    manifest
        .validate()
        .expect("inline-script manifest passes validation");
}
489
490 fn execute_with(
491 script: Option<&str>,
492 script_file: Option<&str>,
493 script_object: Option<&str>,
494 ) -> Execute {
495 Execute {
496 shell: ExecuteShell::Powershell,
497 script: script.map(str::to_owned),
498 script_file: script_file.map(str::to_owned),
499 script_object: script_object.map(str::to_owned),
500 timeout: "30s".into(),
501 run_as: RunAs::default(),
502 cwd: None,
503 }
504 }
505
#[test]
fn validate_accepts_inline_script() {
    // Only the inline body is present.
    let verdict = execute_with(Some("echo hi"), None, None).validate_script_source();
    assert!(verdict.is_ok());
}
511
#[test]
fn validate_accepts_script_file_alone() {
    // Only the repo-local file path is present.
    let verdict = execute_with(None, Some("scripts/cleanup.ps1"), None).validate_script_source();
    assert!(verdict.is_ok());
}
517
#[test]
fn validate_accepts_script_object_alone() {
    // Only the Object Store reference is present.
    let verdict = execute_with(None, None, Some("cleanup/1.0.0")).validate_script_source();
    assert!(verdict.is_ok());
}
523
#[test]
fn validate_treats_empty_inline_script_as_unset() {
    // An operator who comments out the block-scalar body but keeps
    // the `script:` key ends up with `script: ""` next to
    // `script_object`. That combination must validate.
    let verdict = execute_with(Some(""), None, Some("cleanup/1.0.0")).validate_script_source();
    assert!(verdict.is_ok());
}
532
#[test]
fn validate_rejects_zero_sources() {
    // No source of any kind.
    let err = execute_with(None, None, None)
        .validate_script_source()
        .unwrap_err();
    assert!(err.contains("must be set"), "got: {err}");
}
539
#[test]
fn validate_rejects_empty_inline_only() {
    // An empty inline body counts as unset, so nothing is set here.
    let err = execute_with(Some(""), None, None)
        .validate_script_source()
        .unwrap_err();
    assert!(err.contains("must be set"), "got: {err}");
}
546
#[test]
fn validate_rejects_inline_plus_file() {
    // Inline body and file path together are ambiguous.
    let err = execute_with(Some("echo hi"), Some("scripts/cleanup.ps1"), None)
        .validate_script_source()
        .unwrap_err();
    assert!(err.contains("only one of"), "got: {err}");
}
553
#[test]
fn validate_rejects_inline_plus_object() {
    // Inline body and Object Store reference together are ambiguous.
    let err = execute_with(Some("echo hi"), None, Some("cleanup/1.0.0"))
        .validate_script_source()
        .unwrap_err();
    assert!(err.contains("only one of"), "got: {err}");
}
560
#[test]
fn validate_rejects_file_plus_object() {
    // File path and Object Store reference together are ambiguous.
    let err = execute_with(None, Some("scripts/cleanup.ps1"), Some("cleanup/1.0.0"))
        .validate_script_source()
        .unwrap_err();
    assert!(err.contains("only one of"), "got: {err}");
}
567
#[test]
fn validate_rejects_all_three() {
    // Every source populated at once.
    let overloaded = execute_with(
        Some("echo hi"),
        Some("scripts/cleanup.ps1"),
        Some("cleanup/1.0.0"),
    );
    let err = overloaded.validate_script_source().unwrap_err();
    assert!(err.contains("only one of"), "got: {err}");
}
578
#[test]
fn manifest_deserialises_script_object_yaml() {
    // Shape of the SPEC §2.4.1 example, using the Object Store
    // reference rather than an inline body.
    let src = r#"
id: cleanup-disk-temp
version: 1.0.1
execute:
 shell: powershell
 script_object: cleanup-disk-temp/1.0.1
 timeout: 600s
"#;
    let manifest: Manifest = serde_yaml::from_str(src).expect("parse");

    let exec = &manifest.execute;
    assert_eq!(
        exec.script_object.as_deref(),
        Some("cleanup-disk-temp/1.0.1")
    );
    assert!(exec.script.is_none());

    manifest
        .validate()
        .expect("script_object-only manifest passes validation");
}
600
#[test]
fn manifest_rejects_typo_in_script_field_name() {
    // Execute is `deny_unknown_fields`, so a fat-fingered key such
    // as `script_objectt` is a parse error rather than quietly
    // degrading into "all three sources unset".
    let yaml = r#"
id: typo
version: 1.0.0
execute:
 shell: powershell
 script_objectt: oops
 timeout: 30s
"#;
    let r = serde_yaml::from_str::<Manifest>(yaml);
    assert!(r.is_err(), "expected parse error, got {r:?}");
}
617
#[test]
fn schedule_carries_target_and_rollout() {
    let src = r#"
id: hourly-cleanup-canary
when:
 per_pc: { every: 1h }
job_id: cleanup
enabled: true
target:
 groups: [canary, wave1]
jitter: 30s
rollout:
 strategy: wave
 waves:
 - { group: canary, delay: 0s }
 - { group: wave1, delay: 5s }
"#;
    let schedule: Schedule = serde_yaml::from_str(src).expect("parse");

    // Identity of the schedule and the job it fires.
    assert_eq!(schedule.id, "hourly-cleanup-canary");
    assert_eq!(schedule.job_id, "cleanup");

    // Fanout plan: target + jitter.
    let plan = schedule.plan;
    assert_eq!(plan.target.groups, vec!["canary", "wave1"]);
    assert_eq!(plan.jitter.as_deref(), Some("30s"));

    // Fanout plan: rollout waves, in declaration order.
    let rollout = plan.rollout.expect("rollout present");
    assert_eq!(rollout.waves.len(), 2);
    assert_eq!(rollout.waves[0].group, "canary");
    assert_eq!(rollout.waves[1].delay, "5s");
    assert_eq!(rollout.strategy, RolloutStrategy::Wave);
}
646
#[test]
fn schedule_minimal_target_all() {
    let src = r#"
id: kitting
when:
 per_pc: once
enabled: true
job_id: scheduled-echo
target: { all: true }
"#;
    let schedule: Schedule = serde_yaml::from_str(src).expect("parse");

    // Explicitly provided keys.
    assert_eq!(schedule.id, "kitting");
    assert_eq!(
        schedule.when,
        When::PerPc(PerPolicy::Once(OnceLiteral::Once))
    );
    assert!(schedule.enabled);
    assert_eq!(schedule.job_id, "scheduled-echo");
    assert!(schedule.plan.target.all);

    // Everything omitted stays at its default.
    assert!(schedule.plan.rollout.is_none());
    assert!(schedule.plan.jitter.is_none());
    assert!(schedule.active.is_empty());
}
667
#[test]
fn schedule_enabled_defaults_to_true() {
    // No `enabled:` key in the document at all.
    let src = r#"
id: x
when:
 per_pc: once
job_id: y
target: { all: true }
"#;
    let schedule = serde_yaml::from_str::<Schedule>(src).expect("parse");
    assert!(schedule.enabled);
}
680
681 // ---- `when` parsing (#418 Phase 1) ----
682
/// Test fixture: wraps `when_block` (already indented by the caller)
/// in an otherwise fixed schedule document — id `x`, job `y`,
/// `target: { all: true }` — so each `when` test supplies only the
/// part it cares about.
fn schedule_yaml_with(when_block: &str) -> String {
    let mut doc = String::from("\nid: x\nwhen:\n");
    doc.push_str(when_block);
    doc.push_str("\njob_id: y\ntarget: { all: true }\n");
    doc
}
694
#[test]
fn when_per_pc_every_parses_unquoted_humantime() {
    // `6h` starts with a digit but is not a number, so YAML reads it
    // as a string — the same convention the old `cooldown: 6h` used.
    // Quoting is unnecessary.
    let doc = schedule_yaml_with(" per_pc: { every: 6h }");
    let s: Schedule = serde_yaml::from_str(&doc).expect("parse");
    let expected = When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() }));
    assert_eq!(s.when, expected);
}
706
#[test]
fn when_per_target_every_parses() {
    let doc = schedule_yaml_with(" per_target: { every: 24h }");
    let s: Schedule = serde_yaml::from_str(&doc).expect("parse");
    let expected = When::PerTarget(PerPolicy::Every(EverySpec {
        every: "24h".into(),
    }));
    assert_eq!(s.when, expected);
}
718
#[test]
fn when_per_target_once_parses() {
    // This combination is a by-product of the shared PerPolicy shape,
    // and decide_fire already handles it ("any one pc succeeds → skip
    // the target forever"). It is therefore accepted, not rejected.
    let doc = schedule_yaml_with(" per_target: once");
    let s: Schedule = serde_yaml::from_str(&doc).expect("parse");
    let expected = When::PerTarget(PerPolicy::Once(OnceLiteral::Once));
    assert_eq!(s.when, expected);
}
728
#[test]
fn when_cron_escape_hatch_parses() {
    // The cron expression is quoted inside the YAML document.
    let doc = schedule_yaml_with(" cron: \"0 0 9 * * mon-fri\"");
    let s: Schedule = serde_yaml::from_str(&doc).expect("parse");
    let expected = When::Cron("0 0 9 * * mon-fri".into());
    assert_eq!(s.when, expected);
}
736
#[test]
fn when_rejects_bad_once_keyword() {
    // A misspelt `onec` has to fail the parse rather than be taken
    // as an arbitrary string — that is the whole reason OnceLiteral
    // is a single-variant enum.
    let doc = schedule_yaml_with(" per_pc: onec");
    let r = serde_yaml::from_str::<Schedule>(&doc);
    assert!(r.is_err(), "expected parse error, got {r:?}");
}
745
#[test]
fn when_rejects_unknown_key_in_every() {
    // EverySpec carries deny_unknown_fields, so an `evry:` typo is
    // rejected even though PerPolicy itself is untagged.
    let doc = schedule_yaml_with(" per_pc: { evry: 6h }");
    let r = serde_yaml::from_str::<Schedule>(&doc);
    assert!(r.is_err(), "expected parse error, got {r:?}");
}
754
#[test]
fn when_rejects_unknown_variant() {
    // `per_galaxy` is not a `when` key, so the parse must fail.
    let doc = schedule_yaml_with(" per_galaxy: once");
    let r = serde_yaml::from_str::<Schedule>(&doc);
    assert!(r.is_err(), "expected parse error, got {r:?}");
}
761
#[test]
fn when_rejects_old_top_level_cron_field() {
    // The pre-#418 layout had a top-level `cron:` and no `when:`.
    // It has to fail loudly (missing `when`); that failure is what
    // downgrades stale KV blobs to warn-skips after the upgrade.
    let yaml = r#"
id: x
cron: "* * * * * *"
job_id: y
target: { all: true }
"#;
    let r = serde_yaml::from_str::<Schedule>(yaml);
    assert!(r.is_err(), "expected parse error, got {r:?}");
}
776
#[test]
fn when_round_trips_json_and_yaml() {
    // The whole Schedule is round-tripped because it is the wire
    // unit for both stores (JSON catalog KV + YAML mirror). Doing so
    // also exercises the singleton_map field attribute, which keeps
    // serde_yaml on the map shape rather than `!per_pc` tags.
    let variants = [
        When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
        When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
        When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
        When::PerTarget(PerPolicy::Every(EverySpec {
            every: "24h".into(),
        })),
        When::Cron("0 0 9 * * mon-fri".into()),
    ];
    for when in variants {
        let s = schedule_with(when.clone(), RunsOn::Backend);

        // JSON leg.
        let json = serde_json::to_string(&s).expect("json serialise");
        let from_json: Schedule = serde_json::from_str(&json).expect("json deserialise");
        assert_eq!(from_json.when, when, "json round-trip for {when}");

        // YAML leg, which additionally must not fall back to tags.
        let yaml = serde_yaml::to_string(&s).expect("yaml serialise");
        assert!(
            !yaml.contains('!'),
            "yaml must use the map shape, not tags: {yaml}"
        );
        let from_yaml: Schedule = serde_yaml::from_str(&yaml).expect("yaml deserialise");
        assert_eq!(from_yaml.when, when, "yaml round-trip for {when}");
    }
}
807
#[test]
fn when_once_serialises_as_bare_keyword() {
    // Operators reading the YAML mirror must keep seeing the
    // ergonomic `per_pc: once`, never a one-variant map.
    let once = When::PerPc(PerPolicy::Once(OnceLiteral::Once));
    let json = serde_json::to_value(once).expect("serialise");
    let expected = serde_json::json!({ "per_pc": "once" });
    assert_eq!(json, expected);
}
816
#[test]
fn when_displays_operator_summary() {
    // One representative value per rendered form.
    let per_pc_once = When::PerPc(PerPolicy::Once(OnceLiteral::Once));
    let per_pc_every = When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() }));
    let per_target_every = When::PerTarget(PerPolicy::Every(EverySpec {
        every: "24h".into(),
    }));
    let cron = When::Cron("0 0 9 * * mon-fri".into());

    let cases = [
        (per_pc_once, "per_pc once"),
        (per_pc_every, "per_pc every 6h"),
        (per_target_every, "per_target every 24h"),
        (cron, "cron: 0 0 9 * * mon-fri"),
    ];
    for (when, expected) in cases {
        assert_eq!(when.to_string(), expected);
    }
}
842
843 // ---- lowering (#418: when → engine vocabulary) ----
844
845 fn schedule_with(when: When, runs_on: RunsOn) -> Schedule {
846 Schedule {
847 id: "x".into(),
848 when,
849 job_id: "y".into(),
850 plan: FanoutPlan::default(),
851 active: Active::default(),
852 starting_deadline: None,
853 runs_on,
854 enabled: true,
855 }
856 }
857
#[test]
fn lowering_matches_the_418_table() {
    let once = || PerPolicy::Once(OnceLiteral::Once);
    let every = |interval: &str| {
        PerPolicy::Every(EverySpec {
            every: interval.into(),
        })
    };
    // Columns: when, expected cron, expected mode, expected cooldown.
    let table: [(When, &str, ExecMode, Option<&str>); 5] = [
        (When::PerPc(once()), POLL_CRON, ExecMode::OncePerPc, None),
        (
            When::PerPc(every("6h")),
            POLL_CRON,
            ExecMode::OncePerPc,
            Some("6h"),
        ),
        (
            When::PerTarget(once()),
            POLL_CRON,
            ExecMode::OncePerTarget,
            None,
        ),
        (
            When::PerTarget(every("24h")),
            POLL_CRON,
            ExecMode::OncePerTarget,
            Some("24h"),
        ),
        (
            When::Cron("0 0 9 * * mon-fri".into()),
            "0 0 9 * * mon-fri",
            ExecMode::EveryTick,
            None,
        ),
    ];
    for (when, want_cron, want_mode, want_cooldown) in table {
        let lowered = schedule_with(when.clone(), RunsOn::Backend).lowered();
        assert_eq!(lowered.cron, want_cron, "cron for {when}");
        assert_eq!(lowered.mode, want_mode, "mode for {when}");
        assert_eq!(
            lowered.cooldown.as_deref(),
            want_cooldown,
            "cooldown for {when}"
        );
    }
}
891
#[test]
fn poll_cron_is_accepted_by_the_engine_parser() {
    // POLL_CRON is generated by the system, not typed by an
    // operator. Should the engine's parser ever reject it, every
    // reconcile schedule would die at register time, so feed it
    // through validate(), which uses the same croner configuration.
    let as_escape_hatch = When::Cron(POLL_CRON.into());
    let outcome = schedule_with(as_escape_hatch, RunsOn::Backend).validate();
    outcome.expect("POLL_CRON must be valid");
}
900
901 // ---- Schedule::validate() (#418 decision F) ----
902
#[test]
fn validate_accepts_reconcile_shapes() {
    // Every reconcile cadence is legal on the backend.
    let shapes = [
        When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
        When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
        When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
        When::PerTarget(PerPolicy::Every(EverySpec {
            every: "24h".into(),
        })),
    ];
    for when in shapes {
        let outcome = schedule_with(when.clone(), RunsOn::Backend).validate();
        if let Err(e) = outcome {
            panic!("{when} should validate: {e}");
        }
    }
}
918
#[test]
fn validate_accepts_per_pc_on_agent() {
    let hourly_per_pc = When::PerPc(PerPolicy::Every(EverySpec { every: "1h".into() }));
    let schedule = schedule_with(hourly_per_pc, RunsOn::Agent);
    schedule
        .validate()
        .expect("per_pc + agent is the offline-inventory shape");
}
928
#[test]
fn validate_rejects_per_target_on_agent() {
    // Both per_target policies are backend-only; the tag keeps the
    // failure message pointing at the policy that slipped through.
    let cases = [
        (
            PerPolicy::Every(EverySpec {
                every: "24h".into(),
            }),
            "got",
        ),
        // per_target: once is also backend-only.
        (PerPolicy::Once(OnceLiteral::Once), "got (once)"),
    ];
    for (policy, tag) in cases {
        let err = schedule_with(When::PerTarget(policy), RunsOn::Agent)
            .validate()
            .unwrap_err();
        assert!(err.contains("per_target"), "{tag}: {err}");
        assert!(err.contains("runs_on: agent"), "{tag}: {err}");
    }
}
952
#[test]
fn validate_rejects_bad_every_duration() {
    // `6x` is not a humantime duration.
    let garbage_interval = When::PerPc(PerPolicy::Every(EverySpec { every: "6x".into() }));
    let outcome = schedule_with(garbage_interval, RunsOn::Backend).validate();
    let err = outcome.unwrap_err();
    assert!(err.contains("when.every"), "got: {err}");
}
963
#[test]
fn validate_rejects_bad_jitter_and_starting_deadline() {
    // Start each case from a schedule that is otherwise valid, so
    // the only possible complaint is the field under test.
    let valid = || {
        schedule_with(
            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
            RunsOn::Backend,
        )
    };

    let mut bad_jitter = valid();
    bad_jitter.plan.jitter = Some("5x".into());
    let err = bad_jitter.validate().unwrap_err();
    assert!(err.contains("jitter"), "got: {err}");

    let mut bad_deadline = valid();
    bad_deadline.starting_deadline = Some("soon".into());
    let err = bad_deadline.validate().unwrap_err();
    assert!(err.contains("starting_deadline"), "got: {err}");
}
982
#[test]
fn validate_rejects_bad_cron() {
    // Free text, a 5-field expression (seconds are required), and
    // an out-of-range seconds field.
    let rejected = ["not a cron", "* * * * *", "99 * * * * *"];
    for bad in rejected {
        let schedule = schedule_with(When::Cron(bad.into()), RunsOn::Backend);
        let err = schedule.validate().unwrap_err();
        assert!(err.contains("when.cron"), "for '{bad}', got: {err}");
    }
}
992
#[test]
fn validate_accepts_engine_cron() {
    // The morning-greeting expression is the reason the escape
    // hatch exists at all, so it has to pass.
    let weekday_mornings = When::Cron("0 0 9 * * mon-fri".into());
    let schedule = schedule_with(weekday_mornings, RunsOn::Backend);
    schedule.validate().expect("week-day cron should validate");
}
1001
1002 fn schedule_with_active(from: Option<&str>, until: Option<&str>) -> Schedule {
1003 let mut s = schedule_with(
1004 When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1005 RunsOn::Backend,
1006 );
1007 s.active = Active {
1008 from: from.map(str::to_owned),
1009 until: until.map(str::to_owned),
1010 };
1011 s
1012 }
1013
#[test]
fn validate_accepts_active_window() {
    // One bound in each accepted notation: bare date and RFC3339.
    let schedule = schedule_with_active(Some("2026-07-01"), Some("2026-08-01T12:00:00+09:00"));
    let outcome = schedule.validate();
    outcome.expect("date + rfc3339 bounds should validate");
}
1020
#[test]
fn validate_rejects_unparseable_active_bound() {
    // Neither YYYY-MM-DD nor RFC3339.
    let schedule = schedule_with_active(Some("July 1st"), None);
    let err = schedule.validate().unwrap_err();
    assert!(err.contains("active"), "got: {err}");
}
1028
#[test]
fn validate_rejects_from_not_before_until() {
    // An inverted window, then an empty one (from == until): both
    // violate "strictly before".
    let windows = [("2026-08-01", "2026-07-01"), ("2026-07-01", "2026-07-01")];
    for (from, until) in windows {
        let schedule = schedule_with_active(Some(from), Some(until));
        let err = schedule.validate().unwrap_err();
        assert!(err.contains("strictly before"), "got: {err}");
    }
}
1041
1042 // ---- Active window semantics ----
1043
#[test]
fn active_window_is_half_open() {
    use chrono::TimeZone;
    let window = Active {
        from: Some("2026-07-01".into()),
        until: Some("2026-08-01".into()),
    };
    // Top-of-the-hour UTC instant.
    let utc = |year: i32, month: u32, day: u32, hour: u32| {
        chrono::Utc
            .with_ymd_and_hms(year, month, day, hour, 0, 0)
            .unwrap()
    };
    let probes = [
        (utc(2026, 6, 30, 23), false, "before from"),
        (utc(2026, 7, 1, 0), true, "at from (inclusive)"),
        (utc(2026, 7, 15, 12), true, "inside"),
        (utc(2026, 8, 1, 0), false, "at until (exclusive)"),
        (utc(2026, 8, 2, 0), false, "after until"),
    ];
    for (instant, inside, label) in probes {
        assert_eq!(window.contains(instant), inside, "{label}");
    }
}
1058
#[test]
fn active_empty_window_is_always_active() {
    // No bounds at all: any instant is inside.
    let unbounded = Active::default();
    let now = chrono::Utc::now();
    assert!(unbounded.contains(now));
}
1063
#[test]
fn active_rfc3339_bound_honours_offset() {
    use chrono::TimeZone;
    // 09:00 JST = 00:00 UTC, so the window opens at UTC midnight.
    let window = Active {
        from: Some("2026-07-01T09:00:00+09:00".into()),
        until: None,
    };
    let just_before = chrono::Utc
        .with_ymd_and_hms(2026, 6, 30, 23, 59, 0)
        .unwrap();
    let opening = chrono::Utc.with_ymd_and_hms(2026, 7, 1, 0, 0, 0).unwrap();
    assert!(!window.contains(just_before));
    assert!(window.contains(opening));
}
1081
#[test]
fn active_empty_is_skipped_when_serialising() {
    // schedule_with() leaves `active` at its default (no bounds).
    let schedule = schedule_with(
        When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
        RunsOn::Backend,
    );
    let wire = serde_json::to_value(&schedule).expect("serialise");
    let active_key = wire.get("active");
    assert!(
        active_key.is_none(),
        "empty active must not appear on the wire: {wire}"
    );
}
1094
#[test]
fn shipped_schedule_configs_parse_and_validate() {
    // Each YAML shipped under configs/schedules/ has to parse with
    // today's Schedule serde *and* survive validate(). #418 dropped
    // back-compat, so an example that drifts from the model is an
    // example that breaks at create time.
    let dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("../../configs/schedules");
    let yaml_paths: Vec<std::path::PathBuf> = std::fs::read_dir(&dir)
        .expect("read configs/schedules")
        .map(|entry| entry.expect("dir entry").path())
        .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("yaml"))
        .collect();
    for path in &yaml_paths {
        let body = std::fs::read_to_string(path).expect("read yaml");
        let schedule: Schedule = match serde_yaml::from_str(&body) {
            Ok(parsed) => parsed,
            Err(e) => panic!("{} failed to parse: {e}", path.display()),
        };
        if let Err(e) = schedule.validate() {
            panic!("{} failed validate(): {e}", path.display());
        }
    }
    // An empty directory would make the loop above vacuous.
    assert!(
        !yaml_paths.is_empty(),
        "no schedule YAMLs found in {}",
        dir.display()
    );
}
1117
1118 // ---- pre-existing enum wire formats (unchanged by #418) ----
1119
#[test]
fn exec_mode_serialises_snake_case() {
    let wire_names = [
        (ExecMode::EveryTick, "every_tick"),
        (ExecMode::OncePerPc, "once_per_pc"),
        (ExecMode::OncePerTarget, "once_per_target"),
    ];
    for (mode, expected) in wire_names {
        let as_json = serde_json::Value::String(expected.into());

        let encoded = serde_json::to_value(mode).expect("serialise");
        assert_eq!(encoded, as_json);

        let decoded: ExecMode = serde_json::from_value(as_json).expect("deserialise");
        assert_eq!(decoded, mode, "round-trip for {expected}");
    }
}
1134
#[test]
fn schedule_runs_on_defaults_to_backend() {
    // No `runs_on:` key in the document.
    let yaml = r#"
id: x
when:
  per_pc: once
job_id: y
target: { all: true }
"#;
    let parsed = serde_yaml::from_str::<Schedule>(yaml).expect("parse");
    assert_eq!(parsed.runs_on, RunsOn::Backend);
}
1147
#[test]
fn schedule_runs_on_agent_parses() {
    let yaml = r#"
id: offline-inv
when:
  per_pc: { every: 1h }
job_id: inventory-hw
target: { all: true }
runs_on: agent
"#;
    let parsed = serde_yaml::from_str::<Schedule>(yaml).expect("parse");
    assert_eq!(parsed.runs_on, RunsOn::Agent);
    // per_pc lowers to the once-per-pc engine mode.
    let lowered = parsed.lowered();
    assert_eq!(lowered.mode, ExecMode::OncePerPc);
}
1162
#[test]
fn runs_on_serialises_snake_case() {
    let wire_names = [(RunsOn::Backend, "backend"), (RunsOn::Agent, "agent")];
    for (location, expected) in wire_names {
        let as_json = serde_json::Value::String(expected.into());

        let encoded = serde_json::to_value(location).expect("serialise");
        assert_eq!(encoded, as_json);

        let decoded: RunsOn = serde_json::from_value(as_json).expect("deserialise");
        assert_eq!(decoded, location);
    }
}
1173
#[test]
fn execute_shell_into_wire_shell() {
    // Manifest-side shell → wire-side shell, variant for variant.
    let powershell: Shell = ExecuteShell::Powershell.into();
    let cmd: Shell = ExecuteShell::Cmd.into();
    assert_eq!(powershell, Shell::Powershell);
    assert_eq!(cmd, Shell::Cmd);
}
1179
#[test]
fn manifest_staleness_defaults_to_cached() {
    // No `staleness:` key in the document.
    let yaml = r#"
id: x
version: 1.0.0
execute:
  shell: powershell
  script: "echo"
  timeout: 1s
"#;
    let manifest = serde_yaml::from_str::<Manifest>(yaml).expect("parse");
    assert_eq!(manifest.staleness, Staleness::Cached);
}
1193
#[test]
fn manifest_strict_staleness_parses() {
    let yaml = r#"
id: urgent-patch
version: 2.5.1
execute:
  shell: powershell
  script: Install-Hotfix
  timeout: 5m
staleness:
  mode: strict
  max_cache_age: 0s
"#;
    let manifest = serde_yaml::from_str::<Manifest>(yaml).expect("parse");
    // Any variant other than Strict is a failure in its own right.
    let max_cache_age = match manifest.staleness {
        Staleness::Strict { max_cache_age } => max_cache_age,
        other => panic!("expected strict, got {other:?}"),
    };
    assert_eq!(max_cache_age, "0s");
}
1213
#[test]
fn manifest_unchecked_staleness_parses() {
    let yaml = r#"
id: legacy
version: 0.1.0
execute:
  shell: cmd
  script: "echo"
  timeout: 1s
staleness:
  mode: unchecked
"#;
    let manifest = serde_yaml::from_str::<Manifest>(yaml).expect("parse");
    assert_eq!(manifest.staleness, Staleness::Unchecked);
}
1229
#[test]
fn missing_required_field_errors() {
    // `id` is deliberately absent. Every key that IS present must
    // be a known Manifest field: Manifest is `deny_unknown_fields`,
    // so a stray key (this fixture used to carry a stale `target:`)
    // fails the parse by itself and would keep this test green even
    // if `id` silently became optional.
    let yaml = r#"
version: 1.0.0
execute:
  shell: powershell
  script: "echo"
  timeout: 1s
"#;
    let err = serde_yaml::from_str::<Manifest>(yaml)
        .expect_err("a manifest without `id` must not parse");
    // Pin the reason, not just the failure.
    let msg = err.to_string();
    assert!(
        msg.contains("missing field `id`"),
        "expected a missing-`id` error, got: {msg}"
    );
}
1244
#[test]
fn display_field_table_kind_round_trips_with_nested_columns() {
    // #39: a DisplayField with `type: table` + `columns:` must come
    // through serde intact so the SPA receives the nested schema
    // verbatim. The nested columns are DisplayFields themselves, so
    // they can carry `type: bytes` / `type: number` for cell
    // formatting.
    //
    // The fixture's indentation is load-bearing: `columns` has to
    // nest under the `disks` entry, and each column's keys under
    // its own `-` item.
    let yaml = r#"
id: inv-hw
version: 1.0.0
execute:
  shell: powershell
  script: "echo"
  timeout: 60s
inventory:
  display:
    - field: hostname
      label: Hostname
    - field: disks
      label: Disks
      type: table
      columns:
        - field: device_id
          label: Drive
        - field: size_bytes
          label: Size
          type: bytes
        - field: free_bytes
          label: Free
          type: bytes
        - field: file_system
          label: FS
"#;
    let manifest: Manifest = serde_yaml::from_str(yaml).expect("parse");
    let inventory = manifest.inventory.as_ref().expect("inventory hint");
    let disks = inventory
        .display
        .iter()
        .find(|row| row.field == "disks")
        .expect("disks display row");
    assert_eq!(disks.kind.as_deref(), Some("table"));

    let columns = disks.columns.as_ref().expect("table needs columns");
    assert_eq!(columns.len(), 4);
    // Spot-check one nested column, including its scalar type.
    assert_eq!(columns[1].field, "size_bytes");
    assert_eq!(columns[1].kind.as_deref(), Some("bytes"));
}
1291
#[test]
fn display_field_scalar_kind_keeps_columns_none() {
    // Defensive check: for a scalar type (`bytes` / `number` /
    // `timestamp`) `columns` has to remain None. The SPA treats its
    // presence as the "render a nested table" signal, so a serde
    // default must never conjure one up.
    let yaml = r#"
id: x
version: 1.0.0
execute:
  shell: powershell
  script: "echo"
  timeout: 5s
inventory:
  display:
    - { field: ram_bytes, label: RAM, type: bytes }
"#;
    let manifest = serde_yaml::from_str::<Manifest>(yaml).expect("parse");
    let inventory = manifest.inventory.as_ref().unwrap();
    let ram_row = &inventory.display[0];
    assert!(ram_row.columns.is_none());
}
1313}
1314
1315/// Periodic schedule (spec §2.4.3). v0.18.0 carries the fanout plan
1316/// (target + optional rollout + optional jitter) inline; the
1317/// referenced job (`job_id` → [`BUCKET_JOBS`]) supplies only the
1318/// script body. Two schedules of the same job can target different
1319/// groups on different cadences without copying the manifest.
1320///
1321/// #418 Phase 1: the cadence is the single [`When`] field. The old
1322/// `cron` × `mode` × `cooldown` × `auto_disable_when_done` quartet
1323/// is gone (no back-compat — pre-Phase-1 KV blobs fail to parse and
1324/// are warn-skipped; re-`schedule create` to upgrade them). The
1325/// engine underneath is unchanged: [`Schedule::lowered`] maps `when`
1326/// onto the same (cron, ExecMode, cooldown) trio the scheduler and
1327/// `decide_fire` always ran on.
1328#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
1329pub struct Schedule {
1330 pub id: String,
1331 /// When to fire — a reconcile cadence (`per_pc` / `per_target`)
1332 /// or the temporary raw-`cron` escape hatch. See [`When`].
1333 ///
1334 /// `singleton_map`: serde_yaml 0.9 renders externally-tagged
1335 /// enums as `!per_pc` YAML tags by default; this keeps the
1336 /// operator-facing map shape (`when: { per_pc: once }`). JSON
1337 /// output is identical either way, and the schemars schema
1338 /// (external tagging = oneOf of single-key objects) already
1339 /// matches the singleton-map wire shape.
1340 #[serde(with = "serde_yaml::with::singleton_map")]
1341 #[schemars(with = "When")]
1342 pub when: When,
1343 /// Key into [`crate::kv::BUCKET_JOBS`]. Must equal a registered
1344 /// Manifest's `id`.
1345 pub job_id: String,
1346 /// Who + how-to-phase + when-to-stagger. The Manifest doesn't
1347 /// carry these any more — same job + different fanout = different
1348 /// schedule.
1349 #[serde(flatten)]
1350 pub plan: FanoutPlan,
1351 /// Optional validity window. Outside `[from, until)` the
1352 /// schedule is dormant — still registered, still visible, but
1353 /// every tick is skipped (deleted ≠ dormant: a campaign that
1354 /// ended stays inspectable and can be re-armed by editing the
1355 /// window). Checked at tick time on both the backend scheduler
1356 /// and the agent's local scheduler.
1357 #[serde(default, skip_serializing_if = "Active::is_empty")]
1358 pub active: Active,
1359 /// v0.22: optional humantime window after a cron tick during
1360 /// which the Command is still considered "live". The scheduler
1361 /// computes `tick_at + starting_deadline` and stamps it onto
1362 /// each Command as `deadline_at`; agents skip Commands they
1363 /// receive after that absolute time. `None` (default) = no
1364 /// deadline, meaning a Command queued in the broker / stream
1365 /// during agent downtime runs whenever the agent reconnects —
1366 /// good for kitting / inventory / cleanup. Set this for
1367 /// time-of-day notifications, lunch reminders, etc., where
1368 /// "fire 3 hours late" would be wrong.
1369 #[serde(default, skip_serializing_if = "Option::is_none")]
1370 pub starting_deadline: Option<String>,
1371 /// v0.23: where does the cron tick happen? `Backend` (default,
1372 /// historical) = backend's scheduler fires Commands via NATS;
1373 /// agents passively receive. `Agent` = each targeted agent runs
1374 /// its own internal cron and fires locally, so the schedule
1375 /// keeps ticking even when the broker is unreachable (laptop on
1376 /// the train, broker maintenance window, full WAN outage). The
1377 /// two locations are mutually exclusive — when `Agent`, the
1378 /// backend scheduler stays out and just keeps the definition in
1379 /// KV for agents to read.
1380 #[serde(default)]
1381 pub runs_on: RunsOn,
1382 #[serde(default = "default_true")]
1383 pub enabled: bool,
1384}
1385
1386/// v0.23 — where the cron tick fires from.
1387#[derive(
1388 Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
1389)]
1390#[serde(rename_all = "snake_case")]
1391pub enum RunsOn {
1392 /// Backend's central scheduler ticks and publishes Commands to
1393 /// NATS. Historical default, what every pre-v0.23 schedule
1394 /// uses. Agent offline ⇒ Command queued in STREAM_EXEC; agent
1395 /// reconnects ⇒ catch-up via [`command_replay`](crate)
1396 /// (see kanade-agent's command_replay module).
1397 #[default]
1398 Backend,
1399 /// Each targeted agent runs the cron tick locally. Survives
1400 /// broker / WAN outages. Best for laptops / mobile devices that
1401 /// roam off the corporate network. Agent must be online for the
1402 /// initial schedule + job-catalog pull, but once cached the
1403 /// agent fires the script standalone.
1404 Agent,
1405}
1406
1407/// Per-pc/per-target dedup semantics for a [`Schedule`] (v0.19).
1408#[derive(
1409 Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
1410)]
1411#[serde(rename_all = "snake_case")]
1412pub enum ExecMode {
1413 /// Fire on every cron tick at the whole target. Historical
1414 /// (pre-v0.19) behavior; no dedup.
1415 #[default]
1416 EveryTick,
1417 /// Fire at each pc until that pc succeeds; then skip it until
1418 /// the optional cooldown elapses (or forever if no cooldown).
1419 /// Use for kitting / first-boot / per-pc compliance checks.
1420 OncePerPc,
1421 /// Fire at the whole target until **any** pc succeeds; then
1422 /// skip the whole target until the optional cooldown elapses
1423 /// (or forever if no cooldown). Use for "one delegate is
1424 /// enough" tasks like license check-in.
1425 OncePerTarget,
1426}
1427
1428/// #418 Phase 1 — the single "when does this fire" axis.
1429///
1430/// Replaces the old `cron` + `mode` + `cooldown` trio whose
1431/// interactions were implicit (cron doubled as both a real
1432/// time-of-day trigger and a reconcile poll period; contradictory
1433/// combinations silently no-opped). Two shapes:
1434///
1435/// * **reconcile** (`per_pc` / `per_target`) — desired-state: "each
1436/// pc (or one delegate) should have run this within `every`".
1437/// The poll period is system-generated ([`POLL_CRON`], every
1438/// minute) and no longer the operator's concern.
1439/// * **escape hatch** (`cron`) — verbatim 6-field cron, fires at
1440/// the whole target on every tick with no dedup (the old
1441/// `every_tick`). Temporary: exists only to keep time-of-day
1442/// schedules (morning-greeting) alive until the Phase 2
1443/// calendar form (`at` / `days` / `tz`) lands. New reconcile
1444/// work must NOT use it.
1445#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
1446#[serde(rename_all = "snake_case")]
1447pub enum When {
1448 /// Fire at each targeted pc: `once` (kitting — succeed once,
1449 /// skip forever, forever catching brand-new / re-imaged pcs)
1450 /// or `{ every: <humantime> }` (patrol — re-arm per pc after
1451 /// the interval).
1452 PerPc(PerPolicy),
1453 /// Fire until **any** one pc of the target succeeds, then skip
1454 /// the whole target (`once`) or re-arm after `every`. Needs
1455 /// fleet-wide completion data, so it is backend-only —
1456 /// `runs_on: agent` + `per_target` is rejected by
1457 /// [`Schedule::validate`].
1458 PerTarget(PerPolicy),
1459 /// Escape hatch: verbatim 6-field cron expression
1460 /// (`sec min hour day month day-of-week`), validated with the
1461 /// same parser `tokio-cron-scheduler` uses. Every tick fires
1462 /// the whole target — no dedup, no cooldown.
1463 Cron(String),
1464}
1465
1466/// `once` vs `{ every: <humantime> }` — shared by `per_pc` /
1467/// `per_target`. Untagged so the YAML stays the bare keyword or a
1468/// one-key map, nothing more ceremonial.
1469#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
1470#[serde(untagged)]
1471pub enum PerPolicy {
1472 /// The bare string `once`: succeed once, then skip permanently
1473 /// (cooldown = infinity).
1474 Once(OnceLiteral),
1475 /// Re-arm after the humantime interval, e.g. `{ every: 6h }`.
1476 Every(EverySpec),
1477}
1478
1479/// Single-variant enum so serde accepts exactly the string `once`
1480/// (a free-form `String` would swallow typos like `onec`).
1481#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
1482#[serde(rename_all = "snake_case")]
1483pub enum OnceLiteral {
1484 Once,
1485}
1486
1487/// `{ every: <humantime> }`. Standalone struct (not an inline
1488/// struct variant) so `deny_unknown_fields` still bites under the
1489/// untagged [`PerPolicy`] — `{ evry: 6h }` is a parse error, not a
1490/// silently-ignored key.
1491#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
1492#[serde(deny_unknown_fields)]
1493pub struct EverySpec {
1494 /// Humantime interval (`10m`, `6h`, `1d`...). Parsed lazily —
1495 /// [`Schedule::validate`] rejects garbage at create time.
1496 pub every: String,
1497}
1498
1499impl PerPolicy {
1500 /// The cooldown this policy lowers to: `once` = `None`
1501 /// (permanent skip), `every` = the interval.
1502 fn cooldown(&self) -> Option<String> {
1503 match self {
1504 PerPolicy::Once(_) => None,
1505 PerPolicy::Every(EverySpec { every }) => Some(every.clone()),
1506 }
1507 }
1508}
1509
1510impl std::fmt::Display for When {
1511 /// Operator-facing one-liner (`per_pc once` / `per_pc every 6h`
1512 /// / `cron: 0 0 9 * * mon-fri`) for log lines, audit payloads
1513 /// and the API's `ScheduleSummary`.
1514 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1515 let policy = |p: &PerPolicy| match p {
1516 PerPolicy::Once(_) => "once".to_string(),
1517 PerPolicy::Every(EverySpec { every }) => format!("every {every}"),
1518 };
1519 match self {
1520 When::PerPc(p) => write!(f, "per_pc {}", policy(p)),
1521 When::PerTarget(p) => write!(f, "per_target {}", policy(p)),
1522 When::Cron(expr) => write!(f, "cron: {expr}"),
1523 }
1524 }
1525}
1526
1527/// Optional validity window for a [`Schedule`] (#418 decision G).
1528/// Half-open `[from, until)`; either bound may be omitted. Bounds
1529/// are `YYYY-MM-DD` (= that day's 00:00 **UTC**) or full RFC3339
1530/// (use an offset for local-time precision). Kept as strings so the
1531/// JSON Schema the SPA editor consumes stays two plain string
1532/// fields, mirroring how `jitter` / `starting_deadline` are stored.
1533#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default, PartialEq, Eq)]
1534#[serde(deny_unknown_fields)]
1535pub struct Active {
1536 /// Dormant before this instant.
1537 #[serde(default, skip_serializing_if = "Option::is_none")]
1538 pub from: Option<String>,
1539 /// Dormant from this instant on (exclusive).
1540 #[serde(default, skip_serializing_if = "Option::is_none")]
1541 pub until: Option<String>,
1542}
1543
1544impl Active {
1545 /// `skip_serializing_if` helper — an empty window means "always
1546 /// active" and is omitted from the wire format entirely.
1547 pub fn is_empty(&self) -> bool {
1548 self.from.is_none() && self.until.is_none()
1549 }
1550
1551 /// Parse one bound: RFC3339 first, then bare `YYYY-MM-DD`
1552 /// (midnight UTC).
1553 pub fn parse_bound(s: &str) -> Result<chrono::DateTime<chrono::Utc>, String> {
1554 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
1555 return Ok(dt.with_timezone(&chrono::Utc));
1556 }
1557 if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1558 let midnight = d.and_hms_opt(0, 0, 0).expect("00:00:00 is always valid");
1559 return Ok(chrono::DateTime::from_naive_utc_and_offset(
1560 midnight,
1561 chrono::Utc,
1562 ));
1563 }
1564 Err(format!(
1565 "active: unparseable bound '{s}' (want YYYY-MM-DD or RFC3339)"
1566 ))
1567 }
1568
1569 /// Is `now` inside the window? Unparseable bounds are treated
1570 /// as absent here (fail-open) — [`Schedule::validate`] is the
1571 /// place that rejects them loudly; this runs on every tick and
1572 /// must never panic on a stale KV blob.
1573 pub fn contains(&self, now: chrono::DateTime<chrono::Utc>) -> bool {
1574 let bound = |s: &Option<String>| s.as_deref().and_then(|s| Self::parse_bound(s).ok());
1575 if bound(&self.from).is_some_and(|from| now < from) {
1576 return false;
1577 }
1578 if bound(&self.until).is_some_and(|until| now >= until) {
1579 return false;
1580 }
1581 true
1582 }
1583}
1584
/// Poll cadence that every reconcile-shaped `when` lowers to: a
/// 6-field cron firing at second 0 of each minute. It is generated
/// by the system and never written by operators. The real spacing
/// between runs is the `every` cooldown; this value only bounds how
/// quickly a due pc or target gets noticed (#418 decision B took
/// the poll period out of the operator's hands).
pub const POLL_CRON: &str = "0 * * * * *";
1591
1592/// What a [`When`] lowers to — the exact (cron, mode, cooldown)
1593/// trio the pre-#418 engine ran on. Keeping the engine vocabulary
1594/// unchanged is what lets Phase 1 swap the operator surface without
1595/// touching the tick / dedup machinery.
1596pub struct Lowered {
1597 /// 6-field cron handed to `tokio-cron-scheduler` — [`POLL_CRON`]
1598 /// for reconcile shapes, verbatim for the escape hatch.
1599 pub cron: String,
1600 /// Dedup semantics for `decide_fire`.
1601 pub mode: ExecMode,
1602 /// Humantime re-arm interval (`None` = succeed once, skip
1603 /// forever).
1604 pub cooldown: Option<String>,
1605}
1606
1607impl Schedule {
1608 /// Lower the operator-facing `when` onto the engine vocabulary.
1609 /// Single seam shared by the backend scheduler and the agent's
1610 /// local scheduler so the two can never drift.
1611 pub fn lowered(&self) -> Lowered {
1612 match &self.when {
1613 When::PerPc(p) => Lowered {
1614 cron: POLL_CRON.into(),
1615 mode: ExecMode::OncePerPc,
1616 cooldown: p.cooldown(),
1617 },
1618 When::PerTarget(p) => Lowered {
1619 cron: POLL_CRON.into(),
1620 mode: ExecMode::OncePerTarget,
1621 cooldown: p.cooldown(),
1622 },
1623 When::Cron(expr) => Lowered {
1624 cron: expr.clone(),
1625 mode: ExecMode::EveryTick,
1626 cooldown: None,
1627 },
1628 }
1629 }
1630
1631 /// Cross-field semantic checks that don't fit pure serde derive
1632 /// — the [`Manifest::validate`] counterpart (#418 decision F;
1633 /// pre-Phase-1 a broken schedule was accepted at create time
1634 /// and silently warn-skipped at tick time). Run at every create
1635 /// site: `kanade schedule create` (client-side) and
1636 /// `POST /api/schedules`. The job_id-exists check lives in the
1637 /// API handler instead — it needs the JOBS KV.
1638 pub fn validate(&self) -> Result<(), String> {
1639 if matches!(self.runs_on, RunsOn::Agent) && matches!(self.when, When::PerTarget(_)) {
1640 return Err(
1641 "when.per_target needs fleet-wide completion data and is backend-only; \
1642 it cannot be combined with runs_on: agent (each agent self-schedules, \
1643 so per-target dedup would be deduping across a target of 1)"
1644 .into(),
1645 );
1646 }
1647 if let Some(cd) = self.lowered().cooldown.as_deref() {
1648 humantime::parse_duration(cd)
1649 .map_err(|e| format!("when.every: invalid duration '{cd}': {e}"))?;
1650 }
1651 if let When::Cron(expr) = &self.when {
1652 // Same parser configuration tokio-cron-scheduler 0.15
1653 // uses internally (croner, seconds required, DOM-and-DOW
1654 // both honored) — create-time validation can never
1655 // accept what register() would reject.
1656 croner::parser::CronParser::builder()
1657 .seconds(croner::parser::Seconds::Required)
1658 .dom_and_dow(true)
1659 .build()
1660 .parse(expr)
1661 .map_err(|e| format!("when.cron: invalid 6-field cron '{expr}': {e}"))?;
1662 }
1663 // The other humantime strings on the schedule (claude #419
1664 // review): runtime degrades gracefully on both (bad jitter →
1665 // silent no-op, bad starting_deadline → warn + skipped tick),
1666 // but "rejected at create time" should cover every field the
1667 // operator can typo, not just `when`.
1668 if let Some(j) = &self.plan.jitter {
1669 humantime::parse_duration(j)
1670 .map_err(|e| format!("jitter: invalid duration '{j}': {e}"))?;
1671 }
1672 if let Some(sd) = &self.starting_deadline {
1673 humantime::parse_duration(sd)
1674 .map_err(|e| format!("starting_deadline: invalid duration '{sd}': {e}"))?;
1675 }
1676 let from = self
1677 .active
1678 .from
1679 .as_deref()
1680 .map(Active::parse_bound)
1681 .transpose()?;
1682 let until = self
1683 .active
1684 .until
1685 .as_deref()
1686 .map(Active::parse_bound)
1687 .transpose()?;
1688 if let (Some(f), Some(u)) = (from, until) {
1689 if f >= u {
1690 return Err(format!(
1691 "active.from ({}) must be strictly before active.until ({})",
1692 self.active.from.as_deref().unwrap_or_default(),
1693 self.active.until.as_deref().unwrap_or_default(),
1694 ));
1695 }
1696 }
1697 Ok(())
1698 }
1699}
1700
/// serde `default` helper for boolean fields that are on unless the
/// document says otherwise (used by `Schedule::enabled`).
fn default_true() -> bool {
    const ON_UNLESS_STATED: bool = true;
    ON_UNLESS_STATED
}