1use serde::{Deserialize, Serialize};
64use std::collections::HashMap;
65use std::sync::Arc;
66use tokio::sync::RwLock;
67
/// Kind of metric a family name is registered as.
///
/// Stored next to the help text in the registry's metadata map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MetricType {
    /// Value that accumulates increments.
    Counter,
    /// Value that can be set, incremented or decremented.
    Gauge,
    /// Observations counted into buckets, with a running sum and count.
    Histogram,
}
82
/// Identity of a single time series: a metric family name plus its labels.
///
/// Two values compare equal when the name and every label pair match. The
/// manual [`Hash`](std::hash::Hash) impl stays consistent with that equality
/// by hashing the labels in sorted key order, independent of the map's
/// internal iteration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricName {
    /// Metric family name.
    pub name: String,
    /// Label key/value pairs attached to the series.
    pub labels: HashMap<String, String>,
}

impl std::hash::Hash for MetricName {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.name.hash(state);
        // HashMap iteration order is arbitrary; sort so that equal label sets
        // always feed the hasher the same byte sequence.
        let mut labels: Vec<_> = self.labels.iter().collect();
        labels.sort_unstable_by_key(|(k, _)| *k);
        for (k, v) in labels {
            k.hash(state);
            v.hash(state);
        }
    }
}

