use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock, RwLockWriteGuard};
use std::thread;
use std::time::{Duration, Instant};
use directory::PingDirectoryManager;
pub use request::{HeaderMap, PingRequest};
pub use result::{ffi_upload_result, UploadResult};
mod directory;
mod request;
mod result;
/// Counts how many requests were let through during the current time window
/// and refuses further ones once the window's budget is used up.
#[derive(Debug)]
struct RateLimiter {
    /// When the current window began; `None` until `get_state` is first called.
    started: Option<Instant>,
    /// Requests counted so far in the current window.
    count: u32,
    /// Length of a window.
    interval: Duration,
    /// Number of requests allowed per window.
    max_count: u32,
}

/// Answer given by [`RateLimiter::get_state`].
#[derive(PartialEq)]
enum RateLimiterState {
    /// The request fits in the current window and has been counted.
    Incrementing,
    /// The current window is exhausted; the request was not counted.
    Throttled,
}

impl RateLimiter {
    /// Creates a limiter allowing `max_count` requests per `interval`.
    ///
    /// No window is open yet; the first `get_state` call starts one.
    pub fn new(interval: Duration, max_count: u32) -> Self {
        Self {
            interval,
            max_count,
            count: 0,
            started: None,
        }
    }

    /// Opens a fresh window starting now, with nothing counted.
    fn reset(&mut self) {
        self.count = 0;
        self.started = Some(Instant::now());
    }

    /// Tells whether a new window must be opened: either none was ever
    /// started, or strictly more than `interval` has passed since it began.
    fn should_reset(&self) -> bool {
        match self.started {
            Some(window_start) => window_start.elapsed() > self.interval,
            None => true,
        }
    }

