use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use crate::gossip::sharded_gossip::NodeId;
use crate::gossip::sharded_gossip::RegionDiffs;
use crate::gossip::sharded_gossip::RoundState;
use crate::types::event::*;
use crate::types::*;
use kitsune_p2p_timestamp::Timestamp;
use kitsune_p2p_types::agent_info::AgentInfoSigned;
use num_traits::*;
/// How long persisted historical metric records stay valid: one week,
/// expressed in microseconds (7 days * 24 h * 60 min * 60 s * 1_000_000 us).
const HISTORICAL_RECORD_EXPIRE_DURATION_MICROS: i64 = 7 * 24 * 60 * 60 * 1_000_000;
/// A running average of `f32` samples.
#[derive(Clone, Copy, Debug)]
pub struct RunAvg(
    /// The current average value.
    f32,
    /// How many samples have been folded in (saturating at `u8::MAX`).
    u8,
);
impl Default for RunAvg {
fn default() -> Self {
Self(0.0, 0)
}
}
impl RunAvg {
    /// Fold a single sample `v` into the running average (weight 1).
    pub fn push<V: AsPrimitive<f32>>(&mut self, v: V) {
        self.push_n(v, 1);
    }

    /// Fold `count` copies of the sample `v` into the running average.
    ///
    /// The sample count saturates at `u8::MAX`. Once saturated, each batch
    /// displaces an equal weight of the existing average, giving the new
    /// value `count / 255` of the total weight, so older samples decay
    /// geometrically. A `count` of zero leaves the average untouched.
    pub fn push_n<V: AsPrimitive<f32>>(&mut self, v: V, count: u8) {
        if count == 0 {
            // Nothing to add. Without this guard an empty average would
            // compute 0.0 / 0.0 = NaN and stay NaN forever.
            return;
        }
        self.1 = self.1.saturating_add(count);
        // `self.1 >= count` holds after the saturating add, so this cannot underflow.
        let prior_weight = (self.1 - count) as f32;
        self.0 = (self.0 * prior_weight + v.as_() * count as f32) / self.1 as f32;
    }
}
macro_rules! mk_from {
($($t:ty,)*) => {$(
impl From<$t> for RunAvg {
fn from(o: $t) -> Self {
Self(o as f32, 1)
}
}
)*};
}
mk_from! {
i8,
u8,
i16,
u16,
i32,
u32,
i64,
u64,
f32,
f64,
}
impl std::ops::Deref for RunAvg {
type Target = f32;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl AsRef<f32> for RunAvg {
fn as_ref(&self) -> &f32 {
&self.0
}
}
impl std::borrow::Borrow<f32> for RunAvg {
fn borrow(&self) -> &f32 {
&self.0
}
}
/// Formats only the average value, using `f32`'s own `Display` (so width
/// and precision flags are honoured).
impl std::fmt::Display for RunAvg {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
/// Number of forced initiations armed by `Metrics::record_force_initiate`.
const MAX_TRIGGERS: u8 = 2;
/// Capacity target for each per-agent round-history buffer.
const MAX_HISTORY: usize = 10;
/// Gossip history recorded for a single remote agent.
#[derive(Clone, Debug, Default)]
pub struct PeerAgentHistory {
    /// Running average that is pushed `100` on success and `1` (weight 5) on
    /// failure or error.
    pub reachability_quotient: RunAvg,
    /// Running average of observed latencies, in microseconds.
    pub latency_micros: RunAvg,
    /// Recent rounds this node initiated with the agent.
    pub initiates: VecDeque<RoundMetric>,
    /// Recent rounds the agent initiated and this node accepted.
    pub accepts: VecDeque<RoundMetric>,
    /// Recent successfully completed rounds.
    pub successes: VecDeque<RoundMetric>,
    /// Recent rounds that ended in error.
    pub errors: VecDeque<RoundMetric>,
    /// Set when an initiate/accept is recorded; cleared on success or error.
    pub current_round: bool,
}
/// Round tracking for a single remote node.
#[derive(Clone, Debug, Default)]
pub struct PeerNodeHistory {
    /// Agents listed in the most recently reported round state for this node.
    pub remote_agents: Vec<Arc<KitsuneAgent>>,
    /// The round currently in progress with this node, if any.
    pub current_round: Option<CurrentRound>,
    /// Rounds with this node that have finished (successfully or not).
    pub completed_rounds: VecDeque<CompletedRound>,
}
/// A timestamped gossip round event (initiate, accept, success or error).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RoundMetric {
    /// When the event was recorded.
    pub instant: Instant,
    /// Which gossip module the round belonged to.
    pub gossip_type: GossipModuleType,
}
/// A gossip round with a node that has finished.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CompletedRound {
    /// Round identifier carried over from the in-progress round.
    pub id: String,
    /// Which gossip module ran the round.
    pub gossip_type: GossipModuleType,
    /// When the round was opened.
    pub start_time: Instant,
    /// When the round was marked complete.
    pub end_time: Instant,
    /// Whether the round ended in error.
    pub error: bool,
    /// Region diffs last reported while the round was in progress.
    pub region_diffs: RegionDiffs,
}
impl CompletedRound {
    /// Time elapsed between `start_time` and `end_time`.
    pub fn duration(&self) -> Duration {
        let Self {
            start_time,
            end_time,
            ..
        } = self;
        end_time.duration_since(*start_time)
    }
}
/// A gossip round with a node that is still in progress.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CurrentRound {
    /// Round identifier.
    pub id: String,
    /// Which gossip module is running the round.
    pub gossip_type: GossipModuleType,
    /// When the round was last created or updated.
    pub last_touch: Instant,
    /// When the round was opened.
    pub start_time: Instant,
    /// Region diffs copied from the latest round state update.
    pub region_diffs: RegionDiffs,
}
impl CurrentRound {
pub fn new(id: String, gossip_type: GossipModuleType, start_time: Instant) -> Self {
Self {
id,
gossip_type,
start_time,
last_touch: Instant::now(),
region_diffs: Default::default(),
}
}
pub fn update(&mut self, round_state: &RoundState) {
self.last_touch = Instant::now();
self.region_diffs = round_state.region_diffs.clone();
}
pub fn completed(self, error: bool) -> CompletedRound {
CompletedRound {
id: self.id,
gossip_type: self.gossip_type,
start_time: self.start_time,
end_time: Instant::now(),
error,
region_diffs: self.region_diffs,
}
}
}
impl RoundMetric {
pub fn elapsed(&self) -> Duration {
self.instant.elapsed()
}
}
/// Delegates to the total order defined by [`Ord`].
impl PartialOrd for RoundMetric {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Round metrics order chronologically by `instant` alone.
// NOTE(review): `gossip_type` is ignored here but participates in the derived
// `Eq`, so two metrics may compare `Equal` without being `==`.
impl Ord for RoundMetric {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.instant, &other.instant)
    }
}
/// Delegates to the total order defined by [`Ord`].
impl PartialOrd for CompletedRound {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Completed rounds order by `start_time`, with ties broken by `end_time`.
impl Ord for CompletedRound {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.start_time
            .cmp(&other.start_time)
            .then_with(|| self.end_time.cmp(&other.end_time))
    }
}
/// Delegates to the total order defined by [`Ord`].
impl PartialOrd for CurrentRound {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// In-progress rounds order by `start_time` only.
impl Ord for CurrentRound {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.start_time, &other.start_time)
    }
}
/// Gossip metrics collected about remote agents and nodes.
#[derive(Default, Debug)]
pub struct Metrics {
    /// Per-agent round events and running averages.
    agent_history: HashMap<Arc<KitsuneAgent>, PeerAgentHistory>,
    /// Per-node current and completed rounds.
    node_history: HashMap<NodeId, PeerNodeHistory>,
    /// Running average of recorded extrapolated-coverage samples.
    agg_extrap_cov: RunAvg,
    /// Remaining forced initiations; armed by `record_force_initiate` and
    /// decremented when a success follows a round this node initiated.
    pub(crate) force_initiates: u8,
}
/// How a gossip round ended, with the metric recorded at that moment.
///
/// Note: the derived ordering compares the variant first (`Success` <
/// `Error`) and only then the contained metric, so it is *not* a purely
/// chronological order.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum RoundOutcome {
    /// The round completed successfully.
    Success(RoundMetric),
    /// The round ended with an error.
    Error(RoundMetric),
}
/// Anything from which a remote agent's public key can be borrowed.
pub enum AgentLike<'lt> {
    /// A full signed agent info record.
    Info(&'lt AgentInfoSigned),
    /// A bare agent public key.
    PubKey(&'lt Arc<KitsuneAgent>),
}
impl<'lt> From<&'lt AgentInfoSigned> for AgentLike<'lt> {
fn from(i: &'lt AgentInfoSigned) -> Self {
Self::Info(i)
}
}
impl<'lt> From<&'lt Arc<KitsuneAgent>> for AgentLike<'lt> {
fn from(pk: &'lt Arc<KitsuneAgent>) -> Self {
Self::PubKey(pk)
}
}
impl<'lt> AgentLike<'lt> {
pub fn agent(&self) -> &Arc<KitsuneAgent> {
match self {
Self::Info(i) => &i.agent,
Self::PubKey(pk) => pk,
}
}
}
impl Metrics {
    /// Snapshot the running averages as [`MetricRecord`]s for persistence.
    ///
    /// Produces one `ReachabilityQuotient` and one `LatencyMicros` record per
    /// tracked remote agent, followed by a single agent-less `AggExtrapCov`
    /// record. Every record is stamped with the same current timestamp and
    /// expires `HISTORICAL_RECORD_EXPIRE_DURATION_MICROS` later.
    pub fn dump_historical(&self) -> Vec<MetricRecord> {
        let now = Timestamp::now();
        let expires_at =
            Timestamp::from_micros(now.as_micros() + HISTORICAL_RECORD_EXPIRE_DURATION_MICROS);
        // Two records per agent plus the trailing aggregate record.
        let mut out = Vec::with_capacity(self.agent_history.len() * 2 + 1);
        for (agent, node) in self.agent_history.iter() {
            out.push(MetricRecord {
                kind: MetricRecordKind::ReachabilityQuotient,
                agent: Some(agent.clone()),
                recorded_at_utc: now,
                expires_at_utc: expires_at,
                data: serde_json::json!(*node.reachability_quotient),
            });
            out.push(MetricRecord {
                kind: MetricRecordKind::LatencyMicros,
                agent: Some(agent.clone()),
                recorded_at_utc: now,
                expires_at_utc: expires_at,
                data: serde_json::json!(*node.latency_micros),
            });
        }
        out.push(MetricRecord {
            kind: MetricRecordKind::AggExtrapCov,
            agent: None,
            recorded_at_utc: now,
            expires_at_utc: expires_at,
            data: serde_json::json!(*self.agg_extrap_cov),
        });
        out
    }

