ibc_telemetry/
state.rs

1use core::fmt::{Display, Error as FmtError, Formatter};
2use std::{
3    ops::Range,
4    sync::Mutex,
5    time::{Duration, Instant},
6};
7
8use dashmap::{DashMap, DashSet};
9use opentelemetry::{
10    global,
11    metrics::{Counter, Histogram, ObservableGauge, Unit, UpDownCounter},
12    KeyValue,
13};
14use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, MeterProvider, Stream};
15use prometheus::{proto::MetricFamily, Registry};
16
17use ibc_relayer_types::{
18    applications::transfer::Coin,
19    core::ics24_host::identifier::{ChainId, ChannelId, ClientId, PortId},
20    signer::Signer,
21};
22
23use tendermint::Time;
24
25use crate::{broadcast_error::BroadcastError, path_identifier::PathIdentifier};
26
// Zero value associated with an empty backlog; the `backlog_oldest_sequence`
// field documentation states that the gauge reads 0 once every SendPacket
// event was relayed.
// NOTE(review): the use site is not in the visible part of the file — confirm.
const EMPTY_BACKLOG_SYMBOL: u64 = 0;
// NOTE(review): presumably the maximum number of entries kept in one backlog;
// the use site is not in the visible part of the file — confirm.
const BACKLOG_CAPACITY: usize = 1000;
// NOTE(review): presumably the size a backlog is trimmed back to once it
// reaches `BACKLOG_CAPACITY`; the use site is not visible here — confirm.
const BACKLOG_RESET_THRESHOLD: usize = 900;
30
// Query types for which `init_queries` pre-registers the `queries_cache_hits`
// counter (with a value of 0) for every chain.
const QUERY_TYPES_CACHE: [&str; 4] = [
    "query_latest_height",
    "query_client_state",
    "query_connection",
    "query_channel",
];
37
// Query types for which `init_queries` pre-registers the `queries` counter
// (with a value of 0) for every chain.
//
// Each query type must appear exactly once: a repeated entry only makes
// `init_queries` initialise the same time series a second time.
const QUERY_TYPES: [&str; 25] = [
    "query_latest_height",
    "query_block",
    "query_blocks",
    "query_packet_events",
    "query_txs",
    "query_next_sequence_receive",
    "query_unreceived_acknowledgements",
    "query_packet_acknowledgements",
    "query_unreceived_packets",
    "query_packet_commitments",
    "query_channel_client_state",
    "query_channel",
    "query_channels",
    "query_connection_channels",
    "query_connection",
    "query_connections",
    "query_client_connections",
    "query_consensus_state",
    "query_consensus_states",
    "query_upgraded_consensus_state",
    "query_client_state",
    "query_clients",
    "query_application_status",
    "query_commitment_prefix",
    "query_staking_params",
];
66
// Lifetime, as a `Duration`, of each cached rewarded-fee value.
// The current value is 7 days (60 * 60 * 24 * 7 seconds).
const FEE_LIFETIME: Duration = Duration::from_secs(60 * 60 * 24 * 7);
71
/// Kinds of relayer workers.
///
/// The `Display` output of a variant is used as the `type` label of the
/// `workers` metric.
#[derive(Copy, Clone, Debug)]
pub enum WorkerType {
    Client,
    Connection,
    Channel,
    Packet,
    Wallet,
    CrossChainQuery,
}

impl Display for WorkerType {
    /// Writes the lowercase, dash-separated name of the worker type.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        let name = match self {
            Self::Client => "client",
            Self::Connection => "connection",
            Self::Channel => "channel",
            Self::Packet => "packet",
            Self::Wallet => "wallet",
            Self::CrossChainQuery => "cross-chain-query",
        };

        f.write_str(name)
    }
}
94
/// Holds every metric instrument exposed by Hermes, together with the
/// Prometheus registry they are exported through and the in-memory
/// bookkeeping (in-flight event timestamps, backlogs, fee caches) that
/// accompanies some of the metrics.
pub struct TelemetryState {
    /// Prometheus registry the exporter is attached to; read by `gather`.
    registry: Registry,

    /// Meter provider built in `new`.
    /// NOTE(review): never read in the visible code; presumably stored so the
    /// provider lives as long as this state — confirm.
    _meter_provider: MeterProvider,

    /// Number of workers per type
    workers: UpDownCounter<i64>,

    /// Number of client update messages submitted per client
    client_updates_submitted: Counter<u64>,

    /// Number of client updates skipped because the consensus state already
    /// exists
    client_updates_skipped: Counter<u64>,

    /// Number of misbehaviours detected and submitted per client
    client_misbehaviours_submitted: Counter<u64>,

    /// Number of confirmed receive packets per channel
    receive_packets_confirmed: Counter<u64>,

    /// Number of confirmed acknowledgment packets per channel
    acknowledgment_packets_confirmed: Counter<u64>,

    /// Number of confirmed timeout packets per channel
    timeout_packets_confirmed: Counter<u64>,

    /// Number of queries submitted by Hermes, per chain and query type
    queries: Counter<u64>,

    /// Number of cache hits for queries submitted by Hermes, per chain and query type
    queries_cache_hits: Counter<u64>,

    /// Number of times Hermes reconnected to the websocket endpoint, per chain
    ws_reconnect: Counter<u64>,

    /// Number of IBC events Hermes received via the WebSocket subscription, per chain
    ws_events: Counter<u64>,

    /// Number of messages submitted to a specific chain
    messages_submitted: Counter<u64>,

    /// The balance of each wallet Hermes uses per chain
    wallet_balance: ObservableGauge<f64>,

    /// Latency of all transactions submitted to a specific chain,
    /// i.e. the time elapsed between the moment Hermes received a batch of events
    /// and the moment the corresponding transaction(s) were submitted. Milliseconds.
    tx_latency_submitted: Histogram<u64>,

    /// Latency of all transactions submitted to and confirmed on a specific chain,
    /// i.e. the time elapsed between the moment Hermes received a batch of events
    /// and the moment the corresponding transaction(s) were confirmed. Milliseconds.
    tx_latency_confirmed: Histogram<u64>,

    /// Records the time at which we started processing an event batch, keyed by
    /// tracking id. Used for computing the `tx_latency_submitted` and
    /// `tx_latency_confirmed` metrics.
    in_flight_events: moka::sync::Cache<String, Instant>,

    /// Number of SendPacket events received
    send_packet_events: Counter<u64>,

    /// Number of WriteAcknowledgement events received
    acknowledgement_events: Counter<u64>,

    /// Number of Timeout events received
    timeout_events: Counter<u64>,

    /// Number of SendPacket events received during the initial and periodic clearing
    cleared_send_packet_events: Counter<u64>,

