use anyhow::Context;
use freenet_stdlib::{
client_api::{ClientRequest, ErrorKind},
prelude::ContractInstanceId,
};
use std::{
borrow::Cow,
fs::File,
io::Read,
net::{IpAddr, SocketAddr, ToSocketAddrs},
sync::Arc,
time::Duration,
};
use std::{collections::HashSet, convert::Infallible};
use self::p2p_impl::NodeP2P;
use crate::{
client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest},
config::{Address, GatewayConfig, WebsocketApiConfig},
contract::{ExecutorError, NetworkContractHandler},
local_node::Executor,
message::{InnerMessage, NetMessage, NodeEvent, Transaction, TransactionType},
operations::{OpError, connect, get, put, subscribe, update},
ring::{Location, PeerKeyLocation},
tracing::{EventRegister, NetEventLog, NetEventRegister},
};
use crate::{
config::Config,
message::{MessageStats, NetMessageV1},
};
use freenet_stdlib::client_api::DelegateRequest;
use serde::{Deserialize, Serialize};
pub(crate) use network_bridge::{
ConnectionError, EventLoopNotificationsSender, NetworkBridge, OpExecutionPayload, WaiterReply,
};
pub(crate) use network_bridge::broadcast_queue::BROADCAST_STREAM_METRICS;
#[cfg(test)]
pub(crate) use network_bridge::{EventLoopNotificationsReceiver, event_loop_notification_channel};
pub use network_bridge::{EventLoopExitReason, NetworkStats, reset_channel_id_counter};
use crate::topology::rate::Rate;
use crate::transport::{TransportKeypair, TransportPublicKey};
pub(crate) use op_state_manager::OpManager;
mod network_bridge;
pub use network_bridge::in_memory::{FaultInjectorState, get_fault_injector, set_fault_injector};
pub(crate) mod background_task_monitor;
pub(crate) mod neighbor_hosting;
pub(crate) mod network_status;
mod op_state_manager;
mod p2p_impl;
mod request_router;
pub(crate) mod testing_impl;
pub use request_router::{DeduplicatedRequest, RequestRouter};
#[derive(Clone)]
pub struct ShutdownHandle {
tx: tokio::sync::mpsc::Sender<NodeEvent>,
inflight_client_ops: Arc<std::sync::atomic::AtomicUsize>,
shutting_down: Arc<std::sync::atomic::AtomicBool>,
drain_timeout: std::time::Duration,
}
impl ShutdownHandle {
pub async fn shutdown(&self) {
use std::sync::atomic::Ordering;
self.shutting_down.store(true, Ordering::SeqCst);
self.wait_for_drain().await;
if let Err(err) = self
.tx
.send(NodeEvent::Disconnect {
cause: Some("graceful shutdown".into()),
})
.await
{
tracing::debug!(
error = %err,
"failed to send graceful shutdown signal; shutdown channel may already be closed"
);
}
}
async fn wait_for_drain(&self) {
use std::sync::atomic::Ordering;
if self.drain_timeout.is_zero() {
return;
}
let initial = self.inflight_client_ops.load(Ordering::SeqCst);
if initial == 0 {
return;
}
tracing::info!(
initial,
drain_timeout_secs = self.drain_timeout.as_secs(),
"Shutdown drain: waiting for in-flight client ops to finish"
);
let drained = tokio::time::timeout(self.drain_timeout, async {
let mut tick = tokio::time::interval(std::time::Duration::from_millis(200));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
tick.tick().await;
loop {
if self.inflight_client_ops.load(Ordering::SeqCst) == 0 {
return;
}
tick.tick().await;
}
})
.await;
let remaining = self.inflight_client_ops.load(Ordering::Relaxed);
match drained {
Ok(()) => tracing::info!(initial, "Shutdown drain complete (all client ops finished)"),
Err(_) => tracing::warn!(
initial,
remaining,
drain_timeout_secs = self.drain_timeout.as_secs(),
"Shutdown drain timed out; proceeding with disconnect"
),
}
}
}
pub struct Node {
inner: NodeP2P,
shutdown_handle: ShutdownHandle,
}
impl Node {
pub fn update_location(&mut self, location: Location) {
self.inner
.op_manager
.ring
.connection_manager
.update_location(Some(location));
}
pub fn shutdown_handle(&self) -> ShutdownHandle {
self.shutdown_handle.clone()
}
pub async fn run(self) -> anyhow::Result<Infallible> {
self.inner.run_node().await
}
}
/// Configuration from which a [`Node`] is built (see `NodeConfig::build`).
///
/// `NodeConfig::new` derives the initial values from a [`Config`]; the builder
/// style methods on the impl adjust individual fields afterwards.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[non_exhaustive] pub struct NodeConfig {
/// `true` after `NodeConfig::new`; `first_gateway` sets it to `false`.
pub should_connect: bool,
/// Whether this node is a gateway. `get_gateways` only demands a non-empty
/// gateway list when this is `false`.
pub is_gateway: bool,
/// Transport keypair; its public half is part of the local peer id string.
pub key_pair: TransportKeypair,
/// Listener IP, taken from `config.network_api.address` in `new`.
pub network_listener_ip: IpAddr,
/// Listener port, taken from `config.network_api.port` in `new`.
pub network_listener_port: u16,
/// Socket address derived from `config.peer_id` in `new` (logged there as the
/// node's external address); `local_peer_id_string` falls back to the
/// listener ip/port when this is `None`.
pub(crate) own_addr: Option<SocketAddr>,
/// The full configuration this node config was created from.
pub(crate) config: Arc<Config>,
/// Gateway peers loaded from `config.gateways` or added via `add_gateway`.
pub(crate) gateways: Vec<InitPeerNode>,
/// Optional ring location, from `config.location` or `with_location`.
pub(crate) location: Option<Location>,
/// `None` after `new`; set through the `max_hops_to_live` builder method.
pub(crate) max_hops_to_live: Option<usize>,
/// `None` after `new`; set through the `rnd_if_htl_above` builder method.
pub(crate) rnd_if_htl_above: Option<usize>,
/// Initialised from `config.network_api.max_connections`.
pub(crate) max_number_conn: Option<usize>,
/// Initialised from `config.network_api.min_connections`.
pub(crate) min_number_conn: Option<usize>,
/// `None` after `new`.
pub(crate) max_upstream_bandwidth: Option<Rate>,
/// `None` after `new`.
pub(crate) max_downstream_bandwidth: Option<Rate>,
/// Copied from `config.network_api.blocked_addresses`.
pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
/// Copied from `config.network_api.transient_budget`.
pub(crate) transient_budget: usize,
/// `config.network_api.transient_ttl_secs` converted to a `Duration`.
pub(crate) transient_ttl: Duration,
/// `new` stores `Some(0)` when `config.network_api.skip_load_from_network` is
/// set and `Some(3)` otherwise; falls back to the serde default when absent
/// from serialized data.
#[serde(default)]
pub(crate) relay_ready_connections: Option<usize>,
/// Never (de)serialized; `None` after `new`.
#[serde(skip)]
pub(crate) governance_config_override: Option<crate::contract::governance::GovernanceConfig>,
/// Never (de)serialized; `None` after `new`.
/// NOTE(review): the tuple looks like a (major, minor, patch) version floor — confirm.
#[serde(skip)]
pub(crate) subscribe_hint_floor_override: Option<(u8, u8, u16)>,
}
impl NodeConfig {
/// Renders the local peer id (public key plus address) as a string.
///
/// The address is `own_addr` when set, otherwise the listener ip/port pair.
pub(crate) fn local_peer_id_string(&self) -> String {
    let addr = match self.own_addr {
        Some(known) => known,
        None => std::net::SocketAddr::new(self.network_listener_ip, self.network_listener_port),
    };
    let public_key = self.key_pair.public().clone();
    PeerId::new(public_key, addr).to_string()
}
/// Builds a [`NodeConfig`] from an already loaded [`Config`].
///
/// For every configured gateway the public key file is read and decoded from
/// hex (exactly 32 bytes). A file that still contains PEM text (starts with
/// `-----BEGIN`) is re-read up to ten times, 500 ms apart; if it never changes
/// the gateway is skipped with a warning. Gateways whose key equals this
/// node's own public key are skipped as well.
///
/// # Errors
///
/// Fails when a key file cannot be opened or read, when its content is not
/// valid hex or not 32 bytes long, or when a gateway address cannot be
/// resolved by [`NodeConfig::parse_socket_addr`].
pub async fn new(config: Config) -> anyhow::Result<NodeConfig> {
    // How many times a key file is read before a still-PEM gateway is skipped.
    const KEY_READ_ATTEMPTS: i32 = 10;
    // Pause between two reads of a key file that is still PEM encoded.
    const KEY_RETRY_DELAY: Duration = Duration::from_millis(500);

    tracing::info!("Loading node configuration for mode {}", config.mode);
    let own_pub_key = config.transport_keypair().public();
    let mut gateways = Vec::with_capacity(config.gateways.len());
    for gw in &config.gateways {
        let GatewayConfig {
            address,
            public_key_path,
            location,
        } = gw;

        let mut key_bytes = None;
        for attempt in 0..KEY_READ_ATTEMPTS {
            let mut key_file = File::open(public_key_path).with_context(|| {
                format!("failed loading gateway pubkey from {public_key_path:?}")
            })?;
            let mut buf = String::new();
            key_file.read_to_string(&mut buf).with_context(|| {
                format!("failed reading gateway pubkey from {public_key_path:?}")
            })?;
            let buf = buf.trim();

            if buf.starts_with("-----BEGIN") {
                if attempt + 1 < KEY_READ_ATTEMPTS {
                    tracing::debug!(
                        public_key_path = ?public_key_path,
                        attempt = attempt + 1,
                        "Gateway public key is still RSA PEM format, waiting for X25519 conversion..."
                    );
                    tokio::time::sleep(KEY_RETRY_DELAY).await;
                    continue;
                }
                tracing::warn!(
                    public_key_path = ?public_key_path,
                    "Gateway public key still in RSA PEM format after 5s. Skipping this gateway."
                );
                break;
            }

            match hex::decode(buf) {
                Ok(bytes) if bytes.len() == 32 => {
                    key_bytes = Some(bytes);
                    break;
                }
                Ok(bytes) => {
                    anyhow::bail!(
                        "invalid gateway pubkey length {} (expected 32) from {public_key_path:?}",
                        bytes.len()
                    );
                }
                Err(e) => {
                    anyhow::bail!(
                        "failed to decode gateway pubkey hex from {public_key_path:?}: {e}"
                    );
                }
            }
        }

        // `None` means the key never left PEM format: skip this gateway.
        let Some(key_bytes) = key_bytes else {
            continue;
        };
        let mut key_arr = [0u8; 32];
        key_arr.copy_from_slice(&key_bytes);
        let transport_pub_key = TransportPublicKey::from_bytes(key_arr);
        if &transport_pub_key == own_pub_key {
            tracing::warn!(
                "Skipping gateway with same public key as self: {:?}",
                public_key_path
            );
            continue;
        }

        let address = Self::parse_socket_addr(address).await?;
        let peer_key_location = PeerKeyLocation::new(transport_pub_key, address);
        let location = location
            .map(Location::new)
            .unwrap_or_else(|| Location::from_address(&address));
        gateways.push(InitPeerNode::new(peer_key_location, location));
    }

    tracing::info!(
        "Node will be listening at {}:{} internal address",
        config.network_api.address,
        config.network_api.port
    );
    if let Some(own_addr) = &config.peer_id {
        tracing::info!("Node external address: {}", own_addr.socket_addr());
    }

    Ok(NodeConfig {
        should_connect: true,
        is_gateway: config.is_gateway,
        key_pair: config.transport_keypair().clone(),
        gateways,
        own_addr: config.peer_id.clone().map(|p| p.socket_addr()),
        network_listener_ip: config.network_api.address,
        network_listener_port: config.network_api.port,
        location: config.location.map(Location::new),
        max_hops_to_live: None,
        rnd_if_htl_above: None,
        max_number_conn: Some(config.network_api.max_connections),
        min_number_conn: Some(config.network_api.min_connections),
        max_upstream_bandwidth: None,
        max_downstream_bandwidth: None,
        blocked_addresses: config.network_api.blocked_addresses.clone(),
        transient_budget: config.network_api.transient_budget,
        transient_ttl: Duration::from_secs(config.network_api.transient_ttl_secs),
        relay_ready_connections: if config.network_api.skip_load_from_network {
            Some(0)
        } else {
            Some(3)
        },
        governance_config_override: None,
        subscribe_hint_floor_override: None,
        // Listed last on purpose: every other field only reads from `config`,
        // so the owned value can be moved into the `Arc` without a deep clone.
        config: Arc::new(config),
    })
}
/// Resolves a configured [`Address`] to a concrete [`SocketAddr`].
///
/// A literal `HostAddress` is returned as is. For the host-based variants the
/// standard-library resolver is tried first; if it yields nothing, the host
/// name (with a trailing `.` appended when missing) is looked up through
/// `hickory_resolver` and the first IP is combined with the port, defaulting
/// to `DEFAULT_GATEWAY_PORT`.
///
/// # Errors
///
/// Fails on an unparsable port in a `host:port` string, when the resolver
/// cannot be built, when the lookup fails, or when it returns no address.
pub(crate) async fn parse_socket_addr(address: &Address) -> anyhow::Result<SocketAddr> {
    // First address the standard-library resolver produces for `target`.
    fn first_std_addr(target: &str) -> Option<SocketAddr> {
        target.to_socket_addrs().ok()?.next()
    }

    let (hostname, port) = match address {
        Address::HostAddress(addr) => return Ok(*addr),
        crate::config::Address::Host { host, port } => {
            let host_with_port = format!("{host}:{port}");
            if let Some(addr) = first_std_addr(&host_with_port) {
                return Ok(addr);
            }
            (Cow::Borrowed(host.as_str()), Some(*port))
        }
        crate::config::Address::Hostname(hostname) => match hostname.rsplit_once(':') {
            None => {
                let hostname_with_port =
                    format!("{}:{}", hostname, crate::config::DEFAULT_GATEWAY_PORT);
                if let Some(addr) = first_std_addr(&hostname_with_port) {
                    return Ok(addr);
                }
                (Cow::Borrowed(hostname.as_str()), None)
            }
            Some((host, port)) => {
                // In the `else` branch `port` is still the unparsed text.
                let Ok(port) = port.parse::<u16>() else {
                    return Err(anyhow::anyhow!("Invalid port number: {port}"));
                };
                if let Some(addr) = first_std_addr(hostname) {
                    return Ok(addr);
                }
                (Cow::Borrowed(host), Some(port))
            }
        },
    };

    let resolver = hickory_resolver::TokioResolver::builder_tokio()?.build()?;
    let hostname = if hostname.ends_with('.') {
        hostname
    } else {
        Cow::Owned(format!("{hostname}."))
    };
    let ips = resolver.lookup_ip(hostname.as_ref()).await?;
    let Some(ip) = ips.iter().next() else {
        return Err(anyhow::anyhow!("Fail to resolve IP address of {hostname}"));
    };
    Ok(SocketAddr::new(
        ip,
        port.unwrap_or(crate::config::DEFAULT_GATEWAY_PORT),
    ))
}
/// Borrows the [`Config`] this node configuration was created from.
pub fn config(&self) -> &Config {
    self.config.as_ref()
}

/// Marks this node as a gateway (sets the flag to `true`).
pub fn is_gateway(&mut self) -> &mut Self {
    self.is_gateway = true;
    self
}

/// Clears `should_connect`.
pub fn first_gateway(&mut self) {
    self.should_connect = false;
}

/// Sets `should_connect` to the given value.
pub fn with_should_connect(&mut self, should_connect: bool) -> &mut Self {
    self.should_connect = should_connect;
    self
}

/// Stores `num_hops` as the maximum hops-to-live.
pub fn max_hops_to_live(&mut self, num_hops: usize) -> &mut Self {
    self.max_hops_to_live = Some(num_hops);
    self
}

/// Stores `num_hops` as the `rnd_if_htl_above` threshold.
pub fn rnd_if_htl_above(&mut self, num_hops: usize) -> &mut Self {
    self.rnd_if_htl_above = Some(num_hops);
    self
}

/// Stores `num` as the maximum number of connections.
pub fn max_number_of_connections(&mut self, num: usize) -> &mut Self {
    self.max_number_conn = Some(num);
    self
}

/// Stores `num` as the minimum number of connections.
pub fn min_number_of_connections(&mut self, num: usize) -> &mut Self {
    self.min_number_conn = Some(num);
    self
}

/// Replaces `relay_ready_connections` with `num` (which may be `None`).
pub fn relay_ready_connections(&mut self, num: Option<usize>) -> &mut Self {
    self.relay_ready_connections = num;
    self
}

/// Records `addr` as this node's own address.
pub fn with_own_addr(&mut self, addr: SocketAddr) -> &mut Self {
    self.own_addr = Some(addr);
    self
}

/// Records `loc` as this node's location.
pub fn with_location(&mut self, loc: Location) -> &mut Self {
    self.location = Some(loc);
    self
}

/// Appends `peer` to the list of gateways.
pub fn add_gateway(&mut self, peer: InitPeerNode) -> &mut Self {
    self.gateways.push(peer);
    self
}
/// Builds the [`Node`], discarding the event flush handle that
/// [`NodeConfig::build_with_flush_handle`] also returns.
pub async fn build<const CLIENTS: usize>(
    self,
    clients: [BoxedClient; CLIENTS],
) -> anyhow::Result<Node> {
    self.build_with_flush_handle(clients)
        .await
        .map(|(node, _flush_handle)| node)
}
pub async fn build_with_flush_handle<const CLIENTS: usize>(
self,
clients: [BoxedClient; CLIENTS],
) -> anyhow::Result<(Node, crate::tracing::EventFlushHandle)> {
let (event_register, flush_handle) = {
use super::tracing::{DynamicRegister, TelemetryReporter};
let event_reg = EventRegister::new(self.config.event_log());
let flush_handle = event_reg.flush_handle();
let mut registers: Vec<Box<dyn NetEventRegister>> = vec![Box::new(event_reg)];
#[cfg(feature = "trace-ot")]
{
use super::tracing::OTEventRegister;
registers.push(Box::new(OTEventRegister::new()));
}
if let Some(telemetry) =
TelemetryReporter::new(&self.config.telemetry, self.local_peer_id_string())
{
registers.push(Box::new(telemetry));
}
(DynamicRegister::new(registers), flush_handle)
};
let cfg = self.config.clone();
let drain_timeout = std::time::Duration::from_secs(cfg.shutdown_drain_secs);
let (node_inner, shutdown_tx) = NodeP2P::build::<NetworkContractHandler, CLIENTS, _>(
self,
clients,
event_register,
cfg,
)
.await?;
let shutdown_handle = ShutdownHandle {
tx: shutdown_tx,
inflight_client_ops: node_inner.op_manager.inflight_client_ops_handle(),
shutting_down: node_inner.op_manager.shutting_down_handle(),
drain_timeout,
};
Ok((
Node {
inner: node_inner,
shutdown_handle,
},
flush_handle,
))
}
pub fn get_own_addr(&self) -> Option<SocketAddr> {
self.own_addr
}
/// Returns the peer key locations of all configured gateways.
///
/// # Errors
///
/// Fails when the list is empty and this node is not itself a gateway.
fn get_gateways(&self) -> anyhow::Result<Vec<PeerKeyLocation>> {
    if self.gateways.is_empty() && !self.is_gateway {
        anyhow::bail!(
            "At least one remote gateway is required to join an existing network for non-gateway nodes."
        );
    }
    let mut gateways = Vec::with_capacity(self.gateways.len());
    for node in &self.gateways {
        gateways.push(node.peer_key_location.clone());
    }
    Ok(gateways)
}
}
/// A peer known up front (used by [`NodeConfig`] for its gateway list):
/// its key/address pair plus its location.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct InitPeerNode {
    /// Public key and address of the peer.
    peer_key_location: PeerKeyLocation,
    /// Location associated with the peer.
    location: Location,
}

