use std::fmt::Write;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Thread-safe `u64` event counter.
///
/// Every access uses `Ordering::Relaxed`: each operation is atomic on its own,
/// but no ordering with other memory operations is established.
#[derive(Debug, Default)]
pub struct Counter {
    value: AtomicU64,
}

impl Counter {
    /// Creates a counter that starts at zero.
    pub fn new() -> Self {
        Counter {
            value: AtomicU64::new(0),
        }
    }

    /// Adds one to the counter.
    #[inline]
    pub fn inc(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `n` to the counter. The underlying `fetch_add` wraps on overflow.
    #[inline]
    pub fn add(&self, n: u64) {
        let _previous = self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the current value.
    #[inline]
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Sets the counter back to zero.
    pub fn reset(&self) {
        self.value.store(0, Ordering::Relaxed);
    }
}
/// Thread-safe `u64` gauge that can be set, incremented and decremented.
///
/// `dec` never goes below zero; the first attempt to do so on a given
/// instance emits a single `tracing` warning.
#[derive(Debug, Default)]
pub struct Gauge {
    /// Current gauge value.
    value: AtomicU64,
    /// Set once the underflow warning has been emitted for this instance.
    underflow_warned: AtomicBool,
}

impl Gauge {
    /// Creates a gauge at zero with the underflow warning not yet emitted.
    pub fn new() -> Self {
        Gauge {
            value: AtomicU64::new(0),
            underflow_warned: AtomicBool::new(false),
        }
    }

    /// Overwrites the gauge with `value`.
    #[inline]
    pub fn set(&self, value: u64) {
        self.value.store(value, Ordering::Relaxed);
    }

    /// Returns the current value.
    #[inline]
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Adds one to the gauge.
    #[inline]
    pub fn inc(&self) {
        let _previous = self.value.fetch_add(1, Ordering::Relaxed);
    }

    /// Subtracts one from the gauge, leaving it untouched when it is already 0.
    ///
    /// The decrement is a single atomic read-modify-write, so concurrent calls
    /// cannot wrap the value around to `u64::MAX`.
    #[inline]
    pub fn dec(&self) {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // return `Err` without storing anything.
        let outcome = self
            .value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                current.checked_sub(1)
            });
        if outcome.is_ok() {
            return;
        }

        // Only the caller that flips the flag from false to true logs.
        let already_warned = self.underflow_warned.swap(true, Ordering::AcqRel);
        if !already_warned {
            tracing::warn!(
                "Gauge::dec() called when value was already 0 — possible inc/dec mismatch \
                 (this warning fires once per gauge instance)"
            );
        }
    }
}
/// Lock-free latency aggregator: count, sum, min, max and a 256-slot
/// log-scale histogram used for percentile estimates.
///
/// Every field is an independent atomic accessed with `Ordering::Relaxed`, so
/// a reader racing with `record` or `reset` may observe the fields at slightly
/// different points in time.
#[derive(Debug)]
pub struct LatencyTracker {
    /// Number of recorded samples; incremented last in `record`.
    count: AtomicU64,
    /// Sum of all samples in nanoseconds (`fetch_add` wraps on overflow).
    sum_nanos: AtomicU64,
    /// Smallest sample in nanoseconds; `u64::MAX` is the "no sample" sentinel.
    min_nanos: AtomicU64,
    /// Largest sample in nanoseconds.
    max_nanos: AtomicU64,
    /// Per-bucket sample counts, indexed by `bucket_for`.
    histogram: [AtomicU64; 256],
}

impl Default for LatencyTracker {
    fn default() -> Self {
        Self::new()
    }
}

impl LatencyTracker {
    /// Creates an empty tracker.
    pub fn new() -> Self {
        Self {
            count: AtomicU64::new(0),
            sum_nanos: AtomicU64::new(0),
            min_nanos: AtomicU64::new(u64::MAX),
            max_nanos: AtomicU64::new(0),
            histogram: std::array::from_fn(|_| AtomicU64::new(0)),
        }
    }

    /// Maps a sample (in nanoseconds) to its histogram slot.
    ///
    /// Layout: slot 0 holds exactly 0 ns, slot 1 holds 1–3 ns, and from 4 ns
    /// upward every power-of-two band `[2^i, 2^(i+1))` is split into four
    /// equal-width sub-buckets.
    #[inline]
    fn bucket_for(nanos: u64) -> usize {
        if nanos == 0 {
            return 0;
        }
        // Index of the highest set bit, i.e. floor(log2(nanos)).
        let i = (63 - nanos.leading_zeros()) as usize;
        if i < 2 {
            return 1;
        }
        // The two bits below the leading bit select the sub-bucket.
        let sub = ((nanos >> (i - 2)) & 3) as usize;
        // The largest reachable index is 249 (i = 63, sub = 3); `min` is a
        // safety clamp for the array bound.
        ((i - 2) * 4 + 2 + sub).min(255)
    }

    /// Returns the representative value (in nanoseconds) reported for a slot.
    ///
    /// For bands of width 8 ns and above this is the midpoint of the
    /// sub-bucket; the 4–7 ns band has one value per sub-bucket.
    #[inline]
    fn estimate_nanos_for_bucket(bucket: usize) -> u64 {
        match bucket {
            0 => 0,
            1 => 2,
            _ => {
                let idx = bucket - 2;
                let band = idx / 4 + 2;
                let sub = idx % 4;
                if band == 2 {
                    4u64 + sub as u64
                } else {
                    let lower = 1u64 << band;
                    let sub_width = 1u64 << (band - 2);
                    let half_sub = 1u64 << (band - 3);
                    lower + sub as u64 * sub_width + half_sub
                }
            }
        }
    }

    /// Records one latency sample.
    ///
    /// Durations longer than `u64::MAX` nanoseconds are saturated to
    /// `u64::MAX` instead of being silently truncated. Such a saturated sample
    /// equals the `min` sentinel, so on its own it does not make `min()`
    /// return `Some`.
    #[inline]
    pub fn record(&self, duration: Duration) {
        let nanos = u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX);
        self.sum_nanos.fetch_add(nanos, Ordering::Relaxed);
        self.min_nanos.fetch_min(nanos, Ordering::Relaxed);
        self.max_nanos.fetch_max(nanos, Ordering::Relaxed);
        self.histogram[Self::bucket_for(nanos)].fetch_add(1, Ordering::Relaxed);
        // Bumped last so that a non-zero count implies the other fields have
        // already been touched by this call.
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Starts timing; the elapsed time is recorded when the guard is dropped.
    #[inline]
    pub fn start(&self) -> LatencyGuard<'_> {
        LatencyGuard {
            tracker: self,
            start: Instant::now(),
        }
    }

    /// Number of samples recorded so far.
    #[inline]
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Sum of all recorded samples.
    pub fn sum(&self) -> Duration {
        Duration::from_nanos(self.sum_nanos.load(Ordering::Relaxed))
    }

    /// Smallest recorded sample, or `None` while the sentinel is still in place.
    pub fn min(&self) -> Option<Duration> {
        let min = self.min_nanos.load(Ordering::Relaxed);
        if min == u64::MAX {
            None
        } else {
            Some(Duration::from_nanos(min))
        }
    }

    /// Largest recorded sample, or `None` when the sample count is zero.
    pub fn max(&self) -> Option<Duration> {
        if self.count() == 0 {
            None
        } else {
            Some(Duration::from_nanos(self.max_nanos.load(Ordering::Relaxed)))
        }
    }

    /// Mean of the recorded samples, or `None` when the sample count is zero.
    pub fn avg(&self) -> Option<Duration> {
        let count = self.count();
        let sum = self.sum_nanos.load(Ordering::Relaxed);
        // `checked_div` returns `None` for a zero divisor.
        sum.checked_div(count).map(Duration::from_nanos)
    }

    /// Estimates the given percentile (clamped to `0.0..=100.0`) from the
    /// histogram.
    ///
    /// Returns `None` when no samples were recorded. The result is the
    /// representative value of the bucket containing the target rank, not an
    /// exact sample.
    pub fn percentile(&self, percentile: f64) -> Option<Duration> {
        let total = self.count();
        if total == 0 {
            return None;
        }
        let p = percentile.clamp(0.0, 100.0);
        // Rank of the wanted sample, at least 1 so that p = 0 picks the first
        // populated bucket.
        let target = ((p / 100.0) * total as f64).ceil().max(1.0) as u64;
        let mut cumulative: u64 = 0;
        for (i, bucket) in self.histogram.iter().enumerate() {
            cumulative += bucket.load(Ordering::Relaxed);
            if cumulative >= target {
                return Some(Duration::from_nanos(Self::estimate_nanos_for_bucket(i)));
            }
        }
        // Reached only when the histogram total is below `target`, e.g. when
        // racing with `reset`.
        self.max()
    }

    /// Captures all statistics into a plain value.
    ///
    /// Each field is read separately, so the snapshot is not atomic with
    /// respect to concurrent `record` calls.
    pub fn snapshot(&self) -> LatencySnapshot {
        LatencySnapshot {
            count: self.count(),
            sum: self.sum(),
            min: self.min(),
            max: self.max(),
            avg: self.avg(),
            p50: self.percentile(50.0),
            p95: self.percentile(95.0),
            p99: self.percentile(99.0),
        }
    }

    /// Clears every statistic back to its initial state, one field at a time.
    pub fn reset(&self) {
        self.count.store(0, Ordering::Relaxed);
        self.sum_nanos.store(0, Ordering::Relaxed);
        self.min_nanos.store(u64::MAX, Ordering::Relaxed);
        self.max_nanos.store(0, Ordering::Relaxed);
        for bucket in &self.histogram {
            bucket.store(0, Ordering::Relaxed);
        }
    }
}

