Skip to main content

shigoto_types/
lib.rs

1//! shigoto-types — typed primitives every other shigoto crate consumes.
2//!
3//! Spec: `theory/SHIGOTO.md` §III.1–III.4 + §III.11–III.12.
4//!
5//! v0.1.0 is the scaffold: trait + enum + struct surfaces declared,
6//! implementations land as the bootstrap consumer (`tend`) migrates
7//! per `theory/SHIGOTO.md` §IV.3 (M0.9 in the broader plan).
8
9#![forbid(unsafe_code)]
10
11use std::path::PathBuf;
12
13use serde::{Deserialize, Serialize};
14
15pub mod failure;
16pub use failure::{Failure, FailureKind, classify, signature};
17
18pub mod sink;
19pub use sink::{AuditFileSink, InMemorySink, MultiSink, NullSink, Sink};
20
21pub mod chain;
22pub use chain::Chain;
23
24pub mod classify;
25pub use classify::{ChainedClassifier, Classifier, FailureClassifier, FnClassifier};
26
27pub mod watch;
28pub use watch::{EscalationRouting, ScheduleWindow, TimeoutWatcher, WatchAction, WatchRule};
29
30// Lightweight convergence primitives re-homed from magma-converge
31// (2026-06-02) — see theory/CONVERGENCE-ADOPTION.md. General, pure,
32// IaC-free, so lightweight controllers adopt them without the magma
33// executor closure.
34pub mod policy;
35pub use policy::CascadePolicy;
36pub mod decision;
37pub use decision::Decision;
38// `converge` — the universal Reconciler trait + typed Plan/Outcome border,
39// RE-HOMED from magma-converge (the third leg of the CascadePolicy/Decision
40// arc). Serde-only; completes shigoto-types as the single home for the whole
41// convergence-primitive family. magma-converge re-exports it for back-compat.
42pub mod converge;
43pub use converge::{
44    Action, AppliedChange, ApplyMetrics, Change, ChangeSeverity, FailedChange, NoMetrics, Outcome,
45    Plan, PlanId, Reconciler, ReconcilerError, SharedReconciler, build_outcome, change,
46    change_with_severity,
47};
48
49pub mod testing;
50
51/// Typed identity for a Job. Stable across cycles + scheduler restarts.
52#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
53pub struct JobId {
54    pub scope: JobScope,
55    pub kind: JobKindId,
56    pub subject: JobSubject,
57}
58
59#[derive(
60    Serialize,
61    Deserialize,
62    Debug,
63    Clone,
64    PartialEq,
65    Eq,
66    Hash,
67    gen_platform::Discriminant,
68    gen_platform::IsVariant,
69)]
70#[discriminant(method = "kind", case = "kebab")]
71pub enum JobScope {
72    Global,
73    Workspace(String),
74    Repo { workspace: String, repo: String },
75}
76
77#[derive(
78    Serialize,
79    Deserialize,
80    Debug,
81    Clone,
82    PartialEq,
83    Eq,
84    Hash,
85    gen_platform::Discriminant,
86    gen_platform::IsVariant,
87)]
88#[discriminant(method = "kind", case = "kebab")]
89pub enum JobSubject {
90    None,
91    Repo(String),
92    Org(String),
93    Path(PathBuf),
94    Pinned(String),
95}
96
97/// Typed work-class identifier. Stored as `String` (not `&'static str`)
98/// so it serializes through serde without lifetime constraints. Cheap
99/// `Clone` is fine for the volume we expect (≤ ~100 kinds across the
100/// whole scheduler).
101#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
102pub struct JobKindId(pub String);
103
104impl JobKindId {
105    #[must_use]
106    pub fn new(s: impl Into<String>) -> Self {
107        Self(s.into())
108    }
109}
110
111/// FSM phase a Job inhabits. See `theory/SHIGOTO.md` §III.3 for the
112/// transition table.
113///
114/// `kind()` (variant → stable kebab-case string) + variant predicates
115/// (`is_pending`, `is_gated`, ...) auto-generated via gen-platform
116/// derives.
117#[derive(
118    Serialize,
119    Deserialize,
120    Debug,
121    Clone,
122    PartialEq,
123    Eq,
124    Hash,
125    gen_platform::Discriminant,
126    gen_platform::IsVariant,
127)]
128#[discriminant(method = "kind", case = "kebab")]
129pub enum JobPhase {
130    Pending,
131    Gated,
132    Ready,
133    Running,
134    Succeeded,
135    Failed { attempts: u32 },
136    Retrying { until_ms: i64 },
137    Skipped(SkipReason),
138    Deadlettered,
139    WaitingForOperator,
140}
141
142#[derive(
143    Serialize,
144    Deserialize,
145    Debug,
146    Clone,
147    PartialEq,
148    Eq,
149    Hash,
150    gen_platform::Discriminant,
151    gen_platform::IsVariant,
152)]
153#[discriminant(method = "kind", case = "kebab")]
154pub enum SkipReason {
155    GateRejected,
156    BlockedByDeadletteredAncestor,
157    OperatorDecision,
158    Other(String),
159}
160
161/// FSM driver — every legal way a Job's phase can change. Exhaustive
162/// over the `(JobPhase, Signal)` cross-product per `theory/SHIGOTO.md`
163/// §IV.1; the `advance` table below enumerates every cell.
164///
165/// `kind()` + variant predicates auto-generated via gen-platform.
166#[derive(Debug, Clone, PartialEq, Eq, gen_platform::Discriminant, gen_platform::IsVariant)]
167#[discriminant(method = "kind", case = "kebab")]
168pub enum Signal {
169    /// Re-evaluate gates for a Pending or Gated or Retrying job. The
170    /// outcome (Gated / Ready / Skipped) is carried in the signal
171    /// payload so the FSM stays pure — gate evaluation itself is in
172    /// shigoto-gate.
173    EvaluateGates(GateAggregate),
174    /// Scheduler chose to start this Ready job (budget allocated).
175    AllocateBudget,
176    /// `execute()` returned Ok(output).
177    ExecutionSucceeded,
178    /// `execute()` returned Err.
179    ExecutionFailed,
180    /// Retry decision after Failed. Carries whether to retry (then
181    /// Retrying with backoff) or deadletter.
182    RetryDecide(RetryOutcome),
183    /// Cooperative cancellation signal — Running job was told to
184    /// stop. Maps to Failed (with cancellation error).
185    Cancel,
186    /// Per-job timeout elapsed while Running. Maps to Failed (with
187    /// timeout error).
188    Timeout,
189    /// Retry backoff window elapsed; ready to re-evaluate gates.
190    BackoffElapsed,
191    /// Externally-driven transition from operator action. Constrained
192    /// to (WaitingForOperator → Ready|Skipped) and
193    /// (Deadlettered → Pending) per §VII.3.
194    OperatorTransition(JobPhase),
195}
196
197/// Aggregate gate outcome — what the cohort of gates collectively said.
198/// Per §III.9 individual gates return Pass / Wait / Skip; the
199/// aggregate is the worst outcome (Skip > Wait > Pass) per a typed
200/// reducer in shigoto-gate. We carry the rolled-up result here so the
201/// FSM stays language-agnostic about how the rollup is computed.
202///
203/// `kind()` + variant predicates auto-generated via gen-platform.
204#[derive(Debug, Clone, PartialEq, Eq, gen_platform::Discriminant, gen_platform::IsVariant)]
205#[discriminant(method = "kind", case = "kebab")]
206pub enum GateAggregate {
207    /// Every gate returned Pass — job advances to Ready.
208    AllPassed,
209    /// At least one gate returned Wait — job stays Gated.
210    SomeWaiting,
211    /// At least one gate returned Skip — job advances to Skipped with
212    /// that reason.
213    Skipped(SkipReason),
214}
215
216/// Retry decision from a `RetryPolicy::decide()` call. Same shape that
217/// shigoto-retry's `RetryDecision` exposes — duplicated as a typed
218/// signal payload so the FSM stays in shigoto-types without a
219/// dependency on shigoto-retry.
220#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, gen_platform::TypedDispatcher)]
221pub enum RetryOutcome {
222    /// Retry after `until_ms` (absolute timestamp).
223    Retry { until_ms: i64 },
224    /// No more attempts — deadletter.
225    Deadletter,
226}
227
228// Fleet-wide dispatcher-catalog registration for shigoto's retry
229// outcome typed surface. Fifth consumer class (after gen / caixa /
230// wasm-platform / cofre). See theory/UNIFIED-COMPUTING-MODEL.md §VI
231// for the absorption roadmap.
232gen_platform::register_dispatcher!("shigoto.retry-outcome", RetryOutcome);
233
234/// Rejected transition — `(from, signal)` is not a legal FSM cell.
235/// Returning Result instead of panicking lets consumers report drift
236/// (an operator action attempting an illegal transition) without
237/// crashing the scheduler.
238#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
239#[error("illegal transition: {from:?} cannot consume {signal:?}")]
240pub struct IllegalTransition {
241    pub from: JobPhase,
242    pub signal: Signal,
243}
244
245impl Signal {
246    /// Whether this signal originated from an operator command.
247    /// Operator-driven transitions get logged with extra metadata.
248    #[must_use]
249    pub fn is_operator_driven(&self) -> bool {
250        matches!(self, Self::OperatorTransition(_))
251    }
252}
253
254/// The canonical FSM driver. Pure: same `(from, signal)` always
255/// produces the same result. Exhaustive over `JobPhase × Signal` —
256/// adding a new phase or signal fails to compile until every cell
257/// of the cross-product is decided.
258///
259/// Per `theory/SHIGOTO.md` §IV.1 + the diagram in §III.3.
260pub fn advance(from: JobPhase, signal: Signal) -> Result<JobPhase, IllegalTransition> {
261    use Signal::*;
262    let new = match (&from, &signal) {
263        // ── Pending dispatches via gate evaluation ─────────────
264        (JobPhase::Pending, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
265        (JobPhase::Pending, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
266        (JobPhase::Pending, EvaluateGates(GateAggregate::Skipped(r))) => {
267            JobPhase::Skipped(r.clone())
268        }
269
270        // ── Gated re-evaluates each tick ───────────────────────
271        (JobPhase::Gated, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
272        (JobPhase::Gated, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
273        (JobPhase::Gated, EvaluateGates(GateAggregate::Skipped(r))) => JobPhase::Skipped(r.clone()),
274
275        // ── Ready awaits budget allocation ─────────────────────
276        (JobPhase::Ready, AllocateBudget) => JobPhase::Running,
277        // Re-evaluating gates from Ready is allowed (a config change
278        // may have invalidated a previously-Pass gate); same dispatch
279        // as Gated.
280        (JobPhase::Ready, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
281        (JobPhase::Ready, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
282        (JobPhase::Ready, EvaluateGates(GateAggregate::Skipped(r))) => JobPhase::Skipped(r.clone()),
283
284        // ── Running terminates per execute() outcome ───────────
285        (JobPhase::Running, ExecutionSucceeded) => JobPhase::Succeeded,
286        (JobPhase::Running, ExecutionFailed) => JobPhase::Failed { attempts: 1 },
287        (JobPhase::Running, Cancel) => JobPhase::Failed { attempts: 1 },
288        (JobPhase::Running, Timeout) => JobPhase::Failed { attempts: 1 },
289
290        // ── Failed waits for the retry policy's decision ───────
291        (JobPhase::Failed { attempts: _ }, RetryDecide(RetryOutcome::Retry { until_ms })) => {
292            JobPhase::Retrying {
293                until_ms: *until_ms,
294            }
295        }
296        (JobPhase::Failed { .. }, RetryDecide(RetryOutcome::Deadletter)) => JobPhase::Deadlettered,
297
298        // ── Retrying re-evaluates after the backoff window ─────
299        (JobPhase::Retrying { .. }, BackoffElapsed) => JobPhase::Pending,
300        // Gate re-eval from Retrying is allowed if a precondition
301        // changes (rare; treated like Pending re-eval).
302        (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
303        (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
304        (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::Skipped(r))) => {
305            JobPhase::Skipped(r.clone())
306        }
307
308        // ── Operator-driven transitions (§VII.3) ───────────────
309        // WaitingForOperator → Ready or Skipped.
310        (JobPhase::WaitingForOperator, OperatorTransition(JobPhase::Ready)) => JobPhase::Ready,
311        (JobPhase::WaitingForOperator, OperatorTransition(JobPhase::Skipped(r))) => {
312            JobPhase::Skipped(r.clone())
313        }
314        // Deadlettered → Pending (operator retries from scratch).
315        (JobPhase::Deadlettered, OperatorTransition(JobPhase::Pending)) => JobPhase::Pending,
316
317        // ── Every other (phase, signal) pair is illegal ────────
318        _ => return Err(IllegalTransition { from, signal }),
319    };
320    Ok(new)
321}
322
/// Marker trait for Job inputs. Inputs / Outputs / Errors implement
/// these markers so the scheduler can serialize across boundaries when
/// persistence lands. Declares no methods.
pub trait JobInput: Send + Sync + 'static {}

/// Marker trait for Job outputs; counterpart of [`JobInput`]. Declares
/// no methods.
pub trait JobOutput: Send + Sync + 'static {}

/// Marker trait for Job errors; additionally requires
/// `std::error::Error`. Declares no methods.
pub trait JobError: std::error::Error + Send + Sync + 'static {}
328
329/// Typed receiver for `Job::Output` values. Jobs call `record` on a
330/// successful `execute` so consumers (reconcile receipts, audit
331/// trails, dashboards) can read the typed outcomes the scheduler's
332/// phase-tracking discards.
333///
334/// Per `theory/SHIGOTO.md` §VIII (output capture) — the scheduler
335/// itself doesn't hold sinks; Jobs carry them. This keeps the typed
336/// `O` parameter from leaking into the scheduler's heterogeneous
337/// `Vec<Box<dyn ErasedJob>>` storage (which would require type
338/// erasure on the sink too). The cost: each Job impl decides which
339/// sink (if any) to wire in, but the gain is full type safety from
340/// producer to consumer.
341///
342/// Implementations:
343/// - `NullSink<O>` — discards everything; default for Jobs that
344///   don't care about output capture.
345/// - `InMemorySink<O>` — stores into `Arc<Mutex<HashMap<JobId, O>>>`
346///   so the consumer can drain after ticks complete.
347/// - Both live in `shigoto-emit` alongside the `TransitionEmitter`
348///   sinks, since the two surfaces are conceptually paired.
349///
350/// `record` takes `&O` so non-`Clone` Outputs are allowed at the
351/// trait level. Concrete sinks that need owned values (`InMemorySink`)
352/// add `O: Clone` at their `impl` boundary, not on the trait.
353///
354/// `record` is async because audit-style sinks may want to fsync or
355/// push to a queue; in-memory sinks return immediately.
356#[async_trait::async_trait]
357pub trait OutputSink<O>: Send + Sync + 'static
358where
359    O: Send + Sync + 'static,
360{
361    /// Called by a Job from its `execute` method after computing a
362    /// successful `Output`. The Job retains ownership; sinks that
363    /// need storage should clone internally.
364    ///
365    /// `O: Sync` is required because the async-trait desugar captures
366    /// `&O` across an `await` boundary — the resulting future is only
367    /// `Send` when the borrowed reference is. Outputs that aren't
368    /// `Sync` (rare for plain data; common for things like `Cell`)
369    /// can't use the typed sink and must capture via a side channel.
370    async fn record(&self, job_id: &JobId, output: &O);
371}
372
373/// Convenience trait that captures the most common Job authoring
374/// shape across pleme-io consumers: a Job whose typed Output flows
375/// through an [`OutputSink`] for consumer-side capture, and whose
376/// identity decomposes into (scope, kind, subject).
377///
378/// Implementations write only the per-Job logic:
379/// - `KIND` — the typed work-class id constant.
380/// - `scope()` / `subject()` — the two non-kind coordinates of `JobId`.
381/// - `output_sink()` — optional wired sink for output capture.
382/// - `execute_body()` — the actual side-effecting work.
383///
384/// The blanket `impl<T: RecordingJob> Job for T` below derives:
385/// - `Job::id()` — assembled from scope() + subject() + KIND.
386/// - `Job::kind()` — `JobKindId::new(T::KIND)`.
387/// - `Job::execute()` — calls `execute_body`, then records to the
388///   sink (when present) before returning the typed Output.
389///
390/// Consumers either implement `Job` directly (full control) or
391/// `RecordingJob` (the common case). Not both — the orphan rule +
392/// the blanket impl mean implementing one excludes the other for
393/// the same type.
394#[async_trait::async_trait]
395pub trait RecordingJob: Send + Sync + 'static {
396    /// Typed work output. `Send + Sync + Clone` are needed so the
397    /// blanket `Job::execute` can call `sink.record(&id, &output)`
398    /// across an await boundary, and so `InMemorySink<O>` can hold
399    /// owned copies.
400    type Output: Send + Sync + Clone + 'static;
401
402    /// Typed error. Same bounds as `Job::Error`.
403    type Error: std::error::Error + Send + Sync + 'static;
404
405    /// Canonical kind id. Compile-time constant so two Jobs of the
406    /// same impl always share the same `JobKindId` without an
407    /// allocation per call.
408    const KIND: &'static str;
409
410    /// First coordinate of `JobId`. Implementations typically clone
411    /// from `self.workspace` or similar.
412    fn scope(&self) -> JobScope;
413
414    /// Third coordinate of `JobId`. Implementations typically clone
415    /// from `self.repo_name` or similar.
416    fn subject(&self) -> JobSubject;
417
418    /// Optional typed sink. `None` means outputs are dropped after
419    /// execute returns; `Some` records every successful outcome.
420    fn output_sink(&self) -> Option<&std::sync::Arc<dyn OutputSink<Self::Output>>>;
421
422    /// The actual work. Run only on Ready→Running. MUST be idempotent.
423    /// The blanket `Job::execute` wraps this with sink recording so
424    /// callers never write the `if let Some(sink) = ... { sink.record... }`
425    /// dance themselves.
426    async fn execute_body(&self) -> Result<Self::Output, Self::Error>;
427}
428
429#[async_trait::async_trait]
430impl<T: RecordingJob> Job for T {
431    type Output = T::Output;
432    type Error = T::Error;
433
434    fn id(&self) -> JobId {
435        JobId {
436            scope: self.scope(),
437            kind: JobKindId::new(T::KIND),
438            subject: self.subject(),
439        }
440    }
441
442    fn kind(&self) -> JobKindId {
443        JobKindId::new(T::KIND)
444    }
445
446    async fn execute(&self) -> Result<T::Output, T::Error> {
447        let outcome = self.execute_body().await?;
448        if let Some(sink) = self.output_sink() {
449            // Compute the JobId twice (once here, once via id() above).
450            // Cheap — id() is pure data clones.
451            let id = JobId {
452                scope: self.scope(),
453                kind: JobKindId::new(T::KIND),
454                subject: self.subject(),
455            };
456            sink.record(&id, &outcome).await;
457        }
458        Ok(outcome)
459    }
460}
461
462/// The typed Job trait — what every consumer's domain-specific job
463/// implements. Per `theory/SHIGOTO.md` §III.1.
464///
465/// Constraints baked in:
466/// - `'static + Send + Sync` — jobs may move between scheduler threads.
467/// - Typed `Output` / `Error` — no untyped `Box<dyn Error>` in the
468///   business-logic surface. Erased dispatch is `ErasedJob`.
469/// - `execute` is async on tokio. Sync work uses `spawn_blocking`.
470/// - `id()` and `kind()` are pure: scheduler reads them many times
471///   per cycle; no IO.
472/// - `execute` MUST be idempotent (§IV.2). A scheduler that crashes
473///   between completing a side effect and emitting `Succeeded` will
474///   re-invoke execute on the next cycle.
475///
476/// v0.1 omits Input + JobContext — they land when a real consumer
477/// proves a need. For now: jobs hold their input in their own state
478/// (`self`) and the scheduler manages cancellation/clock externally
479/// (out-of-band via `tokio::time::timeout` + `CancellationToken`).
480#[async_trait::async_trait]
481pub trait Job: Send + Sync + 'static {
482    type Output: Send + 'static;
483    type Error: std::error::Error + Send + Sync + 'static;
484
485    /// Typed identity. Stable across cycles + scheduler restarts.
486    fn id(&self) -> JobId;
487
488    /// Typed work class.
489    fn kind(&self) -> JobKindId;
490
491    /// Side-effecting work. Called only on the `Ready → Running`
492    /// transition (`Signal::AllocateBudget` → `advance` → Running).
493    /// MUST be idempotent.
494    async fn execute(&self) -> Result<Self::Output, Self::Error>;
495}
496
497/// Trait-object dispatch surface. The scheduler holds
498/// `Box<dyn ErasedJob>` (`Job` itself isn't object-safe because of
499/// the associated types); `ErasedJob` collapses the typed Output +
500/// Error to `()` + boxed error so the scheduler can store
501/// heterogeneous jobs in one DAG.
502///
503/// Blanket impl below gives every `T: Job` an `ErasedJob` view for
504/// free — consumers write `impl Job for MyJob` and the scheduler
505/// consumes it via `Box<dyn ErasedJob>` automatically.
506#[async_trait::async_trait]
507pub trait ErasedJob: Send + Sync + 'static {
508    fn id(&self) -> JobId;
509    fn kind(&self) -> JobKindId;
510    async fn execute_erased(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
511}
512
513#[async_trait::async_trait]
514impl<T: Job> ErasedJob for T {
515    fn id(&self) -> JobId {
516        <T as Job>::id(self)
517    }
518
519    fn kind(&self) -> JobKindId {
520        <T as Job>::kind(self)
521    }
522
523    async fn execute_erased(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
524        match <T as Job>::execute(self).await {
525            Ok(_) => Ok(()),
526            Err(e) => Err(Box::new(e)),
527        }
528    }
529}
530
531/// Derived per-tick rollup the scheduler emits on every `tick`.
532#[derive(Serialize, Deserialize, Debug, Clone)]
533pub struct TickReceipt {
534    pub tick_at: chrono::DateTime<chrono::Utc>,
535    pub phase_counts: std::collections::BTreeMap<String, u32>,
536    pub transitions_this_tick: Vec<TransitionEvent>,
537    pub unhealed: Vec<UnhealedDrift>,
538}
539
540#[derive(Serialize, Deserialize, Debug, Clone)]
541pub struct TransitionEvent {
542    pub at: chrono::DateTime<chrono::Utc>,
543    pub job_id: JobId,
544    pub from: JobPhase,
545    pub to: JobPhase,
546    pub reason: TransitionReason,
547    /// Consumer-name tag (e.g. "tend", "forge-gen"). Stored as String
548    /// for serde compatibility.
549    pub tool: String,
550}
551
552#[derive(Serialize, Deserialize, Debug, Clone)]
553pub struct UnhealedDrift {
554    pub job_id: JobId,
555    pub phase: JobPhase,
556    pub age_seconds: u64,
557}
558
559#[derive(Serialize, Deserialize, Debug, Clone)]
560pub enum TransitionReason {
561    GateEvaluation,
562    BudgetAllocated,
563    ExecutionSucceeded,
564    ExecutionFailed(String),
565    RetryScheduled,
566    BackoffElapsed,
567    TimedOut,
568    Cancelled,
569    OperatorAction(String),
570}
571
572/// Read-only snapshot of the scheduler's current FSM map.
573#[derive(Debug, Clone)]
574pub struct Snapshot {
575    pub phases: std::collections::HashMap<JobId, JobPhase>,
576}
577
578impl Snapshot {
579    /// Derived view: every job currently in {Failed, Retrying,
580    /// Deadlettered}. Per `theory/SHIGOTO.md` §VII.2 — broader than
581    /// `TickReceipt.unhealed` (which is just Deadlettered +
582    /// WaitingForOperator) because it includes the transient retry
583    /// states. Useful for debugging "what's currently failing across
584    /// the fleet?" without waiting for jobs to deadletter.
585    ///
586    /// Returns owned tuples (cloned JobId + phase) so consumers can
587    /// hold the result across snapshot drops.
588    #[must_use]
589    pub fn failure_set(&self) -> Vec<(JobId, JobPhase)> {
590        self.phases
591            .iter()
592            .filter(|(_, p)| {
593                matches!(
594                    p,
595                    JobPhase::Failed { .. } | JobPhase::Retrying { .. } | JobPhase::Deadlettered
596                )
597            })
598            .map(|(id, p)| (id.clone(), p.clone()))
599            .collect()
600    }
601
602    /// Count of jobs in each named phase. Stable ordering. Useful for
603    /// receipts + dashboards.
604    #[must_use]
605    pub fn phase_counts(&self) -> std::collections::BTreeMap<&'static str, u32> {
606        let mut counts: std::collections::BTreeMap<&'static str, u32> =
607            std::collections::BTreeMap::new();
608        for phase in self.phases.values() {
609            let key = match phase {
610                JobPhase::Pending => "pending",
611                JobPhase::Gated => "gated",
612                JobPhase::Ready => "ready",
613                JobPhase::Running => "running",
614                JobPhase::Succeeded => "succeeded",
615                JobPhase::Failed { .. } => "failed",
616                JobPhase::Retrying { .. } => "retrying",
617                JobPhase::Skipped(_) => "skipped",
618                JobPhase::Deadlettered => "deadlettered",
619                JobPhase::WaitingForOperator => "waiting-for-operator",
620            };
621            *counts.entry(key).or_insert(0) += 1;
622        }
623        counts
624    }
625}
626
/// Unit tests for `advance` and `Signal::is_operator_driven`: one test
/// per legal cell that the scheduler relies on, plus a sample of
/// illegal cells.
#[cfg(test)]
mod fsm_tests {
    use super::*;

    /// `EvaluateGates` signal where every gate passed.
    fn pass() -> Signal {
        Signal::EvaluateGates(GateAggregate::AllPassed)
    }
    /// `EvaluateGates` signal where at least one gate is waiting.
    fn wait() -> Signal {
        Signal::EvaluateGates(GateAggregate::SomeWaiting)
    }
    /// `EvaluateGates` signal where a gate rejected the job.
    fn skip() -> Signal {
        Signal::EvaluateGates(GateAggregate::Skipped(SkipReason::GateRejected))
    }

    // ── Pending dispatches ─────────────────────────────────────

    #[test]
    fn pending_with_all_pass_advances_to_ready() {
        assert_eq!(advance(JobPhase::Pending, pass()).unwrap(), JobPhase::Ready);
    }

    #[test]
    fn pending_with_some_wait_advances_to_gated() {
        assert_eq!(advance(JobPhase::Pending, wait()).unwrap(), JobPhase::Gated);
    }

    #[test]
    fn pending_with_skip_advances_to_skipped() {
        assert_eq!(
            advance(JobPhase::Pending, skip()).unwrap(),
            JobPhase::Skipped(SkipReason::GateRejected)
        );
    }

    // ── Gated re-evaluates each tick ───────────────────────────

    #[test]
    fn gated_to_ready_on_all_pass() {
        assert_eq!(advance(JobPhase::Gated, pass()).unwrap(), JobPhase::Ready);
    }

    #[test]
    fn gated_stays_gated_on_some_wait() {
        assert_eq!(advance(JobPhase::Gated, wait()).unwrap(), JobPhase::Gated);
    }

    #[test]
    fn gated_to_skipped_on_skip() {
        // Previously a bare `matches!` whose result was discarded, so
        // the test could never fail. Now the outcome is asserted,
        // including the carried reason.
        assert_eq!(
            advance(JobPhase::Gated, skip()).unwrap(),
            JobPhase::Skipped(SkipReason::GateRejected)
        );
    }

    // ── Ready → Running on budget allocation, or gate re-eval ──

    #[test]
    fn ready_to_running_on_allocate_budget() {
        assert_eq!(
            advance(JobPhase::Ready, Signal::AllocateBudget).unwrap(),
            JobPhase::Running
        );
    }

    #[test]
    fn ready_re_evaluates_gates() {
        assert_eq!(advance(JobPhase::Ready, pass()).unwrap(), JobPhase::Ready);
        assert_eq!(advance(JobPhase::Ready, wait()).unwrap(), JobPhase::Gated);
        assert_eq!(
            advance(JobPhase::Ready, skip()).unwrap(),
            JobPhase::Skipped(SkipReason::GateRejected)
        );
    }

    // ── Running terminates four ways ──────────────────────────

    #[test]
    fn running_to_succeeded_on_ok() {
        assert_eq!(
            advance(JobPhase::Running, Signal::ExecutionSucceeded).unwrap(),
            JobPhase::Succeeded
        );
    }

    #[test]
    fn running_to_failed_on_err() {
        assert_eq!(
            advance(JobPhase::Running, Signal::ExecutionFailed).unwrap(),
            JobPhase::Failed { attempts: 1 }
        );
    }

    #[test]
    fn running_to_failed_on_cancel() {
        assert_eq!(
            advance(JobPhase::Running, Signal::Cancel).unwrap(),
            JobPhase::Failed { attempts: 1 }
        );
    }

    #[test]
    fn running_to_failed_on_timeout() {
        assert_eq!(
            advance(JobPhase::Running, Signal::Timeout).unwrap(),
            JobPhase::Failed { attempts: 1 }
        );
    }

    // ── Failed waits for retry decision ───────────────────────

    #[test]
    fn failed_to_retrying_when_retry_decided() {
        assert_eq!(
            advance(
                JobPhase::Failed { attempts: 1 },
                Signal::RetryDecide(RetryOutcome::Retry { until_ms: 12345 })
            )
            .unwrap(),
            JobPhase::Retrying { until_ms: 12345 }
        );
    }

    #[test]
    fn failed_to_deadlettered_when_retries_exhausted() {
        assert_eq!(
            advance(
                JobPhase::Failed { attempts: 3 },
                Signal::RetryDecide(RetryOutcome::Deadletter)
            )
            .unwrap(),
            JobPhase::Deadlettered
        );
    }

    // ── Retrying re-enters Pending after backoff ───────────────

    #[test]
    fn retrying_to_pending_after_backoff() {
        assert_eq!(
            advance(JobPhase::Retrying { until_ms: 100 }, Signal::BackoffElapsed).unwrap(),
            JobPhase::Pending
        );
    }

    #[test]
    fn retrying_re_evaluates_gates() {
        let retrying = || JobPhase::Retrying { until_ms: 100 };
        assert_eq!(advance(retrying(), pass()).unwrap(), JobPhase::Ready);
        assert_eq!(advance(retrying(), wait()).unwrap(), JobPhase::Gated);
        assert_eq!(
            advance(retrying(), skip()).unwrap(),
            JobPhase::Skipped(SkipReason::GateRejected)
        );
    }

    // ── Operator-driven transitions ───────────────────────────

    #[test]
    fn waiting_for_operator_to_ready_via_operator() {
        assert_eq!(
            advance(
                JobPhase::WaitingForOperator,
                Signal::OperatorTransition(JobPhase::Ready)
            )
            .unwrap(),
            JobPhase::Ready
        );
    }

    #[test]
    fn waiting_for_operator_to_skipped_via_operator() {
        assert_eq!(
            advance(
                JobPhase::WaitingForOperator,
                Signal::OperatorTransition(JobPhase::Skipped(SkipReason::OperatorDecision)),
            )
            .unwrap(),
            JobPhase::Skipped(SkipReason::OperatorDecision)
        );
    }

    #[test]
    fn deadlettered_to_pending_via_operator() {
        assert_eq!(
            advance(
                JobPhase::Deadlettered,
                Signal::OperatorTransition(JobPhase::Pending)
            )
            .unwrap(),
            JobPhase::Pending
        );
    }

    // ── Illegal transitions ───────────────────────────────────

    #[test]
    fn pending_with_allocate_budget_is_illegal() {
        let err = advance(JobPhase::Pending, Signal::AllocateBudget).unwrap_err();
        // Both inputs come back unchanged inside the error.
        assert_eq!(err.from, JobPhase::Pending);
        assert_eq!(err.signal, Signal::AllocateBudget);
    }

    #[test]
    fn succeeded_with_anything_is_illegal_except_no_outbound() {
        // Succeeded is terminal-for-cycle. No transition signals
        // advance from it (operator re-runs land on Deadlettered →
        // Pending, not Succeeded → anything).
        let err = advance(JobPhase::Succeeded, Signal::AllocateBudget).unwrap_err();
        assert_eq!(err.from, JobPhase::Succeeded);
    }

    #[test]
    fn deadlettered_with_random_operator_transition_is_illegal() {
        // Only Deadlettered → Pending via OperatorTransition.
        let err = advance(
            JobPhase::Deadlettered,
            Signal::OperatorTransition(JobPhase::Ready),
        )
        .unwrap_err();
        assert_eq!(err.from, JobPhase::Deadlettered);
        assert_eq!(err.signal, Signal::OperatorTransition(JobPhase::Ready));
    }

    #[test]
    fn running_with_evaluate_gates_is_illegal() {
        // A Running job is past the gate phase; re-evaluating gates
        // doesn't apply.
        let err = advance(JobPhase::Running, pass()).unwrap_err();
        assert_eq!(err.from, JobPhase::Running);
    }

    #[test]
    fn waiting_for_operator_with_evaluate_gates_is_illegal() {
        // WaitingForOperator requires explicit operator action.
        let err = advance(JobPhase::WaitingForOperator, pass()).unwrap_err();
        assert_eq!(err.from, JobPhase::WaitingForOperator);
    }

    #[test]
    fn signal_is_operator_driven_classifier() {
        assert!(Signal::OperatorTransition(JobPhase::Pending).is_operator_driven());
        assert!(!Signal::AllocateBudget.is_operator_driven());
        assert!(!pass().is_operator_driven());
    }
}
850
#[cfg(test)]
mod job_tests {
    //! Tests for the `Job`, `ErasedJob` and `RecordingJob` trait surfaces,
    //! driven through minimal fixture implementations defined in this
    //! module.

    use std::sync::{Arc, Mutex};

    use super::*;

    /// Kind label reported by [`NoopJob`] from both `id()` and `kind()`.
    const NOOP_KIND: &str = "noop";

    /// Sample `Job` impl — a no-op that succeeds. Verifies the trait
    /// shape compiles and that the `ErasedJob` blanket impl gives us
    /// trait-object dispatch.
    struct NoopJob;

    /// Unit error shared by the fixtures; none of them ever returns it.
    #[derive(thiserror::Error, Debug)]
    #[error("noop")]
    struct NoopError;

    #[async_trait::async_trait]
    impl Job for NoopJob {
        type Output = ();
        type Error = NoopError;

        fn id(&self) -> JobId {
            JobId {
                scope: JobScope::Global,
                kind: JobKindId::new(NOOP_KIND),
                subject: JobSubject::None,
            }
        }

        fn kind(&self) -> JobKindId {
            JobKindId::new(NOOP_KIND)
        }

        async fn execute(&self) -> Result<(), NoopError> {
            Ok(())
        }
    }

    /// `NoopJob` works through the statically-typed `Job` trait.
    #[tokio::test]
    async fn job_trait_compiles_and_executes() {
        let job = NoopJob;
        // Fully-qualified call, as elsewhere in this module, to pin the
        // lookup to `Job::id`.
        let job_id = <NoopJob as Job>::id(&job);
        assert_eq!(job_id.kind.0, NOOP_KIND);
        assert!(job.execute().await.is_ok());
    }

    /// A `NoopJob` can be boxed as `dyn ErasedJob` and driven through it.
    #[tokio::test]
    async fn erased_job_blanket_impl_gives_trait_object() {
        let erased: Box<dyn ErasedJob> = Box::new(NoopJob);
        assert_eq!(erased.id().kind.0, NOOP_KIND);
        assert!(erased.execute_erased().await.is_ok());
    }

    // ── RecordingJob tests ──────────────────────────────────────────

    /// Simple in-memory sink used to verify the blanket `Job::execute`
    /// records outputs after `execute_body` succeeds.
    ///
    /// Bounds on `O` live on the `OutputSink` impl, not on the struct.
    #[derive(Default)]
    struct CaptureSink<O> {
        records: Mutex<Vec<(JobId, O)>>,
    }

    #[async_trait::async_trait]
    impl<O> OutputSink<O> for CaptureSink<O>
    where
        O: Clone + Send + Sync + 'static,
    {
        /// Appends an owned `(job_id, output)` pair to `records`.
        async fn record(&self, job_id: &JobId, output: &O) {
            let entry = (job_id.clone(), output.clone());
            self.records
                .lock()
                .expect("CaptureSink mutex poisoned")
                .push(entry);
        }
    }

    /// Reference impl exercising every `RecordingJob` callback.
    struct RecJob {
        scope: JobScope,
        subject: JobSubject,
        sink: Option<Arc<dyn OutputSink<u32>>>,
        /// Value `execute_body` returns.
        answer: u32,
    }

    #[async_trait::async_trait]
    impl RecordingJob for RecJob {
        type Output = u32;
        type Error = NoopError;
        const KIND: &'static str = "test-recording";

        fn scope(&self) -> JobScope {
            self.scope.clone()
        }

        fn subject(&self) -> JobSubject {
            self.subject.clone()
        }

        fn output_sink(&self) -> Option<&Arc<dyn OutputSink<Self::Output>>> {
            self.sink.as_ref()
        }

        async fn execute_body(&self) -> Result<u32, NoopError> {
            Ok(self.answer)
        }
    }

    /// The `Job` view of a `RecordingJob` reports the scope, subject and
    /// `KIND` supplied by the `RecordingJob` impl.
    ///
    /// Plain `#[test]`: nothing here awaits, so no async runtime is needed.
    #[test]
    fn recording_job_blanket_provides_job_id_and_kind() {
        let job = RecJob {
            scope: JobScope::Workspace("ws".into()),
            subject: JobSubject::Repo("r".into()),
            sink: None,
            answer: 1,
        };

        let id = <RecJob as Job>::id(&job);
        assert_eq!(id.kind.0, "test-recording");

        let JobScope::Workspace(workspace) = id.scope else {
            panic!("wrong scope");
        };
        assert_eq!(workspace, "ws");

        let JobSubject::Repo(repo) = id.subject else {
            panic!("wrong subject");
        };
        assert_eq!(repo, "r");

        let kind = <RecJob as Job>::kind(&job);
        assert_eq!(kind.0, "test-recording");
    }

    /// With a sink attached, a successful `Job::execute` returns the body's
    /// output and leaves exactly one record carrying it in the sink.
    #[tokio::test]
    async fn recording_job_blanket_execute_records_to_sink_on_success() {
        let capture: Arc<CaptureSink<u32>> = Arc::new(CaptureSink::default());
        // Typed binding performs the unsizing to the trait object; the
        // concrete handle is kept for inspecting `records` afterwards.
        let erased_sink: Arc<dyn OutputSink<u32>> = capture.clone();
        let job = RecJob {
            scope: JobScope::Global,
            subject: JobSubject::None,
            sink: Some(erased_sink),
            answer: 42,
        };

        let output = <RecJob as Job>::execute(&job).await.unwrap();
        assert_eq!(output, 42);

        // Guard is taken only after the last `.await` in this test.
        let recorded = capture
            .records
            .lock()
            .expect("CaptureSink mutex poisoned");
        assert_eq!(recorded.len(), 1, "sink should have captured one record");
        assert_eq!(recorded[0].1, 42);
    }

    /// Without a sink, `Job::execute` still returns the body's output.
    #[tokio::test]
    async fn recording_job_without_sink_skips_recording() {
        let job = RecJob {
            scope: JobScope::Global,
            subject: JobSubject::None,
            sink: None,
            answer: 7,
        };

        // There is no sink to inspect — this only confirms the
        // absent-sink path returns Ok with the typed Output and doesn't
        // panic.
        let output = <RecJob as Job>::execute(&job).await.unwrap();
        assert_eq!(output, 7);
    }
}
1004
#[cfg(test)]
mod snapshot_tests {
    //! Tests for `Snapshot::failure_set` and `Snapshot::phase_counts`.

    use std::collections::{HashMap, HashSet};

    use super::*;

    /// Builds a `Global`-scoped `JobId` of kind `"k"` whose subject is
    /// `Pinned(name)`, so tests can tell jobs apart by name.
    fn id(name: &str) -> JobId {
        let subject = JobSubject::Pinned(name.into());
        JobId {
            scope: JobScope::Global,
            kind: JobKindId::new("k"),
            subject,
        }
    }

    /// Assembles a `Snapshot` whose `phases` map holds one entry per
    /// `(name, phase)` pair, keyed by `id(name)`.
    fn snapshot_with(entries: Vec<(&str, JobPhase)>) -> Snapshot {
        let phases: HashMap<JobId, JobPhase> = entries
            .into_iter()
            .map(|(name, phase)| (id(name), phase))
            .collect();
        Snapshot { phases }
    }

    /// `failure_set` reports exactly the `Deadlettered`, `Failed` and
    /// `Retrying` jobs of a mixed snapshot.
    #[test]
    fn failure_set_includes_failed_retrying_deadlettered() {
        let snapshot = snapshot_with(vec![
            ("ok", JobPhase::Succeeded),
            ("dead", JobPhase::Deadlettered),
            ("flap", JobPhase::Failed { attempts: 2 }),
            ("waiting", JobPhase::WaitingForOperator),
            ("retry", JobPhase::Retrying { until_ms: 0 }),
            ("ready", JobPhase::Ready),
        ]);

        // Collect the pinned names of every job in the failure set.
        let failures = snapshot.failure_set();
        let mut names: HashSet<String> = HashSet::new();
        for (job_id, _) in failures.iter() {
            if let JobSubject::Pinned(label) = &job_id.subject {
                names.insert(label.clone());
            }
        }

        assert_eq!(names.len(), 3);
        for expected in ["dead", "flap", "retry"] {
            assert!(names.contains(expected));
        }
        // WaitingForOperator is unhealed, not a failure (§VII.2), so it
        // must be absent; Succeeded and Ready are absent as well.
        for absent in ["waiting", "ok", "ready"] {
            assert!(!names.contains(absent));
        }
    }

    /// `phase_counts` tallies jobs per phase label and leaves out phases
    /// with no jobs.
    #[test]
    fn phase_counts_summarizes_every_phase() {
        let snapshot = snapshot_with(vec![
            ("a", JobPhase::Pending),
            ("b", JobPhase::Pending),
            ("c", JobPhase::Succeeded),
            ("d", JobPhase::Deadlettered),
        ]);

        let counts = snapshot.phase_counts();
        for (label, expected) in [("pending", 2), ("succeeded", 1), ("deadlettered", 1)] {
            assert_eq!(counts.get(label), Some(&expected));
        }
        // A phase nobody is in gets no entry at all (no 0-counts).
        assert_eq!(counts.get("ready"), None);
    }
}
1067        assert_eq!(counts.get("deadlettered"), Some(&1));
1068        // Absent phases don't appear (no 0-counts).
1069        assert!(counts.get("ready").is_none());
1070    }
1071}