Skip to main content

cloudillo_core/
request.rs

1//! Request client implementation
2
3use futures::TryStreamExt;
4use futures_core::stream::Stream;
5use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full, StreamBody};
6use hyper::http::StatusCode;
7use hyper::{body::Body, body::Bytes, Method};
8use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
9use hyper_util::client::legacy::{connect::HttpConnector, Client};
10use hyper_util::rt::TokioExecutor;
11use serde::{de::DeserializeOwned, Deserialize, Serialize};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::io::AsyncRead;
15use tokio::time::timeout;
16
/// Default HTTP request timeout (10 seconds)
///
/// The limit is applied separately to each phase it guards: once while
/// waiting for the response in `Request::timed_request`, and once more while
/// buffering a response body in `Request::collect_body`.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
19
20use crate::prelude::*;
21use cloudillo_types::action_types::CreateAction;
22use cloudillo_types::auth_adapter::AuthAdapter;
23
24fn to_boxed<B>(body: B) -> BoxBody<Bytes, Error>
25where
26	B: Body<Data = Bytes> + Send + Sync + 'static,
27	B::Error: Send + 'static,
28{
29	body.map_err(|_err| Error::NetworkError("body stream error".into())).boxed()
30}
31
/// Inner `data` object of the access-token response parsed in
/// `Request::create_proxy_token`.
#[derive(Deserialize)]
struct TokenData {
	/// The token string returned to the caller of `create_proxy_token`.
	token: Box<str>,
}
36
/// JSON envelope returned by the `/api/auth/access-token` endpoint, i.e. a
/// document shaped like `{"data": {"token": "..."}}`.
#[derive(Deserialize)]
struct TokenRes {
	/// Payload carrying the issued token.
	data: TokenData,
}
41
/// Result of a conditional GET request (see `Request::get_conditional`)
#[derive(Debug)]
pub enum ConditionalResult<T> {
	/// 200 OK - new data with new etag
	///
	/// `etag` is `None` when the response carried no `etag` header or its
	/// value could not be read as a string.
	Modified { data: T, etag: Option<Box<str>> },
	/// 304 Not Modified - etag unchanged
	NotModified,
}
50
/// HTTPS client for calling the `https://cl-o.<id_tag>/api...` endpoints of
/// hosts identified by their `id_tag`.
#[derive(Debug, Clone)]
pub struct Request {
	/// Adapter used to create the short-lived action tokens that are exchanged
	/// for proxy access tokens (see `create_proxy_token`).
	pub auth_adapter: Arc<dyn AuthAdapter>,
	/// Underlying hyper client; every request body is a boxed body whose error
	/// type is the crate `Error`.
	client: Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, Error>>,
}
56
57impl Request {
58	pub fn new(auth_adapter: Arc<dyn AuthAdapter>) -> ClResult<Self> {
59		let client = HttpsConnectorBuilder::new()
60			.with_native_roots()
61			.map_err(|_| Error::ConfigError("no native root CA certificates found".into()))?
62			.https_only()
63			.enable_http1()
64			.build();
65
66		Ok(Request { auth_adapter, client: Client::builder(TokioExecutor::new()).build(client) })
67	}
68
69	/// Execute an HTTP request with timeout wrapper
70	async fn timed_request(
71		&self,
72		req: hyper::Request<BoxBody<Bytes, Error>>,
73	) -> ClResult<hyper::Response<hyper::body::Incoming>> {
74		timeout(REQUEST_TIMEOUT, self.client.request(req))
75			.await
76			.map_err(|_| Error::Timeout)?
77			.map_err(Error::from)
78	}
79
80	/// Collect response body with timeout
81	async fn collect_body(body: hyper::body::Incoming) -> ClResult<Bytes> {
82		timeout(REQUEST_TIMEOUT, body.collect())
83			.await
84			.map_err(|_| Error::Timeout)?
85			.map_err(|_| Error::NetworkError("body collection error".into()))
86			.map(|collected| collected.to_bytes())
87	}
88
89	pub async fn create_proxy_token(
90		&self,
91		tn_id: TnId,
92		id_tag: &str,
93		subject: Option<&str>,
94	) -> ClResult<Box<str>> {
95		let auth_token = self
96			.auth_adapter
97			.create_action_token(
98				tn_id,
99				CreateAction {
100					typ: "PROXY".into(),
101					audience_tag: Some(id_tag.into()),
102					expires_at: Some(Timestamp::from_now(60)), // 1 min
103					..Default::default()
104				},
105			)
106			.await?;
107		let req = hyper::Request::builder()
108			.method(Method::GET)
109			.uri(format!(
110				"https://cl-o.{}/api/auth/access-token?token={}{}",
111				id_tag,
112				auth_token,
113				if let Some(subject) = subject {
114					format!("&subject={}", subject)
115				} else {
116					"".into()
117				}
118			))
119			.body(to_boxed(Empty::new()))?;
120		let res = self.timed_request(req).await?;
121		if !res.status().is_success() {
122			return Err(Error::PermissionDenied);
123		}
124		let parsed: TokenRes = serde_json::from_slice(&Self::collect_body(res.into_body()).await?)?;
125
126		Ok(parsed.data.token)
127	}
128
129	pub async fn get_bin(
130		&self,
131		tn_id: TnId,
132		id_tag: &str,
133		path: &str,
134		auth: bool,
135	) -> ClResult<Bytes> {
136		let req = hyper::Request::builder()
137			.method(Method::GET)
138			.uri(format!("https://cl-o.{}/api{}", id_tag, path));
139		let req = if auth {
140			req.header(
141				"Authorization",
142				format!("Bearer {}", self.create_proxy_token(tn_id, id_tag, None).await?),
143			)
144		} else {
145			req
146		};
147		let req = req.body(to_boxed(Empty::new()))?;
148		let res = self.timed_request(req).await?;
149		info!("Got response: {}", res.status());
150		match res.status() {
151			StatusCode::OK => Self::collect_body(res.into_body()).await,
152			StatusCode::NOT_FOUND => Err(Error::NotFound),
153			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
154			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
155		}
156	}
157
158	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<BodyDataStream<hyper::body::Incoming>> {
159	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<BodyDataStream<ClResult<Bytes>>> {
160	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<impl Stream<Item = ClResult<Bytes>>> {
161	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<TokioIo<BodyDataStream<hyper::body::Incoming>>> {
162	//pub async fn get_stream(&self, id_tag: &str, path: &str) -> ClResult<StreamReader<BodyDataStream<hyper::body::Incoming>, Bytes>> {
163	pub async fn get_stream(
164		&self,
165		tn_id: TnId,
166		id_tag: &str,
167		path: &str,
168	) -> ClResult<impl AsyncRead + Send + Unpin> {
169		let token = self.create_proxy_token(tn_id, id_tag, None).await?;
170		debug!("Got proxy token (len={})", token.len());
171		let req = hyper::Request::builder()
172			.method(Method::GET)
173			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
174			.header("Authorization", format!("Bearer {}", token))
175			.body(to_boxed(Empty::new()))?;
176		let res = self.timed_request(req).await?;
177		match res.status() {
178			//StatusCode::OK => Ok(res.into_body().into_data_stream().map(|stream| stream.map_err(|err| Error::Unknown))),
179			//StatusCode::OK => Ok(res.into_body().into_data_stream().map(|res| res.map_err(|err| Error::Unknown))),
180			//StatusCode::OK => Ok(hyper_util::rt::TokioIo::new(res.into_body().into_data_stream())),
181			StatusCode::OK => {
182				let stream = res.into_body().into_data_stream()
183					//.map_ok(|f| f.into_data().unwrap_or_defailt())
184					.map_err(std::io::Error::other);
185				Ok(tokio_util::io::StreamReader::new(stream))
186			}
187			StatusCode::NOT_FOUND => Err(Error::NotFound),
188			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
189			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
190		}
191	}
192
193	pub async fn get<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
194	where
195		Res: DeserializeOwned,
196	{
197		let res = self.get_bin(tn_id, id_tag, path, true).await?;
198		let parsed: Res = serde_json::from_slice(&res)?;
199		Ok(parsed)
200	}
201
202	pub async fn get_noauth<Res>(&self, tn_id: TnId, id_tag: &str, path: &str) -> ClResult<Res>
203	where
204		Res: DeserializeOwned,
205	{
206		let res = self.get_bin(tn_id, id_tag, path, false).await?;
207		let parsed: Res = serde_json::from_slice(&res)?;
208		Ok(parsed)
209	}
210
211	/// Make a public GET request without authentication or tenant context
212	pub async fn get_public<Res>(&self, id_tag: &str, path: &str) -> ClResult<Res>
213	where
214		Res: DeserializeOwned,
215	{
216		let req = hyper::Request::builder()
217			.method(Method::GET)
218			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
219			.body(to_boxed(Empty::new()))?;
220		let res = self.timed_request(req).await?;
221		match res.status() {
222			StatusCode::OK => {
223				let bytes = Self::collect_body(res.into_body()).await?;
224				let parsed: Res = serde_json::from_slice(&bytes)?;
225				Ok(parsed)
226			}
227			StatusCode::NOT_FOUND => Err(Error::NotFound),
228			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
229			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
230		}
231	}
232
233	/// Make a conditional GET request with If-None-Match header for etag support
234	///
235	/// Returns `ConditionalResult::NotModified` if server returns 304,
236	/// or `ConditionalResult::Modified` with data and new etag if content changed.
237	pub async fn get_conditional<Res>(
238		&self,
239		id_tag: &str,
240		path: &str,
241		etag: Option<&str>,
242	) -> ClResult<ConditionalResult<Res>>
243	where
244		Res: DeserializeOwned,
245	{
246		let mut builder = hyper::Request::builder()
247			.method(Method::GET)
248			.uri(format!("https://cl-o.{}/api{}", id_tag, path));
249
250		// Add If-None-Match header if we have an etag
251		if let Some(etag) = etag {
252			builder = builder.header("If-None-Match", etag);
253		}
254
255		let req = builder.body(to_boxed(Empty::new()))?;
256		let res = self.timed_request(req).await?;
257
258		match res.status() {
259			StatusCode::NOT_MODIFIED => Ok(ConditionalResult::NotModified),
260			StatusCode::OK => {
261				// Extract ETag from response headers
262				let new_etag = res
263					.headers()
264					.get("etag")
265					.and_then(|v| v.to_str().ok())
266					.map(|s| s.trim_matches('"').into());
267
268				let bytes = Self::collect_body(res.into_body()).await?;
269				let parsed: Res = serde_json::from_slice(&bytes)?;
270				Ok(ConditionalResult::Modified { data: parsed, etag: new_etag })
271			}
272			StatusCode::NOT_FOUND => Err(Error::NotFound),
273			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
274			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
275		}
276	}
277
278	/// Make a public POST request without authentication or tenant context
279	pub async fn post_public<Req, Res>(&self, id_tag: &str, path: &str, data: &Req) -> ClResult<Res>
280	where
281		Req: Serialize,
282		Res: DeserializeOwned,
283	{
284		let json_data = serde_json::to_vec(data)?;
285		let req = hyper::Request::builder()
286			.method(Method::POST)
287			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
288			.header("Content-Type", "application/json")
289			.body(to_boxed(Full::from(json_data)))?;
290		let res = self.timed_request(req).await?;
291		match res.status() {
292			StatusCode::OK | StatusCode::CREATED => {
293				let bytes = Self::collect_body(res.into_body()).await?;
294				let parsed: Res = serde_json::from_slice(&bytes)?;
295				Ok(parsed)
296			}
297			StatusCode::NOT_FOUND => Err(Error::NotFound),
298			StatusCode::FORBIDDEN => Err(Error::PermissionDenied),
299			StatusCode::UNPROCESSABLE_ENTITY => Err(Error::ValidationError(
300				"IDP registration failed - validation error".to_string(),
301			)),
302			code => Err(Error::NetworkError(format!("unexpected HTTP status: {}", code))),
303		}
304	}
305
306	pub async fn post_bin(
307		&self,
308		_tn_id: TnId,
309		id_tag: &str,
310		path: &str,
311		data: Bytes,
312	) -> ClResult<Bytes> {
313		let req = hyper::Request::builder()
314			.method(Method::POST)
315			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
316			.header("Content-Type", "application/json")
317			.body(to_boxed(Full::from(data)))?;
318		let res = self.timed_request(req).await?;
319		Self::collect_body(res.into_body()).await
320	}
321
322	pub async fn post_stream<S>(
323		&self,
324		_tn_id: TnId,
325		id_tag: &str,
326		path: &str,
327		stream: S,
328	) -> ClResult<Bytes>
329	where
330		S: Stream<Item = Result<hyper::body::Frame<Bytes>, hyper::Error>> + Send + Sync + 'static,
331	{
332		let req = hyper::Request::builder()
333			.method(Method::POST)
334			.uri(format!("https://cl-o.{}/api{}", id_tag, path))
335			.body(to_boxed(StreamBody::new(stream)))?;
336		let res = self.timed_request(req).await?;
337		Self::collect_body(res.into_body()).await
338	}
339
340	pub async fn post<Res>(
341		&self,
342		tn_id: TnId,
343		id_tag: &str,
344		path: &str,
345		data: &impl Serialize,
346	) -> ClResult<Res>
347	where
348		Res: DeserializeOwned,
349	{
350		let res = self.post_bin(tn_id, id_tag, path, serde_json::to_vec(data)?.into()).await?;
351		let parsed: Res = serde_json::from_slice(&res)?;
352		Ok(parsed)
353	}
354}
355
356// vim: ts=4