Skip to main content

hermod/server/
ekg.rs

//! EKG mini-protocol client (protocol 1)
//!
//! The acceptor side polls the connected forwarder node for EKG metrics and
//! populates the node's Prometheus registry ([`NodeState::registry`]).
//!
//! ## Wire protocol (`ekg-forward` package, `codecEKGForward`)
//!
//! ```text
//! MsgReq(GetAllMetrics)     →  array(2)[word(0), array(1)[word(0)]]
//! MsgReq(GetUpdatedMetrics) →  array(2)[word(0), array(1)[word(2)]]
//! MsgResp(metrics)          →  array(2)[word(1), ResponseMetrics]
//! MsgDone                   →  array(1)[word(1)]
//! ```
//!
//! `MsgDone` and `MsgResp` share tag `word(1)` and are distinguished by
//! the outer array length (1 = Done, 2 = Resp).
//!
//! `get_all = false` requests only metrics that changed since the last poll;
//! `get_all = true` requests the full snapshot.  Controlled by
//! `ekgRequestFull` in the config.
//!
//! `ResponseMetrics` is a Haskell Generic Serialise newtype:
//! `array(2)[word(0), [(text, value)...]]` where each pair is
//! `array(2)[text(name), metricvalue]`.
//!
//! ## Metric value CBOR (`ekg-core` `Value` type, Generic Serialise)
//!
//! ```text
//! Counter(i64)  →  array(2)[0, i64]
//! Gauge(i64)    →  array(2)[1, i64]
//! Label(String) →  array(2)[2, text]
//! ```
//!
//! `Distribution` is intentionally not supported — the Haskell `ekg-forward`
//! library itself does not yet forward distributions.
36
37use crate::server::node::NodeState;
38use pallas_codec::minicbor::{self, Decode, Decoder, Encode, Encoder, data::Type};
39use pallas_network::multiplexer::{ChannelBuffer, Error};
40use prometheus::{GaugeVec, IntCounterVec, Opts, Registry};
41use std::collections::HashMap;
42use std::sync::{Arc, Mutex};
43use std::time::Duration;
44use tracing::{debug, warn};
45
46// ---------------------------------------------------------------------------
47// Protocol message types
48// ---------------------------------------------------------------------------
49
/// A single EKG metric value as carried in a `MsgResp` payload.
///
/// Mirrors the three value constructors this module encodes and decodes;
/// each variant corresponds to one constructor tag on the wire
/// (`0` = counter, `1` = gauge, `2` = label).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EkgValue {
    /// An integer counter (monotonically increasing)
    Counter(i64),
    /// A gauge (can go up or down)
    Gauge(i64),
    /// A text label
    Label(String),
}
60
61impl Encode<()> for EkgValue {
62    fn encode<W: minicbor::encode::Write>(
63        &self,
64        e: &mut Encoder<W>,
65        _ctx: &mut (),
66    ) -> Result<(), minicbor::encode::Error<W::Error>> {
67        e.array(2)?;
68        match self {
69            EkgValue::Counter(v) => {
70                e.u8(0)?.i64(*v)?;
71            }
72            EkgValue::Gauge(v) => {
73                e.u8(1)?.i64(*v)?;
74            }
75            EkgValue::Label(s) => {
76                e.u8(2)?.str(s)?;
77            }
78        }
79        Ok(())
80    }
81}
82
83impl<'b> Decode<'b, ()> for EkgValue {
84    fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, minicbor::decode::Error> {
85        d.array()?;
86        let tag = d.u8()?;
87        match tag {
88            0 => Ok(EkgValue::Counter(d.i64()?)),
89            1 => Ok(EkgValue::Gauge(d.i64()?)),
90            2 => Ok(EkgValue::Label(d.str()?.to_string())),
91            _ => {
92                d.skip()?;
93                Ok(EkgValue::Label(format!("<tag {}>", tag)))
94            }
95        }
96    }
97}
98
99/// Messages in the EKG mini-protocol
100#[derive(Debug)]
101pub enum EkgMessage {
102    /// Request metrics: false=updated only, true=all
103    Req(bool),
104    /// Response with metric map
105    Resp(HashMap<String, EkgValue>),
106    /// Terminate the session
107    Done,
108}
109
110impl Encode<()> for EkgMessage {
111    fn encode<W: minicbor::encode::Write>(
112        &self,
113        e: &mut Encoder<W>,
114        ctx: &mut (),
115    ) -> Result<(), minicbor::encode::Error<W::Error>> {
116        match self {
117            EkgMessage::Req(get_all) => {
118                // array(2)[word(0), Request]
119                // GetAllMetrics = array(1)[word(0)], GetUpdatedMetrics = array(1)[word(2)]
120                e.array(2)?.u8(0)?;
121                if *get_all {
122                    e.array(1)?.u8(0)?; // GetAllMetrics
123                } else {
124                    e.array(1)?.u8(2)?; // GetUpdatedMetrics
125                }
126            }
127            EkgMessage::Resp(metrics) => {
128                // array(2)[word(1), ResponseMetrics]
129                // ResponseMetrics (Generic Serialise newtype): array(2)[word(0), list_of_pairs]
130                e.array(2)?.u8(1)?;
131                e.array(2)?.u8(0)?;
132                e.array(metrics.len() as u64)?;
133                for (k, v) in metrics {
134                    e.array(2)?;
135                    e.str(k)?;
136                    v.encode(e, ctx)?;
137                }
138            }
139            EkgMessage::Done => {
140                // array(1)[word(1)]
141                e.array(1)?.u8(1)?;
142            }
143        }
144        Ok(())
145    }
146}
147
148impl<'b> Decode<'b, ()> for EkgMessage {
149    fn decode(d: &mut Decoder<'b>, ctx: &mut ()) -> Result<Self, minicbor::decode::Error> {
150        let arr_len = d.array()?;
151        let tag = d.u8()?;
152        // MsgDone and MsgResp share tag word(1); disambiguate by array length.
153        match (arr_len, tag) {
154            // MsgReq: array(2)[word(0), Request]
155            (Some(2), 0) => {
156                // Request sum type: array(1)[word(N)]
157                // 0 = GetAllMetrics, 2 = GetUpdatedMetrics
158                d.array()?;
159                let req_tag = d.u8()?;
160                Ok(EkgMessage::Req(req_tag == 0))
161            }
162            // MsgDone: array(1)[word(1)]
163            (Some(1), 1) => Ok(EkgMessage::Done),
164            // MsgResp: array(2)[word(1), ResponseMetrics]
165            (Some(2), 1) => {
166                // ResponseMetrics (Generic Serialise newtype): array(2)[word(0), list_of_pairs]
167                d.array()?;
168                d.u8()?; // constructor index 0
169                let list_len = d.array()?;
170                let mut metrics = HashMap::new();
171                let mut count = 0u64;
172                loop {
173                    match list_len {
174                        None => {
175                            if d.datatype()? == Type::Break {
176                                d.skip()?;
177                                break;
178                            }
179                        }
180                        Some(n) => {
181                            if count >= n {
182                                break;
183                            }
184                        }
185                    }
186                    // Each element is array(2)[text(name), metricvalue]
187                    d.array()?;
188                    let key = d.str()?.to_string();
189                    let val = EkgValue::decode(d, ctx)?;
190                    metrics.insert(key, val);
191                    count += 1;
192                }
193                Ok(EkgMessage::Resp(metrics))
194            }
195            _ => Err(minicbor::decode::Error::message(
196                "unknown EKG message format",
197            )),
198        }
199    }
200}
201
202// ---------------------------------------------------------------------------
203// EKG poller
204// ---------------------------------------------------------------------------
205
206/// Polls EKG metrics from a forwarder and updates the node's Prometheus registry
207pub struct EkgPoller {
208    channel: ChannelBuffer,
209    node_state: Arc<NodeState>,
210    request_full: bool,
211    gauge_cache: Mutex<HashMap<String, GaugeVec>>,
212    /// Gauges with a `value` label dimension, used for EKG Label metrics
213    label_gauge_cache: Mutex<HashMap<String, GaugeVec>>,
214    counter_cache: Mutex<HashMap<String, IntCounterVec>>,
215    counter_values: Mutex<HashMap<String, i64>>,
216}
217
218impl EkgPoller {
219    /// Create a new EKG poller
220    pub fn new(
221        channel: pallas_network::multiplexer::AgentChannel,
222        node_state: Arc<NodeState>,
223        request_full: bool,
224    ) -> Self {
225        EkgPoller {
226            channel: ChannelBuffer::new(channel),
227            node_state,
228            request_full,
229            gauge_cache: Mutex::new(HashMap::new()),
230            label_gauge_cache: Mutex::new(HashMap::new()),
231            counter_cache: Mutex::new(HashMap::new()),
232            counter_values: Mutex::new(HashMap::new()),
233        }
234    }
235
236    /// Run the polling loop until the remote sends Done or the channel closes
237    pub async fn run_poll_loop(&mut self, freq_secs: f64) {
238        let interval = Duration::from_secs_f64(freq_secs.max(0.1));
239        loop {
240            match self.poll_once().await {
241                Ok(true) => {
242                    debug!("EKG: node {} sent Done", self.node_state.id);
243                    return;
244                }
245                Ok(false) => {}
246                Err(e) => {
247                    warn!("EKG poll error for {}: {}", self.node_state.id, e);
248                    return;
249                }
250            }
251            tokio::time::sleep(interval).await;
252        }
253    }
254
255    /// Send one MsgReq, receive MsgResp/MsgDone. Returns true if done.
256    async fn poll_once(&mut self) -> Result<bool, Error> {
257        let req = EkgMessage::Req(self.request_full);
258        self.channel.send_msg_chunks(&req).await?;
259
260        let msg: EkgMessage = self.channel.recv_full_msg().await?;
261        match msg {
262            EkgMessage::Resp(metrics) => {
263                self.update_registry(&metrics);
264                Ok(false)
265            }
266            EkgMessage::Done => Ok(true),
267            EkgMessage::Req(_) => Err(Error::Decoding("unexpected Req from forwarder".into())),
268        }
269    }
270
271    fn update_registry(&self, metrics: &HashMap<String, EkgValue>) {
272        let registry = &self.node_state.registry;
273        for (name, value) in metrics {
274            if let Err(e) = update_metric(
275                registry,
276                name,
277                value,
278                &self.gauge_cache,
279                &self.label_gauge_cache,
280                &self.counter_cache,
281                &self.counter_values,
282            ) {
283                debug!(
284                    "EKG registry error for {}/{}: {}",
285                    self.node_state.id, name, e
286                );
287            }
288        }
289    }
290}
291
292// ---------------------------------------------------------------------------
293// Shared registry update helper
294// ---------------------------------------------------------------------------
295
296/// Update a single metric in the given registry.
297/// Exported so the dispatcher's EkgBackend can reuse the same logic.
298pub(crate) fn update_metric(
299    registry: &Registry,
300    name: &str,
301    value: &EkgValue,
302    gauge_cache: &Mutex<HashMap<String, GaugeVec>>,
303    label_gauge_cache: &Mutex<HashMap<String, GaugeVec>>,
304    counter_cache: &Mutex<HashMap<String, IntCounterVec>>,
305    counter_values: &Mutex<HashMap<String, i64>>,
306) -> anyhow::Result<()> {
307    match value {
308        EkgValue::Counter(v) => {
309            let counter = get_or_create_counter(registry, counter_cache, name)?;
310            let mut prev_map = counter_values.lock().unwrap();
311            let prev = prev_map.entry(name.to_string()).or_insert(0);
312            let delta = v.saturating_sub(*prev).max(0) as u64;
313            if delta > 0 {
314                counter.with_label_values(&[]).inc_by(delta);
315            }
316            *prev = *v;
317        }
318        EkgValue::Gauge(v) => {
319            let gauge = get_or_create_gauge(registry, gauge_cache, name)?;
320            gauge.with_label_values(&[]).set(*v as f64);
321        }
322        EkgValue::Label(text) => {
323            // Expose label text using the Prometheus `{_info}` pattern:
324            // a GaugeVec with a `value` label dimension set to the text value
325            // and a gauge value of 1.0.
326            // e.g. `ekg_some_metric_info{value="RTS v1.0"} 1.0`
327            let metric_name = sanitise_name(&format!("{}_info", name));
328            let gauge = get_or_create_label_gauge(registry, label_gauge_cache, &metric_name)?;
329            gauge.with_label_values(&[text.as_str()]).set(1.0);
330        }
331    }
332    Ok(())
333}
334
335fn get_or_create_gauge(
336    registry: &Registry,
337    cache: &Mutex<HashMap<String, GaugeVec>>,
338    name: &str,
339) -> anyhow::Result<GaugeVec> {
340    let mut lock = cache.lock().unwrap();
341    if let Some(g) = lock.get(name) {
342        return Ok(g.clone());
343    }
344    let opts = Opts::new(sanitise_name(name), name.to_string());
345    let g =
346        GaugeVec::new(opts, &[]).map_err(|e| anyhow::anyhow!("create gauge {}: {}", name, e))?;
347    registry
348        .register(Box::new(g.clone()))
349        .map_err(|e| anyhow::anyhow!("register gauge {}: {}", name, e))?;
350    lock.insert(name.to_string(), g.clone());
351    Ok(g)
352}
353
354fn get_or_create_counter(
355    registry: &Registry,
356    cache: &Mutex<HashMap<String, IntCounterVec>>,
357    name: &str,
358) -> anyhow::Result<IntCounterVec> {
359    let mut lock = cache.lock().unwrap();
360    if let Some(c) = lock.get(name) {
361        return Ok(c.clone());
362    }
363    let opts = Opts::new(sanitise_name(name), name.to_string());
364    let c = IntCounterVec::new(opts, &[])
365        .map_err(|e| anyhow::anyhow!("create counter {}: {}", name, e))?;
366    registry
367        .register(Box::new(c.clone()))
368        .map_err(|e| anyhow::anyhow!("register counter {}: {}", name, e))?;
369    lock.insert(name.to_string(), c.clone());
370    Ok(c)
371}
372
373fn get_or_create_label_gauge(
374    registry: &Registry,
375    cache: &Mutex<HashMap<String, GaugeVec>>,
376    name: &str,
377) -> anyhow::Result<GaugeVec> {
378    let mut lock = cache.lock().unwrap();
379    if let Some(g) = lock.get(name) {
380        return Ok(g.clone());
381    }
382    let opts = Opts::new(sanitise_name(name), name.to_string());
383    let g = GaugeVec::new(opts, &["value"])
384        .map_err(|e| anyhow::anyhow!("create label gauge {}: {}", name, e))?;
385    registry
386        .register(Box::new(g.clone()))
387        .map_err(|e| anyhow::anyhow!("register label gauge {}: {}", name, e))?;
388    lock.insert(name.to_string(), g.clone());
389    Ok(g)
390}
391
/// Turn an EKG metric name into a valid Prometheus metric name.
///
/// Every character other than an ASCII letter, an ASCII digit or `_` is
/// replaced by `_`, and a name that starts with a digit gets a leading `_`
/// (Prometheus metric names must match `[a-zA-Z_:][a-zA-Z0-9_:]*`).
///
/// The function is idempotent, so sanitising an already sanitised name
/// returns it unchanged. An empty input yields an empty string.
fn sanitise_name(name: &str) -> String {
    // +1 leaves room for the optional leading underscore.
    let mut sanitised = String::with_capacity(name.len() + 1);
    for (position, c) in name.chars().enumerate() {
        if position == 0 && c.is_ascii_digit() {
            sanitised.push('_');
        }
        // ASCII-only check: Unicode letters/digits pass `is_alphanumeric`
        // but are not valid in Prometheus metric names.
        if c.is_ascii_alphanumeric() || c == '_' {
            sanitised.push(c);
        } else {
            sanitised.push('_');
        }
    }
    sanitised
}
403
#[cfg(test)]
mod tests {
    use super::*;
    use pallas_codec::minicbor;

