use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::{Arc, Weak, atomic::AtomicU64};
use std::time::Duration;
use std::time::Instant as WallClockInstant;
use tokio::time::Instant;
use tracing::Instrument;
use either::Either;
use freenet_stdlib::prelude::{ContractInstanceId, ContractKey};
use parking_lot::{Mutex, RwLock};
pub use hosting::{
AddClientSubscriptionResult, ClientDisconnectResult, SubscribeResult,
SubscribedContractSnapshot,
};
use crate::message::TransactionType;
use crate::topology::TopologyAdjustment;
use crate::topology::rate::Rate;
use crate::tracing::{NetEventLog, NetEventRegister};
use crate::transport::TransportPublicKey;
use crate::util::{Contains, time_source::InstantTimeSrc};
use crate::{
config::{GlobalExecutor, GlobalRng},
message::Transaction,
node::{self, EventLoopNotificationsSender, NodeConfig, OpManager, PeerId},
router::Router,
};
mod broken_invariants;
mod connection_backoff;
mod connection_manager;
pub(crate) mod contract_ban_list;
pub(crate) use connection_manager::ConnectionManager;
mod connection;
mod hosting;
pub(crate) use broken_invariants::{BrokenInvariant, BrokenInvariantsTracker};
pub(crate) use hosting::DEFAULT_HOSTING_BUDGET_BYTES;
pub use hosting::{AccessType, RecordAccessResult};
pub mod interest;
mod live_tx;
mod location;
pub(crate) mod peer_cache;
mod peer_connection_backoff;
mod peer_key_location;
pub mod topology_registry;
pub(crate) mod update_rate_limit;
/// Whether a GET should also subscribe to the contract.
/// NOTE(review): meaning inferred from the name; the consumer is outside this view - confirm.
pub const AUTO_SUBSCRIBE_ON_GET: bool = true;
/// Weight for locally originated demand.
/// NOTE(review): not referenced in the visible part of the file; confirm where it is applied.
const LOCAL_DEMAND_WEIGHT: f64 = 1.0;
/// Weight for forwarded demand (one tenth of `LOCAL_DEMAND_WEIGHT`).
/// NOTE(review): not referenced in the visible part of the file; confirm where it is applied.
const FORWARDED_DEMAND_WEIGHT: f64 = 0.1;
/// Period of the ticker that drives `Ring::governance_reaper_loop`.
const GOVERNANCE_TICK_INTERVAL: Duration = Duration::from_secs(60);
use connection_backoff::ConnectionBackoff;
pub use connection_backoff::ConnectionFailureReason;
pub(crate) use peer_connection_backoff::PeerConnectionBackoff;
pub use self::live_tx::LiveTransactionTracker;
pub use connection::Connection;
pub use interest::PeerKey;
pub use location::{Distance, Location};
pub use peer_key_location::{KnownPeerKeyLocation, PeerAddr, PeerKeyLocation};
/// Backoff bookkeeping for contract-directed CONNECT attempts.
///
/// One entry per contract is kept in `Ring::contract_connect_backoff`.
struct ContractConnectState {
/// How long after `last_attempt` the contract is still considered in backoff.
current_backoff: Duration,
/// Time at which the most recent attempt was recorded.
last_attempt: Instant,
}
/// Per-node ring state: connections, routing, hosting bookkeeping and the
/// shared helpers used by the background tasks spawned from `Ring::new`.
pub(crate) struct Ring {
/// Hop limit for operations; taken from the node config or
/// `Ring::DEFAULT_MAX_HOPS_TO_LIVE` when the config leaves it unset.
pub max_hops_to_live: usize,
/// Owner of the open connections and of this node's own location.
pub connection_manager: ConnectionManager,
/// Router shared with the `refresh_router` task, which rebuilds it from the event log.
pub router: Arc<RwLock<Router>>,
/// Tracker of in-flight transactions, shared with the connection maintenance task.
pub live_tx_tracker: LiveTransactionTracker,
/// Hosting/subscription bookkeeping for contracts.
hosting_manager: hosting::HostingManager,
/// Tracker for broken invariants; cleaned up on every governance reaper tick.
broken_invariants: BrokenInvariantsTracker,
/// Governance manager driven by `governance_reaper_loop`.
pub(crate) governance: Arc<crate::contract::governance::GovernanceManager>,
/// Update rate limiter; cleaned up on every governance reaper tick.
pub(crate) update_rate_limiter: Arc<update_rate_limit::UpdateRateLimiter>,
/// Contract ban list; bans are applied and lifted by `apply_ban_decisions`.
pub(crate) contract_ban_list: Arc<contract_ban_list::ContractBanList>,
/// Sink for network events (see `register_events`).
event_register: Box<dyn NetEventRegister>,
/// Weak back-reference to the `OpManager`; `None` until `attach_op_manager` is called.
op_manager: RwLock<Option<Weak<OpManager>>>,
/// Whether this node runs as a gateway.
pub(crate) is_gateway: bool,
/// Backoff state for connection attempts, keyed by target location.
connection_backoff: Arc<parking_lot::Mutex<ConnectionBackoff>>,
/// Per-contract backoff state for contract-directed CONNECT attempts.
contract_connect_backoff: Mutex<HashMap<ContractKey, ContractConnectState>>,
/// Clock shared with the governance manager, rate limiter, ban list and backoff bookkeeping.
pub(crate) time_source: Arc<dyn crate::util::time_source::TimeSource + Send + Sync>,
/// Directory used for the peer cache; `Ring::new` sets it to `None` on gateways
/// and to the configured data directory otherwise.
pub(crate) peer_cache_dir: Option<std::path::PathBuf>,
}
/// Guard that makes sure a pending subscription request is always completed.
///
/// If the guard is dropped without `complete` having been called, its `Drop`
/// implementation reports the request as failed.
pub(crate) struct SubscriptionRecoveryGuard {
/// Used to reach the ring on which the request is completed.
op_manager: Arc<OpManager>,
/// Contract whose subscription request this guard tracks.
contract_key: ContractKey,
/// Set once `complete` has reported the outcome; suppresses the `Drop` fallback.
completed: bool,
}
impl SubscriptionRecoveryGuard {
pub(crate) fn new(op_manager: Arc<OpManager>, contract_key: ContractKey) -> Self {
Self {
op_manager,
contract_key,
completed: false,
}
}
pub(crate) fn complete(mut self, success: bool) {
self.op_manager
.ring
.complete_subscription_request(&self.contract_key, success);
self.completed = true;
}
}
impl Drop for SubscriptionRecoveryGuard {
    /// Fallback for guards dropped without `complete`: logs a warning and
    /// reports the subscription request as failed.
    fn drop(&mut self) {
        if self.completed {
            // Outcome already reported through `complete`.
            return;
        }
        tracing::warn!(
            contract = %self.contract_key,
            "Subscription recovery task terminated unexpectedly, marking as failed"
        );
        let ring = &self.op_manager.ring;
        ring.complete_subscription_request(&self.contract_key, false);
    }
}
/// Result of inspecting an optional weak back-reference (see `classify_op_manager_ref`).
enum OpManagerState<T> {
/// The slot holds no reference at all.
NotAttached,
/// The slot holds a reference and the target is still alive.
Live(Arc<T>),
/// The slot holds a reference but the target has already been dropped.
Detached,
}
/// Classifies the weak reference stored in `slot`.
///
/// Returns `NotAttached` when the slot is empty, `Live` with a strong
/// reference when the target is still alive, and `Detached` when a reference
/// was stored but can no longer be upgraded.
fn classify_op_manager_ref<T>(slot: &RwLock<Option<Weak<T>>>) -> OpManagerState<T> {
    // `Weak::upgrade` takes `&self`, so the weak pointer does not need to be cloned first.
    // The read guard is a temporary of this statement and is released before the match.
    let upgraded = slot.read().as_ref().map(|weak| weak.upgrade());
    match upgraded {
        None => OpManagerState::NotAttached,
        Some(None) => OpManagerState::Detached,
        Some(Some(strong)) => OpManagerState::Live(strong),
    }
}
/// Outcome of pruning a connection.
/// NOTE(review): the producer of this type is outside the visible part of the
/// file; the field descriptions below are inferred from names - confirm.
#[derive(Debug, Default)]
pub struct PruneConnectionResult {
/// Transactions reported alongside the pruned connection (presumably those
/// that were in flight with the removed peer).
pub orphaned_transactions: Vec<Transaction>,
/// Presumably `true` when the prune took the node below its readiness threshold.
pub became_unready: bool,
}
impl Ring {
/// Default minimum number of connections.
/// NOTE(review): consumer not visible in this part of the file.
pub const DEFAULT_MIN_CONNECTIONS: usize = 25;
/// Default maximum number of connections.
/// NOTE(review): consumer not visible in this part of the file.
pub const DEFAULT_MAX_CONNECTIONS: usize = 200;
/// Default upstream bandwidth limit, expressed as a per-second `Rate`.
const DEFAULT_MAX_UPSTREAM_BANDWIDTH: Rate = Rate::new_per_second(1_000_000.0);
/// Default downstream bandwidth limit, expressed as a per-second `Rate`.
const DEFAULT_MAX_DOWNSTREAM_BANDWIDTH: Rate = Rate::new_per_second(1_000_000.0);
/// NOTE(review): presumably the hops-to-live value above which routing walks
/// randomly; not referenced in the visible part of the file - confirm.
const DEFAULT_RAND_WALK_ABOVE_HTL: usize = 7;
/// Hop limit used by `Ring::new` when the node config does not set one.
pub const DEFAULT_MAX_HOPS_TO_LIVE: usize = 10;
/// Builds the ring, spawns its background tasks and returns it behind an `Arc`.
///
/// Every spawned task is registered with `task_monitor`:
/// `refresh_router`, `connection_maintenance`,
/// `emit_subscription_state_telemetry`, `recover_orphaned_subscriptions`,
/// `sweep_get_subscription_cache`, `emit_router_snapshot_telemetry`,
/// `contract_directed_connects`, `interest_heartbeat`, `governance_reaper`
/// and, in test/`testing` builds only, `register_topology_snapshots`.
///
/// # Errors
///
/// Returns an error when the node is a gateway with a configured location
/// but no `own_addr`. The check runs before anything is spawned or
/// registered globally, so a rejected configuration leaves no background
/// task behind.
pub fn new<ER: NetEventRegister + Clone>(
    config: &NodeConfig,
    event_loop_notifier: EventLoopNotificationsSender,
    event_register: ER,
    is_gateway: bool,
    connection_manager: ConnectionManager,
    task_monitor: &crate::node::background_task_monitor::BackgroundTaskMonitor,
) -> anyhow::Result<Arc<Self>> {
    // Validate the configuration first: failing after `set_router` and the
    // `refresh_router` spawn would leave those side effects in place.
    if is_gateway && config.location.is_some() && config.own_addr.is_none() {
        return Err(anyhow::anyhow!("own_addr is required for gateways"));
    }

    let live_tx_tracker = LiveTransactionTracker::new();
    let max_hops_to_live = config
        .max_hops_to_live
        .unwrap_or(Self::DEFAULT_MAX_HOPS_TO_LIVE);

    // The router starts empty; `refresh_router` restores history from the event log.
    let router = Arc::new(RwLock::new(Router::new(&[])));
    crate::node::network_status::set_router(router.clone());
    task_monitor.register(
        "refresh_router",
        GlobalExecutor::spawn(Self::refresh_router(router.clone(), event_register.clone())),
    );

    #[cfg(any(test, feature = "testing"))]
    const TOPOLOGY_SNAPSHOT_INTERVAL: Duration = Duration::from_secs(1);

    // Gateways get no peer cache directory; other nodes use the data directory.
    let peer_cache_dir = if is_gateway {
        None
    } else {
        Some(config.config.data_dir())
    };

    // A single clock is shared by governance, rate limiting, bans and backoff bookkeeping.
    let time_source: Arc<dyn crate::util::time_source::TimeSource + Send + Sync> =
        Arc::new(InstantTimeSrc::new());
    let governance_config = config
        .governance_config_override
        .clone()
        .unwrap_or_default();
    let governance = Arc::new(crate::contract::governance::GovernanceManager::new(
        governance_config,
        time_source.clone(),
    ));

    let ring = Ring {
        max_hops_to_live,
        router,
        connection_manager,
        hosting_manager: hosting::HostingManager::new(config.config.max_hosting_storage),
        broken_invariants: BrokenInvariantsTracker::new(time_source.clone()),
        governance,
        update_rate_limiter: Arc::new(update_rate_limit::UpdateRateLimiter::new(
            time_source.clone(),
        )),
        contract_ban_list: Arc::new(contract_ban_list::ContractBanList::new(
            time_source.clone(),
        )),
        live_tx_tracker: live_tx_tracker.clone(),
        event_register: Box::new(event_register),
        op_manager: RwLock::new(None),
        is_gateway,
        connection_backoff: Arc::new(Mutex::new(ConnectionBackoff::new())),
        contract_connect_backoff: Mutex::new(HashMap::new()),
        time_source,
        peer_cache_dir,
    };

    if let Some(loc) = config.location {
        ring.connection_manager.update_location(Some(loc));
    }

    let ring = Arc::new(ring);

    // Parent the maintenance span on the caller's span when there is one.
    let current_span = tracing::Span::current();
    let span = if current_span.is_none() {
        tracing::info_span!("connection_maintenance")
    } else {
        tracing::info_span!(parent: current_span, "connection_maintenance")
    };
    task_monitor.register(
        "connection_maintenance",
        GlobalExecutor::spawn({
            let fut = ring
                .clone()
                .connection_maintenance(event_loop_notifier, live_tx_tracker)
                .instrument(span);
            async move {
                if let Err(e) = fut.await {
                    tracing::error!(error = %e, "connection_maintenance exited with error");
                }
            }
        }),
    );
    task_monitor.register(
        "emit_subscription_state_telemetry",
        GlobalExecutor::spawn(Self::emit_subscription_state_telemetry(
            ring.clone(),
            Self::SUBSCRIPTION_STATE_INTERVAL,
        )),
    );
    task_monitor.register(
        "recover_orphaned_subscriptions",
        GlobalExecutor::spawn(Self::recover_orphaned_subscriptions(
            ring.clone(),
            Self::SUBSCRIPTION_RECOVERY_INTERVAL,
        )),
    );
    task_monitor.register(
        "sweep_get_subscription_cache",
        GlobalExecutor::spawn(Self::sweep_get_subscription_cache(
            ring.clone(),
            Self::GET_SUBSCRIPTION_SWEEP_INTERVAL,
        )),
    );
    #[cfg(any(test, feature = "testing"))]
    task_monitor.register(
        "register_topology_snapshots",
        GlobalExecutor::spawn(Self::register_topology_snapshots_periodically(
            ring.clone(),
            TOPOLOGY_SNAPSHOT_INTERVAL,
        )),
    );
    task_monitor.register(
        "emit_router_snapshot_telemetry",
        GlobalExecutor::spawn(Self::emit_router_snapshot_telemetry(
            ring.clone(),
            Duration::from_secs(60 * 5),
        )),
    );
    const CONTRACT_CONNECT_INTERVAL: Duration = Duration::from_secs(30);
    task_monitor.register(
        "contract_directed_connects",
        GlobalExecutor::spawn(Self::contract_directed_connects(
            ring.clone(),
            CONTRACT_CONNECT_INTERVAL,
        )),
    );
    task_monitor.register(
        "interest_heartbeat",
        GlobalExecutor::spawn(Self::interest_heartbeat(ring.clone())),
    );
    task_monitor.register(
        "governance_reaper",
        GlobalExecutor::spawn(Self::governance_reaper_loop(ring.clone())),
    );
    Ok(ring)
}
/// Stores a weak back-reference to `op_manager`, replacing any previous one.
///
/// Only a `Weak` is kept, so the ring does not keep the `OpManager` alive.
pub fn attach_op_manager(&self, op_manager: &Arc<OpManager>) {
    let weak_ref = Arc::downgrade(op_manager);
    *self.op_manager.write() = Some(weak_ref);
}
/// Returns a strong reference to the attached `OpManager`.
///
/// Yields `None` when no manager has been attached yet or when the attached
/// one has already been dropped.
fn upgrade_op_manager(&self) -> Option<Arc<OpManager>> {
    // `Weak::upgrade` borrows the weak pointer; cloning it first is unnecessary.
    self.op_manager
        .read()
        .as_ref()
        .and_then(|weak| weak.upgrade())
}
/// Classifies the `OpManager` back-reference as not attached, live or detached.
fn op_manager_state(&self) -> OpManagerState<OpManager> {
classify_op_manager_ref(&self.op_manager)
}
/// Returns whether this node runs as a gateway.
pub fn is_gateway(&self) -> bool {
self.is_gateway
}
/// Returns the connection count reported by the connection manager.
pub fn open_connections(&self) -> usize {
self.connection_manager.connection_count()
}
/// Records a failed connection attempt towards `target`, tagged with `reason`,
/// in the connection backoff tracker.
pub fn record_connection_failure(&self, target: Location, reason: ConnectionFailureReason) {
    self.connection_backoff
        .lock()
        .record_failure_with_reason(target, reason);
}
/// Records a successful connection towards `target` in the connection backoff tracker.
pub fn record_connection_success(&self, target: Location) {
    self.connection_backoff.lock().record_success(target);
}
/// Returns whether the connection backoff tracker reports `target` as being in backoff.
pub fn is_in_connection_backoff(&self, target: Location) -> bool {
self.connection_backoff.lock().is_in_backoff(target)
}
/// Asks the connection backoff tracker to drop its expired entries.
pub fn cleanup_connection_backoff(&self) {
self.connection_backoff.lock().cleanup_expired();
}
/// Clears the connection backoff tracker entirely.
pub fn reset_all_connection_backoff(&self) {
self.connection_backoff.lock().clear();
}
/// Backoff seeded for a contract on its first recorded contract-directed CONNECT.
/// `record_contract_connect_attempt` doubles it right after seeding, so the
/// first wait actually enforced is twice this value.
const INITIAL_CONTRACT_CONNECT_BACKOFF: Duration = Duration::from_secs(30);
/// Upper bound for the per-contract backoff (24 hours).
const MAX_CONTRACT_CONNECT_BACKOFF: Duration = Duration::from_secs(24 * 60 * 60);
/// Maximum number of contract-directed CONNECTs initiated per tick of
/// `contract_directed_connects`.
const MAX_CONTRACT_CONNECTS_PER_CYCLE: usize = 2;
/// Returns whether this node is the subscription root for `contract_key`.
///
/// The node is the root when it hosts the contract, knows its own location,
/// and none of its connected peers with a known location is strictly closer
/// to the contract's location than the node itself.
fn is_subscription_root(&self, contract_key: &ContractKey) -> bool {
    if !self.is_hosting_contract(contract_key) {
        return false;
    }
    let contract_location = Location::from(contract_key);
    let Some(my_location) = self.connection_manager.own_location().location() else {
        return false;
    };
    let my_distance = my_location.distance(contract_location);
    let connections = self.connection_manager.get_connections_by_location();
    // Peers without a known location are ignored.
    let closer_peer_exists = connections
        .values()
        .flat_map(|conns| conns.iter())
        .filter_map(|conn| conn.location.location())
        .any(|peer_loc| peer_loc.distance(contract_location) < my_distance);
    !closer_peer_exists
}
/// Returns whether `contract_key` is still inside its contract-directed
/// CONNECT backoff window.
///
/// Contracts with no recorded attempt are never in backoff. Elapsed time is
/// measured with `self.time_source`, the same clock
/// `record_contract_connect_attempt` uses to stamp `last_attempt`, so the
/// comparison stays consistent whatever clock backs the ring.
fn is_in_contract_connect_backoff(&self, contract_key: &ContractKey) -> bool {
    let backoff = self.contract_connect_backoff.lock();
    match backoff.get(contract_key) {
        Some(state) => {
            let since_last_attempt = self
                .time_source
                .now()
                .saturating_duration_since(state.last_attempt);
            since_last_attempt < state.current_backoff
        }
        None => false,
    }
}
/// Records a contract-directed CONNECT attempt for `contract_key`.
///
/// Stamps the attempt with the ring's time source and doubles the contract's
/// backoff, capped at `MAX_CONTRACT_CONNECT_BACKOFF`. A contract seen for the
/// first time is seeded with `INITIAL_CONTRACT_CONNECT_BACKOFF` before the
/// doubling is applied.
fn record_contract_connect_attempt(&self, contract_key: &ContractKey) {
    let mut table = self.contract_connect_backoff.lock();
    let attempted_at = self.time_source.now();
    let entry = table.entry(*contract_key).or_insert(ContractConnectState {
        current_backoff: Self::INITIAL_CONTRACT_CONNECT_BACKOFF,
        last_attempt: attempted_at,
    });
    entry.last_attempt = attempted_at;
    let doubled = entry.current_backoff * 2;
    entry.current_backoff = doubled.min(Self::MAX_CONTRACT_CONNECT_BACKOFF);
}
/// Background task: periodically opens connections towards the location of
/// hosted contracts for which this node is the subscription root.
///
/// After a random 30-60 s startup delay the task ticks every `interval`. A
/// tick is skipped when the node has fewer than two connections, hosts no
/// contracts, or has no live `OpManager`. At most
/// `MAX_CONTRACT_CONNECTS_PER_CYCLE` CONNECTs are initiated per tick, and each
/// contract is subject to its own exponential backoff.
async fn contract_directed_connects(ring: Arc<Self>, interval: Duration) {
// Randomised startup delay before the first cycle.
let initial_delay = Duration::from_secs(GlobalRng::random_range(30u64..=60u64));
tokio::time::sleep(initial_delay).await;
let mut tick_interval = tokio::time::interval(interval);
// Consume one tick up front so the first cycle starts on the following tick.
tick_interval.tick().await;
loop {
tick_interval.tick().await;
let conn_count = ring.connection_manager.connection_count();
if conn_count < 2 {
continue;
}
let contracts = ring.hosting_contract_keys();
if contracts.is_empty() {
continue;
}
let Some(op_manager) = ring.upgrade_op_manager() else {
continue;
};
// Counts only CONNECTs that were actually initiated (`Ok(Some(_))` below).
let mut connects_this_cycle = 0;
for contract_key in &contracts {
if connects_this_cycle >= Self::MAX_CONTRACT_CONNECTS_PER_CYCLE {
break;
}
if !ring.is_subscription_root(contract_key) {
// Not (or no longer) the root: drop any backoff entry. If one existed,
// a CONNECT was attempted earlier for this contract, so force a
// subscription renewal for it.
let had_backoff = ring
.contract_connect_backoff
.lock()
.remove(contract_key)
.is_some();
if had_backoff {
ring.force_subscription_renewal(contract_key);
tracing::info!(
contract = %contract_key,
"No longer subscription root after contract-directed CONNECT; \
expired subscription to re-route through closer peer"
);
}
continue;
}
let contract_location = Location::from(contract_key);
let my_location = ring.connection_manager.own_location().location();
let my_distance = my_location.map(|l| l.distance(contract_location).as_f64());
if ring.is_in_contract_connect_backoff(contract_key) {
continue;
}
crate::tracing::telemetry::send_standalone_event(
"subscription_root_detected",
serde_json::json!({
"contract": contract_key.to_string(),
"contract_location": contract_location.as_f64(),
"neighbor_count": conn_count,
"my_distance": my_distance,
}),
);
// The attempt is recorded (and the backoff doubled) before the CONNECT is
// issued, so it counts whatever the outcome is.
ring.record_contract_connect_attempt(contract_key);
// Read the backoff back for logging; the lock guard is confined to this
// block and released before the `.await` below.
let backoff_secs = {
let b = ring.contract_connect_backoff.lock();
b.get(contract_key)
.map(|s| s.current_backoff.as_secs())
.unwrap_or(0)
};
tracing::info!(
contract = %contract_key,
%contract_location,
backoff_secs,
"Initiating contract-directed CONNECT as subscription root"
);
crate::tracing::telemetry::send_standalone_event(
"contract_directed_connect",
serde_json::json!({
"contract": contract_key.to_string(),
"contract_location": contract_location.as_f64(),
"my_distance": my_distance,
"backoff_secs": backoff_secs,
}),
);
// No peers are excluded from this acquisition.
let skip_list = HashSet::new();
match ring
.acquire_new(
contract_location,
&skip_list,
&op_manager.to_event_listener,
&ring.live_tx_tracker,
&op_manager,
)
.await
{
Ok(Some(tx)) => {
tracing::debug!(
%tx,
contract = %contract_key,
"Contract-directed CONNECT initiated"
);
connects_this_cycle += 1;
}
Ok(None) => {
tracing::debug!(
contract = %contract_key,
"Contract-directed CONNECT: no routing target found"
);
}
Err(e) => {
tracing::warn!(
contract = %contract_key,
error = %e,
"Contract-directed CONNECT failed"
);
}
}
}
}
}
/// Background task: drives governance once per `GOVERNANCE_TICK_INTERVAL`.
///
/// The startup delay is 30 s plus up to 60 s derived from the node's own
/// location (0.5 is used when the location is unknown). Each tick runs
/// `governance_tick` with the time elapsed since the previous tick, cleans up
/// the update rate limiter, contract ban list and broken-invariants tracker,
/// applies the resulting ban decisions and logs every state transition.
async fn governance_reaper_loop(ring: Arc<Self>) {
    let location_fraction = ring
        .connection_manager
        .own_location()
        .location()
        .map_or(0.5, |l| l.as_f64());
    let startup_offset_secs = (location_fraction * 60.0) as u64;
    tokio::time::sleep(Duration::from_secs(30 + startup_offset_secs)).await;

    let mut ticker = tokio::time::interval(GOVERNANCE_TICK_INTERVAL);
    // Consume one tick up front; the reference point for `elapsed` is taken right after.
    ticker.tick().await;
    let mut previous_tick = ring.time_source.now();
    loop {
        ticker.tick().await;
        let now = ring.time_source.now();
        let elapsed = now.saturating_duration_since(previous_tick);
        previous_tick = now;

        let result = ring.governance_tick(elapsed);
        ring.update_rate_limiter.cleanup();
        ring.contract_ban_list.cleanup();
        ring.broken_invariants.cleanup();
        tracing::debug!(
            median = ?result.median_log_ratio,
            mad = ?result.mad,
            threshold = ?result.threshold,
            sample_size = result.sample_size,
            capacity_ceiling_binding = result.capacity_ceiling_binding,
            skip_reason = ?result.skip_reason,
            "governance reaper tick",
        );

        // New bans expire one ban TTL from now.
        let ban_expiry = ring.time_source.now() + ring.governance.ban_ttl();
        Self::apply_ban_decisions(&ring.contract_ban_list, &result.decisions, ban_expiry);
        for decision in result.decisions {
            tracing::info!(
                contract = %decision.key,
                from = ?decision.from,
                to = ?decision.to,
                reason = ?decision.reason,
                actionable = decision.actionable,
                "governance state transition",
            );
        }
    }
}
/// Applies the actionable governance `decisions` to `ban_list`.
///
/// `BanTriggered` bans the contract until `ban_expiry` with reason `AutoMad`;
/// `BanLifted` removes the ban. Non-actionable decisions and all other
/// transition reasons are ignored.
pub(crate) fn apply_ban_decisions(
    ban_list: &contract_ban_list::ContractBanList,
    decisions: &[crate::contract::governance::ReaperDecision],
    ban_expiry: tokio::time::Instant,
) {
    use crate::contract::governance::TransitionReason;
    for decision in decisions.iter().filter(|d| d.actionable) {
        // Only the two ban-related reasons matter here; the wildcard arm is deliberate.
        #[allow(clippy::wildcard_enum_match_arm)]
        match decision.reason {
            TransitionReason::BanLifted => {
                ban_list.unban(&decision.key);
            }
            TransitionReason::BanTriggered => {
                ban_list.ban(
                    decision.key,
                    ban_expiry,
                    contract_ban_list::BanReason::AutoMad,
                );
            }
            _ => {}
        }
    }
}
async fn interest_heartbeat(ring: Arc<Self>) {
use crate::ring::interest::INTEREST_HEARTBEAT_INTERVAL;
let initial_delay = Duration::from_secs(GlobalRng::random_range(15u64..=45u64));
tokio::time::sleep(initial_delay).await;
let mut interval = tokio::time::interval(INTEREST_HEARTBEAT_INTERVAL);
interval.tick().await;
loop {
interval.tick().await;
let Some(op_manager) = ring.upgrade_op_manager() else {
continue;
};
let hashes = op_manager.interest_manager.get_all_interest_hashes();
if hashes.is_empty() {
continue;
}
let connections = ring.connection_manager.get_connections_by_location();
let peer_addrs: Vec<std::net::SocketAddr> = {
let mut seen = HashSet::new();
connections
.values()
.flat_map(|conns| conns.iter())
.filter_map(|conn| conn.location.socket_addr())
.filter(|addr| seen.insert(*addr))
.collect()
};
if peer_addrs.is_empty() {
continue;
}
let num_peers = peer_addrs.len();
let spread_delay = INTEREST_HEARTBEAT_INTERVAL / num_peers as u32;
tracing::debug!(
num_peers,
num_hashes = hashes.len(),
"Interest heartbeat: sending Interests to peers"
);
let sender = op_manager.to_event_listener.notifications_sender();
let mut peers_sent = 0usize;
for (i, peer_addr) in peer_addrs.into_iter().enumerate() {
let message = crate::message::InterestMessage::Interests {
hashes: hashes.clone(),
};
if let Err(e) = sender
.send(either::Either::Right(
crate::message::NodeEvent::SendInterestMessage {
target: peer_addr,
message,
},
))
.await
{
tracing::debug!(
peer = %peer_addr,
error = %e,
"Interest heartbeat: failed to queue message"
);
break;
}
peers_sent += 1;
if i + 1 < num_peers {
tokio::time::sleep(spread_delay).await;
}
}
crate::tracing::telemetry::send_standalone_event(
"interest_heartbeat_cycle",
serde_json::json!({
"peers_sent": peers_sent,
"interest_hashes": hashes.len(),
}),
);
}
}
/// Forwards a single event (`Left`) or a batch of events (`Right`) to the
/// ring's event register.
pub async fn register_events<'a>(
&self,
events: either::Either<
crate::tracing::NetEventLog<'a>,
Vec<crate::tracing::NetEventLog<'a>>,
>,
) {
self.event_register.register_events(events).await;
}
/// Limit passed to `get_router_events` when `refresh_router` loads routing history.
const ROUTER_HISTORY_LIMIT: usize = 10_000;
/// Background task: keeps `router` in sync with the routing history stored in
/// the event log.
///
/// On startup the history is loaded once; a non-empty history replaces the
/// router, and a load failure only logs a warning. Afterwards the history is
/// reloaded every five minutes. A refresh failure or an empty history leaves
/// the current router untouched.
async fn refresh_router<ER: NetEventRegister>(router: Arc<RwLock<Router>>, register: ER) {
    match register.get_router_events(Self::ROUTER_HISTORY_LIMIT).await {
        Err(error) => {
            tracing::warn!(%error, "Failed to load routing history on startup, starting cold");
        }
        Ok(history) if history.is_empty() => {
            tracing::debug!("No routing history to restore on startup");
        }
        Ok(history) => {
            tracing::info!(
                events = history.len(),
                "Restored routing history from event log"
            );
            *router.write() = Router::new(&history);
        }
    }

    let mut ticker = tokio::time::interval(Duration::from_secs(60 * 5));
    // Consume one tick up front so the first refresh happens on the following tick.
    ticker.tick().await;
    loop {
        ticker.tick().await;
        match register.get_router_events(Self::ROUTER_HISTORY_LIMIT).await {
            Ok(history) if !history.is_empty() => {
                *router.write() = Router::new(&history);
            }
            Ok(_) => {}
            Err(error) => {
                tracing::error!(
                    error = %error,
                    "Failed to refresh routing history from event log; \
                     keeping existing router and retrying on next interval"
                );
            }
        }
    }
}
/// Background task: every `interval_duration`, builds a router snapshot,
/// enriches it with connect-forward estimator data (when an `OpManager` is
/// live), file-descriptor usage and module-cache metrics, logs a summary and
/// registers the snapshot as a network event.
async fn emit_router_snapshot_telemetry(ring: Arc<Self>, interval_duration: Duration) {
let mut interval = tokio::time::interval(interval_duration);
// Consume one tick up front so the first snapshot is taken on the following tick.
interval.tick().await;
loop {
interval.tick().await;
let mut snapshot = ring.router.read().snapshot();
// Connect-forward fields stay unset when no OpManager is available.
if let Some(op_manager) = ring.upgrade_op_manager() {
let (curve, data_range, events, adjustments) =
op_manager.connect_forward_estimator.read().snapshot();
snapshot.connect_forward_curve = Some(curve);
snapshot.connect_forward_data_range = Some(data_range);
snapshot.connect_forward_events = Some(events);
snapshot.connect_forward_peer_adjustments = Some(adjustments);
}
let (open_fds, fd_soft_limit) = read_fd_usage();
snapshot.open_fds = open_fds;
snapshot.fd_soft_limit = fd_soft_limit;
// Contract and delegate module-cache metrics.
let mc = crate::wasm_runtime::MODULE_CACHE_METRICS.snapshot();
snapshot.contract_module_cache_entries = Some(mc.contract_entries);
snapshot.contract_module_cache_total_bytes = Some(mc.contract_total_bytes);
snapshot.contract_module_cache_budget_bytes = Some(mc.contract_budget_bytes);
snapshot.contract_module_cache_evictions_total = Some(mc.contract_evictions_total);
snapshot.delegate_module_cache_entries = Some(mc.delegate_entries);
snapshot.delegate_module_cache_total_bytes = Some(mc.delegate_total_bytes);
snapshot.delegate_module_cache_budget_bytes = Some(mc.delegate_budget_bytes);
snapshot.delegate_module_cache_evictions_total = Some(mc.delegate_evictions_total);
tracing::info!(
failure_events = snapshot.failure_events,
success_events = snapshot.success_events,
prediction_active = snapshot.prediction_active,
consider_n_closest_peers = snapshot.consider_n_closest_peers,
open_fds = ?snapshot.open_fds,
fd_soft_limit = ?snapshot.fd_soft_limit,
contract_module_cache_entries = mc.contract_entries,
contract_module_cache_total_bytes = mc.contract_total_bytes,
contract_module_cache_evictions_total = mc.contract_evictions_total,
"router_snapshot"
);
// `router_snapshot` may decline to build an event; nothing is registered then.
if let Some(event) = NetEventLog::router_snapshot(&ring, snapshot) {
ring.event_register
.register_events(Either::Left(event))
.await;
}
}
}
/// Background task: every `interval_duration`, logs the state of every known
/// subscription (one trace line per contract). Nothing is logged when there
/// are no subscriptions.
async fn emit_subscription_state_telemetry(ring: Arc<Self>, interval_duration: Duration) {
    let mut ticker = tokio::time::interval(interval_duration);
    // Consume one tick up front so the first report happens on the following tick.
    ticker.tick().await;
    loop {
        ticker.tick().await;
        let states = ring.get_subscription_states();
        if !states.is_empty() {
            tracing::debug!(
                subscription_count = states.len(),
                "Emitting periodic subscription state telemetry"
            );
            for (key, has_client, is_active, _expires_at) in states {
                tracing::trace!(
                    %key,
                    has_client_subscription = has_client,
                    is_active_subscription = is_active,
                    "Subscription state"
                );
            }
        }
    }
}
/// Maximum number of subscription renewals started per recovery cycle.
const MAX_RECOVERY_ATTEMPTS_PER_INTERVAL: usize = 10;
/// Divisor for the "reduce batch" threshold: when the notification channel's
/// remaining capacity is below `max_capacity / 2`, the renewal batch is reduced.
const RENEWAL_DEFER_CAPACITY_FRACTION: usize = 2;
/// Divisor for the "stop" threshold: when remaining capacity drops below
/// `max_capacity / 4` during a cycle, no further renewals are started.
const RENEWAL_STOP_CAPACITY_FRACTION: usize = 4;
/// Period of `emit_subscription_state_telemetry`.
pub(crate) const SUBSCRIPTION_STATE_INTERVAL: Duration = Duration::from_secs(60);
/// Period of `recover_orphaned_subscriptions`; also bounds each renewal's deadline.
pub(crate) const SUBSCRIPTION_RECOVERY_INTERVAL: Duration = Duration::from_secs(30);
/// Period of `sweep_get_subscription_cache`.
pub(crate) const GET_SUBSCRIPTION_SWEEP_INTERVAL: Duration = Duration::from_secs(60);
/// Background task: expires stale subscription state and renews subscriptions
/// for contracts that need it.
///
/// The task first polls every 500 ms until the ring has at least one open
/// connection, waits a random 2-5 s, then runs one pass immediately and
/// further passes every `interval_duration`. Each pass:
/// 1. expires stale subscriptions and stale downstream subscribers
///    (unsubscribing upstream where `should_unsubscribe_upstream` says so);
/// 2. skips renewal when there are no connections, nothing to renew, or no
///    live `OpManager`;
/// 3. starts renewals in random order, limited by a batch size that shrinks
///    when the notification channel is filling up.
async fn recover_orphaned_subscriptions(ring: Arc<Self>, interval_duration: Duration) {
// Phase 1: wait for the first ring connection, logging the wait only once.
let mut wait_logged = false;
loop {
tokio::time::sleep(Duration::from_millis(500)).await;
if ring.open_connections() > 0 {
tracing::info!(
hosted_contracts = ring.hosting_contract_keys().len(),
"Ring connection established, starting subscription recovery"
);
break;
}
if !wait_logged {
wait_logged = true;
tracing::info!(
hosted_contracts = ring.hosting_contract_keys().len(),
"Waiting for ring connection before starting subscription recovery"
);
}
}
// Random delay before the first pass.
let jitter = Duration::from_secs(GlobalRng::random_range(2u64..=5u64));
tokio::time::sleep(jitter).await;
let mut interval = tokio::time::interval(interval_duration);
interval.tick().await;
// The first pass runs right away; later passes wait for the ticker.
let mut first_pass = true;
loop {
if first_pass {
first_pass = false;
} else {
interval.tick().await;
}
let expired = ring.expire_stale_subscriptions();
if !expired.is_empty() {
tracing::debug!(
expired_count = expired.len(),
"Expired {} stale subscriptions",
expired.len()
);
}
let ds_expired = ring.expire_stale_downstream_subscribers();
if !ds_expired.is_empty() {
tracing::debug!(
expired_count = ds_expired.len(),
"Expired stale downstream subscribers"
);
// Without a live OpManager the interest-manager bookkeeping below is skipped.
if let Some(op_manager) = ring.upgrade_op_manager() {
for (contract, expired_count) in &ds_expired {
// One removal per expired downstream subscriber.
for _ in 0..*expired_count {
op_manager
.interest_manager
.remove_downstream_subscriber(contract);
}
if ring.should_unsubscribe_upstream(contract) {
let op_mgr = op_manager.clone();
let contract = *contract;
// Fire-and-forget: the unsubscribe runs in its own task.
GlobalExecutor::spawn(async move {
op_mgr.send_unsubscribe_upstream(&contract).await;
});
}
}
}
}
if ring.open_connections() == 0 {
tracing::debug!("Skipping subscription renewal: no ring connections");
continue;
}
let mut contracts_needing_renewal = ring.contracts_needing_renewal();
if contracts_needing_renewal.is_empty() {
tracing::debug!(
hosted = ring.hosting_contract_keys().len(),
"No contracts needing subscription renewal"
);
continue;
}
tracing::info!(
needing_renewal = contracts_needing_renewal.len(),
hosted = ring.hosting_contract_keys().len(),
"Starting subscription renewal cycle"
);
// Randomise the order so the per-cycle cap does not always favour the same contracts.
GlobalRng::shuffle(&mut contracts_needing_renewal);
let Some(op_manager) = ring.upgrade_op_manager() else {
tracing::debug!("OpManager not available for subscription renewal");
continue;
};
let sender = op_manager.to_event_listener.notifications_sender();
let channel_remaining = sender.capacity();
let channel_max = sender.max_capacity();
// Less than half the channel free: shrink the batch to a quarter (at least one).
let batch_limit =
if channel_remaining < channel_max / Self::RENEWAL_DEFER_CAPACITY_FRACTION {
let reduced = (Self::MAX_RECOVERY_ATTEMPTS_PER_INTERVAL / 4).max(1);
tracing::warn!(
channel_remaining,
channel_max,
batch_limit = reduced,
contracts = contracts_needing_renewal.len(),
"Notification channel >50% full, reducing renewal batch size"
);
reduced
} else {
Self::MAX_RECOVERY_ATTEMPTS_PER_INTERVAL
};
let mut attempted = 0;
// NOTE(review): `skipped` counts banned contracts as well as those rejected by
// `can_request_subscription`, yet it is logged as `skipped_rate_limited` below.
let mut skipped = 0;
for contract in contracts_needing_renewal {
if attempted >= batch_limit {
tracing::debug!(
limit = batch_limit,
"Reached max renewal attempts for this interval, remaining will be tried next cycle"
);
break;
}
// Re-check channel headroom before every spawn; stop below a quarter free.
let remaining_now = sender.capacity();
if remaining_now < channel_max / Self::RENEWAL_STOP_CAPACITY_FRACTION {
tracing::warn!(
channel_remaining = remaining_now,
attempted,
"Notification channel >75% full during renewal spawning, stopping early"
);
break;
}
if ring.contract_ban_list.is_banned(contract.id()) {
tracing::debug!(
%contract,
phase = "subscription_renewal_banned_skip",
"skipping subscription renewal for banned contract"
);
skipped += 1;
continue;
}
if !ring.can_request_subscription(&contract) {
skipped += 1;
continue;
}
// Only contracts successfully marked pending get a renewal task.
if ring.mark_subscription_pending(contract) {
attempted += 1;
// Up to 15 s of per-task jitter spreads the renewals out.
let jitter_ms = GlobalRng::random_range(0u64..=15_000);
let op_manager_clone = op_manager.clone();
let contract_key = contract;
GlobalExecutor::spawn(async move {
tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
// The guard reports failure from `Drop` if this task ends without `complete`.
let guard =
SubscriptionRecoveryGuard::new(op_manager_clone.clone(), contract_key);
let instance_id = *contract_key.id();
let renewal_tx = crate::message::Transaction::new::<
crate::operations::subscribe::SubscribeMsg,
>();
// Deadline is the recovery interval minus 5 s.
let renewal_deadline = Self::SUBSCRIPTION_RECOVERY_INTERVAL
.saturating_sub(Duration::from_secs(5));
let outcome_enum = match tokio::time::timeout(
renewal_deadline,
crate::operations::subscribe::run_renewal_subscribe(
op_manager_clone.clone(),
instance_id,
renewal_tx,
),
)
.await
{
Ok(outcome) => outcome,
// A timeout is reported as a failed renewal.
Err(_) => crate::operations::subscribe::RenewalOutcome::Failed {
reason: format!(
"renewal task exceeded {}s cycle deadline",
renewal_deadline.as_secs()
),
},
};
let (outcome, error_msg) = match outcome_enum {
crate::operations::subscribe::RenewalOutcome::Success => {
tracing::info!(
%contract_key,
"Subscription renewal succeeded"
);
guard.complete(true);
("success", None)
}
crate::operations::subscribe::RenewalOutcome::ChannelCongestion => {
tracing::warn!(
%contract_key,
"Subscription renewal skipped (channel full), will retry next cycle"
);
// Congestion is completed with `true`, i.e. not reported as a failure.
guard.complete(true);
("dropped_channel_full", None)
}
crate::operations::subscribe::RenewalOutcome::Failed { reason } => {
tracing::debug!(
%contract_key,
error = %reason,
"Subscription renewal failed (will retry with backoff)"
);
guard.complete(false);
("failed", Some(reason))
}
};
crate::tracing::telemetry::send_standalone_event(
"subscription_renewal_outcome",
serde_json::json!({
"contract": contract_key.to_string(),
"outcome": outcome,
"error": error_msg,
}),
);
});
}
}
if attempted > 0 || skipped > 0 {
tracing::info!(
attempted,
skipped_rate_limited = skipped,
"Subscription renewal cycle complete"
);
}
}
}
/// Background task: periodically removes expired GET-triggered subscriptions
/// and reclaims the evicted contracts.
///
/// After a random 10-30 s startup delay the task ticks every
/// `interval_duration`. Expired entries are always unsubscribed locally.
/// Reclamation of expired contracts, and the retry of pending reclamations,
/// only happen when an `OpManager` is live.
async fn sweep_get_subscription_cache(ring: Arc<Self>, interval_duration: Duration) {
    let startup_delay = Duration::from_secs(GlobalRng::random_range(10u64..=30u64));
    tokio::time::sleep(startup_delay).await;
    let mut ticker = tokio::time::interval(interval_duration);
    // Consume one tick up front so the first sweep happens on the following tick.
    ticker.tick().await;
    loop {
        ticker.tick().await;
        let expired = ring.sweep_expired_get_subscriptions();
        if !expired.is_empty() {
            tracing::debug!(
                expired_count = expired.len(),
                "GET subscription cache sweep found expired entries"
            );
        }

        let op_manager = ring.upgrade_op_manager();
        if op_manager.is_none() {
            tracing::debug!(
                expired_count = expired.len(),
                "OpManager unavailable during GET subscription sweep — \
                 on-disk reclamation skipped for the expired contracts this cycle"
            );
        }

        for (key, expected_generation) in expired {
            ring.unsubscribe(&key);
            tracing::info!(
                %key,
                "Cleaned up expired hosting subscription from local state"
            );
            if let Some(op_manager) = &op_manager {
                crate::operations::reclaim_evicted_contract(
                    op_manager,
                    key,
                    expected_generation,
                );
            }
        }

        // Retrying pending reclamations needs the OpManager as well.
        let Some(op_manager) = &op_manager else {
            continue;
        };
        let pending = ring.pending_reclamation_snapshot();
        if pending.is_empty() {
            continue;
        }
        tracing::debug!(
            pending_count = pending.len(),
            "Retrying pending reclamations from previous skipped \
             `EvictContract` events"
        );
        for (key, expected_generation) in pending {
            crate::operations::reclaim_evicted_contract(op_manager, key, expected_generation);
        }
    }
}
/// Test-only background task: every `interval_duration`, registers a topology
/// snapshot of this node with the topology registry.
///
/// A cycle is skipped while no network name is set or the node does not yet
/// know its own address. A missing stored location is reported as `0.0`.
#[cfg(any(test, feature = "testing"))]
async fn register_topology_snapshots_periodically(
    ring: Arc<Self>,
    interval_duration: Duration,
) {
    use topology_registry::{get_current_network_name, register_topology_snapshot};
    tracing::info!("Topology snapshot registration task started");
    tokio::time::sleep(Duration::from_millis(100)).await;
    let mut ticker = tokio::time::interval(interval_duration);
    // Consume one tick up front so the first snapshot is taken on the following tick.
    ticker.tick().await;
    loop {
        ticker.tick().await;
        let network_name = match get_current_network_name() {
            Some(name) => name,
            None => {
                tracing::debug!("Topology snapshot: no network name set, skipping");
                continue;
            }
        };
        let peer_addr = match ring.connection_manager.get_own_addr() {
            Some(addr) => addr,
            None => {
                tracing::debug!("Topology snapshot: no peer address yet, skipping");
                continue;
            }
        };
        let location = ring
            .connection_manager
            .get_stored_location()
            .map_or(0.0, |l| l.as_f64());
        let snapshot = ring
            .hosting_manager
            .generate_topology_snapshot(peer_addr, location);
        // Read the count before the snapshot is handed over to the registry.
        let contract_count = snapshot.contracts.len();
        register_topology_snapshot(&network_name, snapshot);
        tracing::info!(
            %peer_addr,
            location,
            network = %network_name,
            contract_count,
            "Registered topology snapshot"
        );
    }
}
/// Records an access of kind `access_type` to contract `key` (of `size_bytes`)
/// with the hosting manager and returns its result.
pub fn host_contract(
&self,
key: ContractKey,
size_bytes: u64,
access_type: AccessType,
) -> RecordAccessResult {
self.hosting_manager
.record_contract_access(key, size_bytes, access_type)
}
/// Shorthand for `host_contract` with `AccessType::Get`.
pub fn record_get_access(&self, key: ContractKey, size_bytes: u64) -> RecordAccessResult {
self.host_contract(key, size_bytes, AccessType::Get)
}
/// Returns whether the hosting manager reports `key` as hosted by this node.
#[inline]
pub fn is_hosting_contract(&self, key: &ContractKey) -> bool {
self.hosting_manager.is_hosting_contract(key)
}
/// Hands `storage` to the hosting manager.
pub fn set_hosting_storage(&self, storage: crate::contract::storages::Storage) {
self.hosting_manager.set_storage(storage);
}
/// Clears the storage handles held by the hosting manager and the
/// broken-invariants tracker.
/// NOTE(review): presumably releases the database handle rather than deleting
/// data; the callees are outside this view - confirm.
pub(crate) fn clear_redb_storage(&self) {
self.hosting_manager.clear_storage();
self.broken_invariants.clear_storage();
}
/// Loads the hosting cache from `storage` (redb backend).
///
/// `code_hash_lookup` maps a contract instance id to its code hash, if known.
/// Returns the `usize` produced by the hosting manager.
/// NOTE(review): presumably the number of entries loaded - confirm.
#[cfg(feature = "redb")]
pub fn load_hosting_cache<F>(
&self,
storage: &crate::contract::storages::Storage,
code_hash_lookup: F,
) -> Result<usize, redb::Error>
where
F: Fn(
&freenet_stdlib::prelude::ContractInstanceId,
) -> Option<freenet_stdlib::prelude::CodeHash>,
{
self.hosting_manager
.load_from_storage(storage, code_hash_lookup)
}
/// Loads the hosting cache from `storage` (sqlite backend, used only when the
/// `redb` feature is disabled). Async counterpart of the redb variant.
///
/// `code_hash_lookup` maps a contract instance id to its code hash, if known.
/// Returns the `usize` produced by the hosting manager.
/// NOTE(review): presumably the number of entries loaded - confirm.
#[cfg(all(feature = "sqlite", not(feature = "redb")))]
pub async fn load_hosting_cache<F>(
&self,
storage: &crate::contract::storages::Storage,
code_hash_lookup: F,
) -> Result<usize, crate::contract::storages::sqlite::SqlDbError>
where
F: Fn(
&freenet_stdlib::prelude::ContractInstanceId,
) -> Option<freenet_stdlib::prelude::CodeHash>,
{
self.hosting_manager
.load_from_storage(storage, code_hash_lookup)
.await
}
/// Records with the topology manager that a request of `request_type` for
/// `target` was sent to `recipient`.
pub fn record_request(
    &self,
    recipient: PeerKeyLocation,
    target: Location,
    request_type: TransactionType,
) {
    let mut topology = self.connection_manager.topology_manager.write();
    topology.record_request(recipient, target, request_type);
}
/// Adds a connection to `peer` at location `loc`.
///
/// Returns `true` only when this connection moved the node from "not ready"
/// to "ready". Returns `false` when the connection manager rejects the
/// connection, in which case no cached state is updated.
pub async fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) -> bool {
    tracing::info!(
        peer = %peer,
        peer_location = %loc,
        this = ?self.connection_manager.get_own_addr(),
        was_reserved = %was_reserved,
        "Adding connection to peer"
    );
    // Readiness before the insertion; a threshold of zero means always ready.
    let min_ready = self.connection_manager.min_ready_connections;
    let was_ready = min_ready == 0 || self.connection_manager.connection_count() >= min_ready;

    let accepted = self.connection_manager.add_connection(
        loc,
        peer.socket_addr(),
        peer.pub_key().clone(),
        was_reserved,
    );
    if !accepted {
        tracing::warn!(
            peer = %peer,
            peer_location = %loc,
            "Ring rejected connection - not updating caches or logging connection event"
        );
        return false;
    }

    // Publish our own location to the network status once it is known.
    if let Some(own_loc) = self.connection_manager.own_location().location() {
        crate::node::network_status::set_own_location(own_loc.as_f64());
    }
    self.refresh_density_request_cache();

    let is_ready = self.connection_manager.is_self_ready();
    is_ready && !was_ready
}
/// Asks the connection manager to replace `old_peer`'s address with
/// `new_peer`'s address and public key.
///
/// The density request cache is refreshed only when `update_peer_identity`
/// returns `true`.
pub fn update_connection_identity(&self, old_peer: &PeerId, new_peer: PeerId) {
    let updated = self.connection_manager.update_peer_identity(
        old_peer.socket_addr(),
        new_peer.socket_addr(),
        new_peer.pub_key().clone(),
    );
    if !updated {
        return;
    }
    self.refresh_density_request_cache();
}
/// Feeds the current connections-by-location map to the topology manager's
/// `refresh_cache`.
///
/// The connection map is fetched before the topology write lock is taken.
fn refresh_density_request_cache(&self) {
    let by_location = self.connection_manager.get_connections_by_location();
    let topology = &mut self.connection_manager.topology_manager.write();
    let _cache_refreshed = topology.refresh_cache(&by_location);
}
/// Filters `peers` down to those we have no connection (or pending
/// connection) to.
///
/// Peers without a socket address are always kept. The result is collected
/// eagerly, so the returned iterator does not borrow `self`.
pub fn is_not_connected<'a>(
    &self,
    peers: impl Iterator<Item = &'a PeerKeyLocation>,
) -> impl Iterator<Item = &'a PeerKeyLocation> + Send {
    let unconnected: Vec<&'a PeerKeyLocation> = peers
        .filter(|peer| match peer.socket_addr() {
            Some(addr) => !self.connection_manager.has_connection_or_pending(addr),
            None => true,
        })
        .collect();
    unconnected.into_iter()
}
/// Picks a single peer to route to for `contract_key`.
///
/// The target location is derived from the contract key; selection is done
/// by the connection manager's `routing_with_telemetry`, skipping addresses
/// in `skip_list`. When a routing decision is returned it is logged at
/// debug level.
#[inline]
pub fn closest_potentially_hosting(
    &self,
    contract_key: &ContractKey,
    skip_list: impl Contains<std::net::SocketAddr>,
) -> Option<PeerKeyLocation> {
    let router = self.router.read();
    let target = Location::from(contract_key);
    let (chosen, telemetry) =
        self.connection_manager
            .routing_with_telemetry(target, None, skip_list, &router);
    if let Some(decision) = telemetry.as_ref() {
        tracing::debug!(
            target_location = %target.as_f64(),
            strategy = ?decision.strategy,
            num_candidates = decision.candidates.len(),
            total_routing_events = decision.total_routing_events,
            selected = chosen.is_some(),
            "routing_decision"
        );
    }
    chosen
}
/// Selects up to `k` peers to route to for `contract_id`.
///
/// Candidate collection walks connections in sorted (location, peer) order:
/// - addresses in `skip_list` or already seen are dropped;
/// - transient peers are dropped and counted;
/// - peers that are not ready are set aside as a fallback;
/// - connections without a socket address are always included.
///
/// If no ready candidate exists, the not-ready peers are used instead.
/// Transient peers are never used as a fallback. The final choice is made by
/// the router's `select_k_best_peers_with_telemetry`.
pub fn k_closest_potentially_hosting<K>(
    &self,
    contract_id: &K,
    skip_list: impl Contains<std::net::SocketAddr> + Clone,
    k: usize,
) -> Vec<PeerKeyLocation>
where
    for<'a> Location: From<&'a K>,
{
    let target_location = Location::from(contract_id);
    let manager = &self.connection_manager;

    let mut visited = HashSet::new();
    let mut candidates: Vec<PeerKeyLocation> = Vec::new();
    let mut not_ready_fallback: Vec<PeerKeyLocation> = Vec::new();
    let mut skipped_not_ready: usize = 0;
    let mut skipped_transient: usize = 0;

    // Walk location groups, and the connections inside each group, in sorted
    // order so the candidate list is built in a stable order.
    let by_location = manager.get_connections_by_location();
    let mut groups: Vec<_> = by_location.iter().collect();
    groups.sort_by(|a, b| a.0.cmp(b.0));
    for (_, conns) in groups {
        let mut ordered: Vec<_> = conns.iter().collect();
        ordered.sort_by(|a, b| a.location.cmp(&b.location));
        for conn in ordered {
            match conn.location.socket_addr() {
                Some(addr) => {
                    // The skip-list check comes first so skipped addresses are
                    // not recorded as visited.
                    if skip_list.has_element(addr) || !visited.insert(addr) {
                        continue;
                    }
                    if manager.is_transient(addr) {
                        tracing::debug!(
                            %addr,
                            target_location = %target_location.as_f64(),
                            "k_closest: skipping transient peer"
                        );
                        skipped_transient += 1;
                        continue;
                    }
                    if !manager.is_peer_ready(addr) {
                        tracing::debug!(
                            %addr,
                            target_location = %target_location.as_f64(),
                            "k_closest: skipping peer not yet ready"
                        );
                        not_ready_fallback.push(conn.location.clone());
                        skipped_not_ready += 1;
                        continue;
                    }
                }
                None => {
                    tracing::debug!(
                        peer = ?conn.location,
                        target_location = %target_location.as_f64(),
                        "k_closest: including addressless candidate (bypasses addr-keyed filters)"
                    );
                }
            }
            candidates.push(conn.location.clone());
        }
    }

    // No ready peer at all: fall back to the not-yet-ready ones.
    if candidates.is_empty() && !not_ready_fallback.is_empty() {
        tracing::warn!(
            count = not_ready_fallback.len(),
            target_location = %target_location.as_f64(),
            "k_closest: no ready peers available, falling back to not-yet-ready peers to avoid EmptyRing"
        );
        candidates = not_ready_fallback;
    }
    if candidates.is_empty() && skipped_transient > 0 {
        tracing::warn!(
            skipped_transient,
            target_location = %target_location.as_f64(),
            "k_closest: no viable peers — all eligible connections were transient \
             (no fallback by design; routing will fail this hop)"
        );
    }

    candidates.sort();
    let router = self.router.read();
    let (selected, decision) =
        router.select_k_best_peers_with_telemetry(candidates.iter(), target_location, k);
    drop(router);

    tracing::debug!(
        target_location = %target_location.as_f64(),
        strategy = ?decision.strategy,
        num_candidates = decision.candidates.len(),
        total_routing_events = decision.total_routing_events,
        selected_count = selected.len(),
        "routing_decision"
    );
    tracing::debug!(
        target_location = %target_location.as_f64(),
        candidates_found = selected.len(),
        skipped_not_ready,
        skipped_transient,
        "k_closest_potentially_hosting result"
    );
    selected.into_iter().cloned().collect()
}
pub fn routing_finished(&self, event: crate::router::RouteEvent) {
self.connection_manager
.topology_manager
.write()
.report_outbound_request(event.peer.clone(), event.contract_location);
if let Some(addr) = event.peer.socket_addr() {
let mut health = self.connection_manager.peer_health.lock();
match &event.outcome {
crate::router::RouteOutcome::Success { .. }
| crate::router::RouteOutcome::SuccessUntimed => {
health.record_success(addr);
}
crate::router::RouteOutcome::Failure => {
health.record_failure(addr);
}
}
}
self.router.write().add_event(event);
}
/// Delegates to the hosting manager's `subscribe` for `contract`.
pub fn subscribe(&self, contract: ContractKey) -> SubscribeResult {
    let hosting = &self.hosting_manager;
    hosting.subscribe(contract)
}
/// Delegates to the hosting manager's `unsubscribe` for `contract`.
pub fn unsubscribe(&self, contract: &ContractKey) {
    let hosting = &self.hosting_manager;
    hosting.unsubscribe(contract)
}
/// Returns the hosting manager's `is_subscribed` answer for `contract`.
pub fn is_subscribed(&self, contract: &ContractKey) -> bool {
    let hosting = &self.hosting_manager;
    hosting.is_subscribed(contract)
}
/// Returns the contract keys reported by the hosting manager's
/// `get_subscribed_contracts`.
pub fn get_subscribed_contracts(&self) -> Vec<ContractKey> {
    let hosting = &self.hosting_manager;
    hosting.get_subscribed_contracts()
}
/// Delegates to the hosting manager's `force_subscription_renewal`.
fn force_subscription_renewal(&self, contract: &ContractKey) {
    let hosting = &self.hosting_manager;
    hosting.force_subscription_renewal(contract);
}
/// Runs the hosting manager's `expire_stale_subscriptions` and returns the
/// contract keys it reports.
pub fn expire_stale_subscriptions(&self) -> Vec<ContractKey> {
    let hosting = &self.hosting_manager;
    hosting.expire_stale_subscriptions()
}
/// Adds `peer` as a downstream subscriber of `contract`.
///
/// Returns `false` only when the hosting manager's outcome is
/// `AddSubscriberOutcome::Rejected`; every other outcome maps to `true`.
pub fn add_downstream_subscriber(&self, contract: &ContractKey, peer: PeerKey) -> bool {
    use crate::ring::hosting::AddSubscriberOutcome;

    let hosting = &self.hosting_manager;
    match hosting.add_downstream_subscriber(contract, peer) {
        AddSubscriberOutcome::Rejected => false,
        _ => true,
    }
}
/// Delegates to the hosting manager's `renew_downstream_subscriber` and
/// returns its boolean result.
#[allow(dead_code)]
pub fn renew_downstream_subscriber(&self, contract: &ContractKey, peer: &PeerKey) -> bool {
    let hosting = &self.hosting_manager;
    hosting.renew_downstream_subscriber(contract, peer)
}
/// Delegates to the hosting manager's `remove_downstream_subscriber` and
/// returns its boolean result.
pub fn remove_downstream_subscriber(&self, contract: &ContractKey, peer: &PeerKey) -> bool {
    let hosting = &self.hosting_manager;
    hosting.remove_downstream_subscriber(contract, peer)
}
/// Returns the hosting manager's `has_downstream_subscribers` answer for
/// `contract`.
pub fn has_downstream_subscribers(&self, contract: &ContractKey) -> bool {
    let hosting = &self.hosting_manager;
    hosting.has_downstream_subscribers(contract)
}
/// Returns the hosting manager's `contract_in_use` answer for `contract`.
pub(crate) fn contract_in_use(&self, contract: &ContractKey) -> bool {
    let hosting = &self.hosting_manager;
    hosting.contract_in_use(contract)
}
/// Bookkeeping for a state write to `contract`.
///
/// Bumps the contract's state generation, passes the new generation to
/// `refresh_cache_generation`, then reports `state_size` as
/// `StateBytesWritten` resource usage for the contract.
pub(crate) fn commit_state_write(&self, contract: &ContractKey, state_size: usize) {
    use crate::topology::meter::ResourceType;

    let hosting = &self.hosting_manager;
    let generation = hosting.bump_state_generation(contract);
    hosting.refresh_cache_generation(contract, generation);

    let bytes_written = state_size as f64;
    self.report_contract_resource_usage(
        *contract.id(),
        ResourceType::StateBytesWritten,
        bytes_written,
    );
}
/// Returns the hosting manager's `state_generation` value for `contract`.
pub(crate) fn state_generation(&self, contract: &ContractKey) -> u64 {
    let hosting = &self.hosting_manager;
    hosting.state_generation(contract)
}
/// Delegates to the hosting manager's `forget_state_generation`.
pub(crate) fn forget_state_generation(&self, contract: &ContractKey) {
    let hosting = &self.hosting_manager;
    hosting.forget_state_generation(contract)
}
/// Delegates to the hosting manager's `pending_reclamation_add`, passing
/// `contract` and `expected_generation` through unchanged.
pub(crate) fn pending_reclamation_add(&self, contract: ContractKey, expected_generation: u64) {
    let hosting = &self.hosting_manager;
    hosting.pending_reclamation_add(contract, expected_generation)
}
/// Delegates to the hosting manager's `pending_reclamation_remove`.
pub(crate) fn pending_reclamation_remove(&self, contract: &ContractKey) {
    let hosting = &self.hosting_manager;
    hosting.pending_reclamation_remove(contract)
}
/// Returns the `(contract, generation)` pairs produced by the hosting
/// manager's `pending_reclamation_snapshot`.
pub(crate) fn pending_reclamation_snapshot(&self) -> Vec<(ContractKey, u64)> {
    let hosting = &self.hosting_manager;
    hosting.pending_reclamation_snapshot()
}
/// Runs the hosting manager's `expire_stale_downstream_subscribers` and
/// returns its per-contract results.
pub fn expire_stale_downstream_subscribers(&self) -> Vec<(ContractKey, usize)> {
    let hosting = &self.hosting_manager;
    hosting.expire_stale_downstream_subscribers()
}
/// Returns the hosting manager's `should_unsubscribe_upstream` answer for
/// `contract`.
pub fn should_unsubscribe_upstream(&self, contract: &ContractKey) -> bool {
    let hosting = &self.hosting_manager;
    hosting.should_unsubscribe_upstream(contract)
}
/// Returns the hosting manager's `is_receiving_updates` answer for
/// `contract`.
pub fn is_receiving_updates(&self, contract: &ContractKey) -> bool {
    let hosting = &self.hosting_manager;
    hosting.is_receiving_updates(contract)
}
/// Returns the contract keys reported by the hosting manager's
/// `contracts_needing_renewal`.
pub fn contracts_needing_renewal(&self) -> Vec<ContractKey> {
    let hosting = &self.hosting_manager;
    hosting.contracts_needing_renewal()
}
/// Delegates to the hosting manager's `add_client_subscription` for the
/// given contract instance and client.
pub fn add_client_subscription(
    &self,
    instance_id: &ContractInstanceId,
    client_id: crate::client_events::ClientId,
) -> AddClientSubscriptionResult {
    let hosting = &self.hosting_manager;
    hosting.add_client_subscription(instance_id, client_id)
}
/// Delegates to the hosting manager's `remove_client_from_all_subscriptions`
/// and returns its result.
pub fn remove_client_from_all_subscriptions(
    &self,
    client_id: crate::client_events::ClientId,
) -> ClientDisconnectResult {
    let hosting = &self.hosting_manager;
    hosting.remove_client_from_all_subscriptions(client_id)
}
/// Returns the contract keys reported by the hosting manager's
/// `hosting_contract_keys`.
pub fn hosting_contract_keys(&self) -> Vec<ContractKey> {
    let hosting = &self.hosting_manager;
    hosting.hosting_contract_keys()
}
/// Returns the hosting manager's `hosting_contract_size` value for `key`.
pub fn hosting_contract_size(&self, key: &ContractKey) -> u64 {
    let hosting = &self.hosting_manager;
    hosting.hosting_contract_size(key)
}
/// Returns the hosting manager's `hosting_contracts_count` value.
pub fn hosting_contracts_count(&self) -> usize {
    let hosting = &self.hosting_manager;
    hosting.hosting_contracts_count()
}
/// Returns the tuples produced by the hosting manager's
/// `get_subscription_states`, unchanged.
pub fn get_subscription_states(&self) -> Vec<(ContractKey, bool, bool, Option<Instant>)> {
    let hosting = &self.hosting_manager;
    hosting.get_subscription_states()
}
/// Returns the snapshots produced by the hosting manager's
/// `dashboard_subscription_snapshot`.
pub fn dashboard_subscription_snapshot(&self) -> Vec<SubscribedContractSnapshot> {
    let hosting = &self.hosting_manager;
    hosting.dashboard_subscription_snapshot()
}
/// Builds the governance section of the dashboard snapshot.
///
/// Governance-side enums (mode, state, transition reason, skip reason) are
/// translated one-to-one into their `network_status` snapshot counterparts.
/// For every flagged score the entry carries the full instance id, a
/// shortened id (first 12 characters followed by `...` when the id is
/// longer than 12 characters), ages expressed as whole seconds relative to
/// `self.time_source.now()`, and the mapped transition history.
///
/// The latest norms are read exactly once, so `norms` and `last_tick_at` in
/// the returned snapshot always describe the same tick.
pub fn dashboard_governance_snapshot(&self) -> crate::node::network_status::GovernanceSnapshot {
    use crate::contract::governance as gov;
    use crate::node::network_status as ns;
    let now = self.time_source.now();
    let mode = match self.governance.mode() {
        gov::GovernanceMode::Off => ns::GovernanceModeSnapshot::Off,
        gov::GovernanceMode::DryRun => ns::GovernanceModeSnapshot::DryRun,
        gov::GovernanceMode::Enforce => ns::GovernanceModeSnapshot::Enforce,
    };
    let map_state = |s: gov::GovernanceState| match s {
        gov::GovernanceState::Normal => ns::GovernanceStateSnapshot::Normal,
        gov::GovernanceState::Borderline => ns::GovernanceStateSnapshot::Borderline,
        gov::GovernanceState::WouldEvict => ns::GovernanceStateSnapshot::WouldEvict,
        gov::GovernanceState::Evicted => ns::GovernanceStateSnapshot::Evicted,
        gov::GovernanceState::Banned => ns::GovernanceStateSnapshot::Banned,
    };
    let map_reason = |r: gov::TransitionReason| match r {
        gov::TransitionReason::FirstSeen => ns::GovernanceTransitionReasonSnapshot::FirstSeen,
        gov::TransitionReason::BorderlineEntered => {
            ns::GovernanceTransitionReasonSnapshot::BorderlineEntered
        }
        gov::TransitionReason::ThresholdCrossed => {
            ns::GovernanceTransitionReasonSnapshot::ThresholdCrossed
        }
        gov::TransitionReason::Evicted => ns::GovernanceTransitionReasonSnapshot::Evicted,
        gov::TransitionReason::BanTriggered => {
            ns::GovernanceTransitionReasonSnapshot::BanTriggered
        }
        gov::TransitionReason::Recovered => ns::GovernanceTransitionReasonSnapshot::Recovered,
        gov::TransitionReason::BanLifted => ns::GovernanceTransitionReasonSnapshot::BanLifted,
    };
    let contracts: Vec<ns::ContractGovernanceEntry> = self
        .governance
        .iter_flagged_scores()
        .into_iter()
        .map(|(id, score)| {
            let instance_id = id.to_string();
            // Truncate on character boundaries, never on raw bytes.
            let instance_id_short = if instance_id.chars().count() > 12 {
                let trunc: String = instance_id.chars().take(12).collect();
                format!("{trunc}...")
            } else {
                instance_id.clone()
            };
            let history = score
                .history
                .iter()
                .map(|t| ns::GovernanceTransitionEntry {
                    // Saturating: a timestamp later than `now` yields 0.
                    secs_ago: now.saturating_duration_since(t.at).as_secs(),
                    from: map_state(t.from),
                    to: map_state(t.to),
                    reason: map_reason(t.reason),
                })
                .collect();
            ns::ContractGovernanceEntry {
                instance_id,
                instance_id_short,
                state: map_state(score.state),
                cost_used: score.cost_used,
                benefit_score: score.benefit_score,
                log_ratio: score.log_ratio(self.governance.benefit_floor()),
                age_secs: now.saturating_duration_since(score.first_seen).as_secs(),
                last_transition_secs_ago: now
                    .saturating_duration_since(score.last_transition)
                    .as_secs(),
                history,
            }
        })
        .collect();
    // Single read of the latest norms: both `last_tick_at` and `norms` are
    // derived from this one value so they cannot come from different ticks.
    let latest_norms = self.governance.latest_norms();
    let last_tick_at = latest_norms.as_ref().map(|n| n.at);
    let norms = match latest_norms {
        Some(n) => ns::NetworkNorms {
            median_log_ratio: n.median_log_ratio,
            mad: n.mad,
            threshold: n.threshold,
            sample_size: n.sample_size,
            capacity_ceiling_binding: n.capacity_ceiling_binding,
            // NOTE(review): `SkipReason` is referenced through
            // `crate::governance` while every other governance type here
            // comes through `crate::contract::governance` (`gov`) — confirm
            // both paths resolve to the same type.
            skip_reason: n.skip_reason.map(|r| match r {
                crate::governance::SkipReason::InsufficientSamples => {
                    ns::GovernanceSkipReasonSnapshot::InsufficientSamples
                }
                crate::governance::SkipReason::MadCollapsed => {
                    ns::GovernanceSkipReasonSnapshot::MadCollapsed
                }
                crate::governance::SkipReason::NoExtractableRatios => {
                    ns::GovernanceSkipReasonSnapshot::NoExtractableRatios
                }
            }),
        },
        None => ns::NetworkNorms::default(),
    };
    // Lookup table from full instance id to mapped state.
    let state_by_id: std::collections::HashMap<String, ns::GovernanceStateSnapshot> = contracts
        .iter()
        .map(|c| (c.instance_id.clone(), c.state))
        .collect();
    let observed_count = self.governance.len();
    let min_samples = self.governance.outlier_min_samples();
    ns::GovernanceSnapshot {
        mode,
        contracts,
        observed_count,
        min_samples,
        norms,
        last_tick_at,
        state_by_id,
    }
}
/// Builds the ban-list section of the dashboard snapshot.
///
/// Entries are sorted by remaining time (whole seconds, ascending) and then
/// by instance id, so the output order is deterministic.
pub fn dashboard_ban_list_snapshot(&self) -> crate::node::network_status::BanListSnapshot {
    use crate::node::network_status as ns;
    use crate::ring::contract_ban_list::BanReason;

    let ban_list = &self.contract_ban_list;
    let mut entries: Vec<ns::BanListEntry> = ban_list
        .snapshot()
        .into_iter()
        .map(|banned| {
            let reason = match banned.reason {
                BanReason::AutoMad => ns::BanReasonSnapshot::AutoMad,
                BanReason::Operator => ns::BanReasonSnapshot::Operator,
            };
            ns::BanListEntry {
                instance_id: banned.contract.to_string(),
                reason,
                expires_in_secs: banned.remaining.as_secs(),
            }
        })
        .collect();
    entries.sort_by(|lhs, rhs| {
        (lhs.expires_in_secs, &lhs.instance_id).cmp(&(rhs.expires_in_secs, &rhs.instance_id))
    });

    let count = entries.len();
    let capacity_rejected_total = ban_list.capacity_rejected_total();
    ns::BanListSnapshot {
        count,
        capacity_rejected_total,
        entries,
    }
}
/// Delegates to the hosting manager's `record_contract_update`.
pub fn record_contract_update(&self, contract: &ContractKey) {
    let hosting = &self.hosting_manager;
    hosting.record_contract_update(contract)
}
/// Asks the broken-invariants tracker whether the contract's instance id is
/// marked broken.
pub fn is_contract_broken(&self, contract: &ContractKey) -> bool {
    let instance_id = contract.id();
    self.broken_invariants.is_broken(instance_id)
}
/// Records `kind` against the contract's instance id in the
/// broken-invariants tracker.
pub(crate) fn record_broken_invariant(&self, contract: ContractKey, kind: BrokenInvariant) {
    let instance_id = *contract.id();
    self.broken_invariants.record(instance_id, kind);
}
/// Hands `storage` to the broken-invariants tracker via `set_storage`.
pub(crate) fn set_broken_invariants_storage(
    &self,
    storage: crate::contract::storages::Storage,
) {
    let invariants = &self.broken_invariants;
    invariants.set_storage(storage);
}
/// Attributes `amount` of `resource` to `contract_id`.
///
/// The usage is first reported to the topology manager (under its write
/// lock, released before the next step), then fed to governance as a cost
/// of `amount * resource_weight(resource)`.
pub(crate) fn report_contract_resource_usage(
    &self,
    contract_id: freenet_stdlib::prelude::ContractInstanceId,
    resource: crate::topology::meter::ResourceType,
    amount: f64,
) {
    use crate::topology::meter::AttributionSource;

    let now = self.time_source.now();
    // The write guard is a temporary of this statement, so the lock is
    // released before governance is touched.
    self.connection_manager
        .topology_manager
        .write()
        .report_resource_usage(
            &AttributionSource::Contract(contract_id),
            resource,
            amount,
            now,
        );

    let weighted_cost = amount * resource_weight(resource);
    self.governance.ingest_cost(contract_id, weighted_cost);
}
/// Runs one governance tick.
///
/// Beneficiary counts come from the hosting manager (weighted with
/// `LOCAL_DEMAND_WEIGHT` and `FORWARDED_DEMAND_WEIGHT`) and are restricted
/// to the ids governance is already tracking before being passed to `tick`.
pub(crate) fn governance_tick(
    &self,
    tick_interval: Duration,
) -> crate::contract::governance::ReaperTickResult {
    let governance = &self.governance;
    let tracked_ids = governance.tracked_ids();
    let mut benefit_by_id = self
        .hosting_manager
        .beneficiary_counts(LOCAL_DEMAND_WEIGHT, FORWARDED_DEMAND_WEIGHT);
    // Drop benefit entries for contracts governance does not track.
    benefit_by_id.retain(|id, _| tracked_ids.contains(id));
    governance.tick(tick_interval, &benefit_by_id)
}
/// Test-only accessor for the hosting manager's `local_client_count`.
#[cfg(all(test, feature = "simulation_tests"))]
pub(crate) fn hosting_manager_local_client_count(
    &self,
    instance_id: &ContractInstanceId,
) -> usize {
    let hosting = &self.hosting_manager;
    hosting.local_client_count(instance_id)
}
/// Test-only accessor for the hosting manager's
/// `downstream_subscriber_count`.
#[cfg(all(test, feature = "simulation_tests"))]
pub(crate) fn hosting_manager_downstream_subscriber_count(
    &self,
    instance_id: &ContractInstanceId,
) -> usize {
    let hosting = &self.hosting_manager;
    hosting.downstream_subscriber_count(instance_id)
}
}
fn resource_weight(resource: crate::topology::meter::ResourceType) -> f64 {
use crate::topology::meter::ResourceType::*;
match resource {
InboundBandwidthBytes | OutboundBandwidthBytes => 1.0,
ExecCpuMicros | ExecFuelUnits => 1.0,
StateBytesWritten | BroadcastFanoutCost => 1.0,
}
}
fn feed_peer_bandwidth_to_meter(
connection_manager: &ConnectionManager,
snapshot: &[(SocketAddr, u64, u64)],
prev: &mut HashMap<SocketAddr, (u64, u64)>,
interval_start: Instant,
) {
use crate::topology::meter::{AttributionSource, ResourceType};
let mut live_peers: std::collections::HashSet<PeerKeyLocation> =
std::collections::HashSet::new();
for &(addr, cum_sent, cum_recv) in snapshot {
let (prev_sent, prev_recv) = prev.get(&addr).copied().unwrap_or((0, 0));
let sent_delta = cum_sent.saturating_sub(prev_sent);
let recv_delta = cum_recv.saturating_sub(prev_recv);
prev.insert(addr, (cum_sent, cum_recv));
let Some(peer) = connection_manager.get_peer_by_addr(addr) else {
continue;
};
live_peers.insert(peer.clone());
if sent_delta == 0 && recv_delta == 0 {
continue;
}
let source = AttributionSource::Peer(peer);
let mut topo = connection_manager.topology_manager.write();
if recv_delta > 0 {
topo.report_resource_usage(
&source,
ResourceType::InboundBandwidthBytes,
recv_delta as f64,
interval_start,
);
}
if sent_delta > 0 {
topo.report_resource_usage(
&source,
ResourceType::OutboundBandwidthBytes,
sent_delta as f64,
interval_start,
);
}
}
connection_manager
.topology_manager
.write()
.retain_peer_sources(&live_peers);
if prev.len() > snapshot.len() {
let live: std::collections::HashSet<SocketAddr> =
snapshot.iter().map(|&(addr, _, _)| addr).collect();
prev.retain(|addr, _| live.contains(addr));
}
}
impl Ring {
/// Returns the hosting manager's `can_request_subscription` answer for
/// `contract`.
pub fn can_request_subscription(&self, contract: &ContractKey) -> bool {
    let hosting = &self.hosting_manager;
    hosting.can_request_subscription(contract)
}
/// Delegates to the hosting manager's `mark_subscription_pending` and
/// returns its boolean result.
pub fn mark_subscription_pending(&self, contract: ContractKey) -> bool {
    let hosting = &self.hosting_manager;
    hosting.mark_subscription_pending(contract)
}
/// Delegates to the hosting manager's `complete_subscription_request`,
/// passing `contract` and `success` through unchanged.
pub fn complete_subscription_request(&self, contract: &ContractKey, success: bool) {
    let hosting = &self.hosting_manager;
    hosting.complete_subscription_request(contract, success)
}
/// Delegates to the hosting manager's `touch_hosting` for `key`.
pub fn touch_hosting(&self, key: &ContractKey) {
    let hosting = &self.hosting_manager;
    hosting.touch_hosting(key)
}
/// Delegates to the hosting manager's `mark_local_client_access` for `key`.
pub fn mark_local_client_access(&self, key: &ContractKey) {
    let hosting = &self.hosting_manager;
    hosting.mark_local_client_access(key)
}
/// Returns the hosting manager's `has_local_client_access` answer for `key`.
pub fn has_local_client_access(&self, key: &ContractKey) -> bool {
    let hosting = &self.hosting_manager;
    hosting.has_local_client_access(key)
}
/// Runs the hosting manager's `sweep_expired_hosting` and returns its
/// `(contract, u64)` results.
pub fn sweep_expired_hosting(&self) -> Vec<(ContractKey, u64)> {
    let hosting = &self.hosting_manager;
    hosting.sweep_expired_hosting()
}
/// Alias for [`Self::sweep_expired_hosting`]; returns exactly what that
/// method returns.
pub fn sweep_expired_get_subscriptions(&self) -> Vec<(ContractKey, u64)> {
    let swept = self.sweep_expired_hosting();
    swept
}
/// Removes the connection to `peer` and reports what was affected.
///
/// Steps, in order: record the disconnect with the network status module,
/// prune live transactions tied to the peer, capture readiness and the
/// connection duration, then remove the connection from the connection
/// manager. If `prune_alive_connection` returns `None`, the function
/// returns early with `became_unready: false` and no disconnect event is
/// emitted. Otherwise a `DisconnectReason::Pruned` event is registered
/// (when one can be built), and `became_unready` is `true` only if
/// `min_ready_connections > 0` and the node went from ready to not ready.
pub async fn prune_connection(&self, peer: PeerId) -> PruneConnectionResult {
    use crate::tracing::DisconnectReason;

    tracing::debug!(%peer, "Removing connection");
    crate::node::network_status::record_peer_disconnected(peer.socket_addr());

    let orphaned_transactions = self
        .live_tx_tracker
        .prune_transactions_from_peer(peer.socket_addr());
    let orphaned_count = orphaned_transactions.len();
    if orphaned_count > 0 {
        tracing::debug!(
            %peer,
            orphaned_count = orphaned_count,
            "Connection pruned with orphaned transactions"
        );
    }

    let manager = &self.connection_manager;
    // Both values must be read before the connection is removed.
    let min_ready = manager.min_ready_connections;
    let was_ready = manager.is_self_ready();
    let connection_duration_ms = manager.get_connection_duration_ms(peer.socket_addr());

    if manager.prune_alive_connection(peer.socket_addr()).is_none() {
        return PruneConnectionResult {
            orphaned_transactions,
            became_unready: false,
        };
    }

    let disconnect_event = NetEventLog::disconnected_with_context(
        self,
        &peer,
        DisconnectReason::Pruned,
        connection_duration_ms,
        None,
        None,
    );
    if let Some(event) = disconnect_event {
        self.event_register
            .register_events(Either::Left(event))
            .await;
    }

    let is_ready = manager.is_self_ready();
    PruneConnectionResult {
        orphaned_transactions,
        became_unready: min_ready > 0 && was_ready && !is_ready,
    }
}
async fn connection_maintenance(
self: Arc<Self>,
notifier: EventLoopNotificationsSender,
live_tx_tracker: LiveTransactionTracker,
) -> anyhow::Result<()> {
let is_gateway = self.is_gateway;
tracing::info!(is_gateway, "Connection maintenance task starting");
#[cfg(not(test))]
const CHECK_TICK_DURATION: Duration = Duration::from_secs(60);
#[cfg(test)]
const CHECK_TICK_DURATION: Duration = Duration::from_secs(2);
#[cfg(not(test))]
const FAST_CHECK_TICK_DURATION: Duration = Duration::from_secs(5);
#[cfg(test)]
const FAST_CHECK_TICK_DURATION: Duration = Duration::from_secs(1);
const REGENERATE_DENSITY_MAP_INTERVAL: Duration = Duration::from_secs(60);
const BASE_CONCURRENT_CONNECTIONS: usize = 3;
let mut check_interval = tokio::time::interval(CHECK_TICK_DURATION);
check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut refresh_density_map = tokio::time::interval(REGENERATE_DENSITY_MAP_INTERVAL);
refresh_density_map.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
tokio::time::sleep(Duration::from_secs(2)).await;
let mut pending_conn_adds = BTreeSet::new();
let mut last_backoff_cleanup = self.time_source.now();
let mut last_health_check = self.time_source.now();
let mut last_peer_cache_save = self.time_source.now();
const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(300);
const PEER_CACHE_SAVE_INTERVAL: Duration = Duration::from_secs(30);
const BACKOFF_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
const ISOLATION_ESCALATION_THRESHOLD: Duration = Duration::from_secs(120);
const INITIAL_ISOLATION_ESCALATION_THRESHOLD: Duration = Duration::from_secs(30);
const DEFERRED_SWAP_DROP_TTL: Duration = Duration::from_secs(120);
#[cfg(not(test))]
const GATEWAY_VERSION_PROBE_INTERVAL: Duration = Duration::from_secs(4 * 3600);
#[cfg(test)]
const GATEWAY_VERSION_PROBE_INTERVAL: Duration = Duration::from_secs(10);
const GATEWAY_PROBE_JITTER_FACTOR: f64 = 0.2;
let mut deferred_swap_drops: Vec<(SocketAddr, tokio::time::Instant)> = Vec::new();
let mut zero_connections_since: Option<Instant> = None;
let mut ever_had_connections = false;
let mut prev_peer_bandwidth: HashMap<SocketAddr, (u64, u64)> =
crate::transport::metrics::TRANSPORT_METRICS
.per_peer_snapshot()
.into_iter()
.map(|(addr, sent, recv)| (addr, (sent, recv)))
.collect();
let mut prev_bandwidth_tick = self.time_source.now();
let initial_probe_delay_secs =
GlobalRng::random_u64() % GATEWAY_VERSION_PROBE_INTERVAL.as_secs();
let mut next_gateway_probe =
self.time_source.now() + Duration::from_secs(initial_probe_delay_secs);
let mut last_conn_count: usize = 0;
let mut no_progress_ticks: u32 = 0;
const FAST_TICK_BACKOFF_THRESHOLD: u32 = 6;
const MAX_FAST_TICK_MULTIPLIER: u32 =
(CHECK_TICK_DURATION.as_secs() / FAST_CHECK_TICK_DURATION.as_secs()) as u32;
let mut last_boot_time = boot_time::Instant::now();
let mut last_mono_time = WallClockInstant::now();
const SUSPEND_DETECTION_THRESHOLD: Duration = CHECK_TICK_DURATION.saturating_mul(2);
let mut this_peer = None;
'maintenance: loop {
let boot_elapsed = last_boot_time.elapsed();
let mono_elapsed = last_mono_time.elapsed();
last_boot_time = boot_time::Instant::now();
last_mono_time = WallClockInstant::now();
let suspend_jump = classify_suspend_jump(boot_elapsed, mono_elapsed);
if let Some(skew) = mono_elapsed.checked_sub(boot_elapsed)
&& skew > Duration::from_millis(100)
{
tracing::warn!(
mono_ahead_ms = skew.as_millis() as u64,
boot_elapsed_ms = boot_elapsed.as_millis() as u64,
mono_elapsed_ms = mono_elapsed.as_millis() as u64,
"connection_maintenance: monotonic clock is significantly \
ahead of boot clock — possible virtualization TSC anomaly \
or monotonic clock regression; suspend detection is \
saturated to zero for this iteration"
);
}
let op_manager = match self.op_manager_state() {
OpManagerState::Live(op_manager) => op_manager,
OpManagerState::NotAttached => {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
OpManagerState::Detached => {
tracing::info!(
is_gateway,
"OpManager dropped; connection maintenance loop ending (parking)"
);
break 'maintenance;
}
};
let Some(this_addr) = &this_peer else {
let Some(addr) = self.connection_manager.get_own_addr() else {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
};
this_peer = Some(addr);
continue;
};
let mut skip_list = HashSet::new();
skip_list.insert(*this_addr);
let reset_all_backoff = || {
self.reset_all_connection_backoff();
op_manager.gateway_backoff.lock().clear();
op_manager.gateway_backoff_cleared.notify_waiters();
let gateway_addrs: Vec<_> = op_manager
.configured_gateways
.iter()
.filter_map(|gw| gw.socket_addr())
.collect();
self.connection_manager
.clear_pending_reservations_for(&gateway_addrs);
};
if suspend_jump > SUSPEND_DETECTION_THRESHOLD {
tracing::warn!(
boot_elapsed_secs = boot_elapsed.as_secs(),
mono_elapsed_secs = mono_elapsed.as_secs(),
suspend_jump_secs = suspend_jump.as_secs(),
"Detected suspend/resume (boot-time jump) — dropping all connections and clearing state"
);
reset_all_backoff();
self.connection_manager.cleanup_all_failed_addrs();
notifier
.notifications_sender
.send(Either::Right(crate::message::NodeEvent::DropAllConnections))
.await
.map_err(|error| {
tracing::debug!(?error, "Failed to send DropAllConnections");
error
})?;
zero_connections_since = None;
}
if last_backoff_cleanup.elapsed() > BACKOFF_CLEANUP_INTERVAL {
self.cleanup_connection_backoff();
last_backoff_cleanup = self.time_source.now();
}
let stale_removed = self.connection_manager.cleanup_stale_reservations();
if stale_removed > 0 {
tracing::warn!(
stale_removed,
"Cleaned up stale reservations and orphaned location entries"
);
}
let tick_now = self.time_source.now();
self.connection_manager.cleanup_stale_failed_addrs();
self.connection_manager
.cleanup_expired_acceptor_stats(tick_now);
let expired_transients = self.connection_manager.cleanup_expired_transients();
if expired_transients > 0 {
tracing::debug!(
expired_transients,
"Cleaned up expired transient connections"
);
}
if last_health_check.elapsed() > HEALTH_CHECK_INTERVAL {
last_health_check = self.time_source.now();
let current_ring = self.connection_manager.connection_count();
let unhealthy = self
.connection_manager
.peer_health
.lock()
.unhealthy_peers(self.connection_manager.min_connections, current_ring);
for addr in unhealthy {
tracing::warn!(
peer = %addr,
"Evicting unhealthy peer (sustained routing failures)"
);
if let Err(e) = notifier
.notifications_sender
.send(Either::Right(crate::message::NodeEvent::DropConnection(
addr,
)))
.await
{
tracing::debug!(error = ?e, "Failed to send DropConnection for unhealthy peer");
}
}
}
if last_peer_cache_save.elapsed() > PEER_CACHE_SAVE_INTERVAL {
last_peer_cache_save = self.time_source.now();
if let Some(ref dir) = self.peer_cache_dir {
let cache = peer_cache::PeerCache::snapshot_from(
&self.connection_manager,
self.time_source.as_ref(),
);
if !cache.peers.is_empty() {
if let Err(e) = cache.save(dir) {
tracing::warn!(error = %e, "Failed to save peer cache");
}
}
}
}
let current_conn_count = self.connection_manager.connection_count();
crate::transport::set_open_connection_count(current_conn_count);
if current_conn_count == 0 {
let threshold = if ever_had_connections {
ISOLATION_ESCALATION_THRESHOLD
} else {
INITIAL_ISOLATION_ESCALATION_THRESHOLD
};
if let Some(since) = zero_connections_since {
if since.elapsed() > threshold {
tracing::warn!(
is_gateway,
isolated_for_secs = since.elapsed().as_secs(),
threshold_secs = threshold.as_secs(),
ever_connected = ever_had_connections,
"Node isolated with zero ring connections — resetting all backoff state"
);
reset_all_backoff();
zero_connections_since = Some(self.time_source.now());
}
} else {
zero_connections_since = Some(self.time_source.now());
tracing::warn!(
is_gateway,
"Zero ring connections detected — starting isolation timer"
);
}
} else if zero_connections_since.take().is_some() {
ever_had_connections = true;
tracing::info!(
connections = current_conn_count,
"Recovered from zero-connection state"
);
}
if should_probe_gateway(
is_gateway,
!op_manager.configured_gateways.is_empty(),
self.time_source.now(),
next_gateway_probe,
) {
let gw_index =
GlobalRng::random_u64() as usize % op_manager.configured_gateways.len();
let gateway = &op_manager.configured_gateways[gw_index];
if let Err(e) =
crate::operations::connect::gateway_version_probe(gateway, &op_manager).await
{
tracing::debug!(
error = %e,
gateway = %gateway,
"Gateway version probe failed, will retry next cycle"
);
}
let base_secs = GATEWAY_VERSION_PROBE_INTERVAL.as_secs() as f64;
let jitter_range = base_secs * GATEWAY_PROBE_JITTER_FACTOR;
let uniform_01 = (GlobalRng::random_u64() as f64) / (u64::MAX as f64);
let jittered_secs = base_secs + jitter_range * (2.0 * uniform_01 - 1.0);
next_gateway_probe =
self.time_source.now() + Duration::from_secs_f64(jittered_secs.max(1.0));
}
if current_conn_count == 0 && !op_manager.configured_gateways.is_empty() {
let eligible: Vec<_> = {
let backoff = op_manager.gateway_backoff.lock();
self.is_not_connected(op_manager.configured_gateways.iter())
.filter(|gw| {
gw.socket_addr()
.map(|addr| !backoff.is_in_backoff(addr))
.unwrap_or(false) })
.cloned()
.collect()
};
if eligible.is_empty() {
tracing::debug!(
total_gateways = op_manager.configured_gateways.len(),
"Zero connections — all gateways connected/pending or in backoff"
);
} else {
let attempt_count = eligible.len().min(BASE_CONCURRENT_CONNECTIONS);
tracing::info!(
eligible = eligible.len(),
attempting = attempt_count,
"Zero connections — attempting gateway bootstrap"
);
for gw in eligible.iter().take(BASE_CONCURRENT_CONNECTIONS) {
match crate::operations::connect::join_ring_request(gw, &op_manager, None)
.await
{
Ok(()) => tracing::debug!(gateway = %gw, "Gateway bootstrap initiated"),
Err(e) => {
tracing::warn!(gateway = %gw, error = %e, "Gateway bootstrap failed")
}
}
}
}
}
let max_concurrent = calculate_max_concurrent_connections(
current_conn_count,
self.connection_manager.min_connections,
);
let mut active_count = live_tx_tracker.active_acquisition_transaction_count();
let respect_backoff = should_respect_location_backoff(
current_conn_count,
self.connection_manager.min_connections,
);
while let Some(ideal_location) = pending_conn_adds.pop_first() {
if respect_backoff && self.is_in_connection_backoff(ideal_location) {
tracing::debug!(
target_location = %ideal_location,
"Skipping connection attempt - target in backoff"
);
continue;
}
if active_count >= max_concurrent {
tracing::debug!(
active_connections = active_count,
max_concurrent,
target_location = %ideal_location,
"At max concurrent connections, re-queuing location"
);
pending_conn_adds.insert(ideal_location);
break;
}
tracing::debug!(
active_connections = active_count,
max_concurrent,
target_location = %ideal_location,
"Attempting to acquire new connection"
);
let tx = self
.acquire_new(
ideal_location,
&skip_list,
¬ifier,
&live_tx_tracker,
&op_manager,
)
.await
.map_err(|error| {
tracing::error!(
?error,
"FATAL: Connection maintenance task failed - shutting down"
);
error
})?;
if tx.is_none() {
let conns = self.connection_manager.connection_count();
tracing::debug!(
connections = conns,
target_location = %ideal_location,
"acquire_new returned None - likely no peers to query through"
);
} else {
active_count += 1;
tracing::info!(
active_connections = active_count,
"Successfully initiated connection acquisition"
);
}
}
let current_connections = self.connection_manager.connection_count();
let pending_connection_targets = pending_conn_adds.len();
let peers = self.connection_manager.get_connections_by_location();
let connections_considered: usize = peers.values().map(|c| c.len()).sum();
let mut neighbor_locations: BTreeMap<_, Vec<_>> = peers
.iter()
.map(|(loc, conns)| {
let conns: Vec<_> = conns
.iter()
.filter(|conn| {
conn.location
.socket_addr()
.map(|addr| !live_tx_tracker.has_live_connection(addr))
.unwrap_or(true)
})
.cloned()
.collect();
(*loc, conns)
})
.filter(|(_, conns)| !conns.is_empty())
.collect();
if neighbor_locations.is_empty() && connections_considered > 0 {
tracing::debug!(
current_connections,
connections_considered,
live_tx_peers = live_tx_tracker.len(),
"Neighbor filtering removed all candidates; using all connections"
);
neighbor_locations = peers
.iter()
.map(|(loc, conns)| (*loc, conns.clone()))
.filter(|(_, conns)| !conns.is_empty())
.collect();
}
if current_connections > self.connection_manager.max_connections {
neighbor_locations = peers.clone();
}
tracing::debug!(
current_connections,
candidates = peers.len(),
live_tx_peers = live_tx_tracker.len(),
"Evaluating topology maintenance"
);
let bandwidth_tick_now = self.time_source.now();
feed_peer_bandwidth_to_meter(
&self.connection_manager,
&crate::transport::metrics::TRANSPORT_METRICS.per_peer_snapshot(),
&mut prev_peer_bandwidth,
prev_bandwidth_tick,
);
prev_bandwidth_tick = bandwidth_tick_now;
let adjustment = self
.connection_manager
.topology_manager
.write()
.adjust_topology(
&neighbor_locations,
&self.connection_manager.own_location().location(),
self.time_source.now(),
current_connections,
);
tracing::debug!(
adjustment = ?adjustment,
current_connections,
is_gateway,
pending_adds = pending_connection_targets,
"Topology adjustment result"
);
match adjustment {
TopologyAdjustment::AddConnections(target_locs) => {
let allowed = calculate_allowed_connection_additions(
current_connections,
pending_connection_targets,
self.connection_manager.min_connections,
self.connection_manager.max_connections,
target_locs.len(),
);
if allowed == 0 {
tracing::debug!(
requested = target_locs.len(),
current_connections,
pending = pending_connection_targets,
min_connections = self.connection_manager.min_connections,
max_connections = self.connection_manager.max_connections,
"Skipping queuing new connection targets – backlog already satisfies capacity constraints"
);
} else {
let total_pending_after = pending_connection_targets + allowed;
tracing::debug!(
requested = target_locs.len(),
allowed,
total_pending_after,
"Queuing additional connection targets"
);
pending_conn_adds.extend(target_locs.into_iter().take(allowed));
}
}
TopologyAdjustment::RemoveConnections(should_disconnect_peers) => {
for peer in should_disconnect_peers {
if let Some(addr) = peer.socket_addr() {
notifier
.notifications_sender
.send(Either::Right(crate::message::NodeEvent::DropConnection(
addr,
)))
.await
.map_err(|error| {
tracing::debug!(
error = ?error,
"Shutting down connection maintenance task"
);
error
})?;
}
}
}
TopologyAdjustment::SwapConnection {
remove,
add_location,
} => {
if let Some(addr) = remove.socket_addr() {
tracing::info!(
remove_peer = %remove,
add_target = %add_location,
"Topology swap: queuing replacement connection (drop deferred)"
);
pending_conn_adds.insert(add_location);
if !deferred_swap_drops.iter().any(|(a, _)| *a == addr) {
deferred_swap_drops.push((addr, tick_now));
}
} else {
tracing::warn!(
remove_peer = %remove,
"Topology swap skipped: peer has no socket address"
);
}
}
TopologyAdjustment::NoChange => {}
}
{
let before_len = deferred_swap_drops.len();
deferred_swap_drops.retain(|(_, queued_at)| {
tick_now.saturating_duration_since(*queued_at) < DEFERRED_SWAP_DROP_TTL
});
let expired = before_len - deferred_swap_drops.len();
if expired > 0 {
tracing::debug!(
expired,
"Deferred swap drops expired (replacement never connected)"
);
}
if !deferred_swap_drops.is_empty() {
let fresh_count = self.connection_manager.connection_count();
let min_conn = self.connection_manager.min_connections;
let n_to_drop = deferred_swap_drops_to_execute(
fresh_count,
min_conn,
deferred_swap_drops.len(),
);
for (addr, _) in deferred_swap_drops.drain(..n_to_drop) {
tracing::info!(
peer = %addr,
connections = fresh_count,
"Executing deferred swap drop (replacement connected)"
);
notifier
.notifications_sender
.send(Either::Right(crate::message::NodeEvent::DropConnection(
addr,
)))
.await
.map_err(|error| {
tracing::debug!(
?error,
"Shutting down connection maintenance task"
);
error
})?;
}
}
}
let needs_fast_tick = current_connections < self.connection_manager.min_connections;
if needs_fast_tick {
if current_connections != last_conn_count {
no_progress_ticks = 0;
} else {
no_progress_ticks = no_progress_ticks.saturating_add(1);
}
last_conn_count = current_connections;
let multiplier = if no_progress_ticks <= FAST_TICK_BACKOFF_THRESHOLD {
1u32
} else {
let excess = no_progress_ticks - FAST_TICK_BACKOFF_THRESHOLD;
2u32.saturating_pow(excess).min(MAX_FAST_TICK_MULTIPLIER)
};
let jitter: f64 = crate::config::GlobalRng::random_range(0.8..=1.2);
let adaptive_duration =
FAST_CHECK_TICK_DURATION.mul_f64(multiplier as f64 * jitter);
if multiplier > 1 {
tracing::debug!(
current_connections,
min_connections = self.connection_manager.min_connections,
no_progress_ticks,
tick_interval_secs = adaptive_duration.as_secs(),
"Fast-tick backed off due to no connection progress"
);
}
crate::deterministic_select! {
_ = refresh_density_map.tick() => {
self.refresh_density_request_cache();
},
_ = tokio::time::sleep(adaptive_duration) => {},
}
} else {
no_progress_ticks = 0;
last_conn_count = current_connections;
check_interval.reset();
crate::deterministic_select! {
_ = refresh_density_map.tick() => {
self.refresh_density_request_cache();
},
_ = check_interval.tick() => {},
}
}
}
std::future::pending::<()>().await;
Ok(())
}
/// Starts a single CONNECT attempt aimed at `ideal_location`.
///
/// The method asks the connection manager for routing candidates (excluding
/// `skip_list`), lets the router pick the single best one, and then spawns a
/// background task that runs `start_client_connect` against that peer. The new
/// transaction is recorded in `live_tx_tracker` under the chosen peer's socket
/// address before the task is spawned.
///
/// # Returns
///
/// * `Ok(Some(tx))` - a connect driver was spawned for transaction `tx`.
/// * `Ok(None)` - no routing candidate was available, or the selected peer has
///   no socket address; nothing was tracked or spawned.
///
/// No path in this body produces `Err`.
///
/// `notifier` is accepted but intentionally unused (see `let _ = notifier;`).
#[tracing::instrument(level = "debug", skip(self, notifier, live_tx_tracker, op_manager), fields(peer = %self.connection_manager.pub_key))]
async fn acquire_new(
&self,
ideal_location: Location,
skip_list: &HashSet<SocketAddr>,
notifier: &EventLoopNotificationsSender,
live_tx_tracker: &LiveTransactionTracker,
op_manager: &Arc<OpManager>,
) -> anyhow::Result<Option<Transaction>> {
// These two values are only used for logging below.
let current_connections = self.connection_manager.connection_count();
let is_gateway = self.is_gateway;
tracing::debug!(
current_connections,
is_gateway,
target_location = %ideal_location,
"acquire_new: attempting to find peer to query"
);
// Pick the peer through which the connect request will be routed.
let query_target = {
let num_connections = self.connection_manager.num_connections();
tracing::debug!(
target_location = %ideal_location,
num_connections,
skip_list_size = skip_list.len(),
self_addr = ?self.connection_manager.get_own_addr(),
"Looking for peer to route through"
);
// NOTE(review): the meaning of the `None` and `false` arguments is defined
// by `routing_candidates`, which is not visible from here - confirm there.
let candidates = self.connection_manager.routing_candidates(
ideal_location,
None,
skip_list,
false, );
// Ask the router for exactly one (k = 1) best peer among the candidates;
// the telemetry half of the returned tuple is discarded.
let selected = if !candidates.is_empty() {
let (selected, _) = self.router.read().select_k_best_peers_with_telemetry(
candidates.iter(),
ideal_location,
1,
);
selected.into_iter().next().cloned()
} else {
None
};
if let Some(target) = selected {
tracing::debug!(
query_target = %target,
target_location = %ideal_location,
"connection_maintenance selected routing target"
);
target
} else {
// Nothing to route through: report "no attempt started" to the caller.
tracing::warn!(
current_connections,
is_gateway,
target_location = %ideal_location,
"acquire_new: no routing candidates found - cannot find peer to query"
);
return Ok(None);
}
};
let joiner = self.connection_manager.own_location();
tracing::debug!(
this_peer = %joiner,
query_target_peer = %query_target,
target_location = %ideal_location,
"Sending connect request via connection_maintenance"
);
// `notifier` is kept in the signature but not used by this implementation.
let _ = notifier;
// The transaction id is created before the address check; if the check
// fails the id is simply dropped without being tracked.
let tx = Transaction::new::<crate::operations::connect::ConnectMsg>();
let gateway_addr = match query_target.socket_addr() {
Some(addr) => addr,
None => {
tracing::warn!(
target_location = %ideal_location,
"acquire_new: selected query target has no socket address, skipping spawn"
);
return Ok(None);
}
};
// Register the transaction against the target address before spawning the
// driver, so the tracker already knows about it when the task starts.
live_tx_tracker.add_transaction(gateway_addr, tx);
// Owned copies moved into the spawned task.
let op_manager_spawn = op_manager.clone();
let gateway = query_target;
let joiner_for_driver = joiner;
// Fire-and-forget: the driver's error (if any) is only logged at debug level
// and is not reported back to the caller of `acquire_new`.
GlobalExecutor::spawn(async move {
if let Err(err) = crate::operations::connect::op_ctx_task::start_client_connect(
tx,
gateway,
gateway_addr,
&op_manager_spawn,
joiner_for_driver,
ideal_location,
None,
)
.await
{
tracing::debug!(
%tx,
%err,
"acquire_new: CONNECT driver completed with error"
);
}
});
tracing::debug!(tx = %tx, "Connect request sent");
Ok(Some(tx))
}
/// Publishes this node's current topology snapshot to the topology registry
/// under `network_name`.
///
/// Does nothing when the node's own address is not known yet. When no location
/// is stored, `0.0` is used as the snapshot location.
#[cfg(any(test, feature = "testing"))]
#[allow(dead_code)]
pub fn register_topology_snapshot(&self, network_name: &str) {
    if let Some(own_addr) = self.connection_manager.get_own_addr() {
        let own_location = self
            .connection_manager
            .get_stored_location()
            .map_or(0.0, |loc| loc.as_f64());
        topology_registry::register_topology_snapshot(
            network_name,
            self.hosting_manager
                .generate_topology_snapshot(own_addr, own_location),
        );
    }
}
/// Builds a topology snapshot for this node.
///
/// Returns `None` when the node's own address is not known. When no location
/// is stored, `0.0` is used as the snapshot location.
#[cfg(any(test, feature = "testing"))]
#[allow(dead_code)]
pub fn get_topology_snapshot(&self) -> Option<topology_registry::TopologySnapshot> {
    self.connection_manager.get_own_addr().map(|own_addr| {
        let own_location = self
            .connection_manager
            .get_stored_location()
            .map_or(0.0, |loc| loc.as_f64());
        self.hosting_manager
            .generate_topology_snapshot(own_addr, own_location)
    })
}
}
/// Computes how many connection acquisitions may be in flight at once.
///
/// At or above `min_connections` the answer is a fixed base of 3. Below it,
/// one extra slot is granted per 3 missing connections, capped at
/// `max(min_connections / 2, 3)`.
fn calculate_max_concurrent_connections(
    current_connections: usize,
    min_connections: usize,
) -> usize {
    const BASE: usize = 3;
    const CONNECTIONS_PER_EXTRA_SLOT: usize = 3;
    match min_connections.checked_sub(current_connections) {
        // No shortfall (current >= min): plain base concurrency.
        None | Some(0) => BASE,
        Some(shortfall) => {
            let extra_slots = shortfall / CONNECTIONS_PER_EXTRA_SLOT;
            let ceiling = std::cmp::max(min_connections / 2, BASE);
            std::cmp::min(BASE + extra_slots, ceiling)
        }
    }
}
/// Returns `true` once the node holds at least `min_connections` connections;
/// below that threshold the result is `false`.
fn should_respect_location_backoff(current_connections: usize, min_connections: usize) -> bool {
    min_connections <= current_connections
}
/// Computes how many of `requested` new connection targets may be queued.
///
/// Established plus already-pending connections are counted together. The
/// result never pushes that total past `max_connections`, and while the node
/// is below `min_connections` it is additionally limited to what is still
/// missing to reach the minimum. Returns `0` when nothing was requested or no
/// capacity is left.
fn calculate_allowed_connection_additions(
    current_connections: usize,
    pending_connections: usize,
    min_connections: usize,
    max_connections: usize,
    requested: usize,
) -> usize {
    let committed = current_connections.saturating_add(pending_connections);
    if requested == 0 || committed >= max_connections {
        return 0;
    }
    let headroom = max_connections - committed;
    let capacity = if current_connections < min_connections {
        // Below the minimum: only top up to the minimum, counting the backlog.
        headroom.min(min_connections.saturating_sub(committed))
    } else {
        headroom
    };
    requested.min(capacity)
}
/// Decides how many queued swap drops may be executed right now.
///
/// A drop is permitted only while the connection count is strictly greater
/// than `min_connections` plus the number of drops still pending. Executing a
/// drop lowers both sides of that comparison by one, so the verdict is the
/// same for every queued drop: either all `pending_drops` are released or
/// none are.
///
/// Returns `pending_drops` when
/// `current_connections > min_connections + pending_drops` (saturating add),
/// otherwise `0`.
fn deferred_swap_drops_to_execute(
    current_connections: usize,
    min_connections: usize,
    pending_drops: usize,
) -> usize {
    let required_floor = min_connections.saturating_add(pending_drops);
    if current_connections > required_floor {
        pending_drops
    } else {
        0
    }
}
/// Returns how much further `boot_elapsed` advanced than `mono_elapsed`.
///
/// The result is clamped to zero when `mono_elapsed` is the larger of the two.
#[inline]
fn classify_suspend_jump(boot_elapsed: Duration, mono_elapsed: Duration) -> Duration {
    boot_elapsed
        .checked_sub(mono_elapsed)
        .unwrap_or(Duration::ZERO)
}
/// Samples file-descriptor usage as `(open_fd_count, soft_limit)`.
///
/// Each half is `None` when the corresponding reader cannot provide a value.
fn read_fd_usage() -> (Option<u64>, Option<u64>) {
    let open_fds = read_open_fd_count();
    let soft_limit = read_fd_soft_limit();
    (open_fds, soft_limit)
}
/// Counts the entries of `/proc/self/fd`, minus one.
///
/// Returns `None` when the directory cannot be read.
// NOTE(review): the `- 1` presumably discounts the descriptor that listing the
// directory itself holds open - confirm.
#[cfg(target_os = "linux")]
fn read_open_fd_count() -> Option<u64> {
    let entries = match std::fs::read_dir("/proc/self/fd") {
        Ok(entries) => entries,
        Err(_) => return None,
    };
    let listed = entries.count() as u64;
    Some(listed.saturating_sub(1))
}
/// Non-Linux targets have no open-descriptor count here; always `None`.
#[cfg(not(target_os = "linux"))]
fn read_open_fd_count() -> Option<u64> {
    None
}
/// Returns the soft `RLIMIT_NOFILE` limit (`rlim_cur`) reported by
/// `getrlimit`, or `None` when the call returns a non-zero status.
#[cfg(unix)]
fn read_fd_soft_limit() -> Option<u64> {
// Zero-initialised out-parameter that `getrlimit` writes into.
let mut limits = libc::rlimit {
rlim_cur: 0,
rlim_max: 0,
};
// SAFETY: `limits` is a fully initialised, exclusively borrowed `rlimit` that
// lives for the whole call, so the pointer handed to `getrlimit` is valid for
// writes.
if unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut limits) } != 0 {
return None;
}
// NOTE(review): the cast is presumably a no-op on targets where `rlim_t` is
// already `u64`, hence the lint allowance - confirm against the libc crate.
#[allow(clippy::unnecessary_cast)]
Some(limits.rlim_cur as u64)
}
/// Non-Unix targets have no `RLIMIT_NOFILE` lookup here; always `None`.
#[cfg(not(unix))]
fn read_fd_soft_limit() -> Option<u64> {
None
}
#[cfg(test)]
mod fd_usage_tests {
    /// On Linux both halves of `read_fd_usage` must be present, the open count
    /// must be positive, and it must not exceed the soft limit.
    #[cfg(target_os = "linux")]
    #[test]
    fn read_fd_usage_reports_sane_values_on_linux() {
        let usage = super::read_fd_usage();
        let open = usage
            .0
            .expect("linux reports an open-fd count via /proc/self/fd");
        let soft = usage
            .1
            .expect("unix reports the RLIMIT_NOFILE soft limit");
        assert!(open != 0, "expected a positive open-fd count, got {open}");
        assert!(
            open <= soft,
            "open fds ({open}) must not exceed the soft limit ({soft})"
        );
    }
}
// Tests for `feed_peer_bandwidth_to_meter`, the bridge that turns per-peer
// transport byte counters into samples on the topology manager's meter.
#[cfg(test)]
mod resource_meter_bridge_tests {
use super::*;
use crate::topology::meter::{AttributionSource, ResourceType};
use crate::transport::TransportKeypair;
// NOTE(review): presumably mirrors the meter's own ramp-up window - confirm
// against `crate::topology::meter`.
const SOURCE_RAMP_UP_DURATION: Duration = Duration::from_secs(5 * 60);
/// Adds `n` connections to `cm` on `127.0.0.1:9000..9000+n`, with locations
/// spread evenly at `(i + 0.5) / n`, and returns their addresses.
fn add_connections(cm: &ConnectionManager, n: usize) -> Vec<SocketAddr> {
let mut addrs = Vec::with_capacity(n);
for i in 0..n {
let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
let loc = Location::new((i as f64 + 0.5) / n as f64);
let keypair = TransportKeypair::new();
assert!(cm.add_connection(loc, addr, keypair.public().clone(), false));
addrs.push(addr);
}
addrs
}
/// Feeds the bridge `ticks` snapshots spaced one second apart, starting at
/// `first_sample`.
///
/// Each peer's third tuple element grows by `per_sec_recv[i]` per tick; the
/// second element stays at zero. Returns the bridge's `prev` map after the
/// last tick.
fn drive_bridge(
cm: &ConnectionManager,
addrs: &[SocketAddr],
per_sec_recv: &[u64],
first_sample: Instant,
ticks: u64,
) -> HashMap<SocketAddr, (u64, u64)> {
let mut prev: HashMap<SocketAddr, (u64, u64)> = HashMap::new();
let mut cumulative: Vec<u64> = vec![0; addrs.len()];
for t in 0..ticks {
// Counters are cumulative, so advance every peer's total first.
for (i, c) in cumulative.iter_mut().enumerate() {
*c += per_sec_recv[i];
}
let snapshot: Vec<(SocketAddr, u64, u64)> = addrs
.iter()
.zip(cumulative.iter())
.map(|(&addr, &recv)| (addr, 0u64, recv))
.collect();
let at = first_sample + Duration::from_secs(t);
feed_peer_bandwidth_to_meter(cm, &snapshot, &mut prev, at);
}
prev
}
/// Six peers each receiving 200 kB/s, fed through the bridge, must make
/// `adjust_topology` answer with `RemoveConnections`.
#[test_log::test]
fn bridge_feeds_meter_so_overloaded_node_removes() {
let cm = ConnectionManager::test_default();
let addrs = add_connections(&cm, 6);
let per_sec_recv = vec![200_000u64; 6];
let now = Instant::now();
// Samples start 5 min 30 s before `now`; with 329 one-second ticks the last
// sample lands at `first_sample + 328 s`, i.e. 2 s before `now`.
let first_sample = now - SOURCE_RAMP_UP_DURATION - Duration::from_secs(30);
let ticks = SOURCE_RAMP_UP_DURATION.as_secs() + 29; drive_bridge(&cm, &addrs, &per_sec_recv, first_sample, ticks);
let mut neighbor_locations = BTreeMap::new();
let conns = cm.get_connections_by_location();
for (loc, c) in &conns {
neighbor_locations.insert(*loc, c.clone());
}
let adjustment =
cm.topology_manager
.write()
.adjust_topology(&neighbor_locations, &None, now, 6);
assert!(
matches!(adjustment, TopologyAdjustment::RemoveConnections(_)),
"overloaded node fed via the bridge must remove a connection, got {adjustment:?}"
);
}
/// Control case: with six connections but no samples fed, `adjust_topology`
/// must not answer with `RemoveConnections`.
#[test_log::test]
fn empty_meter_never_removes() {
let cm = ConnectionManager::test_default();
let _addrs = add_connections(&cm, 6);
let mut neighbor_locations = BTreeMap::new();
let conns = cm.get_connections_by_location();
for (loc, c) in &conns {
neighbor_locations.insert(*loc, c.clone());
}
let adjustment = cm.topology_manager.write().adjust_topology(
&neighbor_locations,
&None,
Instant::now(),
6,
);
assert!(
!matches!(adjustment, TopologyAdjustment::RemoveConnections(_)),
"with an unfed meter the remove branch must be unreachable (the #3453 bug), got {adjustment:?}"
);
}
/// A single snapshot `(addr, 1_000, 2_000)` must be recorded as 1 000 B/s
/// outbound and 2 000 B/s inbound for that peer when queried one second later.
#[test_log::test]
fn bridge_records_inbound_and_outbound_for_known_peer() {
let cm = ConnectionManager::test_default();
let addrs = add_connections(&cm, 1);
let peer = cm
.get_peer_by_addr(addrs[0])
.expect("peer is a ring connection");
let at = Instant::now();
let mut prev = HashMap::new();
feed_peer_bandwidth_to_meter(&cm, &[(addrs[0], 1_000, 2_000)], &mut prev, at);
let source = AttributionSource::Peer(peer);
let mut topo = cm.topology_manager.write();
let outbound = topo.attributed_usage_rate(
&source,
&ResourceType::OutboundBandwidthBytes,
at + Duration::from_secs(1),
);
let inbound = topo.attributed_usage_rate(
&source,
&ResourceType::InboundBandwidthBytes,
at + Duration::from_secs(1),
);
// Release the write guard before asserting.
drop(topo);
assert_eq!(outbound.expect("outbound recorded").per_second(), 1_000.0);
assert_eq!(inbound.expect("inbound recorded").per_second(), 2_000.0);
}
/// Heavy traffic reported for an address that is not a ring connection must
/// not trigger `RemoveConnections`, while the bridge's `prev` map still
/// tracks the latest counters for that address.
#[test_log::test]
fn bridge_skips_unresolvable_address() {
let cm = ConnectionManager::test_default();
let _addrs = add_connections(&cm, 6);
let unknown: SocketAddr = "127.0.0.1:55555".parse().unwrap();
let now = Instant::now();
let window_end = now - SOURCE_RAMP_UP_DURATION - Duration::from_secs(30);
let mut prev = HashMap::new();
let mut cum = 0u64;
// 120 one-second samples ending 1 s before `window_end`, each adding 10 MB
// to the cumulative counter.
for t in 0..120u64 {
cum += 10_000_000; feed_peer_bandwidth_to_meter(
&cm,
&[(unknown, 0, cum)],
&mut prev,
window_end - Duration::from_secs(120 - t),
);
}
let mut neighbor_locations = BTreeMap::new();
for (loc, c) in &cm.get_connections_by_location() {
neighbor_locations.insert(*loc, c.clone());
}
let adjustment =
cm.topology_manager
.write()
.adjust_topology(&neighbor_locations, &None, now, 6);
assert!(
!matches!(adjustment, TopologyAdjustment::RemoveConnections(_)),
"traffic for an unresolvable address must not be attributed, got {adjustment:?}"
);
assert_eq!(prev.get(&unknown).copied(), Some((0, cum)));
}
/// A counter that goes backwards between two snapshots (1 000 000 -> 10)
/// must still yield a positive, finite rate.
#[test_log::test]
fn bridge_handles_counter_reset_without_underflow() {
let cm = ConnectionManager::test_default();
let addrs = add_connections(&cm, 1);
let peer = cm.get_peer_by_addr(addrs[0]).unwrap();
let source = AttributionSource::Peer(peer);
let t0 = Instant::now();
let mut prev = HashMap::new();
feed_peer_bandwidth_to_meter(&cm, &[(addrs[0], 0, 1_000_000)], &mut prev, t0);
let t1 = t0 + Duration::from_secs(1);
// Second snapshot reports a smaller cumulative value than the first.
feed_peer_bandwidth_to_meter(&cm, &[(addrs[0], 0, 10)], &mut prev, t1);
let rate = cm
.topology_manager
.write()
.attributed_usage_rate(
&source,
&ResourceType::InboundBandwidthBytes,
t1 + Duration::from_secs(1),
)
.unwrap()
.per_second();
assert!(
rate > 0.0 && rate.is_finite(),
"counter reset must clamp to a finite, non-underflowed rate, got {rate}"
);
}
/// Addresses missing from a later snapshot must be dropped from `prev`.
#[test_log::test]
fn bridge_prunes_departed_peers_from_prev() {
let cm = ConnectionManager::test_default();
let addrs = add_connections(&cm, 3);
let at = Instant::now();
let mut prev = HashMap::new();
let full: Vec<_> = addrs.iter().map(|&a| (a, 1u64, 1u64)).collect();
feed_peer_bandwidth_to_meter(&cm, &full, &mut prev, at);
assert_eq!(prev.len(), 3);
// Second snapshot only mentions the first peer.
feed_peer_bandwidth_to_meter(
&cm,
&[(addrs[0], 2, 2)],
&mut prev,
at + Duration::from_secs(1),
);
assert_eq!(prev.len(), 1, "departed peers must be pruned from prev");
assert!(prev.contains_key(&addrs[0]));
}
/// 60 000 bytes fed with the interval-start timestamp and queried 60 s later
/// must read as roughly 1 000 B/s, not 60 000 B/s.
#[test_log::test]
fn bridge_timestamps_delta_at_interval_start_no_inflation() {
let cm = ConnectionManager::test_default();
let addrs = add_connections(&cm, 1);
let peer = cm.get_peer_by_addr(addrs[0]).unwrap();
let source = AttributionSource::Peer(peer);
let interval = Duration::from_secs(60);
let interval_start = Instant::now();
let query_time = interval_start + interval;
let mut prev = HashMap::new();
feed_peer_bandwidth_to_meter(&cm, &[(addrs[0], 0, 60_000)], &mut prev, interval_start);
let rate = cm
.topology_manager
.write()
.attributed_usage_rate(&source, &ResourceType::InboundBandwidthBytes, query_time)
.expect("inbound recorded")
.per_second();
assert!(
(rate - 1000.0).abs() < 1.0,
"interval delta must be divided by the real interval, got {rate} B/s (expected ~1000)"
);
assert!(
rate < 2000.0,
"rate must not be inflated toward the 1s-floor (delta/1s = 60000), got {rate}"
);
}
/// After a peer is pruned from the connection manager, the next bridge tick
/// must remove its meter entry while keeping the entry of the peer that is
/// still connected.
#[test_log::test]
fn bridge_prunes_departed_peer_from_meter() {
let cm = ConnectionManager::test_default();
let addrs = add_connections(&cm, 2);
let peer0 = cm.get_peer_by_addr(addrs[0]).unwrap();
let peer1 = cm.get_peer_by_addr(addrs[1]).unwrap();
let src0 = AttributionSource::Peer(peer0);
let src1 = AttributionSource::Peer(peer1);
let t0 = Instant::now();
let mut prev = HashMap::new();
// Tick 1: both peers report traffic.
feed_peer_bandwidth_to_meter(
&cm,
&[(addrs[0], 0, 10_000), (addrs[1], 0, 10_000)],
&mut prev,
t0,
);
let q = t0 + Duration::from_secs(60);
{
let mut topo = cm.topology_manager.write();
assert!(
topo.attributed_usage_rate(&src0, &ResourceType::InboundBandwidthBytes, q)
.is_some(),
"peer0 should have a meter entry after tick 1"
);
assert!(
topo.attributed_usage_rate(&src1, &ResourceType::InboundBandwidthBytes, q)
.is_some(),
"peer1 should have a meter entry after tick 1"
);
}
cm.prune_alive_connection(addrs[1]);
assert!(
cm.get_peer_by_addr(addrs[1]).is_none(),
"peer1 must no longer resolve after prune"
);
// Tick 2: only peer0 is reported. Note that the same timestamp `t0` is
// passed again here.
feed_peer_bandwidth_to_meter(&cm, &[(addrs[0], 0, 20_000)], &mut prev, t0);
{
let mut topo = cm.topology_manager.write();
assert!(
topo.attributed_usage_rate(&src0, &ResourceType::InboundBandwidthBytes, q)
.is_some(),
"live peer0 must keep its meter entry"
);
assert!(
topo.attributed_usage_rate(&src1, &ResourceType::InboundBandwidthBytes, q)
.is_none(),
"departed peer1 must be pruned from the meter"
);
}
}
}
#[cfg(test)]
mod k_closest_source_tests {
    /// Returns the text of `ring.rs` up to (not including) the first top-level
    /// `#[cfg(test)] mod` section.
    fn production_source() -> &'static str {
        const FULL: &str = include_str!("ring.rs");
        let (production, _tests) = FULL.split_at(
            FULL.find("\n#[cfg(test)]\nmod ")
                .expect("ring.rs must have a top-level #[cfg(test)] mod section"),
        );
        production
    }

    /// Returns the text between the first `{` following `signature_prefix` and
    /// its matching `}`, found by counting brace bytes.
    ///
    /// Panics when the prefix is missing, no `{` follows it, or the braces
    /// never balance.
    fn extract_fn_body<'a>(source: &'a str, signature_prefix: &str) -> &'a str {
        let sig_at = match source.find(signature_prefix) {
            Some(pos) => pos,
            None => panic!("could not find {signature_prefix}"),
        };
        let open_at = source[sig_at..].find('{').expect("fn sig must have body");
        let body_start = sig_at + open_at + 1;
        let mut depth: i32 = 1;
        for (offset, byte) in source.as_bytes()[body_start..].iter().enumerate() {
            match *byte {
                b'{' => depth += 1,
                b'}' => {
                    depth -= 1;
                    if depth == 0 {
                        return &source[body_start..body_start + offset];
                    }
                }
                _ => {}
            }
        }
        panic!("unbalanced braces while extracting {signature_prefix}");
    }

    /// The body of `k_closest_potentially_hosting` must mention both the
    /// `is_transient(addr)` call and the `skipped_transient` counter.
    #[test]
    fn k_closest_potentially_hosting_filters_transient_peers() {
        let body = extract_fn_body(
            production_source(),
            "pub fn k_closest_potentially_hosting<K>(",
        );
        assert!(
            body.contains("is_transient(addr)"),
            "k_closest_potentially_hosting must call is_transient(addr) on each \
             candidate connection. If the filter was deliberately removed, also \
             update .claude/rules/ring.md (which documents the filter rule) and \
             this test. Issue #4222 / #3570."
        );
        assert!(
            body.contains("skipped_transient"),
            "k_closest_potentially_hosting must surface a skipped_transient \
             counter so operators can see when the filter is removing peers."
        );
    }

    /// The body of `k_closest_potentially_hosting` must still mention
    /// `not_ready_fallback`.
    #[test]
    fn k_closest_potentially_hosting_preserves_not_ready_fallback() {
        let body = extract_fn_body(
            production_source(),
            "pub fn k_closest_potentially_hosting<K>(",
        );
        assert!(
            body.contains("not_ready_fallback"),
            "k_closest_potentially_hosting must keep the not-yet-ready peer \
             fallback so cold-start nodes don't fail every GET with EmptyRing."
        );
    }
}
#[cfg(test)]
mod renewal_ban_gate_source_tests {
    /// Returns the text of `ring.rs` up to (not including) the first top-level
    /// `#[cfg(test)] mod` section.
    fn production_source() -> &'static str {
        const FULL: &str = include_str!("ring.rs");
        let (production, _tests) = FULL.split_at(
            FULL.find("\n#[cfg(test)]\nmod ")
                .expect("ring.rs must have a top-level #[cfg(test)] mod section"),
        );
        production
    }

    /// Returns the text between the first `{` following `signature_prefix` and
    /// its matching `}`, found by counting brace bytes.
    ///
    /// Panics when the prefix is missing, no `{` follows it, or the braces
    /// never balance.
    fn extract_fn_body<'a>(source: &'a str, signature_prefix: &str) -> &'a str {
        let sig_at = match source.find(signature_prefix) {
            Some(pos) => pos,
            None => panic!("could not find {signature_prefix}"),
        };
        let open_at = source[sig_at..].find('{').expect("fn sig must have body");
        let body_start = sig_at + open_at + 1;
        let mut depth: i32 = 1;
        for (offset, byte) in source.as_bytes()[body_start..].iter().enumerate() {
            match *byte {
                b'{' => depth += 1,
                b'}' => {
                    depth -= 1;
                    if depth == 0 {
                        return &source[body_start..body_start + offset];
                    }
                }
                _ => {}
            }
        }
        panic!("unbalanced braces while extracting {signature_prefix}");
    }

    /// In the body of `recover_orphaned_subscriptions`, the first
    /// `contract_ban_list.is_banned` occurrence must come textually before the
    /// spam check, the pending mark, and the renewal spawn.
    #[test]
    fn renewal_loop_gates_banned_contracts_before_spawning() {
        let body = extract_fn_body(
            production_source(),
            "async fn recover_orphaned_subscriptions(ring: Arc<Self>",
        );
        let gate_pos = body.find("contract_ban_list.is_banned").expect(
            "the subscription-renewal loop must gate on \
             `contract_ban_list.is_banned` so a banned-but-still-subscribed \
             contract stops emitting outbound SUBSCRIBE renewals (#4373). If \
             this gate was removed, the egress leak is back.",
        );

        let can_request_pos = body
            .find("can_request_subscription(&contract)")
            .expect("renewal loop must still consult can_request_subscription");
        assert!(
            can_request_pos > gate_pos,
            "ban gate (offset {gate_pos}) must run BEFORE the \
             can_request_subscription spam check (offset {can_request_pos}) so \
             a banned contract is always skipped"
        );

        let mark_pending_pos = body
            .find("mark_subscription_pending(contract)")
            .expect("renewal loop must still call mark_subscription_pending");
        assert!(
            mark_pending_pos > gate_pos,
            "ban gate (offset {gate_pos}) must run BEFORE \
             mark_subscription_pending (offset {mark_pending_pos})"
        );

        let spawn_pos = body
            .find("run_renewal_subscribe(")
            .expect("renewal loop must still spawn run_renewal_subscribe");
        assert!(
            spawn_pos > gate_pos,
            "ban gate (offset {gate_pos}) must run BEFORE the \
             run_renewal_subscribe outbound-egress spawn (offset {spawn_pos})"
        );
    }
}
#[cfg(test)]
mod renewal_ban_gate_behavior_tests {
    use super::contract_ban_list::{BanReason, ContractBanList};
    use crate::util::time_source::{SharedMockTimeSource, TimeSource};
    use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey};
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds a contract key whose instance id and code hash are both 32
    /// copies of `byte`.
    fn mk_key(byte: u8) -> ContractKey {
        let instance_id = ContractInstanceId::new([byte; 32]);
        let code_hash = CodeHash::new([byte; 32]);
        ContractKey::from_id_and_code(instance_id, code_hash)
    }

    /// A contract banned for 60 s reports as banned, an unrelated contract does
    /// not, and the ban no longer applies once the mock clock moves 61 s on.
    #[test]
    fn banned_contract_is_skipped_for_renewal_while_unbanned_proceeds() {
        let clock = SharedMockTimeSource::new();
        let bans = ContractBanList::new(Arc::new(clock.clone()));
        let (banned, healthy) = (mk_key(1), mk_key(2));

        let ban_expiry = clock.now() + Duration::from_secs(60);
        bans.ban(*banned.id(), ban_expiry, BanReason::AutoMad);

        assert!(
            bans.is_banned(banned.id()),
            "a banned contract must be skipped for subscription renewal (#4373)"
        );
        assert!(
            !bans.is_banned(healthy.id()),
            "a non-banned contract must still be eligible for renewal"
        );

        clock.advance_time(Duration::from_secs(61));
        assert!(
            !bans.is_banned(banned.id()),
            "after the ban TTL expires the contract is eligible for renewal again"
        );
    }
}
#[cfg(test)]
mod suspend_jump_tests {
    use super::classify_suspend_jump;
    use std::time::Duration;

