use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use either::Either;
use freenet_stdlib::client_api::{ContractResponse, ErrorKind, HostResponse};
use freenet_stdlib::prelude::*;
use crate::client_events::HostResult;
use crate::config::{GlobalExecutor, OPERATION_TTL};
use crate::message::{NetMessage, NetMessageV1, Transaction};
use crate::node::NetworkBridge;
use crate::node::OpManager;
use crate::operations::OpError;
use crate::operations::op_ctx::{
AdvanceOutcome, AttemptOutcome, OpCtx, RetryDriver, RetryLoopOutcome, drive_retry_loop,
};
use crate::operations::orphan_streams::{OrphanStreamError, STREAM_CLAIM_TIMEOUT};
use crate::ring::{Location, PeerKeyLocation};
use crate::router::{RouteEvent, RouteOutcome};
use crate::tracing::{NetEventLog, OperationFailure, state_hash_full};
use crate::transport::peer_connection::StreamId;
use super::{PutFinalizationData, PutMsg, PutStreamingPayload, PutTerminalError, bound_cause};
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_client_put(
op_manager: Arc<OpManager>,
client_tx: Transaction,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
subscribe: bool,
blocking_subscribe: bool,
) -> Result<Transaction, OpError> {
tracing::debug!(
tx = %client_tx,
contract = %contract.key(),
"put: spawning client-initiated task"
);
let inflight_guard = match op_manager.admit_client_op() {
Some(g) => g,
None => return Err(OpError::NodeShuttingDown),
};
GlobalExecutor::spawn(async move {
let _inflight_guard = inflight_guard;
run_client_put(
op_manager,
client_tx,
contract,
related,
value,
htl,
subscribe,
blocking_subscribe,
)
.await;
});
Ok(client_tx)
}
#[allow(clippy::too_many_arguments)]
async fn run_client_put(
op_manager: Arc<OpManager>,
client_tx: Transaction,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
subscribe: bool,
blocking_subscribe: bool,
) {
let outcome = drive_client_put(
op_manager.clone(),
client_tx,
contract,
related,
value,
htl,
subscribe,
blocking_subscribe,
)
.await;
deliver_outcome(&op_manager, client_tx, outcome);
}
#[derive(Debug)]
enum DriverOutcome {
Publish(HostResult),
InfrastructureError(OpError),
}
#[allow(clippy::too_many_arguments)]
async fn drive_client_put(
op_manager: Arc<OpManager>,
client_tx: Transaction,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
subscribe: bool,
blocking_subscribe: bool,
) -> DriverOutcome {
match drive_client_put_inner(
&op_manager,
client_tx,
contract,
related,
value,
htl,
subscribe,
blocking_subscribe,
)
.await
{
Ok(outcome) => outcome,
Err(err) => DriverOutcome::InfrastructureError(err),
}
}
#[allow(clippy::too_many_arguments)]
async fn drive_client_put_inner(
op_manager: &Arc<OpManager>,
client_tx: Transaction,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
subscribe: bool,
blocking_subscribe: bool,
) -> Result<DriverOutcome, OpError> {
let key = contract.key();
let mut tried: Vec<std::net::SocketAddr> = Vec::new();
if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
tried.push(own_addr);
}
let initial_target = op_manager
.ring
.closest_potentially_hosting(&key, tried.as_slice());
let current_target = match initial_target {
Some(peer) => {
if let Some(addr) = peer.socket_addr() {
tried.push(addr);
}
peer
}
None => op_manager.ring.connection_manager.own_location(),
};
struct PutRetryDriver<'a> {
op_manager: &'a OpManager,
key: ContractKey,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
tried: Vec<std::net::SocketAddr>,
retries: usize,
current_target: PeerKeyLocation,
attempt_timeout: std::time::Duration,
max_advancements: usize,
}
impl RetryDriver for PutRetryDriver<'_> {
type Terminal = (Result<ContractKey, PutTerminalError>, Option<usize>);
fn new_attempt_tx(&mut self) -> Transaction {
Transaction::new::<PutMsg>()
}
fn build_request(&mut self, attempt_tx: Transaction) -> NetMessage {
NetMessage::from(PutMsg::Request {
id: attempt_tx,
contract: self.contract.clone(),
related_contracts: self.related.clone(),
value: self.value.clone(),
htl: self.htl,
skip_list: self
.op_manager
.ring
.connection_manager
.get_own_addr()
.into_iter()
.collect::<HashSet<_>>(),
})
}
fn classify(&mut self, reply: NetMessage) -> AttemptOutcome<Self::Terminal> {
match classify_reply(&reply) {
ReplyClass::Stored { key, hop_count } => {
AttemptOutcome::Terminal((Ok(key), Some(hop_count)))
}
ReplyClass::LocalCompletion { key } => AttemptOutcome::Terminal((Ok(key), Some(0))),
ReplyClass::TerminalError { cause } => AttemptOutcome::Terminal((Err(cause), None)),
ReplyClass::Unexpected => AttemptOutcome::Unexpected,
}
}
fn advance(&mut self) -> AdvanceOutcome {
#[cfg(any(test, feature = "testing"))]
PUT_RETRY_DRIVER_ADVANCE_CALLS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
match advance_to_next_peer(
self.op_manager,
&self.key,
&mut self.tried,
&mut self.retries,
self.max_advancements,
) {
Some((next_target, _next_addr)) => {
self.current_target = next_target;
AdvanceOutcome::Next
}
None => AdvanceOutcome::Exhausted,
}
}
fn attempt_timeout(&self) -> std::time::Duration {
self.attempt_timeout
}
}
let attempt_timeout =
compute_put_attempt_timeout(op_manager.streaming_threshold, &value, &contract);
let payload_size_estimate = value
.size()
.saturating_add(contract.data().len())
.saturating_add(contract.params().size());
let max_advancements = if crate::operations::should_use_streaming(
op_manager.streaming_threshold,
payload_size_estimate,
) {
MAX_PEER_ADVANCEMENTS_STREAMING
} else {
MAX_PEER_ADVANCEMENTS_NON_STREAMING
};
let mut driver = PutRetryDriver {
op_manager,
key,
contract,
related,
value,
htl,
tried,
retries: 0,
current_target,
attempt_timeout,
max_advancements,
};
let loop_result = drive_retry_loop(op_manager, client_tx, "put", &mut driver).await;
match loop_result {
RetryLoopOutcome::Done((Ok(reply_key), wire_hop_count)) => {
op_manager.completed(client_tx);
let contract_location = Location::from(&reply_key);
let route_event = RouteEvent {
peer: driver.current_target.clone(),
contract_location,
outcome: RouteOutcome::SuccessUntimed,
op_type: Some(crate::node::network_status::OpType::Put),
};
if let Some(log_event) =
crate::tracing::NetEventLog::route_event(&client_tx, &op_manager.ring, &route_event)
{
op_manager
.ring
.register_events(either::Either::Left(log_event))
.await;
}
op_manager.ring.routing_finished(route_event);
crate::node::network_status::record_op_result(
crate::node::network_status::OpType::Put,
true,
);
let max_htl = op_manager.ring.max_hops_to_live;
let hop_count = wire_hop_count.map(|hc| hc.min(max_htl));
super::finalize_put_at_originator(
op_manager,
client_tx,
reply_key,
PutFinalizationData {
sender: driver.current_target,
hop_count,
state_hash: None,
state_size: None,
},
false,
false,
)
.await;
maybe_subscribe_child(
op_manager,
client_tx,
reply_key,
subscribe,
blocking_subscribe,
)
.await;
Ok(DriverOutcome::Publish(Ok(HostResponse::ContractResponse(
ContractResponse::PutResponse { key: reply_key },
))))
}
RetryLoopOutcome::Done((Err(cause), _hop_count)) => {
op_manager.completed(client_tx);
Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
cause: cause.into_string().into(),
}
.into())))
}
RetryLoopOutcome::Exhausted(cause) => {
Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
cause: cause.into(),
}
.into())))
}
RetryLoopOutcome::Unexpected => Err(OpError::UnexpectedOpState),
RetryLoopOutcome::InfraError(err) => Err(err),
}
}
#[derive(Debug)]
enum ReplyClass {
Stored {
key: ContractKey,
hop_count: usize,
},
LocalCompletion {
key: ContractKey,
},
TerminalError {
cause: PutTerminalError,
},
Unexpected,
}
fn classify_reply(msg: &NetMessage) -> ReplyClass {
match msg {
NetMessage::V1(NetMessageV1::Put(PutMsg::Response { key, hop_count, .. }))
| NetMessage::V1(NetMessageV1::Put(PutMsg::ResponseStreaming { key, hop_count, .. })) => {
ReplyClass::Stored {
key: *key,
hop_count: *hop_count,
}
}
NetMessage::V1(NetMessageV1::Put(PutMsg::Request {
id: _, contract, ..
})) => ReplyClass::LocalCompletion {
key: contract.key(),
},
NetMessage::V1(NetMessageV1::Put(PutMsg::Error { cause, .. })) => {
ReplyClass::TerminalError {
cause: PutTerminalError::from_wire(cause.clone()),
}
}
_ => ReplyClass::Unexpected,
}
}
fn compute_put_attempt_timeout(
streaming_threshold: usize,
value: &WrappedState,
contract: &ContractContainer,
) -> std::time::Duration {
let payload_size_estimate = value
.size()
.saturating_add(contract.data().len())
.saturating_add(contract.params().size());
crate::operations::streaming_aware_attempt_timeout(streaming_threshold, payload_size_estimate)
}
pub(crate) const MAX_PEER_ADVANCEMENTS_NON_STREAMING: usize = 3;
pub(crate) const MAX_PEER_ADVANCEMENTS_STREAMING: usize = 0;
fn advance_to_next_peer(
op_manager: &OpManager,
key: &ContractKey,
tried: &mut Vec<std::net::SocketAddr>,
retries: &mut usize,
max_advancements: usize,
) -> Option<(PeerKeyLocation, std::net::SocketAddr)> {
if *retries >= max_advancements {
return None;
}
*retries += 1;
let peer = op_manager
.ring
.closest_potentially_hosting(key, tried.as_slice())?;
let addr = peer.socket_addr()?;
tried.push(addr);
Some((peer, addr))
}
async fn maybe_subscribe_child(
op_manager: &Arc<OpManager>,
client_tx: Transaction,
key: ContractKey,
subscribe: bool,
blocking_subscribe: bool,
) {
if !subscribe {
return;
}
use crate::operations::subscribe;
let child_tx = Transaction::new_child_of::<subscribe::SubscribeMsg>(&client_tx);
if blocking_subscribe {
subscribe::run_client_subscribe(op_manager.clone(), *key.id(), child_tx).await;
} else {
GlobalExecutor::spawn(subscribe::run_client_subscribe(
op_manager.clone(),
*key.id(),
child_tx,
));
}
}
fn deliver_outcome(op_manager: &OpManager, client_tx: Transaction, outcome: DriverOutcome) {
match outcome {
DriverOutcome::Publish(result) => {
op_manager.send_client_result(client_tx, result);
}
DriverOutcome::InfrastructureError(err) => {
tracing::warn!(
tx = %client_tx,
error = %err,
"put: infrastructure error; publishing synthesized client error"
);
let synthesized: HostResult = Err(ErrorKind::OperationError {
cause: format!("PUT failed: {err}").into(),
}
.into());
op_manager.send_client_result(client_tx, synthesized);
}
}
}
#[cfg(any(test, feature = "testing"))]
pub static RELAY_PUT_DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
#[cfg(any(test, feature = "testing"))]
pub static PUT_RETRY_DRIVER_ADVANCE_CALLS: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_INFLIGHT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_SPAWNED_TOTAL: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_COMPLETED_TOTAL: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_DEDUP_REJECTS: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_relay_put<CB>(
op_manager: Arc<OpManager>,
conn_manager: CB,
incoming_tx: Transaction,
contract: ContractContainer,
related_contracts: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
skip_list: HashSet<SocketAddr>,
upstream_addr: SocketAddr,
) -> Result<(), OpError>
where
CB: NetworkBridge + Clone + Send + 'static,
{
#[cfg(any(test, feature = "testing"))]
RELAY_PUT_DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if !op_manager.active_relay_put_txs.insert(incoming_tx) {
RELAY_PUT_DEDUP_REJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tracing::debug!(
tx = %incoming_tx,
contract = %contract.key(),
%upstream_addr,
phase = "relay_put_dedup_reject",
"PUT relay: duplicate Request for in-flight tx, dropping"
);
return Ok(());
}
tracing::debug!(
tx = %incoming_tx,
contract = %contract.key(),
htl,
%upstream_addr,
phase = "relay_put_start",
"PUT relay: spawning driver"
);
RELAY_PUT_INFLIGHT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
RELAY_PUT_SPAWNED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let guard = RelayPutInflightGuard {
op_manager: op_manager.clone(),
incoming_tx,
};
GlobalExecutor::spawn(run_relay_put(
guard,
op_manager,
conn_manager,
incoming_tx,
contract,
related_contracts,
value,
htl,
skip_list,
upstream_addr,
));
Ok(())
}
struct RelayPutInflightGuard {
op_manager: Arc<OpManager>,
incoming_tx: Transaction,
}
impl Drop for RelayPutInflightGuard {
fn drop(&mut self) {
self.op_manager
.active_relay_put_txs
.remove(&self.incoming_tx);
RELAY_PUT_INFLIGHT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
RELAY_PUT_COMPLETED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
#[allow(clippy::too_many_arguments)]
async fn run_relay_put<CB>(
guard: RelayPutInflightGuard,
op_manager: Arc<OpManager>,
conn_manager: CB,
incoming_tx: Transaction,
contract: ContractContainer,
related_contracts: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
skip_list: HashSet<SocketAddr>,
upstream_addr: SocketAddr,
) where
CB: NetworkBridge + Clone + Send + 'static,
{
let _guard = guard;
let drive_result = drive_relay_put(
&op_manager,
&conn_manager,
incoming_tx,
contract,
related_contracts,
value,
htl,
skip_list,
upstream_addr,
)
.await;
if let Err(err) = &drive_result {
if err.is_contract_queue_full() {
tracing::debug!(
tx = %incoming_tx,
error = %err,
phase = "relay_put_error",
event = "queue_full",
"PUT relay: driver returned error"
);
} else {
tracing::warn!(
tx = %incoming_tx,
error = %err,
phase = "relay_put_error",
"PUT relay: driver returned error"
);
}
}
let own_addr = op_manager.ring.connection_manager.get_own_addr();
let originator_loopback = Some(upstream_addr) == own_addr;
if originator_loopback {
if let Err(err) = drive_result {
let cause = bound_cause(err.to_string());
let error_msg = NetMessage::from(PutMsg::Error {
id: incoming_tx,
cause: cause.clone(),
});
let mut ctx = op_manager.op_ctx(incoming_tx);
if let Err(send_err) = ctx.send_local_loopback(error_msg).await {
tracing::warn!(
tx = %incoming_tx,
cause = %cause,
error = %send_err,
"PUT relay (task-per-tx): send_local_loopback for terminal-error \
failed; falling back to direct result_router_tx publish"
);
dispatch_loopback_shutdown_fallback(
&op_manager.result_router_tx,
incoming_tx,
cause,
);
}
}
} else if let Err(err) = drive_result {
let cause = bound_cause(err.to_string());
if let Err(send_err) =
relay_put_send_error(&op_manager, incoming_tx, cause.clone(), upstream_addr).await
{
tracing::warn!(
tx = %incoming_tx,
%upstream_addr,
cause = %cause,
error = %send_err,
"PUT relay: failed to bubble terminal-error upstream \
(multi-hop bubble); upstream will see OPERATION_TTL"
);
}
}
tokio::task::yield_now().await;
if !originator_loopback {
op_manager.release_pending_op_slot(incoming_tx).await;
}
}
#[allow(clippy::too_many_arguments)]
async fn drive_relay_put<CB>(
op_manager: &Arc<OpManager>,
conn_manager: &CB,
incoming_tx: Transaction,
contract: ContractContainer,
related_contracts: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
skip_list: HashSet<SocketAddr>,
upstream_addr: SocketAddr,
) -> Result<(), OpError>
where
CB: NetworkBridge + Clone + Send + 'static,
{
let key = contract.key();
tracing::info!(
tx = %incoming_tx,
contract = %key,
htl,
%upstream_addr,
phase = "relay_put_request",
"PUT relay: processing Request"
);
let merged_value = relay_put_store_locally(
op_manager,
incoming_tx,
key,
value.clone(),
&contract,
related_contracts.clone(),
htl,
)
.await?;
let mut new_skip_list = skip_list;
new_skip_list.insert(upstream_addr);
if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
new_skip_list.insert(own_addr);
}
let next_hop = if htl > 0 {
op_manager
.ring
.closest_potentially_hosting(&key, &new_skip_list)
} else {
None
};
let (next_peer, next_addr) = match next_hop {
Some(peer) => {
let addr = match peer.socket_addr() {
Some(a) => a,
None => {
tracing::error!(
tx = %incoming_tx,
contract = %key,
target_pub_key = %peer.pub_key(),
"PUT relay: next hop has no socket address"
);
let hop_count = op_manager.ring.max_hops_to_live.saturating_sub(htl);
return relay_put_finalize_local(
op_manager,
incoming_tx,
key,
merged_value,
upstream_addr,
hop_count,
)
.await;
}
};
(peer, addr)
}
None => {
tracing::info!(
tx = %incoming_tx,
contract = %key,
phase = "relay_put_complete",
"PUT relay: no next hop, finalizing at this node"
);
let hop_count = op_manager.ring.max_hops_to_live.saturating_sub(htl);
return relay_put_finalize_local(
op_manager,
incoming_tx,
key,
merged_value,
upstream_addr,
hop_count,
)
.await;
}
};
let new_htl = htl.saturating_sub(1);
if let Some(event) = NetEventLog::put_request(
&incoming_tx,
&op_manager.ring,
key,
next_peer.clone(),
new_htl,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
tracing::debug!(
tx = %incoming_tx,
contract = %key,
peer_addr = %next_addr,
htl = new_htl,
phase = "relay_put_forward",
"PUT relay: forwarding to next hop"
);
let own_addr = op_manager.ring.connection_manager.get_own_addr();
let originator_loopback = Some(upstream_addr) == own_addr;
let payload = PutStreamingPayload {
contract: contract.clone(),
related_contracts: related_contracts.clone(),
value: merged_value.clone(),
};
let payload_bytes = match bincode::serialize(&payload) {
Ok(b) => b,
Err(e) => {
return Err(OpError::NotificationChannelError(format!(
"Failed to serialize PUT relay forward payload: {e}"
)));
}
};
let payload_size = payload_bytes.len();
let upgrade_to_streaming =
crate::operations::should_use_streaming(op_manager.streaming_threshold, payload_size);
let mut ctx = op_manager.op_ctx(incoming_tx);
if originator_loopback {
if upgrade_to_streaming {
let stream_id = StreamId::next_operations();
let metadata_msg = NetMessage::from(PutMsg::RequestStreaming {
id: incoming_tx,
stream_id,
contract_key: key,
total_size: payload_size as u64,
htl: new_htl,
skip_list: new_skip_list.clone(),
subscribe: false,
});
if let Err(err) = ctx.send_fire_and_forget(next_addr, metadata_msg).await {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
target = %next_addr,
error = %err,
"PUT relay (loopback, loopback): \
streaming-upgrade send_fire_and_forget failed"
);
return Err(err);
}
if let Err(err) = conn_manager
.send_stream(
next_addr,
stream_id,
bytes::Bytes::from(payload_bytes),
None,
)
.await
{
tracing::warn!(
tx = %incoming_tx,
contract = %key,
target = %next_addr,
%stream_id,
error = %err,
"PUT relay (loopback, loopback): send_stream failed"
);
return Err(OpError::NotificationChannelError(format!(
"send_stream failed: {err}"
)));
}
} else {
let forward = NetMessage::from(PutMsg::Request {
id: incoming_tx,
contract,
related_contracts,
value: merged_value,
htl: new_htl,
skip_list: new_skip_list,
});
if let Err(err) = ctx.send_fire_and_forget(next_addr, forward).await {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
target = %next_addr,
error = %err,
"PUT relay (loopback, loopback): send_fire_and_forget failed"
);
return Err(err);
}
}
return Ok(());
}
let round_trip = if upgrade_to_streaming {
let stream_id = StreamId::next_operations();
tracing::info!(
tx = %incoming_tx,
contract = %key,
target = %next_addr,
payload_size,
%stream_id,
phase = "relay_put_forward_upgrade",
"PUT relay: payload exceeds threshold, upgrading to streaming"
);
let metadata_msg = NetMessage::from(PutMsg::RequestStreaming {
id: incoming_tx,
stream_id,
contract_key: key,
total_size: payload_size as u64,
htl: new_htl,
skip_list: new_skip_list.clone(),
subscribe: false,
});
let mut reply_rx = match ctx
.send_to_and_register_waiter(next_addr, metadata_msg)
.await
{
Ok(rx) => rx,
Err(err) => {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
target = %next_addr,
error = %err,
"PUT relay: streaming-upgrade send_to_and_register_waiter failed"
);
op_manager.release_pending_op_slot(incoming_tx).await;
return Err(err);
}
};
if let Err(err) = conn_manager
.send_stream(
next_addr,
stream_id,
bytes::Bytes::from(payload_bytes),
None,
)
.await
{
tracing::warn!(
tx = %incoming_tx,
contract = %key,
target = %next_addr,
%stream_id,
error = %err,
"PUT relay: send_stream failed during streaming upgrade"
);
op_manager.release_pending_op_slot(incoming_tx).await;
return Err(OpError::NotificationChannelError(format!(
"send_stream failed: {err}"
)));
}
tokio::time::timeout(OPERATION_TTL, async move {
reply_rx.recv().await.ok_or(OpError::NotificationError)
})
.await
} else {
let forward = NetMessage::from(PutMsg::Request {
id: incoming_tx,
contract,
related_contracts,
value: merged_value,
htl: new_htl,
skip_list: new_skip_list,
});
tokio::time::timeout(OPERATION_TTL, ctx.send_to_and_await(next_addr, forward)).await
};
op_manager.release_pending_op_slot(incoming_tx).await;
let reply = match round_trip {
Ok(Ok(reply)) => reply,
Ok(Err(err)) => {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
target = %next_addr,
error = %err,
"PUT relay: send_to_and_await failed"
);
crate::operations::record_relay_route_event(
op_manager,
next_peer.clone(),
crate::ring::Location::from(&key),
crate::router::RouteOutcome::Failure,
crate::node::network_status::OpType::Put,
);
return Err(err);
}
Err(_elapsed) => {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
target = %next_addr,
timeout_secs = OPERATION_TTL.as_secs(),
"PUT relay: downstream timed out"
);
crate::operations::record_relay_route_event(
op_manager,
next_peer.clone(),
crate::ring::Location::from(&key),
crate::router::RouteOutcome::Failure,
crate::node::network_status::OpType::Put,
);
return Err(OpError::UnexpectedOpState);
}
};
match reply {
NetMessage::V1(NetMessageV1::Put(PutMsg::Response {
key: reply_key,
hop_count: downstream_hop_count,
..
})) => {
tracing::info!(
tx = %incoming_tx,
contract = %reply_key,
phase = "relay_put_bubble",
"PUT relay: downstream returned Response; bubbling upstream"
);
crate::operations::record_relay_route_event(
op_manager,
next_peer.clone(),
crate::ring::Location::from(&reply_key),
crate::router::RouteOutcome::SuccessUntimed,
crate::node::network_status::OpType::Put,
);
relay_put_send_response(
op_manager,
incoming_tx,
reply_key,
upstream_addr,
downstream_hop_count,
)
.await
}
NetMessage::V1(NetMessageV1::Put(PutMsg::ResponseStreaming {
key: reply_key,
hop_count: downstream_hop_count,
..
})) => {
tracing::warn!(
tx = %incoming_tx,
contract = %reply_key,
phase = "relay_put_bubble_streaming_downgrade",
"PUT relay: downstream returned ResponseStreaming — \
synthesizing non-streaming Response upstream (slice A limitation)"
);
crate::operations::record_relay_route_event(
op_manager,
next_peer.clone(),
crate::ring::Location::from(&reply_key),
crate::router::RouteOutcome::SuccessUntimed,
crate::node::network_status::OpType::Put,
);
relay_put_send_response(
op_manager,
incoming_tx,
reply_key,
upstream_addr,
downstream_hop_count,
)
.await
}
NetMessage::V1(NetMessageV1::Put(PutMsg::Error {
cause: downstream_cause,
..
})) => {
let bubbled = bound_cause(downstream_cause);
tracing::warn!(
tx = %incoming_tx,
contract = %key,
cause = %bubbled,
phase = "relay_put_bubble_error_upstream",
"PUT relay: downstream PutMsg::Error — propagating cause to upstream"
);
relay_put_send_error(op_manager, incoming_tx, bubbled, upstream_addr).await
}
other => {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
reply_variant = ?std::mem::discriminant(&other),
"PUT relay: unexpected reply variant; treating as failure"
);
Err(OpError::UnexpectedOpState)
}
}
}
async fn relay_put_store_locally(
op_manager: &Arc<OpManager>,
incoming_tx: Transaction,
key: ContractKey,
value: WrappedState,
contract: &ContractContainer,
related_contracts: RelatedContracts<'static>,
htl: usize,
) -> Result<WrappedState, OpError> {
let was_hosting = op_manager.ring.is_hosting_contract(&key);
let (merged_value, _state_changed) = match super::put_contract(
op_manager,
key,
value.clone(),
related_contracts,
contract,
)
.await
{
Ok(result) => result,
Err(err) => {
if err.is_contract_queue_full() {
tracing::debug!(
tx = %incoming_tx,
contract = %key,
error = %err,
htl,
event = "queue_full",
"PUT relay: per-contract queue saturated"
);
} else {
tracing::error!(
tx = %incoming_tx,
contract = %key,
error = %err,
htl,
"PUT relay: put_contract failed"
);
}
if let Some(event) = NetEventLog::put_failure(
&incoming_tx,
&op_manager.ring,
key,
OperationFailure::ContractError(err.to_string()),
Some(op_manager.ring.max_hops_to_live.saturating_sub(htl)),
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
return Err(err);
}
};
if !was_hosting {
let evicted = op_manager
.ring
.host_contract(key, value.size() as u64, crate::ring::AccessType::Put)
.evicted;
crate::operations::announce_contract_hosted(op_manager, &key).await;
let mut removed_contracts = Vec::new();
for (evicted_key, expected_generation) in evicted {
if op_manager
.interest_manager
.unregister_local_hosting(&evicted_key)
{
removed_contracts.push(evicted_key);
}
crate::operations::reclaim_evicted_contract(
op_manager,
evicted_key,
expected_generation,
);
}
let became_interested = op_manager.interest_manager.register_local_hosting(&key);
let added = if became_interested { vec![key] } else { vec![] };
if !added.is_empty() || !removed_contracts.is_empty() {
crate::operations::broadcast_change_interests(op_manager, added, removed_contracts)
.await;
}
}
debug_assert!(
op_manager.ring.is_hosting_contract(&key),
"PUT relay: contract {key} must be in hosting list after put_contract + host_contract"
);
Ok(merged_value)
}
async fn relay_put_finalize_local(
op_manager: &OpManager,
incoming_tx: Transaction,
key: ContractKey,
merged_value: WrappedState,
upstream_addr: SocketAddr,
hop_count: usize,
) -> Result<(), OpError> {
let own_location = op_manager.ring.connection_manager.own_location();
let hash = Some(state_hash_full(&merged_value));
let size = Some(merged_value.len());
if let Some(event) = NetEventLog::put_success(
&incoming_tx,
&op_manager.ring,
key,
own_location,
Some(hop_count),
hash,
size,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
relay_put_send_response(op_manager, incoming_tx, key, upstream_addr, hop_count).await
}
async fn relay_put_send_response(
op_manager: &OpManager,
incoming_tx: Transaction,
key: ContractKey,
upstream_addr: SocketAddr,
hop_count: usize,
) -> Result<(), OpError> {
let msg = NetMessage::from(PutMsg::Response {
id: incoming_tx,
key,
hop_count,
});
let mut ctx = op_manager.op_ctx(incoming_tx);
let own_addr = op_manager.ring.connection_manager.get_own_addr();
if Some(upstream_addr) == own_addr {
ctx.send_local_loopback(msg)
.await
.map_err(|_| OpError::NotificationError)
} else {
ctx.send_fire_and_forget(upstream_addr, msg)
.await
.map_err(|_| OpError::NotificationError)
}
}
async fn relay_put_send_error(
op_manager: &OpManager,
incoming_tx: Transaction,
cause: String,
upstream_addr: SocketAddr,
) -> Result<(), OpError> {
let mut ctx = op_manager.op_ctx(incoming_tx);
let own_addr = op_manager.ring.connection_manager.get_own_addr();
relay_put_send_error_with_ctx(&mut ctx, own_addr, incoming_tx, cause, upstream_addr).await
}
fn dispatch_loopback_shutdown_fallback(
result_router_tx: &tokio::sync::mpsc::Sender<(Transaction, HostResult)>,
incoming_tx: Transaction,
cause: String,
) {
let host_err: freenet_stdlib::client_api::ClientError = ErrorKind::OperationError {
cause: cause.into(),
}
.into();
if let Err(err) = result_router_tx.try_send((incoming_tx, Err(host_err))) {
tracing::error!(
tx = %incoming_tx,
error = %err,
"PUT relay shutdown fallback: result_router_tx full or closed; \
client may not see the real failure cause (degraded but \
non-fatal — no panic, no slot mutation)"
);
}
}
async fn relay_put_send_error_with_ctx(
ctx: &mut OpCtx,
own_addr: Option<SocketAddr>,
incoming_tx: Transaction,
cause: String,
upstream_addr: SocketAddr,
) -> Result<(), OpError> {
let msg = NetMessage::from(PutMsg::Error {
id: incoming_tx,
cause,
});
if Some(upstream_addr) == own_addr {
ctx.send_local_loopback(msg)
.await
.map_err(|_| OpError::NotificationError)
} else {
ctx.send_fire_and_forget(upstream_addr, msg)
.await
.map_err(|_| OpError::NotificationError)
}
}
#[cfg(any(test, feature = "testing"))]
pub static RELAY_PUT_STREAMING_DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_STREAMING_INFLIGHT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_STREAMING_SPAWNED_TOTAL: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_STREAMING_COMPLETED_TOTAL: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_STREAMING_DEDUP_REJECTS: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_relay_put_streaming<CB>(
op_manager: Arc<OpManager>,
conn_manager: CB,
incoming_tx: Transaction,
stream_id: StreamId,
contract_key: ContractKey,
total_size: u64,
htl: usize,
skip_list: HashSet<SocketAddr>,
subscribe: bool,
upstream_addr: SocketAddr,
) -> Result<(), OpError>
where
CB: NetworkBridge + Clone + Send + 'static,
{
#[cfg(any(test, feature = "testing"))]
RELAY_PUT_STREAMING_DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if !op_manager.active_relay_put_txs.insert(incoming_tx) {
RELAY_PUT_STREAMING_DEDUP_REJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tracing::debug!(
tx = %incoming_tx,
contract = %contract_key,
%upstream_addr,
phase = "relay_put_streaming_dedup_reject",
"PUT streaming relay: duplicate Request for in-flight tx, dropping"
);
return Ok(());
}
tracing::debug!(
tx = %incoming_tx,
contract = %contract_key,
%stream_id,
total_size,
htl,
%upstream_addr,
phase = "relay_put_streaming_start",
"PUT streaming relay: spawning driver"
);
RELAY_PUT_STREAMING_INFLIGHT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
RELAY_PUT_STREAMING_SPAWNED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let guard = RelayPutStreamingInflightGuard {
op_manager: op_manager.clone(),
incoming_tx,
};
GlobalExecutor::spawn(run_relay_put_streaming(
guard,
op_manager,
conn_manager,
incoming_tx,
stream_id,
contract_key,
total_size,
htl,
skip_list,
subscribe,
upstream_addr,
));
Ok(())
}
struct RelayPutStreamingInflightGuard {
op_manager: Arc<OpManager>,
incoming_tx: Transaction,
}
impl Drop for RelayPutStreamingInflightGuard {
fn drop(&mut self) {
self.op_manager
.active_relay_put_txs
.remove(&self.incoming_tx);
RELAY_PUT_STREAMING_INFLIGHT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
RELAY_PUT_STREAMING_COMPLETED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
#[allow(clippy::too_many_arguments)]
async fn run_relay_put_streaming<CB>(
guard: RelayPutStreamingInflightGuard,
op_manager: Arc<OpManager>,
conn_manager: CB,
incoming_tx: Transaction,
stream_id: StreamId,
contract_key: ContractKey,
total_size: u64,
htl: usize,
skip_list: HashSet<SocketAddr>,
subscribe: bool,
upstream_addr: SocketAddr,
) where
CB: NetworkBridge + Clone + Send + 'static,
{
let _guard = guard;
if let Err(err) = drive_relay_put_streaming(
&op_manager,
&conn_manager,
incoming_tx,
stream_id,
contract_key,
total_size,
htl,
skip_list,
subscribe,
upstream_addr,
)
.await
{
if err.is_contract_queue_full() {
tracing::debug!(
tx = %incoming_tx,
error = %err,
phase = "relay_put_streaming_error",
event = "queue_full",
"PUT streaming relay: driver returned error"
);
} else {
tracing::warn!(
tx = %incoming_tx,
error = %err,
phase = "relay_put_streaming_error",
"PUT streaming relay: driver returned error"
);
}
}
tokio::task::yield_now().await;
op_manager.release_pending_op_slot(incoming_tx).await;
}
#[allow(clippy::too_many_arguments)]
async fn drive_relay_put_streaming<CB>(
op_manager: &Arc<OpManager>,
conn_manager: &CB,
incoming_tx: Transaction,
stream_id: StreamId,
contract_key: ContractKey,
total_size: u64,
htl: usize,
skip_list: HashSet<SocketAddr>,
subscribe: bool,
upstream_addr: SocketAddr,
) -> Result<(), OpError>
where
CB: NetworkBridge + Clone + Send + 'static,
{
tracing::info!(
tx = %incoming_tx,
contract = %contract_key,
%stream_id,
total_size,
htl,
%upstream_addr,
phase = "relay_put_streaming_request",
"PUT streaming relay: processing RequestStreaming"
);
let stream_handle = match op_manager
.orphan_stream_registry()
.claim_or_wait(upstream_addr, stream_id, STREAM_CLAIM_TIMEOUT)
.await
{
Ok(handle) => handle,
Err(OrphanStreamError::AlreadyClaimed) => {
tracing::debug!(
tx = %incoming_tx,
%stream_id,
"PUT streaming relay: stream already claimed, skipping"
);
return Ok(());
}
Err(err) => {
tracing::error!(
tx = %incoming_tx,
%stream_id,
error = %err,
"PUT streaming relay: orphan stream claim failed"
);
return Err(OpError::OrphanStreamClaimFailed);
}
};
let mut new_skip_list = skip_list;
new_skip_list.insert(upstream_addr);
if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
new_skip_list.insert(own_addr);
}
let next_hop = if htl > 0 {
op_manager
.ring
.closest_potentially_hosting(&contract_key, &new_skip_list)
} else {
None
};
let next_hop_addr = next_hop.as_ref().and_then(|p| p.socket_addr());
let piping = if let Some(next_addr) = next_hop_addr {
if crate::operations::should_use_streaming(
op_manager.streaming_threshold,
total_size as usize,
) {
let outbound_sid = StreamId::next_operations();
let forked_handle = stream_handle.fork();
let new_htl = htl.saturating_sub(1);
let pipe_metadata = PutMsg::RequestStreaming {
id: incoming_tx,
stream_id: outbound_sid,
contract_key,
total_size,
htl: new_htl,
skip_list: new_skip_list.clone(),
subscribe,
};
let pipe_metadata_net: NetMessage = pipe_metadata.into();
let embedded_metadata = match bincode::serialize(&pipe_metadata_net) {
Ok(bytes) => Some(bytes::Bytes::from(bytes)),
Err(e) => {
tracing::warn!(
tx = %incoming_tx,
error = %e,
"Failed to serialize piped stream metadata for embedding"
);
None
}
};
tracing::info!(
tx = %incoming_tx,
inbound_stream_id = %stream_id,
outbound_stream_id = %outbound_sid,
total_size,
peer_addr = %next_addr,
"Starting piped stream forwarding to next hop"
);
if let Some(ref peer) = next_hop {
if let Some(event) = NetEventLog::put_request(
&incoming_tx,
&op_manager.ring,
contract_key,
peer.clone(),
new_htl,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
}
Some((
next_addr,
pipe_metadata_net,
outbound_sid,
forked_handle,
embedded_metadata,
))
} else {
None
}
} else {
None
};
let downstream_reply_rx: Option<(SocketAddr, tokio::sync::mpsc::Receiver<NetMessage>)> =
if let Some((next_addr, metadata_net, outbound_sid, forked_handle, embedded_metadata)) =
piping
{
let mut ctx = op_manager.op_ctx(incoming_tx);
let rx_opt: Option<tokio::sync::mpsc::Receiver<NetMessage>> = match ctx
.send_to_and_register_waiter(next_addr, metadata_net)
.await
{
Ok(rx) => Some(rx),
Err(err) => {
tracing::warn!(
tx = %incoming_tx,
target = %next_addr,
error = %err,
"PUT streaming relay: metadata register_waiter failed; will finalize locally"
);
None
}
};
if rx_opt.is_some() {
if let Err(err) = conn_manager
.pipe_stream(next_addr, outbound_sid, forked_handle, embedded_metadata)
.await
{
tracing::warn!(
tx = %incoming_tx,
target = %next_addr,
error = %err,
"PUT streaming relay: pipe_stream failed after waiter install; \
will wait on downstream reply and bubble what we get"
);
}
}
rx_opt.map(|rx| (next_addr, rx))
} else {
None
};
let stream_data = match stream_handle.assemble().await {
Ok(data) => data,
Err(err) => {
tracing::error!(
tx = %incoming_tx,
%stream_id,
error = %err,
"PUT streaming relay: stream assembly failed"
);
return Err(OpError::StreamCancelled);
}
};
let payload: PutStreamingPayload = match bincode::deserialize(&stream_data) {
Ok(p) => p,
Err(err) => {
tracing::error!(
tx = %incoming_tx,
%stream_id,
error = %err,
"PUT streaming relay: payload deserialize failed"
);
return Err(OpError::invalid_transition(incoming_tx));
}
};
let PutStreamingPayload {
contract,
value,
related_contracts,
} = payload;
let key = contract.key();
if key != contract_key {
tracing::error!(
tx = %incoming_tx,
expected = %contract_key,
actual = %key,
"PUT streaming relay: contract key mismatch"
);
return Err(OpError::invalid_transition(incoming_tx));
}
let merged_value = relay_put_store_locally(
op_manager,
incoming_tx,
key,
value,
&contract,
related_contracts,
htl,
)
.await?;
if let Some((next_addr, mut rx)) = downstream_reply_rx {
let reply = match tokio::time::timeout(OPERATION_TTL, rx.recv()).await {
Ok(Some(reply)) => reply,
Ok(None) => {
tracing::warn!(
tx = %incoming_tx,
target = %next_addr,
"PUT streaming relay: downstream reply channel closed before reply"
);
if let Some(ref peer) = next_hop {
crate::operations::record_relay_route_event(
op_manager,
peer.clone(),
crate::ring::Location::from(&key),
crate::router::RouteOutcome::Failure,
crate::node::network_status::OpType::Put,
);
}
op_manager.release_pending_op_slot(incoming_tx).await;
let hop_count = op_manager.ring.max_hops_to_live.saturating_sub(htl);
return relay_put_send_response(
op_manager,
incoming_tx,
key,
upstream_addr,
hop_count,
)
.await;
}
Err(_elapsed) => {
tracing::warn!(
tx = %incoming_tx,
target = %next_addr,
"PUT streaming relay: downstream reply timed out"
);
if let Some(ref peer) = next_hop {
crate::operations::record_relay_route_event(
op_manager,
peer.clone(),
crate::ring::Location::from(&key),
crate::router::RouteOutcome::Failure,
crate::node::network_status::OpType::Put,
);
}
op_manager.release_pending_op_slot(incoming_tx).await;
let hop_count = op_manager.ring.max_hops_to_live.saturating_sub(htl);
return relay_put_send_response(
op_manager,
incoming_tx,
key,
upstream_addr,
hop_count,
)
.await;
}
};
op_manager.release_pending_op_slot(incoming_tx).await;
match reply {
NetMessage::V1(NetMessageV1::Put(PutMsg::Response {
key: reply_key,
hop_count: downstream_hop_count,
..
}))
| NetMessage::V1(NetMessageV1::Put(PutMsg::ResponseStreaming {
key: reply_key,
hop_count: downstream_hop_count,
..
})) => {
tracing::info!(
tx = %incoming_tx,
contract = %reply_key,
phase = "relay_put_streaming_bubble",
"PUT streaming relay: downstream replied; bubbling Response upstream"
);
if let Some(ref peer) = next_hop {
crate::operations::record_relay_route_event(
op_manager,
peer.clone(),
crate::ring::Location::from(&reply_key),
crate::router::RouteOutcome::SuccessUntimed,
crate::node::network_status::OpType::Put,
);
}
relay_put_send_response(
op_manager,
incoming_tx,
reply_key,
upstream_addr,
downstream_hop_count,
)
.await
}
other => {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
reply_variant = ?std::mem::discriminant(&other),
"PUT streaming relay: unexpected reply variant"
);
let hop_count = op_manager.ring.max_hops_to_live.saturating_sub(htl);
relay_put_send_response(op_manager, incoming_tx, key, upstream_addr, hop_count)
.await
}
}
} else {
let hop_count = op_manager.ring.max_hops_to_live.saturating_sub(htl);
relay_put_finalize_local(
op_manager,
incoming_tx,
key,
merged_value,
upstream_addr,
hop_count,
)
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
fn dummy_key() -> ContractKey {
ContractKey::from_id_and_code(ContractInstanceId::new([1u8; 32]), CodeHash::new([2u8; 32]))
}
fn dummy_tx() -> Transaction {
Transaction::new::<PutMsg>()
}
#[test]
fn run_relay_put_wrappers_gate_queue_full_log_severity() {
let src = include_str!("op_ctx_task.rs");
for wrapper in [
"async fn run_relay_put<",
"async fn run_relay_put_streaming<",
] {
let start = src
.find(wrapper)
.unwrap_or_else(|| panic!("{wrapper} not found"));
let after = &src[start + 1..];
let end = after
.find("\nasync fn ")
.or_else(|| after.find("\n#[cfg(test)]"))
.unwrap_or(after.len());
let body = &src[start..start + 1 + end];
assert!(
body.contains("is_contract_queue_full()"),
"{wrapper} must gate its WARN log on \
err.is_contract_queue_full() — see issue #4251 and PR #4253"
);
assert!(
body.contains("event = \"queue_full\""),
"{wrapper} must tag the DEBUG branch with \
event = \"queue_full\" so log filtering / telemetry can \
distinguish queue-full backpressure from real failures"
);
assert!(
body.contains("tracing::debug!") && body.contains("tracing::warn!"),
"{wrapper} must keep BOTH a debug! (queue_full) and a warn! \
(real failures) call — an inversion that maps queue_full to \
warn would re-open the spam"
);
}
}
#[test]
fn streaming_put_retry_budget_does_not_exceed_client_patience() {
let worst_case = std::time::Duration::from_secs(
(MAX_PEER_ADVANCEMENTS_STREAMING as u64 + 1)
* crate::operations::STREAMING_ATTEMPT_TIMEOUT_CAP.as_secs(),
);
assert!(
worst_case <= crate::operations::STREAMING_ATTEMPT_TIMEOUT_CAP,
"streaming PUT worst-case wall-clock {worst_case:?} exceeds \
one STREAMING_ATTEMPT_TIMEOUT_CAP \
({:?}); freenet-git#53 will recur. Either reduce \
MAX_PEER_ADVANCEMENTS_STREAMING (currently {}) or \
STREAMING_ATTEMPT_TIMEOUT_CAP.",
crate::operations::STREAMING_ATTEMPT_TIMEOUT_CAP,
MAX_PEER_ADVANCEMENTS_STREAMING,
);
assert_eq!(
MAX_PEER_ADVANCEMENTS_STREAMING, 0,
"MAX_PEER_ADVANCEMENTS_STREAMING must be 0 (single \
attempt, no advancement). N>0 silently allows N+1 \
attempts and busts the WS-client budget — that's the \
original freenet-git#53 footgun. Raising this requires \
updating both this pin and the rationale on the constant."
);
assert_eq!(
MAX_PEER_ADVANCEMENTS_NON_STREAMING, 3,
"non-streaming PUTs keep the legacy 3-advancement budget \
(= 4 attempts total); this test does not authorize a \
reduction (would shrink the k_closest fan-out coverage \
for small payloads)."
);
}
#[test]
fn advance_to_next_peer_at_streaming_cap_exhausts_immediately() {
let cap = MAX_PEER_ADVANCEMENTS_STREAMING;
let mut retries: usize = 0;
let allow_first_advance = retries < cap;
assert!(
!allow_first_advance,
"MAX_PEER_ADVANCEMENTS_STREAMING ({cap}) must NOT permit \
any advancement — drive_retry_loop's first attempt has \
already run before advance() is called. Permitting even \
one advancement re-opens the 2x-budget bug freenet-git#53."
);
retries = 0;
for round in 0..MAX_PEER_ADVANCEMENTS_NON_STREAMING {
assert!(
retries < MAX_PEER_ADVANCEMENTS_NON_STREAMING,
"non-streaming advance round {round} should be allowed"
);
retries += 1;
}
assert!(
retries >= MAX_PEER_ADVANCEMENTS_NON_STREAMING,
"non-streaming exhausts after {MAX_PEER_ADVANCEMENTS_NON_STREAMING} advancements"
);
}
#[test]
fn drive_client_put_inner_dispatches_streaming_cap_on_should_use_streaming() {
let src = include_str!("op_ctx_task.rs");
let entry = src
.find("fn drive_client_put_inner")
.expect("drive_client_put_inner must exist");
let cap_decision = src[entry..]
.find("let max_advancements =")
.expect("drive_client_put_inner must compute `let max_advancements = …`");
let window = &src[entry + cap_decision..entry + cap_decision + 1500];
assert!(
window.contains("should_use_streaming("),
"drive_client_put_inner's max_advancements selection must \
gate on should_use_streaming(threshold, \
payload_size_estimate). A flat \
MAX_PEER_ADVANCEMENTS_NON_STREAMING for all PUTs re-opens \
freenet-git#53."
);
assert!(
window.contains("MAX_PEER_ADVANCEMENTS_STREAMING")
&& window.contains("MAX_PEER_ADVANCEMENTS_NON_STREAMING"),
"drive_client_put_inner must reference both advancement \
caps by name in the selection so future readers see the \
split — bare integer literals are forbidden here."
);
}
#[test]
fn start_client_put_acquires_inflight_guard_before_spawn() {
let src = include_str!("op_ctx_task.rs");
let entry = src
.find("pub(crate) async fn start_client_put(")
.expect("start_client_put must exist");
let after_spawn = src[entry..]
.find("GlobalExecutor::spawn(")
.expect("start_client_put must spawn a driver task");
let before_spawn = &src[entry..entry + after_spawn];
assert!(
before_spawn.contains("op_manager.admit_client_op()"),
"start_client_put must call op_manager.admit_client_op() \
before GlobalExecutor::spawn so (a) the shutdown drain \
knows this PUT is in flight and (b) the gate check + \
counter bump are atomic. Reverting to a separate \
is_shutting_down() check + client_op_guard() bump \
re-opens the TOCTOU window (Codex r2 finding)."
);
assert!(
before_spawn.contains("OpError::NodeShuttingDown"),
"start_client_put must early-return OpError::NodeShuttingDown \
when admit_client_op() refuses. Dropping the early-return \
would silently spawn a driver that the Disconnect cuts off."
);
let spawned = &src[entry + after_spawn..];
let block_end = spawned
.find("\n Ok(client_tx)")
.expect("start_client_put must return Ok(client_tx)");
let spawn_block = &spawned[..block_end];
assert!(
spawn_block.contains("let _inflight_guard = inflight_guard;"),
"the ClientOpGuard must be moved into the spawned future \
via `let _inflight_guard = inflight_guard;` so it is held \
for the full driver lifetime. A bare drop at the spawn \
site would clear the counter before run_client_put even \
starts."
);
}
#[test]
fn classify_reply_response_is_stored() {
let tx = dummy_tx();
let key = dummy_key();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Response {
id: tx,
key,
hop_count: 0,
}));
assert!(matches!(classify_reply(&msg), ReplyClass::Stored { .. }));
}
#[test]
fn classify_reply_response_streaming_is_stored() {
let tx = dummy_tx();
let key = dummy_key();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::ResponseStreaming {
id: tx,
key,
continue_forwarding: false,
hop_count: 0,
}));
assert!(matches!(classify_reply(&msg), ReplyClass::Stored { .. }));
}
#[test]
fn classify_reply_preserves_hop_count_for_stored() {
for hc in [0_usize, 1, 4, 10, 64] {
let tx = dummy_tx();
let key = dummy_key();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Response {
id: tx,
key,
hop_count: hc,
}));
match classify_reply(&msg) {
ReplyClass::Stored {
key: got_key,
hop_count: got_hc,
} => {
assert_eq!(got_key, key, "Stored.key preserved");
assert_eq!(got_hc, hc, "Stored.hop_count preserved ({hc})");
}
other @ (ReplyClass::LocalCompletion { .. }
| ReplyClass::TerminalError { .. }
| ReplyClass::Unexpected) => {
panic!("expected Stored, got {other:?} for hc={hc}")
}
}
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::ResponseStreaming {
id: tx,
key,
continue_forwarding: false,
hop_count: hc,
}));
match classify_reply(&msg) {
ReplyClass::Stored {
key: got_key,
hop_count: got_hc,
} => {
assert_eq!(got_key, key, "ResponseStreaming Stored.key preserved");
assert_eq!(
got_hc, hc,
"ResponseStreaming Stored.hop_count preserved ({hc})"
);
}
other @ (ReplyClass::LocalCompletion { .. }
| ReplyClass::TerminalError { .. }
| ReplyClass::Unexpected) => {
panic!("expected Stored, got {other:?} for hc={hc}")
}
}
}
}
#[test]
fn finalize_put_at_originator_uses_wire_hop_count() {
const SOURCE: &str = include_str!("op_ctx_task.rs");
let done_arm = SOURCE
.find("RetryLoopOutcome::Done((Ok(reply_key), wire_hop_count))")
.expect(
"RetryLoopOutcome::Done destructure not found — \
if you've refactored to use named struct destructuring \
update this anchor but keep the wire_hop_count threading",
);
let finalize_call = SOURCE[done_arm..]
.find("finalize_put_at_originator(")
.expect("no finalize_put_at_originator call in Done arm");
let region = &SOURCE[done_arm..done_arm + finalize_call + 500];
assert!(
!region.contains("hop_count: None,"),
"PutFinalizationData construction in start_client_put's \
Done arm must NOT hard-code hop_count: None — that emits a \
PutSuccess with hop_count=None alongside the populated one \
from from_inbound_msg_v1, defeating #4248"
);
assert!(
region.contains("hop_count,"),
"PutFinalizationData construction in start_client_put's \
Done arm must pass hop_count from the wire value"
);
assert!(
region.contains("wire_hop_count.map"),
"wire_hop_count must be mapped (e.g. clamp via .min(max_htl)) \
before being passed into PutFinalizationData"
);
}
#[test]
fn classify_reply_forwarding_ack_is_unexpected() {
let tx = dummy_tx();
let key = dummy_key();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::ForwardingAck {
id: tx,
contract_key: key,
}));
assert!(
matches!(classify_reply(&msg), ReplyClass::Unexpected),
"ForwardingAck must NOT be classified as terminal"
);
}
#[test]
fn classify_reply_error_is_terminal_error_with_cause() {
let tx = dummy_tx();
let cause = "execution error: invalid contract update, reason: \
New state version 1 must be higher than current version 1"
.to_string();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Error {
id: tx,
cause: cause.clone(),
}));
match classify_reply(&msg) {
ReplyClass::TerminalError { cause: c } => assert_eq!(
c.as_str(),
cause.as_str(),
"TerminalError must carry the verbatim cause string from \
the wire envelope so the client sees the real reason \
(not the generic 'failed notifying, channel closed')"
),
ReplyClass::Stored { .. } => panic!(
"Error envelope must NOT classify as Stored — Stored is \
success and would suppress the cause"
),
ReplyClass::LocalCompletion { .. } => panic!(
"Error envelope must NOT classify as LocalCompletion — \
LocalCompletion is a success path keyed by Request echo"
),
ReplyClass::Unexpected => panic!(
"Error envelope must NOT classify as Unexpected — that \
returns RetryLoopOutcome::Unexpected, dropping the cause"
),
}
}
#[test]
fn classify_reply_error_with_empty_cause_preserves_empty() {
let tx = dummy_tx();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Error {
id: tx,
cause: String::new(),
}));
match classify_reply(&msg) {
ReplyClass::TerminalError { cause } => assert_eq!(
cause.as_str(),
"",
"empty cause must round-trip as empty — replacing it with a \
placeholder hides the wire shape from the client"
),
ReplyClass::Stored { .. }
| ReplyClass::LocalCompletion { .. }
| ReplyClass::Unexpected => {
panic!("expected TerminalError with empty cause")
}
}
}
#[test]
fn classify_reply_error_with_oversize_cause_is_truncated() {
use crate::operations::put::PUT_TERMINAL_CAUSE_MAX_BYTES;
let tx = dummy_tx();
let cause = "x".repeat(PUT_TERMINAL_CAUSE_MAX_BYTES * 4);
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Error {
id: tx,
cause: cause.clone(),
}));
match classify_reply(&msg) {
ReplyClass::TerminalError { cause: c } => {
assert!(
c.as_str().len() <= PUT_TERMINAL_CAUSE_MAX_BYTES,
"oversize cause must be capped at PUT_TERMINAL_CAUSE_MAX_BYTES"
);
assert!(
c.as_str().ends_with("...[truncated]"),
"oversize cause must carry the truncation marker"
);
}
ReplyClass::Stored { .. }
| ReplyClass::LocalCompletion { .. }
| ReplyClass::Unexpected => {
panic!("expected TerminalError with truncated cause")
}
}
}
#[test]
fn classify_reply_error_with_normal_cause_passes_through() {
let tx = dummy_tx();
let cause = "contract rejected: version must be strictly increasing".to_string();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Error {
id: tx,
cause: cause.clone(),
}));
match classify_reply(&msg) {
ReplyClass::TerminalError { cause: c } => assert_eq!(c.as_str(), cause.as_str()),
ReplyClass::Stored { .. }
| ReplyClass::LocalCompletion { .. }
| ReplyClass::Unexpected => panic!("expected TerminalError"),
}
}
#[test]
fn run_client_put_done_err_arm_publishes_once_and_completes() {
let src = include_str!("op_ctx_task.rs");
let match_anchor = "match loop_result {";
let match_start = src
.find(match_anchor)
.expect("loop_result match site not found");
let match_end = src[match_start..]
.find("\n}\n")
.map(|p| match_start + p)
.expect("end of loop_result match not found");
let match_body = &src[match_start..match_end];
let arm_anchor = "RetryLoopOutcome::Done((Err(cause), _hop_count)) =>";
let arm_start = match_body
.find(arm_anchor)
.expect("Done(Err(cause)) arm not found in run_client_put — issue #4111 regressed");
let arm_end = match_body[arm_start + arm_anchor.len()..]
.find("RetryLoopOutcome::")
.map(|p| arm_start + arm_anchor.len() + p)
.unwrap_or(match_body.len());
let arm_body = &match_body[arm_start..arm_end];
assert!(
arm_body.contains("op_manager.completed(client_tx)"),
"Done(Err) arm MUST call op_manager.completed(client_tx) to \
reclaim the pending_op_results slot — otherwise it lingers \
until the 60s sweep"
);
assert!(
arm_body.contains("DriverOutcome::Publish(Err("),
"Done(Err) arm MUST publish a HostResult::Err — silent drop \
would hang the client until timeout"
);
assert!(
arm_body.contains("ErrorKind::OperationError"),
"Done(Err) arm MUST wrap the cause in \
freenet_stdlib::client_api::ErrorKind::OperationError so the \
client sees a structured error variant (not a generic string)"
);
}
#[test]
fn classify_reply_error_maps_to_terminal_error_with_cause() {
let tx = dummy_tx();
let cause = "rejected: version not increasing".to_string();
let reply = NetMessage::V1(NetMessageV1::Put(PutMsg::Error {
id: tx,
cause: cause.clone(),
}));
match classify_reply(&reply) {
ReplyClass::TerminalError { cause: c } => {
assert_eq!(
c.as_str(),
cause.as_str(),
"classify_reply must preserve the wire cause verbatim \
so `drive_retry_loop`'s Terminal arm can publish the \
real reason (not the synthesised \
'failed notifying, channel closed' marker)"
);
}
ReplyClass::Stored { .. }
| ReplyClass::LocalCompletion { .. }
| ReplyClass::Unexpected => {
panic!(
"PutMsg::Error must classify as TerminalError — \
`Unexpected` would route through \
RetryLoopOutcome::Unexpected and lose the cause"
)
}
}
}
#[test]
fn put_retry_driver_terminal_error_does_not_advance() {
let src = include_str!("op_ctx_task.rs");
let impl_start = src
.find("impl RetryDriver for PutRetryDriver<'_> {")
.expect("PutRetryDriver RetryDriver impl not found");
let impl_end = src[impl_start..]
.find("\n }\n")
.map(|p| impl_start + p)
.expect("end of PutRetryDriver impl not found");
let impl_body = &src[impl_start..impl_end];
assert!(
impl_body.contains("ReplyClass::TerminalError"),
"PutRetryDriver::classify must handle ReplyClass::TerminalError \
explicitly — otherwise it falls through to the Unexpected arm \
and the driver returns RetryLoopOutcome::Unexpected, losing the \
contract-side cause."
);
assert!(
impl_body.contains("AttemptOutcome::Terminal((Err("),
"PutRetryDriver::classify must produce \
AttemptOutcome::Terminal((Err(cause), _)) for TerminalError so the \
retry loop exits via the Done((Err(_), _)) path rather than \
advancing to a fresh attempt."
);
assert!(
impl_body.contains("type Terminal = (Result<ContractKey, PutTerminalError>"),
"PutRetryDriver::Terminal must be
(Result<ContractKey, PutTerminalError>, Option<usize>);
changing this back to bare ContractKey re-introduces the issue
#4111 failure mode (no way to express a terminal error);
changing the error half back to `String` drops the structured
classification (LocalRejection vs Relayed); dropping hop_count
breaks the telemetry contract from PR #4248."
);
}
#[test]
fn finalize_put_at_originator_never_subscribes_from_driver() {
const SOURCE: &str = include_str!("op_ctx_task.rs");
let call_marker = "finalize_put_at_originator(";
let mut offset = 0;
let mut call_count = 0;
while let Some(pos) = SOURCE[offset..].find(call_marker) {
let abs_pos = offset + pos;
let window_end = SOURCE[abs_pos..]
.find(".await")
.map(|p| abs_pos + p)
.unwrap_or(SOURCE.len().min(abs_pos + 500));
let window = &SOURCE[abs_pos..window_end];
let after_struct = window.find("},").map(|p| &window[p..]);
if let Some(tail) = after_struct {
assert!(
!tail.contains("subscribe"),
"finalize_put_at_originator call in driver passes subscribe \
arguments that reference the `subscribe` variable instead of \
hardcoded `false`. This would cause double-subscription — \
subscriptions must be handled exclusively by maybe_subscribe_child. \
See commit 494a3c69 for the original fix."
);
}
call_count += 1;
offset = abs_pos + call_marker.len();
}
assert!(
call_count >= 1,
"Expected at least 1 finalize_put_at_originator call in the driver, \
found {call_count}"
);
}
#[test]
fn max_advancements_boundary_exhausts_at_limit() {
let cap = MAX_PEER_ADVANCEMENTS_NON_STREAMING;
let mut retries: usize = 0;
for _ in 0..cap {
assert!(retries < cap, "should not exhaust before limit");
retries += 1;
}
assert!(retries >= cap, "should exhaust at cap={cap}");
}
#[test]
fn classify_reply_unexpected_for_non_put_message() {
let tx = dummy_tx();
let msg = NetMessage::V1(NetMessageV1::Aborted(tx));
assert!(matches!(classify_reply(&msg), ReplyClass::Unexpected));
}
#[test]
fn compute_put_attempt_timeout_matches_payload_streaming_decision() {
use crate::config::OPERATION_TTL;
let small_state = WrappedState::new(vec![0u8; 1024]);
let small_contract =
ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Arc::new(ContractCode::from(vec![0u8; 256])),
Parameters::from(vec![]),
)));
assert_eq!(
compute_put_attempt_timeout(64 * 1024, &small_state, &small_contract),
OPERATION_TTL,
"non-streaming-eligible payload must reuse OPERATION_TTL"
);
let website_state = WrappedState::new(vec![0u8; 2_460_242 - 256]);
let timeout = compute_put_attempt_timeout(64 * 1024, &website_state, &small_contract);
assert!(
timeout > std::time::Duration::from_secs(63),
"website-scale payload timeout {timeout:?} must exceed observed \
completion (~62 s); otherwise issue #4001 recurs"
);
assert!(
timeout > OPERATION_TTL,
"website-scale payload timeout {timeout:?} must exceed OPERATION_TTL"
);
}
#[test]
fn payload_size_estimate_within_throughput_floor_safety_margin() {
use crate::operations::put::PutStreamingPayload;
let state = WrappedState::new(vec![0xABu8; 1024 * 1024]);
let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Arc::new(ContractCode::from(vec![0x42u8; 200 * 1024])),
Parameters::from(vec![0x55u8; 4096]),
)));
let estimate = state.size() + contract.data().len();
let payload = PutStreamingPayload {
contract: contract.clone(),
related_contracts: RelatedContracts::default(),
value: state,
};
let actual = bincode::serialized_size(&payload).expect("serializable") as usize;
assert!(
actual <= estimate.saturating_mul(2),
"bincode payload size {actual} exceeds 2× estimate {estimate} — \
the 20 KiB/s throughput floor (half observed throughput) no \
longer absorbs the slack; either tighten the estimate (include \
parameters / related_contracts) or revisit \
STREAMING_THROUGHPUT_FLOOR_BPS in operations.rs"
);
}
#[test]
fn driver_outcome_exhausted_produces_client_error() {
let cause = "PUT to contract failed after 3 attempts".to_string();
let outcome: DriverOutcome =
match RetryLoopOutcome::<(ContractKey, Option<usize>)>::Exhausted(cause) {
RetryLoopOutcome::Exhausted(cause) => {
DriverOutcome::Publish(Err(ErrorKind::OperationError {
cause: cause.into(),
}
.into()))
}
RetryLoopOutcome::Done(_)
| RetryLoopOutcome::Unexpected
| RetryLoopOutcome::InfraError(_) => unreachable!(),
};
assert!(
matches!(outcome, DriverOutcome::Publish(Err(_))),
"Exhaustion must produce a client error, not be swallowed"
);
}
#[test]
fn classify_reply_request_is_local_completion() {
let tx = dummy_tx();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Request {
id: tx,
contract: ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Arc::new(ContractCode::from(vec![0u8])),
Parameters::from(vec![]),
))),
related_contracts: RelatedContracts::default(),
value: WrappedState::new(vec![1u8]),
htl: 5,
skip_list: HashSet::new(),
}));
assert!(matches!(
classify_reply(&msg),
ReplyClass::LocalCompletion { .. }
));
}
fn relay_section(src: &str) -> &str {
let start = src
.find("pub(crate) async fn start_relay_put<CB>(")
.expect("start_relay_put not found");
let end = src
.find("\n#[cfg(test)]")
.expect("test module marker not found");
&src[start..end]
}
fn relay_slice_a_section(src: &str) -> &str {
let start = src
.find("pub(crate) async fn start_relay_put<CB>(")
.expect("start_relay_put not found");
let end = src
.find("// ── Relay streaming PUT driver")
.expect("slice B marker not found");
&src[start..end]
}
#[test]
fn start_relay_put_checks_dedup_gate() {
let src = include_str!("op_ctx_task.rs");
let relay = relay_section(src);
let window_start = relay
.find("pub(crate) async fn start_relay_put<CB>(")
.expect("entry-point not found");
let spawn_pos = relay[window_start..]
.find("GlobalExecutor::spawn(run_relay_put(")
.expect("spawn site not found")
+ window_start;
let insert_pos = relay[window_start..]
.find("active_relay_put_txs.insert(incoming_tx)")
.expect("dedup insert site not found")
+ window_start;
assert!(
insert_pos < spawn_pos,
"active_relay_put_txs.insert MUST happen before GlobalExecutor::spawn"
);
}
#[test]
fn dedup_rejection_increments_counter() {
let src = include_str!("op_ctx_task.rs");
let relay = relay_section(src);
let after_insert = relay
.split("active_relay_put_txs.insert(incoming_tx)")
.nth(1)
.expect("dedup insert site not found");
let window = &after_insert[..500.min(after_insert.len())];
assert!(
window.contains("RELAY_PUT_DEDUP_REJECTS.fetch_add"),
"dedup gate must increment RELAY_PUT_DEDUP_REJECTS on rejection"
);
}
#[test]
fn raii_guard_clears_dedup_set_on_drop() {
let src = include_str!("op_ctx_task.rs");
let drop_start = src
.find("impl Drop for RelayPutInflightGuard")
.expect("RelayPutInflightGuard Drop impl not found");
let drop_body = &src[drop_start..drop_start + 600];
assert!(
drop_body.contains("active_relay_put_txs"),
"RelayPutInflightGuard::drop must remove from active_relay_put_txs"
);
assert!(
drop_body.contains("RELAY_PUT_INFLIGHT.fetch_sub"),
"RelayPutInflightGuard::drop must decrement RELAY_PUT_INFLIGHT"
);
assert!(
drop_body.contains("RELAY_PUT_COMPLETED_TOTAL.fetch_add"),
"RelayPutInflightGuard::drop must increment RELAY_PUT_COMPLETED_TOTAL"
);
}
#[test]
fn drive_relay_put_non_streaming_path_uses_send_to_and_await() {
let src = include_str!("op_ctx_task.rs");
let driver_start = src
.find("async fn drive_relay_put<CB>(")
.expect("drive_relay_put not found");
let driver_end = src[driver_start..]
.find("\nasync fn relay_put_finalize_local(")
.expect("driver body end not found")
+ driver_start;
let driver_src = &src[driver_start..driver_end];
assert!(
driver_src.contains("ctx.send_to_and_await("),
"drive_relay_put non-streaming relay path (not loopback) must \
forward downstream via send_to_and_await so the downstream \
Response bubbles back to the relay's waiter"
);
assert!(
driver_src.contains("originator_loopback"),
"drive_relay_put must distinguish the originator-loopback \
branch from the true relay branch"
);
}
#[test]
fn relay_put_send_response_uses_loopback_when_upstream_is_own_addr() {
let src = include_str!("op_ctx_task.rs");
let fn_start = src
.find("async fn relay_put_send_response(")
.expect("relay_put_send_response not found");
let fn_end = src[fn_start..]
.find("\n// ── Relay streaming PUT driver")
.expect("end-of-relay-fn marker not found")
+ fn_start;
let body = &src[fn_start..fn_end];
assert!(
body.contains("send_local_loopback("),
"relay_put_send_response must call send_local_loopback for the \
upstream==own_addr branch (originator-loopback PUT path)"
);
assert!(
body.contains("get_own_addr()"),
"relay_put_send_response must compare upstream_addr to \
connection_manager.get_own_addr() to detect the loopback case"
);
}
#[test]
fn run_relay_put_skips_release_in_originator_loopback() {
let src = include_str!("op_ctx_task.rs");
let fn_start = src
.find("async fn run_relay_put<CB>(")
.expect("run_relay_put not found");
let fn_end = src[fn_start..]
.find("\n#[allow(clippy::too_many_arguments)]\nasync fn drive_relay_put<CB>(")
.expect("end-of-run_relay_put marker not found")
+ fn_start;
let body = &src[fn_start..fn_end];
let release_pos = body
.find("release_pending_op_slot(incoming_tx)")
.expect("release_pending_op_slot call not found in run_relay_put");
let preceding = &body[..release_pos];
assert!(
preceding.rfind("get_own_addr()").is_some(),
"run_relay_put must call get_own_addr() before \
release_pending_op_slot to gate the release on the \
upstream != own_addr case"
);
assert!(
preceding.rfind("Some(upstream_addr) != own_addr").is_some()
|| preceding.rfind("upstream_addr) != own_addr").is_some()
|| preceding.rfind("!originator_loopback").is_some(),
"run_relay_put must guard release_pending_op_slot with an \
upstream != own_addr check (originator-loopback exception)"
);
}
#[test]
fn run_relay_put_publishes_error_on_loopback_failure() {
let src = include_str!("op_ctx_task.rs");
let fn_start = src
.find("async fn run_relay_put<CB>(")
.expect("run_relay_put not found");
let fn_end = src[fn_start..]
.find("\n#[allow(clippy::too_many_arguments)]\nasync fn drive_relay_put<CB>(")
.expect("end-of-run_relay_put marker not found")
+ fn_start;
let body = &src[fn_start..fn_end];
assert!(
body.contains("originator_loopback"),
"run_relay_put must compute originator_loopback to gate the \
error-publication path"
);
assert!(
body.contains("PutMsg::Error {"),
"run_relay_put must construct a PutMsg::Error envelope to \
deliver the loopback failure to the originator's driver"
);
assert!(
body.contains("send_local_loopback"),
"run_relay_put must dispatch the PutMsg::Error via \
send_local_loopback so the bypass forwards it to the \
originator's pending_op_results waiter"
);
assert!(
body.contains("dispatch_loopback_shutdown_fallback("),
"run_relay_put must invoke the OpManager-independent \
fallback helper so the M3 behavioural tests cover the \
shutdown path"
);
assert!(
!body.contains("op_manager.completed(incoming_tx)"),
"run_relay_put MUST NOT call op_manager.completed(incoming_tx) \
in loopback mode — that's the pre-#4111 race shape \
(see .claude/rules/operations.md → \"WHEN publishing a \
terminal operation reply\")"
);
}
#[test]
fn drive_relay_put_reuses_incoming_tx_on_forward() {
let src = include_str!("op_ctx_task.rs");
let driver_start = src
.find("async fn drive_relay_put<CB>(")
.expect("drive_relay_put not found");
let driver_end = src[driver_start..]
.find("\nasync fn relay_put_finalize_local(")
.expect("driver body end not found")
+ driver_start;
let driver_src = &src[driver_start..driver_end];
let forward_pos = driver_src
.find("PutMsg::Request {")
.expect("forward PutMsg::Request not found in driver");
let forward_window = &driver_src[forward_pos..forward_pos + 400];
assert!(
forward_window.contains("id: incoming_tx"),
"relay forward must reuse incoming_tx; minting a fresh tx per hop \
breaks the downstream `active_relay_put_txs` dedup gate"
);
assert!(
!forward_window.contains("Transaction::new::<PutMsg>()"),
"relay forward must NOT mint a fresh Transaction"
);
}
#[test]
fn relay_put_send_response_is_fire_and_forget() {
let src = include_str!("op_ctx_task.rs");
let fn_start = src
.find("async fn relay_put_send_response(")
.expect("relay_put_send_response not found");
let fn_end = src[fn_start..]
.find("\n}\n")
.expect("function body end not found")
+ fn_start;
let fn_src = &src[fn_start..fn_end];
assert!(
fn_src.contains("send_fire_and_forget"),
"relay_put_send_response must use send_fire_and_forget for the \
upstream response"
);
}
#[test]
fn slice_a_does_not_touch_put_streaming_variants() {
let src = include_str!("op_ctx_task.rs");
let relay = relay_slice_a_section(src);
let builds_streaming = relay.contains("NetMessage::from(PutMsg::ResponseStreaming")
|| relay.contains("PutMsg::ResponseStreaming {\n") && {
let pos = relay
.find("PutMsg::ResponseStreaming {")
.expect("anchor present by guard");
let window = &relay[pos..pos + 200.min(relay.len() - pos)];
window.contains("id: ")
};
assert!(
!builds_streaming,
"slice A driver must not CONSTRUCT a PutMsg::ResponseStreaming"
);
}
#[test]
fn drive_relay_put_stores_locally_before_forwarding() {
let src = include_str!("op_ctx_task.rs");
let driver_start = src
.find("async fn drive_relay_put<CB>(")
.expect("drive_relay_put not found");
let driver_end = src[driver_start..]
.find("\nasync fn relay_put_store_locally(")
.expect("driver body end not found")
+ driver_start;
let driver_src = &src[driver_start..driver_end];
let store_pos = driver_src
.find("relay_put_store_locally(")
.expect("relay_put_store_locally call missing in driver");
let forward_pos = driver_src
.find("ctx.send_to_and_await(")
.expect("send_to_and_await call missing in driver");
assert!(
store_pos < forward_pos,
"local store MUST run before the downstream forward"
);
let helper_start = src
.find("async fn relay_put_store_locally(")
.expect("helper not found");
let helper_end = src[helper_start..]
.find("\nasync fn relay_put_finalize_local(")
.expect("helper body end not found")
+ helper_start;
let helper_src = &src[helper_start..helper_end];
assert!(
helper_src.contains("super::put_contract("),
"helper MUST call put_contract"
);
assert!(
helper_src.contains("host_contract("),
"helper MUST call ring.host_contract for first-time hosting"
);
assert!(
helper_src.contains("announce_contract_hosted"),
"helper MUST call announce_contract_hosted for first-time hosting"
);
}
fn drive_relay_put_body(src: &str) -> &str {
let start = src
.find("async fn drive_relay_put<CB>(")
.expect("drive_relay_put not found");
let end = src[start..]
.find("\nasync fn relay_put_store_locally(")
.expect("driver body end not found")
+ start;
&src[start..end]
}
#[test]
fn drive_relay_put_upgrades_when_payload_exceeds_threshold() {
let src = include_str!("op_ctx_task.rs");
let body = drive_relay_put_body(src);
assert!(
body.contains("should_use_streaming("),
"drive_relay_put must call should_use_streaming on the merged payload"
);
assert!(
body.contains("PutMsg::RequestStreaming {"),
"drive_relay_put must build PutMsg::RequestStreaming on upgrade"
);
assert!(
body.contains("conn_manager\n .send_stream(")
|| body.contains("conn_manager.send_stream("),
"drive_relay_put must call NetworkBridge::send_stream after metadata send"
);
}
fn drive_relay_put_relay_branch(src: &str) -> &str {
let body = drive_relay_put_body(src);
let start = body
.find("let round_trip = if upgrade_to_streaming {")
.expect("relay-non-loopback round_trip branch not found");
let end = body[start..]
.find("};\n\n // Release the pending_op_results slot")
.expect("relay branch end-marker not found")
+ start;
&body[start..end]
}
#[test]
fn drive_relay_put_upgrade_installs_waiter_before_send_stream() {
let src = include_str!("op_ctx_task.rs");
let branch = drive_relay_put_relay_branch(src);
let waiter_pos = branch
.find("send_to_and_register_waiter(")
.expect("send_to_and_register_waiter call missing in relay upgrade branch");
let stream_pos = branch
.find(".send_stream(")
.expect("send_stream call missing in relay upgrade branch");
assert!(
waiter_pos < stream_pos,
"send_to_and_register_waiter MUST run before send_stream so the \
reply waiter is installed before the first fragment lands"
);
}
#[test]
fn drive_relay_put_upgrade_error_paths_release_slot() {
let src = include_str!("op_ctx_task.rs");
let branch = drive_relay_put_relay_branch(src);
let upgrade_start = branch
.find("if upgrade_to_streaming {")
.expect("upgrade_to_streaming branch not found in relay branch");
let upgrade_end = branch[upgrade_start..]
.find("} else {")
.expect("upgrade branch closing else not found")
+ upgrade_start;
let upgrade = &branch[upgrade_start..upgrade_end];
let mut search = 0;
let mut return_count = 0;
while let Some(pos) = upgrade[search..].find("return Err(") {
return_count += 1;
let abs = search + pos;
let window_start = abs.saturating_sub(400);
let window = &upgrade[window_start..abs];
assert!(
window.contains("release_pending_op_slot(incoming_tx)"),
"upgrade branch `return Err` at offset {abs} must be preceded by \
release_pending_op_slot — leaks pending_op_results otherwise"
);
search = abs + "return Err(".len();
}
assert!(
return_count >= 2,
"expected at least 2 error-return paths in relay upgrade branch \
(send_to_and_register_waiter fail, send_stream fail); found {return_count}"
);
}
fn relay_slice_b_section(src: &str) -> &str {
let start = src
.find("// ── Relay streaming PUT driver")
.expect("slice B marker not found");
let end = src
.find("\n// ── End of relay PUT driver ─")
.expect("end-of-driver marker not found");
&src[start..end]
}
#[test]
fn start_relay_put_streaming_checks_dedup_gate() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let insert_pos = b
.find("active_relay_put_txs.insert(incoming_tx)")
.expect("dedup insert not found");
let spawn_pos = b
.find("GlobalExecutor::spawn(run_relay_put_streaming(")
.expect("spawn site not found");
assert!(
insert_pos < spawn_pos,
"dedup-gate MUST run before spawning the streaming driver"
);
}
#[test]
fn start_relay_put_streaming_dedup_reject_is_silent() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let insert_pos = b
.find("active_relay_put_txs.insert(incoming_tx)")
.expect("dedup anchor not found");
let window = &b[insert_pos..];
let close_pos = window
.find("\n }\n")
.expect("dedup-branch close not found");
let branch = &window[..close_pos];
assert!(
branch.contains("RELAY_PUT_STREAMING_DEDUP_REJECTS.fetch_add"),
"dedup gate must increment RELAY_PUT_STREAMING_DEDUP_REJECTS on rejection"
);
assert!(
!branch.contains("send_fire_and_forget"),
"dedup-reject must NOT fire a fabricated Response (no NotFound variant in PutMsg)"
);
}
#[test]
fn relay_put_streaming_guard_drop_is_balanced() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let drop_start = b
.find("impl Drop for RelayPutStreamingInflightGuard")
.expect("guard Drop impl not found");
let drop_end = b[drop_start..]
.find("\n}\n")
.expect("guard Drop body end not found")
+ drop_start;
let drop_body = &b[drop_start..drop_end];
assert!(
drop_body.contains("RELAY_PUT_STREAMING_INFLIGHT.fetch_sub"),
"guard::drop must decrement RELAY_PUT_STREAMING_INFLIGHT"
);
assert!(
drop_body.contains("RELAY_PUT_STREAMING_COMPLETED_TOTAL.fetch_add"),
"guard::drop must increment RELAY_PUT_STREAMING_COMPLETED_TOTAL"
);
assert!(
drop_body.contains("active_relay_put_txs"),
"guard::drop must reference active_relay_put_txs"
);
assert!(
drop_body.contains(".remove(&self.incoming_tx)"),
"guard::drop must remove the tx from active_relay_put_txs"
);
}
#[test]
fn drive_relay_put_streaming_uses_claim_and_pipe() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let driver_start = b
.find("async fn drive_relay_put_streaming")
.expect("drive_relay_put_streaming not found");
let driver = &b[driver_start..];
assert!(
driver.contains("orphan_stream_registry()"),
"streaming driver must claim via orphan_stream_registry"
);
assert!(
driver.contains("claim_or_wait(upstream_addr"),
"claim MUST use upstream_addr + inbound stream_id"
);
assert!(
driver.contains(".pipe_stream("),
"streaming driver must call pipe_stream for forwarding"
);
assert!(
driver.contains("relay_put_store_locally("),
"streaming driver must reuse the shared local-store helper"
);
}
#[test]
fn drive_relay_put_streaming_omits_forwarding_ack() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let driver_start = b
.find("async fn drive_relay_put_streaming")
.expect("drive_relay_put_streaming not found");
let driver = &b[driver_start..];
assert!(
!driver.contains("NetMessage::from(PutMsg::ForwardingAck")
&& !driver.contains("PutMsg::ForwardingAck {"),
"slice B driver must not construct PutMsg::ForwardingAck"
);
}
#[test]
fn drive_relay_put_streaming_reuses_incoming_tx_on_forward() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let pipe_meta_pos = b
.find("PutMsg::RequestStreaming {")
.expect("outbound RequestStreaming construction not found");
let window = &b[pipe_meta_pos..pipe_meta_pos + 300.min(b.len() - pipe_meta_pos)];
assert!(
window.contains("id: incoming_tx"),
"piped metadata must reuse incoming_tx (not mint fresh)"
);
}
#[test]
fn drive_relay_put_streaming_already_claimed_is_silent() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let ac_pos = b
.find("OrphanStreamError::AlreadyClaimed")
.expect("AlreadyClaimed arm not found");
let arm = &b[ac_pos..ac_pos + 600.min(b.len() - ac_pos)];
assert!(
arm.contains("return Ok(())"),
"AlreadyClaimed arm must return Ok(()) — the still-in-flight \
driver owns the upstream reply; this duplicate must exit silently"
);
assert!(
!arm.contains("send_fire_and_forget"),
"AlreadyClaimed arm must NOT fabricate a Response upstream"
);
}
#[test]
fn drive_relay_put_streaming_claim_failure_is_silent() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let pos = b
.find("OrphanStreamClaimFailed")
.expect("OrphanStreamClaimFailed not found in slice B");
let window = &b[..pos];
let err_arm_start = window
.rfind("Err(err) =>")
.expect("orphan-claim Err arm not found");
let arm = &b[err_arm_start..pos + 100];
assert!(
!arm.contains("send_fire_and_forget"),
"orphan-claim failure must NOT fabricate a success Response upstream"
);
}
#[test]
fn drive_relay_put_streaming_timeout_bubbles_response() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let pos = b
.find("downstream reply timed out")
.expect("timeout arm not found");
let arm = &b[pos..pos + 1200.min(b.len() - pos)];
assert!(
arm.contains("relay_put_send_response"),
"timeout arm must still call relay_put_send_response so the \
upstream waiter is not left to its own OPERATION_TTL"
);
}
#[test]
fn drive_relay_put_streaming_store_failure_paths_do_not_fabricate() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let driver_start = b
.find("async fn drive_relay_put_streaming")
.expect("driver not found");
let driver = &b[driver_start..];
for phrase in [
"stream assembly failed",
"contract key mismatch",
"payload deserialize failed",
] {
let anchor = driver
.find(phrase)
.unwrap_or_else(|| panic!("failure-path anchor {phrase:?} not found"));
let tail = &driver[anchor..];
let ret_pos = tail
.find("return Err(")
.unwrap_or_else(|| panic!("no return Err after {phrase:?}"));
let pre_return = &tail[..ret_pos];
assert!(
!pre_return.contains("send_fire_and_forget"),
"failure path {phrase:?} must not fabricate a Response upstream"
);
}
}
#[test]
fn drive_relay_put_streaming_registers_waiter_before_pipe_stream() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let driver_start = b
.find("async fn drive_relay_put_streaming")
.expect("driver not found");
let driver = &b[driver_start..];
let register_pos = driver
.find("send_to_and_register_waiter(")
.expect("register_waiter call not found");
let pipe_pos = driver
.find(".pipe_stream(")
.expect("pipe_stream call not found");
assert!(
register_pos < pipe_pos,
"send_to_and_register_waiter MUST precede pipe_stream so the \
pending_op_results callback is installed before fragments \
land downstream (otherwise a fast reply races past the waiter)"
);
}
#[test]
fn drive_relay_put_records_route_events_on_transport_failure() {
let src = include_str!("op_ctx_task.rs");
let body_start = src
.find("async fn drive_relay_put<CB>(")
.expect("drive_relay_put fn must exist");
let body_end = src[body_start..]
.find("/// Store a relayed PUT")
.map(|i| body_start + i)
.unwrap_or(src.len());
let body = &src[body_start..body_end];
for log_phrase in ["send_to_and_await failed", "downstream timed out"] {
let pos = body
.find(log_phrase)
.unwrap_or_else(|| panic!("expected `{log_phrase}` in drive_relay_put"));
let after = &body[pos..pos + 1500.min(body.len() - pos)];
assert!(
after.contains("record_relay_route_event")
&& after.contains("RouteOutcome::Failure"),
"drive_relay_put arm for `{log_phrase}` must call \
record_relay_route_event with RouteOutcome::Failure. \
Without this, transport failures from relay-forwarded \
PUTs are dropped — the regression PR #4051 fixes."
);
}
let pos = body
.find("downstream returned Response; bubbling upstream")
.expect("Response success arm not found");
let after = &body[pos..pos + 1500.min(body.len() - pos)];
assert!(
after.contains("record_relay_route_event")
&& after.contains("RouteOutcome::SuccessUntimed"),
"drive_relay_put Response arm must record SuccessUntimed."
);
}
#[tokio::test]
async fn relay_put_send_error_with_ctx_uses_loopback_when_own_addr() {
use crate::node::{EventLoopNotificationsReceiver, event_loop_notification_channel};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let (receiver, sender) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut op_execution_receiver,
..
} = receiver;
let tx = dummy_tx();
let own = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9001);
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
let cause = "rejected: deterministic local failure".to_string();
let result = relay_put_send_error_with_ctx(
&mut ctx,
Some(own), tx,
cause.clone(),
own, )
.await;
assert!(result.is_ok(), "loopback dispatch must succeed");
let (_reply_sender, outbound, target_addr) = op_execution_receiver
.recv()
.await
.expect("loopback envelope should be delivered to executor");
assert_eq!(
target_addr, None,
"loopback MUST pass target_addr=None so the dispatcher \
routes through InboundMessage → PUT bypass"
);
match outbound {
NetMessage::V1(NetMessageV1::Put(PutMsg::Error { id, cause: c })) => {
assert_eq!(id, tx, "envelope tx MUST reuse incoming_tx");
assert_eq!(
c, cause,
"envelope cause MUST be passed through verbatim — \
bound_cause is applied by callers, not by the helper"
);
}
other => panic!("expected PutMsg::Error envelope, got {other:?}"),
}
}
#[tokio::test]
async fn relay_put_send_error_with_ctx_uses_fire_and_forget_to_upstream() {
use crate::node::{EventLoopNotificationsReceiver, event_loop_notification_channel};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let (receiver, sender) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut op_execution_receiver,
..
} = receiver;
let tx = dummy_tx();
let own = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9001);
let upstream = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 42)), 31337);
assert_ne!(own, upstream, "test invariant: addresses must differ");
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
let cause = "remote contract rejection: version not increasing".to_string();
let result =
relay_put_send_error_with_ctx(&mut ctx, Some(own), tx, cause.clone(), upstream).await;
assert!(result.is_ok(), "fire-and-forget dispatch must succeed");
let (_reply_sender, outbound, target_addr) = op_execution_receiver
.recv()
.await
.expect("upstream envelope should be delivered to executor");
assert_eq!(
target_addr,
Some(upstream),
"non-loopback MUST pass target_addr=Some(upstream_addr) so \
the dispatcher routes via OutboundMessageWithTarget to the \
upstream relay"
);
match outbound {
NetMessage::V1(NetMessageV1::Put(PutMsg::Error { id, cause: c })) => {
assert_eq!(id, tx);
assert_eq!(c, cause);
}
other => panic!("expected PutMsg::Error envelope, got {other:?}"),
}
}
#[tokio::test]
async fn relay_put_send_error_with_ctx_unknown_own_addr_uses_fire_and_forget() {
use crate::node::{EventLoopNotificationsReceiver, event_loop_notification_channel};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let (receiver, sender) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut op_execution_receiver,
..
} = receiver;
let tx = dummy_tx();
let upstream = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 42)), 31337);
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
let result = relay_put_send_error_with_ctx(
&mut ctx,
None, tx,
"x".into(),
upstream,
)
.await;
assert!(result.is_ok());
let (_reply_sender, _outbound, target_addr) = op_execution_receiver
.recv()
.await
.expect("envelope should still be delivered when own_addr is unknown");
assert_eq!(
target_addr,
Some(upstream),
"own_addr=None MUST NOT mask as loopback — dispatch over the wire"
);
}
#[tokio::test]
async fn relay_put_send_error_with_ctx_errors_on_closed_channel() {
use crate::node::event_loop_notification_channel;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let (receiver, sender) = event_loop_notification_channel();
drop(receiver);
let tx = dummy_tx();
let upstream = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 42)), 31337);
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
let result = relay_put_send_error_with_ctx(
&mut ctx,
Some(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
9001,
)),
tx,
"shutdown-time failure".into(),
upstream,
)
.await;
assert!(
matches!(result, Err(OpError::NotificationError)),
"closed executor channel MUST surface as NotificationError, \
got {result:?}"
);
}
#[test]
fn drive_relay_put_error_arm_rebounds_cause_before_bubble() {
let src = include_str!("op_ctx_task.rs");
let driver_start = src
.find("async fn drive_relay_put<CB>(")
.expect("drive_relay_put not found");
let driver_end = src[driver_start..]
.find("\nasync fn relay_put_finalize_local(")
.map(|p| driver_start + p)
.expect("end of drive_relay_put not found");
let driver_src = &src[driver_start..driver_end];
let arm_anchor = "PutMsg::Error {\n cause: downstream_cause,";
let arm_start = driver_src
.find(arm_anchor)
.expect("PutMsg::Error reply arm not found in drive_relay_put");
let arm_end = driver_src[arm_start..]
.find("other => {")
.map(|p| arm_start + p)
.expect("end-of-Error-arm marker not found");
let arm_body = &driver_src[arm_start..arm_end];
assert!(
arm_body.contains("bound_cause(downstream_cause)"),
"the downstream-Error arm MUST re-bound the incoming cause \
via bound_cause before passing it to relay_put_send_error \
— otherwise multi-hop forwarding amplifies attacker-controlled \
cause length per hop"
);
assert!(
arm_body.contains("relay_put_send_error("),
"the downstream-Error arm MUST forward via relay_put_send_error"
);
assert!(
arm_body.contains("upstream_addr"),
"the downstream-Error arm MUST target upstream_addr (the hop \
we received the original Request from), not an arbitrary peer"
);
}
#[test]
fn run_relay_put_bubbles_local_failure_in_non_loopback_mode() {
let src = include_str!("op_ctx_task.rs");
let fn_start = src
.find("async fn run_relay_put<CB>(")
.expect("run_relay_put not found");
let fn_end = src[fn_start..]
.find("\n#[allow(clippy::too_many_arguments)]\nasync fn drive_relay_put<CB>(")
.map(|p| fn_start + p)
.expect("end-of-run_relay_put marker not found");
let body = &src[fn_start..fn_end];
let else_anchor = "} else if let Err(err) = drive_result {";
let else_start = body.find(else_anchor).expect(
"non-loopback Err branch not found in run_relay_put — \
B1 fix regressed; intermediate relays will silently \
swallow local PUT failures",
);
let branch_body = &body[else_start..];
assert!(
branch_body.contains("bound_cause(err.to_string())"),
"non-loopback Err branch MUST apply bound_cause(err.to_string()) \
before bubbling — keeps the per-hop DoS amplification bound"
);
assert!(
branch_body.contains("relay_put_send_error("),
"non-loopback Err branch MUST call relay_put_send_error to \
emit PutMsg::Error to upstream_addr"
);
assert!(
branch_body.contains("upstream_addr"),
"non-loopback Err branch MUST target upstream_addr"
);
}
#[tokio::test]
async fn dispatch_loopback_shutdown_fallback_publishes_to_result_router() {
use tokio::sync::mpsc;
let (router_tx, mut router_rx) = mpsc::channel::<(Transaction, HostResult)>(8);
let tx = dummy_tx();
let cause = "contract rejection: version not increasing".to_string();
dispatch_loopback_shutdown_fallback(&router_tx, tx, cause.clone());
let (received_tx, host_result) = router_rx
.recv()
.await
.expect("fallback MUST publish to result_router_tx");
assert_eq!(received_tx, tx, "result_router tx must reuse incoming_tx");
let client_err =
host_result.expect_err("result MUST be Err — fallback is the failure path");
let rendered = format!("{client_err}");
assert!(
rendered.contains(&cause),
"result_router Err MUST carry the verbatim cause string \
so the WS client sees the real failure reason (got: {rendered:?})"
);
assert!(
router_rx.try_recv().is_err(),
"fallback MUST publish exactly one entry to result_router_tx"
);
}
#[tokio::test]
async fn dispatch_loopback_shutdown_fallback_does_not_emit_transaction_completed() {
use crate::node::{EventLoopNotificationsReceiver, event_loop_notification_channel};
use tokio::sync::mpsc;
let (router_tx, mut router_rx) = mpsc::channel::<(Transaction, HostResult)>(8);
let (receiver, _sender) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut notifications_receiver,
..
} = receiver;
let tx = dummy_tx();
dispatch_loopback_shutdown_fallback(
&router_tx,
tx,
"shutdown-time deterministic failure".into(),
);
let _drained = router_rx
.recv()
.await
.expect("fallback MUST publish to result_router_tx");
match notifications_receiver.try_recv() {
Ok(unexpected) => panic!(
"fallback MUST NOT emit anything on the notifications \
channel; pre-#4111 shape would have sent TransactionCompleted, \
re-introducing the M1/M2 race. Got: {unexpected:?}"
),
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
panic!("notifications channel closed before the assertion ran")
}
}
}
#[tokio::test]
async fn dispatch_loopback_shutdown_fallback_does_not_panic_on_closed_router() {
use tokio::sync::mpsc;
let (router_tx, router_rx) = mpsc::channel::<(Transaction, HostResult)>(8);
drop(router_rx);
let tx = dummy_tx();
dispatch_loopback_shutdown_fallback(&router_tx, tx, "x".into());
}
#[tokio::test]
async fn dispatch_loopback_shutdown_fallback_does_not_block_on_full_router() {
use tokio::sync::mpsc;
let (router_tx, _router_rx) = mpsc::channel::<(Transaction, HostResult)>(1);
let filler_tx = dummy_tx();
let filler_err: freenet_stdlib::client_api::ClientError = ErrorKind::OperationError {
cause: "filler".into(),
}
.into();
router_tx
.try_send((filler_tx, Err(filler_err)))
.expect("first send fills the capacity-1 channel");
let tx = dummy_tx();
let result = tokio::time::timeout(std::time::Duration::from_millis(100), async {
dispatch_loopback_shutdown_fallback(&router_tx, tx, "shutdown".into());
})
.await;
assert!(
result.is_ok(),
"fallback MUST NOT block on a full result_router_tx — \
channel-safety rule: never `send().await` from inside a \
driver body. The helper must use `try_send` and drop+log \
on Full instead."
);
}
}