1use std::cell::RefCell;
7use std::collections::HashMap;
8use std::io::{Read, Result};
9use std::str::FromStr;
10use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU64, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13use std::{fmt, thread};
14
15use log::{max_level, Level};
16
17use reqwest::{
18 self,
19 blocking::{Body, Client, Response},
20 header::HeaderMap,
21 redirect::Policy,
22 Method, StatusCode, Url,
23};
24
25use nydus_api::{HttpProxyConfig, OssConfig, ProxyConfig, RegistryConfig, S3Config};
26use url::ParseError;
27
28const HEADER_AUTHORIZATION: &str = "Authorization";
29
30const RATE_LIMITED_LOG_TIME: u8 = 2;
31
32thread_local! {
33 pub static LAST_FALLBACK_AT: RefCell<SystemTime> = const { RefCell::new(UNIX_EPOCH) };
34}
35
36#[derive(Debug)]
38pub enum ConnectionError {
39 Disconnected,
40 ErrorWithMsg(String),
41 Common(reqwest::Error),
42 Format(reqwest::Error),
43 Url(String, ParseError),
44 Scheme(String),
45}
46
47impl fmt::Display for ConnectionError {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 match self {
50 ConnectionError::Disconnected => write!(f, "network connection disconnected"),
51 ConnectionError::ErrorWithMsg(s) => write!(f, "network error, {}", s),
52 ConnectionError::Common(e) => write!(f, "network error, {}", e),
53 ConnectionError::Format(e) => write!(f, "{}", e),
54 ConnectionError::Url(s, e) => write!(f, "failed to parse URL {}, {}", s, e),
55 ConnectionError::Scheme(s) => write!(f, "invalid scheme {}", s),
56 }
57 }
58}
59
60type ConnectionResult<T> = std::result::Result<T, ConnectionError>;
62
63#[derive(Debug, Clone)]
65pub(crate) struct ConnectionConfig {
66 pub proxy: ProxyConfig,
67 pub skip_verify: bool,
68 pub timeout: u32,
69 pub connect_timeout: u32,
70 pub retry_limit: u8,
71}
72
73impl Default for ConnectionConfig {
74 fn default() -> Self {
75 Self {
76 proxy: ProxyConfig::default(),
77 skip_verify: false,
78 timeout: 5,
79 connect_timeout: 5,
80 retry_limit: 0,
81 }
82 }
83}
84
85impl From<OssConfig> for ConnectionConfig {
86 fn from(c: OssConfig) -> ConnectionConfig {
87 ConnectionConfig {
88 proxy: c.proxy,
89 skip_verify: c.skip_verify,
90 timeout: c.timeout,
91 connect_timeout: c.connect_timeout,
92 retry_limit: c.retry_limit,
93 }
94 }
95}
96
97impl From<S3Config> for ConnectionConfig {
98 fn from(c: S3Config) -> ConnectionConfig {
99 ConnectionConfig {
100 proxy: c.proxy,
101 skip_verify: c.skip_verify,
102 timeout: c.timeout,
103 connect_timeout: c.connect_timeout,
104 retry_limit: c.retry_limit,
105 }
106 }
107}
108
109impl From<RegistryConfig> for ConnectionConfig {
110 fn from(c: RegistryConfig) -> ConnectionConfig {
111 ConnectionConfig {
112 proxy: c.proxy,
113 skip_verify: c.skip_verify,
114 timeout: c.timeout,
115 connect_timeout: c.connect_timeout,
116 retry_limit: c.retry_limit,
117 }
118 }
119}
120
121impl From<HttpProxyConfig> for ConnectionConfig {
122 fn from(c: HttpProxyConfig) -> ConnectionConfig {
123 ConnectionConfig {
124 proxy: c.proxy,
125 skip_verify: c.skip_verify,
126 timeout: c.timeout,
127 connect_timeout: c.connect_timeout,
128 retry_limit: c.retry_limit,
129 }
130 }
131}
132
133#[derive(Clone)]
135pub struct Progress<R> {
136 inner: R,
137 current: usize,
138 total: usize,
139 callback: fn((usize, usize)),
140}
141
142impl<R> Progress<R> {
143 pub fn new(r: R, total: usize, callback: fn((usize, usize))) -> Progress<R> {
145 Progress {
146 inner: r,
147 current: 0,
148 total,
149 callback,
150 }
151 }
152}
153
154impl<R: Read + Send + 'static> Read for Progress<R> {
155 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
156 self.inner.read(buf).inspect(|&count| {
157 self.current += count as usize;
158 (self.callback)((self.current, self.total));
159 })
160 }
161}
162
163#[derive(Clone)]
165pub enum ReqBody<R: Clone> {
166 Read(Progress<R>, usize),
167 Buf(Vec<u8>),
168 Form(HashMap<String, String>),
169}
170
171#[derive(Debug)]
172struct ProxyHealth {
173 status: AtomicBool,
174 ping_url: Option<Url>,
175 check_interval: Duration,
176 check_pause_elapsed: u64,
177}
178
179impl ProxyHealth {
180 fn new(check_interval: u64, check_pause_elapsed: u64, ping_url: Option<Url>) -> Self {
181 ProxyHealth {
182 status: AtomicBool::from(true),
183 ping_url,
184 check_interval: Duration::from_secs(check_interval),
185 check_pause_elapsed,
186 }
187 }
188
189 fn ok(&self) -> bool {
190 self.status.load(Ordering::Relaxed)
191 }
192
193 fn set(&self, health: bool) {
194 self.status.store(health, Ordering::Relaxed);
195 }
196}
197
198const SCHEME_REVERSION_CACHE_UNSET: i16 = 0;
199const SCHEME_REVERSION_CACHE_REPLACE: i16 = 1;
200const SCHEME_REVERSION_CACHE_RETAIN: i16 = 2;
201
202#[derive(Debug)]
203struct Proxy {
204 client: Client,
205 health: ProxyHealth,
206 fallback: bool,
207 use_http: bool,
208 replace_scheme: AtomicI16,
210}
211
212impl Proxy {
213 fn try_use_http(&self, url: &str) -> Option<String> {
214 if self.replace_scheme.load(Ordering::Relaxed) == SCHEME_REVERSION_CACHE_REPLACE {
215 Some(url.replacen("https", "http", 1))
216 } else if self.replace_scheme.load(Ordering::Relaxed) == SCHEME_REVERSION_CACHE_UNSET {
217 if url.starts_with("https:") {
218 self.replace_scheme
219 .store(SCHEME_REVERSION_CACHE_REPLACE, Ordering::Relaxed);
220 info!("Will replace backend's URL's scheme with http");
221 Some(url.replacen("https", "http", 1))
222 } else if url.starts_with("http:") {
223 self.replace_scheme
224 .store(SCHEME_REVERSION_CACHE_RETAIN, Ordering::Relaxed);
225 None
226 } else {
227 warn!("Can't replace http scheme, url {}", url);
228 None
229 }
230 } else {
231 None
232 }
233 }
234}
235
236pub(crate) fn is_success_status(status: StatusCode) -> bool {
238 status >= StatusCode::OK && status < StatusCode::BAD_REQUEST
239}
240
241pub(crate) fn respond(resp: Response, catch_status: bool) -> ConnectionResult<Response> {
243 if !catch_status || is_success_status(resp.status()) {
244 Ok(resp)
245 } else {
246 let msg = resp.text().map_err(ConnectionError::Format)?;
247 Err(ConnectionError::ErrorWithMsg(msg))
248 }
249}
250
251#[derive(Debug)]
253pub(crate) struct Connection {
254 client: Client,
255 proxy: Option<Arc<Proxy>>,
256 pub shutdown: AtomicBool,
257 last_active: Arc<AtomicU64>,
259}
260
261impl Connection {
262 pub fn new(config: &ConnectionConfig) -> Result<Arc<Connection>> {
264 info!("backend config: {:?}", config);
265 let client = Self::build_connection("", config)?;
266
267 let proxy = if !config.proxy.url.is_empty() {
268 let ping_url = if !config.proxy.ping_url.is_empty() {
269 Some(Url::from_str(&config.proxy.ping_url).map_err(|e| einval!(e))?)
270 } else {
271 None
272 };
273 Some(Arc::new(Proxy {
274 client: Self::build_connection(&config.proxy.url, config)?,
275 health: ProxyHealth::new(
276 config.proxy.check_interval,
277 config.proxy.check_pause_elapsed,
278 ping_url,
279 ),
280 fallback: config.proxy.fallback,
281 use_http: config.proxy.use_http,
282 replace_scheme: AtomicI16::new(SCHEME_REVERSION_CACHE_UNSET),
283 }))
284 } else {
285 None
286 };
287
288 let connection = Arc::new(Connection {
289 client,
290 proxy,
291 shutdown: AtomicBool::new(false),
292 last_active: Arc::new(AtomicU64::new(
293 SystemTime::now()
294 .duration_since(UNIX_EPOCH)
295 .unwrap()
296 .as_secs(),
297 )),
298 });
299
300 connection.start_proxy_health_thread(config.connect_timeout as u64);
302
303 Ok(connection)
304 }
305
306 fn start_proxy_health_thread(&self, connect_timeout: u64) {
307 if let Some(proxy) = self.proxy.as_ref() {
308 if proxy.health.ping_url.is_some() {
309 let proxy = proxy.clone();
310 let last_active = Arc::clone(&self.last_active);
311
312 thread::spawn(move || {
314 let ping_url = proxy.health.ping_url.as_ref().unwrap();
315 let mut last_success = true;
316
317 loop {
318 let elapsed = SystemTime::now()
319 .duration_since(UNIX_EPOCH)
320 .unwrap()
321 .as_secs()
322 - last_active.load(Ordering::Relaxed);
323 if elapsed <= proxy.health.check_pause_elapsed {
325 let client = Client::new();
326 let _ = client
327 .get(ping_url.clone())
328 .timeout(Duration::from_secs(connect_timeout as u64))
329 .send()
330 .map(|resp| {
331 let success = is_success_status(resp.status());
332 if last_success && !success {
333 warn!(
334 "Detected proxy unhealthy when pinging proxy, response status {}",
335 resp.status()
336 );
337 } else if !last_success && success {
338 info!("Backend proxy recovered")
339 }
340 last_success = success;
341 proxy.health.set(success);
342 })
343 .map_err(|e| {
344 if last_success {
345 warn!("Detected proxy unhealthy when ping proxy, {}", e);
346 }
347 last_success = false;
348 proxy.health.set(false)
349 });
350 }
351
352 thread::sleep(proxy.health.check_interval);
353 }
354 });
355 }
356 }
357 }
358
359 pub fn shutdown(&self) {
361 self.shutdown.store(true, Ordering::Release);
362 }
363
364 #[allow(clippy::too_many_arguments)]
365 pub fn call<R: Read + Clone + Send + 'static>(
366 &self,
367 method: Method,
368 url: &str,
369 query: Option<&[(&str, &str)]>,
370 data: Option<ReqBody<R>>,
371 headers: &mut HeaderMap,
372 catch_status: bool,
373 ) -> ConnectionResult<Response> {
374 if self.shutdown.load(Ordering::Acquire) {
375 return Err(ConnectionError::Disconnected);
376 }
377 self.last_active.store(
378 SystemTime::now()
379 .duration_since(UNIX_EPOCH)
380 .unwrap()
381 .as_secs(),
382 Ordering::Relaxed,
383 );
384
385 if let Some(proxy) = &self.proxy {
386 if proxy.health.ok() {
387 let data_cloned = data.as_ref().cloned();
388
389 let http_url: Option<String>;
390 let mut replaced_url = url;
391
392 if proxy.use_http {
393 http_url = proxy.try_use_http(url);
394 if let Some(ref r) = http_url {
395 replaced_url = r.as_str();
396 }
397 }
398
399 let result = self.call_inner(
400 &proxy.client,
401 method.clone(),
402 replaced_url,
403 &query,
404 data_cloned,
405 headers,
406 catch_status,
407 true,
408 );
409
410 match result {
411 Ok(resp) => {
412 if !proxy.fallback || resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
413 return Ok(resp);
414 }
415 }
416 Err(err) => {
417 if !proxy.fallback {
418 return Err(err);
419 }
420 }
421 }
422 warn!("Request proxy server failed, fallback to original server");
425 } else {
426 LAST_FALLBACK_AT.with(|f| {
427 let current = SystemTime::now();
428 if current.duration_since(*f.borrow()).unwrap().as_secs()
429 >= RATE_LIMITED_LOG_TIME as u64
430 {
431 warn!("Proxy server is not healthy, fallback to original server");
432 f.replace(current);
433 }
434 })
435 }
436 }
437
438 self.call_inner(
439 &self.client,
440 method,
441 url,
442 &query,
443 data,
444 headers,
445 catch_status,
446 false,
447 )
448 }
449
450 fn build_connection(proxy: &str, config: &ConnectionConfig) -> Result<Client> {
451 let connect_timeout = if config.connect_timeout != 0 {
452 Some(Duration::from_secs(config.connect_timeout as u64))
453 } else {
454 None
455 };
456 let timeout = if config.timeout != 0 {
457 Some(Duration::from_secs(config.timeout as u64))
458 } else {
459 None
460 };
461
462 let mut cb = Client::builder()
463 .timeout(timeout)
464 .connect_timeout(connect_timeout)
465 .redirect(Policy::limited(10));
468
469 if config.skip_verify {
470 cb = cb.danger_accept_invalid_certs(true);
471 }
472
473 if !proxy.is_empty() {
474 cb = cb.proxy(reqwest::Proxy::all(proxy).map_err(|e| einval!(e))?)
475 }
476
477 cb.build().map_err(|e| einval!(e))
478 }
479
480 #[allow(clippy::too_many_arguments)]
481 fn call_inner<R: Read + Clone + Send + 'static>(
482 &self,
483 client: &Client,
484 method: Method,
485 url: &str,
486 query: &Option<&[(&str, &str)]>,
487 data: Option<ReqBody<R>>,
488 headers: &HeaderMap,
489 catch_status: bool,
490 proxy: bool,
491 ) -> ConnectionResult<Response> {
492 let display_headers = if max_level() >= Level::Debug {
494 let mut display_headers = headers.clone();
495 display_headers.remove(HEADER_AUTHORIZATION);
496 Some(display_headers)
497 } else {
498 None
499 };
500 let has_data = data.is_some();
501 let start = Instant::now();
502
503 let mut rb = client.request(method.clone(), url).headers(headers.clone());
504 if let Some(q) = query.as_ref() {
505 rb = rb.query(q);
506 }
507
508 let ret;
509 if let Some(data) = data {
510 match data {
511 ReqBody::Read(body, total) => {
512 let body = Body::sized(body, total as u64);
513 ret = rb.body(body).send();
514 }
515 ReqBody::Buf(buf) => {
516 ret = rb.body(buf).send();
517 }
518 ReqBody::Form(form) => {
519 ret = rb.form(&form).send();
520 }
521 }
522 } else {
523 ret = rb.body("").send();
524 }
525
526 debug!(
527 "{} Request: {} {} headers: {:?}, proxy: {}, data: {}, duration: {}ms",
528 std::thread::current().name().unwrap_or_default(),
529 method,
530 url,
531 display_headers,
532 proxy,
533 has_data,
534 Instant::now().duration_since(start).as_millis(),
535 );
536
537 match ret {
538 Err(err) => Err(ConnectionError::Common(err)),
539 Ok(resp) => respond(resp, catch_status),
540 }
541 }
542}
543
544#[cfg(test)]
545mod tests {
546 use super::*;
547 use std::io::Cursor;
548
549 #[test]
550 fn test_progress() {
551 let buf = vec![0x1u8, 2, 3, 4, 5];
552 let mut progress = Progress::new(Cursor::new(buf), 5, |(curr, total)| {
553 assert!(curr == 2 || curr == 4);
554 assert_eq!(total, 5);
555 });
556
557 let mut buf1 = [0x0u8; 2];
558 assert_eq!(progress.read(&mut buf1).unwrap(), 2);
559 assert_eq!(buf1[0], 1);
560 assert_eq!(buf1[1], 2);
561
562 assert_eq!(progress.read(&mut buf1).unwrap(), 2);
563 assert_eq!(buf1[0], 3);
564 assert_eq!(buf1[1], 4);
565 }
566
567 #[test]
568 fn test_proxy_health() {
569 let checker = ProxyHealth::new(5, 300, None);
570
571 assert!(checker.ok());
572 assert!(checker.ok());
573 checker.set(false);
574 assert!(!checker.ok());
575 assert!(!checker.ok());
576 checker.set(true);
577 assert!(checker.ok());
578 assert!(checker.ok());
579 }
580
581 #[test]
582 fn test_is_success_status() {
583 assert!(!is_success_status(StatusCode::CONTINUE));
584 assert!(is_success_status(StatusCode::OK));
585 assert!(is_success_status(StatusCode::PERMANENT_REDIRECT));
586 assert!(!is_success_status(StatusCode::BAD_REQUEST));
587 }
588
589 #[test]
590 fn test_connection_config_default() {
591 let config = ConnectionConfig::default();
592
593 assert_eq!(config.timeout, 5);
594 assert_eq!(config.connect_timeout, 5);
595 assert_eq!(config.retry_limit, 0);
596 assert_eq!(config.proxy.check_interval, 5);
597 assert_eq!(config.proxy.check_pause_elapsed, 300);
598 assert!(config.proxy.fallback);
599 assert_eq!(config.proxy.ping_url, "");
600 assert_eq!(config.proxy.url, "");
601 }
602}