use std::collections::VecDeque;
use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, RwLock, RwLockWriteGuard};
use std::thread;
use std::time::{Duration, Instant};
use crate::error::ErrorKind;
use crate::{internal_metrics::UploadMetrics, Glean};
use directory::{PingDirectoryManager, PingPayloadsByDirectory};
use policy::Policy;
pub use request::{HeaderMap, PingRequest};
pub use result::{ffi_upload_result, UploadResult};
mod directory;
mod policy;
mod request;
mod result;
/// Wait time handed out in a `PingUploadTask::Wait` task while the pending
/// pings directories have not been fully processed yet.
///
/// NOTE(review): presumably milliseconds, matching the millisecond values the
/// rate limiter produces for `Wait` tasks -- confirm with the consumers.
const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1000;
/// Counts how often it is queried inside a time window and reports when the
/// allowed maximum for that window has been reached.
#[derive(Debug)]
struct RateLimiter {
    /// Start of the current window; `None` until the limiter is first queried.
    started: Option<Instant>,
    /// Number of `Incrementing` states handed out in the current window.
    count: u32,
    /// Length of one window.
    interval: Duration,
    /// Maximum number of `Incrementing` states per window.
    max_count: u32,
}

/// Outcome of querying a `RateLimiter`.
#[derive(Debug, PartialEq)]
enum RateLimiterState {
    /// The query was counted against the current window.
    Incrementing,
    /// The window is exhausted; carries the milliseconds left until it ends.
    Throttled(u64),
}

impl RateLimiter {
    /// Creates a limiter allowing `max_count` queries per `interval`.
    ///
    /// The first window only starts on the first call to `get_state`.
    pub fn new(interval: Duration, max_count: u32) -> Self {
        Self {
            started: None,
            count: 0,
            interval,
            max_count,
        }
    }

    /// Starts a fresh window right now and clears the counter.
    fn reset(&mut self) {
        self.started = Some(Instant::now());
        self.count = 0;
    }

    /// Time spent in the current window; zero when no window was started yet.
    fn elapsed(&self) -> Duration {
        self.started
            .map_or(Duration::from_secs(0), |started| started.elapsed())
    }

    /// Whether a new window must be started: either none exists yet or the
    /// current one is older than `interval`.
    fn should_reset(&self) -> bool {
        self.started
            .map_or(true, |started| started.elapsed() > self.interval)
    }

    /// Counts one query against the current window.
    ///
    /// Returns `Incrementing` while the window still has capacity, otherwise
    /// `Throttled` with the remaining window time in milliseconds.
    pub fn get_state(&mut self) -> RateLimiterState {
        if self.should_reset() {
            self.reset();
        }

        if self.count == self.max_count {
            // The clock keeps running between the reset check above and this
            // computation, so the window may have expired in the meantime:
            // saturate at zero instead of underflowing.
            let remaining = self
                .interval
                .as_millis()
                .saturating_sub(self.elapsed().as_millis());
            // Fallback for a remaining time that does not fit into `u64`.
            // Saturating, because it is evaluated eagerly and a plain
            // multiplication would overflow for very large intervals.
            let fallback = self.interval.as_secs().saturating_mul(1000);
            return RateLimiterState::Throttled(remaining.try_into().unwrap_or(fallback));
        }

        self.count += 1;
        RateLimiterState::Incrementing
    }
}
/// A unit of work returned by the upload manager when asked what to do next.
#[derive(PartialEq, Debug)]
pub enum PingUploadTask {
    /// A ping request that is ready to be uploaded.
    Upload(PingRequest),
    /// Nothing can be handed out right now; carries the time to wait before
    /// asking again (the rate limiter produces this value in milliseconds).
    Wait(u64),
    /// There is nothing more to do for now.
    Done,
}

impl PingUploadTask {
    /// Returns `true` only for the `Upload` variant.
    pub fn is_upload(&self) -> bool {
        match self {
            Self::Upload(_) => true,
            Self::Wait(_) | Self::Done => false,
        }
    }

    /// Returns `true` only for the `Wait` variant.
    pub fn is_wait(&self) -> bool {
        match self {
            Self::Wait(_) => true,
            Self::Upload(_) | Self::Done => false,
        }
    }
}
/// Manages the queue of pings waiting to be uploaded and hands out upload
/// tasks one at a time.
#[derive(Debug)]
pub struct PingUploadManager {
// Requests waiting to be handed out; filled at the back, consumed from the front.
queue: RwLock<VecDeque<PingRequest>>,
// Reads, processes and deletes ping files on disk.
directory_manager: PingDirectoryManager,
// Set to `true` by the scanning thread once the pending pings directories
// have been processed.
processed_pending_pings: Arc<AtomicBool>,
// Pings found on disk by the scanning thread, drained into `queue` by
// `enqueue_cached_pings`.
cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
// Incremented on every recoverable upload failure; reset by `get_upload_task`
// whenever the returned task is not an upload.
recoverable_failure_count: AtomicU32,
// Incremented whenever a wait is about to be handed out; reset by
// `get_upload_task` whenever the returned task is not a wait.
wait_attempt_count: AtomicU32,
// Optional rate limiter; `None` until `set_rate_limiter` is called.
rate_limiter: Option<RwLock<RateLimiter>>,
// Passed to the `PingRequest` builder for every request.
language_binding_name: String,
// Metrics recorded about the upload process itself.
upload_metrics: UploadMetrics,
// Limits (failures, wait attempts, body size, directory size/count) applied
// by this manager.
policy: Policy,
}
impl PingUploadManager {
/// Creates a manager with an empty queue, no rate limiter and the default
/// policy.
///
/// The pending pings directories are not scanned here; call
/// `scan_pending_pings_directories` for that.
///
/// # Arguments
///
/// * `data_path` - handed to the `PingDirectoryManager`.
/// * `language_binding_name` - stored and passed to every built ping request.
pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
    let directory_manager = PingDirectoryManager::new(data_path);
    let processed_pending_pings = Arc::new(AtomicBool::new(false));
    let cached_pings = Arc::new(RwLock::new(PingPayloadsByDirectory::default()));

