#![allow(dead_code)]
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock, RwLockWriteGuard};
use std::thread;
use serde_json::Value as JsonValue;
use directory::PingDirectoryManager;
pub use request::PingRequest;
pub use result::{ffi_upload_result, UploadResult};
mod directory;
mod request;
mod result;
/// The answer given by `PingUploadManager::get_upload_task` to a caller
/// asking what to upload next.
#[derive(PartialEq, Debug)]
pub enum PingUploadTask {
/// A request that was popped from the front of the queue and is ready to be uploaded.
Upload(PingRequest),
/// The pending pings directory has not been fully processed yet;
/// the caller should ask again later.
Wait,
/// The queue is empty: there is nothing left to upload.
Done,
}
/// Keeps the queue of pending ping requests and hands them out for upload.
#[derive(Debug)]
pub struct PingUploadManager {
// FIFO of pending requests. Shared (via `Arc`) with the background thread
// spawned in `new`, which fills it with the result of `process_dir()`.
queue: Arc<RwLock<VecDeque<PingRequest>>>,
// Used to read ping files back into requests (`process_dir`, `process_file`)
// and to delete them (`delete_file`).
directory_manager: PingDirectoryManager,
// Starts as `false`; the background thread spawned in `new` stores `true`
// after it has appended the `process_dir()` results to `queue`.
processed_pending_pings: Arc<AtomicBool>,
}
impl PingUploadManager {
pub fn new<P: Into<PathBuf>>(data_path: P) -> Self {
let queue = Arc::new(RwLock::new(VecDeque::new()));
let directory_manager = PingDirectoryManager::new(data_path);
let processed_pending_pings = Arc::new(AtomicBool::new(false));
let local_queue = queue.clone();
let local_flag = processed_pending_pings.clone();
let local_manager = directory_manager.clone();
let _ = thread::Builder::new()
.name("glean.ping_directory_manager.process_dir".to_string())
.spawn(move || {
let mut local_queue = local_queue
.write()
.expect("Can't write to pending pings queue.");
local_queue.extend(local_manager.process_dir());
local_flag.store(true, Ordering::SeqCst);
})
.expect("Unable to spawn thread to process pings directories.");
Self {
queue,
processed_pending_pings,
directory_manager,
}
}
fn has_processed_pings_dir(&self) -> bool {
self.processed_pending_pings.load(Ordering::SeqCst)
}
pub fn enqueue_ping(&self, document_id: &str, path: &str, body: JsonValue) {
log::trace!("Enqueuing ping {} at {}", document_id, path);
let mut queue = self
.queue
.write()
.expect("Can't write to pending pings queue.");
let request = PingRequest::new(document_id, path, body);
queue.push_back(request);
}
pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
log::trace!("Clearing ping queue");
let mut queue = self
.queue
.write()
.expect("Can't write to pending pings queue.");
queue.retain(|ping| ping.is_deletion_request());
log::trace!(
"{} pings left in the queue (only deletion-request expected)",
queue.len()
);
queue
}
pub fn get_upload_task(&self) -> PingUploadTask {
if !self.has_processed_pings_dir() {
log::info!(
"Tried getting an upload task, but processing is ongoing. Will come back later."
);
return PingUploadTask::Wait;
}
let mut queue = self
.queue
.write()
.expect("Can't write to pending pings queue.");
match queue.pop_front() {
Some(request) => {
log::info!(
"New upload task with id {} (path: {})",
request.document_id,
request.path
);
PingUploadTask::Upload(request)
}
None => {
log::info!("No more pings to upload! You are done.");
PingUploadTask::Done
}
}
}
pub fn process_ping_upload_response(&self, document_id: &str, status: UploadResult) {
use UploadResult::*;
match status {
HttpStatus(status @ 200..=299) => {
log::info!("Ping {} successfully sent {}.", document_id, status);
self.directory_manager.delete_file(document_id);
}
UnrecoverableFailure | HttpStatus(400..=499) => {
log::error!(
"Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
document_id,
status
);
self.directory_manager.delete_file(document_id);
}
RecoverableFailure | HttpStatus(_) => {
log::error!(
"Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
document_id,
status
);
if let Some(request) = self.directory_manager.process_file(document_id) {
let mut queue = self
.queue
.write()
.expect("Can't write to pending pings queue.");
queue.push_back(request);
}
}
};
}
}
#[cfg(test)]
mod test {
    use std::thread;
    use std::time::Duration;

