// ex3_ic_agent/agent/http_transport/hyper_transport.rs
pub use hyper;
3
4use std::{any, error::Error, future::Future, marker::PhantomData, sync::atomic::AtomicPtr};
5
6use http::uri::{Authority, PathAndQuery};
7use http_body::{LengthLimitError, Limited};
8use hyper::{
9 body::{Bytes, HttpBody},
10 client::HttpConnector,
11 header::CONTENT_TYPE,
12 service::Service,
13 Client, Method, Request, Response, Uri,
14};
15use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
16
17use crate::{
18 agent::{
19 agent_error::HttpErrorPayload,
20 http_transport::{IC0_DOMAIN, IC0_SUB_DOMAIN},
21 AgentFuture, Transport,
22 },
23 export::Principal,
24 AgentError, RequestId,
25};
26
27#[derive(Debug)]
29pub struct HyperTransport<B1, S = Client<HttpsConnector<HttpConnector>, B1>> {
30 _marker: PhantomData<AtomicPtr<B1>>,
31 url: Uri,
32 max_response_body_size: Option<usize>,
33 service: S,
34}
35
36#[doc(hidden)]
37pub use HyperTransport as HyperReplicaV2Transport; pub trait HyperBody:
41 HttpBody<Data = Self::BodyData, Error = Self::BodyError> + Send + From<Vec<u8>> + 'static
42{
43 type BodyData: Send;
45 type BodyError: Error + Send + Sync + 'static;
47}
48
49impl<B> HyperBody for B
50where
51 B: HttpBody + Send + From<Vec<u8>> + 'static,
52 B::Data: Send,
53 B::Error: Error + Send + Sync + 'static,
54{
55 type BodyData = B::Data;
56 type BodyError = B::Error;
57}
58
59pub trait HyperService<B1: HyperBody>:
61 Send
62 + Sync
63 + Clone
64 + Service<
65 Request<B1>,
66 Response = Response<Self::ResponseBody>,
67 Error = hyper::Error,
68 Future = Self::ServiceFuture,
69 >
70{
71 type ResponseBody: HyperBody;
73 type ServiceFuture: Send + Future<Output = Result<Self::Response, Self::Error>>;
75}
76
77impl<B1, B2, S> HyperService<B1> for S
78where
79 B1: HyperBody,
80 B2: HyperBody,
81 S: Send + Sync + Clone + Service<Request<B1>, Response = Response<B2>, Error = hyper::Error>,
82 S::Future: Send,
83{
84 type ResponseBody = B2;
85 type ServiceFuture = S::Future;
86}
87
88impl<B1: HyperBody> HyperTransport<B1> {
89 pub fn create<U: Into<Uri>>(url: U) -> Result<Self, AgentError> {
91 let connector = HttpsConnectorBuilder::new()
92 .with_webpki_roots()
93 .https_or_http()
94 .enable_http1()
95 .enable_http2()
96 .build();
97 Self::create_with_service(url, Client::builder().build(connector))
98 }
99}
100
101impl<B1, S> HyperTransport<B1, S>
102where
103 B1: HyperBody,
104 S: HyperService<B1>,
105{
106 pub fn create_with_service<U: Into<Uri>>(url: U, service: S) -> Result<Self, AgentError> {
108 let url = url.into();
110 let mut parts = url.clone().into_parts();
111 parts.authority = parts
112 .authority
113 .map(|v| {
114 let host = v.host();
115 let host = match host.len().checked_sub(IC0_SUB_DOMAIN.len()) {
116 None => host,
117 Some(start) if host[start..].eq_ignore_ascii_case(IC0_SUB_DOMAIN) => IC0_DOMAIN,
118 Some(_) => host,
119 };
120 let port = v.port();
121 let (colon, port) = match port.as_ref() {
122 Some(v) => (":", v.as_str()),
123 None => ("", ""),
124 };
125 Authority::from_maybe_shared(Bytes::from(format!("{host}{colon}{port}")))
126 })
127 .transpose()
128 .map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?;
129 parts.path_and_query = Some(
130 parts
131 .path_and_query
132 .map_or(Ok(PathAndQuery::from_static("/api/v2")), |v| {
133 let mut found = false;
134 fn replace<T>(a: T, b: &mut T) -> T {
135 std::mem::replace(b, a)
136 }
137 let v = v
138 .path()
139 .trim_end_matches(|c| !replace(found || c == '/', &mut found));
140 PathAndQuery::from_maybe_shared(Bytes::from(format!("{v}/api/v2")))
141 })
142 .map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?,
143 );
144 let url =
145 Uri::from_parts(parts).map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?;
146
147 Ok(Self {
148 _marker: PhantomData,
149 url,
150 service,
151 max_response_body_size: None,
152 })
153 }
154
155 pub fn with_max_response_body_size(self, max_response_body_size: usize) -> Self {
157 Self {
158 max_response_body_size: Some(max_response_body_size),
159 ..self
160 }
161 }
162
163 async fn request(
164 &self,
165 method: Method,
166 url: String,
167 body: Option<Vec<u8>>,
168 ) -> Result<Vec<u8>, AgentError> {
169 let http_request = Request::builder()
170 .method(method)
171 .uri(url)
172 .header(CONTENT_TYPE, "application/cbor")
173 .body(body.unwrap_or_default().into())
174 .map_err(|err| AgentError::TransportError(Box::new(err)))?;
175
176 fn map_error<E: Error + Send + Sync + 'static>(err: E) -> AgentError {
177 if any::TypeId::of::<E>() == any::TypeId::of::<AgentError>() {
178 let mut slot = Some(err);
181
182 let val = (&mut slot as &mut dyn any::Any)
184 .downcast_mut::<Option<AgentError>>()
185 .unwrap()
186 .take()
187 .unwrap();
188
189 return val;
191 }
192 AgentError::TransportError(Box::new(err))
193 }
194 let response = self
195 .service
196 .clone()
197 .call(http_request)
198 .await
199 .map_err(map_error)?;
200
201 let (parts, body) = response.into_parts();
202 let body = if let Some(limit) = self.max_response_body_size {
203 hyper::body::to_bytes(Limited::new(body, limit))
204 .await
205 .map_err(|err| {
206 if err.downcast_ref::<LengthLimitError>().is_some() {
207 AgentError::ResponseSizeExceededLimit()
208 } else {
209 AgentError::TransportError(err)
210 }
211 })?
212 } else {
213 hyper::body::to_bytes(body)
214 .await
215 .map_err(|err| AgentError::TransportError(Box::new(err)))?
216 };
217
218 let (status, headers, body) = (parts.status, parts.headers, body.to_vec());
219 if status.is_client_error() || status.is_server_error() {
220 Err(AgentError::HttpError(HttpErrorPayload {
221 status: status.into(),
222 content_type: headers
223 .get(CONTENT_TYPE)
224 .and_then(|value| value.to_str().ok())
225 .map(|x| x.to_string()),
226 content: body,
227 }))
228 } else {
229 Ok(body)
230 }
231 }
232}
233
234impl<B1, S> Transport for HyperTransport<B1, S>
235where
236 B1: HyperBody,
237 S: HyperService<B1>,
238{
239 fn call(
240 &self,
241 effective_canister_id: Principal,
242 envelope: Vec<u8>,
243 _request_id: RequestId,
244 ) -> AgentFuture<()> {
245 Box::pin(async move {
246 let url = format!("{}/canister/{effective_canister_id}/call", self.url);
247 self.request(Method::POST, url, Some(envelope)).await?;
248 Ok(())
249 })
250 }
251
252 fn read_state(
253 &self,
254 effective_canister_id: Principal,
255 envelope: Vec<u8>,
256 ) -> AgentFuture<Vec<u8>> {
257 Box::pin(async move {
258 let url = format!("{}/canister/{effective_canister_id}/read_state", self.url);
259 self.request(Method::POST, url, Some(envelope)).await
260 })
261 }
262
263 fn query(&self, effective_canister_id: Principal, envelope: Vec<u8>) -> AgentFuture<Vec<u8>> {
264 Box::pin(async move {
265 let url = format!("{}/canister/{effective_canister_id}/query", self.url);
266 self.request(Method::POST, url, Some(envelope)).await
267 })
268 }
269
270 fn status(&self) -> AgentFuture<Vec<u8>> {
271 Box::pin(async move {
272 let url = format!("{}/status", self.url);
273 self.request(Method::GET, url, None).await
274 })
275 }
276}
277
#[cfg(test)]
mod test {
    use super::HyperTransport;
    use hyper::{Client, Uri};

