glean_core/upload/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5//! Manages the pending pings queue and directory.
6//!
7//! * Keeps track of pending pings, loading any unsent ping from disk on startup;
8//! * Exposes [`get_upload_task`](PingUploadManager::get_upload_task) API for
9//!   the platform layer to request next upload task;
10//! * Exposes
11//!   [`process_ping_upload_response`](PingUploadManager::process_ping_upload_response)
12//!   API to check the HTTP response from the ping upload and either delete the
13//!   corresponding ping from disk or re-enqueue it for sending.
14
15use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::path::PathBuf;
18use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
19use std::sync::{Arc, RwLock, RwLockWriteGuard};
20use std::thread;
21use std::time::{Duration, Instant};
22
23use chrono::Utc;
24
25use crate::error::ErrorKind;
26use crate::TimerId;
27use crate::{internal_metrics::UploadMetrics, Glean};
28pub use directory::process_metadata;
29use directory::{PingDirectoryManager, PingPayloadsByDirectory};
30use policy::Policy;
31use request::create_date_header_value;
32
33pub use directory::{PingMetadata, PingPayload};
34pub use request::{HeaderMap, PingRequest};
35pub use result::{UploadResult, UploadTaskAction};
36
37mod directory;
38mod policy;
39mod request;
40mod result;
41
/// Wait time handed back to the requester (via `PingUploadTask::Wait`) when an
/// upload task is requested while the pending pings directories are still
/// being processed.
const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1000; // in milliseconds
43
/// Counts requests inside fixed-length intervals so callers can be throttled
/// once a maximum number of requests per interval has been reached.
#[derive(Debug)]
struct RateLimiter {
    /// The instant the current interval has started.
    ///
    /// `None` until the first call to [`RateLimiter::get_state`].
    started: Option<Instant>,
    /// The count for the current interval.
    count: u32,
    /// The duration of each interval.
    interval: Duration,
    /// The maximum count per interval.
    max_count: u32,
}

/// An enum to represent the current state of the RateLimiter.
#[derive(PartialEq)]
enum RateLimiterState {
    /// The RateLimiter has not reached the maximum count and is still incrementing.
    Incrementing,
    /// The RateLimiter has reached the maximum count for the current interval.
    ///
    /// This variant contains the remaining time (in milliseconds)
    /// until the rate limiter is not throttled anymore.
    Throttled(u64),
}

impl RateLimiter {
    /// Creates a rate limiter allowing `max_count` requests per `interval`.
    ///
    /// The first interval only starts on the first call to `get_state`.
    pub fn new(interval: Duration, max_count: u32) -> Self {
        Self {
            started: None,
            count: 0,
            interval,
            max_count,
        }
    }

    /// Starts a fresh interval right now and clears the counter.
    fn reset(&mut self) {
        self.started = Some(Instant::now());
        self.count = 0;
    }

    /// Time elapsed since the current interval started,
    /// or zero if no interval has been started yet.
    fn elapsed(&self) -> Duration {
        self.started
            .map_or(Duration::ZERO, |started| started.elapsed())
    }

    // The counter should reset if
    //
    // 1. It has never started;
    // 2. It has been started more than the interval time ago.
    fn should_reset(&self) -> bool {
        match self.started {
            None => true,
            Some(started) => started.elapsed() > self.interval,
        }
    }

    /// Tries to increment the internal counter.
    ///
    /// # Returns
    ///
    /// The current state of the RateLimiter:
    /// `Incrementing` if the request was counted,
    /// `Throttled(ms)` if the maximum count for this interval was already reached.
    pub fn get_state(&mut self) -> RateLimiterState {
        if self.should_reset() {
            self.reset();
        }

        if self.count >= self.max_count {
            // Time keeps passing between the `should_reset` check above and this point,
            // so the interval may have expired in the meantime: saturate at zero
            // instead of underflowing.
            let remaining = self.interval.saturating_sub(self.elapsed());
            // A remaining time that doesn't fit into a `u64` of milliseconds
            // is clamped to the largest representable wait time.
            return RateLimiterState::Throttled(
                remaining.as_millis().try_into().unwrap_or(u64::MAX),
            );
        }

        self.count += 1;
        RateLimiterState::Incrementing
    }
}
130
/// An enum representing the possible upload tasks to be performed by an uploader.
///
/// When asking for the next ping request to upload,
/// the requester may receive one out of three possible tasks:
/// `Upload`, `Wait` or `Done`.
#[derive(PartialEq, Eq, Debug)]
pub enum PingUploadTask {
    /// An upload task
    Upload {
        /// The ping request for upload
        /// See [`PingRequest`](struct.PingRequest.html) for more information.
        request: PingRequest,
    },

    /// A flag signaling that the pending pings directories are not done being processed,
    /// or that the requester is currently rate limited,
    /// thus the requester should wait and come back later.
    Wait {
        /// The time in milliseconds
        /// the requester should wait before requesting a new task.
        time: u64,
    },

    /// A flag signaling that requester doesn't need to request any more upload tasks at this moment.
    ///
    /// There are three possibilities for this scenario:
    /// * Pending pings queue is empty, no more pings to request;
    /// * Requester has gotten more than MAX_WAIT_ATTEMPTS (3, by default) `PingUploadTask::Wait` responses in a row;
    /// * Requester has reported more than MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW
    ///   recoverable upload failures on the same uploading window (see below)
    ///   and should stop requesting at this moment.
    ///
    /// An "uploading window" starts when a requester gets a new
    /// `PingUploadTask::Upload { request }` response and finishes when they
    /// finally get a `PingUploadTask::Done` or `PingUploadTask::Wait` response.
    Done {
        #[doc(hidden)]
        /// Unused field. Required because UniFFI can't handle variants without fields.
        unused: i8,
    },
}
170
171impl PingUploadTask {
172    /// Whether the current task is an upload task.
173    pub fn is_upload(&self) -> bool {
174        matches!(self, PingUploadTask::Upload { .. })
175    }
176
177    /// Whether the current task is wait task.
178    pub fn is_wait(&self) -> bool {
179        matches!(self, PingUploadTask::Wait { .. })
180    }
181
182    pub(crate) fn done() -> Self {
183        PingUploadTask::Done { unused: 0 }
184    }
185}
186
/// Manages the pending pings queue and directory.
#[derive(Debug)]
pub struct PingUploadManager {
    /// A FIFO queue storing a `PingRequest` for each pending ping.
    queue: RwLock<VecDeque<PingRequest>>,
    /// A manager for the pending pings directories.
    directory_manager: PingDirectoryManager,
    /// A flag signaling if we are done processing the pending pings directories.
    processed_pending_pings: Arc<AtomicBool>,
    /// A vector to store the pending pings processed off-thread.
    cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
    /// The number of upload failures for the current uploading window.
    recoverable_failure_count: AtomicU32,
    /// The number of times in a row a user has received a `PingUploadTask::Wait` response.
    wait_attempt_count: AtomicU32,
    /// A ping counter to help rate limit the ping uploads.
    ///
    /// To keep resource usage in check,
    /// we may want to limit the amount of pings sent in a given interval.
    rate_limiter: Option<RwLock<RateLimiter>>,
    /// The name of the programming language used by the binding creating this instance of PingUploadManager.
    ///
    /// This will be used to build the value User-Agent header for each ping request.
    language_binding_name: String,
    /// Metrics related to ping uploading.
    upload_metrics: UploadMetrics,
    /// Policies for ping storage, uploading and requests.
    policy: Policy,

