1use serde::{Deserialize, Serialize};
6use std::future::Future;
7use std::time::Duration;
8use thiserror::Error;
9use tokio::time::timeout;
10use tokio_util::sync::CancellationToken;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct TimeoutConfig {
15 #[serde(default = "default_connect_timeout")]
17 pub connect: u64,
18 #[serde(default = "default_request_timeout")]
20 pub request: u64,
21 #[serde(default = "default_response_timeout")]
23 pub response: u64,
24 #[serde(default = "default_idle_timeout")]
26 pub idle: u64,
27}
28
29fn default_connect_timeout() -> u64 {
30 30000
31}
32fn default_request_timeout() -> u64 {
33 120000
34}
35fn default_response_timeout() -> u64 {
36 120000
37}
38fn default_idle_timeout() -> u64 {
39 60000
40}
41
42impl Default for TimeoutConfig {
43 fn default() -> Self {
44 DEFAULT_TIMEOUTS
45 }
46}
47
48pub const DEFAULT_TIMEOUTS: TimeoutConfig = TimeoutConfig {
50 connect: 30000, request: 120000, response: 120000, idle: 60000, };
55
56#[derive(Debug, Error)]
58#[error("Operation timed out after {timeout_ms}ms")]
59pub struct TimeoutError {
60 pub timeout_ms: u64,
62}
63
64#[derive(Debug, Error)]
66#[error("Operation aborted")]
67pub struct AbortError;
68
69pub fn is_timeout_error(error: &dyn std::error::Error) -> bool {
71 error.to_string().contains("timed out") || error.to_string().contains("timeout")
72}
73
74pub fn is_abort_error(error: &dyn std::error::Error) -> bool {
76 error.to_string().contains("abort") || error.to_string().contains("cancel")
77}
78
79pub async fn with_timeout<T, F>(future: F, timeout_ms: u64) -> Result<T, TimeoutError>
81where
82 F: Future<Output = T>,
83{
84 match timeout(Duration::from_millis(timeout_ms), future).await {
85 Ok(result) => Ok(result),
86 Err(_) => Err(TimeoutError { timeout_ms }),
87 }
88}
89
90pub async fn with_timeout_and_cancel<T, F>(
92 future: F,
93 timeout_ms: u64,
94 cancel_token: &CancellationToken,
95) -> Result<T, TimeoutOrAbortError>
96where
97 F: Future<Output = T>,
98{
99 tokio::select! {
100 result = timeout(Duration::from_millis(timeout_ms), future) => {
101 match result {
102 Ok(value) => Ok(value),
103 Err(_) => Err(TimeoutOrAbortError::Timeout(TimeoutError { timeout_ms })),
104 }
105 }
106 _ = cancel_token.cancelled() => {
107 Err(TimeoutOrAbortError::Abort(AbortError))
108 }
109 }
110}
111
112#[derive(Debug, Error)]
114pub enum TimeoutOrAbortError {
115 #[error("{0}")]
116 Timeout(#[from] TimeoutError),
117 #[error("{0}")]
118 Abort(#[from] AbortError),
119}
120
121pub async fn cancelable_delay(
123 ms: u64,
124 cancel_token: Option<&CancellationToken>,
125) -> Result<(), AbortError> {
126 let delay = tokio::time::sleep(Duration::from_millis(ms));
127
128 match cancel_token {
129 Some(token) => {
130 tokio::select! {
131 _ = delay => Ok(()),
132 _ = token.cancelled() => Err(AbortError),
133 }
134 }
135 None => {
136 delay.await;
137 Ok(())
138 }
139 }
140}
141
142pub fn timeout_duration(config: &TimeoutConfig) -> Duration {
144 Duration::from_millis(config.request)
145}
146
147pub fn connect_timeout_duration(config: &TimeoutConfig) -> Duration {
149 Duration::from_millis(config.connect)
150}