ex3_ic_agent/agent/http_transport/
hyper_transport.rs

1//! A [`Transport`] that connects using a [`hyper`] client.
2pub use hyper;
3
4use std::{any, error::Error, future::Future, marker::PhantomData, sync::atomic::AtomicPtr};
5
6use http::uri::{Authority, PathAndQuery};
7use http_body::{LengthLimitError, Limited};
8use hyper::{
9    body::{Bytes, HttpBody},
10    client::HttpConnector,
11    header::CONTENT_TYPE,
12    service::Service,
13    Client, Method, Request, Response, Uri,
14};
15use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
16
17use crate::{
18    agent::{
19        agent_error::HttpErrorPayload,
20        http_transport::{IC0_DOMAIN, IC0_SUB_DOMAIN},
21        AgentFuture, Transport,
22    },
23    export::Principal,
24    AgentError, RequestId,
25};
26
27/// A [`Transport`] using [`hyper`] to make HTTP calls to the Internet Computer.
28#[derive(Debug)]
29pub struct HyperTransport<B1, S = Client<HttpsConnector<HttpConnector>, B1>> {
30    _marker: PhantomData<AtomicPtr<B1>>,
31    url: Uri,
32    max_response_body_size: Option<usize>,
33    service: S,
34}
35
36#[doc(hidden)]
37pub use HyperTransport as HyperReplicaV2Transport; // deprecate after 0.24
38
39/// Trait representing the contraints on [`HttpBody`] that [`HyperTransport`] requires
40pub trait HyperBody:
41    HttpBody<Data = Self::BodyData, Error = Self::BodyError> + Send + From<Vec<u8>> + 'static
42{
43    /// Values yielded by the `Body`.
44    type BodyData: Send;
45    /// The error type this `Body` might generate.
46    type BodyError: Error + Send + Sync + 'static;
47}
48
49impl<B> HyperBody for B
50where
51    B: HttpBody + Send + From<Vec<u8>> + 'static,
52    B::Data: Send,
53    B::Error: Error + Send + Sync + 'static,
54{
55    type BodyData = B::Data;
56    type BodyError = B::Error;
57}
58
59/// Trait representing the contraints on [`Service`] that [`HyperTransport`] requires.
60pub trait HyperService<B1: HyperBody>:
61    Send
62    + Sync
63    + Clone
64    + Service<
65        Request<B1>,
66        Response = Response<Self::ResponseBody>,
67        Error = hyper::Error,
68        Future = Self::ServiceFuture,
69    >
70{
71    /// Values yielded in the `Body` of the `Response`.
72    type ResponseBody: HyperBody;
73    /// The future response value.
74    type ServiceFuture: Send + Future<Output = Result<Self::Response, Self::Error>>;
75}
76
77impl<B1, B2, S> HyperService<B1> for S
78where
79    B1: HyperBody,
80    B2: HyperBody,
81    S: Send + Sync + Clone + Service<Request<B1>, Response = Response<B2>, Error = hyper::Error>,
82    S::Future: Send,
83{
84    type ResponseBody = B2;
85    type ServiceFuture = S::Future;
86}
87
88impl<B1: HyperBody> HyperTransport<B1> {
89    /// Creates a replica transport from a HTTP URL.
90    pub fn create<U: Into<Uri>>(url: U) -> Result<Self, AgentError> {
91        let connector = HttpsConnectorBuilder::new()
92            .with_webpki_roots()
93            .https_or_http()
94            .enable_http1()
95            .enable_http2()
96            .build();
97        Self::create_with_service(url, Client::builder().build(connector))
98    }
99}
100
101impl<B1, S> HyperTransport<B1, S>
102where
103    B1: HyperBody,
104    S: HyperService<B1>,
105{
106    /// Creates a replica transport from a HTTP URL and a [`HyperService`].
107    pub fn create_with_service<U: Into<Uri>>(url: U, service: S) -> Result<Self, AgentError> {
108        // Parse the url
109        let url = url.into();
110        let mut parts = url.clone().into_parts();
111        parts.authority = parts
112            .authority
113            .map(|v| {
114                let host = v.host();
115                let host = match host.len().checked_sub(IC0_SUB_DOMAIN.len()) {
116                    None => host,
117                    Some(start) if host[start..].eq_ignore_ascii_case(IC0_SUB_DOMAIN) => IC0_DOMAIN,
118                    Some(_) => host,
119                };
120                let port = v.port();
121                let (colon, port) = match port.as_ref() {
122                    Some(v) => (":", v.as_str()),
123                    None => ("", ""),
124                };
125                Authority::from_maybe_shared(Bytes::from(format!("{host}{colon}{port}")))
126            })
127            .transpose()
128            .map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?;
129        parts.path_and_query = Some(
130            parts
131                .path_and_query
132                .map_or(Ok(PathAndQuery::from_static("/api/v2")), |v| {
133                    let mut found = false;
134                    fn replace<T>(a: T, b: &mut T) -> T {
135                        std::mem::replace(b, a)
136                    }
137                    let v = v
138                        .path()
139                        .trim_end_matches(|c| !replace(found || c == '/', &mut found));
140                    PathAndQuery::from_maybe_shared(Bytes::from(format!("{v}/api/v2")))
141                })
142                .map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?,
143        );
144        let url =
145            Uri::from_parts(parts).map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?;
146
147        Ok(Self {
148            _marker: PhantomData,
149            url,
150            service,
151            max_response_body_size: None,
152        })
153    }
154
155    /// Sets a max response body size limit
156    pub fn with_max_response_body_size(self, max_response_body_size: usize) -> Self {
157        Self {
158            max_response_body_size: Some(max_response_body_size),
159            ..self
160        }
161    }
162
163    async fn request(
164        &self,
165        method: Method,
166        url: String,
167        body: Option<Vec<u8>>,
168    ) -> Result<Vec<u8>, AgentError> {
169        let http_request = Request::builder()
170            .method(method)
171            .uri(url)
172            .header(CONTENT_TYPE, "application/cbor")
173            .body(body.unwrap_or_default().into())
174            .map_err(|err| AgentError::TransportError(Box::new(err)))?;
175
176        fn map_error<E: Error + Send + Sync + 'static>(err: E) -> AgentError {
177            if any::TypeId::of::<E>() == any::TypeId::of::<AgentError>() {
178                // Store the value in an `Option` so we can `take`
179                // it after casting to `&mut dyn Any`.
180                let mut slot = Some(err);
181
182                // Re-write the `$val` ident with the downcasted value.
183                let val = (&mut slot as &mut dyn any::Any)
184                    .downcast_mut::<Option<AgentError>>()
185                    .unwrap()
186                    .take()
187                    .unwrap();
188
189                // Run the $body in scope of the replaced val.
190                return val;
191            }
192            AgentError::TransportError(Box::new(err))
193        }
194        let response = self
195            .service
196            .clone()
197            .call(http_request)
198            .await
199            .map_err(map_error)?;
200
201        let (parts, body) = response.into_parts();
202        let body = if let Some(limit) = self.max_response_body_size {
203            hyper::body::to_bytes(Limited::new(body, limit))
204                .await
205                .map_err(|err| {
206                    if err.downcast_ref::<LengthLimitError>().is_some() {
207                        AgentError::ResponseSizeExceededLimit()
208                    } else {
209                        AgentError::TransportError(err)
210                    }
211                })?
212        } else {
213            hyper::body::to_bytes(body)
214                .await
215                .map_err(|err| AgentError::TransportError(Box::new(err)))?
216        };
217
218        let (status, headers, body) = (parts.status, parts.headers, body.to_vec());
219        if status.is_client_error() || status.is_server_error() {
220            Err(AgentError::HttpError(HttpErrorPayload {
221                status: status.into(),
222                content_type: headers
223                    .get(CONTENT_TYPE)
224                    .and_then(|value| value.to_str().ok())
225                    .map(|x| x.to_string()),
226                content: body,
227            }))
228        } else {
229            Ok(body)
230        }
231    }
232}
233
234impl<B1, S> Transport for HyperTransport<B1, S>
235where
236    B1: HyperBody,
237    S: HyperService<B1>,
238{
239    fn call(
240        &self,
241        effective_canister_id: Principal,
242        envelope: Vec<u8>,
243        _request_id: RequestId,
244    ) -> AgentFuture<()> {
245        Box::pin(async move {
246            let url = format!("{}/canister/{effective_canister_id}/call", self.url);
247            self.request(Method::POST, url, Some(envelope)).await?;
248            Ok(())
249        })
250    }
251
252    fn read_state(
253        &self,
254        effective_canister_id: Principal,
255        envelope: Vec<u8>,
256    ) -> AgentFuture<Vec<u8>> {
257        Box::pin(async move {
258            let url = format!("{}/canister/{effective_canister_id}/read_state", self.url);
259            self.request(Method::POST, url, Some(envelope)).await
260        })
261    }
262
263    fn query(&self, effective_canister_id: Principal, envelope: Vec<u8>) -> AgentFuture<Vec<u8>> {
264        Box::pin(async move {
265            let url = format!("{}/canister/{effective_canister_id}/query", self.url);
266            self.request(Method::POST, url, Some(envelope)).await
267        })
268    }
269
270    fn status(&self) -> AgentFuture<Vec<u8>> {
271        Box::pin(async move {
272            let url = format!("{}/status", self.url);
273            self.request(Method::GET, url, None).await
274        })
275    }
276}
277
278#[cfg(test)]
279mod test {
280    use super::HyperTransport;
281    use hyper::{Client, Uri};
282
283    #[test]
284    fn redirect() {
285        fn test(base: &str, result: &str) {
286            let client: Client<_> = Client::builder().build_http();
287            let uri: Uri = base.parse().unwrap();
288            let t = HyperTransport::create_with_service(uri, client).unwrap();
289            assert_eq!(t.url, result, "{}", base);
290        }
291
292        test("https://ic0.app", "https://ic0.app/api/v2");
293        test("https://IC0.app", "https://ic0.app/api/v2");
294        test("https://foo.ic0.app", "https://ic0.app/api/v2");
295        test("https://foo.IC0.app", "https://ic0.app/api/v2");
296        test("https://foo.Ic0.app", "https://ic0.app/api/v2");
297        test("https://foo.iC0.app", "https://ic0.app/api/v2");
298        test("https://foo.bar.ic0.app", "https://ic0.app/api/v2");
299        test("https://ic0.app/foo/", "https://ic0.app/foo/api/v2");
300        test("https://foo.ic0.app/foo/", "https://ic0.app/foo/api/v2");
301
302        test("https://ic1.app", "https://ic1.app/api/v2");
303        test("https://foo.ic1.app", "https://foo.ic1.app/api/v2");
304        test("https://ic0.app.ic1.app", "https://ic0.app.ic1.app/api/v2");
305
306        test("https://fooic0.app", "https://fooic0.app/api/v2");
307        test("https://fooic0.app.ic0.app", "https://ic0.app/api/v2");
308    }
309}