1use super::{
2 cache_dir::cache_dir_path_of,
3 dot::{ApiName, DotType, Dotter},
4 host_selector::HostSelector,
5};
6use dashmap::DashMap;
7use log::{info, warn};
8use once_cell::sync::Lazy;
9use reqwest::{blocking::Client as HTTPClient, StatusCode, Url};
10use serde::{
11 de::{Error as DeError, Visitor},
12 Deserialize, Deserializer, Serialize, Serializer,
13};
14use serde_json::{from_reader as json_from_reader, to_writer as json_to_writer};
15use std::{
16 collections::HashMap,
17 fmt,
18 fs::{rename as rename_file, OpenOptions},
19 io::{Error as IOError, ErrorKind as IOErrorKind, Result as IOResult},
20 path::Path,
21 sync::{Arc, Mutex},
22 thread::spawn,
23 time::{Duration, Instant, SystemTime},
24};
25use tap::prelude::*;
26
/// Identifies one cached UC query result.
///
/// An entry is scoped to the access key and bucket it was queried for, plus the
/// checksum returned by `HostSelector::all_hosts_crc32` at query time
/// (presumably computed over the UC host list — confirm in `host_selector`).
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
struct CacheKey {
    /// Access key the query was made with.
    ak: Box<str>,
    /// Bucket the query was made for.
    bucket: Box<str>,
    /// Checksum supplied by the host selector when the key was built.
    hosts_crc32: u32,
}