    /// JSON summary: `aggExtrapCov` plus an `agents` object keyed by each
    /// agent's `to_string()`, holding its reachability quotient and latency.
    pub fn dump(&self) -> serde_json::Value {
        let agents: serde_json::Value = self
            .agent_history
            .iter()
            .map(|(a, i)| {
                (
                    a.to_string(),
                    serde_json::json!({
                        "reachability_quotient": *i.reachability_quotient,
                        "latency_micros": *i.latency_micros,
                    }),
                )
            })
            .collect::<serde_json::map::Map<String, serde_json::Value>>()
            .into();
        serde_json::json!({
            "aggExtrapCov": *self.agg_extrap_cov,
            "agents": agents,
        })
    }

    /// Fold an extrapolated-coverage sample into the aggregate average.
    pub fn record_extrap_cov_event(&mut self, extrap_cov: f32) {
        self.agg_extrap_cov.push(extrap_cov);
    }

    /// Record a reachability observation for every listed agent.
    ///
    /// Success pushes `100` into the agent's reachability quotient. Failure
    /// pushes `1` with weight 5.
    pub fn record_reachability_event<'a, T, I>(&mut self, success: bool, remote_agent_list: I)
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
    {
        for agent_info in remote_agent_list {
            let info = self.agent_history_entry(agent_info);
            if success {
                info.reachability_quotient.push(100);
            } else {
                info.reachability_quotient.push_n(1, 5);
            }
        }
    }

