1use futures::TryStreamExt;
4use futures_core::stream::Stream;
5use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full, StreamBody};
6use hyper::http::StatusCode;
7use hyper::{body::Body, body::Bytes, Method};
8use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
9use hyper_util::client::legacy::{connect::HttpConnector, Client};
10use hyper_util::rt::TokioExecutor;
11use serde::{de::DeserializeOwned, Deserialize, Serialize};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::io::AsyncRead;
15use tokio::time::timeout;
16
17const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
19
20use crate::prelude::*;
21use cloudillo_types::action_types::CreateAction;
22use cloudillo_types::auth_adapter::AuthAdapter;
23
24fn to_boxed<B>(body: B) -> BoxBody<Bytes, Error>
25where
26 B: Body<Data = Bytes> + Send + Sync + 'static,
27 B::Error: Send + 'static,
28{
29 body.map_err(|_err| Error::NetworkError("body stream error".into())).boxed()
30}
31
32#[derive(Deserialize)]
33struct TokenData {
34 token: Box<str>,
35}
36
37#[derive(Deserialize)]
38struct TokenRes {
39 data: TokenData,
40}
41
42#[derive(Debug)]
44pub enum ConditionalResult<T> {
45 Modified { data: T, etag: Option<Box<str>> },
47 NotModified,
49}
50
51#[derive(Debug, Clone)]
52pub struct Request {
53 pub auth_adapter: Arc<dyn AuthAdapter>,
54 client: Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, Error>>,
55}
56
57impl Request {
58 pub fn new(auth_adapter: Arc<dyn AuthAdapter>) -> ClResult<Self> {
59 let client = HttpsConnectorBuilder::new()
60 .with_native_roots()
61 .map_err(|_| Error::ConfigError("no native root CA certificates found".into()))?
62 .https_only()
63 .enable_http1()
64 .build();
65
66 Ok(Request { auth_adapter, client: Client::builder(TokioExecutor::new()).build(client) })
67 }
68
69 async fn timed_request(
71 &self,
72 req: hyper::Request<BoxBody<Bytes, Error>>,
73 ) -> ClResult<hyper::Response<hyper::body::Incoming>> {
74 timeout(REQUEST_TIMEOUT, self.client.request(req))
75 .await
76 .map_err(|_| Error::Timeout)?
77 .map_err(Error::from)
78 }
79
80 async fn collect_body(body: hyper::body::Incoming) -> ClResult<Bytes> {
82 timeout(REQUEST_TIMEOUT, body.collect())
83 .await
84 .map_err(|_| Error::Timeout)?
85 .map_err(|_| Error::NetworkError("body collection error".into()))
86 .map(|collected| collected.to_bytes())
87 }
88
89 pub async fn create_proxy_token(
90 &self,
91 tn_id: TnId,
92 id_tag: &str,
93 subject: Option<&str>,
94 ) -> ClResult<Box<str>> {
95 let auth_token = self
96 .auth_adapter
97 .create_action_token(
98 tn_id,
99 CreateAction {
100 typ: "PROXY".into(),
101 audience_tag: Some(id_tag.into()),
102 expires_at: Some(Timestamp::from_now(60)), ..Default::default()
104 },
105 )
106 .await?;
107 let req = hyper::Request::builder()
108 .method(Method::GET)
109 .uri(format!(
110 "https://cl-o.{}/api/auth/access-token?token={}{}",
111 id_tag,
112 auth_token,
113 if let Some(subject) = subject {
114 format!("&subject={}", subject)
115 } else {
116 "".into()
117 }
118 ))
119 .body(to_boxed(Empty::new()))?;
120 let res = self.timed_request(req).await?;
121 if !res.status().is_success() {
122 return Err(Error::PermissionDenied);
123 }
124 let parsed: TokenRes = serde_json::from_slice(&Self::collect_body(res.into_body()).await?)?;
125
126 Ok(parsed.data.token)
127 }
128
129 pub async fn get_bin(
130 &self,
131 tn_id: TnId,
132 id_tag: &str,
133 path: &str,
134 auth: bool,
135 ) -> ClResult<Bytes> {
136 let req = hyper::Request::builder()
137 .method(Method::GET)
138 .uri(format!("https://cl-o.{}/api{}", id_tag, path));
139 let req = if auth {
140 req.header(
141 "Authorization",
142 format!("Bearer {}", self.create_proxy_token(tn_id, id_tag, None).await?),
143 )
144 } else {
145 req
146 };
147 let req = req.body(to_boxed(Empty::new()))?;
148 let res = self.timed_request(req).await?;
149 info!("Got response: {}", res.status());
150 match res.status() {
151 StatusCode::OK => Self::collect_body(res.into_body()).await,
152 StatusCode::NOT_FOUND => Err(Error::NotFound),
153 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
154 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
155 }
156 }
157
158 pub async fn get_stream(
164 &self,
165 tn_id: TnId,
166 id_tag: &str,
167 path: &str,
168 ) -> ClResult<impl AsyncRead + Send + Unpin> {
169 let token = self.create_proxy_token(tn_id, id_tag, None).await?;
170 debug!("Got proxy token (len={})", token.len());
171 let req = hyper::Request::builder()
172 .method(Method::GET)
173 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
174 .header("Authorization", format!("Bearer {}", token))
175 .body(to_boxed(Empty::new()))?;
176 let res = self.timed_request(req).await?;
177 match res.status() {
178 StatusCode::OK => {
182 let stream = res.into_body().into_data_stream()
183 .map_err(std::io::Error::other);
185 Ok(tokio_util::io::StreamReader::new(stream))
186 }
187 StatusCode::NOT_FOUND => Err(Error::NotFound),
188 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
189 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
190 }
191 }
192
193 pub async fn get<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
194 where
195 Res: DeserializeOwned,
196 {
197 let res = self.get_bin(tn_id, id_tag, path, true).await?;
198 let parsed: Res = serde_json::from_slice(&res)?;
199 Ok(parsed)
200 }
201
202 pub async fn get_noauth<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
203 where
204 Res: DeserializeOwned,
205 {
206 let res = self.get_bin(tn_id, id_tag, path, false).await?;
207 let parsed: Res = serde_json::from_slice(&res)?;
208 Ok(parsed)
209 }
210
211 pub async fn get_public<Res>(&self, id_tag: &str, path: &str) -> ClResult<Res>
213 where
214 Res: DeserializeOwned,
215 {
216 let req = hyper::Request::builder()
217 .method(Method::GET)
218 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
219 .body(to_boxed(Empty::new()))?;
220 let res = self.timed_request(req).await?;
221 match res.status() {
222 StatusCode::OK => {
223 let bytes = Self::collect_body(res.into_body()).await?;
224 let parsed: Res = serde_json::from_slice(&bytes)?;
225 Ok(parsed)
226 }
227 StatusCode::NOT_FOUND => Err(Error::NotFound),
228 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
229 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
230 }
231 }
232
233 pub async fn get_conditional<Res>(
238 &self,
239 id_tag: &str,
240 path: &str,
241 etag: Option<&str>,
242 ) -> ClResult<ConditionalResult<Res>>
243 where
244 Res: DeserializeOwned,
245 {
246 let mut builder = hyper::Request::builder()
247 .method(Method::GET)
248 .uri(format!("https://cl-o.{}/api{}", id_tag, path));
249
250 if let Some(etag) = etag {
252 builder = builder.header("If-None-Match", etag);
253 }
254
255 let req = builder.body(to_boxed(Empty::new()))?;
256 let res = self.timed_request(req).await?;
257
258 match res.status() {
259 StatusCode::NOT_MODIFIED => Ok(ConditionalResult::NotModified),
260 StatusCode::OK => {
261 let new_etag = res
263 .headers()
264 .get("etag")
265 .and_then(|v| v.to_str().ok())
266 .map(|s| s.trim_matches('"').into());
267
268 let bytes = Self::collect_body(res.into_body()).await?;
269 let parsed: Res = serde_json::from_slice(&bytes)?;
270 Ok(ConditionalResult::Modified { data: parsed, etag: new_etag })
271 }
272 StatusCode::NOT_FOUND => Err(Error::NotFound),
273 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
274 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
275 }
276 }
277
278 pub async fn post_public<Req, Res>(&self, id_tag: &str, path: &str, data: &Req) -> ClResult<Res>
280 where
281 Req: Serialize,
282 Res: DeserializeOwned,
283 {
284 let json_data = serde_json::to_vec(data)?;
285 let req = hyper::Request::builder()
286 .method(Method::POST)
287 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
288 .header("Content-Type", "application/json")
289 .body(to_boxed(Full::from(json_data)))?;
290 let res = self.timed_request(req).await?;
291 match res.status() {
292 StatusCode::OK | StatusCode::CREATED => {
293 let bytes = Self::collect_body(res.into_body()).await?;
294 let parsed: Res = serde_json::from_slice(&bytes)?;
295 Ok(parsed)
296 }
297 StatusCode::NOT_FOUND => Err(Error::NotFound),
298 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
299 StatusCode::UNPROCESSABLE_ENTITY => Err(Error::ValidationError(
300 "IDP registration failed - validation error".to_string(),
301 )),
302 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
303 }
304 }
305
306 pub async fn post_bin(
307 &self,
308 _tn_id: TnId,
309 id_tag: &str,
310 path: &str,
311 data: Bytes,
312 ) -> ClResult<Bytes> {
313 let req = hyper::Request::builder()
314 .method(Method::POST)
315 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
316 .header("Content-Type", "application/json")
317 .body(to_boxed(Full::from(data)))?;
318 let res = self.timed_request(req).await?;
319 Self::collect_body(res.into_body()).await
320 }
321
322 pub async fn post_stream<S>(
323 &self,
324 _tn_id: TnId,
325 id_tag: &str,
326 path: &str,
327 stream: S,
328 ) -> ClResult<Bytes>
329 where
330 S: Stream<Item = Result<hyper::body::Frame<Bytes>, hyper::Error>> + Send + Sync + 'static,
331 {
332 let req = hyper::Request::builder()
333 .method(Method::POST)
334 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
335 .body(to_boxed(StreamBody::new(stream)))?;
336 let res = self.timed_request(req).await?;
337 Self::collect_body(res.into_body()).await
338 }
339
340 pub async fn post<Res>(
341 &self,
342 tn_id: TnId,
343 id_tag: &str,
344 path: &str,
345 data: &impl Serialize,
346 ) -> ClResult<Res>
347 where
348 Res: DeserializeOwned,
349 {
350 let res = self.post_bin(tn_id, id_tag, path, serde_json::to_vec(data)?.into()).await?;
351 let parsed: Res = serde_json::from_slice(&res)?;
352 Ok(parsed)
353 }
354}
355
356