Skip to main content

hyperi_rustlib/metrics/dfe_groups/
buffer.rs

1// Project:   hyperi-rustlib
2// File:      src/metrics/dfe_groups/buffer.rs
3// Purpose:   DFE buffer metrics group
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Buffer metrics for apps with batching (receiver, loader, archiver).
10
11use metrics::{Counter, Gauge, Histogram};
12
13use super::super::MetricsManager;
14use super::super::manifest::{MetricDescriptor, MetricType};
15
16/// Default histogram buckets for buffer flush duration.
17const BUFFER_FLUSH_BUCKETS: &[f64] = &[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
18
19/// Buffer metrics for DFE apps with batching.
20///
21/// Tracks buffer depth, flush operations, and flush trigger reasons.
22#[derive(Clone)]
23pub struct BufferMetrics {
24    pub buffer_bytes: Gauge,
25    pub buffer_records: Gauge,
26    pub buffer_flush: Counter,
27    pub buffer_flush_duration: Histogram,
28    namespace: String,
29}
30
31impl BufferMetrics {
32    #[must_use]
33    pub fn new(manager: &MetricsManager) -> Self {
34        let ns = manager.namespace();
35
36        // buffer_flush_trigger_total -- label-based, register descriptor manually
37        let trigger_key = if ns.is_empty() {
38            "buffer_flush_trigger_total".to_string()
39        } else {
40            format!("{ns}_buffer_flush_trigger_total")
41        };
42        metrics::describe_counter!(trigger_key.clone(), "Buffer flush trigger reason");
43        manager.registry().push(MetricDescriptor {
44            name: trigger_key,
45            metric_type: MetricType::Counter,
46            description: "Buffer flush trigger reason".into(),
47            unit: String::new(),
48            labels: vec!["trigger".into()],
49            group: "buffer".into(),
50            buckets: None,
51            use_cases: vec![],
52            dashboard_hint: None,
53        });
54
55        Self {
56            buffer_bytes: manager.gauge_with_labels(
57                "buffer_bytes",
58                "Current buffer size in bytes",
59                &[],
60                "buffer",
61            ),
62            buffer_records: manager.gauge_with_labels(
63                "buffer_records",
64                "Current buffered record count",
65                &[],
66                "buffer",
67            ),
68            buffer_flush: manager.counter_with_labels(
69                "buffer_flush_total",
70                "Buffer flush operations",
71                &[],
72                "buffer",
73            ),
74            buffer_flush_duration: manager.histogram_with_labels(
75                "buffer_flush_duration_seconds",
76                "Buffer flush latency",
77                &[],
78                "buffer",
79                Some(BUFFER_FLUSH_BUCKETS),
80            ),
81            namespace: ns.to_string(),
82        }
83    }
84
85    #[inline]
86    pub fn set_buffer(&self, bytes: usize, records: usize) {
87        self.buffer_bytes.set(bytes as f64);
88        self.buffer_records.set(records as f64);
89    }
90
91    /// Record a flush with its duration and trigger reason.
92    #[inline]
93    pub fn record_flush(&self, duration_secs: f64, trigger: crate::metrics::FlushTrigger) {
94        self.buffer_flush.increment(1);
95        self.buffer_flush_duration.record(duration_secs);
96        let key = if self.namespace.is_empty() {
97            "buffer_flush_trigger_total".to_string()
98        } else {
99            format!("{}_buffer_flush_trigger_total", self.namespace)
100        };
101        metrics::counter!(key, "trigger" => trigger.as_label()).increment(1);
102    }
103}