1#![forbid(unsafe_code)]
8
9use shigoto_dag::Dag;
10use shigoto_types::{GateAggregate, JobId, JobPhase, SkipReason, Snapshot};
11
/// A named check that is evaluated against a [`GateContext`] and yields a
/// [`GateOutcome`].
///
/// Implementors must be `Send + Sync + 'static`, as required by the
/// supertrait bounds.
pub trait Gate: Send + Sync + 'static {
    /// Returns the gate's name as a static string.
    fn name(&self) -> &'static str;

    /// Evaluates the gate against `ctx` and reports the resulting
    /// [`GateOutcome`].
    fn evaluate(&self, ctx: &GateContext) -> GateOutcome;
}
24
/// Borrowed inputs handed to [`Gate::evaluate`].
pub struct GateContext<'a> {
    /// Identifier of the job the gate is being evaluated for.
    pub job_id: &'a JobId,
    /// Snapshot whose `phases` map a gate may consult.
    pub snapshot: &'a Snapshot,
    /// Dependency graph a gate may query, e.g. for the predecessors of
    /// `job_id`.
    pub dag: &'a Dag,
}
33
/// Result of evaluating a single [`Gate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GateOutcome {
    /// The gate is satisfied.
    Pass,
    /// The gate is not ready to pass yet (for example an upstream job is
    /// still in flight, or an approval check has not returned `true`).
    Wait,
    /// The gate rejects the job, carrying the reason. [`reduce`] turns the
    /// first `Skip` it meets into `GateAggregate::Skipped`.
    Skip(SkipReason),
}
45
46#[must_use]
51pub fn reduce(outcomes: &[GateOutcome]) -> GateAggregate {
52 let mut any_wait = false;
55 for o in outcomes {
56 match o {
57 GateOutcome::Skip(reason) => return GateAggregate::Skipped(reason.clone()),
58 GateOutcome::Wait => any_wait = true,
59 GateOutcome::Pass => {}
60 }
61 }
62 if any_wait {
63 GateAggregate::SomeWaiting
64 } else {
65 GateAggregate::AllPassed
66 }
67}
68
69pub struct AllUpstreamsTerminal;
82
83impl Gate for AllUpstreamsTerminal {
84 fn name(&self) -> &'static str {
85 "all-upstreams-terminal"
86 }
87
88 fn evaluate(&self, ctx: &GateContext) -> GateOutcome {
89 for pred in ctx.dag.predecessors(ctx.job_id) {
90 let phase = ctx
91 .snapshot
92 .phases
93 .get(&pred)
94 .cloned()
95 .unwrap_or(JobPhase::Pending);
96 if !is_terminal(&phase) {
97 return GateOutcome::Wait;
98 }
99 }
100 GateOutcome::Pass
101 }
102}
103
/// Gate whose result is driven by a caller-supplied approval check.
pub struct OperatorApproved {
    /// Name reported for this gate.
    name: &'static str,
    /// Callback consulted by the gate; `true` means approved.
    check: Box<dyn Fn() -> bool + Send + Sync>,
}

impl OperatorApproved {
    /// Creates a gate called `name` that reads its approval state from
    /// `check`. The closure is boxed and stored; it is not invoked here.
    #[must_use]
    pub fn new(name: &'static str, check: impl Fn() -> bool + Send + Sync + 'static) -> Self {
        Self {
            name,
            check: Box::new(check),
        }
    }
}

// `Debug` cannot be derived because the boxed closure is opaque, so the
// implementation prints the name and elides the callback.
impl std::fmt::Debug for OperatorApproved {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OperatorApproved")
            .field("name", &self.name)
            .finish_non_exhaustive()
    }
}
122
123impl Gate for OperatorApproved {
124 fn name(&self) -> &'static str {
125 self.name
126 }
127
128 fn evaluate(&self, _ctx: &GateContext) -> GateOutcome {
129 if (self.check)() {
130 GateOutcome::Pass
131 } else {
132 GateOutcome::Wait
133 }
134 }
135}
136
/// Returns `true` when `phase` is one of the phases this module treats as
/// terminal: `Succeeded`, `Skipped(_)` or `Deadlettered`. Every other phase
/// (for example `Pending` or `Running`) yields `false`.
fn is_terminal(phase: &JobPhase) -> bool {
    matches!(
        phase,
        JobPhase::Succeeded | JobPhase::Skipped(_) | JobPhase::Deadlettered
    )
}
145
#[cfg(test)]
mod tests {
    use super::*;
    use shigoto_types::{JobKindId, JobScope, JobSubject};
    use std::collections::HashMap;

    /// Builds a globally scoped test `JobId` of kind `"test"` pinned to `name`.
    fn n(name: &str) -> JobId {
        JobId {
            scope: JobScope::Global,
            kind: JobKindId::new("test"),
            subject: JobSubject::Pinned(name.into()),
        }
    }

    /// Builds a DAG with one edge per `(upstream, downstream)` pair.
    fn dag_of(edges: &[(&str, &str)]) -> Dag {
        let mut dag = Dag::new();
        for &(upstream, downstream) in edges {
            dag.add_edge(n(upstream), n(downstream));
        }
        dag
    }