    /// Fold one latency sample (in microseconds) into every listed agent's average.
    pub fn record_latency_micros<'a, T, I, V>(&mut self, micros: V, remote_agent_list: I)
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
        V: AsPrimitive<f32>,
    {
        for agent_info in remote_agent_list {
            self.agent_history_entry(agent_info)
                .latency_micros
                .push(micros);
        }
    }

    /// Record that this node initiated a round with every listed agent and
    /// mark each as being in a current round.
    pub fn record_initiate<'a, T, I>(&mut self, remote_agent_list: I, gossip_type: GossipModuleType)
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
    {
        for agent_info in remote_agent_list {
            let history = self.agent_history_entry(agent_info);
            let round = RoundMetric {
                instant: Instant::now(),
                gossip_type,
            };
            record_item(&mut history.initiates, round);
            if history.current_round {
                tracing::info!("Recorded initiate with current round already set");
            }
            history.current_round = true;
        }
    }

    /// Record that this node accepted a round from every listed agent and
    /// mark each as being in a current round.
    pub fn record_accept<'a, T, I>(&mut self, remote_agent_list: I, gossip_type: GossipModuleType)
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
    {
        for agent_info in remote_agent_list {
            let history = self.agent_history_entry(agent_info);
            let round = RoundMetric {
                instant: Instant::now(),
                gossip_type,
            };
            record_item(&mut history.accepts, round);
            if history.current_round {
                tracing::info!("Recorded accept with current round already set");
            }
            history.current_round = true;
        }
    }