impl MetricName {
    /// Creates a name with no labels.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            labels: HashMap::new(),
        }
    }

    /// Returns `self` with `key` set to `value`, replacing any previous value
    /// stored under the same key.
    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.labels.insert(key.into(), value.into());
        self
    }

    /// Renders the series as `name` or `name{k1="v1",k2="v2"}`.
    ///
    /// Labels are emitted in ascending key order so the result is stable
    /// across calls, and label values are escaped (`\`, `"` and newline) so
    /// the result is a valid Prometheus text-format series selector.
    pub fn prometheus_name(&self) -> String {
        if self.labels.is_empty() {
            return self.name.clone();
        }

        let mut pairs: Vec<(&String, &String)> = self.labels.iter().collect();
        pairs.sort_unstable();

        let rendered: Vec<String> = pairs
            .into_iter()
            .map(|(k, v)| format!("{}=\"{}\"", k, Self::escape_label_value(v)))
            .collect();
        format!("{}{{{}}}", self.name, rendered.join(","))
    }

    /// Escapes a label value for the Prometheus text exposition format.
    fn escape_label_value(value: &str) -> String {
        let mut escaped = String::with_capacity(value.len());
        for ch in value.chars() {
            match ch {
                '\\' => escaped.push_str("\\\\"),
                '"' => escaped.push_str("\\\""),
                '\n' => escaped.push_str("\\n"),
                other => escaped.push(other),
            }
        }
        escaped
    }
}
131
132#[derive(Debug, Clone)]
136pub struct Counter {
137 pub name: MetricName,
138 pub help: String,
139}
140
141impl Counter {
142 pub fn new(name: impl Into<String>, help: impl Into<String>) -> Self {
144 Self {
145 name: MetricName::new(name),
146 help: help.into(),
147 }
148 }
149
150 pub fn with_labels(name: impl Into<String>, help: impl Into<String>, labels: HashMap<String, String>) -> Self {
152 Self {
153 name: MetricName {
154 name: name.into(),
155 labels,
156 },
157 help: help.into(),
158 }
159 }
160}
161
162#[derive(Debug, Clone)]
166pub struct Gauge {
167 pub name: MetricName,
168 pub help: String,
169}
170
171impl Gauge {
172 pub fn new(name: impl Into<String>, help: impl Into<String>) -> Self {
174 Self {
175 name: MetricName::new(name),
176 help: help.into(),
177 }
178 }
179
180 pub fn with_labels(name: impl Into<String>, help: impl Into<String>, labels: HashMap<String, String>) -> Self {
182 Self {
183 name: MetricName {
184 name: name.into(),
185 labels,
186 },
187 help: help.into(),
188 }
189 }
190}
191
192#[derive(Debug, Clone)]
196pub struct Histogram {
197 pub name: MetricName,
198 pub help: String,
199 pub buckets: Vec<f64>,
200}
201
202impl Histogram {
203 pub fn new(name: impl Into<String>, help: impl Into<String>) -> Self {
205 Self {
206 name: MetricName::new(name),
207 help: help.into(),
208 buckets: default_buckets(),
209 }
210 }
211
212 pub fn with_buckets(name: impl Into<String>, help: impl Into<String>, buckets: Vec<f64>) -> Self {
214 Self {
215 name: MetricName::new(name),
216 help: help.into(),
217 buckets,
218 }
219 }
220}
221
/// Returns the bucket upper bounds used when a histogram does not supply
/// its own.
fn default_buckets() -> Vec<f64> {
    const DEFAULT_UPPER_BOUNDS: [f64; 11] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    DEFAULT_UPPER_BOUNDS.to_vec()
}
225
226#[derive(Debug, Clone)]
231struct CounterValue {
232 value: f64,
233}
234
235#[derive(Debug, Clone)]
236struct GaugeValue {
237 value: f64,
238}
239
240#[derive(Debug, Clone)]
241struct HistogramValue {
242 count: u64,
243 sum: f64,
244 buckets: HashMap<usize, u64>, }
246
247pub struct MetricsRegistry {
274 counters: Arc<RwLock<HashMap<MetricName, CounterValue>>>,
275 gauges: Arc<RwLock<HashMap<MetricName, GaugeValue>>>,
276 histograms: Arc<RwLock<HashMap<MetricName, HistogramValue>>>,
277 metadata: Arc<RwLock<HashMap<String, (MetricType, String)>>>, }
279
280impl MetricsRegistry {
281 pub fn new() -> Self {
283 Self {
284 counters: Arc::new(RwLock::new(HashMap::new())),
285 gauges: Arc::new(RwLock::new(HashMap::new())),
286 histograms: Arc::new(RwLock::new(HashMap::new())),
287 metadata: Arc::new(RwLock::new(HashMap::new())),
288 }
289 }
290
291 pub async fn increment(&self, counter: &Counter, value: f64) {
293 {
295 let mut metadata = self.metadata.write().await;
296 metadata
297 .entry(counter.name.name.clone())
298 .or_insert((MetricType::Counter, counter.help.clone()));
299 }
300
301 let mut counters = self.counters.write().await;
303 counters
304 .entry(counter.name.clone())
305 .and_modify(|v| v.value += value)
306 .or_insert(CounterValue { value });
307 }
308
309 pub async fn set(&self, gauge: &Gauge, value: f64) {
311 {
313 let mut metadata = self.metadata.write().await;
314 metadata
315 .entry(gauge.name.name.clone())
316 .or_insert((MetricType::Gauge, gauge.help.clone()));
317 }
318
319 let mut gauges = self.gauges.write().await;
321 gauges
322 .entry(gauge.name.clone())
323 .and_modify(|v| v.value = value)
324 .or_insert(GaugeValue { value });
325 }
326
327 pub async fn increment_gauge(&self, gauge: &Gauge, delta: f64) {
329 let mut gauges = self.gauges.write().await;
330 gauges
331 .entry(gauge.name.clone())
332 .and_modify(|v| v.value += delta)
333 .or_insert(GaugeValue { value: delta });
334 }
335
336 pub async fn decrement_gauge(&self, gauge: &Gauge, delta: f64) {
338 self.increment_gauge(gauge, -delta).await;
339 }
340
341 pub async fn observe(&self, histogram: &Histogram, value: f64) {
343 {
345 let mut metadata = self.metadata.write().await;
346 metadata
347 .entry(histogram.name.name.clone())
348 .or_insert((MetricType::Histogram, histogram.help.clone()));
349 }
350
351 let bucket_idx = histogram
353 .buckets
354 .iter()
355 .position(|&b| value <= b)
356 .unwrap_or(histogram.buckets.len());
357
358 let mut histograms = self.histograms.write().await;
360 histograms
361 .entry(histogram.name.clone())
362 .and_modify(|h| {
363 h.count += 1;
364 h.sum += value;
365 *h.buckets.entry(bucket_idx).or_insert(0) += 1;
366 })
367 .or_insert_with(|| {
368 let mut buckets = HashMap::new();
369 buckets.insert(bucket_idx, 1);
370 HistogramValue {
371 count: 1,
372 sum: value,
373 buckets,
374 }
375 });
376 }
377
378 pub async fn export_prometheus(&self) -> String {
380 let mut output = String::new();
381
382 {
384 let counters = self.counters.read().await;
385 let metadata = self.metadata.read().await;
386
387 let mut seen_names = std::collections::HashSet::new();
388
389 for (name, value) in counters.iter() {
390 if !seen_names.contains(&name.name) {
392 if let Some((_, help)) = metadata.get(&name.name) {
393 output.push_str(&format!("# HELP {} {}\n", name.name, help));
394 output.push_str(&format!("# TYPE {} counter\n", name.name));
395 }
396 seen_names.insert(name.name.clone());
397 }
398
399 output.push_str(&format!("{} {}\n", name.prometheus_name(), value.value));
400 }
401 }
402
403 {
405 let gauges = self.gauges.read().await;
406 let metadata = self.metadata.read().await;
407
408 let mut seen_names = std::collections::HashSet::new();
409
410 for (name, value) in gauges.iter() {
411 if !seen_names.contains(&name.name) {
412 if let Some((_, help)) = metadata.get(&name.name) {
413 output.push_str(&format!("# HELP {} {}\n", name.name, help));
414 output.push_str(&format!("# TYPE {} gauge\n", name.name));
415 }
416 seen_names.insert(name.name.clone());
417 }
418
419 output.push_str(&format!("{} {}\n", name.prometheus_name(), value.value));
420 }
421 }
422
423 {
425 let histograms = self.histograms.read().await;
426 let metadata = self.metadata.read().await;
427
428 for (name, value) in histograms.iter() {
429 if let Some((_, help)) = metadata.get(&name.name) {
430 output.push_str(&format!("# HELP {} {}\n", name.name, help));
431 output.push_str(&format!("# TYPE {} histogram\n", name.name));
432 }
433
434 for (bucket_idx, count) in &value.buckets {
436 output.push_str(&format!(
437 "{}_bucket{{le=\"{}\"}} {}\n",
438 name.prometheus_name(),
439 bucket_idx,
440 count
441 ));
442 }
443
444 output.push_str(&format!("{}_sum {}\n", name.prometheus_name(), value.sum));
446 output.push_str(&format!("{}_count {}\n", name.prometheus_name(), value.count));
447 }
448 }
449
450 output
451 }
452
453 pub async fn export_json(&self) -> serde_json::Value {
455 let mut metrics = serde_json::Map::new();
456
457 {
459 let counters = self.counters.read().await;
460 for (name, value) in counters.iter() {
461 metrics.insert(
462 name.prometheus_name(),
463 serde_json::json!({
464 "type": "counter",
465 "value": value.value
466 }),
467 );
468 }
469 }
470
471 {
473 let gauges = self.gauges.read().await;
474 for (name, value) in gauges.iter() {
475 metrics.insert(
476 name.prometheus_name(),
477 serde_json::json!({
478 "type": "gauge",
479 "value": value.value
480 }),
481 );
482 }
483 }
484
485 {
487 let histograms = self.histograms.read().await;
488 for (name, value) in histograms.iter() {
489 metrics.insert(
490 name.prometheus_name(),
491 serde_json::json!({
492 "type": "histogram",
493 "count": value.count,
494 "sum": value.sum,
495 "buckets": value.buckets
496 }),
497 );
498 }
499 }
500
501 serde_json::Value::Object(metrics)
502 }
503
504 pub async fn clear(&self) {
506 self.counters.write().await.clear();
507 self.gauges.write().await.clear();
508 self.histograms.write().await.clear();
509 self.metadata.write().await.clear();
510 }
511}
512
513impl Default for MetricsRegistry {
514 fn default() -> Self {
515 Self::new()
516 }
517}
518
519pub struct NodeMetrics {
525 pub messages_received: Counter,
527 pub messages_sent: Counter,
529 pub errors_total: Counter,
531 pub processing_duration: Histogram,
533 pub active_subscriptions: Gauge,
535 pub queue_size: Gauge,
537}
538
539impl NodeMetrics {
540 pub fn new(node_id: &str) -> Self {
542 let mut labels = HashMap::new();
543 labels.insert("node".to_string(), node_id.to_string());
544
545 Self {
546 messages_received: Counter::with_labels(
547 "messages_received_total",
548 "Total messages received",
549 labels.clone(),
550 ),
551 messages_sent: Counter::with_labels("messages_sent_total", "Total messages sent", labels.clone()),
552 errors_total: Counter::with_labels("errors_total", "Total errors", labels.clone()),
553 processing_duration: Histogram::with_buckets(
554 "processing_duration_seconds",
555 "Message processing duration",
556 default_buckets(),
557 ),
558 active_subscriptions: Gauge::with_labels(
559 "active_subscriptions",
560 "Number of active subscriptions",
561 labels.clone(),
562 ),
563 queue_size: Gauge::with_labels("queue_size", "Current queue size", labels),
564 }
565 }
566}