Skip to main content

subscriber/
subscriber.rs

//! # Custom Subscriber Example
//!
//! Shows how to implement a custom event subscriber to track task metrics.
//!
//! The example counts:
//! - Total task starts
//! - Successful completions
//! - Failures
//!
//! ## Run
//! ```bash
//! cargo run --example subscriber
//! ```
14
15use std::{
16    sync::Arc,
17    sync::atomic::{AtomicU32, AtomicU64, Ordering},
18    time::Duration,
19};
20use tokio::time::sleep;
21use tokio_util::sync::CancellationToken;
22
/// Counters for the task lifecycle events this example observes.
///
/// Each field is an independent atomic, so all updates go through `&self`
/// and no lock is needed. `Default` yields every counter at zero.
#[derive(Debug, Default)]
struct MetricsSubscriber {
    /// Incremented once per `TaskStarting` event.
    starts: AtomicU64,
    /// Incremented once per `TaskFailed` event.
    failures: AtomicU64,
    /// Incremented once per `TaskStopped` event (reported as a success).
    successes: AtomicU64,
}

impl MetricsSubscriber {
    /// Creates a subscriber with every counter at zero.
    fn new() -> Self {
        Self::default()
    }

    /// Prints the current counter values to stdout.
    ///
    /// The counters are read one by one with `Relaxed` loads, so the three
    /// numbers are not a single atomic snapshot.
    fn print_stats(&self) {
        let starts = self.starts.load(Ordering::Relaxed);
        let failures = self.failures.load(Ordering::Relaxed);
        let successes = self.successes.load(Ordering::Relaxed);

        println!();
        println!("Metrics:");
        println!(" ├─► Starts:    {}", starts);
        println!(" ├─► Failures:  {}", failures);
        println!(" └─► Successes: {}", successes);
    }
}
45
46#[async_trait::async_trait]
47impl taskvisor::Subscribe for MetricsSubscriber {
48    async fn on_event(&self, ev: &taskvisor::Event) {
49        match ev.kind {
50            taskvisor::EventKind::TaskStarting => {
51                self.starts.fetch_add(1, Ordering::Relaxed);
52            }
53            taskvisor::EventKind::TaskStopped => {
54                self.successes.fetch_add(1, Ordering::Relaxed);
55            }
56            taskvisor::EventKind::TaskFailed => {
57                self.failures.fetch_add(1, Ordering::Relaxed);
58            }
59            _ => {}
60        }
61    }
62    fn name(&self) -> &'static str {
63        "metrics"
64    }
65    fn queue_capacity(&self) -> usize {
66        1024
67    }
68}
69
70fn make_spec() -> taskvisor::TaskSpec {
71    let counter = Arc::new(AtomicU32::new(0));
72
73    let task: taskvisor::TaskRef =
74        taskvisor::TaskFn::arc("flaky", move |ctx: CancellationToken| {
75            let counter = Arc::clone(&counter);
76            async move {
77                if ctx.is_cancelled() {
78                    return Err(taskvisor::TaskError::Canceled);
79                }
80
81                let attempt = counter.fetch_add(1, Ordering::Relaxed) + 1;
82                sleep(Duration::from_millis(100)).await;
83
84                if attempt <= 4 {
85                    return Err(taskvisor::TaskError::Fail {
86                        reason: format!("attempt {attempt} failed"),
87                    });
88                }
89                Ok(())
90            }
91        });
92    taskvisor::TaskSpec::new(
93        task,
94        taskvisor::RestartPolicy::OnFailure,
95        taskvisor::BackoffPolicy::default(),
96        None,
97    )
98}
99
100#[tokio::main(flavor = "current_thread")]
101async fn main() -> anyhow::Result<()> {
102    let metrics = Arc::new(MetricsSubscriber::new());
103
104    let subs: Vec<Arc<dyn taskvisor::Subscribe>> =
105        vec![Arc::clone(&metrics) as Arc<dyn taskvisor::Subscribe>];
106    let sup = taskvisor::Supervisor::new(taskvisor::SupervisorConfig::default(), subs);
107
108    sup.run(vec![make_spec()]).await?;
109    metrics.print_stats();
110    Ok(())
111}