    /// Both clocks advanced by the same 139 s: no jump.
    #[test]
    fn scheduler_stall_is_not_suspend() {
        let elapsed = Duration::from_secs(139);
        assert_eq!(classify_suspend_jump(elapsed, elapsed), Duration::ZERO);
    }

    /// Boot clock an hour ahead of a 50 ms monotonic delta: the gap is reported.
    #[test]
    fn real_suspend_is_detected() {
        let jump = classify_suspend_jump(Duration::from_secs(3600), Duration::from_millis(50));
        assert_eq!(
            jump,
            Duration::from_secs(3600) - Duration::from_millis(50)
        );
    }

    /// A 2 ms difference is reported as exactly 2 ms.
    #[test]
    fn healthy_tick_is_not_suspend() {
        let jump = classify_suspend_jump(Duration::from_millis(2050), Duration::from_millis(2048));
        assert_eq!(jump, Duration::from_millis(2));
    }

    /// Monotonic ahead of boot clamps to zero.
    #[test]
    fn monotonic_ahead_of_boot_saturates_to_zero() {
        let jump = classify_suspend_jump(Duration::from_millis(100), Duration::from_millis(101));
        assert_eq!(jump, Duration::ZERO);
    }

    /// Equal clocks stay within a 30 s threshold.
    #[test]
    fn threshold_comparison_rejects_scheduler_stall() {
        let threshold = Duration::from_secs(30);
        let elapsed = Duration::from_secs(139);
        assert!(classify_suspend_jump(elapsed, elapsed) <= threshold);
    }