    use serde_json::json;

    use super::UploadResult::*;
    use super::*;
    use crate::metrics::PingType;
    use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};

    const DOCUMENT_ID: &str = "40e31919-684f-43b0-a5aa-e15c2d56a674";
    const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";

    /// Polls the manager until the background scan of the pending pings
    /// directory has finished and returns the first task that is not `Wait`.
    fn first_task_after_scan(upload_manager: &PingUploadManager) -> PingUploadTask {
        loop {
            match upload_manager.get_upload_task() {
                PingUploadTask::Wait => thread::sleep(Duration::from_millis(10)),
                task => return task,
            }
        }
    }

    /// Unwraps the request of an `Upload` task, failing the test for any other task.
    fn expect_upload(task: PingUploadTask) -> PingRequest {
        match task {
            PingUploadTask::Upload(request) => request,
            _ => panic!("Expected upload manager to return the next request!"),
        }
    }

    #[test]
    fn test_doesnt_error_when_there_are_no_pending_pings() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path());

        // The directory is empty, so the scan must not produce anything.
        assert_eq!(first_task_after_scan(&upload_manager), PingUploadTask::Done);

        // Asking again keeps answering `Done`.
        assert_eq!(upload_manager.get_upload_task(), PingUploadTask::Done);
    }

    #[test]
    fn test_returns_ping_request_when_there_is_one() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path());

        // Wait for processing of the (empty) pending pings directory to finish.
        assert_eq!(first_task_after_scan(&upload_manager), PingUploadTask::Done);

        upload_manager.enqueue_ping(DOCUMENT_ID, PATH, json!({}));

        expect_upload(upload_manager.get_upload_task());
    }

    #[test]
    fn test_returns_as_many_ping_requests_as_there_are() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path());

        // Wait for processing of the (empty) pending pings directory to finish.
        assert_eq!(first_task_after_scan(&upload_manager), PingUploadTask::Done);

        let n = 10;
        for _ in 0..n {
            upload_manager.enqueue_ping(DOCUMENT_ID, PATH, json!({}));
        }

        // Every enqueued ping comes back as an upload task...
        for _ in 0..n {
            expect_upload(upload_manager.get_upload_task());
        }

        // ...and nothing is left afterwards.
        assert_eq!(upload_manager.get_upload_task(), PingUploadTask::Done);
    }

    #[test]
    fn test_clearing_the_queue_works_correctly() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path());

        // Wait for processing of the (empty) pending pings directory to finish.
        assert_eq!(first_task_after_scan(&upload_manager), PingUploadTask::Done);

        for _ in 0..10 {
            upload_manager.enqueue_ping(DOCUMENT_ID, PATH, json!({}));
        }

        // The returned guard keeps the queue locked; release it right away.
        drop(upload_manager.clear_ping_queue());

        assert_eq!(upload_manager.get_upload_task(), PingUploadTask::Done);
    }

    #[test]
    fn test_clearing_the_queue_doesnt_clear_deletion_request_pings() {
        let (mut glean, _) = new_glean(None);

        // Wait for the background scan to finish before touching the queue.
        // Otherwise `get_upload_task` below may still answer `Wait`, or the
        // scan may refill the queue after it has been cleared.
        while glean.get_upload_task() == PingUploadTask::Wait {
            thread::sleep(Duration::from_millis(10));
        }

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);

        let n = 10;
        for _ in 0..n {
            glean.submit_ping(&ping_type, None).unwrap();
        }

        glean
            .internal_pings
            .deletion_request
            .submit(&glean, None)
            .unwrap();

        // The returned guard keeps the queue locked; release it right away.
        drop(glean.upload_manager.clear_ping_queue());

        // Only the deletion-request ping survives the clearing.
        let request = expect_upload(glean.get_upload_task());
        assert!(request.is_deletion_request());

        assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
    }

    #[test]
    fn test_fills_up_queue_successfully_from_disk() {
        let (mut glean, dir) = new_glean(None);

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);

        let n = 10;
        for _ in 0..n {
            glean.submit_ping(&ping_type, None).unwrap();
        }

        // A fresh manager has to pick the pings up from disk.
        let upload_manager = PingUploadManager::new(dir.path());

        // Exactly `n` upload tasks are expected, starting with the first
        // task handed out once the scan is over.
        let mut upload_task = first_task_after_scan(&upload_manager);
        for _ in 0..n {
            expect_upload(upload_task);
            upload_task = upload_manager.get_upload_task();
        }

        // The task following the `n` uploads must already be `Done`...
        assert_eq!(upload_task, PingUploadTask::Done);
        // ...and so must every later one.
        assert_eq!(upload_manager.get_upload_task(), PingUploadTask::Done);
    }

    #[test]
    fn test_processes_correctly_success_upload_response() {
        let (mut glean, dir) = new_glean(None);

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);
        glean.submit_ping(&ping_type, None).unwrap();

        let upload_manager = PingUploadManager::new(dir.path());
        let request = expect_upload(first_task_after_scan(&upload_manager));

        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
        let document_id = request.document_id;

        // A successful upload removes the ping file from disk.
        upload_manager.process_ping_upload_response(&document_id, HttpStatus(200));
        assert!(!pending_pings_dir.join(&document_id).exists());

        assert_eq!(upload_manager.get_upload_task(), PingUploadTask::Done);
    }

    #[test]
    fn test_processes_correctly_client_error_upload_response() {
        let (mut glean, dir) = new_glean(None);

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);
        glean.submit_ping(&ping_type, None).unwrap();

        let upload_manager = PingUploadManager::new(dir.path());
        let request = expect_upload(first_task_after_scan(&upload_manager));

        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
        let document_id = request.document_id;

        // A client error is not retried: the ping file is removed from disk.
        upload_manager.process_ping_upload_response(&document_id, HttpStatus(404));
        assert!(!pending_pings_dir.join(&document_id).exists());

        assert_eq!(upload_manager.get_upload_task(), PingUploadTask::Done);
    }

    #[test]
    fn test_processes_correctly_server_error_upload_response() {
        let (mut glean, dir) = new_glean(None);

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);
        glean.submit_ping(&ping_type, None).unwrap();

        let upload_manager = PingUploadManager::new(dir.path());
        let request = expect_upload(first_task_after_scan(&upload_manager));
        let document_id = request.document_id;

        // A server error is retried: the same ping is handed out again.
        upload_manager.process_ping_upload_response(&document_id, HttpStatus(500));
        let retried = expect_upload(upload_manager.get_upload_task());
        assert_eq!(document_id, retried.document_id);

        assert_eq!(upload_manager.get_upload_task(), PingUploadTask::Done);
    }

    #[test]
    fn test_processes_correctly_unrecoverable_upload_response() {
        let (mut glean, dir) = new_glean(None);

        let ping_type = PingType::new("test", true, true, vec![]);
        glean.register_ping_type(&ping_type);
        glean.submit_ping(&ping_type, None).unwrap();

        let upload_manager = PingUploadManager::new(dir.path());
        let request = expect_upload(first_task_after_scan(&upload_manager));

        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
        let document_id = request.document_id;

        // An unrecoverable failure is not retried: the ping file is removed.
        upload_manager.process_ping_upload_response(&document_id, UnrecoverableFailure);
        assert!(!pending_pings_dir.join(&document_id).exists());

        assert_eq!(upload_manager.get_upload_task(), PingUploadTask::Done);
    }

    #[test]
    fn new_pings_are_added_while_upload_in_progress() {
        let dir = tempfile::tempdir().unwrap();
        let upload_manager = PingUploadManager::new(dir.path());

        // Wait for processing of the (empty) pending pings directory to finish.
        assert_eq!(first_task_after_scan(&upload_manager), PingUploadTask::Done);

        let doc1 = "684fa150-8dff-11ea-8faf-cb1ff3b11119";
        let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);

        let doc2 = "74f14e9a-8dff-11ea-b45a-6f936923f639";
        let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);

        // Enqueue the first ping and start "uploading" it.
        upload_manager.enqueue_ping(doc1, &path1, json!({}));
        let req = expect_upload(upload_manager.get_upload_task());
        assert_eq!(doc1, req.document_id);

        // A second ping arrives while the first upload is still in flight.
        upload_manager.enqueue_ping(doc2, &path2, json!({}));

        // The first upload finishes.
        upload_manager.process_ping_upload_response(&req.document_id, HttpStatus(200));

        // The second ping is handed out next.
        let req = expect_upload(upload_manager.get_upload_task());
        assert_eq!(doc2, req.document_id);

        // The second upload finishes as well.
        upload_manager.process_ping_upload_response(&req.document_id, HttpStatus(200));

        // Nothing is left to upload.
        assert_eq!(upload_manager.get_upload_task(), PingUploadTask::Done);
    }
}