use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use either::Either;
use freenet_stdlib::client_api::{ContractResponse, ErrorKind, HostResponse};
use freenet_stdlib::prelude::*;
use crate::client_events::HostResult;
use crate::config::{GlobalExecutor, OPERATION_TTL};
use crate::message::{NetMessage, NetMessageV1, Transaction};
use crate::node::OpManager;
use crate::operations::NetworkBridge;
use crate::operations::OpError;
use crate::operations::op_ctx::{
AdvanceOutcome, AttemptOutcome, RetryDriver, RetryLoopOutcome, drive_retry_loop,
};
use crate::operations::orphan_streams::{OrphanStreamError, STREAM_CLAIM_TIMEOUT};
use crate::ring::{Location, PeerKeyLocation};
use crate::router::{RouteEvent, RouteOutcome};
use crate::tracing::{NetEventLog, OperationFailure, state_hash_full};
use crate::transport::peer_connection::StreamId;
use super::{PutFinalizationData, PutMsg, PutStreamingPayload};
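/// Entry point for a client-initiated PUT: spawns a detached task that drives the
/// operation to completion and returns the client transaction immediately.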
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_client_put(
op_manager: Arc<OpManager>,
client_tx: Transaction,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
subscribe: bool,
blocking_subscribe: bool,
) -> Result<Transaction, OpError> {
tracing::debug!(
tx = %client_tx,
contract = %contract.key(),
"put (task-per-tx): spawning client-initiated task"
);
GlobalExecutor::spawn(run_client_put(
op_manager,
client_tx,
contract,
related,
value,
htl,
subscribe,
blocking_subscribe,
));
Ok(client_tx)
}
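/// Task body for a client-initiated PUT: drives the operation, then delivers the
/// final outcome (or a synthesized error) back to the client.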
#[allow(clippy::too_many_arguments)]
async fn run_client_put(
op_manager: Arc<OpManager>,
client_tx: Transaction,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
subscribe: bool,
blocking_subscribe: bool,
) {
let outcome = drive_client_put(
op_manager.clone(),
client_tx,
contract,
related,
value,
htl,
subscribe,
blocking_subscribe,
)
.await;
deliver_outcome(&op_manager, client_tx, outcome);
}
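/// Final result of the client PUT driver: either a host result to publish to the
/// client, or an infrastructure error that still has to be surfaced as one.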
#[derive(Debug)]
enum DriverOutcome {
Publish(HostResult),
InfrastructureError(OpError),
}
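/// Thin wrapper around `drive_client_put_inner` that converts infrastructure
/// errors into `DriverOutcome::InfrastructureError`.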
#[allow(clippy::too_many_arguments)]
async fn drive_client_put(
op_manager: Arc<OpManager>,
client_tx: Transaction,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
subscribe: bool,
blocking_subscribe: bool,
) -> DriverOutcome {
match drive_client_put_inner(
&op_manager,
client_tx,
contract,
related,
value,
htl,
subscribe,
blocking_subscribe,
)
.await
{
Ok(outcome) => outcome,
Err(err) => DriverOutcome::InfrastructureError(err),
}
}
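/// Core client PUT driver: picks the closest potentially-hosting peer (skipping
/// our own address), runs the retry loop, and on success records routing
/// telemetry, finalizes the contract at the originator, and optionally issues a
/// child subscribe.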
#[allow(clippy::too_many_arguments)]
async fn drive_client_put_inner(
op_manager: &Arc<OpManager>,
client_tx: Transaction,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
subscribe: bool,
blocking_subscribe: bool,
) -> Result<DriverOutcome, OpError> {
let key = contract.key();
let mut tried: Vec<std::net::SocketAddr> = Vec::new();
if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
tried.push(own_addr);
}
let initial_target = op_manager
.ring
.closest_potentially_hosting(&key, tried.as_slice());
let current_target = match initial_target {
Some(peer) => {
if let Some(addr) = peer.socket_addr() {
tried.push(addr);
}
peer
}
None => op_manager.ring.connection_manager.own_location(),
};
struct PutRetryDriver<'a> {
op_manager: &'a OpManager,
key: ContractKey,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
tried: Vec<std::net::SocketAddr>,
retries: usize,
current_target: PeerKeyLocation,
attempt_timeout: std::time::Duration,
}
impl RetryDriver for PutRetryDriver<'_> {
type Terminal = ContractKey;
fn new_attempt_tx(&mut self) -> Transaction {
Transaction::new::<PutMsg>()
}
fn build_request(&mut self, attempt_tx: Transaction) -> NetMessage {
NetMessage::from(PutMsg::Request {
id: attempt_tx,
contract: self.contract.clone(),
related_contracts: self.related.clone(),
value: self.value.clone(),
htl: self.htl,
skip_list: self
.op_manager
.ring
.connection_manager
.get_own_addr()
.into_iter()
.collect::<HashSet<_>>(),
})
}
fn classify(&mut self, reply: NetMessage) -> AttemptOutcome<ContractKey> {
match classify_reply(&reply) {
ReplyClass::Stored { key } | ReplyClass::LocalCompletion { key } => {
AttemptOutcome::Terminal(key)
}
ReplyClass::Unexpected => AttemptOutcome::Unexpected,
}
}
fn advance(&mut self) -> AdvanceOutcome {
match advance_to_next_peer(
self.op_manager,
&self.key,
&mut self.tried,
&mut self.retries,
) {
Some((next_target, _next_addr)) => {
self.current_target = next_target;
AdvanceOutcome::Next
}
None => AdvanceOutcome::Exhausted,
}
}
fn attempt_timeout(&self) -> std::time::Duration {
self.attempt_timeout
}
}
let attempt_timeout =
compute_put_attempt_timeout(op_manager.streaming_threshold, &value, &contract);
let mut driver = PutRetryDriver {
op_manager,
key,
contract,
related,
value,
htl,
tried,
retries: 0,
current_target,
attempt_timeout,
};
let loop_result = drive_retry_loop(op_manager, client_tx, "put", &mut driver).await;
match loop_result {
RetryLoopOutcome::Done(reply_key) => {
op_manager.completed(client_tx);
let contract_location = Location::from(&reply_key);
let route_event = RouteEvent {
peer: driver.current_target.clone(),
contract_location,
outcome: RouteOutcome::SuccessUntimed,
op_type: Some(crate::node::network_status::OpType::Put),
};
if let Some(log_event) =
crate::tracing::NetEventLog::route_event(&client_tx, &op_manager.ring, &route_event)
{
op_manager
.ring
.register_events(either::Either::Left(log_event))
.await;
}
op_manager.ring.routing_finished(route_event);
crate::node::network_status::record_op_result(
crate::node::network_status::OpType::Put,
true,
);
super::finalize_put_at_originator(
op_manager,
client_tx,
reply_key,
PutFinalizationData {
sender: driver.current_target,
hop_count: None,
state_hash: None,
state_size: None,
},
false,
false,
)
.await;
maybe_subscribe_child(
op_manager,
client_tx,
reply_key,
subscribe,
blocking_subscribe,
)
.await;
Ok(DriverOutcome::Publish(Ok(HostResponse::ContractResponse(
ContractResponse::PutResponse { key: reply_key },
))))
}
RetryLoopOutcome::Exhausted(cause) => {
Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
cause: cause.into(),
}
.into())))
}
RetryLoopOutcome::Unexpected => Err(OpError::UnexpectedOpState),
RetryLoopOutcome::InfraError(err) => Err(err),
}
}
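/// Classification of a network reply received while awaiting a PUT attempt.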
#[derive(Debug)]
enum ReplyClass {
Stored {
key: ContractKey,
},
LocalCompletion {
key: ContractKey,
},
Unexpected,
}
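/// Maps a raw network reply onto a `ReplyClass`: `Response`/`ResponseStreaming`
/// mean the contract was stored, a `Request` is treated as local completion, and
/// anything else is unexpected.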
fn classify_reply(msg: &NetMessage) -> ReplyClass {
match msg {
NetMessage::V1(NetMessageV1::Put(
PutMsg::Response { key, .. } | PutMsg::ResponseStreaming { key, .. },
)) => ReplyClass::Stored { key: *key },
NetMessage::V1(NetMessageV1::Put(PutMsg::Request {
id: _, contract, ..
})) => ReplyClass::LocalCompletion {
key: contract.key(),
},
_ => ReplyClass::Unexpected,
}
}
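/// Estimates the serialized payload size (state + contract code + parameters) and
/// derives the per-attempt timeout from it, so streaming-sized payloads get a
/// correspondingly longer timeout.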
fn compute_put_attempt_timeout(
streaming_threshold: usize,
value: &WrappedState,
contract: &ContractContainer,
) -> std::time::Duration {
let payload_size_estimate = value
.size()
.saturating_add(contract.data().len())
.saturating_add(contract.params().size());
crate::operations::streaming_aware_attempt_timeout(streaming_threshold, payload_size_estimate)
}
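/// Maximum number of alternative peers to try before the retry loop gives up.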
const MAX_RETRIES: usize = 3;
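/// Picks the next candidate peer that might host the contract, skipping peers
/// already tried. Returns `None` once `MAX_RETRIES` is reached or no further
/// candidate (with a known address) exists.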
fn advance_to_next_peer(
op_manager: &OpManager,
key: &ContractKey,
tried: &mut Vec<std::net::SocketAddr>,
retries: &mut usize,
) -> Option<(PeerKeyLocation, std::net::SocketAddr)> {
if *retries >= MAX_RETRIES {
return None;
}
*retries += 1;
let peer = op_manager
.ring
.closest_potentially_hosting(key, tried.as_slice())?;
let addr = peer.socket_addr()?;
tried.push(addr);
Some((peer, addr))
}
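/// After a successful PUT, optionally starts a child subscribe operation: awaited
/// inline when `blocking_subscribe` is set, otherwise spawned as a detached task.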
async fn maybe_subscribe_child(
op_manager: &Arc<OpManager>,
client_tx: Transaction,
key: ContractKey,
subscribe: bool,
blocking_subscribe: bool,
) {
if !subscribe {
return;
}
use crate::operations::subscribe;
let child_tx = Transaction::new_child_of::<subscribe::SubscribeMsg>(&client_tx);
if blocking_subscribe {
subscribe::run_client_subscribe(op_manager.clone(), *key.id(), child_tx).await;
} else {
GlobalExecutor::spawn(subscribe::run_client_subscribe(
op_manager.clone(),
*key.id(),
child_tx,
));
}
}
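/// Publishes the driver outcome to the client, turning infrastructure errors into
/// a synthesized `OperationError` response.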
fn deliver_outcome(op_manager: &OpManager, client_tx: Transaction, outcome: DriverOutcome) {
match outcome {
DriverOutcome::Publish(result) => {
op_manager.send_client_result(client_tx, result);
}
DriverOutcome::InfrastructureError(err) => {
tracing::warn!(
tx = %client_tx,
error = %err,
"put (task-per-tx): infrastructure error; publishing synthesized client error"
);
let synthesized: HostResult = Err(ErrorKind::OperationError {
cause: format!("PUT failed: {err}").into(),
}
.into());
op_manager.send_client_result(client_tx, synthesized);
}
}
}
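// Instrumentation counters for the non-streaming relay PUT driver.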
#[cfg(any(test, feature = "testing"))]
pub static RELAY_PUT_DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_INFLIGHT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_SPAWNED_TOTAL: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_COMPLETED_TOTAL: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_DEDUP_REJECTS: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
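/// Entry point for a relayed (non-streaming) PUT `Request`: dedups on the incoming
/// transaction, then spawns a detached driver task guarded by `RelayPutInflightGuard`.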
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_relay_put(
op_manager: Arc<OpManager>,
incoming_tx: Transaction,
contract: ContractContainer,
related_contracts: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
skip_list: HashSet<SocketAddr>,
upstream_addr: SocketAddr,
) -> Result<(), OpError> {
#[cfg(any(test, feature = "testing"))]
RELAY_PUT_DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if !op_manager.active_relay_put_txs.insert(incoming_tx) {
RELAY_PUT_DEDUP_REJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tracing::debug!(
tx = %incoming_tx,
contract = %contract.key(),
%upstream_addr,
phase = "relay_put_dedup_reject",
"PUT relay (task-per-tx): duplicate Request for in-flight tx, dropping"
);
return Ok(());
}
tracing::debug!(
tx = %incoming_tx,
contract = %contract.key(),
htl,
%upstream_addr,
phase = "relay_put_start",
"PUT relay (task-per-tx): spawning driver"
);
RELAY_PUT_INFLIGHT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
RELAY_PUT_SPAWNED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let guard = RelayPutInflightGuard {
op_manager: op_manager.clone(),
incoming_tx,
};
GlobalExecutor::spawn(run_relay_put(
guard,
op_manager,
incoming_tx,
contract,
related_contracts,
value,
htl,
skip_list,
upstream_addr,
));
Ok(())
}
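/// RAII guard that clears the dedup entry and balances the in-flight/completed
/// counters when the relay driver task finishes, however it exits.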
struct RelayPutInflightGuard {
op_manager: Arc<OpManager>,
incoming_tx: Transaction,
}
impl Drop for RelayPutInflightGuard {
fn drop(&mut self) {
self.op_manager
.active_relay_put_txs
.remove(&self.incoming_tx);
RELAY_PUT_INFLIGHT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
RELAY_PUT_COMPLETED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
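/// Task body for a relayed PUT: runs the driver, logs any error, and releases the
/// pending-op slot once finished.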
#[allow(clippy::too_many_arguments)]
async fn run_relay_put(
guard: RelayPutInflightGuard,
op_manager: Arc<OpManager>,
incoming_tx: Transaction,
contract: ContractContainer,
related_contracts: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
skip_list: HashSet<SocketAddr>,
upstream_addr: SocketAddr,
) {
let _guard = guard;
if let Err(err) = drive_relay_put(
&op_manager,
incoming_tx,
contract,
related_contracts,
value,
htl,
skip_list,
upstream_addr,
)
.await
{
tracing::warn!(
tx = %incoming_tx,
error = %err,
phase = "relay_put_error",
"PUT relay (task-per-tx): driver returned error"
);
}
tokio::task::yield_now().await;
op_manager.release_pending_op_slot(incoming_tx).await;
}
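/// Relayed PUT driver: stores the contract locally, forwards the `Request` to the
/// next hop (if any HTL remains), awaits the downstream reply, and bubbles a
/// `Response` upstream; with no next hop it finalizes locally instead.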
#[allow(clippy::too_many_arguments)]
async fn drive_relay_put(
op_manager: &Arc<OpManager>,
incoming_tx: Transaction,
contract: ContractContainer,
related_contracts: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
skip_list: HashSet<SocketAddr>,
upstream_addr: SocketAddr,
) -> Result<(), OpError> {
let key = contract.key();
tracing::info!(
tx = %incoming_tx,
contract = %key,
htl,
%upstream_addr,
phase = "relay_put_request",
"PUT relay (task-per-tx): processing Request"
);
let merged_value = relay_put_store_locally(
op_manager,
incoming_tx,
key,
value.clone(),
&contract,
related_contracts.clone(),
htl,
)
.await?;
let mut new_skip_list = skip_list;
new_skip_list.insert(upstream_addr);
if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
new_skip_list.insert(own_addr);
}
let next_hop = if htl > 0 {
op_manager
.ring
.closest_potentially_hosting(&key, &new_skip_list)
} else {
None
};
let (next_peer, next_addr) = match next_hop {
Some(peer) => {
let addr = match peer.socket_addr() {
Some(a) => a,
None => {
tracing::error!(
tx = %incoming_tx,
contract = %key,
target_pub_key = %peer.pub_key(),
"PUT relay (task-per-tx): next hop has no socket address"
);
return relay_put_finalize_local(
op_manager,
incoming_tx,
key,
merged_value,
upstream_addr,
)
.await;
}
};
(peer, addr)
}
None => {
tracing::info!(
tx = %incoming_tx,
contract = %key,
phase = "relay_put_complete",
"PUT relay (task-per-tx): no next hop, finalizing at this node"
);
return relay_put_finalize_local(
op_manager,
incoming_tx,
key,
merged_value,
upstream_addr,
)
.await;
}
};
let new_htl = htl.saturating_sub(1);
if let Some(event) = NetEventLog::put_request(
&incoming_tx,
&op_manager.ring,
key,
next_peer.clone(),
new_htl,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
tracing::debug!(
tx = %incoming_tx,
contract = %key,
peer_addr = %next_addr,
htl = new_htl,
phase = "relay_put_forward",
"PUT relay (task-per-tx): forwarding to next hop"
);
let forward = NetMessage::from(PutMsg::Request {
id: incoming_tx,
contract,
related_contracts,
value: merged_value,
htl: new_htl,
skip_list: new_skip_list,
});
let mut ctx = op_manager.op_ctx(incoming_tx);
let round_trip =
tokio::time::timeout(OPERATION_TTL, ctx.send_to_and_await(next_addr, forward)).await;
op_manager.release_pending_op_slot(incoming_tx).await;
let reply = match round_trip {
Ok(Ok(reply)) => reply,
Ok(Err(err)) => {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
target = %next_addr,
error = %err,
"PUT relay (task-per-tx): send_to_and_await failed"
);
crate::operations::record_relay_route_event(
op_manager,
next_peer.clone(),
crate::ring::Location::from(&key),
crate::router::RouteOutcome::Failure,
crate::node::network_status::OpType::Put,
);
return Err(err);
}
Err(_elapsed) => {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
target = %next_addr,
timeout_secs = OPERATION_TTL.as_secs(),
"PUT relay (task-per-tx): downstream timed out"
);
crate::operations::record_relay_route_event(
op_manager,
next_peer.clone(),
crate::ring::Location::from(&key),
crate::router::RouteOutcome::Failure,
crate::node::network_status::OpType::Put,
);
return Err(OpError::UnexpectedOpState);
}
};
match reply {
NetMessage::V1(NetMessageV1::Put(PutMsg::Response { key: reply_key, .. })) => {
tracing::info!(
tx = %incoming_tx,
contract = %reply_key,
phase = "relay_put_bubble",
"PUT relay (task-per-tx): downstream returned Response; bubbling upstream"
);
crate::operations::record_relay_route_event(
op_manager,
next_peer.clone(),
crate::ring::Location::from(&reply_key),
crate::router::RouteOutcome::SuccessUntimed,
crate::node::network_status::OpType::Put,
);
relay_put_send_response(op_manager, incoming_tx, reply_key, upstream_addr).await
}
NetMessage::V1(NetMessageV1::Put(PutMsg::ResponseStreaming { key: reply_key, .. })) => {
tracing::warn!(
tx = %incoming_tx,
contract = %reply_key,
phase = "relay_put_bubble_streaming_downgrade",
"PUT relay (task-per-tx): downstream returned ResponseStreaming — \
synthesizing non-streaming Response upstream (slice A limitation)"
);
crate::operations::record_relay_route_event(
op_manager,
next_peer.clone(),
crate::ring::Location::from(&reply_key),
crate::router::RouteOutcome::SuccessUntimed,
crate::node::network_status::OpType::Put,
);
relay_put_send_response(op_manager, incoming_tx, reply_key, upstream_addr).await
}
other => {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
reply_variant = ?std::mem::discriminant(&other),
"PUT relay (task-per-tx): unexpected reply variant; treating as failure"
);
Err(OpError::UnexpectedOpState)
}
}
}
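/// Store a relayed PUT locally: merges the state via `put_contract`, registers
/// first-time hosting (eviction, interest tracking, announcement), and returns the
/// merged state.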
async fn relay_put_store_locally(
op_manager: &Arc<OpManager>,
incoming_tx: Transaction,
key: ContractKey,
value: WrappedState,
contract: &ContractContainer,
related_contracts: RelatedContracts<'static>,
htl: usize,
) -> Result<WrappedState, OpError> {
let was_hosting = op_manager.ring.is_hosting_contract(&key);
let (merged_value, _state_changed) = match super::put_contract(
op_manager,
key,
value.clone(),
related_contracts,
contract,
)
.await
{
Ok(result) => result,
Err(err) => {
tracing::error!(
tx = %incoming_tx,
contract = %key,
error = %err,
htl,
"PUT relay (task-per-tx): put_contract failed"
);
if let Some(event) = NetEventLog::put_failure(
&incoming_tx,
&op_manager.ring,
key,
OperationFailure::ContractError(err.to_string()),
Some(op_manager.ring.max_hops_to_live.saturating_sub(htl)),
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
return Err(err);
}
};
if !was_hosting {
let evicted = op_manager
.ring
.host_contract(key, value.size() as u64, crate::ring::AccessType::Put)
.evicted;
crate::operations::announce_contract_hosted(op_manager, &key).await;
let mut removed_contracts = Vec::new();
for evicted_key in evicted {
if op_manager
.interest_manager
.unregister_local_hosting(&evicted_key)
{
removed_contracts.push(evicted_key);
}
}
let became_interested = op_manager.interest_manager.register_local_hosting(&key);
let added = if became_interested { vec![key] } else { vec![] };
if !added.is_empty() || !removed_contracts.is_empty() {
crate::operations::broadcast_change_interests(op_manager, added, removed_contracts)
.await;
}
}
debug_assert!(
op_manager.ring.is_hosting_contract(&key),
"PUT relay (task-per-tx): contract {key} must be in hosting list after put_contract + host_contract"
);
Ok(merged_value)
}
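/// Finalize a relayed PUT at this node: records a put-success event and sends the
/// `Response` upstream.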
async fn relay_put_finalize_local(
op_manager: &OpManager,
incoming_tx: Transaction,
key: ContractKey,
merged_value: WrappedState,
upstream_addr: SocketAddr,
) -> Result<(), OpError> {
let own_location = op_manager.ring.connection_manager.own_location();
let hash = Some(state_hash_full(&merged_value));
let size = Some(merged_value.len());
if let Some(event) = NetEventLog::put_success(
&incoming_tx,
&op_manager.ring,
key,
own_location,
None,
hash,
size,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
relay_put_send_response(op_manager, incoming_tx, key, upstream_addr).await
}
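/// Sends the terminal `PutMsg::Response` upstream as fire-and-forget.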
async fn relay_put_send_response(
op_manager: &OpManager,
incoming_tx: Transaction,
key: ContractKey,
upstream_addr: SocketAddr,
) -> Result<(), OpError> {
let msg = NetMessage::from(PutMsg::Response {
id: incoming_tx,
key,
});
let mut ctx = op_manager.op_ctx(incoming_tx);
ctx.send_fire_and_forget(upstream_addr, msg)
.await
.map_err(|_| OpError::NotificationError)
}
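// ── Relay streaming PUT driver (slice B) ─────────────────────────────────────
// Instrumentation counters for the streaming relay PUT driver.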
#[cfg(any(test, feature = "testing"))]
pub static RELAY_PUT_STREAMING_DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_STREAMING_INFLIGHT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_STREAMING_SPAWNED_TOTAL: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_STREAMING_COMPLETED_TOTAL: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
pub static RELAY_PUT_STREAMING_DEDUP_REJECTS: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
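/// Entry point for a relayed streaming PUT (`RequestStreaming`): dedups on the
/// incoming transaction, then spawns a detached driver task guarded by
/// `RelayPutStreamingInflightGuard`.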
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_relay_put_streaming<CB>(
op_manager: Arc<OpManager>,
conn_manager: CB,
incoming_tx: Transaction,
stream_id: StreamId,
contract_key: ContractKey,
total_size: u64,
htl: usize,
skip_list: HashSet<SocketAddr>,
subscribe: bool,
upstream_addr: SocketAddr,
) -> Result<(), OpError>
where
CB: NetworkBridge + Clone + Send + 'static,
{
#[cfg(any(test, feature = "testing"))]
RELAY_PUT_STREAMING_DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if !op_manager.active_relay_put_txs.insert(incoming_tx) {
RELAY_PUT_STREAMING_DEDUP_REJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tracing::debug!(
tx = %incoming_tx,
contract = %contract_key,
%upstream_addr,
phase = "relay_put_streaming_dedup_reject",
"PUT streaming relay (task-per-tx): duplicate Request for in-flight tx, dropping"
);
return Ok(());
}
tracing::debug!(
tx = %incoming_tx,
contract = %contract_key,
%stream_id,
total_size,
htl,
%upstream_addr,
phase = "relay_put_streaming_start",
"PUT streaming relay (task-per-tx): spawning driver"
);
RELAY_PUT_STREAMING_INFLIGHT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
RELAY_PUT_STREAMING_SPAWNED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let guard = RelayPutStreamingInflightGuard {
op_manager: op_manager.clone(),
incoming_tx,
};
GlobalExecutor::spawn(run_relay_put_streaming(
guard,
op_manager,
conn_manager,
incoming_tx,
stream_id,
contract_key,
total_size,
htl,
skip_list,
subscribe,
upstream_addr,
));
Ok(())
}
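/// RAII guard mirroring `RelayPutInflightGuard` for the streaming relay driver.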
struct RelayPutStreamingInflightGuard {
op_manager: Arc<OpManager>,
incoming_tx: Transaction,
}
impl Drop for RelayPutStreamingInflightGuard {
fn drop(&mut self) {
self.op_manager
.active_relay_put_txs
.remove(&self.incoming_tx);
RELAY_PUT_STREAMING_INFLIGHT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
RELAY_PUT_STREAMING_COMPLETED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
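/// Task body for a relayed streaming PUT: runs the driver, logs any error, and
/// releases the pending-op slot once finished.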
#[allow(clippy::too_many_arguments)]
async fn run_relay_put_streaming<CB>(
guard: RelayPutStreamingInflightGuard,
op_manager: Arc<OpManager>,
conn_manager: CB,
incoming_tx: Transaction,
stream_id: StreamId,
contract_key: ContractKey,
total_size: u64,
htl: usize,
skip_list: HashSet<SocketAddr>,
subscribe: bool,
upstream_addr: SocketAddr,
) where
CB: NetworkBridge + Clone + Send + 'static,
{
let _guard = guard;
if let Err(err) = drive_relay_put_streaming(
&op_manager,
&conn_manager,
incoming_tx,
stream_id,
contract_key,
total_size,
htl,
skip_list,
subscribe,
upstream_addr,
)
.await
{
tracing::warn!(
tx = %incoming_tx,
error = %err,
phase = "relay_put_streaming_error",
"PUT streaming relay (task-per-tx): driver returned error"
);
}
tokio::task::yield_now().await;
op_manager.release_pending_op_slot(incoming_tx).await;
}
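/// Relayed streaming PUT driver: claims the inbound orphan stream, optionally
/// pipes it to the next hop (installing the reply waiter before piping), assembles
/// and stores the payload locally, then either bubbles the downstream reply
/// upstream or finalizes locally.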
#[allow(clippy::too_many_arguments)]
async fn drive_relay_put_streaming<CB>(
op_manager: &Arc<OpManager>,
conn_manager: &CB,
incoming_tx: Transaction,
stream_id: StreamId,
contract_key: ContractKey,
total_size: u64,
htl: usize,
skip_list: HashSet<SocketAddr>,
subscribe: bool,
upstream_addr: SocketAddr,
) -> Result<(), OpError>
where
CB: NetworkBridge + Clone + Send + 'static,
{
tracing::info!(
tx = %incoming_tx,
contract = %contract_key,
%stream_id,
total_size,
htl,
%upstream_addr,
phase = "relay_put_streaming_request",
"PUT streaming relay (task-per-tx): processing RequestStreaming"
);
let stream_handle = match op_manager
.orphan_stream_registry()
.claim_or_wait(upstream_addr, stream_id, STREAM_CLAIM_TIMEOUT)
.await
{
Ok(handle) => handle,
Err(OrphanStreamError::AlreadyClaimed) => {
tracing::debug!(
tx = %incoming_tx,
%stream_id,
"PUT streaming relay (task-per-tx): stream already claimed, skipping"
);
return Ok(());
}
Err(err) => {
tracing::error!(
tx = %incoming_tx,
%stream_id,
error = %err,
"PUT streaming relay (task-per-tx): orphan stream claim failed"
);
return Err(OpError::OrphanStreamClaimFailed);
}
};
let mut new_skip_list = skip_list;
new_skip_list.insert(upstream_addr);
if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
new_skip_list.insert(own_addr);
}
let next_hop = if htl > 0 {
op_manager
.ring
.closest_potentially_hosting(&contract_key, &new_skip_list)
} else {
None
};
let next_hop_addr = next_hop.as_ref().and_then(|p| p.socket_addr());
let piping = if let Some(next_addr) = next_hop_addr {
if crate::operations::should_use_streaming(
op_manager.streaming_threshold,
total_size as usize,
) {
let outbound_sid = StreamId::next_operations();
let forked_handle = stream_handle.fork();
let new_htl = htl.saturating_sub(1);
let pipe_metadata = PutMsg::RequestStreaming {
id: incoming_tx,
stream_id: outbound_sid,
contract_key,
total_size,
htl: new_htl,
skip_list: new_skip_list.clone(),
subscribe,
};
let pipe_metadata_net: NetMessage = pipe_metadata.into();
let embedded_metadata = match bincode::serialize(&pipe_metadata_net) {
Ok(bytes) => Some(bytes::Bytes::from(bytes)),
Err(e) => {
tracing::warn!(
tx = %incoming_tx,
error = %e,
"Failed to serialize piped stream metadata for embedding"
);
None
}
};
tracing::info!(
tx = %incoming_tx,
inbound_stream_id = %stream_id,
outbound_stream_id = %outbound_sid,
total_size,
peer_addr = %next_addr,
"Starting piped stream forwarding to next hop"
);
if let Some(ref peer) = next_hop {
if let Some(event) = NetEventLog::put_request(
&incoming_tx,
&op_manager.ring,
contract_key,
peer.clone(),
new_htl,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
}
Some((
next_addr,
pipe_metadata_net,
outbound_sid,
forked_handle,
embedded_metadata,
))
} else {
None
}
} else {
None
};
let downstream_reply_rx: Option<(SocketAddr, tokio::sync::mpsc::Receiver<NetMessage>)> =
if let Some((next_addr, metadata_net, outbound_sid, forked_handle, embedded_metadata)) =
piping
{
let mut ctx = op_manager.op_ctx(incoming_tx);
let rx_opt: Option<tokio::sync::mpsc::Receiver<NetMessage>> = match ctx
.send_to_and_register_waiter(next_addr, metadata_net)
.await
{
Ok(rx) => Some(rx),
Err(err) => {
tracing::warn!(
tx = %incoming_tx,
target = %next_addr,
error = %err,
"PUT streaming relay (task-per-tx): metadata register_waiter failed; will finalize locally"
);
None
}
};
if rx_opt.is_some() {
if let Err(err) = conn_manager
.pipe_stream(next_addr, outbound_sid, forked_handle, embedded_metadata)
.await
{
tracing::warn!(
tx = %incoming_tx,
target = %next_addr,
error = %err,
"PUT streaming relay (task-per-tx): pipe_stream failed after waiter install; \
will wait on downstream reply and bubble what we get"
);
}
}
rx_opt.map(|rx| (next_addr, rx))
} else {
None
};
let stream_data = match stream_handle.assemble().await {
Ok(data) => data,
Err(err) => {
tracing::error!(
tx = %incoming_tx,
%stream_id,
error = %err,
"PUT streaming relay (task-per-tx): stream assembly failed"
);
return Err(OpError::StreamCancelled);
}
};
let payload: PutStreamingPayload = match bincode::deserialize(&stream_data) {
Ok(p) => p,
Err(err) => {
tracing::error!(
tx = %incoming_tx,
%stream_id,
error = %err,
"PUT streaming relay (task-per-tx): payload deserialize failed"
);
return Err(OpError::invalid_transition(incoming_tx));
}
};
let PutStreamingPayload {
contract,
value,
related_contracts,
} = payload;
let key = contract.key();
if key != contract_key {
tracing::error!(
tx = %incoming_tx,
expected = %contract_key,
actual = %key,
"PUT streaming relay (task-per-tx): contract key mismatch"
);
return Err(OpError::invalid_transition(incoming_tx));
}
let merged_value = relay_put_store_locally(
op_manager,
incoming_tx,
key,
value,
&contract,
related_contracts,
htl,
)
.await?;
if let Some((next_addr, mut rx)) = downstream_reply_rx {
let reply = match tokio::time::timeout(OPERATION_TTL, rx.recv()).await {
Ok(Some(reply)) => reply,
Ok(None) => {
tracing::warn!(
tx = %incoming_tx,
target = %next_addr,
"PUT streaming relay (task-per-tx): downstream reply channel closed before reply"
);
if let Some(ref peer) = next_hop {
crate::operations::record_relay_route_event(
op_manager,
peer.clone(),
crate::ring::Location::from(&key),
crate::router::RouteOutcome::Failure,
crate::node::network_status::OpType::Put,
);
}
op_manager.release_pending_op_slot(incoming_tx).await;
return relay_put_send_response(op_manager, incoming_tx, key, upstream_addr).await;
}
Err(_elapsed) => {
tracing::warn!(
tx = %incoming_tx,
target = %next_addr,
"PUT streaming relay (task-per-tx): downstream reply timed out"
);
if let Some(ref peer) = next_hop {
crate::operations::record_relay_route_event(
op_manager,
peer.clone(),
crate::ring::Location::from(&key),
crate::router::RouteOutcome::Failure,
crate::node::network_status::OpType::Put,
);
}
op_manager.release_pending_op_slot(incoming_tx).await;
return relay_put_send_response(op_manager, incoming_tx, key, upstream_addr).await;
}
};
op_manager.release_pending_op_slot(incoming_tx).await;
match reply {
NetMessage::V1(NetMessageV1::Put(PutMsg::Response { key: reply_key, .. }))
| NetMessage::V1(NetMessageV1::Put(PutMsg::ResponseStreaming {
key: reply_key, ..
})) => {
tracing::info!(
tx = %incoming_tx,
contract = %reply_key,
phase = "relay_put_streaming_bubble",
"PUT streaming relay (task-per-tx): downstream replied; bubbling Response upstream"
);
if let Some(ref peer) = next_hop {
crate::operations::record_relay_route_event(
op_manager,
peer.clone(),
crate::ring::Location::from(&reply_key),
crate::router::RouteOutcome::SuccessUntimed,
crate::node::network_status::OpType::Put,
);
}
relay_put_send_response(op_manager, incoming_tx, reply_key, upstream_addr).await
}
other => {
tracing::warn!(
tx = %incoming_tx,
contract = %key,
reply_variant = ?std::mem::discriminant(&other),
"PUT streaming relay (task-per-tx): unexpected reply variant"
);
relay_put_send_response(op_manager, incoming_tx, key, upstream_addr).await
}
}
} else {
relay_put_finalize_local(op_manager, incoming_tx, key, merged_value, upstream_addr).await
}
}
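// ── End of relay PUT driver ──────────────────────────────────────────────────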
#[cfg(test)]
mod tests {
use super::*;
fn dummy_key() -> ContractKey {
ContractKey::from_id_and_code(ContractInstanceId::new([1u8; 32]), CodeHash::new([2u8; 32]))
}
fn dummy_tx() -> Transaction {
Transaction::new::<PutMsg>()
}
#[test]
fn classify_reply_response_is_stored() {
let tx = dummy_tx();
let key = dummy_key();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Response { id: tx, key }));
assert!(matches!(classify_reply(&msg), ReplyClass::Stored { .. }));
}
#[test]
fn classify_reply_response_streaming_is_stored() {
let tx = dummy_tx();
let key = dummy_key();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::ResponseStreaming {
id: tx,
key,
continue_forwarding: false,
}));
assert!(matches!(classify_reply(&msg), ReplyClass::Stored { .. }));
}
#[test]
fn classify_reply_forwarding_ack_is_unexpected() {
let tx = dummy_tx();
let key = dummy_key();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::ForwardingAck {
id: tx,
contract_key: key,
}));
assert!(
matches!(classify_reply(&msg), ReplyClass::Unexpected),
"ForwardingAck must NOT be classified as terminal (Phase 2b bug 2)"
);
}
#[test]
fn finalize_put_at_originator_never_subscribes_from_driver() {
const SOURCE: &str = include_str!("op_ctx_task.rs");
let call_marker = "finalize_put_at_originator(";
let mut offset = 0;
let mut call_count = 0;
while let Some(pos) = SOURCE[offset..].find(call_marker) {
let abs_pos = offset + pos;
let window_end = SOURCE[abs_pos..]
.find(".await")
.map(|p| abs_pos + p)
.unwrap_or(SOURCE.len().min(abs_pos + 500));
let window = &SOURCE[abs_pos..window_end];
let after_struct = window.find("},").map(|p| &window[p..]);
if let Some(tail) = after_struct {
assert!(
!tail.contains("subscribe"),
"finalize_put_at_originator call in driver passes subscribe \
arguments that reference the `subscribe` variable instead of \
hardcoded `false`. This would cause double-subscription — \
subscriptions must be handled exclusively by maybe_subscribe_child. \
See commit 494a3c69 for the original fix."
);
}
call_count += 1;
offset = abs_pos + call_marker.len();
}
assert!(
call_count >= 1,
"Expected at least 1 finalize_put_at_originator call in the driver, \
found {call_count}"
);
}
#[test]
fn max_retries_boundary_exhausts_at_limit() {
let mut retries: usize = 0;
for _ in 0..MAX_RETRIES {
assert!(retries < MAX_RETRIES, "should not exhaust before limit");
retries += 1;
}
assert!(
retries >= MAX_RETRIES,
"should exhaust at MAX_RETRIES={MAX_RETRIES}"
);
}
#[test]
fn classify_reply_unexpected_for_non_put_message() {
let tx = dummy_tx();
let msg = NetMessage::V1(NetMessageV1::Aborted(tx));
assert!(matches!(classify_reply(&msg), ReplyClass::Unexpected));
}
#[test]
fn compute_put_attempt_timeout_matches_payload_streaming_decision() {
use crate::config::OPERATION_TTL;
let small_state = WrappedState::new(vec![0u8; 1024]);
let small_contract =
ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Arc::new(ContractCode::from(vec![0u8; 256])),
Parameters::from(vec![]),
)));
assert_eq!(
compute_put_attempt_timeout(64 * 1024, &small_state, &small_contract),
OPERATION_TTL,
"non-streaming-eligible payload must reuse OPERATION_TTL"
);
let website_state = WrappedState::new(vec![0u8; 2_460_242 - 256]);
let timeout = compute_put_attempt_timeout(64 * 1024, &website_state, &small_contract);
assert!(
timeout > std::time::Duration::from_secs(63),
"website-scale payload timeout {timeout:?} must exceed observed \
completion (~62 s); otherwise issue #4001 recurs"
);
assert!(
timeout > OPERATION_TTL,
"website-scale payload timeout {timeout:?} must exceed OPERATION_TTL"
);
}
#[test]
fn payload_size_estimate_within_throughput_floor_safety_margin() {
use crate::operations::put::PutStreamingPayload;
let state = WrappedState::new(vec![0xABu8; 1024 * 1024]);
let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Arc::new(ContractCode::from(vec![0x42u8; 200 * 1024])),
Parameters::from(vec![0x55u8; 4096]),
)));
let estimate = state.size() + contract.data().len();
let payload = PutStreamingPayload {
contract: contract.clone(),
related_contracts: RelatedContracts::default(),
value: state,
};
let actual = bincode::serialized_size(&payload).expect("serializable") as usize;
assert!(
actual <= estimate.saturating_mul(2),
"bincode payload size {actual} exceeds 2× estimate {estimate} — \
the 20 KiB/s throughput floor (half observed throughput) no \
longer absorbs the slack; either tighten the estimate (include \
parameters / related_contracts) or revisit \
STREAMING_THROUGHPUT_FLOOR_BPS in operations.rs"
);
}
#[test]
fn driver_outcome_exhausted_produces_client_error() {
let cause = "PUT to contract failed after 3 attempts".to_string();
let outcome: DriverOutcome = match RetryLoopOutcome::<ContractKey>::Exhausted(cause) {
RetryLoopOutcome::Exhausted(cause) => {
DriverOutcome::Publish(Err(ErrorKind::OperationError {
cause: cause.into(),
}
.into()))
}
RetryLoopOutcome::Done(_)
| RetryLoopOutcome::Unexpected
| RetryLoopOutcome::InfraError(_) => unreachable!(),
};
assert!(
matches!(outcome, DriverOutcome::Publish(Err(_))),
"Exhaustion must produce a client error, not be swallowed"
);
}
#[test]
fn classify_reply_request_is_local_completion() {
let tx = dummy_tx();
let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Request {
id: tx,
contract: ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Arc::new(ContractCode::from(vec![0u8])),
Parameters::from(vec![]),
))),
related_contracts: RelatedContracts::default(),
value: WrappedState::new(vec![1u8]),
htl: 5,
skip_list: HashSet::new(),
}));
assert!(matches!(
classify_reply(&msg),
ReplyClass::LocalCompletion { .. }
));
}
fn relay_section(src: &str) -> &str {
let start = src
.find("pub(crate) async fn start_relay_put(")
.expect("start_relay_put not found");
let end = src
.find("\n#[cfg(test)]")
.expect("test module marker not found");
&src[start..end]
}
fn relay_slice_a_section(src: &str) -> &str {
let start = src
.find("pub(crate) async fn start_relay_put(")
.expect("start_relay_put not found");
let end = src
.find("// ── Relay streaming PUT driver")
.expect("slice B marker not found");
&src[start..end]
}
#[test]
fn start_relay_put_checks_dedup_gate() {
let src = include_str!("op_ctx_task.rs");
let relay = relay_section(src);
let window_start = relay
.find("pub(crate) async fn start_relay_put(")
.expect("entry-point not found");
let spawn_pos = relay[window_start..]
.find("GlobalExecutor::spawn(run_relay_put(")
.expect("spawn site not found")
+ window_start;
let insert_pos = relay[window_start..]
.find("active_relay_put_txs.insert(incoming_tx)")
.expect("dedup insert site not found")
+ window_start;
assert!(
insert_pos < spawn_pos,
"active_relay_put_txs.insert MUST happen before GlobalExecutor::spawn"
);
}
#[test]
fn dedup_rejection_increments_counter() {
let src = include_str!("op_ctx_task.rs");
let relay = relay_section(src);
let after_insert = relay
.split("active_relay_put_txs.insert(incoming_tx)")
.nth(1)
.expect("dedup insert site not found");
let window = &after_insert[..500.min(after_insert.len())];
assert!(
window.contains("RELAY_PUT_DEDUP_REJECTS.fetch_add"),
"dedup gate must increment RELAY_PUT_DEDUP_REJECTS on rejection"
);
}
#[test]
fn raii_guard_clears_dedup_set_on_drop() {
let src = include_str!("op_ctx_task.rs");
let drop_start = src
.find("impl Drop for RelayPutInflightGuard")
.expect("RelayPutInflightGuard Drop impl not found");
let drop_body = &src[drop_start..drop_start + 600];
assert!(
drop_body.contains("active_relay_put_txs"),
"RelayPutInflightGuard::drop must remove from active_relay_put_txs"
);
assert!(
drop_body.contains("RELAY_PUT_INFLIGHT.fetch_sub"),
"RelayPutInflightGuard::drop must decrement RELAY_PUT_INFLIGHT"
);
assert!(
drop_body.contains("RELAY_PUT_COMPLETED_TOTAL.fetch_add"),
"RelayPutInflightGuard::drop must increment RELAY_PUT_COMPLETED_TOTAL"
);
}
#[test]
fn drive_relay_put_forwards_via_send_to_and_await() {
let src = include_str!("op_ctx_task.rs");
let driver_start = src
.find("async fn drive_relay_put(")
.expect("drive_relay_put not found");
let driver_end = src[driver_start..]
.find("\nasync fn relay_put_finalize_local(")
.expect("driver body end not found")
+ driver_start;
let driver_src = &src[driver_start..driver_end];
assert!(
driver_src.contains("ctx.send_to_and_await("),
"drive_relay_put must forward downstream via send_to_and_await \
so the downstream Response bubbles back to this relay"
);
assert!(
!driver_src.contains("send_fire_and_forget"),
"drive_relay_put must NOT fire-and-forget the downstream forward; \
PUT relay is req/response"
);
}
#[test]
fn drive_relay_put_reuses_incoming_tx_on_forward() {
let src = include_str!("op_ctx_task.rs");
let driver_start = src
.find("async fn drive_relay_put(")
.expect("drive_relay_put not found");
let driver_end = src[driver_start..]
.find("\nasync fn relay_put_finalize_local(")
.expect("driver body end not found")
+ driver_start;
let driver_src = &src[driver_start..driver_end];
let forward_pos = driver_src
.find("PutMsg::Request {")
.expect("forward PutMsg::Request not found in driver");
let forward_window = &driver_src[forward_pos..forward_pos + 400];
assert!(
forward_window.contains("id: incoming_tx"),
"relay forward must reuse incoming_tx; minting a fresh tx per hop \
breaks the downstream dispatch gate's has_put_op check"
);
assert!(
!forward_window.contains("Transaction::new::<PutMsg>()"),
"relay forward must NOT mint a fresh Transaction"
);
}
#[test]
fn relay_put_send_response_is_fire_and_forget() {
let src = include_str!("op_ctx_task.rs");
let fn_start = src
.find("async fn relay_put_send_response(")
.expect("relay_put_send_response not found");
let fn_end = src[fn_start..]
.find("\n}\n")
.expect("function body end not found")
+ fn_start;
let fn_src = &src[fn_start..fn_end];
assert!(
fn_src.contains("send_fire_and_forget"),
"relay_put_send_response must use send_fire_and_forget for the \
upstream response"
);
}
#[test]
fn slice_a_does_not_touch_put_streaming_variants() {
let src = include_str!("op_ctx_task.rs");
let relay = relay_slice_a_section(src);
assert!(
!relay.contains("PutMsg::RequestStreaming"),
"slice A must not handle PutMsg::RequestStreaming (belongs to slice B)"
);
let builds_streaming = relay.contains("NetMessage::from(PutMsg::ResponseStreaming")
|| relay.contains("PutMsg::ResponseStreaming {\n") && {
let pos = relay
.find("PutMsg::ResponseStreaming {")
.expect("anchor present by guard");
let window = &relay[pos..pos + 200.min(relay.len() - pos)];
window.contains("id: ")
};
assert!(
!builds_streaming,
"slice A driver must not CONSTRUCT a PutMsg::ResponseStreaming"
);
}
#[test]
fn drive_relay_put_stores_locally_before_forwarding() {
let src = include_str!("op_ctx_task.rs");
let driver_start = src
.find("async fn drive_relay_put(")
.expect("drive_relay_put not found");
let driver_end = src[driver_start..]
.find("\nasync fn relay_put_store_locally(")
.expect("driver body end not found")
+ driver_start;
let driver_src = &src[driver_start..driver_end];
let store_pos = driver_src
.find("relay_put_store_locally(")
.expect("relay_put_store_locally call missing in driver");
let forward_pos = driver_src
.find("ctx.send_to_and_await(")
.expect("send_to_and_await call missing in driver");
assert!(
store_pos < forward_pos,
"local store MUST run before the downstream forward"
);
let helper_start = src
.find("async fn relay_put_store_locally(")
.expect("helper not found");
let helper_end = src[helper_start..]
.find("\nasync fn relay_put_finalize_local(")
.expect("helper body end not found")
+ helper_start;
let helper_src = &src[helper_start..helper_end];
assert!(
helper_src.contains("super::put_contract("),
"helper MUST call put_contract"
);
assert!(
helper_src.contains("host_contract("),
"helper MUST call ring.host_contract for first-time hosting"
);
assert!(
helper_src.contains("announce_contract_hosted"),
"helper MUST call announce_contract_hosted for first-time hosting"
);
}
fn relay_slice_b_section(src: &str) -> &str {
let start = src
.find("// ── Relay streaming PUT driver")
.expect("slice B marker not found");
let end = src
.find("\n// ── End of relay PUT driver ─")
.expect("end-of-driver marker not found");
&src[start..end]
}
#[test]
fn start_relay_put_streaming_checks_dedup_gate() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let insert_pos = b
.find("active_relay_put_txs.insert(incoming_tx)")
.expect("dedup insert not found");
let spawn_pos = b
.find("GlobalExecutor::spawn(run_relay_put_streaming(")
.expect("spawn site not found");
assert!(
insert_pos < spawn_pos,
"dedup-gate MUST run before spawning the streaming driver"
);
}
#[test]
fn start_relay_put_streaming_dedup_reject_is_silent() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let insert_pos = b
.find("active_relay_put_txs.insert(incoming_tx)")
.expect("dedup anchor not found");
let window = &b[insert_pos..];
let close_pos = window
.find("\n }\n")
.expect("dedup-branch close not found");
let branch = &window[..close_pos];
assert!(
branch.contains("RELAY_PUT_STREAMING_DEDUP_REJECTS.fetch_add"),
"dedup gate must increment RELAY_PUT_STREAMING_DEDUP_REJECTS on rejection"
);
assert!(
!branch.contains("send_fire_and_forget"),
"dedup-reject must NOT fire a fabricated Response (no NotFound variant in PutMsg)"
);
}
#[test]
fn relay_put_streaming_guard_drop_is_balanced() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let drop_start = b
.find("impl Drop for RelayPutStreamingInflightGuard")
.expect("guard Drop impl not found");
let drop_end = b[drop_start..]
.find("\n}\n")
.expect("guard Drop body end not found")
+ drop_start;
let drop_body = &b[drop_start..drop_end];
assert!(
drop_body.contains("RELAY_PUT_STREAMING_INFLIGHT.fetch_sub"),
"guard::drop must decrement RELAY_PUT_STREAMING_INFLIGHT"
);
assert!(
drop_body.contains("RELAY_PUT_STREAMING_COMPLETED_TOTAL.fetch_add"),
"guard::drop must increment RELAY_PUT_STREAMING_COMPLETED_TOTAL"
);
assert!(
drop_body.contains("active_relay_put_txs"),
"guard::drop must reference active_relay_put_txs"
);
assert!(
drop_body.contains(".remove(&self.incoming_tx)"),
"guard::drop must remove the tx from active_relay_put_txs"
);
}
#[test]
fn drive_relay_put_streaming_uses_claim_and_pipe() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let driver_start = b
.find("async fn drive_relay_put_streaming")
.expect("drive_relay_put_streaming not found");
let driver = &b[driver_start..];
assert!(
driver.contains("orphan_stream_registry()"),
"streaming driver must claim via orphan_stream_registry"
);
assert!(
driver.contains("claim_or_wait(upstream_addr"),
"claim MUST use upstream_addr + inbound stream_id"
);
assert!(
driver.contains(".pipe_stream("),
"streaming driver must call pipe_stream for forwarding"
);
assert!(
driver.contains("relay_put_store_locally("),
"streaming driver must reuse the shared local-store helper"
);
}
#[test]
fn drive_relay_put_streaming_omits_forwarding_ack() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let driver_start = b
.find("async fn drive_relay_put_streaming")
.expect("drive_relay_put_streaming not found");
let driver = &b[driver_start..];
assert!(
!driver.contains("NetMessage::from(PutMsg::ForwardingAck")
&& !driver.contains("PutMsg::ForwardingAck {"),
"slice B driver must not construct PutMsg::ForwardingAck"
);
}
#[test]
fn drive_relay_put_streaming_reuses_incoming_tx_on_forward() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let pipe_meta_pos = b
.find("PutMsg::RequestStreaming {")
.expect("outbound RequestStreaming construction not found");
let window = &b[pipe_meta_pos..pipe_meta_pos + 300.min(b.len() - pipe_meta_pos)];
assert!(
window.contains("id: incoming_tx"),
"piped metadata must reuse incoming_tx (not mint fresh)"
);
}
#[test]
fn drive_relay_put_streaming_already_claimed_is_silent() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let ac_pos = b
.find("OrphanStreamError::AlreadyClaimed")
.expect("AlreadyClaimed arm not found");
let arm = &b[ac_pos..ac_pos + 600.min(b.len() - ac_pos)];
assert!(
arm.contains("return Ok(())"),
"AlreadyClaimed arm must return Ok(()) — the still-in-flight \
driver owns the upstream reply; this duplicate must exit silently"
);
assert!(
!arm.contains("send_fire_and_forget"),
"AlreadyClaimed arm must NOT fabricate a Response upstream"
);
}
#[test]
fn drive_relay_put_streaming_claim_failure_is_silent() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let pos = b
.find("OrphanStreamClaimFailed")
.expect("OrphanStreamClaimFailed not found in slice B");
let window = &b[..pos];
let err_arm_start = window
.rfind("Err(err) =>")
.expect("orphan-claim Err arm not found");
let arm = &b[err_arm_start..pos + 100];
assert!(
!arm.contains("send_fire_and_forget"),
"orphan-claim failure must NOT fabricate a success Response upstream"
);
}
#[test]
fn drive_relay_put_streaming_timeout_bubbles_response() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let pos = b
.find("downstream reply timed out")
.expect("timeout arm not found");
let arm = &b[pos..pos + 800.min(b.len() - pos)];
assert!(
arm.contains("relay_put_send_response"),
"timeout arm must still call relay_put_send_response so the \
upstream waiter is not left to its own OPERATION_TTL"
);
}
#[test]
fn drive_relay_put_streaming_store_failure_paths_do_not_fabricate() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let driver_start = b
.find("async fn drive_relay_put_streaming")
.expect("driver not found");
let driver = &b[driver_start..];
for phrase in [
"stream assembly failed",
"contract key mismatch",
"payload deserialize failed",
] {
let anchor = driver
.find(phrase)
.unwrap_or_else(|| panic!("failure-path anchor {phrase:?} not found"));
let tail = &driver[anchor..];
let ret_pos = tail
.find("return Err(")
.unwrap_or_else(|| panic!("no return Err after {phrase:?}"));
let pre_return = &tail[..ret_pos];
assert!(
!pre_return.contains("send_fire_and_forget"),
"failure path {phrase:?} must not fabricate a Response upstream"
);
}
}
#[test]
fn drive_relay_put_streaming_registers_waiter_before_pipe_stream() {
let src = include_str!("op_ctx_task.rs");
let b = relay_slice_b_section(src);
let driver_start = b
.find("async fn drive_relay_put_streaming")
.expect("driver not found");
let driver = &b[driver_start..];
let register_pos = driver
.find("send_to_and_register_waiter(")
.expect("register_waiter call not found");
let pipe_pos = driver
.find(".pipe_stream(")
.expect("pipe_stream call not found");
assert!(
register_pos < pipe_pos,
"send_to_and_register_waiter MUST precede pipe_stream so the \
pending_op_results callback is installed before fragments \
land downstream (otherwise a fast reply races past the waiter)"
);
}
#[test]
fn drive_relay_put_records_route_events_on_transport_failure() {
let src = include_str!("op_ctx_task.rs");
let body_start = src
.find("async fn drive_relay_put(")
.expect("drive_relay_put fn must exist");
let body_end = src[body_start..]
.find("/// Store a relayed PUT")
.map(|i| body_start + i)
.unwrap_or(src.len());
let body = &src[body_start..body_end];
for log_phrase in ["send_to_and_await failed", "downstream timed out"] {
let pos = body
.find(log_phrase)
.unwrap_or_else(|| panic!("expected `{log_phrase}` in drive_relay_put"));
let after = &body[pos..pos + 1500.min(body.len() - pos)];
assert!(
after.contains("record_relay_route_event")
&& after.contains("RouteOutcome::Failure"),
"drive_relay_put arm for `{log_phrase}` must call \
record_relay_route_event with RouteOutcome::Failure. \
Without this, transport failures from relay-forwarded \
PUTs are dropped — the regression PR #4051 fixes."
);
}
let pos = body
.find("downstream returned Response; bubbling upstream")
.expect("Response success arm not found");
let after = &body[pos..pos + 1500.min(body.len() - pos)];
assert!(
after.contains("record_relay_route_event")
&& after.contains("RouteOutcome::SuccessUntimed"),
"drive_relay_put Response arm must record SuccessUntimed."
);
}
}