qiniu_download/async_api/
dot.rs

1use super::{
2    super::base::{
3        credential::Credential, upload_policy::UploadPolicy, upload_token::sign_upload_token,
4    },
5    cache_dir::cache_dir_path_of,
6    host_selector::{HostInfo, HostSelector, PunishResult},
7};
8use fd_lock::RwLock as FdRwLock;
9use futures::future::join_all;
10use log::{debug, info, warn};
11use reqwest::{header::AUTHORIZATION, Client as HttpClient, StatusCode};
12use scc::HashMap;
13use serde::{de::Error as DeserializeError, Deserialize, Serialize};
14use serde_json::Value as JSONValue;
15use std::{
16    collections::HashMap as StdHashMap,
17    convert::TryFrom,
18    fmt::{self, Debug},
19    future::Future,
20    io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
21    ops::Deref,
22    sync::{
23        atomic::{AtomicBool, Ordering::Relaxed},
24        Arc,
25    },
26    time::{Duration, Instant, SystemTime},
27};
28use tap::prelude::*;
29use tokio::{
30    fs::{File, OpenOptions},
31    io::{AsyncBufReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
32    spawn,
33    sync::Mutex,
34};
35
/// Process-wide switch for the dotting feature; `true` means dotting is turned off.
static DOTTING_DISABLED: AtomicBool = AtomicBool::new(false);

/// Turns the dotting feature off.
pub fn disable_dotting() {
    DOTTING_DISABLED.store(true, Relaxed);
}

/// Turns the dotting feature back on.
pub fn enable_dotting() {
    DOTTING_DISABLED.store(false, Relaxed);
}

/// Returns `true` while the dotting feature is turned off.
pub fn is_dotting_disabled() -> bool {
    DOTTING_DISABLED.load(Relaxed)
}
55
/// Process-wide switch for uploading dots; `true` means uploading is turned off.
static DOT_UPLOADING_DISABLED: AtomicBool = AtomicBool::new(false);

/// Turns the dot uploading feature off.
pub fn disable_dot_uploading() {
    DOT_UPLOADING_DISABLED.store(true, Relaxed);
}

/// Turns the dot uploading feature back on.
pub fn enable_dot_uploading() {
    DOT_UPLOADING_DISABLED.store(false, Relaxed);
}

/// Returns `true` while the dot uploading feature is turned off.
pub fn is_dot_uploading_disabled() -> bool {
    DOT_UPLOADING_DISABLED.load(Relaxed)
}
75
/// Category of a dot record.
///
/// Serialized by serde with lowercase variant names (`"sdk"`, `"http"`), which is the
/// same text the `Display` implementation produces.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[serde(rename_all = "lowercase")]
pub(super) enum DotType {
    /// Dot recorded for an SDK-level call.
    Sdk,
    /// Dot recorded for an HTTP-level call.
    Http,
}
82
83impl fmt::Display for DotType {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        match self {
86            Self::Http => write!(f, "http"),
87            Self::Sdk => write!(f, "sdk"),
88        }
89    }
90}
91
/// Name of the API a dot record refers to.
///
/// Serialized by serde in `snake_case` (for example `IoGetfile` becomes `"io_getfile"`),
/// which is the same text the `Display` implementation produces.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[serde(rename_all = "snake_case")]
pub(super) enum ApiName {
    IoGetfile,
    /// Used by `DotterInner::do_upload` to record the dot upload request itself.
    MonitorV1Stat,
    UcV4Query,
    RangeReaderReadAt,
    RangeReaderReadMultiRanges,
    RangeReaderExist,
    RangeReaderFileSize,
    RangeReaderDownloadTo,
    RangeReaderReadLastBytes,
}
105
106impl fmt::Display for ApiName {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        match self {
109            Self::IoGetfile => write!(f, "io_getfile"),
110            Self::MonitorV1Stat => write!(f, "monitor_v1_stat"),
111            Self::UcV4Query => write!(f, "uc_v4_query"),
112            Self::RangeReaderReadAt => write!(f, "range_reader_read_at"),
113            Self::RangeReaderReadMultiRanges => write!(f, "range_reader_read_multi_ranges"),
114            Self::RangeReaderExist => write!(f, "range_reader_exist"),
115            Self::RangeReaderFileSize => write!(f, "range_reader_file_size"),
116            Self::RangeReaderDownloadTo => write!(f, "range_reader_download_to"),
117            Self::RangeReaderReadLastBytes => write!(f, "range_reader_read_last_bytes"),
118        }
119    }
120}
121
/// Cheaply clonable handle used to record dots and upload them.
///
/// `inner` is `None` when the dotter is inactive (`Dotter::new` returns such a value
/// when no monitor URL is given or the dot file cannot be opened; `Default` does too).
/// An inactive dotter turns `dot` and `punish` into no-ops.
#[derive(Clone, Debug, Default)]
pub(super) struct Dotter {
    inner: Option<Arc<DotterInner>>,
}
126
/// Shared state behind an active `Dotter`.
struct DotterInner {
    /// Credential used to sign the upload token sent with each dot upload.
    credential: Credential,
    /// Bucket the upload token is issued for.
    bucket: String,
    /// Chooses the monitor host to upload to and tracks punished hosts.
    monitor_selector: HostSelector,
    /// In-memory records not yet flushed to the dot file.
    buffered_records: AsyncDotRecordsMap,
    /// Dot file handle (opened in append mode by `Dotter::new`), guarded by an in-process
    /// mutex and a file lock.
    buffered_file: Mutex<FdRwLock<File>>,
    /// Minimum time since `uploaded_at` before an upload is considered due.
    interval: Duration,
    /// Set to the construction time by `Dotter::new`.
    /// NOTE(review): nothing visible in this file updates this field after construction,
    /// so once `interval` has elapsed the time condition stays satisfied — confirm intended.
    uploaded_at: Instant,
    /// Dot file size in bytes above which an upload is considered due.
    max_buffer_size: u64,
    /// Maximum number of upload attempts per upload.
    tries: usize,
    /// HTTP client used to send the upload request.
    http_client: Arc<HttpClient>,
}
139
140impl Debug for DotterInner {
141    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142        f.debug_struct("DotterInner")
143            .field("credential", &self.credential)
144            .field("bucket", &self.bucket)
145            .field("monitor_selector", &self.monitor_selector)
146            .field("buffered_file", &self.buffered_file)
147            .field("interval", &self.interval)
148            .field("uploaded_at", &self.uploaded_at)
149            .field("max_buffer_size", &self.max_buffer_size)
150            .field("tries", &self.tries)
151            .field("http_client", &self.http_client)
152            .finish()
153    }
154}
155
/// Name passed to `cache_dir_path_of` to locate the on-disk file that buffers dot
/// records until they are uploaded.
pub(super) const DOT_FILE_NAME: &str = "dot-file";
157
158impl Dotter {
159    #[allow(clippy::too_many_arguments)]
160    pub(super) async fn new(
161        http_client: Arc<HttpClient>,
162        credential: Credential,
163        bucket: String,
164        monitor_urls: Vec<String>,
165        interval: Option<Duration>,
166        max_buffer_size: Option<u64>,
167        tries: Option<usize>,
168        punish_duration: Option<Duration>,
169        max_punished_times: Option<usize>,
170        max_punished_hosts_percent: Option<u8>,
171        base_timeout: Option<Duration>,
172    ) -> Dotter {
173        if !monitor_urls.is_empty() {
174            if let Ok(buffered_file_path) = cache_dir_path_of(DOT_FILE_NAME).await {
175                if let Ok(buffer_file) = OpenOptions::new()
176                    .create(true)
177                    .write(true)
178                    .append(true)
179                    .open(&buffered_file_path)
180                    .await
181                {
182                    let monitor_selector = HostSelector::builder(monitor_urls)
183                        .punish_duration(punish_duration.unwrap_or_else(|| Duration::from_secs(30)))
184                        .max_punished_times(max_punished_times.unwrap_or(5))
185                        .max_punished_hosts_percent(max_punished_hosts_percent.unwrap_or(50))
186                        .base_timeout(base_timeout.unwrap_or_else(|| Duration::from_secs(1)))
187                        .build()
188                        .await;
189                    return Self {
190                        inner: Some(Arc::new(DotterInner {
191                            credential,
192                            bucket,
193                            monitor_selector,
194                            http_client,
195                            buffered_records: Default::default(),
196                            buffered_file: Mutex::new(FdRwLock::new(buffer_file)),
197                            interval: interval.unwrap_or_else(|| Duration::from_secs(10)),
198                            uploaded_at: Instant::now(),
199                            max_buffer_size: max_buffer_size.unwrap_or(1 << 20),
200                            tries: tries.unwrap_or(10),
201                        })),
202                    };
203                }
204            }
205        }
206        Self { inner: None }
207    }
208
209    pub(super) async fn dot(
210        &self,
211        dot_type: DotType,
212        api_name: ApiName,
213        successful: bool,
214        elapsed_duration: Duration,
215    ) -> IoResult<()> {
216        if is_dotting_disabled() {
217            debug!("dotting is disabled")
218        } else if let Some(inner) = self.inner.as_ref() {
219            inner
220                .fast_dot(dot_type, api_name, successful, elapsed_duration)
221                .await;
222            inner
223                .lock_buffered_file(|mut buffered_file| async move {
224                    inner.flush_to_file(&mut buffered_file).await?;
225                    if inner.is_time_to_upload(&buffered_file).await? {
226                        self.async_upload();
227                    }
228                    Ok(())
229                })
230                .await?;
231        }
232        Ok(())
233    }
234
235    pub(super) async fn punish(&self) -> IoResult<()> {
236        if is_dotting_disabled() {
237            debug!("dotting is disabled")
238        } else if let Some(inner) = self.inner.as_ref() {
239            inner.fast_punish().await;
240            inner
241                .lock_buffered_file(|mut buffered_file| async move {
242                    inner.flush_to_file(&mut buffered_file).await?;
243                    if inner.is_time_to_upload(&buffered_file).await? {
244                        self.async_upload();
245                    }
246                    Ok(())
247                })
248                .await?;
249        }
250        Ok(())
251    }
252
253    fn async_upload(&self) {
254        if let Some(inner) = self.inner.as_ref() {
255            let inner = inner.to_owned();
256            spawn(async move {
257                let inner2 = inner.to_owned();
258                inner
259                    .lock_buffered_file(|buffered_file| async move {
260                        if inner2.is_time_to_upload(&buffered_file).await? {
261                            inner2.do_upload().await?;
262                        }
263                        Ok(())
264                    })
265                    .await
266            });
267        }
268    }
269}
270
271impl DotterInner {
272    async fn fast_dot(
273        &self,
274        dot_type: DotType,
275        api_name: ApiName,
276        successful: bool,
277        elapsed_duration: Duration,
278    ) {
279        let record = if successful {
280            DotRecord::new(
281                dot_type,
282                api_name,
283                1,
284                Default::default(),
285                elapsed_duration.as_millis(),
286                Default::default(),
287            )
288        } else {
289            DotRecord::new(
290                dot_type,
291                api_name,
292                Default::default(),
293                1,
294                Default::default(),
295                elapsed_duration.as_millis(),
296            )
297        };
298        self.buffered_records.merge_with_record(record).await;
299    }
300
301    async fn fast_punish(&self) {
302        self.buffered_records
303            .merge_with_record(DotRecord::punished())
304            .await;
305    }
306
307    async fn flush_to_file(&self, buffered_file: &mut File) -> IoResult<()> {
308        let buffered_file = Arc::new(Mutex::new(BufWriter::new(buffered_file)));
309        {
310            let mut futures = vec![];
311            self.buffered_records
312                .scan_async(|key, record| {
313                    let key = key.to_owned();
314                    let record = record.to_owned();
315                    let buffered_file = buffered_file.to_owned();
316                    futures.push(async move {
317                        if write_to_file(&record, &mut *buffered_file.lock().await)
318                            .await
319                            .is_ok()
320                        {
321                            Some(key)
322                        } else {
323                            None
324                        }
325                    })
326                })
327                .await;
328            for key in join_all(futures).await.into_iter().flatten() {
329                self.buffered_records.remove_async(&key).await;
330            }
331        }
332
333        Arc::try_unwrap(buffered_file)
334            .unwrap()
335            .into_inner()
336            .flush()
337            .await?;
338
339        return Ok(());
340
341        async fn write_to_file<W: AsyncWrite + Unpin>(
342            record: &DotRecord,
343            file: &mut W,
344        ) -> anyhow::Result<()> {
345            let mut line = serde_json::to_string(record)?;
346            line.push('\n');
347            file.write_all(line.as_bytes())
348                .await
349                .tap_err(|err| warn!("the dot file is failed to write: {:?}", err))?;
350            Ok(())
351        }
352    }
353
354    async fn is_time_to_upload(&self, buffered_file: &File) -> IoResult<bool> {
355        if is_dotting_disabled() || is_dot_uploading_disabled() {
356            debug!("dot uploading is disabled, will not upload the dot file now");
357            return Ok(false);
358        }
359        let result = self.uploaded_at.elapsed() > self.interval
360            || buffered_file
361                .metadata()
362                .await
363                .tap_err(|err| warn!("stat the dot file error: {:?}", err))?
364                .len()
365                > self.max_buffer_size;
366        if !result {
367            debug!("dot uploading condition is not satisfied")
368        }
369        Ok(result)
370    }
371
372    async fn do_upload(&self) -> IoResult<()> {
373        self.upload_with_retry(|host_info| async move {
374            let mut buffered_file = OpenOptions::new()
375                .read(true)
376                .write(true)
377                .open(&cache_dir_path_of(DOT_FILE_NAME).await?)
378                .await?;
379            let url = format!("{}/v1/stat", host_info.host());
380            debug!("try to upload dots to {}", url);
381            let uptoken = sign_upload_token(
382                &self.credential,
383                &UploadPolicy::new_for_bucket(
384                    self.bucket.to_owned(),
385                    SystemTime::now() + Duration::from_secs(30),
386                ),
387            );
388            let begin_at = Instant::now();
389            let response_result = self
390                .http_client
391                .post(&url)
392                .header(AUTHORIZATION, format!("UpToken {}", uptoken))
393                .json(&self.make_request_body(&mut buffered_file).await?)
394                .timeout(host_info.timeout())
395                .send()
396                .await;
397            if let Err(err) = &response_result {
398                if err.is_timeout() {
399                    self.monitor_selector
400                        .increase_timeout_power_by(host_info.host(), host_info.timeout_power())
401                        .await;
402                }
403            }
404            let response_result = response_result
405                .map_err(|err| IoError::new(IoErrorKind::ConnectionAborted, err))
406                .and_then(|resp| {
407                    if resp.status() != StatusCode::OK {
408                        Err(IoError::new(
409                            IoErrorKind::Other,
410                            format!("Unexpected status code {}", resp.status().as_u16()),
411                        ))
412                    } else {
413                        Ok(())
414                    }
415                });
416            self.fast_dot(
417                DotType::Http,
418                ApiName::MonitorV1Stat,
419                response_result.is_ok(),
420                begin_at.elapsed(),
421            )
422            .await;
423            response_result
424                .tap_ok(|_| info!("upload dots succeed"))
425                .tap_err(|err| warn!("failed to upload dots: {:?}", err))?;
426            buffered_file.set_len(0).await?;
427            Ok(())
428        })
429        .await?;
430        Ok(())
431    }
432
433    async fn make_request_body(&self, buffered_file: &mut File) -> IoResult<DotRecords> {
434        buffered_file.seek(SeekFrom::Start(0)).await?;
435        let file_reader = BufReader::new(buffered_file);
436        let mut lines = file_reader.lines();
437        let mut map = DotRecordsMap::default();
438
439        while let Some(line) = lines.next_line().await? {
440            if line.is_empty() {
441                continue;
442            }
443            if let Ok(record) = serde_json::from_str::<DotRecord>(&line) {
444                map.merge_with_record(record);
445            }
446        }
447        Ok(map.into_records())
448    }
449
450    async fn upload_with_retry<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = IoResult<()>>>(
451        &self,
452        mut for_each_host: F,
453    ) -> IoResult<()> {
454        let mut last_error = None;
455        for _ in 0..self.tries {
456            // 允许选择重复的节点,因为生产环境上可能只有一台 kodomonitor,只能选它
457            if let Some(host_info) = self.monitor_selector.select_host(&Default::default()).await {
458                match for_each_host(host_info.to_owned()).await {
459                    Ok(response) => {
460                        self.monitor_selector.reward(host_info.host()).await;
461                        return Ok(response);
462                    }
463                    Err(err) => {
464                        let punished_result = self
465                            .monitor_selector
466                            .punish_without_dotter(host_info.host(), &err)
467                            .await;
468                        match punished_result {
469                            PunishResult::NoPunishment => {
470                                return Err(err);
471                            }
472                            PunishResult::PunishedAndFreezed => {
473                                self.fast_punish().await;
474                            }
475                            PunishResult::Punished => {}
476                        }
477                        last_error = Some(err);
478                    }
479                }
480            } else {
481                break;
482            }
483        }
484        last_error.map(Err).unwrap_or(Ok(()))
485    }
486
    /// Runs `f` on a clone of the dot file handle while holding both the in-process
    /// mutex and the file write lock, without ever waiting for either of them.
    ///
    /// If the mutex or the file lock is currently held elsewhere, `f` is not run and
    /// `Ok(())` is returned (only a debug message is logged).
    ///
    /// # Errors
    ///
    /// Returns the error from acquiring the file lock (other than `WouldBlock` and
    /// `Interrupted`), from cloning the file handle, or from `f` itself.
    #[cfg(not(test))]
    async fn lock_buffered_file<F: FnOnce(File) -> Fut, Fut: Future<Output = IoResult<()>>>(
        &self,
        f: F,
    ) -> IoResult<()> {
        if let Ok(mut buffered_file) = self.buffered_file.try_lock() {
            loop {
                match buffered_file.try_write() {
                    Ok(buffered_file) => {
                        // Shadowing does not drop the write guard: it stays alive until
                        // the end of this arm, i.e. for the whole duration of `f`.
                        let buffered_file = buffered_file.try_clone().await?;
                        return f(buffered_file).await;
                    }
                    Err(err) if err.kind() == IoErrorKind::WouldBlock => {
                        // The file lock is held elsewhere; skip instead of waiting.
                        debug!("the dot file is locked");
                        return Ok(());
                    }
                    Err(err) if err.kind() == IoErrorKind::Interrupted => {
                        // Retry the lock attempt after an interruption.
                        continue;
                    }
                    Err(err) => {
                        warn!("lock the dot file error: {:?}", err);
                        return Err(err);
                    }
                }
            }
        } else {
            // Another task in this process holds the mutex; skip instead of waiting.
            debug!("the dot file is locked");
        }
        Ok(())
    }
517
    /// Test-build variant: runs `f` on a clone of the dot file handle while holding both
    /// the in-process mutex and the file write lock, waiting for each of them instead
    /// of skipping when they are busy.
    ///
    /// # Errors
    ///
    /// Returns the error from acquiring the file lock (other than `Interrupted`), from
    /// cloning the file handle, or from `f` itself.
    #[cfg(test)]
    async fn lock_buffered_file<F: FnOnce(File) -> T, T: Future<Output = IoResult<()>>>(
        &self,
        f: F,
    ) -> IoResult<()> {
        let mut buffered_file = self.buffered_file.lock().await;
        loop {
            match buffered_file.write() {
                Ok(buffered_file) => {
                    // Shadowing does not drop the write guard: it stays alive until the
                    // end of this arm, i.e. for the whole duration of `f`.
                    let buffered_file = buffered_file.try_clone().await?;
                    return f(buffered_file).await;
                }
                Err(err) if err.kind() == IoErrorKind::Interrupted => {
                    // Retry the lock attempt after an interruption.
                    continue;
                }
                Err(err) => {
                    warn!("lock the dot file error: {:?}", err);
                    return Err(err);
                }
            }
        }
    }
540}
541
/// Key under which dot records are merged: records with equal keys are combined into
/// a single record by the record maps in this file.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[serde(untagged)]
pub(super) enum DotRecordKey {
    /// Key of an API call record: one per (`dot_type`, `api_name`) pair.
    APICalls {
        dot_type: DotType,
        api_name: ApiName,
    },
    /// Key shared by all punishment records.
    PunishedCount,
}
551
552impl DotRecordKey {
553    pub(super) fn new(dot_type: DotType, api_name: ApiName) -> Self {
554        Self::APICalls { dot_type, api_name }
555    }
556
557    pub(super) fn punished() -> Self {
558        Self::PunishedCount
559    }
560}
561
/// A single dot record: either aggregated API call statistics or a punishment counter.
///
/// Serialized untagged, i.e. as the bare fields of the wrapped struct. Deserialization
/// is implemented by hand further down in this file.
#[derive(Serialize, Clone, Debug)]
#[serde(untagged)]
pub(super) enum DotRecord {
    /// Aggregated success / failure statistics of one API.
    APICalls(APICallsDotRecord),
    /// Number of punishments recorded.
    PunishedCount(PunishedCountDotRecord),
}
568
/// Aggregated call statistics for one (`dot_type`, `api_name`) pair.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(super) struct APICallsDotRecord {
    /// Serialized under the field name `type`.
    #[serde(rename = "type")]
    dot_type: DotType,

    api_name: ApiName,
    /// Number of successful calls merged into this record.
    success_count: usize,
    /// Average elapsed time of the successful calls; `fast_dot` fills it in milliseconds.
    success_avg_elapsed_duration: u128,
    /// Number of failed calls merged into this record.
    failed_count: usize,
    /// Average elapsed time of the failed calls; `fast_dot` fills it in milliseconds.
    failed_avg_elapsed_duration: u128,
}
580
/// Counter of punishments; merging two such records adds their counts.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(super) struct PunishedCountDotRecord {
    punished_count: usize,
}
585
586impl DotRecord {
587    fn new(
588        dot_type: DotType,
589        api_name: ApiName,
590        success_count: usize,
591        failed_count: usize,
592        success_avg_elapsed_duration: u128,
593        failed_avg_elapsed_duration: u128,
594    ) -> Self {
595        Self::APICalls(APICallsDotRecord {
596            dot_type,
597            api_name,
598            success_count,
599            success_avg_elapsed_duration,
600            failed_count,
601            failed_avg_elapsed_duration,
602        })
603    }
604
605    fn punished() -> Self {
606        Self::PunishedCount(PunishedCountDotRecord { punished_count: 1 })
607    }
608
609    pub(super) fn key(&self) -> DotRecordKey {
610        match self {
611            Self::APICalls(record) => DotRecordKey::new(record.dot_type, record.api_name),
612            Self::PunishedCount(_) => DotRecordKey::punished(),
613        }
614    }
615
616    #[cfg(test)]
617
618    pub(super) fn dot_type(&self) -> Option<DotType> {
619        match self {
620            Self::APICalls(record) => Some(record.dot_type),
621            _ => None,
622        }
623    }
624
625    #[cfg(test)]
626
627    pub(super) fn api_name(&self) -> Option<ApiName> {
628        match self {
629            Self::APICalls(record) => Some(record.api_name),
630            _ => None,
631        }
632    }
633
634    #[cfg(test)]
635
636    pub(super) fn success_count(&self) -> Option<usize> {
637        match self {
638            Self::APICalls(record) => Some(record.success_count),
639            _ => None,
640        }
641    }
642
643    #[cfg(test)]
644
645    pub(super) fn success_avg_elapsed_duration_ms(&self) -> Option<u128> {
646        match self {
647            Self::APICalls(record) => Some(record.success_avg_elapsed_duration),
648            _ => None,
649        }
650    }
651
652    #[cfg(test)]
653
654    pub(super) fn failed_count(&self) -> Option<usize> {
655        match self {
656            Self::APICalls(record) => Some(record.failed_count),
657            _ => None,
658        }
659    }
660
661    #[cfg(test)]
662
663    pub(super) fn failed_avg_elapsed_duration_ms(&self) -> Option<u128> {
664        match self {
665            Self::APICalls(record) => Some(record.failed_avg_elapsed_duration),
666            _ => None,
667        }
668    }
669
670    #[cfg(test)]
671
672    pub(super) fn punished_count(&self) -> Option<usize> {
673        match self {
674            Self::PunishedCount(record) => Some(record.punished_count),
675            _ => None,
676        }
677    }
678}
679
680impl<'de> Deserialize<'de> for DotRecord {
681    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
682    where
683        D: serde::Deserializer<'de>,
684    {
685        let value = JSONValue::deserialize(deserializer)?;
686        if let Ok(record) = APICallsDotRecord::deserialize(&value) {
687            Ok(Self::APICalls(record))
688        } else {
689            PunishedCountDotRecord::deserialize(&value)
690                .map(Self::PunishedCount)
691                .map_err(DeserializeError::custom)
692        }
693    }
694}
695
/// List of dot records; `DotterInner::do_upload` sends it as the JSON request body.
#[derive(Serialize, Deserialize, Default, Debug)]
pub(super) struct DotRecords {
    /// Serialized under the field name `logs`.
    #[serde(rename = "logs")]
    records: Vec<DotRecord>,
}
701
702impl DotRecords {
703    #[cfg(test)]
704
705    pub(super) fn records(&self) -> &[DotRecord] {
706        self.records.as_ref()
707    }
708}
709
/// Single-owner map of dot records keyed by `DotRecordKey`, backed by the standard
/// library hash map. Records with the same key are merged into one.
#[derive(Debug, Clone, Default)]
pub(super) struct DotRecordsMap(StdHashMap<DotRecordKey, DotRecord>);
712
713impl DotRecordsMap {
714    #[allow(dead_code)]
715    pub(super) fn merge_with_record(&mut self, record: DotRecord) {
716        self.0
717            .entry(record.key())
718            .and_modify(|mut r| match (&mut r, &record) {
719                (DotRecord::APICalls(r), DotRecord::APICalls(record)) => {
720                    let success_elapsed_duration_total = r.success_avg_elapsed_duration
721                        * to_u128(r.success_count)
722                        + record.success_avg_elapsed_duration * to_u128(record.success_count);
723                    let failed_elapsed_duration_total = r.failed_avg_elapsed_duration
724                        * to_u128(r.failed_count)
725                        + record.failed_avg_elapsed_duration * to_u128(record.failed_count);
726                    r.success_count += record.success_count;
727                    r.failed_count += record.failed_count;
728                    r.success_avg_elapsed_duration = if r.success_count > 0 {
729                        success_elapsed_duration_total / to_u128(r.success_count)
730                    } else {
731                        0
732                    };
733                    r.failed_avg_elapsed_duration = if r.failed_count > 0 {
734                        failed_elapsed_duration_total / to_u128(r.failed_count)
735                    } else {
736                        0
737                    };
738                }
739                (DotRecord::PunishedCount(r), DotRecord::PunishedCount(record)) => {
740                    r.punished_count += record.punished_count;
741                }
742                _ => panic!("Impossible merge with {:?} and {:?}", r, record),
743            })
744            .or_insert(record);
745
746        fn to_u128(v: usize) -> u128 {
747            u128::try_from(v).unwrap_or(u128::MAX)
748        }
749    }
750
751    #[allow(dead_code)]
752    pub(super) fn merge_with_records(&mut self, records: DotRecords) {
753        for record in records.records.into_iter() {
754            self.merge_with_record(record);
755        }
756    }
757
758    #[allow(dead_code)]
759    pub(super) fn into_records(self) -> DotRecords {
760        DotRecords {
761            records: self.0.into_values().collect(),
762        }
763    }
764}
765
766impl Deref for DotRecordsMap {
767    type Target = StdHashMap<DotRecordKey, DotRecord>;
768
769    fn deref(&self) -> &Self::Target {
770        &self.0
771    }
772}
773
/// Map of dot records keyed by `DotRecordKey`, backed by `scc::HashMap` so it can be
/// updated through a shared reference from async code. Records with the same key are
/// merged into one.
#[derive(Default)]
pub(super) struct AsyncDotRecordsMap(HashMap<DotRecordKey, DotRecord>);
776
777impl AsyncDotRecordsMap {
778    #[allow(dead_code)]
779    pub(super) async fn merge_with_record(&self, record: DotRecord) {
780        self.0
781            .entry_async(record.key())
782            .await
783            .and_modify(|mut r| match (&mut r, &record) {
784                (DotRecord::APICalls(r), DotRecord::APICalls(record)) => {
785                    let success_elapsed_duration_total = r.success_avg_elapsed_duration
786                        * to_u128(r.success_count)
787                        + record.success_avg_elapsed_duration * to_u128(record.success_count);
788                    let failed_elapsed_duration_total = r.failed_avg_elapsed_duration
789                        * to_u128(r.failed_count)
790                        + record.failed_avg_elapsed_duration * to_u128(record.failed_count);
791                    r.success_count += record.success_count;
792                    r.failed_count += record.failed_count;
793                    r.success_avg_elapsed_duration = if r.success_count > 0 {
794                        success_elapsed_duration_total / to_u128(r.success_count)
795                    } else {
796                        0
797                    };
798                    r.failed_avg_elapsed_duration = if r.failed_count > 0 {
799                        failed_elapsed_duration_total / to_u128(r.failed_count)
800                    } else {
801                        0
802                    };
803                }
804                (DotRecord::PunishedCount(r), DotRecord::PunishedCount(record)) => {
805                    r.punished_count += record.punished_count;
806                }
807                _ => panic!("Impossible merge with {:?} and {:?}", r, record),
808            })
809            .or_insert_with(|| record.to_owned());
810
811        fn to_u128(v: usize) -> u128 {
812            u128::try_from(v).unwrap_or(u128::MAX)
813        }
814    }
815
816    #[allow(dead_code)]
817    pub(super) async fn merge_with_records(&self, records: DotRecords) {
818        for record in records.records.into_iter() {
819            self.merge_with_record(record).await;
820        }
821    }
822
823    #[allow(dead_code)]
824    pub(super) async fn into_records(self) -> DotRecords {
825        let mut records = Vec::new();
826        self.0
827            .scan_async(|_, record| {
828                records.push(record.to_owned());
829            })
830            .await;
831        DotRecords { records }
832    }
833}
834
835impl Deref for AsyncDotRecordsMap {
836    type Target = HashMap<DotRecordKey, DotRecord>;
837
838    fn deref(&self) -> &Self::Target {
839        &self.0
840    }
841}
842
843#[cfg(test)]
844mod tests {
845    use super::*;
846    use crate::config::Timeouts;
847    use futures::channel::oneshot::channel;
848    use futures::future::join_all;
849    use std::{error::Error, sync::atomic::AtomicUsize};
850    use tokio::{fs::remove_file, task::spawn, time::sleep};
851    use warp::{http::HeaderValue, hyper::Body, path, reply::Response, Filter};
852
    // Starts a warp server for `$routes` on 127.0.0.1 with an OS-assigned port, binds
    // the listening address to `$addr`, runs `$code`, then signals the server to shut
    // down. Must be expanded inside an async context because it awaits.
    macro_rules! starts_with_server {
        ($addr:ident, $routes:ident, $code:block) => {{
            // The sender half triggers the graceful shutdown once `$code` has finished.
            let (tx, rx) = channel();
            let ($addr, server) =
                warp::serve($routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
                    rx.await.unwrap();
                });
            spawn(server);
            // Wait one second after spawning the server before running the test body.
            sleep(Duration::from_secs(1)).await;
            $code;
            tx.send(()).unwrap();
        }};
    }