    /// A 300 s vs 3 ms gap exceeds a 4 s threshold.
    #[test]
    fn threshold_comparison_accepts_real_suspend() {
        let threshold = Duration::from_secs(4);
        let jump = classify_suspend_jump(Duration::from_secs(300), Duration::from_millis(3));
        assert!(jump > threshold);
    }
}
/// Returns `true` when a gateway probe is due.
///
/// All three conditions must hold: the node is not itself a gateway, at least
/// one gateway is configured, and `now` has reached `next_probe` (inclusive).
#[inline]
fn should_probe_gateway(
    is_gateway: bool,
    has_configured_gateways: bool,
    now: Instant,
    next_probe: Instant,
) -> bool {
    if is_gateway || !has_configured_gateways {
        return false;
    }
    next_probe <= now
}
#[cfg(test)]
mod gateway_version_probe_predicate_tests {
    use super::should_probe_gateway;
    use std::time::Duration;
    use tokio::time::Instant;

    /// Non-gateway, gateways configured, probe one second overdue: fires.
    #[test]
    fn fires_on_non_gateway_with_configured_gateways_at_due_time() {
        let now = Instant::now();
        let due_since = now - Duration::from_secs(1);
        let fires = should_probe_gateway(false, true, now, due_since);
        assert!(fires);
    }