    /// Record a successful round with every listed agent.
    ///
    /// Pushes `100` into each agent's reachability quotient, appends a success
    /// event and clears its current-round flag. If any agent's latest round
    /// was one this node initiated, `force_initiates` is decremented once
    /// (saturating at 0), however many agents qualified.
    pub fn record_success<'a, T, I>(&mut self, remote_agent_list: I, gossip_type: GossipModuleType)
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
    {
        let mut should_dec_force_initiates = false;
        for agent_info in remote_agent_list {
            let history = self.agent_history_entry(agent_info);
            history.reachability_quotient.push(100);
            let round = RoundMetric {
                instant: Instant::now(),
                gossip_type,
            };
            record_item(&mut history.successes, round);
            history.current_round = false;
            if history.is_initiate_round() {
                should_dec_force_initiates = true;
            }
        }
        if should_dec_force_initiates {
            self.force_initiates = self.force_initiates.saturating_sub(1);
        }
        tracing::debug!(
            "recorded success in metrics. force_initiates={}",
            self.force_initiates
        );
    }

    /// Record a failed round with every listed agent.
    ///
    /// Pushes `1` (weight 5) into each agent's reachability quotient, appends
    /// an error event and clears its current-round flag.
    pub fn record_error<'a, T, I>(&mut self, remote_agent_list: I, gossip_type: GossipModuleType)
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
    {
        for agent_info in remote_agent_list {
            let history = self.agent_history_entry(agent_info);
            history.reachability_quotient.push_n(1, 5);
            let round = RoundMetric {
                instant: Instant::now(),
                gossip_type,
            };
            record_item(&mut history.errors, round);
            history.current_round = false;
        }
        tracing::debug!(
            "recorded error in metrics. force_initiates={}",
            self.force_initiates
        );
    }

    /// Refresh the in-progress round for `peer` from `round_state`, opening
    /// a new round if none is in progress.
    ///
    /// Also replaces the node's `remote_agents` with the agents in `round_state`.
    // NOTE(review): a freshly opened round starts with default region diffs;
    // `round_state.region_diffs` is only copied on subsequent calls. Confirm
    // this is intended.
    pub fn update_current_round(
        &mut self,
        peer: &NodeId,
        gossip_type: GossipModuleType,
        round_state: &RoundState,
    ) {
        let remote_agents = round_state
            .remote_agent_list
            .clone()
            .into_iter()
            .map(|a| a.agent())
            .collect();
        let history = self.node_history.entry(peer.clone()).or_default();
        history.remote_agents = remote_agents;
        if let Some(r) = &mut history.current_round {
            r.update(round_state);
        } else {
            history.current_round = Some(CurrentRound::new(
                round_state.id.clone(),
                gossip_type,
                Instant::now(),
            ));
        }
    }

