1use crate::server::node::NodeState;
38use pallas_codec::minicbor::{self, Decode, Decoder, Encode, Encoder, data::Type};
39use pallas_network::multiplexer::{ChannelBuffer, Error};
40use prometheus::{GaugeVec, IntCounterVec, Opts, Registry};
41use std::collections::HashMap;
42use std::sync::{Arc, Mutex};
43use std::time::Duration;
44use tracing::{debug, warn};
45
/// A single metric value as reported by the EKG forwarder.
///
/// On the wire each value is a CBOR array `[tag, payload]` (see the
/// `Encode`/`Decode` impls below): tag 0 is a counter, 1 a gauge and 2 a
/// text label.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EkgValue {
    /// Cumulative counter reading; `update_metric` turns successive readings
    /// into Prometheus counter increments.
    Counter(i64),
    /// Point-in-time gauge reading.
    Gauge(i64),
    /// Free-form text; exported by `update_metric` as an info-style gauge.
    Label(String),
}
60
61impl Encode<()> for EkgValue {
62 fn encode<W: minicbor::encode::Write>(
63 &self,
64 e: &mut Encoder<W>,
65 _ctx: &mut (),
66 ) -> Result<(), minicbor::encode::Error<W::Error>> {
67 e.array(2)?;
68 match self {
69 EkgValue::Counter(v) => {
70 e.u8(0)?.i64(*v)?;
71 }
72 EkgValue::Gauge(v) => {
73 e.u8(1)?.i64(*v)?;
74 }
75 EkgValue::Label(s) => {
76 e.u8(2)?.str(s)?;
77 }
78 }
79 Ok(())
80 }
81}
82
83impl<'b> Decode<'b, ()> for EkgValue {
84 fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, minicbor::decode::Error> {
85 d.array()?;
86 let tag = d.u8()?;
87 match tag {
88 0 => Ok(EkgValue::Counter(d.i64()?)),
89 1 => Ok(EkgValue::Gauge(d.i64()?)),
90 2 => Ok(EkgValue::Label(d.str()?.to_string())),
91 _ => {
92 d.skip()?;
93 Ok(EkgValue::Label(format!("<tag {}>", tag)))
94 }
95 }
96 }
97}
98
99#[derive(Debug)]
101pub enum EkgMessage {
102 Req(bool),
104 Resp(HashMap<String, EkgValue>),
106 Done,
108}
109
110impl Encode<()> for EkgMessage {
111 fn encode<W: minicbor::encode::Write>(
112 &self,
113 e: &mut Encoder<W>,
114 ctx: &mut (),
115 ) -> Result<(), minicbor::encode::Error<W::Error>> {
116 match self {
117 EkgMessage::Req(get_all) => {
118 e.array(2)?.u8(0)?;
121 if *get_all {
122 e.array(1)?.u8(0)?; } else {
124 e.array(1)?.u8(2)?; }
126 }
127 EkgMessage::Resp(metrics) => {
128 e.array(2)?.u8(1)?;
131 e.array(2)?.u8(0)?;
132 e.array(metrics.len() as u64)?;
133 for (k, v) in metrics {
134 e.array(2)?;
135 e.str(k)?;
136 v.encode(e, ctx)?;
137 }
138 }
139 EkgMessage::Done => {
140 e.array(1)?.u8(1)?;
142 }
143 }
144 Ok(())
145 }
146}
147
148impl<'b> Decode<'b, ()> for EkgMessage {
149 fn decode(d: &mut Decoder<'b>, ctx: &mut ()) -> Result<Self, minicbor::decode::Error> {
150 let arr_len = d.array()?;
151 let tag = d.u8()?;
152 match (arr_len, tag) {
154 (Some(2), 0) => {
156 d.array()?;
159 let req_tag = d.u8()?;
160 Ok(EkgMessage::Req(req_tag == 0))
161 }
162 (Some(1), 1) => Ok(EkgMessage::Done),
164 (Some(2), 1) => {
166 d.array()?;
168 d.u8()?; let list_len = d.array()?;
170 let mut metrics = HashMap::new();
171 let mut count = 0u64;
172 loop {
173 match list_len {
174 None => {
175 if d.datatype()? == Type::Break {
176 d.skip()?;
177 break;
178 }
179 }
180 Some(n) => {
181 if count >= n {
182 break;
183 }
184 }
185 }
186 d.array()?;
188 let key = d.str()?.to_string();
189 let val = EkgValue::decode(d, ctx)?;
190 metrics.insert(key, val);
191 count += 1;
192 }
193 Ok(EkgMessage::Resp(metrics))
194 }
195 _ => Err(minicbor::decode::Error::message(
196 "unknown EKG message format",
197 )),
198 }
199 }
200}
201
/// Polls a node's EKG forwarder over a multiplexer channel and mirrors the
/// received metrics into that node's Prometheus registry.
///
/// The cache fields are wrapped in `Mutex` because `update_metric` takes them
/// by shared reference.
pub struct EkgPoller {
    /// Buffered mini-protocol channel to the forwarder.
    channel: ChannelBuffer,
    /// Node whose `registry` receives the metrics; its `id` appears in log lines.
    node_state: Arc<NodeState>,
    /// Sent as `EkgMessage::Req(request_full)` on every poll.
    request_full: bool,
    /// Gauge vectors keyed by raw EKG metric name.
    gauge_cache: Mutex<HashMap<String, GaugeVec>>,
    /// Info-style gauge vectors for text labels, keyed by sanitised `<name>_info`.
    label_gauge_cache: Mutex<HashMap<String, GaugeVec>>,
    /// Counter vectors keyed by raw EKG metric name.
    counter_cache: Mutex<HashMap<String, IntCounterVec>>,
    /// Last cumulative reading per counter name, used to compute increments.
    counter_values: Mutex<HashMap<String, i64>>,
}
217
218impl EkgPoller {
219 pub fn new(
221 channel: pallas_network::multiplexer::AgentChannel,
222 node_state: Arc<NodeState>,
223 request_full: bool,
224 ) -> Self {
225 EkgPoller {
226 channel: ChannelBuffer::new(channel),
227 node_state,
228 request_full,
229 gauge_cache: Mutex::new(HashMap::new()),
230 label_gauge_cache: Mutex::new(HashMap::new()),
231 counter_cache: Mutex::new(HashMap::new()),
232 counter_values: Mutex::new(HashMap::new()),
233 }
234 }
235
236 pub async fn run_poll_loop(&mut self, freq_secs: f64) {
238 let interval = Duration::from_secs_f64(freq_secs.max(0.1));
239 loop {
240 match self.poll_once().await {
241 Ok(true) => {
242 debug!("EKG: node {} sent Done", self.node_state.id);
243 return;
244 }
245 Ok(false) => {}
246 Err(e) => {
247 warn!("EKG poll error for {}: {}", self.node_state.id, e);
248 return;
249 }
250 }
251 tokio::time::sleep(interval).await;
252 }
253 }
254
255 async fn poll_once(&mut self) -> Result<bool, Error> {
257 let req = EkgMessage::Req(self.request_full);
258 self.channel.send_msg_chunks(&req).await?;
259
260 let msg: EkgMessage = self.channel.recv_full_msg().await?;
261 match msg {
262 EkgMessage::Resp(metrics) => {
263 self.update_registry(&metrics);
264 Ok(false)
265 }
266 EkgMessage::Done => Ok(true),
267 EkgMessage::Req(_) => Err(Error::Decoding("unexpected Req from forwarder".into())),
268 }
269 }
270
271 fn update_registry(&self, metrics: &HashMap<String, EkgValue>) {
272 let registry = &self.node_state.registry;
273 for (name, value) in metrics {
274 if let Err(e) = update_metric(
275 registry,
276 name,
277 value,
278 &self.gauge_cache,
279 &self.label_gauge_cache,
280 &self.counter_cache,
281 &self.counter_values,
282 ) {
283 debug!(
284 "EKG registry error for {}/{}: {}",
285 self.node_state.id, name, e
286 );
287 }
288 }
289 }
290}
291
292pub(crate) fn update_metric(
299 registry: &Registry,
300 name: &str,
301 value: &EkgValue,
302 gauge_cache: &Mutex<HashMap<String, GaugeVec>>,
303 label_gauge_cache: &Mutex<HashMap<String, GaugeVec>>,
304 counter_cache: &Mutex<HashMap<String, IntCounterVec>>,
305 counter_values: &Mutex<HashMap<String, i64>>,
306) -> anyhow::Result<()> {
307 match value {
308 EkgValue::Counter(v) => {
309 let counter = get_or_create_counter(registry, counter_cache, name)?;
310 let mut prev_map = counter_values.lock().unwrap();
311 let prev = prev_map.entry(name.to_string()).or_insert(0);
312 let delta = v.saturating_sub(*prev).max(0) as u64;
313 if delta > 0 {
314 counter.with_label_values(&[]).inc_by(delta);
315 }
316 *prev = *v;
317 }
318 EkgValue::Gauge(v) => {
319 let gauge = get_or_create_gauge(registry, gauge_cache, name)?;
320 gauge.with_label_values(&[]).set(*v as f64);
321 }
322 EkgValue::Label(text) => {
323 let metric_name = sanitise_name(&format!("{}_info", name));
328 let gauge = get_or_create_label_gauge(registry, label_gauge_cache, &metric_name)?;
329 gauge.with_label_values(&[text.as_str()]).set(1.0);
330 }
331 }
332 Ok(())
333}
334
335fn get_or_create_gauge(
336 registry: &Registry,
337 cache: &Mutex<HashMap<String, GaugeVec>>,
338 name: &str,
339) -> anyhow::Result<GaugeVec> {
340 let mut lock = cache.lock().unwrap();
341 if let Some(g) = lock.get(name) {
342 return Ok(g.clone());
343 }
344 let opts = Opts::new(sanitise_name(name), name.to_string());
345 let g =
346 GaugeVec::new(opts, &[]).map_err(|e| anyhow::anyhow!("create gauge {}: {}", name, e))?;
347 registry
348 .register(Box::new(g.clone()))
349 .map_err(|e| anyhow::anyhow!("register gauge {}: {}", name, e))?;
350 lock.insert(name.to_string(), g.clone());
351 Ok(g)
352}
353
354fn get_or_create_counter(
355 registry: &Registry,
356 cache: &Mutex<HashMap<String, IntCounterVec>>,
357 name: &str,
358) -> anyhow::Result<IntCounterVec> {
359 let mut lock = cache.lock().unwrap();
360 if let Some(c) = lock.get(name) {
361 return Ok(c.clone());
362 }
363 let opts = Opts::new(sanitise_name(name), name.to_string());
364 let c = IntCounterVec::new(opts, &[])
365 .map_err(|e| anyhow::anyhow!("create counter {}: {}", name, e))?;
366 registry
367 .register(Box::new(c.clone()))
368 .map_err(|e| anyhow::anyhow!("register counter {}: {}", name, e))?;
369 lock.insert(name.to_string(), c.clone());
370 Ok(c)
371}
372
373fn get_or_create_label_gauge(
374 registry: &Registry,
375 cache: &Mutex<HashMap<String, GaugeVec>>,
376 name: &str,
377) -> anyhow::Result<GaugeVec> {
378 let mut lock = cache.lock().unwrap();
379 if let Some(g) = lock.get(name) {
380 return Ok(g.clone());
381 }
382 let opts = Opts::new(sanitise_name(name), name.to_string());
383 let g = GaugeVec::new(opts, &["value"])
384 .map_err(|e| anyhow::anyhow!("create label gauge {}: {}", name, e))?;
385 registry
386 .register(Box::new(g.clone()))
387 .map_err(|e| anyhow::anyhow!("register label gauge {}: {}", name, e))?;
388 lock.insert(name.to_string(), g.clone());
389 Ok(g)
390}
391
/// Converts an EKG metric name into a valid Prometheus metric name.
///
/// Prometheus metric names must match `[a-zA-Z_:][a-zA-Z0-9_:]*`. Every
/// character other than an ASCII letter, ASCII digit or `_` is replaced with
/// `_`. That includes `.`, `/`, `-`, `:` and non-ASCII characters such as
/// `é`, which `char::is_alphanumeric` would have let through. A leading ASCII
/// digit is prefixed with `_`.
///
/// The function is idempotent: sanitising an already-sanitised name returns
/// it unchanged. An empty `name` yields an empty string, which the registry
/// rejects.
fn sanitise_name(name: &str) -> String {
    let mut out = String::with_capacity(name.len() + 1);
    if name.starts_with(|c: char| c.is_ascii_digit()) {
        out.push('_');
    }
    out.extend(name.chars().map(|c| {
        if c.is_ascii_alphanumeric() || c == '_' {
            c
        } else {
            '_'
        }
    }));
    out
}
403
#[cfg(test)]
mod tests {
    use super::*;
    use pallas_codec::minicbor;

