use assert2::check;
use metrique::unit_of_work::metrics;
use metrique_aggregation::aggregate;
use metrique_aggregation::aggregator::KeyedAggregator;
use metrique_aggregation::histogram::{Histogram, SortAndMerge};
use metrique_aggregation::sink::WorkerSink;
use metrique_aggregation::sink::{TeeSink, non_aggregate};
use metrique_aggregation::traits::{AggregateStrategy, Key};
use metrique_writer::test_util::test_entry_sink;
use std::borrow::Cow;
use std::time::Duration;
// Metric record for a single API call; this is the aggregation source used by the test below.
// Per the field attributes, records are keyed by `endpoint` and their `latency` values are
// aggregated into a histogram.
// NOTE(review): `ApiCallEntry`, used further down in this file, is presumably generated from
// this struct by the macros — confirm against the metrique docs.
#[aggregate(ref)]
#[metrics]
pub struct ApiCall {
// Aggregation key: records with the same endpoint are merged into one aggregated entry.
#[aggregate(key)]
endpoint: String,
// Aggregated with the `Histogram<Duration, SortAndMerge>` strategy rather than kept as a single value.
#[aggregate(strategy = Histogram<Duration, SortAndMerge>)]
latency: Duration,
}
/// Marker type for a second aggregation strategy over `ApiCallEntry`; its
/// `AggregateStrategy` impl selects `ThresholdKeyExtractor` as the key.
struct ByEndpointAndThreshold;
// Aggregation key pairing an endpoint with a flag telling whether the call's latency
// reached one second. `endpoint` is a `Cow` so the key can either borrow from the source
// entry (see `from_source`) or own its string (see `static_key`).
#[derive(Clone, Hash, PartialEq, Eq)]
#[metrics]
struct ThresholdKey<'a> {
// Endpoint name, borrowed or owned.
endpoint: Cow<'a, String>,
// True when the source latency was >= 1 second.
over_1s: bool,
}
/// Stateless `Key` implementation that derives a `ThresholdKey` from an `ApiCallEntry`.
struct ThresholdKeyExtractor;
/// Key extraction for `ApiCallEntry`: groups entries by endpoint and by whether
/// the latency reached the one-second threshold.
impl Key<ApiCallEntry> for ThresholdKeyExtractor {
    type Key<'a> = ThresholdKey<'a>;

    /// Builds a key that borrows the endpoint from `source` (no allocation).
    fn from_source(source: &ApiCallEntry) -> Self::Key<'_> {
        // NOTE(review): the lint is presumably silenced because direct access to the
        // generated entry's fields is flagged as deprecated — confirm.
        #[allow(deprecated)]
        let (endpoint, latency) = (&source.endpoint, &source.latency);
        ThresholdKey {
            endpoint: Cow::Borrowed(endpoint),
            over_1s: *latency >= Duration::from_secs(1),
        }
    }

    /// Produces a `'static` copy of `key` by cloning the endpoint string.
    fn static_key<'a>(key: &Self::Key<'a>) -> Self::Key<'static> {
        // Deref-coerces the `Cow` to `&String`, so exactly one `String` is allocated
        // whether the key was borrowed or owned.
        let endpoint = String::clone(&key.endpoint);
        ThresholdKey {
            endpoint: Cow::Owned(endpoint),
            over_1s: key.over_1s,
        }
    }

    /// Returns whether a stored owned key equals a freshly extracted borrowed key,
    /// using the derived `PartialEq` (endpoint and threshold flag both compared).
    fn static_key_matches<'a>(owned: &Self::Key<'static>, borrowed: &Self::Key<'a>) -> bool {
        owned == borrowed
    }
}
/// Strategy wiring: aggregates `ApiCallEntry` values, keyed through `ThresholdKeyExtractor`.
impl AggregateStrategy for ByEndpointAndThreshold {
    type Key = ThresholdKeyExtractor;
    type Source = ApiCallEntry;
}
/// Feeds four `ApiCall` records through nested `TeeSink`s and checks that each of the
/// three branches (keyed by endpoint, keyed by endpoint + threshold, non-aggregated)
/// receives the expected entries.
#[tokio::test]
async fn test_split_sink() {
    // One inspectable destination per branch of the tee.
    let by_endpoint_out = test_entry_sink();
    let by_threshold_out = test_entry_sink();
    let raw_out = test_entry_sink();

    // Branch 1 uses `ApiCall`'s own key (endpoint); branch 2 uses the custom
    // endpoint + over-1s key; branch 3 forwards records without aggregating.
    let by_endpoint = KeyedAggregator::<ApiCall>::new(by_endpoint_out.sink);
    let by_threshold = KeyedAggregator::<ByEndpointAndThreshold>::new(by_threshold_out.sink);
    let passthrough = non_aggregate(raw_out.sink);
    let fan_out = TeeSink::new(by_endpoint, TeeSink::new(by_threshold, passthrough));
    let sink = WorkerSink::new(fan_out, Duration::from_secs(10));

    // (endpoint, latency in ms): three calls to api1, one of them >= 1s, plus one slow api2 call.
    let calls: [(&str, u64); 4] = [("api1", 500), ("api1", 1500), ("api1", 800), ("api2", 2000)];
    for (endpoint, latency_ms) in calls {
        ApiCall {
            endpoint: endpoint.to_string(),
            latency: Duration::from_millis(latency_ms),
        }
        .close_and_merge(sink.clone());
    }
    sink.flush().await;

    // Keyed by endpoint only: one entry per endpoint, holding every latency sample.
    let endpoint_entries = by_endpoint_out.inspector.entries();
    check!(endpoint_entries.len() == 2);
    let api1 = endpoint_entries
        .iter()
        .find(|entry| entry.values["endpoint"] == "api1")
        .unwrap();
    check!(api1.metrics["latency"].distribution.len() == 3);
    let api2 = endpoint_entries
        .iter()
        .find(|entry| entry.values["endpoint"] == "api2")
        .unwrap();
    check!(api2.metrics["latency"].distribution.len() == 1);

    // Keyed by endpoint + threshold: api1 splits into under/over, api2 only has "over".
    let threshold_entries = by_threshold_out.inspector.entries();
    check!(threshold_entries.len() == 3);
    let api1_fast = threshold_entries
        .iter()
        .find(|entry| entry.values["endpoint"] == "api1" && entry.metrics["over_1s"] == false)
        .unwrap();
    check!(api1_fast.metrics["latency"].distribution.len() == 2);
    let api1_slow = threshold_entries
        .iter()
        .find(|entry| entry.values["endpoint"] == "api1" && entry.metrics["over_1s"] == true)
        .unwrap();
    check!(api1_slow.metrics["latency"].distribution.len() == 1);
    let api2_slow = threshold_entries
        .iter()
        .find(|entry| entry.values["endpoint"] == "api2" && entry.metrics["over_1s"] == true)
        .unwrap();
    check!(api2_slow.metrics["latency"].distribution.len() == 1);

    // The non-aggregating branch sees every record individually.
    check!(raw_out.inspector.entries().len() == 4);
}