1use futures::TryStreamExt;
7use futures_core::stream::Stream;
8use http_body_util::{BodyExt, Empty, Full, StreamBody, combinators::BoxBody};
9use hyper::http::StatusCode;
10use hyper::{Method, body::Body, body::Bytes};
11use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
12use hyper_util::client::legacy::{Client, connect::HttpConnector};
13use hyper_util::rt::TokioExecutor;
14use serde::{Deserialize, Serialize, de::DeserializeOwned};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::io::AsyncRead;
18use tokio::time::timeout;
19
/// Upper bound applied separately to waiting for a response (`timed_request`)
/// and to collecting a response body (`collect_body`).
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
22
23use crate::prelude::*;
24use cloudillo_types::action_types::CreateAction;
25use cloudillo_types::auth_adapter::AuthAdapter;
26
27fn to_boxed<B>(body: B) -> BoxBody<Bytes, Error>
28where
29 B: Body<Data = Bytes> + Send + Sync + 'static,
30 B::Error: Send + 'static,
31{
32 body.map_err(|_err| Error::NetworkError("body stream error".into())).boxed()
33}
34
/// Inner `data` object of the access-token response parsed in
/// `Request::create_proxy_token`.
#[derive(Deserialize)]
struct TokenData {
    /// The token string handed back to the caller of `create_proxy_token`.
    token: Box<str>,
}
39
/// Top-level JSON envelope of the access-token response: `{ "data": { "token": ... } }`.
#[derive(Deserialize)]
struct TokenRes {
    /// Payload object carrying the token.
    data: TokenData,
}
44
/// Outcome of a conditional GET (see `Request::get_conditional`).
#[derive(Debug)]
pub enum ConditionalResult<T> {
    /// The server answered `200 OK`: `data` is the deserialized body and `etag`
    /// is the response's `etag` header value (surrounding `"` trimmed), if any.
    Modified { data: T, etag: Option<Box<str>> },
    /// The server answered `304 Not Modified`; no body was read.
    NotModified,
}
53
/// HTTPS client for talking to remote instances at `https://cl-o.<id_tag>/api...`.
#[derive(Debug, Clone)]
pub struct Request {
    /// Adapter used to create the action token exchanged for a proxy token.
    pub auth_adapter: Arc<dyn AuthAdapter>,
    /// Underlying hyper client; request bodies are boxed via `to_boxed`.
    client: Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, Error>>,
    /// Cache of proxy tokens, looked up by `(tn_id, id_tag)`.
    proxy_tokens: Arc<crate::ProxyTokenCache>,
}
60
61impl Request {
62 pub fn new(
63 auth_adapter: Arc<dyn AuthAdapter>,
64 proxy_tokens: Arc<crate::ProxyTokenCache>,
65 ) -> ClResult<Self> {
66 let client = HttpsConnectorBuilder::new()
67 .with_native_roots()
68 .map_err(|_| Error::ConfigError("no native root CA certificates found".into()))?
69 .https_only()
70 .enable_http1()
71 .build();
72
73 Ok(Request {
74 auth_adapter,
75 client: Client::builder(TokioExecutor::new()).build(client),
76 proxy_tokens,
77 })
78 }
79
80 async fn timed_request(
82 &self,
83 req: hyper::Request<BoxBody<Bytes, Error>>,
84 ) -> ClResult<hyper::Response<hyper::body::Incoming>> {
85 timeout(REQUEST_TIMEOUT, self.client.request(req))
86 .await
87 .map_err(|_| Error::Timeout)?
88 .map_err(Error::from)
89 }
90
91 async fn collect_body(body: hyper::body::Incoming) -> ClResult<Bytes> {
93 timeout(REQUEST_TIMEOUT, body.collect())
94 .await
95 .map_err(|_| Error::Timeout)?
96 .map_err(|_| Error::NetworkError("body collection error".into()))
97 .map(http_body_util::Collected::to_bytes)
98 }
99
100 pub async fn create_proxy_token(
106 &self,
107 tn_id: TnId,
108 id_tag: &str,
109 subject: Option<&str>,
110 ) -> ClResult<Box<str>> {
111 let auth_token = self
112 .auth_adapter
113 .create_action_token(
114 tn_id,
115 CreateAction {
116 typ: "PROXY".into(),
117 audience_tag: Some(id_tag.into()),
118 expires_at: Some(Timestamp::from_now(60)), ..Default::default()
120 },
121 )
122 .await?;
123 let req = hyper::Request::builder()
124 .method(Method::GET)
125 .uri(format!(
126 "https://cl-o.{}/api/auth/access-token?token={}{}",
127 id_tag,
128 auth_token,
129 if let Some(subject) = subject {
130 format!("&subject={}", subject)
131 } else {
132 String::new()
133 }
134 ))
135 .body(to_boxed(Empty::new()))?;
136 let res = self.timed_request(req).await?;
137 if !res.status().is_success() {
138 return Err(Error::PermissionDenied);
139 }
140 let parsed: TokenRes = serde_json::from_slice(&Self::collect_body(res.into_body()).await?)?;
141
142 Ok(parsed.data.token)
143 }
144
145 async fn get_or_mint_proxy_token(&self, tn_id: TnId, id_tag: &str) -> ClResult<Box<str>> {
149 if let Some(token) = self.proxy_tokens.get(tn_id, id_tag) {
150 return Ok(token);
151 }
152 let token = self.create_proxy_token(tn_id, id_tag, None).await?;
153 self.proxy_tokens.insert(tn_id, id_tag, token.clone());
154 Ok(token)
155 }
156
157 pub async fn get_bin(
158 &self,
159 tn_id: TnId,
160 id_tag: &str,
161 path: &str,
162 auth: bool,
163 ) -> ClResult<Bytes> {
164 let mut attempt = 0u8;
165 loop {
166 let req = hyper::Request::builder()
167 .method(Method::GET)
168 .uri(format!("https://cl-o.{}/api{}", id_tag, path));
169 let req = if auth {
170 req.header(
171 "Authorization",
172 format!("Bearer {}", self.get_or_mint_proxy_token(tn_id, id_tag).await?),
173 )
174 } else {
175 req
176 };
177 let req = req.body(to_boxed(Empty::new()))?;
178 let res = self.timed_request(req).await?;
179 debug!(status = %res.status(), "federated GET response");
180 match res.status() {
181 StatusCode::OK => return Self::collect_body(res.into_body()).await,
182 StatusCode::NOT_FOUND => return Err(Error::NotFound),
183 StatusCode::GONE => return Err(Error::Gone),
184 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if auth && attempt == 0 => {
185 debug!(id_tag = %id_tag, path = %path,
186 "auth rejected, refreshing cached token and retrying");
187 self.proxy_tokens.invalidate(tn_id, id_tag);
188 attempt += 1;
189 }
190 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
191 return Err(Error::PermissionDenied);
192 }
193 code => {
194 return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
195 }
196 }
197 }
198 }
199
200 pub async fn get_stream(
206 &self,
207 tn_id: TnId,
208 id_tag: &str,
209 path: &str,
210 auth: bool,
211 ) -> ClResult<impl AsyncRead + Send + Unpin + use<>> {
212 let mut attempt = 0u8;
213 loop {
214 let req = hyper::Request::builder()
215 .method(Method::GET)
216 .uri(format!("https://cl-o.{}/api{}", id_tag, path));
217 let req = if auth {
218 let token = self.get_or_mint_proxy_token(tn_id, id_tag).await?;
219 debug!("Got proxy token (len={})", token.len());
220 req.header("Authorization", format!("Bearer {}", token))
221 } else {
222 req
223 };
224 let req = req.body(to_boxed(Empty::new()))?;
225 let res = self.timed_request(req).await?;
226 match res.status() {
227 StatusCode::OK => {
228 let stream = res.into_body().into_data_stream().map_err(std::io::Error::other);
229 return Ok(tokio_util::io::StreamReader::new(stream));
230 }
231 StatusCode::NOT_FOUND => return Err(Error::NotFound),
232 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if auth && attempt == 0 => {
233 debug!(id_tag = %id_tag, path = %path,
234 "auth rejected on stream, refreshing cached token and retrying");
235 self.proxy_tokens.invalidate(tn_id, id_tag);
236 attempt += 1;
237 }
238 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
239 return Err(Error::PermissionDenied);
240 }
241 code => {
242 return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
243 }
244 }
245 }
246 }
247
248 pub async fn get<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
249 where
250 Res: DeserializeOwned,
251 {
252 let res = self.get_bin(tn_id, id_tag, path, true).await?;
253 let parsed: Res = serde_json::from_slice(&res)?;
254 Ok(parsed)
255 }
256
257 pub async fn get_noauth<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
258 where
259 Res: DeserializeOwned,
260 {
261 let res = self.get_bin(tn_id, id_tag, path, false).await?;
262 let parsed: Res = serde_json::from_slice(&res)?;
263 Ok(parsed)
264 }
265
266 pub async fn get_public<Res>(&self, id_tag: &str, path: &str) -> ClResult<Res>
268 where
269 Res: DeserializeOwned,
270 {
271 let req = hyper::Request::builder()
272 .method(Method::GET)
273 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
274 .body(to_boxed(Empty::new()))?;
275 let res = self.timed_request(req).await?;
276 match res.status() {
277 StatusCode::OK => {
278 let bytes = Self::collect_body(res.into_body()).await?;
279 let parsed: Res = serde_json::from_slice(&bytes)?;
280 Ok(parsed)
281 }
282 StatusCode::NOT_FOUND => Err(Error::NotFound),
283 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
284 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
285 }
286 }
287
288 pub async fn get_conditional<Res>(
293 &self,
294 id_tag: &str,
295 path: &str,
296 etag: Option<&str>,
297 ) -> ClResult<ConditionalResult<Res>>
298 where
299 Res: DeserializeOwned,
300 {
301 let mut builder = hyper::Request::builder()
302 .method(Method::GET)
303 .uri(format!("https://cl-o.{}/api{}", id_tag, path));
304
305 if let Some(etag) = etag {
307 builder = builder.header("If-None-Match", etag);
308 }
309
310 let req = builder.body(to_boxed(Empty::new()))?;
311 let res = self.timed_request(req).await?;
312
313 match res.status() {
314 StatusCode::NOT_MODIFIED => Ok(ConditionalResult::NotModified),
315 StatusCode::OK => {
316 let new_etag = res
318 .headers()
319 .get("etag")
320 .and_then(|v| v.to_str().ok())
321 .map(|s| s.trim_matches('"').into());
322
323 let bytes = Self::collect_body(res.into_body()).await?;
324 let parsed: Res = serde_json::from_slice(&bytes)?;
325 Ok(ConditionalResult::Modified { data: parsed, etag: new_etag })
326 }
327 StatusCode::NOT_FOUND => Err(Error::NotFound),
328 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
329 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
330 }
331 }
332
333 pub async fn post_public<Req, Res>(&self, id_tag: &str, path: &str, data: &Req) -> ClResult<Res>
335 where
336 Req: Serialize,
337 Res: DeserializeOwned,
338 {
339 let json_data = serde_json::to_vec(data)?;
340 let req = hyper::Request::builder()
341 .method(Method::POST)
342 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
343 .header("Content-Type", "application/json")
344 .body(to_boxed(Full::from(json_data)))?;
345 let res = self.timed_request(req).await?;
346 match res.status() {
347 StatusCode::OK | StatusCode::CREATED => {
348 let bytes = Self::collect_body(res.into_body()).await?;
349 let parsed: Res = serde_json::from_slice(&bytes)?;
350 Ok(parsed)
351 }
352 StatusCode::NOT_FOUND => Err(Error::NotFound),
353 StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
354 StatusCode::UNPROCESSABLE_ENTITY => Err(Error::ValidationError(
355 "IDP registration failed - validation error".to_string(),
356 )),
357 code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
358 }
359 }
360
361 pub async fn post_bin(
367 &self,
368 _tn_id: TnId,
369 id_tag: &str,
370 path: &str,
371 data: Bytes,
372 ) -> ClResult<Bytes> {
373 let req = hyper::Request::builder()
374 .method(Method::POST)
375 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
376 .header("Content-Type", "application/json")
377 .body(to_boxed(Full::from(data)))?;
378 let res = self.timed_request(req).await?;
379 Self::collect_body(res.into_body()).await
380 }
381
382 pub async fn post_stream<S>(
385 &self,
386 _tn_id: TnId,
387 id_tag: &str,
388 path: &str,
389 stream: S,
390 ) -> ClResult<Bytes>
391 where
392 S: Stream<Item = Result<hyper::body::Frame<Bytes>, hyper::Error>> + Send + Sync + 'static,
393 {
394 let req = hyper::Request::builder()
395 .method(Method::POST)
396 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
397 .body(to_boxed(StreamBody::new(stream)))?;
398 let res = self.timed_request(req).await?;
399 Self::collect_body(res.into_body()).await
400 }
401
402 pub async fn post<Res>(
403 &self,
404 tn_id: TnId,
405 id_tag: &str,
406 path: &str,
407 data: &impl Serialize,
408 ) -> ClResult<Res>
409 where
410 Res: DeserializeOwned,
411 {
412 let res = self.post_bin(tn_id, id_tag, path, serde_json::to_vec(data)?.into()).await?;
413 let parsed: Res = serde_json::from_slice(&res)?;
414 Ok(parsed)
415 }
416
417 pub async fn post_json_authed(
427 &self,
428 tn_id: TnId,
429 id_tag: &str,
430 path: &str,
431 data: Bytes,
432 ) -> ClResult<Bytes> {
433 let mut attempt = 0u8;
434 loop {
435 let token = self.get_or_mint_proxy_token(tn_id, id_tag).await?;
436 let req = hyper::Request::builder()
437 .method(Method::POST)
438 .uri(format!("https://cl-o.{}/api{}", id_tag, path))
439 .header("Content-Type", "application/json")
440 .header("Authorization", format!("Bearer {}", token))
441 .body(to_boxed(Full::from(data.clone())))?;
442 let res = self.timed_request(req).await?;
443 debug!(status = %res.status(), "federated POST response");
444 match res.status() {
445 StatusCode::NOT_FOUND => return Err(Error::NotFound),
446 StatusCode::GONE => return Err(Error::Gone),
447 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if attempt == 0 => {
448 debug!(id_tag = %id_tag, path = %path,
449 "auth rejected on POST, refreshing cached token and retrying");
450 self.proxy_tokens.invalidate(tn_id, id_tag);
451 attempt += 1;
452 }
453 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
454 return Err(Error::PermissionDenied);
455 }
456 code if code.is_success() => {
457 return Self::collect_body(res.into_body()).await;
462 }
463 code => {
464 return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
465 }
466 }
467 }
468 }
469
470 pub async fn post_authed<Res>(
472 &self,
473 tn_id: TnId,
474 id_tag: &str,
475 path: &str,
476 data: &impl Serialize,
477 ) -> ClResult<Res>
478 where
479 Res: DeserializeOwned,
480 {
481 let res = self
482 .post_json_authed(tn_id, id_tag, path, serde_json::to_vec(data)?.into())
483 .await?;
484 let parsed: Res = serde_json::from_slice(&res)?;
485 Ok(parsed)
486 }
487}
488
489