impl InitPeerNode {
    /// Creates an entry from the peer's key/address pair and its location.
    pub fn new(peer_key_location: PeerKeyLocation, location: Location) -> Self {
        InitPeerNode {
            peer_key_location,
            location,
        }
    }
}
/// Reports the outcome of processing one network message.
///
/// On success only a debug line is emitted. On failure, when a transaction id
/// is known, the error is forwarded to the client as an `OperationError`
/// (unless the transaction is a sub-operation) and the transaction is marked
/// completed on `op_manager`. In debug/test builds an
/// `InvalidStateTransition` error is additionally written to stderr; that
/// write is best-effort and never panics.
///
/// `_event_listener` is currently unused.
async fn report_result(
    tx: Option<Transaction>,
    op_result: Result<(), OpError>,
    op_manager: &OpManager,
    _event_listener: &mut dyn NetEventRegister,
) {
    if let Some(tx_id) = tx {
        if matches!(tx_id.transaction_type(), TransactionType::Update) {
            tracing::debug!("report_result called for UPDATE transaction {}", tx_id);
        }
    }
    match op_result {
        Ok(()) => {
            tracing::debug!(?tx, "Network message dispatch finished");
        }
        Err(err) => {
            if let Some(tx) = tx {
                if !tx.is_sub_operation() {
                    let client_error = freenet_stdlib::client_api::ClientError::from(
                        freenet_stdlib::client_api::ErrorKind::OperationError {
                            cause: err.to_string().into(),
                        },
                    );
                    op_manager.send_client_result(tx, Err(client_error));
                }
                op_manager.completed(tx);
            }
            #[cfg(any(debug_assertions, test))]
            {
                use std::io::Write;
                #[cfg(debug_assertions)]
                let OpError::InvalidStateTransition { tx, state, trace } = err else {
                    tracing::error!("Finished transaction with error: {err}");
                    return;
                };
                #[cfg(not(debug_assertions))]
                let OpError::InvalidStateTransition { tx } = err else {
                    tracing::error!("Finished transaction with error: {err}");
                    return;
                };
                #[cfg(debug_assertions)]
                let trace = format!("{trace}");
                #[cfg(debug_assertions)]
                {
                    // Keep only the third and fourth line of the captured trace.
                    let mut tr_lines = trace.lines();
                    let trace = tr_lines
                        .nth(2)
                        .map(|second_trace| {
                            let second_trace_lines =
                                [second_trace, tr_lines.next().unwrap_or_default()];
                            second_trace_lines.join("\n")
                        })
                        .unwrap_or_default();
                    let peer = op_manager.ring.connection_manager.own_location();
                    let log = format!(
                        "Transaction ({tx} @ {peer}) error trace:\n {trace} \nstate:\n {state:?}\n"
                    );
                    // Diagnostics only: a closed or broken stderr must not
                    // panic the task that is reporting the failure.
                    if let Err(io_err) = std::io::stderr().write_all(log.as_bytes()) {
                        tracing::debug!(
                            error = %io_err,
                            "failed to write transaction error trace to stderr"
                        );
                    }
                }
                #[cfg(not(debug_assertions))]
                {
                    let peer = op_manager.ring.connection_manager.own_location();
                    let log = format!("Transaction ({tx} @ {peer}) error\n");
                    // Diagnostics only: a closed or broken stderr must not
                    // panic the task that is reporting the failure.
                    if let Err(io_err) = std::io::stderr().write_all(log.as_bytes()) {
                        tracing::debug!(
                            error = %io_err,
                            "failed to write transaction error trace to stderr"
                        );
                    }
                }
            }
            #[cfg(not(any(debug_assertions, test)))]
            {
                tracing::debug!("Finished transaction with error: {err}");
            }
        }
    }
}
/// Processes one inbound network message and then reports its outcome.
///
/// The message is handed to `handle_pure_network_message`; the result is
/// passed to `report_result` together with the message's transaction id.
pub(crate) async fn process_message_decoupled<CB>(
    msg: NetMessage,
    source_addr: Option<std::net::SocketAddr>,
    op_manager: Arc<OpManager>,
    conn_manager: CB,
    mut event_listener: Box<dyn NetEventRegister>,
    pending_op_result: Option<tokio::sync::mpsc::Sender<crate::node::WaiterReply>>,
) where
    CB: NetworkBridge + Clone + 'static,
{
    // Copy the id out before `msg` is moved into the handler.
    let tx = *msg.id();
    let dispatch = handle_pure_network_message(
        msg,
        source_addr,
        Arc::clone(&op_manager),
        conn_manager,
        &mut *event_listener,
        pending_op_result,
    );
    let op_result = dispatch.await;
    report_result(Some(tx), op_result, &op_manager, event_listener.as_mut()).await;
}
/// Unwraps the versioned [`NetMessage`] envelope and forwards the payload to
/// the handler for that protocol version.
#[allow(clippy::too_many_arguments)]
async fn handle_pure_network_message<CB>(
    msg: NetMessage,
    source_addr: Option<std::net::SocketAddr>,
    op_manager: Arc<OpManager>,
    conn_manager: CB,
    event_listener: &mut dyn NetEventRegister,
    pending_op_result: Option<tokio::sync::mpsc::Sender<crate::node::WaiterReply>>,
) -> Result<(), crate::node::OpError>
where
    CB: NetworkBridge + Clone + 'static,
{
    // `V1` is the only envelope variant, so the pattern is irrefutable.
    let NetMessage::V1(msg_v1) = msg;
    handle_pure_network_message_v1(
        msg_v1,
        source_addr,
        op_manager,
        conn_manager,
        event_listener,
        pending_op_result,
    )
    .await
}
/// Hands `reply` to the waiting driver, if there is one.
///
/// Returns `false` when no reply channel was supplied. Otherwise returns
/// `true`, even when the non-blocking send fails; a failed send is only
/// logged at debug level.
fn try_forward_driver_reply(
    pending_op_result: Option<&tokio::sync::mpsc::Sender<crate::node::WaiterReply>>,
    reply: NetMessage,
    op_label: &'static str,
) -> bool {
    match pending_op_result {
        None => false,
        Some(callback) => {
            // Read the id before `reply` is moved into the channel.
            let tx_id = *reply.id();
            let sent = callback.try_send(crate::node::WaiterReply::Reply(reply));
            if let Err(err) = sent {
                tracing::debug!(
                    %err,
                    %tx_id,
                    op = op_label,
                    "Driver reply dropped (OpCtx receiver closed or reply channel full); operation proceeds without it"
                );
            }
            true
        }
    }
}
/// Fills in the acceptor address of a connect `Response` whose address is
/// still unknown, using the address the message arrived from.
///
/// Every other message variant, and a `Response` whose acceptor address is
/// already known, is returned unchanged. When the address is unknown and no
/// `source_addr` is available, a warning is logged and the message is
/// returned as is.
fn fill_connect_response_acceptor_addr(
    op: connect::ConnectMsg,
    source_addr: Option<std::net::SocketAddr>,
) -> connect::ConnectMsg {
    #[allow(clippy::wildcard_enum_match_arm)]
    match op {
        connect::ConnectMsg::Response { id, mut payload } => {
            match (payload.acceptor.peer_addr.is_unknown(), source_addr) {
                (false, _) => {}
                (true, Some(addr)) => {
                    payload.acceptor.peer_addr = crate::ring::PeerAddr::Known(addr);
                    tracing::debug!(
                        acceptor_pub_key = %payload.acceptor.pub_key(),
                        acceptor_addr = %addr,
                        "connect bypass: filled acceptor address from source_addr"
                    );
                }
                (true, None) => {
                    tracing::warn!(
                        acceptor_pub_key = %payload.acceptor.pub_key(),
                        "connect bypass: response received without source_addr, cannot fill acceptor address"
                    );
                }
            }
            connect::ConnectMsg::Response { id, payload }
        }
        other => other,
    }
}
/// Dispatches one inbound V1 network message.
///
/// The message is first recorded with `event_listener`; the remainder is a
/// single `match` over the message variant. Reply-type variants are offered to
/// the waiting driver through `pending_op_result` (see
/// `try_forward_driver_reply`); request-type variants start the matching
/// `start_relay_*` task.
///
/// Every path through this function returns `Ok(())`: failures of the
/// `start_relay_*` helpers and of `conn_manager.send` are logged, not
/// propagated.
#[allow(clippy::too_many_arguments, clippy::needless_return)]
async fn handle_pure_network_message_v1<CB>(
msg: NetMessageV1,
source_addr: Option<std::net::SocketAddr>,
op_manager: Arc<OpManager>,
conn_manager: CB,
event_listener: &mut dyn NetEventRegister,
pending_op_result: Option<tokio::sync::mpsc::Sender<crate::node::WaiterReply>>,
) -> Result<(), crate::node::OpError>
where
CB: NetworkBridge + Clone + 'static,
{
// Record the inbound message before any dispatch decision is made.
event_listener
.register_events(NetEventLog::from_inbound_msg_v1(
&msg,
&op_manager,
source_addr,
))
.await;
let tx = Some(*msg.id());
tracing::debug!(?tx, "Processing pure network operation");
match msg {
// ---- CONNECT ----
NetMessageV1::Connect(ref op) => {
// Reply-type variants go to the waiting driver when one exists; a
// `Response` gets its acceptor address filled in from `source_addr` first.
if matches!(
op,
connect::ConnectMsg::Response { .. }
| connect::ConnectMsg::Rejected { .. }
| connect::ConnectMsg::ObservedAddress { .. }
| connect::ConnectMsg::ConnectFailed { .. }
) {
let forwarded_op = fill_connect_response_acceptor_addr(op.clone(), source_addr);
if try_forward_driver_reply(
pending_op_result.as_ref(),
NetMessage::V1(NetMessageV1::Connect(forwarded_op)),
"connect",
) {
return Ok(());
}
}
// A `Request` starts a relay driver, unless it has no source address or
// its transaction is already present in `active_relay_connect_txs`.
if let connect::ConnectMsg::Request { id, payload } = op {
if let Some(upstream_addr) = source_addr {
if !op_manager.active_relay_connect_txs.contains(id) {
if let Err(err) = connect::op_ctx_task::start_relay_connect(
op_manager.clone(),
*id,
payload.clone(),
upstream_addr,
)
.await
{
tracing::error!(
tx = %id,
%upstream_addr,
error = %err,
"CONNECT relay dispatch: start_relay_connect failed"
);
}
} else {
tracing::debug!(
tx = %id,
%upstream_addr,
"CONNECT: duplicate Request, relay driver already running"
);
}
} else {
tracing::debug!(
tx = %id,
"CONNECT: Request without source_addr ignored (no legacy joiner path)"
);
}
} else {
tracing::debug!(
tx = %op.id(),
?op,
"CONNECT: non-Request variant ignored \
(Response/Rejected/ObservedAddress/ConnectFailed already handled by bypass)"
);
}
return Ok(());
}
// ---- PUT ----
NetMessageV1::Put(ref op) => {
// Reply-type variants are forwarded to the waiting driver when one exists.
if matches!(
op,
put::PutMsg::Response { .. }
| put::PutMsg::ResponseStreaming { .. }
| put::PutMsg::Error { .. }
) && try_forward_driver_reply(
pending_op_result.as_ref(),
NetMessage::V1(NetMessageV1::Put((*op).clone())),
"put",
) {
return Ok(());
}
// Requests that target a banned contract are dropped.
#[allow(clippy::wildcard_enum_match_arm)]
let banned_key = match op {
put::PutMsg::Request { contract, .. } => Some(contract.key()),
put::PutMsg::RequestStreaming { contract_key, .. } => Some(*contract_key),
_ => None,
};
if let Some(key) = banned_key {
if op_manager.ring.contract_ban_list.is_banned(key.id()) {
tracing::debug!(
tx = %op.id(),
%key,
phase = "put_banned_drop",
"PUT dispatch: dropping request for banned contract"
);
return Ok(());
}
}
// Without a source address the node's own address is used as upstream.
let effective_upstream =
source_addr.or_else(|| op_manager.ring.connection_manager.get_own_addr());
if let Some(upstream_addr) = effective_upstream {
#[allow(clippy::wildcard_enum_match_arm)]
match op {
put::PutMsg::Request {
id,
contract,
related_contracts,
value,
htl,
skip_list,
} => {
if let Err(err) = put::op_ctx_task::start_relay_put(
op_manager.clone(),
conn_manager.clone(),
*id,
contract.clone(),
related_contracts.clone(),
value.clone(),
*htl,
skip_list.clone(),
upstream_addr,
)
.await
{
tracing::error!(
tx = %id,
contract = %contract.key(),
error = %err,
"PUT relay dispatch: start_relay_put failed"
);
}
}
put::PutMsg::RequestStreaming {
id,
stream_id,
contract_key,
total_size,
htl,
skip_list,
subscribe,
} => {
if let Err(err) = put::op_ctx_task::start_relay_put_streaming(
op_manager.clone(),
conn_manager.clone(),
*id,
*stream_id,
*contract_key,
*total_size,
*htl,
skip_list.clone(),
*subscribe,
upstream_addr,
)
.await
{
tracing::error!(
tx = %id,
contract = %contract_key,
error = %err,
"PUT relay dispatch: start_relay_put_streaming failed"
);
}
}
_ => {
tracing::debug!(
tx = %op.id(),
?op,
"PUT: non-dispatch variant ignored \
(Response/ResponseStreaming/Error already \
handled by bypass; ForwardingAck is no-op)"
);
}
}
} else {
tracing::debug!(
tx = %op.id(),
?op,
"PUT: no own_addr available — pre-handshake \
message ignored"
);
}
return Ok(());
}
// ---- GET ----
NetMessageV1::Get(ref op) => {
// Reply-type variants are forwarded to the waiting driver when one exists.
if matches!(
op,
get::GetMsg::Response { .. } | get::GetMsg::ResponseStreaming { .. }
) && try_forward_driver_reply(
pending_op_result.as_ref(),
NetMessage::V1(NetMessageV1::Get((*op).clone())),
"get",
) {
return Ok(());
}
// Requests for a banned contract are dropped.
if let get::GetMsg::Request { instance_id, .. } = op {
if op_manager.ring.contract_ban_list.is_banned(instance_id) {
tracing::debug!(
tx = %op.id(),
%instance_id,
phase = "get_banned_drop",
"GET dispatch: dropping request for banned contract"
);
return Ok(());
}
}
// Without a source address the node's own address is used as upstream.
let effective_upstream =
source_addr.or_else(|| op_manager.ring.connection_manager.get_own_addr());
if let Some(upstream_addr) = effective_upstream {
#[allow(clippy::wildcard_enum_match_arm)]
match op {
get::GetMsg::Request {
id,
instance_id,
fetch_contract,
htl,
visited,
subscribe,
} => {
if let Err(err) = get::op_ctx_task::start_relay_get(
op_manager.clone(),
conn_manager.clone(),
*id,
*instance_id,
*htl,
upstream_addr,
visited.clone(),
*fetch_contract,
*subscribe,
)
.await
{
tracing::error!(
tx = %id,
%instance_id,
error = %err,
"GET relay dispatch: start_relay_get failed"
);
}
}
_ => {
tracing::debug!(
tx = %op.id(),
?op,
"GET: non-dispatch variant ignored \
(Response/ResponseStreaming already handled \
by bypass; ForwardingAck is no-op; \
ResponseStreamingAck handled by stream layer)"
);
}
}
} else {
tracing::debug!(
tx = %op.id(),
?op,
"GET: no own_addr available — pre-handshake \
message ignored"
);
}
return Ok(());
}
// ---- UPDATE ----
// Unlike PUT/GET/SUBSCRIBE there is no fallback to the node's own address:
// an UPDATE without `source_addr` is only logged.
NetMessageV1::Update(ref op) => {
if let Some(sender_addr) = source_addr {
let key = match op {
update::UpdateMsg::RequestUpdate { key, .. }
| update::UpdateMsg::BroadcastTo { key, .. }
| update::UpdateMsg::RequestUpdateStreaming { key, .. }
| update::UpdateMsg::BroadcastToStreaming { key, .. } => *key,
};
// Gate 1: banned contracts are dropped.
if op_manager.ring.contract_ban_list.is_banned(key.id()) {
tracing::debug!(
tx = %op.id(),
%key,
%sender_addr,
phase = "update_dispatch_banned_drop",
"UPDATE dispatch: dropping request for banned contract"
);
return Ok(());
}
// Gate 2: rate limit keyed on (sender address, contract id); the call both
// checks and records this message.
let rate_decision = op_manager
.ring
.update_rate_limiter
.check_and_record(sender_addr, *key.id());
if !rate_decision.is_allowed() {
tracing::debug!(
tx = %op.id(),
%key,
%sender_addr,
?rate_decision,
phase = "update_dispatch_rate_limited",
"UPDATE dispatch: rejected by per-(sender, contract) rate limit"
);
return Ok(());
}
// Each variant starts its own relay task; failures are logged only.
#[allow(clippy::wildcard_enum_match_arm)]
match op {
update::UpdateMsg::RequestUpdate {
id,
key,
related_contracts,
value,
} => {
if let Err(err) = update::op_ctx_task::start_relay_request_update(
op_manager.clone(),
*id,
*key,
related_contracts.clone(),
value.clone(),
sender_addr,
)
.await
{
tracing::error!(
tx = %id,
%key,
error = %err,
"UPDATE relay dispatch: start_relay_request_update failed"
);
}
return Ok(());
}
update::UpdateMsg::BroadcastTo {
id,
key,
payload,
sender_summary_bytes,
} => {
if let Err(err) = update::op_ctx_task::start_relay_broadcast_to(
op_manager.clone(),
*id,
*key,
payload.clone(),
sender_summary_bytes.clone(),
sender_addr,
)
.await
{
tracing::error!(
tx = %id,
%key,
error = %err,
"UPDATE relay dispatch: start_relay_broadcast_to failed"
);
}
return Ok(());
}
update::UpdateMsg::RequestUpdateStreaming {
id,
key,
stream_id,
total_size,
} => {
if let Err(err) = update::op_ctx_task::start_relay_request_update_streaming(
op_manager.clone(),
*id,
*key,
*stream_id,
*total_size,
sender_addr,
)
.await
{
tracing::error!(
tx = %id,
%key,
error = %err,
"UPDATE relay dispatch: start_relay_request_update_streaming failed"
);
}
return Ok(());
}
update::UpdateMsg::BroadcastToStreaming {
id,
key,
stream_id,
total_size,
} => {
if let Err(err) = update::op_ctx_task::start_relay_broadcast_to_streaming(
op_manager.clone(),
*id,
*key,
*stream_id,
*total_size,
sender_addr,
)
.await
{
tracing::error!(
tx = %id,
%key,
error = %err,
"UPDATE relay dispatch: start_relay_broadcast_to_streaming failed"
);
}
return Ok(());
}
}
} else {
tracing::debug!(
tx = %op.id(),
?op,
"UPDATE: internal-source variant ignored"
);
}
return Ok(());
}
// ---- SUBSCRIBE ----
NetMessageV1::Subscribe(ref op) => {
// A `Response` is forwarded to the waiting driver when one exists.
if matches!(op, subscribe::SubscribeMsg::Response { .. })
&& try_forward_driver_reply(
pending_op_result.as_ref(),
NetMessage::V1(NetMessageV1::Subscribe((*op).clone())),
"subscribe",
)
{
return Ok(());
}
// Without a source address the node's own address is used as upstream.
let effective_upstream =
source_addr.or_else(|| op_manager.ring.connection_manager.get_own_addr());
if let Some(upstream_addr) = effective_upstream {
#[allow(clippy::wildcard_enum_match_arm)]
match op {
subscribe::SubscribeMsg::Request {
id,
instance_id,
htl,
visited,
is_renewal,
} => {
// Requests for a banned contract are dropped.
if op_manager.ring.contract_ban_list.is_banned(instance_id) {
tracing::debug!(
tx = %id,
%instance_id,
%upstream_addr,
phase = "subscribe_dispatch_banned_drop",
"SUBSCRIBE dispatch: dropping request for banned contract"
);
return Ok(());
}
if let Err(err) = subscribe::op_ctx_task::start_relay_subscribe(
op_manager.clone(),
*id,
*instance_id,
*htl,
visited.clone(),
*is_renewal,
upstream_addr,
)
.await
{
tracing::error!(
tx = %id,
%instance_id,
error = %err,
"SUBSCRIBE relay dispatch: start_relay_subscribe failed"
);
}
}
// Note: the original `source_addr` (possibly `None`) is passed here, not
// the own-address fallback computed above.
subscribe::SubscribeMsg::Unsubscribe { id, instance_id } => {
subscribe::handle_unsubscribe_inbound(
&op_manager,
*id,
*instance_id,
source_addr,
)
.await;
}
_ => {
tracing::debug!(
tx = %op.id(),
?op,
"SUBSCRIBE: non-dispatch variant ignored \
(Response already handled by bypass; \
ForwardingAck is no-op)"
);
}
}
} else {
tracing::debug!(
tx = %op.id(),
?op,
"SUBSCRIBE: no own_addr available — pre-handshake \
message ignored"
);
}
return Ok(());
}
// ---- NEIGHBOR HOSTING ----
NetMessageV1::NeighborHosting { ref message } => {
// A source address is mandatory for this message type.
let Some(source) = source_addr else {
tracing::warn!(
"Received NeighborHosting message without source address (pure network)"
);
return Ok(());
};
tracing::debug!(
from = %source,
"Processing NeighborHosting message (pure network)"
);
// Map the source address to the peer's public key; unknown peers are skipped.
let source_pub_key = op_manager
.ring
.connection_manager
.get_peer_by_addr(source)
.map(|pkl| pkl.pub_key().clone());
let Some(source_pub_key) = source_pub_key else {
tracing::debug!(
%source,
"NeighborHosting: could not resolve source addr to pub_key, skipping"
);
return Ok(());
};
let result = op_manager
.neighbor_hosting
.handle_message(&source_pub_key, message.clone());
// Send back the response the handler produced, if any.
if let Some(response) = result.response {
let response_msg =
NetMessage::V1(NetMessageV1::NeighborHosting { message: response });
if let Err(err) = conn_manager.send(source, response_msg).await {
tracing::error!(%err, %source, "Failed to send NeighborHosting response");
}
}
// For every overlapping contract, possibly emit a state sync towards the sender.
for instance_id in result.overlapping_contracts {
if op_manager.ring.contract_ban_list.is_banned(&instance_id) {
tracing::debug!(
%instance_id,
peer = %source_pub_key,
phase = "neighbor_hosting_banned_skip",
"skipping proximity sync for banned contract"
);
continue;
}
// Probe key built from the instance id and an all-zero code hash, used for
// the checks below before the contract state is fetched.
// NOTE(review): presumably these lookups only depend on the instance id — confirm.
let probe_key = freenet_stdlib::prelude::ContractKey::from_id_and_code(
instance_id,
freenet_stdlib::prelude::CodeHash::new([0u8; 32]),
);
if !op_manager.ring.is_receiving_updates(&probe_key)
&& !op_manager.ring.has_downstream_subscribers(&probe_key)
&& !op_manager.pending_broadcasts.contains(&instance_id)
{
continue;
}
if let Some((key, state)) =
get_contract_state_by_id(&op_manager, &instance_id).await
{
op_manager.flush_pending_broadcast_on_interest(&key).await;
// Re-check with the real key after the flush.
if !op_manager.ring.is_receiving_updates(&key)
&& !op_manager.ring.has_downstream_subscribers(&key)
{
continue;
}
tracing::debug!(
contract = %key,
peer = %source_pub_key,
"Proximity cache overlap — syncing state to neighbor"
);
if let Err(e) = op_manager.try_notify_node_event(NodeEvent::SyncStateToPeer {
key,
new_state: state,
target: source,
}) {
tracing::debug!(
contract = %instance_id,
error = %e,
"Failed to emit SyncStateToPeer for proximity sync (best-effort)"
);
}
}
}
return Ok(());
}
// ---- INTEREST SYNC ----
NetMessageV1::InterestSync { ref message } => {
let Some(source) = source_addr else {
tracing::warn!("Received InterestSync message without source address");
return Ok(());
};
tracing::debug!(
from = %source,
"Processing InterestSync message"
);
// Any response produced by the handler is sent back to the sender.
if let Some(response) =
handle_interest_sync_message(&op_manager, source, message.clone()).await
{
let response_msg = NetMessage::V1(NetMessageV1::InterestSync { message: response });
if let Err(err) = conn_manager.send(source, response_msg).await {
tracing::error!(%err, %source, "Failed to send InterestSync response");
}
}
return Ok(());
}
// ---- READY STATE ----
// Marks the sending peer ready or not ready in the connection manager.
NetMessageV1::ReadyState { ready } => {
let Some(source) = source_addr else {
tracing::warn!("Received ReadyState message without source address");
return Ok(());
};
if ready {
op_manager.ring.connection_manager.mark_peer_ready(source);
} else {
op_manager
.ring
.connection_manager
.mark_peer_not_ready(source);
}
tracing::debug!(
from = %source,
ready,
"Processed ReadyState from peer"
);
return Ok(());
}
// ---- SUBSCRIBE HINT ----
NetMessageV1::SubscribeHint(hint) => {
// Version floor: the override from the connection manager when present,
// otherwise `SUBSCRIBE_HINT_MIN_VERSION`.
let floor = op_manager
.ring
.connection_manager
.subscribe_hint_floor_override()
.unwrap_or(crate::node::network_bridge::p2p_protoc::SUBSCRIBE_HINT_MIN_VERSION);
let own_version = crate::node::network_bridge::p2p_protoc::own_crate_version();
// Ignore the hint when this node's own version is below the floor.
if !crate::node::network_bridge::p2p_protoc::version_supports_subscribe_hint(
Some(own_version),
floor,
) {
tracing::debug!(
key = %hint.key,
?own_version,
?floor,
?source_addr,
"Ignoring inbound SubscribeHint: own version is below the \
SubscribeHint re-enable floor (pre-floor peer, wire-compat)"
);
return Ok(());
}
// Ignore hints for contracts this node already hosts.
if op_manager.ring.is_hosting_contract(&hint.key) {
tracing::debug!(
key = %hint.key,
?source_addr,
"Received SubscribeHint for an already-hosted contract — ignoring"
);
return Ok(());
}
// Ignore hints whose holder address differs from the address the message
// came from.
if hint.holder.socket_addr() != source_addr {
tracing::debug!(
key = %hint.key,
holder = ?hint.holder.socket_addr(),
?source_addr,
"Received SubscribeHint whose holder is not the sender — ignoring"
);
return Ok(());
}
tracing::debug!(
key = %hint.key,
holder = %hint.holder,
?source_addr,
"Received SubscribeHint — starting directed subscribe to holder"
);
subscribe::start_directed_subscribe(op_manager.clone(), hint.key, hint.holder);
return Ok(());
}
// ---- ABORTED ----
// Only logged; no state is touched here.
NetMessageV1::Aborted(tx) => {
tracing::debug!(
%tx,
tx_type = ?tx.transaction_type(),
"Received Aborted message — driver owns cancellation, ignoring"
);
Ok(())
}
}
}
const MAX_STALE_SYNCS_PER_SUMMARIES: usize = 32;
fn stale_sync_emit_budget(stale_contracts_len: usize) -> usize {
stale_contracts_len.min(MAX_STALE_SYNCS_PER_SUMMARIES)
}
#[cfg(test)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StaleSyncDisposition {
Emit,
Banned,
NoState,
}
/// Test-only model of the stale-sync emission loop: counts the `Emit`
/// dispositions in order, stopping once the budget derived from the total
/// number of candidates has been reached. Non-`Emit` entries never consume
/// budget.
#[cfg(test)]
fn count_stale_syncs_emitted(dispositions: &[StaleSyncDisposition]) -> usize {
    let cap = stale_sync_emit_budget(dispositions.len());
    dispositions
        .iter()
        .filter(|disposition| **disposition == StaleSyncDisposition::Emit)
        .take(cap)
        .count()
}
/// Test-only helper: lists, in visiting order, the candidate indices that fall
/// inside the emit budget when a list of `total` candidates is walked starting
/// at `start` and wrapping around. Returns an empty vector when `total` is 0.
#[cfg(test)]
fn emitted_indices_for_rotation(total: usize, start: usize) -> Vec<usize> {
    let budget = stale_sync_emit_budget(total);
    let mut indices = Vec::with_capacity(budget);
    for offset in 0..budget {
        // `budget` is 0 whenever `total` is 0, so the modulo never divides by zero.
        indices.push((start + offset) % total);
    }
    indices
}
/// Handles one inbound interest-sync message from the peer at `source` and
/// returns the reply to send back to that peer, if any.
///
/// Per variant:
/// - `Interests`: the hash list replaces what we had recorded for the peer —
///   recorded interests whose hash is absent are removed, matching ones are
///   refreshed or registered, and the reply is a `Summaries` message for the
///   matching contracts (or `None` when nothing matches).
/// - `Summaries`: stores the peer's summaries and, for contracts whose local
///   summary differs, emits at most [`MAX_STALE_SYNCS_PER_SUMMARIES`]
///   `SyncStateToPeer` node events. Never replies.
/// - `ChangeInterests`: applies the removals, registers the additions we also
///   have local interest in, and replies with `Summaries` for those.
/// - `ResyncRequest`: clears the peer's cached summary and replies with a
///   `ResyncResponse` carrying full state and summary, or `None` when either
///   cannot be obtained.
/// - `ResyncResponse`: applies the received state through the contract
///   handler, then records the peer's summary. Never replies.
///
/// Peer-scoped bookkeeping is skipped whenever `source` cannot be mapped to a
/// known peer key.
async fn handle_interest_sync_message(
    op_manager: &Arc<OpManager>,
    source: std::net::SocketAddr,
    message: crate::message::InterestMessage,
) -> Option<crate::message::InterestMessage> {
    use crate::message::{InterestMessage, NodeEvent, SummaryEntry};
    use crate::ring::interest::contract_hash;
    match message {
        InterestMessage::Interests { hashes } => {
            tracing::debug!(
                from = %source,
                hash_count = hashes.len(),
                "Received Interests message"
            );
            let peer_key = get_peer_key_from_addr(op_manager, source);
            // Full-replace semantics: any interest recorded for this peer whose
            // hash is missing from the incoming list is dropped.
            if let Some(ref pk) = peer_key {
                let incoming_hashes: std::collections::HashSet<u32> =
                    hashes.iter().copied().collect();
                let current_contracts = op_manager.interest_manager.get_contracts_for_peer(pk);
                let mut removed = 0usize;
                for contract in current_contracts.iter() {
                    let h = contract_hash(contract);
                    if !incoming_hashes.contains(&h) {
                        op_manager
                            .interest_manager
                            .remove_peer_interest(contract, pk);
                        removed += 1;
                    }
                }
                if removed > 0 {
                    tracing::debug!(
                        from = %source,
                        removed,
                        "Full-replace: removed stale interest entries"
                    );
                }
            }
            let matching = op_manager.interest_manager.get_matching_contracts(&hashes);
            let mut entries = Vec::with_capacity(matching.len());
            for contract in matching {
                let hash = contract_hash(&contract);
                let summary = summary_if_hosted_or_in_use(op_manager, &contract).await;
                entries.push(SummaryEntry::from_summary(hash, summary.as_ref()));
                if let Some(ref pk) = peer_key {
                    if op_manager
                        .interest_manager
                        .get_peer_interest(&contract, pk)
                        .is_some()
                    {
                        op_manager
                            .interest_manager
                            .refresh_peer_interest(&contract, pk);
                    } else {
                        let is_new = op_manager.interest_manager.register_peer_interest(
                            &contract,
                            pk.clone(),
                            None,
                            false,
                        );
                        // A newly interested peer may have broadcasts waiting for it.
                        if is_new {
                            op_manager
                                .flush_pending_broadcast_on_interest(&contract)
                                .await;
                        }
                    }
                }
            }
            if entries.is_empty() {
                None
            } else {
                Some(InterestMessage::Summaries { entries })
            }
        }
        InterestMessage::Summaries { entries } => {
            tracing::debug!(
                from = %source,
                entry_count = entries.len(),
                "Received Summaries message"
            );
            let peer_key = get_peer_key_from_addr(op_manager, source);
            let mut stale_contracts = Vec::new();
            let emit_confirmed = crate::config::SimulationIdleTimeout::is_enabled();
            let mut confirmed_states: Vec<(freenet_stdlib::prelude::ContractKey, String)> =
                Vec::new();
            if let Some(pk) = peer_key {
                for entry in entries {
                    for contract in op_manager.interest_manager.lookup_by_hash(entry.hash) {
                        if !op_manager.interest_manager.has_local_interest(&contract) {
                            continue;
                        }
                        let their_summary = entry.to_summary();
                        let our_summary = summary_if_hosted_or_in_use(op_manager, &contract).await;
                        if emit_confirmed {
                            if let Some(ref summary) = our_summary {
                                confirmed_states.push((contract, hex::encode(summary.as_ref())));
                            }
                        }
                        // Stale only when both sides have a summary and the bytes differ.
                        let is_stale = our_summary
                            .as_ref()
                            .zip(their_summary.as_ref())
                            .is_some_and(|(ours, theirs)| ours.as_ref() != theirs.as_ref());
                        op_manager.interest_manager.update_peer_summary(
                            &contract,
                            &pk,
                            their_summary,
                        );
                        if is_stale && !stale_contracts.contains(&contract) {
                            stale_contracts.push(contract);
                        }
                    }
                }
            }
            let total_stale = stale_contracts.len();
            let emit_budget = stale_sync_emit_budget(total_stale);
            // When the cap will bite, start from a random offset so the same
            // leading contracts are not the only ones served on every message.
            if total_stale > emit_budget {
                let start = crate::config::GlobalRng::random_range(0..total_stale);
                stale_contracts.rotate_left(start);
            }
            let mut emitted = 0usize;
            for contract in stale_contracts {
                if emitted >= emit_budget {
                    tracing::warn!(
                        stale_peer = %source,
                        total_stale,
                        emitted,
                        cap = MAX_STALE_SYNCS_PER_SUMMARIES,
                        "Stale-contract sync cap hit for Summaries message; \
                         deferring the remainder to a later interest-sync cycle"
                    );
                    break;
                }
                if op_manager.ring.contract_ban_list.is_banned(contract.id()) {
                    tracing::debug!(
                        %contract,
                        stale_peer = %source,
                        phase = "interest_sync_banned_skip",
                        "skipping summary-mismatch sync for banned contract"
                    );
                    continue;
                }
                let Some(state) = get_contract_state(op_manager, &contract).await else {
                    tracing::trace!(
                        contract = %contract,
                        "Skipping stale-peer sync — no local state available"
                    );
                    continue;
                };
                // Only candidates that actually reach the emit step consume budget.
                emitted += 1;
                tracing::debug!(
                    contract = %contract,
                    stale_peer = %source,
                    "Summary mismatch in interest sync — syncing state to stale peer"
                );
                if let Err(e) = op_manager.try_notify_node_event(NodeEvent::SyncStateToPeer {
                    key: contract,
                    new_state: state,
                    target: source,
                }) {
                    tracing::debug!(
                        contract = %contract,
                        error = %e,
                        "Failed to emit SyncStateToPeer for stale peer correction (best-effort)"
                    );
                }
            }
            for (key, state_hash) in confirmed_states {
                if let Some(event) =
                    crate::tracing::NetEventLog::state_confirmed(&op_manager.ring, key, state_hash)
                {
                    op_manager
                        .ring
                        .register_events(either::Either::Left(event))
                        .await;
                }
            }
            None
        }
        InterestMessage::ChangeInterests { added, removed } => {
            tracing::debug!(
                from = %source,
                added_count = added.len(),
                removed_count = removed.len(),
                "Received ChangeInterests message"
            );
            let peer_key = get_peer_key_from_addr(op_manager, source);
            if let Some(ref pk) = peer_key {
                for hash in removed {
                    for contract in op_manager.interest_manager.lookup_by_hash(hash) {
                        op_manager
                            .interest_manager
                            .remove_peer_interest(&contract, pk);
                    }
                }
            }
            let mut entries = Vec::new();
            if let Some(ref pk) = peer_key {
                for hash in added {
                    for contract in op_manager.interest_manager.lookup_by_hash(hash) {
                        if !op_manager.interest_manager.has_local_interest(&contract) {
                            continue;
                        }
                        let is_new = op_manager.interest_manager.register_peer_interest(
                            &contract,
                            pk.clone(),
                            None,
                            false,
                        );
                        if is_new {
                            op_manager
                                .flush_pending_broadcast_on_interest(&contract)
                                .await;
                        }
                        let summary = summary_if_hosted_or_in_use(op_manager, &contract).await;
                        entries.push(SummaryEntry::from_summary(hash, summary.as_ref()));
                    }
                }
            }
            if entries.is_empty() {
                None
            } else {
                Some(InterestMessage::Summaries { entries })
            }
        }
        InterestMessage::ResyncRequest { key } => {
            tracing::info!(
                from = %source,
                contract = %key,
                event = "resync_request_received",
                "Received ResyncRequest - peer needs full state"
            );
            op_manager.interest_manager.record_resync_request_received();
            crate::config::GlobalTestMetrics::record_resync_request();
            let peer_key = get_peer_key_from_addr(op_manager, source);
            // The peer asked for full state, so whatever summary we had cached
            // for it is cleared.
            if let Some(ref pk) = peer_key {
                op_manager
                    .interest_manager
                    .update_peer_summary(&key, pk, None);
            }
            let from_peer = op_manager.ring.connection_manager.get_peer_by_addr(source);
            if let Some(ref from_pkl) = from_peer {
                if let Some(event) = crate::tracing::NetEventLog::resync_request_received(
                    &op_manager.ring,
                    key,
                    from_pkl.clone(),
                ) {
                    op_manager
                        .ring
                        .register_events(either::Either::Left(event))
                        .await;
                }
            } else {
                tracing::debug!(
                    contract = %key,
                    source = %source,
                    "ResyncRequest telemetry skipped: peer lookup failed"
                );
            }
            let state = get_contract_state(op_manager, &key).await;
            let Some(state) = state else {
                tracing::warn!(
                    contract = %key,
                    "ResyncRequest for contract we don't have state for"
                );
                return None;
            };
            let summary = get_contract_summary(op_manager, &key).await;
            let Some(summary) = summary else {
                tracing::warn!(
                    contract = %key,
                    "ResyncRequest for contract we can't compute summary for"
                );
                return None;
            };
            tracing::info!(
                to = %source,
                contract = %key,
                state_size = state.as_ref().len(),
                summary_size = summary.as_ref().len(),
                event = "resync_response_sent",
                "Sending ResyncResponse with full state"
            );
            if let Some(ref to_pkl) = from_peer {
                if let Some(event) = crate::tracing::NetEventLog::resync_response_sent(
                    &op_manager.ring,
                    key,
                    to_pkl.clone(),
                    state.as_ref().len(),
                ) {
                    op_manager
                        .ring
                        .register_events(either::Either::Left(event))
                        .await;
                }
            }
            Some(InterestMessage::ResyncResponse {
                key,
                state_bytes: state.as_ref().to_vec(),
                summary_bytes: summary.as_ref().to_vec(),
            })
        }
        InterestMessage::ResyncResponse {
            key,
            state_bytes,
            summary_bytes,
        } => {
            tracing::info!(
                from = %source,
                contract = %key,
                state_size = state_bytes.len(),
                event = "resync_response_received",
                "Received ResyncResponse with full state"
            );
            // `state_bytes` is not needed after this point, so it is moved
            // rather than cloned (the payload is the full contract state).
            let state = freenet_stdlib::prelude::State::from(state_bytes);
            let update_data = freenet_stdlib::prelude::UpdateData::State(state);
            use crate::contract::ContractHandlerEvent;
            match op_manager
                .notify_contract_handler(ContractHandlerEvent::UpdateQuery {
                    key,
                    data: update_data,
                    related_contracts: Default::default(),
                })
                .await
            {
                Ok(ContractHandlerEvent::UpdateResponse {
                    new_value: Ok(_), ..
                }) => {
                    tracing::info!(
                        from = %source,
                        contract = %key,
                        event = "resync_applied",
                        changed = true,
                        "ResyncResponse state applied successfully"
                    );
                }
                Ok(ContractHandlerEvent::UpdateNoChange { .. }) => {
                    tracing::info!(
                        from = %source,
                        contract = %key,
                        event = "resync_applied",
                        changed = false,
                        "ResyncResponse state unchanged (already had this state)"
                    );
                }
                Ok(other) => {
                    tracing::debug!(
                        from = %source,
                        contract = %key,
                        event = "resync_failed",
                        response = %other,
                        "Unexpected response to resync update"
                    );
                }
                Err(e) => {
                    tracing::error!(
                        from = %source,
                        contract = %key,
                        event = "resync_failed",
                        error = %e,
                        "Failed to apply resync state"
                    );
                }
            }
            // The peer's summary is recorded regardless of the update outcome.
            let peer_key = get_peer_key_from_addr(op_manager, source);
            if let Some(pk) = peer_key {
                let summary = freenet_stdlib::prelude::StateSummary::from(summary_bytes);
                op_manager
                    .interest_manager
                    .update_peer_summary(&key, &pk, Some(summary));
            }
            None
        }
    }
}
/// Fetches the locally stored state for `key`, looked up by the key's
/// instance id. Returns `None` when the lookup yields no state.
async fn get_contract_state(
    op_manager: &Arc<OpManager>,
    key: &freenet_stdlib::prelude::ContractKey,
) -> Option<freenet_stdlib::prelude::WrappedState> {
    let (_resolved_key, state) = get_contract_state_by_id(op_manager, key.id()).await?;
    Some(state)
}
/// Asks the contract handler for the state stored under `instance_id`
/// (without requesting the contract code).
///
/// Returns the resolved contract key together with the state, or `None` when
/// the handler reports an error (logged at warn), returns no key or no state,
/// or answers with anything other than a `GetResponse`.
async fn get_contract_state_by_id(
    op_manager: &Arc<OpManager>,
    instance_id: &freenet_stdlib::prelude::ContractInstanceId,
) -> Option<(
    freenet_stdlib::prelude::ContractKey,
    freenet_stdlib::prelude::WrappedState,
)> {
    use crate::contract::ContractHandlerEvent;
    let query = ContractHandlerEvent::GetQuery {
        instance_id: *instance_id,
        return_contract_code: false,
    };
    let reply = op_manager.notify_contract_handler(query).await;
    match reply {
        Ok(ContractHandlerEvent::GetResponse {
            key: Some(key),
            response: Ok(store_response),
        }) => {
            let state = store_response.state?;
            Some((key, state))
        }
        Ok(ContractHandlerEvent::GetResponse {
            response: Err(e), ..
        }) => {
            tracing::warn!(
                contract = %instance_id,
                error = %e,
                "Failed to get contract state by instance id"
            );
            None
        }
        _ => None,
    }
}
/// Asks the contract handler for the state summary of `key`.
///
/// Returns `None` when the handler reports a summary error (logged at debug)
/// or answers with anything other than a summary response.
async fn get_contract_summary(
    op_manager: &Arc<OpManager>,
    key: &freenet_stdlib::prelude::ContractKey,
) -> Option<freenet_stdlib::prelude::StateSummary<'static>> {
    use crate::contract::ContractHandlerEvent;
    let reply = op_manager
        .notify_contract_handler(ContractHandlerEvent::GetSummaryQuery { key: *key })
        .await;
    let Ok(ContractHandlerEvent::GetSummaryResponse { summary, .. }) = reply else {
        return None;
    };
    match summary {
        Ok(summary) => Some(summary),
        Err(e) => {
            tracing::debug!(
                contract = %key,
                error = %e,
                "Failed to get contract summary"
            );
            None
        }
    }
}
/// Computes the summary for `key` only when the ring reports the contract as
/// hosted or as in use; otherwise returns `None` without querying the
/// contract handler.
async fn summary_if_hosted_or_in_use(
    op_manager: &Arc<OpManager>,
    key: &freenet_stdlib::prelude::ContractKey,
) -> Option<freenet_stdlib::prelude::StateSummary<'static>> {
    let ring = &op_manager.ring;
    let relevant = ring.is_hosting_contract(key) || ring.contract_in_use(key);
    if !relevant {
        return None;
    }
    get_contract_summary(op_manager, key).await
}
/// Maps a socket address to the interest-tracking key of the peer the
/// connection manager knows at that address, or `None` when no such peer is
/// known.
fn get_peer_key_from_addr(
    op_manager: &Arc<OpManager>,
    addr: std::net::SocketAddr,
) -> Option<crate::ring::interest::PeerKey> {
    let peer = op_manager
        .ring
        .connection_manager
        .get_peer_by_addr(addr)?;
    Some(crate::ring::interest::PeerKey::from(peer.pub_key.clone()))
}
/// Starts a client subscribe for `instance_id` under a freshly generated
/// transaction id; see [`subscribe_with_id`] for the details.
// NOTE(review): the `dead_code` allow suggests there is no in-crate caller — confirm.
#[allow(dead_code)]
pub async fn subscribe(
    op_manager: Arc<OpManager>,
    instance_id: ContractInstanceId,
    client_id: Option<ClientId>,
) -> Result<Transaction, OpError> {
    let no_preassigned_transaction: Option<Transaction> = None;
    subscribe_with_id(op_manager, instance_id, client_id, no_preassigned_transaction).await
}
pub async fn subscribe_with_id(
op_manager: Arc<OpManager>,
instance_id: ContractInstanceId,
client_id: Option<ClientId>,
transaction_id: Option<Transaction>,
) -> Result<Transaction, OpError> {
let client_tx = match transaction_id {
Some(id) => id,
None => Transaction::new::<subscribe::SubscribeMsg>(),
};
if let Some(client_id) = client_id {
use crate::client_events::RequestId;
let request_id = RequestId::new();
if let Err(e) = op_manager
.ch_outbound
.waiting_for_subscription_result(client_tx, instance_id, client_id, request_id)
.await
{
tracing::warn!(tx = %client_tx, error = %e, "failed to register subscription result waiter");
}
}
subscribe::start_client_subscribe(op_manager, instance_id, client_tx).await
}
/// Public name for a peer identity; an alias of
/// [`crate::ring::KnownPeerKeyLocation`].
pub type PeerId = crate::ring::KnownPeerKeyLocation;
/// Runs a local-only node: serves the client API on `socket` and answers
/// contract and delegate requests through `executor` until a receive or send
/// on either client channel fails.
///
/// # Errors
/// Fails immediately when `socket.address` is neither loopback nor a private
/// network address, when the client API cannot be started, or when receiving
/// from / replying to a client channel returns an error.
pub async fn run_local_node(
    mut executor: Executor,
    socket: WebsocketApiConfig,
) -> anyhow::Result<()> {
    if !crate::server::is_private_ip(&socket.address) {
        anyhow::bail!(
            "invalid ip: {}, only loopback and private network addresses are allowed",
            socket.address
        )
    }
    crate::node::network_status::init(
        socket.port,
        std::collections::HashSet::new(),
        crate::config::PCK_VERSION.to_string(),
    );
    let (mut gw, mut ws_proxy) = crate::server::serve_client_api_in(socket).await?;

    /// Which client channel a request arrived on; the reply goes back the same way.
    enum Receiver {
        Ws,
        Gw,
    }

    loop {
        let (receiver, req) = crate::deterministic_select! {
            req = ws_proxy.recv() => {
                (Receiver::Ws, req?)
            },
            req = gw.recv() => {
                (Receiver::Gw, req?)
            },
        };
        let OpenRequest {
            client_id: id,
            request,
            notification_channel,
            token,
            origin_contract,
            user_context,
            ..
        } = req;
        tracing::debug!(client_id = %id, ?token, "Received OpenRequest -> {request}");
        let res = match *request {
            ClientRequest::ContractOp(op) => {
                executor
                    .contract_requests(op, id, notification_channel)
                    .await
            }
            ClientRequest::DelegateOp(op) => {
                let op_name = match &op {
                    DelegateRequest::RegisterDelegate { .. } => "RegisterDelegate",
                    DelegateRequest::ApplicationMessages { .. } => "ApplicationMessages",
                    DelegateRequest::UnregisterDelegate(_) => "UnregisterDelegate",
                    _ => "Unknown",
                };
                tracing::debug!(
                    op_name = ?op_name,
                    ?origin_contract,
                    "Handling ClientRequest::DelegateOp"
                );
                executor.delegate_request(op, origin_contract.as_ref(), None, user_context.as_ref())
            }
            ClientRequest::Disconnect { cause } => {
                if let Some(cause) = cause {
                    tracing::info!("disconnecting cause: {cause}");
                }
                // A disconnect produces no reply.
                continue;
            }
            // Authenticate, NodeQueries, Close and any other request kind are
            // not served by a local node.
            _ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
        };
        // Build the reply once, then route it back on the originating channel.
        let reply = match res {
            Ok(response) => Ok(response),
            Err(err) if err.is_request() => {
                Err(ErrorKind::RequestError(err.unwrap_request()).into())
            }
            Err(err) => {
                tracing::error!("{err}");
                Err(ErrorKind::Unhandled {
                    cause: format!("{err}").into(),
                }
                .into())
            }
        };
        match receiver {
            Receiver::Ws => ws_proxy.send(id, reply).await?,
            Receiver::Gw => gw.send(id, reply).await?,
        }
    }
}
/// Runs a network node to completion.
///
/// Before starting, an initial ring location is applied when one is
/// available: the configured location if set, otherwise — for gateways only —
/// a location derived from the node's own socket address. The result of
/// `node.run()` is logged and propagated.
pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> {
    tracing::info!("Starting node");
    let is_gateway = node.inner.is_gateway;
    let location = match node.inner.location {
        Some(configured) => Some(configured),
        None if is_gateway => node
            .inner
            .peer_id
            .as_ref()
            .map(|id| Location::from_address(&id.socket_addr())),
        None => None,
    };
    if let Some(location) = location {
        tracing::info!("Setting initial location: {location}");
        node.update_location(location);
    }
    match node.run().await {
        Err(e) => {
            tracing::error!("{e}");
            Err(e)
        }
        Ok(_) => {
            if is_gateway {
                tracing::info!("Gateway finished");
            } else {
                tracing::info!("Node finished");
            }
            Ok(())
        }
    }
}
#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, Ipv6Addr};
use super::*;
use rstest::rstest;
/// Pins a log site by scanning this crate's `src/node.rs` text.
///
/// Locates the first occurrence of `needle` (the log message), walks back to
/// the closest preceding tracing macro, and asserts that:
/// - the macro starts its line (only whitespace before it),
/// - its level name equals `expected_macro`,
/// - the text between the macro and the message contains every entry of
///   `must_contain` and none of `must_not_contain`.
///
/// Panics with a descriptive message when any of these does not hold.
fn assert_log_site_pin(
    needle: &str,
    expected_macro: &str,
    must_contain: &[&str],
    must_not_contain: &[&str],
) {
    const MACRO_PREFIX: &str = "tracing::";
    const CONTEXT_BYTES: usize = 200;

    let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/node.rs");
    let source = match std::fs::read_to_string(&path) {
        Ok(text) => text,
        Err(e) => panic!("must read own source at {}: {e}", path.display()),
    };
    let Some(needle_at) = source.find(needle) else {
        panic!("log message `{needle}` must still exist in source");
    };
    let before_needle = &source[..needle_at];
    let Some(macro_at) = before_needle.rfind(MACRO_PREFIX) else {
        panic!("a tracing macro must precede the `{needle}` log site");
    };

    // The match must be a real invocation, i.e. the first thing on its line.
    let line_start = match before_needle[..macro_at].rfind('\n') {
        Some(newline_at) => newline_at + 1,
        None => 0,
    };
    let line_prefix = &before_needle[line_start..macro_at];
    assert!(
        line_prefix.chars().all(char::is_whitespace),
        "rfind matched `tracing::` inside a string literal or comment, \
         not a macro invocation. Prefix on its line: {line_prefix:?}"
    );

    let macro_name = before_needle[macro_at + MACRO_PREFIX.len()..]
        .split('!')
        .next()
        .unwrap_or("");

    // Failure context: the last ~200 bytes before the message, starting on a
    // character boundary so the slice cannot split a multi-byte character.
    let context_start = (before_needle.len().saturating_sub(CONTEXT_BYTES)..before_needle.len())
        .find(|&i| before_needle.is_char_boundary(i))
        .unwrap_or(0);
    let context = &before_needle[context_start..];
    assert_eq!(
        macro_name, expected_macro,
        "log site for `{needle}` must be at `tracing::{expected_macro}!` \
         (closest preceding macro is `tracing::{macro_name}!`). \
         A level change here restores an issue #4251 regression.\n\
         Preceding source (last 200 bytes):\n{context}"
    );

    let macro_body = &source[macro_at..needle_at];
    for substr in must_contain {
        assert!(
            macro_body.contains(substr),
            "log site for `{needle}` must contain `{substr}` in its macro invocation:\n{macro_body}"
        );
    }
    for forbidden in must_not_contain {
        assert!(
            !macro_body.contains(forbidden),
            "log site for `{needle}` must NOT contain `{forbidden}` \
             (would restore an issue #4251 regression):\n{macro_body}"
        );
    }
}
/// Pins the summary-mismatch log site at debug level. The em dash in the
/// needle is written as an escape, so this literal is not itself a textual
/// match for the message in the source scan.
#[test]
fn summary_mismatch_in_interest_sync_logs_at_debug_pin_test() {
assert_log_site_pin(
"Summary mismatch in interest sync \u{2014} syncing state to stale peer",
"debug",
&[],
&[],
);
}
/// Pins the unexpected-resync-response log site: it must log at debug level
/// and format the response with Display (`%`), never with Debug (`?`).
#[test]
fn unexpected_resync_response_uses_display_not_debug_pin_test() {
assert_log_site_pin(
"Unexpected response to resync update",
"debug",
&["response = %other"],
&["response = ?other"],
);
}
/// Pins the summary-lookup failure log site at debug level.
#[test]
fn failed_to_get_contract_summary_logs_at_debug_pin_test() {
assert_log_site_pin("Failed to get contract summary", "debug", &[], &[]);
}
/// Source-scanning pin over this file's own text (via `include_str!`).
///
/// Checks that the gating helper mentions both of its ring predicates, and
/// that the part of the interest-sync handler preceding the resync-request arm
/// never calls the ungated summary lookup and calls the gated helper at least
/// three times.
#[test]
fn interest_sync_periodic_arms_summarize_only_hosted_or_in_use_pin() {
let src = include_str!("node.rs");
// Slice out the helper: from its signature to the first closing brace that
// sits alone on a line.
let helper_start = src
.find("async fn summary_if_hosted_or_in_use(")
.expect("summary_if_hosted_or_in_use helper not found");
let helper_end = helper_start
+ src[helper_start..]
.find("\n}\n")
.expect("summary_if_hosted_or_in_use body end not found");
let helper_src = &src[helper_start..helper_end];
assert!(
helper_src.contains("is_hosting_contract"),
"summary_if_hosted_or_in_use must gate on is_hosting_contract"
);
assert!(
helper_src.contains("contract_in_use"),
"summary_if_hosted_or_in_use must ALSO gate on contract_in_use so an \
evicted-but-in-use stateful contract keeps its interest-sync heal"
);
// Slice out the handler text that precedes the resync-request arm.
let handler_start = src
.find("async fn handle_interest_sync_message(")
.expect("handle_interest_sync_message not found");
let resync_off = src[handler_start..]
.find("InterestMessage::ResyncRequest")
.expect("ResyncRequest arm not found");
let periodic_arms = &src[handler_start..handler_start + resync_off];
assert!(
!periodic_arms.contains("get_contract_summary("),
"the periodic interest-sync arms (Interests/Summaries/ChangeInterests) \
must call summary_if_hosted_or_in_use, not get_contract_summary \
directly (#4473) — a bare call here reintroduces the summarize storm"
);
let gated_calls = periodic_arms
.matches("summary_if_hosted_or_in_use(")
.count();
assert!(
gated_calls >= 3,
"expected the 3 periodic interest-sync arms to call \
summary_if_hosted_or_in_use, found {gated_calls}"
);
}
/// Source-scanning pin over this file's own text (via `include_str!`).
///
/// Starting at the overlap-sync loop header, checks that the three activity
/// checks all appear textually before the state fetch.
#[test]
fn neighbor_hosting_overlap_sync_gates_before_state_fetch_pin() {
let src = include_str!("node.rs");
let loop_start = src
.find("for instance_id in result.overlapping_contracts {")
.expect("NeighborHosting overlap-sync loop not found");
// Every position below is an offset relative to the loop header.
let loop_src = &src[loop_start..];
let gate_pos = loop_src
.find("op_manager.pending_broadcasts.contains(&instance_id)")
.expect(
"overlap-sync loop MUST gate on the activity predicate + \
pending_broadcasts.contains before fetching state (#4473)",
);
let recv_pos = loop_src
.find("is_receiving_updates(&probe_key)")
.expect("overlap-sync gate MUST check is_receiving_updates on the probe key");
let downstream_pos = loop_src
.find("has_downstream_subscribers(&probe_key)")
.expect("overlap-sync gate MUST check has_downstream_subscribers on the probe key");
let fetch_pos = loop_src
.find("get_contract_state_by_id(&op_manager, &instance_id)")
.expect("overlap-sync loop must still fetch state on the served path");
assert!(
recv_pos < fetch_pos && downstream_pos < fetch_pos && gate_pos < fetch_pos,
"the activity gate (is_receiving_updates || has_downstream_subscribers \
|| pending_broadcasts.contains) MUST precede get_contract_state_by_id, \
or the #4473 fetch_contract churn regresses for phantom contracts"
);
}
/// A bare `localhost` hostname resolves to a loopback address and receives
/// the default gateway port.
#[tokio::test]
async fn test_hostname_resolution_localhost() {
    let hostname = Address::Hostname("localhost".to_string());
    let resolved = NodeConfig::parse_socket_addr(&hostname).await.unwrap();
    let ip = resolved.ip();
    assert!(ip == IpAddr::V4(Ipv4Addr::LOCALHOST) || ip == IpAddr::V6(Ipv6Addr::LOCALHOST));
    assert_eq!(
        resolved.port(),
        crate::config::DEFAULT_GATEWAY_PORT,
        "port-less gateway host must default to 31337, not a random port"
    );
}
/// A `host:port` hostname keeps its explicit port after resolution.
///
/// Uses `localhost` so the test does not depend on external DNS or network
/// access; only the port handling is under test here.
#[tokio::test]
async fn test_hostname_resolution_with_port() {
    let addr = Address::Hostname("localhost:8080".to_string());
    let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
    assert_eq!(socket_addr.port(), 8080);
}
/// An `Address::Host` built with the default gateway port resolves to a
/// loopback address on that port.
#[tokio::test]
async fn test_host_variant_defaults_to_gateway_port() {
    let gateway_port = crate::config::DEFAULT_GATEWAY_PORT;
    let host = Address::Host {
        host: "localhost".to_string(),
        port: gateway_port,
    };
    let resolved = NodeConfig::parse_socket_addr(&host).await.unwrap();
    let ip = resolved.ip();
    assert!(ip == IpAddr::V4(Ipv4Addr::LOCALHOST) || ip == IpAddr::V6(Ipv6Addr::LOCALHOST));
    assert_eq!(resolved.port(), gateway_port);
}
/// An `Address::Host` with an explicit port keeps that port after resolution.
#[tokio::test]
async fn test_host_variant_explicit_port() {
    const EXPLICIT_PORT: u16 = 12345;
    let host = Address::Host {
        host: "localhost".to_string(),
        port: EXPLICIT_PORT,
    };
    let resolved = NodeConfig::parse_socket_addr(&host).await.unwrap();
    assert_eq!(resolved.port(), EXPLICIT_PORT);
}
/// A trailing-dot hostname (`localhost.`) may or may not resolve on a given
/// system; when it does, it must resolve to a loopback address.
#[tokio::test]
async fn test_hostname_resolution_with_trailing_dot() {
    let hostname = Address::Hostname("localhost.".to_string());
    let Ok(resolved) = NodeConfig::parse_socket_addr(&hostname).await else {
        // Resolution failure is tolerated for this form.
        return;
    };
    let ip = resolved.ip();
    assert!(ip == IpAddr::V4(Ipv4Addr::LOCALHOST) || ip == IpAddr::V6(Ipv6Addr::LOCALHOST));
}
/// An `Address::HostAddress` is returned unchanged by resolution.
#[tokio::test]
async fn test_hostname_resolution_direct_socket_addr() {
    let expected = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let resolved = NodeConfig::parse_socket_addr(&Address::HostAddress(expected))
        .await
        .unwrap();
    assert_eq!(resolved, expected);
}
/// A hostname whose port part is not a number is rejected.
#[tokio::test]
async fn test_hostname_resolution_invalid_port() {
    let malformed = Address::Hostname("localhost:not_a_port".to_string());
    let outcome = NodeConfig::parse_socket_addr(&malformed).await;
    assert!(outcome.is_err());
}
/// Parameterized equality check for `PeerId`: two ids are equal only when
/// both the public key and the socket address match.
///
/// The expectations mirror the three dedicated equality tests in this module
/// (`..._same_key_same_addr`, `..._different_key_same_addr`,
/// `..._different_addr`). `same_key` selects whether both ids share one
/// public key or use two independently generated ones.
#[rstest]
#[case::same_key_same_addr(true, 8080, 8080, true)]
#[case::different_keys_same_addr(false, 8080, 8080, false)]
#[case::same_key_different_addr(true, 8080, 8081, false)]
fn test_peer_id_equality(
    #[case] same_key: bool,
    #[case] port1: u16,
    #[case] port2: u16,
    #[case] expected_equal: bool,
) {
    let key1 = TransportKeypair::new().public().clone();
    let key2 = if same_key {
        key1.clone()
    } else {
        TransportKeypair::new().public().clone()
    };
    let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port1);
    let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port2);
    let peer1 = PeerId::new(key1, addr1);
    let peer2 = PeerId::new(key2, addr2);
    assert_eq!(peer1 == peer2, expected_equal);
}
/// Two ids built from the same key and the same address compare equal.
#[test]
fn test_peer_id_equality_same_key_same_addr() {
    let keypair = TransportKeypair::new();
    let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 8080));
    let build = || PeerId::new(keypair.public().clone(), addr);
    let (first, second) = (build(), build());
    assert_eq!(first, second);
}
/// Ids with different keys are unequal even when they share an address.
#[test]
fn test_peer_id_equality_different_key_same_addr() {
    let shared_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 8080));
    let build_with_fresh_key = || {
        let keypair = TransportKeypair::new();
        PeerId::new(keypair.public().clone(), shared_addr)
    };
    let first = build_with_fresh_key();
    let second = build_with_fresh_key();
    assert_ne!(first, second);
}
/// Ids with the same key are unequal when their addresses differ.
#[test]
fn test_peer_id_equality_different_addr() {
    let keypair = TransportKeypair::new();
    let build_on_port =
        |port: u16| PeerId::new(keypair.public().clone(), SocketAddr::from((Ipv4Addr::LOCALHOST, port)));
    let first = build_on_port(8080);
    let second = build_on_port(8081);
    assert_ne!(first, second);
}
/// With the same key, the id on the lower port orders before the id on the
/// higher port (checked in both directions).
#[rstest]
#[case::lower_port_first(8080, 8081)]
#[case::high_port_diff(1024, 65535)]
fn test_peer_id_ordering(#[case] lower_port: u16, #[case] higher_port: u16) {
    let keypair = TransportKeypair::new();
    let build_on_port =
        |port: u16| PeerId::new(keypair.public().clone(), SocketAddr::from((Ipv4Addr::LOCALHOST, port)));
    let low = build_on_port(lower_port);
    let high = build_on_port(higher_port);
    assert!(low < high);
    assert!(high > low);
}
/// Two equal ids (same key, same address) hash to the same value.
#[test]
fn test_peer_id_hash_consistency() {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    fn digest(peer: &PeerId) -> u64 {
        let mut hasher = DefaultHasher::new();
        peer.hash(&mut hasher);
        hasher.finish()
    }

    let keypair = TransportKeypair::new();
    let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 8080));
    let first = PeerId::new(keypair.public().clone(), addr);
    let second = PeerId::new(keypair.public().clone(), addr);
    assert_eq!(digest(&first), digest(&second));
}
/// Two randomly generated ids are expected to get different socket addresses.
#[test]
fn test_peer_id_random_produces_unique() {
    let first_addr = PeerId::random().socket_addr();
    let second_addr = PeerId::random().socket_addr();
    assert_ne!(first_addr, second_addr);
}
/// `to_bytes` yields a non-empty encoding that bincode decodes back to an id
/// with the same socket address.
#[test]
fn test_peer_id_serialization() {
    let original = PeerId::random();
    let encoded = original.to_bytes();
    assert!(!encoded.is_empty());
    let decoded: PeerId = bincode::deserialize(&encoded).unwrap();
    assert_eq!(original.socket_addr(), decoded.socket_addr());
}
/// The Display and Debug renderings of an id are identical and non-empty.
#[test]
fn test_peer_id_display() {
    let peer = PeerId::random();
    let display = peer.to_string();
    let debug = format!("{peer:?}");
    assert_eq!(display, debug);
    assert!(!display.is_empty());
}
/// `InitPeerNode::new` stores the given peer key location and ring location
/// unchanged.
#[test]
fn test_init_peer_node_construction() {
    let keypair = TransportKeypair::new();
    let addr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let expected_peer = PeerKeyLocation::new(keypair.public().clone(), addr);
    let expected_location = Location::new(0.5);
    let init_peer = InitPeerNode::new(expected_peer.clone(), expected_location);
    assert_eq!(init_peer.peer_key_location, expected_peer);
    assert_eq!(init_peer.location, expected_location);
}
/// Tests for the version gate applied to inbound subscribe hints: the
/// predicate itself, and a source-scanning guard that the gate is wired into
/// the receive arm ahead of the directed subscribe.
mod inbound_subscribe_hint_gate {
use crate::node::network_bridge::p2p_protoc::{
SUBSCRIBE_HINT_MIN_VERSION, own_crate_version, version_supports_subscribe_hint,
};
/// Asserts that this crate's own version is below the floor, so the gate
/// rejects hints.
/// NOTE(review): ignored — presumably because the crate version is no longer
/// below the floor; confirm before re-enabling.
#[ignore]
#[test]
fn receive_gate_ignores_hint_while_deactivated() {
let own = own_crate_version();
assert!(
own < SUBSCRIBE_HINT_MIN_VERSION,
"own version {own:?} must be below the parked floor \
{SUBSCRIBE_HINT_MIN_VERSION:?} for the migration to stay off"
);
assert!(
!version_supports_subscribe_hint(Some(own), SUBSCRIBE_HINT_MIN_VERSION),
"while deactivated, the receive gate must IGNORE inbound hints \
(own version below the floor)"
);
assert!(!version_supports_subscribe_hint(
Some((0, 2, 73)),
SUBSCRIBE_HINT_MIN_VERSION
));
}
/// Boundary behaviour against the default floor: (0, 2, 80) and (0, 3, 0)
/// pass; (0, 2, 79), (0, 2, 73) and an unknown version (`None`) do not.
#[test]
fn receive_gate_active_at_reenable_floor() {
assert!(version_supports_subscribe_hint(
Some((0, 2, 80)),
SUBSCRIBE_HINT_MIN_VERSION
));
assert!(version_supports_subscribe_hint(
Some((0, 3, 0)),
SUBSCRIBE_HINT_MIN_VERSION
));
assert!(!version_supports_subscribe_hint(
Some((0, 2, 79)),
SUBSCRIBE_HINT_MIN_VERSION
));
assert!(!version_supports_subscribe_hint(
Some((0, 2, 73)),
SUBSCRIBE_HINT_MIN_VERSION
));
assert!(!version_supports_subscribe_hint(
None,
SUBSCRIBE_HINT_MIN_VERSION
));
}
/// With the floor lowered to (0, 0, 0), this crate's own version passes the
/// gate.
#[test]
fn receive_gate_acts_on_hint_when_floor_lowered() {
let own = own_crate_version();
assert!(
version_supports_subscribe_hint(Some(own), (0, 0, 0)),
"with the floor lowered to (0,0,0) the receive side must act on hints"
);
}
/// Source-scanning guard over this file's own text: inside the receive arm
/// for subscribe hints, the version gate must appear before the directed
/// subscribe call, and the arm must read the per-node floor override.
#[test]
fn receive_gate_is_wired_before_directed_subscribe() {
const SOURCE: &str = include_str!("node.rs");
// Anchors are assembled from pieces so that this test's own text does not
// contain them verbatim.
let arm_anchor: String = ["NetMessageV1::", "SubscribeHint(hint)", " => {"].concat();
let arm_start = SOURCE
.find(&arm_anchor)
.expect("SubscribeHint receive arm not found — update this guard");
// The arm is taken to end where the next match arm begins.
let next_anchor: String = ["NetMessageV1::", "Aborted(tx)", " => {"].concat();
let arm_end = SOURCE[arm_start..]
.find(&next_anchor)
.map(|i| arm_start + i)
.expect("end of SubscribeHint arm not found — update guard");
let arm = &SOURCE[arm_start..arm_end];
let gate_idx = arm
.find("version_supports_subscribe_hint(")
.expect("receive arm must call version_supports_subscribe_hint as a gate");
let directed_idx = arm
.find("start_directed_subscribe(")
.expect("receive arm must still call start_directed_subscribe");
assert!(
gate_idx < directed_idx,
"the version gate must run BEFORE start_directed_subscribe"
);
assert!(
arm.contains("subscribe_hint_floor_override()"),
"receive gate must read the same per-node floor override as the send side"
);
}
}
mod callback_forward_tests {
use super::super::try_forward_driver_reply;
use crate::message::{MessageStats, NetMessage, NetMessageV1, Transaction};
use crate::operations::connect::ConnectMsg;
/// Builds a throwaway reply message: an `Aborted` message carrying a fresh
/// connect transaction.
fn dummy_reply() -> NetMessage {
    let tx = Transaction::new::<ConnectMsg>();
    NetMessage::V1(NetMessageV1::Aborted(tx))
}
/// With a callback channel present, the helper reports the bypass as taken
/// and the reply arrives on the channel with its transaction id unchanged.
#[tokio::test]
async fn bypass_forwards_when_callback_registered() {
    let (callback_tx, mut callback_rx) =
        tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
    let reply = dummy_reply();
    let expected_id = *reply.id();

    let taken = try_forward_driver_reply(Some(&callback_tx), reply, "subscribe");
    assert!(taken, "callback present → bypass must be taken");

    let received = callback_rx
        .try_recv()
        .expect("helper should forward the reply to the callback");
    match received {
        crate::node::WaiterReply::Reply(msg) => assert_eq!(*msg.id(), expected_id),
        other => panic!("expected WaiterReply::Reply, got: {other:?}"),
    }
}
/// Without a callback channel the helper reports that the bypass was not
/// taken.
#[tokio::test]
async fn bypass_returns_false_when_no_callback() {
    let reply = dummy_reply();
    let taken = try_forward_driver_reply(None, reply, "subscribe");
    assert!(!taken, "no callback → bypass must not be taken");
}
/// A callback channel whose receiver is already gone still counts as the
/// bypass being taken.
#[tokio::test]
async fn bypass_returns_true_even_when_receiver_dropped() {
    let (callback_tx, callback_rx) = tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
    // Close the receiving side before anything is sent.
    drop(callback_rx);
    let reply = dummy_reply();
    let taken = try_forward_driver_reply(Some(&callback_tx), reply, "subscribe");
    assert!(
        taken,
        "callback present but receiver dropped → bypass still taken"
    );
}
/// Source-scanning guard over this file's own text: the body of the
/// reply-forwarding helper must log at debug and must not log at warn or
/// error.
#[test]
fn forward_driver_reply_logs_benign_drop_at_debug_only() {
const SOURCE: &str = include_str!("node.rs");
// Anchors and macro names are assembled from pieces so that this test's own
// text does not contain them verbatim.
let fn_anchor: String = ["fn try_forward_driver_reply", "("].concat();
let start = SOURCE.find(&fn_anchor).expect(
"try_forward_driver_reply definition not found — \
it was renamed or moved; update this guard",
);
// The function is taken to end at the first closing brace that sits alone
// on a line after the signature.
let fn_end: String = ["\n", "}", "\n"].concat();
let after = start + fn_anchor.len();
let window_end = SOURCE[after..]
.find(&fn_end)
.map(|i| after + i + fn_end.len())
.expect("closing brace of try_forward_driver_reply not found");
let body = &SOURCE[start..window_end];
let debug_macro: String = ["tracing", "::debug!"].concat();
let warn_macro: String = ["tracing", "::warn!"].concat();
let error_macro: String = ["tracing", "::error!"].concat();
assert!(
body.contains(&debug_macro),
"the benign dropped-reply path must be logged at debug"
);
assert!(
!body.contains(&error_macro),
"try_forward_driver_reply must NOT log at error: the dropped \
reply (closed receiver from a cancelled SUBSCRIBE renewal, or \
a full CONNECT fan-in channel) is benign and intentionally \
lossy. Re-escalating to error! reintroduces the false-alarm \
spam this guard prevents (see issue #4350)."
);
assert!(
!body.contains(&warn_macro),
"try_forward_driver_reply must NOT log at warn: CONNECT's \
capacity-N fan-in legitimately reaches the full-channel case \
under load, so warning on the benign drop is also a false \
alarm."
);
}
/// Source-scanning guard over this file's own text: the subscribe match arm
/// must call the reply-forwarding helper, and that call must be gated on the
/// message being a subscribe `Response`.
#[test]
fn bypass_is_wired_into_subscribe_branch_regression_guard() {
const SOURCE: &str = include_str!("node.rs");
// Anchors are assembled from pieces so that this test's own text does not
// contain them verbatim.
let subscribe_branch_anchor: String =
["NetMessageV1::", "Subscribe(ref op)", " => {"].concat();
let branch_start = SOURCE.find(&subscribe_branch_anchor).expect(
"SUBSCRIBE branch of handle_pure_network_message_v1 not found; \
the match arm has been renamed or moved — update this regression guard",
);
// The branch window ends at a marker comment that follows it in the source.
let next_variant_anchor: String = ["// Non-transactional", " message types:"].concat();
let window_end = SOURCE[branch_start..]
.find(&next_variant_anchor)
.expect("end of SUBSCRIBE branch not found — update guard")
+ branch_start;
let window = &SOURCE[branch_start..window_end];
assert!(
window.contains("try_forward_driver_reply("),
"SUBSCRIBE branch no longer calls \
try_forward_driver_reply before relay dispatch. \
Either restore the bypass or update this regression \
guard if the branch was legitimately refactored."
);
let response_gate: String = [
"matches!(op, ",
"subscribe::SubscribeMsg::Response { .. }",
")",
]
.concat();
assert!(
window.contains(&response_gate),
"SUBSCRIBE branch bypass is not gated on Response-only. \
Non-terminal messages (ForwardingAck, Unsubscribe) must NOT \
be forwarded to the driver channel — they would fill \
the capacity-1 reply slot and block the real Response."
);
}
/// Source-level guard: the terminal-gate `matches!` in the PUT match arm of
/// `node.rs` must list `Response`, `ResponseStreaming` and `Error`.
#[test]
fn put_branch_bypass_includes_error_variant_regression_guard() {
    const SOURCE: &str = include_str!("node.rs");
    let arm_header: String = ["NetMessageV1::", "Put(ref op)", " => {"].concat();
    let arm_start = SOURCE.find(&arm_header).expect(
        "PUT branch of handle_pure_network_message_v1 not found; \
         the match arm has been renamed or moved — update this guard",
    );
    // The PUT arm ends where the GET arm begins.
    let next_arm_header: String = ["NetMessageV1::", "Get(ref op)", " => {"].concat();
    let from_arm = &SOURCE[arm_start..];
    let arm_len = from_arm
        .find(&next_arm_header)
        .expect("end of PUT branch not found — update guard");
    let arm = &from_arm[..arm_len];

    assert!(
        arm.contains("try_forward_driver_reply("),
        "PUT branch no longer calls try_forward_driver_reply \
         — either restore the bypass or update this guard."
    );

    // Accept either the multi-line or the single-line spelling of the gate.
    let gate_open = match arm.find("matches!(\n op,") {
        Some(pos) => Some(pos),
        None => arm.find("matches!(op,"),
    }
    .expect("terminal-gate matches! not found in PUT branch");
    let from_gate = &arm[gate_open..];
    let gate_len = from_gate
        .find(") && try_forward_driver_reply(")
        .expect("end of terminal-gate matches! not found");
    let gate = &from_gate[..gate_len];

    for expected in [
        "put::PutMsg::Response { .. }",
        "put::PutMsg::ResponseStreaming { .. }",
        "put::PutMsg::Error { .. }",
    ] {
        assert!(
            gate.contains(expected),
            "PUT bypass terminal-gate missing `{expected}` — \
             issue #4111: without Error in the gate, the \
             originator-loopback failure path's \
             send_local_loopback(PutMsg::Error) lands in the \
             dispatch wildcard and the originator's retry-loop \
             re-runs the same deterministic local failure."
        );
    }
}
/// A callback whose capacity-1 channel is already occupied still counts as
/// "bypass taken", and the call returns rather than waiting for space.
#[tokio::test]
async fn bypass_does_not_block_when_channel_already_full() {
    // `_receiver` is kept alive so the channel is full, not closed.
    let (sender, _receiver) = tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
    let occupying_reply = crate::node::WaiterReply::Reply(dummy_reply());
    sender
        .try_send(occupying_reply)
        .expect("capacity-1 channel should accept first message");

    let forwarded = try_forward_driver_reply(Some(&sender), dummy_reply(), "subscribe");
    assert!(
        forwarded,
        "callback present but channel full → bypass still taken"
    );
}
use crate::operations::VisitedPeers;
use crate::operations::subscribe::{SubscribeMsg, SubscribeMsgResult};
/// Test-side mirror of the SUBSCRIBE bypass gate: only a `Response` is
/// offered to `try_forward_driver_reply`; every other variant yields `false`
/// without touching the callback.
fn subscribe_branch_would_forward(
    op: &SubscribeMsg,
    callback: Option<&tokio::sync::mpsc::Sender<crate::node::WaiterReply>>,
) -> bool {
    if !matches!(op, SubscribeMsg::Response { .. }) {
        return false;
    }
    let wire_msg = NetMessage::V1(NetMessageV1::Subscribe(op.clone()));
    try_forward_driver_reply(callback, wire_msg, "subscribe")
}
/// A SUBSCRIBE `Response` with a callback present is forwarded, and the
/// message found in the channel carries the same transaction id.
#[tokio::test]
async fn subscribe_response_is_forwarded_to_task() {
    let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
    let txn = Transaction::new::<SubscribeMsg>();
    let contract_instance = freenet_stdlib::prelude::ContractInstanceId::new([1u8; 32]);
    let code_hash = freenet_stdlib::prelude::CodeHash::new([2u8; 32]);
    let key = freenet_stdlib::prelude::ContractKey::from_id_and_code(contract_instance, code_hash);
    let response = SubscribeMsg::Response {
        id: txn,
        instance_id: contract_instance,
        result: SubscribeMsgResult::Subscribed { key },
        hop_count: 0,
    };

    let forwarded = subscribe_branch_would_forward(&response, Some(&reply_tx));
    assert!(forwarded, "Response with callback → must be forwarded");

    let delivered = reply_rx.try_recv().expect("Response should be in channel");
    match delivered {
        crate::node::WaiterReply::Reply(msg) => assert_eq!(*msg.id(), txn),
        other => panic!("expected WaiterReply::Reply, got: {other:?}"),
    }
}
/// A SUBSCRIBE `ForwardingAck` is rejected by the gate and leaves the
/// callback channel empty.
#[tokio::test]
async fn forwarding_ack_is_not_forwarded_to_task() {
    let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
    let ack = SubscribeMsg::ForwardingAck {
        id: Transaction::new::<SubscribeMsg>(),
        instance_id: freenet_stdlib::prelude::ContractInstanceId::new([3u8; 32]),
    };

    let forwarded = subscribe_branch_would_forward(&ack, Some(&reply_tx));
    assert!(
        !forwarded,
        "ForwardingAck must NOT be forwarded to task channel"
    );
    let channel_is_empty = reply_rx.try_recv().is_err();
    assert!(
        channel_is_empty,
        "channel must remain empty after ForwardingAck"
    );
}
/// A SUBSCRIBE `Unsubscribe` is rejected by the gate and leaves the callback
/// channel empty.
#[tokio::test]
async fn unsubscribe_is_not_forwarded_to_task() {
    let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
    let unsubscribe = SubscribeMsg::Unsubscribe {
        id: Transaction::new::<SubscribeMsg>(),
        instance_id: freenet_stdlib::prelude::ContractInstanceId::new([4u8; 32]),
    };

    let forwarded = subscribe_branch_would_forward(&unsubscribe, Some(&reply_tx));
    assert!(!forwarded, "Unsubscribe must NOT be forwarded to task channel");
    let channel_is_empty = reply_rx.try_recv().is_err();
    assert!(channel_is_empty, "channel must remain empty");
}
/// A SUBSCRIBE `Request` is rejected by the gate and leaves the callback
/// channel empty.
#[tokio::test]
async fn request_is_not_forwarded_to_task() {
    let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
    let txn = Transaction::new::<SubscribeMsg>();
    let request = SubscribeMsg::Request {
        id: txn,
        instance_id: freenet_stdlib::prelude::ContractInstanceId::new([5u8; 32]),
        htl: 5,
        visited: VisitedPeers::new(&txn),
        is_renewal: false,
    };

    let forwarded = subscribe_branch_would_forward(&request, Some(&reply_tx));
    assert!(!forwarded, "Request must NOT be forwarded to task channel");
    let channel_is_empty = reply_rx.try_recv().is_err();
    assert!(channel_is_empty, "channel must remain empty");
}
/// A SUBSCRIBE `Response` with no callback is not forwarded, even though the
/// variant itself passes the gate.
#[tokio::test]
async fn response_without_callback_falls_through() {
    let response = SubscribeMsg::Response {
        id: Transaction::new::<SubscribeMsg>(),
        instance_id: freenet_stdlib::prelude::ContractInstanceId::new([6u8; 32]),
        result: SubscribeMsgResult::NotFound,
        hop_count: 0,
    };

    let forwarded = subscribe_branch_would_forward(&response, None);
    assert!(
        !forwarded,
        "Response without callback → must fall through to legacy path"
    );
}
/// Source-level guard: the PUT match arm in `node.rs` must call the
/// driver-reply bypass, mention both terminal response variants, and contain
/// neither the legacy dispatch call nor the per-op existence check.
#[test]
fn bypass_is_wired_into_put_branch_regression_guard() {
    const SOURCE: &str = include_str!("node.rs");
    let arm_start = SOURCE.find("NetMessageV1::Put(ref op) => {").expect(
        "PUT branch of handle_pure_network_message_v1 not found; \
         the match arm has been renamed or moved — update this regression guard",
    );
    // The PUT arm ends where the GET arm begins.
    let from_arm = &SOURCE[arm_start..];
    let arm_len = from_arm
        .find("NetMessageV1::Get(ref op) => {")
        .expect("could not find end of PUT arm");
    let arm = &from_arm[..arm_len];

    assert!(
        arm.contains("try_forward_driver_reply("),
        "PUT branch no longer calls try_forward_driver_reply. \
         Restore the bypass or update this regression guard."
    );
    assert!(
        arm.contains("put::PutMsg::Response { .. }"),
        "PUT branch bypass is not gated on Response. \
         Non-terminal messages must NOT be forwarded to the driver channel."
    );
    assert!(
        arm.contains("put::PutMsg::ResponseStreaming { .. }"),
        "PUT branch bypass is not gated on ResponseStreaming. \
         Both terminal variants must be forwarded."
    );

    // Forbidden needles are built with `format!` rather than written as one
    // literal.
    let legacy_dispatch = format!("handle{}::<put::PutOp, _>", "_op_request");
    assert!(
        !arm.contains(&legacy_dispatch),
        "PUT branch must not call legacy state-machine dispatch"
    );
    let per_op_gate = format!("has{}_op", "_put");
    assert!(
        !arm.contains(&per_op_gate),
        "PUT branch must not gate dispatch on per-op DashMap existence"
    );
}
use crate::operations::put::PutMsg;
use freenet_stdlib::prelude::*;
/// Builds a `ContractKey` whose instance id is 32 copies of `a` and whose
/// code hash is 32 copies of `b`.
fn dummy_put_key(a: u8, b: u8) -> ContractKey {
    let instance = ContractInstanceId::new([a; 32]);
    let code = CodeHash::new([b; 32]);
    ContractKey::from_id_and_code(instance, code)
}
fn put_branch_would_forward(
op: &PutMsg,
callback: Option<&tokio::sync::mpsc::Sender<crate::node::WaiterReply>>,
) -> bool {
matches!(
op,
PutMsg::Response { .. } | PutMsg::ResponseStreaming { .. }
) && try_forward_driver_reply(
callback,
NetMessage::V1(NetMessageV1::Put(op.clone())),
"put",
)
}
/// A PUT `Response` with a callback present is forwarded, and the message
/// found in the channel carries the same transaction id.
#[tokio::test]
async fn put_response_is_forwarded_to_task() {
    let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
    let txn = Transaction::new::<PutMsg>();
    let response = PutMsg::Response {
        id: txn,
        key: dummy_put_key(10, 11),
        hop_count: 0,
    };

    let forwarded = put_branch_would_forward(&response, Some(&reply_tx));
    assert!(forwarded, "Response with callback → must be forwarded");

    let delivered = reply_rx.try_recv().expect("Response should be in channel");
    match delivered {
        crate::node::WaiterReply::Reply(msg) => assert_eq!(*msg.id(), txn),
        other => panic!("expected WaiterReply::Reply, got: {other:?}"),
    }
}
/// A PUT `ResponseStreaming` with a callback present is forwarded, and the
/// message found in the channel carries the same transaction id.
#[tokio::test]
async fn put_response_streaming_is_forwarded_to_task() {
    let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
    let txn = Transaction::new::<PutMsg>();
    let streaming_response = PutMsg::ResponseStreaming {
        id: txn,
        key: dummy_put_key(12, 13),
        continue_forwarding: false,
        hop_count: 0,
    };

    let forwarded = put_branch_would_forward(&streaming_response, Some(&reply_tx));
    assert!(forwarded, "ResponseStreaming with callback → must be forwarded");

    let delivered = reply_rx
        .try_recv()
        .expect("ResponseStreaming should be in channel");
    match delivered {
        crate::node::WaiterReply::Reply(msg) => assert_eq!(*msg.id(), txn),
        other => panic!("expected WaiterReply::Reply, got: {other:?}"),
    }
}
/// A PUT `ForwardingAck` is rejected by the gate and leaves the callback
/// channel empty.
#[tokio::test]
async fn put_forwarding_ack_is_not_forwarded_to_task() {
    let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);
    let ack = PutMsg::ForwardingAck {
        id: Transaction::new::<PutMsg>(),
        contract_key: dummy_put_key(14, 15),
    };