    /// CBOR-encodes `value` with the unit context.
    fn encode<T: minicbor::Encode<()>>(value: &T) -> Vec<u8> {
        let mut buf = Vec::new();
        minicbor::encode_with(value, &mut buf, &mut ()).unwrap();
        buf
    }

    /// Decodes a `T` from `buf` with the unit context, panicking on failure.
    fn decode<T: for<'b> minicbor::Decode<'b, ()>>(buf: &[u8]) -> T {
        minicbor::decode_with(buf, &mut ()).unwrap()
    }

    type GaugeCache = Mutex<HashMap<String, GaugeVec>>;
    type CounterCache = Mutex<HashMap<String, IntCounterVec>>;
    type CounterValues = Mutex<HashMap<String, i64>>;

    /// Fresh, empty caches in the order `update_metric` takes them.
    fn empty_caches() -> (GaugeCache, GaugeCache, CounterCache, CounterValues) {
        (
            Mutex::new(HashMap::new()),
            Mutex::new(HashMap::new()),
            Mutex::new(HashMap::new()),
            Mutex::new(HashMap::new()),
        )
    }

    #[test]
    fn ekg_value_counter_round_trip() {
        let v = EkgValue::Counter(-42);
        assert!(matches!(
            decode::<EkgValue>(&encode(&v)),
            EkgValue::Counter(-42)
        ));
    }