    /// Close the in-progress round for `node`, if any, and append it to the
    /// node's completed rounds with the given `error` flag.
    ///
    /// The completed-round buffer is capped through `record_item`, like the
    /// per-agent histories, so it cannot grow without bound.
    pub fn complete_current_round(&mut self, node: &NodeId, error: bool) {
        let history = self.node_history.entry(node.clone()).or_default();
        if let Some(r) = history.current_round.take() {
            record_item(&mut history.completed_rounds, r.completed(error));
        }
    }

    /// Arm `MAX_TRIGGERS` forced initiations.
    pub fn record_force_initiate(&mut self) {
        self.force_initiates = MAX_TRIGGERS;
    }

    /// Of the listed agents' most recent successes, return the oldest one.
    ///
    /// Agents that are untracked or have no success are skipped. Returns
    /// `None` if no listed agent has a success.
    pub fn last_success<'a, T, I>(&self, remote_agent_list: I) -> Option<&RoundMetric>
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
    {
        remote_agent_list
            .into_iter()
            .filter_map(|agent_info| self.agent_history.get(agent_info.into().agent()))
            .filter_map(|info| info.successes.back())
            .min_by_key(|r| r.instant)
    }

    /// Whether any listed agent is currently flagged as being in a round.
    pub fn is_current_round<'a, T, I>(&self, remote_agent_list: I) -> bool
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
    {
        remote_agent_list
            .into_iter()
            .filter_map(|agent_info| self.agent_history.get(agent_info.into().agent()))
            .any(|info| info.current_round)
    }

    /// The chronologically latest outcome (success or error) across the
    /// listed agents, or `None` if none has any recorded outcome.
    ///
    /// Outcomes are compared by their recorded `instant`. On an exact tie
    /// an error is preferred. The derived `Ord` on [`RoundOutcome`] is
    /// deliberately not used, because it ranks every `Error` above every
    /// `Success` regardless of time.
    pub fn last_outcome<'a, T, I>(&self, remote_agent_list: I) -> Option<RoundOutcome>
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
    {
        remote_agent_list
            .into_iter()
            .filter_map(|agent_info| self.agent_history.get(agent_info.into().agent()))
            .flat_map(|info| {
                [
                    info.errors.back().map(|x| RoundOutcome::Error(x.clone())),
                    info.successes
                        .back()
                        .map(|x| RoundOutcome::Success(x.clone())),
                ]
            })
            .flatten()
            .max_by_key(|outcome| match outcome {
                RoundOutcome::Success(m) => (m.instant, false),
                RoundOutcome::Error(m) => (m.instant, true),
            })
    }

    /// Whether any forced initiations remain.
    pub fn forced_initiate(&self) -> bool {
        self.force_initiates > 0
    }

    /// Mean reachability quotient of the tracked agents among those listed,
    /// or `0.0` if none are tracked.
    pub fn reachability_quotient<'a, T, I>(&self, remote_agent_list: I) -> f32
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
    {
        Self::mean_or_zero(
            remote_agent_list
                .into_iter()
                .filter_map(|agent_info| self.agent_history.get(agent_info.into().agent()))
                .map(|info| *info.reachability_quotient),
        )
    }

    /// Mean latency (microseconds) of the tracked agents among those listed,
    /// or `0.0` if none are tracked.
    pub fn latency_micros<'a, T, I>(&self, remote_agent_list: I) -> f32
    where
        T: Into<AgentLike<'a>>,
        I: IntoIterator<Item = T>,
    {
        Self::mean_or_zero(
            remote_agent_list
                .into_iter()
                .filter_map(|agent_info| self.agent_history.get(agent_info.into().agent()))
                .map(|info| *info.latency_micros),
        )
    }

    /// All per-agent histories.
    pub fn peer_agent_histories(&self) -> &HashMap<Arc<KitsuneAgent>, PeerAgentHistory> {
        &self.agent_history
    }

    /// All per-node histories.
    pub fn peer_node_histories(&self) -> &HashMap<NodeId, PeerNodeHistory> {
        &self.node_history
    }

