1use core::fmt::{Display, Error as FmtError, Formatter};
2use std::{
3 ops::Range,
4 sync::Mutex,
5 time::{Duration, Instant},
6};
7
8use dashmap::{DashMap, DashSet};
9use opentelemetry::{
10 global,
11 metrics::{Counter, Histogram, ObservableGauge, Unit, UpDownCounter},
12 KeyValue,
13};
14use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, MeterProvider, Stream};
15use prometheus::{proto::MetricFamily, Registry};
16
17use ibc_relayer_types::{
18 applications::transfer::Coin,
19 core::ics24_host::identifier::{ChainId, ChannelId, ClientId, PortId},
20 signer::Signer,
21};
22
23use tendermint::Time;
24
25use crate::{broadcast_error::BroadcastError, path_identifier::PathIdentifier};
26
/// Value reported by the backlog gauges when a path has no pending sequence.
const EMPTY_BACKLOG_SYMBOL: u64 = 0;
/// Initial capacity reserved for a freshly created per-path backlog map.
const BACKLOG_CAPACITY: usize = 1_000;
/// Backlog size above which `backlog_insert` evicts the smallest sequence
/// number before inserting a new one.
const BACKLOG_RESET_THRESHOLD: usize = 900;
30
/// Query types whose `queries_cache_hits` counter is zero-initialised for
/// every chain by `init_queries`.
const QUERY_TYPES_CACHE: [&str; 4] = [
    "query_latest_height",
    "query_client_state",
    "query_connection",
    "query_channel",
];
37
/// Query types whose `queries` counter is zero-initialised for every chain by
/// `init_queries`.
///
/// Each entry appears exactly once: `"query_latest_height"` used to be listed
/// twice, which only caused a redundant zero-increment of the same label set.
const QUERY_TYPES: [&str; 25] = [
    "query_latest_height",
    "query_block",
    "query_blocks",
    "query_packet_events",
    "query_txs",
    "query_next_sequence_receive",
    "query_unreceived_acknowledgements",
    "query_packet_acknowledgements",
    "query_unreceived_packets",
    "query_packet_commitments",
    "query_channel_client_state",
    "query_channel",
    "query_channels",
    "query_connection_channels",
    "query_connection",
    "query_connections",
    "query_client_connections",
    "query_consensus_state",
    "query_consensus_states",
    "query_upgraded_consensus_state",
    "query_client_state",
    "query_clients",
    "query_application_status",
    "query_commitment_prefix",
    "query_staking_params",
];
66
/// Time-to-live and time-to-idle applied to each cached fee entry: 7 days.
const FEE_LIFETIME: Duration = Duration::from_secs(7 * 24 * 60 * 60);
71
/// Kind of worker counted by the `workers` metric.
///
/// The `Display` implementation provides the value used for the `type` label.
#[derive(Clone, Copy, Debug)]
pub enum WorkerType {
    /// Client worker.
    Client,
    /// Connection worker.
    Connection,
    /// Channel worker.
    Channel,
    /// Packet worker.
    Packet,
    /// Wallet worker.
    Wallet,
    /// Cross-chain query worker.
    CrossChainQuery,
}
81
82impl Display for WorkerType {
83 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
84 match self {
85 Self::Client => write!(f, "client"),
86 Self::Connection => write!(f, "connection"),
87 Self::Channel => write!(f, "channel"),
88 Self::Packet => write!(f, "packet"),
89 Self::Wallet => write!(f, "wallet"),
90 Self::CrossChainQuery => write!(f, "cross-chain-query"),
91 }
92 }
93}
94
95pub struct TelemetryState {
96 registry: Registry,
97 _meter_provider: MeterProvider,
98
99 workers: UpDownCounter<i64>,
101
102 client_updates_submitted: Counter<u64>,
104
105 client_updates_skipped: Counter<u64>,
108
109 client_misbehaviours_submitted: Counter<u64>,
111
112 receive_packets_confirmed: Counter<u64>,
114
115 acknowledgment_packets_confirmed: Counter<u64>,
117
118 timeout_packets_confirmed: Counter<u64>,
120
121 queries: Counter<u64>,
123
124 queries_cache_hits: Counter<u64>,
126
127 ws_reconnect: Counter<u64>,
129
130 ws_events: Counter<u64>,
132
133 messages_submitted: Counter<u64>,
135
136 wallet_balance: ObservableGauge<f64>,
138
139 tx_latency_submitted: Histogram<u64>,
143
144 tx_latency_confirmed: Histogram<u64>,
148
149 in_flight_events: moka::sync::Cache<String, Instant>,
152
153 send_packet_events: Counter<u64>,
155
156 acknowledgement_events: Counter<u64>,
158
159 timeout_events: Counter<u64>,
161
162 cleared_send_packet_events: Counter<u64>,
164
165 cleared_acknowledgment_events: Counter<u64>,
167
168 backlog_oldest_sequence: ObservableGauge<u64>,
173
174 backlog_latest_update_timestamp: ObservableGauge<u64>,
177
178 backlog_size: ObservableGauge<u64>,
180
181 backlogs: DashMap<PathIdentifier, DashMap<u64, u64>>,
189
190 fee_amounts: Counter<u64>,
192
193 visible_fee_addresses: DashSet<String>,
195
196 cached_fees: Mutex<Vec<moka::sync::Cache<String, u64>>>,
198
199 period_fees: ObservableGauge<u64>,
201
202 broadcast_errors: Counter<u64>,
204
205 simulate_errors: Counter<u64>,
207
208 dynamic_gas_queried_fees: Histogram<f64>,
210
211 dynamic_gas_paid_fees: Histogram<f64>,
213
214 dynamic_gas_queried_success_fees: Histogram<f64>,
216
217 filtered_packets: Counter<u64>,
219
220 cross_chain_queries: Counter<u64>,
222
223 cross_chain_query_responses: Counter<u64>,
225
226 cross_chain_query_error_responses: Counter<u64>,
228}
229
230impl TelemetryState {
231 pub fn new(
232 tx_latency_submitted_range: Range<u64>,
233 tx_latency_submitted_buckets: u64,
234 tx_latency_confirmed_range: Range<u64>,
235 tx_latency_confirmed_buckets: u64,
236 namespace: &str,
237 ) -> Self {
238 let registry = Registry::new();
239
240 let tx_submitted_buckets = build_histogram_buckets(
242 tx_latency_submitted_range.start,
243 tx_latency_submitted_range.end,
244 tx_latency_submitted_buckets,
245 );
246
247 let tx_confirmed_buckets = build_histogram_buckets(
248 tx_latency_confirmed_range.start,
249 tx_latency_confirmed_range.end,
250 tx_latency_confirmed_buckets,
251 );
252
253 let tx_submitted_view = new_view(
254 Instrument::new().name("tx_latency_submitted"),
255 Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
256 boundaries: tx_submitted_buckets,
257 record_min_max: true,
258 }),
259 )
260 .unwrap();
261
262 let tx_confirmed_view = new_view(
263 Instrument::new().name("tx_latency_confirmed"),
264 Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
265 boundaries: tx_confirmed_buckets,
266 record_min_max: true,
267 }),
268 )
269 .unwrap();
270
271 let gas_fees_view = new_view(
272 Instrument::new().name("dynamic_gas_*_fees"),
273 Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
274 boundaries: vec![0.0025, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
275 record_min_max: true,
276 }),
277 )
278 .unwrap();
279
280 let raw_exporter = opentelemetry_prometheus::exporter().with_registry(registry.clone());
281
282 let exporter = if !namespace.is_empty() {
284 raw_exporter
285 .with_namespace(namespace)
286 .build()
287 .expect("Failed to create Prometheus exporter")
288 } else {
289 raw_exporter
290 .build()
291 .expect("Failed to create Prometheus exporter")
292 };
293
294 let meter_provider = MeterProvider::builder()
296 .with_reader(exporter)
297 .with_view(tx_submitted_view)
298 .with_view(tx_confirmed_view)
299 .with_view(gas_fees_view)
300 .build();
301 global::set_meter_provider(meter_provider.clone());
302
303 let meter = global::meter("hermes");
304
305 Self {
306 registry,
307 _meter_provider: meter_provider,
308
309 workers: meter
310 .i64_up_down_counter("workers")
311 .with_description("Number of workers")
312 .init(),
313
314 client_updates_submitted: meter
315 .u64_counter("client_updates_submitted")
316 .with_description("Number of client update messages submitted")
317 .init(),
318
319 client_updates_skipped: meter
320 .u64_counter("client_updates_skipped")
321 .with_description("Number of client update messages skipped")
322 .init(),
323
324 client_misbehaviours_submitted: meter
325 .u64_counter("client_misbehaviours_submitted")
326 .with_description("Number of misbehaviours detected and submitted")
327 .init(),
328
329 receive_packets_confirmed: meter
330 .u64_counter("receive_packets_confirmed")
331 .with_description("Number of confirmed receive packets. Available if relayer runs with Tx confirmation enabled")
332 .init(),
333
334 acknowledgment_packets_confirmed: meter
335 .u64_counter("acknowledgment_packets_confirmed")
336 .with_description("Number of confirmed acknowledgment packets. Available if relayer runs with Tx confirmation enabled")
337 .init(),
338
339 timeout_packets_confirmed: meter
340 .u64_counter("timeout_packets_confirmed")
341 .with_description("Number of confirmed timeout packets. Available if relayer runs with Tx confirmation enabled")
342 .init(),
343
344 queries: meter
345 .u64_counter("queries")
346 .with_description(
347 "Number of queries submitted by Hermes",
348 )
349 .init(),
350
351 queries_cache_hits: meter
352 .u64_counter("queries_cache_hits")
353 .with_description("Number of cache hits for queries submitted by Hermes")
354 .init(),
355
356 ws_reconnect: meter
357 .u64_counter("ws_reconnect")
358 .with_description("Number of times Hermes reconnected to the websocket endpoint")
359 .init(),
360
361 ws_events: meter
362 .u64_counter("ws_events")
363 .with_description("How many IBC events did Hermes receive via the websocket subscription")
364 .init(),
365
366 messages_submitted: meter
367 .u64_counter("messages_submitted")
368 .with_description("Number of messages submitted to a specific chain")
369 .init(),
370
371 wallet_balance: meter
372 .f64_observable_gauge("wallet_balance")
373 .with_description("The balance of each wallet Hermes uses per chain. Please note that when converting the balance to f64 a loss in precision might be introduced in the displayed value")
374 .init(),
375
376 send_packet_events: meter
377 .u64_counter("send_packet_events")
378 .with_description("Number of SendPacket events received")
379 .init(),
380
381 acknowledgement_events: meter
382 .u64_counter("acknowledgement_events")
383 .with_description("Number of WriteAcknowledgement events received")
384 .init(),
385
386 timeout_events: meter
387 .u64_counter("timeout_events")
388 .with_description("Number of TimeoutPacket events received")
389 .init(),
390
391 cleared_send_packet_events: meter
392 .u64_counter("cleared_send_packet_events")
393 .with_description("Number of SendPacket events received during the initial and periodic clearing")
394 .init(),
395
396 cleared_acknowledgment_events: meter
397 .u64_counter("cleared_acknowledgment_events")
398 .with_description("Number of WriteAcknowledgement events received during the initial and periodic clearing")
399 .init(),
400
401 tx_latency_submitted: meter
402 .u64_histogram("tx_latency_submitted")
403 .with_unit(Unit::new("milliseconds"))
404 .with_description("The latency for all transactions submitted to a specific chain, \
405 i.e. the difference between the moment when Hermes received a batch of events \
406 and when it submitted the corresponding transaction(s). Milliseconds.")
407 .init(),
408
409 tx_latency_confirmed: meter
410 .u64_histogram("tx_latency_confirmed")
411 .with_unit(Unit::new("milliseconds"))
412 .with_description("The latency for all transactions submitted & confirmed to a specific chain, \
413 i.e. the difference between the moment when Hermes received a batch of events \
414 until the corresponding transaction(s) were confirmed. Milliseconds.")
415 .init(),
416
417 in_flight_events: moka::sync::Cache::builder()
418 .time_to_live(Duration::from_secs(60 * 60)) .time_to_idle(Duration::from_secs(30 * 60)) .build(),
421
422 backlogs: DashMap::new(),
423
424 backlog_oldest_sequence: meter
425 .u64_observable_gauge("backlog_oldest_sequence")
426 .with_description("Sequence number of the oldest SendPacket event in the backlog")
427 .init(),
428
429 backlog_latest_update_timestamp: meter
430 .u64_observable_gauge("backlog_latest_update_timestamp")
431 .with_unit(Unit::new("seconds"))
432 .with_description("Local timestamp for the last time the backlog metrics have been updated")
433 .init(),
434
435 backlog_size: meter
436 .u64_observable_gauge("backlog_size")
437 .with_description("Total number of SendPacket events in the backlog")
438 .init(),
439
440 fee_amounts: meter
441 .u64_counter("ics29_fee_amounts")
442 .with_description("Total amount received from ICS29 fees")
443 .init(),
444
445 visible_fee_addresses: DashSet::new(),
446
447 cached_fees: Mutex::new(Vec::new()),
448
449 period_fees: meter
450 .u64_observable_gauge("ics29_period_fees")
451 .with_description("Amount of ICS29 fees rewarded over the past 7 days")
452 .init(),
453
454 broadcast_errors: meter
455 .u64_counter("broadcast_errors")
456 .with_description(
457 "Number of errors observed by Hermes when broadcasting a Tx",
458 )
459 .init(),
460
461 simulate_errors: meter
462 .u64_counter("simulate_errors")
463 .with_description(
464 "Number of errors observed by Hermes when simulating a Tx",
465 )
466 .init(),
467
468 dynamic_gas_queried_fees: meter
469 .f64_histogram("dynamic_gas_queried_fees")
470 .with_description("The EIP-1559 base fee queried")
471 .init(),
472
473 dynamic_gas_paid_fees: meter
474 .f64_histogram("dynamic_gas_paid_fees")
475 .with_description("The EIP-1559 base fee paid")
476 .init(),
477
478 dynamic_gas_queried_success_fees: meter
479 .f64_histogram("dynamic_gas_queried_success_fees")
480 .with_description("The EIP-1559 base fee successfully queried")
481 .init(),
482
483 filtered_packets: meter
484 .u64_counter("filtered_packets")
485 .with_description("Number of ICS-20 packets filtered because the memo and/or the receiver fields were exceeding the configured limits")
486 .init(),
487
488 cross_chain_queries: meter
489 .u64_counter("cross_chain_queries")
490 .with_description("Number of ICS-31 queries received")
491 .init(),
492
493 cross_chain_query_responses: meter
494 .u64_counter("cross_chain_query_responses")
495 .with_description("Number of ICS-31 successful query responses")
496 .init(),
497
498 cross_chain_query_error_responses: meter
499 .u64_counter("cross_chain_query_error_responses")
500 .with_description("Number of ICS-31 error query responses")
501 .init(),
502 }
503 }
504
505 pub fn gather(&self) -> Vec<MetricFamily> {
507 self.registry.gather()
508 }
509
510 pub fn init_worker_by_type(&self, worker_type: WorkerType) {
511 self.worker(worker_type, 0);
512 }
513
514 pub fn init_per_chain(&self, chain_id: &ChainId) {
515 let labels = &[KeyValue::new("chain", chain_id.to_string())];
516
517 self.ws_reconnect.add(0, labels);
518 self.ws_events.add(0, labels);
519 self.messages_submitted.add(0, labels);
520
521 self.init_queries(chain_id);
522 }
523
524 pub fn init_per_channel(
525 &self,
526 src_chain: &ChainId,
527 dst_chain: &ChainId,
528 src_channel: &ChannelId,
529 dst_channel: &ChannelId,
530 src_port: &PortId,
531 dst_port: &PortId,
532 ) {
533 let labels = &[
534 KeyValue::new("src_chain", src_chain.to_string()),
535 KeyValue::new("dst_chain", dst_chain.to_string()),
536 KeyValue::new("src_channel", src_channel.to_string()),
537 KeyValue::new("dst_channel", dst_channel.to_string()),
538 KeyValue::new("src_port", src_port.to_string()),
539 KeyValue::new("dst_port", dst_port.to_string()),
540 ];
541
542 self.receive_packets_confirmed.add(0, labels);
543 self.acknowledgment_packets_confirmed.add(0, labels);
544 self.timeout_packets_confirmed.add(0, labels);
545 }
546
547 pub fn init_per_path(
548 &self,
549 chain: &ChainId,
550 counterparty: &ChainId,
551 channel: &ChannelId,
552 port: &PortId,
553 clear_packets: bool,
554 ) {
555 let labels = &[
556 KeyValue::new("chain", chain.to_string()),
557 KeyValue::new("counterparty", counterparty.to_string()),
558 KeyValue::new("channel", channel.to_string()),
559 KeyValue::new("port", port.to_string()),
560 ];
561
562 self.send_packet_events.add(0, labels);
563 self.acknowledgement_events.add(0, labels);
564 self.timeout_events.add(0, labels);
565
566 if clear_packets {
567 self.cleared_send_packet_events.add(0, labels);
568 self.cleared_acknowledgment_events.add(0, labels);
569 }
570
571 self.backlog_oldest_sequence.observe(0, labels);
572 self.backlog_latest_update_timestamp.observe(0, labels);
573 self.backlog_size.observe(0, labels);
574 }
575
576 pub fn init_per_client(
577 &self,
578 src_chain: &ChainId,
579 dst_chain: &ChainId,
580 client: &ClientId,
581 misbehaviour: bool,
582 ) {
583 let labels = &[
584 KeyValue::new("src_chain", src_chain.to_string()),
585 KeyValue::new("dst_chain", dst_chain.to_string()),
586 KeyValue::new("client", client.to_string()),
587 ];
588
589 self.client_updates_submitted.add(0, labels);
590 self.client_updates_skipped.add(0, labels);
591
592 if misbehaviour {
593 self.client_misbehaviours_submitted.add(0, labels);
594 }
595 }
596
597 fn init_queries(&self, chain_id: &ChainId) {
598 for query_type in QUERY_TYPES {
599 let labels = &[
600 KeyValue::new("chain", chain_id.to_string()),
601 KeyValue::new("query_type", query_type),
602 ];
603
604 self.queries.add(0, labels);
605 }
606
607 for query_type in QUERY_TYPES_CACHE {
608 let labels = &[
609 KeyValue::new("chain", chain_id.to_string()),
610 KeyValue::new("query_type", query_type),
611 ];
612
613 self.queries_cache_hits.add(0, labels);
614 }
615 }
616
617 pub fn worker(&self, worker_type: WorkerType, count: i64) {
619 let labels = &[KeyValue::new("type", worker_type.to_string())];
620 self.workers.add(count, labels);
621 }
622
623 pub fn client_updates_submitted(
625 &self,
626 src_chain: &ChainId,
627 dst_chain: &ChainId,
628 client: &ClientId,
629 count: u64,
630 ) {
631 let labels = &[
632 KeyValue::new("src_chain", src_chain.to_string()),
633 KeyValue::new("dst_chain", dst_chain.to_string()),
634 KeyValue::new("client", client.to_string()),
635 ];
636
637 self.client_updates_submitted.add(count, labels);
638 }
639
640 pub fn client_updates_skipped(
642 &self,
643 src_chain: &ChainId,
644 dst_chain: &ChainId,
645 client: &ClientId,
646 count: u64,
647 ) {
648 let labels = &[
649 KeyValue::new("src_chain", src_chain.to_string()),
650 KeyValue::new("dst_chain", dst_chain.to_string()),
651 KeyValue::new("client", client.to_string()),
652 ];
653
654 self.client_updates_skipped.add(count, labels);
655 }
656
657 pub fn client_misbehaviours_submitted(
659 &self,
660 src_chain: &ChainId,
661 dst_chain: &ChainId,
662 client: &ClientId,
663 count: u64,
664 ) {
665 let labels = &[
666 KeyValue::new("src_chain", src_chain.to_string()),
667 KeyValue::new("dst_chain", dst_chain.to_string()),
668 KeyValue::new("client", client.to_string()),
669 ];
670
671 self.client_misbehaviours_submitted.add(count, labels);
672 }
673
674 #[allow(clippy::too_many_arguments)]
676 pub fn receive_packets_confirmed(
677 &self,
678 src_chain: &ChainId,
679 dst_chain: &ChainId,
680 src_channel: &ChannelId,
681 dst_channel: &ChannelId,
682 src_port: &PortId,
683 dst_port: &PortId,
684 count: u64,
685 ) {
686 if count > 0 {
687 let labels = &[
688 KeyValue::new("src_chain", src_chain.to_string()),
689 KeyValue::new("dst_chain", dst_chain.to_string()),
690 KeyValue::new("src_channel", src_channel.to_string()),
691 KeyValue::new("dst_channel", dst_channel.to_string()),
692 KeyValue::new("src_port", src_port.to_string()),
693 KeyValue::new("dst_port", dst_port.to_string()),
694 ];
695
696 self.receive_packets_confirmed.add(count, labels);
697 }
698 }
699
700 #[allow(clippy::too_many_arguments)]
702 pub fn acknowledgment_packets_confirmed(
703 &self,
704 src_chain: &ChainId,
705 dst_chain: &ChainId,
706 src_channel: &ChannelId,
707 dst_channel: &ChannelId,
708 src_port: &PortId,
709 dst_port: &PortId,
710 count: u64,
711 ) {
712 if count > 0 {
713 let labels = &[
714 KeyValue::new("src_chain", src_chain.to_string()),
715 KeyValue::new("dst_chain", dst_chain.to_string()),
716 KeyValue::new("src_channel", src_channel.to_string()),
717 KeyValue::new("dst_channel", dst_channel.to_string()),
718 KeyValue::new("src_port", src_port.to_string()),
719 KeyValue::new("dst_port", dst_port.to_string()),
720 ];
721
722 self.acknowledgment_packets_confirmed.add(count, labels);
723 }
724 }
725
726 #[allow(clippy::too_many_arguments)]
728 pub fn timeout_packets_confirmed(
729 &self,
730 src_chain: &ChainId,
731 dst_chain: &ChainId,
732 src_channel: &ChannelId,
733 dst_channel: &ChannelId,
734 src_port: &PortId,
735 dst_port: &PortId,
736 count: u64,
737 ) {
738 if count > 0 {
739 let labels = &[
740 KeyValue::new("src_chain", src_chain.to_string()),
741 KeyValue::new("dst_chain", dst_chain.to_string()),
742 KeyValue::new("src_channel", src_channel.to_string()),
743 KeyValue::new("dst_channel", dst_channel.to_string()),
744 KeyValue::new("src_port", src_port.to_string()),
745 KeyValue::new("dst_port", dst_port.to_string()),
746 ];
747
748 self.timeout_packets_confirmed.add(count, labels);
749 }
750 }
751
752 pub fn query(&self, chain_id: &ChainId, query_type: &'static str) {
754 let labels = &[
755 KeyValue::new("chain", chain_id.to_string()),
756 KeyValue::new("query_type", query_type),
757 ];
758
759 self.queries.add(1, labels);
760 }
761
762 pub fn queries_cache_hits(&self, chain_id: &ChainId, query_type: &'static str) {
764 let labels = &[
765 KeyValue::new("chain", chain_id.to_string()),
766 KeyValue::new("query_type", query_type),
767 ];
768
769 self.queries_cache_hits.add(1, labels);
770 }
771
772 pub fn ws_reconnect(&self, chain_id: &ChainId) {
774 let labels = &[KeyValue::new("chain", chain_id.to_string())];
775
776 self.ws_reconnect.add(1, labels);
777 }
778
779 pub fn ws_events(&self, chain_id: &ChainId, count: u64) {
781 let labels = &[KeyValue::new("chain", chain_id.to_string())];
782
783 self.ws_events.add(count, labels);
784 }
785
786 pub fn messages_submitted(&self, chain_id: &ChainId, count: u64) {
788 let labels = &[KeyValue::new("chain", chain_id.to_string())];
789
790 self.messages_submitted.add(count, labels);
791 }
792
793 pub fn wallet_balance(&self, chain_id: &ChainId, account: &str, amount: f64, denom: &str) {
796 let labels = &[
797 KeyValue::new("chain", chain_id.to_string()),
798 KeyValue::new("account", account.to_string()),
799 KeyValue::new("denom", denom.to_string()),
800 ];
801
802 self.wallet_balance.observe(amount, labels);
803 }
804
805 pub fn received_event_batch(&self, tracking_id: impl ToString) {
806 self.in_flight_events
807 .insert(tracking_id.to_string(), Instant::now());
808 }
809
810 pub fn tx_submitted(
811 &self,
812 tx_count: usize,
813 tracking_id: impl ToString,
814 chain_id: &ChainId,
815 channel_id: &ChannelId,
816 port_id: &PortId,
817 counterparty_chain_id: &ChainId,
818 ) {
819 let tracking_id = tracking_id.to_string();
820
821 if let Some(start) = self.in_flight_events.get(&tracking_id) {
822 let latency = start.elapsed().as_millis() as u64;
823
824 let labels = &[
825 KeyValue::new("chain", chain_id.to_string()),
827 KeyValue::new("counterparty", counterparty_chain_id.to_string()),
828 KeyValue::new("channel", channel_id.to_string()),
829 KeyValue::new("port", port_id.to_string()),
830 ];
831
832 for _ in 0..tx_count {
833 self.tx_latency_submitted.record(latency, labels);
834 }
835 }
836 }
837
838 pub fn tx_confirmed(
839 &self,
840 tx_count: usize,
841 tracking_id: impl ToString,
842 chain_id: &ChainId,
843 channel_id: &ChannelId,
844 port_id: &PortId,
845 counterparty_chain_id: &ChainId,
846 ) {
847 let tracking_id = tracking_id.to_string();
848
849 if let Some(start) = self.in_flight_events.get(&tracking_id) {
850 let latency = start.elapsed().as_millis() as u64;
851
852 let labels = &[
853 KeyValue::new("chain", chain_id.to_string()),
855 KeyValue::new("counterparty", counterparty_chain_id.to_string()),
856 KeyValue::new("channel", channel_id.to_string()),
857 KeyValue::new("port", port_id.to_string()),
858 ];
859
860 for _ in 0..tx_count {
861 self.tx_latency_confirmed.record(latency, labels);
862 }
863 }
864 }
865
866 pub fn send_packet_events(
867 &self,
868 _seq_nr: u64,
869 _height: u64,
870 chain_id: &ChainId,
871 channel_id: &ChannelId,
872 port_id: &PortId,
873 counterparty_chain_id: &ChainId,
874 ) {
875 let labels = &[
876 KeyValue::new("chain", chain_id.to_string()),
877 KeyValue::new("counterparty", counterparty_chain_id.to_string()),
878 KeyValue::new("channel", channel_id.to_string()),
879 KeyValue::new("port", port_id.to_string()),
880 ];
881
882 self.send_packet_events.add(1, labels);
883 }
884
885 pub fn acknowledgement_events(
886 &self,
887 _seq_nr: u64,
888 _height: u64,
889 chain_id: &ChainId,
890 channel_id: &ChannelId,
891 port_id: &PortId,
892 counterparty_chain_id: &ChainId,
893 ) {
894 let labels = &[
895 KeyValue::new("chain", chain_id.to_string()),
896 KeyValue::new("counterparty", counterparty_chain_id.to_string()),
897 KeyValue::new("channel", channel_id.to_string()),
898 KeyValue::new("port", port_id.to_string()),
899 ];
900
901 self.acknowledgement_events.add(1, labels);
902 }
903
904 pub fn timeout_events(
905 &self,
906 chain_id: &ChainId,
907 channel_id: &ChannelId,
908 port_id: &PortId,
909 counterparty_chain_id: &ChainId,
910 ) {
911 let labels = &[
912 KeyValue::new("chain", chain_id.to_string()),
913 KeyValue::new("counterparty", counterparty_chain_id.to_string()),
914 KeyValue::new("channel", channel_id.to_string()),
915 KeyValue::new("port", port_id.to_string()),
916 ];
917
918 self.timeout_events.add(1, labels);
919 }
920
921 pub fn cleared_send_packet_events(
922 &self,
923 _seq_nr: u64,
924 _height: u64,
925 chain_id: &ChainId,
926 channel_id: &ChannelId,
927 port_id: &PortId,
928 counterparty_chain_id: &ChainId,
929 ) {
930 let labels: &[KeyValue; 4] = &[
931 KeyValue::new("chain", chain_id.to_string()),
932 KeyValue::new("counterparty", counterparty_chain_id.to_string()),
933 KeyValue::new("channel", channel_id.to_string()),
934 KeyValue::new("port", port_id.to_string()),
935 ];
936
937 self.cleared_send_packet_events.add(1, labels);
938 }
939
940 pub fn cleared_acknowledgment_events(
941 &self,
942 _seq_nr: u64,
943 _height: u64,
944 chain_id: &ChainId,
945 channel_id: &ChannelId,
946 port_id: &PortId,
947 counterparty_chain_id: &ChainId,
948 ) {
949 let labels: &[KeyValue; 4] = &[
950 KeyValue::new("chain", chain_id.to_string()),
951 KeyValue::new("counterparty", counterparty_chain_id.to_string()),
952 KeyValue::new("channel", channel_id.to_string()),
953 KeyValue::new("port", port_id.to_string()),
954 ];
955
956 self.cleared_acknowledgment_events.add(1, labels);
957 }
958
959 pub fn backlog_insert(
962 &self,
963 seq_nr: u64,
964 chain_id: &ChainId,
965 channel_id: &ChannelId,
966 port_id: &PortId,
967 counterparty_chain_id: &ChainId,
968 ) {
969 let path_uid: PathIdentifier = PathIdentifier::new(
971 chain_id.to_string(),
972 channel_id.to_string(),
973 port_id.to_string(),
974 );
975
976 let labels = &[
977 KeyValue::new("chain", chain_id.to_string()),
978 KeyValue::new("counterparty", counterparty_chain_id.to_string()),
979 KeyValue::new("channel", channel_id.to_string()),
980 KeyValue::new("port", port_id.to_string()),
981 ];
982
983 let now = Time::now();
985 let timestamp = match now.duration_since(Time::unix_epoch()) {
986 Ok(ts) => ts.as_secs(),
987 Err(_) => 0,
988 };
989
990 let (oldest_sn, total) = if let Some(path_backlog) = self.backlogs.get(&path_uid) {
992 if path_backlog.len() > BACKLOG_RESET_THRESHOLD {
995 if let Some(min) = path_backlog.iter().map(|v| *v.key()).min() {
996 path_backlog.remove(&min);
997 }
998 }
999 path_backlog.insert(seq_nr, timestamp);
1000
1001 if let Some(min) = path_backlog.iter().map(|v| *v.key()).min() {
1003 (min, path_backlog.len() as u64)
1004 } else {
1005 (EMPTY_BACKLOG_SYMBOL, EMPTY_BACKLOG_SYMBOL)
1008 }
1009 } else {
1010 let new_path_backlog = DashMap::with_capacity(BACKLOG_CAPACITY);
1012 new_path_backlog.insert(seq_nr, timestamp);
1013 self.backlogs.insert(path_uid, new_path_backlog);
1015
1016 (seq_nr, 1)
1018 };
1019
1020 self.backlog_oldest_sequence.observe(oldest_sn, labels);
1022 self.backlog_latest_update_timestamp
1023 .observe(timestamp, labels);
1024 self.backlog_size.observe(total, labels);
1025 }
1026
1027 pub fn update_backlog(
1030 &self,
1031 sequences: Vec<u64>,
1032 chain_id: &ChainId,
1033 channel_id: &ChannelId,
1034 port_id: &PortId,
1035 counterparty_chain_id: &ChainId,
1036 ) {
1037 let path_uid: PathIdentifier = PathIdentifier::new(
1039 chain_id.to_string(),
1040 channel_id.to_string(),
1041 port_id.to_string(),
1042 );
1043
1044 if sequences.is_empty() {
1049 if let Some(path_backlog) = self.backlogs.get(&path_uid) {
1050 let current_keys: Vec<u64> = path_backlog
1051 .value()
1052 .iter()
1053 .map(|entry| *entry.key())
1054 .collect();
1055
1056 for key in current_keys.iter() {
1057 self.backlog_remove(*key, chain_id, channel_id, port_id, counterparty_chain_id)
1058 }
1059 }
1060 } else {
1061 self.backlogs.remove(&path_uid);
1062 for key in sequences.iter() {
1063 self.backlog_insert(*key, chain_id, channel_id, port_id, counterparty_chain_id)
1064 }
1065 }
1066 }
1067
1068 pub fn backlog_remove(
1073 &self,
1074 seq_nr: u64,
1075 chain_id: &ChainId,
1076 channel_id: &ChannelId,
1077 port_id: &PortId,
1078 counterparty_chain_id: &ChainId,
1079 ) {
1080 let path_uid: PathIdentifier = PathIdentifier::new(
1082 chain_id.to_string(),
1083 channel_id.to_string(),
1084 port_id.to_string(),
1085 );
1086
1087 let labels = &[
1088 KeyValue::new("chain", chain_id.to_string()),
1089 KeyValue::new("counterparty", counterparty_chain_id.to_string()),
1090 KeyValue::new("channel", channel_id.to_string()),
1091 KeyValue::new("port", port_id.to_string()),
1092 ];
1093
1094 let now = Time::now();
1096 let timestamp = match now.duration_since(Time::unix_epoch()) {
1097 Ok(ts) => ts.as_secs(),
1098 Err(_) => 0,
1099 };
1100
1101 if let Some(path_backlog) = self.backlogs.get(&path_uid) {
1102 if path_backlog.remove(&seq_nr).is_some() {
1103 self.backlog_latest_update_timestamp
1105 .observe(timestamp, labels);
1106 if let Some(min_key) = path_backlog.iter().map(|v| *v.key()).min() {
1108 self.backlog_oldest_sequence.observe(min_key, labels);
1109 self.backlog_size.observe(path_backlog.len() as u64, labels);
1110 } else {
1111 self.backlog_oldest_sequence
1113 .observe(EMPTY_BACKLOG_SYMBOL, labels);
1114 self.backlog_size.observe(EMPTY_BACKLOG_SYMBOL, labels);
1115 }
1116 }
1117 }
1118 }
1119
1120 pub fn fees_amount(&self, chain_id: &ChainId, receiver: &Signer, fee_amounts: Coin<String>) {
1123 if !self.visible_fee_addresses.contains(&receiver.to_string()) {
1125 return;
1126 }
1127 let labels = &[
1128 KeyValue::new("chain", chain_id.to_string()),
1129 KeyValue::new("receiver", receiver.to_string()),
1130 KeyValue::new("denom", fee_amounts.denom.to_string()),
1131 ];
1132
1133 let fee_amount = fee_amounts.amount.0.as_u64();
1134
1135 self.fee_amounts.add(fee_amount, labels);
1136
1137 let ephemeral_fee: moka::sync::Cache<String, u64> = moka::sync::Cache::builder()
1138 .time_to_live(FEE_LIFETIME) .time_to_idle(FEE_LIFETIME) .build();
1141
1142 let key = format!("fee_amount:{chain_id}/{receiver}/{}", fee_amounts.denom);
1143 ephemeral_fee.insert(key.clone(), fee_amount);
1144
1145 let mut cached_fees = self.cached_fees.lock().unwrap();
1146 cached_fees.push(ephemeral_fee);
1147
1148 let sum: u64 = cached_fees.iter().filter_map(|e| e.get(&key)).sum();
1149
1150 self.period_fees.observe(sum, labels);
1151 }
1152
1153 pub fn update_period_fees(&self, chain_id: &ChainId, receiver: &String, denom: &String) {
1154 let labels = &[
1155 KeyValue::new("chain", chain_id.to_string()),
1156 KeyValue::new("receiver", receiver.to_string()),
1157 KeyValue::new("denom", denom.to_string()),
1158 ];
1159
1160 let key = format!("fee_amount:{chain_id}/{receiver}/{}", denom);
1161
1162 let cached_fees = self.cached_fees.lock().unwrap();
1163
1164 let sum: u64 = cached_fees.iter().filter_map(|e| e.get(&key)).sum();
1165
1166 self.period_fees.observe(sum, labels);
1167 }
1168
1169 pub fn add_visible_fee_address(&self, address: String) {
1172 self.visible_fee_addresses.insert(address);
1173 }
1174
1175 pub fn broadcast_errors(&self, address: &String, error_code: u32, error_description: &str) {
1178 let broadcast_error = BroadcastError::new(error_code, error_description);
1179
1180 let labels = &[
1181 KeyValue::new("account", address.to_string()),
1182 KeyValue::new("error_code", broadcast_error.code.to_string()),
1183 KeyValue::new("error_description", broadcast_error.description),
1184 ];
1185
1186 self.broadcast_errors.add(1, labels);
1187 }
1188
1189 pub fn simulate_errors(&self, address: &String, recoverable: bool, error_description: String) {
1192 let labels = &[
1193 KeyValue::new("account", address.to_string()),
1194 KeyValue::new("recoverable", recoverable.to_string()),
1195 KeyValue::new("error_description", error_description.to_owned()),
1196 ];
1197
1198 self.simulate_errors.add(1, labels);
1199 }
1200
1201 pub fn dynamic_gas_queried_fees(&self, chain_id: &ChainId, amount: f64) {
1202 let labels = &[KeyValue::new("identifier", chain_id.to_string())];
1203
1204 self.dynamic_gas_queried_fees.record(amount, labels);
1205 }
1206
1207 pub fn dynamic_gas_paid_fees(&self, chain_id: &ChainId, amount: f64) {
1208 let labels = &[KeyValue::new("identifier", chain_id.to_string())];
1209
1210 self.dynamic_gas_paid_fees.record(amount, labels);
1211 }
1212
1213 pub fn dynamic_gas_queried_success_fees(&self, chain_id: &ChainId, amount: f64) {
1214 let labels = &[KeyValue::new("identifier", chain_id.to_string())];
1215
1216 self.dynamic_gas_queried_success_fees.record(amount, labels);
1217 }
1218
1219 #[allow(clippy::too_many_arguments)]
1221 pub fn filtered_packets(
1222 &self,
1223 src_chain: &ChainId,
1224 dst_chain: &ChainId,
1225 src_channel: &ChannelId,
1226 dst_channel: &ChannelId,
1227 src_port: &PortId,
1228 dst_port: &PortId,
1229 count: u64,
1230 ) {
1231 if count > 0 {
1232 let labels = &[
1233 KeyValue::new("src_chain", src_chain.to_string()),
1234 KeyValue::new("dst_chain", dst_chain.to_string()),
1235 KeyValue::new("src_channel", src_channel.to_string()),
1236 KeyValue::new("dst_channel", dst_channel.to_string()),
1237 KeyValue::new("src_port", src_port.to_string()),
1238 KeyValue::new("dst_port", dst_port.to_string()),
1239 ];
1240
1241 self.filtered_packets.add(count, labels);
1242 }
1243 }
1244
1245 pub fn cross_chain_queries(&self, src_chain: &ChainId, dst_chain: &ChainId, count: usize) {
1246 if count > 0 {
1247 let labels = &[
1248 KeyValue::new("src_chain", src_chain.to_string()),
1249 KeyValue::new("dst_chain", dst_chain.to_string()),
1250 ];
1251
1252 self.cross_chain_queries.add(count as u64, labels);
1253 }
1254 }
1255
1256 pub fn cross_chain_query_responses(
1257 &self,
1258 src_chain: &ChainId,
1259 dst_chain: &ChainId,
1260 ccq_responses_codes: Vec<tendermint::abci::Code>,
1261 ) {
1262 let labels = &[
1263 KeyValue::new("src_chain", src_chain.to_string()),
1264 KeyValue::new("dst_chain", dst_chain.to_string()),
1265 ];
1266
1267 for code in ccq_responses_codes.iter() {
1268 if code.is_ok() {
1269 self.cross_chain_query_responses.add(1, labels);
1270 } else {
1271 self.cross_chain_query_error_responses.add(1, labels);
1272 }
1273 }
1274 }
1275}
1276
/// Builds evenly spaced histogram bucket boundaries.
///
/// Returns `buckets + 1` boundaries, starting at `start` and spaced by the
/// integer step `(end - start) / buckets`. Because the step is truncated, the
/// last boundary falls short of `end` when the range is not evenly divisible
/// by `buckets`.
///
/// Degenerate inputs do not panic:
/// - `buckets == 0` yields the single boundary `start` (the step would
///   otherwise be a division by zero);
/// - `end < start` is treated like an empty range, so every boundary equals
///   `start` (the subtraction would otherwise underflow).
fn build_histogram_buckets(start: u64, end: u64, buckets: u64) -> Vec<f64> {
    if buckets == 0 {
        return vec![start as f64];
    }

    // `i * step <= end - start`, so `start + i * step` never exceeds `end`
    // and cannot overflow.
    let step = end.saturating_sub(start) / buckets;

    (0..=buckets).map(|i| (start + i * step) as f64).collect()
}
1283
#[cfg(test)]
mod tests {
    use prometheus::proto::{Metric, MetricFamily};

