1use std::fmt::{self, Write};
4
5use crate::stats::codec::{StatsMetricType, POOL_CODEC, SERVER_CODEC};
6use crate::stats::failure::FailureSnapshot;
7use crate::stats::histogram::Histogram;
8
/// Identity strings describing the reporting process.
///
/// `Snapshot::write_header` copies each field into the JSON header under a
/// key with the same name as the field.
#[derive(Clone, Debug, Default)]
pub struct ServiceInfo {
    /// Value emitted under the `"source"` key.
    pub source: String,
    /// Value emitted under the `"version"` key.
    pub version: String,
    /// Value emitted under the `"rack"` key.
    pub rack: String,
    /// Value emitted under the `"dc"` key.
    pub dc: String,
}
34
/// Fixed set of figures extracted from a histogram for reporting.
///
/// The `Default` value has every field set to zero; `from_histogram` returns
/// that value when the source histogram reports overflow.
#[derive(Clone, Copy, Debug, Default)]
pub struct HistogramSummary {
    /// Largest value reported by the histogram.
    pub max: u64,
    /// Value at the 0.999 percentile.
    pub p999: u64,
    /// Value at the 0.99 percentile.
    pub p99: u64,
    /// Value at the 0.95 percentile.
    pub p95: u64,
    /// Histogram mean, rounded up to a whole number.
    pub mean: u64,
}
57
58impl HistogramSummary {
59 pub fn from_histogram(h: &Histogram) -> Self {
76 if h.is_overflowing() {
77 return Self::default();
78 }
79 let mean_f = h.mean();
80 let mean = if mean_f.is_finite() && mean_f > 0.0 {
81 ceil_f64_to_u64(mean_f)
83 } else {
84 0
85 };
86 Self {
87 max: h.max(),
88 p999: h.percentile(0.999),
89 p99: h.percentile(0.99),
90 p95: h.percentile(0.95),
91 mean,
92 }
93 }
94}
95
/// Rounds `x` up to the next whole number and returns it as a `u64` without
/// a float-to-integer `as` cast.
///
/// * NaN, either infinity, zero and negative inputs yield `0`.
/// * A ceiling too large for 64 bits saturates to `u64::MAX`.
///
/// The ceiling is decoded from its IEEE 754 binary64 bit pattern: an 11-bit
/// exponent biased by 1023 sits above a 52-bit fraction that carries an
/// implicit leading one.
fn ceil_f64_to_u64(x: f64) -> u64 {
    const FRACTION_BITS: u32 = 52;
    const EXPONENT_BIAS: u32 = 1023;

    if x.is_nan() || x.is_infinite() || x <= 0.0 {
        return 0;
    }

    let raw = x.ceil().to_bits();
    let biased_exponent = u32::try_from((raw >> FRACTION_BITS) & 0x7FF).expect("11-bit field");
    // Put the implicit leading one back on top of the stored fraction bits.
    let significand = (raw & ((1u64 << FRACTION_BITS) - 1)) | (1u64 << FRACTION_BITS);

    match biased_exponent.checked_sub(EXPONENT_BIAS) {
        // Magnitude below one: the ceiling of a positive value is at least one.
        None => 1,
        // 2^64 and above cannot be represented.
        Some(power) if power >= 64 => u64::MAX,
        // The binary point lies at or beyond the fraction's low bit.
        Some(power) if power >= FRACTION_BITS => significand
            .checked_shl(power - FRACTION_BITS)
            .unwrap_or(u64::MAX),
        // Drop the fraction bits below the binary point (all zero after `ceil`).
        Some(power) => significand >> (FRACTION_BITS - power),
    }
}
122
/// Named collection of pool-level metric values.
///
/// `PoolStats::new` sizes `metrics` to one slot per `POOL_CODEC` entry, and
/// the JSON writer looks values up by the entry's position in that table.
#[derive(Clone, Debug)]
pub struct PoolStats {
    /// Pool name; used as the key of the pool object in the JSON output.
    pub name: String,
    /// Metric values, indexed by position in `POOL_CODEC`.
    pub metrics: Vec<i64>,
}
140
141impl Default for PoolStats {
142 fn default() -> Self {
143 Self::new(String::new())
144 }
145}
146
147impl PoolStats {
148 pub fn new(name: impl Into<String>) -> Self {
158 Self {
159 name: name.into(),
160 metrics: vec![0; POOL_CODEC.len()],
161 }
162 }
163}
164
/// Named collection of server-level metric values.
///
/// `ServerStats::new` sizes `metrics` to one slot per `SERVER_CODEC` entry,
/// and the JSON writer looks values up by the entry's position in that table.
#[derive(Clone, Debug)]
pub struct ServerStats {
    /// Server name; used as the key of the server object in the JSON output.
    pub name: String,
    /// Metric values, indexed by position in `SERVER_CODEC`.
    pub metrics: Vec<i64>,
}
181
182impl Default for ServerStats {
183 fn default() -> Self {
184 Self::new(String::new())
185 }
186}
187
188impl ServerStats {
189 pub fn new(name: impl Into<String>) -> Self {
199 Self {
200 name: name.into(),
201 metrics: vec![0; SERVER_CODEC.len()],
202 }
203 }
204}
205
/// Named collection of peer-level metric values.
///
/// `PeerStats::new` sizes `metrics` to one slot per `SERVER_CODEC` entry.
#[derive(Clone, Debug)]
pub struct PeerStats {
    /// Peer name.
    pub name: String,
    /// Metric values, one slot per `SERVER_CODEC` entry when built via `new`.
    pub metrics: Vec<i64>,
}
222
223impl PeerStats {
224 pub fn new(name: impl Into<String>) -> Self {
234 Self {
235 name: name.into(),
236 metrics: vec![0; SERVER_CODEC.len()],
237 }
238 }
239}
240
241#[derive(Clone, Debug, Default)]
246pub struct Snapshot {
247 pub info: ServiceInfo,
249 pub uptime: i64,
251 pub timestamp: i64,
253 pub latency: HistogramSummary,
255 pub payload_size: HistogramSummary,
257 pub cross_region_latency: HistogramSummary,
259 pub cross_zone_latency: HistogramSummary,
261 pub server_latency: HistogramSummary,
263 pub cross_region_queue_wait: HistogramSummary,
265 pub cross_zone_queue_wait: HistogramSummary,
267 pub server_queue_wait: HistogramSummary,
269 pub client_out_queue_p99: u64,
271 pub server_in_queue_p99: u64,
273 pub server_out_queue_p99: u64,
275 pub dnode_client_out_queue_p99: u64,
277 pub peer_in_queue_p99: u64,
279 pub peer_out_queue_p99: u64,
281 pub remote_peer_in_queue_p99: u64,
283 pub remote_peer_out_queue_p99: u64,
285 pub alloc_msgs: i64,
287 pub free_msgs: i64,
289 pub alloc_mbufs: i64,
291 pub free_mbufs: i64,
293 pub dyn_memory: i64,
295 pub pool: PoolStats,
297 pub server: ServerStats,
299 pub failure: FailureSnapshot,
301}
302
303impl Snapshot {
304 pub fn to_json(&self) -> String {
323 let mut out = String::new();
324 self.write_json(&mut out)
325 .expect("writing to a String never fails");
326 out
327 }
328
329 pub fn write_json<W: Write>(&self, w: &mut W) -> fmt::Result {
341 w.write_char('{')?;
342 self.write_header(w)?;
343 self.write_pool(w)?;
344 w.write_char('}')?;
345 Ok(())
346 }
347
348 fn write_header<W: Write>(&self, w: &mut W) -> fmt::Result {
349 write_string(w, "service", "dynomite")?;
350 write_string(w, "source", &self.info.source)?;
351 write_string(w, "version", &self.info.version)?;
352 write_num(w, "uptime", self.uptime)?;
353 write_num(w, "timestamp", self.timestamp)?;
354 write_string(w, "rack", &self.info.rack)?;
355 write_string(w, "dc", &self.info.dc)?;
356
357 write_num_u64(w, "latency_max", self.latency.max)?;
358 write_num_u64(w, "latency_999th", self.latency.p999)?;
359 write_num_u64(w, "latency_99th", self.latency.p99)?;
360 write_num_u64(w, "latency_95th", self.latency.p95)?;
361 write_num_u64(w, "latency_mean", self.latency.mean)?;
362
363 write_num_u64(w, "payload_size_max", self.payload_size.max)?;
364 write_num_u64(w, "payload_size_999th", self.payload_size.p999)?;
365 write_num_u64(w, "payload_size_99th", self.payload_size.p99)?;
366 write_num_u64(w, "payload_size_95th", self.payload_size.p95)?;
367 write_num_u64(w, "payload_size_mean", self.payload_size.mean)?;
368
369 self.write_cross_region_latency(w)?;
370 self.write_queue_wait(w)?;
371 self.write_queue_p99s(w)?;
372 self.write_resource_usage(w)?;
373 Ok(())
374 }
375
376 fn write_cross_region_latency<W: Write>(&self, w: &mut W) -> fmt::Result {
377 write_num_u64(
378 w,
379 "average_cross_region_rtt",
380 self.cross_region_latency.mean,
381 )?;
382 write_num_u64(w, "99_cross_region_rtt", self.cross_region_latency.p99)?;
383 write_num_u64(
384 w,
385 "average_cross_zone_latency",
386 self.cross_zone_latency.mean,
387 )?;
388 write_num_u64(w, "99_cross_zone_latency", self.cross_zone_latency.p99)?;
389 write_num_u64(w, "average_server_latency", self.server_latency.mean)?;
390 write_num_u64(w, "99_server_latency", self.server_latency.p99)?;
391 Ok(())
392 }
393
394 fn write_queue_wait<W: Write>(&self, w: &mut W) -> fmt::Result {
395 write_num_u64(
396 w,
397 "average_cross_region_queue_wait",
398 self.cross_region_queue_wait.mean,
399 )?;
400 write_num_u64(
401 w,
402 "99_cross_region_queue_wait",
403 self.cross_region_queue_wait.p99,
404 )?;
405 write_num_u64(
406 w,
407 "average_cross_zone_queue_wait",
408 self.cross_zone_queue_wait.mean,
409 )?;
410 write_num_u64(
411 w,
412 "99_cross_zone_queue_wait",
413 self.cross_zone_queue_wait.p99,
414 )?;
415 write_num_u64(w, "average_server_queue_wait", self.server_queue_wait.mean)?;
416 write_num_u64(w, "99_server_queue_wait", self.server_queue_wait.p99)?;
417 Ok(())
418 }
419
420 fn write_queue_p99s<W: Write>(&self, w: &mut W) -> fmt::Result {
421 write_num_u64(w, "client_out_queue_99", self.client_out_queue_p99)?;
422 write_num_u64(w, "server_in_queue_99", self.server_in_queue_p99)?;
423 write_num_u64(w, "server_out_queue_99", self.server_out_queue_p99)?;
424 write_num_u64(
425 w,
426 "dnode_client_out_queue_99",
427 self.dnode_client_out_queue_p99,
428 )?;
429 write_num_u64(w, "peer_in_queue_99", self.peer_in_queue_p99)?;
430 write_num_u64(w, "peer_out_queue_99", self.peer_out_queue_p99)?;
431 write_num_u64(
432 w,
433 "remote_peer_out_queue_99",
434 self.remote_peer_out_queue_p99,
435 )?;
436 write_num_u64(w, "remote_peer_in_queue_99", self.remote_peer_in_queue_p99)?;
437 Ok(())
438 }
439
440 fn write_resource_usage<W: Write>(&self, w: &mut W) -> fmt::Result {
441 write_num(w, "alloc_msgs", self.alloc_msgs)?;
442 write_num(w, "free_msgs", self.free_msgs)?;
443 write_num(w, "alloc_mbufs", self.alloc_mbufs)?;
444 write_num(w, "free_mbufs", self.free_mbufs)?;
445 write_num(w, "dyn_memory", self.dyn_memory)?;
446 Ok(())
447 }
448
449 fn write_pool<W: Write>(&self, w: &mut W) -> fmt::Result {
450 write!(w, "\"{}\":{{", escape_str(&self.pool.name))?;
451 for (i, spec) in POOL_CODEC.iter().enumerate() {
452 if !is_visible_metric(spec.kind) {
453 continue;
454 }
455 let value = self.pool.metrics.get(i).copied().unwrap_or(0);
456 write_num(w, spec.name, value)?;
457 }
458 self.write_server(w)?;
459 w.write_str("}")?;
460 Ok(())
461 }
462
463 fn write_server<W: Write>(&self, w: &mut W) -> fmt::Result {
464 write!(w, "\"{}\":{{", escape_str(&self.server.name))?;
465 let server_visible: Vec<usize> = SERVER_CODEC
466 .iter()
467 .enumerate()
468 .filter(|(_, s)| is_visible_metric(s.kind))
469 .map(|(i, _)| i)
470 .collect();
471 for (j, idx) in server_visible.iter().copied().enumerate() {
472 let spec = &SERVER_CODEC[idx];
473 let value = self.server.metrics.get(idx).copied().unwrap_or(0);
474 if j + 1 == server_visible.len() {
475 write_num_no_comma(w, spec.name, value)?;
476 } else {
477 write_num(w, spec.name, value)?;
478 }
479 }
480 w.write_str("}")?;
481 Ok(())
482 }
483}
484
485fn is_visible_metric(kind: StatsMetricType) -> bool {
489 matches!(
490 kind,
491 StatsMetricType::Counter | StatsMetricType::Gauge | StatsMetricType::Timestamp
492 )
493}
494
495fn write_string<W: Write>(w: &mut W, key: &str, value: &str) -> fmt::Result {
496 write!(w, "\"{}\":\"{}\",", escape_str(key), escape_str(value))
497}
498
499fn write_num<W: Write>(w: &mut W, key: &str, value: i64) -> fmt::Result {
500 write!(w, "\"{}\":{value},", escape_str(key))
501}
502
503fn write_num_no_comma<W: Write>(w: &mut W, key: &str, value: i64) -> fmt::Result {
504 write!(w, "\"{}\":{value}", escape_str(key))
505}
506
507fn write_num_u64<W: Write>(w: &mut W, key: &str, value: u64) -> fmt::Result {
508 write!(w, "\"{}\":{value},", escape_str(key))
509}
510
511fn escape_str(s: &str) -> EscapedStr<'_> {
514 EscapedStr(s)
515}
516
/// Borrowed string whose `Display` output is escaped for a JSON string
/// literal (the surrounding quotes are not added).
struct EscapedStr<'a>(&'a str);

