Skip to main content

cloudillo_core/
request.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! Request client implementation
5
6use futures::TryStreamExt;
7use futures_core::stream::Stream;
8use http_body_util::{BodyExt, Empty, Full, StreamBody, combinators::BoxBody};
9use hyper::http::StatusCode;
10use hyper::{Method, body::Body, body::Bytes};
11use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
12use hyper_util::client::legacy::{Client, connect::HttpConnector};
13use hyper_util::rt::TokioExecutor;
14use serde::{Deserialize, Serialize, de::DeserializeOwned};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::io::AsyncRead;
18use tokio::time::timeout;
19
20/// Default HTTP request timeout (10 seconds)
21const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
22
23use crate::prelude::*;
24use cloudillo_types::action_types::CreateAction;
25use cloudillo_types::auth_adapter::AuthAdapter;
26
27fn to_boxed<B>(body: B) -> BoxBody<Bytes, Error>
28where
29	B: Body<Data = Bytes> + Send + Sync + 'static,
30	B::Error: Send + 'static,
31{
32	body.map_err(|_err| Error::NetworkError("body stream error".into())).boxed()
33}
34
35#[derive(Deserialize)]
36struct TokenData {
37	token: Box<str>,
38}
39
40#[derive(Deserialize)]
41struct TokenRes {
42	data: TokenData,
43}
44
45/// Result of a conditional GET request
46#[derive(Debug)]
47pub enum ConditionalResult<T> {
48	/// 200 OK - new data with new etag
49	Modified { data: T, etag: Option<Box<str>> },
50	/// 304 Not Modified - etag unchanged
51	NotModified,
52}
53
54#[derive(Debug, Clone)]
55pub struct Request {
56	pub auth_adapter: Arc<dyn AuthAdapter>,
57	client: Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, Error>>,
58	proxy_tokens: Arc<crate::ProxyTokenCache>,
59}
60
61impl Request {
62	pub fn new(
63		auth_adapter: Arc<dyn AuthAdapter>,
64		proxy_tokens: Arc<crate::ProxyTokenCache>,
65	) -> ClResult<Self> {
66		let client = HttpsConnectorBuilder::new()
67			.with_native_roots()
68			.map_err(|_| Error::ConfigError("no native root CA certificates found".into()))?
69			.https_only()
70			.enable_http1()
71			.build();
72
73		Ok(Request {
74			auth_adapter,
75			client: Client::builder(TokioExecutor::new()).build(client),
76			proxy_tokens,
77		})
78	}
79
80	/// Execute an HTTP request with timeout wrapper
81	async fn timed_request(
82		&self,
83		req: hyper::Request<BoxBody<Bytes, Error>>,
84	) -> ClResult<hyper::Response<hyper::body::Incoming>> {
85		timeout(REQUEST_TIMEOUT, self.client.request(req))
86			.await
87			.map_err(|_| Error::Timeout)?
88			.map_err(Error::from)
89	}
90
91	/// Collect response body with timeout
92	async fn collect_body(body: hyper::body::Incoming) -> ClResult<Bytes> {
93		timeout(REQUEST_TIMEOUT, body.collect())
94			.await
95			.map_err(|_| Error::Timeout)?
96			.map_err(|_| Error::NetworkError("body collection error".into()))
97			.map(http_body_util::Collected::to_bytes)
98	}
99
100	// NOTE: Despite the name, this returns the **HS256 access token** the
101	// remote signs back, NOT the PROXY action token (the PROXY token is a
102	// short-lived bearer for the access-token endpoint only). Most callers
103	// should go through `get_or_mint_proxy_token` to benefit from caching
104	// and the 401-retry path.
105	pub async fn create_proxy_token(
106		&self,
107		tn_id: TnId,
108		id_tag: &str,
109		subject: Option<&str>,
110	) -> ClResult<Box<str>> {
111		let auth_token = self
112			.auth_adapter
113			.create_action_token(
114				tn_id,
115				CreateAction {
116					typ: "PROXY".into(),
117					audience_tag: Some(id_tag.into()),
118					expires_at: Some(Timestamp::from_now(60)), // 1 min
119					..Default::default()
120				},
121			)
122			.await?;
123		let req = hyper::Request::builder()
124			.method(Method::GET)
125			.uri(format!(
126				"https://cl-o.{}/api/auth/access-token?token={}{}",
127				id_tag,
128				auth_token,
129				if let Some(subject) = subject {
130					format!("&subject={}", subject)
131				} else {
132					String::new()
133				}
134			))
135			.body(to_boxed(Empty::new()))?;
136		let res = self.timed_request(req).await?;
137		if !res.status().is_success() {
138			return Err(Error::PermissionDenied);
139		}
140		let parsed: TokenRes = serde_json::from_slice(&Self::collect_body(res.into_body()).await?)?;
141
142		Ok(parsed.data.token)
143	}
144
145	/// Returns a cached access token, minting one via `create_proxy_token`
146	/// and inserting on miss. Callers that hit 401/403 should call
147	/// `self.proxy_tokens.invalidate(tn_id, id_tag)` and retry once.
148	async fn get_or_mint_proxy_token(&self, tn_id: TnId, id_tag: &str) -> ClResult<Box<str>> {
149		if let Some(token) = self.proxy_tokens.get(tn_id, id_tag) {
150			return Ok(token);
151		}
152		let token = self.create_proxy_token(tn_id, id_tag, None).await?;
153		self.proxy_tokens.insert(tn_id, id_tag, token.clone());
154		Ok(token)
155	}
156
157	pub async fn get_bin(
158		&self,
159		tn_id: TnId,
160		id_tag: &str,
161		path: &str,
162		auth: bool,
163	) -> ClResult<Bytes> {
164		let mut attempt = 0u8;
165		loop {
166			let req = hyper::Request::builder()
167				.method(Method::GET)
168				.uri(format!("https://cl-o.{}/api{}", id_tag, path));
169			let req = if auth {
170				req.header(
171					"Authorization",
172					format!("Bearer {}", self.get_or_mint_proxy_token(tn_id, id_tag).await?),
173				)
174			} else {
175				req
176			};
177			let req = req.body(to_boxed(Empty::new()))?;
178			let res = self.timed_request(req).await?;
179			debug!(status = %res.status(), "federated GET response");
180			match res.status() {
181				StatusCode::OK => return Self::collect_body(res.into_body()).await,
182				StatusCode::NOT_FOUND => return Err(Error::NotFound),
183				StatusCode::GONE => return Err(Error::Gone),
184				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if auth && attempt == 0 => {
185					debug!(id_tag = %id_tag, path = %path,
186						"auth rejected, refreshing cached token and retrying");
187					self.proxy_tokens.invalidate(tn_id, id_tag);
188					attempt += 1;
189				}
190				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
191					return Err(Error::PermissionDenied);
192				}
193				code => {
194					return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
195				}
196			}
197		}
198	}
199
200	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<BodyDataStream<hyper::body::Incoming>> {
201	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<BodyDataStream<ClResult<Bytes>>> {
202	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<impl Stream<Item = ClResult<Bytes>>> {
203	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<TokioIo<BodyDataStream<hyper::body::Incoming>>> {
204	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<StreamReader<BodyDataStream<hyper::body::Incoming>, Bytes>> {
205	pub async fn get_stream(
206		&self,
207		tn_id: TnId,
208		id_tag: &str,
209		path: &str,
210		auth: bool,
211	) -> ClResult<impl AsyncRead + Send + Unpin + use<>> {
212		let mut attempt = 0u8;
213		loop {
214			let req = hyper::Request::builder()
215				.method(Method::GET)
216				.uri(format!("https://cl-o.{}/api{}", id_tag, path));
217			let req = if auth {
218				let token = self.get_or_mint_proxy_token(tn_id, id_tag).await?;
219				debug!("Got proxy token (len={})", token.len());
220				req.header("Authorization", format!("Bearer {}", token))
221			} else {
222				req
223			};
224			let req = req.body(to_boxed(Empty::new()))?;
225			let res = self.timed_request(req).await?;
226			match res.status() {
227				StatusCode::OK => {
228					let stream = res.into_body().into_data_stream().map_err(std::io::Error::other);
229					return Ok(tokio_util::io::StreamReader::new(stream));
230				}
231				StatusCode::NOT_FOUND => return Err(Error::NotFound),
232				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if auth && attempt == 0 => {
233					debug!(id_tag = %id_tag, path = %path,
234						"auth rejected on stream, refreshing cached token and retrying");
235					self.proxy_tokens.invalidate(tn_id, id_tag);
236					attempt += 1;
237				}
238				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
239					return Err(Error::PermissionDenied);
240				}
241				code => {
242					return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
243				}
244			}
245		}
246	}
247
248	pub async fn get<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
249	where
250		Res: DeserializeOwned,
251	{
252		let res = self.get_bin(tn_id, id_tag, path, true).await?;
253		let parsed: Res = serde_json::from_slice(&res)?;
254		Ok(parsed)
255	}
256
257	pub async fn get_noauth<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
258	where
259		Res: DeserializeOwned,
260	{
261		let res = self.get_bin(tn_id, id_tag, path, false).await?;
262		let parsed: Res = serde_json::from_slice(&res)?;
263		Ok(parsed)
264	}
265
266	/// Make a public GET request without authentication or tenant context
267	pub async fn get_public<Res>(&self, id_tag: &str, path: &str) -> ClResult<Res>
268	where
269		Res: DeserializeOwned,
270	{
271		let req = hyper::Request::builder()
272			.method(Method::GET)
273			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
274			.body(to_boxed(Empty::new()))?;
275		let res = self.timed_request(req).await?;
276		match res.status() {
277			StatusCode::OK => {
278				let bytes = Self::collect_body(res.into_body()).await?;
279				let parsed: Res = serde_json::from_slice(&bytes)?;
280				Ok(parsed)
281			}
282			StatusCode::NOT_FOUND => Err(Error::NotFound),
283			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
284			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
285		}
286	}
287
288	/// Make a conditional GET request with If-None-Match header for etag support
289	///
290	/// Returns `ConditionalResult::NotModified` if server returns 304,
291	/// or `ConditionalResult::Modified` with data and new etag if content changed.
292	pub async fn get_conditional<Res>(
293		&self,
294		id_tag: &str,
295		path: &str,
296		etag: Option<&str>,
297	) -> ClResult<ConditionalResult<Res>>
298	where
299		Res: DeserializeOwned,
300	{
301		let mut builder = hyper::Request::builder()
302			.method(Method::GET)
303			.uri(format!("https://cl-o.{}/api{}", id_tag, path));
304
305		// Add If-None-Match header if we have an etag
306		if let Some(etag) = etag {
307			builder = builder.header("If-None-Match", etag);
308		}
309
310		let req = builder.body(to_boxed(Empty::new()))?;
311		let res = self.timed_request(req).await?;
312
313		match res.status() {
314			StatusCode::NOT_MODIFIED => Ok(ConditionalResult::NotModified),
315			StatusCode::OK => {
316				// Extract ETag from response headers
317				let new_etag = res
318					.headers()
319					.get("etag")
320					.and_then(|v| v.to_str().ok())
321					.map(|s| s.trim_matches('"').into());
322
323				let bytes = Self::collect_body(res.into_body()).await?;
324				let parsed: Res = serde_json::from_slice(&bytes)?;
325				Ok(ConditionalResult::Modified { data: parsed, etag: new_etag })
326			}
327			StatusCode::NOT_FOUND => Err(Error::NotFound),
328			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
329			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
330		}
331	}
332
333	/// Make a public POST request without authentication or tenant context
334	pub async fn post_public<Req, Res>(&self, id_tag: &str, path: &str, data: &Req) -> ClResult<Res>
335	where
336		Req: Serialize,
337		Res: DeserializeOwned,
338	{
339		let json_data = serde_json::to_vec(data)?;
340		let req = hyper::Request::builder()
341			.method(Method::POST)
342			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
343			.header("Content-Type", "application/json")
344			.body(to_boxed(Full::from(json_data)))?;
345		let res = self.timed_request(req).await?;
346		match res.status() {
347			StatusCode::OK | StatusCode::CREATED => {
348				let bytes = Self::collect_body(res.into_body()).await?;
349				let parsed: Res = serde_json::from_slice(&bytes)?;
350				Ok(parsed)
351			}
352			StatusCode::NOT_FOUND => Err(Error::NotFound),
353			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
354			StatusCode::UNPROCESSABLE_ENTITY => Err(Error::ValidationError(
355				"IDP registration failed - validation error".to_string(),
356			)),
357			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
358		}
359	}
360
361	/// Unauthenticated POST to a remote tenant.
362	///
363	/// Callers (e.g. IDP registration/resend, action federation inbox) authenticate
364	/// via signed tokens carried in the request body, not via an `Authorization`
365	/// header — `tn_id` is therefore unused at the transport layer.
366	pub async fn post_bin(
367		&self,
368		_tn_id: TnId,
369		id_tag: &str,
370		path: &str,
371		data: Bytes,
372	) -> ClResult<Bytes> {
373		let req = hyper::Request::builder()
374			.method(Method::POST)
375			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
376			.header("Content-Type", "application/json")
377			.body(to_boxed(Full::from(data)))?;
378		let res = self.timed_request(req).await?;
379		Self::collect_body(res.into_body()).await
380	}
381
382	/// Unauthenticated streaming POST to a remote tenant. See [`Self::post_bin`]
383	/// for why no `Authorization` header is attached.
384	pub async fn post_stream<S>(
385		&self,
386		_tn_id: TnId,
387		id_tag: &str,
388		path: &str,
389		stream: S,
390	) -> ClResult<Bytes>
391	where
392		S: Stream<Item = Result<hyper::body::Frame<Bytes>, hyper::Error>> + Send + Sync + 'static,
393	{
394		let req = hyper::Request::builder()
395			.method(Method::POST)
396			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
397			.body(to_boxed(StreamBody::new(stream)))?;
398		let res = self.timed_request(req).await?;
399		Self::collect_body(res.into_body()).await
400	}
401
402	pub async fn post<Res>(
403		&self,
404		tn_id: TnId,
405		id_tag: &str,
406		path: &str,
407		data: &impl Serialize,
408	) -> ClResult<Res>
409	where
410		Res: DeserializeOwned,
411	{
412		let res = self.post_bin(tn_id, id_tag, path, serde_json::to_vec(data)?.into()).await?;
413		let parsed: Res = serde_json::from_slice(&res)?;
414		Ok(parsed)
415	}
416
417	/// Authenticated JSON POST to a remote tenant. Mirrors [`Self::get_bin`]:
418	/// attaches a cached proxy token in the `Authorization` header and retries
419	/// once on 401/403 after invalidating the cached token. Branches on
420	/// response status so callers see a typed `Error` (NotFound / Gone /
421	/// PermissionDenied / NetworkError) instead of a downstream JSON-
422	/// deserialization failure when the upstream returns an error envelope.
423	///
424	/// Hard-codes `Content-Type: application/json`; for binary authed POSTs
425	/// add a separate helper that takes an explicit content-type parameter.
426	pub async fn post_json_authed(
427		&self,
428		tn_id: TnId,
429		id_tag: &str,
430		path: &str,
431		data: Bytes,
432	) -> ClResult<Bytes> {
433		let mut attempt = 0u8;
434		loop {
435			let token = self.get_or_mint_proxy_token(tn_id, id_tag).await?;
436			let req = hyper::Request::builder()
437				.method(Method::POST)
438				.uri(format!("https://cl-o.{}/api{}", id_tag, path))
439				.header("Content-Type", "application/json")
440				.header("Authorization", format!("Bearer {}", token))
441				.body(to_boxed(Full::from(data.clone())))?;
442			let res = self.timed_request(req).await?;
443			debug!(status = %res.status(), "federated POST response");
444			match res.status() {
445				StatusCode::NOT_FOUND => return Err(Error::NotFound),
446				StatusCode::GONE => return Err(Error::Gone),
447				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if attempt == 0 => {
448					debug!(id_tag = %id_tag, path = %path,
449						"auth rejected on POST, refreshing cached token and retrying");
450					self.proxy_tokens.invalidate(tn_id, id_tag);
451					attempt += 1;
452				}
453				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
454					return Err(Error::PermissionDenied);
455				}
456				code if code.is_success() => {
457					// Accept any 2xx — the wire shape is up to the caller.
458					// `204 No Content` returns empty bytes; the typed
459					// `post_authed` wrapper will surface that as a JSON parse
460					// error, which is acceptable for callers that opted in.
461					return Self::collect_body(res.into_body()).await;
462				}
463				code => {
464					return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
465				}
466			}
467		}
468	}
469
470	/// Typed JSON wrapper around [`Self::post_json_authed`].
471	pub async fn post_authed<Res>(
472		&self,
473		tn_id: TnId,
474		id_tag: &str,
475		path: &str,
476		data: &impl Serialize,
477	) -> ClResult<Res>
478	where
479		Res: DeserializeOwned,
480	{
481		let res = self
482			.post_json_authed(tn_id, id_tag, path, serde_json::to_vec(data)?.into())
483			.await?;
484		let parsed: Res = serde_json::from_slice(&res)?;
485		Ok(parsed)
486	}
487}
488
489// vim: ts=4