use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
#[derive(Debug)]
#[non_exhaustive]
#[must_use = "DetachOutcome carries the operation result; ignoring it discards either the value or the cancel/timeout signal"]
pub enum DetachOutcome<T> {
Completed(T),
Cancelled,
TimedOut,
Panicked(tokio::task::JoinError),
}
#[must_use = "DetachOutcome must be inspected to distinguish completion from cancel/timeout/panic"]
pub async fn run_with_cancel_and_timeout<F, T>(
fut: F,
ct: &CancellationToken,
timeout: Option<Duration>,
) -> DetachOutcome<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
if ct.is_cancelled() {
return DetachOutcome::Cancelled;
}
let mut handle = tokio::spawn(fut.instrument(tracing::Span::current()));
if let Some(t) = timeout {
tokio::select! {
biased;
joined = &mut handle => map_join(joined),
() = ct.cancelled() => DetachOutcome::Cancelled,
() = tokio::time::sleep(t) => DetachOutcome::TimedOut,
}
} else {
tokio::select! {
biased;
joined = &mut handle => map_join(joined),
() = ct.cancelled() => DetachOutcome::Cancelled,
}
}
}
fn map_join<T>(joined: Result<T, tokio::task::JoinError>) -> DetachOutcome<T> {
match joined {
Ok(v) => DetachOutcome::Completed(v),
Err(join_err) => DetachOutcome::Panicked(join_err),
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::panic,
reason = "test-only relaxations; production code uses ? and tracing"
)]
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU32, Ordering},
};
use tokio::time::Duration;
use super::*;
#[tokio::test]
async fn completed_returns_value_when_future_wins() {
let ct = CancellationToken::new();
let out =
run_with_cancel_and_timeout(async { 42_u32 }, &ct, Some(Duration::from_secs(5))).await;
assert!(matches!(out, DetachOutcome::Completed(42)));
}
#[tokio::test]
async fn cancel_outcome_and_detached_future_runs_to_completion() {
let done = Arc::new(AtomicBool::new(false));
let done_clone = Arc::clone(&done);
let ct = CancellationToken::new();
let fut = async move {
tokio::time::sleep(Duration::from_millis(100)).await;
done_clone.store(true, Ordering::SeqCst);
};
let ct_for_cancel = ct.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
ct_for_cancel.cancel();
});
let out = run_with_cancel_and_timeout(fut, &ct, None).await;
assert!(matches!(out, DetachOutcome::Cancelled));
tokio::time::sleep(Duration::from_millis(300)).await;
assert!(
done.load(Ordering::SeqCst),
"detached future must run to completion after cancel"
);
}
#[tokio::test]
async fn timeout_outcome_and_detached_future_runs_to_completion() {
let done = Arc::new(AtomicBool::new(false));
let done_clone = Arc::clone(&done);
let ct = CancellationToken::new();
let fut = async move {
tokio::time::sleep(Duration::from_millis(100)).await;
done_clone.store(true, Ordering::SeqCst);
};
let out = run_with_cancel_and_timeout(fut, &ct, Some(Duration::from_millis(10))).await;
assert!(matches!(out, DetachOutcome::TimedOut));
tokio::time::sleep(Duration::from_millis(300)).await;
assert!(
done.load(Ordering::SeqCst),
"detached future must run to completion after timeout"
);
}
#[tokio::test]
async fn panic_in_detached_future_surfaces_as_panicked() {
let ct = CancellationToken::new();
let out: DetachOutcome<()> = run_with_cancel_and_timeout(
async { panic!("boom") },
&ct,
Some(Duration::from_secs(5)),
)
.await;
assert!(
matches!(out, DetachOutcome::Panicked(ref e) if e.is_panic()),
"expected Panicked carrying a panic JoinError"
);
}
#[tokio::test]
async fn pre_cancelled_token_skips_spawn() {
let started = Arc::new(AtomicU32::new(0));
let started_clone = Arc::clone(&started);
let ct = CancellationToken::new();
ct.cancel();
let out = run_with_cancel_and_timeout(
async move {
started_clone.fetch_add(1, Ordering::SeqCst);
},
&ct,
None,
)
.await;
assert!(matches!(out, DetachOutcome::Cancelled));
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(
started.load(Ordering::SeqCst),
0,
"pre-cancelled token must not spawn the future"
);
}
#[tokio::test]
async fn completion_wins_on_tie_with_cancel() {
for _ in 0..50 {
let ct = CancellationToken::new();
let ct_for_cancel = ct.clone();
tokio::spawn(async move {
ct_for_cancel.cancel();
});
let out = run_with_cancel_and_timeout(async { 7_u32 }, &ct, None).await;
match out {
DetachOutcome::Completed(7) | DetachOutcome::Cancelled => {}
DetachOutcome::Completed(other_val) => {
panic!("unexpected Completed value on tie race: {other_val}")
}
DetachOutcome::TimedOut => {
panic!("unexpected TimedOut on tie race (no timeout configured)")
}
DetachOutcome::Panicked(join_err) => {
panic!("unexpected Panicked on tie race: {join_err}")
}
}
}
}
}