dev_async/lib.rs
1//! # dev-async
2//!
3//! Async-specific validation for Rust. Deadlocks, task leaks, hung
4//! futures, graceful shutdown. Part of the `dev-*` verification suite.
5//!
6//! Async Rust fails in subtle ways that synchronous unit tests can't
7//! catch: a future that never completes, a task that gets dropped
8//! without cleanup, a shutdown that hangs because one worker is stuck
9//! in a blocking call. `dev-async` provides primitives for catching
10//! these issues programmatically.
11//!
12//! ## Quick example
13//!
14//! Run a future with a hard timeout. If it doesn't finish in time, you
15//! get a `Fail` verdict, not a hang.
16//!
17//! ```no_run
18//! use dev_async::run_with_timeout;
19//! use std::time::Duration;
20//!
21//! # async fn example() {
22//! let _check = run_with_timeout(
23//! "user_login",
24//! Duration::from_secs(2),
25//! async { do_login().await }
26//! ).await;
27//! # }
28//! # async fn do_login() {}
29//! ```
30//!
31//! ## Modules
32//!
33//! - [`deadlock`] — `try_lock_with_timeout` helpers.
34//! - [`tasks`] — `TrackedTaskGroup` for leak detection.
35//! - [`shutdown`] — `ShutdownProbe` for graceful-shutdown verification.
36//! - [`cancellation_safety`] — `check_cancel_safe` for verifying that
37//! futures dropped mid-poll leave observable state consistent.
38//! - `blocking` (feature `block-detect`) — heuristic blocking-call
39//! detection inside async tasks (visible in rustdoc when the
40//! feature is enabled).
41
42#![cfg_attr(docsrs, feature(doc_cfg))]
43#![warn(missing_docs)]
44#![warn(rust_2018_idioms)]
45
46use std::future::Future;
47use std::time::{Duration, Instant};
48
49use dev_report::{CheckResult, Evidence, Producer, Report, Severity};
50
51pub mod cancellation_safety;
52pub mod deadlock;
53pub mod shutdown;
54pub mod tasks;
55
56#[cfg(feature = "block-detect")]
57#[cfg_attr(docsrs, doc(cfg(feature = "block-detect")))]
58pub mod blocking;
59
60/// Run a future with a hard timeout. Produces a [`CheckResult`] tagged
61/// `async`.
62///
63/// If the future completes before the timeout, the verdict is `Pass`
64/// and the duration is recorded.
65///
66/// If the future does not complete in time, the verdict is `Fail` with
67/// severity `Error`. The future itself is dropped (cancelled) when the
68/// timeout expires.
69///
70/// The returned `CheckResult` carries numeric `Evidence` for
71/// `timeout_ms` and (on the pass path) `elapsed_ms`.
72///
73/// # Example
74///
75/// ```no_run
76/// use dev_async::run_with_timeout;
77/// use std::time::Duration;
78///
79/// # async fn ex() {
80/// let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
81/// assert!(check.has_tag("async"));
82/// # }
83/// ```
84pub async fn run_with_timeout<F, T>(
85 name: impl Into<String>,
86 timeout: Duration,
87 fut: F,
88) -> CheckResult
89where
90 F: Future<Output = T>,
91{
92 let name = name.into();
93 let started = Instant::now();
94 match tokio::time::timeout(timeout, fut).await {
95 Ok(_value) => {
96 let elapsed = started.elapsed();
97 let mut c = CheckResult::pass(format!("async::{name}"))
98 .with_duration_ms(elapsed.as_millis() as u64);
99 c.tags = vec!["async".to_string()];
100 c.evidence = vec![
101 Evidence::numeric("elapsed_ms", elapsed.as_millis() as f64),
102 Evidence::numeric("timeout_ms", timeout.as_millis() as f64),
103 ];
104 c
105 }
106 Err(_elapsed) => {
107 let mut c = CheckResult::fail(format!("async::{name}"), Severity::Error)
108 .with_detail(format!("future did not complete within {timeout:?}"));
109 c.tags = vec![
110 "async".to_string(),
111 "timeout".to_string(),
112 "regression".to_string(),
113 ];
114 c.evidence = vec![Evidence::numeric("timeout_ms", timeout.as_millis() as f64)];
115 c
116 }
117 }
118}
119
120/// Verify that all spawned tasks finish within the given timeout.
121///
122/// Pass a vector of `JoinHandle`s. Returns one [`CheckResult`] per task,
123/// each tagged `async` with numeric `Evidence` for the index and
124/// timeout / elapsed.
125///
126/// Verdicts:
127/// - Task completed -> `Pass`, with `elapsed_ms` evidence.
128/// - Task panicked or was cancelled -> `Fail (Critical)`, with
129/// `task_panicked` tag.
130/// - Task did not finish in time -> `Fail (Error)`, with `timeout` tag.
131pub async fn join_all_with_timeout<T>(
132 name: impl Into<String>,
133 timeout: Duration,
134 handles: Vec<tokio::task::JoinHandle<T>>,
135) -> Vec<CheckResult> {
136 let name = name.into();
137 let mut results = Vec::with_capacity(handles.len());
138 for (i, h) in handles.into_iter().enumerate() {
139 let task_name = format!("async::{name}::task{i}");
140 let started = Instant::now();
141 let evidence_base = vec![
142 Evidence::numeric("task_index", i as f64),
143 Evidence::numeric("timeout_ms", timeout.as_millis() as f64),
144 ];
145 let result = match tokio::time::timeout(timeout, h).await {
146 Ok(Ok(_)) => {
147 let elapsed = started.elapsed();
148 let mut c =
149 CheckResult::pass(task_name).with_duration_ms(elapsed.as_millis() as u64);
150 c.tags = vec!["async".to_string()];
151 c.evidence = {
152 let mut e = evidence_base;
153 e.push(Evidence::numeric("elapsed_ms", elapsed.as_millis() as f64));
154 e
155 };
156 c
157 }
158 Ok(Err(join_err)) => {
159 let mut c = CheckResult::fail(task_name, Severity::Critical)
160 .with_detail(format!("task panicked or was cancelled: {join_err}"));
161 c.tags = vec![
162 "async".to_string(),
163 "task_panicked".to_string(),
164 "regression".to_string(),
165 ];
166 c.evidence = evidence_base;
167 c
168 }
169 Err(_) => {
170 let mut c = CheckResult::fail(task_name, Severity::Error)
171 .with_detail(format!("task did not complete within {timeout:?}"));
172 c.tags = vec![
173 "async".to_string(),
174 "timeout".to_string(),
175 "regression".to_string(),
176 ];
177 c.evidence = evidence_base;
178 c
179 }
180 };
181 results.push(result);
182 }
183 results
184}
185
/// Async counterpart of a check harness: consuming `self` yields a future
/// that resolves to the check's verdict.
///
/// `dev-report::Producer` is synchronous, so it cannot host a harness that
/// needs to `.await`. Implement `AsyncCheck` for those harnesses instead.
///
/// # Example
///
/// ```no_run
/// use dev_async::AsyncCheck;
/// use dev_report::CheckResult;
/// use std::future::Future;
/// use std::pin::Pin;
///
/// struct PingCheck;
/// impl AsyncCheck for PingCheck {
///     type Output = CheckResult;
///     type Fut = Pin<Box<dyn Future<Output = CheckResult> + Send>>;
///     fn run(self) -> Self::Fut {
///         Box::pin(async move { CheckResult::pass("ping") })
///     }
/// }
/// ```
pub trait AsyncCheck {
    /// Value the check resolves to; usually a `CheckResult`.
    type Output;