/// Timer handle returned by `LatencyTracker::start`.
///
/// Dropping the guard records the time elapsed since it was created.
#[must_use = "dropping the guard immediately records a near-zero latency; bind it to a variable"]
pub struct LatencyGuard<'a> {
    tracker: &'a LatencyTracker,
    start: Instant,
}

impl Drop for LatencyGuard<'_> {
    fn drop(&mut self) {
        self.tracker.record(self.start.elapsed());
    }
}

/// Point-in-time copy of a `LatencyTracker`'s statistics.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LatencySnapshot {
    /// Number of recorded samples.
    pub count: u64,
    /// Sum of all samples.
    pub sum: Duration,
    /// Smallest sample, if any.
    pub min: Option<Duration>,
    /// Largest sample, if any.
    pub max: Option<Duration>,
    /// Mean sample, if any.
    pub avg: Option<Duration>,
    /// Histogram estimate of the 50th percentile.
    pub p50: Option<Duration>,
    /// Histogram estimate of the 95th percentile.
    pub p95: Option<Duration>,
    /// Histogram estimate of the 99th percentile.
    pub p99: Option<Duration>,
}
/// Sink that receives metric values one at a time and renders them in some
/// output format. Implemented in this file by `PrometheusExporter` and
/// `JsonExporter`.
pub trait MetricsExporter {
/// Emits a counter named `name` with description `help` and current `value`.
fn export_counter(&mut self, name: &str, help: &str, value: u64);
/// Emits a gauge named `name` with description `help` and current `value`.
fn export_gauge(&mut self, name: &str, help: &str, value: u64);
/// Emits the latency statistics in `snapshot` under `name` with description `help`.
fn export_latency(&mut self, name: &str, help: &str, snapshot: &LatencySnapshot);
}
/// A group of metrics that can push its values into any [`MetricsExporter`].
pub trait MetricsVisitable {
    /// Feeds every metric of `self` to `exporter`, naming each one with
    /// `prefix` followed by a metric-specific suffix.
    fn export_metrics(&self, prefix: &str, exporter: &mut dyn MetricsExporter);

    /// Renders the metrics through a fresh `PrometheusExporter`.
    fn to_prometheus_text(&self, prefix: &str) -> String {
        let mut sink = PrometheusExporter::new();
        self.export_metrics(prefix, &mut sink);
        sink.finish()
    }

    /// Renders the metrics through a fresh `JsonExporter`.
    fn to_json(&self, prefix: &str) -> String {
        let mut sink = JsonExporter::new();
        self.export_metrics(prefix, &mut sink);
        sink.finish()
    }
}
/// Exporter that accumulates metrics as Prometheus-style text lines.
pub struct PrometheusExporter {
    /// Text produced so far.
    output: String,
}

impl PrometheusExporter {
    /// Creates an empty exporter with a 4 KiB pre-allocated buffer.
    pub fn new() -> Self {
        let output = String::with_capacity(4096);
        PrometheusExporter { output }
    }

    /// Consumes the exporter and returns the accumulated text.
    pub fn finish(self) -> String {
        let PrometheusExporter { output } = self;
        output
    }
}

impl Default for PrometheusExporter {
    fn default() -> Self {
        PrometheusExporter::new()
    }
}
/// Rewrites `name` so it only contains ASCII letters, digits, `_` and `:`.
///
/// Every other character becomes `_`, and a leading digit is prefixed with
/// `_`. An empty input yields an empty string.
fn sanitize_prometheus_name(name: &str) -> String {
    let mut sanitized: String = name
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || matches!(c, '_' | ':') {
                c
            } else {
                '_'
            }
        })
        .collect();

    let starts_with_digit = matches!(sanitized.chars().next(), Some(c) if c.is_ascii_digit());
    if starts_with_digit {
        sanitized.insert(0, '_');
    }
    sanitized
}
/// Escapes `help` for a `# HELP` line: backslash becomes `\\` and line feed
/// becomes `\n`, as the Prometheus text format requires. Text without either
/// character is returned borrowed and unchanged.
fn escape_prometheus_help(help: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    if !help.contains(|c: char| c == '\\' || c == '\n') {
        return Cow::Borrowed(help);
    }
    let mut escaped = String::with_capacity(help.len() + 2);
    for ch in help.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    Cow::Owned(escaped)
}

/// Renders metrics as Prometheus-style text.
///
/// Metric names are passed through `sanitize_prometheus_name`; help text is
/// escaped so that an embedded newline or backslash cannot break the `# HELP`
/// line. Write errors are ignored because the target is an in-memory `String`.
impl MetricsExporter for PrometheusExporter {
    /// Writes HELP, TYPE and sample lines for `<name>_total`.
    fn export_counter(&mut self, name: &str, help: &str, value: u64) {
        let name = sanitize_prometheus_name(name);
        let help = escape_prometheus_help(help);
        let _ = writeln!(self.output, "# HELP {}_total {}", name, help);
        let _ = writeln!(self.output, "# TYPE {}_total counter", name);
        let _ = writeln!(self.output, "{}_total {}", name, value);
    }

    /// Writes HELP, TYPE and sample lines for `<name>`.
    fn export_gauge(&mut self, name: &str, help: &str, value: u64) {
        let name = sanitize_prometheus_name(name);
        let help = escape_prometheus_help(help);
        let _ = writeln!(self.output, "# HELP {} {}", name, help);
        let _ = writeln!(self.output, "# TYPE {} gauge", name);
        let _ = writeln!(self.output, "{} {}", name, value);
    }

    /// Writes `<name>_seconds` as a summary: count, sum, then (when present)
    /// min, max and the 0.5 / 0.95 / 0.99 quantiles, all in seconds with nine
    /// decimal places.
    fn export_latency(&mut self, name: &str, help: &str, snapshot: &LatencySnapshot) {
        let name = sanitize_prometheus_name(name);
        let help = escape_prometheus_help(help);
        let _ = writeln!(
            self.output,
            "# HELP {}_seconds {} (quantiles estimated from 256-sub-bucket histogram; relative error ≤12.5% per sub-bucket)",
            name, help
        );
        let _ = writeln!(self.output, "# TYPE {}_seconds summary", name);
        let _ = writeln!(self.output, "{}_seconds_count {}", name, snapshot.count);
        let _ = writeln!(
            self.output,
            "{}_seconds_sum {:.9}",
            name,
            snapshot.sum.as_secs_f64()
        );

        // Optional extremes are skipped entirely when no sample exists.
        for (suffix, bound) in [("min", snapshot.min), ("max", snapshot.max)] {
            if let Some(bound) = bound {
                let _ = writeln!(
                    self.output,
                    "{}_seconds_{} {:.9}",
                    name,
                    suffix,
                    bound.as_secs_f64()
                );
            }
        }

        let quantiles = [
            ("0.5", snapshot.p50),
            ("0.95", snapshot.p95),
            ("0.99", snapshot.p99),
        ];
        for (label, estimate) in quantiles {
            if let Some(estimate) = estimate {
                let _ = writeln!(
                    self.output,
                    "{}_seconds{{quantile=\"{}\"}} {:.9}",
                    name,
                    label,
                    estimate.as_secs_f64()
                );
            }
        }
    }
}
/// Exporter that collects each metric as a pre-rendered JSON object and joins
/// them into a JSON array on `finish`.
pub struct JsonExporter {
    /// One already-serialized JSON object per exported metric.
    entries: Vec<String>,
}