    type GaugeCache = Mutex<HashMap<String, GaugeVec>>;
    type CounterCache = Mutex<HashMap<String, IntCounterVec>>;
    type CounterValues = Mutex<HashMap<String, i64>>;

    /// Serialise `value` to CBOR, panicking on failure.
    fn encode<T: minicbor::Encode<()>>(value: &T) -> Vec<u8> {
        let mut bytes = Vec::new();
        minicbor::encode_with(value, &mut bytes, &mut ()).unwrap();
        bytes
    }

    /// Parse a `T` from CBOR bytes, panicking on failure.
    fn decode<T: for<'b> minicbor::Decode<'b, ()>>(buf: &[u8]) -> T {
        minicbor::decode_with(buf, &mut ()).unwrap()
    }

    /// Encode `value` and decode the resulting bytes back into a `T`.
    fn round_trip<T>(value: &T) -> T
    where
        T: minicbor::Encode<()> + for<'b> minicbor::Decode<'b, ()>,
    {
        decode(&encode(value))
    }

    /// Fresh caches in the order (gauge, label gauge, counter, counter values).
    fn empty_caches() -> (GaugeCache, GaugeCache, CounterCache, CounterValues) {
        (
            Mutex::default(),
            Mutex::default(),
            Mutex::default(),
            Mutex::default(),
        )
    }

