http_api_client/
lib.rs

1use core::time::Duration;
2
3pub use async_trait::async_trait;
4pub use http_api_client_endpoint::{Body, Request, Response, http};
5use http_api_client_endpoint::{Endpoint, RetryableEndpoint, RetryableEndpointRetry};
6
7#[async_trait]
8pub trait Client {
9    type RespondError: core::error::Error + Send + Sync + 'static;
10
11    async fn respond(&self, request: Request<Body>) -> Result<Response<Body>, Self::RespondError>;
12
13    async fn respond_endpoint<EP>(
14        &self,
15        endpoint: &EP,
16    ) -> Result<
17        EP::ParseResponseOutput,
18        ClientRespondEndpointError<
19            Self::RespondError,
20            EP::RenderRequestError,
21            EP::ParseResponseError,
22        >,
23    >
24    where
25        EP: Endpoint + Send + Sync,
26    {
27        self.respond_endpoint_with_callback(endpoint, |req| req, |_| {})
28            .await
29    }
30
31    async fn respond_endpoint_with_callback<EP, PreRCB, PostRCB>(
32        &self,
33        endpoint: &EP,
34        pre_request_callback: PreRCB,
35        post_request_callback: PostRCB,
36    ) -> Result<
37        EP::ParseResponseOutput,
38        ClientRespondEndpointError<
39            Self::RespondError,
40            EP::RenderRequestError,
41            EP::ParseResponseError,
42        >,
43    >
44    where
45        EP: Endpoint + Send + Sync,
46        PreRCB: FnMut(Request<Body>) -> Request<Body> + Send,
47        PostRCB: FnMut(&Response<Body>) + Send,
48    {
49        self.respond_dyn_endpoint_with_callback(
50            endpoint,
51            pre_request_callback,
52            post_request_callback,
53        )
54        .await
55    }
56
57    async fn respond_dyn_endpoint<RRE, PRO, PRE>(
58        &self,
59        endpoint: &(
60             dyn Endpoint<
61            RenderRequestError = RRE,
62            ParseResponseOutput = PRO,
63            ParseResponseError = PRE,
64        > + Send
65                 + Sync
66         ),
67    ) -> Result<PRO, ClientRespondEndpointError<Self::RespondError, RRE, PRE>>
68    where
69        RRE: core::error::Error + Send + Sync + 'static,
70        PRE: core::error::Error + Send + Sync + 'static,
71    {
72        self.respond_dyn_endpoint_with_callback(endpoint, |req| req, |_| {})
73            .await
74    }
75
76    async fn respond_dyn_endpoint_with_callback<RRE, PRO, PRE, PreRCB, PostRCB>(
77        &self,
78        endpoint: &(
79             dyn Endpoint<
80            RenderRequestError = RRE,
81            ParseResponseOutput = PRO,
82            ParseResponseError = PRE,
83        > + Send
84                 + Sync
85         ),
86        mut pre_request_callback: PreRCB,
87        mut post_request_callback: PostRCB,
88    ) -> Result<PRO, ClientRespondEndpointError<Self::RespondError, RRE, PRE>>
89    where
90        RRE: core::error::Error + Send + Sync + 'static,
91        PRE: core::error::Error + Send + Sync + 'static,
92        PreRCB: FnMut(Request<Body>) -> Request<Body> + Send,
93        PostRCB: FnMut(&Response<Body>) + Send,
94    {
95        let request = endpoint
96            .render_request()
97            .map_err(ClientRespondEndpointError::EndpointRenderRequestFailed)?;
98
99        let request = pre_request_callback(request);
100
101        let response = self
102            .respond(request)
103            .await
104            .map_err(ClientRespondEndpointError::RespondFailed)?;
105
106        post_request_callback(&response);
107
108        endpoint
109            .parse_response(response)
110            .map_err(ClientRespondEndpointError::EndpointParseResponseFailed)
111    }
112}
113
114#[async_trait]
115pub trait RetryableClient: Client {
116    async fn sleep(&self, dur: Duration);
117
118    async fn respond_endpoint_until_done<EP>(
119        &self,
120        endpoint: &EP,
121    ) -> Result<
122        EP::ParseResponseOutput,
123        RetryableClientRespondEndpointUntilDoneError<
124            Self::RespondError,
125            EP::RenderRequestError,
126            EP::ParseResponseError,
127        >,
128    >
129    where
130        EP: RetryableEndpoint + Send + Sync,
131    {
132        self.respond_endpoint_until_done_with_callback(endpoint, |req, _| req, |_, _| {})
133            .await
134    }
135
136    async fn respond_endpoint_until_done_with_callback<EP, PreRCB, PostRCB>(
137        &self,
138        endpoint: &EP,
139        mut pre_request_callback: PreRCB,
140        mut post_request_callback: PostRCB,
141    ) -> Result<
142        EP::ParseResponseOutput,
143        RetryableClientRespondEndpointUntilDoneError<
144            Self::RespondError,
145            EP::RenderRequestError,
146            EP::ParseResponseError,
147        >,
148    >
149    where
150        EP: RetryableEndpoint + Send + Sync,
151        PreRCB: FnMut(Request<Body>, Option<&RetryableEndpointRetry<EP::RetryReason>>) -> Request<Body>
152            + Send,
153        PostRCB: FnMut(&Response<Body>, Option<&RetryableEndpointRetry<EP::RetryReason>>) + Send,
154    {
155        let mut retry = None;
156
157        loop {
158            let request = endpoint.render_request(retry.as_ref()).map_err(
159                RetryableClientRespondEndpointUntilDoneError::EndpointRenderRequestFailed,
160            )?;
161
162            let request = pre_request_callback(request, retry.as_ref());
163
164            let response = self
165                .respond(request)
166                .await
167                .map_err(RetryableClientRespondEndpointUntilDoneError::RespondFailed)?;
168
169            post_request_callback(&response, retry.as_ref());
170
171            match endpoint.parse_response(response, retry.as_ref()).map_err(
172                RetryableClientRespondEndpointUntilDoneError::EndpointParseResponseFailed,
173            )? {
174                Ok(output) => return Ok(output),
175                Err(reason) => {
176                    let x = retry.get_or_insert(RetryableEndpointRetry::new(0, reason.clone()));
177                    x.count += 1;
178                    x.reason = reason;
179                }
180            }
181
182            //
183            if let Some(retry) = &retry {
184                if retry.count >= endpoint.max_retry_count() {
185                    return Err(RetryableClientRespondEndpointUntilDoneError::ReachedMaxRetries);
186                }
187
188                self.sleep(endpoint.next_retry_in(retry)).await;
189            }
190        }
191    }
192}
193
194//
/// Error returned by the provided `Client::respond_*endpoint*` methods.
///
/// Each variant wraps the error of the step that failed; that inner error is
/// also exposed through [`core::error::Error::source`].
#[derive(Debug)]
pub enum ClientRespondEndpointError<RE, EPRRE, EPPRE>
where
    RE: core::error::Error + Send + Sync + 'static,
    EPRRE: core::error::Error + Send + Sync + 'static,
    EPPRE: core::error::Error + Send + Sync + 'static,
{
    /// The client's `respond` call returned an error.
    RespondFailed(RE),
    /// The endpoint's `render_request` call returned an error.
    EndpointRenderRequestFailed(EPRRE),
    /// The endpoint's `parse_response` call returned an error.
    EndpointParseResponseFailed(EPPRE),
}
impl<RE, EPRRE, EPPRE> core::fmt::Display for ClientRespondEndpointError<RE, EPRRE, EPPRE>
where
    RE: core::error::Error + Send + Sync + 'static,
    EPRRE: core::error::Error + Send + Sync + 'static,
    EPPRE: core::error::Error + Send + Sync + 'static,
{
    /// Writes the same text as the derived `Debug` representation.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "{:?}", self)
    }
}
impl<RE, EPRRE, EPPRE> core::error::Error for ClientRespondEndpointError<RE, EPRRE, EPPRE>
where
    RE: core::error::Error + Send + Sync + 'static,
    EPRRE: core::error::Error + Send + Sync + 'static,
    EPPRE: core::error::Error + Send + Sync + 'static,
{
    /// Returns the wrapped error so that error-chain reporters can walk to the cause.
    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
        match self {
            Self::RespondFailed(err) => Some(err),
            Self::EndpointRenderRequestFailed(err) => Some(err),
            Self::EndpointParseResponseFailed(err) => Some(err),
        }
    }
}
223
224//
/// Error returned by the provided `RetryableClient::respond_endpoint_until_done*` methods.
///
/// The wrapping variants expose their inner error through
/// [`core::error::Error::source`]; `ReachedMaxRetries` has no underlying cause.
#[derive(Debug)]
pub enum RetryableClientRespondEndpointUntilDoneError<RE, EPRRE, EPPRE>
where
    RE: core::error::Error + Send + Sync + 'static,
    EPRRE: core::error::Error + Send + Sync + 'static,
    EPPRE: core::error::Error + Send + Sync + 'static,
{
    /// The client's `respond` call returned an error.
    RespondFailed(RE),
    /// The endpoint's `render_request` call returned an error.
    EndpointRenderRequestFailed(EPRRE),
    /// The endpoint's `parse_response` call returned an error.
    EndpointParseResponseFailed(EPPRE),
    /// The retry loop stopped because the retry counter reached the endpoint's limit.
    ReachedMaxRetries,
}
impl<RE, EPRRE, EPPRE> core::fmt::Display
    for RetryableClientRespondEndpointUntilDoneError<RE, EPRRE, EPPRE>
