glean_core/upload/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5//! Manages the pending pings queue and directory.
6//!
7//! * Keeps track of pending pings, loading any unsent ping from disk on startup;
8//! * Exposes [`get_upload_task`](PingUploadManager::get_upload_task) API for
9//!   the platform layer to request next upload task;
10//! * Exposes
11//!   [`process_ping_upload_response`](PingUploadManager::process_ping_upload_response)
12//!   API to check the HTTP response from the ping upload and either delete the
13//!   corresponding ping from disk or re-enqueue it for sending.
14
15use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::mem;
18use std::path::PathBuf;
19use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
20use std::sync::{Arc, RwLock, RwLockWriteGuard};
21use std::thread;
22use std::time::{Duration, Instant};
23
24use chrono::Utc;
25use malloc_size_of::MallocSizeOf;
26use malloc_size_of_derive::MallocSizeOf;
27
28use crate::error::ErrorKind;
29use crate::TimerId;
30use crate::{internal_metrics::UploadMetrics, Glean};
31pub use directory::process_metadata;
32use directory::{PingDirectoryManager, PingPayloadsByDirectory};
33use policy::Policy;
34use request::create_date_header_value;
35
36pub use directory::{PingMetadata, PingPayload};
37pub use request::{HeaderMap, PingRequest};
38pub use result::{UploadResult, UploadTaskAction};
39
40mod directory;
41mod policy;
42mod request;
43mod result;
44
/// Time (in milliseconds) a requester is told to wait before asking for a new
/// task while the pending pings directories are still being processed.
const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1_000;
46
47#[derive(Debug, MallocSizeOf)]
48struct RateLimiter {
49    /// The instant the current interval has started.
50    started: Option<Instant>,
51    /// The count for the current interval.
52    count: u32,
53    /// The duration of each interval.
54    interval: Duration,
55    /// The maximum count per interval.
56    max_count: u32,
57}
58
/// The outcome of asking the `RateLimiter` for its current state.
#[derive(PartialEq)]
enum RateLimiterState {
    /// The maximum count has not been reached yet; the counter was incremented.
    Incrementing,
    /// The maximum count for the current interval has been reached.
    ///
    /// Carries the remaining time (in milliseconds)
    /// until the rate limiter stops throttling.
    Throttled(u64),
}
70
71impl RateLimiter {
72    pub fn new(interval: Duration, max_count: u32) -> Self {
73        Self {
74            started: None,
75            count: 0,
76            interval,
77            max_count,
78        }
79    }
80
81    fn reset(&mut self) {
82        self.started = Some(Instant::now());
83        self.count = 0;
84    }
85
86    fn elapsed(&self) -> Duration {
87        self.started.unwrap().elapsed()
88    }
89
90    // The counter should reset if
91    //
92    // 1. It has never started;
93    // 2. It has been started more than the interval time ago;
94    // 3. Something goes wrong while trying to calculate the elapsed time since the last reset.
95    fn should_reset(&self) -> bool {
96        if self.started.is_none() {
97            return true;
98        }
99
100        // Safe unwrap, we already stated that `self.started` is not `None` above.
101        if self.elapsed() > self.interval {
102            return true;
103        }
104
105        false
106    }
107
108    /// Tries to increment the internal counter.
109    ///
110    /// # Returns
111    ///
112    /// The current state of the RateLimiter.
113    pub fn get_state(&mut self) -> RateLimiterState {
114        if self.should_reset() {
115            self.reset();
116        }
117
118        if self.count == self.max_count {
119            // Note that `remining` can't be a negative number because we just called `reset`,
120            // which will check if it is and reset if so.
121            let remaining = self.interval.as_millis() - self.elapsed().as_millis();
122            return RateLimiterState::Throttled(
123                remaining
124                    .try_into()
125                    .unwrap_or(self.interval.as_secs() * 1000),
126            );
127        }
128
129        self.count += 1;
130        RateLimiterState::Incrementing
131    }
132}
133
134/// An enum representing the possible upload tasks to be performed by an uploader.
135///
136/// When asking for the next ping request to upload,
137/// the requester may receive one out of three possible tasks.
138#[derive(PartialEq, Eq, Debug)]
139pub enum PingUploadTask {
140    /// An upload task
141    Upload {
142        /// The ping request for upload
143        /// See [`PingRequest`](struct.PingRequest.html) for more information.
144        request: PingRequest,
145    },
146
147    /// A flag signaling that the pending pings directories are not done being processed,
148    /// thus the requester should wait and come back later.
149    Wait {
150        /// The time in milliseconds
151        /// the requester should wait before requesting a new task.
152        time: u64,
153    },
154
155    /// A flag signaling that requester doesn't need to request any more upload tasks at this moment.
156    ///
157    /// There are three possibilities for this scenario:
158    /// * Pending pings queue is empty, no more pings to request;
159    /// * Requester has gotten more than MAX_WAIT_ATTEMPTS (3, by default) `PingUploadTask::Wait` responses in a row;
160    /// * Requester has reported more than MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW
161    ///   recoverable upload failures on the same uploading window (see below)
162    ///   and should stop requesting at this moment.
163    ///
164    /// An "uploading window" starts when a requester gets a new
165    /// `PingUploadTask::Upload(PingRequest)` response and finishes when they
166    /// finally get a `PingUploadTask::Done` or `PingUploadTask::Wait` response.
167    Done {
168        #[doc(hidden)]
169        /// Unused field. Required because UniFFI can't handle variants without fields.
170        unused: i8,
171    },
172}
173
174impl PingUploadTask {
175    /// Whether the current task is an upload task.
176    pub fn is_upload(&self) -> bool {
177        matches!(self, PingUploadTask::Upload { .. })
178    }
179
180    /// Whether the current task is wait task.
181    pub fn is_wait(&self) -> bool {
182        matches!(self, PingUploadTask::Wait { .. })
183    }
184
185    pub(crate) fn done() -> Self {
186        PingUploadTask::Done { unused: 0 }
187    }
188}
189
190/// Manages the pending pings queue and directory.
191#[derive(Debug)]
192pub struct PingUploadManager {
193    /// A FIFO queue storing a `PingRequest` for each pending ping.
194    queue: RwLock<VecDeque<PingRequest>>,
195    /// A manager for the pending pings directories.
196    directory_manager: PingDirectoryManager,
197    /// A flag signaling if we are done processing the pending pings directories.
198    processed_pending_pings: Arc<AtomicBool>,
199    /// A vector to store the pending pings processed off-thread.
200    cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
201    /// The number of upload failures for the current uploading window.
202    recoverable_failure_count: AtomicU32,
203    /// The number or times in a row a user has received a `PingUploadTask::Wait` response.
204    wait_attempt_count: AtomicU32,
205    /// A ping counter to help rate limit the ping uploads.
206    ///
207    /// To keep resource usage in check,
208    /// we may want to limit the amount of pings sent in a given interval.
209    rate_limiter: Option<RwLock<RateLimiter>>,
210    /// The name of the programming language used by the binding creating this instance of PingUploadManager.
211    ///
212    /// This will be used to build the value User-Agent header for each ping request.
213    language_binding_name: String,
214    /// Metrics related to ping uploading.
215    upload_metrics: UploadMetrics,
216    /// Policies for ping storage, uploading and requests.
217    policy: Policy,
218
219    in_flight: RwLock<HashMap<String, (TimerId, TimerId)>>,
220}
221
222impl MallocSizeOf for PingUploadManager {
223    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
224        let shallow_size = {
225            let queue = self.queue.read().unwrap();
226            if ops.has_malloc_enclosing_size_of() {
227                if let Some(front) = queue.front() {
228                    // SAFETY: The front element is a valid interior pointer and thus valid to pass
229                    // to an external function.
230                    unsafe { ops.malloc_enclosing_size_of(front) }
231                } else {
232                    // This assumes that no memory is allocated when the VecDeque is empty.
233                    0
234                }
235            } else {
236                // If `ops` can't estimate the size of a pointer,
237                // we can estimate the allocation size by the size of each element and the
238                // allocated capacity.
239                queue.capacity() * mem::size_of::<PingRequest>()
240            }
241        };
242
243        let mut n = shallow_size
244            + self.directory_manager.size_of(ops)
245            // SAFETY: We own this arc and can pass a pointer to it to an external function.
246            + unsafe { ops.malloc_size_of(self.processed_pending_pings.as_ptr()) }
247            + self.cached_pings.read().unwrap().size_of(ops)
248            + self.rate_limiter.as_ref().map(|rl| {
249                let lock = rl.read().unwrap();
250                (*lock).size_of(ops)
251            }).unwrap_or(0)
252            + self.language_binding_name.size_of(ops)
253            + self.upload_metrics.size_of(ops)
254            + self.policy.size_of(ops);
255
256        let in_flight = self.in_flight.read().unwrap();
257        n += in_flight.size_of(ops);
258
259        n
260    }
261}
262
263impl PingUploadManager {
264    /// Creates a new PingUploadManager.
265    ///
266    /// # Arguments
267    ///
268    /// * `data_path` - Path to the pending pings directory.
269    /// * `language_binding_name` - The name of the language binding calling this managers instance.
270    ///
271    /// # Panics
272    ///
273    /// Will panic if unable to spawn a new thread.
274    pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
275        Self {
276            queue: RwLock::new(VecDeque::new()),
277            directory_manager: PingDirectoryManager::new(data_path),
278            processed_pending_pings: Arc::new(AtomicBool::new(false)),
279            cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
280            recoverable_failure_count: AtomicU32::new(0),
281            wait_attempt_count: AtomicU32::new(0),
282            rate_limiter: None,
283            language_binding_name: language_binding_name.into(),
284            upload_metrics: UploadMetrics::new(),
285            policy: Policy::default(),
286            in_flight: RwLock::new(HashMap::default()),
287        }
288    }
289
290    /// Spawns a new thread and processes the pending pings directories,
291    /// filling up the queue with whatever pings are in there.
292    ///
293    /// # Returns
294    ///
295    /// The `JoinHandle` to the spawned thread
296    pub fn scan_pending_pings_directories(
297        &self,
298        trigger_upload: bool,
299    ) -> std::thread::JoinHandle<()> {
300        let local_manager = self.directory_manager.clone();
301        let local_cached_pings = self.cached_pings.clone();
302        let local_flag = self.processed_pending_pings.clone();
303        thread::Builder::new()
304            .name("glean.ping_directory_manager.process_dir".to_string())
305            .spawn(move || {
306                {
307                    // Be sure to drop local_cached_pings lock before triggering upload.
308                    let mut local_cached_pings = local_cached_pings
309                        .write()
310                        .expect("Can't write to pending pings cache.");
311                    local_cached_pings.extend(local_manager.process_dirs());
312                    local_flag.store(true, Ordering::SeqCst);
313                }
314                if trigger_upload {
315                    crate::dispatcher::launch(|| {
316                        if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok())
317                        {
318                            if let Err(e) = state.callbacks.trigger_upload() {
319                                log::error!(
320                                    "Triggering upload after pending ping scan failed. Error: {}",
321                                    e
322                                );
323                            }
324                        }
325                    });
326                }
327            })
328            .expect("Unable to spawn thread to process pings directories.")
329    }
330
331    /// Creates a new upload manager with no limitations, for tests.
332    #[cfg(test)]
333    pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
334        let mut upload_manager = Self::new(data_path, "Test");
335
336        // Disable all policies for tests, if necessary individuals tests can re-enable them.
337        upload_manager.policy.set_max_recoverable_failures(None);
338        upload_manager.policy.set_max_wait_attempts(None);
339        upload_manager.policy.set_max_ping_body_size(None);
340        upload_manager
341            .policy
342            .set_max_pending_pings_directory_size(None);
343        upload_manager.policy.set_max_pending_pings_count(None);
344
345        // When building for tests, always scan the pending pings directories and do it sync.
346        upload_manager
347            .scan_pending_pings_directories(false)
348            .join()
349            .unwrap();
350
351        upload_manager
352    }
353
354    fn processed_pending_pings(&self) -> bool {
355        self.processed_pending_pings.load(Ordering::SeqCst)
356    }
357
358    fn recoverable_failure_count(&self) -> u32 {
359        self.recoverable_failure_count.load(Ordering::SeqCst)
360    }
361
362    fn wait_attempt_count(&self) -> u32 {
363        self.wait_attempt_count.load(Ordering::SeqCst)
364    }
365
366    /// Attempts to build a ping request from a ping file payload.
367    ///
368    /// Returns the `PingRequest` or `None` if unable to build,
369    /// in which case it will delete the ping file and record an error.
370    fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
371        let PingPayload {
372            document_id,
373            upload_path: path,
374            json_body: body,
375            headers,
376            body_has_info_sections,
377            ping_name,
378            uploader_capabilities,
379        } = ping;
380        let mut request = PingRequest::builder(
381            &self.language_binding_name,
382            self.policy.max_ping_body_size(),
383        )
384        .document_id(&document_id)
385        .path(path)
386        .body(body)
387        .body_has_info_sections(body_has_info_sections)
388        .ping_name(ping_name)
389        .uploader_capabilities(uploader_capabilities);
390
391        if let Some(headers) = headers {
392            request = request.headers(headers);
393        }
394
395        match request.build() {
396            Ok(request) => Some(request),
397            Err(e) => {
398                log::warn!("Error trying to build ping request: {}", e);
399                self.directory_manager.delete_file(&document_id);
400
401                // Record the error.
402                // Currently the only possible error is PingBodyOverflow.
403                if let ErrorKind::PingBodyOverflow(s) = e.kind() {
404                    self.upload_metrics
405                        .discarded_exceeding_pings_size
406                        .accumulate_sync(glean, *s as i64 / 1024);
407                }
408
409                None
410            }
411        }
412    }
413
414    /// Enqueue a ping for upload.
415    pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
416        let mut queue = self
417            .queue
418            .write()
419            .expect("Can't write to pending pings queue.");
420
421        let PingPayload {
422            ref document_id,
423            upload_path: ref path,
424            ..
425        } = ping;
426        // Checks if a ping with this `document_id` is already enqueued.
427        if queue
428            .iter()
429            .any(|request| request.document_id.as_str() == document_id)
430        {
431            log::warn!(
432                "Attempted to enqueue a duplicate ping {} at {}.",
433                document_id,
434                path
435            );
436            return;
437        }
438
439        {
440            let in_flight = self.in_flight.read().unwrap();
441            if in_flight.contains_key(document_id) {
442                log::warn!(
443                    "Attempted to enqueue an in-flight ping {} at {}.",
444                    document_id,
445                    path
446                );
447                self.upload_metrics
448                    .in_flight_pings_dropped
449                    .add_sync(glean, 0);
450                return;
451            }
452        }
453
454        log::trace!("Enqueuing ping {} at {}", document_id, path);
455        if let Some(request) = self.build_ping_request(glean, ping) {
456            queue.push_back(request)
457        }
458    }
459
460    /// Enqueues pings that might have been cached.
461    ///
462    /// The size of the PENDING_PINGS_DIRECTORY directory will be calculated
463    /// (by accumulating each ping's size in that directory)
464    /// and in case we exceed the quota, defined by the `quota` arg,
465    /// outstanding pings get deleted and are not enqueued.
466    ///
467    /// The size of the DELETION_REQUEST_PINGS_DIRECTORY will not be calculated
468    /// and no deletion-request pings will be deleted. Deletion request pings
469    /// are not very common and usually don't contain any data,
470    /// we don't expect that directory to ever reach quota.
471    /// Most importantly, we don't want to ever delete deletion-request pings.
472    ///
473    /// # Arguments
474    ///
475    /// * `glean` - The Glean object holding the database.
476    fn enqueue_cached_pings(&self, glean: &Glean) {
477        let mut cached_pings = self
478            .cached_pings
479            .write()
480            .expect("Can't write to pending pings cache.");
481
482        if cached_pings.len() > 0 {
483            let mut pending_pings_directory_size: u64 = 0;
484            let mut pending_pings_count = 0;
485            let mut deleting = false;
486
487            let total = cached_pings.pending_pings.len() as u64;
488            self.upload_metrics
489                .pending_pings
490                .add_sync(glean, total.try_into().unwrap_or(0));
491
492            if total > self.policy.max_pending_pings_count() {
493                log::warn!(
494                    "More than {} pending pings in the directory, will delete {} old pings.",
495                    self.policy.max_pending_pings_count(),
496                    total - self.policy.max_pending_pings_count()
497                );
498            }
499
500            // The pending pings vector is sorted by date in ascending order (oldest -> newest).
501            // We need to calculate the size of the pending pings directory
502            // and delete the **oldest** pings in case quota is reached.
503            // Thus, we reverse the order of the pending pings vector,
504            // so that we iterate in descending order (newest -> oldest).
505            cached_pings.pending_pings.reverse();
506            cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
507                pending_pings_count += 1;
508                pending_pings_directory_size += file_size;
509
510                // We don't want to spam the log for every ping over the quota.
511                if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
512                    log::warn!(
513                        "Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
514                        self.policy.max_pending_pings_directory_size()
515                    );
516                    deleting = true;
517                }
518
519                // Once we reach the number of allowed pings we start deleting,
520                // no matter what size.
521                // We already log this before the loop.
522                if pending_pings_count > self.policy.max_pending_pings_count() {
523                    deleting = true;
524                }
525
526                if deleting && self.directory_manager.delete_file(document_id) {
527                    self.upload_metrics
528                        .deleted_pings_after_quota_hit
529                        .add_sync(glean, 1);
530                    return false;
531                }
532
533                true
534            });
535            // After calculating the size of the pending pings directory,
536            // we record the calculated number and reverse the pings array back for enqueueing.
537            cached_pings.pending_pings.reverse();
538            self.upload_metrics
539                .pending_pings_directory_size
540                .accumulate_sync(glean, pending_pings_directory_size as i64 / 1024);
541
542            // Enqueue the remaining pending pings and
543            // enqueue all deletion-request pings.
544            cached_pings
545                .deletion_request_pings
546                .drain(..)
547                .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
548            cached_pings
549                .pending_pings
550                .drain(..)
551                .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
552        }
553    }
554
555    /// Adds rate limiting capability to this upload manager.
556    ///
557    /// The rate limiter will limit the amount of calls to `get_upload_task` per interval.
558    ///
559    /// Setting this will restart count and timer in case there was a previous rate limiter set
560    /// (e.g. if we have reached the current limit and call this function, we start counting again
561    /// and the caller is allowed to asks for tasks).
562    ///
563    /// # Arguments
564    ///
565    /// * `interval` - the amount of seconds in each rate limiting window.
566    /// * `max_tasks` - the maximum amount of task requests allowed per interval.
567    pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
568        self.rate_limiter = Some(RwLock::new(RateLimiter::new(
569            Duration::from_secs(interval),
570            max_tasks,
571        )));
572    }
573
574    /// Reads a ping file, creates a `PingRequest` and adds it to the queue.
575    ///
576    /// Duplicate requests won't be added.
577    ///
578    /// # Arguments
579    ///
580    /// * `glean` - The Glean object holding the database.
581    /// * `document_id` - The UUID of the ping in question.
582    pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
583        if let Some(ping) = self.directory_manager.process_file(document_id) {
584            self.enqueue_ping(glean, ping);
585        }
586    }
587
588    /// Clears the pending pings queue, leaves the deletion-request pings.
589    pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
590        log::trace!("Clearing ping queue");
591        let mut queue = self
592            .queue
593            .write()
594            .expect("Can't write to pending pings queue.");
595
596        queue.retain(|ping| ping.is_deletion_request());
597        log::trace!(
598            "{} pings left in the queue (only deletion-request expected)",
599            queue.len()
600        );
601        queue
602    }
603
604    fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
605        // Helper to decide whether to return PingUploadTask::Wait or PingUploadTask::Done.
606        //
607        // We want to limit the amount of PingUploadTask::Wait returned in a row,
608        // in case we reach MAX_WAIT_ATTEMPTS we want to actually return PingUploadTask::Done.
609        let wait_or_done = |time: u64| {
610            self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
611            if self.wait_attempt_count() > self.policy.max_wait_attempts() {
612                PingUploadTask::done()
613            } else {
614                PingUploadTask::Wait { time }
615            }
616        };
617
618        if !self.processed_pending_pings() {
619            log::info!(
620                "Tried getting an upload task, but processing is ongoing. Will come back later."
621            );
622            return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
623        }
624
625        // This is a no-op in case there are no cached pings.
626        self.enqueue_cached_pings(glean);
627
628        if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
629            log::warn!(
630                "Reached maximum recoverable failures for the current uploading window. You are done."
631            );
632            return PingUploadTask::done();
633        }
634
635        let mut queue = self
636            .queue
637            .write()
638            .expect("Can't write to pending pings queue.");
639        match queue.front() {
640            Some(request) => {
641                if let Some(rate_limiter) = &self.rate_limiter {
642                    let mut rate_limiter = rate_limiter
643                        .write()
644                        .expect("Can't write to the rate limiter.");
645                    if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
646                        log::info!(
647                            "Tried getting an upload task, but we are throttled at the moment."
648                        );
649                        return wait_or_done(remaining);
650                    }
651                }
652
653                log::info!(
654                    "New upload task with id {} (path: {})",
655                    request.document_id,
656                    request.path
657                );
658
659                if log_ping {
660                    if let Some(body) = request.pretty_body() {
661                        chunked_log_info(&request.path, &body);
662                    } else {
663                        chunked_log_info(&request.path, "<invalid ping payload>");
664                    }
665                }
666
667                {
668                    // Synchronous timer starts.
669                    // We're in the uploader thread anyway.
670                    // But also: No data is stored on disk.
671                    let mut in_flight = self.in_flight.write().unwrap();
672                    let success_id = self.upload_metrics.send_success.start_sync();
673                    let failure_id = self.upload_metrics.send_failure.start_sync();
674                    in_flight.insert(request.document_id.clone(), (success_id, failure_id));
675                }
676
677                let mut request = queue.pop_front().unwrap();
678
679                // Adding the `Date` header just before actual upload happens.
680                request
681                    .headers
682                    .insert("Date".to_string(), create_date_header_value(Utc::now()));
683
684                PingUploadTask::Upload { request }
685            }
686            None => {
687                log::info!("No more pings to upload! You are done.");
688                PingUploadTask::done()
689            }
690        }
691    }
692
693    /// Gets the next `PingUploadTask`.
694    ///
695    /// # Arguments
696    ///
697    /// * `glean` - The Glean object holding the database.
698    /// * `log_ping` - Whether to log the ping before returning.
699    ///
700    /// # Returns
701    ///
702    /// The next [`PingUploadTask`](enum.PingUploadTask.html).
703    pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
704        let task = self.get_upload_task_internal(glean, log_ping);
705
706        if !task.is_wait() && self.wait_attempt_count() > 0 {
707            self.wait_attempt_count.store(0, Ordering::SeqCst);
708        }
709
710        if !task.is_upload() && self.recoverable_failure_count() > 0 {
711            self.recoverable_failure_count.store(0, Ordering::SeqCst);
712        }
713
714        task
715    }
716
717    /// Processes the response from an attempt to upload a ping.
718    ///
719    /// Based on the HTTP status of said response,
720    /// the possible outcomes are:
721    ///
722    /// * **200 - 299 Success**
    ///   Any status on the 2XX range is considered a successful upload,
