Skip to main content

dev_async/
tasks.rs

1//! Task tracking for leak detection.
2//!
3//! [`TrackedTaskGroup`] records every task spawned through it and
4//! reports any tasks that were still running when the group is
5//! dropped or finalized.
6
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use dev_report::{CheckResult, Evidence, Severity};
12use tokio::task::JoinHandle;
13
14/// A group of spawned tasks whose lifecycle is tracked.
15///
16/// When [`finalize`](TrackedTaskGroup::finalize) is called, the group
17/// joins each handle with the configured grace period and reports any
18/// tasks still running as leaks.
19///
20/// # Example
21///
22/// ```no_run
23/// use dev_async::tasks::TrackedTaskGroup;
24/// use std::time::Duration;
25///
26/// # async fn ex() {
27/// let mut group = TrackedTaskGroup::new("workers");
28/// group.spawn(async { tokio::time::sleep(Duration::from_millis(5)).await; });
29/// group.spawn(async { tokio::time::sleep(Duration::from_millis(5)).await; });
30///
31/// let check = group.finalize(Duration::from_millis(50)).await;
32/// assert!(check.has_tag("async"));
33/// # }
34/// ```
35pub struct TrackedTaskGroup {
36    name: String,
37    handles: Vec<JoinHandle<()>>,
38    spawned: Arc<AtomicUsize>,
39}
40
41impl TrackedTaskGroup {
42    /// Create a new group with a stable name.
43    pub fn new(name: impl Into<String>) -> Self {
44        Self {
45            name: name.into(),
46            handles: Vec::new(),
47            spawned: Arc::new(AtomicUsize::new(0)),
48        }
49    }
50
51    /// Spawn a future and track its handle.
52    ///
53    /// The future MUST resolve to `()`. If you need a result, capture
54    /// it via shared state (e.g. `tokio::sync::oneshot::Sender`).
55    pub fn spawn<F>(&mut self, fut: F)
56    where
57        F: std::future::Future<Output = ()> + Send + 'static,
58    {
59        self.spawned.fetch_add(1, Ordering::Relaxed);
60        self.handles.push(tokio::spawn(fut));
61    }
62
63    /// How many tasks have been spawned through this group so far.
64    pub fn spawned_count(&self) -> usize {
65        self.spawned.load(Ordering::Relaxed)
66    }
67
68    /// Join all tracked tasks with a per-task grace period and emit a
69    /// [`CheckResult`].
70    ///
71    /// Verdicts:
72    /// - All tasks completed cleanly -> `Pass`.
73    /// - One or more tasks panicked -> `Fail (Critical)` with
74    ///   `task_panicked` tag.
75    /// - One or more tasks did not finish in time (leak suspected) ->
76    ///   `Fail (Error)` with `task_leak` tag.
77    pub async fn finalize(self, grace: Duration) -> CheckResult {
78        let name = format!("async::tasks::{}", self.name);
79        let total = self.handles.len();
80        let mut completed = 0usize;
81        let mut panicked = 0usize;
82        let mut leaked = 0usize;
83
84        for h in self.handles {
85            match tokio::time::timeout(grace, h).await {
86                Ok(Ok(())) => completed += 1,
87                Ok(Err(_join_err)) => panicked += 1,
88                Err(_) => leaked += 1,
89            }
90        }
91
92        let evidence = vec![
93            Evidence::numeric("spawned", total as f64),
94            Evidence::numeric("completed", completed as f64),
95            Evidence::numeric("panicked", panicked as f64),
96            Evidence::numeric("leaked", leaked as f64),
97            Evidence::numeric("grace_ms", grace.as_millis() as f64),
98        ];
99
100        let detail = format!(
101            "spawned={} completed={} panicked={} leaked={}",
102            total, completed, panicked, leaked
103        );
104
105        if panicked > 0 {
106            let mut c = CheckResult::fail(name, Severity::Critical).with_detail(detail);
107            c.tags = vec![
108                "async".to_string(),
109                "tasks".to_string(),
110                "task_panicked".to_string(),
111                "regression".to_string(),
112            ];
113            c.evidence = evidence;
114            return c;
115        }
116
117        if leaked > 0 {
118            let mut c = CheckResult::fail(name, Severity::Error).with_detail(detail);
119            c.tags = vec![
120                "async".to_string(),
121                "tasks".to_string(),
122                "task_leak".to_string(),
123                "regression".to_string(),
124            ];
125            c.evidence = evidence;
126            return c;
127        }
128
129        let mut c = CheckResult::pass(name).with_detail(detail);
130        c.tags = vec!["async".to_string(), "tasks".to_string()];
131        c.evidence = evidence;
132        c
133    }
134}
135
136#[cfg(test)]
137mod tests {
138    use super::*;
139    use dev_report::Verdict;
140
141    #[tokio::test]
142    async fn empty_group_passes() {
143        let g = TrackedTaskGroup::new("empty");
144        let c = g.finalize(Duration::from_millis(10)).await;
145        assert_eq!(c.verdict, Verdict::Pass);
146    }
147
148    #[tokio::test]
149    async fn all_complete_passes() {
150        let mut g = TrackedTaskGroup::new("clean");
151        for _ in 0..3 {
152            g.spawn(async {
153                tokio::time::sleep(Duration::from_millis(2)).await;
154            });
155        }
156        let c = g.finalize(Duration::from_millis(100)).await;
157        assert_eq!(c.verdict, Verdict::Pass);
158        assert!(c.has_tag("tasks"));
159        assert!(!c.has_tag("regression"));
160    }
161
162    #[tokio::test]
163    async fn panic_yields_critical() {
164        let mut g = TrackedTaskGroup::new("panicky");
165        g.spawn(async {
166            panic!("oops");
167        });
168        let c = g.finalize(Duration::from_millis(50)).await;
169        assert_eq!(c.verdict, Verdict::Fail);
170        assert_eq!(c.severity, Some(Severity::Critical));
171        assert!(c.has_tag("task_panicked"));
172    }
173
174    #[tokio::test]
175    async fn leak_yields_error_with_task_leak_tag() {
176        let mut g = TrackedTaskGroup::new("leaky");
177        g.spawn(async {
178            tokio::time::sleep(Duration::from_millis(500)).await;
179        });
180        let c = g.finalize(Duration::from_millis(20)).await;
181        assert_eq!(c.verdict, Verdict::Fail);
182        assert_eq!(c.severity, Some(Severity::Error));
183        assert!(c.has_tag("task_leak"));
184    }
185
186    #[tokio::test]
187    async fn spawned_count_tracks_calls() {
188        let mut g = TrackedTaskGroup::new("count");
189        for _ in 0..4 {
190            g.spawn(async {});
191        }
192        assert_eq!(g.spawned_count(), 4);
193        let _ = g.finalize(Duration::from_millis(50)).await;
194    }
195}