    /// Registers one request attempt and reports whether it is allowed.
    ///
    /// An expired (or never started) window is replaced first. The request is
    /// only counted when it is allowed.
    pub fn get_state(&mut self) -> RateLimiterState {
        if self.should_reset() {
            self.reset();
        }

        if self.count != self.max_count {
            self.count += 1;
            RateLimiterState::Incrementing
        } else {
            RateLimiterState::Throttled
        }
    }
}
/// The answer returned by `PingUploadManager::get_upload_task`.
#[derive(PartialEq, Debug)]
pub enum PingUploadTask {
/// A request popped from the front of the queue; the caller is expected to upload it.
Upload(PingRequest),
/// Nothing can be handed out right now: either the startup directory scan
/// has not finished yet, or the rate limiter is currently throttling.
Wait,
/// The queue is empty.
Done,
}
/// Owns the queue of pending ping requests and hands them out one at a time as upload tasks.
#[derive(Debug)]
pub struct PingUploadManager {
/// Pending requests, uploaded front to back. Shared with the directory-scanning thread spawned in `new`.
queue: Arc<RwLock<VecDeque<PingRequest>>>,
/// Used to read pending pings (`process_dir`, `process_file`) and to remove them (`delete_file`).
directory_manager: PingDirectoryManager,
/// Set to `true` by the scanning thread once it has gone through everything `process_dir` returned.
processed_pending_pings: Arc<AtomicBool>,
/// Optional limiter consulted by `get_upload_task`; `None` until `set_rate_limiter` is called.
rate_limiter: Option<RwLock<RateLimiter>>,
/// Passed to `PingRequest::builder` for every request this manager builds.
language_binding_name: String,
}
impl PingUploadManager {
/// Creates an upload manager and starts filling its queue from the pings
/// returned by the directory manager's `process_dir`.
///
/// The scan runs on a dedicated thread that holds the queue's write lock
/// for its whole duration and raises the `processed_pending_pings` flag
/// when it is done. No rate limiter is installed.
///
/// # Arguments
///
/// * `data_path` - handed to `PingDirectoryManager::new`.
/// * `language_binding_name` - handed to `PingRequest::builder` for every request.
/// * `sync_scan` - when `true`, this call only returns after the scanning
///   thread has finished.
///
/// # Panics
///
/// Panics if the scanning thread cannot be spawned, or, with `sync_scan`,
/// if joining it fails.
pub fn new<P: Into<PathBuf>>(
    data_path: P,
    language_binding_name: &str,
    sync_scan: bool,
) -> Self {
    let queue = Arc::new(RwLock::new(VecDeque::new()));
    let directory_manager = PingDirectoryManager::new(data_path);
    let processed_pending_pings = Arc::new(AtomicBool::new(false));

    // Handles moved into the scanning thread.
    let scan_queue = Arc::clone(&queue);
    let scan_finished = Arc::clone(&processed_pending_pings);
    let scan_directories = directory_manager.clone();
    let scan_binding_name = language_binding_name.to_string();

    let scanner = thread::Builder::new()
        .name("glean.ping_directory_manager.process_dir".to_string())
        .spawn(move || {
            let mut pending = scan_queue
                .write()
                .expect("Can't write to pending pings queue.");

            for (document_id, path, body, headers) in scan_directories.process_dir() {
                // A ping that is already queued must not be queued twice.
                if !Self::is_enqueued(&pending, &document_id) {
                    let builder = PingRequest::builder(&scan_binding_name)
                        .document_id(document_id)
                        .path(path)
                        .body(body);
                    let builder = match headers {
                        Some(extra) => builder.headers(extra),
                        None => builder,
                    };
                    pending.push_back(builder.build());
                }
            }

            scan_finished.store(true, Ordering::SeqCst);
        })
        .expect("Unable to spawn thread to process pings directories.");

    if sync_scan {
        scanner
            .join()
            .expect("Unable to wait for startup ping processing to finish.");
    }

    Self {
        queue,
        directory_manager,
        processed_pending_pings,
        rate_limiter: None,
        language_binding_name: language_binding_name.into(),
    }
}
/// Reads the `processed_pending_pings` flag, which the scanning thread
/// spawned in `new` sets to `true` once it has finished.
fn has_processed_pings_dir(&self) -> bool {
self.processed_pending_pings.load(Ordering::SeqCst)
}
/// Tells whether `queue` already holds a request whose `document_id`
/// equals the given one.
fn is_enqueued(queue: &VecDeque<PingRequest>, document_id: &str) -> bool {
    for pending in queue {
        if pending.document_id == document_id {
            return true;
        }
    }
    false
}
/// Installs a rate limiter, replacing any previous one.
///
/// # Arguments
///
/// * `interval` - length of a rate-limiting window, in seconds.
/// * `max_tasks` - number of upload tasks handed out per window.
pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
    let window = Duration::from_secs(interval);
    let limiter = RateLimiter::new(window, max_tasks);
    self.rate_limiter = Some(RwLock::new(limiter));
}
/// Builds a request from the given parts and appends it to the queue,
/// unless a request with the same `document_id` is already queued.
///
/// # Panics
///
/// Panics if the queue's lock cannot be acquired for writing.
fn enqueue_ping(&self, document_id: &str, path: &str, body: &str, headers: Option<HeaderMap>) {
    let mut pending = self
        .queue
        .write()
        .expect("Can't write to pending pings queue.");

    let already_queued = Self::is_enqueued(&pending, document_id);
    if already_queued {
        log::trace!(
            "Attempted to enqueue a duplicate ping {} at {}.",
            document_id,
            path
        );
        return;
    }

    log::trace!("Enqueuing ping {} at {}", document_id, path);

    let builder = PingRequest::builder(&self.language_binding_name)
        .document_id(document_id)
        .path(path)
        .body(body);
    // Extra headers are optional; without them the builder is used as is.
    let builder = match headers {
        Some(extra) => builder.headers(extra),
        None => builder,
    };

    pending.push_back(builder.build());
}
/// Asks the directory manager for the ping stored under `document_id` and,
/// if it returns one, enqueues it. Does nothing otherwise.
pub fn enqueue_ping_from_file(&self, document_id: &str) {
    let stored = self.directory_manager.process_file(document_id);
    if let Some((doc_id, path, body, headers)) = stored {
        self.enqueue_ping(&doc_id, &path, &body, headers);
    }
}
/// Removes every request from the queue except those for which
/// `is_deletion_request()` returns `true`.
///
/// Returns the write guard on the queue, so the queue stays locked for
/// writing until the caller drops the returned guard.
///
/// Panics if the queue's lock cannot be acquired for writing.
pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
log::trace!("Clearing ping queue");
let mut queue = self
.queue
.write()
.expect("Can't write to pending pings queue.");
// Keep only the deletion requests; everything else is dropped.
queue.retain(|ping| ping.is_deletion_request());
log::trace!(
"{} pings left in the queue (only deletion-request expected)",
queue.len()
);
queue
}
/// Returns the next thing the uploader should do.
///
/// * `Wait` while the startup directory scan has not finished, or when a
///   rate limiter is installed and reports being throttled.
/// * `Done` when the queue is empty.
/// * `Upload` with the request popped from the front of the queue otherwise.
///
/// The rate limiter is only consulted when there is a request to hand out.
///
/// # Arguments
///
/// * `log_ping` - when `true`, the request's pretty-printed body (or a
///   placeholder if there is none) is logged through `chunked_log_info`.
///
/// # Panics
///
/// Panics if the queue's or the rate limiter's lock cannot be acquired.
pub fn get_upload_task(&self, log_ping: bool) -> PingUploadTask {
    if !self.has_processed_pings_dir() {
        log::info!(
            "Tried getting an upload task, but processing is ongoing. Will come back later."
        );
        return PingUploadTask::Wait;
    }

    let mut pending = self
        .queue
        .write()
        .expect("Can't write to pending pings queue.");

    let next = match pending.front() {
        Some(next) => next,
        None => {
            log::info!("No more pings to upload! You are done.");
            return PingUploadTask::Done;
        }
    };

    if let Some(limiter_lock) = self.rate_limiter.as_ref() {
        let mut limiter = limiter_lock
            .write()
            .expect("Can't write to the rate limiter.");
        if limiter.get_state() == RateLimiterState::Throttled {
            log::info!(
                "Tried getting an upload task, but we are throttled at the moment."
            );
            return PingUploadTask::Wait;
        }
    }

    log::info!(
        "New upload task with id {} (path: {})",
        next.document_id,
        next.path
    );

    if log_ping {
        match next.pretty_body() {
            Some(body) => chunked_log_info(&next.path, &body),
            None => chunked_log_info(&next.path, "<invalid ping payload>"),
        }
    }

    // The write lock has been held since `front()` returned `Some`, so the
    // queue still has a front element here.
    PingUploadTask::Upload(pending.pop_front().unwrap())
}
pub fn process_ping_upload_response(&self, document_id: &str, status: UploadResult) {
use UploadResult::*;
match status {
HttpStatus(status @ 200..=299) => {
log::info!("Ping {} successfully sent {}.", document_id, status);
self.directory_manager.delete_file(document_id);
}
UnrecoverableFailure | HttpStatus(400..=499) => {
log::error!(
"Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
document_id,
status
);
self.directory_manager.delete_file(document_id);
}
RecoverableFailure | HttpStatus(_) => {
log::error!(
"Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
document_id,
status
);
self.enqueue_ping_from_file(&document_id);
}
};
}
}
/// Logs a ping payload at `info` level, split into several log entries when
/// it is too large for a single one.
///
/// When `path` and `payload` together fit in `MAX_LOG_PAYLOAD_SIZE_BYTES`
/// they are logged as one entry. Otherwise the payload is cut into pieces of
/// at most `MAX_LOG_PAYLOAD_SIZE_BYTES` bytes, never splitting a UTF-8
/// character, and each piece is logged as `[Part i of n]`.
///
/// # Arguments
///
/// * `path` - the URL path of the ping, repeated in every log entry.
/// * `payload` - the text to log.
#[cfg(target_os = "android")]
pub fn chunked_log_info(path: &str, payload: &str) {
    // NOTE(review): presumably chosen to stay below the size limit of a
    // single Android log entry; confirm before changing.
    const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;

    if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
        log::info!("Glean ping to URL: {}\n{}", path, payload);
        return;
    }

    // Split first and log afterwards, so that the advertised number of parts
    // is the real one. Estimating it as `len / MAX + 1` is off by one when
    // the length is an exact multiple of the limit, and too low when backing
    // off to character boundaries produces an extra chunk.
    let mut chunks = Vec::with_capacity(payload.len() / MAX_LOG_PAYLOAD_SIZE_BYTES + 1);
    let mut rest = payload;
    while rest.len() > MAX_LOG_PAYLOAD_SIZE_BYTES {
        // Back off to the closest character boundary so the slice is valid
        // UTF-8. Index 0 is always a boundary, so this terminates.
        let mut end = MAX_LOG_PAYLOAD_SIZE_BYTES;
        while !rest.is_char_boundary(end) {
            end -= 1;
        }
        let (chunk, tail) = rest.split_at(end);
        chunks.push(chunk);
        rest = tail;
    }
    // The final piece. It is also pushed when nothing else was, so that an
    // empty payload with an over-long path still produces a log entry.
    if !rest.is_empty() || chunks.is_empty() {
        chunks.push(rest);
    }

    let total_chunks = chunks.len();
    for (idx, chunk) in chunks.iter().enumerate() {
        log::info!(
            "Glean ping to URL: {} [Part {} of {}]\n{}",
            path,
            idx + 1,
            total_chunks,
            chunk
        );
    }
}
/// Logs `payload` as a single `info` entry.
///
/// This is the variant compiled for every target other than Android; the
/// `_path` argument is accepted but not used.
#[cfg(not(target_os = "android"))]
pub fn chunked_log_info(_path: &str, payload: &str) {
log::info!("{}", payload)
}
/// Tests for the upload manager, both directly and through a `Glean` instance.
#[cfg(test)]
mod test {
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;
    use super::UploadResult::*;
    use super::*;
    use crate::metrics::PingType;
    use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};

    /// Request path used by tests that do not care about its content.
    const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";

    /// Blocks until the manager's startup directory scan has finished, i.e.
    /// until `get_upload_task` stops answering `Wait`.
    ///
    /// Only meant to be called while the queue is empty, so that polling
    /// does not consume a request.
    fn wait_for_directory_scan(upload_manager: &PingUploadManager) {
        while upload_manager.get_upload_task(false) == PingUploadTask::Wait {
            thread::sleep(Duration::from_millis(10));
        }
    }

    /// An empty data directory yields `Done` once the scan is over.
    #[test]
    fn doesnt_error_when_there_are_no_pending_pings() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path(), "Testing", false);
        wait_for_directory_scan(&upload_manager);

        assert_eq!(upload_manager.get_upload_task(false), PingUploadTask::Done);
    }

    /// A single enqueued ping is handed out as an `Upload` task.
    #[test]
    fn returns_ping_request_when_there_is_one() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path(), "Testing", false);
        wait_for_directory_scan(&upload_manager);

        upload_manager.enqueue_ping(&Uuid::new_v4().to_string(), PATH, "", None);

        match upload_manager.get_upload_task(false) {
            PingUploadTask::Upload(_) => {}
            _ => panic!("Expected upload manager to return the next request!"),
        }
    }

    /// Every enqueued ping is handed out exactly once, then `Done`.
    #[test]
    fn returns_as_many_ping_requests_as_there_are() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path(), "Testing", false);
        wait_for_directory_scan(&upload_manager);

        let n = 10;
        for _ in 0..n {
            upload_manager.enqueue_ping(&Uuid::new_v4().to_string(), PATH, "", None);
        }

        for _ in 0..n {
            match upload_manager.get_upload_task(false) {
                PingUploadTask::Upload(_) => {}
                _ => panic!("Expected upload manager to return the next request!"),
            }
        }

        assert_eq!(upload_manager.get_upload_task(false), PingUploadTask::Done);
    }

    /// With a rate limiter, only `max_pings_per_interval` tasks are handed
    /// out per interval; the next one has to wait for the interval to pass.
    #[test]
    fn limits_the_number_of_pings_when_there_is_rate_limiting() {
        let dir = tempfile::tempdir().unwrap();
        let mut upload_manager = PingUploadManager::new(dir.path(), "Testing", false);

        let secs_per_interval = 3;
        let max_pings_per_interval = 10;
        // Configure the limiter from the same value the loops below use, so
        // the two cannot drift apart.
        upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);

        wait_for_directory_scan(&upload_manager);

        for _ in 0..max_pings_per_interval {
            upload_manager.enqueue_ping(&Uuid::new_v4().to_string(), PATH, "", None);
        }

        for _ in 0..max_pings_per_interval {
            match upload_manager.get_upload_task(false) {
                PingUploadTask::Upload(_) => {}
                _ => panic!("Expected upload manager to return the next request!"),
            }
        }

        // One ping over the limit: the manager must ask us to wait.
        upload_manager.enqueue_ping(&Uuid::new_v4().to_string(), PATH, "", None);
        assert_eq!(PingUploadTask::Wait, upload_manager.get_upload_task(false));

        // Once the interval has passed, the ping is handed out.
        thread::sleep(Duration::from_secs(secs_per_interval));
        match upload_manager.get_upload_task(false) {
            PingUploadTask::Upload(_) => {}
            _ => panic!("Expected upload manager to return the next request!"),
        }
    }

    /// Clearing the queue drops ordinary pings.
    #[test]
    fn clearing_the_queue_works_correctly() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path(), "Testing", false);
        wait_for_directory_scan(&upload_manager);

        for _ in 0..10 {
            upload_manager.enqueue_ping(&Uuid::new_v4().to_string(), PATH, "", None);
        }

        // Drop the returned guard right away so the queue is unlocked again.
        drop(upload_manager.clear_ping_queue());

        assert_eq!(upload_manager.get_upload_task(false), PingUploadTask::Done);
    }

    /// Clearing the queue keeps deletion-request pings.
    #[test]
    fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
        let (mut glean, _) = new_glean(None);
        while glean.get_upload_task() == PingUploadTask::Wait {
            thread::sleep(Duration::from_millis(10));
        }

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);

        let n = 10;
        for _ in 0..n {
            glean.submit_ping(&ping_type, None).unwrap();
        }

        glean
            .internal_pings
            .deletion_request
            .submit(&glean, None)
            .unwrap();

        drop(glean.upload_manager.clear_ping_queue());

        let upload_task = glean.get_upload_task();
        match upload_task {
            PingUploadTask::Upload(request) => assert!(request.is_deletion_request()),
            _ => panic!("Expected upload manager to return the next request!"),
        }

        assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
    }

    /// Pings submitted through Glean all show up as upload tasks.
    #[test]
    fn fills_up_queue_successfully_from_disk() {
        let (mut glean, _) = new_glean(None);
        while glean.get_upload_task() == PingUploadTask::Wait {
            thread::sleep(Duration::from_millis(10));
        }

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);

        let n = 10;
        for _ in 0..n {
            glean.submit_ping(&ping_type, None).unwrap();
        }

        let mut upload_task = glean.get_upload_task();
        while upload_task == PingUploadTask::Wait {
            thread::sleep(Duration::from_millis(10));
            upload_task = glean.get_upload_task();
        }

        for _ in 0..n {
            match upload_task {
                PingUploadTask::Upload(_) => {}
                _ => panic!("Expected upload manager to return the next request!"),
            }
            upload_task = glean.get_upload_task();
        }

        assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
    }

    /// A 2xx response removes the ping's file.
    #[test]
    fn processes_correctly_success_upload_response() {
        let (mut glean, dir) = new_glean(None);
        while glean.get_upload_task() == PingUploadTask::Wait {
            thread::sleep(Duration::from_millis(10));
        }

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);
        glean.submit_ping(&ping_type, None).unwrap();

        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);

        match glean.get_upload_task() {
            PingUploadTask::Upload(request) => {
                let document_id = request.document_id;
                glean.process_ping_upload_response(&document_id, HttpStatus(200));
                assert!(!pending_pings_dir.join(document_id).exists());
            }
            _ => panic!("Expected upload manager to return the next request!"),
        }

        assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
    }

    /// A 4xx response removes the ping's file.
    #[test]
    fn processes_correctly_client_error_upload_response() {
        let (mut glean, dir) = new_glean(None);
        while glean.get_upload_task() == PingUploadTask::Wait {
            thread::sleep(Duration::from_millis(10));
        }

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);
        glean.submit_ping(&ping_type, None).unwrap();

        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);

        match glean.get_upload_task() {
            PingUploadTask::Upload(request) => {
                let document_id = request.document_id;
                glean.process_ping_upload_response(&document_id, HttpStatus(404));
                assert!(!pending_pings_dir.join(document_id).exists());
            }
            _ => panic!("Expected upload manager to return the next request!"),
        }

        assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
    }

    /// A 5xx response puts the same ping back in the queue.
    #[test]
    fn processes_correctly_server_error_upload_response() {
        let (mut glean, _) = new_glean(None);
        while glean.get_upload_task() == PingUploadTask::Wait {
            thread::sleep(Duration::from_millis(10));
        }

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);
        glean.submit_ping(&ping_type, None).unwrap();

        match glean.get_upload_task() {
            PingUploadTask::Upload(request) => {
                let document_id = request.document_id;
                glean.process_ping_upload_response(&document_id, HttpStatus(500));
                match glean.get_upload_task() {
                    PingUploadTask::Upload(request) => {
                        assert_eq!(document_id, request.document_id);
                    }
                    _ => panic!("Expected upload manager to return the next request!"),
                }
            }
            _ => panic!("Expected upload manager to return the next request!"),
        }

        assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
    }

    /// An unrecoverable failure removes the ping's file.
    #[test]
    fn processes_correctly_unrecoverable_upload_response() {
        let (mut glean, dir) = new_glean(None);
        while glean.get_upload_task() == PingUploadTask::Wait {
            thread::sleep(Duration::from_millis(10));
        }

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);
        glean.submit_ping(&ping_type, None).unwrap();

        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);

        match glean.get_upload_task() {
            PingUploadTask::Upload(request) => {
                let document_id = request.document_id;
                glean.process_ping_upload_response(&document_id, UnrecoverableFailure);
                assert!(!pending_pings_dir.join(document_id).exists());
            }
            _ => panic!("Expected upload manager to return the next request!"),
        }

        assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
    }

    /// A ping enqueued while another one is being uploaded is handed out next.
    #[test]
    fn new_pings_are_added_while_upload_in_progress() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path(), "Testing", false);
        wait_for_directory_scan(&upload_manager);

        let doc1 = Uuid::new_v4().to_string();
        let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);

        let doc2 = Uuid::new_v4().to_string();
        let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);

        upload_manager.enqueue_ping(&doc1, &path1, "", None);

        let req = match upload_manager.get_upload_task(false) {
            PingUploadTask::Upload(req) => req,
            _ => panic!("Expected upload manager to return the next request!"),
        };
        assert_eq!(doc1, req.document_id);

        // The second ping arrives before the first upload is reported.
        upload_manager.enqueue_ping(&doc2, &path2, "", None);

        upload_manager.process_ping_upload_response(&req.document_id, HttpStatus(200));

        let req = match upload_manager.get_upload_task(false) {
            PingUploadTask::Upload(req) => req,
            _ => panic!("Expected upload manager to return the next request!"),
        };
        assert_eq!(doc2, req.document_id);

        upload_manager.process_ping_upload_response(&req.document_id, HttpStatus(200));

        assert_eq!(upload_manager.get_upload_task(false), PingUploadTask::Done);
    }

    /// With `sync_scan`, the manager never answers `Wait` for the scan.
    #[test]
    fn uploader_sync_init() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path(), "Testing", true);

        assert_eq!(PingUploadTask::Done, upload_manager.get_upload_task(false))
    }

    /// Setting a debug view tag adds the `X-Debug-ID` header to requests.
    #[test]
    fn adds_debug_view_header_to_requests_when_tag_is_set() {
        let (mut glean, _) = new_glean(None);
        while glean.get_upload_task() == PingUploadTask::Wait {
            thread::sleep(Duration::from_millis(10));
        }

        glean.set_debug_view_tag("valid-tag");

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);
        glean.submit_ping(&ping_type, None).unwrap();

        match glean.get_upload_task() {
            PingUploadTask::Upload(request) => {
                assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
            }
            _ => panic!("Expected upload manager to return the next request!"),
        }
    }

    /// Enqueuing the same document id twice results in a single request.
    #[test]
    fn duplicates_are_not_enqueued() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path(), "Testing", false);
        wait_for_directory_scan(&upload_manager);

        let doc_id = Uuid::new_v4().to_string();
        let path = format!("/submit/app_id/test-ping/1/{}", doc_id);

        upload_manager.enqueue_ping(&doc_id, &path, "", None);
        upload_manager.enqueue_ping(&doc_id, &path, "", None);

        match upload_manager.get_upload_task(false) {
            PingUploadTask::Upload(_) => {}
            _ => panic!("Expected upload manager to return the next request!"),
        }

        assert_eq!(upload_manager.get_upload_task(false), PingUploadTask::Done);
    }
}