Skip to main content

cloudillo_core/
request.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! Request client implementation
5
6use futures::TryStreamExt;
7use futures_core::stream::Stream;
8use http_body_util::{BodyExt, Empty, Full, StreamBody, combinators::BoxBody};
9use hyper::http::StatusCode;
10use hyper::{Method, body::Body, body::Bytes};
11use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
12use hyper_util::client::legacy::{Client, connect::HttpConnector};
13use hyper_util::rt::TokioExecutor;
14use serde::{Deserialize, Serialize, de::DeserializeOwned};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::io::AsyncRead;
18use tokio::time::timeout;
19
/// Deadline applied to outbound HTTP work (10 seconds).
///
/// The same limit is used while waiting for a response and, separately,
/// while buffering a response body.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
22
23use crate::prelude::*;
24use cloudillo_types::action_types::CreateAction;
25use cloudillo_types::auth_adapter::AuthAdapter;
26
27fn to_boxed<B>(body: B) -> BoxBody<Bytes, Error>
28where
29	B: Body<Data = Bytes> + Send + Sync + 'static,
30	B::Error: Send + 'static,
31{
32	body.map_err(|_err| Error::NetworkError("body stream error".into())).boxed()
33}
34
35#[derive(Deserialize)]
36struct TokenData {
37	token: Box<str>,
38}
39
40#[derive(Deserialize)]
41struct TokenRes {
42	data: TokenData,
43}
44
/// Outcome of a conditional (`If-None-Match`) GET request.
#[derive(Debug)]
pub enum ConditionalResult<T> {
	/// The server answered 200 OK: freshly decoded `data`, plus the `etag`
	/// taken from the response headers when one was present.
	Modified { data: T, etag: Option<Box<str>> },
	/// The server answered 304 Not Modified: the caller's copy is still current.
	NotModified,
}
53
54#[derive(Debug, Clone)]
55pub struct Request {
56	pub auth_adapter: Arc<dyn AuthAdapter>,
57	client: Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, Error>>,
58	proxy_tokens: Arc<crate::ProxyTokenCache>,
59}
60
61impl Request {
62	pub fn new(
63		auth_adapter: Arc<dyn AuthAdapter>,
64		proxy_tokens: Arc<crate::ProxyTokenCache>,
65	) -> ClResult<Self> {
66		let client = HttpsConnectorBuilder::new()
67			.with_native_roots()
68			.map_err(|_| Error::ConfigError("no native root CA certificates found".into()))?
69			.https_only()
70			.enable_http1()
71			.build();
72
73		Ok(Request {
74			auth_adapter,
75			client: Client::builder(TokioExecutor::new()).build(client),
76			proxy_tokens,
77		})
78	}
79
80	/// Execute an HTTP request with timeout wrapper
81	async fn timed_request(
82		&self,
83		req: hyper::Request<BoxBody<Bytes, Error>>,
84	) -> ClResult<hyper::Response<hyper::body::Incoming>> {
85		timeout(REQUEST_TIMEOUT, self.client.request(req))
86			.await
87			.map_err(|_| Error::Timeout)?
88			.map_err(Error::from)
89	}
90
91	/// Collect response body with timeout
92	async fn collect_body(body: hyper::body::Incoming) -> ClResult<Bytes> {
93		timeout(REQUEST_TIMEOUT, body.collect())
94			.await
95			.map_err(|_| Error::Timeout)?
96			.map_err(|_| Error::NetworkError("body collection error".into()))
97			.map(http_body_util::Collected::to_bytes)
98	}
99
100	// NOTE: Despite the name, this returns the **HS256 access token** the
101	// remote signs back, NOT the PROXY action token (the PROXY token is a
102	// short-lived bearer for the access-token endpoint only). Most callers
103	// should go through `get_or_mint_proxy_token` to benefit from caching
104	// and the 401-retry path.
105	pub async fn create_proxy_token(
106		&self,
107		tn_id: TnId,
108		id_tag: &str,
109		subject: Option<&str>,
110	) -> ClResult<Box<str>> {
111		let auth_token = self
112			.auth_adapter
113			.create_action_token(
114				tn_id,
115				CreateAction {
116					typ: "PROXY".into(),
117					audience_tag: Some(id_tag.into()),
118					expires_at: Some(Timestamp::from_now(60)), // 1 min
119					..Default::default()
120				},
121			)
122			.await?;
123		let req = hyper::Request::builder()
124			.method(Method::GET)
125			.uri(format!(
126				"https://cl-o.{}/api/auth/access-token?token={}{}",
127				id_tag,
128				auth_token,
129				if let Some(subject) = subject {
130					format!("&subject={}", subject)
131				} else {
132					String::new()
133				}
134			))
135			.body(to_boxed(Empty::new()))?;
136		let res = self.timed_request(req).await?;
137		if !res.status().is_success() {
138			return Err(Error::PermissionDenied);
139		}
140		let parsed: TokenRes = serde_json::from_slice(&Self::collect_body(res.into_body()).await?)?;
141
142		Ok(parsed.data.token)
143	}
144
145	/// Returns a cached access token, minting one via `create_proxy_token`
146	/// and inserting on miss. Callers that hit 401/403 should call
147	/// `self.proxy_tokens.invalidate(tn_id, id_tag)` and retry once.
148	async fn get_or_mint_proxy_token(&self, tn_id: TnId, id_tag: &str) -> ClResult<Box<str>> {
149		if let Some(token) = self.proxy_tokens.get(tn_id, id_tag) {
150			return Ok(token);
151		}
152		let token = self.create_proxy_token(tn_id, id_tag, None).await?;
153		self.proxy_tokens.insert(tn_id, id_tag, token.clone());
154		Ok(token)
155	}
156
157	pub async fn get_bin(
158		&self,
159		tn_id: TnId,
160		id_tag: &str,
161		path: &str,
162		auth: bool,
163	) -> ClResult<Bytes> {
164		let mut attempt = 0u8;
165		loop {
166			let req = hyper::Request::builder()
167				.method(Method::GET)
168				.uri(format!("https://cl-o.{}/api{}", id_tag, path));
169			let req = if auth {
170				req.header(
171					"Authorization",
172					format!("Bearer {}", self.get_or_mint_proxy_token(tn_id, id_tag).await?),
173				)
174			} else {
175				req
176			};
177			let req = req.body(to_boxed(Empty::new()))?;
178			let res = self.timed_request(req).await?;
179			debug!(status = %res.status(), "federated GET response");
180			match res.status() {
181				StatusCode::OK => return Self::collect_body(res.into_body()).await,
182				StatusCode::NOT_FOUND => return Err(Error::NotFound),
183				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if auth && attempt == 0 => {
184					debug!(id_tag = %id_tag, path = %path,
185						"auth rejected, refreshing cached token and retrying");
186					self.proxy_tokens.invalidate(tn_id, id_tag);
187					attempt += 1;
188				}
189				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
190					return Err(Error::PermissionDenied);
191				}
192				code => {
193					return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
194				}
195			}
196		}
197	}
198
199	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<BodyDataStream<hyper::body::Incoming>> {
200	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<BodyDataStream<ClResult<Bytes>>> {
201	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<impl Stream<Item = ClResult<Bytes>>> {
202	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<TokioIo<BodyDataStream<hyper::body::Incoming>>> {
203	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<StreamReader<BodyDataStream<hyper::body::Incoming>, Bytes>> {
204	pub async fn get_stream(
205		&self,
206		tn_id: TnId,
207		id_tag: &str,
208		path: &str,
209		auth: bool,
210	) -> ClResult<impl AsyncRead + Send + Unpin + use<>> {
211		let mut attempt = 0u8;
212		loop {
213			let req = hyper::Request::builder()
214				.method(Method::GET)
215				.uri(format!("https://cl-o.{}/api{}", id_tag, path));
216			let req = if auth {
217				let token = self.get_or_mint_proxy_token(tn_id, id_tag).await?;
218				debug!("Got proxy token (len={})", token.len());
219				req.header("Authorization", format!("Bearer {}", token))
220			} else {
221				req
222			};
223			let req = req.body(to_boxed(Empty::new()))?;
224			let res = self.timed_request(req).await?;
225			match res.status() {
226				StatusCode::OK => {
227					let stream = res.into_body().into_data_stream().map_err(std::io::Error::other);
228					return Ok(tokio_util::io::StreamReader::new(stream));
229				}
230				StatusCode::NOT_FOUND => return Err(Error::NotFound),
231				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if auth && attempt == 0 => {
232					debug!(id_tag = %id_tag, path = %path,
233						"auth rejected on stream, refreshing cached token and retrying");
234					self.proxy_tokens.invalidate(tn_id, id_tag);
235					attempt += 1;
236				}
237				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
238					return Err(Error::PermissionDenied);
239				}
240				code => {
241					return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
242				}
243			}
244		}
245	}
246
247	pub async fn get<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
248	where
249		Res: DeserializeOwned,
250	{
251		let res = self.get_bin(tn_id, id_tag, path, true).await?;
252		let parsed: Res = serde_json::from_slice(&res)?;
253		Ok(parsed)
254	}
255
256	pub async fn get_noauth<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
257	where
258		Res: DeserializeOwned,
259	{
260		let res = self.get_bin(tn_id, id_tag, path, false).await?;
261		let parsed: Res = serde_json::from_slice(&res)?;
262		Ok(parsed)
263	}
264
265	/// Make a public GET request without authentication or tenant context
266	pub async fn get_public<Res>(&self, id_tag: &str, path: &str) -> ClResult<Res>
267	where
268		Res: DeserializeOwned,
269	{
270		let req = hyper::Request::builder()
271			.method(Method::GET)
272			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
273			.body(to_boxed(Empty::new()))?;
274		let res = self.timed_request(req).await?;
275		match res.status() {
276			StatusCode::OK => {
277				let bytes = Self::collect_body(res.into_body()).await?;
278				let parsed: Res = serde_json::from_slice(&bytes)?;
279				Ok(parsed)
280			}
281			StatusCode::NOT_FOUND => Err(Error::NotFound),
282			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
283			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
284		}
285	}
286
287	/// Make a conditional GET request with If-None-Match header for etag support
288	///
289	/// Returns `ConditionalResult::NotModified` if server returns 304,
290	/// or `ConditionalResult::Modified` with data and new etag if content changed.
291	pub async fn get_conditional<Res>(
292		&self,
293		id_tag: &str,
294		path: &str,
295		etag: Option<&str>,
296	) -> ClResult<ConditionalResult<Res>>
297	where
298		Res: DeserializeOwned,
299	{
300		let mut builder = hyper::Request::builder()
301			.method(Method::GET)
302			.uri(format!("https://cl-o.{}/api{}", id_tag, path));
303
304		// Add If-None-Match header if we have an etag
305		if let Some(etag) = etag {
306			builder = builder.header("If-None-Match", etag);
307		}
308
309		let req = builder.body(to_boxed(Empty::new()))?;
310		let res = self.timed_request(req).await?;
311
312		match res.status() {
313			StatusCode::NOT_MODIFIED => Ok(ConditionalResult::NotModified),
314			StatusCode::OK => {
315				// Extract ETag from response headers
316				let new_etag = res
317					.headers()
318					.get("etag")
319					.and_then(|v| v.to_str().ok())
320					.map(|s| s.trim_matches('"').into());
321
322				let bytes = Self::collect_body(res.into_body()).await?;
323				let parsed: Res = serde_json::from_slice(&bytes)?;
324				Ok(ConditionalResult::Modified { data: parsed, etag: new_etag })
325			}
326			StatusCode::NOT_FOUND => Err(Error::NotFound),
327			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
328			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
329		}
330	}
331
332	/// Make a public POST request without authentication or tenant context
333	pub async fn post_public<Req, Res>(&self, id_tag: &str, path: &str, data: &Req) -> ClResult<Res>
334	where
335		Req: Serialize,
336		Res: DeserializeOwned,
337	{
338		let json_data = serde_json::to_vec(data)?;
339		let req = hyper::Request::builder()
340			.method(Method::POST)
341			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
342			.header("Content-Type", "application/json")
343			.body(to_boxed(Full::from(json_data)))?;
344		let res = self.timed_request(req).await?;
345		match res.status() {
346			StatusCode::OK | StatusCode::CREATED => {
347				let bytes = Self::collect_body(res.into_body()).await?;
348				let parsed: Res = serde_json::from_slice(&bytes)?;
349				Ok(parsed)
350			}
351			StatusCode::NOT_FOUND => Err(Error::NotFound),
352			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
353			StatusCode::UNPROCESSABLE_ENTITY => Err(Error::ValidationError(
354				"IDP registration failed - validation error".to_string(),
355			)),
356			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
357		}
358	}
359
360	/// Unauthenticated POST to a remote tenant.
361	///
362	/// Callers (e.g. IDP registration/resend, action federation inbox) authenticate
363	/// via signed tokens carried in the request body, not via an `Authorization`
364	/// header — `tn_id` is therefore unused at the transport layer.
365	pub async fn post_bin(
366		&self,
367		_tn_id: TnId,
368		id_tag: &str,
369		path: &str,
370		data: Bytes,
371	) -> ClResult<Bytes> {
372		let req = hyper::Request::builder()
373			.method(Method::POST)
374			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
375			.header("Content-Type", "application/json")
376			.body(to_boxed(Full::from(data)))?;
377		let res = self.timed_request(req).await?;
378		Self::collect_body(res.into_body()).await
379	}
380
381	/// Unauthenticated streaming POST to a remote tenant. See [`Self::post_bin`]
382	/// for why no `Authorization` header is attached.
383	pub async fn post_stream<S>(
384		&self,
385		_tn_id: TnId,
386		id_tag: &str,
387		path: &str,
388		stream: S,
389	) -> ClResult<Bytes>
390	where
391		S: Stream<Item = Result<hyper::body::Frame<Bytes>, hyper::Error>> + Send + Sync + 'static,
392	{
393		let req = hyper::Request::builder()
394			.method(Method::POST)
395			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
396			.body(to_boxed(StreamBody::new(stream)))?;
397		let res = self.timed_request(req).await?;
398		Self::collect_body(res.into_body()).await
399	}
400
401	pub async fn post<Res>(
402		&self,
403		tn_id: TnId,
404		id_tag: &str,
405		path: &str,
406		data: &impl Serialize,
407	) -> ClResult<Res>
408	where
409		Res: DeserializeOwned,
410	{
411		let res = self.post_bin(tn_id, id_tag, path, serde_json::to_vec(data)?.into()).await?;
412		let parsed: Res = serde_json::from_slice(&res)?;
413		Ok(parsed)
414	}
415
416	/// Authenticated JSON POST to a remote tenant. Mirrors [`Self::get_bin`]:
417	/// attaches a cached proxy token in the `Authorization` header and retries
418	/// once on 401/403 after invalidating the cached token. Branches on
419	/// response status so callers see a typed `Error` (NotFound / Gone /
420	/// PermissionDenied / NetworkError) instead of a downstream JSON-
421	/// deserialization failure when the upstream returns an error envelope.
422	///
423	/// Hard-codes `Content-Type: application/json`; for binary authed POSTs
424	/// add a separate helper that takes an explicit content-type parameter.
425	pub async fn post_json_authed(
426		&self,
427		tn_id: TnId,
428		id_tag: &str,
429		path: &str,
430		data: Bytes,
431	) -> ClResult<Bytes> {
432		let mut attempt = 0u8;
433		loop {
434			let token = self.get_or_mint_proxy_token(tn_id, id_tag).await?;
435			let req = hyper::Request::builder()
436				.method(Method::POST)
437				.uri(format!("https://cl-o.{}/api{}", id_tag, path))
438				.header("Content-Type", "application/json")
439				.header("Authorization", format!("Bearer {}", token))
440				.body(to_boxed(Full::from(data.clone())))?;
441			let res = self.timed_request(req).await?;
442			debug!(status = %res.status(), "federated POST response");
443			match res.status() {
444				StatusCode::NOT_FOUND => return Err(Error::NotFound),
445				StatusCode::GONE => return Err(Error::Gone),
446				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN if attempt == 0 => {
447					debug!(id_tag = %id_tag, path = %path,
448						"auth rejected on POST, refreshing cached token and retrying");
449					self.proxy_tokens.invalidate(tn_id, id_tag);
450					attempt += 1;
451				}
452				StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
453					return Err(Error::PermissionDenied);
454				}
455				code if code.is_success() => {
456					// Accept any 2xx — the wire shape is up to the caller.
457					// `204 No Content` returns empty bytes; the typed
458					// `post_authed` wrapper will surface that as a JSON parse
459					// error, which is acceptable for callers that opted in.
460					return Self::collect_body(res.into_body()).await;
461				}
462				code => {
463					return Err(Error::NetworkError(format!("unexpected HTTP status: {}", code)));
464				}
465			}
466		}
467	}
468
469	/// Typed JSON wrapper around [`Self::post_json_authed`].
470	pub async fn post_authed<Res>(
471		&self,
472		tn_id: TnId,
473		id_tag: &str,
474		path: &str,
475		data: &impl Serialize,
476	) -> ClResult<Res>
477	where
478		Res: DeserializeOwned,
479	{
480		let res = self
481			.post_json_authed(tn_id, id_tag, path, serde_json::to_vec(data)?.into())
482			.await?;
483		let parsed: Res = serde_json::from_slice(&res)?;
484		Ok(parsed)
485	}
486}
487
488// vim: ts=4