1use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::mem;
18use std::path::PathBuf;
19use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
20use std::sync::{Arc, RwLock, RwLockWriteGuard};
21use std::time::{Duration, Instant};
22
23use chrono::Utc;
24use malloc_size_of::MallocSizeOf;
25use malloc_size_of_derive::MallocSizeOf;
26
27use crate::error::ErrorKind;
28use crate::TimerId;
29use crate::{internal_metrics::UploadMetrics, Glean};
30pub use directory::process_metadata;
31use directory::{PingDirectoryManager, PingPayloadsByDirectory};
32use policy::Policy;
33use request::create_date_header_value;
34
35pub use directory::{PingMetadata, PingPayload};
36pub use request::{HeaderMap, PingRequest};
37pub use result::{UploadResult, UploadTaskAction};
38
39mod directory;
40mod policy;
41mod request;
42mod result;
43
44const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1000; #[derive(Debug, MallocSizeOf)]
47struct RateLimiter {
48 started: Option<Instant>,
50 count: u32,
52 interval: Duration,
54 max_count: u32,
56}
57
58#[derive(PartialEq)]
60enum RateLimiterState {
61 Incrementing,
63 Throttled(u64),
68}
69
70impl RateLimiter {
71 pub fn new(interval: Duration, max_count: u32) -> Self {
72 Self {
73 started: None,
74 count: 0,
75 interval,
76 max_count,
77 }
78 }
79
80 fn reset(&mut self) {
81 self.started = Some(Instant::now());
82 self.count = 0;
83 }
84
85 fn elapsed(&self) -> Duration {
86 self.started.unwrap().elapsed()
87 }
88
89 fn should_reset(&self) -> bool {
95 if self.started.is_none() {
96 return true;
97 }
98
99 if self.elapsed() > self.interval {
101 return true;
102 }
103
104 false
105 }
106
107 pub fn get_state(&mut self) -> RateLimiterState {
113 if self.should_reset() {
114 self.reset();
115 }
116
117 if self.count == self.max_count {
118 let remaining = self.interval.as_millis() - self.elapsed().as_millis();
121 return RateLimiterState::Throttled(
122 remaining
123 .try_into()
124 .unwrap_or(self.interval.as_secs() * 1000),
125 );
126 }
127
128 self.count += 1;
129 RateLimiterState::Incrementing
130 }
131}
132
133#[derive(PartialEq, Eq, Debug)]
138pub enum PingUploadTask {
139 Upload {
141 request: PingRequest,
144 },
145
146 Wait {
149 time: u64,
152 },
153
154 Done {
167 #[doc(hidden)]
168 unused: i8,
170 },
171}
172
173impl PingUploadTask {
174 pub fn is_upload(&self) -> bool {
176 matches!(self, PingUploadTask::Upload { .. })
177 }
178
179 pub fn is_wait(&self) -> bool {
181 matches!(self, PingUploadTask::Wait { .. })
182 }
183
184 pub(crate) fn done() -> Self {
185 PingUploadTask::Done { unused: 0 }
186 }
187}
188
189#[derive(Debug)]
191pub struct PingUploadManager {
192 queue: RwLock<VecDeque<PingRequest>>,
194 directory_manager: PingDirectoryManager,
196 processed_pending_pings: Arc<AtomicBool>,
198 cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
200 recoverable_failure_count: AtomicU32,
202 wait_attempt_count: AtomicU32,
204 rate_limiter: Option<RwLock<RateLimiter>>,
209 language_binding_name: String,
213 upload_metrics: UploadMetrics,
215 policy: Policy,
217
218 in_flight: RwLock<HashMap<String, (TimerId, TimerId)>>,
219}
220
221impl MallocSizeOf for PingUploadManager {
222 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
223 let shallow_size = {
224 let queue = self.queue.read().unwrap();
225 if ops.has_malloc_enclosing_size_of() {
226 if let Some(front) = queue.front() {
227 unsafe { ops.malloc_enclosing_size_of(front) }
230 } else {
231 0
233 }
234 } else {
235 queue.capacity() * mem::size_of::<PingRequest>()
239 }
240 };
241
242 let mut n = shallow_size
243 + self.directory_manager.size_of(ops)
244 + unsafe { ops.malloc_size_of(self.processed_pending_pings.as_ptr()) }
246 + self.cached_pings.read().unwrap().size_of(ops)
247 + self.rate_limiter.as_ref().map(|rl| {
248 let lock = rl.read().unwrap();
249 (*lock).size_of(ops)
250 }).unwrap_or(0)
251 + self.language_binding_name.size_of(ops)
252 + self.upload_metrics.size_of(ops)
253 + self.policy.size_of(ops);
254
255 let in_flight = self.in_flight.read().unwrap();
256 n += in_flight.size_of(ops);
257
258 n
259 }
260}
261
262impl PingUploadManager {
263 pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
274 Self {
275 queue: RwLock::new(VecDeque::new()),
276 directory_manager: PingDirectoryManager::new(data_path),
277 processed_pending_pings: Arc::new(AtomicBool::new(false)),
278 cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
279 recoverable_failure_count: AtomicU32::new(0),
280 wait_attempt_count: AtomicU32::new(0),
281 rate_limiter: None,
282 language_binding_name: language_binding_name.into(),
283 upload_metrics: UploadMetrics::new(),
284 policy: Policy::default(),
285 in_flight: RwLock::new(HashMap::default()),
286 }
287 }
288
289 pub fn scan_pending_pings_directories(
296 &self,
297 trigger_upload: bool,
298 ) -> std::thread::JoinHandle<()> {
299 let local_manager = self.directory_manager.clone();
300 let local_cached_pings = self.cached_pings.clone();
301 let local_flag = self.processed_pending_pings.clone();
302 crate::thread::spawn("glean.ping_directory_manager.process_dir", move || {
303 {
304 let mut local_cached_pings = local_cached_pings
306 .write()
307 .expect("Can't write to pending pings cache.");
308 local_cached_pings.extend(local_manager.process_dirs());
309 local_flag.store(true, Ordering::SeqCst);
310 }
311 if trigger_upload {
312 crate::dispatcher::launch(|| {
313 if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok()) {
314 if let Err(e) = state.callbacks.trigger_upload() {
315 log::error!(
316 "Triggering upload after pending ping scan failed. Error: {}",
317 e
318 );
319 }
320 }
321 });
322 }
323 })
324 .expect("Unable to spawn thread to process pings directories.")
325 }
326
327 #[cfg(test)]
329 pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
330 let mut upload_manager = Self::new(data_path, "Test");
331
332 upload_manager.policy.set_max_recoverable_failures(None);
334 upload_manager.policy.set_max_wait_attempts(None);
335 upload_manager.policy.set_max_ping_body_size(None);
336 upload_manager
337 .policy
338 .set_max_pending_pings_directory_size(None);
339 upload_manager.policy.set_max_pending_pings_count(None);
340
341 upload_manager
343 .scan_pending_pings_directories(false)
344 .join()
345 .unwrap();
346
347 upload_manager
348 }
349
350 fn processed_pending_pings(&self) -> bool {
351 self.processed_pending_pings.load(Ordering::SeqCst)
352 }
353
354 fn recoverable_failure_count(&self) -> u32 {
355 self.recoverable_failure_count.load(Ordering::SeqCst)
356 }
357
358 fn wait_attempt_count(&self) -> u32 {
359 self.wait_attempt_count.load(Ordering::SeqCst)
360 }
361
362 fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
367 let PingPayload {
368 document_id,
369 upload_path: path,
370 json_body: body,
371 headers,
372 body_has_info_sections,
373 ping_name,
374 uploader_capabilities,
375 } = ping;
376 let mut request = PingRequest::builder(
377 &self.language_binding_name,
378 self.policy.max_ping_body_size(),
379 )
380 .document_id(&document_id)
381 .path(path)
382 .body(body)
383 .body_has_info_sections(body_has_info_sections)
384 .ping_name(ping_name)
385 .uploader_capabilities(uploader_capabilities);
386
387 if let Some(headers) = headers {
388 request = request.headers(headers);
389 }
390
391 match request.build() {
392 Ok(request) => Some(request),
393 Err(e) => {
394 log::warn!("Error trying to build ping request: {}", e);
395 self.directory_manager.delete_file(&document_id);
396
397 if let ErrorKind::PingBodyOverflow(s) = e.kind() {
400 self.upload_metrics
401 .discarded_exceeding_pings_size
402 .accumulate_sync(glean, *s as i64 / 1024);
403 }
404
405 None
406 }
407 }
408 }
409
410 pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
412 let mut queue = self
413 .queue
414 .write()
415 .expect("Can't write to pending pings queue.");
416
417 let PingPayload {
418 ref document_id,
419 upload_path: ref path,
420 ..
421 } = ping;
422 if queue
424 .iter()
425 .any(|request| request.document_id.as_str() == document_id)
426 {
427 log::warn!(
428 "Attempted to enqueue a duplicate ping {} at {}.",
429 document_id,
430 path
431 );
432 return;
433 }
434
435 {
436 let in_flight = self.in_flight.read().unwrap();
437 if in_flight.contains_key(document_id) {
438 log::warn!(
439 "Attempted to enqueue an in-flight ping {} at {}.",
440 document_id,
441 path
442 );
443 self.upload_metrics
444 .in_flight_pings_dropped
445 .add_sync(glean, 0);
446 return;
447 }
448 }
449
450 log::trace!("Enqueuing ping {} at {}", document_id, path);
451 if let Some(request) = self.build_ping_request(glean, ping) {
452 queue.push_back(request)
453 }
454 }
455
456 fn enqueue_cached_pings(&self, glean: &Glean) {
473 let mut cached_pings = self
474 .cached_pings
475 .write()
476 .expect("Can't write to pending pings cache.");
477
478 if cached_pings.len() > 0 {
479 let mut pending_pings_directory_size: u64 = 0;
480 let mut pending_pings_count = 0;
481 let mut deleting = false;
482
483 let total = cached_pings.pending_pings.len() as u64;
484 self.upload_metrics
485 .pending_pings
486 .add_sync(glean, total.try_into().unwrap_or(0));
487
488 if total > self.policy.max_pending_pings_count() {
489 log::warn!(
490 "More than {} pending pings in the directory, will delete {} old pings.",
491 self.policy.max_pending_pings_count(),
492 total - self.policy.max_pending_pings_count()
493 );
494 }
495
496 cached_pings.pending_pings.reverse();
502 cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
503 pending_pings_count += 1;
504 pending_pings_directory_size += file_size;
505
506 if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
508 log::warn!(
509 "Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
510 self.policy.max_pending_pings_directory_size()
511 );
512 deleting = true;
513 }
514
515 if pending_pings_count > self.policy.max_pending_pings_count() {
519 deleting = true;
520 }
521
522 if deleting && self.directory_manager.delete_file(document_id) {
523 self.upload_metrics
524 .deleted_pings_after_quota_hit
525 .add_sync(glean, 1);
526 return false;
527 }
528
529 true
530 });
531 cached_pings.pending_pings.reverse();
534 self.upload_metrics
535 .pending_pings_directory_size
536 .accumulate_sync(glean, pending_pings_directory_size as i64 / 1024);
537
538 cached_pings
541 .deletion_request_pings
542 .drain(..)
543 .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
544 cached_pings
545 .pending_pings
546 .drain(..)
547 .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
548 }
549 }
550
551 pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
564 self.rate_limiter = Some(RwLock::new(RateLimiter::new(
565 Duration::from_secs(interval),
566 max_tasks,
567 )));
568 }
569
570 pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
579 if let Some(ping) = self.directory_manager.process_file(document_id) {
580 self.enqueue_ping(glean, ping);
581 }
582 }
583
584 pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
586 log::trace!("Clearing ping queue");
587 let mut queue = self
588 .queue
589 .write()
590 .expect("Can't write to pending pings queue.");
591
592 queue.retain(|ping| ping.is_deletion_request());
593 log::trace!(
594 "{} pings left in the queue (only deletion-request expected)",
595 queue.len()
596 );
597 queue
598 }
599
600 fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
601 let wait_or_done = |time: u64| {
606 self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
607 if self.wait_attempt_count() > self.policy.max_wait_attempts() {
608 PingUploadTask::done()
609 } else {
610 PingUploadTask::Wait { time }
611 }
612 };
613
614 if !self.processed_pending_pings() {
615 log::info!(
616 "Tried getting an upload task, but processing is ongoing. Will come back later."
617 );
618 return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
619 }
620
621 self.enqueue_cached_pings(glean);
623
624 if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
625 log::warn!(
626 "Reached maximum recoverable failures for the current uploading window. You are done."
627 );
628 return PingUploadTask::done();
629 }
630
631 let mut queue = self
632 .queue
633 .write()
634 .expect("Can't write to pending pings queue.");
635 match queue.front() {
636 Some(request) => {
637 if let Some(rate_limiter) = &self.rate_limiter {
638 let mut rate_limiter = rate_limiter
639 .write()
640 .expect("Can't write to the rate limiter.");
641 if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
642 log::info!(
643 "Tried getting an upload task, but we are throttled at the moment."
644 );
645 return wait_or_done(remaining);
646 }
647 }
648
649 log::info!(
650 "New upload task with id {} (path: {})",
651 request.document_id,
652 request.path
653 );
654
655 if log_ping {
656 if let Some(body) = request.pretty_body() {
657 chunked_log_info(&request.path, &body);
658 } else {
659 chunked_log_info(&request.path, "<invalid ping payload>");
660 }
661 }
662
663 {
664 let mut in_flight = self.in_flight.write().unwrap();
668 let success_id = self.upload_metrics.send_success.start_sync();
669 let failure_id = self.upload_metrics.send_failure.start_sync();
670 in_flight.insert(request.document_id.clone(), (success_id, failure_id));
671 }
672
673 let mut request = queue.pop_front().unwrap();
674
675 request
677 .headers
678 .insert("Date".to_string(), create_date_header_value(Utc::now()));
679
680 PingUploadTask::Upload { request }
681 }
682 None => {
683 log::info!("No more pings to upload! You are done.");
684 PingUploadTask::done()
685 }
686 }
687 }
688
689 pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
700 let task = self.get_upload_task_internal(glean, log_ping);
701
702 if !task.is_wait() && self.wait_attempt_count() > 0 {
703 self.wait_attempt_count.store(0, Ordering::SeqCst);
704 }
705
706 if !task.is_upload() && self.recoverable_failure_count() > 0 {
707 self.recoverable_failure_count.store(0, Ordering::SeqCst);
708 }
709
710 task
711 }
712
713 pub fn process_ping_upload_response(
752 &self,
753 glean: &Glean,
754 document_id: &str,
755 status: UploadResult,
756 ) -> UploadTaskAction {
757 use UploadResult::*;
758
759 let stop_time = zeitstempel::now();
760
761 if let Some(label) = status.get_label() {
762 let metric = self.upload_metrics.ping_upload_failure.get(label);
763 metric.add_sync(glean, 1);
764 }
765
766 let send_ids = {
767 let mut lock = self.in_flight.write().unwrap();
768 lock.remove(document_id)
769 };
770
771 if send_ids.is_none() {
772 self.upload_metrics.missing_send_ids.add_sync(glean, 1);
773 }
774
775 match status {
776 HttpStatus { code } if (200..=299).contains(&code) => {
777 log::info!("Ping {} successfully sent {}.", document_id, code);
778 if let Some((success_id, failure_id)) = send_ids {
779 self.upload_metrics
780 .send_success
781 .set_stop_and_accumulate(glean, success_id, stop_time);
782 self.upload_metrics.send_failure.cancel_sync(failure_id);
783 }
784 self.directory_manager.delete_file(document_id);
785 }
786
787 UnrecoverableFailure { .. } | HttpStatus { code: 400..=499 } | Incapable { .. } => {
788 log::warn!(
789 "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
790 document_id,
791 status
792 );
793 if let Some((success_id, failure_id)) = send_ids {
794 self.upload_metrics.send_success.cancel_sync(success_id);
795 self.upload_metrics
796 .send_failure
797 .set_stop_and_accumulate(glean, failure_id, stop_time);
798 }
799 self.directory_manager.delete_file(document_id);
800 }
801
802 RecoverableFailure { .. } | HttpStatus { .. } => {
803 log::warn!(
804 "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
805 document_id,
806 status
807 );
808 if let Some((success_id, failure_id)) = send_ids {
809 self.upload_metrics.send_success.cancel_sync(success_id);
810 self.upload_metrics
811 .send_failure
812 .set_stop_and_accumulate(glean, failure_id, stop_time);
813 }
814 self.enqueue_ping_from_file(glean, document_id);
815 self.recoverable_failure_count
816 .fetch_add(1, Ordering::SeqCst);
817 }
818
819 Done { .. } => {
820 log::debug!("Uploader signaled Done. Exiting.");
821 if let Some((success_id, failure_id)) = send_ids {
822 self.upload_metrics.send_success.cancel_sync(success_id);
823 self.upload_metrics.send_failure.cancel_sync(failure_id);
824 }
825 return UploadTaskAction::End;
826 }
827 };
828
829 UploadTaskAction::Next
830 }
831}
832
833#[cfg(target_os = "android")]
835pub fn chunked_log_info(path: &str, payload: &str) {
836 const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;
840
841 if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
845 log::info!("Glean ping to URL: {}\n{}", path, payload);
846 return;
847 }
848
849 let mut start = 0;
852 let mut end = MAX_LOG_PAYLOAD_SIZE_BYTES;
853 let mut chunk_idx = 1;
854 let total_chunks = payload.len() / MAX_LOG_PAYLOAD_SIZE_BYTES + 1;
856
857 while end < payload.len() {
858 for _ in 0..4 {
861 if payload.is_char_boundary(end) {
862 break;
863 }
864 end -= 1;
865 }
866
867 log::info!(
868 "Glean ping to URL: {} [Part {} of {}]\n{}",
869 path,
870 chunk_idx,
871 total_chunks,
872 &payload[start..end]
873 );
874
875 start = end;
877 end = end + MAX_LOG_PAYLOAD_SIZE_BYTES;
878 chunk_idx += 1;
879 }
880
881 if start < payload.len() {
883 log::info!(
884 "Glean ping to URL: {} [Part {} of {}]\n{}",
885 path,
886 chunk_idx,
887 total_chunks,
888 &payload[start..]
889 );
890 }
891}
892
893#[cfg(not(target_os = "android"))]
895pub fn chunked_log_info(_path: &str, payload: &str) {
896 log::info!("{}", payload)
897}
898
899#[cfg(test)]
900mod test {
901 use std::thread;
902 use uuid::Uuid;
903
904 use super::*;
905 use crate::metrics::PingType;
906 use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
907
908 const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
909
910 #[test]
911 fn doesnt_error_when_there_are_no_pending_pings() {
912 let (glean, _t) = new_glean(None);
913
914 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
917 }
918
919 #[test]
920 fn returns_ping_request_when_there_is_one() {
921 let (glean, dir) = new_glean(None);
922
923 let upload_manager = PingUploadManager::no_policy(dir.path());
924
925 upload_manager.enqueue_ping(
927 &glean,
928 PingPayload {
929 document_id: Uuid::new_v4().to_string(),
930 upload_path: PATH.into(),
931 json_body: "".into(),
932 headers: None,
933 body_has_info_sections: true,
934 ping_name: "ping-name".into(),
935 uploader_capabilities: vec![],
936 },
937 );
938
939 let task = upload_manager.get_upload_task(&glean, false);
942 assert!(task.is_upload());
943 }
944
945 #[test]
946 fn returns_as_many_ping_requests_as_there_are() {
947 let (glean, dir) = new_glean(None);
948
949 let upload_manager = PingUploadManager::no_policy(dir.path());
950
951 let n = 10;
953 for _ in 0..n {
954 upload_manager.enqueue_ping(
955 &glean,
956 PingPayload {
957 document_id: Uuid::new_v4().to_string(),
958 upload_path: PATH.into(),
959 json_body: "".into(),
960 headers: None,
961 body_has_info_sections: true,
962 ping_name: "ping-name".into(),
963 uploader_capabilities: vec![],
964 },
965 );
966 }
967
968 for _ in 0..n {
970 let task = upload_manager.get_upload_task(&glean, false);
971 assert!(task.is_upload());
972 }
973
974 assert_eq!(
976 upload_manager.get_upload_task(&glean, false),
977 PingUploadTask::done()
978 );
979 }
980
981 #[test]
982 fn limits_the_number_of_pings_when_there_is_rate_limiting() {
983 let (glean, dir) = new_glean(None);
984
985 let mut upload_manager = PingUploadManager::no_policy(dir.path());
986
987 let max_pings_per_interval = 10;
989 upload_manager.set_rate_limiter(3, 10);
990
991 for _ in 0..max_pings_per_interval {
993 upload_manager.enqueue_ping(
994 &glean,
995 PingPayload {
996 document_id: Uuid::new_v4().to_string(),
997 upload_path: PATH.into(),
998 json_body: "".into(),
999 headers: None,
1000 body_has_info_sections: true,
1001 ping_name: "ping-name".into(),
1002 uploader_capabilities: vec![],
1003 },
1004 );
1005 }
1006
1007 for _ in 0..max_pings_per_interval {
1009 let task = upload_manager.get_upload_task(&glean, false);
1010 assert!(task.is_upload());
1011 }
1012
1013 upload_manager.enqueue_ping(
1015 &glean,
1016 PingPayload {
1017 document_id: Uuid::new_v4().to_string(),
1018 upload_path: PATH.into(),
1019 json_body: "".into(),
1020 headers: None,
1021 body_has_info_sections: true,
1022 ping_name: "ping-name".into(),
1023 uploader_capabilities: vec![],
1024 },
1025 );
1026
1027 match upload_manager.get_upload_task(&glean, false) {
1029 PingUploadTask::Wait { time } => {
1030 thread::sleep(Duration::from_millis(time));
1032 }
1033 _ => panic!("Expected upload manager to return a wait task!"),
1034 };
1035
1036 let task = upload_manager.get_upload_task(&glean, false);
1037 assert!(task.is_upload());
1038 }
1039
1040 #[test]
1041 fn clearing_the_queue_works_correctly() {
1042 let (glean, dir) = new_glean(None);
1043
1044 let upload_manager = PingUploadManager::no_policy(dir.path());
1045
1046 for _ in 0..10 {
1048 upload_manager.enqueue_ping(
1049 &glean,
1050 PingPayload {
1051 document_id: Uuid::new_v4().to_string(),
1052 upload_path: PATH.into(),
1053 json_body: "".into(),
1054 headers: None,
1055 body_has_info_sections: true,
1056 ping_name: "ping-name".into(),
1057 uploader_capabilities: vec![],
1058 },
1059 );
1060 }
1061
1062 drop(upload_manager.clear_ping_queue());
1064
1065 assert_eq!(
1067 upload_manager.get_upload_task(&glean, false),
1068 PingUploadTask::done()
1069 );
1070 }
1071
1072 #[test]
1073 fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
1074 let (mut glean, _t) = new_glean(None);
1075
1076 let ping_type = PingType::new(
1078 "test",
1079 true,
1080 true,
1081 true,
1082 true,
1083 true,
1084 vec![],
1085 vec![],
1086 true,
1087 vec![],
1088 );
1089 glean.register_ping_type(&ping_type);
1090
1091 let n = 10;
1093 for _ in 0..n {
1094 ping_type.submit_sync(&glean, None);
1095 }
1096
1097 glean
1098 .internal_pings
1099 .deletion_request
1100 .submit_sync(&glean, None);
1101
1102 drop(glean.upload_manager.clear_ping_queue());
1104
1105 let upload_task = glean.get_upload_task();
1106 match upload_task {
1107 PingUploadTask::Upload { request } => assert!(request.is_deletion_request()),
1108 _ => panic!("Expected upload manager to return the next request!"),
1109 }
1110
1111 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1113 }
1114
1115 #[test]
1116 fn fills_up_queue_successfully_from_disk() {
1117 let (mut glean, dir) = new_glean(None);
1118
1119 let ping_type = PingType::new(
1121 "test",
1122 true,
1123 true,
1124 true,
1125 true,
1126 true,
1127 vec![],
1128 vec![],
1129 true,
1130 vec![],
1131 );
1132 glean.register_ping_type(&ping_type);
1133
1134 let n = 10;
1136 for _ in 0..n {
1137 ping_type.submit_sync(&glean, None);
1138 }
1139
1140 let upload_manager = PingUploadManager::no_policy(dir.path());
1142
1143 for _ in 0..n {
1145 let task = upload_manager.get_upload_task(&glean, false);
1146 assert!(task.is_upload());
1147 }
1148
1149 assert_eq!(
1151 upload_manager.get_upload_task(&glean, false),
1152 PingUploadTask::done()
1153 );
1154 }
1155
1156 #[test]
1157 fn processes_correctly_success_upload_response() {
1158 let (mut glean, dir) = new_glean(None);
1159
1160 let ping_type = PingType::new(
1162 "test",
1163 true,
1164 true,
1165 true,
1166 true,
1167 true,
1168 vec![],
1169 vec![],
1170 true,
1171 vec![],
1172 );
1173 glean.register_ping_type(&ping_type);
1174
1175 ping_type.submit_sync(&glean, None);
1177
1178 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1180
1181 match glean.get_upload_task() {
1183 PingUploadTask::Upload { request } => {
1184 let document_id = request.document_id;
1186 glean.process_ping_upload_response(&document_id, UploadResult::http_status(200));
1187 assert!(!pending_pings_dir.join(document_id).exists());
1189 }
1190 _ => panic!("Expected upload manager to return the next request!"),
1191 }
1192
1193 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1195 }
1196
1197 #[test]
1198 fn processes_correctly_client_error_upload_response() {
1199 let (mut glean, dir) = new_glean(None);
1200
1201 let ping_type = PingType::new(
1203 "test",
1204 true,
1205 true,
1206 true,
1207 true,
1208 true,
1209 vec![],
1210 vec![],
1211 true,
1212 vec![],
1213 );
1214 glean.register_ping_type(&ping_type);
1215
1216 ping_type.submit_sync(&glean, None);
1218
1219 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1221
1222 match glean.get_upload_task() {
1224 PingUploadTask::Upload { request } => {
1225 let document_id = request.document_id;
1227 glean.process_ping_upload_response(&document_id, UploadResult::http_status(404));
1228 assert!(!pending_pings_dir.join(document_id).exists());
1230 }
1231 _ => panic!("Expected upload manager to return the next request!"),
1232 }
1233
1234 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1236 }
1237
1238 #[test]
1239 fn processes_correctly_server_error_upload_response() {
1240 let (mut glean, _t) = new_glean(None);
1241
1242 let ping_type = PingType::new(
1244 "test",
1245 true,
1246 true,
1247 true,
1248 true,
1249 true,
1250 vec![],
1251 vec![],
1252 true,
1253 vec![],
1254 );
1255 glean.register_ping_type(&ping_type);
1256
1257 ping_type.submit_sync(&glean, None);
1259
1260 match glean.get_upload_task() {
1262 PingUploadTask::Upload { request } => {
1263 let document_id = request.document_id;
1265 glean.process_ping_upload_response(&document_id, UploadResult::http_status(500));
1266 match glean.get_upload_task() {
1268 PingUploadTask::Upload { request } => {
1269 assert_eq!(document_id, request.document_id);
1270 }
1271 _ => panic!("Expected upload manager to return the next request!"),
1272 }
1273 }
1274 _ => panic!("Expected upload manager to return the next request!"),
1275 }
1276
1277 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1279 }
1280
1281 #[test]
1282 fn processes_correctly_unrecoverable_upload_response() {
1283 let (mut glean, dir) = new_glean(None);
1284
1285 let ping_type = PingType::new(
1287 "test",
1288 true,
1289 true,
1290 true,
1291 true,
1292 true,
1293 vec![],
1294 vec![],
1295 true,
1296 vec![],
1297 );
1298 glean.register_ping_type(&ping_type);
1299
1300 ping_type.submit_sync(&glean, None);
1302
1303 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1305
1306 match glean.get_upload_task() {
1308 PingUploadTask::Upload { request } => {
1309 let document_id = request.document_id;
1311 glean.process_ping_upload_response(
1312 &document_id,
1313 UploadResult::unrecoverable_failure(),
1314 );
1315 assert!(!pending_pings_dir.join(document_id).exists());
1317 }
1318 _ => panic!("Expected upload manager to return the next request!"),
1319 }
1320
1321 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1323 }
1324
1325 #[test]
1326 fn new_pings_are_added_while_upload_in_progress() {
1327 let (glean, dir) = new_glean(None);
1328
1329 let upload_manager = PingUploadManager::no_policy(dir.path());
1330
1331 let doc1 = Uuid::new_v4().to_string();
1332 let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
1333
1334 let doc2 = Uuid::new_v4().to_string();
1335 let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
1336
1337 upload_manager.enqueue_ping(
1339 &glean,
1340 PingPayload {
1341 document_id: doc1.clone(),
1342 upload_path: path1,
1343 json_body: "".into(),
1344 headers: None,
1345 body_has_info_sections: true,
1346 ping_name: "test-ping".into(),
1347 uploader_capabilities: vec![],
1348 },
1349 );
1350
1351 let req = match upload_manager.get_upload_task(&glean, false) {
1353 PingUploadTask::Upload { request } => request,
1354 _ => panic!("Expected upload manager to return the next request!"),
1355 };
1356 assert_eq!(doc1, req.document_id);
1357
1358 upload_manager.enqueue_ping(
1360 &glean,
1361 PingPayload {
1362 document_id: doc2.clone(),
1363 upload_path: path2,
1364 json_body: "".into(),
1365 headers: None,
1366 body_has_info_sections: true,
1367 ping_name: "test-ping".into(),
1368 uploader_capabilities: vec![],
1369 },
1370 );
1371
1372 upload_manager.process_ping_upload_response(
1374 &glean,
1375 &req.document_id,
1376 UploadResult::http_status(200),
1377 );
1378
1379 let req = match upload_manager.get_upload_task(&glean, false) {
1381 PingUploadTask::Upload { request } => request,
1382 _ => panic!("Expected upload manager to return the next request!"),
1383 };
1384 assert_eq!(doc2, req.document_id);
1385
1386 upload_manager.process_ping_upload_response(
1388 &glean,
1389 &req.document_id,
1390 UploadResult::http_status(200),
1391 );
1392
1393 assert_eq!(
1395 upload_manager.get_upload_task(&glean, false),
1396 PingUploadTask::done()
1397 );
1398 }
1399
1400 #[test]
1401 fn adds_debug_view_header_to_requests_when_tag_is_set() {
1402 let (mut glean, _t) = new_glean(None);
1403
1404 glean.set_debug_view_tag("valid-tag");
1405
1406 let ping_type = PingType::new(
1408 "test",
1409 true,
1410 true,
1411 true,
1412 true,
1413 true,
1414 vec![],
1415 vec![],
1416 true,
1417 vec![],
1418 );
1419 glean.register_ping_type(&ping_type);
1420
1421 ping_type.submit_sync(&glean, None);
1423
1424 match glean.get_upload_task() {
1426 PingUploadTask::Upload { request } => {
1427 assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
1428 }
1429 _ => panic!("Expected upload manager to return the next request!"),
1430 }
1431 }
1432
1433 #[test]
1434 fn duplicates_are_not_enqueued() {
1435 let (glean, dir) = new_glean(None);
1436
1437 let upload_manager = PingUploadManager::no_policy(dir.path());
1440
1441 let doc_id = Uuid::new_v4().to_string();
1442 let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
1443
1444 upload_manager.enqueue_ping(
1446 &glean,
1447 PingPayload {
1448 document_id: doc_id.clone(),
1449 upload_path: path.clone(),
1450 json_body: "".into(),
1451 headers: None,
1452 body_has_info_sections: true,
1453 ping_name: "test-ping".into(),
1454 uploader_capabilities: vec![],
1455 },
1456 );
1457 upload_manager.enqueue_ping(
1458 &glean,
1459 PingPayload {
1460 document_id: doc_id,
1461 upload_path: path,
1462 json_body: "".into(),
1463 headers: None,
1464 body_has_info_sections: true,
1465 ping_name: "test-ping".into(),
1466 uploader_capabilities: vec![],
1467 },
1468 );
1469
1470 let task = upload_manager.get_upload_task(&glean, false);
1472 assert!(task.is_upload());
1473
1474 assert_eq!(
1476 upload_manager.get_upload_task(&glean, false),
1477 PingUploadTask::done()
1478 );
1479 }
1480
1481 #[test]
1482 fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
1483 let (mut glean, dir) = new_glean(None);
1484
1485 let ping_type = PingType::new(
1487 "test",
1488 true,
1489 true,
1490 true,
1491 true,
1492 true,
1493 vec![],
1494 vec![],
1495 true,
1496 vec![],
1497 );
1498 glean.register_ping_type(&ping_type);
1499
1500 let n = 5;
1502 for _ in 0..n {
1503 ping_type.submit_sync(&glean, None);
1504 }
1505
1506 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1507
1508 let max_recoverable_failures = 3;
1510 upload_manager
1511 .policy
1512 .set_max_recoverable_failures(Some(max_recoverable_failures));
1513
1514 for _ in 0..max_recoverable_failures {
1516 match upload_manager.get_upload_task(&glean, false) {
1517 PingUploadTask::Upload { request } => {
1518 upload_manager.process_ping_upload_response(
1519 &glean,
1520 &request.document_id,
1521 UploadResult::recoverable_failure(),
1522 );
1523 }
1524 _ => panic!("Expected upload manager to return the next request!"),
1525 }
1526 }
1527
1528 assert_eq!(
1531 upload_manager.get_upload_task(&glean, false),
1532 PingUploadTask::done()
1533 );
1534
1535 for _ in 0..n {
1537 let task = upload_manager.get_upload_task(&glean, false);
1538 assert!(task.is_upload());
1539 }
1540 }
1541
1542 #[test]
1543 fn quota_is_enforced_when_enqueueing_cached_pings() {
1544 let (mut glean, dir) = new_glean(None);
1545
1546 let ping_type = PingType::new(
1548 "test",
1549 true,
1550 true,
1551 true,
1552 true,
1553 true,
1554 vec![],
1555 vec![],
1556 true,
1557 vec![],
1558 );
1559 glean.register_ping_type(&ping_type);
1560
1561 let n = 10;
1563 for _ in 0..n {
1564 ping_type.submit_sync(&glean, None);
1565 }
1566
1567 let directory_manager = PingDirectoryManager::new(dir.path());
1568 let pending_pings = directory_manager.process_dirs().pending_pings;
1569 let (_, newest_ping) = &pending_pings.last().unwrap();
1572 let PingPayload {
1573 document_id: newest_ping_id,
1574 ..
1575 } = &newest_ping;
1576
1577 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1579
1580 upload_manager
1587 .policy
1588 .set_max_pending_pings_directory_size(Some(500));
1589
1590 match upload_manager.get_upload_task(&glean, false) {
1594 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, newest_ping_id),
1595 _ => panic!("Expected upload manager to return the next request!"),
1596 }
1597
1598 assert_eq!(
1601 upload_manager.get_upload_task(&glean, false),
1602 PingUploadTask::done()
1603 );
1604
1605 assert_eq!(
1607 n - 1,
1608 upload_manager
1609 .upload_metrics
1610 .deleted_pings_after_quota_hit
1611 .get_value(&glean, Some("metrics"))
1612 .unwrap()
1613 );
1614 assert_eq!(
1615 n,
1616 upload_manager
1617 .upload_metrics
1618 .pending_pings
1619 .get_value(&glean, Some("metrics"))
1620 .unwrap()
1621 );
1622 }
1623
1624 #[test]
1625 fn number_quota_is_enforced_when_enqueueing_cached_pings() {
1626 let (mut glean, dir) = new_glean(None);
1627
1628 let ping_type = PingType::new(
1630 "test",
1631 true,
1632 true,
1633 true,
1634 true,
1635 true,
1636 vec![],
1637 vec![],
1638 true,
1639 vec![],
1640 );
1641 glean.register_ping_type(&ping_type);
1642
1643 let count_quota = 3;
1645 let n = 10;
1647
1648 for _ in 0..n {
1650 ping_type.submit_sync(&glean, None);
1651 }
1652
1653 let directory_manager = PingDirectoryManager::new(dir.path());
1654 let pending_pings = directory_manager.process_dirs().pending_pings;
1655 let expected_pings = pending_pings
1658 .iter()
1659 .rev()
1660 .take(count_quota)
1661 .map(|(_, ping)| ping.document_id.clone())
1662 .collect::<Vec<_>>();
1663
1664 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1666
1667 upload_manager
1668 .policy
1669 .set_max_pending_pings_count(Some(count_quota as u64));
1670
1671 for ping_id in expected_pings.iter().rev() {
1675 match upload_manager.get_upload_task(&glean, false) {
1676 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1677 _ => panic!("Expected upload manager to return the next request!"),
1678 }
1679 }
1680
1681 assert_eq!(
1684 upload_manager.get_upload_task(&glean, false),
1685 PingUploadTask::done()
1686 );
1687
1688 assert_eq!(
1690 (n - count_quota) as i32,
1691 upload_manager
1692 .upload_metrics
1693 .deleted_pings_after_quota_hit
1694 .get_value(&glean, Some("metrics"))
1695 .unwrap()
1696 );
1697 assert_eq!(
1698 n as i32,
1699 upload_manager
1700 .upload_metrics
1701 .pending_pings
1702 .get_value(&glean, Some("metrics"))
1703 .unwrap()
1704 );
1705 }
1706
1707 #[test]
1708 fn size_and_count_quota_work_together_size_first() {
1709 let (mut glean, dir) = new_glean(None);
1710
1711 let ping_type = PingType::new(
1713 "test",
1714 true,
1715 true,
1716 true,
1717 true,
1718 true,
1719 vec![],
1720 vec![],
1721 true,
1722 vec![],
1723 );
1724 glean.register_ping_type(&ping_type);
1725
1726 let expected_number_of_pings = 3;
1727 let n = 10;
1729
1730 for _ in 0..n {
1732 ping_type.submit_sync(&glean, None);
1733 }
1734
1735 let directory_manager = PingDirectoryManager::new(dir.path());
1736 let pending_pings = directory_manager.process_dirs().pending_pings;
1737 let expected_pings = pending_pings
1740 .iter()
1741 .rev()
1742 .take(expected_number_of_pings)
1743 .map(|(_, ping)| ping.document_id.clone())
1744 .collect::<Vec<_>>();
1745
1746 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1748
1749 upload_manager
1752 .policy
1753 .set_max_pending_pings_directory_size(Some(1300));
1754 upload_manager.policy.set_max_pending_pings_count(Some(5));
1755
1756 for ping_id in expected_pings.iter().rev() {
1760 match upload_manager.get_upload_task(&glean, false) {
1761 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1762 _ => panic!("Expected upload manager to return the next request!"),
1763 }
1764 }
1765
1766 assert_eq!(
1769 upload_manager.get_upload_task(&glean, false),
1770 PingUploadTask::done()
1771 );
1772
1773 assert_eq!(
1775 (n - expected_number_of_pings) as i32,
1776 upload_manager
1777 .upload_metrics
1778 .deleted_pings_after_quota_hit
1779 .get_value(&glean, Some("metrics"))
1780 .unwrap()
1781 );
1782 assert_eq!(
1783 n as i32,
1784 upload_manager
1785 .upload_metrics
1786 .pending_pings
1787 .get_value(&glean, Some("metrics"))
1788 .unwrap()
1789 );
1790 }
1791
1792 #[test]
1793 fn size_and_count_quota_work_together_count_first() {
1794 let (mut glean, dir) = new_glean(None);
1795
1796 let ping_type = PingType::new(
1798 "test",
1799 true,
1800 true,
1801 true,
1802 true,
1803 true,
1804 vec![],
1805 vec![],
1806 true,
1807 vec![],
1808 );
1809 glean.register_ping_type(&ping_type);
1810
1811 let expected_number_of_pings = 2;
1812 let n = 10;
1814
1815 for _ in 0..n {
1817 ping_type.submit_sync(&glean, None);
1818 }
1819
1820 let directory_manager = PingDirectoryManager::new(dir.path());
1821 let pending_pings = directory_manager.process_dirs().pending_pings;
1822 let expected_pings = pending_pings
1825 .iter()
1826 .rev()
1827 .take(expected_number_of_pings)
1828 .map(|(_, ping)| ping.document_id.clone())
1829 .collect::<Vec<_>>();
1830
1831 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1833
1834 upload_manager
1837 .policy
1838 .set_max_pending_pings_directory_size(Some(1000));
1839 upload_manager.policy.set_max_pending_pings_count(Some(2));
1840
1841 for ping_id in expected_pings.iter().rev() {
1845 match upload_manager.get_upload_task(&glean, false) {
1846 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1847 _ => panic!("Expected upload manager to return the next request!"),
1848 }
1849 }
1850
1851 assert_eq!(
1854 upload_manager.get_upload_task(&glean, false),
1855 PingUploadTask::done()
1856 );
1857
1858 assert_eq!(
1860 (n - expected_number_of_pings) as i32,
1861 upload_manager
1862 .upload_metrics
1863 .deleted_pings_after_quota_hit
1864 .get_value(&glean, Some("metrics"))
1865 .unwrap()
1866 );
1867 assert_eq!(
1868 n as i32,
1869 upload_manager
1870 .upload_metrics
1871 .pending_pings
1872 .get_value(&glean, Some("metrics"))
1873 .unwrap()
1874 );
1875 }
1876
1877 #[test]
1878 fn maximum_wait_attemps_is_enforced() {
1879 let (glean, dir) = new_glean(None);
1880
1881 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1882
1883 let max_wait_attempts = 3;
1885 upload_manager
1886 .policy
1887 .set_max_wait_attempts(Some(max_wait_attempts));
1888
1889 let secs_per_interval = 5;
1895 let max_pings_per_interval = 1;
1896 upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
1897
1898 upload_manager.enqueue_ping(
1900 &glean,
1901 PingPayload {
1902 document_id: Uuid::new_v4().to_string(),
1903 upload_path: PATH.into(),
1904 json_body: "".into(),
1905 headers: None,
1906 body_has_info_sections: true,
1907 ping_name: "ping-name".into(),
1908 uploader_capabilities: vec![],
1909 },
1910 );
1911 upload_manager.enqueue_ping(
1912 &glean,
1913 PingPayload {
1914 document_id: Uuid::new_v4().to_string(),
1915 upload_path: PATH.into(),
1916 json_body: "".into(),
1917 headers: None,
1918 body_has_info_sections: true,
1919 ping_name: "ping-name".into(),
1920 uploader_capabilities: vec![],
1921 },
1922 );
1923
1924 match upload_manager.get_upload_task(&glean, false) {
1926 PingUploadTask::Upload { .. } => {}
1927 _ => panic!("Expected upload manager to return the next request!"),
1928 }
1929
1930 for _ in 0..max_wait_attempts {
1934 let task = upload_manager.get_upload_task(&glean, false);
1935 assert!(task.is_wait());
1936 }
1937
1938 assert_eq!(
1941 upload_manager.get_upload_task(&glean, false),
1942 PingUploadTask::done()
1943 );
1944
1945 thread::sleep(Duration::from_secs(secs_per_interval));
1947
1948 let task = upload_manager.get_upload_task(&glean, false);
1950 assert!(task.is_upload());
1951
1952 assert_eq!(
1954 upload_manager.get_upload_task(&glean, false),
1955 PingUploadTask::done()
1956 );
1957 }
1958
1959 #[test]
1960 fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
1961 let (glean, dir) = new_glean(None);
1962 let upload_manager = PingUploadManager::new(dir.path(), "test");
1963 match upload_manager.get_upload_task(&glean, false) {
1964 PingUploadTask::Wait { time } => {
1965 assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
1966 }
1967 _ => panic!("Expected upload manager to return a wait task!"),
1968 };
1969 }
1970
1971 #[test]
1972 fn cannot_enqueue_ping_while_its_being_processed() {
1973 let (glean, dir) = new_glean(None);
1974
1975 let upload_manager = PingUploadManager::no_policy(dir.path());
1976
1977 let identifier = &Uuid::new_v4();
1979 let ping = PingPayload {
1980 document_id: identifier.to_string(),
1981 upload_path: PATH.into(),
1982 json_body: "".into(),
1983 headers: None,
1984 body_has_info_sections: true,
1985 ping_name: "ping-name".into(),
1986 uploader_capabilities: vec![],
1987 };
1988 upload_manager.enqueue_ping(&glean, ping);
1989 assert!(upload_manager.get_upload_task(&glean, false).is_upload());
1990
1991 let ping = PingPayload {
1993 document_id: identifier.to_string(),
1994 upload_path: PATH.into(),
1995 json_body: "".into(),
1996 headers: None,
1997 body_has_info_sections: true,
1998 ping_name: "ping-name".into(),
1999 uploader_capabilities: vec![],
2000 };
2001 upload_manager.enqueue_ping(&glean, ping);
2002
2003 assert_eq!(
2005 upload_manager.get_upload_task(&glean, false),
2006 PingUploadTask::done()
2007 );
2008
2009 upload_manager.process_ping_upload_response(
2011 &glean,
2012 &identifier.to_string(),
2013 UploadResult::http_status(200),
2014 );
2015 }
2016}