    let forwarded = put_branch_would_forward(&ack, Some(&reply_tx));
    assert!(
        !forwarded,
        "ForwardingAck must NOT be forwarded to task channel"
    );
    let channel_is_empty = reply_rx.try_recv().is_err();
    assert!(
        channel_is_empty,
        "channel must remain empty after ForwardingAck"
    );
}
/// A PUT `Request` is rejected by the gate and leaves the callback channel
/// empty.
#[tokio::test]
async fn put_request_is_not_forwarded_to_task() {
    let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel::<crate::node::WaiterReply>(1);

    // Minimal contract payload: one byte of code, empty parameters.
    let wrapped = WrappedContract::new(
        std::sync::Arc::new(ContractCode::from(vec![0u8])),
        Parameters::from(vec![]),
    );
    let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(wrapped));
    let request = PutMsg::Request {
        id: Transaction::new::<PutMsg>(),
        contract,
        related_contracts: RelatedContracts::default(),
        value: WrappedState::new(vec![1u8]),
        htl: 5,
        skip_list: std::collections::HashSet::new(),
    };

    let forwarded = put_branch_would_forward(&request, Some(&reply_tx));
    assert!(!forwarded, "Request must NOT be forwarded to task channel");
    let channel_is_empty = reply_rx.try_recv().is_err();
    assert!(channel_is_empty, "channel must remain empty");
}
/// A PUT `Response` with no callback is not forwarded, even though the
/// variant itself passes the gate.
#[tokio::test]
async fn put_response_without_callback_falls_through() {
    let response = PutMsg::Response {
        id: Transaction::new::<PutMsg>(),
        key: dummy_put_key(16, 17),
        hop_count: 0,
    };

    let forwarded = put_branch_would_forward(&response, None);
    assert!(
        !forwarded,
        "Response without callback → must fall through to legacy path"
    );
}
/// Source-level guard for the GET match arm in `node.rs`: the bypass call,
/// both terminal variants, the relay driver call and an upstream-address
/// identifier must be present; the legacy dispatch and per-op existence check
/// must be absent; and the bypass must textually precede the relay dispatch.
#[test]
fn get_branch_dispatches_relay_driver() {
    const SOURCE: &str = include_str!("node.rs");
    let arm_start = SOURCE.find("NetMessageV1::Get(ref op) => {").expect(
        "GET branch of handle_pure_network_message_v1 not found; \
         the match arm has been renamed or moved — update this guard",
    );
    // The GET arm ends where the UPDATE arm begins.
    let from_arm = &SOURCE[arm_start..];
    let arm_len = from_arm
        .find("NetMessageV1::Update(ref op) => {")
        .expect("could not find end of GET arm");
    let arm = &from_arm[..arm_len];

    // Required content.
    assert!(
        arm.contains("try_forward_driver_reply("),
        "GET branch no longer calls try_forward_driver_reply \
         before relay dispatch. Restore the bypass."
    );
    assert!(
        arm.contains("get::GetMsg::Response { .. }"),
        "GET branch bypass is not gated on Response. \
         Non-terminal messages must NOT be forwarded to the driver channel."
    );
    assert!(
        arm.contains("get::GetMsg::ResponseStreaming { .. }"),
        "GET branch bypass is not gated on ResponseStreaming. \
         Both terminal variants must be forwarded."
    );
    assert!(
        arm.contains("start_relay_get("),
        "GET branch no longer calls start_relay_get for relay dispatch."
    );
    let threads_upstream = arm.contains("effective_upstream") || arm.contains("upstream_addr");
    assert!(
        threads_upstream,
        "GET relay dispatch must thread an effective upstream address \
         (source_addr or own_addr loopback) into the relay driver."
    );

    // Forbidden content; needles are built with `format!` rather than
    // written as one literal.
    let legacy_dispatch = format!("handle{}::<get::GetOp, _>", "_op_request");
    assert!(
        !arm.contains(&legacy_dispatch),
        "GET branch must NOT call legacy state-machine dispatch"
    );
    let per_op_gate = format!("has{}_op", "_get");
    assert!(
        !arm.contains(&per_op_gate),
        "GET branch must NOT gate on per-op DashMap existence"
    );

    // Ordering: first bypass occurrence must come before first relay call.
    let bypass_at = arm
        .find("try_forward_driver_reply(")
        .expect("try_forward_driver_reply not found in GET branch");
    let relay_at = arm
        .find("start_relay_get(")
        .expect("start_relay_get not found in GET branch");
    assert!(
        bypass_at < relay_at,
        "Reply bypass (try_forward_driver_reply) must \
         appear BEFORE relay dispatch (start_relay_get) — \
         swapping order would break the terminal-reply fast \
         path."
    );
}
/// Source-level guard for the UPDATE match arm in `node.rs`: all four relay
/// driver calls must be present, and neither the legacy dispatch call nor the
/// per-op existence check may appear.
#[test]
fn update_branch_dispatches_all_relay_drivers() {
    const SOURCE: &str = include_str!("node.rs");
    let arm_start = SOURCE.find("NetMessageV1::Update(ref op) => {").expect(
        "UPDATE branch of handle_pure_network_message_v1 not found; \
         the match arm has been renamed or moved — update this regression guard",
    );
    // The UPDATE arm ends where the SUBSCRIBE arm begins.
    let from_arm = &SOURCE[arm_start..];
    let arm_len = from_arm
        .find("NetMessageV1::Subscribe(ref op) => {")
        .expect("could not find end of UPDATE arm");
    let arm = &from_arm[..arm_len];

    let required_drivers = [
        "start_relay_request_update(",
        "start_relay_broadcast_to(",
        "start_relay_request_update_streaming(",
        "start_relay_broadcast_to_streaming(",
    ];
    for driver in required_drivers {
        assert!(
            arm.contains(driver),
            "UPDATE branch must call {driver} for relay dispatch."
        );
    }

    // Forbidden needles are concatenated from fragments rather than written
    // as one literal.
    let legacy_call = ["handle_op_request::<update::", "UpdateOp", ", _>"].concat();
    assert!(
        !arm.contains(&legacy_call),
        "UPDATE branch must NOT call handle_op_request"
    );
    let dispatch_gate = ["has_", "update_op"].concat();
    assert!(
        !arm.contains(&dispatch_gate),
        "UPDATE relay dispatch must NOT consult has_update_op"
    );
}
/// Source-level guard: the UPDATE match arm in `node.rs` must contain the
/// `if let` that binds `sender_addr` from `source_addr`.
#[test]
fn update_branch_dispatch_gates_on_source_addr() {
    const SOURCE: &str = include_str!("node.rs");
    let arm_start = SOURCE
        .find("NetMessageV1::Update(ref op) => {")
        .expect("UPDATE branch not found");
    // The UPDATE arm ends where the SUBSCRIBE arm begins.
    let from_arm = &SOURCE[arm_start..];
    let arm_len = from_arm
        .find("NetMessageV1::Subscribe(ref op) => {")
        .expect("could not find end of UPDATE arm");
    let arm = &from_arm[..arm_len];

    let has_source_gate = arm.contains("if let Some(sender_addr) = source_addr");
    assert!(
        has_source_gate,
        "UPDATE relay dispatch must be gated on source_addr.is_some() — \
         internal callers must NOT spawn relay drivers."
    );
}
/// Source-level guard for the PUT match arm in `node.rs`: both relay driver
/// calls and an upstream-address identifier must be present; the legacy
/// dispatch call and the per-op existence check must be absent.
#[test]
fn put_branch_dispatches_relay_drivers() {
    const SOURCE: &str = include_str!("node.rs");
    let arm_start = SOURCE.find("NetMessageV1::Put(ref op) => {").expect(
        "PUT branch of handle_pure_network_message_v1 not found; \
         the match arm has been renamed or moved — update this guard",
    );
    // The PUT arm ends where the GET arm begins.
    let from_arm = &SOURCE[arm_start..];
    let arm_len = from_arm
        .find("NetMessageV1::Get(ref op) => {")
        .expect("could not find end of PUT arm");
    let arm = &from_arm[..arm_len];

    // Required content.
    assert!(
        arm.contains("start_relay_put("),
        "PUT branch no longer calls start_relay_put for relay dispatch."
    );
    assert!(
        arm.contains("start_relay_put_streaming("),
        "PUT branch must call start_relay_put_streaming for streaming relay hops."
    );
    let threads_upstream = arm.contains("effective_upstream") || arm.contains("upstream_addr");
    assert!(
        threads_upstream,
        "PUT relay dispatch must thread an effective upstream address \
         (source_addr or own_addr loopback) into the relay drivers."
    );

    // Forbidden content; needles are built with `format!` rather than
    // written as one literal.
    let legacy_dispatch = format!("handle{}::<put::PutOp, _>", "_op_request");
    assert!(
        !arm.contains(&legacy_dispatch),
        "PUT branch must NOT call legacy state-machine dispatch"
    );
    let per_op_gate = format!("has{}_op", "_put");
    assert!(
        !arm.contains(&per_op_gate),
        "PUT branch must NOT gate on per-op DashMap existence"
    );
}
/// Source-level guard on `operations/put/op_ctx_task.rs`: the text of
/// `drive_relay_put` must mention the streaming decision helper, the
/// streaming request variant, and the stream-send call.
#[test]
fn start_relay_put_handles_upgrade_on_forward() {
    const SOURCE: &str = include_str!("operations/put/op_ctx_task.rs");
    let signature = "async fn drive_relay_put<CB>(";
    let fn_start = SOURCE
        .find(signature)
        .expect("drive_relay_put fn not found — has the signature changed?");
    // The function is taken to run until the next `async fn` that starts a
    // line, or to the end of the file when there is none.
    let after_signature = fn_start + signature.len();
    let fn_stop = SOURCE[after_signature..]
        .find("\nasync fn ")
        .map_or(SOURCE.len(), |offset| after_signature + offset);
    let fn_text = &SOURCE[fn_start..fn_stop];

    assert!(
        fn_text.contains("should_use_streaming("),
        "drive_relay_put must call should_use_streaming on the merged \
         payload to decide between non-streaming Request and streaming \
         upgrade on forward."
    );
    assert!(
        fn_text.contains("PutMsg::RequestStreaming {"),
        "drive_relay_put must build PutMsg::RequestStreaming when the \
         forwarded payload would exceed streaming_threshold."
    );
    assert!(
        fn_text.contains("send_stream("),
        "drive_relay_put must call NetworkBridge::send_stream for the \
         raw fragments after the RequestStreaming metadata send."
    );
}
/// Source-level guard for the SUBSCRIBE match arm in `node.rs`: the bypass
/// call, the `Response` gate, the relay driver call, the inbound-unsubscribe
/// handler and an upstream-address identifier must be present; the legacy
/// dispatch and per-op existence check must be absent; and the bypass must
/// textually precede the relay dispatch.
#[test]
fn subscribe_branch_dispatches_relay_driver() {
    const SOURCE: &str = include_str!("node.rs");
    let arm_start = SOURCE.find("NetMessageV1::Subscribe(ref op) => {").expect(
        "SUBSCRIBE branch of handle_pure_network_message_v1 not found; \
         the match arm has been renamed or moved — update this guard",
    );
    // The arm is considered to run until the marker comment that follows it.
    let from_arm = &SOURCE[arm_start..];
    let arm_len = from_arm
        .find("// Non-transactional message types:")
        .expect("could not find end of SUBSCRIBE arm");
    let arm = &from_arm[..arm_len];

    // Required content.
    assert!(
        arm.contains("try_forward_driver_reply("),
        "SUBSCRIBE branch no longer calls try_forward_driver_reply \
         before relay dispatch — restore it."
    );
    assert!(
        arm.contains("subscribe::SubscribeMsg::Response { .. }"),
        "SUBSCRIBE branch bypass is not gated on Response. \
         Non-terminal messages must NOT be forwarded to the driver channel."
    );
    assert!(
        arm.contains("start_relay_subscribe("),
        "SUBSCRIBE branch no longer calls start_relay_subscribe for relay \
         dispatch — restore it."
    );
    assert!(
        arm.contains("handle_unsubscribe_inbound("),
        "SUBSCRIBE branch must call handle_unsubscribe_inbound \
         for Unsubscribe wire messages."
    );
    let threads_upstream = arm.contains("effective_upstream") || arm.contains("upstream_addr");
    assert!(
        threads_upstream,
        "SUBSCRIBE relay dispatch must thread an effective upstream address \
         (source_addr or own_addr loopback) into the relay driver."
    );

    // Forbidden content; needles are built with `format!` rather than
    // written as one literal.
    let legacy_dispatch = format!("handle{}::<subscribe::SubscribeOp, _>", "_op_request");
    assert!(
        !arm.contains(&legacy_dispatch),
        "SUBSCRIBE branch must NOT call legacy state-machine dispatch"
    );
    let per_op_gate = format!("has{}_op", "_subscribe");
    assert!(
        !arm.contains(&per_op_gate),
        "SUBSCRIBE branch must NOT gate on per-op DashMap existence"
    );

    // Ordering: first bypass occurrence must come before first relay call.
    let bypass_at = arm
        .find("try_forward_driver_reply(")
        .expect("try_forward_driver_reply not found in SUBSCRIBE branch");
    let relay_at = arm
        .find("start_relay_subscribe(")
        .expect("start_relay_subscribe not found in SUBSCRIBE branch");
    assert!(
        bypass_at < relay_at,
        "SUBSCRIBE bypass (try_forward_driver_reply) must appear \
         BEFORE relay dispatch (start_relay_subscribe). Swapping order \
         would break the client-driver terminal-reply fast path."
    );
}
}
/// Unit tests for `fill_connect_response_acceptor_addr`: it should fill an
/// unknown acceptor address from the source address and leave everything
/// else alone.
mod fill_connect_response_acceptor_addr_tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use super::super::fill_connect_response_acceptor_addr;
    use crate::message::Transaction;
    use crate::operations::connect::{ConnectMsg, ConnectResponse};
    use crate::ring::{PeerAddr, PeerKeyLocation};

    /// A random peer whose address has been replaced with `PeerAddr::Unknown`.
    fn dummy_unknown_pkl() -> PeerKeyLocation {
        let PeerKeyLocation { pub_key, .. } = PeerKeyLocation::random();
        PeerKeyLocation {
            pub_key,
            peer_addr: PeerAddr::Unknown,
        }
    }

    /// The fixed socket address used as the "observed source" in these tests.
    fn known_addr() -> SocketAddr {
        let ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 5));
        SocketAddr::new(ip, 50051)
    }

    /// Wraps `acceptor` in a `ConnectMsg::Response` with a fresh transaction id.
    fn response_with(acceptor: PeerKeyLocation) -> ConnectMsg {
        ConnectMsg::Response {
            id: Transaction::new::<ConnectMsg>(),
            payload: ConnectResponse { acceptor },
        }
    }

    /// Extracts the acceptor from a `Response`, panicking on any other variant.
    fn response_acceptor(msg: ConnectMsg) -> PeerKeyLocation {
        // The wildcard arm is deliberate: every non-Response variant is a
        // test failure.
        #[allow(clippy::wildcard_enum_match_arm)]
        match msg {
            ConnectMsg::Response { payload, .. } => payload.acceptor,
            other => panic!("expected Response, got {other:?}"),
        }
    }

    #[test]
    fn fills_unknown_acceptor_addr_from_source_addr() {
        let source = known_addr();
        let filled =
            fill_connect_response_acceptor_addr(response_with(dummy_unknown_pkl()), Some(source));
        assert_eq!(response_acceptor(filled).socket_addr(), Some(source));
    }

    #[test]
    fn leaves_known_acceptor_addr_unchanged() {
        let original_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 0, 2, 7)), 12345);
        let PeerKeyLocation { pub_key, .. } = PeerKeyLocation::random();
        let acceptor = PeerKeyLocation {
            pub_key,
            peer_addr: PeerAddr::Known(original_addr),
        };
        let filled =
            fill_connect_response_acceptor_addr(response_with(acceptor), Some(known_addr()));
        assert_eq!(
            response_acceptor(filled).socket_addr(),
            Some(original_addr),
            "fill must NOT overwrite a known acceptor address"
        );
    }

    #[test]
    fn unknown_acceptor_without_source_addr_passes_through() {
        let filled = fill_connect_response_acceptor_addr(response_with(dummy_unknown_pkl()), None);
        let acceptor = response_acceptor(filled);
        assert!(
            acceptor.peer_addr.is_unknown(),
            "fill must remain Unknown when source_addr is None"
        );
    }

    #[test]
    fn rejected_variant_passes_through_untouched() {
        use crate::ring::Location;

        let id = Transaction::new::<ConnectMsg>();
        let dl = Location::new(0.42);
        let rejected = ConnectMsg::Rejected {
            id,
            desired_location: dl,
        };
        let filled = fill_connect_response_acceptor_addr(rejected, Some(known_addr()));
        // The wildcard arm is deliberate: anything but Rejected is a failure.
        #[allow(clippy::wildcard_enum_match_arm)]
        match filled {
            ConnectMsg::Rejected {
                id: returned_id,
                desired_location: returned_location,
            } => {
                assert_eq!(returned_id, id);
                assert_eq!(returned_location, dl);
            }
            other => panic!("expected Rejected, got {other:?}"),
        }
    }
}
/// Source-level guards for the CONNECT match arm in `node.rs`.
///
/// Each test slices the arm's text out of the embedded source and checks for
/// the presence (or absence) of specific substrings.
mod connect_bypass_coverage_guards {
    const SOURCE: &str = include_str!("node.rs");