    /// A node running as a gateway never probes, even when overdue.
    #[test]
    fn skipped_when_running_as_gateway() {
        let now = Instant::now();
        let due_since = now - Duration::from_secs(1);
        let fires = should_probe_gateway(true, true, now, due_since);
        assert!(!fires);
    }

    /// Without configured gateways there is nothing to probe.
    #[test]
    fn skipped_when_no_gateways_are_configured() {
        let now = Instant::now();
        let due_since = now - Duration::from_secs(1);
        let fires = should_probe_gateway(false, false, now, due_since);
        assert!(!fires);
    }

    /// A probe scheduled 60 s in the future does not fire yet.
    #[test]
    fn skipped_before_due_time() {
        let now = Instant::now();
        let due_at = now + Duration::from_secs(60);
        let fires = should_probe_gateway(false, true, now, due_at);
        assert!(!fires);
    }

    /// `now == next_probe` counts as due.
    #[test]
    fn fires_at_exact_due_time_boundary() {
        let now = Instant::now();
        let fires = should_probe_gateway(false, true, now, now);
        assert!(fires);
    }

    /// Gateway with no configured gateways: still skipped.
    #[test]
    fn gateway_with_no_configured_gateways_is_still_skipped() {
        let now = Instant::now();
        let fires = should_probe_gateway(true, false, now, now);
        assert!(!fires);
    }
}
#[cfg(test)]
mod max_concurrent_connections_tests {
    use super::calculate_max_concurrent_connections as max_concurrent;

