use std::mem;
use std::sync::Arc;
use crate::common_metric_data::{CommonMetricDataInternal, DynamicLabelType};
use crate::error_recording::{record_error, test_get_num_recorded_errors, ErrorType};
use crate::histogram::{Functional, Histogram};
use crate::metrics::memory_unit::MemoryUnit;
use crate::metrics::{DistributionData, Metric, MetricType};
use crate::storage::StorageManager;
use crate::Glean;
use crate::{CommonMetricData, TestGetValue};
const LOG_BASE: f64 = 2.0;
const BUCKETS_PER_MAGNITUDE: f64 = 16.0;
const MAX_BYTES: u64 = 1 << 40;
#[derive(Clone, Debug)]
pub struct MemoryDistributionMetric {
meta: Arc<CommonMetricDataInternal>,
memory_unit: MemoryUnit,
}
pub(crate) fn snapshot(hist: &Histogram<Functional>) -> DistributionData {
DistributionData {
values: hist
.snapshot()
.iter()
.map(|(&k, &v)| (k as i64, v as i64))
.collect(),
sum: hist.sum() as i64,
count: hist.count() as i64,
}
}
impl MetricType for MemoryDistributionMetric {
fn meta(&self) -> &CommonMetricDataInternal {
&self.meta
}
fn with_name(&self, name: String) -> Self {
let mut meta = (*self.meta).clone();
meta.inner.name = name;
Self {
meta: Arc::new(meta),
memory_unit: self.memory_unit,
}
}
fn with_dynamic_label(&self, label: DynamicLabelType) -> Self {
let mut meta = (*self.meta).clone();
meta.inner.dynamic_label = Some(label);
Self {
meta: Arc::new(meta),
memory_unit: self.memory_unit,
}
}
}
impl MemoryDistributionMetric {
pub fn new(meta: CommonMetricData, memory_unit: MemoryUnit) -> Self {
Self {
meta: Arc::new(meta.into()),
memory_unit,
}
}
pub fn accumulate(&self, sample: i64) {
let metric = self.clone();
crate::launch_with_glean(move |glean| metric.accumulate_sync(glean, sample))
}
#[doc(hidden)]
pub fn accumulate_sync(&self, glean: &Glean, sample: i64) {
if !self.should_record(glean) {
return;
}
if sample < 0 {
record_error(
glean,
&self.meta,
ErrorType::InvalidValue,
"Accumulated a negative sample",
None,
);
return;
}
let mut sample = self.memory_unit.as_bytes(sample as u64);
if sample > MAX_BYTES {
let msg = "Sample is bigger than 1 terabyte";
record_error(glean, &self.meta, ErrorType::InvalidValue, msg, None);
sample = MAX_BYTES;
}
if let Some(storage) = glean.storage_opt() {
storage.record_with(glean, &self.meta, |old_value| match old_value {
Some(Metric::MemoryDistribution(mut hist)) => {
hist.accumulate(sample);
Metric::MemoryDistribution(hist)
}
_ => {
let mut hist = Histogram::functional(LOG_BASE, BUCKETS_PER_MAGNITUDE);
hist.accumulate(sample);
Metric::MemoryDistribution(hist)
}
});
} else {
log::warn!(
"Couldn't get storage. Can't record memory distribution '{}'.",
self.meta.base_identifier()
);
}
}
pub fn accumulate_samples(&self, samples: Vec<i64>) {
let metric = self.clone();
crate::launch_with_glean(move |glean| metric.accumulate_samples_sync(glean, samples))
}
#[doc(hidden)]
pub fn accumulate_samples_sync(&self, glean: &Glean, samples: Vec<i64>) {
if !self.should_record(glean) {
return;
}
let mut num_negative_samples = 0;
let mut num_too_log_samples = 0;
glean.storage().record_with(glean, &self.meta, |old_value| {
let mut hist = match old_value {
Some(Metric::MemoryDistribution(hist)) => hist,
_ => Histogram::functional(LOG_BASE, BUCKETS_PER_MAGNITUDE),
};
for &sample in samples.iter() {
if sample < 0 {
num_negative_samples += 1;
} else {
let sample = sample as u64;
let mut sample = self.memory_unit.as_bytes(sample);
if sample > MAX_BYTES {
num_too_log_samples += 1;
sample = MAX_BYTES;
}
hist.accumulate(sample);
}
}
Metric::MemoryDistribution(hist)
});
if num_negative_samples > 0 {
let msg = format!("Accumulated {} negative samples", num_negative_samples);
record_error(
glean,
&self.meta,
ErrorType::InvalidValue,
msg,
num_negative_samples,
);
}
if num_too_log_samples > 0 {
let msg = format!(
"Accumulated {} samples larger than 1TB",
num_too_log_samples
);
record_error(
glean,
&self.meta,
ErrorType::InvalidValue,
msg,
num_too_log_samples,
);
}
}
#[doc(hidden)]
pub fn get_value<'a, S: Into<Option<&'a str>>>(
&self,
glean: &Glean,
ping_name: S,
) -> Option<DistributionData> {
let queried_ping_name = ping_name
.into()
.unwrap_or_else(|| &self.meta().inner.send_in_pings[0]);
match StorageManager.snapshot_metric(
glean.storage(),
queried_ping_name,
&self.meta.identifier(glean),
self.meta.inner.lifetime,
) {
Some(Metric::MemoryDistribution(hist)) => Some(snapshot(&hist)),
_ => None,
}
}
pub fn test_get_num_recorded_errors(&self, error: ErrorType) -> i32 {
crate::block_on_dispatcher();
crate::core::with_glean(|glean| {
test_get_num_recorded_errors(glean, self.meta(), error).unwrap_or(0)
})
}
pub fn start_buffer(&self) -> LocalMemoryDistribution<'_> {
LocalMemoryDistribution::new(self)
}
fn commit_histogram(&self, histogram: Histogram<Functional>, errors: usize) {
let metric = self.clone();
crate::launch_with_glean(move |glean| {
if errors > 0 {
let msg = format!("Accumulated {} samples larger than 1TB", errors);
record_error(
glean,
&metric.meta,
ErrorType::InvalidValue,
msg,
Some(errors as i32),
);
}
glean
.storage()
.record_with(glean, &metric.meta, move |old_value| {
let mut hist = match old_value {
Some(Metric::MemoryDistribution(hist)) => hist,
_ => Histogram::functional(LOG_BASE, BUCKETS_PER_MAGNITUDE),
};
hist.merge(&histogram);
Metric::MemoryDistribution(hist)
});
});
}
}
impl TestGetValue for MemoryDistributionMetric {
type Output = DistributionData;
fn test_get_value(&self, ping_name: Option<String>) -> Option<DistributionData> {
crate::block_on_dispatcher();
crate::core::with_glean(|glean| self.get_value(glean, ping_name.as_deref()))
}
}
#[derive(Debug)]
pub struct LocalMemoryDistribution<'a> {
histogram: Histogram<Functional>,
metric: &'a MemoryDistributionMetric,
errors: usize,
}
impl<'a> LocalMemoryDistribution<'a> {
fn new(metric: &'a MemoryDistributionMetric) -> Self {
let histogram = Histogram::functional(LOG_BASE, BUCKETS_PER_MAGNITUDE);
Self {
histogram,
metric,
errors: 0,
}
}
pub fn accumulate(&mut self, sample: u64) {
let mut sample = self.metric.memory_unit.as_bytes(sample);
if sample > MAX_BYTES {
self.errors += 1;
sample = MAX_BYTES;
}
self.histogram.accumulate(sample)
}
pub fn abandon(mut self) {
self.histogram.clear();
}
}
impl Drop for LocalMemoryDistribution<'_> {
fn drop(&mut self) {
if self.histogram.is_empty() {
return;
}
let buffer = mem::replace(&mut self.histogram, Histogram::functional(0.0, 0.0));
self.metric.commit_histogram(buffer, self.errors);
}
}