#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
#![warn(rust_2018_idioms)]
use std::future::Future;
use std::time::{Duration, Instant};
use dev_report::{CheckResult, Evidence, Producer, Report, Severity};
pub mod deadlock;
pub mod shutdown;
pub mod tasks;
#[cfg(feature = "block-detect")]
#[cfg_attr(docsrs, doc(cfg(feature = "block-detect")))]
pub mod blocking;
pub async fn run_with_timeout<F, T>(
name: impl Into<String>,
timeout: Duration,
fut: F,
) -> CheckResult
where
F: Future<Output = T>,
{
let name = name.into();
let started = Instant::now();
match tokio::time::timeout(timeout, fut).await {
Ok(_value) => {
let elapsed = started.elapsed();
let mut c = CheckResult::pass(format!("async::{name}"))
.with_duration_ms(elapsed.as_millis() as u64);
c.tags = vec!["async".to_string()];
c.evidence = vec![
Evidence::numeric("elapsed_ms", elapsed.as_millis() as f64),
Evidence::numeric("timeout_ms", timeout.as_millis() as f64),
];
c
}
Err(_elapsed) => {
let mut c = CheckResult::fail(format!("async::{name}"), Severity::Error)
.with_detail(format!("future did not complete within {timeout:?}"));
c.tags = vec![
"async".to_string(),
"timeout".to_string(),
"regression".to_string(),
];
c.evidence = vec![Evidence::numeric("timeout_ms", timeout.as_millis() as f64)];
c
}
}
}
/// Joins every handle in `handles` under one shared `timeout` budget and
/// returns one [`CheckResult`] per handle, in input order.
///
/// Checks are named `async::{name}::task{i}`, where `i` is the handle's index.
///
/// The budget starts when this function is first polled. Handles are awaited in
/// order, and each wait is bounded by whatever is left of `timeout`, so the
/// whole call waits for roughly `timeout` at most and a task's verdict does not
/// depend on its position in `handles`. (Restarting the full timeout for every
/// handle would allow up to `handles.len() * timeout` of waiting and would let
/// a late-listed task pass after running far longer than `timeout`.)
///
/// # Returns
///
/// For each handle:
///
/// - Task finished: a passing check tagged `async`. Its duration and
///   `elapsed_ms` evidence are the time spent waiting on *this* handle, which
///   can be near zero for a task that finished while earlier handles were
///   being awaited.
/// - Task returned a join error (panic or cancellation): a failing check with
///   [`Severity::Critical`], tagged `async`, `task_panicked` and `regression`.
/// - Budget exhausted first: a failing check with [`Severity::Error`], tagged
///   `async`, `timeout` and `regression`.
///
/// Every result carries `task_index` and `timeout_ms` numeric evidence.
///
/// NOTE(review): on timeout the handle is dropped rather than aborted; per the
/// tokio `JoinHandle` documentation that detaches the task, so it may keep
/// running after this function returns — confirm this is intended.
pub async fn join_all_with_timeout<T>(
    name: impl Into<String>,
    timeout: Duration,
    handles: Vec<tokio::task::JoinHandle<T>>,
) -> Vec<CheckResult> {
    let name = name.into();
    // One clock for the whole batch; every wait below is charged against it.
    let batch_started = Instant::now();
    let mut results = Vec::with_capacity(handles.len());
    for (i, handle) in handles.into_iter().enumerate() {
        let task_name = format!("async::{name}::task{i}");
        let evidence_base = vec![
            Evidence::numeric("task_index", i as f64),
            Evidence::numeric("timeout_ms", timeout.as_millis() as f64),
        ];
        // Saturates at zero once the budget is spent. `tokio::time::timeout`
        // polls the wrapped future before its timer, so a task that has
        // already finished is still reported as completed.
        let remaining = timeout.saturating_sub(batch_started.elapsed());
        let started = Instant::now();
        let result = match tokio::time::timeout(remaining, handle).await {
            Ok(Ok(_)) => {
                let elapsed = started.elapsed();
                let mut c =
                    CheckResult::pass(task_name).with_duration_ms(elapsed.as_millis() as u64);
                c.tags = vec!["async".to_string()];
                c.evidence = {
                    let mut e = evidence_base;
                    e.push(Evidence::numeric("elapsed_ms", elapsed.as_millis() as f64));
                    e
                };
                c
            }
            Ok(Err(join_err)) => {
                let mut c = CheckResult::fail(task_name, Severity::Critical)
                    .with_detail(format!("task panicked or was cancelled: {join_err}"));
                c.tags = vec![
                    "async".to_string(),
                    "task_panicked".to_string(),
                    "regression".to_string(),
                ];
                c.evidence = evidence_base;
                c
            }
            Err(_) => {
                let mut c = CheckResult::fail(task_name, Severity::Error)
                    .with_detail(format!("task did not complete within {timeout:?}"));
                c.tags = vec![
                    "async".to_string(),
                    "timeout".to_string(),
                    "regression".to_string(),
                ];
                c.evidence = evidence_base;
                c
            }
        };
        results.push(result);
    }
    results
}
/// A check that is executed by awaiting a future.
///
/// Implementors name the future type they return and the value it resolves to.
pub trait AsyncCheck {
/// Value the check's future resolves to.
type Output;
/// Future returned by [`AsyncCheck::run`]; it resolves to [`AsyncCheck::Output`].
type Fut: Future<Output = Self::Output>;
/// Consumes the check and returns the future that performs it.
fn run(self) -> Self::Fut;
}
/// A source of a [`Report`] that is obtained by awaiting a future.
///
/// Unlike the `produce(&self)` method implemented for `Producer` later in this
/// file, `produce` here takes `self` by value.
pub trait AsyncProducer {
/// Future returned by [`AsyncProducer::produce`]; it resolves to a [`Report`].
type Fut: Future<Output = Report>;
/// Consumes the producer and returns the future that yields the report.
fn produce(self) -> Self::Fut;
}
/// Pairs a tokio runtime handle with a factory of report-producing futures.
///
/// `Fut` appears in no field; it is tied to the struct only through the
/// `F: Fn() -> Fut` bound, so the bounds on this definition must stay.
pub struct BlockingAsyncProducer<F, Fut>
where
F: Fn() -> Fut,
Fut: Future<Output = Report>,
{
// Runtime handle the produced future is driven on.
handle: tokio::runtime::Handle,
// Called to build a new future each time a report is requested.
factory: F,
}
impl<F, Fut> BlockingAsyncProducer<F, Fut>
where
F: Fn() -> Fut,
Fut: Future<Output = Report>,
{
pub fn new(handle: tokio::runtime::Handle, factory: F) -> Self {
Self { handle, factory }
}
}
impl<F, Fut> Producer for BlockingAsyncProducer<F, Fut>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Report>,
{
    /// Builds a fresh future from the factory and blocks the calling thread on
    /// the stored runtime handle until it yields its [`Report`].
    ///
    /// # Panics
    ///
    /// Per the tokio documentation, `Handle::block_on` panics when called from
    /// within an asynchronous execution context, so call this from synchronous
    /// code only.
    fn produce(&self) -> Report {
        let Self { handle, factory } = self;
        handle.block_on(factory())
    }
}
// Unit tests for `run_with_timeout` and `join_all_with_timeout`.
#[cfg(test)]
mod tests {
use super::*;
use dev_report::Verdict;
// An immediately-ready future under a 500 ms timeout passes, is tagged
// `async`, and carries both `elapsed_ms` and `timeout_ms` evidence.
#[tokio::test]
async fn timeout_pass_fast_future() {
let check = run_with_timeout("fast", Duration::from_millis(500), async {}).await;
assert_eq!(check.verdict, Verdict::Pass);
assert!(check.has_tag("async"));
let labels: Vec<&str> = check.evidence.iter().map(|e| e.label.as_str()).collect();
assert!(labels.contains(&"elapsed_ms"));
assert!(labels.contains(&"timeout_ms"));
}
// A 200 ms sleep under a 10 ms timeout fails and is tagged `timeout` and
// `regression`.
#[tokio::test]
async fn timeout_fail_slow_future() {
let check = run_with_timeout("slow", Duration::from_millis(10), async {
tokio::time::sleep(Duration::from_millis(200)).await;
})
.await;
assert_eq!(check.verdict, Verdict::Fail);
assert!(check.has_tag("timeout"));
assert!(check.has_tag("regression"));
}
// Two spawned tasks that return immediately yield two passing results, each
// tagged `async`.
#[tokio::test]
async fn join_all_basic() {
let h1 = tokio::spawn(async { 1 });
let h2 = tokio::spawn(async { 2 });
let results = join_all_with_timeout("g", Duration::from_secs(1), vec![h1, h2]).await;
assert_eq!(results.len(), 2);
assert!(results.iter().all(|r| r.verdict == Verdict::Pass));
assert!(results.iter().all(|r| r.has_tag("async")));
}
// A panicking task is reported as a failure with `Critical` severity and the
// `task_panicked` tag.
#[tokio::test]
async fn join_all_panic_is_critical() {
let h = tokio::spawn(async { panic!("oops") });
let results = join_all_with_timeout("g", Duration::from_secs(1), vec![h]).await;
assert_eq!(results.len(), 1);
assert_eq!(results[0].verdict, Verdict::Fail);
assert_eq!(results[0].severity, Some(Severity::Critical));
assert!(results[0].has_tag("task_panicked"));
}
// A task sleeping 500 ms under a 20 ms timeout is reported as a failure with
// `Error` severity and the `timeout` tag.
#[tokio::test]
async fn join_all_timeout_is_error() {
let h = tokio::spawn(async {
tokio::time::sleep(Duration::from_millis(500)).await;
});
let results = join_all_with_timeout("g", Duration::from_millis(20), vec![h]).await;
assert_eq!(results[0].verdict, Verdict::Fail);
assert_eq!(results[0].severity, Some(Severity::Error));
assert!(results[0].has_tag("timeout"));
}
// The `timeout_ms` evidence is numeric and equals the configured timeout in
// milliseconds (50 ms -> 50.0).
#[tokio::test]
async fn check_evidence_includes_timeout() {
let check = run_with_timeout("x", Duration::from_millis(50), async {}).await;
let timeout_evidence = check
.evidence
.iter()
.find(|e| e.label == "timeout_ms")
.expect("timeout_ms evidence present");
if let dev_report::EvidenceData::Numeric(n) = timeout_evidence.data {
assert_eq!(n, 50.0);
} else {
panic!("expected numeric");
}
}
}