    /// Number of WriteAcknowledgement events received during the initial and periodic clearing
    cleared_acknowledgment_events: Counter<u64>,

    /// Records the sequence number of the oldest pending packet. This corresponds to
    /// the sequence number of the oldest SendPacket event for which no
    /// WriteAcknowledgement or Timeout events have been received. The value is 0 if all the
    /// SendPacket events were relayed.
    backlog_oldest_sequence: ObservableGauge<u64>,

    /// Records the timestamp of the last time the `backlog_*` metrics were updated.
    /// The timestamp is the time passed since the unix epoch in seconds.
    backlog_latest_update_timestamp: ObservableGauge<u64>,

    /// Records the length of the backlog, i.e., how many packets are pending.
    backlog_size: ObservableGauge<u64>,

    /// Stores the backlogs for all the paths the relayer is active on.
    /// This is a map of multiple inner backlogs, one inner backlog per path.
    ///
    /// Each inner backlog is represented as a [`DashMap`].
    /// Each inner backlog captures the sequence numbers & timestamp for all SendPacket events
    /// that the relayer observed, and for which there was no associated Acknowledgement or
    /// Timeout event.
    backlogs: DashMap<PathIdentifier, DashMap<u64, u64>>,

    /// Total amount of fees received from ICS29 fees.
    fee_amounts: Counter<u64>,

    /// List of addresses for which rewarded fees from ICS29 should be recorded.
    visible_fee_addresses: DashSet<String>,

    /// Vector of rewarded fees, each stored as a value in a moka cache
    cached_fees: Mutex<Vec<moka::sync::Cache<String, u64>>>,

    /// Sum of rewarded fees over the past FEE_LIFETIME seconds
    period_fees: ObservableGauge<u64>,

    /// Number of errors observed by Hermes when broadcasting a Tx
    broadcast_errors: Counter<u64>,

    /// Number of errors observed by Hermes when simulating a Tx
    simulate_errors: Counter<u64>,

    /// The EIP-1559 base fee queried
    dynamic_gas_queried_fees: Histogram<f64>,

    /// The EIP-1559 base fee paid
    dynamic_gas_paid_fees: Histogram<f64>,

    /// The EIP-1559 base fee successfully queried
    dynamic_gas_queried_success_fees: Histogram<f64>,

    /// Number of ICS-20 packets filtered because the memo and/or the receiver fields exceeded the configured limits
    filtered_packets: Counter<u64>,

    /// Observed ICS31 CrossChainQueries
    cross_chain_queries: Counter<u64>,

    /// Observed ICS31 CrossChainQuery successful Responses
    cross_chain_query_responses: Counter<u64>,