    /// A private registry plus the caches `update_metric` needs.
    struct Fixture {
        registry: Registry,
        caches: (GaugeCache, GaugeCache, CounterCache, CounterValues),
    }

    impl Fixture {
        fn new() -> Self {
            Fixture {
                registry: Registry::new(),
                caches: empty_caches(),
            }
        }

        /// Feed one metric through `update_metric`, panicking on error.
        fn apply(&self, name: &str, value: EkgValue) {
            let (gauges, label_gauges, counters, counter_values) = &self.caches;
            update_metric(
                &self.registry,
                name,
                &value,
                gauges,
                label_gauges,
                counters,
                counter_values,
            )
            .unwrap();
        }
    }

    // --- EkgValue CBOR ---

    #[test]
    fn ekg_value_counter_round_trip() {
        let decoded = round_trip(&EkgValue::Counter(-42));
        assert!(matches!(decoded, EkgValue::Counter(-42)));
    }

    #[test]
    fn ekg_value_gauge_round_trip() {
        let decoded = round_trip(&EkgValue::Gauge(100));
        assert!(matches!(decoded, EkgValue::Gauge(100)));
    }

    #[test]
    fn ekg_value_label_round_trip() {
        let decoded = round_trip(&EkgValue::Label("RTS v1.0".to_string()));
        if let EkgValue::Label(text) = decoded {
            assert_eq!(text, "RTS v1.0");
        } else {
            panic!("wrong variant");
        }
    }

