wstd_aws/
lib.rs

1use anyhow::anyhow;
2use aws_smithy_async::rt::sleep::{AsyncSleep, Sleep};
3use aws_smithy_runtime_api::client::http::{
4    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
5};
6use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
7use aws_smithy_runtime_api::client::result::ConnectorError;
8use aws_smithy_runtime_api::client::retries::ErrorKind;
9use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
10use aws_smithy_runtime_api::http::Response;
11use aws_smithy_types::body::SdkBody;
12use http_body_util::{BodyStream, StreamBody};
13use std::time::Duration;
14use sync_wrapper::SyncStream;
15use wstd::http::{Body as WstdBody, BodyExt, Client};
16
17pub fn sleep_impl() -> impl AsyncSleep + 'static {
18    WstdSleep
19}
20
21#[derive(Debug)]
22struct WstdSleep;
23impl AsyncSleep for WstdSleep {
24    fn sleep(&self, duration: Duration) -> Sleep {
25        Sleep::new(async move {
26            wstd::task::sleep(wstd::time::Duration::from(duration)).await;
27        })
28    }
29}
30
31pub fn http_client() -> impl HttpClient + 'static {
32    WstdHttpClient
33}
34
35#[derive(Debug)]
36struct WstdHttpClient;
37
38impl HttpClient for WstdHttpClient {
39    fn http_connector(
40        &self,
41        settings: &HttpConnectorSettings,
42        // afaict, none of these components are relevant to this
43        // implementation.
44        _components: &RuntimeComponents,
45    ) -> SharedHttpConnector {
46        let mut client = Client::new();
47        if let Some(timeout) = settings.connect_timeout() {
48            client.set_connect_timeout(timeout);
49        }
50        if let Some(timeout) = settings.read_timeout() {
51            client.set_first_byte_timeout(timeout);
52        }
53        SharedHttpConnector::new(WstdHttpConnector(client))
54    }
55}
56
57#[derive(Debug)]
58struct WstdHttpConnector(Client);
59
60impl HttpConnector for WstdHttpConnector {
61    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
62        let client = self.0.clone();
63        HttpConnectorFuture::new(async move {
64            let request = request
65                .try_into_http1x()
66                // This can only fail if the Extensions fail to convert
67                .map_err(|e| ConnectorError::other(Box::new(e), None))?;
68            // smithy's SdkBody Error is a non-'static boxed dyn stderror.
69            // Anyhow can't represent that, so convert it to the debug impl.
70            let request =
71                request.map(|body| WstdBody::from_http_body(body.map_err(|e| anyhow!("{e:?}"))));
72            // Any error given by send is considered a "ClientError" kind
73            // which should prevent smithy from retrying like it would for a
74            // throttling error
75            let response = client
76                .send(request)
77                .await
78                .map_err(|e| ConnectorError::other(e.into(), Some(ErrorKind::ClientError)))?;
79
80            Response::try_from(response.map(|wstd_body| {
81                // You'd think that an SdkBody would just be an impl Body with
82                // the usual error type dance.
83                let nonsync_body = wstd_body
84                    .into_boxed_body()
85                    .map_err(|e| e.into_boxed_dyn_error());
86                // But we have to do this weird dance: because Axum insists
87                // bodies are not Sync, wstd settled on non-Sync bodies.
88                // Smithy insists on Sync bodies. The SyncStream type exists
89                // to assert, because all Stream operations are on &mut self,
90                // all Streams are Sync. So, turn the Body into a Stream, make
91                // it sync, then back to a Body.
92                let nonsync_stream = BodyStream::new(nonsync_body);
93                let sync_stream = SyncStream::new(nonsync_stream);
94                let sync_body = StreamBody::new(sync_stream);
95                SdkBody::from_body_1_x(sync_body)
96            }))
97            // This can only fail if the Extensions fail to convert
98            .map_err(|e| ConnectorError::other(Box::new(e), None))
99        })
100    }
101}