impl JsonExporter {
    /// Creates an exporter with no entries.
    pub fn new() -> Self {
        JsonExporter { entries: Vec::new() }
    }

    /// Consumes the exporter and returns `[entry,entry,...]`.
    pub fn finish(self) -> String {
        // One extra byte per entry leaves room for the separating commas.
        let reserve: usize = self.entries.iter().map(|entry| entry.len() + 1).sum();
        let mut json = String::with_capacity(reserve);
        json.push('[');
        let mut remaining = self.entries.iter();
        if let Some(first) = remaining.next() {
            json.push_str(first);
            for entry in remaining {
                json.push(',');
                json.push_str(entry);
            }
        }
        json.push(']');
        json
    }
}

impl Default for JsonExporter {
    fn default() -> Self {
        JsonExporter::new()
    }
}
/// Escapes `s` for embedding between double quotes in JSON.
///
/// `"`, `\`, line feed, carriage return and tab get their short escapes; any
/// other character for which `char::is_control` is true is written as
/// `\uXXXX`; everything else is copied through.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for ch in s.chars() {
        let short_form = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        if let Some(sequence) = short_form {
            out.push_str(sequence);
        } else if ch.is_control() {
            let _ = write!(out, "\\u{:04x}", ch as u32);
        } else {
            out.push(ch);
        }
    }
    out
}
impl MetricsExporter for JsonExporter {
fn export_counter(&mut self, name: &str, help: &str, value: u64) {
self.entries.push(format!(
"{{\"name\":\"{}\",\"type\":\"counter\",\"help\":\"{}\",\"value\":{}}}",
json_escape(name),
json_escape(help),
value,
));
}
fn export_gauge(&mut self, name: &str, help: &str, value: u64) {
self.entries.push(format!(
"{{\"name\":\"{}\",\"type\":\"gauge\",\"help\":\"{}\",\"value\":{}}}",
json_escape(name),
json_escape(help),
value,
));
}
fn export_latency(&mut self, name: &str, help: &str, snapshot: &LatencySnapshot) {
let min_str = snapshot
.min
.map(|d| format!("{:.9}", d.as_secs_f64()))
.unwrap_or_else(|| "null".to_string());
let max_str = snapshot
.max
.map(|d| format!("{:.9}", d.as_secs_f64()))
.unwrap_or_else(|| "null".to_string());
let avg_str = snapshot
.avg
.map(|d| format!("{:.9}", d.as_secs_f64()))
.unwrap_or_else(|| "null".to_string());
let p50_str = snapshot
.p50
.map(|d| format!("{:.9}", d.as_secs_f64()))
.unwrap_or_else(|| "null".to_string());
let p95_str = snapshot
.p95
.map(|d| format!("{:.9}", d.as_secs_f64()))
.unwrap_or_else(|| "null".to_string());
let p99_str = snapshot
.p99
.map(|d| format!("{:.9}", d.as_secs_f64()))
.unwrap_or_else(|| "null".to_string());
self.entries.push(format!(
"{{\"name\":\"{}\",\"type\":\"latency\",\"help\":\"{}\",\"count\":{},\"sum_seconds\":{:.9},\"min_seconds\":{},\"max_seconds\":{},\"avg_seconds\":{},\"p50_seconds\":{},\"p95_seconds\":{},\"p99_seconds\":{}}}",
json_escape(name),
json_escape(help),
snapshot.count,
snapshot.sum.as_secs_f64(),
min_str,
max_str,
avg_str,
p50_str,
p95_str,
p99_str,
));
}
}
/// Exports the producer metrics in a fixed order: five counters, two gauges,
/// the two compression counters, then the send latency.
impl MetricsVisitable for ProducerMetrics {
    fn export_metrics(&self, prefix: &str, exporter: &mut dyn MetricsExporter) {
        // Every metric name is `<prefix>_<suffix>`.
        let metric = |suffix: &str| format!("{prefix}_{suffix}");

        for (suffix, help, counter) in [
            ("records_sent", "Total number of records sent", &self.records_sent),
            ("bytes_sent", "Total bytes sent", &self.bytes_sent),
            ("batches_sent", "Total batches sent", &self.batches_sent),
            ("errors", "Total send errors", &self.errors),
            ("retries", "Total retries", &self.retries),
        ] {
            exporter.export_counter(&metric(suffix), help, counter.get());
        }

        for (suffix, help, gauge) in [
            ("connections", "Current active connections", &self.connections),
            (
                "buffered_records",
                "Producer records currently admitted under the memory budget",
                &self.buffered_records,
            ),
        ] {
            exporter.export_gauge(&metric(suffix), help, gauge.get());
        }

        for (suffix, help, counter) in [
            (
                "compressed_bytes",
                "Total compressed bytes written for compressed batches",
                &self.compressed_bytes,
            ),
            (
                "uncompressed_bytes",
                "Total uncompressed bytes for the same compressed batches",
                &self.uncompressed_bytes,
            ),
        ] {
            exporter.export_counter(&metric(suffix), help, counter.get());
        }

        let send_latency = self.send_latency.snapshot();
        exporter.export_latency(&metric("send_latency"), "Send latency", &send_latency);
    }
}
/// Exports the consumer metrics in a fixed order: nine counters, five gauges,
/// then the poll and fetch latencies.
impl MetricsVisitable for ConsumerMetrics {
    fn export_metrics(&self, prefix: &str, exporter: &mut dyn MetricsExporter) {
        // Every metric name is `<prefix>_<suffix>`.
        let metric = |suffix: &str| format!("{prefix}_{suffix}");

        for (suffix, help, counter) in [
            ("records_received", "Total records received", &self.records_received),
            ("bytes_received", "Total bytes received", &self.bytes_received),
            ("fetches", "Total fetch requests", &self.fetches),
            ("polls", "Total poll operations", &self.polls),
            ("empty_polls", "Total empty polls", &self.empty_polls),
            ("commits", "Total commit operations", &self.commits),
            ("errors", "Total errors", &self.errors),
            ("rebalances", "Total rebalances", &self.rebalances),
            (
                "seeks",
                "Total seek operations (seek + seek_many partition count)",
                &self.seeks,
            ),
        ] {
            exporter.export_counter(&metric(suffix), help, counter.get());
        }

        for (suffix, help, gauge) in [
            (
                "lag",
                "Total consumer lag across all assigned partitions",
                &self.lag,
            ),
            ("lag_max", "Maximum per-partition consumer lag", &self.lag_max),
            (
                "assigned_partitions",
                "Currently assigned partitions",
                &self.assigned_partitions,
            ),
            (
                "paused_partitions",
                "Currently paused partitions",
                &self.paused_partitions,
            ),
            (
                "buffered_records",
                "Currently buffered records in recv() buffer",
                &self.buffered_records,
            ),
        ] {
            exporter.export_gauge(&metric(suffix), help, gauge.get());
        }

        for (suffix, help, tracker) in [
            ("poll_latency", "Poll latency", &self.poll_latency),
            ("fetch_latency", "Fetch latency", &self.fetch_latency),
        ] {
            let snapshot = tracker.snapshot();
            exporter.export_latency(&metric(suffix), help, &snapshot);
        }
    }
}
/// Exports the connection metrics in a fixed order: nine counters, the
/// active-connections gauge, then the connect and TLS handshake latencies.
impl MetricsVisitable for ConnectionMetrics {
    fn export_metrics(&self, prefix: &str, exporter: &mut dyn MetricsExporter) {
        // Every metric name is `<prefix>_<suffix>`.
        let metric = |suffix: &str| format!("{prefix}_{suffix}");

        for (suffix, help, counter) in [
            (
                "connections_created",
                "Total connections created",
                &self.connections_created,
            ),
            (
                "connections_closed",
                "Total connections closed",
                &self.connections_closed,
            ),
            (
                "connection_errors",
                "Total connection errors",
                &self.connection_errors,
            ),
            (
                "high_priority_requests",
                "Total high-priority requests sent",
                &self.high_priority_requests,
            ),
            (
                "normal_priority_requests",
                "Total normal-priority requests sent",
                &self.normal_priority_requests,
            ),
            (
                "high_priority_bypasses",
                "High-priority requests processed ahead of normal-priority work",
                &self.high_priority_bypasses,
            ),
            (
                "high_priority_bypass_yields",
                "Forced normal-priority drain steps after exhausting the high-priority bypass budget",
                &self.high_priority_bypass_yields,
            ),
            (
                "throttle_delays",
                "Normal-priority requests delayed due to broker throttling",
                &self.throttle_delays,
            ),
            (
                "throttle_delay_ms",
                "Total broker-throttle delay applied to normal-priority requests in milliseconds",
                &self.throttle_delay_ms,
            ),
        ] {
            exporter.export_counter(&metric(suffix), help, counter.get());
        }

        exporter.export_gauge(
            &metric("active_connections"),
            "Current active connections",
            self.active_connections.get(),
        );

        for (suffix, help, tracker) in [
            (
                "connect_latency",
                "Connection establishment latency",
                &self.connect_latency,
            ),
            (
                "tls_handshake_latency",
                "TLS handshake latency (TLS connections only)",
                &self.tls_handshake_latency,
            ),
        ] {
            let snapshot = tracker.snapshot();
            exporter.export_latency(&metric(suffix), help, &snapshot);
        }
    }
}
/// Registry bundling the producer, consumer and connection metric groups.
///
/// Each group lives behind an `Arc`, so cloning the registry or handing out a
/// group shares the same underlying metrics.
#[derive(Debug, Clone)]
pub struct KrafkaMetrics {
    producer: Arc<ProducerMetrics>,
    consumer: Arc<ConsumerMetrics>,
    connection: Arc<ConnectionMetrics>,
}

