glean_core/upload/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5//! Manages the pending pings queue and directory.
6//!
7//! * Keeps track of pending pings, loading any unsent ping from disk on startup;
8//! * Exposes [`get_upload_task`](PingUploadManager::get_upload_task) API for
9//!   the platform layer to request next upload task;
10//! * Exposes
11//!   [`process_ping_upload_response`](PingUploadManager::process_ping_upload_response)
12//!   API to check the HTTP response from the ping upload and either delete the
13//!   corresponding ping from disk or re-enqueue it for sending.
14
15use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::mem;
18use std::path::PathBuf;
19use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
20use std::sync::{Arc, RwLock, RwLockWriteGuard};
21use std::time::{Duration, Instant};
22
23use chrono::Utc;
24use malloc_size_of::MallocSizeOf;
25use malloc_size_of_derive::MallocSizeOf;
26
27use crate::error::ErrorKind;
28use crate::TimerId;
29use crate::{internal_metrics::UploadMetrics, Glean};
30pub use directory::process_metadata;
31use directory::{PingDirectoryManager, PingPayloadsByDirectory};
32use policy::Policy;
33use request::create_date_header_value;
34
35pub use directory::{PingMetadata, PingPayload};
36pub use request::{HeaderMap, PingRequest};
37pub use result::{UploadResult, UploadTaskAction};
38
39mod directory;
40mod policy;
41mod request;
42mod result;
43
/// How long (in milliseconds) an uploader is told to wait before asking for
/// another task while the pending pings directories are still being processed.
const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1_000;
45
/// Counts events inside a fixed-length interval so that callers can be
/// throttled once `max_count` events were seen in the current interval.
#[derive(Debug, MallocSizeOf)]
struct RateLimiter {
    /// The instant the current interval started.
    ///
    /// `None` until the first interval is started.
    started: Option<Instant>,
    /// The count for the current interval.
    count: u32,
    /// The duration of each interval.
    interval: Duration,
    /// The maximum count per interval.
    max_count: u32,
}
57
/// The current state of a `RateLimiter`, as reported by `RateLimiter::get_state`.
///
/// Derives `Debug` and `Eq` in addition to `PartialEq` so that states can be
/// logged with `{:?}` and compared with `assert_eq!`.
#[derive(Debug, PartialEq, Eq)]
enum RateLimiterState {
    /// The RateLimiter has not reached the maximum count and is still incrementing.
    Incrementing,
    /// The RateLimiter has reached the maximum count for the current interval.
    ///
    /// This variant contains the remaining time (in milliseconds)
    /// until the rate limiter is not throttled anymore.
    Throttled(u64),
}
69
70impl RateLimiter {
71    pub fn new(interval: Duration, max_count: u32) -> Self {
72        Self {
73            started: None,
74            count: 0,
75            interval,
76            max_count,
77        }
78    }
79
80    fn reset(&mut self) {
81        self.started = Some(Instant::now());
82        self.count = 0;
83    }
84
85    fn elapsed(&self) -> Duration {
86        self.started.unwrap().elapsed()
87    }
88
89    // The counter should reset if
90    //
91    // 1. It has never started;
92    // 2. It has been started more than the interval time ago;
93    // 3. Something goes wrong while trying to calculate the elapsed time since the last reset.
94    fn should_reset(&self) -> bool {
95        if self.started.is_none() {
96            return true;
97        }
98
99        // Safe unwrap, we already stated that `self.started` is not `None` above.
100        if self.elapsed() > self.interval {
101            return true;
102        }
103
104        false
105    }
106
107    /// Tries to increment the internal counter.
108    ///
109    /// # Returns
110    ///
111    /// The current state of the RateLimiter.
112    pub fn get_state(&mut self) -> RateLimiterState {
113        if self.should_reset() {
114            self.reset();
115        }
116
117        if self.count == self.max_count {
118            // Note that `remining` can't be a negative number because we just called `reset`,
119            // which will check if it is and reset if so.
120            let remaining = self.interval.as_millis() - self.elapsed().as_millis();
121            return RateLimiterState::Throttled(
122                remaining
123                    .try_into()
124                    .unwrap_or(self.interval.as_secs() * 1000),
125            );
126        }
127
128        self.count += 1;
129        RateLimiterState::Incrementing
130    }
131}
132
/// An enum representing the possible upload tasks to be performed by an uploader.
///
/// When asking for the next ping request to upload,
/// the requester receives exactly one of the three tasks below.
#[derive(PartialEq, Eq, Debug)]
pub enum PingUploadTask {
    /// An upload task
    Upload {
        /// The ping request for upload.
        /// See [`PingRequest`] for more information.
        request: PingRequest,
    },

    /// A flag signaling that the pending pings directories are not done being processed,
    /// thus the requester should wait and come back later.
    Wait {
        /// The time in milliseconds
        /// the requester should wait before requesting a new task.
        time: u64,
    },

    /// A flag signaling that requester doesn't need to request any more upload tasks at this moment.
    ///
    /// There are three possibilities for this scenario:
    /// * Pending pings queue is empty, no more pings to request;
    /// * Requester has gotten more than MAX_WAIT_ATTEMPTS (3, by default) `PingUploadTask::Wait` responses in a row;
    /// * Requester has reported more than MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW
    ///   recoverable upload failures on the same uploading window (see below)
    ///   and should stop requesting at this moment.
    ///
    /// An "uploading window" starts when a requester gets a new
    /// `PingUploadTask::Upload { request }` response and finishes when they
    /// finally get a `PingUploadTask::Done` or `PingUploadTask::Wait` response.
    Done {
        #[doc(hidden)]
        /// Unused field. Required because UniFFI can't handle variants without fields.
        unused: i8,
    },
}
172
173impl PingUploadTask {
174    /// Whether the current task is an upload task.
175    pub fn is_upload(&self) -> bool {
176        matches!(self, PingUploadTask::Upload { .. })
177    }
178
179    /// Whether the current task is wait task.
180    pub fn is_wait(&self) -> bool {
181        matches!(self, PingUploadTask::Wait { .. })
182    }
183
184    pub(crate) fn done() -> Self {
185        PingUploadTask::Done { unused: 0 }
186    }
187}
188
/// Manages the pending pings queue and directory.
#[derive(Debug)]
pub struct PingUploadManager {
    /// A FIFO queue storing a `PingRequest` for each pending ping.
    queue: RwLock<VecDeque<PingRequest>>,
    /// A manager for the pending pings directories.
    directory_manager: PingDirectoryManager,
    /// A flag signaling if we are done processing the pending pings directories.
    processed_pending_pings: Arc<AtomicBool>,
    /// The pending pings found by the off-thread directory scan,
    /// kept here until they are moved into the queue.
    cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
    /// The number of upload failures for the current uploading window.
    recoverable_failure_count: AtomicU32,
    /// The number of times in a row a user has received a `PingUploadTask::Wait` response.
    wait_attempt_count: AtomicU32,
    /// A ping counter to help rate limit the ping uploads.
    ///
    /// To keep resource usage in check,
    /// we may want to limit the amount of pings sent in a given interval.
    rate_limiter: Option<RwLock<RateLimiter>>,
    /// The name of the programming language used by the binding creating this instance of PingUploadManager.
    ///
    /// This will be used to build the value User-Agent header for each ping request.
    language_binding_name: String,
    /// Metrics related to ping uploading.
    upload_metrics: UploadMetrics,
    /// Policies for ping storage, uploading and requests.
    policy: Policy,