724    ///   which means the corresponding ping file can be deleted.
725    ///   _Known 2XX status:_
726    ///   * 200 - OK. Request accepted into the pipeline.
727    ///
728    /// * **400 - 499 Unrecoverable error**
729    ///   Any status on the 4XX range means something our client did is not correct.
730    ///   It is unlikely that the client is going to recover from this by retrying,
731    ///   so in this case the corresponding ping file can also be deleted.
732    ///   _Known 4XX status:_
733    ///   * 404 - not found - POST/PUT to an unknown namespace
734    ///   * 405 - wrong request type (anything other than POST/PUT)
735    ///   * 411 - missing content-length header
    ///   * 413 - request body too large (Note that if we have badly-behaved clients that
    ///           retry on 4XX, we should send back 202 on body/path too long).
738    ///   * 414 - request path too long (See above)
739    ///
740    /// * **Any other error**
741    ///   For any other error, a warning is logged and the ping is re-enqueued.
742    ///   _Known other errors:_
743    ///   * 500 - internal error
744    ///
745    /// # Note
746    ///
747    /// The disk I/O performed by this function is not done off-thread,
748    /// as it is expected to be called off-thread by the platform.
749    ///
750    /// # Arguments
751    ///
752    /// * `glean` - The Glean object holding the database.
753    /// * `document_id` - The UUID of the ping in question.
754    /// * `status` - The HTTP status of the response.
755    pub fn process_ping_upload_response(
756        &self,
757        glean: &Glean,
758        document_id: &str,
759        status: UploadResult,
760    ) -> UploadTaskAction {
761        use UploadResult::*;
762
763        let stop_time = time::precise_time_ns();
764
765        if let Some(label) = status.get_label() {
766            let metric = self.upload_metrics.ping_upload_failure.get(label);
767            metric.add_sync(glean, 1);
768        }
769
770        let send_ids = {
771            let mut lock = self.in_flight.write().unwrap();
772            lock.remove(document_id)
773        };
774
775        if send_ids.is_none() {
776            self.upload_metrics.missing_send_ids.add_sync(glean, 1);
777        }
778
779        match status {
780            HttpStatus { code } if (200..=299).contains(&code) => {
781                log::info!("Ping {} successfully sent {}.", document_id, code);
782                if let Some((success_id, failure_id)) = send_ids {
783                    self.upload_metrics
784                        .send_success
785                        .set_stop_and_accumulate(glean, success_id, stop_time);
786                    self.upload_metrics.send_failure.cancel_sync(failure_id);
787                }
788                self.directory_manager.delete_file(document_id);
789            }
790
791            UnrecoverableFailure { .. } | HttpStatus { code: 400..=499 } | Incapable { .. } => {
792                log::warn!(
793                    "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
794                    document_id,
795                    status
796                );
797                if let Some((success_id, failure_id)) = send_ids {
798                    self.upload_metrics.send_success.cancel_sync(success_id);
799                    self.upload_metrics
800                        .send_failure
801                        .set_stop_and_accumulate(glean, failure_id, stop_time);
802                }
803                self.directory_manager.delete_file(document_id);
804            }
805
806            RecoverableFailure { .. } | HttpStatus { .. } => {
807                log::warn!(
808                    "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
809                    document_id,
810                    status
811                );
812                if let Some((success_id, failure_id)) = send_ids {
813                    self.upload_metrics.send_success.cancel_sync(success_id);
814                    self.upload_metrics
815                        .send_failure
816                        .set_stop_and_accumulate(glean, failure_id, stop_time);
817                }
818                self.enqueue_ping_from_file(glean, document_id);
819                self.recoverable_failure_count
820                    .fetch_add(1, Ordering::SeqCst);
821            }
822
823            Done { .. } => {
824                log::debug!("Uploader signaled Done. Exiting.");
825                if let Some((success_id, failure_id)) = send_ids {
826                    self.upload_metrics.send_success.cancel_sync(success_id);
827                    self.upload_metrics.send_failure.cancel_sync(failure_id);
828                }
829                return UploadTaskAction::End;
830            }
831        };
832
833        UploadTaskAction::Next
834    }
835}
836
/// Logs a ping at `info` level on Android, splitting long payloads into chunks.
///
/// A payload that does not fit into a single log entry together with `path`
/// is split into several messages. Each message is prefixed with `path` and
/// a `[Part i of n]` counter, where `n` is the exact number of messages
/// emitted. Chunks are only ever cut on UTF-8 character boundaries.
///
/// # Arguments
///
/// * `path` - The URL path logged as the prefix of every message.
/// * `payload` - The ping payload to log.
#[cfg(target_os = "android")]
pub fn chunked_log_info(path: &str, payload: &str) {
    // Since the logcat ring buffer size is configurable, but its 'max payload' size is not,
    // we must break apart long pings into chunks no larger than the max payload size of 4076b.
    // We leave some head space for our prefix.
    const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;

    // If the length of the ping will fit within one logcat payload, then we can
    // short-circuit here and avoid some overhead, otherwise we must split up the
    // message so that we don't truncate it.
    if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
        log::info!("Glean ping to URL: {}\n{}", path, payload);
        return;
    }

    // Split the payload up front, so that the total in the `[Part i of n]`
    // counter is the real number of chunks and not an estimate.
    let mut chunks: Vec<&str> =
        Vec::with_capacity(payload.len() / MAX_LOG_PAYLOAD_SIZE_BYTES + 1);
    let mut remaining = payload;
    while remaining.len() > MAX_LOG_PAYLOAD_SIZE_BYTES {
        // Find char boundary from the end.
        // It's UTF-8, so it is at most 3 bytes before the size limit.
        let mut split_at = MAX_LOG_PAYLOAD_SIZE_BYTES;
        while !remaining.is_char_boundary(split_at) {
            split_at -= 1;
        }

        let (chunk, rest) = remaining.split_at(split_at);
        chunks.push(chunk);
        remaining = rest;
    }

    // Whatever is left fits into one final chunk.
    if !remaining.is_empty() {
        chunks.push(remaining);
    }

    // Log every chunk, prefixing it with the path and a counter.
    let total_chunks = chunks.len();
    for (idx, chunk) in chunks.iter().enumerate() {
        log::info!(
            "Glean ping to URL: {} [Part {} of {}]\n{}",
            path,
            idx + 1,
            total_chunks,
            chunk
        );
    }
}
896
897/// Logs payload in one go (all other OS).
898#[cfg(not(target_os = "android"))]
899pub fn chunked_log_info(_path: &str, payload: &str) {
900    log::info!("{}", payload)
901}
902
903#[cfg(test)]
904mod test {
905    use uuid::Uuid;
906
907    use super::*;
908    use crate::metrics::PingType;
909    use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
910
911    const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
912
913    #[test]
914    fn doesnt_error_when_there_are_no_pending_pings() {
915        let (glean, _t) = new_glean(None);
916
917        // Try and get the next request.
918        // Verify request was not returned
919        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
920    }
921
922    #[test]
923    fn returns_ping_request_when_there_is_one() {
924        let (glean, dir) = new_glean(None);
925
926        let upload_manager = PingUploadManager::no_policy(dir.path());
927
928        // Enqueue a ping
929        upload_manager.enqueue_ping(
930            &glean,
931            PingPayload {
932                document_id: Uuid::new_v4().to_string(),
933                upload_path: PATH.into(),
934                json_body: "".into(),
935                headers: None,
936                body_has_info_sections: true,
937                ping_name: "ping-name".into(),
938                uploader_capabilities: vec![],
939            },
940        );
941
942        // Try and get the next request.
943        // Verify request was returned
944        let task = upload_manager.get_upload_task(&glean, false);
945        assert!(task.is_upload());
946    }
947
948    #[test]
949    fn returns_as_many_ping_requests_as_there_are() {
950        let (glean, dir) = new_glean(None);
951
952        let upload_manager = PingUploadManager::no_policy(dir.path());
953
954        // Enqueue a ping multiple times
955        let n = 10;
956        for _ in 0..n {
957            upload_manager.enqueue_ping(
958                &glean,
959                PingPayload {
960                    document_id: Uuid::new_v4().to_string(),
961                    upload_path: PATH.into(),
962                    json_body: "".into(),
963                    headers: None,
964                    body_has_info_sections: true,
965                    ping_name: "ping-name".into(),
966                    uploader_capabilities: vec![],
967                },
968            );
969        }
970
971        // Verify a request is returned for each submitted ping
972        for _ in 0..n {
973            let task = upload_manager.get_upload_task(&glean, false);
974            assert!(task.is_upload());
975        }
976
977        // Verify that after all requests are returned, none are left
978        assert_eq!(
979            upload_manager.get_upload_task(&glean, false),
980            PingUploadTask::done()
981        );
982    }
983
984    #[test]
985    fn limits_the_number_of_pings_when_there_is_rate_limiting() {
986        let (glean, dir) = new_glean(None);
987
988        let mut upload_manager = PingUploadManager::no_policy(dir.path());
989
990        // Add a rate limiter to the upload mangager with max of 10 pings every 3 seconds.
991        let max_pings_per_interval = 10;
992        upload_manager.set_rate_limiter(3, 10);
993
994        // Enqueue the max number of pings allowed per uploading window
995        for _ in 0..max_pings_per_interval {
996            upload_manager.enqueue_ping(
997                &glean,
998                PingPayload {
999                    document_id: Uuid::new_v4().to_string(),
1000                    upload_path: PATH.into(),
1001                    json_body: "".into(),
1002                    headers: None,
1003                    body_has_info_sections: true,
1004                    ping_name: "ping-name".into(),
1005                    uploader_capabilities: vec![],
1006                },
1007            );
1008        }
1009
1010        // Verify a request is returned for each submitted ping
1011        for _ in 0..max_pings_per_interval {
1012            let task = upload_manager.get_upload_task(&glean, false);
1013            assert!(task.is_upload());
1014        }
1015
1016        // Enqueue just one more ping
1017        upload_manager.enqueue_ping(
1018            &glean,
1019            PingPayload {
1020                document_id: Uuid::new_v4().to_string(),
1021                upload_path: PATH.into(),
1022                json_body: "".into(),
1023                headers: None,
1024                body_has_info_sections: true,
1025                ping_name: "ping-name".into(),
1026                uploader_capabilities: vec![],
1027            },
1028        );
1029
1030        // Verify that we are indeed told to wait because we are at capacity
1031        match upload_manager.get_upload_task(&glean, false) {
1032            PingUploadTask::Wait { time } => {
1033                // Wait for the uploading window to reset
1034                thread::sleep(Duration::from_millis(time));
1035            }
1036            _ => panic!("Expected upload manager to return a wait task!"),
1037        };
1038
1039        let task = upload_manager.get_upload_task(&glean, false);
1040        assert!(task.is_upload());
1041    }
1042
1043    #[test]
1044    fn clearing_the_queue_works_correctly() {
1045        let (glean, dir) = new_glean(None);
1046
1047        let upload_manager = PingUploadManager::no_policy(dir.path());
1048
1049        // Enqueue a ping multiple times
1050        for _ in 0..10 {
1051            upload_manager.enqueue_ping(
1052                &glean,
1053                PingPayload {
1054                    document_id: Uuid::new_v4().to_string(),
1055                    upload_path: PATH.into(),
1056                    json_body: "".into(),
1057                    headers: None,
1058                    body_has_info_sections: true,
1059                    ping_name: "ping-name".into(),
1060                    uploader_capabilities: vec![],
1061                },
1062            );
1063        }
1064
1065        // Clear the queue
1066        drop(upload_manager.clear_ping_queue());
1067
1068        // Verify there really isn't any ping in the queue
1069        assert_eq!(
1070            upload_manager.get_upload_task(&glean, false),
1071            PingUploadTask::done()
1072        );
1073    }
1074
1075    #[test]
1076    fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
1077        let (mut glean, _t) = new_glean(None);
1078
1079        // Register a ping for testing
1080        let ping_type = PingType::new(
1081            "test",
1082            true,
1083            /* send_if_empty */ true,
1084            true,
1085            true,
1086            true,
1087            vec![],
1088            vec![],
1089            true,
1090            vec![],
1091        );
1092        glean.register_ping_type(&ping_type);
1093
1094        // Submit the ping multiple times
1095        let n = 10;
1096        for _ in 0..n {
1097            ping_type.submit_sync(&glean, None);
1098        }
1099
1100        glean
1101            .internal_pings
1102            .deletion_request
1103            .submit_sync(&glean, None);
1104
1105        // Clear the queue
1106        drop(glean.upload_manager.clear_ping_queue());
1107
1108        let upload_task = glean.get_upload_task();
1109        match upload_task {
1110            PingUploadTask::Upload { request } => assert!(request.is_deletion_request()),
1111            _ => panic!("Expected upload manager to return the next request!"),
1112        }
1113
1114        // Verify there really isn't any other pings in the queue
1115        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1116    }
1117
1118    #[test]
1119    fn fills_up_queue_successfully_from_disk() {
1120        let (mut glean, dir) = new_glean(None);
1121
1122        // Register a ping for testing
1123        let ping_type = PingType::new(
1124            "test",
1125            true,
1126            /* send_if_empty */ true,
1127            true,
1128            true,
1129            true,
1130            vec![],
1131            vec![],
1132            true,
1133            vec![],
1134        );
1135        glean.register_ping_type(&ping_type);
1136
1137        // Submit the ping multiple times
1138        let n = 10;
1139        for _ in 0..n {
1140            ping_type.submit_sync(&glean, None);
1141        }
1142
1143        // Create a new upload manager pointing to the same data_path as the glean instance.
1144        let upload_manager = PingUploadManager::no_policy(dir.path());
1145
1146        // Verify the requests were properly enqueued
1147        for _ in 0..n {
1148            let task = upload_manager.get_upload_task(&glean, false);
1149            assert!(task.is_upload());
1150        }
1151
1152        // Verify that after all requests are returned, none are left
1153        assert_eq!(
1154            upload_manager.get_upload_task(&glean, false),
1155            PingUploadTask::done()
1156        );
1157    }
1158
1159    #[test]
1160    fn processes_correctly_success_upload_response() {
1161        let (mut glean, dir) = new_glean(None);
1162
1163        // Register a ping for testing
1164        let ping_type = PingType::new(
1165            "test",
1166            true,
1167            /* send_if_empty */ true,
1168            true,
1169            true,
1170            true,
1171            vec![],
1172            vec![],
1173            true,
1174            vec![],
1175        );
1176        glean.register_ping_type(&ping_type);
1177
1178        // Submit a ping
1179        ping_type.submit_sync(&glean, None);
1180
1181        // Get the pending ping directory path
1182        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1183
1184        // Get the submitted PingRequest
1185        match glean.get_upload_task() {
1186            PingUploadTask::Upload { request } => {
1187                // Simulate the processing of a sucessfull request
1188                let document_id = request.document_id;
1189                glean.process_ping_upload_response(&document_id, UploadResult::http_status(200));
1190                // Verify file was deleted
1191                assert!(!pending_pings_dir.join(document_id).exists());
1192            }
1193            _ => panic!("Expected upload manager to return the next request!"),
1194        }
1195
1196        // Verify that after request is returned, none are left
1197        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1198    }
1199
1200    #[test]
1201    fn processes_correctly_client_error_upload_response() {
1202        let (mut glean, dir) = new_glean(None);
1203
1204        // Register a ping for testing
1205        let ping_type = PingType::new(
1206            "test",
1207            true,
1208            /* send_if_empty */ true,
1209            true,
1210            true,
1211            true,
1212            vec![],
1213            vec![],
1214            true,
1215            vec![],
1216        );
1217        glean.register_ping_type(&ping_type);
1218
1219        // Submit a ping
1220        ping_type.submit_sync(&glean, None);
1221
1222        // Get the pending ping directory path
1223        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1224
1225        // Get the submitted PingRequest
1226        match glean.get_upload_task() {
1227            PingUploadTask::Upload { request } => {
1228                // Simulate the processing of a client error
1229                let document_id = request.document_id;
1230                glean.process_ping_upload_response(&document_id, UploadResult::http_status(404));
1231                // Verify file was deleted
1232                assert!(!pending_pings_dir.join(document_id).exists());
1233            }
1234            _ => panic!("Expected upload manager to return the next request!"),
1235        }
1236
1237        // Verify that after request is returned, none are left
1238        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1239    }
1240
1241    #[test]
1242    fn processes_correctly_server_error_upload_response() {
1243        let (mut glean, _t) = new_glean(None);
1244
1245        // Register a ping for testing
1246        let ping_type = PingType::new(
1247            "test",
1248            true,
1249            /* send_if_empty */ true,
1250            true,
1251            true,
1252            true,
1253            vec![],
1254            vec![],
1255            true,
1256            vec![],
1257        );
1258        glean.register_ping_type(&ping_type);
1259
1260        // Submit a ping
1261        ping_type.submit_sync(&glean, None);
1262
1263        // Get the submitted PingRequest
1264        match glean.get_upload_task() {
1265            PingUploadTask::Upload { request } => {
1266                // Simulate the processing of a client error
1267                let document_id = request.document_id;
1268                glean.process_ping_upload_response(&document_id, UploadResult::http_status(500));
1269                // Verify this ping was indeed re-enqueued
1270                match glean.get_upload_task() {
1271                    PingUploadTask::Upload { request } => {
1272                        assert_eq!(document_id, request.document_id);
1273                    }
1274                    _ => panic!("Expected upload manager to return the next request!"),
1275                }
1276            }
1277            _ => panic!("Expected upload manager to return the next request!"),
1278        }
1279
1280        // Verify that after request is returned, none are left
1281        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1282    }
1283
1284    #[test]
1285    fn processes_correctly_unrecoverable_upload_response() {
1286        let (mut glean, dir) = new_glean(None);
1287
1288        // Register a ping for testing
1289        let ping_type = PingType::new(
1290            "test",
1291            true,
1292            /* send_if_empty */ true,
1293            true,
1294            true,
1295            true,
1296            vec![],
1297            vec![],
1298            true,
1299            vec![],
1300        );
1301        glean.register_ping_type(&ping_type);
1302
1303        // Submit a ping
1304        ping_type.submit_sync(&glean, None);
1305
1306        // Get the pending ping directory path
1307        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1308
1309        // Get the submitted PingRequest
1310        match glean.get_upload_task() {
1311            PingUploadTask::Upload { request } => {
1312                // Simulate the processing of a client error
1313                let document_id = request.document_id;
1314                glean.process_ping_upload_response(
1315                    &document_id,
1316                    UploadResult::unrecoverable_failure(),
1317                );
1318                // Verify file was deleted
1319                assert!(!pending_pings_dir.join(document_id).exists());
1320            }
1321            _ => panic!("Expected upload manager to return the next request!"),
1322        }
1323
1324        // Verify that after request is returned, none are left
1325        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1326    }
1327
1328    #[test]
1329    fn new_pings_are_added_while_upload_in_progress() {
1330        let (glean, dir) = new_glean(None);
1331
1332        let upload_manager = PingUploadManager::no_policy(dir.path());
1333
1334        let doc1 = Uuid::new_v4().to_string();
1335        let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
1336
1337        let doc2 = Uuid::new_v4().to_string();
1338        let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
1339
1340        // Enqueue a ping
1341        upload_manager.enqueue_ping(
1342            &glean,
1343            PingPayload {
1344                document_id: doc1.clone(),
1345                upload_path: path1,
1346                json_body: "".into(),
1347                headers: None,
1348                body_has_info_sections: true,
1349                ping_name: "test-ping".into(),
1350                uploader_capabilities: vec![],
1351            },
1352        );
1353
1354        // Try and get the first request.
1355        let req = match upload_manager.get_upload_task(&glean, false) {
1356            PingUploadTask::Upload { request } => request,
1357            _ => panic!("Expected upload manager to return the next request!"),
1358        };
1359        assert_eq!(doc1, req.document_id);
1360
1361        // Schedule the next one while the first one is "in progress"
1362        upload_manager.enqueue_ping(
1363            &glean,
1364            PingPayload {
1365                document_id: doc2.clone(),
1366                upload_path: path2,
1367                json_body: "".into(),
1368                headers: None,
1369                body_has_info_sections: true,
1370                ping_name: "test-ping".into(),
1371                uploader_capabilities: vec![],
1372            },
1373        );
1374
1375        // Mark as processed
1376        upload_manager.process_ping_upload_response(
1377            &glean,
1378            &req.document_id,
1379            UploadResult::http_status(200),
1380        );
1381
1382        // Get the second request.
1383        let req = match upload_manager.get_upload_task(&glean, false) {
1384            PingUploadTask::Upload { request } => request,
1385            _ => panic!("Expected upload manager to return the next request!"),
1386        };
1387        assert_eq!(doc2, req.document_id);
1388
1389        // Mark as processed
1390        upload_manager.process_ping_upload_response(
1391            &glean,
1392            &req.document_id,
1393            UploadResult::http_status(200),
1394        );
1395
1396        // ... and then we're done.
1397        assert_eq!(
1398            upload_manager.get_upload_task(&glean, false),
1399            PingUploadTask::done()
1400        );
1401    }
1402
1403    #[test]
1404    fn adds_debug_view_header_to_requests_when_tag_is_set() {
1405        let (mut glean, _t) = new_glean(None);
1406
1407        glean.set_debug_view_tag("valid-tag");
1408
1409        // Register a ping for testing
1410        let ping_type = PingType::new(
1411            "test",
1412            true,
1413            /* send_if_empty */ true,
1414            true,
1415            true,
1416            true,
1417            vec![],
1418            vec![],
1419            true,
1420            vec![],
1421        );
1422        glean.register_ping_type(&ping_type);
1423
1424        // Submit a ping
1425        ping_type.submit_sync(&glean, None);
1426
1427        // Get the submitted PingRequest
1428        match glean.get_upload_task() {
1429            PingUploadTask::Upload { request } => {
1430                assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
1431            }
1432            _ => panic!("Expected upload manager to return the next request!"),
1433        }
1434    }
1435
1436    #[test]
1437    fn duplicates_are_not_enqueued() {
1438        let (glean, dir) = new_glean(None);
1439
1440        // Create a new upload manager so that we have access to its functions directly,
1441        // make it synchronous so we don't have to manually wait for the scanning to finish.
1442        let upload_manager = PingUploadManager::no_policy(dir.path());
1443
1444        let doc_id = Uuid::new_v4().to_string();
1445        let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
1446
1447        // Try to enqueue a ping with the same doc_id twice
1448        upload_manager.enqueue_ping(
1449            &glean,
1450            PingPayload {
1451                document_id: doc_id.clone(),
1452                upload_path: path.clone(),
1453                json_body: "".into(),
1454                headers: None,
1455                body_has_info_sections: true,
1456                ping_name: "test-ping".into(),
1457                uploader_capabilities: vec![],
1458            },
1459        );
1460        upload_manager.enqueue_ping(
1461            &glean,
1462            PingPayload {
1463                document_id: doc_id,
1464                upload_path: path,
1465                json_body: "".into(),
1466                headers: None,
1467                body_has_info_sections: true,
1468                ping_name: "test-ping".into(),
1469                uploader_capabilities: vec![],
1470            },
1471        );
1472
1473        // Get a task once
1474        let task = upload_manager.get_upload_task(&glean, false);
1475        assert!(task.is_upload());
1476
1477        // There should be no more queued tasks
1478        assert_eq!(
1479            upload_manager.get_upload_task(&glean, false),
1480            PingUploadTask::done()
1481        );
1482    }
1483
1484    #[test]
1485    fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
1486        let (mut glean, dir) = new_glean(None);
1487
1488        // Register a ping for testing
1489        let ping_type = PingType::new(
1490            "test",
1491            true,
1492            /* send_if_empty */ true,
1493            true,
1494            true,
1495            true,
1496            vec![],
1497            vec![],
1498            true,
1499            vec![],
1500        );
1501        glean.register_ping_type(&ping_type);
1502
1503        // Submit the ping multiple times
1504        let n = 5;
1505        for _ in 0..n {
1506            ping_type.submit_sync(&glean, None);
1507        }
1508
1509        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1510
1511        // Set a policy for max recoverable failures, this is usually disabled for tests.
1512        let max_recoverable_failures = 3;
1513        upload_manager
1514            .policy
1515            .set_max_recoverable_failures(Some(max_recoverable_failures));
1516
1517        // Return the max recoverable error failures in a row
1518        for _ in 0..max_recoverable_failures {
1519            match upload_manager.get_upload_task(&glean, false) {
1520                PingUploadTask::Upload { request } => {
1521                    upload_manager.process_ping_upload_response(
1522                        &glean,
1523                        &request.document_id,
1524                        UploadResult::recoverable_failure(),
1525                    );
1526                }
1527                _ => panic!("Expected upload manager to return the next request!"),
1528            }
1529        }
1530
1531        // Verify that after returning the max amount of recoverable failures,
1532        // we are done even though we haven't gotten all the enqueued requests.
1533        assert_eq!(
1534            upload_manager.get_upload_task(&glean, false),
1535            PingUploadTask::done()
1536        );
1537
1538        // Verify all requests are returned when we try again.
1539        for _ in 0..n {
1540            let task = upload_manager.get_upload_task(&glean, false);
1541            assert!(task.is_upload());
1542        }
1543    }
1544
1545    #[test]
1546    fn quota_is_enforced_when_enqueueing_cached_pings() {
1547        let (mut glean, dir) = new_glean(None);
1548
1549        // Register a ping for testing
1550        let ping_type = PingType::new(
1551            "test",
1552            true,
1553            /* send_if_empty */ true,
1554            true,
1555            true,
1556            true,
1557            vec![],
1558            vec![],
1559            true,
1560            vec![],
1561        );
1562        glean.register_ping_type(&ping_type);
1563
1564        // Submit the ping multiple times
1565        let n = 10;
1566        for _ in 0..n {
1567            ping_type.submit_sync(&glean, None);
1568        }
1569
1570        let directory_manager = PingDirectoryManager::new(dir.path());
1571        let pending_pings = directory_manager.process_dirs().pending_pings;
1572        // The pending pings array is sorted by date in ascending order,
1573        // the newest element is the last one.
1574        let (_, newest_ping) = &pending_pings.last().unwrap();
1575        let PingPayload {
1576            document_id: newest_ping_id,
1577            ..
1578        } = &newest_ping;
1579
1580        // Create a new upload manager pointing to the same data_path as the glean instance.
1581        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1582
1583        // Set the quota to just a little over the size on an empty ping file.
1584        // This way we can check that one ping is kept and all others are deleted.
1585        //
1586        // From manual testing I figured out an empty ping file is 324bytes,
1587        // I am setting this a little over just so that minor changes to the ping structure
1588        // don't immediatelly break this.
1589        upload_manager
1590            .policy
1591            .set_max_pending_pings_directory_size(Some(500));
1592
1593        // Get a task once
1594        // One ping should have been enqueued.
1595        // Make sure it is the newest ping.
1596        match upload_manager.get_upload_task(&glean, false) {
1597            PingUploadTask::Upload { request } => assert_eq!(&request.document_id, newest_ping_id),
1598            _ => panic!("Expected upload manager to return the next request!"),
1599        }
1600
1601        // Verify that no other requests were returned,
1602        // they should all have been deleted because pending pings quota was hit.
1603        assert_eq!(
1604            upload_manager.get_upload_task(&glean, false),
1605            PingUploadTask::done()
1606        );
1607
1608        // Verify that the correct number of deleted pings was recorded
1609        assert_eq!(
1610            n - 1,
1611            upload_manager
1612                .upload_metrics
1613                .deleted_pings_after_quota_hit
1614                .get_value(&glean, Some("metrics"))
1615                .unwrap()
1616        );
1617        assert_eq!(
1618            n,
1619            upload_manager
1620                .upload_metrics
1621                .pending_pings
1622                .get_value(&glean, Some("metrics"))
1623                .unwrap()
1624        );
1625    }
1626
1627    #[test]
1628    fn number_quota_is_enforced_when_enqueueing_cached_pings() {
1629        let (mut glean, dir) = new_glean(None);
1630
1631        // Register a ping for testing
1632        let ping_type = PingType::new(
1633            "test",
1634            true,
1635            /* send_if_empty */ true,
1636            true,
1637            true,
1638            true,
1639            vec![],
1640            vec![],
1641            true,
1642            vec![],
1643        );
1644        glean.register_ping_type(&ping_type);
1645
1646        // How many pings we allow at maximum
1647        let count_quota = 3;
1648        // The number of pings we fill the pending pings directory with.
1649        let n = 10;
1650
1651        // Submit the ping multiple times
1652        for _ in 0..n {
1653            ping_type.submit_sync(&glean, None);
1654        }
1655
1656        let directory_manager = PingDirectoryManager::new(dir.path());
1657        let pending_pings = directory_manager.process_dirs().pending_pings;
1658        // The pending pings array is sorted by date in ascending order,
1659        // the newest element is the last one.
1660        let expected_pings = pending_pings
1661            .iter()
1662            .rev()
1663            .take(count_quota)
1664            .map(|(_, ping)| ping.document_id.clone())
1665            .collect::<Vec<_>>();
1666
1667        // Create a new upload manager pointing to the same data_path as the glean instance.
1668        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1669
1670        upload_manager
1671            .policy
1672            .set_max_pending_pings_count(Some(count_quota as u64));
1673
1674        // Get a task once
1675        // One ping should have been enqueued.
1676        // Make sure it is the newest ping.
1677        for ping_id in expected_pings.iter().rev() {
1678            match upload_manager.get_upload_task(&glean, false) {
1679                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1680                _ => panic!("Expected upload manager to return the next request!"),
1681            }
1682        }
1683
1684        // Verify that no other requests were returned,
1685        // they should all have been deleted because pending pings quota was hit.
1686        assert_eq!(
1687            upload_manager.get_upload_task(&glean, false),
1688            PingUploadTask::done()
1689        );
1690
1691        // Verify that the correct number of deleted pings was recorded
1692        assert_eq!(
1693            (n - count_quota) as i32,
1694            upload_manager
1695                .upload_metrics
1696                .deleted_pings_after_quota_hit
1697                .get_value(&glean, Some("metrics"))
1698                .unwrap()
1699        );
1700        assert_eq!(
1701            n as i32,
1702            upload_manager
1703                .upload_metrics
1704                .pending_pings
1705                .get_value(&glean, Some("metrics"))
1706                .unwrap()
1707        );
1708    }
1709
1710    #[test]
1711    fn size_and_count_quota_work_together_size_first() {
1712        let (mut glean, dir) = new_glean(None);
1713
1714        // Register a ping for testing
1715        let ping_type = PingType::new(
1716            "test",
1717            true,
1718            /* send_if_empty */ true,
1719            true,
1720            true,
1721            true,
1722            vec![],
1723            vec![],
1724            true,
1725            vec![],
1726        );
1727        glean.register_ping_type(&ping_type);
1728
1729        let expected_number_of_pings = 3;
1730        // The number of pings we fill the pending pings directory with.
1731        let n = 10;
1732
1733        // Submit the ping multiple times
1734        for _ in 0..n {
1735            ping_type.submit_sync(&glean, None);
1736        }
1737
1738        let directory_manager = PingDirectoryManager::new(dir.path());
1739        let pending_pings = directory_manager.process_dirs().pending_pings;
1740        // The pending pings array is sorted by date in ascending order,
1741        // the newest element is the last one.
1742        let expected_pings = pending_pings
1743            .iter()
1744            .rev()
1745            .take(expected_number_of_pings)
1746            .map(|(_, ping)| ping.document_id.clone())
1747            .collect::<Vec<_>>();
1748
1749        // Create a new upload manager pointing to the same data_path as the glean instance.
1750        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1751
1752        // From manual testing we figured out a basically empty ping file is 399 bytes,
1753        // so this allows 3 pings with some headroom in case of future changes.
1754        upload_manager
1755            .policy
1756            .set_max_pending_pings_directory_size(Some(1300));
1757        upload_manager.policy.set_max_pending_pings_count(Some(5));
1758
1759        // Get a task once
1760        // One ping should have been enqueued.
1761        // Make sure it is the newest ping.
1762        for ping_id in expected_pings.iter().rev() {
1763            match upload_manager.get_upload_task(&glean, false) {
1764                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1765                _ => panic!("Expected upload manager to return the next request!"),
1766            }
1767        }
1768
1769        // Verify that no other requests were returned,
1770        // they should all have been deleted because pending pings quota was hit.
1771        assert_eq!(
1772            upload_manager.get_upload_task(&glean, false),
1773            PingUploadTask::done()
1774        );
1775
1776        // Verify that the correct number of deleted pings was recorded
1777        assert_eq!(
1778            (n - expected_number_of_pings) as i32,
1779            upload_manager
1780                .upload_metrics
1781                .deleted_pings_after_quota_hit
1782                .get_value(&glean, Some("metrics"))
1783                .unwrap()
1784        );
1785        assert_eq!(
1786            n as i32,
1787            upload_manager
1788                .upload_metrics
1789                .pending_pings
1790                .get_value(&glean, Some("metrics"))
1791                .unwrap()
1792        );
1793    }
1794
1795    #[test]
1796    fn size_and_count_quota_work_together_count_first() {
1797        let (mut glean, dir) = new_glean(None);
1798
1799        // Register a ping for testing
1800        let ping_type = PingType::new(
1801            "test",
1802            true,
1803            /* send_if_empty */ true,
1804            true,
1805            true,
1806            true,
1807            vec![],
1808            vec![],
1809            true,
1810            vec![],
1811        );
1812        glean.register_ping_type(&ping_type);
1813
1814        let expected_number_of_pings = 2;
1815        // The number of pings we fill the pending pings directory with.
1816        let n = 10;
1817
1818        // Submit the ping multiple times
1819        for _ in 0..n {
1820            ping_type.submit_sync(&glean, None);
1821        }
1822
1823        let directory_manager = PingDirectoryManager::new(dir.path());
1824        let pending_pings = directory_manager.process_dirs().pending_pings;
1825        // The pending pings array is sorted by date in ascending order,
1826        // the newest element is the last one.
1827        let expected_pings = pending_pings
1828            .iter()
1829            .rev()
1830            .take(expected_number_of_pings)
1831            .map(|(_, ping)| ping.document_id.clone())
1832            .collect::<Vec<_>>();
1833
1834        // Create a new upload manager pointing to the same data_path as the glean instance.
1835        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1836
1837        // From manual testing we figured out an empty ping file is 324bytes,
1838        // so this allows 3 pings.
1839        upload_manager
1840            .policy
1841            .set_max_pending_pings_directory_size(Some(1000));
1842        upload_manager.policy.set_max_pending_pings_count(Some(2));
1843
1844        // Get a task once
1845        // One ping should have been enqueued.
1846        // Make sure it is the newest ping.
1847        for ping_id in expected_pings.iter().rev() {
1848            match upload_manager.get_upload_task(&glean, false) {
1849                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1850                _ => panic!("Expected upload manager to return the next request!"),
1851            }
1852        }
1853
1854        // Verify that no other requests were returned,
1855        // they should all have been deleted because pending pings quota was hit.
1856        assert_eq!(
1857            upload_manager.get_upload_task(&glean, false),
1858            PingUploadTask::done()
1859        );
1860
1861        // Verify that the correct number of deleted pings was recorded
1862        assert_eq!(
1863            (n - expected_number_of_pings) as i32,
1864            upload_manager
1865                .upload_metrics
1866                .deleted_pings_after_quota_hit
1867                .get_value(&glean, Some("metrics"))
1868                .unwrap()
1869        );
1870        assert_eq!(
1871            n as i32,
1872            upload_manager
1873                .upload_metrics
1874                .pending_pings
1875                .get_value(&glean, Some("metrics"))
1876                .unwrap()
1877        );
1878    }
1879
1880    #[test]
1881    fn maximum_wait_attemps_is_enforced() {
1882        let (glean, dir) = new_glean(None);
1883
1884        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1885
1886        // Define a max_wait_attemps policy, this is disabled for tests by default.
1887        let max_wait_attempts = 3;
1888        upload_manager
1889            .policy
1890            .set_max_wait_attempts(Some(max_wait_attempts));
1891
1892        // Add a rate limiter to the upload mangager with max of 1 ping 5secs.
1893        //
1894        // We arbitrarily set the maximum pings per interval to a very low number,
1895        // when the rate limiter reaches it's limit get_upload_task returns a PingUploadTask::Wait,
1896        // which will allow us to test the limitations around returning too many of those in a row.
1897        let secs_per_interval = 5;
1898        let max_pings_per_interval = 1;
1899        upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
1900
1901        // Enqueue two pings
1902        upload_manager.enqueue_ping(
1903            &glean,
1904            PingPayload {
1905                document_id: Uuid::new_v4().to_string(),
1906                upload_path: PATH.into(),
1907                json_body: "".into(),
1908                headers: None,
1909                body_has_info_sections: true,
1910                ping_name: "ping-name".into(),
1911                uploader_capabilities: vec![],
1912            },
1913        );
1914        upload_manager.enqueue_ping(
1915            &glean,
1916            PingPayload {
1917                document_id: Uuid::new_v4().to_string(),
1918                upload_path: PATH.into(),
1919                json_body: "".into(),
1920                headers: None,
1921                body_has_info_sections: true,
1922                ping_name: "ping-name".into(),
1923                uploader_capabilities: vec![],
1924            },
1925        );
1926
1927        // Get the first ping, it should be returned normally.
1928        match upload_manager.get_upload_task(&glean, false) {
1929            PingUploadTask::Upload { .. } => {}
1930            _ => panic!("Expected upload manager to return the next request!"),
1931        }
1932
1933        // Try to get the next ping,
1934        // we should be throttled and thus get a PingUploadTask::Wait.
1935        // Check that we are indeed allowed to get this response as many times as expected.
1936        for _ in 0..max_wait_attempts {
1937            let task = upload_manager.get_upload_task(&glean, false);
1938            assert!(task.is_wait());
1939        }
1940
1941        // Check that after we get PingUploadTask::Wait the allowed number of times,
1942        // we then get PingUploadTask::Done.
1943        assert_eq!(
1944            upload_manager.get_upload_task(&glean, false),
1945            PingUploadTask::done()
1946        );
1947
1948        // Wait for the rate limiter to allow upload tasks again.
1949        thread::sleep(Duration::from_secs(secs_per_interval));
1950
1951        // Check that we are allowed again to get pings.
1952        let task = upload_manager.get_upload_task(&glean, false);
1953        assert!(task.is_upload());
1954
1955        // And once we are done we don't need to wait anymore.
1956        assert_eq!(
1957            upload_manager.get_upload_task(&glean, false),
1958            PingUploadTask::done()
1959        );
1960    }
1961
1962    #[test]
1963    fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
1964        let (glean, dir) = new_glean(None);
1965        let upload_manager = PingUploadManager::new(dir.path(), "test");
1966        match upload_manager.get_upload_task(&glean, false) {
1967            PingUploadTask::Wait { time } => {
1968                assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
1969            }
1970            _ => panic!("Expected upload manager to return a wait task!"),
1971        };
1972    }
1973
1974    #[test]
1975    fn cannot_enqueue_ping_while_its_being_processed() {
1976        let (glean, dir) = new_glean(None);
1977
1978        let upload_manager = PingUploadManager::no_policy(dir.path());
1979
1980        // Enqueue a ping and start processing it
1981        let identifier = &Uuid::new_v4();
1982        let ping = PingPayload {
1983            document_id: identifier.to_string(),
1984            upload_path: PATH.into(),
1985            json_body: "".into(),
1986            headers: None,
1987            body_has_info_sections: true,
1988            ping_name: "ping-name".into(),
1989            uploader_capabilities: vec![],
1990        };
1991        upload_manager.enqueue_ping(&glean, ping);
1992        assert!(upload_manager.get_upload_task(&glean, false).is_upload());
1993
1994        // Attempt to re-enqueue the same ping
1995        let ping = PingPayload {
1996            document_id: identifier.to_string(),
1997            upload_path: PATH.into(),
1998            json_body: "".into(),
1999            headers: None,
2000            body_has_info_sections: true,
2001            ping_name: "ping-name".into(),
2002            uploader_capabilities: vec![],
2003        };
2004        upload_manager.enqueue_ping(&glean, ping);
2005
2006        // No new pings should have been enqueued so the upload task is Done.
2007        assert_eq!(
2008            upload_manager.get_upload_task(&glean, false),
2009            PingUploadTask::done()
2010        );
2011
2012        // Process the upload response
2013        upload_manager.process_ping_upload_response(
2014            &glean,
2015            &identifier.to_string(),
2016            UploadResult::http_status(200),
2017        );
2018    }
2019}