1use prometheus::{Encoder, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder};
37
38use crate::cluster::peer::PeerState;
39use crate::stats::codec::{StatsMetricType, POOL_CODEC, SERVER_CODEC};
40use crate::stats::failure::FailureSnapshot;
41use crate::stats::snapshot::{HistogramSummary, Snapshot};
42
43pub fn render_prometheus(snap: &Snapshot) -> String {
70 let registry = Registry::new();
71 register_build_info(®istry, snap);
72 register_uptime(®istry, snap);
73 register_resource_usage(®istry, snap);
74 register_pool(®istry, snap);
75 register_server(®istry, snap);
76 register_peer_state(®istry, snap);
77 register_failure_metrics(®istry, &snap.failure);
78 register_histogram_summaries(®istry, snap);
79 register_queue_p99s(®istry, snap);
80
81 let mut buf = Vec::with_capacity(8 * 1024);
82 let encoder = TextEncoder::new();
83 encoder
84 .encode(®istry.gather(), &mut buf)
85 .expect("invariant: TextEncoder writes valid UTF-8 into Vec<u8>");
86 String::from_utf8(buf).expect("invariant: TextEncoder emits UTF-8")
87}
88
89fn register_build_info(registry: &Registry, snap: &Snapshot) {
90 let opts = Opts::new(
91 "dynomite_build_info",
92 "Static identification of the running engine; value is always 1.",
93 );
94 let gauge = IntGaugeVec::new(opts, &["version", "source", "rack", "dc"])
95 .expect("invariant: build_info descriptor is valid");
96 gauge
97 .with_label_values(&[
98 &snap.info.version,
99 &snap.info.source,
100 &snap.info.rack,
101 &snap.info.dc,
102 ])
103 .set(1);
104 registry
105 .register(Box::new(gauge))
106 .expect("invariant: build_info registers cleanly");
107}
108
109fn register_uptime(registry: &Registry, snap: &Snapshot) {
110 let opts = Opts::new(
111 "dynomite_uptime_seconds",
112 "Seconds elapsed since the engine started.",
113 );
114 let gauge = IntGaugeVec::new(opts, &[]).expect("invariant: uptime descriptor is valid");
115 gauge.with_label_values::<&str>(&[]).set(snap.uptime);
116 registry
117 .register(Box::new(gauge))
118 .expect("invariant: uptime registers cleanly");
119
120 let opts = Opts::new(
121 "dynomite_timestamp_seconds",
122 "Wall-clock seconds since the UNIX epoch at snapshot time.",
123 );
124 let gauge = IntGaugeVec::new(opts, &[]).expect("invariant: timestamp descriptor is valid");
125 gauge.with_label_values::<&str>(&[]).set(snap.timestamp);
126 registry
127 .register(Box::new(gauge))
128 .expect("invariant: timestamp registers cleanly");
129}
130
131fn register_resource_usage(registry: &Registry, snap: &Snapshot) {
132 let entries: [(&str, &str, i64); 5] = [
133 (
134 "dynomite_alloc_msgs",
135 "Number of message structs currently allocated.",
136 snap.alloc_msgs,
137 ),
138 (
139 "dynomite_free_msgs",
140 "Number of message structs on the free list.",
141 snap.free_msgs,
142 ),
143 (
144 "dynomite_alloc_mbufs",
145 "Number of mbuf chunks currently allocated.",
146 snap.alloc_mbufs,
147 ),
148 (
149 "dynomite_free_mbufs",
150 "Number of mbuf chunks on the free list.",
151 snap.free_mbufs,
152 ),
153 (
154 "dynomite_memory_bytes",
155 "Resident set size of the engine in bytes.",
156 snap.dyn_memory,
157 ),
158 ];
159 for (name, help, value) in entries {
160 let gauge = IntGaugeVec::new(Opts::new(name, help), &[])
161 .expect("invariant: resource gauge descriptor is valid");
162 gauge.with_label_values::<&str>(&[]).set(value);
163 registry
164 .register(Box::new(gauge))
165 .expect("invariant: resource gauge registers cleanly");
166 }
167}
168
169fn register_pool(registry: &Registry, snap: &Snapshot) {
170 let pool = &snap.pool.name;
171 for (i, spec) in POOL_CODEC.iter().enumerate() {
172 let value = snap.pool.metrics.get(i).copied().unwrap_or(0);
173 match spec.kind {
174 StatsMetricType::Counter => {
175 let name = format!("dynomite_pool_{}_total", spec.name);
176 let opts = Opts::new(name, spec.description);
177 let counter = IntCounterVec::new(opts, &["pool"])
178 .expect("invariant: pool counter descriptor is valid");
179 if value > 0 {
180 counter
181 .with_label_values(&[pool.as_str()])
182 .inc_by(u64::try_from(value).unwrap_or(0));
183 } else {
184 let _ = counter.with_label_values(&[pool.as_str()]);
185 }
186 registry
187 .register(Box::new(counter))
188 .expect("invariant: pool counter registers cleanly");
189 }
190 StatsMetricType::Gauge | StatsMetricType::Timestamp => {
191 let name = format!("dynomite_pool_{}", spec.name);
192 let opts = Opts::new(name, spec.description);
193 let gauge = IntGaugeVec::new(opts, &["pool"])
194 .expect("invariant: pool gauge descriptor is valid");
195 gauge.with_label_values(&[pool.as_str()]).set(value);
196 registry
197 .register(Box::new(gauge))
198 .expect("invariant: pool gauge registers cleanly");
199 }
200 }
201 }
202}
203
204fn register_server(registry: &Registry, snap: &Snapshot) {
205 let server = &snap.server.name;
206 for (i, spec) in SERVER_CODEC.iter().enumerate() {
207 let value = snap.server.metrics.get(i).copied().unwrap_or(0);
208 match spec.kind {
209 StatsMetricType::Counter => {
210 let name = format!("dynomite_server_{}_total", spec.name);
211 let opts = Opts::new(name, spec.description);
212 let counter = IntCounterVec::new(opts, &["server"])
213 .expect("invariant: server counter descriptor is valid");
214 if value > 0 {
215 counter
216 .with_label_values(&[server.as_str()])
217 .inc_by(u64::try_from(value).unwrap_or(0));
218 } else {
219 let _ = counter.with_label_values(&[server.as_str()]);
220 }
221 registry
222 .register(Box::new(counter))
223 .expect("invariant: server counter registers cleanly");
224 }
225 StatsMetricType::Gauge | StatsMetricType::Timestamp => {
226 let name = format!("dynomite_server_{}", spec.name);
227 let opts = Opts::new(name, spec.description);
228 let gauge = IntGaugeVec::new(opts, &["server"])
229 .expect("invariant: server gauge descriptor is valid");
230 gauge.with_label_values(&[server.as_str()]).set(value);
231 registry
232 .register(Box::new(gauge))
233 .expect("invariant: server gauge registers cleanly");
234 }
235 }
236 }
237}
238
239fn register_peer_state(registry: &Registry, snap: &Snapshot) {
240 let opts = Opts::new(
241 "dynomite_peer_state",
242 "Peer up/down indicator. The active state has value 1; the other has value 0.",
243 );
244 let gauge = IntGaugeVec::new(opts, &["peer", "state"])
245 .expect("invariant: peer_state descriptor is valid");
246 let peer = snap.server.name.as_str();
247 gauge.with_label_values(&[peer, "up"]).set(1);
248 gauge.with_label_values(&[peer, "down"]).set(0);
249 registry
250 .register(Box::new(gauge))
251 .expect("invariant: peer_state registers cleanly");
252}
253
254fn register_failure_metrics(registry: &Registry, failure: &FailureSnapshot) {
255 register_failure_no_targets(registry, failure);
256 register_failure_peer_send(registry, failure);
257 register_failure_backend_send(registry, failure);
258 register_failure_response_timeout(registry, failure);
259 register_failure_peer_state(registry, failure);
260 register_failure_phi(registry, failure);
261 register_failure_phi_threshold(registry, failure);
262 register_failure_dwell(registry, failure);
263}
264
265fn register_failure_no_targets(registry: &Registry, failure: &FailureSnapshot) {
266 let opts = Opts::new(
267 "dispatch_no_targets_total",
268 "Dispatch failures because the only routable peer for the hashed token was Down or absent.",
269 );
270 let counter = IntCounterVec::new(opts, &["dc", "rack", "consistency_level"])
271 .expect("invariant: dispatch_no_targets descriptor is valid");
272 for entry in &failure.no_targets {
273 counter
274 .with_label_values(&[
275 entry.dc.as_str(),
276 entry.rack.as_str(),
277 entry.consistency.name(),
278 ])
279 .inc_by(entry.count);
280 }
281 registry
282 .register(Box::new(counter))
283 .expect("invariant: dispatch_no_targets registers cleanly");
284}
285
286fn register_failure_peer_send(registry: &Registry, failure: &FailureSnapshot) {
287 let full = IntCounterVec::new(
288 Opts::new(
289 "dispatch_peer_send_full_total",
290 "Dispatcher try_send to a peer's outbound channel returned Full.",
291 ),
292 &["peer_idx", "peer_dc"],
293 )
294 .expect("invariant: dispatch_peer_send_full descriptor is valid");
295 for entry in &failure.peer_send_full {
296 full.with_label_values(&[&entry.peer_idx.to_string(), &entry.peer_dc])
297 .inc_by(entry.count);
298 }
299 registry
300 .register(Box::new(full))
301 .expect("invariant: dispatch_peer_send_full registers cleanly");
302
303 let closed = IntCounterVec::new(
304 Opts::new(
305 "dispatch_peer_send_closed_total",
306 "Dispatcher try_send to a peer's outbound channel returned Closed.",
307 ),
308 &["peer_idx", "peer_dc"],
309 )
310 .expect("invariant: dispatch_peer_send_closed descriptor is valid");
311 for entry in &failure.peer_send_closed {
312 closed
313 .with_label_values(&[&entry.peer_idx.to_string(), &entry.peer_dc])
314 .inc_by(entry.count);
315 }
316 registry
317 .register(Box::new(closed))
318 .expect("invariant: dispatch_peer_send_closed registers cleanly");
319}
320
321fn register_failure_backend_send(registry: &Registry, failure: &FailureSnapshot) {
322 let full = IntCounterVec::new(
323 Opts::new(
324 "dispatch_backend_send_full_total",
325 "Dispatcher try_send to the local datastore backend returned Full.",
326 ),
327 &[],
328 )
329 .expect("invariant: dispatch_backend_send_full descriptor is valid");
330 if failure.backend_send_full > 0 {
331 full.with_label_values::<&str>(&[])
332 .inc_by(failure.backend_send_full);
333 } else {
334 let _ = full.with_label_values::<&str>(&[]);
335 }
336 registry
337 .register(Box::new(full))
338 .expect("invariant: dispatch_backend_send_full registers cleanly");
339
340 let closed = IntCounterVec::new(
341 Opts::new(
342 "dispatch_backend_send_closed_total",
343 "Dispatcher try_send to the local datastore backend returned Closed.",
344 ),
345 &[],
346 )
347 .expect("invariant: dispatch_backend_send_closed descriptor is valid");
348 if failure.backend_send_closed > 0 {
349 closed
350 .with_label_values::<&str>(&[])
351 .inc_by(failure.backend_send_closed);
352 } else {
353 let _ = closed.with_label_values::<&str>(&[]);
354 }
355 registry
356 .register(Box::new(closed))
357 .expect("invariant: dispatch_backend_send_closed registers cleanly");
358}
359
360fn register_failure_response_timeout(registry: &Registry, failure: &FailureSnapshot) {
361 let counter = IntCounterVec::new(
362 Opts::new(
363 "dispatch_response_timeout_total",
364 "Dispatcher's response coalescer gave up waiting for replies.",
365 ),
366 &["consistency_level"],
367 )
368 .expect("invariant: dispatch_response_timeout descriptor is valid");
369 for entry in &failure.response_timeout {
370 counter
371 .with_label_values(&[entry.consistency.name()])
372 .inc_by(entry.count);
373 }
374 registry
375 .register(Box::new(counter))
376 .expect("invariant: dispatch_response_timeout registers cleanly");
377}
378
379fn register_failure_peer_state(registry: &Registry, failure: &FailureSnapshot) {
380 let trans = IntCounterVec::new(
381 Opts::new(
382 "peer_state_transitions_total",
383 "Number of gossip-driven peer-state transitions, labelled by from/to state.",
384 ),
385 &["peer_idx", "from_state", "to_state"],
386 )
387 .expect("invariant: peer_state_transitions descriptor is valid");
388 for entry in &failure.peer_state_transitions {
389 let peer_idx = entry.peer_idx.to_string();
390 trans
391 .with_label_values(&[peer_idx.as_str(), entry.from.name(), entry.to.name()])
392 .inc_by(entry.count);
393 }
394 registry
395 .register(Box::new(trans))
396 .expect("invariant: peer_state_transitions registers cleanly");
397
398 let current = IntGaugeVec::new(
399 Opts::new(
400 "peer_state_current",
401 "Current peer state. Numeric value matches PeerState's repr(u8): \
402 0=Unknown, 1=Joining, 2=Normal, 3=Standby, 4=Down, 5=Reset, 6=Leaving.",
403 ),
404 &["peer_idx", "dc", "rack"],
405 )
406 .expect("invariant: peer_state_current descriptor is valid");
407 for entry in &failure.peer_state_current {
408 current
409 .with_label_values(&[&entry.peer_idx.to_string(), &entry.dc, &entry.rack])
410 .set(peer_state_value(entry.state));
411 }
412 registry
413 .register(Box::new(current))
414 .expect("invariant: peer_state_current registers cleanly");
415}
416
417fn register_failure_phi(registry: &Registry, failure: &FailureSnapshot) {
418 let gauge = IntGaugeVec::new(
419 Opts::new(
420 "gossip_phi_score_milli",
421 "Phi-accrual failure detector score per peer, scaled by 1000 (gauge units = thousandths).",
422 ),
423 &["peer_idx", "dc", "rack"],
424 )
425 .expect("invariant: gossip_phi_score descriptor is valid");
426 for entry in &failure.peer_phi {
427 let value = phi_to_milli_clamped(entry.phi);
428 gauge
429 .with_label_values(&[&entry.peer_idx.to_string(), &entry.dc, &entry.rack])
430 .set(value);
431 }
432 registry
433 .register(Box::new(gauge))
434 .expect("invariant: gossip_phi_score registers cleanly");
435}
436
437fn register_failure_phi_threshold(registry: &Registry, failure: &FailureSnapshot) {
438 let gauge = IntGaugeVec::new(
439 Opts::new(
440 "gossip_phi_threshold_observed_milli",
441 "Phi-accrual threshold the failure detector last evaluated against the peer, \
442 scaled by 1000 (gauge units = thousandths). Use to confirm operator-tuned \
443 thresholds against the gossip handler's running config.",
444 ),
445 &["peer_idx", "dc", "rack"],
446 )
447 .expect("invariant: gossip_phi_threshold_observed descriptor is valid");
448 for entry in &failure.peer_threshold {
449 let value = phi_to_milli_clamped(entry.threshold);
450 gauge
451 .with_label_values(&[&entry.peer_idx.to_string(), &entry.dc, &entry.rack])
452 .set(value);
453 }
454 registry
455 .register(Box::new(gauge))
456 .expect("invariant: gossip_phi_threshold_observed registers cleanly");
457}
458
459fn register_failure_dwell(registry: &Registry, failure: &FailureSnapshot) {
460 use crate::stats::failure::DWELL_BUCKETS_SECONDS;
461 if failure.peer_state_dwell.is_empty() {
462 return;
463 }
464 let bucket_gauge = IntGaugeVec::new(
466 Opts::new(
467 "peer_state_dwell_seconds_bucket",
468 "Cumulative count of peer-state dwell observations whose duration is <= 'le', per state.",
469 ),
470 &["state", "le"],
471 )
472 .expect("invariant: peer_state_dwell_seconds_bucket descriptor is valid");
473 let count_gauge = IntGaugeVec::new(
474 Opts::new(
475 "peer_state_dwell_seconds_count",
476 "Total number of peer-state dwell observations recorded for the labelled state.",
477 ),
478 &["state"],
479 )
480 .expect("invariant: peer_state_dwell_seconds_count descriptor is valid");
481 let sum_gauge = IntGaugeVec::new(
482 Opts::new(
483 "peer_state_dwell_seconds_sum_milli",
484 "Sum of dwell observations in milliseconds per state. Divide by 1000 for seconds.",
485 ),
486 &["state"],
487 )
488 .expect("invariant: peer_state_dwell_seconds_sum descriptor is valid");
489 for entry in &failure.peer_state_dwell {
490 let state_label = entry.state.name();
491 let count = i64::try_from(entry.count).unwrap_or(i64::MAX);
492 count_gauge.with_label_values(&[state_label]).set(count);
493 let sum_milli = phi_to_milli_clamped(entry.sum_seconds);
494 sum_gauge.with_label_values(&[state_label]).set(sum_milli);
495 for (i, upper) in DWELL_BUCKETS_SECONDS.iter().enumerate() {
496 if let Some(c) = entry.bucket_counts.get(i) {
497 let val = i64::try_from(*c).unwrap_or(i64::MAX);
498 let le = format_le(*upper);
499 bucket_gauge.with_label_values(&[state_label, &le]).set(val);
500 }
501 }
502 if let Some(c) = entry.bucket_counts.last() {
503 let val = i64::try_from(*c).unwrap_or(i64::MAX);
504 bucket_gauge
505 .with_label_values(&[state_label, "+Inf"])
506 .set(val);
507 }
508 }
509 registry
510 .register(Box::new(bucket_gauge))
511 .expect("invariant: peer_state_dwell_seconds_bucket registers cleanly");
512 registry
513 .register(Box::new(count_gauge))
514 .expect("invariant: peer_state_dwell_seconds_count registers cleanly");
515 registry
516 .register(Box::new(sum_gauge))
517 .expect("invariant: peer_state_dwell_seconds_sum registers cleanly");
518}
519
/// Render a histogram bucket upper bound as a Prometheus `le` label value.
///
/// * Whole numbers render without a fractional part (`5.0` → `"5"`), and
///   fractions render in shortest round-trip form (`0.25` → `"0.25"`).
/// * Infinite bounds use the exposition-format spelling `"+Inf"` / `"-Inf"`,
///   not Rust's `"inf"`.
/// * Negative zero is normalised to `"0"`.
/// * NaN renders as `"NaN"`.
fn format_le(upper: f64) -> String {
    if upper.is_infinite() {
        return if upper.is_sign_positive() { "+Inf" } else { "-Inf" }.to_owned();
    }
    if upper == 0.0 {
        // `Display` would print -0.0 as "-0"; a zero boundary is just "0".
        return "0".to_owned();
    }
    // `Display` for `f64` never uses exponent notation and omits a trailing
    // ".0", so integral boundaries already come out as "1", "5", "3600".
    format!("{upper}")
}
545
546fn peer_state_value(state: PeerState) -> i64 {
551 match state {
552 PeerState::Unknown => 0,
553 PeerState::Joining => 1,
554 PeerState::Normal => 2,
555 PeerState::Standby => 3,
556 PeerState::Down => 4,
557 PeerState::Reset => 5,
558 PeerState::Leaving => 6,
559 }
560}
561
/// Scale a non-negative `f64` by 1000 and convert it to `i64`. The value is
/// rounded to the nearest integer and clamped into `0..=i64::MAX / 1000`.
///
/// Used for phi scores, observed phi thresholds, and dwell-time sums
/// (seconds → milliseconds).
///
/// * NaN, zero, negative values, and anything that rounds to 0 map to `0`.
/// * `+inf`, and any value whose scaled form exceeds the cap (including
///   values where `× 1000` overflows to infinity), map to `i64::MAX / 1000`.
///
/// The mapping is therefore monotonic: a larger input never yields a smaller
/// output. Previously `+inf` and overflowing inputs fell to 0, so the most
/// suspected peer could report phi 0.
///
/// The conversion avoids an `as` cast by decoding the IEEE-754 bit pattern of
/// the rounded value. Because that value is a whole number, shifting the
/// 53-bit significand by the unbiased exponent reproduces it exactly.
fn phi_to_milli_clamped(phi: f64) -> i64 {
    const CAP: i64 = i64::MAX / 1000;
    const EXP_BIAS: u32 = 1023;
    const MANTISSA_BITS: u32 = 52;

    if phi.is_nan() || phi <= 0.0 {
        return 0;
    }
    // `phi` is strictly positive here. An infinite product is the most extreme
    // reading possible, so clamp it to the cap rather than dropping it to 0.
    let scaled = (phi * 1000.0).round();
    if scaled.is_infinite() {
        return CAP;
    }
    if scaled <= 0.0 {
        // Tiny positive inputs round down to zero.
        return 0;
    }

    let bits = scaled.to_bits();
    let exp_field = u32::try_from((bits >> MANTISSA_BITS) & 0x7FF).unwrap_or(0);
    if exp_field < EXP_BIAS {
        // Magnitude below 1.0. Unreachable after rounding a positive value;
        // kept as a defensive guard.
        return 0;
    }
    let unbiased = exp_field - EXP_BIAS;
    if unbiased >= 63 {
        // >= 2^63 cannot fit in i64, and is far above the cap anyway.
        return CAP;
    }

    // Restore the implicit leading 1 bit of the normalised significand.
    let significand = (1u64 << MANTISSA_BITS) | (bits & ((1u64 << MANTISSA_BITS) - 1));
    // unbiased <= 62, so a left shift of at most 10 keeps the result < 2^63.
    let magnitude = if unbiased >= MANTISSA_BITS {
        significand << (unbiased - MANTISSA_BITS)
    } else {
        significand >> (MANTISSA_BITS - unbiased)
    };
    i64::try_from(magnitude).unwrap_or(CAP).min(CAP)
}
592
593fn register_histogram_summaries(registry: &Registry, snap: &Snapshot) {
594 let entries: [(&str, &str, &HistogramSummary); 8] = [
595 (
596 "dynomite_request_latency_microseconds",
597 "Top-level request latency in microseconds.",
598 &snap.latency,
599 ),
600 (
601 "dynomite_payload_size_bytes",
602 "Observed request/response payload sizes in bytes.",
603 &snap.payload_size,
604 ),
605 (
606 "dynomite_cross_region_latency_microseconds",
607 "Cross-region peer round-trip latency in microseconds.",
608 &snap.cross_region_latency,
609 ),
610 (
611 "dynomite_cross_zone_latency_microseconds",
612 "Cross-zone peer latency in microseconds.",
613 &snap.cross_zone_latency,
614 ),
615 (
616 "dynomite_server_latency_microseconds",
617 "Backing-server response latency in microseconds.",
618 &snap.server_latency,
619 ),
620 (
621 "dynomite_cross_region_queue_wait_microseconds",
622 "Cross-region queue wait time in microseconds.",
623 &snap.cross_region_queue_wait,
624 ),
625 (
626 "dynomite_cross_zone_queue_wait_microseconds",
627 "Cross-zone queue wait time in microseconds.",
628 &snap.cross_zone_queue_wait,
629 ),
630 (
631 "dynomite_server_queue_wait_microseconds",
632 "Server queue wait time in microseconds.",
633 &snap.server_queue_wait,
634 ),
635 ];
636 for (name, help, summary) in entries {
637 let gauge = IntGaugeVec::new(Opts::new(name, help), &["quantile"])
638 .expect("invariant: histogram quantile gauge is valid");
639 let s = *summary;
640 let mean_v = i64::try_from(s.mean).unwrap_or(i64::MAX);
641 let q95 = i64::try_from(s.p95).unwrap_or(i64::MAX);
642 let q99 = i64::try_from(s.p99).unwrap_or(i64::MAX);
643 let q999 = i64::try_from(s.p999).unwrap_or(i64::MAX);
644 let max_v = i64::try_from(s.max).unwrap_or(i64::MAX);
645 gauge.with_label_values(&["mean"]).set(mean_v);
646 gauge.with_label_values(&["0.95"]).set(q95);
647 gauge.with_label_values(&["0.99"]).set(q99);
648 gauge.with_label_values(&["0.999"]).set(q999);
649 gauge.with_label_values(&["max"]).set(max_v);
650 registry
651 .register(Box::new(gauge))
652 .expect("invariant: histogram quantile gauge registers cleanly");
653 }
654}
655
656fn register_queue_p99s(registry: &Registry, snap: &Snapshot) {
657 let entries: [(&str, &str, u64); 8] = [
658 (
659 "dynomite_client_out_queue_p99",
660 "99th percentile of the client outbound queue length.",
661 snap.client_out_queue_p99,
662 ),
663 (
664 "dynomite_server_in_queue_p99",
665 "99th percentile of the server inbound queue length.",
666 snap.server_in_queue_p99,
667 ),
668 (
669 "dynomite_server_out_queue_p99",
670 "99th percentile of the server outbound queue length.",
671 snap.server_out_queue_p99,
672 ),
673 (
674 "dynomite_dnode_client_out_queue_p99",
675 "99th percentile of the dnode client outbound queue length.",
676 snap.dnode_client_out_queue_p99,
677 ),
678 (
679 "dynomite_peer_in_queue_p99",
680 "99th percentile of the local-DC peer inbound queue length.",
681 snap.peer_in_queue_p99,
682 ),
683 (
684 "dynomite_peer_out_queue_p99",
685 "99th percentile of the local-DC peer outbound queue length.",
686 snap.peer_out_queue_p99,
687 ),
688 (
689 "dynomite_remote_peer_in_queue_p99",
690 "99th percentile of the remote-DC peer inbound queue length.",
691 snap.remote_peer_in_queue_p99,
692 ),
693 (
694 "dynomite_remote_peer_out_queue_p99",
695 "99th percentile of the remote-DC peer outbound queue length.",
696 snap.remote_peer_out_queue_p99,
697 ),
698 ];
699 for (name, help, value) in entries {
700 let gauge = IntGaugeVec::new(Opts::new(name, help), &[])
701 .expect("invariant: queue p99 gauge descriptor is valid");
702 let value_i64 = i64::try_from(value).unwrap_or(i64::MAX);
703 gauge.with_label_values::<&str>(&[]).set(value_i64);
704 registry
705 .register(Box::new(gauge))
706 .expect("invariant: queue p99 gauge registers cleanly");
707 }
708}
709
/// Rendering tests: each builds a `Snapshot`, calls `render_prometheus`,
/// and searches the exposition text for exact HELP/TYPE/value lines.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stats::codec::PoolField;
    use crate::stats::snapshot::{PoolStats, ServerStats, ServiceInfo};

    /// Build a baseline snapshot with fixed identity labels, a pool named
    /// `dyn_o_mite`, and a server named `redis_local`. Every other field
    /// comes from `Snapshot::default()`.
    fn make_snap() -> Snapshot {
        Snapshot {
            info: ServiceInfo {
                source: "node-a".into(),
                version: "0.0.1".into(),
                rack: "r1".into(),
                dc: "dc1".into(),
            },
            pool: PoolStats::new("dyn_o_mite"),
            server: ServerStats::new("redis_local"),
            ..Snapshot::default()
        }
    }

    /// A pool counter field renders with its `# HELP`, its `# TYPE ... counter`
    /// line, and a `pool`-labelled value line.
    #[test]
    fn render_prometheus_includes_help_and_type_lines() {
        let mut snap = make_snap();
        snap.pool.metrics[PoolField::ClientEof.index()] = 7;
        let out = render_prometheus(&snap);
        assert!(
            out.contains("# HELP dynomite_pool_client_eof_total"),
            "missing # HELP for pool client_eof:\n{out}"
        );
        assert!(
            out.contains("# TYPE dynomite_pool_client_eof_total counter"),
            "missing # TYPE for pool client_eof:\n{out}"
        );
        assert!(
            out.contains("dynomite_pool_client_eof_total{pool=\"dyn_o_mite\"} 7"),
            "missing pool client_eof value line:\n{out}"
        );
    }

    /// A pool name containing a backslash and a double quote is escaped in
    /// the label value, as `\\` and `\"` respectively.
    #[test]
    fn render_prometheus_quotes_label_values() {
        let mut snap = make_snap();
        snap.pool = PoolStats::new("my\\pool\"");
        snap.pool.metrics[PoolField::ClientEof.index()] = 3;
        let out = render_prometheus(&snap);
        // The escaped forms as they appear in the rendered text.
        let backslash = "\\\\";
        let escaped_quote = "\\\"";
        let expected_label = format!("pool=\"my{backslash}pool{escaped_quote}\"");
        assert!(
            out.contains(&expected_label),
            "expected escaped label `{expected_label}` not found in:\n{out}"
        );
    }

    /// `dynomite_build_info` is a gauge carrying the version label, with value 1.
    #[test]
    fn render_prometheus_emits_build_info() {
        let snap = make_snap();
        let out = render_prometheus(&snap);
        assert!(
            out.contains("# TYPE dynomite_build_info gauge"),
            "missing build_info type line:\n{out}"
        );
        // Isolate the single build_info sample line before checking it.
        let needle = "dynomite_build_info{";
        let pos = out
            .find(needle)
            .unwrap_or_else(|| panic!("missing build_info value line:\n{out}"));
        let line_end = out[pos..].find('\n').map_or(out.len(), |n| pos + n);
        let line = &out[pos..line_end];
        assert!(
            line.contains("version=\"0.0.1\""),
            "build_info missing version label: {line}"
        );
        assert!(line.ends_with(" 1"), "build_info value should be 1: {line}");
    }

    /// A server counter field and the unlabelled uptime gauge both render
    /// with the values set on the snapshot.
    #[test]
    fn render_prometheus_includes_server_counters_and_uptime() {
        let mut snap = make_snap();
        snap.uptime = 42;
        snap.server.metrics[crate::stats::ServerField::ReadRequests.index()] = 5;
        let out = render_prometheus(&snap);
        assert!(
            out.contains("# TYPE dynomite_server_read_requests_total counter"),
            "server counter type line missing"
        );
        assert!(
            out.contains("dynomite_server_read_requests_total{server=\"redis_local\"} 5"),
            "server counter value missing:\n{out}"
        );
        assert!(
            out.contains("dynomite_uptime_seconds 42"),
            "uptime gauge value missing:\n{out}"
        );
    }

    /// `dynomite_peer_state` emits `up`=1 and `down`=0 for the snapshot's server.
    #[test]
    fn render_prometheus_emits_peer_state_for_server() {
        let snap = make_snap();
        let out = render_prometheus(&snap);
        assert!(
            out.contains("dynomite_peer_state{peer=\"redis_local\",state=\"up\"} 1"),
            "peer_state up line missing:\n{out}"
        );
        assert!(
            out.contains("dynomite_peer_state{peer=\"redis_local\",state=\"down\"} 0"),
            "peer_state down line missing:\n{out}"
        );
    }

    /// Record one event of each failure cause through `FailureMetrics`, then
    /// check that every family renders the expected labelled row. Label
    /// pairs appear sorted by name in the expected strings.
    #[test]
    fn render_prometheus_emits_failure_cause_counters() {
        use crate::cluster::peer::PeerState;
        use crate::msg::ConsistencyLevel;
        use crate::stats::FailureMetrics;

        let metrics = FailureMetrics::new();
        metrics.record_no_targets("dc1", "rA", ConsistencyLevel::DcQuorum);
        metrics.record_peer_send_full(7, "dc2");
        metrics.record_peer_send_closed(7, "dc2");
        metrics.record_backend_send_full();
        metrics.record_backend_send_closed();
        metrics.record_response_timeout(ConsistencyLevel::DcOne);
        metrics.record_peer_state_transition(3, "dc1", "rA", PeerState::Normal, PeerState::Down);
        metrics.observe_phi(3, "dc1", "rA", 4.5);

        let mut snap = make_snap();
        snap.failure = metrics.snapshot();
        let out = render_prometheus(&snap);

        assert!(
            out.contains("# TYPE dispatch_no_targets_total counter"),
            "missing dispatch_no_targets type line:\n{out}"
        );
        assert!(
            out.contains(
                "dispatch_no_targets_total{consistency_level=\"DC_QUORUM\",dc=\"dc1\",rack=\"rA\"} 1"
            ),
            "missing dispatch_no_targets row:\n{out}"
        );
        assert!(
            out.contains("# TYPE dispatch_peer_send_full_total counter"),
            "missing dispatch_peer_send_full type line:\n{out}"
        );
        assert!(
            out.contains("dispatch_peer_send_full_total{peer_dc=\"dc2\",peer_idx=\"7\"} 1"),
            "missing dispatch_peer_send_full row:\n{out}"
        );
        assert!(
            out.contains("dispatch_peer_send_closed_total{peer_dc=\"dc2\",peer_idx=\"7\"} 1"),
            "missing dispatch_peer_send_closed row:\n{out}"
        );
        assert!(
            out.contains("dispatch_backend_send_full_total 1"),
            "missing dispatch_backend_send_full row:\n{out}"
        );
        assert!(
            out.contains("dispatch_backend_send_closed_total 1"),
            "missing dispatch_backend_send_closed row:\n{out}"
        );
        assert!(
            out.contains("dispatch_response_timeout_total{consistency_level=\"DC_ONE\"} 1"),
            "missing dispatch_response_timeout row:\n{out}"
        );
        assert!(
            out.contains(
                "peer_state_transitions_total{from_state=\"NORMAL\",peer_idx=\"3\",to_state=\"DOWN\"} 1"
            ),
            "missing peer_state_transitions row:\n{out}"
        );
        // Down maps to code 4 in `peer_state_value`.
        assert!(
            out.contains("peer_state_current{dc=\"dc1\",peer_idx=\"3\",rack=\"rA\"} 4"),
            "missing peer_state_current row (state=Down=4):\n{out}"
        );
        // Phi 4.5 is exported ×1000.
        assert!(
            out.contains("gossip_phi_score_milli{dc=\"dc1\",peer_idx=\"3\",rack=\"rA\"} 4500"),
            "missing gossip_phi_score_milli row:\n{out}"
        );
    }

    /// Check the observed-threshold gauge (8.0 → 8000 milli) and the dwell
    /// histogram for one NORMAL interval of 1.25 s.
    #[test]
    fn render_prometheus_emits_threshold_and_dwell_rows() {
        use crate::cluster::peer::PeerState;
        use crate::stats::FailureMetrics;
        use std::time::{Duration, Instant};

        let metrics = FailureMetrics::new();
        metrics.observe_threshold(2, "dc1", "rA", 8.0);
        let t0 = Instant::now();
        metrics.record_peer_state_transition_at(
            2,
            "dc1",
            "rA",
            PeerState::Unknown,
            PeerState::Normal,
            t0,
        );
        // Leave NORMAL 1.25 s after entering it, closing one NORMAL dwell.
        metrics.record_peer_state_transition_at(
            2,
            "dc1",
            "rA",
            PeerState::Normal,
            PeerState::Down,
            t0 + Duration::from_millis(1_250),
        );

        let mut snap = make_snap();
        snap.failure = metrics.snapshot();
        let out = render_prometheus(&snap);

        assert!(
            out.contains(
                "gossip_phi_threshold_observed_milli{dc=\"dc1\",peer_idx=\"2\",rack=\"rA\"} 8000"
            ),
            "missing gossip_phi_threshold_observed_milli row:\n{out}"
        );
        assert!(
            out.contains("peer_state_dwell_seconds_count{state=\"NORMAL\"} 1"),
            "missing peer_state_dwell_seconds_count row:\n{out}"
        );
        assert!(
            out.contains("peer_state_dwell_seconds_bucket{le=\"+Inf\",state=\"NORMAL\"} 1"),
            "missing peer_state_dwell_seconds_bucket +Inf row:\n{out}"
        );
        // 1.25 s falls inside the le=5 bucket but outside le=1.
        assert!(
            out.contains("peer_state_dwell_seconds_bucket{le=\"5\",state=\"NORMAL\"} 1"),
            "missing peer_state_dwell_seconds_bucket le=5 row:\n{out}"
        );
        assert!(
            out.contains("peer_state_dwell_seconds_bucket{le=\"1\",state=\"NORMAL\"} 0"),
            "missing peer_state_dwell_seconds_bucket le=1 (should be 0):\n{out}"
        );
    }
}