866
867    const ACCESS_KEY: &str = "1234567890";
868    const SECRET_KEY: &str = "abcdefghijk";
869    const BUCKET_NAME: &str = "test-bucket";
870
871    mod guard {
872        use super::{disable_dotting, enable_dotting, is_dotting_disabled};
873        pub(super) struct DottingDisableGuard {
874            enabled_before: bool,
875        }
876
877        impl DottingDisableGuard {
878            pub(super) fn new() -> Self {
879                let disabled_before = is_dotting_disabled();
880                if !disabled_before {
881                    disable_dotting();
882                }
883                DottingDisableGuard {
884                    enabled_before: !disabled_before,
885                }
886            }
887        }
888
889        impl Drop for DottingDisableGuard {
890            fn drop(&mut self) {
891                if self.enabled_before {
892                    enable_dotting();
893                }
894            }
895        }
896    }
897    use guard::DottingDisableGuard;
898
899    fn get_credential() -> Credential {
900        Credential::new(ACCESS_KEY, SECRET_KEY)
901    }
902
903    #[tokio::test]
904    async fn test_dotter_dot_nothing() -> Result<(), Box<dyn Error>> {
905        env_logger::try_init().ok();
906        clear_cache().await?;
907
908        let called = Arc::new(AtomicUsize::new(0));
909        let routes = {
910            let called = called.to_owned();
911            path!("v1" / "stat").map(move || {
912                called.fetch_add(1, Relaxed);
913                Response::new(Body::empty())
914            })
915        };
916
917        starts_with_server!(addr, routes, {
918            let dotter = Dotter::new(
919                Timeouts::default_async_http_client(),
920                get_credential(),
921                BUCKET_NAME.to_owned(),
922                vec![],
923                None,
924                None,
925                None,
926                None,
927                None,
928                None,
929                None,
930            )
931            .await;
932            assert!(dotter.inner.is_none());
933            dotter
934                .dot(
935                    DotType::Http,
936                    ApiName::IoGetfile,
937                    true,
938                    Duration::from_millis(0),
939                )
940                .await
941                .unwrap();
942            sleep(Duration::from_secs(5)).await;
943            assert_eq!(called.load(Relaxed), 0);
944
945            let urls = vec!["http://".to_owned() + &addr.to_string()];
946            let dotter = Dotter::new(
947                Timeouts::default_async_http_client(),
948                get_credential(),
949                BUCKET_NAME.to_owned(),
950                urls,
951                Some(Duration::from_millis(0)),
952                Some(1),
953                None,
954                None,
955                None,
956                None,
957                None,
958            )
959            .await;
960            assert!(dotter.inner.is_some());
961
962            let _guard = DottingDisableGuard::new();
963            dotter
964                .dot(
965                    DotType::Http,
966                    ApiName::IoGetfile,
967                    true,
968                    Duration::from_millis(0),
969                )
970                .await
971                .unwrap();
972            sleep(Duration::from_secs(5)).await;
973            assert_eq!(called.load(Relaxed), 0);
974        });
975
976        Ok(())
977    }
978
979    #[tokio::test]
980    async fn test_dotter_dot_something() -> Result<(), Box<dyn Error>> {
981        env_logger::try_init().ok();
982        clear_cache().await?;
983        let records_map = Arc::new(AsyncDotRecordsMap::default());
984
985        let routes = {
986            let records_map = records_map.to_owned();
987            path!("v1" / "stat")
988                .and(warp::header::value(AUTHORIZATION.as_str()))
989                .and(warp::body::json())
990                .then(move |authorization: HeaderValue, records: DotRecords| {
991                    assert!(authorization.to_str().unwrap().starts_with("UpToken "));
992                    let records_map = records_map.to_owned();
993                    async move {
994                        records_map.merge_with_records(records).await;
995                        Response::new(Body::empty())
996                    }
997                })
998        };
999
1000        starts_with_server!(addr, routes, {
1001            let urls = vec![
1002                "http://".to_owned() + &addr.to_string() + "1",
1003                "http://".to_owned() + &addr.to_string() + "2",
1004                "http://".to_owned() + &addr.to_string() + "3",
1005                "http://".to_owned() + &addr.to_string() + "4",
1006                "http://".to_owned() + &addr.to_string(),
1007            ];
1008            let dotter = Dotter::new(
1009                Timeouts::default_async_http_client(),
1010                get_credential(),
1011                BUCKET_NAME.to_owned(),
1012                urls,
1013                Some(Duration::from_millis(0)),
1014                Some(1),
1015                None,
1016                None,
1017                None,
1018                None,
1019                None,
1020            )
1021            .await;
1022
1023            let mut tasks = Vec::new();
1024            tasks.push({
1025                let dotter = dotter.to_owned();
1026                spawn(async move {
1027                    dotter
1028                        .dot(
1029                            DotType::Sdk,
1030                            ApiName::IoGetfile,
1031                            true,
1032                            Duration::from_millis(10),
1033                        )
1034                        .await
1035                        .unwrap();
1036                })
1037            });
1038            tasks.push({
1039                let dotter = dotter.to_owned();
1040                spawn(async move {
1041                    dotter
1042                        .dot(
1043                            DotType::Sdk,
1044                            ApiName::IoGetfile,
1045                            false,
1046                            Duration::from_millis(12),
1047                        )
1048                        .await
1049                        .unwrap();
1050                })
1051            });
1052            tasks.push({
1053                let dotter = dotter.to_owned();
1054                spawn(async move {
1055                    dotter
1056                        .dot(
1057                            DotType::Sdk,
1058                            ApiName::UcV4Query,
1059                            true,
1060                            Duration::from_millis(14),
1061                        )
1062                        .await
1063                        .unwrap();
1064                })
1065            });
1066            tasks.push({
1067                let dotter = dotter.to_owned();
1068                spawn(async move {
1069                    dotter
1070                        .dot(
1071                            DotType::Sdk,
1072                            ApiName::UcV4Query,
1073                            true,
1074                            Duration::from_millis(16),
1075                        )
1076                        .await
1077                        .unwrap();
1078                })
1079            });
1080            tasks.push({
1081                let dotter = dotter.to_owned();
1082                spawn(async move {
1083                    dotter
1084                        .dot(
1085                            DotType::Sdk,
1086                            ApiName::UcV4Query,
1087                            false,
1088                            Duration::from_millis(18),
1089                        )
1090                        .await
1091                        .unwrap();
1092                })
1093            });
1094            tasks.push({
1095                let dotter = dotter.to_owned();
1096                spawn(async move {
1097                    dotter
1098                        .dot(
1099                            DotType::Http,
1100                            ApiName::IoGetfile,
1101                            true,
1102                            Duration::from_millis(20),
1103                        )
1104                        .await
1105                        .unwrap();
1106                })
1107            });
1108            tasks.push({
1109                let dotter = dotter.to_owned();
1110                spawn(async move {
1111                    dotter
1112                        .dot(
1113                            DotType::Http,
1114                            ApiName::IoGetfile,
1115                            true,
1116                            Duration::from_millis(22),
1117                        )
1118                        .await
1119                        .unwrap();
1120                })
1121            });
1122            tasks.push({
1123                let dotter = dotter.to_owned();
1124                spawn(async move {
1125                    dotter
1126                        .dot(
1127                            DotType::Http,
1128                            ApiName::IoGetfile,
1129                            false,
1130                            Duration::from_millis(24),
1131                        )
1132                        .await
1133                        .unwrap();
1134                })
1135            });
1136            tasks.push({
1137                let dotter = dotter.to_owned();
1138                spawn(async move {
1139                    dotter
1140                        .dot(
1141                            DotType::Http,
1142                            ApiName::UcV4Query,
1143                            true,
1144                            Duration::from_millis(26),
1145                        )
1146                        .await
1147                        .unwrap();
1148                })
1149            });
1150            tasks.push({
1151                let dotter = dotter.to_owned();
1152                spawn(async move {
1153                    dotter
1154                        .dot(
1155                            DotType::Http,
1156                            ApiName::UcV4Query,
1157                            true,
1158                            Duration::from_millis(28),
1159                        )
1160                        .await
1161                        .unwrap();
1162                })
1163            });
1164            tasks.push({
1165                let dotter = dotter.to_owned();
1166                spawn(async move {
1167                    dotter
1168                        .dot(
1169                            DotType::Http,
1170                            ApiName::UcV4Query,
1171                            true,
1172                            Duration::from_millis(28),
1173                        )
1174                        .await
1175                        .unwrap();
1176                })
1177            });
1178            tasks.push({
1179                let dotter = dotter.to_owned();
1180                spawn(async move {
1181                    dotter
1182                        .dot(
1183                            DotType::Http,
1184                            ApiName::UcV4Query,
1185                            false,
1186                            Duration::from_millis(30),
1187                        )
1188                        .await
1189                        .unwrap();
1190                })
1191            });
1192            tasks.push({
1193                let dotter = dotter.to_owned();
1194                spawn(async move {
1195                    dotter
1196                        .dot(
1197                            DotType::Http,
1198                            ApiName::UcV4Query,
1199                            true,
1200                            Duration::from_millis(32),
1201                        )
1202                        .await
1203                        .unwrap();
1204                })
1205            });
1206            join_all(tasks).await;
1207            sleep(Duration::from_secs(5)).await;
1208            {
1209                let record = records_map
1210                    .read_async(
1211                        &DotRecordKey::new(DotType::Sdk, ApiName::UcV4Query),
1212                        |_, record| record.to_owned(),
1213                    )
1214                    .await
1215                    .unwrap();
1216                assert_eq!(record.success_count(), Some(2));
1217                assert_eq!(record.failed_count(), Some(1));
1218                assert_eq!(record.success_avg_elapsed_duration_ms(), Some(15));
1219                assert_eq!(record.failed_avg_elapsed_duration_ms(), Some(18));
1220            }
1221            {
1222                let record = records_map
1223                    .read_async(
1224                        &DotRecordKey::new(DotType::Sdk, ApiName::IoGetfile),
1225                        |_, record| record.to_owned(),
1226                    )
1227                    .await
1228                    .unwrap();
1229                assert_eq!(record.success_count(), Some(1));
1230                assert_eq!(record.failed_count(), Some(1));
1231                assert_eq!(record.success_avg_elapsed_duration_ms(), Some(10));
1232                assert_eq!(record.failed_avg_elapsed_duration_ms(), Some(12));
1233            }
1234            {
1235                let record = records_map
1236                    .read_async(
1237                        &DotRecordKey::new(DotType::Http, ApiName::UcV4Query),
1238                        |_, record| record.to_owned(),
1239                    )
1240                    .await
1241                    .unwrap();
1242                assert_eq!(record.success_count(), Some(4));
1243                assert_eq!(record.failed_count(), Some(1));
1244                assert_eq!(record.success_avg_elapsed_duration_ms(), Some(28));
1245                assert_eq!(record.failed_avg_elapsed_duration_ms(), Some(30));
1246            }
1247            {
1248                let record = records_map
1249                    .read_async(
1250                        &DotRecordKey::new(DotType::Http, ApiName::IoGetfile),
1251                        |_, record| record.to_owned(),
1252                    )
1253                    .await
1254                    .unwrap();
1255                assert_eq!(record.success_count(), Some(2));
1256                assert_eq!(record.failed_count(), Some(1));
1257                assert_eq!(record.success_avg_elapsed_duration_ms(), Some(21));
1258                assert_eq!(record.failed_avg_elapsed_duration_ms(), Some(24));
1259            }
1260        });
1261        Ok(())
1262    }
1263
1264    #[tokio::test]
1265    async fn test_dotter_punish() -> Result<(), Box<dyn Error>> {
1266        env_logger::try_init().ok();
1267        clear_cache().await?;
1268        let records_map = Arc::new(AsyncDotRecordsMap::default());
1269
1270        let routes = {
1271            let records_map = records_map.to_owned();
1272            path!("v1" / "stat")
1273                .and(warp::header::value(AUTHORIZATION.as_str()))
1274                .and(warp::body::json())
1275                .then(move |authorization: HeaderValue, records: DotRecords| {
1276                    assert!(authorization.to_str().unwrap().starts_with("UpToken "));
1277                    let records_map = records_map.to_owned();
1278                    async move {
1279                        records_map.merge_with_records(records).await;
1280                        Response::new(Body::empty())
1281                    }
1282                })
1283        };
1284        starts_with_server!(addr, routes, {
1285            let urls = vec!["http://".to_owned() + &addr.to_string()];
1286            let dotter = Dotter::new(
1287                Timeouts::default_async_http_client(),
1288                get_credential(),
1289                BUCKET_NAME.to_owned(),
1290                urls,
1291                Some(Duration::from_millis(0)),
1292                Some(1),
1293                None,
1294                None,
1295                None,
1296                None,
1297                None,
1298            )
1299            .await;
1300
1301            let mut tasks = Vec::new();
1302            tasks.push({
1303                let dotter = dotter.to_owned();
1304                spawn(async move {
1305                    dotter
1306                        .dot(
1307                            DotType::Sdk,
1308                            ApiName::IoGetfile,
1309                            true,
1310                            Duration::from_millis(10),
1311                        )
1312                        .await
1313                        .unwrap();
1314                })
1315            });
1316            for _ in 0..5 {
1317                let dotter = dotter.to_owned();
1318                tasks.push(spawn(async move {
1319                    dotter.punish().await.unwrap();
1320                }));
1321            }
1322
1323            sleep(Duration::from_secs(5)).await;
1324            {
1325                let record = records_map
1326                    .read_async(
1327                        &DotRecordKey::new(DotType::Sdk, ApiName::IoGetfile),
1328                        |_, record| record.to_owned(),
1329                    )
1330                    .await
1331                    .unwrap();
1332                assert_eq!(record.success_count(), Some(1));
1333                assert_eq!(record.failed_count(), Some(0));
1334                assert_eq!(record.success_avg_elapsed_duration_ms(), Some(10));
1335                assert_eq!(record.failed_avg_elapsed_duration_ms(), Some(0));
1336            }
1337            {
1338                let record = records_map
1339                    .read_async(&DotRecordKey::punished(), |_, record| record.to_owned())
1340                    .await
1341                    .unwrap();
1342                assert_eq!(record.punished_count(), Some(5));
1343            }
1344        });
1345        Ok(())
1346    }
1347
1348    async fn clear_cache() -> IoResult<()> {
1349        let cache_file_path = cache_dir_path_of(DOT_FILE_NAME).await?;
1350        remove_file(&cache_file_path).await.or_else(|err| {
1351            if err.kind() == IoErrorKind::NotFound {
1352                Ok(())
1353            } else {
1354                Err(err)
1355            }
1356        })
1357    }
1358}