#![forbid(unsafe_code)]
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
pub mod failure;
pub use failure::{classify, signature, Failure, FailureKind};
pub mod sink;
pub use sink::{AuditFileSink, InMemorySink, MultiSink, NullSink, Sink};
pub mod chain;
pub use chain::Chain;
pub mod classify;
pub use classify::{ChainedClassifier, Classifier, FailureClassifier, FnClassifier};
pub mod watch;
pub use watch::{EscalationRouting, ScheduleWindow, TimeoutWatcher, WatchAction, WatchRule};
pub mod policy;
pub use policy::CascadePolicy;
pub mod decision;
pub use decision::Decision;
pub mod testing;
/// Identity of a job, composed of a scope, a kind, and a subject.
///
/// Derives `Eq` and `Hash`, so it can be used as a hash-map key (as
/// [`Snapshot`] does for its `phases` map).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct JobId {
/// Scope component of the identity.
pub scope: JobScope,
/// Kind component of the identity.
pub kind: JobKindId,
/// Subject component of the identity.
pub subject: JobSubject,
}
/// Scope component of a [`JobId`].
///
/// NOTE(review): the `discriminant` attribute presumably makes the
/// `gen_platform::Discriminant` derive emit a `kind` method yielding the
/// kebab-case variant name — confirm against `gen_platform`.
#[derive(
Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
gen_platform::Discriminant,
gen_platform::IsVariant,
)]
#[discriminant(method = "kind", case = "kebab")]
pub enum JobScope {
/// No workspace or repo qualifier.
Global,
/// A workspace, carried as a string (presumably its name — confirm).
Workspace(String),
/// A repo qualified by its workspace, both carried as strings.
Repo { workspace: String, repo: String },
}
/// Subject component of a [`JobId`].
///
/// NOTE(review): the `discriminant` attribute presumably makes the
/// `gen_platform::Discriminant` derive emit a `kind` method yielding the
/// kebab-case variant name — confirm against `gen_platform`.
#[derive(
Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
gen_platform::Discriminant,
gen_platform::IsVariant,
)]
#[discriminant(method = "kind", case = "kebab")]
pub enum JobSubject {
/// The job has no subject.
None,
/// A repo, carried as a string.
Repo(String),
/// An org, carried as a string.
Org(String),
/// A filesystem path.
Path(PathBuf),
/// An opaque string; the tests in this file use it as a free-form job name.
/// NOTE(review): the production meaning of "pinned" is not visible here.
Pinned(String),
}
/// Newtype around the string that names a job kind.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct JobKindId(pub String);
impl JobKindId {
/// Builds a kind id from anything convertible into a `String`.
#[must_use]
pub fn new(s: impl Into<String>) -> Self {
Self(s.into())
}
}
/// Lifecycle phase of a job; [`advance`] is the transition function over it.
///
/// NOTE(review): the `discriminant` attribute presumably makes the
/// `gen_platform::Discriminant` derive emit a `kind` method yielding the
/// kebab-case variant name — confirm against `gen_platform`.
#[derive(
Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
gen_platform::Discriminant,
gen_platform::IsVariant,
)]
#[discriminant(method = "kind", case = "kebab")]
pub enum JobPhase {
/// Awaiting gate evaluation. `advance` also returns a job here after
/// `BackoffElapsed` or an operator reset from `Deadlettered`.
Pending,
/// Gate evaluation reported `SomeWaiting`; re-evaluated on the next
/// `EvaluateGates` signal.
Gated,
/// Gate evaluation reported `AllPassed`; `AllocateBudget` moves it to `Running`.
Ready,
/// Executing; left via `ExecutionSucceeded`, `ExecutionFailed`, `Cancel`,
/// or `Timeout`.
Running,
/// Execution succeeded. `advance` defines no outbound transition.
Succeeded,
/// Execution failed, was cancelled, or timed out. `advance` always enters
/// this phase with `attempts: 1`.
Failed { attempts: u32 },
/// A retry was decided; `until_ms` is copied from `RetryOutcome::Retry`
/// (presumably a millisecond timestamp — confirm against the producer).
Retrying { until_ms: i64 },
/// The job was skipped for the given reason. `advance` defines no outbound
/// transition.
Skipped(SkipReason),
/// Retry decision was `Deadletter`; only an operator transition to
/// `Pending` leaves this phase.
Deadlettered,
/// Only operator transitions (to `Ready` or `Skipped`) leave this phase;
/// `advance` itself never produces it.
WaitingForOperator,
}
/// Reason attached to [`JobPhase::Skipped`] and [`GateAggregate::Skipped`].
///
/// NOTE(review): the `discriminant` attribute presumably makes the
/// `gen_platform::Discriminant` derive emit a `kind` method yielding the
/// kebab-case variant name — confirm against `gen_platform`.
#[derive(
Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
gen_platform::Discriminant,
gen_platform::IsVariant,
)]
#[discriminant(method = "kind", case = "kebab")]
pub enum SkipReason {
/// A gate rejected the job.
GateRejected,
/// Presumably: an ancestor job is deadlettered — the producer of this
/// variant is not in this file; confirm.
BlockedByDeadletteredAncestor,
/// An operator decided to skip the job.
OperatorDecision,
/// Free-form reason text.
Other(String),
}
/// Input consumed by [`advance`] to move a job from one [`JobPhase`] to the next.
///
/// NOTE(review): the `discriminant` attribute presumably makes the
/// `gen_platform::Discriminant` derive emit a `kind` method yielding the
/// kebab-case variant name — confirm against `gen_platform`.
#[derive(
Debug, Clone, PartialEq, Eq,
gen_platform::Discriminant,
gen_platform::IsVariant,
)]
#[discriminant(method = "kind", case = "kebab")]
pub enum Signal {
/// Result of evaluating the job's gates; accepted from `Pending`, `Gated`,
/// `Ready`, and `Retrying`.
EvaluateGates(GateAggregate),
/// Accepted from `Ready`; moves the job to `Running`.
AllocateBudget,
/// Accepted from `Running`; moves the job to `Succeeded`.
ExecutionSucceeded,
/// Accepted from `Running`; moves the job to `Failed`.
ExecutionFailed,
/// Accepted from `Failed`; moves the job to `Retrying` or `Deadlettered`.
RetryDecide(RetryOutcome),
/// Accepted from `Running`; moves the job to `Failed`.
Cancel,
/// Accepted from `Running`; moves the job to `Failed`.
Timeout,
/// Accepted from `Retrying`; moves the job back to `Pending`.
BackoffElapsed,
/// Operator-requested target phase. `advance` accepts only
/// `WaitingForOperator` → `Ready` / `Skipped(_)` and
/// `Deadlettered` → `Pending`.
OperatorTransition(JobPhase),
}
/// Aggregated gate result carried by [`Signal::EvaluateGates`].
///
/// NOTE(review): the `discriminant` attribute presumably makes the
/// `gen_platform::Discriminant` derive emit a `kind` method yielding the
/// kebab-case variant name — confirm against `gen_platform`.
#[derive(
Debug, Clone, PartialEq, Eq,
gen_platform::Discriminant,
gen_platform::IsVariant,
)]
#[discriminant(method = "kind", case = "kebab")]
pub enum GateAggregate {
/// `advance` maps this to `JobPhase::Ready`.
AllPassed,
/// `advance` maps this to `JobPhase::Gated`.
SomeWaiting,
/// `advance` maps this to `JobPhase::Skipped` with the same reason.
Skipped(SkipReason),
}
/// Retry decision carried by [`Signal::RetryDecide`] for a job in `Failed`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, gen_platform::TypedDispatcher)]
pub enum RetryOutcome {
/// Retry the job; `advance` copies `until_ms` into `JobPhase::Retrying`.
Retry { until_ms: i64 },
/// Give up; `advance` moves the job to `JobPhase::Deadlettered`.
Deadletter,
}
// NOTE(review): presumably registers `RetryOutcome` with gen_platform's
// dispatcher registry under the key "shigoto.retry-outcome" — the macro is
// defined outside this file; confirm its semantics there.
gen_platform::register_dispatcher!("shigoto.retry-outcome", RetryOutcome);
/// Error returned by [`advance`] when the current phase has no transition for
/// the given signal. Both inputs are handed back to the caller.
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
#[error("illegal transition: {from:?} cannot consume {signal:?}")]
pub struct IllegalTransition {
/// Phase the job was in when the signal arrived.
pub from: JobPhase,
/// Signal that could not be consumed.
pub signal: Signal,
}
impl Signal {
/// Returns `true` only for [`Signal::OperatorTransition`], whatever target
/// phase it carries; every other signal yields `false`.
#[must_use]
pub fn is_operator_driven(&self) -> bool {
matches!(self, Self::OperatorTransition(_))
}
}
pub fn advance(from: JobPhase, signal: Signal) -> Result<JobPhase, IllegalTransition> {
use Signal::*;
let new = match (&from, &signal) {
(JobPhase::Pending, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
(JobPhase::Pending, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
(JobPhase::Pending, EvaluateGates(GateAggregate::Skipped(r))) => {
JobPhase::Skipped(r.clone())
}
(JobPhase::Gated, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
(JobPhase::Gated, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
(JobPhase::Gated, EvaluateGates(GateAggregate::Skipped(r))) => {
JobPhase::Skipped(r.clone())
}
(JobPhase::Ready, AllocateBudget) => JobPhase::Running,
(JobPhase::Ready, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
(JobPhase::Ready, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
(JobPhase::Ready, EvaluateGates(GateAggregate::Skipped(r))) => {
JobPhase::Skipped(r.clone())
}
(JobPhase::Running, ExecutionSucceeded) => JobPhase::Succeeded,
(JobPhase::Running, ExecutionFailed) => JobPhase::Failed { attempts: 1 },
(JobPhase::Running, Cancel) => JobPhase::Failed { attempts: 1 },
(JobPhase::Running, Timeout) => JobPhase::Failed { attempts: 1 },
(JobPhase::Failed { attempts: _ }, RetryDecide(RetryOutcome::Retry { until_ms })) => {
JobPhase::Retrying {
until_ms: *until_ms,
}
}
(JobPhase::Failed { .. }, RetryDecide(RetryOutcome::Deadletter)) => JobPhase::Deadlettered,
(JobPhase::Retrying { .. }, BackoffElapsed) => JobPhase::Pending,
(JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
(JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
(JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::Skipped(r))) => {
JobPhase::Skipped(r.clone())
}
(JobPhase::WaitingForOperator, OperatorTransition(JobPhase::Ready)) => JobPhase::Ready,
(JobPhase::WaitingForOperator, OperatorTransition(JobPhase::Skipped(r))) => {
JobPhase::Skipped(r.clone())
}
(JobPhase::Deadlettered, OperatorTransition(JobPhase::Pending)) => JobPhase::Pending,
_ => return Err(IllegalTransition { from, signal }),
};
Ok(new)
}
/// Marker trait with no methods; implementors must be `Send + Sync + 'static`.
/// NOTE(review): nothing in this file bounds on it — presumably used by other modules.
pub trait JobInput: Send + Sync + 'static {}
/// Marker trait with no methods; implementors must be `Send + Sync + 'static`.
/// NOTE(review): nothing in this file bounds on it — presumably used by other modules.
pub trait JobOutput: Send + Sync + 'static {}
/// Marker trait with no methods for error types that are
/// `std::error::Error + Send + Sync + 'static`.
/// NOTE(review): nothing in this file bounds on it — presumably used by other modules.
pub trait JobError: std::error::Error + Send + Sync + 'static {}
/// Receiver for the output of a job.
///
/// The blanket [`Job`] implementation for [`RecordingJob`] calls `record`
/// once after `execute_body` returns `Ok`, when the job exposes a sink.
#[async_trait::async_trait]
pub trait OutputSink<O>: Send + Sync + 'static
where
O: Send + Sync + 'static,
{
/// Records `output` as produced by the job identified by `job_id`.
/// The method returns nothing, so it cannot report a failure to the caller.
async fn record(&self, job_id: &JobId, output: &O);
}
/// A job described by its parts; every implementor gets [`Job`] through the
/// blanket implementation in this file, which assembles the [`JobId`] and
/// forwards successful outputs to the optional sink.
#[async_trait::async_trait]
pub trait RecordingJob: Send + Sync + 'static {
/// Value produced on success.
type Output: Send + Sync + Clone + 'static;
/// Error produced on failure.
type Error: std::error::Error + Send + Sync + 'static;
/// String used to build the job's [`JobKindId`].
const KIND: &'static str;
/// Scope component of the job's id.
fn scope(&self) -> JobScope;
/// Subject component of the job's id.
fn subject(&self) -> JobSubject;
/// Sink that should receive successful outputs, or `None` to skip recording.
fn output_sink(&self) -> Option<&std::sync::Arc<dyn OutputSink<Self::Output>>>;
/// The job's actual work; the blanket `Job::execute` wraps this call.
async fn execute_body(&self) -> Result<Self::Output, Self::Error>;
}
/// Blanket [`Job`] implementation for every [`RecordingJob`].
///
/// The id is assembled from `scope()`, `KIND`, and `subject()`. `execute`
/// runs `execute_body` and, on success only, forwards the output to the sink
/// returned by `output_sink()` when there is one.
#[async_trait::async_trait]
impl<T: RecordingJob> Job for T {
    type Output = T::Output;
    type Error = T::Error;

    /// Builds the job id from the implementor's scope, kind, and subject.
    fn id(&self) -> JobId {
        let scope = self.scope();
        let kind = JobKindId::new(T::KIND);
        let subject = self.subject();
        JobId {
            scope,
            kind,
            subject,
        }
    }

    /// Kind id derived from `T::KIND`.
    fn kind(&self) -> JobKindId {
        JobKindId::new(T::KIND)
    }

    /// Runs the body; an error short-circuits before anything is recorded.
    async fn execute(&self) -> Result<T::Output, T::Error> {
        let output = self.execute_body().await?;
        if let Some(sink) = self.output_sink() {
            // Fully qualified: `ErasedJob` also provides an `id` method for `T`.
            let job_id = <T as Job>::id(self);
            sink.record(&job_id, &output).await;
        }
        Ok(output)
    }
}
/// A unit of work with a typed output and error.
///
/// Every `Job` also implements [`ErasedJob`] through a blanket implementation
/// in this file.
#[async_trait::async_trait]
pub trait Job: Send + Sync + 'static {
/// Value produced on success.
type Output: Send + 'static;
/// Error produced on failure.
type Error: std::error::Error + Send + Sync + 'static;
/// Identity of this job.
fn id(&self) -> JobId;
/// Kind identifier of this job.
fn kind(&self) -> JobKindId;
/// Runs the job.
async fn execute(&self) -> Result<Self::Output, Self::Error>;
}
/// Variant of [`Job`] without associated types, so it can be used as
/// `dyn ErasedJob` (the tests in this file box it as `Box<dyn ErasedJob>`).
#[async_trait::async_trait]
pub trait ErasedJob: Send + Sync + 'static {
/// Identity of this job.
fn id(&self) -> JobId;
/// Kind identifier of this job.
fn kind(&self) -> JobKindId;
/// Runs the job. The typed output is not returned and the error is boxed.
async fn execute_erased(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
/// Blanket [`ErasedJob`] implementation for every [`Job`].
///
/// `id` and `kind` forward to the typed trait. The calls are fully qualified
/// because both traits declare methods with those names.
#[async_trait::async_trait]
impl<T: Job> ErasedJob for T {
    /// Forwards to [`Job::id`].
    fn id(&self) -> JobId {
        <T as Job>::id(self)
    }

    /// Forwards to [`Job::kind`].
    fn kind(&self) -> JobKindId {
        <T as Job>::kind(self)
    }

    /// Runs the typed job and discards its output.
    ///
    /// # Errors
    ///
    /// Returns the job's own error, boxed as a trait object.
    async fn execute_erased(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // `?` boxes the error through the std `From<E>` impl for
        // `Box<dyn Error + Send + Sync>`; the successful output is dropped here.
        <T as Job>::execute(self).await?;
        Ok(())
    }
}
/// Serializable summary of one tick.
///
/// NOTE(review): nothing in this file constructs a `TickReceipt`; the field
/// notes below are read off the names and types — confirm against the producer.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TickReceipt {
/// UTC timestamp of the tick.
pub tick_at: chrono::DateTime<chrono::Utc>,
/// Count per phase label, ordered by label (presumably the kebab-case
/// labels `Snapshot::phase_counts` produces — confirm).
pub phase_counts: std::collections::BTreeMap<String, u32>,
/// Transitions recorded during this tick.
pub transitions_this_tick: Vec<TransitionEvent>,
/// Drift entries reported as unhealed.
pub unhealed: Vec<UnhealedDrift>,
}
/// Serializable record of a single phase change of one job.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TransitionEvent {
/// UTC timestamp of the transition.
pub at: chrono::DateTime<chrono::Utc>,
/// Job that changed phase.
pub job_id: JobId,
/// Phase before the transition.
pub from: JobPhase,
/// Phase after the transition.
pub to: JobPhase,
/// Why the transition happened.
pub reason: TransitionReason,
/// NOTE(review): presumably the name of the tool that drove the
/// transition — not established in this file; confirm.
pub tool: String,
}
/// Serializable entry of [`TickReceipt::unhealed`]: a job, its phase, and an age.
///
/// NOTE(review): what qualifies as "unhealed drift" is decided outside this file.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UnhealedDrift {
/// Job the entry refers to.
pub job_id: JobId,
/// Phase reported for the job.
pub phase: JobPhase,
/// Age in seconds (presumably time spent in `phase` — confirm).
pub age_seconds: u64,
}
/// Reason recorded on a [`TransitionEvent`].
///
/// The variants parallel the [`Signal`] variants, with free-form text attached
/// to `ExecutionFailed` and `OperatorAction`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TransitionReason {
/// Gates were evaluated.
GateEvaluation,
/// Budget was allocated.
BudgetAllocated,
/// Execution succeeded.
ExecutionSucceeded,
/// Execution failed; carries a string (presumably the error text — confirm).
ExecutionFailed(String),
/// A retry was scheduled.
RetryScheduled,
/// The retry backoff elapsed.
BackoffElapsed,
/// The job timed out.
TimedOut,
/// The job was cancelled.
Cancelled,
/// An operator acted; carries a string (presumably a description — confirm).
OperatorAction(String),
}
/// View of the phase of each known job, keyed by job id.
#[derive(Debug, Clone)]
pub struct Snapshot {
/// Phase per job. A `HashMap`, so iteration order is unspecified.
pub phases: std::collections::HashMap<JobId, JobPhase>,
}
impl Snapshot {
    /// Returns a clone of every `(id, phase)` entry whose phase is `Failed`,
    /// `Retrying`, or `Deadlettered`.
    ///
    /// The result follows the map's iteration order, which is unspecified.
    #[must_use]
    pub fn failure_set(&self) -> Vec<(JobId, JobPhase)> {
        let mut failing = Vec::new();
        for (job_id, phase) in &self.phases {
            let is_failure = matches!(
                phase,
                JobPhase::Failed { .. } | JobPhase::Retrying { .. } | JobPhase::Deadlettered
            );
            if is_failure {
                failing.push((job_id.clone(), phase.clone()));
            }
        }
        failing
    }

    /// Counts jobs per phase, keyed by a kebab-case phase label.
    ///
    /// Only phases present in the snapshot get an entry; absent phases are
    /// missing from the map rather than mapped to zero.
    #[must_use]
    pub fn phase_counts(&self) -> std::collections::BTreeMap<&'static str, u32> {
        self.phases.values().fold(
            std::collections::BTreeMap::new(),
            |mut tally, phase| {
                let label = match phase {
                    JobPhase::Pending => "pending",
                    JobPhase::Gated => "gated",
                    JobPhase::Ready => "ready",
                    JobPhase::Running => "running",
                    JobPhase::Succeeded => "succeeded",
                    JobPhase::Failed { .. } => "failed",
                    JobPhase::Retrying { .. } => "retrying",
                    JobPhase::Skipped(_) => "skipped",
                    JobPhase::Deadlettered => "deadlettered",
                    JobPhase::WaitingForOperator => "waiting-for-operator",
                };
                *tally.entry(label).or_default() += 1;
                tally
            },
        )
    }
}
/// Table-style tests for [`advance`]: each legal edge once, plus a sample of
/// illegal (phase, signal) pairs.
#[cfg(test)]
mod fsm_tests {
    use super::*;

    /// Gate evaluation where every gate passed.
    fn pass() -> Signal {
        Signal::EvaluateGates(GateAggregate::AllPassed)
    }

    /// Gate evaluation where some gates are still waiting.
    fn wait() -> Signal {
        Signal::EvaluateGates(GateAggregate::SomeWaiting)
    }

    /// Gate evaluation that skips the job with `GateRejected`.
    fn skip() -> Signal {
        Signal::EvaluateGates(GateAggregate::Skipped(SkipReason::GateRejected))
    }

    #[test]
    fn pending_with_all_pass_advances_to_ready() {
        assert_eq!(advance(JobPhase::Pending, pass()).unwrap(), JobPhase::Ready);
    }

    #[test]
    fn pending_with_some_wait_advances_to_gated() {
        assert_eq!(advance(JobPhase::Pending, wait()).unwrap(), JobPhase::Gated);
    }

    #[test]
    fn pending_with_skip_advances_to_skipped() {
        match advance(JobPhase::Pending, skip()).unwrap() {
            JobPhase::Skipped(SkipReason::GateRejected) => {}
            other => panic!("expected Skipped(GateRejected), got {other:?}"),
        }
    }

    #[test]
    fn gated_to_ready_on_all_pass() {
        assert_eq!(advance(JobPhase::Gated, pass()).unwrap(), JobPhase::Ready);
    }

    #[test]
    fn gated_stays_gated_on_some_wait() {
        assert_eq!(advance(JobPhase::Gated, wait()).unwrap(), JobPhase::Gated);
    }

    #[test]
    fn gated_to_skipped_on_skip() {
        // A bare `matches!(..)` only yields a bool; without an assertion the
        // test could never fail on a wrong phase. Pin the exact result.
        assert_eq!(
            advance(JobPhase::Gated, skip()).unwrap(),
            JobPhase::Skipped(SkipReason::GateRejected)
        );
    }

    #[test]
    fn ready_to_running_on_allocate_budget() {
        assert_eq!(
            advance(JobPhase::Ready, Signal::AllocateBudget).unwrap(),
            JobPhase::Running
        );
    }

    #[test]
    fn running_to_succeeded_on_ok() {
        assert_eq!(
            advance(JobPhase::Running, Signal::ExecutionSucceeded).unwrap(),
            JobPhase::Succeeded
        );
    }

    #[test]
    fn running_to_failed_on_err() {
        assert_eq!(
            advance(JobPhase::Running, Signal::ExecutionFailed).unwrap(),
            JobPhase::Failed { attempts: 1 }
        );
    }

    #[test]
    fn running_to_failed_on_cancel() {
        assert_eq!(
            advance(JobPhase::Running, Signal::Cancel).unwrap(),
            JobPhase::Failed { attempts: 1 }
        );
    }

    #[test]
    fn running_to_failed_on_timeout() {
        assert_eq!(
            advance(JobPhase::Running, Signal::Timeout).unwrap(),
            JobPhase::Failed { attempts: 1 }
        );
    }

    #[test]
    fn failed_to_retrying_when_retry_decided() {
        assert_eq!(
            advance(
                JobPhase::Failed { attempts: 1 },
                Signal::RetryDecide(RetryOutcome::Retry { until_ms: 12345 })
            )
            .unwrap(),
            JobPhase::Retrying { until_ms: 12345 }
        );
    }

    #[test]
    fn failed_to_deadlettered_when_retries_exhausted() {
        assert_eq!(
            advance(
                JobPhase::Failed { attempts: 3 },
                Signal::RetryDecide(RetryOutcome::Deadletter)
            )
            .unwrap(),
            JobPhase::Deadlettered
        );
    }

    #[test]
    fn retrying_to_pending_after_backoff() {
        assert_eq!(
            advance(JobPhase::Retrying { until_ms: 100 }, Signal::BackoffElapsed).unwrap(),
            JobPhase::Pending
        );
    }

    #[test]
    fn waiting_for_operator_to_ready_via_operator() {
        assert_eq!(
            advance(
                JobPhase::WaitingForOperator,
                Signal::OperatorTransition(JobPhase::Ready)
            )
            .unwrap(),
            JobPhase::Ready
        );
    }

    #[test]
    fn waiting_for_operator_to_skipped_via_operator() {
        let result = advance(
            JobPhase::WaitingForOperator,
            Signal::OperatorTransition(JobPhase::Skipped(SkipReason::OperatorDecision)),
        )
        .unwrap();
        assert!(matches!(
            result,
            JobPhase::Skipped(SkipReason::OperatorDecision)
        ));
    }

    #[test]
    fn deadlettered_to_pending_via_operator() {
        assert_eq!(
            advance(
                JobPhase::Deadlettered,
                Signal::OperatorTransition(JobPhase::Pending)
            )
            .unwrap(),
            JobPhase::Pending
        );
    }

    #[test]
    fn pending_with_allocate_budget_is_illegal() {
        let err = advance(JobPhase::Pending, Signal::AllocateBudget).unwrap_err();
        assert_eq!(err.from, JobPhase::Pending);
    }

    #[test]
    fn succeeded_with_anything_is_illegal_except_no_outbound() {
        let err = advance(JobPhase::Succeeded, Signal::AllocateBudget).unwrap_err();
        assert_eq!(err.from, JobPhase::Succeeded);
    }

    #[test]
    fn deadlettered_with_random_operator_transition_is_illegal() {
        let err = advance(
            JobPhase::Deadlettered,
            Signal::OperatorTransition(JobPhase::Ready),
        )
        .unwrap_err();
        assert!(matches!(err.from, JobPhase::Deadlettered));
    }

    #[test]
    fn running_with_evaluate_gates_is_illegal() {
        let err = advance(JobPhase::Running, pass()).unwrap_err();
        assert_eq!(err.from, JobPhase::Running);
    }

    #[test]
    fn waiting_for_operator_with_evaluate_gates_is_illegal() {
        let err = advance(JobPhase::WaitingForOperator, pass()).unwrap_err();
        assert_eq!(err.from, JobPhase::WaitingForOperator);
    }

    #[test]
    fn signal_is_operator_driven_classifier() {
        assert!(Signal::OperatorTransition(JobPhase::Pending).is_operator_driven());
        assert!(!Signal::AllocateBudget.is_operator_driven());
        assert!(!pass().is_operator_driven());
    }
}
/// Tests for the job traits: a hand-written [`Job`], the [`ErasedJob`] blanket
/// implementation, and the [`RecordingJob`] → [`Job`] blanket implementation.
#[cfg(test)]
mod job_tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Kind string shared by both `NoopJob` accessors.
    const NOOP_KIND: &str = "noop";

    /// Job that does nothing and always succeeds.
    struct NoopJob;

    #[derive(thiserror::Error, Debug)]
    #[error("noop")]
    struct NoopError;

    #[async_trait::async_trait]
    impl Job for NoopJob {
        type Output = ();
        type Error = NoopError;

        fn id(&self) -> JobId {
            JobId {
                scope: JobScope::Global,
                kind: JobKindId::new(NOOP_KIND),
                subject: JobSubject::None,
            }
        }

        fn kind(&self) -> JobKindId {
            JobKindId::new(NOOP_KIND)
        }

        async fn execute(&self) -> Result<(), NoopError> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn job_trait_compiles_and_executes() {
        let job = NoopJob;
        // Fully qualified: `ErasedJob` also provides `id` for `NoopJob`.
        let job_id = <NoopJob as Job>::id(&job);
        assert_eq!(job_id.kind.0, "noop");
        assert!(job.execute().await.is_ok());
    }

    #[tokio::test]
    async fn erased_job_blanket_impl_gives_trait_object() {
        let erased: Box<dyn ErasedJob> = Box::new(NoopJob);
        assert_eq!(erased.id().kind.0, "noop");
        assert!(erased.execute_erased().await.is_ok());
    }

    /// Sink that stores every `(job id, output)` pair it is handed.
    #[derive(Default)]
    struct CaptureSink<O: Clone + Send + Sync + 'static> {
        records: Mutex<Vec<(JobId, O)>>,
    }

    #[async_trait::async_trait]
    impl<O: Clone + Send + Sync + 'static> OutputSink<O> for CaptureSink<O> {
        async fn record(&self, job_id: &JobId, output: &O) {
            let mut captured = self.records.lock().expect("CaptureSink mutex poisoned");
            captured.push((job_id.clone(), output.clone()));
        }
    }

    /// Recording job whose body returns `answer`.
    struct RecJob {
        scope: JobScope,
        subject: JobSubject,
        sink: Option<Arc<dyn OutputSink<u32>>>,
        answer: u32,
    }

    #[async_trait::async_trait]
    impl RecordingJob for RecJob {
        type Output = u32;
        type Error = NoopError;
        const KIND: &'static str = "test-recording";

        fn scope(&self) -> JobScope {
            self.scope.clone()
        }

        fn subject(&self) -> JobSubject {
            self.subject.clone()
        }

        fn output_sink(&self) -> Option<&Arc<dyn OutputSink<Self::Output>>> {
            self.sink.as_ref()
        }

        async fn execute_body(&self) -> Result<u32, NoopError> {
            Ok(self.answer)
        }
    }

    #[tokio::test]
    async fn recording_job_blanket_provides_job_id_and_kind() {
        let job = RecJob {
            scope: JobScope::Workspace("ws".into()),
            subject: JobSubject::Repo("r".into()),
            sink: None,
            answer: 1,
        };

        let id = <RecJob as Job>::id(&job);
        assert_eq!(id.kind.0, "test-recording");
        if let JobScope::Workspace(workspace) = id.scope {
            assert_eq!(workspace, "ws");
        } else {
            panic!("wrong scope");
        }
        if let JobSubject::Repo(repo) = id.subject {
            assert_eq!(repo, "r");
        } else {
            panic!("wrong subject");
        }

        let kind = <RecJob as Job>::kind(&job);
        assert_eq!(kind.0, "test-recording");
    }

    #[tokio::test]
    async fn recording_job_blanket_execute_records_to_sink_on_success() {
        let capture: Arc<CaptureSink<u32>> = Arc::new(CaptureSink::default());
        // Method-call `clone` so the unsizing coercion applies to the result.
        let erased_sink: Arc<dyn OutputSink<u32>> = capture.clone();
        let job = RecJob {
            scope: JobScope::Global,
            subject: JobSubject::None,
            sink: Some(erased_sink),
            answer: 42,
        };

        let returned = <RecJob as Job>::execute(&job).await.unwrap();
        assert_eq!(returned, 42);

        let captured = capture.records.lock().unwrap();
        assert_eq!(captured.len(), 1, "sink should have captured one record");
        assert_eq!(captured[0].1, 42);
    }

    #[tokio::test]
    async fn recording_job_without_sink_skips_recording() {
        let job = RecJob {
            scope: JobScope::Global,
            subject: JobSubject::None,
            sink: None,
            answer: 7,
        };

        let returned = <RecJob as Job>::execute(&job).await.unwrap();
        assert_eq!(returned, 7);
    }
}
/// Tests for [`Snapshot::failure_set`] and [`Snapshot::phase_counts`].
#[cfg(test)]
mod snapshot_tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a global-scope id whose subject is `Pinned(name)`.
    fn id(name: &str) -> JobId {
        JobId {
            scope: JobScope::Global,
            kind: JobKindId::new("k"),
            subject: JobSubject::Pinned(name.into()),
        }
    }

    /// Builds a snapshot from `(name, phase)` pairs.
    fn snapshot_with(entries: Vec<(&str, JobPhase)>) -> Snapshot {
        let phases = entries
            .into_iter()
            .map(|(name, phase)| (id(name), phase))
            .collect::<HashMap<JobId, JobPhase>>();
        Snapshot { phases }
    }

    #[test]
    fn failure_set_includes_failed_retrying_deadlettered() {
        let snapshot = snapshot_with(vec![
            ("ok", JobPhase::Succeeded),
            ("dead", JobPhase::Deadlettered),
            ("flap", JobPhase::Failed { attempts: 2 }),
            ("waiting", JobPhase::WaitingForOperator),
            ("retry", JobPhase::Retrying { until_ms: 0 }),
            ("ready", JobPhase::Ready),
        ]);

        let mut names: std::collections::HashSet<String> = std::collections::HashSet::new();
        for (job_id, _) in snapshot.failure_set() {
            if let JobSubject::Pinned(name) = job_id.subject {
                names.insert(name);
            }
        }

        assert_eq!(names.len(), 3);
        for expected in ["dead", "flap", "retry"] {
            assert!(names.contains(expected));
        }
        for unexpected in ["waiting", "ok", "ready"] {
            assert!(!names.contains(unexpected));
        }
    }

    #[test]
    fn phase_counts_summarizes_every_phase() {
        let snapshot = snapshot_with(vec![
            ("a", JobPhase::Pending),
            ("b", JobPhase::Pending),
            ("c", JobPhase::Succeeded),
            ("d", JobPhase::Deadlettered),
        ]);

        let counts = snapshot.phase_counts();
        assert_eq!(counts.get("pending").copied(), Some(2));
        assert_eq!(counts.get("succeeded").copied(), Some(1));
        assert_eq!(counts.get("deadlettered").copied(), Some(1));
        assert!(!counts.contains_key("ready"));
    }
}