use snarkvm::{
ledger::{
committee::Committee,
narwhal::{BatchCertificate, BatchHeader, Subdag},
},
prelude::{Address, Field, Network, cfg_chunks, cfg_iter},
};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{collections::BTreeMap, sync::Arc};
/// The scores stored per validator, as `(certificate_score, signature_score, combined_score)`.
type ParticipationScores = (f64, f64, f64);
/// Tracker for validator participation, built from the batch certificates it is given.
///
/// Every field sits behind an `Arc<RwLock<..>>`, so a clone refers to the same underlying data.
#[derive(Clone, Debug)]
pub struct Telemetry<N: Network> {
/// The certificates seen so far: a mapping of `round` to the set of certificate IDs.
tracked_certificates: Arc<RwLock<BTreeMap<u64, IndexSet<Field<N>>>>>,
/// A mapping of `address` to a mapping of `round` to the number of tracked certificates
/// in that round which the validator either authored or signed.
validator_signatures: Arc<RwLock<IndexMap<Address<N>, IndexMap<u64, u32>>>>,
/// A mapping of `address` to the set of rounds in which the validator authored a tracked certificate.
validator_certificates: Arc<RwLock<IndexMap<Address<N>, IndexSet<u64>>>>,
/// A mapping of `address` to the scores computed by the latest participation score update.
participation_scores: Arc<RwLock<IndexMap<Address<N>, ParticipationScores>>>,
}
impl<N: Network> Default for Telemetry<N> {
fn default() -> Self {
Self::new()
}
}
impl<N: Network> Telemetry<N> {
pub fn new() -> Self {
Self {
tracked_certificates: Default::default(),
validator_signatures: Default::default(),
validator_certificates: Default::default(),
participation_scores: Default::default(),
}
}
pub fn get_participation_scores(&self, committee: &Committee<N>) -> IndexMap<Address<N>, f64> {
let participation_scores = self.participation_scores.read();
committee
.members()
.iter()
.map(|(address, _)| {
let score =
participation_scores.get(address).map(|(_, _, combined_score)| *combined_score).unwrap_or(0.0);
(*address, score)
})
.collect()
}
pub fn insert_subdag(&self, subdag: &Subdag<N>) {
let next_gc_round = subdag.anchor_round().saturating_sub(BatchHeader::<N>::MAX_GC_ROUNDS as u64);
self.garbage_collect_certificates(next_gc_round);
cfg_iter!(subdag).for_each(|(_round, certificates)| {
cfg_iter!(certificates).for_each(|certificate| {
self.insert_certificate(certificate);
})
});
self.update_participation_scores();
}
pub fn insert_certificate(&self, certificate: &BatchCertificate<N>) {
let mut tracked_certificates = self.tracked_certificates.write();
let certificate_round = certificate.round();
let certificate_author = certificate.author();
let certificate_id = certificate.id();
if tracked_certificates.get(&certificate_round).is_some_and(|certs| certs.contains(&certificate_id)) {
return;
}
tracked_certificates.entry(certificate_round).or_default().insert(certificate_id);
let mut validator_signatures = self.validator_signatures.write();
for address in
[certificate_author].into_iter().chain(certificate.signatures().map(|signature| signature.to_address()))
{
validator_signatures
.entry(address)
.or_default()
.entry(certificate_round)
.and_modify(|count| *count += 1)
.or_insert(1);
}
let mut validator_certificates = self.validator_certificates.write();
validator_certificates.entry(certificate_author).or_default().insert(certificate_round);
}
pub fn update_participation_scores(&self) {
fn weighted_score(certificate_score: f64, signature_score: f64) -> f64 {
let score = (0.9 * certificate_score) + (0.1 * signature_score);
(score * 100.0).round() / 100.0
}
let tracked_certificates = self.tracked_certificates.read();
let validator_signatures = self.validator_signatures.read();
let validator_certificates = self.validator_certificates.read();
let total_certificates = validator_certificates.values().map(|rounds| rounds.len()).sum::<usize>();
let signature_participation_scores: IndexMap<_, _> = cfg_iter!(&*validator_signatures)
.map(|(address, signatures)| {
let total_signatures = signatures.values().sum::<u32>() as f64;
let score = total_signatures / total_certificates as f64 * 100.0;
(*address, score as u16)
})
.collect();
let tracked_rounds: Vec<_> = tracked_certificates.keys().skip_while(|r| *r % 2 == 0).copied().collect();
let certificate_participation_scores: IndexMap<_, _> = cfg_iter!(&*validator_certificates)
.map(|(address, certificate_rounds)| {
let num_included_round_pairs = cfg_chunks!(&tracked_rounds, 2)
.filter(|chunk| chunk.iter().any(|r| certificate_rounds.contains(r)))
.count();
let num_round_pairs = (tracked_rounds.len().saturating_add(1)).saturating_div(2);
let score = num_included_round_pairs as f64 / num_round_pairs.max(1) as f64 * 100.0;
(*address, score as u16)
})
.collect();
let validator_addresses: IndexSet<_> =
signature_participation_scores.keys().chain(certificate_participation_scores.keys()).copied().collect();
let mut new_participation_scores = IndexMap::new();
for address in validator_addresses {
let signature_score = *signature_participation_scores.get(&address).unwrap_or(&0) as f64;
let certificate_score = *certificate_participation_scores.get(&address).unwrap_or(&0) as f64;
let combined_score = weighted_score(certificate_score, signature_score);
new_participation_scores.insert(address, (certificate_score, signature_score, combined_score));
}
*self.participation_scores.write() = new_participation_scores;
}
pub fn garbage_collect_certificates(&self, gc_round: u64) {
let mut tracked_certificates = self.tracked_certificates.write();
let mut validator_signatures = self.validator_signatures.write();
let mut validator_certificates = self.validator_certificates.write();
tracked_certificates.retain(|&round, _| round > gc_round);
validator_signatures.retain(|_, rounds| {
rounds.retain(|&round, _| round > gc_round);
!rounds.is_empty()
});
validator_certificates.retain(|_, rounds| {
rounds.retain(|&round| round > gc_round);
!rounds.is_empty()
});
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::{
        ledger::{
            committee::test_helpers::sample_committee_for_round_and_members,
            narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round,
        },
        prelude::MainnetV0,
        utilities::TestRng,
    };

    use rand::Rng;

    type CurrentNetwork = MainnetV0;

    /// Inserting distinct certificates of one round tracks each of them under that round.
    #[test]
    fn test_insert_certificates() {
        let rng = &mut TestRng::default();

        // Initialize the telemetry.
        let telemetry = Telemetry::<CurrentNetwork>::new();

        // Sample the certificates.
        let current_round = 2;
        let mut certificates = IndexSet::new();
        for _ in 0..10 {
            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        }

        // Nothing is tracked before the first insertion.
        assert!(telemetry.tracked_certificates.read().is_empty());

        // Insert the certificates.
        for certificate in &certificates {
            telemetry.insert_certificate(certificate);
        }

        // Every certificate is tracked under its round.
        assert_eq!(telemetry.tracked_certificates.read().get(&current_round).unwrap().len(), certificates.len());
    }

    /// Scores are zero until they are explicitly updated, and positive afterwards for
    /// validators that authored a tracked certificate.
    #[test]
    fn test_participation_scores() {
        let rng = &mut TestRng::default();

        // Initialize the telemetry.
        let telemetry = Telemetry::<CurrentNetwork>::new();

        // Sample the certificates.
        let current_round = 2;
        let mut certificates = IndexSet::new();
        certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        certificates.insert(sample_batch_certificate_for_round(current_round, rng));

        // Build a committee out of the certificate authors.
        let committee = sample_committee_for_round_and_members(
            current_round,
            vec![
                certificates[0].author(),
                certificates[1].author(),
                certificates[2].author(),
                certificates[3].author(),
            ],
            rng,
        );

        // Nothing is tracked before the first insertion.
        assert!(telemetry.tracked_certificates.read().is_empty());

        // Insert the certificates.
        for certificate in &certificates {
            telemetry.insert_certificate(certificate);
        }

        // Inserting alone does not update the scores, so every member still reports zero.
        let participation_scores = telemetry.get_participation_scores(&committee);
        assert_eq!(participation_scores.len(), committee.members().len());
        for (address, _) in committee.members() {
            assert_eq!(*participation_scores.get(address).unwrap(), 0.0);
        }

        // Calculate the participation scores.
        telemetry.update_participation_scores();

        // Every member authored a certificate, so every score is now positive.
        let participation_scores = telemetry.get_participation_scores(&committee);
        for (address, _) in committee.members() {
            assert!(*participation_scores.get(address).unwrap() > 0.0);
        }
        println!("{participation_scores:?}");
    }

    /// Garbage collecting at a round removes that round and keeps the later ones.
    #[test]
    fn test_garbage_collection() {
        let rng = &mut TestRng::default();

        // Initialize the telemetry.
        let telemetry = Telemetry::<CurrentNetwork>::new();

        // Sample certificates for two consecutive rounds.
        let current_round = 2;
        let next_round = current_round + 1;
        let mut certificates = IndexSet::new();
        let num_initial_certificates = rng.gen_range(1..10);
        for _ in 0..num_initial_certificates {
            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        }
        let num_new_certificates = rng.gen_range(1..10);
        for _ in 0..num_new_certificates {
            certificates.insert(sample_batch_certificate_for_round(next_round, rng));
        }

        // Insert the certificates.
        for certificate in &certificates {
            telemetry.insert_certificate(certificate);
        }

        // Both rounds are tracked with the expected number of certificates.
        assert_eq!(telemetry.tracked_certificates.read().get(&current_round).unwrap().len(), num_initial_certificates);
        assert_eq!(telemetry.tracked_certificates.read().get(&next_round).unwrap().len(), num_new_certificates);

        // Garbage collect up to and including the current round.
        telemetry.garbage_collect_certificates(current_round);

        // The current round is gone; the next round is untouched.
        assert!(telemetry.tracked_certificates.read().get(&current_round).is_none());
        assert_eq!(telemetry.tracked_certificates.read().get(&next_round).unwrap().len(), num_new_certificates);
    }
}