    #[test]
    fn ekg_value_gauge_round_trip() {
        let v = EkgValue::Gauge(100);
        assert!(matches!(
            decode::<EkgValue>(&encode(&v)),
            EkgValue::Gauge(100)
        ));
    }

    #[test]
    fn ekg_value_label_round_trip() {
        let v = EkgValue::Label("RTS v1.0".to_string());
        match decode::<EkgValue>(&encode(&v)) {
            EkgValue::Label(s) => assert_eq!(s, "RTS v1.0"),
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn ekg_req_get_all_round_trip() {
        assert!(matches!(
            decode::<EkgMessage>(&encode(&EkgMessage::Req(true))),
            EkgMessage::Req(true)
        ));
    }

    #[test]
    fn ekg_req_get_updated_round_trip() {
        assert!(matches!(
            decode::<EkgMessage>(&encode(&EkgMessage::Req(false))),
            EkgMessage::Req(false)
        ));
    }

    #[test]
    fn ekg_done_round_trip() {
        assert!(matches!(
            decode::<EkgMessage>(&encode(&EkgMessage::Done)),
            EkgMessage::Done
        ));
    }

    #[test]
    fn ekg_resp_round_trip() {
        let mut metrics = HashMap::new();
        metrics.insert("cpu".to_string(), EkgValue::Gauge(75));
        metrics.insert("mem".to_string(), EkgValue::Counter(1024));
        metrics.insert("rts".to_string(), EkgValue::Label("v1".to_string()));
        let msg = EkgMessage::Resp(metrics);
        match decode::<EkgMessage>(&encode(&msg)) {
            EkgMessage::Resp(m) => {
                assert_eq!(m.len(), 3);
                assert!(matches!(m["cpu"], EkgValue::Gauge(75)));
                assert!(matches!(m["mem"], EkgValue::Counter(1024)));
                assert!(matches!(m["rts"], EkgValue::Label(ref s) if s == "v1"));
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn ekg_resp_empty_round_trip() {
        let msg = EkgMessage::Resp(HashMap::new());
        match decode::<EkgMessage>(&encode(&msg)) {
            EkgMessage::Resp(m) => assert!(m.is_empty()),
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn ekg_resp_indefinite_list_decodes() {
        // `[1, [0, [_ ["g", [1, 7]]]]]`: the metric list is an
        // indefinite-length (break-terminated) array.
        let mut buf = Vec::new();
        let mut e = minicbor::Encoder::new(&mut buf);
        e.array(2).unwrap().u8(1).unwrap();
        e.array(2).unwrap().u8(0).unwrap();
        e.begin_array().unwrap();
        e.array(2).unwrap().str("g").unwrap();
        EkgValue::Gauge(7).encode(&mut e, &mut ()).unwrap();
        e.end().unwrap();
        match decode::<EkgMessage>(&buf) {
            EkgMessage::Resp(m) => {
                assert_eq!(m.len(), 1);
                assert!(matches!(m["g"], EkgValue::Gauge(7)));
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn update_metric_gauge_sets_value() {
        let registry = Registry::new();
        let (gc, lgc, cc, cv) = empty_caches();
        update_metric(
            &registry,
            "my_gauge",
            &EkgValue::Gauge(42),
            &gc,
            &lgc,
            &cc,
            &cv,
        )
        .unwrap();
        let families = registry.gather();
        assert_eq!(families.len(), 1);
        assert_eq!(families[0].get_name(), "my_gauge");
        assert_eq!(families[0].get_metric()[0].get_gauge().get_value(), 42.0);
    }

    #[test]
    fn update_metric_gauge_overwrites() {
        let registry = Registry::new();
        let (gc, lgc, cc, cv) = empty_caches();
        update_metric(&registry, "g", &EkgValue::Gauge(10), &gc, &lgc, &cc, &cv).unwrap();
        update_metric(&registry, "g", &EkgValue::Gauge(99), &gc, &lgc, &cc, &cv).unwrap();
        let families = registry.gather();
        assert_eq!(families[0].get_metric()[0].get_gauge().get_value(), 99.0);
    }

    #[test]
    fn update_metric_counter_accumulates_deltas() {
        let registry = Registry::new();
        let (gc, lgc, cc, cv) = empty_caches();
        update_metric(&registry, "ops", &EkgValue::Counter(5), &gc, &lgc, &cc, &cv).unwrap();
        update_metric(&registry, "ops", &EkgValue::Counter(8), &gc, &lgc, &cc, &cv).unwrap();
        let families = registry.gather();
        let ops = families.iter().find(|f| f.get_name() == "ops").unwrap();
        assert_eq!(ops.get_metric()[0].get_counter().get_value(), 8.0);
    }

    #[test]
    fn update_metric_counter_ignores_decreasing_value() {
        let registry = Registry::new();
        let (gc, lgc, cc, cv) = empty_caches();
        update_metric(&registry, "c", &EkgValue::Counter(10), &gc, &lgc, &cc, &cv).unwrap();
        update_metric(&registry, "c", &EkgValue::Counter(3), &gc, &lgc, &cc, &cv).unwrap();
        let families = registry.gather();
        assert_eq!(families[0].get_metric()[0].get_counter().get_value(), 10.0);
    }

    #[test]
    fn update_metric_label_creates_info_gauge_with_value_label() {
        let registry = Registry::new();
        let (gc, lgc, cc, cv) = empty_caches();
        update_metric(
            &registry,
            "rts_version",
            &EkgValue::Label("RTS v1.0".to_string()),
            &gc,
            &lgc,
            &cc,
            &cv,
        )
        .unwrap();
        let families = registry.gather();
        let info = families
            .iter()
            .find(|f| f.get_name() == "rts_version_info")
            .expect("rts_version_info metric should exist");
        let metric = &info.get_metric()[0];
        assert_eq!(metric.get_gauge().get_value(), 1.0);
        let labels = metric.get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].get_name(), "value");
        assert_eq!(labels[0].get_value(), "RTS v1.0");
    }

    #[test]
    fn sanitise_name_replaces_dots_and_slashes() {
        assert_eq!(sanitise_name("a.b/c"), "a_b_c");
    }

    #[test]
    fn sanitise_name_keeps_alphanumeric_and_underscore() {
        assert_eq!(sanitise_name("abc_123"), "abc_123");
    }
}