    use super::*;

    /// Namespace handed to every `TelemetryState` under test. The assertions
    /// look metrics up by their `hermes_`-prefixed names, so all tests use
    /// the same namespace value.
    const NAMESPACE: &str = "hermes";

    /// Creates the telemetry state shared by all tests: the range `0..5000`
    /// and a bucket count of 5 for both range/bucket argument pairs.
    fn new_state() -> TelemetryState {
        TelemetryState::new(0..5000, 5, 0..5000, 5, NAMESPACE)
    }

    /// Returns `(chain, counterparty chain, channel, port)` identifiers used
    /// as the backlog path in every test.
    fn test_ids() -> (ChainId, ChainId, ChannelId, PortId) {
        (
            ChainId::from_string("chain-test"),
            ChainId::from_string("counterpartychain-test"),
            ChannelId::new(0),
            PortId::transfer(),
        )
    }

    /// Returns `true` if any metric's gauge value, truncated to `u64`, equals
    /// `expected`.
    fn has_gauge_value(metrics: &[Metric], expected: u64) -> bool {
        metrics
            .iter()
            .any(|m| m.get_gauge().get_value() as u64 == expected)
    }

    /// Asserts that the metric family called `name` exists in `families` and
    /// that one of its gauges holds `expected`.
    fn assert_gauge(families: &[MetricFamily], name: &str, expected: u64) {
        let family = families
            .iter()
            .find(|family| family.get_name() == name)
            .unwrap_or_else(|| panic!("metric `{name}` not found in the registry"));

        assert!(
            has_gauge_value(family.get_metric(), expected),
            "expected {name} to be {expected}"
        );
    }