impl fmt::Display for EscapedStr<'_> {
    /// Emits the wrapped text one character at a time.
    ///
    /// Backslash, double quote, newline, carriage return and tab use their
    /// two-character escapes; any other character below U+0020 becomes a
    /// `\u00XX` escape; everything else is written unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.chars().try_for_each(|ch| match ch {
            '\\' => f.write_str("\\\\"),
            '"' => f.write_str("\\\""),
            '\n' => f.write_str("\\n"),
            '\r' => f.write_str("\\r"),
            '\t' => f.write_str("\\t"),
            control if u32::from(control) < 0x20 => {
                write!(f, "\\u{:04x}", u32::from(control))
            }
            plain => f.write_char(plain),
        })
    }
}
535
536pub fn describe_stats() -> String {
547 let mut out = String::new();
548 out.push_str("pool stats:\n");
549 for spec in POOL_CODEC {
550 let _ = writeln!(out, " {:<20}\"{}\"", spec.name, spec.description);
551 }
552 out.push('\n');
553 out.push_str("server stats:\n");
554 for spec in SERVER_CODEC {
555 let _ = writeln!(out, " {:<20}\"{}\"", spec.name, spec.description);
556 }
557 out
558}
559
#[cfg(test)]
mod tests {
    use super::*;

    /// Spot-checks the ceiling helper on exact, fractional and rejected inputs.
    #[test]
    fn ceil_helper_matches_known_values() {
        assert_eq!(ceil_f64_to_u64(0.0), 0);
        assert_eq!(ceil_f64_to_u64(1.0), 1);
        assert_eq!(ceil_f64_to_u64(1.5), 2);
        assert_eq!(ceil_f64_to_u64(2.0), 2);
        assert_eq!(ceil_f64_to_u64(99.99), 100);
        assert_eq!(ceil_f64_to_u64(f64::NAN), 0);
        assert_eq!(ceil_f64_to_u64(f64::INFINITY), 0);
        assert_eq!(ceil_f64_to_u64(-1.0), 0);
    }

