#![allow(dead_code)]
use dashmap::DashMap;
use freenet_stdlib::prelude::{ContractKey, StateDelta, StateSummary};
use lru::LruCache;
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::time::Instant;
use crate::transport::TransportPublicKey;
use crate::util::time_source::TimeSource;
/// Heartbeat period (300 s) used as the base unit from which [`INTEREST_TTL`] is derived.
// NOTE(review): presumably the cadence at which peers re-announce interest; only its use in
// `INTEREST_TTL` is visible in this file — confirm against the sender side.
pub const INTEREST_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(300);
/// Age beyond which an un-refreshed [`PeerInterest`] counts as expired: four heartbeat intervals.
pub const INTEREST_TTL: Duration = Duration::from_secs(INTEREST_HEARTBEAT_INTERVAL.as_secs() * 4);
/// Period of the background sweep loop that runs deferred removals and expires stale interests.
pub const INTEREST_SWEEP_INTERVAL: Duration = Duration::from_secs(60);
/// Delay between scheduling a deferred removal for a peer and that removal becoming due.
pub const INTEREST_DISCONNECT_GRACE_PERIOD: Duration = Duration::from_secs(90);
use crate::config::GlobalExecutor;
use crate::config::GlobalRng;
/// Capacity, in entries, of the LRU cache of computed deltas.
const DELTA_CACHE_SIZE: usize = 1024;
/// Timeout applied to the summary and delta queries sent to the contract handler.
const BROADCAST_CH_TIMEOUT: Duration = Duration::from_secs(10);
/// Identity of a remote peer: a newtype around its transport public key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PeerKey(pub TransportPublicKey);

impl From<TransportPublicKey> for PeerKey {
    /// Wraps the given transport public key.
    fn from(key: TransportPublicKey) -> Self {
        PeerKey(key)
    }
}
/// What this node has recorded about one peer's interest in one contract.
#[derive(Clone, Debug)]
pub struct PeerInterest {
    /// Last state summary recorded for the peer, if any.
    pub summary: Option<StateSummary<'static>>,
    /// Instant at which the entry was created or last refreshed.
    pub last_refreshed: Instant,
    /// Flag supplied at registration time.
    // NOTE(review): presumably marks the peer as our upstream; it is not read in this file.
    pub is_upstream: bool,
}

impl PeerInterest {
    /// Creates an entry whose refresh timestamp is `now`.
    pub fn new(summary: Option<StateSummary<'static>>, is_upstream: bool, now: Instant) -> Self {
        PeerInterest {
            last_refreshed: now,
            is_upstream,
            summary,
        }
    }

    /// Marks the entry as freshly seen at `now`.
    pub fn refresh(&mut self, now: Instant) {
        self.last_refreshed = now;
    }

    /// Returns `true` when more than [`INTEREST_TTL`] has elapsed since the last refresh.
    ///
    /// The elapsed time saturates to zero if `now` is earlier than the last refresh.
    pub fn is_expired_at(&self, now: Instant) -> bool {
        let age = now.saturating_duration_since(self.last_refreshed);
        age > INTEREST_TTL
    }

    /// Replaces the stored summary and refreshes the entry at `now`.
    pub fn update_summary(&mut self, summary: Option<StateSummary<'static>>, now: Instant) {
        self.summary = summary;
        self.last_refreshed = now;
    }
}
/// Reasons this node itself is interested in a contract.
#[derive(Clone, Debug, Default)]
pub struct LocalInterest {
    /// Whether the contract is hosted locally.
    pub hosting: bool,
    /// Number of local clients counted for the contract.
    pub local_client_count: usize,
    /// Number of downstream subscribers counted for the contract.
    pub downstream_subscriber_count: usize,
}

impl LocalInterest {
    /// Returns `true` if any of the three interest sources is active.
    pub fn is_interested(&self) -> bool {
        let has_clients = self.local_client_count > 0;
        let has_downstream = self.downstream_subscriber_count > 0;
        self.hosting || has_clients || has_downstream
    }

    /// Counts one more local client.
    ///
    /// Returns `true` only if this call moved the contract from "no interest" to "interested".
    pub fn add_client(&mut self) -> bool {
        let had_interest = self.is_interested();
        self.local_client_count += 1;
        !had_interest
    }

    /// Counts one local client fewer (saturating at zero).
    ///
    /// Returns `true` if no interest remains afterwards.
    pub fn remove_client(&mut self) -> bool {
        self.local_client_count = self.local_client_count.saturating_sub(1);
        !self.is_interested()
    }

    /// Counts one more downstream subscriber.
    ///
    /// Returns `true` only if this call moved the contract from "no interest" to "interested".
    pub fn add_downstream(&mut self) -> bool {
        let had_interest = self.is_interested();
        self.downstream_subscriber_count += 1;
        !had_interest
    }

    /// Counts one downstream subscriber fewer (saturating at zero).
    ///
    /// Returns `true` if no interest remains afterwards.
    pub fn remove_downstream(&mut self) -> bool {
        self.downstream_subscriber_count = self.downstream_subscriber_count.saturating_sub(1);
        !self.is_interested()
    }