    /// History for one agent, inserting an empty entry if it is not tracked yet.
    fn agent_history_entry<'a>(
        &mut self,
        agent: impl Into<AgentLike<'a>>,
    ) -> &mut PeerAgentHistory {
        let key = agent.into().agent().clone();
        self.agent_history.entry(key).or_default()
    }

    /// Arithmetic mean of `values`, or `0.0` when the iterator is empty.
    fn mean_or_zero(values: impl Iterator<Item = f32>) -> f32 {
        let (sum, cnt) = values.fold((0.0_f32, 0.0_f32), |acc, x| (acc.0 + x, acc.1 + 1.0));
        if cnt <= 0.0 {
            0.0
        } else {
            sum / cnt
        }
    }
}
impl PeerAgentHistory {
    /// Whether this agent's most recent round was initiated by this node.
    ///
    /// False when no initiate is recorded. True when an initiate exists and
    /// either no accept is recorded or the latest initiate is strictly later
    /// than the latest accept.
    fn is_initiate_round(&self) -> bool {
        match self.initiates.back() {
            None => false,
            Some(initiate) => match self.accepts.back() {
                None => true,
                Some(accept) => initiate > accept,
            },
        }
    }
}
/// Append `item` to `buffer`, evicting the oldest entries first so that the
/// buffer never holds more than `MAX_HISTORY` items (at least one item is
/// always kept, even if `MAX_HISTORY` were 0).
fn record_item<T>(buffer: &mut VecDeque<T>, item: T) {
    // Make room *before* pushing: the buffer must be below the cap so that it
    // is at most `MAX_HISTORY` long after the push. A loop (not a single pop)
    // also repairs buffers that were grown past the cap through public fields.
    while buffer.len() >= MAX_HISTORY {
        if buffer.pop_front().is_none() {
            // Only reachable when MAX_HISTORY == 0.
            break;
        }
    }
    buffer.push_back(item);
}
/// Human-readable summary of gossip completion statistics.
///
/// Per-agent detail is printed only when the `GOSSIP_METRICS` environment
/// variable equals `"trace"`. The variable is read once, on first use.
impl std::fmt::Display for Metrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        static TRACE: once_cell::sync::Lazy<bool> = once_cell::sync::Lazy::new(|| {
            std::env::var("GOSSIP_METRICS").map_or(false, |s| s == "trace")
        });
        let trace = *TRACE;
        write!(f, "Metrics:")?;
        // Divisor for the per-agent averages below; `checked_div` turns the
        // empty case into a zero duration.
        let agent_count = self.agent_history.len() as u32;
        let mut average_last_completion = std::time::Duration::default();
        let mut max_last_completion = std::time::Duration::default();
        let mut average_completion_frequency = std::time::Duration::default();
        let mut complete_rounds = 0;
        // Stays `None` when there are no agents, so the summary reports 0
        // instead of `usize::MAX`.
        let mut min_complete_rounds: Option<usize> = None;
        for (key, info) in &self.agent_history {
            let success_count = info.successes.len();
            let completion_frequency: std::time::Duration =
                info.successes.iter().map(|i| i.elapsed()).sum();
            let completion_frequency = completion_frequency
                .checked_div(success_count as u32)
                .unwrap_or_default();
            let last_completion = info
                .successes
                .back()
                .map(|i| i.elapsed())
                .unwrap_or_default();
            average_last_completion += last_completion;
            max_last_completion = max_last_completion.max(last_completion);
            average_completion_frequency += completion_frequency;
            if success_count > 0 {
                complete_rounds += 1;
            }
            min_complete_rounds = Some(
                min_complete_rounds.map_or(success_count, |m: usize| m.min(success_count)),
            );
            if trace {
                write!(f, "\n\t{:?}:", key)?;
                write!(
                    f,
                    "\n\t\tErrors: {}, Last: {:?}",
                    info.errors.len(),
                    info.errors.back().map(|i| i.elapsed()).unwrap_or_default()
                )?;
                write!(
                    f,
                    "\n\t\tInitiates: {}, Last: {:?}",
                    info.initiates.len(),
                    info.initiates
                        .back()
                        .map(|i| i.elapsed())
                        .unwrap_or_default()
                )?;
                write!(
                    f,
                    "\n\t\tRemote Rounds: {}, Last: {:?}",
                    info.accepts.len(),
                    info.accepts.back().map(|i| i.elapsed()).unwrap_or_default()
                )?;
                write!(
                    f,
                    "\n\t\tComplete Rounds: {}, Last: {:?}, Average completion Frequency: {:?}",
                    success_count,
                    last_completion,
                    completion_frequency
                )?;
                write!(f, "\n\t\tCurrent Round: {:?}", info.current_round)?;
            }
        }
        write!(
            f,
            "\n\tNumber of remote nodes complete {} out of {}. Min per node: {}.",
            complete_rounds,
            self.agent_history.len(),
            min_complete_rounds.unwrap_or(0)
        )?;
        write!(
            f,
            "\n\tAverage time since last completion: {:?}",
            average_last_completion
                .checked_div(agent_count)
                .unwrap_or_default()
        )?;
        write!(
            f,
            "\n\tMax time since last completion: {:?}",
            max_last_completion
        )?;
        write!(
            f,
            "\n\tAverage completion frequency: {:?}",
            average_completion_frequency
                .checked_div(agent_count)
                .unwrap_or_default()
        )?;
        write!(f, "\n\tForce Initiate: {}", self.force_initiates)?;
        Ok(())
    }
}
/// Shared, lock-protected handle to [`Metrics`]. Cloning shares the same
/// underlying metrics.
#[derive(Clone)]
pub struct MetricsSync(Arc<parking_lot::RwLock<Metrics>>);
impl Default for MetricsSync {
fn default() -> Self {
Self(Arc::new(parking_lot::RwLock::new(Metrics::default())))
}
}
/// Debug-formats the inner metrics while holding a read lock.
impl std::fmt::Debug for MetricsSync {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let guard = self.0.read();
        std::fmt::Debug::fmt(&*guard, f)
    }
}
/// Display-formats the inner metrics while holding a read lock.
impl std::fmt::Display for MetricsSync {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let guard = self.0.read();
        std::fmt::Display::fmt(&*guard, f)
    }
}
impl MetricsSync {
    /// Acquire a shared read guard.
    ///
    /// Tries a normal read for up to 100 ms, then falls back to
    /// `read_recursive` (presumably so a thread that already holds a read
    /// guard is not blocked behind a queued writer; confirm with callers).
    pub fn read(&self) -> parking_lot::RwLockReadGuard<Metrics> {
        let timeout = std::time::Duration::from_millis(100);
        self.0
            .try_read_for(timeout)
            .unwrap_or_else(|| self.0.read_recursive())
    }