    /// Pings that were handed out for upload and have not received a response yet.
    ///
    /// Maps the document ID of a ping to the IDs of the timers started for it
    /// on the `send_success` and `send_failure` metrics, in that order.
    in_flight: RwLock<HashMap<String, (TimerId, TimerId)>>,
}
220
221impl MallocSizeOf for PingUploadManager {
222    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
223        let shallow_size = {
224            let queue = self.queue.read().unwrap();
225            if ops.has_malloc_enclosing_size_of() {
226                if let Some(front) = queue.front() {
227                    // SAFETY: The front element is a valid interior pointer and thus valid to pass
228                    // to an external function.
229                    unsafe { ops.malloc_enclosing_size_of(front) }
230                } else {
231                    // This assumes that no memory is allocated when the VecDeque is empty.
232                    0
233                }
234            } else {
235                // If `ops` can't estimate the size of a pointer,
236                // we can estimate the allocation size by the size of each element and the
237                // allocated capacity.
238                queue.capacity() * mem::size_of::<PingRequest>()
239            }
240        };
241
242        let mut n = shallow_size
243            + self.directory_manager.size_of(ops)
244            // SAFETY: We own this arc and can pass a pointer to it to an external function.
245            + unsafe { ops.malloc_size_of(self.processed_pending_pings.as_ptr()) }
246            + self.cached_pings.read().unwrap().size_of(ops)
247            + self.rate_limiter.as_ref().map(|rl| {
248                let lock = rl.read().unwrap();
249                (*lock).size_of(ops)
250            }).unwrap_or(0)
251            + self.language_binding_name.size_of(ops)
252            + self.upload_metrics.size_of(ops)
253            + self.policy.size_of(ops);
254
255        let in_flight = self.in_flight.read().unwrap();
256        n += in_flight.size_of(ops);
257
258        n
259    }
260}
261
262impl PingUploadManager {
263    /// Creates a new PingUploadManager.
264    ///
265    /// # Arguments
266    ///
267    /// * `data_path` - Path to the pending pings directory.
268    /// * `language_binding_name` - The name of the language binding calling this managers instance.
269    ///
270    /// # Panics
271    ///
272    /// Will panic if unable to spawn a new thread.
273    pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
274        Self {
275            queue: RwLock::new(VecDeque::new()),
276            directory_manager: PingDirectoryManager::new(data_path),
277            processed_pending_pings: Arc::new(AtomicBool::new(false)),
278            cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
279            recoverable_failure_count: AtomicU32::new(0),
280            wait_attempt_count: AtomicU32::new(0),
281            rate_limiter: None,
282            language_binding_name: language_binding_name.into(),
283            upload_metrics: UploadMetrics::new(),
284            policy: Policy::default(),
285            in_flight: RwLock::new(HashMap::default()),
286        }
287    }
288
289    /// Spawns a new thread and processes the pending pings directories,
290    /// filling up the queue with whatever pings are in there.
291    ///
292    /// # Returns
293    ///
294    /// The `JoinHandle` to the spawned thread
295    pub fn scan_pending_pings_directories(
296        &self,
297        trigger_upload: bool,
298    ) -> std::thread::JoinHandle<()> {
299        let local_manager = self.directory_manager.clone();
300        let local_cached_pings = self.cached_pings.clone();
301        let local_flag = self.processed_pending_pings.clone();
302        crate::thread::spawn("glean.ping_directory_manager.process_dir", move || {
303            {
304                // Be sure to drop local_cached_pings lock before triggering upload.
305                let mut local_cached_pings = local_cached_pings
306                    .write()
307                    .expect("Can't write to pending pings cache.");
308                local_cached_pings.extend(local_manager.process_dirs());
309                local_flag.store(true, Ordering::SeqCst);
310            }
311            if trigger_upload {
312                crate::dispatcher::launch(|| {
313                    if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok()) {
314                        if let Err(e) = state.callbacks.trigger_upload() {
315                            log::error!(
316                                "Triggering upload after pending ping scan failed. Error: {}",
317                                e
318                            );
319                        }
320                    }
321                });
322            }
323        })
324        .expect("Unable to spawn thread to process pings directories.")
325    }
326
327    /// Creates a new upload manager with no limitations, for tests.
328    #[cfg(test)]
329    pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
330        let mut upload_manager = Self::new(data_path, "Test");
331
332        // Disable all policies for tests, if necessary individuals tests can re-enable them.
333        upload_manager.policy.set_max_recoverable_failures(None);
334        upload_manager.policy.set_max_wait_attempts(None);
335        upload_manager.policy.set_max_ping_body_size(None);
336        upload_manager
337            .policy
338            .set_max_pending_pings_directory_size(None);
339        upload_manager.policy.set_max_pending_pings_count(None);
340
341        // When building for tests, always scan the pending pings directories and do it sync.
342        upload_manager
343            .scan_pending_pings_directories(false)
344            .join()
345            .unwrap();
346
347        upload_manager
348    }
349
    /// Whether the scan of the pending pings directories has finished.
    fn processed_pending_pings(&self) -> bool {
        self.processed_pending_pings.load(Ordering::SeqCst)
    }
353
    /// The number of recoverable upload failures counted in the current uploading window.
    fn recoverable_failure_count(&self) -> u32 {
        self.recoverable_failure_count.load(Ordering::SeqCst)
    }
357
    /// The number of `PingUploadTask::Wait` responses handed out in a row so far.
    fn wait_attempt_count(&self) -> u32 {
        self.wait_attempt_count.load(Ordering::SeqCst)
    }
