use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::LazyLock;
use dashmap::DashMap;
const TTFT_BUCKET_LIMITS_MS: [u64; 8] = [100, 250, 500, 1000, 2000, 5000, 10000, 30000];
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
struct ChunkKey {
provider: String,
kind: String,
}
struct Histogram {
buckets: [AtomicU64; 8],
count: AtomicU64,
sum_ms: AtomicU64,
}
impl Histogram {
fn new() -> Self {
Self {
buckets: std::array::from_fn(|_| AtomicU64::new(0)),
count: AtomicU64::new(0),
sum_ms: AtomicU64::new(0),
}
}
fn observe(&self, duration_ms: u64) {
self.count.fetch_add(1, Ordering::Relaxed);
self.sum_ms.fetch_add(duration_ms, Ordering::Relaxed);
for (idx, upper) in TTFT_BUCKET_LIMITS_MS.iter().enumerate() {
if duration_ms <= *upper {
self.buckets[idx].fetch_add(1, Ordering::Relaxed);
}
}
}
}
static STREAM_TTFT: LazyLock<DashMap<String, Histogram>> = LazyLock::new(DashMap::new);
static STREAM_CHUNKS: LazyLock<DashMap<ChunkKey, AtomicU64>> = LazyLock::new(DashMap::new);
pub fn observe_stream_ttft_ms(provider: &str, duration_ms: u64) {
STREAM_TTFT
.entry(provider.to_string())
.or_insert_with(Histogram::new)
.observe(duration_ms);
}
pub fn inc_stream_chunks_total(provider: &str, kind: &str) {
STREAM_CHUNKS
.entry(ChunkKey {
provider: provider.to_string(),
kind: kind.to_string(),
})
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
}
#[cfg(test)]
pub fn reset_for_test() {
STREAM_TTFT.clear();
STREAM_CHUNKS.clear();
}
fn escape(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for c in s.chars() {
match c {
'\\' => out.push_str("\\\\"),
'"' => out.push_str("\\\""),
'\n' => out.push_str("\\n"),
_ => out.push(c),
}
}
out
}
fn fmt_secs(ms: u64) -> String {
format!("{:.3}", ms as f64 / 1000.0)
}
pub fn render_prometheus() -> String {
let mut out = String::new();
out.push_str(
"# HELP agent_llm_stream_ttft_seconds Time-to-first-token for streaming responses.\n",
);
out.push_str("# TYPE agent_llm_stream_ttft_seconds histogram\n");
if STREAM_TTFT.is_empty() {
for upper in TTFT_BUCKET_LIMITS_MS.iter() {
out.push_str(&format!(
"agent_llm_stream_ttft_seconds_bucket{{provider=\"\",le=\"{}\"}} 0\n",
fmt_secs(*upper)
));
}
out.push_str("agent_llm_stream_ttft_seconds_bucket{provider=\"\",le=\"+Inf\"} 0\n");
out.push_str("agent_llm_stream_ttft_seconds_sum{provider=\"\"} 0\n");
out.push_str("agent_llm_stream_ttft_seconds_count{provider=\"\"} 0\n");
} else {
let mut providers: Vec<String> = STREAM_TTFT.iter().map(|e| e.key().clone()).collect();
providers.sort();
providers.dedup();
for provider in providers {
let Some(series) = STREAM_TTFT.get(&provider) else {
continue;
};
let provider = escape(&provider);
for (idx, upper) in TTFT_BUCKET_LIMITS_MS.iter().enumerate() {
out.push_str(&format!(
"agent_llm_stream_ttft_seconds_bucket{{provider=\"{provider}\",le=\"{}\"}} {}\n",
fmt_secs(*upper),
series.buckets[idx].load(Ordering::Relaxed)
));
}
let count = series.count.load(Ordering::Relaxed);
out.push_str(&format!(
"agent_llm_stream_ttft_seconds_bucket{{provider=\"{provider}\",le=\"+Inf\"}} {count}\n"
));
out.push_str(&format!(
"agent_llm_stream_ttft_seconds_sum{{provider=\"{provider}\"}} {}\n",
fmt_secs(series.sum_ms.load(Ordering::Relaxed))
));
out.push_str(&format!(
"agent_llm_stream_ttft_seconds_count{{provider=\"{provider}\"}} {count}\n"
));
}
}
out.push_str(
"# HELP agent_llm_stream_chunks_total Streaming chunks emitted by provider/kind.\n",
);
out.push_str("# TYPE agent_llm_stream_chunks_total counter\n");
if STREAM_CHUNKS.is_empty() {
out.push_str("agent_llm_stream_chunks_total{provider=\"\",kind=\"\"} 0\n");
} else {
let mut rows: Vec<_> = STREAM_CHUNKS
.iter()
.map(|e| (e.key().clone(), e.value().load(Ordering::Relaxed)))
.collect();
rows.sort_by(|a, b| {
(a.0.provider.clone(), a.0.kind.clone()).cmp(&(b.0.provider.clone(), b.0.kind.clone()))
});
for (key, v) in rows {
out.push_str(&format!(
"agent_llm_stream_chunks_total{{provider=\"{}\",kind=\"{}\"}} {}\n",
escape(&key.provider),
escape(&key.kind),
v
));
}
}
out
}
#[cfg(test)]
pub(crate) static TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
#[cfg(test)]
mod tests {
use super::*;
fn contains_line(haystack: &str, needle: &str) -> bool {
haystack.lines().any(|line| line == needle)
}
#[test]
fn render_stream_metrics_and_escaping() {
let _guard = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
reset_for_test();
observe_stream_ttft_ms("openai", 120);
observe_stream_ttft_ms("openai", 920);
inc_stream_chunks_total("openai", "text_delta");
inc_stream_chunks_total("openai", "usage");
inc_stream_chunks_total("prov\"x", "line\nbreak");
let body = render_prometheus();
assert!(contains_line(
&body,
"agent_llm_stream_ttft_seconds_count{provider=\"openai\"} 2"
));
assert!(contains_line(
&body,
"agent_llm_stream_chunks_total{provider=\"openai\",kind=\"text_delta\"} 1"
));
assert!(contains_line(
&body,
"agent_llm_stream_chunks_total{provider=\"openai\",kind=\"usage\"} 1"
));
assert!(contains_line(
&body,
"agent_llm_stream_chunks_total{provider=\"prov\\\"x\",kind=\"line\\nbreak\"} 1"
));
}
#[test]
fn render_empty_series_when_no_samples() {
let _guard = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
reset_for_test();
let body = render_prometheus();
assert!(contains_line(
&body,
"agent_llm_stream_ttft_seconds_count{provider=\"\"} 0"
));
assert!(contains_line(
&body,
"agent_llm_stream_chunks_total{provider=\"\",kind=\"\"} 0"
));
}
}