// nydus_storage/backend/connection.rs

// Copyright 2020 Ant Group. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

//! Helper library to manage network connections.
6use std::cell::RefCell;
7use std::collections::HashMap;
8use std::io::{Read, Result};
9use std::str::FromStr;
10use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU64, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13use std::{fmt, thread};
14
15use log::{max_level, Level};
16
17use reqwest::{
18    self,
19    blocking::{Body, Client, Response},
20    header::HeaderMap,
21    redirect::Policy,
22    Method, StatusCode, Url,
23};
24
25use nydus_api::{HttpProxyConfig, OssConfig, ProxyConfig, RegistryConfig, S3Config};
26use url::ParseError;
27
/// Name of the HTTP header that is removed from the copy of the request headers
/// printed by the debug log in `Connection::call_inner`.
const HEADER_AUTHORIZATION: &str = "Authorization";

/// Minimum number of seconds between two "Proxy server is not healthy" warnings
/// logged by the same thread.
const RATE_LIMITED_LOG_TIME: u8 = 2;

thread_local! {
    /// Per-thread wall-clock time at which the "Proxy server is not healthy" warning
    /// was last logged. Initialized to `UNIX_EPOCH`.
    pub static LAST_FALLBACK_AT: RefCell<SystemTime> = const { RefCell::new(UNIX_EPOCH) };
}
35
/// Error codes related to network communication.
#[derive(Debug)]
pub enum ConnectionError {
    /// The connection has been shut down; returned by `Connection::call` once the
    /// shutdown flag is set.
    Disconnected,
    /// The server answered with a non-success status; carries the response body text
    /// (see `respond`).
    ErrorWithMsg(String),
    /// Sending the request failed inside `reqwest`.
    Common(reqwest::Error),
    /// Reading the body of a non-success response as text failed.
    Format(reqwest::Error),
    /// A URL string could not be parsed; carries the string and the parser error.
    /// Not constructed in the code visible in this file.
    Url(String, ParseError),
    /// An invalid URL scheme was encountered (per the `Display` text); carries the
    /// offending value. Not constructed in the code visible in this file.
    Scheme(String),
}
46
47impl fmt::Display for ConnectionError {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        match self {
50            ConnectionError::Disconnected => write!(f, "network connection disconnected"),
51            ConnectionError::ErrorWithMsg(s) => write!(f, "network error, {}", s),
52            ConnectionError::Common(e) => write!(f, "network error, {}", e),
53            ConnectionError::Format(e) => write!(f, "{}", e),
54            ConnectionError::Url(s, e) => write!(f, "failed to parse URL {}, {}", s, e),
55            ConnectionError::Scheme(s) => write!(f, "invalid scheme {}", s),
56        }
57    }
58}
59
/// Specialized `Result` for network communication, with [`ConnectionError`] as the
/// error type.
type ConnectionResult<T> = std::result::Result<T, ConnectionError>;
62
/// Generic configuration for storage backends.
#[derive(Debug, Clone)]
pub(crate) struct ConnectionConfig {
    /// Proxy settings; `Connection::new` only sets up a proxy when `proxy.url` is
    /// non-empty.
    pub proxy: ProxyConfig,
    /// When `true`, the HTTP clients accept invalid TLS certificates.
    pub skip_verify: bool,
    /// Request timeout in seconds; `0` means no timeout is configured on the client.
    pub timeout: u32,
    /// Connect timeout in seconds; `0` means no connect timeout is configured on the
    /// client. The same value is handed to the proxy health checking thread as the
    /// timeout of its ping requests.
    pub connect_timeout: u32,
    /// Retry limit copied from the backend configuration; not read anywhere in this
    /// file.
    pub retry_limit: u8,
}
72
73impl Default for ConnectionConfig {
74    fn default() -> Self {
75        Self {
76            proxy: ProxyConfig::default(),
77            skip_verify: false,
78            timeout: 5,
79            connect_timeout: 5,
80            retry_limit: 0,
81        }
82    }
83}
84
85impl From<OssConfig> for ConnectionConfig {
86    fn from(c: OssConfig) -> ConnectionConfig {
87        ConnectionConfig {
88            proxy: c.proxy,
89            skip_verify: c.skip_verify,
90            timeout: c.timeout,
91            connect_timeout: c.connect_timeout,
92            retry_limit: c.retry_limit,
93        }
94    }
95}
96
97impl From<S3Config> for ConnectionConfig {
98    fn from(c: S3Config) -> ConnectionConfig {
99        ConnectionConfig {
100            proxy: c.proxy,
101            skip_verify: c.skip_verify,
102            timeout: c.timeout,
103            connect_timeout: c.connect_timeout,
104            retry_limit: c.retry_limit,
105        }
106    }
107}
108
109impl From<RegistryConfig> for ConnectionConfig {
110    fn from(c: RegistryConfig) -> ConnectionConfig {
111        ConnectionConfig {
112            proxy: c.proxy,
113            skip_verify: c.skip_verify,
114            timeout: c.timeout,
115            connect_timeout: c.connect_timeout,
116            retry_limit: c.retry_limit,
117        }
118    }
119}
120
121impl From<HttpProxyConfig> for ConnectionConfig {
122    fn from(c: HttpProxyConfig) -> ConnectionConfig {
123        ConnectionConfig {
124            proxy: c.proxy,
125            skip_verify: c.skip_verify,
126            timeout: c.timeout,
127            connect_timeout: c.connect_timeout,
128            retry_limit: c.retry_limit,
129        }
130    }
131}
132
/// HTTP request data with progress callback.
///
/// Wraps a reader and, after every successful `read()`, reports
/// `(bytes read so far, total)` to the callback.
#[derive(Clone)]
pub struct Progress<R> {
    /// Wrapped data source.
    inner: R,
    /// Number of bytes returned by `read()` so far.
    current: usize,
    /// Total size supplied at construction; only forwarded to the callback.
    total: usize,
    /// Invoked with `(current, total)` after each successful read.
    callback: fn((usize, usize)),
}

