1use std::{
16 sync::Arc,
17 sync::atomic::{AtomicU32, AtomicU64, Ordering},
18 time::Duration,
19};
20use tokio::time::sleep;
21use tokio_util::sync::CancellationToken;
22
23struct MetricsSubscriber {
24 starts: AtomicU64,
25 failures: AtomicU64,
26 successes: AtomicU64,
27}
28
29impl MetricsSubscriber {
30 fn new() -> Self {
31 Self {
32 starts: AtomicU64::new(0),
33 failures: AtomicU64::new(0),
34 successes: AtomicU64::new(0),
35 }
36 }
37 fn print_stats(&self) {
38 println!();
39 println!("Metrics:");
40 println!(" ├─► Starts: {}", self.starts.load(Ordering::Relaxed));
41 println!(" ├─► Failures: {}", self.failures.load(Ordering::Relaxed));
42 println!(" └─► Successes: {}", self.successes.load(Ordering::Relaxed));
43 }
44}
45
46#[async_trait::async_trait]
47impl taskvisor::Subscribe for MetricsSubscriber {
48 async fn on_event(&self, ev: &taskvisor::Event) {
49 match ev.kind {
50 taskvisor::EventKind::TaskStarting => {
51 self.starts.fetch_add(1, Ordering::Relaxed);
52 }
53 taskvisor::EventKind::TaskStopped => {
54 self.successes.fetch_add(1, Ordering::Relaxed);
55 }
56 taskvisor::EventKind::TaskFailed => {
57 self.failures.fetch_add(1, Ordering::Relaxed);
58 }
59 _ => {}
60 }
61 }
62 fn name(&self) -> &'static str {
63 "metrics"
64 }
65 fn queue_capacity(&self) -> usize {
66 1024
67 }
68}
69
70fn make_spec() -> taskvisor::TaskSpec {
71 let counter = Arc::new(AtomicU32::new(0));
72
73 let task: taskvisor::TaskRef =
74 taskvisor::TaskFn::arc("flaky", move |ctx: CancellationToken| {
75 let counter = Arc::clone(&counter);
76 async move {
77 if ctx.is_cancelled() {
78 return Err(taskvisor::TaskError::Canceled);
79 }
80
81 let attempt = counter.fetch_add(1, Ordering::Relaxed) + 1;
82 sleep(Duration::from_millis(100)).await;
83
84 if attempt <= 4 {
85 return Err(taskvisor::TaskError::Fail {
86 reason: format!("attempt {attempt} failed"),
87 });
88 }
89 Ok(())
90 }
91 });
92 taskvisor::TaskSpec::new(
93 task,
94 taskvisor::RestartPolicy::OnFailure,
95 taskvisor::BackoffPolicy::default(),
96 None,
97 )
98}
99
100#[tokio::main(flavor = "current_thread")]
101async fn main() -> anyhow::Result<()> {
102 let metrics = Arc::new(MetricsSubscriber::new());
103
104 let subs: Vec<Arc<dyn taskvisor::Subscribe>> =
105 vec![Arc::clone(&metrics) as Arc<dyn taskvisor::Subscribe>];
106 let sup = taskvisor::Supervisor::new(taskvisor::SupervisorConfig::default(), subs);
107
108 sup.run(vec![make_spec()]).await?;
109 metrics.print_stats();
110 Ok(())
111}