    /// Pings that were handed out as upload tasks and have no processed response yet.
    ///
    /// Maps the ping's document ID to the pair of timer IDs
    /// (`send_success`, `send_failure`) started when the task was handed out.
    in_flight: RwLock<HashMap<String, (TimerId, TimerId)>>,
}
218
219impl PingUploadManager {
220    /// Creates a new PingUploadManager.
221    ///
222    /// # Arguments
223    ///
224    /// * `data_path` - Path to the pending pings directory.
225    /// * `language_binding_name` - The name of the language binding calling this managers instance.
226    ///
227    /// # Panics
228    ///
229    /// Will panic if unable to spawn a new thread.
230    pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
231        Self {
232            queue: RwLock::new(VecDeque::new()),
233            directory_manager: PingDirectoryManager::new(data_path),
234            processed_pending_pings: Arc::new(AtomicBool::new(false)),
235            cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
236            recoverable_failure_count: AtomicU32::new(0),
237            wait_attempt_count: AtomicU32::new(0),
238            rate_limiter: None,
239            language_binding_name: language_binding_name.into(),
240            upload_metrics: UploadMetrics::new(),
241            policy: Policy::default(),
242            in_flight: RwLock::new(HashMap::default()),
243        }
244    }
245
246    /// Spawns a new thread and processes the pending pings directories,
247    /// filling up the queue with whatever pings are in there.
248    ///
249    /// # Returns
250    ///
251    /// The `JoinHandle` to the spawned thread
252    pub fn scan_pending_pings_directories(
253        &self,
254        trigger_upload: bool,
255    ) -> std::thread::JoinHandle<()> {
256        let local_manager = self.directory_manager.clone();
257        let local_cached_pings = self.cached_pings.clone();
258        let local_flag = self.processed_pending_pings.clone();
259        thread::Builder::new()
260            .name("glean.ping_directory_manager.process_dir".to_string())
261            .spawn(move || {
262                {
263                    // Be sure to drop local_cached_pings lock before triggering upload.
264                    let mut local_cached_pings = local_cached_pings
265                        .write()
266                        .expect("Can't write to pending pings cache.");
267                    local_cached_pings.extend(local_manager.process_dirs());
268                    local_flag.store(true, Ordering::SeqCst);
269                }
270                if trigger_upload {
271                    crate::dispatcher::launch(|| {
272                        if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok())
273                        {
274                            if let Err(e) = state.callbacks.trigger_upload() {
275                                log::error!(
276                                    "Triggering upload after pending ping scan failed. Error: {}",
277                                    e
278                                );
279                            }
280                        }
281                    });
282                }
283            })
284            .expect("Unable to spawn thread to process pings directories.")
285    }
286
287    /// Creates a new upload manager with no limitations, for tests.
288    #[cfg(test)]
289    pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
290        let mut upload_manager = Self::new(data_path, "Test");
291
292        // Disable all policies for tests, if necessary individuals tests can re-enable them.
293        upload_manager.policy.set_max_recoverable_failures(None);
294        upload_manager.policy.set_max_wait_attempts(None);
295        upload_manager.policy.set_max_ping_body_size(None);
296        upload_manager
297            .policy
298            .set_max_pending_pings_directory_size(None);
299        upload_manager.policy.set_max_pending_pings_count(None);
300
301        // When building for tests, always scan the pending pings directories and do it sync.
302        upload_manager
303            .scan_pending_pings_directories(false)
304            .join()
305            .unwrap();
306
307        upload_manager
308    }
309
310    fn processed_pending_pings(&self) -> bool {
311        self.processed_pending_pings.load(Ordering::SeqCst)
312    }
313
314    fn recoverable_failure_count(&self) -> u32 {
315        self.recoverable_failure_count.load(Ordering::SeqCst)
316    }
317
318    fn wait_attempt_count(&self) -> u32 {
319        self.wait_attempt_count.load(Ordering::SeqCst)
320    }
321
322    /// Attempts to build a ping request from a ping file payload.
323    ///
324    /// Returns the `PingRequest` or `None` if unable to build,
325    /// in which case it will delete the ping file and record an error.
326    fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
327        let PingPayload {
328            document_id,
329            upload_path: path,
330            json_body: body,
331            headers,
332            body_has_info_sections,
333            ping_name,
334            uploader_capabilities,
335        } = ping;
336        let mut request = PingRequest::builder(
337            &self.language_binding_name,
338            self.policy.max_ping_body_size(),
339        )
340        .document_id(&document_id)
341        .path(path)
342        .body(body)
343        .body_has_info_sections(body_has_info_sections)
344        .ping_name(ping_name)
345        .uploader_capabilities(uploader_capabilities);
346
347        if let Some(headers) = headers {
348            request = request.headers(headers);
349        }
350
351        match request.build() {
352            Ok(request) => Some(request),
353            Err(e) => {
354                log::warn!("Error trying to build ping request: {}", e);
355                self.directory_manager.delete_file(&document_id);
356
357                // Record the error.
358                // Currently the only possible error is PingBodyOverflow.
359                if let ErrorKind::PingBodyOverflow(s) = e.kind() {
360                    self.upload_metrics
361                        .discarded_exceeding_pings_size
362                        .accumulate_sync(glean, *s as i64 / 1024);
363                }
364
365                None
366            }
367        }
368    }
369
370    /// Enqueue a ping for upload.
371    pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
372        let mut queue = self
373            .queue
374            .write()
375            .expect("Can't write to pending pings queue.");
376
377        let PingPayload {
378            ref document_id,
379            upload_path: ref path,
380            ..
381        } = ping;
382        // Checks if a ping with this `document_id` is already enqueued.
383        if queue
384            .iter()
385            .any(|request| request.document_id.as_str() == document_id)
386        {
387            log::warn!(
388                "Attempted to enqueue a duplicate ping {} at {}.",
389                document_id,
390                path
391            );
392            return;
393        }
394
395        {
396            let in_flight = self.in_flight.read().unwrap();
397            if in_flight.contains_key(document_id) {
398                log::warn!(
399                    "Attempted to enqueue an in-flight ping {} at {}.",
400                    document_id,
401                    path
402                );
403                self.upload_metrics
404                    .in_flight_pings_dropped
405                    .add_sync(glean, 0);
406                return;
407            }
408        }
409
410        log::trace!("Enqueuing ping {} at {}", document_id, path);
411        if let Some(request) = self.build_ping_request(glean, ping) {
412            queue.push_back(request)
413        }
414    }
415
416    /// Enqueues pings that might have been cached.
417    ///
418    /// The size of the PENDING_PINGS_DIRECTORY directory will be calculated
419    /// (by accumulating each ping's size in that directory)
420    /// and in case we exceed the quota, defined by the `quota` arg,
421    /// outstanding pings get deleted and are not enqueued.
422    ///
423    /// The size of the DELETION_REQUEST_PINGS_DIRECTORY will not be calculated
424    /// and no deletion-request pings will be deleted. Deletion request pings
425    /// are not very common and usually don't contain any data,
426    /// we don't expect that directory to ever reach quota.
427    /// Most importantly, we don't want to ever delete deletion-request pings.
428    ///
429    /// # Arguments
430    ///
431    /// * `glean` - The Glean object holding the database.
432    fn enqueue_cached_pings(&self, glean: &Glean) {
433        let mut cached_pings = self
434            .cached_pings
435            .write()
436            .expect("Can't write to pending pings cache.");
437
438        if cached_pings.len() > 0 {
439            let mut pending_pings_directory_size: u64 = 0;
440            let mut pending_pings_count = 0;
441            let mut deleting = false;
442
443            let total = cached_pings.pending_pings.len() as u64;
444            self.upload_metrics
445                .pending_pings
446                .add_sync(glean, total.try_into().unwrap_or(0));
447
448            if total > self.policy.max_pending_pings_count() {
449                log::warn!(
450                    "More than {} pending pings in the directory, will delete {} old pings.",
451                    self.policy.max_pending_pings_count(),
452                    total - self.policy.max_pending_pings_count()
453                );
454            }
455
456            // The pending pings vector is sorted by date in ascending order (oldest -> newest).
457            // We need to calculate the size of the pending pings directory
458            // and delete the **oldest** pings in case quota is reached.
459            // Thus, we reverse the order of the pending pings vector,
460            // so that we iterate in descending order (newest -> oldest).
461            cached_pings.pending_pings.reverse();
462            cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
463                pending_pings_count += 1;
464                pending_pings_directory_size += file_size;
465
466                // We don't want to spam the log for every ping over the quota.
467                if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
468                    log::warn!(
469                        "Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
470                        self.policy.max_pending_pings_directory_size()
471                    );
472                    deleting = true;
473                }
474
475                // Once we reach the number of allowed pings we start deleting,
476                // no matter what size.
477                // We already log this before the loop.
478                if pending_pings_count > self.policy.max_pending_pings_count() {
479                    deleting = true;
480                }
481
482                if deleting && self.directory_manager.delete_file(document_id) {
483                    self.upload_metrics
484                        .deleted_pings_after_quota_hit
485                        .add_sync(glean, 1);
486                    return false;
487                }
488
489                true
490            });
491            // After calculating the size of the pending pings directory,
492            // we record the calculated number and reverse the pings array back for enqueueing.
493            cached_pings.pending_pings.reverse();
494            self.upload_metrics
495                .pending_pings_directory_size
496                .accumulate_sync(glean, pending_pings_directory_size as i64 / 1024);
497
498            // Enqueue the remaining pending pings and
499            // enqueue all deletion-request pings.
500            cached_pings
501                .deletion_request_pings
502                .drain(..)
503                .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
504            cached_pings
505                .pending_pings
506                .drain(..)
507                .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
508        }
509    }
510
511    /// Adds rate limiting capability to this upload manager.
512    ///
513    /// The rate limiter will limit the amount of calls to `get_upload_task` per interval.
514    ///
515    /// Setting this will restart count and timer in case there was a previous rate limiter set
516    /// (e.g. if we have reached the current limit and call this function, we start counting again
517    /// and the caller is allowed to asks for tasks).
518    ///
519    /// # Arguments
520    ///
521    /// * `interval` - the amount of seconds in each rate limiting window.
522    /// * `max_tasks` - the maximum amount of task requests allowed per interval.
523    pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
524        self.rate_limiter = Some(RwLock::new(RateLimiter::new(
525            Duration::from_secs(interval),
526            max_tasks,
527        )));
528    }
529
530    /// Reads a ping file, creates a `PingRequest` and adds it to the queue.
531    ///
532    /// Duplicate requests won't be added.
533    ///
534    /// # Arguments
535    ///
536    /// * `glean` - The Glean object holding the database.
537    /// * `document_id` - The UUID of the ping in question.
538    pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
539        if let Some(ping) = self.directory_manager.process_file(document_id) {
540            self.enqueue_ping(glean, ping);
541        }
542    }
543
544    /// Clears the pending pings queue, leaves the deletion-request pings.
545    pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
546        log::trace!("Clearing ping queue");
547        let mut queue = self
548            .queue
549            .write()
550            .expect("Can't write to pending pings queue.");
551
552        queue.retain(|ping| ping.is_deletion_request());
553        log::trace!(
554            "{} pings left in the queue (only deletion-request expected)",
555            queue.len()
556        );
557        queue
558    }
559
560    fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
561        // Helper to decide whether to return PingUploadTask::Wait or PingUploadTask::Done.
562        //
563        // We want to limit the amount of PingUploadTask::Wait returned in a row,
564        // in case we reach MAX_WAIT_ATTEMPTS we want to actually return PingUploadTask::Done.
565        let wait_or_done = |time: u64| {
566            self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
567            if self.wait_attempt_count() > self.policy.max_wait_attempts() {
568                PingUploadTask::done()
569            } else {
570                PingUploadTask::Wait { time }
571            }
572        };
573
574        if !self.processed_pending_pings() {
575            log::info!(
576                "Tried getting an upload task, but processing is ongoing. Will come back later."
577            );
578            return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
579        }
580
581        // This is a no-op in case there are no cached pings.
582        self.enqueue_cached_pings(glean);
583
584        if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
585            log::warn!(
586                "Reached maximum recoverable failures for the current uploading window. You are done."
587            );
588            return PingUploadTask::done();
589        }
590
591        let mut queue = self
592            .queue
593            .write()
594            .expect("Can't write to pending pings queue.");
595        match queue.front() {
596            Some(request) => {
597                if let Some(rate_limiter) = &self.rate_limiter {
598                    let mut rate_limiter = rate_limiter
599                        .write()
600                        .expect("Can't write to the rate limiter.");
601                    if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
602                        log::info!(
603                            "Tried getting an upload task, but we are throttled at the moment."
604                        );
605                        return wait_or_done(remaining);
606                    }
607                }
608
609                log::info!(
610                    "New upload task with id {} (path: {})",
611                    request.document_id,
612                    request.path
613                );
614
615                if log_ping {
616                    if let Some(body) = request.pretty_body() {
617                        chunked_log_info(&request.path, &body);
618                    } else {
619                        chunked_log_info(&request.path, "<invalid ping payload>");
620                    }
621                }
622
623                {
624                    // Synchronous timer starts.
625                    // We're in the uploader thread anyway.
626                    // But also: No data is stored on disk.
627                    let mut in_flight = self.in_flight.write().unwrap();
628                    let success_id = self.upload_metrics.send_success.start_sync();
629                    let failure_id = self.upload_metrics.send_failure.start_sync();
630                    in_flight.insert(request.document_id.clone(), (success_id, failure_id));
631                }
632
633                let mut request = queue.pop_front().unwrap();
634
635                // Adding the `Date` header just before actual upload happens.
636                request
637                    .headers
638                    .insert("Date".to_string(), create_date_header_value(Utc::now()));
639
640                PingUploadTask::Upload { request }
641            }
642            None => {
643                log::info!("No more pings to upload! You are done.");
644                PingUploadTask::done()
645            }
646        }
647    }
648
649    /// Gets the next `PingUploadTask`.
650    ///
651    /// # Arguments
652    ///
653    /// * `glean` - The Glean object holding the database.
654    /// * `log_ping` - Whether to log the ping before returning.
655    ///
656    /// # Returns
657    ///
658    /// The next [`PingUploadTask`](enum.PingUploadTask.html).
659    pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
660        let task = self.get_upload_task_internal(glean, log_ping);
661
662        if !task.is_wait() && self.wait_attempt_count() > 0 {
663            self.wait_attempt_count.store(0, Ordering::SeqCst);
664        }
665
666        if !task.is_upload() && self.recoverable_failure_count() > 0 {
667            self.recoverable_failure_count.store(0, Ordering::SeqCst);
668        }
669
670        task
671    }
672
673    /// Processes the response from an attempt to upload a ping.
674    ///
675    /// Based on the HTTP status of said response,
676    /// the possible outcomes are:
677    ///
678    /// * **200 - 299 Success**
679    ///   Any status on the 2XX range is considered a succesful upload,
680    ///   which means the corresponding ping file can be deleted.
681    ///   _Known 2XX status:_
682    ///   * 200 - OK. Request accepted into the pipeline.
683    ///
684    /// * **400 - 499 Unrecoverable error**
685    ///   Any status on the 4XX range means something our client did is not correct.
686    ///   It is unlikely that the client is going to recover from this by retrying,
687    ///   so in this case the corresponding ping file can also be deleted.
688    ///   _Known 4XX status:_
689    ///   * 404 - not found - POST/PUT to an unknown namespace
690    ///   * 405 - wrong request type (anything other than POST/PUT)
691    ///   * 411 - missing content-length header
692    ///   * 413 - request body too large Note that if we have badly-behaved clients that
693    ///           retry on 4XX, we should send back 202 on body/path too long).
694    ///   * 414 - request path too long (See above)
695    ///
696    /// * **Any other error**
697    ///   For any other error, a warning is logged and the ping is re-enqueued.
698    ///   _Known other errors:_
699    ///   * 500 - internal error
700    ///
701    /// # Note
702    ///
703    /// The disk I/O performed by this function is not done off-thread,
704    /// as it is expected to be called off-thread by the platform.
705    ///
706    /// # Arguments
707    ///
708    /// * `glean` - The Glean object holding the database.
709    /// * `document_id` - The UUID of the ping in question.
710    /// * `status` - The HTTP status of the response.
711    pub fn process_ping_upload_response(
712        &self,
713        glean: &Glean,
714        document_id: &str,
715        status: UploadResult,
716    ) -> UploadTaskAction {
717        use UploadResult::*;
718
719        let stop_time = time::precise_time_ns();
720
721        if let Some(label) = status.get_label() {
722            let metric = self.upload_metrics.ping_upload_failure.get(label);
723            metric.add_sync(glean, 1);
724        }
725
726        let send_ids = {
727            let mut lock = self.in_flight.write().unwrap();
728            lock.remove(document_id)
729        };
730
731        if send_ids.is_none() {
732            self.upload_metrics.missing_send_ids.add_sync(glean, 1);
733        }
734
735        match status {
736            HttpStatus { code } if (200..=299).contains(&code) => {
737                log::info!("Ping {} successfully sent {}.", document_id, code);
738                if let Some((success_id, failure_id)) = send_ids {
739                    self.upload_metrics
740                        .send_success
741                        .set_stop_and_accumulate(glean, success_id, stop_time);
742                    self.upload_metrics.send_failure.cancel_sync(failure_id);
743                }
744                self.directory_manager.delete_file(document_id);
745            }
746
747            UnrecoverableFailure { .. } | HttpStatus { code: 400..=499 } | Incapable { .. } => {
748                log::warn!(
749                    "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
750                    document_id,
751                    status
752                );
753                if let Some((success_id, failure_id)) = send_ids {
754                    self.upload_metrics.send_success.cancel_sync(success_id);
755                    self.upload_metrics
756                        .send_failure
757                        .set_stop_and_accumulate(glean, failure_id, stop_time);
758                }
759                self.directory_manager.delete_file(document_id);
760            }
761
762            RecoverableFailure { .. } | HttpStatus { .. } => {
763                log::warn!(
764                    "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
765                    document_id,
766                    status
767                );
768                if let Some((success_id, failure_id)) = send_ids {
769                    self.upload_metrics.send_success.cancel_sync(success_id);
770                    self.upload_metrics
771                        .send_failure
772                        .set_stop_and_accumulate(glean, failure_id, stop_time);
773                }
774                self.enqueue_ping_from_file(glean, document_id);
775                self.recoverable_failure_count
776                    .fetch_add(1, Ordering::SeqCst);
777            }
778
779            Done { .. } => {
780                log::debug!("Uploader signaled Done. Exiting.");
781                if let Some((success_id, failure_id)) = send_ids {
782                    self.upload_metrics.send_success.cancel_sync(success_id);
783                    self.upload_metrics.send_failure.cancel_sync(failure_id);
784                }
785                return UploadTaskAction::End;
786            }
787        };
788
789        UploadTaskAction::Next
790    }
791}
792
/// Logs a ping payload on Android, splitting it into chunks when it is too long.
///
/// If `path` and `payload` together fit into 4000 bytes they are logged as a
/// single `info` message. Otherwise the payload is cut into chunks of at most
/// 4000 bytes, each ending on a UTF-8 character boundary, and every chunk is
/// logged at `info` level prefixed with `path` and a `[Part i of n]` counter,
/// where `n` is the exact number of chunks emitted.
///
/// # Arguments
///
/// * `path` - The value printed after "Glean ping to URL:" in every message.
/// * `payload` - The text to log.
#[cfg(target_os = "android")]
pub fn chunked_log_info(path: &str, payload: &str) {
    // Since the logcat ring buffer size is configurable, but its 'max payload' size is not,
    // we must break apart long pings into chunks no larger than the max payload size of 4076b.
    // We leave some head space for our prefix.
    const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;

    // If the length of the ping will fit within one logcat payload, then we can
    // short-circuit here and avoid some overhead, otherwise we must split up the
    // message so that we don't truncate it.
    if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
        log::info!("Glean ping to URL: {}\n{}", path, payload);
        return;
    }

    // Split first and log afterwards, so that the "of N" counter is the real
    // number of chunks. Backing off to a char boundary can shrink a chunk,
    // which makes a simple `len / MAX + 1` estimate unreliable.
    let mut chunks = Vec::with_capacity(payload.len() / MAX_LOG_PAYLOAD_SIZE_BYTES + 1);
    let mut rest = payload;
    while rest.len() > MAX_LOG_PAYLOAD_SIZE_BYTES {
        // Find a char boundary at or before the size limit.
        // It's UTF-8, so one is at most 3 bytes back; `end` can never reach 0.
        let mut end = MAX_LOG_PAYLOAD_SIZE_BYTES;
        while !rest.is_char_boundary(end) {
            end -= 1;
        }

        let (chunk, tail) = rest.split_at(end);
        chunks.push(chunk);
        rest = tail;
    }

    // Whatever is left over forms the final chunk.
    if !rest.is_empty() {
        chunks.push(rest);
    }

    let total_chunks = chunks.len();
    for (idx, chunk) in chunks.iter().enumerate() {
        log::info!(
            "Glean ping to URL: {} [Part {} of {}]\n{}",
            path,
            idx + 1,
            total_chunks,
            chunk
        );
    }
}
852
853/// Logs payload in one go (all other OS).
854#[cfg(not(target_os = "android"))]
855pub fn chunked_log_info(_path: &str, payload: &str) {
856    log::info!("{}", payload)
857}
858
859#[cfg(test)]
860mod test {
861    use uuid::Uuid;
862
863    use super::*;
864    use crate::metrics::PingType;
865    use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
866
    // Upload path used by the tests below for hand-built `PingPayload`s.
    const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
868
869    #[test]
870    fn doesnt_error_when_there_are_no_pending_pings() {
871        let (glean, _t) = new_glean(None);
872
873        // Try and get the next request.
874        // Verify request was not returned
875        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
876    }
877
878    #[test]
879    fn returns_ping_request_when_there_is_one() {
880        let (glean, dir) = new_glean(None);
881
882        let upload_manager = PingUploadManager::no_policy(dir.path());
883
884        // Enqueue a ping
885        upload_manager.enqueue_ping(
886            &glean,
887            PingPayload {
888                document_id: Uuid::new_v4().to_string(),
889                upload_path: PATH.into(),
890                json_body: "".into(),
891                headers: None,
892                body_has_info_sections: true,
893                ping_name: "ping-name".into(),
894                uploader_capabilities: vec![],
895            },
896        );
897
898        // Try and get the next request.
899        // Verify request was returned
900        let task = upload_manager.get_upload_task(&glean, false);
901        assert!(task.is_upload());
902    }
903
904    #[test]
905    fn returns_as_many_ping_requests_as_there_are() {
906        let (glean, dir) = new_glean(None);
907
908        let upload_manager = PingUploadManager::no_policy(dir.path());
909
910        // Enqueue a ping multiple times
911        let n = 10;
912        for _ in 0..n {
913            upload_manager.enqueue_ping(
914                &glean,
915                PingPayload {
916                    document_id: Uuid::new_v4().to_string(),
917                    upload_path: PATH.into(),
918                    json_body: "".into(),
919                    headers: None,
920                    body_has_info_sections: true,
921                    ping_name: "ping-name".into(),
922                    uploader_capabilities: vec![],
923                },
924            );
925        }
926
927        // Verify a request is returned for each submitted ping
928        for _ in 0..n {
929            let task = upload_manager.get_upload_task(&glean, false);
930            assert!(task.is_upload());
931        }
932
933        // Verify that after all requests are returned, none are left
934        assert_eq!(
935            upload_manager.get_upload_task(&glean, false),
936            PingUploadTask::done()
937        );
938    }
939
940    #[test]
941    fn limits_the_number_of_pings_when_there_is_rate_limiting() {
942        let (glean, dir) = new_glean(None);
943
944        let mut upload_manager = PingUploadManager::no_policy(dir.path());
945
946        // Add a rate limiter to the upload mangager with max of 10 pings every 3 seconds.
947        let max_pings_per_interval = 10;
948        upload_manager.set_rate_limiter(3, 10);
949
950        // Enqueue the max number of pings allowed per uploading window
951        for _ in 0..max_pings_per_interval {
952            upload_manager.enqueue_ping(
953                &glean,
954                PingPayload {
955                    document_id: Uuid::new_v4().to_string(),
956                    upload_path: PATH.into(),
957                    json_body: "".into(),
958                    headers: None,
959                    body_has_info_sections: true,
960                    ping_name: "ping-name".into(),
961                    uploader_capabilities: vec![],
962                },
963            );
964        }
965
966        // Verify a request is returned for each submitted ping
967        for _ in 0..max_pings_per_interval {
968            let task = upload_manager.get_upload_task(&glean, false);
969            assert!(task.is_upload());
970        }
971
972        // Enqueue just one more ping
973        upload_manager.enqueue_ping(
974            &glean,
975            PingPayload {
976                document_id: Uuid::new_v4().to_string(),
977                upload_path: PATH.into(),
978                json_body: "".into(),
979                headers: None,
980                body_has_info_sections: true,
981                ping_name: "ping-name".into(),
982                uploader_capabilities: vec![],
983            },
984        );
985
986        // Verify that we are indeed told to wait because we are at capacity
987        match upload_manager.get_upload_task(&glean, false) {
988            PingUploadTask::Wait { time } => {
989                // Wait for the uploading window to reset
990                thread::sleep(Duration::from_millis(time));
991            }
992            _ => panic!("Expected upload manager to return a wait task!"),
993        };
994
995        let task = upload_manager.get_upload_task(&glean, false);
996        assert!(task.is_upload());
997    }
998
999    #[test]
1000    fn clearing_the_queue_works_correctly() {
1001        let (glean, dir) = new_glean(None);
1002
1003        let upload_manager = PingUploadManager::no_policy(dir.path());
1004
1005        // Enqueue a ping multiple times
1006        for _ in 0..10 {
1007            upload_manager.enqueue_ping(
1008                &glean,
1009                PingPayload {
1010                    document_id: Uuid::new_v4().to_string(),
1011                    upload_path: PATH.into(),
1012                    json_body: "".into(),
1013                    headers: None,
1014                    body_has_info_sections: true,
1015                    ping_name: "ping-name".into(),
1016                    uploader_capabilities: vec![],
1017                },
1018            );
1019        }
1020
1021        // Clear the queue
1022        drop(upload_manager.clear_ping_queue());
1023
1024        // Verify there really isn't any ping in the queue
1025        assert_eq!(
1026            upload_manager.get_upload_task(&glean, false),
1027            PingUploadTask::done()
1028        );
1029    }
1030
1031    #[test]
1032    fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
1033        let (mut glean, _t) = new_glean(None);
1034
1035        // Register a ping for testing
1036        let ping_type = PingType::new(
1037            "test",
1038            true,
1039            /* send_if_empty */ true,
1040            true,
1041            true,
1042            true,
1043            vec![],
1044            vec![],
1045            true,
1046            vec![],
1047        );
1048        glean.register_ping_type(&ping_type);
1049
1050        // Submit the ping multiple times
1051        let n = 10;
1052        for _ in 0..n {
1053            ping_type.submit_sync(&glean, None);
1054        }
1055
1056        glean
1057            .internal_pings
1058            .deletion_request
1059            .submit_sync(&glean, None);
1060
1061        // Clear the queue
1062        drop(glean.upload_manager.clear_ping_queue());
1063
1064        let upload_task = glean.get_upload_task();
1065        match upload_task {
1066            PingUploadTask::Upload { request } => assert!(request.is_deletion_request()),
1067            _ => panic!("Expected upload manager to return the next request!"),
1068        }
1069
1070        // Verify there really isn't any other pings in the queue
1071        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1072    }
1073
1074    #[test]
1075    fn fills_up_queue_successfully_from_disk() {
1076        let (mut glean, dir) = new_glean(None);
1077
1078        // Register a ping for testing
1079        let ping_type = PingType::new(
1080            "test",
1081            true,
1082            /* send_if_empty */ true,
1083            true,
1084            true,
1085            true,
1086            vec![],
1087            vec![],
1088            true,
1089            vec![],
1090        );
1091        glean.register_ping_type(&ping_type);
1092
1093        // Submit the ping multiple times
1094        let n = 10;
1095        for _ in 0..n {
1096            ping_type.submit_sync(&glean, None);
1097        }
1098
1099        // Create a new upload manager pointing to the same data_path as the glean instance.
1100        let upload_manager = PingUploadManager::no_policy(dir.path());
1101
1102        // Verify the requests were properly enqueued
1103        for _ in 0..n {
1104            let task = upload_manager.get_upload_task(&glean, false);
1105            assert!(task.is_upload());
1106        }
1107
1108        // Verify that after all requests are returned, none are left
1109        assert_eq!(
1110            upload_manager.get_upload_task(&glean, false),
1111            PingUploadTask::done()
1112        );
1113    }
1114
1115    #[test]
1116    fn processes_correctly_success_upload_response() {
1117        let (mut glean, dir) = new_glean(None);
1118
1119        // Register a ping for testing
1120        let ping_type = PingType::new(
1121            "test",
1122            true,
1123            /* send_if_empty */ true,
1124            true,
1125            true,
1126            true,
1127            vec![],
1128            vec![],
1129            true,
1130            vec![],
1131        );
1132        glean.register_ping_type(&ping_type);
1133
1134        // Submit a ping
1135        ping_type.submit_sync(&glean, None);
1136
1137        // Get the pending ping directory path
1138        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1139
1140        // Get the submitted PingRequest
1141        match glean.get_upload_task() {
1142            PingUploadTask::Upload { request } => {
1143                // Simulate the processing of a sucessfull request
1144                let document_id = request.document_id;
1145                glean.process_ping_upload_response(&document_id, UploadResult::http_status(200));
1146                // Verify file was deleted
1147                assert!(!pending_pings_dir.join(document_id).exists());
1148            }
1149            _ => panic!("Expected upload manager to return the next request!"),
1150        }
1151
1152        // Verify that after request is returned, none are left
1153        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1154    }
1155
1156    #[test]
1157    fn processes_correctly_client_error_upload_response() {
1158        let (mut glean, dir) = new_glean(None);
1159
1160        // Register a ping for testing
1161        let ping_type = PingType::new(
1162            "test",
1163            true,
1164            /* send_if_empty */ true,
1165            true,
1166            true,
1167            true,
1168            vec![],
1169            vec![],
1170            true,
1171            vec![],
1172        );
1173        glean.register_ping_type(&ping_type);
1174
1175        // Submit a ping
1176        ping_type.submit_sync(&glean, None);
1177
1178        // Get the pending ping directory path
1179        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1180
1181        // Get the submitted PingRequest
1182        match glean.get_upload_task() {
1183            PingUploadTask::Upload { request } => {
1184                // Simulate the processing of a client error
1185                let document_id = request.document_id;
1186                glean.process_ping_upload_response(&document_id, UploadResult::http_status(404));
1187                // Verify file was deleted
1188                assert!(!pending_pings_dir.join(document_id).exists());
1189            }
1190            _ => panic!("Expected upload manager to return the next request!"),
1191        }
1192
1193        // Verify that after request is returned, none are left
1194        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1195    }
1196
1197    #[test]
1198    fn processes_correctly_server_error_upload_response() {
1199        let (mut glean, _t) = new_glean(None);
1200
1201        // Register a ping for testing
1202        let ping_type = PingType::new(
1203            "test",
1204            true,
1205            /* send_if_empty */ true,
1206            true,
1207            true,
1208            true,
1209            vec![],
1210            vec![],
1211            true,
1212            vec![],
1213        );
1214        glean.register_ping_type(&ping_type);
1215
1216        // Submit a ping
1217        ping_type.submit_sync(&glean, None);
1218
1219        // Get the submitted PingRequest
1220        match glean.get_upload_task() {
1221            PingUploadTask::Upload { request } => {
1222                // Simulate the processing of a client error
1223                let document_id = request.document_id;
1224                glean.process_ping_upload_response(&document_id, UploadResult::http_status(500));
1225                // Verify this ping was indeed re-enqueued
1226                match glean.get_upload_task() {
1227                    PingUploadTask::Upload { request } => {
1228                        assert_eq!(document_id, request.document_id);
1229                    }
1230                    _ => panic!("Expected upload manager to return the next request!"),
1231                }
1232            }
1233            _ => panic!("Expected upload manager to return the next request!"),
1234        }
1235
1236        // Verify that after request is returned, none are left
1237        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1238    }
1239
1240    #[test]
1241    fn processes_correctly_unrecoverable_upload_response() {
1242        let (mut glean, dir) = new_glean(None);
1243
1244        // Register a ping for testing
1245        let ping_type = PingType::new(
1246            "test",
1247            true,
1248            /* send_if_empty */ true,
1249            true,
1250            true,
1251            true,
1252            vec![],
1253            vec![],
1254            true,
1255            vec![],
1256        );
1257        glean.register_ping_type(&ping_type);
1258
1259        // Submit a ping
1260        ping_type.submit_sync(&glean, None);
1261
1262        // Get the pending ping directory path
1263        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1264
1265        // Get the submitted PingRequest
1266        match glean.get_upload_task() {
1267            PingUploadTask::Upload { request } => {
1268                // Simulate the processing of a client error
1269                let document_id = request.document_id;
1270                glean.process_ping_upload_response(
1271                    &document_id,
1272                    UploadResult::unrecoverable_failure(),
1273                );
1274                // Verify file was deleted
1275                assert!(!pending_pings_dir.join(document_id).exists());
1276            }
1277            _ => panic!("Expected upload manager to return the next request!"),
1278        }
1279
1280        // Verify that after request is returned, none are left
1281        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1282    }
1283
1284    #[test]
1285    fn new_pings_are_added_while_upload_in_progress() {
1286        let (glean, dir) = new_glean(None);
1287
1288        let upload_manager = PingUploadManager::no_policy(dir.path());
1289
1290        let doc1 = Uuid::new_v4().to_string();
1291        let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
1292
1293        let doc2 = Uuid::new_v4().to_string();
1294        let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
1295
1296        // Enqueue a ping
1297        upload_manager.enqueue_ping(
1298            &glean,
1299            PingPayload {
1300                document_id: doc1.clone(),
1301                upload_path: path1,
1302                json_body: "".into(),
1303                headers: None,
1304                body_has_info_sections: true,
1305                ping_name: "test-ping".into(),
1306                uploader_capabilities: vec![],
1307            },
1308        );
1309
1310        // Try and get the first request.
1311        let req = match upload_manager.get_upload_task(&glean, false) {
1312            PingUploadTask::Upload { request } => request,
1313            _ => panic!("Expected upload manager to return the next request!"),
1314        };
1315        assert_eq!(doc1, req.document_id);
1316
1317        // Schedule the next one while the first one is "in progress"
1318        upload_manager.enqueue_ping(
1319            &glean,
1320            PingPayload {
1321                document_id: doc2.clone(),
1322                upload_path: path2,
1323                json_body: "".into(),
1324                headers: None,
1325                body_has_info_sections: true,
1326                ping_name: "test-ping".into(),
1327                uploader_capabilities: vec![],
1328            },
1329        );
1330
1331        // Mark as processed
1332        upload_manager.process_ping_upload_response(
1333            &glean,
1334            &req.document_id,
1335            UploadResult::http_status(200),
1336        );
1337
1338        // Get the second request.
1339        let req = match upload_manager.get_upload_task(&glean, false) {
1340            PingUploadTask::Upload { request } => request,
1341            _ => panic!("Expected upload manager to return the next request!"),
1342        };
1343        assert_eq!(doc2, req.document_id);
1344
1345        // Mark as processed
1346        upload_manager.process_ping_upload_response(
1347            &glean,
1348            &req.document_id,
1349            UploadResult::http_status(200),
1350        );
1351
1352        // ... and then we're done.
1353        assert_eq!(
1354            upload_manager.get_upload_task(&glean, false),
1355            PingUploadTask::done()
1356        );
1357    }
1358
1359    #[test]
1360    fn adds_debug_view_header_to_requests_when_tag_is_set() {
1361        let (mut glean, _t) = new_glean(None);
1362
1363        glean.set_debug_view_tag("valid-tag");
1364
1365        // Register a ping for testing
1366        let ping_type = PingType::new(
1367            "test",
1368            true,
1369            /* send_if_empty */ true,
1370            true,
1371            true,
1372            true,
1373            vec![],
1374            vec![],
1375            true,
1376            vec![],
1377        );
1378        glean.register_ping_type(&ping_type);
1379
1380        // Submit a ping
1381        ping_type.submit_sync(&glean, None);
1382
1383        // Get the submitted PingRequest
1384        match glean.get_upload_task() {
1385            PingUploadTask::Upload { request } => {
1386                assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
1387            }
1388            _ => panic!("Expected upload manager to return the next request!"),
1389        }
1390    }
1391
1392    #[test]
1393    fn duplicates_are_not_enqueued() {
1394        let (glean, dir) = new_glean(None);
1395
1396        // Create a new upload manager so that we have access to its functions directly,
1397        // make it synchronous so we don't have to manually wait for the scanning to finish.
1398        let upload_manager = PingUploadManager::no_policy(dir.path());
1399
1400        let doc_id = Uuid::new_v4().to_string();
1401        let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
1402
1403        // Try to enqueue a ping with the same doc_id twice
1404        upload_manager.enqueue_ping(
1405            &glean,
1406            PingPayload {
1407                document_id: doc_id.clone(),
1408                upload_path: path.clone(),
1409                json_body: "".into(),
1410                headers: None,
1411                body_has_info_sections: true,
1412                ping_name: "test-ping".into(),
1413                uploader_capabilities: vec![],
1414            },
1415        );
1416        upload_manager.enqueue_ping(
1417            &glean,
1418            PingPayload {
1419                document_id: doc_id,
1420                upload_path: path,
1421                json_body: "".into(),
1422                headers: None,
1423                body_has_info_sections: true,
1424                ping_name: "test-ping".into(),
1425                uploader_capabilities: vec![],
1426            },
1427        );
1428
1429        // Get a task once
1430        let task = upload_manager.get_upload_task(&glean, false);
1431        assert!(task.is_upload());
1432
1433        // There should be no more queued tasks
1434        assert_eq!(
1435            upload_manager.get_upload_task(&glean, false),
1436            PingUploadTask::done()
1437        );
1438    }
1439
1440    #[test]
1441    fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
1442        let (mut glean, dir) = new_glean(None);
1443
1444        // Register a ping for testing
1445        let ping_type = PingType::new(
1446            "test",
1447            true,
1448            /* send_if_empty */ true,
1449            true,
1450            true,
1451            true,
1452            vec![],
1453            vec![],
1454            true,
1455            vec![],
1456        );
1457        glean.register_ping_type(&ping_type);
1458
1459        // Submit the ping multiple times
1460        let n = 5;
1461        for _ in 0..n {
1462            ping_type.submit_sync(&glean, None);
1463        }
1464
1465        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1466
1467        // Set a policy for max recoverable failures, this is usually disabled for tests.
1468        let max_recoverable_failures = 3;
1469        upload_manager
1470            .policy
1471            .set_max_recoverable_failures(Some(max_recoverable_failures));
1472
1473        // Return the max recoverable error failures in a row
1474        for _ in 0..max_recoverable_failures {
1475            match upload_manager.get_upload_task(&glean, false) {
1476                PingUploadTask::Upload { request } => {
1477                    upload_manager.process_ping_upload_response(
1478                        &glean,
1479                        &request.document_id,
1480                        UploadResult::recoverable_failure(),
1481                    );
1482                }
1483                _ => panic!("Expected upload manager to return the next request!"),
1484            }
1485        }
1486
1487        // Verify that after returning the max amount of recoverable failures,
1488        // we are done even though we haven't gotten all the enqueued requests.
1489        assert_eq!(
1490            upload_manager.get_upload_task(&glean, false),
1491            PingUploadTask::done()
1492        );
1493
1494        // Verify all requests are returned when we try again.
1495        for _ in 0..n {
1496            let task = upload_manager.get_upload_task(&glean, false);
1497            assert!(task.is_upload());
1498        }
1499    }
1500
1501    #[test]
1502    fn quota_is_enforced_when_enqueueing_cached_pings() {
1503        let (mut glean, dir) = new_glean(None);
1504
1505        // Register a ping for testing
1506        let ping_type = PingType::new(
1507            "test",
1508            true,
1509            /* send_if_empty */ true,
1510            true,
1511            true,
1512            true,
1513            vec![],
1514            vec![],
1515            true,
1516            vec![],
1517        );
1518        glean.register_ping_type(&ping_type);
1519
1520        // Submit the ping multiple times
1521        let n = 10;
1522        for _ in 0..n {
1523            ping_type.submit_sync(&glean, None);
1524        }
1525
1526        let directory_manager = PingDirectoryManager::new(dir.path());
1527        let pending_pings = directory_manager.process_dirs().pending_pings;
1528        // The pending pings array is sorted by date in ascending order,
1529        // the newest element is the last one.
1530        let (_, newest_ping) = &pending_pings.last().unwrap();
1531        let PingPayload {
1532            document_id: newest_ping_id,
1533            ..
1534        } = &newest_ping;
1535
1536        // Create a new upload manager pointing to the same data_path as the glean instance.
1537        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1538
1539        // Set the quota to just a little over the size on an empty ping file.
1540        // This way we can check that one ping is kept and all others are deleted.
1541        //
1542        // From manual testing I figured out an empty ping file is 324bytes,
1543        // I am setting this a little over just so that minor changes to the ping structure
1544        // don't immediatelly break this.
1545        upload_manager
1546            .policy
1547            .set_max_pending_pings_directory_size(Some(500));
1548
1549        // Get a task once
1550        // One ping should have been enqueued.
1551        // Make sure it is the newest ping.
1552        match upload_manager.get_upload_task(&glean, false) {
1553            PingUploadTask::Upload { request } => assert_eq!(&request.document_id, newest_ping_id),
1554            _ => panic!("Expected upload manager to return the next request!"),
1555        }
1556
1557        // Verify that no other requests were returned,
1558        // they should all have been deleted because pending pings quota was hit.
1559        assert_eq!(
1560            upload_manager.get_upload_task(&glean, false),
1561            PingUploadTask::done()
1562        );
1563
1564        // Verify that the correct number of deleted pings was recorded
1565        assert_eq!(
1566            n - 1,
1567            upload_manager
1568                .upload_metrics
1569                .deleted_pings_after_quota_hit
1570                .get_value(&glean, Some("metrics"))
1571                .unwrap()
1572        );
1573        assert_eq!(
1574            n,
1575            upload_manager
1576                .upload_metrics
1577                .pending_pings
1578                .get_value(&glean, Some("metrics"))
1579                .unwrap()
1580        );
1581    }
1582
1583    #[test]
1584    fn number_quota_is_enforced_when_enqueueing_cached_pings() {
1585        let (mut glean, dir) = new_glean(None);
1586
1587        // Register a ping for testing
1588        let ping_type = PingType::new(
1589            "test",
1590            true,
1591            /* send_if_empty */ true,
1592            true,
1593            true,
1594            true,
1595            vec![],
1596            vec![],
1597            true,
1598            vec![],
1599        );
1600        glean.register_ping_type(&ping_type);
1601
1602        // How many pings we allow at maximum
1603        let count_quota = 3;
1604        // The number of pings we fill the pending pings directory with.
1605        let n = 10;
1606
1607        // Submit the ping multiple times
1608        for _ in 0..n {
1609            ping_type.submit_sync(&glean, None);
1610        }
1611
1612        let directory_manager = PingDirectoryManager::new(dir.path());
1613        let pending_pings = directory_manager.process_dirs().pending_pings;
1614        // The pending pings array is sorted by date in ascending order,
1615        // the newest element is the last one.
1616        let expected_pings = pending_pings
1617            .iter()
1618            .rev()
1619            .take(count_quota)
1620            .map(|(_, ping)| ping.document_id.clone())
1621            .collect::<Vec<_>>();
1622
1623        // Create a new upload manager pointing to the same data_path as the glean instance.
1624        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1625
1626        upload_manager
1627            .policy
1628            .set_max_pending_pings_count(Some(count_quota as u64));
1629
1630        // Get a task once
1631        // One ping should have been enqueued.
1632        // Make sure it is the newest ping.
1633        for ping_id in expected_pings.iter().rev() {
1634            match upload_manager.get_upload_task(&glean, false) {
1635                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1636                _ => panic!("Expected upload manager to return the next request!"),
1637            }
1638        }
1639
1640        // Verify that no other requests were returned,
1641        // they should all have been deleted because pending pings quota was hit.
1642        assert_eq!(
1643            upload_manager.get_upload_task(&glean, false),
1644            PingUploadTask::done()
1645        );
1646
1647        // Verify that the correct number of deleted pings was recorded
1648        assert_eq!(
1649            (n - count_quota) as i32,
1650            upload_manager
1651                .upload_metrics
1652                .deleted_pings_after_quota_hit
1653                .get_value(&glean, Some("metrics"))
1654                .unwrap()
1655        );
1656        assert_eq!(
1657            n as i32,
1658            upload_manager
1659                .upload_metrics
1660                .pending_pings
1661                .get_value(&glean, Some("metrics"))
1662                .unwrap()
1663        );
1664    }
1665
1666    #[test]
1667    fn size_and_count_quota_work_together_size_first() {
1668        let (mut glean, dir) = new_glean(None);
1669
1670        // Register a ping for testing
1671        let ping_type = PingType::new(
1672            "test",
1673            true,
1674            /* send_if_empty */ true,
1675            true,
1676            true,
1677            true,
1678            vec![],
1679            vec![],
1680            true,
1681            vec![],
1682        );
1683        glean.register_ping_type(&ping_type);
1684
1685        let expected_number_of_pings = 3;
1686        // The number of pings we fill the pending pings directory with.
1687        let n = 10;
1688
1689        // Submit the ping multiple times
1690        for _ in 0..n {
1691            ping_type.submit_sync(&glean, None);
1692        }
1693
1694        let directory_manager = PingDirectoryManager::new(dir.path());
1695        let pending_pings = directory_manager.process_dirs().pending_pings;
1696        // The pending pings array is sorted by date in ascending order,
1697        // the newest element is the last one.
1698        let expected_pings = pending_pings
1699            .iter()
1700            .rev()
1701            .take(expected_number_of_pings)
1702            .map(|(_, ping)| ping.document_id.clone())
1703            .collect::<Vec<_>>();
1704
1705        // Create a new upload manager pointing to the same data_path as the glean instance.
1706        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1707
1708        // From manual testing we figured out a basically empty ping file is 399 bytes,
1709        // so this allows 3 pings with some headroom in case of future changes.
1710        upload_manager
1711            .policy
1712            .set_max_pending_pings_directory_size(Some(1300));
1713        upload_manager.policy.set_max_pending_pings_count(Some(5));
1714
1715        // Get a task once
1716        // One ping should have been enqueued.
1717        // Make sure it is the newest ping.
1718        for ping_id in expected_pings.iter().rev() {
1719            match upload_manager.get_upload_task(&glean, false) {
1720                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1721                _ => panic!("Expected upload manager to return the next request!"),
1722            }
1723        }
1724
1725        // Verify that no other requests were returned,
1726        // they should all have been deleted because pending pings quota was hit.
1727        assert_eq!(
1728            upload_manager.get_upload_task(&glean, false),
1729            PingUploadTask::done()
1730        );
1731
1732        // Verify that the correct number of deleted pings was recorded
1733        assert_eq!(
1734            (n - expected_number_of_pings) as i32,
1735            upload_manager
1736                .upload_metrics
1737                .deleted_pings_after_quota_hit
1738                .get_value(&glean, Some("metrics"))
1739                .unwrap()
1740        );
1741        assert_eq!(
1742            n as i32,
1743            upload_manager
1744                .upload_metrics
1745                .pending_pings
1746                .get_value(&glean, Some("metrics"))
1747                .unwrap()
1748        );
1749    }
1750
1751    #[test]
1752    fn size_and_count_quota_work_together_count_first() {
1753        let (mut glean, dir) = new_glean(None);
1754
1755        // Register a ping for testing
1756        let ping_type = PingType::new(
1757            "test",
1758            true,
1759            /* send_if_empty */ true,
1760            true,
1761            true,
1762            true,
1763            vec![],
1764            vec![],
1765            true,
1766            vec![],
1767        );
1768        glean.register_ping_type(&ping_type);
1769
1770        let expected_number_of_pings = 2;
1771        // The number of pings we fill the pending pings directory with.
1772        let n = 10;
1773
1774        // Submit the ping multiple times
1775        for _ in 0..n {
1776            ping_type.submit_sync(&glean, None);
1777        }
1778
1779        let directory_manager = PingDirectoryManager::new(dir.path());
1780        let pending_pings = directory_manager.process_dirs().pending_pings;
1781        // The pending pings array is sorted by date in ascending order,
1782        // the newest element is the last one.
1783        let expected_pings = pending_pings
1784            .iter()
1785            .rev()
1786            .take(expected_number_of_pings)
1787            .map(|(_, ping)| ping.document_id.clone())
1788            .collect::<Vec<_>>();
1789
1790        // Create a new upload manager pointing to the same data_path as the glean instance.
1791        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1792
1793        // From manual testing we figured out an empty ping file is 324bytes,
1794        // so this allows 3 pings.
1795        upload_manager
1796            .policy
1797            .set_max_pending_pings_directory_size(Some(1000));
1798        upload_manager.policy.set_max_pending_pings_count(Some(2));
1799
1800        // Get a task once
1801        // One ping should have been enqueued.
1802        // Make sure it is the newest ping.
1803        for ping_id in expected_pings.iter().rev() {
1804            match upload_manager.get_upload_task(&glean, false) {
1805                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1806                _ => panic!("Expected upload manager to return the next request!"),
1807            }
1808        }
1809
1810        // Verify that no other requests were returned,
1811        // they should all have been deleted because pending pings quota was hit.
1812        assert_eq!(
1813            upload_manager.get_upload_task(&glean, false),
1814            PingUploadTask::done()
1815        );
1816
1817        // Verify that the correct number of deleted pings was recorded
1818        assert_eq!(
1819            (n - expected_number_of_pings) as i32,
1820            upload_manager
1821                .upload_metrics
1822                .deleted_pings_after_quota_hit
1823                .get_value(&glean, Some("metrics"))
1824                .unwrap()
1825        );
1826        assert_eq!(
1827            n as i32,
1828            upload_manager
1829                .upload_metrics
1830                .pending_pings
1831                .get_value(&glean, Some("metrics"))
1832                .unwrap()
1833        );
1834    }
1835
1836    #[test]
1837    fn maximum_wait_attemps_is_enforced() {
1838        let (glean, dir) = new_glean(None);
1839
1840        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1841
1842        // Define a max_wait_attemps policy, this is disabled for tests by default.
1843        let max_wait_attempts = 3;
1844        upload_manager
1845            .policy
1846            .set_max_wait_attempts(Some(max_wait_attempts));
1847
1848        // Add a rate limiter to the upload mangager with max of 1 ping 5secs.
1849        //
1850        // We arbitrarily set the maximum pings per interval to a very low number,
1851        // when the rate limiter reaches it's limit get_upload_task returns a PingUploadTask::Wait,
1852        // which will allow us to test the limitations around returning too many of those in a row.
1853        let secs_per_interval = 5;
1854        let max_pings_per_interval = 1;
1855        upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
1856
1857        // Enqueue two pings
1858        upload_manager.enqueue_ping(
1859            &glean,
1860            PingPayload {
1861                document_id: Uuid::new_v4().to_string(),
1862                upload_path: PATH.into(),
1863                json_body: "".into(),
1864                headers: None,
1865                body_has_info_sections: true,
1866                ping_name: "ping-name".into(),
1867                uploader_capabilities: vec![],
1868            },
1869        );
1870        upload_manager.enqueue_ping(
1871            &glean,
1872            PingPayload {
1873                document_id: Uuid::new_v4().to_string(),
1874                upload_path: PATH.into(),
1875                json_body: "".into(),
1876                headers: None,
1877                body_has_info_sections: true,
1878                ping_name: "ping-name".into(),
1879                uploader_capabilities: vec![],
1880            },
1881        );
1882
1883        // Get the first ping, it should be returned normally.
1884        match upload_manager.get_upload_task(&glean, false) {
1885            PingUploadTask::Upload { .. } => {}
1886            _ => panic!("Expected upload manager to return the next request!"),
1887        }
1888
1889        // Try to get the next ping,
1890        // we should be throttled and thus get a PingUploadTask::Wait.
1891        // Check that we are indeed allowed to get this response as many times as expected.
1892        for _ in 0..max_wait_attempts {
1893            let task = upload_manager.get_upload_task(&glean, false);
1894            assert!(task.is_wait());
1895        }
1896
1897        // Check that after we get PingUploadTask::Wait the allowed number of times,
1898        // we then get PingUploadTask::Done.
1899        assert_eq!(
1900            upload_manager.get_upload_task(&glean, false),
1901            PingUploadTask::done()
1902        );
1903
1904        // Wait for the rate limiter to allow upload tasks again.
1905        thread::sleep(Duration::from_secs(secs_per_interval));
1906
1907        // Check that we are allowed again to get pings.
1908        let task = upload_manager.get_upload_task(&glean, false);
1909        assert!(task.is_upload());
1910
1911        // And once we are done we don't need to wait anymore.
1912        assert_eq!(
1913            upload_manager.get_upload_task(&glean, false),
1914            PingUploadTask::done()
1915        );
1916    }
1917
1918    #[test]
1919    fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
1920        let (glean, dir) = new_glean(None);
1921        let upload_manager = PingUploadManager::new(dir.path(), "test");
1922        match upload_manager.get_upload_task(&glean, false) {
1923            PingUploadTask::Wait { time } => {
1924                assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
1925            }
1926            _ => panic!("Expected upload manager to return a wait task!"),
1927        };
1928    }
1929
1930    #[test]
1931    fn cannot_enqueue_ping_while_its_being_processed() {
1932        let (glean, dir) = new_glean(None);
1933
1934        let upload_manager = PingUploadManager::no_policy(dir.path());
1935
1936        // Enqueue a ping and start processing it
1937        let identifier = &Uuid::new_v4();
1938        let ping = PingPayload {
1939            document_id: identifier.to_string(),
1940            upload_path: PATH.into(),
1941            json_body: "".into(),
1942            headers: None,
1943            body_has_info_sections: true,
1944            ping_name: "ping-name".into(),
1945            uploader_capabilities: vec![],
1946        };
1947        upload_manager.enqueue_ping(&glean, ping);
1948        assert!(upload_manager.get_upload_task(&glean, false).is_upload());
1949
1950        // Attempt to re-enqueue the same ping
1951        let ping = PingPayload {
1952            document_id: identifier.to_string(),
1953            upload_path: PATH.into(),
1954            json_body: "".into(),
1955            headers: None,
1956            body_has_info_sections: true,
1957            ping_name: "ping-name".into(),
1958            uploader_capabilities: vec![],
1959        };
1960        upload_manager.enqueue_ping(&glean, ping);
1961
1962        // No new pings should have been enqueued so the upload task is Done.
1963        assert_eq!(
1964            upload_manager.get_upload_task(&glean, false),
1965            PingUploadTask::done()
1966        );
1967
1968        // Process the upload response
1969        upload_manager.process_ping_upload_response(
1970            &glean,
1971            &identifier.to_string(),
1972            UploadResult::http_status(200),
1973        );
1974    }
1975}