use anyhow::Context;
use freenet_stdlib::{
client_api::{ClientRequest, ErrorKind},
prelude::ContractInstanceId,
};
use std::{
borrow::Cow,
fs::File,
io::Read,
net::{IpAddr, SocketAddr, ToSocketAddrs},
sync::Arc,
time::Duration,
};
use std::{collections::HashSet, convert::Infallible};
use self::p2p_impl::NodeP2P;
use crate::{
client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest},
config::{Address, GatewayConfig, WebsocketApiConfig},
contract::{ExecutorError, NetworkContractHandler},
local_node::Executor,
message::{InnerMessage, NetMessage, NodeEvent, Transaction, TransactionType},
operations::{OpError, connect, get, put, subscribe, update},
ring::{Location, PeerKeyLocation},
tracing::{EventRegister, NetEventLog, NetEventRegister},
};
use crate::{
config::Config,
message::{MessageStats, NetMessageV1},
};
use freenet_stdlib::client_api::DelegateRequest;
use serde::{Deserialize, Serialize};
pub(crate) use network_bridge::{
ConnectionError, EventLoopNotificationsSender, NetworkBridge, OpExecutionPayload,
};
#[cfg(test)]
pub(crate) use network_bridge::{EventLoopNotificationsReceiver, event_loop_notification_channel};
pub use network_bridge::{EventLoopExitReason, NetworkStats, reset_channel_id_counter};
use crate::topology::rate::Rate;
use crate::transport::{TransportKeypair, TransportPublicKey};
pub(crate) use op_state_manager::OpManager;
mod network_bridge;
pub use network_bridge::in_memory::{FaultInjectorState, get_fault_injector, set_fault_injector};
pub(crate) mod background_task_monitor;
pub(crate) mod neighbor_hosting;
pub(crate) mod network_status;
mod op_state_manager;
mod p2p_impl;
mod request_router;
pub(crate) mod testing_impl;
pub use request_router::{DeduplicatedRequest, RequestRouter};
#[derive(Clone)]
pub struct ShutdownHandle {
tx: tokio::sync::mpsc::Sender<NodeEvent>,
}
impl ShutdownHandle {
pub async fn shutdown(&self) {
if let Err(err) = self
.tx
.send(NodeEvent::Disconnect {
cause: Some("graceful shutdown".into()),
})
.await
{
tracing::debug!(
error = %err,
"failed to send graceful shutdown signal; shutdown channel may already be closed"
);
}
}
}
pub struct Node {
inner: NodeP2P,
shutdown_handle: ShutdownHandle,
}
impl Node {
pub fn update_location(&mut self, location: Location) {
self.inner
.op_manager
.ring
.connection_manager
.update_location(Some(location));
}
pub fn shutdown_handle(&self) -> ShutdownHandle {
self.shutdown_handle.clone()
}
pub async fn run(self) -> anyhow::Result<Infallible> {
self.inner.run_node().await
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[non_exhaustive] pub struct NodeConfig {
pub should_connect: bool,
pub is_gateway: bool,
pub key_pair: TransportKeypair,
pub network_listener_ip: IpAddr,
pub network_listener_port: u16,
pub(crate) own_addr: Option<SocketAddr>,
pub(crate) config: Arc<Config>,
pub(crate) gateways: Vec<InitPeerNode>,
pub(crate) location: Option<Location>,
pub(crate) max_hops_to_live: Option<usize>,
pub(crate) rnd_if_htl_above: Option<usize>,
pub(crate) max_number_conn: Option<usize>,
pub(crate) min_number_conn: Option<usize>,
pub(crate) max_upstream_bandwidth: Option<Rate>,
pub(crate) max_downstream_bandwidth: Option<Rate>,
pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
pub(crate) transient_budget: usize,
pub(crate) transient_ttl: Duration,
#[serde(default)]
pub(crate) relay_ready_connections: Option<usize>,
}
impl NodeConfig {
pub async fn new(config: Config) -> anyhow::Result<NodeConfig> {
tracing::info!("Loading node configuration for mode {}", config.mode);
let own_pub_key = config.transport_keypair().public();
let mut gateways = Vec::with_capacity(config.gateways.len());
for gw in &config.gateways {
let GatewayConfig {
address,
public_key_path,
location,
} = gw;
let mut key_bytes = None;
for attempt in 0..10 {
let mut key_file = File::open(public_key_path).with_context(|| {
format!("failed loading gateway pubkey from {public_key_path:?}")
})?;
let mut buf = String::new();
key_file.read_to_string(&mut buf)?;
let buf = buf.trim();
if buf.starts_with("-----BEGIN") {
if attempt < 9 {
tracing::debug!(
public_key_path = ?public_key_path,
attempt = attempt + 1,
"Gateway public key is still RSA PEM format, waiting for X25519 conversion..."
);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
} else {
tracing::warn!(
public_key_path = ?public_key_path,
"Gateway public key still in RSA PEM format after 5s. Skipping this gateway."
);
break;
}
}
match hex::decode(buf) {
Ok(bytes) if bytes.len() == 32 => {
key_bytes = Some(bytes);
break;
}
Ok(bytes) => {
anyhow::bail!(
"invalid gateway pubkey length {} (expected 32) from {public_key_path:?}",
bytes.len()
);
}
Err(e) => {
anyhow::bail!(
"failed to decode gateway pubkey hex from {public_key_path:?}: {e}"
);
}
}
}
let key_bytes = match key_bytes {
Some(bytes) => bytes,
None => continue, };
let mut key_arr = [0u8; 32];
key_arr.copy_from_slice(&key_bytes);
let transport_pub_key = TransportPublicKey::from_bytes(key_arr);
if &transport_pub_key == own_pub_key {
tracing::warn!(
"Skipping gateway with same public key as self: {:?}",
public_key_path
);
continue;
}
let address = Self::parse_socket_addr(address).await?;
let peer_key_location = PeerKeyLocation::new(transport_pub_key, address);
let location = location
.map(Location::new)
.unwrap_or_else(|| Location::from_address(&address));
gateways.push(InitPeerNode::new(peer_key_location, location));
}
tracing::info!(
"Node will be listening at {}:{} internal address",
config.network_api.address,
config.network_api.port
);
if let Some(own_addr) = &config.peer_id {
tracing::info!("Node external address: {}", own_addr.socket_addr());
}
Ok(NodeConfig {
should_connect: true,
is_gateway: config.is_gateway,
key_pair: config.transport_keypair().clone(),
gateways,
own_addr: config.peer_id.clone().map(|p| p.socket_addr()),
network_listener_ip: config.network_api.address,
network_listener_port: config.network_api.port,
location: config.location.map(Location::new),
config: Arc::new(config.clone()),
max_hops_to_live: None,
rnd_if_htl_above: None,
max_number_conn: Some(config.network_api.max_connections),
min_number_conn: Some(config.network_api.min_connections),
max_upstream_bandwidth: None,
max_downstream_bandwidth: None,
blocked_addresses: config.network_api.blocked_addresses.clone(),
transient_budget: config.network_api.transient_budget,
transient_ttl: Duration::from_secs(config.network_api.transient_ttl_secs),
relay_ready_connections: if config.network_api.skip_load_from_network {
Some(0) } else {
Some(3) },
})
}
pub(crate) async fn parse_socket_addr(address: &Address) -> anyhow::Result<SocketAddr> {
let (hostname, port) = match address {
crate::config::Address::Hostname(hostname) => {
match hostname.rsplit_once(':') {
None => {
let hostname_with_port =
format!("{}:{}", hostname, crate::config::default_network_api_port());
if let Ok(mut addrs) = hostname_with_port.to_socket_addrs() {
if let Some(addr) = addrs.next() {
return Ok(addr);
}
}
(Cow::Borrowed(hostname.as_str()), None)
}
Some((host, port)) => match port.parse::<u16>() {
Ok(port) => {
if let Ok(mut addrs) = hostname.to_socket_addrs() {
if let Some(addr) = addrs.next() {
return Ok(addr);
}
}
(Cow::Borrowed(host), Some(port))
}
Err(_) => return Err(anyhow::anyhow!("Invalid port number: {port}")),
},
}
}
Address::HostAddress(addr) => return Ok(*addr),
};
let (conf, opts) = hickory_resolver::system_conf::read_system_conf()?;
let resolver = hickory_resolver::TokioAsyncResolver::new(
conf,
opts,
hickory_resolver::name_server::GenericConnector::new(
hickory_resolver::name_server::TokioRuntimeProvider::new(),
),
);
let hostname = if hostname.ends_with('.') {
hostname
} else {
Cow::Owned(format!("{hostname}."))
};
let ips = resolver.lookup_ip(hostname.as_ref()).await?;
match ips.into_iter().next() {
Some(ip) => Ok(SocketAddr::new(
ip,
port.unwrap_or_else(crate::config::default_network_api_port),
)),
None => Err(anyhow::anyhow!("Fail to resolve IP address of {hostname}")),
}
}
pub fn config(&self) -> &Config {
&self.config
}
pub fn is_gateway(&mut self) -> &mut Self {
self.is_gateway = true;
self
}
pub fn first_gateway(&mut self) {
self.should_connect = false;
}
pub fn with_should_connect(&mut self, should_connect: bool) -> &mut Self {
self.should_connect = should_connect;
self
}
pub fn max_hops_to_live(&mut self, num_hops: usize) -> &mut Self {
self.max_hops_to_live = Some(num_hops);
self
}
pub fn rnd_if_htl_above(&mut self, num_hops: usize) -> &mut Self {
self.rnd_if_htl_above = Some(num_hops);
self
}
pub fn max_number_of_connections(&mut self, num: usize) -> &mut Self {
self.max_number_conn = Some(num);
self
}
pub fn min_number_of_connections(&mut self, num: usize) -> &mut Self {
self.min_number_conn = Some(num);
self
}
pub fn relay_ready_connections(&mut self, num: Option<usize>) -> &mut Self {
self.relay_ready_connections = num;
self
}
pub fn with_own_addr(&mut self, addr: SocketAddr) -> &mut Self {
self.own_addr = Some(addr);
self
}
pub fn with_location(&mut self, loc: Location) -> &mut Self {
self.location = Some(loc);
self
}
pub fn add_gateway(&mut self, peer: InitPeerNode) -> &mut Self {
self.gateways.push(peer);
self
}
pub async fn build<const CLIENTS: usize>(
self,
clients: [BoxedClient; CLIENTS],
) -> anyhow::Result<Node> {
let (node, _flush_handle) = self.build_with_flush_handle(clients).await?;
Ok(node)
}
pub async fn build_with_flush_handle<const CLIENTS: usize>(
self,
clients: [BoxedClient; CLIENTS],
) -> anyhow::Result<(Node, crate::tracing::EventFlushHandle)> {
let (event_register, flush_handle) = {
use super::tracing::{DynamicRegister, TelemetryReporter};
let event_reg = EventRegister::new(self.config.event_log());
let flush_handle = event_reg.flush_handle();
let mut registers: Vec<Box<dyn NetEventRegister>> = vec![Box::new(event_reg)];
#[cfg(feature = "trace-ot")]
{
use super::tracing::OTEventRegister;
registers.push(Box::new(OTEventRegister::new()));
}
if let Some(telemetry) = TelemetryReporter::new(&self.config.telemetry) {
registers.push(Box::new(telemetry));
}
(DynamicRegister::new(registers), flush_handle)
};
let cfg = self.config.clone();
let (node_inner, shutdown_tx) = NodeP2P::build::<NetworkContractHandler, CLIENTS, _>(
self,
clients,
event_register,
cfg,
)
.await?;
let shutdown_handle = ShutdownHandle { tx: shutdown_tx };
Ok((
Node {
inner: node_inner,
shutdown_handle,
},
flush_handle,
))
}
pub fn get_own_addr(&self) -> Option<SocketAddr> {
self.own_addr
}
fn get_gateways(&self) -> anyhow::Result<Vec<PeerKeyLocation>> {
let gateways: Vec<PeerKeyLocation> = self
.gateways
.iter()
.map(|node| node.peer_key_location.clone())
.collect();
if !self.is_gateway && gateways.is_empty() {
anyhow::bail!(
"At least one remote gateway is required to join an existing network for non-gateway nodes."
)
} else {
Ok(gateways)
}
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct InitPeerNode {
peer_key_location: PeerKeyLocation,
location: Location,
}
impl InitPeerNode {
pub fn new(peer_key_location: PeerKeyLocation, location: Location) -> Self {
Self {
peer_key_location,
location,
}
}
}
async fn report_result(
tx: Option<Transaction>,
op_result: Result<(), OpError>,
op_manager: &OpManager,
_event_listener: &mut dyn NetEventRegister,
) {
if let Some(tx_id) = tx {
if matches!(tx_id.transaction_type(), TransactionType::Update) {
tracing::debug!("report_result called for UPDATE transaction {}", tx_id);
}
}
match op_result {
Ok(()) => {
tracing::debug!(?tx, "Network message dispatch finished");
}
Err(err) => {
if let Some(tx) = tx {
if !tx.is_sub_operation() {
let client_error = freenet_stdlib::client_api::ClientError::from(
freenet_stdlib::client_api::ErrorKind::OperationError {
cause: err.to_string().into(),
},
);
op_manager.send_client_result(tx, Err(client_error));
}
op_manager.completed(tx);
}
#[cfg(any(debug_assertions, test))]
{
use std::io::Write;
#[cfg(debug_assertions)]
let OpError::InvalidStateTransition { tx, state, trace } = err else {
tracing::error!("Finished transaction with error: {err}");
return;
};
#[cfg(not(debug_assertions))]
let OpError::InvalidStateTransition { tx } = err else {
tracing::error!("Finished transaction with error: {err}");
return;
};
#[cfg(debug_assertions)]
let trace = format!("{trace}");
#[cfg(debug_assertions)]
{
let mut tr_lines = trace.lines();
let trace = tr_lines
.nth(2)
.map(|second_trace| {
let second_trace_lines =
[second_trace, tr_lines.next().unwrap_or_default()];
second_trace_lines.join("\n")
})
.unwrap_or_default();
let peer = op_manager.ring.connection_manager.own_location();
let log = format!(
"Transaction ({tx} @ {peer}) error trace:\n {trace} \nstate:\n {state:?}\n"
);
std::io::stderr().write_all(log.as_bytes()).unwrap();
}
#[cfg(not(debug_assertions))]
{
let peer = op_manager.ring.connection_manager.own_location();
let log = format!("Transaction ({tx} @ {peer}) error\n");
std::io::stderr().write_all(log.as_bytes()).unwrap();
}
}
#[cfg(not(any(debug_assertions, test)))]
{
tracing::debug!("Finished transaction with error: {err}");
}
}
}
}
pub(crate) async fn process_message_decoupled<CB>(
msg: NetMessage,
source_addr: Option<std::net::SocketAddr>,
op_manager: Arc<OpManager>,
conn_manager: CB,
mut event_listener: Box<dyn NetEventRegister>,
pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
) where
CB: NetworkBridge + Clone + 'static,
{
let tx = *msg.id();
let op_result = handle_pure_network_message(
msg,
source_addr,
op_manager.clone(),
conn_manager,
event_listener.as_mut(),
pending_op_result,
)
.await;
report_result(Some(tx), op_result, &op_manager, &mut *event_listener).await;
}
#[allow(clippy::too_many_arguments)]
async fn handle_pure_network_message<CB>(
msg: NetMessage,
source_addr: Option<std::net::SocketAddr>,
op_manager: Arc<OpManager>,
conn_manager: CB,
event_listener: &mut dyn NetEventRegister,
pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
) -> Result<(), crate::node::OpError>
where
CB: NetworkBridge + Clone + 'static,
{
match msg {
NetMessage::V1(msg_v1) => {
handle_pure_network_message_v1(
msg_v1,
source_addr,
op_manager,
conn_manager,
event_listener,
pending_op_result,
)
.await
}
}
}
fn try_forward_task_per_tx_reply(
pending_op_result: Option<&tokio::sync::mpsc::Sender<NetMessage>>,
reply: NetMessage,
op_label: &'static str,
) -> bool {
let Some(callback) = pending_op_result else {
return false;
};
let tx_id = *reply.id();
if let Err(err) = callback.try_send(reply) {
tracing::error!(
%err,
%tx_id,
op = op_label,
"Failed to forward task-per-tx reply to OpCtx task"
);
}
true
}
fn fill_connect_response_acceptor_addr(
op: connect::ConnectMsg,
source_addr: Option<std::net::SocketAddr>,
) -> connect::ConnectMsg {
#[allow(clippy::wildcard_enum_match_arm)]
match op {
connect::ConnectMsg::Response { id, mut payload } => {
if payload.acceptor.peer_addr.is_unknown() {
if let Some(addr) = source_addr {
payload.acceptor.peer_addr = crate::ring::PeerAddr::Known(addr);
tracing::debug!(
acceptor_pub_key = %payload.acceptor.pub_key(),
acceptor_addr = %addr,
"connect bypass: filled acceptor address from source_addr"
);
} else {
tracing::warn!(
acceptor_pub_key = %payload.acceptor.pub_key(),
"connect bypass: response received without source_addr, cannot fill acceptor address"
);
}
}
connect::ConnectMsg::Response { id, payload }
}
other => other,
}
}
#[allow(clippy::too_many_arguments, clippy::needless_return)]
async fn handle_pure_network_message_v1<CB>(
msg: NetMessageV1,
source_addr: Option<std::net::SocketAddr>,
op_manager: Arc<OpManager>,
conn_manager: CB,
event_listener: &mut dyn NetEventRegister,
pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
) -> Result<(), crate::node::OpError>
where
CB: NetworkBridge + Clone + 'static,
{
event_listener
.register_events(NetEventLog::from_inbound_msg_v1(
&msg,
&op_manager,
source_addr,
))
.await;
let tx = Some(*msg.id());
tracing::debug!(?tx, "Processing pure network operation");
match msg {
NetMessageV1::Connect(ref op) => {
if matches!(
op,
connect::ConnectMsg::Response { .. }
| connect::ConnectMsg::Rejected { .. }
| connect::ConnectMsg::ObservedAddress { .. }
| connect::ConnectMsg::ConnectFailed { .. }
) {
let forwarded_op = fill_connect_response_acceptor_addr(op.clone(), source_addr);
if try_forward_task_per_tx_reply(
pending_op_result.as_ref(),
NetMessage::V1(NetMessageV1::Connect(forwarded_op)),
"connect",
) {
return Ok(());
}
}
if let connect::ConnectMsg::Request { id, payload } = op {
if let Some(upstream_addr) = source_addr {
if !op_manager.active_relay_connect_txs.contains(id) {
if let Err(err) = connect::op_ctx_task::start_relay_connect(
op_manager.clone(),
*id,
payload.clone(),
upstream_addr,
)
.await
{
tracing::error!(
tx = %id,
%upstream_addr,
error = %err,
"CONNECT relay dispatch: start_relay_connect failed"
);
}
} else {
tracing::debug!(
tx = %id,
%upstream_addr,
"CONNECT: duplicate Request, relay driver already running"
);
}
} else {
tracing::debug!(
tx = %id,
"CONNECT: Request without source_addr ignored (no legacy joiner path)"
);
}
} else {
tracing::debug!(
tx = %op.id(),
?op,
"CONNECT: non-Request variant ignored \
(Response/Rejected/ObservedAddress/ConnectFailed already handled by bypass)"
);
}
return Ok(());
}
NetMessageV1::Put(ref op) => {
if matches!(
op,
put::PutMsg::Response { .. } | put::PutMsg::ResponseStreaming { .. }
) && try_forward_task_per_tx_reply(
pending_op_result.as_ref(),
NetMessage::V1(NetMessageV1::Put((*op).clone())),
"put",
) {
return Ok(());
}
let effective_upstream =
source_addr.or_else(|| op_manager.ring.connection_manager.get_own_addr());
if let Some(upstream_addr) = effective_upstream {
#[allow(clippy::wildcard_enum_match_arm)]
match op {
put::PutMsg::Request {
id,
contract,
related_contracts,
value,
htl,
skip_list,
} => {
if let Err(err) = put::op_ctx_task::start_relay_put(
op_manager.clone(),
conn_manager.clone(),
*id,
contract.clone(),
related_contracts.clone(),
value.clone(),
*htl,
skip_list.clone(),
upstream_addr,
)
.await
{
tracing::error!(
tx = %id,
contract = %contract.key(),
error = %err,
"PUT relay dispatch: start_relay_put failed"
);
}
}
put::PutMsg::RequestStreaming {
id,
stream_id,
contract_key,
total_size,
htl,
skip_list,
subscribe,
} => {
if let Err(err) = put::op_ctx_task::start_relay_put_streaming(
op_manager.clone(),
conn_manager.clone(),
*id,
*stream_id,
*contract_key,
*total_size,
*htl,
skip_list.clone(),
*subscribe,
upstream_addr,
)
.await
{
tracing::error!(
tx = %id,
contract = %contract_key,
error = %err,
"PUT relay dispatch: start_relay_put_streaming failed"
);
}
}
_ => {
tracing::debug!(
tx = %op.id(),
?op,
"PUT: non-dispatch variant ignored \
(Response/ResponseStreaming already handled \
by bypass; ForwardingAck is no-op)"
);
}
}
} else {
tracing::debug!(
tx = %op.id(),
?op,
"PUT: no own_addr available — pre-handshake \
message ignored"
);
}
return Ok(());
}
NetMessageV1::Get(ref op) => {
if matches!(
op,
get::GetMsg::Response { .. } | get::GetMsg::ResponseStreaming { .. }
) && try_forward_task_per_tx_reply(
pending_op_result.as_ref(),
NetMessage::V1(NetMessageV1::Get((*op).clone())),
"get",
) {
return Ok(());
}
let effective_upstream =
source_addr.or_else(|| op_manager.ring.connection_manager.get_own_addr());
if let Some(upstream_addr) = effective_upstream {
#[allow(clippy::wildcard_enum_match_arm)]
match op {
get::GetMsg::Request {
id,
instance_id,
fetch_contract,
htl,
visited,
subscribe,
} => {
if let Err(err) = get::op_ctx_task::start_relay_get(
op_manager.clone(),
*id,
*instance_id,
*htl,
upstream_addr,
visited.clone(),
*fetch_contract,
*subscribe,
)
.await
{
tracing::error!(
tx = %id,
%instance_id,
error = %err,
"GET relay dispatch: start_relay_get failed"
);
}
}
_ => {
tracing::debug!(
tx = %op.id(),
?op,
"GET: non-dispatch variant ignored \
(Response/ResponseStreaming already handled \
by bypass; ForwardingAck is no-op; \
ResponseStreamingAck handled by stream layer)"
);
}
}
} else {
tracing::debug!(
tx = %op.id(),
?op,
"GET: no own_addr available — pre-handshake \
message ignored"
);
}
return Ok(());
}
NetMessageV1::Update(ref op) => {
if let Some(sender_addr) = source_addr {
#[allow(clippy::wildcard_enum_match_arm)]
match op {
update::UpdateMsg::RequestUpdate {
id,
key,
related_contracts,
value,
} => {
if let Err(err) = update::op_ctx_task::start_relay_request_update(
op_manager.clone(),
*id,
*key,
related_contracts.clone(),
value.clone(),
sender_addr,
)
.await
{
tracing::error!(
tx = %id,
%key,
error = %err,
"UPDATE relay dispatch: start_relay_request_update failed"
);
}
return Ok(());
}
update::UpdateMsg::BroadcastTo {
id,
key,
payload,
sender_summary_bytes,
} => {
if let Err(err) = update::op_ctx_task::start_relay_broadcast_to(
op_manager.clone(),
*id,
*key,
payload.clone(),
sender_summary_bytes.clone(),
sender_addr,
)
.await
{
tracing::error!(
tx = %id,
%key,
error = %err,
"UPDATE relay dispatch: start_relay_broadcast_to failed"
);
}
return Ok(());
}
update::UpdateMsg::RequestUpdateStreaming {
id,
key,
stream_id,
total_size,
} => {
if let Err(err) = update::op_ctx_task::start_relay_request_update_streaming(
op_manager.clone(),
*id,
*key,
*stream_id,
*total_size,
sender_addr,
)
.await
{
tracing::error!(
tx = %id,
%key,
error = %err,
"UPDATE relay dispatch: start_relay_request_update_streaming failed"
);
}
return Ok(());
}
update::UpdateMsg::BroadcastToStreaming {
id,
key,
stream_id,
total_size,
} => {
if let Err(err) = update::op_ctx_task::start_relay_broadcast_to_streaming(
op_manager.clone(),
*id,
*key,
*stream_id,
*total_size,
sender_addr,
)
.await
{
tracing::error!(
tx = %id,
%key,
error = %err,
"UPDATE relay dispatch: start_relay_broadcast_to_streaming failed"
);
}
return Ok(());
}
}
} else {
tracing::debug!(
tx = %op.id(),
?op,
"UPDATE: internal-source variant ignored"
);
}
return Ok(());
}
NetMessageV1::Subscribe(ref op) => {
if matches!(op, subscribe::SubscribeMsg::Response { .. })
&& try_forward_task_per_tx_reply(
pending_op_result.as_ref(),
NetMessage::V1(NetMessageV1::Subscribe((*op).clone())),
"subscribe",
)
{
return Ok(());
}
let effective_upstream =
source_addr.or_else(|| op_manager.ring.connection_manager.get_own_addr());
if let Some(upstream_addr) = effective_upstream {
#[allow(clippy::wildcard_enum_match_arm)]
match op {
subscribe::SubscribeMsg::Request {
id,
instance_id,
htl,
visited,
is_renewal,
} => {
if let Err(err) = subscribe::op_ctx_task::start_relay_subscribe(
op_manager.clone(),
*id,
*instance_id,
*htl,
visited.clone(),
*is_renewal,
upstream_addr,
)
.await
{
tracing::error!(
tx = %id,
%instance_id,
error = %err,
"SUBSCRIBE relay dispatch: start_relay_subscribe failed"
);
}
}
subscribe::SubscribeMsg::Unsubscribe { id, instance_id } => {
subscribe::handle_unsubscribe_inbound(
&op_manager,
*id,
*instance_id,
source_addr,
)
.await;
}
_ => {
tracing::debug!(
tx = %op.id(),
?op,
"SUBSCRIBE: non-dispatch variant ignored \
(Response already handled by bypass; \
ForwardingAck is no-op)"
);
}
}
} else {
tracing::debug!(
tx = %op.id(),
?op,
"SUBSCRIBE: no own_addr available — pre-handshake \
message ignored"
);
}
return Ok(());
}
NetMessageV1::NeighborHosting { ref message } => {
let Some(source) = source_addr else {
tracing::warn!(
"Received NeighborHosting message without source address (pure network)"
);
return Ok(());
};
tracing::debug!(
from = %source,
"Processing NeighborHosting message (pure network)"
);
let source_pub_key = op_manager
.ring
.connection_manager
.get_peer_by_addr(source)
.map(|pkl| pkl.pub_key().clone());
let Some(source_pub_key) = source_pub_key else {
tracing::debug!(
%source,
"NeighborHosting: could not resolve source addr to pub_key, skipping"
);
return Ok(());
};
let result = op_manager
.neighbor_hosting
.handle_message(&source_pub_key, message.clone());
if let Some(response) = result.response {
let response_msg =
NetMessage::V1(NetMessageV1::NeighborHosting { message: response });
if let Err(err) = conn_manager.send(source, response_msg).await {
tracing::error!(%err, %source, "Failed to send NeighborHosting response");
}
}
for instance_id in result.overlapping_contracts {
if let Some((key, state)) =
get_contract_state_by_id(&op_manager, &instance_id).await
{
if !op_manager.ring.is_receiving_updates(&key)
&& !op_manager.ring.has_downstream_subscribers(&key)
{
continue;
}
tracing::debug!(
contract = %key,
peer = %source_pub_key,
"Proximity cache overlap — syncing state to neighbor"
);
if let Err(e) = op_manager
.notify_node_event(NodeEvent::SyncStateToPeer {
key,
new_state: state,
target: source,
})
.await
{
tracing::warn!(
contract = %instance_id,
error = %e,
"Failed to emit SyncStateToPeer for proximity sync"
);
}
}
}
return Ok(());
}
NetMessageV1::InterestSync { ref message } => {
let Some(source) = source_addr else {
tracing::warn!("Received InterestSync message without source address");
return Ok(());
};
tracing::debug!(
from = %source,
"Processing InterestSync message"
);
if let Some(response) =
handle_interest_sync_message(&op_manager, source, message.clone()).await
{
let response_msg = NetMessage::V1(NetMessageV1::InterestSync { message: response });
if let Err(err) = conn_manager.send(source, response_msg).await {
tracing::error!(%err, %source, "Failed to send InterestSync response");
}
}
return Ok(());
}
NetMessageV1::ReadyState { ready } => {
let Some(source) = source_addr else {
tracing::warn!("Received ReadyState message without source address");
return Ok(());
};
if ready {
op_manager.ring.connection_manager.mark_peer_ready(source);
} else {
op_manager
.ring
.connection_manager
.mark_peer_not_ready(source);
}
tracing::debug!(
from = %source,
ready,
"Processed ReadyState from peer"
);
return Ok(());
}
NetMessageV1::Aborted(tx) => {
tracing::debug!(
%tx,
tx_type = ?tx.transaction_type(),
"Received Aborted message — driver owns cancellation, ignoring"
);
Ok(())
}
}
}
async fn handle_interest_sync_message(
op_manager: &Arc<OpManager>,
source: std::net::SocketAddr,
message: crate::message::InterestMessage,
) -> Option<crate::message::InterestMessage> {
use crate::message::{InterestMessage, NodeEvent, SummaryEntry};
use crate::ring::interest::contract_hash;
match message {
InterestMessage::Interests { hashes } => {
tracing::debug!(
from = %source,
hash_count = hashes.len(),
"Received Interests message"
);
let peer_key = get_peer_key_from_addr(op_manager, source);
if let Some(ref pk) = peer_key {
let incoming_hashes: std::collections::HashSet<u32> =
hashes.iter().copied().collect();
let current_contracts = op_manager.interest_manager.get_contracts_for_peer(pk);
let mut removed = 0usize;
for contract in ¤t_contracts {
let h = contract_hash(contract);
if !incoming_hashes.contains(&h) {
op_manager
.interest_manager
.remove_peer_interest(contract, pk);
removed += 1;
}
}
if removed > 0 {
tracing::debug!(
from = %source,
removed,
"Full-replace: removed stale interest entries"
);
}
}
let matching = op_manager.interest_manager.get_matching_contracts(&hashes);
let mut entries = Vec::with_capacity(matching.len());
for contract in matching {
let hash = contract_hash(&contract);
let summary = get_contract_summary(op_manager, &contract).await;
entries.push(SummaryEntry::from_summary(hash, summary.as_ref()));
if let Some(ref pk) = peer_key {
if op_manager
.interest_manager
.get_peer_interest(&contract, pk)
.is_some()
{
op_manager
.interest_manager
.refresh_peer_interest(&contract, pk);
} else {
op_manager.interest_manager.register_peer_interest(
&contract,
pk.clone(),
None, false,
);
}
}
}
if entries.is_empty() {
None
} else {
Some(InterestMessage::Summaries { entries })
}
}
InterestMessage::Summaries { entries } => {
tracing::debug!(
from = %source,
entry_count = entries.len(),
"Received Summaries message"
);
let peer_key = get_peer_key_from_addr(op_manager, source);
let mut stale_contracts = Vec::new();
let emit_confirmed = crate::config::SimulationIdleTimeout::is_enabled();
let mut confirmed_states: Vec<(freenet_stdlib::prelude::ContractKey, String)> =
Vec::new();
if let Some(pk) = peer_key {
for entry in entries {
for contract in op_manager.interest_manager.lookup_by_hash(entry.hash) {
if !op_manager.interest_manager.has_local_interest(&contract) {
continue;
}
let their_summary = entry.to_summary();
let our_summary = get_contract_summary(op_manager, &contract).await;
if emit_confirmed {
if let Some(ref summary) = our_summary {
confirmed_states.push((contract, hex::encode(summary.as_ref())));
}
}
let is_stale = our_summary
.as_ref()
.zip(their_summary.as_ref())
.is_some_and(|(ours, theirs)| ours.as_ref() != theirs.as_ref());
op_manager.interest_manager.update_peer_summary(
&contract,
&pk,
their_summary,
);
if is_stale && !stale_contracts.contains(&contract) {
stale_contracts.push(contract);
}
}
}
}
for contract in stale_contracts {
let Some(state) = get_contract_state(op_manager, &contract).await else {
tracing::trace!(
contract = %contract,
"Skipping stale-peer sync — no local state available"
);
continue;
};
tracing::info!(
contract = %contract,
stale_peer = %source,
"Summary mismatch in interest sync — syncing state to stale peer"
);
if let Err(e) = op_manager
.notify_node_event(NodeEvent::SyncStateToPeer {
key: contract,
new_state: state,
target: source,
})
.await
{
tracing::warn!(
contract = %contract,
error = %e,
"Failed to emit SyncStateToPeer for stale peer correction"
);
}
}
for (key, state_hash) in confirmed_states {
if let Some(event) =
crate::tracing::NetEventLog::state_confirmed(&op_manager.ring, key, state_hash)
{
op_manager
.ring
.register_events(either::Either::Left(event))
.await;
}
}
None
}
InterestMessage::ChangeInterests { added, removed } => {
tracing::debug!(
from = %source,
added_count = added.len(),
removed_count = removed.len(),
"Received ChangeInterests message"
);
let peer_key = get_peer_key_from_addr(op_manager, source);
if let Some(ref pk) = peer_key {
for hash in removed {
for contract in op_manager.interest_manager.lookup_by_hash(hash) {
op_manager
.interest_manager
.remove_peer_interest(&contract, pk);
}
}
}
let mut entries = Vec::new();
if let Some(ref pk) = peer_key {
for hash in added {
for contract in op_manager.interest_manager.lookup_by_hash(hash) {
if !op_manager.interest_manager.has_local_interest(&contract) {
continue;
}
op_manager.interest_manager.register_peer_interest(
&contract,
pk.clone(),
None,
false,
);
let summary = get_contract_summary(op_manager, &contract).await;
entries.push(SummaryEntry::from_summary(hash, summary.as_ref()));
}
}
}
if entries.is_empty() {
None
} else {
Some(InterestMessage::Summaries { entries })
}
}
InterestMessage::ResyncRequest { key } => {
tracing::info!(
from = %source,
contract = %key,
event = "resync_request_received",
"Received ResyncRequest - peer needs full state"
);
op_manager.interest_manager.record_resync_request_received();
crate::config::GlobalTestMetrics::record_resync_request();
let peer_key = get_peer_key_from_addr(op_manager, source);
if let Some(ref pk) = peer_key {
op_manager
.interest_manager
.update_peer_summary(&key, pk, None);
}
let from_peer = op_manager.ring.connection_manager.get_peer_by_addr(source);
if let Some(ref from_pkl) = from_peer {
if let Some(event) = crate::tracing::NetEventLog::resync_request_received(
&op_manager.ring,
key,
from_pkl.clone(),
) {
op_manager
.ring
.register_events(either::Either::Left(event))
.await;
}
} else {
tracing::debug!(
contract = %key,
source = %source,
"ResyncRequest telemetry skipped: peer lookup failed"
);
}
let state = get_contract_state(op_manager, &key).await;
let Some(state) = state else {
tracing::warn!(
contract = %key,
"ResyncRequest for contract we don't have state for"
);
return None;
};
let summary = get_contract_summary(op_manager, &key).await;
let Some(summary) = summary else {
tracing::warn!(
contract = %key,
"ResyncRequest for contract we can't compute summary for"
);
return None;
};
tracing::info!(
to = %source,
contract = %key,
state_size = state.as_ref().len(),
summary_size = summary.as_ref().len(),
event = "resync_response_sent",
"Sending ResyncResponse with full state"
);
if let Some(ref to_pkl) = from_peer {
if let Some(event) = crate::tracing::NetEventLog::resync_response_sent(
&op_manager.ring,
key,
to_pkl.clone(),
state.as_ref().len(),
) {
op_manager
.ring
.register_events(either::Either::Left(event))
.await;
}
}
Some(InterestMessage::ResyncResponse {
key,
state_bytes: state.as_ref().to_vec(),
summary_bytes: summary.as_ref().to_vec(),
})
}
InterestMessage::ResyncResponse {
key,
state_bytes,
summary_bytes,
} => {
tracing::info!(
from = %source,
contract = %key,
state_size = state_bytes.len(),
event = "resync_response_received",
"Received ResyncResponse with full state"
);
let state = freenet_stdlib::prelude::State::from(state_bytes.clone());
let update_data = freenet_stdlib::prelude::UpdateData::State(state);
use crate::contract::ContractHandlerEvent;
match op_manager
.notify_contract_handler(ContractHandlerEvent::UpdateQuery {
key,
data: update_data,
related_contracts: Default::default(),
})
.await
{
Ok(ContractHandlerEvent::UpdateResponse {
new_value: Ok(_), ..
}) => {
tracing::info!(
from = %source,
contract = %key,
event = "resync_applied",
changed = true,
"ResyncResponse state applied successfully"
);
}
Ok(ContractHandlerEvent::UpdateNoChange { .. }) => {
tracing::info!(
from = %source,
contract = %key,
event = "resync_applied",
changed = false,
"ResyncResponse state unchanged (already had this state)"
);
}
Ok(other) => {
tracing::warn!(
from = %source,
contract = %key,
event = "resync_failed",
response = ?other,
"Unexpected response to resync update"
);
}
Err(e) => {
tracing::error!(
from = %source,
contract = %key,
event = "resync_failed",
error = %e,
"Failed to apply resync state"
);
}
}
let peer_key = get_peer_key_from_addr(op_manager, source);
if let Some(pk) = peer_key {
let summary = freenet_stdlib::prelude::StateSummary::from(summary_bytes);
op_manager
.interest_manager
.update_peer_summary(&key, &pk, Some(summary));
}
None
}
}
}
async fn get_contract_state(
op_manager: &Arc<OpManager>,
key: &freenet_stdlib::prelude::ContractKey,
) -> Option<freenet_stdlib::prelude::WrappedState> {
get_contract_state_by_id(op_manager, key.id())
.await
.map(|(_, state)| state)
}
async fn get_contract_state_by_id(
op_manager: &Arc<OpManager>,
instance_id: &freenet_stdlib::prelude::ContractInstanceId,
) -> Option<(
freenet_stdlib::prelude::ContractKey,
freenet_stdlib::prelude::WrappedState,
)> {
use crate::contract::ContractHandlerEvent;
match op_manager
.notify_contract_handler(ContractHandlerEvent::GetQuery {
instance_id: *instance_id,
return_contract_code: false,
})
.await
{
Ok(ContractHandlerEvent::GetResponse {
key: Some(key),
response: Ok(store_response),
}) => store_response.state.map(|state| (key, state)),
Ok(ContractHandlerEvent::GetResponse {
response: Err(e), ..
}) => {
tracing::warn!(
contract = %instance_id,
error = %e,
"Failed to get contract state by instance id"
);
None
}
_ => None,
}
}
async fn get_contract_summary(
op_manager: &Arc<OpManager>,
key: &freenet_stdlib::prelude::ContractKey,
) -> Option<freenet_stdlib::prelude::StateSummary<'static>> {
use crate::contract::ContractHandlerEvent;
match op_manager
.notify_contract_handler(ContractHandlerEvent::GetSummaryQuery { key: *key })
.await
{
Ok(ContractHandlerEvent::GetSummaryResponse {
summary: Ok(summary),
..
}) => Some(summary),
Ok(ContractHandlerEvent::GetSummaryResponse {
summary: Err(e), ..
}) => {
tracing::warn!(
contract = %key,
error = %e,
"Failed to get contract summary"
);
None
}
_ => None,
}
}
fn get_peer_key_from_addr(
op_manager: &Arc<OpManager>,
addr: std::net::SocketAddr,
) -> Option<crate::ring::interest::PeerKey> {
op_manager
.ring
.connection_manager
.get_peer_by_addr(addr)
.map(|pkl| crate::ring::interest::PeerKey::from(pkl.pub_key.clone()))
}
#[allow(dead_code)]
pub async fn subscribe(
op_manager: Arc<OpManager>,
instance_id: ContractInstanceId,
client_id: Option<ClientId>,
) -> Result<Transaction, OpError> {
subscribe_with_id(op_manager, instance_id, client_id, None).await
}
pub async fn subscribe_with_id(
op_manager: Arc<OpManager>,
instance_id: ContractInstanceId,
client_id: Option<ClientId>,
transaction_id: Option<Transaction>,
) -> Result<Transaction, OpError> {
let client_tx = match transaction_id {
Some(id) => id,
None => Transaction::new::<subscribe::SubscribeMsg>(),
};
if let Some(client_id) = client_id {
use crate::client_events::RequestId;
let request_id = RequestId::new();
if let Err(e) = op_manager
.ch_outbound
.waiting_for_subscription_result(client_tx, instance_id, client_id, request_id)
.await
{
tracing::warn!(tx = %client_tx, error = %e, "failed to register subscription result waiter");
}
}
subscribe::start_client_subscribe(op_manager, instance_id, client_tx).await
}
pub(crate) async fn handle_aborted_op(
tx: Transaction,
_op_manager: &OpManager,
_gateways: &[PeerKeyLocation],
) -> Result<(), OpError> {
tracing::debug!(
%tx,
tx_type = ?tx.transaction_type(),
"handle_aborted_op: no-op after #1454 phase 6 (driver-owned cancellation)"
);
Ok(())
}
pub type PeerId = crate::ring::KnownPeerKeyLocation;
pub async fn run_local_node(
mut executor: Executor,
socket: WebsocketApiConfig,
) -> anyhow::Result<()> {
if !crate::server::is_private_ip(&socket.address) {
anyhow::bail!(
"invalid ip: {}, only loopback and private network addresses are allowed",
socket.address
)
}
let (mut gw, mut ws_proxy) = crate::server::serve_client_api_in(socket).await?;
enum Receiver {
Ws,
Gw,
}
let mut receiver;
loop {
let req = crate::deterministic_select! {
req = ws_proxy.recv() => {
receiver = Receiver::Ws;
req?
},
req = gw.recv() => {
receiver = Receiver::Gw;
req?
},
};
let OpenRequest {
client_id: id,
request,
notification_channel,
token,
origin_contract,
..
} = req;
tracing::debug!(client_id = %id, ?token, "Received OpenRequest -> {request}");
let res = match *request {
ClientRequest::ContractOp(op) => {
executor
.contract_requests(op, id, notification_channel)
.await
}
ClientRequest::DelegateOp(op) => {
let op_name = match op {
DelegateRequest::RegisterDelegate { .. } => "RegisterDelegate",
DelegateRequest::ApplicationMessages { .. } => "ApplicationMessages",
DelegateRequest::UnregisterDelegate(_) => "UnregisterDelegate",
_ => "Unknown",
};
tracing::debug!(
op_name = ?op_name,
?origin_contract,
"Handling ClientRequest::DelegateOp"
);
executor.delegate_request(op, origin_contract.as_ref(), None)
}
ClientRequest::Disconnect { cause } => {
if let Some(cause) = cause {
tracing::info!("disconnecting cause: {cause}");
}
continue;
}
ClientRequest::Authenticate { .. }
| ClientRequest::NodeQueries(_)
| ClientRequest::Close
| _ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
};
match res {
Ok(res) => {
match receiver {
Receiver::Ws => ws_proxy.send(id, Ok(res)).await?,
Receiver::Gw => gw.send(id, Ok(res)).await?,
};
}
Err(err) if err.is_request() => {
let err = ErrorKind::RequestError(err.unwrap_request());
match receiver {
Receiver::Ws => {
ws_proxy.send(id, Err(err.into())).await?;
}
Receiver::Gw => {
gw.send(id, Err(err.into())).await?;
}
};
}
Err(err) => {
tracing::error!("{err}");
let err = Err(ErrorKind::Unhandled {
cause: format!("{err}").into(),
}
.into());
match receiver {
Receiver::Ws => {
ws_proxy.send(id, err).await?;
}
Receiver::Gw => {
gw.send(id, err).await?;
}
};
}
}
}
}
pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> {
tracing::info!("Starting node");
let is_gateway = node.inner.is_gateway;
let location = if let Some(loc) = node.inner.location {
Some(loc)
} else {
is_gateway
.then(|| {
node.inner
.peer_id
.as_ref()
.map(|id| Location::from_address(&id.socket_addr()))
})
.flatten()
};
if let Some(location) = location {
tracing::info!("Setting initial location: {location}");
node.update_location(location);
}
match node.run().await {
Ok(_) => {
if is_gateway {
tracing::info!("Gateway finished");
} else {
tracing::info!("Node finished");
}
Ok(())
}
Err(e) => {
tracing::error!("{e}");
Err(e)
}
}
}
#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, Ipv6Addr};
use super::*;
use rstest::rstest;
#[tokio::test]
async fn test_hostname_resolution_localhost() {
let addr = Address::Hostname("localhost".to_string());
let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
assert!(
socket_addr.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
|| socket_addr.ip() == IpAddr::V6(Ipv6Addr::LOCALHOST)
);
assert!(socket_addr.port() > 1024);
}
#[tokio::test]
async fn test_hostname_resolution_with_port() {
let addr = Address::Hostname("google.com:8080".to_string());
let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
assert_eq!(socket_addr.port(), 8080);
}
#[tokio::test]
async fn test_hostname_resolution_with_trailing_dot() {
let addr = Address::Hostname("localhost.".to_string());
let result = NodeConfig::parse_socket_addr(&addr).await;
if let Ok(socket_addr) = result {
assert!(
socket_addr.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
|| socket_addr.ip() == IpAddr::V6(Ipv6Addr::LOCALHOST)
);
}
}
#[tokio::test]
async fn test_hostname_resolution_direct_socket_addr() {
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
let addr = Address::HostAddress(socket);
let resolved = NodeConfig::parse_socket_addr(&addr).await.unwrap();
assert_eq!(resolved, socket);
}
#[tokio::test]
async fn test_hostname_resolution_invalid_port() {
let addr = Address::Hostname("localhost:not_a_port".to_string());
let result = NodeConfig::parse_socket_addr(&addr).await;
assert!(result.is_err());
}
#[ignore]
#[rstest]
#[case::same_addr_different_keys(8080, 8080, true)]
#[case::different_addr_same_key(8080, 8081, false)]
fn test_peer_id_equality(#[case] port1: u16, #[case] port2: u16, #[case] expected_equal: bool) {
let keypair1 = TransportKeypair::new();
let keypair2 = TransportKeypair::new();
let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port1);
let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port2);
let peer1 = PeerId::new(keypair1.public().clone(), addr1);
let peer2 = PeerId::new(keypair2.public().clone(), addr2);
assert_eq!(peer1 == peer2, expected_equal);
}
#[test]
fn test_peer_id_equality_same_key_same_addr() {
let keypair = TransportKeypair::new();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let peer1 = PeerId::new(keypair.public().clone(), addr);
let peer2 = PeerId::new(keypair.public().clone(), addr);
assert_eq!(peer1, peer2);
}
#[test]
fn test_peer_id_equality_different_key_same_addr() {
let keypair1 = TransportKeypair::new();
let keypair2 = TransportKeypair::new();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let peer1 = PeerId::new(keypair1.public().clone(), addr);
let peer2 = PeerId::new(keypair2.public().clone(), addr);
assert_ne!(peer1, peer2);
}
#[test]
fn test_peer_id_equality_different_addr() {
let keypair = TransportKeypair::new();
let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081);
let peer1 = PeerId::new(keypair.public().clone(), addr1);
let peer2 = PeerId::new(keypair.public().clone(), addr2);
assert_ne!(peer1, peer2);
}
#[rstest]
#[case::lower_port_first(8080, 8081)]
#[case::high_port_diff(1024, 65535)]
fn test_peer_id_ordering(#[case] lower_port: u16, #[case] higher_port: u16) {
let keypair = TransportKeypair::new();
let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), lower_port);
let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), higher_port);
let peer1 = PeerId::new(keypair.public().clone(), addr1);
let peer2 = PeerId::new(keypair.public().clone(), addr2);
assert!(peer1 < peer2);
assert!(peer2 > peer1);
}
#[test]
fn test_peer_id_hash_consistency() {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let keypair = TransportKeypair::new();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let peer1 = PeerId::new(keypair.public().clone(), addr);
let peer2 = PeerId::new(keypair.public().clone(), addr);
let mut hasher1 = DefaultHasher::new();
let mut hasher2 = DefaultHasher::new();
peer1.hash(&mut hasher1);
peer2.hash(&mut hasher2);
assert_eq!(hasher1.finish(), hasher2.finish());
}
#[test]
fn test_peer_id_random_produces_unique() {
let peer1 = PeerId::random();
let peer2 = PeerId::random();
assert_ne!(peer1.socket_addr(), peer2.socket_addr());
}
#[test]
fn test_peer_id_serialization() {
let peer = PeerId::random();
let bytes = peer.to_bytes();
assert!(!bytes.is_empty());
let deserialized: PeerId = bincode::deserialize(&bytes).unwrap();
assert_eq!(peer.socket_addr(), deserialized.socket_addr());
}
#[test]
fn test_peer_id_display() {
let peer = PeerId::random();
let display = format!("{}", peer);
let debug = format!("{:?}", peer);
assert_eq!(display, debug);
assert!(!display.is_empty());
}
#[test]
fn test_init_peer_node_construction() {
let keypair = TransportKeypair::new();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
let peer_key_location = PeerKeyLocation::new(keypair.public().clone(), addr);
let location = Location::new(0.5);
let init_peer = InitPeerNode::new(peer_key_location.clone(), location);
assert_eq!(init_peer.peer_key_location, peer_key_location);
assert_eq!(init_peer.location, location);
}
mod callback_forward_tests {
use super::super::try_forward_task_per_tx_reply;
use crate::message::{MessageStats, NetMessage, NetMessageV1, Transaction};
use crate::operations::connect::ConnectMsg;
fn dummy_reply() -> NetMessage {
NetMessage::V1(NetMessageV1::Aborted(Transaction::new::<ConnectMsg>()))
}
#[tokio::test]
async fn bypass_forwards_when_callback_registered() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
let reply = dummy_reply();
let expected_id = *reply.id();
let taken = try_forward_task_per_tx_reply(Some(&tx), reply, "subscribe");
assert!(taken, "callback present → bypass must be taken");
let received = rx
.try_recv()
.expect("helper should forward the reply to the callback");
assert_eq!(*received.id(), expected_id);
}
#[tokio::test]
async fn bypass_returns_false_when_no_callback() {
let taken = try_forward_task_per_tx_reply(None, dummy_reply(), "subscribe");
assert!(!taken, "no callback → bypass must not be taken");
}
#[tokio::test]
async fn bypass_returns_true_even_when_receiver_dropped() {
let (tx, rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
drop(rx);
let taken = try_forward_task_per_tx_reply(Some(&tx), dummy_reply(), "subscribe");
assert!(
taken,
"callback present but receiver dropped → bypass still taken"
);
}
#[test]
fn bypass_is_wired_into_subscribe_branch_regression_guard() {
const SOURCE: &str = include_str!("node.rs");
let subscribe_branch_anchor: String =
["NetMessageV1::", "Subscribe(ref op)", " => {"].concat();
let branch_start = SOURCE.find(&subscribe_branch_anchor).expect(
"SUBSCRIBE branch of handle_pure_network_message_v1 not found; \
the match arm has been renamed or moved — update this regression guard",
);
let next_variant_anchor: String = ["// Non-transactional", " message types:"].concat();
let window_end = SOURCE[branch_start..]
.find(&next_variant_anchor)
.expect("end of SUBSCRIBE branch not found — update guard")
+ branch_start;
let window = &SOURCE[branch_start..window_end];
assert!(
window.contains("try_forward_task_per_tx_reply("),
"SUBSCRIBE branch no longer calls \
try_forward_task_per_tx_reply before relay dispatch. \
Either restore the bypass or update this regression \
guard if the branch was legitimately refactored."
);
let response_gate: String = [
"matches!(op, ",
"subscribe::SubscribeMsg::Response { .. }",
")",
]
.concat();
assert!(
window.contains(&response_gate),
"SUBSCRIBE branch bypass is not gated on Response-only. \
Non-terminal messages (ForwardingAck, Unsubscribe) must NOT \
be forwarded to the task-per-tx channel — they would fill \
the capacity-1 reply slot and block the real Response."
);
}
#[tokio::test]
async fn bypass_does_not_block_when_channel_already_full() {
let (tx, _rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
tx.try_send(dummy_reply())
.expect("capacity-1 channel should accept first message");
let taken = try_forward_task_per_tx_reply(Some(&tx), dummy_reply(), "subscribe");
assert!(
taken,
"callback present but channel full → bypass still taken"
);
}
use crate::operations::VisitedPeers;
use crate::operations::subscribe::{SubscribeMsg, SubscribeMsgResult};
fn subscribe_branch_would_forward(
op: &SubscribeMsg,
callback: Option<&tokio::sync::mpsc::Sender<NetMessage>>,
) -> bool {
matches!(op, SubscribeMsg::Response { .. })
&& try_forward_task_per_tx_reply(
callback,
NetMessage::V1(NetMessageV1::Subscribe(op.clone())),
"subscribe",
)
}
#[tokio::test]
async fn subscribe_response_is_forwarded_to_task() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
let sub_tx = Transaction::new::<SubscribeMsg>();
let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([1u8; 32]);
let key = freenet_stdlib::prelude::ContractKey::from_id_and_code(
instance_id,
freenet_stdlib::prelude::CodeHash::new([2u8; 32]),
);
let op = SubscribeMsg::Response {
id: sub_tx,
instance_id,
result: SubscribeMsgResult::Subscribed { key },
};
let taken = subscribe_branch_would_forward(&op, Some(&tx));
assert!(taken, "Response with callback → must be forwarded");
let received = rx.try_recv().expect("Response should be in channel");
assert_eq!(*received.id(), sub_tx);
}
#[tokio::test]
async fn forwarding_ack_is_not_forwarded_to_task() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
let sub_tx = Transaction::new::<SubscribeMsg>();
let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([3u8; 32]);
let op = SubscribeMsg::ForwardingAck {
id: sub_tx,
instance_id,
};
let taken = subscribe_branch_would_forward(&op, Some(&tx));
assert!(
!taken,
"ForwardingAck must NOT be forwarded to task channel"
);
assert!(
rx.try_recv().is_err(),
"channel must remain empty after ForwardingAck"
);
}
#[tokio::test]
async fn unsubscribe_is_not_forwarded_to_task() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
let sub_tx = Transaction::new::<SubscribeMsg>();
let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([4u8; 32]);
let op = SubscribeMsg::Unsubscribe {
id: sub_tx,
instance_id,
};
let taken = subscribe_branch_would_forward(&op, Some(&tx));
assert!(!taken, "Unsubscribe must NOT be forwarded to task channel");
assert!(rx.try_recv().is_err(), "channel must remain empty");
}
#[tokio::test]
async fn request_is_not_forwarded_to_task() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
let sub_tx = Transaction::new::<SubscribeMsg>();
let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([5u8; 32]);
let op = SubscribeMsg::Request {
id: sub_tx,
instance_id,
htl: 5,
visited: VisitedPeers::new(&sub_tx),
is_renewal: false,
};
let taken = subscribe_branch_would_forward(&op, Some(&tx));
assert!(!taken, "Request must NOT be forwarded to task channel");
assert!(rx.try_recv().is_err(), "channel must remain empty");
}
#[tokio::test]
async fn response_without_callback_falls_through() {
let sub_tx = Transaction::new::<SubscribeMsg>();
let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([6u8; 32]);
let op = SubscribeMsg::Response {
id: sub_tx,
instance_id,
result: SubscribeMsgResult::NotFound,
};
let taken = subscribe_branch_would_forward(&op, None);
assert!(
!taken,
"Response without callback → must fall through to legacy path"
);
}
#[test]
fn bypass_is_wired_into_put_branch_regression_guard() {
const SOURCE: &str = include_str!("node.rs");
let put_branch_anchor = "NetMessageV1::Put(ref op) => {";
let branch_start = SOURCE.find(put_branch_anchor).expect(
"PUT branch of handle_pure_network_message_v1 not found; \
the match arm has been renamed or moved — update this regression guard",
);
let next_variant = "NetMessageV1::Get(ref op) => {";
let window_end = SOURCE[branch_start..]
.find(next_variant)
.expect("could not find end of PUT arm")
+ branch_start;
let window = &SOURCE[branch_start..window_end];
assert!(
window.contains("try_forward_task_per_tx_reply("),
"PUT branch no longer calls try_forward_task_per_tx_reply. \
Restore the bypass or update this regression guard."
);
assert!(
window.contains("put::PutMsg::Response { .. }"),
"PUT branch bypass is not gated on Response. \
Non-terminal messages must NOT be forwarded to the task-per-tx channel."
);
assert!(
window.contains("put::PutMsg::ResponseStreaming { .. }"),
"PUT branch bypass is not gated on ResponseStreaming. \
Both terminal variants must be forwarded."
);
let legacy_dispatch_needle = format!("handle{}::<put::PutOp, _>", "_op_request");
assert!(
!window.contains(&legacy_dispatch_needle),
"PUT branch must not call legacy state-machine dispatch"
);
let dashmap_gate_needle = format!("has{}_op", "_put");
assert!(
!window.contains(&dashmap_gate_needle),
"PUT branch must not gate dispatch on the retired DashMap existence check"
);
}
use crate::operations::put::PutMsg;
use freenet_stdlib::prelude::*;
fn dummy_put_key(a: u8, b: u8) -> ContractKey {
ContractKey::from_id_and_code(ContractInstanceId::new([a; 32]), CodeHash::new([b; 32]))
}
fn put_branch_would_forward(
op: &PutMsg,
callback: Option<&tokio::sync::mpsc::Sender<NetMessage>>,
) -> bool {
matches!(
op,
PutMsg::Response { .. } | PutMsg::ResponseStreaming { .. }
) && try_forward_task_per_tx_reply(
callback,
NetMessage::V1(NetMessageV1::Put(op.clone())),
"put",
)
}
#[tokio::test]
async fn put_response_is_forwarded_to_task() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
let put_tx = Transaction::new::<PutMsg>();
let key = dummy_put_key(10, 11);
let op = PutMsg::Response { id: put_tx, key };
let taken = put_branch_would_forward(&op, Some(&tx));
assert!(taken, "Response with callback → must be forwarded");
let received = rx.try_recv().expect("Response should be in channel");
assert_eq!(*received.id(), put_tx);
}
#[tokio::test]
async fn put_response_streaming_is_forwarded_to_task() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
let put_tx = Transaction::new::<PutMsg>();
let key = dummy_put_key(12, 13);
let op = PutMsg::ResponseStreaming {
id: put_tx,
key,
continue_forwarding: false,
};
let taken = put_branch_would_forward(&op, Some(&tx));
assert!(taken, "ResponseStreaming with callback → must be forwarded");
let received = rx
.try_recv()
.expect("ResponseStreaming should be in channel");
assert_eq!(*received.id(), put_tx);
}
#[tokio::test]
async fn put_forwarding_ack_is_not_forwarded_to_task() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
let put_tx = Transaction::new::<PutMsg>();
let key = dummy_put_key(14, 15);
let op = PutMsg::ForwardingAck {
id: put_tx,
contract_key: key,
};
let taken = put_branch_would_forward(&op, Some(&tx));
assert!(
!taken,
"ForwardingAck must NOT be forwarded to task channel"
);
assert!(
rx.try_recv().is_err(),
"channel must remain empty after ForwardingAck"
);
}
#[tokio::test]
async fn put_request_is_not_forwarded_to_task() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
let put_tx = Transaction::new::<PutMsg>();
let op = PutMsg::Request {
id: put_tx,
contract: ContractContainer::Wasm(ContractWasmAPIVersion::V1(
WrappedContract::new(
std::sync::Arc::new(ContractCode::from(vec![0u8])),
Parameters::from(vec![]),
),
)),
related_contracts: RelatedContracts::default(),
value: WrappedState::new(vec![1u8]),
htl: 5,
skip_list: std::collections::HashSet::new(),
};
let taken = put_branch_would_forward(&op, Some(&tx));
assert!(!taken, "Request must NOT be forwarded to task channel");
assert!(rx.try_recv().is_err(), "channel must remain empty");
}
#[tokio::test]
async fn put_response_without_callback_falls_through() {
let put_tx = Transaction::new::<PutMsg>();
let key = dummy_put_key(16, 17);
let op = PutMsg::Response { id: put_tx, key };
let taken = put_branch_would_forward(&op, None);
assert!(
!taken,
"Response without callback → must fall through to legacy path"
);
}
#[test]
fn get_branch_dispatches_relay_driver() {
const SOURCE: &str = include_str!("node.rs");
let anchor = "NetMessageV1::Get(ref op) => {";
let branch_start = SOURCE.find(anchor).expect(
"GET branch of handle_pure_network_message_v1 not found; \
the match arm has been renamed or moved — update this guard",
);
let next_variant = "NetMessageV1::Update(ref op) => {";
let window_end = SOURCE[branch_start..]
.find(next_variant)
.expect("could not find end of GET arm")
+ branch_start;
let window = &SOURCE[branch_start..window_end];
assert!(
window.contains("try_forward_task_per_tx_reply("),
"GET branch no longer calls try_forward_task_per_tx_reply \
before relay dispatch. Restore the bypass."
);
assert!(
window.contains("get::GetMsg::Response { .. }"),
"GET branch bypass is not gated on Response. \
Non-terminal messages must NOT be forwarded to the task-per-tx channel."
);
assert!(
window.contains("get::GetMsg::ResponseStreaming { .. }"),
"GET branch bypass is not gated on ResponseStreaming. \
Both terminal variants must be forwarded."
);
assert!(
window.contains("start_relay_get("),
"GET branch no longer calls start_relay_get for relay dispatch."
);
assert!(
window.contains("effective_upstream") || window.contains("upstream_addr"),
"GET relay dispatch must thread an effective upstream address \
(source_addr or own_addr loopback) into the relay driver."
);
let legacy_dispatch_needle = format!("handle{}::<get::GetOp, _>", "_op_request");
assert!(
!window.contains(&legacy_dispatch_needle),
"GET branch must NOT call legacy state-machine dispatch"
);
let dashmap_gate_needle = format!("has{}_op", "_get");
assert!(
!window.contains(&dashmap_gate_needle),
"GET branch must NOT gate on the retired DashMap existence check"
);
let bypass_pos = window
.find("try_forward_task_per_tx_reply(")
.expect("try_forward_task_per_tx_reply not found in GET branch");
let relay_pos = window
.find("start_relay_get(")
.expect("start_relay_get not found in GET branch");
assert!(
bypass_pos < relay_pos,
"Reply bypass (try_forward_task_per_tx_reply) must \
appear BEFORE relay dispatch (start_relay_get) — \
swapping order would break the terminal-reply fast \
path."
);
}
#[test]
fn update_branch_dispatches_all_relay_drivers() {
const SOURCE: &str = include_str!("node.rs");
let anchor = "NetMessageV1::Update(ref op) => {";
let branch_start = SOURCE.find(anchor).expect(
"UPDATE branch of handle_pure_network_message_v1 not found; \
the match arm has been renamed or moved — update this regression guard",
);
let next_variant = "NetMessageV1::Subscribe(ref op) => {";
let window_end = SOURCE[branch_start..]
.find(next_variant)
.expect("could not find end of UPDATE arm")
+ branch_start;
let window = &SOURCE[branch_start..window_end];
for driver in [
"start_relay_request_update(",
"start_relay_broadcast_to(",
"start_relay_request_update_streaming(",
"start_relay_broadcast_to_streaming(",
] {
assert!(
window.contains(driver),
"UPDATE branch must call {driver} for relay dispatch."
);
}
let legacy_call = ["handle_op_request::<update::", "UpdateOp", ", _>"].concat();
assert!(
!window.contains(&legacy_call),
"UPDATE branch must NOT call handle_op_request"
);
let dispatch_gate = ["has_", "update_op"].concat();
assert!(
!window.contains(&dispatch_gate),
"UPDATE relay dispatch must NOT consult has_update_op"
);
}
#[test]
fn update_branch_dispatch_gates_on_source_addr() {
const SOURCE: &str = include_str!("node.rs");
let anchor = "NetMessageV1::Update(ref op) => {";
let branch_start = SOURCE.find(anchor).expect("UPDATE branch not found");
let next_variant = "NetMessageV1::Subscribe(ref op) => {";
let window_end = SOURCE[branch_start..]
.find(next_variant)
.expect("could not find end of UPDATE arm")
+ branch_start;
let window = &SOURCE[branch_start..window_end];
assert!(
window.contains("if let Some(sender_addr) = source_addr"),
"UPDATE relay dispatch must be gated on source_addr.is_some() — \
internal callers must NOT spawn relay drivers."
);
}
#[test]
fn put_branch_dispatches_relay_drivers() {
const SOURCE: &str = include_str!("node.rs");
let anchor = "NetMessageV1::Put(ref op) => {";
let branch_start = SOURCE.find(anchor).expect(
"PUT branch of handle_pure_network_message_v1 not found; \
the match arm has been renamed or moved — update this guard",
);
let next_variant = "NetMessageV1::Get(ref op) => {";
let window_end = SOURCE[branch_start..]
.find(next_variant)
.expect("could not find end of PUT arm")
+ branch_start;
let window = &SOURCE[branch_start..window_end];
assert!(
window.contains("start_relay_put("),
"PUT branch no longer calls start_relay_put for relay dispatch."
);
assert!(
window.contains("start_relay_put_streaming("),
"PUT branch must call start_relay_put_streaming for streaming relay hops."
);
assert!(
window.contains("effective_upstream") || window.contains("upstream_addr"),
"PUT relay dispatch must thread an effective upstream address \
(source_addr or own_addr loopback) into the relay drivers."
);
let legacy_dispatch_needle = format!("handle{}::<put::PutOp, _>", "_op_request");
assert!(
!window.contains(&legacy_dispatch_needle),
"PUT branch must NOT call legacy state-machine dispatch"
);
let dashmap_gate_needle = format!("has{}_op", "_put");
assert!(
!window.contains(&dashmap_gate_needle),
"PUT branch must NOT gate on the retired DashMap existence check"
);
}
#[test]
fn start_relay_put_handles_upgrade_on_forward() {
const SOURCE: &str = include_str!("operations/put/op_ctx_task.rs");
let anchor = "async fn drive_relay_put<CB>(";
let driver_start = SOURCE
.find(anchor)
.expect("drive_relay_put fn not found — has the signature changed?");
let driver_end = SOURCE[driver_start + anchor.len()..]
.find("\nasync fn ")
.map(|idx| idx + driver_start + anchor.len())
.unwrap_or(SOURCE.len());
let body = &SOURCE[driver_start..driver_end];
assert!(
body.contains("should_use_streaming("),
"drive_relay_put must call should_use_streaming on the merged \
payload to decide between non-streaming Request and streaming \
upgrade on forward."
);
assert!(
body.contains("PutMsg::RequestStreaming {"),
"drive_relay_put must build PutMsg::RequestStreaming when the \
forwarded payload would exceed streaming_threshold."
);
assert!(
body.contains("send_stream("),
"drive_relay_put must call NetworkBridge::send_stream for the \
raw fragments after the RequestStreaming metadata send."
);
}
#[test]
fn subscribe_branch_dispatches_relay_driver() {
const SOURCE: &str = include_str!("node.rs");
let anchor = "NetMessageV1::Subscribe(ref op) => {";
let branch_start = SOURCE.find(anchor).expect(
"SUBSCRIBE branch of handle_pure_network_message_v1 not found; \
the match arm has been renamed or moved — update this guard",
);
let next_variant = "// Non-transactional message types:";
let window_end = SOURCE[branch_start..]
.find(next_variant)
.expect("could not find end of SUBSCRIBE arm")
+ branch_start;
let window = &SOURCE[branch_start..window_end];
assert!(
window.contains("try_forward_task_per_tx_reply("),
"SUBSCRIBE branch no longer calls try_forward_task_per_tx_reply \
before relay dispatch — restore it."
);
assert!(
window.contains("subscribe::SubscribeMsg::Response { .. }"),
"SUBSCRIBE branch bypass is not gated on Response. \
Non-terminal messages must NOT be forwarded to the task-per-tx channel."
);
assert!(
window.contains("start_relay_subscribe("),
"SUBSCRIBE branch no longer calls start_relay_subscribe for relay \
dispatch — restore it."
);
assert!(
window.contains("handle_unsubscribe_inbound("),
"SUBSCRIBE branch must call handle_unsubscribe_inbound \
for Unsubscribe wire messages."
);
assert!(
window.contains("effective_upstream") || window.contains("upstream_addr"),
"SUBSCRIBE relay dispatch must thread an effective upstream address \
(source_addr or own_addr loopback) into the relay driver."
);
let legacy_dispatch_needle =
format!("handle{}::<subscribe::SubscribeOp, _>", "_op_request");
assert!(
!window.contains(&legacy_dispatch_needle),
"SUBSCRIBE branch must NOT call legacy state-machine dispatch"
);
let dashmap_gate_needle = format!("has{}_op", "_subscribe");
assert!(
!window.contains(&dashmap_gate_needle),
"SUBSCRIBE branch must NOT gate on the retired DashMap existence check"
);
let bypass_pos = window
.find("try_forward_task_per_tx_reply(")
.expect("try_forward_task_per_tx_reply not found in SUBSCRIBE branch");
let relay_pos = window
.find("start_relay_subscribe(")
.expect("start_relay_subscribe not found in SUBSCRIBE branch");
assert!(
bypass_pos < relay_pos,
"SUBSCRIBE bypass (try_forward_task_per_tx_reply) must appear \
BEFORE relay dispatch (start_relay_subscribe). Swapping order \
would break the client-driver terminal-reply fast path."
);
}
}
mod fill_connect_response_acceptor_addr_tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use super::super::fill_connect_response_acceptor_addr;
use crate::message::Transaction;
use crate::operations::connect::{ConnectMsg, ConnectResponse};
use crate::ring::{PeerAddr, PeerKeyLocation};
fn dummy_unknown_pkl() -> PeerKeyLocation {
let pkl = PeerKeyLocation::random();
PeerKeyLocation {
pub_key: pkl.pub_key,
peer_addr: PeerAddr::Unknown,
}
}
fn known_addr() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 5)), 50051)
}
#[test]
fn fills_unknown_acceptor_addr_from_source_addr() {
let id = Transaction::new::<ConnectMsg>();
let payload = ConnectResponse {
acceptor: dummy_unknown_pkl(),
};
let msg = ConnectMsg::Response { id, payload };
let source = known_addr();
let filled = fill_connect_response_acceptor_addr(msg, Some(source));
#[allow(clippy::wildcard_enum_match_arm)]
match filled {
ConnectMsg::Response { payload, .. } => {
assert_eq!(payload.acceptor.socket_addr(), Some(source));
}
other => panic!("expected Response, got {other:?}"),
}
}
#[test]
fn leaves_known_acceptor_addr_unchanged() {
let id = Transaction::new::<ConnectMsg>();
let original_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 0, 2, 7)), 12345);
let pkl = PeerKeyLocation::random();
let payload = ConnectResponse {
acceptor: PeerKeyLocation {
pub_key: pkl.pub_key,
peer_addr: PeerAddr::Known(original_addr),
},
};
let msg = ConnectMsg::Response { id, payload };
let filled = fill_connect_response_acceptor_addr(msg, Some(known_addr()));
#[allow(clippy::wildcard_enum_match_arm)]
match filled {
ConnectMsg::Response { payload, .. } => {
assert_eq!(
payload.acceptor.socket_addr(),
Some(original_addr),
"fill must NOT overwrite a known acceptor address"
);
}
other => panic!("expected Response, got {other:?}"),
}
}
#[test]
fn unknown_acceptor_without_source_addr_passes_through() {
let id = Transaction::new::<ConnectMsg>();
let payload = ConnectResponse {
acceptor: dummy_unknown_pkl(),
};
let msg = ConnectMsg::Response { id, payload };
let filled = fill_connect_response_acceptor_addr(msg, None);
#[allow(clippy::wildcard_enum_match_arm)]
match filled {
ConnectMsg::Response { payload, .. } => {
assert!(
payload.acceptor.peer_addr.is_unknown(),
"fill must remain Unknown when source_addr is None"
);
}
other => panic!("expected Response, got {other:?}"),
}
}
#[test]
fn rejected_variant_passes_through_untouched() {
use crate::ring::Location;
let id = Transaction::new::<ConnectMsg>();
let dl = Location::new(0.42);
let msg = ConnectMsg::Rejected {
id,
desired_location: dl,
};
let filled = fill_connect_response_acceptor_addr(msg, Some(known_addr()));
#[allow(clippy::wildcard_enum_match_arm)]
match filled {
ConnectMsg::Rejected {
id: rid,
desired_location,
} => {
assert_eq!(rid, id);
assert_eq!(desired_location, dl);
}
other => panic!("expected Rejected, got {other:?}"),
}
}
}
mod connect_bypass_coverage_guards {
const SOURCE: &str = include_str!("node.rs");
fn connect_branch_window() -> &'static str {
let branch_anchor = "NetMessageV1::Connect(ref op) => {";
let branch_start = SOURCE.find(branch_anchor).expect(
"Connect branch of handle_pure_network_message_v1 not found; \
the match arm has been renamed or moved — update this guard",
);
let next_variant_anchor = "NetMessageV1::Put(ref op) => {";
let window_end = SOURCE[branch_start..]
.find(next_variant_anchor)
.expect("end of Connect branch not found — update guard")
+ branch_start;
&SOURCE[branch_start..window_end]
}
#[test]
fn connect_branch_bypass_forwards_response() {
assert!(
connect_branch_window().contains("connect::ConnectMsg::Response { .. }"),
"Connect bypass `matches!` no longer forwards Response. \
Response is the joiner-fan-in terminal variant and MUST \
reach the per-tx multi-reply receiver."
);
}
#[test]
fn connect_branch_bypass_forwards_rejected() {
assert!(
connect_branch_window().contains("connect::ConnectMsg::Rejected { .. }"),
"Connect bypass `matches!` no longer forwards Rejected. \
Relay drivers and the joiner driver both observe Rejected \
to record connection failure / record_connection_failure."
);
}
#[test]
fn connect_branch_bypass_forwards_observed_address() {
assert!(
connect_branch_window().contains("connect::ConnectMsg::ObservedAddress { .. }"),
"Connect bypass `matches!` no longer forwards \
ObservedAddress. The joiner driver inbox owns the \
set_own_addr / update_location side effect; dropping \
ObservedAddress here breaks NAT discovery."
);
}
#[test]
fn connect_branch_bypass_forwards_connect_failed() {
assert!(
connect_branch_window().contains("connect::ConnectMsg::ConnectFailed { .. }"),
"Connect bypass `matches!` no longer forwards \
ConnectFailed. The relay driver inbox owns hole-punch \
failure re-route; dropping ConnectFailed here strands \
the re-route on legacy `process_message`."
);
}
#[test]
fn connect_branch_bypass_does_not_forward_request() {
let window = connect_branch_window();
let bypass_anchor = "if matches!(\n op,";
let bypass_start = window
.find(bypass_anchor)
.expect("bypass `matches!` block not found in Connect branch — guard outdated");
let bypass_end = window[bypass_start..]
.find(") {")
.expect("bypass `matches!` block has no closing `) {`")
+ bypass_start;
let bypass_block = &window[bypass_start..bypass_end];
assert!(
!bypass_block.contains("connect::ConnectMsg::Request"),
"Connect bypass `matches!` MUST NOT forward Request. \
Request is the spawn signal for start_relay_connect; \
forwarding it would route fresh Requests into a multi-reply \
receiver that doesn't exist yet."
);
}
#[test]
fn connect_branch_dispatches_start_relay_connect_for_fresh_request() {
let window = connect_branch_window();
assert!(
window.contains("start_relay_connect("),
"Connect branch no longer calls start_relay_connect — \
removing it strands relay CONNECT on legacy."
);
}
#[test]
fn connect_relay_dispatch_gated_on_source_addr() {
let window = connect_branch_window();
let dispatch_anchor = "start_relay_connect(";
let dispatch_pos = window
.find(dispatch_anchor)
.expect("start_relay_connect not found in Connect branch");
let gate_start = dispatch_pos.saturating_sub(500);
let gate_window = &window[gate_start..dispatch_pos];
assert!(
gate_window.contains("source_addr"),
"CONNECT relay dispatch is not gated on source_addr — \
originator loop-back must NOT spawn a relay driver."
);
}
#[test]
fn connect_relay_dispatch_guarded_by_active_relay_set() {
let window = connect_branch_window();
let dispatch_pos = window
.find("start_relay_connect(")
.expect("start_relay_connect not found in Connect branch");
let gate_start = dispatch_pos.saturating_sub(500);
let gate_window = &window[gate_start..dispatch_pos];
assert!(
gate_window.contains("active_relay_connect_txs"),
"CONNECT relay dispatch is not guarded by \
active_relay_connect_txs.contains(id). Without it, a \
duplicate Request retransmission could spawn a second \
driver before the first inserts into the dedup set."
);
}
}
}