    /// Positive values below one round up to one, and negative infinity is
    /// rejected like the other non-finite inputs.
    #[test]
    fn ceil_helper_handles_fractions_and_negative_infinity() {
        assert_eq!(ceil_f64_to_u64(0.25), 1);
        assert_eq!(ceil_f64_to_u64(f64::MIN_POSITIVE), 1);
        assert_eq!(ceil_f64_to_u64(f64::NEG_INFINITY), 0);
    }

    /// Exercises the left-shift branch (exponent at or above 52) and the
    /// saturation to `u64::MAX` for values that need more than 64 bits.
    #[test]
    fn ceil_helper_handles_large_and_saturating_values() {
        assert_eq!(ceil_f64_to_u64(9_007_199_254_740_992.0), 1u64 << 53);
        assert_eq!(ceil_f64_to_u64(9_223_372_036_854_775_808.0), 1u64 << 63);
        assert_eq!(ceil_f64_to_u64(18_446_744_073_709_551_616.0), u64::MAX);
        assert_eq!(ceil_f64_to_u64(1e30), u64::MAX);
    }

    /// A default snapshot still renders the outer object, the fixed service
    /// tag, and the nested pool and server objects.
    #[test]
    fn empty_snapshot_renders_to_valid_json_skeleton() {
        let snap = Snapshot {
            pool: PoolStats::new("dyn_o_mite"),
            server: ServerStats::new("redis"),
            ..Snapshot::default()
        };
        let s = snap.to_json();
        assert!(s.starts_with('{'));
        assert!(s.ends_with('}'));
        // Server, pool and root objects close back to back.
        assert!(s.ends_with("}}}"));
        assert!(s.contains("\"service\":\"dynomite\""));
        assert!(s.contains("\"dyn_o_mite\":{"));
        assert!(s.contains("\"redis\":{"));
    }