    /// Exactly at the minimum: base value.
    #[test]
    fn at_min_connections_returns_base() {
        let slots = max_concurrent(25, 25);
        assert_eq!(slots, 3);
    }

    /// Above the minimum: base value.
    #[test]
    fn above_min_connections_returns_base() {
        let slots = max_concurrent(30, 25);
        assert_eq!(slots, 3);
    }

    /// Deficit of 15 adds 5 extra slots.
    #[test]
    fn large_deficit_scales_up() {
        let slots = max_concurrent(10, 25);
        assert_eq!(slots, 8);
    }

    /// Zero connections with a minimum of 25 yields 11.
    #[test]
    fn full_deficit_capped_at_half_min() {
        let slots = max_concurrent(0, 25);
        assert_eq!(slots, 11);
    }

    /// A deficit smaller than one slot's worth adds nothing.
    #[test]
    fn small_deficit_adds_nothing() {
        let slots = max_concurrent(23, 25);
        assert_eq!(slots, 3);
    }

    /// Tiny minimums still produce the base value.
    #[test]
    fn very_small_min_connections() {
        for min_connections in [1, 2] {
            assert_eq!(max_concurrent(0, min_connections), 3);
        }
    }

    /// Zero connections with a minimum of 50 yields 19.
    #[test]
    fn high_min_connections_scales_cap() {
        let slots = max_concurrent(0, 50);
        assert_eq!(slots, 19);
    }
}
#[cfg(test)]
mod respect_location_backoff_tests {
    use super::should_respect_location_backoff;

