Skip to main content

hyperi_rustlib/metrics/dfe_groups/
app.rs

1// Project:   hyperi-rustlib
2// File:      src/metrics/dfe_groups/app.rs
3// Purpose:   Mandatory app-level DFE metrics
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Mandatory app-level metrics for every DFE application.
10
11use metrics::{Counter, Gauge};
12
13use super::super::MetricsManager;
14use super::super::manifest::{MetricDescriptor, MetricType};
15
16/// Mandatory metrics for every DFE application.
17///
18/// Registers `info`, `start_time_seconds`, record counters, byte counters,
19/// memory gauges, and config reload counter -- all prefixed with the
20/// `MetricsManager` namespace.
21#[derive(Clone)]
22pub struct AppMetrics {
23    pub records_received: Counter,
24    pub records_processed: Counter,
25    pub records_error: Counter,
26    pub bytes_received: Counter,
27    pub bytes_written: Counter,
28    pub memory_used_bytes: Gauge,
29    pub memory_limit_bytes: Gauge,
30    pub config_reloads_success: Counter,
31    pub config_reloads_error: Counter,
32}
33
34impl AppMetrics {
35    /// Create and register app metrics.
36    ///
37    /// `version` and `commit` are emitted as labels on the `info` gauge.
38    #[must_use]
39    pub fn new(manager: &MetricsManager, version: &str, commit: &str) -> Self {
40        manager.set_build_info(version, commit);
41
42        // Info metric for service discovery
43        let ns = manager.namespace();
44        let info_name = if ns.is_empty() {
45            "info".to_string()
46        } else {
47            format!("{ns}_info")
48        };
49        metrics::describe_gauge!(info_name.clone(), "Application info for service discovery");
50        metrics::gauge!(
51            info_name.clone(),
52            "version" => version.to_string(),
53            "commit" => commit.to_string()
54        )
55        .set(1.0);
56        manager.registry().push(MetricDescriptor {
57            name: info_name,
58            metric_type: MetricType::Gauge,
59            description: "Application info for service discovery".into(),
60            unit: String::new(),
61            labels: vec!["version".into(), "commit".into()],
62            group: "app".into(),
63            buckets: None,
64            use_cases: vec![],
65            dashboard_hint: None,
66        });
67
68        // Start time
69        let start_time = manager.gauge_with_labels(
70            "start_time_seconds",
71            "Unix timestamp of process start",
72            &[],
73            "app",
74        );
75        start_time.set(
76            std::time::SystemTime::now()
77                .duration_since(std::time::UNIX_EPOCH)
78                .map_or(0.0, |d| d.as_secs_f64()),
79        );
80
81        // config_reloads_total -- label-based, register descriptor manually
82        let config_key = if ns.is_empty() {
83            "config_reloads_total".to_string()
84        } else {
85            format!("{ns}_config_reloads_total")
86        };
87        metrics::describe_counter!(config_key.clone(), "Config reload attempts");
88        manager.registry().push(MetricDescriptor {
89            name: config_key.clone(),
90            metric_type: MetricType::Counter,
91            description: "Config reload attempts".into(),
92            unit: String::new(),
93            labels: vec!["result".into()],
94            group: "app".into(),
95            buckets: None,
96            use_cases: vec![],
97            dashboard_hint: None,
98        });
99
100        Self {
101            records_received: manager.counter_with_labels(
102                "records_received_total",
103                "Records received from source",
104                &[],
105                "app",
106            ),
107            records_processed: manager.counter_with_labels(
108                "records_processed_total",
109                "Records successfully processed",
110                &[],
111                "app",
112            ),
113            records_error: manager.counter_with_labels(
114                "records_error_total",
115                "Records that failed processing",
116                &[],
117                "app",
118            ),
119            bytes_received: manager.counter_with_labels(
120                "bytes_received_total",
121                "Bytes received from source",
122                &[],
123                "app",
124            ),
125            bytes_written: manager.counter_with_labels(
126                "bytes_written_total",
127                "Bytes written to sink",
128                &[],
129                "app",
130            ),
131            memory_used_bytes: manager.gauge_with_labels(
132                "memory_used_bytes",
133                "Current memory usage (cgroup-aware)",
134                &[],
135                "app",
136            ),
137            memory_limit_bytes: manager.gauge_with_labels(
138                "memory_limit_bytes",
139                "Effective memory limit",
140                &[],
141                "app",
142            ),
143            config_reloads_success: metrics::counter!(config_key.clone(), "result" => "success"),
144            config_reloads_error: metrics::counter!(config_key, "result" => "error"),
145        }
146    }
147
148    #[inline]
149    pub fn record_received(&self, count: u64) {
150        self.records_received.increment(count);
151    }
152
153    #[inline]
154    pub fn record_processed(&self, count: u64) {
155        self.records_processed.increment(count);
156    }
157
158    #[inline]
159    pub fn record_error(&self, count: u64) {
160        self.records_error.increment(count);
161    }
162
163    #[inline]
164    pub fn record_bytes_received(&self, bytes: u64) {
165        self.bytes_received.increment(bytes);
166    }
167
168    #[inline]
169    pub fn record_bytes_written(&self, bytes: u64) {
170        self.bytes_written.increment(bytes);
171    }
172
173    #[inline]
174    pub fn set_memory(&self, used: u64, limit: u64) {
175        self.memory_used_bytes.set(used as f64);
176        self.memory_limit_bytes.set(limit as f64);
177    }
178
179    #[inline]
180    pub fn record_config_reload(&self, success: bool) {
181        if success {
182            self.config_reloads_success.increment(1);
183        } else {
184            self.config_reloads_error.increment(1);
185        }
186    }
187}