1use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::path::PathBuf;
18use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
19use std::sync::{Arc, RwLock, RwLockWriteGuard};
20use std::thread;
21use std::time::{Duration, Instant};
22
23use chrono::Utc;
24
25use crate::error::ErrorKind;
26use crate::TimerId;
27use crate::{internal_metrics::UploadMetrics, Glean};
28pub use directory::process_metadata;
29use directory::{PingDirectoryManager, PingPayloadsByDirectory};
30use policy::Policy;
31use request::create_date_header_value;
32
33pub use directory::{PingMetadata, PingPayload};
34pub use request::{HeaderMap, PingRequest};
35pub use result::{UploadResult, UploadTaskAction};
36
37mod directory;
38mod policy;
39mod request;
40mod result;
41
42const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1000; #[derive(Debug)]
45struct RateLimiter {
46 started: Option<Instant>,
48 count: u32,
50 interval: Duration,
52 max_count: u32,
54}
55
56#[derive(PartialEq)]
58enum RateLimiterState {
59 Incrementing,
61 Throttled(u64),
66}
67
68impl RateLimiter {
69 pub fn new(interval: Duration, max_count: u32) -> Self {
70 Self {
71 started: None,
72 count: 0,
73 interval,
74 max_count,
75 }
76 }
77
78 fn reset(&mut self) {
79 self.started = Some(Instant::now());
80 self.count = 0;
81 }
82
83 fn elapsed(&self) -> Duration {
84 self.started.unwrap().elapsed()
85 }
86
87 fn should_reset(&self) -> bool {
93 if self.started.is_none() {
94 return true;
95 }
96
97 if self.elapsed() > self.interval {
99 return true;
100 }
101
102 false
103 }
104
105 pub fn get_state(&mut self) -> RateLimiterState {
111 if self.should_reset() {
112 self.reset();
113 }
114
115 if self.count == self.max_count {
116 let remaining = self.interval.as_millis() - self.elapsed().as_millis();
119 return RateLimiterState::Throttled(
120 remaining
121 .try_into()
122 .unwrap_or(self.interval.as_secs() * 1000),
123 );
124 }
125
126 self.count += 1;
127 RateLimiterState::Incrementing
128 }
129}
130
131#[derive(PartialEq, Eq, Debug)]
136pub enum PingUploadTask {
137 Upload {
139 request: PingRequest,
142 },
143
144 Wait {
147 time: u64,
150 },
151
152 Done {
165 #[doc(hidden)]
166 unused: i8,
168 },
169}
170
171impl PingUploadTask {
172 pub fn is_upload(&self) -> bool {
174 matches!(self, PingUploadTask::Upload { .. })
175 }
176
177 pub fn is_wait(&self) -> bool {
179 matches!(self, PingUploadTask::Wait { .. })
180 }
181
182 pub(crate) fn done() -> Self {
183 PingUploadTask::Done { unused: 0 }
184 }
185}
186
187#[derive(Debug)]
189pub struct PingUploadManager {
190 queue: RwLock<VecDeque<PingRequest>>,
192 directory_manager: PingDirectoryManager,
194 processed_pending_pings: Arc<AtomicBool>,
196 cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
198 recoverable_failure_count: AtomicU32,
200 wait_attempt_count: AtomicU32,
202 rate_limiter: Option<RwLock<RateLimiter>>,
207 language_binding_name: String,
211 upload_metrics: UploadMetrics,
213 policy: Policy,
215
216 in_flight: RwLock<HashMap<String, (TimerId, TimerId)>>,
217}
218
219impl PingUploadManager {
220 pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
231 Self {
232 queue: RwLock::new(VecDeque::new()),
233 directory_manager: PingDirectoryManager::new(data_path),
234 processed_pending_pings: Arc::new(AtomicBool::new(false)),
235 cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
236 recoverable_failure_count: AtomicU32::new(0),
237 wait_attempt_count: AtomicU32::new(0),
238 rate_limiter: None,
239 language_binding_name: language_binding_name.into(),
240 upload_metrics: UploadMetrics::new(),
241 policy: Policy::default(),
242 in_flight: RwLock::new(HashMap::default()),
243 }
244 }
245
246 pub fn scan_pending_pings_directories(
253 &self,
254 trigger_upload: bool,
255 ) -> std::thread::JoinHandle<()> {
256 let local_manager = self.directory_manager.clone();
257 let local_cached_pings = self.cached_pings.clone();
258 let local_flag = self.processed_pending_pings.clone();
259 thread::Builder::new()
260 .name("glean.ping_directory_manager.process_dir".to_string())
261 .spawn(move || {
262 {
263 let mut local_cached_pings = local_cached_pings
265 .write()
266 .expect("Can't write to pending pings cache.");
267 local_cached_pings.extend(local_manager.process_dirs());
268 local_flag.store(true, Ordering::SeqCst);
269 }
270 if trigger_upload {
271 crate::dispatcher::launch(|| {
272 if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok())
273 {
274 if let Err(e) = state.callbacks.trigger_upload() {
275 log::error!(
276 "Triggering upload after pending ping scan failed. Error: {}",
277 e
278 );
279 }
280 }
281 });
282 }
283 })
284 .expect("Unable to spawn thread to process pings directories.")
285 }
286
287 #[cfg(test)]
289 pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
290 let mut upload_manager = Self::new(data_path, "Test");
291
292 upload_manager.policy.set_max_recoverable_failures(None);
294 upload_manager.policy.set_max_wait_attempts(None);
295 upload_manager.policy.set_max_ping_body_size(None);
296 upload_manager
297 .policy
298 .set_max_pending_pings_directory_size(None);
299 upload_manager.policy.set_max_pending_pings_count(None);
300
301 upload_manager
303 .scan_pending_pings_directories(false)
304 .join()
305 .unwrap();
306
307 upload_manager
308 }
309
310 fn processed_pending_pings(&self) -> bool {
311 self.processed_pending_pings.load(Ordering::SeqCst)
312 }
313
314 fn recoverable_failure_count(&self) -> u32 {
315 self.recoverable_failure_count.load(Ordering::SeqCst)
316 }
317
318 fn wait_attempt_count(&self) -> u32 {
319 self.wait_attempt_count.load(Ordering::SeqCst)
320 }
321
322 fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
327 let PingPayload {
328 document_id,
329 upload_path: path,
330 json_body: body,
331 headers,
332 body_has_info_sections,
333 ping_name,
334 uploader_capabilities,
335 } = ping;
336 let mut request = PingRequest::builder(
337 &self.language_binding_name,
338 self.policy.max_ping_body_size(),
339 )
340 .document_id(&document_id)
341 .path(path)
342 .body(body)
343 .body_has_info_sections(body_has_info_sections)
344 .ping_name(ping_name)
345 .uploader_capabilities(uploader_capabilities);
346
347 if let Some(headers) = headers {
348 request = request.headers(headers);
349 }
350
351 match request.build() {
352 Ok(request) => Some(request),
353 Err(e) => {
354 log::warn!("Error trying to build ping request: {}", e);
355 self.directory_manager.delete_file(&document_id);
356
357 if let ErrorKind::PingBodyOverflow(s) = e.kind() {
360 self.upload_metrics
361 .discarded_exceeding_pings_size
362 .accumulate_sync(glean, *s as i64 / 1024);
363 }
364
365 None
366 }
367 }
368 }
369
370 pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
372 let mut queue = self
373 .queue
374 .write()
375 .expect("Can't write to pending pings queue.");
376
377 let PingPayload {
378 ref document_id,
379 upload_path: ref path,
380 ..
381 } = ping;
382 if queue
384 .iter()
385 .any(|request| request.document_id.as_str() == document_id)
386 {
387 log::warn!(
388 "Attempted to enqueue a duplicate ping {} at {}.",
389 document_id,
390 path
391 );
392 return;
393 }
394
395 {
396 let in_flight = self.in_flight.read().unwrap();
397 if in_flight.contains_key(document_id) {
398 log::warn!(
399 "Attempted to enqueue an in-flight ping {} at {}.",
400 document_id,
401 path
402 );
403 self.upload_metrics
404 .in_flight_pings_dropped
405 .add_sync(glean, 0);
406 return;
407 }
408 }
409
410 log::trace!("Enqueuing ping {} at {}", document_id, path);
411 if let Some(request) = self.build_ping_request(glean, ping) {
412 queue.push_back(request)
413 }
414 }
415
416 fn enqueue_cached_pings(&self, glean: &Glean) {
433 let mut cached_pings = self
434 .cached_pings
435 .write()
436 .expect("Can't write to pending pings cache.");
437
438 if cached_pings.len() > 0 {
439 let mut pending_pings_directory_size: u64 = 0;
440 let mut pending_pings_count = 0;
441 let mut deleting = false;
442
443 let total = cached_pings.pending_pings.len() as u64;
444 self.upload_metrics
445 .pending_pings
446 .add_sync(glean, total.try_into().unwrap_or(0));
447
448 if total > self.policy.max_pending_pings_count() {
449 log::warn!(
450 "More than {} pending pings in the directory, will delete {} old pings.",
451 self.policy.max_pending_pings_count(),
452 total - self.policy.max_pending_pings_count()
453 );
454 }
455
456 cached_pings.pending_pings.reverse();
462 cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
463 pending_pings_count += 1;
464 pending_pings_directory_size += file_size;
465
466 if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
468 log::warn!(
469 "Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
470 self.policy.max_pending_pings_directory_size()
471 );
472 deleting = true;
473 }
474
475 if pending_pings_count > self.policy.max_pending_pings_count() {
479 deleting = true;
480 }
481
482 if deleting && self.directory_manager.delete_file(document_id) {
483 self.upload_metrics
484 .deleted_pings_after_quota_hit
485 .add_sync(glean, 1);
486 return false;
487 }
488
489 true
490 });
491 cached_pings.pending_pings.reverse();
494 self.upload_metrics
495 .pending_pings_directory_size
496 .accumulate_sync(glean, pending_pings_directory_size as i64 / 1024);
497
498 cached_pings
501 .deletion_request_pings
502 .drain(..)
503 .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
504 cached_pings
505 .pending_pings
506 .drain(..)
507 .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
508 }
509 }
510
511 pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
524 self.rate_limiter = Some(RwLock::new(RateLimiter::new(
525 Duration::from_secs(interval),
526 max_tasks,
527 )));
528 }
529
530 pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
539 if let Some(ping) = self.directory_manager.process_file(document_id) {
540 self.enqueue_ping(glean, ping);
541 }
542 }
543
544 pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
546 log::trace!("Clearing ping queue");
547 let mut queue = self
548 .queue
549 .write()
550 .expect("Can't write to pending pings queue.");
551
552 queue.retain(|ping| ping.is_deletion_request());
553 log::trace!(
554 "{} pings left in the queue (only deletion-request expected)",
555 queue.len()
556 );
557 queue
558 }
559
560 fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
561 let wait_or_done = |time: u64| {
566 self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
567 if self.wait_attempt_count() > self.policy.max_wait_attempts() {
568 PingUploadTask::done()
569 } else {
570 PingUploadTask::Wait { time }
571 }
572 };
573
574 if !self.processed_pending_pings() {
575 log::info!(
576 "Tried getting an upload task, but processing is ongoing. Will come back later."
577 );
578 return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
579 }
580
581 self.enqueue_cached_pings(glean);
583
584 if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
585 log::warn!(
586 "Reached maximum recoverable failures for the current uploading window. You are done."
587 );
588 return PingUploadTask::done();
589 }
590
591 let mut queue = self
592 .queue
593 .write()
594 .expect("Can't write to pending pings queue.");
595 match queue.front() {
596 Some(request) => {
597 if let Some(rate_limiter) = &self.rate_limiter {
598 let mut rate_limiter = rate_limiter
599 .write()
600 .expect("Can't write to the rate limiter.");
601 if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
602 log::info!(
603 "Tried getting an upload task, but we are throttled at the moment."
604 );
605 return wait_or_done(remaining);
606 }
607 }
608
609 log::info!(
610 "New upload task with id {} (path: {})",
611 request.document_id,
612 request.path
613 );
614
615 if log_ping {
616 if let Some(body) = request.pretty_body() {
617 chunked_log_info(&request.path, &body);
618 } else {
619 chunked_log_info(&request.path, "<invalid ping payload>");
620 }
621 }
622
623 {
624 let mut in_flight = self.in_flight.write().unwrap();
628 let success_id = self.upload_metrics.send_success.start_sync();
629 let failure_id = self.upload_metrics.send_failure.start_sync();
630 in_flight.insert(request.document_id.clone(), (success_id, failure_id));
631 }
632
633 let mut request = queue.pop_front().unwrap();
634
635 request
637 .headers
638 .insert("Date".to_string(), create_date_header_value(Utc::now()));
639
640 PingUploadTask::Upload { request }
641 }
642 None => {
643 log::info!("No more pings to upload! You are done.");
644 PingUploadTask::done()
645 }
646 }
647 }
648
649 pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
660 let task = self.get_upload_task_internal(glean, log_ping);
661
662 if !task.is_wait() && self.wait_attempt_count() > 0 {
663 self.wait_attempt_count.store(0, Ordering::SeqCst);
664 }
665
666 if !task.is_upload() && self.recoverable_failure_count() > 0 {
667 self.recoverable_failure_count.store(0, Ordering::SeqCst);
668 }
669
670 task
671 }
672
673 pub fn process_ping_upload_response(
712 &self,
713 glean: &Glean,
714 document_id: &str,
715 status: UploadResult,
716 ) -> UploadTaskAction {
717 use UploadResult::*;
718
719 let stop_time = time::precise_time_ns();
720
721 if let Some(label) = status.get_label() {
722 let metric = self.upload_metrics.ping_upload_failure.get(label);
723 metric.add_sync(glean, 1);
724 }
725
726 let send_ids = {
727 let mut lock = self.in_flight.write().unwrap();
728 lock.remove(document_id)
729 };
730
731 if send_ids.is_none() {
732 self.upload_metrics.missing_send_ids.add_sync(glean, 1);
733 }
734
735 match status {
736 HttpStatus { code } if (200..=299).contains(&code) => {
737 log::info!("Ping {} successfully sent {}.", document_id, code);
738 if let Some((success_id, failure_id)) = send_ids {
739 self.upload_metrics
740 .send_success
741 .set_stop_and_accumulate(glean, success_id, stop_time);
742 self.upload_metrics.send_failure.cancel_sync(failure_id);
743 }
744 self.directory_manager.delete_file(document_id);
745 }
746
747 UnrecoverableFailure { .. } | HttpStatus { code: 400..=499 } | Incapable { .. } => {
748 log::warn!(
749 "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
750 document_id,
751 status
752 );
753 if let Some((success_id, failure_id)) = send_ids {
754 self.upload_metrics.send_success.cancel_sync(success_id);
755 self.upload_metrics
756 .send_failure
757 .set_stop_and_accumulate(glean, failure_id, stop_time);
758 }
759 self.directory_manager.delete_file(document_id);
760 }
761
762 RecoverableFailure { .. } | HttpStatus { .. } => {
763 log::warn!(
764 "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
765 document_id,
766 status
767 );
768 if let Some((success_id, failure_id)) = send_ids {
769 self.upload_metrics.send_success.cancel_sync(success_id);
770 self.upload_metrics
771 .send_failure
772 .set_stop_and_accumulate(glean, failure_id, stop_time);
773 }
774 self.enqueue_ping_from_file(glean, document_id);
775 self.recoverable_failure_count
776 .fetch_add(1, Ordering::SeqCst);
777 }
778
779 Done { .. } => {
780 log::debug!("Uploader signaled Done. Exiting.");
781 if let Some((success_id, failure_id)) = send_ids {
782 self.upload_metrics.send_success.cancel_sync(success_id);
783 self.upload_metrics.send_failure.cancel_sync(failure_id);
784 }
785 return UploadTaskAction::End;
786 }
787 };
788
789 UploadTaskAction::Next
790 }
791}
792
793#[cfg(target_os = "android")]
795pub fn chunked_log_info(path: &str, payload: &str) {
796 const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;
800
801 if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
805 log::info!("Glean ping to URL: {}\n{}", path, payload);
806 return;
807 }
808
809 let mut start = 0;
812 let mut end = MAX_LOG_PAYLOAD_SIZE_BYTES;
813 let mut chunk_idx = 1;
814 let total_chunks = payload.len() / MAX_LOG_PAYLOAD_SIZE_BYTES + 1;
816
817 while end < payload.len() {
818 for _ in 0..4 {
821 if payload.is_char_boundary(end) {
822 break;
823 }
824 end -= 1;
825 }
826
827 log::info!(
828 "Glean ping to URL: {} [Part {} of {}]\n{}",
829 path,
830 chunk_idx,
831 total_chunks,
832 &payload[start..end]
833 );
834
835 start = end;
837 end = end + MAX_LOG_PAYLOAD_SIZE_BYTES;
838 chunk_idx += 1;
839 }
840
841 if start < payload.len() {
843 log::info!(
844 "Glean ping to URL: {} [Part {} of {}]\n{}",
845 path,
846 chunk_idx,
847 total_chunks,
848 &payload[start..]
849 );
850 }
851}
852
853#[cfg(not(target_os = "android"))]
855pub fn chunked_log_info(_path: &str, payload: &str) {
856 log::info!("{}", payload)
857}
858
859#[cfg(test)]
860mod test {
861 use uuid::Uuid;
862
863 use super::*;
864 use crate::metrics::PingType;
865 use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
866
867 const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
868
869 #[test]
870 fn doesnt_error_when_there_are_no_pending_pings() {
871 let (glean, _t) = new_glean(None);
872
873 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
876 }
877
878 #[test]
879 fn returns_ping_request_when_there_is_one() {
880 let (glean, dir) = new_glean(None);
881
882 let upload_manager = PingUploadManager::no_policy(dir.path());
883
884 upload_manager.enqueue_ping(
886 &glean,
887 PingPayload {
888 document_id: Uuid::new_v4().to_string(),
889 upload_path: PATH.into(),
890 json_body: "".into(),
891 headers: None,
892 body_has_info_sections: true,
893 ping_name: "ping-name".into(),
894 uploader_capabilities: vec![],
895 },
896 );
897
898 let task = upload_manager.get_upload_task(&glean, false);
901 assert!(task.is_upload());
902 }
903
904 #[test]
905 fn returns_as_many_ping_requests_as_there_are() {
906 let (glean, dir) = new_glean(None);
907
908 let upload_manager = PingUploadManager::no_policy(dir.path());
909
910 let n = 10;
912 for _ in 0..n {
913 upload_manager.enqueue_ping(
914 &glean,
915 PingPayload {
916 document_id: Uuid::new_v4().to_string(),
917 upload_path: PATH.into(),
918 json_body: "".into(),
919 headers: None,
920 body_has_info_sections: true,
921 ping_name: "ping-name".into(),
922 uploader_capabilities: vec![],
923 },
924 );
925 }
926
927 for _ in 0..n {
929 let task = upload_manager.get_upload_task(&glean, false);
930 assert!(task.is_upload());
931 }
932
933 assert_eq!(
935 upload_manager.get_upload_task(&glean, false),
936 PingUploadTask::done()
937 );
938 }
939
940 #[test]
941 fn limits_the_number_of_pings_when_there_is_rate_limiting() {
942 let (glean, dir) = new_glean(None);
943
944 let mut upload_manager = PingUploadManager::no_policy(dir.path());
945
946 let max_pings_per_interval = 10;
948 upload_manager.set_rate_limiter(3, 10);
949
950 for _ in 0..max_pings_per_interval {
952 upload_manager.enqueue_ping(
953 &glean,
954 PingPayload {
955 document_id: Uuid::new_v4().to_string(),
956 upload_path: PATH.into(),
957 json_body: "".into(),
958 headers: None,
959 body_has_info_sections: true,
960 ping_name: "ping-name".into(),
961 uploader_capabilities: vec![],
962 },
963 );
964 }
965
966 for _ in 0..max_pings_per_interval {
968 let task = upload_manager.get_upload_task(&glean, false);
969 assert!(task.is_upload());
970 }
971
972 upload_manager.enqueue_ping(
974 &glean,
975 PingPayload {
976 document_id: Uuid::new_v4().to_string(),
977 upload_path: PATH.into(),
978 json_body: "".into(),
979 headers: None,
980 body_has_info_sections: true,
981 ping_name: "ping-name".into(),
982 uploader_capabilities: vec![],
983 },
984 );
985
986 match upload_manager.get_upload_task(&glean, false) {
988 PingUploadTask::Wait { time } => {
989 thread::sleep(Duration::from_millis(time));
991 }
992 _ => panic!("Expected upload manager to return a wait task!"),
993 };
994
995 let task = upload_manager.get_upload_task(&glean, false);
996 assert!(task.is_upload());
997 }
998
999 #[test]
1000 fn clearing_the_queue_works_correctly() {
1001 let (glean, dir) = new_glean(None);
1002
1003 let upload_manager = PingUploadManager::no_policy(dir.path());
1004
1005 for _ in 0..10 {
1007 upload_manager.enqueue_ping(
1008 &glean,
1009 PingPayload {
1010 document_id: Uuid::new_v4().to_string(),
1011 upload_path: PATH.into(),
1012 json_body: "".into(),
1013 headers: None,
1014 body_has_info_sections: true,
1015 ping_name: "ping-name".into(),
1016 uploader_capabilities: vec![],
1017 },
1018 );
1019 }
1020
1021 drop(upload_manager.clear_ping_queue());
1023
1024 assert_eq!(
1026 upload_manager.get_upload_task(&glean, false),
1027 PingUploadTask::done()
1028 );
1029 }
1030
1031 #[test]
1032 fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
1033 let (mut glean, _t) = new_glean(None);
1034
1035 let ping_type = PingType::new(
1037 "test",
1038 true,
1039 true,
1040 true,
1041 true,
1042 true,
1043 vec![],
1044 vec![],
1045 true,
1046 vec![],
1047 );
1048 glean.register_ping_type(&ping_type);
1049
1050 let n = 10;
1052 for _ in 0..n {
1053 ping_type.submit_sync(&glean, None);
1054 }
1055
1056 glean
1057 .internal_pings
1058 .deletion_request
1059 .submit_sync(&glean, None);
1060
1061 drop(glean.upload_manager.clear_ping_queue());
1063
1064 let upload_task = glean.get_upload_task();
1065 match upload_task {
1066 PingUploadTask::Upload { request } => assert!(request.is_deletion_request()),
1067 _ => panic!("Expected upload manager to return the next request!"),
1068 }
1069
1070 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1072 }
1073
1074 #[test]
1075 fn fills_up_queue_successfully_from_disk() {
1076 let (mut glean, dir) = new_glean(None);
1077
1078 let ping_type = PingType::new(
1080 "test",
1081 true,
1082 true,
1083 true,
1084 true,
1085 true,
1086 vec![],
1087 vec![],
1088 true,
1089 vec![],
1090 );
1091 glean.register_ping_type(&ping_type);
1092
1093 let n = 10;
1095 for _ in 0..n {
1096 ping_type.submit_sync(&glean, None);
1097 }
1098
1099 let upload_manager = PingUploadManager::no_policy(dir.path());
1101
1102 for _ in 0..n {
1104 let task = upload_manager.get_upload_task(&glean, false);
1105 assert!(task.is_upload());
1106 }
1107
1108 assert_eq!(
1110 upload_manager.get_upload_task(&glean, false),
1111 PingUploadTask::done()
1112 );
1113 }
1114
1115 #[test]
1116 fn processes_correctly_success_upload_response() {
1117 let (mut glean, dir) = new_glean(None);
1118
1119 let ping_type = PingType::new(
1121 "test",
1122 true,
1123 true,
1124 true,
1125 true,
1126 true,
1127 vec![],
1128 vec![],
1129 true,
1130 vec![],
1131 );
1132 glean.register_ping_type(&ping_type);
1133
1134 ping_type.submit_sync(&glean, None);
1136
1137 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1139
1140 match glean.get_upload_task() {
1142 PingUploadTask::Upload { request } => {
1143 let document_id = request.document_id;
1145 glean.process_ping_upload_response(&document_id, UploadResult::http_status(200));
1146 assert!(!pending_pings_dir.join(document_id).exists());
1148 }
1149 _ => panic!("Expected upload manager to return the next request!"),
1150 }
1151
1152 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1154 }
1155
1156 #[test]
1157 fn processes_correctly_client_error_upload_response() {
1158 let (mut glean, dir) = new_glean(None);
1159
1160 let ping_type = PingType::new(
1162 "test",
1163 true,
1164 true,
1165 true,
1166 true,
1167 true,
1168 vec![],
1169 vec![],
1170 true,
1171 vec![],
1172 );
1173 glean.register_ping_type(&ping_type);
1174
1175 ping_type.submit_sync(&glean, None);
1177
1178 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1180
1181 match glean.get_upload_task() {
1183 PingUploadTask::Upload { request } => {
1184 let document_id = request.document_id;
1186 glean.process_ping_upload_response(&document_id, UploadResult::http_status(404));
1187 assert!(!pending_pings_dir.join(document_id).exists());
1189 }
1190 _ => panic!("Expected upload manager to return the next request!"),
1191 }
1192
1193 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1195 }
1196
1197 #[test]
1198 fn processes_correctly_server_error_upload_response() {
1199 let (mut glean, _t) = new_glean(None);
1200
1201 let ping_type = PingType::new(
1203 "test",
1204 true,
1205 true,
1206 true,
1207 true,
1208 true,
1209 vec![],
1210 vec![],
1211 true,
1212 vec![],
1213 );
1214 glean.register_ping_type(&ping_type);
1215
1216 ping_type.submit_sync(&glean, None);
1218
1219 match glean.get_upload_task() {
1221 PingUploadTask::Upload { request } => {
1222 let document_id = request.document_id;
1224 glean.process_ping_upload_response(&document_id, UploadResult::http_status(500));
1225 match glean.get_upload_task() {
1227 PingUploadTask::Upload { request } => {
1228 assert_eq!(document_id, request.document_id);
1229 }
1230 _ => panic!("Expected upload manager to return the next request!"),
1231 }
1232 }
1233 _ => panic!("Expected upload manager to return the next request!"),
1234 }
1235
1236 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1238 }
1239
1240 #[test]
1241 fn processes_correctly_unrecoverable_upload_response() {
1242 let (mut glean, dir) = new_glean(None);
1243
1244 let ping_type = PingType::new(
1246 "test",
1247 true,
1248 true,
1249 true,
1250 true,
1251 true,
1252 vec![],
1253 vec![],
1254 true,
1255 vec![],
1256 );
1257 glean.register_ping_type(&ping_type);
1258
1259 ping_type.submit_sync(&glean, None);
1261
1262 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1264
1265 match glean.get_upload_task() {
1267 PingUploadTask::Upload { request } => {
1268 let document_id = request.document_id;
1270 glean.process_ping_upload_response(
1271 &document_id,
1272 UploadResult::unrecoverable_failure(),
1273 );
1274 assert!(!pending_pings_dir.join(document_id).exists());
1276 }
1277 _ => panic!("Expected upload manager to return the next request!"),
1278 }
1279
1280 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1282 }
1283
1284 #[test]
1285 fn new_pings_are_added_while_upload_in_progress() {
1286 let (glean, dir) = new_glean(None);
1287
1288 let upload_manager = PingUploadManager::no_policy(dir.path());
1289
1290 let doc1 = Uuid::new_v4().to_string();
1291 let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
1292
1293 let doc2 = Uuid::new_v4().to_string();
1294 let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
1295
1296 upload_manager.enqueue_ping(
1298 &glean,
1299 PingPayload {
1300 document_id: doc1.clone(),
1301 upload_path: path1,
1302 json_body: "".into(),
1303 headers: None,
1304 body_has_info_sections: true,
1305 ping_name: "test-ping".into(),
1306 uploader_capabilities: vec![],
1307 },
1308 );
1309
1310 let req = match upload_manager.get_upload_task(&glean, false) {
1312 PingUploadTask::Upload { request } => request,
1313 _ => panic!("Expected upload manager to return the next request!"),
1314 };
1315 assert_eq!(doc1, req.document_id);
1316
1317 upload_manager.enqueue_ping(
1319 &glean,
1320 PingPayload {
1321 document_id: doc2.clone(),
1322 upload_path: path2,
1323 json_body: "".into(),
1324 headers: None,
1325 body_has_info_sections: true,
1326 ping_name: "test-ping".into(),
1327 uploader_capabilities: vec![],
1328 },
1329 );
1330
1331 upload_manager.process_ping_upload_response(
1333 &glean,
1334 &req.document_id,
1335 UploadResult::http_status(200),
1336 );
1337
1338 let req = match upload_manager.get_upload_task(&glean, false) {
1340 PingUploadTask::Upload { request } => request,
1341 _ => panic!("Expected upload manager to return the next request!"),
1342 };
1343 assert_eq!(doc2, req.document_id);
1344
1345 upload_manager.process_ping_upload_response(
1347 &glean,
1348 &req.document_id,
1349 UploadResult::http_status(200),
1350 );
1351
1352 assert_eq!(
1354 upload_manager.get_upload_task(&glean, false),
1355 PingUploadTask::done()
1356 );
1357 }
1358
1359 #[test]
1360 fn adds_debug_view_header_to_requests_when_tag_is_set() {
1361 let (mut glean, _t) = new_glean(None);
1362
1363 glean.set_debug_view_tag("valid-tag");
1364
1365 let ping_type = PingType::new(
1367 "test",
1368 true,
1369 true,
1370 true,
1371 true,
1372 true,
1373 vec![],
1374 vec![],
1375 true,
1376 vec![],
1377 );
1378 glean.register_ping_type(&ping_type);
1379
1380 ping_type.submit_sync(&glean, None);
1382
1383 match glean.get_upload_task() {
1385 PingUploadTask::Upload { request } => {
1386 assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
1387 }
1388 _ => panic!("Expected upload manager to return the next request!"),
1389 }
1390 }
1391
1392 #[test]
1393 fn duplicates_are_not_enqueued() {
1394 let (glean, dir) = new_glean(None);
1395
1396 let upload_manager = PingUploadManager::no_policy(dir.path());
1399
1400 let doc_id = Uuid::new_v4().to_string();
1401 let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
1402
1403 upload_manager.enqueue_ping(
1405 &glean,
1406 PingPayload {
1407 document_id: doc_id.clone(),
1408 upload_path: path.clone(),
1409 json_body: "".into(),
1410 headers: None,
1411 body_has_info_sections: true,
1412 ping_name: "test-ping".into(),
1413 uploader_capabilities: vec![],
1414 },
1415 );
1416 upload_manager.enqueue_ping(
1417 &glean,
1418 PingPayload {
1419 document_id: doc_id,
1420 upload_path: path,
1421 json_body: "".into(),
1422 headers: None,
1423 body_has_info_sections: true,
1424 ping_name: "test-ping".into(),
1425 uploader_capabilities: vec![],
1426 },
1427 );
1428
1429 let task = upload_manager.get_upload_task(&glean, false);
1431 assert!(task.is_upload());
1432
1433 assert_eq!(
1435 upload_manager.get_upload_task(&glean, false),
1436 PingUploadTask::done()
1437 );
1438 }
1439
1440 #[test]
1441 fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
1442 let (mut glean, dir) = new_glean(None);
1443
1444 let ping_type = PingType::new(
1446 "test",
1447 true,
1448 true,
1449 true,
1450 true,
1451 true,
1452 vec![],
1453 vec![],
1454 true,
1455 vec![],
1456 );
1457 glean.register_ping_type(&ping_type);
1458
1459 let n = 5;
1461 for _ in 0..n {
1462 ping_type.submit_sync(&glean, None);
1463 }
1464
1465 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1466
1467 let max_recoverable_failures = 3;
1469 upload_manager
1470 .policy
1471 .set_max_recoverable_failures(Some(max_recoverable_failures));
1472
1473 for _ in 0..max_recoverable_failures {
1475 match upload_manager.get_upload_task(&glean, false) {
1476 PingUploadTask::Upload { request } => {
1477 upload_manager.process_ping_upload_response(
1478 &glean,
1479 &request.document_id,
1480 UploadResult::recoverable_failure(),
1481 );
1482 }
1483 _ => panic!("Expected upload manager to return the next request!"),
1484 }
1485 }
1486
1487 assert_eq!(
1490 upload_manager.get_upload_task(&glean, false),
1491 PingUploadTask::done()
1492 );
1493
1494 for _ in 0..n {
1496 let task = upload_manager.get_upload_task(&glean, false);
1497 assert!(task.is_upload());
1498 }
1499 }
1500
1501 #[test]
1502 fn quota_is_enforced_when_enqueueing_cached_pings() {
1503 let (mut glean, dir) = new_glean(None);
1504
1505 let ping_type = PingType::new(
1507 "test",
1508 true,
1509 true,
1510 true,
1511 true,
1512 true,
1513 vec![],
1514 vec![],
1515 true,
1516 vec![],
1517 );
1518 glean.register_ping_type(&ping_type);
1519
1520 let n = 10;
1522 for _ in 0..n {
1523 ping_type.submit_sync(&glean, None);
1524 }
1525
1526 let directory_manager = PingDirectoryManager::new(dir.path());
1527 let pending_pings = directory_manager.process_dirs().pending_pings;
1528 let (_, newest_ping) = &pending_pings.last().unwrap();
1531 let PingPayload {
1532 document_id: newest_ping_id,
1533 ..
1534 } = &newest_ping;
1535
1536 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1538
1539 upload_manager
1546 .policy
1547 .set_max_pending_pings_directory_size(Some(500));
1548
1549 match upload_manager.get_upload_task(&glean, false) {
1553 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, newest_ping_id),
1554 _ => panic!("Expected upload manager to return the next request!"),
1555 }
1556
1557 assert_eq!(
1560 upload_manager.get_upload_task(&glean, false),
1561 PingUploadTask::done()
1562 );
1563
1564 assert_eq!(
1566 n - 1,
1567 upload_manager
1568 .upload_metrics
1569 .deleted_pings_after_quota_hit
1570 .get_value(&glean, Some("metrics"))
1571 .unwrap()
1572 );
1573 assert_eq!(
1574 n,
1575 upload_manager
1576 .upload_metrics
1577 .pending_pings
1578 .get_value(&glean, Some("metrics"))
1579 .unwrap()
1580 );
1581 }
1582
1583 #[test]
1584 fn number_quota_is_enforced_when_enqueueing_cached_pings() {
1585 let (mut glean, dir) = new_glean(None);
1586
1587 let ping_type = PingType::new(
1589 "test",
1590 true,
1591 true,
1592 true,
1593 true,
1594 true,
1595 vec![],
1596 vec![],
1597 true,
1598 vec![],
1599 );
1600 glean.register_ping_type(&ping_type);
1601
1602 let count_quota = 3;
1604 let n = 10;
1606
1607 for _ in 0..n {
1609 ping_type.submit_sync(&glean, None);
1610 }
1611
1612 let directory_manager = PingDirectoryManager::new(dir.path());
1613 let pending_pings = directory_manager.process_dirs().pending_pings;
1614 let expected_pings = pending_pings
1617 .iter()
1618 .rev()
1619 .take(count_quota)
1620 .map(|(_, ping)| ping.document_id.clone())
1621 .collect::<Vec<_>>();
1622
1623 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1625
1626 upload_manager
1627 .policy
1628 .set_max_pending_pings_count(Some(count_quota as u64));
1629
1630 for ping_id in expected_pings.iter().rev() {
1634 match upload_manager.get_upload_task(&glean, false) {
1635 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1636 _ => panic!("Expected upload manager to return the next request!"),
1637 }
1638 }
1639
1640 assert_eq!(
1643 upload_manager.get_upload_task(&glean, false),
1644 PingUploadTask::done()
1645 );
1646
1647 assert_eq!(
1649 (n - count_quota) as i32,
1650 upload_manager
1651 .upload_metrics
1652 .deleted_pings_after_quota_hit
1653 .get_value(&glean, Some("metrics"))
1654 .unwrap()
1655 );
1656 assert_eq!(
1657 n as i32,
1658 upload_manager
1659 .upload_metrics
1660 .pending_pings
1661 .get_value(&glean, Some("metrics"))
1662 .unwrap()
1663 );
1664 }
1665
1666 #[test]
1667 fn size_and_count_quota_work_together_size_first() {
1668 let (mut glean, dir) = new_glean(None);
1669
1670 let ping_type = PingType::new(
1672 "test",
1673 true,
1674 true,
1675 true,
1676 true,
1677 true,
1678 vec![],
1679 vec![],
1680 true,
1681 vec![],
1682 );
1683 glean.register_ping_type(&ping_type);
1684
1685 let expected_number_of_pings = 3;
1686 let n = 10;
1688
1689 for _ in 0..n {
1691 ping_type.submit_sync(&glean, None);
1692 }
1693
1694 let directory_manager = PingDirectoryManager::new(dir.path());
1695 let pending_pings = directory_manager.process_dirs().pending_pings;
1696 let expected_pings = pending_pings
1699 .iter()
1700 .rev()
1701 .take(expected_number_of_pings)
1702 .map(|(_, ping)| ping.document_id.clone())
1703 .collect::<Vec<_>>();
1704
1705 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1707
1708 upload_manager
1711 .policy
1712 .set_max_pending_pings_directory_size(Some(1300));
1713 upload_manager.policy.set_max_pending_pings_count(Some(5));
1714
1715 for ping_id in expected_pings.iter().rev() {
1719 match upload_manager.get_upload_task(&glean, false) {
1720 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1721 _ => panic!("Expected upload manager to return the next request!"),
1722 }
1723 }
1724
1725 assert_eq!(
1728 upload_manager.get_upload_task(&glean, false),
1729 PingUploadTask::done()
1730 );
1731
1732 assert_eq!(
1734 (n - expected_number_of_pings) as i32,
1735 upload_manager
1736 .upload_metrics
1737 .deleted_pings_after_quota_hit
1738 .get_value(&glean, Some("metrics"))
1739 .unwrap()
1740 );
1741 assert_eq!(
1742 n as i32,
1743 upload_manager
1744 .upload_metrics
1745 .pending_pings
1746 .get_value(&glean, Some("metrics"))
1747 .unwrap()
1748 );
1749 }
1750
1751 #[test]
1752 fn size_and_count_quota_work_together_count_first() {
1753 let (mut glean, dir) = new_glean(None);
1754
1755 let ping_type = PingType::new(
1757 "test",
1758 true,
1759 true,
1760 true,
1761 true,
1762 true,
1763 vec![],
1764 vec![],
1765 true,
1766 vec![],
1767 );
1768 glean.register_ping_type(&ping_type);
1769
1770 let expected_number_of_pings = 2;
1771 let n = 10;
1773
1774 for _ in 0..n {
1776 ping_type.submit_sync(&glean, None);
1777 }
1778
1779 let directory_manager = PingDirectoryManager::new(dir.path());
1780 let pending_pings = directory_manager.process_dirs().pending_pings;
1781 let expected_pings = pending_pings
1784 .iter()
1785 .rev()
1786 .take(expected_number_of_pings)
1787 .map(|(_, ping)| ping.document_id.clone())
1788 .collect::<Vec<_>>();
1789
1790 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1792
1793 upload_manager
1796 .policy
1797 .set_max_pending_pings_directory_size(Some(1000));
1798 upload_manager.policy.set_max_pending_pings_count(Some(2));
1799
1800 for ping_id in expected_pings.iter().rev() {
1804 match upload_manager.get_upload_task(&glean, false) {
1805 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1806 _ => panic!("Expected upload manager to return the next request!"),
1807 }
1808 }
1809
1810 assert_eq!(
1813 upload_manager.get_upload_task(&glean, false),
1814 PingUploadTask::done()
1815 );
1816
1817 assert_eq!(
1819 (n - expected_number_of_pings) as i32,
1820 upload_manager
1821 .upload_metrics
1822 .deleted_pings_after_quota_hit
1823 .get_value(&glean, Some("metrics"))
1824 .unwrap()
1825 );
1826 assert_eq!(
1827 n as i32,
1828 upload_manager
1829 .upload_metrics
1830 .pending_pings
1831 .get_value(&glean, Some("metrics"))
1832 .unwrap()
1833 );
1834 }
1835
1836 #[test]
1837 fn maximum_wait_attemps_is_enforced() {
1838 let (glean, dir) = new_glean(None);
1839
1840 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1841
1842 let max_wait_attempts = 3;
1844 upload_manager
1845 .policy
1846 .set_max_wait_attempts(Some(max_wait_attempts));
1847
1848 let secs_per_interval = 5;
1854 let max_pings_per_interval = 1;
1855 upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
1856
1857 upload_manager.enqueue_ping(
1859 &glean,
1860 PingPayload {
1861 document_id: Uuid::new_v4().to_string(),
1862 upload_path: PATH.into(),
1863 json_body: "".into(),
1864 headers: None,
1865 body_has_info_sections: true,
1866 ping_name: "ping-name".into(),
1867 uploader_capabilities: vec![],
1868 },
1869 );
1870 upload_manager.enqueue_ping(
1871 &glean,
1872 PingPayload {
1873 document_id: Uuid::new_v4().to_string(),
1874 upload_path: PATH.into(),
1875 json_body: "".into(),
1876 headers: None,
1877 body_has_info_sections: true,
1878 ping_name: "ping-name".into(),
1879 uploader_capabilities: vec![],
1880 },
1881 );
1882
1883 match upload_manager.get_upload_task(&glean, false) {
1885 PingUploadTask::Upload { .. } => {}
1886 _ => panic!("Expected upload manager to return the next request!"),
1887 }
1888
1889 for _ in 0..max_wait_attempts {
1893 let task = upload_manager.get_upload_task(&glean, false);
1894 assert!(task.is_wait());
1895 }
1896
1897 assert_eq!(
1900 upload_manager.get_upload_task(&glean, false),
1901 PingUploadTask::done()
1902 );
1903
1904 thread::sleep(Duration::from_secs(secs_per_interval));
1906
1907 let task = upload_manager.get_upload_task(&glean, false);
1909 assert!(task.is_upload());
1910
1911 assert_eq!(
1913 upload_manager.get_upload_task(&glean, false),
1914 PingUploadTask::done()
1915 );
1916 }
1917
1918 #[test]
1919 fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
1920 let (glean, dir) = new_glean(None);
1921 let upload_manager = PingUploadManager::new(dir.path(), "test");
1922 match upload_manager.get_upload_task(&glean, false) {
1923 PingUploadTask::Wait { time } => {
1924 assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
1925 }
1926 _ => panic!("Expected upload manager to return a wait task!"),
1927 };
1928 }
1929
1930 #[test]
1931 fn cannot_enqueue_ping_while_its_being_processed() {
1932 let (glean, dir) = new_glean(None);
1933
1934 let upload_manager = PingUploadManager::no_policy(dir.path());
1935
1936 let identifier = &Uuid::new_v4();
1938 let ping = PingPayload {
1939 document_id: identifier.to_string(),
1940 upload_path: PATH.into(),
1941 json_body: "".into(),
1942 headers: None,
1943 body_has_info_sections: true,
1944 ping_name: "ping-name".into(),
1945 uploader_capabilities: vec![],
1946 };
1947 upload_manager.enqueue_ping(&glean, ping);
1948 assert!(upload_manager.get_upload_task(&glean, false).is_upload());
1949
1950 let ping = PingPayload {
1952 document_id: identifier.to_string(),
1953 upload_path: PATH.into(),
1954 json_body: "".into(),
1955 headers: None,
1956 body_has_info_sections: true,
1957 ping_name: "ping-name".into(),
1958 uploader_capabilities: vec![],
1959 };
1960 upload_manager.enqueue_ping(&glean, ping);
1961
1962 assert_eq!(
1964 upload_manager.get_upload_task(&glean, false),
1965 PingUploadTask::done()
1966 );
1967
1968 upload_manager.process_ping_upload_response(
1970 &glean,
1971 &identifier.to_string(),
1972 UploadResult::http_status(200),
1973 );
1974 }
1975}