    /// Any count below the minimum of 10 bypasses backoff.
    #[test]
    fn below_min_bypasses_backoff() {
        for current in [0, 7, 9] {
            assert!(!should_respect_location_backoff(current, 10));
        }
    }

    /// Counts at or above the minimum of 10 respect backoff.
    #[test]
    fn at_or_above_min_respects_backoff() {
        for current in [10, 11, 20] {
            assert!(should_respect_location_backoff(current, 10));
        }
    }
}
#[cfg(test)]
mod pending_additions_tests {
    use super::calculate_allowed_connection_additions as allowed_additions;

    /// 1 connection + 24 pending already reaches the minimum of 25.
    #[test]
    fn respects_minimum_when_backlog_exists() {
        assert_eq!(
            allowed_additions(1, 24, 25, 200, 24),
            0,
            "Backlog should satisfy minimum deficit"
        );
    }

    /// With no backlog, the whole deficit of 24 may be queued.
    #[test]
    fn permits_requests_until_minimum_is_met() {
        assert_eq!(allowed_additions(1, 0, 25, 200, 24), 24);
    }

    /// 190 + 5 pending leaves room for only 5 of the 10 requested.
    #[test]
    fn caps_additions_at_available_capacity() {
        assert_eq!(allowed_additions(190, 5, 25, 200, 10), 5);
    }

