Skip to main content

nydus_storage/backend/
http_proxy.rs

1// Copyright 2023 Ant Group. All rights reserved.
2
3// SPDX-License-Identifier: Apache-2.0
4
//! Storage backend driver to access blobs through an HTTP proxy.
6
7use http::{HeaderMap, HeaderValue, Method, Request};
8use http_body_util::{BodyExt, Full};
9use hyper::{body::Bytes, Response};
10use hyper_util::client::legacy::Client as HyperClient;
11use hyperlocal::Uri as HyperLocalUri;
12use hyperlocal::{UnixClientExt, UnixConnector};
13use nydus_api::HttpProxyConfig;
14use nydus_utils::metrics::BackendMetrics;
15use reqwest;
16use tokio::runtime::Runtime;
17
18use super::connection::{Connection, ConnectionConfig, ConnectionError};
19use super::{BackendError, BackendResult, BlobBackend, BlobReader};
20use std::path::Path;
21use std::{
22    fmt,
23    io::{Error, Result},
24    num::ParseIntError,
25    str::{self},
26    sync::Arc,
27};
28
/// Number of worker threads given to the tokio runtime that drives the local (unix socket) client.
const HYPER_LOCAL_CLIENT_RUNTIME_THREAD_NUM: usize = 1;
30
31#[derive(Debug)]
32pub enum HttpProxyError {
33    /// Failed to parse string to integer.
34    ParseStringToInteger(ParseIntError),
35    ParseContentLengthFromHeader(http::header::ToStrError),
36    /// Failed to get response from the local http server.
37    LocalRequest(hyper_util::client::legacy::Error),
38    /// Failed to get response from the remote http server.
39    RemoteRequest(ConnectionError),
40    /// Failed to build the tokio runtime.
41    BuildTokioRuntime(Error),
42    /// Failed to build local http request.
43    BuildHttpRequest(http::Error),
44    /// Failed to read the response body.
45    ReadResponseBody(hyper::Error),
46    /// Failed to transport the remote response body.
47    Transport(reqwest::Error),
48    /// Failed to copy the buffer.
49    CopyBuffer(Error),
50    /// Invalid path.
51    InvalidPath,
52    /// Failed to build request header.
53    ConstructHeader(String),
54}
55
56impl fmt::Display for HttpProxyError {
57    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58        match self {
59            HttpProxyError::ParseStringToInteger(e) => {
60                write!(f, "failed to parse string to integer, {}", e)
61            }
62            HttpProxyError::ParseContentLengthFromHeader(e) => {
63                write!(f, "failed to parse content length from header, {}", e)
64            }
65            HttpProxyError::LocalRequest(e) => write!(f, "failed to get response, {}", e),
66            HttpProxyError::RemoteRequest(e) => write!(f, "failed to get response, {}", e),
67            HttpProxyError::BuildTokioRuntime(e) => {
68                write!(f, "failed to build tokio runtime, {}", e)
69            }
70            HttpProxyError::BuildHttpRequest(e) => {
71                write!(f, "failed to build http request, {}", e)
72            }
73            HttpProxyError::Transport(e) => {
74                write!(f, "failed to transport remote response body, {}", e)
75            }
76            HttpProxyError::ReadResponseBody(e) => {
77                write!(f, "failed to read response body, {}", e)
78            }
79            HttpProxyError::CopyBuffer(e) => write!(f, "failed to copy buffer, {}", e),
80            HttpProxyError::InvalidPath => write!(f, "invalid path"),
81            HttpProxyError::ConstructHeader(e) => {
82                write!(f, "failed to construct request header, {}", e)
83            }
84        }
85    }
86}
87
88impl From<HttpProxyError> for BackendError {
89    fn from(error: HttpProxyError) -> Self {
90        BackendError::HttpProxy(error)
91    }
92}
93
94/// A storage backend driver to access blobs through a http proxy server.
95/// The http proxy server may be local (using unix socket) or be remote (using `http://` or `https://`).
96///
97/// `HttpProxy` uses two API endpoints to access the blobs:
98/// - `HEAD /path/to/blob` to get the blob size
99/// - `GET /path/to/blob` to read the blob
100///
101/// The http proxy server should respect [the `Range` header](https://www.rfc-editor.org/rfc/rfc9110.html#name-range) to support range reading.
102pub struct HttpProxy {
103    addr: String,
104    path: String,
105    client: Client,
106    metrics: Option<Arc<BackendMetrics>>,
107}
108
109/// HttpProxyReader is a BlobReader to implement the HttpProxy backend driver.
110pub struct HttpProxyReader {
111    client: Client,
112    uri: Uri,
113    metrics: Arc<BackendMetrics>,
114}
115
116#[derive(Clone)]
117struct LocalClient {
118    client: Arc<HyperClient<UnixConnector, Full<Bytes>>>,
119    runtime: Arc<Runtime>,
120}
121
122#[derive(Clone)]
123enum Client {
124    Local(LocalClient),
125    Remote(Arc<Connection>),
126}
127
128enum Uri {
129    Local(Arc<hyper::Uri>),
130    Remote(String),
131}
132
/// Builds the value of an HTTP `Range` header.
///
/// - `offset`: position of the first requested byte.
/// - `len`: number of bytes requested; `None` requests everything from `offset` onwards.
///
/// Returns `bytes=<first>-<last>` (both inclusive) when `len` is given and `bytes=<first>-`
/// otherwise.
///
/// An empty range cannot be expressed in a `Range` header, so a `len` of zero yields the
/// single-byte range starting at `offset`. The last position saturates at `u64::MAX` rather than
/// overflowing.
fn range_str_for_header(offset: u64, len: Option<usize>) -> String {
    match len {
        Some(len) => {
            // `last = offset + len - 1`, computed without underflow (len == 0) or overflow.
            let last = offset.saturating_add((len as u64).saturating_sub(1));
            format!("bytes={}-{}", offset, last)
        }
        None => format!("bytes={}-", offset),
    }
}
139
140fn build_tokio_runtime(name: &str, thread_num: usize) -> Result<Runtime> {
141    let runtime = tokio::runtime::Builder::new_multi_thread()
142        .thread_name(name)
143        .worker_threads(thread_num)
144        .enable_all()
145        .build()?;
146    Ok(runtime)
147}
148
149impl LocalClient {
150    async fn do_req(
151        &self,
152        uri: Arc<hyper::Uri>,
153        only_head: bool,
154        offset: u64,
155        len: Option<usize>,
156    ) -> BackendResult<Response<hyper::body::Incoming>> {
157        let method = if only_head { Method::HEAD } else { Method::GET };
158        let req = Request::builder()
159            .method(method)
160            .uri(uri.as_ref())
161            .header(http::header::RANGE, range_str_for_header(offset, len))
162            .body(Full::new(Bytes::new()))
163            .map_err(HttpProxyError::BuildHttpRequest)?;
164        let resp = self
165            .client
166            .request(req)
167            .await
168            .map_err(HttpProxyError::LocalRequest)?;
169        Ok(resp)
170    }
171
172    fn get_headers(&self, uri: Arc<hyper::Uri>) -> BackendResult<HeaderMap<HeaderValue>> {
173        let headers = self
174            .runtime
175            .block_on(self.do_req(uri, true, 0, None))?
176            .headers()
177            .to_owned();
178        Ok(headers)
179    }
180
181    fn try_read(&self, uri: Arc<hyper::Uri>, offset: u64, len: usize) -> BackendResult<Vec<u8>> {
182        self.runtime.block_on(async {
183            let resp = self.do_req(uri, false, offset, Some(len)).await;
184            match resp {
185                Ok(mut resp) => resp
186                    .body_mut()
187                    .collect()
188                    .await
189                    .map_err(|e| HttpProxyError::ReadResponseBody(e).into())
190                    .map(|b| b.to_bytes().to_vec()),
191                Err(e) => Err(e),
192            }
193        })
194    }
195}
196
197impl BlobReader for HttpProxyReader {
198    fn blob_size(&self) -> super::BackendResult<u64> {
199        let headers = match &self.client {
200            Client::Local(client) => {
201                let uri = match self.uri {
202                    Uri::Local(ref uri) => uri.clone(),
203                    Uri::Remote(_) => unreachable!(),
204                };
205                client.get_headers(uri)
206            }
207            Client::Remote(connection) => {
208                let uri = match self.uri {
209                    Uri::Local(_) => unreachable!(),
210                    Uri::Remote(ref uri) => uri.clone(),
211                };
212                connection
213                    .call::<&[u8]>(
214                        Method::HEAD,
215                        uri.as_str(),
216                        None,
217                        None,
218                        &mut HeaderMap::new(),
219                        true,
220                    )
221                    .map(|resp| resp.headers().to_owned())
222                    .map_err(|e| HttpProxyError::RemoteRequest(e).into())
223            }
224        };
225        let content_length = headers?[http::header::CONTENT_LENGTH]
226            .to_str()
227            .map_err(HttpProxyError::ParseContentLengthFromHeader)?
228            .parse::<u64>()
229            .map_err(HttpProxyError::ParseStringToInteger)?;
230        Ok(content_length)
231    }
232
233    fn try_read(&self, mut buf: &mut [u8], offset: u64) -> BackendResult<usize> {
234        match &self.client {
235            Client::Local(client) => {
236                let uri = match self.uri {
237                    Uri::Local(ref uri) => uri.clone(),
238                    Uri::Remote(_) => unreachable!(),
239                };
240                let content = client.try_read(uri, offset, buf.len())?;
241                let copied_size = std::io::copy(&mut content.as_slice(), &mut buf)
242                    .map_err(HttpProxyError::CopyBuffer)?;
243                Ok(copied_size as usize)
244            }
245            Client::Remote(connection) => {
246                let uri = match self.uri {
247                    Uri::Local(_) => unreachable!(),
248                    Uri::Remote(ref uri) => uri.clone(),
249                };
250                let mut headers = HeaderMap::new();
251                let range = range_str_for_header(offset, Some(buf.len()));
252                headers.insert(
253                    http::header::RANGE,
254                    range
255                        .as_str()
256                        .parse()
257                        .map_err(|e| HttpProxyError::ConstructHeader(format!("{}", e)))?,
258                );
259                let mut resp = connection
260                    .call::<&[u8]>(Method::GET, uri.as_str(), None, None, &mut headers, true)
261                    .map_err(HttpProxyError::RemoteRequest)?;
262
263                Ok(resp
264                    .copy_to(&mut buf)
265                    .map_err(HttpProxyError::Transport)
266                    .map(|size| size as usize)?)
267            }
268        }
269    }
270
271    fn metrics(&self) -> &nydus_utils::metrics::BackendMetrics {
272        &self.metrics
273    }
274}
275
276impl HttpProxy {
277    pub fn new(config: &HttpProxyConfig, id: Option<&str>) -> Result<HttpProxy> {
278        let client = if config.addr.starts_with("http://") || config.addr.starts_with("https://") {
279            let conn_cfg: ConnectionConfig = config.clone().into();
280            let conn = Connection::new(&conn_cfg)?;
281            Client::Remote(conn)
282        } else {
283            let client = HyperClient::unix();
284            let runtime = build_tokio_runtime("http-proxy", HYPER_LOCAL_CLIENT_RUNTIME_THREAD_NUM)?;
285            let local_client = LocalClient {
286                client: Arc::new(client),
287                runtime: Arc::new(runtime),
288            };
289            Client::Local(local_client)
290        };
291        Ok(HttpProxy {
292            addr: config.addr.to_string(),
293            path: config.path.to_string(),
294            client,
295            metrics: id.map(|i| BackendMetrics::new(i, "http-proxy")),
296        })
297    }
298}
299
300impl BlobBackend for HttpProxy {
301    fn shutdown(&self) {
302        match &self.client {
303            Client::Local(_) => {
304                // do nothing
305            }
306            Client::Remote(remote_client) => {
307                remote_client.shutdown();
308            }
309        }
310    }
311
312    fn metrics(&self) -> &nydus_utils::metrics::BackendMetrics {
313        // `metrics()` is only used for nydusd, which will always provide valid `blob_id`, thus
314        // `self.metrics` has valid value.
315        self.metrics.as_ref().unwrap()
316    }
317
318    fn get_reader(
319        &self,
320        blob_id: &str,
321    ) -> super::BackendResult<std::sync::Arc<dyn super::BlobReader>> {
322        let path = Path::new(&self.path).join(blob_id);
323        let path = path.to_str().ok_or(HttpProxyError::InvalidPath)?;
324        let uri = match &self.client {
325            Client::Local(_) => {
326                let uri: Arc<hyper::Uri> =
327                    Arc::new(HyperLocalUri::new(self.addr.clone(), "/").into());
328                Uri::Local(uri)
329            }
330            Client::Remote(_) => {
331                let uri = format!("{}{}", self.addr, path);
332                Uri::Remote(uri)
333            }
334        };
335        let reader = Arc::new(HttpProxyReader {
336            client: self.client.clone(),
337            uri,
338            metrics: self.metrics.as_ref().unwrap().clone(),
339        });
340        Ok(reader)
341    }
342}
343
344impl Drop for HttpProxy {
345    fn drop(&mut self) {
346        self.shutdown();
347        if let Some(metrics) = self.metrics.as_ref() {
348            metrics.release().unwrap_or_else(|e| error!("{:?}", e));
349        }
350    }
351}
352
#[cfg(test)]
mod tests {