    /// Evaluates `gate` for the job named `job` against `dag` and a snapshot
    /// that records exactly the given `phases`.
    fn run_gate(
        gate: &dyn Gate,
        dag: &Dag,
        phases: Vec<(&str, JobPhase)>,
        job: &str,
    ) -> GateOutcome {
        let phases: HashMap<JobId, JobPhase> = phases
            .into_iter()
            .map(|(name, phase)| (n(name), phase))
            .collect();
        let snapshot = Snapshot { phases };
        let job_id = n(job);
        let ctx = GateContext {
            job_id: &job_id,
            snapshot: &snapshot,
            dag,
        };
        gate.evaluate(&ctx)
    }

    // ---- reduce ----------------------------------------------------------

    #[test]
    fn reduce_empty_is_all_passed() {
        assert_eq!(reduce(&[]), GateAggregate::AllPassed);
    }

    #[test]
    fn reduce_all_pass_is_all_passed() {
        assert_eq!(
            reduce(&[GateOutcome::Pass, GateOutcome::Pass]),
            GateAggregate::AllPassed
        );
    }

    #[test]
    fn reduce_one_wait_is_some_waiting() {
        assert_eq!(
            reduce(&[GateOutcome::Pass, GateOutcome::Wait, GateOutcome::Pass]),
            GateAggregate::SomeWaiting
        );
    }

    #[test]
    fn reduce_skip_short_circuits_to_skipped() {
        assert_eq!(
            reduce(&[
                GateOutcome::Wait,
                GateOutcome::Skip(SkipReason::GateRejected),
                GateOutcome::Pass,
            ]),
            GateAggregate::Skipped(SkipReason::GateRejected)
        );
    }

    #[test]
    fn reduce_skip_takes_priority_over_wait() {
        assert_eq!(
            reduce(&[
                GateOutcome::Skip(SkipReason::Other("first".into())),
                GateOutcome::Wait,
            ]),
            GateAggregate::Skipped(SkipReason::Other("first".into()))
        );
    }

    #[test]
    fn reduce_reports_the_first_of_several_skips() {
        assert_eq!(
            reduce(&[
                GateOutcome::Skip(SkipReason::Other("first".into())),
                GateOutcome::Skip(SkipReason::GateRejected),
            ]),
            GateAggregate::Skipped(SkipReason::Other("first".into()))
        );
    }

    // ---- AllUpstreamsTerminal --------------------------------------------

    #[test]
    fn all_upstreams_terminal_passes_for_root_with_no_predecessors() {
        let outcome = run_gate(&AllUpstreamsTerminal, &Dag::new(), vec![], "root");
        assert_eq!(outcome, GateOutcome::Pass);
    }

    #[test]
    fn all_upstreams_terminal_waits_when_upstream_is_pending() {
        let dag = dag_of(&[("root", "leaf")]);
        let outcome = run_gate(
            &AllUpstreamsTerminal,
            &dag,
            vec![("root", JobPhase::Pending)],
            "leaf",
        );
        assert_eq!(outcome, GateOutcome::Wait);
    }

    #[test]
    fn all_upstreams_terminal_waits_when_upstream_missing_from_snapshot() {
        // The predecessor exists in the DAG but has no recorded phase.
        let dag = dag_of(&[("root", "leaf")]);
        let outcome = run_gate(&AllUpstreamsTerminal, &dag, vec![], "leaf");
        assert_eq!(outcome, GateOutcome::Wait);
    }

    #[test]
    fn all_upstreams_terminal_passes_when_upstream_succeeded() {
        let dag = dag_of(&[("root", "leaf")]);
        let outcome = run_gate(
            &AllUpstreamsTerminal,
            &dag,
            vec![("root", JobPhase::Succeeded)],
            "leaf",
        );
        assert_eq!(outcome, GateOutcome::Pass);
    }

    #[test]
    fn all_upstreams_terminal_passes_when_upstream_deadlettered() {
        let dag = dag_of(&[("root", "leaf")]);
        let outcome = run_gate(
            &AllUpstreamsTerminal,
            &dag,
            vec![("root", JobPhase::Deadlettered)],
            "leaf",
        );
        assert_eq!(outcome, GateOutcome::Pass);
    }

    #[test]
    fn all_upstreams_terminal_waits_when_any_upstream_still_running() {
        let dag = dag_of(&[("a", "leaf"), ("b", "leaf")]);
        let outcome = run_gate(
            &AllUpstreamsTerminal,
            &dag,
            vec![("a", JobPhase::Succeeded), ("b", JobPhase::Running)],
            "leaf",
        );
        assert_eq!(outcome, GateOutcome::Wait);
    }

    // ---- OperatorApproved ------------------------------------------------

    #[test]
    fn operator_approved_waits_when_check_false() {
        let gate = OperatorApproved::new("manual-approve", || false);
        assert_eq!(
            run_gate(&gate, &Dag::new(), vec![], "x"),
            GateOutcome::Wait
        );
        assert_eq!(gate.name(), "manual-approve");
    }

    #[test]
    fn operator_approved_passes_when_check_true() {
        let gate = OperatorApproved::new("manual-approve", || true);
        assert_eq!(
            run_gate(&gate, &Dag::new(), vec![], "x"),
            GateOutcome::Pass
        );
    }
}