    /// Every metric name and description from both codec tables must appear
    /// in the description text.
    #[test]
    fn describe_lists_every_metric() {
        let text = describe_stats();
        for spec in POOL_CODEC {
            assert!(
                text.contains(spec.name),
                "pool metric {} missing",
                spec.name
            );
            assert!(
                text.contains(spec.description),
                "pool metric {} description missing",
                spec.name
            );
        }
        for spec in SERVER_CODEC {
            assert!(
                text.contains(spec.name),
                "server metric {} missing",
                spec.name
            );
            assert!(
                text.contains(spec.description),
                "server metric {} description missing",
                spec.name
            );
        }
    }

    /// Quotes and newlines use their short escapes.
    #[test]
    fn escape_handles_quotes_and_controls() {
        let s = EscapedStr("a\"b\nc").to_string();
        assert_eq!(s, r#"a\"b\nc"#);
    }

    /// Backslashes are doubled, and control characters without a short escape
    /// fall back to the `\u00XX` form.
    #[test]
    fn escape_handles_backslash_and_unicode_fallback() {
        assert_eq!(EscapedStr("a\\b").to_string(), "a\\\\b");
        assert_eq!(EscapedStr("\t\r").to_string(), "\\t\\r");
        assert_eq!(EscapedStr("\u{1}").to_string(), "\\u0001");
        assert_eq!(EscapedStr("\u{1f}").to_string(), "\\u001f");
    }
}