Skip to main content

cloudillo_core/
request.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! Request client implementation
5
6use futures::TryStreamExt;
7use futures_core::stream::Stream;
8use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full, StreamBody};
9use hyper::http::StatusCode;
10use hyper::{body::Body, body::Bytes, Method};
11use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
12use hyper_util::client::legacy::{connect::HttpConnector, Client};
13use hyper_util::rt::TokioExecutor;
14use serde::{de::DeserializeOwned, Deserialize, Serialize};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::io::AsyncRead;
18use tokio::time::timeout;
19
20/// Default HTTP request timeout (10 seconds)
21const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
22
23use crate::prelude::*;
24use cloudillo_types::action_types::CreateAction;
25use cloudillo_types::auth_adapter::AuthAdapter;
26
27fn to_boxed<B>(body: B) -> BoxBody<Bytes, Error>
28where
29	B: Body<Data = Bytes> + Send + Sync + 'static,
30	B::Error: Send + 'static,
31{
32	body.map_err(|_err| Error::NetworkError("body stream error".into())).boxed()
33}
34
/// Inner `data` object of the access-token response parsed in
/// `Request::create_proxy_token`; only the `token` field is deserialized.
#[derive(Deserialize)]
struct TokenData {
	/// Access token string handed back to the caller of `create_proxy_token`.
	token: Box<str>,
}
39
/// Top-level JSON envelope of the access-token response: `{ "data": { "token": ... } }`.
#[derive(Deserialize)]
struct TokenRes {
	/// Payload object that carries the token.
	data: TokenData,
}
44
/// Result of a conditional GET request (see `Request::get_conditional`).
#[derive(Debug)]
pub enum ConditionalResult<T> {
	/// 200 OK - the body was fetched and deserialized into `data`.
	///
	/// `etag` is the response's `etag` header with surrounding double quotes
	/// trimmed, or `None` when the header is absent or not valid header text.
	Modified { data: T, etag: Option<Box<str>> },
	/// 304 Not Modified - the etag sent by the caller is still current.
	NotModified,
}
53
/// Outbound HTTP client used to call other instances at `https://cl-o.{id_tag}/api...`.
#[derive(Debug, Clone)]
pub struct Request {
	/// Adapter used to create the action tokens that are exchanged for proxy access tokens.
	pub auth_adapter: Arc<dyn AuthAdapter>,
	/// Underlying hyper client; built in `Request::new` as HTTPS-only with HTTP/1 enabled.
	client: Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, Error>>,
}
59
60impl Request {
61	pub fn new(auth_adapter: Arc<dyn AuthAdapter>) -> ClResult<Self> {
62		let client = HttpsConnectorBuilder::new()
63			.with_native_roots()
64			.map_err(|_| Error::ConfigError("no native root CA certificates found".into()))?
65			.https_only()
66			.enable_http1()
67			.build();
68
69		Ok(Request { auth_adapter, client: Client::builder(TokioExecutor::new()).build(client) })
70	}
71
72	/// Execute an HTTP request with timeout wrapper
73	async fn timed_request(
74		&self,
75		req: hyper::Request<BoxBody<Bytes, Error>>,
76	) -> ClResult<hyper::Response<hyper::body::Incoming>> {
77		timeout(REQUEST_TIMEOUT, self.client.request(req))
78			.await
79			.map_err(|_| Error::Timeout)?
80			.map_err(Error::from)
81	}
82
83	/// Collect response body with timeout
84	async fn collect_body(body: hyper::body::Incoming) -> ClResult<Bytes> {
85		timeout(REQUEST_TIMEOUT, body.collect())
86			.await
87			.map_err(|_| Error::Timeout)?
88			.map_err(|_| Error::NetworkError("body collection error".into()))
89			.map(http_body_util::Collected::to_bytes)
90	}
91
92	pub async fn create_proxy_token(
93		&self,
94		tn_id: TnId,
95		id_tag: &str,
96		subject: Option<&str>,
97	) -> ClResult<Box<str>> {
98		let auth_token = self
99			.auth_adapter
100			.create_action_token(
101				tn_id,
102				CreateAction {
103					typ: "PROXY".into(),
104					audience_tag: Some(id_tag.into()),
105					expires_at: Some(Timestamp::from_now(60)), // 1 min
106					..Default::default()
107				},
108			)
109			.await?;
110		let req = hyper::Request::builder()
111			.method(Method::GET)
112			.uri(format!(
113				"https://cl-o.{}/api/auth/access-token?token={}{}",
114				id_tag,
115				auth_token,
116				if let Some(subject) = subject {
117					format!("&subject={}", subject)
118				} else {
119					String::new()
120				}
121			))
122			.body(to_boxed(Empty::new()))?;
123		let res = self.timed_request(req).await?;
124		if !res.status().is_success() {
125			return Err(Error::PermissionDenied);
126		}
127		let parsed: TokenRes = serde_json::from_slice(&Self::collect_body(res.into_body()).await?)?;
128
129		Ok(parsed.data.token)
130	}
131
132	pub async fn get_bin(
133		&self,
134		tn_id: TnId,
135		id_tag: &str,
136		path: &str,
137		auth: bool,
138	) -> ClResult<Bytes> {
139		let req = hyper::Request::builder()
140			.method(Method::GET)
141			.uri(format!("https://cl-o.{}/api{}", id_tag, path));
142		let req = if auth {
143			req.header(
144				"Authorization",
145				format!("Bearer {}", self.create_proxy_token(tn_id, id_tag, None).await?),
146			)
147		} else {
148			req
149		};
150		let req = req.body(to_boxed(Empty::new()))?;
151		let res = self.timed_request(req).await?;
152		info!("Got response: {}", res.status());
153		match res.status() {
154			StatusCode::OK => Self::collect_body(res.into_body()).await,
155			StatusCode::NOT_FOUND => Err(Error::NotFound),
156			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
157			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
158		}
159	}
160
161	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<BodyDataStream<hyper::body::Incoming>> {
162	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<BodyDataStream<ClResult<Bytes>>> {
163	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<impl Stream<Item = ClResult<Bytes>>> {
164	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<TokioIo<BodyDataStream<hyper::body::Incoming>>> {
165	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<StreamReader<BodyDataStream<hyper::body::Incoming>, Bytes>> {
166	pub async fn get_stream(
167		&self,
168		tn_id: TnId,
169		id_tag: &str,
170		path: &str,
171	) -> ClResult<impl AsyncRead + Send + Unpin> {
172		let token = self.create_proxy_token(tn_id, id_tag, None).await?;
173		debug!("Got proxy token (len={})", token.len());
174		let req = hyper::Request::builder()
175			.method(Method::GET)
176			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
177			.header("Authorization", format!("Bearer {}", token))
178			.body(to_boxed(Empty::new()))?;
179		let res = self.timed_request(req).await?;
180		match res.status() {
181			//StatusCode::OK => Ok(res.into_body().into_data_stream().map(|stream| stream.map_err(|err| Error::Unknown))),
182			//StatusCode::OK => Ok(res.into_body().into_data_stream().map(|res| res.map_err(|err| Error::Unknown))),
183			//StatusCode::OK => Ok(hyper_util::rt::TokioIo::new(res.into_body().into_data_stream())),
184			StatusCode::OK => {
185				let stream = res.into_body().into_data_stream()
186					//.map_ok(|f| f.into_data().unwrap_or_defailt())
187					.map_err(std::io::Error::other);
188				Ok(tokio_util::io::StreamReader::new(stream))
189			}
190			StatusCode::NOT_FOUND => Err(Error::NotFound),
191			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
192			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
193		}
194	}
195
196	pub async fn get<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
197	where
198		Res: DeserializeOwned,
199	{
200		let res = self.get_bin(tn_id, id_tag, path, true).await?;
201		let parsed: Res = serde_json::from_slice(&res)?;
202		Ok(parsed)
203	}
204
205	pub async fn get_noauth<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
206	where
207		Res: DeserializeOwned,
208	{
209		let res = self.get_bin(tn_id, id_tag, path, false).await?;
210		let parsed: Res = serde_json::from_slice(&res)?;
211		Ok(parsed)
212	}
213
214	/// Make a public GET request without authentication or tenant context
215	pub async fn get_public<Res>(&self, id_tag: &str, path: &str) -> ClResult<Res>
216	where
217		Res: DeserializeOwned,
218	{
219		let req = hyper::Request::builder()
220			.method(Method::GET)
221			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
222			.body(to_boxed(Empty::new()))?;
223		let res = self.timed_request(req).await?;
224		match res.status() {
225			StatusCode::OK => {
226				let bytes = Self::collect_body(res.into_body()).await?;
227				let parsed: Res = serde_json::from_slice(&bytes)?;
228				Ok(parsed)
229			}
230			StatusCode::NOT_FOUND => Err(Error::NotFound),
231			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
232			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
233		}
234	}
235
236	/// Make a conditional GET request with If-None-Match header for etag support
237	///
238	/// Returns `ConditionalResult::NotModified` if server returns 304,
239	/// or `ConditionalResult::Modified` with data and new etag if content changed.
240	pub async fn get_conditional<Res>(
241		&self,
242		id_tag: &str,
243		path: &str,
244		etag: Option<&str>,
245	) -> ClResult<ConditionalResult<Res>>
246	where
247		Res: DeserializeOwned,
248	{
249		let mut builder = hyper::Request::builder()
250			.method(Method::GET)
251			.uri(format!("https://cl-o.{}/api{}", id_tag, path));
252
253		// Add If-None-Match header if we have an etag
254		if let Some(etag) = etag {
255			builder = builder.header("If-None-Match", etag);
256		}
257
258		let req = builder.body(to_boxed(Empty::new()))?;
259		let res = self.timed_request(req).await?;
260
261		match res.status() {
262			StatusCode::NOT_MODIFIED => Ok(ConditionalResult::NotModified),
263			StatusCode::OK => {
264				// Extract ETag from response headers
265				let new_etag = res
266					.headers()
267					.get("etag")
268					.and_then(|v| v.to_str().ok())
269					.map(|s| s.trim_matches('"').into());
270
271				let bytes = Self::collect_body(res.into_body()).await?;
272				let parsed: Res = serde_json::from_slice(&bytes)?;
273				Ok(ConditionalResult::Modified { data: parsed, etag: new_etag })
274			}
275			StatusCode::NOT_FOUND => Err(Error::NotFound),
276			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
277			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
278		}
279	}
280
281	/// Make a public POST request without authentication or tenant context
282	pub async fn post_public<Req, Res>(&self, id_tag: &str, path: &str, data: &Req) -> ClResult<Res>
283	where
284		Req: Serialize,
285		Res: DeserializeOwned,
286	{
287		let json_data = serde_json::to_vec(data)?;
288		let req = hyper::Request::builder()
289			.method(Method::POST)
290			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
291			.header("Content-Type", "application/json")
292			.body(to_boxed(Full::from(json_data)))?;
293		let res = self.timed_request(req).await?;
294		match res.status() {
295			StatusCode::OK | StatusCode::CREATED => {
296				let bytes = Self::collect_body(res.into_body()).await?;
297				let parsed: Res = serde_json::from_slice(&bytes)?;
298				Ok(parsed)
299			}
300			StatusCode::NOT_FOUND => Err(Error::NotFound),
301			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
302			StatusCode::UNPROCESSABLE_ENTITY => Err(Error::ValidationError(
303				"IDP registration failed - validation error".to_string(),
304			)),
305			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
306		}
307	}
308
309	pub async fn post_bin(
310		&self,
311		_tn_id: TnId,
312		id_tag: &str,
313		path: &str,
314		data: Bytes,
315	) -> ClResult<Bytes> {
316		let req = hyper::Request::builder()
317			.method(Method::POST)
318			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
319			.header("Content-Type", "application/json")
320			.body(to_boxed(Full::from(data)))?;
321		let res = self.timed_request(req).await?;
322		Self::collect_body(res.into_body()).await
323	}
324
325	pub async fn post_stream<S>(
326		&self,
327		_tn_id: TnId,
328		id_tag: &str,
329		path: &str,
330		stream: S,
331	) -> ClResult<Bytes>
332	where
333		S: Stream<Item = Result<hyper::body::Frame<Bytes>, hyper::Error>> + Send + Sync + 'static,
334	{
335		let req = hyper::Request::builder()
336			.method(Method::POST)
337			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
338			.body(to_boxed(StreamBody::new(stream)))?;
339		let res = self.timed_request(req).await?;
340		Self::collect_body(res.into_body()).await
341	}
342
343	pub async fn post<Res>(
344		&self,
345		tn_id: TnId,
346		id_tag: &str,
347		path: &str,
348		data: &impl Serialize,
349	) -> ClResult<Res>
350	where
351		Res: DeserializeOwned,
352	{
353		let res = self.post_bin(tn_id, id_tag, path, serde_json::to_vec(data)?.into()).await?;
354		let parsed: Res = serde_json::from_slice(&res)?;
355		Ok(parsed)
356	}
357}
358
359// vim: ts=4