impl CacheKey {
    /// Assembles a key from its three components.
    fn new(ak: Box<str>, bucket: Box<str>, hosts_crc32: u32) -> Self {
        CacheKey { ak, bucket, hosts_crc32 }
    }
}
43
44impl Serialize for CacheKey {
45 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
46 s.collect_str(&format!(
47 "cache-key-v2:{}:{}:{}",
48 self.ak, self.bucket, self.hosts_crc32
49 ))
50 }
51}
52
53struct CacheKeyVisitor;
54
55impl<'de> Visitor<'de> for CacheKeyVisitor {
56 type Value = CacheKey;
57
58 fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
59 f.write_str("Key of cache")
60 }
61
62 fn visit_str<E: DeError>(self, value: &str) -> Result<Self::Value, E> {
63 if let Some(value) = value.strip_prefix("cache-key-v2:") {
64 let mut iter = value.splitn(3, ':');
65 match (iter.next(), iter.next(), iter.next()) {
66 (Some(ak), Some(bucket), Some(crc32_str)) => Ok(CacheKey {
67 ak: ak.into(),
68 bucket: bucket.into(),
69 hosts_crc32: crc32_str.parse().map_err(|err| {
70 E::custom(format!(
71 "Cannot parse hosts_crc32 from cache_key: {}: {}",
72 value, err
73 ))
74 })?,
75 }),
76 _ => Err(E::custom(format!("Invalid cache_key: {}", value))),
77 }
78 } else {
79 Err(E::custom(format!(
80 "Unrecognized version of cache_key: {}",
81 value
82 )))
83 }
84 }
85}
86
87impl<'de> Deserialize<'de> for CacheKey {
88 fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
89 d.deserialize_str(CacheKeyVisitor)
90 }
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94struct CacheValue {
95 cached_response_body: ResponseBody,
96 cache_deadline: SystemTime,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100struct ResponseBody {
101 hosts: Vec<RegionResponseBody>,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105struct RegionResponseBody {
106 ttl: u64,
107 io: DomainsResponseBody,
108 uc: DomainsResponseBody,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
112struct DomainsResponseBody {
113 domains: Box<[Box<str>]>,
114}
115
116static CACHE_MAP: Lazy<DashMap<CacheKey, CacheValue>> = Lazy::new(Default::default);
117static CACHE_FILE_LOCK: Lazy<Mutex<()>> = Lazy::new(Default::default);
118static CACHE_INIT: Lazy<()> = Lazy::new(|| {
119 load_cache().ok();
120});
121
122#[derive(Clone)]
123pub(super) struct HostsQuerier {
124 uc_selector: HostSelector,
125 uc_tries: usize,
126 dotter: Dotter,
127 http_client: Arc<HTTPClient>,
128}
129
130impl HostsQuerier {
131 pub(super) fn new(
132 uc_selector: HostSelector,
133 uc_tries: usize,
134 dotter: Dotter,
135 http_client: Arc<HTTPClient>,
136 ) -> Self {
137 Self {
138 uc_selector,
139 uc_tries,
140 dotter,
141 http_client,
142 }
143 }
144
145 pub(super) fn query_for_io_urls(
146 &self,
147 ak: &str,
148 bucket: &str,
149 use_https: bool,
150 ) -> IOResult<Vec<String>> {
151 Lazy::force(&CACHE_INIT);
152
153 Ok(self
154 .query_for_domains(ak, bucket, use_https)?
155 .hosts
156 .first()
157 .expect("No host in uc query v4 response body")
158 .io
159 .domains
160 .iter()
161 .map(|domain| normalize_domain(domain, use_https))
162 .collect())
163 }
164
165 fn query_for_domains(&self, ak: &str, bucket: &str, use_https: bool) -> IOResult<ResponseBody> {
166 let cache_key = CacheKey::new(ak.into(), bucket.into(), self.uc_selector.all_hosts_crc32());
167
168 let mut modified = false;
169 let cache_value = CACHE_MAP
170 .entry(cache_key.to_owned())
171 .or_try_insert_with(|| {
172 let result = query_for_domains_without_cache(
173 ak,
174 bucket,
175 use_https,
176 &self.uc_selector,
177 self.uc_tries,
178 &self.http_client,
179 &self.dotter,
180 );
181 if result.is_ok() {
182 modified = true;
183 }
184 result
185 })?;
186
187 if cache_value.cache_deadline < SystemTime::now() {
188 let ak = ak.to_owned();
189 let bucket = bucket.to_owned();
190 let uc_selector = self.uc_selector.to_owned();
191 let http_client = self.http_client.to_owned();
192 let dotter = self.dotter.to_owned();
193 let uc_tries = self.uc_tries;
194 spawn(move || {
195 let mut modified = false;
196 CACHE_MAP.entry(cache_key).and_modify(|cache_value| {
197 if cache_value.cache_deadline < SystemTime::now() {
198 if let Ok(new_cache_value) = query_for_domains_without_cache(
199 ak,
200 bucket,
201 use_https,
202 &uc_selector,
203 uc_tries,
204 &http_client,
205 &dotter,
206 ) {
207 *cache_value = new_cache_value;
208 modified = true;
209 }
210 }
211 });
212 if modified {
213 let _ = save_cache();
214 }
215 });
216 } else if modified {
217 spawn(save_cache);
218 }
219
220 Ok(cache_value.cached_response_body.to_owned())
221 }
222}
223
224fn query_for_domains_without_cache(
225 ak: impl AsRef<str>,
226 bucket: impl AsRef<str>,
227 use_https: bool,
228 uc_selector: &HostSelector,
229 uc_tries: usize,
230 http_client: &HTTPClient,
231 dotter: &Dotter,
232) -> IOResult<CacheValue> {
233 return query_with_retry(
234 uc_selector,
235 uc_tries,
236 dotter,
237 |host, timeout_power, timeout| {
238 info!(
239 "try to query hosts from {}, ak = {}, bucket = {}",
240 host,
241 ak.as_ref(),
242 bucket.as_ref()
243 );
244
245 let url = Url::parse_with_params(
246 &format!("{}/v4/query", host),
247 &[("ak", ak.as_ref()), ("bucket", bucket.as_ref())],
248 )
249 .map_err(|err| IOError::new(IOErrorKind::InvalidInput, err))
250 .tap_err(|_| {
251 warn!("uc host {} is invalid", host);
252 })?;
253
254 http_client
255 .get(url.to_string())
256 .timeout(timeout)
257 .send()
258 .tap_err(|err| {
259 if err.is_timeout() {
260 uc_selector.increase_timeout_power_by(host, timeout_power);
261 }
262 })
263 .map_err(|err| IOError::new(IOErrorKind::ConnectionAborted, err))
264 .and_then(|resp| {
265 if resp.status() != StatusCode::OK {
266 Err(IOError::new(
267 IOErrorKind::Other,
268 format!("Unexpected status code {}", resp.status().as_u16()),
269 ))
270 } else {
271 let body = uc_selector.wrap_reader(resp, host, timeout_power);
272 serde_json::from_reader::<_, ResponseBody>(body)
273 .map_err(|err| IOError::new(IOErrorKind::BrokenPipe, err))
274 }
275 })
276 .tap_ok(|body| {
277 let uc_hosts: Vec<_> = body
278 .hosts
279 .first()
280 .map(|host| {
281 host.uc
282 .domains
283 .iter()
284 .map(|domain| normalize_domain(domain, use_https))
285 .collect()
286 })
287 .expect("No host in uc query v4 response body");
288 if !uc_hosts.is_empty() {
289 uc_selector.set_hosts(uc_hosts);
290 }
291 })
292 .map(|body| {
293 let min_ttl = body
294 .hosts
295 .iter()
296 .map(|host| host.ttl)
297 .min()
298 .expect("No host in uc query v4 response body");
299 CacheValue {
300 cached_response_body: body,
301 cache_deadline: SystemTime::now() + Duration::from_secs(min_ttl),
302 }
303 })
304 .tap_ok(|_| {
305 info!(
306 "update query cache for ak = {}, bucket = {} is successful",
307 ak.as_ref(),
308 bucket.as_ref(),
309 );
310 })
311 .tap_err(|err| {
312 warn!(
313 "failed to query hosts from {}, ak = {}, bucket = {}, err = {:?}",
314 host,
315 ak.as_ref(),
316 bucket.as_ref(),
317 err
318 );
319 })
320 },
321 );
322
323 fn query_with_retry<T>(
324 uc_selector: &HostSelector,
325 tries: usize,
326 dotter: &Dotter,
327 mut for_each_host: impl FnMut(&str, usize, Duration) -> IOResult<T>,
328 ) -> IOResult<T> {
329 let mut last_error = None;
330 for _ in 0..tries {
331 let host_info = uc_selector.select_host();
332 let begin_at = Instant::now();
333 match for_each_host(&host_info.host, host_info.timeout_power, host_info.timeout) {
334 Ok(response) => {
335 uc_selector.reward(&host_info.host);
336 dotter
337 .dot(DotType::Http, ApiName::UcV4Query, true, begin_at.elapsed())
338 .ok();
339 return Ok(response);
340 }
341 Err(err) => {
342 let punished = uc_selector.punish(&host_info.host, &err, dotter);
343 dotter
344 .dot(DotType::Http, ApiName::UcV4Query, false, begin_at.elapsed())
345 .ok();
346 if !punished {
347 return Err(err);
348 }
349 last_error = Some(err);
350 }
351 }
352 }
353 Err(last_error.expect("No UC tries error"))
354 }
355}
356
/// Name of the persisted UC query cache inside the cache directory.
const CACHE_FILE_NAME: &str = "query-cache.json";

/// Scratch file the cache is written to before being renamed over
/// `CACHE_FILE_NAME`.
const CACHE_TEMPFILE_NAME: &str = "query-cache.tmp.json";
359
360fn load_cache() -> IOResult<()> {
361 let cache_file_path = cache_dir_path_of(CACHE_FILE_NAME)?;
362 match OpenOptions::new().read(true).open(&cache_file_path) {
363 Ok(cache_file) => {
364 let cache: HashMap<CacheKey, CacheValue> = json_from_reader(cache_file)
365 .tap_err(|err| {
366 warn!(
367 "Failed to parse cache from cache file {:?}: {}",
368 cache_file_path, err
369 )
370 })
371 .map_err(|err| IOError::new(IOErrorKind::Other, err))?;
372 CACHE_MAP.clear();
373 for (key, value) in cache.into_iter() {
374 CACHE_MAP.insert(key, value);
375 }
376 }
377 Err(err) => {
378 info!(
379 "Cache file is failed to open {:?}: {}",
380 cache_file_path, err
381 );
382 }
383 }
384 Ok(())
385}
386
387fn save_cache() -> IOResult<()> {
388 let cache_file_path = cache_dir_path_of(CACHE_FILE_NAME)?;
389 let cache_tempfile_path = cache_dir_path_of(CACHE_TEMPFILE_NAME)?;
390 let cache_file_lock_result = CACHE_FILE_LOCK.try_lock();
391 if cache_file_lock_result.is_err() {
392 info!(
393 "Cache file is locked, cannot save to {:?} now",
394 cache_file_path
395 );
396 return Ok(());
397 }
398 if let Err(err) = _save_cache(&cache_tempfile_path) {
399 warn!("Failed to save cache {:?}: {}", cache_tempfile_path, err);
400 } else {
401 info!("Save cache to {:?} successfully", cache_tempfile_path);
402 if let Err(err) = rename_file(&cache_tempfile_path, &cache_file_path) {
403 warn!(
404 "Failed to move cache file from {:?} to {:?}: {}",
405 cache_tempfile_path, cache_file_path, err
406 );
407 } else {
408 info!(
409 "Move cache from {:?} to {:?} successfully",
410 cache_tempfile_path, cache_file_path
411 );
412 }
413 }
414 return Ok(());
415
416 fn _save_cache(cache_file_path: &Path) -> anyhow::Result<()> {
417 let mut cache_file = OpenOptions::new()
418 .write(true)
419 .create(true)
420 .truncate(true)
421 .open(cache_file_path)?;
422 json_to_writer(&mut cache_file, &*CACHE_MAP)
423 .map_err(|err| IOError::new(IOErrorKind::Other, err))?;
424 Ok(())
425 }
426}
427
/// Makes sure `domain` carries a URL scheme.
///
/// Anything already containing `://` is returned unchanged. Otherwise the
/// domain is prefixed with `https://` when `use_https` is set, and with
/// `http://` when it is not.
#[inline]
fn normalize_domain(domain: &str, use_https: bool) -> String {
    if domain.contains("://") {
        return domain.to_owned();
    }
    let scheme = if use_https { "https" } else { "http" };
    format!("{}://{}", scheme, domain)
}
438
#[cfg(test)]
mod tests {
    use super::{
        super::{
            super::{base::credential::Credential, config::Timeouts},
            dot::{DotRecordKey, DotRecords, DotRecordsDashMap, DOT_FILE_NAME},
        },
        *,
    };
    use futures::channel::oneshot::channel;
    use serde::Serialize;
    use serde_json::json;
    use std::{
        boxed::Box,
        error::Error,
        result::Result,
        sync::{
            atomic::{AtomicUsize, Ordering::Relaxed},
            Arc,
        },
        thread::sleep,
    };
    use tokio::task::{spawn, spawn_blocking};
    use warp::{
        http::header::{HeaderValue, AUTHORIZATION},
        hyper::Body,
        path,
        reply::Response,
        Filter,
    };

    // Serves `$uc_routes` and `$monitor_routes` on ephemeral localhost ports,
    // binds their addresses to `$uc_addr` / `$monitor_addr`, runs `$code`, then
    // signals both servers to shut down gracefully.
    macro_rules! starts_with_server {
        ($uc_addr:ident, $monitor_addr:ident, $uc_routes:ident, $monitor_routes:ident, $code:block) => {{
            let (uc_tx, uc_rx) = channel();
            let (monitor_tx, monitor_rx) = channel();
            let ($uc_addr, uc_server) = warp::serve($uc_routes).bind_with_graceful_shutdown(
                ([127, 0, 0, 1], 0),
                async move {
                    uc_rx.await.unwrap();
                },
            );
            let ($monitor_addr, monitor_server) = warp::serve($monitor_routes)
                .bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
                    monitor_rx.await.unwrap();
                });
            spawn(uc_server);
            spawn(monitor_server);
            $code;
            uc_tx.send(()).unwrap();
            monitor_tx.send(()).unwrap();
        }};
    }

    /// Query string the mock UC server expects on `/v4/query`.
    #[derive(Deserialize, Serialize)]
    struct UcQueryParams {
        ak: String,
        bucket: String,
    }

    const ACCESS_KEY: &str = "0123456789001234567890";
    const SECRET_KEY: &str = "abcdefghijklmnioqrstuv";
    const BUCKET_NAME: &str = "test-bucket";

    fn get_credential() -> Credential {
        Credential::new(ACCESS_KEY, SECRET_KEY)
    }

    #[tokio::test]
    async fn test_uc_query_v4() -> Result<(), Box<dyn Error>> {
        env_logger::try_init().ok();

        CACHE_MAP.clear();
        clear_cache()?;

        let uc_routes = path!("v4" / "query")
            .and(warp::query::<UcQueryParams>())
            .map(|params: UcQueryParams| {
                assert_eq!(&params.ak, ACCESS_KEY);
                assert_eq!(&params.bucket, BUCKET_NAME);
                Response::new(
                    json!({
                        "hosts": [{
                            "region": "z0",
                            "ttl":10,
                            "io": {
                                "domains": [
                                    "iovip.qbox.me"
                                ]
                            },
                            "uc": {
                                "domains": [
                                    "uc.qbox.me"
                                ]
                            }
                        }]
                    })
                    .to_string()
                    .into(),
                )
            });

        let monitor_called = Arc::new(AtomicUsize::new(0));
        let monitor_routes = {
            let monitor_called = monitor_called.to_owned();
            path!("v1" / "stat")
                .and(warp::header::value(AUTHORIZATION.as_str()))
                .and(warp::body::json())
                .map(move |authorization: HeaderValue, records: DotRecords| {
                    monitor_called.fetch_add(1, Relaxed);
                    assert!(authorization.to_str().unwrap().starts_with("UpToken "));
                    assert_eq!(records.records().len(), 1);
                    let record = records.records().first().unwrap();
                    assert_eq!(record.dot_type(), Some(DotType::Http));
                    assert_eq!(record.api_name(), Some(ApiName::UcV4Query));
                    assert_eq!(record.success_count(), Some(1));
                    assert_eq!(record.failed_count(), Some(0));
                    Response::new(Body::empty())
                })
        };
        starts_with_server!(uc_addr, monitor_addr, uc_routes, monitor_routes, {
            spawn_blocking(move || -> IOResult<()> {
                let dotter = Dotter::new(
                    Timeouts::default_http_client(),
                    get_credential(),
                    BUCKET_NAME.to_owned(),
                    vec!["http://".to_owned() + &monitor_addr.to_string()],
                    Some(Duration::from_millis(0)),
                    Some(1),
                    None,
                    None,
                    None,
                    None,
                    None,
                );
                let host_selector =
                    HostSelector::builder(vec!["http://".to_owned() + &uc_addr.to_string()])
                        .build();
                let querier =
                    HostsQuerier::new(host_selector, 1, dotter, Timeouts::default_http_client());
                let io_urls = querier.query_for_io_urls(ACCESS_KEY, BUCKET_NAME, false)?;
                assert_eq!(&io_urls, &["http://iovip.qbox.me".to_owned()]);
                assert_eq!(
                    &querier.uc_selector.hosts(),
                    &["http://uc.qbox.me".to_owned()]
                );
                assert_eq!(&querier.uc_selector.select_host().host, "http://uc.qbox.me");
                sleep(Duration::from_secs(5));
                assert_eq!(monitor_called.load(Relaxed), 1);
                Ok(())
            })
            .await??;
        });
        Ok(())
    }

    #[tokio::test]
    async fn test_uc_query_v4_with_cache() -> Result<(), Box<dyn Error>> {
        env_logger::try_init().ok();

        CACHE_MAP.clear();
        clear_cache()?;

        let uc_called = Arc::new(AtomicUsize::new(0));
        let records_map = Arc::new(DotRecordsDashMap::default());

        let uc_routes = {
            let uc_called = uc_called.to_owned();
            path!("v4" / "query")
                .and(warp::query::<UcQueryParams>())
                .map(move |params: UcQueryParams| {
                    uc_called.fetch_add(1, Relaxed);
                    assert_eq!(&params.ak, ACCESS_KEY);
                    assert_eq!(&params.bucket, BUCKET_NAME);
                    Response::new(
                        json!({
                            "hosts": [{
                                "region": "z0",
                                "ttl":1,
                                "io": {
                                    "domains": [
                                        "iovip.qbox.me"
                                    ]
                                },
                                "uc": {
                                    "domains": []
                                }
                            }]
                        })
                        .to_string()
                        .into(),
                    )
                })
        };
        let monitor_routes = {
            let records_map = records_map.to_owned();
            path!("v1" / "stat")
                .and(warp::header::value(AUTHORIZATION.as_str()))
                .and(warp::body::json())
                .map(move |authorization: HeaderValue, records: DotRecords| {
                    assert!(authorization.to_str().unwrap().starts_with("UpToken "));
                    records_map.merge_with_records(records);
                    Response::new(Body::empty())
                })
        };

        starts_with_server!(uc_addr, monitor_addr, uc_routes, monitor_routes, {
            spawn_blocking(move || -> IOResult<()> {
                let dotter = Dotter::new(
                    Timeouts::default_http_client(),
                    get_credential(),
                    BUCKET_NAME.to_owned(),
                    vec!["http://".to_owned() + &monitor_addr.to_string()],
                    Some(Duration::from_millis(0)),
                    Some(1),
                    None,
                    None,
                    None,
                    None,
                    None,
                );
                let host_selector =
                    HostSelector::builder(vec!["http://".to_owned() + &uc_addr.to_string()])
                        .build();
                let hosts_querier =
                    HostsQuerier::new(host_selector, 1, dotter, Timeouts::default_http_client());
                let mut io_urls =
                    hosts_querier.query_for_io_urls(ACCESS_KEY, BUCKET_NAME, false)?;
                assert_eq!(io_urls, vec!["http://iovip.qbox.me".to_owned()]);
                assert_eq!(uc_called.load(Relaxed), 1);

                // Still within the 1s ttl: served from cache.
                io_urls = hosts_querier.query_for_io_urls(ACCESS_KEY, BUCKET_NAME, false)?;
                assert_eq!(io_urls, vec!["http://iovip.qbox.me".to_owned()]);
                assert_eq!(uc_called.load(Relaxed), 1);

                sleep(Duration::from_secs(3));

                // Expired: the stale value is returned and a refresh runs in the background.
                io_urls = hosts_querier.query_for_io_urls(ACCESS_KEY, BUCKET_NAME, false)?;
                assert_eq!(io_urls, vec!["http://iovip.qbox.me".to_owned()]);
                assert_eq!(uc_called.load(Relaxed), 1);

                sleep(Duration::from_secs(3));
                assert_eq!(uc_called.load(Relaxed), 2);

                // Reload from disk: the persisted entry must satisfy the next query.
                CACHE_MAP.clear();
                load_cache().ok();

                io_urls = hosts_querier.query_for_io_urls(ACCESS_KEY, BUCKET_NAME, false)?;
                assert_eq!(io_urls, vec!["http://iovip.qbox.me".to_owned()]);
                assert_eq!(uc_called.load(Relaxed), 2);

                sleep(Duration::from_secs(5));
                {
                    let record = records_map
                        .get(&DotRecordKey::new(DotType::Http, ApiName::UcV4Query))
                        .unwrap();
                    assert_eq!(record.success_count(), Some(3));
                    assert_eq!(record.failed_count(), Some(0));
                }

                Ok(())
            })
            .await??;
        });
        Ok(())
    }

    /// Deletes the persisted query cache and dot files, ignoring files that do
    /// not exist.
    fn clear_cache() -> IOResult<()> {
        let cache_file_path = cache_dir_path_of(CACHE_FILE_NAME)?;
        std::fs::remove_file(cache_file_path).or_else(|err| {
            if err.kind() == IOErrorKind::NotFound {
                Ok(())
            } else {
                Err(err)
            }
        })?;
        let dot_file_path = cache_dir_path_of(DOT_FILE_NAME)?;
        std::fs::remove_file(dot_file_path).or_else(|err| {
            if err.kind() == IOErrorKind::NotFound {
                Ok(())
            } else {
                Err(err)
            }
        })?;
        Ok(())
    }
}