where
    RE: core::error::Error + Send + Sync + 'static,
    EPRRE: core::error::Error + Send + Sync + 'static,
    EPPRE: core::error::Error + Send + Sync + 'static,
{
    /// Writes the same text as the derived `Debug` representation.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "{:?}", self)
    }
}
impl<RE, EPRRE, EPPRE> core::error::Error
    for RetryableClientRespondEndpointUntilDoneError<RE, EPRRE, EPPRE>
where
    RE: core::error::Error + Send + Sync + 'static,
    EPRRE: core::error::Error + Send + Sync + 'static,
    EPPRE: core::error::Error + Send + Sync + 'static,
{
    /// Returns the wrapped error, or `None` for `ReachedMaxRetries`.
    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
        match self {
            Self::RespondFailed(err) => Some(err),
            Self::EndpointRenderRequestFailed(err) => Some(err),
            Self::EndpointParseResponseFailed(err) => Some(err),
            Self::ReachedMaxRetries => None,
        }
    }
}
256
#[cfg(test)]
mod tests {
    use super::*;

    use std::{collections::HashMap, io::Error as IoError, panic};

    use futures_executor::block_on;

    /// Boxed trait-object form of an endpoint, as stored in the map below.
    type BoxedEndpoint = Box<
        dyn Endpoint<
                RenderRequestError = IoError,
                ParseResponseOutput = (),
                ParseResponseError = IoError,
            > + Send
            + Sync,
    >;

