Skip to main content

shigoto_types/
lib.rs

1//! shigoto-types — typed primitives every other shigoto crate consumes.
2//!
3//! Spec: `theory/SHIGOTO.md` §III.1–III.4 + §III.11–III.12.
4//!
5//! v0.1.0 is the scaffold: trait + enum + struct surfaces declared,
6//! implementations land as the bootstrap consumer (`tend`) migrates
7//! per `theory/SHIGOTO.md` §IV.3 (M0.9 in the broader plan).
8
9#![forbid(unsafe_code)]
10
11use std::path::PathBuf;
12
13use serde::{Deserialize, Serialize};
14
15pub mod failure;
16pub use failure::{classify, signature, Failure, FailureKind};
17
18pub mod sink;
19pub use sink::{AuditFileSink, InMemorySink, MultiSink, NullSink, Sink};
20
21pub mod chain;
22pub use chain::Chain;
23
24pub mod classify;
25pub use classify::{ChainedClassifier, Classifier, FailureClassifier, FnClassifier};
26
27pub mod watch;
28pub use watch::{EscalationRouting, ScheduleWindow, TimeoutWatcher, WatchAction, WatchRule};
29
30// Lightweight convergence primitives re-homed from magma-converge
31// (2026-06-02) — see theory/CONVERGENCE-ADOPTION.md. General, pure,
32// IaC-free, so lightweight controllers adopt them without the magma
33// executor closure.
34pub mod policy;
35pub use policy::CascadePolicy;
36pub mod decision;
37pub use decision::Decision;
38
39pub mod testing;
40
41/// Typed identity for a Job. Stable across cycles + scheduler restarts.
42#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
43pub struct JobId {
44    pub scope: JobScope,
45    pub kind: JobKindId,
46    pub subject: JobSubject,
47}
48
49#[derive(
50    Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
51    gen_platform::Discriminant,
52    gen_platform::IsVariant,
53)]
54#[discriminant(method = "kind", case = "kebab")]
55pub enum JobScope {
56    Global,
57    Workspace(String),
58    Repo { workspace: String, repo: String },
59}
60
61#[derive(
62    Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
63    gen_platform::Discriminant,
64    gen_platform::IsVariant,
65)]
66#[discriminant(method = "kind", case = "kebab")]
67pub enum JobSubject {
68    None,
69    Repo(String),
70    Org(String),
71    Path(PathBuf),
72    Pinned(String),
73}
74
75/// Typed work-class identifier. Stored as `String` (not `&'static str`)
76/// so it serializes through serde without lifetime constraints. Cheap
77/// `Clone` is fine for the volume we expect (≤ ~100 kinds across the
78/// whole scheduler).
79#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
80pub struct JobKindId(pub String);
81
82impl JobKindId {
83    #[must_use]
84    pub fn new(s: impl Into<String>) -> Self {
85        Self(s.into())
86    }
87}
88
89/// FSM phase a Job inhabits. See `theory/SHIGOTO.md` §III.3 for the
90/// transition table.
91///
92/// `kind()` (variant → stable kebab-case string) + variant predicates
93/// (`is_pending`, `is_gated`, ...) auto-generated via gen-platform
94/// derives.
95#[derive(
96    Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
97    gen_platform::Discriminant,
98    gen_platform::IsVariant,
99)]
100#[discriminant(method = "kind", case = "kebab")]
101pub enum JobPhase {
102    Pending,
103    Gated,
104    Ready,
105    Running,
106    Succeeded,
107    Failed { attempts: u32 },
108    Retrying { until_ms: i64 },
109    Skipped(SkipReason),
110    Deadlettered,
111    WaitingForOperator,
112}
113
114#[derive(
115    Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
116    gen_platform::Discriminant,
117    gen_platform::IsVariant,
118)]
119#[discriminant(method = "kind", case = "kebab")]
120pub enum SkipReason {
121    GateRejected,
122    BlockedByDeadletteredAncestor,
123    OperatorDecision,
124    Other(String),
125}
126
127/// FSM driver — every legal way a Job's phase can change. Exhaustive
128/// over the `(JobPhase, Signal)` cross-product per `theory/SHIGOTO.md`
129/// §IV.1; the `advance` table below enumerates every cell.
130///
131/// `kind()` + variant predicates auto-generated via gen-platform.
132#[derive(
133    Debug, Clone, PartialEq, Eq,
134    gen_platform::Discriminant,
135    gen_platform::IsVariant,
136)]
137#[discriminant(method = "kind", case = "kebab")]
138pub enum Signal {
139    /// Re-evaluate gates for a Pending or Gated or Retrying job. The
140    /// outcome (Gated / Ready / Skipped) is carried in the signal
141    /// payload so the FSM stays pure — gate evaluation itself is in
142    /// shigoto-gate.
143    EvaluateGates(GateAggregate),
144    /// Scheduler chose to start this Ready job (budget allocated).
145    AllocateBudget,
146    /// `execute()` returned Ok(output).
147    ExecutionSucceeded,
148    /// `execute()` returned Err.
149    ExecutionFailed,
150    /// Retry decision after Failed. Carries whether to retry (then
151    /// Retrying with backoff) or deadletter.
152    RetryDecide(RetryOutcome),
153    /// Cooperative cancellation signal — Running job was told to
154    /// stop. Maps to Failed (with cancellation error).
155    Cancel,
156    /// Per-job timeout elapsed while Running. Maps to Failed (with
157    /// timeout error).
158    Timeout,
159    /// Retry backoff window elapsed; ready to re-evaluate gates.
160    BackoffElapsed,
161    /// Externally-driven transition from operator action. Constrained
162    /// to (WaitingForOperator → Ready|Skipped) and
163    /// (Deadlettered → Pending) per §VII.3.
164    OperatorTransition(JobPhase),
165}
166
167/// Aggregate gate outcome — what the cohort of gates collectively said.
168/// Per §III.9 individual gates return Pass / Wait / Skip; the
169/// aggregate is the worst outcome (Skip > Wait > Pass) per a typed
170/// reducer in shigoto-gate. We carry the rolled-up result here so the
171/// FSM stays language-agnostic about how the rollup is computed.
172///
173/// `kind()` + variant predicates auto-generated via gen-platform.
174#[derive(
175    Debug, Clone, PartialEq, Eq,
176    gen_platform::Discriminant,
177    gen_platform::IsVariant,
178)]
179#[discriminant(method = "kind", case = "kebab")]
180pub enum GateAggregate {
181    /// Every gate returned Pass — job advances to Ready.
182    AllPassed,
183    /// At least one gate returned Wait — job stays Gated.
184    SomeWaiting,
185    /// At least one gate returned Skip — job advances to Skipped with
186    /// that reason.
187    Skipped(SkipReason),
188}
189
190/// Retry decision from a `RetryPolicy::decide()` call. Same shape that
191/// shigoto-retry's `RetryDecision` exposes — duplicated as a typed
192/// signal payload so the FSM stays in shigoto-types without a
193/// dependency on shigoto-retry.
194#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, gen_platform::TypedDispatcher)]
195pub enum RetryOutcome {
196    /// Retry after `until_ms` (absolute timestamp).
197    Retry { until_ms: i64 },
198    /// No more attempts — deadletter.
199    Deadletter,
200}
201
202// Fleet-wide dispatcher-catalog registration for shigoto's retry
203// outcome typed surface. Fifth consumer class (after gen / caixa /
204// wasm-platform / cofre). See theory/UNIFIED-COMPUTING-MODEL.md §VI
205// for the absorption roadmap.
206gen_platform::register_dispatcher!("shigoto.retry-outcome", RetryOutcome);
207
208/// Rejected transition — `(from, signal)` is not a legal FSM cell.
209/// Returning Result instead of panicking lets consumers report drift
210/// (an operator action attempting an illegal transition) without
211/// crashing the scheduler.
212#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
213#[error("illegal transition: {from:?} cannot consume {signal:?}")]
214pub struct IllegalTransition {
215    pub from: JobPhase,
216    pub signal: Signal,
217}
218
219impl Signal {
220    /// Whether this signal originated from an operator command.
221    /// Operator-driven transitions get logged with extra metadata.
222    #[must_use]
223    pub fn is_operator_driven(&self) -> bool {
224        matches!(self, Self::OperatorTransition(_))
225    }
226}
227
228/// The canonical FSM driver. Pure: same `(from, signal)` always
229/// produces the same result. Exhaustive over `JobPhase × Signal` —
230/// adding a new phase or signal fails to compile until every cell
231/// of the cross-product is decided.
232///
233/// Per `theory/SHIGOTO.md` §IV.1 + the diagram in §III.3.
234pub fn advance(from: JobPhase, signal: Signal) -> Result<JobPhase, IllegalTransition> {
235    use Signal::*;
236    let new = match (&from, &signal) {
237        // ── Pending dispatches via gate evaluation ─────────────
238        (JobPhase::Pending, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
239        (JobPhase::Pending, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
240        (JobPhase::Pending, EvaluateGates(GateAggregate::Skipped(r))) => {
241            JobPhase::Skipped(r.clone())
242        }
243
244        // ── Gated re-evaluates each tick ───────────────────────
245        (JobPhase::Gated, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
246        (JobPhase::Gated, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
247        (JobPhase::Gated, EvaluateGates(GateAggregate::Skipped(r))) => {
248            JobPhase::Skipped(r.clone())
249        }
250
251        // ── Ready awaits budget allocation ─────────────────────
252        (JobPhase::Ready, AllocateBudget) => JobPhase::Running,
253        // Re-evaluating gates from Ready is allowed (a config change
254        // may have invalidated a previously-Pass gate); same dispatch
255        // as Gated.
256        (JobPhase::Ready, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
257        (JobPhase::Ready, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
258        (JobPhase::Ready, EvaluateGates(GateAggregate::Skipped(r))) => {
259            JobPhase::Skipped(r.clone())
260        }
261
262        // ── Running terminates per execute() outcome ───────────
263        (JobPhase::Running, ExecutionSucceeded) => JobPhase::Succeeded,
264        (JobPhase::Running, ExecutionFailed) => JobPhase::Failed { attempts: 1 },
265        (JobPhase::Running, Cancel) => JobPhase::Failed { attempts: 1 },
266        (JobPhase::Running, Timeout) => JobPhase::Failed { attempts: 1 },
267
268        // ── Failed waits for the retry policy's decision ───────
269        (JobPhase::Failed { attempts: _ }, RetryDecide(RetryOutcome::Retry { until_ms })) => {
270            JobPhase::Retrying {
271                until_ms: *until_ms,
272            }
273        }
274        (JobPhase::Failed { .. }, RetryDecide(RetryOutcome::Deadletter)) => JobPhase::Deadlettered,
275
276        // ── Retrying re-evaluates after the backoff window ─────
277        (JobPhase::Retrying { .. }, BackoffElapsed) => JobPhase::Pending,
278        // Gate re-eval from Retrying is allowed if a precondition
279        // changes (rare; treated like Pending re-eval).
280        (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
281        (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
282        (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::Skipped(r))) => {
283            JobPhase::Skipped(r.clone())
284        }
285
286        // ── Operator-driven transitions (§VII.3) ───────────────
287        // WaitingForOperator → Ready or Skipped.
288        (JobPhase::WaitingForOperator, OperatorTransition(JobPhase::Ready)) => JobPhase::Ready,
289        (JobPhase::WaitingForOperator, OperatorTransition(JobPhase::Skipped(r))) => {
290            JobPhase::Skipped(r.clone())
291        }
292        // Deadlettered → Pending (operator retries from scratch).
293        (JobPhase::Deadlettered, OperatorTransition(JobPhase::Pending)) => JobPhase::Pending,
294
295        // ── Every other (phase, signal) pair is illegal ────────
296        _ => return Err(IllegalTransition { from, signal }),
297    };
298    Ok(new)
299}
300
/// Marker for job input types. Inputs, outputs and errors carry these
/// markers so the scheduler can serialize them across boundaries once
/// persistence lands. The trait has no methods.
pub trait JobInput: 'static + Send + Sync {}

/// Marker for job output types; see [`JobInput`]. No methods.
pub trait JobOutput: 'static + Send + Sync {}

/// Marker for job error types; additionally requires
/// `std::error::Error`. No methods.
pub trait JobError: 'static + Send + Sync + std::error::Error {}
306
307/// Typed receiver for `Job::Output` values. Jobs call `record` on a
308/// successful `execute` so consumers (reconcile receipts, audit
309/// trails, dashboards) can read the typed outcomes the scheduler's
310/// phase-tracking discards.
311///
312/// Per `theory/SHIGOTO.md` §VIII (output capture) — the scheduler
313/// itself doesn't hold sinks; Jobs carry them. This keeps the typed
314/// `O` parameter from leaking into the scheduler's heterogeneous
315/// `Vec<Box<dyn ErasedJob>>` storage (which would require type
316/// erasure on the sink too). The cost: each Job impl decides which
317/// sink (if any) to wire in, but the gain is full type safety from
318/// producer to consumer.
319///
320/// Implementations:
321/// - `NullSink<O>` — discards everything; default for Jobs that
322///   don't care about output capture.
323/// - `InMemorySink<O>` — stores into `Arc<Mutex<HashMap<JobId, O>>>`
324///   so the consumer can drain after ticks complete.
325/// - Both live in `shigoto-emit` alongside the `TransitionEmitter`
326///   sinks, since the two surfaces are conceptually paired.
327///
328/// `record` takes `&O` so non-`Clone` Outputs are allowed at the
329/// trait level. Concrete sinks that need owned values (`InMemorySink`)
330/// add `O: Clone` at their `impl` boundary, not on the trait.
331///
332/// `record` is async because audit-style sinks may want to fsync or
333/// push to a queue; in-memory sinks return immediately.
334#[async_trait::async_trait]
335pub trait OutputSink<O>: Send + Sync + 'static
336where
337    O: Send + Sync + 'static,
338{
339    /// Called by a Job from its `execute` method after computing a
340    /// successful `Output`. The Job retains ownership; sinks that
341    /// need storage should clone internally.
342    ///
343    /// `O: Sync` is required because the async-trait desugar captures
344    /// `&O` across an `await` boundary — the resulting future is only
345    /// `Send` when the borrowed reference is. Outputs that aren't
346    /// `Sync` (rare for plain data; common for things like `Cell`)
347    /// can't use the typed sink and must capture via a side channel.
348    async fn record(&self, job_id: &JobId, output: &O);
349}
350
351/// Convenience trait that captures the most common Job authoring
352/// shape across pleme-io consumers: a Job whose typed Output flows
353/// through an [`OutputSink`] for consumer-side capture, and whose
354/// identity decomposes into (scope, kind, subject).
355///
356/// Implementations write only the per-Job logic:
357/// - `KIND` — the typed work-class id constant.
358/// - `scope()` / `subject()` — the two non-kind coordinates of `JobId`.
359/// - `output_sink()` — optional wired sink for output capture.
360/// - `execute_body()` — the actual side-effecting work.
361///
362/// The blanket `impl<T: RecordingJob> Job for T` below derives:
363/// - `Job::id()` — assembled from scope() + subject() + KIND.
364/// - `Job::kind()` — `JobKindId::new(T::KIND)`.
365/// - `Job::execute()` — calls `execute_body`, then records to the
366///   sink (when present) before returning the typed Output.
367///
368/// Consumers either implement `Job` directly (full control) or
369/// `RecordingJob` (the common case). Not both — the orphan rule +
370/// the blanket impl mean implementing one excludes the other for
371/// the same type.
372#[async_trait::async_trait]
373pub trait RecordingJob: Send + Sync + 'static {
374    /// Typed work output. `Send + Sync + Clone` are needed so the
375    /// blanket `Job::execute` can call `sink.record(&id, &output)`
376    /// across an await boundary, and so `InMemorySink<O>` can hold
377    /// owned copies.
378    type Output: Send + Sync + Clone + 'static;
379
380    /// Typed error. Same bounds as `Job::Error`.
381    type Error: std::error::Error + Send + Sync + 'static;
382
383    /// Canonical kind id. Compile-time constant so two Jobs of the
384    /// same impl always share the same `JobKindId` without an
385    /// allocation per call.
386    const KIND: &'static str;
387
388    /// First coordinate of `JobId`. Implementations typically clone
389    /// from `self.workspace` or similar.
390    fn scope(&self) -> JobScope;
391
392    /// Third coordinate of `JobId`. Implementations typically clone
393    /// from `self.repo_name` or similar.
394    fn subject(&self) -> JobSubject;
395
396    /// Optional typed sink. `None` means outputs are dropped after
397    /// execute returns; `Some` records every successful outcome.
398    fn output_sink(&self) -> Option<&std::sync::Arc<dyn OutputSink<Self::Output>>>;
399
400    /// The actual work. Run only on Ready→Running. MUST be idempotent.
401    /// The blanket `Job::execute` wraps this with sink recording so
402    /// callers never write the `if let Some(sink) = ... { sink.record... }`
403    /// dance themselves.
404    async fn execute_body(&self) -> Result<Self::Output, Self::Error>;
405}
406
407#[async_trait::async_trait]
408impl<T: RecordingJob> Job for T {
409    type Output = T::Output;
410    type Error = T::Error;
411
412    fn id(&self) -> JobId {
413        JobId {
414            scope: self.scope(),
415            kind: JobKindId::new(T::KIND),
416            subject: self.subject(),
417        }
418    }
419
420    fn kind(&self) -> JobKindId {
421        JobKindId::new(T::KIND)
422    }
423
424    async fn execute(&self) -> Result<T::Output, T::Error> {
425        let outcome = self.execute_body().await?;
426        if let Some(sink) = self.output_sink() {
427            // Compute the JobId twice (once here, once via id() above).
428            // Cheap — id() is pure data clones.
429            let id = JobId {
430                scope: self.scope(),
431                kind: JobKindId::new(T::KIND),
432                subject: self.subject(),
433            };
434            sink.record(&id, &outcome).await;
435        }
436        Ok(outcome)
437    }
438}
439
440/// The typed Job trait — what every consumer's domain-specific job
441/// implements. Per `theory/SHIGOTO.md` §III.1.
442///
443/// Constraints baked in:
444/// - `'static + Send + Sync` — jobs may move between scheduler threads.
445/// - Typed `Output` / `Error` — no untyped `Box<dyn Error>` in the
446///   business-logic surface. Erased dispatch is `ErasedJob`.
447/// - `execute` is async on tokio. Sync work uses `spawn_blocking`.
448/// - `id()` and `kind()` are pure: scheduler reads them many times
449///   per cycle; no IO.
450/// - `execute` MUST be idempotent (§IV.2). A scheduler that crashes
451///   between completing a side effect and emitting `Succeeded` will
452///   re-invoke execute on the next cycle.
453///
454/// v0.1 omits Input + JobContext — they land when a real consumer
455/// proves a need. For now: jobs hold their input in their own state
456/// (`self`) and the scheduler manages cancellation/clock externally
457/// (out-of-band via `tokio::time::timeout` + `CancellationToken`).
458#[async_trait::async_trait]
459pub trait Job: Send + Sync + 'static {
460    type Output: Send + 'static;
461    type Error: std::error::Error + Send + Sync + 'static;
462
463    /// Typed identity. Stable across cycles + scheduler restarts.
464    fn id(&self) -> JobId;
465
466    /// Typed work class.
467    fn kind(&self) -> JobKindId;
468
469    /// Side-effecting work. Called only on the `Ready → Running`
470    /// transition (`Signal::AllocateBudget` → `advance` → Running).
471    /// MUST be idempotent.
472    async fn execute(&self) -> Result<Self::Output, Self::Error>;
473}
474
475/// Trait-object dispatch surface. The scheduler holds
476/// `Box<dyn ErasedJob>` (`Job` itself isn't object-safe because of
477/// the associated types); `ErasedJob` collapses the typed Output +
478/// Error to `()` + boxed error so the scheduler can store
479/// heterogeneous jobs in one DAG.
480///
481/// Blanket impl below gives every `T: Job` an `ErasedJob` view for
482/// free — consumers write `impl Job for MyJob` and the scheduler
483/// consumes it via `Box<dyn ErasedJob>` automatically.
484#[async_trait::async_trait]
485pub trait ErasedJob: Send + Sync + 'static {
486    fn id(&self) -> JobId;
487    fn kind(&self) -> JobKindId;
488    async fn execute_erased(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
489}
490
491#[async_trait::async_trait]
492impl<T: Job> ErasedJob for T {
493    fn id(&self) -> JobId {
494        <T as Job>::id(self)
495    }
496
497    fn kind(&self) -> JobKindId {
498        <T as Job>::kind(self)
499    }
500
501    async fn execute_erased(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
502        match <T as Job>::execute(self).await {
503            Ok(_) => Ok(()),
504            Err(e) => Err(Box::new(e)),
505        }
506    }
507}
508
509/// Derived per-tick rollup the scheduler emits on every `tick`.
510#[derive(Serialize, Deserialize, Debug, Clone)]
511pub struct TickReceipt {
512    pub tick_at: chrono::DateTime<chrono::Utc>,
513    pub phase_counts: std::collections::BTreeMap<String, u32>,
514    pub transitions_this_tick: Vec<TransitionEvent>,
515    pub unhealed: Vec<UnhealedDrift>,
516}
517
518#[derive(Serialize, Deserialize, Debug, Clone)]
519pub struct TransitionEvent {
520    pub at: chrono::DateTime<chrono::Utc>,
521    pub job_id: JobId,
522    pub from: JobPhase,
523    pub to: JobPhase,
524    pub reason: TransitionReason,
525    /// Consumer-name tag (e.g. "tend", "forge-gen"). Stored as String
526    /// for serde compatibility.
527    pub tool: String,
528}
529
530#[derive(Serialize, Deserialize, Debug, Clone)]
531pub struct UnhealedDrift {
532    pub job_id: JobId,
533    pub phase: JobPhase,
534    pub age_seconds: u64,
535}
536
537#[derive(Serialize, Deserialize, Debug, Clone)]
538pub enum TransitionReason {
539    GateEvaluation,
540    BudgetAllocated,
541    ExecutionSucceeded,
542    ExecutionFailed(String),
543    RetryScheduled,
544    BackoffElapsed,
545    TimedOut,
546    Cancelled,
547    OperatorAction(String),
548}
549
550/// Read-only snapshot of the scheduler's current FSM map.
551#[derive(Debug, Clone)]
552pub struct Snapshot {
553    pub phases: std::collections::HashMap<JobId, JobPhase>,
554}
555
556impl Snapshot {
557    /// Derived view: every job currently in {Failed, Retrying,
558    /// Deadlettered}. Per `theory/SHIGOTO.md` §VII.2 — broader than
559    /// `TickReceipt.unhealed` (which is just Deadlettered +
560    /// WaitingForOperator) because it includes the transient retry
561    /// states. Useful for debugging "what's currently failing across
562    /// the fleet?" without waiting for jobs to deadletter.
563    ///
564    /// Returns owned tuples (cloned JobId + phase) so consumers can
565    /// hold the result across snapshot drops.
566    #[must_use]
567    pub fn failure_set(&self) -> Vec<(JobId, JobPhase)> {
568        self.phases
569            .iter()
570            .filter(|(_, p)| {
571                matches!(
572                    p,
573                    JobPhase::Failed { .. }
574                        | JobPhase::Retrying { .. }
575                        | JobPhase::Deadlettered
576                )
577            })
578            .map(|(id, p)| (id.clone(), p.clone()))
579            .collect()
580    }
581
582    /// Count of jobs in each named phase. Stable ordering. Useful for
583    /// receipts + dashboards.
584    #[must_use]
585    pub fn phase_counts(&self) -> std::collections::BTreeMap<&'static str, u32> {
586        let mut counts: std::collections::BTreeMap<&'static str, u32> =
587            std::collections::BTreeMap::new();
588        for phase in self.phases.values() {
589            let key = match phase {
590                JobPhase::Pending => "pending",
591                JobPhase::Gated => "gated",
592                JobPhase::Ready => "ready",
593                JobPhase::Running => "running",
594                JobPhase::Succeeded => "succeeded",
595                JobPhase::Failed { .. } => "failed",
596                JobPhase::Retrying { .. } => "retrying",
597                JobPhase::Skipped(_) => "skipped",
598                JobPhase::Deadlettered => "deadlettered",
599                JobPhase::WaitingForOperator => "waiting-for-operator",
600            };
601            *counts.entry(key).or_insert(0) += 1;
602        }
603        counts
604    }
605}
606
/// Unit tests for [`advance`] and `Signal::is_operator_driven`: one
/// test per legal FSM cell, plus a sample of illegal cells.
#[cfg(test)]
mod fsm_tests {
    use super::*;

    /// `EvaluateGates` signal carrying "all gates passed".
    fn pass() -> Signal {
        Signal::EvaluateGates(GateAggregate::AllPassed)
    }
    /// `EvaluateGates` signal carrying "some gate is waiting".
    fn wait() -> Signal {
        Signal::EvaluateGates(GateAggregate::SomeWaiting)
    }
    /// `EvaluateGates` signal carrying "skipped: gate rejected".
    fn skip() -> Signal {
        Signal::EvaluateGates(GateAggregate::Skipped(SkipReason::GateRejected))
    }

    // ── Pending dispatches ─────────────────────────────────────

    #[test]
    fn pending_with_all_pass_advances_to_ready() {
        assert_eq!(advance(JobPhase::Pending, pass()).unwrap(), JobPhase::Ready);
    }

    #[test]
    fn pending_with_some_wait_advances_to_gated() {
        assert_eq!(advance(JobPhase::Pending, wait()).unwrap(), JobPhase::Gated);
    }

    #[test]
    fn pending_with_skip_advances_to_skipped() {
        assert_eq!(
            advance(JobPhase::Pending, skip()).unwrap(),
            JobPhase::Skipped(SkipReason::GateRejected)
        );
    }

    // ── Gated re-evaluates each tick ───────────────────────────

    #[test]
    fn gated_to_ready_on_all_pass() {
        assert_eq!(advance(JobPhase::Gated, pass()).unwrap(), JobPhase::Ready);
    }

    #[test]
    fn gated_stays_gated_on_some_wait() {
        assert_eq!(advance(JobPhase::Gated, wait()).unwrap(), JobPhase::Gated);
    }

    #[test]
    fn gated_to_skipped_on_skip() {
        // A bare `matches!` would discard its result and check nothing;
        // assert on the exact phase instead.
        assert_eq!(
            advance(JobPhase::Gated, skip()).unwrap(),
            JobPhase::Skipped(SkipReason::GateRejected)
        );
    }

    // ── Ready → Running on budget allocation ───────────────────

    #[test]
    fn ready_to_running_on_allocate_budget() {
        assert_eq!(
            advance(JobPhase::Ready, Signal::AllocateBudget).unwrap(),
            JobPhase::Running
        );
    }

    #[test]
    fn ready_re_evaluates_gates_like_gated() {
        assert_eq!(advance(JobPhase::Ready, pass()).unwrap(), JobPhase::Ready);
        assert_eq!(advance(JobPhase::Ready, wait()).unwrap(), JobPhase::Gated);
        assert_eq!(
            advance(JobPhase::Ready, skip()).unwrap(),
            JobPhase::Skipped(SkipReason::GateRejected)
        );
    }

    // ── Running terminates four ways ──────────────────────────

    #[test]
    fn running_to_succeeded_on_ok() {
        assert_eq!(
            advance(JobPhase::Running, Signal::ExecutionSucceeded).unwrap(),
            JobPhase::Succeeded
        );
    }

    #[test]
    fn running_to_failed_on_err() {
        assert_eq!(
            advance(JobPhase::Running, Signal::ExecutionFailed).unwrap(),
            JobPhase::Failed { attempts: 1 }
        );
    }

    #[test]
    fn running_to_failed_on_cancel() {
        assert_eq!(
            advance(JobPhase::Running, Signal::Cancel).unwrap(),
            JobPhase::Failed { attempts: 1 }
        );
    }

    #[test]
    fn running_to_failed_on_timeout() {
        assert_eq!(
            advance(JobPhase::Running, Signal::Timeout).unwrap(),
            JobPhase::Failed { attempts: 1 }
        );
    }

    // ── Failed waits for retry decision ───────────────────────

    #[test]
    fn failed_to_retrying_when_retry_decided() {
        let decided = Signal::RetryDecide(RetryOutcome::Retry { until_ms: 12345 });
        assert_eq!(
            advance(JobPhase::Failed { attempts: 1 }, decided).unwrap(),
            JobPhase::Retrying { until_ms: 12345 }
        );
    }

    #[test]
    fn failed_to_deadlettered_when_retries_exhausted() {
        let decided = Signal::RetryDecide(RetryOutcome::Deadletter);
        assert_eq!(
            advance(JobPhase::Failed { attempts: 3 }, decided).unwrap(),
            JobPhase::Deadlettered
        );
    }

    // ── Retrying re-enters Pending after backoff ───────────────

    #[test]
    fn retrying_to_pending_after_backoff() {
        assert_eq!(
            advance(JobPhase::Retrying { until_ms: 100 }, Signal::BackoffElapsed).unwrap(),
            JobPhase::Pending
        );
    }

    #[test]
    fn retrying_re_evaluates_gates_like_pending() {
        let retrying = JobPhase::Retrying { until_ms: 100 };
        assert_eq!(advance(retrying.clone(), pass()).unwrap(), JobPhase::Ready);
        assert_eq!(advance(retrying.clone(), wait()).unwrap(), JobPhase::Gated);
        assert_eq!(
            advance(retrying, skip()).unwrap(),
            JobPhase::Skipped(SkipReason::GateRejected)
        );
    }

    // ── Operator-driven transitions ───────────────────────────

    #[test]
    fn waiting_for_operator_to_ready_via_operator() {
        assert_eq!(
            advance(
                JobPhase::WaitingForOperator,
                Signal::OperatorTransition(JobPhase::Ready)
            )
            .unwrap(),
            JobPhase::Ready
        );
    }

    #[test]
    fn waiting_for_operator_to_skipped_via_operator() {
        let target = JobPhase::Skipped(SkipReason::OperatorDecision);
        assert_eq!(
            advance(
                JobPhase::WaitingForOperator,
                Signal::OperatorTransition(target.clone())
            )
            .unwrap(),
            target
        );
    }

    #[test]
    fn deadlettered_to_pending_via_operator() {
        assert_eq!(
            advance(
                JobPhase::Deadlettered,
                Signal::OperatorTransition(JobPhase::Pending)
            )
            .unwrap(),
            JobPhase::Pending
        );
    }

    // ── Illegal transitions ───────────────────────────────────

    #[test]
    fn pending_with_allocate_budget_is_illegal() {
        let err = advance(JobPhase::Pending, Signal::AllocateBudget).unwrap_err();
        assert_eq!(err.from, JobPhase::Pending);
        assert_eq!(err.signal, Signal::AllocateBudget);
    }

    #[test]
    fn succeeded_with_anything_is_illegal_except_no_outbound() {
        // Succeeded is terminal-for-cycle. No transition signals
        // advance from it (operator re-runs land on Deadlettered →
        // Pending, not Succeeded → anything).
        let err = advance(JobPhase::Succeeded, Signal::AllocateBudget).unwrap_err();
        assert_eq!(err.from, JobPhase::Succeeded);
        assert_eq!(err.signal, Signal::AllocateBudget);
    }

    #[test]
    fn deadlettered_with_random_operator_transition_is_illegal() {
        // Only Deadlettered → Pending via OperatorTransition.
        let err = advance(
            JobPhase::Deadlettered,
            Signal::OperatorTransition(JobPhase::Ready),
        )
        .unwrap_err();
        assert_eq!(err.from, JobPhase::Deadlettered);
        assert_eq!(err.signal, Signal::OperatorTransition(JobPhase::Ready));
    }

    #[test]
    fn running_with_evaluate_gates_is_illegal() {
        // A Running job is past the gate phase; re-evaluating gates
        // doesn't apply.
        let err = advance(JobPhase::Running, pass()).unwrap_err();
        assert_eq!(err.from, JobPhase::Running);
        assert_eq!(err.signal, pass());
    }

    #[test]
    fn waiting_for_operator_with_evaluate_gates_is_illegal() {
        // WaitingForOperator requires explicit operator action.
        let err = advance(JobPhase::WaitingForOperator, pass()).unwrap_err();
        assert_eq!(err.from, JobPhase::WaitingForOperator);
        assert_eq!(err.signal, pass());
    }

    #[test]
    fn signal_is_operator_driven_classifier() {
        assert!(Signal::OperatorTransition(JobPhase::Pending).is_operator_driven());
        assert!(!Signal::AllocateBudget.is_operator_driven());
        assert!(!pass().is_operator_driven());
    }
}
830
#[cfg(test)]
mod job_tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Kind label that `NoopJob` reports from both `id` and `kind`.
    const NOOP_KIND: &str = "noop";

    /// Smallest possible `Job`: does nothing and always succeeds.
    /// Exists to prove the trait shape compiles and that the
    /// `ErasedJob` blanket impl makes it usable as a trait object.
    struct NoopJob;

    /// Error type for the test jobs; the tests never construct it.
    #[derive(Debug, thiserror::Error)]
    #[error("noop")]
    struct NoopError;

    #[async_trait::async_trait]
    impl Job for NoopJob {
        type Output = ();
        type Error = NoopError;

        fn id(&self) -> JobId {
            let kind = JobKindId::new(NOOP_KIND);
            JobId {
                scope: JobScope::Global,
                kind,
                subject: JobSubject::None,
            }
        }

        fn kind(&self) -> JobKindId {
            JobKindId::new(NOOP_KIND)
        }

        async fn execute(&self) -> Result<(), NoopError> {
            Ok(())
        }
    }

    /// Calls `id` and `execute` on the concrete type.
    #[tokio::test]
    async fn job_trait_compiles_and_executes() {
        let noop = NoopJob;
        let job_id = <NoopJob as Job>::id(&noop);
        assert_eq!(job_id.kind.0, "noop");

        let outcome = noop.execute().await;
        assert!(outcome.is_ok());
    }

    /// Same checks as above, but through a `Box<dyn ErasedJob>`.
    #[tokio::test]
    async fn erased_job_blanket_impl_gives_trait_object() {
        let erased: Box<dyn ErasedJob> = Box::new(NoopJob);
        assert_eq!(erased.id().kind.0, "noop");

        let outcome = erased.execute_erased().await;
        assert!(outcome.is_ok());
    }

    // ── RecordingJob tests ──────────────────────────────────────────

    /// In-memory sink that stores every `(JobId, output)` pair it is
    /// handed, so a test can check that the blanket `Job::execute`
    /// recorded the output once `execute_body` succeeded.
    #[derive(Default)]
    struct CaptureSink<O>
    where
        O: Clone + Send + Sync + 'static,
    {
        records: Mutex<Vec<(JobId, O)>>,
    }

    #[async_trait::async_trait]
    impl<O> OutputSink<O> for CaptureSink<O>
    where
        O: Clone + Send + Sync + 'static,
    {
        async fn record(&self, job_id: &JobId, output: &O) {
            let entry = (job_id.clone(), output.clone());
            self.records
                .lock()
                .expect("CaptureSink mutex poisoned")
                .push(entry);
        }
    }

    /// `RecordingJob` fixture that implements every callback; each
    /// callback simply hands back the matching field.
    struct RecJob {
        scope: JobScope,
        subject: JobSubject,
        sink: Option<Arc<dyn OutputSink<u32>>>,
        answer: u32,
    }

    #[async_trait::async_trait]
    impl RecordingJob for RecJob {
        const KIND: &'static str = "test-recording";
        type Output = u32;
        type Error = NoopError;

        fn scope(&self) -> JobScope {
            self.scope.clone()
        }

        fn subject(&self) -> JobSubject {
            self.subject.clone()
        }

        fn output_sink(&self) -> Option<&Arc<dyn OutputSink<Self::Output>>> {
            self.sink.as_ref()
        }

        async fn execute_body(&self) -> Result<u32, NoopError> {
            Ok(self.answer)
        }
    }

    /// Builds a `RecJob` with `Global` scope and no subject.
    fn global_rec_job(sink: Option<Arc<dyn OutputSink<u32>>>, answer: u32) -> RecJob {
        RecJob {
            scope: JobScope::Global,
            subject: JobSubject::None,
            sink,
            answer,
        }
    }

    /// The `Job` view of a `RecordingJob` assembles its id from
    /// `scope()`, `subject()` and `KIND`, and reports `KIND` as its kind.
    #[tokio::test]
    async fn recording_job_blanket_provides_job_id_and_kind() {
        let job = RecJob {
            scope: JobScope::Workspace("ws".into()),
            subject: JobSubject::Repo("r".into()),
            sink: None,
            answer: 1,
        };

        let JobId {
            scope,
            kind,
            subject,
        } = <RecJob as Job>::id(&job);
        assert_eq!(kind.0, "test-recording");

        if let JobScope::Workspace(workspace) = scope {
            assert_eq!(workspace, "ws");
        } else {
            panic!("wrong scope");
        }

        if let JobSubject::Repo(repo) = subject {
            assert_eq!(repo, "r");
        } else {
            panic!("wrong subject");
        }

        let reported_kind = <RecJob as Job>::kind(&job);
        assert_eq!(reported_kind.0, "test-recording");
    }

    /// A successful `execute` returns the body's output and pushes
    /// exactly one record carrying that output into the sink.
    #[tokio::test]
    async fn recording_job_blanket_execute_records_to_sink_on_success() {
        let capture: Arc<CaptureSink<u32>> = Arc::new(CaptureSink::default());
        // Keep the concrete handle for inspection; hand the job a
        // type-erased clone of the same sink.
        let erased_sink: Arc<dyn OutputSink<u32>> = capture.clone();
        let job = global_rec_job(Some(erased_sink), 42);

        let produced = <RecJob as Job>::execute(&job).await.unwrap();
        assert_eq!(produced, 42);

        let captured = capture.records.lock().unwrap();
        assert_eq!(captured.len(), 1, "sink should have captured one record");
        let (_, recorded) = &captured[0];
        assert_eq!(*recorded, 42);
    }

    /// With no sink there is nothing to inspect afterwards; this only
    /// confirms the sink-less path returns the output without panicking.
    #[tokio::test]
    async fn recording_job_without_sink_skips_recording() {
        let job = global_rec_job(None, 7);

        let produced = <RecJob as Job>::execute(&job).await.unwrap();
        assert_eq!(produced, 7);
    }
}
984
#[cfg(test)]
mod snapshot_tests {
    use super::*;
    use std::collections::{HashMap, HashSet};

    /// Builds a `Global`-scoped id of kind `"k"` whose subject is
    /// pinned to `name`, so tests can tell jobs apart by name.
    fn id(name: &str) -> JobId {
        let subject = JobSubject::Pinned(name.into());
        JobId {
            scope: JobScope::Global,
            kind: JobKindId::new("k"),
            subject,
        }
    }

    /// Assembles a `Snapshot` from `(name, phase)` pairs; each name
    /// becomes a job id via `id`.
    fn snapshot_with(entries: Vec<(&str, JobPhase)>) -> Snapshot {
        let phases: HashMap<JobId, JobPhase> = entries
            .into_iter()
            .map(|(name, phase)| (id(name), phase))
            .collect();
        Snapshot { phases }
    }

    /// `failure_set` reports the `Failed`, `Retrying` and `Deadlettered`
    /// jobs and nothing else.
    #[test]
    fn failure_set_includes_failed_retrying_deadlettered() {
        let snapshot = snapshot_with(vec![
            ("ok", JobPhase::Succeeded),
            ("dead", JobPhase::Deadlettered),
            ("flap", JobPhase::Failed { attempts: 2 }),
            ("waiting", JobPhase::WaitingForOperator),
            ("retry", JobPhase::Retrying { until_ms: 0 }),
            ("ready", JobPhase::Ready),
        ]);

        let failures = snapshot.failure_set();
        // Reduce the result to the pinned names so membership checks
        // stay readable.
        let names: HashSet<String> = failures
            .iter()
            .filter_map(|(job_id, _)| {
                if let JobSubject::Pinned(name) = &job_id.subject {
                    Some(name.clone())
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(names.len(), 3);
        assert!(names.contains("dead"));
        assert!(names.contains("flap"));
        assert!(names.contains("retry"));
        // WaitingForOperator is unhealed rather than failed (§VII.2),
        // so it stays out of the set.
        assert!(!names.contains("waiting"));
        // Succeeded and Ready jobs are not failures either.
        assert!(!names.contains("ok"));
        assert!(!names.contains("ready"));
    }

    /// `phase_counts` tallies jobs per phase label and leaves out
    /// phases that no job is in.
    #[test]
    fn phase_counts_summarizes_every_phase() {
        let snapshot = snapshot_with(vec![
            ("a", JobPhase::Pending),
            ("b", JobPhase::Pending),
            ("c", JobPhase::Succeeded),
            ("d", JobPhase::Deadlettered),
        ]);

        let tally = snapshot.phase_counts();
        assert_eq!(tally.get("pending"), Some(&2));
        assert_eq!(tally.get("succeeded"), Some(&1));
        assert_eq!(tally.get("deadlettered"), Some(&1));
        // No zero-valued entries: a phase with no jobs has no key.
        assert!(tally.get("ready").is_none());
    }
}
1051}