361
362    /// Attempts to build a ping request from a ping file payload.
363    ///
364    /// Returns the `PingRequest` or `None` if unable to build,
365    /// in which case it will delete the ping file and record an error.
366    fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
367        let PingPayload {
368            document_id,
369            upload_path: path,
370            json_body: body,
371            headers,
372            body_has_info_sections,
373            ping_name,
374            uploader_capabilities,
375        } = ping;
376        let mut request = PingRequest::builder(
377            &self.language_binding_name,
378            self.policy.max_ping_body_size(),
379        )
380        .document_id(&document_id)
381        .path(path)
382        .body(body)
383        .body_has_info_sections(body_has_info_sections)
384        .ping_name(ping_name)
385        .uploader_capabilities(uploader_capabilities);
386
387        if let Some(headers) = headers {
388            request = request.headers(headers);
389        }
390
391        match request.build() {
392            Ok(request) => Some(request),
393            Err(e) => {
394                log::warn!("Error trying to build ping request: {}", e);
395                self.directory_manager.delete_file(&document_id);
396
397                // Record the error.
398                // Currently the only possible error is PingBodyOverflow.
399                if let ErrorKind::PingBodyOverflow(s) = e.kind() {
400                    self.upload_metrics
401                        .discarded_exceeding_pings_size
402                        .accumulate_sync(glean, *s as i64 / 1024);
403                }
404
405                None
406            }
407        }
408    }
409
410    /// Enqueue a ping for upload.
411    pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
412        let mut queue = self
413            .queue
414            .write()
415            .expect("Can't write to pending pings queue.");
416
417        let PingPayload {
418            ref document_id,
419            upload_path: ref path,
420            ..
421        } = ping;
422        // Checks if a ping with this `document_id` is already enqueued.
423        if queue
424            .iter()
425            .any(|request| request.document_id.as_str() == document_id)
426        {
427            log::warn!(
428                "Attempted to enqueue a duplicate ping {} at {}.",
429                document_id,
430                path
431            );
432            return;
433        }
434
435        {
436            let in_flight = self.in_flight.read().unwrap();
437            if in_flight.contains_key(document_id) {
438                log::warn!(
439                    "Attempted to enqueue an in-flight ping {} at {}.",
440                    document_id,
441                    path
442                );
443                self.upload_metrics
444                    .in_flight_pings_dropped
445                    .add_sync(glean, 0);
446                return;
447            }
448        }
449
450        log::trace!("Enqueuing ping {} at {}", document_id, path);
451        if let Some(request) = self.build_ping_request(glean, ping) {
452            queue.push_back(request)
453        }
454    }
455
456    /// Enqueues pings that might have been cached.
457    ///
458    /// The size of the PENDING_PINGS_DIRECTORY directory will be calculated
459    /// (by accumulating each ping's size in that directory)
460    /// and in case we exceed the quota, defined by the `quota` arg,
461    /// outstanding pings get deleted and are not enqueued.
462    ///
463    /// The size of the DELETION_REQUEST_PINGS_DIRECTORY will not be calculated
464    /// and no deletion-request pings will be deleted. Deletion request pings
465    /// are not very common and usually don't contain any data,
466    /// we don't expect that directory to ever reach quota.
467    /// Most importantly, we don't want to ever delete deletion-request pings.
468    ///
469    /// # Arguments
470    ///
471    /// * `glean` - The Glean object holding the database.
472    fn enqueue_cached_pings(&self, glean: &Glean) {
473        let mut cached_pings = self
474            .cached_pings
475            .write()
476            .expect("Can't write to pending pings cache.");
477
478        if cached_pings.len() > 0 {
479            let mut pending_pings_directory_size: u64 = 0;
480            let mut pending_pings_count = 0;
481            let mut deleting = false;
482
483            let total = cached_pings.pending_pings.len() as u64;
484            self.upload_metrics
485                .pending_pings
486                .add_sync(glean, total.try_into().unwrap_or(0));
487
488            if total > self.policy.max_pending_pings_count() {
489                log::warn!(
490                    "More than {} pending pings in the directory, will delete {} old pings.",
491                    self.policy.max_pending_pings_count(),
492                    total - self.policy.max_pending_pings_count()
493                );
494            }
495
496            // The pending pings vector is sorted by date in ascending order (oldest -> newest).
497            // We need to calculate the size of the pending pings directory
498            // and delete the **oldest** pings in case quota is reached.
499            // Thus, we reverse the order of the pending pings vector,
500            // so that we iterate in descending order (newest -> oldest).
501            cached_pings.pending_pings.reverse();
502            cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
503                pending_pings_count += 1;
504                pending_pings_directory_size += file_size;
505
506                // We don't want to spam the log for every ping over the quota.
507                if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
508                    log::warn!(
509                        "Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
510                        self.policy.max_pending_pings_directory_size()
511                    );
512                    deleting = true;
513                }
514
515                // Once we reach the number of allowed pings we start deleting,
516                // no matter what size.
517                // We already log this before the loop.
518                if pending_pings_count > self.policy.max_pending_pings_count() {
519                    deleting = true;
520                }
521
522                if deleting && self.directory_manager.delete_file(document_id) {
523                    self.upload_metrics
524                        .deleted_pings_after_quota_hit
525                        .add_sync(glean, 1);
526                    return false;
527                }
528
529                true
530            });
531            // After calculating the size of the pending pings directory,
532            // we record the calculated number and reverse the pings array back for enqueueing.
533            cached_pings.pending_pings.reverse();
534            self.upload_metrics
535                .pending_pings_directory_size
536                .accumulate_sync(glean, pending_pings_directory_size as i64 / 1024);
537
538            // Enqueue the remaining pending pings and
539            // enqueue all deletion-request pings.
540            cached_pings
541                .deletion_request_pings
542                .drain(..)
543                .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
544            cached_pings
545                .pending_pings
546                .drain(..)
547                .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
548        }
549    }
550
551    /// Adds rate limiting capability to this upload manager.
552    ///
553    /// The rate limiter will limit the amount of calls to `get_upload_task` per interval.
554    ///
555    /// Setting this will restart count and timer in case there was a previous rate limiter set
556    /// (e.g. if we have reached the current limit and call this function, we start counting again
557    /// and the caller is allowed to asks for tasks).
558    ///
559    /// # Arguments
560    ///
561    /// * `interval` - the amount of seconds in each rate limiting window.
562    /// * `max_tasks` - the maximum amount of task requests allowed per interval.
563    pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
564        self.rate_limiter = Some(RwLock::new(RateLimiter::new(
565            Duration::from_secs(interval),
566            max_tasks,
567        )));
568    }
569
570    /// Reads a ping file, creates a `PingRequest` and adds it to the queue.
571    ///
572    /// Duplicate requests won't be added.
573    ///
574    /// # Arguments
575    ///
576    /// * `glean` - The Glean object holding the database.
577    /// * `document_id` - The UUID of the ping in question.
578    pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
579        if let Some(ping) = self.directory_manager.process_file(document_id) {
580            self.enqueue_ping(glean, ping);
581        }
582    }
583
584    /// Clears the pending pings queue, leaves the deletion-request pings.
585    pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
586        log::trace!("Clearing ping queue");
587        let mut queue = self
588            .queue
589            .write()
590            .expect("Can't write to pending pings queue.");
591
592        queue.retain(|ping| ping.is_deletion_request());
593        log::trace!(
594            "{} pings left in the queue (only deletion-request expected)",
595            queue.len()
596        );
597        queue
598    }
599
600    fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
601        // Helper to decide whether to return PingUploadTask::Wait or PingUploadTask::Done.
602        //
603        // We want to limit the amount of PingUploadTask::Wait returned in a row,
604        // in case we reach MAX_WAIT_ATTEMPTS we want to actually return PingUploadTask::Done.
605        let wait_or_done = |time: u64| {
606            self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
607            if self.wait_attempt_count() > self.policy.max_wait_attempts() {
608                PingUploadTask::done()
609            } else {
610                PingUploadTask::Wait { time }
611            }
612        };
613
614        if !self.processed_pending_pings() {
615            log::info!(
616                "Tried getting an upload task, but processing is ongoing. Will come back later."
617            );
618            return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
619        }
620
621        // This is a no-op in case there are no cached pings.
622        self.enqueue_cached_pings(glean);
623
624        if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
625            log::warn!(
626                "Reached maximum recoverable failures for the current uploading window. You are done."
627            );
628            return PingUploadTask::done();
629        }
630
631        let mut queue = self
632            .queue
633            .write()
634            .expect("Can't write to pending pings queue.");
635        match queue.front() {
636            Some(request) => {
637                if let Some(rate_limiter) = &self.rate_limiter {
638                    let mut rate_limiter = rate_limiter
639                        .write()
640                        .expect("Can't write to the rate limiter.");
641                    if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
642                        log::info!(
643                            "Tried getting an upload task, but we are throttled at the moment."
644                        );
645                        return wait_or_done(remaining);
646                    }
647                }
648
649                log::info!(
650                    "New upload task with id {} (path: {})",
651                    request.document_id,
652                    request.path
653                );
654
655                if log_ping {
656                    if let Some(body) = request.pretty_body() {
657                        chunked_log_info(&request.path, &body);
658                    } else {
659                        chunked_log_info(&request.path, "<invalid ping payload>");
660                    }
661                }
662
663                {
664                    // Synchronous timer starts.
665                    // We're in the uploader thread anyway.
666                    // But also: No data is stored on disk.
667                    let mut in_flight = self.in_flight.write().unwrap();
668                    let success_id = self.upload_metrics.send_success.start_sync();
669                    let failure_id = self.upload_metrics.send_failure.start_sync();
670                    in_flight.insert(request.document_id.clone(), (success_id, failure_id));
671                }
672
673                let mut request = queue.pop_front().unwrap();
674
675                // Adding the `Date` header just before actual upload happens.
676                request
677                    .headers
678                    .insert("Date".to_string(), create_date_header_value(Utc::now()));
679
680                PingUploadTask::Upload { request }
681            }
682            None => {
683                log::info!("No more pings to upload! You are done.");
684                PingUploadTask::done()
685            }
686        }
687    }
688
689    /// Gets the next `PingUploadTask`.
690    ///
691    /// # Arguments
692    ///
693    /// * `glean` - The Glean object holding the database.
694    /// * `log_ping` - Whether to log the ping before returning.
695    ///
696    /// # Returns
697    ///
698    /// The next [`PingUploadTask`](enum.PingUploadTask.html).
699    pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
700        let task = self.get_upload_task_internal(glean, log_ping);
701
702        if !task.is_wait() && self.wait_attempt_count() > 0 {
703            self.wait_attempt_count.store(0, Ordering::SeqCst);
704        }
705
706        if !task.is_upload() && self.recoverable_failure_count() > 0 {
707            self.recoverable_failure_count.store(0, Ordering::SeqCst);
708        }
709
710        task
711    }
712
713    /// Processes the response from an attempt to upload a ping.
714    ///
715    /// Based on the HTTP status of said response,
716    /// the possible outcomes are:
717    ///
718    /// * **200 - 299 Success**
    ///   Any status on the 2XX range is considered a successful upload,
720    ///   which means the corresponding ping file can be deleted.
721    ///   _Known 2XX status:_
722    ///   * 200 - OK. Request accepted into the pipeline.
723    ///
724    /// * **400 - 499 Unrecoverable error**
725    ///   Any status on the 4XX range means something our client did is not correct.
726    ///   It is unlikely that the client is going to recover from this by retrying,
727    ///   so in this case the corresponding ping file can also be deleted.
728    ///   _Known 4XX status:_
729    ///   * 404 - not found - POST/PUT to an unknown namespace
730    ///   * 405 - wrong request type (anything other than POST/PUT)
731    ///   * 411 - missing content-length header
    ///   * 413 - request body too large (note that if we have badly-behaved clients that
    ///           retry on 4XX, we should send back 202 on body/path too long).