    // --- EkgMessage CBOR ---

    #[test]
    fn ekg_req_get_all_round_trip() {
        let decoded = round_trip(&EkgMessage::Req(true));
        assert!(matches!(decoded, EkgMessage::Req(true)));
    }

    #[test]
    fn ekg_req_get_updated_round_trip() {
        let decoded = round_trip(&EkgMessage::Req(false));
        assert!(matches!(decoded, EkgMessage::Req(false)));
    }

    #[test]
    fn ekg_done_round_trip() {
        let decoded = round_trip(&EkgMessage::Done);
        assert!(matches!(decoded, EkgMessage::Done));
    }

    #[test]
    fn ekg_resp_round_trip() {
        let metrics: HashMap<String, EkgValue> = vec![
            ("cpu".to_string(), EkgValue::Gauge(75)),
            ("mem".to_string(), EkgValue::Counter(1024)),
            ("rts".to_string(), EkgValue::Label("v1".to_string())),
        ]
        .into_iter()
        .collect();

        let decoded = match round_trip(&EkgMessage::Resp(metrics)) {
            EkgMessage::Resp(m) => m,
            _ => panic!("wrong variant"),
        };
        assert_eq!(decoded.len(), 3);
        assert!(matches!(decoded.get("cpu"), Some(EkgValue::Gauge(75))));
        assert!(matches!(decoded.get("mem"), Some(EkgValue::Counter(1024))));
        assert!(matches!(decoded.get("rts"), Some(EkgValue::Label(s)) if s == "v1"));
    }