impl<R> Progress<R> {
    /// Create a new `Progress` object that starts counting from zero.
    pub fn new(r: R, total: usize, callback: fn((usize, usize))) -> Progress<R> {
        Self {
            inner: r,
            current: 0,
            total,
            callback,
        }
    }
}

impl<R: Read + Send + 'static> Read for Progress<R> {
    /// Read from the wrapped reader, then report the updated byte count.
    ///
    /// The callback is not invoked when the inner read fails.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let count = self.inner.read(buf)?;
        self.current += count;
        (self.callback)((self.current, self.total));
        Ok(count)
    }
}
162
/// HTTP request data to send to server.
#[derive(Clone)]
pub enum ReqBody<R: Clone> {
    /// Stream the body from a reader wrapped in [`Progress`]; the second field is the
    /// body size that `Connection::call_inner` passes to `Body::sized`.
    Read(Progress<R>, usize),
    /// Send the given bytes as the request body.
    Buf(Vec<u8>),
    /// Send the key/value pairs through `RequestBuilder::form`.
    Form(HashMap<String, String>),
}
170
/// Health flag of the proxy server together with the settings of its periodic check.
#[derive(Debug)]
struct ProxyHealth {
    /// `true` while the proxy is considered healthy.
    status: AtomicBool,
    /// URL requested by the health checking thread; no thread is started when `None`.
    ping_url: Option<Url>,
    /// Time the health checking thread sleeps between two iterations.
    check_interval: Duration,
    /// Health checks are skipped while the connection has been idle for more than
    /// this many seconds.
    check_pause_elapsed: u64,
}
178
179impl ProxyHealth {
180    fn new(check_interval: u64, check_pause_elapsed: u64, ping_url: Option<Url>) -> Self {
181        ProxyHealth {
182            status: AtomicBool::from(true),
183            ping_url,
184            check_interval: Duration::from_secs(check_interval),
185            check_pause_elapsed,
186        }
187    }
188
189    fn ok(&self) -> bool {
190        self.status.load(Ordering::Relaxed)
191    }
192
193    fn set(&self, health: bool) {
194        self.status.store(health, Ordering::Relaxed);
195    }
196}
197
/// `Proxy::replace_scheme` state: no decision has been cached yet.
const SCHEME_REVERSION_CACHE_UNSET: i16 = 0;
/// `Proxy::replace_scheme` state: the first URL seen started with `https:`, so URLs
/// are rewritten to `http`.
const SCHEME_REVERSION_CACHE_REPLACE: i16 = 1;
/// `Proxy::replace_scheme` state: the first URL seen started with `http:`, so URLs
/// are left unchanged.
const SCHEME_REVERSION_CACHE_RETAIN: i16 = 2;
201
/// Proxy server that requests are tried against before the original server.
#[derive(Debug)]
struct Proxy {
    /// HTTP client built with the proxy URL from the configuration.
    client: Client,
    /// Health flag and health check settings of the proxy.
    health: ProxyHealth,
    /// When `true`, a failed proxy request (error or 5xx status) is repeated against
    /// the original server instead of being returned.
    fallback: bool,
    /// When `true`, `Connection::call` asks `try_use_http` to rewrite the URL scheme
    /// before sending the request through the proxy.
    use_http: bool,
    // Cached decision (one of the `SCHEME_REVERSION_CACHE_*` values) on whether the
    // scheme of URLs sent through the proxy should be replaced.
    replace_scheme: AtomicI16,
}
211
212impl Proxy {
213    fn try_use_http(&self, url: &str) -> Option<String> {
214        if self.replace_scheme.load(Ordering::Relaxed) == SCHEME_REVERSION_CACHE_REPLACE {
215            Some(url.replacen("https", "http", 1))
216        } else if self.replace_scheme.load(Ordering::Relaxed) == SCHEME_REVERSION_CACHE_UNSET {
217            if url.starts_with("https:") {
218                self.replace_scheme
219                    .store(SCHEME_REVERSION_CACHE_REPLACE, Ordering::Relaxed);
220                info!("Will replace backend's URL's scheme with http");
221                Some(url.replacen("https", "http", 1))
222            } else if url.starts_with("http:") {
223                self.replace_scheme
224                    .store(SCHEME_REVERSION_CACHE_RETAIN, Ordering::Relaxed);
225                None
226            } else {
227                warn!("Can't replace http scheme, url {}", url);
228                None
229            }
230        } else {
231            None
232        }
233    }
234}
235
236/// Check whether the HTTP status code is a success result.
237pub(crate) fn is_success_status(status: StatusCode) -> bool {
238    status >= StatusCode::OK && status < StatusCode::BAD_REQUEST
239}
240
241/// Convert a HTTP `Response` into an `Result<Response>`.
242pub(crate) fn respond(resp: Response, catch_status: bool) -> ConnectionResult<Response> {
243    if !catch_status || is_success_status(resp.status()) {
244        Ok(resp)
245    } else {
246        let msg = resp.text().map_err(ConnectionError::Format)?;
247        Err(ConnectionError::ErrorWithMsg(msg))
248    }
249}
250
/// A network connection to communicate with remote server.
#[derive(Debug)]
pub(crate) struct Connection {
    /// HTTP client built without the configured proxy URL; used for the original
    /// server.
    client: Client,
    /// Proxy that requests are tried against first; `None` when no proxy URL is
    /// configured.
    proxy: Option<Arc<Proxy>>,
    /// Set by `shutdown()`; once `true`, `call()` returns
    /// `ConnectionError::Disconnected`.
    pub shutdown: AtomicBool,
    /// Timestamp of the connection's last active request, represented as seconds since
    /// `UNIX_EPOCH`. Shared with the proxy health checking thread.
    last_active: Arc<AtomicU64>,
}
260
261impl Connection {
262    /// Create a new connection according to the configuration.
263    pub fn new(config: &ConnectionConfig) -> Result<Arc<Connection>> {
264        info!("backend config: {:?}", config);
265        let client = Self::build_connection("", config)?;
266
267        let proxy = if !config.proxy.url.is_empty() {
268            let ping_url = if !config.proxy.ping_url.is_empty() {
269                Some(Url::from_str(&config.proxy.ping_url).map_err(|e| einval!(e))?)
270            } else {
271                None
272            };
273            Some(Arc::new(Proxy {
274                client: Self::build_connection(&config.proxy.url, config)?,
275                health: ProxyHealth::new(
276                    config.proxy.check_interval,
277                    config.proxy.check_pause_elapsed,
278                    ping_url,
279                ),
280                fallback: config.proxy.fallback,
281                use_http: config.proxy.use_http,
282                replace_scheme: AtomicI16::new(SCHEME_REVERSION_CACHE_UNSET),
283            }))
284        } else {
285            None
286        };
287
288        let connection = Arc::new(Connection {
289            client,
290            proxy,
291            shutdown: AtomicBool::new(false),
292            last_active: Arc::new(AtomicU64::new(
293                SystemTime::now()
294                    .duration_since(UNIX_EPOCH)
295                    .unwrap()
296                    .as_secs(),
297            )),
298        });
299
300        // Start proxy's health checking thread.
301        connection.start_proxy_health_thread(config.connect_timeout as u64);
302
303        Ok(connection)
304    }
305
306    fn start_proxy_health_thread(&self, connect_timeout: u64) {
307        if let Some(proxy) = self.proxy.as_ref() {
308            if proxy.health.ping_url.is_some() {
309                let proxy = proxy.clone();
310                let last_active = Arc::clone(&self.last_active);
311
312                // Spawn thread to update the health status of proxy server.
313                thread::spawn(move || {
314                    let ping_url = proxy.health.ping_url.as_ref().unwrap();
315                    let mut last_success = true;
316
317                    loop {
318                        let elapsed = SystemTime::now()
319                            .duration_since(UNIX_EPOCH)
320                            .unwrap()
321                            .as_secs()
322                            - last_active.load(Ordering::Relaxed);
323                        // If the connection is not active for a set time, skip proxy health check.
324                        if elapsed <= proxy.health.check_pause_elapsed {
325                            let client = Client::new();
326                            let _ = client
327                                .get(ping_url.clone())
328                                .timeout(Duration::from_secs(connect_timeout as u64))
329                                .send()
330                                .map(|resp| {
331                                    let success = is_success_status(resp.status());
332                                    if last_success && !success {
333                                        warn!(
334                                            "Detected proxy unhealthy when pinging proxy, response status {}",
335                                            resp.status()
336                                        );
337                                    } else if !last_success && success {
338                                        info!("Backend proxy recovered")
339                                    }
340                                    last_success = success;
341                                    proxy.health.set(success);
342                                })
343                                .map_err(|e| {
344                                    if last_success {
345                                        warn!("Detected proxy unhealthy when ping proxy, {}", e);
346                                    }
347                                    last_success = false;
348                                    proxy.health.set(false)
349                                });
350                        }
351
352                        thread::sleep(proxy.health.check_interval);
353                    }
354                });
355            }
356        }
357    }
358
    /// Shutdown the connection.
    ///
    /// Only sets the `shutdown` flag; every later `call()` returns
    /// `ConnectionError::Disconnected`. The proxy health checking thread does not read
    /// this flag.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
    }
