kanade_shared/manifest.rs
1use serde::{Deserialize, Serialize};
2
3use crate::wire::{RunAs, Shell, Staleness};
4
/// YAML job manifest (= registered "what to run", v0.18.0+).
///
/// Owns only script-intrinsic fields. **Who** (`target`), **how to
/// phase fanout** (`rollout`), and **when to stagger start**
/// (`jitter`) all moved to the Schedule / exec request side — same
/// script can now be fired against different targets / rollouts
/// without copying the script body.
///
/// `deny_unknown_fields` makes operators copy-pasting an older yaml
/// that still has `target:` / `rollout:` see a clear parse error at
/// `kanade job create` time instead of mysteriously losing it.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct Manifest {
    /// Job identifier. Required. This is the `manifest.id` half of
    /// the `(pc_id, manifest.id)` key mentioned on `inventory` below.
    pub id: String,
    /// Manifest version string. Required.
    pub version: String,
    /// Optional free-form description; `None` when the key is absent.
    #[serde(default)]
    pub description: Option<String>,
    /// What to run and how to launch it (shell, script source,
    /// timeout, run-as, cwd). Required. See [`Execute`].
    pub execute: Execute,
    /// Approval flag; `false` when the key is absent.
    /// NOTE(review): nothing in this file reads the flag —
    /// presumably the backend gates execution on it; confirm.
    #[serde(default)]
    pub require_approval: bool,
    /// Opt-in marker that this job produces a JSON inventory fact
    /// payload on stdout. When present, the backend's results
    /// projector parses `ExecResult.stdout` as JSON and upserts an
    /// `inventory_facts` row keyed by `(pc_id, manifest.id)`. The
    /// `display` sub-config drives the SPA's Inventory page render.
    #[serde(default)]
    pub inventory: Option<InventoryHint>,
    /// Issue #246: opt-in marker that this job emits per-line
    /// observability events on stdout (one JSON `ObsEvent` per
    /// newline). When present, the agent — after the script exits
    /// successfully — parses each non-empty stdout line as an
    /// `ObsEvent`, publishes it on `obs.<pc_id>` via the
    /// `obs_outbox`, and (intentionally) **omits the stdout from
    /// the `ExecResult`** so the timeline data doesn't double up
    /// in `execution_results.stdout` (which would multiply rows
    /// by ~50/day/PC of noise).
    ///
    /// Distinct from `inventory:` (single JSON object → projector
    /// upsert) — events are append-only timeline points consumed
    /// by the dedicated `obs_events` table.
    #[serde(default)]
    pub emit: Option<EmitConfig>,
    /// v0.26: Layer 2 staleness policy (SPEC.md §2.6.2). Controls
    /// what the agent does at fire time when it can't verify the
    /// `script_current` / `script_status` KV values are fresh —
    /// especially relevant for `runs_on: agent` schedules where
    /// the agent may fire from cache while offline. Defaults to
    /// `Staleness::Cached` (silently use cached values), which
    /// matches every pre-v0.26 Manifest.
    #[serde(default)]
    pub staleness: Staleness,
}
58
/// "Who + how + when-to-stagger" — the fanout-plan side of an exec.
/// Used both as the POST `/api/exec/{job_id}` body and as the embedded
/// `target` / `rollout` / `jitter` slot on [`Schedule`]. Centralising
/// here keeps the validation + serialisation logic in one place.
///
/// Every field is optional on the wire; `FanoutPlan::default()` is an
/// empty target with no rollout, jitter or deadline.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default)]
pub struct FanoutPlan {
    /// Which PCs the exec applies to. Falls back to an empty
    /// [`Target`] (nothing selected) when the key is omitted; use
    /// [`Target::is_specified`] to detect that case.
    #[serde(default)]
    pub target: Target,
    /// Optional wave rollout — when present, the backend publishes
    /// each wave's group subject on its own delay schedule instead
    /// of fanning out the `target` block in one go. `target` then
    /// only labels the deploy for the audit log.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub rollout: Option<Rollout>,
    /// Optional humantime jitter; agent uses it to randomise
    /// execution start. Lives here (not on the script) so different
    /// schedules / ad-hoc fires of the same job can pick different
    /// stagger windows. Kept as the raw string — this type does not
    /// parse it.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub jitter: Option<String>,
    /// Absolute time the scheduler stamps on each emitted Command
    /// when this exec was driven by a [`Schedule`] with
    /// `starting_deadline`. Agents receiving a Command after this
    /// instant publish a synthetic skipped-result instead of
    /// running the script. `None` (default) = no deadline / catch
    /// up whenever delivered. Operators don't usually set this
    /// directly — the scheduler computes it from `tick_at +
    /// starting_deadline`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deadline_at: Option<chrono::DateTime<chrono::Utc>>,
}
90
/// Manifest sub-section: how the SPA should render the inventory
/// facts this job produces. Each field name (`field`) is a top-level
/// key in the stdout JSON, e.g. `hostname`, `ram_gb`.
///
/// Two render modes:
/// * `display` — vertical "field / value" per PC, used by the
///   `/inventory?pc=<id>` detail view. ALL columns the operator
///   wants visible on the detail page.
/// * `summary` — horizontal table across the fleet (row = PC,
///   column = field) on `/inventory`. Optional; when omitted the
///   SPA falls back to `display`, but operators usually want a
///   trimmer "hostname / OS / CPU / RAM" set for the fleet view.
///
/// Only `display` is required; the other keys are optional and are
/// left out of the serialised form when `None`.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct InventoryHint {
    /// Detail-view columns, in order. Required (no serde default).
    pub display: Vec<DisplayField>,
    /// Optional fleet-list columns (row = PC). Defaults to `display`
    /// when omitted, but operators usually pick a 3-5 column subset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub summary: Option<Vec<DisplayField>>,
    /// v0.31 / #40: payload arrays that should be exploded into
    /// per-element rows of a derived SQLite table. Lets operators
    /// answer cross-PC questions ("which PCs still have Chrome <
    /// 120?", "C: >90% full") with normal SQL filters + indexes
    /// instead of grepping JSON. The projector creates the derived
    /// table on register and replaces this PC's rows on each result
    /// (DELETE WHERE pc_id=? AND job_id=? + bulk INSERT). See
    /// [`ExplodeSpec`] for the per-spec schema.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub explode: Option<Vec<ExplodeSpec>>,
    /// v0.35 / #93: top-level scalar fields whose changes the
    /// projector logs to `inventory_history` (one event per
    /// changed field per scan). Pairs with `explode[].track_history`
    /// — that covers array elements; this covers single-valued
    /// fields like `ram_bytes` / `os_version` / `cpu_model` /
    /// `os_build` that operators want to track for "did the RAM
    /// get upgraded?" / "when did Win 11 land on this PC?" /
    /// "BIOS / firmware bumped?" questions. Field name = `field_path`
    /// in the history row, `identity_json` is NULL, `before_json`
    /// / `after_json` each carry `{"value": <prior or new value>}`.
    /// First-ever observation of a scalar (no prior facts row)
    /// emits `added`; subsequent value changes emit `changed`. No
    /// `removed` events — a scalar disappearing from the payload
    /// is rare and the operator can still see the last value via
    /// the `before_json` of the most recent change.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub history_scalars: Option<Vec<String>>,
}
139
/// Issue #246 — `emit:` manifest block for jobs whose stdout is
/// NDJSON observability events (one `ObsEvent` per line). Parallel
/// to `inventory:` but for the append-only timeline pipeline; see
/// `Manifest::emit` for the full contract.
///
/// `deny_unknown_fields` is set, so a misspelled key under `emit:`
/// is a parse error rather than being silently dropped.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct EmitConfig {
    /// What kind of payload the agent should expect on stdout. Only
    /// `events` is defined today (parses each non-empty line as
    /// `ObsEvent` and publishes on `obs.<pc_id>`); future variants
    /// (e.g. metrics streams, structured trace events) plug in here.
    ///
    /// Required. Spelled `type` on the wire; the Rust field is named
    /// `kind` because `type` is a reserved word.
    #[serde(rename = "type")]
    pub kind: EmitKind,
    /// Operator hint for where the script keeps its own state — the
    /// watermark file the PowerShell / sh body reads + writes
    /// between runs so it only emits NEW events since the last
    /// poll. The agent doesn't read this; it's documentation that
    /// the SPA (and `kanade job edit`) can surface to operators
    /// reviewing the manifest. Optional; the script is allowed to
    /// keep state anywhere (registry, env, etc.) — the field's
    /// presence makes the convention discoverable.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub watermark_path: Option<String>,
}
164
/// `emit.type` enum. Lowercase serde so manifests read
/// `type: events` rather than `Events`.
///
/// `Events` is the only variant defined so far, so any other
/// `type:` value fails to deserialise.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum EmitKind {
    /// Per-line `ObsEvent` JSON. Agent parses + publishes on
    /// `obs.<pc_id>`, drops the stdout from the resulting
    /// `ExecResult`.
    Events,
}
175
/// v0.31 / #40: declarative "flatten this JSON array into a real
/// SQLite table" spec on an inventory manifest. The projector
/// creates the table on first registration (CREATE TABLE IF NOT
/// EXISTS + indexes) and writes a row per element of
/// `payload[field]` on every result, scoped by (pc_id, job_id) so
/// each PC's rows replace cleanly without a per-PC schema.
///
/// `field`, `table`, `primary_key` and `columns` carry no serde
/// default, so leaving any of them out is a parse error; only
/// `track_history` is optional.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct ExplodeSpec {
    /// JSON array key under the payload to explode. E.g. `"apps"`
    /// for `payload: { apps: [{...}, {...}] }`.
    pub field: String,
    /// Derived SQLite table name. Operators choose this — pick
    /// something namespaced + stable (`inventory_sw_apps`, not
    /// `apps`) so multiple inventory manifests don't collide on a
    /// generic name.
    pub table: String,
    /// Element-level fields that uniquely identify a row inside one
    /// PC's payload. The full PK is `(pc_id, job_id) + these
    /// columns`. Required — operators must think about uniqueness
    /// (e.g. `["name", "source"]` for installed apps because the
    /// same name appears in multiple uninstall hives).
    ///
    /// v0.31 / #41: same tuple drives history identity. When
    /// `track_history` is on, the projector serialises these
    /// fields' values into `inventory_history.identity_json` for
    /// every change event, so queries like "every PC that ever
    /// installed Chrome (any source)" filter on identity_json
    /// content without a per-manifest schema.
    pub primary_key: Vec<String>,
    /// Per-element fields that become columns in the derived table.
    /// See [`ExplodeColumn`].
    pub columns: Vec<ExplodeColumn>,
    /// v0.31 / #41: when true (default false), the projector
    /// diffs each PC's incoming payload against the prior rows
    /// for the same (pc_id, job_id) BEFORE the DELETE-then-INSERT
    /// replace, and writes added / removed / changed events into
    /// `inventory_history`. Lets operators answer time-dimension
    /// questions ("when did Chrome 120 first appear on PC X?",
    /// "what's the Win 11 23H2 rollout curve") without storing
    /// per-scan snapshots. Off by default so operators opt in
    /// per-spec — history has a real storage cost on long-lived
    /// deployments (mitigated by the 90-day default retention
    /// sweeper, see `cleanup` module).
    #[serde(default)]
    pub track_history: bool,
}
221
/// One column in an [`ExplodeSpec`]'s derived table.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct ExplodeColumn {
    /// JSON key under each array element. Becomes the column name
    /// in the derived SQLite table — we don't rename.
    pub field: String,
    /// SQLite affinity: `"text"` (default), `"integer"`, `"real"`.
    /// Storage maps directly via `sqlx::query.bind(...)`; type
    /// mismatches at INSERT-time fail loudly rather than silently
    /// dropping the row.
    ///
    /// Spelled `type` on the wire. This type stores the raw string
    /// and deserialises a missing key as `None`; the `"text"`
    /// fallback is therefore applied by whoever consumes the spec
    /// (presumably the projector — confirm), not here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[serde(rename = "type")]
    pub kind: Option<String>,
    /// When true, the projector creates a `CREATE INDEX` on this
    /// column at table-creation time. Boost for the common-filter
    /// columns (`name`, `version`) — operators mark them
    /// explicitly, the projector won't guess. Defaults to `false`.
    #[serde(default)]
    pub index: bool,
}
242
/// One rendered inventory column: which payload key to show, the
/// header to show it under, and an optional render hint. Used for
/// the entries of `InventoryHint::display` / `summary`, and
/// recursively for the sub-columns of a `"table"` field.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct DisplayField {
    /// Top-level key in the stdout JSON.
    pub field: String,
    /// Human-readable column header.
    pub label: String,
    /// Optional render hint — `"number"`, `"bytes"`, `"timestamp"`,
    /// or `"table"` (#39). Defaults to plain text rendering on the
    /// SPA side. `"table"` expects the field's value to be a JSON
    /// array of objects and renders a nested sub-table on the
    /// per-PC detail page using `columns` as the schema; the fleet
    /// summary view falls back to showing the row count for
    /// `"table"` cells so the wide list stays compact.
    ///
    /// Spelled `type` on the wire and kept as a free-form string, so
    /// an unrecognised hint is not rejected at parse time.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[serde(rename = "type")]
    pub kind: Option<String>,
    /// v0.30 / #39: when `kind == "table"`, the SPA renders the
    /// field's value (an array of objects like
    /// `disks: [{ device_id, size_bytes, ... }]`) as a nested
    /// sub-table using these columns. Each column is itself a
    /// `DisplayField`, so the nested cells reuse the same render
    /// hints (`bytes`, `number`, `timestamp`) — no parallel format
    /// pipeline. Ignored for any other `kind`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub columns: Option<Vec<DisplayField>>,
}
269
/// Staged rollout plan carried in [`FanoutPlan::rollout`]: a
/// strategy plus the list of waves to publish.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct Rollout {
    /// Rollout strategy. Optional on the wire; falls back to
    /// [`RolloutStrategy::Wave`] when omitted.
    #[serde(default)]
    pub strategy: RolloutStrategy,
    /// The waves of this rollout. Required (no serde default); each
    /// entry names a group and its delay — see [`Wave`].
    pub waves: Vec<Wave>,
}
276
/// Strategy selector for a [`Rollout`]. Serialised lowercase, so
/// manifests read `strategy: wave`.
#[derive(
    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
)]
#[serde(rename_all = "lowercase")]
pub enum RolloutStrategy {
    /// Wave-by-wave rollout. The only variant, and the default.
    #[default]
    Wave,
}
285
/// One step of a [`Rollout`]: a group and how long after the deploy
/// is published that group's turn comes.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct Wave {
    /// Name of the group this wave addresses (e.g. `canary`).
    pub group: String,
    /// humantime delay measured from the deploy's publish time. wave[0]
    /// typically has "0s"; subsequent waves use minutes / hours.
    /// Stored as the raw string — not parsed by this type.
    pub delay: String,
}
293
/// Selector for which PCs an exec applies to. All three keys are
/// optional on the wire, and `Target::default()` selects nothing
/// (empty lists, `all = false`).
///
/// NOTE(review): how the three selectors combine when more than one
/// is set (union vs. `all` taking precedence) is decided by the
/// consumer, not by this type — confirm against the fanout code.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default)]
pub struct Target {
    /// Group names to target (e.g. `canary`). Empty when omitted.
    #[serde(default)]
    pub groups: Vec<String>,
    /// Individual PC ids to target (e.g. `pc-01`). Empty when omitted.
    #[serde(default)]
    pub pcs: Vec<String>,
    /// Fleet-wide flag (`target: { all: true }`). `false` when omitted.
    #[serde(default)]
    pub all: bool,
}
303
304impl Target {
305 /// At least one of all / groups / pcs is set.
306 pub fn is_specified(&self) -> bool {
307 self.all || !self.groups.is_empty() || !self.pcs.is_empty()
308 }
309}
310
/// `execute:` section of a [`Manifest`]: which shell to use, where
/// the script body comes from, how long it may run, and the launch
/// context (`run_as`, `cwd`).
///
/// `deny_unknown_fields` is set, so a misspelled key (e.g.
/// `script_objectt`) is a parse error instead of silently leaving
/// all three script sources unset.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct Execute {
    /// Shell that interprets the script. Required. See
    /// [`ExecuteShell`] for the accepted values.
    pub shell: ExecuteShell,
    /// Inline script body. Mutually exclusive with [`script_file`]
    /// and [`script_object`]; exactly one of the three must be set
    /// (enforced by [`Execute::validate_script_source`] at the
    /// write-side parse boundaries — `kanade job create` and
    /// `POST /api/jobs`).
    ///
    /// Empty string is treated as **unset** so operators can swap
    /// to a `script_file:` / `script_object:` alternative just by
    /// commenting out the body, without having to also drop the
    /// `script:` key entirely.
    ///
    /// [`script_file`]: Self::script_file
    /// [`script_object`]: Self::script_object
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub script: Option<String>,
    /// Repo-local file path resolved by the operator-side CLI at
    /// `kanade job create` time. The CLI reads the file, slots its
    /// contents into `script`, and clears this field before
    /// POSTing — so the backend / agents never see `script_file`
    /// in stored manifests. SPEC §2.4.1.
    ///
    /// Resolver lands in a follow-up PR
    /// (yukimemi/kanade#210); today this field passes parse-time
    /// validation but the operator-side CLI bails with "not yet
    /// implemented" until the resolver ships, so manifests that
    /// reach the backend with `script_file` set are treated as a
    /// schema-bug.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub script_file: Option<String>,
    /// Object Store reference (`<name>/<version>`) into the
    /// `scripts` bucket (`OBJECT_SCRIPTS`). Agents fetch the body
    /// at Execute time via `/api/script-objects/{name}/{version}`
    /// and cache it locally. SPEC §2.4.1.
    ///
    /// Resolver lands in the same follow-up PR as `script_file`;
    /// today this field passes parse-time validation but the
    /// backend / agent exec paths bail with "not yet implemented"
    /// when they see it.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub script_object: Option<String>,
    /// humantime duration string (e.g. "30s", "10m"). Script-intrinsic
    /// — represents how long this script reasonably takes to run.
    /// Required; kept as the raw string, not parsed by this type.
    pub timeout: String,
    /// Token + session combination the agent uses to launch the
    /// script (v0.21). Default = [`RunAs::System`] (Session 0,
    /// LocalSystem privileges, no GUI) — matches pre-v0.21 behavior.
    #[serde(default)]
    pub run_as: RunAs,
    /// Working directory for the spawned child (v0.21.1). When
    /// unset, the child inherits the agent's cwd — on Windows that
    /// means `%SystemRoot%\System32` for the prod service, which is
    /// almost never what operators actually want. Use an absolute
    /// path; relative paths are passed through to the OS verbatim.
    /// `%PROGRAMDATA%` works for `run_as: system`; for `run_as: user`
    /// you'd want `%USERPROFILE%` (but expansion happens in the
    /// shell, so write `$env:USERPROFILE` for PowerShell, or set
    /// it via teravars before `kanade job create`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
}
375
376impl Execute {
377 /// Treat an empty `script:` body as "intentionally unset". Operators
378 /// commenting out a block-scalar tend to leave the key behind, and
379 /// failing the validator on `script: ""` would surprise them.
380 fn has_inline_script(&self) -> bool {
381 matches!(&self.script, Some(s) if !s.is_empty())
382 }
383
384 /// Enforce that exactly one of `script` / `script_file` /
385 /// `script_object` is set. Called at the write-side parse
386 /// boundaries (CLI `kanade job create` + backend
387 /// `POST /api/jobs`) so ambiguous YAML is rejected before it
388 /// reaches the JOBS KV. Read paths (projector, agent
389 /// scheduler, list endpoints) skip this check — they only ever
390 /// see what the write path already validated.
391 pub fn validate_script_source(&self) -> Result<(), String> {
392 let inline = self.has_inline_script();
393 let file = self.script_file.is_some();
394 let obj = self.script_object.is_some();
395 let set = [inline, file, obj].into_iter().filter(|b| *b).count();
396 match set {
397 1 => Ok(()),
398 0 => Err("execute: one of `script`, `script_file`, `script_object` must be set".into()),
399 _ => Err(format!(
400 "execute: only one of `script` / `script_file` / `script_object` may be set \
401 (got script={inline}, script_file={file}, script_object={obj})"
402 )),
403 }
404 }
405}
406
407impl Manifest {
408 /// Cross-field semantic checks that don't fit into pure serde
409 /// derive. Currently delegates to
410 /// [`Execute::validate_script_source`] — see that method's
411 /// docs for the rationale on which call sites should run this.
412 pub fn validate(&self) -> Result<(), String> {
413 self.execute.validate_script_source()?;
414 Ok(())
415 }
416}
417
/// Shell named by `execute.shell` in a manifest. Serialised
/// lowercase, so manifests read `shell: powershell` / `shell: cmd`.
/// Converts into the wire-level [`Shell`] through the `From` impl
/// in this module.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ExecuteShell {
    /// `shell: powershell`.
    Powershell,
    /// `shell: cmd`.
    Cmd,
}
424
425impl From<ExecuteShell> for Shell {
426 fn from(s: ExecuteShell) -> Self {
427 match s {
428 ExecuteShell::Powershell => Shell::Powershell,
429 ExecuteShell::Cmd => Shell::Cmd,
430 }
431 }
432}
433
434#[cfg(test)]
435mod tests {
436 use super::*;
437
438 #[test]
439 fn target_is_specified_requires_at_least_one_field() {
440 let empty = Target::default();
441 assert!(!empty.is_specified());
442
443 let with_all = Target {
444 all: true,
445 ..Target::default()
446 };
447 assert!(with_all.is_specified());
448
449 let with_groups = Target {
450 groups: vec!["canary".into()],
451 ..Target::default()
452 };
453 assert!(with_groups.is_specified());
454
455 let with_pcs = Target {
456 pcs: vec!["pc-01".into()],
457 ..Target::default()
458 };
459 assert!(with_pcs.is_specified());
460 }
461
462 #[test]
463 fn manifest_deserialises_minimal_yaml() {
464 // Matches jobs/echo-test.yaml. v0.18: no target/rollout/jitter
465 // — those live on the schedule / exec request now.
466 let yaml = r#"
467id: echo-test
468version: 0.0.1
469execute:
470 shell: powershell
471 script: "echo 'kanade'"
472 timeout: 30s
473"#;
474 let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
475 assert_eq!(m.id, "echo-test");
476 assert_eq!(m.version, "0.0.1");
477 assert!(matches!(m.execute.shell, ExecuteShell::Powershell));
478 assert_eq!(
479 m.execute.script.as_deref().map(str::trim),
480 Some("echo 'kanade'")
481 );
482 assert!(m.execute.script_file.is_none());
483 assert!(m.execute.script_object.is_none());
484 assert_eq!(m.execute.timeout, "30s");
485 assert!(!m.require_approval);
486 m.validate()
487 .expect("inline-script manifest passes validation");
488 }
489
490 fn execute_with(
491 script: Option<&str>,
492 script_file: Option<&str>,
493 script_object: Option<&str>,
494 ) -> Execute {
495 Execute {
496 shell: ExecuteShell::Powershell,
497 script: script.map(str::to_owned),
498 script_file: script_file.map(str::to_owned),
499 script_object: script_object.map(str::to_owned),
500 timeout: "30s".into(),
501 run_as: RunAs::default(),
502 cwd: None,
503 }
504 }
505
506 #[test]
507 fn validate_accepts_inline_script() {
508 let e = execute_with(Some("echo hi"), None, None);
509 assert!(e.validate_script_source().is_ok());
510 }
511
512 #[test]
513 fn validate_accepts_script_file_alone() {
514 let e = execute_with(None, Some("scripts/cleanup.ps1"), None);
515 assert!(e.validate_script_source().is_ok());
516 }
517
518 #[test]
519 fn validate_accepts_script_object_alone() {
520 let e = execute_with(None, None, Some("cleanup/1.0.0"));
521 assert!(e.validate_script_source().is_ok());
522 }
523
524 #[test]
525 fn validate_treats_empty_inline_script_as_unset() {
526 // `script: ""` + `script_object` set is the natural shape
527 // when an operator comments out the YAML block-scalar body
528 // but leaves the key. Should pass.
529 let e = execute_with(Some(""), None, Some("cleanup/1.0.0"));
530 assert!(e.validate_script_source().is_ok());
531 }
532
533 #[test]
534 fn validate_rejects_zero_sources() {
535 let e = execute_with(None, None, None);
536 let err = e.validate_script_source().unwrap_err();
537 assert!(err.contains("must be set"), "got: {err}");
538 }
539
540 #[test]
541 fn validate_rejects_empty_inline_only() {
542 let e = execute_with(Some(""), None, None);
543 let err = e.validate_script_source().unwrap_err();
544 assert!(err.contains("must be set"), "got: {err}");
545 }
546
547 #[test]
548 fn validate_rejects_inline_plus_file() {
549 let e = execute_with(Some("echo hi"), Some("scripts/cleanup.ps1"), None);
550 let err = e.validate_script_source().unwrap_err();
551 assert!(err.contains("only one of"), "got: {err}");
552 }
553
554 #[test]
555 fn validate_rejects_inline_plus_object() {
556 let e = execute_with(Some("echo hi"), None, Some("cleanup/1.0.0"));
557 let err = e.validate_script_source().unwrap_err();
558 assert!(err.contains("only one of"), "got: {err}");
559 }
560
561 #[test]
562 fn validate_rejects_file_plus_object() {
563 let e = execute_with(None, Some("scripts/cleanup.ps1"), Some("cleanup/1.0.0"));
564 let err = e.validate_script_source().unwrap_err();
565 assert!(err.contains("only one of"), "got: {err}");
566 }
567
568 #[test]
569 fn validate_rejects_all_three() {
570 let e = execute_with(
571 Some("echo hi"),
572 Some("scripts/cleanup.ps1"),
573 Some("cleanup/1.0.0"),
574 );
575 let err = e.validate_script_source().unwrap_err();
576 assert!(err.contains("only one of"), "got: {err}");
577 }
578
579 #[test]
580 fn manifest_deserialises_script_object_yaml() {
581 // SPEC §2.4.1 example shape with the Object Store
582 // reference picked over inline.
583 let yaml = r#"
584id: cleanup-disk-temp
585version: 1.0.1
586execute:
587 shell: powershell
588 script_object: cleanup-disk-temp/1.0.1
589 timeout: 600s
590"#;
591 let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
592 assert_eq!(
593 m.execute.script_object.as_deref(),
594 Some("cleanup-disk-temp/1.0.1")
595 );
596 assert!(m.execute.script.is_none());
597 m.validate()
598 .expect("script_object-only manifest passes validation");
599 }
600
601 #[test]
602 fn manifest_rejects_typo_in_script_field_name() {
603 // `deny_unknown_fields` on Execute catches `script_objectt`
604 // and similar fat-fingers at parse time instead of letting
605 // them silently fall through to "all three unset".
606 let yaml = r#"
607id: typo
608version: 1.0.0
609execute:
610 shell: powershell
611 script_objectt: oops
612 timeout: 30s
613"#;
614 let r: Result<Manifest, _> = serde_yaml::from_str(yaml);
615 assert!(r.is_err(), "expected parse error, got {r:?}");
616 }
617
618 #[test]
619 fn schedule_carries_target_and_rollout() {
620 let yaml = r#"
621id: hourly-cleanup-canary
622when:
623 per_pc: { every: 1h }
624job_id: cleanup
625enabled: true
626target:
627 groups: [canary, wave1]
628jitter: 30s
629rollout:
630 strategy: wave
631 waves:
632 - { group: canary, delay: 0s }
633 - { group: wave1, delay: 5s }
634"#;
635 let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
636 assert_eq!(s.id, "hourly-cleanup-canary");
637 assert_eq!(s.job_id, "cleanup");
638 assert_eq!(s.plan.target.groups, vec!["canary", "wave1"]);
639 assert_eq!(s.plan.jitter.as_deref(), Some("30s"));
640 let rollout = s.plan.rollout.expect("rollout present");
641 assert_eq!(rollout.waves.len(), 2);
642 assert_eq!(rollout.waves[0].group, "canary");
643 assert_eq!(rollout.waves[1].delay, "5s");
644 assert_eq!(rollout.strategy, RolloutStrategy::Wave);
645 }
646
647 #[test]
648 fn schedule_minimal_target_all() {
649 let yaml = r#"
650id: kitting
651when:
652 per_pc: once
653enabled: true
654job_id: scheduled-echo
655target: { all: true }
656"#;
657 let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
658 assert_eq!(s.id, "kitting");
659 assert_eq!(s.when, When::PerPc(PerPolicy::Once(OnceLiteral::Once)));
660 assert!(s.enabled);
661 assert_eq!(s.job_id, "scheduled-echo");
662 assert!(s.plan.target.all);
663 assert!(s.plan.rollout.is_none());
664 assert!(s.plan.jitter.is_none());
665 assert!(s.active.is_empty());
666 }
667
668 #[test]
669 fn schedule_enabled_defaults_to_true() {
670 let yaml = r#"
671id: x
672when:
673 per_pc: once
674job_id: y
675target: { all: true }
676"#;
677 let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
678 assert!(s.enabled);
679 }
680
681 // ---- `when` parsing (#418 Phase 1) ----
682
683 fn schedule_yaml_with(when_block: &str) -> String {
684 format!(
685 r#"
686id: x
687when:
688{when_block}
689job_id: y
690target: {{ all: true }}
691"#
692 )
693 }
694
695 #[test]
696 fn when_per_pc_every_parses_unquoted_humantime() {
697 // `6h` is digit-led but non-numeric → YAML string, same as
698 // the old `cooldown: 6h` convention. No quotes needed.
699 let s: Schedule =
700 serde_yaml::from_str(&schedule_yaml_with(" per_pc: { every: 6h }")).expect("parse");
701 assert_eq!(
702 s.when,
703 When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() }))
704 );
705 }
706
707 #[test]
708 fn when_per_target_every_parses() {
709 let s: Schedule = serde_yaml::from_str(&schedule_yaml_with(" per_target: { every: 24h }"))
710 .expect("parse");
711 assert_eq!(
712 s.when,
713 When::PerTarget(PerPolicy::Every(EverySpec {
714 every: "24h".into()
715 }))
716 );
717 }
718
719 #[test]
720 fn when_per_target_once_parses() {
721 // Falls out of the shared PerPolicy shape and decide_fire
722 // already implements it ("any one pc succeeds → skip the
723 // target forever"), so it is allowed, not rejected.
724 let s: Schedule =
725 serde_yaml::from_str(&schedule_yaml_with(" per_target: once")).expect("parse");
726 assert_eq!(s.when, When::PerTarget(PerPolicy::Once(OnceLiteral::Once)));
727 }
728
729 #[test]
730 fn when_calendar_time_parses() {
731 let s: Schedule = serde_yaml::from_str(&schedule_yaml_with(
732 " calendar:\n at: \"09:00\"\n days: [mon-fri]",
733 ))
734 .expect("parse");
735 match &s.when {
736 When::Calendar(c) => {
737 assert_eq!(c.at, "09:00");
738 assert_eq!(c.days, vec!["mon-fri"]);
739 }
740 other => panic!("expected calendar, got {other:?}"),
741 }
742 }
743
744 #[test]
745 fn when_calendar_days_default_empty() {
746 let s: Schedule =
747 serde_yaml::from_str(&schedule_yaml_with(" calendar:\n at: \"09:00\""))
748 .expect("parse");
749 match &s.when {
750 When::Calendar(c) => assert!(c.days.is_empty(), "days defaults to empty (= daily)"),
751 other => panic!("expected calendar, got {other:?}"),
752 }
753 }
754
755 #[test]
756 fn when_calendar_datetime_parses_all_separators() {
757 // one-shot: date+time in hyphen / ISO-T / slash forms
758 for at in ["2026-06-10 09:00", "2026-06-10T09:00", "2026/06/10 09:00"] {
759 let block = format!(" calendar:\n at: \"{at}\"");
760 let s: Schedule = serde_yaml::from_str(&schedule_yaml_with(&block))
761 .unwrap_or_else(|e| panic!("parse '{at}': {e}"));
762 match &s.when {
763 When::Calendar(c) => {
764 use chrono::Datelike;
765 let p = c.parse_at().expect("parse_at");
766 let d = p.date.expect("datetime at carries a date");
767 assert_eq!((d.year(), d.month(), d.day()), (2026, 6, 10), "for '{at}'");
768 }
769 other => panic!("expected calendar, got {other:?}"),
770 }
771 }
772 }
773
774 #[test]
775 fn when_rejects_bad_once_keyword() {
776 // `onec` must be a parse error, not a silently-absorbed
777 // string (OnceLiteral is a single-variant enum for exactly
778 // this reason).
779 let r: Result<Schedule, _> = serde_yaml::from_str(&schedule_yaml_with(" per_pc: onec"));
780 assert!(r.is_err(), "expected parse error, got {r:?}");
781 }
782
783 #[test]
784 fn when_rejects_unknown_key_in_every() {
785 // EverySpec is deny_unknown_fields so `evry:` typos fail
786 // even under the untagged PerPolicy.
787 let r: Result<Schedule, _> =
788 serde_yaml::from_str(&schedule_yaml_with(" per_pc: { evry: 6h }"));
789 assert!(r.is_err(), "expected parse error, got {r:?}");
790 }
791
792 #[test]
793 fn when_rejects_unknown_variant() {
794 let r: Result<Schedule, _> =
795 serde_yaml::from_str(&schedule_yaml_with(" per_galaxy: once"));
796 assert!(r.is_err(), "expected parse error, got {r:?}");
797 }
798
799 #[test]
800 fn when_rejects_old_top_level_cron_field() {
801 // Pre-#418 shape: top-level `cron:` + no `when:`. Must fail
802 // loudly (missing `when`), which is what turns stale KV
803 // blobs into warn-skips after the upgrade.
804 let yaml = r#"
805id: x
806cron: "* * * * * *"
807job_id: y
808target: { all: true }
809"#;
810 let r: Result<Schedule, _> = serde_yaml::from_str(yaml);
811 assert!(r.is_err(), "expected parse error, got {r:?}");
812 }
813
814 #[test]
815 fn when_rejects_retired_cron_escape_hatch() {
816 // #418 Phase 2 retired `when: { cron: "..." }`. A raw cron
817 // is now an unknown variant → parse error (operators use the
818 // calendar form instead).
819 let r: Result<Schedule, _> =
820 serde_yaml::from_str(&schedule_yaml_with(" cron: \"0 0 9 * * mon-fri\""));
821 assert!(
822 r.is_err(),
823 "expected parse error for retired cron, got {r:?}"
824 );
825 }
826
827 #[test]
828 fn when_round_trips_json_and_yaml() {
829 // Round-trip through the full Schedule: that is the wire
830 // unit for both stores (JSON catalog KV + YAML mirror), and
831 // it exercises the singleton_map field attribute that keeps
832 // serde_yaml on the map shape instead of `!per_pc` tags.
833 for when in [
834 When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
835 When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
836 When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
837 When::PerTarget(PerPolicy::Every(EverySpec {
838 every: "24h".into(),
839 })),
840 calendar("09:00", &["mon-fri"]),
841 calendar("2026-06-10 09:00", &[]),
842 ] {
843 let s = schedule_with(when.clone(), RunsOn::Backend);
844
845 let json = serde_json::to_string(&s).expect("json serialise");
846 let back: Schedule = serde_json::from_str(&json).expect("json deserialise");
847 assert_eq!(back.when, when, "json round-trip for {when}");
848
849 let yaml = serde_yaml::to_string(&s).expect("yaml serialise");
850 assert!(
851 !yaml.contains('!'),
852 "yaml must use the map shape, not tags: {yaml}"
853 );
854 let back: Schedule = serde_yaml::from_str(&yaml).expect("yaml deserialise");
855 assert_eq!(back.when, when, "yaml round-trip for {when}");
856 }
857 }
858
859 #[test]
860 fn when_once_serialises_as_bare_keyword() {
861 // The wire shape operators see in the YAML mirror must stay
862 // the ergonomic `per_pc: once`, not a one-variant map.
863 let json = serde_json::to_value(When::PerPc(PerPolicy::Once(OnceLiteral::Once)))
864 .expect("serialise");
865 assert_eq!(json, serde_json::json!({ "per_pc": "once" }));
866 }
867
868 #[test]
869 fn when_displays_operator_summary() {
870 for (when, expected) in [
871 (
872 When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
873 "per_pc once",
874 ),
875 (
876 When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
877 "per_pc every 6h",
878 ),
879 (
880 When::PerTarget(PerPolicy::Every(EverySpec {
881 every: "24h".into(),
882 })),
883 "per_target every 24h",
884 ),
885 (calendar("09:00", &["mon-fri"]), "at 09:00 [mon-fri]"),
886 (calendar("2026-06-10 09:00", &[]), "at 2026-06-10 09:00"),
887 ] {
888 assert_eq!(when.to_string(), expected);
889 }
890 }
891
892 // ---- lowering (#418: when → engine vocabulary) ----
893
894 fn schedule_with(when: When, runs_on: RunsOn) -> Schedule {
895 Schedule {
896 id: "x".into(),
897 when,
898 job_id: "y".into(),
899 plan: FanoutPlan::default(),
900 active: Active::default(),
901 tz: ScheduleTz::default(),
902 starting_deadline: None,
903 runs_on,
904 enabled: true,
905 }
906 }
907
908 fn calendar(at: &str, days: &[&str]) -> When {
909 When::Calendar(CalendarSpec {
910 at: at.into(),
911 days: days.iter().map(|d| (*d).to_string()).collect(),
912 })
913 }
914
915 #[test]
916 fn lowering_matches_the_418_table() {
917 let cases = [
918 (
919 When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
920 (POLL_CRON, ExecMode::OncePerPc, None),
921 ),
922 (
923 When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
924 (POLL_CRON, ExecMode::OncePerPc, Some("6h")),
925 ),
926 (
927 When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
928 (POLL_CRON, ExecMode::OncePerTarget, None),
929 ),
930 (
931 When::PerTarget(PerPolicy::Every(EverySpec {
932 every: "24h".into(),
933 })),
934 (POLL_CRON, ExecMode::OncePerTarget, Some("24h")),
935 ),
936 // calendar repeating → 6-field cron
937 (
938 calendar("09:00", &["mon-fri"]),
939 ("0 0 9 * * mon-fri", ExecMode::EveryTick, None),
940 ),
941 // calendar daily (no days) → DOW *
942 (
943 calendar("18:30", &[]),
944 ("0 30 18 * * *", ExecMode::EveryTick, None),
945 ),
946 // calendar one-shot → 7-field year cron
947 (
948 calendar("2026-06-10 09:00", &[]),
949 ("0 0 9 10 6 * 2026", ExecMode::EveryTick, None),
950 ),
951 ];
952 for (when, (cron, mode, cooldown)) in cases {
953 let l = schedule_with(when.clone(), RunsOn::Backend).lowered();
954 assert_eq!(l.cron, cron, "cron for {when}");
955 assert_eq!(l.mode, mode, "mode for {when}");
956 assert_eq!(l.cooldown.as_deref(), cooldown, "cooldown for {when}");
957 }
958 }
959
960 #[test]
961 fn lowered_carries_schedule_tz() {
962 for (tz, want) in [
963 (ScheduleTz::Local, ScheduleTz::Local),
964 (ScheduleTz::Utc, ScheduleTz::Utc),
965 ] {
966 let mut s = schedule_with(calendar("09:00", &["mon-fri"]), RunsOn::Backend);
967 s.tz = tz;
968 assert_eq!(s.lowered().tz, want, "calendar carries tz");
969 // reconcile shapes carry tz too (for the active-window check)
970 let mut s = schedule_with(
971 When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
972 RunsOn::Backend,
973 );
974 s.tz = tz;
975 assert_eq!(s.lowered().tz, want, "reconcile carries tz");
976 }
977 }
978
979 #[test]
980 fn poll_cron_is_accepted_by_the_engine_parser() {
981 // POLL_CRON is system-generated — if the engine's parser
982 // ever rejected it every reconcile schedule would die at
983 // register time. Validate it with the same croner config
984 // (Seconds::Required, dom_and_dow, year optional).
985 croner::parser::CronParser::builder()
986 .seconds(croner::parser::Seconds::Required)
987 .dom_and_dow(true)
988 .build()
989 .parse(POLL_CRON)
990 .expect("POLL_CRON must parse");
991 }
992
993 // ---- Schedule::validate() (#418 decision F) ----
994
995 #[test]
996 fn validate_accepts_reconcile_shapes() {
997 for when in [
998 When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
999 When::PerPc(PerPolicy::Every(EverySpec { every: "6h".into() })),
1000 When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
1001 When::PerTarget(PerPolicy::Every(EverySpec {
1002 every: "24h".into(),
1003 })),
1004 ] {
1005 schedule_with(when.clone(), RunsOn::Backend)
1006 .validate()
1007 .unwrap_or_else(|e| panic!("{when} should validate: {e}"));
1008 }
1009 }
1010
1011 #[test]
1012 fn validate_accepts_per_pc_on_agent() {
1013 schedule_with(
1014 When::PerPc(PerPolicy::Every(EverySpec { every: "1h".into() })),
1015 RunsOn::Agent,
1016 )
1017 .validate()
1018 .expect("per_pc + agent is the offline-inventory shape");
1019 }
1020
1021 #[test]
1022 fn validate_rejects_per_target_on_agent() {
1023 let err = schedule_with(
1024 When::PerTarget(PerPolicy::Every(EverySpec {
1025 every: "24h".into(),
1026 })),
1027 RunsOn::Agent,
1028 )
1029 .validate()
1030 .unwrap_err();
1031 assert!(err.contains("per_target"), "got: {err}");
1032 assert!(err.contains("runs_on: agent"), "got: {err}");
1033
1034 // per_target: once is also backend-only.
1035 let err = schedule_with(
1036 When::PerTarget(PerPolicy::Once(OnceLiteral::Once)),
1037 RunsOn::Agent,
1038 )
1039 .validate()
1040 .unwrap_err();
1041 assert!(err.contains("per_target"), "got (once): {err}");
1042 assert!(err.contains("runs_on: agent"), "got (once): {err}");
1043 }
1044
1045 #[test]
1046 fn validate_rejects_bad_every_duration() {
1047 let err = schedule_with(
1048 When::PerPc(PerPolicy::Every(EverySpec { every: "6x".into() })),
1049 RunsOn::Backend,
1050 )
1051 .validate()
1052 .unwrap_err();
1053 assert!(err.contains("when.every"), "got: {err}");
1054 }
1055
1056 #[test]
1057 fn validate_rejects_bad_jitter_and_starting_deadline() {
1058 let mut s = schedule_with(
1059 When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1060 RunsOn::Backend,
1061 );
1062 s.plan.jitter = Some("5x".into());
1063 let err = s.validate().unwrap_err();
1064 assert!(err.contains("jitter"), "got: {err}");
1065
1066 let mut s = schedule_with(
1067 When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1068 RunsOn::Backend,
1069 );
1070 s.starting_deadline = Some("soon".into());
1071 let err = s.validate().unwrap_err();
1072 assert!(err.contains("starting_deadline"), "got: {err}");
1073 }
1074
1075 #[test]
1076 fn validate_accepts_calendar_shapes() {
1077 for when in [
1078 calendar("09:00", &["mon-fri"]), // weekday morning
1079 calendar("00:00", &["sun"]), // weekly
1080 calendar("18:30", &[]), // daily
1081 calendar("2026-06-10 09:00", &[]), // one-shot
1082 calendar("2026/12/25 00:00", &[]), // one-shot, slash form
1083 ] {
1084 schedule_with(when.clone(), RunsOn::Backend)
1085 .validate()
1086 .unwrap_or_else(|e| panic!("{when} should validate: {e}"));
1087 }
1088 }
1089
1090 #[test]
1091 fn validate_rejects_bad_at() {
1092 for bad in ["25:00", "09:60", "9", "noon", "2026-13-01 09:00"] {
1093 let err = schedule_with(calendar(bad, &[]), RunsOn::Backend)
1094 .validate()
1095 .unwrap_err();
1096 assert!(err.contains("when.at"), "for '{bad}', got: {err}");
1097 }
1098 }
1099
1100 #[test]
1101 fn validate_rejects_datetime_at_with_days() {
1102 // A dated `at` is a one-shot — pairing it with days is a
1103 // contradiction (the date already pins the day).
1104 let err = schedule_with(calendar("2026-06-10 09:00", &["mon"]), RunsOn::Backend)
1105 .validate()
1106 .unwrap_err();
1107 assert!(
1108 err.contains("one-shot") && err.contains("days"),
1109 "got: {err}"
1110 );
1111 }
1112
1113 #[test]
1114 fn validate_rejects_bad_day_name() {
1115 // A garbage DOW token is caught by the days pre-flight and
1116 // reported against `when.days`, not the confusing
1117 // "when.at lowered to invalid cron" (claude #432 review).
1118 let err = schedule_with(calendar("09:00", &["funday"]), RunsOn::Backend)
1119 .validate()
1120 .unwrap_err();
1121 assert!(err.contains("when.days"), "got: {err}");
1122 assert!(err.contains("funday"), "names the bad token: {err}");
1123 // a degenerate range like `mon-` reports the whole token, not
1124 // a cryptic empty part (claude #432 follow-up)
1125 let err = schedule_with(calendar("09:00", &["mon-"]), RunsOn::Backend)
1126 .validate()
1127 .unwrap_err();
1128 assert!(err.contains("'mon-'"), "names the whole token: {err}");
1129 // valid names / ranges / numeric / * all pass
1130 for ok in [
1131 calendar("09:00", &["mon-fri"]),
1132 calendar("09:00", &["mon", "wed", "sun"]),
1133 calendar("09:00", &["1-5"]),
1134 ] {
1135 schedule_with(ok.clone(), RunsOn::Backend)
1136 .validate()
1137 .unwrap_or_else(|e| panic!("{ok} should validate: {e}"));
1138 }
1139 }
1140
1141 #[test]
1142 fn calendar_oneshot_instant_detects_past() {
1143 use chrono::TimeZone;
1144 // a dated `at` resolves to an absolute instant…
1145 let c = CalendarSpec {
1146 at: "2024-01-01 09:00".into(),
1147 days: vec![],
1148 };
1149 let t = c
1150 .oneshot_instant(ScheduleTz::Utc)
1151 .expect("one-shot instant");
1152 assert_eq!(
1153 t,
1154 chrono::Utc.with_ymd_and_hms(2024, 1, 1, 9, 0, 0).unwrap()
1155 );
1156 assert!(t < chrono::Utc::now(), "2024 is in the past");
1157 // …while a repeating (time-only) calendar has no instant
1158 let rep = CalendarSpec {
1159 at: "09:00".into(),
1160 days: vec!["mon-fri".into()],
1161 };
1162 assert!(rep.oneshot_instant(ScheduleTz::Utc).is_none());
1163 }
1164
1165 fn schedule_with_active(from: Option<&str>, until: Option<&str>) -> Schedule {
1166 let mut s = schedule_with(
1167 When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1168 RunsOn::Backend,
1169 );
1170 s.active = Active {
1171 from: from.map(str::to_owned),
1172 until: until.map(str::to_owned),
1173 };
1174 s
1175 }
1176
1177 #[test]
1178 fn validate_accepts_active_window() {
1179 schedule_with_active(Some("2026-07-01"), Some("2026-08-01T12:00:00+09:00"))
1180 .validate()
1181 .expect("date + rfc3339 bounds should validate");
1182 }
1183
1184 #[test]
1185 fn validate_rejects_unparseable_active_bound() {
1186 let err = schedule_with_active(Some("July 1st"), None)
1187 .validate()
1188 .unwrap_err();
1189 assert!(err.contains("active"), "got: {err}");
1190 }
1191
1192 #[test]
1193 fn validate_rejects_from_not_before_until() {
1194 let err = schedule_with_active(Some("2026-08-01"), Some("2026-07-01"))
1195 .validate()
1196 .unwrap_err();
1197 assert!(err.contains("strictly before"), "got: {err}");
1198
1199 let err = schedule_with_active(Some("2026-07-01"), Some("2026-07-01"))
1200 .validate()
1201 .unwrap_err();
1202 assert!(err.contains("strictly before"), "got: {err}");
1203 }
1204
1205 // ---- Active window semantics ----
1206
1207 #[test]
1208 fn active_window_is_half_open() {
1209 use chrono::TimeZone;
1210 let active = Active {
1211 from: Some("2026-07-01".into()),
1212 until: Some("2026-08-01".into()),
1213 };
1214 // UTC tz so the date bounds are UTC midnight.
1215 let at = |y, m, d, h| chrono::Utc.with_ymd_and_hms(y, m, d, h, 0, 0).unwrap();
1216 let c = |t| active.contains(t, ScheduleTz::Utc);
1217 assert!(!c(at(2026, 6, 30, 23)), "before from");
1218 assert!(c(at(2026, 7, 1, 0)), "at from (inclusive)");
1219 assert!(c(at(2026, 7, 15, 12)), "inside");
1220 assert!(!c(at(2026, 8, 1, 0)), "at until (exclusive)");
1221 assert!(!c(at(2026, 8, 2, 0)), "after until");
1222 }
1223
1224 #[test]
1225 fn active_empty_window_is_always_active() {
1226 assert!(Active::default().contains(chrono::Utc::now(), ScheduleTz::Local));
1227 }
1228
1229 #[test]
1230 fn active_rfc3339_bound_honours_offset_regardless_of_tz() {
1231 use chrono::TimeZone;
1232 let active = Active {
1233 from: Some("2026-07-01T09:00:00+09:00".into()),
1234 until: None,
1235 };
1236 // RFC3339 carries its own offset → tz arg is ignored.
1237 // 09:00 JST = 00:00 UTC.
1238 for tz in [ScheduleTz::Utc, ScheduleTz::Local] {
1239 assert!(
1240 !active.contains(
1241 chrono::Utc
1242 .with_ymd_and_hms(2026, 6, 30, 23, 59, 0)
1243 .unwrap(),
1244 tz
1245 )
1246 );
1247 assert!(active.contains(
1248 chrono::Utc.with_ymd_and_hms(2026, 7, 1, 0, 0, 0).unwrap(),
1249 tz
1250 ));
1251 }
1252 }
1253
1254 #[test]
1255 fn active_date_bound_respects_tz() {
1256 // A bare `YYYY-MM-DD` bound is midnight *in the schedule's
1257 // tz* (#418 Phase 2). The UTC interpretation is exact and
1258 // host-independent; assert that precisely.
1259 use chrono::TimeZone;
1260 let utc = Active::parse_bound("2026-07-01", ScheduleTz::Utc).expect("utc");
1261 assert_eq!(
1262 utc,
1263 chrono::Utc.with_ymd_and_hms(2026, 7, 1, 0, 0, 0).unwrap()
1264 );
1265
1266 // The local interpretation must equal what chrono::Local
1267 // computes for the same wall-clock midnight — proves the tz
1268 // path is wired to the host zone (the magnitude vs UTC is
1269 // host-dependent, so we compare against Local directly rather
1270 // than hard-coding the JST offset, keeping CI green on UTC
1271 // runners).
1272 let local = Active::parse_bound("2026-07-01", ScheduleTz::Local).expect("local");
1273 let want = chrono::Local
1274 .with_ymd_and_hms(2026, 7, 1, 0, 0, 0)
1275 .single()
1276 .expect("local midnight is unambiguous")
1277 .with_timezone(&chrono::Utc);
1278 assert_eq!(local, want, "date bound resolved in host-local tz");
1279 }
1280
1281 #[test]
1282 fn active_empty_is_skipped_when_serialising() {
1283 let s = schedule_with(
1284 When::PerPc(PerPolicy::Once(OnceLiteral::Once)),
1285 RunsOn::Backend,
1286 );
1287 let json = serde_json::to_value(&s).expect("serialise");
1288 assert!(
1289 json.get("active").is_none(),
1290 "empty active must not appear on the wire: {json}"
1291 );
1292 }
1293
1294 #[test]
1295 fn shipped_schedule_configs_parse_and_validate() {
1296 // Every YAML under configs/schedules/ must parse with the
1297 // current Schedule serde AND pass validate() — keeps the
1298 // shipped examples from drifting out of sync with the model
1299 // (#418 removed back-compat, so drift = broken at create).
1300 let dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("../../configs/schedules");
1301 let mut seen = 0;
1302 for entry in std::fs::read_dir(&dir).expect("read configs/schedules") {
1303 let path = entry.expect("dir entry").path();
1304 if path.extension().and_then(|e| e.to_str()) != Some("yaml") {
1305 continue;
1306 }
1307 let body = std::fs::read_to_string(&path).expect("read yaml");
1308 let s: Schedule = serde_yaml::from_str(&body)
1309 .unwrap_or_else(|e| panic!("{} failed to parse: {e}", path.display()));
1310 s.validate()
1311 .unwrap_or_else(|e| panic!("{} failed validate(): {e}", path.display()));
1312 seen += 1;
1313 }
1314 assert!(seen > 0, "no schedule YAMLs found in {}", dir.display());
1315 }
1316
1317 // ---- pre-existing enum wire formats (unchanged by #418) ----
1318
1319 #[test]
1320 fn exec_mode_serialises_snake_case() {
1321 for (mode, expected) in [
1322 (ExecMode::EveryTick, "every_tick"),
1323 (ExecMode::OncePerPc, "once_per_pc"),
1324 (ExecMode::OncePerTarget, "once_per_target"),
1325 ] {
1326 let s = serde_json::to_value(mode).expect("serialise");
1327 assert_eq!(s, serde_json::Value::String(expected.into()));
1328 let back: ExecMode = serde_json::from_value(serde_json::Value::String(expected.into()))
1329 .expect("deserialise");
1330 assert_eq!(back, mode, "round-trip for {expected}");
1331 }
1332 }
1333
1334 #[test]
1335 fn schedule_runs_on_defaults_to_backend() {
1336 let yaml = r#"
1337id: x
1338when:
1339 per_pc: once
1340job_id: y
1341target: { all: true }
1342"#;
1343 let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
1344 assert_eq!(s.runs_on, RunsOn::Backend);
1345 }
1346
1347 #[test]
1348 fn schedule_runs_on_agent_parses() {
1349 let yaml = r#"
1350id: offline-inv
1351when:
1352 per_pc: { every: 1h }
1353job_id: inventory-hw
1354target: { all: true }
1355runs_on: agent
1356"#;
1357 let s: Schedule = serde_yaml::from_str(yaml).expect("parse");
1358 assert_eq!(s.runs_on, RunsOn::Agent);
1359 assert_eq!(s.lowered().mode, ExecMode::OncePerPc);
1360 }
1361
1362 #[test]
1363 fn runs_on_serialises_snake_case() {
1364 for (mode, expected) in [(RunsOn::Backend, "backend"), (RunsOn::Agent, "agent")] {
1365 let s = serde_json::to_value(mode).expect("serialise");
1366 assert_eq!(s, serde_json::Value::String(expected.into()));
1367 let back: RunsOn = serde_json::from_value(serde_json::Value::String(expected.into()))
1368 .expect("deserialise");
1369 assert_eq!(back, mode);
1370 }
1371 }
1372
1373 #[test]
1374 fn execute_shell_into_wire_shell() {
1375 assert_eq!(Shell::from(ExecuteShell::Powershell), Shell::Powershell);
1376 assert_eq!(Shell::from(ExecuteShell::Cmd), Shell::Cmd);
1377 }
1378
1379 #[test]
1380 fn manifest_staleness_defaults_to_cached() {
1381 let yaml = r#"
1382id: x
1383version: 1.0.0
1384execute:
1385 shell: powershell
1386 script: "echo"
1387 timeout: 1s
1388"#;
1389 let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1390 assert_eq!(m.staleness, Staleness::Cached);
1391 }
1392
1393 #[test]
1394 fn manifest_strict_staleness_parses() {
1395 let yaml = r#"
1396id: urgent-patch
1397version: 2.5.1
1398execute:
1399 shell: powershell
1400 script: Install-Hotfix
1401 timeout: 5m
1402staleness:
1403 mode: strict
1404 max_cache_age: 0s
1405"#;
1406 let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1407 match m.staleness {
1408 Staleness::Strict { max_cache_age } => assert_eq!(max_cache_age, "0s"),
1409 other => panic!("expected strict, got {other:?}"),
1410 }
1411 }
1412
1413 #[test]
1414 fn manifest_unchecked_staleness_parses() {
1415 let yaml = r#"
1416id: legacy
1417version: 0.1.0
1418execute:
1419 shell: cmd
1420 script: "echo"
1421 timeout: 1s
1422staleness:
1423 mode: unchecked
1424"#;
1425 let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1426 assert_eq!(m.staleness, Staleness::Unchecked);
1427 }
1428
1429 #[test]
1430 fn missing_required_field_errors() {
1431 // `id` missing.
1432 let yaml = r#"
1433version: 1.0.0
1434target: { all: true }
1435execute:
1436 shell: powershell
1437 script: "echo"
1438 timeout: 1s
1439"#;
1440 let r: Result<Manifest, _> = serde_yaml::from_str(yaml);
1441 assert!(r.is_err(), "expected error, got {:?}", r);
1442 }
1443
1444 #[test]
1445 fn display_field_table_kind_round_trips_with_nested_columns() {
1446 // #39: `type: table` + `columns:` on a DisplayField gets
1447 // round-tripped through serde so the SPA receives the
1448 // nested schema verbatim. Nested columns themselves are
1449 // DisplayFields so they can carry `type: bytes` /
1450 // `type: number` for cell formatting.
1451 let yaml = r#"
1452id: inv-hw
1453version: 1.0.0
1454execute:
1455 shell: powershell
1456 script: "echo"
1457 timeout: 60s
1458inventory:
1459 display:
1460 - field: hostname
1461 label: Hostname
1462 - field: disks
1463 label: Disks
1464 type: table
1465 columns:
1466 - field: device_id
1467 label: Drive
1468 - field: size_bytes
1469 label: Size
1470 type: bytes
1471 - field: free_bytes
1472 label: Free
1473 type: bytes
1474 - field: file_system
1475 label: FS
1476"#;
1477 let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1478 let inv = m.inventory.as_ref().expect("inventory hint");
1479 let disks = inv
1480 .display
1481 .iter()
1482 .find(|d| d.field == "disks")
1483 .expect("disks display row");
1484 assert_eq!(disks.kind.as_deref(), Some("table"));
1485 let cols = disks.columns.as_ref().expect("table needs columns");
1486 assert_eq!(cols.len(), 4);
1487 assert_eq!(cols[1].field, "size_bytes");
1488 assert_eq!(cols[1].kind.as_deref(), Some("bytes"));
1489 }
1490
1491 #[test]
1492 fn display_field_scalar_kind_keeps_columns_none() {
1493 // Defensive: when type is a scalar (`bytes` / `number` /
1494 // `timestamp`) the `columns` field stays None — the SPA
1495 // uses its presence as the "render nested table" signal,
1496 // so it must not leak in via serde defaults.
1497 let yaml = r#"
1498id: x
1499version: 1.0.0
1500execute:
1501 shell: powershell
1502 script: "echo"
1503 timeout: 5s
1504inventory:
1505 display:
1506 - { field: ram_bytes, label: RAM, type: bytes }
1507"#;
1508 let m: Manifest = serde_yaml::from_str(yaml).expect("parse");
1509 let inv = m.inventory.as_ref().unwrap();
1510 assert!(inv.display[0].columns.is_none());
1511 }
1512}
1513
/// Periodic schedule (spec §2.4.3). v0.18.0 carries the fanout plan
/// (target + optional rollout + optional jitter) inline; the
/// referenced job (`job_id` → [`BUCKET_JOBS`]) supplies only the
/// script body. Two schedules of the same job can target different
/// groups on different cadences without copying the manifest.
///
/// #418 Phase 1: the cadence is the single [`When`] field. The old
/// `cron` × `mode` × `cooldown` × `auto_disable_when_done` quartet
/// is gone (no back-compat — pre-Phase-1 KV blobs fail to parse and
/// are warn-skipped; re-`schedule create` to upgrade them). The
/// engine underneath is unchanged: [`Schedule::lowered`] maps `when`
/// onto the same (cron, ExecMode, cooldown) trio the scheduler and
/// `decide_fire` always ran on.
//
// Maintainer note: this type derives `schemars::JsonSchema`, and
// schemars copies `///` doc comments into the generated schema's
// descriptions. Notes that should NOT reach the schema are therefore
// written as plain `//` comments in this block.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone)]
pub struct Schedule {
    // Required on the wire (no serde default). Presumably the schedule's
    // unique name / KV key — TODO confirm against the store code.
    pub id: String,
    /// When to fire — a reconcile cadence (`per_pc` / `per_target`)
    /// or a calendar time trigger (`at` / `days`). See [`When`].
    ///
    /// `singleton_map`: serde_yaml 0.9 renders externally-tagged
    /// enums as `!per_pc` YAML tags by default; this keeps the
    /// operator-facing map shape (`when: { per_pc: once }`). JSON
    /// output is identical either way, and the schemars schema
    /// (external tagging = oneOf of single-key objects) already
    /// matches the singleton-map wire shape.
    #[serde(with = "serde_yaml::with::singleton_map")]
    #[schemars(with = "When")]
    pub when: When,
    /// Key into [`crate::kv::BUCKET_JOBS`]. Must equal a registered
    /// Manifest's `id`.
    pub job_id: String,
    /// Who + how-to-phase + when-to-stagger. The Manifest doesn't
    /// carry these any more — same job + different fanout = different
    /// schedule.
    // `flatten`: the plan's keys sit at the top level of the schedule
    // document (the tests in this file write `target: { all: true }`
    // next to `id:` / `job_id:`), not under a `plan:` key.
    #[serde(flatten)]
    pub plan: FanoutPlan,
    /// Optional validity window. Outside `[from, until)` the
    /// schedule is dormant — still registered, still visible, but
    /// every tick is skipped (deleted ≠ dormant: a campaign that
    /// ended stays inspectable and can be re-armed by editing the
    /// window). Checked at tick time on both the backend scheduler
    /// and the agent's local scheduler.
    // An empty window is left out of the serialised form entirely
    // (`skip_serializing_if`), and a missing key deserialises to the
    // default window.
    #[serde(default, skip_serializing_if = "Active::is_empty")]
    pub active: Active,
    /// #418 Phase 2: the timezone this schedule's wall-clock fields
    /// are evaluated in — both the calendar `at` firing time AND the
    /// `active.{from,until}` window bounds. `local` (default) = the
    /// running host's TZ (the agent's for `runs_on: agent`, the
    /// backend server's otherwise); `utc` for TZ-independent
    /// schedules. Reconcile shapes (`per_pc`/`per_target`) ignore it
    /// for firing (poll cron runs every minute regardless) but still
    /// honor it for the `active` window.
    #[serde(default)]
    pub tz: ScheduleTz,
    /// v0.22: optional humantime window after a cron tick during
    /// which the Command is still considered "live". The scheduler
    /// computes `tick_at + starting_deadline` and stamps it onto
    /// each Command as `deadline_at`; agents skip Commands they
    /// receive after that absolute time. `None` (default) = no
    /// deadline, meaning a Command queued in the broker / stream
    /// during agent downtime runs whenever the agent reconnects —
    /// good for kitting / inventory / cleanup. Set this for
    /// time-of-day notifications, lunch reminders, etc., where
    /// "fire 3 hours late" would be wrong.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub starting_deadline: Option<String>,
    /// v0.23: where does the cron tick happen? `Backend` (default,
    /// historical) = backend's scheduler fires Commands via NATS;
    /// agents passively receive. `Agent` = each targeted agent runs
    /// its own internal cron and fires locally, so the schedule
    /// keeps ticking even when the broker is unreachable (laptop on
    /// the train, broker maintenance window, full WAN outage). The
    /// two locations are mutually exclusive — when `Agent`, the
    /// backend scheduler stays out and just keeps the definition in
    /// KV for agents to read.
    #[serde(default)]
    pub runs_on: RunsOn,
    // When the key is omitted the value comes from `default_true`
    // (presumably `true` — the helper is defined outside this view).
    // NOTE(review): how a disabled schedule is treated is decided by
    // the schedulers, not here — confirm before documenting further.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1594
/// v0.23 — where the cron tick fires from.
//
// Wire names are the snake_case variant names (`backend`, `agent`).
// `Schedule.runs_on` is `#[serde(default)]`, so a document without the
// key gets the `#[default]` variant below (`Backend`). Both facts are
// pinned by the `runs_on` tests in this file.
#[derive(
    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
)]
#[serde(rename_all = "snake_case")]
pub enum RunsOn {
    /// Backend's central scheduler ticks and publishes Commands to
    /// NATS. Historical default, what every pre-v0.23 schedule
    /// uses. Agent offline ⇒ Command queued in STREAM_EXEC; agent
    /// reconnects ⇒ catch-up via [`command_replay`](crate)
    /// (see kanade-agent's command_replay module).
    #[default]
    Backend,
    /// Each targeted agent runs the cron tick locally. Survives
    /// broker / WAN outages. Best for laptops / mobile devices that
    /// roam off the corporate network. Agent must be online for the
    /// initial schedule + job-catalog pull, but once cached the
    /// agent fires the script standalone.
    // Only `per_pc` and calendar cadences can run here: the docs on
    // `When::PerTarget` state that `runs_on: agent` + `per_target` is
    // rejected by `Schedule::validate`.
    Agent,
}
1615
/// Per-pc/per-target dedup semantics for a [`Schedule`] (v0.19).
//
// Operators no longer write this directly: it is one third of the
// (cron, ExecMode, cooldown) trio that `Schedule::lowered` derives from
// `when`. Per the lowering table test in this file, `per_pc` lowers to
// `OncePerPc`, `per_target` to `OncePerTarget`, and a calendar trigger
// to `EveryTick`. Wire names are the snake_case variant names
// (`every_tick`, `once_per_pc`, `once_per_target`).
#[derive(
    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
)]
#[serde(rename_all = "snake_case")]
pub enum ExecMode {
    /// Fire on every cron tick at the whole target. Historical
    /// (pre-v0.19) behavior; no dedup.
    #[default]
    EveryTick,
    /// Fire at each pc until that pc succeeds; then skip it until
    /// the optional cooldown elapses (or forever if no cooldown).
    /// Use for kitting / first-boot / per-pc compliance checks.
    OncePerPc,
    /// Fire at the whole target until **any** pc succeeds; then
    /// skip the whole target until the optional cooldown elapses
    /// (or forever if no cooldown). Use for "one delegate is
    /// enough" tasks like license check-in.
    OncePerTarget,
}
1636
/// #418 Phase 1 — the single "when does this fire" axis.
///
/// Replaces the old `cron` + `mode` + `cooldown` trio whose
/// interactions were implicit (cron doubled as both a real
/// time-of-day trigger and a reconcile poll period; contradictory
/// combinations silently no-opped). Two shapes:
///
/// * **reconcile** (`per_pc` / `per_target`) — desired-state: "each
///   pc (or one delegate) should have run this within `every`".
///   The poll period is system-generated ([`POLL_CRON`], every
///   minute) and no longer the operator's concern.
/// * **calendar** (`{ at, days }`) — a wall-clock time trigger
///   (#418 Phase 2, replacing the old raw-cron escape hatch). Fires
///   the whole target at the given time, no dedup. `at: "09:00"` +
///   `days` repeats; `at: "2026-06-10 09:00"` (a date+time) fires
///   exactly once. Evaluated in the schedule's top-level `tz`.
//
// Wire shape: externally tagged with snake_case keys, i.e. a
// single-key map such as `{ "per_pc": "once" }` (pinned by
// `when_once_serialises_as_bare_keyword`). Any other key — including
// the retired `cron` — is an unknown variant and fails to parse (see
// the `when_rejects_*` tests in this file).
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum When {
    /// Fire at each targeted pc: `once` (kitting — succeed once,
    /// skip forever, forever catching brand-new / re-imaged pcs)
    /// or `{ every: <humantime> }` (patrol — re-arm per pc after
    /// the interval).
    PerPc(PerPolicy),
    /// Fire until **any** one pc of the target succeeds, then skip
    /// the whole target (`once`) or re-arm after `every`. Needs
    /// fleet-wide completion data, so it is backend-only —
    /// `runs_on: agent` + `per_target` is rejected by
    /// [`Schedule::validate`].
    PerTarget(PerPolicy),
    /// Calendar time trigger: `{ at: "09:00", days: [mon-fri] }`
    /// (repeating) or `{ at: "2026-06-10 09:00" }` (one-shot). Fires
    /// the whole target at that wall-clock time in the schedule's
    /// `tz` — no dedup, no cooldown.
    Calendar(CalendarSpec),
}
1673
/// Calendar time trigger (#418 Phase 2). `at` is either a time of
/// day (`"HH:MM"`, repeating — combine with `days`) or a full
/// date+time (`"YYYY-MM-DD HH:MM"`, a one-shot that fires once and
/// never again). Evaluated in the schedule's top-level `tz`.
//
// `deny_unknown_fields`: a misspelt key inside the calendar map is a
// parse error instead of being silently dropped.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct CalendarSpec {
    /// `"HH:MM"` (24h) for a repeating trigger, or
    /// `"YYYY-MM-DD HH:MM"` (hyphen / slash / `T` separators all
    /// accepted) for a one-shot. Parsed lazily —
    /// [`Schedule::validate`] rejects garbage at create time.
    // Kept as the raw operator string; `parse_at` trims surrounding
    // whitespace before trying the accepted formats.
    pub at: String,
    /// Day-of-week filter for a time-of-day `at`: `["mon-fri"]`,
    /// `["mon","wed","fri"]`, … (passed verbatim to the cron DOW
    /// field, so ranges and names both work). Empty = every day.
    /// Must be empty when `at` carries a date (the date already
    /// pins the day).
    // Optional on the wire: a missing key deserialises to an empty
    // list, and an empty list is omitted when serialising.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub days: Vec<String>,
}
1694
/// Parsed `CalendarSpec.at`: the wall-clock minute/hour, plus the
/// date for a one-shot (`None` = repeating time-of-day).
///
/// Seconds are not carried: the accepted `at` formats stop at minutes,
/// and the consumers in this impl (`oneshot_instant`, `to_cron`) always
/// use second 0.
struct ParsedAt {
    /// Minute of the hour, as returned by chrono's `minute()`.
    minute: u32,
    /// Hour of the day (24h), as returned by chrono's `hour()`.
    hour: u32,
    /// Calendar date when `at` included one (one-shot); `None` for a
    /// bare time of day.
    date: Option<chrono::NaiveDate>,
}
1702
1703impl CalendarSpec {
1704 /// Parse `at`: a date+time (`YYYY-MM-DD HH:MM`, hyphen / slash /
1705 /// `T` separators) is a one-shot; a bare `HH:MM` is repeating.
1706 fn parse_at(&self) -> Result<ParsedAt, String> {
1707 use chrono::Timelike;
1708 let s = self.at.trim();
1709 for fmt in ["%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M", "%Y/%m/%d %H:%M"] {
1710 if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, fmt) {
1711 return Ok(ParsedAt {
1712 minute: dt.minute(),
1713 hour: dt.hour(),
1714 date: Some(dt.date()),
1715 });
1716 }
1717 }
1718 if let Ok(t) = chrono::NaiveTime::parse_from_str(s, "%H:%M") {
1719 return Ok(ParsedAt {
1720 minute: t.minute(),
1721 hour: t.hour(),
1722 date: None,
1723 });
1724 }
1725 Err(format!(
1726 "when.at: unparseable '{}' (want HH:MM or YYYY-MM-DD HH:MM)",
1727 self.at
1728 ))
1729 }
1730
1731 /// Pre-flight check on the `days` tokens so a bad day name gives
1732 /// a `when.days:`-scoped error instead of croner's confusing
1733 /// "when.at lowered to invalid cron" (claude #432 review). Each
1734 /// token is a day name (`mon`..`sun`), a numeric DOW (`0`..`7`),
1735 /// `*`, or a `-` range of those.
1736 fn validate_days(&self) -> Result<(), String> {
1737 const NAMES: [&str; 7] = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"];
1738 for tok in &self.days {
1739 // Report the whole token on a malformed range like `mon-`
1740 // (which would otherwise split to a cryptic empty part —
1741 // claude #432 follow-up).
1742 let invalid = |reason: &str| {
1743 Err(format!(
1744 "when.days: invalid day token '{tok}' ({reason}; \
1745 want mon..sun, 0-7, a range like mon-fri, or *)"
1746 ))
1747 };
1748 for part in tok.split('-') {
1749 let p = part.trim().to_ascii_lowercase();
1750 if p.is_empty() {
1751 return invalid("empty range bound");
1752 }
1753 let ok = p == "*"
1754 || NAMES.contains(&p.as_str())
1755 || p.parse::<u8>().map(|n| n <= 7).unwrap_or(false);
1756 if !ok {
1757 return invalid(&format!("'{part}' is not a day"));
1758 }
1759 }
1760 }
1761 Ok(())
1762 }
1763
1764 /// For a one-shot (`at` carries a date), the absolute instant it
1765 /// fires in `tz`. `None` for a repeating calendar. Used to warn
1766 /// about a one-shot whose date is already in the past (it would
1767 /// never fire).
1768 pub fn oneshot_instant(&self, tz: ScheduleTz) -> Option<chrono::DateTime<chrono::Utc>> {
1769 let p = self.parse_at().ok()?;
1770 let date = p.date?;
1771 let naive = date.and_hms_opt(p.hour, p.minute, 0)?;
1772 tz.naive_to_utc(naive)
1773 }
1774
1775 /// Lower to the cron string the scheduler engine runs. Repeating
1776 /// → 6-field `0 {min} {hour} * * {dow}`; one-shot → 7-field
1777 /// `0 {min} {hour} {day} {month} * {year}` (a past year never
1778 /// fires — that's what makes it one-shot).
1779 fn to_cron(&self) -> Result<String, String> {
1780 use chrono::Datelike;
1781 let ParsedAt { minute, hour, date } = self.parse_at()?;
1782 match date {
1783 Some(d) => {
1784 if !self.days.is_empty() {
1785 return Err(
1786 "when.at with a date is a one-shot and cannot be combined with days".into(),
1787 );
1788 }
1789 Ok(format!(
1790 "0 {minute} {hour} {} {} * {}",
1791 d.day(),
1792 d.month(),
1793 d.year()
1794 ))
1795 }
1796 None => {
1797 let dow = if self.days.is_empty() {
1798 "*".to_string()
1799 } else {
1800 self.validate_days()?;
1801 self.days.join(",")
1802 };
1803 Ok(format!("0 {minute} {hour} * * {dow}"))
1804 }
1805 }
1806 }
1807}
1808
/// The timezone a schedule's wall-clock fields (`when.at`,
/// `active.{from,until}`) are evaluated in (#418 Phase 2).
// NOTE(review): this type derives `schemars::JsonSchema`, and schemars
// copies `///` doc comments into the generated schema's `description`
// — rewording the doc comments here changes the emitted JSON Schema,
// so they are deliberately left as-is.
// Wire format: `rename_all = "snake_case"` (de)serializes the variants
// as `local` / `utc`; `#[default]` makes `Local` the value returned by
// `ScheduleTz::default()`.
#[derive(
    Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq, Default,
)]
#[serde(rename_all = "snake_case")]
pub enum ScheduleTz {
    /// The running host's local timezone — the agent's for
    /// `runs_on: agent`, the backend server's otherwise. Default.
    #[default]
    Local,
    /// UTC — for timezone-independent schedules.
    Utc,
}
1823
1824impl ScheduleTz {
1825 /// Interpret a naive (zoneless) datetime as being in this tz and
1826 /// convert to UTC. On a DST *fold* (the local time occurs twice
1827 /// when clocks go back) we pick `.earliest()` rather than
1828 /// rejecting it; `None` is reserved for a true DST *gap* (a local
1829 /// time that never exists). `Utc` is fixed-offset so neither ever
1830 /// happens; `Local` is whatever timezone the running host is set
1831 /// to and *can* hit a gap/fold on any DST-observing host — not
1832 /// just the JST we run today (gemini + claude #432 review).
1833 fn naive_to_utc(self, naive: chrono::NaiveDateTime) -> Option<chrono::DateTime<chrono::Utc>> {
1834 use chrono::TimeZone;
1835 match self {
1836 ScheduleTz::Utc => Some(chrono::DateTime::from_naive_utc_and_offset(
1837 naive,
1838 chrono::Utc,
1839 )),
1840 ScheduleTz::Local => chrono::Local
1841 .from_local_datetime(&naive)
1842 .earliest()
1843 .map(|dt| dt.with_timezone(&chrono::Utc)),
1844 }
1845 }
1846}
1847
/// `once` vs `{ every: <humantime> }` — shared by `per_pc` /
/// `per_target`. Untagged so the YAML stays the bare keyword or a
/// one-key map, nothing more ceremonial.
// NOTE(review): with `#[serde(untagged)]` serde tries the variants in
// declaration order (`Once` first, then `Every`) and keeps the first
// one that deserializes, so do not reorder them casually. The `///`
// comments also feed the schemars-generated schema descriptions, which
// is why they are left untouched here.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
#[serde(untagged)]
pub enum PerPolicy {
    /// The bare string `once`: succeed once, then skip permanently
    /// (cooldown = infinity).
    Once(OnceLiteral),
    /// Re-arm after the humantime interval, e.g. `{ every: 6h }`.
    Every(EverySpec),
}
1860
/// Single-variant enum so serde accepts exactly the string `once`
/// (a free-form `String` would swallow typos like `onec`).
// `rename_all = "snake_case"` is what maps the `Once` variant to the
// lowercase wire string `once`.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OnceLiteral {
    Once,
}
1868
/// `{ every: <humantime> }`. Standalone struct (not an inline
/// struct variant) so `deny_unknown_fields` still bites under the
/// untagged [`PerPolicy`] — `{ evry: 6h }` is a parse error, not a
/// silently-ignored key.
// The interval is stored verbatim; `Schedule::validate` runs it
// through `humantime::parse_duration` and reports failures as a
// `when.every:` error.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct EverySpec {
    /// Humantime interval (`10m`, `6h`, `1d`...). Parsed lazily —
    /// [`Schedule::validate`] rejects garbage at create time.
    pub every: String,
}
1880
1881impl PerPolicy {
1882 /// The cooldown this policy lowers to: `once` = `None`
1883 /// (permanent skip), `every` = the interval.
1884 fn cooldown(&self) -> Option<String> {
1885 match self {
1886 PerPolicy::Once(_) => None,
1887 PerPolicy::Every(EverySpec { every }) => Some(every.clone()),
1888 }
1889 }
1890}
1891
1892impl std::fmt::Display for When {
1893 /// Operator-facing one-liner (`per_pc once` / `per_pc every 6h`
1894 /// / `at 09:00 [mon-fri]` / `at 2026-06-10 09:00`) for log
1895 /// lines, audit payloads and the API's `ScheduleSummary`.
1896 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1897 let policy = |p: &PerPolicy| match p {
1898 PerPolicy::Once(_) => "once".to_string(),
1899 PerPolicy::Every(EverySpec { every }) => format!("every {every}"),
1900 };
1901 match self {
1902 When::PerPc(p) => write!(f, "per_pc {}", policy(p)),
1903 When::PerTarget(p) => write!(f, "per_target {}", policy(p)),
1904 When::Calendar(c) if c.days.is_empty() => write!(f, "at {}", c.at),
1905 When::Calendar(c) => write!(f, "at {} [{}]", c.at, c.days.join(",")),
1906 }
1907 }
1908}
1909
/// Optional validity window for a [`Schedule`] (#418 decision G).
/// Half-open `[from, until)`; either bound may be omitted. Bounds
/// are `YYYY-MM-DD` (= that day's 00:00 in the schedule's `tz`) or
/// full RFC3339 (offset is honored as-is, `tz` ignored). Kept as
/// strings so the JSON Schema the SPA editor consumes stays two
/// plain string fields, mirroring `jitter` / `starting_deadline`.
///
/// #418 Phase 2: bounds are evaluated in the schedule's top-level
/// `tz` (was UTC-only in Phase 1) so `tz: local` makes both the
/// calendar `at` AND the `active` window local — one consistent
/// timezone per schedule.
// Both bounds are parsed on demand by `Active::parse_bound`.
// `Schedule::validate` rejects an unparseable bound and a window whose
// `from` is not strictly before `until`; `Active::contains` instead
// treats an unparseable bound as absent. The `///` text feeds the
// schemars-generated schema descriptions, so it is left unchanged.
#[derive(Serialize, Deserialize, schemars::JsonSchema, Debug, Clone, Default, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct Active {
    /// Dormant before this instant.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub from: Option<String>,
    /// Dormant from this instant on (exclusive).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub until: Option<String>,
}
1931
1932impl Active {
1933 /// `skip_serializing_if` helper — an empty window means "always
1934 /// active" and is omitted from the wire format entirely.
1935 pub fn is_empty(&self) -> bool {
1936 self.from.is_none() && self.until.is_none()
1937 }
1938
1939 /// Parse one bound: RFC3339 first (offset honored, `tz`
1940 /// ignored), then bare `YYYY-MM-DD` (00:00 in `tz`).
1941 pub fn parse_bound(s: &str, tz: ScheduleTz) -> Result<chrono::DateTime<chrono::Utc>, String> {
1942 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
1943 return Ok(dt.with_timezone(&chrono::Utc));
1944 }
1945 if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1946 let midnight = d.and_hms_opt(0, 0, 0).expect("00:00:00 is always valid");
1947 return tz.naive_to_utc(midnight).ok_or_else(|| {
1948 format!("active: bound '{s}' falls in a DST gap for the schedule's tz")
1949 });
1950 }
1951 Err(format!(
1952 "active: unparseable bound '{s}' (want YYYY-MM-DD or RFC3339)"
1953 ))
1954 }
1955
1956 /// Is `now` inside the window? Unparseable bounds are treated
1957 /// as absent here (fail-open) — [`Schedule::validate`] is the
1958 /// place that rejects them loudly; this runs on every tick and
1959 /// must never panic on a stale KV blob.
1960 pub fn contains(&self, now: chrono::DateTime<chrono::Utc>, tz: ScheduleTz) -> bool {
1961 let bound = |s: &Option<String>| s.as_deref().and_then(|s| Self::parse_bound(s, tz).ok());
1962 if bound(&self.from).is_some_and(|from| now < from) {
1963 return false;
1964 }
1965 if bound(&self.until).is_some_and(|until| now >= until) {
1966 return false;
1967 }
1968 true
1969 }
1970}
1971
/// The system-generated poll cadence every reconcile-shaped `when`
/// lowers to. Operators never write this: the real inter-run
/// spacing is the `every` cooldown; this only bounds "how soon do
/// we notice somebody is due" (#418 decision B took the poll
/// period away from the operator).
///
/// Six-field, seconds-first cron: second `0` of every minute. Used
/// by [`Schedule::lowered`] for the `per_pc` and `per_target` shapes.
pub const POLL_CRON: &str = "0 * * * * *";
1978
/// What a [`When`] lowers to — the exact (cron, mode, cooldown)
/// trio the pre-#418 engine ran on. Keeping the engine vocabulary
/// unchanged is what lets Phase 1 swap the operator surface without
/// touching the tick / dedup machinery.
///
/// Built by [`Schedule::lowered`].
pub struct Lowered {
    /// Cron handed to `tokio-cron-scheduler` — [`POLL_CRON`] for
    /// reconcile shapes, a 6/7-field cron for calendar shapes.
    /// When a calendar `at` cannot be lowered, [`Schedule::lowered`]
    /// stores a deliberately invalid string here so registration
    /// fails instead of firing at the wrong time.
    pub cron: String,
    /// Dedup semantics for `decide_fire`.
    pub mode: ExecMode,
    /// Humantime re-arm interval (`None` = succeed once, skip
    /// forever). Always `None` for calendar shapes.
    pub cooldown: Option<String>,
    /// Timezone to evaluate `cron` in (#418 Phase 2). The scheduler
    /// passes this to `Job::new_async_tz`. Reconcile shapes carry
    /// the schedule's tz too even though POLL_CRON is tz-agnostic,
    /// so the same value drives the `active`-window check.
    pub tz: ScheduleTz,
}
1998
1999impl Schedule {
2000 /// Lower the operator-facing `when` onto the engine vocabulary.
2001 /// Single seam shared by the backend scheduler and the agent's
2002 /// local scheduler so the two can never drift.
2003 pub fn lowered(&self) -> Lowered {
2004 let tz = self.tz;
2005 match &self.when {
2006 When::PerPc(p) => Lowered {
2007 cron: POLL_CRON.into(),
2008 mode: ExecMode::OncePerPc,
2009 cooldown: p.cooldown(),
2010 tz,
2011 },
2012 When::PerTarget(p) => Lowered {
2013 cron: POLL_CRON.into(),
2014 mode: ExecMode::OncePerTarget,
2015 cooldown: p.cooldown(),
2016 tz,
2017 },
2018 // `to_cron` only fails on a malformed `at` (rejected by
2019 // validate() at create time). For a hand-edited KV blob
2020 // that slipped past, emit a deliberately-invalid cron so
2021 // register()'s Job::new_async_tz fails → warn+skip,
2022 // rather than firing at the wrong time.
2023 When::Calendar(c) => Lowered {
2024 cron: c
2025 .to_cron()
2026 .unwrap_or_else(|_| "# invalid calendar at".into()),
2027 mode: ExecMode::EveryTick,
2028 cooldown: None,
2029 tz,
2030 },
2031 }
2032 }
2033
2034 /// Cross-field semantic checks that don't fit pure serde derive
2035 /// — the [`Manifest::validate`] counterpart (#418 decision F;
2036 /// pre-Phase-1 a broken schedule was accepted at create time
2037 /// and silently warn-skipped at tick time). Run at every create
2038 /// site: `kanade schedule create` (client-side) and
2039 /// `POST /api/schedules`. The job_id-exists check lives in the
2040 /// API handler instead — it needs the JOBS KV.
2041 pub fn validate(&self) -> Result<(), String> {
2042 if matches!(self.runs_on, RunsOn::Agent) && matches!(self.when, When::PerTarget(_)) {
2043 return Err(
2044 "when.per_target needs fleet-wide completion data and is backend-only; \
2045 it cannot be combined with runs_on: agent (each agent self-schedules, \
2046 so per-target dedup would be deduping across a target of 1)"
2047 .into(),
2048 );
2049 }
2050 if let Some(cd) = self.lowered().cooldown.as_deref() {
2051 humantime::parse_duration(cd)
2052 .map_err(|e| format!("when.every: invalid duration '{cd}': {e}"))?;
2053 }
2054 if let When::Calendar(c) = &self.when {
2055 // Lower the calendar form to its cron (catches a bad `at`
2056 // and the date+days conflict), then validate that cron
2057 // with the same parser configuration tokio-cron-scheduler
2058 // 0.15 uses internally (croner, seconds required,
2059 // DOM-and-DOW both honored, year optional) — create-time
2060 // validation can never accept what register() rejects.
2061 let cron = c.to_cron()?;
2062 croner::parser::CronParser::builder()
2063 .seconds(croner::parser::Seconds::Required)
2064 .dom_and_dow(true)
2065 .build()
2066 .parse(&cron)
2067 .map_err(|e| format!("when.at lowered to invalid cron '{cron}': {e}"))?;
2068 }
2069 // The other humantime strings on the schedule (claude #419
2070 // review): runtime degrades gracefully on both (bad jitter →
2071 // silent no-op, bad starting_deadline → warn + skipped tick),
2072 // but "rejected at create time" should cover every field the
2073 // operator can typo, not just `when`.
2074 if let Some(j) = &self.plan.jitter {
2075 humantime::parse_duration(j)
2076 .map_err(|e| format!("jitter: invalid duration '{j}': {e}"))?;
2077 }
2078 if let Some(sd) = &self.starting_deadline {
2079 humantime::parse_duration(sd)
2080 .map_err(|e| format!("starting_deadline: invalid duration '{sd}': {e}"))?;
2081 }
2082 let from = self
2083 .active
2084 .from
2085 .as_deref()
2086 .map(|s| Active::parse_bound(s, self.tz))
2087 .transpose()?;
2088 let until = self
2089 .active
2090 .until
2091 .as_deref()
2092 .map(|s| Active::parse_bound(s, self.tz))
2093 .transpose()?;
2094 if let (Some(f), Some(u)) = (from, until) {
2095 if f >= u {
2096 return Err(format!(
2097 "active.from ({}) must be strictly before active.until ({})",
2098 self.active.from.as_deref().unwrap_or_default(),
2099 self.active.until.as_deref().unwrap_or_default(),
2100 ));
2101 }
2102 }
2103 Ok(())
2104 }
2105}
2106
/// Always returns `true`.
// NOTE(review): presumably referenced by a
// `#[serde(default = "default_true")]` attribute elsewhere in this
// file — the use site is outside this chunk; confirm before renaming.
fn default_true() -> bool {
    true
}