Skip to main content

dev_async/
lib.rs

1//! # dev-async
2//!
3//! Async-specific validation for Rust. Deadlocks, task leaks, hung
4//! futures, graceful shutdown. Part of the `dev-*` verification suite.
5//!
6//! Async Rust fails in subtle ways that synchronous unit tests can't
7//! catch: a future that never completes, a task that gets dropped
8//! without cleanup, a shutdown that hangs because one worker is stuck
9//! in a blocking call. `dev-async` provides primitives for catching
10//! these issues programmatically.
11//!
12//! ## Quick example
13//!
14//! Run a future with a hard timeout. If it doesn't finish in time, you
15//! get a `Fail` verdict, not a hang.
16//!
17//! ```no_run
18//! use dev_async::run_with_timeout;
19//! use std::time::Duration;
20//!
21//! # async fn example() {
22//! let _check = run_with_timeout(
23//!     "user_login",
24//!     Duration::from_secs(2),
25//!     async { do_login().await }
26//! ).await;
27//! # }
28//! # async fn do_login() {}
29//! ```
30//!
31//! ## Modules
32//!
33//! - [`deadlock`] — `try_lock_with_timeout` helpers.
34//! - [`tasks`] — `TrackedTaskGroup` for leak detection.
35//! - [`shutdown`] — `ShutdownProbe` for graceful-shutdown verification.
36//! - `blocking` (feature `block-detect`) — heuristic blocking-call
37//!   detection inside async tasks (visible in rustdoc when the
38//!   feature is enabled).
39
40#![cfg_attr(docsrs, feature(doc_cfg))]
41#![warn(missing_docs)]
42#![warn(rust_2018_idioms)]
43
44use std::future::Future;
45use std::time::{Duration, Instant};
46
47use dev_report::{CheckResult, Evidence, Producer, Report, Severity};
48
49pub mod deadlock;
50pub mod shutdown;
51pub mod tasks;
52
53#[cfg(feature = "block-detect")]
54#[cfg_attr(docsrs, doc(cfg(feature = "block-detect")))]
55pub mod blocking;
56
57/// Run a future with a hard timeout. Produces a [`CheckResult`] tagged
58/// `async`.
59///
60/// If the future completes before the timeout, the verdict is `Pass`
61/// and the duration is recorded.
62///
63/// If the future does not complete in time, the verdict is `Fail` with
64/// severity `Error`. The future itself is dropped (cancelled) when the
65/// timeout expires.
66///
67/// The returned `CheckResult` carries numeric `Evidence` for
68/// `timeout_ms` and (on the pass path) `elapsed_ms`.
69///
70/// # Example
71///
72/// ```no_run
73/// use dev_async::run_with_timeout;
74/// use std::time::Duration;
75///
76/// # async fn ex() {
77/// let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
78/// assert!(check.has_tag("async"));
79/// # }
80/// ```
81pub async fn run_with_timeout<F, T>(
82    name: impl Into<String>,
83    timeout: Duration,
84    fut: F,
85) -> CheckResult
86where
87    F: Future<Output = T>,
88{
89    let name = name.into();
90    let started = Instant::now();
91    match tokio::time::timeout(timeout, fut).await {
92        Ok(_value) => {
93            let elapsed = started.elapsed();
94            let mut c = CheckResult::pass(format!("async::{name}"))
95                .with_duration_ms(elapsed.as_millis() as u64);
96            c.tags = vec!["async".to_string()];
97            c.evidence = vec![
98                Evidence::numeric("elapsed_ms", elapsed.as_millis() as f64),
99                Evidence::numeric("timeout_ms", timeout.as_millis() as f64),
100            ];
101            c
102        }
103        Err(_elapsed) => {
104            let mut c = CheckResult::fail(format!("async::{name}"), Severity::Error)
105                .with_detail(format!("future did not complete within {timeout:?}"));
106            c.tags = vec![
107                "async".to_string(),
108                "timeout".to_string(),
109                "regression".to_string(),
110            ];
111            c.evidence = vec![Evidence::numeric("timeout_ms", timeout.as_millis() as f64)];
112            c
113        }
114    }
115}
116
117/// Verify that all spawned tasks finish within the given timeout.
118///
119/// Pass a vector of `JoinHandle`s. Returns one [`CheckResult`] per task,
120/// each tagged `async` with numeric `Evidence` for the index and
121/// timeout / elapsed.
122///
123/// Verdicts:
124/// - Task completed -> `Pass`, with `elapsed_ms` evidence.
125/// - Task panicked or was cancelled -> `Fail (Critical)`, with
126///   `task_panicked` tag.
127/// - Task did not finish in time -> `Fail (Error)`, with `timeout` tag.
128pub async fn join_all_with_timeout<T>(
129    name: impl Into<String>,
130    timeout: Duration,
131    handles: Vec<tokio::task::JoinHandle<T>>,
132) -> Vec<CheckResult> {
133    let name = name.into();
134    let mut results = Vec::with_capacity(handles.len());
135    for (i, h) in handles.into_iter().enumerate() {
136        let task_name = format!("async::{name}::task{i}");
137        let started = Instant::now();
138        let evidence_base = vec![
139            Evidence::numeric("task_index", i as f64),
140            Evidence::numeric("timeout_ms", timeout.as_millis() as f64),
141        ];
142        let result = match tokio::time::timeout(timeout, h).await {
143            Ok(Ok(_)) => {
144                let elapsed = started.elapsed();
145                let mut c =
146                    CheckResult::pass(task_name).with_duration_ms(elapsed.as_millis() as u64);
147                c.tags = vec!["async".to_string()];
148                c.evidence = {
149                    let mut e = evidence_base;
150                    e.push(Evidence::numeric("elapsed_ms", elapsed.as_millis() as f64));
151                    e
152                };
153                c
154            }
155            Ok(Err(join_err)) => {
156                let mut c = CheckResult::fail(task_name, Severity::Critical)
157                    .with_detail(format!("task panicked or was cancelled: {join_err}"));
158                c.tags = vec![
159                    "async".to_string(),
160                    "task_panicked".to_string(),
161                    "regression".to_string(),
162                ];
163                c.evidence = evidence_base;
164                c
165            }
166            Err(_) => {
167                let mut c = CheckResult::fail(task_name, Severity::Error)
168                    .with_detail(format!("task did not complete within {timeout:?}"));
169                c.tags = vec![
170                    "async".to_string(),
171                    "timeout".to_string(),
172                    "regression".to_string(),
173                ];
174                c.evidence = evidence_base;
175                c
176            }
177        };
178        results.push(result);
179    }
180    results
181}
182
/// Async counterpart of a single check. `run` consumes the check and
/// returns a future that eventually yields its output, normally a verdict.
///
/// `dev_report::Producer` is synchronous, so it is a poor fit for
/// harnesses that need to `.await`. Implement `AsyncCheck` for those
/// instead.
///
/// # Example
///
/// ```no_run
/// use dev_async::AsyncCheck;
/// use dev_report::CheckResult;
/// use std::future::Future;
/// use std::pin::Pin;
///
/// struct PingCheck;
/// impl AsyncCheck for PingCheck {
///     type Output = CheckResult;
///     type Fut = Pin<Box<dyn Future<Output = CheckResult> + Send>>;
///     fn run(self) -> Self::Fut {
///         Box::pin(async move { CheckResult::pass("ping") })
///     }
/// }
/// ```
pub trait AsyncCheck {
    /// Consume the check and return the future that produces its output.
    fn run(self) -> Self::Fut;

