qiniu_download/sync_api/query.rs

1use super::{
2    cache_dir::cache_dir_path_of,
3    dot::{ApiName, DotType, Dotter},
4    host_selector::HostSelector,
5};
6use dashmap::DashMap;
7use log::{info, warn};
8use once_cell::sync::Lazy;
9use reqwest::{blocking::Client as HTTPClient, StatusCode, Url};
10use serde::{
11    de::{Error as DeError, Visitor},
12    Deserialize, Deserializer, Serialize, Serializer,
13};
14use serde_json::{from_reader as json_from_reader, to_writer as json_to_writer};
15use std::{
16    collections::HashMap,
17    fmt,
18    fs::{rename as rename_file, OpenOptions},
19    io::{Error as IOError, ErrorKind as IOErrorKind, Result as IOResult},
20    path::Path,
21    sync::{Arc, Mutex},
22    thread::spawn,
23    time::{Duration, Instant, SystemTime},
24};
25use tap::prelude::*;
26
/// Identifies one cached UC query result.
///
/// An entry is scoped by access key, bucket, and the value returned by
/// `HostSelector::all_hosts_crc32()` for the querier's UC selector, so queriers
/// configured with different UC hosts keep separate entries.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
struct CacheKey {
    /// Access key the query was issued for.
    ak: Box<str>,
    /// Bucket the query was issued for.
    bucket: Box<str>,
    /// Checksum of the UC host list the querier was configured with.
    hosts_crc32: u32,
}

