Skip to main content

dynomite/stats/
prometheus.rs

1//! Prometheus text exposition rendering for stats snapshots.
2//!
3//! The renderer walks the immutable [`Snapshot`] value and produces a
4//! Prometheus 0.0.4 text-format string suitable for serving over an
5//! HTTP `/metrics` endpoint. The rendering uses the `prometheus`
6//! crate's [`Registry`] and [`TextEncoder`]: a fresh registry is built
7//! per call, every metric family is registered with its `# HELP` and
8//! `# TYPE` headers, and the counter/gauge values are filled from the
9//! snapshot before encoding.
10//!
11//! Naming conventions:
12//!
13//! * Pool counters become `dynomite_pool_<field>_total` with a single
14//!   `pool` label.
15//! * Pool gauges and timestamps become `dynomite_pool_<field>` with a
16//!   single `pool` label.
17//! * Server counters become `dynomite_server_<field>_total` with a
18//!   single `server` label.
19//! * Server gauges and timestamps become `dynomite_server_<field>`
20//!   with a single `server` label.
//! * Histogram summaries (latency, payload size, queue waits, etc.)
//!   are exposed as gauges named `dynomite_<channel>_microseconds`
//!   (payload sizes use `dynomite_payload_size_bytes`) carrying a
//!   `quantile` label whose values are `mean`, `0.95`, `0.99`,
//!   `0.999` and `max`. Prometheus has no way to round-trip a
//!   pre-aggregated estimated histogram, so we publish the same
//!   quantile rollups the JSON endpoint already exposes.
26//! * The build identification block is published as
27//!   `dynomite_build_info{version,source,rack,dc}` set to `1`, the
28//!   convention popularised by `node_exporter`.
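//! * The snapshot's server entry also produces `dynomite_peer_state`
//!   with `peer` and `state` labels: the `state="up"` series is set to
//!   `1` and the `state="down"` series to `0`. The current snapshot
//!   model treats every server as up (the eject timestamps live in
//!   their respective metrics); the gauge is emitted so dashboards
//!   have a stable label set to target.
//! * Failure-path families (`dispatch_*`, `peer_state_*` and
//!   `gossip_phi_*`) are published without the `dynomite_` prefix.
//!   Phi scores and thresholds are integer gauges in thousandths
//!   (`_milli`), and the peer-state dwell histogram is exposed as the
//!   gauge families `peer_state_dwell_seconds_bucket`,
//!   `peer_state_dwell_seconds_count` and
//!   `peer_state_dwell_seconds_sum_milli`.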
35
36use prometheus::{Encoder, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder};
37
38use crate::cluster::peer::PeerState;
39use crate::stats::codec::{StatsMetricType, POOL_CODEC, SERVER_CODEC};
40use crate::stats::failure::FailureSnapshot;
41use crate::stats::snapshot::{HistogramSummary, Snapshot};
42
43/// Render a [`Snapshot`] in the Prometheus 0.0.4 text exposition format.
44///
45/// The string returned is a complete, self-contained response body
46/// that may be served directly with a
47/// `Content-Type: text/plain; version=0.0.4; charset=utf-8` header.
48///
49/// # Examples
50///
51/// ```
52/// use dynomite::stats::{render_prometheus, PoolStats, ServerStats, ServiceInfo, Snapshot};
53///
54/// let snap = Snapshot {
55///     info: ServiceInfo {
56///         source: "node-a".into(),
57///         version: "0.0.1".into(),
58///         rack: "r1".into(),
59///         dc: "dc1".into(),
60///     },
61///     pool: PoolStats::new("dyn_o_mite"),
62///     server: ServerStats::new("redis_local"),
63///     ..Snapshot::default()
64/// };
65/// let text = render_prometheus(&snap);
66/// assert!(text.contains("dynomite_build_info"));
67/// assert!(text.contains("# TYPE dynomite_build_info gauge"));
68/// ```
69pub fn render_prometheus(snap: &Snapshot) -> String {
70    let registry = Registry::new();
71    register_build_info(&registry, snap);
72    register_uptime(&registry, snap);
73    register_resource_usage(&registry, snap);
74    register_pool(&registry, snap);
75    register_server(&registry, snap);
76    register_peer_state(&registry, snap);
77    register_failure_metrics(&registry, &snap.failure);
78    register_histogram_summaries(&registry, snap);
79    register_queue_p99s(&registry, snap);
80
81    let mut buf = Vec::with_capacity(8 * 1024);
82    let encoder = TextEncoder::new();
83    encoder
84        .encode(&registry.gather(), &mut buf)
85        .expect("invariant: TextEncoder writes valid UTF-8 into Vec<u8>");
86    String::from_utf8(buf).expect("invariant: TextEncoder emits UTF-8")
87}
88
89fn register_build_info(registry: &Registry, snap: &Snapshot) {
90    let opts = Opts::new(
91        "dynomite_build_info",
92        "Static identification of the running engine; value is always 1.",
93    );
94    let gauge = IntGaugeVec::new(opts, &["version", "source", "rack", "dc"])
95        .expect("invariant: build_info descriptor is valid");
96    gauge
97        .with_label_values(&[
98            &snap.info.version,
99            &snap.info.source,
100            &snap.info.rack,
101            &snap.info.dc,
102        ])
103        .set(1);
104    registry
105        .register(Box::new(gauge))
106        .expect("invariant: build_info registers cleanly");
107}
108
109fn register_uptime(registry: &Registry, snap: &Snapshot) {
110    let opts = Opts::new(
111        "dynomite_uptime_seconds",
112        "Seconds elapsed since the engine started.",
113    );
114    let gauge = IntGaugeVec::new(opts, &[]).expect("invariant: uptime descriptor is valid");
115    gauge.with_label_values::<&str>(&[]).set(snap.uptime);
116    registry
117        .register(Box::new(gauge))
118        .expect("invariant: uptime registers cleanly");
119
120    let opts = Opts::new(
121        "dynomite_timestamp_seconds",
122        "Wall-clock seconds since the UNIX epoch at snapshot time.",
123    );
124    let gauge = IntGaugeVec::new(opts, &[]).expect("invariant: timestamp descriptor is valid");
125    gauge.with_label_values::<&str>(&[]).set(snap.timestamp);
126    registry
127        .register(Box::new(gauge))
128        .expect("invariant: timestamp registers cleanly");
129}
130
131fn register_resource_usage(registry: &Registry, snap: &Snapshot) {
132    let entries: [(&str, &str, i64); 5] = [
133        (
134            "dynomite_alloc_msgs",
135            "Number of message structs currently allocated.",
136            snap.alloc_msgs,
137        ),
138        (
139            "dynomite_free_msgs",
140            "Number of message structs on the free list.",
141            snap.free_msgs,
142        ),
143        (
144            "dynomite_alloc_mbufs",
145            "Number of mbuf chunks currently allocated.",
146            snap.alloc_mbufs,
147        ),
148        (
149            "dynomite_free_mbufs",
150            "Number of mbuf chunks on the free list.",
151            snap.free_mbufs,
152        ),
153        (
154            "dynomite_memory_bytes",
155            "Resident set size of the engine in bytes.",
156            snap.dyn_memory,
157        ),
158    ];
159    for (name, help, value) in entries {
160        let gauge = IntGaugeVec::new(Opts::new(name, help), &[])
161            .expect("invariant: resource gauge descriptor is valid");
162        gauge.with_label_values::<&str>(&[]).set(value);
163        registry
164            .register(Box::new(gauge))
165            .expect("invariant: resource gauge registers cleanly");
166    }
167}
168
169fn register_pool(registry: &Registry, snap: &Snapshot) {
170    let pool = &snap.pool.name;
171    for (i, spec) in POOL_CODEC.iter().enumerate() {
172        let value = snap.pool.metrics.get(i).copied().unwrap_or(0);
173        match spec.kind {
174            StatsMetricType::Counter => {
175                let name = format!("dynomite_pool_{}_total", spec.name);
176                let opts = Opts::new(name, spec.description);
177                let counter = IntCounterVec::new(opts, &["pool"])
178                    .expect("invariant: pool counter descriptor is valid");
179                if value > 0 {
180                    counter
181                        .with_label_values(&[pool.as_str()])
182                        .inc_by(u64::try_from(value).unwrap_or(0));
183                } else {
184                    let _ = counter.with_label_values(&[pool.as_str()]);
185                }
186                registry
187                    .register(Box::new(counter))
188                    .expect("invariant: pool counter registers cleanly");
189            }
190            StatsMetricType::Gauge | StatsMetricType::Timestamp => {
191                let name = format!("dynomite_pool_{}", spec.name);
192                let opts = Opts::new(name, spec.description);
193                let gauge = IntGaugeVec::new(opts, &["pool"])
194                    .expect("invariant: pool gauge descriptor is valid");
195                gauge.with_label_values(&[pool.as_str()]).set(value);
196                registry
197                    .register(Box::new(gauge))
198                    .expect("invariant: pool gauge registers cleanly");
199            }
200        }
201    }
202}
203
204fn register_server(registry: &Registry, snap: &Snapshot) {
205    let server = &snap.server.name;
206    for (i, spec) in SERVER_CODEC.iter().enumerate() {
207        let value = snap.server.metrics.get(i).copied().unwrap_or(0);
208        match spec.kind {
209            StatsMetricType::Counter => {
210                let name = format!("dynomite_server_{}_total", spec.name);
211                let opts = Opts::new(name, spec.description);
212                let counter = IntCounterVec::new(opts, &["server"])
213                    .expect("invariant: server counter descriptor is valid");
214                if value > 0 {
215                    counter
216                        .with_label_values(&[server.as_str()])
217                        .inc_by(u64::try_from(value).unwrap_or(0));
218                } else {
219                    let _ = counter.with_label_values(&[server.as_str()]);
220                }
221                registry
222                    .register(Box::new(counter))
223                    .expect("invariant: server counter registers cleanly");
224            }
225            StatsMetricType::Gauge | StatsMetricType::Timestamp => {
226                let name = format!("dynomite_server_{}", spec.name);
227                let opts = Opts::new(name, spec.description);
228                let gauge = IntGaugeVec::new(opts, &["server"])
229                    .expect("invariant: server gauge descriptor is valid");
230                gauge.with_label_values(&[server.as_str()]).set(value);
231                registry
232                    .register(Box::new(gauge))
233                    .expect("invariant: server gauge registers cleanly");
234            }
235        }
236    }
237}
238
239fn register_peer_state(registry: &Registry, snap: &Snapshot) {
240    let opts = Opts::new(
241        "dynomite_peer_state",
242        "Peer up/down indicator. The active state has value 1; the other has value 0.",
243    );
244    let gauge = IntGaugeVec::new(opts, &["peer", "state"])
245        .expect("invariant: peer_state descriptor is valid");
246    let peer = snap.server.name.as_str();
247    gauge.with_label_values(&[peer, "up"]).set(1);
248    gauge.with_label_values(&[peer, "down"]).set(0);
249    registry
250        .register(Box::new(gauge))
251        .expect("invariant: peer_state registers cleanly");
252}
253
254fn register_failure_metrics(registry: &Registry, failure: &FailureSnapshot) {
255    register_failure_no_targets(registry, failure);
256    register_failure_peer_send(registry, failure);
257    register_failure_backend_send(registry, failure);
258    register_failure_response_timeout(registry, failure);
259    register_failure_peer_state(registry, failure);
260    register_failure_phi(registry, failure);
261    register_failure_phi_threshold(registry, failure);
262    register_failure_dwell(registry, failure);
263}
264
265fn register_failure_no_targets(registry: &Registry, failure: &FailureSnapshot) {
266    let opts = Opts::new(
267        "dispatch_no_targets_total",
268        "Dispatch failures because the only routable peer for the hashed token was Down or absent.",
269    );
270    let counter = IntCounterVec::new(opts, &["dc", "rack", "consistency_level"])
271        .expect("invariant: dispatch_no_targets descriptor is valid");
272    for entry in &failure.no_targets {
273        counter
274            .with_label_values(&[
275                entry.dc.as_str(),
276                entry.rack.as_str(),
277                entry.consistency.name(),
278            ])
279            .inc_by(entry.count);
280    }
281    registry
282        .register(Box::new(counter))
283        .expect("invariant: dispatch_no_targets registers cleanly");
284}
285
286fn register_failure_peer_send(registry: &Registry, failure: &FailureSnapshot) {
287    let full = IntCounterVec::new(
288        Opts::new(
289            "dispatch_peer_send_full_total",
290            "Dispatcher try_send to a peer's outbound channel returned Full.",
291        ),
292        &["peer_idx", "peer_dc"],
293    )
294    .expect("invariant: dispatch_peer_send_full descriptor is valid");
295    for entry in &failure.peer_send_full {
296        full.with_label_values(&[&entry.peer_idx.to_string(), &entry.peer_dc])
297            .inc_by(entry.count);
298    }
299    registry
300        .register(Box::new(full))
301        .expect("invariant: dispatch_peer_send_full registers cleanly");
302
303    let closed = IntCounterVec::new(
304        Opts::new(
305            "dispatch_peer_send_closed_total",
306            "Dispatcher try_send to a peer's outbound channel returned Closed.",
307        ),
308        &["peer_idx", "peer_dc"],
309    )
310    .expect("invariant: dispatch_peer_send_closed descriptor is valid");
311    for entry in &failure.peer_send_closed {
312        closed
313            .with_label_values(&[&entry.peer_idx.to_string(), &entry.peer_dc])
314            .inc_by(entry.count);
315    }
316    registry
317        .register(Box::new(closed))
318        .expect("invariant: dispatch_peer_send_closed registers cleanly");
319}
320
321fn register_failure_backend_send(registry: &Registry, failure: &FailureSnapshot) {
322    let full = IntCounterVec::new(
323        Opts::new(
324            "dispatch_backend_send_full_total",
325            "Dispatcher try_send to the local datastore backend returned Full.",
326        ),
327        &[],
328    )
329    .expect("invariant: dispatch_backend_send_full descriptor is valid");
330    if failure.backend_send_full > 0 {
331        full.with_label_values::<&str>(&[])
332            .inc_by(failure.backend_send_full);
333    } else {
334        let _ = full.with_label_values::<&str>(&[]);
335    }
336    registry
337        .register(Box::new(full))
338        .expect("invariant: dispatch_backend_send_full registers cleanly");
339
340    let closed = IntCounterVec::new(
341        Opts::new(
342            "dispatch_backend_send_closed_total",
343            "Dispatcher try_send to the local datastore backend returned Closed.",
344        ),
345        &[],
346    )
347    .expect("invariant: dispatch_backend_send_closed descriptor is valid");
348    if failure.backend_send_closed > 0 {
349        closed
350            .with_label_values::<&str>(&[])
351            .inc_by(failure.backend_send_closed);
352    } else {
353        let _ = closed.with_label_values::<&str>(&[]);
354    }
355    registry
356        .register(Box::new(closed))
357        .expect("invariant: dispatch_backend_send_closed registers cleanly");
358}
359
360fn register_failure_response_timeout(registry: &Registry, failure: &FailureSnapshot) {
361    let counter = IntCounterVec::new(
362        Opts::new(
363            "dispatch_response_timeout_total",
364            "Dispatcher's response coalescer gave up waiting for replies.",
365        ),
366        &["consistency_level"],
367    )
368    .expect("invariant: dispatch_response_timeout descriptor is valid");
369    for entry in &failure.response_timeout {
370        counter
371            .with_label_values(&[entry.consistency.name()])
372            .inc_by(entry.count);
373    }
374    registry
375        .register(Box::new(counter))
376        .expect("invariant: dispatch_response_timeout registers cleanly");
377}
378
379fn register_failure_peer_state(registry: &Registry, failure: &FailureSnapshot) {
380    let trans = IntCounterVec::new(
381        Opts::new(
382            "peer_state_transitions_total",
383            "Number of gossip-driven peer-state transitions, labelled by from/to state.",
384        ),
385        &["peer_idx", "from_state", "to_state"],
386    )
387    .expect("invariant: peer_state_transitions descriptor is valid");
388    for entry in &failure.peer_state_transitions {
389        let peer_idx = entry.peer_idx.to_string();
390        trans
391            .with_label_values(&[peer_idx.as_str(), entry.from.name(), entry.to.name()])
392            .inc_by(entry.count);
393    }
394    registry
395        .register(Box::new(trans))
396        .expect("invariant: peer_state_transitions registers cleanly");
397
398    let current = IntGaugeVec::new(
399        Opts::new(
400            "peer_state_current",
401            "Current peer state. Numeric value matches PeerState's repr(u8): \
402             0=Unknown, 1=Joining, 2=Normal, 3=Standby, 4=Down, 5=Reset, 6=Leaving.",
403        ),
404        &["peer_idx", "dc", "rack"],
405    )
406    .expect("invariant: peer_state_current descriptor is valid");
407    for entry in &failure.peer_state_current {
408        current
409            .with_label_values(&[&entry.peer_idx.to_string(), &entry.dc, &entry.rack])
410            .set(peer_state_value(entry.state));
411    }
412    registry
413        .register(Box::new(current))
414        .expect("invariant: peer_state_current registers cleanly");
415}
416
417fn register_failure_phi(registry: &Registry, failure: &FailureSnapshot) {
418    let gauge = IntGaugeVec::new(
419        Opts::new(
420            "gossip_phi_score_milli",
421            "Phi-accrual failure detector score per peer, scaled by 1000 (gauge units = thousandths).",
422        ),
423        &["peer_idx", "dc", "rack"],
424    )
425    .expect("invariant: gossip_phi_score descriptor is valid");
426    for entry in &failure.peer_phi {
427        let value = phi_to_milli_clamped(entry.phi);
428        gauge
429            .with_label_values(&[&entry.peer_idx.to_string(), &entry.dc, &entry.rack])
430            .set(value);
431    }
432    registry
433        .register(Box::new(gauge))
434        .expect("invariant: gossip_phi_score registers cleanly");
435}
436
437fn register_failure_phi_threshold(registry: &Registry, failure: &FailureSnapshot) {
438    let gauge = IntGaugeVec::new(
439        Opts::new(
440            "gossip_phi_threshold_observed_milli",
441            "Phi-accrual threshold the failure detector last evaluated against the peer, \
442             scaled by 1000 (gauge units = thousandths). Use to confirm operator-tuned \
443             thresholds against the gossip handler's running config.",
444        ),
445        &["peer_idx", "dc", "rack"],
446    )
447    .expect("invariant: gossip_phi_threshold_observed descriptor is valid");
448    for entry in &failure.peer_threshold {
449        let value = phi_to_milli_clamped(entry.threshold);
450        gauge
451            .with_label_values(&[&entry.peer_idx.to_string(), &entry.dc, &entry.rack])
452            .set(value);
453    }
454    registry
455        .register(Box::new(gauge))
456        .expect("invariant: gossip_phi_threshold_observed registers cleanly");
457}
458
459fn register_failure_dwell(registry: &Registry, failure: &FailureSnapshot) {
460    use crate::stats::failure::DWELL_BUCKETS_SECONDS;
461    if failure.peer_state_dwell.is_empty() {
462        return;
463    }
464    // Cumulative bucket counts emitted as `peer_state_dwell_seconds_bucket{state, le}`.
465    let bucket_gauge = IntGaugeVec::new(
466        Opts::new(
467            "peer_state_dwell_seconds_bucket",
468            "Cumulative count of peer-state dwell observations whose duration is <= 'le', per state.",
469        ),
470        &["state", "le"],
471    )
472    .expect("invariant: peer_state_dwell_seconds_bucket descriptor is valid");
473    let count_gauge = IntGaugeVec::new(
474        Opts::new(
475            "peer_state_dwell_seconds_count",
476            "Total number of peer-state dwell observations recorded for the labelled state.",
477        ),
478        &["state"],
479    )
480    .expect("invariant: peer_state_dwell_seconds_count descriptor is valid");
481    let sum_gauge = IntGaugeVec::new(
482        Opts::new(
483            "peer_state_dwell_seconds_sum_milli",
484            "Sum of dwell observations in milliseconds per state. Divide by 1000 for seconds.",
485        ),
486        &["state"],
487    )
488    .expect("invariant: peer_state_dwell_seconds_sum descriptor is valid");
489    for entry in &failure.peer_state_dwell {
490        let state_label = entry.state.name();
491        let count = i64::try_from(entry.count).unwrap_or(i64::MAX);
492        count_gauge.with_label_values(&[state_label]).set(count);
493        let sum_milli = phi_to_milli_clamped(entry.sum_seconds);
494        sum_gauge.with_label_values(&[state_label]).set(sum_milli);
495        for (i, upper) in DWELL_BUCKETS_SECONDS.iter().enumerate() {
496            if let Some(c) = entry.bucket_counts.get(i) {
497                let val = i64::try_from(*c).unwrap_or(i64::MAX);
498                let le = format_le(*upper);
499                bucket_gauge.with_label_values(&[state_label, &le]).set(val);
500            }
501        }
502        if let Some(c) = entry.bucket_counts.last() {
503            let val = i64::try_from(*c).unwrap_or(i64::MAX);
504            bucket_gauge
505                .with_label_values(&[state_label, "+Inf"])
506                .set(val);
507        }
508    }
509    registry
510        .register(Box::new(bucket_gauge))
511        .expect("invariant: peer_state_dwell_seconds_bucket registers cleanly");
512    registry
513        .register(Box::new(count_gauge))
514        .expect("invariant: peer_state_dwell_seconds_count registers cleanly");
515    registry
516        .register(Box::new(sum_gauge))
517        .expect("invariant: peer_state_dwell_seconds_sum registers cleanly");
518}
519
/// Render a histogram bucket upper bound as an `le` label value.
///
/// Finite bounds use `f64`'s `Display`. It already prints whole numbers
/// without a fractional part (`1`, not `1.0`) and never switches to
/// exponent notation, so a dashboard groups buckets cleanly. Negative
/// zero is normalised to `0`. Infinite bounds are spelled `+Inf` /
/// `-Inf`, as the Prometheus text format requires, rather than Rust's
/// `inf` / `-inf`.
fn format_le(upper: f64) -> String {
    if upper.is_infinite() {
        let spelled = if upper.is_sign_positive() { "+Inf" } else { "-Inf" };
        spelled.to_owned()
    } else if upper == 0.0 {
        // Also matches -0.0, whose `Display` form would be "-0".
        "0".to_owned()
    } else {
        upper.to_string()
    }
}
545
546/// Map a [`PeerState`] to the integer value the Prometheus gauge
547/// publishes. Mirrors the enum's `repr(u8)` discriminants but
548/// goes via a match so the conversion is explicit and the
549/// pedantic cast lints stay clean.
550fn peer_state_value(state: PeerState) -> i64 {
551    match state {
552        PeerState::Unknown => 0,
553        PeerState::Joining => 1,
554        PeerState::Normal => 2,
555        PeerState::Standby => 3,
556        PeerState::Down => 4,
557        PeerState::Reset => 5,
558        PeerState::Leaving => 6,
559    }
560}
561
/// Convert a non-negative `f64` into thousandths, rounded to the nearest
/// integer, as an `i64`.
///
/// This helper backs the phi score and threshold gauges and the dwell
/// sum. The snapshot is expected to clamp phi already; the checks here
/// guard against future refactors.
///
/// * NaN, infinite, zero and negative inputs map to `0`.
/// * Results are capped at `i64::MAX / 1000`. This includes finite inputs
///   so large that `× 1000` overflows `f64`: they saturate rather than
///   reading as 0, which would make a failing peer look healthy.
fn phi_to_milli_clamped(phi: f64) -> i64 {
    const SATURATED: i64 = i64::MAX / 1000;
    if !phi.is_finite() || phi <= 0.0 {
        return 0;
    }
    let scaled = (phi * 1000.0).round();
    // `phi` is finite and positive, so an infinite product can only mean overflow.
    if scaled.is_infinite() {
        return SATURATED;
    }
    // `scaled` is a finite, non-negative whole number here, and float -> int
    // `as` saturates at `i64::MAX`, so the cast is exact below 2^63 and
    // the `min` applies the documented cap.
    #[allow(
        clippy::cast_possible_truncation,
        reason = "finite, non-negative whole value; float-to-int `as` saturates"
    )]
    let whole = scaled as i64;
    whole.min(SATURATED)
}
592
593fn register_histogram_summaries(registry: &Registry, snap: &Snapshot) {
594    let entries: [(&str, &str, &HistogramSummary); 8] = [
595        (
596            "dynomite_request_latency_microseconds",
597            "Top-level request latency in microseconds.",
598            &snap.latency,
599        ),
600        (
601            "dynomite_payload_size_bytes",
602            "Observed request/response payload sizes in bytes.",
603            &snap.payload_size,
604        ),
605        (
606            "dynomite_cross_region_latency_microseconds",
607            "Cross-region peer round-trip latency in microseconds.",
608            &snap.cross_region_latency,
609        ),
610        (
611            "dynomite_cross_zone_latency_microseconds",
612            "Cross-zone peer latency in microseconds.",
613            &snap.cross_zone_latency,
614        ),
615        (
616            "dynomite_server_latency_microseconds",
617            "Backing-server response latency in microseconds.",
618            &snap.server_latency,
619        ),
620        (
621            "dynomite_cross_region_queue_wait_microseconds",
622            "Cross-region queue wait time in microseconds.",
623            &snap.cross_region_queue_wait,
624        ),
625        (
626            "dynomite_cross_zone_queue_wait_microseconds",
627            "Cross-zone queue wait time in microseconds.",
628            &snap.cross_zone_queue_wait,
629        ),
630        (
631            "dynomite_server_queue_wait_microseconds",
632            "Server queue wait time in microseconds.",
633            &snap.server_queue_wait,
634        ),
635    ];
636    for (name, help, summary) in entries {
637        let gauge = IntGaugeVec::new(Opts::new(name, help), &["quantile"])
638            .expect("invariant: histogram quantile gauge is valid");
639        let s = *summary;
640        let mean_v = i64::try_from(s.mean).unwrap_or(i64::MAX);
641        let q95 = i64::try_from(s.p95).unwrap_or(i64::MAX);
642        let q99 = i64::try_from(s.p99).unwrap_or(i64::MAX);
643        let q999 = i64::try_from(s.p999).unwrap_or(i64::MAX);
644        let max_v = i64::try_from(s.max).unwrap_or(i64::MAX);
645        gauge.with_label_values(&["mean"]).set(mean_v);
646        gauge.with_label_values(&["0.95"]).set(q95);
647        gauge.with_label_values(&["0.99"]).set(q99);
648        gauge.with_label_values(&["0.999"]).set(q999);
649        gauge.with_label_values(&["max"]).set(max_v);
650        registry
651            .register(Box::new(gauge))
652            .expect("invariant: histogram quantile gauge registers cleanly");
653    }
654}
655
656fn register_queue_p99s(registry: &Registry, snap: &Snapshot) {
657    let entries: [(&str, &str, u64); 8] = [
658        (
659            "dynomite_client_out_queue_p99",
660            "99th percentile of the client outbound queue length.",
661            snap.client_out_queue_p99,
662        ),
663        (
664            "dynomite_server_in_queue_p99",
665            "99th percentile of the server inbound queue length.",
666            snap.server_in_queue_p99,
667        ),
668        (
669            "dynomite_server_out_queue_p99",
670            "99th percentile of the server outbound queue length.",
671            snap.server_out_queue_p99,
672        ),
673        (
674            "dynomite_dnode_client_out_queue_p99",
675            "99th percentile of the dnode client outbound queue length.",
676            snap.dnode_client_out_queue_p99,
677        ),
678        (
679            "dynomite_peer_in_queue_p99",
680            "99th percentile of the local-DC peer inbound queue length.",
681            snap.peer_in_queue_p99,
682        ),
683        (
684            "dynomite_peer_out_queue_p99",
685            "99th percentile of the local-DC peer outbound queue length.",
686            snap.peer_out_queue_p99,
687        ),
688        (
689            "dynomite_remote_peer_in_queue_p99",
690            "99th percentile of the remote-DC peer inbound queue length.",
691            snap.remote_peer_in_queue_p99,
692        ),
693        (
694            "dynomite_remote_peer_out_queue_p99",
695            "99th percentile of the remote-DC peer outbound queue length.",
696            snap.remote_peer_out_queue_p99,
697        ),
698    ];
699    for (name, help, value) in entries {
700        let gauge = IntGaugeVec::new(Opts::new(name, help), &[])
701            .expect("invariant: queue p99 gauge descriptor is valid");
702        let value_i64 = i64::try_from(value).unwrap_or(i64::MAX);
703        gauge.with_label_values::<&str>(&[]).set(value_i64);
704        registry
705            .register(Box::new(gauge))
706            .expect("invariant: queue p99 gauge registers cleanly");
707    }
708}
709
710#[cfg(test)]
711mod tests {
712    use super::*;
713    use crate::stats::codec::PoolField;
714    use crate::stats::snapshot::{PoolStats, ServerStats, ServiceInfo};
715
716    fn make_snap() -> Snapshot {
717        Snapshot {
718            info: ServiceInfo {
719                source: "node-a".into(),
720                version: "0.0.1".into(),
721                rack: "r1".into(),
722                dc: "dc1".into(),
723            },
724            pool: PoolStats::new("dyn_o_mite"),
725            server: ServerStats::new("redis_local"),
726            ..Snapshot::default()
727        }
728    }
729
730    #[test]
731    fn render_prometheus_includes_help_and_type_lines() {
732        let mut snap = make_snap();
733        snap.pool.metrics[PoolField::ClientEof.index()] = 7;
734        let out = render_prometheus(&snap);
735        assert!(
736            out.contains("# HELP dynomite_pool_client_eof_total"),
737            "missing # HELP for pool client_eof:\n{out}"
738        );
739        assert!(
740            out.contains("# TYPE dynomite_pool_client_eof_total counter"),
741            "missing # TYPE for pool client_eof:\n{out}"
742        );
743        assert!(
744            out.contains("dynomite_pool_client_eof_total{pool=\"dyn_o_mite\"} 7"),
745            "missing pool client_eof value line:\n{out}"
746        );
747    }
748
749    #[test]
750    fn render_prometheus_quotes_label_values() {
751        let mut snap = make_snap();
752        snap.pool = PoolStats::new("my\\pool\"");
753        snap.pool.metrics[PoolField::ClientEof.index()] = 3;
754        let out = render_prometheus(&snap);
755        let backslash = "\\\\";
756        let escaped_quote = "\\\"";
757        let expected_label = format!("pool=\"my{backslash}pool{escaped_quote}\"");
758        assert!(
759            out.contains(&expected_label),
760            "expected escaped label `{expected_label}` not found in:\n{out}"
761        );
762    }
763
764    #[test]
765    fn render_prometheus_emits_build_info() {
766        let snap = make_snap();
767        let out = render_prometheus(&snap);
768        assert!(
769            out.contains("# TYPE dynomite_build_info gauge"),
770            "missing build_info type line:\n{out}"
771        );
772        let needle = "dynomite_build_info{";
773        let pos = out
774            .find(needle)
775            .unwrap_or_else(|| panic!("missing build_info value line:\n{out}"));
776        let line_end = out[pos..].find('\n').map_or(out.len(), |n| pos + n);
777        let line = &out[pos..line_end];
778        assert!(
779            line.contains("version=\"0.0.1\""),
780            "build_info missing version label: {line}"
781        );
782        assert!(line.ends_with(" 1"), "build_info value should be 1: {line}");
783    }
784
785    #[test]
786    fn render_prometheus_includes_server_counters_and_uptime() {
787        let mut snap = make_snap();
788        snap.uptime = 42;
789        snap.server.metrics[crate::stats::ServerField::ReadRequests.index()] = 5;
790        let out = render_prometheus(&snap);
791        assert!(
792            out.contains("# TYPE dynomite_server_read_requests_total counter"),
793            "server counter type line missing"
794        );
795        assert!(
796            out.contains("dynomite_server_read_requests_total{server=\"redis_local\"} 5"),
797            "server counter value missing:\n{out}"
798        );
799        assert!(
800            out.contains("dynomite_uptime_seconds 42"),
801            "uptime gauge value missing:\n{out}"
802        );
803    }
804
805    #[test]
806    fn render_prometheus_emits_peer_state_for_server() {
807        let snap = make_snap();
808        let out = render_prometheus(&snap);
809        assert!(
810            out.contains("dynomite_peer_state{peer=\"redis_local\",state=\"up\"} 1"),
811            "peer_state up line missing:\n{out}"
812        );
813        assert!(
814            out.contains("dynomite_peer_state{peer=\"redis_local\",state=\"down\"} 0"),
815            "peer_state down line missing:\n{out}"
816        );
817    }
818
819    /// The failure-cause counters are wired into the
820    /// renderer; verify each family lands with the expected
821    /// HELP and TYPE headers and that label values from the
822    /// snapshot make it onto the wire.
823    #[test]
824    fn render_prometheus_emits_failure_cause_counters() {
825        use crate::cluster::peer::PeerState;
826        use crate::msg::ConsistencyLevel;
827        use crate::stats::FailureMetrics;
828
829        let metrics = FailureMetrics::new();
830        metrics.record_no_targets("dc1", "rA", ConsistencyLevel::DcQuorum);
831        metrics.record_peer_send_full(7, "dc2");
832        metrics.record_peer_send_closed(7, "dc2");
833        metrics.record_backend_send_full();
834        metrics.record_backend_send_closed();
835        metrics.record_response_timeout(ConsistencyLevel::DcOne);
836        metrics.record_peer_state_transition(3, "dc1", "rA", PeerState::Normal, PeerState::Down);
837        metrics.observe_phi(3, "dc1", "rA", 4.5);
838
839        let mut snap = make_snap();
840        snap.failure = metrics.snapshot();
841        let out = render_prometheus(&snap);
842
843        assert!(
844            out.contains("# TYPE dispatch_no_targets_total counter"),
845            "missing dispatch_no_targets type line:\n{out}"
846        );
847        assert!(
848            out.contains(
849                "dispatch_no_targets_total{consistency_level=\"DC_QUORUM\",dc=\"dc1\",rack=\"rA\"} 1"
850            ),
851            "missing dispatch_no_targets row:\n{out}"
852        );
853        assert!(
854            out.contains("# TYPE dispatch_peer_send_full_total counter"),
855            "missing dispatch_peer_send_full type line:\n{out}"
856        );
857        assert!(
858            out.contains("dispatch_peer_send_full_total{peer_dc=\"dc2\",peer_idx=\"7\"} 1"),
859            "missing dispatch_peer_send_full row:\n{out}"
860        );
861        assert!(
862            out.contains("dispatch_peer_send_closed_total{peer_dc=\"dc2\",peer_idx=\"7\"} 1"),
863            "missing dispatch_peer_send_closed row:\n{out}"
864        );
865        assert!(
866            out.contains("dispatch_backend_send_full_total 1"),
867            "missing dispatch_backend_send_full row:\n{out}"
868        );
869        assert!(
870            out.contains("dispatch_backend_send_closed_total 1"),
871            "missing dispatch_backend_send_closed row:\n{out}"
872        );
873        assert!(
874            out.contains("dispatch_response_timeout_total{consistency_level=\"DC_ONE\"} 1"),
875            "missing dispatch_response_timeout row:\n{out}"
876        );
877        assert!(
878            out.contains(
879                "peer_state_transitions_total{from_state=\"NORMAL\",peer_idx=\"3\",to_state=\"DOWN\"} 1"
880            ),
881            "missing peer_state_transitions row:\n{out}"
882        );
883        assert!(
884            out.contains("peer_state_current{dc=\"dc1\",peer_idx=\"3\",rack=\"rA\"} 4"),
885            "missing peer_state_current row (state=Down=4):\n{out}"
886        );
887        // phi=4.5 -> 4500 in the milli gauge.
888        assert!(
889            out.contains("gossip_phi_score_milli{dc=\"dc1\",peer_idx=\"3\",rack=\"rA\"} 4500"),
890            "missing gossip_phi_score_milli row:\n{out}"
891        );
892    }
893
894    /// The new threshold gauge and per-state dwell histogram
895    /// must reach the wire when populated.
896    #[test]
897    fn render_prometheus_emits_threshold_and_dwell_rows() {
898        use crate::cluster::peer::PeerState;
899        use crate::stats::FailureMetrics;
900        use std::time::{Duration, Instant};
901
902        let metrics = FailureMetrics::new();
903        metrics.observe_threshold(2, "dc1", "rA", 8.0);
904        let t0 = Instant::now();
905        metrics.record_peer_state_transition_at(
906            2,
907            "dc1",
908            "rA",
909            PeerState::Unknown,
910            PeerState::Normal,
911            t0,
912        );
913        // 1.25s in Normal -> Down.
914        metrics.record_peer_state_transition_at(
915            2,
916            "dc1",
917            "rA",
918            PeerState::Normal,
919            PeerState::Down,
920            t0 + Duration::from_millis(1_250),
921        );
922
923        let mut snap = make_snap();
924        snap.failure = metrics.snapshot();
925        let out = render_prometheus(&snap);
926
927        assert!(
928            out.contains(
929                "gossip_phi_threshold_observed_milli{dc=\"dc1\",peer_idx=\"2\",rack=\"rA\"} 8000"
930            ),
931            "missing gossip_phi_threshold_observed_milli row:\n{out}"
932        );
933        assert!(
934            out.contains("peer_state_dwell_seconds_count{state=\"NORMAL\"} 1"),
935            "missing peer_state_dwell_seconds_count row:\n{out}"
936        );
937        assert!(
938            out.contains("peer_state_dwell_seconds_bucket{le=\"+Inf\",state=\"NORMAL\"} 1"),
939            "missing peer_state_dwell_seconds_bucket +Inf row:\n{out}"
940        );
941        // 1.25s falls in 5s bucket but not in 1s bucket.
942        assert!(
943            out.contains("peer_state_dwell_seconds_bucket{le=\"5\",state=\"NORMAL\"} 1"),
944            "missing peer_state_dwell_seconds_bucket le=5 row:\n{out}"
945        );
946        assert!(
947            out.contains("peer_state_dwell_seconds_bucket{le=\"1\",state=\"NORMAL\"} 0"),
948            "missing peer_state_dwell_seconds_bucket le=1 (should be 0):\n{out}"
949        );
950    }
951}