Skip to main content

dynomite/stats/
snapshot.rs

1//! Snapshot type and JSON serialization for the stats subsystem.
2
3use std::fmt::{self, Write};
4
5use crate::stats::codec::{StatsMetricType, POOL_CODEC, SERVER_CODEC};
6use crate::stats::failure::FailureSnapshot;
7use crate::stats::histogram::Histogram;
8
/// Engine-wide identifying strings included in every snapshot.
///
/// Each field is emitted as a top-level JSON string member (`source`,
/// `version`, `rack`, `dc`) when a [`Snapshot`] is serialized.
///
/// # Examples
///
/// ```
/// use dynomite::stats::ServiceInfo;
/// let info = ServiceInfo {
///     source: "node-a".into(),
///     version: "0.0.1".into(),
///     rack: "r1".into(),
///     dc: "dc1".into(),
/// };
/// assert_eq!(info.source, "node-a");
/// ```
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ServiceInfo {
    /// Hostname or address of the local node, rendered as `source`.
    pub source: String,
    /// Engine version reported as `version` in the JSON.
    pub version: String,
    /// Logical rack of the local node, rendered as `rack`.
    pub rack: String,
    /// Logical datacenter of the local node, rendered as `dc`.
    pub dc: String,
}
34
/// Pre-computed quantile summary derived from a [`Histogram`].
///
/// An all-zero summary (the `Default`) is also what
/// [`HistogramSummary::from_histogram`] yields for an overflowing histogram.
///
/// # Examples
///
/// ```
/// use dynomite::stats::HistogramSummary;
/// let s = HistogramSummary::default();
/// assert_eq!(s.max, 0);
/// ```
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct HistogramSummary {
    /// Maximum observation in the window.
    pub max: u64,
    /// 99.9th percentile.
    pub p999: u64,
    /// 99th percentile.
    pub p99: u64,
    /// 95th percentile.
    pub p95: u64,
    /// Arithmetic mean of all observations, rounded up to an integer.
    pub mean: u64,
}
57
58impl HistogramSummary {
59    /// Compute the standard quantile summary from a histogram.
60    ///
61    /// When the histogram is in overflow (a value larger than the
62    /// largest bucket offset has been recorded), the summary is
63    /// zeroed: the reference implementation refuses to publish
64    /// percentiles in that state.
65    ///
66    /// # Examples
67    ///
68    /// ```
69    /// use dynomite::stats::{Histogram, HistogramSummary};
70    /// let mut h = Histogram::new();
71    /// for v in 0..100 { h.record(v); }
72    /// let s = HistogramSummary::from_histogram(&h);
73    /// assert!(s.p99 >= s.p95);
74    /// ```
75    pub fn from_histogram(h: &Histogram) -> Self {
76        if h.is_overflowing() {
77            return Self::default();
78        }
79        let mean_f = h.mean();
80        let mean = if mean_f.is_finite() && mean_f > 0.0 {
81            // Round mean up to the nearest integer.
82            ceil_f64_to_u64(mean_f)
83        } else {
84            0
85        };
86        Self {
87            max: h.max(),
88            p999: h.percentile(0.999),
89            p99: h.percentile(0.99),
90            p95: h.percentile(0.95),
91            mean,
92        }
93    }
94}
95
/// Rounds `x` up to the nearest integer and returns it as a `u64`, decoding
/// the IEEE-754 bit pattern instead of using an `as` cast.
///
/// Returns `0` for NaN, either infinity, zero and negative inputs. Saturates
/// to `u64::MAX` when the rounded value does not fit in 64 bits.
fn ceil_f64_to_u64(x: f64) -> u64 {
    const FRACTION_BITS: u64 = 52;
    const EXPONENT_MASK: u64 = 0x7FF;
    const EXPONENT_BIAS: u64 = 1023;

    let positive_finite = x.is_finite() && x > 0.0;
    if !positive_finite {
        return 0;
    }
    let bits = x.ceil().to_bits();
    let biased = (bits >> FRACTION_BITS) & EXPONENT_MASK;
    // For a positive input, `ceil` is at least 1.0, so this is a defensive
    // fallback rather than a reachable case.
    if biased < EXPONENT_BIAS {
        return 1;
    }
    let power = biased - EXPONENT_BIAS;
    if power >= 64 {
        return u64::MAX;
    }
    let implicit_one = 1u64 << FRACTION_BITS;
    let significand = implicit_one | (bits & (implicit_one - 1));
    if power < FRACTION_BITS {
        // The value is integral, so the bits shifted out are all zero.
        significand >> (FRACTION_BITS - power)
    } else {
        // `power` is in 52..=63, so the shift is at most 11 and the 53-bit
        // significand still fits in 64 bits.
        significand << (power - FRACTION_BITS)
    }
}
122
/// Per-pool collected metrics.
///
/// # Examples
///
/// ```
/// use dynomite::stats::PoolStats;
/// let pool = PoolStats::new("dyn_o_mite");
/// assert_eq!(pool.name, "dyn_o_mite");
/// assert!(!pool.metrics.is_empty());
/// ```
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PoolStats {
    /// Pool name as declared in the YAML configuration; used as the key of
    /// the nested pool object in the JSON output.
    pub name: String,
    /// Counter/gauge values, indexed by `PoolField::index()`.
    pub metrics: Vec<i64>,
}
140
141impl Default for PoolStats {
142    fn default() -> Self {
143        Self::new(String::new())
144    }
145}
146
147impl PoolStats {
148    /// Construct a fresh `PoolStats` with all metrics zeroed.
149    ///
150    /// # Examples
151    ///
152    /// ```
153    /// use dynomite::stats::PoolStats;
154    /// let p = PoolStats::new("dyn_o_mite");
155    /// assert!(p.metrics.iter().all(|&v| v == 0));
156    /// ```
157    pub fn new(name: impl Into<String>) -> Self {
158        Self {
159            name: name.into(),
160            metrics: vec![0; POOL_CODEC.len()],
161        }
162    }
163}
164
/// Per-datastore-server collected metrics.
///
/// # Examples
///
/// ```
/// use dynomite::stats::ServerStats;
/// let s = ServerStats::new("redis_local");
/// assert_eq!(s.name, "redis_local");
/// ```
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServerStats {
    /// Server name (the host name from the YAML); used as the key of the
    /// nested server object in the JSON output.
    pub name: String,
    /// Counter/gauge values, indexed by `ServerField::index()`.
    pub metrics: Vec<i64>,
}
181
182impl Default for ServerStats {
183    fn default() -> Self {
184        Self::new(String::new())
185    }
186}
187
188impl ServerStats {
189    /// Construct a fresh `ServerStats` with all metrics zeroed.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// use dynomite::stats::ServerStats;
195    /// let s = ServerStats::new("redis_local");
196    /// assert!(s.metrics.iter().all(|&v| v == 0));
197    /// ```
198    pub fn new(name: impl Into<String>) -> Self {
199        Self {
200            name: name.into(),
201            metrics: vec![0; SERVER_CODEC.len()],
202        }
203    }
204}
205
/// Per-peer collected metrics. Mirrors `ServerStats` for cluster peers.
///
/// # Examples
///
/// ```
/// use dynomite::stats::PeerStats;
/// let p = PeerStats::new("peer-a");
/// assert_eq!(p.name, "peer-a");
/// ```
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PeerStats {
    /// Peer name.
    pub name: String,
    /// Counter/gauge values indexed by `ServerField::index()`.
    pub metrics: Vec<i64>,
}
222
223impl PeerStats {
224    /// Construct a fresh `PeerStats` with all metrics zeroed.
225    ///
226    /// # Examples
227    ///
228    /// ```
229    /// use dynomite::stats::PeerStats;
230    /// let p = PeerStats::new("peer-a");
231    /// assert!(p.metrics.iter().all(|&v| v == 0));
232    /// ```
233    pub fn new(name: impl Into<String>) -> Self {
234        Self {
235            name: name.into(),
236            metrics: vec![0; SERVER_CODEC.len()],
237        }
238    }
239}
240
241/// Aggregate snapshot of the stats subsystem at a point in time.
242///
243/// This is the value rendered by [`Snapshot::to_json`] and exposed
244/// through the REST endpoint. It is `Send + Sync` and cheap to clone.
245#[derive(Clone, Debug, Default)]
246pub struct Snapshot {
247    /// Static identification strings.
248    pub info: ServiceInfo,
249    /// Seconds since the engine started.
250    pub uptime: i64,
251    /// Wall-clock seconds since UNIX epoch.
252    pub timestamp: i64,
253    /// Latency histogram summary.
254    pub latency: HistogramSummary,
255    /// Payload size histogram summary.
256    pub payload_size: HistogramSummary,
257    /// Cross-region RTT histogram summary.
258    pub cross_region_latency: HistogramSummary,
259    /// Cross-zone latency histogram summary.
260    pub cross_zone_latency: HistogramSummary,
261    /// Per-server latency summary.
262    pub server_latency: HistogramSummary,
263    /// Cross-region queue wait time summary.
264    pub cross_region_queue_wait: HistogramSummary,
265    /// Cross-zone queue wait time summary.
266    pub cross_zone_queue_wait: HistogramSummary,
267    /// Server queue wait time summary.
268    pub server_queue_wait: HistogramSummary,
269    /// 99th percentile of the client outbound queue length.
270    pub client_out_queue_p99: u64,
271    /// 99th percentile of the server inbound queue length.
272    pub server_in_queue_p99: u64,
273    /// 99th percentile of the server outbound queue length.
274    pub server_out_queue_p99: u64,
275    /// 99th percentile of the dnode client outbound queue length.
276    pub dnode_client_out_queue_p99: u64,
277    /// 99th percentile of the local-DC peer inbound queue length.
278    pub peer_in_queue_p99: u64,
279    /// 99th percentile of the local-DC peer outbound queue length.
280    pub peer_out_queue_p99: u64,
281    /// 99th percentile of the remote-DC peer inbound queue length.
282    pub remote_peer_in_queue_p99: u64,
283    /// 99th percentile of the remote-DC peer outbound queue length.
284    pub remote_peer_out_queue_p99: u64,
285    /// Number of message structs allocated.
286    pub alloc_msgs: i64,
287    /// Number of message structs on the free list.
288    pub free_msgs: i64,
289    /// Number of mbuf chunks allocated.
290    pub alloc_mbufs: i64,
291    /// Number of mbuf chunks on the free list.
292    pub free_mbufs: i64,
293    /// Resident set size in bytes.
294    pub dyn_memory: i64,
295    /// Aggregated pool counters.
296    pub pool: PoolStats,
297    /// Aggregated server counters.
298    pub server: ServerStats,
299    /// Aggregated failure-cause metrics.
300    pub failure: FailureSnapshot,
301}
302
303impl Snapshot {
304    /// Serialize the snapshot to a JSON string.
305    ///
306    /// The layout is a single JSON object with flat top-level fields
307    /// followed by a nested pool object containing the per-pool metric
308    /// counters and a per-server sub-object.
309    ///
310    /// # Examples
311    ///
312    /// ```
313    /// use dynomite::stats::{Snapshot, PoolStats, ServerStats};
314    ///
315    /// let mut snap = Snapshot::default();
316    /// snap.pool = PoolStats::new("dyn_o_mite");
317    /// snap.server = ServerStats::new("redis_local");
318    /// let s = snap.to_json();
319    /// assert!(s.starts_with('{'));
320    /// assert!(s.contains("\"dyn_o_mite\""));
321    /// ```
322    pub fn to_json(&self) -> String {
323        let mut out = String::new();
324        self.write_json(&mut out)
325            .expect("writing to a String never fails");
326        out
327    }
328
329    /// Render the snapshot as JSON into any [`fmt::Write`] sink.
330    ///
331    /// # Examples
332    ///
333    /// ```
334    /// use dynomite::stats::Snapshot;
335    /// let snap = Snapshot::default();
336    /// let mut s = String::new();
337    /// snap.write_json(&mut s).expect("writing into String never fails");
338    /// assert!(s.starts_with('{'));
339    /// ```
340    pub fn write_json<W: Write>(&self, w: &mut W) -> fmt::Result {
341        w.write_char('{')?;
342        self.write_header(w)?;
343        self.write_pool(w)?;
344        w.write_char('}')?;
345        Ok(())
346    }
347
348    fn write_header<W: Write>(&self, w: &mut W) -> fmt::Result {
349        write_string(w, "service", "dynomite")?;
350        write_string(w, "source", &self.info.source)?;
351        write_string(w, "version", &self.info.version)?;
352        write_num(w, "uptime", self.uptime)?;
353        write_num(w, "timestamp", self.timestamp)?;
354        write_string(w, "rack", &self.info.rack)?;
355        write_string(w, "dc", &self.info.dc)?;
356
357        write_num_u64(w, "latency_max", self.latency.max)?;
358        write_num_u64(w, "latency_999th", self.latency.p999)?;
359        write_num_u64(w, "latency_99th", self.latency.p99)?;
360        write_num_u64(w, "latency_95th", self.latency.p95)?;
361        write_num_u64(w, "latency_mean", self.latency.mean)?;
362
363        write_num_u64(w, "payload_size_max", self.payload_size.max)?;
364        write_num_u64(w, "payload_size_999th", self.payload_size.p999)?;
365        write_num_u64(w, "payload_size_99th", self.payload_size.p99)?;
366        write_num_u64(w, "payload_size_95th", self.payload_size.p95)?;
367        write_num_u64(w, "payload_size_mean", self.payload_size.mean)?;
368
369        self.write_cross_region_latency(w)?;
370        self.write_queue_wait(w)?;
371        self.write_queue_p99s(w)?;
372        self.write_resource_usage(w)?;
373        Ok(())
374    }
375
376    fn write_cross_region_latency<W: Write>(&self, w: &mut W) -> fmt::Result {
377        write_num_u64(
378            w,
379            "average_cross_region_rtt",
380            self.cross_region_latency.mean,
381        )?;
382        write_num_u64(w, "99_cross_region_rtt", self.cross_region_latency.p99)?;
383        write_num_u64(
384            w,
385            "average_cross_zone_latency",
386            self.cross_zone_latency.mean,
387        )?;
388        write_num_u64(w, "99_cross_zone_latency", self.cross_zone_latency.p99)?;
389        write_num_u64(w, "average_server_latency", self.server_latency.mean)?;
390        write_num_u64(w, "99_server_latency", self.server_latency.p99)?;
391        Ok(())
392    }
393
394    fn write_queue_wait<W: Write>(&self, w: &mut W) -> fmt::Result {
395        write_num_u64(
396            w,
397            "average_cross_region_queue_wait",
398            self.cross_region_queue_wait.mean,
399        )?;
400        write_num_u64(
401            w,
402            "99_cross_region_queue_wait",
403            self.cross_region_queue_wait.p99,
404        )?;
405        write_num_u64(
406            w,
407            "average_cross_zone_queue_wait",
408            self.cross_zone_queue_wait.mean,
409        )?;
410        write_num_u64(
411            w,
412            "99_cross_zone_queue_wait",
413            self.cross_zone_queue_wait.p99,
414        )?;
415        write_num_u64(w, "average_server_queue_wait", self.server_queue_wait.mean)?;
416        write_num_u64(w, "99_server_queue_wait", self.server_queue_wait.p99)?;
417        Ok(())
418    }
419
420    fn write_queue_p99s<W: Write>(&self, w: &mut W) -> fmt::Result {
421        write_num_u64(w, "client_out_queue_99", self.client_out_queue_p99)?;
422        write_num_u64(w, "server_in_queue_99", self.server_in_queue_p99)?;
423        write_num_u64(w, "server_out_queue_99", self.server_out_queue_p99)?;
424        write_num_u64(
425            w,
426            "dnode_client_out_queue_99",
427            self.dnode_client_out_queue_p99,
428        )?;
429        write_num_u64(w, "peer_in_queue_99", self.peer_in_queue_p99)?;
430        write_num_u64(w, "peer_out_queue_99", self.peer_out_queue_p99)?;
431        write_num_u64(
432            w,
433            "remote_peer_out_queue_99",
434            self.remote_peer_out_queue_p99,
435        )?;
436        write_num_u64(w, "remote_peer_in_queue_99", self.remote_peer_in_queue_p99)?;
437        Ok(())
438    }
439
440    fn write_resource_usage<W: Write>(&self, w: &mut W) -> fmt::Result {
441        write_num(w, "alloc_msgs", self.alloc_msgs)?;
442        write_num(w, "free_msgs", self.free_msgs)?;
443        write_num(w, "alloc_mbufs", self.alloc_mbufs)?;
444        write_num(w, "free_mbufs", self.free_mbufs)?;
445        write_num(w, "dyn_memory", self.dyn_memory)?;
446        Ok(())
447    }
448
449    fn write_pool<W: Write>(&self, w: &mut W) -> fmt::Result {
450        write!(w, "\"{}\":{{", escape_str(&self.pool.name))?;
451        for (i, spec) in POOL_CODEC.iter().enumerate() {
452            if !is_visible_metric(spec.kind) {
453                continue;
454            }
455            let value = self.pool.metrics.get(i).copied().unwrap_or(0);
456            write_num(w, spec.name, value)?;
457        }
458        self.write_server(w)?;
459        w.write_str("}")?;
460        Ok(())
461    }
462
463    fn write_server<W: Write>(&self, w: &mut W) -> fmt::Result {
464        write!(w, "\"{}\":{{", escape_str(&self.server.name))?;
465        let server_visible: Vec<usize> = SERVER_CODEC
466            .iter()
467            .enumerate()
468            .filter(|(_, s)| is_visible_metric(s.kind))
469            .map(|(i, _)| i)
470            .collect();
471        for (j, idx) in server_visible.iter().copied().enumerate() {
472            let spec = &SERVER_CODEC[idx];
473            let value = self.server.metrics.get(idx).copied().unwrap_or(0);
474            if j + 1 == server_visible.len() {
475                write_num_no_comma(w, spec.name, value)?;
476            } else {
477                write_num(w, spec.name, value)?;
478            }
479        }
480        w.write_str("}")?;
481        Ok(())
482    }
483}
484
485/// Whether a metric kind appears in the JSON output. Counters, gauges,
486/// and timestamps are all rendered as numbers; the C reference omits
487/// invalid/string metric kinds entirely.
488fn is_visible_metric(kind: StatsMetricType) -> bool {
489    matches!(
490        kind,
491        StatsMetricType::Counter | StatsMetricType::Gauge | StatsMetricType::Timestamp
492    )
493}
494
/// Writes `"key":"value",` with both the key and the value JSON-escaped.
fn write_string<W: Write>(w: &mut W, key: &str, value: &str) -> fmt::Result {
    write_key(w, key)?;
    w.write_char('"')?;
    write!(w, "{}", escape_str(value))?;
    w.write_str("\",")
}