    /// Observed ICS31 CrossChainQuery error Responses
    cross_chain_query_error_responses: Counter<u64>,
}
229
230impl TelemetryState {
231    pub fn new(
232        tx_latency_submitted_range: Range<u64>,
233        tx_latency_submitted_buckets: u64,
234        tx_latency_confirmed_range: Range<u64>,
235        tx_latency_confirmed_buckets: u64,
236        namespace: &str,
237    ) -> Self {
238        let registry = Registry::new();
239
240        // Create views for custom histogram buckets
241        let tx_submitted_buckets = build_histogram_buckets(
242            tx_latency_submitted_range.start,
243            tx_latency_submitted_range.end,
244            tx_latency_submitted_buckets,
245        );
246
247        let tx_confirmed_buckets = build_histogram_buckets(
248            tx_latency_confirmed_range.start,
249            tx_latency_confirmed_range.end,
250            tx_latency_confirmed_buckets,
251        );
252
253        let tx_submitted_view = new_view(
254            Instrument::new().name("tx_latency_submitted"),
255            Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
256                boundaries: tx_submitted_buckets,
257                record_min_max: true,
258            }),
259        )
260        .unwrap();
261
262        let tx_confirmed_view = new_view(
263            Instrument::new().name("tx_latency_confirmed"),
264            Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
265                boundaries: tx_confirmed_buckets,
266                record_min_max: true,
267            }),
268        )
269        .unwrap();
270
271        let gas_fees_view = new_view(
272            Instrument::new().name("dynamic_gas_*_fees"),
273            Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
274                boundaries: vec![0.0025, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
275                record_min_max: true,
276            }),
277        )
278        .unwrap();
279
280        let raw_exporter = opentelemetry_prometheus::exporter().with_registry(registry.clone());
281
282        // Condition required to avoid prefixing `_` when using empty namespace
283        let exporter = if !namespace.is_empty() {
284            raw_exporter
285                .with_namespace(namespace)
286                .build()
287                .expect("Failed to create Prometheus exporter")
288        } else {
289            raw_exporter
290                .build()
291                .expect("Failed to create Prometheus exporter")
292        };
293
294        // Build MeterProvider with views
295        let meter_provider = MeterProvider::builder()
296            .with_reader(exporter)
297            .with_view(tx_submitted_view)
298            .with_view(tx_confirmed_view)
299            .with_view(gas_fees_view)
300            .build();
301        global::set_meter_provider(meter_provider.clone());
302
303        let meter = global::meter("hermes");
304
305        Self {
306            registry,
307            _meter_provider: meter_provider,
308
309            workers: meter
310                .i64_up_down_counter("workers")
311                .with_description("Number of workers")
312                .init(),
313
314            client_updates_submitted: meter
315                .u64_counter("client_updates_submitted")
316                .with_description("Number of client update messages submitted")
317                .init(),
318
319            client_updates_skipped: meter
320                .u64_counter("client_updates_skipped")
321                .with_description("Number of client update messages skipped")
322                .init(),
323
324            client_misbehaviours_submitted: meter
325                .u64_counter("client_misbehaviours_submitted")
326                .with_description("Number of misbehaviours detected and submitted")
327                .init(),
328
329            receive_packets_confirmed: meter
330                .u64_counter("receive_packets_confirmed")
331                .with_description("Number of confirmed receive packets. Available if relayer runs with Tx confirmation enabled")
332                .init(),
333
334            acknowledgment_packets_confirmed: meter
335                .u64_counter("acknowledgment_packets_confirmed")
336                .with_description("Number of confirmed acknowledgment packets. Available if relayer runs with Tx confirmation enabled")
337                .init(),
338
339            timeout_packets_confirmed: meter
340                .u64_counter("timeout_packets_confirmed")
341                .with_description("Number of confirmed timeout packets. Available if relayer runs with Tx confirmation enabled")
342                .init(),
343
344            queries: meter
345                .u64_counter("queries")
346                .with_description(
347                    "Number of queries submitted by Hermes",
348                )
349                .init(),
350
351            queries_cache_hits: meter
352                .u64_counter("queries_cache_hits")
353                .with_description("Number of cache hits for queries submitted by Hermes")
354                .init(),
355
356            ws_reconnect: meter
357                .u64_counter("ws_reconnect")
358                .with_description("Number of times Hermes reconnected to the websocket endpoint")
359                .init(),
360
361            ws_events: meter
362                .u64_counter("ws_events")
363                .with_description("How many IBC events did Hermes receive via the websocket subscription")
364                .init(),
365
366            messages_submitted: meter
367                .u64_counter("messages_submitted")
368                .with_description("Number of messages submitted to a specific chain")
369                .init(),
370
371            wallet_balance: meter
372                .f64_observable_gauge("wallet_balance")
373                .with_description("The balance of each wallet Hermes uses per chain. Please note that when converting the balance to f64 a loss in precision might be introduced in the displayed value")
374                .init(),
375
376            send_packet_events: meter
377                .u64_counter("send_packet_events")
378                .with_description("Number of SendPacket events received")
379                .init(),
380
381            acknowledgement_events: meter
382                .u64_counter("acknowledgement_events")
383                .with_description("Number of WriteAcknowledgement events received")
384                .init(),
385
386            timeout_events: meter
387                .u64_counter("timeout_events")
388                .with_description("Number of TimeoutPacket events received")
389                .init(),
390
391            cleared_send_packet_events: meter
392                .u64_counter("cleared_send_packet_events")
393                .with_description("Number of SendPacket events received during the initial and periodic clearing")
394                .init(),
395
396            cleared_acknowledgment_events: meter
397                .u64_counter("cleared_acknowledgment_events")
398                .with_description("Number of WriteAcknowledgement events received during the initial and periodic clearing")
399                .init(),
400
401            tx_latency_submitted: meter
402                .u64_histogram("tx_latency_submitted")
403                .with_unit(Unit::new("milliseconds"))
404                .with_description("The latency for all transactions submitted to a specific chain, \
405                    i.e. the difference between the moment when Hermes received a batch of events \
406                    and when it submitted the corresponding transaction(s). Milliseconds.")
407                .init(),
408
409            tx_latency_confirmed: meter
410                .u64_histogram("tx_latency_confirmed")
411                .with_unit(Unit::new("milliseconds"))
412                .with_description("The latency for all transactions submitted & confirmed to a specific chain, \
413                    i.e. the difference between the moment when Hermes received a batch of events \
414                    until the corresponding transaction(s) were confirmed. Milliseconds.")
415                .init(),
416
417            in_flight_events: moka::sync::Cache::builder()
418                .time_to_live(Duration::from_secs(60 * 60)) // Remove entries after 1 hour
419                .time_to_idle(Duration::from_secs(30 * 60)) // Remove entries if they have been idle for 30 minutes
420                .build(),
421
422            backlogs: DashMap::new(),
423
424            backlog_oldest_sequence: meter
425                .u64_observable_gauge("backlog_oldest_sequence")
426                .with_description("Sequence number of the oldest SendPacket event in the backlog")
427                .init(),
428
429            backlog_latest_update_timestamp: meter
430                .u64_observable_gauge("backlog_latest_update_timestamp")
431                .with_unit(Unit::new("seconds"))
432                .with_description("Local timestamp for the last time the backlog metrics have been updated")
433                .init(),
434
435            backlog_size: meter
436                .u64_observable_gauge("backlog_size")
437                .with_description("Total number of SendPacket events in the backlog")
438                .init(),
439
440            fee_amounts: meter
441                .u64_counter("ics29_fee_amounts")
442                .with_description("Total amount received from ICS29 fees")
443                .init(),
444
445            visible_fee_addresses: DashSet::new(),
446
447            cached_fees: Mutex::new(Vec::new()),
448
449            period_fees: meter
450                .u64_observable_gauge("ics29_period_fees")
451                .with_description("Amount of ICS29 fees rewarded over the past 7 days")
452                .init(),
453
454            broadcast_errors: meter
455                .u64_counter("broadcast_errors")
456                .with_description(
457                    "Number of errors observed by Hermes when broadcasting a Tx",
458                )
459                .init(),
460
461            simulate_errors: meter
462                .u64_counter("simulate_errors")
463                .with_description(
464                    "Number of errors observed by Hermes when simulating a Tx",
465                )
466                .init(),
467
468            dynamic_gas_queried_fees: meter
469                .f64_histogram("dynamic_gas_queried_fees")
470                .with_description("The EIP-1559 base fee queried")
471                .init(),
472
473            dynamic_gas_paid_fees: meter
474                .f64_histogram("dynamic_gas_paid_fees")
475                .with_description("The EIP-1559 base fee paid")
476                .init(),
477
478            dynamic_gas_queried_success_fees: meter
479                .f64_histogram("dynamic_gas_queried_success_fees")
480                .with_description("The EIP-1559 base fee successfully queried")
481                .init(),
482
483            filtered_packets: meter
484                .u64_counter("filtered_packets")
485                .with_description("Number of ICS-20 packets filtered because the memo and/or the receiver fields were exceeding the configured limits")
486                .init(),
487
488            cross_chain_queries: meter
489                .u64_counter("cross_chain_queries")
490                .with_description("Number of ICS-31 queries received")
491                .init(),
492
493            cross_chain_query_responses: meter
494                .u64_counter("cross_chain_query_responses")
495                .with_description("Number of ICS-31 successful query responses")
496                .init(),
497
498            cross_chain_query_error_responses: meter
499                .u64_counter("cross_chain_query_error_responses")
500                .with_description("Number of ICS-31 error query responses")
501                .init(),
502        }
503    }
504
505    /// Gather the metrics for export
506    pub fn gather(&self) -> Vec<MetricFamily> {
507        self.registry.gather()
508    }
509
510    pub fn init_worker_by_type(&self, worker_type: WorkerType) {
511        self.worker(worker_type, 0);
512    }
513
514    pub fn init_per_chain(&self, chain_id: &ChainId) {
515        let labels = &[KeyValue::new("chain", chain_id.to_string())];
516
517        self.ws_reconnect.add(0, labels);
518        self.ws_events.add(0, labels);
519        self.messages_submitted.add(0, labels);
520
521        self.init_queries(chain_id);
522    }
523
524    pub fn init_per_channel(
525        &self,
526        src_chain: &ChainId,
527        dst_chain: &ChainId,
528        src_channel: &ChannelId,
529        dst_channel: &ChannelId,
530        src_port: &PortId,
531        dst_port: &PortId,
532    ) {
533        let labels = &[
534            KeyValue::new("src_chain", src_chain.to_string()),
535            KeyValue::new("dst_chain", dst_chain.to_string()),
536            KeyValue::new("src_channel", src_channel.to_string()),
537            KeyValue::new("dst_channel", dst_channel.to_string()),
538            KeyValue::new("src_port", src_port.to_string()),
539            KeyValue::new("dst_port", dst_port.to_string()),
540        ];
541
542        self.receive_packets_confirmed.add(0, labels);
543        self.acknowledgment_packets_confirmed.add(0, labels);
544        self.timeout_packets_confirmed.add(0, labels);
545    }
546
547    pub fn init_per_path(
548        &self,
549        chain: &ChainId,
550        counterparty: &ChainId,
551        channel: &ChannelId,
552        port: &PortId,
553        clear_packets: bool,
554    ) {
555        let labels = &[
556            KeyValue::new("chain", chain.to_string()),
557            KeyValue::new("counterparty", counterparty.to_string()),
558            KeyValue::new("channel", channel.to_string()),
559            KeyValue::new("port", port.to_string()),
560        ];
561
562        self.send_packet_events.add(0, labels);
563        self.acknowledgement_events.add(0, labels);
564        self.timeout_events.add(0, labels);
565
566        if clear_packets {
567            self.cleared_send_packet_events.add(0, labels);
568            self.cleared_acknowledgment_events.add(0, labels);
569        }
570
571        self.backlog_oldest_sequence.observe(0, labels);
572        self.backlog_latest_update_timestamp.observe(0, labels);
573        self.backlog_size.observe(0, labels);
574    }
575
576    pub fn init_per_client(
577        &self,
578        src_chain: &ChainId,
579        dst_chain: &ChainId,
580        client: &ClientId,
581        misbehaviour: bool,
582    ) {
583        let labels = &[
584            KeyValue::new("src_chain", src_chain.to_string()),
585            KeyValue::new("dst_chain", dst_chain.to_string()),
586            KeyValue::new("client", client.to_string()),
587        ];
588
589        self.client_updates_submitted.add(0, labels);
590        self.client_updates_skipped.add(0, labels);
591
592        if misbehaviour {
593            self.client_misbehaviours_submitted.add(0, labels);
594        }
595    }
596
597    fn init_queries(&self, chain_id: &ChainId) {
598        for query_type in QUERY_TYPES {
599            let labels = &[
600                KeyValue::new("chain", chain_id.to_string()),
601                KeyValue::new("query_type", query_type),
602            ];
603
604            self.queries.add(0, labels);
605        }
606
607        for query_type in QUERY_TYPES_CACHE {
608            let labels = &[
609                KeyValue::new("chain", chain_id.to_string()),
610                KeyValue::new("query_type", query_type),
611            ];
612
613            self.queries_cache_hits.add(0, labels);
614        }
615    }
616
617    /// Update the number of workers per object
618    pub fn worker(&self, worker_type: WorkerType, count: i64) {
619        let labels = &[KeyValue::new("type", worker_type.to_string())];
620        self.workers.add(count, labels);
621    }
622
623    /// Update the number of client updates per client
624    pub fn client_updates_submitted(
625        &self,
626        src_chain: &ChainId,
627        dst_chain: &ChainId,
628        client: &ClientId,
629        count: u64,
630    ) {
631        let labels = &[
632            KeyValue::new("src_chain", src_chain.to_string()),
633            KeyValue::new("dst_chain", dst_chain.to_string()),
634            KeyValue::new("client", client.to_string()),
635        ];
636
637        self.client_updates_submitted.add(count, labels);
638    }
639
640    /// Update the number of client updates skipped per client
641    pub fn client_updates_skipped(
642        &self,
643        src_chain: &ChainId,
644        dst_chain: &ChainId,
645        client: &ClientId,
646        count: u64,
647    ) {
648        let labels = &[
649            KeyValue::new("src_chain", src_chain.to_string()),
650            KeyValue::new("dst_chain", dst_chain.to_string()),
651            KeyValue::new("client", client.to_string()),
652        ];
653
654        self.client_updates_skipped.add(count, labels);
655    }
656
657    /// Number of client misbehaviours per client
658    pub fn client_misbehaviours_submitted(
659        &self,
660        src_chain: &ChainId,
661        dst_chain: &ChainId,
662        client: &ClientId,
663        count: u64,
664    ) {
665        let labels = &[
666            KeyValue::new("src_chain", src_chain.to_string()),
667            KeyValue::new("dst_chain", dst_chain.to_string()),
668            KeyValue::new("client", client.to_string()),
669        ];
670
671        self.client_misbehaviours_submitted.add(count, labels);
672    }
673
674    /// Number of receive packets relayed, per channel
675    #[allow(clippy::too_many_arguments)]
676    pub fn receive_packets_confirmed(
677        &self,
678        src_chain: &ChainId,
679        dst_chain: &ChainId,
680        src_channel: &ChannelId,
681        dst_channel: &ChannelId,
682        src_port: &PortId,
683        dst_port: &PortId,
684        count: u64,
685    ) {
686        if count > 0 {
687            let labels = &[
688                KeyValue::new("src_chain", src_chain.to_string()),
689                KeyValue::new("dst_chain", dst_chain.to_string()),
690                KeyValue::new("src_channel", src_channel.to_string()),
691                KeyValue::new("dst_channel", dst_channel.to_string()),
692                KeyValue::new("src_port", src_port.to_string()),
693                KeyValue::new("dst_port", dst_port.to_string()),
694            ];
695
696            self.receive_packets_confirmed.add(count, labels);
697        }
698    }
699
700    /// Number of acknowledgment packets relayed, per channel
701    #[allow(clippy::too_many_arguments)]
702    pub fn acknowledgment_packets_confirmed(
703        &self,
704        src_chain: &ChainId,
705        dst_chain: &ChainId,
706        src_channel: &ChannelId,
707        dst_channel: &ChannelId,
708        src_port: &PortId,
709        dst_port: &PortId,
710        count: u64,
711    ) {
712        if count > 0 {
713            let labels = &[
714                KeyValue::new("src_chain", src_chain.to_string()),
715                KeyValue::new("dst_chain", dst_chain.to_string()),
716                KeyValue::new("src_channel", src_channel.to_string()),
717                KeyValue::new("dst_channel", dst_channel.to_string()),
718                KeyValue::new("src_port", src_port.to_string()),
719                KeyValue::new("dst_port", dst_port.to_string()),
720            ];
721
722            self.acknowledgment_packets_confirmed.add(count, labels);
723        }
724    }
725
726    /// Number of timeout packets relayed, per channel
727    #[allow(clippy::too_many_arguments)]
728    pub fn timeout_packets_confirmed(
729        &self,
730        src_chain: &ChainId,
731        dst_chain: &ChainId,
732        src_channel: &ChannelId,
733        dst_channel: &ChannelId,
734        src_port: &PortId,
735        dst_port: &PortId,
736        count: u64,
737    ) {
738        if count > 0 {
739            let labels = &[
740                KeyValue::new("src_chain", src_chain.to_string()),
741                KeyValue::new("dst_chain", dst_chain.to_string()),
742                KeyValue::new("src_channel", src_channel.to_string()),
743                KeyValue::new("dst_channel", dst_channel.to_string()),
744                KeyValue::new("src_port", src_port.to_string()),
745                KeyValue::new("dst_port", dst_port.to_string()),
746            ];
747
748            self.timeout_packets_confirmed.add(count, labels);
749        }
750    }
751
752    /// Number of queries emitted by the relayer, per chain and query type
753    pub fn query(&self, chain_id: &ChainId, query_type: &'static str) {
754        let labels = &[
755            KeyValue::new("chain", chain_id.to_string()),
756            KeyValue::new("query_type", query_type),
757        ];
758
759        self.queries.add(1, labels);
760    }
761
762    /// Number of cache hits for queries emitted by the relayer, per chain and query type
763    pub fn queries_cache_hits(&self, chain_id: &ChainId, query_type: &'static str) {
764        let labels = &[
765            KeyValue::new("chain", chain_id.to_string()),
766            KeyValue::new("query_type", query_type),
767        ];
768
769        self.queries_cache_hits.add(1, labels);
770    }
771
772    /// Number of time the relayer had to reconnect to the WebSocket endpoint, per chain
773    pub fn ws_reconnect(&self, chain_id: &ChainId) {
774        let labels = &[KeyValue::new("chain", chain_id.to_string())];
775
776        self.ws_reconnect.add(1, labels);
777    }
778
779    /// How many IBC events did Hermes receive via the WebSocket subscription, per chain
780    pub fn ws_events(&self, chain_id: &ChainId, count: u64) {
781        let labels = &[KeyValue::new("chain", chain_id.to_string())];
782
783        self.ws_events.add(count, labels);
784    }
785
786    /// How many messages Hermes submitted to the chain
787    pub fn messages_submitted(&self, chain_id: &ChainId, count: u64) {
788        let labels = &[KeyValue::new("chain", chain_id.to_string())];
789
790        self.messages_submitted.add(count, labels);
791    }
792
793    /// The balance in each wallet that Hermes is using, per account, denom and chain.
794    /// The amount given is of unit: 10^6 * `denom`
795    pub fn wallet_balance(&self, chain_id: &ChainId, account: &str, amount: f64, denom: &str) {
796        let labels = &[
797            KeyValue::new("chain", chain_id.to_string()),
798            KeyValue::new("account", account.to_string()),
799            KeyValue::new("denom", denom.to_string()),
800        ];
801
802        self.wallet_balance.observe(amount, labels);
803    }
804
805    pub fn received_event_batch(&self, tracking_id: impl ToString) {
806        self.in_flight_events
807            .insert(tracking_id.to_string(), Instant::now());
808    }
809
810    pub fn tx_submitted(
811        &self,
812        tx_count: usize,
813        tracking_id: impl ToString,
814        chain_id: &ChainId,
815        channel_id: &ChannelId,
816        port_id: &PortId,
817        counterparty_chain_id: &ChainId,
818    ) {
819        let tracking_id = tracking_id.to_string();
820
821        if let Some(start) = self.in_flight_events.get(&tracking_id) {
822            let latency = start.elapsed().as_millis() as u64;
823
824            let labels = &[
825                // KeyValue::new("tracking_id", tracking_id),
826                KeyValue::new("chain", chain_id.to_string()),
827                KeyValue::new("counterparty", counterparty_chain_id.to_string()),
828                KeyValue::new("channel", channel_id.to_string()),
829                KeyValue::new("port", port_id.to_string()),
830            ];
831
832            for _ in 0..tx_count {
833                self.tx_latency_submitted.record(latency, labels);
834            }
835        }
836    }
837
838    pub fn tx_confirmed(
839        &self,
840        tx_count: usize,
841        tracking_id: impl ToString,
842        chain_id: &ChainId,
843        channel_id: &ChannelId,
844        port_id: &PortId,
845        counterparty_chain_id: &ChainId,
846    ) {
847        let tracking_id = tracking_id.to_string();
848
849        if let Some(start) = self.in_flight_events.get(&tracking_id) {
850            let latency = start.elapsed().as_millis() as u64;
851
852            let labels = &[
853                // KeyValue::new("tracking_id", tracking_id),
854                KeyValue::new("chain", chain_id.to_string()),
855                KeyValue::new("counterparty", counterparty_chain_id.to_string()),
856                KeyValue::new("channel", channel_id.to_string()),
857                KeyValue::new("port", port_id.to_string()),
858            ];
859
860            for _ in 0..tx_count {
861                self.tx_latency_confirmed.record(latency, labels);
862            }
863        }
864    }
865
866    pub fn send_packet_events(
867        &self,
868        _seq_nr: u64,
869        _height: u64,
870        chain_id: &ChainId,
871        channel_id: &ChannelId,
872        port_id: &PortId,
873        counterparty_chain_id: &ChainId,
874    ) {
875        let labels = &[
876            KeyValue::new("chain", chain_id.to_string()),
877            KeyValue::new("counterparty", counterparty_chain_id.to_string()),
878            KeyValue::new("channel", channel_id.to_string()),
879            KeyValue::new("port", port_id.to_string()),
880        ];
881
882        self.send_packet_events.add(1, labels);
883    }
884
885    pub fn acknowledgement_events(
886        &self,
887        _seq_nr: u64,
888        _height: u64,
889        chain_id: &ChainId,
890        channel_id: &ChannelId,
891        port_id: &PortId,
892        counterparty_chain_id: &ChainId,
893    ) {
894        let labels = &[
895            KeyValue::new("chain", chain_id.to_string()),
896            KeyValue::new("counterparty", counterparty_chain_id.to_string()),
897            KeyValue::new("channel", channel_id.to_string()),
898            KeyValue::new("port", port_id.to_string()),
899        ];
900
901        self.acknowledgement_events.add(1, labels);
902    }
903
904    pub fn timeout_events(
905        &self,
906        chain_id: &ChainId,
907        channel_id: &ChannelId,
908        port_id: &PortId,
909        counterparty_chain_id: &ChainId,
910    ) {
911        let labels = &[
912            KeyValue::new("chain", chain_id.to_string()),
913            KeyValue::new("counterparty", counterparty_chain_id.to_string()),
914            KeyValue::new("channel", channel_id.to_string()),
915            KeyValue::new("port", port_id.to_string()),
916        ];
917
918        self.timeout_events.add(1, labels);
919    }
920
921    pub fn cleared_send_packet_events(
922        &self,
923        _seq_nr: u64,
924        _height: u64,
925        chain_id: &ChainId,
926        channel_id: &ChannelId,
927        port_id: &PortId,
928        counterparty_chain_id: &ChainId,
929    ) {
930        let labels: &[KeyValue; 4] = &[
931            KeyValue::new("chain", chain_id.to_string()),
932            KeyValue::new("counterparty", counterparty_chain_id.to_string()),
933            KeyValue::new("channel", channel_id.to_string()),
934            KeyValue::new("port", port_id.to_string()),
935        ];
936
937        self.cleared_send_packet_events.add(1, labels);
938    }
939
940    pub fn cleared_acknowledgment_events(
941        &self,
942        _seq_nr: u64,
943        _height: u64,
944        chain_id: &ChainId,
945        channel_id: &ChannelId,
946        port_id: &PortId,
947        counterparty_chain_id: &ChainId,
948    ) {
949        let labels: &[KeyValue; 4] = &[
950            KeyValue::new("chain", chain_id.to_string()),
951            KeyValue::new("counterparty", counterparty_chain_id.to_string()),
952            KeyValue::new("channel", channel_id.to_string()),
953            KeyValue::new("port", port_id.to_string()),
954        ];
955
956        self.cleared_acknowledgment_events.add(1, labels);
957    }
958
959    /// Inserts in the backlog a new event for the given sequence number.
960    /// This happens when the relayer observed a new SendPacket event.
961    pub fn backlog_insert(
962        &self,
963        seq_nr: u64,
964        chain_id: &ChainId,
965        channel_id: &ChannelId,
966        port_id: &PortId,
967        counterparty_chain_id: &ChainId,
968    ) {
969        // Unique identifier for a chain/channel/port.
970        let path_uid: PathIdentifier = PathIdentifier::new(
971            chain_id.to_string(),
972            channel_id.to_string(),
973            port_id.to_string(),
974        );
975
976        let labels = &[
977            KeyValue::new("chain", chain_id.to_string()),
978            KeyValue::new("counterparty", counterparty_chain_id.to_string()),
979            KeyValue::new("channel", channel_id.to_string()),
980            KeyValue::new("port", port_id.to_string()),
981        ];
982
983        // Retrieve local timestamp when this SendPacket event was recorded.
984        let now = Time::now();
985        let timestamp = match now.duration_since(Time::unix_epoch()) {
986            Ok(ts) => ts.as_secs(),
987            Err(_) => 0,
988        };
989
990        // Update the backlog with the incoming data and retrieve the oldest values
991        let (oldest_sn, total) = if let Some(path_backlog) = self.backlogs.get(&path_uid) {
992            // Avoid having the inner backlog map growing more than a given threshold, by removing
993            // the oldest sequence number entry.
994            if path_backlog.len() > BACKLOG_RESET_THRESHOLD {
995                if let Some(min) = path_backlog.iter().map(|v| *v.key()).min() {
996                    path_backlog.remove(&min);
997                }
998            }
999            path_backlog.insert(seq_nr, timestamp);
1000
1001            // Return the oldest event information to be recorded in telemetry
1002            if let Some(min) = path_backlog.iter().map(|v| *v.key()).min() {
1003                (min, path_backlog.len() as u64)
1004            } else {
1005                // We just inserted a new key/value, so this else branch is unlikely to activate,
1006                // but it can happen in case of concurrent updates to the backlog.
1007                (EMPTY_BACKLOG_SYMBOL, EMPTY_BACKLOG_SYMBOL)
1008            }
1009        } else {
1010            // If there is no inner backlog for this path, create a new map to store it.
1011            let new_path_backlog = DashMap::with_capacity(BACKLOG_CAPACITY);
1012            new_path_backlog.insert(seq_nr, timestamp);
1013            // Record it in the global backlog
1014            self.backlogs.insert(path_uid, new_path_backlog);
1015
1016            // Return the current event information to be recorded in telemetry
1017            (seq_nr, 1)
1018        };
1019
1020        // Update metrics to reflect the new state of the backlog
1021        self.backlog_oldest_sequence.observe(oldest_sn, labels);
1022        self.backlog_latest_update_timestamp
1023            .observe(timestamp, labels);
1024        self.backlog_size.observe(total, labels);
1025    }
1026
1027    /// Inserts in the backlog a new event for the given sequence number.
1028    /// This happens when the relayer observed a new SendPacket event.
1029    pub fn update_backlog(
1030        &self,
1031        sequences: Vec<u64>,
1032        chain_id: &ChainId,
1033        channel_id: &ChannelId,
1034        port_id: &PortId,
1035        counterparty_chain_id: &ChainId,
1036    ) {
1037        // Unique identifier for a chain/channel/port.
1038        let path_uid: PathIdentifier = PathIdentifier::new(
1039            chain_id.to_string(),
1040            channel_id.to_string(),
1041            port_id.to_string(),
1042        );
1043
1044        // This condition is done in order to avoid having an incorrect `backlog_latest_update_timestamp`.
1045        // If the sequences is an empty vector by removing the entries using `backlog_remove` the `backlog_latest_update_timestamp`
1046        // will only be updated if the current backlog is not empty.
1047        // If the sequences is not empty, then it is possible to simple remove the backlog for that path and insert the sequences.
1048        if sequences.is_empty() {
1049            if let Some(path_backlog) = self.backlogs.get(&path_uid) {
1050                let current_keys: Vec<u64> = path_backlog
1051                    .value()
1052                    .iter()
1053                    .map(|entry| *entry.key())
1054                    .collect();
1055
1056                for key in current_keys.iter() {
1057                    self.backlog_remove(*key, chain_id, channel_id, port_id, counterparty_chain_id)
1058                }
1059            }
1060        } else {
1061            self.backlogs.remove(&path_uid);
1062            for key in sequences.iter() {
1063                self.backlog_insert(*key, chain_id, channel_id, port_id, counterparty_chain_id)
1064            }
1065        }
1066    }
1067
1068    /// Evicts from the backlog the event for the given sequence number.
1069    /// Removing events happens when the relayer observed either an acknowledgment
1070    /// or a timeout for a packet sequence number, which means that the corresponding
1071    /// packet was relayed.
1072    pub fn backlog_remove(
1073        &self,
1074        seq_nr: u64,
1075        chain_id: &ChainId,
1076        channel_id: &ChannelId,
1077        port_id: &PortId,
1078        counterparty_chain_id: &ChainId,
1079    ) {
1080        // Unique identifier for a chain/channel/port path.
1081        let path_uid: PathIdentifier = PathIdentifier::new(
1082            chain_id.to_string(),
1083            channel_id.to_string(),
1084            port_id.to_string(),
1085        );
1086
1087        let labels = &[
1088            KeyValue::new("chain", chain_id.to_string()),
1089            KeyValue::new("counterparty", counterparty_chain_id.to_string()),
1090            KeyValue::new("channel", channel_id.to_string()),
1091            KeyValue::new("port", port_id.to_string()),
1092        ];
1093
1094        // Retrieve local timestamp when this SendPacket event was recorded.
1095        let now = Time::now();
1096        let timestamp = match now.duration_since(Time::unix_epoch()) {
1097            Ok(ts) => ts.as_secs(),
1098            Err(_) => 0,
1099        };
1100
1101        if let Some(path_backlog) = self.backlogs.get(&path_uid) {
1102            if path_backlog.remove(&seq_nr).is_some() {
1103                // If the entry was removed update the latest update timestamp.
1104                self.backlog_latest_update_timestamp
1105                    .observe(timestamp, labels);
1106                // The oldest pending sequence number is the minimum key in the inner (path) backlog.
1107                if let Some(min_key) = path_backlog.iter().map(|v| *v.key()).min() {
1108                    self.backlog_oldest_sequence.observe(min_key, labels);
1109                    self.backlog_size.observe(path_backlog.len() as u64, labels);
1110                } else {
1111                    // No minimum found, update the metrics to reflect an empty backlog
1112                    self.backlog_oldest_sequence
1113                        .observe(EMPTY_BACKLOG_SYMBOL, labels);
1114                    self.backlog_size.observe(EMPTY_BACKLOG_SYMBOL, labels);
1115                }
1116            }
1117        }
1118    }
1119
1120    /// Record the rewarded fee from ICS29 if the address is in the registered addresses
1121    /// list.
1122    pub fn fees_amount(&self, chain_id: &ChainId, receiver: &Signer, fee_amounts: Coin<String>) {
1123        // If the address isn't in the filter list, don't record the metric.
1124        if !self.visible_fee_addresses.contains(&receiver.to_string()) {
1125            return;
1126        }
1127        let labels = &[
1128            KeyValue::new("chain", chain_id.to_string()),
1129            KeyValue::new("receiver", receiver.to_string()),
1130            KeyValue::new("denom", fee_amounts.denom.to_string()),
1131        ];
1132
1133        let fee_amount = fee_amounts.amount.0.as_u64();
1134
1135        self.fee_amounts.add(fee_amount, labels);
1136
1137        let ephemeral_fee: moka::sync::Cache<String, u64> = moka::sync::Cache::builder()
1138            .time_to_live(FEE_LIFETIME) // Remove entries after 1 hour without insert
1139            .time_to_idle(FEE_LIFETIME) // Remove entries if they have been idle for 30 minutes without get or insert
1140            .build();
1141
1142        let key = format!("fee_amount:{chain_id}/{receiver}/{}", fee_amounts.denom);
1143        ephemeral_fee.insert(key.clone(), fee_amount);
1144
1145        let mut cached_fees = self.cached_fees.lock().unwrap();
1146        cached_fees.push(ephemeral_fee);
1147
1148        let sum: u64 = cached_fees.iter().filter_map(|e| e.get(&key)).sum();
1149
1150        self.period_fees.observe(sum, labels);
1151    }
1152
1153    pub fn update_period_fees(&self, chain_id: &ChainId, receiver: &String, denom: &String) {
1154        let labels = &[
1155            KeyValue::new("chain", chain_id.to_string()),
1156            KeyValue::new("receiver", receiver.to_string()),
1157            KeyValue::new("denom", denom.to_string()),
1158        ];
1159
1160        let key = format!("fee_amount:{chain_id}/{receiver}/{}", denom);
1161
1162        let cached_fees = self.cached_fees.lock().unwrap();
1163
1164        let sum: u64 = cached_fees.iter().filter_map(|e| e.get(&key)).sum();
1165
1166        self.period_fees.observe(sum, labels);
1167    }
1168
1169    // Add an address to the list of addresses which will record
1170    // the rewarded fees from ICS29.
1171    pub fn add_visible_fee_address(&self, address: String) {
1172        self.visible_fee_addresses.insert(address);
1173    }
1174
1175    /// Add an error and its description to the list of errors observed after broadcasting
1176    /// a Tx with a specific account.
1177    pub fn broadcast_errors(&self, address: &String, error_code: u32, error_description: &str) {
1178        let broadcast_error = BroadcastError::new(error_code, error_description);
1179
1180        let labels = &[
1181            KeyValue::new("account", address.to_string()),
1182            KeyValue::new("error_code", broadcast_error.code.to_string()),
1183            KeyValue::new("error_description", broadcast_error.description),
1184        ];
1185
1186        self.broadcast_errors.add(1, labels);
1187    }
1188
1189    /// Add an error and its description to the list of errors observed after simulating
1190    /// a Tx with a specific account.
1191    pub fn simulate_errors(&self, address: &String, recoverable: bool, error_description: String) {
1192        let labels = &[
1193            KeyValue::new("account", address.to_string()),
1194            KeyValue::new("recoverable", recoverable.to_string()),
1195            KeyValue::new("error_description", error_description.to_owned()),
1196        ];
1197
1198        self.simulate_errors.add(1, labels);
1199    }
1200
1201    pub fn dynamic_gas_queried_fees(&self, chain_id: &ChainId, amount: f64) {
1202        let labels = &[KeyValue::new("identifier", chain_id.to_string())];
1203
1204        self.dynamic_gas_queried_fees.record(amount, labels);
1205    }
1206
1207    pub fn dynamic_gas_paid_fees(&self, chain_id: &ChainId, amount: f64) {
1208        let labels = &[KeyValue::new("identifier", chain_id.to_string())];
1209
1210        self.dynamic_gas_paid_fees.record(amount, labels);
1211    }
1212
1213    pub fn dynamic_gas_queried_success_fees(&self, chain_id: &ChainId, amount: f64) {
1214        let labels = &[KeyValue::new("identifier", chain_id.to_string())];
1215
1216        self.dynamic_gas_queried_success_fees.record(amount, labels);
1217    }
1218
1219    /// Increment number of packets filtered because the memo field is too big
1220    #[allow(clippy::too_many_arguments)]
1221    pub fn filtered_packets(
1222        &self,
1223        src_chain: &ChainId,
1224        dst_chain: &ChainId,
1225        src_channel: &ChannelId,
1226        dst_channel: &ChannelId,
1227        src_port: &PortId,
1228        dst_port: &PortId,
1229        count: u64,
1230    ) {
1231        if count > 0 {
1232            let labels = &[
1233                KeyValue::new("src_chain", src_chain.to_string()),
1234                KeyValue::new("dst_chain", dst_chain.to_string()),
1235                KeyValue::new("src_channel", src_channel.to_string()),
1236                KeyValue::new("dst_channel", dst_channel.to_string()),
1237                KeyValue::new("src_port", src_port.to_string()),
1238                KeyValue::new("dst_port", dst_port.to_string()),
1239            ];
1240
1241            self.filtered_packets.add(count, labels);
1242        }
1243    }
1244
1245    pub fn cross_chain_queries(&self, src_chain: &ChainId, dst_chain: &ChainId, count: usize) {
1246        if count > 0 {
1247            let labels = &[
1248                KeyValue::new("src_chain", src_chain.to_string()),
1249                KeyValue::new("dst_chain", dst_chain.to_string()),
1250            ];
1251
1252            self.cross_chain_queries.add(count as u64, labels);
1253        }
1254    }
1255
1256    pub fn cross_chain_query_responses(
1257        &self,
1258        src_chain: &ChainId,
1259        dst_chain: &ChainId,
1260        ccq_responses_codes: Vec<tendermint::abci::Code>,
1261    ) {
1262        let labels = &[
1263            KeyValue::new("src_chain", src_chain.to_string()),
1264            KeyValue::new("dst_chain", dst_chain.to_string()),
1265        ];
1266
1267        for code in ccq_responses_codes.iter() {
1268            if code.is_ok() {
1269                self.cross_chain_query_responses.add(1, labels);
1270            } else {
1271                self.cross_chain_query_error_responses.add(1, labels);
1272            }
1273        }
1274    }
1275}
1276
/// Builds evenly spaced histogram bucket boundaries between `start` and `end`.
///
/// Returns `buckets + 1` boundaries: `start`, `start + step`, ..., `start + buckets * step`,
/// where `step` is `(end - start) / buckets` using integer division. When the range is
/// not divisible by `buckets`, the last boundary is therefore below `end`.
///
/// Degenerate inputs do not panic:
/// * `buckets == 0` is treated as a single bucket, yielding `[start, end]`;
/// * `end < start` is treated as an empty range, so every boundary equals `start`.
fn build_histogram_buckets(start: u64, end: u64, buckets: u64) -> Vec<f64> {
    // Guard against a division by zero.
    let buckets = buckets.max(1);
    // Guard against an underflow on an inverted range.
    let step = end.saturating_sub(start) / buckets;

    (0..=buckets).map(|i| (start + i * step) as f64).collect()
}
1283
#[cfg(test)]
mod tests {
    use prometheus::proto::Metric;