    /// Plenty of capacity: the request of 3 is granted in full.
    #[test]
    fn respects_requested_when_capacity_allows() {
        assert_eq!(allowed_additions(50, 0, 25, 200, 3), 3);
    }
}
// Tests for `classify_op_manager_ref`, exercised here with a
// `RwLock<Option<Weak<u32>>>` slot standing in for the real owner type.
#[cfg(test)]
mod op_manager_state_tests {
use super::{OpManagerState, classify_op_manager_ref};
use parking_lot::RwLock;
use std::sync::{Arc, Weak};
/// A slot holding `None` classifies as `NotAttached`.
#[test]
fn empty_slot_is_not_attached() {
let slot: RwLock<Option<Weak<u32>>> = RwLock::new(None);
assert!(matches!(
classify_op_manager_ref(&slot),
OpManagerState::NotAttached
));
}
/// A weak reference whose owner is still alive classifies as `Live` and
/// yields the owned value.
#[test]
fn live_weak_upgrades_to_live() {
let owner = Arc::new(7u32);
let slot: RwLock<Option<Weak<u32>>> = RwLock::new(Some(Arc::downgrade(&owner)));
match classify_op_manager_ref(&slot) {
OpManagerState::Live(got) => assert_eq!(*got, 7),
OpManagerState::NotAttached | OpManagerState::Detached => {
panic!("expected Live while the owner Arc is alive")
}
}
}
/// A weak reference whose owner was dropped classifies as `Detached`, not
/// `NotAttached`.
#[test]
fn dropped_owner_is_detached_not_not_attached() {
let owner = Arc::new(7u32);
let weak = Arc::downgrade(&owner);
let slot: RwLock<Option<Weak<u32>>> = RwLock::new(Some(weak));
drop(owner);
assert!(
matches!(classify_op_manager_ref(&slot), OpManagerState::Detached),
"a dropped owner must be Detached so connection_maintenance exits \
instead of zombie-spinning (#3308)"
);
}
/// Simulates a maintenance loop that leaves its loop on `Detached` and then
/// parks forever; the task monitor must not observe an exit.
///
/// Runs on a paused tokio clock, so all time passes via `advance`.
#[tokio::test(start_paused = true)]
async fn detached_maintenance_task_parks_without_tripping_monitor() {
use crate::node::background_task_monitor::BackgroundTaskMonitor;
use std::time::Duration;
let owner = Arc::new(0u32);
let slot: Arc<RwLock<Option<Weak<u32>>>> =
Arc::new(RwLock::new(Some(Arc::downgrade(&owner))));
let task_slot = Arc::clone(&slot);
// Stand-in for the maintenance task: poll every 50 ms while the owner is
// live or not yet attached, stop polling once it is detached.
let handle = tokio::spawn(async move {
loop {
match classify_op_manager_ref(&task_slot) {
OpManagerState::Live(_) => {
tokio::time::sleep(Duration::from_millis(50)).await;
}
OpManagerState::NotAttached => {
tokio::time::sleep(Duration::from_millis(50)).await;
}
OpManagerState::Detached => break,
}
}
// Park instead of returning, so the task never completes.
std::future::pending::<()>().await;
});
let monitor = BackgroundTaskMonitor::new();
monitor.register("connection_maintenance_sim", handle);
// Let the task run a couple of iterations while the owner is alive.
tokio::time::advance(Duration::from_millis(100)).await;
tokio::task::yield_now().await;
// Dropping the owner makes the weak reference dangling.
drop(owner);
tokio::time::advance(Duration::from_millis(200)).await;
tokio::task::yield_now().await;
let exit = monitor.wait_for_any_exit();
tokio::pin!(exit);
// A timeout (Err) means no registered task exited within the window.
let still_parked = tokio::time::timeout(Duration::from_millis(100), &mut exit)
.await
.is_err();
assert!(
still_parked,
"Detached maintenance task must park, not exit: a clean return \
would trip BackgroundTaskMonitor::wait_for_any_exit and crash \
the node (#3308 / #4292)"
);
}
}
/// Tests for `Ring::refresh_router`: loading stored route events into a fresh
/// router, coping with an empty history, and continuing to poll after
/// `get_router_events` has returned errors.
#[cfg(test)]
mod refresh_router_tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use either::Either;
    use futures::FutureExt;
    use parking_lot::RwLock;

    use crate::ring::PeerKeyLocation;
    use crate::ring::location::Location;
    use crate::router::{RouteEvent, RouteOutcome, Router};
    use crate::tracing::{NetEventLog, NetEventRegister};

    /// Event register that serves a fixed, pre-built list of route events and
    /// ignores everything that is reported to it.
    #[derive(Clone)]
    struct WarmStartRegister {
        events: Vec<RouteEvent>,
    }

    impl NetEventRegister for WarmStartRegister {
        /// No-op: incoming events are discarded.
        fn register_events<'a>(
            &'a self,
            _events: Either<NetEventLog<'a>, Vec<NetEventLog<'a>>>,
        ) -> futures::future::BoxFuture<'a, ()> {
            futures::future::ready(()).boxed()
        }

        /// No-op: time-outs are discarded.
        fn notify_of_time_out(
            &mut self,
            _tx: crate::message::Transaction,
            _op_type: &str,
            _target_peer: Option<String>,
        ) -> futures::future::BoxFuture<'_, ()> {
            futures::future::ready(()).boxed()
        }

        fn trait_clone(&self) -> Box<dyn NetEventRegister> {
            Box::new(self.clone())
        }

        /// Returns clones of at most `number` stored events, in stored order.
        fn get_router_events(
            &self,
            number: usize,
        ) -> futures::future::BoxFuture<'_, anyhow::Result<Vec<RouteEvent>>> {
            let outcome: anyhow::Result<Vec<RouteEvent>> =
                Ok(self.events.iter().take(number).cloned().collect());
            futures::future::ready(outcome).boxed()
        }
    }

    /// Builds `count` successful route events with random peers and contract
    /// locations and fixed timings / payload size.
    fn make_route_events(count: usize) -> Vec<RouteEvent> {
        std::iter::repeat_with(|| RouteEvent {
            peer: PeerKeyLocation::random(),
            contract_location: Location::random(),
            outcome: RouteOutcome::Success {
                time_to_response_start: Duration::from_millis(50),
                payload_size: 1000,
                payload_transfer_time: Duration::from_millis(100),
            },
            op_type: None,
        })
        .take(count)
        .collect()
    }

    /// A fresh router must pick up all 100 stored success events once
    /// `refresh_router` has been driven for a short while.
    #[tokio::test]
    async fn refresh_router_loads_history_on_startup() {
        let register = WarmStartRegister {
            events: make_route_events(100),
        };
        let router = Arc::new(RwLock::new(Router::new(&[])));

        // Sanity check: nothing has been recorded yet.
        let snapshot = router.read().snapshot();
        assert_eq!(snapshot.failure_events, 0);
        assert_eq!(snapshot.success_events, 0);

        // The refresh future is cut off by the timeout; whether it elapsed or
        // not is irrelevant, only the router state afterwards is checked.
        let _ = tokio::time::timeout(
            Duration::from_millis(100),
            super::Ring::refresh_router(Arc::clone(&router), register),
        )
        .await;

        let snapshot = router.read().snapshot();
        assert_eq!(
            snapshot.success_events, 100,
            "Router should have been populated with startup history"
        );
    }

    /// With no stored events the router must stay empty.
    #[tokio::test]
    async fn refresh_router_handles_empty_history() {
        let register = WarmStartRegister { events: Vec::new() };
        let router = Arc::new(RwLock::new(Router::new(&[])));

        let _ = tokio::time::timeout(
            Duration::from_millis(100),
            super::Ring::refresh_router(Arc::clone(&router), register),
        )
        .await;

        let snapshot = router.read().snapshot();
        assert_eq!(snapshot.failure_events, 0);
        assert_eq!(snapshot.success_events, 0);
    }

    /// Event register whose first `fail_first` calls to `get_router_events`
    /// return an error; later calls serve the stored events. `calls` counts
    /// every invocation and is shared between clones.
    #[derive(Clone)]
    struct FlakyRegister {
        events: Vec<RouteEvent>,
        fail_first: usize,
        calls: Arc<AtomicUsize>,
    }

    impl NetEventRegister for FlakyRegister {
        /// No-op: incoming events are discarded.
        fn register_events<'a>(
            &'a self,
            _events: Either<NetEventLog<'a>, Vec<NetEventLog<'a>>>,
        ) -> futures::future::BoxFuture<'a, ()> {
            futures::future::ready(()).boxed()
        }

        /// No-op: time-outs are discarded.
        fn notify_of_time_out(
            &mut self,
            _tx: crate::message::Transaction,
            _op_type: &str,
            _target_peer: Option<String>,
        ) -> futures::future::BoxFuture<'_, ()> {
            futures::future::ready(()).boxed()
        }

        fn trait_clone(&self) -> Box<dyn NetEventRegister> {
            Box::new(self.clone())
        }

        /// Bumps the call counter, then fails while the (zero-based) call
        /// index is below `fail_first` and otherwise returns clones of at
        /// most `number` stored events.
        fn get_router_events(
            &self,
            number: usize,
        ) -> futures::future::BoxFuture<'_, anyhow::Result<Vec<RouteEvent>>> {
            let attempt = self.calls.fetch_add(1, Ordering::SeqCst);
            let outcome: anyhow::Result<Vec<RouteEvent>> = if attempt < self.fail_first {
                Err(anyhow::anyhow!(
                    "No file descriptors available (os error 24)"
                ))
            } else {
                Ok(self.events.iter().take(number).cloned().collect())
            };
            futures::future::ready(outcome).boxed()
        }
    }

    /// Four injected failures must not stop the refresh loop: it has to call
    /// `get_router_events` again and eventually load all 100 events. Runs on
    /// a paused clock with a 40-minute timeout budget.
    #[tokio::test(start_paused = true)]
    async fn refresh_router_survives_transient_get_router_events_error() {
        let calls = Arc::new(AtomicUsize::new(0));
        let register = FlakyRegister {
            events: make_route_events(100),
            fail_first: 4,
            calls: Arc::clone(&calls),
        };
        let router = Arc::new(RwLock::new(Router::new(&[])));

        let _ = tokio::time::timeout(
            Duration::from_secs(60 * 40),
            super::Ring::refresh_router(Arc::clone(&router), register),
        )
        .await;

        let total_calls = calls.load(Ordering::SeqCst);
        assert!(
            total_calls > 4,
            "refresh loop should keep polling past transient errors, but only \
             called get_router_events {total_calls} times (<= the 4 injected failures), \
             implying it returned/died instead of retrying"
        );

        let snapshot = router.read().snapshot();
        assert_eq!(
            snapshot.success_events, 100,
            "router should have recovered and loaded history after the transient \
             errors cleared"
        );
    }
}
/// Pins the results of `deferred_swap_drops_to_execute` for a set of small
/// and large ring sizes.
///
/// NOTE(review): judging by the test names, the arguments are presumably
/// `(current connections, minimum connections, pending deferred drops)` —
/// confirm against the function definition.
#[cfg(test)]
mod deferred_swap_drop_tests {
    use super::deferred_swap_drops_to_execute;