    /// Sets the hosting flag.
    ///
    /// Returns `true` if the overall interested/not-interested status changed.
    pub fn set_hosting(&mut self, hosting: bool) -> bool {
        let before = self.is_interested();
        self.hosting = hosting;
        before != self.is_interested()
    }
}
/// Key of the delta LRU cache: a contract plus 64-bit hashes of the two summaries involved.
// Only the hashes of the summaries are kept, so two distinct summaries that collide under
// `hash_bytes` would share a cache slot.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct DeltaCacheKey {
/// Contract the cached delta belongs to.
contract: ContractKey,
/// `hash_bytes` of the peer's summary bytes.
peer_summary_hash: u64,
/// `hash_bytes` of our own summary bytes.
our_summary_hash: u64,
}
/// Hashes a byte slice to 64 bits with the standard library's `DefaultHasher`.
fn hash_bytes(bytes: &[u8]) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher as _;

    let mut state = DefaultHasher::new();
    state.write(bytes);
    state.finish()
}
/// Computes a 32-bit hash of a contract's instance-id bytes.
///
/// Each byte is XOR-ed into the accumulator, which is then multiplied (wrapping) by the prime,
/// starting from the offset basis — the FNV-1a scheme with the standard 32-bit constants.
pub fn contract_hash(contract: &ContractKey) -> u32 {
    const FNV_OFFSET: u32 = 2166136261;
    const FNV_PRIME: u32 = 16777619;

    contract
        .id()
        .as_bytes()
        .iter()
        .fold(FNV_OFFSET, |acc, byte| {
            (acc ^ *byte as u32).wrapping_mul(FNV_PRIME)
        })
}
/// Decides whether sending a delta is worthwhile for the given sizes.
///
/// Returns `true` only when the summary is strictly smaller than half the state
/// (`summary_size * 2 < state_size`). An empty state is never delta-efficient.
///
/// The doubling is overflow-checked: a summary so large that `summary_size * 2` does not fit in
/// `usize` is reported as not efficient instead of panicking (debug) or wrapping (release).
pub fn is_delta_efficient(summary_size: usize, state_size: usize) -> bool {
    if state_size == 0 {
        return false;
    }
    match summary_size.checked_mul(2) {
        Some(doubled) => doubled < state_size,
        // Twice the summary exceeds `usize::MAX`, hence certainly exceeds `state_size`.
        None => false,
    }
}
/// Tracks which peers and which local sources are interested in which contracts, plus a delta
/// cache and a few send/receive counters.
///
/// All state lives in concurrent maps, a mutex-guarded cache, or atomics, so every method takes `&self`.
pub struct InterestManager<T: TimeSource> {
/// For each contract, the peers that registered interest and what is known about them.
interested_peers: DashMap<ContractKey, HashMap<PeerKey, PeerInterest>>,
/// Reverse index: for each peer, the contracts it registered interest in.
peer_contracts: DashMap<PeerKey, HashSet<ContractKey>>,
/// Local interest (hosting / clients / downstream subscribers) per contract.
local_interests: DashMap<ContractKey, LocalInterest>,
/// LRU cache of computed deltas, keyed by contract and summary hashes.
delta_cache: Mutex<LruCache<DeltaCacheKey, StateDelta<'static>>>,
/// Maps a 32-bit `contract_hash` to the contracts that produced it (collisions share a bucket).
contract_hash_index: DashMap<u32, Vec<ContractKey>>,
/// Clock used for every timestamp taken by the manager.
time_source: T,
/// Number of recorded delta sends.
delta_sends: AtomicU64,
/// Number of recorded full-state sends.
full_state_sends: AtomicU64,
/// Sum of `state_size - delta_size` over recorded delta sends.
delta_bytes_saved: AtomicU64,
/// Number of recorded resync requests received.
resync_requests_received: AtomicU64,
/// Per contract, the last instant at which a summary notification was allowed.
summary_notify_timestamps: DashMap<ContractKey, Instant>,
/// Peers with a scheduled deferred removal, mapped to the instant it becomes due.
pending_removals: DashMap<PeerKey, Instant>,
}
impl<T: TimeSource> InterestManager<T> {
/// Creates an empty manager that reads time from `time_source`.
///
/// All maps start empty, all counters start at zero, and the delta cache is sized to
/// `DELTA_CACHE_SIZE` entries.
pub fn new(time_source: T) -> Self {
    let cache_capacity =
        NonZeroUsize::new(DELTA_CACHE_SIZE).expect("DELTA_CACHE_SIZE must be > 0");
    let delta_cache = Mutex::new(LruCache::new(cache_capacity));

    Self {
        interested_peers: DashMap::new(),
        peer_contracts: DashMap::new(),
        local_interests: DashMap::new(),
        delta_cache,
        contract_hash_index: DashMap::new(),
        time_source,
        delta_sends: AtomicU64::new(0),
        full_state_sends: AtomicU64::new(0),
        delta_bytes_saved: AtomicU64::new(0),
        resync_requests_received: AtomicU64::new(0),
        summary_notify_timestamps: DashMap::new(),
        pending_removals: DashMap::new(),
    }
}
/// Records one delta send and the bytes it saved compared with sending the full state.
///
/// The saving is `state_size - delta_size`, clamped at zero when the delta is the larger one.
pub fn record_delta_send(&self, state_size: usize, delta_size: usize) {
    self.delta_sends.fetch_add(1, Ordering::Relaxed);
    let saved = state_size.saturating_sub(delta_size) as u64;
    self.delta_bytes_saved.fetch_add(saved, Ordering::Relaxed);
}
/// Records one full-state send.
pub fn record_full_state_send(&self) {
    let counter = &self.full_state_sends;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Records one received resync request.
pub fn record_resync_request_received(&self) {
    let counter = &self.resync_requests_received;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Returns the current instant according to the manager's time source.
pub fn now(&self) -> Instant {
    let clock = &self.time_source;
    clock.now()
}
/// Registers (or re-registers) `peer` as interested in `contract`.
///
/// Any existing entry for the pair is replaced by a fresh [`PeerInterest`] built from `summary`
/// and `is_upstream` and stamped with the current time. The reverse peer→contracts index and the
/// contract hash index are updated as well.
///
/// Returns `true` if the peer had no entry for this contract before the call.
pub fn register_peer_interest(
    &self,
    contract: &ContractKey,
    peer: PeerKey,
    summary: Option<StateSummary<'static>>,
    is_upstream: bool,
) -> bool {
    let registered_at = self.time_source.now();
    let fresh = PeerInterest::new(summary, is_upstream, registered_at);

    let mut peers = self.interested_peers.entry(*contract).or_default();
    // `insert` hands back the previous value, so `None` means the peer was not known yet.
    let is_new = peers.insert(peer.clone(), fresh).is_none();

    self.peer_contracts
        .entry(peer)
        .or_default()
        .insert(*contract);
    self.index_contract_hash(contract);
    is_new
}
/// Removes `peer`'s interest in `contract`.
///
/// Keeps the reverse peer→contracts index in sync, drops map entries that became empty, and
/// cleans up per-contract bookkeeping once the contract's peer map has been removed.
///
/// Returns `true` if an entry for the pair existed and was removed.
pub fn remove_peer_interest(&self, contract: &ContractKey, peer: &PeerKey) -> bool {
    let mut peers = match self.interested_peers.get_mut(contract) {
        Some(peers) => peers,
        None => return false,
    };

    let removed = peers.remove(peer).is_some();
    if removed {
        if let Some(mut owned) = self.peer_contracts.get_mut(peer) {
            owned.remove(contract);
            let peer_has_none_left = owned.is_empty();
            // Release the guard before touching the same map again.
            drop(owned);
            if peer_has_none_left {
                self.peer_contracts.remove_if(peer, |_, v| v.is_empty());
            }
        }
    }

    let contract_has_none_left = peers.is_empty();
    drop(peers);
    if contract_has_none_left {
        self.interested_peers
            .remove_if(contract, |_, v| v.is_empty());
        self.cleanup_contract_if_no_interest(contract);
    }
    removed
}
/// Replaces the stored summary for `peer` on `contract` and refreshes the entry's timestamp.
///
/// Does nothing if the pair has no registered interest.
pub fn update_peer_summary(
    &self,
    contract: &ContractKey,
    peer: &PeerKey,
    summary: Option<StateSummary<'static>>,
) {
    let now = self.time_source.now();
    let mut peers = match self.interested_peers.get_mut(contract) {
        Some(peers) => peers,
        None => return,
    };
    if let Some(interest) = peers.get_mut(peer) {
        interest.update_summary(summary, now);
    }
}
/// Refreshes the timestamp of `peer`'s interest in `contract`, postponing its expiry.
///
/// Does nothing if the pair has no registered interest.
pub fn refresh_peer_interest(&self, contract: &ContractKey, peer: &PeerKey) {
    let now = self.time_source.now();
    let mut peers = match self.interested_peers.get_mut(contract) {
        Some(peers) => peers,
        None => return,
    };
    if let Some(interest) = peers.get_mut(peer) {
        interest.refresh(now);
    }
}
/// Returns a snapshot of all peers interested in `contract`, sorted by public-key bytes.
///
/// The result is empty when nothing is registered for the contract.
pub fn get_interested_peers(&self, contract: &ContractKey) -> Vec<(PeerKey, PeerInterest)> {
    let mut snapshot: Vec<(PeerKey, PeerInterest)> = match self.interested_peers.get(contract) {
        Some(peers) => peers
            .iter()
            .map(|(key, interest)| (key.clone(), interest.clone()))
            .collect(),
        None => Vec::new(),
    };
    // Sorting gives callers an order independent of hash-map iteration order.
    snapshot.sort_by(|(lhs, _), (rhs, _)| lhs.0.as_bytes().cmp(rhs.0.as_bytes()));
    snapshot
}
/// Returns a copy of what is recorded for `peer` on `contract`, if anything.
pub fn get_peer_interest(
    &self,
    contract: &ContractKey,
    peer: &PeerKey,
) -> Option<PeerInterest> {
    match self.interested_peers.get(contract) {
        Some(peers) => peers.get(peer).cloned(),
        None => None,
    }
}
/// Returns a copy of the set of contracts `peer` has registered interest in (empty if none).
pub fn get_contracts_for_peer(&self, peer: &PeerKey) -> HashSet<ContractKey> {
    match self.peer_contracts.get(peer) {
        Some(contracts) => contracts.value().clone(),
        None => HashSet::default(),
    }
}
/// Returns a copy of the summary stored for `peer` on `contract`.
///
/// `None` is returned both when the pair is unknown and when it is known without a summary.
pub fn get_peer_summary(
    &self,
    contract: &ContractKey,
    peer: &PeerKey,
) -> Option<StateSummary<'static>> {
    let peers = self.interested_peers.get(contract)?;
    let interest = peers.get(peer)?;
    interest.summary.clone()
}
pub fn should_send_summary_notification(&self, contract: &ContractKey) -> bool {
let now = self.time_source.now();
let min_interval = Duration::from_millis(100);
let mut entry = self.summary_notify_timestamps.entry(*contract).or_insert(
now - min_interval - Duration::from_millis(1),
);
if now.duration_since(*entry.value()) >= min_interval {
*entry.value_mut() = now;
true
} else {
false
}
}
/// Removes every interest registered by `peer`.
///
/// The peer is dropped from the reverse index first, then from each contract's peer map;
/// contracts left without peers have their map entry removed and their bookkeeping cleaned up.
///
/// Returns the number of contracts the peer was indexed under.
pub fn remove_all_peer_interests(&self, peer: &PeerKey) -> usize {
    let contracts: Vec<ContractKey> = match self.peer_contracts.remove(peer) {
        Some((_, set)) => set.into_iter().collect(),
        None => Vec::new(),
    };
    let removed_count = contracts.len();

    for contract in &contracts {
        let contract_emptied = match self.interested_peers.get_mut(contract) {
            Some(mut peers) => {
                peers.remove(peer);
                peers.is_empty()
            }
            None => false,
        };
        // The guard from the lookup above is released by now.
        if contract_emptied {
            self.interested_peers
                .remove_if(contract, |_, v| v.is_empty());
            self.cleanup_contract_if_no_interest(contract);
        }
    }

    if removed_count > 0 {
        tracing::debug!(removed_count, "Removed peer interests on disconnect");
    }
    removed_count
}
/// Schedules removal of all of `peer`'s interests once the disconnect grace period has passed.
///
/// The deadline is `now + INTEREST_DISCONNECT_GRACE_PERIOD`; scheduling again for the same peer
/// overwrites the previous deadline.
pub fn schedule_deferred_removal(&self, peer: &PeerKey) {
    let grace = INTEREST_DISCONNECT_GRACE_PERIOD;
    let due_at = self.time_source.now() + grace;
    self.pending_removals.insert(peer.clone(), due_at);
    tracing::debug!(
        peer = %peer.0,
        grace_secs = INTEREST_DISCONNECT_GRACE_PERIOD.as_secs(),
        "Scheduled deferred interest removal"
    );
}
/// Cancels a pending deferred removal for `peer`.
///
/// Returns `true` if a removal was pending and has been cancelled.
pub fn cancel_deferred_removal(&self, peer: &PeerKey) -> bool {
    if self.pending_removals.remove(peer).is_none() {
        return false;
    }
    tracing::debug!(
        peer = %peer.0,
        "Cancelled deferred interest removal — peer reconnected"
    );
    true
}
/// Executes every deferred removal whose deadline has passed.
///
/// For each due peer, all of its interests are removed. Returns the number of peers processed.
///
/// The list of due peers is a snapshot, so each pending entry is removed with `remove_if`, which
/// re-checks the deadline while holding the entry. A peer whose removal was rescheduled with a
/// later deadline after the snapshot therefore keeps its new grace period instead of being
/// removed early.
pub fn execute_pending_removals(&self) -> usize {
    let now = self.time_source.now();
    let due_peers: Vec<PeerKey> = self
        .pending_removals
        .iter()
        .filter(|entry| now >= *entry.value())
        .map(|entry| entry.key().clone())
        .collect();

    let mut executed = 0;
    for peer in &due_peers {
        let still_due = self
            .pending_removals
            .remove_if(peer, |_, deadline| now >= *deadline)
            .is_some();
        if !still_due {
            // Cancelled or rescheduled since the snapshot was taken.
            continue;
        }
        let removed = self.remove_all_peer_interests(peer);
        tracing::info!(
            peer = %peer.0,
            removed_interests = removed,
            "Executed deferred interest removal — peer did not reconnect"
        );
        executed += 1;
    }
    executed
}
/// Ensures a [`LocalInterest`] entry exists for `contract` and indexes the contract's hash.
///
/// A newly created entry is the default one (not hosting, zero counts). Returns `self`.
pub fn register_local_interest(&self, contract: &ContractKey) -> &Self {
    // Only the insert-if-absent side effect is wanted; the guard is released immediately.
    drop(self.local_interests.entry(*contract).or_default());
    self.index_contract_hash(contract);
    self
}
/// Marks `contract` as hosted locally and indexes its hash.
///
/// Returns `true` if there was no local interest in the contract before this call.
pub fn register_local_hosting(&self, contract: &ContractKey) -> bool {
    let mut interest = self.local_interests.entry(*contract).or_default();
    // Turning hosting on always yields "interested", so a status change means "was not before".
    let newly_interested = interest.set_hosting(true);
    self.index_contract_hash(contract);
    newly_interested
}
/// Clears the hosting flag for `contract`.
///
/// Returns `true` if this left the contract without any local interest, in which case the entry
/// is removed and per-contract bookkeeping is cleaned up. Returns `false` if the contract had no
/// local-interest entry or is still interesting for another reason.
///
/// The entry is removed with `remove_if` so that interest added by another thread between
/// releasing the guard and removing the entry is not discarded (the same pattern this type uses
/// for its peer maps).
pub fn unregister_local_hosting(&self, contract: &ContractKey) -> bool {
    let lost_interest = match self.local_interests.get_mut(contract) {
        Some(mut entry) => {
            entry.hosting = false;
            !entry.is_interested()
        }
        None => return false,
    };
    if lost_interest {
        self.local_interests
            .remove_if(contract, |_, interest| !interest.is_interested());
        self.cleanup_contract_if_no_interest(contract);
    }
    lost_interest
}
/// Counts one more local client for `contract` and indexes the contract's hash.
///
/// Returns `true` if this client is what made the contract locally interesting.
pub fn add_local_client(&self, contract: &ContractKey) -> bool {
    let newly_interested = {
        let mut interest = self.local_interests.entry(*contract).or_default();
        let became = interest.add_client();
        // Index while the entry guard is still held, as the guard is released at block end.
        self.index_contract_hash(contract);
        became
    };
    newly_interested
}
/// Counts one local client fewer for `contract`.
///
/// Returns `true` if this left the contract without any local interest, in which case the entry
/// is removed and per-contract bookkeeping is cleaned up. Returns `false` if the contract had no
/// local-interest entry or is still interesting for another reason.
///
/// The entry is removed with `remove_if` so that interest added by another thread between
/// releasing the guard and removing the entry is not discarded.
pub fn remove_local_client(&self, contract: &ContractKey) -> bool {
    let lost_interest = match self.local_interests.get_mut(contract) {
        Some(mut entry) => entry.remove_client(),
        None => return false,
    };
    if lost_interest {
        self.local_interests
            .remove_if(contract, |_, interest| !interest.is_interested());
        self.cleanup_contract_if_no_interest(contract);
    }
    lost_interest
}
/// Counts one more downstream subscriber for `contract` and indexes the contract's hash.
///
/// Returns `true` if this subscriber is what made the contract locally interesting.
pub fn add_downstream_subscriber(&self, contract: &ContractKey) -> bool {
    let newly_interested = {
        let mut interest = self.local_interests.entry(*contract).or_default();
        let became = interest.add_downstream();
        // Index while the entry guard is still held, as the guard is released at block end.
        self.index_contract_hash(contract);
        became
    };
    newly_interested
}
/// Counts one downstream subscriber fewer for `contract`.
///
/// Returns `true` if this left the contract without any local interest, in which case the entry
/// is removed and per-contract bookkeeping is cleaned up. Returns `false` if the contract had no
/// local-interest entry or is still interesting for another reason.
///
/// The entry is removed with `remove_if` so that interest added by another thread between
/// releasing the guard and removing the entry is not discarded.
pub fn remove_downstream_subscriber(&self, contract: &ContractKey) -> bool {
    let lost_interest = match self.local_interests.get_mut(contract) {
        Some(mut entry) => entry.remove_downstream(),
        None => return false,
    };
    if lost_interest {
        self.local_interests
            .remove_if(contract, |_, interest| !interest.is_interested());
        self.cleanup_contract_if_no_interest(contract);
    }
    lost_interest
}
/// Runs `f` with mutable access to the [`LocalInterest`] of `contract` and returns its result.
///
/// A default entry is created first if the contract has none. The entry is left in place
/// afterwards even if it ends up without interest.
pub fn with_local_interest<F, R>(&self, contract: &ContractKey, f: F) -> R
where
    F: FnOnce(&mut LocalInterest) -> R,
{
    let mut slot = self.local_interests.entry(*contract).or_default();
    let interest: &mut LocalInterest = slot.value_mut();
    f(interest)
}
/// Returns `true` if `contract` has a local-interest entry that is currently interested.
pub fn has_local_interest(&self, contract: &ContractKey) -> bool {
    match self.local_interests.get(contract) {
        Some(interest) => interest.is_interested(),
        None => false,
    }
}
/// Removes the local-interest entry of `contract` if it no longer expresses any interest.
///
/// The check and the removal happen in one `remove_if` call, so an entry that gains interest
/// concurrently is never removed on the basis of a stale check.
pub fn cleanup_local_interest(&self, contract: &ContractKey) {
    self.local_interests
        .remove_if(contract, |_, interest| !interest.is_interested());
}
/// Removes peer interests that have not been refreshed within [`INTEREST_TTL`].
///
/// Returns the removed `(contract, peer)` pairs, ordered by contract-id bytes and, within a
/// contract, by peer public-key bytes.
///
/// Only the keys of expired peers are copied out of the map during the scan (the per-contract
/// peer maps are not cloned). Each candidate is re-checked just before removal, so a peer that
/// was refreshed or removed after the scan is neither dropped nor reported.
pub fn sweep_expired_interests(&self) -> Vec<(ContractKey, PeerKey)> {
    let now = self.time_source.now();

    // Pass 1: collect candidates without calling back into the maps while iterating.
    let mut candidates: Vec<(ContractKey, Vec<PeerKey>)> = self
        .interested_peers
        .iter()
        .filter_map(|entry| {
            let mut stale: Vec<PeerKey> = entry
                .value()
                .iter()
                .filter(|(_, interest)| interest.is_expired_at(now))
                .map(|(peer, _)| peer.clone())
                .collect();
            if stale.is_empty() {
                return None;
            }
            stale.sort_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()));
            Some((*entry.key(), stale))
        })
        .collect();
    candidates.sort_by(|(a, _), (b, _)| a.id().as_bytes().cmp(b.id().as_bytes()));