    /// Endpoint stub: rendering panics via `unimplemented!()`, parsing is never reached.
    #[derive(Clone)]
    struct MyEndpoint;
    impl Endpoint for MyEndpoint {
        type RenderRequestError = IoError;

        type ParseResponseOutput = ();
        type ParseResponseError = IoError;

        fn render_request(&self) -> Result<Request<Body>, Self::RenderRequestError> {
            unimplemented!()
        }

        fn parse_response(
            &self,
            _response: Response<Body>,
        ) -> Result<Self::ParseResponseOutput, Self::ParseResponseError> {
            unreachable!()
        }
    }

    /// Client stub whose `respond` must never be called by the test.
    #[derive(Clone)]
    struct MyClient;
    #[async_trait]
    impl Client for MyClient {
        type RespondError = IoError;

        async fn respond(
            &self,
            _request: Request<Body>,
        ) -> Result<Response<Body>, Self::RespondError> {
            unreachable!()
        }
    }

    /// Checks that an endpoint kept as a boxed trait object can be passed to
    /// `respond_dyn_endpoint`; the call is expected to panic inside
    /// `MyEndpoint::render_request`.
    #[test]
    fn test_respond_dyn_endpoint() {
        // Silence the expected panic's output, then restore the previous hook.
        let original_hook = panic::take_hook();
        panic::set_hook(Box::new(|_| {}));
        let outcome = panic::catch_unwind(|| {
            block_on(async move {
                let mut registry: HashMap<&'static str, BoxedEndpoint> = HashMap::new();

                let key = "x";
                registry.insert(key, Box::new(MyEndpoint));
                let client = MyClient;

                let endpoint = registry.get(key).unwrap();
                client.respond_dyn_endpoint(endpoint.as_ref()).await
            })
        });
        panic::set_hook(original_hook);

        match outcome {
            Err(payload) => match payload.downcast_ref::<&str>() {
                Some(message) => assert!(message.contains("not implemented")),
                None => panic!("{:?}", payload),
            },
            // The call returned without panicking, which the test does not expect.
            other => panic!("{:?}", other),
        }
    }
}