use std::time::Duration;
use dashmap::DashMap;
use freenet_stdlib::prelude::ContractInstanceId;
use crate::config::GlobalExecutor;
/// Length of one summary window: the period of the ticker created in
/// `start_summary_task`, also reported as `window_s` on the summary log line.
const SUMMARY_WINDOW: Duration = Duration::from_secs(60);
/// Value `drain_and_log` passes as `top_k` to `aggregate_window`, i.e. the
/// maximum number of contracts that get their own detail line per window.
const TOP_K: usize = 8;
/// Once the tracking map holds this many contracts, `record_broadcast` ignores
/// calls for contracts that are not already tracked (until the next drain
/// empties the map).
const MAX_TRACKED_CONTRACTS: usize = 4096;
/// Running counters for a single contract within the current window.
///
/// All fields start at zero (`Default`) and are only ever incremented by
/// `UpdatePropagationStats::record_broadcast`.
#[derive(Default)]
struct ContractCounters {
    /// Number of recorded broadcasts.
    updates: u64,
    /// Sum of the `targets` argument over all recorded broadcasts.
    targets_total: u64,
    /// Number of recorded broadcasts whose `targets` was zero.
    no_targets: u64,
    /// Sum of the `interest_resolve_failed` argument over all recorded broadcasts.
    interest_resolve_failed: u64,
}

impl ContractCounters {
    /// Returns `true` only when every counter is still zero.
    fn is_empty(&self) -> bool {
        // Listing every field (no `..`) means a newly added counter has to be
        // considered here before the code compiles again.
        matches!(
            self,
            Self {
                updates: 0,
                targets_total: 0,
                no_targets: 0,
                interest_resolve_failed: 0,
            }
        )
    }
}
/// Snapshot of one contract's counters, as produced when a window is drained.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ContractSummary {
    /// Contract the counters belong to.
    pub(crate) contract: ContractInstanceId,
    /// Number of broadcasts recorded for the contract.
    pub(crate) updates: u64,
    /// Sum of the per-broadcast target counts.
    pub(crate) targets_total: u64,
    /// Number of broadcasts recorded with zero targets.
    pub(crate) no_targets: u64,
    /// Sum of the per-broadcast `interest_resolve_failed` counts.
    pub(crate) interest_resolve_failed: u64,
}

impl ContractSummary {
    /// Mean targets per update, rounded to one decimal place.
    ///
    /// Returns `0.0` when no updates were recorded, avoiding a division by zero.
    fn targets_avg(&self) -> f64 {
        match self.updates {
            0 => 0.0,
            updates => {
                let tenths = (self.targets_total as f64 / updates as f64) * 10.0;
                tenths.round() / 10.0
            }
        }
    }
}
/// Totals for one window plus the highest-ranked per-contract summaries.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct WindowSummary {
    /// Number of contracts that contributed to the totals.
    pub(crate) contracts: usize,
    /// Total updates across those contracts.
    pub(crate) updates: u64,
    /// Total targets across those contracts.
    pub(crate) targets_total: u64,
    /// Total zero-target broadcasts across those contracts.
    pub(crate) no_targets: u64,
    /// Total `interest_resolve_failed` across those contracts.
    pub(crate) interest_resolve_failed: u64,
    /// Per-contract summaries kept for individual reporting.
    pub(crate) top: Vec<ContractSummary>,
    /// Number of contributing contracts that are not listed in `top`.
    pub(crate) others: usize,
}

