Skip to main content

aster/network/
timeout.rs

1//! 网络超时和取消控制
2//!
3//! 支持超时配置和取消令牌
4
5use serde::{Deserialize, Serialize};
6use std::future::Future;
7use std::time::Duration;
8use thiserror::Error;
9use tokio::time::timeout;
10use tokio_util::sync::CancellationToken;
11
12/// 超时配置
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct TimeoutConfig {
15    /// 连接超时(毫秒)
16    #[serde(default = "default_connect_timeout")]
17    pub connect: u64,
18    /// 请求超时(毫秒)
19    #[serde(default = "default_request_timeout")]
20    pub request: u64,
21    /// 响应超时(毫秒)
22    #[serde(default = "default_response_timeout")]
23    pub response: u64,
24    /// Socket 空闲超时(毫秒)
25    #[serde(default = "default_idle_timeout")]
26    pub idle: u64,
27}
28
29fn default_connect_timeout() -> u64 {
30    30000
31}
32fn default_request_timeout() -> u64 {
33    120000
34}
35fn default_response_timeout() -> u64 {
36    120000
37}
38fn default_idle_timeout() -> u64 {
39    60000
40}
41
42impl Default for TimeoutConfig {
43    fn default() -> Self {
44        DEFAULT_TIMEOUTS
45    }
46}
47
48/// 默认超时配置
49pub const DEFAULT_TIMEOUTS: TimeoutConfig = TimeoutConfig {
50    connect: 30000,   // 30秒
51    request: 120000,  // 2分钟
52    response: 120000, // 2分钟
53    idle: 60000,      // 1分钟
54};
55
56/// 超时错误
57#[derive(Debug, Error)]
58#[error("Operation timed out after {timeout_ms}ms")]
59pub struct TimeoutError {
60    /// 超时时间(毫秒)
61    pub timeout_ms: u64,
62}
63
64/// 取消错误
65#[derive(Debug, Error)]
66#[error("Operation aborted")]
67pub struct AbortError;
68
69/// 检查错误是否为超时错误
70pub fn is_timeout_error(error: &dyn std::error::Error) -> bool {
71    error.to_string().contains("timed out") || error.to_string().contains("timeout")
72}
73
74/// 检查错误是否为取消错误
75pub fn is_abort_error(error: &dyn std::error::Error) -> bool {
76    error.to_string().contains("abort") || error.to_string().contains("cancel")
77}
78
79/// 带超时执行异步操作
80pub async fn with_timeout<T, F>(future: F, timeout_ms: u64) -> Result<T, TimeoutError>
81where
82    F: Future<Output = T>,
83{
84    match timeout(Duration::from_millis(timeout_ms), future).await {
85        Ok(result) => Ok(result),
86        Err(_) => Err(TimeoutError { timeout_ms }),
87    }
88}
89
90/// 带超时和取消执行异步操作
91pub async fn with_timeout_and_cancel<T, F>(
92    future: F,
93    timeout_ms: u64,
94    cancel_token: &CancellationToken,
95) -> Result<T, TimeoutOrAbortError>
96where
97    F: Future<Output = T>,
98{
99    tokio::select! {
100        result = timeout(Duration::from_millis(timeout_ms), future) => {
101            match result {
102                Ok(value) => Ok(value),
103                Err(_) => Err(TimeoutOrAbortError::Timeout(TimeoutError { timeout_ms })),
104            }
105        }
106        _ = cancel_token.cancelled() => {
107            Err(TimeoutOrAbortError::Abort(AbortError))
108        }
109    }
110}
111
112/// 超时或取消错误
113#[derive(Debug, Error)]
114pub enum TimeoutOrAbortError {
115    #[error("{0}")]
116    Timeout(#[from] TimeoutError),
117    #[error("{0}")]
118    Abort(#[from] AbortError),
119}
120
121/// 可取消的延迟
122pub async fn cancelable_delay(
123    ms: u64,
124    cancel_token: Option<&CancellationToken>,
125) -> Result<(), AbortError> {
126    let delay = tokio::time::sleep(Duration::from_millis(ms));
127
128    match cancel_token {
129        Some(token) => {
130            tokio::select! {
131                _ = delay => Ok(()),
132                _ = token.cancelled() => Err(AbortError),
133            }
134        }
135        None => {
136            delay.await;
137            Ok(())
138        }
139    }
140}
141
142/// 创建超时 Duration
143pub fn timeout_duration(config: &TimeoutConfig) -> Duration {
144    Duration::from_millis(config.request)
145}
146
147/// 创建连接超时 Duration
148pub fn connect_timeout_duration(config: &TimeoutConfig) -> Duration {
149    Duration::from_millis(config.connect)
150}