734    ///   * 414 - request path too long (See above)
735    ///
736    /// * **Any other error**
737    ///   For any other error, a warning is logged and the ping is re-enqueued.
738    ///   _Known other errors:_
739    ///   * 500 - internal error
740    ///
741    /// # Note
742    ///
743    /// The disk I/O performed by this function is not done off-thread,
744    /// as it is expected to be called off-thread by the platform.
745    ///
746    /// # Arguments
747    ///
748    /// * `glean` - The Glean object holding the database.
749    /// * `document_id` - The UUID of the ping in question.
750    /// * `status` - The HTTP status of the response.
751    pub fn process_ping_upload_response(
752        &self,
753        glean: &Glean,
754        document_id: &str,
755        status: UploadResult,
756    ) -> UploadTaskAction {
757        use UploadResult::*;
758
759        let stop_time = zeitstempel::now();
760
761        if let Some(label) = status.get_label() {
762            let metric = self.upload_metrics.ping_upload_failure.get(label);
763            metric.add_sync(glean, 1);
764        }
765
766        let send_ids = {
767            let mut lock = self.in_flight.write().unwrap();
768            lock.remove(document_id)
769        };
770
771        if send_ids.is_none() {
772            self.upload_metrics.missing_send_ids.add_sync(glean, 1);
773        }
774
775        match status {
776            HttpStatus { code } if (200..=299).contains(&code) => {
777                log::info!("Ping {} successfully sent {}.", document_id, code);
778                if let Some((success_id, failure_id)) = send_ids {
779                    self.upload_metrics
780                        .send_success
781                        .set_stop_and_accumulate(glean, success_id, stop_time);
782                    self.upload_metrics.send_failure.cancel_sync(failure_id);
783                }
784                self.directory_manager.delete_file(document_id);
785            }
786
787            UnrecoverableFailure { .. } | HttpStatus { code: 400..=499 } | Incapable { .. } => {
788                log::warn!(
789                    "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
790                    document_id,
791                    status
792                );
793                if let Some((success_id, failure_id)) = send_ids {
794                    self.upload_metrics.send_success.cancel_sync(success_id);
795                    self.upload_metrics
796                        .send_failure
797                        .set_stop_and_accumulate(glean, failure_id, stop_time);
798                }
799                self.directory_manager.delete_file(document_id);
800            }
801
802            RecoverableFailure { .. } | HttpStatus { .. } => {
803                log::warn!(
804                    "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
805                    document_id,
806                    status
807                );
808                if let Some((success_id, failure_id)) = send_ids {
809                    self.upload_metrics.send_success.cancel_sync(success_id);
810                    self.upload_metrics
811                        .send_failure
812                        .set_stop_and_accumulate(glean, failure_id, stop_time);
813                }
814                self.enqueue_ping_from_file(glean, document_id);
815                self.recoverable_failure_count
816                    .fetch_add(1, Ordering::SeqCst);
817            }
818
819            Done { .. } => {
820                log::debug!("Uploader signaled Done. Exiting.");
821                if let Some((success_id, failure_id)) = send_ids {
822                    self.upload_metrics.send_success.cancel_sync(success_id);
823                    self.upload_metrics.send_failure.cancel_sync(failure_id);
824                }
825                return UploadTaskAction::End;
826            }
827        };
828
829        UploadTaskAction::Next
830    }
831}
832
/// Splits log message into chunks on Android.
///
/// Logcat caps the size of a single log record, so a long ping is emitted as
/// several `info` records, each prefixed with the upload path and a
/// `[Part i of n]` counter. A ping that fits is logged as a single record.
///
/// # Arguments
///
/// * `path` - The upload path of the ping; repeated in every record.
/// * `payload` - The ping body to log.
#[cfg(target_os = "android")]
pub fn chunked_log_info(path: &str, payload: &str) {
    // Since the logcat ring buffer size is configurable, but its 'max payload' size is not,
    // we must break apart long pings into chunks no larger than the max payload size of 4076b.
    // We leave some head space for our prefix.
    const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;

    // If the whole ping fits within one logcat payload, short-circuit and
    // avoid the chunking overhead.
    if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
        log::info!("Glean ping to URL: {}\n{}", path, payload);
        return;
    }

    // Split first, then log: this way the "of N" total is the real number of
    // parts, even when char-boundary adjustments shorten individual chunks.
    let chunks = split_into_log_chunks(payload, MAX_LOG_PAYLOAD_SIZE_BYTES);
    let total_chunks = chunks.len();
    for (idx, chunk) in chunks.iter().enumerate() {
        log::info!(
            "Glean ping to URL: {} [Part {} of {}]\n{}",
            path,
            idx + 1,
            total_chunks,
            chunk
        );
    }
}