    /// Checks how `create_with_service` normalises replica URLs: hosts under the
    /// ic0.app sub-domain collapse to `ic0.app`, other hosts are left alone, and
    /// `/api/v2` is appended to the path.
    #[test]
    fn redirect() {
        // (input URL, expected stored URL)
        const CASES: &[(&str, &str)] = &[
            // ic0.app and its sub-domains, in any letter case.
            ("https://ic0.app", "https://ic0.app/api/v2"),
            ("https://IC0.app", "https://ic0.app/api/v2"),
            ("https://foo.ic0.app", "https://ic0.app/api/v2"),
            ("https://foo.IC0.app", "https://ic0.app/api/v2"),
            ("https://foo.Ic0.app", "https://ic0.app/api/v2"),
            ("https://foo.iC0.app", "https://ic0.app/api/v2"),
            ("https://foo.bar.ic0.app", "https://ic0.app/api/v2"),
            ("https://ic0.app/foo/", "https://ic0.app/foo/api/v2"),
            ("https://foo.ic0.app/foo/", "https://ic0.app/foo/api/v2"),
            // Unrelated domains are not rewritten.
            ("https://ic1.app", "https://ic1.app/api/v2"),
            ("https://foo.ic1.app", "https://foo.ic1.app/api/v2"),
            ("https://ic0.app.ic1.app", "https://ic0.app.ic1.app/api/v2"),
            // The suffix must start at a label boundary (".ic0.app").
            ("https://fooic0.app", "https://fooic0.app/api/v2"),
            ("https://fooic0.app.ic0.app", "https://ic0.app/api/v2"),
        ];

        for &(base, result) in CASES {
            let client: Client<_> = Client::builder().build_http();
            let uri: Uri = base.parse().unwrap();
            let t = HyperTransport::create_with_service(uri, client).unwrap();
            assert_eq!(t.url, result, "{}", base);
        }
    }
}