use std::{
cmp::Reverse,
collections::{BTreeSet, HashSet},
net::SocketAddr,
sync::{
Arc, OnceLock,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
time::Duration,
};
use dashmap::{DashMap, DashSet};
use either::Either;
use freenet_stdlib::prelude::{ContractInstanceId, ContractKey, WrappedState};
use parking_lot::{Mutex, RwLock};
use tokio::sync::{mpsc, oneshot};
use tracing::Instrument;
use crate::{
client_events::HostResult,
config::GlobalExecutor,
contract::{ContractError, ContractHandlerChannel, ContractHandlerEvent, SenderHalve},
message::{InterestMessage, MessageStats, NetMessage, NetMessageV1, NodeEvent, Transaction},
operations::{
OpCtx, OpError, connect::ConnectForwardEstimator, orphan_streams::OrphanStreamRegistry,
},
ring::{
ConnectionManager, LiveTransactionTracker, PeerConnectionBackoff, PeerKey, PeerKeyLocation,
Ring,
},
transport::TransportPublicKey,
util::time_source::InstantTimeSrc,
};
use super::{
NetEventRegister, NodeConfig, RequestRouter, neighbor_hosting::NeighborHostingManager,
network_bridge::EventLoopNotificationsSender,
};
/// Bookkeeping of transaction ids, split by lifecycle stage.
///
/// Both sets are `DashSet`s and are mutated through `&self`, so an `Arc<Ops>`
/// can be shared between the `OpManager` and the garbage-cleanup task.
#[derive(Default)]
struct Ops {
/// Transactions recorded by `OpManager::completed`; entries are removed
/// again by the garbage-cleanup task.
completed: DashSet<Transaction>,
/// Transactions still considered in flight.
under_progress: DashSet<Transaction>,
}
/// Snapshot of the number of entries held in each [`Ops`] set, as produced
/// by `Ops::sizes`.
#[derive(Debug, Default)]
struct OpsSizes {
/// Length of `Ops::completed` when the snapshot was taken.
completed: usize,
/// Length of `Ops::under_progress` when the snapshot was taken.
under_progress: usize,
}
impl Ops {
    /// Reports how many transaction ids each set currently holds.
    ///
    /// The two lengths are read one after the other, so under concurrent
    /// mutation they need not describe exactly the same instant.
    fn sizes(&self) -> OpsSizes {
        let completed = self.completed.len();
        let under_progress = self.under_progress.len();
        OpsSizes {
            completed,
            under_progress,
        }
    }
}
/// Central handle that operations use to reach the ring, the contract
/// handler, the event loop and the various shared bookkeeping structures.
///
/// Nearly every field is an `Arc` or a channel sender, so cloning an
/// `OpManager` yields another handle onto the same underlying state.
pub(crate) struct OpManager {
/// Ring state, built by `Ring::new` in [`OpManager::new`].
pub ring: Arc<Ring>,
/// Completed / under-progress transaction sets, shared with the
/// garbage-cleanup task.
ops: Arc<Ops>,
/// Sender side of the event-loop notification channels.
pub(crate) to_event_listener: EventLoopNotificationsSender,
/// Channel used to talk to the contract handler.
pub ch_outbound: Arc<ContractHandlerChannel<SenderHalve>>,
/// Sender half of the channel whose receiver is consumed by the
/// garbage-cleanup task.
new_transactions: tokio::sync::mpsc::Sender<Transaction>,
/// Channel on which `send_client_result` publishes `(tx, result)` pairs.
pub result_router_tx: mpsc::Sender<(Transaction, HostResult)>,
pub(crate) connect_forward_estimator: Arc<RwLock<ConnectForwardEstimator>>,
/// Initialised to `is_gateway` in `new`; non-gateway peers start as `false`.
pub peer_ready: Arc<AtomicBool>,
/// Copied from `NodeConfig::is_gateway`.
pub is_gateway: bool,
/// Per-contract one-shot senders registered by `wait_for_contract` and
/// fired by `notify_contract_stored`.
contract_waiters:
Arc<Mutex<std::collections::HashMap<ContractInstanceId, Vec<oneshot::Sender<()>>>>>,
pub neighbor_hosting: Arc<NeighborHostingManager>,
pub interest_manager: Arc<crate::ring::interest::InterestManager<InstantTimeSrc>>,
pub broadcast_dedup_cache: Arc<crate::operations::update::BroadcastDedupCache>,
pub(crate) update_propagation_stats:
Arc<crate::operations::update::propagation_stats::UpdatePropagationStats>,
/// States stashed for later re-broadcast; drained by
/// `flush_pending_broadcast_on_interest`.
pub(crate) pending_broadcasts:
Arc<crate::operations::update::pending_broadcast::PendingBroadcastStore>,
/// Write-once slot filled by `set_request_router`.
request_router: Arc<OnceLock<Arc<RequestRouter>>>,
orphan_stream_registry: Arc<OrphanStreamRegistry>,
/// Payload size (bytes) above which `should_use_streaming` returns `true`.
pub streaming_threshold: usize,
pub gateway_backoff: Arc<Mutex<PeerConnectionBackoff>>,
pub gateway_backoff_cleared: Arc<tokio::sync::Notify>,
/// Copy of `NodeConfig::blocked_addresses`, if any were configured.
pub blocked_addresses: Option<Arc<HashSet<SocketAddr>>>,
/// Peer locations of the gateways listed in the node configuration.
pub configured_gateways: Arc<Vec<PeerKeyLocation>>,
/// Map pruned periodically by the garbage-cleanup task; the `u64` is
/// compared against a millisecond clock there, so it is presumably a
/// timestamp in ms (confirm against the writers).
pub(crate) pending_contract_fetches: Arc<DashMap<ContractInstanceId, u64>>,
pub(crate) active_relay_get_txs: Arc<DashSet<Transaction>>,
pub(crate) active_relay_update_txs: Arc<DashSet<Transaction>>,
pub(crate) active_relay_put_txs: Arc<DashSet<Transaction>>,
pub(crate) active_relay_subscribe_txs: Arc<DashSet<Transaction>>,
pub(crate) active_relay_connect_txs: Arc<DashSet<Transaction>>,
/// Number of live `ClientOpGuard`s.
pub(crate) inflight_client_ops: Arc<AtomicUsize>,
/// When `true`, `admit_client_op` refuses new client operations.
pub(crate) shutting_down: Arc<AtomicBool>,
}
impl Clone for OpManager {
fn clone(&self) -> Self {
Self {
ring: self.ring.clone(),
ops: self.ops.clone(),
to_event_listener: self.to_event_listener.clone(),
ch_outbound: self.ch_outbound.clone(),
new_transactions: self.new_transactions.clone(),
result_router_tx: self.result_router_tx.clone(),
connect_forward_estimator: self.connect_forward_estimator.clone(),
peer_ready: self.peer_ready.clone(),
is_gateway: self.is_gateway,
contract_waiters: self.contract_waiters.clone(),
neighbor_hosting: self.neighbor_hosting.clone(),
interest_manager: self.interest_manager.clone(),
broadcast_dedup_cache: self.broadcast_dedup_cache.clone(),
update_propagation_stats: self.update_propagation_stats.clone(),
pending_broadcasts: self.pending_broadcasts.clone(),
request_router: self.request_router.clone(),
orphan_stream_registry: self.orphan_stream_registry.clone(),
streaming_threshold: self.streaming_threshold,
gateway_backoff: self.gateway_backoff.clone(),
gateway_backoff_cleared: self.gateway_backoff_cleared.clone(),
blocked_addresses: self.blocked_addresses.clone(),
configured_gateways: self.configured_gateways.clone(),
pending_contract_fetches: self.pending_contract_fetches.clone(),
active_relay_get_txs: self.active_relay_get_txs.clone(),
active_relay_update_txs: self.active_relay_update_txs.clone(),
active_relay_put_txs: self.active_relay_put_txs.clone(),
active_relay_subscribe_txs: self.active_relay_subscribe_txs.clone(),
active_relay_connect_txs: self.active_relay_connect_txs.clone(),
inflight_client_ops: self.inflight_client_ops.clone(),
shutting_down: self.shutting_down.clone(),
}
}
}
/// RAII token representing one in-flight client operation.
///
/// Creating a guard increments the shared counter and dropping it decrements
/// the counter again, so the counter always equals the number of live guards.
pub(crate) struct ClientOpGuard {
    counter: Arc<AtomicUsize>,
}

impl ClientOpGuard {
    /// Registers a new in-flight operation on `counter`.
    ///
    /// `SeqCst` is used so the increment is totally ordered with `SeqCst`
    /// accesses to the shutdown flag made by the admitting code.
    fn new(counter: Arc<AtomicUsize>) -> Self {
        counter.fetch_add(1, Ordering::SeqCst);
        Self { counter }
    }
}

impl Drop for ClientOpGuard {
    /// Unregisters the operation.
    ///
    /// The decrement uses the same `SeqCst` ordering as the increment. A
    /// `Relaxed` decrement would let an observer see the count reach zero
    /// without the operation's preceding writes being visible to it.
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::SeqCst);
    }
}
impl OpManager {
/// Builds the `OpManager` and starts the background tasks it relies on.
///
/// Besides constructing the shared state, this:
/// * creates the [`Ring`] (errors from `Ring::new` are propagated),
/// * spawns `garbage_cleanup_task` and registers it with `task_monitor`,
/// * starts the interest-manager sweep task and the orphan-stream GC task,
/// * registers the update-propagation summary task with `task_monitor`.
///
/// # Errors
/// Returns whatever error `Ring::new` reports.
pub(super) fn new<ER: NetEventRegister + Clone>(
notification_channel: EventLoopNotificationsSender,
ch_outbound: ContractHandlerChannel<SenderHalve>,
config: &NodeConfig,
event_register: ER,
connection_manager: ConnectionManager,
result_router_tx: mpsc::Sender<(Transaction, HostResult)>,
task_monitor: &super::background_task_monitor::BackgroundTaskMonitor,
) -> anyhow::Result<Self> {
let ring = Ring::new(
config,
notification_channel.clone(),
event_register.clone(),
config.is_gateway,
connection_manager,
task_monitor,
)?;
let ops = Arc::new(Ops::default());
// Bounded channel (100 entries) feeding transaction ids to the cleanup task.
let (new_transactions, rx) = tokio::sync::mpsc::channel(100);
// Parent the cleanup task's span under the caller's span when there is one.
let current_span = tracing::Span::current();
let garbage_span = if current_span.is_none() {
tracing::info_span!("garbage_cleanup_task")
} else {
tracing::info_span!(parent: current_span, "garbage_cleanup_task")
};
let connect_forward_estimator = Arc::new(RwLock::new(ConnectForwardEstimator::new()));
let request_router = Arc::new(OnceLock::new());
let ch_outbound = Arc::new(ch_outbound);
let contract_waiters: Arc<
Mutex<std::collections::HashMap<ContractInstanceId, Vec<oneshot::Sender<()>>>>,
> = Arc::new(Mutex::new(std::collections::HashMap::new()));
let pending_contract_fetches: Arc<DashMap<ContractInstanceId, u64>> =
Arc::new(DashMap::new());
let active_relay_get_txs: Arc<DashSet<Transaction>> = Arc::new(DashSet::new());
let active_relay_update_txs: Arc<DashSet<Transaction>> = Arc::new(DashSet::new());
let active_relay_put_txs: Arc<DashSet<Transaction>> = Arc::new(DashSet::new());
let active_relay_subscribe_txs: Arc<DashSet<Transaction>> = Arc::new(DashSet::new());
let active_relay_connect_txs: Arc<DashSet<Transaction>> = Arc::new(DashSet::new());
// The cleanup task receives clones of the shared structures so it can
// prune and report on them independently of any `OpManager` handle.
task_monitor.register(
"garbage_cleanup",
GlobalExecutor::spawn(
garbage_cleanup_task(
rx,
ops.clone(),
ring.live_tx_tracker.clone(),
notification_channel.clone(),
event_register,
result_router_tx.clone(),
request_router.clone(),
contract_waiters.clone(),
pending_contract_fetches.clone(),
active_relay_get_txs.clone(),
active_relay_update_txs.clone(),
active_relay_put_txs.clone(),
active_relay_subscribe_txs.clone(),
active_relay_connect_txs.clone(),
)
.instrument(garbage_span),
),
);
let is_gateway = config.is_gateway;
// Gateways start out ready; other peers start with the flag cleared.
let peer_ready = Arc::new(AtomicBool::new(is_gateway));
if is_gateway {
tracing::debug!("Gateway node: peer_ready set to true immediately");
} else {
tracing::debug!("Regular peer node: peer_ready will be set after first handshake");
}
let neighbor_hosting = Arc::new(NeighborHostingManager::new());
let interest_manager = Arc::new(crate::ring::interest::InterestManager::new(
InstantTimeSrc::new(),
));
crate::ring::interest::InterestManager::start_sweep_task(interest_manager.clone());
let streaming_threshold = config.config.network_api.streaming_threshold;
tracing::info!(
streaming_threshold_bytes = streaming_threshold,
"Streaming transport enabled for large transfers"
);
let orphan_stream_registry = Arc::new(OrphanStreamRegistry::new());
OrphanStreamRegistry::start_gc_task(orphan_stream_registry.clone());
let update_propagation_stats =
Arc::new(crate::operations::update::propagation_stats::UpdatePropagationStats::new());
task_monitor.register(
"update_propagation_summary",
update_propagation_stats.clone().start_summary_task(),
);
Ok(Self {
ring,
ops,
to_event_listener: notification_channel,
ch_outbound,
new_transactions,
result_router_tx,
connect_forward_estimator,
peer_ready,
is_gateway,
contract_waiters,
neighbor_hosting,
interest_manager,
broadcast_dedup_cache: Arc::new(crate::operations::update::BroadcastDedupCache::new()),
update_propagation_stats,
pending_broadcasts: Arc::new(
crate::operations::update::pending_broadcast::PendingBroadcastStore::new(),
),
request_router,
orphan_stream_registry,
streaming_threshold,
gateway_backoff: Arc::new(Mutex::new(PeerConnectionBackoff::new())),
gateway_backoff_cleared: Arc::new(tokio::sync::Notify::new()),
blocked_addresses: config
.blocked_addresses
.as_ref()
.map(|a| Arc::new(a.clone())),
configured_gateways: Arc::new(
config
.gateways
.iter()
.map(|gw| gw.peer_key_location.clone())
.collect(),
),
pending_contract_fetches,
active_relay_get_txs,
active_relay_update_txs,
active_relay_put_txs,
active_relay_subscribe_txs,
active_relay_connect_txs,
// No client operations are in flight and no shutdown is pending at start.
inflight_client_ops: Arc::new(AtomicUsize::new(0)),
shutting_down: Arc::new(AtomicBool::new(false)),
})
}
/// Returns a shared handle to this manager's `shutting_down` flag.
pub(crate) fn shutting_down_handle(&self) -> Arc<AtomicBool> {
    Arc::clone(&self.shutting_down)
}
fn client_op_guard(&self) -> ClientOpGuard {
ClientOpGuard::new(self.inflight_client_ops.clone())
}
/// Tries to admit a new client operation.
///
/// The in-flight counter is incremented *before* the shutdown flag is read.
/// If shutdown has begun, the guard is dropped again (undoing the increment)
/// and `None` is returned; otherwise the guard is handed to the caller.
pub(crate) fn admit_client_op(&self) -> Option<ClientOpGuard> {
    let guard = self.client_op_guard();
    let draining = self.shutting_down.load(Ordering::SeqCst);
    // `then_some` drops `guard` when `draining` is true.
    (!draining).then_some(guard)
}
/// Returns a shared handle to the in-flight client operation counter.
pub(crate) fn inflight_client_ops_handle(&self) -> Arc<AtomicUsize> {
    Arc::clone(&self.inflight_client_ops)
}
/// Installs the request router.
///
/// The slot is write-once: a second call leaves the first router in place
/// and only logs a warning.
pub fn set_request_router(&self, router: Arc<RequestRouter>) {
    match self.request_router.set(router) {
        Ok(()) => {}
        Err(_) => {
            tracing::warn!("Request router already set - ignoring duplicate set");
        }
    }
}
/// Publishes an operation result for a client and, on success, tells the
/// event loop that the transaction completed.
///
/// Both sends are non-blocking. If the result router rejects the result the
/// failure is logged and the completion notification is *not* sent; a failed
/// completion notification is only logged.
pub(crate) fn send_client_result(&self, tx: Transaction, host_result: HostResult) {
    match self.result_router_tx.try_send((tx, host_result)) {
        Ok(()) => {}
        Err(err) => {
            tracing::error!(
                %tx,
                error = %err,
                "failed to dispatch operation result to client \
                 (result router channel full or closed)"
            );
            return;
        }
    }

    let completion = Either::Right(NodeEvent::TransactionCompleted(tx));
    let notified = self
        .to_event_listener
        .notifications_sender
        .try_send(completion);
    if let Err(err) = notified {
        tracing::warn!(
            %tx,
            error = %err,
            "failed to notify event loop about transaction completion"
        );
    }
}
/// Best-effort notification to the event loop that `tx` was orphaned by `peer`.
///
/// Delivery failures are logged by the helper; its delivery flag is not
/// needed here and is discarded.
pub(crate) fn notify_orphaned_transaction(&self, tx: Transaction, peer: SocketAddr) {
    let sender = &self.to_event_listener.notifications_sender;
    notify_orphaned_transaction_on(sender, tx, peer);
}
/// Upper bound on how long the awaiting notification helpers
/// (`notify_node_event`, `release_pending_op_slot`) wait for channel capacity.
const NOTIFICATION_SEND_TIMEOUT: Duration = Duration::from_secs(30);
/// Queues `msg` for the event loop, waiting for channel capacity for at most
/// `NOTIFICATION_SEND_TIMEOUT`.
///
/// # Errors
/// Propagates the error from `notify_node_event_on` (closed channel or timeout).
pub async fn notify_node_event(&self, msg: NodeEvent) -> Result<(), OpError> {
    tracing::debug!(event = %msg, "notify_node_event: queuing node event");
    let sender = self.to_event_listener.notifications_sender();
    notify_node_event_on(sender, Self::NOTIFICATION_SEND_TIMEOUT, msg).await
}
/// Non-blocking variant of `notify_node_event`: the event is dropped (and an
/// error returned) when the channel is full or closed.
///
/// The pending/remaining figures are sampled only to enrich the log line
/// emitted on a full channel.
pub fn try_notify_node_event(&self, msg: NodeEvent) -> Result<(), OpError> {
    tracing::debug!(event = %msg, "try_notify_node_event: queuing node event (non-blocking)");
    let listener = &self.to_event_listener;
    let sender = listener.notifications_sender();
    let channel_pending = listener.notification_channel_pending();
    let channel_remaining = sender.capacity();
    try_notify_node_event_on(sender, channel_pending, channel_remaining, msg)
}
/// Re-emits a stashed broadcast for `key`, if one is pending.
///
/// The current contract state is read lazily (only when something is
/// actually stashed) through `read_current_contract_state`.
pub(crate) async fn flush_pending_broadcast_on_interest(&self, key: &ContractKey) {
    let listener = &self.to_event_listener;
    let sender = listener.notifications_sender();
    let channel_pending = listener.notification_channel_pending();
    let channel_remaining = sender.capacity();
    let read_state = || self.read_current_contract_state(key);
    flush_pending_broadcast_on_interest_on(
        &self.pending_broadcasts,
        sender,
        channel_pending,
        channel_remaining,
        key,
        read_state,
    )
    .await;
}
async fn read_current_contract_state(&self, key: &ContractKey) -> Option<WrappedState> {
use crate::contract::ContractHandlerEvent;
match self
.notify_contract_handler(ContractHandlerEvent::GetQuery {
instance_id: *key.id(),
return_contract_code: false,
})
.await
{
Ok(ContractHandlerEvent::GetResponse {
response: Ok(store_response),
..
}) => store_response.state,
_ => None,
}
}
/// Lists every contract the ring reports as subscribed.
///
/// The peer list paired with each contract is always empty here.
pub fn get_network_subscriptions(&self) -> Vec<(ContractKey, Vec<PeerKeyLocation>)> {
    let mut subscriptions = Vec::new();
    for contract_key in self.ring.get_subscribed_contracts() {
        subscriptions.push((contract_key, Vec::new()));
    }
    subscriptions
}
/// Unsubscribes from `contract`, notifying the upstream peer when possible.
///
/// Flow:
/// 1. Find the interested peer flagged `is_upstream`. Without one, only the
///    local ring subscription is removed.
/// 2. Resolve that peer's socket address. If the peer or its address is
///    unknown, the local subscription and the peer's interest entry are
///    removed without sending anything.
/// 3. Otherwise send a fire-and-forget `Unsubscribe`; whether or not the send
///    succeeds, the local subscription and interest entry are then removed.
pub async fn send_unsubscribe_upstream(&self, contract: &ContractKey) {
    let upstream = self
        .interest_manager
        .get_interested_peers(contract)
        .into_iter()
        .find(|(_, interest)| interest.is_upstream);
    let peer_key = match upstream {
        Some((peer_key, _)) => peer_key,
        None => {
            tracing::debug!(
                contract = %contract,
                "No upstream peer found for unsubscribe"
            );
            self.ring.unsubscribe(contract);
            return;
        }
    };

    let peer_location = match self
        .ring
        .connection_manager
        .get_peer_by_pub_key(&peer_key.0)
    {
        Some(location) => location,
        None => {
            tracing::debug!(
                contract = %contract,
                "Upstream peer address not found, cleaning up locally"
            );
            self.ring.unsubscribe(contract);
            self.interest_manager
                .remove_peer_interest(contract, &peer_key);
            return;
        }
    };

    let target_addr = match peer_location.peer_addr.as_known() {
        Some(&addr) => addr,
        None => {
            tracing::debug!(
                contract = %contract,
                "Upstream peer has no known address, cleaning up locally"
            );
            self.ring.unsubscribe(contract);
            self.interest_manager
                .remove_peer_interest(contract, &peer_key);
            return;
        }
    };

    let instance_id = *contract.id();
    let tx = Transaction::new::<crate::operations::subscribe::SubscribeMsg>();
    let msg = NetMessage::from(crate::operations::subscribe::SubscribeMsg::Unsubscribe {
        id: tx,
        instance_id,
    });
    let mut ctx = self.op_ctx(tx);
    if let Err(e) = ctx.send_fire_and_forget(target_addr, msg).await {
        tracing::warn!(
            contract = %contract,
            error = %e,
            "Failed to send Unsubscribe upstream"
        );
    } else {
        tracing::debug!(
            contract = %contract,
            target = %target_addr,
            "Sent Unsubscribe upstream"
        );
    }

    // Local cleanup happens regardless of the send outcome.
    self.ring.unsubscribe(contract);
    self.interest_manager
        .remove_peer_interest(contract, &peer_key);
}
/// Creates an `OpCtx` for `tx` backed by a clone of the event listener's
/// op-execution sender.
pub fn op_ctx(&self, tx: Transaction) -> OpCtx {
    let op_execution_sender = self.to_event_listener.op_execution_sender.clone();
    OpCtx::new(tx, op_execution_sender)
}
/// Sends `msg` to the contract handler and awaits its reply.
///
/// # Errors
/// Returns the `ContractError` reported by the handler channel.
pub async fn notify_contract_handler(
    &self,
    msg: ContractHandlerEvent,
) -> Result<ContractHandlerEvent, ContractError> {
    let handler = &self.ch_outbound;
    handler.send_to_handler(msg).await
}
/// Like `notify_contract_handler`, but forwards an explicit `priority` to the
/// handler channel.
///
/// # Errors
/// Returns the `ContractError` reported by the handler channel.
pub async fn notify_contract_handler_prioritized(
    &self,
    msg: ContractHandlerEvent,
    priority: crate::contract::Priority,
) -> Result<ContractHandlerEvent, ContractError> {
    let handler = &self.ch_outbound;
    handler.send_to_handler_prioritized(msg, priority).await
}
/// Sends `msg` to the contract handler at the default priority, passing
/// `timeout` through to the handler channel.
///
/// # Errors
/// Returns the `ContractError` reported by the handler channel.
pub async fn notify_contract_handler_with_timeout(
    &self,
    msg: ContractHandlerEvent,
    timeout: std::time::Duration,
) -> Result<ContractHandlerEvent, ContractError> {
    let priority = crate::contract::Priority::DEFAULT;
    let handler = &self.ch_outbound;
    handler
        .send_to_handler_with_timeout(msg, timeout, priority)
        .await
}
/// Hands `ev` to the contract handler without waiting for a reply.
///
/// A failed send is logged as a warning and otherwise ignored.
pub fn notify_contract_handler_fire_and_forget_prioritized(
    &self,
    ev: ContractHandlerEvent,
    priority: crate::contract::Priority,
) {
    let outcome = self
        .ch_outbound
        .send_to_handler_fire_and_forget_prioritized(ev, priority);
    match outcome {
        Ok(_) => {}
        Err(e) => {
            tracing::warn!(error = %e, "failed to send fire-and-forget event to contract handler");
        }
    }
}
/// Stub: always returns `None`; the transaction id is ignored.
pub fn peek_next_hop_addr(&self, _id: &Transaction) -> Option<std::net::SocketAddr> {
None
}
/// Stub: always returns `None`; the transaction id is ignored.
pub fn peek_target_peer(&self, _id: &Transaction) -> Option<PeerKeyLocation> {
None
}
/// Sends `TransactionCompleted(tx)` to the event loop, waiting for channel
/// capacity for at most `NOTIFICATION_SEND_TIMEOUT`. Failures are logged by
/// the helper and not returned.
pub(crate) async fn release_pending_op_slot(&self, tx: Transaction) {
    let sender = self.to_event_listener.notifications_sender();
    let wait_limit = Self::NOTIFICATION_SEND_TIMEOUT;
    release_pending_op_slot_on(sender, tx, wait_limit).await
}
pub fn completed(&self, id: Transaction) {
self.ring.live_tx_tracker.remove_finished_transaction(id);
self.ops.under_progress.remove(&id);
self.ops.completed.insert(id);
if let Some(router) = self.request_router.get() {
router.complete_operation(id);
}
}
/// Bookkeeping performed when `msg` is sent to `peer`.
///
/// If the message targets a location, the request is recorded on the ring;
/// if the peer has a socket address, the transaction is registered as live
/// for that address.
pub fn sending_transaction(&self, peer: &PeerKeyLocation, msg: &NetMessage) {
    let tx_id = msg.id();

    if let Some(target_loc) = msg.requested_location() {
        let tx_type = tx_id.transaction_type();
        self.ring.record_request(peer.clone(), target_loc, tx_type);
    }

    if let Some(peer_addr) = peer.socket_addr() {
        let tracker = &self.ring.live_tx_tracker;
        tracker.add_transaction(peer_addr, *tx_id);
    }
}
/// Registers a waiter for `instance_id` and returns the receiving end.
///
/// The receiver resolves when `notify_contract_stored` is called for the
/// same contract id.
pub fn wait_for_contract(&self, instance_id: ContractInstanceId) -> oneshot::Receiver<()> {
    let (notify, wait) = oneshot::channel();
    self.contract_waiters
        .lock()
        .entry(instance_id)
        .or_default()
        .push(notify);
    wait
}
/// Wakes every waiter registered for `key`'s contract id and removes them.
///
/// Waiters whose receiver was already dropped are skipped silently. The
/// waiter map stays locked for the duration of the call.
pub fn notify_contract_stored(&self, key: &ContractKey) {
    let mut waiters = self.contract_waiters.lock();
    let Some(senders) = waiters.remove(key.id()) else {
        return;
    };

    let count = senders.len();
    senders.into_iter().for_each(|sender| {
        // A send error only means the waiter gave up; nothing to do.
        #[allow(clippy::let_underscore_must_use)]
        let _ = sender.send(());
    });

    if count == 0 {
        return;
    }
    tracing::debug!(
        %key,
        count,
        "Notified waiters that contract has been stored"
    );
}
/// Stub: always reports five zero counts.
pub fn pending_op_counts(&self) -> [u32; 5] {
[0; 5]
}
/// Number of contract ids that currently have at least one registered waiter
/// entry in the waiter map.
///
/// The count saturates at `u32::MAX` instead of silently wrapping, which is
/// what the previous `as u32` cast would do on 64-bit targets.
pub fn contract_waiters_count(&self) -> u32 {
    let entries = self.contract_waiters.lock().len();
    u32::try_from(entries).unwrap_or(u32::MAX)
}
/// Borrows the shared orphan-stream registry.
// NOTE(review): `dead_code` is allowed here, presumably because the accessor
// has no caller in every build configuration; confirm before removing.
#[allow(dead_code)] pub fn orphan_stream_registry(&self) -> &Arc<OrphanStreamRegistry> {
&self.orphan_stream_registry
}
/// Returns `true` when `payload_size` is strictly greater than the configured
/// streaming threshold.
#[allow(dead_code)]
pub fn should_use_streaming(&self, payload_size: usize) -> bool {
    self.streaming_threshold < payload_size
}
/// Hook run when a ring connection to `pub_key` at `peer_addr` is established.
///
/// Cancels any deferred interest removal for the peer and returns the
/// messages to send to it, in this order:
/// 1. an `InterestSync` with all local interest hashes (if there are any),
/// 2. a `NeighborHosting` message (if the neighbor-hosting manager produces one),
/// 3. a `ReadyState { ready: true }` (if this node is ready and
///    `min_ready_connections` is non-zero).
pub(crate) fn on_ring_connection_established(
    &self,
    peer_addr: SocketAddr,
    pub_key: &TransportPublicKey,
) -> Vec<(SocketAddr, NetMessage)> {
    let peer_key = PeerKey::from(pub_key.clone());
    self.interest_manager.cancel_deferred_removal(&peer_key);

    let mut messages = Vec::with_capacity(2);

    let hashes = self.interest_manager.get_all_interest_hashes();
    if !hashes.is_empty() {
        let message = InterestMessage::Interests { hashes };
        let sync = NetMessage::V1(NetMessageV1::InterestSync { message });
        messages.push((peer_addr, sync));
    }

    if let Some(message) = self
        .neighbor_hosting
        .on_ring_connection_established(pub_key)
    {
        let hosting = NetMessage::V1(NetMessageV1::NeighborHosting { message });
        messages.push((peer_addr, hosting));
    }

    let connections = &self.ring.connection_manager;
    if connections.is_self_ready() && connections.min_ready_connections > 0 {
        let ready = NetMessage::V1(NetMessageV1::ReadyState { ready: true });
        messages.push((peer_addr, ready));
    }

    messages
}
/// Hook run when the ring connection to `pub_key` is lost: informs the
/// neighbor-hosting manager and schedules a deferred removal of the peer's
/// interests.
pub(crate) fn on_ring_connection_lost(&self, pub_key: &TransportPublicKey) {
    self.neighbor_hosting.on_peer_disconnected(pub_key);
    let peer_key = PeerKey::from(pub_key.clone());
    self.interest_manager.schedule_deferred_removal(&peer_key);
}
}
/// Sends `TransactionCompleted(tx)` on `notifications_sender`, waiting for
/// capacity for at most `timeout`.
///
/// Nothing is returned: a closed channel is logged as a warning and a timeout
/// as an error.
async fn release_pending_op_slot_on(
    notifications_sender: &mpsc::Sender<Either<NetMessage, NodeEvent>>,
    tx: Transaction,
    timeout: Duration,
) {
    let completion = Either::Right(NodeEvent::TransactionCompleted(tx));
    let outcome = tokio::time::timeout(timeout, notifications_sender.send(completion)).await;
    match outcome {
        Err(_) => {
            tracing::error!(
                %tx,
                timeout_secs = timeout.as_secs(),
                "release_pending_op_slot: notification channel full for too long; \
                 event loop may be stuck; pending_op_results entry will be \
                 reclaimed by 60s sweep"
            );
        }
        Ok(Err(_)) => {
            tracing::warn!(
                %tx,
                "release_pending_op_slot: notification channel closed; \
                 pending_op_results entry will be reclaimed by 60s sweep"
            );
        }
        Ok(Ok(())) => {}
    }
}
fn notify_orphaned_transaction_on(
notifications_sender: &mpsc::Sender<Either<NetMessage, NodeEvent>>,
tx: Transaction,
peer: SocketAddr,
) -> bool {
match notifications_sender.try_send(Either::Right(NodeEvent::TransactionOrphaned { tx, peer }))
{
Ok(()) => true,
Err(mpsc::error::TrySendError::Full(_)) => {
tracing::debug!(
%tx,
%peer,
"notify_orphaned_transaction: notification channel full; \
driver will wait for OPERATION_TTL timeout"
);
false
}
Err(mpsc::error::TrySendError::Closed(_)) => {
tracing::warn!(
%tx,
%peer,
"notify_orphaned_transaction: notification channel closed; \
receiver likely dropped"
);
false
}
}
}
/// Sends `msg` on `notifications_sender`, waiting for capacity for at most
/// `timeout`.
///
/// # Errors
/// * The converted send error when the channel is closed.
/// * `OpError::NotificationChannelError` when the wait times out; the channel
///   occupancy is logged in that case.
async fn notify_node_event_on(
    notifications_sender: &mpsc::Sender<Either<NetMessage, NodeEvent>>,
    timeout: Duration,
    msg: NodeEvent,
) -> Result<(), OpError> {
    let send = notifications_sender.send(Either::Right(msg));
    let Ok(sent) = tokio::time::timeout(timeout, send).await else {
        let channel_remaining = notifications_sender.capacity();
        let channel_pending = notifications_sender
            .max_capacity()
            .saturating_sub(channel_remaining);
        tracing::error!(
            timeout_secs = timeout.as_secs(),
            channel_pending,
            channel_remaining,
            "notify_node_event: Notification channel full for too long, event loop may be stuck"
        );
        return Err(OpError::NotificationChannelError(
            "notification channel send timed out — event loop is likely stuck".into(),
        ));
    };
    sent.map_err(Into::into)
}
fn try_notify_node_event_on(
notifications_sender: &mpsc::Sender<Either<NetMessage, NodeEvent>>,
channel_pending: usize,
channel_remaining: usize,
msg: NodeEvent,
) -> Result<(), OpError> {
match notifications_sender.try_send(Either::Right(msg)) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(_)) => {
tracing::debug!(
channel_pending,
channel_remaining,
"try_notify_node_event: event-loop notification channel full; \
dropping best-effort broadcast event (#4145)"
);
Err(OpError::NotificationError)
}
Err(mpsc::error::TrySendError::Closed(_)) => {
tracing::warn!(
"try_notify_node_event: event-loop notification channel closed; \
receiver likely dropped"
);
Err(OpError::NotificationError)
}
}
}
/// Takes the stashed state for `key` out of `pending` (if any) and re-emits
/// it as a broadcast.
///
/// `read_current_state` is only invoked when something was stashed. Its
/// result is preferred over the stashed state; the stashed state is the
/// fallback when it yields `None`.
async fn flush_pending_broadcast_on_interest_on<F, Fut>(
    pending: &crate::operations::update::pending_broadcast::PendingBroadcastStore,
    notifications_sender: &mpsc::Sender<Either<NetMessage, NodeEvent>>,
    channel_pending: usize,
    channel_remaining: usize,
    key: &ContractKey,
    read_current_state: F,
) where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Option<WrappedState>>,
{
    let contract_id = key.id();
    if !pending.contains(contract_id) {
        return;
    }
    let stashed = match pending.take(contract_id) {
        Some(state) => state,
        None => return,
    };
    let state = match read_current_state().await {
        Some(current) => current,
        None => stashed,
    };
    emit_pending_broadcast_reemit_on(
        notifications_sender,
        channel_pending,
        channel_remaining,
        pending,
        key,
        state,
    );
}
/// Emits a `BroadcastStateChange` re-emit for `key` carrying `state`.
///
/// The send is non-blocking. If it fails, `state` is stashed back into
/// `pending` so that a later signal can retry.
fn emit_pending_broadcast_reemit_on(
    notifications_sender: &mpsc::Sender<Either<NetMessage, NodeEvent>>,
    channel_pending: usize,
    channel_remaining: usize,
    pending: &crate::operations::update::pending_broadcast::PendingBroadcastStore,
    key: &ContractKey,
    state: WrappedState,
) {
    tracing::debug!(
        contract = %key,
        phase = "pending_broadcast_flush",
        "Re-broadcasting deferred fresh-contract state now that an interested peer/target appeared (#4359)"
    );

    let reemit = NodeEvent::BroadcastStateChange {
        key: *key,
        new_state: state.clone(),
        is_retry: false,
        is_reemit: true,
    };
    let delivery = try_notify_node_event_on(
        notifications_sender,
        channel_pending,
        channel_remaining,
        reemit,
    );
    if delivery.is_ok() {
        return;
    }

    pending.stash(*key.id(), state);
    tracing::debug!(
        contract = %key,
        "emit_pending_broadcast_reemit_on: re-emit dropped; re-stashed for the next signal"
    );
}
fn notify_transaction_timeout(
event_loop_notifier: &EventLoopNotificationsSender,
tx: Transaction,
) -> bool {
match event_loop_notifier
.notifications_sender
.try_send(Either::Right(NodeEvent::TransactionTimedOut(tx)))
{
Ok(()) => true,
Err(mpsc::error::TrySendError::Full(_)) => {
tracing::debug!(
tx = %tx,
"Notification channel full, skipping timeout notification for event loop"
);
false
}
Err(mpsc::error::TrySendError::Closed(_)) => {
tracing::warn!(
tx = %tx,
"Notification channel closed, receiver likely dropped"
);
false
}
}
}
/// Long-running background task that expires transactions and prunes
/// auxiliary bookkeeping.
///
/// The loop never returns. Each iteration waits for either:
/// * a transaction id on `new_transactions`, which is added to the TTL set, or
/// * a 5-second tick, which runs the maintenance steps in order:
///   1. optionally log memory statistics (when the `FREENET_MEMORY_STATS`
///      environment variable is set),
///   2. every 12th tick, drop contract waiters whose receiver is closed,
///   3. every 12th tick, drop stale `pending_contract_fetches` entries,
///   4. process the transactions delayed by the previous tick,
///   5. split the expired transactions off the TTL set and process them.
///
/// NOTE(review): when `new_transactions.recv()` yields `None` the value is
/// simply ignored; confirm how `deterministic_select!` behaves once every
/// sender has been dropped.
// The argument list mirrors the shared state created in `OpManager::new`.
#[allow(clippy::too_many_arguments)]
async fn garbage_cleanup_task<ER: NetEventRegister>(
mut new_transactions: tokio::sync::mpsc::Receiver<Transaction>,
ops: Arc<Ops>,
live_tx_tracker: LiveTransactionTracker,
event_loop_notifier: EventLoopNotificationsSender,
mut event_register: ER,
_result_router_tx: mpsc::Sender<(Transaction, HostResult)>,
request_router: Arc<OnceLock<Arc<RequestRouter>>>,
contract_waiters: Arc<
Mutex<std::collections::HashMap<ContractInstanceId, Vec<oneshot::Sender<()>>>>,
>,
pending_contract_fetches: Arc<DashMap<ContractInstanceId, u64>>,
active_relay_get_txs: Arc<DashSet<Transaction>>,
active_relay_update_txs: Arc<DashSet<Transaction>>,
active_relay_put_txs: Arc<DashSet<Transaction>>,
active_relay_subscribe_txs: Arc<DashSet<Transaction>>,
active_relay_connect_txs: Arc<DashSet<Transaction>>,
) {
const CLEANUP_INTERVAL: Duration = Duration::from_secs(5);
// 12 ticks x 5 s = one waiter sweep per minute.
const WAITER_CLEANUP_EVERY_N_TICKS: u32 = 12; let mut tick = tokio::time::interval(CLEANUP_INTERVAL);
// Consume the first tick up front so the first maintenance pass happens one
// full interval after start (tokio's `interval` yields its first tick immediately).
tick.tick().await;
let mut tick_count: u32 = 0;
// Transactions ordered through `Reverse`, so `split_off` below returns the
// entries at or past the TTL marker in that reversed order.
let mut ttl_set = BTreeSet::new();
// Under-progress transactions that hit the TTL but get one more tick of grace.
let mut delayed = vec![];
loop {
crate::deterministic_select! {
tx = new_transactions.recv() => {
if let Some(tx) = tx {
ttl_set.insert(Reverse(tx));
}
},
_ = tick.tick() => {
tick_count = tick_count.wrapping_add(1);
// Opt-in diagnostics: the variable only needs to be present.
if std::env::var("FREENET_MEMORY_STATS").is_ok() {
use std::sync::atomic::Ordering;
let ops_sizes = ops.sizes();
let pending_fetches = pending_contract_fetches.len();
let waiters_len = contract_waiters.lock().len();
let relay_inflight =
crate::operations::get::op_ctx_task::RELAY_INFLIGHT
.load(Ordering::Relaxed);
let relay_spawned =
crate::operations::get::op_ctx_task::RELAY_SPAWNED_TOTAL
.load(Ordering::Relaxed);
let relay_completed =
crate::operations::get::op_ctx_task::RELAY_COMPLETED_TOTAL
.load(Ordering::Relaxed);
let relay_dedup_rejects =
crate::operations::get::op_ctx_task::RELAY_DEDUP_REJECTS
.load(Ordering::Relaxed);
let relay_active_txs = active_relay_get_txs.len();
let relay_update_inflight =
crate::operations::update::op_ctx_task::RELAY_UPDATE_INFLIGHT
.load(Ordering::Relaxed);
let relay_update_spawned =
crate::operations::update::op_ctx_task::RELAY_UPDATE_SPAWNED_TOTAL
.load(Ordering::Relaxed);
let relay_update_completed =
crate::operations::update::op_ctx_task::RELAY_UPDATE_COMPLETED_TOTAL
.load(Ordering::Relaxed);
let relay_update_dedup_rejects =
crate::operations::update::op_ctx_task::RELAY_UPDATE_DEDUP_REJECTS
.load(Ordering::Relaxed);
let relay_update_active_txs = active_relay_update_txs.len();
let relay_put_inflight =
crate::operations::put::op_ctx_task::RELAY_PUT_INFLIGHT
.load(Ordering::Relaxed);
let relay_put_spawned =
crate::operations::put::op_ctx_task::RELAY_PUT_SPAWNED_TOTAL
.load(Ordering::Relaxed);
let relay_put_completed =
crate::operations::put::op_ctx_task::RELAY_PUT_COMPLETED_TOTAL
.load(Ordering::Relaxed);
let relay_put_dedup_rejects =
crate::operations::put::op_ctx_task::RELAY_PUT_DEDUP_REJECTS
.load(Ordering::Relaxed);
let relay_put_active_txs = active_relay_put_txs.len();
let relay_subscribe_inflight =
crate::operations::subscribe::op_ctx_task::RELAY_SUBSCRIBE_INFLIGHT
.load(Ordering::Relaxed);
let relay_subscribe_spawned =
crate::operations::subscribe::op_ctx_task::RELAY_SUBSCRIBE_SPAWNED_TOTAL
.load(Ordering::Relaxed);
let relay_subscribe_completed =
crate::operations::subscribe::op_ctx_task::RELAY_SUBSCRIBE_COMPLETED_TOTAL
.load(Ordering::Relaxed);
let relay_subscribe_dedup_rejects =
crate::operations::subscribe::op_ctx_task::RELAY_SUBSCRIBE_DEDUP_REJECTS
.load(Ordering::Relaxed);
let relay_subscribe_active_txs = active_relay_subscribe_txs.len();
let relay_connect_active_txs = active_relay_connect_txs.len();
// The `ops_*` per-type fields are reported as constant zeros.
tracing::info!(
target: "memory_stats",
tick = tick_count,
ops_connect = 0,
ops_put = 0,
ops_get = 0,
ops_subscribe = 0,
ops_update = 0,
ops_completed = ops_sizes.completed,
ops_under_progress = ops_sizes.under_progress,
pending_contract_fetches = pending_fetches,
contract_waiters = waiters_len,
relay_inflight = relay_inflight,
relay_spawned = relay_spawned,
relay_completed = relay_completed,
relay_dedup_rejects = relay_dedup_rejects,
relay_active_txs = relay_active_txs,
relay_update_inflight = relay_update_inflight,
relay_update_spawned = relay_update_spawned,
relay_update_completed = relay_update_completed,
relay_update_dedup_rejects = relay_update_dedup_rejects,
relay_update_active_txs = relay_update_active_txs,
relay_put_inflight = relay_put_inflight,
relay_put_spawned = relay_put_spawned,
relay_put_completed = relay_put_completed,
relay_put_dedup_rejects = relay_put_dedup_rejects,
relay_put_active_txs = relay_put_active_txs,
relay_subscribe_inflight = relay_subscribe_inflight,
relay_subscribe_spawned = relay_subscribe_spawned,
relay_subscribe_completed = relay_subscribe_completed,
relay_subscribe_dedup_rejects = relay_subscribe_dedup_rejects,
relay_subscribe_active_txs = relay_subscribe_active_txs,
relay_connect_active_txs = relay_connect_active_txs,
"memory stats"
);
}
// Drop senders whose receiver is gone, then drop contract entries left empty.
if tick_count % WAITER_CLEANUP_EVERY_N_TICKS == 0 {
let mut waiters = contract_waiters.lock();
let before = waiters.len();
waiters.retain(|_id, senders| {
senders.retain(|sender| !sender.is_closed());
!senders.is_empty()
});
let after = waiters.len();
if before != after {
tracing::info!(
before,
after,
removed = before - after,
"Cleaned up stale contract_waiters entries"
);
}
}
// Same cadence as the waiter sweep (literal 12 rather than the constant):
// keep only fetch entries younger than twice the fetch cooldown.
if tick_count % 12 == 0 {
let cooldown_ms = crate::operations::update::CONTRACT_FETCH_COOLDOWN_MS;
let now_ms = crate::config::GlobalSimulationTime::read_time_ms();
pending_contract_fetches.retain(|_, ts| {
now_ms.saturating_sub(*ts) < cooldown_ms * 2
});
}
// Second look at the transactions that were delayed on the previous tick.
let old_missing = std::mem::take(&mut delayed);
for tx in old_missing {
if let Some(tx) = ops.completed.remove(&tx) {
if cfg!(feature = "trace-ot") {
let op_type = tx.transaction_type().description();
event_register.notify_of_time_out(tx, op_type, None).await;
} else {
_ = tx;
}
continue;
}
// Constant `false`: the `delayed.push` branch below is currently
// unreachable, so a delayed transaction is always timed out here.
let still_waiting = false;
if still_waiting {
delayed.push(tx);
} else {
ops.under_progress.remove(&tx);
ops.completed.remove(&tx);
tracing::info!(
tx = %tx,
tx_type = ?tx.transaction_type(),
elapsed_ms = tx.elapsed().as_millis(),
ttl_ms = crate::config::OPERATION_TTL.as_millis(),
"Transaction timed out"
);
notify_transaction_timeout(&event_loop_notifier, tx);
live_tx_tracker.remove_finished_transaction(tx);
if let Some(router) = request_router.get() {
router.complete_operation(tx);
}
}
}
// `older_than` marks the TTL boundary, `absolute_cutoff` the 5x-TTL boundary.
let older_than: Reverse<Transaction> = Reverse(Transaction::ttl_transaction());
let absolute_cutoff: Reverse<Transaction> =
Reverse(Transaction::ttl_transaction_with_multiplier(5));
for Reverse(tx) in ttl_set.split_off(&older_than).into_iter() {
if ops.under_progress.contains(&tx) {
// Still inside the absolute limit: give it until the next tick.
if Reverse(tx) < absolute_cutoff {
delayed.push(tx);
continue;
}
tracing::warn!(tx = %tx, "Cleaning up under_progress op that exceeded absolute timeout (5× TTL)");
ops.under_progress.remove(&tx);
}
if let Some(tx) = ops.completed.remove(&tx) {
tracing::debug!("Clean up timed out: {tx}");
if cfg!(feature = "trace-ot") {
let op_type = tx.transaction_type().description();
event_register.notify_of_time_out(tx, op_type, None).await;
} else {
_ = tx;
}
}
// Constant `false`: the timeout notification block below is currently
// unreachable on this path.
let removed = false;
if removed {
tracing::info!(
tx = %tx,
tx_type = ?tx.transaction_type(),
elapsed_ms = tx.elapsed().as_millis(),
ttl_ms = crate::config::OPERATION_TTL.as_millis(),
"Transaction timed out"
);
notify_transaction_timeout(&event_loop_notifier, tx);
live_tx_tracker.remove_finished_transaction(tx);
if let Some(router) = request_router.get() {
router.complete_operation(tx);
}
}
}
},
}
}
}
#[cfg(test)]
mod tests {
use super::super::network_bridge::event_loop_notification_channel;
use super::*;
use crate::config::GlobalSimulationTime;
use crate::node::network_bridge::EventLoopNotificationsReceiver;
use either::Either;
use tokio::time::{Duration, Instant, timeout};
/// With a live receiver, the timeout notification is reported as delivered
/// and arrives as `TransactionTimedOut` carrying the same transaction.
#[tokio::test]
async fn notify_timeout_succeeds_when_receiver_alive() {
    let (receiver, notifier) = event_loop_notification_channel();
    let EventLoopNotificationsReceiver {
        mut notifications_receiver,
        ..
    } = receiver;
    let tx = Transaction::ttl_transaction();
    // Pass the notifier by reference, as the helper's signature requires.
    let delivered = notify_transaction_timeout(&notifier, tx);
    assert!(
        delivered,
        "notification should be delivered while receiver is alive"
    );
    let received = timeout(Duration::from_millis(100), notifications_receiver.recv())
        .await
        .expect("timed out waiting for notification")
        .expect("notification channel closed");
    match received {
        Either::Right(NodeEvent::TransactionTimedOut(observed)) => {
            assert_eq!(observed, tx, "unexpected transaction in notification");
        }
        other @ Either::Left(_) | other @ Either::Right(_) => {
            panic!("unexpected notification: {other:?}")
        }
    }
}
/// Once the receiver is dropped, the helper reports failure instead of
/// panicking.
#[tokio::test]
async fn notify_timeout_handles_dropped_receiver() {
    let (receiver, notifier) = event_loop_notification_channel();
    drop(receiver);
    let tx = Transaction::ttl_transaction();
    // Pass the notifier by reference, as the helper's signature requires.
    let delivered = notify_transaction_timeout(&notifier, tx);
    assert!(
        !delivered,
        "notification delivery should fail once receiver is dropped"
    );
}
#[tokio::test]
async fn release_pending_op_slot_emits_transaction_completed() {
let (receiver, notifier) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut notifications_receiver,
..
} = receiver;
let tx = Transaction::ttl_transaction();
super::release_pending_op_slot_on(
notifier.notifications_sender(),
tx,
Duration::from_secs(1),
)
.await;
let received = timeout(Duration::from_millis(100), notifications_receiver.recv())
.await
.expect("timed out waiting for TransactionCompleted emission")
.expect("notification channel closed");
match received {
Either::Right(NodeEvent::TransactionCompleted(observed)) => {
assert_eq!(observed, tx, "emitted tx must match the argument");
}
other @ Either::Left(_) | other @ Either::Right(_) => {
panic!("expected TransactionCompleted, got {other:?}")
}
}
}
/// Fills the notification channel until `try_send` reports `Full`, then checks
/// that `release_pending_op_slot_on` completes (within a 2 s outer timeout)
/// once a spawned consumer starts draining, and that the consumer eventually
/// observes the `TransactionCompleted` event for the released transaction.
#[tokio::test]
async fn release_pending_op_slot_blocks_through_backpressure() {
let (receiver, notifier) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut notifications_receiver,
..
} = receiver;
let filler_tx = Transaction::ttl_transaction();
let mut pre_filled = 0usize;
// Saturate the channel with filler events; stop at the first `Full`.
loop {
match notifier
.notifications_sender()
.try_send(Either::Right(NodeEvent::TransactionCompleted(filler_tx)))
{
Ok(()) => pre_filled += 1,
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => break,
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
panic!("channel unexpectedly closed while pre-filling")
}
}
// Safety valve: an unbounded channel would otherwise loop forever.
if pre_filled > 4096 {
panic!("channel did not backpressure after 4096 entries");
}
}
assert!(
pre_filled > 0,
"expected a bounded channel; got what appears to be unbounded"
);
let release_tx = Transaction::ttl_transaction();
// The consumer waits 20 ms before draining, so the helper below is started
// while the channel is still full.
let consumer = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(20)).await;
// First recv frees one slot for the pending release event.
notifications_receiver
.recv()
.await
.expect("notification channel closed during drain");
// Keep draining fillers until the release event itself shows up.
loop {
match notifications_receiver.recv().await {
Some(Either::Right(NodeEvent::TransactionCompleted(observed)))
if observed == release_tx =>
{
return;
}
Some(_) => continue,
None => panic!("channel closed before release event observed"),
}
}
});
// The outer 2 s timeout bounds the test; the 30 s value is the helper's own
// duration argument (NOTE(review): presumably its send timeout — confirm
// against `release_pending_op_slot_on`).
let release = timeout(
Duration::from_secs(2),
super::release_pending_op_slot_on(
notifier.notifications_sender(),
release_tx,
Duration::from_secs(30),
),
)
.await;
release.expect("helper must complete once channel has room");
consumer
.await
.expect("consumer task should terminate cleanly");
}
/// With the receiver already dropped, `release_pending_op_slot_on` must finish
/// inside the 200 ms outer timeout even though it is handed a 30 s duration.
#[tokio::test]
async fn release_pending_op_slot_returns_on_closed_channel() {
    let (receiver, notifier) = event_loop_notification_channel();
    drop(receiver);
    let tx = Transaction::ttl_transaction();

    let release = super::release_pending_op_slot_on(
        notifier.notifications_sender(),
        tx,
        Duration::from_secs(30),
    );
    let result = timeout(Duration::from_millis(200), release).await;

    assert!(
        result.is_ok(),
        "helper must return promptly on closed channel"
    );
}
/// Builds a deterministic `ContractKey` for tests: the instance id is
/// `[seed; 32]` and the code hash is `[seed + 1 (wrapping); 32]`.
fn test_contract_key(seed: u8) -> ContractKey {
    let instance_id = ContractInstanceId::new([seed; 32]);
    let code_hash = freenet_stdlib::prelude::CodeHash::new([seed.wrapping_add(1); 32]);
    ContractKey::from_id_and_code(instance_id, code_hash)
}
#[tokio::test]
async fn flush_pending_broadcast_reemits_stashed_state_on_interest() {
use crate::operations::update::pending_broadcast::PendingBroadcastStore;
GlobalSimulationTime::set_time_ms(0);
let (receiver, notifier) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut notifications_receiver,
..
} = receiver;
let store = PendingBroadcastStore::new();
let key = test_contract_key(7);
let state = freenet_stdlib::prelude::WrappedState::new(vec![0xCD; 16]);
super::emit_pending_broadcast_reemit_on(
notifier.notifications_sender(),
notifier.notification_channel_pending(),
notifier.notifications_sender().capacity(),
&store,
&key,
state.clone(),
);
let received = timeout(Duration::from_millis(200), notifications_receiver.recv())
.await
.expect("timed out waiting for re-broadcast emission")
.expect("notification channel closed");
match received {
Either::Right(NodeEvent::BroadcastStateChange {
key: observed_key,
new_state,
is_retry,
is_reemit,
}) => {
assert_eq!(observed_key, key, "re-broadcast must target the contract");
assert_eq!(
new_state.as_ref(),
state.as_ref(),
"re-broadcast must carry the resolved state"
);
assert!(
!is_retry,
"the deferred flush is a fresh logical broadcast, not a retry re-emission"
);
assert!(
is_reemit,
"the deferred flush must be tagged is_reemit so the give-up handler does \
not double-count a still-no-targets re-emission in the #4281 stats"
);
}
other @ Either::Left(_) | other @ Either::Right(_) => {
panic!("expected BroadcastStateChange, got {other:?}")
}
}
GlobalSimulationTime::clear_time();
}
#[tokio::test]
async fn flush_pending_broadcast_restashes_when_channel_full() {
use crate::operations::update::pending_broadcast::PendingBroadcastStore;
GlobalSimulationTime::set_time_ms(0);
let (receiver, notifier) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
notifications_receiver,
..
} = receiver;
let filler_tx = Transaction::ttl_transaction();
let mut pre_filled = 0usize;
loop {
match notifier
.notifications_sender()
.try_send(Either::Right(NodeEvent::TransactionCompleted(filler_tx)))
{
Ok(()) => pre_filled += 1,
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => break,
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
panic!("channel unexpectedly closed while pre-filling")
}
}
if pre_filled > 4096 {
panic!("channel did not backpressure after 4096 entries");
}
}
let store = PendingBroadcastStore::new();
let key = test_contract_key(11);
let state = freenet_stdlib::prelude::WrappedState::new(vec![0xEF; 8]);
super::emit_pending_broadcast_reemit_on(
notifier.notifications_sender(),
notifier.notification_channel_pending(),
notifier.notifications_sender().capacity(),
&store,
&key,
state.clone(),
);
let recovered = store
.take(key.id())
.expect("state must be re-stashed after a full-channel drop, not lost");
assert_eq!(recovered.as_ref(), state.as_ref());
drop(notifications_receiver);
GlobalSimulationTime::clear_time();
}
#[tokio::test]
async fn flush_noop_when_nothing_stashed() {
use crate::operations::update::pending_broadcast::PendingBroadcastStore;
GlobalSimulationTime::set_time_ms(0);
let (receiver, notifier) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut notifications_receiver,
..
} = receiver;
let store = PendingBroadcastStore::new();
let key = test_contract_key(21);
let read_called = std::cell::Cell::new(false);
super::flush_pending_broadcast_on_interest_on(
&store,
notifier.notifications_sender(),
notifier.notification_channel_pending(),
notifier.notifications_sender().capacity(),
&key,
|| {
read_called.set(true);
std::future::ready(None)
},
)
.await;
assert!(
!read_called.get(),
"no-op fast path must NOT read current contract state when nothing is stashed"
);
match notifications_receiver.try_recv() {
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
other => panic!("expected no emission on the no-op path, got {other:?}"),
}
GlobalSimulationTime::clear_time();
}
#[tokio::test]
async fn flush_reemits_current_state_when_live_read_present() {
use crate::operations::update::pending_broadcast::PendingBroadcastStore;
GlobalSimulationTime::set_time_ms(0);
let (receiver, notifier) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut notifications_receiver,
..
} = receiver;
let store = PendingBroadcastStore::new();
let key = test_contract_key(22);
let stale = freenet_stdlib::prelude::WrappedState::new(vec![0xAA; 8]);
let current = freenet_stdlib::prelude::WrappedState::new(vec![0xBB; 12]);
store.stash(*key.id(), stale.clone());
let current_for_read = current.clone();
super::flush_pending_broadcast_on_interest_on(
&store,
notifier.notifications_sender(),
notifier.notification_channel_pending(),
notifier.notifications_sender().capacity(),
&key,
|| std::future::ready(Some(current_for_read.clone())),
)
.await;
let received = timeout(Duration::from_millis(200), notifications_receiver.recv())
.await
.expect("timed out waiting for re-broadcast emission")
.expect("notification channel closed");
match received {
Either::Right(NodeEvent::BroadcastStateChange {
key: observed_key,
new_state,
is_reemit,
..
}) => {
assert_eq!(observed_key, key);
assert_eq!(
new_state.as_ref(),
current.as_ref(),
"must re-emit the CURRENT live state, not the stale stashed bytes"
);
assert_ne!(
new_state.as_ref(),
stale.as_ref(),
"stale give-up-time bytes must not be broadcast when a live read succeeds"
);
assert!(is_reemit, "deferred flush must be tagged is_reemit");
}
other @ Either::Left(_) | other @ Either::Right(_) => {
panic!("expected BroadcastStateChange, got {other:?}")
}
}
assert!(
store.take(key.id()).is_none(),
"the stash entry must be drained by the flush"
);
GlobalSimulationTime::clear_time();
}
#[tokio::test]
async fn flush_falls_back_to_stashed_bytes_when_live_read_absent() {
use crate::operations::update::pending_broadcast::PendingBroadcastStore;
GlobalSimulationTime::set_time_ms(0);
let (receiver, notifier) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut notifications_receiver,
..
} = receiver;
let store = PendingBroadcastStore::new();
let key = test_contract_key(23);
let stashed = freenet_stdlib::prelude::WrappedState::new(vec![0xCC; 10]);
store.stash(*key.id(), stashed.clone());
super::flush_pending_broadcast_on_interest_on(
&store,
notifier.notifications_sender(),
notifier.notification_channel_pending(),
notifier.notifications_sender().capacity(),
&key,
|| std::future::ready(None),
)
.await;
let received = timeout(Duration::from_millis(200), notifications_receiver.recv())
.await
.expect("timed out: the None live-read path must fall back to stashed bytes, not drop")
.expect("notification channel closed");
match received {
Either::Right(NodeEvent::BroadcastStateChange {
key: observed_key,
new_state,
..
}) => {
assert_eq!(observed_key, key);
assert_eq!(
new_state.as_ref(),
stashed.as_ref(),
"must fall back to the stashed bytes when the live read returns None"
);
}
other @ Either::Left(_) | other @ Either::Right(_) => {
panic!("expected BroadcastStateChange, got {other:?}")
}
}
GlobalSimulationTime::clear_time();
}
/// Fixed peer address (`203.0.113.7:9999`) shared by the orphan-notification tests.
fn test_peer() -> SocketAddr {
    let parsed: Result<SocketAddr, _> = "203.0.113.7:9999".parse();
    parsed.expect("test peer addr must be valid")
}
#[tokio::test]
async fn notify_orphaned_transaction_emits_transaction_orphaned() {
let (receiver, notifier) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut notifications_receiver,
..
} = receiver;
let tx = Transaction::ttl_transaction();
let peer = test_peer();
let delivered =
super::notify_orphaned_transaction_on(notifier.notifications_sender(), tx, peer);
assert!(delivered, "helper must enqueue on a live channel");
let received = timeout(Duration::from_millis(100), notifications_receiver.recv())
.await
.expect("timed out waiting for TransactionOrphaned emission")
.expect("notification channel closed");
match received {
Either::Right(NodeEvent::TransactionOrphaned {
tx: observed_tx,
peer: observed_peer,
}) => {
assert_eq!(observed_tx, tx, "emitted tx must match the argument");
assert_eq!(observed_peer, peer, "emitted peer must match the argument");
}
other @ Either::Left(_) | other @ Either::Right(_) => {
panic!("expected TransactionOrphaned, got {other:?}")
}
}
}
/// With the receiver dropped, `notify_orphaned_transaction_on` must return `false`.
#[tokio::test]
async fn notify_orphaned_transaction_handles_dropped_receiver() {
    let (receiver, notifier) = event_loop_notification_channel();
    drop(receiver);

    let tx = Transaction::ttl_transaction();
    let sender = notifier.notifications_sender();
    let delivered = super::notify_orphaned_transaction_on(sender, tx, test_peer());

    assert!(
        !delivered,
        "helper must return false once receiver is dropped"
    );
}
#[tokio::test]
async fn orphaned_transaction_wakes_parked_waiter_with_peer_disconnected() {
let (mut event_loop_receiver, notifier) = event_loop_notification_channel();
let (response_sender, mut driver_response_rx) =
tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
let mut pending_op_results: std::collections::HashMap<
Transaction,
tokio::sync::mpsc::Sender<crate::node::WaiterReply>,
> = std::collections::HashMap::new();
let tx = Transaction::ttl_transaction();
let peer = test_peer();
pending_op_results.insert(tx, response_sender);
let delivered =
super::notify_orphaned_transaction_on(notifier.notifications_sender(), tx, peer);
assert!(delivered, "orphan-handler helper must enqueue notification");
let event = timeout(
Duration::from_millis(100),
event_loop_receiver.notifications_receiver.recv(),
)
.await
.expect("event loop never received TransactionOrphaned")
.expect("notification channel closed before TransactionOrphaned arrived");
match event {
Either::Right(NodeEvent::TransactionOrphaned {
tx: observed_tx,
peer: observed_peer,
}) => {
assert_eq!(observed_tx, tx);
assert_eq!(observed_peer, peer);
if let Some(sender) = pending_op_results.remove(&observed_tx) {
#[allow(clippy::let_underscore_must_use)]
let _ = sender.try_send(crate::node::WaiterReply::PeerDisconnected {
peer: observed_peer,
});
}
}
other @ Either::Left(_) | other @ Either::Right(_) => {
panic!("expected TransactionOrphaned, got {other:?}")
}
}
let driver_wakeup = timeout(Duration::from_millis(100), driver_response_rx.recv()).await;
match driver_wakeup {
Ok(Some(crate::node::WaiterReply::PeerDisconnected { peer: observed })) => {
assert_eq!(observed, peer, "driver must receive the disconnect cause");
}
Ok(other) => panic!("driver received unexpected item: {other:?}"),
Err(_) => panic!(
"driver did not wake after orphan handling — \
pre-#4154 behavior reproduced"
),
}
}
#[tokio::test(start_paused = true)]
async fn notify_node_event_returns_err_after_timeout_on_saturated_channel() {
let (_receiver, notifier) = event_loop_notification_channel();
let filler_tx = Transaction::ttl_transaction();
let mut pre_filled = 0usize;
loop {
match notifier
.notifications_sender()
.try_send(Either::Right(NodeEvent::TransactionCompleted(filler_tx)))
{
Ok(()) => pre_filled += 1,
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => break,
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
panic!("channel unexpectedly closed while pre-filling")
}
}
if pre_filled > 4096 {
panic!("channel did not backpressure after 4096 entries");
}
}
let _ = pre_filled;
let test_timeout = Duration::from_secs(30);
let tx = Transaction::ttl_transaction();
let start = Instant::now();
let result = super::notify_node_event_on(
notifier.notifications_sender(),
test_timeout,
NodeEvent::TransactionCompleted(tx),
)
.await;
let elapsed = start.elapsed();
assert!(
matches!(result, Err(OpError::NotificationChannelError(_))),
"blocking helper must return NotificationChannelError after timeout \
on saturated channel, got {result:?}"
);
assert!(
elapsed >= test_timeout,
"helper returned before the configured timeout elapsed ({elapsed:?} < {test_timeout:?})"
);
}
#[tokio::test]
async fn try_notify_node_event_enqueues_on_live_channel() {
let (receiver, notifier) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut notifications_receiver,
..
} = receiver;
let tx = Transaction::ttl_transaction();
let event = NodeEvent::TransactionCompleted(tx);
super::try_notify_node_event_on(notifier.notifications_sender(), 0, 1024, event)
.expect("helper must enqueue on a live, non-full channel");
let received = timeout(Duration::from_millis(100), notifications_receiver.recv())
.await
.expect("timed out waiting for emission")
.expect("notification channel closed");
match received {
Either::Right(NodeEvent::TransactionCompleted(observed)) => {
assert_eq!(observed, tx, "emitted tx must match the argument");
}
other @ Either::Left(_) | other @ Either::Right(_) => {
panic!("expected TransactionCompleted, got {other:?}")
}
}
}
#[tokio::test]
async fn try_notify_node_event_returns_err_on_full_channel_without_blocking() {
let (_receiver, notifier) = event_loop_notification_channel();
let filler_tx = Transaction::ttl_transaction();
let mut pre_filled = 0usize;
loop {
match notifier
.notifications_sender()
.try_send(Either::Right(NodeEvent::TransactionCompleted(filler_tx)))
{
Ok(()) => pre_filled += 1,
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => break,
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
panic!("channel unexpectedly closed while pre-filling")
}
}
if pre_filled > 4096 {
panic!("channel did not backpressure after 4096 entries");
}
}
assert!(
pre_filled > 0,
"expected a bounded channel; got what appears to be unbounded"
);
let tx = Transaction::ttl_transaction();
let start = Instant::now();
let result = timeout(
Duration::from_millis(100),
async {
super::try_notify_node_event_on(
notifier.notifications_sender(),
pre_filled,
0,
NodeEvent::TransactionCompleted(tx),
)
},
)
.await
.expect("try-variant must NOT block — pre-fix it could stall the executor 30s (#4145)");
let elapsed = start.elapsed();
assert!(
result.is_err(),
"try-variant must return Err when channel is full"
);
assert!(
elapsed < Duration::from_millis(100),
"try-variant must complete near-instantly, took {elapsed:?}"
);
}
/// With the receiver dropped, `try_notify_node_event_on` must return `Err`.
#[tokio::test]
async fn try_notify_node_event_returns_err_on_closed_channel() {
    let (receiver, notifier) = event_loop_notification_channel();
    drop(receiver);

    let event = NodeEvent::TransactionCompleted(Transaction::ttl_transaction());
    let result = super::try_notify_node_event_on(notifier.notifications_sender(), 0, 0, event);

    assert!(
        result.is_err(),
        "helper must return Err once receiver is dropped"
    );
}
#[tokio::test]
async fn notify_orphaned_transaction_full_does_not_emit_warn() {
let logger = crate::test_utils::TestLogger::new()
.with_level("warn")
.capture_logs()
.init();
let (_receiver, notifier) = event_loop_notification_channel();
let filler_tx = Transaction::ttl_transaction();
let mut pre_filled = 0usize;
loop {
match notifier
.notifications_sender()
.try_send(Either::Right(NodeEvent::TransactionCompleted(filler_tx)))
{
Ok(()) => pre_filled += 1,
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => break,
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
panic!("channel unexpectedly closed while pre-filling")
}
}
if pre_filled > 4096 {
panic!("channel did not backpressure after 4096 entries");
}
}
let tx = Transaction::ttl_transaction();
let delivered =
super::notify_orphaned_transaction_on(notifier.notifications_sender(), tx, test_peer());
assert!(!delivered, "helper must return false on a full channel");
assert!(
!logger.contains("notify_orphaned_transaction: notification channel full"),
"Full arm must not emit WARN (would re-spam gateways at 30K+/hr — #4238); \
captured: {:?}",
logger.logs()
);
}
/// On a closed channel, `notify_orphaned_transaction_on` must return `false`
/// and the "notification channel closed" message must appear in the captured
/// WARN-level logs.
#[tokio::test]
async fn notify_orphaned_transaction_closed_still_emits_warn() {
    let logger = crate::test_utils::TestLogger::new()
        .with_level("warn")
        .capture_logs()
        .init();
    let (receiver, notifier) = event_loop_notification_channel();
    drop(receiver);

    let tx = Transaction::ttl_transaction();
    let sender = notifier.notifications_sender();
    let delivered = super::notify_orphaned_transaction_on(sender, tx, test_peer());
    assert!(!delivered, "helper must return false on a closed channel");

    let warned = logger.contains("notify_orphaned_transaction: notification channel closed");
    assert!(
        warned,
        "Closed arm must still emit WARN — receiver-dropped is not benign back-pressure; \
         captured: {:?}",
        logger.logs()
    );
}
#[tokio::test]
async fn try_notify_node_event_full_does_not_emit_warn() {
let logger = crate::test_utils::TestLogger::new()
.with_level("warn")
.capture_logs()
.init();
let (_receiver, notifier) = event_loop_notification_channel();
let filler_tx = Transaction::ttl_transaction();
let mut pre_filled = 0usize;
loop {
match notifier
.notifications_sender()
.try_send(Either::Right(NodeEvent::TransactionCompleted(filler_tx)))
{
Ok(()) => pre_filled += 1,
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => break,
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
panic!("channel unexpectedly closed while pre-filling")
}
}
if pre_filled > 4096 {
panic!("channel did not backpressure after 4096 entries");
}
}
let tx = Transaction::ttl_transaction();
let result = super::try_notify_node_event_on(
notifier.notifications_sender(),
pre_filled,
0,
NodeEvent::TransactionCompleted(tx),
);
assert!(result.is_err(), "helper must return Err on a full channel");
assert!(
!logger.contains("try_notify_node_event: event-loop notification channel full"),
"Full arm must not emit WARN (would re-introduce #4238 spam under fan-out); \
captured: {:?}",
logger.logs()
);
}
/// On a closed channel, `try_notify_node_event_on` must return `Err` and the
/// "event-loop notification channel closed" message must appear in the
/// captured WARN-level logs.
#[tokio::test]
async fn try_notify_node_event_closed_still_emits_warn() {
    let logger = crate::test_utils::TestLogger::new()
        .with_level("warn")
        .capture_logs()
        .init();
    let (receiver, notifier) = event_loop_notification_channel();
    drop(receiver);

    let event = NodeEvent::TransactionCompleted(Transaction::ttl_transaction());
    let result = super::try_notify_node_event_on(notifier.notifications_sender(), 0, 0, event);
    assert!(
        result.is_err(),
        "helper must return Err once receiver is dropped"
    );

    let warned = logger.contains("try_notify_node_event: event-loop notification channel closed");
    assert!(
        warned,
        "Closed arm must still emit WARN; captured: {:?}",
        logger.logs()
    );
}
/// On a full channel, `notify_transaction_timeout` must return `false`
/// without logging the "Notification channel full" message at WARN level.
#[tokio::test]
async fn notify_transaction_timeout_full_does_not_emit_warn() {
    let logger = crate::test_utils::TestLogger::new()
        .with_level("warn")
        .capture_logs()
        .init();
    // Receiver is kept alive (so the channel stays open) but never polled.
    let (_receiver, notifier) = event_loop_notification_channel();
    let filler_tx = Transaction::ttl_transaction();
    let mut pre_filled = 0usize;
    // Saturate the channel; stop at the first `Full`.
    loop {
        match notifier
            .notifications_sender()
            .try_send(Either::Right(NodeEvent::TransactionCompleted(filler_tx)))
        {
            Ok(()) => pre_filled += 1,
            Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => break,
            Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                panic!("channel unexpectedly closed while pre-filling")
            }
        }
        // Safety valve: an unbounded channel would otherwise loop forever.
        if pre_filled > 4096 {
            panic!("channel did not backpressure after 4096 entries");
        }
    }

    let tx = Transaction::ttl_transaction();
    let delivered = super::notify_transaction_timeout(&notifier, tx);
    assert!(!delivered, "helper must return false on a full channel");
    assert!(
        !logger.contains("Notification channel full, skipping timeout notification"),
        "notify_transaction_timeout Full arm must not emit WARN \
         (would re-introduce #4238 spam from the GC-sweep path); \
         captured: {:?}",
        logger.logs()
    );
}
/// On a closed channel, `notify_transaction_timeout` must return `false` and
/// the "Notification channel closed" message must appear in the captured
/// WARN-level logs.
#[tokio::test]
async fn notify_transaction_timeout_closed_still_emits_warn() {
    let logger = crate::test_utils::TestLogger::new()
        .with_level("warn")
        .capture_logs()
        .init();
    let (receiver, notifier) = event_loop_notification_channel();
    // Dropping the receiver closes the channel from the consumer side.
    drop(receiver);

    let tx = Transaction::ttl_transaction();
    let delivered = super::notify_transaction_timeout(&notifier, tx);
    assert!(!delivered, "helper must return false on a closed channel");
    assert!(
        logger.contains("Notification channel closed, receiver likely dropped"),
        "Closed arm must still emit WARN; captured: {:?}",
        logger.logs()
    );
}
/// Source files scanned by the two source-scrape tests in this module.
///
/// Each entry is `(label, contents)`: the label is the path used in failure
/// messages and in the allowlist comparison, and the contents are embedded at
/// compile time via `include_str!` (paths relative to this file).
const EMISSION_SITES: &[(&str, &str)] = &[
(
"contract/executor/runtime.rs",
include_str!("../contract/executor/runtime.rs"),
),
(
"contract/executor/runtime/executor_impl.rs",
include_str!("../contract/executor/runtime/executor_impl.rs"),
),
(
"contract/executor/mock_runtime.rs",
include_str!("../contract/executor/mock_runtime.rs"),
),
("operations.rs", include_str!("../operations.rs")),
("node.rs", include_str!("../node.rs")),
(
"node/network_bridge/p2p_protoc.rs",
include_str!("network_bridge/p2p_protoc.rs"),
),
(
"node/network_bridge/p2p_protoc/broadcast.rs",
include_str!("network_bridge/p2p_protoc/broadcast.rs"),
),
];
/// Returns `true` when the text window mentions either of the substrings the
/// scrape tests treat as a gossip emission: `Broadcast` or `SyncStateToPeer`.
/// Matching is case-sensitive.
fn names_gossip_emit(forward_window: &str) -> bool {
    ["Broadcast", "SyncStateToPeer"]
        .iter()
        .any(|marker| forward_window.contains(*marker))
}
/// Source-scrape guard over `EMISSION_SITES`.
///
/// For every `.notify_node_event(` call whose following ~200 bytes name a
/// gossip event (see `names_gossip_emit`), the preceding ~2048 bytes must
/// contain the `DELIBERATELY blocking` marker comment; otherwise the test
/// fails with a message pointing at the offending offset.
///
/// Window edges are snapped outward to UTF-8 character boundaries: slicing a
/// `str` at a raw byte offset panics when the offset falls inside a
/// multi-byte character, and the scanned sources are free to contain such
/// characters in comments and strings.
#[test]
fn broadcast_emission_sites_use_try_notify_or_are_deliberately_blocking() {
    const MARKER: &str = "DELIBERATELY blocking";
    const LOOKBACK_BYTES: usize = 2048;
    const FORWARD_BYTES: usize = 200;
    const NEEDLE: &str = ".notify_node_event(";

    /// Largest char boundary `<= idx`, with `idx` first clamped to `text.len()`.
    fn floor_char_boundary(text: &str, idx: usize) -> usize {
        let mut idx = idx.min(text.len());
        // Offset 0 is always a boundary, so this terminates.
        while !text.is_char_boundary(idx) {
            idx -= 1;
        }
        idx
    }

    /// Smallest char boundary `>= idx`, with `idx` first clamped to `text.len()`.
    fn ceil_char_boundary(text: &str, idx: usize) -> usize {
        let mut idx = idx.min(text.len());
        // `text.len()` is always a boundary, so this terminates.
        while !text.is_char_boundary(idx) {
            idx += 1;
        }
        idx
    }

    for (rel_path, src) in EMISSION_SITES {
        let mut search_idx = 0;
        while let Some(rel) = src[search_idx..].find(NEEDLE) {
            let abs = search_idx + rel;
            search_idx = abs + NEEDLE.len();

            // Compared on bytes so that `abs - 4` landing inside a multi-byte
            // character cannot panic.
            let preceded_by_try = src.as_bytes()[..abs].ends_with(b".try");
            if preceded_by_try {
                continue;
            }

            let window_end = ceil_char_boundary(src, abs + FORWARD_BYTES);
            let forward_window = &src[abs..window_end];
            if !names_gossip_emit(forward_window) {
                continue;
            }

            let lookback_start = floor_char_boundary(src, abs.saturating_sub(LOOKBACK_BYTES));
            let backward_window = &src[lookback_start..abs];
            assert!(
                backward_window.contains(MARKER),
                "crates/core/src/{rel_path}: blocking \
                 notify_node_event(...).await is used to emit a \
                 best-effort gossip event (Broadcast* or \
                 SyncStateToPeer) near offset {abs}, but no \
                 '{MARKER}' marker comment appears in the preceding \
                 ~{LOOKBACK_BYTES} bytes. Either use \
                 try_notify_node_event (preferred — see #4145) or \
                 add a '{MARKER}' comment above the call explaining \
                 the deliberate exception, as `announce_contract_hosted` \
                 does for the one-shot hosting transition. Forward \
                 window:\n{forward_window}"
            );
        }
    }
}
/// Walks every `.rs` file under `$CARGO_MANIFEST_DIR/src` and checks two
/// things against `EMISSION_SITES`:
///
/// 1. every allowlisted path was actually visited by the walk (so a stale
///    path or a broken walk cannot make the guard pass vacuously), and
/// 2. no file outside the allowlist (other than `node/op_state_manager.rs`,
///    which is skipped) contains a `notify_node_event(` call whose following
///    ~200 bytes name a gossip event (see `names_gossip_emit`).
///
/// The forward window end is snapped to a UTF-8 character boundary: slicing a
/// `str` at a raw byte offset panics when the offset falls inside a
/// multi-byte character.
#[test]
fn broadcast_emission_files_are_in_allowlist() {
    use std::collections::HashSet;

    const NEEDLE: &str = "notify_node_event(";
    const FORWARD_BYTES: usize = 200;

    /// Smallest char boundary `>= idx`, with `idx` first clamped to `text.len()`.
    fn ceil_char_boundary(text: &str, idx: usize) -> usize {
        let mut idx = idx.min(text.len());
        // `text.len()` is always a boundary, so this terminates.
        while !text.is_char_boundary(idx) {
            idx += 1;
        }
        idx
    }

    /// Recursively collects `(path relative to root with '/' separators,
    /// file contents)` for every `.rs` file under `dir`. Any I/O failure
    /// panics with the offending path, failing the test loudly.
    fn walk(dir: &std::path::Path, root: &std::path::Path, out: &mut Vec<(String, String)>) {
        let entries = std::fs::read_dir(dir)
            .unwrap_or_else(|e| panic!("read_dir({}) failed: {e}", dir.display()));
        for entry in entries {
            let entry =
                entry.unwrap_or_else(|e| panic!("dir entry in {} failed: {e}", dir.display()));
            let path = entry.path();
            if path.is_dir() {
                walk(&path, root, out);
            } else if path.extension().is_some_and(|e| e == "rs") {
                let contents = std::fs::read_to_string(&path).unwrap_or_else(|e| {
                    panic!("read_to_string({}) failed: {e}", path.display())
                });
                // Normalise Windows separators so labels compare equal to the
                // forward-slash paths used in `EMISSION_SITES`.
                let rel = path
                    .strip_prefix(root)
                    .unwrap_or(&path)
                    .to_string_lossy()
                    .replace('\\', "/");
                out.push((rel, contents));
            }
        }
    }

    let allowlist: HashSet<&str> = EMISSION_SITES.iter().map(|(p, _)| *p).collect();
    let manifest_dir = env!("CARGO_MANIFEST_DIR");
    let src_root = std::path::Path::new(manifest_dir).join("src");

    let mut files = Vec::new();
    walk(&src_root, &src_root, &mut files);

    for allow in &allowlist {
        assert!(
            files.iter().any(|(rel, _)| rel == *allow),
            "allowlist file '{allow}' was NOT visited by the source walk \
             (walked {} files total). Either EMISSION_SITES has a stale path \
             or the walk failed silently — a vacuous pass would silently \
             disable the regression guard.",
            files.len()
        );
    }

    for (rel, contents) in &files {
        if allowlist.contains(rel.as_str()) {
            continue;
        }
        // This file holds the scrape tests themselves and is excluded.
        if rel == "node/op_state_manager.rs" {
            continue;
        }
        let mut search_idx = 0;
        while let Some(rel_idx) = contents[search_idx..].find(NEEDLE) {
            let abs = search_idx + rel_idx;
            let window_end = ceil_char_boundary(contents, abs + FORWARD_BYTES);
            let forward_window = &contents[abs..window_end];
            assert!(
                !names_gossip_emit(forward_window),
                "{rel} contains a `notify_node_event(...)` best-effort \
                 gossip emission (Broadcast* or SyncStateToPeer) near \
                 offset {abs}, but {rel} is NOT in the shared EMISSION_SITES \
                 allowlist used by both source-scrape tests. Add an entry \
                 to `EMISSION_SITES` in op_state_manager.rs covering this \
                 file (both `include_str!` and the relative path) — that \
                 single change updates both the primary scrape and this \
                 walker. Window:\n{forward_window}"
            );
            search_idx = abs + NEEDLE.len();
        }
    }
}
/// Exercises the two-level `retain` cleanup on a waiter map built locally in
/// the test: closed senders are removed from each list, and contract ids whose
/// list becomes empty are removed from the map.
#[test]
fn contract_waiters_cleanup_removes_closed_senders() {
    use std::collections::HashMap;

    let id1 = ContractInstanceId::new([1; 32]);
    let id2 = ContractInstanceId::new([2; 32]);

    // `_rx_live` stays bound until the end of the test so `tx_live` stays open.
    let (tx_live, _rx_live) = oneshot::channel();
    let (tx_dead, rx_dead) = oneshot::channel::<()>();
    drop(rx_dead);
    let (tx_dead2, rx_dead2) = oneshot::channel::<()>();
    drop(rx_dead2);

    let mut waiters: HashMap<ContractInstanceId, Vec<oneshot::Sender<()>>> = HashMap::new();
    waiters.entry(id1).or_default().push(tx_live);
    waiters.entry(id1).or_default().push(tx_dead);
    waiters.entry(id2).or_default().push(tx_dead2);
    assert_eq!(waiters.len(), 2);

    waiters.retain(|_id, senders| {
        senders.retain(|sender| !sender.is_closed());
        !senders.is_empty()
    });

    assert_eq!(waiters.len(), 1);
    assert!(waiters.contains_key(&id1));
    assert!(!waiters.contains_key(&id2));
    assert_eq!(waiters[&id1].len(), 1);
}
/// A `ClientOpGuard` created inside a closure that panics must still bring
/// the shared counter back to zero once the panic has unwound.
#[test]
fn client_op_guard_decrements_on_panic() {
    let counter = Arc::new(AtomicUsize::new(0));

    let panicking_driver = || {
        let _guard = ClientOpGuard::new(counter.clone());
        assert_eq!(counter.load(Ordering::Relaxed), 1);
        panic!("simulated driver panic");
    };
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(panicking_driver));

    assert!(result.is_err(), "the closure must have panicked");
    let remaining = counter.load(Ordering::Relaxed);
    assert_eq!(
        remaining, 0,
        "ClientOpGuard::Drop must decrement the counter even on \
         panic — otherwise the shutdown drain leaks and waits \
         the full drain_timeout for a driver that's no longer \
         alive."
    );
}
/// Source-scrape guard: checks, by string search over this file and
/// `node.rs`, that the atomics involved in client-op admission and shutdown
/// draining are accessed with `Ordering::SeqCst`.
#[test]
fn seqcst_used_for_admission_handshake_atomics() {
let op_state = include_str!("op_state_manager.rs");
// Text between the `new` signature and the next `Self {` is taken as the
// constructor body.
let new_body = op_state
.split("fn new(counter: Arc<AtomicUsize>) -> Self {")
.nth(1)
.and_then(|s| s.split("Self {").next())
.expect("ClientOpGuard::new body must be findable");
assert!(
new_body.contains("fetch_add(1, Ordering::SeqCst)"),
"ClientOpGuard::new must fetch_add with SeqCst — see \
admit_client_op rustdoc. Found body:\n{new_body}"
);
// Text between the `admit_client_op` signature and the next newline-plus-brace
// delimiter is taken as the function body.
let admit_body = op_state
.split("pub(crate) fn admit_client_op(&self) -> Option<ClientOpGuard> {")
.nth(1)
.and_then(|s| s.split("\n }").next())
.expect("admit_client_op body must be findable");
assert!(
admit_body.contains("self.shutting_down.load(Ordering::SeqCst)"),
"admit_client_op must load shutting_down with SeqCst.\nbody:\n{admit_body}"
);
// The remaining checks search `node.rs` as a whole rather than one function.
let node_rs = include_str!("../node.rs");
assert!(
node_rs.contains("self.shutting_down.store(true, Ordering::SeqCst)"),
"ShutdownHandle::shutdown Phase 1 must store shutting_down \
with SeqCst — the gate write must synchronize with \
admit_client_op's gate load."
);
// At least two textual occurrences are required (fast path and poll loop,
// per the assertion message below).
let seqcst_loads = node_rs
.matches("self.inflight_client_ops.load(Ordering::SeqCst)")
.count();
assert!(
seqcst_loads >= 2,
"wait_for_drain must load inflight_client_ops with SeqCst \
at BOTH the initial fast-path AND the poll loop \
(found {seqcst_loads} SeqCst loads, need >= 2). A single \
Relaxed load anywhere in the drain re-opens the Dekker \
race."
);
}
/// Source-scrape guard: within the text of `admit_client_op`, the call to
/// `self.client_op_guard()` must appear before the `self.shutting_down.load`
/// read.
#[test]
fn admit_client_op_bumps_before_checking_gate() {
    let src = include_str!("op_state_manager.rs");

    let after_signature = src
        .split("pub(crate) fn admit_client_op(&self) -> Option<ClientOpGuard> {")
        .nth(1);
    let body = after_signature
        .and_then(|s| s.split("\n }").next())
        .expect("admit_client_op body must be findable");

    let bump_pos = body
        .find("self.client_op_guard()")
        .expect("admit_client_op must call client_op_guard()");
    let gate_pos = body
        .find("self.shutting_down.load")
        .expect("admit_client_op must load shutting_down");

    assert!(
        bump_pos < gate_pos,
        "admit_client_op must call client_op_guard() (bump) BEFORE \
         loading shutting_down (check). Reverse order re-opens \
         Codex r2 TOCTOU: a check-then-bump caller can see \
         shutting_down=false, then shutdown sets it and sees \
         counter=0, returns from drain, then caller bumps + \
         spawns into a dead node."
    );
}
/// Models the bump-then-check admission pattern with a local closure: while
/// the gate flag is set the closure yields `None` and leaves the counter at
/// zero; once the flag is cleared it yields a guard that holds the counter at
/// one until dropped.
#[test]
fn admit_client_op_refuses_when_shutting_down() {
    let counter = Arc::new(AtomicUsize::new(0));
    let gate = Arc::new(AtomicBool::new(true));

    let admit = || -> Option<ClientOpGuard> {
        // Bump first, then consult the gate.
        let guard = ClientOpGuard::new(counter.clone());
        if gate.load(Ordering::Relaxed) {
            drop(guard);
            return None;
        }
        Some(guard)
    };

    let refused = admit();
    assert!(
        refused.is_none(),
        "admit_client_op must return None when the gate is set"
    );
    assert_eq!(
        counter.load(Ordering::Relaxed),
        0,
        "the bump-then-check pattern must net-zero the counter \
         on rejection — otherwise the gate leaks bumped counts \
         that the drain then waits on indefinitely."
    );

    gate.store(false, Ordering::Relaxed);
    let g = admit().expect("gate is open, admit must succeed");
    assert_eq!(counter.load(Ordering::Relaxed), 1);
    drop(g);
    assert_eq!(counter.load(Ordering::Relaxed), 0);
}
}