Skip to main content

dev_async/
lib.rs

1//! # dev-async
2//!
3//! Async-specific validation for Rust. Deadlocks, task leaks, hung
4//! futures, graceful shutdown. Part of the `dev-*` verification suite.
5//!
6//! Async Rust fails in subtle ways that synchronous unit tests can't
7//! catch: a future that never completes, a task that gets dropped
8//! without cleanup, a shutdown that hangs because one worker is stuck
9//! in a blocking call. `dev-async` provides primitives for catching
10//! these issues programmatically.
11//!
12//! ## Quick example
13//!
14//! Run a future with a hard timeout. If it doesn't finish in time, you
15//! get a `Fail` verdict, not a hang.
16//!
17//! ```no_run
18//! use dev_async::run_with_timeout;
19//! use std::time::Duration;
20//!
21//! # async fn example() {
22//! let _check = run_with_timeout(
23//!     "user_login",
24//!     Duration::from_secs(2),
25//!     async { do_login().await }
26//! ).await;
27//! # }
28//! # async fn do_login() {}
29//! ```
30//!
31//! ## Modules
32//!
33//! - [`deadlock`] — `try_lock_with_timeout` helpers.
34//! - [`tasks`] — `TrackedTaskGroup` for leak detection.
35//! - [`shutdown`] — `ShutdownProbe` for graceful-shutdown verification.
36//! - [`blocking`] (feature `block-detect`) — heuristic blocking-call
37//!   detection inside async tasks.
38
39#![cfg_attr(docsrs, feature(doc_cfg))]
40#![warn(missing_docs)]
41#![warn(rust_2018_idioms)]
42
43use std::future::Future;
44use std::time::{Duration, Instant};
45
46use dev_report::{CheckResult, Evidence, Producer, Report, Severity};
47
48pub mod deadlock;
49pub mod shutdown;
50pub mod tasks;
51
52#[cfg(feature = "block-detect")]
53#[cfg_attr(docsrs, doc(cfg(feature = "block-detect")))]
54pub mod blocking;
55
56/// Run a future with a hard timeout. Produces a [`CheckResult`] tagged
57/// `async`.
58///
59/// If the future completes before the timeout, the verdict is `Pass`
60/// and the duration is recorded.
61///
62/// If the future does not complete in time, the verdict is `Fail` with
63/// severity `Error`. The future itself is dropped (cancelled) when the
64/// timeout expires.
65///
66/// The returned `CheckResult` carries numeric `Evidence` for
67/// `timeout_ms` and (on the pass path) `elapsed_ms`.
68///
69/// # Example
70///
71/// ```no_run
72/// use dev_async::run_with_timeout;
73/// use std::time::Duration;
74///
75/// # async fn ex() {
76/// let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
77/// assert!(check.has_tag("async"));
78/// # }
79/// ```
80pub async fn run_with_timeout<F, T>(
81    name: impl Into<String>,
82    timeout: Duration,
83    fut: F,
84) -> CheckResult
85where
86    F: Future<Output = T>,
87{
88    let name = name.into();
89    let started = Instant::now();
90    match tokio::time::timeout(timeout, fut).await {
91        Ok(_value) => {
92            let elapsed = started.elapsed();
93            let mut c = CheckResult::pass(format!("async::{name}"))
94                .with_duration_ms(elapsed.as_millis() as u64);
95            c.tags = vec!["async".to_string()];
96            c.evidence = vec![
97                Evidence::numeric("elapsed_ms", elapsed.as_millis() as f64),
98                Evidence::numeric("timeout_ms", timeout.as_millis() as f64),
99            ];
100            c
101        }
102        Err(_elapsed) => {
103            let mut c = CheckResult::fail(format!("async::{name}"), Severity::Error)
104                .with_detail(format!("future did not complete within {timeout:?}"));
105            c.tags = vec![
106                "async".to_string(),
107                "timeout".to_string(),
108                "regression".to_string(),
109            ];
110            c.evidence = vec![Evidence::numeric("timeout_ms", timeout.as_millis() as f64)];
111            c
112        }
113    }
114}
115
116/// Verify that all spawned tasks finish within the given timeout.
117///
118/// Pass a vector of `JoinHandle`s. Returns one [`CheckResult`] per task,
119/// each tagged `async` with numeric `Evidence` for the index and
120/// timeout / elapsed.
121///
122/// Verdicts:
123/// - Task completed -> `Pass`, with `elapsed_ms` evidence.
124/// - Task panicked or was cancelled -> `Fail (Critical)`, with
125///   `task_panicked` tag.
126/// - Task did not finish in time -> `Fail (Error)`, with `timeout` tag.
127pub async fn join_all_with_timeout<T>(
128    name: impl Into<String>,
129    timeout: Duration,
130    handles: Vec<tokio::task::JoinHandle<T>>,
131) -> Vec<CheckResult> {
132    let name = name.into();
133    let mut results = Vec::with_capacity(handles.len());
134    for (i, h) in handles.into_iter().enumerate() {
135        let task_name = format!("async::{name}::task{i}");
136        let started = Instant::now();
137        let evidence_base = vec![
138            Evidence::numeric("task_index", i as f64),
139            Evidence::numeric("timeout_ms", timeout.as_millis() as f64),
140        ];
141        let result = match tokio::time::timeout(timeout, h).await {
142            Ok(Ok(_)) => {
143                let elapsed = started.elapsed();
144                let mut c =
145                    CheckResult::pass(task_name).with_duration_ms(elapsed.as_millis() as u64);
146                c.tags = vec!["async".to_string()];
147                c.evidence = {
148                    let mut e = evidence_base;
149                    e.push(Evidence::numeric("elapsed_ms", elapsed.as_millis() as f64));
150                    e
151                };
152                c
153            }
154            Ok(Err(join_err)) => {
155                let mut c = CheckResult::fail(task_name, Severity::Critical)
156                    .with_detail(format!("task panicked or was cancelled: {join_err}"));
157                c.tags = vec![
158                    "async".to_string(),
159                    "task_panicked".to_string(),
160                    "regression".to_string(),
161                ];
162                c.evidence = evidence_base;
163                c
164            }
165            Err(_) => {
166                let mut c = CheckResult::fail(task_name, Severity::Error)
167                    .with_detail(format!("task did not complete within {timeout:?}"));
168                c.tags = vec![
169                    "async".to_string(),
170                    "timeout".to_string(),
171                    "regression".to_string(),
172                ];
173                c.evidence = evidence_base;
174                c
175            }
176        };
177        results.push(result);
178    }
179    results
180}
181
/// A trait for any async harness that produces a verdict via a future.
///
/// `dev-report::Producer` is synchronous, which doesn't fit async
/// harnesses. `AsyncCheck` is the async equivalent: instead of
/// returning the verdict directly, `run` returns a future that resolves
/// to it.
///
/// `run` takes `self` by value, so a check is consumed when it is run.
/// The trait itself places no `Send` or `'static` requirement on
/// [`AsyncCheck::Fut`]; implementors choose the concrete future type.
///
/// # Example
///
/// ```no_run
/// use dev_async::AsyncCheck;
/// use dev_report::CheckResult;
/// use std::future::Future;
/// use std::pin::Pin;
///
/// struct PingCheck;
/// impl AsyncCheck for PingCheck {
///     type Output = CheckResult;
///     type Fut = Pin<Box<dyn Future<Output = CheckResult> + Send>>;
///     fn run(self) -> Self::Fut {
///         Box::pin(async move { CheckResult::pass("ping") })
///     }
/// }
/// ```
pub trait AsyncCheck {
    /// Output of the check. Typically `CheckResult`.
    type Output;
    /// The future returned by `run`; it resolves to [`AsyncCheck::Output`].
    type Fut: Future<Output = Self::Output>;
    /// Run the check, consuming it and returning the future to await.
    fn run(self) -> Self::Fut;
}
212
/// An async producer that builds a `Report`.
///
/// `dev-report::Producer` is synchronous. `AsyncProducer` is the
/// async equivalent that returns a future resolving to the [`Report`].
/// `produce` takes `self` by value, so a producer is consumed when run.
///
/// For synchronous callers, [`BlockingAsyncProducer`] implements the
/// sync `Producer` trait; note that it wraps a closure returning a
/// future rather than an `AsyncProducer` implementor.
pub trait AsyncProducer {
    /// The future returned by `produce`; it resolves to a [`Report`].
    type Fut: Future<Output = Report>;
    /// Run the producer and return a finalized [`Report`].
    fn produce(self) -> Self::Fut;
}
224
/// Adapter that wraps a factory for futures resolving to a [`Report`]
/// and implements `dev_report::Producer` by driving each future to
/// completion with `block_on` on a stored `tokio::runtime::Handle`.
///
/// The handle is the one passed to [`BlockingAsyncProducer::new`]; it
/// is not looked up via `Handle::current()` when `produce` is called.
///
/// MUST be invoked from a sync context, i.e. not from code that is
/// itself executing inside an async runtime. NOTE(review): tokio
/// documents `Handle::block_on` as panicking when called from within an
/// asynchronous execution context, and as unable to drive IO or timers
/// on a `current_thread` runtime unless another thread is running that
/// runtime — confirm against the tokio version in use. If you need to
/// build a report from async code, use [`AsyncProducer`] directly
/// without going through `Producer`.
///
/// # Example
///
/// ```no_run
/// use dev_async::{run_with_timeout, BlockingAsyncProducer};
/// use dev_report::{Producer, Report};
/// use std::time::Duration;
///
/// fn build_report() -> impl std::future::Future<Output = Report> {
///     async {
///         let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
///         let mut r = Report::new("crate", "0.1.0").with_producer("dev-async");
///         r.push(check);
///         r.finish();
///         r
///     }
/// }
///
/// // From a sync test or main:
/// // let rt = tokio::runtime::Runtime::new().unwrap();
/// // let handle = rt.handle().clone();
/// // let producer = BlockingAsyncProducer::new(handle, build_report);
/// // let report = producer.produce();
/// ```
pub struct BlockingAsyncProducer<F, Fut>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Report>,
{
    // Runtime handle on which each produced future is blocked on.
    handle: tokio::runtime::Handle,
    // Called once per `produce()` to obtain a fresh future.
    factory: F,
}
265
266impl<F, Fut> BlockingAsyncProducer<F, Fut>
267where
268    F: Fn() -> Fut,
269    Fut: Future<Output = Report>,
270{
271    /// Build a new adapter bound to `handle`.
272    ///
273    /// `factory` is invoked once per `produce()` call and must return
274    /// a fresh future each time.
275    pub fn new(handle: tokio::runtime::Handle, factory: F) -> Self {
276        Self { handle, factory }
277    }
278}
279
280impl<F, Fut> Producer for BlockingAsyncProducer<F, Fut>
281where
282    F: Fn() -> Fut,
283    Fut: Future<Output = Report>,
284{
285    fn produce(&self) -> Report {
286        let fut = (self.factory)();
287        self.handle.block_on(fut)
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;
    use dev_report::Verdict;

    // An immediately-ready future passes and carries both evidence entries.
    #[tokio::test]
    async fn timeout_pass_fast_future() {
        let outcome = run_with_timeout("fast", Duration::from_millis(500), async {}).await;
        let has_label = |wanted: &str| outcome.evidence.iter().any(|e| e.label.as_str() == wanted);

        assert_eq!(outcome.verdict, Verdict::Pass);
        assert!(outcome.has_tag("async"));
        assert!(has_label("elapsed_ms"));
        assert!(has_label("timeout_ms"));
    }

    // A future that outlives its budget fails and is tagged accordingly.
    #[tokio::test]
    async fn timeout_fail_slow_future() {
        let sleeper = async {
            tokio::time::sleep(Duration::from_millis(200)).await;
        };
        let outcome = run_with_timeout("slow", Duration::from_millis(10), sleeper).await;

        assert_eq!(outcome.verdict, Verdict::Fail);
        assert!(outcome.has_tag("timeout"));
        assert!(outcome.has_tag("regression"));
    }

    // Two quick tasks yield two passing, `async`-tagged results.
    #[tokio::test]
    async fn join_all_basic() {
        let handles = vec![tokio::spawn(async { 1 }), tokio::spawn(async { 2 })];
        let results = join_all_with_timeout("g", Duration::from_secs(1), handles).await;

        assert_eq!(results.len(), 2);
        for r in &results {
            assert_eq!(r.verdict, Verdict::Pass);
        }
        for r in &results {
            assert!(r.has_tag("async"));
        }
    }

    // A panicking task is reported as a critical failure.
    #[tokio::test]
    async fn join_all_panic_is_critical() {
        let handle = tokio::spawn(async { panic!("oops") });
        let results = join_all_with_timeout("g", Duration::from_secs(1), vec![handle]).await;

        assert_eq!(results.len(), 1);
        let only = &results[0];
        assert_eq!(only.verdict, Verdict::Fail);
        assert_eq!(only.severity, Some(Severity::Critical));
        assert!(only.has_tag("task_panicked"));
    }

    // A task that is still running at the deadline is an error-level failure.
    #[tokio::test]
    async fn join_all_timeout_is_error() {
        let handle = tokio::spawn(async {
            tokio::time::sleep(Duration::from_millis(500)).await;
        });
        let results = join_all_with_timeout("g", Duration::from_millis(20), vec![handle]).await;

        let first = &results[0];
        assert_eq!(first.verdict, Verdict::Fail);
        assert_eq!(first.severity, Some(Severity::Error));
        assert!(first.has_tag("timeout"));
    }

    // The `timeout_ms` evidence is numeric and equals the configured budget.
    #[tokio::test]
    async fn check_evidence_includes_timeout() {
        let outcome = run_with_timeout("x", Duration::from_millis(50), async {}).await;
        let entry = outcome
            .evidence
            .iter()
            .find(|e| e.label == "timeout_ms")
            .expect("timeout_ms evidence present");

        match entry.data {
            dev_report::EvidenceData::Numeric(value) => assert_eq!(value, 50.0),
            _ => panic!("expected numeric"),
        }
    }
}