use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use either::Either;
use freenet_stdlib::prelude::*;
use tokio::sync::mpsc;
use tokio::time::Instant;
use crate::message::{NetMessage, NetMessageV1, NodeEvent, Transaction};
use crate::node::OpManager;
use crate::operations::OpError;
use crate::ring::{ConnectionFailureReason, Location, PeerKeyLocation};
use crate::tracing::NetEventLog;
use super::{
ConnectMsg, ConnectOp, ConnectRequest, ForwardAttempt, RelayEnv, RelayState,
dispatch_expect_connection_from,
};
#[cfg(any(test, feature = "testing"))]
#[allow(dead_code)]
pub static RELAY_CONNECT_DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
#[allow(dead_code)]
pub static RELAY_CONNECT_INFLIGHT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
#[allow(dead_code)]
pub static RELAY_CONNECT_SPAWNED_TOTAL: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
#[allow(dead_code)]
pub static RELAY_CONNECT_COMPLETED_TOTAL: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
#[allow(dead_code)]
pub static RELAY_CONNECT_DEDUP_REJECTS: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub(crate) struct RelayConnectInflightGuard {
pub(crate) op_manager: std::sync::Arc<OpManager>,
pub(crate) incoming_tx: Transaction,
}
impl Drop for RelayConnectInflightGuard {
fn drop(&mut self) {
self.op_manager
.active_relay_connect_txs
.remove(&self.incoming_tx);
RELAY_CONNECT_INFLIGHT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
RELAY_CONNECT_COMPLETED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
pub(crate) async fn start_client_connect(
gateway: PeerKeyLocation,
gateway_addr: SocketAddr,
op_manager: &OpManager,
own: PeerKeyLocation,
desired_location: Location,
overall_timeout: Option<std::time::Duration>,
) -> Result<(), OpError> {
let started_without_address = op_manager.ring.connection_manager.get_own_addr().is_none();
let ttl = op_manager
.ring
.max_hops_to_live
.max(1)
.min(u8::MAX as usize) as u8;
let target_connections = op_manager.ring.connection_manager.min_connections;
let failed_addrs = op_manager.ring.connection_manager.recently_failed_addrs();
let connected_addrs = op_manager.ring.connection_manager.connected_peer_addrs();
tracing::debug!(
failed = failed_addrs.len(),
connected = connected_addrs.len(),
"pre-populating bloom filter with excluded peer addresses"
);
let mut exclude_addrs = failed_addrs;
exclude_addrs.extend(connected_addrs);
let (tx, _legacy_op, request_msg) = ConnectOp::initiate_join_request(
own.clone(),
gateway.clone(),
desired_location,
ttl,
target_connections,
op_manager.connect_forward_estimator.clone(),
&exclude_addrs,
);
if let Some(event) = NetEventLog::connect_request_sent(
&tx,
&op_manager.ring,
desired_location,
own,
gateway.clone(),
ttl,
true, ) {
op_manager.ring.register_events(Either::Left(event)).await;
}
tracing::debug!(
gateway = %gateway.pub_key(),
tx = %tx,
target_connections,
ttl,
"Initiating gateway connect (task-per-tx)"
);
let mut ctx = op_manager.op_ctx(tx);
let capacity = compute_reply_capacity(target_connections);
let receiver = match ctx
.send_to_and_collect_replies(
gateway_addr,
NetMessage::V1(NetMessageV1::Connect(request_msg)),
capacity,
)
.await
{
Ok(rx) => rx,
Err(e) => {
op_manager
.ring
.connection_manager
.prune_in_transit_connection(gateway_addr);
return Err(e);
}
};
let inner = drive_client_connect_inner(
tx,
gateway,
gateway_addr,
desired_location,
target_connections,
started_without_address,
receiver,
op_manager,
);
let outcome = match overall_timeout {
Some(timeout) => match tokio::time::timeout(timeout, inner).await {
Ok(result) => result,
Err(_) => {
tracing::debug!(
%tx,
timeout_ms = timeout.as_millis(),
"connect driver: overall timeout fired; exiting gracefully"
);
Ok(())
}
},
None => inner.await,
};
op_manager.release_pending_op_slot(tx).await;
outcome
}
#[allow(clippy::too_many_arguments)]
async fn drive_client_connect_inner(
tx: Transaction,
gateway: PeerKeyLocation,
gateway_addr: SocketAddr,
desired_location: Location,
target_connections: usize,
started_without_address: bool,
mut receiver: mpsc::Receiver<NetMessage>,
op_manager: &OpManager,
) -> Result<(), OpError> {
use std::collections::HashSet;
let mut accepted: HashSet<PeerKeyLocation> = HashSet::with_capacity(target_connections);
loop {
if should_exit_for_ttl(&tx) {
tracing::debug!(
%tx,
accepted = accepted.len(),
target_connections,
"connect driver: transaction timed out; exiting"
);
return Ok(());
}
const RECV_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
let msg = match tokio::time::timeout(RECV_POLL_INTERVAL, receiver.recv()).await {
Ok(Some(msg)) => msg,
Ok(None) => break,
Err(_) => continue,
};
let connect_msg = match msg {
NetMessage::V1(NetMessageV1::Connect(c)) => c,
other => {
tracing::warn!(
tx = %tx,
"connect driver received unexpected message kind: {:?}",
other
);
continue;
}
};
match connect_msg {
ConnectMsg::Response { payload, .. } => {
if let Some(event) = NetEventLog::connect_response_received(
&tx,
&op_manager.ring,
payload.acceptor.clone(),
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
let acceptor_addr = match payload.acceptor.socket_addr() {
Some(a) => a,
None => {
tracing::warn!(
tx = %tx,
"connect driver: acceptor missing socket_addr; skipping"
);
continue;
}
};
if !accepted.insert(payload.acceptor.clone()) {
continue;
}
op_manager
.notify_node_event(NodeEvent::ExpectPeerConnection {
addr: acceptor_addr,
})
.await?;
let (callback, mut rx) = mpsc::channel(1);
op_manager
.notify_node_event(NodeEvent::ConnectPeer {
peer: payload.acceptor.clone(),
tx,
callback,
is_gw: false,
})
.await?;
let hole_punch_ok = match rx.recv().await {
Some(Ok((peer_id, _remaining))) => {
tracing::info!(
%peer_id,
tx = %tx,
elapsed_ms = tx.elapsed().as_millis(),
"connect driver: joined peer"
);
let now = tokio::time::Instant::now();
op_manager.ring.connection_manager.record_acceptor_outcome(
acceptor_addr,
true,
now,
);
true
}
Some(Err(_)) => {
tracing::warn!(
tx = %tx,
elapsed_ms = tx.elapsed().as_millis(),
"connect driver: ConnectPeer failed"
);
false
}
None => {
tracing::warn!(
tx = %tx,
acceptor = %payload.acceptor,
"connect driver: ConnectPeer callback closed without result"
);
false
}
};
if !hole_punch_ok {
accepted.remove(&payload.acceptor);
let failed_msg = ConnectMsg::ConnectFailed {
id: tx,
failed_acceptor_addr: acceptor_addr,
};
tracing::info!(
tx = %tx,
failed_acceptor = %acceptor_addr,
gateway = %gateway_addr,
"connect driver: sending ConnectFailed to gateway for re-routing"
);
if let Err(e) = op_manager
.notify_node_event(NodeEvent::SendNetMessage {
target: gateway_addr,
msg: Box::new(NetMessage::V1(NetMessageV1::Connect(failed_msg))),
})
.await
{
tracing::warn!(
tx = %tx,
error = %e,
"connect driver: failed to emit ConnectFailed"
);
}
continue;
}
if accepted.len() >= target_connections {
debug_assert!(
!started_without_address
|| op_manager.ring.connection_manager.get_own_addr().is_some(),
"BUG: Connect completed but joiner never received ObservedAddress. \
This indicates the transport layer may have prematurely filled in \
the joiner's address, preventing ObservedAddress emission."
);
op_manager
.ring
.connection_manager
.reset_connect_jitter_failures();
{
let mut backoff = op_manager.gateway_backoff.lock();
backoff.record_success(gateway_addr);
}
op_manager.ring.record_connection_success(desired_location);
tracing::info!(
tx = %tx,
accepted = accepted.len(),
target_connections,
"connect driver: target reached, completing"
);
return Ok(());
}
}
ConnectMsg::Rejected {
desired_location: dl,
..
} => {
tracing::info!(
tx = %tx,
desired_location = %dl,
"connect driver: explicit rejection from relay"
);
op_manager
.ring
.record_connection_failure(dl, ConnectionFailureReason::Rejected);
{
let mut backoff = op_manager.gateway_backoff.lock();
backoff.record_failure(gateway_addr);
}
return Ok(());
}
ConnectMsg::ObservedAddress { address, .. } => {
let location = Location::from_address(&address);
op_manager.ring.connection_manager.set_own_addr(address);
op_manager
.ring
.connection_manager
.update_location(Some(location));
tracing::info!(
tx = %tx,
observed_address = %address,
location = %location,
"connect driver: updated own_addr and location from ObservedAddress"
);
}
other @ ConnectMsg::Request { .. } | other @ ConnectMsg::ConnectFailed { .. } => {
tracing::debug!(
tx = %tx,
"connect driver: ignoring unexpected variant on bypass: {}",
other
);
}
}
}
tracing::debug!(
tx = %tx,
accepted = accepted.len(),
target_connections,
gateway = %gateway.pub_key(),
"connect driver: receiver closed"
);
Ok(())
}
fn compute_reply_capacity(target_connections: usize) -> usize {
target_connections.saturating_mul(2).max(2)
}
const RELAY_RECV_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
pub(crate) async fn start_relay_connect(
op_manager: Arc<OpManager>,
incoming_tx: Transaction,
payload: ConnectRequest,
upstream_addr: SocketAddr,
) -> Result<(), OpError> {
#[cfg(any(test, feature = "testing"))]
RELAY_CONNECT_DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if !op_manager.active_relay_connect_txs.insert(incoming_tx) {
RELAY_CONNECT_DEDUP_REJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tracing::debug!(
tx = %incoming_tx,
%upstream_addr,
phase = "relay_connect_dedup_reject",
"CONNECT relay (task-per-tx): duplicate Request for in-flight tx, dropping"
);
return Ok(());
}
tracing::debug!(
tx = %incoming_tx,
desired_location = %payload.desired_location,
ttl = payload.ttl,
%upstream_addr,
phase = "relay_connect_start",
"CONNECT relay (task-per-tx): spawning driver"
);
RELAY_CONNECT_INFLIGHT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
RELAY_CONNECT_SPAWNED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let guard = RelayConnectInflightGuard {
op_manager: op_manager.clone(),
incoming_tx,
};
crate::operations::GlobalExecutor::spawn(run_relay_connect(
guard,
op_manager,
incoming_tx,
payload,
upstream_addr,
));
Ok(())
}
async fn run_relay_connect(
guard: RelayConnectInflightGuard,
op_manager: Arc<OpManager>,
incoming_tx: Transaction,
payload: ConnectRequest,
upstream_addr: SocketAddr,
) {
let _guard = guard;
if let Err(err) = drive_relay_connect(&op_manager, incoming_tx, payload, upstream_addr).await {
tracing::warn!(
tx = %incoming_tx,
error = %err,
phase = "relay_connect_error",
"CONNECT relay (task-per-tx): driver returned error"
);
}
tokio::task::yield_now().await;
op_manager.release_pending_op_slot(incoming_tx).await;
}
async fn drive_relay_connect(
op_manager: &Arc<OpManager>,
incoming_tx: Transaction,
payload: ConnectRequest,
upstream_addr: SocketAddr,
) -> Result<(), OpError> {
let desired_location = payload.desired_location;
let mut request = payload;
request.visited = request.visited.with_transaction(&incoming_tx);
let mut state = RelayState {
upstream_addr,
request,
forwarded_to: None,
forwarded_at: None,
observed_sent: false,
accepted_locally: false,
response_forwarded: false,
};
let mut recency: HashMap<PeerKeyLocation, Instant> = HashMap::new();
let mut forward_attempts: HashMap<PeerKeyLocation, ForwardAttempt> = HashMap::new();
let estimator = op_manager.connect_forward_estimator.read().clone();
let initial_actions = {
let _admission = match op_manager.ring.connection_manager.try_admit_connect() {
Some(g) => g,
None => {
tracing::info!(
tx = %incoming_tx,
desired_location = %state.request.desired_location,
%upstream_addr,
"CONNECT relay (task-per-tx): gateway overloaded, rejecting request"
);
if let Some(event) = NetEventLog::connect_rejected(
&incoming_tx,
&op_manager.ring,
state.request.desired_location,
"gateway overloaded",
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
upstream_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Rejected {
id: incoming_tx,
desired_location: state.request.desired_location,
})),
)
.await?;
return Ok(());
}
};
let env = RelayEnv::new(op_manager);
let now = Instant::now();
state.handle_request(&env, &recency, &mut forward_attempts, &estimator, now)
};
if let Some(event) = NetEventLog::connect_request_received(
&incoming_tx,
&op_manager.ring,
state.request.desired_location,
state.request.joiner.clone(),
upstream_addr,
initial_actions
.forward
.as_ref()
.map(|(peer, _)| peer.clone()),
initial_actions.accept.is_some(),
state.request.ttl,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
if let Some((_target, address)) = initial_actions.observed_address {
tracing::debug!(
tx = %incoming_tx,
observed_address = %address,
sending_to = %upstream_addr,
"Sending ObservedAddress to joiner"
);
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
upstream_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::ObservedAddress {
id: incoming_tx,
address,
})),
)
.await?;
}
if let Some(ref accept) = initial_actions.accept {
dispatch_expect_connection_from(op_manager, incoming_tx, accept.joiner.clone()).await?;
}
let downstream_receiver = match initial_actions.forward {
Some((next, request)) => {
recency.insert(next.clone(), Instant::now());
match next.socket_addr() {
Some(addr) => {
let mut ctx = op_manager.op_ctx(incoming_tx);
let capacity =
compute_reply_capacity(op_manager.ring.connection_manager.min_connections);
match ctx
.send_to_and_collect_replies(
addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
id: incoming_tx,
payload: request,
})),
capacity,
)
.await
{
Ok(rx) => Some(rx),
Err(e) => {
tracing::warn!(
tx = %incoming_tx,
target = %addr,
error = %e,
"CONNECT relay (task-per-tx): failed to dispatch downstream Request"
);
None
}
}
}
None => {
tracing::warn!(
tx = %incoming_tx,
next_peer = %next.pub_key(),
"CONNECT relay (task-per-tx): next hop has no socket address; skipping forward"
);
None
}
}
}
None => None,
};
if let Some(accept) = initial_actions.accept {
if let Some(event) = NetEventLog::connect_response_sent(
&incoming_tx,
&op_manager.ring,
accept.response.acceptor.clone(),
state.request.joiner.clone(),
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
upstream_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response {
id: incoming_tx,
payload: accept.response,
})),
)
.await?;
}
if initial_actions.rejected {
if let Some(event) = NetEventLog::connect_rejected(
&incoming_tx,
&op_manager.ring,
state.request.desired_location,
"rejected by handle_request",
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
upstream_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Rejected {
id: incoming_tx,
desired_location: state.request.desired_location,
})),
)
.await?;
return Ok(());
}
let Some(mut receiver) = downstream_receiver else {
return Ok(());
};
loop {
if should_exit_for_ttl(&incoming_tx) {
tracing::debug!(
tx = %incoming_tx,
phase = "relay_connect_ttl_exit",
"CONNECT relay (task-per-tx): transaction timed out; exiting"
);
return Ok(());
}
let msg = match tokio::time::timeout(RELAY_RECV_POLL_INTERVAL, receiver.recv()).await {
Ok(Some(msg)) => msg,
Ok(None) => return Ok(()),
Err(_) => continue,
};
let connect_msg = match msg {
NetMessage::V1(NetMessageV1::Connect(c)) => c,
other => {
tracing::warn!(
tx = %incoming_tx,
"CONNECT relay (task-per-tx): unexpected message kind: {:?}",
other
);
continue;
}
};
match connect_msg {
ConnectMsg::Response { payload, .. } => {
if let Some(ref fwd) = state.forwarded_to.clone() {
op_manager.connect_forward_estimator.write().record(
fwd,
desired_location,
true,
);
}
tracing::debug!(
tx = %incoming_tx,
%upstream_addr,
acceptor_pub_key = %payload.acceptor.pub_key(),
forwarded_from = ?state.forwarded_to,
"CONNECT relay (task-per-tx): forwarding Response upstream"
);
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
upstream_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response {
id: incoming_tx,
payload,
})),
)
.await?;
state.response_forwarded = true;
}
ConnectMsg::Rejected {
desired_location: dl,
..
} => {
let failed_peer = state.forwarded_to.take();
state.forwarded_at = None;
if let Some(ref fwd) = failed_peer {
op_manager
.connect_forward_estimator
.write()
.record(fwd, dl, false);
forward_attempts.remove(fwd);
}
let now = Instant::now();
let env = RelayEnv::new(op_manager);
let estimator = op_manager.connect_forward_estimator.read().clone();
let retry =
state.handle_request(&env, &recency, &mut forward_attempts, &estimator, now);
if let Some((peer, forward_req)) = retry.forward {
recency.insert(peer.clone(), now);
tracing::debug!(
tx = %incoming_tx,
failed_peer = ?failed_peer,
retry_peer = %peer.pub_key(),
"CONNECT relay (task-per-tx): retrying with different uphill peer after rejection"
);
if let Some(addr) = peer.socket_addr() {
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
id: incoming_tx,
payload: forward_req,
})),
)
.await?;
}
} else if let Some(accept) = retry.accept {
tracing::info!(
tx = %incoming_tx,
failed_peer = ?failed_peer,
acceptor = %accept.response.acceptor.pub_key(),
"CONNECT relay (task-per-tx): accepting locally after uphill rejection"
);
if let Some(event) = NetEventLog::connect_response_sent(
&incoming_tx,
&op_manager.ring,
accept.response.acceptor.clone(),
accept.joiner.clone(),
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
dispatch_expect_connection_from(op_manager, incoming_tx, accept.joiner).await?;
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
upstream_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response {
id: incoming_tx,
payload: accept.response,
})),
)
.await?;
} else {
tracing::debug!(
tx = %incoming_tx,
%upstream_addr,
failed_peer = ?failed_peer,
"CONNECT relay (task-per-tx): forwarding rejection upstream (no retry peers)"
);
if let Some(event) = NetEventLog::connect_rejected(
&incoming_tx,
&op_manager.ring,
dl,
"relay no retry peers available",
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
upstream_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Rejected {
id: incoming_tx,
desired_location: dl,
})),
)
.await?;
return Ok(());
}
}
ConnectMsg::ObservedAddress { address, .. } => {
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
upstream_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::ObservedAddress {
id: incoming_tx,
address,
})),
)
.await?;
}
ConnectMsg::ConnectFailed {
failed_acceptor_addr,
..
} => {
let now = Instant::now();
if state.response_forwarded {
if let Some(ref fwd) = state.forwarded_to {
if let Some(fwd_addr) = fwd.socket_addr() {
op_manager.ring.connection_manager.record_acceptor_outcome(
failed_acceptor_addr,
false,
now,
);
tracing::debug!(
tx = %incoming_tx,
forwarded_to_addr = %fwd_addr,
failed_acceptor = %failed_acceptor_addr,
"CONNECT relay (task-per-tx): forwarding ConnectFailed downstream"
);
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
fwd_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::ConnectFailed {
id: incoming_tx,
failed_acceptor_addr,
})),
)
.await?;
state.response_forwarded = false;
continue;
}
}
}
let failed_peer = state.forwarded_to.clone();
op_manager.ring.connection_manager.record_acceptor_outcome(
failed_acceptor_addr,
false,
now,
);
if let Some(ref fwd) = failed_peer {
op_manager.connect_forward_estimator.write().record(
fwd,
desired_location,
false,
);
forward_attempts.remove(fwd);
}
state.request.visited.mark_visited(failed_acceptor_addr);
state.forwarded_to = None;
state.forwarded_at = None;
state.response_forwarded = false;
let env = RelayEnv::new(op_manager);
let estimator = op_manager.connect_forward_estimator.read().clone();
let retry =
state.handle_request(&env, &recency, &mut forward_attempts, &estimator, now);
if let Some((peer, forward_req)) = retry.forward {
recency.insert(peer.clone(), now);
tracing::debug!(
tx = %incoming_tx,
failed_acceptor = %failed_acceptor_addr,
retry_peer = %peer.pub_key(),
"CONNECT relay (task-per-tx): re-routing to different peer after ConnectFailed"
);
if let Some(addr) = peer.socket_addr() {
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
id: incoming_tx,
payload: forward_req,
})),
)
.await?;
}
} else if let Some(accept) = retry.accept {
tracing::info!(
tx = %incoming_tx,
failed_acceptor = %failed_acceptor_addr,
acceptor = %accept.response.acceptor.pub_key(),
"CONNECT relay (task-per-tx): accepting locally after ConnectFailed"
);
if let Some(event) = NetEventLog::connect_response_sent(
&incoming_tx,
&op_manager.ring,
accept.response.acceptor.clone(),
accept.joiner.clone(),
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
dispatch_expect_connection_from(op_manager, incoming_tx, accept.joiner).await?;
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
upstream_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response {
id: incoming_tx,
payload: accept.response,
})),
)
.await?;
} else {
tracing::debug!(
tx = %incoming_tx,
%upstream_addr,
failed_acceptor = %failed_acceptor_addr,
"CONNECT relay (task-per-tx): propagating ConnectFailed upstream (no re-route)"
);
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(
upstream_addr,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::ConnectFailed {
id: incoming_tx,
failed_acceptor_addr,
})),
)
.await?;
}
}
ConnectMsg::Request { .. } => {
tracing::warn!(
tx = %incoming_tx,
"CONNECT relay (task-per-tx): unexpected Request on driver inbox; bypass logic regressed"
);
}
}
}
}
fn should_exit_for_ttl(tx: &Transaction) -> bool {
tx.timed_out()
}
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use super::{compute_reply_capacity, should_exit_for_ttl};
use crate::message::Transaction;
use crate::operations::connect::ConnectMsg;
#[test]
fn compute_reply_capacity_floor_is_two() {
assert_eq!(compute_reply_capacity(0), 2, "0 must clamp to 2");
assert_eq!(compute_reply_capacity(1), 2, "1*2 == 2");
}
#[test]
fn compute_reply_capacity_doubles_target() {
assert_eq!(compute_reply_capacity(5), 10);
assert_eq!(compute_reply_capacity(10), 20);
}
#[test]
fn compute_reply_capacity_saturates_on_overflow() {
assert_eq!(compute_reply_capacity(usize::MAX), usize::MAX);
}
#[test]
fn should_exit_for_ttl_returns_true_for_expired_tx() {
let expired = Transaction::ttl_transaction();
assert!(
should_exit_for_ttl(&expired),
"TTL-expired tx must trigger driver exit"
);
}
#[test]
fn should_exit_for_ttl_returns_false_for_fresh_tx() {
let fresh = Transaction::new::<ConnectMsg>();
assert!(
!should_exit_for_ttl(&fresh),
"fresh tx must NOT trigger driver exit on first iteration"
);
}
#[test]
fn connect_failed_carries_acceptor_addr_not_gateway() {
let acceptor_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 0, 2, 50)), 50001);
let id = Transaction::new::<ConnectMsg>();
let failed = ConnectMsg::ConnectFailed {
id,
failed_acceptor_addr: acceptor_addr,
};
#[allow(clippy::wildcard_enum_match_arm)]
match failed {
ConnectMsg::ConnectFailed {
failed_acceptor_addr,
..
} => {
assert_eq!(failed_acceptor_addr, acceptor_addr);
}
other => panic!("expected ConnectFailed, got {other:?}"),
}
}
#[test]
fn start_relay_connect_inserts_into_dedup_set() {
const SOURCE: &str = include_str!("op_ctx_task.rs");
let fn_start = SOURCE
.find("pub(crate) async fn start_relay_connect(")
.expect("start_relay_connect not found — driver removed or renamed");
let fn_end = SOURCE[fn_start..]
.find("\n}\n")
.expect("start_relay_connect has no closing brace");
let body = &SOURCE[fn_start..fn_start + fn_end];
assert!(
body.contains("active_relay_connect_txs.insert(incoming_tx)"),
"start_relay_connect MUST insert(incoming_tx) into the dedup \
set before spawning. Without this, racing duplicate Requests \
spawn N drivers and leak `RelayConnectInflightGuard`s."
);
assert!(
body.contains("RELAY_CONNECT_DEDUP_REJECTS"),
"start_relay_connect MUST increment RELAY_CONNECT_DEDUP_REJECTS \
when the dedup gate fires; without it, dedup storms are \
invisible to telemetry."
);
}
#[test]
fn drive_relay_connect_handles_all_reentry_variants() {
const SOURCE: &str = include_str!("op_ctx_task.rs");
let fn_start = SOURCE
.find("async fn drive_relay_connect(")
.expect("drive_relay_connect not found");
let fn_end = SOURCE[fn_start..]
.find("/// Decide whether the driver loop should exit")
.expect("drive_relay_connect has no successor function — guard outdated")
+ fn_start;
let body = &SOURCE[fn_start..fn_end];
for variant in &[
"ConnectMsg::Response { payload, .. }",
"ConnectMsg::Rejected {",
"ConnectMsg::ObservedAddress { address, .. }",
"ConnectMsg::ConnectFailed {",
] {
assert!(
body.contains(variant),
"drive_relay_connect MUST match `{variant}`. \
Removing this arm strands the variant on legacy \
process_message after commit 4 retires it."
);
}
assert!(
body.contains("should_exit_for_ttl(&incoming_tx)"),
"drive_relay_connect MUST poll should_exit_for_ttl in the \
receive loop; without it a wedged tx leaks the \
pending_op_results slot until the 60s sweep."
);
}
#[test]
fn relay_driver_admission_guard_scoped_to_initial_actions() {
const SOURCE: &str = include_str!("op_ctx_task.rs");
let fn_start = SOURCE
.find("async fn drive_relay_connect(")
.expect("drive_relay_connect not found");
let body = &SOURCE[fn_start..];
let guard_decl_pos = body
.find("let _admission = match op_manager.ring.connection_manager.try_admit_connect()")
.expect(
"driver no longer binds admission guard as `let _admission = match ... try_admit_connect()`. \
The RAII guard MUST be bound to `_admission` (drops at end of initial-actions scope) \
— not held across the recv loop. See plan risk #4 (admission slot starvation)."
);
let receive_loop_pos = body
.find("loop {\n if should_exit_for_ttl")
.expect("driver no longer has the recv loop with TTL exit");
assert!(
guard_decl_pos < receive_loop_pos,
"admission gate (`let _admission = match ... try_admit_connect()`) \
must appear BEFORE the recv loop with TTL exit, so the guard \
drops at the end of its block scope and does not span the \
receive loop."
);
}
}