/// Writes `"key":value,` for a signed number.
fn write_num<W: Write>(w: &mut W, key: &str, value: i64) -> fmt::Result {
    write_num_no_comma(w, key, value)?;
    w.write_char(',')
}

/// Writes `"key":value` for a signed number, without a trailing comma.
/// Used for the last member of an object.
fn write_num_no_comma<W: Write>(w: &mut W, key: &str, value: i64) -> fmt::Result {
    write_key(w, key)?;
    write!(w, "{value}")
}

/// Writes `"key":value,` for an unsigned number.
fn write_num_u64<W: Write>(w: &mut W, key: &str, value: u64) -> fmt::Result {
    write_key(w, key)?;
    write!(w, "{value},")
}

/// Writes the quoted, escaped `key` followed by the `:` separator.
fn write_key<W: Write>(w: &mut W, key: &str) -> fmt::Result {
    write!(w, "\"{}\":", escape_str(key))
}

/// Wraps `s` so that displaying it yields its JSON-escaped form.
///
/// Minimal escaping is applied. Backslash, double quote, `\n`, `\r` and
/// `\t` get short escapes. Other control characters below U+0020 become
/// `\u00XX`. Everything else passes through unchanged.
fn escape_str(s: &str) -> EscapedStr<'_> {
    EscapedStr(s)
}