    /// (2, 1, 1): nothing may be dropped yet.
    #[test]
    fn three_node_ring_no_drop_without_replacement() {
        let drops = deferred_swap_drops_to_execute(2, 1, 1);
        assert_eq!(drops, 0);
    }

    /// (3, 1, 1): the single pending drop is released.
    #[test]
    fn three_node_ring_drop_when_replacement_connected() {
        let drops = deferred_swap_drops_to_execute(3, 1, 1);
        assert_eq!(drops, 1);
    }

    /// A third argument of zero always yields zero.
    #[test]
    fn no_pending_drops_returns_zero() {
        let drops = deferred_swap_drops_to_execute(10, 10, 0);
        assert_eq!(drops, 0);
    }

    /// (12, 10, 3): no drops are released.
    #[test]
    fn large_network_no_drop_without_replacement() {
        let drops = deferred_swap_drops_to_execute(12, 10, 3);
        assert_eq!(drops, 0);
    }

    /// (13, 10, 3): still exactly on the boundary, so no drops.
    #[test]
    fn large_network_boundary_no_drop() {
        let drops = deferred_swap_drops_to_execute(13, 10, 3);
        assert_eq!(drops, 0);
    }

    /// (14, 10, 3): all three pending drops are released.
    #[test]
    fn large_network_all_replacements_connected() {
        let drops = deferred_swap_drops_to_execute(14, 10, 3);
        assert_eq!(drops, 3);
    }

    /// (10, 10, 1): no drop.
    #[test]
    fn at_min_with_pending_no_drop() {
        let drops = deferred_swap_drops_to_execute(10, 10, 1);
        assert_eq!(drops, 0);
    }

    /// (11, 10, 1): no drop.
    #[test]
    fn one_above_min_with_pending_no_drop() {
        let drops = deferred_swap_drops_to_execute(11, 10, 1);
        assert_eq!(drops, 0);
    }

    /// (12, 10, 1): the single pending drop is released.
    #[test]
    fn two_above_min_with_one_pending_drops_one() {
        let drops = deferred_swap_drops_to_execute(12, 10, 1);
        assert_eq!(drops, 1);
    }

    /// All-zero input must return zero.
    #[test]
    fn all_zero_returns_zero() {
        let drops = deferred_swap_drops_to_execute(0, 0, 0);
        assert_eq!(drops, 0);
    }

    /// (5, 10, 2): first argument below the second, so no drop.
    #[test]
    fn below_min_connections_no_drop() {
        let drops = deferred_swap_drops_to_execute(5, 10, 2);
        assert_eq!(drops, 0);
    }
}
/// Source-level pin: counts the bare (unqualified) time-read call sites in
/// `ring.rs` and fails when the number drifts from the expected value.
#[cfg(test)]
mod instant_now_pin_test {
    /// Number of bare call sites the scan below is expected to find.
    const EXPECTED_BARE_INSTANT_NOW: usize = 14;

    /// True when `byte`, found directly in front of a match, shows that the
    /// match is the tail of a longer path or identifier.
    fn continues_path_or_ident(byte: u8) -> bool {
        matches!(byte, b':' | b'_') || byte.is_ascii_alphanumeric()
    }

    #[test]
    fn no_unexpected_bare_instant_now_call_sites() {
        let src = include_str!("ring.rs");
        // Assembled at runtime so this test's own source does not contain the
        // search text.
        let needle = format!("Instant{}now()", "::");

        let count: usize = src
            .lines()
            // Everything from the first `//` on a line is ignored.
            .map(|line| line.split_once("//").map_or(line, |(code, _)| code))
            .map(|code| {
                let bytes = code.as_bytes();
                // `match_indices` yields non-overlapping matches, left to right.
                code.match_indices(needle.as_str())
                    .filter(|&(start, _)| {
                        start == 0 || !continues_path_or_ident(bytes[start - 1])
                    })
                    .count()
            })
            .sum();

        assert_eq!(
            count, EXPECTED_BARE_INSTANT_NOW,
            "Unexpected number of bare wall-clock time-read call sites in ring.rs. \
             Production code must read time via `self.time_source.now()` so the \
             connection-maintenance loop stays deterministic under start_paused \
             (#4277). If you intentionally added or removed a TEST-only call site, \
             update EXPECTED_BARE_INSTANT_NOW; if this fired on PRODUCTION code, \
             migrate it to `self.time_source.now()` instead of bumping the count."
        );
    }
}
/// Crate-internal error type with one variant per failure condition listed
/// below; the `Display` text of each variant comes from its `#[error]`
/// attribute.
#[derive(thiserror::Error, Debug)]
pub(crate) enum RingError {
/// Wraps a boxed `node::ConnectionError`. `transparent` forwards the
/// message and source of the inner error, and `#[from]` provides the
/// `From<Box<node::ConnectionError>>` conversion used by `?`.
#[error(transparent)]
ConnError(#[from] Box<node::ConnectionError>),
/// No ring connections were found.
// NOTE(review): the `dead_code` allowance suggests this variant is not
// constructed in at least some build configurations — confirm before
// removing either the variant or the attribute.
#[error("No ring connections found")]
#[allow(dead_code)]
EmptyRing,
/// No hosting peer is available (or none is left to try) for the contract
/// identified by the carried `ContractInstanceId`.
#[error("Ran out of, or haven't found any, hosting peers for contract {0}")]
NoHostingPeers(ContractInstanceId),
/// The peer has no ring location yet because it has not joined the
/// network.
#[error("Peer has not joined the network yet (no ring location established)")]
PeerNotJoined,
}