    Self {
        queue: RwLock::new(VecDeque::new()),
        directory_manager,
        processed_pending_pings,
        cached_pings,
        recoverable_failure_count: AtomicU32::new(0),
        wait_attempt_count: AtomicU32::new(0),
        rate_limiter: None,
        language_binding_name: language_binding_name.to_owned(),
        upload_metrics: UploadMetrics::new(),
        policy: Policy::default(),
    }
}
pub fn scan_pending_pings_directories(&self) -> std::thread::JoinHandle<()> {
let local_manager = self.directory_manager.clone();
let local_cached_pings = self.cached_pings.clone();
let local_flag = self.processed_pending_pings.clone();
thread::Builder::new()
.name("glean.ping_directory_manager.process_dir".to_string())
.spawn(move || {
let mut local_cached_pings = local_cached_pings
.write()
.expect("Can't write to pending pings cache.");
local_cached_pings.extend(local_manager.process_dirs());
local_flag.store(true, Ordering::SeqCst);
})
.expect("Unable to spawn thread to process pings directories.")
}
/// Test-only constructor: builds a manager with every policy limit unset
/// and waits until the pending pings directories have been scanned.
#[cfg(test)]
pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
    let mut manager = Self::new(data_path, "Test");

    {
        // Lift every limit the policy knows about.
        let policy = &mut manager.policy;
        policy.set_max_recoverable_failures(None);
        policy.set_max_wait_attempts(None);
        policy.set_max_ping_body_size(None);
        policy.set_max_pending_pings_directory_size(None);
        policy.set_max_pending_pings_count(None);
    }

    // Block until the scanning thread is done so tests see a settled state.
    let scan_handle = manager.scan_pending_pings_directories();
    scan_handle.join().unwrap();

    manager
}
fn processed_pending_pings(&self) -> bool {
self.processed_pending_pings.load(Ordering::SeqCst)
}
fn recoverable_failure_count(&self) -> u32 {
self.recoverable_failure_count.load(Ordering::SeqCst)
}
fn wait_attempt_count(&self) -> u32 {
self.wait_attempt_count.load(Ordering::SeqCst)
}
fn build_ping_request(
&self,
glean: &Glean,
document_id: &str,
path: &str,
body: &str,
headers: Option<HeaderMap>,
) -> Option<PingRequest> {
let mut request = PingRequest::builder(
&self.language_binding_name,
self.policy.max_ping_body_size(),
)
.document_id(document_id)
.path(path)
.body(body);
if let Some(headers) = headers {
request = request.headers(headers);
}
match request.build() {
Ok(request) => Some(request),
Err(e) => {
log::warn!("Error trying to build ping request: {}", e);
self.directory_manager.delete_file(&document_id);
if let ErrorKind::PingBodyOverflow(s) = e.kind() {
self.upload_metrics
.discarded_exceeding_pings_size
.accumulate(glean, *s as u64 / 1024);
}
None
}
}
}
/// Appends a ping to the back of the queue.
///
/// A ping whose `document_id` is already queued is skipped with a warning.
/// A ping whose request cannot be built is not queued either.
///
/// # Panics
///
/// Panics if the queue lock cannot be acquired.
fn enqueue_ping(
    &self,
    glean: &Glean,
    document_id: &str,
    path: &str,
    body: &str,
    headers: Option<HeaderMap>,
) {
    // The queue stays locked while checking for duplicates and building.
    let mut queue = self
        .queue
        .write()
        .expect("Can't write to pending pings queue.");

    let already_queued = queue
        .iter()
        .any(|queued| queued.document_id == document_id);
    if already_queued {
        log::warn!(
            "Attempted to enqueue a duplicate ping {} at {}.",
            document_id,
            path
        );
        return;
    }

    log::trace!("Enqueuing ping {} at {}", document_id, path);
    // `extend` with an `Option` pushes the request to the back when present.
    queue.extend(self.build_ping_request(glean, document_id, path, body, headers));
}
/// Moves the pings found on disk from the cache into the upload queue,
/// enforcing the pending pings quotas (directory size and ping count) first.
///
/// Does nothing when the cache is empty. Deletion-request pings are queued
/// before regular pending pings.
///
/// # Panics
///
/// Panics if the cache lock cannot be acquired.
fn enqueue_cached_pings(&self, glean: &Glean) {
let mut cached_pings = self
.cached_pings
.write()
.expect("Can't write to pending pings cache.");
if cached_pings.len() > 0 {
// Running totals over the pings visited by `retain` below.
let mut pending_pings_directory_size: u64 = 0;
let mut pending_pings_count = 0;
// Once set, every following ping is a deletion candidate.
let mut deleting = false;
let total = cached_pings.pending_pings.len() as u64;
// Record how many pending pings were found; 0 if the count does not fit.
self.upload_metrics
.pending_pings
.add(glean, total.try_into().unwrap_or(0));
if total > self.policy.max_pending_pings_count() {
log::warn!(
"More than {} pending pings in the directory, will delete {} old pings.",
self.policy.max_pending_pings_count(),
total - self.policy.max_pending_pings_count()
);
}
// Visit the list back to front so entries at the end are counted (and
// kept) first. NOTE(review): the tests treat the last entry as the newest
// ping -- confirm the ordering produced by `process_dirs`.
cached_pings.pending_pings.reverse();
cached_pings.pending_pings.retain(|(file_size, (document_id, _, _, _))| {
pending_pings_count += 1;
pending_pings_directory_size += file_size;
// Size quota: warn only the first time deletion mode is entered this way.
if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
log::warn!(
"Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
self.policy.max_pending_pings_directory_size()
);
deleting = true;
}
// Count quota.
if pending_pings_count > self.policy.max_pending_pings_count() {
deleting = true;
}
// A ping is only dropped from the list if its file was actually deleted.
if deleting && self.directory_manager.delete_file(&document_id) {
self.upload_metrics
.deleted_pings_after_quota_hit
.add(glean, 1);
return false;
}
true
});
// Restore the original order of the surviving pings.
cached_pings.pending_pings.reverse();
// The accumulated size covers every visited ping, deleted ones included.
self.upload_metrics
.pending_pings_directory_size
.accumulate(glean, pending_pings_directory_size as u64 / 1024);
// Deletion-request pings go into the queue first.
let deletion_request_pings = cached_pings.deletion_request_pings.drain(..);
for (_, (document_id, path, body, headers)) in deletion_request_pings {
self.enqueue_ping(glean, &document_id, &path, &body, headers);
}
let pending_pings = cached_pings.pending_pings.drain(..);
for (_, (document_id, path, body, headers)) in pending_pings {
self.enqueue_ping(glean, &document_id, &path, &body, headers);
}
}
}
/// Installs a rate limiter, replacing any previous one.
///
/// # Arguments
///
/// * `interval` - length of one rate limiting window, in seconds.
/// * `max_tasks` - maximum count handed to the limiter for one window.
pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
    let window = Duration::from_secs(interval);
    let limiter = RateLimiter::new(window, max_tasks);
    self.rate_limiter = Some(RwLock::new(limiter));
}
/// Loads the ping stored for `document_id` through the directory manager
/// and enqueues it; does nothing when the file yields no ping.
pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
    let (doc_id, path, body, headers) = match self.directory_manager.process_file(document_id) {
        Some(ping) => ping,
        None => return,
    };
    self.enqueue_ping(glean, &doc_id, &path, &body, headers);
}
/// Removes every ping from the queue except deletion-request pings.
///
/// Returns the still-held write guard of the queue, so the queue stays
/// locked until the caller drops it.
///
/// # Panics
///
/// Panics if the queue lock cannot be acquired.
pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
    log::trace!("Clearing ping queue");

    let mut guard = self
        .queue
        .write()
        .expect("Can't write to pending pings queue.");
    guard.retain(|request| request.is_deletion_request());

    let remaining = guard.len();
    log::trace!(
        "{} pings left in the queue (only deletion-request expected)",
        remaining
    );

    guard
}
/// Computes the next upload task without touching the window bookkeeping
/// done by `get_upload_task`.
///
/// * Returns `Wait` (or `Done` once the policy's maximum number of wait
///   attempts is exceeded) while the pending pings directories are still
///   being processed or while the rate limiter throttles.
/// * Returns `Done` when the maximum of recoverable failures was reached or
///   the queue is empty.
/// * Otherwise pops the front request and returns it as `Upload`; when
///   `log_ping` is set the ping payload is logged as well.
///
/// # Panics
///
/// Panics if the queue or rate limiter lock cannot be acquired.
fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
    let wait_or_done = |time: u64| {
        // Use the value returned by `fetch_add` instead of a second load:
        // increment and read then form one atomic step, so concurrent callers
        // cannot observe each other's increments. `wrapping_add` mirrors the
        // wrapping behaviour of `fetch_add` itself.
        let attempts = self
            .wait_attempt_count
            .fetch_add(1, Ordering::SeqCst)
            .wrapping_add(1);
        if attempts > self.policy.max_wait_attempts() {
            PingUploadTask::Done
        } else {
            PingUploadTask::Wait(time)
        }
    };

    if !self.processed_pending_pings() {
        log::info!(
            "Tried getting an upload task, but processing is ongoing. Will come back later."
        );
        return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
    }

    // Move whatever the directory scan found into the queue.
    self.enqueue_cached_pings(glean);

    if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
        log::warn!(
            "Reached maximum recoverable failures for the current uploading window. You are done."
        );
        return PingUploadTask::Done;
    }

    let mut queue = self
        .queue
        .write()
        .expect("Can't write to pending pings queue.");

    match queue.front() {
        Some(request) => {
            // The limiter is only consulted when there is something to send.
            if let Some(rate_limiter) = &self.rate_limiter {
                let mut rate_limiter = rate_limiter
                    .write()
                    .expect("Can't write to the rate limiter.");
                if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
                    log::info!(
                        "Tried getting an upload task, but we are throttled at the moment."
                    );
                    return wait_or_done(remaining);
                }
            }

            log::info!(
                "New upload task with id {} (path: {})",
                request.document_id,
                request.path
            );

            if log_ping {
                if let Some(body) = request.pretty_body() {
                    chunked_log_info(&request.path, &body);
                } else {
                    chunked_log_info(&request.path, "<invalid ping payload>");
                }
            }

            // `front()` returned `Some`, so the queue cannot be empty here.
            PingUploadTask::Upload(queue.pop_front().unwrap())
        }
        None => {
            log::info!("No more pings to upload! You are done.");
            PingUploadTask::Done
        }
    }
}
/// Returns the next task for the uploader and maintains the counters of the
/// current uploading window.
///
/// The wait attempt counter is reset whenever the task is not a `Wait`, and
/// the recoverable failure counter whenever the task is not an `Upload`.
///
/// # Arguments
///
/// * `log_ping` - whether to log the payload of a ping handed out for upload.
pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
    let task = self.get_upload_task_internal(glean, log_ping);

    let waits_recorded = self.wait_attempt_count() != 0;
    if waits_recorded && !task.is_wait() {
        self.wait_attempt_count.store(0, Ordering::SeqCst);
    }

    let failures_recorded = self.recoverable_failure_count() != 0;
    if failures_recorded && !task.is_upload() {
        self.recoverable_failure_count.store(0, Ordering::SeqCst);
    }

    task
}
/// Handles the outcome of an upload attempt for the ping `document_id`.
///
/// * HTTP 2xx: the ping file is deleted.
/// * Unrecoverable failure or HTTP 4xx: the ping file is deleted.
/// * Recoverable failure or any other HTTP status: the ping is re-enqueued
///   from its file and the recoverable failure counter is incremented.
///
/// If the result carries a label, the matching `ping_upload_failure` metric
/// is incremented first.
pub fn process_ping_upload_response(
    &self,
    glean: &Glean,
    document_id: &str,
    status: UploadResult,
) {
    use UploadResult::*;

    if let Some(label) = status.get_label() {
        self.upload_metrics
            .ping_upload_failure
            .get(label)
            .add(glean, 1);
    }

    match status {
        HttpStatus(code @ 200..=299) => {
            log::info!("Ping {} successfully sent {}.", document_id, code);
            self.directory_manager.delete_file(document_id);
        }
        UnrecoverableFailure | HttpStatus(400..=499) => {
            log::warn!(
                "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
                document_id,
                status
            );
            self.directory_manager.delete_file(document_id);
        }
        RecoverableFailure | HttpStatus(_) => {
            log::warn!(
                "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
                document_id,
                status
            );
            // Put the ping back first, then count the failure.
            self.enqueue_ping_from_file(glean, document_id);
            self.recoverable_failure_count
                .fetch_add(1, Ordering::SeqCst);
        }
    };
}
}
/// Logs a ping payload at info level, split into several messages when it is
/// too large for a single one (Android only).
///
/// When `path` and `payload` together fit into 4000 bytes, a single message
/// is logged. Otherwise the payload is cut into pieces of at most 4000 bytes,
/// always on a UTF-8 character boundary, each logged as
/// `[Part i of total]`.
///
/// # Arguments
///
/// * `path` - the URL path of the ping, repeated in every message.
/// * `payload` - the text to log.
#[cfg(target_os = "android")]
pub fn chunked_log_info(path: &str, payload: &str) {
    const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;

    if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
        log::info!("Glean ping to URL: {}\n{}", path, payload);
        return;
    }

    // Slice first and count afterwards: backing off to a character boundary
    // shortens chunks, so the number of parts cannot be derived from the
    // payload length alone.
    let mut chunks: Vec<&str> =
        Vec::with_capacity(payload.len() / MAX_LOG_PAYLOAD_SIZE_BYTES + 1);
    let mut rest = payload;
    while rest.len() > MAX_LOG_PAYLOAD_SIZE_BYTES {
        let mut end = MAX_LOG_PAYLOAD_SIZE_BYTES;
        // Never split inside a multi-byte character. Index 0 is always a
        // boundary, so this terminates (after at most 3 steps for UTF-8).
        while !rest.is_char_boundary(end) {
            end -= 1;
        }
        let (chunk, tail) = rest.split_at(end);
        chunks.push(chunk);
        rest = tail;
    }
    if !rest.is_empty() {
        chunks.push(rest);
    }

    let total_chunks = chunks.len();
    for (idx, chunk) in chunks.iter().enumerate() {
        log::info!(
            "Glean ping to URL: {} [Part {} of {}]\n{}",
            path,
            idx + 1,
            total_chunks,
            chunk
        );
    }
}
/// Logs the ping payload at info level in a single message (every platform
/// except Android). The path is not part of the message.
#[cfg(not(target_os = "android"))]
pub fn chunked_log_info(_path: &str, payload: &str) {
    log::info!("{}", payload);
}
#[cfg(test)]
mod test {
use std::thread;
use std::time::Duration;
use uuid::Uuid;
use super::UploadResult::*;
use super::*;
use crate::metrics::PingType;
use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
/// With nothing submitted, the very first task must be `Done`.
#[test]
fn doesnt_error_when_there_are_no_pending_pings() {
    let (glean, _) = new_glean(None);

    let task = glean.get_upload_task();
    assert_eq!(task, PingUploadTask::Done);
}
/// A single enqueued ping is handed out as an upload task.
#[test]
fn returns_ping_request_when_there_is_one() {
    let (glean, dir) = new_glean(None);
    let upload_manager = PingUploadManager::no_policy(dir.path());

    let document_id = Uuid::new_v4().to_string();
    upload_manager.enqueue_ping(&glean, &document_id, PATH, "", None);

    assert!(upload_manager.get_upload_task(&glean, false).is_upload());
}
/// Every enqueued ping yields exactly one upload task, followed by `Done`.
#[test]
fn returns_as_many_ping_requests_as_there_are() {
    let (glean, dir) = new_glean(None);
    let upload_manager = PingUploadManager::no_policy(dir.path());

    let ping_count = 10;

    // Fill the queue.
    (0..ping_count).for_each(|_| {
        let document_id = Uuid::new_v4().to_string();
        upload_manager.enqueue_ping(&glean, &document_id, PATH, "", None);
    });

    // Drain it again, one upload task per ping.
    (0..ping_count).for_each(|_| {
        assert!(upload_manager.get_upload_task(&glean, false).is_upload());
    });

    let last_task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(last_task, PingUploadTask::Done);
}
/// Once the rate limit for an interval is used up, the manager asks the
/// caller to wait; after waiting, uploads continue.
#[test]
fn limits_the_number_of_pings_when_there_is_rate_limiting() {
    let (glean, dir) = new_glean(None);
    let mut upload_manager = PingUploadManager::no_policy(dir.path());

    // Allow at most `max_pings_per_interval` pings every `secs_per_interval`
    // seconds. The limiter is configured from the same values the loops use,
    // so they cannot drift apart.
    let secs_per_interval = 3;
    let max_pings_per_interval = 10;
    upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);

