use std::collections::HashMap;
use std::fs;
use std::fs::{create_dir_all, File, OpenOptions};
use std::io::BufRead;
use std::io::BufReader;
use std::io::Write;
use std::iter::FromIterator;
use std::path::{Path, PathBuf};
use std::sync::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};
use crate::CommonMetricData;
use crate::Glean;
use crate::Result;
/// A single recorded event.
///
/// Kept in memory per ping store and persisted to disk as one JSON object
/// per line, so its serialized form must stay stable across versions
/// (see the `stable_serialization` test).
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct RecordedEvent {
// When the event was recorded. Serialized relative to the first event in
// the store via `serialize_relative`.
pub timestamp: u64,
// The event's category.
pub category: String,
// The event's name.
pub name: String,
// Optional extra key/value data; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<HashMap<String, String>>,
}
impl RecordedEvent {
    /// Serialize this event to JSON with its timestamp expressed relative
    /// to `timestamp_offset` (normally the timestamp of the first event in
    /// the store).
    ///
    /// Uses a saturating subtraction: if the offset ever exceeds this
    /// event's own timestamp the relative value clamps to 0 instead of
    /// panicking in debug builds (or wrapping to a huge `u64` in release).
    fn serialize_relative(&self, timestamp_offset: u64) -> JsonValue {
        json!(&RecordedEvent {
            timestamp: self.timestamp.saturating_sub(timestamp_offset),
            category: self.category.clone(),
            name: self.name.clone(),
            extra: self.extra.clone(),
        })
    }
}
/// A database of recorded events, one store per ping.
///
/// Events live in memory in `event_stores` and are mirrored to disk: one
/// file per store under `path`, one JSON-serialized event per line.
#[derive(Debug)]
pub struct EventDatabase {
// Directory holding the per-store event files (`<data_path>/events`).
pub path: PathBuf,
// In-memory events, keyed by store (ping) name.
event_stores: RwLock<HashMap<String, Vec<RecordedEvent>>>,
// Guards access to the on-disk event files.
file_lock: RwLock<()>,
}
impl EventDatabase {
/// Create a new event database rooted at `data_path`.
///
/// Ensures the `events` subdirectory exists, creating it if needed.
/// Starts with empty in-memory stores; call `load_events_from_disk`
/// (via `flush_pending_events_on_startup`) to pick up persisted events.
pub fn new(data_path: &str) -> Result<Self> {
    let events_dir = Path::new(data_path).join("events");
    create_dir_all(&events_dir)?;
    Ok(Self {
        path: events_dir,
        event_stores: RwLock::new(HashMap::new()),
        file_lock: RwLock::new(()),
    })
}
/// Load events persisted by a previous run and submit the pings they
/// belong to, with reason "startup".
///
/// Returns `true` if at least one ping was submitted.
pub fn flush_pending_events_on_startup(&self, glean: &Glean) -> bool {
    if let Err(err) = self.load_events_from_disk() {
        log::warn!("Error loading events from disk: {}", err);
        return false;
    }
    self.send_all_events(glean)
}
/// Populate the in-memory stores from the per-store files on disk.
///
/// Each regular file under `self.path` becomes one store; each line is
/// parsed as one event. Unreadable or unparseable lines (e.g. from a
/// truncated write) are silently skipped.
fn load_events_from_disk(&self) -> Result<()> {
    let mut stores = self.event_stores.write().unwrap();
    let _lock = self.file_lock.read().unwrap();
    for entry in fs::read_dir(&self.path)? {
        let entry = entry?;
        if !entry.file_type()?.is_file() {
            continue;
        }
        let store_name = entry.file_name().into_string()?;
        let reader = BufReader::new(File::open(entry.path())?);
        let events: Vec<RecordedEvent> = reader
            .lines()
            .filter_map(|line| line.ok())
            .filter_map(|line| serde_json::from_str(&line).ok())
            .collect();
        stores.insert(store_name, events);
    }
    Ok(())
}
/// Submit one ping per store currently held in memory, with reason
/// "startup". Failures are logged per store and do not abort the loop.
///
/// Returns `true` if at least one submission succeeded.
fn send_all_events(&self, glean: &Glean) -> bool {
    // The read guard is a temporary here, so the lock is released before
    // any ping is submitted.
    let store_names: Vec<String> = self.event_stores.read().unwrap().keys().cloned().collect();
    let mut ping_sent = false;
    for store_name in &store_names {
        match glean.submit_ping_by_name(store_name, Some("startup")) {
            Ok(_) => ping_sent = true,
            Err(err) => log::warn!(
                "Error flushing existing events to the '{}' ping: {}",
                store_name,
                err
            ),
        }
    }
    ping_sent
}
/// Record a new event.
///
/// Does nothing when upload is disabled. Otherwise the event is appended
/// to the in-memory store and persisted to disk for every ping listed in
/// `meta.send_in_pings`. A store that reaches the configured maximum
/// number of events has its ping submitted with reason "max_capacity".
///
/// # Arguments
///
/// * `glean` - the Glean instance (upload flag, max-events, submission).
/// * `meta` - metric metadata (category, name, target pings).
/// * `timestamp` - the event timestamp to record, as provided by the caller.
/// * `extra` - optional extra key/value data.
pub fn record(
    &self,
    glean: &Glean,
    meta: &CommonMetricData,
    timestamp: u64,
    extra: Option<HashMap<String, String>>,
) {
    // Upload disabled: drop the event entirely (neither memory nor disk).
    if !glean.is_upload_enabled() {
        return;
    }
    let event = RecordedEvent {
        timestamp,
        category: meta.category.to_string(),
        name: meta.name.to_string(),
        extra,
    };
    let event_json = serde_json::to_string(&event).unwrap();
    let mut stores_to_submit: Vec<&str> = Vec::new();
    {
        let mut db = self.event_stores.write().unwrap();
        for store_name in meta.send_in_pings.iter() {
            let store = db.entry(store_name.to_string()).or_insert_with(Vec::new);
            store.push(event.clone());
            self.write_event_to_disk(store_name, &event_json);
            // `>=` rather than `==`: a store can already be over capacity
            // (e.g. more events than the limit were loaded from disk and
            // the startup submission failed); `==` would then never fire.
            if store.len() >= glean.get_max_events() {
                stores_to_submit.push(store_name);
            }
        }
    }
    // Submit only after the event-store write lock above has been
    // released; submission may need to read the stores again.
    for store_name in stores_to_submit {
        if let Err(err) = glean.submit_ping_by_name(store_name, Some("max_capacity")) {
            log::warn!(
                "Got more than {} events, but could not send {} ping: {}",
                glean.get_max_events(),
                store_name,
                err
            );
        }
    }
}
/// Append one JSON-serialized event as a line to the store's file,
/// creating the file if necessary.
///
/// I/O errors are logged and swallowed; persistence is best-effort.
fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
    let _lock = self.file_lock.write().unwrap();
    let write_result = OpenOptions::new()
        .create(true)
        .append(true)
        .open(self.path.join(store_name))
        .and_then(|mut file| writeln!(file, "{}", event_json));
    if let Err(err) = write_result {
        log::warn!("IO error writing event to store '{}': {}", store_name, err);
    }
}
pub fn snapshot_as_json(&self, store_name: &str, clear_store: bool) -> Option<JsonValue> {
let result = {
let mut db = self.event_stores.write().unwrap();
db.get_mut(&store_name.to_string()).and_then(|store| {
if !store.is_empty() {
store.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
let first_timestamp = store[0].timestamp;
Some(JsonValue::from_iter(
store.iter().map(|e| e.serialize_relative(first_timestamp)),
))
} else {
log::warn!("Unexpectly got empty event store for '{}'", store_name);
None
}
})
};
if clear_store {
self.event_stores
.write()
.unwrap()
.remove(&store_name.to_string());
let _lock = self.file_lock.write().unwrap();
if let Err(err) = fs::remove_file(self.path.join(store_name)) {
match err.kind() {
std::io::ErrorKind::NotFound => {
}
_ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
}
}
}
result
}
pub fn clear_all(&self) -> Result<()> {
self.event_stores.write().unwrap().clear();
let _lock = self.file_lock.write().unwrap();
std::fs::remove_dir_all(&self.path)?;
create_dir_all(&self.path)?;
Ok(())
}
/// **Test-only API (exported for FFI purposes).**
///
/// Return whether any event matching `meta`'s category and name has been
/// recorded into `store_name`.
pub fn test_has_value<'a>(&'a self, meta: &'a CommonMetricData, store_name: &str) -> bool {
    self.event_stores
        .read()
        .unwrap()
        // `String: Borrow<str>` lets us look up with the `&str` directly,
        // avoiding the `to_string()` allocation the old code made per call.
        .get(store_name)
        .into_iter()
        .flatten()
        .any(|event| event.name == meta.name && event.category == meta.category)
}
/// **Test-only API (exported for FFI purposes).**
///
/// Return all recorded events in `store_name` matching `meta`'s category
/// and name, or `None` when there are no matches.
pub fn test_get_value<'a>(
    &'a self,
    meta: &'a CommonMetricData,
    store_name: &str,
) -> Option<Vec<RecordedEvent>> {
    let matches: Vec<RecordedEvent> = self
        .event_stores
        .read()
        .unwrap()
        // Look up with the `&str` directly (via `Borrow<str>`) instead of
        // allocating a temporary `String` for every query.
        .get(store_name)
        .into_iter()
        .flatten()
        .filter(|event| event.name == meta.name && event.category == meta.category)
        .cloned()
        .collect();
    if matches.is_empty() {
        None
    } else {
        Some(matches)
    }
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::tests::new_glean;
use crate::CommonMetricData;
/// Unparseable lines in an on-disk store (e.g. from a truncated write)
/// must be skipped on load; only complete JSON events survive.
#[test]
fn handle_truncated_events_on_disk() {
let t = tempfile::tempdir().unwrap();
{
// First "run": persist two truncated lines and one complete event.
let db = EventDatabase::new(&t.path().display().to_string()).unwrap();
db.write_event_to_disk("events", "{\"timestamp\": 500");
db.write_event_to_disk("events", "{\"timestamp\"");
db.write_event_to_disk(
"events",
"{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
);
}
{
// Second "run": reload from the same directory; only the complete
// event should remain.
let db = EventDatabase::new(&t.path().display().to_string()).unwrap();
db.load_events_from_disk().unwrap();
let events = &db.event_stores.read().unwrap()["events"];
assert_eq!(1, events.len());
}
}
/// An event round-trips through JSON serialization unchanged, both with
/// and without `extra` data.
#[test]
fn stable_serialization() {
let event_empty = RecordedEvent {
timestamp: 2,
category: "cat".to_string(),
name: "name".to_string(),
extra: None,
};
let mut data = HashMap::new();
data.insert("a key".to_string(), "a value".to_string());
let event_data = RecordedEvent {
timestamp: 2,
category: "cat".to_string(),
name: "name".to_string(),
extra: Some(data),
};
let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
assert_eq!(
event_empty,
serde_json::from_str(&event_empty_json).unwrap()
);
assert_eq!(event_data, serde_json::from_str(&event_data_json).unwrap());
}
/// JSON written by earlier versions (fixed literal strings here) must
/// keep deserializing into the current `RecordedEvent` shape.
#[test]
fn deserialize_existing_data() {
// Persisted form without the optional "extra" field.
let event_empty_json = r#"
{
"timestamp": 2,
"category": "cat",
"name": "name"
}
"#;
// Persisted form including "extra" key/value data.
let event_data_json = r#"
{
"timestamp": 2,
"category": "cat",
"name": "name",
"extra": {
"a key": "a value"
}
}
"#;
let event_empty = RecordedEvent {
timestamp: 2,
category: "cat".to_string(),
name: "name".to_string(),
extra: None,
};
let mut data = HashMap::new();
data.insert("a key".to_string(), "a value".to_string());
let event_data = RecordedEvent {
timestamp: 2,
category: "cat".to_string(),
name: "name".to_string(),
extra: Some(data),
};
assert_eq!(
event_empty,
serde_json::from_str(&event_empty_json).unwrap()
);
assert_eq!(event_data, serde_json::from_str(&event_data_json).unwrap());
}
/// `record` is a no-op once upload is disabled: the store keeps only the
/// event recorded while upload was still enabled.
#[test]
fn doesnt_record_when_upload_is_disabled() {
let (mut glean, dir) = new_glean(None);
let db = EventDatabase::new(dir.path().to_str().unwrap()).unwrap();
let test_storage = "test-storage";
let test_category = "category";
let test_name = "name";
let test_timestamp = 2;
let test_meta = CommonMetricData::new(test_category, test_name, test_storage);
let event_data = RecordedEvent {
timestamp: test_timestamp,
category: test_category.to_string(),
name: test_name.to_string(),
extra: None,
};
// Upload enabled: the event is recorded.
db.record(&glean, &test_meta, 2, None);
{
let event_stores = db.event_stores.read().unwrap();
assert_eq!(&event_data, &event_stores.get(test_storage).unwrap()[0]);
assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
}
// Upload disabled: recording again must not add a second event.
glean.set_upload_enabled(false);
db.record(&glean, &test_meta, 2, None);
{
let event_stores = db.event_stores.read().unwrap();
assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
}
}
}