    /// How many bytes before the `start_relay_connect(` call are searched for
    /// the conditions guarding it.
    const GATE_LOOKBACK_BYTES: usize = 500;

    /// Returns the text of the CONNECT arm: from its header up to (not
    /// including) the header of the PUT arm.
    fn connect_branch_window() -> &'static str {
        let branch_anchor = "NetMessageV1::Connect(ref op) => {";
        let branch_start = SOURCE.find(branch_anchor).expect(
            "Connect branch of handle_pure_network_message_v1 not found; \
             the match arm has been renamed or moved — update this guard",
        );
        let next_variant_anchor = "NetMessageV1::Put(ref op) => {";
        let window_end = SOURCE[branch_start..]
            .find(next_variant_anchor)
            .expect("end of Connect branch not found — update guard")
            + branch_start;
        &SOURCE[branch_start..window_end]
    }

    /// Returns roughly the last `GATE_LOOKBACK_BYTES` bytes of `text` that
    /// precede byte offset `end`.
    ///
    /// `end - GATE_LOOKBACK_BYTES` is an arbitrary byte offset and may fall
    /// inside a multi-byte UTF-8 character, where slicing a `str` panics. The
    /// start is therefore moved back to the nearest char boundary, so the
    /// window is never smaller than the requested look-back. `end` must itself
    /// be a char boundary (it is, when it comes from `str::find`).
    fn lookback_window(text: &str, end: usize) -> &str {
        let mut start = end.saturating_sub(GATE_LOOKBACK_BYTES);
        // Offset 0 is always a boundary, so this loop terminates.
        while !text.is_char_boundary(start) {
            start -= 1;
        }
        &text[start..end]
    }

    #[test]
    fn connect_branch_bypass_forwards_response() {
        assert!(
            connect_branch_window().contains("connect::ConnectMsg::Response { .. }"),
            "Connect bypass `matches!` no longer forwards Response. \
             Response is the joiner-fan-in terminal variant and MUST \
             reach the per-tx multi-reply receiver."
        );
    }

    #[test]
    fn connect_branch_bypass_forwards_rejected() {
        assert!(
            connect_branch_window().contains("connect::ConnectMsg::Rejected { .. }"),
            "Connect bypass `matches!` no longer forwards Rejected. \
             Relay drivers and the joiner driver both observe Rejected \
             to record connection failure / record_connection_failure."
        );
    }

    #[test]
    fn connect_branch_bypass_forwards_observed_address() {
        assert!(
            connect_branch_window().contains("connect::ConnectMsg::ObservedAddress { .. }"),
            "Connect bypass `matches!` no longer forwards \
             ObservedAddress. The joiner driver inbox owns the \
             set_own_addr / update_location side effect; dropping \
             ObservedAddress here breaks NAT discovery."
        );
    }

    #[test]
    fn connect_branch_bypass_forwards_connect_failed() {
        assert!(
            connect_branch_window().contains("connect::ConnectMsg::ConnectFailed { .. }"),
            "Connect bypass `matches!` no longer forwards \
             ConnectFailed. The relay driver inbox owns hole-punch \
             failure re-route; dropping ConnectFailed here strands \
             the re-route on legacy `process_message`."
        );
    }

    /// The bypass `matches!` block (from its opening to the first `) {`) must
    /// not mention the `Request` variant.
    #[test]
    fn connect_branch_bypass_does_not_forward_request() {
        let window = connect_branch_window();
        let bypass_anchor = "if matches!(\n op,";
        let bypass_start = window
            .find(bypass_anchor)
            .expect("bypass `matches!` block not found in Connect branch — guard outdated");
        let bypass_end = window[bypass_start..]
            .find(") {")
            .expect("bypass `matches!` block has no closing `) {`")
            + bypass_start;
        let bypass_block = &window[bypass_start..bypass_end];
        assert!(
            !bypass_block.contains("connect::ConnectMsg::Request"),
            "Connect bypass `matches!` MUST NOT forward Request. \
             Request is the spawn signal for start_relay_connect; \
             forwarding it would route fresh Requests into a multi-reply \
             receiver that doesn't exist yet."
        );
    }

    #[test]
    fn connect_branch_dispatches_start_relay_connect_for_fresh_request() {
        let window = connect_branch_window();
        assert!(
            window.contains("start_relay_connect("),
            "Connect branch no longer calls start_relay_connect — \
             removing it strands relay CONNECT on legacy."
        );
    }

    /// `source_addr` must appear in the text shortly before the relay
    /// dispatch call.
    #[test]
    fn connect_relay_dispatch_gated_on_source_addr() {
        let window = connect_branch_window();
        let dispatch_anchor = "start_relay_connect(";
        let dispatch_pos = window
            .find(dispatch_anchor)
            .expect("start_relay_connect not found in Connect branch");
        let gate_window = lookback_window(window, dispatch_pos);
        assert!(
            gate_window.contains("source_addr"),
            "CONNECT relay dispatch is not gated on source_addr — \
             originator loop-back must NOT spawn a relay driver."
        );
    }

    /// `active_relay_connect_txs` must appear in the text shortly before the
    /// relay dispatch call.
    #[test]
    fn connect_relay_dispatch_guarded_by_active_relay_set() {
        let window = connect_branch_window();
        let dispatch_pos = window
            .find("start_relay_connect(")
            .expect("start_relay_connect not found in Connect branch");
        let gate_window = lookback_window(window, dispatch_pos);
        assert!(
            gate_window.contains("active_relay_connect_txs"),
            "CONNECT relay dispatch is not guarded by \
             active_relay_connect_txs.contains(id). Without it, a \
             duplicate Request retransmission could spawn a second \
             driver before the first inserts into the dedup set."
        );
    }
}
/// Source-level guard for the `ResyncRequest` arm in `node.rs`: it must call
/// `update_peer_summary` with `None` as the summary argument.
mod resync_request_clears_sender_summary {
    const SOURCE: &str = include_str!("node.rs");

