hyperi_rustlib/metrics/dfe_groups/
app.rs1use metrics::{Counter, Gauge};
12
13use super::super::MetricsManager;
14use super::super::manifest::{MetricDescriptor, MetricType};
15
16#[derive(Clone)]
22pub struct AppMetrics {
23 pub records_received: Counter,
24 pub records_processed: Counter,
25 pub records_error: Counter,
26 pub bytes_received: Counter,
27 pub bytes_written: Counter,
28 pub memory_used_bytes: Gauge,
29 pub memory_limit_bytes: Gauge,
30 pub config_reloads_success: Counter,
31 pub config_reloads_error: Counter,
32}
33
34impl AppMetrics {
35 #[must_use]
39 pub fn new(manager: &MetricsManager, version: &str, commit: &str) -> Self {
40 manager.set_build_info(version, commit);
41
42 let ns = manager.namespace();
44 let info_name = if ns.is_empty() {
45 "info".to_string()
46 } else {
47 format!("{ns}_info")
48 };
49 metrics::describe_gauge!(info_name.clone(), "Application info for service discovery");
50 metrics::gauge!(
51 info_name.clone(),
52 "version" => version.to_string(),
53 "commit" => commit.to_string()
54 )
55 .set(1.0);
56 manager.registry().push(MetricDescriptor {
57 name: info_name,
58 metric_type: MetricType::Gauge,
59 description: "Application info for service discovery".into(),
60 unit: String::new(),
61 labels: vec!["version".into(), "commit".into()],
62 group: "app".into(),
63 buckets: None,
64 use_cases: vec![],
65 dashboard_hint: None,
66 });
67
68 let start_time = manager.gauge_with_labels(
70 "start_time_seconds",
71 "Unix timestamp of process start",
72 &[],
73 "app",
74 );
75 start_time.set(
76 std::time::SystemTime::now()
77 .duration_since(std::time::UNIX_EPOCH)
78 .map_or(0.0, |d| d.as_secs_f64()),
79 );
80
81 let config_key = if ns.is_empty() {
83 "config_reloads_total".to_string()
84 } else {
85 format!("{ns}_config_reloads_total")
86 };
87 metrics::describe_counter!(config_key.clone(), "Config reload attempts");
88 manager.registry().push(MetricDescriptor {
89 name: config_key.clone(),
90 metric_type: MetricType::Counter,
91 description: "Config reload attempts".into(),
92 unit: String::new(),
93 labels: vec!["result".into()],
94 group: "app".into(),
95 buckets: None,
96 use_cases: vec![],
97 dashboard_hint: None,
98 });
99
100 Self {
101 records_received: manager.counter_with_labels(
102 "records_received_total",
103 "Records received from source",
104 &[],
105 "app",
106 ),
107 records_processed: manager.counter_with_labels(
108 "records_processed_total",
109 "Records successfully processed",
110 &[],
111 "app",
112 ),
113 records_error: manager.counter_with_labels(
114 "records_error_total",
115 "Records that failed processing",
116 &[],
117 "app",
118 ),
119 bytes_received: manager.counter_with_labels(
120 "bytes_received_total",
121 "Bytes received from source",
122 &[],
123 "app",
124 ),
125 bytes_written: manager.counter_with_labels(
126 "bytes_written_total",
127 "Bytes written to sink",
128 &[],
129 "app",
130 ),
131 memory_used_bytes: manager.gauge_with_labels(
132 "memory_used_bytes",
133 "Current memory usage (cgroup-aware)",
134 &[],
135 "app",
136 ),
137 memory_limit_bytes: manager.gauge_with_labels(
138 "memory_limit_bytes",
139 "Effective memory limit",
140 &[],
141 "app",
142 ),
143 config_reloads_success: metrics::counter!(config_key.clone(), "result" => "success"),
144 config_reloads_error: metrics::counter!(config_key, "result" => "error"),
145 }
146 }
147
148 #[inline]
149 pub fn record_received(&self, count: u64) {
150 self.records_received.increment(count);
151 }
152
153 #[inline]
154 pub fn record_processed(&self, count: u64) {
155 self.records_processed.increment(count);
156 }
157
158 #[inline]
159 pub fn record_error(&self, count: u64) {
160 self.records_error.increment(count);
161 }
162
163 #[inline]
164 pub fn record_bytes_received(&self, bytes: u64) {
165 self.bytes_received.increment(bytes);
166 }
167
168 #[inline]
169 pub fn record_bytes_written(&self, bytes: u64) {
170 self.bytes_written.increment(bytes);
171 }
172
173 #[inline]
174 pub fn set_memory(&self, used: u64, limit: u64) {
175 self.memory_used_bytes.set(used as f64);
176 self.memory_limit_bytes.set(limit as f64);
177 }
178
179 #[inline]
180 pub fn record_config_reload(&self, success: bool) {
181 if success {
182 self.config_reloads_success.increment(1);
183 } else {
184 self.config_reloads_error.increment(1);
185 }
186 }
187}