impl WindowSummary {
    /// Mean targets per update over the whole window, rounded to one decimal
    /// place; `0.0` when the window has no updates.
    pub(crate) fn targets_avg(&self) -> f64 {
        if self.updates == 0 {
            return 0.0;
        }
        let mean = self.targets_total as f64 / self.updates as f64;
        let tenths = mean * 10.0;
        tenths.round() / 10.0
    }
}
/// Folds per-contract summaries into a single [`WindowSummary`].
///
/// Summaries with `updates == 0` are discarded first; if nothing remains the
/// result is `None`. Otherwise the totals cover every remaining contract,
/// `top` holds at most `top_k` of them ranked by `updates` descending (ties
/// broken by ascending contract id bytes), and `others` counts the remaining
/// contracts that did not make it into `top`.
pub(crate) fn aggregate_window(
    summaries: Vec<ContractSummary>,
    top_k: usize,
) -> Option<WindowSummary> {
    let mut active: Vec<ContractSummary> = summaries
        .into_iter()
        .filter(|entry| entry.updates > 0)
        .collect();
    if active.is_empty() {
        return None;
    }

    // Accumulate all four totals in a single pass over the active contracts.
    let mut updates = 0u64;
    let mut targets_total = 0u64;
    let mut no_targets = 0u64;
    let mut interest_resolve_failed = 0u64;
    for entry in &active {
        updates += entry.updates;
        targets_total += entry.targets_total;
        no_targets += entry.no_targets;
        interest_resolve_failed += entry.interest_resolve_failed;
    }

    // Busiest contracts first; the id comparison keeps equal counts in a
    // deterministic order.
    active.sort_by(|lhs, rhs| match rhs.updates.cmp(&lhs.updates) {
        std::cmp::Ordering::Equal => lhs.contract.as_bytes().cmp(rhs.contract.as_bytes()),
        unequal => unequal,
    });

    let contracts = active.len();
    active.truncate(top_k);
    // Whatever was cut off by the truncation is reported only as a count.
    let others = contracts - active.len();

    Some(WindowSummary {
        contracts,
        updates,
        targets_total,
        no_targets,
        interest_resolve_failed,
        top: active,
        others,
    })
}
pub(crate) struct UpdatePropagationStats {
contracts: DashMap<ContractInstanceId, ContractCounters>,
}
impl UpdatePropagationStats {
pub(crate) fn new() -> Self {
Self {
contracts: DashMap::new(),
}
}
pub(crate) fn record_broadcast(
&self,
contract: ContractInstanceId,
targets: usize,
interest_resolve_failed: usize,
) {
if !self.contracts.contains_key(&contract) && self.contracts.len() >= MAX_TRACKED_CONTRACTS
{
return;
}
let mut entry = self.contracts.entry(contract).or_default();
entry.updates += 1;
entry.targets_total += targets as u64;
if targets == 0 {
entry.no_targets += 1;
}
entry.interest_resolve_failed += interest_resolve_failed as u64;
}
fn drain_window(&self) -> Vec<ContractSummary> {
let mut out = Vec::with_capacity(self.contracts.len());
self.contracts.retain(|id, counters| {
if !counters.is_empty() {
out.push(ContractSummary {
contract: *id,
updates: counters.updates,
targets_total: counters.targets_total,
no_targets: counters.no_targets,
interest_resolve_failed: counters.interest_resolve_failed,
});
}
false
});
out
}
pub(crate) fn drain_and_log(&self) {
let Some(summary) = aggregate_window(self.drain_window(), TOP_K) else {
return;
};
tracing::info!(
contracts = summary.contracts,
updates = summary.updates,
targets_avg = summary.targets_avg(),
no_targets = summary.no_targets,
interest_resolve_failed = summary.interest_resolve_failed,
window_s = SUMMARY_WINDOW.as_secs(),
phase = "summary",
"update_propagation_summary"
);
for c in &summary.top {
tracing::info!(
contract = %format!("{:.8}", c.contract),
updates = c.updates,
targets_avg = c.targets_avg(),
no_targets = c.no_targets,
interest_resolve_failed = c.interest_resolve_failed,
phase = "summary_contract",
"update_propagation_summary contract detail"
);
}
if summary.others > 0 {
tracing::info!(
others = summary.others,
phase = "summary_rollup",
"update_propagation_summary …and {} other contract(s)",
summary.others
);
}
}
pub(crate) fn start_summary_task(self: std::sync::Arc<Self>) -> tokio::task::JoinHandle<()> {
GlobalExecutor::spawn(async move {
let mut ticker = tokio::time::interval(SUMMARY_WINDOW);
ticker.tick().await;
loop {
ticker.tick().await;
self.drain_and_log();
}
})
}
}
// Unit tests for window aggregation, the concurrent record/drain path, the
// emitted log lines, and two "pin" tests that assert on literal source text.
#[cfg(test)]
mod tests {
use super::*;
// Builds a contract id whose 32 bytes all equal `seed`.
fn cid(seed: u8) -> ContractInstanceId {
ContractInstanceId::new([seed; 32])
}
// No input summaries means there is no window to report.
#[test]
fn aggregate_window_empty_returns_none() {
assert!(aggregate_window(vec![], TOP_K).is_none());
}
// A summary with `updates == 0` is dropped even if another counter is
// non-zero, so a window containing only such entries yields `None`.
#[test]
fn aggregate_window_ignores_zero_update_contracts() {
let summaries = vec![ContractSummary {
contract: cid(1),
updates: 0,
targets_total: 0,
no_targets: 0,
interest_resolve_failed: 5,
}];
assert!(aggregate_window(summaries, TOP_K).is_none());
}
// Totals are summed across contracts and `top` is ordered by update count,
// highest first.
#[test]
fn aggregate_window_totals_and_average() {
let summaries = vec![
ContractSummary {
contract: cid(1),
updates: 2,
targets_total: 10, no_targets: 0,
interest_resolve_failed: 1,
},
ContractSummary {
contract: cid(2),
updates: 3,
targets_total: 0, no_targets: 3,
interest_resolve_failed: 4,
},
];
let w = aggregate_window(summaries, TOP_K).expect("non-empty");
assert_eq!(w.contracts, 2);
assert_eq!(w.updates, 5);
assert_eq!(w.targets_total, 10);
assert_eq!(w.no_targets, 3);
assert_eq!(w.interest_resolve_failed, 5);
// 10 targets over 5 updates.
assert_eq!(w.targets_avg(), 2.0);
assert_eq!(w.others, 0);
// cid(2) has 3 updates and therefore ranks ahead of cid(1) with 2.
assert_eq!(w.top[0].contract, cid(2));
assert_eq!(w.top[0].targets_avg(), 0.0);
assert_eq!(w.top[1].contract, cid(1));
assert_eq!(w.top[1].targets_avg(), 5.0);
}
// With `top_k = 2` and five active contracts, only the two busiest are
// listed and the remaining three are counted in `others`.
#[test]
fn aggregate_window_caps_top_k_and_counts_others() {
let summaries: Vec<ContractSummary> = (0..5u8)
.map(|i| ContractSummary {
contract: cid(i),
updates: u64::from(i) + 1, targets_total: 0,
no_targets: 0,
interest_resolve_failed: 0,
})
.collect();
let w = aggregate_window(summaries, 2).expect("non-empty");
assert_eq!(w.contracts, 5);
assert_eq!(w.top.len(), 2);
assert_eq!(w.others, 3);
assert_eq!(w.top[0].contract, cid(4));
assert_eq!(w.top[1].contract, cid(3));
}
// Equal update counts are ordered by contract id bytes, so the smaller id
// wins the single `top` slot regardless of input order.
#[test]
fn aggregate_window_tie_break_is_deterministic_by_contract_id() {
let summaries = vec![
ContractSummary {
contract: cid(9),
updates: 1,
targets_total: 0,
no_targets: 0,
interest_resolve_failed: 0,
},
ContractSummary {
contract: cid(3),
updates: 1,
targets_total: 0,
no_targets: 0,
interest_resolve_failed: 0,
},
];
let w = aggregate_window(summaries, 1).expect("non-empty");
assert_eq!(w.top.len(), 1);
assert_eq!(w.others, 1);
assert_eq!(w.top[0].contract, cid(3));
}
// Two records for the same contract accumulate into one summary; draining
// empties the map, and a second drain returns nothing.
#[test]
fn record_and_drain_accumulates_then_resets() {
let stats = UpdatePropagationStats::new();
let c = cid(7);
stats.record_broadcast(c, 3, 0);
// Zero targets: counts towards `no_targets`.
stats.record_broadcast(c, 0, 2);
let drained = stats.drain_window();
assert_eq!(drained.len(), 1);
let s = &drained[0];
assert_eq!(s.contract, c);
assert_eq!(s.updates, 2);
assert_eq!(s.targets_total, 3);
assert_eq!(s.no_targets, 1);
assert_eq!(s.interest_resolve_failed, 2);
assert_eq!(stats.contracts.len(), 0);
let drained2 = stats.drain_window();
assert!(drained2.is_empty());
assert_eq!(stats.contracts.len(), 0);
}
// After the map has been filled to the cap and drained, a previously unseen
// contract must be accepted again.
#[test]
fn drain_restores_cap_headroom_for_next_window() {
let stats = UpdatePropagationStats::new();
// Distinct id per index: the index is written into the first 8 bytes.
let id_at = |i: usize| {
let mut bytes = [0u8; 32];
bytes[..8].copy_from_slice(&(i as u64).to_le_bytes());
ContractInstanceId::new(bytes)
};
for i in 0..MAX_TRACKED_CONTRACTS {
stats.record_broadcast(id_at(i), 1, 0);
}
assert_eq!(stats.contracts.len(), MAX_TRACKED_CONTRACTS);
let drained = stats.drain_window();
assert_eq!(drained.len(), MAX_TRACKED_CONTRACTS);
assert_eq!(
stats.contracts.len(),
0,
"drain must clear the map so the cap doesn't stay saturated"
);
let fresh = id_at(MAX_TRACKED_CONTRACTS + 1);
stats.record_broadcast(fresh, 2, 0);
assert!(
stats.contracts.get(&fresh).is_some(),
"new contract must be tracked in the window after a full drain"
);
let drained2 = stats.drain_window();
assert_eq!(drained2.len(), 1);
assert_eq!(drained2[0].contract, fresh);
assert_eq!(drained2[0].targets_total, 2);
}
// Once `MAX_TRACKED_CONTRACTS` distinct contracts are tracked, a record for
// one more unseen contract is dropped rather than inserted.
#[test]
fn record_broadcast_respects_tracking_cap() {
let stats = UpdatePropagationStats::new();
for i in 0..MAX_TRACKED_CONTRACTS {
let mut bytes = [0u8; 32];
bytes[..8].copy_from_slice(&(i as u64).to_le_bytes());
stats.record_broadcast(ContractInstanceId::new(bytes), 1, 0);
}
assert_eq!(stats.contracts.len(), MAX_TRACKED_CONTRACTS);
// Byte 8 is never set by the loop above, so this id cannot collide with
// any of the ids already tracked.
let overflow = {
let mut bytes = [0u8; 32];
bytes[0] = 0xFF;
bytes[8] = 0xFF; ContractInstanceId::new(bytes)
};
stats.record_broadcast(overflow, 1, 0);
assert_eq!(stats.contracts.len(), MAX_TRACKED_CONTRACTS);
assert!(stats.contracts.get(&overflow).is_none());
}
// Eight recorder threads hammer one contract while a drainer thread drains
// in a loop. Summed over all drains, every record must be counted exactly
// once, and no drained summary may show `updates == 0`.
#[test]
fn concurrent_record_and_drain_conserve_totals_without_field_split() {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
const THREADS: usize = 8;
const PER_THREAD: u64 = 2_000;
const TARGETS_EACH: usize = 2;
const IRF_EACH: usize = 1;
let stats = Arc::new(UpdatePropagationStats::new());
let done = Arc::new(AtomicBool::new(false));
let c = cid(123);
let acc = std::thread::scope(|scope| {
let drainer = {
let stats = Arc::clone(&stats);
let done = Arc::clone(&done);
scope.spawn(move || {
// Running totals: (updates, targets_total, no_targets, interest_resolve_failed).
let mut acc = (0u64, 0u64, 0u64, 0u64);
let mut drain_once = || {
for s in stats.drain_window() {
assert!(
s.updates > 0,
"field-split: drained summary has updates==0 with \
non-zero other fields: {s:?}"
);
acc.0 += s.updates;
acc.1 += s.targets_total;
acc.2 += s.no_targets;
acc.3 += s.interest_resolve_failed;
}
};
while !done.load(AtomicOrdering::Relaxed) {
drain_once();
std::thread::yield_now();
}
// Final drain after the recorders are done, to pick up whatever
// was recorded since the last loop iteration.
drain_once();
acc
})
};
let mut recorders = Vec::new();
for _ in 0..THREADS {
let stats = Arc::clone(&stats);
recorders.push(scope.spawn(move || {
for _ in 0..PER_THREAD {
stats.record_broadcast(c, TARGETS_EACH, IRF_EACH);
}
}));
}
for r in recorders {
r.join().unwrap();
}
// Only signal the drainer once every recorder has finished.
done.store(true, AtomicOrdering::Relaxed);
drainer.join().unwrap()
});
let expected = THREADS as u64 * PER_THREAD;
assert_eq!(
acc.0, expected,
"every record must be counted exactly once across all windows"
);
assert_eq!(acc.1, expected * TARGETS_EACH as u64);
// Every record used a non-zero target count.
assert_eq!(acc.2, 0);
assert_eq!(acc.3, expected * IRF_EACH as u64);
}
// An idle drain logs nothing, an active window logs the summary at INFO,
// and a drain right after that is silent again.
// NOTE(review): relies on `TestLogger` capturing the lines emitted through
// `tracing` — confirm against `crate::test_utils`.
#[test]
fn drain_and_log_emits_info_summary_and_is_silent_when_idle() {
use crate::test_utils::TestLogger;
let logger = TestLogger::new().capture_logs().with_level("info").init();
let stats = UpdatePropagationStats::new();
stats.drain_and_log();
assert!(
!logger.contains("update_propagation_summary"),
"an idle window must emit nothing, got: {:?}",
logger.logs()
);
stats.record_broadcast(cid(1), 5, 0);
stats.record_broadcast(cid(1), 3, 1);
stats.drain_and_log();
assert!(
logger.contains("update_propagation_summary"),
"active window must emit the summary, got: {:?}",
logger.logs()
);
assert!(
logger.contains("INFO"),
"summary must be INFO-level, got: {:?}",
logger.logs()
);
// Only lines captured after this point count for the idle re-check.
let before = logger.logs().len();
stats.drain_and_log();
let after_logs = logger.logs();
assert!(
after_logs.len() == before
|| !after_logs[before..]
.iter()
.any(|l| l.contains("update_propagation_summary")),
"a post-drain idle window must not re-emit a summary, got: {:?}",
&after_logs[before..]
);
assert_eq!(stats.contracts.len(), 0, "idle entries should be evicted");
}
// Source-text pin: reads `src/operations/update/propagation_stats.rs` (this
// module's own source, per the panic message), locates the first summary
// `phase` field and checks that the nearest preceding `tracing::` macro is
// `info`.
#[test]
fn summary_emitter_logs_at_info_pin_test() {
let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("src/operations/update/propagation_stats.rs");
let source = std::fs::read_to_string(&path)
.unwrap_or_else(|e| panic!("must read own source at {}: {e}", path.display()));
let needle = "phase = \"summary\",";
let idx = source
.find(needle)
.expect("summary emitter must still exist in source");
let preceding = &source[..idx];
let macro_idx = preceding
.rfind("tracing::")
.expect("a tracing macro must precede the summary emitter");
// Macro name = the text between `tracing::` and the following `!`.
let after_macro = &preceding[macro_idx + "tracing::".len()..];
let macro_name = after_macro.split('!').next().unwrap_or("");
assert_eq!(
macro_name, "info",
"update_propagation_summary emitter must stay at INFO — it is the \
liveness signal #4281 restored after the #4272 DEBUG demotions. \
Closest preceding macro is `tracing::{macro_name}!`."
);
}
// Source-text pin on the caller: `src/node/network_bridge/p2p_protoc.rs`
// must contain exactly two `update_propagation_stats.record_broadcast(`
// call sites and the literal `if !is_retry {` guard.
#[test]
fn broadcast_path_feeds_propagation_stats_pin_test() {
let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("src/node/network_bridge/p2p_protoc.rs");
let source = std::fs::read_to_string(&path)
.unwrap_or_else(|e| panic!("must read p2p_protoc.rs at {}: {e}", path.display()));
let call_count = source
.matches("update_propagation_stats.record_broadcast(")
.count();
assert_eq!(
call_count, 2,
"handle_broadcast_state_change must call \
`update_propagation_stats.record_broadcast(...)` exactly twice — once \
on the fresh no-target path and once on the targets-found success \
path. Found {call_count}. A different count means an outcome arm was \
dropped (silent telemetry rot) or recording leaked elsewhere."
);
assert!(
source.contains("if !is_retry {"),
"the fresh-no-target record must be gated on `!is_retry` so retry \
re-emissions are not counted as fresh broadcasts."
);
}
}