1use futures::TryStreamExt;
7use futures_core::stream::Stream;
8use http_body_util::{BodyExt, Empty, Full, StreamBody, combinators::BoxBody};
9use hyper::http::StatusCode;
10use hyper::{Method, body::Body, body::Bytes};
11use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
12use hyper_util::client::legacy::{Client, connect::HttpConnector};
13use hyper_util::rt::TokioExecutor;
14use serde::{Deserialize, Serialize, de::DeserializeOwned};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::io::AsyncRead;
18use tokio::time::timeout;
19
/// Upper bound applied separately to (a) waiting for a response from the
/// remote node in `Request::timed_request` and (b) collecting a response body
/// in `Request::collect_body`.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
22
23use crate::prelude::*;
24use cloudillo_types::action_types::CreateAction;
25use cloudillo_types::auth_adapter::AuthAdapter;
26
27fn to_boxed<B>(body: B) -> BoxBody<Bytes, Error>
28where
29 B: Body<Data = Bytes> + Send + Sync + 'static,
30 B::Error: Send + 'static,
31{
32 body.map_err(|_err| Error::NetworkError("body stream error".into())).boxed()
33}
34
35#[derive(Deserialize)]
36struct TokenData {
37 token: Box<str>,
38}
39
40#[derive(Deserialize)]
41struct TokenRes {
42 data: TokenData,
43}
44
/// Outcome of a conditional GET (see `Request::get_conditional`).
#[derive(Debug)]
pub enum ConditionalResult<T> {
    /// The remote answered `200 OK`: `data` is the deserialized body and
    /// `etag` is the response's `etag` header with surrounding `"` trimmed,
    /// or `None` when the header is absent or not valid visible ASCII.
    Modified {
        data: T,
        etag: Option<Box<str>>,
    },
    /// The remote answered `304 Not Modified`; there is no body to parse.
    NotModified,
}
53
54#[derive(Debug, Clone)]
55pub struct Request {
56 pub auth_adapter: Arc<dyn AuthAdapter>,
57 client: Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, Error>>,
58 proxy_tokens: Arc<crate::ProxyTokenCache>,
59}
60
61impl Request {
62 pub fn new(
63 auth_adapter: Arc<dyn AuthAdapter>,
64 proxy_tokens: Arc<crate::ProxyTokenCache>,
65 ) -> ClResult<Self> {
66 let client = HttpsConnectorBuilder::new()
67 .with_native_roots()
68 .map_err(|_| Error::ConfigError("no native root CA certificates found".into()))?
69 .https_only()
70 .enable_http1()
71 .build();
72
73 Ok(Request {
74 auth_adapter,
75 client: Client::builder(TokioExecutor::new()).build(client),
76 proxy_tokens,
77 })
78 }
79
80 async fn timed_request(
82 &self,
83 req: hyper::Request<BoxBody<Bytes, Error>>,
84 ) -> ClResult<hyper::Response<hyper::body::Incoming>> {
85 timeout(REQUEST_TIMEOUT, self.client.request(req))
86 .await
87 .map_err(|_| Error::Timeout)?
88 .map_err(Error::from)
89 }
90
91 async fn collect_body(body: hyper::body::Incoming) -> ClResult<Bytes> {
93 timeout(REQUEST_TIMEOUT, body.collect())
94 .await
95 .map_err(|_| Error::Timeout)?
96 .map_err(|_| Error::NetworkError("body collection error".into()))
97 .map(http_body_util::Collected::to_bytes)
98 }
99
100 pub async fn create_proxy_token(
106 &self,
107 tn_id: TnId,
108 id_tag: &str,
109 subject: Option<&str>,
110 ) -> ClResult<Box<str>> {
111 let auth_token = self
112 .auth_adapter
113 .create_action_token(
114 tn_id,
115 CreateAction {
116 typ: "PROXY".into(),
117 audience_tag: Some(id_tag.into()),
118 expires_at: Some(Timestamp::from_now(60)), ..Default::default()
120 },
121 )
122 .await?;
123 let req = hyper::Request::builder()
124 .method(Method::GET)
125 .uri(format!(
126 "https://cl-o.{}/api/auth/access-token?token={}{}",
127 id_tag,
128 auth_token,
129 if let Some(subject) = subject {
130 format!("&subject={}", subject)
131 } else {
132 String::new()
133 }
134 ))
135 .body(to_boxed(Empty::new()))?;
136 let res = self.timed_request(req).await?;
137 if !res.status().is_success() {
138 return Err(Error::PermissionDenied);
139 }
140 let parsed: TokenRes = serde_json::from_slice(&Self::collect_body(res.into_body()).await?)?;
141
142 Ok(parsed.data.token)
143 }
144
145 async fn get_or_mint_proxy_token(&self, tn_id: TnId, id_tag: &str) -> ClResult<Box<str>> {
149 if let Some(token) = self.proxy_tokens.get(tn_id, id_tag) {
150 return Ok(token);
151 }
152 let token = self.create_proxy_token(tn_id, id_tag, None).await?;
153 self.proxy_tokens.insert(tn_id, id_tag, token.clone());
154 Ok(token)
155 }
156
157 pub async fn get_bin(
158 &self,
159 tn_id: TnId,
160 id_tag: &str,
161 path: &str,
162 auth: bool,
163 ) -> ClResult<Bytes> {
164 let mut attempt = 0u8;
165 loop {
166 let req = hyper::Request::builder()
167 .method(Method::GET)
168 .uri(format!("https://cl-o.{}/api{}", id_tag, path));
169 let req = if auth {
170 req.header(
171 "Authorization",
172 format!("Bearer {}", self.get_or_mint_proxy_token(tn_id, id_tag).await?),
173 )
174 } else {
175 req
176 };
177 let req = req.body(to_boxed(Empty::new()))?;
178 let res = self.timed_request(req).await?;
179 debug!(status = %res.status(), "federated GET response");
180 match res.status() {
181 StatusCode::OK => return Self::collect_body(res.into_body()).await,
182 StatusCode::NOT_FOUND => return Err(Error::NotFound),
183 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if auth && attempt == 0 => {
184 debug!(id_tag = %id_tag, path = %path,
185 "auth rejected, refreshing cached token and retrying");
186 self.proxy_tokens.invalidate(tn_id, id_tag);
187 attempt += 1;
188 }
189 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
190 return Err(Error::PermissionDenied);
191 }
192 code => {
193 return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
194 }
195 }
196 }
197 }
198
199 pub async fn get_stream(
205 &self,
206 tn_id: TnId,
207 id_tag: &str,
208 path: &str,
209 auth: bool,
210 ) -> ClResult<impl AsyncRead + Send + Unpin + use<>> {
211 let mut attempt = 0u8;
212 loop {
213 let req = hyper::Request::builder()
214 .method(Method::GET)
215 .uri(format!("https://cl-o.{}/api{}", id_tag, path));
216 let req = if auth {
217 let token = self.get_or_mint_proxy_token(tn_id, id_tag).await?;
218 debug!("Got proxy token (len={})", token.len());
219 req.header("Authorization", format!("Bearer {}", token))
220 } else {
221 req
222 };
223 let req = req.body(to_boxed(Empty::new()))?;
224 let res = self.timed_request(req).await?;
225 match res.status() {
226 StatusCode::OK => {
227 let stream = res.into_body().into_data_stream().map_err(std::io::Error::other);
228 return Ok(tokio_util::io::StreamReader::new(stream));
229 }
230 StatusCode::NOT_FOUND => return Err(Error::NotFound),
231 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if auth && attempt == 0 => {
232 debug!(id_tag = %id_tag, path = %path,
233 "auth rejected on stream, refreshing cached token and retrying");
234 self.proxy_tokens.invalidate(tn_id, id_tag);
235 attempt += 1;
236 }
237 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
238 return Err(Error::PermissionDenied);
239 }
240 code => {
241 return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
242 }
243 }
244 }
245 }
246
247 pub async fn get<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
248 where
249 Res: DeserializeOwned,
250 {
251 let res = self.get_bin(tn_id, id_tag, path, true).await?;
252 let parsed: Res = serde_json::from_slice(&res)?;
253 Ok(parsed)
254 }
255
256 pub async fn get_noauth<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
257 where
258 Res: DeserializeOwned,
259 {
260 let res = self.get_bin(tn_id, id_tag, path, false).await?;
261 let parsed: Res = serde_json::from_slice(&res)?;
262 Ok(parsed)
263 }
264
265 pub async fn get_public<Res>(&self, id_tag: &str, path: &str) -> ClResult<Res>
267 where
268 Res: DeserializeOwned,
269 {
270 let req = hyper::Request::builder()
271 .method(Method::GET)
272 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
273 .body(to_boxed(Empty::new()))?;
274 let res = self.timed_request(req).await?;
275 match res.status() {
276 StatusCode::OK => {
277 let bytes = Self::collect_body(res.into_body()).await?;
278 let parsed: Res = serde_json::from_slice(&bytes)?;
279 Ok(parsed)
280 }
281 StatusCode::NOT_FOUND => Err(Error::NotFound),
282 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
283 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
284 }
285 }
286
287 pub async fn get_conditional<Res>(
292 &self,
293 id_tag: &str,
294 path: &str,
295 etag: Option<&str>,
296 ) -> ClResult<ConditionalResult<Res>>
297 where
298 Res: DeserializeOwned,
299 {
300 let mut builder = hyper::Request::builder()
301 .method(Method::GET)
302 .uri(format!("https://cl-o.{}/api{}", id_tag, path));
303
304 if let Some(etag) = etag {
306 builder = builder.header("If-None-Match", etag);
307 }
308
309 let req = builder.body(to_boxed(Empty::new()))?;
310 let res = self.timed_request(req).await?;
311
312 match res.status() {
313 StatusCode::NOT_MODIFIED => Ok(ConditionalResult::NotModified),
314 StatusCode::OK => {
315 let new_etag = res
317 .headers()
318 .get("etag")
319 .and_then(|v| v.to_str().ok())
320 .map(|s| s.trim_matches('"').into());
321
322 let bytes = Self::collect_body(res.into_body()).await?;
323 let parsed: Res = serde_json::from_slice(&bytes)?;
324 Ok(ConditionalResult::Modified { data: parsed, etag: new_etag })
325 }
326 StatusCode::NOT_FOUND => Err(Error::NotFound),
327 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
328 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
329 }
330 }
331
332 pub async fn post_public<Req, Res>(&self, id_tag: &str, path: &str, data: &Req) -> ClResult<Res>
334 where
335 Req: Serialize,
336 Res: DeserializeOwned,
337 {
338 let json_data = serde_json::to_vec(data)?;
339 let req = hyper::Request::builder()
340 .method(Method::POST)
341 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
342 .header("Content-Type", "application/json")
343 .body(to_boxed(Full::from(json_data)))?;
344 let res = self.timed_request(req).await?;
345 match res.status() {
346 StatusCode::OK | StatusCode::CREATED => {
347 let bytes = Self::collect_body(res.into_body()).await?;
348 let parsed: Res = serde_json::from_slice(&bytes)?;
349 Ok(parsed)
350 }
351 StatusCode::NOT_FOUND => Err(Error::NotFound),
352 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
353 StatusCode::UNPROCESSABLE_ENTITY => Err(Error::ValidationError(
354 "IDP registration failed - validation error".to_string(),
355 )),
356 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
357 }
358 }
359
360 pub async fn post_bin(
366 &self,
367 _tn_id: TnId,
368 id_tag: &str,
369 path: &str,
370 data: Bytes,
371 ) -> ClResult<Bytes> {
372 let req = hyper::Request::builder()
373 .method(Method::POST)
374 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
375 .header("Content-Type", "application/json")
376 .body(to_boxed(Full::from(data)))?;
377 let res = self.timed_request(req).await?;
378 Self::collect_body(res.into_body()).await
379 }
380
381 pub async fn post_stream<S>(
384 &self,
385 _tn_id: TnId,
386 id_tag: &str,
387 path: &str,
388 stream: S,
389 ) -> ClResult<Bytes>
390 where
391 S: Stream<Item = Result<hyper::body::Frame<Bytes>, hyper::Error>> + Send + Sync + 'static,
392 {
393 let req = hyper::Request::builder()
394 .method(Method::POST)
395 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
396 .body(to_boxed(StreamBody::new(stream)))?;
397 let res = self.timed_request(req).await?;
398 Self::collect_body(res.into_body()).await
399 }
400
401 pub async fn post<Res>(
402 &self,
403 tn_id: TnId,
404 id_tag: &str,
405 path: &str,
406 data: &impl Serialize,
407 ) -> ClResult<Res>
408 where
409 Res: DeserializeOwned,
410 {
411 let res = self.post_bin(tn_id, id_tag, path, serde_json::to_vec(data)?.into()).await?;
412 let parsed: Res = serde_json::from_slice(&res)?;
413 Ok(parsed)
414 }
415
416 pub async fn post_json_authed(
426 &self,
427 tn_id: TnId,
428 id_tag: &str,
429 path: &str,
430 data: Bytes,
431 ) -> ClResult<Bytes> {
432 let mut attempt = 0u8;
433 loop {
434 let token = self.get_or_mint_proxy_token(tn_id, id_tag).await?;
435 let req = hyper::Request::builder()
436 .method(Method::POST)
437 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
438 .header("Content-Type", "application/json")
439 .header("Authorization", format!("Bearer {}", token))
440 .body(to_boxed(Full::from(data.clone())))?;
441 let res = self.timed_request(req).await?;
442 debug!(status = %res.status(), "federated POST response");
443 match res.status() {
444 StatusCode::NOT_FOUND => return Err(Error::NotFound),
445 StatusCode::GONE => return Err(Error::Gone),
446 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if attempt == 0 => {
447 debug!(id_tag = %id_tag, path = %path,
448 "auth rejected on POST, refreshing cached token and retrying");
449 self.proxy_tokens.invalidate(tn_id, id_tag);
450 attempt += 1;
451 }
452 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
453 return Err(Error::PermissionDenied);
454 }
455 code if code.is_success() => {
456 return Self::collect_body(res.into_body()).await;
461 }
462 code => {
463 return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
464 }
465 }
466 }
467 }
468
469 pub async fn post_authed<Res>(
471 &self,
472 tn_id: TnId,
473 id_tag: &str,
474 path: &str,
475 data: &impl Serialize,
476 ) -> ClResult<Res>
477 where
478 Res: DeserializeOwned,
479 {
480 let res = self
481 .post_json_authed(tn_id, id_tag, path, serde_json::to_vec(data)?.into())
482 .await?;
483 let parsed: Res = serde_json::from_slice(&res)?;
484 Ok(parsed)
485 }
486}
487
488