/// Splits `payload` into consecutive pieces of at most `max_chunk_bytes` bytes,
/// never cutting a UTF-8 character in half.
///
/// Concatenating the returned pieces yields `payload` again. An empty payload
/// yields no pieces. If a single character is wider than `max_chunk_bytes`,
/// that character is emitted whole so the split always makes progress.
// Only the Android logger consumes this; it is also built for tests so the
// boundary handling can be exercised on any host.
#[cfg(any(target_os = "android", test))]
#[cfg_attr(not(target_os = "android"), allow(dead_code))]
fn split_into_log_chunks(payload: &str, max_chunk_bytes: usize) -> Vec<&str> {
    let mut chunks = Vec::new();
    let mut rest = payload;

    while rest.len() > max_chunk_bytes {
        // Walk back from the size limit to the closest char boundary.
        // Index 0 is always a boundary, so this loop terminates.
        let mut split_at = max_chunk_bytes;
        while !rest.is_char_boundary(split_at) {
            split_at -= 1;
        }
        if split_at == 0 {
            // The first character alone exceeds the limit; take it whole.
            split_at = rest.chars().next().map_or(rest.len(), char::len_utf8);
        }

        let (chunk, tail) = rest.split_at(split_at);
        chunks.push(chunk);
        rest = tail;
    }

    // Whatever is left fits into a final chunk.
    if !rest.is_empty() {
        chunks.push(rest);
    }

    chunks
}
892
/// Logs payload in one go (all other OS).
///
/// Counterpart of the Android variant for every other target: the payload is
/// emitted as a single `info`-level log record, without any chunking.
///
/// # Arguments
///
/// * `_path` - Unused here; present so the signature matches the Android variant.
/// * `payload` - The text to log.
#[cfg(not(target_os = "android"))]
pub fn chunked_log_info(_path: &str, payload: &str) {
    log::info!("{}", payload)
}
898
899#[cfg(test)]
900mod test {
901    use std::thread;
902    use uuid::Uuid;
903
904    use super::*;
905    use crate::metrics::PingType;
906    use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
907
908    const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
909
910    #[test]
911    fn doesnt_error_when_there_are_no_pending_pings() {
912        let (glean, _t) = new_glean(None);
913
914        // Try and get the next request.
915        // Verify request was not returned
916        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
917    }
918
919    #[test]
920    fn returns_ping_request_when_there_is_one() {
921        let (glean, dir) = new_glean(None);
922
923        let upload_manager = PingUploadManager::no_policy(dir.path());
924
925        // Enqueue a ping
926        upload_manager.enqueue_ping(
927            &glean,
928            PingPayload {
929                document_id: Uuid::new_v4().to_string(),
930                upload_path: PATH.into(),
931                json_body: "".into(),
932                headers: None,
933                body_has_info_sections: true,
934                ping_name: "ping-name".into(),
935                uploader_capabilities: vec![],
936            },
937        );
938
939        // Try and get the next request.
940        // Verify request was returned
941        let task = upload_manager.get_upload_task(&glean, false);
942        assert!(task.is_upload());
943    }
944
945    #[test]
946    fn returns_as_many_ping_requests_as_there_are() {
947        let (glean, dir) = new_glean(None);
948
949        let upload_manager = PingUploadManager::no_policy(dir.path());
950
951        // Enqueue a ping multiple times
952        let n = 10;
953        for _ in 0..n {
954            upload_manager.enqueue_ping(
955                &glean,
956                PingPayload {
957                    document_id: Uuid::new_v4().to_string(),
958                    upload_path: PATH.into(),
959                    json_body: "".into(),
960                    headers: None,
961                    body_has_info_sections: true,
962                    ping_name: "ping-name".into(),
963                    uploader_capabilities: vec![],
964                },
965            );
966        }
967
968        // Verify a request is returned for each submitted ping
969        for _ in 0..n {
970            let task = upload_manager.get_upload_task(&glean, false);
971            assert!(task.is_upload());
972        }
973
974        // Verify that after all requests are returned, none are left
975        assert_eq!(
976            upload_manager.get_upload_task(&glean, false),
977            PingUploadTask::done()
978        );
979    }
980
981    #[test]
982    fn limits_the_number_of_pings_when_there_is_rate_limiting() {
983        let (glean, dir) = new_glean(None);
984
985        let mut upload_manager = PingUploadManager::no_policy(dir.path());
986
987        // Add a rate limiter to the upload mangager with max of 10 pings every 3 seconds.
988        let max_pings_per_interval = 10;
989        upload_manager.set_rate_limiter(3, 10);
990
991        // Enqueue the max number of pings allowed per uploading window
992        for _ in 0..max_pings_per_interval {
993            upload_manager.enqueue_ping(
994                &glean,
995                PingPayload {
996                    document_id: Uuid::new_v4().to_string(),
997                    upload_path: PATH.into(),
998                    json_body: "".into(),
999                    headers: None,
1000                    body_has_info_sections: true,
1001                    ping_name: "ping-name".into(),
1002                    uploader_capabilities: vec![],
1003                },
1004            );
1005        }
1006
1007        // Verify a request is returned for each submitted ping
1008        for _ in 0..max_pings_per_interval {
1009            let task = upload_manager.get_upload_task(&glean, false);
1010            assert!(task.is_upload());
1011        }
1012
1013        // Enqueue just one more ping
1014        upload_manager.enqueue_ping(
1015            &glean,
1016            PingPayload {
1017                document_id: Uuid::new_v4().to_string(),
1018                upload_path: PATH.into(),
1019                json_body: "".into(),
1020                headers: None,
1021                body_has_info_sections: true,
1022                ping_name: "ping-name".into(),
1023                uploader_capabilities: vec![],
1024            },
1025        );
1026
1027        // Verify that we are indeed told to wait because we are at capacity
1028        match upload_manager.get_upload_task(&glean, false) {
1029            PingUploadTask::Wait { time } => {
1030                // Wait for the uploading window to reset
1031                thread::sleep(Duration::from_millis(time));
1032            }
1033            _ => panic!("Expected upload manager to return a wait task!"),
1034        };
1035
1036        let task = upload_manager.get_upload_task(&glean, false);
1037        assert!(task.is_upload());
1038    }
1039
1040    #[test]
1041    fn clearing_the_queue_works_correctly() {
1042        let (glean, dir) = new_glean(None);
1043
1044        let upload_manager = PingUploadManager::no_policy(dir.path());
1045
1046        // Enqueue a ping multiple times
1047        for _ in 0..10 {
1048            upload_manager.enqueue_ping(
1049                &glean,
1050                PingPayload {
1051                    document_id: Uuid::new_v4().to_string(),
1052                    upload_path: PATH.into(),
1053                    json_body: "".into(),
1054                    headers: None,
1055                    body_has_info_sections: true,
1056                    ping_name: "ping-name".into(),
1057                    uploader_capabilities: vec![],
1058                },
1059            );
1060        }
1061
1062        // Clear the queue
1063        drop(upload_manager.clear_ping_queue());
1064
1065        // Verify there really isn't any ping in the queue
1066        assert_eq!(
1067            upload_manager.get_upload_task(&glean, false),
1068            PingUploadTask::done()
1069        );
1070    }
1071
1072    #[test]
1073    fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
1074        let (mut glean, _t) = new_glean(None);
1075
1076        // Register a ping for testing
1077        let ping_type = PingType::new(
1078            "test",
1079            true,
1080            /* send_if_empty */ true,
1081            true,
1082            true,
1083            true,
1084            vec![],
1085            vec![],
1086            true,
1087            vec![],
1088        );
1089        glean.register_ping_type(&ping_type);
1090
1091        // Submit the ping multiple times
1092        let n = 10;
1093        for _ in 0..n {
1094            ping_type.submit_sync(&glean, None);
1095        }
1096
1097        glean
1098            .internal_pings
1099            .deletion_request
1100            .submit_sync(&glean, None);
1101
1102        // Clear the queue
1103        drop(glean.upload_manager.clear_ping_queue());
1104
1105        let upload_task = glean.get_upload_task();
1106        match upload_task {
1107            PingUploadTask::Upload { request } => assert!(request.is_deletion_request()),
1108            _ => panic!("Expected upload manager to return the next request!"),
1109        }
1110
1111        // Verify there really isn't any other pings in the queue
1112        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1113    }
1114
1115    #[test]
1116    fn fills_up_queue_successfully_from_disk() {
1117        let (mut glean, dir) = new_glean(None);
1118
1119        // Register a ping for testing
1120        let ping_type = PingType::new(
1121            "test",
1122            true,
1123            /* send_if_empty */ true,
1124            true,
1125            true,
1126            true,
1127            vec![],
1128            vec![],
1129            true,
1130            vec![],
1131        );
1132        glean.register_ping_type(&ping_type);
1133
1134        // Submit the ping multiple times
1135        let n = 10;
1136        for _ in 0..n {
1137            ping_type.submit_sync(&glean, None);
1138        }
1139
1140        // Create a new upload manager pointing to the same data_path as the glean instance.
1141        let upload_manager = PingUploadManager::no_policy(dir.path());
1142
1143        // Verify the requests were properly enqueued
1144        for _ in 0..n {
1145            let task = upload_manager.get_upload_task(&glean, false);
1146            assert!(task.is_upload());
1147        }
1148
1149        // Verify that after all requests are returned, none are left
1150        assert_eq!(
1151            upload_manager.get_upload_task(&glean, false),
1152            PingUploadTask::done()
1153        );
1154    }
1155
1156    #[test]
1157    fn processes_correctly_success_upload_response() {
1158        let (mut glean, dir) = new_glean(None);
1159
1160        // Register a ping for testing
1161        let ping_type = PingType::new(
1162            "test",
1163            true,
1164            /* send_if_empty */ true,
1165            true,
1166            true,
1167            true,
1168            vec![],
1169            vec![],
1170            true,
1171            vec![],
1172        );
1173        glean.register_ping_type(&ping_type);
1174
1175        // Submit a ping
1176        ping_type.submit_sync(&glean, None);
1177
1178        // Get the pending ping directory path
1179        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1180
1181        // Get the submitted PingRequest
1182        match glean.get_upload_task() {
1183            PingUploadTask::Upload { request } => {
1184                // Simulate the processing of a sucessfull request
1185                let document_id = request.document_id;
1186                glean.process_ping_upload_response(&document_id, UploadResult::http_status(200));
1187                // Verify file was deleted
1188                assert!(!pending_pings_dir.join(document_id).exists());
1189            }
1190            _ => panic!("Expected upload manager to return the next request!"),
1191        }
1192
1193        // Verify that after request is returned, none are left
1194        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1195    }
1196
1197    #[test]
1198    fn processes_correctly_client_error_upload_response() {
1199        let (mut glean, dir) = new_glean(None);
1200
1201        // Register a ping for testing
1202        let ping_type = PingType::new(
1203            "test",
1204            true,
1205            /* send_if_empty */ true,
1206            true,
1207            true,
1208            true,
1209            vec![],
1210            vec![],
1211            true,
1212            vec![],
1213        );
1214        glean.register_ping_type(&ping_type);
1215
1216        // Submit a ping
1217        ping_type.submit_sync(&glean, None);
1218
1219        // Get the pending ping directory path
1220        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1221
1222        // Get the submitted PingRequest
1223        match glean.get_upload_task() {
1224            PingUploadTask::Upload { request } => {
1225                // Simulate the processing of a client error
1226                let document_id = request.document_id;
1227                glean.process_ping_upload_response(&document_id, UploadResult::http_status(404));
1228                // Verify file was deleted
1229                assert!(!pending_pings_dir.join(document_id).exists());
1230            }
1231            _ => panic!("Expected upload manager to return the next request!"),
1232        }
1233
1234        // Verify that after request is returned, none are left
1235        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1236    }
1237
1238    #[test]
1239    fn processes_correctly_server_error_upload_response() {
1240        let (mut glean, _t) = new_glean(None);
1241
1242        // Register a ping for testing
1243        let ping_type = PingType::new(
1244            "test",
1245            true,
1246            /* send_if_empty */ true,
1247            true,
1248            true,
1249            true,
1250            vec![],
1251            vec![],
1252            true,
1253            vec![],
1254        );
1255        glean.register_ping_type(&ping_type);
1256
1257        // Submit a ping
1258        ping_type.submit_sync(&glean, None);
1259
1260        // Get the submitted PingRequest
1261        match glean.get_upload_task() {
1262            PingUploadTask::Upload { request } => {
1263                // Simulate the processing of a client error
1264                let document_id = request.document_id;
1265                glean.process_ping_upload_response(&document_id, UploadResult::http_status(500));
1266                // Verify this ping was indeed re-enqueued
1267                match glean.get_upload_task() {
1268                    PingUploadTask::Upload { request } => {
1269                        assert_eq!(document_id, request.document_id);
1270                    }
1271                    _ => panic!("Expected upload manager to return the next request!"),
1272                }
1273            }
1274            _ => panic!("Expected upload manager to return the next request!"),
1275        }
1276
1277        // Verify that after request is returned, none are left
1278        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1279    }
1280
1281    #[test]
1282    fn processes_correctly_unrecoverable_upload_response() {
1283        let (mut glean, dir) = new_glean(None);
1284
1285        // Register a ping for testing
1286        let ping_type = PingType::new(
1287            "test",
1288            true,
1289            /* send_if_empty */ true,
1290            true,
1291            true,
1292            true,
1293            vec![],
1294            vec![],
1295            true,
1296            vec![],
1297        );
1298        glean.register_ping_type(&ping_type);
1299
1300        // Submit a ping
1301        ping_type.submit_sync(&glean, None);
1302
1303        // Get the pending ping directory path
1304        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1305
1306        // Get the submitted PingRequest
1307        match glean.get_upload_task() {
1308            PingUploadTask::Upload { request } => {
1309                // Simulate the processing of a client error
1310                let document_id = request.document_id;
1311                glean.process_ping_upload_response(
1312                    &document_id,
1313                    UploadResult::unrecoverable_failure(),
1314                );
1315                // Verify file was deleted
1316                assert!(!pending_pings_dir.join(document_id).exists());
1317            }
1318            _ => panic!("Expected upload manager to return the next request!"),
1319        }
1320
1321        // Verify that after request is returned, none are left
1322        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1323    }
1324
1325    #[test]
1326    fn new_pings_are_added_while_upload_in_progress() {
1327        let (glean, dir) = new_glean(None);
1328
1329        let upload_manager = PingUploadManager::no_policy(dir.path());
1330
1331        let doc1 = Uuid::new_v4().to_string();
1332        let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
1333
1334        let doc2 = Uuid::new_v4().to_string();
1335        let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
1336
1337        // Enqueue a ping
1338        upload_manager.enqueue_ping(
1339            &glean,
1340            PingPayload {
1341                document_id: doc1.clone(),
1342                upload_path: path1,
1343                json_body: "".into(),
1344                headers: None,
1345                body_has_info_sections: true,
1346                ping_name: "test-ping".into(),
1347                uploader_capabilities: vec![],
1348            },
1349        );
1350
1351        // Try and get the first request.
1352        let req = match upload_manager.get_upload_task(&glean, false) {
1353            PingUploadTask::Upload { request } => request,
1354            _ => panic!("Expected upload manager to return the next request!"),
1355        };
1356        assert_eq!(doc1, req.document_id);
1357
1358        // Schedule the next one while the first one is "in progress"
1359        upload_manager.enqueue_ping(
1360            &glean,
1361            PingPayload {
1362                document_id: doc2.clone(),
1363                upload_path: path2,
1364                json_body: "".into(),
1365                headers: None,
1366                body_has_info_sections: true,
1367                ping_name: "test-ping".into(),
1368                uploader_capabilities: vec![],
1369            },
1370        );
1371
1372        // Mark as processed
1373        upload_manager.process_ping_upload_response(
1374            &glean,
1375            &req.document_id,
1376            UploadResult::http_status(200),
1377        );
1378
1379        // Get the second request.
1380        let req = match upload_manager.get_upload_task(&glean, false) {
1381            PingUploadTask::Upload { request } => request,
1382            _ => panic!("Expected upload manager to return the next request!"),
1383        };
1384        assert_eq!(doc2, req.document_id);
1385
1386        // Mark as processed
1387        upload_manager.process_ping_upload_response(
1388            &glean,
1389            &req.document_id,
1390            UploadResult::http_status(200),
1391        );
1392
1393        // ... and then we're done.
1394        assert_eq!(
1395            upload_manager.get_upload_task(&glean, false),
1396            PingUploadTask::done()
1397        );
1398    }
1399
1400    #[test]
1401    fn adds_debug_view_header_to_requests_when_tag_is_set() {
1402        let (mut glean, _t) = new_glean(None);
1403
1404        glean.set_debug_view_tag("valid-tag");
1405
1406        // Register a ping for testing
1407        let ping_type = PingType::new(
1408            "test",
1409            true,
1410            /* send_if_empty */ true,
1411            true,
1412            true,
1413            true,
1414            vec![],
1415            vec![],
1416            true,
1417            vec![],
1418        );
1419        glean.register_ping_type(&ping_type);
1420
1421        // Submit a ping
1422        ping_type.submit_sync(&glean, None);
1423
1424        // Get the submitted PingRequest
1425        match glean.get_upload_task() {
1426            PingUploadTask::Upload { request } => {
1427                assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
1428            }
1429            _ => panic!("Expected upload manager to return the next request!"),
1430        }
1431    }
1432
1433    #[test]
1434    fn duplicates_are_not_enqueued() {
1435        let (glean, dir) = new_glean(None);
1436
1437        // Create a new upload manager so that we have access to its functions directly,
1438        // make it synchronous so we don't have to manually wait for the scanning to finish.
1439        let upload_manager = PingUploadManager::no_policy(dir.path());
1440
1441        let doc_id = Uuid::new_v4().to_string();
1442        let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
1443
1444        // Try to enqueue a ping with the same doc_id twice
1445        upload_manager.enqueue_ping(
1446            &glean,
1447            PingPayload {
1448                document_id: doc_id.clone(),
1449                upload_path: path.clone(),
1450                json_body: "".into(),
1451                headers: None,
1452                body_has_info_sections: true,
1453                ping_name: "test-ping".into(),
1454                uploader_capabilities: vec![],
1455            },
1456        );
1457        upload_manager.enqueue_ping(
1458            &glean,
1459            PingPayload {
1460                document_id: doc_id,
1461                upload_path: path,
1462                json_body: "".into(),
1463                headers: None,
1464                body_has_info_sections: true,
1465                ping_name: "test-ping".into(),
1466                uploader_capabilities: vec![],
1467            },
1468        );
1469
1470        // Get a task once
1471        let task = upload_manager.get_upload_task(&glean, false);
1472        assert!(task.is_upload());
1473
1474        // There should be no more queued tasks
1475        assert_eq!(
1476            upload_manager.get_upload_task(&glean, false),
1477            PingUploadTask::done()
1478        );
1479    }
1480
1481    #[test]
1482    fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
1483        let (mut glean, dir) = new_glean(None);
1484
1485        // Register a ping for testing
1486        let ping_type = PingType::new(
1487            "test",
1488            true,
1489            /* send_if_empty */ true,
1490            true,
1491            true,
1492            true,
1493            vec![],
1494            vec![],
1495            true,
1496            vec![],
1497        );
1498        glean.register_ping_type(&ping_type);
1499
1500        // Submit the ping multiple times
1501        let n = 5;
1502        for _ in 0..n {
1503            ping_type.submit_sync(&glean, None);
1504        }
1505
1506        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1507
1508        // Set a policy for max recoverable failures, this is usually disabled for tests.
1509        let max_recoverable_failures = 3;
1510        upload_manager
1511            .policy
1512            .set_max_recoverable_failures(Some(max_recoverable_failures));
1513
1514        // Return the max recoverable error failures in a row
1515        for _ in 0..max_recoverable_failures {
1516            match upload_manager.get_upload_task(&glean, false) {
1517                PingUploadTask::Upload { request } => {
1518                    upload_manager.process_ping_upload_response(
1519                        &glean,
1520                        &request.document_id,
1521                        UploadResult::recoverable_failure(),
1522                    );
1523                }
1524                _ => panic!("Expected upload manager to return the next request!"),
1525            }
1526        }
1527
1528        // Verify that after returning the max amount of recoverable failures,
1529        // we are done even though we haven't gotten all the enqueued requests.
1530        assert_eq!(
1531            upload_manager.get_upload_task(&glean, false),
1532            PingUploadTask::done()
1533        );
1534
1535        // Verify all requests are returned when we try again.
1536        for _ in 0..n {
1537            let task = upload_manager.get_upload_task(&glean, false);
1538            assert!(task.is_upload());
1539        }
1540    }
1541
1542    #[test]
1543    fn quota_is_enforced_when_enqueueing_cached_pings() {
1544        let (mut glean, dir) = new_glean(None);
1545
1546        // Register a ping for testing
1547        let ping_type = PingType::new(
1548            "test",
1549            true,
1550            /* send_if_empty */ true,
1551            true,
1552            true,
1553            true,
1554            vec![],
1555            vec![],
1556            true,
1557            vec![],
1558        );
1559        glean.register_ping_type(&ping_type);
1560
1561        // Submit the ping multiple times
1562        let n = 10;
1563        for _ in 0..n {
1564            ping_type.submit_sync(&glean, None);
1565        }
1566
1567        let directory_manager = PingDirectoryManager::new(dir.path());
1568        let pending_pings = directory_manager.process_dirs().pending_pings;
1569        // The pending pings array is sorted by date in ascending order,
1570        // the newest element is the last one.
1571        let (_, newest_ping) = &pending_pings.last().unwrap();
1572        let PingPayload {
1573            document_id: newest_ping_id,
1574            ..
1575        } = &newest_ping;
1576
1577        // Create a new upload manager pointing to the same data_path as the glean instance.
1578        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1579
1580        // Set the quota to just a little over the size on an empty ping file.
1581        // This way we can check that one ping is kept and all others are deleted.
1582        //
1583        // From manual testing I figured out an empty ping file is 324bytes,
1584        // I am setting this a little over just so that minor changes to the ping structure
1585        // don't immediatelly break this.
1586        upload_manager
1587            .policy
1588            .set_max_pending_pings_directory_size(Some(500));
1589
1590        // Get a task once
1591        // One ping should have been enqueued.
1592        // Make sure it is the newest ping.
1593        match upload_manager.get_upload_task(&glean, false) {
1594            PingUploadTask::Upload { request } => assert_eq!(&request.document_id, newest_ping_id),
1595            _ => panic!("Expected upload manager to return the next request!"),
1596        }
1597
1598        // Verify that no other requests were returned,
1599        // they should all have been deleted because pending pings quota was hit.
1600        assert_eq!(
1601            upload_manager.get_upload_task(&glean, false),
1602            PingUploadTask::done()
1603        );
1604
1605        // Verify that the correct number of deleted pings was recorded
1606        assert_eq!(
1607            n - 1,
1608            upload_manager
1609                .upload_metrics
1610                .deleted_pings_after_quota_hit
1611                .get_value(&glean, Some("metrics"))
1612                .unwrap()
1613        );
1614        assert_eq!(
1615            n,
1616            upload_manager
1617                .upload_metrics
1618                .pending_pings
1619                .get_value(&glean, Some("metrics"))
1620                .unwrap()
1621        );
1622    }
1623
1624    #[test]
1625    fn number_quota_is_enforced_when_enqueueing_cached_pings() {
1626        let (mut glean, dir) = new_glean(None);
1627
1628        // Register a ping for testing
1629        let ping_type = PingType::new(
1630            "test",
1631            true,
1632            /* send_if_empty */ true,
1633            true,
1634            true,
1635            true,
1636            vec![],
1637            vec![],
1638            true,
1639            vec![],
1640        );
1641        glean.register_ping_type(&ping_type);
1642
1643        // How many pings we allow at maximum
1644        let count_quota = 3;
1645        // The number of pings we fill the pending pings directory with.
1646        let n = 10;
1647
1648        // Submit the ping multiple times
1649        for _ in 0..n {
1650            ping_type.submit_sync(&glean, None);
1651        }
1652
1653        let directory_manager = PingDirectoryManager::new(dir.path());
1654        let pending_pings = directory_manager.process_dirs().pending_pings;
1655        // The pending pings array is sorted by date in ascending order,
1656        // the newest element is the last one.
1657        let expected_pings = pending_pings
1658            .iter()
1659            .rev()
1660            .take(count_quota)
1661            .map(|(_, ping)| ping.document_id.clone())
1662            .collect::<Vec<_>>();
1663
1664        // Create a new upload manager pointing to the same data_path as the glean instance.
1665        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1666
1667        upload_manager
1668            .policy
1669            .set_max_pending_pings_count(Some(count_quota as u64));
1670
1671        // Get a task once
1672        // One ping should have been enqueued.
1673        // Make sure it is the newest ping.
1674        for ping_id in expected_pings.iter().rev() {
1675            match upload_manager.get_upload_task(&glean, false) {
1676                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1677                _ => panic!("Expected upload manager to return the next request!"),
1678            }
1679        }
1680
1681        // Verify that no other requests were returned,
1682        // they should all have been deleted because pending pings quota was hit.
1683        assert_eq!(
1684            upload_manager.get_upload_task(&glean, false),
1685            PingUploadTask::done()
1686        );
1687
1688        // Verify that the correct number of deleted pings was recorded
1689        assert_eq!(
1690            (n - count_quota) as i32,
1691            upload_manager
1692                .upload_metrics
1693                .deleted_pings_after_quota_hit
1694                .get_value(&glean, Some("metrics"))
1695                .unwrap()
1696        );
1697        assert_eq!(
1698            n as i32,
1699            upload_manager
1700                .upload_metrics
1701                .pending_pings
1702                .get_value(&glean, Some("metrics"))
1703                .unwrap()
1704        );
1705    }
1706
1707    #[test]
1708    fn size_and_count_quota_work_together_size_first() {
1709        let (mut glean, dir) = new_glean(None);
1710
1711        // Register a ping for testing
1712        let ping_type = PingType::new(
1713            "test",
1714            true,
1715            /* send_if_empty */ true,
1716            true,
1717            true,
1718            true,
1719            vec![],
1720            vec![],
1721            true,
1722            vec![],
1723        );
1724        glean.register_ping_type(&ping_type);
1725
1726        let expected_number_of_pings = 3;
1727        // The number of pings we fill the pending pings directory with.
1728        let n = 10;
1729
1730        // Submit the ping multiple times
1731        for _ in 0..n {
1732            ping_type.submit_sync(&glean, None);
1733        }
1734
1735        let directory_manager = PingDirectoryManager::new(dir.path());
1736        let pending_pings = directory_manager.process_dirs().pending_pings;
1737        // The pending pings array is sorted by date in ascending order,
1738        // the newest element is the last one.
1739        let expected_pings = pending_pings
1740            .iter()
1741            .rev()
1742            .take(expected_number_of_pings)
1743            .map(|(_, ping)| ping.document_id.clone())
1744            .collect::<Vec<_>>();
1745
1746        // Create a new upload manager pointing to the same data_path as the glean instance.
1747        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1748
1749        // From manual testing we figured out a basically empty ping file is 399 bytes,
1750        // so this allows 3 pings with some headroom in case of future changes.
1751        upload_manager
1752            .policy
1753            .set_max_pending_pings_directory_size(Some(1300));
1754        upload_manager.policy.set_max_pending_pings_count(Some(5));
1755
1756        // Get a task once
1757        // One ping should have been enqueued.
1758        // Make sure it is the newest ping.
1759        for ping_id in expected_pings.iter().rev() {
1760            match upload_manager.get_upload_task(&glean, false) {
1761                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1762                _ => panic!("Expected upload manager to return the next request!"),
1763            }
1764        }
1765
1766        // Verify that no other requests were returned,
1767        // they should all have been deleted because pending pings quota was hit.
1768        assert_eq!(
1769            upload_manager.get_upload_task(&glean, false),
1770            PingUploadTask::done()
1771        );
1772
1773        // Verify that the correct number of deleted pings was recorded
1774        assert_eq!(
1775            (n - expected_number_of_pings) as i32,
1776            upload_manager
1777                .upload_metrics
1778                .deleted_pings_after_quota_hit
1779                .get_value(&glean, Some("metrics"))
1780                .unwrap()
1781        );
1782        assert_eq!(
1783            n as i32,
1784            upload_manager
1785                .upload_metrics
1786                .pending_pings
1787                .get_value(&glean, Some("metrics"))
1788                .unwrap()
1789        );
1790    }
1791
1792    #[test]
1793    fn size_and_count_quota_work_together_count_first() {
1794        let (mut glean, dir) = new_glean(None);
1795
1796        // Register a ping for testing
1797        let ping_type = PingType::new(
1798            "test",
1799            true,
1800            /* send_if_empty */ true,
1801            true,
1802            true,
1803            true,
1804            vec![],
1805            vec![],
1806            true,
1807            vec![],
1808        );
1809        glean.register_ping_type(&ping_type);
1810
1811        let expected_number_of_pings = 2;
1812        // The number of pings we fill the pending pings directory with.
1813        let n = 10;
1814
1815        // Submit the ping multiple times
1816        for _ in 0..n {
1817            ping_type.submit_sync(&glean, None);
1818        }
1819
1820        let directory_manager = PingDirectoryManager::new(dir.path());
1821        let pending_pings = directory_manager.process_dirs().pending_pings;
1822        // The pending pings array is sorted by date in ascending order,
1823        // the newest element is the last one.
1824        let expected_pings = pending_pings
1825            .iter()
1826            .rev()
1827            .take(expected_number_of_pings)
1828            .map(|(_, ping)| ping.document_id.clone())
1829            .collect::<Vec<_>>();
1830
1831        // Create a new upload manager pointing to the same data_path as the glean instance.
1832        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1833
1834        // From manual testing we figured out an empty ping file is 324bytes,
1835        // so this allows 3 pings.
1836        upload_manager
1837            .policy
1838            .set_max_pending_pings_directory_size(Some(1000));
1839        upload_manager.policy.set_max_pending_pings_count(Some(2));
1840
1841        // Get a task once
1842        // One ping should have been enqueued.
1843        // Make sure it is the newest ping.
1844        for ping_id in expected_pings.iter().rev() {
1845            match upload_manager.get_upload_task(&glean, false) {
1846                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1847                _ => panic!("Expected upload manager to return the next request!"),
1848            }
1849        }
1850
1851        // Verify that no other requests were returned,
1852        // they should all have been deleted because pending pings quota was hit.
1853        assert_eq!(
1854            upload_manager.get_upload_task(&glean, false),
1855            PingUploadTask::done()
1856        );
1857
1858        // Verify that the correct number of deleted pings was recorded
1859        assert_eq!(
1860            (n - expected_number_of_pings) as i32,
1861            upload_manager
1862                .upload_metrics
1863                .deleted_pings_after_quota_hit
1864                .get_value(&glean, Some("metrics"))
1865                .unwrap()
1866        );
1867        assert_eq!(
1868            n as i32,
1869            upload_manager
1870                .upload_metrics
1871                .pending_pings
1872                .get_value(&glean, Some("metrics"))
1873                .unwrap()
1874        );
1875    }
1876
1877    #[test]
1878    fn maximum_wait_attemps_is_enforced() {
1879        let (glean, dir) = new_glean(None);
1880
1881        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1882
1883        // Define a max_wait_attemps policy, this is disabled for tests by default.
1884        let max_wait_attempts = 3;
1885        upload_manager
1886            .policy
1887            .set_max_wait_attempts(Some(max_wait_attempts));
1888
1889        // Add a rate limiter to the upload mangager with max of 1 ping 5secs.
1890        //
1891        // We arbitrarily set the maximum pings per interval to a very low number,
1892        // when the rate limiter reaches it's limit get_upload_task returns a PingUploadTask::Wait,
1893        // which will allow us to test the limitations around returning too many of those in a row.
1894        let secs_per_interval = 5;
1895        let max_pings_per_interval = 1;
1896        upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
1897
1898        // Enqueue two pings
1899        upload_manager.enqueue_ping(
1900            &glean,
1901            PingPayload {
1902                document_id: Uuid::new_v4().to_string(),
1903                upload_path: PATH.into(),
1904                json_body: "".into(),
1905                headers: None,
1906                body_has_info_sections: true,
1907                ping_name: "ping-name".into(),
1908                uploader_capabilities: vec![],
1909            },
1910        );
1911        upload_manager.enqueue_ping(
1912            &glean,
1913            PingPayload {
1914                document_id: Uuid::new_v4().to_string(),
1915                upload_path: PATH.into(),
1916                json_body: "".into(),
1917                headers: None,
1918                body_has_info_sections: true,
1919                ping_name: "ping-name".into(),
1920                uploader_capabilities: vec![],
1921            },
1922        );
1923
1924        // Get the first ping, it should be returned normally.
1925        match upload_manager.get_upload_task(&glean, false) {
1926            PingUploadTask::Upload { .. } => {}
1927            _ => panic!("Expected upload manager to return the next request!"),
1928        }
1929
1930        // Try to get the next ping,
1931        // we should be throttled and thus get a PingUploadTask::Wait.
1932        // Check that we are indeed allowed to get this response as many times as expected.
1933        for _ in 0..max_wait_attempts {
1934            let task = upload_manager.get_upload_task(&glean, false);
1935            assert!(task.is_wait());
1936        }
1937
1938        // Check that after we get PingUploadTask::Wait the allowed number of times,
1939        // we then get PingUploadTask::Done.
1940        assert_eq!(
1941            upload_manager.get_upload_task(&glean, false),
1942            PingUploadTask::done()
1943        );
1944
1945        // Wait for the rate limiter to allow upload tasks again.
1946        thread::sleep(Duration::from_secs(secs_per_interval));
1947
1948        // Check that we are allowed again to get pings.
1949        let task = upload_manager.get_upload_task(&glean, false);
1950        assert!(task.is_upload());
1951
1952        // And once we are done we don't need to wait anymore.
1953        assert_eq!(
1954            upload_manager.get_upload_task(&glean, false),
1955            PingUploadTask::done()
1956        );
1957    }
1958
1959    #[test]
1960    fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
1961        let (glean, dir) = new_glean(None);
1962        let upload_manager = PingUploadManager::new(dir.path(), "test");
1963        match upload_manager.get_upload_task(&glean, false) {
1964            PingUploadTask::Wait { time } => {
1965                assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
1966            }
1967            _ => panic!("Expected upload manager to return a wait task!"),
1968        };
1969    }
1970
1971    #[test]
1972    fn cannot_enqueue_ping_while_its_being_processed() {
1973        let (glean, dir) = new_glean(None);
1974
1975        let upload_manager = PingUploadManager::no_policy(dir.path());
1976
1977        // Enqueue a ping and start processing it
1978        let identifier = &Uuid::new_v4();
1979        let ping = PingPayload {
1980            document_id: identifier.to_string(),
1981            upload_path: PATH.into(),
1982            json_body: "".into(),
1983            headers: None,
1984            body_has_info_sections: true,
1985            ping_name: "ping-name".into(),
1986            uploader_capabilities: vec![],
1987        };
1988        upload_manager.enqueue_ping(&glean, ping);
1989        assert!(upload_manager.get_upload_task(&glean, false).is_upload());
1990
1991        // Attempt to re-enqueue the same ping
1992        let ping = PingPayload {
1993            document_id: identifier.to_string(),
1994            upload_path: PATH.into(),
1995            json_body: "".into(),
1996            headers: None,
1997            body_has_info_sections: true,
1998            ping_name: "ping-name".into(),
1999            uploader_capabilities: vec![],
2000        };
2001        upload_manager.enqueue_ping(&glean, ping);
2002
2003        // No new pings should have been enqueued so the upload task is Done.
2004        assert_eq!(
2005            upload_manager.get_upload_task(&glean, false),
2006            PingUploadTask::done()
2007        );
2008
2009        // Process the upload response
2010        upload_manager.process_ping_upload_response(
2011            &glean,
2012            &identifier.to_string(),
2013            UploadResult::http_status(200),
2014        );
2015    }
2016}