    // Pass 2: remove what is still expired.
    let mut expired = Vec::new();
    for (contract, peers) in candidates {
        for peer in peers {
            let still_expired = self
                .get_peer_interest(&contract, &peer)
                .map(|interest| interest.is_expired_at(now))
                .unwrap_or(false);
            if still_expired && self.remove_peer_interest(&contract, &peer) {
                expired.push((contract, peer));
            }
        }
    }

    if !expired.is_empty() {
        tracing::debug!(
            expired_count = expired.len(),
            "Interest sweep: removed expired entries"
        );
    }
    expired
}
/// Spawns the background sweep loop for `manager` on the global executor.
pub fn start_sweep_task(manager: std::sync::Arc<Self>)
where
    T: Send + Sync + 'static,
{
    let sweep_loop = Self::sweep_task(manager);
    GlobalExecutor::spawn(sweep_loop);
}
/// Background loop: on every sweep interval, runs due deferred removals, expires stale peer
/// interests, and emits telemetry. Never returns.
async fn sweep_task(manager: std::sync::Arc<Self>)
where
T: Send + Sync + 'static,
{
// Random 10–30 s delay before the first sweep.
// NOTE(review): presumably to stagger sweeps across nodes/tasks — confirm intent.
let initial_delay = Duration::from_secs(GlobalRng::random_range(10u64..=30u64));
tokio::time::sleep(initial_delay).await;
let mut interval = tokio::time::interval(INTEREST_SWEEP_INTERVAL);
// A tokio interval's first tick completes immediately; consume it so the loop below waits a
// full period before the first sweep.
interval.tick().await;
loop {
interval.tick().await;
manager.execute_pending_removals();
// NOTE(review): stats are captured after deferred removals but BEFORE the expiry sweep, so the
// snapshot below reports pre-sweep totals alongside `expired_this_sweep` — confirm intended.
let stats = manager.stats();
let expired = manager.sweep_expired_interests();
if !expired.is_empty() {
tracing::info!(
expired_count = expired.len(),
"Interest sweep: cleaned up expired peer interests"
);
// One telemetry event per expired (contract, peer) pair.
for (contract, peer) in &expired {
crate::tracing::telemetry::send_standalone_event(
"interest_expired",
serde_json::json!({
"contract": contract.to_string(),
"peer": peer.0.to_string(),
}),
);
}
}
// Health snapshot emitted on every sweep, whether or not anything expired.
crate::tracing::telemetry::send_standalone_event(
"subscription_health_snapshot",
serde_json::json!({
"contracts_with_interests": stats.total_contracts,
"total_interest_entries": stats.total_peer_interests,
"expired_this_sweep": expired.len(),
}),
);
}
}
/// Adds `contract` to the bucket of its 32-bit hash, unless it is already listed there.
fn index_contract_hash(&self, contract: &ContractKey) {
    let mut bucket = self
        .contract_hash_index
        .entry(contract_hash(contract))
        .or_default();
    let already_indexed = bucket.iter().any(|known| known == contract);
    if !already_indexed {
        bucket.push(*contract);
    }
}
/// Removes `contract` from the bucket of its 32-bit hash and drops the bucket if it is empty.
///
/// The bucket is dropped with `remove_if`, re-checking emptiness while holding the entry, so a
/// contract indexed into the same bucket by another thread after the guard was released is not
/// lost.
fn unindex_contract_hash(&self, contract: &ContractKey) {
    let hash = contract_hash(contract);
    let bucket_emptied = match self.contract_hash_index.get_mut(&hash) {
        Some(mut bucket) => {
            bucket.retain(|c| c != contract);
            bucket.is_empty()
        }
        None => false,
    };
    if bucket_emptied {
        self.contract_hash_index
            .remove_if(&hash, |_, bucket| bucket.is_empty());
    }
}
/// Drops per-contract bookkeeping once neither peers nor local sources are interested.
///
/// When `contract` has no peer map entry and no active local interest, it is removed from the
/// hash index and its notification timestamp is forgotten.
fn cleanup_contract_if_no_interest(&self, contract: &ContractKey) {
    let still_wanted =
        self.interested_peers.contains_key(contract) || self.has_local_interest(contract);
    if still_wanted {
        return;
    }
    self.unindex_contract_hash(contract);
    self.summary_notify_timestamps.remove(contract);
}
/// Returns the contracts indexed under the 32-bit `hash` (empty if there are none).
pub fn lookup_by_hash(&self, hash: u32) -> Vec<ContractKey> {
    match self.contract_hash_index.get(&hash) {
        Some(bucket) => bucket.value().clone(),
        None => Vec::new(),
    }
}
/// Returns every 32-bit hash present in the contract hash index, in ascending order.
pub fn get_all_interest_hashes(&self) -> Vec<u32> {
    let mut hashes: Vec<u32> = Vec::new();
    for bucket in self.contract_hash_index.iter() {
        hashes.push(*bucket.key());
    }
    hashes.sort_unstable();
    hashes
}
pub fn get_matching_contracts(&self, hashes: &[u32]) -> Vec<ContractKey> {
let hash_set: std::collections::HashSet<u32> = hashes.iter().copied().collect();
let mut contracts: Vec<ContractKey> = self
.contract_hash_index
.iter()
.filter(|entry| hash_set.contains(entry.key()))
.flat_map(|entry| entry.value().clone())
.collect();
contracts.sort_by(|a, b| a.id().as_bytes().cmp(b.id().as_bytes()));
contracts
}
/// Stores `delta` in the LRU cache under (`contract`, hash of `peer_summary`, hash of
/// `our_summary`), replacing any entry already stored under that key.
pub fn cache_delta(
    &self,
    contract: &ContractKey,
    peer_summary: &[u8],
    our_summary: &[u8],
    delta: StateDelta<'static>,
) {
    let peer_summary_hash = hash_bytes(peer_summary);
    let our_summary_hash = hash_bytes(our_summary);
    let key = DeltaCacheKey {
        contract: *contract,
        peer_summary_hash,
        our_summary_hash,
    };
    let mut cache = self.delta_cache.lock();
    cache.put(key, delta);
}
/// Looks up a cached delta for (`contract`, hash of `peer_summary`, hash of `our_summary`).
///
/// Returns a copy of the cached delta, or `None` on a miss. The lookup goes through the LRU
/// cache's `get`, which is why the cache lock is taken even for reads.
pub fn get_cached_delta(
    &self,
    contract: &ContractKey,
    peer_summary: &[u8],
    our_summary: &[u8],
) -> Option<StateDelta<'static>> {
    let peer_summary_hash = hash_bytes(peer_summary);
    let our_summary_hash = hash_bytes(our_summary);
    let key = DeltaCacheKey {
        contract: *contract,
        peer_summary_hash,
        our_summary_hash,
    };
    let mut cache = self.delta_cache.lock();
    let hit = cache.get(&key).cloned();
    hit
}
/// Asks the contract handler for the current state summary of `key`.
///
/// The query is bounded by `BROADCAST_CH_TIMEOUT`. Returns the summary on success; every failure
/// (handler error, summary error, unexpected response type) is logged and mapped to `None`.
pub async fn get_contract_summary(
    &self,
    op_manager: &crate::node::OpManager,
    key: &ContractKey,
) -> Option<StateSummary<'static>> {
    use crate::contract::ContractHandlerEvent;

    let query = ContractHandlerEvent::GetSummaryQuery { key: *key };
    let response = op_manager
        .notify_contract_handler_with_timeout(query, BROADCAST_CH_TIMEOUT)
        .await;

    match response {
        Ok(ContractHandlerEvent::GetSummaryResponse { summary, .. }) => match summary {
            Ok(s) => Some(s),
            Err(e) => {
                tracing::debug!(
                    contract = %key,
                    error = %e,
                    "Failed to get contract summary"
                );
                None
            }
        },
        Ok(other) => {
            tracing::warn!(
                contract = %key,
                response = ?other,
                "Unexpected response to GetSummaryQuery"
            );
            None
        }
        Err(e) => {
            tracing::debug!(
                contract = %key,
                error = %e,
                "Error getting contract summary"
            );
            None
        }
    }
}
/// Produces the delta that brings a peer at `their_summary` up to our state.
///
/// Order of operations:
/// 1. Consult the delta cache (keyed by contract and both summaries).
/// 2. On a miss, refuse with `Err` if a delta is not efficient for these sizes.
/// 3. Otherwise query the contract handler (bounded by `BROADCAST_CH_TIMEOUT`) and cache the
///    answer, including empty deltas.
///
/// Returns `Ok(None)` when the delta is empty (no change), `Ok(Some(delta))` otherwise, and
/// `Err` with a description when the delta is inefficient or could not be computed.
pub async fn compute_delta(
    &self,
    op_manager: &crate::node::OpManager,
    key: &ContractKey,
    their_summary: &StateSummary<'static>,
    our_summary: &StateSummary<'static>,
    our_state_size: usize,
) -> Result<Option<StateDelta<'static>>, String> {
    use crate::contract::ContractHandlerEvent;

    let theirs: &[u8] = their_summary.as_ref();
    let ours: &[u8] = our_summary.as_ref();

    if let Some(cached) = self.get_cached_delta(key, theirs, ours) {
        return if cached.as_ref().is_empty() {
            tracing::trace!(contract = %key, "Cached empty delta (no change)");
            Ok(None)
        } else {
            tracing::trace!(contract = %key, "Using cached delta");
            Ok(Some(cached))
        };
    }

    if !is_delta_efficient(theirs.len(), our_state_size) {
        return Err("Delta not efficient for this contract".to_string());
    }

    let query = ContractHandlerEvent::GetDeltaQuery {
        key: *key,
        their_summary: their_summary.clone(),
    };
    let response = op_manager
        .notify_contract_handler_with_timeout(query, BROADCAST_CH_TIMEOUT)
        .await;

    match response {
        Ok(ContractHandlerEvent::GetDeltaResponse { delta, .. }) => match delta {
            Ok(d) => {
                let unchanged = d.as_ref().is_empty();
                // Empty deltas are cached too, so "no change" is answered from the cache next time.
                self.cache_delta(key, theirs, ours, d.clone());
                if unchanged {
                    tracing::trace!(
                        contract = %key,
                        "Contract returned empty delta (no change)"
                    );
                    Ok(None)
                } else {
                    Ok(Some(d))
                }
            }
            Err(e) => Err(format!("Delta computation failed: {}", e)),
        },
        Ok(other) => Err(format!("Unexpected response to GetDeltaQuery: {:?}", other)),
        Err(e) => Err(format!("Error computing delta: {}", e)),
    }
}
pub fn stats(&self) -> InterestManagerStats {
let total_contracts = self.interested_peers.len();
let total_peer_interests: usize = self
.interested_peers
.iter()
.map(|entry| entry.value().len())
.sum();
let local_interests = self.local_interests.len();
let hash_index_size = self.contract_hash_index.len();
InterestManagerStats {
total_contracts,
total_peer_interests,
local_interests,
hash_index_size,
delta_sends: self.delta_sends.load(Ordering::Relaxed),
full_state_sends: self.full_state_sends.load(Ordering::Relaxed),
delta_bytes_saved: self.delta_bytes_saved.load(Ordering::Relaxed),
resync_requests_received: self.resync_requests_received.load(Ordering::Relaxed),
}
}
}
/// Point-in-time snapshot of an [`InterestManager`]'s sizes and counters.
#[derive(Debug, Clone)]
pub struct InterestManagerStats {
/// Number of contracts that have at least a peer-interest map entry.
pub total_contracts: usize,
/// Total number of (contract, peer) interest entries.
pub total_peer_interests: usize,
/// Number of contracts with a local-interest entry (interested or not).
pub local_interests: usize,
/// Number of distinct 32-bit hashes in the contract hash index.
pub hash_index_size: usize,
/// Delta sends recorded so far.
pub delta_sends: u64,
/// Full-state sends recorded so far.
pub full_state_sends: u64,
/// Bytes saved by delta sends, as accumulated by `record_delta_send`.
pub delta_bytes_saved: u64,
/// Resync requests recorded as received so far.
pub resync_requests_received: u64,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::util::time_source::SharedMockTimeSource;
use freenet_stdlib::prelude::{CodeHash, ContractInstanceId};
/// Manager driven by a mock clock so tests can advance time explicitly.
type TestInterestManager = InterestManager<SharedMockTimeSource>;
/// Builds a deterministic contract key: the instance id is 32 bytes of `seed`, the code hash is
/// 32 bytes of `seed + 1` (wrapping).
fn make_contract_key(seed: u8) -> ContractKey {
    let instance_id = ContractInstanceId::new([seed; 32]);
    let code_hash = CodeHash::new([seed.wrapping_add(1); 32]);
    ContractKey::from_id_and_code(instance_id, code_hash)
}
/// Builds a peer key from a newly created transport keypair.
///
/// `_seed` is accepted for call-site readability only; it does not influence the key.
fn make_peer_key(_seed: u8) -> PeerKey {
    use crate::transport::TransportKeypair;

    let fresh_keypair = TransportKeypair::new();
    let public_key = fresh_keypair.public().clone();
    PeerKey(public_key)
}
/// Creates a manager together with a handle to the mock clock it reads from.
fn make_manager() -> (TestInterestManager, SharedMockTimeSource) {
    let clock = SharedMockTimeSource::new();
    (InterestManager::new(clock.clone()), clock)
}
/// Registering a peer reports "new" only the first time; removing reports success only once.
#[test]
fn test_register_and_remove_peer_interest() {
let (manager, _time) = make_manager();
let contract = make_contract_key(1);
let peer = make_peer_key(1);
// First registration is new, the second one replaces the existing entry.
assert!(manager.register_peer_interest(&contract, peer.clone(), None, false));
assert!(!manager.register_peer_interest(&contract, peer.clone(), None, false));
assert!(manager.get_peer_interest(&contract, &peer).is_some());
assert!(manager.remove_peer_interest(&contract, &peer));
assert!(manager.get_peer_interest(&contract, &peer).is_none());
// Removing again finds nothing.
assert!(!manager.remove_peer_interest(&contract, &peer));
}
/// A peer registered without a summary has none until `update_peer_summary` stores one.
#[test]
fn test_update_peer_summary() {
let (manager, _time) = make_manager();
let contract = make_contract_key(1);
let peer = make_peer_key(1);
manager.register_peer_interest(&contract, peer.clone(), None, false);
assert!(manager.get_peer_summary(&contract, &peer).is_none());
let summary = StateSummary::from(vec![1, 2, 3]);
manager.update_peer_summary(&contract, &peer, Some(summary.clone()));
let retrieved = manager.get_peer_summary(&contract, &peer);
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().as_ref(), summary.as_ref());
}
/// Local interest stays on while either hosting or a client remains, and turns off only when
/// both are gone.
#[test]
fn test_local_interest_tracking() {
let (manager, _time) = make_manager();
let contract = make_contract_key(1);
assert!(!manager.has_local_interest(&contract));
// Hosting alone is enough.
manager.with_local_interest(&contract, |interest| {
interest.set_hosting(true);
});
assert!(manager.has_local_interest(&contract));
manager.with_local_interest(&contract, |interest| {
interest.add_client();
});
assert!(manager.has_local_interest(&contract));
// Hosting off, but the client keeps the interest alive.
manager.with_local_interest(&contract, |interest| {
interest.set_hosting(false);
});
assert!(manager.has_local_interest(&contract));
// Last source removed.
manager.with_local_interest(&contract, |interest| {
interest.remove_client();
});
assert!(!manager.has_local_interest(&contract));
}
/// `add_client` / `remove_client` report a transition only on the 0→1 and 1→0 edges.
#[test]
fn test_local_interest_transitions() {
    let mut interest = LocalInterest::default();
    assert!(!interest.is_interested());

    // 0 -> 1 clients: became interested.
    assert!(interest.add_client());
    assert!(interest.is_interested());

    // 1 -> 2 clients: no transition.
    assert!(!interest.add_client());
    assert!(interest.is_interested());

    // 2 -> 1 clients: still interested.
    assert!(!interest.remove_client());
    assert!(interest.is_interested());

    // 1 -> 0 clients: lost interest.
    assert!(interest.remove_client());
    assert!(!interest.is_interested());
}
/// `contract_hash` is repeatable for one key and differs between the two keys used here.
#[test]
fn test_contract_hash_consistency() {
let contract = make_contract_key(42);
let hash1 = contract_hash(&contract);
let hash2 = contract_hash(&contract);
assert_eq!(hash1, hash2);
let other_contract = make_contract_key(43);
let other_hash = contract_hash(&other_contract);
assert_ne!(hash1, other_hash);
}
/// Registering a peer interest makes the contract discoverable through its 32-bit hash.
#[test]
fn test_contract_hash_index() {
let (manager, _time) = make_manager();
let contract = make_contract_key(1);
let peer = make_peer_key(1);
manager.register_peer_interest(&contract, peer, None, false);
let hash = contract_hash(&contract);
let retrieved = manager.lookup_by_hash(hash);
assert_eq!(retrieved, vec![contract]);
// A hash nobody registered yields nothing.
assert!(manager.lookup_by_hash(12345).is_empty());
}
/// Both peer interest and local hosting contribute hashes to `get_all_interest_hashes`.
#[test]
fn test_get_all_interest_hashes() {
let (manager, _time) = make_manager();
let contract1 = make_contract_key(1);
let contract2 = make_contract_key(2);
let peer = make_peer_key(1);
manager.register_peer_interest(&contract1, peer.clone(), None, false);
manager.register_local_hosting(&contract2);
let hashes = manager.get_all_interest_hashes();
assert_eq!(hashes.len(), 2);
assert!(hashes.contains(&contract_hash(&contract1)));
assert!(hashes.contains(&contract_hash(&contract2)));
}
/// A delta is efficient only when the summary is strictly under half the state size, and never
/// for an empty state.
#[test]
fn test_delta_efficiency_check() {
    // (summary_size, state_size, expected)
    let cases = [
        (100usize, 1000usize, true),
        (500, 1000, false),
        (1500, 1000, false),
        (100, 0, false),
    ];
    for &(summary_size, state_size, expected) in cases.iter() {
        assert_eq!(is_delta_efficient(summary_size, state_size), expected);
    }
}
/// A cached delta is returned for the same (contract, summaries) key and not for another contract.
#[test]
fn test_delta_cache() {
let (manager, _time) = make_manager();
let contract1 = make_contract_key(1);
let contract2 = make_contract_key(2);
let peer_summary = vec![1, 2, 3];
let our_summary = vec![4, 5, 6];
let delta = StateDelta::from(vec![7, 8, 9]);
// Miss before anything is cached.
assert!(
manager
.get_cached_delta(&contract1, &peer_summary, &our_summary)
.is_none()
);
manager.cache_delta(&contract1, &peer_summary, &our_summary, delta.clone());
let cached = manager.get_cached_delta(&contract1, &peer_summary, &our_summary);
assert!(cached.is_some());
assert_eq!(cached.unwrap().as_ref(), delta.as_ref());
// Same summaries, different contract: still a miss.
assert!(
manager
.get_cached_delta(&contract2, &peer_summary, &our_summary)
.is_none()
);
}
/// An interest older than `INTEREST_TTL` is removed and reported by the sweep.
#[test]
fn test_sweep_expired_interests() {
let (manager, time) = make_manager();
let contract = make_contract_key(1);
let peer = make_peer_key(1);
manager.register_peer_interest(&contract, peer.clone(), None, false);
// Move the mock clock just past the TTL.
time.advance_time(INTEREST_TTL + Duration::from_secs(1));
let expired = manager.sweep_expired_interests();
assert_eq!(expired.len(), 1);
assert_eq!(expired[0].0, contract);
assert!(manager.get_peer_interest(&contract, &peer).is_none());
}
/// Refreshing shortly before the TTL elapses restarts the expiry window.
#[test]
fn test_refresh_prevents_expiration() {
let (manager, time) = make_manager();
let contract = make_contract_key(1);
let peer = make_peer_key(1);
manager.register_peer_interest(&contract, peer.clone(), None, false);
time.advance_time(INTEREST_TTL - Duration::from_secs(10));
manager.refresh_peer_interest(&contract, &peer);
// Total elapsed time now exceeds the TTL, but only 20 s have passed since the refresh.
time.advance_time(Duration::from_secs(20));
let expired = manager.sweep_expired_interests();
assert!(expired.is_empty());
assert!(manager.get_peer_interest(&contract, &peer).is_some());
}
/// `stats` counts contracts, (contract, peer) entries, local entries and hash-index buckets.
#[test]
fn test_stats() {
let (manager, _time) = make_manager();
let contract1 = make_contract_key(1);
let contract2 = make_contract_key(2);
let peer1 = make_peer_key(1);
let peer2 = make_peer_key(2);
// Two peers on contract1, one on contract2: three entries over two contracts.
manager.register_peer_interest(&contract1, peer1.clone(), None, false);
manager.register_peer_interest(&contract1, peer2.clone(), None, false);
manager.register_peer_interest(&contract2, peer1, None, true);
manager.with_local_interest(&contract1, |i| i.set_hosting(true));
let stats = manager.stats();
assert_eq!(stats.total_contracts, 2);
assert_eq!(stats.total_peer_interests, 3);
assert_eq!(stats.local_interests, 1);
assert!(stats.hash_index_size >= 2);
}
/// Send counters start at zero and accumulate; bytes saved is the sum of state − delta sizes.
#[test]
fn test_delta_sync_metrics() {
let (manager, _time) = make_manager();
let stats = manager.stats();
assert_eq!(stats.delta_sends, 0);
assert_eq!(stats.full_state_sends, 0);
assert_eq!(stats.delta_bytes_saved, 0);
manager.record_delta_send(1000, 100);
manager.record_delta_send(2000, 200);
manager.record_full_state_send();
manager.record_full_state_send();
let stats = manager.stats();
assert_eq!(stats.delta_sends, 2);
assert_eq!(stats.full_state_sends, 2);
// (1000 - 100) + (2000 - 200)
assert_eq!(stats.delta_bytes_saved, 2700);
}
/// `get_matching_contracts` returns exactly the indexed contracts whose hash was asked for.
#[test]
fn test_get_matching_contracts() {
let (manager, _time) = make_manager();
let contract1 = make_contract_key(1);
let contract2 = make_contract_key(2);
let contract3 = make_contract_key(3);
// Only contract1 and contract2 are registered; contract3 is never indexed.
manager.register_local_hosting(&contract1);
manager.register_local_hosting(&contract2);
let hash1 = contract_hash(&contract1);
let hash2 = contract_hash(&contract2);
let hash3 = contract_hash(&contract3);
let matching = manager.get_matching_contracts(&[hash1, hash3]);
assert_eq!(matching.len(), 1);
assert!(matching.contains(&contract1));
let matching = manager.get_matching_contracts(&[hash1, hash2]);
assert_eq!(matching.len(), 2);
assert!(matching.contains(&contract1));
assert!(matching.contains(&contract2));
let matching = manager.get_matching_contracts(&[hash3, 99999]);
assert!(matching.is_empty());
let matching = manager.get_matching_contracts(&[]);
assert!(matching.is_empty());
}
/// Simulates two managers exchanging interest hashes, registering each other for the shared
/// contract, and recording each other's summaries.
#[test]
fn test_interest_sync_flow_simulation() {
let (manager_a, _time_a) = make_manager();
let (manager_b, _time_b) = make_manager();
let contract1 = make_contract_key(1);
let contract2 = make_contract_key(2);
let contract3 = make_contract_key(3);
let peer_a = make_peer_key(1);
let peer_b = make_peer_key(2);
let summary1 = StateSummary::from(vec![1, 1, 1]);
let summary2 = StateSummary::from(vec![2, 2, 2]);
// A hosts contracts 1 and 2; B hosts contracts 2 and 3. Only contract2 is shared.
manager_a.register_local_hosting(&contract1);
manager_a.register_local_hosting(&contract2);
manager_b.register_local_hosting(&contract2);
manager_b.register_local_hosting(&contract3);
// A's hashes matched against B's index yield the shared contract.
let a_hashes = manager_a.get_all_interest_hashes();
assert_eq!(a_hashes.len(), 2);
let matching = manager_b.get_matching_contracts(&a_hashes);
assert_eq!(matching.len(), 1);
assert!(matching.contains(&contract2));
for contract in &matching {
manager_b.register_peer_interest(contract, peer_a.clone(), None, false);
}
assert!(
manager_b
.get_interested_peers(&contract2)
.iter()
.any(|(pk, _)| pk == &peer_a)
);
// A registers B together with B's summary.
manager_a.register_peer_interest(&contract2, peer_b.clone(), Some(summary2.clone()), false);
let cached_summary = manager_a.get_peer_summary(&contract2, &peer_b);
assert!(cached_summary.is_some());
assert_eq!(cached_summary.unwrap().as_ref(), summary2.as_ref());
// B later records a summary for A.
manager_b.update_peer_summary(&contract2, &peer_a, Some(summary1.clone()));
let cached_summary = manager_b.get_peer_summary(&contract2, &peer_a);
assert!(cached_summary.is_some());
assert_eq!(cached_summary.unwrap().as_ref(), summary1.as_ref());
}
/// Simulates a peer announcing added and then removed interest hashes: the peer is registered
/// only for contracts we are locally interested in, and unregistered when the hash is withdrawn.
#[test]
fn test_change_interests_flow_simulation() {
let (manager, _time) = make_manager();
let contract1 = make_contract_key(1);
let contract2 = make_contract_key(2);
let peer = make_peer_key(1);
let hash1 = contract_hash(&contract1);
let hash2 = contract_hash(&contract2);
// Only contract1 is of local interest (and therefore indexed).
manager.register_local_hosting(&contract1);
let added_hashes = vec![hash1, hash2];
for hash in &added_hashes {
for contract in manager.lookup_by_hash(*hash) {
if manager.has_local_interest(&contract) {
manager.register_peer_interest(&contract, peer.clone(), None, false);
}
}
}
assert!(
manager
.get_interested_peers(&contract1)
.iter()
.any(|(pk, _)| pk == &peer)
);
// contract2 was never indexed, so the peer is not registered for it.
assert!(
!manager
.get_interested_peers(&contract2)
.iter()
.any(|(pk, _)| pk == &peer)
);
let removed_hashes = vec![hash1];
for hash in &removed_hashes {
for contract in manager.lookup_by_hash(*hash) {
manager.remove_peer_interest(&contract, &peer);
}
}
assert!(
!manager
.get_interested_peers(&contract1)
.iter()
.any(|(pk, _)| pk == &peer)
);
}
/// Updating a peer's summary to `None` clears it without removing the peer's interest.
#[test]
fn test_resync_clears_summary() {
let (manager, _time) = make_manager();
let contract = make_contract_key(1);
let peer = make_peer_key(1);
let summary = StateSummary::from(vec![1, 2, 3]);
manager.register_peer_interest(&contract, peer.clone(), Some(summary.clone()), false);
let cached = manager.get_peer_summary(&contract, &peer);
assert!(cached.is_some());
manager.update_peer_summary(&contract, &peer, None);
let cached = manager.get_peer_summary(&contract, &peer);
assert!(cached.is_none());
// The peer is still listed as interested.
assert!(
manager
.get_interested_peers(&contract)
.iter()
.any(|(pk, _)| pk == &peer)
);
}
/// Simulates a resync between two managers: B clears A's summary, A stores B's summary, and both
/// keep each other registered as interested peers.
#[test]
fn test_resync_full_flow() {
    let (manager_a, _time_a) = make_manager();
    let (manager_b, _time_b) = make_manager();
    let contract = make_contract_key(1);
    let peer_a = make_peer_key(1);
    let peer_b = make_peer_key(2);
    let old_summary = StateSummary::from(vec![1, 2, 3]);
    let new_summary = StateSummary::from(vec![4, 5, 6]);

    // Both sides host the contract and know about each other.
    manager_a.register_local_hosting(&contract);
    manager_b.register_local_hosting(&contract);
    manager_a.register_peer_interest(
        &contract,
        peer_b.clone(),
        Some(new_summary.clone()),
        false,
    );
    manager_b.register_peer_interest(
        &contract,
        peer_a.clone(),
        Some(old_summary.clone()),
        false,
    );

    // B drops what it believed about A's state.
    manager_b.update_peer_summary(&contract, &peer_a, None);
    let cached = manager_b.get_peer_summary(&contract, &peer_a);
    assert!(cached.is_none(), "B should have cleared A's summary");

    // A records B's summary.
    manager_a.update_peer_summary(&contract, &peer_b, Some(new_summary.clone()));
    let cached = manager_a.get_peer_summary(&contract, &peer_b);
    assert!(cached.is_some(), "A should have B's summary");
    assert_eq!(
        cached.unwrap().as_ref(),
        new_summary.as_ref(),
        "A should have B's correct summary"
    );

    // Neither side lost the other's interest registration.
    assert!(
        manager_a
            .get_interested_peers(&contract)
            .iter()
            .any(|(pk, _)| pk == &peer_b)
    );
    assert!(
        manager_b
            .get_interested_peers(&contract)
            .iter()
            .any(|(pk, _)| pk == &peer_a)
    );
}
/// Checks the inputs to the delta-versus-full-state decision: whether a peer
/// has a cached summary, and what `is_delta_efficient` answers for summary
/// sizes of 100, 600, 500 and 499 bytes against a state size of 1000.
#[test]
fn test_delta_vs_full_state_decision() {
    let (manager, _time) = make_manager();
    let contract = make_contract_key(1);
    let peer_with_summary = make_peer_key(1);
    let peer_without_summary = make_peer_key(2);
    manager.register_local_hosting(&contract);

    let state_len = 1000;
    let small_summary = StateSummary::from(vec![1; 100]);
    let large_summary = StateSummary::from(vec![1; 600]);

    // One peer is registered with a summary, the other without.
    manager.register_peer_interest(
        &contract,
        peer_with_summary.clone(),
        Some(small_summary.clone()),
        false,
    );
    manager.register_peer_interest(&contract, peer_without_summary.clone(), None, false);

    // A cached, small summary is reported as delta-efficient.
    let cached = manager.get_peer_summary(&contract, &peer_with_summary);
    assert!(cached.is_some(), "peer should have summary");
    let cached_len = cached.unwrap().as_ref().len();
    assert!(
        is_delta_efficient(cached_len, state_len),
        "small summary should be efficient for delta"
    );

    // No cached summary at all for the second peer.
    let missing = manager.get_peer_summary(&contract, &peer_without_summary);
    assert!(
        missing.is_none(),
        "peer without summary should trigger full state"
    );

    // 600 of 1000 bytes is above the threshold.
    let large_len = large_summary.as_ref().len();
    assert!(
        !is_delta_efficient(large_len, state_len),
        "large summary (>50% of state) should not be efficient for delta"
    );

    // Boundary: exactly half is rejected ...
    let at_half = StateSummary::from(vec![1; 500]);
    assert!(
        !is_delta_efficient(at_half.as_ref().len(), state_len),
        "summary at exactly 50% boundary should not be efficient"
    );

    // ... while one byte below half is accepted.
    let below_half = StateSummary::from(vec![1; 499]);
    assert!(
        is_delta_efficient(below_half.as_ref().len(), state_len),
        "summary just under 50% should be efficient"
    );
}
/// Splits the interested peers of a contract into those with a cached summary
/// and those without, and checks that each peer lands in the expected group.
#[test]
fn test_broadcast_peer_selection() {
    let (manager, _time) = make_manager();
    let contract = make_contract_key(1);
    let peer1 = make_peer_key(1);
    let peer2 = make_peer_key(2);
    let peer3 = make_peer_key(3);
    let summary1 = StateSummary::from(vec![1, 2, 3]);
    let summary3 = StateSummary::from(vec![3, 2, 1]);

    manager.register_local_hosting(&contract);
    // Peers 1 and 3 come with a summary; peer 2 does not.
    manager.register_peer_interest(&contract, peer1.clone(), Some(summary1.clone()), false);
    manager.register_peer_interest(&contract, peer2.clone(), None, false);
    manager.register_peer_interest(&contract, peer3.clone(), Some(summary3.clone()), false);

    let interested = manager.get_interested_peers(&contract);
    assert_eq!(interested.len(), 3);

    // Partition on "has a cached summary": first bucket has one, second does not.
    let (delta_peers, full_state_peers): (Vec<_>, Vec<_>) = interested
        .iter()
        .map(|(peer_key, _interest)| peer_key.clone())
        .partition(|peer_key| manager.get_peer_summary(&contract, peer_key).is_some());

    assert_eq!(delta_peers.len(), 2);
    assert!(delta_peers.contains(&peer1));
    assert!(delta_peers.contains(&peer3));
    assert_eq!(full_state_peers.len(), 1);
    assert!(full_state_peers.contains(&peer2));
}
/// `get_contracts_for_peer` must list exactly the contracts a peer is
/// registered for, before registration, after it, and after one removal.
#[test]
fn test_get_contracts_for_peer() {
    let (manager, _time) = make_manager();
    let contract1 = make_contract_key(1);
    let contract2 = make_contract_key(2);
    let contract3 = make_contract_key(3);
    let peer = make_peer_key(1);

    // Nothing registered yet.
    let initially = manager.get_contracts_for_peer(&peer);
    assert!(initially.is_empty());

    // Register the peer for the first two contracts only.
    for contract in [&contract1, &contract2] {
        manager.register_peer_interest(contract, peer.clone(), None, false);
    }
    let after_register = manager.get_contracts_for_peer(&peer);
    assert_eq!(after_register.len(), 2);
    assert!(after_register.contains(&contract1));
    assert!(after_register.contains(&contract2));
    assert!(!after_register.contains(&contract3));

    // Dropping contract1 leaves only contract2.
    manager.remove_peer_interest(&contract1, &peer);
    let after_remove = manager.get_contracts_for_peer(&peer);
    assert_eq!(after_remove.len(), 1);
    assert!(after_remove.contains(&contract2));
}
/// Replays a full-replace interest sync by hand: the peer's current
/// interests (contract1, contract2) are reconciled against an incoming hash
/// set (contract2, contract3). Interests missing from the incoming set are
/// removed, existing ones are refreshed, and new ones are registered.
#[test]
fn test_full_replace_interest_sync() {
    let (manager, time) = make_manager();
    let contract1 = make_contract_key(1);
    let contract2 = make_contract_key(2);
    let contract3 = make_contract_key(3);
    let peer = make_peer_key(1);

    // All three contracts are hosted locally; the peer starts with 1 and 2.
    manager.register_local_hosting(&contract1);
    manager.register_local_hosting(&contract2);
    manager.register_local_hosting(&contract3);
    manager.register_peer_interest(&contract1, peer.clone(), None, false);
    manager.register_peer_interest(&contract2, peer.clone(), None, false);

    // Let some time pass so the later refresh has something to reset.
    time.advance_time(Duration::from_secs(60));

    // The incoming sync message advertises contracts 2 and 3.
    let incoming_hashes: HashSet<u32> = [contract_hash(&contract2), contract_hash(&contract3)]
        .into_iter()
        .collect();

    // Step 1: drop every current interest that the incoming set no longer names.
    let current_contracts = manager.get_contracts_for_peer(&peer);
    assert_eq!(current_contracts.len(), 2);
    for contract in &current_contracts {
        let h = contract_hash(contract);
        if !incoming_hashes.contains(&h) {
            manager.remove_peer_interest(contract, &peer);
        }
    }

    // Step 2: refresh interests that already exist, register the rest.
    let matching =
        manager.get_matching_contracts(&incoming_hashes.iter().copied().collect::<Vec<_>>());
    for contract in &matching {
        if manager.get_peer_interest(contract, &peer).is_some() {
            manager.refresh_peer_interest(contract, &peer);
        } else {
            manager.register_peer_interest(contract, peer.clone(), None, false);
        }
    }

    assert!(
        manager.get_peer_interest(&contract1, &peer).is_none(),
        "contract1 should have been removed"
    );
    assert!(
        manager.get_peer_interest(&contract2, &peer).is_some(),
        "contract2 should still exist (refreshed)"
    );
    assert!(
        manager.get_peer_interest(&contract3, &peer).is_some(),
        "contract3 should have been added"
    );

    // The refreshed entry must not read as expired at the current mock time.
    let interest2 = manager.get_peer_interest(&contract2, &peer).unwrap();
    assert!(
        !interest2.is_expired_at(time.now()),
        "contract2 interest should not be expired after refresh"
    );
}
/// Refreshing a peer's interest must leave its cached summary untouched
/// and keep the interest unexpired at the current mock time.
#[test]
fn test_refresh_preserves_summary() {
    let (manager, time) = make_manager();
    let contract = make_contract_key(1);
    let peer = make_peer_key(1);
    let expected = StateSummary::from(vec![1, 2, 3]);

    manager.register_peer_interest(&contract, peer.clone(), Some(expected.clone()), false);

    // Advance the clock by a minute, then refresh.
    time.advance_time(Duration::from_secs(60));
    manager.refresh_peer_interest(&contract, &peer);

    // The summary that was registered is still the one returned.
    let after_refresh = manager.get_peer_summary(&contract, &peer);
    assert!(
        after_refresh.is_some(),
        "summary should be preserved after refresh"
    );
    assert_eq!(after_refresh.unwrap().as_ref(), expected.as_ref());

    let refreshed = manager.get_peer_interest(&contract, &peer).unwrap();
    let expired = refreshed.is_expired_at(time.now());
    assert!(!expired, "interest should not be expired after refresh");
}
/// The `is_upstream` flag passed at registration must be readable both via
/// `get_peer_interest` and in the entries returned by `get_interested_peers`.
#[test]
fn test_is_upstream_flag_registration() {
    let (manager, _time) = make_manager();
    let contract = make_contract_key(1);
    let upstream_peer = make_peer_key(1);
    let downstream_peer = make_peer_key(2);

    // Register the upstream peer first, then the downstream one.
    for (peer, flag) in [(&upstream_peer, true), (&downstream_peer, false)] {
        manager.register_peer_interest(&contract, peer.clone(), None, flag);
    }

    // Per-peer lookup.
    let up = manager
        .get_peer_interest(&contract, &upstream_peer)
        .unwrap();
    assert!(
        up.is_upstream,
        "Peer registered with is_upstream=true should have is_upstream=true"
    );
    let down = manager
        .get_peer_interest(&contract, &downstream_peer)
        .unwrap();
    assert!(
        !down.is_upstream,
        "Peer registered with is_upstream=false should have is_upstream=false"
    );

    // Bulk listing carries the same flags.
    let peers = manager.get_interested_peers(&contract);
    assert_eq!(peers.len(), 2);
    let (_, up_listed) = peers.iter().find(|(k, _)| k == &upstream_peer).unwrap();
    assert!(up_listed.is_upstream);
    let (_, down_listed) = peers.iter().find(|(k, _)| k == &downstream_peer).unwrap();
    assert!(!down_listed.is_upstream);
}
/// Re-registering an existing interest shortly before its TTL runs out must
/// restart the TTL, so a sweep just past the original deadline removes nothing.
#[test]
fn test_register_peer_interest_resets_ttl() {
    let (manager, time) = make_manager();
    let contract = make_contract_key(1);
    let peer = make_peer_key(1);

    manager.register_peer_interest(&contract, peer.clone(), None, false);

    // Ten seconds before the TTL would elapse, register the same interest again.
    let almost_ttl = INTEREST_TTL - Duration::from_secs(10);
    time.advance_time(almost_ttl);
    manager.register_peer_interest(&contract, peer.clone(), None, false);

    // Now ten seconds past the original deadline.
    time.advance_time(Duration::from_secs(20));
    let swept = manager.sweep_expired_interests();
    assert!(swept.is_empty(), "re-registration should have reset TTL");
    assert!(manager.get_peer_interest(&contract, &peer).is_some());
}
/// `add_local_client` reports `true` only for the call that first creates
/// local interest; a second call keeps the interest but returns `false`.
#[test]
fn test_subscribe_registers_local_interest() {
    let (manager, _time) = make_manager();
    let contract = make_contract_key(1);

    // No interest before any client is added.
    assert!(!manager.has_local_interest(&contract));

    let first_add = manager.add_local_client(&contract);
    assert!(first_add);
    assert!(manager.has_local_interest(&contract));

    let second_add = manager.add_local_client(&contract);
    assert!(!second_add);
    assert!(manager.has_local_interest(&contract));
}
/// Downstream subscribers alone create local interest: the first add turns
/// it on, and it turns off only when the last subscriber is removed.
#[test]
fn test_downstream_subscriber_creates_local_interest() {
    let (manager, _time) = make_manager();
    let contract = make_contract_key(50);
    assert!(!manager.has_local_interest(&contract));

    // First subscriber: interest appears.
    let first_add = manager.add_downstream_subscriber(&contract);
    assert!(
        first_add,
        "First downstream subscriber should create interest"
    );
    assert!(
        manager.has_local_interest(&contract),
        "Relay node with downstream subscriber must have local interest"
    );

    // Second subscriber: interest was already there.
    let second_add = manager.add_downstream_subscriber(&contract);
    assert!(!second_add);
    assert!(manager.has_local_interest(&contract));

    // Removing one of two keeps the interest.
    let first_remove = manager.remove_downstream_subscriber(&contract);
    assert!(!first_remove, "Still have one downstream subscriber");
    assert!(manager.has_local_interest(&contract));

    // Removing the last one ends it.
    let second_remove = manager.remove_downstream_subscriber(&contract);
    assert!(second_remove, "Last downstream subscriber removed");
    assert!(
        !manager.has_local_interest(&contract),
        "No downstream subscribers left — should lose interest"
    );
}
/// A scheduled deferred removal must not fire one second before the
/// disconnect grace period ends, and must fire one second after it.
#[test]
fn test_deferred_removal_executes_after_grace_period() {
    let (manager, time) = make_manager();
    let contract = make_contract_key(1);
    let peer = make_peer_key(1);

    manager.register_peer_interest(&contract, peer.clone(), None, false);
    assert!(manager.get_peer_interest(&contract, &peer).is_some());
    manager.schedule_deferred_removal(&peer);

    // One second short of the grace period: nothing is removed yet.
    let just_before = INTEREST_DISCONNECT_GRACE_PERIOD - Duration::from_secs(1);
    time.advance_time(just_before);
    assert_eq!(manager.execute_pending_removals(), 0);
    assert!(manager.get_peer_interest(&contract, &peer).is_some());

    // Two more seconds puts the clock one second past the grace period.
    time.advance_time(Duration::from_secs(2));
    assert_eq!(manager.execute_pending_removals(), 1);
    assert!(manager.get_peer_interest(&contract, &peer).is_none());
}
/// Cancelling a deferred removal inside the grace period must prevent the
/// removal from ever executing, leaving the peer's interest in place.
#[test]
fn test_deferred_removal_cancelled_on_reconnect() {
    let (manager, time) = make_manager();
    let contract = make_contract_key(1);
    let peer = make_peer_key(1);

    manager.register_peer_interest(&contract, peer.clone(), None, false);
    manager.schedule_deferred_removal(&peer);

    // Thirty seconds in, the pending removal is cancelled.
    time.advance_time(Duration::from_secs(30));
    let was_pending = manager.cancel_deferred_removal(&peer);
    assert!(was_pending);

    // Even a full grace period later, nothing is removed.
    time.advance_time(INTEREST_DISCONNECT_GRACE_PERIOD);
    assert_eq!(manager.execute_pending_removals(), 0);
    assert!(manager.get_peer_interest(&contract, &peer).is_some());
}
/// Scheduling a deferred removal a second time must replace the first
/// deadline, so the grace period is measured from the later schedule call.
#[test]
fn test_deferred_removal_replaces_on_repeated_disconnect() {
    let (manager, time) = make_manager();
    let contract = make_contract_key(1);
    let peer = make_peer_key(1);
    let minute = Duration::from_secs(60);

    manager.register_peer_interest(&contract, peer.clone(), None, false);

    // Schedule, wait a minute, schedule again, wait another minute.
    manager.schedule_deferred_removal(&peer);
    time.advance_time(minute);
    manager.schedule_deferred_removal(&peer);
    time.advance_time(minute);

    // 120s after the first schedule, but only 60s after the second one.
    let removed_early = manager.execute_pending_removals();
    assert_eq!(removed_early, 0, "Second schedule should have reset the deadline");
    assert!(manager.get_peer_interest(&contract, &peer).is_some());

    // 91s after the second schedule the removal goes through.
    time.advance_time(Duration::from_secs(31));
    let removed_late = manager.execute_pending_removals();
    assert_eq!(removed_late, 1);
    assert!(manager.get_peer_interest(&contract, &peer).is_none());
}
/// Cancelling a deferred removal for a peer that never had one scheduled
/// must return `false`.
#[test]
fn test_cancel_deferred_removal_returns_false_when_none_pending() {
    let (manager, _time) = make_manager();
    let peer = make_peer_key(1);

    let cancelled = manager.cancel_deferred_removal(&peer);
    assert!(!cancelled);
}
/// A removal whose grace period has already elapsed must still be skipped
/// if it was cancelled before `execute_pending_removals` ran.
#[test]
fn test_execute_skips_removal_if_cancelled_between_collect_and_remove() {
    let (manager, time) = make_manager();
    let contract = make_contract_key(1);
    let peer = make_peer_key(1);

    manager.register_peer_interest(&contract, peer.clone(), None, false);
    manager.schedule_deferred_removal(&peer);

    // Move one second past the grace period, then cancel before executing.
    let past_deadline = INTEREST_DISCONNECT_GRACE_PERIOD + Duration::from_secs(1);
    time.advance_time(past_deadline);
    manager.cancel_deferred_removal(&peer);

    assert_eq!(manager.execute_pending_removals(), 0);
    let interest_kept = manager.get_peer_interest(&contract, &peer).is_some();
    assert!(
        interest_kept,
        "Interests must be preserved when peer reconnected before sweep executed"
    );
}
/// With three interested peers, only the one whose cached summary differs
/// from ours (peer B) must be classified as stale.
#[test]
fn test_summary_mismatch_targets_only_stale_peer() {
    let (manager, _time) = make_manager();
    let contract = make_contract_key(1);
    let peer_a = make_peer_key(1);
    let peer_b = make_peer_key(2);
    let peer_c = make_peer_key(3);
    manager.register_local_hosting(&contract);

    // A and C share our summary; B is registered with a different one.
    let our_summary = StateSummary::from(vec![1, 2, 3]);
    manager.register_peer_interest(&contract, peer_a.clone(), Some(our_summary.clone()), false);
    manager.register_peer_interest(&contract, peer_c.clone(), Some(our_summary.clone()), false);
    let stale_summary = StateSummary::from(vec![0, 0, 0]);
    manager.register_peer_interest(&contract, peer_b.clone(), Some(stale_summary.clone()), false);

    // A peer counts as stale only if it has a summary and the bytes differ from ours.
    let b_is_stale = manager
        .get_peer_summary(&contract, &peer_b)
        .is_some_and(|theirs| theirs.as_ref() != our_summary.as_ref());
    assert!(b_is_stale, "Peer B should be detected as stale");

    for (label, peer) in [("A", &peer_a), ("C", &peer_c)] {
        let stale = match manager.get_peer_summary(&contract, peer) {
            Some(theirs) => theirs.as_ref() != our_summary.as_ref(),
            None => false,
        };
        assert!(!stale, "Peer {label} should NOT be stale");
    }

    let interested_peers = manager.get_interested_peers(&contract);
    assert_eq!(
        interested_peers.len(),
        3,
        "All 3 peers should be interested"
    );

    // Count the stale peers across the whole interested set.
    let stale_count = interested_peers
        .iter()
        .filter(|(pk, _)| {
            matches!(
                manager.get_peer_summary(&contract, pk),
                Some(theirs) if theirs.as_ref() != our_summary.as_ref()
            )
        })
        .count();
    assert_eq!(
        stale_count,
        1,
        "Only 1 peer (B) should need syncing, not all {}",
        interested_peers.len()
    );
}
}