1#![forbid(unsafe_code)]
10
11use std::path::PathBuf;
12
13use serde::{Deserialize, Serialize};
14
15pub mod failure;
16pub use failure::{classify, signature, Failure, FailureKind};
17
18pub mod sink;
19pub use sink::{AuditFileSink, InMemorySink, MultiSink, NullSink, Sink};
20
21pub mod chain;
22pub use chain::Chain;
23
24pub mod classify;
25pub use classify::{ChainedClassifier, Classifier, FailureClassifier, FnClassifier};
26
27pub mod watch;
28pub use watch::{EscalationRouting, ScheduleWindow, TimeoutWatcher, WatchAction, WatchRule};
29
30pub mod policy;
35pub use policy::CascadePolicy;
36pub mod decision;
37pub use decision::Decision;
38
39pub mod testing;
40
41#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
43pub struct JobId {
44 pub scope: JobScope,
45 pub kind: JobKindId,
46 pub subject: JobSubject,
47}
48
49#[derive(
50 Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
51 gen_platform::Discriminant,
52 gen_platform::IsVariant,
53)]
54#[discriminant(method = "kind", case = "kebab")]
55pub enum JobScope {
56 Global,
57 Workspace(String),
58 Repo { workspace: String, repo: String },
59}
60
61#[derive(
62 Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
63 gen_platform::Discriminant,
64 gen_platform::IsVariant,
65)]
66#[discriminant(method = "kind", case = "kebab")]
67pub enum JobSubject {
68 None,
69 Repo(String),
70 Org(String),
71 Path(PathBuf),
72 Pinned(String),
73}
74
75#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
80pub struct JobKindId(pub String);
81
82impl JobKindId {
83 #[must_use]
84 pub fn new(s: impl Into<String>) -> Self {
85 Self(s.into())
86 }
87}
88
89#[derive(
96 Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
97 gen_platform::Discriminant,
98 gen_platform::IsVariant,
99)]
100#[discriminant(method = "kind", case = "kebab")]
101pub enum JobPhase {
102 Pending,
103 Gated,
104 Ready,
105 Running,
106 Succeeded,
107 Failed { attempts: u32 },
108 Retrying { until_ms: i64 },
109 Skipped(SkipReason),
110 Deadlettered,
111 WaitingForOperator,
112}
113
114#[derive(
115 Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
116 gen_platform::Discriminant,
117 gen_platform::IsVariant,
118)]
119#[discriminant(method = "kind", case = "kebab")]
120pub enum SkipReason {
121 GateRejected,
122 BlockedByDeadletteredAncestor,
123 OperatorDecision,
124 Other(String),
125}
126
127#[derive(
133 Debug, Clone, PartialEq, Eq,
134 gen_platform::Discriminant,
135 gen_platform::IsVariant,
136)]
137#[discriminant(method = "kind", case = "kebab")]
138pub enum Signal {
139 EvaluateGates(GateAggregate),
144 AllocateBudget,
146 ExecutionSucceeded,
148 ExecutionFailed,
150 RetryDecide(RetryOutcome),
153 Cancel,
156 Timeout,
159 BackoffElapsed,
161 OperatorTransition(JobPhase),
165}
166
167#[derive(
175 Debug, Clone, PartialEq, Eq,
176 gen_platform::Discriminant,
177 gen_platform::IsVariant,
178)]
179#[discriminant(method = "kind", case = "kebab")]
180pub enum GateAggregate {
181 AllPassed,
183 SomeWaiting,
185 Skipped(SkipReason),
188}
189
190#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, gen_platform::TypedDispatcher)]
195pub enum RetryOutcome {
196 Retry { until_ms: i64 },
198 Deadletter,
200}
201
202gen_platform::register_dispatcher!("shigoto.retry-outcome", RetryOutcome);
207
208#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
213#[error("illegal transition: {from:?} cannot consume {signal:?}")]
214pub struct IllegalTransition {
215 pub from: JobPhase,
216 pub signal: Signal,
217}
218
219impl Signal {
220 #[must_use]
223 pub fn is_operator_driven(&self) -> bool {
224 matches!(self, Self::OperatorTransition(_))
225 }
226}
227
228pub fn advance(from: JobPhase, signal: Signal) -> Result<JobPhase, IllegalTransition> {
235 use Signal::*;
236 let new = match (&from, &signal) {
237 (JobPhase::Pending, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
239 (JobPhase::Pending, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
240 (JobPhase::Pending, EvaluateGates(GateAggregate::Skipped(r))) => {
241 JobPhase::Skipped(r.clone())
242 }
243
244 (JobPhase::Gated, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
246 (JobPhase::Gated, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
247 (JobPhase::Gated, EvaluateGates(GateAggregate::Skipped(r))) => {
248 JobPhase::Skipped(r.clone())
249 }
250
251 (JobPhase::Ready, AllocateBudget) => JobPhase::Running,
253 (JobPhase::Ready, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
257 (JobPhase::Ready, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
258 (JobPhase::Ready, EvaluateGates(GateAggregate::Skipped(r))) => {
259 JobPhase::Skipped(r.clone())
260 }
261
262 (JobPhase::Running, ExecutionSucceeded) => JobPhase::Succeeded,
264 (JobPhase::Running, ExecutionFailed) => JobPhase::Failed { attempts: 1 },
265 (JobPhase::Running, Cancel) => JobPhase::Failed { attempts: 1 },
266 (JobPhase::Running, Timeout) => JobPhase::Failed { attempts: 1 },
267
268 (JobPhase::Failed { attempts: _ }, RetryDecide(RetryOutcome::Retry { until_ms })) => {
270 JobPhase::Retrying {
271 until_ms: *until_ms,
272 }
273 }
274 (JobPhase::Failed { .. }, RetryDecide(RetryOutcome::Deadletter)) => JobPhase::Deadlettered,
275
276 (JobPhase::Retrying { .. }, BackoffElapsed) => JobPhase::Pending,
278 (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
281 (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
282 (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::Skipped(r))) => {
283 JobPhase::Skipped(r.clone())
284 }
285
286 (JobPhase::WaitingForOperator, OperatorTransition(JobPhase::Ready)) => JobPhase::Ready,
289 (JobPhase::WaitingForOperator, OperatorTransition(JobPhase::Skipped(r))) => {
290 JobPhase::Skipped(r.clone())
291 }
292 (JobPhase::Deadlettered, OperatorTransition(JobPhase::Pending)) => JobPhase::Pending,
294
295 _ => return Err(IllegalTransition { from, signal }),
297 };
298 Ok(new)
299}
300
301pub trait JobInput: Send + Sync + 'static {}
304pub trait JobOutput: Send + Sync + 'static {}
305pub trait JobError: std::error::Error + Send + Sync + 'static {}
306
307#[async_trait::async_trait]
335pub trait OutputSink<O>: Send + Sync + 'static
336where
337 O: Send + Sync + 'static,
338{
339 async fn record(&self, job_id: &JobId, output: &O);
349}
350
351#[async_trait::async_trait]
373pub trait RecordingJob: Send + Sync + 'static {
374 type Output: Send + Sync + Clone + 'static;
379
380 type Error: std::error::Error + Send + Sync + 'static;
382
383 const KIND: &'static str;
387
388 fn scope(&self) -> JobScope;
391
392 fn subject(&self) -> JobSubject;
395
396 fn output_sink(&self) -> Option<&std::sync::Arc<dyn OutputSink<Self::Output>>>;
399
400 async fn execute_body(&self) -> Result<Self::Output, Self::Error>;
405}
406
407#[async_trait::async_trait]
408impl<T: RecordingJob> Job for T {
409 type Output = T::Output;
410 type Error = T::Error;
411
412 fn id(&self) -> JobId {
413 JobId {
414 scope: self.scope(),
415 kind: JobKindId::new(T::KIND),
416 subject: self.subject(),
417 }
418 }
419
420 fn kind(&self) -> JobKindId {
421 JobKindId::new(T::KIND)
422 }
423
424 async fn execute(&self) -> Result<T::Output, T::Error> {
425 let outcome = self.execute_body().await?;
426 if let Some(sink) = self.output_sink() {
427 let id = JobId {
430 scope: self.scope(),
431 kind: JobKindId::new(T::KIND),
432 subject: self.subject(),
433 };
434 sink.record(&id, &outcome).await;
435 }
436 Ok(outcome)
437 }
438}
439
440#[async_trait::async_trait]
459pub trait Job: Send + Sync + 'static {
460 type Output: Send + 'static;
461 type Error: std::error::Error + Send + Sync + 'static;
462
463 fn id(&self) -> JobId;
465
466 fn kind(&self) -> JobKindId;
468
469 async fn execute(&self) -> Result<Self::Output, Self::Error>;
473}
474
475#[async_trait::async_trait]
485pub trait ErasedJob: Send + Sync + 'static {
486 fn id(&self) -> JobId;
487 fn kind(&self) -> JobKindId;
488 async fn execute_erased(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
489}
490
491#[async_trait::async_trait]
492impl<T: Job> ErasedJob for T {
493 fn id(&self) -> JobId {
494 <T as Job>::id(self)
495 }
496
497 fn kind(&self) -> JobKindId {
498 <T as Job>::kind(self)
499 }
500
501 async fn execute_erased(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
502 match <T as Job>::execute(self).await {
503 Ok(_) => Ok(()),
504 Err(e) => Err(Box::new(e)),
505 }
506 }
507}
508
509#[derive(Serialize, Deserialize, Debug, Clone)]
511pub struct TickReceipt {
512 pub tick_at: chrono::DateTime<chrono::Utc>,
513 pub phase_counts: std::collections::BTreeMap<String, u32>,
514 pub transitions_this_tick: Vec<TransitionEvent>,
515 pub unhealed: Vec<UnhealedDrift>,
516}
517
518#[derive(Serialize, Deserialize, Debug, Clone)]
519pub struct TransitionEvent {
520 pub at: chrono::DateTime<chrono::Utc>,
521 pub job_id: JobId,
522 pub from: JobPhase,
523 pub to: JobPhase,
524 pub reason: TransitionReason,
525 pub tool: String,
528}
529
530#[derive(Serialize, Deserialize, Debug, Clone)]
531pub struct UnhealedDrift {
532 pub job_id: JobId,
533 pub phase: JobPhase,
534 pub age_seconds: u64,
535}
536
537#[derive(Serialize, Deserialize, Debug, Clone)]
538pub enum TransitionReason {
539 GateEvaluation,
540 BudgetAllocated,
541 ExecutionSucceeded,
542 ExecutionFailed(String),
543 RetryScheduled,
544 BackoffElapsed,
545 TimedOut,
546 Cancelled,
547 OperatorAction(String),
548}
549
550#[derive(Debug, Clone)]
552pub struct Snapshot {
553 pub phases: std::collections::HashMap<JobId, JobPhase>,
554}
555
556impl Snapshot {
557 #[must_use]
567 pub fn failure_set(&self) -> Vec<(JobId, JobPhase)> {
568 self.phases
569 .iter()
570 .filter(|(_, p)| {
571 matches!(
572 p,
573 JobPhase::Failed { .. }
574 | JobPhase::Retrying { .. }
575 | JobPhase::Deadlettered
576 )
577 })
578 .map(|(id, p)| (id.clone(), p.clone()))
579 .collect()
580 }
581
582 #[must_use]
585 pub fn phase_counts(&self) -> std::collections::BTreeMap<&'static str, u32> {
586 let mut counts: std::collections::BTreeMap<&'static str, u32> =
587 std::collections::BTreeMap::new();
588 for phase in self.phases.values() {
589 let key = match phase {
590 JobPhase::Pending => "pending",
591 JobPhase::Gated => "gated",
592 JobPhase::Ready => "ready",
593 JobPhase::Running => "running",
594 JobPhase::Succeeded => "succeeded",
595 JobPhase::Failed { .. } => "failed",
596 JobPhase::Retrying { .. } => "retrying",
597 JobPhase::Skipped(_) => "skipped",
598 JobPhase::Deadlettered => "deadlettered",
599 JobPhase::WaitingForOperator => "waiting-for-operator",
600 };
601 *counts.entry(key).or_insert(0) += 1;
602 }
603 counts
604 }
605}
606
607#[cfg(test)]
608mod fsm_tests {
609 use super::*;
610
611 fn pass() -> Signal {
612 Signal::EvaluateGates(GateAggregate::AllPassed)
613 }
614 fn wait() -> Signal {
615 Signal::EvaluateGates(GateAggregate::SomeWaiting)
616 }
617 fn skip() -> Signal {
618 Signal::EvaluateGates(GateAggregate::Skipped(SkipReason::GateRejected))
619 }
620
621 #[test]
624 fn pending_with_all_pass_advances_to_ready() {
625 assert_eq!(advance(JobPhase::Pending, pass()).unwrap(), JobPhase::Ready);
626 }
627
628 #[test]
629 fn pending_with_some_wait_advances_to_gated() {
630 assert_eq!(advance(JobPhase::Pending, wait()).unwrap(), JobPhase::Gated);
631 }
632
633 #[test]
634 fn pending_with_skip_advances_to_skipped() {
635 match advance(JobPhase::Pending, skip()).unwrap() {
636 JobPhase::Skipped(SkipReason::GateRejected) => {}
637 other => panic!("expected Skipped(GateRejected), got {other:?}"),
638 }
639 }
640
641 #[test]
644 fn gated_to_ready_on_all_pass() {
645 assert_eq!(advance(JobPhase::Gated, pass()).unwrap(), JobPhase::Ready);
646 }
647
648 #[test]
649 fn gated_stays_gated_on_some_wait() {
650 assert_eq!(advance(JobPhase::Gated, wait()).unwrap(), JobPhase::Gated);
651 }
652
653 #[test]
654 fn gated_to_skipped_on_skip() {
655 matches!(
656 advance(JobPhase::Gated, skip()).unwrap(),
657 JobPhase::Skipped(_)
658 );
659 }
660
661 #[test]
664 fn ready_to_running_on_allocate_budget() {
665 assert_eq!(
666 advance(JobPhase::Ready, Signal::AllocateBudget).unwrap(),
667 JobPhase::Running
668 );
669 }
670
671 #[test]
674 fn running_to_succeeded_on_ok() {
675 assert_eq!(
676 advance(JobPhase::Running, Signal::ExecutionSucceeded).unwrap(),
677 JobPhase::Succeeded
678 );
679 }
680
681 #[test]
682 fn running_to_failed_on_err() {
683 assert_eq!(
684 advance(JobPhase::Running, Signal::ExecutionFailed).unwrap(),
685 JobPhase::Failed { attempts: 1 }
686 );
687 }
688
689 #[test]
690 fn running_to_failed_on_cancel() {
691 assert_eq!(
692 advance(JobPhase::Running, Signal::Cancel).unwrap(),
693 JobPhase::Failed { attempts: 1 }
694 );
695 }
696
697 #[test]
698 fn running_to_failed_on_timeout() {
699 assert_eq!(
700 advance(JobPhase::Running, Signal::Timeout).unwrap(),
701 JobPhase::Failed { attempts: 1 }
702 );
703 }
704
705 #[test]
708 fn failed_to_retrying_when_retry_decided() {
709 assert_eq!(
710 advance(
711 JobPhase::Failed { attempts: 1 },
712 Signal::RetryDecide(RetryOutcome::Retry { until_ms: 12345 })
713 )
714 .unwrap(),
715 JobPhase::Retrying { until_ms: 12345 }
716 );
717 }
718
719 #[test]
720 fn failed_to_deadlettered_when_retries_exhausted() {
721 assert_eq!(
722 advance(
723 JobPhase::Failed { attempts: 3 },
724 Signal::RetryDecide(RetryOutcome::Deadletter)
725 )
726 .unwrap(),
727 JobPhase::Deadlettered
728 );
729 }
730
731 #[test]
734 fn retrying_to_pending_after_backoff() {
735 assert_eq!(
736 advance(JobPhase::Retrying { until_ms: 100 }, Signal::BackoffElapsed).unwrap(),
737 JobPhase::Pending
738 );
739 }
740
741 #[test]
744 fn waiting_for_operator_to_ready_via_operator() {
745 assert_eq!(
746 advance(
747 JobPhase::WaitingForOperator,
748 Signal::OperatorTransition(JobPhase::Ready)
749 )
750 .unwrap(),
751 JobPhase::Ready
752 );
753 }
754
755 #[test]
756 fn waiting_for_operator_to_skipped_via_operator() {
757 let result = advance(
758 JobPhase::WaitingForOperator,
759 Signal::OperatorTransition(JobPhase::Skipped(SkipReason::OperatorDecision)),
760 )
761 .unwrap();
762 assert!(matches!(
763 result,
764 JobPhase::Skipped(SkipReason::OperatorDecision)
765 ));
766 }
767
768 #[test]
769 fn deadlettered_to_pending_via_operator() {
770 assert_eq!(
771 advance(
772 JobPhase::Deadlettered,
773 Signal::OperatorTransition(JobPhase::Pending)
774 )
775 .unwrap(),
776 JobPhase::Pending
777 );
778 }
779
780 #[test]
783 fn pending_with_allocate_budget_is_illegal() {
784 let err = advance(JobPhase::Pending, Signal::AllocateBudget).unwrap_err();
785 assert_eq!(err.from, JobPhase::Pending);
786 }
787
788 #[test]
789 fn succeeded_with_anything_is_illegal_except_no_outbound() {
790 let err = advance(JobPhase::Succeeded, Signal::AllocateBudget).unwrap_err();
794 assert_eq!(err.from, JobPhase::Succeeded);
795 }
796
797 #[test]
798 fn deadlettered_with_random_operator_transition_is_illegal() {
799 let err = advance(
801 JobPhase::Deadlettered,
802 Signal::OperatorTransition(JobPhase::Ready),
803 )
804 .unwrap_err();
805 assert!(matches!(err.from, JobPhase::Deadlettered));
806 }
807
808 #[test]
809 fn running_with_evaluate_gates_is_illegal() {
810 let err = advance(JobPhase::Running, pass()).unwrap_err();
813 assert_eq!(err.from, JobPhase::Running);
814 }
815
816 #[test]
817 fn waiting_for_operator_with_evaluate_gates_is_illegal() {
818 let err = advance(JobPhase::WaitingForOperator, pass()).unwrap_err();
820 assert_eq!(err.from, JobPhase::WaitingForOperator);
821 }
822
823 #[test]
824 fn signal_is_operator_driven_classifier() {
825 assert!(Signal::OperatorTransition(JobPhase::Pending).is_operator_driven());
826 assert!(!Signal::AllocateBudget.is_operator_driven());
827 assert!(!pass().is_operator_driven());
828 }
829}
830
831#[cfg(test)]
832mod job_tests {
833 use super::*;
834
835 struct NoopJob;
839
840 #[derive(thiserror::Error, Debug)]
841 #[error("noop")]
842 struct NoopError;
843
844 #[async_trait::async_trait]
845 impl Job for NoopJob {
846 type Output = ();
847 type Error = NoopError;
848
849 fn id(&self) -> JobId {
850 JobId {
851 scope: JobScope::Global,
852 kind: JobKindId::new("noop"),
853 subject: JobSubject::None,
854 }
855 }
856
857 fn kind(&self) -> JobKindId {
858 JobKindId::new("noop")
859 }
860
861 async fn execute(&self) -> Result<(), NoopError> {
862 Ok(())
863 }
864 }
865
866 #[tokio::test]
867 async fn job_trait_compiles_and_executes() {
868 let j = NoopJob;
869 assert_eq!(<NoopJob as Job>::id(&j).kind.0, "noop");
870 assert!(j.execute().await.is_ok());
871 }
872
873 #[tokio::test]
874 async fn erased_job_blanket_impl_gives_trait_object() {
875 let j: Box<dyn ErasedJob> = Box::new(NoopJob);
876 assert_eq!(j.id().kind.0, "noop");
877 assert!(j.execute_erased().await.is_ok());
878 }
879
880 use std::sync::Arc;
883 use std::sync::Mutex;
884
885 #[derive(Default)]
888 struct CaptureSink<O: Clone + Send + Sync + 'static> {
889 records: Mutex<Vec<(JobId, O)>>,
890 }
891
892 #[async_trait::async_trait]
893 impl<O: Clone + Send + Sync + 'static> OutputSink<O> for CaptureSink<O> {
894 async fn record(&self, job_id: &JobId, output: &O) {
895 self.records
896 .lock()
897 .expect("CaptureSink mutex poisoned")
898 .push((job_id.clone(), output.clone()));
899 }
900 }
901
902 struct RecJob {
904 scope: JobScope,
905 subject: JobSubject,
906 sink: Option<Arc<dyn OutputSink<u32>>>,
907 answer: u32,
908 }
909
910 #[async_trait::async_trait]
911 impl RecordingJob for RecJob {
912 type Output = u32;
913 type Error = NoopError;
914 const KIND: &'static str = "test-recording";
915
916 fn scope(&self) -> JobScope {
917 self.scope.clone()
918 }
919 fn subject(&self) -> JobSubject {
920 self.subject.clone()
921 }
922 fn output_sink(&self) -> Option<&Arc<dyn OutputSink<Self::Output>>> {
923 self.sink.as_ref()
924 }
925 async fn execute_body(&self) -> Result<u32, NoopError> {
926 Ok(self.answer)
927 }
928 }
929
930 #[tokio::test]
931 async fn recording_job_blanket_provides_job_id_and_kind() {
932 let job = RecJob {
933 scope: JobScope::Workspace("ws".into()),
934 subject: JobSubject::Repo("r".into()),
935 sink: None,
936 answer: 1,
937 };
938 let id = <RecJob as Job>::id(&job);
939 assert_eq!(id.kind.0, "test-recording");
940 match id.scope {
941 JobScope::Workspace(w) => assert_eq!(w, "ws"),
942 _ => panic!("wrong scope"),
943 }
944 match id.subject {
945 JobSubject::Repo(r) => assert_eq!(r, "r"),
946 _ => panic!("wrong subject"),
947 }
948 let kind = <RecJob as Job>::kind(&job);
949 assert_eq!(kind.0, "test-recording");
950 }
951
952 #[tokio::test]
953 async fn recording_job_blanket_execute_records_to_sink_on_success() {
954 let sink: Arc<CaptureSink<u32>> = Arc::new(CaptureSink::default());
955 let sink_dyn: Arc<dyn OutputSink<u32>> = sink.clone();
956 let job = RecJob {
957 scope: JobScope::Global,
958 subject: JobSubject::None,
959 sink: Some(sink_dyn),
960 answer: 42,
961 };
962 let result = <RecJob as Job>::execute(&job).await.unwrap();
963 assert_eq!(result, 42);
964
965 let recs = sink.records.lock().unwrap();
966 assert_eq!(recs.len(), 1, "sink should have captured one record");
967 assert_eq!(recs[0].1, 42);
968 }
969
970 #[tokio::test]
971 async fn recording_job_without_sink_skips_recording() {
972 let job = RecJob {
973 scope: JobScope::Global,
974 subject: JobSubject::None,
975 sink: None,
976 answer: 7,
977 };
978 let result = <RecJob as Job>::execute(&job).await.unwrap();
981 assert_eq!(result, 7);
982 }
983}
984
985#[cfg(test)]
986mod snapshot_tests {
987 use super::*;
988 use std::collections::HashMap;
989
990 fn id(name: &str) -> JobId {
991 JobId {
992 scope: JobScope::Global,
993 kind: JobKindId::new("k"),
994 subject: JobSubject::Pinned(name.into()),
995 }
996 }
997
998 fn snapshot_with(entries: Vec<(&str, JobPhase)>) -> Snapshot {
999 let mut phases: HashMap<JobId, JobPhase> = HashMap::new();
1000 for (name, phase) in entries {
1001 phases.insert(id(name), phase);
1002 }
1003 Snapshot { phases }
1004 }
1005
1006 #[test]
1007 fn failure_set_includes_failed_retrying_deadlettered() {
1008 let s = snapshot_with(vec![
1009 ("ok", JobPhase::Succeeded),
1010 ("dead", JobPhase::Deadlettered),
1011 ("flap", JobPhase::Failed { attempts: 2 }),
1012 ("waiting", JobPhase::WaitingForOperator),
1013 ("retry", JobPhase::Retrying { until_ms: 0 }),
1014 ("ready", JobPhase::Ready),
1015 ]);
1016 let fs = s.failure_set();
1017 let names: std::collections::HashSet<String> = fs
1018 .iter()
1019 .filter_map(|(id, _)| match &id.subject {
1020 JobSubject::Pinned(s) => Some(s.clone()),
1021 _ => None,
1022 })
1023 .collect();
1024 assert_eq!(names.len(), 3);
1025 assert!(names.contains("dead"));
1026 assert!(names.contains("flap"));
1027 assert!(names.contains("retry"));
1028 assert!(!names.contains("waiting"));
1031 assert!(!names.contains("ok"));
1033 assert!(!names.contains("ready"));
1034 }
1035
1036 #[test]
1037 fn phase_counts_summarizes_every_phase() {
1038 let s = snapshot_with(vec![
1039 ("a", JobPhase::Pending),
1040 ("b", JobPhase::Pending),
1041 ("c", JobPhase::Succeeded),
1042 ("d", JobPhase::Deadlettered),
1043 ]);
1044 let counts = s.phase_counts();
1045 assert_eq!(counts.get("pending"), Some(&2));
1046 assert_eq!(counts.get("succeeded"), Some(&1));
1047 assert_eq!(counts.get("deadlettered"), Some(&1));
1048 assert!(counts.get("ready").is_none());
1050 }
1051}