    /// Acquire an exclusive write guard.
    ///
    /// Waits up to 100 seconds. If the lock is still unavailable, prints a
    /// deadlock warning to stderr and then blocks until it is acquired.
    pub fn write(&self) -> parking_lot::RwLockWriteGuard<Metrics> {
        let timeout = std::time::Duration::from_secs(100);
        self.0.try_write_for(timeout).unwrap_or_else(|| {
            eprintln!("Metrics lock likely deadlocked");
            self.0.write()
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `RunAvg` by applying each `(value, count)` batch with `push_n`
    /// and return the resulting average.
    fn avg_of(batches: &[(i32, u8)]) -> f32 {
        let mut avg = RunAvg::default();
        for &(value, count) in batches {
            avg.push_n(value, count);
        }
        *avg
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_run_avg() {
        // Individual pushes.
        let mut single = RunAvg::default();
        for v in [100, 1, 1, 1] {
            single.push(v);
        }
        assert_eq!(25.75, *single);

        // The same samples expressed as weighted batches.
        assert_eq!(25.75, avg_of(&[(100, 1), (1, 3)]));

        // Once the count saturates, a single new sample only nudges the average.
        let mut nudged = RunAvg::default();
        nudged.push_n(100, 255);
        nudged.push(1);
        assert_eq!(99.61176, *nudged);

        assert_eq!(50.30588, avg_of(&[(100, 255), (1, 128)]));
        assert_eq!(1.0, avg_of(&[(100, 255), (1, 255)]));
    }
}