    use super::*;

    /// Namespace passed to every telemetry state built by these tests; the assertions
    /// below look metrics up by their `hermes_`-prefixed names.
    const NAMESPACE: &str = "hermes";

    /// Builds a telemetry state with both latency histograms covering 0..5000 in 5 buckets.
    fn new_state() -> TelemetryState {
        TelemetryState::new(
            Range {
                start: 0,
                end: 5000,
            },
            5,
            Range {
                start: 0,
                end: 5000,
            },
            5,
            NAMESPACE,
        )
    }

    /// Path used by all the tests: (chain, channel, port, counterparty chain).
    fn test_path() -> (ChainId, ChannelId, PortId, ChainId) {
        (
            ChainId::from_string("chain-test"),
            ChannelId::new(0),
            PortId::transfer(),
            ChainId::from_string("counterpartychain-test"),
        )
    }

    /// Inserts the sequence numbers 1 to 5 in the backlog of the given path.
    fn insert_one_to_five(
        state: &TelemetryState,
        chain_id: &ChainId,
        channel_id: &ChannelId,
        port_id: &PortId,
        counterparty_chain_id: &ChainId,
    ) {
        for seq_nr in 1..=5 {
            state.backlog_insert(seq_nr, chain_id, channel_id, port_id, counterparty_chain_id);
        }
    }

