use std::fs::{self, create_dir_all, File};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use log::info;
use serde_json::{json, Value as JsonValue};
use crate::common_metric_data::{CommonMetricData, Lifetime};
use crate::metrics::{CounterMetric, DatetimeMetric, Metric, MetricType, PingType, TimeUnit};
use crate::storage::{StorageManager, INTERNAL_STORAGE};
use crate::upload::{HeaderMap, PingMetadata};
use crate::util::{get_iso_time_string, local_now_with_offset};
use crate::{Glean, Result, DELETION_REQUEST_PINGS_DIRECTORY, PENDING_PINGS_DIRECTORY};
/// A fully assembled ping, ready to be written to disk or handed to an uploader.
///
/// Produced by `PingMaker::collect`; string fields borrow from the `PingType`
/// and the identifiers passed to `collect`.
pub struct Ping<'a> {
    /// The unique document id of this ping.
    pub doc_id: &'a str,
    /// The ping's name (as reported by `PingType::name`).
    pub name: &'a str,
    /// The server path this ping should be uploaded to.
    pub url_path: &'a str,
    /// The JSON payload (`ping_info`/`client_info` when enabled, plus `metrics`/`events`).
    pub content: JsonValue,
    /// Extra HTTP headers to send along with this ping.
    pub headers: HeaderMap,
    /// Whether `content` contains the `ping_info` and `client_info` sections.
    pub includes_info_sections: bool,
    /// Names of other pings scheduled by this one (copied from `PingType::schedules_pings`).
    pub schedules_pings: Vec<String>,
    /// Uploader capabilities this ping requires (copied from `PingType::uploader_capabilities`).
    pub uploader_capabilities: Vec<String>,
}
/// Assembles pings from stored metrics and events, and persists them to disk.
///
/// Stateless: all data is read from and written to the `Glean` instance passed to each method.
pub struct PingMaker;
/// Recursively merges `b` into `a`.
///
/// When both values are JSON objects, every key of `b` is merged into the
/// corresponding key of `a` (missing keys are first inserted as `null`).
/// In every other case `a` is overwritten with a clone of `b`.
fn merge(a: &mut JsonValue, b: &JsonValue) {
    if let (JsonValue::Object(target), JsonValue::Object(source)) = (&mut *a, b) {
        for (key, value) in source {
            let slot = target.entry(key.clone()).or_insert(JsonValue::Null);
            merge(slot, value);
        }
        return;
    }

    // At least one side is not an object: the right-hand side wins.
    *a = b.clone();
}
impl Default for PingMaker {
fn default() -> Self {
Self::new()
}
}
impl PingMaker {
pub fn new() -> Self {
Self
}
fn get_ping_seq(&self, glean: &Glean, storage_name: &str) -> usize {
if !glean.is_ping_enabled(storage_name) {
return 0;
}
let seq = CounterMetric::new(CommonMetricData {
name: format!("{}#sequence", storage_name),
category: "".into(),
send_in_pings: vec![INTERNAL_STORAGE.into()],
lifetime: Lifetime::User,
..Default::default()
});
let current_seq = match StorageManager.snapshot_metric(
glean.storage(),
INTERNAL_STORAGE,
&seq.meta().identifier(glean),
seq.meta().inner.lifetime,
) {
Some(Metric::Counter(i)) => i,
_ => 0,
};
seq.add_sync(glean, 1);
current_seq as usize
}
fn get_start_end_times(
&self,
glean: &Glean,
storage_name: &str,
time_unit: TimeUnit,
) -> (String, String) {
let start_time = DatetimeMetric::new(
CommonMetricData {
name: format!("{}#start", storage_name),
category: "".into(),
send_in_pings: vec![INTERNAL_STORAGE.into()],
lifetime: Lifetime::User,
..Default::default()
},
time_unit,
);
let start_time_data = start_time
.get_value(glean, INTERNAL_STORAGE)
.unwrap_or_else(|| glean.start_time());
let end_time_data = local_now_with_offset();
start_time.set_sync_chrono(glean, end_time_data);
let start_time_data = get_iso_time_string(start_time_data, time_unit);
let end_time_data = get_iso_time_string(end_time_data, time_unit);
(start_time_data, end_time_data)
}
fn get_ping_info(
&self,
glean: &Glean,
storage_name: &str,
reason: Option<&str>,
precision: TimeUnit,
) -> JsonValue {
let (start_time, end_time) = self.get_start_end_times(glean, storage_name, precision);
let mut map = json!({
"seq": self.get_ping_seq(glean, storage_name),
"start_time": start_time,
"end_time": end_time,
});
if let Some(reason) = reason {
map.as_object_mut()
.unwrap() .insert("reason".to_string(), JsonValue::String(reason.to_string()));
};
if let Some(experiment_data) =
StorageManager.snapshot_experiments_as_json(glean.storage(), INTERNAL_STORAGE)
{
map.as_object_mut()
.unwrap() .insert("experiments".to_string(), experiment_data);
};
if let Some(config_json) = glean
.additional_metrics
.server_knobs_config
.get_value(glean, INTERNAL_STORAGE)
{
let server_knobs_config = serde_json::from_str(&config_json).unwrap();
map.as_object_mut()
.unwrap() .insert("server_knobs_config".to_string(), server_knobs_config);
}
map
}
fn get_client_info(&self, glean: &Glean, include_client_id: bool) -> JsonValue {
let mut map = json!({
"telemetry_sdk_build": crate::GLEAN_VERSION,
});
if let Some(client_info) =
StorageManager.snapshot_as_json(glean.storage(), "glean_client_info", true)
{
let client_info_obj = client_info.as_object().unwrap(); for (_metric_type, metrics) in client_info_obj {
merge(&mut map, metrics);
}
let map = map.as_object_mut().unwrap(); let mut attribution = serde_json::Map::new();
let mut distribution = serde_json::Map::new();
map.retain(|name, value| {
let mut split = name.split('.');
let category = split.next();
let name = split.next();
if let (Some(category), Some(name)) = (category, name) {
if category == "attribution" {
attribution.insert(name.into(), value.take());
false
} else if category == "distribution" {
distribution.insert(name.into(), value.take());
false
} else {
true
}
} else {
true
}
});
if !attribution.is_empty() {
map.insert("attribution".into(), serde_json::Value::from(attribution));
}
if !distribution.is_empty() {
map.insert("distribution".into(), serde_json::Value::from(distribution));
}
} else {
log::warn!("Empty client info data.");
}
if !include_client_id {
map.as_object_mut().unwrap().remove("client_id");
}
json!(map)
}
fn get_headers(&self, glean: &Glean) -> HeaderMap {
let mut headers_map = HeaderMap::new();
if let Some(debug_view_tag) = glean.debug_view_tag() {
headers_map.insert("X-Debug-ID".to_string(), debug_view_tag.to_string());
}
if let Some(source_tags) = glean.source_tags() {
headers_map.insert("X-Source-Tags".to_string(), source_tags.join(","));
}
headers_map
}
pub fn collect<'a>(
&self,
glean: &Glean,
ping: &'a PingType,
reason: Option<&str>,
doc_id: &'a str,
url_path: &'a str,
) -> Option<Ping<'a>> {
info!("Collecting {}", ping.name());
let database = glean.storage();
let write_samples = database.write_timings.replace(Vec::with_capacity(64));
if !write_samples.is_empty() {
glean
.database_metrics
.write_time
.accumulate_samples_sync(glean, &write_samples);
}
let mut metrics_data = StorageManager.snapshot_as_json(database, ping.name(), true);
let events_data = glean
.event_storage()
.snapshot_as_json(glean, ping.name(), true);
let uploader_capabilities = ping.uploader_capabilities();
if !uploader_capabilities.is_empty() {
if metrics_data.is_none() && (ping.send_if_empty() || events_data.is_some()) {
metrics_data = Some(json!({}))
}
if let Some(map) = metrics_data.as_mut().and_then(|o| o.as_object_mut()) {
let lists = map
.entry("string_list")
.or_insert_with(|| json!({}))
.as_object_mut()
.unwrap();
lists.insert(
"glean.ping.uploader_capabilities".to_string(),
json!(uploader_capabilities),
);
}
}
if (!ping.include_client_id() || !ping.send_if_empty() || !ping.include_info_sections())
&& glean.test_get_experimentation_id().is_some()
&& metrics_data.is_some()
{
let metrics = metrics_data.as_mut().unwrap().as_object_mut().unwrap();
let metrics_count = metrics.len();
let strings = metrics.get_mut("string").unwrap().as_object_mut().unwrap();
let string_count = strings.len();
let empty_payload = events_data.is_none() && metrics_count == 1 && string_count == 1;
if !ping.include_client_id() || (!ping.send_if_empty() && empty_payload) {
strings.remove("glean.client.annotation.experimentation_id");
}
if strings.is_empty() {
metrics.remove("string");
}
if metrics.is_empty() {
metrics_data = None;
}
}
let is_empty = metrics_data.is_none() && events_data.is_none();
if !ping.send_if_empty() && is_empty {
info!("Storage for {} empty. Bailing out.", ping.name());
return None;
} else if ping.name() == "events" && events_data.is_none() {
info!("No events for 'events' ping. Bailing out.");
return None;
} else if is_empty {
info!(
"Storage for {} empty. Ping will still be sent.",
ping.name()
);
}
let precision = if ping.precise_timestamps() {
TimeUnit::Millisecond
} else {
TimeUnit::Minute
};
let mut json = if ping.include_info_sections() {
let ping_info = self.get_ping_info(glean, ping.name(), reason, precision);
let client_info = self.get_client_info(glean, ping.include_client_id());
json!({
"ping_info": ping_info,
"client_info": client_info
})
} else {
json!({})
};
let json_obj = json.as_object_mut()?;
if let Some(metrics_data) = metrics_data {
json_obj.insert("metrics".to_string(), metrics_data);
}
if let Some(events_data) = events_data {
json_obj.insert("events".to_string(), events_data);
}
Some(Ping {
content: json,
name: ping.name(),
doc_id,
url_path,
headers: self.get_headers(glean),
includes_info_sections: ping.include_info_sections(),
schedules_pings: ping.schedules_pings().to_vec(),
uploader_capabilities: ping.uploader_capabilities().to_vec(),
})
}
fn get_pings_dir(&self, data_path: &Path, ping_type: Option<&str>) -> std::io::Result<PathBuf> {
let pings_dir = match ping_type {
Some("deletion-request") => data_path.join(DELETION_REQUEST_PINGS_DIRECTORY),
_ => data_path.join(PENDING_PINGS_DIRECTORY),
};
create_dir_all(&pings_dir)?;
Ok(pings_dir)
}
fn get_tmp_dir(&self, data_path: &Path) -> std::io::Result<PathBuf> {
let pings_dir = data_path.join("tmp");
create_dir_all(&pings_dir)?;
Ok(pings_dir)
}
pub fn store_ping(&self, data_path: &Path, ping: &Ping) -> std::io::Result<()> {
let pings_dir = self.get_pings_dir(data_path, Some(ping.name))?;
let temp_dir = self.get_tmp_dir(data_path)?;
let temp_ping_path = temp_dir.join(ping.doc_id);
let ping_path = pings_dir.join(ping.doc_id);
log::debug!(
"Storing ping '{}' at '{}'",
ping.doc_id,
ping_path.display()
);
{
let mut file = File::create(&temp_ping_path)?;
file.write_all(ping.url_path.as_bytes())?;
file.write_all(b"\n")?;
file.write_all(::serde_json::to_string(&ping.content)?.as_bytes())?;
file.write_all(b"\n")?;
let metadata = PingMetadata {
headers: Some(ping.headers.clone()),
body_has_info_sections: Some(ping.includes_info_sections),
ping_name: Some(ping.name.to_string()),
uploader_capabilities: Some(ping.uploader_capabilities.clone()),
};
file.write_all(::serde_json::to_string(&metadata)?.as_bytes())?;
}
if let Err(e) = std::fs::rename(&temp_ping_path, &ping_path) {
log::warn!(
"Unable to move '{}' to '{}",
temp_ping_path.display(),
ping_path.display()
);
return Err(e);
}
Ok(())
}
pub fn clear_pending_pings(&self, data_path: &Path, ping_names: &[&str]) -> Result<()> {
let pings_dir = self.get_pings_dir(data_path, None)?;
let entries = pings_dir.read_dir()?;
for entry in entries.filter_map(|entry| entry.ok()) {
if let Ok(file_type) = entry.file_type() {
if !file_type.is_file() {
continue;
}
} else {
continue;
}
let file = match File::open(entry.path()) {
Ok(file) => file,
Err(_) => {
continue;
}
};
let mut lines = BufReader::new(file).lines();
if let (Some(Ok(path)), Some(Ok(_body)), Ok(metadata)) =
(lines.next(), lines.next(), lines.next().transpose())
{
let PingMetadata { ping_name, .. } = metadata
.and_then(|m| crate::upload::process_metadata(&path, &m))
.unwrap_or_default();
let ping_name =
ping_name.unwrap_or_else(|| path.split('/').nth(3).unwrap_or("").into());
if ping_names.contains(&&ping_name[..]) {
_ = fs::remove_file(entry.path());
}
} else {
continue;
}
}
log::debug!("All pending pings deleted");
Ok(())
}
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::metrics::RemoteSettingsConfig;
    use crate::tests::new_glean;
    use std::collections::HashMap;

    /// A server knobs config that only enables the `test.counter` metric.
    fn counter_enabled_config() -> RemoteSettingsConfig {
        RemoteSettingsConfig {
            metrics_enabled: HashMap::from([("test.counter".to_string(), true)]),
            ..Default::default()
        }
    }

    #[test]
    fn sequence_numbers_should_be_reset_when_toggling_uploading() {
        let (mut glean, _t) = new_glean(None);
        let maker = PingMaker::new();
        let next_seq = |g: &Glean| maker.get_ping_seq(g, "store1");

        // Sequence numbers start at 0 and count up.
        assert_eq!(0, next_seq(&glean));
        assert_eq!(1, next_seq(&glean));

        // While upload is disabled the sequence number stays at 0.
        glean.set_upload_enabled(false);
        assert_eq!(0, next_seq(&glean));
        assert_eq!(0, next_seq(&glean));

        // Re-enabling upload restarts counting from 0.
        glean.set_upload_enabled(true);
        assert_eq!(0, next_seq(&glean));
        assert_eq!(1, next_seq(&glean));
    }

    #[test]
    fn test_server_knobs_config_appears_in_ping_info() {
        let (glean, _t) = new_glean(None);
        glean.apply_server_knobs_config(RemoteSettingsConfig {
            metrics_enabled: HashMap::from([("test.counter".to_string(), true)]),
            pings_enabled: HashMap::from([("custom".to_string(), false)]),
            event_threshold: Some(41),
        });

        let info = PingMaker::new().get_ping_info(&glean, "store1", None, TimeUnit::Minute);
        let knobs = &info["server_knobs_config"];

        assert_eq!(knobs["metrics_enabled"]["test.counter"], true);
        assert_eq!(knobs["pings_enabled"]["custom"], false);
        assert_eq!(knobs["event_threshold"], 41);
    }

    #[test]
    fn test_server_knobs_not_included_when_no_config() {
        let (glean, _t) = new_glean(None);

        let info = PingMaker::new().get_ping_info(&glean, "store1", None, TimeUnit::Minute);

        assert!(info.get("server_knobs_config").is_none());
    }

    #[test]
    fn test_server_knobs_appears_in_all_pings() {
        let (glean, _t) = new_glean(None);
        glean.apply_server_knobs_config(counter_enabled_config());

        let maker = PingMaker::new();
        let infos: Vec<JsonValue> = ["store1", "store2"]
            .iter()
            .map(|store| maker.get_ping_info(&glean, store, None, TimeUnit::Minute))
            .collect();

        for info in &infos {
            assert_eq!(
                info["server_knobs_config"]["metrics_enabled"]["test.counter"],
                true
            );
        }
    }

    #[test]
    fn test_server_knobs_config_omits_empty_fields() {
        let (glean, _t) = new_glean(None);
        glean.apply_server_knobs_config(counter_enabled_config());

        let info = PingMaker::new().get_ping_info(&glean, "store1", None, TimeUnit::Minute);
        let knobs = &info["server_knobs_config"];

        assert_eq!(knobs["metrics_enabled"]["test.counter"], true);
        assert!(knobs.get("pings_enabled").is_none());
        assert!(knobs.get("event_threshold").is_none());
    }
}