impl Default for KrafkaMetrics {
    fn default() -> Self {
        KrafkaMetrics::new()
    }
}

impl KrafkaMetrics {
    /// Creates a registry with three fresh metric groups.
    pub fn new() -> Self {
        let producer = Arc::new(ProducerMetrics::new());
        let consumer = Arc::new(ConsumerMetrics::new());
        let connection = Arc::new(ConnectionMetrics::new());
        Self {
            producer,
            consumer,
            connection,
        }
    }

    /// Returns a shared handle to the producer metrics.
    #[must_use]
    pub fn producer_metrics(&self) -> Arc<ProducerMetrics> {
        Arc::clone(&self.producer)
    }

    /// Returns a shared handle to the consumer metrics.
    #[must_use]
    pub fn consumer_metrics(&self) -> Arc<ConsumerMetrics> {
        Arc::clone(&self.consumer)
    }

    /// Returns a shared handle to the connection metrics.
    #[must_use]
    pub fn connection_metrics(&self) -> Arc<ConnectionMetrics> {
        Arc::clone(&self.connection)
    }

    /// Exports every group using the default `krafka` prefix.
    pub fn export_all(&self, exporter: &mut dyn MetricsExporter) {
        self.export_all_with_prefix("krafka", exporter);
    }

    /// Exports producer, consumer and connection metrics (in that order) under
    /// `<prefix>_producer`, `<prefix>_consumer` and `<prefix>_connection`.
    pub fn export_all_with_prefix(&self, prefix: &str, exporter: &mut dyn MetricsExporter) {
        let producer_prefix = format!("{prefix}_producer");
        self.producer.export_metrics(&producer_prefix, exporter);

        let consumer_prefix = format!("{prefix}_consumer");
        self.consumer.export_metrics(&consumer_prefix, exporter);

        let connection_prefix = format!("{prefix}_connection");
        self.connection.export_metrics(&connection_prefix, exporter);
    }

    /// Renders every group as Prometheus text with the `krafka` prefix.
    pub fn to_prometheus_text(&self) -> String {
        self.to_prometheus_text_with_prefix("krafka")
    }

    /// Renders every group as Prometheus text with a custom prefix.
    pub fn to_prometheus_text_with_prefix(&self, prefix: &str) -> String {
        let mut sink = PrometheusExporter::new();
        self.export_all_with_prefix(prefix, &mut sink);
        sink.finish()
    }

    /// Renders every group as a JSON array with the `krafka` prefix.
    pub fn to_json(&self) -> String {
        self.to_json_with_prefix("krafka")
    }

    /// Renders every group as a JSON array with a custom prefix.
    pub fn to_json_with_prefix(&self, prefix: &str) -> String {
        let mut sink = JsonExporter::new();
        self.export_all_with_prefix(prefix, &mut sink);
        sink.finish()
    }

    /// Resets every counter and latency tracker and sets every gauge to 0.
    ///
    /// Each metric is cleared individually; the reset as a whole is not atomic.
    pub fn reset(&self) {
        let producer = self.producer.as_ref();
        for counter in [
            &producer.records_sent,
            &producer.bytes_sent,
            &producer.batches_sent,
            &producer.errors,
            &producer.retries,
            &producer.compressed_bytes,
            &producer.uncompressed_bytes,
        ] {
            counter.reset();
        }
        producer.send_latency.reset();
        for gauge in [&producer.connections, &producer.buffered_records] {
            gauge.set(0);
        }

        let consumer = self.consumer.as_ref();
        for counter in [
            &consumer.records_received,
            &consumer.bytes_received,
            &consumer.fetches,
            &consumer.polls,
            &consumer.empty_polls,
            &consumer.commits,
            &consumer.errors,
            &consumer.rebalances,
            &consumer.seeks,
        ] {
            counter.reset();
        }
        for tracker in [&consumer.poll_latency, &consumer.fetch_latency] {
            tracker.reset();
        }
        for gauge in [
            &consumer.lag,
            &consumer.lag_max,
            &consumer.assigned_partitions,
            &consumer.paused_partitions,
            &consumer.buffered_records,
        ] {
            gauge.set(0);
        }

        let connection = self.connection.as_ref();
        for counter in [
            &connection.connections_created,
            &connection.connections_closed,
            &connection.connection_errors,
            &connection.high_priority_requests,
            &connection.normal_priority_requests,
            &connection.high_priority_bypasses,
            &connection.high_priority_bypass_yields,
            &connection.throttle_delays,
            &connection.throttle_delay_ms,
        ] {
            counter.reset();
        }
        connection.active_connections.set(0);
        for tracker in [&connection.connect_latency, &connection.tls_handshake_latency] {
            tracker.reset();
        }
    }
}
/// Metrics recorded by a producer.
#[derive(Debug, Default)]
pub struct ProducerMetrics {
    /// Records sent (bumped by both `record_send` and `record_batch`).
    pub records_sent: Counter,
    /// Bytes sent.
    pub bytes_sent: Counter,
    /// Batches sent.
    pub batches_sent: Counter,
    /// Send errors.
    pub errors: Counter,
    /// Retries.
    pub retries: Counter,
    /// Send latency statistics.
    pub send_latency: LatencyTracker,
    /// Connections gauge.
    pub connections: Gauge,
    /// Buffered-records gauge.
    pub buffered_records: Gauge,
    /// Compressed bytes passed to `record_compression`.
    pub compressed_bytes: Counter,
    /// Uncompressed bytes passed to `record_compression`.
    pub uncompressed_bytes: Counter,
}

impl ProducerMetrics {
    /// Creates a metrics group with every value at its default.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Counts one sent record of `bytes` bytes.
    #[inline]
    pub fn record_send(&self, bytes: u64) {
        self.records_sent.add(1);
        self.bytes_sent.add(bytes);
    }

    /// Counts one batch and adds `records` to the sent-records counter.
    #[inline]
    pub fn record_batch(&self, records: u64) {
        self.batches_sent.add(1);
        self.records_sent.add(records);
    }

    /// Counts one send error.
    #[inline]
    pub fn record_error(&self) {
        self.errors.add(1);
    }

    /// Counts one retry.
    #[inline]
    pub fn record_retry(&self) {
        self.retries.add(1);
    }

    /// Adds the compressed and uncompressed sizes to their counters.
    #[inline]
    pub fn record_compression(&self, compressed: u64, uncompressed: u64) {
        self.compressed_bytes.add(compressed);
        self.uncompressed_bytes.add(uncompressed);
    }