    #[test]
    fn ekg_resp_empty_round_trip() {
        match round_trip(&EkgMessage::Resp(HashMap::new())) {
            EkgMessage::Resp(m) => assert!(m.is_empty()),
            _ => panic!("wrong variant"),
        }
    }

    // --- update_metric ---

    #[test]
    fn update_metric_gauge_sets_value() {
        let fx = Fixture::new();
        fx.apply("my_gauge", EkgValue::Gauge(42));

        let families = fx.registry.gather();
        assert_eq!(families.len(), 1);
        let family = &families[0];
        assert_eq!(family.get_name(), "my_gauge");
        assert_eq!(family.get_metric()[0].get_gauge().get_value(), 42.0);
    }

    #[test]
    fn update_metric_gauge_overwrites() {
        let fx = Fixture::new();
        fx.apply("g", EkgValue::Gauge(10));
        fx.apply("g", EkgValue::Gauge(99));

        let families = fx.registry.gather();
        assert_eq!(families[0].get_metric()[0].get_gauge().get_value(), 99.0);
    }

    #[test]
    fn update_metric_counter_accumulates_deltas() {
        let fx = Fixture::new();
        fx.apply("ops", EkgValue::Counter(5));
        fx.apply("ops", EkgValue::Counter(8));

        let families = fx.registry.gather();
        let ops = families.iter().find(|f| f.get_name() == "ops").unwrap();
        // Deltas applied: 5 - 0 = 5, then 8 - 5 = 3, for a total of 8
        assert_eq!(ops.get_metric()[0].get_counter().get_value(), 8.0);
    }

