1#![forbid(unsafe_code)]
10
11use std::path::PathBuf;
12
13use serde::{Deserialize, Serialize};
14
15pub mod failure;
16pub use failure::{Failure, FailureKind, classify, signature};
17
18pub mod sink;
19pub use sink::{AuditFileSink, InMemorySink, MultiSink, NullSink, Sink};
20
21pub mod chain;
22pub use chain::Chain;
23
24pub mod classify;
25pub use classify::{ChainedClassifier, Classifier, FailureClassifier, FnClassifier};
26
27pub mod watch;
28pub use watch::{EscalationRouting, ScheduleWindow, TimeoutWatcher, WatchAction, WatchRule};
29
30pub mod policy;
35pub use policy::CascadePolicy;
36pub mod decision;
37pub use decision::Decision;
38pub mod converge;
43pub use converge::{
44 Action, AppliedChange, ApplyMetrics, Change, ChangeSeverity, FailedChange, NoMetrics, Outcome,
45 Plan, PlanId, Reconciler, ReconcilerError, SharedReconciler, build_outcome, change,
46 change_with_severity,
47};
48
49pub mod testing;
50
51#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
53pub struct JobId {
54 pub scope: JobScope,
55 pub kind: JobKindId,
56 pub subject: JobSubject,
57}
58
59#[derive(
60 Serialize,
61 Deserialize,
62 Debug,
63 Clone,
64 PartialEq,
65 Eq,
66 Hash,
67 gen_platform::Discriminant,
68 gen_platform::IsVariant,
69)]
70#[discriminant(method = "kind", case = "kebab")]
71pub enum JobScope {
72 Global,
73 Workspace(String),
74 Repo { workspace: String, repo: String },
75}
76
77#[derive(
78 Serialize,
79 Deserialize,
80 Debug,
81 Clone,
82 PartialEq,
83 Eq,
84 Hash,
85 gen_platform::Discriminant,
86 gen_platform::IsVariant,
87)]
88#[discriminant(method = "kind", case = "kebab")]
89pub enum JobSubject {
90 None,
91 Repo(String),
92 Org(String),
93 Path(PathBuf),
94 Pinned(String),
95}
96
97#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
102pub struct JobKindId(pub String);
103
104impl JobKindId {
105 #[must_use]
106 pub fn new(s: impl Into<String>) -> Self {
107 Self(s.into())
108 }
109}
110
111#[derive(
118 Serialize,
119 Deserialize,
120 Debug,
121 Clone,
122 PartialEq,
123 Eq,
124 Hash,
125 gen_platform::Discriminant,
126 gen_platform::IsVariant,
127)]
128#[discriminant(method = "kind", case = "kebab")]
129pub enum JobPhase {
130 Pending,
131 Gated,
132 Ready,
133 Running,
134 Succeeded,
135 Failed { attempts: u32 },
136 Retrying { until_ms: i64 },
137 Skipped(SkipReason),
138 Deadlettered,
139 WaitingForOperator,
140}
141
142#[derive(
143 Serialize,
144 Deserialize,
145 Debug,
146 Clone,
147 PartialEq,
148 Eq,
149 Hash,
150 gen_platform::Discriminant,
151 gen_platform::IsVariant,
152)]
153#[discriminant(method = "kind", case = "kebab")]
154pub enum SkipReason {
155 GateRejected,
156 BlockedByDeadletteredAncestor,
157 OperatorDecision,
158 Other(String),
159}
160
161#[derive(Debug, Clone, PartialEq, Eq, gen_platform::Discriminant, gen_platform::IsVariant)]
167#[discriminant(method = "kind", case = "kebab")]
168pub enum Signal {
169 EvaluateGates(GateAggregate),
174 AllocateBudget,
176 ExecutionSucceeded,
178 ExecutionFailed,
180 RetryDecide(RetryOutcome),
183 Cancel,
186 Timeout,
189 BackoffElapsed,
191 OperatorTransition(JobPhase),
195}
196
197#[derive(Debug, Clone, PartialEq, Eq, gen_platform::Discriminant, gen_platform::IsVariant)]
205#[discriminant(method = "kind", case = "kebab")]
206pub enum GateAggregate {
207 AllPassed,
209 SomeWaiting,
211 Skipped(SkipReason),
214}
215
216#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, gen_platform::TypedDispatcher)]
221pub enum RetryOutcome {
222 Retry { until_ms: i64 },
224 Deadletter,
226}
227
228gen_platform::register_dispatcher!("shigoto.retry-outcome", RetryOutcome);
233
234#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
239#[error("illegal transition: {from:?} cannot consume {signal:?}")]
240pub struct IllegalTransition {
241 pub from: JobPhase,
242 pub signal: Signal,
243}
244
245impl Signal {
246 #[must_use]
249 pub fn is_operator_driven(&self) -> bool {
250 matches!(self, Self::OperatorTransition(_))
251 }
252}
253
254pub fn advance(from: JobPhase, signal: Signal) -> Result<JobPhase, IllegalTransition> {
261 use Signal::*;
262 let new = match (&from, &signal) {
263 (JobPhase::Pending, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
265 (JobPhase::Pending, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
266 (JobPhase::Pending, EvaluateGates(GateAggregate::Skipped(r))) => {
267 JobPhase::Skipped(r.clone())
268 }
269
270 (JobPhase::Gated, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
272 (JobPhase::Gated, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
273 (JobPhase::Gated, EvaluateGates(GateAggregate::Skipped(r))) => JobPhase::Skipped(r.clone()),
274
275 (JobPhase::Ready, AllocateBudget) => JobPhase::Running,
277 (JobPhase::Ready, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
281 (JobPhase::Ready, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
282 (JobPhase::Ready, EvaluateGates(GateAggregate::Skipped(r))) => JobPhase::Skipped(r.clone()),
283
284 (JobPhase::Running, ExecutionSucceeded) => JobPhase::Succeeded,
286 (JobPhase::Running, ExecutionFailed) => JobPhase::Failed { attempts: 1 },
287 (JobPhase::Running, Cancel) => JobPhase::Failed { attempts: 1 },
288 (JobPhase::Running, Timeout) => JobPhase::Failed { attempts: 1 },
289
290 (JobPhase::Failed { attempts: _ }, RetryDecide(RetryOutcome::Retry { until_ms })) => {
292 JobPhase::Retrying {
293 until_ms: *until_ms,
294 }
295 }
296 (JobPhase::Failed { .. }, RetryDecide(RetryOutcome::Deadletter)) => JobPhase::Deadlettered,
297
298 (JobPhase::Retrying { .. }, BackoffElapsed) => JobPhase::Pending,
300 (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::AllPassed)) => JobPhase::Ready,
303 (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::SomeWaiting)) => JobPhase::Gated,
304 (JobPhase::Retrying { .. }, EvaluateGates(GateAggregate::Skipped(r))) => {
305 JobPhase::Skipped(r.clone())
306 }
307
308 (JobPhase::WaitingForOperator, OperatorTransition(JobPhase::Ready)) => JobPhase::Ready,
311 (JobPhase::WaitingForOperator, OperatorTransition(JobPhase::Skipped(r))) => {
312 JobPhase::Skipped(r.clone())
313 }
314 (JobPhase::Deadlettered, OperatorTransition(JobPhase::Pending)) => JobPhase::Pending,
316
317 _ => return Err(IllegalTransition { from, signal }),
319 };
320 Ok(new)
321}
322
323pub trait JobInput: Send + Sync + 'static {}
326pub trait JobOutput: Send + Sync + 'static {}
327pub trait JobError: std::error::Error + Send + Sync + 'static {}
328
329#[async_trait::async_trait]
357pub trait OutputSink<O>: Send + Sync + 'static
358where
359 O: Send + Sync + 'static,
360{
361 async fn record(&self, job_id: &JobId, output: &O);
371}
372
373#[async_trait::async_trait]
395pub trait RecordingJob: Send + Sync + 'static {
396 type Output: Send + Sync + Clone + 'static;
401
402 type Error: std::error::Error + Send + Sync + 'static;
404
405 const KIND: &'static str;
409
410 fn scope(&self) -> JobScope;
413
414 fn subject(&self) -> JobSubject;
417
418 fn output_sink(&self) -> Option<&std::sync::Arc<dyn OutputSink<Self::Output>>>;
421
422 async fn execute_body(&self) -> Result<Self::Output, Self::Error>;
427}
428
429#[async_trait::async_trait]
430impl<T: RecordingJob> Job for T {
431 type Output = T::Output;
432 type Error = T::Error;
433
434 fn id(&self) -> JobId {
435 JobId {
436 scope: self.scope(),
437 kind: JobKindId::new(T::KIND),
438 subject: self.subject(),
439 }
440 }
441
442 fn kind(&self) -> JobKindId {
443 JobKindId::new(T::KIND)
444 }
445
446 async fn execute(&self) -> Result<T::Output, T::Error> {
447 let outcome = self.execute_body().await?;
448 if let Some(sink) = self.output_sink() {
449 let id = JobId {
452 scope: self.scope(),
453 kind: JobKindId::new(T::KIND),
454 subject: self.subject(),
455 };
456 sink.record(&id, &outcome).await;
457 }
458 Ok(outcome)
459 }
460}
461
462#[async_trait::async_trait]
481pub trait Job: Send + Sync + 'static {
482 type Output: Send + 'static;
483 type Error: std::error::Error + Send + Sync + 'static;
484
485 fn id(&self) -> JobId;
487
488 fn kind(&self) -> JobKindId;
490
491 async fn execute(&self) -> Result<Self::Output, Self::Error>;
495}
496
497#[async_trait::async_trait]
507pub trait ErasedJob: Send + Sync + 'static {
508 fn id(&self) -> JobId;
509 fn kind(&self) -> JobKindId;
510 async fn execute_erased(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
511}
512
513#[async_trait::async_trait]
514impl<T: Job> ErasedJob for T {
515 fn id(&self) -> JobId {
516 <T as Job>::id(self)
517 }
518
519 fn kind(&self) -> JobKindId {
520 <T as Job>::kind(self)
521 }
522
523 async fn execute_erased(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
524 match <T as Job>::execute(self).await {
525 Ok(_) => Ok(()),
526 Err(e) => Err(Box::new(e)),
527 }
528 }
529}
530
531#[derive(Serialize, Deserialize, Debug, Clone)]
533pub struct TickReceipt {
534 pub tick_at: chrono::DateTime<chrono::Utc>,
535 pub phase_counts: std::collections::BTreeMap<String, u32>,
536 pub transitions_this_tick: Vec<TransitionEvent>,
537 pub unhealed: Vec<UnhealedDrift>,
538}
539
540#[derive(Serialize, Deserialize, Debug, Clone)]
541pub struct TransitionEvent {
542 pub at: chrono::DateTime<chrono::Utc>,
543 pub job_id: JobId,
544 pub from: JobPhase,
545 pub to: JobPhase,
546 pub reason: TransitionReason,
547 pub tool: String,
550}
551
552#[derive(Serialize, Deserialize, Debug, Clone)]
553pub struct UnhealedDrift {
554 pub job_id: JobId,
555 pub phase: JobPhase,
556 pub age_seconds: u64,
557}
558
559#[derive(Serialize, Deserialize, Debug, Clone)]
560pub enum TransitionReason {
561 GateEvaluation,
562 BudgetAllocated,
563 ExecutionSucceeded,
564 ExecutionFailed(String),
565 RetryScheduled,
566 BackoffElapsed,
567 TimedOut,
568 Cancelled,
569 OperatorAction(String),
570}
571
572#[derive(Debug, Clone)]
574pub struct Snapshot {
575 pub phases: std::collections::HashMap<JobId, JobPhase>,
576}
577
578impl Snapshot {
579 #[must_use]
589 pub fn failure_set(&self) -> Vec<(JobId, JobPhase)> {
590 self.phases
591 .iter()
592 .filter(|(_, p)| {
593 matches!(
594 p,
595 JobPhase::Failed { .. } | JobPhase::Retrying { .. } | JobPhase::Deadlettered
596 )
597 })
598 .map(|(id, p)| (id.clone(), p.clone()))
599 .collect()
600 }
601
602 #[must_use]
605 pub fn phase_counts(&self) -> std::collections::BTreeMap<&'static str, u32> {
606 let mut counts: std::collections::BTreeMap<&'static str, u32> =
607 std::collections::BTreeMap::new();
608 for phase in self.phases.values() {
609 let key = match phase {
610 JobPhase::Pending => "pending",
611 JobPhase::Gated => "gated",
612 JobPhase::Ready => "ready",
613 JobPhase::Running => "running",
614 JobPhase::Succeeded => "succeeded",
615 JobPhase::Failed { .. } => "failed",
616 JobPhase::Retrying { .. } => "retrying",
617 JobPhase::Skipped(_) => "skipped",
618 JobPhase::Deadlettered => "deadlettered",
619 JobPhase::WaitingForOperator => "waiting-for-operator",
620 };
621 *counts.entry(key).or_insert(0) += 1;
622 }
623 counts
624 }
625}
626
627#[cfg(test)]
628mod fsm_tests {
629 use super::*;
630
631 fn pass() -> Signal {
632 Signal::EvaluateGates(GateAggregate::AllPassed)
633 }
634 fn wait() -> Signal {
635 Signal::EvaluateGates(GateAggregate::SomeWaiting)
636 }
637 fn skip() -> Signal {
638 Signal::EvaluateGates(GateAggregate::Skipped(SkipReason::GateRejected))
639 }
640
641 #[test]
644 fn pending_with_all_pass_advances_to_ready() {
645 assert_eq!(advance(JobPhase::Pending, pass()).unwrap(), JobPhase::Ready);
646 }
647
648 #[test]
649 fn pending_with_some_wait_advances_to_gated() {
650 assert_eq!(advance(JobPhase::Pending, wait()).unwrap(), JobPhase::Gated);
651 }
652
653 #[test]
654 fn pending_with_skip_advances_to_skipped() {
655 match advance(JobPhase::Pending, skip()).unwrap() {
656 JobPhase::Skipped(SkipReason::GateRejected) => {}
657 other => panic!("expected Skipped(GateRejected), got {other:?}"),
658 }
659 }
660
661 #[test]
664 fn gated_to_ready_on_all_pass() {
665 assert_eq!(advance(JobPhase::Gated, pass()).unwrap(), JobPhase::Ready);
666 }
667
668 #[test]
669 fn gated_stays_gated_on_some_wait() {
670 assert_eq!(advance(JobPhase::Gated, wait()).unwrap(), JobPhase::Gated);
671 }
672
673 #[test]
674 fn gated_to_skipped_on_skip() {
675 matches!(
676 advance(JobPhase::Gated, skip()).unwrap(),
677 JobPhase::Skipped(_)
678 );
679 }
680
681 #[test]
684 fn ready_to_running_on_allocate_budget() {
685 assert_eq!(
686 advance(JobPhase::Ready, Signal::AllocateBudget).unwrap(),
687 JobPhase::Running
688 );
689 }
690
691 #[test]
694 fn running_to_succeeded_on_ok() {
695 assert_eq!(
696 advance(JobPhase::Running, Signal::ExecutionSucceeded).unwrap(),
697 JobPhase::Succeeded
698 );
699 }
700
701 #[test]
702 fn running_to_failed_on_err() {
703 assert_eq!(
704 advance(JobPhase::Running, Signal::ExecutionFailed).unwrap(),
705 JobPhase::Failed { attempts: 1 }
706 );
707 }
708
709 #[test]
710 fn running_to_failed_on_cancel() {
711 assert_eq!(
712 advance(JobPhase::Running, Signal::Cancel).unwrap(),
713 JobPhase::Failed { attempts: 1 }
714 );
715 }
716
717 #[test]
718 fn running_to_failed_on_timeout() {
719 assert_eq!(
720 advance(JobPhase::Running, Signal::Timeout).unwrap(),
721 JobPhase::Failed { attempts: 1 }
722 );
723 }
724
725 #[test]
728 fn failed_to_retrying_when_retry_decided() {
729 assert_eq!(
730 advance(
731 JobPhase::Failed { attempts: 1 },
732 Signal::RetryDecide(RetryOutcome::Retry { until_ms: 12345 })
733 )
734 .unwrap(),
735 JobPhase::Retrying { until_ms: 12345 }
736 );
737 }
738
739 #[test]
740 fn failed_to_deadlettered_when_retries_exhausted() {
741 assert_eq!(
742 advance(
743 JobPhase::Failed { attempts: 3 },
744 Signal::RetryDecide(RetryOutcome::Deadletter)
745 )
746 .unwrap(),
747 JobPhase::Deadlettered
748 );
749 }
750
751 #[test]
754 fn retrying_to_pending_after_backoff() {
755 assert_eq!(
756 advance(JobPhase::Retrying { until_ms: 100 }, Signal::BackoffElapsed).unwrap(),
757 JobPhase::Pending
758 );
759 }
760
761 #[test]
764 fn waiting_for_operator_to_ready_via_operator() {
765 assert_eq!(
766 advance(
767 JobPhase::WaitingForOperator,
768 Signal::OperatorTransition(JobPhase::Ready)
769 )
770 .unwrap(),
771 JobPhase::Ready
772 );
773 }
774
775 #[test]
776 fn waiting_for_operator_to_skipped_via_operator() {
777 let result = advance(
778 JobPhase::WaitingForOperator,
779 Signal::OperatorTransition(JobPhase::Skipped(SkipReason::OperatorDecision)),
780 )
781 .unwrap();
782 assert!(matches!(
783 result,
784 JobPhase::Skipped(SkipReason::OperatorDecision)
785 ));
786 }
787
788 #[test]
789 fn deadlettered_to_pending_via_operator() {
790 assert_eq!(
791 advance(
792 JobPhase::Deadlettered,
793 Signal::OperatorTransition(JobPhase::Pending)
794 )
795 .unwrap(),
796 JobPhase::Pending
797 );
798 }
799
800 #[test]
803 fn pending_with_allocate_budget_is_illegal() {
804 let err = advance(JobPhase::Pending, Signal::AllocateBudget).unwrap_err();
805 assert_eq!(err.from, JobPhase::Pending);
806 }
807
808 #[test]
809 fn succeeded_with_anything_is_illegal_except_no_outbound() {
810 let err = advance(JobPhase::Succeeded, Signal::AllocateBudget).unwrap_err();
814 assert_eq!(err.from, JobPhase::Succeeded);
815 }
816
817 #[test]
818 fn deadlettered_with_random_operator_transition_is_illegal() {
819 let err = advance(
821 JobPhase::Deadlettered,
822 Signal::OperatorTransition(JobPhase::Ready),
823 )
824 .unwrap_err();
825 assert!(matches!(err.from, JobPhase::Deadlettered));
826 }
827
828 #[test]
829 fn running_with_evaluate_gates_is_illegal() {
830 let err = advance(JobPhase::Running, pass()).unwrap_err();
833 assert_eq!(err.from, JobPhase::Running);
834 }
835
836 #[test]
837 fn waiting_for_operator_with_evaluate_gates_is_illegal() {
838 let err = advance(JobPhase::WaitingForOperator, pass()).unwrap_err();
840 assert_eq!(err.from, JobPhase::WaitingForOperator);
841 }
842
843 #[test]
844 fn signal_is_operator_driven_classifier() {
845 assert!(Signal::OperatorTransition(JobPhase::Pending).is_operator_driven());
846 assert!(!Signal::AllocateBudget.is_operator_driven());
847 assert!(!pass().is_operator_driven());
848 }
849}
850
851#[cfg(test)]
852mod job_tests {
853 use super::*;
854
855 struct NoopJob;
859
860 #[derive(thiserror::Error, Debug)]
861 #[error("noop")]
862 struct NoopError;
863
864 #[async_trait::async_trait]
865 impl Job for NoopJob {
866 type Output = ();
867 type Error = NoopError;
868
869 fn id(&self) -> JobId {
870 JobId {
871 scope: JobScope::Global,
872 kind: JobKindId::new("noop"),
873 subject: JobSubject::None,
874 }
875 }
876
877 fn kind(&self) -> JobKindId {
878 JobKindId::new("noop")
879 }
880
881 async fn execute(&self) -> Result<(), NoopError> {
882 Ok(())
883 }
884 }
885
886 #[tokio::test]
887 async fn job_trait_compiles_and_executes() {
888 let j = NoopJob;
889 assert_eq!(<NoopJob as Job>::id(&j).kind.0, "noop");
890 assert!(j.execute().await.is_ok());
891 }
892
893 #[tokio::test]
894 async fn erased_job_blanket_impl_gives_trait_object() {
895 let j: Box<dyn ErasedJob> = Box::new(NoopJob);
896 assert_eq!(j.id().kind.0, "noop");
897 assert!(j.execute_erased().await.is_ok());
898 }
899
900 use std::sync::Arc;
903 use std::sync::Mutex;
904
905 #[derive(Default)]
908 struct CaptureSink<O: Clone + Send + Sync + 'static> {
909 records: Mutex<Vec<(JobId, O)>>,
910 }
911
912 #[async_trait::async_trait]
913 impl<O: Clone + Send + Sync + 'static> OutputSink<O> for CaptureSink<O> {
914 async fn record(&self, job_id: &JobId, output: &O) {
915 self.records
916 .lock()
917 .expect("CaptureSink mutex poisoned")
918 .push((job_id.clone(), output.clone()));
919 }
920 }
921
922 struct RecJob {
924 scope: JobScope,
925 subject: JobSubject,
926 sink: Option<Arc<dyn OutputSink<u32>>>,
927 answer: u32,
928 }
929
930 #[async_trait::async_trait]
931 impl RecordingJob for RecJob {
932 type Output = u32;
933 type Error = NoopError;
934 const KIND: &'static str = "test-recording";
935
936 fn scope(&self) -> JobScope {
937 self.scope.clone()
938 }
939 fn subject(&self) -> JobSubject {
940 self.subject.clone()
941 }
942 fn output_sink(&self) -> Option<&Arc<dyn OutputSink<Self::Output>>> {
943 self.sink.as_ref()
944 }
945 async fn execute_body(&self) -> Result<u32, NoopError> {
946 Ok(self.answer)
947 }
948 }
949
950 #[tokio::test]
951 async fn recording_job_blanket_provides_job_id_and_kind() {
952 let job = RecJob {
953 scope: JobScope::Workspace("ws".into()),
954 subject: JobSubject::Repo("r".into()),
955 sink: None,
956 answer: 1,
957 };
958 let id = <RecJob as Job>::id(&job);
959 assert_eq!(id.kind.0, "test-recording");
960 match id.scope {
961 JobScope::Workspace(w) => assert_eq!(w, "ws"),
962 _ => panic!("wrong scope"),
963 }
964 match id.subject {
965 JobSubject::Repo(r) => assert_eq!(r, "r"),
966 _ => panic!("wrong subject"),
967 }
968 let kind = <RecJob as Job>::kind(&job);
969 assert_eq!(kind.0, "test-recording");
970 }
971
972 #[tokio::test]
973 async fn recording_job_blanket_execute_records_to_sink_on_success() {
974 let sink: Arc<CaptureSink<u32>> = Arc::new(CaptureSink::default());
975 let sink_dyn: Arc<dyn OutputSink<u32>> = sink.clone();
976 let job = RecJob {
977 scope: JobScope::Global,
978 subject: JobSubject::None,
979 sink: Some(sink_dyn),
980 answer: 42,
981 };
982 let result = <RecJob as Job>::execute(&job).await.unwrap();
983 assert_eq!(result, 42);
984
985 let recs = sink.records.lock().unwrap();
986 assert_eq!(recs.len(), 1, "sink should have captured one record");
987 assert_eq!(recs[0].1, 42);
988 }
989
990 #[tokio::test]
991 async fn recording_job_without_sink_skips_recording() {
992 let job = RecJob {
993 scope: JobScope::Global,
994 subject: JobSubject::None,
995 sink: None,
996 answer: 7,
997 };
998 let result = <RecJob as Job>::execute(&job).await.unwrap();
1001 assert_eq!(result, 7);
1002 }
1003}
1004
1005#[cfg(test)]
1006mod snapshot_tests {
1007 use super::*;
1008 use std::collections::HashMap;
1009
1010 fn id(name: &str) -> JobId {
1011 JobId {
1012 scope: JobScope::Global,
1013 kind: JobKindId::new("k"),
1014 subject: JobSubject::Pinned(name.into()),
1015 }
1016 }
1017
1018 fn snapshot_with(entries: Vec<(&str, JobPhase)>) -> Snapshot {
1019 let mut phases: HashMap<JobId, JobPhase> = HashMap::new();
1020 for (name, phase) in entries {
1021 phases.insert(id(name), phase);
1022 }
1023 Snapshot { phases }
1024 }
1025
1026 #[test]
1027 fn failure_set_includes_failed_retrying_deadlettered() {
1028 let s = snapshot_with(vec![
1029 ("ok", JobPhase::Succeeded),
1030 ("dead", JobPhase::Deadlettered),
1031 ("flap", JobPhase::Failed { attempts: 2 }),
1032 ("waiting", JobPhase::WaitingForOperator),
1033 ("retry", JobPhase::Retrying { until_ms: 0 }),
1034 ("ready", JobPhase::Ready),
1035 ]);
1036 let fs = s.failure_set();
1037 let names: std::collections::HashSet<String> = fs
1038 .iter()
1039 .filter_map(|(id, _)| match &id.subject {
1040 JobSubject::Pinned(s) => Some(s.clone()),
1041 _ => None,
1042 })
1043 .collect();
1044 assert_eq!(names.len(), 3);
1045 assert!(names.contains("dead"));
1046 assert!(names.contains("flap"));
1047 assert!(names.contains("retry"));
1048 assert!(!names.contains("waiting"));
1051 assert!(!names.contains("ok"));
1053 assert!(!names.contains("ready"));
1054 }
1055
1056 #[test]
1057 fn phase_counts_summarizes_every_phase() {
1058 let s = snapshot_with(vec![
1059 ("a", JobPhase::Pending),
1060 ("b", JobPhase::Pending),
1061 ("c", JobPhase::Succeeded),
1062 ("d", JobPhase::Deadlettered),
1063 ]);
1064 let counts = s.phase_counts();
1065 assert_eq!(counts.get("pending"), Some(&2));
1066 assert_eq!(counts.get("succeeded"), Some(&1));
1067 assert_eq!(counts.get("deadlettered"), Some(&1));
1068 assert!(counts.get("ready").is_none());
1070 }
1071}