#![allow(clippy::float_cmp)]
pub use super::stats::*;
use slog::o;
#[allow(unused_imports)] use std::io::BufRead;
use std::sync::Mutex;
pub type Buffer = iobuffer::IoBuffer;
/// Build a root logger whose output is captured as JSON lines in `data`.
///
/// The JSON drain is wrapped in a `Mutex` so it satisfies slog's `Sync`
/// drain requirement, and fused so any drain error panics the test
/// instead of being silently dropped.
pub fn new_test_logger(data: Buffer) -> slog::Logger {
    let json_drain = slog_json::Json::default(data);
    let synced = Mutex::new(json_drain);
    slog::Logger::root(slog::Fuse::new(synced), o!())
}
pub fn read_json_values(data: &mut Buffer) -> Vec<serde_json::Value> {
let iter = data.lines().map(move |line| {
serde_json::from_slice::<serde_json::Value>(&line).expect("JSON parse error")
});
iter.collect()
}
/// True when `log` carries a string `log_id` field in the half-open
/// range [`min_id`, `max_id`). Logs without a string `log_id` never match.
pub fn log_in_range(min_id: &str, max_id: &str, log: &serde_json::Value) -> bool {
    log["log_id"]
        .as_str()
        .map_or(false, |log_id| min_id <= log_id && log_id < max_id)
}
/// Read all JSON logs from `data`, keep those whose `log_id` falls in
/// [`min_id`, `max_id`), and return them sorted by `log_id`.
pub fn logs_in_range(min_id: &str, max_id: &str, data: &mut Buffer) -> Vec<serde_json::Value> {
    let mut matching: Vec<_> = read_json_values(data)
        .into_iter()
        .filter(|log| log_in_range(min_id, max_id, log))
        .collect();
    matching.sort_by_key(|log| log["log_id"].as_str().map(String::from));
    matching
}
/// Assert that `actual` contains at least the structure of `expected`.
///
/// Objects and arrays are walked recursively, checking only the keys and
/// indices present in `expected` — extra fields in `actual` are ignored.
/// On the first leaf mismatch this panics, reporting the JSON path of the
/// offending value plus the full expected and actual documents.
pub fn assert_json_matches(actual: &serde_json::Value, expected: &serde_json::Value) {
    // `left`/`right` are the sub-values currently being compared; `actual`
    // and `expected` are carried along only for the panic message.
    fn check(
        actual: &serde_json::Value,
        expected: &serde_json::Value,
        left: &serde_json::Value,
        right: &serde_json::Value,
        path: &str,
    ) {
        match (left, right) {
            (serde_json::Value::Object(_), serde_json::Value::Object(want)) => {
                for (key, value) in want {
                    let child = format!("{}.{}", path, key);
                    // Indexing a Value with a missing key yields Null,
                    // which then fails the leaf comparison below.
                    check(actual, expected, &left[key.as_str()], value, &child);
                }
            }
            (serde_json::Value::Array(_), serde_json::Value::Array(want)) => {
                for (index, value) in want.iter().enumerate() {
                    let child = format!("{}.{}", path, index);
                    check(actual, expected, &left[index], value, &child);
                }
            }
            // Leaf (or type mismatch): values must be exactly equal.
            _ => assert!(
                left == right,
                "Mismatch at {}:\nexpected:\n{}\nbut found:\n{}",
                path,
                expected,
                actual
            ),
        }
    }
    check(actual, expected, actual, expected, "");
}
pub static TEST_LOG_INTERVAL: u64 = 5;
/// Build a test `StatisticsLogger` that flushes stats on a periodic
/// interval (`interval_logging` feature enabled).
#[cfg(feature = "interval_logging")]
fn new_stats_logger(stats: StatDefinitions, logger: slog::Logger) -> StatisticsLogger {
    StatsLoggerBuilder::default()
        .with_stats(vec![stats])
        .fuse_with_log_interval::<DefaultStatisticsLogFormatter>(TEST_LOG_INTERVAL, logger)
}
/// Build a test `StatisticsLogger` without interval logging
/// (`interval_logging` feature disabled).
#[cfg(not(feature = "interval_logging"))]
fn new_stats_logger(stats: StatDefinitions, logger: slog::Logger) -> StatisticsLogger {
    StatsLoggerBuilder::default()
        .with_stats(vec![stats])
        .fuse(logger)
}
pub fn create_logger_buffer(stats: StatDefinitions) -> (StatisticsLogger, Buffer) {
let data = iobuffer::IoBuffer::new();
let logger = new_test_logger(data.clone());
let stats_logger = new_stats_logger(stats, logger);
(stats_logger, data)
}
/// One stat expected to appear in the captured log output; consumed by
/// `check_expected_stats`.
pub struct ExpectedStat {
    // Expected value of the log's "name" field.
    pub stat_name: &'static str,
    // Expected "tags" field; `None` skips the tag comparison entirely.
    pub tag: Option<&'static str>,
    // Expected numeric "value" field.
    pub value: f64,
    // Expected "metric_type" field.
    pub metric_type: &'static str,
}
/// Check that every log in `logs` matches exactly one entry of
/// `expected_stats`, consuming expectations as they match (so duplicate
/// logs require duplicate expectations).
///
/// A log matches an expectation on stat name, plus tags when the
/// expectation specifies them. Once matched, the log's `metric_type`
/// and `value` must also agree with the expectation.
///
/// # Panics
///
/// Panics if a log matches no remaining expectation, if a matched log's
/// `metric_type` or `value` disagrees, or if any expectations are left
/// unmatched at the end.
pub fn check_expected_stats(logs: &[serde_json::Value], mut expected_stats: Vec<ExpectedStat>) {
    for log in logs {
        // First remaining expectation whose name (and tag, if given) match.
        let matched = expected_stats.iter().position(|exp| {
            log["name"] == exp.stat_name && (exp.tag.is_none() || log["tags"] == exp.tag.unwrap())
        });
        assert!(matched.is_some(), "no expected stat matches log: {}", log);
        let exp = expected_stats.remove(matched.unwrap());
        // Bug fix: compare THIS log's metric_type — the original compared
        // logs[0]["metric_type"] for every log in the slice.
        assert_eq!(log["metric_type"], exp.metric_type);
        assert_eq!(log["value"].as_f64(), Some(exp.value));
    }
    assert_eq!(expected_stats.len(), 0);
}
/// Expected contents of one stat snapshot; consumed by
/// `check_expected_stat_snapshots`.
#[derive(Debug)]
pub struct ExpectedStatSnapshot {
    // Stat name, matched against `definition.name()`.
    pub name: &'static str,
    // Expected `definition.description()`.
    pub description: &'static str,
    // Expected `definition.stype()`.
    pub stat_type: StatType,
    // Per-group expected values for this stat.
    pub values: Vec<ExpectedStatSnapshotValue>,
    // Expected `definition.buckets()`; `None` for non-bucketed stats.
    pub buckets: Option<Buckets>,
}
/// One expected grouped value within an `ExpectedStatSnapshot`.
#[derive(Debug)]
pub struct ExpectedStatSnapshotValue {
    // Group label values identifying this entry within the stat.
    pub group_values: Vec<String>,
    // Expected numeric value for this group.
    pub value: f64,
    // For bucketed stats, the bucket this value belongs to; `None` otherwise.
    pub bucket_limit: Option<BucketLimit>,
}
/// Assert that `stats` contains exactly the snapshots described by
/// `expected_stats`: each expected stat must exist (matched by name) with
/// the expected type, description and buckets, and each expected grouped
/// value must be present with the expected numeric value. A final length
/// check ensures there are no extra, unexpected snapshots.
pub fn check_expected_stat_snapshots(
    stats: &[StatSnapshot],
    expected_stats: &[ExpectedStatSnapshot],
) {
    for stat in expected_stats {
        // Locate the snapshot with this stat's name.
        let found_stat = stats.iter().find(|s| s.definition.name() == stat.name);
        assert!(found_stat.is_some(), "Failed to find stat {}", stat.name);
        let found_stat = found_stat.unwrap();
        // Definition metadata must match exactly.
        assert_eq!(found_stat.definition.stype(), stat.stat_type);
        assert_eq!(found_stat.definition.description(), stat.description);
        assert_eq!(found_stat.definition.buckets(), stat.buckets);
        match found_stat.values {
            // Counters and gauges: values keyed by group labels only.
            StatSnapshotValues::Counter(ref vals) | StatSnapshotValues::Gauge(ref vals) => {
                for value in &stat.values {
                    let found_value = vals
                        .iter()
                        .find(|val| val.group_values == value.group_values);
                    assert!(
                        found_value.is_some(),
                        "Failed to find value with groups {:?} and bucket_limit {:?} for stat {}",
                        value.group_values, value.bucket_limit, stat.name );
                    let found_value = found_value.unwrap();
                    assert_eq!(found_value.group_values, value.group_values);
                    assert_eq!(found_value.value, value.value);
                }
            }
            // Bucket counters: values keyed by group labels AND bucket limit.
            StatSnapshotValues::BucketCounter(ref buckets, ref vals) => {
                assert_eq!(Some(buckets), stat.buckets.as_ref());
                for value in &stat.values {
                    let found_value = vals.iter().find(|(val, bucket)| {
                        val.group_values == value.group_values
                            && Some(bucket) == value.bucket_limit.as_ref()
                    });
                    assert!(
                        found_value.is_some(),
                        "Failed to find value with groups {:?} and bucket_limit {:?} for stat {}",
                        value.group_values, value.bucket_limit, stat.name );
                    let (found_value, _) = found_value.unwrap();
                    assert_eq!(found_value.group_values, value.group_values);
                    assert_eq!(found_value.value, value.value);
                }
            }
        }
    }
    // No unexpected snapshots: every snapshot was matched above.
    assert_eq!(stats.len(), expected_stats.len());
}
#[cfg(test)]
mod tests {
    use super::*;
    use slog::debug;
    use std::sync::mpsc;
    use std::thread;
    /// Read the log buffer while a writer thread is still emitting records,
    /// exercising the read path against in-flight (possibly partial) writes.
    /// The test passes if `read_json_values` parses cleanly under the race;
    /// there is no assertion on the returned values themselves.
    #[test]
    fn test_partial_write() {
        let mut data = iobuffer::IoBuffer::new();
        let logger = new_test_logger(data.clone());
        // `started` signals that the writer is running; `done` tells it to stop.
        let (started_send, started_recv) = mpsc::channel();
        let (done_send, done_recv) = mpsc::channel();
        let _ = thread::spawn(move || {
            started_send.send(()).unwrap();
            // Log repeatedly until told to stop, so writes overlap the read below.
            while done_recv.try_recv().is_err() {
                debug!(logger, "Some data";
                "alfa" => "alpha",
                "bravo" => "beta",
                "charlie" => "gamma",
                "delta" => "delta",
                "echo" => "epsilon");
            }
        });
        started_recv.recv().unwrap();
        // Read while the writer is active; panics inside read_json_values
        // (e.g. on a torn/partial line) would fail the test.
        let _ = read_json_values(&mut data);
        done_send.send(()).unwrap();
    }
}