    /// Returns the text of the `ResyncRequest` arm: from its header up to
    /// (not including) the start of the `ResyncResponse` arm.
    fn resync_request_arm() -> &'static str {
        let start = SOURCE
            .find("InterestMessage::ResyncRequest { key } => {")
            .expect(
                "ResyncRequest arm of handle_interest_sync_message not found — \
                 the match arm has been renamed or moved; update this guard",
            );
        let from_arm = &SOURCE[start..];
        let arm_len = from_arm
            .find("InterestMessage::ResyncResponse {")
            .expect("end of ResyncRequest arm not found — update guard");
        &from_arm[..arm_len]
    }

    #[test]
    fn resync_request_handler_clears_cached_peer_summary() {
        let arm_text = resync_request_arm();
        assert!(
            arm_text.contains("update_peer_summary"),
            "ResyncRequest handler no longer calls update_peer_summary. \
             #4145 caching relies on this handler clearing the sender's \
             cached summary so a delta-apply failure forces a fresh \
             full-state resend instead of looping on unappliable deltas."
        );

        // Strip all whitespace so the check is insensitive to formatting.
        let squeezed = arm_text.split_whitespace().collect::<String>();
        assert!(
            squeezed.contains("update_peer_summary(&key,pk,None)"),
            "ResyncRequest handler must clear the cached summary with \
             `update_peer_summary(&key, pk, None)` (the `None` clears it). \
             Caching a summary here instead would defeat the #4145 backstop."
        );
    }
}
/// Tests for `ShutdownHandle::shutdown`: how long it waits on the in-flight
/// operation counter, when it flips the `shutting_down` flag, and that a
/// `NodeEvent::Disconnect` is always sent afterwards.
mod shutdown_drain {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// Builds a `ShutdownHandle` wired to test-owned state.
    ///
    /// Returns the handle, the shared in-flight counter (starting at
    /// `initial_count`), the shared `shutting_down` flag (starting `false`),
    /// and the receiving end of the handle's event channel.
    fn make_handle(
        initial_count: usize,
        drain_timeout: Duration,
    ) -> (
        ShutdownHandle,
        Arc<AtomicUsize>,
        Arc<std::sync::atomic::AtomicBool>,
        tokio::sync::mpsc::Receiver<NodeEvent>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let counter = Arc::new(AtomicUsize::new(initial_count));
        let gate = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let handle = ShutdownHandle {
            tx,
            inflight_client_ops: counter.clone(),
            shutting_down: gate.clone(),
            drain_timeout,
        };
        (handle, counter, gate, rx)
    }

