use metrique::DefaultSink;
use metrique::emf::Emf;
use metrique::unit::Millisecond;
use metrique::unit_of_work::metrics;
use metrique::writer::BoxEntrySink;
use metrique::writer::{FormatExt, sink::FlushImmediatelyBuilder};
use metrique_aggregation::aggregate;
use metrique_aggregation::aggregator::Aggregate;
use metrique_aggregation::histogram::Histogram;
use metrique_aggregation::value::{KeepLast, Sum};
use metrique_writer::{AttachGlobalEntrySinkExt, GlobalEntrySink};
use metrique_writer_core::global_entry_sink;
use std::sync::Arc;
use std::time::Duration;
#[aggregate(ref)]
#[metrics]
struct BackendCall {
#[aggregate(strategy = Sum)]
requests_made: usize,
#[aggregate(strategy = Histogram<Duration>)]
#[metrics(unit = Millisecond)]
latency: Duration,
#[aggregate(strategy = Sum)]
errors: u64,
#[aggregate(strategy = KeepLast, clone)]
error_message: Option<String>,
#[aggregate(ignore)]
query_id: Arc<String>,
}
#[metrics(rename_all = "PascalCase", emf::dimension_sets = [["QueryId"]])]
struct DistributedQuery {
#[metrics(no_close)]
query_id: Arc<String>,
#[metrics(flatten)]
backend_calls: Aggregate<BackendCall>,
}
async fn call_backend(shard: &str, _query: &str) -> Result<String, String> {
let delay = match shard {
"shard1" => 45,
"shard2" => 67,
"shard3" => 52,
"shard4" => 71,
"shard5" => 58,
_ => 50,
};
tokio::time::sleep(Duration::from_millis(delay)).await;
if shard == "shard3" {
Err("Connection timeout".to_string())
} else {
Ok(format!("Results from {}", shard))
}
}
async fn execute_distributed_query(query: &str, sink: BoxEntrySink) {
let mut metrics = DistributedQuery {
query_id: Arc::new(uuid::Uuid::new_v4().to_string()),
backend_calls: Aggregate::default(),
}
.append_on_drop(sink);
let query_id = metrics.query_id.clone();
let sampled_calls = SampledApiCalls::sink();
for shard in 0..100 {
let start = std::time::Instant::now();
let result = call_backend(&format!("shard{shard}"), query).await;
let latency = start.elapsed();
let should_sample = latency > Duration::from_millis(70) || result.is_err();
let backend_call = BackendCall {
requests_made: 1,
latency,
errors: if result.is_err() { 1 } else { 0 },
error_message: result.err().map(|err| format!("{err}")),
query_id: Arc::clone(&query_id),
};
if should_sample {
metrics
.backend_calls
.insert_and_send_to(backend_call, &sampled_calls);
} else {
metrics.backend_calls.insert(backend_call);
}
}
}
global_entry_sink! { SampledApiCalls }
#[tokio::main]
async fn main() {
let emf_stream = Emf::builder("DistributedQueryMetrics".to_string(), vec![vec![]])
.build()
.output_to_makewriter(|| std::io::stdout().lock());
let emf: DefaultSink = FlushImmediatelyBuilder::new().build_boxed(emf_stream);
let sampled_stream = Emf::builder("SampledBackendCalls".to_string(), vec![vec![]])
.skip_all_validations(true)
.build()
.output_to_makewriter(|| std::io::stdout().lock());
let _handle = SampledApiCalls::attach_to_stream(sampled_stream);
for _i in 0..5 {
execute_distributed_query("SELECT * FROM users WHERE active = true", emf.clone()).await;
}
}