use std::collections::HashMap;
use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::{Family, MetricConstructor};
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{linear_buckets, Histogram};
use prometheus_client::registry::Registry;
use crate::topic::TopicHash;
use crate::types::{MessageAcceptance, PeerKind};
const DEFAULT_MAX_TOPICS: usize = 300;
const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 50;
#[derive(Debug, Clone)]
pub struct Config {
pub max_topics: usize,
pub max_never_subscribed_topics: usize,
pub score_buckets: Vec<f64>,
}
impl Config {
pub fn buckets_using_scoring_thresholds(&mut self, params: &crate::PeerScoreThresholds) {
self.score_buckets = vec![
params.graylist_threshold,
params.publish_threshold,
params.gossip_threshold,
params.gossip_threshold / 2.0,
params.gossip_threshold / 4.0,
0.0,
1.0,
10.0,
100.0,
];
}
}
impl Default for Config {
fn default() -> Self {
let gossip_threshold = -4000.0;
let publish_threshold = -8000.0;
let graylist_threshold = -16000.0;
let score_buckets: Vec<f64> = vec![
graylist_threshold,
publish_threshold,
gossip_threshold,
gossip_threshold / 2.0,
gossip_threshold / 4.0,
0.0,
1.0,
10.0,
100.0,
];
Config {
max_topics: DEFAULT_MAX_TOPICS,
max_never_subscribed_topics: DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS,
score_buckets,
}
}
}
type EverSubscribed = bool;
pub struct Metrics {
max_topics: usize,
max_never_subscribed_topics: usize,
topic_info: HashMap<TopicHash, EverSubscribed>,
topic_subscription_status: Family<TopicHash, Gauge>,
topic_peers_count: Family<TopicHash, Gauge>,
invalid_messages: Family<TopicHash, Counter>,
accepted_messages: Family<TopicHash, Counter>,
ignored_messages: Family<TopicHash, Counter>,
rejected_messages: Family<TopicHash, Counter>,
mesh_peer_counts: Family<TopicHash, Gauge>,
mesh_peer_inclusion_events: Family<InclusionLabel, Counter>,
mesh_peer_churn_events: Family<ChurnLabel, Counter>,
topic_msg_sent_counts: Family<TopicHash, Counter>,
topic_msg_sent_bytes: Family<TopicHash, Counter>,
topic_msg_published: Family<TopicHash, Counter>,
topic_msg_recv_counts_unfiltered: Family<TopicHash, Counter>,
topic_msg_recv_counts: Family<TopicHash, Counter>,
topic_msg_recv_bytes: Family<TopicHash, Counter>,
score_per_mesh: Family<TopicHash, Histogram, HistBuilder>,
scoring_penalties: Family<PenaltyLabel, Counter>,
peers_per_protocol: Family<ProtocolLabel, Gauge>,
heartbeat_duration: Histogram,
memcache_misses: Counter,
topic_iwant_msgs: Family<TopicHash, Counter>,
}
impl Metrics {
pub fn new(registry: &mut Registry, config: Config) -> Self {
let Config {
max_topics,
max_never_subscribed_topics,
score_buckets,
} = config;
macro_rules! register_family {
($name:expr, $help:expr) => {{
let fam = Family::default();
registry.register($name, $help, Box::new(fam.clone()));
fam
}};
}
let topic_subscription_status = register_family!(
"topic_subscription_status",
"Subscription status per known topic"
);
let topic_peers_count = register_family!(
"topic_peers_counts",
"Number of peers subscribed to each topic"
);
let invalid_messages = register_family!(
"invalid_messages_per_topic",
"Number of invalid messages received for each topic"
);
let accepted_messages = register_family!(
"accepted_messages_per_topic",
"Number of accepted messages received for each topic"
);
let ignored_messages = register_family!(
"ignored_messages_per_topic",
"Number of ignored messages received for each topic"
);
let rejected_messages = register_family!(
"rejected_messages_per_topic",
"Number of rejected messages received for each topic"
);
let mesh_peer_counts = register_family!(
"mesh_peer_counts",
"Number of peers in each topic in our mesh"
);
let mesh_peer_inclusion_events = register_family!(
"mesh_peer_inclusion_events",
"Number of times a peer gets added to our mesh for different reasons"
);
let mesh_peer_churn_events = register_family!(
"mesh_peer_churn_events",
"Number of times a peer gets removed from our mesh for different reasons"
);
let topic_msg_sent_counts = register_family!(
"topic_msg_sent_counts",
"Number of gossip messages sent to each topic"
);
let topic_msg_published = register_family!(
"topic_msg_published",
"Number of gossip messages published to each topic"
);
let topic_msg_sent_bytes = register_family!(
"topic_msg_sent_bytes",
"Bytes from gossip messages sent to each topic"
);
let topic_msg_recv_counts_unfiltered = register_family!(
"topic_msg_recv_counts_unfiltered",
"Number of gossip messages received on each topic (without duplicates being filtered)"
);
let topic_msg_recv_counts = register_family!(
"topic_msg_recv_counts",
"Number of gossip messages received on each topic (after duplicates have been filtered)"
);
let topic_msg_recv_bytes = register_family!(
"topic_msg_recv_bytes",
"Bytes received from gossip messages for each topic"
);
let hist_builder = HistBuilder {
buckets: score_buckets,
};
let score_per_mesh: Family<_, _, HistBuilder> = Family::new_with_constructor(hist_builder);
registry.register(
"score_per_mesh",
"Histogram of scores per mesh topic",
Box::new(score_per_mesh.clone()),
);
let scoring_penalties = register_family!(
"scoring_penalties",
"Counter of types of scoring penalties given to peers"
);
let peers_per_protocol = register_family!(
"peers_per_protocol",
"Number of connected peers by protocol type"
);
let heartbeat_duration = Histogram::new(linear_buckets(0.0, 50.0, 10));
registry.register(
"heartbeat_duration",
"Histogram of observed heartbeat durations",
Box::new(heartbeat_duration.clone()),
);
let topic_iwant_msgs = register_family!(
"topic_iwant_msgs",
"Number of times we have decided an IWANT is required for this topic"
);
let memcache_misses = {
let metric = Counter::default();
registry.register(
"memcache_misses",
"Number of times a message is not found in the duplicate cache when validating",
Box::new(metric.clone()),
);
metric
};
Self {
max_topics,
max_never_subscribed_topics,
topic_info: HashMap::default(),
topic_subscription_status,
topic_peers_count,
invalid_messages,
accepted_messages,
ignored_messages,
rejected_messages,
mesh_peer_counts,
mesh_peer_inclusion_events,
mesh_peer_churn_events,
topic_msg_sent_counts,
topic_msg_sent_bytes,
topic_msg_published,
topic_msg_recv_counts_unfiltered,
topic_msg_recv_counts,
topic_msg_recv_bytes,
score_per_mesh,
scoring_penalties,
peers_per_protocol,
heartbeat_duration,
memcache_misses,
topic_iwant_msgs,
}
}
fn non_subscription_topics_count(&self) -> usize {
self.topic_info
.values()
.filter(|&ever_subscribed| !ever_subscribed)
.count()
}
fn register_topic(&mut self, topic: &TopicHash) -> Result<(), ()> {
if self.topic_info.contains_key(topic) {
Ok(())
} else if self.topic_info.len() < self.max_topics
&& self.non_subscription_topics_count() < self.max_never_subscribed_topics
{
self.topic_info.entry(topic.clone()).or_insert(false);
self.topic_subscription_status.get_or_create(topic).set(0);
Ok(())
} else {
Err(())
}
}
pub fn set_topic_peers(&mut self, topic: &TopicHash, count: usize) {
if self.register_topic(topic).is_ok() {
self.topic_peers_count
.get_or_create(topic)
.set(count as u64);
}
}
pub fn joined(&mut self, topic: &TopicHash) {
if self.topic_info.contains_key(topic) || self.topic_info.len() < self.max_topics {
self.topic_info.insert(topic.clone(), true);
let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(1);
debug_assert_eq!(was_subscribed, 0);
self.mesh_peer_counts.get_or_create(topic).set(0);
}
}
pub fn left(&mut self, topic: &TopicHash) {
if self.topic_info.contains_key(topic) {
let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(0);
debug_assert_eq!(was_subscribed, 1);
self.mesh_peer_counts.get_or_create(topic).set(0);
}
}
pub fn peers_included(&mut self, topic: &TopicHash, reason: Inclusion, count: usize) {
if self.register_topic(topic).is_ok() {
self.mesh_peer_inclusion_events
.get_or_create(&InclusionLabel {
hash: topic.to_string(),
reason,
})
.inc_by(count as u64);
}
}
pub fn peers_removed(&mut self, topic: &TopicHash, reason: Churn, count: usize) {
if self.register_topic(topic).is_ok() {
self.mesh_peer_churn_events
.get_or_create(&ChurnLabel {
hash: topic.to_string(),
reason,
})
.inc_by(count as u64);
}
}
pub fn set_mesh_peers(&mut self, topic: &TopicHash, count: usize) {
if self.register_topic(topic).is_ok() {
self.mesh_peer_counts.get_or_create(topic).set(count as u64);
}
}
pub fn register_invalid_message(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.invalid_messages.get_or_create(topic).inc();
}
}
pub fn register_score_penalty(&mut self, penalty: Penalty) {
self.scoring_penalties
.get_or_create(&PenaltyLabel { penalty })
.inc();
}
pub fn register_published_message(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_msg_published.get_or_create(topic).inc();
}
}
pub fn msg_sent(&mut self, topic: &TopicHash, bytes: usize) {
if self.register_topic(topic).is_ok() {
self.topic_msg_sent_counts.get_or_create(topic).inc();
self.topic_msg_sent_bytes
.get_or_create(topic)
.inc_by(bytes as u64);
}
}
pub fn msg_recvd(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_msg_recv_counts.get_or_create(topic).inc();
}
}
pub fn msg_recvd_unfiltered(&mut self, topic: &TopicHash, bytes: usize) {
if self.register_topic(topic).is_ok() {
self.topic_msg_recv_counts_unfiltered
.get_or_create(topic)
.inc();
self.topic_msg_recv_bytes
.get_or_create(topic)
.inc_by(bytes as u64);
}
}
pub fn register_msg_validation(&mut self, topic: &TopicHash, validation: &MessageAcceptance) {
if self.register_topic(topic).is_ok() {
match validation {
MessageAcceptance::Accept => self.accepted_messages.get_or_create(topic).inc(),
MessageAcceptance::Ignore => self.ignored_messages.get_or_create(topic).inc(),
MessageAcceptance::Reject => self.rejected_messages.get_or_create(topic).inc(),
};
}
}
pub fn memcache_miss(&mut self) {
self.memcache_misses.inc();
}
pub fn register_iwant(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_iwant_msgs.get_or_create(topic).inc();
}
}
pub fn observe_heartbeat_duration(&mut self, millis: u64) {
self.heartbeat_duration.observe(millis as f64);
}
pub fn observe_mesh_peers_score(&mut self, topic: &TopicHash, score: f64) {
if self.register_topic(topic).is_ok() {
self.score_per_mesh.get_or_create(topic).observe(score);
}
}
pub fn peer_protocol_connected(&mut self, kind: PeerKind) {
self.peers_per_protocol
.get_or_create(&ProtocolLabel { protocol: kind })
.inc();
}
pub fn peer_protocol_disconnected(&mut self, kind: PeerKind) {
let metric = self
.peers_per_protocol
.get_or_create(&ProtocolLabel { protocol: kind });
if metric.get() != 0 {
metric.set(metric.get() - 1);
}
}
}
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
pub enum Inclusion {
Fanout,
Random,
Subscribed,
Outbound,
}
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
pub enum Churn {
Dc,
BadScore,
Prune,
Unsub,
Excess,
}
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
pub enum Penalty {
GraftBackoff,
BrokenPromise,
MessageDeficit,
IPColocation,
}
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
struct InclusionLabel {
hash: String,
reason: Inclusion,
}
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
struct ChurnLabel {
hash: String,
reason: Churn,
}
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
struct ProtocolLabel {
protocol: PeerKind,
}
#[derive(PartialEq, Eq, Hash, Encode, Clone)]
struct PenaltyLabel {
penalty: Penalty,
}
#[derive(Clone)]
struct HistBuilder {
buckets: Vec<f64>,
}
impl MetricConstructor<Histogram> for HistBuilder {
fn new_metric(&self) -> Histogram {
Histogram::new(self.buckets.clone().into_iter())
}
}