use crate::groups::GroupInfo;
use serde::Serialize;
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;
#[derive(Debug, Clone, Default, Serialize)]
pub struct GroupCounters {
pub messages_received: u64,
pub messages_dropped_decode_failed: u64,
pub messages_dropped_author_banned: u64,
pub messages_dropped_write_policy_violation: u64,
pub messages_dropped_signature_failed: u64,
pub messages_dropped_other: u64,
pub last_message_at_ms: Option<u64>,
pub member_joined_events_applied: u64,
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct GroupsDiagnosticsSnapshot {
pub groups: Vec<GroupDiagnostic>,
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct GroupDiagnostic {
pub group_id: String,
pub members_v2_size: usize,
pub subscribed_metadata: bool,
pub subscribed_public: bool,
#[serde(flatten)]
pub counters: GroupCounters,
}
#[derive(Debug, Default)]
pub struct GroupsDiagnostics {
inner: Mutex<HashMap<String, GroupCounters>>,
}
impl GroupsDiagnostics {
#[must_use]
pub fn new() -> Self {
Self::default()
}
fn with_counters<F>(&self, group_id: &str, f: F)
where
F: FnOnce(&mut GroupCounters),
{
let mut guard = match self.inner.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
let entry = guard.entry(group_id.to_string()).or_default();
f(entry);
}
pub fn record_message_received(&self, group_id: &str, now_ms: u64) {
self.with_counters(group_id, |c| {
c.messages_received = c.messages_received.saturating_add(1);
c.last_message_at_ms = Some(now_ms);
});
}
pub fn record_decode_failed(&self, group_id: &str) {
self.with_counters(group_id, |c| {
c.messages_dropped_decode_failed = c.messages_dropped_decode_failed.saturating_add(1);
});
}
pub fn record_author_banned(&self, group_id: &str) {
self.with_counters(group_id, |c| {
c.messages_dropped_author_banned = c.messages_dropped_author_banned.saturating_add(1);
});
}
pub fn record_write_policy_violation(&self, group_id: &str) {
self.with_counters(group_id, |c| {
c.messages_dropped_write_policy_violation =
c.messages_dropped_write_policy_violation.saturating_add(1);
});
}
pub fn record_signature_failed(&self, group_id: &str) {
self.with_counters(group_id, |c| {
c.messages_dropped_signature_failed =
c.messages_dropped_signature_failed.saturating_add(1);
});
}
pub fn record_other_drop(&self, group_id: &str) {
self.with_counters(group_id, |c| {
c.messages_dropped_other = c.messages_dropped_other.saturating_add(1);
});
}
pub fn record_member_joined(&self, group_id: &str) {
self.with_counters(group_id, |c| {
c.member_joined_events_applied = c.member_joined_events_applied.saturating_add(1);
});
}
#[must_use]
pub fn snapshot(
&self,
groups: &HashMap<String, GroupInfo>,
metadata_subscribed: &HashSet<String>,
public_subscribed: &HashSet<String>,
) -> GroupsDiagnosticsSnapshot {
let counters_guard = match self.inner.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
fn merge_counters(dst: &mut GroupCounters, src: &GroupCounters) {
dst.messages_received = dst.messages_received.saturating_add(src.messages_received);
dst.messages_dropped_decode_failed = dst
.messages_dropped_decode_failed
.saturating_add(src.messages_dropped_decode_failed);
dst.messages_dropped_author_banned = dst
.messages_dropped_author_banned
.saturating_add(src.messages_dropped_author_banned);
dst.messages_dropped_write_policy_violation = dst
.messages_dropped_write_policy_violation
.saturating_add(src.messages_dropped_write_policy_violation);
dst.messages_dropped_signature_failed = dst
.messages_dropped_signature_failed
.saturating_add(src.messages_dropped_signature_failed);
dst.messages_dropped_other = dst
.messages_dropped_other
.saturating_add(src.messages_dropped_other);
dst.member_joined_events_applied = dst
.member_joined_events_applied
.saturating_add(src.member_joined_events_applied);
dst.last_message_at_ms = match (dst.last_message_at_ms, src.last_message_at_ms) {
(Some(a), Some(b)) => Some(a.max(b)),
(None, Some(b)) => Some(b),
(a, None) => a,
};
}
let stable_for_key = |key: &str| -> String {
groups
.get(key)
.map(|info| info.stable_group_id().to_string())
.or_else(|| {
groups
.values()
.find(|info| info.stable_group_id() == key)
.map(|info| info.stable_group_id().to_string())
})
.unwrap_or_else(|| key.to_string())
};
let mut rows: std::collections::BTreeMap<String, GroupDiagnostic> =
std::collections::BTreeMap::new();
for (key, info) in groups {
let stable_id = info.stable_group_id().to_string();
rows.entry(stable_id.clone())
.or_insert_with(|| GroupDiagnostic {
group_id: stable_id.clone(),
members_v2_size: info.members_v2.values().filter(|m| m.is_active()).count(),
subscribed_metadata: metadata_subscribed.contains(key)
|| metadata_subscribed.contains(&stable_id),
subscribed_public: public_subscribed.contains(&stable_id)
|| public_subscribed.contains(key),
counters: GroupCounters::default(),
});
}
for (key, counters) in counters_guard.iter() {
let stable_id = stable_for_key(key);
let row = rows
.entry(stable_id.clone())
.or_insert_with(|| GroupDiagnostic {
group_id: stable_id,
members_v2_size: 0,
subscribed_metadata: metadata_subscribed.contains(key),
subscribed_public: public_subscribed.contains(key),
counters: GroupCounters::default(),
});
merge_counters(&mut row.counters, counters);
}
GroupsDiagnosticsSnapshot {
groups: rows.into_values().collect(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::groups::{GroupInfo, GroupPolicyPreset};
use crate::identity::AgentId;
fn group(name: &str, mls_id: &str) -> GroupInfo {
GroupInfo::with_policy(
name.to_string(),
String::new(),
AgentId([7; 32]),
mls_id.to_string(),
GroupPolicyPreset::PublicOpen.to_policy(),
)
}
#[test]
fn record_and_snapshot_isolates_counters_per_group() {
let diag = GroupsDiagnostics::new();
diag.record_message_received("g1", 1_000);
diag.record_message_received("g1", 1_001);
diag.record_write_policy_violation("g1");
diag.record_decode_failed("g2");
diag.record_member_joined("g2");
let mut groups: HashMap<String, GroupInfo> = HashMap::new();
groups.insert("g1".into(), group("G1", "g1"));
groups.insert("g2".into(), group("G2", "g2"));
let mut meta = HashSet::new();
meta.insert("g1".to_string());
let mut pub_set = HashSet::new();
pub_set.insert("g1".to_string());
let snap = diag.snapshot(&groups, &meta, &pub_set);
assert_eq!(snap.groups.len(), 2);
let g1 = snap.groups.iter().find(|g| g.group_id == "g1").unwrap();
assert_eq!(g1.counters.messages_received, 2);
assert_eq!(g1.counters.messages_dropped_write_policy_violation, 1);
assert_eq!(g1.counters.last_message_at_ms, Some(1_001));
assert!(g1.subscribed_metadata);
assert!(g1.subscribed_public);
let g2 = snap.groups.iter().find(|g| g.group_id == "g2").unwrap();
assert_eq!(g2.counters.messages_dropped_decode_failed, 1);
assert_eq!(g2.counters.member_joined_events_applied, 1);
assert!(!g2.subscribed_metadata);
assert!(!g2.subscribed_public);
}
#[test]
fn snapshot_includes_groups_without_known_info() {
let diag = GroupsDiagnostics::new();
diag.record_other_drop("ghost");
let groups: HashMap<String, GroupInfo> = HashMap::new();
let snap = diag.snapshot(&groups, &HashSet::new(), &HashSet::new());
assert_eq!(snap.groups.len(), 1);
assert_eq!(snap.groups[0].group_id, "ghost");
assert_eq!(snap.groups[0].members_v2_size, 0);
assert_eq!(snap.groups[0].counters.messages_dropped_other, 1);
}
}