    #[test]
    fn insert_remove_backlog() {
        let state = new_state();
        let (chain_id, counterparty_chain_id, channel_id, port_id) = test_ids();

        for seq_nr in 1..=5 {
            state.backlog_insert(
                seq_nr,
                &chain_id,
                &channel_id,
                &port_id,
                &counterparty_chain_id,
            );
        }
        state.backlog_remove(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
        state.backlog_remove(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id);

        // Sequences 2, 4 and 5 remain, so the oldest one is 2.
        let metrics = state.registry.gather();
        assert_gauge(&metrics, "hermes_backlog_size", 3);
        assert_gauge(&metrics, "hermes_backlog_oldest_sequence", 2);
    }

    #[test]
    fn update_backlog() {
        let state = new_state();
        let (chain_id, counterparty_chain_id, channel_id, port_id) = test_ids();

        for seq_nr in 1..=5 {
            state.backlog_insert(
                seq_nr,
                &chain_id,
                &channel_id,
                &port_id,
                &counterparty_chain_id,
            );
        }

        state.update_backlog(
            vec![5],
            &chain_id,
            &channel_id,
            &port_id,
            &counterparty_chain_id,
        );

        // Only sequence 5 is expected to be left after the update.
        let metrics = state.registry.gather();
        assert_gauge(&metrics, "hermes_backlog_size", 1);
        assert_gauge(&metrics, "hermes_backlog_oldest_sequence", 5);
    }

    #[test]
    fn update_backlog_empty() {
        let state = new_state();
        let (chain_id, counterparty_chain_id, channel_id, port_id) = test_ids();

        for seq_nr in 1..=5 {
            state.backlog_insert(
                seq_nr,
                &chain_id,
                &channel_id,
                &port_id,
                &counterparty_chain_id,
            );
        }

        state.update_backlog(
            vec![],
            &chain_id,
            &channel_id,
            &port_id,
            &counterparty_chain_id,
        );

        // An empty update is expected to leave both gauges at 0.
        let metrics = state.registry.gather();
        assert_gauge(&metrics, "hermes_backlog_size", 0);
        assert_gauge(&metrics, "hermes_backlog_oldest_sequence", 0);
    }
}