use metrique::ServiceMetrics;
use metrique::emf::Emf;
use metrique::unit::Millisecond;
use metrique::unit_of_work::metrics;
use metrique::writer::Entry;
use metrique_aggregation::histogram::Histogram;
use metrique_aggregation::sink::{TeeSink, non_aggregate};
use metrique_aggregation::traits::{AggregateStrategy, Key};
use metrique_aggregation::value::Sum;
use metrique_aggregation::{aggregate, aggregator::KeyedAggregator, sink::WorkerSink};
use metrique_writer::sample::SampledFormatExt;
use metrique_writer::value::ToString;
use metrique_writer::{AttachGlobalEntrySinkExt, FormatExt, GlobalEntrySink};
use metrique_writer_core::global_entry_sink;
use std::borrow::Cow;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::info;
#[aggregate(ref)]
#[metrics(emf::dimension_sets = [["has_errors", "endpoint"], ["endpoint"]])]
struct ApiCall {
#[aggregate(key)]
endpoint: String,
#[aggregate(strategy = Sum)]
request_count: u64,
#[aggregate(strategy = Histogram<Duration>)]
#[metrics(unit = Millisecond)]
latency: Duration,
#[aggregate(strategy = Sum)]
errors: u64,
}
struct AggregateByErrorsEndpoint;
impl AggregateStrategy for AggregateByErrorsEndpoint {
type Source = ApiCallEntry;
type Key = AggregateByErrorsEndpoint;
}
#[derive(Debug, Hash, Clone, PartialEq, Eq)]
#[metrics]
pub struct ByErrorsEndpoint<'a> {
#[metrics(format = ToString)]
has_errors: bool,
endpoint: Cow<'a, str>,
}
impl Key<ApiCallEntry> for AggregateByErrorsEndpoint {
type Key<'a> = ByErrorsEndpoint<'a>;
fn from_source(source: &ApiCallEntry) -> Self::Key<'_> {
#[expect(deprecated)]
ByErrorsEndpoint {
has_errors: source.errors > 0,
endpoint: Cow::Borrowed(&source.endpoint),
}
}
fn static_key<'a>(key: &Self::Key<'a>) -> Self::Key<'static> {
ByErrorsEndpoint {
has_errors: key.has_errors,
endpoint: Cow::Owned(key.endpoint.clone().into_owned()),
}
}
fn static_key_matches<'a>(owned: &Self::Key<'static>, borrowed: &Self::Key<'a>) -> bool {
owned == borrowed
}
}
async fn make_api_call(endpoint: &str) -> Result<(), String> {
let delay = match endpoint {
"GetUser" => 15,
"UpdateUser" => 45,
"DeleteUser" => 30,
"ListUsers" => 100,
_ => 25,
};
tokio::time::sleep(Duration::from_millis(delay)).await;
if endpoint == "DeleteUser" && rand::random::<f32>() < 0.2 {
Err("Permission denied".to_string())
} else {
Ok(())
}
}
async fn api_service(mut requests: mpsc::Receiver<String>) {
let aggregate_by_endpoint = KeyedAggregator::<ApiCall>::new(ServiceMetrics::sink());
let aggregate_by_endoint_errors =
KeyedAggregator::<AggregateByErrorsEndpoint>::new(ServiceMetrics::sink());
let destination = TeeSink::new(
aggregate_by_endpoint,
TeeSink::new(
aggregate_by_endoint_errors,
non_aggregate(SampledMetrics::sink()),
),
);
let sink = WorkerSink::new(destination, Duration::from_millis(5000));
info!("API service started. Processing requests...\n");
while let Some(endpoint) = requests.recv().await {
let start = std::time::Instant::now();
let result = make_api_call(&endpoint).await;
let latency = start.elapsed();
ApiCall {
endpoint: endpoint.clone(),
request_count: 1,
latency,
errors: if result.is_err() { 1 } else { 0 },
}
.close_and_merge(sink.clone());
}
info!("\nFlushing metrics...");
sink.flush().await;
}
global_entry_sink! { SampledMetrics }
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
#[derive(Entry)]
struct Globals {
aggregated: &'static str,
}
let _handle = ServiceMetrics::attach_to_stream(
Emf::builder(
"RequestMetrics".to_string(),
vec![vec!["aggregated".to_string()]],
)
.skip_all_validations(true)
.build()
.merge_globals(Globals { aggregated: "true" })
.output_to_makewriter(|| std::io::stdout().lock()),
);
let sampled_stream = Emf::builder(
"SampledRequestMetrics".to_string(),
vec![vec!["aggregated".to_string()]],
)
.skip_all_validations(true)
.build()
.with_sampling()
.sample_by_fixed_fraction(0.01) .merge_globals(Globals {
aggregated: "false",
})
.output_to_makewriter(|| std::io::stdout().lock());
let _handle = SampledMetrics::attach_to_stream(sampled_stream);
let (tx, rx) = mpsc::channel(100);
let service = tokio::spawn(api_service(rx));
let requests = vec![
"GetUser",
"GetUser",
"GetUser",
"UpdateUser",
"UpdateUser",
"DeleteUser",
"DeleteUser",
"DeleteUser",
"ListUsers",
"GetUser",
"UpdateUser",
"DeleteUser",
];
for _i in 0..100 {
for endpoint in &requests {
tx.send(endpoint.to_string()).await.unwrap();
}
}
drop(tx);
service.await.unwrap();
}