#![forbid(unsafe_code)]
use shigoto_dag::Dag;
use shigoto_types::{GateAggregate, JobId, JobPhase, SkipReason, Snapshot};
pub trait Gate: Send + Sync + 'static {
fn name(&self) -> &'static str;
fn evaluate(&self, ctx: &GateContext) -> GateOutcome;
}
pub struct GateContext<'a> {
pub job_id: &'a JobId,
pub snapshot: &'a Snapshot,
pub dag: &'a Dag,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GateOutcome {
Pass,
Wait,
Skip(SkipReason),
}
#[must_use]
pub fn reduce(outcomes: &[GateOutcome]) -> GateAggregate {
let mut any_wait = false;
for o in outcomes {
match o {
GateOutcome::Skip(reason) => return GateAggregate::Skipped(reason.clone()),
GateOutcome::Wait => any_wait = true,
GateOutcome::Pass => {}
}
}
if any_wait {
GateAggregate::SomeWaiting
} else {
GateAggregate::AllPassed
}
}
pub struct AllUpstreamsTerminal;
impl Gate for AllUpstreamsTerminal {
fn name(&self) -> &'static str {
"all-upstreams-terminal"
}
fn evaluate(&self, ctx: &GateContext) -> GateOutcome {
for pred in ctx.dag.predecessors(ctx.job_id) {
let phase = ctx
.snapshot
.phases
.get(&pred)
.cloned()
.unwrap_or(JobPhase::Pending);
if !is_terminal(&phase) {
return GateOutcome::Wait;
}
}
GateOutcome::Pass
}
}
pub struct OperatorApproved {
name: &'static str,
check: Box<dyn Fn() -> bool + Send + Sync>,
}
impl OperatorApproved {
pub fn new(name: &'static str, check: impl Fn() -> bool + Send + Sync + 'static) -> Self {
Self {
name,
check: Box::new(check),
}
}
}
impl Gate for OperatorApproved {
fn name(&self) -> &'static str {
self.name
}
fn evaluate(&self, _ctx: &GateContext) -> GateOutcome {
if (self.check)() {
GateOutcome::Pass
} else {
GateOutcome::Wait
}
}
}
fn is_terminal(phase: &JobPhase) -> bool {
matches!(
phase,
JobPhase::Succeeded | JobPhase::Skipped(_) | JobPhase::Deadlettered
)
}
#[cfg(test)]
mod tests {
use super::*;
use shigoto_types::{JobKindId, JobScope, JobSubject};
use std::collections::HashMap;
fn n(name: &str) -> JobId {
JobId {
scope: JobScope::Global,
kind: JobKindId::new("test"),
subject: JobSubject::Pinned(name.into()),
}
}
#[test]
fn reduce_empty_is_all_passed() {
assert_eq!(reduce(&[]), GateAggregate::AllPassed);
}
#[test]
fn reduce_all_pass_is_all_passed() {
assert_eq!(
reduce(&[GateOutcome::Pass, GateOutcome::Pass]),
GateAggregate::AllPassed
);
}
#[test]
fn reduce_one_wait_is_some_waiting() {
assert_eq!(
reduce(&[GateOutcome::Pass, GateOutcome::Wait, GateOutcome::Pass]),
GateAggregate::SomeWaiting
);
}
#[test]
fn reduce_skip_short_circuits_to_skipped() {
assert_eq!(
reduce(&[
GateOutcome::Wait,
GateOutcome::Skip(SkipReason::GateRejected),
GateOutcome::Pass,
]),
GateAggregate::Skipped(SkipReason::GateRejected)
);
}
#[test]
fn reduce_skip_takes_priority_over_wait() {
assert_eq!(
reduce(&[
GateOutcome::Skip(SkipReason::Other("first".into())),
GateOutcome::Wait,
]),
GateAggregate::Skipped(SkipReason::Other("first".into()))
);
}
#[test]
fn all_upstreams_terminal_passes_for_root_with_no_predecessors() {
let dag = Dag::new();
let snapshot = Snapshot {
phases: HashMap::new(),
};
let root = n("root");
let ctx = GateContext {
job_id: &root,
snapshot: &snapshot,
dag: &dag,
};
assert_eq!(AllUpstreamsTerminal.evaluate(&ctx), GateOutcome::Pass);
}
#[test]
fn all_upstreams_terminal_waits_when_upstream_is_pending() {
let mut dag = Dag::new();
dag.add_edge(n("root"), n("leaf"));
let mut phases = HashMap::new();
phases.insert(n("root"), JobPhase::Pending);
let snapshot = Snapshot { phases };
let leaf = n("leaf");
let ctx = GateContext {
job_id: &leaf,
snapshot: &snapshot,
dag: &dag,
};
assert_eq!(AllUpstreamsTerminal.evaluate(&ctx), GateOutcome::Wait);
}
#[test]
fn all_upstreams_terminal_passes_when_upstream_succeeded() {
let mut dag = Dag::new();
dag.add_edge(n("root"), n("leaf"));
let mut phases = HashMap::new();
phases.insert(n("root"), JobPhase::Succeeded);
let snapshot = Snapshot { phases };
let leaf = n("leaf");
let ctx = GateContext {
job_id: &leaf,
snapshot: &snapshot,
dag: &dag,
};
assert_eq!(AllUpstreamsTerminal.evaluate(&ctx), GateOutcome::Pass);
}
#[test]
fn all_upstreams_terminal_passes_when_upstream_deadlettered() {
let mut dag = Dag::new();
dag.add_edge(n("root"), n("leaf"));
let mut phases = HashMap::new();
phases.insert(n("root"), JobPhase::Deadlettered);
let snapshot = Snapshot { phases };
let leaf = n("leaf");
let ctx = GateContext {
job_id: &leaf,
snapshot: &snapshot,
dag: &dag,
};
assert_eq!(AllUpstreamsTerminal.evaluate(&ctx), GateOutcome::Pass);
}
#[test]
fn all_upstreams_terminal_waits_when_any_upstream_pending() {
let mut dag = Dag::new();
dag.add_edge(n("a"), n("leaf"));
dag.add_edge(n("b"), n("leaf"));
let mut phases = HashMap::new();
phases.insert(n("a"), JobPhase::Succeeded);
phases.insert(n("b"), JobPhase::Running);
let snapshot = Snapshot { phases };
let leaf = n("leaf");
let ctx = GateContext {
job_id: &leaf,
snapshot: &snapshot,
dag: &dag,
};
assert_eq!(AllUpstreamsTerminal.evaluate(&ctx), GateOutcome::Wait);
}
#[test]
fn operator_approved_waits_when_check_false() {
let dag = Dag::new();
let snapshot = Snapshot {
phases: HashMap::new(),
};
let job = n("x");
let ctx = GateContext {
job_id: &job,
snapshot: &snapshot,
dag: &dag,
};
let gate = OperatorApproved::new("manual-approve", || false);
assert_eq!(gate.evaluate(&ctx), GateOutcome::Wait);
assert_eq!(gate.name(), "manual-approve");
}
#[test]
fn operator_approved_passes_when_check_true() {
let dag = Dag::new();
let snapshot = Snapshot {
phases: HashMap::new(),
};
let job = n("x");
let ctx = GateContext {
job_id: &job,
snapshot: &snapshot,
dag: &dag,
};
let gate = OperatorApproved::new("manual-approve", || true);
assert_eq!(gate.evaluate(&ctx), GateOutcome::Pass);
}
}