/// Lazily escaping view over a borrowed string; see `escape_str`.
struct EscapedStr<'a>(&'a str);

impl fmt::Display for EscapedStr<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = self.0;
        // Byte offset of the first character that has not been written yet.
        let mut flushed = 0;
        for (at, ch) in text.char_indices() {
            let short = match ch {
                '\\' => Some("\\\\"),
                '"' => Some("\\\""),
                '\n' => Some("\\n"),
                '\r' => Some("\\r"),
                '\t' => Some("\\t"),
                c if c < '\u{20}' => None,
                _ => continue,
            };
            // Emit the preceding run of plain characters in one write.
            f.write_str(&text[flushed..at])?;
            match short {
                Some(seq) => f.write_str(seq)?,
                None => write!(f, "\\u{:04x}", u32::from(ch))?,
            }
            flushed = at + ch.len_utf8();
        }
        f.write_str(&text[flushed..])
    }
}
535
536/// Returns the human-readable description block printed by the `-D`
537/// command-line flag.
538///
539/// # Examples
540///
541/// ```
542/// let text = dynomite::stats::describe_stats();
543/// assert!(text.contains("pool stats:"));
544/// assert!(text.contains("server stats:"));
545/// ```
546pub fn describe_stats() -> String {
547    let mut out = String::new();
548    out.push_str("pool stats:\n");
549    for spec in POOL_CODEC {
550        let _ = writeln!(out, "  {:<20}\"{}\"", spec.name, spec.description);
551    }
552    out.push('\n');
553    out.push_str("server stats:\n");
554    for spec in SERVER_CODEC {
555        let _ = writeln!(out, "  {:<20}\"{}\"", spec.name, spec.description);
556    }
557    out
558}
559
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the snapshot shared by the JSON rendering tests.
    fn sample_snapshot() -> Snapshot {
        Snapshot {
            pool: PoolStats::new("dyn_o_mite"),
            server: ServerStats::new("redis"),
            ..Snapshot::default()
        }
    }

    #[test]
    fn ceil_helper_matches_known_values() {
        assert_eq!(ceil_f64_to_u64(0.0), 0);
        assert_eq!(ceil_f64_to_u64(1.0), 1);
        assert_eq!(ceil_f64_to_u64(1.5), 2);
        assert_eq!(ceil_f64_to_u64(2.0), 2);
        assert_eq!(ceil_f64_to_u64(99.99), 100);
        assert_eq!(ceil_f64_to_u64(f64::NAN), 0);
        assert_eq!(ceil_f64_to_u64(f64::INFINITY), 0);
        assert_eq!(ceil_f64_to_u64(-1.0), 0);
    }

    #[test]
    fn ceil_helper_handles_fractions_and_the_u64_boundary() {
        assert_eq!(ceil_f64_to_u64(f64::MIN_POSITIVE), 1);
        assert_eq!(ceil_f64_to_u64(0.25), 1);
        assert_eq!(
            ceil_f64_to_u64(1_000_000_000_000_000.5),
            1_000_000_000_000_001
        );
        assert_eq!(ceil_f64_to_u64(9_223_372_036_854_775_808.0), 1u64 << 63);
        // Largest f64 strictly below 2^64.
        assert_eq!(
            ceil_f64_to_u64(18_446_744_073_709_549_568.0),
            18_446_744_073_709_549_568u64
        );
        // 2^64 itself does not fit and saturates.
        assert_eq!(ceil_f64_to_u64(18_446_744_073_709_551_616.0), u64::MAX);
    }

    #[test]
    fn empty_snapshot_renders_to_valid_json_skeleton() {
        let s = sample_snapshot().to_json();
        assert!(s.starts_with('{'));
        assert!(s.ends_with('}'));
        assert!(s.contains("\"service\":\"dynomite\""));
        assert!(s.contains("\"dyn_o_mite\":{"));
        assert!(s.contains("\"redis\":{"));
    }

    #[test]
    fn snapshot_json_has_balanced_braces_and_no_dangling_commas() {
        let s = sample_snapshot().to_json();
        // The rendered keys and values here contain no braces, so a plain
        // count is enough to check nesting.
        assert_eq!(s.matches('{').count(), s.matches('}').count());
        assert!(!s.contains(",}"));
        assert!(!s.contains(",,"));
        // Server object, pool object and root object all close at the end.
        assert!(s.ends_with("}}}"));
    }

    #[test]
    fn pool_and_server_names_are_escaped() {
        let snap = Snapshot {
            pool: PoolStats::new("a\"b"),
            server: ServerStats::new("c\\d"),
            ..Snapshot::default()
        };
        let s = snap.to_json();
        assert!(s.contains(r#""a\"b":{"#));
        assert!(s.contains(r#""c\\d":{"#));
    }

    #[test]
    fn describe_lists_every_metric() {
        let text = describe_stats();
        assert!(text.starts_with("pool stats:\n"));
        assert!(text.contains("\n\nserver stats:\n"));
        for spec in POOL_CODEC {
            assert!(
                text.contains(spec.name),
                "pool metric {} missing",
                spec.name
            );
            assert!(text.contains(spec.description));
        }
        for spec in SERVER_CODEC {
            assert!(
                text.contains(spec.name),
                "server metric {} missing",
                spec.name
            );
            assert!(text.contains(spec.description));
        }
    }

    #[test]
    fn escape_handles_quotes_and_controls() {
        let s = EscapedStr("a\"b\nc").to_string();
        assert_eq!(s, r#"a\"b\nc"#);
    }

    #[test]
    fn escape_uses_unicode_escapes_for_other_controls() {
        assert_eq!(EscapedStr("\u{1}\u{1f}").to_string(), r"\u0001\u001f");
        assert_eq!(EscapedStr("\\\t\r").to_string(), r"\\\t\r");
        assert_eq!(EscapedStr("héllo").to_string(), "héllo");
    }
}