    /// Copies the current values into a plain snapshot.
    ///
    /// `compression_ratio_avg` is `compressed / uncompressed`, or `None` while
    /// the uncompressed counter is zero.
    pub fn snapshot(&self) -> ProducerMetricsSnapshot {
        let compressed_bytes = self.compressed_bytes.get();
        let uncompressed_bytes = self.uncompressed_bytes.get();
        let compression_ratio_avg = match uncompressed_bytes {
            0 => None,
            total => Some(compressed_bytes as f64 / total as f64),
        };

        ProducerMetricsSnapshot {
            records_sent: self.records_sent.get(),
            bytes_sent: self.bytes_sent.get(),
            batches_sent: self.batches_sent.get(),
            errors: self.errors.get(),
            retries: self.retries.get(),
            send_latency: self.send_latency.snapshot(),
            connections: self.connections.get(),
            buffered_records: self.buffered_records.get(),
            compressed_bytes,
            uncompressed_bytes,
            compression_ratio_avg,
        }
    }
}
/// Plain-value copy of `ProducerMetrics`, as produced by
/// `ProducerMetrics::snapshot`. Each field holds the value read from the
/// metric of the same name.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ProducerMetricsSnapshot {
pub records_sent: u64,
pub bytes_sent: u64,
pub batches_sent: u64,
pub errors: u64,
pub retries: u64,
pub send_latency: LatencySnapshot,
pub connections: u64,
pub buffered_records: u64,
pub compressed_bytes: u64,
pub uncompressed_bytes: u64,
/// `compressed_bytes / uncompressed_bytes`; `None` when `uncompressed_bytes` is 0.
pub compression_ratio_avg: Option<f64>,
}
/// Metrics recorded by a consumer.
#[derive(Debug, Default)]
pub struct ConsumerMetrics {
    /// Records received.
    pub records_received: Counter,
    /// Bytes received.
    pub bytes_received: Counter,
    /// Fetch requests.
    pub fetches: Counter,
    /// Poll operations.
    pub polls: Counter,
    /// Polls that returned zero records.
    pub empty_polls: Counter,
    /// Commit operations.
    pub commits: Counter,
    /// Errors.
    pub errors: Counter,
    /// Rebalances.
    pub rebalances: Counter,
    /// Poll latency statistics.
    pub poll_latency: LatencyTracker,
    /// Fetch latency statistics.
    pub fetch_latency: LatencyTracker,
    /// Lag gauge.
    pub lag: Gauge,
    /// Maximum-lag gauge.
    pub lag_max: Gauge,
    /// Assigned-partitions gauge.
    pub assigned_partitions: Gauge,
    /// Paused-partitions gauge.
    pub paused_partitions: Gauge,
    /// Buffered-records gauge.
    pub buffered_records: Gauge,
    /// Seek operations.
    pub seeks: Counter,
}

impl ConsumerMetrics {
    /// Creates a metrics group with every value at its default.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Adds `n` to the seek counter.
    #[inline]
    pub fn record_seek(&self, n: u64) {
        self.seeks.add(n);
    }

    /// Adds the received record and byte counts.
    #[inline]
    pub fn record_receive(&self, records: u64, bytes: u64) {
        self.records_received.add(records);
        self.bytes_received.add(bytes);
    }

    /// Counts one poll; a poll with zero `records` also counts as empty.
    #[inline]
    pub fn record_poll(&self, records: u64) {
        self.polls.add(1);
        let was_empty = records == 0;
        if was_empty {
            self.empty_polls.add(1);
        }
    }

    /// Counts one fetch request.
    #[inline]
    pub fn record_fetch(&self) {
        self.fetches.add(1);
    }

    /// Counts one commit.
    #[inline]
    pub fn record_commit(&self) {
        self.commits.add(1);
    }

    /// Counts one error.
    #[inline]
    pub fn record_error(&self) {
        self.errors.add(1);
    }

    /// Counts one rebalance.
    #[inline]
    pub fn record_rebalance(&self) {
        self.rebalances.add(1);
    }

    /// Copies the current values into a plain snapshot.
    pub fn snapshot(&self) -> ConsumerMetricsSnapshot {
        let records_received = self.records_received.get();
        let bytes_received = self.bytes_received.get();
        let fetches = self.fetches.get();
        let polls = self.polls.get();
        let empty_polls = self.empty_polls.get();
        let commits = self.commits.get();
        let errors = self.errors.get();
        let rebalances = self.rebalances.get();
        let poll_latency = self.poll_latency.snapshot();
        let fetch_latency = self.fetch_latency.snapshot();
        let lag = self.lag.get();
        let lag_max = self.lag_max.get();
        let assigned_partitions = self.assigned_partitions.get();
        let paused_partitions = self.paused_partitions.get();
        let buffered_records = self.buffered_records.get();
        let seeks = self.seeks.get();

        ConsumerMetricsSnapshot {
            records_received,
            bytes_received,
            fetches,
            polls,
            empty_polls,
            commits,
            errors,
            rebalances,
            poll_latency,
            fetch_latency,
            lag,
            lag_max,
            assigned_partitions,
            paused_partitions,
            buffered_records,
            seeks,
        }
    }
}
/// Plain-value copy of `ConsumerMetrics`, as produced by
/// `ConsumerMetrics::snapshot`. Each field holds the value read from the
/// metric of the same name.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ConsumerMetricsSnapshot {
pub records_received: u64,
pub bytes_received: u64,
pub fetches: u64,
pub polls: u64,
/// Polls recorded with a record count of zero.
pub empty_polls: u64,
pub commits: u64,
pub errors: u64,
pub rebalances: u64,
pub poll_latency: LatencySnapshot,
pub fetch_latency: LatencySnapshot,
pub lag: u64,
pub lag_max: u64,
pub assigned_partitions: u64,
pub paused_partitions: u64,
pub buffered_records: u64,
pub seeks: u64,
}
/// Metrics recorded for broker connections.
#[derive(Debug, Default)]
pub struct ConnectionMetrics {
    /// Connections created.
    pub connections_created: Counter,
    /// Connections closed.
    pub connections_closed: Counter,
    /// Connection errors.
    pub connection_errors: Counter,
    /// High-priority requests.
    pub high_priority_requests: Counter,
    /// Normal-priority requests.
    pub normal_priority_requests: Counter,
    /// High-priority bypasses.
    pub high_priority_bypasses: Counter,
    /// High-priority bypass yields.
    pub high_priority_bypass_yields: Counter,
    /// Number of throttle delays.
    pub throttle_delays: Counter,
    /// Accumulated throttle delay in milliseconds.
    pub throttle_delay_ms: Counter,
    /// Active-connections gauge (up on connect, down on close).
    pub active_connections: Gauge,
    /// Connect latency statistics.
    pub connect_latency: LatencyTracker,
    /// TLS handshake latency statistics.
    pub tls_handshake_latency: LatencyTracker,
}

impl ConnectionMetrics {
    /// Creates a metrics group with every value at its default.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Counts a created connection and raises the active-connections gauge.
    #[inline]
    pub fn record_connect(&self) {
        self.connections_created.add(1);
        self.active_connections.inc();
    }

    /// Counts a closed connection and lowers the active-connections gauge.
    #[inline]
    pub fn record_close(&self) {
        self.connections_closed.add(1);
        self.active_connections.dec();
    }

    /// Counts one connection error.
    #[inline]
    pub fn record_error(&self) {
        self.connection_errors.add(1);
    }

    /// Counts one high-priority request.
    #[inline]
    pub fn record_high_priority_request(&self) {
        self.high_priority_requests.add(1);
    }

    /// Counts one normal-priority request.
    #[inline]
    pub fn record_normal_priority_request(&self) {
        self.normal_priority_requests.add(1);
    }

    /// Counts one high-priority bypass.
    #[inline]
    pub fn record_high_priority_bypass(&self) {
        self.high_priority_bypasses.add(1);
    }

    /// Counts one high-priority bypass yield.
    #[inline]
    pub fn record_high_priority_bypass_yield(&self) {
        self.high_priority_bypass_yields.add(1);
    }

    /// Counts one throttle delay and adds its length in whole milliseconds,
    /// saturating at `u64::MAX`.
    #[inline]
    pub fn record_throttle_delay(&self, delay: Duration) {
        self.throttle_delays.add(1);
        let millis = u64::try_from(delay.as_millis()).unwrap_or(u64::MAX);
        self.throttle_delay_ms.add(millis);
    }

    /// Records one TLS handshake duration.
    #[inline]
    pub fn record_tls_handshake(&self, duration: Duration) {
        self.tls_handshake_latency.record(duration);
    }

