Skip to main content

dev_async/
lib.rs

1//! # dev-async
2//!
3//! Async-specific validation for Rust. Deadlocks, task leaks, hung
4//! futures, graceful shutdown. Part of the `dev-*` verification suite.
5//!
6//! Async Rust fails in subtle ways that synchronous unit tests can't
7//! catch: a future that never completes, a task that gets dropped
8//! without cleanup, a shutdown that hangs because one worker is stuck
9//! in a blocking call. `dev-async` provides primitives for catching
10//! these issues programmatically.
11//!
12//! ## Quick example
13//!
14//! Run a future with a hard timeout. If it doesn't finish in time, you
15//! get a `Fail` verdict, not a hang.
16//!
17//! ```no_run
18//! use dev_async::run_with_timeout;
19//! use std::time::Duration;
20//!
21//! # async fn example() {
22//! let _check = run_with_timeout(
23//!     "user_login",
24//!     Duration::from_secs(2),
25//!     async { do_login().await }
26//! ).await;
27//! # }
28//! # async fn do_login() {}
29//! ```
30//!
31//! ## Modules
32//!
33//! - [`deadlock`] — `try_lock_with_timeout` helpers.
34//! - [`tasks`] — `TrackedTaskGroup` for leak detection.
35//! - [`shutdown`] — `ShutdownProbe` for graceful-shutdown verification.
36//! - [`cancellation_safety`] — `check_cancel_safe` for verifying that
37//!   futures dropped mid-poll leave observable state consistent.
38//! - `blocking` (feature `block-detect`) — heuristic blocking-call
39//!   detection inside async tasks (visible in rustdoc when the
40//!   feature is enabled).
41
42#![cfg_attr(docsrs, feature(doc_cfg))]
43#![warn(missing_docs)]
44#![warn(rust_2018_idioms)]
45
46use std::future::Future;
47use std::time::{Duration, Instant};
48
49use dev_report::{CheckResult, Evidence, Producer, Report, Severity};
50
51pub mod cancellation_safety;
52pub mod deadlock;
53pub mod shutdown;
54pub mod tasks;
55
56#[cfg(feature = "block-detect")]
57#[cfg_attr(docsrs, doc(cfg(feature = "block-detect")))]
58pub mod blocking;
59
60/// Run a future with a hard timeout. Produces a [`CheckResult`] tagged
61/// `async`.
62///
63/// If the future completes before the timeout, the verdict is `Pass`
64/// and the duration is recorded.
65///
66/// If the future does not complete in time, the verdict is `Fail` with
67/// severity `Error`. The future itself is dropped (cancelled) when the
68/// timeout expires.
69///
70/// The returned `CheckResult` carries numeric `Evidence` for
71/// `timeout_ms` and (on the pass path) `elapsed_ms`.
72///
73/// # Example
74///
75/// ```no_run
76/// use dev_async::run_with_timeout;
77/// use std::time::Duration;
78///
79/// # async fn ex() {
80/// let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
81/// assert!(check.has_tag("async"));
82/// # }
83/// ```
84pub async fn run_with_timeout<F, T>(
85    name: impl Into<String>,
86    timeout: Duration,
87    fut: F,
88) -> CheckResult
89where
90    F: Future<Output = T>,
91{
92    let name = name.into();
93    let started = Instant::now();
94    match tokio::time::timeout(timeout, fut).await {
95        Ok(_value) => {
96            let elapsed = started.elapsed();
97            let mut c = CheckResult::pass(format!("async::{name}"))
98                .with_duration_ms(elapsed.as_millis() as u64);
99            c.tags = vec!["async".to_string()];
100            c.evidence = vec![
101                Evidence::numeric("elapsed_ms", elapsed.as_millis() as f64),
102                Evidence::numeric("timeout_ms", timeout.as_millis() as f64),
103            ];
104            c
105        }
106        Err(_elapsed) => {
107            let mut c = CheckResult::fail(format!("async::{name}"), Severity::Error)
108                .with_detail(format!("future did not complete within {timeout:?}"));
109            c.tags = vec![
110                "async".to_string(),
111                "timeout".to_string(),
112                "regression".to_string(),
113            ];
114            c.evidence = vec![Evidence::numeric("timeout_ms", timeout.as_millis() as f64)];
115            c
116        }
117    }
118}
119
120/// Verify that all spawned tasks finish within the given timeout.
121///
122/// Pass a vector of `JoinHandle`s. Returns one [`CheckResult`] per task,
123/// each tagged `async` with numeric `Evidence` for the index and
124/// timeout / elapsed.
125///
126/// Verdicts:
127/// - Task completed -> `Pass`, with `elapsed_ms` evidence.
128/// - Task panicked or was cancelled -> `Fail (Critical)`, with
129///   `task_panicked` tag.
130/// - Task did not finish in time -> `Fail (Error)`, with `timeout` tag.
131pub async fn join_all_with_timeout<T>(
132    name: impl Into<String>,
133    timeout: Duration,
134    handles: Vec<tokio::task::JoinHandle<T>>,
135) -> Vec<CheckResult> {
136    let name = name.into();
137    let mut results = Vec::with_capacity(handles.len());
138    for (i, h) in handles.into_iter().enumerate() {
139        let task_name = format!("async::{name}::task{i}");
140        let started = Instant::now();
141        let evidence_base = vec![
142            Evidence::numeric("task_index", i as f64),
143            Evidence::numeric("timeout_ms", timeout.as_millis() as f64),
144        ];
145        let result = match tokio::time::timeout(timeout, h).await {
146            Ok(Ok(_)) => {
147                let elapsed = started.elapsed();
148                let mut c =
149                    CheckResult::pass(task_name).with_duration_ms(elapsed.as_millis() as u64);
150                c.tags = vec!["async".to_string()];
151                c.evidence = {
152                    let mut e = evidence_base;
153                    e.push(Evidence::numeric("elapsed_ms", elapsed.as_millis() as f64));
154                    e
155                };
156                c
157            }
158            Ok(Err(join_err)) => {
159                let mut c = CheckResult::fail(task_name, Severity::Critical)
160                    .with_detail(format!("task panicked or was cancelled: {join_err}"));
161                c.tags = vec![
162                    "async".to_string(),
163                    "task_panicked".to_string(),
164                    "regression".to_string(),
165                ];
166                c.evidence = evidence_base;
167                c
168            }
169            Err(_) => {
170                let mut c = CheckResult::fail(task_name, Severity::Error)
171                    .with_detail(format!("task did not complete within {timeout:?}"));
172                c.tags = vec![
173                    "async".to_string(),
174                    "timeout".to_string(),
175                    "regression".to_string(),
176                ];
177                c.evidence = evidence_base;
178                c
179            }
180        };
181        results.push(result);
182    }
183    results
184}
185
/// Async counterpart of a check: a value that, once consumed, yields a
/// future resolving to its verdict.
///
/// `dev-report::Producer` is a synchronous interface and so cannot be
/// implemented directly by harnesses that need to `.await`.
/// `AsyncCheck` fills that gap.
///
/// # Example
///
/// ```no_run
/// use dev_async::AsyncCheck;
/// use dev_report::CheckResult;
/// use std::future::Future;
/// use std::pin::Pin;
///
/// struct PingCheck;
/// impl AsyncCheck for PingCheck {
///     type Output = CheckResult;
///     type Fut = Pin<Box<dyn Future<Output = CheckResult> + Send>>;
///     fn run(self) -> Self::Fut {
///         Box::pin(async move { CheckResult::pass("ping") })
///     }
/// }
/// ```
pub trait AsyncCheck {
    /// What the check resolves to; in most cases a `CheckResult`.
    type Output;