    #[test]
    fn update_metric_counter_ignores_decreasing_value() {
        // A counter that moves backwards (e.g. after a node restart) must not
        // yield a negative delta
        let fx = Fixture::new();
        fx.apply("c", EkgValue::Counter(10));
        fx.apply("c", EkgValue::Counter(3));

        let families = fx.registry.gather();
        // The first delta (10) is the only one applied; the backwards step is skipped
        assert_eq!(families[0].get_metric()[0].get_counter().get_value(), 10.0);
    }

    #[test]
    fn update_metric_label_creates_info_gauge_with_value_label() {
        let fx = Fixture::new();
        fx.apply("rts_version", EkgValue::Label("RTS v1.0".to_string()));

        let families = fx.registry.gather();
        let info = families
            .iter()
            .find(|f| f.get_name() == "rts_version_info")
            .expect("rts_version_info metric should exist");
        let metric = &info.get_metric()[0];
        assert_eq!(metric.get_gauge().get_value(), 1.0);

        let labels = metric.get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].get_name(), "value");
        assert_eq!(labels[0].get_value(), "RTS v1.0");
    }

    // --- sanitise_name ---

    #[test]
    fn sanitise_name_replaces_dots_and_slashes() {
        assert_eq!(sanitise_name("a.b/c"), "a_b_c");
    }

    #[test]
    fn sanitise_name_keeps_alphanumeric_and_underscore() {
        assert_eq!(sanitise_name("abc_123"), "abc_123");
    }
}