use std::cmp::Ordering;
use std::fs::{self, File};
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use serde::Deserialize;
use uuid::Uuid;
use super::request::HeaderMap;
use crate::{DELETION_REQUEST_PINGS_DIRECTORY, PENDING_PINGS_DIRECTORY};
pub type PingPayload = (String, String, String, Option<HeaderMap>);
#[derive(Clone, Debug, Default)]
pub struct PingPayloadsByDirectory {
pub pending_pings: Vec<(u64, PingPayload)>,
pub deletion_request_pings: Vec<(u64, PingPayload)>,
}
impl PingPayloadsByDirectory {
pub fn extend(&mut self, other: PingPayloadsByDirectory) {
self.pending_pings.extend(other.pending_pings);
self.deletion_request_pings
.extend(other.deletion_request_pings);
}
pub fn len(&self) -> usize {
self.pending_pings.len() + self.deletion_request_pings.len()
}
}
fn get_file_name_as_str(path: &Path) -> Option<&str> {
match path.file_name() {
None => {
log::warn!("Error getting file name from path: {}", path.display());
None
}
Some(file_name) => {
let file_name = file_name.to_str();
if file_name.is_none() {
log::warn!("File name is not valid unicode: {}", path.display());
}
file_name
}
}
}
fn process_metadata(path: &str, metadata: &str) -> Option<HeaderMap> {
#[derive(Deserialize)]
struct PingMetadata {
pub headers: HeaderMap,
}
if let Ok(metadata) = serde_json::from_str::<PingMetadata>(metadata) {
return Some(metadata.headers);
} else {
log::warn!("Error while parsing ping metadata: {}", path);
}
None
}
#[derive(Debug, Clone)]
pub struct PingDirectoryManager {
pending_pings_dir: PathBuf,
deletion_request_pings_dir: PathBuf,
}
impl PingDirectoryManager {
pub fn new<P: Into<PathBuf>>(data_path: P) -> Self {
let data_path = data_path.into();
Self {
pending_pings_dir: data_path.join(PENDING_PINGS_DIRECTORY),
deletion_request_pings_dir: data_path.join(DELETION_REQUEST_PINGS_DIRECTORY),
}
}
pub fn delete_file(&self, uuid: &str) -> bool {
let path = match self.get_file_path(uuid) {
Some(path) => path,
None => {
log::warn!("Cannot find ping file to delete {}", uuid);
return false;
}
};
match fs::remove_file(&path) {
Err(e) => {
log::warn!("Error deleting file {}. {}", path.display(), e);
return false;
}
_ => log::info!("File was deleted {}", path.display()),
};
true
}
pub fn process_file(&self, document_id: &str) -> Option<PingPayload> {
let path = match self.get_file_path(document_id) {
Some(path) => path,
None => {
log::warn!("Cannot find ping file to process {}", document_id);
return None;
}
};
let file = match File::open(&path) {
Ok(file) => file,
Err(e) => {
log::warn!("Error reading ping file {}. {}", path.display(), e);
return None;
}
};
log::info!("Processing ping at: {}", path.display());
let mut lines = BufReader::new(file).lines();
if let (Some(Ok(path)), Some(Ok(body)), Ok(metadata)) =
(lines.next(), lines.next(), lines.next().transpose())
{
let headers = metadata.and_then(|m| process_metadata(&path, &m));
return Some((document_id.into(), path, body, headers));
} else {
log::warn!(
"Error processing ping file: {}. Ping file is not formatted as expected.",
document_id
);
}
self.delete_file(document_id);
None
}
pub fn process_dirs(&self) -> PingPayloadsByDirectory {
PingPayloadsByDirectory {
pending_pings: self.process_dir(&self.pending_pings_dir),
deletion_request_pings: self.process_dir(&self.deletion_request_pings_dir),
}
}
fn process_dir(&self, dir: &Path) -> Vec<(u64, PingPayload)> {
log::trace!("Processing persisted pings.");
let entries = match dir.read_dir() {
Ok(entries) => entries,
Err(_) => {
return Vec::new();
}
};
let mut pending_pings: Vec<_> = entries
.filter_map(|entry| entry.ok())
.filter_map(|entry| {
let path = entry.path();
if let Some(file_name) = get_file_name_as_str(&path) {
if Uuid::parse_str(file_name).is_err() {
log::warn!("Pattern mismatch. Deleting {}", path.display());
self.delete_file(file_name);
return None;
}
if let Some(data) = self.process_file(file_name) {
let metadata = match fs::metadata(&path) {
Ok(metadata) => metadata,
Err(e) => {
log::warn!(
"Unable to read metadata for file: {}, error: {:?}",
path.display(),
e
);
return None;
}
};
return Some((metadata, data));
}
};
None
})
.collect();
pending_pings.sort_by(|(a, _), (b, _)| {
if let (Ok(a), Ok(b)) = (a.modified(), b.modified()) {
a.cmp(&b)
} else {
Ordering::Less
}
});
pending_pings
.into_iter()
.map(|(metadata, data)| (metadata.len(), data))
.collect()
}
fn get_file_path(&self, document_id: &str) -> Option<PathBuf> {
for dir in [&self.pending_pings_dir, &self.deletion_request_pings_dir].iter() {
let path = dir.join(document_id);
if path.exists() {
return Some(path);
}
}
None
}
}
#[cfg(test)]
mod test {
use std::fs::File;
use super::*;
use crate::metrics::PingType;
use crate::tests::new_glean;
#[test]
fn doesnt_panic_if_no_pending_pings_directory() {
let dir = tempfile::tempdir().unwrap();
let directory_manager = PingDirectoryManager::new(dir.path());
let data = directory_manager.process_dirs();
assert_eq!(data.pending_pings.len(), 0);
assert_eq!(data.deletion_request_pings.len(), 0);
}
#[test]
fn gets_correct_data_from_valid_ping_file() {
let (mut glean, dir) = new_glean(None);
let ping_type = PingType::new("test", true, true, vec![]);
glean.register_ping_type(&ping_type);
ping_type.submit_sync(&glean, None);
let directory_manager = PingDirectoryManager::new(dir.path());
let data = directory_manager.process_dirs();
assert_eq!(data.pending_pings.len(), 1);
assert_eq!(data.deletion_request_pings.len(), 0);
let ping = &data.pending_pings[0].1;
let request_ping_type = ping.1.split('/').nth(3).unwrap();
assert_eq!(request_ping_type, "test");
}
#[test]
fn non_uuid_files_are_deleted_and_ignored() {
let (mut glean, dir) = new_glean(None);
let ping_type = PingType::new("test", true, true, vec![]);
glean.register_ping_type(&ping_type);
ping_type.submit_sync(&glean, None);
let directory_manager = PingDirectoryManager::new(&dir.path());
let not_uuid_path = dir
.path()
.join(PENDING_PINGS_DIRECTORY)
.join("not-uuid-file-name.txt");
File::create(¬_uuid_path).unwrap();
let data = directory_manager.process_dirs();
assert_eq!(data.pending_pings.len(), 1);
assert_eq!(data.deletion_request_pings.len(), 0);
let ping = &data.pending_pings[0].1;
let request_ping_type = ping.1.split('/').nth(3).unwrap();
assert_eq!(request_ping_type, "test");
assert!(!not_uuid_path.exists());
}
#[test]
fn wrongly_formatted_files_are_deleted_and_ignored() {
let (mut glean, dir) = new_glean(None);
let ping_type = PingType::new("test", true, true, vec![]);
glean.register_ping_type(&ping_type);
ping_type.submit_sync(&glean, None);
let directory_manager = PingDirectoryManager::new(&dir.path());
let wrong_contents_file_path = dir
.path()
.join(PENDING_PINGS_DIRECTORY)
.join(Uuid::new_v4().to_string());
File::create(&wrong_contents_file_path).unwrap();
let data = directory_manager.process_dirs();
assert_eq!(data.pending_pings.len(), 1);
assert_eq!(data.deletion_request_pings.len(), 0);
let ping = &data.pending_pings[0].1;
let request_ping_type = ping.1.split('/').nth(3).unwrap();
assert_eq!(request_ping_type, "test");
assert!(!wrong_contents_file_path.exists());
}
#[test]
fn takes_deletion_request_pings_into_account_while_processing() {
let (glean, dir) = new_glean(None);
glean
.internal_pings
.deletion_request
.submit_sync(&glean, None);
let directory_manager = PingDirectoryManager::new(dir.path());
let data = directory_manager.process_dirs();
assert_eq!(data.pending_pings.len(), 0);
assert_eq!(data.deletion_request_pings.len(), 1);
let ping = &data.deletion_request_pings[0].1;
let request_ping_type = ping.1.split('/').nth(3).unwrap();
assert_eq!(request_ping_type, "deletion-request");
}
}