    /// Concrete future type handed back by [`run`](Self::run).
    type Fut: Future<Output = Self::Output>;

    /// Consume the check and return the future that performs it.
    fn run(self) -> Self::Fut;
}
216
217/// An async producer that builds a `Report`.
218///
219/// `dev-report::Producer` is synchronous. `AsyncProducer` is the
220/// async equivalent that returns a future. Bridge to a sync
221/// `Producer` via [`BlockingAsyncProducer`].
222pub trait AsyncProducer {
223    /// The future returned by `produce`.
224    type Fut: Future<Output = Report>;
225    /// Run the producer and return a finalized [`Report`].
226    fn produce(self) -> Self::Fut;
227}
228
229/// Adapter that wraps an `async fn` returning a [`Report`] and
230/// implements `dev_report::Producer` by calling
231/// `tokio::runtime::Handle::current().block_on(...)`.
232///
233/// MUST be invoked from a sync context that *is not* itself running
234/// inside a `current_thread` runtime. Calling `block_on` from inside
235/// an async runtime would deadlock; if you need that, use
236/// [`AsyncProducer`] directly without going through `Producer`.
237///
238/// # Example
239///
240/// ```no_run
241/// use dev_async::{run_with_timeout, BlockingAsyncProducer};
242/// use dev_report::{Producer, Report};
243/// use std::time::Duration;
244///
245/// fn build_report() -> impl std::future::Future<Output = Report> {
246///     async {
247///         let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
248///         let mut r = Report::new("crate", "0.1.0").with_producer("dev-async");
249///         r.push(check);
250///         r.finish();
251///         r
252///     }
253/// }
254///
255/// // From a sync test or main:
256/// // let rt = tokio::runtime::Runtime::new().unwrap();
257/// // let handle = rt.handle().clone();
258/// // let producer = BlockingAsyncProducer::new(handle, build_report);
259/// // let report = producer.produce();
260/// ```
261pub struct BlockingAsyncProducer<F, Fut>
262where
263    F: Fn() -> Fut,
264    Fut: Future<Output = Report>,
265{
266    handle: tokio::runtime::Handle,
267    /// Owned runtime, when constructed via `with_new_runtime` /
268    /// `with_current_thread_runtime`. `None` when borrowing an
269    /// externally-supplied handle. Kept alive for the lifetime of the
270    /// producer so `block_on` always has a valid runtime.
271    _owned_runtime: Option<tokio::runtime::Runtime>,
272    factory: F,
273}
274
275impl<F, Fut> BlockingAsyncProducer<F, Fut>
276where
277    F: Fn() -> Fut,
278    Fut: Future<Output = Report>,
279{
280    /// Build a new adapter bound to an externally-supplied `handle`.
281    ///
282    /// `factory` is invoked once per `produce()` call and must return
283    /// a fresh future each time.
284    ///
285    /// Use this when you already have a `tokio::runtime::Handle`
286    /// (e.g. from a long-lived runtime in your test harness). For the
287    /// common case of "I just want to drive an async producer from a
288    /// sync test", prefer [`with_new_runtime`](Self::with_new_runtime).
289    pub fn new(handle: tokio::runtime::Handle, factory: F) -> Self {
290        Self {
291            handle,
292            _owned_runtime: None,
293            factory,
294        }
295    }
296
297    /// Build a new adapter that owns a fresh multi-thread `tokio::runtime::Runtime`.
298    ///
299    /// The runtime lives for the lifetime of the producer and is
300    /// dropped (along with all its workers) when the producer is
301    /// dropped. Use this when you don't already have a runtime and
302    /// just want to drive an async producer from sync code.
303    ///
304    /// # Example
305    ///
306    /// ```no_run
307    /// use dev_async::{run_with_timeout, BlockingAsyncProducer};
308    /// use dev_report::{Producer, Report};
309    /// use std::time::Duration;
310    ///
311    /// let producer = BlockingAsyncProducer::with_new_runtime(|| async {
312    ///     let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
313    ///     let mut r = Report::new("crate", "0.1.0").with_producer("dev-async");
314    ///     r.push(check);
315    ///     r.finish();
316    ///     r
317    /// })
318    /// .expect("build runtime");
319    /// let _report = producer.produce();
320    /// ```
321    pub fn with_new_runtime(factory: F) -> std::io::Result<Self> {
322        let rt = tokio::runtime::Builder::new_multi_thread()
323            .enable_all()
324            .build()?;
325        let handle = rt.handle().clone();
326        Ok(Self {
327            handle,
328            _owned_runtime: Some(rt),
329            factory,
330        })
331    }
332
333    /// Build a new adapter that owns a fresh `current_thread`
334    /// `tokio::runtime::Runtime`.
335    ///
336    /// Lighter-weight than [`with_new_runtime`](Self::with_new_runtime):
337    /// no worker threads are spawned. Suitable for tests and
338    /// single-threaded harnesses.
339    ///
340    /// # Example
341    ///
342    /// ```no_run
343    /// use dev_async::BlockingAsyncProducer;
344    /// use dev_report::{Producer, Report};
345    ///
346    /// let producer = BlockingAsyncProducer::with_current_thread_runtime(|| async {
347    ///     Report::new("c", "0.1.0").with_producer("dev-async")
348    /// })
349    /// .expect("build runtime");
350    /// let _r = producer.produce();
351    /// ```
352    pub fn with_current_thread_runtime(factory: F) -> std::io::Result<Self> {
353        let rt = tokio::runtime::Builder::new_current_thread()
354            .enable_all()
355            .build()?;
356        let handle = rt.handle().clone();
357        Ok(Self {
358            handle,
359            _owned_runtime: Some(rt),
360            factory,
361        })
362    }
363
364    /// Build a new adapter with a runtime configured by the caller.
365    ///
366    /// `configure` receives a `tokio::runtime::Builder` that the
367    /// caller can customize (worker thread count, thread name,
368    /// stack size, IO/time enablement, etc.) before it is built.
369    /// The resulting runtime is owned by the producer.
370    ///
371    /// # Example
372    ///
373    /// ```no_run
374    /// use dev_async::BlockingAsyncProducer;
375    /// use dev_report::{Producer, Report};
376    ///
377    /// let producer = BlockingAsyncProducer::with_runtime_builder(
378    ///     |b| {
379    ///         b.worker_threads(2);
380    ///         b.thread_name("my-async-test");
381    ///         b.enable_all();
382    ///         b
383    ///     },
384    ///     || async { Report::new("c", "0.1.0").with_producer("dev-async") },
385    /// )
386    /// .expect("build runtime");
387    /// let _r = producer.produce();
388    /// ```
389    pub fn with_runtime_builder<C>(configure: C, factory: F) -> std::io::Result<Self>
390    where
391        C: FnOnce(&mut tokio::runtime::Builder) -> &mut tokio::runtime::Builder,
392    {
393        let mut builder = tokio::runtime::Builder::new_multi_thread();
394        configure(&mut builder);
395        let rt = builder.build()?;
396        let handle = rt.handle().clone();
397        Ok(Self {
398            handle,
399            _owned_runtime: Some(rt),
400            factory,
401        })
402    }
403}
404
405impl<F, Fut> Producer for BlockingAsyncProducer<F, Fut>
406where
407    F: Fn() -> Fut,
408    Fut: Future<Output = Report>,
409{
410    fn produce(&self) -> Report {
411        let fut = (self.factory)();
412        self.handle.block_on(fut)
413    }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use dev_report::Verdict;

    /// Build the finished, single-check report used by the producer tests.
    fn finished_report(check: CheckResult) -> Report {
        let mut report = Report::new("c", "0.1.0").with_producer("dev-async");
        report.push(check);
        report.finish();
        report
    }

    // A future that resolves immediately passes and carries both
    // elapsed and timeout evidence.
    #[tokio::test]
    async fn timeout_pass_fast_future() {
        let check = run_with_timeout("fast", Duration::from_millis(500), async {}).await;
        assert_eq!(check.verdict, Verdict::Pass);
        assert!(check.has_tag("async"));
        for wanted in ["elapsed_ms", "timeout_ms"] {
            assert!(check.evidence.iter().any(|e| e.label == wanted));
        }
    }

    // A future that outlives its budget fails with the timeout tags.
    #[tokio::test]
    async fn timeout_fail_slow_future() {
        let slow = async {
            tokio::time::sleep(Duration::from_millis(200)).await;
        };
        let check = run_with_timeout("slow", Duration::from_millis(10), slow).await;
        assert_eq!(check.verdict, Verdict::Fail);
        assert!(check.has_tag("timeout"));
        assert!(check.has_tag("regression"));
    }

    // Two quick tasks both pass and are tagged `async`.
    #[tokio::test]
    async fn join_all_basic() {
        let handles = vec![tokio::spawn(async { 1 }), tokio::spawn(async { 2 })];
        let results = join_all_with_timeout("g", Duration::from_secs(1), handles).await;
        assert_eq!(results.len(), 2);
        for outcome in &results {
            assert_eq!(outcome.verdict, Verdict::Pass);
            assert!(outcome.has_tag("async"));
        }
    }

    // A panicking task is reported as a critical failure.
    #[tokio::test]
    async fn join_all_panic_is_critical() {
        let h = tokio::spawn(async { panic!("oops") });
        let results = join_all_with_timeout("g", Duration::from_secs(1), vec![h]).await;
        assert_eq!(results.len(), 1);
        let only = &results[0];
        assert_eq!(only.verdict, Verdict::Fail);
        assert_eq!(only.severity, Some(Severity::Critical));
        assert!(only.has_tag("task_panicked"));
    }

    // A task that overruns the timeout is reported as an error.
    #[tokio::test]
    async fn join_all_timeout_is_error() {
        let h = tokio::spawn(async {
            tokio::time::sleep(Duration::from_millis(500)).await;
        });
        let results = join_all_with_timeout("g", Duration::from_millis(20), vec![h]).await;
        let first = &results[0];
        assert_eq!(first.verdict, Verdict::Fail);
        assert_eq!(first.severity, Some(Severity::Error));
        assert!(first.has_tag("timeout"));
    }

    // `timeout_ms` evidence is numeric and equals the requested budget.
    #[tokio::test]
    async fn check_evidence_includes_timeout() {
        let check = run_with_timeout("x", Duration::from_millis(50), async {}).await;
        let timeout_evidence = check
            .evidence
            .iter()
            .find(|e| e.label == "timeout_ms")
            .expect("timeout_ms evidence present");
        match timeout_evidence.data {
            dev_report::EvidenceData::Numeric(n) => assert_eq!(n, 50.0),
            _ => panic!("expected numeric"),
        }
    }

    #[test]
    fn blocking_async_producer_with_new_runtime() {
        let producer = BlockingAsyncProducer::with_new_runtime(|| async {
            finished_report(CheckResult::pass("x"))
        })
        .expect("build runtime");
        let report = producer.produce();
        assert_eq!(report.checks.len(), 1);
        assert_eq!(report.overall_verdict(), Verdict::Pass);
    }

    #[test]
    fn blocking_async_producer_with_current_thread_runtime() {
        let producer = BlockingAsyncProducer::with_current_thread_runtime(|| async {
            finished_report(CheckResult::pass("y"))
        })
        .expect("build runtime");
        let report = producer.produce();
        assert_eq!(report.checks.len(), 1);
    }

    #[test]
    fn blocking_async_producer_can_drive_run_with_timeout() {
        let producer = BlockingAsyncProducer::with_current_thread_runtime(|| async {
            let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
            finished_report(check)
        })
        .expect("build runtime");
        let report = producer.produce();
        assert!(matches!(report.overall_verdict(), Verdict::Pass));
    }

    #[test]
    fn blocking_async_producer_with_runtime_builder() {
        let producer = BlockingAsyncProducer::with_runtime_builder(
            |b| {
                b.worker_threads(1);
                b.enable_all();
                b
            },
            || async { finished_report(CheckResult::pass("custom-rt")) },
        )
        .expect("build runtime");
        let report = producer.produce();
        assert_eq!(report.checks.len(), 1);
    }
}
549}