    // Enqueue and consume exactly as many pings as one interval allows.
    for _ in 0..max_pings_per_interval {
        upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None);
    }
    for _ in 0..max_pings_per_interval {
        let task = upload_manager.get_upload_task(&glean, false);
        assert!(task.is_upload());
    }

    // One more ping exceeds the limit: a wait task is expected.
    upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None);
    match upload_manager.get_upload_task(&glean, false) {
        PingUploadTask::Wait(time) => {
            // Wait for the uploading window to reset.
            thread::sleep(Duration::from_millis(time));
        }
        _ => panic!("Expected upload manager to return a wait task!"),
    };

    let task = upload_manager.get_upload_task(&glean, false);
    assert!(task.is_upload());
}
/// Clearing the queue removes all regular pings.
#[test]
fn clearing_the_queue_works_correctly() {
    let (glean, dir) = new_glean(None);
    let upload_manager = PingUploadManager::no_policy(dir.path());

    (0..10).for_each(|_| {
        let document_id = Uuid::new_v4().to_string();
        upload_manager.enqueue_ping(&glean, &document_id, PATH, "", None);
    });

    // Release the returned guard right away so the queue is unlocked again.
    let guard = upload_manager.clear_ping_queue();
    drop(guard);

    let task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(task, PingUploadTask::Done);
}
/// Clearing the queue keeps deletion-request pings in place.
#[test]
fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
    let (mut glean, _) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);

    // Submit a batch of regular pings ...
    let ping_count = 10;
    for _ in 0..ping_count {
        glean.submit_ping(&ping_type, None).unwrap();
    }

    // ... followed by one deletion-request ping.
    glean
        .internal_pings
        .deletion_request
        .submit(&glean, None)
        .unwrap();

    drop(glean.upload_manager.clear_ping_queue());

    // Only the deletion-request ping must be left.
    let request = match glean.get_upload_task() {
        PingUploadTask::Upload(request) => request,
        _ => panic!("Expected upload manager to return the next request!"),
    };
    assert!(request.is_deletion_request());

    assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
}
/// A fresh manager picks up all pings already stored on disk.
#[test]
fn fills_up_queue_successfully_from_disk() {
    let (mut glean, dir) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);

    let ping_count = 10;
    (0..ping_count).for_each(|_| {
        glean.submit_ping(&ping_type, None).unwrap();
    });

    // Build a manager over the same data directory; it scans the directory.
    let upload_manager = PingUploadManager::no_policy(dir.path());

    (0..ping_count).for_each(|_| {
        assert!(upload_manager.get_upload_task(&glean, false).is_upload());
    });

    let last_task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(last_task, PingUploadTask::Done);
}
/// A 200 response removes the ping file and empties the queue.
#[test]
fn processes_correctly_success_upload_response() {
    let (mut glean, dir) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);
    glean.submit_ping(&ping_type, None).unwrap();

    let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);

    let document_id = match glean.get_upload_task() {
        PingUploadTask::Upload(request) => request.document_id,
        _ => panic!("Expected upload manager to return the next request!"),
    };

    glean.process_ping_upload_response(&document_id, HttpStatus(200));

    let ping_file = pending_pings_dir.join(document_id);
    assert!(!ping_file.exists());

    assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
}
/// A 4xx response is final: the ping file is removed and nothing is retried.
#[test]
fn processes_correctly_client_error_upload_response() {
    let (mut glean, dir) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);
    glean.submit_ping(&ping_type, None).unwrap();

    let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);

    let document_id = match glean.get_upload_task() {
        PingUploadTask::Upload(request) => request.document_id,
        _ => panic!("Expected upload manager to return the next request!"),
    };

    glean.process_ping_upload_response(&document_id, HttpStatus(404));

    let ping_file = pending_pings_dir.join(document_id);
    assert!(!ping_file.exists());

    assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
}
/// A 5xx response is recoverable: the same ping is handed out again.
#[test]
fn processes_correctly_server_error_upload_response() {
    let (mut glean, _) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);
    glean.submit_ping(&ping_type, None).unwrap();

    let document_id = match glean.get_upload_task() {
        PingUploadTask::Upload(request) => request.document_id,
        _ => panic!("Expected upload manager to return the next request!"),
    };

    glean.process_ping_upload_response(&document_id, HttpStatus(500));

    // The failed ping must come back as the next task.
    let retried_id = match glean.get_upload_task() {
        PingUploadTask::Upload(request) => request.document_id,
        _ => panic!("Expected upload manager to return the next request!"),
    };
    assert_eq!(document_id, retried_id);

    assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
}
/// An unrecoverable failure removes the ping file and nothing is retried.
#[test]
fn processes_correctly_unrecoverable_upload_response() {
    let (mut glean, dir) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);
    glean.submit_ping(&ping_type, None).unwrap();

    let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);

    let document_id = match glean.get_upload_task() {
        PingUploadTask::Upload(request) => request.document_id,
        _ => panic!("Expected upload manager to return the next request!"),
    };

    glean.process_ping_upload_response(&document_id, UnrecoverableFailure);

    let ping_file = pending_pings_dir.join(document_id);
    assert!(!ping_file.exists());

    assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
}
/// A ping enqueued while another upload is in flight is handed out next.
#[test]
fn new_pings_are_added_while_upload_in_progress() {
    let (glean, dir) = new_glean(None);
    let upload_manager = PingUploadManager::no_policy(dir.path());

    let doc1 = Uuid::new_v4().to_string();
    let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);

    let doc2 = Uuid::new_v4().to_string();
    let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);

    // Fetches the next task and insists on it being an upload.
    let next_request = || match upload_manager.get_upload_task(&glean, false) {
        PingUploadTask::Upload(req) => req,
        _ => panic!("Expected upload manager to return the next request!"),
    };

    // Enqueue the first ping and start "uploading" it.
    upload_manager.enqueue_ping(&glean, &doc1, &path1, "", None);
    let first = next_request();
    assert_eq!(doc1, first.document_id);

    // The second ping arrives before the first upload has been acknowledged.
    upload_manager.enqueue_ping(&glean, &doc2, &path2, "", None);
    upload_manager.process_ping_upload_response(&glean, &first.document_id, HttpStatus(200));

    let second = next_request();
    assert_eq!(doc2, second.document_id);
    upload_manager.process_ping_upload_response(&glean, &second.document_id, HttpStatus(200));

    let last_task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(last_task, PingUploadTask::Done);
}
/// With a debug view tag set, requests carry it in the `X-Debug-ID` header.
#[test]
fn adds_debug_view_header_to_requests_when_tag_is_set() {
    let (mut glean, _) = new_glean(None);
    glean.set_debug_view_tag("valid-tag");

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);
    glean.submit_ping(&ping_type, None).unwrap();

    let request = match glean.get_upload_task() {
        PingUploadTask::Upload(request) => request,
        _ => panic!("Expected upload manager to return the next request!"),
    };

    assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag");
}
/// Enqueuing the same document id twice yields a single upload task.
#[test]
fn duplicates_are_not_enqueued() {
    let (glean, dir) = new_glean(None);
    let upload_manager = PingUploadManager::no_policy(dir.path());

    let doc_id = Uuid::new_v4().to_string();
    let path = format!("/submit/app_id/test-ping/1/{}", doc_id);

    // Two attempts with the very same ping.
    for _ in 0..2 {
        upload_manager.enqueue_ping(&glean, &doc_id, &path, "", None);
    }

    assert!(upload_manager.get_upload_task(&glean, false).is_upload());

    let last_task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(last_task, PingUploadTask::Done);
}
/// After the maximum of recoverable failures the manager reports `Done`
/// once; the next window then hands out all pings again.
#[test]
fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
    let (mut glean, dir) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);

    let ping_count = 5;
    for _ in 0..ping_count {
        glean.submit_ping(&ping_type, None).unwrap();
    }

    let mut upload_manager = PingUploadManager::no_policy(dir.path());

    let max_recoverable_failures = 3;
    upload_manager
        .policy
        .set_max_recoverable_failures(Some(max_recoverable_failures));

    // Fail recoverably until the limit is hit.
    for _ in 0..max_recoverable_failures {
        let req = match upload_manager.get_upload_task(&glean, false) {
            PingUploadTask::Upload(req) => req,
            _ => panic!("Expected upload manager to return the next request!"),
        };
        upload_manager.process_ping_upload_response(&glean, &req.document_id, RecoverableFailure);
    }

    // The limit ends the current uploading window ...
    let task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(task, PingUploadTask::Done);

    // ... and the following window serves every ping again.
    for _ in 0..ping_count {
        assert!(upload_manager.get_upload_task(&glean, false).is_upload());
    }
}
/// With a directory size quota of 500 bytes only the newest ping survives.
#[test]
fn quota_is_enforced_when_enqueueing_cached_pings() {
    let (mut glean, dir) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);

    let n = 10;
    for _ in 0..n {
        glean.submit_ping(&ping_type, None).unwrap();
    }

    // Look at the directory independently to learn which ping is the newest.
    let directory_manager = PingDirectoryManager::new(dir.path());
    let pending_pings = directory_manager.process_dirs().pending_pings;
    let (_, (newest_ping_id, _, _, _)) = pending_pings.last().unwrap();

    let mut upload_manager = PingUploadManager::no_policy(dir.path());
    upload_manager
        .policy
        .set_max_pending_pings_directory_size(Some(500));

    // Only the newest ping may be handed out.
    let request = match upload_manager.get_upload_task(&glean, false) {
        PingUploadTask::Upload(request) => request,
        _ => panic!("Expected upload manager to return the next request!"),
    };
    assert_eq!(&request.document_id, newest_ping_id);

    let last_task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(last_task, PingUploadTask::Done);

    // All other pings were deleted, and all of them were counted as pending.
    let deleted = upload_manager
        .upload_metrics
        .deleted_pings_after_quota_hit
        .test_get_value(&glean, "metrics")
        .unwrap();
    assert_eq!(n - 1, deleted);

    let pending = upload_manager
        .upload_metrics
        .pending_pings
        .test_get_value(&glean, "metrics")
        .unwrap();
    assert_eq!(n as i32, pending);
}
/// With a count quota only the newest `count_quota` pings survive.
#[test]
fn number_quota_is_enforced_when_enqueueing_cached_pings() {
    let (mut glean, dir) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);

    let count_quota = 3;
    let n = 10;
    for _ in 0..n {
        glean.submit_ping(&ping_type, None).unwrap();
    }

    // Take the last `count_quota` pings of the directory listing, then put
    // them back into listing order: that is the order they are handed out in.
    let directory_manager = PingDirectoryManager::new(dir.path());
    let pending_pings = directory_manager.process_dirs().pending_pings;
    let mut expected_pings = pending_pings
        .iter()
        .rev()
        .take(count_quota)
        .map(|(_, (document_id, _, _, _))| document_id.clone())
        .collect::<Vec<_>>();
    expected_pings.reverse();

    let mut upload_manager = PingUploadManager::no_policy(dir.path());
    upload_manager
        .policy
        .set_max_pending_pings_count(Some(count_quota as u64));

    for ping_id in &expected_pings {
        let request = match upload_manager.get_upload_task(&glean, false) {
            PingUploadTask::Upload(request) => request,
            _ => panic!("Expected upload manager to return the next request!"),
        };
        assert_eq!(&request.document_id, ping_id);
    }

    let last_task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(last_task, PingUploadTask::Done);

    let deleted = upload_manager
        .upload_metrics
        .deleted_pings_after_quota_hit
        .test_get_value(&glean, "metrics")
        .unwrap();
    assert_eq!((n - count_quota) as i32, deleted);

    let pending = upload_manager
        .upload_metrics
        .pending_pings
        .test_get_value(&glean, "metrics")
        .unwrap();
    assert_eq!(n as i32, pending);
}
/// Size and count quota combined, with the size quota being hit first.
#[test]
fn size_and_count_quota_work_together_size_first() {
    let (mut glean, dir) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);

    let expected_number_of_pings = 3;
    let n = 10;
    for _ in 0..n {
        glean.submit_ping(&ping_type, None).unwrap();
    }

    // The last pings of the directory listing, back in listing order.
    let directory_manager = PingDirectoryManager::new(dir.path());
    let pending_pings = directory_manager.process_dirs().pending_pings;
    let mut expected_pings = pending_pings
        .iter()
        .rev()
        .take(expected_number_of_pings)
        .map(|(_, (document_id, _, _, _))| document_id.clone())
        .collect::<Vec<_>>();
    expected_pings.reverse();

    let mut upload_manager = PingUploadManager::no_policy(dir.path());
    upload_manager
        .policy
        .set_max_pending_pings_directory_size(Some(1000));
    upload_manager.policy.set_max_pending_pings_count(Some(5));

    for ping_id in &expected_pings {
        let request = match upload_manager.get_upload_task(&glean, false) {
            PingUploadTask::Upload(request) => request,
            _ => panic!("Expected upload manager to return the next request!"),
        };
        assert_eq!(&request.document_id, ping_id);
    }

    let last_task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(last_task, PingUploadTask::Done);

    let deleted = upload_manager
        .upload_metrics
        .deleted_pings_after_quota_hit
        .test_get_value(&glean, "metrics")
        .unwrap();
    assert_eq!((n - expected_number_of_pings) as i32, deleted);

    let pending = upload_manager
        .upload_metrics
        .pending_pings
        .test_get_value(&glean, "metrics")
        .unwrap();
    assert_eq!(n as i32, pending);
}
/// Size and count quota combined, with the count quota being hit first.
#[test]
fn size_and_count_quota_work_together_count_first() {
    let (mut glean, dir) = new_glean(None);

    let ping_type = PingType::new("test", true, true, vec![]);
    glean.register_ping_type(&ping_type);

    let expected_number_of_pings = 2;
    let n = 10;
    for _ in 0..n {
        glean.submit_ping(&ping_type, None).unwrap();
    }

    // The last pings of the directory listing, back in listing order.
    let directory_manager = PingDirectoryManager::new(dir.path());
    let pending_pings = directory_manager.process_dirs().pending_pings;
    let mut expected_pings = pending_pings
        .iter()
        .rev()
        .take(expected_number_of_pings)
        .map(|(_, (document_id, _, _, _))| document_id.clone())
        .collect::<Vec<_>>();
    expected_pings.reverse();

    let mut upload_manager = PingUploadManager::no_policy(dir.path());
    upload_manager
        .policy
        .set_max_pending_pings_directory_size(Some(1000));
    upload_manager.policy.set_max_pending_pings_count(Some(2));

    for ping_id in &expected_pings {
        let request = match upload_manager.get_upload_task(&glean, false) {
            PingUploadTask::Upload(request) => request,
            _ => panic!("Expected upload manager to return the next request!"),
        };
        assert_eq!(&request.document_id, ping_id);
    }

    let last_task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(last_task, PingUploadTask::Done);

    let deleted = upload_manager
        .upload_metrics
        .deleted_pings_after_quota_hit
        .test_get_value(&glean, "metrics")
        .unwrap();
    assert_eq!((n - expected_number_of_pings) as i32, deleted);

    let pending = upload_manager
        .upload_metrics
        .pending_pings
        .test_get_value(&glean, "metrics")
        .unwrap();
    assert_eq!(n as i32, pending);
}
/// After the maximum number of wait tasks the manager reports `Done`;
/// once the rate limiting interval has passed, uploads continue.
#[test]
fn maximum_wait_attemps_is_enforced() {
    let (glean, dir) = new_glean(None);
    let mut upload_manager = PingUploadManager::no_policy(dir.path());

    let max_wait_attempts = 3;
    upload_manager
        .policy
        .set_max_wait_attempts(Some(max_wait_attempts));

    // One ping per five seconds keeps the second ping throttled.
    let secs_per_interval = 5;
    let max_pings_per_interval = 1;
    upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);

    for _ in 0..2 {
        let document_id = Uuid::new_v4().to_string();
        upload_manager.enqueue_ping(&glean, &document_id, PATH, "", None);
    }

    // The first ping goes through ...
    if let PingUploadTask::Upload(_) = upload_manager.get_upload_task(&glean, false) {
    } else {
        panic!("Expected upload manager to return the next request!");
    }

    // ... then the limiter answers with waits, up to the allowed maximum.
    for _ in 0..max_wait_attempts {
        assert!(upload_manager.get_upload_task(&glean, false).is_wait());
    }

    // One attempt more and the manager gives up for this window.
    let task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(task, PingUploadTask::Done);

    // Let the rate limiting interval pass.
    thread::sleep(Duration::from_secs(secs_per_interval));

    assert!(upload_manager.get_upload_task(&glean, false).is_upload());

    let last_task = upload_manager.get_upload_task(&glean, false);
    assert_eq!(last_task, PingUploadTask::Done);
}
/// A manager whose directories were never scanned answers with a wait task
/// carrying `WAIT_TIME_FOR_PING_PROCESSING`.
#[test]
fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
    let (glean, dir) = new_glean(None);
    // `new` does not scan the directories, unlike `no_policy`.
    let upload_manager = PingUploadManager::new(dir.path(), "test");

    let task = upload_manager.get_upload_task(&glean, false);
    if let PingUploadTask::Wait(time) = task {
        assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
    } else {
        panic!("Expected upload manager to return a wait task!");
    }
}
}