    /// With nothing in flight, shutdown must not wait for the drain timeout.
    #[tokio::test]
    async fn shutdown_with_zero_ops_returns_immediately() {
        let (handle, _counter, _gate, mut rx) = make_handle(0, Duration::from_secs(60));
        let start = std::time::Instant::now();
        handle.shutdown().await;
        assert!(
            start.elapsed() < Duration::from_millis(100),
            "shutdown with zero in-flight ops should not sleep"
        );
        assert!(matches!(
            rx.recv().await.expect("Disconnect must be sent"),
            NodeEvent::Disconnect { .. }
        ));
    }

    /// When the counter never reaches zero, shutdown waits out the drain
    /// timeout and then still sends `Disconnect`.
    #[tokio::test]
    async fn shutdown_waits_then_proceeds_on_timeout() {
        let (handle, _counter, _gate, mut rx) = make_handle(1, Duration::from_millis(200));
        let start = std::time::Instant::now();
        handle.shutdown().await;
        let elapsed = start.elapsed();
        // 180ms rather than 200ms leaves slack below the configured timeout.
        assert!(
            elapsed >= Duration::from_millis(180),
            "shutdown should wait the full drain timeout when ops \
             never finish (elapsed: {elapsed:?})"
        );
        assert!(matches!(
            rx.recv()
                .await
                .expect("Disconnect must be sent even on drain timeout"),
            NodeEvent::Disconnect { .. }
        ));
    }