    /// Returns true if any of the given samples is a gauge whose value equals `expected`.
    fn any_gauge_equals(samples: &[Metric], expected: u64) -> bool {
        samples
            .iter()
            .any(|sample| sample.get_gauge().get_value() as u64 == expected)
    }

    /// Asserts that the metric family called `name` has a gauge sample equal to `expected`.
    fn assert_gauge(families: &[MetricFamily], name: &str, expected: u64) {
        let family = families
            .iter()
            .find(|family| family.get_name() == name)
            .unwrap_or_else(|| panic!("metric {} was not exported", name));

        assert!(
            any_gauge_equals(family.get_metric(), expected),
            "expected {} to be {}",
            name,
            expected
        );
    }

    #[test]
    fn insert_remove_backlog() {
        let state = new_state();
        let (chain_id, channel_id, port_id, counterparty_chain_id) = test_path();

        insert_one_to_five(
            &state,
            &chain_id,
            &channel_id,
            &port_id,
            &counterparty_chain_id,
        );
        state.backlog_remove(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
        state.backlog_remove(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id);

        // 2, 4 and 5 are left in the backlog.
        let metrics = state.registry.gather();
        assert_gauge(&metrics, "hermes_backlog_size", 3);
        assert_gauge(&metrics, "hermes_backlog_oldest_sequence", 2);
    }

    #[test]
    fn update_backlog() {
        let state = new_state();
        let (chain_id, channel_id, port_id, counterparty_chain_id) = test_path();

        insert_one_to_five(
            &state,
            &chain_id,
            &channel_id,
            &port_id,
            &counterparty_chain_id,
        );

        state.update_backlog(
            vec![5],
            &chain_id,
            &channel_id,
            &port_id,
            &counterparty_chain_id,
        );

        // Only the sequence passed to `update_backlog` is left.
        let metrics = state.registry.gather();
        assert_gauge(&metrics, "hermes_backlog_size", 1);
        assert_gauge(&metrics, "hermes_backlog_oldest_sequence", 5);
    }

    #[test]
    fn update_backlog_empty() {
        let state = new_state();
        let (chain_id, channel_id, port_id, counterparty_chain_id) = test_path();

        insert_one_to_five(
            &state,
            &chain_id,
            &channel_id,
            &port_id,
            &counterparty_chain_id,
        );

        state.update_backlog(
            vec![],
            &chain_id,
            &channel_id,
            &port_id,
            &counterparty_chain_id,
        );

        // An empty update evicts everything.
        let metrics = state.registry.gather();
        assert_gauge(&metrics, "hermes_backlog_size", 0);
        assert_gauge(&metrics, "hermes_backlog_oldest_sequence", 0);
    }
}