1use http::{HeaderMap, HeaderValue, Method, Request};
8use http_body_util::{BodyExt, Full};
9use hyper::{body::Bytes, Response};
10use hyper_util::client::legacy::Client as HyperClient;
11use hyperlocal::Uri as HyperLocalUri;
12use hyperlocal::{UnixClientExt, UnixConnector};
13use nydus_api::HttpProxyConfig;
14use nydus_utils::metrics::BackendMetrics;
15use reqwest;
16use tokio::runtime::Runtime;
17
18use super::connection::{Connection, ConnectionConfig, ConnectionError};
19use super::{BackendError, BackendResult, BlobBackend, BlobReader};
20use std::path::Path;
21use std::{
22 fmt,
23 io::{Error, Result},
24 num::ParseIntError,
25 str::{self},
26 sync::Arc,
27};
28
29const HYPER_LOCAL_CLIENT_RUNTIME_THREAD_NUM: usize = 1;
30
31#[derive(Debug)]
32pub enum HttpProxyError {
33 ParseStringToInteger(ParseIntError),
35 ParseContentLengthFromHeader(http::header::ToStrError),
36 LocalRequest(hyper_util::client::legacy::Error),
38 RemoteRequest(ConnectionError),
40 BuildTokioRuntime(Error),
42 BuildHttpRequest(http::Error),
44 ReadResponseBody(hyper::Error),
46 Transport(reqwest::Error),
48 CopyBuffer(Error),
50 InvalidPath,
52 ConstructHeader(String),
54}
55
56impl fmt::Display for HttpProxyError {
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58 match self {
59 HttpProxyError::ParseStringToInteger(e) => {
60 write!(f, "failed to parse string to integer, {}", e)
61 }
62 HttpProxyError::ParseContentLengthFromHeader(e) => {
63 write!(f, "failed to parse content length from header, {}", e)
64 }
65 HttpProxyError::LocalRequest(e) => write!(f, "failed to get response, {}", e),
66 HttpProxyError::RemoteRequest(e) => write!(f, "failed to get response, {}", e),
67 HttpProxyError::BuildTokioRuntime(e) => {
68 write!(f, "failed to build tokio runtime, {}", e)
69 }
70 HttpProxyError::BuildHttpRequest(e) => {
71 write!(f, "failed to build http request, {}", e)
72 }
73 HttpProxyError::Transport(e) => {
74 write!(f, "failed to transport remote response body, {}", e)
75 }
76 HttpProxyError::ReadResponseBody(e) => {
77 write!(f, "failed to read response body, {}", e)
78 }
79 HttpProxyError::CopyBuffer(e) => write!(f, "failed to copy buffer, {}", e),
80 HttpProxyError::InvalidPath => write!(f, "invalid path"),
81 HttpProxyError::ConstructHeader(e) => {
82 write!(f, "failed to construct request header, {}", e)
83 }
84 }
85 }
86}
87
88impl From<HttpProxyError> for BackendError {
89 fn from(error: HttpProxyError) -> Self {
90 BackendError::HttpProxy(error)
91 }
92}
93
94pub struct HttpProxy {
103 addr: String,
104 path: String,
105 client: Client,
106 metrics: Option<Arc<BackendMetrics>>,
107}
108
109pub struct HttpProxyReader {
111 client: Client,
112 uri: Uri,
113 metrics: Arc<BackendMetrics>,
114}
115
116#[derive(Clone)]
117struct LocalClient {
118 client: Arc<HyperClient<UnixConnector, Full<Bytes>>>,
119 runtime: Arc<Runtime>,
120}
121
122#[derive(Clone)]
123enum Client {
124 Local(LocalClient),
125 Remote(Arc<Connection>),
126}
127
128enum Uri {
129 Local(Arc<hyper::Uri>),
130 Remote(String),
131}
132
133fn range_str_for_header(offset: u64, len: Option<usize>) -> String {
134 match len {
135 Some(len) => format!("bytes={}-{}", offset, offset + len as u64 - 1),
136 None => format!("bytes={}-", offset),
137 }
138}
139
140fn build_tokio_runtime(name: &str, thread_num: usize) -> Result<Runtime> {
141 let runtime = tokio::runtime::Builder::new_multi_thread()
142 .thread_name(name)
143 .worker_threads(thread_num)
144 .enable_all()
145 .build()?;
146 Ok(runtime)
147}
148
149impl LocalClient {
150 async fn do_req(
151 &self,
152 uri: Arc<hyper::Uri>,
153 only_head: bool,
154 offset: u64,
155 len: Option<usize>,
156 ) -> BackendResult<Response<hyper::body::Incoming>> {
157 let method = if only_head { Method::HEAD } else { Method::GET };
158 let req = Request::builder()
159 .method(method)
160 .uri(uri.as_ref())
161 .header(http::header::RANGE, range_str_for_header(offset, len))
162 .body(Full::new(Bytes::new()))
163 .map_err(HttpProxyError::BuildHttpRequest)?;
164 let resp = self
165 .client
166 .request(req)
167 .await
168 .map_err(HttpProxyError::LocalRequest)?;
169 Ok(resp)
170 }
171
172 fn get_headers(&self, uri: Arc<hyper::Uri>) -> BackendResult<HeaderMap<HeaderValue>> {
173 let headers = self
174 .runtime
175 .block_on(self.do_req(uri, true, 0, None))?
176 .headers()
177 .to_owned();
178 Ok(headers)
179 }
180
181 fn try_read(&self, uri: Arc<hyper::Uri>, offset: u64, len: usize) -> BackendResult<Vec<u8>> {
182 self.runtime.block_on(async {
183 let resp = self.do_req(uri, false, offset, Some(len)).await;
184 match resp {
185 Ok(mut resp) => resp
186 .body_mut()
187 .collect()
188 .await
189 .map_err(|e| HttpProxyError::ReadResponseBody(e).into())
190 .map(|b| b.to_bytes().to_vec()),
191 Err(e) => Err(e),
192 }
193 })
194 }
195}
196
197impl BlobReader for HttpProxyReader {
198 fn blob_size(&self) -> super::BackendResult<u64> {
199 let headers = match &self.client {
200 Client::Local(client) => {
201 let uri = match self.uri {
202 Uri::Local(ref uri) => uri.clone(),
203 Uri::Remote(_) => unreachable!(),
204 };
205 client.get_headers(uri)
206 }
207 Client::Remote(connection) => {
208 let uri = match self.uri {
209 Uri::Local(_) => unreachable!(),
210 Uri::Remote(ref uri) => uri.clone(),
211 };
212 connection
213 .call::<&[u8]>(
214 Method::HEAD,
215 uri.as_str(),
216 None,
217 None,
218 &mut HeaderMap::new(),
219 true,
220 )
221 .map(|resp| resp.headers().to_owned())
222 .map_err(|e| HttpProxyError::RemoteRequest(e).into())
223 }
224 };
225 let content_length = headers?[http::header::CONTENT_LENGTH]
226 .to_str()
227 .map_err(HttpProxyError::ParseContentLengthFromHeader)?
228 .parse::<u64>()
229 .map_err(HttpProxyError::ParseStringToInteger)?;
230 Ok(content_length)
231 }
232
233 fn try_read(&self, mut buf: &mut [u8], offset: u64) -> BackendResult<usize> {
234 match &self.client {
235 Client::Local(client) => {
236 let uri = match self.uri {
237 Uri::Local(ref uri) => uri.clone(),
238 Uri::Remote(_) => unreachable!(),
239 };
240 let content = client.try_read(uri, offset, buf.len())?;
241 let copied_size = std::io::copy(&mut content.as_slice(), &mut buf)
242 .map_err(HttpProxyError::CopyBuffer)?;
243 Ok(copied_size as usize)
244 }
245 Client::Remote(connection) => {
246 let uri = match self.uri {
247 Uri::Local(_) => unreachable!(),
248 Uri::Remote(ref uri) => uri.clone(),
249 };
250 let mut headers = HeaderMap::new();
251 let range = range_str_for_header(offset, Some(buf.len()));
252 headers.insert(
253 http::header::RANGE,
254 range
255 .as_str()
256 .parse()
257 .map_err(|e| HttpProxyError::ConstructHeader(format!("{}", e)))?,
258 );
259 let mut resp = connection
260 .call::<&[u8]>(Method::GET, uri.as_str(), None, None, &mut headers, true)
261 .map_err(HttpProxyError::RemoteRequest)?;
262
263 Ok(resp
264 .copy_to(&mut buf)
265 .map_err(HttpProxyError::Transport)
266 .map(|size| size as usize)?)
267 }
268 }
269 }
270
271 fn metrics(&self) -> &nydus_utils::metrics::BackendMetrics {
272 &self.metrics
273 }
274}
275
276impl HttpProxy {
277 pub fn new(config: &HttpProxyConfig, id: Option<&str>) -> Result<HttpProxy> {
278 let client = if config.addr.starts_with("http://") || config.addr.starts_with("https://") {
279 let conn_cfg: ConnectionConfig = config.clone().into();
280 let conn = Connection::new(&conn_cfg)?;
281 Client::Remote(conn)
282 } else {
283 let client = HyperClient::unix();
284 let runtime = build_tokio_runtime("http-proxy", HYPER_LOCAL_CLIENT_RUNTIME_THREAD_NUM)?;
285 let local_client = LocalClient {
286 client: Arc::new(client),
287 runtime: Arc::new(runtime),
288 };
289 Client::Local(local_client)
290 };
291 Ok(HttpProxy {
292 addr: config.addr.to_string(),
293 path: config.path.to_string(),
294 client,
295 metrics: id.map(|i| BackendMetrics::new(i, "http-proxy")),
296 })
297 }
298}
299
300impl BlobBackend for HttpProxy {
301 fn shutdown(&self) {
302 match &self.client {
303 Client::Local(_) => {
304 }
306 Client::Remote(remote_client) => {
307 remote_client.shutdown();
308 }
309 }
310 }
311
312 fn metrics(&self) -> &nydus_utils::metrics::BackendMetrics {
313 self.metrics.as_ref().unwrap()
316 }
317
318 fn get_reader(
319 &self,
320 blob_id: &str,
321 ) -> super::BackendResult<std::sync::Arc<dyn super::BlobReader>> {
322 let path = Path::new(&self.path).join(blob_id);
323 let path = path.to_str().ok_or(HttpProxyError::InvalidPath)?;
324 let uri = match &self.client {
325 Client::Local(_) => {
326 let uri: Arc<hyper::Uri> =
327 Arc::new(HyperLocalUri::new(self.addr.clone(), "/").into());
328 Uri::Local(uri)
329 }
330 Client::Remote(_) => {
331 let uri = format!("{}{}", self.addr, path);
332 Uri::Remote(uri)
333 }
334 };
335 let reader = Arc::new(HttpProxyReader {
336 client: self.client.clone(),
337 uri,
338 metrics: self.metrics.as_ref().unwrap().clone(),
339 });
340 Ok(reader)
341 }
342}
343
344impl Drop for HttpProxy {
345 fn drop(&mut self) {
346 self.shutdown();
347 if let Some(metrics) = self.metrics.as_ref() {
348 metrics.release().unwrap_or_else(|e| error!("{:?}", e));
349 }
350 }
351}
352
353#[cfg(test)]
354mod tests {
355
356 use crate::{
357 backend::{http_proxy::HttpProxy, BlobBackend},
358 utils::alloc_buf,
359 };
360
361 use http::{status, Request};
362 use http_body_util::Full;
363 use hyper::body::Incoming;
364 use hyper::service::service_fn;
365 use hyper::Response;
366 use hyper_util::rt::TokioIo;
367 use hyper_util::server::conn::auto::Builder;
368 use nydus_api::HttpProxyConfig;
369 use std::{
370 cmp,
371 fs::{self},
372 net::{IpAddr, Ipv4Addr, SocketAddr},
373 path::Path,
374 thread,
375 time::Duration,
376 };
377 use tokio::net::{TcpListener, UnixListener};
378
379 use super::build_tokio_runtime;
380 use super::Bytes;
381
382 const CONTENT: &str = "some content for test";
383 const SOCKET_PATH: &str = "/tmp/nydus-test-local-http-proxy.sock";
384
385 fn parse_range_header(range_str: &str) -> (u64, Option<u64>) {
386 let range_str = range_str.trim_start_matches("bytes=");
387 let range: Vec<&str> = range_str.split('-').collect();
388 let start = range[0].parse::<u64>().unwrap();
389 let end = match range[1] {
390 "" => None,
391 _ => Some(cmp::min(
392 range[1].parse::<u64>().unwrap(),
393 (CONTENT.len() - 1) as u64,
394 )),
395 };
396 (start, end)
397 }
398
399 async fn server_handler(
400 req: Request<Incoming>,
401 ) -> Result<Response<Full<Bytes>>, std::convert::Infallible> {
402 match *req.method() {
403 hyper::Method::HEAD => Ok::<_, std::convert::Infallible>(
404 Response::builder()
405 .status(200)
406 .header(http::header::CONTENT_LENGTH, CONTENT.len())
407 .body(Full::new(Bytes::new()))
408 .unwrap(),
409 ),
410 hyper::Method::GET => {
411 let range = req.headers()[http::header::RANGE].to_str().unwrap();
412 println!("range: {}", range);
413 let (start, end) = parse_range_header(range);
414 let length = match end {
415 Some(e) => e - start + 1,
416 None => CONTENT.len() as u64,
417 };
418 println!("start: {}, end: {:?}, length: {}", start, end, length);
419 let end = match end {
420 Some(e) => e,
421 None => (CONTENT.len() - 1) as u64,
422 };
423 let content = CONTENT.as_bytes()[start as usize..(end + 1) as usize].to_vec();
424 Ok::<_, std::convert::Infallible>(
425 Response::builder()
426 .status(200)
427 .header(http::header::CONTENT_LENGTH, length)
428 .body(Full::new(Bytes::from(content)))
429 .unwrap(),
430 )
431 }
432 _ => Ok::<_, std::convert::Infallible>(
433 Response::builder()
434 .status(status::StatusCode::METHOD_NOT_ALLOWED)
435 .body(Full::new(Bytes::new()))
436 .unwrap(),
437 ),
438 }
439 }
440
441 #[test]
442 fn test_head_and_get() {
443 thread::spawn(|| {
444 let rt = build_tokio_runtime("test-local-http-proxy-server", 1).unwrap();
445 rt.block_on(async {
446 println!("\nstarting local http proxy server......");
447 let path = Path::new(SOCKET_PATH);
448 if path.exists() {
449 fs::remove_file(path).unwrap();
450 }
451 let listener = UnixListener::bind(path).unwrap();
452 loop {
453 let (stream, _) = listener.accept().await.unwrap();
454 let io = TokioIo::new(stream);
455 tokio::spawn(async move {
456 Builder::new(hyper_util::rt::TokioExecutor::new())
457 .serve_connection(io, service_fn(server_handler))
458 .await
459 .ok();
460 });
461 }
462 });
463 });
464
465 thread::spawn(|| {
466 let rt = build_tokio_runtime("test-remote-http-proxy-server", 1).unwrap();
467 rt.block_on(async {
468 println!("\nstarting remote http proxy server......");
469 let listener = TcpListener::bind(SocketAddr::new(
470 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
471 9977,
472 ))
473 .await
474 .unwrap();
475 loop {
476 let (stream, _) = listener.accept().await.unwrap();
477 let io = TokioIo::new(stream);
478 tokio::spawn(async move {
479 Builder::new(hyper_util::rt::TokioExecutor::new())
480 .serve_connection(io, service_fn(server_handler))
481 .await
482 .ok();
483 });
484 }
485 });
486 });
487
488 thread::sleep(Duration::from_secs(5));
490
491 let test_list: Vec<(String, String)> = vec![
493 (
494 format!(
495 "{{\"addr\":\"{}\",\"path\":\"/namespace/<repo>/blobs\"}}",
496 SOCKET_PATH,
497 ),
498 "test-local-http-proxy".to_string(),
499 ),
500 (
501 "{\"addr\":\"http://127.0.0.1:9977\",\"path\":\"/namespace/<repo>/blobs\"}"
502 .to_string(),
503 "test-remote-http-proxy".to_string(),
504 ),
505 ];
506 for test_case in test_list.iter() {
507 let config: HttpProxyConfig = serde_json::from_str(test_case.0.as_str()).unwrap();
508 let backend = HttpProxy::new(&config, Some(test_case.1.as_str())).unwrap();
509 let reader = backend.get_reader("blob_id").unwrap();
510
511 println!();
512 println!("testing blob_size()......");
513 let blob_size = reader
514 .blob_size()
515 .map_err(|e| {
516 println!("blob_size() failed: {}", e);
517 e
518 })
519 .unwrap();
520 assert_eq!(blob_size, CONTENT.len() as u64);
521
522 println!();
523 println!("testing read() range......");
524 let mut buf = alloc_buf(3);
525 let size = reader
526 .try_read(&mut buf, 0)
527 .map_err(|e| {
528 println!("read() range failed: {}", e);
529 e
530 })
531 .unwrap();
532 assert_eq!(size, 3);
533 assert_eq!(buf, CONTENT.as_bytes()[0..3]);
534
535 println!();
536 println!("testing read() full......");
537 let mut buf = alloc_buf(80);
538 let size = reader
539 .try_read(&mut buf, 0)
540 .map_err(|e| {
541 println!("read() range failed: {}", e);
542 e
543 })
544 .unwrap();
545 assert_eq!(size, CONTENT.len() as usize);
546 assert_eq!(&buf[0..CONTENT.len()], CONTENT.as_bytes());
547 }
548 }
549}