cyfs_lib/requestor/
requestor.rs1use crate::base::CYFS_CURRENT_API_EDITION;
2use cyfs_base::*;
3
4use async_std::net::SocketAddr;
5use async_trait::async_trait;
6use http_types::{Request, Response};
7use std::sync::Arc;
8use std::time::Duration;
9
10pub enum HttpRequestConnectionInfo {
11 None,
12 Tcp((SocketAddr, SocketAddr)),
13 Bdt((Endpoint, Endpoint)),
14}
15
16#[async_trait]
17pub trait HttpRequestor: Send + Sync {
18 fn remote_addr(&self) -> String;
19 fn remote_device(&self) -> Option<DeviceId>;
20
21 fn add_default_headers(&self, mut req: Request) -> Request {
22 req.insert_header(CYFS_API_EDITION, CYFS_CURRENT_API_EDITION.to_string());
23 req
24 }
25
26 async fn request(&self, req: Request) -> BuckyResult<Response> {
27 self.request_ext(&mut Some(req), None).await
28 }
29
30 async fn request_with_conn_info(
31 &self,
32 req: Request,
33 conn_info: Option<&mut HttpRequestConnectionInfo>,
34 ) -> BuckyResult<Response> {
35 self.request_ext(&mut Some(req), conn_info).await
36 }
37
38 async fn request_with_conn_info_timeout(
39 &self,
40 req: Request,
41 conn_info: Option<&mut HttpRequestConnectionInfo>,
42 dur: Duration,
43 ) -> BuckyResult<Response> {
44 self.request_ext_timeout(&mut Some(req), dur, conn_info)
45 .await
46 }
47
48 async fn request_timeout(&self, req: Request, dur: Duration) -> BuckyResult<Response> {
49 self.request_ext_timeout(&mut Some(req), dur, None).await
50 }
51
52 async fn request_ext_timeout(
53 &self,
54 req: &mut Option<Request>,
55 dur: Duration,
56 conn_info: Option<&mut HttpRequestConnectionInfo>,
57 ) -> BuckyResult<Response> {
58 match async_std::future::timeout(dur, self.request_ext(req, conn_info)).await {
59 Ok(ret) => ret,
60 Err(async_std::future::TimeoutError { .. }) => {
61 let msg = format!("request timeout, remote={}", self.remote_addr(),);
62 error!("{}", msg);
63
64 Err(BuckyError::new(BuckyErrorCode::Timeout, msg))
65 }
66 }
67 }
68
69 async fn request_ext(
70 &self,
71 req: &mut Option<Request>,
72 conn_info: Option<&mut HttpRequestConnectionInfo>,
73 ) -> BuckyResult<Response>;
74
75 fn clone_requestor(&self) -> Box<dyn HttpRequestor>;
76
77 async fn stop(&self);
78}
79
80pub type HttpRequestorRef = Arc<Box<dyn HttpRequestor>>;
81
/// How long `RequestorWithRetry` waits before each retry.
#[derive(Clone, PartialEq, Eq)]
pub enum RequestorRetryStrategy {
    /// The wait before the k-th retry is `2 * k` seconds (2, 4, 6, ...).
    EqualInterval,
    /// The wait before the k-th retry is `2^k` seconds (2, 4, 8, ...).
    ExpInterval,
}
87
88pub struct RequestorWithRetry {
89 requestor: Box<dyn HttpRequestor>,
90 retry_count: u32,
91 retry_strategy: RequestorRetryStrategy,
92 timeout: Option<Duration>,
93}
94
95impl Clone for RequestorWithRetry {
96 fn clone(&self) -> Self {
97 Self {
98 requestor: self.requestor.clone_requestor(),
99 retry_count: self.retry_count,
100 retry_strategy: self.retry_strategy.clone(),
101 timeout: None,
102 }
103 }
104}
105
106impl RequestorWithRetry {
107 pub fn new(
108 requestor: Box<dyn HttpRequestor>,
109 retry_count: u32,
110 retry_strategy: RequestorRetryStrategy,
111 timeout: Option<Duration>,
112 ) -> Self {
113 Self {
114 requestor,
115 retry_count,
116 retry_strategy,
117 timeout,
118 }
119 }
120}
121
122#[async_trait]
123impl HttpRequestor for RequestorWithRetry {
124 async fn request_ext(
125 &self,
126 req: &mut Option<Request>,
127 mut conn_info: Option<&mut HttpRequestConnectionInfo>,
128 ) -> BuckyResult<Response> {
129 let mut retry_count = 0;
130 let info = &mut conn_info;
131 loop {
132 let ret = if let Some(t) = &self.timeout {
133 self.requestor
134 .request_ext_timeout(req, t.clone(), info.as_deref_mut())
135 .await
136 } else {
137 self.requestor.request_ext(req, info.as_deref_mut()).await
138 };
139
140 match ret {
141 Ok(resp) => break Ok(resp),
142 Err(e) => {
143 if e.code() != BuckyErrorCode::ConnectFailed {
146 break Err(e);
147 }
148
149 assert!(req.is_some());
151
152 if retry_count >= self.retry_count {
153 warn!(
154 "bdt connect to {} extend max retry limit",
155 self.requestor.remote_addr()
156 );
157 break Err(e);
158 }
159
160 retry_count += 1;
161 let secd = match self.retry_strategy {
162 RequestorRetryStrategy::EqualInterval => 2_u64 * retry_count as u64,
163 RequestorRetryStrategy::ExpInterval => 2_u64.pow(retry_count),
164 };
165
166 warn!(
167 "bdt connect to {} error, now will retry after {} secs",
168 self.requestor.remote_addr(),
169 secd
170 );
171
172 async_std::task::sleep(Duration::from_secs(secd)).await;
173 }
174 }
175 }
176 }
177
178 fn remote_addr(&self) -> String {
179 self.requestor.remote_addr()
180 }
181
182 fn remote_device(&self) -> Option<DeviceId> {
183 self.requestor.remote_device()
184 }
185
186 fn clone_requestor(&self) -> Box<dyn HttpRequestor> {
187 self.requestor.clone_requestor()
188 }
189
190 async fn stop(&self) {
191 self.requestor.stop().await
192 }
193}