cyfs_lib/requestor/
requestor.rs

1use crate::base::CYFS_CURRENT_API_EDITION;
2use cyfs_base::*;
3
4use async_std::net::SocketAddr;
5use async_trait::async_trait;
6use http_types::{Request, Response};
7use std::sync::Arc;
8use std::time::Duration;
9
/// Connection details that a requestor can report back to the caller of
/// `request_ext` / `request_with_conn_info`.
pub enum HttpRequestConnectionInfo {
    /// No connection information is available.
    None,
    /// The request went over TCP; carries a pair of socket addresses.
    // NOTE(review): presumably (local, remote) — confirm against the implementors.
    Tcp((SocketAddr, SocketAddr)),
    /// The request went over BDT; carries a pair of endpoints.
    // NOTE(review): presumably (local, remote) — confirm against the implementors.
    Bdt((Endpoint, Endpoint)),
}
15
16#[async_trait]
17pub trait HttpRequestor: Send + Sync {
18    fn remote_addr(&self) -> String;
19    fn remote_device(&self) -> Option<DeviceId>;
20
21    fn add_default_headers(&self, mut req: Request) -> Request {
22        req.insert_header(CYFS_API_EDITION, CYFS_CURRENT_API_EDITION.to_string());
23        req
24    }
25
26    async fn request(&self, req: Request) -> BuckyResult<Response> {
27        self.request_ext(&mut Some(req), None).await
28    }
29
30    async fn request_with_conn_info(
31        &self,
32        req: Request,
33        conn_info: Option<&mut HttpRequestConnectionInfo>,
34    ) -> BuckyResult<Response> {
35        self.request_ext(&mut Some(req), conn_info).await
36    }
37
38    async fn request_with_conn_info_timeout(
39        &self,
40        req: Request,
41        conn_info: Option<&mut HttpRequestConnectionInfo>,
42        dur: Duration,
43    ) -> BuckyResult<Response> {
44        self.request_ext_timeout(&mut Some(req), dur, conn_info)
45            .await
46    }
47
48    async fn request_timeout(&self, req: Request, dur: Duration) -> BuckyResult<Response> {
49        self.request_ext_timeout(&mut Some(req), dur, None).await
50    }
51
52    async fn request_ext_timeout(
53        &self,
54        req: &mut Option<Request>,
55        dur: Duration,
56        conn_info: Option<&mut HttpRequestConnectionInfo>,
57    ) -> BuckyResult<Response> {
58        match async_std::future::timeout(dur, self.request_ext(req, conn_info)).await {
59            Ok(ret) => ret,
60            Err(async_std::future::TimeoutError { .. }) => {
61                let msg = format!("request timeout, remote={}", self.remote_addr(),);
62                error!("{}", msg);
63
64                Err(BuckyError::new(BuckyErrorCode::Timeout, msg))
65            }
66        }
67    }
68
69    async fn request_ext(
70        &self,
71        req: &mut Option<Request>,
72        conn_info: Option<&mut HttpRequestConnectionInfo>,
73    ) -> BuckyResult<Response>;
74
75    fn clone_requestor(&self) -> Box<dyn HttpRequestor>;
76
77    async fn stop(&self);
78}
79
/// Reference-counted, shareable handle to a boxed [`HttpRequestor`].
pub type HttpRequestorRef = Arc<Box<dyn HttpRequestor>>;
81
/// How long to wait between successive connection retries.
///
/// For the `n`-th retry (counting from 1) the retry loop sleeps:
///
/// * [`EqualInterval`](RequestorRetryStrategy::EqualInterval): `2 * n` seconds
///   (2, 4, 6, …) — note that, despite the name, the delay grows linearly.
/// * [`ExpInterval`](RequestorRetryStrategy::ExpInterval): `2^n` seconds
///   (2, 4, 8, …).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RequestorRetryStrategy {
    /// Delay grows by a fixed two-second step per retry.
    EqualInterval,
    /// Delay doubles on every retry.
    ExpInterval,
}
87
/// Wraps another [`HttpRequestor`] and retries requests that fail with
/// `BuckyErrorCode::ConnectFailed`.
pub struct RequestorWithRetry {
    /// The underlying requestor that actually sends the request.
    requestor: Box<dyn HttpRequestor>,
    /// Maximum number of retries performed after the initial attempt.
    retry_count: u32,
    /// Determines the delay between retries.
    retry_strategy: RequestorRetryStrategy,
    /// Per-attempt timeout; `None` means attempts are not time-limited.
    timeout: Option<Duration>,
}
94
95impl Clone for RequestorWithRetry {
96    fn clone(&self) -> Self {
97        Self {
98            requestor: self.requestor.clone_requestor(),
99            retry_count: self.retry_count,
100            retry_strategy: self.retry_strategy.clone(),
101            timeout: None,
102        }
103    }
104}
105
impl RequestorWithRetry {
    /// Creates a retrying wrapper around `requestor`.
    ///
    /// * `retry_count` — maximum number of retries after the first attempt.
    /// * `retry_strategy` — how the delay between retries is computed.
    /// * `timeout` — optional timeout applied to each individual attempt.
    pub fn new(
        requestor: Box<dyn HttpRequestor>,
        retry_count: u32,
        retry_strategy: RequestorRetryStrategy,
        timeout: Option<Duration>,
    ) -> Self {
        Self {
            requestor,
            retry_count,
            retry_strategy,
            timeout,
        }
    }
}
121
122#[async_trait]
123impl HttpRequestor for RequestorWithRetry {
124    async fn request_ext(
125        &self,
126        req: &mut Option<Request>,
127        mut conn_info: Option<&mut HttpRequestConnectionInfo>,
128    ) -> BuckyResult<Response> {
129        let mut retry_count = 0;
130        let info = &mut conn_info;
131        loop {
132            let ret = if let Some(t) = &self.timeout {
133                self.requestor
134                    .request_ext_timeout(req, t.clone(), info.as_deref_mut())
135                    .await
136            } else {
137                self.requestor.request_ext(req, info.as_deref_mut()).await
138            };
139
140            match ret {
141                Ok(resp) => break Ok(resp),
142                Err(e) => {
143                    // 只对连接失败的错误进行重试
144                    // 请求超时的错误不能重试,req已经被消耗掉了
145                    if e.code() != BuckyErrorCode::ConnectFailed {
146                        break Err(e);
147                    }
148
149                    // 连接失败情况下,req不会被消耗,所以可以用做下次重试
150                    assert!(req.is_some());
151
152                    if retry_count >= self.retry_count {
153                        warn!(
154                            "bdt connect to {} extend max retry limit",
155                            self.requestor.remote_addr()
156                        );
157                        break Err(e);
158                    }
159
160                    retry_count += 1;
161                    let secd = match self.retry_strategy {
162                        RequestorRetryStrategy::EqualInterval => 2_u64 * retry_count as u64,
163                        RequestorRetryStrategy::ExpInterval => 2_u64.pow(retry_count),
164                    };
165
166                    warn!(
167                        "bdt connect to {} error, now will retry after {} secs",
168                        self.requestor.remote_addr(),
169                        secd
170                    );
171
172                    async_std::task::sleep(Duration::from_secs(secd)).await;
173                }
174            }
175        }
176    }
177
178    fn remote_addr(&self) -> String {
179        self.requestor.remote_addr()
180    }
181
182    fn remote_device(&self) -> Option<DeviceId> {
183        self.requestor.remote_device()
184    }
185
186    fn clone_requestor(&self) -> Box<dyn HttpRequestor> {
187        self.requestor.clone_requestor()
188    }
189
190    async fn stop(&self) {
191        self.requestor.stop().await
192    }
193}