Skip to main content

kanade_shared/
manifest.rs

1use serde::{Deserialize, Serialize};
2
3use crate::wire::{RunAs, Shell, Staleness};
4
5/// YAML job manifest (= registered "what to run", v0.18.0+).
6///
7/// Owns only script-intrinsic fields. **Who** (`target`), **how to
8/// phase fanout** (`rollout`), and **when to stagger start**
9/// (`jitter`) all moved to the Schedule / exec request side — same
10/// script can now be fired against different targets / rollouts
11/// without copying the script body.
12///
13/// `deny_unknown_fields` makes operators copy-pasting an older yaml
14/// that still has `target:` / `rollout:` see a clear parse error at
15/// `kanade job create` time instead of mysteriously losing it.
16#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
17#[serde(deny_unknown_fields)]
18pub struct Manifest {
19    pub id: String,
20    pub version: String,
21    #[serde(default)]
22    pub description: Option<String>,
23    pub execute: Execute,
24    #[serde(default)]
25    pub require_approval: bool,
26    /// Opt-in marker that this job produces a JSON inventory fact
27    /// payload on stdout. When present, the backend's results
28    /// projector parses `ExecResult.stdout` as JSON and upserts an
29    /// `inventory_facts` row keyed by `(pc_id, manifest.id)`. The
30    /// `display` sub-config drives the SPA's Inventory page render.
31    #[serde(default)]
32    pub inventory: Option<InventoryHint>,
33    /// Issue #246: opt-in marker that this job emits per-line
34    /// observability events on stdout (one JSON `ObsEvent` per
35    /// newline). When present, the agent — after the script exits
36    /// successfully — parses each non-empty stdout line as an
37    /// `ObsEvent`, publishes it on `obs.<pc_id>` via the
38    /// `obs_outbox`, and (intentionally) **omits the stdout from
39    /// the `ExecResult`** so the timeline data doesn't double up
40    /// in `execution_results.stdout` (which would multiply rows
41    /// by ~50/day/PC of noise).
42    ///
43    /// Distinct from `inventory:` (single JSON object → projector
44    /// upsert) — events are append-only timeline points consumed
45    /// by the dedicated `obs_events` table.
46    #[serde(default)]
47    pub emit: Option<EmitConfig>,
48    /// v0.26: Layer 2 staleness policy (SPEC.md §2.6.2). Controls
49    /// what the agent does at fire time when it can't verify the
50    /// `script_current` / `script_status` KV values are fresh —
51    /// especially relevant for `runs_on: agent` schedules where
52    /// the agent may fire from cache while offline. Defaults to
53    /// `Staleness::Cached` (silently use cached values), which
54    /// matches every pre-v0.26 Manifest.
55    #[serde(default)]
56    pub staleness: Staleness,
57}
58
59/// "Who + how + when-to-stagger" — the fanout-plan side of an exec.
60/// Used both as the POST `/api/exec/{job_id}` body and as the embedded
61/// `target` / `rollout` / `jitter` slot on [`Schedule`]. Centralising
62/// here keeps the validation + serialisation logic in one place.
63#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default)]
64pub struct FanoutPlan {
65    #[serde(default)]
66    pub target: Target,
67    /// Optional wave rollout — when present, the backend publishes
68    /// each wave's group subject on its own delay schedule instead
69    /// of fanning out the `target` block in one go. `target` then
70    /// only labels the deploy for the audit log.
71    #[serde(default, skip_serializing_if = "Option::is_none")]
72    pub rollout: Option<Rollout>,
73    /// Optional humantime jitter; agent uses it to randomise
74    /// execution start. Lives here (not on the script) so different
75    /// schedules / ad-hoc fires of the same job can pick different
76    /// stagger windows.
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub jitter: Option<String>,
79    /// Absolute time the scheduler stamps on each emitted Command
80    /// when this exec was driven by a [`Schedule`] with
81    /// `starting_deadline`. Agents receiving a Command after this
82    /// instant publish a synthetic skipped-result instead of
83    /// running the script. `None` (default) = no deadline / catch
84    /// up whenever delivered. Operators don't usually set this
85    /// directly — the scheduler computes it from `tick_at +
86    /// starting_deadline`.
87    #[serde(default, skip_serializing_if = "Option::is_none")]
88    pub deadline_at: Option<chrono::DateTime<chrono::Utc>>,
89}
90
91/// Manifest sub-section: how the SPA should render the inventory
92/// facts this job produces. Each field name (`field`) is a top-level
93/// key in the stdout JSON, e.g. `hostname`, `ram_gb`.
94///
95/// Two render modes:
96///   * `display` — vertical "field / value" per PC, used by the
97///     `/inventory?pc=<id>` detail view. ALL columns the operator
98///     wants visible on the detail page.
99///   * `summary` — horizontal table across the fleet (row = PC,
100///     column = field) on `/inventory`. Optional; when omitted the
101///     SPA falls back to `display`, but operators usually want a
102///     trimmer "hostname / OS / CPU / RAM" set for the fleet view.
103#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
104pub struct InventoryHint {
105    /// Detail-view columns, in order.
106    pub display: Vec<DisplayField>,
107    /// Optional fleet-list columns (row = PC). Defaults to `display`
108    /// when omitted, but operators usually pick a 3-5 column subset.
109    #[serde(default, skip_serializing_if = "Option::is_none")]
110    pub summary: Option<Vec<DisplayField>>,
111    /// v0.31 / #40: payload arrays that should be exploded into
112    /// per-element rows of a derived SQLite table. Lets operators
113    /// answer cross-PC questions ("which PCs still have Chrome <
114    /// 120?", "C: >90% full") with normal SQL filters + indexes
115    /// instead of grepping JSON. The projector creates the derived
116    /// table on register and replaces this PC's rows on each result
117    /// (DELETE WHERE pc_id=? AND job_id=? + bulk INSERT). See
118    /// [`ExplodeSpec`] for the per-spec schema.
119    #[serde(default, skip_serializing_if = "Option::is_none")]
120    pub explode: Option<Vec<ExplodeSpec>>,
121    /// v0.35 / #93: top-level scalar fields whose changes the
122    /// projector logs to `inventory_history` (one event per
123    /// changed field per scan). Pairs with `explode[].track_history`
124    /// — that covers array elements; this covers single-valued
125    /// fields like `ram_bytes` / `os_version` / `cpu_model` /
126    /// `os_build` that operators want to track for "did the RAM
127    /// get upgraded?" / "when did Win 11 land on this PC?" /
128    /// "BIOS / firmware bumped?" questions. Field name = `field_path`
129    /// in the history row, `identity_json` is NULL, `before_json`
130    /// / `after_json` each carry `{"value": <prior or new value>}`.
131    /// First-ever observation of a scalar (no prior facts row)
132    /// emits `added`; subsequent value changes emit `changed`. No
133    /// `removed` events — a scalar disappearing from the payload
134    /// is rare and the operator can still see the last value via
135    /// the `before_json` of the most recent change.
136    #[serde(default, skip_serializing_if = "Option::is_none")]
137    pub history_scalars: Option<Vec<String>>,
138}
139
140/// Issue #246 — `emit:` manifest block for jobs whose stdout is
141/// NDJSON observability events (one `ObsEvent` per line). Parallel
142/// to `inventory:` but for the append-only timeline pipeline; see
143/// `Manifest::emit` for the full contract.
144#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
145#[serde(deny_unknown_fields)]
146pub struct EmitConfig {
147    /// What kind of payload the agent should expect on stdout. Only
148    /// `events` is defined today (parses each non-empty line as
149    /// `ObsEvent` and publishes on `obs.<pc_id>`); future variants
150    /// (e.g. metrics streams, structured trace events) plug in here.
151    #[serde(rename = "type")]
152    pub kind: EmitKind,
153    /// Operator hint for where the script keeps its own state — the
154    /// watermark file the PowerShell / sh body reads + writes
155    /// between runs so it only emits NEW events since the last
156    /// poll. The agent doesn't read this; it's documentation that
157    /// the SPA (and `kanade job edit`) can surface to operators
158    /// reviewing the manifest. Optional; the script is allowed to
159    /// keep state anywhere (registry, env, etc.) — the field's
160    /// presence makes the convention discoverable.
161    #[serde(default, skip_serializing_if = "Option::is_none")]
162    pub watermark_path: Option<String>,
163}
164
165/// `emit.type` enum. Lowercase serde so manifests read
166/// `type: events` rather than `Events`.
167#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
168#[serde(rename_all = "lowercase")]
169pub enum EmitKind {
170    /// Per-line `ObsEvent` JSON. Agent parses + publishes on
171    /// `obs.<pc_id>`, drops the stdout from the resulting
172    /// `ExecResult`.
173    Events,
174}
175
176/// v0.31 / #40: declarative "flatten this JSON array into a real
177/// SQLite table" spec on an inventory manifest. The projector
178/// creates the table on first registration (CREATE TABLE IF NOT
179/// EXISTS + indexes) and writes a row per element of
180/// `payload[field]` on every result, scoped by (pc_id, job_id) so
181/// each PC's rows replace cleanly without a per-PC schema.
182#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
183pub struct ExplodeSpec {
184    /// JSON array key under the payload to explode. E.g. `"apps"`
185    /// for `payload: { apps: [{...}, {...}] }`.
186    pub field: String,
187    /// Derived SQLite table name. Operators choose this — pick
188    /// something namespaced + stable (`inventory_sw_apps`, not
189    /// `apps`) so multiple inventory manifests don't collide on a
190    /// generic name.
191    pub table: String,
192    /// Element-level fields that uniquely identify a row inside one
193    /// PC's payload. The full PK is `(pc_id, job_id) + these
194    /// columns`. Required — operators must think about uniqueness
195    /// (e.g. `["name", "source"]` for installed apps because the
196    /// same name appears in multiple uninstall hives).
197    ///
198    /// v0.31 / #41: same tuple drives history identity. When
199    /// `track_history` is on, the projector serialises these
200    /// fields' values into `inventory_history.identity_json` for
201    /// every change event, so queries like "every PC that ever
202    /// installed Chrome (any source)" filter on identity_json
203    /// content without a per-manifest schema.
204    pub primary_key: Vec<String>,
205    /// Per-element fields that become columns in the derived table.
206    pub columns: Vec<ExplodeColumn>,
207    /// v0.31 / #41: when true (default false), the projector
208    /// diffs each PC's incoming payload against the prior rows
209    /// for the same (pc_id, job_id) BEFORE the DELETE-then-INSERT
210    /// replace, and writes added / removed / changed events into
211    /// `inventory_history`. Lets operators answer time-dimension
212    /// questions ("when did Chrome 120 first appear on PC X?",
213    /// "what's the Win 11 23H2 rollout curve") without storing
214    /// per-scan snapshots. Off by default so operators opt in
215    /// per-spec — history has a real storage cost on long-lived
216    /// deployments (mitigated by the 90-day default retention
217    /// sweeper, see `cleanup` module).
218    #[serde(default)]
219    pub track_history: bool,
220}
221
222/// One column in an [`ExplodeSpec`]'s derived table.
223#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
224pub struct ExplodeColumn {
225    /// JSON key under each array element. Becomes the column name
226    /// in the derived SQLite table — we don't rename.
227    pub field: String,
228    /// SQLite affinity: `"text"` (default), `"integer"`, `"real"`.
229    /// Storage maps directly via `sqlx::query.bind(...)`; type
230    /// mismatches at INSERT-time fail loudly rather than silently
231    /// dropping the row.
232    #[serde(default, skip_serializing_if = "Option::is_none")]
233    #[serde(rename = "type")]
234    pub kind: Option<String>,
235    /// When true, the projector creates a `CREATE INDEX` on this
236    /// column at table-creation time. Boost for the common-filter
237    /// columns (`name`, `version`) — operators mark them
238    /// explicitly, the projector won't guess.
239    #[serde(default)]
240    pub index: bool,
241}
242
243#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
244pub struct DisplayField {
245    /// Top-level key in the stdout JSON.
246    pub field: String,
247    /// Human-readable column header.
248    pub label: String,
249    /// Optional render hint — `"number"`, `"bytes"`, `"timestamp"`,
250    /// or `"table"` (#39). Defaults to plain text rendering on the
251    /// SPA side. `"table"` expects the field's value to be a JSON
252    /// array of objects and renders a nested sub-table on the
253    /// per-PC detail page using `columns` as the schema; the fleet
254    /// summary view falls back to showing the row count for
255    /// `"table"` cells so the wide list stays compact.
256    #[serde(default, skip_serializing_if = "Option::is_none")]
257    #[serde(rename = "type")]
258    pub kind: Option<String>,
259    /// v0.30 / #39: when `kind == "table"`, the SPA renders the
260    /// field's value (an array of objects like
261    /// `disks: [{ device_id, size_bytes, ... }]`) as a nested
262    /// sub-table using these columns. Each column is itself a
263    /// `DisplayField`, so the nested cells reuse the same render
264    /// hints (`bytes`, `number`, `timestamp`) — no parallel format
265    /// pipeline. Ignored for any other `kind`.
266    #[serde(default, skip_serializing_if = "Option::is_none")]
267    pub columns: Option<Vec<DisplayField>>,
268}
269
270#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
271pub struct Rollout {
272    #[serde(default)]
273    pub strategy: RolloutStrategy,
274    pub waves: Vec<Wave>,
275}
276
277#[derive(
278    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
279)]
280#[serde(rename_all = "lowercase")]
281pub enum RolloutStrategy {
282    #[default]
283    Wave,
284}
285
286#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
287pub struct Wave {
288    pub group: String,
289    /// humantime delay measured from the deploy's publish time. wave[0]
290    /// typically has "0s"; subsequent waves use minutes / hours.
291    pub delay: String,
292}
293
294#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default)]
295pub struct Target {
296    #[serde(default)]
297    pub groups: Vec<String>,
298    #[serde(default)]
299    pub pcs: Vec<String>,
300    #[serde(default)]
301    pub all: bool,
302}
303
304impl Target {
305    /// At least one of all / groups / pcs is set.
306    pub fn is_specified(&self) -> bool {
307        self.all || !self.groups.is_empty() || !self.pcs.is_empty()
308    }
309}
310
311#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
312#[serde(deny_unknown_fields)]
313pub struct Execute {
314    pub shell: ExecuteShell,
315    /// Inline script body. Mutually exclusive with [`script_file`]
316    /// and [`script_object`]; exactly one of the three must be set
317    /// (enforced by [`Execute::validate_script_source`] at the
318    /// write-side parse boundaries — `kanade job create` and
319    /// `POST /api/jobs`).
320    ///
321    /// Empty string is treated as **unset** so operators can swap
322    /// to a `script_file:` / `script_object:` alternative just by
323    /// commenting out the body, without having to also drop the
324    /// `script:` key entirely.
325    ///
326    /// [`script_file`]: Self::script_file
327    /// [`script_object`]: Self::script_object
328    #[serde(default, skip_serializing_if = "Option::is_none")]
329    pub script: Option<String>,
330    /// Repo-local file path resolved by the operator-side CLI at
331    /// `kanade job create` time. The CLI reads the file, slots its
332    /// contents into `script`, and clears this field before
333    /// POSTing — so the backend / agents never see `script_file`
334    /// in stored manifests. SPEC §2.4.1.
335    ///
336    /// Resolver lands in a follow-up PR
337    /// (yukimemi/kanade#210); today this field passes parse-time
338    /// validation but the operator-side CLI bails with "not yet
339    /// implemented" until the resolver ships, so manifests that
340    /// reach the backend with `script_file` set are treated as a
341    /// schema-bug.
342    #[serde(default, skip_serializing_if = "Option::is_none")]
343    pub script_file: Option<String>,
344    /// Object Store reference (`<name>/<version>`) into the
345    /// `scripts` bucket (`OBJECT_SCRIPTS`). Agents fetch the body
346    /// at Execute time via `/api/script-objects/{name}/{version}`
347    /// and cache it locally. SPEC §2.4.1.
348    ///
349    /// Resolver lands in the same follow-up PR as `script_file`;
350    /// today this field passes parse-time validation but the
351    /// backend / agent exec paths bail with "not yet implemented"
352    /// when they see it.
353    #[serde(default, skip_serializing_if = "Option::is_none")]
354    pub script_object: Option<String>,
355    /// humantime duration string (e.g. "30s", "10m"). Script-intrinsic
356    /// — represents how long this script reasonably takes to run.
357    pub timeout: String,
358    /// Token + session combination the agent uses to launch the
359    /// script (v0.21). Default = [`RunAs::System`] (Session 0,
360    /// LocalSystem privileges, no GUI) — matches pre-v0.21 behavior.
361    #[serde(default)]
362    pub run_as: RunAs,
363    /// Working directory for the spawned child (v0.21.1). When
364    /// unset, the child inherits the agent's cwd — on Windows that
365    /// means `%SystemRoot%\System32` for the prod service, which is
366    /// almost never what operators actually want. Use an absolute
367    /// path; relative paths are passed through to the OS verbatim.
368    /// `%PROGRAMDATA%` works for `run_as: system`; for `run_as: user`
369    /// you'd want `%USERPROFILE%` (but expansion happens in the
370    /// shell, so write `$env:USERPROFILE` for PowerShell, or set
371    /// it via teravars before `kanade job create`).
372    #[serde(default, skip_serializing_if = "Option::is_none")]
373    pub cwd: Option<String>,
374}
375
376impl Execute {
377    /// Treat an empty `script:` body as "intentionally unset". Operators
378    /// commenting out a block-scalar tend to leave the key behind, and
379    /// failing the validator on `script: ""` would surprise them.
380    fn has_inline_script(&self) -> bool {
381        matches!(&self.script, Some(s) if !s.is_empty())
382    }
383
384    /// Enforce that exactly one of `script` / `script_file` /
385    /// `script_object` is set. Called at the write-side parse
386    /// boundaries (CLI `kanade job create` + backend
387    /// `POST /api/jobs`) so ambiguous YAML is rejected before it
388    /// reaches the JOBS KV. Read paths (projector, agent
389    /// scheduler, list endpoints) skip this check — they only ever
390    /// see what the write path already validated.
391    pub fn validate_script_source(&self) -> Result<(), String> {
392        let inline = self.has_inline_script();
393        let file = self.script_file.is_some();
394        let obj = self.script_object.is_some();
395        let set = [inline, file, obj].into_iter().filter(|b| *b).count();
396        match set {
397            1 => Ok(()),
398            0 => Err("execute: one of `script`, `script_file`, `script_object` must be set".into()),
399            _ => Err(format!(
400                "execute: only one of `script` / `script_file` / `script_object` may be set \
401                 (got script={inline}, script_file={file}, script_object={obj})"
402            )),
403        }
404    }
405}
406
407impl Manifest {
408    /// Cross-field semantic checks that don't fit into pure serde
409    /// derive. Currently delegates to
410    /// [`Execute::validate_script_source`] — see that method's
411    /// docs for the rationale on which call sites should run this.
412    pub fn validate(&self) -> Result<(), String> {
413        self.execute.validate_script_source()?;
414        Ok(())
415    }
416}
417
418#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
419#[serde(rename_all = "lowercase")]
420pub enum ExecuteShell {
421    Powershell,
422    Cmd,
423}
424
425impl From<ExecuteShell> for Shell {
426    fn from(s: ExecuteShell) -> Self {
427        match s {
428            ExecuteShell::Powershell => Shell::Powershell,
429            ExecuteShell::Cmd => Shell::Cmd,
430        }
431    }
432}
433
434#[cfg(test)]
435mod tests {
436    use super::*;
437
438    #[test]
439    fn target_is_specified_requires_at_least_one_field() {
440        let empty = Target::default();
441        assert!(!empty.is_specified());
442
443        let with_all = Target {
444            all: true,
445            ..Target::default()
446        };
447        assert!(with_all.is_specified());
448
449        let with_groups = Target {
450            groups: vec!["canary".into()],
451            ..Target::default()
452        };
453        assert!(with_groups.is_specified());
454
455        let with_pcs = Target {
456            pcs: vec!["pc-01".into()],
457            ..Target::default()
458        };
459        assert!(with_pcs.is_specified());
460    }
461
462    #[test]
463    fn manifest_deserialises_minimal_yaml() {
464        // Matches jobs/echo-test.yaml. v0.18: no target/rollout/jitter
465        // — those live on the schedule / exec request now.
466        let yaml = r#"
467id: echo-test
468version: 0.0.1
469execute:
470  shell: powershell
471  script: "echo 'kanade'"
472  timeout: 30s
473"#;
474        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
475        assert_eq!(m.id, "echo-test");
476        assert_eq!(m.version, "0.0.1");
477        assert!(matches!(m.execute.shell, ExecuteShell::Powershell));
478        assert_eq!(
479            m.execute.script.as_deref().map(str::trim),
480            Some("echo 'kanade'")
481        );
482        assert!(m.execute.script_file.is_none());
483        assert!(m.execute.script_object.is_none());
484        assert_eq!(m.execute.timeout, "30s");
485        assert!(!m.require_approval);
486        m.validate()
487            .expect("inline-script manifest passes validation");
488    }
489
490    fn execute_with(
491        script: Option<&str>,
492        script_file: Option<&str>,
493        script_object: Option<&str>,
494    ) -> Execute {
495        Execute {
496            shell: ExecuteShell::Powershell,
497            script: script.map(str::to_owned),
498            script_file: script_file.map(str::to_owned),
499            script_object: script_object.map(str::to_owned),
500            timeout: "30s".into(),
501            run_as: RunAs::default(),
502            cwd: None,
503        }
504    }
505
506    #[test]
507    fn validate_accepts_inline_script() {
508        let e = execute_with(Some("echo hi"), None, None);
509        assert!(e.validate_script_source().is_ok());
510    }
511
512    #[test]
513    fn validate_accepts_script_file_alone() {
514        let e = execute_with(None, Some("scripts/cleanup.ps1"), None);
515        assert!(e.validate_script_source().is_ok());
516    }
517
518    #[test]
519    fn validate_accepts_script_object_alone() {
520        let e = execute_with(None, None, Some("cleanup/1.0.0"));
521        assert!(e.validate_script_source().is_ok());
522    }
523
524    #[test]
525    fn validate_treats_empty_inline_script_as_unset() {
526        // `script: ""` + `script_object` set is the natural shape
527        // when an operator comments out the YAML block-scalar body
528        // but leaves the key. Should pass.
529        let e = execute_with(Some(""), None, Some("cleanup/1.0.0"));
530        assert!(e.validate_script_source().is_ok());
531    }
532
533    #[test]
534    fn validate_rejects_zero_sources() {
535        let e = execute_with(None, None, None);
536        let err = e.validate_script_source().unwrap_err();
537        assert!(err.contains("must be set"), "got: {err}");
538    }
539
540    #[test]
541    fn validate_rejects_empty_inline_only() {
542        let e = execute_with(Some(""), None, None);
543        let err = e.validate_script_source().unwrap_err();
544        assert!(err.contains("must be set"), "got: {err}");
545    }
546
547    #[test]
548    fn validate_rejects_inline_plus_file() {
549        let e = execute_with(Some("echo hi"), Some("scripts/cleanup.ps1"), None);
550        let err = e.validate_script_source().unwrap_err();
551        assert!(err.contains("only one of"), "got: {err}");
552    }
553
554    #[test]
555    fn validate_rejects_inline_plus_object() {
556        let e = execute_with(Some("echo hi"), None, Some("cleanup/1.0.0"));
557        let err = e.validate_script_source().unwrap_err();
558        assert!(err.contains("only one of"), "got: {err}");
559    }
560
561    #[test]
562    fn validate_rejects_file_plus_object() {
563        let e = execute_with(None, Some("scripts/cleanup.ps1"), Some("cleanup/1.0.0"));
564        let err = e.validate_script_source().unwrap_err();
565        assert!(err.contains("only one of"), "got: {err}");
566    }
567
568    #[test]
569    fn validate_rejects_all_three() {
570        let e = execute_with(
571            Some("echo hi"),
572            Some("scripts/cleanup.ps1"),
573            Some("cleanup/1.0.0"),
574        );
575        let err = e.validate_script_source().unwrap_err();
576        assert!(err.contains("only one of"), "got: {err}");
577    }
578
579    #[test]
580    fn manifest_deserialises_script_object_yaml() {
581        // SPEC §2.4.1 example shape with the Object Store
582        // reference picked over inline.
583        let yaml = r#"
584id: cleanup-disk-temp
585version: 1.0.1
586execute:
587  shell: powershell
588  script_object: cleanup-disk-temp/1.0.1
589  timeout: 600s
590"#;
591        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
592        assert_eq!(
593            m.execute.script_object.as_deref(),
594            Some("cleanup-disk-temp/1.0.1")
595        );
596        assert!(m.execute.script.is_none());
597        m.validate()
598            .expect("script_object-only manifest passes validation");
599    }
600
601    #[test]
602    fn manifest_rejects_typo_in_script_field_name() {
603        // `deny_unknown_fields` on Execute catches `script_objectt`
604        // and similar fat-fingers at parse time instead of letting
605        // them silently fall through to "all three unset".
606        let yaml = r#"
607id: typo
608version: 1.0.0
609execute:
610  shell: powershell
611  script_objectt: oops
612  timeout: 30s
613"#;
614        let r: Result<Manifest, _> = serde_yaml::from_str(yaml);
615        assert!(r.is_err(), "expected parse error, got {r:?}");
616    }
617
618    #[test]
619    fn schedule_carries_target_and_rollout() {
620        let yaml = r#"
621id: hourly-cleanup-canary
622when:
623  per_pc: { every: 1h }
624job_id: cleanup
625enabled: true
626target:
627  groups: [canary, wave1]
628jitter: 30s
629rollout:
630  strategy: wave
631  waves:
632    - { group: canary, delay: 0s }
633    - { group: wave1,  delay: 5s }
634"#;
635        let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
636        assert_eq!(s.id, "hourly-cleanup-canary");
637        assert_eq!(s.job_id, "cleanup");
638        assert_eq!(s.plan.target.groups, vec!["canary", "wave1"]);
639        assert_eq!(s.plan.jitter.as_deref(), Some("30s"));
640        let rollout = s.plan.rollout.expect("rollout present");
641        assert_eq!(rollout.waves.len(), 2);
642        assert_eq!(rollout.waves[0].group, "canary");
643        assert_eq!(rollout.waves[1].delay, "5s");
644        assert_eq!(rollout.strategy, RolloutStrategy::Wave);
645    }
646
647    #[test]
648    fn schedule_minimal_target_all() {
649        let yaml = r#"
650id: kitting
651when:
652  per_pc: once
653enabled: true
654job_id: scheduled-echo
655target: { all: true }
656"#;
657        let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
658        assert_eq!(s.id, "kitting");
659        assert_eq!(s.when, When::PerPc(PerPolicy::Once(OnceLiteral::Once)));
660        assert!(s.enabled);
661        assert_eq!(s.job_id, "scheduled-echo");
662        assert!(s.plan.target.all);
663        assert!(s.plan.rollout.is_none());
664        assert!(s.plan.jitter.is_none());
665        assert!(s.active.is_empty());
666    }
667
668    #[test]
669    fn schedule_enabled_defaults_to_true() {
670        let yaml = r#"
671id: x
672when:
673  per_pc: once
674job_id: y
675target: { all: true }
676"#;
677        let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
678        assert!(s.enabled);
679    }
680
681    // ---- `when` parsing (#418 Phase 1) ----
682
683    fn schedule_yaml_with(when_block: &str) -> String {
684        format!(
685            r#"
686id: x
687when:
688{when_block}
689job_id: y
690target: {{ all: true }}
691"#
692        )
693    }
694
695    #[test]
696    fn when_per_pc_every_parses_unquoted_humantime() {
697        // `6h` is digit-led but non-numeric → YAML string, same as
698        // the old `cooldown: 6h` convention. No quotes needed.
699        let s: Schedule =
700            serde_yaml::from_str(&schedule_yaml_with("  per_pc: { every: 6h }")).expect("parse");
701        assert_eq!(
702            s.when,
703            When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() }))
704        );
705    }
706
707    #[test]
708    fn when_per_target_every_parses() {
709        let s: Schedule = serde_yaml::from_str(&schedule_yaml_with("  per_target: { every: 24h }"))
710            .expect("parse");
711        assert_eq!(
712            s.when,
713            When::PerTarget(PerPolicy::Every(EverySpec {
714                every: "24h".into()
715            }))
716        );
717    }
718
719    #[test]
720    fn when_per_target_once_parses() {
721        // Falls out of the shared PerPolicy shape and decide_fire
722        // already implements it ("any one pc succeeds → skip the
723        // target forever"), so it is allowed, not rejected.
724        let s: Schedule =
725            serde_yaml::from_str(&schedule_yaml_with("  per_target: once")).expect("parse");
726        assert_eq!(s.when, When::PerTarget(PerPolicy::Once(OnceLiteral::Once)));
727    }
728
729    #[test]
730    fn when_cron_escape_hatch_parses() {
731        let s: Schedule =
732            serde_yaml::from_str(&schedule_yaml_with("  cron: \"0 0 9 * * mon-fri\""))
733                .expect("parse");
734        assert_eq!(s.when, When::Cron("0 0 9 * * mon-fri".into()));
735    }
736
737    #[test]
738    fn when_rejects_bad_once_keyword() {
739        // `onec` must be a parse error, not a silently-absorbed
740        // string (OnceLiteral is a single-variant enum for exactly
741        // this reason).
742        let r: Result<Schedule, _> = serde_yaml::from_str(&schedule_yaml_with("  per_pc: onec"));
743        assert!(r.is_err(), "expected parse error, got {r:?}");
744    }
745
746    #[test]
747    fn when_rejects_unknown_key_in_every() {
748        // EverySpec is deny_unknown_fields so `evry:` typos fail
749        // even under the untagged PerPolicy.
750        let r: Result<Schedule, _> =
751            serde_yaml::from_str(&schedule_yaml_with("  per_pc: { evry: 6h }"));
752        assert!(r.is_err(), "expected parse error, got {r:?}");
753    }
754
755    #[test]
756    fn when_rejects_unknown_variant() {
757        let r: Result<Schedule, _> =
758            serde_yaml::from_str(&schedule_yaml_with("  per_galaxy: once"));
759        assert!(r.is_err(), "expected parse error, got {r:?}");
760    }
761
762    #[test]
763    fn when_rejects_old_top_level_cron_field() {
764        // Pre-#418 shape: top-level `cron:` + no `when:`. Must fail
765        // loudly (missing `when`), which is what turns stale KV
766        // blobs into warn-skips after the upgrade.
767        let yaml = r#"
768id: x
769cron: "* * * * * *"
770job_id: y
771target: { all: true }
772"#;
773        let r: Result<Schedule, _> = serde_yaml::from_str(yaml);
774        assert!(r.is_err(), "expected parse error, got {r:?}");
775    }
776
777    #[test]
778    fn when_round_trips_json_and_yaml() {
779        // Round-trip through the full Schedule: that is the wire
780        // unit for both stores (JSON catalog KV + YAML mirror), and
781        // it exercises the singleton_map field attribute that keeps
782        // serde_yaml on the map shape instead of `!per_pc` tags.
783        for when in [
784            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
785            When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
786            When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
787            When::PerTarget(PerPolicy::Every(EverySpec {
788                every: "24h".into(),
789            })),
790            When::Cron("0 0 9 * * mon-fri".into()),
791        ] {
792            let s = schedule_with(when.clone(), RunsOn::Backend);
793
794            let json = serde_json::to_string(&s).expect("json serialise");
795            let back: Schedule = serde_json::from_str(&json).expect("json deserialise");
796            assert_eq!(back.when, when, "json round-trip for {when}");
797
798            let yaml = serde_yaml::to_string(&s).expect("yaml serialise");
799            assert!(
800                !yaml.contains('!'),
801                "yaml must use the map shape, not tags: {yaml}"
802            );
803            let back: Schedule = serde_yaml::from_str(&yaml).expect("yaml deserialise");
804            assert_eq!(back.when, when, "yaml round-trip for {when}");
805        }
806    }
807
808    #[test]
809    fn when_once_serialises_as_bare_keyword() {
810        // The wire shape operators see in the YAML mirror must stay
811        // the ergonomic `per_pc: once`, not a one-variant map.
812        let json = serde_json::to_value(When::PerPc(PerPolicy::Once(OnceLiteral::Once)))
813            .expect("serialise");
814        assert_eq!(json, serde_json::json!({ "per_pc": "once" }));
815    }
816
817    #[test]
818    fn when_displays_operator_summary() {
819        for (when, expected) in [
820            (
821                When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
822                "per_pc once",
823            ),
824            (
825                When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
826                "per_pc every 6h",
827            ),
828            (
829                When::PerTarget(PerPolicy::Every(EverySpec {
830                    every: "24h".into(),
831                })),
832                "per_target every 24h",
833            ),
834            (
835                When::Cron("0 0 9 * * mon-fri".into()),
836                "cron: 0 0 9 * * mon-fri",
837            ),
838        ] {
839            assert_eq!(when.to_string(), expected);
840        }
841    }
842
843    // ---- lowering (#418: when → engine vocabulary) ----
844
845    fn schedule_with(when: When, runs_on: RunsOn) -> Schedule {
846        Schedule {
847            id: "x".into(),
848            when,
849            job_id: "y".into(),
850            plan: FanoutPlan::default(),
851            active: Active::default(),
852            starting_deadline: None,
853            runs_on,
854            enabled: true,
855        }
856    }
857
858    #[test]
859    fn lowering_matches_the_418_table() {
860        let cases = [
861            (
862                When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
863                (POLL_CRON, ExecMode::OncePerPc, None),
864            ),
865            (
866                When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
867                (POLL_CRON, ExecMode::OncePerPc, Some("6h")),
868            ),
869            (
870                When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
871                (POLL_CRON, ExecMode::OncePerTarget, None),
872            ),
873            (
874                When::PerTarget(PerPolicy::Every(EverySpec {
875                    every: "24h".into(),
876                })),
877                (POLL_CRON, ExecMode::OncePerTarget, Some("24h")),
878            ),
879            (
880                When::Cron("0 0 9 * * mon-fri".into()),
881                ("0 0 9 * * mon-fri", ExecMode::EveryTick, None),
882            ),
883        ];
884        for (when, (cron, mode, cooldown)) in cases {
885            let l = schedule_with(when.clone(), RunsOn::Backend).lowered();
886            assert_eq!(l.cron, cron, "cron for {when}");
887            assert_eq!(l.mode, mode, "mode for {when}");
888            assert_eq!(l.cooldown.as_deref(), cooldown, "cooldown for {when}");
889        }
890    }
891
892    #[test]
893    fn poll_cron_is_accepted_by_the_engine_parser() {
894        // POLL_CRON is system-generated — if the engine's parser
895        // ever rejected it every reconcile schedule would die at
896        // register time. Validate it with the same croner config.
897        let s = schedule_with(When::Cron(POLL_CRON.into()), RunsOn::Backend);
898        s.validate().expect("POLL_CRON must be valid");
899    }
900
901    // ---- Schedule::validate() (#418 decision F) ----
902
903    #[test]
904    fn validate_accepts_reconcile_shapes() {
905        for when in [
906            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
907            When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
908            When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
909            When::PerTarget(PerPolicy::Every(EverySpec {
910                every: "24h".into(),
911            })),
912        ] {
913            schedule_with(when.clone(), RunsOn::Backend)
914                .validate()
915                .unwrap_or_else(|e| panic!("{when} should validate: {e}"));
916        }
917    }
918
919    #[test]
920    fn validate_accepts_per_pc_on_agent() {
921        schedule_with(
922            When::PerPc(PerPolicy::Every(EverySpec { every: "1h".into() })),
923            RunsOn::Agent,
924        )
925        .validate()
926        .expect("per_pc + agent is the offline-inventory shape");
927    }
928
929    #[test]
930    fn validate_rejects_per_target_on_agent() {
931        let err = schedule_with(
932            When::PerTarget(PerPolicy::Every(EverySpec {
933                every: "24h".into(),
934            })),
935            RunsOn::Agent,
936        )
937        .validate()
938        .unwrap_err();
939        assert!(err.contains("per_target"), "got: {err}");
940        assert!(err.contains("runs_on: agent"), "got: {err}");
941
942        // per_target: once is also backend-only.
943        let err = schedule_with(
944            When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
945            RunsOn::Agent,
946        )
947        .validate()
948        .unwrap_err();
949        assert!(err.contains("per_target"), "got (once): {err}");
950        assert!(err.contains("runs_on: agent"), "got (once): {err}");
951    }
952
953    #[test]
954    fn validate_rejects_bad_every_duration() {
955        let err = schedule_with(
956            When::PerPc(PerPolicy::Every(EverySpec { every: "6x".into() })),
957            RunsOn::Backend,
958        )
959        .validate()
960        .unwrap_err();
961        assert!(err.contains("when.every"), "got: {err}");
962    }
963
964    #[test]
965    fn validate_rejects_bad_jitter_and_starting_deadline() {
966        let mut s = schedule_with(
967            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
968            RunsOn::Backend,
969        );
970        s.plan.jitter = Some("5x".into());
971        let err = s.validate().unwrap_err();
972        assert!(err.contains("jitter"), "got: {err}");
973
974        let mut s = schedule_with(
975            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
976            RunsOn::Backend,
977        );
978        s.starting_deadline = Some("soon".into());
979        let err = s.validate().unwrap_err();
980        assert!(err.contains("starting_deadline"), "got: {err}");
981    }
982
983    #[test]
984    fn validate_rejects_bad_cron() {
985        for bad in ["not a cron", "* * * * *", "99 * * * * *"] {
986            let err = schedule_with(When::Cron(bad.into()), RunsOn::Backend)
987                .validate()
988                .unwrap_err();
989            assert!(err.contains("when.cron"), "for '{bad}', got: {err}");
990        }
991    }
992
993    #[test]
994    fn validate_accepts_engine_cron() {
995        // The morning-greeting expression — the escape hatch's
996        // raison d'être — must pass.
997        schedule_with(When::Cron("0 0 9 * * mon-fri".into()), RunsOn::Backend)
998            .validate()
999            .expect("week-day cron should validate");
1000    }
1001
1002    fn schedule_with_active(from: Option<&str>, until: Option<&str>) -> Schedule {
1003        let mut s = schedule_with(
1004            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1005            RunsOn::Backend,
1006        );
1007        s.active = Active {
1008            from: from.map(str::to_owned),
1009            until: until.map(str::to_owned),
1010        };
1011        s
1012    }
1013
1014    #[test]
1015    fn validate_accepts_active_window() {
1016        schedule_with_active(Some("2026-07-01"), Some("2026-08-01T12:00:00+09:00"))
1017            .validate()
1018            .expect("date + rfc3339 bounds should validate");
1019    }
1020
1021    #[test]
1022    fn validate_rejects_unparseable_active_bound() {
1023        let err = schedule_with_active(Some("July 1st"), None)
1024            .validate()
1025            .unwrap_err();
1026        assert!(err.contains("active"), "got: {err}");
1027    }
1028
1029    #[test]
1030    fn validate_rejects_from_not_before_until() {
1031        let err = schedule_with_active(Some("2026-08-01"), Some("2026-07-01"))
1032            .validate()
1033            .unwrap_err();
1034        assert!(err.contains("strictly before"), "got: {err}");
1035
1036        let err = schedule_with_active(Some("2026-07-01"), Some("2026-07-01"))
1037            .validate()
1038            .unwrap_err();
1039        assert!(err.contains("strictly before"), "got: {err}");
1040    }
1041
1042    // ---- Active window semantics ----
1043
1044    #[test]
1045    fn active_window_is_half_open() {
1046        use chrono::TimeZone;
1047        let active = Active {
1048            from: Some("2026-07-01".into()),
1049            until: Some("2026-08-01".into()),
1050        };
1051        let at = |y, m, d, h| chrono::Utc.with_ymd_and_hms(y, m, d, h, 0, 0).unwrap();
1052        assert!(!active.contains(at(2026, 6, 30, 23)), "before from");
1053        assert!(active.contains(at(2026, 7, 1, 0)), "at from (inclusive)");
1054        assert!(active.contains(at(2026, 7, 15, 12)), "inside");
1055        assert!(!active.contains(at(2026, 8, 1, 0)), "at until (exclusive)");
1056        assert!(!active.contains(at(2026, 8, 2, 0)), "after until");
1057    }
1058
1059    #[test]
1060    fn active_empty_window_is_always_active() {
1061        assert!(Active::default().contains(chrono::Utc::now()));
1062    }
1063
1064    #[test]
1065    fn active_rfc3339_bound_honours_offset() {
1066        use chrono::TimeZone;
1067        let active = Active {
1068            from: Some("2026-07-01T09:00:00+09:00".into()),
1069            until: None,
1070        };
1071        // 09:00 JST = 00:00 UTC.
1072        assert!(
1073            !active.contains(
1074                chrono::Utc
1075                    .with_ymd_and_hms(2026, 6, 30, 23, 59, 0)
1076                    .unwrap()
1077            )
1078        );
1079        assert!(active.contains(chrono::Utc.with_ymd_and_hms(2026, 7, 1, 0, 0, 0).unwrap()));
1080    }
1081
1082    #[test]
1083    fn active_empty_is_skipped_when_serialising() {
1084        let s = schedule_with(
1085            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1086            RunsOn::Backend,
1087        );
1088        let json = serde_json::to_value(&s).expect("serialise");
1089        assert!(
1090            json.get("active").is_none(),
1091            "empty active must not appear on the wire: {json}"
1092        );
1093    }
1094
1095    #[test]
1096    fn shipped_schedule_configs_parse_and_validate() {
1097        // Every YAML under configs/schedules/ must parse with the
1098        // current Schedule serde AND pass validate() — keeps the
1099        // shipped examples from drifting out of sync with the model
1100        // (#418 removed back-compat, so drift = broken at create).
1101        let dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("../../configs/schedules");
1102        let mut seen = 0;
1103        for entry in std::fs::read_dir(&dir).expect("read configs/schedules") {
1104            let path = entry.expect("dir entry").path();
1105            if path.extension().and_then(|e| e.to_str()) != Some("yaml") {
1106                continue;
1107            }
1108            let body = std::fs::read_to_string(&path).expect("read yaml");
1109            let s: Schedule = serde_yaml::from_str(&body)
1110                .unwrap_or_else(|e| panic!("{} failed to parse: {e}", path.display()));
1111            s.validate()
1112                .unwrap_or_else(|e| panic!("{} failed validate(): {e}", path.display()));
1113            seen += 1;
1114        }
1115        assert!(seen > 0, "no schedule YAMLs found in {}", dir.display());
1116    }
1117
1118    // ---- pre-existing enum wire formats (unchanged by #418) ----
1119
1120    #[test]
1121    fn exec_mode_serialises_snake_case() {
1122        for (mode, expected) in [
1123            (ExecMode::EveryTick, "every_tick"),
1124            (ExecMode::OncePerPc, "once_per_pc"),
1125            (ExecMode::OncePerTarget, "once_per_target"),
1126        ] {
1127            let s = serde_json::to_value(mode).expect("serialise");
1128            assert_eq!(s, serde_json::Value::String(expected.into()));
1129            let back: ExecMode = serde_json::from_value(serde_json::Value::String(expected.into()))
1130                .expect("deserialise");
1131            assert_eq!(back, mode, "round-trip for {expected}");
1132        }
1133    }
1134
1135    #[test]
1136    fn schedule_runs_on_defaults_to_backend() {
1137        let yaml = r#"
1138id: x
1139when:
1140  per_pc: once
1141job_id: y
1142target: { all: true }
1143"#;
1144        let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
1145        assert_eq!(s.runs_on, RunsOn::Backend);
1146    }
1147
1148    #[test]
1149    fn schedule_runs_on_agent_parses() {
1150        let yaml = r#"
1151id: offline-inv
1152when:
1153  per_pc: { every: 1h }
1154job_id: inventory-hw
1155target: { all: true }
1156runs_on: agent
1157"#;
1158        let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
1159        assert_eq!(s.runs_on, RunsOn::Agent);
1160        assert_eq!(s.lowered().mode, ExecMode::OncePerPc);
1161    }
1162
1163    #[test]
1164    fn runs_on_serialises_snake_case() {
1165        for (mode, expected) in [(RunsOn::Backend, "backend"), (RunsOn::Agent, "agent")] {
1166            let s = serde_json::to_value(mode).expect("serialise");
1167            assert_eq!(s, serde_json::Value::String(expected.into()));
1168            let back: RunsOn = serde_json::from_value(serde_json::Value::String(expected.into()))
1169                .expect("deserialise");
1170            assert_eq!(back, mode);
1171        }
1172    }
1173
1174    #[test]
1175    fn execute_shell_into_wire_shell() {
1176        assert_eq!(Shell::from(ExecuteShell::Powershell), Shell::Powershell);
1177        assert_eq!(Shell::from(ExecuteShell::Cmd), Shell::Cmd);
1178    }
1179
1180    #[test]
1181    fn manifest_staleness_defaults_to_cached() {
1182        let yaml = r#"
1183id: x
1184version: 1.0.0
1185execute:
1186  shell: powershell
1187  script: "echo"
1188  timeout: 1s
1189"#;
1190        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1191        assert_eq!(m.staleness, Staleness::Cached);
1192    }
1193
1194    #[test]
1195    fn manifest_strict_staleness_parses() {
1196        let yaml = r#"
1197id: urgent-patch
1198version: 2.5.1
1199execute:
1200  shell: powershell
1201  script: Install-Hotfix
1202  timeout: 5m
1203staleness:
1204  mode: strict
1205  max_cache_age: 0s
1206"#;
1207        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1208        match m.staleness {
1209            Staleness::Strict { max_cache_age } => assert_eq!(max_cache_age, "0s"),
1210            other => panic!("expected strict, got {other:?}"),
1211        }
1212    }
1213
1214    #[test]
1215    fn manifest_unchecked_staleness_parses() {
1216        let yaml = r#"
1217id: legacy
1218version: 0.1.0
1219execute:
1220  shell: cmd
1221  script: "echo"
1222  timeout: 1s
1223staleness:
1224  mode: unchecked
1225"#;
1226        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1227        assert_eq!(m.staleness, Staleness::Unchecked);
1228    }
1229
1230    #[test]
1231    fn missing_required_field_errors() {
1232        // `id` missing.
1233        let yaml = r#"
1234version: 1.0.0
1235target: { all: true }
1236execute:
1237  shell: powershell
1238  script: "echo"
1239  timeout: 1s
1240"#;
1241        let r: Result<Manifest, _> = serde_yaml::from_str(yaml);
1242        assert!(r.is_err(), "expected error, got {:?}", r);
1243    }
1244
1245    #[test]
1246    fn display_field_table_kind_round_trips_with_nested_columns() {
1247        // #39: `type: table` + `columns:` on a DisplayField gets
1248        // round-tripped through serde so the SPA receives the
1249        // nested schema verbatim. Nested columns themselves are
1250        // DisplayFields so they can carry `type: bytes` /
1251        // `type: number` for cell formatting.
1252        let yaml = r#"
1253id: inv-hw
1254version: 1.0.0
1255execute:
1256  shell: powershell
1257  script: "echo"
1258  timeout: 60s
1259inventory:
1260  display:
1261    - field: hostname
1262      label: Hostname
1263    - field: disks
1264      label: Disks
1265      type: table
1266      columns:
1267        - field: device_id
1268          label: Drive
1269        - field: size_bytes
1270          label: Size
1271          type: bytes
1272        - field: free_bytes
1273          label: Free
1274          type: bytes
1275        - field: file_system
1276          label: FS
1277"#;
1278        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1279        let inv = m.inventory.as_ref().expect("inventory hint");
1280        let disks = inv
1281            .display
1282            .iter()
1283            .find(|d| d.field == "disks")
1284            .expect("disks display row");
1285        assert_eq!(disks.kind.as_deref(), Some("table"));
1286        let cols = disks.columns.as_ref().expect("table needs columns");
1287        assert_eq!(cols.len(), 4);
1288        assert_eq!(cols[1].field, "size_bytes");
1289        assert_eq!(cols[1].kind.as_deref(), Some("bytes"));
1290    }
1291
1292    #[test]
1293    fn display_field_scalar_kind_keeps_columns_none() {
1294        // Defensive: when type is a scalar (`bytes` / `number` /
1295        // `timestamp`) the `columns` field stays None — the SPA
1296        // uses its presence as the "render nested table" signal,
1297        // so it must not leak in via serde defaults.
1298        let yaml = r#"
1299id: x
1300version: 1.0.0
1301execute:
1302  shell: powershell
1303  script: "echo"
1304  timeout: 5s
1305inventory:
1306  display:
1307    - { field: ram_bytes, label: RAM, type: bytes }
1308"#;
1309        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1310        let inv = m.inventory.as_ref().unwrap();
1311        assert!(inv.display[0].columns.is_none());
1312    }
1313}
1314
1315/// Periodic schedule (spec §2.4.3). v0.18.0 carries the fanout plan
1316/// (target + optional rollout + optional jitter) inline; the
1317/// referenced job (`job_id` → [`BUCKET_JOBS`]) supplies only the
1318/// script body. Two schedules of the same job can target different
1319/// groups on different cadences without copying the manifest.
1320///
1321/// #418 Phase 1: the cadence is the single [`When`] field. The old
1322/// `cron` × `mode` × `cooldown` × `auto_disable_when_done` quartet
1323/// is gone (no back-compat — pre-Phase-1 KV blobs fail to parse and
1324/// are warn-skipped; re-`schedule create` to upgrade them). The
1325/// engine underneath is unchanged: [`Schedule::lowered`] maps `when`
1326/// onto the same (cron, ExecMode, cooldown) trio the scheduler and
1327/// `decide_fire` always ran on.
1328#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
1329pub struct Schedule {
1330    pub id: String,
1331    /// When to fire — a reconcile cadence (`per_pc` / `per_target`)
1332    /// or the temporary raw-`cron` escape hatch. See [`When`].
1333    ///
1334    /// `singleton_map`: serde_yaml 0.9 renders externally-tagged
1335    /// enums as `!per_pc` YAML tags by default; this keeps the
1336    /// operator-facing map shape (`when: { per_pc: once }`). JSON
1337    /// output is identical either way, and the schemars schema
1338    /// (external tagging = oneOf of single-key objects) already
1339    /// matches the singleton-map wire shape.
1340    #[serde(with = "serde_yaml::with::singleton_map")]
1341    #[schemars(with = "When")]
1342    pub when: When,
1343    /// Key into [`crate::kv::BUCKET_JOBS`]. Must equal a registered
1344    /// Manifest's `id`.
1345    pub job_id: String,
1346    /// Who + how-to-phase + when-to-stagger. The Manifest doesn't
1347    /// carry these any more — same job + different fanout = different
1348    /// schedule.
1349    #[serde(flatten)]
1350    pub plan: FanoutPlan,
1351    /// Optional validity window. Outside `[from, until)` the
1352    /// schedule is dormant — still registered, still visible, but
1353    /// every tick is skipped (deleted ≠ dormant: a campaign that
1354    /// ended stays inspectable and can be re-armed by editing the
1355    /// window). Checked at tick time on both the backend scheduler
1356    /// and the agent's local scheduler.
1357    #[serde(default, skip_serializing_if = "Active::is_empty")]
1358    pub active: Active,
1359    /// v0.22: optional humantime window after a cron tick during
1360    /// which the Command is still considered "live". The scheduler
1361    /// computes `tick_at + starting_deadline` and stamps it onto
1362    /// each Command as `deadline_at`; agents skip Commands they
1363    /// receive after that absolute time. `None` (default) = no
1364    /// deadline, meaning a Command queued in the broker / stream
1365    /// during agent downtime runs whenever the agent reconnects —
1366    /// good for kitting / inventory / cleanup. Set this for
1367    /// time-of-day notifications, lunch reminders, etc., where
1368    /// "fire 3 hours late" would be wrong.
1369    #[serde(default, skip_serializing_if = "Option::is_none")]
1370    pub starting_deadline: Option<String>,
1371    /// v0.23: where does the cron tick happen? `Backend` (default,
1372    /// historical) = backend's scheduler fires Commands via NATS;
1373    /// agents passively receive. `Agent` = each targeted agent runs
1374    /// its own internal cron and fires locally, so the schedule
1375    /// keeps ticking even when the broker is unreachable (laptop on
1376    /// the train, broker maintenance window, full WAN outage). The
1377    /// two locations are mutually exclusive — when `Agent`, the
1378    /// backend scheduler stays out and just keeps the definition in
1379    /// KV for agents to read.
1380    #[serde(default)]
1381    pub runs_on: RunsOn,
1382    #[serde(default = "default_true")]
1383    pub enabled: bool,
1384}
1385
1386/// v0.23 — where the cron tick fires from.
1387#[derive(
1388    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
1389)]
1390#[serde(rename_all = "snake_case")]
1391pub enum RunsOn {
1392    /// Backend's central scheduler ticks and publishes Commands to
1393    /// NATS. Historical default, what every pre-v0.23 schedule
1394    /// uses. Agent offline ⇒ Command queued in STREAM_EXEC; agent
1395    /// reconnects ⇒ catch-up via [`command_replay`](crate)
1396    /// (see kanade-agent's command_replay module).
1397    #[default]
1398    Backend,
1399    /// Each targeted agent runs the cron tick locally. Survives
1400    /// broker / WAN outages. Best for laptops / mobile devices that
1401    /// roam off the corporate network. Agent must be online for the
1402    /// initial schedule + job-catalog pull, but once cached the
1403    /// agent fires the script standalone.
1404    Agent,
1405}
1406
1407/// Per-pc/per-target dedup semantics for a [`Schedule`] (v0.19).
1408#[derive(
1409    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
1410)]
1411#[serde(rename_all = "snake_case")]
1412pub enum ExecMode {
1413    /// Fire on every cron tick at the whole target. Historical
1414    /// (pre-v0.19) behavior; no dedup.
1415    #[default]
1416    EveryTick,
1417    /// Fire at each pc until that pc succeeds; then skip it until
1418    /// the optional cooldown elapses (or forever if no cooldown).
1419    /// Use for kitting / first-boot / per-pc compliance checks.
1420    OncePerPc,
1421    /// Fire at the whole target until **any** pc succeeds; then
1422    /// skip the whole target until the optional cooldown elapses
1423    /// (or forever if no cooldown). Use for "one delegate is
1424    /// enough" tasks like license check-in.
1425    OncePerTarget,
1426}
1427
1428/// #418 Phase 1 — the single "when does this fire" axis.
1429///
1430/// Replaces the old `cron` + `mode` + `cooldown` trio whose
1431/// interactions were implicit (cron doubled as both a real
1432/// time-of-day trigger and a reconcile poll period; contradictory
1433/// combinations silently no-opped). Two shapes:
1434///
1435/// * **reconcile** (`per_pc` / `per_target`) — desired-state: "each
1436///   pc (or one delegate) should have run this within `every`".
1437///   The poll period is system-generated ([`POLL_CRON`], every
1438///   minute) and no longer the operator's concern.
1439/// * **escape hatch** (`cron`) — verbatim 6-field cron, fires at
1440///   the whole target on every tick with no dedup (the old
1441///   `every_tick`). Temporary: exists only to keep time-of-day
1442///   schedules (morning-greeting) alive until the Phase 2
1443///   calendar form (`at` / `days` / `tz`) lands. New reconcile
1444///   work must NOT use it.
1445#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
1446#[serde(rename_all = "snake_case")]
1447pub enum When {
1448    /// Fire at each targeted pc: `once` (kitting — succeed once,
1449    /// skip forever, forever catching brand-new / re-imaged pcs)
1450    /// or `{ every: <humantime> }` (patrol — re-arm per pc after
1451    /// the interval).
1452    PerPc(PerPolicy),
1453    /// Fire until **any** one pc of the target succeeds, then skip
1454    /// the whole target (`once`) or re-arm after `every`. Needs
1455    /// fleet-wide completion data, so it is backend-only —
1456    /// `runs_on: agent` + `per_target` is rejected by
1457    /// [`Schedule::validate`].
1458    PerTarget(PerPolicy),
1459    /// Escape hatch: verbatim 6-field cron expression
1460    /// (`sec min hour day month day-of-week`), validated with the
1461    /// same parser `tokio-cron-scheduler` uses. Every tick fires
1462    /// the whole target — no dedup, no cooldown.
1463    Cron(String),
1464}
1465
1466/// `once` vs `{ every: <humantime> }` — shared by `per_pc` /
1467/// `per_target`. Untagged so the YAML stays the bare keyword or a
1468/// one-key map, nothing more ceremonial.
1469#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
1470#[serde(untagged)]
1471pub enum PerPolicy {
1472    /// The bare string `once`: succeed once, then skip permanently
1473    /// (cooldown = infinity).
1474    Once(OnceLiteral),
1475    /// Re-arm after the humantime interval, e.g. `{ every: 6h }`.
1476    Every(EverySpec),
1477}
1478
1479/// Single-variant enum so serde accepts exactly the string `once`
1480/// (a free-form `String` would swallow typos like `onec`).
1481#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
1482#[serde(rename_all = "snake_case")]
1483pub enum OnceLiteral {
1484    Once,
1485}
1486
1487/// `{ every: <humantime> }`. Standalone struct (not an inline
1488/// struct variant) so `deny_unknown_fields` still bites under the
1489/// untagged [`PerPolicy`] — `{ evry: 6h }` is a parse error, not a
1490/// silently-ignored key.
1491#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
1492#[serde(deny_unknown_fields)]
1493pub struct EverySpec {
1494    /// Humantime interval (`10m`, `6h`, `1d`...). Parsed lazily —
1495    /// [`Schedule::validate`] rejects garbage at create time.
1496    pub every: String,
1497}
1498
1499impl PerPolicy {
1500    /// The cooldown this policy lowers to: `once` = `None`
1501    /// (permanent skip), `every` = the interval.
1502    fn cooldown(&self) -> Option<String> {
1503        match self {
1504            PerPolicy::Once(_) => None,
1505            PerPolicy::Every(EverySpec { every }) => Some(every.clone()),
1506        }
1507    }
1508}
1509
1510impl std::fmt::Display for When {
1511    /// Operator-facing one-liner (`per_pc once` / `per_pc every 6h`
1512    /// / `cron: 0 0 9 * * mon-fri`) for log lines, audit payloads
1513    /// and the API's `ScheduleSummary`.
1514    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1515        let policy = |p: &PerPolicy| match p {
1516            PerPolicy::Once(_) => "once".to_string(),
1517            PerPolicy::Every(EverySpec { every }) => format!("every {every}"),
1518        };
1519        match self {
1520            When::PerPc(p) => write!(f, "per_pc {}", policy(p)),
1521            When::PerTarget(p) => write!(f, "per_target {}", policy(p)),
1522            When::Cron(expr) => write!(f, "cron: {expr}"),
1523        }
1524    }
1525}
1526
1527/// Optional validity window for a [`Schedule`] (#418 decision G).
1528/// Half-open `[from, until)`; either bound may be omitted. Bounds
1529/// are `YYYY-MM-DD` (= that day's 00:00 **UTC**) or full RFC3339
1530/// (use an offset for local-time precision). Kept as strings so the
1531/// JSON Schema the SPA editor consumes stays two plain string
1532/// fields, mirroring how `jitter` / `starting_deadline` are stored.
1533#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default, PartialEq, Eq)]
1534#[serde(deny_unknown_fields)]
1535pub struct Active {
1536    /// Dormant before this instant.
1537    #[serde(default, skip_serializing_if = "Option::is_none")]
1538    pub from: Option<String>,
1539    /// Dormant from this instant on (exclusive).
1540    #[serde(default, skip_serializing_if = "Option::is_none")]
1541    pub until: Option<String>,
1542}
1543
1544impl Active {
1545    /// `skip_serializing_if` helper — an empty window means "always
1546    /// active" and is omitted from the wire format entirely.
1547    pub fn is_empty(&self) -> bool {
1548        self.from.is_none() && self.until.is_none()
1549    }
1550
1551    /// Parse one bound: RFC3339 first, then bare `YYYY-MM-DD`
1552    /// (midnight UTC).
1553    pub fn parse_bound(s: &str) -> Result<chrono::DateTime<chrono::Utc>, String> {
1554        if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
1555            return Ok(dt.with_timezone(&chrono::Utc));
1556        }
1557        if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1558            let midnight = d.and_hms_opt(0, 0, 0).expect("00:00:00 is always valid");
1559            return Ok(chrono::DateTime::from_naive_utc_and_offset(
1560                midnight,
1561                chrono::Utc,
1562            ));
1563        }
1564        Err(format!(
1565            "active: unparseable bound '{s}' (want YYYY-MM-DD or RFC3339)"
1566        ))
1567    }
1568
1569    /// Is `now` inside the window? Unparseable bounds are treated
1570    /// as absent here (fail-open) — [`Schedule::validate`] is the
1571    /// place that rejects them loudly; this runs on every tick and
1572    /// must never panic on a stale KV blob.
1573    pub fn contains(&self, now: chrono::DateTime<chrono::Utc>) -> bool {
1574        let bound = |s: &Option<String>| s.as_deref().and_then(|s| Self::parse_bound(s).ok());
1575        if bound(&self.from).is_some_and(|from| now < from) {
1576            return false;
1577        }
1578        if bound(&self.until).is_some_and(|until| now >= until) {
1579            return false;
1580        }
1581        true
1582    }
1583}
1584
1585/// The system-generated poll cadence every reconcile-shaped `when`
1586/// lowers to. Operators never write this: the real inter-run
1587/// spacing is the `every` cooldown; this only bounds "how soon do
1588/// we notice somebody is due" (#418 decision B took the poll
1589/// period away from the operator).
1590pub const POLL_CRON: &str = "0 * * * * *";
1591
1592/// What a [`When`] lowers to — the exact (cron, mode, cooldown)
1593/// trio the pre-#418 engine ran on. Keeping the engine vocabulary
1594/// unchanged is what lets Phase 1 swap the operator surface without
1595/// touching the tick / dedup machinery.
1596pub struct Lowered {
1597    /// 6-field cron handed to `tokio-cron-scheduler` — [`POLL_CRON`]
1598    /// for reconcile shapes, verbatim for the escape hatch.
1599    pub cron: String,
1600    /// Dedup semantics for `decide_fire`.
1601    pub mode: ExecMode,
1602    /// Humantime re-arm interval (`None` = succeed once, skip
1603    /// forever).
1604    pub cooldown: Option<String>,
1605}
1606
1607impl Schedule {
1608    /// Lower the operator-facing `when` onto the engine vocabulary.
1609    /// Single seam shared by the backend scheduler and the agent's
1610    /// local scheduler so the two can never drift.
1611    pub fn lowered(&self) -> Lowered {
1612        match &self.when {
1613            When::PerPc(p) => Lowered {
1614                cron: POLL_CRON.into(),
1615                mode: ExecMode::OncePerPc,
1616                cooldown: p.cooldown(),
1617            },
1618            When::PerTarget(p) => Lowered {
1619                cron: POLL_CRON.into(),
1620                mode: ExecMode::OncePerTarget,
1621                cooldown: p.cooldown(),
1622            },
1623            When::Cron(expr) => Lowered {
1624                cron: expr.clone(),
1625                mode: ExecMode::EveryTick,
1626                cooldown: None,
1627            },
1628        }
1629    }
1630
1631    /// Cross-field semantic checks that don't fit pure serde derive
1632    /// — the [`Manifest::validate`] counterpart (#418 decision F;
1633    /// pre-Phase-1 a broken schedule was accepted at create time
1634    /// and silently warn-skipped at tick time). Run at every create
1635    /// site: `kanade schedule create` (client-side) and
1636    /// `POST /api/schedules`. The job_id-exists check lives in the
1637    /// API handler instead — it needs the JOBS KV.
1638    pub fn validate(&self) -> Result<(), String> {
1639        if matches!(self.runs_on, RunsOn::Agent) && matches!(self.when, When::PerTarget(_)) {
1640            return Err(
1641                "when.per_target needs fleet-wide completion data and is backend-only; \
1642                 it cannot be combined with runs_on: agent (each agent self-schedules, \
1643                 so per-target dedup would be deduping across a target of 1)"
1644                    .into(),
1645            );
1646        }
1647        if let Some(cd) = self.lowered().cooldown.as_deref() {
1648            humantime::parse_duration(cd)
1649                .map_err(|e| format!("when.every: invalid duration '{cd}': {e}"))?;
1650        }
1651        if let When::Cron(expr) = &self.when {
1652            // Same parser configuration tokio-cron-scheduler 0.15
1653            // uses internally (croner, seconds required, DOM-and-DOW
1654            // both honored) — create-time validation can never
1655            // accept what register() would reject.
1656            croner::parser::CronParser::builder()
1657                .seconds(croner::parser::Seconds::Required)
1658                .dom_and_dow(true)
1659                .build()
1660                .parse(expr)
1661                .map_err(|e| format!("when.cron: invalid 6-field cron '{expr}': {e}"))?;
1662        }
1663        // The other humantime strings on the schedule (claude #419
1664        // review): runtime degrades gracefully on both (bad jitter →
1665        // silent no-op, bad starting_deadline → warn + skipped tick),
1666        // but "rejected at create time" should cover every field the
1667        // operator can typo, not just `when`.
1668        if let Some(j) = &self.plan.jitter {
1669            humantime::parse_duration(j)
1670                .map_err(|e| format!("jitter: invalid duration '{j}': {e}"))?;
1671        }
1672        if let Some(sd) = &self.starting_deadline {
1673            humantime::parse_duration(sd)
1674                .map_err(|e| format!("starting_deadline: invalid duration '{sd}': {e}"))?;
1675        }
1676        let from = self
1677            .active
1678            .from
1679            .as_deref()
1680            .map(Active::parse_bound)
1681            .transpose()?;
1682        let until = self
1683            .active
1684            .until
1685            .as_deref()
1686            .map(Active::parse_bound)
1687            .transpose()?;
1688        if let (Some(f), Some(u)) = (from, until) {
1689            if f >= u {
1690                return Err(format!(
1691                    "active.from ({}) must be strictly before active.until ({})",
1692                    self.active.from.as_deref().unwrap_or_default(),
1693                    self.active.until.as_deref().unwrap_or_default(),
1694                ));
1695            }
1696        }
1697        Ok(())
1698    }
1699}
1700
1701fn default_true() -> bool {
1702    true
1703}