Skip to main content

kanade_shared/
manifest.rs

1use serde::{Deserialize, Serialize};
2
3use crate::wire::{RunAs, Shell, Staleness};
4
5/// YAML job manifest (= registered "what to run", v0.18.0+).
6///
7/// Owns only script-intrinsic fields. **Who** (`target`), **how to
8/// phase fanout** (`rollout`), and **when to stagger start**
9/// (`jitter`) all moved to the Schedule / exec request side — same
10/// script can now be fired against different targets / rollouts
11/// without copying the script body.
12///
13/// `deny_unknown_fields` makes operators copy-pasting an older yaml
14/// that still has `target:` / `rollout:` see a clear parse error at
15/// `kanade job create` time instead of mysteriously losing it.
16#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
17#[serde(deny_unknown_fields)]
18pub struct Manifest {
19    pub id: String,
20    pub version: String,
21    #[serde(default)]
22    pub description: Option<String>,
23    pub execute: Execute,
24    #[serde(default)]
25    pub require_approval: bool,
26    /// Opt-in marker that this job produces a JSON inventory fact
27    /// payload on stdout. When present, the backend's results
28    /// projector parses `ExecResult.stdout` as JSON and upserts an
29    /// `inventory_facts` row keyed by `(pc_id, manifest.id)`. The
30    /// `display` sub-config drives the SPA's Inventory page render.
31    #[serde(default)]
32    pub inventory: Option<InventoryHint>,
33    /// Issue #246: opt-in marker that this job emits per-line
34    /// observability events on stdout (one JSON `ObsEvent` per
35    /// newline). When present, the agent — after the script exits
36    /// successfully — parses each non-empty stdout line as an
37    /// `ObsEvent`, publishes it on `obs.<pc_id>` via the
38    /// `obs_outbox`, and (intentionally) **omits the stdout from
39    /// the `ExecResult`** so the timeline data doesn't double up
40    /// in `execution_results.stdout` (which would multiply rows
41    /// by ~50/day/PC of noise).
42    ///
43    /// Distinct from `inventory:` (single JSON object → projector
44    /// upsert) — events are append-only timeline points consumed
45    /// by the dedicated `obs_events` table.
46    #[serde(default)]
47    pub emit: Option<EmitConfig>,
48    /// v0.26: Layer 2 staleness policy (SPEC.md §2.6.2). Controls
49    /// what the agent does at fire time when it can't verify the
50    /// `script_current` / `script_status` KV values are fresh —
51    /// especially relevant for `runs_on: agent` schedules where
52    /// the agent may fire from cache while offline. Defaults to
53    /// `Staleness::Cached` (silently use cached values), which
54    /// matches every pre-v0.26 Manifest.
55    #[serde(default)]
56    pub staleness: Staleness,
57}
58
59/// "Who + how + when-to-stagger" — the fanout-plan side of an exec.
60/// Used both as the POST `/api/exec/{job_id}` body and as the embedded
61/// `target` / `rollout` / `jitter` slot on [`Schedule`]. Centralising
62/// here keeps the validation + serialisation logic in one place.
63#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default)]
64pub struct FanoutPlan {
65    #[serde(default)]
66    pub target: Target,
67    /// Optional wave rollout — when present, the backend publishes
68    /// each wave's group subject on its own delay schedule instead
69    /// of fanning out the `target` block in one go. `target` then
70    /// only labels the deploy for the audit log.
71    #[serde(default, skip_serializing_if = "Option::is_none")]
72    pub rollout: Option<Rollout>,
73    /// Optional humantime jitter; agent uses it to randomise
74    /// execution start. Lives here (not on the script) so different
75    /// schedules / ad-hoc fires of the same job can pick different
76    /// stagger windows.
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub jitter: Option<String>,
79    /// Absolute time the scheduler stamps on each emitted Command
80    /// when this exec was driven by a [`Schedule`] with
81    /// `starting_deadline`. Agents receiving a Command after this
82    /// instant publish a synthetic skipped-result instead of
83    /// running the script. `None` (default) = no deadline / catch
84    /// up whenever delivered. Operators don't usually set this
85    /// directly — the scheduler computes it from `tick_at +
86    /// starting_deadline`.
87    #[serde(default, skip_serializing_if = "Option::is_none")]
88    pub deadline_at: Option<chrono::DateTime<chrono::Utc>>,
89}
90
91/// Manifest sub-section: how the SPA should render the inventory
92/// facts this job produces. Each field name (`field`) is a top-level
93/// key in the stdout JSON, e.g. `hostname`, `ram_gb`.
94///
95/// Two render modes:
96///   * `display` — vertical "field / value" per PC, used by the
97///     `/inventory?pc=<id>` detail view. ALL columns the operator
98///     wants visible on the detail page.
99///   * `summary` — horizontal table across the fleet (row = PC,
100///     column = field) on `/inventory`. Optional; when omitted the
101///     SPA falls back to `display`, but operators usually want a
102///     trimmer "hostname / OS / CPU / RAM" set for the fleet view.
103#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
104pub struct InventoryHint {
105    /// Detail-view columns, in order.
106    pub display: Vec<DisplayField>,
107    /// Optional fleet-list columns (row = PC). Defaults to `display`
108    /// when omitted, but operators usually pick a 3-5 column subset.
109    #[serde(default, skip_serializing_if = "Option::is_none")]
110    pub summary: Option<Vec<DisplayField>>,
111    /// v0.31 / #40: payload arrays that should be exploded into
112    /// per-element rows of a derived SQLite table. Lets operators
113    /// answer cross-PC questions ("which PCs still have Chrome <
114    /// 120?", "C: >90% full") with normal SQL filters + indexes
115    /// instead of grepping JSON. The projector creates the derived
116    /// table on register and replaces this PC's rows on each result
117    /// (DELETE WHERE pc_id=? AND job_id=? + bulk INSERT). See
118    /// [`ExplodeSpec`] for the per-spec schema.
119    #[serde(default, skip_serializing_if = "Option::is_none")]
120    pub explode: Option<Vec<ExplodeSpec>>,
121    /// v0.35 / #93: top-level scalar fields whose changes the
122    /// projector logs to `inventory_history` (one event per
123    /// changed field per scan). Pairs with `explode[].track_history`
124    /// — that covers array elements; this covers single-valued
125    /// fields like `ram_bytes` / `os_version` / `cpu_model` /
126    /// `os_build` that operators want to track for "did the RAM
127    /// get upgraded?" / "when did Win 11 land on this PC?" /
128    /// "BIOS / firmware bumped?" questions. Field name = `field_path`
129    /// in the history row, `identity_json` is NULL, `before_json`
130    /// / `after_json` each carry `{"value": <prior or new value>}`.
131    /// First-ever observation of a scalar (no prior facts row)
132    /// emits `added`; subsequent value changes emit `changed`. No
133    /// `removed` events — a scalar disappearing from the payload
134    /// is rare and the operator can still see the last value via
135    /// the `before_json` of the most recent change.
136    #[serde(default, skip_serializing_if = "Option::is_none")]
137    pub history_scalars: Option<Vec<String>>,
138}
139
140/// Issue #246 — `emit:` manifest block for jobs whose stdout is
141/// NDJSON observability events (one `ObsEvent` per line). Parallel
142/// to `inventory:` but for the append-only timeline pipeline; see
143/// `Manifest::emit` for the full contract.
144#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
145#[serde(deny_unknown_fields)]
146pub struct EmitConfig {
147    /// What kind of payload the agent should expect on stdout. Only
148    /// `events` is defined today (parses each non-empty line as
149    /// `ObsEvent` and publishes on `obs.<pc_id>`); future variants
150    /// (e.g. metrics streams, structured trace events) plug in here.
151    #[serde(rename = "type")]
152    pub kind: EmitKind,
153    /// Operator hint for where the script keeps its own state — the
154    /// watermark file the PowerShell / sh body reads + writes
155    /// between runs so it only emits NEW events since the last
156    /// poll. The agent doesn't read this; it's documentation that
157    /// the SPA (and `kanade job edit`) can surface to operators
158    /// reviewing the manifest. Optional; the script is allowed to
159    /// keep state anywhere (registry, env, etc.) — the field's
160    /// presence makes the convention discoverable.
161    #[serde(default, skip_serializing_if = "Option::is_none")]
162    pub watermark_path: Option<String>,
163}
164
165/// `emit.type` enum. Lowercase serde so manifests read
166/// `type: events` rather than `Events`.
167#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
168#[serde(rename_all = "lowercase")]
169pub enum EmitKind {
170    /// Per-line `ObsEvent` JSON. Agent parses + publishes on
171    /// `obs.<pc_id>`, drops the stdout from the resulting
172    /// `ExecResult`.
173    Events,
174}
175
176/// v0.31 / #40: declarative "flatten this JSON array into a real
177/// SQLite table" spec on an inventory manifest. The projector
178/// creates the table on first registration (CREATE TABLE IF NOT
179/// EXISTS + indexes) and writes a row per element of
180/// `payload[field]` on every result, scoped by (pc_id, job_id) so
181/// each PC's rows replace cleanly without a per-PC schema.
182#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
183pub struct ExplodeSpec {
184    /// JSON array key under the payload to explode. E.g. `"apps"`
185    /// for `payload: { apps: [{...}, {...}] }`.
186    pub field: String,
187    /// Derived SQLite table name. Operators choose this — pick
188    /// something namespaced + stable (`inventory_sw_apps`, not
189    /// `apps`) so multiple inventory manifests don't collide on a
190    /// generic name.
191    pub table: String,
192    /// Element-level fields that uniquely identify a row inside one
193    /// PC's payload. The full PK is `(pc_id, job_id) + these
194    /// columns`. Required — operators must think about uniqueness
195    /// (e.g. `["name", "source"]` for installed apps because the
196    /// same name appears in multiple uninstall hives).
197    ///
198    /// v0.31 / #41: same tuple drives history identity. When
199    /// `track_history` is on, the projector serialises these
200    /// fields' values into `inventory_history.identity_json` for
201    /// every change event, so queries like "every PC that ever
202    /// installed Chrome (any source)" filter on identity_json
203    /// content without a per-manifest schema.
204    pub primary_key: Vec<String>,
205    /// Per-element fields that become columns in the derived table.
206    pub columns: Vec<ExplodeColumn>,
207    /// v0.31 / #41: when true (default false), the projector
208    /// diffs each PC's incoming payload against the prior rows
209    /// for the same (pc_id, job_id) BEFORE the DELETE-then-INSERT
210    /// replace, and writes added / removed / changed events into
211    /// `inventory_history`. Lets operators answer time-dimension
212    /// questions ("when did Chrome 120 first appear on PC X?",
213    /// "what's the Win 11 23H2 rollout curve") without storing
214    /// per-scan snapshots. Off by default so operators opt in
215    /// per-spec — history has a real storage cost on long-lived
216    /// deployments (mitigated by the 90-day default retention
217    /// sweeper, see `cleanup` module).
218    #[serde(default)]
219    pub track_history: bool,
220}
221
222/// One column in an [`ExplodeSpec`]'s derived table.
223#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
224pub struct ExplodeColumn {
225    /// JSON key under each array element. Becomes the column name
226    /// in the derived SQLite table — we don't rename.
227    pub field: String,
228    /// SQLite affinity: `"text"` (default), `"integer"`, `"real"`.
229    /// Storage maps directly via `sqlx::query.bind(...)`; type
230    /// mismatches at INSERT-time fail loudly rather than silently
231    /// dropping the row.
232    #[serde(default, skip_serializing_if = "Option::is_none")]
233    #[serde(rename = "type")]
234    pub kind: Option<String>,
235    /// When true, the projector creates a `CREATE INDEX` on this
236    /// column at table-creation time. Boost for the common-filter
237    /// columns (`name`, `version`) — operators mark them
238    /// explicitly, the projector won't guess.
239    #[serde(default)]
240    pub index: bool,
241}
242
243#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
244pub struct DisplayField {
245    /// Top-level key in the stdout JSON.
246    pub field: String,
247    /// Human-readable column header.
248    pub label: String,
249    /// Optional render hint — `"number"`, `"bytes"`, `"timestamp"`,
250    /// or `"table"` (#39). Defaults to plain text rendering on the
251    /// SPA side. `"table"` expects the field's value to be a JSON
252    /// array of objects and renders a nested sub-table on the
253    /// per-PC detail page using `columns` as the schema; the fleet
254    /// summary view falls back to showing the row count for
255    /// `"table"` cells so the wide list stays compact.
256    #[serde(default, skip_serializing_if = "Option::is_none")]
257    #[serde(rename = "type")]
258    pub kind: Option<String>,
259    /// v0.30 / #39: when `kind == "table"`, the SPA renders the
260    /// field's value (an array of objects like
261    /// `disks: [{ device_id, size_bytes, ... }]`) as a nested
262    /// sub-table using these columns. Each column is itself a
263    /// `DisplayField`, so the nested cells reuse the same render
264    /// hints (`bytes`, `number`, `timestamp`) — no parallel format
265    /// pipeline. Ignored for any other `kind`.
266    #[serde(default, skip_serializing_if = "Option::is_none")]
267    pub columns: Option<Vec<DisplayField>>,
268}
269
270#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
271pub struct Rollout {
272    #[serde(default)]
273    pub strategy: RolloutStrategy,
274    pub waves: Vec<Wave>,
275}
276
277#[derive(
278    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
279)]
280#[serde(rename_all = "lowercase")]
281pub enum RolloutStrategy {
282    #[default]
283    Wave,
284}
285
286#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
287pub struct Wave {
288    pub group: String,
289    /// humantime delay measured from the deploy's publish time. wave[0]
290    /// typically has "0s"; subsequent waves use minutes / hours.
291    pub delay: String,
292}
293
294#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default)]
295pub struct Target {
296    #[serde(default)]
297    pub groups: Vec<String>,
298    #[serde(default)]
299    pub pcs: Vec<String>,
300    #[serde(default)]
301    pub all: bool,
302}
303
304impl Target {
305    /// At least one of all / groups / pcs is set.
306    pub fn is_specified(&self) -> bool {
307        self.all || !self.groups.is_empty() || !self.pcs.is_empty()
308    }
309}
310
311#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
312#[serde(deny_unknown_fields)]
313pub struct Execute {
314    pub shell: ExecuteShell,
315    /// Inline script body. Mutually exclusive with [`script_file`]
316    /// and [`script_object`]; exactly one of the three must be set
317    /// (enforced by [`Execute::validate_script_source`] at the
318    /// write-side parse boundaries — `kanade job create` and
319    /// `POST /api/jobs`).
320    ///
321    /// Empty string is treated as **unset** so operators can swap
322    /// to a `script_file:` / `script_object:` alternative just by
323    /// commenting out the body, without having to also drop the
324    /// `script:` key entirely.
325    ///
326    /// [`script_file`]: Self::script_file
327    /// [`script_object`]: Self::script_object
328    #[serde(default, skip_serializing_if = "Option::is_none")]
329    pub script: Option<String>,
330    /// Repo-local file path resolved by the operator-side CLI at
331    /// `kanade job create` time. The CLI reads the file, slots its
332    /// contents into `script`, and clears this field before
333    /// POSTing — so the backend / agents never see `script_file`
334    /// in stored manifests. SPEC §2.4.1.
335    ///
336    /// Resolver lands in a follow-up PR
337    /// (yukimemi/kanade#210); today this field passes parse-time
338    /// validation but the operator-side CLI bails with "not yet
339    /// implemented" until the resolver ships, so manifests that
340    /// reach the backend with `script_file` set are treated as a
341    /// schema-bug.
342    #[serde(default, skip_serializing_if = "Option::is_none")]
343    pub script_file: Option<String>,
344    /// Object Store reference (`<name>/<version>`) into the
345    /// `scripts` bucket (`OBJECT_SCRIPTS`). Agents fetch the body
346    /// at Execute time via `/api/script-objects/{name}/{version}`
347    /// and cache it locally. SPEC §2.4.1.
348    ///
349    /// Resolver lands in the same follow-up PR as `script_file`;
350    /// today this field passes parse-time validation but the
351    /// backend / agent exec paths bail with "not yet implemented"
352    /// when they see it.
353    #[serde(default, skip_serializing_if = "Option::is_none")]
354    pub script_object: Option<String>,
355    /// humantime duration string (e.g. "30s", "10m"). Script-intrinsic
356    /// — represents how long this script reasonably takes to run.
357    pub timeout: String,
358    /// Token + session combination the agent uses to launch the
359    /// script (v0.21). Default = [`RunAs::System`] (Session 0,
360    /// LocalSystem privileges, no GUI) — matches pre-v0.21 behavior.
361    #[serde(default)]
362    pub run_as: RunAs,
363    /// Working directory for the spawned child (v0.21.1). When
364    /// unset, the child inherits the agent's cwd — on Windows that
365    /// means `%SystemRoot%\System32` for the prod service, which is
366    /// almost never what operators actually want. Use an absolute
367    /// path; relative paths are passed through to the OS verbatim.
368    /// `%PROGRAMDATA%` works for `run_as: system`; for `run_as: user`
369    /// you'd want `%USERPROFILE%` (but expansion happens in the
370    /// shell, so write `$env:USERPROFILE` for PowerShell, or set
371    /// it via teravars before `kanade job create`).
372    #[serde(default, skip_serializing_if = "Option::is_none")]
373    pub cwd: Option<String>,
374}
375
376impl Execute {
377    /// Treat an empty `script:` body as "intentionally unset". Operators
378    /// commenting out a block-scalar tend to leave the key behind, and
379    /// failing the validator on `script: ""` would surprise them.
380    fn has_inline_script(&self) -> bool {
381        matches!(&self.script, Some(s) if !s.is_empty())
382    }
383
384    /// Enforce that exactly one of `script` / `script_file` /
385    /// `script_object` is set. Called at the write-side parse
386    /// boundaries (CLI `kanade job create` + backend
387    /// `POST /api/jobs`) so ambiguous YAML is rejected before it
388    /// reaches the JOBS KV. Read paths (projector, agent
389    /// scheduler, list endpoints) skip this check — they only ever
390    /// see what the write path already validated.
391    pub fn validate_script_source(&self) -> Result<(), String> {
392        let inline = self.has_inline_script();
393        let file = self.script_file.is_some();
394        let obj = self.script_object.is_some();
395        let set = [inline, file, obj].into_iter().filter(|b| *b).count();
396        match set {
397            1 => Ok(()),
398            0 => Err("execute: one of `script`, `script_file`, `script_object` must be set".into()),
399            _ => Err(format!(
400                "execute: only one of `script` / `script_file` / `script_object` may be set \
401                 (got script={inline}, script_file={file}, script_object={obj})"
402            )),
403        }
404    }
405}
406
407impl Manifest {
408    /// Cross-field semantic checks that don't fit into pure serde
409    /// derive. Currently delegates to
410    /// [`Execute::validate_script_source`] — see that method's
411    /// docs for the rationale on which call sites should run this.
412    pub fn validate(&self) -> Result<(), String> {
413        self.execute.validate_script_source()?;
414        Ok(())
415    }
416}
417
418#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
419#[serde(rename_all = "lowercase")]
420pub enum ExecuteShell {
421    Powershell,
422    Cmd,
423}
424
425impl From<ExecuteShell> for Shell {
426    fn from(s: ExecuteShell) -> Self {
427        match s {
428            ExecuteShell::Powershell => Shell::Powershell,
429            ExecuteShell::Cmd => Shell::Cmd,
430        }
431    }
432}
433
434#[cfg(test)]
435mod tests {
436    use super::*;
437
438    #[test]
439    fn target_is_specified_requires_at_least_one_field() {
440        let empty = Target::default();
441        assert!(!empty.is_specified());
442
443        let with_all = Target {
444            all: true,
445            ..Target::default()
446        };
447        assert!(with_all.is_specified());
448
449        let with_groups = Target {
450            groups: vec!["canary".into()],
451            ..Target::default()
452        };
453        assert!(with_groups.is_specified());
454
455        let with_pcs = Target {
456            pcs: vec!["pc-01".into()],
457            ..Target::default()
458        };
459        assert!(with_pcs.is_specified());
460    }
461
462    #[test]
463    fn manifest_deserialises_minimal_yaml() {
464        // Matches jobs/echo-test.yaml. v0.18: no target/rollout/jitter
465        // — those live on the schedule / exec request now.
466        let yaml = r#"
467id: echo-test
468version: 0.0.1
469execute:
470  shell: powershell
471  script: "echo 'kanade'"
472  timeout: 30s
473"#;
474        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
475        assert_eq!(m.id, "echo-test");
476        assert_eq!(m.version, "0.0.1");
477        assert!(matches!(m.execute.shell, ExecuteShell::Powershell));
478        assert_eq!(
479            m.execute.script.as_deref().map(str::trim),
480            Some("echo 'kanade'")
481        );
482        assert!(m.execute.script_file.is_none());
483        assert!(m.execute.script_object.is_none());
484        assert_eq!(m.execute.timeout, "30s");
485        assert!(!m.require_approval);
486        m.validate()
487            .expect("inline-script manifest passes validation");
488    }
489
490    fn execute_with(
491        script: Option<&str>,
492        script_file: Option<&str>,
493        script_object: Option<&str>,
494    ) -> Execute {
495        Execute {
496            shell: ExecuteShell::Powershell,
497            script: script.map(str::to_owned),
498            script_file: script_file.map(str::to_owned),
499            script_object: script_object.map(str::to_owned),
500            timeout: "30s".into(),
501            run_as: RunAs::default(),
502            cwd: None,
503        }
504    }
505
506    #[test]
507    fn validate_accepts_inline_script() {
508        let e = execute_with(Some("echo hi"), None, None);
509        assert!(e.validate_script_source().is_ok());
510    }
511
512    #[test]
513    fn validate_accepts_script_file_alone() {
514        let e = execute_with(None, Some("scripts/cleanup.ps1"), None);
515        assert!(e.validate_script_source().is_ok());
516    }
517
518    #[test]
519    fn validate_accepts_script_object_alone() {
520        let e = execute_with(None, None, Some("cleanup/1.0.0"));
521        assert!(e.validate_script_source().is_ok());
522    }
523
524    #[test]
525    fn validate_treats_empty_inline_script_as_unset() {
526        // `script: ""` + `script_object` set is the natural shape
527        // when an operator comments out the YAML block-scalar body
528        // but leaves the key. Should pass.
529        let e = execute_with(Some(""), None, Some("cleanup/1.0.0"));
530        assert!(e.validate_script_source().is_ok());
531    }
532
533    #[test]
534    fn validate_rejects_zero_sources() {
535        let e = execute_with(None, None, None);
536        let err = e.validate_script_source().unwrap_err();
537        assert!(err.contains("must be set"), "got: {err}");
538    }
539
540    #[test]
541    fn validate_rejects_empty_inline_only() {
542        let e = execute_with(Some(""), None, None);
543        let err = e.validate_script_source().unwrap_err();
544        assert!(err.contains("must be set"), "got: {err}");
545    }
546
547    #[test]
548    fn validate_rejects_inline_plus_file() {
549        let e = execute_with(Some("echo hi"), Some("scripts/cleanup.ps1"), None);
550        let err = e.validate_script_source().unwrap_err();
551        assert!(err.contains("only one of"), "got: {err}");
552    }
553
554    #[test]
555    fn validate_rejects_inline_plus_object() {
556        let e = execute_with(Some("echo hi"), None, Some("cleanup/1.0.0"));
557        let err = e.validate_script_source().unwrap_err();
558        assert!(err.contains("only one of"), "got: {err}");
559    }
560
561    #[test]
562    fn validate_rejects_file_plus_object() {
563        let e = execute_with(None, Some("scripts/cleanup.ps1"), Some("cleanup/1.0.0"));
564        let err = e.validate_script_source().unwrap_err();
565        assert!(err.contains("only one of"), "got: {err}");
566    }
567
568    #[test]
569    fn validate_rejects_all_three() {
570        let e = execute_with(
571            Some("echo hi"),
572            Some("scripts/cleanup.ps1"),
573            Some("cleanup/1.0.0"),
574        );
575        let err = e.validate_script_source().unwrap_err();
576        assert!(err.contains("only one of"), "got: {err}");
577    }
578
579    #[test]
580    fn manifest_deserialises_script_object_yaml() {
581        // SPEC §2.4.1 example shape with the Object Store
582        // reference picked over inline.
583        let yaml = r#"
584id: cleanup-disk-temp
585version: 1.0.1
586execute:
587  shell: powershell
588  script_object: cleanup-disk-temp/1.0.1
589  timeout: 600s
590"#;
591        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
592        assert_eq!(
593            m.execute.script_object.as_deref(),
594            Some("cleanup-disk-temp/1.0.1")
595        );
596        assert!(m.execute.script.is_none());
597        m.validate()
598            .expect("script_object-only manifest passes validation");
599    }
600
601    #[test]
602    fn manifest_rejects_typo_in_script_field_name() {
603        // `deny_unknown_fields` on Execute catches `script_objectt`
604        // and similar fat-fingers at parse time instead of letting
605        // them silently fall through to "all three unset".
606        let yaml = r#"
607id: typo
608version: 1.0.0
609execute:
610  shell: powershell
611  script_objectt: oops
612  timeout: 30s
613"#;
614        let r: Result<Manifest, _> = serde_yaml::from_str(yaml);
615        assert!(r.is_err(), "expected parse error, got {r:?}");
616    }
617
618    #[test]
619    fn schedule_carries_target_and_rollout() {
620        let yaml = r#"
621id: hourly-cleanup-canary
622when:
623  per_pc: { every: 1h }
624job_id: cleanup
625enabled: true
626target:
627  groups: [canary, wave1]
628jitter: 30s
629rollout:
630  strategy: wave
631  waves:
632    - { group: canary, delay: 0s }
633    - { group: wave1,  delay: 5s }
634"#;
635        let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
636        assert_eq!(s.id, "hourly-cleanup-canary");
637        assert_eq!(s.job_id, "cleanup");
638        assert_eq!(s.plan.target.groups, vec!["canary", "wave1"]);
639        assert_eq!(s.plan.jitter.as_deref(), Some("30s"));
640        let rollout = s.plan.rollout.expect("rollout present");
641        assert_eq!(rollout.waves.len(), 2);
642        assert_eq!(rollout.waves[0].group, "canary");
643        assert_eq!(rollout.waves[1].delay, "5s");
644        assert_eq!(rollout.strategy, RolloutStrategy::Wave);
645    }
646
647    #[test]
648    fn schedule_minimal_target_all() {
649        let yaml = r#"
650id: kitting
651when:
652  per_pc: once
653enabled: true
654job_id: scheduled-echo
655target: { all: true }
656"#;
657        let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
658        assert_eq!(s.id, "kitting");
659        assert_eq!(s.when, When::PerPc(PerPolicy::Once(OnceLiteral::Once)));
660        assert!(s.enabled);
661        assert_eq!(s.job_id, "scheduled-echo");
662        assert!(s.plan.target.all);
663        assert!(s.plan.rollout.is_none());
664        assert!(s.plan.jitter.is_none());
665        assert!(s.active.is_empty());
666    }
667
668    #[test]
669    fn schedule_enabled_defaults_to_true() {
670        let yaml = r#"
671id: x
672when:
673  per_pc: once
674job_id: y
675target: { all: true }
676"#;
677        let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
678        assert!(s.enabled);
679    }
680
681    // ---- `when` parsing (#418 Phase 1) ----
682
683    fn schedule_yaml_with(when_block: &str) -> String {
684        format!(
685            r#"
686id: x
687when:
688{when_block}
689job_id: y
690target: {{ all: true }}
691"#
692        )
693    }
694
695    #[test]
696    fn when_per_pc_every_parses_unquoted_humantime() {
697        // `6h` is digit-led but non-numeric → YAML string, same as
698        // the old `cooldown: 6h` convention. No quotes needed.
699        let s: Schedule =
700            serde_yaml::from_str(&schedule_yaml_with("  per_pc: { every: 6h }")).expect("parse");
701        assert_eq!(
702            s.when,
703            When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() }))
704        );
705    }
706
707    #[test]
708    fn when_per_target_every_parses() {
709        let s: Schedule = serde_yaml::from_str(&schedule_yaml_with("  per_target: { every: 24h }"))
710            .expect("parse");
711        assert_eq!(
712            s.when,
713            When::PerTarget(PerPolicy::Every(EverySpec {
714                every: "24h".into()
715            }))
716        );
717    }
718
719    #[test]
720    fn when_per_target_once_parses() {
721        // Falls out of the shared PerPolicy shape and decide_fire
722        // already implements it ("any one pc succeeds → skip the
723        // target forever"), so it is allowed, not rejected.
724        let s: Schedule =
725            serde_yaml::from_str(&schedule_yaml_with("  per_target: once")).expect("parse");
726        assert_eq!(s.when, When::PerTarget(PerPolicy::Once(OnceLiteral::Once)));
727    }
728
729    #[test]
730    fn when_calendar_time_parses() {
731        let s: Schedule = serde_yaml::from_str(&schedule_yaml_with(
732            "  calendar:\n    at: \"09:00\"\n    days: [mon-fri]",
733        ))
734        .expect("parse");
735        match &s.when {
736            When::Calendar(c) => {
737                assert_eq!(c.at, "09:00");
738                assert_eq!(c.days, vec!["mon-fri"]);
739            }
740            other => panic!("expected calendar, got {other:?}"),
741        }
742    }
743
744    #[test]
745    fn when_calendar_days_default_empty() {
746        let s: Schedule =
747            serde_yaml::from_str(&schedule_yaml_with("  calendar:\n    at: \"09:00\""))
748                .expect("parse");
749        match &s.when {
750            When::Calendar(c) => assert!(c.days.is_empty(), "days defaults to empty (= daily)"),
751            other => panic!("expected calendar, got {other:?}"),
752        }
753    }
754
755    #[test]
756    fn when_calendar_datetime_parses_all_separators() {
757        // one-shot: date+time in hyphen / ISO-T / slash forms
758        for at in ["2026-06-10 09:00", "2026-06-10T09:00", "2026/06/10 09:00"] {
759            let block = format!("  calendar:\n    at: \"{at}\"");
760            let s: Schedule = serde_yaml::from_str(&schedule_yaml_with(&block))
761                .unwrap_or_else(|e| panic!("parse '{at}': {e}"));
762            match &s.when {
763                When::Calendar(c) => {
764                    use chrono::Datelike;
765                    let p = c.parse_at().expect("parse_at");
766                    let d = p.date.expect("datetime at carries a date");
767                    assert_eq!((d.year(), d.month(), d.day()), (2026, 6, 10), "for '{at}'");
768                }
769                other => panic!("expected calendar, got {other:?}"),
770            }
771        }
772    }
773
774    #[test]
775    fn when_rejects_bad_once_keyword() {
776        // `onec` must be a parse error, not a silently-absorbed
777        // string (OnceLiteral is a single-variant enum for exactly
778        // this reason).
779        let r: Result<Schedule, _> = serde_yaml::from_str(&schedule_yaml_with("  per_pc: onec"));
780        assert!(r.is_err(), "expected parse error, got {r:?}");
781    }
782
783    #[test]
784    fn when_rejects_unknown_key_in_every() {
785        // EverySpec is deny_unknown_fields so `evry:` typos fail
786        // even under the untagged PerPolicy.
787        let r: Result<Schedule, _> =
788            serde_yaml::from_str(&schedule_yaml_with("  per_pc: { evry: 6h }"));
789        assert!(r.is_err(), "expected parse error, got {r:?}");
790    }
791
792    #[test]
793    fn when_rejects_unknown_variant() {
794        let r: Result<Schedule, _> =
795            serde_yaml::from_str(&schedule_yaml_with("  per_galaxy: once"));
796        assert!(r.is_err(), "expected parse error, got {r:?}");
797    }
798
799    #[test]
800    fn when_rejects_old_top_level_cron_field() {
801        // Pre-#418 shape: top-level `cron:` + no `when:`. Must fail
802        // loudly (missing `when`), which is what turns stale KV
803        // blobs into warn-skips after the upgrade.
804        let yaml = r#"
805id: x
806cron: "* * * * * *"
807job_id: y
808target: { all: true }
809"#;
810        let r: Result<Schedule, _> = serde_yaml::from_str(yaml);
811        assert!(r.is_err(), "expected parse error, got {r:?}");
812    }
813
814    #[test]
815    fn when_rejects_retired_cron_escape_hatch() {
816        // #418 Phase 2 retired `when: { cron: "..." }`. A raw cron
817        // is now an unknown variant → parse error (operators use the
818        // calendar form instead).
819        let r: Result<Schedule, _> =
820            serde_yaml::from_str(&schedule_yaml_with("  cron: \"0 0 9 * * mon-fri\""));
821        assert!(
822            r.is_err(),
823            "expected parse error for retired cron, got {r:?}"
824        );
825    }
826
827    #[test]
828    fn when_round_trips_json_and_yaml() {
829        // Round-trip through the full Schedule: that is the wire
830        // unit for both stores (JSON catalog KV + YAML mirror), and
831        // it exercises the singleton_map field attribute that keeps
832        // serde_yaml on the map shape instead of `!per_pc` tags.
833        for when in [
834            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
835            When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
836            When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
837            When::PerTarget(PerPolicy::Every(EverySpec {
838                every: "24h".into(),
839            })),
840            calendar("09:00", &["mon-fri"]),
841            calendar("2026-06-10 09:00", &[]),
842        ] {
843            let s = schedule_with(when.clone(), RunsOn::Backend);
844
845            let json = serde_json::to_string(&s).expect("json serialise");
846            let back: Schedule = serde_json::from_str(&json).expect("json deserialise");
847            assert_eq!(back.when, when, "json round-trip for {when}");
848
849            let yaml = serde_yaml::to_string(&s).expect("yaml serialise");
850            assert!(
851                !yaml.contains('!'),
852                "yaml must use the map shape, not tags: {yaml}"
853            );
854            let back: Schedule = serde_yaml::from_str(&yaml).expect("yaml deserialise");
855            assert_eq!(back.when, when, "yaml round-trip for {when}");
856        }
857    }
858
859    #[test]
860    fn when_once_serialises_as_bare_keyword() {
861        // The wire shape operators see in the YAML mirror must stay
862        // the ergonomic `per_pc: once`, not a one-variant map.
863        let json = serde_json::to_value(When::PerPc(PerPolicy::Once(OnceLiteral::Once)))
864            .expect("serialise");
865        assert_eq!(json, serde_json::json!({ "per_pc": "once" }));
866    }
867
868    #[test]
869    fn when_displays_operator_summary() {
870        for (when, expected) in [
871            (
872                When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
873                "per_pc once",
874            ),
875            (
876                When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
877                "per_pc every 6h",
878            ),
879            (
880                When::PerTarget(PerPolicy::Every(EverySpec {
881                    every: "24h".into(),
882                })),
883                "per_target every 24h",
884            ),
885            (calendar("09:00", &["mon-fri"]), "at 09:00 [mon-fri]"),
886            (calendar("2026-06-10 09:00", &[]), "at 2026-06-10 09:00"),
887        ] {
888            assert_eq!(when.to_string(), expected);
889        }
890    }
891
892    // ---- lowering (#418: when → engine vocabulary) ----
893
894    fn schedule_with(when: When, runs_on: RunsOn) -> Schedule {
895        Schedule {
896            id: "x".into(),
897            when,
898            job_id: "y".into(),
899            plan: FanoutPlan::default(),
900            active: Active::default(),
901            tz: ScheduleTz::default(),
902            starting_deadline: None,
903            runs_on,
904            enabled: true,
905        }
906    }
907
908    fn calendar(at: &str, days: &[&str]) -> When {
909        When::Calendar(CalendarSpec {
910            at: at.into(),
911            days: days.iter().map(|d| (*d).to_string()).collect(),
912        })
913    }
914
915    #[test]
916    fn lowering_matches_the_418_table() {
917        let cases = [
918            (
919                When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
920                (POLL_CRON, ExecMode::OncePerPc, None),
921            ),
922            (
923                When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
924                (POLL_CRON, ExecMode::OncePerPc, Some("6h")),
925            ),
926            (
927                When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
928                (POLL_CRON, ExecMode::OncePerTarget, None),
929            ),
930            (
931                When::PerTarget(PerPolicy::Every(EverySpec {
932                    every: "24h".into(),
933                })),
934                (POLL_CRON, ExecMode::OncePerTarget, Some("24h")),
935            ),
936            // calendar repeating → 6-field cron
937            (
938                calendar("09:00", &["mon-fri"]),
939                ("0 0 9 * * mon-fri", ExecMode::EveryTick, None),
940            ),
941            // calendar daily (no days) → DOW *
942            (
943                calendar("18:30", &[]),
944                ("0 30 18 * * *", ExecMode::EveryTick, None),
945            ),
946            // calendar one-shot → 7-field year cron
947            (
948                calendar("2026-06-10 09:00", &[]),
949                ("0 0 9 10 6 * 2026", ExecMode::EveryTick, None),
950            ),
951        ];
952        for (when, (cron, mode, cooldown)) in cases {
953            let l = schedule_with(when.clone(), RunsOn::Backend).lowered();
954            assert_eq!(l.cron, cron, "cron for {when}");
955            assert_eq!(l.mode, mode, "mode for {when}");
956            assert_eq!(l.cooldown.as_deref(), cooldown, "cooldown for {when}");
957        }
958    }
959
960    #[test]
961    fn lowered_carries_schedule_tz() {
962        for (tz, want) in [
963            (ScheduleTz::Local, ScheduleTz::Local),
964            (ScheduleTz::Utc, ScheduleTz::Utc),
965        ] {
966            let mut s = schedule_with(calendar("09:00", &["mon-fri"]), RunsOn::Backend);
967            s.tz = tz;
968            assert_eq!(s.lowered().tz, want, "calendar carries tz");
969            // reconcile shapes carry tz too (for the active-window check)
970            let mut s = schedule_with(
971                When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
972                RunsOn::Backend,
973            );
974            s.tz = tz;
975            assert_eq!(s.lowered().tz, want, "reconcile carries tz");
976        }
977    }
978
979    #[test]
980    fn poll_cron_is_accepted_by_the_engine_parser() {
981        // POLL_CRON is system-generated — if the engine's parser
982        // ever rejected it every reconcile schedule would die at
983        // register time. Validate it with the same croner config
984        // (Seconds::Required, dom_and_dow, year optional).
985        croner::parser::CronParser::builder()
986            .seconds(croner::parser::Seconds::Required)
987            .dom_and_dow(true)
988            .build()
989            .parse(POLL_CRON)
990            .expect("POLL_CRON must parse");
991    }
992
993    // ---- Schedule::validate() (#418 decision F) ----
994
995    #[test]
996    fn validate_accepts_reconcile_shapes() {
997        for when in [
998            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
999            When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
1000            When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
1001            When::PerTarget(PerPolicy::Every(EverySpec {
1002                every: "24h".into(),
1003            })),
1004        ] {
1005            schedule_with(when.clone(), RunsOn::Backend)
1006                .validate()
1007                .unwrap_or_else(|e| panic!("{when} should validate: {e}"));
1008        }
1009    }
1010
1011    #[test]
1012    fn validate_accepts_per_pc_on_agent() {
1013        schedule_with(
1014            When::PerPc(PerPolicy::Every(EverySpec { every: "1h".into() })),
1015            RunsOn::Agent,
1016        )
1017        .validate()
1018        .expect("per_pc + agent is the offline-inventory shape");
1019    }
1020
1021    #[test]
1022    fn validate_rejects_per_target_on_agent() {
1023        let err = schedule_with(
1024            When::PerTarget(PerPolicy::Every(EverySpec {
1025                every: "24h".into(),
1026            })),
1027            RunsOn::Agent,
1028        )
1029        .validate()
1030        .unwrap_err();
1031        assert!(err.contains("per_target"), "got: {err}");
1032        assert!(err.contains("runs_on: agent"), "got: {err}");
1033
1034        // per_target: once is also backend-only.
1035        let err = schedule_with(
1036            When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
1037            RunsOn::Agent,
1038        )
1039        .validate()
1040        .unwrap_err();
1041        assert!(err.contains("per_target"), "got (once): {err}");
1042        assert!(err.contains("runs_on: agent"), "got (once): {err}");
1043    }
1044
1045    #[test]
1046    fn validate_rejects_bad_every_duration() {
1047        let err = schedule_with(
1048            When::PerPc(PerPolicy::Every(EverySpec { every: "6x".into() })),
1049            RunsOn::Backend,
1050        )
1051        .validate()
1052        .unwrap_err();
1053        assert!(err.contains("when.every"), "got: {err}");
1054    }
1055
1056    #[test]
1057    fn validate_rejects_bad_jitter_and_starting_deadline() {
1058        let mut s = schedule_with(
1059            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1060            RunsOn::Backend,
1061        );
1062        s.plan.jitter = Some("5x".into());
1063        let err = s.validate().unwrap_err();
1064        assert!(err.contains("jitter"), "got: {err}");
1065
1066        let mut s = schedule_with(
1067            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1068            RunsOn::Backend,
1069        );
1070        s.starting_deadline = Some("soon".into());
1071        let err = s.validate().unwrap_err();
1072        assert!(err.contains("starting_deadline"), "got: {err}");
1073    }
1074
1075    #[test]
1076    fn validate_accepts_calendar_shapes() {
1077        for when in [
1078            calendar("09:00", &["mon-fri"]),   // weekday morning
1079            calendar("00:00", &["sun"]),       // weekly
1080            calendar("18:30", &[]),            // daily
1081            calendar("2026-06-10 09:00", &[]), // one-shot
1082            calendar("2026/12/25 00:00", &[]), // one-shot, slash form
1083        ] {
1084            schedule_with(when.clone(), RunsOn::Backend)
1085                .validate()
1086                .unwrap_or_else(|e| panic!("{when} should validate: {e}"));
1087        }
1088    }
1089
1090    #[test]
1091    fn validate_rejects_bad_at() {
1092        for bad in ["25:00", "09:60", "9", "noon", "2026-13-01 09:00"] {
1093            let err = schedule_with(calendar(bad, &[]), RunsOn::Backend)
1094                .validate()
1095                .unwrap_err();
1096            assert!(err.contains("when.at"), "for '{bad}', got: {err}");
1097        }
1098    }
1099
1100    #[test]
1101    fn validate_rejects_datetime_at_with_days() {
1102        // A dated `at` is a one-shot — pairing it with days is a
1103        // contradiction (the date already pins the day).
1104        let err = schedule_with(calendar("2026-06-10 09:00", &["mon"]), RunsOn::Backend)
1105            .validate()
1106            .unwrap_err();
1107        assert!(
1108            err.contains("one-shot") && err.contains("days"),
1109            "got: {err}"
1110        );
1111    }
1112
1113    #[test]
1114    fn validate_rejects_bad_day_name() {
1115        // A garbage DOW token is caught by the days pre-flight and
1116        // reported against `when.days`, not the confusing
1117        // "when.at lowered to invalid cron" (claude #432 review).
1118        let err = schedule_with(calendar("09:00", &["funday"]), RunsOn::Backend)
1119            .validate()
1120            .unwrap_err();
1121        assert!(err.contains("when.days"), "got: {err}");
1122        assert!(err.contains("funday"), "names the bad token: {err}");
1123        // a degenerate range like `mon-` reports the whole token, not
1124        // a cryptic empty part (claude #432 follow-up)
1125        let err = schedule_with(calendar("09:00", &["mon-"]), RunsOn::Backend)
1126            .validate()
1127            .unwrap_err();
1128        assert!(err.contains("'mon-'"), "names the whole token: {err}");
1129        // valid names / ranges / numeric / * all pass
1130        for ok in [
1131            calendar("09:00", &["mon-fri"]),
1132            calendar("09:00", &["mon", "wed", "sun"]),
1133            calendar("09:00", &["1-5"]),
1134        ] {
1135            schedule_with(ok.clone(), RunsOn::Backend)
1136                .validate()
1137                .unwrap_or_else(|e| panic!("{ok} should validate: {e}"));
1138        }
1139    }
1140
1141    #[test]
1142    fn calendar_oneshot_instant_detects_past() {
1143        use chrono::TimeZone;
1144        // a dated `at` resolves to an absolute instant…
1145        let c = CalendarSpec {
1146            at: "2024-01-01 09:00".into(),
1147            days: vec![],
1148        };
1149        let t = c
1150            .oneshot_instant(ScheduleTz::Utc)
1151            .expect("one-shot instant");
1152        assert_eq!(
1153            t,
1154            chrono::Utc.with_ymd_and_hms(2024, 1, 1, 9, 0, 0).unwrap()
1155        );
1156        assert!(t < chrono::Utc::now(), "2024 is in the past");
1157        // …while a repeating (time-only) calendar has no instant
1158        let rep = CalendarSpec {
1159            at: "09:00".into(),
1160            days: vec!["mon-fri".into()],
1161        };
1162        assert!(rep.oneshot_instant(ScheduleTz::Utc).is_none());
1163    }
1164
1165    fn schedule_with_active(from: Option<&str>, until: Option<&str>) -> Schedule {
1166        let mut s = schedule_with(
1167            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1168            RunsOn::Backend,
1169        );
1170        s.active = Active {
1171            from: from.map(str::to_owned),
1172            until: until.map(str::to_owned),
1173        };
1174        s
1175    }
1176
1177    #[test]
1178    fn validate_accepts_active_window() {
1179        schedule_with_active(Some("2026-07-01"), Some("2026-08-01T12:00:00+09:00"))
1180            .validate()
1181            .expect("date + rfc3339 bounds should validate");
1182    }
1183
1184    #[test]
1185    fn validate_rejects_unparseable_active_bound() {
1186        let err = schedule_with_active(Some("July 1st"), None)
1187            .validate()
1188            .unwrap_err();
1189        assert!(err.contains("active"), "got: {err}");
1190    }
1191
1192    #[test]
1193    fn validate_rejects_from_not_before_until() {
1194        let err = schedule_with_active(Some("2026-08-01"), Some("2026-07-01"))
1195            .validate()
1196            .unwrap_err();
1197        assert!(err.contains("strictly before"), "got: {err}");
1198
1199        let err = schedule_with_active(Some("2026-07-01"), Some("2026-07-01"))
1200            .validate()
1201            .unwrap_err();
1202        assert!(err.contains("strictly before"), "got: {err}");
1203    }
1204
1205    // ---- Active window semantics ----
1206
1207    #[test]
1208    fn active_window_is_half_open() {
1209        use chrono::TimeZone;
1210        let active = Active {
1211            from: Some("2026-07-01".into()),
1212            until: Some("2026-08-01".into()),
1213        };
1214        // UTC tz so the date bounds are UTC midnight.
1215        let at = |y, m, d, h| chrono::Utc.with_ymd_and_hms(y, m, d, h, 0, 0).unwrap();
1216        let c = |t| active.contains(t, ScheduleTz::Utc);
1217        assert!(!c(at(2026, 6, 30, 23)), "before from");
1218        assert!(c(at(2026, 7, 1, 0)), "at from (inclusive)");
1219        assert!(c(at(2026, 7, 15, 12)), "inside");
1220        assert!(!c(at(2026, 8, 1, 0)), "at until (exclusive)");
1221        assert!(!c(at(2026, 8, 2, 0)), "after until");
1222    }
1223
1224    #[test]
1225    fn active_empty_window_is_always_active() {
1226        assert!(Active::default().contains(chrono::Utc::now(), ScheduleTz::Local));
1227    }
1228
1229    #[test]
1230    fn active_rfc3339_bound_honours_offset_regardless_of_tz() {
1231        use chrono::TimeZone;
1232        let active = Active {
1233            from: Some("2026-07-01T09:00:00+09:00".into()),
1234            until: None,
1235        };
1236        // RFC3339 carries its own offset → tz arg is ignored.
1237        // 09:00 JST = 00:00 UTC.
1238        for tz in [ScheduleTz::Utc, ScheduleTz::Local] {
1239            assert!(
1240                !active.contains(
1241                    chrono::Utc
1242                        .with_ymd_and_hms(2026, 6, 30, 23, 59, 0)
1243                        .unwrap(),
1244                    tz
1245                )
1246            );
1247            assert!(active.contains(
1248                chrono::Utc.with_ymd_and_hms(2026, 7, 1, 0, 0, 0).unwrap(),
1249                tz
1250            ));
1251        }
1252    }
1253
1254    #[test]
1255    fn active_date_bound_respects_tz() {
1256        // A bare `YYYY-MM-DD` bound is midnight *in the schedule's
1257        // tz* (#418 Phase 2). The UTC interpretation is exact and
1258        // host-independent; assert that precisely.
1259        use chrono::TimeZone;
1260        let utc = Active::parse_bound("2026-07-01", ScheduleTz::Utc).expect("utc");
1261        assert_eq!(
1262            utc,
1263            chrono::Utc.with_ymd_and_hms(2026, 7, 1, 0, 0, 0).unwrap()
1264        );
1265
1266        // The local interpretation must equal what chrono::Local
1267        // computes for the same wall-clock midnight — proves the tz
1268        // path is wired to the host zone (the magnitude vs UTC is
1269        // host-dependent, so we compare against Local directly rather
1270        // than hard-coding the JST offset, keeping CI green on UTC
1271        // runners).
1272        let local = Active::parse_bound("2026-07-01", ScheduleTz::Local).expect("local");
1273        let want = chrono::Local
1274            .with_ymd_and_hms(2026, 7, 1, 0, 0, 0)
1275            .single()
1276            .expect("local midnight is unambiguous")
1277            .with_timezone(&chrono::Utc);
1278        assert_eq!(local, want, "date bound resolved in host-local tz");
1279    }
1280
1281    #[test]
1282    fn active_empty_is_skipped_when_serialising() {
1283        let s = schedule_with(
1284            When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1285            RunsOn::Backend,
1286        );
1287        let json = serde_json::to_value(&s).expect("serialise");
1288        assert!(
1289            json.get("active").is_none(),
1290            "empty active must not appear on the wire: {json}"
1291        );
1292    }
1293
1294    #[test]
1295    fn shipped_schedule_configs_parse_and_validate() {
1296        // Every YAML under configs/schedules/ must parse with the
1297        // current Schedule serde AND pass validate() — keeps the
1298        // shipped examples from drifting out of sync with the model
1299        // (#418 removed back-compat, so drift = broken at create).
1300        let dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("../../configs/schedules");
1301        let mut seen = 0;
1302        for entry in std::fs::read_dir(&dir).expect("read configs/schedules") {
1303            let path = entry.expect("dir entry").path();
1304            if path.extension().and_then(|e| e.to_str()) != Some("yaml") {
1305                continue;
1306            }
1307            let body = std::fs::read_to_string(&path).expect("read yaml");
1308            let s: Schedule = serde_yaml::from_str(&body)
1309                .unwrap_or_else(|e| panic!("{} failed to parse: {e}", path.display()));
1310            s.validate()
1311                .unwrap_or_else(|e| panic!("{} failed validate(): {e}", path.display()));
1312            seen += 1;
1313        }
1314        assert!(seen > 0, "no schedule YAMLs found in {}", dir.display());
1315    }
1316
1317    // ---- pre-existing enum wire formats (unchanged by #418) ----
1318
1319    #[test]
1320    fn exec_mode_serialises_snake_case() {
1321        for (mode, expected) in [
1322            (ExecMode::EveryTick, "every_tick"),
1323            (ExecMode::OncePerPc, "once_per_pc"),
1324            (ExecMode::OncePerTarget, "once_per_target"),
1325        ] {
1326            let s = serde_json::to_value(mode).expect("serialise");
1327            assert_eq!(s, serde_json::Value::String(expected.into()));
1328            let back: ExecMode = serde_json::from_value(serde_json::Value::String(expected.into()))
1329                .expect("deserialise");
1330            assert_eq!(back, mode, "round-trip for {expected}");
1331        }
1332    }
1333
1334    #[test]
1335    fn schedule_runs_on_defaults_to_backend() {
1336        let yaml = r#"
1337id: x
1338when:
1339  per_pc: once
1340job_id: y
1341target: { all: true }
1342"#;
1343        let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
1344        assert_eq!(s.runs_on, RunsOn::Backend);
1345    }
1346
1347    #[test]
1348    fn schedule_runs_on_agent_parses() {
1349        let yaml = r#"
1350id: offline-inv
1351when:
1352  per_pc: { every: 1h }
1353job_id: inventory-hw
1354target: { all: true }
1355runs_on: agent
1356"#;
1357        let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
1358        assert_eq!(s.runs_on, RunsOn::Agent);
1359        assert_eq!(s.lowered().mode, ExecMode::OncePerPc);
1360    }
1361
1362    #[test]
1363    fn runs_on_serialises_snake_case() {
1364        for (mode, expected) in [(RunsOn::Backend, "backend"), (RunsOn::Agent, "agent")] {
1365            let s = serde_json::to_value(mode).expect("serialise");
1366            assert_eq!(s, serde_json::Value::String(expected.into()));
1367            let back: RunsOn = serde_json::from_value(serde_json::Value::String(expected.into()))
1368                .expect("deserialise");
1369            assert_eq!(back, mode);
1370        }
1371    }
1372
1373    #[test]
1374    fn execute_shell_into_wire_shell() {
1375        assert_eq!(Shell::from(ExecuteShell::Powershell), Shell::Powershell);
1376        assert_eq!(Shell::from(ExecuteShell::Cmd), Shell::Cmd);
1377    }
1378
1379    #[test]
1380    fn manifest_staleness_defaults_to_cached() {
1381        let yaml = r#"
1382id: x
1383version: 1.0.0
1384execute:
1385  shell: powershell
1386  script: "echo"
1387  timeout: 1s
1388"#;
1389        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1390        assert_eq!(m.staleness, Staleness::Cached);
1391    }
1392
1393    #[test]
1394    fn manifest_strict_staleness_parses() {
1395        let yaml = r#"
1396id: urgent-patch
1397version: 2.5.1
1398execute:
1399  shell: powershell
1400  script: Install-Hotfix
1401  timeout: 5m
1402staleness:
1403  mode: strict
1404  max_cache_age: 0s
1405"#;
1406        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1407        match m.staleness {
1408            Staleness::Strict { max_cache_age } => assert_eq!(max_cache_age, "0s"),
1409            other => panic!("expected strict, got {other:?}"),
1410        }
1411    }
1412
1413    #[test]
1414    fn manifest_unchecked_staleness_parses() {
1415        let yaml = r#"
1416id: legacy
1417version: 0.1.0
1418execute:
1419  shell: cmd
1420  script: "echo"
1421  timeout: 1s
1422staleness:
1423  mode: unchecked
1424"#;
1425        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1426        assert_eq!(m.staleness, Staleness::Unchecked);
1427    }
1428
1429    #[test]
1430    fn missing_required_field_errors() {
1431        // `id` missing.
1432        let yaml = r#"
1433version: 1.0.0
1434target: { all: true }
1435execute:
1436  shell: powershell
1437  script: "echo"
1438  timeout: 1s
1439"#;
1440        let r: Result<Manifest, _> = serde_yaml::from_str(yaml);
1441        assert!(r.is_err(), "expected error, got {:?}", r);
1442    }
1443
1444    #[test]
1445    fn display_field_table_kind_round_trips_with_nested_columns() {
1446        // #39: `type: table` + `columns:` on a DisplayField gets
1447        // round-tripped through serde so the SPA receives the
1448        // nested schema verbatim. Nested columns themselves are
1449        // DisplayFields so they can carry `type: bytes` /
1450        // `type: number` for cell formatting.
1451        let yaml = r#"
1452id: inv-hw
1453version: 1.0.0
1454execute:
1455  shell: powershell
1456  script: "echo"
1457  timeout: 60s
1458inventory:
1459  display:
1460    - field: hostname
1461      label: Hostname
1462    - field: disks
1463      label: Disks
1464      type: table
1465      columns:
1466        - field: device_id
1467          label: Drive
1468        - field: size_bytes
1469          label: Size
1470          type: bytes
1471        - field: free_bytes
1472          label: Free
1473          type: bytes
1474        - field: file_system
1475          label: FS
1476"#;
1477        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1478        let inv = m.inventory.as_ref().expect("inventory hint");
1479        let disks = inv
1480            .display
1481            .iter()
1482            .find(|d| d.field == "disks")
1483            .expect("disks display row");
1484        assert_eq!(disks.kind.as_deref(), Some("table"));
1485        let cols = disks.columns.as_ref().expect("table needs columns");
1486        assert_eq!(cols.len(), 4);
1487        assert_eq!(cols[1].field, "size_bytes");
1488        assert_eq!(cols[1].kind.as_deref(), Some("bytes"));
1489    }
1490
1491    #[test]
1492    fn display_field_scalar_kind_keeps_columns_none() {
1493        // Defensive: when type is a scalar (`bytes` / `number` /
1494        // `timestamp`) the `columns` field stays None — the SPA
1495        // uses its presence as the "render nested table" signal,
1496        // so it must not leak in via serde defaults.
1497        let yaml = r#"
1498id: x
1499version: 1.0.0
1500execute:
1501  shell: powershell
1502  script: "echo"
1503  timeout: 5s
1504inventory:
1505  display:
1506    - { field: ram_bytes, label: RAM, type: bytes }
1507"#;
1508        let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1509        let inv = m.inventory.as_ref().unwrap();
1510        assert!(inv.display[0].columns.is_none());
1511    }
1512}
1513
1514/// Periodic schedule (spec §2.4.3). v0.18.0 carries the fanout plan
1515/// (target + optional rollout + optional jitter) inline; the
1516/// referenced job (`job_id` → [`BUCKET_JOBS`]) supplies only the
1517/// script body. Two schedules of the same job can target different
1518/// groups on different cadences without copying the manifest.
1519///
1520/// #418 Phase 1: the cadence is the single [`When`] field. The old
1521/// `cron` × `mode` × `cooldown` × `auto_disable_when_done` quartet
1522/// is gone (no back-compat — pre-Phase-1 KV blobs fail to parse and
1523/// are warn-skipped; re-`schedule create` to upgrade them). The
1524/// engine underneath is unchanged: [`Schedule::lowered`] maps `when`
1525/// onto the same (cron, ExecMode, cooldown) trio the scheduler and
1526/// `decide_fire` always ran on.
1527#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
1528pub struct Schedule {
1529    pub id: String,
1530    /// When to fire — a reconcile cadence (`per_pc` / `per_target`)
1531    /// or a calendar time trigger (`at` / `days`). See [`When`].
1532    ///
1533    /// `singleton_map`: serde_yaml 0.9 renders externally-tagged
1534    /// enums as `!per_pc` YAML tags by default; this keeps the
1535    /// operator-facing map shape (`when: { per_pc: once }`). JSON
1536    /// output is identical either way, and the schemars schema
1537    /// (external tagging = oneOf of single-key objects) already
1538    /// matches the singleton-map wire shape.
1539    #[serde(with = "serde_yaml::with::singleton_map")]
1540    #[schemars(with = "When")]
1541    pub when: When,
1542    /// Key into [`crate::kv::BUCKET_JOBS`]. Must equal a registered
1543    /// Manifest's `id`.
1544    pub job_id: String,
1545    /// Who + how-to-phase + when-to-stagger. The Manifest doesn't
1546    /// carry these any more — same job + different fanout = different
1547    /// schedule.
1548    #[serde(flatten)]
1549    pub plan: FanoutPlan,
1550    /// Optional validity window. Outside `[from, until)` the
1551    /// schedule is dormant — still registered, still visible, but
1552    /// every tick is skipped (deleted ≠ dormant: a campaign that
1553    /// ended stays inspectable and can be re-armed by editing the
1554    /// window). Checked at tick time on both the backend scheduler
1555    /// and the agent's local scheduler.
1556    #[serde(default, skip_serializing_if = "Active::is_empty")]
1557    pub active: Active,
1558    /// #418 Phase 2: the timezone this schedule's wall-clock fields
1559    /// are evaluated in — both the calendar `at` firing time AND the
1560    /// `active.{from,until}` window bounds. `local` (default) = the
1561    /// running host's TZ (the agent's for `runs_on: agent`, the
1562    /// backend server's otherwise); `utc` for TZ-independent
1563    /// schedules. Reconcile shapes (`per_pc`/`per_target`) ignore it
1564    /// for firing (poll cron runs every minute regardless) but still
1565    /// honor it for the `active` window.
1566    #[serde(default)]
1567    pub tz: ScheduleTz,
1568    /// v0.22: optional humantime window after a cron tick during
1569    /// which the Command is still considered "live". The scheduler
1570    /// computes `tick_at + starting_deadline` and stamps it onto
1571    /// each Command as `deadline_at`; agents skip Commands they
1572    /// receive after that absolute time. `None` (default) = no
1573    /// deadline, meaning a Command queued in the broker / stream
1574    /// during agent downtime runs whenever the agent reconnects —
1575    /// good for kitting / inventory / cleanup. Set this for
1576    /// time-of-day notifications, lunch reminders, etc., where
1577    /// "fire 3 hours late" would be wrong.
1578    #[serde(default, skip_serializing_if = "Option::is_none")]
1579    pub starting_deadline: Option<String>,
1580    /// v0.23: where does the cron tick happen? `Backend` (default,
1581    /// historical) = backend's scheduler fires Commands via NATS;
1582    /// agents passively receive. `Agent` = each targeted agent runs
1583    /// its own internal cron and fires locally, so the schedule
1584    /// keeps ticking even when the broker is unreachable (laptop on
1585    /// the train, broker maintenance window, full WAN outage). The
1586    /// two locations are mutually exclusive — when `Agent`, the
1587    /// backend scheduler stays out and just keeps the definition in
1588    /// KV for agents to read.
1589    #[serde(default)]
1590    pub runs_on: RunsOn,
1591    #[serde(default = "default_true")]
1592    pub enabled: bool,
1593}
1594
1595/// v0.23 — where the cron tick fires from.
1596#[derive(
1597    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
1598)]
1599#[serde(rename_all = "snake_case")]
1600pub enum RunsOn {
1601    /// Backend's central scheduler ticks and publishes Commands to
1602    /// NATS. Historical default, what every pre-v0.23 schedule
1603    /// uses. Agent offline ⇒ Command queued in STREAM_EXEC; agent
1604    /// reconnects ⇒ catch-up via [`command_replay`](crate)
1605    /// (see kanade-agent's command_replay module).
1606    #[default]
1607    Backend,
1608    /// Each targeted agent runs the cron tick locally. Survives
1609    /// broker / WAN outages. Best for laptops / mobile devices that
1610    /// roam off the corporate network. Agent must be online for the
1611    /// initial schedule + job-catalog pull, but once cached the
1612    /// agent fires the script standalone.
1613    Agent,
1614}
1615
1616/// Per-pc/per-target dedup semantics for a [`Schedule`] (v0.19).
1617#[derive(
1618    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
1619)]
1620#[serde(rename_all = "snake_case")]
1621pub enum ExecMode {
1622    /// Fire on every cron tick at the whole target. Historical
1623    /// (pre-v0.19) behavior; no dedup.
1624    #[default]
1625    EveryTick,
1626    /// Fire at each pc until that pc succeeds; then skip it until
1627    /// the optional cooldown elapses (or forever if no cooldown).
1628    /// Use for kitting / first-boot / per-pc compliance checks.
1629    OncePerPc,
1630    /// Fire at the whole target until **any** pc succeeds; then
1631    /// skip the whole target until the optional cooldown elapses
1632    /// (or forever if no cooldown). Use for "one delegate is
1633    /// enough" tasks like license check-in.
1634    OncePerTarget,
1635}
1636
1637/// #418 Phase 1 — the single "when does this fire" axis.
1638///
1639/// Replaces the old `cron` + `mode` + `cooldown` trio whose
1640/// interactions were implicit (cron doubled as both a real
1641/// time-of-day trigger and a reconcile poll period; contradictory
1642/// combinations silently no-opped). Two shapes:
1643///
1644/// * **reconcile** (`per_pc` / `per_target`) — desired-state: "each
1645///   pc (or one delegate) should have run this within `every`".
1646///   The poll period is system-generated ([`POLL_CRON`], every
1647///   minute) and no longer the operator's concern.
1648/// * **calendar** (`{ at, days }`) — a wall-clock time trigger
1649///   (#418 Phase 2, replacing the old raw-cron escape hatch). Fires
1650///   the whole target at the given time, no dedup. `at: "09:00"` +
1651///   `days` repeats; `at: "2026-06-10 09:00"` (a date+time) fires
1652///   exactly once. Evaluated in the schedule's top-level `tz`.
1653#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
1654#[serde(rename_all = "snake_case")]
1655pub enum When {
1656    /// Fire at each targeted pc: `once` (kitting — succeed once,
1657    /// skip forever, forever catching brand-new / re-imaged pcs)
1658    /// or `{ every: <humantime> }` (patrol — re-arm per pc after
1659    /// the interval).
1660    PerPc(PerPolicy),
1661    /// Fire until **any** one pc of the target succeeds, then skip
1662    /// the whole target (`once`) or re-arm after `every`. Needs
1663    /// fleet-wide completion data, so it is backend-only —
1664    /// `runs_on: agent` + `per_target` is rejected by
1665    /// [`Schedule::validate`].
1666    PerTarget(PerPolicy),
1667    /// Calendar time trigger: `{ at: "09:00", days: [mon-fri] }`
1668    /// (repeating) or `{ at: "2026-06-10 09:00" }` (one-shot). Fires
1669    /// the whole target at that wall-clock time in the schedule's
1670    /// `tz` — no dedup, no cooldown.
1671    Calendar(CalendarSpec),
1672}
1673
1674/// Calendar time trigger (#418 Phase 2). `at` is either a time of
1675/// day (`"HH:MM"`, repeating — combine with `days`) or a full
1676/// date+time (`"YYYY-MM-DD HH:MM"`, a one-shot that fires once and
1677/// never again). Evaluated in the schedule's top-level `tz`.
1678#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
1679#[serde(deny_unknown_fields)]
1680pub struct CalendarSpec {
1681    /// `"HH:MM"` (24h) for a repeating trigger, or
1682    /// `"YYYY-MM-DD HH:MM"` (hyphen / slash / `T` separators all
1683    /// accepted) for a one-shot. Parsed lazily —
1684    /// [`Schedule::validate`] rejects garbage at create time.
1685    pub at: String,
1686    /// Day-of-week filter for a time-of-day `at`: `["mon-fri"]`,
1687    /// `["mon","wed","fri"]`, … (passed verbatim to the cron DOW
1688    /// field, so ranges and names both work). Empty = every day.
1689    /// Must be empty when `at` carries a date (the date already
1690    /// pins the day).
1691    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1692    pub days: Vec<String>,
1693}
1694
1695/// Parsed `CalendarSpec.at`: the wall-clock minute/hour, plus the
1696/// date for a one-shot (`None` = repeating time-of-day).
1697struct ParsedAt {
1698    minute: u32,
1699    hour: u32,
1700    date: Option<chrono::NaiveDate>,
1701}
1702
1703impl CalendarSpec {
1704    /// Parse `at`: a date+time (`YYYY-MM-DD HH:MM`, hyphen / slash /
1705    /// `T` separators) is a one-shot; a bare `HH:MM` is repeating.
1706    fn parse_at(&self) -> Result<ParsedAt, String> {
1707        use chrono::Timelike;
1708        let s = self.at.trim();
1709        for fmt in ["%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M", "%Y/%m/%d %H:%M"] {
1710            if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, fmt) {
1711                return Ok(ParsedAt {
1712                    minute: dt.minute(),
1713                    hour: dt.hour(),
1714                    date: Some(dt.date()),
1715                });
1716            }
1717        }
1718        if let Ok(t) = chrono::NaiveTime::parse_from_str(s, "%H:%M") {
1719            return Ok(ParsedAt {
1720                minute: t.minute(),
1721                hour: t.hour(),
1722                date: None,
1723            });
1724        }
1725        Err(format!(
1726            "when.at: unparseable '{}' (want HH:MM or YYYY-MM-DD HH:MM)",
1727            self.at
1728        ))
1729    }
1730
1731    /// Pre-flight check on the `days` tokens so a bad day name gives
1732    /// a `when.days:`-scoped error instead of croner's confusing
1733    /// "when.at lowered to invalid cron" (claude #432 review). Each
1734    /// token is a day name (`mon`..`sun`), a numeric DOW (`0`..`7`),
1735    /// `*`, or a `-` range of those.
1736    fn validate_days(&self) -> Result<(), String> {
1737        const NAMES: [&str; 7] = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"];
1738        for tok in &self.days {
1739            // Report the whole token on a malformed range like `mon-`
1740            // (which would otherwise split to a cryptic empty part —
1741            // claude #432 follow-up).
1742            let invalid = |reason: &str| {
1743                Err(format!(
1744                    "when.days: invalid day token '{tok}' ({reason}; \
1745                     want mon..sun, 0-7, a range like mon-fri, or *)"
1746                ))
1747            };
1748            for part in tok.split('-') {
1749                let p = part.trim().to_ascii_lowercase();
1750                if p.is_empty() {
1751                    return invalid("empty range bound");
1752                }
1753                let ok = p == "*"
1754                    || NAMES.contains(&p.as_str())
1755                    || p.parse::<u8>().map(|n| n <= 7).unwrap_or(false);
1756                if !ok {
1757                    return invalid(&format!("'{part}' is not a day"));
1758                }
1759            }
1760        }
1761        Ok(())
1762    }
1763
1764    /// For a one-shot (`at` carries a date), the absolute instant it
1765    /// fires in `tz`. `None` for a repeating calendar. Used to warn
1766    /// about a one-shot whose date is already in the past (it would
1767    /// never fire).
1768    pub fn oneshot_instant(&self, tz: ScheduleTz) -> Option<chrono::DateTime<chrono::Utc>> {
1769        let p = self.parse_at().ok()?;
1770        let date = p.date?;
1771        let naive = date.and_hms_opt(p.hour, p.minute, 0)?;
1772        tz.naive_to_utc(naive)
1773    }
1774
1775    /// Lower to the cron string the scheduler engine runs. Repeating
1776    /// → 6-field `0 {min} {hour} * * {dow}`; one-shot → 7-field
1777    /// `0 {min} {hour} {day} {month} * {year}` (a past year never
1778    /// fires — that's what makes it one-shot).
1779    fn to_cron(&self) -> Result<String, String> {
1780        use chrono::Datelike;
1781        let ParsedAt { minute, hour, date } = self.parse_at()?;
1782        match date {
1783            Some(d) => {
1784                if !self.days.is_empty() {
1785                    return Err(
1786                        "when.at with a date is a one-shot and cannot be combined with days".into(),
1787                    );
1788                }
1789                Ok(format!(
1790                    "0 {minute} {hour} {} {} * {}",
1791                    d.day(),
1792                    d.month(),
1793                    d.year()
1794                ))
1795            }
1796            None => {
1797                let dow = if self.days.is_empty() {
1798                    "*".to_string()
1799                } else {
1800                    self.validate_days()?;
1801                    self.days.join(",")
1802                };
1803                Ok(format!("0 {minute} {hour} * * {dow}"))
1804            }
1805        }
1806    }
1807}
1808
1809/// The timezone a schedule's wall-clock fields (`when.at`,
1810/// `active.{from,until}`) are evaluated in (#418 Phase 2).
1811#[derive(
1812    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
1813)]
1814#[serde(rename_all = "snake_case")]
1815pub enum ScheduleTz {
1816    /// The running host's local timezone — the agent's for
1817    /// `runs_on: agent`, the backend server's otherwise. Default.
1818    #[default]
1819    Local,
1820    /// UTC — for timezone-independent schedules.
1821    Utc,
1822}
1823
1824impl ScheduleTz {
1825    /// Interpret a naive (zoneless) datetime as being in this tz and
1826    /// convert to UTC. On a DST *fold* (the local time occurs twice
1827    /// when clocks go back) we pick `.earliest()` rather than
1828    /// rejecting it; `None` is reserved for a true DST *gap* (a local
1829    /// time that never exists). `Utc` is fixed-offset so neither ever
1830    /// happens; `Local` is whatever timezone the running host is set
1831    /// to and *can* hit a gap/fold on any DST-observing host — not
1832    /// just the JST we run today (gemini + claude #432 review).
1833    fn naive_to_utc(self, naive: chrono::NaiveDateTime) -> Option<chrono::DateTime<chrono::Utc>> {
1834        use chrono::TimeZone;
1835        match self {
1836            ScheduleTz::Utc => Some(chrono::DateTime::from_naive_utc_and_offset(
1837                naive,
1838                chrono::Utc,
1839            )),
1840            ScheduleTz::Local => chrono::Local
1841                .from_local_datetime(&naive)
1842                .earliest()
1843                .map(|dt| dt.with_timezone(&chrono::Utc)),
1844        }
1845    }
1846}
1847
1848/// `once` vs `{ every: <humantime> }` — shared by `per_pc` /
1849/// `per_target`. Untagged so the YAML stays the bare keyword or a
1850/// one-key map, nothing more ceremonial.
1851#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
1852#[serde(untagged)]
1853pub enum PerPolicy {
1854    /// The bare string `once`: succeed once, then skip permanently
1855    /// (cooldown = infinity).
1856    Once(OnceLiteral),
1857    /// Re-arm after the humantime interval, e.g. `{ every: 6h }`.
1858    Every(EverySpec),
1859}
1860
1861/// Single-variant enum so serde accepts exactly the string `once`
1862/// (a free-form `String` would swallow typos like `onec`).
1863#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
1864#[serde(rename_all = "snake_case")]
1865pub enum OnceLiteral {
1866    Once,
1867}
1868
1869/// `{ every: <humantime> }`. Standalone struct (not an inline
1870/// struct variant) so `deny_unknown_fields` still bites under the
1871/// untagged [`PerPolicy`] — `{ evry: 6h }` is a parse error, not a
1872/// silently-ignored key.
1873#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
1874#[serde(deny_unknown_fields)]
1875pub struct EverySpec {
1876    /// Humantime interval (`10m`, `6h`, `1d`...). Parsed lazily —
1877    /// [`Schedule::validate`] rejects garbage at create time.
1878    pub every: String,
1879}
1880
1881impl PerPolicy {
1882    /// The cooldown this policy lowers to: `once` = `None`
1883    /// (permanent skip), `every` = the interval.
1884    fn cooldown(&self) -> Option<String> {
1885        match self {
1886            PerPolicy::Once(_) => None,
1887            PerPolicy::Every(EverySpec { every }) => Some(every.clone()),
1888        }
1889    }
1890}
1891
1892impl std::fmt::Display for When {
1893    /// Operator-facing one-liner (`per_pc once` / `per_pc every 6h`
1894    /// / `at 09:00 [mon-fri]` / `at 2026-06-10 09:00`) for log
1895    /// lines, audit payloads and the API's `ScheduleSummary`.
1896    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1897        let policy = |p: &PerPolicy| match p {
1898            PerPolicy::Once(_) => "once".to_string(),
1899            PerPolicy::Every(EverySpec { every }) => format!("every {every}"),
1900        };
1901        match self {
1902            When::PerPc(p) => write!(f, "per_pc {}", policy(p)),
1903            When::PerTarget(p) => write!(f, "per_target {}", policy(p)),
1904            When::Calendar(c) if c.days.is_empty() => write!(f, "at {}", c.at),
1905            When::Calendar(c) => write!(f, "at {} [{}]", c.at, c.days.join(",")),
1906        }
1907    }
1908}
1909
1910/// Optional validity window for a [`Schedule`] (#418 decision G).
1911/// Half-open `[from, until)`; either bound may be omitted. Bounds
1912/// are `YYYY-MM-DD` (= that day's 00:00 in the schedule's `tz`) or
1913/// full RFC3339 (offset is honored as-is, `tz` ignored). Kept as
1914/// strings so the JSON Schema the SPA editor consumes stays two
1915/// plain string fields, mirroring `jitter` / `starting_deadline`.
1916///
1917/// #418 Phase 2: bounds are evaluated in the schedule's top-level
1918/// `tz` (was UTC-only in Phase 1) so `tz: local` makes both the
1919/// calendar `at` AND the `active` window local — one consistent
1920/// timezone per schedule.
1921#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default, PartialEq, Eq)]
1922#[serde(deny_unknown_fields)]
1923pub struct Active {
1924    /// Dormant before this instant.
1925    #[serde(default, skip_serializing_if = "Option::is_none")]
1926    pub from: Option<String>,
1927    /// Dormant from this instant on (exclusive).
1928    #[serde(default, skip_serializing_if = "Option::is_none")]
1929    pub until: Option<String>,
1930}
1931
1932impl Active {
1933    /// `skip_serializing_if` helper — an empty window means "always
1934    /// active" and is omitted from the wire format entirely.
1935    pub fn is_empty(&self) -> bool {
1936        self.from.is_none() && self.until.is_none()
1937    }
1938
1939    /// Parse one bound: RFC3339 first (offset honored, `tz`
1940    /// ignored), then bare `YYYY-MM-DD` (00:00 in `tz`).
1941    pub fn parse_bound(s: &str, tz: ScheduleTz) -> Result<chrono::DateTime<chrono::Utc>, String> {
1942        if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
1943            return Ok(dt.with_timezone(&chrono::Utc));
1944        }
1945        if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1946            let midnight = d.and_hms_opt(0, 0, 0).expect("00:00:00 is always valid");
1947            return tz.naive_to_utc(midnight).ok_or_else(|| {
1948                format!("active: bound '{s}' falls in a DST gap for the schedule's tz")
1949            });
1950        }
1951        Err(format!(
1952            "active: unparseable bound '{s}' (want YYYY-MM-DD or RFC3339)"
1953        ))
1954    }
1955
1956    /// Is `now` inside the window? Unparseable bounds are treated
1957    /// as absent here (fail-open) — [`Schedule::validate`] is the
1958    /// place that rejects them loudly; this runs on every tick and
1959    /// must never panic on a stale KV blob.
1960    pub fn contains(&self, now: chrono::DateTime<chrono::Utc>, tz: ScheduleTz) -> bool {
1961        let bound = |s: &Option<String>| s.as_deref().and_then(|s| Self::parse_bound(s, tz).ok());
1962        if bound(&self.from).is_some_and(|from| now < from) {
1963            return false;
1964        }
1965        if bound(&self.until).is_some_and(|until| now >= until) {
1966            return false;
1967        }
1968        true
1969    }
1970}
1971
1972/// The system-generated poll cadence every reconcile-shaped `when`
1973/// lowers to. Operators never write this: the real inter-run
1974/// spacing is the `every` cooldown; this only bounds "how soon do
1975/// we notice somebody is due" (#418 decision B took the poll
1976/// period away from the operator).
1977pub const POLL_CRON: &str = "0 * * * * *";
1978
1979/// What a [`When`] lowers to — the exact (cron, mode, cooldown)
1980/// trio the pre-#418 engine ran on. Keeping the engine vocabulary
1981/// unchanged is what lets Phase 1 swap the operator surface without
1982/// touching the tick / dedup machinery.
1983pub struct Lowered {
1984    /// Cron handed to `tokio-cron-scheduler` — [`POLL_CRON`] for
1985    /// reconcile shapes, a 6/7-field cron for calendar shapes.
1986    pub cron: String,
1987    /// Dedup semantics for `decide_fire`.
1988    pub mode: ExecMode,
1989    /// Humantime re-arm interval (`None` = succeed once, skip
1990    /// forever).
1991    pub cooldown: Option<String>,
1992    /// Timezone to evaluate `cron` in (#418 Phase 2). The scheduler
1993    /// passes this to `Job::new_async_tz`. Reconcile shapes carry
1994    /// the schedule's tz too even though POLL_CRON is tz-agnostic,
1995    /// so the same value drives the `active`-window check.
1996    pub tz: ScheduleTz,
1997}
1998
1999impl Schedule {
2000    /// Lower the operator-facing `when` onto the engine vocabulary.
2001    /// Single seam shared by the backend scheduler and the agent's
2002    /// local scheduler so the two can never drift.
2003    pub fn lowered(&self) -> Lowered {
2004        let tz = self.tz;
2005        match &self.when {
2006            When::PerPc(p) => Lowered {
2007                cron: POLL_CRON.into(),
2008                mode: ExecMode::OncePerPc,
2009                cooldown: p.cooldown(),
2010                tz,
2011            },
2012            When::PerTarget(p) => Lowered {
2013                cron: POLL_CRON.into(),
2014                mode: ExecMode::OncePerTarget,
2015                cooldown: p.cooldown(),
2016                tz,
2017            },
2018            // `to_cron` only fails on a malformed `at` (rejected by
2019            // validate() at create time). For a hand-edited KV blob
2020            // that slipped past, emit a deliberately-invalid cron so
2021            // register()'s Job::new_async_tz fails → warn+skip,
2022            // rather than firing at the wrong time.
2023            When::Calendar(c) => Lowered {
2024                cron: c
2025                    .to_cron()
2026                    .unwrap_or_else(|_| "# invalid calendar at".into()),
2027                mode: ExecMode::EveryTick,
2028                cooldown: None,
2029                tz,
2030            },
2031        }
2032    }
2033
2034    /// Cross-field semantic checks that don't fit pure serde derive
2035    /// — the [`Manifest::validate`] counterpart (#418 decision F;
2036    /// pre-Phase-1 a broken schedule was accepted at create time
2037    /// and silently warn-skipped at tick time). Run at every create
2038    /// site: `kanade schedule create` (client-side) and
2039    /// `POST /api/schedules`. The job_id-exists check lives in the
2040    /// API handler instead — it needs the JOBS KV.
2041    pub fn validate(&self) -> Result<(), String> {
2042        if matches!(self.runs_on, RunsOn::Agent) && matches!(self.when, When::PerTarget(_)) {
2043            return Err(
2044                "when.per_target needs fleet-wide completion data and is backend-only; \
2045                 it cannot be combined with runs_on: agent (each agent self-schedules, \
2046                 so per-target dedup would be deduping across a target of 1)"
2047                    .into(),
2048            );
2049        }
2050        if let Some(cd) = self.lowered().cooldown.as_deref() {
2051            humantime::parse_duration(cd)
2052                .map_err(|e| format!("when.every: invalid duration '{cd}': {e}"))?;
2053        }
2054        if let When::Calendar(c) = &self.when {
2055            // Lower the calendar form to its cron (catches a bad `at`
2056            // and the date+days conflict), then validate that cron
2057            // with the same parser configuration tokio-cron-scheduler
2058            // 0.15 uses internally (croner, seconds required,
2059            // DOM-and-DOW both honored, year optional) — create-time
2060            // validation can never accept what register() rejects.
2061            let cron = c.to_cron()?;
2062            croner::parser::CronParser::builder()
2063                .seconds(croner::parser::Seconds::Required)
2064                .dom_and_dow(true)
2065                .build()
2066                .parse(&cron)
2067                .map_err(|e| format!("when.at lowered to invalid cron '{cron}': {e}"))?;
2068        }
2069        // The other humantime strings on the schedule (claude #419
2070        // review): runtime degrades gracefully on both (bad jitter →
2071        // silent no-op, bad starting_deadline → warn + skipped tick),
2072        // but "rejected at create time" should cover every field the
2073        // operator can typo, not just `when`.
2074        if let Some(j) = &self.plan.jitter {
2075            humantime::parse_duration(j)
2076                .map_err(|e| format!("jitter: invalid duration '{j}': {e}"))?;
2077        }
2078        if let Some(sd) = &self.starting_deadline {
2079            humantime::parse_duration(sd)
2080                .map_err(|e| format!("starting_deadline: invalid duration '{sd}': {e}"))?;
2081        }
2082        let from = self
2083            .active
2084            .from
2085            .as_deref()
2086            .map(|s| Active::parse_bound(s, self.tz))
2087            .transpose()?;
2088        let until = self
2089            .active
2090            .until
2091            .as_deref()
2092            .map(|s| Active::parse_bound(s, self.tz))
2093            .transpose()?;
2094        if let (Some(f), Some(u)) = (from, until) {
2095            if f >= u {
2096                return Err(format!(
2097                    "active.from ({}) must be strictly before active.until ({})",
2098                    self.active.from.as_deref().unwrap_or_default(),
2099                    self.active.until.as_deref().unwrap_or_default(),
2100                ));
2101            }
2102        }
2103        Ok(())
2104    }
2105}
2106
2107fn default_true() -> bool {
2108    true
2109}