1use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::mem;
18use std::path::PathBuf;
19use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
20use std::sync::{Arc, RwLock, RwLockWriteGuard};
21use std::thread;
22use std::time::{Duration, Instant};
23
24use chrono::Utc;
25use malloc_size_of::MallocSizeOf;
26use malloc_size_of_derive::MallocSizeOf;
27
28use crate::error::ErrorKind;
29use crate::TimerId;
30use crate::{internal_metrics::UploadMetrics, Glean};
31pub use directory::process_metadata;
32use directory::{PingDirectoryManager, PingPayloadsByDirectory};
33use policy::Policy;
34use request::create_date_header_value;
35
36pub use directory::{PingMetadata, PingPayload};
37pub use request::{HeaderMap, PingRequest};
38pub use result::{UploadResult, UploadTaskAction};
39
40mod directory;
41mod policy;
42mod request;
43mod result;
44
45const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1000; #[derive(Debug, MallocSizeOf)]
48struct RateLimiter {
49 started: Option<Instant>,
51 count: u32,
53 interval: Duration,
55 max_count: u32,
57}
58
59#[derive(PartialEq)]
61enum RateLimiterState {
62 Incrementing,
64 Throttled(u64),
69}
70
71impl RateLimiter {
72 pub fn new(interval: Duration, max_count: u32) -> Self {
73 Self {
74 started: None,
75 count: 0,
76 interval,
77 max_count,
78 }
79 }
80
81 fn reset(&mut self) {
82 self.started = Some(Instant::now());
83 self.count = 0;
84 }
85
86 fn elapsed(&self) -> Duration {
87 self.started.unwrap().elapsed()
88 }
89
90 fn should_reset(&self) -> bool {
96 if self.started.is_none() {
97 return true;
98 }
99
100 if self.elapsed() > self.interval {
102 return true;
103 }
104
105 false
106 }
107
108 pub fn get_state(&mut self) -> RateLimiterState {
114 if self.should_reset() {
115 self.reset();
116 }
117
118 if self.count == self.max_count {
119 let remaining = self.interval.as_millis() - self.elapsed().as_millis();
122 return RateLimiterState::Throttled(
123 remaining
124 .try_into()
125 .unwrap_or(self.interval.as_secs() * 1000),
126 );
127 }
128
129 self.count += 1;
130 RateLimiterState::Incrementing
131 }
132}
133
134#[derive(PartialEq, Eq, Debug)]
139pub enum PingUploadTask {
140 Upload {
142 request: PingRequest,
145 },
146
147 Wait {
150 time: u64,
153 },
154
155 Done {
168 #[doc(hidden)]
169 unused: i8,
171 },
172}
173
174impl PingUploadTask {
175 pub fn is_upload(&self) -> bool {
177 matches!(self, PingUploadTask::Upload { .. })
178 }
179
180 pub fn is_wait(&self) -> bool {
182 matches!(self, PingUploadTask::Wait { .. })
183 }
184
185 pub(crate) fn done() -> Self {
186 PingUploadTask::Done { unused: 0 }
187 }
188}
189
190#[derive(Debug)]
192pub struct PingUploadManager {
193 queue: RwLock<VecDeque<PingRequest>>,
195 directory_manager: PingDirectoryManager,
197 processed_pending_pings: Arc<AtomicBool>,
199 cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
201 recoverable_failure_count: AtomicU32,
203 wait_attempt_count: AtomicU32,
205 rate_limiter: Option<RwLock<RateLimiter>>,
210 language_binding_name: String,
214 upload_metrics: UploadMetrics,
216 policy: Policy,
218
219 in_flight: RwLock<HashMap<String, (TimerId, TimerId)>>,
220}
221
222impl MallocSizeOf for PingUploadManager {
223 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
224 let shallow_size = {
225 let queue = self.queue.read().unwrap();
226 if ops.has_malloc_enclosing_size_of() {
227 if let Some(front) = queue.front() {
228 unsafe { ops.malloc_enclosing_size_of(front) }
231 } else {
232 0
234 }
235 } else {
236 queue.capacity() * mem::size_of::<PingRequest>()
240 }
241 };
242
243 let mut n = shallow_size
244 + self.directory_manager.size_of(ops)
245 + unsafe { ops.malloc_size_of(self.processed_pending_pings.as_ptr()) }
247 + self.cached_pings.read().unwrap().size_of(ops)
248 + self.rate_limiter.as_ref().map(|rl| {
249 let lock = rl.read().unwrap();
250 (*lock).size_of(ops)
251 }).unwrap_or(0)
252 + self.language_binding_name.size_of(ops)
253 + self.upload_metrics.size_of(ops)
254 + self.policy.size_of(ops);
255
256 let in_flight = self.in_flight.read().unwrap();
257 n += in_flight.size_of(ops);
258
259 n
260 }
261}
262
263impl PingUploadManager {
264 pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
275 Self {
276 queue: RwLock::new(VecDeque::new()),
277 directory_manager: PingDirectoryManager::new(data_path),
278 processed_pending_pings: Arc::new(AtomicBool::new(false)),
279 cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
280 recoverable_failure_count: AtomicU32::new(0),
281 wait_attempt_count: AtomicU32::new(0),
282 rate_limiter: None,
283 language_binding_name: language_binding_name.into(),
284 upload_metrics: UploadMetrics::new(),
285 policy: Policy::default(),
286 in_flight: RwLock::new(HashMap::default()),
287 }
288 }
289
290 pub fn scan_pending_pings_directories(
297 &self,
298 trigger_upload: bool,
299 ) -> std::thread::JoinHandle<()> {
300 let local_manager = self.directory_manager.clone();
301 let local_cached_pings = self.cached_pings.clone();
302 let local_flag = self.processed_pending_pings.clone();
303 thread::Builder::new()
304 .name("glean.ping_directory_manager.process_dir".to_string())
305 .spawn(move || {
306 {
307 let mut local_cached_pings = local_cached_pings
309 .write()
310 .expect("Can't write to pending pings cache.");
311 local_cached_pings.extend(local_manager.process_dirs());
312 local_flag.store(true, Ordering::SeqCst);
313 }
314 if trigger_upload {
315 crate::dispatcher::launch(|| {
316 if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok())
317 {
318 if let Err(e) = state.callbacks.trigger_upload() {
319 log::error!(
320 "Triggering upload after pending ping scan failed. Error: {}",
321 e
322 );
323 }
324 }
325 });
326 }
327 })
328 .expect("Unable to spawn thread to process pings directories.")
329 }
330
331 #[cfg(test)]
333 pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
334 let mut upload_manager = Self::new(data_path, "Test");
335
336 upload_manager.policy.set_max_recoverable_failures(None);
338 upload_manager.policy.set_max_wait_attempts(None);
339 upload_manager.policy.set_max_ping_body_size(None);
340 upload_manager
341 .policy
342 .set_max_pending_pings_directory_size(None);
343 upload_manager.policy.set_max_pending_pings_count(None);
344
345 upload_manager
347 .scan_pending_pings_directories(false)
348 .join()
349 .unwrap();
350
351 upload_manager
352 }
353
354 fn processed_pending_pings(&self) -> bool {
355 self.processed_pending_pings.load(Ordering::SeqCst)
356 }
357
358 fn recoverable_failure_count(&self) -> u32 {
359 self.recoverable_failure_count.load(Ordering::SeqCst)
360 }
361
362 fn wait_attempt_count(&self) -> u32 {
363 self.wait_attempt_count.load(Ordering::SeqCst)
364 }
365
366 fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
371 let PingPayload {
372 document_id,
373 upload_path: path,
374 json_body: body,
375 headers,
376 body_has_info_sections,
377 ping_name,
378 uploader_capabilities,
379 } = ping;
380 let mut request = PingRequest::builder(
381 &self.language_binding_name,
382 self.policy.max_ping_body_size(),
383 )
384 .document_id(&document_id)
385 .path(path)
386 .body(body)
387 .body_has_info_sections(body_has_info_sections)
388 .ping_name(ping_name)
389 .uploader_capabilities(uploader_capabilities);
390
391 if let Some(headers) = headers {
392 request = request.headers(headers);
393 }
394
395 match request.build() {
396 Ok(request) => Some(request),
397 Err(e) => {
398 log::warn!("Error trying to build ping request: {}", e);
399 self.directory_manager.delete_file(&document_id);
400
401 if let ErrorKind::PingBodyOverflow(s) = e.kind() {
404 self.upload_metrics
405 .discarded_exceeding_pings_size
406 .accumulate_sync(glean, *s as i64 / 1024);
407 }
408
409 None
410 }
411 }
412 }
413
414 pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
416 let mut queue = self
417 .queue
418 .write()
419 .expect("Can't write to pending pings queue.");
420
421 let PingPayload {
422 ref document_id,
423 upload_path: ref path,
424 ..
425 } = ping;
426 if queue
428 .iter()
429 .any(|request| request.document_id.as_str() == document_id)
430 {
431 log::warn!(
432 "Attempted to enqueue a duplicate ping {} at {}.",
433 document_id,
434 path
435 );
436 return;
437 }
438
439 {
440 let in_flight = self.in_flight.read().unwrap();
441 if in_flight.contains_key(document_id) {
442 log::warn!(
443 "Attempted to enqueue an in-flight ping {} at {}.",
444 document_id,
445 path
446 );
447 self.upload_metrics
448 .in_flight_pings_dropped
449 .add_sync(glean, 0);
450 return;
451 }
452 }
453
454 log::trace!("Enqueuing ping {} at {}", document_id, path);
455 if let Some(request) = self.build_ping_request(glean, ping) {
456 queue.push_back(request)
457 }
458 }
459
460 fn enqueue_cached_pings(&self, glean: &Glean) {
477 let mut cached_pings = self
478 .cached_pings
479 .write()
480 .expect("Can't write to pending pings cache.");
481
482 if cached_pings.len() > 0 {
483 let mut pending_pings_directory_size: u64 = 0;
484 let mut pending_pings_count = 0;
485 let mut deleting = false;
486
487 let total = cached_pings.pending_pings.len() as u64;
488 self.upload_metrics
489 .pending_pings
490 .add_sync(glean, total.try_into().unwrap_or(0));
491
492 if total > self.policy.max_pending_pings_count() {
493 log::warn!(
494 "More than {} pending pings in the directory, will delete {} old pings.",
495 self.policy.max_pending_pings_count(),
496 total - self.policy.max_pending_pings_count()
497 );
498 }
499
500 cached_pings.pending_pings.reverse();
506 cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
507 pending_pings_count += 1;
508 pending_pings_directory_size += file_size;
509
510 if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
512 log::warn!(
513 "Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
514 self.policy.max_pending_pings_directory_size()
515 );
516 deleting = true;
517 }
518
519 if pending_pings_count > self.policy.max_pending_pings_count() {
523 deleting = true;
524 }
525
526 if deleting && self.directory_manager.delete_file(document_id) {
527 self.upload_metrics
528 .deleted_pings_after_quota_hit
529 .add_sync(glean, 1);
530 return false;
531 }
532
533 true
534 });
535 cached_pings.pending_pings.reverse();
538 self.upload_metrics
539 .pending_pings_directory_size
540 .accumulate_sync(glean, pending_pings_directory_size as i64 / 1024);
541
542 cached_pings
545 .deletion_request_pings
546 .drain(..)
547 .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
548 cached_pings
549 .pending_pings
550 .drain(..)
551 .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
552 }
553 }
554
555 pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
568 self.rate_limiter = Some(RwLock::new(RateLimiter::new(
569 Duration::from_secs(interval),
570 max_tasks,
571 )));
572 }
573
574 pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
583 if let Some(ping) = self.directory_manager.process_file(document_id) {
584 self.enqueue_ping(glean, ping);
585 }
586 }
587
588 pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
590 log::trace!("Clearing ping queue");
591 let mut queue = self
592 .queue
593 .write()
594 .expect("Can't write to pending pings queue.");
595
596 queue.retain(|ping| ping.is_deletion_request());
597 log::trace!(
598 "{} pings left in the queue (only deletion-request expected)",
599 queue.len()
600 );
601 queue
602 }
603
604 fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
605 let wait_or_done = |time: u64| {
610 self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
611 if self.wait_attempt_count() > self.policy.max_wait_attempts() {
612 PingUploadTask::done()
613 } else {
614 PingUploadTask::Wait { time }
615 }
616 };
617
618 if !self.processed_pending_pings() {
619 log::info!(
620 "Tried getting an upload task, but processing is ongoing. Will come back later."
621 );
622 return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
623 }
624
625 self.enqueue_cached_pings(glean);
627
628 if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
629 log::warn!(
630 "Reached maximum recoverable failures for the current uploading window. You are done."
631 );
632 return PingUploadTask::done();
633 }
634
635 let mut queue = self
636 .queue
637 .write()
638 .expect("Can't write to pending pings queue.");
639 match queue.front() {
640 Some(request) => {
641 if let Some(rate_limiter) = &self.rate_limiter {
642 let mut rate_limiter = rate_limiter
643 .write()
644 .expect("Can't write to the rate limiter.");
645 if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
646 log::info!(
647 "Tried getting an upload task, but we are throttled at the moment."
648 );
649 return wait_or_done(remaining);
650 }
651 }
652
653 log::info!(
654 "New upload task with id {} (path: {})",
655 request.document_id,
656 request.path
657 );
658
659 if log_ping {
660 if let Some(body) = request.pretty_body() {
661 chunked_log_info(&request.path, &body);
662 } else {
663 chunked_log_info(&request.path, "<invalid ping payload>");
664 }
665 }
666
667 {
668 let mut in_flight = self.in_flight.write().unwrap();
672 let success_id = self.upload_metrics.send_success.start_sync();
673 let failure_id = self.upload_metrics.send_failure.start_sync();
674 in_flight.insert(request.document_id.clone(), (success_id, failure_id));
675 }
676
677 let mut request = queue.pop_front().unwrap();
678
679 request
681 .headers
682 .insert("Date".to_string(), create_date_header_value(Utc::now()));
683
684 PingUploadTask::Upload { request }
685 }
686 None => {
687 log::info!("No more pings to upload! You are done.");
688 PingUploadTask::done()
689 }
690 }
691 }
692
693 pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
704 let task = self.get_upload_task_internal(glean, log_ping);
705
706 if !task.is_wait() && self.wait_attempt_count() > 0 {
707 self.wait_attempt_count.store(0, Ordering::SeqCst);
708 }
709
710 if !task.is_upload() && self.recoverable_failure_count() > 0 {
711 self.recoverable_failure_count.store(0, Ordering::SeqCst);
712 }
713
714 task
715 }
716
717 pub fn process_ping_upload_response(
756 &self,
757 glean: &Glean,
758 document_id: &str,
759 status: UploadResult,
760 ) -> UploadTaskAction {
761 use UploadResult::*;
762
763 let stop_time = time::precise_time_ns();
764
765 if let Some(label) = status.get_label() {
766 let metric = self.upload_metrics.ping_upload_failure.get(label);
767 metric.add_sync(glean, 1);
768 }
769
770 let send_ids = {
771 let mut lock = self.in_flight.write().unwrap();
772 lock.remove(document_id)
773 };
774
775 if send_ids.is_none() {
776 self.upload_metrics.missing_send_ids.add_sync(glean, 1);
777 }
778
779 match status {
780 HttpStatus { code } if (200..=299).contains(&code) => {
781 log::info!("Ping {} successfully sent {}.", document_id, code);
782 if let Some((success_id, failure_id)) = send_ids {
783 self.upload_metrics
784 .send_success
785 .set_stop_and_accumulate(glean, success_id, stop_time);
786 self.upload_metrics.send_failure.cancel_sync(failure_id);
787 }
788 self.directory_manager.delete_file(document_id);
789 }
790
791 UnrecoverableFailure { .. } | HttpStatus { code: 400..=499 } | Incapable { .. } => {
792 log::warn!(
793 "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
794 document_id,
795 status
796 );
797 if let Some((success_id, failure_id)) = send_ids {
798 self.upload_metrics.send_success.cancel_sync(success_id);
799 self.upload_metrics
800 .send_failure
801 .set_stop_and_accumulate(glean, failure_id, stop_time);
802 }
803 self.directory_manager.delete_file(document_id);
804 }
805
806 RecoverableFailure { .. } | HttpStatus { .. } => {
807 log::warn!(
808 "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
809 document_id,
810 status
811 );
812 if let Some((success_id, failure_id)) = send_ids {
813 self.upload_metrics.send_success.cancel_sync(success_id);
814 self.upload_metrics
815 .send_failure
816 .set_stop_and_accumulate(glean, failure_id, stop_time);
817 }
818 self.enqueue_ping_from_file(glean, document_id);
819 self.recoverable_failure_count
820 .fetch_add(1, Ordering::SeqCst);
821 }
822
823 Done { .. } => {
824 log::debug!("Uploader signaled Done. Exiting.");
825 if let Some((success_id, failure_id)) = send_ids {
826 self.upload_metrics.send_success.cancel_sync(success_id);
827 self.upload_metrics.send_failure.cancel_sync(failure_id);
828 }
829 return UploadTaskAction::End;
830 }
831 };
832
833 UploadTaskAction::Next
834 }
835}
836
837#[cfg(target_os = "android")]
839pub fn chunked_log_info(path: &str, payload: &str) {
840 const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;
844
845 if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
849 log::info!("Glean ping to URL: {}\n{}", path, payload);
850 return;
851 }
852
853 let mut start = 0;
856 let mut end = MAX_LOG_PAYLOAD_SIZE_BYTES;
857 let mut chunk_idx = 1;
858 let total_chunks = payload.len() / MAX_LOG_PAYLOAD_SIZE_BYTES + 1;
860
861 while end < payload.len() {
862 for _ in 0..4 {
865 if payload.is_char_boundary(end) {
866 break;
867 }
868 end -= 1;
869 }
870
871 log::info!(
872 "Glean ping to URL: {} [Part {} of {}]\n{}",
873 path,
874 chunk_idx,
875 total_chunks,
876 &payload[start..end]
877 );
878
879 start = end;
881 end = end + MAX_LOG_PAYLOAD_SIZE_BYTES;
882 chunk_idx += 1;
883 }
884
885 if start < payload.len() {
887 log::info!(
888 "Glean ping to URL: {} [Part {} of {}]\n{}",
889 path,
890 chunk_idx,
891 total_chunks,
892 &payload[start..]
893 );
894 }
895}
896
897#[cfg(not(target_os = "android"))]
899pub fn chunked_log_info(_path: &str, payload: &str) {
900 log::info!("{}", payload)
901}
902
903#[cfg(test)]
904mod test {
905 use uuid::Uuid;
906
907 use super::*;
908 use crate::metrics::PingType;
909 use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
910
911 const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
912
913 #[test]
914 fn doesnt_error_when_there_are_no_pending_pings() {
915 let (glean, _t) = new_glean(None);
916
917 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
920 }
921
922 #[test]
923 fn returns_ping_request_when_there_is_one() {
924 let (glean, dir) = new_glean(None);
925
926 let upload_manager = PingUploadManager::no_policy(dir.path());
927
928 upload_manager.enqueue_ping(
930 &glean,
931 PingPayload {
932 document_id: Uuid::new_v4().to_string(),
933 upload_path: PATH.into(),
934 json_body: "".into(),
935 headers: None,
936 body_has_info_sections: true,
937 ping_name: "ping-name".into(),
938 uploader_capabilities: vec![],
939 },
940 );
941
942 let task = upload_manager.get_upload_task(&glean, false);
945 assert!(task.is_upload());
946 }
947
948 #[test]
949 fn returns_as_many_ping_requests_as_there_are() {
950 let (glean, dir) = new_glean(None);
951
952 let upload_manager = PingUploadManager::no_policy(dir.path());
953
954 let n = 10;
956 for _ in 0..n {
957 upload_manager.enqueue_ping(
958 &glean,
959 PingPayload {
960 document_id: Uuid::new_v4().to_string(),
961 upload_path: PATH.into(),
962 json_body: "".into(),
963 headers: None,
964 body_has_info_sections: true,
965 ping_name: "ping-name".into(),
966 uploader_capabilities: vec![],
967 },
968 );
969 }
970
971 for _ in 0..n {
973 let task = upload_manager.get_upload_task(&glean, false);
974 assert!(task.is_upload());
975 }
976
977 assert_eq!(
979 upload_manager.get_upload_task(&glean, false),
980 PingUploadTask::done()
981 );
982 }
983
984 #[test]
985 fn limits_the_number_of_pings_when_there_is_rate_limiting() {
986 let (glean, dir) = new_glean(None);
987
988 let mut upload_manager = PingUploadManager::no_policy(dir.path());
989
990 let max_pings_per_interval = 10;
992 upload_manager.set_rate_limiter(3, 10);
993
994 for _ in 0..max_pings_per_interval {
996 upload_manager.enqueue_ping(
997 &glean,
998 PingPayload {
999 document_id: Uuid::new_v4().to_string(),
1000 upload_path: PATH.into(),
1001 json_body: "".into(),
1002 headers: None,
1003 body_has_info_sections: true,
1004 ping_name: "ping-name".into(),
1005 uploader_capabilities: vec![],
1006 },
1007 );
1008 }
1009
1010 for _ in 0..max_pings_per_interval {
1012 let task = upload_manager.get_upload_task(&glean, false);
1013 assert!(task.is_upload());
1014 }
1015
1016 upload_manager.enqueue_ping(
1018 &glean,
1019 PingPayload {
1020 document_id: Uuid::new_v4().to_string(),
1021 upload_path: PATH.into(),
1022 json_body: "".into(),
1023 headers: None,
1024 body_has_info_sections: true,
1025 ping_name: "ping-name".into(),
1026 uploader_capabilities: vec![],
1027 },
1028 );
1029
1030 match upload_manager.get_upload_task(&glean, false) {
1032 PingUploadTask::Wait { time } => {
1033 thread::sleep(Duration::from_millis(time));
1035 }
1036 _ => panic!("Expected upload manager to return a wait task!"),
1037 };
1038
1039 let task = upload_manager.get_upload_task(&glean, false);
1040 assert!(task.is_upload());
1041 }
1042
1043 #[test]
1044 fn clearing_the_queue_works_correctly() {
1045 let (glean, dir) = new_glean(None);
1046
1047 let upload_manager = PingUploadManager::no_policy(dir.path());
1048
1049 for _ in 0..10 {
1051 upload_manager.enqueue_ping(
1052 &glean,
1053 PingPayload {
1054 document_id: Uuid::new_v4().to_string(),
1055 upload_path: PATH.into(),
1056 json_body: "".into(),
1057 headers: None,
1058 body_has_info_sections: true,
1059 ping_name: "ping-name".into(),
1060 uploader_capabilities: vec![],
1061 },
1062 );
1063 }
1064
1065 drop(upload_manager.clear_ping_queue());
1067
1068 assert_eq!(
1070 upload_manager.get_upload_task(&glean, false),
1071 PingUploadTask::done()
1072 );
1073 }
1074
1075 #[test]
1076 fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
1077 let (mut glean, _t) = new_glean(None);
1078
1079 let ping_type = PingType::new(
1081 "test",
1082 true,
1083 true,
1084 true,
1085 true,
1086 true,
1087 vec![],
1088 vec![],
1089 true,
1090 vec![],
1091 );
1092 glean.register_ping_type(&ping_type);
1093
1094 let n = 10;
1096 for _ in 0..n {
1097 ping_type.submit_sync(&glean, None);
1098 }
1099
1100 glean
1101 .internal_pings
1102 .deletion_request
1103 .submit_sync(&glean, None);
1104
1105 drop(glean.upload_manager.clear_ping_queue());
1107
1108 let upload_task = glean.get_upload_task();
1109 match upload_task {
1110 PingUploadTask::Upload { request } => assert!(request.is_deletion_request()),
1111 _ => panic!("Expected upload manager to return the next request!"),
1112 }
1113
1114 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1116 }
1117
1118 #[test]
1119 fn fills_up_queue_successfully_from_disk() {
1120 let (mut glean, dir) = new_glean(None);
1121
1122 let ping_type = PingType::new(
1124 "test",
1125 true,
1126 true,
1127 true,
1128 true,
1129 true,
1130 vec![],
1131 vec![],
1132 true,
1133 vec![],
1134 );
1135 glean.register_ping_type(&ping_type);
1136
1137 let n = 10;
1139 for _ in 0..n {
1140 ping_type.submit_sync(&glean, None);
1141 }
1142
1143 let upload_manager = PingUploadManager::no_policy(dir.path());
1145
1146 for _ in 0..n {
1148 let task = upload_manager.get_upload_task(&glean, false);
1149 assert!(task.is_upload());
1150 }
1151
1152 assert_eq!(
1154 upload_manager.get_upload_task(&glean, false),
1155 PingUploadTask::done()
1156 );
1157 }
1158
1159 #[test]
1160 fn processes_correctly_success_upload_response() {
1161 let (mut glean, dir) = new_glean(None);
1162
1163 let ping_type = PingType::new(
1165 "test",
1166 true,
1167 true,
1168 true,
1169 true,
1170 true,
1171 vec![],
1172 vec![],
1173 true,
1174 vec![],
1175 );
1176 glean.register_ping_type(&ping_type);
1177
1178 ping_type.submit_sync(&glean, None);
1180
1181 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1183
1184 match glean.get_upload_task() {
1186 PingUploadTask::Upload { request } => {
1187 let document_id = request.document_id;
1189 glean.process_ping_upload_response(&document_id, UploadResult::http_status(200));
1190 assert!(!pending_pings_dir.join(document_id).exists());
1192 }
1193 _ => panic!("Expected upload manager to return the next request!"),
1194 }
1195
1196 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1198 }
1199
1200 #[test]
1201 fn processes_correctly_client_error_upload_response() {
1202 let (mut glean, dir) = new_glean(None);
1203
1204 let ping_type = PingType::new(
1206 "test",
1207 true,
1208 true,
1209 true,
1210 true,
1211 true,
1212 vec![],
1213 vec![],
1214 true,
1215 vec![],
1216 );
1217 glean.register_ping_type(&ping_type);
1218
1219 ping_type.submit_sync(&glean, None);
1221
1222 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1224
1225 match glean.get_upload_task() {
1227 PingUploadTask::Upload { request } => {
1228 let document_id = request.document_id;
1230 glean.process_ping_upload_response(&document_id, UploadResult::http_status(404));
1231 assert!(!pending_pings_dir.join(document_id).exists());
1233 }
1234 _ => panic!("Expected upload manager to return the next request!"),
1235 }
1236
1237 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1239 }
1240
1241 #[test]
1242 fn processes_correctly_server_error_upload_response() {
1243 let (mut glean, _t) = new_glean(None);
1244
1245 let ping_type = PingType::new(
1247 "test",
1248 true,
1249 true,
1250 true,
1251 true,
1252 true,
1253 vec![],
1254 vec![],
1255 true,
1256 vec![],
1257 );
1258 glean.register_ping_type(&ping_type);
1259
1260 ping_type.submit_sync(&glean, None);
1262
1263 match glean.get_upload_task() {
1265 PingUploadTask::Upload { request } => {
1266 let document_id = request.document_id;
1268 glean.process_ping_upload_response(&document_id, UploadResult::http_status(500));
1269 match glean.get_upload_task() {
1271 PingUploadTask::Upload { request } => {
1272 assert_eq!(document_id, request.document_id);
1273 }
1274 _ => panic!("Expected upload manager to return the next request!"),
1275 }
1276 }
1277 _ => panic!("Expected upload manager to return the next request!"),
1278 }
1279
1280 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1282 }
1283
1284 #[test]
1285 fn processes_correctly_unrecoverable_upload_response() {
1286 let (mut glean, dir) = new_glean(None);
1287
1288 let ping_type = PingType::new(
1290 "test",
1291 true,
1292 true,
1293 true,
1294 true,
1295 true,
1296 vec![],
1297 vec![],
1298 true,
1299 vec![],
1300 );
1301 glean.register_ping_type(&ping_type);
1302
1303 ping_type.submit_sync(&glean, None);
1305
1306 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1308
1309 match glean.get_upload_task() {
1311 PingUploadTask::Upload { request } => {
1312 let document_id = request.document_id;
1314 glean.process_ping_upload_response(
1315 &document_id,
1316 UploadResult::unrecoverable_failure(),
1317 );
1318 assert!(!pending_pings_dir.join(document_id).exists());
1320 }
1321 _ => panic!("Expected upload manager to return the next request!"),
1322 }
1323
1324 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1326 }
1327
1328 #[test]
1329 fn new_pings_are_added_while_upload_in_progress() {
1330 let (glean, dir) = new_glean(None);
1331
1332 let upload_manager = PingUploadManager::no_policy(dir.path());
1333
1334 let doc1 = Uuid::new_v4().to_string();
1335 let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
1336
1337 let doc2 = Uuid::new_v4().to_string();
1338 let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
1339
1340 upload_manager.enqueue_ping(
1342 &glean,
1343 PingPayload {
1344 document_id: doc1.clone(),
1345 upload_path: path1,
1346 json_body: "".into(),
1347 headers: None,
1348 body_has_info_sections: true,
1349 ping_name: "test-ping".into(),
1350 uploader_capabilities: vec![],
1351 },
1352 );
1353
1354 let req = match upload_manager.get_upload_task(&glean, false) {
1356 PingUploadTask::Upload { request } => request,
1357 _ => panic!("Expected upload manager to return the next request!"),
1358 };
1359 assert_eq!(doc1, req.document_id);
1360
1361 upload_manager.enqueue_ping(
1363 &glean,
1364 PingPayload {
1365 document_id: doc2.clone(),
1366 upload_path: path2,
1367 json_body: "".into(),
1368 headers: None,
1369 body_has_info_sections: true,
1370 ping_name: "test-ping".into(),
1371 uploader_capabilities: vec![],
1372 },
1373 );
1374
1375 upload_manager.process_ping_upload_response(
1377 &glean,
1378 &req.document_id,
1379 UploadResult::http_status(200),
1380 );
1381
1382 let req = match upload_manager.get_upload_task(&glean, false) {
1384 PingUploadTask::Upload { request } => request,
1385 _ => panic!("Expected upload manager to return the next request!"),
1386 };
1387 assert_eq!(doc2, req.document_id);
1388
1389 upload_manager.process_ping_upload_response(
1391 &glean,
1392 &req.document_id,
1393 UploadResult::http_status(200),
1394 );
1395
1396 assert_eq!(
1398 upload_manager.get_upload_task(&glean, false),
1399 PingUploadTask::done()
1400 );
1401 }
1402
1403 #[test]
1404 fn adds_debug_view_header_to_requests_when_tag_is_set() {
1405 let (mut glean, _t) = new_glean(None);
1406
1407 glean.set_debug_view_tag("valid-tag");
1408
1409 let ping_type = PingType::new(
1411 "test",
1412 true,
1413 true,
1414 true,
1415 true,
1416 true,
1417 vec![],
1418 vec![],
1419 true,
1420 vec![],
1421 );
1422 glean.register_ping_type(&ping_type);
1423
1424 ping_type.submit_sync(&glean, None);
1426
1427 match glean.get_upload_task() {
1429 PingUploadTask::Upload { request } => {
1430 assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
1431 }
1432 _ => panic!("Expected upload manager to return the next request!"),
1433 }
1434 }
1435
1436 #[test]
1437 fn duplicates_are_not_enqueued() {
1438 let (glean, dir) = new_glean(None);
1439
1440 let upload_manager = PingUploadManager::no_policy(dir.path());
1443
1444 let doc_id = Uuid::new_v4().to_string();
1445 let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
1446
1447 upload_manager.enqueue_ping(
1449 &glean,
1450 PingPayload {
1451 document_id: doc_id.clone(),
1452 upload_path: path.clone(),
1453 json_body: "".into(),
1454 headers: None,
1455 body_has_info_sections: true,
1456 ping_name: "test-ping".into(),
1457 uploader_capabilities: vec![],
1458 },
1459 );
1460 upload_manager.enqueue_ping(
1461 &glean,
1462 PingPayload {
1463 document_id: doc_id,
1464 upload_path: path,
1465 json_body: "".into(),
1466 headers: None,
1467 body_has_info_sections: true,
1468 ping_name: "test-ping".into(),
1469 uploader_capabilities: vec![],
1470 },
1471 );
1472
1473 let task = upload_manager.get_upload_task(&glean, false);
1475 assert!(task.is_upload());
1476
1477 assert_eq!(
1479 upload_manager.get_upload_task(&glean, false),
1480 PingUploadTask::done()
1481 );
1482 }
1483
1484 #[test]
1485 fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
1486 let (mut glean, dir) = new_glean(None);
1487
1488 let ping_type = PingType::new(
1490 "test",
1491 true,
1492 true,
1493 true,
1494 true,
1495 true,
1496 vec![],
1497 vec![],
1498 true,
1499 vec![],
1500 );
1501 glean.register_ping_type(&ping_type);
1502
1503 let n = 5;
1505 for _ in 0..n {
1506 ping_type.submit_sync(&glean, None);
1507 }
1508
1509 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1510
1511 let max_recoverable_failures = 3;
1513 upload_manager
1514 .policy
1515 .set_max_recoverable_failures(Some(max_recoverable_failures));
1516
1517 for _ in 0..max_recoverable_failures {
1519 match upload_manager.get_upload_task(&glean, false) {
1520 PingUploadTask::Upload { request } => {
1521 upload_manager.process_ping_upload_response(
1522 &glean,
1523 &request.document_id,
1524 UploadResult::recoverable_failure(),
1525 );
1526 }
1527 _ => panic!("Expected upload manager to return the next request!"),
1528 }
1529 }
1530
1531 assert_eq!(
1534 upload_manager.get_upload_task(&glean, false),
1535 PingUploadTask::done()
1536 );
1537
1538 for _ in 0..n {
1540 let task = upload_manager.get_upload_task(&glean, false);
1541 assert!(task.is_upload());
1542 }
1543 }
1544
1545 #[test]
1546 fn quota_is_enforced_when_enqueueing_cached_pings() {
1547 let (mut glean, dir) = new_glean(None);
1548
1549 let ping_type = PingType::new(
1551 "test",
1552 true,
1553 true,
1554 true,
1555 true,
1556 true,
1557 vec![],
1558 vec![],
1559 true,
1560 vec![],
1561 );
1562 glean.register_ping_type(&ping_type);
1563
1564 let n = 10;
1566 for _ in 0..n {
1567 ping_type.submit_sync(&glean, None);
1568 }
1569
1570 let directory_manager = PingDirectoryManager::new(dir.path());
1571 let pending_pings = directory_manager.process_dirs().pending_pings;
1572 let (_, newest_ping) = &pending_pings.last().unwrap();
1575 let PingPayload {
1576 document_id: newest_ping_id,
1577 ..
1578 } = &newest_ping;
1579
1580 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1582
1583 upload_manager
1590 .policy
1591 .set_max_pending_pings_directory_size(Some(500));
1592
1593 match upload_manager.get_upload_task(&glean, false) {
1597 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, newest_ping_id),
1598 _ => panic!("Expected upload manager to return the next request!"),
1599 }
1600
1601 assert_eq!(
1604 upload_manager.get_upload_task(&glean, false),
1605 PingUploadTask::done()
1606 );
1607
1608 assert_eq!(
1610 n - 1,
1611 upload_manager
1612 .upload_metrics
1613 .deleted_pings_after_quota_hit
1614 .get_value(&glean, Some("metrics"))
1615 .unwrap()
1616 );
1617 assert_eq!(
1618 n,
1619 upload_manager
1620 .upload_metrics
1621 .pending_pings
1622 .get_value(&glean, Some("metrics"))
1623 .unwrap()
1624 );
1625 }
1626
1627 #[test]
1628 fn number_quota_is_enforced_when_enqueueing_cached_pings() {
1629 let (mut glean, dir) = new_glean(None);
1630
1631 let ping_type = PingType::new(
1633 "test",
1634 true,
1635 true,
1636 true,
1637 true,
1638 true,
1639 vec![],
1640 vec![],
1641 true,
1642 vec![],
1643 );
1644 glean.register_ping_type(&ping_type);
1645
1646 let count_quota = 3;
1648 let n = 10;
1650
1651 for _ in 0..n {
1653 ping_type.submit_sync(&glean, None);
1654 }
1655
1656 let directory_manager = PingDirectoryManager::new(dir.path());
1657 let pending_pings = directory_manager.process_dirs().pending_pings;
1658 let expected_pings = pending_pings
1661 .iter()
1662 .rev()
1663 .take(count_quota)
1664 .map(|(_, ping)| ping.document_id.clone())
1665 .collect::<Vec<_>>();
1666
1667 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1669
1670 upload_manager
1671 .policy
1672 .set_max_pending_pings_count(Some(count_quota as u64));
1673
1674 for ping_id in expected_pings.iter().rev() {
1678 match upload_manager.get_upload_task(&glean, false) {
1679 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1680 _ => panic!("Expected upload manager to return the next request!"),
1681 }
1682 }
1683
1684 assert_eq!(
1687 upload_manager.get_upload_task(&glean, false),
1688 PingUploadTask::done()
1689 );
1690
1691 assert_eq!(
1693 (n - count_quota) as i32,
1694 upload_manager
1695 .upload_metrics
1696 .deleted_pings_after_quota_hit
1697 .get_value(&glean, Some("metrics"))
1698 .unwrap()
1699 );
1700 assert_eq!(
1701 n as i32,
1702 upload_manager
1703 .upload_metrics
1704 .pending_pings
1705 .get_value(&glean, Some("metrics"))
1706 .unwrap()
1707 );
1708 }
1709
1710 #[test]
1711 fn size_and_count_quota_work_together_size_first() {
1712 let (mut glean, dir) = new_glean(None);
1713
1714 let ping_type = PingType::new(
1716 "test",
1717 true,
1718 true,
1719 true,
1720 true,
1721 true,
1722 vec![],
1723 vec![],
1724 true,
1725 vec![],
1726 );
1727 glean.register_ping_type(&ping_type);
1728
1729 let expected_number_of_pings = 3;
1730 let n = 10;
1732
1733 for _ in 0..n {
1735 ping_type.submit_sync(&glean, None);
1736 }
1737
1738 let directory_manager = PingDirectoryManager::new(dir.path());
1739 let pending_pings = directory_manager.process_dirs().pending_pings;
1740 let expected_pings = pending_pings
1743 .iter()
1744 .rev()
1745 .take(expected_number_of_pings)
1746 .map(|(_, ping)| ping.document_id.clone())
1747 .collect::<Vec<_>>();
1748
1749 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1751
1752 upload_manager
1755 .policy
1756 .set_max_pending_pings_directory_size(Some(1300));
1757 upload_manager.policy.set_max_pending_pings_count(Some(5));
1758
1759 for ping_id in expected_pings.iter().rev() {
1763 match upload_manager.get_upload_task(&glean, false) {
1764 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1765 _ => panic!("Expected upload manager to return the next request!"),
1766 }
1767 }
1768
1769 assert_eq!(
1772 upload_manager.get_upload_task(&glean, false),
1773 PingUploadTask::done()
1774 );
1775
1776 assert_eq!(
1778 (n - expected_number_of_pings) as i32,
1779 upload_manager
1780 .upload_metrics
1781 .deleted_pings_after_quota_hit
1782 .get_value(&glean, Some("metrics"))
1783 .unwrap()
1784 );
1785 assert_eq!(
1786 n as i32,
1787 upload_manager
1788 .upload_metrics
1789 .pending_pings
1790 .get_value(&glean, Some("metrics"))
1791 .unwrap()
1792 );
1793 }
1794
1795 #[test]
1796 fn size_and_count_quota_work_together_count_first() {
1797 let (mut glean, dir) = new_glean(None);
1798
1799 let ping_type = PingType::new(
1801 "test",
1802 true,
1803 true,
1804 true,
1805 true,
1806 true,
1807 vec![],
1808 vec![],
1809 true,
1810 vec![],
1811 );
1812 glean.register_ping_type(&ping_type);
1813
1814 let expected_number_of_pings = 2;
1815 let n = 10;
1817
1818 for _ in 0..n {
1820 ping_type.submit_sync(&glean, None);
1821 }
1822
1823 let directory_manager = PingDirectoryManager::new(dir.path());
1824 let pending_pings = directory_manager.process_dirs().pending_pings;
1825 let expected_pings = pending_pings
1828 .iter()
1829 .rev()
1830 .take(expected_number_of_pings)
1831 .map(|(_, ping)| ping.document_id.clone())
1832 .collect::<Vec<_>>();
1833
1834 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1836
1837 upload_manager
1840 .policy
1841 .set_max_pending_pings_directory_size(Some(1000));
1842 upload_manager.policy.set_max_pending_pings_count(Some(2));
1843
1844 for ping_id in expected_pings.iter().rev() {
1848 match upload_manager.get_upload_task(&glean, false) {
1849 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1850 _ => panic!("Expected upload manager to return the next request!"),
1851 }
1852 }
1853
1854 assert_eq!(
1857 upload_manager.get_upload_task(&glean, false),
1858 PingUploadTask::done()
1859 );
1860
1861 assert_eq!(
1863 (n - expected_number_of_pings) as i32,
1864 upload_manager
1865 .upload_metrics
1866 .deleted_pings_after_quota_hit
1867 .get_value(&glean, Some("metrics"))
1868 .unwrap()
1869 );
1870 assert_eq!(
1871 n as i32,
1872 upload_manager
1873 .upload_metrics
1874 .pending_pings
1875 .get_value(&glean, Some("metrics"))
1876 .unwrap()
1877 );
1878 }
1879
1880 #[test]
1881 fn maximum_wait_attemps_is_enforced() {
1882 let (glean, dir) = new_glean(None);
1883
1884 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1885
1886 let max_wait_attempts = 3;
1888 upload_manager
1889 .policy
1890 .set_max_wait_attempts(Some(max_wait_attempts));
1891
1892 let secs_per_interval = 5;
1898 let max_pings_per_interval = 1;
1899 upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
1900
1901 upload_manager.enqueue_ping(
1903 &glean,
1904 PingPayload {
1905 document_id: Uuid::new_v4().to_string(),
1906 upload_path: PATH.into(),
1907 json_body: "".into(),
1908 headers: None,
1909 body_has_info_sections: true,
1910 ping_name: "ping-name".into(),
1911 uploader_capabilities: vec![],
1912 },
1913 );
1914 upload_manager.enqueue_ping(
1915 &glean,
1916 PingPayload {
1917 document_id: Uuid::new_v4().to_string(),
1918 upload_path: PATH.into(),
1919 json_body: "".into(),
1920 headers: None,
1921 body_has_info_sections: true,
1922 ping_name: "ping-name".into(),
1923 uploader_capabilities: vec![],
1924 },
1925 );
1926
1927 match upload_manager.get_upload_task(&glean, false) {
1929 PingUploadTask::Upload { .. } => {}
1930 _ => panic!("Expected upload manager to return the next request!"),
1931 }
1932
1933 for _ in 0..max_wait_attempts {
1937 let task = upload_manager.get_upload_task(&glean, false);
1938 assert!(task.is_wait());
1939 }
1940
1941 assert_eq!(
1944 upload_manager.get_upload_task(&glean, false),
1945 PingUploadTask::done()
1946 );
1947
1948 thread::sleep(Duration::from_secs(secs_per_interval));
1950
1951 let task = upload_manager.get_upload_task(&glean, false);
1953 assert!(task.is_upload());
1954
1955 assert_eq!(
1957 upload_manager.get_upload_task(&glean, false),
1958 PingUploadTask::done()
1959 );
1960 }
1961
1962 #[test]
1963 fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
1964 let (glean, dir) = new_glean(None);
1965 let upload_manager = PingUploadManager::new(dir.path(), "test");
1966 match upload_manager.get_upload_task(&glean, false) {
1967 PingUploadTask::Wait { time } => {
1968 assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
1969 }
1970 _ => panic!("Expected upload manager to return a wait task!"),
1971 };
1972 }
1973
1974 #[test]
1975 fn cannot_enqueue_ping_while_its_being_processed() {
1976 let (glean, dir) = new_glean(None);
1977
1978 let upload_manager = PingUploadManager::no_policy(dir.path());
1979
1980 let identifier = &Uuid::new_v4();
1982 let ping = PingPayload {
1983 document_id: identifier.to_string(),
1984 upload_path: PATH.into(),
1985 json_body: "".into(),
1986 headers: None,
1987 body_has_info_sections: true,
1988 ping_name: "ping-name".into(),
1989 uploader_capabilities: vec![],
1990 };
1991 upload_manager.enqueue_ping(&glean, ping);
1992 assert!(upload_manager.get_upload_task(&glean, false).is_upload());
1993
1994 let ping = PingPayload {
1996 document_id: identifier.to_string(),
1997 upload_path: PATH.into(),
1998 json_body: "".into(),
1999 headers: None,
2000 body_has_info_sections: true,
2001 ping_name: "ping-name".into(),
2002 uploader_capabilities: vec![],
2003 };
2004 upload_manager.enqueue_ping(&glean, ping);
2005
2006 assert_eq!(
2008 upload_manager.get_upload_task(&glean, false),
2009 PingUploadTask::done()
2010 );
2011
2012 upload_manager.process_ping_upload_response(
2014 &glean,
2015 &identifier.to_string(),
2016 UploadResult::http_status(200),
2017 );
2018 }
2019}