    /// Future handed back by [`run`](Self::run). It resolves to
    /// [`Output`](Self::Output).
    type Fut: Future<Output = <Self as AsyncCheck>::Output>;

    /// Consume the check and return the future that performs it.
    fn run(self) -> Self::Fut;
}
216
217/// An async producer that builds a `Report`.
218///
219/// `dev-report::Producer` is synchronous. `AsyncProducer` is the
220/// async equivalent that returns a future. Bridge to a sync
221/// `Producer` via [`BlockingAsyncProducer`].
222pub trait AsyncProducer {
223 /// The future returned by `produce`.
224 type Fut: Future<Output = Report>;
225 /// Run the producer and return a finalized [`Report`].
226 fn produce(self) -> Self::Fut;
227}
228
229/// Adapter that wraps an `async fn` returning a [`Report`] and
230/// implements `dev_report::Producer` by calling
231/// `tokio::runtime::Handle::current().block_on(...)`.
232///
233/// MUST be invoked from a sync context that *is not* itself running
234/// inside a `current_thread` runtime. Calling `block_on` from inside
235/// an async runtime would deadlock; if you need that, use
236/// [`AsyncProducer`] directly without going through `Producer`.
237///
238/// # Example
239///
240/// ```no_run
241/// use dev_async::{run_with_timeout, BlockingAsyncProducer};
242/// use dev_report::{Producer, Report};
243/// use std::time::Duration;
244///
245/// fn build_report() -> impl std::future::Future<Output = Report> {
246/// async {
247/// let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
248/// let mut r = Report::new("crate", "0.1.0").with_producer("dev-async");
249/// r.push(check);
250/// r.finish();
251/// r
252/// }
253/// }
254///
255/// // From a sync test or main:
256/// // let rt = tokio::runtime::Runtime::new().unwrap();
257/// // let handle = rt.handle().clone();
258/// // let producer = BlockingAsyncProducer::new(handle, build_report);
259/// // let report = producer.produce();
260/// ```
261pub struct BlockingAsyncProducer<F, Fut>
262where
263 F: Fn() -> Fut,
264 Fut: Future<Output = Report>,
265{
266 handle: tokio::runtime::Handle,
267 /// Owned runtime, when constructed via `with_new_runtime` /
268 /// `with_current_thread_runtime`. `None` when borrowing an
269 /// externally-supplied handle. Kept alive for the lifetime of the
270 /// producer so `block_on` always has a valid runtime.
271 _owned_runtime: Option<tokio::runtime::Runtime>,
272 factory: F,
273}
274
275impl<F, Fut> BlockingAsyncProducer<F, Fut>
276where
277 F: Fn() -> Fut,
278 Fut: Future<Output = Report>,
279{
280 /// Build a new adapter bound to an externally-supplied `handle`.
281 ///
282 /// `factory` is invoked once per `produce()` call and must return
283 /// a fresh future each time.
284 ///
285 /// Use this when you already have a `tokio::runtime::Handle`
286 /// (e.g. from a long-lived runtime in your test harness). For the
287 /// common case of "I just want to drive an async producer from a
288 /// sync test", prefer [`with_new_runtime`](Self::with_new_runtime).
289 pub fn new(handle: tokio::runtime::Handle, factory: F) -> Self {
290 Self {
291 handle,
292 _owned_runtime: None,
293 factory,
294 }
295 }
296
297 /// Build a new adapter that owns a fresh multi-thread `tokio::runtime::Runtime`.
298 ///
299 /// The runtime lives for the lifetime of the producer and is
300 /// dropped (along with all its workers) when the producer is
301 /// dropped. Use this when you don't already have a runtime and
302 /// just want to drive an async producer from sync code.
303 ///
304 /// # Example
305 ///
306 /// ```no_run
307 /// use dev_async::{run_with_timeout, BlockingAsyncProducer};
308 /// use dev_report::{Producer, Report};
309 /// use std::time::Duration;
310 ///
311 /// let producer = BlockingAsyncProducer::with_new_runtime(|| async {
312 /// let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
313 /// let mut r = Report::new("crate", "0.1.0").with_producer("dev-async");
314 /// r.push(check);
315 /// r.finish();
316 /// r
317 /// })
318 /// .expect("build runtime");
319 /// let _report = producer.produce();
320 /// ```
321 pub fn with_new_runtime(factory: F) -> std::io::Result<Self> {
322 let rt = tokio::runtime::Builder::new_multi_thread()
323 .enable_all()
324 .build()?;
325 let handle = rt.handle().clone();
326 Ok(Self {
327 handle,
328 _owned_runtime: Some(rt),
329 factory,
330 })
331 }
332
333 /// Build a new adapter that owns a fresh `current_thread`
334 /// `tokio::runtime::Runtime`.
335 ///
336 /// Lighter-weight than [`with_new_runtime`](Self::with_new_runtime):
337 /// no worker threads are spawned. Suitable for tests and
338 /// single-threaded harnesses.
339 ///
340 /// # Example
341 ///
342 /// ```no_run
343 /// use dev_async::BlockingAsyncProducer;
344 /// use dev_report::{Producer, Report};
345 ///
346 /// let producer = BlockingAsyncProducer::with_current_thread_runtime(|| async {
347 /// Report::new("c", "0.1.0").with_producer("dev-async")
348 /// })
349 /// .expect("build runtime");
350 /// let _r = producer.produce();
351 /// ```
352 pub fn with_current_thread_runtime(factory: F) -> std::io::Result<Self> {
353 let rt = tokio::runtime::Builder::new_current_thread()
354 .enable_all()
355 .build()?;
356 let handle = rt.handle().clone();
357 Ok(Self {
358 handle,
359 _owned_runtime: Some(rt),
360 factory,
361 })
362 }
363
364 /// Build a new adapter with a runtime configured by the caller.
365 ///
366 /// `configure` receives a `tokio::runtime::Builder` that the
367 /// caller can customize (worker thread count, thread name,
368 /// stack size, IO/time enablement, etc.) before it is built.
369 /// The resulting runtime is owned by the producer.
370 ///
371 /// # Example
372 ///
373 /// ```no_run
374 /// use dev_async::BlockingAsyncProducer;
375 /// use dev_report::{Producer, Report};
376 ///
377 /// let producer = BlockingAsyncProducer::with_runtime_builder(
378 /// |b| {
379 /// b.worker_threads(2);
380 /// b.thread_name("my-async-test");
381 /// b.enable_all();
382 /// b
383 /// },
384 /// || async { Report::new("c", "0.1.0").with_producer("dev-async") },
385 /// )
386 /// .expect("build runtime");
387 /// let _r = producer.produce();
388 /// ```
389 pub fn with_runtime_builder<C>(configure: C, factory: F) -> std::io::Result<Self>
390 where
391 C: FnOnce(&mut tokio::runtime::Builder) -> &mut tokio::runtime::Builder,
392 {
393 let mut builder = tokio::runtime::Builder::new_multi_thread();
394 configure(&mut builder);
395 let rt = builder.build()?;
396 let handle = rt.handle().clone();
397 Ok(Self {
398 handle,
399 _owned_runtime: Some(rt),
400 factory,
401 })
402 }
403}
404
405impl<F, Fut> Producer for BlockingAsyncProducer<F, Fut>
406where
407 F: Fn() -> Fut,
408 Fut: Future<Output = Report>,
409{
410 fn produce(&self) -> Report {
411 let fut = (self.factory)();
412 self.handle.block_on(fut)
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419 use dev_report::Verdict;
420
421 #[tokio::test]
422 async fn timeout_pass_fast_future() {
423 let check = run_with_timeout("fast", Duration::from_millis(500), async {}).await;
424 assert_eq!(check.verdict, Verdict::Pass);
425 assert!(check.has_tag("async"));
426 let labels: Vec<&str> = check.evidence.iter().map(|e| e.label.as_str()).collect();
427 assert!(labels.contains(&"elapsed_ms"));
428 assert!(labels.contains(&"timeout_ms"));
429 }
430
431 #[tokio::test]
432 async fn timeout_fail_slow_future() {
433 let check = run_with_timeout("slow", Duration::from_millis(10), async {
434 tokio::time::sleep(Duration::from_millis(200)).await;
435 })
436 .await;
437 assert_eq!(check.verdict, Verdict::Fail);
438 assert!(check.has_tag("timeout"));
439 assert!(check.has_tag("regression"));
440 }
441
442 #[tokio::test]
443 async fn join_all_basic() {
444 let h1 = tokio::spawn(async { 1 });
445 let h2 = tokio::spawn(async { 2 });
446 let results = join_all_with_timeout("g", Duration::from_secs(1), vec![h1, h2]).await;
447 assert_eq!(results.len(), 2);
448 assert!(results.iter().all(|r| r.verdict == Verdict::Pass));
449 assert!(results.iter().all(|r| r.has_tag("async")));
450 }
451
452 #[tokio::test]
453 async fn join_all_panic_is_critical() {
454 let h = tokio::spawn(async { panic!("oops") });
455 let results = join_all_with_timeout("g", Duration::from_secs(1), vec![h]).await;
456 assert_eq!(results.len(), 1);
457 assert_eq!(results[0].verdict, Verdict::Fail);
458 assert_eq!(results[0].severity, Some(Severity::Critical));
459 assert!(results[0].has_tag("task_panicked"));
460 }
461
462 #[tokio::test]
463 async fn join_all_timeout_is_error() {
464 let h = tokio::spawn(async {
465 tokio::time::sleep(Duration::from_millis(500)).await;
466 });
467 let results = join_all_with_timeout("g", Duration::from_millis(20), vec![h]).await;
468 assert_eq!(results[0].verdict, Verdict::Fail);
469 assert_eq!(results[0].severity, Some(Severity::Error));
470 assert!(results[0].has_tag("timeout"));
471 }
472
473 #[tokio::test]
474 async fn check_evidence_includes_timeout() {
475 let check = run_with_timeout("x", Duration::from_millis(50), async {}).await;
476 let timeout_evidence = check
477 .evidence
478 .iter()
479 .find(|e| e.label == "timeout_ms")
480 .expect("timeout_ms evidence present");
481 // The exact match isn't crucial; just ensure shape is numeric.
482 if let dev_report::EvidenceData::Numeric(n) = timeout_evidence.data {
483 assert_eq!(n, 50.0);
484 } else {
485 panic!("expected numeric");
486 }
487 }
488
489 #[test]
490 fn blocking_async_producer_with_new_runtime() {
491 let producer = BlockingAsyncProducer::with_new_runtime(|| async {
492 let mut r = Report::new("c", "0.1.0").with_producer("dev-async");
493 r.push(dev_report::CheckResult::pass("x"));
494 r.finish();
495 r
496 })
497 .expect("build runtime");
498 let report = producer.produce();
499 assert_eq!(report.checks.len(), 1);
500 assert_eq!(report.overall_verdict(), Verdict::Pass);
501 }
502
503 #[test]
504 fn blocking_async_producer_with_current_thread_runtime() {
505 let producer = BlockingAsyncProducer::with_current_thread_runtime(|| async {
506 let mut r = Report::new("c", "0.1.0").with_producer("dev-async");
507 r.push(dev_report::CheckResult::pass("y"));
508 r.finish();
509 r
510 })
511 .expect("build runtime");
512 let report = producer.produce();
513 assert_eq!(report.checks.len(), 1);
514 }
515
516 #[test]
517 fn blocking_async_producer_can_drive_run_with_timeout() {
518 let producer = BlockingAsyncProducer::with_current_thread_runtime(|| async {
519 let check = run_with_timeout("op", Duration::from_millis(50), async {}).await;
520 let mut r = Report::new("c", "0.1.0").with_producer("dev-async");
521 r.push(check);
522 r.finish();
523 r
524 })
525 .expect("build runtime");
526 let report = producer.produce();
527 assert!(matches!(report.overall_verdict(), Verdict::Pass));
528 }
529
530 #[test]
531 fn blocking_async_producer_with_runtime_builder() {
532 let producer = BlockingAsyncProducer::with_runtime_builder(
533 |b| {
534 b.worker_threads(1);
535 b.enable_all();
536 b
537 },
538 || async {
539 let mut r = Report::new("c", "0.1.0").with_producer("dev-async");
540 r.push(dev_report::CheckResult::pass("custom-rt"));
541 r.finish();
542 r
543 },
544 )
545 .expect("build runtime");
546 let report = producer.produce();
547 assert_eq!(report.checks.len(), 1);
548 }
549}