impl CacheKey {
    /// Assembles a key from its three components.
    fn new(ak: Box<str>, bucket: Box<str>, hosts_crc32: u32) -> Self {
        CacheKey { ak, bucket, hosts_crc32 }
    }
}
43
44impl Serialize for CacheKey {
45    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
46        s.collect_str(&format!(
47            "cache-key-v2:{}:{}:{}",
48            self.ak, self.bucket, self.hosts_crc32
49        ))
50    }
51}
52
53struct CacheKeyVisitor;
54
55impl<'de> Visitor<'de> for CacheKeyVisitor {
56    type Value = CacheKey;
57
58    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
59        f.write_str("Key of cache")
60    }
61
62    fn visit_str<E: DeError>(self, value: &str) -> Result<Self::Value, E> {
63        if let Some(value) = value.strip_prefix("cache-key-v2:") {
64            let mut iter = value.splitn(3, ':');
65            match (iter.next(), iter.next(), iter.next()) {
66                (Some(ak), Some(bucket), Some(crc32_str)) => Ok(CacheKey {
67                    ak: ak.into(),
68                    bucket: bucket.into(),
69                    hosts_crc32: crc32_str.parse().map_err(|err| {
70                        E::custom(format!(
71                            "Cannot parse hosts_crc32 from cache_key: {}: {}",
72                            value, err
73                        ))
74                    })?,
75                }),
76                _ => Err(E::custom(format!("Invalid cache_key: {}", value))),
77            }
78        } else {
79            Err(E::custom(format!(
80                "Unrecognized version of cache_key: {}",
81                value
82            )))
83        }
84    }
85}
86
87impl<'de> Deserialize<'de> for CacheKey {
88    fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
89        d.deserialize_str(CacheKeyVisitor)
90    }
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94struct CacheValue {
95    cached_response_body: ResponseBody,
96    cache_deadline: SystemTime,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100struct ResponseBody {
101    hosts: Vec<RegionResponseBody>,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105struct RegionResponseBody {
106    ttl: u64,
107    io: DomainsResponseBody,
108    uc: DomainsResponseBody,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
112struct DomainsResponseBody {
113    domains: Box<[Box<str>]>,
114}
115
116static CACHE_MAP: Lazy<DashMap<CacheKey, CacheValue>> = Lazy::new(Default::default);
117static CACHE_FILE_LOCK: Lazy<Mutex<()>> = Lazy::new(Default::default);
118static CACHE_INIT: Lazy<()> = Lazy::new(|| {
119    load_cache().ok();
120});
121
122#[derive(Clone)]
123pub(super) struct HostsQuerier {
124    uc_selector: HostSelector,
125    uc_tries: usize,
126    dotter: Dotter,
127    http_client: Arc<HTTPClient>,
128}
129
130impl HostsQuerier {
131    pub(super) fn new(
132        uc_selector: HostSelector,
133        uc_tries: usize,
134        dotter: Dotter,
135        http_client: Arc<HTTPClient>,
136    ) -> Self {
137        Self {
138            uc_selector,
139            uc_tries,
140            dotter,
141            http_client,
142        }
143    }
144
145    pub(super) fn query_for_io_urls(
146        &self,
147        ak: &str,
148        bucket: &str,
149        use_https: bool,
150    ) -> IOResult<Vec<String>> {
151        Lazy::force(&CACHE_INIT);
152
153        Ok(self
154            .query_for_domains(ak, bucket, use_https)?
155            .hosts
156            .first()
157            .expect("No host in uc query v4 response body")
158            .io
159            .domains
160            .iter()
161            .map(|domain| normalize_domain(domain, use_https))
162            .collect())
163    }
164
165    fn query_for_domains(&self, ak: &str, bucket: &str, use_https: bool) -> IOResult<ResponseBody> {
166        let cache_key = CacheKey::new(ak.into(), bucket.into(), self.uc_selector.all_hosts_crc32());
167
168        let mut modified = false;
169        let cache_value = CACHE_MAP
170            .entry(cache_key.to_owned())
171            .or_try_insert_with(|| {
172                let result = query_for_domains_without_cache(
173                    ak,
174                    bucket,
175                    use_https,
176                    &self.uc_selector,
177                    self.uc_tries,
178                    &self.http_client,
179                    &self.dotter,
180                );
181                if result.is_ok() {
182                    modified = true;
183                }
184                result
185            })?;
186
187        if cache_value.cache_deadline < SystemTime::now() {
188            let ak = ak.to_owned();
189            let bucket = bucket.to_owned();
190            let uc_selector = self.uc_selector.to_owned();
191            let http_client = self.http_client.to_owned();
192            let dotter = self.dotter.to_owned();
193            let uc_tries = self.uc_tries;
194            spawn(move || {
195                let mut modified = false;
196                CACHE_MAP.entry(cache_key).and_modify(|cache_value| {
197                    if cache_value.cache_deadline < SystemTime::now() {
198                        if let Ok(new_cache_value) = query_for_domains_without_cache(
199                            ak,
200                            bucket,
201                            use_https,
202                            &uc_selector,
203                            uc_tries,
204                            &http_client,
205                            &dotter,
206                        ) {
207                            *cache_value = new_cache_value;
208                            modified = true;
209                        }
210                    }
211                });
212                if modified {
213                    let _ = save_cache();
214                }
215            });
216        } else if modified {
217            spawn(save_cache);
218        }
219
220        Ok(cache_value.cached_response_body.to_owned())
221    }
222}
223
224fn query_for_domains_without_cache(
225    ak: impl AsRef<str>,
226    bucket: impl AsRef<str>,
227    use_https: bool,
228    uc_selector: &HostSelector,
229    uc_tries: usize,
230    http_client: &HTTPClient,
231    dotter: &Dotter,
232) -> IOResult<CacheValue> {
233    return query_with_retry(
234        uc_selector,
235        uc_tries,
236        dotter,
237        |host, timeout_power, timeout| {
238            info!(
239                "try to query hosts from {}, ak = {}, bucket = {}",
240                host,
241                ak.as_ref(),
242                bucket.as_ref()
243            );
244
245            let url = Url::parse_with_params(
246                &format!("{}/v4/query", host),
247                &[("ak", ak.as_ref()), ("bucket", bucket.as_ref())],
248            )
249            .map_err(|err| IOError::new(IOErrorKind::InvalidInput, err))
250            .tap_err(|_| {
251                warn!("uc host {} is invalid", host);
252            })?;
253
254            http_client
255                .get(url.to_string())
256                .timeout(timeout)
257                .send()
258                .tap_err(|err| {
259                    if err.is_timeout() {
260                        uc_selector.increase_timeout_power_by(host, timeout_power);
261                    }
262                })
263                .map_err(|err| IOError::new(IOErrorKind::ConnectionAborted, err))
264                .and_then(|resp| {
265                    if resp.status() != StatusCode::OK {
266                        Err(IOError::new(
267                            IOErrorKind::Other,
268                            format!("Unexpected status code {}", resp.status().as_u16()),
269                        ))
270                    } else {
271                        let body = uc_selector.wrap_reader(resp, host, timeout_power);
272                        serde_json::from_reader::<_, ResponseBody>(body)
273                            .map_err(|err| IOError::new(IOErrorKind::BrokenPipe, err))
274                    }
275                })
276                .tap_ok(|body| {
277                    let uc_hosts: Vec<_> = body
278                        .hosts
279                        .first()
280                        .map(|host| {
281                            host.uc
282                                .domains
283                                .iter()
284                                .map(|domain| normalize_domain(domain, use_https))
285                                .collect()
286                        })
287                        .expect("No host in uc query v4 response body");
288                    if !uc_hosts.is_empty() {
289                        uc_selector.set_hosts(uc_hosts);
290                    }
291                })
292                .map(|body| {
293                    let min_ttl = body
294                        .hosts
295                        .iter()
296                        .map(|host| host.ttl)
297                        .min()
298                        .expect("No host in uc query v4 response body");
299                    CacheValue {
300                        cached_response_body: body,
301                        cache_deadline: SystemTime::now() + Duration::from_secs(min_ttl),
302                    }
303                })
304                .tap_ok(|_| {
305                    info!(
306                        "update query cache for ak = {}, bucket = {} is successful",
307                        ak.as_ref(),
308                        bucket.as_ref(),
309                    );
310                })
311                .tap_err(|err| {
312                    warn!(
313                        "failed to query hosts from {}, ak = {}, bucket = {}, err = {:?}",
314                        host,
315                        ak.as_ref(),
316                        bucket.as_ref(),
317                        err
318                    );
319                })
320        },
321    );
322
323    fn query_with_retry<T>(
324        uc_selector: &HostSelector,
325        tries: usize,
326        dotter: &Dotter,
327        mut for_each_host: impl FnMut(&str, usize, Duration) -> IOResult<T>,
328    ) -> IOResult<T> {
329        let mut last_error = None;
330        for _ in 0..tries {
331            let host_info = uc_selector.select_host();
332            let begin_at = Instant::now();
333            match for_each_host(&host_info.host, host_info.timeout_power, host_info.timeout) {
334                Ok(response) => {
335                    uc_selector.reward(&host_info.host);
336                    dotter
337                        .dot(DotType::Http, ApiName::UcV4Query, true, begin_at.elapsed())
338                        .ok();
339                    return Ok(response);
340                }
341                Err(err) => {
342                    let punished = uc_selector.punish(&host_info.host, &err, dotter);
343                    dotter
344                        .dot(DotType::Http, ApiName::UcV4Query, false, begin_at.elapsed())
345                        .ok();
346                    if !punished {
347                        return Err(err);
348                    }
349                    last_error = Some(err);
350                }
351            }
352        }
353        Err(last_error.expect("No UC tries error"))
354    }
355}
356
/// Name, inside the cache directory, of the persisted query cache.
const CACHE_FILE_NAME: &str = "query-cache.json";
/// Scratch file `save_cache` writes first and then renames onto `CACHE_FILE_NAME`.
const CACHE_TEMPFILE_NAME: &str = "query-cache.tmp.json";
359
360fn load_cache() -> IOResult<()> {
361    let cache_file_path = cache_dir_path_of(CACHE_FILE_NAME)?;
362    match OpenOptions::new().read(true).open(&cache_file_path) {
363        Ok(cache_file) => {
364            let cache: HashMap<CacheKey, CacheValue> = json_from_reader(cache_file)
365                .tap_err(|err| {
366                    warn!(
367                        "Failed to parse cache from cache file {:?}: {}",
368                        cache_file_path, err
369                    )
370                })
371                .map_err(|err| IOError::new(IOErrorKind::Other, err))?;
372            CACHE_MAP.clear();
373            for (key, value) in cache.into_iter() {
374                CACHE_MAP.insert(key, value);
375            }
376        }
377        Err(err) => {
378            info!(
379                "Cache file is failed to open {:?}: {}",
380                cache_file_path, err
381            );
382        }
383    }
384    Ok(())
385}
386
387fn save_cache() -> IOResult<()> {
388    let cache_file_path = cache_dir_path_of(CACHE_FILE_NAME)?;
389    let cache_tempfile_path = cache_dir_path_of(CACHE_TEMPFILE_NAME)?;
390    let cache_file_lock_result = CACHE_FILE_LOCK.try_lock();
391    if cache_file_lock_result.is_err() {
392        info!(
393            "Cache file is locked, cannot save to {:?} now",
394            cache_file_path
395        );
396        return Ok(());
397    }
398    if let Err(err) = _save_cache(&cache_tempfile_path) {
399        warn!("Failed to save cache {:?}: {}", cache_tempfile_path, err);
400    } else {
401        info!("Save cache to {:?} successfully", cache_tempfile_path);
402        if let Err(err) = rename_file(&cache_tempfile_path, &cache_file_path) {
403            warn!(
404                "Failed to move cache file from {:?} to {:?}: {}",
405                cache_tempfile_path, cache_file_path, err
406            );
407        } else {
408            info!(
409                "Move cache from {:?} to {:?} successfully",
410                cache_tempfile_path, cache_file_path
411            );
412        }
413    }
414    return Ok(());
415
416    fn _save_cache(cache_file_path: &Path) -> anyhow::Result<()> {
417        let mut cache_file = OpenOptions::new()
418            .write(true)
419            .create(true)
420            .truncate(true)
421            .open(cache_file_path)?;
422        json_to_writer(&mut cache_file, &*CACHE_MAP)
423            .map_err(|err| IOError::new(IOErrorKind::Other, err))?;
424        Ok(())
425    }
426}
427
/// Turns a domain from a UC response into a URL prefix.
///
/// Values that already contain `://` are returned unchanged; otherwise an
/// `https://` or `http://` scheme is prepended according to `use_https`.
#[inline]
fn normalize_domain(domain: &str, use_https: bool) -> String {
    if domain.contains("://") {
        return domain.to_owned();
    }
    let scheme = if use_https { "https" } else { "http" };
    format!("{}://{}", scheme, domain)
}
438
#[cfg(test)]
mod tests {
    use super::{
        super::{
            super::{base::credential::Credential, config::Timeouts},
            dot::{DotRecordKey, DotRecords, DotRecordsDashMap, DOT_FILE_NAME},
        },
        *,
    };
    use futures::channel::oneshot::channel;
    use serde::Serialize;
    use serde_json::json;
    use std::{
        boxed::Box,
        error::Error,
        result::Result,
        sync::{
            atomic::{AtomicUsize, Ordering::Relaxed},
            Arc, Mutex, MutexGuard, PoisonError,
        },
        thread::sleep,
    };
    use tokio::task::{spawn, spawn_blocking};
    use warp::{
        http::header::{HeaderValue, AUTHORIZATION},
        hyper::Body,
        path,
        reply::Response,
        Filter,
    };

    /// Starts mock UC and monitor servers on ephemeral local ports, binds their
    /// addresses to `$uc_addr` / `$monitor_addr`, runs `$code`, then shuts both
    /// servers down gracefully.
    macro_rules! starts_with_server {
        ($uc_addr:ident, $monitor_addr:ident, $uc_routes:ident, $monitor_routes:ident, $code:block) => {{
            let (uc_tx, uc_rx) = channel();
            let (monitor_tx, monitor_rx) = channel();
            let ($uc_addr, uc_server) = warp::serve($uc_routes).bind_with_graceful_shutdown(
                ([127, 0, 0, 1], 0),
                async move {
                    uc_rx.await.unwrap();
                },
            );
            let ($monitor_addr, monitor_server) = warp::serve($monitor_routes)
                .bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
                    monitor_rx.await.unwrap();
                });
            spawn(uc_server);
            spawn(monitor_server);
            $code;
            uc_tx.send(()).unwrap();
            monitor_tx.send(()).unwrap();
        }};
    }

    /// Serializes the tests of this module.
    ///
    /// Each test clears and repopulates process-wide state (`CACHE_MAP`, the
    /// persisted query cache and the dot file). `cargo test` runs tests
    /// concurrently by default, so without this lock one test can wipe data
    /// another is asserting on.
    static GLOBAL_STATE_LOCK: Lazy<Mutex<()>> = Lazy::new(Default::default);

    /// Acquires `GLOBAL_STATE_LOCK`, ignoring poisoning left behind by a failed test.
    fn lock_global_state() -> MutexGuard<'static, ()> {
        GLOBAL_STATE_LOCK
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
    }

    #[derive(Deserialize, Serialize)]
    struct UcQueryParams {
        ak: String,
        bucket: String,
    }

    const ACCESS_KEY: &str = "0123456789001234567890";
    const SECRET_KEY: &str = "abcdefghijklmnioqrstuv";
    const BUCKET_NAME: &str = "test-bucket";

    fn get_credential() -> Credential {
        Credential::new(ACCESS_KEY, SECRET_KEY)
    }

    #[tokio::test]
    // The guard intentionally spans the whole test, including awaits; the only
    // other contender is another test running on its own thread and runtime.
    #[allow(clippy::await_holding_lock)]
    async fn test_uc_query_v4() -> Result<(), Box<dyn Error>> {
        let _global_state = lock_global_state();
        env_logger::try_init().ok();

        CACHE_MAP.clear();
        clear_cache()?;

        let uc_routes = path!("v4" / "query")
            .and(warp::query::<UcQueryParams>())
            .map(|params: UcQueryParams| {
                assert_eq!(&params.ak, ACCESS_KEY);
                assert_eq!(&params.bucket, BUCKET_NAME);
                Response::new(
                    json!({
                        "hosts": [{
                            "region": "z0",
                            "ttl":10,
                            "io": {
                              "domains": [
                                "iovip.qbox.me"
                              ]
                            },
                            "uc": {
                              "domains": [
                                "uc.qbox.me"
                              ]
                            }
                        }]
                    })
                    .to_string()
                    .into(),
                )
            });

        let monitor_called = Arc::new(AtomicUsize::new(0));
        let monitor_routes = {
            let monitor_called = monitor_called.to_owned();
            path!("v1" / "stat")
                .and(warp::header::value(AUTHORIZATION.as_str()))
                .and(warp::body::json())
                .map(move |authorization: HeaderValue, records: DotRecords| {
                    monitor_called.fetch_add(1, Relaxed);
                    assert!(authorization.to_str().unwrap().starts_with("UpToken "));
                    assert_eq!(records.records().len(), 1);
                    let record = records.records().first().unwrap();
                    assert_eq!(record.dot_type(), Some(DotType::Http));
                    assert_eq!(record.api_name(), Some(ApiName::UcV4Query));
                    assert_eq!(record.success_count(), Some(1));
                    assert_eq!(record.failed_count(), Some(0));
                    Response::new(Body::empty())
                })
        };
        starts_with_server!(uc_addr, monitor_addr, uc_routes, monitor_routes, {
            spawn_blocking(move || -> IOResult<()> {
                let dotter = Dotter::new(
                    Timeouts::default_http_client(),
                    get_credential(),
                    BUCKET_NAME.to_owned(),
                    vec!["http://".to_owned() + &monitor_addr.to_string()],
                    Some(Duration::from_millis(0)),
                    Some(1),
                    None,
                    None,
                    None,
                    None,
                    None,
                );
                let host_selector =
                    HostSelector::builder(vec!["http://".to_owned() + &uc_addr.to_string()])
                        .build();
                let querier =
                    HostsQuerier::new(host_selector, 1, dotter, Timeouts::default_http_client());
                let io_urls = querier.query_for_io_urls(ACCESS_KEY, BUCKET_NAME, false)?;
                assert_eq!(&io_urls, &["http://iovip.qbox.me".to_owned()]);
                assert_eq!(
                    &querier.uc_selector.hosts(),
                    &["http://uc.qbox.me".to_owned()]
                );
                assert_eq!(&querier.uc_selector.select_host().host, "http://uc.qbox.me");
                sleep(Duration::from_secs(5));
                assert_eq!(monitor_called.load(Relaxed), 1);
                Ok(())
            })
            .await??;
        });
        Ok(())
    }

    #[tokio::test]
    // See `test_uc_query_v4` for why the guard is held across awaits.
    #[allow(clippy::await_holding_lock)]
    async fn test_uc_query_v4_with_cache() -> Result<(), Box<dyn Error>> {
        let _global_state = lock_global_state();
        env_logger::try_init().ok();

        CACHE_MAP.clear();
        clear_cache()?;

        let uc_called = Arc::new(AtomicUsize::new(0));
        let records_map = Arc::new(DotRecordsDashMap::default());

        let uc_routes = {
            let uc_called = uc_called.to_owned();
            path!("v4" / "query")
                .and(warp::query::<UcQueryParams>())
                .map(move |params: UcQueryParams| {
                    uc_called.fetch_add(1, Relaxed);
                    assert_eq!(&params.ak, ACCESS_KEY);
                    assert_eq!(&params.bucket, BUCKET_NAME);
                    Response::new(
                        json!({
                            "hosts": [{
                                "region": "z0",
                                "ttl":1,
                                "io": {
                                  "domains": [
                                    "iovip.qbox.me"
                                  ]
                                },
                                "uc": {
                                  "domains": []
                                }
                            }]
                        })
                        .to_string()
                        .into(),
                    )
                })
        };
        let monitor_routes = {
            let records_map = records_map.to_owned();
            path!("v1" / "stat")
                .and(warp::header::value(AUTHORIZATION.as_str()))
                .and(warp::body::json())
                .map(move |authorization: HeaderValue, records: DotRecords| {
                    assert!(authorization.to_str().unwrap().starts_with("UpToken "));
                    records_map.merge_with_records(records);
                    Response::new(Body::empty())
                })
        };

        starts_with_server!(uc_addr, monitor_addr, uc_routes, monitor_routes, {
            spawn_blocking(move || -> IOResult<()> {
                let dotter = Dotter::new(
                    Timeouts::default_http_client(),
                    get_credential(),
                    BUCKET_NAME.to_owned(),
                    vec!["http://".to_owned() + &monitor_addr.to_string()],
                    Some(Duration::from_millis(0)),
                    Some(1),
                    None,
                    None,
                    None,
                    None,
                    None,
                );
                let host_selector =
                    HostSelector::builder(vec!["http://".to_owned() + &uc_addr.to_string()])
                        .build();
                let hosts_querier =
                    HostsQuerier::new(host_selector, 1, dotter, Timeouts::default_http_client());
                let mut io_urls =
                    hosts_querier.query_for_io_urls(ACCESS_KEY, BUCKET_NAME, false)?;
                assert_eq!(io_urls, vec!["http://iovip.qbox.me".to_owned()]);
                assert_eq!(uc_called.load(Relaxed), 1);

                io_urls = hosts_querier.query_for_io_urls(ACCESS_KEY, BUCKET_NAME, false)?;
                assert_eq!(io_urls, vec!["http://iovip.qbox.me".to_owned()]);
                assert_eq!(uc_called.load(Relaxed), 1);

                sleep(Duration::from_secs(3));

                // The entry is expired now: the stale value is returned and a
                // background refresh is started.
                io_urls = hosts_querier.query_for_io_urls(ACCESS_KEY, BUCKET_NAME, false)?;
                assert_eq!(io_urls, vec!["http://iovip.qbox.me".to_owned()]);
                assert_eq!(uc_called.load(Relaxed), 1);

                sleep(Duration::from_secs(3));
                assert_eq!(uc_called.load(Relaxed), 2);

                // The refreshed entry must have been persisted and be reloadable.
                CACHE_MAP.clear();
                load_cache().ok();

                io_urls = hosts_querier.query_for_io_urls(ACCESS_KEY, BUCKET_NAME, false)?;
                assert_eq!(io_urls, vec!["http://iovip.qbox.me".to_owned()]);
                assert_eq!(uc_called.load(Relaxed), 2);

                sleep(Duration::from_secs(5));
                {
                    let record = records_map
                        .get(&DotRecordKey::new(DotType::Http, ApiName::UcV4Query))
                        .unwrap();
                    assert_eq!(record.success_count(), Some(3));
                    assert_eq!(record.failed_count(), Some(0));
                }

                Ok(())
            })
            .await??;
        });
        Ok(())
    }

    /// Deletes the persisted query cache and dot files; missing files are fine.
    fn clear_cache() -> IOResult<()> {
        remove_file_if_exists(cache_dir_path_of(CACHE_FILE_NAME)?)?;
        remove_file_if_exists(cache_dir_path_of(DOT_FILE_NAME)?)?;
        Ok(())
    }

    /// Removes `path`, treating "not found" as success.
    fn remove_file_if_exists(path: impl AsRef<Path>) -> IOResult<()> {
        match std::fs::remove_file(path) {
            Err(err) if err.kind() != IOErrorKind::NotFound => Err(err),
            _ => Ok(()),
        }
    }
}