363
364    #[allow(clippy::too_many_arguments)]
365    pub fn call<R: Read + Clone + Send + 'static>(
366        &self,
367        method: Method,
368        url: &str,
369        query: Option<&[(&str, &str)]>,
370        data: Option<ReqBody<R>>,
371        headers: &mut HeaderMap,
372        catch_status: bool,
373    ) -> ConnectionResult<Response> {
374        if self.shutdown.load(Ordering::Acquire) {
375            return Err(ConnectionError::Disconnected);
376        }
377        self.last_active.store(
378            SystemTime::now()
379                .duration_since(UNIX_EPOCH)
380                .unwrap()
381                .as_secs(),
382            Ordering::Relaxed,
383        );
384
385        if let Some(proxy) = &self.proxy {
386            if proxy.health.ok() {
387                let data_cloned = data.as_ref().cloned();
388
389                let http_url: Option<String>;
390                let mut replaced_url = url;
391
392                if proxy.use_http {
393                    http_url = proxy.try_use_http(url);
394                    if let Some(ref r) = http_url {
395                        replaced_url = r.as_str();
396                    }
397                }
398
399                let result = self.call_inner(
400                    &proxy.client,
401                    method.clone(),
402                    replaced_url,
403                    &query,
404                    data_cloned,
405                    headers,
406                    catch_status,
407                    true,
408                );
409
410                match result {
411                    Ok(resp) => {
412                        if !proxy.fallback || resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
413                            return Ok(resp);
414                        }
415                    }
416                    Err(err) => {
417                        if !proxy.fallback {
418                            return Err(err);
419                        }
420                    }
421                }
422                // If proxy server responds invalid status code or http connection failed, we need to
423                // fallback to origin server, the policy only applicable to non-upload operation
424                warn!("Request proxy server failed, fallback to original server");
425            } else {
426                LAST_FALLBACK_AT.with(|f| {
427                    let current = SystemTime::now();
428                    if current.duration_since(*f.borrow()).unwrap().as_secs()
429                        >= RATE_LIMITED_LOG_TIME as u64
430                    {
431                        warn!("Proxy server is not healthy, fallback to original server");
432                        f.replace(current);
433                    }
434                })
435            }
436        }
437
438        self.call_inner(
439            &self.client,
440            method,
441            url,
442            &query,
443            data,
444            headers,
445            catch_status,
446            false,
447        )
448    }
449
450    fn build_connection(proxy: &str, config: &ConnectionConfig) -> Result<Client> {
451        let connect_timeout = if config.connect_timeout != 0 {
452            Some(Duration::from_secs(config.connect_timeout as u64))
453        } else {
454            None
455        };
456        let timeout = if config.timeout != 0 {
457            Some(Duration::from_secs(config.timeout as u64))
458        } else {
459            None
460        };
461
462        let mut cb = Client::builder()
463            .timeout(timeout)
464            .connect_timeout(connect_timeout)
465            // same number of redirects as containerd
466            // https://github.com/containerd/containerd/blob/main/core/remotes/docker/resolver.go#L596
467            .redirect(Policy::limited(10));
468
469        if config.skip_verify {
470            cb = cb.danger_accept_invalid_certs(true);
471        }
472
473        if !proxy.is_empty() {
474            cb = cb.proxy(reqwest::Proxy::all(proxy).map_err(|e| einval!(e))?)
475        }
476
477        cb.build().map_err(|e| einval!(e))
478    }
479
480    #[allow(clippy::too_many_arguments)]
481    fn call_inner<R: Read + Clone + Send + 'static>(
482        &self,
483        client: &Client,
484        method: Method,
485        url: &str,
486        query: &Option<&[(&str, &str)]>,
487        data: Option<ReqBody<R>>,
488        headers: &HeaderMap,
489        catch_status: bool,
490        proxy: bool,
491    ) -> ConnectionResult<Response> {
492        // Only clone header when debugging to reduce potential overhead.
493        let display_headers = if max_level() >= Level::Debug {
494            let mut display_headers = headers.clone();
495            display_headers.remove(HEADER_AUTHORIZATION);
496            Some(display_headers)
497        } else {
498            None
499        };
500        let has_data = data.is_some();
501        let start = Instant::now();
502
503        let mut rb = client.request(method.clone(), url).headers(headers.clone());
504        if let Some(q) = query.as_ref() {
505            rb = rb.query(q);
506        }
507
508        let ret;
509        if let Some(data) = data {
510            match data {
511                ReqBody::Read(body, total) => {
512                    let body = Body::sized(body, total as u64);
513                    ret = rb.body(body).send();
514                }
515                ReqBody::Buf(buf) => {
516                    ret = rb.body(buf).send();
517                }
518                ReqBody::Form(form) => {
519                    ret = rb.form(&form).send();
520                }
521            }
522        } else {
523            ret = rb.body("").send();
524        }
525
526        debug!(
527            "{} Request: {} {} headers: {:?}, proxy: {}, data: {}, duration: {}ms",
528            std::thread::current().name().unwrap_or_default(),
529            method,
530            url,
531            display_headers,
532            proxy,
533            has_data,
534            Instant::now().duration_since(start).as_millis(),
535        );
536
537        match ret {
538            Err(err) => Err(ConnectionError::Common(err)),
539            Ok(resp) => respond(resp, catch_status),
540        }
541    }
542}
543
/// Unit tests for the parts of this module that work without network access.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// `Progress` forwards the data and reports the running byte count.
    #[test]
    fn test_progress() {
        let buf = vec![0x1u8, 2, 3, 4, 5];
        // Two reads of two bytes each are performed below, so the callback must see
        // a running count of 2 and then 4.
        let mut progress = Progress::new(Cursor::new(buf), 5, |(curr, total)| {
            assert!(curr == 2 || curr == 4);
            assert_eq!(total, 5);
        });

        let mut buf1 = [0x0u8; 2];
        assert_eq!(progress.read(&mut buf1).unwrap(), 2);
        assert_eq!(buf1[0], 1);
        assert_eq!(buf1[1], 2);

        assert_eq!(progress.read(&mut buf1).unwrap(), 2);
        assert_eq!(buf1[0], 3);
        assert_eq!(buf1[1], 4);
    }

    /// The health flag starts as healthy and follows `set()`.
    #[test]
    fn test_proxy_health() {
        let checker = ProxyHealth::new(5, 300, None);

        // Reading the flag repeatedly must not change it.
        assert!(checker.ok());
        assert!(checker.ok());
        checker.set(false);
        assert!(!checker.ok());
        assert!(!checker.ok());
        checker.set(true);
        assert!(checker.ok());
        assert!(checker.ok());
    }

    /// Statuses in `[200, 400)` count as success; 1xx and 4xx do not.
    #[test]
    fn test_is_success_status() {
        assert!(!is_success_status(StatusCode::CONTINUE));
        assert!(is_success_status(StatusCode::OK));
        assert!(is_success_status(StatusCode::PERMANENT_REDIRECT));
        assert!(!is_success_status(StatusCode::BAD_REQUEST));
    }

    /// Pin the default values of `ConnectionConfig`, including the defaults it
    /// inherits from `ProxyConfig`.
    #[test]
    fn test_connection_config_default() {
        let config = ConnectionConfig::default();

        assert_eq!(config.timeout, 5);
        assert_eq!(config.connect_timeout, 5);
        assert_eq!(config.retry_limit, 0);
        assert_eq!(config.proxy.check_interval, 5);
        assert_eq!(config.proxy.check_pause_elapsed, 300);
        assert!(config.proxy.fallback);
        assert_eq!(config.proxy.ping_url, "");
        assert_eq!(config.proxy.url, "");
    }
}