    use crate::{
        backend::{http_proxy::HttpProxy, BlobBackend},
        utils::alloc_buf,
    };

    use http::{status, Request};
    use http_body_util::Full;
    use hyper::body::Incoming;
    use hyper::service::service_fn;
    use hyper::Response;
    use hyper_util::rt::TokioIo;
    use hyper_util::server::conn::auto::Builder;
    use nydus_api::HttpProxyConfig;
    use std::{
        cmp,
        fs::{self},
        net::{IpAddr, Ipv4Addr, SocketAddr},
        path::Path,
        thread,
        time::Duration,
    };
    use tokio::net::{TcpListener, UnixListener};

    use super::build_tokio_runtime;
    use super::Bytes;

    /// Blob content served by the test servers.
    const CONTENT: &str = "some content for test";
    /// Unix socket the local test server listens on.
    const SOCKET_PATH: &str = "/tmp/nydus-test-local-http-proxy.sock";

    /// Parses a `bytes=<start>-[<end>]` header value.
    ///
    /// Returns the start position and, when present, the end position clamped to the last byte
    /// of `CONTENT`. Panics on malformed input, which is acceptable for a test helper.
    fn parse_range_header(range_str: &str) -> (u64, Option<u64>) {
        let range_str = range_str.trim_start_matches("bytes=");
        let range: Vec<&str> = range_str.split('-').collect();
        let start = range[0].parse::<u64>().unwrap();
        let end = match range[1] {
            "" => None,
            _ => Some(cmp::min(
                range[1].parse::<u64>().unwrap(),
                (CONTENT.len() - 1) as u64,
            )),
        };
        (start, end)
    }

