1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
//! Sleep utilities with abort signal support.
//!
//! Provides async sleep functions that respect tokio cancellation.
use std::time::Duration;
use tokio::time::sleep as tokio_sleep;
/// Sleep for the specified duration, respecting the abort signal.
///
/// Returns `Ok(())` if the sleep completed normally, or `Err` if aborted.
///
/// # Arguments
/// * `ms` - Duration to sleep in milliseconds
/// * `signal` - Optional abort signal (can be any Future that resolves to indicate abort)
pub async fn sleep(ms: u64, signal: impl std::future::Future<Output = ()>) -> Result<(), SleepError> {
sleep_ms_with_abort(ms, signal).await
}
/// Sleep for the specified duration with abort signal support.
///
/// This is the underlying implementation that uses `tokio::select!`.
async fn sleep_ms_with_abort(
ms: u64,
abort: impl std::future::Future<Output = ()>,
) -> Result<(), SleepError> {
tokio::select! {
_ = tokio_sleep(Duration::from_millis(ms)) => {
Ok(())
}
_ = abort => {
Err(SleepError::Aborted)
}
}
}
/// Sleep for a duration with optional early return on abort signal.
///
/// This variant checks the abort signal at the start and returns immediately
/// if already aborted, then sleeps for the remaining time.
///
/// # Arguments
/// * `ms` - Duration to sleep in milliseconds
/// * `abort` - Abort signal (returns immediately if already triggered)
pub async fn sleep_until_aborted(
ms: u64,
abort: impl std::future::Future<Output = ()>,
) -> Result<(), SleepError> {
tokio::select! {
biased; // Check abort first
_ = abort => {
Err(SleepError::Aborted)
}
_ = tokio_sleep(Duration::from_millis(ms)) => {
Ok(())
}
}
}
/// Sleep for a duration, returning `true` if completed or `false` if aborted.
pub async fn sleep_or_abort(ms: u64, abort: impl std::future::Future<Output = ()>) -> bool {
tokio::select! {
_ = tokio_sleep(Duration::from_millis(ms)) => true,
_ = abort => false,
}
}
/// Errors that can occur during sleep operations
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SleepError {
/// The sleep was aborted by the signal
Aborted,
/// The timer had an error (usually shouldn't happen)
TimerError,
}
impl std::fmt::Display for SleepError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SleepError::Aborted => write!(f, "sleep aborted"),
SleepError::TimerError => write!(f, "timer error"),
}
}
}
impl std::error::Error for SleepError {}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_sleep_completes() {
let result = sleep(10, std::future::pending::<()>()).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_sleep_aborted() {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
// Spawn a task to abort after a short delay
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
let _ = tx.send(());
});
// Convert receiver to a future that resolves to ()
let abort = async {
let _ = rx.await;
};
let result = sleep(1000, abort).await;
assert!(result.is_err());
assert_eq!(result.unwrap_err(), SleepError::Aborted);
}
#[tokio::test]
async fn test_sleep_or_abort_completes() {
let result = sleep_or_abort(10, std::future::pending::<()>()).await;
assert!(result);
}
#[tokio::test]
async fn test_sleep_or_abort_aborts() {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
let _ = tx.send(());
});
// Convert receiver to a future that resolves to ()
let abort = async {
let _ = rx.await;
};
let result = sleep_or_abort(1000, abort).await;
assert!(!result);
}
}