    /// Copies the current values into a plain snapshot.
    pub fn snapshot(&self) -> ConnectionMetricsSnapshot {
        let connections_created = self.connections_created.get();
        let connections_closed = self.connections_closed.get();
        let connection_errors = self.connection_errors.get();
        let high_priority_requests = self.high_priority_requests.get();
        let normal_priority_requests = self.normal_priority_requests.get();
        let high_priority_bypasses = self.high_priority_bypasses.get();
        let high_priority_bypass_yields = self.high_priority_bypass_yields.get();
        let throttle_delays = self.throttle_delays.get();
        let throttle_delay_ms = self.throttle_delay_ms.get();
        let active_connections = self.active_connections.get();
        let connect_latency = self.connect_latency.snapshot();
        let tls_handshake_latency = self.tls_handshake_latency.snapshot();

        ConnectionMetricsSnapshot {
            connections_created,
            connections_closed,
            connection_errors,
            high_priority_requests,
            normal_priority_requests,
            high_priority_bypasses,
            high_priority_bypass_yields,
            throttle_delays,
            throttle_delay_ms,
            active_connections,
            connect_latency,
            tls_handshake_latency,
        }
    }
}
/// Plain-value copy of `ConnectionMetrics`, as produced by
/// `ConnectionMetrics::snapshot`. Each field holds the value read from the
/// metric of the same name.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ConnectionMetricsSnapshot {
pub connections_created: u64,
pub connections_closed: u64,
pub connection_errors: u64,
pub high_priority_requests: u64,
pub normal_priority_requests: u64,
pub high_priority_bypasses: u64,
pub high_priority_bypass_yields: u64,
pub throttle_delays: u64,
/// Accumulated throttle delay in whole milliseconds.
pub throttle_delay_ms: u64,
pub active_connections: u64,
pub connect_latency: LatencySnapshot,
pub tls_handshake_latency: LatencySnapshot,
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
/// A counter starts at zero, accumulates `inc`/`add`, and `reset` clears it.
#[test]
fn test_counter() {
    let c = Counter::new();
    assert_eq!(c.get(), 0);

    c.inc();
    assert_eq!(c.get(), 1);

    c.add(5);
    assert_eq!(c.get(), 6);

    c.reset();
    assert_eq!(c.get(), 0);
}
/// `set`, `inc` and `dec` each move the gauge as expected.
#[test]
fn test_gauge() {
    let g = Gauge::new();
    assert_eq!(g.get(), 0);

    g.set(10);
    assert_eq!(g.get(), 10);

    g.inc();
    assert_eq!(g.get(), 11);

    g.dec();
    assert_eq!(g.get(), 10);
}
/// Decrementing at zero leaves the gauge at zero, both on a fresh gauge and
/// after it has been brought back down to zero.
#[test]
fn test_gauge_dec_saturates_at_zero() {
    let g = Gauge::new();
    assert_eq!(g.get(), 0);

    g.dec();
    assert_eq!(
        g.get(),
        0,
        "Gauge::dec() should saturate at 0, not underflow"
    );

    // Repeated decrements stay pinned at zero.
    for _ in 0..2 {
        g.dec();
    }
    assert_eq!(g.get(), 0);

    g.set(1);
    g.dec();
    assert_eq!(g.get(), 0);

    g.dec();
    assert_eq!(g.get(), 0, "Gauge should not wrap around u64::MAX");
}
/// Two threads decrementing a gauge holding 1 must leave it at 0.
#[test]
fn test_gauge_dec_concurrent_no_underflow() {
    use std::sync::Arc;

    let shared = Arc::new(Gauge::new());
    shared.set(1);

    // Spawn both workers before joining either.
    let workers: Vec<_> = (0..2)
        .map(|_| {
            let handle = Arc::clone(&shared);
            std::thread::spawn(move || handle.dec())
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }

    assert_eq!(
        shared.get(),
        0,
        "concurrent dec() must not underflow to u64::MAX"
    );
}
/// An empty tracker reports nothing; three samples give exact count, min,
/// max and average.
#[test]
fn test_latency_tracker() {
    let t = LatencyTracker::new();
    assert_eq!(t.count(), 0);
    assert!(t.min().is_none());
    assert!(t.max().is_none());
    assert!(t.avg().is_none());

    for millis in [100, 200, 300] {
        t.record(Duration::from_millis(millis));
    }

    assert_eq!(t.count(), 3);
    assert_eq!(t.min(), Some(Duration::from_millis(100)));
    assert_eq!(t.max(), Some(Duration::from_millis(300)));
    assert_eq!(t.avg(), Some(Duration::from_millis(200)));
}
/// A zero-length sample still counts and is reported as `Some(ZERO)`.
#[test]
fn test_latency_tracker_zero_duration_sample() {
    let t = LatencyTracker::new();
    t.record(Duration::ZERO);

    assert_eq!(t.count(), 1);

    let largest = t.max();
    assert_eq!(
        largest,
        Some(Duration::ZERO),
        "max() must return Some(ZERO) for a zero-duration sample, not None"
    );

    let smallest = t.min();
    assert_eq!(
        smallest,
        Some(Duration::ZERO),
        "min() must return Some(ZERO) for a zero-duration sample, not None"
    );
}
/// Dropping a guard records one sample at least as long as the time slept.
#[test]
fn test_latency_guard() {
    let t = LatencyTracker::new();
    let pause = Duration::from_millis(10);
    {
        let _timer = t.start();
        std::thread::sleep(pause);
    }
    assert_eq!(t.count(), 1);
    assert!(t.min().unwrap() >= Duration::from_millis(10));
}
/// Producer recording helpers feed the expected snapshot fields.
#[test]
fn test_producer_metrics() {
    let m = ProducerMetrics::new();
    for bytes in [100, 200] {
        m.record_send(bytes);
    }
    m.record_batch(5);
    m.record_error();
    m.record_retry();
    m.connections.set(3);

    let snap = m.snapshot();
    // Two single sends plus a five-record batch.
    assert_eq!(snap.records_sent, 7);
    assert_eq!(snap.bytes_sent, 300);
    assert_eq!(snap.batches_sent, 1);
    assert_eq!(snap.errors, 1);
    assert_eq!(snap.retries, 1);
    assert_eq!(snap.connections, 3);
}
/// Consumer recording helpers feed the expected snapshot fields.
#[test]
fn test_consumer_metrics() {
    let m = ConsumerMetrics::new();
    m.record_receive(10, 1000);
    m.record_poll(10);
    // A poll with zero records also counts as an empty poll.
    m.record_poll(0);
    m.record_fetch();
    m.record_commit();
    m.record_rebalance();
    m.assigned_partitions.set(4);

    let snap = m.snapshot();
    assert_eq!(snap.records_received, 10);
    assert_eq!(snap.bytes_received, 1000);
    assert_eq!(snap.polls, 2);
    assert_eq!(snap.empty_polls, 1);
    assert_eq!(snap.fetches, 1);
    assert_eq!(snap.commits, 1);
    assert_eq!(snap.rebalances, 1);
    assert_eq!(snap.assigned_partitions, 4);
}
/// Connection recording helpers feed the expected snapshot fields.
#[test]
fn test_connection_metrics() {
    let m = ConnectionMetrics::new();
    m.record_connect();
    m.record_connect();
    m.record_close();
    m.record_error();
    m.record_high_priority_request();
    m.record_normal_priority_request();
    m.record_high_priority_bypass();
    m.record_high_priority_bypass_yield();
    m.record_throttle_delay(Duration::from_millis(25));

    let snap = m.snapshot();
    assert_eq!(snap.connections_created, 2);
    assert_eq!(snap.connections_closed, 1);
    // Two connects minus one close.
    assert_eq!(snap.active_connections, 1);
    assert_eq!(snap.connection_errors, 1);
    assert_eq!(snap.high_priority_requests, 1);
    assert_eq!(snap.normal_priority_requests, 1);
    assert_eq!(snap.high_priority_bypasses, 1);
    assert_eq!(snap.high_priority_bypass_yields, 1);
    assert_eq!(snap.throttle_delays, 1);
    assert_eq!(snap.throttle_delay_ms, 25);
}
/// `reset` clears both the sample count and the minimum.
#[test]
fn test_latency_reset() {
    let t = LatencyTracker::new();
    t.record(Duration::from_millis(100));
    assert_eq!(t.count(), 1);

    t.reset();

    assert_eq!(t.count(), 0);
    assert!(t.min().is_none());
}
/// Producer counters appear in the Prometheus text with a `_total` suffix.
#[test]
fn test_producer_prometheus_export() {
    let m = ProducerMetrics::new();
    m.record_send(100);
    m.record_batch(5);
    m.record_error();

    let text = m.to_prometheus_text("krafka_producer");

    for expected in [
        "# TYPE krafka_producer_records_sent_total counter",
        "krafka_producer_records_sent_total 6",
        "krafka_producer_bytes_sent_total 100",
        "krafka_producer_errors_total 1",
    ] {
        assert!(text.contains(expected));
    }
}
/// Consumer counters and gauges appear in the Prometheus text.
#[test]
fn test_consumer_prometheus_export() {
    let m = ConsumerMetrics::new();
    m.record_receive(10, 500);
    m.record_poll(10);
    m.record_seek(3);
    m.assigned_partitions.set(3);

    let text = m.to_prometheus_text("krafka_consumer");

    for expected in [
        "# TYPE krafka_consumer_records_received_total counter",
        "krafka_consumer_records_received_total 10",
        "krafka_consumer_bytes_received_total 500",
        "krafka_consumer_seeks_total 3",
        "krafka_consumer_assigned_partitions 3",
    ] {
        assert!(text.contains(expected));
    }
}
/// Connection lifecycle metrics appear in the Prometheus text.
#[test]
fn test_connection_prometheus_export() {
    let m = ConnectionMetrics::new();
    m.record_connect();
    m.record_connect();
    m.record_close();

    let text = m.to_prometheus_text("krafka_connection");

    for expected in [
        "krafka_connection_connections_created_total 2",
        "krafka_connection_connections_closed_total 1",
        "krafka_connection_active_connections 1",
    ] {
        assert!(text.contains(expected));
    }
}
/// Priority and throttling counters appear in the Prometheus text.
#[test]
fn test_connection_priority_prometheus_export() {
    let m = ConnectionMetrics::new();
    m.record_high_priority_request();
    m.record_normal_priority_request();
    m.record_high_priority_bypass();
    m.record_high_priority_bypass_yield();
    m.record_throttle_delay(Duration::from_millis(75));

    let text = m.to_prometheus_text("krafka_connection");

    for expected in [
        "krafka_connection_high_priority_requests_total 1",
        "krafka_connection_normal_priority_requests_total 1",
        "krafka_connection_high_priority_bypasses_total 1",
        "krafka_connection_high_priority_bypass_yields_total 1",
        "krafka_connection_throttle_delays_total 1",
        "krafka_connection_throttle_delay_ms_total 75",
    ] {
        assert!(text.contains(expected));
    }
}
/// Checks that updates made through the handles returned by
/// `KrafkaMetrics::producer_metrics` / `consumer_metrics` are visible in the
/// registry-wide Prometheus text.
#[test]
fn test_krafka_metrics_registry() {
    let metrics = KrafkaMetrics::new();
    let producer_handle = metrics.producer_metrics();
    let consumer_handle = metrics.consumer_metrics();

    producer_handle.record_send(100);
    consumer_handle.record_poll(5);

    let text = metrics.to_prometheus_text();
    for fragment in [
        "krafka_producer_records_sent_total 1",
        "krafka_consumer_polls_total 1",
    ] {
        assert!(
            text.contains(fragment),
            "Prometheus output is missing `{fragment}`"
        );
    }
}
/// Checks that `KrafkaMetrics::reset` zeroes counters on the producer,
/// consumer and connection metrics obtained from the registry.
#[test]
fn test_krafka_metrics_reset() {
    let metrics = KrafkaMetrics::new();
    let producer_handle = metrics.producer_metrics();
    let consumer_handle = metrics.consumer_metrics();
    let connection_handle = metrics.connection_metrics();

    producer_handle.record_send(100);
    consumer_handle.record_seek(2);
    connection_handle.record_high_priority_request();
    connection_handle.record_high_priority_bypass_yield();
    connection_handle.record_throttle_delay(Duration::from_millis(10));

    // Reads the five counters under test, in a fixed order.
    let observed = || {
        [
            producer_handle.records_sent.get(),
            consumer_handle.seeks.get(),
            connection_handle.high_priority_requests.get(),
            connection_handle.high_priority_bypass_yields.get(),
            connection_handle.throttle_delay_ms.get(),
        ]
    };

    assert_eq!(observed(), [1, 2, 1, 1, 10]);

    metrics.reset();

    assert_eq!(observed(), [0, 0, 0, 0, 0]);
}
/// Checks how a latency tracker is rendered in Prometheus text: a `summary`
/// TYPE line plus `_count`, `_sum`, `_min` and `_max` series for
/// `send_latency` after two recorded samples.
#[test]
fn test_latency_prometheus_format() {
    let producer = ProducerMetrics::new();
    for millis in [50, 100] {
        producer.send_latency.record(Duration::from_millis(millis));
    }

    let text = producer.to_prometheus_text("test");

    let expected_fragments = [
        "# TYPE test_send_latency_seconds summary",
        "test_send_latency_seconds_count 2",
        "test_send_latency_seconds_sum",
        "test_send_latency_seconds_min",
        "test_send_latency_seconds_max",
    ];
    for fragment in expected_fragments {
        assert!(
            text.contains(fragment),
            "Prometheus output is missing `{fragment}`"
        );
    }
}
/// Checks the `lag` and `lag_max` gauges of `ConsumerMetrics`: both start at 0,
/// reflect values written with `set`, and are copied into `snapshot()`.
#[test]
fn test_consumer_lag_metrics() {
    let consumer = ConsumerMetrics::new();

    // Fresh metrics report no lag at all.
    assert_eq!((consumer.lag.get(), consumer.lag_max.get()), (0, 0));

    consumer.lag.set(42);
    consumer.lag_max.set(15);
    assert_eq!((consumer.lag.get(), consumer.lag_max.get()), (42, 15));

    let snapshot = consumer.snapshot();
    assert_eq!((snapshot.lag, snapshot.lag_max), (42, 15));
}
/// Checks the HELP, TYPE and value lines emitted for the `lag` and `lag_max`
/// gauges when `ConsumerMetrics` is rendered with the prefix `c`.
#[test]
fn test_consumer_lag_prometheus_export() {
    let consumer = ConsumerMetrics::new();
    consumer.lag.set(100);
    consumer.lag_max.set(30);

    let text = consumer.to_prometheus_text("c");

    let expected_fragments = [
        "# HELP c_lag Total consumer lag across all assigned partitions",
        "# TYPE c_lag gauge",
        "c_lag 100",
        "# HELP c_lag_max Maximum per-partition consumer lag",
        "# TYPE c_lag_max gauge",
        "c_lag_max 30",
    ];
    for fragment in expected_fragments {
        assert!(
            text.contains(fragment),
            "Prometheus output is missing `{fragment}`"
        );
    }
}
/// Checks the JSON rendering of `ProducerMetrics`: the document is wrapped in
/// `[` … `]` and contains a counter entry named `p_records_sent` and a value
/// of 6 after `record_send(100)` and `record_batch(5)`.
#[test]
fn test_json_exporter_counter() {
    let producer = ProducerMetrics::new();
    producer.record_send(100);
    producer.record_batch(5);

    let document = producer.to_json("p");

    assert!(
        document.starts_with('[') && document.ends_with(']'),
        "JSON output is not wrapped in square brackets"
    );

    // Raw strings keep the expected JSON fragments free of escape noise.
    let expected_fragments = [
        r#""name":"p_records_sent""#,
        r#""type":"counter""#,
        r#""value":6"#,
    ];
    for fragment in expected_fragments {
        assert!(
            document.contains(fragment),
            "JSON output is missing `{fragment}`"
        );
    }
}
/// Checks that a gauge appears in the JSON rendering of `ConsumerMetrics`
/// with its name, the `gauge` type tag and the value that was set.
#[test]
fn test_json_exporter_gauge() {
    let consumer = ConsumerMetrics::new();
    consumer.assigned_partitions.set(4);

    let document = consumer.to_json("c");

    let expected_fragments = [
        r#""name":"c_assigned_partitions""#,
        r#""type":"gauge""#,
        r#""value":4"#,
    ];
    for fragment in expected_fragments {
        assert!(
            document.contains(fragment),
            "JSON output is missing `{fragment}`"
        );
    }
}
/// Checks that a latency tracker appears in the JSON rendering of
/// `ProducerMetrics` with its name, the `latency` type tag, the sample count
/// and a `sum_seconds` field.
#[test]
fn test_json_exporter_latency() {
    let producer = ProducerMetrics::new();
    for millis in [50, 100] {
        producer.send_latency.record(Duration::from_millis(millis));
    }

    let document = producer.to_json("p");

    let expected_fragments = [
        r#""name":"p_send_latency""#,
        r#""type":"latency""#,
        r#""count":2"#,
        r#""sum_seconds":"#,
    ];
    for fragment in expected_fragments {
        assert!(
            document.contains(fragment),
            "JSON output is missing `{fragment}`"
        );
    }
}
/// A `JsonExporter` that received no metrics must finish as the empty JSON
/// array `[]`.
#[test]
fn test_json_exporter_empty() {
    let untouched = JsonExporter::new();
    let rendered = untouched.finish();
    assert_eq!(rendered, "[]");
}
/// Checks that the registry-wide JSON contains the producer's
/// `krafka_producer_records_sent` entry and a value of 1 after one send.
#[test]
fn test_krafka_metrics_json() {
    let metrics = KrafkaMetrics::new();
    metrics.producer_metrics().record_send(42);

    let document = metrics.to_json();

    for fragment in [r#""name":"krafka_producer_records_sent""#, r#""value":1"#] {
        assert!(
            document.contains(fragment),
            "JSON output is missing `{fragment}`"
        );
    }
}
/// Checks that `KrafkaMetrics::export_all` feeds producer, consumer and
/// connection metrics into a caller-supplied `PrometheusExporter`.
#[test]
fn test_krafka_metrics_export_all() {
    let metrics = KrafkaMetrics::new();
    let producer_handle = metrics.producer_metrics();
    producer_handle.record_send(100);

    let mut sink = PrometheusExporter::new();
    metrics.export_all(&mut sink);
    let text = sink.finish();

    // Only the producer was touched; the other two groups must still be exported.
    let expected_fragments = [
        "krafka_producer_records_sent_total 1",
        "krafka_consumer_polls_total 0",
        "krafka_connection_connections_created_total 0",
    ];
    for fragment in expected_fragments {
        assert!(
            text.contains(fragment),
            "Prometheus output is missing `{fragment}`"
        );
    }
}
/// Drives `ProducerMetrics::export_metrics` with a user-defined
/// `MetricsExporter` that only tallies callbacks, and expects 7 counters,
/// 2 gauges and 1 latency to be reported.
#[test]
fn test_custom_exporter() {
    /// Exporter that ignores names and values and just counts invocations.
    #[derive(Default)]
    struct CountingExporter {
        counters: usize,
        gauges: usize,
        latencies: usize,
    }

    impl MetricsExporter for CountingExporter {
        fn export_counter(&mut self, _name: &str, _help: &str, _value: u64) {
            self.counters += 1;
        }

        fn export_gauge(&mut self, _name: &str, _help: &str, _value: u64) {
            self.gauges += 1;
        }

        fn export_latency(&mut self, _name: &str, _help: &str, _snapshot: &LatencySnapshot) {
            self.latencies += 1;
        }
    }

    let producer = ProducerMetrics::new();
    let mut tally = CountingExporter::default();

    producer.export_metrics("test", &mut tally);

    assert_eq!((tally.counters, tally.gauges, tally.latencies), (7, 2, 1));
}
/// Table-driven check of `json_escape`: plain text is untouched, while a
/// double quote, a backslash and a newline are each expected to come back
/// backslash-escaped.
#[test]
fn test_json_escape() {
    // (input, expected output); expected values are raw strings so the
    // backslashes read exactly as they appear in the escaped result.
    let cases = [
        ("hello", "hello"),
        ("he\"llo", r#"he\"llo"#),
        ("he\\llo", r"he\\llo"),
        ("he\nllo", r"he\nllo"),
    ];

    for (input, expected) in cases {
        assert_eq!(json_escape(input), expected);
    }
}
/// A name made only of letters and underscores must pass through
/// `sanitize_prometheus_name` unchanged.
#[test]
fn test_sanitize_prometheus_name_valid() {
    let already_valid = "krafka_requests_total";
    assert_eq!(sanitize_prometheus_name(already_valid), already_valid);
}
/// Dots and hyphens in a metric name are expected to be replaced with
/// underscores by `sanitize_prometheus_name`.
#[test]
fn test_sanitize_prometheus_name_dots_hyphens() {
    let sanitized = sanitize_prometheus_name("kafka.producer-send.rate");
    assert_eq!(sanitized, "kafka_producer_send_rate");
}
/// A name starting with a digit is expected to gain a leading underscore.
#[test]
fn test_sanitize_prometheus_name_leading_digit() {
    let sanitized = sanitize_prometheus_name("9lives");
    assert_eq!(sanitized, "_9lives");
}
/// Colons must survive `sanitize_prometheus_name` untouched.
#[test]
fn test_sanitize_prometheus_name_colons_preserved() {
    let with_colon = "namespace:metric";
    assert_eq!(sanitize_prometheus_name(with_colon), with_colon);
}
/// Records three samples out of order and checks that the snapshot reports
/// the sample count together with the smallest and largest duration.
#[test]
fn test_latency_fetch_min_max() {
    let latency = LatencyTracker::new();

    // Deliberately neither ascending nor descending.
    for millis in [100, 50, 200] {
        latency.record(Duration::from_millis(millis));
    }

    let stats = latency.snapshot();
    let fastest = Some(Duration::from_millis(50));
    let slowest = Some(Duration::from_millis(200));

    assert_eq!(stats.count, 3);
    assert_eq!(stats.min, fastest);
    assert_eq!(stats.max, slowest);
}
/// Pins `LatencyTracker::bucket_for` for zero and for small nanosecond values
/// up to 31, covering the first few sub-bucket boundaries.
#[test]
fn test_bucket_for_zero_and_small() {
    // (nanoseconds, expected bucket index)
    const CASES: &[(u64, usize)] = &[
        (0, 0),
        (1, 1),
        (2, 1),
        (3, 1),
        (4, 2),
        (5, 3),
        (6, 4),
        (7, 5),
        (8, 6),
        (9, 6),
        (10, 7),
        (14, 9),
        (15, 9),
        (16, 10),
        (20, 11),
        (24, 12),
        (28, 13),
        (31, 13),
    ];

    for &(nanos, expected_bucket) in CASES {
        assert_eq!(
            LatencyTracker::bucket_for(nanos),
            expected_bucket,
            "unexpected bucket for {nanos} ns"
        );
    }
}
/// Pins the representative nanosecond value returned by
/// `LatencyTracker::estimate_nanos_for_bucket` for buckets 0 through 13.
#[test]
fn test_estimate_nanos_for_bucket_correctness() {
    // Position in the array is the bucket index; the element is the expected estimate.
    let expected_estimates: [u64; 14] = [0, 2, 4, 5, 6, 7, 9, 11, 13, 15, 18, 22, 26, 30];

    for (bucket, expected) in expected_estimates.iter().copied().enumerate() {
        assert_eq!(
            LatencyTracker::estimate_nanos_for_bucket(bucket),
            expected,
            "unexpected estimate for bucket {bucket}"
        );
    }
}
/// For a spread of latencies from 4 ns to 100 ms, maps each value to its
/// bucket, converts the bucket back to an estimate, and requires the relative
/// error of that round trip to stay within ±12.5 % (plus a 1e-9 slack).
#[test]
fn test_sub_bucket_relative_error_within_12_5_percent() {
    // Powers of two from 4 to 512, followed by round decimal values.
    const SAMPLES: [u64; 20] = [
        4, 8, 16, 32, 64, 128, 256, 512,
        1_000, 2_000, 4_000, 8_000, 16_000, 32_000, 64_000,
        100_000, 1_000_000, 10_000_000, 50_000_000, 100_000_000,
    ];
    let tolerance = 0.125 + 1e-9;

    for nanos in SAMPLES.iter().copied() {
        let index = LatencyTracker::bucket_for(nanos);
        let estimate = LatencyTracker::estimate_nanos_for_bucket(index) as f64;
        let actual = nanos as f64;
        let relative_error = (estimate - actual) / actual;

        assert!(
            relative_error.abs() <= tolerance,
            "nanos={nanos}: estimate={estimate}, relative_error={relative_error:.4} > 12.5%"
        );
    }
}
/// Records 100 identical 10 ms samples and requires the p50, p95 and p99
/// estimates from `LatencyTracker::percentile` to be within 12.5 % of the
/// true value.
#[test]
fn test_percentile_accuracy_for_known_values() {
    let latency = LatencyTracker::new();
    (0..100).for_each(|_| latency.record(Duration::from_millis(10)));

    // 10 ms expressed in nanoseconds.
    let exact_nanos: u64 = 10_000_000;
    let exact = exact_nanos as f64;
    let tolerance = 0.125 + 1e-9;

    for pct in [50.0_f64, 95.0, 99.0] {
        let est = latency.percentile(pct).unwrap().as_nanos() as u64;
        let err = (est as f64 - exact) / exact;

        assert!(
            err.abs() <= tolerance,
            "p{pct}: estimate {est} ns, true {exact_nanos} ns, err={err:.4}"
        );
    }
}
}