mod cache;
use crate::util::backoff::{ExponentialBackoff, TrackedBackoff};
use crate::util::time_source::{InstantTimeSrc, TimeSource};
pub use cache::{AccessType, RecordAccessResult};
use cache::{DEFAULT_HOSTING_BUDGET_BYTES, DEFAULT_MIN_TTL, HostingCache};
use dashmap::{DashMap, DashSet};
use freenet_stdlib::prelude::{ContractInstanceId, ContractKey};
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use tokio::time::Instant;
use tracing::{debug, info};
use super::interest::PeerKey;
/// How often the renewal sweep attempts to refresh upstream subscriptions.
pub const SUBSCRIPTION_RENEWAL_INTERVAL: Duration = Duration::from_secs(120);
/// Lease length expressed as a multiple of the renewal interval, so several
/// renewal attempts can fail before a lease actually expires.
pub const LEASE_RENEWAL_MULTIPLIER: u32 = 4;
/// Total lifetime of a subscription lease (renewal interval x multiplier).
pub const SUBSCRIPTION_LEASE_DURATION: Duration =
    Duration::from_secs(SUBSCRIPTION_RENEWAL_INTERVAL.as_secs() * LEASE_RENEWAL_MULTIPLIER as u64);
/// First retry delay after a failed subscription request.
const INITIAL_SUBSCRIPTION_BACKOFF: Duration = Duration::from_secs(15);
/// Cap on the exponential backoff; kept well below the lease duration so a
/// retry can happen before the lease runs out (see test_backoff_shorter_than_lease).
const MAX_SUBSCRIPTION_BACKOFF: Duration =
    Duration::from_secs(SUBSCRIPTION_LEASE_DURATION.as_secs() / 4);
