1use core::time::Duration;
2
3pub use async_trait::async_trait;
4pub use http_api_client_endpoint::{Body, Request, Response, http};
5use http_api_client_endpoint::{Endpoint, RetryableEndpoint, RetryableEndpointRetry};
6
7#[async_trait]
8pub trait Client {
9 type RespondError: core::error::Error + Send + Sync + 'static;
10
11 async fn respond(&self, request: Request<Body>) -> Result<Response<Body>, Self::RespondError>;
12
13 async fn respond_endpoint<EP>(
14 &self,
15 endpoint: &EP,
16 ) -> Result<
17 EP::ParseResponseOutput,
18 ClientRespondEndpointError<
19 Self::RespondError,
20 EP::RenderRequestError,
21 EP::ParseResponseError,
22 >,
23 >
24 where
25 EP: Endpoint + Send + Sync,
26 {
27 self.respond_endpoint_with_callback(endpoint, |req| req, |_| {})
28 .await
29 }
30
31 async fn respond_endpoint_with_callback<EP, PreRCB, PostRCB>(
32 &self,
33 endpoint: &EP,
34 pre_request_callback: PreRCB,
35 post_request_callback: PostRCB,
36 ) -> Result<
37 EP::ParseResponseOutput,
38 ClientRespondEndpointError<
39 Self::RespondError,
40 EP::RenderRequestError,
41 EP::ParseResponseError,
42 >,
43 >
44 where
45 EP: Endpoint + Send + Sync,
46 PreRCB: FnMut(Request<Body>) -> Request<Body> + Send,
47 PostRCB: FnMut(&Response<Body>) + Send,
48 {
49 self.respond_dyn_endpoint_with_callback(
50 endpoint,
51 pre_request_callback,
52 post_request_callback,
53 )
54 .await
55 }
56
57 async fn respond_dyn_endpoint<RRE, PRO, PRE>(
58 &self,
59 endpoint: &(
60 dyn Endpoint<
61 RenderRequestError = RRE,
62 ParseResponseOutput = PRO,
63 ParseResponseError = PRE,
64 > + Send
65 + Sync
66 ),
67 ) -> Result<PRO, ClientRespondEndpointError<Self::RespondError, RRE, PRE>>
68 where
69 RRE: core::error::Error + Send + Sync + 'static,
70 PRE: core::error::Error + Send + Sync + 'static,
71 {
72 self.respond_dyn_endpoint_with_callback(endpoint, |req| req, |_| {})
73 .await
74 }
75
76 async fn respond_dyn_endpoint_with_callback<RRE, PRO, PRE, PreRCB, PostRCB>(
77 &self,
78 endpoint: &(
79 dyn Endpoint<
80 RenderRequestError = RRE,
81 ParseResponseOutput = PRO,
82 ParseResponseError = PRE,
83 > + Send
84 + Sync
85 ),
86 mut pre_request_callback: PreRCB,
87 mut post_request_callback: PostRCB,
88 ) -> Result<PRO, ClientRespondEndpointError<Self::RespondError, RRE, PRE>>
89 where
90 RRE: core::error::Error + Send + Sync + 'static,
91 PRE: core::error::Error + Send + Sync + 'static,
92 PreRCB: FnMut(Request<Body>) -> Request<Body> + Send,
93 PostRCB: FnMut(&Response<Body>) + Send,
94 {
95 let request = endpoint
96 .render_request()
97 .map_err(ClientRespondEndpointError::EndpointRenderRequestFailed)?;
98
99 let request = pre_request_callback(request);
100
101 let response = self
102 .respond(request)
103 .await
104 .map_err(ClientRespondEndpointError::RespondFailed)?;
105
106 post_request_callback(&response);
107
108 endpoint
109 .parse_response(response)
110 .map_err(ClientRespondEndpointError::EndpointParseResponseFailed)
111 }
112}
113
114#[async_trait]
115pub trait RetryableClient: Client {
116 async fn sleep(&self, dur: Duration);
117
118 async fn respond_endpoint_until_done<EP>(
119 &self,
120 endpoint: &EP,
121 ) -> Result<
122 EP::ParseResponseOutput,
123 RetryableClientRespondEndpointUntilDoneError<
124 Self::RespondError,
125 EP::RenderRequestError,
126 EP::ParseResponseError,
127 >,
128 >
129 where
130 EP: RetryableEndpoint + Send + Sync,
131 {
132 self.respond_endpoint_until_done_with_callback(endpoint, |req, _| req, |_, _| {})
133 .await
134 }
135
136 async fn respond_endpoint_until_done_with_callback<EP, PreRCB, PostRCB>(
137 &self,
138 endpoint: &EP,
139 mut pre_request_callback: PreRCB,
140 mut post_request_callback: PostRCB,
141 ) -> Result<
142 EP::ParseResponseOutput,
143 RetryableClientRespondEndpointUntilDoneError<
144 Self::RespondError,
145 EP::RenderRequestError,
146 EP::ParseResponseError,
147 >,
148 >
149 where
150 EP: RetryableEndpoint + Send + Sync,
151 PreRCB: FnMut(Request<Body>, Option<&RetryableEndpointRetry<EP::RetryReason>>) -> Request<Body>
152 + Send,
153 PostRCB: FnMut(&Response<Body>, Option<&RetryableEndpointRetry<EP::RetryReason>>) + Send,
154 {
155 let mut retry = None;
156
157 loop {
158 let request = endpoint.render_request(retry.as_ref()).map_err(
159 RetryableClientRespondEndpointUntilDoneError::EndpointRenderRequestFailed,
160 )?;
161
162 let request = pre_request_callback(request, retry.as_ref());
163
164 let response = self
165 .respond(request)
166 .await
167 .map_err(RetryableClientRespondEndpointUntilDoneError::RespondFailed)?;
168
169 post_request_callback(&response, retry.as_ref());
170
171 match endpoint.parse_response(response, retry.as_ref()).map_err(
172 RetryableClientRespondEndpointUntilDoneError::EndpointParseResponseFailed,
173 )? {
174 Ok(output) => return Ok(output),
175 Err(reason) => {
176 let x = retry.get_or_insert(RetryableEndpointRetry::new(0, reason.clone()));
177 x.count += 1;
178 x.reason = reason;
179 }
180 }
181
182 if let Some(retry) = &retry {
184 if retry.count >= endpoint.max_retry_count() {
185 return Err(RetryableClientRespondEndpointUntilDoneError::ReachedMaxRetries);
186 }
187
188 self.sleep(endpoint.next_retry_in(retry)).await;
189 }
190 }
191 }
192}
193
/// Failure of a single, non-retrying endpoint execution.
///
/// The three type parameters are, in order: the client's respond error, the
/// endpoint's render-request error, and the endpoint's parse-response error.
#[derive(Debug)]
pub enum ClientRespondEndpointError<RE, EPRRE, EPPRE>
where
    RE: core::error::Error + Send + Sync + 'static,
    EPRRE: core::error::Error + Send + Sync + 'static,
    EPPRE: core::error::Error + Send + Sync + 'static,
{
    /// The client failed to turn the request into a response.
    RespondFailed(RE),
    /// The endpoint failed to render its request.
    EndpointRenderRequestFailed(EPRRE),
    /// The endpoint failed to parse the response.
    EndpointParseResponseFailed(EPPRE),
}
206impl<RE, EPRRE, EPPRE> core::fmt::Display for ClientRespondEndpointError<RE, EPRRE, EPPRE>
207where
208 RE: core::error::Error + Send + Sync + 'static,
209 EPRRE: core::error::Error + Send + Sync + 'static,
210 EPPRE: core::error::Error + Send + Sync + 'static,
211{
212 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
213 write!(f, "{:?}", self)
214 }
215}
216impl<RE, EPRRE, EPPRE> core::error::Error for ClientRespondEndpointError<RE, EPRRE, EPPRE>
217where
218 RE: core::error::Error + Send + Sync + 'static,
219 EPRRE: core::error::Error + Send + Sync + 'static,
220 EPPRE: core::error::Error + Send + Sync + 'static,
221{
222}
223
/// Failure of a retrying endpoint execution.
///
/// The three type parameters are, in order: the client's respond error, the
/// endpoint's render-request error, and the endpoint's parse-response error.
#[derive(Debug)]
pub enum RetryableClientRespondEndpointUntilDoneError<RE, EPRRE, EPPRE>
where
    RE: core::error::Error + Send + Sync + 'static,
    EPRRE: core::error::Error + Send + Sync + 'static,
    EPPRE: core::error::Error + Send + Sync + 'static,
{
    /// The client failed to turn a request into a response.
    RespondFailed(RE),
    /// The endpoint failed to render a request.
    EndpointRenderRequestFailed(EPRRE),
    /// The endpoint failed to parse a response.
    EndpointParseResponseFailed(EPPRE),
    /// The retry counter reached the endpoint's maximum retry count.
    ReachedMaxRetries,
}
237impl<RE, EPRRE, EPPRE> core::fmt::Display
238 for RetryableClientRespondEndpointUntilDoneError<RE, EPRRE, EPPRE>
239where
240 RE: core::error::Error + Send + Sync + 'static,
241 EPRRE: core::error::Error + Send + Sync + 'static,
242 EPPRE: core::error::Error + Send + Sync + 'static,
243{
244 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
245 write!(f, "{:?}", self)
246 }
247}
248impl<RE, EPRRE, EPPRE> core::error::Error
249 for RetryableClientRespondEndpointUntilDoneError<RE, EPRRE, EPPRE>
250where
251 RE: core::error::Error + Send + Sync + 'static,
252 EPRRE: core::error::Error + Send + Sync + 'static,
253 EPPRE: core::error::Error + Send + Sync + 'static,
254{
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    use std::{collections::HashMap, io::Error as IoError, panic};

    use futures_executor::block_on;

    /// Type-erased endpoint as stored in the lookup table of the test below.
    type BoxedEndpoint = Box<
        dyn Endpoint<
                RenderRequestError = IoError,
                ParseResponseOutput = (),
                ParseResponseError = IoError,
            > + Send
            + Sync,
    >;

    /// Endpoint stub whose `render_request` panics with `unimplemented!()`.
    #[derive(Clone)]
    struct MyEndpoint;
    impl Endpoint for MyEndpoint {
        type RenderRequestError = IoError;

        type ParseResponseOutput = ();
        type ParseResponseError = IoError;

        fn render_request(&self) -> Result<Request<Body>, Self::RenderRequestError> {
            unimplemented!()
        }

        fn parse_response(
            &self,
            _response: Response<Body>,
        ) -> Result<Self::ParseResponseOutput, Self::ParseResponseError> {
            unreachable!()
        }
    }

    /// Client stub whose `respond` must never be reached by the test.
    #[derive(Clone)]
    struct MyClient;
    #[async_trait]
    impl Client for MyClient {
        type RespondError = IoError;

        async fn respond(
            &self,
            _request: Request<Body>,
        ) -> Result<Response<Body>, Self::RespondError> {
            unreachable!()
        }
    }

    /// A boxed `dyn Endpoint` taken out of a map can be handed to
    /// `respond_dyn_endpoint`; the call is expected to panic inside
    /// `MyEndpoint::render_request`.
    #[test]
    fn test_respond_dyn_endpoint() {
        // Silence the default panic output while the expected panic fires,
        // then put the previous hook back.
        let saved_hook = panic::take_hook();
        panic::set_hook(Box::new(|_| {}));
        let outcome = panic::catch_unwind(|| {
            block_on(async move {
                let name = "x";
                let mut endpoints: HashMap<&'static str, BoxedEndpoint> = HashMap::new();
                endpoints.insert(name, Box::new(MyEndpoint));

                let client = MyClient;
                let boxed = endpoints.get(name).unwrap();
                client.respond_dyn_endpoint(boxed.as_ref()).await
            })
        });
        panic::set_hook(saved_hook);

        // Anything other than a panic is a test failure.
        let payload = match outcome {
            Err(payload) => payload,
            other => panic!("{:?}", other),
        };

        match payload.downcast_ref::<&str>() {
            Some(message) => assert!(message.contains("not implemented")),
            None => panic!("{:?}", payload),
        }
    }
}