    /// Request handler shared by the local and the remote test server.
    ///
    /// `HEAD` reports the size of `CONTENT`, `GET` serves the byte range named by the `Range`
    /// header, and every other method is answered with `405 Method Not Allowed`.
    async fn server_handler(
        req: Request<Incoming>,
    ) -> Result<Response<Full<Bytes>>, std::convert::Infallible> {
        match *req.method() {
            hyper::Method::HEAD => Ok::<_, std::convert::Infallible>(
                Response::builder()
                    .status(200)
                    .header(http::header::CONTENT_LENGTH, CONTENT.len())
                    .body(Full::new(Bytes::new()))
                    .unwrap(),
            ),
            hyper::Method::GET => {
                let range = req.headers()[http::header::RANGE].to_str().unwrap();
                println!("range: {}", range);
                let (start, requested_end) = parse_range_header(range);
                // An open-ended range runs to the last byte of the content.
                let end = requested_end.unwrap_or((CONTENT.len() - 1) as u64);
                // Derive the length from the resolved range so that `Content-Length` always
                // matches the body, including for open-ended ranges with a non-zero start.
                let length = end - start + 1;
                println!(
                    "start: {}, end: {:?}, length: {}",
                    start, requested_end, length
                );
                let content = CONTENT.as_bytes()[start as usize..(end + 1) as usize].to_vec();
                Ok::<_, std::convert::Infallible>(
                    Response::builder()
                        .status(200)
                        .header(http::header::CONTENT_LENGTH, length)
                        .body(Full::new(Bytes::from(content)))
                        .unwrap(),
                )
            }
            _ => Ok::<_, std::convert::Infallible>(
                Response::builder()
                    .status(status::StatusCode::METHOD_NOT_ALLOWED)
                    .body(Full::new(Bytes::new()))
                    .unwrap(),
            ),
        }
    }