/// Upper bound on tracked backoff entries, keeping memory usage bounded.
const MAX_SUBSCRIPTION_BACKOFF_ENTRIES: usize = 4096;
/// Per-contract cap on downstream peer subscribers.
const MAX_DOWNSTREAM_SUBSCRIBERS_PER_CONTRACT: usize = 512;
/// Outcome of [`HostingManager::add_client_subscription`].
#[derive(Debug)]
pub struct AddClientSubscriptionResult {
    /// True when this client is the first local subscriber for the contract.
    pub is_first_client: bool,
}
/// Outcome of [`HostingManager::remove_client_from_all_subscriptions`].
#[derive(Debug)]
pub struct ClientDisconnectResult {
    /// Contracts that had an active upstream subscription and lost this client.
    pub affected_contracts: Vec<ContractKey>,
}
/// Outcome of [`HostingManager::subscribe`].
#[derive(Debug)]
#[allow(dead_code)]
pub struct SubscribeResult {
    /// True when a new subscription was created (false = existing lease renewed).
    pub is_new: bool,
    /// When the (new or renewed) lease expires.
    pub expires_at: Instant,
}
/// Tracks everything this node needs to decide which contracts it hosts,
/// which upstream subscriptions it maintains, and which peers/clients it
/// serves updates to. All containers are concurrent so the manager can be
/// shared behind `&self` across tasks.
pub(crate) struct HostingManager {
    // Upstream subscriptions keyed by contract, valued by lease expiry.
    active_subscriptions: DashMap<ContractKey, Instant>,
    // Local client subscriptions per contract instance.
    client_subscriptions: DashMap<ContractInstanceId, HashSet<crate::client_events::ClientId>>,
    // Byte-budgeted cache (budget + min TTL) of contracts this node hosts.
    hosting_cache: RwLock<HostingCache<InstantTimeSrc>>,
    // Remote peers subscribed through us, with their last renewal time.
    downstream_subscribers: DashMap<ContractKey, HashMap<PeerKey, Instant>>,
    time_source: InstantTimeSrc,
    // Contracts with an in-flight subscription request (dedup guard).
    pending_subscription_requests: DashSet<ContractKey>,
    // Per-contract exponential backoff for failed subscription requests.
    subscription_backoff: RwLock<TrackedBackoff<ContractKey>>,
    // Persistent metadata store; exactly one of these cfg'd fields is
    // compiled in depending on the enabled storage backend feature.
    #[cfg(feature = "redb")]
    storage: RwLock<Option<crate::contract::storages::Storage>>,
    #[cfg(all(feature = "sqlite", not(feature = "redb")))]
    storage: RwLock<Option<crate::contract::storages::Storage>>,
}
impl HostingManager {
/// Creates an empty manager with default hosting budget, TTL and
/// subscription-backoff configuration. Storage is attached later via
/// [`Self::set_storage`].
pub fn new() -> Self {
    Self {
        active_subscriptions: DashMap::new(),
        client_subscriptions: DashMap::new(),
        downstream_subscribers: DashMap::new(),
        pending_subscription_requests: DashSet::new(),
        hosting_cache: RwLock::new(HostingCache::new(
            DEFAULT_HOSTING_BUDGET_BYTES,
            DEFAULT_MIN_TTL,
            InstantTimeSrc::new(),
        )),
        subscription_backoff: RwLock::new(TrackedBackoff::new(
            ExponentialBackoff::new(INITIAL_SUBSCRIPTION_BACKOFF, MAX_SUBSCRIPTION_BACKOFF),
            MAX_SUBSCRIPTION_BACKOFF_ENTRIES,
        )),
        time_source: InstantTimeSrc::new(),
        storage: RwLock::new(None),
    }
}
/// Attaches the persistent metadata store used to mirror hosting-cache
/// changes to disk; called once the storage backend is initialized.
pub fn set_storage(&self, storage: crate::contract::storages::Storage) {
    *self.storage.write() = Some(storage);
}
/// Creates a new upstream subscription for `contract`, or renews the lease
/// of an existing one. The lease always lasts [`SUBSCRIPTION_LEASE_DURATION`]
/// from now.
pub fn subscribe(&self, contract: ContractKey) -> SubscribeResult {
    let expires_at = self.time_source.now() + SUBSCRIPTION_LEASE_DURATION;
    let previous = self.active_subscriptions.insert(contract, expires_at);
    let is_new = previous.is_none();
    let action = if is_new { "created" } else { "renewed" };
    debug!(
        %contract,
        is_new,
        expires_in_secs = SUBSCRIPTION_LEASE_DURATION.as_secs(),
        "subscribe: {} subscription",
        action
    );
    SubscribeResult { is_new, expires_at }
}
/// Extends the lease of an existing subscription. Returns `false` (and does
/// nothing) when there is no active subscription to renew.
#[allow(dead_code)]
pub fn renew_subscription(&self, contract: &ContractKey) -> bool {
    match self.active_subscriptions.get_mut(contract) {
        Some(mut lease) => {
            *lease = self.time_source.now() + SUBSCRIPTION_LEASE_DURATION;
            debug!(%contract, "renew_subscription: lease extended");
            true
        }
        None => {
            debug!(%contract, "renew_subscription: no active subscription to renew");
            false
        }
    }
}
/// Drops the upstream subscription for `contract`, if any, and records the
/// removal in the network-status metrics.
pub fn unsubscribe(&self, contract: &ContractKey) {
    let removed = self.active_subscriptions.remove(contract).is_some();
    if removed {
        crate::node::network_status::record_subscription_removed(&format!("{contract}"));
        debug!(%contract, "unsubscribe: removed active subscription");
    }
}
/// True when `contract` has a subscription whose lease has not yet expired.
pub fn is_subscribed(&self, contract: &ContractKey) -> bool {
    let now = self.time_source.now();
    self.active_subscriptions
        .get(contract)
        .is_some_and(|lease| *lease > now)
}
/// Returns all contracts with a live (non-expired) subscription, sorted by
/// instance-id bytes for deterministic ordering.
pub fn get_subscribed_contracts(&self) -> Vec<ContractKey> {
    let now = self.time_source.now();
    let mut live = Vec::new();
    for entry in self.active_subscriptions.iter() {
        if *entry.value() > now {
            live.push(*entry.key());
        }
    }
    live.sort_by(|a, b| a.id().as_bytes().cmp(b.id().as_bytes()));
    live
}
/// Forces a re-subscription by dropping the current lease; the next renewal
/// sweep will then re-route the subscription.
pub fn force_subscription_renewal(&self, contract: &ContractKey) {
    let removed = self.active_subscriptions.remove(contract);
    if removed.is_some() {
        tracing::info!(
            %contract,
            "force_subscription_renewal: expired subscription to trigger re-route"
        );
    }
}
/// Removes every subscription whose lease has expired, recording each
/// removal in the network-status metrics. Returns the expired contracts.
pub fn expire_stale_subscriptions(&self) -> Vec<ContractKey> {
    let now = self.time_source.now();
    let mut expired = Vec::new();
    self.active_subscriptions.retain(|contract, lease| {
        let keep = *lease > now;
        if !keep {
            expired.push(*contract);
        }
        keep
    });
    if !expired.is_empty() {
        for contract in &expired {
            crate::node::network_status::record_subscription_removed(&format!("{contract}"));
        }
        info!(
            expired_count = expired.len(),
            "expire_stale_subscriptions: expired stale subscriptions"
        );
    }
    expired
}
/// Number of subscriptions whose lease is still live.
#[allow(dead_code)]
pub fn active_subscription_count(&self) -> usize {
    let now = self.time_source.now();
    let mut count = 0;
    for entry in self.active_subscriptions.iter() {
        if *entry.value() > now {
            count += 1;
        }
    }
    count
}
/// Registers a local client's subscription to a contract instance.
/// Reports whether this client is the first local subscriber, which callers
/// use to decide whether an upstream subscription is needed.
pub fn add_client_subscription(
    &self,
    instance_id: &ContractInstanceId,
    client_id: crate::client_events::ClientId,
) -> AddClientSubscriptionResult {
    let mut clients = self.client_subscriptions.entry(*instance_id).or_default();
    // Capture emptiness before inserting; the entry guard keeps this atomic.
    let is_first_client = clients.is_empty();
    clients.insert(client_id);
    debug!(
        contract = %instance_id,
        %client_id,
        is_first_client,
        "add_client_subscription: registered"
    );
    AddClientSubscriptionResult { is_first_client }
}
/// Unregisters a local client's subscription to a contract instance.
/// Returns `true` when that was the last local subscriber (the entry was
/// removed), signalling the caller may drop the upstream subscription.
pub fn remove_client_subscription(
    &self,
    instance_id: &ContractInstanceId,
    client_id: crate::client_events::ClientId,
) -> bool {
    let mut no_more_subscriptions = false;
    if let Some(mut clients) = self.client_subscriptions.get_mut(instance_id) {
        clients.remove(&client_id);
        no_more_subscriptions = clients.is_empty();
    }
    if no_more_subscriptions {
        // BUGFIX: the previous code dropped the entry guard above and then
        // removed the map entry unconditionally, so a client subscribing in
        // the gap could have its registration silently deleted. remove_if
        // re-checks emptiness atomically; report "last client gone" only if
        // the entry was still empty and actually removed.
        no_more_subscriptions = self
            .client_subscriptions
            .remove_if(instance_id, |_, clients| clients.is_empty())
            .is_some();
    }
    debug!(
        contract = %instance_id,
        %client_id,
        no_more_subscriptions,
        "remove_client_subscription: removed"
    );
    no_more_subscriptions
}
/// True when at least one local client is subscribed to this instance.
pub fn has_client_subscriptions(&self, instance_id: &ContractInstanceId) -> bool {
    match self.client_subscriptions.get(instance_id) {
        Some(clients) => !clients.is_empty(),
        None => false,
    }
}
/// Removes a disconnecting client from every contract it was subscribed to.
/// Returns the contracts that still have an active upstream subscription,
/// so the caller can re-evaluate whether those should be kept.
pub fn remove_client_from_all_subscriptions(
    &self,
    client_id: crate::client_events::ClientId,
) -> ClientDisconnectResult {
    // Snapshot the instance ids this client is subscribed to first, so we
    // never mutate the map while iterating it.
    let mut subscribed: Vec<ContractInstanceId> = Vec::new();
    for entry in self.client_subscriptions.iter() {
        if entry.value().contains(&client_id) {
            subscribed.push(*entry.key());
        }
    }
    subscribed.sort_by(|a, b| a.as_bytes().cmp(b.as_bytes()));
    let mut affected_contracts = Vec::new();
    for instance_id in subscribed {
        self.remove_client_subscription(&instance_id, client_id);
        // Client subscriptions carry only instance ids; recover the full
        // contract key from the active-subscription table.
        let active = self
            .active_subscriptions
            .iter()
            .find(|entry| *entry.key().id() == instance_id)
            .map(|entry| *entry.key());
        if let Some(contract) = active {
            affected_contracts.push(contract);
        }
    }
    debug!(
        %client_id,
        affected_count = affected_contracts.len(),
        "remove_client_from_all_subscriptions: removed"
    );
    ClientDisconnectResult { affected_contracts }
}
/// Registers (or renews) a downstream peer's subscription. Rejects NEW peers
/// once the per-contract limit is reached; existing peers always renew.
pub fn add_downstream_subscriber(&self, contract: &ContractKey, peer: PeerKey) -> bool {
    let mut peers = self.downstream_subscribers.entry(*contract).or_default();
    let is_new_peer = !peers.contains_key(&peer);
    if is_new_peer && peers.len() >= MAX_DOWNSTREAM_SUBSCRIBERS_PER_CONTRACT {
        tracing::warn!(
            contract = %contract,
            limit = MAX_DOWNSTREAM_SUBSCRIBERS_PER_CONTRACT,
            "Downstream subscriber limit reached, rejecting peer"
        );
        return false;
    }
    peers.insert(peer, self.time_source.now());
    true
}
/// Refreshes a downstream peer's lease timestamp. Returns `false` when the
/// peer is not currently a subscriber of `contract`.
#[allow(dead_code)]
pub fn renew_downstream_subscriber(&self, contract: &ContractKey, peer: &PeerKey) -> bool {
    if let Some(mut peers) = self.downstream_subscribers.get_mut(contract) {
        // Single lookup via get_mut instead of the previous
        // contains_key + clone + insert, which did two lookups and an
        // unnecessary PeerKey clone just to overwrite the timestamp.
        if let Some(last_renewed) = peers.get_mut(peer) {
            *last_renewed = self.time_source.now();
            return true;
        }
    }
    false
}
/// Removes a downstream peer's subscription, dropping the per-contract map
/// once its last subscriber is gone. Returns whether the peer was present.
pub fn remove_downstream_subscriber(&self, contract: &ContractKey, peer: &PeerKey) -> bool {
    let removed = match self.downstream_subscribers.get_mut(contract) {
        Some(mut peers) => peers.remove(peer).is_some(),
        None => false,
    };
    if removed {
        // remove_if re-checks emptiness atomically, so a concurrent add
        // between the guard drop above and here is not lost.
        self.downstream_subscribers
            .remove_if(contract, |_, peers| peers.is_empty());
    }
    removed
}
/// True when at least one downstream peer is subscribed through us.
pub fn has_downstream_subscribers(&self, contract: &ContractKey) -> bool {
    match self.downstream_subscribers.get(contract) {
        Some(peers) => !peers.is_empty(),
        None => false,
    }
}
/// Drops downstream peers whose last renewal is older than
/// [`SUBSCRIPTION_LEASE_DURATION`]. Returns `(contract, expired_peer_count)`
/// for every contract that lost at least one peer.
pub fn expire_stale_downstream_subscribers(&self) -> Vec<(ContractKey, usize)> {
    let now = self.time_source.now();
    let mut expired_counts = Vec::new();
    // Snapshot keys first so we never hold a map iterator while taking
    // per-entry write guards below.
    let keys: Vec<ContractKey> = self
        .downstream_subscribers
        .iter()
        .map(|entry| *entry.key())
        .collect();
    for key in keys {
        if let Some(mut peers) = self.downstream_subscribers.get_mut(&key) {
            let before = peers.len();
            peers.retain(|_, last_renewed| {
                now.duration_since(*last_renewed) < SUBSCRIPTION_LEASE_DURATION
            });
            let expired = before - peers.len();
            if expired > 0 {
                expired_counts.push((key, expired));
            }
            if peers.is_empty() {
                // Release the entry guard before removing the entry itself;
                // calling remove_if while still holding the guard on the
                // same key would deadlock on the shard lock.
                drop(peers);
                self.downstream_subscribers
                    .remove_if(&key, |_, peers| peers.is_empty());
            }
        }
    }
    expired_counts
}
/// True when nobody depends on this contract anymore: no local clients and
/// no downstream peers.
pub fn should_unsubscribe_upstream(&self, contract: &ContractKey) -> bool {
    let has_clients = self.has_client_subscriptions(contract.id());
    let has_downstream = self.has_downstream_subscribers(contract);
    !has_clients && !has_downstream
}
/// Records an access to `key` in the hosting cache (possibly evicting other
/// entries to stay within the byte budget) and mirrors the change to the
/// persistent metadata store when one is attached.
pub fn record_contract_access(
    &self,
    key: ContractKey,
    size_bytes: u64,
    access_type: AccessType,
) -> RecordAccessResult {
    let result = self
        .hosting_cache
        .write()
        .record_access(key, size_bytes, access_type);
    if let Some(storage) = self.storage.read().as_ref() {
        #[cfg(feature = "redb")]
        {
            use crate::contract::storages::HostingMetadata;
            // Wall-clock millis are persisted so entry age survives restarts.
            let now_ms = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            // Compact on-disk encoding of the access type (see the matching
            // decode in load_from_storage).
            let access_type_u8 = match access_type {
                AccessType::Get => 0,
                AccessType::Put => 1,
                AccessType::Subscribe => 2,
            };
            let code_hash: [u8; 32] = **key.code_hash();
            let metadata = HostingMetadata::new(now_ms, access_type_u8, size_bytes, code_hash);
            // Persistence failures are logged, not propagated: the in-memory
            // cache update above already succeeded.
            if let Err(e) = storage.store_hosting_metadata(&key, metadata) {
                tracing::warn!(
                    contract = %key,
                    error = %e,
                    "Failed to persist hosting metadata for accessed contract"
                );
            }
        }
        #[cfg(all(feature = "sqlite", not(feature = "redb")))]
        {
            tracing::trace!(
                contract = %key,
                "Sqlite hosting metadata update deferred to state store"
            );
        }
        // Drop persisted metadata for anything the cache just evicted.
        for evicted_key in &result.evicted {
            #[cfg(feature = "redb")]
            {
                if let Err(e) = storage.remove_hosting_metadata(evicted_key) {
                    tracing::warn!(
                        contract = %evicted_key,
                        error = %e,
                        "Failed to remove persisted hosting metadata for evicted contract"
                    );
                }
            }
            #[cfg(all(feature = "sqlite", not(feature = "redb")))]
            {
                tracing::debug!(
                    contract = %evicted_key,
                    "Evicted contract - sqlite metadata cleanup deferred"
                );
            }
        }
    }
    result
}
/// True when `key` is currently in the hosting cache.
pub fn is_hosting_contract(&self, key: &ContractKey) -> bool {
    let cache = self.hosting_cache.read();
    cache.contains(key)
}
/// All contract keys currently in the hosting cache (unordered).
pub fn hosting_contract_keys(&self) -> Vec<ContractKey> {
    let cache = self.hosting_cache.read();
    cache.iter().collect()
}
/// Recorded size in bytes of a hosted contract, or 0 when not hosted.
pub fn hosting_contract_size(&self, key: &ContractKey) -> u64 {
    match self.hosting_cache.read().get(key) {
        Some(entry) => entry.size_bytes,
        None => 0,
    }
}
/// Number of contracts currently in the hosting cache.
pub fn hosting_contracts_count(&self) -> usize {
    let cache = self.hosting_cache.read();
    cache.len()
}
/// Test helper: a contract is "hosted" when any of the three interest
/// sources applies — active subscription, local clients, or hosting cache.
#[cfg(test)]
pub fn should_host(&self, contract: &ContractKey) -> bool {
    if self.is_subscribed(contract) {
        return true;
    }
    if self.has_client_subscriptions(contract.id()) {
        return true;
    }
    self.is_hosting_contract(contract)
}
/// True when this node should be receiving updates for the contract:
/// it has an active subscription or local client interest. Note that mere
/// presence in the hosting cache does NOT count.
pub fn is_receiving_updates(&self, contract: &ContractKey) -> bool {
    if self.is_subscribed(contract) {
        return true;
    }
    self.has_client_subscriptions(contract.id())
}
/// Refreshes the hosting-cache entry for `key` (recency bump) without
/// recording a full access.
pub fn touch_hosting(&self, key: &ContractKey) {
    self.hosting_cache.write().touch(key);
}
/// Evicts expired entries from the hosting cache and removes their
/// persisted metadata. The closure passed to `sweep_expired` presumably
/// exempts contracts that still have local clients or downstream peers —
/// NOTE(review): confirm the predicate's keep-vs-evict meaning against
/// `HostingCache::sweep_expired`.
pub fn sweep_expired_hosting(&self) -> Vec<ContractKey> {
    let expired = self.hosting_cache.write().sweep_expired(|key| {
        self.has_client_subscriptions(key.id()) || self.has_downstream_subscribers(key)
    });
    if !expired.is_empty() {
        // Mirror the eviction to persistent storage, best-effort.
        if let Some(storage) = self.storage.read().as_ref() {
            for expired_key in &expired {
                #[cfg(feature = "redb")]
                {
                    if let Err(e) = storage.remove_hosting_metadata(expired_key) {
                        tracing::warn!(
                            contract = %expired_key,
                            error = %e,
                            "Failed to remove persisted hosting metadata for expired contract"
                        );
                    }
                }
                #[cfg(all(feature = "sqlite", not(feature = "redb")))]
                {
                    tracing::debug!(
                        contract = %expired_key,
                        "Expired contract - sqlite metadata cleanup deferred"
                    );
                }
            }
        }
    }
    expired
}
/// True when a subscription request may be issued now: no request is
/// already in flight and the contract is not in backoff.
pub fn can_request_subscription(&self, contract: &ContractKey) -> bool {
    if self.pending_subscription_requests.contains(contract) {
        return false;
    }
    let in_backoff = self.subscription_backoff.read().is_in_backoff(contract);
    !in_backoff
}
/// Marks a subscription request as in-flight. Returns `false` when one was
/// already pending for this contract.
pub fn mark_subscription_pending(&self, contract: ContractKey) -> bool {
    // BUGFIX: the previous contains() + insert() pair was not atomic — two
    // concurrent callers could both observe "not pending" and both get
    // `true`, issuing duplicate requests. DashSet::insert returns true only
    // for the caller that actually inserted the value, making the
    // check-and-set a single atomic operation.
    self.pending_subscription_requests.insert(contract)
}
/// Clears the in-flight marker for a subscription request and updates the
/// backoff state: success resets it, failure escalates it.
pub fn complete_subscription_request(&self, contract: &ContractKey, success: bool) {
    self.pending_subscription_requests.remove(contract);
    let mut backoff = self.subscription_backoff.write();
    if success {
        backoff.record_success(contract);
    } else {
        backoff.record_failure(*contract);
    }
}
/// Diagnostic view over all subscriptions: for each contract, whether any
/// local client is interested, whether the lease is still live, and the
/// lease expiry. Sorted by instance-id bytes for stable output.
pub fn get_subscription_states(&self) -> Vec<(ContractKey, bool, bool, Option<Instant>)> {
    let now = self.time_source.now();
    let mut states = Vec::new();
    for entry in self.active_subscriptions.iter() {
        let contract = *entry.key();
        let expires_at = *entry.value();
        states.push((
            contract,
            self.has_client_subscriptions(contract.id()),
            expires_at > now,
            Some(expires_at),
        ));
    }
    states.sort_by(|(a, ..), (b, ..)| a.id().as_bytes().cmp(b.id().as_bytes()));
    states
}
/// Returns contracts whose upstream subscription should be (re)requested:
/// active leases expiring within [`SUBSCRIPTION_RENEWAL_INTERVAL`], plus
/// contracts with local client subscribers but no live lease. Hosted-only
/// contracts (no clients, no lease) are deliberately excluded — see
/// test_contracts_needing_renewal_excludes_hosted_only.
pub fn contracts_needing_renewal(&self) -> Vec<ContractKey> {
    let now = self.time_source.now();
    let renewal_threshold = now + SUBSCRIPTION_RENEWAL_INTERVAL;
    let mut needs_renewal_set = HashSet::new();
    // Sorted snapshot keeps the scan deterministic across runs.
    let mut active_subs: Vec<_> = self
        .active_subscriptions
        .iter()
        .map(|entry| (*entry.key(), *entry.value()))
        .collect();
    active_subs.sort_by(|(a, _), (b, _)| a.id().as_bytes().cmp(b.id().as_bytes()));
    for (key, expires_at) in active_subs {
        // Expiring soon but not yet expired; fully-expired leases are the
        // job of expire_stale_subscriptions, not renewal.
        if expires_at <= renewal_threshold && expires_at > now {
            needs_renewal_set.insert(key);
        }
    }
    let mut client_instance_ids: Vec<_> =
        self.client_subscriptions.iter().map(|e| *e.key()).collect();
    client_instance_ids.sort_by(|a, b| a.as_bytes().cmp(b.as_bytes()));
    for instance_id in client_instance_ids {
        let has_active = self
            .active_subscriptions
            .iter()
            .any(|sub| sub.key().id() == &instance_id && *sub.value() > now);
        if !has_active {
            // Client subscriptions are keyed by instance id only; recover
            // the full key (with code hash) from the hosting cache.
            if let Some(contract) = self
                .hosting_cache
                .read()
                .iter()
                .find(|k| k.id() == &instance_id)
            {
                needs_renewal_set.insert(contract);
            }
        }
    }
    let mut result: Vec<ContractKey> = needs_renewal_set.into_iter().collect();
    result.sort_by(|a, b| a.id().as_bytes().cmp(b.id().as_bytes()));
    result
}
/// Builds a diagnostic topology snapshot for this peer: every hosted
/// contract plus every live non-hosted subscription, each flagged with
/// hosting status and local client interest.
#[allow(dead_code)]
pub fn generate_topology_snapshot(
    &self,
    peer_addr: std::net::SocketAddr,
    location: f64,
) -> super::topology_registry::TopologySnapshot {
    use super::topology_registry::{ContractSubscription, TopologySnapshot};
    let mut snapshot = TopologySnapshot::new(peer_addr, location);
    let now = self.time_source.now();
    let hosting_cache = self.hosting_cache.read();
    // Hosted contracts first, in deterministic (instance-id) order.
    let mut hosted_contracts: Vec<_> = hosting_cache.iter().collect();
    hosted_contracts.sort_by(|a, b| a.id().as_bytes().cmp(b.id().as_bytes()));
    for contract_key in hosted_contracts {
        let has_client_subscriptions =
            self.client_subscriptions.contains_key(contract_key.id());
        snapshot.set_contract(
            *contract_key.id(),
            ContractSubscription {
                contract_key,
                // Upstream/downstream routing info is not filled in here.
                upstream: None,
                downstream: vec![],
                is_hosting: true,
                has_client_subscriptions,
            },
        );
    }
    // Then live subscriptions for contracts we do not host.
    let mut active_subs: Vec<_> = self
        .active_subscriptions
        .iter()
        .map(|entry| (*entry.key(), *entry.value()))
        .collect();
    active_subs.sort_by(|(a, _), (b, _)| a.id().as_bytes().cmp(b.id().as_bytes()));
    for (contract_key, expires_at) in active_subs {
        if expires_at > now && !hosting_cache.contains(&contract_key) {
            let has_client_subscriptions =
                self.client_subscriptions.contains_key(contract_key.id());
            snapshot.set_contract(
                *contract_key.id(),
                ContractSubscription {
                    contract_key,
                    upstream: None,
                    downstream: vec![],
                    is_hosting: false,
                    has_client_subscriptions,
                },
            );
        }
    }
    // Simulation clock (ms) converted to nanoseconds.
    snapshot.timestamp_nanos =
        crate::config::GlobalSimulationTime::current_time_ms() * 1_000_000;
    snapshot
}
}
impl HostingManager {
#[cfg(feature = "redb")]
/// Rebuilds the in-memory hosting cache from persisted metadata at startup
/// (redb backend).
///
/// Two passes: (1) load every persisted `HostingMetadata` entry, restoring
/// each contract with its on-disk age; (2) migrate legacy contracts that
/// have state rows but no metadata, resolving their code hash through
/// `code_hash_lookup` and writing fresh metadata for them. Returns the total
/// number of contracts loaded (including migrated ones).
pub fn load_from_storage<F>(
    &self,
    storage: &crate::contract::storages::Storage,
    code_hash_lookup: F,
) -> Result<usize, redb::Error>
where
    F: Fn(&ContractInstanceId) -> Option<freenet_stdlib::prelude::CodeHash>,
{
    use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey};
    use std::collections::HashSet;
    let metadata_entries = storage.load_all_hosting_metadata()?;
    // Wall-clock millis, matching what record_contract_access persists.
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let mut cache = self.hosting_cache.write();
    let mut loaded = 0;
    let mut loaded_instance_ids: HashSet<[u8; 32]> = HashSet::new();
    for (key_bytes, metadata) in metadata_entries {
        // Keys are expected to be 32-byte instance ids; anything else is
        // silently skipped.
        if key_bytes.len() == 32 {
            let mut instance_id_bytes = [0u8; 32];
            instance_id_bytes.copy_from_slice(&key_bytes);
            loaded_instance_ids.insert(instance_id_bytes);
            let instance_id = ContractInstanceId::new(instance_id_bytes);
            let code_hash = CodeHash::new(metadata.code_hash);
            let key = ContractKey::from_id_and_code(instance_id, code_hash);
            // Decode persisted access type; unknown values default to Get.
            let access_type = match metadata.access_type {
                1 => cache::AccessType::Put,
                2 => cache::AccessType::Subscribe,
                _ => cache::AccessType::Get,
            };
            // Restore the entry with its real age so TTL/eviction ordering
            // survives restarts.
            let age_ms = now_ms.saturating_sub(metadata.last_access_ms);
            let age = std::time::Duration::from_millis(age_ms);
            cache.load_persisted_entry(key, metadata.size_bytes, access_type, age);
            loaded += 1;
        }
    }
    // Second pass: state rows written by older versions that lack hosting
    // metadata are migrated in with zero age.
    let all_state_keys = storage.iter_all_state_keys().unwrap_or_default();
    let mut migrated = 0;
    let mut migration_failures = 0;
    for key_bytes in all_state_keys {
        if key_bytes.len() != 32 {
            continue;
        }
        let mut instance_id_bytes = [0u8; 32];
        instance_id_bytes.copy_from_slice(&key_bytes);
        if loaded_instance_ids.contains(&instance_id_bytes) {
            continue;
        }
        let instance_id = ContractInstanceId::new(instance_id_bytes);
        if let Some(code_hash) = code_hash_lookup(&instance_id) {
            let key = ContractKey::from_id_and_code(instance_id, code_hash);
            let size_bytes = storage.get_state_size(&key).unwrap_or(Some(0)).unwrap_or(0);
            cache.load_persisted_entry(
                key,
                size_bytes,
                cache::AccessType::Get,
                std::time::Duration::ZERO,
            );
            let code_hash_bytes: [u8; 32] = *code_hash;
            // Persist metadata so the next startup takes the fast path.
            let metadata = crate::contract::storages::HostingMetadata::new(
                now_ms,
                0, // access type encoding for Get
                size_bytes,
                code_hash_bytes,
            );
            if let Err(e) = storage.store_hosting_metadata(&key, metadata) {
                tracing::warn!(
                    contract = %key,
                    error = %e,
                    "Failed to persist hosting metadata for migrated legacy contract"
                );
            }
            migrated += 1;
        } else {
            migration_failures += 1;
            tracing::warn!(
                instance_id = %instance_id,
                "Legacy contract has state but no WASM code - cannot migrate"
            );
        }
    }
    cache.finalize_loading();
    let total_loaded = loaded + migrated;
    if migrated > 0 || migration_failures > 0 {
        tracing::info!(
            loaded_with_metadata = loaded,
            migrated_legacy = migrated,
            migration_failures = migration_failures,
            total_contracts = total_loaded,
            total_bytes = cache.current_bytes(),
            "Loaded hosting cache from storage (with legacy migration)"
        );
    } else {
        tracing::info!(
            loaded_contracts = total_loaded,
            total_bytes = cache.current_bytes(),
            "Loaded hosting cache from storage"
        );
    }
    Ok(total_loaded)
}
#[cfg(all(feature = "sqlite", not(feature = "redb")))]
/// Rebuilds the in-memory hosting cache from persisted metadata at startup
/// (sqlite backend). Async twin of the redb variant above: same two-pass
/// load-then-migrate logic, but with awaited storage calls.
pub async fn load_from_storage<F>(
    &self,
    storage: &crate::contract::storages::Storage,
    code_hash_lookup: F,
) -> Result<usize, crate::contract::storages::sqlite::SqlDbError>
where
    F: Fn(&ContractInstanceId) -> Option<freenet_stdlib::prelude::CodeHash>,
{
    use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey};
    use std::collections::HashSet;
    let metadata_entries = storage.load_all_hosting_metadata().await?;
    // Wall-clock millis, matching what persisted metadata records.
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let mut cache = self.hosting_cache.write();
    let mut loaded = 0;
    let mut loaded_instance_ids: HashSet<[u8; 32]> = HashSet::new();
    for (key_bytes, metadata) in metadata_entries {
        // Keys are expected to be 32-byte instance ids; skip anything else.
        if key_bytes.len() == 32 {
            let mut instance_id_bytes = [0u8; 32];
            instance_id_bytes.copy_from_slice(&key_bytes);
            loaded_instance_ids.insert(instance_id_bytes);
            let instance_id = ContractInstanceId::new(instance_id_bytes);
            let code_hash = CodeHash::new(metadata.code_hash);
            let key = ContractKey::from_id_and_code(instance_id, code_hash);
            // Decode persisted access type; unknown values default to Get.
            let access_type = match metadata.access_type {
                1 => cache::AccessType::Put,
                2 => cache::AccessType::Subscribe,
                _ => cache::AccessType::Get,
            };
            // Restore the entry with its real age so TTL/eviction ordering
            // survives restarts.
            let age_ms = now_ms.saturating_sub(metadata.last_access_ms);
            let age = std::time::Duration::from_millis(age_ms);
            cache.load_persisted_entry(key, metadata.size_bytes, access_type, age);
            loaded += 1;
        }
    }
    // Second pass: migrate legacy contracts with state but no metadata.
    let all_state_keys = storage.iter_all_state_keys().await.unwrap_or_default();
    let mut migrated = 0;
    let mut migration_failures = 0;
    for key_bytes in all_state_keys {
        if key_bytes.len() != 32 {
            continue;
        }
        let mut instance_id_bytes = [0u8; 32];
        instance_id_bytes.copy_from_slice(&key_bytes);
        if loaded_instance_ids.contains(&instance_id_bytes) {
            continue;
        }
        let instance_id = ContractInstanceId::new(instance_id_bytes);
        if let Some(code_hash) = code_hash_lookup(&instance_id) {
            let key = ContractKey::from_id_and_code(instance_id, code_hash);
            let size_bytes = storage
                .get_state_size(&key)
                .await
                .unwrap_or(Some(0))
                .unwrap_or(0);
            cache.load_persisted_entry(
                key,
                size_bytes,
                cache::AccessType::Get,
                std::time::Duration::ZERO,
            );
            let code_hash_bytes: [u8; 32] = *code_hash;
            // Persist metadata so the next startup takes the fast path.
            let metadata = crate::contract::storages::sqlite::HostingMetadata::new(
                now_ms,
                0, // access type encoding for Get
                size_bytes,
                code_hash_bytes,
            );
            if let Err(e) = storage.store_hosting_metadata(&key, metadata).await {
                tracing::warn!(
                    contract = %key,
                    error = %e,
                    "Failed to persist hosting metadata for migrated legacy contract"
                );
            }
            migrated += 1;
        } else {
            migration_failures += 1;
            tracing::warn!(
                instance_id = %instance_id,
                "Legacy contract has state but no WASM code - cannot migrate"
            );
        }
    }
    cache.finalize_loading();
    let total_loaded = loaded + migrated;
    if migrated > 0 || migration_failures > 0 {
        tracing::info!(
            loaded_with_metadata = loaded,
            migrated_legacy = migrated,
            migration_failures = migration_failures,
            total_contracts = total_loaded,
            total_bytes = cache.current_bytes(),
            "Loaded hosting cache from storage (with legacy migration)"
        );
    } else {
        tracing::info!(
            loaded_contracts = total_loaded,
            total_bytes = cache.current_bytes(),
            "Loaded hosting cache from storage"
        );
    }
    Ok(total_loaded)
}
}
impl Default for HostingManager {
    /// Equivalent to [`HostingManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use freenet_stdlib::prelude::CodeHash;
/// Builds a deterministic test contract key from a one-byte seed (code hash
/// derived from seed+1 so id and code differ).
fn make_contract_key(seed: u8) -> ContractKey {
    ContractKey::from_id_and_code(
        ContractInstanceId::new([seed; 32]),
        CodeHash::new([seed.wrapping_add(1); 32]),
    )
}
#[tokio::test]
async fn test_subscribe_creates_new_subscription() {
    // A first subscribe must report `is_new` and leave the lease active.
    let manager = HostingManager::new();
    let key = make_contract_key(1);
    let outcome = manager.subscribe(key);
    assert!(outcome.is_new);
    assert!(manager.is_subscribed(&key));
}
#[tokio::test]
async fn test_subscribe_renews_existing() {
    // A second subscribe for the same contract renews (is_new == false)
    // and never shortens the lease.
    let manager = HostingManager::new();
    let contract = make_contract_key(1);
    let first = manager.subscribe(contract);
    let second = manager.subscribe(contract);
    assert!(first.is_new);
    assert!(!second.is_new);
    assert!(second.expires_at >= first.expires_at);
}
#[tokio::test]
async fn test_unsubscribe_removes_subscription() {
    // Unsubscribing drops the active lease entirely.
    let manager = HostingManager::new();
    let key = make_contract_key(1);
    manager.subscribe(key);
    assert!(manager.is_subscribed(&key));
    manager.unsubscribe(&key);
    assert!(!manager.is_subscribed(&key));
}
#[tokio::test]
async fn test_renew_subscription() {
    // Renewal fails without an existing subscription, succeeds after one.
    let manager = HostingManager::new();
    let contract = make_contract_key(1);
    assert!(!manager.renew_subscription(&contract));
    manager.subscribe(contract);
    assert!(manager.renew_subscription(&contract));
}
#[tokio::test]
async fn test_get_subscribed_contracts() {
    // Listing reflects subscribe/unsubscribe: only live leases are returned.
    let manager = HostingManager::new();
    let c1 = make_contract_key(1);
    let c2 = make_contract_key(2);
    let c3 = make_contract_key(3);
    manager.subscribe(c1);
    manager.subscribe(c2);
    manager.subscribe(c3);
    manager.unsubscribe(&c2);
    let subscribed = manager.get_subscribed_contracts();
    assert_eq!(subscribed.len(), 2);
    assert!(subscribed.contains(&c1));
    assert!(!subscribed.contains(&c2));
    assert!(subscribed.contains(&c3));
}
#[tokio::test]
async fn test_active_subscription_count() {
    // Count tracks subscribes and unsubscribes exactly.
    let manager = HostingManager::new();
    assert_eq!(manager.active_subscription_count(), 0);
    manager.subscribe(make_contract_key(1));
    manager.subscribe(make_contract_key(2));
    assert_eq!(manager.active_subscription_count(), 2);
    manager.unsubscribe(&make_contract_key(1));
    assert_eq!(manager.active_subscription_count(), 1);
}
#[test]
fn test_client_subscription_basic() {
    // Single client: first add reports is_first_client, removal reports last.
    let manager = HostingManager::new();
    let instance_id = ContractInstanceId::new([1; 32]);
    let client_id = crate::client_events::ClientId::next();
    let result = manager.add_client_subscription(&instance_id, client_id);
    assert!(result.is_first_client);
    assert!(manager.has_client_subscriptions(&instance_id));
    let is_last = manager.remove_client_subscription(&instance_id, client_id);
    assert!(is_last);
    assert!(!manager.has_client_subscriptions(&instance_id));
}
#[test]
fn test_client_subscription_multiple_clients() {
    // Two clients: only the first add and the last removal report true.
    let manager = HostingManager::new();
    let instance_id = ContractInstanceId::new([1; 32]);
    let client1 = crate::client_events::ClientId::next();
    let client2 = crate::client_events::ClientId::next();
    let r1 = manager.add_client_subscription(&instance_id, client1);
    let r2 = manager.add_client_subscription(&instance_id, client2);
    assert!(r1.is_first_client);
    assert!(!r2.is_first_client);
    let is_last1 = manager.remove_client_subscription(&instance_id, client1);
    assert!(!is_last1);
    let is_last2 = manager.remove_client_subscription(&instance_id, client2);
    assert!(is_last2);
}
#[test]
fn test_hosting_cache_basic() {
    // Recording an access puts the contract into the hosting cache.
    let manager = HostingManager::new();
    let key = make_contract_key(1);
    assert!(!manager.is_hosting_contract(&key));
    assert_eq!(manager.hosting_contracts_count(), 0);
    manager.record_contract_access(key, 1000, AccessType::Put);
    assert!(manager.is_hosting_contract(&key));
    assert_eq!(manager.hosting_contracts_count(), 1);
}
#[test]
fn test_subscription_backoff() {
    // Pending marker and failure backoff both block further requests.
    let manager = HostingManager::new();
    let contract = make_contract_key(1);
    assert!(manager.can_request_subscription(&contract));
    assert!(manager.mark_subscription_pending(contract));
    assert!(!manager.can_request_subscription(&contract));
    manager.complete_subscription_request(&contract, false);
    // Still blocked: the failure put the contract into backoff.
    assert!(!manager.can_request_subscription(&contract));
}
#[test]
fn test_should_host() {
    // Recording a PUT access is enough to make the contract hosted.
    let manager = HostingManager::new();
    let key = make_contract_key(1);
    assert!(!manager.should_host(&key));
    manager.record_contract_access(key, 1000, AccessType::Put);
    assert!(manager.should_host(&key));
}
#[test]
fn test_hosted_contract_not_in_renewal_after_restart() {
    // Regression: hosting alone (no clients/subscription) must not trigger
    // subscription renewal.
    let manager = HostingManager::new();
    let contract = make_contract_key(42);
    manager.record_contract_access(contract, 1000, AccessType::Get);
    assert!(manager.is_hosting_contract(&contract));
    assert!(
        manager.contracts_needing_renewal().is_empty(),
        "Hosted-only contract must NOT be in renewal list"
    );
}
#[test]
fn test_is_receiving_updates_excludes_hosting_cache_only() {
    // Hosting-cache membership must not be mistaken for receiving updates;
    // only an actual subscription (or client interest) counts.
    let manager = HostingManager::new();
    let contract = make_contract_key(1);
    assert!(!manager.is_receiving_updates(&contract));
    manager.record_contract_access(contract, 1000, AccessType::Put);
    assert!(manager.should_host(&contract));
    assert!(
        !manager.is_receiving_updates(&contract),
        "Hosting cache alone should NOT count as receiving updates"
    );
    manager.subscribe(contract);
    assert!(manager.is_receiving_updates(&contract));
}
#[test]
fn test_is_receiving_updates_with_client_subscription() {
    // A local client subscription alone counts as receiving updates.
    let manager = HostingManager::new();
    let contract = make_contract_key(1);
    let client_id = crate::client_events::ClientId::next();
    assert!(!manager.is_receiving_updates(&contract));
    manager.add_client_subscription(contract.id(), client_id);
    assert!(manager.is_receiving_updates(&contract));
}
#[test]
fn test_contracts_needing_renewal_excludes_hosted_only() {
    // Hosted-only contracts are excluded from renewal; adding a client
    // subscription (with no active lease) pulls them in.
    let manager = HostingManager::new();
    let contract = make_contract_key(1);
    manager.record_contract_access(contract, 1000, AccessType::Get);
    let needs_renewal = manager.contracts_needing_renewal();
    assert!(
        !needs_renewal.contains(&contract),
        "Hosted-only contract should NOT be in renewal list"
    );
    let client_id = crate::client_events::ClientId::next();
    manager.add_client_subscription(contract.id(), client_id);
    let needs_renewal = manager.contracts_needing_renewal();
    assert!(
        needs_renewal.contains(&contract),
        "Contract with client subscription should need renewal"
    );
}
// Ignored relic of the removed startup-revalidation window (see #3546);
// kept to document the old behavior it guarded against.
#[ignore]
#[test]
fn test_hosted_contract_renewed_despite_no_interest() {
    let manager = HostingManager::new();
    let contract = make_contract_key(42);
    manager.record_contract_access(contract, 1000, AccessType::Get);
    assert!(manager.is_hosting_contract(&contract));
    let renewals = manager.contracts_needing_renewal();
    assert!(
        !renewals.contains(&contract),
        "Hosted contract should NOT be in renewal list (startup window removed in #3546)"
    );
}
// Ignored relic of the removed startup-revalidation window (see #3546).
#[ignore]
#[test]
fn test_startup_revalidation_includes_hosted_contracts() {
    let manager = HostingManager::new();
    let contract = make_contract_key(1);
    manager.record_contract_access(contract, 1000, AccessType::Get);
    let needs_renewal = manager.contracts_needing_renewal();
    assert!(
        !needs_renewal.contains(&contract),
        "Hosted contract should NOT be in renewal list (startup window removed in #3546)"
    );
}
// Ignored relic of the removed startup-revalidation window (see #3546).
#[ignore]
#[test]
fn test_startup_revalidation_skips_already_subscribed() {
    let manager = HostingManager::new();
    let contract = make_contract_key(1);
    manager.record_contract_access(contract, 1000, AccessType::Get);
    manager.subscribe(contract);
    let needs_renewal = manager.contracts_needing_renewal();
    assert!(
        !needs_renewal.contains(&contract),
        "Already-subscribed contract should not be in renewal list"
    );
}
// Ignored relic of the removed startup-revalidation window (see #3546).
#[ignore]
#[test]
fn test_startup_revalidation_window_expires() {
    let manager = HostingManager::new();
    let contract = make_contract_key(1);
    manager.record_contract_access(contract, 1000, AccessType::Get);
    let needs_renewal = manager.contracts_needing_renewal();
    assert!(
        !needs_renewal.contains(&contract),
        "Hosted-only contract should NOT be in renewal list"
    );
}
// Ignored relic of the removed startup-revalidation window (see #3546).
#[ignore]
#[test]
fn test_startup_revalidation_multiple_contracts() {
    let manager = HostingManager::new();
    let contract_a = make_contract_key(1);
    let contract_b = make_contract_key(2);
    let contract_c = make_contract_key(3);
    manager.record_contract_access(contract_a, 1000, AccessType::Get);
    manager.record_contract_access(contract_b, 1000, AccessType::Get);
    manager.record_contract_access(contract_c, 1000, AccessType::Get);
    manager.subscribe(contract_b);
    let client_id = crate::client_events::ClientId::next();
    manager.add_client_subscription(contract_c.id(), client_id);
    let needs_renewal = manager.contracts_needing_renewal();
    assert!(
        !needs_renewal.contains(&contract_a),
        "Hosted-only contract_a should NOT be included (startup window removed)"
    );
    assert!(
        !needs_renewal.contains(&contract_b),
        "Subscribed contract_b should be excluded (not expiring soon)"
    );
    assert!(
        needs_renewal.contains(&contract_c),
        "Client-subscribed contract_c should be included"
    );
}
#[test]
fn test_hosted_contracts_not_renewed_at_scale() {
    // 200 hosted-only contracts produce zero renewals; adding client
    // interest to exactly two of them yields exactly those two.
    let manager = HostingManager::new();
    for i in 0..200u8 {
        let contract = make_contract_key(i);
        manager.record_contract_access(contract, 1000, AccessType::Get);
    }
    assert_eq!(manager.hosting_contracts_count(), 200);
    let needs_renewal = manager.contracts_needing_renewal();
    assert!(
        needs_renewal.is_empty(),
        "200 hosted-only contracts should NOT be in renewal list, found {}",
        needs_renewal.len()
    );
    let client_id = crate::client_events::ClientId::next();
    let contract_a = make_contract_key(42);
    let contract_b = make_contract_key(99);
    manager.add_client_subscription(contract_a.id(), client_id);
    manager.add_client_subscription(contract_b.id(), client_id);
    let needs_renewal = manager.contracts_needing_renewal();
    assert_eq!(
        needs_renewal.len(),
        2,
        "Only 2 client-subscribed contracts should need renewal, found {}",
        needs_renewal.len()
    );
    assert!(needs_renewal.contains(&contract_a));
    assert!(needs_renewal.contains(&contract_b));
}
#[test]
fn test_backoff_shorter_than_lease() {
    // Invariant between the two module constants: if the maximum backoff were
    // longer than the lease, a retry would always arrive after expiry.
    let (max_backoff, lease) = (MAX_SUBSCRIPTION_BACKOFF, SUBSCRIPTION_LEASE_DURATION);
    assert!(
        max_backoff < lease,
        "MAX_SUBSCRIPTION_BACKOFF ({:?}) must be shorter than \
SUBSCRIPTION_LEASE_DURATION ({:?}), otherwise subscriptions \
expire before retry",
        max_backoff,
        lease
    );
}
#[test]
fn test_backoff_sequence_within_lease() {
    // Every retry delay for the first ten consecutive failures must still
    // fit inside a single lease period.
    let policy = ExponentialBackoff::new(INITIAL_SUBSCRIPTION_BACKOFF, MAX_SUBSCRIPTION_BACKOFF);
    for failures in 1..=10 {
        let delay = policy.delay_for_failures(failures);
        assert!(
            delay < SUBSCRIPTION_LEASE_DURATION,
            "Backoff delay after {} failures ({:?}) exceeds lease ({:?})",
            failures,
            delay,
            SUBSCRIPTION_LEASE_DURATION
        );
    }
}
/// Builds a deterministic test `PeerKey` whose 32 public-key bytes are all `seed`.
fn make_peer_key(seed: u8) -> PeerKey {
    let key_bytes = [seed; 32];
    PeerKey(crate::transport::TransportPublicKey::from_bytes(key_bytes))
}
#[test]
fn test_should_unsubscribe_upstream_unknown_contract() {
    // A contract the manager has never seen has no clients and no downstream,
    // so the upstream subscription (if any) is droppable.
    let mgr = HostingManager::new();
    let unknown_contract = make_contract_key(99);
    assert!(
        mgr.should_unsubscribe_upstream(&unknown_contract),
        "Unknown contract with no clients and no downstream should return true"
    );
    assert!(!mgr.has_downstream_subscribers(&unknown_contract));
    assert!(!mgr.has_client_subscriptions(unknown_contract.id()));
}
#[test]
fn test_should_unsubscribe_upstream() {
    let mgr = HostingManager::new();
    let contract = make_contract_key(1);
    let peer = make_peer_key(10);
    let client = crate::client_events::ClientId::next();
    // No interest at all -> droppable.
    assert!(mgr.should_unsubscribe_upstream(&contract));
    // A downstream subscriber pins the upstream subscription...
    mgr.add_downstream_subscriber(&contract, peer.clone());
    assert!(!mgr.should_unsubscribe_upstream(&contract));
    // ...until it is removed again.
    mgr.remove_downstream_subscriber(&contract, &peer);
    assert!(mgr.should_unsubscribe_upstream(&contract));
    // A local client subscription pins it as well.
    mgr.add_client_subscription(contract.id(), client);
    assert!(!mgr.should_unsubscribe_upstream(&contract));
}
#[test]
fn test_chain_propagation_single_downstream() {
    // When the only downstream subscriber leaves, the node should propagate
    // the unsubscribe further upstream.
    let mgr = HostingManager::new();
    let contract = make_contract_key(10);
    let downstream_c = make_peer_key(30);
    mgr.subscribe(contract);
    mgr.add_downstream_subscriber(&contract, downstream_c.clone());
    let removed = mgr.remove_downstream_subscriber(&contract, &downstream_c);
    assert!(removed);
    assert!(
        mgr.should_unsubscribe_upstream(&contract),
        "Node with no clients and no downstream should propagate unsubscribe upstream"
    );
}
#[test]
fn test_no_propagation_with_remaining_downstream() {
    let mgr = HostingManager::new();
    let contract = make_contract_key(10);
    let downstream_a = make_peer_key(10);
    let downstream_c = make_peer_key(30);
    mgr.subscribe(contract);
    mgr.add_downstream_subscriber(&contract, downstream_a);
    mgr.add_downstream_subscriber(&contract, downstream_c.clone());
    // Drop only one of the two downstream peers; the other keeps the lease alive.
    assert!(mgr.remove_downstream_subscriber(&contract, &downstream_c));
    assert!(
        !mgr.should_unsubscribe_upstream(&contract),
        "Node with remaining downstream should NOT propagate unsubscribe"
    );
}
#[test]
fn test_no_propagation_with_local_client() {
    // Even with zero downstream subscribers left, a local client subscription
    // keeps the upstream subscription alive.
    let mgr = HostingManager::new();
    let contract = make_contract_key(10);
    let downstream_peer = make_peer_key(10);
    let client = crate::client_events::ClientId::next();
    mgr.subscribe(contract);
    mgr.add_downstream_subscriber(&contract, downstream_peer.clone());
    mgr.add_client_subscription(contract.id(), client);
    assert!(mgr.remove_downstream_subscriber(&contract, &downstream_peer));
    assert!(
        !mgr.should_unsubscribe_upstream(&contract),
        "Node with local client should NOT propagate unsubscribe even if downstream is empty"
    );
}
#[test]
fn test_client_disconnect_triggers_unsubscribe_decision() {
    let mgr = HostingManager::new();
    let contract = make_contract_key(10);
    let client = crate::client_events::ClientId::next();
    mgr.subscribe(contract);
    mgr.add_client_subscription(contract.id(), client);
    assert!(!mgr.should_unsubscribe_upstream(&contract));
    // Disconnecting the only client must surface the contract as affected
    // and flip the upstream decision to "unsubscribe".
    let outcome = mgr.remove_client_from_all_subscriptions(client);
    assert_eq!(
        outcome.affected_contracts.len(),
        1,
        "Disconnect should report the affected contract"
    );
    assert_eq!(outcome.affected_contracts[0], contract);
    assert!(
        mgr.should_unsubscribe_upstream(&contract),
        "After client disconnect with no downstream, should propagate unsubscribe"
    );
}
#[test]
fn test_client_disconnect_partial_unsubscribe() {
    let mgr = HostingManager::new();
    let contract_a = make_contract_key(10);
    let contract_b = make_contract_key(20);
    let client = crate::client_events::ClientId::next();
    let downstream_peer = make_peer_key(50);
    for key in [contract_a, contract_b] {
        mgr.subscribe(key);
        mgr.add_client_subscription(key.id(), client);
    }
    // Only contract_b keeps an independent downstream subscriber.
    mgr.add_downstream_subscriber(&contract_b, downstream_peer);
    let outcome = mgr.remove_client_from_all_subscriptions(client);
    assert_eq!(outcome.affected_contracts.len(), 2);
    assert!(
        mgr.should_unsubscribe_upstream(&contract_a),
        "Contract with no remaining interest should trigger unsubscribe"
    );
    assert!(
        !mgr.should_unsubscribe_upstream(&contract_b),
        "Contract with downstream subscribers should NOT trigger unsubscribe"
    );
}
#[test]
fn test_expire_downstream_triggers_unsubscribe_decision() {
    let mgr = HostingManager::new();
    let contract = make_contract_key(10);
    let peer = make_peer_key(10);
    mgr.subscribe(contract);
    mgr.add_downstream_subscriber(&contract, peer.clone());
    assert!(!mgr.should_unsubscribe_upstream(&contract));
    // Backdate the peer's lease so it is already past expiry.
    let stale = Instant::now() - SUBSCRIPTION_LEASE_DURATION - Duration::from_secs(1);
    if let Some(mut entry) = mgr.downstream_subscribers.get_mut(&contract) {
        entry.insert(peer.clone(), stale);
    }
    let expired = mgr.expire_stale_downstream_subscribers();
    assert_eq!(
        expired.len(),
        1,
        "Should detect one contract with expired downstream"
    );
    assert_eq!(expired[0].0, contract);
    assert_eq!(expired[0].1, 1, "One peer should have expired");
    assert!(
        mgr.should_unsubscribe_upstream(&contract),
        "After all downstream subscribers expire, should propagate unsubscribe"
    );
}
#[test]
fn test_partial_downstream_expiry_no_unsubscribe() {
    let mgr = HostingManager::new();
    let contract = make_contract_key(10);
    let stale_peer = make_peer_key(10);
    let fresh_peer = make_peer_key(20);
    mgr.subscribe(contract);
    mgr.add_downstream_subscriber(&contract, stale_peer.clone());
    mgr.add_downstream_subscriber(&contract, fresh_peer.clone());
    // Backdate only the stale peer; the fresh peer keeps a valid lease.
    let backdated = Instant::now() - SUBSCRIPTION_LEASE_DURATION - Duration::from_secs(1);
    if let Some(mut entry) = mgr.downstream_subscribers.get_mut(&contract) {
        entry.insert(stale_peer, backdated);
    }
    let expired = mgr.expire_stale_downstream_subscribers();
    assert_eq!(expired.len(), 1, "One contract had expired peers");
    assert_eq!(expired[0].0, contract);
    assert_eq!(expired[0].1, 1, "One peer should have expired");
    assert!(
        !mgr.should_unsubscribe_upstream(&contract),
        "Contract with remaining downstream should NOT trigger unsubscribe"
    );
}
/// Constructs an `InterestManager` backed by the real monotonic time source.
fn make_interest_manager() -> crate::ring::interest::InterestManager<InstantTimeSrc> {
    let clock = InstantTimeSrc::new();
    crate::ring::interest::InterestManager::new(clock)
}
#[test]
fn test_unsubscribe_handler_contract_found_peer_resolved() {
    let mgr = HostingManager::new();
    let interest = make_interest_manager();
    let contract = make_contract_key(1);
    let peer = make_peer_key(10);
    // Register the peer in both tracking structures.
    mgr.add_downstream_subscriber(&contract, peer.clone());
    interest.register_peer_interest(&contract, peer.clone(), None, true);
    assert!(!mgr.should_unsubscribe_upstream(&contract));
    // Unsubscribe path: remove from both, after which upstream is droppable.
    mgr.remove_downstream_subscriber(&contract, &peer);
    interest.remove_peer_interest(&contract, &peer);
    assert!(!mgr.has_downstream_subscribers(&contract));
    assert!(mgr.should_unsubscribe_upstream(&contract));
}
#[test]
fn test_unsubscribe_handler_unknown_peer_is_noop() {
    let mgr = HostingManager::new();
    let contract = make_contract_key(2);
    let known_peer = make_peer_key(20);
    let unknown_peer = make_peer_key(99);
    mgr.add_downstream_subscriber(&contract, known_peer);
    // Removing a peer that never subscribed must change nothing.
    assert!(!mgr.remove_downstream_subscriber(&contract, &unknown_peer));
    assert!(mgr.has_downstream_subscribers(&contract));
    assert!(!mgr.should_unsubscribe_upstream(&contract));
}
#[test]
fn test_unsubscribe_handler_unknown_contract_is_noop() {
    let mgr = HostingManager::new();
    let known_contract = make_contract_key(3);
    let unknown_contract = make_contract_key(99);
    let peer = make_peer_key(30);
    mgr.add_downstream_subscriber(&known_contract, peer.clone());
    // Removing against a contract we never tracked must be a no-op.
    assert!(!mgr.remove_downstream_subscriber(&unknown_contract, &peer));
    assert!(mgr.has_downstream_subscribers(&known_contract));
    assert!(!mgr.has_downstream_subscribers(&unknown_contract));
}
#[test]
fn test_unsubscribe_dual_tracking_authority() {
    let mgr = HostingManager::new();
    let interest = make_interest_manager();
    let contract = make_contract_key(4);
    let peer = make_peer_key(40);
    mgr.add_downstream_subscriber(&contract, peer.clone());
    interest.register_peer_interest(&contract, peer.clone(), None, true);
    // HostingManager drives the upstream decision on its own...
    mgr.remove_downstream_subscriber(&contract, &peer);
    assert!(mgr.should_unsubscribe_upstream(&contract));
    // ...while InterestManager still tracks the peer until told otherwise.
    assert!(interest.remove_peer_interest(&contract, &peer));
}
#[test]
fn test_downstream_subscribers_protect_from_eviction() {
    use crate::ring::hosting::cache::HostingCache;
    use crate::util::time_source::SharedMockTimeSource;
    let clock = SharedMockTimeSource::new();
    let min_ttl = Duration::from_secs(60);
    // A 150-byte budget forces an eviction once 200 bytes are stored.
    let mut cache = HostingCache::new(150, min_ttl, clock.clone());
    let protected = make_contract_key(1);
    let unprotected = make_contract_key(2);
    cache.record_access(protected, 100, AccessType::Get);
    cache.record_access(unprotected, 100, AccessType::Get);
    assert_eq!(cache.current_bytes(), 200);
    clock.advance_time(Duration::from_secs(61));
    // The predicate marks `protected` as having downstream subscribers.
    let evicted = cache.sweep_expired(|key| *key == protected);
    assert!(
        !evicted.contains(&protected),
        "Contract with downstream subscribers must not be evicted"
    );
    assert!(
        evicted.contains(&unprotected),
        "Unprotected contract should be evicted when over budget + past TTL"
    );
    assert!(cache.contains(&protected));
}
#[test]
fn test_no_subscribers_allows_eviction() {
    use crate::ring::hosting::cache::HostingCache;
    use crate::util::time_source::SharedMockTimeSource;
    let clock = SharedMockTimeSource::new();
    let mut cache = HostingCache::new(80, Duration::from_secs(60), clock.clone());
    let contract = make_contract_key(100);
    cache.record_access(contract, 100, AccessType::Get);
    assert!(cache.contains(&contract));
    // Inside the minimum TTL nothing is swept, even while over budget.
    let evicted = cache.sweep_expired(|_| false);
    assert!(
        evicted.is_empty(),
        "Contract within TTL should not be evicted"
    );
    // Past the TTL, with no protecting subscribers, eviction proceeds.
    clock.advance_time(Duration::from_secs(61));
    let evicted = cache.sweep_expired(|_| false);
    assert!(
        evicted.contains(&contract),
        "Contract past TTL with no subscribers should be evicted"
    );
    assert!(!cache.contains(&contract));
}
/// Filling a contract up to `MAX_DOWNSTREAM_SUBSCRIBERS_PER_CONTRACT` succeeds,
/// and one more (previously unseen) peer is rejected.
///
/// Fix: the original accumulated every peer into a `Vec` (`peers.push(peer.clone())`)
/// that was never read afterwards — 512 needless clones; the dead vector is removed.
#[test]
fn test_downstream_subscriber_limit_enforced() {
    let manager = HostingManager::new();
    let contract = make_contract_key(50);
    let limit = MAX_DOWNSTREAM_SUBSCRIBERS_PER_CONTRACT;
    for i in 0..limit {
        // Encode the index into the first three key bytes so every peer is unique.
        let peer = PeerKey(crate::transport::TransportPublicKey::from_bytes({
            let mut bytes = [0u8; 32];
            bytes[0] = (i & 0xFF) as u8;
            bytes[1] = ((i >> 8) & 0xFF) as u8;
            bytes[2] = ((i >> 16) & 0xFF) as u8;
            bytes
        }));
        let result = manager.add_downstream_subscriber(&contract, peer);
        assert!(
            result,
            "Downstream subscriber {i} should succeed within limit (count before: {i})"
        );
    }
    let actual_count = manager
        .downstream_subscribers
        .get(&contract)
        .map(|e| e.len())
        .unwrap_or(0);
    assert_eq!(
        actual_count, limit,
        "Should have exactly {limit} entries, got {actual_count}"
    );
    // A brand-new peer beyond the limit must be rejected (existing peers may renew).
    let extra_peer = PeerKey(crate::transport::TransportPublicKey::from_bytes([0xAA; 32]));
    let is_new = !manager
        .downstream_subscribers
        .get(&contract)
        .map(|e| e.contains_key(&extra_peer))
        .unwrap_or(false);
    assert!(is_new, "Extra peer should not already be in the set");
    let rejected = !manager.add_downstream_subscriber(&contract, extra_peer);
    assert!(
        rejected,
        "Downstream subscriber beyond limit should be rejected (count was {actual_count})"
    );
}
#[test]
fn test_downstream_subscriber_existing_peer_can_renew_at_limit() {
    let mgr = HostingManager::new();
    let contract = make_contract_key(51);
    let first_peer = make_peer_key(1);
    mgr.add_downstream_subscriber(&contract, first_peer.clone());
    // Top the contract up to exactly the per-contract limit with unique peers.
    for i in 1..MAX_DOWNSTREAM_SUBSCRIBERS_PER_CONTRACT {
        let mut bytes = [0u8; 32];
        bytes[0] = (i & 0xFF) as u8;
        bytes[1] = ((i >> 8) & 0xFF) as u8;
        let filler = PeerKey(crate::transport::TransportPublicKey::from_bytes(bytes));
        mgr.add_downstream_subscriber(&contract, filler);
    }
    // Re-adding an already-present peer is a lease renewal, not a new entry,
    // so it must succeed even at the limit.
    assert!(
        mgr.add_downstream_subscriber(&contract, first_peer),
        "Existing peer should be able to renew at limit"
    );
}
#[test]
fn test_expire_returns_expired_count_for_interest_sync() {
    let mgr = HostingManager::new();
    let interest = make_interest_manager();
    let contract = make_contract_key(90);
    let peer_a = make_peer_key(90);
    let peer_b = make_peer_key(91);
    // Mirror two downstream subscribers into both trackers.
    mgr.add_downstream_subscriber(&contract, peer_a.clone());
    interest.add_downstream_subscriber(&contract);
    mgr.add_downstream_subscriber(&contract, peer_b.clone());
    interest.add_downstream_subscriber(&contract);
    let count = interest.with_local_interest(&contract, |li| li.downstream_subscriber_count);
    assert_eq!(count, 2);
    // Backdate both leases past expiry.
    if let Some(mut entry) = mgr.downstream_subscribers.get_mut(&contract) {
        let stale = Instant::now() - SUBSCRIPTION_LEASE_DURATION - Duration::from_secs(1);
        entry.insert(peer_a, stale);
        entry.insert(peer_b, stale);
    }
    let expired = mgr.expire_stale_downstream_subscribers();
    assert_eq!(expired.len(), 1);
    let (expired_contract, expired_count) = &expired[0];
    assert_eq!(*expired_contract, contract);
    assert_eq!(*expired_count, 2);
    // Replay the reported expiry count into the interest tracker to resync it.
    for _ in 0..*expired_count {
        interest.remove_downstream_subscriber(expired_contract);
    }
    assert!(
        !interest.has_local_interest(&contract),
        "downstream_subscriber_count should be 0 after syncing with TTL expiry"
    );
}
}