1use futures::TryStreamExt;
7use futures_core::stream::Stream;
8use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full, StreamBody};
9use hyper::http::StatusCode;
10use hyper::{body::Body, body::Bytes, Method};
11use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
12use hyper_util::client::legacy::{connect::HttpConnector, Client};
13use hyper_util::rt::TokioExecutor;
14use serde::{de::DeserializeOwned, Deserialize, Serialize};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::io::AsyncRead;
18use tokio::time::timeout;
19
/// Upper bound applied to each phase of an outbound call: once while waiting for the
/// response (see `Request::timed_request`) and once more while buffering a response
/// body (see `Request::collect_body`).
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
22
23use crate::prelude::*;
24use cloudillo_types::action_types::CreateAction;
25use cloudillo_types::auth_adapter::AuthAdapter;
26
27fn to_boxed<B>(body: B) -> BoxBody<Bytes, Error>
28where
29 B: Body<Data = Bytes> + Send + Sync + 'static,
30 B::Error: Send + 'static,
31{
32 body.map_err(|_err| Error::NetworkError("body stream error".into())).boxed()
33}
34
35#[derive(Deserialize)]
36struct TokenData {
37 token: Box<str>,
38}
39
40#[derive(Deserialize)]
41struct TokenRes {
42 data: TokenData,
43}
44
/// Outcome of a conditional (`If-None-Match`) fetch performed by
/// `Request::get_conditional`.
#[derive(Debug)]
pub enum ConditionalResult<T> {
    /// The server answered with a fresh representation.
    Modified {
        /// Decoded response payload.
        data: T,
        /// Value of the response `etag` header with surrounding double quotes trimmed,
        /// or `None` when the header was absent or not valid visible ASCII.
        etag: Option<Box<str>>,
    },
    /// The server answered `304 Not Modified`; there is no payload.
    NotModified,
}
53
54#[derive(Debug, Clone)]
55pub struct Request {
56 pub auth_adapter: Arc<dyn AuthAdapter>,
57 client: Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, Error>>,
58}
59
60impl Request {
61 pub fn new(auth_adapter: Arc<dyn AuthAdapter>) -> ClResult<Self> {
62 let client = HttpsConnectorBuilder::new()
63 .with_native_roots()
64 .map_err(|_| Error::ConfigError("no native root CA certificates found".into()))?
65 .https_only()
66 .enable_http1()
67 .build();
68
69 Ok(Request { auth_adapter, client: Client::builder(TokioExecutor::new()).build(client) })
70 }
71
72 async fn timed_request(
74 &self,
75 req: hyper::Request<BoxBody<Bytes, Error>>,
76 ) -> ClResult<hyper::Response<hyper::body::Incoming>> {
77 timeout(REQUEST_TIMEOUT, self.client.request(req))
78 .await
79 .map_err(|_| Error::Timeout)?
80 .map_err(Error::from)
81 }
82
83 async fn collect_body(body: hyper::body::Incoming) -> ClResult<Bytes> {
85 timeout(REQUEST_TIMEOUT, body.collect())
86 .await
87 .map_err(|_| Error::Timeout)?
88 .map_err(|_| Error::NetworkError("body collection error".into()))
89 .map(http_body_util::Collected::to_bytes)
90 }
91
92 pub async fn create_proxy_token(
93 &self,
94 tn_id: TnId,
95 id_tag: &str,
96 subject: Option<&str>,
97 ) -> ClResult<Box<str>> {
98 let auth_token = self
99 .auth_adapter
100 .create_action_token(
101 tn_id,
102 CreateAction {
103 typ: "PROXY".into(),
104 audience_tag: Some(id_tag.into()),
105 expires_at: Some(Timestamp::from_now(60)), ..Default::default()
107 },
108 )
109 .await?;
110 let req = hyper::Request::builder()
111 .method(Method::GET)
112 .uri(format!(
113 "https://cl-o.{}/api/auth/access-token?token={}{}",
114 id_tag,
115 auth_token,
116 if let Some(subject) = subject {
117 format!("&subject={}", subject)
118 } else {
119 String::new()
120 }
121 ))
122 .body(to_boxed(Empty::new()))?;
123 let res = self.timed_request(req).await?;
124 if !res.status().is_success() {
125 return Err(Error::PermissionDenied);
126 }
127 let parsed: TokenRes = serde_json::from_slice(&Self::collect_body(res.into_body()).await?)?;
128
129 Ok(parsed.data.token)
130 }
131
132 pub async fn get_bin(
133 &self,
134 tn_id: TnId,
135 id_tag: &str,
136 path: &str,
137 auth: bool,
138 ) -> ClResult<Bytes> {
139 let req = hyper::Request::builder()
140 .method(Method::GET)
141 .uri(format!("https://cl-o.{}/api{}", id_tag, path));
142 let req = if auth {
143 req.header(
144 "Authorization",
145 format!("Bearer {}", self.create_proxy_token(tn_id, id_tag, None).await?),
146 )
147 } else {
148 req
149 };
150 let req = req.body(to_boxed(Empty::new()))?;
151 let res = self.timed_request(req).await?;
152 info!("Got response: {}", res.status());
153 match res.status() {
154 StatusCode::OK => Self::collect_body(res.into_body()).await,
155 StatusCode::NOT_FOUND => Err(Error::NotFound),
156 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
157 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
158 }
159 }
160
161 pub async fn get_stream(
167 &self,
168 tn_id: TnId,
169 id_tag: &str,
170 path: &str,
171 ) -> ClResult<impl AsyncRead + Send + Unpin> {
172 let token = self.create_proxy_token(tn_id, id_tag, None).await?;
173 debug!("Got proxy token (len={})", token.len());
174 let req = hyper::Request::builder()
175 .method(Method::GET)
176 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
177 .header("Authorization", format!("Bearer {}", token))
178 .body(to_boxed(Empty::new()))?;
179 let res = self.timed_request(req).await?;
180 match res.status() {
181 StatusCode::OK => {
185 let stream = res.into_body().into_data_stream()
186 .map_err(std::io::Error::other);
188 Ok(tokio_util::io::StreamReader::new(stream))
189 }
190 StatusCode::NOT_FOUND => Err(Error::NotFound),
191 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
192 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
193 }
194 }
195
196 pub async fn get<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
197 where
198 Res: DeserializeOwned,
199 {
200 let res = self.get_bin(tn_id, id_tag, path, true).await?;
201 let parsed: Res = serde_json::from_slice(&res)?;
202 Ok(parsed)
203 }
204
205 pub async fn get_noauth<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
206 where
207 Res: DeserializeOwned,
208 {
209 let res = self.get_bin(tn_id, id_tag, path, false).await?;
210 let parsed: Res = serde_json::from_slice(&res)?;
211 Ok(parsed)
212 }
213
214 pub async fn get_public<Res>(&self, id_tag: &str, path: &str) -> ClResult<Res>
216 where
217 Res: DeserializeOwned,
218 {
219 let req = hyper::Request::builder()
220 .method(Method::GET)
221 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
222 .body(to_boxed(Empty::new()))?;
223 let res = self.timed_request(req).await?;
224 match res.status() {
225 StatusCode::OK => {
226 let bytes = Self::collect_body(res.into_body()).await?;
227 let parsed: Res = serde_json::from_slice(&bytes)?;
228 Ok(parsed)
229 }
230 StatusCode::NOT_FOUND => Err(Error::NotFound),
231 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
232 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
233 }
234 }
235
236 pub async fn get_conditional<Res>(
241 &self,
242 id_tag: &str,
243 path: &str,
244 etag: Option<&str>,
245 ) -> ClResult<ConditionalResult<Res>>
246 where
247 Res: DeserializeOwned,
248 {
249 let mut builder = hyper::Request::builder()
250 .method(Method::GET)
251 .uri(format!("https://cl-o.{}/api{}", id_tag, path));
252
253 if let Some(etag) = etag {
255 builder = builder.header("If-None-Match", etag);
256 }
257
258 let req = builder.body(to_boxed(Empty::new()))?;
259 let res = self.timed_request(req).await?;
260
261 match res.status() {
262 StatusCode::NOT_MODIFIED => Ok(ConditionalResult::NotModified),
263 StatusCode::OK => {
264 let new_etag = res
266 .headers()
267 .get("etag")
268 .and_then(|v| v.to_str().ok())
269 .map(|s| s.trim_matches('"').into());
270
271 let bytes = Self::collect_body(res.into_body()).await?;
272 let parsed: Res = serde_json::from_slice(&bytes)?;
273 Ok(ConditionalResult::Modified { data: parsed, etag: new_etag })
274 }
275 StatusCode::NOT_FOUND => Err(Error::NotFound),
276 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
277 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
278 }
279 }
280
281 pub async fn post_public<Req, Res>(&self, id_tag: &str, path: &str, data: &Req) -> ClResult<Res>
283 where
284 Req: Serialize,
285 Res: DeserializeOwned,
286 {
287 let json_data = serde_json::to_vec(data)?;
288 let req = hyper::Request::builder()
289 .method(Method::POST)
290 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
291 .header("Content-Type", "application/json")
292 .body(to_boxed(Full::from(json_data)))?;
293 let res = self.timed_request(req).await?;
294 match res.status() {
295 StatusCode::OK | StatusCode::CREATED => {
296 let bytes = Self::collect_body(res.into_body()).await?;
297 let parsed: Res = serde_json::from_slice(&bytes)?;
298 Ok(parsed)
299 }
300 StatusCode::NOT_FOUND => Err(Error::NotFound),
301 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
302 StatusCode::UNPROCESSABLE_ENTITY => Err(Error::ValidationError(
303 "IDP registration failed - validation error".to_string(),
304 )),
305 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
306 }
307 }
308
309 pub async fn post_bin(
310 &self,
311 _tn_id: TnId,
312 id_tag: &str,
313 path: &str,
314 data: Bytes,
315 ) -> ClResult<Bytes> {
316 let req = hyper::Request::builder()
317 .method(Method::POST)
318 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
319 .header("Content-Type", "application/json")
320 .body(to_boxed(Full::from(data)))?;
321 let res = self.timed_request(req).await?;
322 Self::collect_body(res.into_body()).await
323 }
324
325 pub async fn post_stream<S>(
326 &self,
327 _tn_id: TnId,
328 id_tag: &str,
329 path: &str,
330 stream: S,
331 ) -> ClResult<Bytes>
332 where
333 S: Stream<Item = Result<hyper::body::Frame<Bytes>, hyper::Error>> + Send + Sync + 'static,
334 {
335 let req = hyper::Request::builder()
336 .method(Method::POST)
337 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
338 .body(to_boxed(StreamBody::new(stream)))?;
339 let res = self.timed_request(req).await?;
340 Self::collect_body(res.into_body()).await
341 }
342
343 pub async fn post<Res>(
344 &self,
345 tn_id: TnId,
346 id_tag: &str,
347 path: &str,
348 data: &impl Serialize,
349 ) -> ClResult<Res>
350 where
351 Res: DeserializeOwned,
352 {
353 let res = self.post_bin(tn_id, id_tag, path, serde_json::to_vec(data)?.into()).await?;
354 let parsed: Res = serde_json::from_slice(&res)?;
355 Ok(parsed)
356 }
357}
358
359