    /// Starts a unix socket server and a TCP server, then checks `blob_size()` and `try_read()`
    /// against both of them.
    #[test]
    fn test_head_and_get() {
        // Local proxy: serves `server_handler` over a unix socket.
        thread::spawn(|| {
            let rt = build_tokio_runtime("test-local-http-proxy-server", 1).unwrap();
            rt.block_on(async {
                println!("\nstarting local http proxy server......");
                let path = Path::new(SOCKET_PATH);
                // Remove a socket file left behind by an earlier run, otherwise bind fails.
                if path.exists() {
                    fs::remove_file(path).unwrap();
                }
                let listener = UnixListener::bind(path).unwrap();
                loop {
                    let (stream, _) = listener.accept().await.unwrap();
                    let io = TokioIo::new(stream);
                    tokio::spawn(async move {
                        Builder::new(hyper_util::rt::TokioExecutor::new())
                            .serve_connection(io, service_fn(server_handler))
                            .await
                            .ok();
                    });
                }
            });
        });

        // Remote proxy: serves `server_handler` over TCP on 127.0.0.1:9977.
        thread::spawn(|| {
            let rt = build_tokio_runtime("test-remote-http-proxy-server", 1).unwrap();
            rt.block_on(async {
                println!("\nstarting remote http proxy server......");
                let listener = TcpListener::bind(SocketAddr::new(
                    IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                    9977,
                ))
                .await
                .unwrap();
                loop {
                    let (stream, _) = listener.accept().await.unwrap();
                    let io = TokioIo::new(stream);
                    tokio::spawn(async move {
                        Builder::new(hyper_util::rt::TokioExecutor::new())
                            .serve_connection(io, service_fn(server_handler))
                            .await
                            .ok();
                    });
                }
            });
        });

        // wait for server to start
        thread::sleep(Duration::from_secs(5));

        // start the client and test
        // Each case is (backend configuration as JSON, metrics id).
        let test_list: Vec<(String, String)> = vec![
            (
                format!(
                    "{{\"addr\":\"{}\",\"path\":\"/namespace/<repo>/blobs\"}}",
                    SOCKET_PATH,
                ),
                "test-local-http-proxy".to_string(),
            ),
            (
                "{\"addr\":\"http://127.0.0.1:9977\",\"path\":\"/namespace/<repo>/blobs\"}"
                    .to_string(),
                "test-remote-http-proxy".to_string(),
            ),
        ];
        for test_case in test_list.iter() {
            let config: HttpProxyConfig = serde_json::from_str(test_case.0.as_str()).unwrap();
            let backend = HttpProxy::new(&config, Some(test_case.1.as_str())).unwrap();
            let reader = backend.get_reader("blob_id").unwrap();

            println!();
            println!("testing blob_size()......");
            let blob_size = reader
                .blob_size()
                .map_err(|e| {
                    println!("blob_size() failed: {}", e);
                    e
                })
                .unwrap();
            assert_eq!(blob_size, CONTENT.len() as u64);

            println!();
            println!("testing read() range......");
            let mut buf = alloc_buf(3);
            let size = reader
                .try_read(&mut buf, 0)
                .map_err(|e| {
                    println!("read() range failed: {}", e);
                    e
                })
                .unwrap();
            assert_eq!(size, 3);
            assert_eq!(buf, CONTENT.as_bytes()[0..3]);

            println!();
            println!("testing read() full......");
            // The buffer is larger than the content, so the whole content is returned.
            let mut buf = alloc_buf(80);
            let size = reader
                .try_read(&mut buf, 0)
                .map_err(|e| {
                    println!("read() full failed: {}", e);
                    e
                })
                .unwrap();
            assert_eq!(size, CONTENT.len());
            assert_eq!(&buf[0..CONTENT.len()], CONTENT.as_bytes());
        }
    }
}