1use super::{
2 super::base::{
3 credential::Credential, upload_policy::UploadPolicy, upload_token::sign_upload_token,
4 },
5 cache_dir::cache_dir_path_of,
6 host_selector::{HostInfo, HostSelector, PunishResult},
7};
8use fd_lock::RwLock as FdRwLock;
9use futures::future::join_all;
10use log::{debug, info, warn};
11use reqwest::{header::AUTHORIZATION, Client as HttpClient, StatusCode};
12use scc::HashMap;
13use serde::{de::Error as DeserializeError, Deserialize, Serialize};
14use serde_json::Value as JSONValue;
15use std::{
16 collections::HashMap as StdHashMap,
17 convert::TryFrom,
18 fmt::{self, Debug},
19 future::Future,
20 io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
21 ops::Deref,
22 sync::{
23 atomic::{AtomicBool, Ordering::Relaxed},
24 Arc,
25 },
26 time::{Duration, Instant, SystemTime},
27};
28use tap::prelude::*;
29use tokio::{
30 fs::{File, OpenOptions},
31 io::{AsyncBufReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
32 spawn,
33 sync::Mutex,
34};
35
/// Process-wide switch: `true` while dot recording is turned off.
static DOTTING_DISABLED: AtomicBool = AtomicBool::new(false);

/// Stores the requested state of the dotting switch.
fn set_dotting_disabled(disabled: bool) {
    DOTTING_DISABLED.store(disabled, Relaxed);
}

/// Turns dot recording off for the whole process.
pub fn disable_dotting() {
    set_dotting_disabled(true);
}

/// Turns dot recording back on for the whole process.
pub fn enable_dotting() {
    set_dotting_disabled(false);
}

/// Returns `true` when dot recording is currently turned off.
pub fn is_dotting_disabled() -> bool {
    let disabled = DOTTING_DISABLED.load(Relaxed);
    disabled
}
55
/// Process-wide switch: `true` while uploading of the dot file is turned off.
static DOT_UPLOADING_DISABLED: AtomicBool = AtomicBool::new(false);

/// Stores the requested state of the dot-uploading switch.
fn set_dot_uploading_disabled(disabled: bool) {
    DOT_UPLOADING_DISABLED.store(disabled, Relaxed);
}

/// Turns dot uploading off for the whole process.
pub fn disable_dot_uploading() {
    set_dot_uploading_disabled(true);
}

/// Turns dot uploading back on for the whole process.
pub fn enable_dot_uploading() {
    set_dot_uploading_disabled(false);
}

/// Returns `true` when dot uploading is currently turned off.
pub fn is_dot_uploading_disabled() -> bool {
    let disabled = DOT_UPLOADING_DISABLED.load(Relaxed);
    disabled
}
75
76#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
77#[serde(rename_all = "lowercase")]
78pub(super) enum DotType {
79 Sdk,
80 Http,
81}
82
83impl fmt::Display for DotType {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 match self {
86 Self::Http => write!(f, "http"),
87 Self::Sdk => write!(f, "sdk"),
88 }
89 }
90}
91
92#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
93#[serde(rename_all = "snake_case")]
94pub(super) enum ApiName {
95 IoGetfile,
96 MonitorV1Stat,
97 UcV4Query,
98 RangeReaderReadAt,
99 RangeReaderReadMultiRanges,
100 RangeReaderExist,
101 RangeReaderFileSize,
102 RangeReaderDownloadTo,
103 RangeReaderReadLastBytes,
104}
105
106impl fmt::Display for ApiName {
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 match self {
109 Self::IoGetfile => write!(f, "io_getfile"),
110 Self::MonitorV1Stat => write!(f, "monitor_v1_stat"),
111 Self::UcV4Query => write!(f, "uc_v4_query"),
112 Self::RangeReaderReadAt => write!(f, "range_reader_read_at"),
113 Self::RangeReaderReadMultiRanges => write!(f, "range_reader_read_multi_ranges"),
114 Self::RangeReaderExist => write!(f, "range_reader_exist"),
115 Self::RangeReaderFileSize => write!(f, "range_reader_file_size"),
116 Self::RangeReaderDownloadTo => write!(f, "range_reader_download_to"),
117 Self::RangeReaderReadLastBytes => write!(f, "range_reader_read_last_bytes"),
118 }
119 }
120}
121
122#[derive(Clone, Debug, Default)]
123pub(super) struct Dotter {
124 inner: Option<Arc<DotterInner>>,
125}
126
127struct DotterInner {
128 credential: Credential,
129 bucket: String,
130 monitor_selector: HostSelector,
131 buffered_records: AsyncDotRecordsMap,
132 buffered_file: Mutex<FdRwLock<File>>,
133 interval: Duration,
134 uploaded_at: Instant,
135 max_buffer_size: u64,
136 tries: usize,
137 http_client: Arc<HttpClient>,
138}
139
140impl Debug for DotterInner {
141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142 f.debug_struct("DotterInner")
143 .field("credential", &self.credential)
144 .field("bucket", &self.bucket)
145 .field("monitor_selector", &self.monitor_selector)
146 .field("buffered_file", &self.buffered_file)
147 .field("interval", &self.interval)
148 .field("uploaded_at", &self.uploaded_at)
149 .field("max_buffer_size", &self.max_buffer_size)
150 .field("tries", &self.tries)
151 .field("http_client", &self.http_client)
152 .finish()
153 }
154}
155
156pub(super) const DOT_FILE_NAME: &str = "dot-file";
157
158impl Dotter {
159 #[allow(clippy::too_many_arguments)]
160 pub(super) async fn new(
161 http_client: Arc<HttpClient>,
162 credential: Credential,
163 bucket: String,
164 monitor_urls: Vec<String>,
165 interval: Option<Duration>,
166 max_buffer_size: Option<u64>,
167 tries: Option<usize>,
168 punish_duration: Option<Duration>,
169 max_punished_times: Option<usize>,
170 max_punished_hosts_percent: Option<u8>,
171 base_timeout: Option<Duration>,
172 ) -> Dotter {
173 if !monitor_urls.is_empty() {
174 if let Ok(buffered_file_path) = cache_dir_path_of(DOT_FILE_NAME).await {
175 if let Ok(buffer_file) = OpenOptions::new()
176 .create(true)
177 .write(true)
178 .append(true)
179 .open(&buffered_file_path)
180 .await
181 {
182 let monitor_selector = HostSelector::builder(monitor_urls)
183 .punish_duration(punish_duration.unwrap_or_else(|| Duration::from_secs(30)))
184 .max_punished_times(max_punished_times.unwrap_or(5))
185 .max_punished_hosts_percent(max_punished_hosts_percent.unwrap_or(50))
186 .base_timeout(base_timeout.unwrap_or_else(|| Duration::from_secs(1)))
187 .build()
188 .await;
189 return Self {
190 inner: Some(Arc::new(DotterInner {
191 credential,
192 bucket,
193 monitor_selector,
194 http_client,
195 buffered_records: Default::default(),
196 buffered_file: Mutex::new(FdRwLock::new(buffer_file)),
197 interval: interval.unwrap_or_else(|| Duration::from_secs(10)),
198 uploaded_at: Instant::now(),
199 max_buffer_size: max_buffer_size.unwrap_or(1 << 20),
200 tries: tries.unwrap_or(10),
201 })),
202 };
203 }
204 }
205 }
206 Self { inner: None }
207 }
208
209 pub(super) async fn dot(
210 &self,
211 dot_type: DotType,
212 api_name: ApiName,
213 successful: bool,
214 elapsed_duration: Duration,
215 ) -> IoResult<()> {
216 if is_dotting_disabled() {
217 debug!("dotting is disabled")
218 } else if let Some(inner) = self.inner.as_ref() {
219 inner
220 .fast_dot(dot_type, api_name, successful, elapsed_duration)
221 .await;
222 inner
223 .lock_buffered_file(|mut buffered_file| async move {
224 inner.flush_to_file(&mut buffered_file).await?;
225 if inner.is_time_to_upload(&buffered_file).await? {
226 self.async_upload();
227 }
228 Ok(())
229 })
230 .await?;
231 }
232 Ok(())
233 }
234
235 pub(super) async fn punish(&self) -> IoResult<()> {
236 if is_dotting_disabled() {
237 debug!("dotting is disabled")
238 } else if let Some(inner) = self.inner.as_ref() {
239 inner.fast_punish().await;
240 inner
241 .lock_buffered_file(|mut buffered_file| async move {
242 inner.flush_to_file(&mut buffered_file).await?;
243 if inner.is_time_to_upload(&buffered_file).await? {
244 self.async_upload();
245 }
246 Ok(())
247 })
248 .await?;
249 }
250 Ok(())
251 }
252
253 fn async_upload(&self) {
254 if let Some(inner) = self.inner.as_ref() {
255 let inner = inner.to_owned();
256 spawn(async move {
257 let inner2 = inner.to_owned();
258 inner
259 .lock_buffered_file(|buffered_file| async move {
260 if inner2.is_time_to_upload(&buffered_file).await? {
261 inner2.do_upload().await?;
262 }
263 Ok(())
264 })
265 .await
266 });
267 }
268 }
269}
270
271impl DotterInner {
272 async fn fast_dot(
273 &self,
274 dot_type: DotType,
275 api_name: ApiName,
276 successful: bool,
277 elapsed_duration: Duration,
278 ) {
279 let record = if successful {
280 DotRecord::new(
281 dot_type,
282 api_name,
283 1,
284 Default::default(),
285 elapsed_duration.as_millis(),
286 Default::default(),
287 )
288 } else {
289 DotRecord::new(
290 dot_type,
291 api_name,
292 Default::default(),
293 1,
294 Default::default(),
295 elapsed_duration.as_millis(),
296 )
297 };
298 self.buffered_records.merge_with_record(record).await;
299 }
300
301 async fn fast_punish(&self) {
302 self.buffered_records
303 .merge_with_record(DotRecord::punished())
304 .await;
305 }
306
307 async fn flush_to_file(&self, buffered_file: &mut File) -> IoResult<()> {
308 let buffered_file = Arc::new(Mutex::new(BufWriter::new(buffered_file)));
309 {
310 let mut futures = vec![];
311 self.buffered_records
312 .scan_async(|key, record| {
313 let key = key.to_owned();
314 let record = record.to_owned();
315 let buffered_file = buffered_file.to_owned();
316 futures.push(async move {
317 if write_to_file(&record, &mut *buffered_file.lock().await)
318 .await
319 .is_ok()
320 {
321 Some(key)
322 } else {
323 None
324 }
325 })
326 })
327 .await;
328 for key in join_all(futures).await.into_iter().flatten() {
329 self.buffered_records.remove_async(&key).await;
330 }
331 }
332
333 Arc::try_unwrap(buffered_file)
334 .unwrap()
335 .into_inner()
336 .flush()
337 .await?;
338
339 return Ok(());
340
341 async fn write_to_file<W: AsyncWrite + Unpin>(
342 record: &DotRecord,
343 file: &mut W,
344 ) -> anyhow::Result<()> {
345 let mut line = serde_json::to_string(record)?;
346 line.push('\n');
347 file.write_all(line.as_bytes())
348 .await
349 .tap_err(|err| warn!("the dot file is failed to write: {:?}", err))?;
350 Ok(())
351 }
352 }
353
354 async fn is_time_to_upload(&self, buffered_file: &File) -> IoResult<bool> {
355 if is_dotting_disabled() || is_dot_uploading_disabled() {
356 debug!("dot uploading is disabled, will not upload the dot file now");
357 return Ok(false);
358 }
359 let result = self.uploaded_at.elapsed() > self.interval
360 || buffered_file
361 .metadata()
362 .await
363 .tap_err(|err| warn!("stat the dot file error: {:?}", err))?
364 .len()
365 > self.max_buffer_size;
366 if !result {
367 debug!("dot uploading condition is not satisfied")
368 }
369 Ok(result)
370 }
371
372 async fn do_upload(&self) -> IoResult<()> {
373 self.upload_with_retry(|host_info| async move {
374 let mut buffered_file = OpenOptions::new()
375 .read(true)
376 .write(true)
377 .open(&cache_dir_path_of(DOT_FILE_NAME).await?)
378 .await?;
379 let url = format!("{}/v1/stat", host_info.host());
380 debug!("try to upload dots to {}", url);
381 let uptoken = sign_upload_token(
382 &self.credential,
383 &UploadPolicy::new_for_bucket(
384 self.bucket.to_owned(),
385 SystemTime::now() + Duration::from_secs(30),
386 ),
387 );
388 let begin_at = Instant::now();
389 let response_result = self
390 .http_client
391 .post(&url)
392 .header(AUTHORIZATION, format!("UpToken {}", uptoken))
393 .json(&self.make_request_body(&mut buffered_file).await?)
394 .timeout(host_info.timeout())
395 .send()
396 .await;
397 if let Err(err) = &response_result {
398 if err.is_timeout() {
399 self.monitor_selector
400 .increase_timeout_power_by(host_info.host(), host_info.timeout_power())
401 .await;
402 }
403 }
404 let response_result = response_result
405 .map_err(|err| IoError::new(IoErrorKind::ConnectionAborted, err))
406 .and_then(|resp| {
407 if resp.status() != StatusCode::OK {
408 Err(IoError::new(
409 IoErrorKind::Other,
410 format!("Unexpected status code {}", resp.status().as_u16()),
411 ))
412 } else {
413 Ok(())
414 }
415 });
416 self.fast_dot(
417 DotType::Http,
418 ApiName::MonitorV1Stat,
419 response_result.is_ok(),
420 begin_at.elapsed(),
421 )
422 .await;
423 response_result
424 .tap_ok(|_| info!("upload dots succeed"))
425 .tap_err(|err| warn!("failed to upload dots: {:?}", err))?;
426 buffered_file.set_len(0).await?;
427 Ok(())
428 })
429 .await?;
430 Ok(())
431 }
432
433 async fn make_request_body(&self, buffered_file: &mut File) -> IoResult<DotRecords> {
434 buffered_file.seek(SeekFrom::Start(0)).await?;
435 let file_reader = BufReader::new(buffered_file);
436 let mut lines = file_reader.lines();
437 let mut map = DotRecordsMap::default();
438
439 while let Some(line) = lines.next_line().await? {
440 if line.is_empty() {
441 continue;
442 }
443 if let Ok(record) = serde_json::from_str::<DotRecord>(&line) {
444 map.merge_with_record(record);
445 }
446 }
447 Ok(map.into_records())
448 }
449
450 async fn upload_with_retry<F: FnMut(HostInfo) -> Fut, Fut: Future<Output = IoResult<()>>>(
451 &self,
452 mut for_each_host: F,
453 ) -> IoResult<()> {
454 let mut last_error = None;
455 for _ in 0..self.tries {
456 if let Some(host_info) = self.monitor_selector.select_host(&Default::default()).await {
458 match for_each_host(host_info.to_owned()).await {
459 Ok(response) => {
460 self.monitor_selector.reward(host_info.host()).await;
461 return Ok(response);
462 }
463 Err(err) => {
464 let punished_result = self
465 .monitor_selector
466 .punish_without_dotter(host_info.host(), &err)
467 .await;
468 match punished_result {
469 PunishResult::NoPunishment => {
470 return Err(err);
471 }
472 PunishResult::PunishedAndFreezed => {
473 self.fast_punish().await;
474 }
475 PunishResult::Punished => {}
476 }
477 last_error = Some(err);
478 }
479 }
480 } else {
481 break;
482 }
483 }
484 last_error.map(Err).unwrap_or(Ok(()))
485 }
486
487 #[cfg(not(test))]
488 async fn lock_buffered_file<F: FnOnce(File) -> Fut, Fut: Future<Output = IoResult<()>>>(
489 &self,
490 f: F,
491 ) -> IoResult<()> {
492 if let Ok(mut buffered_file) = self.buffered_file.try_lock() {
493 loop {
494 match buffered_file.try_write() {
495 Ok(buffered_file) => {
496 let buffered_file = buffered_file.try_clone().await?;
497 return f(buffered_file).await;
498 }
499 Err(err) if err.kind() == IoErrorKind::WouldBlock => {
500 debug!("the dot file is locked");
501 return Ok(());
502 }
503 Err(err) if err.kind() == IoErrorKind::Interrupted => {
504 continue;
505 }
506 Err(err) => {
507 warn!("lock the dot file error: {:?}", err);
508 return Err(err);
509 }
510 }
511 }
512 } else {
513 debug!("the dot file is locked");
514 }
515 Ok(())
516 }
517
518 #[cfg(test)]
519 async fn lock_buffered_file<F: FnOnce(File) -> T, T: Future<Output = IoResult<()>>>(
520 &self,
521 f: F,
522 ) -> IoResult<()> {
523 let mut buffered_file = self.buffered_file.lock().await;
524 loop {
525 match buffered_file.write() {
526 Ok(buffered_file) => {
527 let buffered_file = buffered_file.try_clone().await?;
528 return f(buffered_file).await;
529 }
530 Err(err) if err.kind() == IoErrorKind::Interrupted => {
531 continue;
532 }
533 Err(err) => {
534 warn!("lock the dot file error: {:?}", err);
535 return Err(err);
536 }
537 }
538 }
539 }
540}
541
542#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
543#[serde(untagged)]
544pub(super) enum DotRecordKey {
545 APICalls {
546 dot_type: DotType,
547 api_name: ApiName,
548 },
549 PunishedCount,
550}
551
552impl DotRecordKey {
553 pub(super) fn new(dot_type: DotType, api_name: ApiName) -> Self {
554 Self::APICalls { dot_type, api_name }
555 }
556
557 pub(super) fn punished() -> Self {
558 Self::PunishedCount
559 }
560}
561
562#[derive(Serialize, Clone, Debug)]
563#[serde(untagged)]
564pub(super) enum DotRecord {
565 APICalls(APICallsDotRecord),
566 PunishedCount(PunishedCountDotRecord),
567}
568
569#[derive(Serialize, Deserialize, Clone, Debug)]
570pub(super) struct APICallsDotRecord {
571 #[serde(rename = "type")]
572 dot_type: DotType,
573
574 api_name: ApiName,
575 success_count: usize,
576 success_avg_elapsed_duration: u128,
577 failed_count: usize,
578 failed_avg_elapsed_duration: u128,
579}
580
581#[derive(Serialize, Deserialize, Clone, Debug)]
582pub(super) struct PunishedCountDotRecord {
583 punished_count: usize,
584}
585
586impl DotRecord {
587 fn new(
588 dot_type: DotType,
589 api_name: ApiName,
590 success_count: usize,
591 failed_count: usize,
592 success_avg_elapsed_duration: u128,
593 failed_avg_elapsed_duration: u128,
594 ) -> Self {
595 Self::APICalls(APICallsDotRecord {
596 dot_type,
597 api_name,
598 success_count,
599 success_avg_elapsed_duration,
600 failed_count,
601 failed_avg_elapsed_duration,
602 })
603 }
604
605 fn punished() -> Self {
606 Self::PunishedCount(PunishedCountDotRecord { punished_count: 1 })
607 }
608
609 pub(super) fn key(&self) -> DotRecordKey {
610 match self {
611 Self::APICalls(record) => DotRecordKey::new(record.dot_type, record.api_name),
612 Self::PunishedCount(_) => DotRecordKey::punished(),
613 }
614 }
615
616 #[cfg(test)]
617
618 pub(super) fn dot_type(&self) -> Option<DotType> {
619 match self {
620 Self::APICalls(record) => Some(record.dot_type),
621 _ => None,
622 }
623 }
624
625 #[cfg(test)]
626
627 pub(super) fn api_name(&self) -> Option<ApiName> {
628 match self {
629 Self::APICalls(record) => Some(record.api_name),
630 _ => None,
631 }
632 }
633
634 #[cfg(test)]
635
636 pub(super) fn success_count(&self) -> Option<usize> {
637 match self {
638 Self::APICalls(record) => Some(record.success_count),
639 _ => None,
640 }
641 }
642
643 #[cfg(test)]
644
645 pub(super) fn success_avg_elapsed_duration_ms(&self) -> Option<u128> {
646 match self {
647 Self::APICalls(record) => Some(record.success_avg_elapsed_duration),
648 _ => None,
649 }
650 }
651
652 #[cfg(test)]
653
654 pub(super) fn failed_count(&self) -> Option<usize> {
655 match self {
656 Self::APICalls(record) => Some(record.failed_count),
657 _ => None,
658 }
659 }
660
661 #[cfg(test)]
662
663 pub(super) fn failed_avg_elapsed_duration_ms(&self) -> Option<u128> {
664 match self {
665 Self::APICalls(record) => Some(record.failed_avg_elapsed_duration),
666 _ => None,
667 }
668 }
669
670 #[cfg(test)]
671
672 pub(super) fn punished_count(&self) -> Option<usize> {
673 match self {
674 Self::PunishedCount(record) => Some(record.punished_count),
675 _ => None,
676 }
677 }
678}
679
680impl<'de> Deserialize<'de> for DotRecord {
681 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
682 where
683 D: serde::Deserializer<'de>,
684 {
685 let value = JSONValue::deserialize(deserializer)?;
686 if let Ok(record) = APICallsDotRecord::deserialize(&value) {
687 Ok(Self::APICalls(record))
688 } else {
689 PunishedCountDotRecord::deserialize(&value)
690 .map(Self::PunishedCount)
691 .map_err(DeserializeError::custom)
692 }
693 }
694}
695
696#[derive(Serialize, Deserialize, Default, Debug)]
697pub(super) struct DotRecords {
698 #[serde(rename = "logs")]
699 records: Vec<DotRecord>,
700}
701
702impl DotRecords {
703 #[cfg(test)]
704
705 pub(super) fn records(&self) -> &[DotRecord] {
706 self.records.as_ref()
707 }
708}
709
710#[derive(Debug, Clone, Default)]
711pub(super) struct DotRecordsMap(StdHashMap<DotRecordKey, DotRecord>);
712
713impl DotRecordsMap {
714 #[allow(dead_code)]
715 pub(super) fn merge_with_record(&mut self, record: DotRecord) {
716 self.0
717 .entry(record.key())
718 .and_modify(|mut r| match (&mut r, &record) {
719 (DotRecord::APICalls(r), DotRecord::APICalls(record)) => {
720 let success_elapsed_duration_total = r.success_avg_elapsed_duration
721 * to_u128(r.success_count)
722 + record.success_avg_elapsed_duration * to_u128(record.success_count);
723 let failed_elapsed_duration_total = r.failed_avg_elapsed_duration
724 * to_u128(r.failed_count)
725 + record.failed_avg_elapsed_duration * to_u128(record.failed_count);
726 r.success_count += record.success_count;
727 r.failed_count += record.failed_count;
728 r.success_avg_elapsed_duration = if r.success_count > 0 {
729 success_elapsed_duration_total / to_u128(r.success_count)
730 } else {
731 0
732 };
733 r.failed_avg_elapsed_duration = if r.failed_count > 0 {
734 failed_elapsed_duration_total / to_u128(r.failed_count)
735 } else {
736 0
737 };
738 }
739 (DotRecord::PunishedCount(r), DotRecord::PunishedCount(record)) => {
740 r.punished_count += record.punished_count;
741 }
742 _ => panic!("Impossible merge with {:?} and {:?}", r, record),
743 })
744 .or_insert(record);
745
746 fn to_u128(v: usize) -> u128 {
747 u128::try_from(v).unwrap_or(u128::MAX)
748 }
749 }
750
751 #[allow(dead_code)]
752 pub(super) fn merge_with_records(&mut self, records: DotRecords) {
753 for record in records.records.into_iter() {
754 self.merge_with_record(record);
755 }
756 }
757
758 #[allow(dead_code)]
759 pub(super) fn into_records(self) -> DotRecords {
760 DotRecords {
761 records: self.0.into_values().collect(),
762 }
763 }
764}
765
766impl Deref for DotRecordsMap {
767 type Target = StdHashMap<DotRecordKey, DotRecord>;
768
769 fn deref(&self) -> &Self::Target {
770 &self.0
771 }
772}
773
774#[derive(Default)]
775pub(super) struct AsyncDotRecordsMap(HashMap<DotRecordKey, DotRecord>);
776
777impl AsyncDotRecordsMap {
778 #[allow(dead_code)]
779 pub(super) async fn merge_with_record(&self, record: DotRecord) {
780 self.0
781 .entry_async(record.key())
782 .await
783 .and_modify(|mut r| match (&mut r, &record) {
784 (DotRecord::APICalls(r), DotRecord::APICalls(record)) => {
785 let success_elapsed_duration_total = r.success_avg_elapsed_duration
786 * to_u128(r.success_count)
787 + record.success_avg_elapsed_duration * to_u128(record.success_count);
788 let failed_elapsed_duration_total = r.failed_avg_elapsed_duration
789 * to_u128(r.failed_count)
790 + record.failed_avg_elapsed_duration * to_u128(record.failed_count);
791 r.success_count += record.success_count;
792 r.failed_count += record.failed_count;
793 r.success_avg_elapsed_duration = if r.success_count > 0 {
794 success_elapsed_duration_total / to_u128(r.success_count)
795 } else {
796 0
797 };
798 r.failed_avg_elapsed_duration = if r.failed_count > 0 {
799 failed_elapsed_duration_total / to_u128(r.failed_count)
800 } else {
801 0
802 };
803 }
804 (DotRecord::PunishedCount(r), DotRecord::PunishedCount(record)) => {
805 r.punished_count += record.punished_count;
806 }
807 _ => panic!("Impossible merge with {:?} and {:?}", r, record),
808 })
809 .or_insert_with(|| record.to_owned());
810
811 fn to_u128(v: usize) -> u128 {
812 u128::try_from(v).unwrap_or(u128::MAX)
813 }
814 }
815
816 #[allow(dead_code)]
817 pub(super) async fn merge_with_records(&self, records: DotRecords) {
818 for record in records.records.into_iter() {
819 self.merge_with_record(record).await;
820 }
821 }
822
823 #[allow(dead_code)]
824 pub(super) async fn into_records(self) -> DotRecords {
825 let mut records = Vec::new();
826 self.0
827 .scan_async(|_, record| {
828 records.push(record.to_owned());
829 })
830 .await;
831 DotRecords { records }
832 }
833}
834
835impl Deref for AsyncDotRecordsMap {
836 type Target = HashMap<DotRecordKey, DotRecord>;
837
838 fn deref(&self) -> &Self::Target {
839 &self.0
840 }
841}
842
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Timeouts;
    use futures::channel::oneshot::channel;
    use futures::future::join_all;
    use std::{error::Error, sync::atomic::AtomicUsize};
    use tokio::{fs::remove_file, task::spawn, time::sleep};
    use warp::{http::HeaderValue, hyper::Body, path, reply::Response, Filter};

    /// Serves `$routes` on an ephemeral local port, binds the address to `$addr`, runs `$code`
    /// one second after spawning the server, then shuts the server down.
    macro_rules! starts_with_server {
        ($addr:ident, $routes:ident, $code:block) => {{
            let (tx, rx) = channel();
            let ($addr, server) =
                warp::serve($routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
                    rx.await.unwrap();
                });
            spawn(server);
            sleep(Duration::from_secs(1)).await;
            $code;
            tx.send(()).unwrap();
        }};
    }

    const ACCESS_KEY: &str = "1234567890";
    const SECRET_KEY: &str = "abcdefghijk";
    const BUCKET_NAME: &str = "test-bucket";

    mod guard {
        use super::{disable_dotting, enable_dotting, is_dotting_disabled};

        /// Disables dotting for its lifetime and re-enables it on drop if it was enabled before.
        pub(super) struct DottingDisableGuard {
            enabled_before: bool,
        }

        impl DottingDisableGuard {
            pub(super) fn new() -> Self {
                let disabled_before = is_dotting_disabled();
                if !disabled_before {
                    disable_dotting();
                }
                DottingDisableGuard {
                    enabled_before: !disabled_before,
                }
            }
        }

        impl Drop for DottingDisableGuard {
            fn drop(&mut self) {
                if self.enabled_before {
                    enable_dotting();
                }
            }
        }
    }
    use guard::DottingDisableGuard;

    fn get_credential() -> Credential {
        Credential::new(ACCESS_KEY, SECRET_KEY)
    }

    /// Builds a dotter with the test credential and bucket, leaving every option other than
    /// `interval` and `max_buffer_size` at its default.
    async fn new_dotter(
        monitor_urls: Vec<String>,
        interval: Option<Duration>,
        max_buffer_size: Option<u64>,
    ) -> Dotter {
        Dotter::new(
            Timeouts::default_async_http_client(),
            get_credential(),
            BUCKET_NAME.to_owned(),
            monitor_urls,
            interval,
            max_buffer_size,
            None,
            None,
            None,
            None,
            None,
        )
        .await
    }

    /// Fetches a clone of the API-call record received by the server for the given key.
    async fn received_api_record(
        records_map: &AsyncDotRecordsMap,
        dot_type: DotType,
        api_name: ApiName,
    ) -> DotRecord {
        records_map
            .read_async(&DotRecordKey::new(dot_type, api_name), |_, record| {
                record.to_owned()
            })
            .await
            .unwrap()
    }

    /// Asserts the counters and average durations of an API-call record.
    #[track_caller]
    fn assert_api_record(
        record: &DotRecord,
        success_count: usize,
        failed_count: usize,
        success_avg_ms: u128,
        failed_avg_ms: u128,
    ) {
        assert_eq!(record.success_count(), Some(success_count));
        assert_eq!(record.failed_count(), Some(failed_count));
        assert_eq!(
            record.success_avg_elapsed_duration_ms(),
            Some(success_avg_ms)
        );
        assert_eq!(record.failed_avg_elapsed_duration_ms(), Some(failed_avg_ms));
    }

    /// Neither a dotter without monitor URLs nor a dotter under the global disable switch may
    /// reach the server.
    #[tokio::test]
    async fn test_dotter_dot_nothing() -> Result<(), Box<dyn Error>> {
        env_logger::try_init().ok();
        clear_cache().await?;

        let called = Arc::new(AtomicUsize::new(0));
        let routes = {
            let called = called.to_owned();
            path!("v1" / "stat").map(move || {
                called.fetch_add(1, Relaxed);
                Response::new(Body::empty())
            })
        };

        starts_with_server!(addr, routes, {
            let dotter = new_dotter(vec![], None, None).await;
            assert!(dotter.inner.is_none());
            dotter
                .dot(
                    DotType::Http,
                    ApiName::IoGetfile,
                    true,
                    Duration::from_millis(0),
                )
                .await
                .unwrap();
            sleep(Duration::from_secs(5)).await;
            assert_eq!(called.load(Relaxed), 0);

            let urls = vec!["http://".to_owned() + &addr.to_string()];
            let dotter = new_dotter(urls, Some(Duration::from_millis(0)), Some(1)).await;
            assert!(dotter.inner.is_some());

            let _guard = DottingDisableGuard::new();
            dotter
                .dot(
                    DotType::Http,
                    ApiName::IoGetfile,
                    true,
                    Duration::from_millis(0),
                )
                .await
                .unwrap();
            sleep(Duration::from_secs(5)).await;
            assert_eq!(called.load(Relaxed), 0);
        });

        Ok(())
    }

    /// Dots recorded concurrently must arrive at the server merged per `(type, api_name)`.
    #[tokio::test]
    async fn test_dotter_dot_something() -> Result<(), Box<dyn Error>> {
        env_logger::try_init().ok();
        clear_cache().await?;
        let records_map = Arc::new(AsyncDotRecordsMap::default());

        let routes = {
            let records_map = records_map.to_owned();
            path!("v1" / "stat")
                .and(warp::header::value(AUTHORIZATION.as_str()))
                .and(warp::body::json())
                .then(move |authorization: HeaderValue, records: DotRecords| {
                    assert!(authorization.to_str().unwrap().starts_with("UpToken "));
                    let records_map = records_map.to_owned();
                    async move {
                        records_map.merge_with_records(records).await;
                        Response::new(Body::empty())
                    }
                })
        };

        starts_with_server!(addr, routes, {
            // Only the last URL points at the test server; the others get a digit appended to
            // the port.
            let urls = vec![
                "http://".to_owned() + &addr.to_string() + "1",
                "http://".to_owned() + &addr.to_string() + "2",
                "http://".to_owned() + &addr.to_string() + "3",
                "http://".to_owned() + &addr.to_string() + "4",
                "http://".to_owned() + &addr.to_string(),
            ];
            let dotter = new_dotter(urls, Some(Duration::from_millis(0)), Some(1)).await;

            // (type, api, successful, elapsed milliseconds)
            let cases: Vec<(DotType, ApiName, bool, u64)> = vec![
                (DotType::Sdk, ApiName::IoGetfile, true, 10),
                (DotType::Sdk, ApiName::IoGetfile, false, 12),
                (DotType::Sdk, ApiName::UcV4Query, true, 14),
                (DotType::Sdk, ApiName::UcV4Query, true, 16),
                (DotType::Sdk, ApiName::UcV4Query, false, 18),
                (DotType::Http, ApiName::IoGetfile, true, 20),
                (DotType::Http, ApiName::IoGetfile, true, 22),
                (DotType::Http, ApiName::IoGetfile, false, 24),
                (DotType::Http, ApiName::UcV4Query, true, 26),
                (DotType::Http, ApiName::UcV4Query, true, 28),
                (DotType::Http, ApiName::UcV4Query, true, 28),
                (DotType::Http, ApiName::UcV4Query, false, 30),
                (DotType::Http, ApiName::UcV4Query, true, 32),
            ];
            let mut tasks = Vec::with_capacity(cases.len());
            for (dot_type, api_name, successful, elapsed_ms) in cases {
                let dotter = dotter.to_owned();
                tasks.push(spawn(async move {
                    dotter
                        .dot(
                            dot_type,
                            api_name,
                            successful,
                            Duration::from_millis(elapsed_ms),
                        )
                        .await
                        .unwrap();
                }));
            }
            join_all(tasks).await;
            sleep(Duration::from_secs(5)).await;

            let record =
                received_api_record(&records_map, DotType::Sdk, ApiName::UcV4Query).await;
            assert_api_record(&record, 2, 1, 15, 18);

            let record =
                received_api_record(&records_map, DotType::Sdk, ApiName::IoGetfile).await;
            assert_api_record(&record, 1, 1, 10, 12);

            let record =
                received_api_record(&records_map, DotType::Http, ApiName::UcV4Query).await;
            assert_api_record(&record, 4, 1, 28, 30);

            let record =
                received_api_record(&records_map, DotType::Http, ApiName::IoGetfile).await;
            assert_api_record(&record, 2, 1, 21, 24);
        });
        Ok(())
    }

    /// Punishments recorded concurrently must arrive at the server as one summed count.
    #[tokio::test]
    async fn test_dotter_punish() -> Result<(), Box<dyn Error>> {
        env_logger::try_init().ok();
        clear_cache().await?;
        let records_map = Arc::new(AsyncDotRecordsMap::default());

        let routes = {
            let records_map = records_map.to_owned();
            path!("v1" / "stat")
                .and(warp::header::value(AUTHORIZATION.as_str()))
                .and(warp::body::json())
                .then(move |authorization: HeaderValue, records: DotRecords| {
                    assert!(authorization.to_str().unwrap().starts_with("UpToken "));
                    let records_map = records_map.to_owned();
                    async move {
                        records_map.merge_with_records(records).await;
                        Response::new(Body::empty())
                    }
                })
        };
        starts_with_server!(addr, routes, {
            let urls = vec!["http://".to_owned() + &addr.to_string()];
            let dotter = new_dotter(urls, Some(Duration::from_millis(0)), Some(1)).await;

            let mut tasks = Vec::new();
            tasks.push({
                let dotter = dotter.to_owned();
                spawn(async move {
                    dotter
                        .dot(
                            DotType::Sdk,
                            ApiName::IoGetfile,
                            true,
                            Duration::from_millis(10),
                        )
                        .await
                        .unwrap();
                })
            });
            for _ in 0..5 {
                let dotter = dotter.to_owned();
                tasks.push(spawn(async move {
                    dotter.punish().await.unwrap();
                }));
            }
            // Wait for the recording tasks themselves, as the sibling test does, before giving
            // the background upload time to finish.
            join_all(tasks).await;

            sleep(Duration::from_secs(5)).await;

            let record =
                received_api_record(&records_map, DotType::Sdk, ApiName::IoGetfile).await;
            assert_api_record(&record, 1, 0, 10, 0);

            let record = records_map
                .read_async(&DotRecordKey::punished(), |_, record| record.to_owned())
                .await
                .unwrap();
            assert_eq!(record.punished_count(), Some(5));
        });
        Ok(())
    }

    /// Removes the dot file left over by earlier runs; a missing file is not an error.
    async fn clear_cache() -> IoResult<()> {
        let cache_file_path = cache_dir_path_of(DOT_FILE_NAME).await?;
        match remove_file(&cache_file_path).await {
            Err(err) if err.kind() != IoErrorKind::NotFound => Err(err),
            _ => Ok(()),
        }
    }
}