    /// Future handed back by [`run`](AsyncCheck::run). It resolves to
    /// [`Output`](AsyncCheck::Output).
    type Fut: Future<Output = Self::Output>;

    /// Value the check resolves to, usually a `CheckResult`.
    type Output;
}
213
214/// An async producer that builds a `Report`.
215///
216/// `dev-report::Producer` is synchronous. `AsyncProducer` is the
217/// async equivalent that returns a future. Bridge to a sync
218/// `Producer` via [`BlockingAsyncProducer`].
219pub trait AsyncProducer {
220    /// The future returned by `produce`.
221    type Fut: Future<Output = Report>;
222    /// Run the producer and return a finalized [`Report`].
223    fn produce(self) -> Self::Fut;
224}
225
226/// Adapter that wraps an `async fn` returning a [`Report`] and
227/// implements `dev_report::Producer` by calling
228/// `tokio::runtime::Handle::current().block_on(...)`.
229///
230/// MUST be invoked from a sync context that *is not* itself running
231/// inside a `current_thread` runtime. Calling `block_on` from inside
232/// an async runtime would deadlock; if you need that, use
233/// [`AsyncProducer`] directly without going through `Producer`.
234///
235/// # Example
236///
237/// ```no_run
238/// use dev_async::{run_with_timeout, BlockingAsyncProducer};
239/// use dev_report::{Producer, Report};
240/// use std::time::Duration;
241///
242/// fn build_report() -> impl std::future::Future<Output = Report> {
243///     async {
244///         let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
245///         let mut r = Report::new("crate", "0.1.0").with_producer("dev-async");
246///         r.push(check);
247///         r.finish();
248///         r
249///     }
250/// }
251///
252/// // From a sync test or main:
253/// // let rt = tokio::runtime::Runtime::new().unwrap();
254/// // let handle = rt.handle().clone();
255/// // let producer = BlockingAsyncProducer::new(handle, build_report);
256/// // let report = producer.produce();
257/// ```
258pub struct BlockingAsyncProducer<F, Fut>
259where
260    F: Fn() -> Fut,
261    Fut: Future<Output = Report>,
262{
263    handle: tokio::runtime::Handle,
264    /// Owned runtime, when constructed via `with_new_runtime` /
265    /// `with_current_thread_runtime`. `None` when borrowing an
266    /// externally-supplied handle. Kept alive for the lifetime of the
267    /// producer so `block_on` always has a valid runtime.
268    _owned_runtime: Option<tokio::runtime::Runtime>,
269    factory: F,
270}
271
272impl<F, Fut> BlockingAsyncProducer<F, Fut>
273where
274    F: Fn() -> Fut,
275    Fut: Future<Output = Report>,
276{
277    /// Build a new adapter bound to an externally-supplied `handle`.
278    ///
279    /// `factory` is invoked once per `produce()` call and must return
280    /// a fresh future each time.
281    ///
282    /// Use this when you already have a `tokio::runtime::Handle`
283    /// (e.g. from a long-lived runtime in your test harness). For the
284    /// common case of "I just want to drive an async producer from a
285    /// sync test", prefer [`with_new_runtime`](Self::with_new_runtime).
286    pub fn new(handle: tokio::runtime::Handle, factory: F) -> Self {
287        Self {
288            handle,
289            _owned_runtime: None,
290            factory,
291        }
292    }
293
294    /// Build a new adapter that owns a fresh multi-thread `tokio::runtime::Runtime`.
295    ///
296    /// The runtime lives for the lifetime of the producer and is
297    /// dropped (along with all its workers) when the producer is
298    /// dropped. Use this when you don't already have a runtime and
299    /// just want to drive an async producer from sync code.
300    ///
301    /// # Example
302    ///
303    /// ```no_run
304    /// use dev_async::{run_with_timeout, BlockingAsyncProducer};
305    /// use dev_report::{Producer, Report};
306    /// use std::time::Duration;
307    ///
308    /// let producer = BlockingAsyncProducer::with_new_runtime(|| async {
309    ///     let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
310    ///     let mut r = Report::new("crate", "0.1.0").with_producer("dev-async");
311    ///     r.push(check);
312    ///     r.finish();
313    ///     r
314    /// })
315    /// .expect("build runtime");
316    /// let _report = producer.produce();
317    /// ```
318    pub fn with_new_runtime(factory: F) -> std::io::Result<Self> {
319        let rt = tokio::runtime::Builder::new_multi_thread()
320            .enable_all()
321            .build()?;
322        let handle = rt.handle().clone();
323        Ok(Self {
324            handle,
325            _owned_runtime: Some(rt),
326            factory,
327        })
328    }
329
330    /// Build a new adapter that owns a fresh `current_thread`
331    /// `tokio::runtime::Runtime`.
332    ///
333    /// Lighter-weight than [`with_new_runtime`](Self::with_new_runtime):
334    /// no worker threads are spawned. Suitable for tests and
335    /// single-threaded harnesses.
336    ///
337    /// # Example
338    ///
339    /// ```no_run
340    /// use dev_async::BlockingAsyncProducer;
341    /// use dev_report::{Producer, Report};
342    ///
343    /// let producer = BlockingAsyncProducer::with_current_thread_runtime(|| async {
344    ///     Report::new("c", "0.1.0").with_producer("dev-async")
345    /// })
346    /// .expect("build runtime");
347    /// let _r = producer.produce();
348    /// ```
349    pub fn with_current_thread_runtime(factory: F) -> std::io::Result<Self> {
350        let rt = tokio::runtime::Builder::new_current_thread()
351            .enable_all()
352            .build()?;
353        let handle = rt.handle().clone();
354        Ok(Self {
355            handle,
356            _owned_runtime: Some(rt),
357            factory,
358        })
359    }
360}
361
362impl<F, Fut> Producer for BlockingAsyncProducer<F, Fut>
363where
364    F: Fn() -> Fut,
365    Fut: Future<Output = Report>,
366{
367    fn produce(&self) -> Report {
368        let fut = (self.factory)();
369        self.handle.block_on(fut)
370    }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use dev_report::Verdict;

    /// Evidence labels of `check`, in their original order.
    fn evidence_labels(check: &CheckResult) -> Vec<&str> {
        check.evidence.iter().map(|e| e.label.as_str()).collect()
    }

    /// Wrap a single check in a finished report, the way a producer would.
    fn report_with(check: CheckResult) -> Report {
        let mut report = Report::new("c", "0.1.0").with_producer("dev-async");
        report.push(check);
        report.finish();
        report
    }

    #[tokio::test]
    async fn timeout_pass_fast_future() {
        let check = run_with_timeout("fast", Duration::from_millis(500), async {}).await;
        assert_eq!(check.verdict, Verdict::Pass);
        assert!(check.has_tag("async"));
        let labels = evidence_labels(&check);
        for expected in ["elapsed_ms", "timeout_ms"] {
            assert!(labels.contains(&expected), "missing evidence label {expected}");
        }
    }

    #[tokio::test]
    async fn timeout_fail_slow_future() {
        let slow = async {
            tokio::time::sleep(Duration::from_millis(200)).await;
        };
        let check = run_with_timeout("slow", Duration::from_millis(10), slow).await;
        assert_eq!(check.verdict, Verdict::Fail);
        assert!(check.has_tag("timeout"));
        assert!(check.has_tag("regression"));
    }

    #[tokio::test]
    async fn join_all_basic() {
        let handles = vec![tokio::spawn(async { 1 }), tokio::spawn(async { 2 })];
        let results = join_all_with_timeout("g", Duration::from_secs(1), handles).await;
        assert_eq!(results.len(), 2);
        for result in &results {
            assert_eq!(result.verdict, Verdict::Pass);
            assert!(result.has_tag("async"));
        }
    }

    #[tokio::test]
    async fn join_all_panic_is_critical() {
        let handles = vec![tokio::spawn(async { panic!("oops") })];
        let results = join_all_with_timeout("g", Duration::from_secs(1), handles).await;
        assert_eq!(results.len(), 1);
        let only = &results[0];
        assert_eq!(only.verdict, Verdict::Fail);
        assert_eq!(only.severity, Some(Severity::Critical));
        assert!(only.has_tag("task_panicked"));
    }

    #[tokio::test]
    async fn join_all_timeout_is_error() {
        let sleeper = tokio::spawn(async {
            tokio::time::sleep(Duration::from_millis(500)).await;
        });
        let results =
            join_all_with_timeout("g", Duration::from_millis(20), vec![sleeper]).await;
        let outcome = &results[0];
        assert_eq!(outcome.verdict, Verdict::Fail);
        assert_eq!(outcome.severity, Some(Severity::Error));
        assert!(outcome.has_tag("timeout"));
    }

    #[tokio::test]
    async fn check_evidence_includes_timeout() {
        let check = run_with_timeout("x", Duration::from_millis(50), async {}).await;
        let timeout_evidence = check
            .evidence
            .iter()
            .find(|e| e.label == "timeout_ms")
            .expect("timeout_ms evidence present");
        // The evidence must be numeric and carry the configured timeout exactly.
        match timeout_evidence.data {
            dev_report::EvidenceData::Numeric(n) => assert_eq!(n, 50.0),
            _ => panic!("expected numeric"),
        }
    }

    #[test]
    fn blocking_async_producer_with_new_runtime() {
        let producer =
            BlockingAsyncProducer::with_new_runtime(|| async { report_with(CheckResult::pass("x")) })
                .expect("build runtime");
        let report = producer.produce();
        assert_eq!(report.checks.len(), 1);
        assert_eq!(report.overall_verdict(), Verdict::Pass);
    }

    #[test]
    fn blocking_async_producer_with_current_thread_runtime() {
        let producer = BlockingAsyncProducer::with_current_thread_runtime(|| async {
            report_with(CheckResult::pass("y"))
        })
        .expect("build runtime");
        let report = producer.produce();
        assert_eq!(report.checks.len(), 1);
    }

    #[test]
    fn blocking_async_producer_can_drive_run_with_timeout() {
        let producer = BlockingAsyncProducer::with_current_thread_runtime(|| async {
            let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
            report_with(check)
        })
        .expect("build runtime");
        let report = producer.produce();
        assert_eq!(report.overall_verdict(), Verdict::Pass);
    }
}