Skip to main content

shigoto_gate/
lib.rs

1//! shigoto-gate — typed precondition Gate trait + standard gates.
2//!
3//! Spec: `theory/SHIGOTO.md` §III.9. Gates are PURE — they evaluate
4//! against the scheduler snapshot without IO. IO-dependent gating is
5//! itself a Job that produces a typed fact a downstream gate checks.
6
7#![forbid(unsafe_code)]
8
9use shigoto_dag::Dag;
10use shigoto_types::{GateAggregate, JobId, JobPhase, SkipReason, Snapshot};
11
/// One typed precondition. Pure — no IO. Gate impls that "need"
/// IO are antipatterns; the right shape is a Job that emits a typed
/// fact and a downstream gate that checks the fact.
///
/// The `Send + Sync + 'static` supertraits require every implementor
/// to be shareable across threads and free of non-`'static` borrows.
pub trait Gate: Send + Sync + 'static {
    /// Human-readable identifier. Surfaces in transition reasons +
    /// audit events.
    fn name(&self) -> &'static str;

    /// Evaluate this gate against the current scheduler state for the
    /// job identified by `ctx.job_id`.
    ///
    /// Returns one of the three [`GateOutcome`] variants:
    /// `Pass | Wait | Skip(reason)`.
    fn evaluate(&self, ctx: &GateContext) -> GateOutcome;
}
24
/// Everything a Gate needs to make its decision: the job in question,
/// the FSM snapshot (every other job's phase), the DAG (so gates
/// like `AllUpstreamsTerminal` can ask about predecessors).
///
/// All three fields are shared borrows bound to `'a`; the context
/// itself owns nothing.
pub struct GateContext<'a> {
    /// Identity of the job whose gates are being evaluated.
    pub job_id: &'a JobId,
    /// Scheduler snapshot; gates read other jobs' phases from its
    /// `phases` map.
    pub snapshot: &'a Snapshot,
    /// Dependency graph; gates query it for the job's predecessors.
    pub dag: &'a Dag,
}
33
/// Verdict produced by a single [`Gate::evaluate`] call. A cohort of
/// these is folded into one `GateAggregate` by [`reduce`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GateOutcome {
    /// This gate is satisfied; the job may advance past it.
    Pass,
    /// This gate is not yet satisfied but expects to be later. The
    /// scheduler re-evaluates on each tick.
    Wait,
    /// This gate refuses the job permanently. The job advances to
    /// `Skipped(reason)` — terminal.
    Skip(SkipReason),
}
45
46/// Reduce a cohort of Gate outcomes to one `GateAggregate` that the
47/// FSM `advance()` consumes. Worst-outcome wins: any Skip → Skipped;
48/// no Skip but any Wait → SomeWaiting; otherwise → AllPassed. The
49/// first Skip's reason is preserved.
50#[must_use]
51pub fn reduce(outcomes: &[GateOutcome]) -> GateAggregate {
52    // Cohorts of size 0 trivially pass — a job with no gates can run
53    // immediately. The scheduler's default behavior matches.
54    let mut any_wait = false;
55    for o in outcomes {
56        match o {
57            GateOutcome::Skip(reason) => return GateAggregate::Skipped(reason.clone()),
58            GateOutcome::Wait => any_wait = true,
59            GateOutcome::Pass => {}
60        }
61    }
62    if any_wait {
63        GateAggregate::SomeWaiting
64    } else {
65        GateAggregate::AllPassed
66    }
67}
68
69// ── Standard gates ───────────────────────────────────────────────────
70
71/// `AllUpstreamsTerminal` — every direct DAG predecessor has reached
72/// a terminal phase ({Succeeded, Skipped, Deadlettered}). The
73/// scheduler implicitly applies this gate to enforce DAG edge
74/// semantics; consumers don't normally register it explicitly.
75///
76/// Pass iff every predecessor is terminal. Wait if any predecessor
77/// is non-terminal. (We never Skip from this gate — a Deadlettered
78/// ancestor releases its descendants per §VII.4's cascading-skip
79/// rule, but the descendant's own gate cohort decides whether to
80/// run.)
81pub struct AllUpstreamsTerminal;
82
83impl Gate for AllUpstreamsTerminal {
84    fn name(&self) -> &'static str {
85        "all-upstreams-terminal"
86    }
87
88    fn evaluate(&self, ctx: &GateContext) -> GateOutcome {
89        for pred in ctx.dag.predecessors(ctx.job_id) {
90            let phase = ctx
91                .snapshot
92                .phases
93                .get(&pred)
94                .cloned()
95                .unwrap_or(JobPhase::Pending);
96            if !is_terminal(&phase) {
97                return GateOutcome::Wait;
98            }
99        }
100        GateOutcome::Pass
101    }
102}
103
104/// `OperatorApproved` — pass iff an external operator has flipped a
105/// pre-arranged flag. The flag itself lives in the consumer's state
106/// store; this gate is a thin wrapper that holds an `Arc<AtomicBool>`
107/// or similar. v0.1 ships with a `Closure` variant that takes a
108/// `Fn() -> bool` for tests + ad-hoc cases.
109pub struct OperatorApproved {
110    name: &'static str,
111    check: Box<dyn Fn() -> bool + Send + Sync>,
112}
113
114impl OperatorApproved {
115    pub fn new(name: &'static str, check: impl Fn() -> bool + Send + Sync + 'static) -> Self {
116        Self {
117            name,
118            check: Box::new(check),
119        }
120    }
121}
122
123impl Gate for OperatorApproved {
124    fn name(&self) -> &'static str {
125        self.name
126    }
127
128    fn evaluate(&self, _ctx: &GateContext) -> GateOutcome {
129        if (self.check)() {
130            GateOutcome::Pass
131        } else {
132            GateOutcome::Wait
133        }
134    }
135}
136
137// ── Internals ────────────────────────────────────────────────────────
138
139fn is_terminal(phase: &JobPhase) -> bool {
140    matches!(
141        phase,
142        JobPhase::Succeeded | JobPhase::Skipped(_) | JobPhase::Deadlettered
143    )
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use shigoto_types::{JobKindId, JobScope, JobSubject};
    use std::collections::HashMap;

    /// Build a global-scope `JobId` of kind `"test"` pinned to `name`.
    fn n(name: &str) -> JobId {
        JobId {
            scope: JobScope::Global,
            kind: JobKindId::new("test"),
            subject: JobSubject::Pinned(name.into()),
        }
    }

    /// Evaluate `AllUpstreamsTerminal` for `job` in a DAG where every
    /// listed upstream has an edge into `job` and the given phase in
    /// the snapshot.
    fn upstream_outcome(upstreams: Vec<(&str, JobPhase)>, job: &str) -> GateOutcome {
        let mut dag = Dag::new();
        let mut phases = HashMap::new();
        for (upstream, phase) in upstreams {
            dag.add_edge(n(upstream), n(job));
            phases.insert(n(upstream), phase);
        }
        let snapshot = Snapshot { phases };
        let job_id = n(job);
        AllUpstreamsTerminal.evaluate(&GateContext {
            job_id: &job_id,
            snapshot: &snapshot,
            dag: &dag,
        })
    }

    /// Evaluate `gate` for job `"x"` against an empty DAG and an empty
    /// snapshot.
    fn outcome_in_empty_world(gate: &OperatorApproved) -> GateOutcome {
        let dag = Dag::new();
        let snapshot = Snapshot {
            phases: HashMap::new(),
        };
        let job_id = n("x");
        gate.evaluate(&GateContext {
            job_id: &job_id,
            snapshot: &snapshot,
            dag: &dag,
        })
    }

    // ── reduce ────────────────────────────────────────────────

    #[test]
    fn reduce_empty_is_all_passed() {
        let aggregate = reduce(&[]);
        assert_eq!(aggregate, GateAggregate::AllPassed);
    }

    #[test]
    fn reduce_all_pass_is_all_passed() {
        let cohort = [GateOutcome::Pass, GateOutcome::Pass];
        assert_eq!(reduce(&cohort), GateAggregate::AllPassed);
    }

    #[test]
    fn reduce_one_wait_is_some_waiting() {
        let cohort = [GateOutcome::Pass, GateOutcome::Wait, GateOutcome::Pass];
        assert_eq!(reduce(&cohort), GateAggregate::SomeWaiting);
    }

    #[test]
    fn reduce_skip_short_circuits_to_skipped() {
        let cohort = [
            GateOutcome::Wait,
            GateOutcome::Skip(SkipReason::GateRejected),
            GateOutcome::Pass,
        ];
        assert_eq!(
            reduce(&cohort),
            GateAggregate::Skipped(SkipReason::GateRejected)
        );
    }

    #[test]
    fn reduce_skip_takes_priority_over_wait() {
        // First Skip wins; subsequent Wait/Pass don't matter.
        let cohort = [
            GateOutcome::Skip(SkipReason::Other("first".into())),
            GateOutcome::Wait,
        ];
        assert_eq!(
            reduce(&cohort),
            GateAggregate::Skipped(SkipReason::Other("first".into()))
        );
    }

    // ── AllUpstreamsTerminal ──────────────────────────────────

    #[test]
    fn all_upstreams_terminal_passes_for_root_with_no_predecessors() {
        let outcome = upstream_outcome(vec![], "root");
        assert_eq!(outcome, GateOutcome::Pass);
    }

    #[test]
    fn all_upstreams_terminal_waits_when_upstream_is_pending() {
        let outcome = upstream_outcome(vec![("root", JobPhase::Pending)], "leaf");
        assert_eq!(outcome, GateOutcome::Wait);
    }

    #[test]
    fn all_upstreams_terminal_passes_when_upstream_succeeded() {
        let outcome = upstream_outcome(vec![("root", JobPhase::Succeeded)], "leaf");
        assert_eq!(outcome, GateOutcome::Pass);
    }

    #[test]
    fn all_upstreams_terminal_passes_when_upstream_deadlettered() {
        // Per §VII.4: a deadlettered ancestor releases its descendants.
        let outcome = upstream_outcome(vec![("root", JobPhase::Deadlettered)], "leaf");
        assert_eq!(outcome, GateOutcome::Pass);
    }

    #[test]
    fn all_upstreams_terminal_waits_when_any_upstream_pending() {
        // Two upstreams; one succeeded, one still running → wait.
        let outcome = upstream_outcome(
            vec![("a", JobPhase::Succeeded), ("b", JobPhase::Running)],
            "leaf",
        );
        assert_eq!(outcome, GateOutcome::Wait);
    }

    // ── OperatorApproved ──────────────────────────────────────

    #[test]
    fn operator_approved_waits_when_check_false() {
        let gate = OperatorApproved::new("manual-approve", || false);
        assert_eq!(outcome_in_empty_world(&gate), GateOutcome::Wait);
        assert_eq!(gate.name(), "manual-approve");
    }

    #[test]
    fn operator_approved_passes_when_check_true() {
        let gate = OperatorApproved::new("manual-approve", || true);
        assert_eq!(outcome_in_empty_world(&gate), GateOutcome::Pass);
    }
}