Skip to main content

dynomite/stats/
mod.rs

1//! Pool, server, and peer metrics with histograms and a JSON snapshot.
2//!
3//! The stats subsystem is split into small modules:
4//!
5//! * [`Histogram`] - Cassandra-style estimated histogram.
6//! * [`PoolField`] / [`ServerField`] - typed metric handles.
7//! * [`Snapshot`] - aggregate value rendered to JSON.
8//! * [`StatsServer`] - REST endpoint serving the latest snapshot.
9//!
10//! [`Stats`] glues the pieces together: a writer accumulates counters,
11//! gauges, and histogram observations; a periodic aggregator publishes
12//! a fresh [`Snapshot`] that the REST endpoint serves.
13
14mod codec;
15mod failure;
16mod histogram;
17mod numeric;
18mod prometheus;
19mod rest;
20mod snapshot;
21
22use std::sync::Arc;
23use std::time::{SystemTime, UNIX_EPOCH};
24
25use parking_lot::Mutex;
26use tokio::time::{Duration, Instant};
27use tokio_util::sync::CancellationToken;
28
29pub use crate::stats::codec::{
30    MetricSpec, PoolField, ServerField, StatsMetricType, POOL_CODEC, SERVER_CODEC,
31};
32pub use crate::stats::failure::{
33    FailureMetrics, FailureSnapshot, NoTargetsEntry, PeerEntry, PeerStateEntry, PhiEntry,
34    TimeoutEntry, TransitionEntry,
35};
36pub use crate::stats::histogram::{Histogram, BUCKET_COUNT};
37pub use crate::stats::prometheus::render_prometheus;
38pub use crate::stats::rest::{ClusterInfoProvider, StatsServer, MAX_HEADERS, MAX_REQUEST_BYTES};
39pub use crate::stats::snapshot::{
40    describe_stats, HistogramSummary, PeerStats, PoolStats, ServerStats, ServiceInfo, Snapshot,
41};
42
43/// Live, mutable counters and histograms for a single engine instance.
44///
45/// `Stats` is the writer side; readers consume frozen [`Snapshot`]
46/// values produced by [`Stats::snapshot`].
47///
48/// # Examples
49///
50/// ```
51/// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
52/// let stats = Stats::new(
53///     ServiceInfo::default(),
54///     PoolStats::new("dyn_o_mite"),
55///     ServerStats::new("redis"),
56/// );
57/// assert_eq!(stats.snapshot().pool.name, "dyn_o_mite");
58/// ```
59#[derive(Debug)]
60pub struct Stats {
61    inner: Arc<Mutex<StatsInner>>,
62    failure: Arc<FailureMetrics>,
63    started: Instant,
64}
65
66#[derive(Debug)]
67struct StatsInner {
68    info: ServiceInfo,
69    pool: PoolStats,
70    server: ServerStats,
71    latency: Histogram,
72    payload_size: Histogram,
73    cross_region_latency: Histogram,
74    cross_zone_latency: Histogram,
75    server_latency: Histogram,
76    cross_region_queue_wait: Histogram,
77    cross_zone_queue_wait: Histogram,
78    server_queue_wait: Histogram,
79    client_out_queue: Histogram,
80    server_in_queue: Histogram,
81    server_out_queue: Histogram,
82    dnode_client_out_queue: Histogram,
83    peer_in_queue: Histogram,
84    peer_out_queue: Histogram,
85    remote_peer_in_queue: Histogram,
86    remote_peer_out_queue: Histogram,
87    alloc_msgs: i64,
88    free_msgs: i64,
89    alloc_mbufs: i64,
90    free_mbufs: i64,
91    dyn_memory: i64,
92}
93
94impl StatsInner {
95    fn new(info: ServiceInfo, pool: PoolStats, server: ServerStats) -> Self {
96        Self {
97            info,
98            pool,
99            server,
100            latency: Histogram::new(),
101            payload_size: Histogram::new(),
102            cross_region_latency: Histogram::new(),
103            cross_zone_latency: Histogram::new(),
104            server_latency: Histogram::new(),
105            cross_region_queue_wait: Histogram::new(),
106            cross_zone_queue_wait: Histogram::new(),
107            server_queue_wait: Histogram::new(),
108            client_out_queue: Histogram::new(),
109            server_in_queue: Histogram::new(),
110            server_out_queue: Histogram::new(),
111            dnode_client_out_queue: Histogram::new(),
112            peer_in_queue: Histogram::new(),
113            peer_out_queue: Histogram::new(),
114            remote_peer_in_queue: Histogram::new(),
115            remote_peer_out_queue: Histogram::new(),
116            alloc_msgs: 0,
117            free_msgs: 0,
118            alloc_mbufs: 0,
119            free_mbufs: 0,
120            dyn_memory: 0,
121        }
122    }
123}
124
/// Latency histograms that [`Stats::record_latency`] can target.
///
/// # Examples
///
/// ```
/// use dynomite::stats::Latency;
///
/// // Variants are plain `Copy` values compared by identity.
/// let channel = Latency::CrossRegion;
/// let duplicate = channel;
/// assert_eq!(channel, duplicate);
/// assert_eq!(Latency::Request, Latency::Request);
/// assert_ne!(Latency::Request, Latency::Server);
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Latency {
    /// End-to-end latency of a top-level request.
    Request,
    /// Round-trip time to a peer in another region.
    CrossRegion,
    /// Latency to a peer in another zone.
    CrossZone,
    /// Response latency of the backing data server.
    Server,
}
148
/// Queue wait-time histograms that [`Stats::record_queue_wait`] can target.
///
/// # Examples
///
/// ```
/// use dynomite::stats::QueueWait;
///
/// // Being `Copy`, a variant is still usable after being assigned away.
/// let first = QueueWait::Server;
/// let second = first;
/// assert_eq!(first, second);
/// assert_ne!(QueueWait::CrossRegion, QueueWait::CrossZone);
/// assert_ne!(QueueWait::CrossZone, QueueWait::Server);
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueueWait {
    /// Time spent queued before a cross-region send.
    CrossRegion,
    /// Time spent queued before a cross-zone send.
    CrossZone,
    /// Time spent queued before reaching the backing server.
    Server,
}
171
/// Queue-length histograms that [`Stats::record_queue_len`] can target.
///
/// Unlike the latency and wait-time channels, these record queue lengths
/// sampled at a point in time rather than per-event measurements.
///
/// # Examples
///
/// ```
/// use dynomite::stats::QueueGauge;
///
/// // Every variant is distinct, so each one routes to its own histogram.
/// let gauges = [
///     QueueGauge::ClientOut,
///     QueueGauge::ServerIn,
///     QueueGauge::ServerOut,
///     QueueGauge::DnodeClientOut,
///     QueueGauge::PeerIn,
///     QueueGauge::PeerOut,
///     QueueGauge::RemotePeerIn,
///     QueueGauge::RemotePeerOut,
/// ];
/// for (i, a) in gauges.iter().enumerate() {
///     for (j, b) in gauges.iter().enumerate() {
///         assert_eq!(i == j, a == b);
///     }
/// }
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueueGauge {
    /// Length of the client out-queue.
    ClientOut,
    /// Length of the server in-queue.
    ServerIn,
    /// Length of the server out-queue.
    ServerOut,
    /// Length of the dnode client out-queue.
    DnodeClientOut,
    /// Length of a local-DC peer in-queue.
    PeerIn,
    /// Length of a local-DC peer out-queue.
    PeerOut,
    /// Length of a remote-DC peer in-queue.
    RemotePeerIn,
    /// Length of a remote-DC peer out-queue.
    RemotePeerOut,
}
216
217impl Stats {
218    /// Construct a new `Stats` with empty counters and histograms.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
224    ///
225    /// let info = ServiceInfo {
226    ///     source: "node-a".into(),
227    ///     version: "0.0.1".into(),
228    ///     rack: "r1".into(),
229    ///     dc: "dc1".into(),
230    /// };
231    /// let stats = Stats::new(
232    ///     info,
233    ///     PoolStats::new("dyn_o_mite"),
234    ///     ServerStats::new("redis_local"),
235    /// );
236    /// let snap = stats.snapshot();
237    /// assert_eq!(snap.pool.name, "dyn_o_mite");
238    /// ```
239    pub fn new(info: ServiceInfo, pool: PoolStats, server: ServerStats) -> Self {
240        Self {
241            inner: Arc::new(Mutex::new(StatsInner::new(info, pool, server))),
242            failure: Arc::new(FailureMetrics::new()),
243            started: Instant::now(),
244        }
245    }
246
247    /// Borrow the failure-cause metrics handle.
248    ///
249    /// The dispatcher and the gossip handler clone this `Arc`
250    /// so they can record per-cause errors and per-peer state
251    /// transitions. The handle is created at construction time
252    /// and lives for the lifetime of the [`Stats`] value.
253    ///
254    /// # Examples
255    ///
256    /// ```
257    /// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
258    /// let s = Stats::new(
259    ///     ServiceInfo::default(),
260    ///     PoolStats::new("p"),
261    ///     ServerStats::new("s"),
262    /// );
263    /// let m = s.failure_metrics();
264    /// assert!(m.snapshot().is_empty());
265    /// ```
266    #[must_use]
267    pub fn failure_metrics(&self) -> Arc<FailureMetrics> {
268        self.failure.clone()
269    }
270
271    /// Record a latency observation in the matching histogram.
272    ///
273    /// # Examples
274    ///
275    /// ```
276    /// use dynomite::stats::{Latency, PoolStats, ServerStats, ServiceInfo, Stats};
277    /// let stats = Stats::new(
278    ///     ServiceInfo::default(),
279    ///     PoolStats::new("p"),
280    ///     ServerStats::new("s"),
281    /// );
282    /// stats.record_latency(Latency::Request, 100);
283    /// ```
284    pub fn record_latency(&self, channel: Latency, value: u64) {
285        let mut inner = self.inner.lock();
286        match channel {
287            Latency::Request => inner.latency.record(value),
288            Latency::CrossRegion => inner.cross_region_latency.record(value),
289            Latency::CrossZone => inner.cross_zone_latency.record(value),
290            Latency::Server => inner.server_latency.record(value),
291        }
292    }
293
294    /// Record a payload-size observation.
295    ///
296    /// # Examples
297    ///
298    /// ```
299    /// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
300    /// let stats = Stats::new(
301    ///     ServiceInfo::default(),
302    ///     PoolStats::new("p"),
303    ///     ServerStats::new("s"),
304    /// );
305    /// stats.record_payload_size(2048);
306    /// ```
307    pub fn record_payload_size(&self, value: u64) {
308        self.inner.lock().payload_size.record(value);
309    }
310
311    /// Record a queue wait time observation.
312    ///
313    /// # Examples
314    ///
315    /// ```
316    /// use dynomite::stats::{PoolStats, QueueWait, ServerStats, ServiceInfo, Stats};
317    /// let stats = Stats::new(
318    ///     ServiceInfo::default(),
319    ///     PoolStats::new("p"),
320    ///     ServerStats::new("s"),
321    /// );
322    /// stats.record_queue_wait(QueueWait::Server, 12);
323    /// ```
324    pub fn record_queue_wait(&self, channel: QueueWait, value: u64) {
325        let mut inner = self.inner.lock();
326        match channel {
327            QueueWait::CrossRegion => inner.cross_region_queue_wait.record(value),
328            QueueWait::CrossZone => inner.cross_zone_queue_wait.record(value),
329            QueueWait::Server => inner.server_queue_wait.record(value),
330        }
331    }
332
333    /// Record a queue-length sample.
334    ///
335    /// # Examples
336    ///
337    /// ```
338    /// use dynomite::stats::{PoolStats, QueueGauge, ServerStats, ServiceInfo, Stats};
339    /// let stats = Stats::new(
340    ///     ServiceInfo::default(),
341    ///     PoolStats::new("p"),
342    ///     ServerStats::new("s"),
343    /// );
344    /// stats.record_queue_len(QueueGauge::ClientOut, 4);
345    /// ```
346    pub fn record_queue_len(&self, channel: QueueGauge, value: u64) {
347        let mut inner = self.inner.lock();
348        match channel {
349            QueueGauge::ClientOut => inner.client_out_queue.record(value),
350            QueueGauge::ServerIn => inner.server_in_queue.record(value),
351            QueueGauge::ServerOut => inner.server_out_queue.record(value),
352            QueueGauge::DnodeClientOut => inner.dnode_client_out_queue.record(value),
353            QueueGauge::PeerIn => inner.peer_in_queue.record(value),
354            QueueGauge::PeerOut => inner.peer_out_queue.record(value),
355            QueueGauge::RemotePeerIn => inner.remote_peer_in_queue.record(value),
356            QueueGauge::RemotePeerOut => inner.remote_peer_out_queue.record(value),
357        }
358    }
359
360    /// Increment a pool counter or gauge by one.
361    ///
362    /// # Examples
363    ///
364    /// ```
365    /// use dynomite::stats::{PoolField, PoolStats, ServerStats, ServiceInfo, Stats};
366    /// let stats = Stats::new(
367    ///     ServiceInfo::default(),
368    ///     PoolStats::new("p"),
369    ///     ServerStats::new("s"),
370    /// );
371    /// stats.pool_incr(PoolField::ClientEof);
372    /// assert_eq!(stats.pool_get(PoolField::ClientEof), 1);
373    /// ```
374    pub fn pool_incr(&self, field: PoolField) {
375        self.pool_incr_by(field, 1);
376    }
377
378    /// Decrement a pool gauge by one.
379    ///
380    /// # Examples
381    ///
382    /// ```
383    /// use dynomite::stats::{PoolField, PoolStats, ServerStats, ServiceInfo, Stats};
384    /// let stats = Stats::new(
385    ///     ServiceInfo::default(),
386    ///     PoolStats::new("p"),
387    ///     ServerStats::new("s"),
388    /// );
389    /// stats.pool_set(PoolField::ClientConnections, 5);
390    /// stats.pool_decr(PoolField::ClientConnections);
391    /// assert_eq!(stats.pool_get(PoolField::ClientConnections), 4);
392    /// ```
393    pub fn pool_decr(&self, field: PoolField) {
394        self.pool_incr_by(field, -1);
395    }
396
397    /// Add `delta` to a pool counter or gauge.
398    ///
399    /// Wraps on overflow to mirror the reference engine's `++` / `+=`
400    /// semantics. Counters are 64-bit signed and never reach the wrap
401    /// boundary under realistic workloads.
402    pub fn pool_incr_by(&self, field: PoolField, delta: i64) {
403        let mut inner = self.inner.lock();
404        let slot = &mut inner.pool.metrics[field.index()];
405        *slot = slot.wrapping_add(delta);
406    }
407
408    /// Set a pool gauge or timestamp to an absolute value.
409    ///
410    /// # Examples
411    ///
412    /// ```
413    /// use dynomite::stats::{PoolField, PoolStats, ServerStats, ServiceInfo, Stats};
414    /// let stats = Stats::new(
415    ///     ServiceInfo::default(),
416    ///     PoolStats::new("p"),
417    ///     ServerStats::new("s"),
418    /// );
419    /// stats.pool_set(PoolField::PeerEjectedAt, 1_700_000_000);
420    /// assert_eq!(stats.pool_get(PoolField::PeerEjectedAt), 1_700_000_000);
421    /// ```
422    pub fn pool_set(&self, field: PoolField, value: i64) {
423        self.inner.lock().pool.metrics[field.index()] = value;
424    }
425
426    /// Read the current value of a pool metric.
427    ///
428    /// # Examples
429    ///
430    /// ```
431    /// use dynomite::stats::{PoolField, PoolStats, ServerStats, ServiceInfo, Stats};
432    /// let stats = Stats::new(
433    ///     ServiceInfo::default(),
434    ///     PoolStats::new("p"),
435    ///     ServerStats::new("s"),
436    /// );
437    /// assert_eq!(stats.pool_get(PoolField::ClientEof), 0);
438    /// ```
439    pub fn pool_get(&self, field: PoolField) -> i64 {
440        self.inner.lock().pool.metrics[field.index()]
441    }
442
443    /// Increment a server counter or gauge by one.
444    ///
445    /// # Examples
446    ///
447    /// ```
448    /// use dynomite::stats::{PoolStats, ServerField, ServerStats, ServiceInfo, Stats};
449    /// let stats = Stats::new(
450    ///     ServiceInfo::default(),
451    ///     PoolStats::new("p"),
452    ///     ServerStats::new("s"),
453    /// );
454    /// stats.server_incr(ServerField::ReadRequests);
455    /// assert_eq!(stats.server_get(ServerField::ReadRequests), 1);
456    /// ```
457    pub fn server_incr(&self, field: ServerField) {
458        self.server_incr_by(field, 1);
459    }
460
461    /// Decrement a server gauge by one.
462    ///
463    /// # Examples
464    ///
465    /// ```
466    /// use dynomite::stats::{PoolStats, ServerField, ServerStats, ServiceInfo, Stats};
467    /// let stats = Stats::new(
468    ///     ServiceInfo::default(),
469    ///     PoolStats::new("p"),
470    ///     ServerStats::new("s"),
471    /// );
472    /// stats.server_set(ServerField::InQueue, 3);
473    /// stats.server_decr(ServerField::InQueue);
474    /// assert_eq!(stats.server_get(ServerField::InQueue), 2);
475    /// ```
476    pub fn server_decr(&self, field: ServerField) {
477        self.server_incr_by(field, -1);
478    }
479
480    /// Add `delta` to a server counter or gauge.
481    ///
482    /// Wraps on overflow to mirror the reference engine's `++` / `+=`
483    /// semantics. Counters are 64-bit signed and never reach the wrap
484    /// boundary under realistic workloads.
485    pub fn server_incr_by(&self, field: ServerField, delta: i64) {
486        let mut inner = self.inner.lock();
487        let slot = &mut inner.server.metrics[field.index()];
488        *slot = slot.wrapping_add(delta);
489    }
490
491    /// Set a server gauge or timestamp to an absolute value.
492    ///
493    /// # Examples
494    ///
495    /// ```
496    /// use dynomite::stats::{PoolStats, ServerField, ServerStats, ServiceInfo, Stats};
497    /// let stats = Stats::new(
498    ///     ServiceInfo::default(),
499    ///     PoolStats::new("p"),
500    ///     ServerStats::new("s"),
501    /// );
502    /// stats.server_set(ServerField::ServerEjectedAt, 1_700_000_000);
503    /// assert_eq!(stats.server_get(ServerField::ServerEjectedAt), 1_700_000_000);
504    /// ```
505    pub fn server_set(&self, field: ServerField, value: i64) {
506        self.inner.lock().server.metrics[field.index()] = value;
507    }
508
509    /// Read the current value of a server metric.
510    ///
511    /// # Examples
512    ///
513    /// ```
514    /// use dynomite::stats::{PoolStats, ServerField, ServerStats, ServiceInfo, Stats};
515    /// let stats = Stats::new(
516    ///     ServiceInfo::default(),
517    ///     PoolStats::new("p"),
518    ///     ServerStats::new("s"),
519    /// );
520    /// assert_eq!(stats.server_get(ServerField::ReadRequests), 0);
521    /// ```
522    pub fn server_get(&self, field: ServerField) -> i64 {
523        self.inner.lock().server.metrics[field.index()]
524    }
525
526    /// Set the resource usage gauges that the reference engine samples
527    /// once per aggregation cycle.
528    ///
529    /// # Examples
530    ///
531    /// ```
532    /// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
533    /// let stats = Stats::new(
534    ///     ServiceInfo::default(),
535    ///     PoolStats::new("p"),
536    ///     ServerStats::new("s"),
537    /// );
538    /// stats.set_resource_usage(0, 0, 0, 0, 0);
539    /// assert_eq!(stats.snapshot().alloc_msgs, 0);
540    /// ```
541    pub fn set_resource_usage(
542        &self,
543        alloc_msgs: i64,
544        free_msgs: i64,
545        alloc_mbufs: i64,
546        free_mbufs: i64,
547        dyn_memory: i64,
548    ) {
549        let mut inner = self.inner.lock();
550        inner.alloc_msgs = alloc_msgs;
551        inner.free_msgs = free_msgs;
552        inner.alloc_mbufs = alloc_mbufs;
553        inner.free_mbufs = free_mbufs;
554        inner.dyn_memory = dyn_memory;
555    }
556
557    /// Build an immutable snapshot of every counter, gauge, and
558    /// histogram quantile at the current instant.
559    ///
560    /// # Examples
561    ///
562    /// ```
563    /// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
564    /// let stats = Stats::new(
565    ///     ServiceInfo::default(),
566    ///     PoolStats::new("p"),
567    ///     ServerStats::new("s"),
568    /// );
569    /// let snap = stats.snapshot();
570    /// assert_eq!(snap.pool.name, "p");
571    /// ```
572    pub fn snapshot(&self) -> Snapshot {
573        let inner = self.inner.lock();
574        let elapsed = self.started.elapsed();
575        let timestamp = SystemTime::now()
576            .duration_since(UNIX_EPOCH)
577            .map(|d| d.as_secs())
578            .unwrap_or(0);
579        Snapshot {
580            info: inner.info.clone(),
581            uptime: i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
582            timestamp: i64::try_from(timestamp).unwrap_or(i64::MAX),
583            latency: HistogramSummary::from_histogram(&inner.latency),
584            payload_size: HistogramSummary::from_histogram(&inner.payload_size),
585            cross_region_latency: HistogramSummary::from_histogram(&inner.cross_region_latency),
586            cross_zone_latency: HistogramSummary::from_histogram(&inner.cross_zone_latency),
587            server_latency: HistogramSummary::from_histogram(&inner.server_latency),
588            cross_region_queue_wait: HistogramSummary::from_histogram(
589                &inner.cross_region_queue_wait,
590            ),
591            cross_zone_queue_wait: HistogramSummary::from_histogram(&inner.cross_zone_queue_wait),
592            server_queue_wait: HistogramSummary::from_histogram(&inner.server_queue_wait),
593            client_out_queue_p99: queue_p99(&inner.client_out_queue),
594            server_in_queue_p99: queue_p99(&inner.server_in_queue),
595            server_out_queue_p99: queue_p99(&inner.server_out_queue),
596            dnode_client_out_queue_p99: queue_p99(&inner.dnode_client_out_queue),
597            peer_in_queue_p99: queue_p99(&inner.peer_in_queue),
598            peer_out_queue_p99: queue_p99(&inner.peer_out_queue),
599            remote_peer_in_queue_p99: queue_p99(&inner.remote_peer_in_queue),
600            remote_peer_out_queue_p99: queue_p99(&inner.remote_peer_out_queue),
601            alloc_msgs: inner.alloc_msgs,
602            free_msgs: inner.free_msgs,
603            alloc_mbufs: inner.alloc_mbufs,
604            free_mbufs: inner.free_mbufs,
605            dyn_memory: inner.dyn_memory,
606            pool: inner.pool.clone(),
607            server: inner.server.clone(),
608            failure: self.failure.snapshot(),
609        }
610    }
611
612    /// Reset every histogram. The reference engine does this every
613    /// five minutes from inside the aggregation loop.
614    ///
615    /// # Examples
616    ///
617    /// ```
618    /// use dynomite::stats::{Latency, PoolStats, ServerStats, ServiceInfo, Stats};
619    /// let stats = Stats::new(
620    ///     ServiceInfo::default(),
621    ///     PoolStats::new("p"),
622    ///     ServerStats::new("s"),
623    /// );
624    /// stats.record_latency(Latency::Request, 50);
625    /// stats.reset_histograms();
626    /// assert_eq!(stats.snapshot().latency.max, 0);
627    /// ```
628    pub fn reset_histograms(&self) {
629        let mut inner = self.inner.lock();
630        inner.latency.reset();
631        inner.payload_size.reset();
632        inner.cross_region_latency.reset();
633        inner.cross_zone_latency.reset();
634        inner.server_latency.reset();
635        inner.cross_region_queue_wait.reset();
636        inner.cross_zone_queue_wait.reset();
637        inner.server_queue_wait.reset();
638        inner.client_out_queue.reset();
639        inner.server_in_queue.reset();
640        inner.server_out_queue.reset();
641        inner.dnode_client_out_queue.reset();
642        inner.peer_in_queue.reset();
643        inner.peer_out_queue.reset();
644        inner.remote_peer_in_queue.reset();
645        inner.remote_peer_out_queue.reset();
646    }
647}
648
649/// Returns the queue p99 from `h`, or `0` when the histogram is
650/// overflowing. The reference implementation suppresses percentile
651/// publishing in the overflow path; mirroring that keeps overflow
652/// values from leaking into the JSON output as `u64::MAX`.
653fn queue_p99(h: &Histogram) -> u64 {
654    if h.is_overflowing() {
655        0
656    } else {
657        h.percentile(0.99)
658    }
659}
660
661/// Async aggregator handle: snapshots at a fixed interval into a
662/// shared cell that the REST server reads from.
663///
664/// # Examples
665///
666/// ```no_run
667/// use std::sync::Arc;
668/// use std::time::Duration;
669/// use dynomite::stats::{Aggregator, PoolStats, ServerStats, ServiceInfo, Snapshot, Stats};
670/// use parking_lot::Mutex;
671/// use tokio_util::sync::CancellationToken;
672///
673/// # async fn _example() {
674/// let stats = Arc::new(Stats::new(
675///     ServiceInfo::default(),
676///     PoolStats::new("dyn_o_mite"),
677///     ServerStats::new("redis"),
678/// ));
679/// let sink = Arc::new(Mutex::new(Snapshot::default()));
680/// let token = CancellationToken::new();
681/// let agg = Aggregator::new(stats, sink, Duration::from_secs(1), Duration::from_secs(300));
682/// let _ = tokio::spawn({ let token = token.clone(); async move { agg.run(token).await } });
683/// token.cancel();
684/// # }
685/// ```
686pub struct Aggregator {
687    stats: Arc<Stats>,
688    sink: Arc<Mutex<Snapshot>>,
689    interval: Duration,
690    histogram_reset: Duration,
691}
692
693impl Aggregator {
694    /// Create a new aggregator. The aggregation loop reads from
695    /// `stats` and publishes to `sink` once every `interval`.
696    /// Histograms are reset every `histogram_reset` elapsed time, the
697    /// same five-minute cadence the C reference uses by default.
698    ///
699    /// # Examples
700    ///
701    /// ```
702    /// use std::sync::Arc;
703    /// use std::time::Duration;
704    /// use dynomite::stats::{Aggregator, PoolStats, ServerStats, ServiceInfo, Snapshot, Stats};
705    /// use parking_lot::Mutex;
706    ///
707    /// let stats = Arc::new(Stats::new(
708    ///     ServiceInfo::default(),
709    ///     PoolStats::new("dyn_o_mite"),
710    ///     ServerStats::new("redis"),
711    /// ));
712    /// let sink = Arc::new(Mutex::new(Snapshot::default()));
713    /// let _agg = Aggregator::new(stats, sink, Duration::from_secs(1), Duration::from_secs(300));
714    /// ```
715    pub fn new(
716        stats: Arc<Stats>,
717        sink: Arc<Mutex<Snapshot>>,
718        interval: Duration,
719        histogram_reset: Duration,
720    ) -> Self {
721        Self {
722            stats,
723            sink,
724            interval,
725            histogram_reset,
726        }
727    }
728
729    /// Run the aggregation loop until `cancel` is triggered. The future
730    /// returns `()` after observing cancellation; callers that want a
731    /// clean shutdown should clone the token and call
732    /// [`CancellationToken::cancel`] on it.
733    ///
734    /// # Examples
735    ///
736    /// ```no_run
737    /// use std::sync::Arc;
738    /// use std::time::Duration;
739    /// use dynomite::stats::{Aggregator, PoolStats, ServerStats, ServiceInfo, Snapshot, Stats};
740    /// use parking_lot::Mutex;
741    /// use tokio_util::sync::CancellationToken;
742    ///
743    /// # async fn _example() {
744    /// let stats = Arc::new(Stats::new(
745    ///     ServiceInfo::default(),
746    ///     PoolStats::new("dyn_o_mite"),
747    ///     ServerStats::new("redis"),
748    /// ));
749    /// let sink = Arc::new(Mutex::new(Snapshot::default()));
750    /// let token = CancellationToken::new();
751    /// let agg = Aggregator::new(stats, sink, Duration::from_secs(1), Duration::from_secs(300));
752    /// let cancel = token.clone();
753    /// let handle = tokio::spawn(async move { agg.run(cancel).await });
754    /// token.cancel();
755    /// let _ = handle.await;
756    /// # }
757    /// ```
758    pub async fn run(self, cancel: CancellationToken) {
759        let mut ticker = tokio::time::interval(self.interval);
760        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
761        let mut last_reset = Instant::now();
762        loop {
763            tokio::select! {
764                biased;
765                () = cancel.cancelled() => return,
766                _ = ticker.tick() => {}
767            }
768            let snap = self.stats.snapshot();
769            *self.sink.lock() = snap;
770            if last_reset.elapsed() >= self.histogram_reset {
771                self.stats.reset_histograms();
772                last_reset = Instant::now();
773            }
774        }
775    }
776}
777
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Stats` with an explicit `ServiceInfo`, so the tests do not
    /// depend on `Default` values.
    fn make_stats() -> Stats {
        let info = ServiceInfo {
            source: "node".into(),
            version: "0.0.1".into(),
            rack: "r".into(),
            dc: "d".into(),
        };
        Stats::new(info, PoolStats::new("dyn_o_mite"), ServerStats::new("redis"))
    }

    #[test]
    fn counter_incr_and_get() {
        let stats = make_stats();
        for _ in 0..2 {
            stats.pool_incr(PoolField::ClientEof);
        }
        assert_eq!(stats.pool_get(PoolField::ClientEof), 2);
    }

    #[test]
    fn gauge_set_and_decrement() {
        let stats = make_stats();
        let field = PoolField::ClientConnections;
        stats.pool_set(field, 5);
        stats.pool_decr(field);
        assert_eq!(stats.pool_get(field), 4);
    }

    #[test]
    fn server_metric_round_trip() {
        let stats = make_stats();
        let ejected_at = 1_700_000_000;
        stats.server_incr_by(ServerField::ReadRequests, 42);
        stats.server_set(ServerField::ServerEjectedAt, ejected_at);
        assert_eq!(stats.server_get(ServerField::ReadRequests), 42);
        assert_eq!(stats.server_get(ServerField::ServerEjectedAt), ejected_at);
    }

    #[test]
    fn snapshot_reflects_writes() {
        let stats = make_stats();
        stats.pool_incr(PoolField::StatsCount);
        stats.record_latency(Latency::Request, 100);
        stats.record_payload_size(2048);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.pool.metrics[PoolField::StatsCount.index()], 1);
        assert_eq!(snapshot.latency.max, 100);
        assert!(snapshot.payload_size.max >= 2048);
    }

    #[test]
    fn metric_indexes_have_canonical_order() {
        for (expected, field) in PoolField::ALL.iter().enumerate() {
            assert_eq!(field.index(), expected);
        }
    }
}