dynomite/stats/mod.rs
1//! Pool, server, and peer metrics with histograms and a JSON snapshot.
2//!
3//! The stats subsystem is split into small modules:
4//!
5//! * [`Histogram`] - Cassandra-style estimated histogram.
6//! * [`PoolField`] / [`ServerField`] - typed metric handles.
7//! * [`Snapshot`] - aggregate value rendered to JSON.
8//! * [`StatsServer`] - REST endpoint serving the latest snapshot.
9//!
10//! [`Stats`] glues the pieces together: a writer accumulates counters,
11//! gauges, and histogram observations; a periodic aggregator publishes
12//! a fresh [`Snapshot`] that the REST endpoint serves.
13
14mod codec;
15mod failure;
16mod histogram;
17mod numeric;
18mod prometheus;
19mod rest;
20mod snapshot;
21
22use std::sync::Arc;
23use std::time::{SystemTime, UNIX_EPOCH};
24
25use parking_lot::Mutex;
26use tokio::time::{Duration, Instant};
27use tokio_util::sync::CancellationToken;
28
29pub use crate::stats::codec::{
30 MetricSpec, PoolField, ServerField, StatsMetricType, POOL_CODEC, SERVER_CODEC,
31};
32pub use crate::stats::failure::{
33 FailureMetrics, FailureSnapshot, NoTargetsEntry, PeerEntry, PeerStateEntry, PhiEntry,
34 TimeoutEntry, TransitionEntry,
35};
36pub use crate::stats::histogram::{Histogram, BUCKET_COUNT};
37pub use crate::stats::prometheus::render_prometheus;
38pub use crate::stats::rest::{ClusterInfoProvider, StatsServer, MAX_HEADERS, MAX_REQUEST_BYTES};
39pub use crate::stats::snapshot::{
40 describe_stats, HistogramSummary, PeerStats, PoolStats, ServerStats, ServiceInfo, Snapshot,
41};
42
/// Live, mutable counters and histograms for a single engine instance.
///
/// `Stats` is the writer side; readers consume frozen [`Snapshot`]
/// values produced by [`Stats::snapshot`]. Counter and histogram state
/// sits behind one mutex, so every recording method takes `&self`.
///
/// # Examples
///
/// ```
/// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
/// let stats = Stats::new(
///     ServiceInfo::default(),
///     PoolStats::new("dyn_o_mite"),
///     ServerStats::new("redis"),
/// );
/// assert_eq!(stats.snapshot().pool.name, "dyn_o_mite");
/// ```
#[derive(Debug)]
pub struct Stats {
    /// Counters, gauges, and histograms, guarded by a single lock.
    inner: Arc<Mutex<StatsInner>>,
    /// Failure-cause metrics; handed out by [`Stats::failure_metrics`]
    /// and folded into every [`Snapshot`].
    failure: Arc<FailureMetrics>,
    /// Instant captured at construction; [`Stats::snapshot`] derives the
    /// reported uptime from it.
    started: Instant,
}
65
/// Lock-protected state behind [`Stats`]: identity, metric tables,
/// histograms, and resource gauges.
#[derive(Debug)]
struct StatsInner {
    /// Service identity copied into every snapshot.
    info: ServiceInfo,
    /// Pool metric table, indexed by `PoolField::index`.
    pool: PoolStats,
    /// Server metric table, indexed by `ServerField::index`.
    server: ServerStats,
    // Latency histograms; `latency` is the `Latency::Request` channel.
    latency: Histogram,
    /// Payload-size observations.
    payload_size: Histogram,
    cross_region_latency: Histogram,
    cross_zone_latency: Histogram,
    server_latency: Histogram,
    // Queue-wait histograms, one per `QueueWait` variant.
    cross_region_queue_wait: Histogram,
    cross_zone_queue_wait: Histogram,
    server_queue_wait: Histogram,
    // Queue-length histograms, one per `QueueGauge` variant; only their
    // p99 is published in a snapshot.
    client_out_queue: Histogram,
    server_in_queue: Histogram,
    server_out_queue: Histogram,
    dnode_client_out_queue: Histogram,
    peer_in_queue: Histogram,
    peer_out_queue: Histogram,
    remote_peer_in_queue: Histogram,
    remote_peer_out_queue: Histogram,
    // Resource gauges, overwritten as a group by `set_resource_usage`.
    alloc_msgs: i64,
    free_msgs: i64,
    alloc_mbufs: i64,
    free_mbufs: i64,
    dyn_memory: i64,
}
93
94impl StatsInner {
95 fn new(info: ServiceInfo, pool: PoolStats, server: ServerStats) -> Self {
96 Self {
97 info,
98 pool,
99 server,
100 latency: Histogram::new(),
101 payload_size: Histogram::new(),
102 cross_region_latency: Histogram::new(),
103 cross_zone_latency: Histogram::new(),
104 server_latency: Histogram::new(),
105 cross_region_queue_wait: Histogram::new(),
106 cross_zone_queue_wait: Histogram::new(),
107 server_queue_wait: Histogram::new(),
108 client_out_queue: Histogram::new(),
109 server_in_queue: Histogram::new(),
110 server_out_queue: Histogram::new(),
111 dnode_client_out_queue: Histogram::new(),
112 peer_in_queue: Histogram::new(),
113 peer_out_queue: Histogram::new(),
114 remote_peer_in_queue: Histogram::new(),
115 remote_peer_out_queue: Histogram::new(),
116 alloc_msgs: 0,
117 free_msgs: 0,
118 alloc_mbufs: 0,
119 free_mbufs: 0,
120 dyn_memory: 0,
121 }
122 }
123}
124
/// Selects which latency histogram [`Stats::record_latency`] records
/// an observation into.
///
/// # Examples
///
/// ```
/// use dynomite::stats::Latency;
/// assert_ne!(Latency::Request, Latency::Server);
/// assert_eq!(Latency::Request, Latency::Request);
/// // The variant set is small and copy-able.
/// let copied = Latency::CrossRegion;
/// assert_eq!(copied, Latency::CrossRegion);
/// ```
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum Latency {
    /// Top-level request latency.
    Request,
    /// Cross-region peer round-trip time.
    CrossRegion,
    /// Cross-zone peer latency.
    CrossZone,
    /// Backing-server response latency.
    Server,
}
148
/// Selects which queue-wait histogram [`Stats::record_queue_wait`]
/// records an observation into.
///
/// # Examples
///
/// ```
/// use dynomite::stats::QueueWait;
/// assert_ne!(QueueWait::CrossRegion, QueueWait::CrossZone);
/// assert_ne!(QueueWait::CrossZone, QueueWait::Server);
/// // Variants implement Copy, so a binding survives a move.
/// let original = QueueWait::Server;
/// let copy = original;
/// assert_eq!(original, copy);
/// ```
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum QueueWait {
    /// Cross-region queue wait time.
    CrossRegion,
    /// Cross-zone queue wait time.
    CrossZone,
    /// Backing-server queue wait time.
    Server,
}
171
/// Selects which queue-length histogram [`Stats::record_queue_len`]
/// records a sample into. Values are lengths observed at sample time,
/// not per-event measurements.
///
/// # Examples
///
/// ```
/// use dynomite::stats::QueueGauge;
/// // Each variant is distinct so the dispatch in `record_queue_len`
/// // routes to a unique histogram.
/// let all = [
///     QueueGauge::ClientOut,
///     QueueGauge::ServerIn,
///     QueueGauge::ServerOut,
///     QueueGauge::DnodeClientOut,
///     QueueGauge::PeerIn,
///     QueueGauge::PeerOut,
///     QueueGauge::RemotePeerIn,
///     QueueGauge::RemotePeerOut,
/// ];
/// for (i, lhs) in all.iter().enumerate() {
///     for rhs in &all[i + 1..] {
///         assert_ne!(lhs, rhs);
///     }
/// }
/// ```
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum QueueGauge {
    /// Client out-queue length.
    ClientOut,
    /// Server in-queue length.
    ServerIn,
    /// Server out-queue length.
    ServerOut,
    /// Dnode client out-queue length.
    DnodeClientOut,
    /// Local-DC peer in-queue length.
    PeerIn,
    /// Local-DC peer out-queue length.
    PeerOut,
    /// Remote-DC peer in-queue length.
    RemotePeerIn,
    /// Remote-DC peer out-queue length.
    RemotePeerOut,
}
216
217impl Stats {
218 /// Construct a new `Stats` with empty counters and histograms.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
224 ///
225 /// let info = ServiceInfo {
226 /// source: "node-a".into(),
227 /// version: "0.0.1".into(),
228 /// rack: "r1".into(),
229 /// dc: "dc1".into(),
230 /// };
231 /// let stats = Stats::new(
232 /// info,
233 /// PoolStats::new("dyn_o_mite"),
234 /// ServerStats::new("redis_local"),
235 /// );
236 /// let snap = stats.snapshot();
237 /// assert_eq!(snap.pool.name, "dyn_o_mite");
238 /// ```
239 pub fn new(info: ServiceInfo, pool: PoolStats, server: ServerStats) -> Self {
240 Self {
241 inner: Arc::new(Mutex::new(StatsInner::new(info, pool, server))),
242 failure: Arc::new(FailureMetrics::new()),
243 started: Instant::now(),
244 }
245 }
246
247 /// Borrow the failure-cause metrics handle.
248 ///
249 /// The dispatcher and the gossip handler clone this `Arc`
250 /// so they can record per-cause errors and per-peer state
251 /// transitions. The handle is created at construction time
252 /// and lives for the lifetime of the [`Stats`] value.
253 ///
254 /// # Examples
255 ///
256 /// ```
257 /// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
258 /// let s = Stats::new(
259 /// ServiceInfo::default(),
260 /// PoolStats::new("p"),
261 /// ServerStats::new("s"),
262 /// );
263 /// let m = s.failure_metrics();
264 /// assert!(m.snapshot().is_empty());
265 /// ```
266 #[must_use]
267 pub fn failure_metrics(&self) -> Arc<FailureMetrics> {
268 self.failure.clone()
269 }
270
271 /// Record a latency observation in the matching histogram.
272 ///
273 /// # Examples
274 ///
275 /// ```
276 /// use dynomite::stats::{Latency, PoolStats, ServerStats, ServiceInfo, Stats};
277 /// let stats = Stats::new(
278 /// ServiceInfo::default(),
279 /// PoolStats::new("p"),
280 /// ServerStats::new("s"),
281 /// );
282 /// stats.record_latency(Latency::Request, 100);
283 /// ```
284 pub fn record_latency(&self, channel: Latency, value: u64) {
285 let mut inner = self.inner.lock();
286 match channel {
287 Latency::Request => inner.latency.record(value),
288 Latency::CrossRegion => inner.cross_region_latency.record(value),
289 Latency::CrossZone => inner.cross_zone_latency.record(value),
290 Latency::Server => inner.server_latency.record(value),
291 }
292 }
293
294 /// Record a payload-size observation.
295 ///
296 /// # Examples
297 ///
298 /// ```
299 /// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
300 /// let stats = Stats::new(
301 /// ServiceInfo::default(),
302 /// PoolStats::new("p"),
303 /// ServerStats::new("s"),
304 /// );
305 /// stats.record_payload_size(2048);
306 /// ```
307 pub fn record_payload_size(&self, value: u64) {
308 self.inner.lock().payload_size.record(value);
309 }
310
311 /// Record a queue wait time observation.
312 ///
313 /// # Examples
314 ///
315 /// ```
316 /// use dynomite::stats::{PoolStats, QueueWait, ServerStats, ServiceInfo, Stats};
317 /// let stats = Stats::new(
318 /// ServiceInfo::default(),
319 /// PoolStats::new("p"),
320 /// ServerStats::new("s"),
321 /// );
322 /// stats.record_queue_wait(QueueWait::Server, 12);
323 /// ```
324 pub fn record_queue_wait(&self, channel: QueueWait, value: u64) {
325 let mut inner = self.inner.lock();
326 match channel {
327 QueueWait::CrossRegion => inner.cross_region_queue_wait.record(value),
328 QueueWait::CrossZone => inner.cross_zone_queue_wait.record(value),
329 QueueWait::Server => inner.server_queue_wait.record(value),
330 }
331 }
332
333 /// Record a queue-length sample.
334 ///
335 /// # Examples
336 ///
337 /// ```
338 /// use dynomite::stats::{PoolStats, QueueGauge, ServerStats, ServiceInfo, Stats};
339 /// let stats = Stats::new(
340 /// ServiceInfo::default(),
341 /// PoolStats::new("p"),
342 /// ServerStats::new("s"),
343 /// );
344 /// stats.record_queue_len(QueueGauge::ClientOut, 4);
345 /// ```
346 pub fn record_queue_len(&self, channel: QueueGauge, value: u64) {
347 let mut inner = self.inner.lock();
348 match channel {
349 QueueGauge::ClientOut => inner.client_out_queue.record(value),
350 QueueGauge::ServerIn => inner.server_in_queue.record(value),
351 QueueGauge::ServerOut => inner.server_out_queue.record(value),
352 QueueGauge::DnodeClientOut => inner.dnode_client_out_queue.record(value),
353 QueueGauge::PeerIn => inner.peer_in_queue.record(value),
354 QueueGauge::PeerOut => inner.peer_out_queue.record(value),
355 QueueGauge::RemotePeerIn => inner.remote_peer_in_queue.record(value),
356 QueueGauge::RemotePeerOut => inner.remote_peer_out_queue.record(value),
357 }
358 }
359
360 /// Increment a pool counter or gauge by one.
361 ///
362 /// # Examples
363 ///
364 /// ```
365 /// use dynomite::stats::{PoolField, PoolStats, ServerStats, ServiceInfo, Stats};
366 /// let stats = Stats::new(
367 /// ServiceInfo::default(),
368 /// PoolStats::new("p"),
369 /// ServerStats::new("s"),
370 /// );
371 /// stats.pool_incr(PoolField::ClientEof);
372 /// assert_eq!(stats.pool_get(PoolField::ClientEof), 1);
373 /// ```
374 pub fn pool_incr(&self, field: PoolField) {
375 self.pool_incr_by(field, 1);
376 }
377
378 /// Decrement a pool gauge by one.
379 ///
380 /// # Examples
381 ///
382 /// ```
383 /// use dynomite::stats::{PoolField, PoolStats, ServerStats, ServiceInfo, Stats};
384 /// let stats = Stats::new(
385 /// ServiceInfo::default(),
386 /// PoolStats::new("p"),
387 /// ServerStats::new("s"),
388 /// );
389 /// stats.pool_set(PoolField::ClientConnections, 5);
390 /// stats.pool_decr(PoolField::ClientConnections);
391 /// assert_eq!(stats.pool_get(PoolField::ClientConnections), 4);
392 /// ```
393 pub fn pool_decr(&self, field: PoolField) {
394 self.pool_incr_by(field, -1);
395 }
396
397 /// Add `delta` to a pool counter or gauge.
398 ///
399 /// Wraps on overflow to mirror the reference engine's `++` / `+=`
400 /// semantics. Counters are 64-bit signed and never reach the wrap
401 /// boundary under realistic workloads.
402 pub fn pool_incr_by(&self, field: PoolField, delta: i64) {
403 let mut inner = self.inner.lock();
404 let slot = &mut inner.pool.metrics[field.index()];
405 *slot = slot.wrapping_add(delta);
406 }
407
408 /// Set a pool gauge or timestamp to an absolute value.
409 ///
410 /// # Examples
411 ///
412 /// ```
413 /// use dynomite::stats::{PoolField, PoolStats, ServerStats, ServiceInfo, Stats};
414 /// let stats = Stats::new(
415 /// ServiceInfo::default(),
416 /// PoolStats::new("p"),
417 /// ServerStats::new("s"),
418 /// );
419 /// stats.pool_set(PoolField::PeerEjectedAt, 1_700_000_000);
420 /// assert_eq!(stats.pool_get(PoolField::PeerEjectedAt), 1_700_000_000);
421 /// ```
422 pub fn pool_set(&self, field: PoolField, value: i64) {
423 self.inner.lock().pool.metrics[field.index()] = value;
424 }
425
426 /// Read the current value of a pool metric.
427 ///
428 /// # Examples
429 ///
430 /// ```
431 /// use dynomite::stats::{PoolField, PoolStats, ServerStats, ServiceInfo, Stats};
432 /// let stats = Stats::new(
433 /// ServiceInfo::default(),
434 /// PoolStats::new("p"),
435 /// ServerStats::new("s"),
436 /// );
437 /// assert_eq!(stats.pool_get(PoolField::ClientEof), 0);
438 /// ```
439 pub fn pool_get(&self, field: PoolField) -> i64 {
440 self.inner.lock().pool.metrics[field.index()]
441 }
442
443 /// Increment a server counter or gauge by one.
444 ///
445 /// # Examples
446 ///
447 /// ```
448 /// use dynomite::stats::{PoolStats, ServerField, ServerStats, ServiceInfo, Stats};
449 /// let stats = Stats::new(
450 /// ServiceInfo::default(),
451 /// PoolStats::new("p"),
452 /// ServerStats::new("s"),
453 /// );
454 /// stats.server_incr(ServerField::ReadRequests);
455 /// assert_eq!(stats.server_get(ServerField::ReadRequests), 1);
456 /// ```
457 pub fn server_incr(&self, field: ServerField) {
458 self.server_incr_by(field, 1);
459 }
460
461 /// Decrement a server gauge by one.
462 ///
463 /// # Examples
464 ///
465 /// ```
466 /// use dynomite::stats::{PoolStats, ServerField, ServerStats, ServiceInfo, Stats};
467 /// let stats = Stats::new(
468 /// ServiceInfo::default(),
469 /// PoolStats::new("p"),
470 /// ServerStats::new("s"),
471 /// );
472 /// stats.server_set(ServerField::InQueue, 3);
473 /// stats.server_decr(ServerField::InQueue);
474 /// assert_eq!(stats.server_get(ServerField::InQueue), 2);
475 /// ```
476 pub fn server_decr(&self, field: ServerField) {
477 self.server_incr_by(field, -1);
478 }
479
480 /// Add `delta` to a server counter or gauge.
481 ///
482 /// Wraps on overflow to mirror the reference engine's `++` / `+=`
483 /// semantics. Counters are 64-bit signed and never reach the wrap
484 /// boundary under realistic workloads.
485 pub fn server_incr_by(&self, field: ServerField, delta: i64) {
486 let mut inner = self.inner.lock();
487 let slot = &mut inner.server.metrics[field.index()];
488 *slot = slot.wrapping_add(delta);
489 }
490
491 /// Set a server gauge or timestamp to an absolute value.
492 ///
493 /// # Examples
494 ///
495 /// ```
496 /// use dynomite::stats::{PoolStats, ServerField, ServerStats, ServiceInfo, Stats};
497 /// let stats = Stats::new(
498 /// ServiceInfo::default(),
499 /// PoolStats::new("p"),
500 /// ServerStats::new("s"),
501 /// );
502 /// stats.server_set(ServerField::ServerEjectedAt, 1_700_000_000);
503 /// assert_eq!(stats.server_get(ServerField::ServerEjectedAt), 1_700_000_000);
504 /// ```
505 pub fn server_set(&self, field: ServerField, value: i64) {
506 self.inner.lock().server.metrics[field.index()] = value;
507 }
508
509 /// Read the current value of a server metric.
510 ///
511 /// # Examples
512 ///
513 /// ```
514 /// use dynomite::stats::{PoolStats, ServerField, ServerStats, ServiceInfo, Stats};
515 /// let stats = Stats::new(
516 /// ServiceInfo::default(),
517 /// PoolStats::new("p"),
518 /// ServerStats::new("s"),
519 /// );
520 /// assert_eq!(stats.server_get(ServerField::ReadRequests), 0);
521 /// ```
522 pub fn server_get(&self, field: ServerField) -> i64 {
523 self.inner.lock().server.metrics[field.index()]
524 }
525
526 /// Set the resource usage gauges that the reference engine samples
527 /// once per aggregation cycle.
528 ///
529 /// # Examples
530 ///
531 /// ```
532 /// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
533 /// let stats = Stats::new(
534 /// ServiceInfo::default(),
535 /// PoolStats::new("p"),
536 /// ServerStats::new("s"),
537 /// );
538 /// stats.set_resource_usage(0, 0, 0, 0, 0);
539 /// assert_eq!(stats.snapshot().alloc_msgs, 0);
540 /// ```
541 pub fn set_resource_usage(
542 &self,
543 alloc_msgs: i64,
544 free_msgs: i64,
545 alloc_mbufs: i64,
546 free_mbufs: i64,
547 dyn_memory: i64,
548 ) {
549 let mut inner = self.inner.lock();
550 inner.alloc_msgs = alloc_msgs;
551 inner.free_msgs = free_msgs;
552 inner.alloc_mbufs = alloc_mbufs;
553 inner.free_mbufs = free_mbufs;
554 inner.dyn_memory = dyn_memory;
555 }
556
557 /// Build an immutable snapshot of every counter, gauge, and
558 /// histogram quantile at the current instant.
559 ///
560 /// # Examples
561 ///
562 /// ```
563 /// use dynomite::stats::{PoolStats, ServerStats, ServiceInfo, Stats};
564 /// let stats = Stats::new(
565 /// ServiceInfo::default(),
566 /// PoolStats::new("p"),
567 /// ServerStats::new("s"),
568 /// );
569 /// let snap = stats.snapshot();
570 /// assert_eq!(snap.pool.name, "p");
571 /// ```
572 pub fn snapshot(&self) -> Snapshot {
573 let inner = self.inner.lock();
574 let elapsed = self.started.elapsed();
575 let timestamp = SystemTime::now()
576 .duration_since(UNIX_EPOCH)
577 .map(|d| d.as_secs())
578 .unwrap_or(0);
579 Snapshot {
580 info: inner.info.clone(),
581 uptime: i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
582 timestamp: i64::try_from(timestamp).unwrap_or(i64::MAX),
583 latency: HistogramSummary::from_histogram(&inner.latency),
584 payload_size: HistogramSummary::from_histogram(&inner.payload_size),
585 cross_region_latency: HistogramSummary::from_histogram(&inner.cross_region_latency),
586 cross_zone_latency: HistogramSummary::from_histogram(&inner.cross_zone_latency),
587 server_latency: HistogramSummary::from_histogram(&inner.server_latency),
588 cross_region_queue_wait: HistogramSummary::from_histogram(
589 &inner.cross_region_queue_wait,
590 ),
591 cross_zone_queue_wait: HistogramSummary::from_histogram(&inner.cross_zone_queue_wait),
592 server_queue_wait: HistogramSummary::from_histogram(&inner.server_queue_wait),
593 client_out_queue_p99: queue_p99(&inner.client_out_queue),
594 server_in_queue_p99: queue_p99(&inner.server_in_queue),
595 server_out_queue_p99: queue_p99(&inner.server_out_queue),
596 dnode_client_out_queue_p99: queue_p99(&inner.dnode_client_out_queue),
597 peer_in_queue_p99: queue_p99(&inner.peer_in_queue),
598 peer_out_queue_p99: queue_p99(&inner.peer_out_queue),
599 remote_peer_in_queue_p99: queue_p99(&inner.remote_peer_in_queue),
600 remote_peer_out_queue_p99: queue_p99(&inner.remote_peer_out_queue),
601 alloc_msgs: inner.alloc_msgs,
602 free_msgs: inner.free_msgs,
603 alloc_mbufs: inner.alloc_mbufs,
604 free_mbufs: inner.free_mbufs,
605 dyn_memory: inner.dyn_memory,
606 pool: inner.pool.clone(),
607 server: inner.server.clone(),
608 failure: self.failure.snapshot(),
609 }
610 }
611
612 /// Reset every histogram. The reference engine does this every
613 /// five minutes from inside the aggregation loop.
614 ///
615 /// # Examples
616 ///
617 /// ```
618 /// use dynomite::stats::{Latency, PoolStats, ServerStats, ServiceInfo, Stats};
619 /// let stats = Stats::new(
620 /// ServiceInfo::default(),
621 /// PoolStats::new("p"),
622 /// ServerStats::new("s"),
623 /// );
624 /// stats.record_latency(Latency::Request, 50);
625 /// stats.reset_histograms();
626 /// assert_eq!(stats.snapshot().latency.max, 0);
627 /// ```
628 pub fn reset_histograms(&self) {
629 let mut inner = self.inner.lock();
630 inner.latency.reset();
631 inner.payload_size.reset();
632 inner.cross_region_latency.reset();
633 inner.cross_zone_latency.reset();
634 inner.server_latency.reset();
635 inner.cross_region_queue_wait.reset();
636 inner.cross_zone_queue_wait.reset();
637 inner.server_queue_wait.reset();
638 inner.client_out_queue.reset();
639 inner.server_in_queue.reset();
640 inner.server_out_queue.reset();
641 inner.dnode_client_out_queue.reset();
642 inner.peer_in_queue.reset();
643 inner.peer_out_queue.reset();
644 inner.remote_peer_in_queue.reset();
645 inner.remote_peer_out_queue.reset();
646 }
647}
648
649/// Returns the queue p99 from `h`, or `0` when the histogram is
650/// overflowing. The reference implementation suppresses percentile
651/// publishing in the overflow path; mirroring that keeps overflow
652/// values from leaking into the JSON output as `u64::MAX`.
653fn queue_p99(h: &Histogram) -> u64 {
654 if h.is_overflowing() {
655 0
656 } else {
657 h.percentile(0.99)
658 }
659}
660
/// Async aggregator handle: snapshots at a fixed interval into a
/// shared cell that the REST server reads from.
///
/// # Examples
///
/// ```no_run
/// use std::sync::Arc;
/// use std::time::Duration;
/// use dynomite::stats::{Aggregator, PoolStats, ServerStats, ServiceInfo, Snapshot, Stats};
/// use parking_lot::Mutex;
/// use tokio_util::sync::CancellationToken;
///
/// # async fn _example() {
/// let stats = Arc::new(Stats::new(
///     ServiceInfo::default(),
///     PoolStats::new("dyn_o_mite"),
///     ServerStats::new("redis"),
/// ));
/// let sink = Arc::new(Mutex::new(Snapshot::default()));
/// let token = CancellationToken::new();
/// let agg = Aggregator::new(stats, sink, Duration::from_secs(1), Duration::from_secs(300));
/// let _ = tokio::spawn({ let token = token.clone(); async move { agg.run(token).await } });
/// token.cancel();
/// # }
/// ```
pub struct Aggregator {
    /// Live metrics that are snapshotted on every tick.
    stats: Arc<Stats>,
    /// Shared cell overwritten with each fresh snapshot.
    sink: Arc<Mutex<Snapshot>>,
    /// Period between snapshots.
    interval: Duration,
    /// Minimum elapsed time between histogram resets.
    histogram_reset: Duration,
}
692
693impl Aggregator {
694 /// Create a new aggregator. The aggregation loop reads from
695 /// `stats` and publishes to `sink` once every `interval`.
696 /// Histograms are reset every `histogram_reset` elapsed time, the
697 /// same five-minute cadence the C reference uses by default.
698 ///
699 /// # Examples
700 ///
701 /// ```
702 /// use std::sync::Arc;
703 /// use std::time::Duration;
704 /// use dynomite::stats::{Aggregator, PoolStats, ServerStats, ServiceInfo, Snapshot, Stats};
705 /// use parking_lot::Mutex;
706 ///
707 /// let stats = Arc::new(Stats::new(
708 /// ServiceInfo::default(),
709 /// PoolStats::new("dyn_o_mite"),
710 /// ServerStats::new("redis"),
711 /// ));
712 /// let sink = Arc::new(Mutex::new(Snapshot::default()));
713 /// let _agg = Aggregator::new(stats, sink, Duration::from_secs(1), Duration::from_secs(300));
714 /// ```
715 pub fn new(
716 stats: Arc<Stats>,
717 sink: Arc<Mutex<Snapshot>>,
718 interval: Duration,
719 histogram_reset: Duration,
720 ) -> Self {
721 Self {
722 stats,
723 sink,
724 interval,
725 histogram_reset,
726 }
727 }
728
729 /// Run the aggregation loop until `cancel` is triggered. The future
730 /// returns `()` after observing cancellation; callers that want a
731 /// clean shutdown should clone the token and call
732 /// [`CancellationToken::cancel`] on it.
733 ///
734 /// # Examples
735 ///
736 /// ```no_run
737 /// use std::sync::Arc;
738 /// use std::time::Duration;
739 /// use dynomite::stats::{Aggregator, PoolStats, ServerStats, ServiceInfo, Snapshot, Stats};
740 /// use parking_lot::Mutex;
741 /// use tokio_util::sync::CancellationToken;
742 ///
743 /// # async fn _example() {
744 /// let stats = Arc::new(Stats::new(
745 /// ServiceInfo::default(),
746 /// PoolStats::new("dyn_o_mite"),
747 /// ServerStats::new("redis"),
748 /// ));
749 /// let sink = Arc::new(Mutex::new(Snapshot::default()));
750 /// let token = CancellationToken::new();
751 /// let agg = Aggregator::new(stats, sink, Duration::from_secs(1), Duration::from_secs(300));
752 /// let cancel = token.clone();
753 /// let handle = tokio::spawn(async move { agg.run(cancel).await });
754 /// token.cancel();
755 /// let _ = handle.await;
756 /// # }
757 /// ```
758 pub async fn run(self, cancel: CancellationToken) {
759 let mut ticker = tokio::time::interval(self.interval);
760 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
761 let mut last_reset = Instant::now();
762 loop {
763 tokio::select! {
764 biased;
765 () = cancel.cancelled() => return,
766 _ = ticker.tick() => {}
767 }
768 let snap = self.stats.snapshot();
769 *self.sink.lock() = snap;
770 if last_reset.elapsed() >= self.histogram_reset {
771 self.stats.reset_histograms();
772 last_reset = Instant::now();
773 }
774 }
775 }
776}
777
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `Stats` with fixed identity and empty metrics.
    fn new_stats() -> Stats {
        let info = ServiceInfo {
            source: "node".into(),
            version: "0.0.1".into(),
            rack: "r".into(),
            dc: "d".into(),
        };
        Stats::new(info, PoolStats::new("dyn_o_mite"), ServerStats::new("redis"))
    }

    /// Two increments of the same counter read back as two.
    #[test]
    fn counter_incr_and_get() {
        let stats = new_stats();
        for _ in 0..2 {
            stats.pool_incr(PoolField::ClientEof);
        }
        assert_eq!(stats.pool_get(PoolField::ClientEof), 2);
    }

    /// A gauge set to an absolute value can be stepped down by one.
    #[test]
    fn gauge_set_and_decrement() {
        let stats = new_stats();
        stats.pool_set(PoolField::ClientConnections, 5);
        stats.pool_decr(PoolField::ClientConnections);
        assert_eq!(stats.pool_get(PoolField::ClientConnections), 4);
    }

    /// Server counters and timestamps read back what was written.
    #[test]
    fn server_metric_round_trip() {
        let stats = new_stats();
        stats.server_incr_by(ServerField::ReadRequests, 42);
        stats.server_set(ServerField::ServerEjectedAt, 1_700_000_000);
        assert_eq!(stats.server_get(ServerField::ReadRequests), 42);
        assert_eq!(
            stats.server_get(ServerField::ServerEjectedAt),
            1_700_000_000
        );
    }

    /// Counter and histogram writes are visible in the next snapshot.
    #[test]
    fn snapshot_reflects_writes() {
        let stats = new_stats();
        stats.pool_incr(PoolField::StatsCount);
        stats.record_latency(Latency::Request, 100);
        stats.record_payload_size(2048);
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.pool.metrics[PoolField::StatsCount.index()], 1);
        assert_eq!(snapshot.latency.max, 100);
        assert!(snapshot.payload_size.max >= 2048);
    }

    /// `PoolField::ALL` lists the variants in index order.
    #[test]
    fn metric_indexes_have_canonical_order() {
        for (position, field) in PoolField::ALL.iter().enumerate() {
            assert_eq!(field.index(), position);
        }
    }
}