    /// When the counter drops to zero mid-drain, shutdown returns well before
    /// the drain timeout.
    #[tokio::test]
    async fn shutdown_proceeds_as_soon_as_counter_clears() {
        let (handle, counter, _gate, mut rx) = make_handle(1, Duration::from_secs(5));
        let counter_clone = counter.clone();
        // Simulate the single in-flight op finishing after 100ms.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            counter_clone.fetch_sub(1, Ordering::Relaxed);
        });
        let start = std::time::Instant::now();
        handle.shutdown().await;
        let elapsed = start.elapsed();
        assert!(
            elapsed >= Duration::from_millis(80) && elapsed < Duration::from_secs(2),
            "shutdown should return shortly after the counter clears, \
             not wait the full drain timeout (elapsed: {elapsed:?})"
        );
        assert!(matches!(
            rx.recv().await.expect("Disconnect must be sent"),
            NodeEvent::Disconnect { .. }
        ));
    }

    /// A zero drain timeout disables the wait even with ops in flight.
    #[tokio::test]
    async fn drain_disabled_skips_wait_even_with_ops_in_flight() {
        let (handle, _counter, _gate, mut rx) = make_handle(5, Duration::ZERO);
        let start = std::time::Instant::now();
        handle.shutdown().await;
        assert!(
            start.elapsed() < Duration::from_millis(50),
            "drain_timeout=0 must skip the wait"
        );
        assert!(matches!(
            rx.recv().await.expect("Disconnect must be sent"),
            NodeEvent::Disconnect { .. }
        ));
    }

    /// The `shutting_down` flag must already be set while shutdown is still
    /// waiting for in-flight ops to drain.
    #[tokio::test]
    async fn shutdown_closes_admission_gate_before_drain() {
        let (handle, counter, gate, mut rx) = make_handle(1, Duration::from_millis(500));
        // `shutting_down == false` means new ops are still admitted, i.e. the
        // gate is open until shutdown closes it.
        assert!(
            !gate.load(Ordering::Relaxed),
            "admission gate must start open"
        );
        let counter_clone = counter.clone();
        let gate_clone = gate.clone();
        // Observer: sample the flag 50ms into the drain, then release the
        // in-flight op so shutdown can finish.
        let observed_during_drain = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let g = gate_clone.load(Ordering::Relaxed);
            counter_clone.fetch_sub(1, Ordering::Relaxed);
            g
        });
        handle.shutdown().await;
        let gate_was_set_during_drain = observed_during_drain
            .await
            .expect("observer task must not panic");
        assert!(
            gate_was_set_during_drain,
            "shutdown() must flip the admission gate BEFORE the \
             drain wait, not after. Otherwise a new client op \
             spawned during the drain bypasses the gate, bumps \
             the counter (now unobserved), and gets cut off."
        );
        assert!(matches!(
            rx.recv().await.expect("Disconnect must be sent"),
            NodeEvent::Disconnect { .. }
        ));
    }
}
mod stale_sync_cap {
use super::super::{
MAX_STALE_SYNCS_PER_SUMMARIES, StaleSyncDisposition, count_stale_syncs_emitted,
emitted_indices_for_rotation, stale_sync_emit_budget,
};
use std::collections::HashSet;
// Short aliases for the `StaleSyncDisposition` variants, used to build the
// disposition lists fed to `count_stale_syncs_emitted` in the tests below.
const EMIT: StaleSyncDisposition = StaleSyncDisposition::Emit;
// Per the tests below, banned and no-state entries are expected to be skipped
// without consuming the emit budget.
const BANNED: StaleSyncDisposition = StaleSyncDisposition::Banned;
const NO_STATE: StaleSyncDisposition = StaleSyncDisposition::NoState;
/// `stale_sync_emit_budget` passes counts through up to the cap and clamps
/// everything above it to `MAX_STALE_SYNCS_PER_SUMMARIES`.
#[test]
fn emit_budget_caps_at_max() {
    let cap = MAX_STALE_SYNCS_PER_SUMMARIES;
    // (number of stale contracts, expected emit budget)
    let cases = [
        (0, 0),
        (1, 1),
        (cap - 1, cap - 1),
        (cap, cap),
        (cap + 1, cap),
        (cap * 100, cap),
    ];
    for (total_stale, expected) in cases {
        assert_eq!(
            stale_sync_emit_budget(total_stale),
            expected,
            "unexpected emit budget for {total_stale} stale contracts"
        );
    }
}
/// With far more emittable contracts than the cap, only the cap's worth of
/// syncs is counted as emitted.
#[test]
fn many_stale_contracts_emit_at_most_cap() {
    let n = MAX_STALE_SYNCS_PER_SUMMARIES * 100;
    let emitted = count_stale_syncs_emitted(&vec![EMIT; n]);
    assert_eq!(
        emitted, MAX_STALE_SYNCS_PER_SUMMARIES,
        "a peer diverging on {n} contracts must emit at most the cap \
         ({MAX_STALE_SYNCS_PER_SUMMARIES}) SyncStateToPeer events per \
         Summaries message, not one per contract"
    );
}
/// Exactly `MAX_STALE_SYNCS_PER_SUMMARIES` emittable entries are all emitted.
#[test]
fn exactly_cap_emits_all() {
    let at_cap = vec![EMIT; MAX_STALE_SYNCS_PER_SUMMARIES];
    let emitted = count_stale_syncs_emitted(&at_cap);
    assert_eq!(emitted, MAX_STALE_SYNCS_PER_SUMMARIES);
}
/// Below the cap every emittable entry is counted; an empty list counts zero.
#[test]
fn below_cap_emits_all_emittable() {
    // Empty input first: nothing to emit.
    assert_eq!(count_stale_syncs_emitted(&[]), 0);

    let a_few = vec![EMIT; 5];
    let emitted = count_stale_syncs_emitted(&a_few);
    assert_eq!(emitted, 5);
}
/// Ten skipped entries (alternating banned / no-state) ahead of three
/// emittable ones must not reduce the number of emissions.
#[test]
fn skips_do_not_consume_budget() {
    let mut dispositions = Vec::with_capacity(13);
    // BANNED, NO_STATE repeated five times: ten skipped entries in a row.
    for _ in 0..5 {
        dispositions.push(BANNED);
        dispositions.push(NO_STATE);
    }
    // Followed by the only three entries that should count.
    for _ in 0..3 {
        dispositions.push(EMIT);
    }

    let emitted = count_stale_syncs_emitted(&dispositions);
    assert_eq!(
        emitted,
        3,
        "banned/no-state contracts must be skipped without consuming \
         the emit budget"
    );
}
/// Alternating banned / emittable entries, with three times the cap of
/// emittable ones, must still stop at the cap.
#[test]
fn interleaved_skips_still_capped() {
    // BANNED, EMIT, BANNED, EMIT, ... for 3 * cap pairs.
    let dispositions: Vec<_> = (0..MAX_STALE_SYNCS_PER_SUMMARIES * 3)
        .flat_map(|_| [BANNED, EMIT])
        .collect();

    let emitted = count_stale_syncs_emitted(&dispositions);
    assert_eq!(
        emitted, MAX_STALE_SYNCS_PER_SUMMARIES,
        "emitted SyncStateToPeer events must be capped at \
         {MAX_STALE_SYNCS_PER_SUMMARIES} even with skips interleaved"
    );
}
/// Across all rotation starts, every index of an over-cap stale set is
/// emitted at least once, and each individual rotation emits exactly the cap.
#[test]
fn rotation_covers_every_contract_over_cap() {
    let total = MAX_STALE_SYNCS_PER_SUMMARIES * 3;

    // Union of the indices emitted over every possible rotation start.
    let covered: HashSet<_> = (0..total)
        .flat_map(|start| emitted_indices_for_rotation(total, start))
        .collect();
    assert_eq!(
        covered.len(),
        total,
        "every one of the {total} stale contracts must be reachable for \
         some rotation start — otherwise contracts past the cap are \
         permanently starved when the leading prefix stays stale"
    );

    // No single rotation may exceed (or fall short of) the cap.
    for start in 0..total {
        let window_len = emitted_indices_for_rotation(total, start).len();
        assert_eq!(window_len, MAX_STALE_SYNCS_PER_SUMMARIES);
    }
}
/// With one contract more than the cap, some rotation start must emit the
/// last index while leaving index 0 out.
#[test]
fn stuck_prefix_does_not_block_tail_under_rotation() {
    let total = MAX_STALE_SYNCS_PER_SUMMARIES + 1;
    let last = total - 1;

    // Stops at the first rotation start whose window has the tail but not the head.
    let found = (0..total).any(|start| {
        let window: HashSet<usize> = emitted_indices_for_rotation(total, start)
            .into_iter()
            .collect();
        window.contains(&last) && !window.contains(&0)
    });

    assert!(
        found,
        "there must be a rotation start that attempts the tail contract \
         ({last}) without attempting the (assumed-stuck) head contract \
         (0); otherwise a stuck head starves the tail"
    );
}
/// Source-pin test: searches the text of `node.rs` for the stale-contract
/// emission loop and asserts that the surrounding code still contains the
/// budget computation, the cap check, the emission counter, the cap constant,
/// the over-cap rotation, and the `GlobalRng` rotation offset.
///
/// NOTE(review): if `node.rs` is this very file, the needle literals below are
/// themselves part of the searched text. The checks then rely on the
/// production occurrences appearing earlier in the file than this test —
/// confirm if the test module is ever moved.
#[test]
fn stale_sync_loop_uses_emit_budget_pin() {
    const SOURCE: &str = include_str!("node.rs");

    // Start of the region of interest: the first emission-loop header.
    let loop_anchor = "for contract in stale_contracts {";
    let Some(loop_pos) = SOURCE.find(loop_anchor) else {
        panic!(
            "stale-contract emission loop not found; the `for contract in \
             stale_contracts` loop has been renamed or moved — update this \
             pin and re-verify the #3798 Gap 1 cap is still applied"
        );
    };

    // End of the region: the next loop header after the emission loop, or the
    // end of the file when that header is absent.
    let region_end = match SOURCE[loop_pos..].find("for (key, state_hash) in confirmed_states {") {
        Some(offset) => loop_pos + offset,
        None => SOURCE.len(),
    };

    // Widen the region backwards to the closest preceding budget declaration.
    let budget_decl = "let emit_budget = stale_sync_emit_budget(";
    let region_start = SOURCE[..loop_pos]
        .rfind(budget_decl)
        .expect("emit budget is not computed before the stale-sync loop");

    let region = &SOURCE[region_start..region_end];
    let has = |needle: &str| region.contains(needle);

    assert!(
        has("stale_sync_emit_budget("),
        "stale-sync loop no longer computes the emit budget — the \
         #3798 Gap 1 cap has been dropped"
    );
    assert!(
        has("if emitted >= emit_budget {"),
        "stale-sync loop no longer breaks when the emit budget is \
         reached — the #3798 Gap 1 cap is not enforced"
    );
    assert!(
        has("emitted += 1;"),
        "stale-sync loop no longer counts emissions against the budget \
         — the #3798 Gap 1 cap cannot be enforced without it"
    );
    assert!(
        has("MAX_STALE_SYNCS_PER_SUMMARIES"),
        "stale-sync cap warning no longer references the cap constant"
    );
    assert!(
        has("if total_stale > emit_budget {") && has("rotate_left("),
        "stale-sync loop no longer rotates the stale set when over the \
         cap — over-cap contracts can be permanently starved by a stuck \
         prefix (#3798 Gap 1 / #4468 codex P2)"
    );
    assert!(
        has("GlobalRng::random_range("),
        "stale-sync rotation offset is no longer drawn from GlobalRng — \
         a fixed/non-random rotation does not avoid starvation and \
         breaks simulation determinism"
    );
}
}
}