use std::net::SocketAddr;
use std::sync::Arc;
use freenet_stdlib::client_api::{ContractResponse, ErrorKind, HostResponse};
use freenet_stdlib::prelude::*;
use crate::client_events::HostResult;
use crate::config::{GlobalExecutor, OPERATION_TTL};
use crate::contract::{ContractHandlerEvent, StoreResponse};
use crate::message::{NetMessage, NetMessageV1, Transaction};
use crate::node::OpManager;
#[rustfmt::skip]
use crate::operations::op_ctx::{
AdvanceOutcome, AttemptOutcome, RetryDriver, RetryLoopOutcome, drive_retry_loop,
};
use crate::operations::OpError;
use crate::operations::VisitedPeers;
use crate::ring::{Location, PeerKeyLocation};
use crate::router::{RouteEvent, RouteOutcome};
use crate::transport::peer_connection::StreamId;
use super::{GetMsg, GetMsgResult, GetStreamingPayload};
use crate::operations::orphan_streams::{OrphanStreamError, STREAM_CLAIM_TIMEOUT};
/// Test-only counter: number of client-initiated GET drivers started.
#[cfg(any(test, feature = "testing"))]
pub static DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Test-only counter: number of relay GET drivers started.
#[cfg(any(test, feature = "testing"))]
pub static RELAY_DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Relay tasks currently running (incremented in `run_relay_get`, decremented
/// by `RelayInflightGuard::drop`).
pub static RELAY_INFLIGHT: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
/// Total relay GET tasks ever spawned.
pub static RELAY_SPAWNED_TOTAL: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Total relay GET tasks that finished (counted in `RelayInflightGuard::drop`).
pub static RELAY_COMPLETED_TOTAL: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Duplicate relay Requests dropped because the transaction was already in flight.
pub static RELAY_DEDUP_REJECTS: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Entry point for a client-initiated GET: spawns the per-transaction driver
/// task and immediately returns the transaction id to the caller.
///
/// The actual work (routing, retries, caching, client reply) happens in the
/// spawned `run_client_get` task; this function never blocks on it.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_client_get(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) -> Result<Transaction, OpError> {
    #[cfg(any(test, feature = "testing"))]
    DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    tracing::debug!(
        tx = %client_tx,
        contract = %instance_id,
        "get (task-per-tx): spawning client-initiated task"
    );
    // Build the driver future first, then hand it to the executor; the future
    // does nothing until polled by the spawned task.
    let driver_task = run_client_get(
        op_manager,
        client_tx,
        instance_id,
        return_contract_code,
        subscribe,
        blocking_subscribe,
    );
    GlobalExecutor::spawn(driver_task);
    Ok(client_tx)
}
/// Task body for a client GET: runs the driver to completion and publishes
/// whatever outcome it produced back to the requesting client.
async fn run_client_get(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) {
    // Keep a handle for publishing; the driver takes its own clone.
    let publisher = op_manager.clone();
    let outcome = drive_client_get(
        op_manager,
        client_tx,
        instance_id,
        return_contract_code,
        subscribe,
        blocking_subscribe,
    )
    .await;
    deliver_outcome(&publisher, client_tx, outcome);
}
/// Final result of one client GET driver run.
#[derive(Debug)]
enum DriverOutcome {
    /// A host-level result (success or contract-level error) ready to be
    /// published to the client as-is.
    Publish(HostResult),
    /// The driver machinery itself failed; `deliver_outcome` synthesizes a
    /// client-facing error from this.
    InfrastructureError(OpError),
}
/// Run the inner client GET driver and fold any infrastructure error into the
/// `DriverOutcome` variant so the caller always gets a publishable outcome.
async fn drive_client_get(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) -> DriverOutcome {
    drive_client_get_inner(
        &op_manager,
        client_tx,
        instance_id,
        return_contract_code,
        subscribe,
        blocking_subscribe,
    )
    .await
    .unwrap_or_else(DriverOutcome::InfrastructureError)
}
/// Core state machine for a client-initiated GET.
///
/// Seeds the retry driver with the closest potential host (excluding our own
/// address), runs the generic retry loop, and then — on a terminal reply —
/// caches the state locally, builds the client response from the local store,
/// registers routing/metrics events, and optionally kicks off subscriptions.
///
/// Returns `Ok(DriverOutcome::Publish(..))` for anything the client should
/// see, and `Err(..)` only for infrastructure failures.
async fn drive_client_get_inner(
    op_manager: &Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) -> Result<DriverOutcome, OpError> {
    let htl = op_manager.ring.max_hops_to_live;
    // Exclude ourselves from candidate selection from the start.
    let mut tried: Vec<std::net::SocketAddr> = Vec::new();
    if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
        tried.push(own_addr);
    }
    let initial_target = op_manager
        .ring
        .k_closest_potentially_hosting(&instance_id, tried.as_slice(), 1)
        .into_iter()
        .next();
    let current_target = match initial_target {
        Some(peer) => {
            if let Some(addr) = peer.socket_addr() {
                tried.push(addr);
            }
            peer
        }
        // No candidates: target ourselves, which resolves locally.
        None => op_manager.ring.connection_manager.own_location(),
    };
    let mut driver = GetRetryDriver {
        op_manager,
        instance_id,
        htl,
        tried,
        retries: 0,
        current_target,
        attempt_visited: VisitedPeers::new(&client_tx),
    };
    let loop_result = drive_retry_loop(op_manager, client_tx, "get", &mut driver).await;
    match loop_result {
        RetryLoopOutcome::Done(terminal) => {
            op_manager.completed(client_tx);
            // Resolve the contract key to report back, caching along the way.
            let reply_key = match &terminal {
                Terminal::InlineFound {
                    key,
                    state,
                    contract,
                } => {
                    cache_contract_locally(
                        op_manager,
                        *key,
                        state.clone(),
                        contract.clone(),
                        true,
                    )
                    .await;
                    *key
                }
                Terminal::Streaming {
                    key,
                    stream_id,
                    includes_contract,
                } => {
                    // The stream arrived from the peer of the last attempt.
                    if let Some(peer_addr) = driver.current_target.socket_addr() {
                        if let Err(e) = assemble_and_cache_stream(
                            op_manager,
                            peer_addr,
                            *stream_id,
                            *key,
                            *includes_contract,
                        )
                        .await
                        {
                            tracing::warn!(
                                %key,
                                error = %e,
                                "get (task-per-tx): stream assembly failed — \
                                 state will not be cached locally"
                            );
                        }
                    } else {
                        tracing::warn!(
                            %key,
                            "get (task-per-tx): current_target has no socket_addr; \
                             cannot claim orphan stream"
                        );
                    }
                    *key
                }
                Terminal::LocalCompletion => {
                    // Local store may know the real key; otherwise synthesize
                    // a placeholder from the instance id.
                    match lookup_stored_key(op_manager, &instance_id).await {
                        Some(k) => k,
                        None => synthetic_key(&instance_id),
                    }
                }
            };
            // Client response is always built from the local store, not from
            // the wire reply, so all paths converge here.
            let host_result =
                build_host_response(op_manager, &instance_id, return_contract_code).await;
            if host_result.is_ok()
                && !subscribe
                && crate::ring::AUTO_SUBSCRIBE_ON_GET
                && !op_manager.ring.is_subscribed(&reply_key)
            {
                let path_label = match &terminal {
                    Terminal::Streaming { .. } => "streaming (task-per-tx)",
                    Terminal::InlineFound { .. } | Terminal::LocalCompletion => {
                        "non-streaming (task-per-tx)"
                    }
                };
                crate::operations::auto_subscribe_on_get_response(
                    op_manager,
                    &reply_key,
                    &client_tx,
                    &Some(driver.current_target.clone()),
                    false,
                    blocking_subscribe,
                    path_label,
                )
                .await;
            }
            // Routing feedback: attribute success/failure to the final target.
            let contract_location = Location::from(&reply_key);
            let route_event = RouteEvent {
                peer: driver.current_target.clone(),
                contract_location,
                outcome: if host_result.is_ok() {
                    RouteOutcome::SuccessUntimed
                } else {
                    RouteOutcome::Failure
                },
                op_type: Some(crate::node::network_status::OpType::Get),
            };
            if let Some(log_event) =
                crate::tracing::NetEventLog::route_event(&client_tx, &op_manager.ring, &route_event)
            {
                op_manager
                    .ring
                    .register_events(either::Either::Left(log_event))
                    .await;
            }
            op_manager.ring.routing_finished(route_event);
            crate::node::network_status::record_op_result(
                crate::node::network_status::OpType::Get,
                host_result.is_ok(),
            );
            // Explicit client-requested subscription (distinct from the
            // auto-subscribe fallback above).
            maybe_subscribe_child(
                op_manager,
                client_tx,
                reply_key,
                subscribe,
                blocking_subscribe,
            )
            .await;
            Ok(DriverOutcome::Publish(host_result))
        }
        RetryLoopOutcome::Exhausted(cause) => {
            Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
                cause: cause.into(),
            }
            .into())))
        }
        RetryLoopOutcome::Unexpected => Err(OpError::UnexpectedOpState),
        RetryLoopOutcome::InfraError(err) => Err(err),
    }
}
/// Mutable state for one client GET retry sequence, driven by
/// `drive_retry_loop` via the `RetryDriver` impl below.
struct GetRetryDriver<'a> {
    // Shared node state (ring, contract handler, interest manager).
    op_manager: &'a OpManager,
    // Contract being fetched.
    instance_id: ContractInstanceId,
    // Hops-to-live placed in every outgoing Request.
    htl: usize,
    // Addresses already attempted (including our own); excluded from
    // candidate selection in `advance`.
    tried: Vec<std::net::SocketAddr>,
    // How many times `advance` has moved on to another peer.
    retries: usize,
    // Peer the current attempt targets; read after the loop for stream
    // claiming and routing attribution.
    current_target: PeerKeyLocation,
    // Per-attempt visited set; reset in `new_attempt_tx`.
    attempt_visited: VisitedPeers,
}
/// Terminal classification of a GET attempt, produced by `classify`.
#[derive(Debug)]
enum Terminal {
    /// A Response carrying the state (and possibly the contract code) inline.
    InlineFound {
        key: ContractKey,
        state: WrappedState,
        contract: Option<ContractContainer>,
    },
    /// A ResponseStreaming header; the payload arrives as an orphan stream
    /// that must be claimed and assembled separately.
    Streaming {
        key: ContractKey,
        stream_id: StreamId,
        includes_contract: bool,
    },
    /// Our own Request echoed back — the operation resolved locally.
    LocalCompletion,
}
fn classify(reply: NetMessage) -> AttemptOutcome<Terminal> {
match reply {
NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
result:
GetMsgResult::Found {
key,
value:
StoreResponse {
state: Some(state),
contract,
},
},
..
})) => AttemptOutcome::Terminal(Terminal::InlineFound {
key,
state,
contract,
}),
NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
result: GetMsgResult::Found { value, .. },
..
})) => {
tracing::warn!(
?value,
"get (task-per-tx): Response{{Found}} arrived without state"
);
AttemptOutcome::Unexpected
}
NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
result: GetMsgResult::NotFound,
..
})) => AttemptOutcome::Retry,
NetMessage::V1(NetMessageV1::Get(GetMsg::ResponseStreaming {
key,
stream_id,
includes_contract,
..
})) => AttemptOutcome::Terminal(Terminal::Streaming {
key,
stream_id,
includes_contract,
}),
NetMessage::V1(NetMessageV1::Get(GetMsg::Request { .. })) => {
AttemptOutcome::Terminal(Terminal::LocalCompletion)
}
NetMessage::V1(NetMessageV1::Get(
GetMsg::ForwardingAck { .. } | GetMsg::ResponseStreamingAck { .. },
)) => AttemptOutcome::Unexpected,
_ => AttemptOutcome::Unexpected,
}
}
impl RetryDriver for GetRetryDriver<'_> {
    type Terminal = Terminal;

    /// Mint a fresh transaction for the next attempt and reset the
    /// per-attempt visited set to match it.
    fn new_attempt_tx(&mut self) -> Transaction {
        let attempt_tx = Transaction::new::<GetMsg>();
        self.attempt_visited = VisitedPeers::new(&attempt_tx);
        attempt_tx
    }

    /// Build the outgoing GET Request for the current attempt. Always asks
    /// for the contract code; subscription is handled separately.
    fn build_request(&mut self, attempt_tx: Transaction) -> NetMessage {
        let request = GetMsg::Request {
            id: attempt_tx,
            instance_id: self.instance_id,
            fetch_contract: true,
            htl: self.htl,
            visited: self.attempt_visited.clone(),
            subscribe: false,
        };
        NetMessage::from(request)
    }

    /// Delegate to the free-standing classifier.
    fn classify(&mut self, reply: NetMessage) -> AttemptOutcome<Terminal> {
        classify(reply)
    }

    /// Move on to the next candidate peer, or report exhaustion.
    fn advance(&mut self) -> AdvanceOutcome {
        let next = advance_to_next_peer(
            self.op_manager,
            &self.instance_id,
            &mut self.tried,
            &mut self.retries,
        );
        if let Some((next_target, _next_addr)) = next {
            self.current_target = next_target;
            AdvanceOutcome::Next
        } else {
            AdvanceOutcome::Exhausted
        }
    }
}
/// Build the client-facing GET response from the LOCAL store.
///
/// The wire-level reply has already been cached by this point, so a missing
/// local state here means the cache step failed — in that case a synthesized
/// client error is returned instead of the wire data.
async fn build_host_response(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
    return_contract_code: bool,
) -> HostResult {
    let lookup = op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *instance_id,
            return_contract_code,
        })
        .await;
    if let Ok(ContractHandlerEvent::GetResponse {
        key: Some(resolved_key),
        response:
            Ok(StoreResponse {
                state: Some(state),
                contract,
            }),
    }) = lookup
    {
        // Only ship the contract code if the client asked for it.
        let client_contract = if return_contract_code { contract } else { None };
        return Ok(HostResponse::ContractResponse(
            ContractResponse::GetResponse {
                key: resolved_key,
                contract: client_contract,
                state,
            },
        ));
    }
    tracing::warn!(
        contract = %instance_id,
        "get (task-per-tx): terminal reply classified success but local \
         store lookup returned no state; synthesizing client error"
    );
    Err(ErrorKind::OperationError {
        cause: format!(
            "GET succeeded on wire but local store lookup failed for {instance_id}"
        )
        .into(),
    }
    .into())
}
/// Build a placeholder `ContractKey` for `instance_id` when no stored key is
/// available to report.
/// NOTE(review): the all-zero `CodeHash` is a sentinel, not a real code hash —
/// confirm consumers of this key only rely on the instance-id half.
fn synthetic_key(instance_id: &ContractInstanceId) -> ContractKey {
    ContractKey::from_id_and_code(*instance_id, CodeHash::new([0u8; 32]))
}
/// Cache a contract's state — and optionally its code — in the local store and
/// update hosting/interest bookkeeping.
///
/// Flow: first probe the local store; if the stored state is byte-identical to
/// the incoming one, skip the redundant PutQuery — the idempotency
/// short-circuit. Otherwise persist via PutQuery when contract code is
/// available; every failure shape is logged and downgraded to "not persisted"
/// rather than propagated. Access accounting and interest broadcasts then run
/// regardless of persistence outcome.
async fn cache_contract_locally(
    op_manager: &OpManager,
    key: ContractKey,
    state: WrappedState,
    contract: Option<ContractContainer>,
    is_client_requester: bool,
) {
    let state_size = state.size() as u64;
    // Probe local store to detect an already-identical cached state.
    let local_state = op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *key.id(),
            return_contract_code: false,
        })
        .await;
    let state_matches = matches!(
        &local_state,
        Ok(ContractHandlerEvent::GetResponse {
            response: Ok(StoreResponse {
                state: Some(local),
                ..
            }),
            ..
        }) if local.as_ref() == state.as_ref(),
    );
    let put_persisted = if state_matches {
        tracing::debug!(
            %key,
            "get (task-per-tx): local state matches, skipping redundant PutQuery"
        );
        false
    } else if let Some(contract_code) = contract {
        match op_manager
            .notify_contract_handler(ContractHandlerEvent::PutQuery {
                key,
                state,
                related_contracts: RelatedContracts::default(),
                contract: Some(contract_code),
            })
            .await
        {
            Ok(ContractHandlerEvent::PutResponse {
                new_value: Ok(_), ..
            }) => true,
            Ok(ContractHandlerEvent::PutResponse {
                new_value: Err(err),
                ..
            }) => {
                tracing::warn!(
                    %key,
                    %err,
                    "get (task-per-tx): PutQuery rejected by executor"
                );
                false
            }
            Ok(other) => {
                tracing::warn!(
                    %key,
                    ?other,
                    "get (task-per-tx): PutQuery returned unexpected event"
                );
                false
            }
            Err(err) => {
                tracing::warn!(
                    %key,
                    %err,
                    "get (task-per-tx): PutQuery failed"
                );
                false
            }
        }
    } else {
        // No code available: nothing to persist, but accounting still runs.
        tracing::debug!(
            %key,
            "get (task-per-tx): skipping local cache — contract code missing"
        );
        false
    };
    // These side effects run even when persistence failed: a successful
    // wire-level GET must still refresh hosting LRU/TTL accounting.
    let access_result = op_manager.ring.record_get_access(key, state_size);
    if is_client_requester {
        op_manager.ring.mark_local_client_access(&key);
    }
    let mut removed_contracts = Vec::new();
    for evicted_key in &access_result.evicted {
        if op_manager
            .interest_manager
            .unregister_local_hosting(evicted_key)
        {
            removed_contracts.push(*evicted_key);
        }
    }
    // Announce hosting only for a newly-seen contract that actually persisted.
    if access_result.is_new && put_persisted {
        crate::operations::announce_contract_hosted(op_manager, &key).await;
        let became_interested = op_manager.interest_manager.register_local_hosting(&key);
        let added = if became_interested { vec![key] } else { vec![] };
        if !added.is_empty() || !removed_contracts.is_empty() {
            crate::operations::broadcast_change_interests(op_manager, added, removed_contracts)
                .await;
        }
    } else if !removed_contracts.is_empty() {
        crate::operations::broadcast_change_interests(op_manager, vec![], removed_contracts).await;
    }
}
async fn assemble_and_cache_stream(
op_manager: &OpManager,
peer_addr: std::net::SocketAddr,
stream_id: StreamId,
expected_key: ContractKey,
includes_contract: bool,
) -> Result<(), String> {
let handle = match op_manager
.orphan_stream_registry()
.claim_or_wait(peer_addr, stream_id, STREAM_CLAIM_TIMEOUT)
.await
{
Ok(h) => h,
Err(OrphanStreamError::AlreadyClaimed) => {
tracing::debug!(
%peer_addr,
%stream_id,
"stream already claimed (dedup)"
);
return Ok(());
}
Err(e) => return Err(format!("claim_or_wait: {e}")),
};
let bytes = handle
.assemble()
.await
.map_err(|e| format!("stream assembly: {e}"))?;
let payload: GetStreamingPayload =
bincode::deserialize(&bytes).map_err(|e| format!("deserialize: {e}"))?;
if payload.key != expected_key {
return Err(format!(
"stream key mismatch: expected {expected_key}, got {}",
payload.key
));
}
let Some(state) = payload.value.state else {
return Err("stream payload has no state".into());
};
let contract = if includes_contract {
payload.value.contract
} else {
None
};
cache_contract_locally(op_manager, payload.key, state, contract, true).await;
Ok(())
}
/// Ask the local store which full `ContractKey` (if any) is stored for the
/// given instance id; `None` on any miss or handler failure.
async fn lookup_stored_key(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
) -> Option<ContractKey> {
    let lookup = op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *instance_id,
            return_contract_code: false,
        })
        .await;
    if let Ok(ContractHandlerEvent::GetResponse {
        key: Some(key),
        response: Ok(_),
    }) = lookup
    {
        Some(key)
    } else {
        None
    }
}
const MAX_RETRIES: usize = 3;
fn advance_to_next_peer(
op_manager: &OpManager,
instance_id: &ContractInstanceId,
tried: &mut Vec<std::net::SocketAddr>,
retries: &mut usize,
) -> Option<(PeerKeyLocation, std::net::SocketAddr)> {
if *retries >= MAX_RETRIES {
return None;
}
*retries += 1;
let peer = op_manager
.ring
.k_closest_potentially_hosting(instance_id, tried.as_slice(), 1)
.into_iter()
.next()?;
let addr = peer.socket_addr()?;
tried.push(addr);
Some((peer, addr))
}
/// When the client asked to subscribe, run a child SUBSCRIBE operation —
/// inline when `blocking_subscribe`, otherwise detached on the executor.
async fn maybe_subscribe_child(
    op_manager: &Arc<OpManager>,
    client_tx: Transaction,
    key: ContractKey,
    subscribe: bool,
    blocking_subscribe: bool,
) {
    if !subscribe {
        return;
    }
    use crate::operations::subscribe;
    let child_tx = Transaction::new_child_of::<subscribe::SubscribeMsg>(&client_tx);
    // Async fns are lazy, so the future can be built once and either awaited
    // here or handed to the executor.
    let child = subscribe::run_client_subscribe(op_manager.clone(), *key.id(), child_tx);
    if blocking_subscribe {
        child.await;
    } else {
        GlobalExecutor::spawn(child);
    }
}
/// Publish the driver's outcome to the waiting client, converting an
/// infrastructure error into a synthesized client-facing error first.
fn deliver_outcome(op_manager: &OpManager, client_tx: Transaction, outcome: DriverOutcome) {
    let result: HostResult = match outcome {
        DriverOutcome::Publish(result) => result,
        DriverOutcome::InfrastructureError(err) => {
            tracing::warn!(
                tx = %client_tx,
                error = %err,
                "get (task-per-tx): infrastructure error; publishing synthesized client error"
            );
            Err(ErrorKind::OperationError {
                cause: format!("GET failed: {err}").into(),
            }
            .into())
        }
    };
    op_manager.send_client_result(client_tx, result);
}
/// Entry point for a relayed GET Request arriving from `upstream_addr`.
///
/// Deduplicates on the incoming transaction id: only the first Request for a
/// tx spawns a driver task; concurrent duplicates are counted and dropped.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_relay_get(
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
    instance_id: ContractInstanceId,
    htl: usize,
    upstream_addr: SocketAddr,
    visited: VisitedPeers,
    fetch_contract: bool,
    subscribe: bool,
) -> Result<(), OpError> {
    #[cfg(any(test, feature = "testing"))]
    RELAY_DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    // insert() is the dedup gate: true means this tx was not yet in flight.
    let is_new = op_manager.active_relay_get_txs.insert(incoming_tx);
    if is_new {
        tracing::debug!(
            tx = %incoming_tx,
            %instance_id,
            htl,
            %upstream_addr,
            phase = "relay_start",
            "GET relay (task-per-tx): spawning driver"
        );
        GlobalExecutor::spawn(run_relay_get(
            op_manager,
            incoming_tx,
            instance_id,
            htl,
            upstream_addr,
            visited,
            fetch_contract,
            subscribe,
        ));
    } else {
        RELAY_DEDUP_REJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        tracing::debug!(
            tx = %incoming_tx,
            %instance_id,
            %upstream_addr,
            phase = "relay_dedup_reject",
            "GET relay (task-per-tx): duplicate Request for in-flight tx, dropping"
        );
    }
    Ok(())
}
/// RAII guard for one relay GET task: on drop it removes the tx from the
/// in-flight dedup set (inserted by `start_relay_get`) and updates the relay
/// gauges, so bookkeeping is reversed on every exit path, including unwinds.
struct RelayInflightGuard {
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
}
impl Drop for RelayInflightGuard {
    fn drop(&mut self) {
        // Re-allow a new relay task for this transaction.
        self.op_manager
            .active_relay_get_txs
            .remove(&self.incoming_tx);
        RELAY_INFLIGHT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
        RELAY_COMPLETED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
}
/// Task body for a relay GET: maintains the in-flight counters, runs the relay
/// driver, and releases the pending-op slot at the end.
#[allow(clippy::too_many_arguments)]
async fn run_relay_get(
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
    instance_id: ContractInstanceId,
    htl: usize,
    upstream_addr: SocketAddr,
    visited: VisitedPeers,
    fetch_contract: bool,
    subscribe: bool,
) {
    RELAY_INFLIGHT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    RELAY_SPAWNED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    // The guard reverses the gauge increment above and the dedup-set insert
    // done in `start_relay_get`, on every exit path.
    let _guard = RelayInflightGuard {
        op_manager: op_manager.clone(),
        incoming_tx,
    };
    if let Err(err) = drive_relay_get(
        &op_manager,
        incoming_tx,
        instance_id,
        htl,
        upstream_addr,
        visited,
        fetch_contract,
        subscribe,
    )
    .await
    {
        // `drive_relay_get` already sent NotFound upstream on error; this is
        // observability only.
        tracing::warn!(
            tx = %incoming_tx,
            %instance_id,
            error = %err,
            phase = "relay_infra_error",
            "GET relay (task-per-tx): infrastructure error in driver"
        );
    }
    // NOTE(review): the purpose of yielding before releasing the slot is not
    // evident from this file — presumably it lets queued sends flush; confirm.
    tokio::task::yield_now().await;
    // NOTE(review): the inner driver also releases this slot once per forward
    // attempt; verify release_pending_op_slot tolerates repeated calls.
    op_manager.release_pending_op_slot(incoming_tx).await;
}
/// Run the inner relay driver; on error, warn and make sure the upstream peer
/// still receives a NotFound before propagating the error to the task body.
#[allow(clippy::too_many_arguments)]
async fn drive_relay_get(
    op_manager: &Arc<OpManager>,
    incoming_tx: Transaction,
    instance_id: ContractInstanceId,
    htl: usize,
    upstream_addr: SocketAddr,
    visited: VisitedPeers,
    fetch_contract: bool,
    subscribe: bool,
) -> Result<(), OpError> {
    let result = drive_relay_get_inner(
        op_manager,
        incoming_tx,
        instance_id,
        htl,
        upstream_addr,
        visited,
        fetch_contract,
        subscribe,
    )
    .await;
    if let Err(err) = &result {
        tracing::warn!(
            tx = %incoming_tx,
            %instance_id,
            error = %err,
            phase = "relay_inner_error",
            "GET relay (task-per-tx): inner driver returned error; sending NotFound upstream"
        );
        // Never leave upstream hanging on an infrastructure failure.
        relay_send_not_found(op_manager, incoming_tx, instance_id, upstream_addr).await;
    }
    result
}
/// A locally-stored contract value: key, state, and optional contract code.
type LocalFallback = (ContractKey, WrappedState, Option<ContractContainer>);
/// Look up `instance_id` in the local store and gate the hit on local interest.
///
/// Returns `(serve_now, fallback)`:
/// - `serve_now`: local value with active local interest — safe to answer
///   upstream immediately.
/// - `fallback`: local value whose interest has lapsed (potentially stale
///   cache) — only served once every downstream peer is exhausted.
///
/// When `fetch_contract` is set, a hit without contract code counts as a miss
/// so the request gets forwarded to a peer that can supply the code.
async fn check_local_with_interest_gate(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
    fetch_contract: bool,
) -> (Option<LocalFallback>, Option<LocalFallback>) {
    let get_result = op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *instance_id,
            return_contract_code: fetch_contract,
        })
        .await;
    let raw_local = match get_result {
        Ok(ContractHandlerEvent::GetResponse {
            key: Some(key),
            response:
                Ok(StoreResponse {
                    state: Some(state),
                    contract,
                }),
        }) => {
            if fetch_contract && contract.is_none() {
                // Requester needs the code but we only have state.
                None
            } else {
                Some((key, state, contract))
            }
        }
        _ => None,
    };
    match raw_local {
        None => (None, None),
        Some((key, state, contract)) => {
            if !op_manager.interest_manager.has_local_interest(&key) {
                tracing::debug!(
                    %instance_id,
                    "GET relay: stale cache (no local interest), will forward"
                );
                (None, Some((key, state, contract)))
            } else {
                (Some((key, state, contract)), None)
            }
        }
    }
}
/// Pick the next downstream peer for a relay GET.
///
/// Unlike the client-side `advance_to_next_peer`, candidates are filtered by
/// the wire-level visited set rather than by `tried`, and at most
/// `MAX_RELAY_RETRIES` (currently 1) forwarding attempt is made.
fn relay_advance_to_next_peer(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
    tried: &mut Vec<SocketAddr>,
    retries: &mut usize,
    new_visited: &VisitedPeers,
) -> Option<(PeerKeyLocation, SocketAddr)> {
    const MAX_RELAY_RETRIES: usize = 1;
    if *retries >= MAX_RELAY_RETRIES {
        return None;
    }
    // A retry is consumed even when no usable candidate is found below.
    *retries += 1;
    let peer = op_manager
        .ring
        .k_closest_potentially_hosting(instance_id, new_visited.clone(), 1)
        .into_iter()
        .next()?;
    let addr = peer.socket_addr()?;
    // NOTE(review): a candidate we already tried ends the search entirely
    // instead of skipping to another candidate — presumably fine with a
    // single retry, but verify if MAX_RELAY_RETRIES is ever raised.
    if tried.contains(&addr) {
        return None;
    }
    tried.push(addr);
    Some((peer, addr))
}
/// Best-effort NotFound reply to the upstream peer; a send failure is only
/// logged because there is nothing further to do for this transaction.
async fn relay_send_not_found(
    op_manager: &OpManager,
    tx: Transaction,
    instance_id: ContractInstanceId,
    upstream_addr: SocketAddr,
) {
    let not_found = GetMsg::Response {
        id: tx,
        instance_id,
        result: GetMsgResult::NotFound,
    };
    let mut ctx = op_manager.op_ctx(tx);
    let send_result = ctx
        .send_fire_and_forget(upstream_addr, NetMessage::from(not_found))
        .await;
    if let Err(err) = send_result {
        tracing::warn!(
            tx = %tx,
            %instance_id,
            %upstream_addr,
            error = %err,
            "GET relay: failed to send NotFound upstream"
        );
    }
}
/// Send a Found response (state plus optional contract code) to the upstream
/// peer; send failures collapse into `OpError::NotificationError`.
async fn relay_send_found(
    op_manager: &OpManager,
    tx: Transaction,
    instance_id: ContractInstanceId,
    upstream_addr: SocketAddr,
    key: ContractKey,
    state: WrappedState,
    contract: Option<ContractContainer>,
) -> Result<(), OpError> {
    let value = StoreResponse {
        state: Some(state),
        contract,
    };
    let msg = NetMessage::from(GetMsg::Response {
        id: tx,
        instance_id,
        result: GetMsgResult::Found { key, value },
    });
    let mut ctx = op_manager.op_ctx(tx);
    ctx.send_fire_and_forget(upstream_addr, msg)
        .await
        .map_err(|_send_err| OpError::NotificationError)
}
/// Core state machine for a relayed GET.
///
/// Order of resolution:
/// 1. HTL exhausted — NotFound upstream.
/// 2. Local store hit with active interest — Found upstream immediately.
/// 3. Forward to downstream peers (at most one retry); a Found reply is
///    cached and bubbled upstream; NotFound/timeouts/unexpected replies
///    advance to the next candidate.
/// 4. All peers exhausted — serve the stale local fallback if one exists,
///    otherwise NotFound upstream.
#[allow(clippy::too_many_arguments)]
async fn drive_relay_get_inner(
    op_manager: &Arc<OpManager>,
    incoming_tx: Transaction,
    instance_id: ContractInstanceId,
    htl: usize,
    upstream_addr: SocketAddr,
    visited: VisitedPeers,
    fetch_contract: bool,
    subscribe: bool,
) -> Result<(), OpError> {
    // Clamp the incoming HTL to our own ring maximum (at least 1).
    let ring_max_htl = op_manager.ring.max_hops_to_live.max(1);
    let htl = htl.min(ring_max_htl);
    if htl == 0 {
        tracing::warn!(
            tx = %incoming_tx,
            %instance_id,
            %upstream_addr,
            htl = 0,
            phase = "not_found",
            "GET relay (task-per-tx): HTL exhausted — sending NotFound upstream"
        );
        if let Some(event) = crate::tracing::NetEventLog::get_not_found(
            &incoming_tx,
            &op_manager.ring,
            instance_id,
            Some(op_manager.ring.max_hops_to_live),
        ) {
            op_manager
                .ring
                .register_events(either::Either::Left(event))
                .await;
        }
        relay_send_not_found(op_manager, incoming_tx, instance_id, upstream_addr).await;
        return Ok(());
    }
    // Extend the wire-level visited set with ourselves and the upstream peer
    // so neither is chosen as a forwarding target downstream.
    let mut new_visited = visited.with_transaction(&incoming_tx);
    if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
        new_visited.mark_visited(own_addr);
    }
    new_visited.mark_visited(upstream_addr);
    let (local_value, local_fallback) =
        check_local_with_interest_gate(op_manager, &instance_id, fetch_contract).await;
    if let Some((key, state, contract)) = local_value {
        tracing::info!(
            tx = %incoming_tx,
            %instance_id,
            contract = %key,
            phase = "complete",
            "GET relay (task-per-tx): contract found locally (active interest) — sending Found upstream"
        );
        if subscribe {
            // Piggybacked subscription: upstream becomes a downstream
            // subscriber of ours for this contract.
            crate::operations::subscribe::register_downstream_subscriber(
                op_manager,
                &key,
                upstream_addr,
                None,
                None,
                &incoming_tx,
                " (relay task-per-tx, piggybacked on GET)",
            )
            .await;
        } else if let Some(pkl) = op_manager
            .ring
            .connection_manager
            .get_peer_by_addr(upstream_addr)
        {
            // Plain GET still records upstream's interest in the contract.
            let peer_key = crate::ring::interest::PeerKey::from(pkl.pub_key.clone());
            op_manager
                .interest_manager
                .register_peer_interest(&key, peer_key, None, false);
        }
        cache_contract_locally(op_manager, key, state.clone(), contract.clone(), false).await;
        relay_send_found(
            op_manager,
            incoming_tx,
            instance_id,
            upstream_addr,
            key,
            state,
            contract,
        )
        .await?;
        return Ok(());
    }
    // Forwarding phase: never forward back to ourselves or upstream.
    let mut tried: Vec<SocketAddr> = Vec::new();
    if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
        tried.push(own_addr);
    }
    tried.push(upstream_addr);
    let mut retries: usize = 0;
    loop {
        let (peer, peer_addr) = match relay_advance_to_next_peer(
            op_manager,
            &instance_id,
            &mut tried,
            &mut retries,
            &new_visited,
        ) {
            Some(p) => p,
            None => {
                if let Some((key, state, contract)) = local_fallback {
                    // Peers exhausted: stale local cache beats NotFound.
                    tracing::info!(
                        tx = %incoming_tx,
                        %instance_id,
                        contract = %key,
                        "GET relay (task-per-tx): all peers exhausted — serving local fallback"
                    );
                    cache_contract_locally(op_manager, key, state.clone(), contract.clone(), false)
                        .await;
                    relay_send_found(
                        op_manager,
                        incoming_tx,
                        instance_id,
                        upstream_addr,
                        key,
                        state,
                        contract,
                    )
                    .await?;
                } else {
                    tracing::warn!(
                        tx = %incoming_tx,
                        %instance_id,
                        %upstream_addr,
                        phase = "not_found",
                        "GET relay (task-per-tx): all peers exhausted — sending NotFound upstream"
                    );
                    if let Some(event) = crate::tracing::NetEventLog::get_not_found(
                        &incoming_tx,
                        &op_manager.ring,
                        instance_id,
                        None,
                    ) {
                        op_manager
                            .ring
                            .register_events(either::Either::Left(event))
                            .await;
                    }
                    relay_send_not_found(op_manager, incoming_tx, instance_id, upstream_addr).await;
                }
                return Ok(());
            }
        };
        tracing::debug!(
            tx = %incoming_tx,
            %instance_id,
            target = %peer,
            target_addr = %peer_addr,
            phase = "forward",
            "GET relay (task-per-tx): forwarding request to downstream peer"
        );
        let new_htl = htl.saturating_sub(1);
        if let Some(event) = crate::tracing::NetEventLog::get_request(
            &incoming_tx,
            &op_manager.ring,
            instance_id,
            peer.clone(),
            new_htl,
        ) {
            op_manager
                .ring
                .register_events(either::Either::Left(event))
                .await;
        }
        let request = NetMessage::from(GetMsg::Request {
            id: incoming_tx,
            instance_id,
            fetch_contract,
            htl: new_htl,
            visited: new_visited.clone(),
            subscribe,
        });
        let mut ctx = op_manager.op_ctx(incoming_tx);
        // Bound the downstream round trip by the operation TTL.
        let round_trip =
            tokio::time::timeout(OPERATION_TTL, ctx.send_to_and_await(peer_addr, request)).await;
        // NOTE(review): slot released per attempt here AND once more at the
        // end of run_relay_get — confirm this call is idempotent.
        op_manager.release_pending_op_slot(incoming_tx).await;
        let reply = match round_trip {
            Ok(Ok(reply)) => reply,
            Ok(Err(err)) => {
                tracing::warn!(
                    tx = %incoming_tx,
                    target = %peer,
                    error = %err,
                    "GET relay (task-per-tx): send_to_and_await failed; advancing to next peer"
                );
                new_visited.mark_visited(peer_addr);
                continue;
            }
            Err(_elapsed) => {
                tracing::warn!(
                    tx = %incoming_tx,
                    target = %peer,
                    timeout_secs = OPERATION_TTL.as_secs(),
                    "GET relay (task-per-tx): attempt timed out; advancing to next peer"
                );
                new_visited.mark_visited(peer_addr);
                continue;
            }
        };
        match classify(reply) {
            AttemptOutcome::Terminal(Terminal::InlineFound {
                key,
                state,
                contract,
            }) => {
                tracing::info!(
                    tx = %incoming_tx,
                    %instance_id,
                    contract = %key,
                    phase = "relay_found",
                    "GET relay (task-per-tx): downstream returned Found — caching locally and bubbling upstream"
                );
                cache_contract_locally(op_manager, key, state.clone(), contract.clone(), false)
                    .await;
                relay_send_found(
                    op_manager,
                    incoming_tx,
                    instance_id,
                    upstream_addr,
                    key,
                    state,
                    contract,
                )
                .await?;
                return Ok(());
            }
            AttemptOutcome::Terminal(Terminal::Streaming { .. }) => {
                // Streaming responses cannot be relayed yet; treat like a miss.
                tracing::warn!(
                    tx = %incoming_tx,
                    %instance_id,
                    target = %peer,
                    "GET relay (task-per-tx): downstream returned ResponseStreaming — \
                     streaming relay forwarding not yet implemented (port plan §7); \
                     trying next peer"
                );
                new_visited.mark_visited(peer_addr);
                continue;
            }
            AttemptOutcome::Terminal(Terminal::LocalCompletion) => {
                tracing::warn!(
                    tx = %incoming_tx,
                    %instance_id,
                    "GET relay (task-per-tx): unexpected LocalCompletion (Request-echo) — trying next peer"
                );
                new_visited.mark_visited(peer_addr);
                continue;
            }
            AttemptOutcome::Retry => {
                tracing::debug!(
                    tx = %incoming_tx,
                    target = %peer,
                    "GET relay (task-per-tx): downstream returned NotFound; advancing to next peer"
                );
                new_visited.mark_visited(peer_addr);
                continue;
            }
            AttemptOutcome::Unexpected => {
                tracing::warn!(
                    tx = %incoming_tx,
                    target = %peer,
                    "GET relay (task-per-tx): unexpected reply variant; advancing to next peer"
                );
                new_visited.mark_visited(peer_addr);
                continue;
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Deterministic `ContractKey` fixture for the classifier tests.
fn dummy_key() -> ContractKey {
    ContractKey::from_id_and_code(ContractInstanceId::new([1u8; 32]), CodeHash::new([2u8; 32]))
}
/// Fresh GET transaction fixture.
fn dummy_tx() -> Transaction {
    Transaction::new::<GetMsg>()
}
/// A Found response carrying state must classify as the InlineFound terminal.
#[test]
fn classify_response_found_is_inline_terminal() {
    let tx = dummy_tx();
    let key = dummy_key();
    let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
        id: tx,
        instance_id: *key.id(),
        result: GetMsgResult::Found {
            key,
            value: StoreResponse {
                state: Some(WrappedState::new(vec![1u8])),
                contract: None,
            },
        },
    }));
    assert!(matches!(
        classify(msg),
        AttemptOutcome::Terminal(Terminal::InlineFound { .. })
    ));
}
/// NotFound must trigger a retry against the next candidate, not terminate.
#[test]
fn classify_response_notfound_is_retry() {
    let tx = dummy_tx();
    let key = dummy_key();
    let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
        id: tx,
        instance_id: *key.id(),
        result: GetMsgResult::NotFound,
    }));
    assert!(matches!(classify(msg), AttemptOutcome::Retry));
}
/// A streaming response header must classify as the Streaming terminal.
#[test]
fn classify_response_streaming_is_streaming_terminal() {
    let tx = dummy_tx();
    let key = dummy_key();
    let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::ResponseStreaming {
        id: tx,
        instance_id: *key.id(),
        stream_id: crate::transport::peer_connection::StreamId::next(),
        key,
        total_size: 1024,
        includes_contract: true,
    }));
    assert!(matches!(
        classify(msg),
        AttemptOutcome::Terminal(Terminal::Streaming { .. })
    ));
}
/// ForwardingAck is a hop acknowledgment, never a terminal reply.
#[test]
fn classify_forwarding_ack_is_unexpected() {
    let tx = dummy_tx();
    let key = dummy_key();
    let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::ForwardingAck {
        id: tx,
        instance_id: *key.id(),
    }));
    assert!(
        matches!(classify(msg), AttemptOutcome::Unexpected),
        "ForwardingAck must NOT be classified as terminal (Phase 2b bug 2)"
    );
}
/// Stream acks are transport-level chatter and must not resolve the attempt.
#[test]
fn classify_response_streaming_ack_is_unexpected() {
    let tx = dummy_tx();
    let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::ResponseStreamingAck {
        id: tx,
        stream_id: crate::transport::peer_connection::StreamId::next(),
    }));
    assert!(matches!(classify(msg), AttemptOutcome::Unexpected));
}
/// A Request echoed back to us signals that the operation resolved locally.
#[test]
fn classify_request_echo_is_local_completion() {
    let tx = dummy_tx();
    let key = dummy_key();
    let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Request {
        id: tx,
        instance_id: *key.id(),
        fetch_contract: true,
        htl: 5,
        visited: VisitedPeers::new(&tx),
        subscribe: false,
    }));
    assert!(matches!(
        classify(msg),
        AttemptOutcome::Terminal(Terminal::LocalCompletion)
    ));
}
/// A Found response with no state is malformed and must not terminate as success.
#[test]
fn classify_response_found_without_state_is_unexpected() {
    let tx = dummy_tx();
    let key = dummy_key();
    let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
        id: tx,
        instance_id: *key.id(),
        result: GetMsgResult::Found {
            key,
            value: StoreResponse {
                state: None,
                contract: None,
            },
        },
    }));
    assert!(matches!(classify(msg), AttemptOutcome::Unexpected));
}
/// Non-GET network messages fall through to Unexpected.
#[test]
fn classify_unexpected_for_non_get_message() {
    let tx = dummy_tx();
    let msg = NetMessage::V1(NetMessageV1::Aborted(tx));
    assert!(matches!(classify(msg), AttemptOutcome::Unexpected));
}
/// Models the guard in `advance_to_next_peer`: exactly MAX_RETRIES advances
/// are allowed before exhaustion (this exercises the counter logic only, not
/// the production function, which needs an OpManager).
#[test]
fn max_retries_boundary_exhausts_at_limit() {
    let mut retries: usize = 0;
    for _ in 0..MAX_RETRIES {
        assert!(retries < MAX_RETRIES, "should not exhaust before limit");
        retries += 1;
    }
    assert!(
        retries >= MAX_RETRIES,
        "should exhaust at MAX_RETRIES={MAX_RETRIES}"
    );
}
/// Retry exhaustion must surface to the client as a published error, never be
/// silently dropped.
#[test]
fn driver_outcome_exhausted_produces_client_error() {
    let cause = "GET to contract failed after 3 attempts".to_string();
    let outcome: DriverOutcome = match RetryLoopOutcome::<()>::Exhausted(cause) {
        RetryLoopOutcome::Exhausted(cause) => {
            DriverOutcome::Publish(Err(ErrorKind::OperationError {
                cause: cause.into(),
            }
            .into()))
        }
        RetryLoopOutcome::Done(_)
        | RetryLoopOutcome::Unexpected
        | RetryLoopOutcome::InfraError(_) => unreachable!(),
    };
    assert!(
        matches!(outcome, DriverOutcome::Publish(Err(_))),
        "Exhaustion must produce a client error, not be swallowed"
    );
}
/// Everything in this file above the first test-section marker — used by the
/// source-introspection tests below to assert structural properties of the
/// production code.
fn production_source() -> &'static str {
    const FULL: &str = include_str!("op_ctx_task.rs");
    let cutoff = FULL
        .find("#[cfg(test)]")
        .expect("file must have a #[cfg(test)] section");
    &FULL[..cutoff]
}
/// Return the text between the outermost braces of the function whose
/// signature starts with `signature_prefix`.
///
/// Brace matching is byte-level and deliberately naive: braces inside string
/// literals or comments are counted too, so introspected functions must keep
/// their comments brace-balanced.
fn extract_fn_body<'a>(source: &'a str, signature_prefix: &str) -> &'a str {
    let start = source
        .find(signature_prefix)
        .unwrap_or_else(|| panic!("could not find {signature_prefix}"));
    let brace = source[start..].find('{').expect("fn sig must have body");
    let body_start = start + brace + 1;
    let mut depth: i32 = 1;
    for (offset, byte) in source.as_bytes()[body_start..].iter().enumerate() {
        match byte {
            b'{' => depth += 1,
            b'}' => {
                depth -= 1;
                if depth == 0 {
                    return &source[body_start..body_start + offset];
                }
            }
            _ => {}
        }
    }
    panic!("unterminated fn body for {signature_prefix}");
}
/// Source-introspection: the caching helper must probe the store (GetQuery)
/// and compare bytes before issuing a PutQuery, preserving the legacy
/// idempotency short-circuit.
#[test]
fn cache_contract_locally_has_state_matches_short_circuit() {
    let src = production_source();
    let body = extract_fn_body(src, "async fn cache_contract_locally(");
    let get_pos = body
        .find("ContractHandlerEvent::GetQuery")
        .unwrap_or(usize::MAX);
    let put_pos = body
        .find("ContractHandlerEvent::PutQuery")
        .unwrap_or(usize::MAX);
    let has_byte_compare = body.contains("as_ref() ==") || body.contains("state_matches");
    let has_short_circuit = get_pos < put_pos && has_byte_compare;
    assert!(
        has_short_circuit,
        "cache_contract_locally is missing the state_matches idempotency \
         short-circuit from the legacy Response{{Found}} branch \
         (get.rs:2218-2241). Without it the driver re-invokes PutQuery \
         on identical state — regressing issue #2018 for contracts \
         that enforce idempotency in update_state()."
    );
}
/// Hosting side effects (LRU/TTL refresh) must run even when the local
/// PutQuery fails, matching the legacy error-path behavior.
#[test]
fn cache_contract_locally_runs_side_effects_on_put_error() {
    let body = extract_fn_body(production_source(), "async fn cache_contract_locally(");
    let err_arm_pos = body
        .find("new_value: Err(")
        .expect("PutResponse Err arm must exist");
    let access_pos = body
        .find("record_get_access")
        .expect("record_get_access must be called");
    assert!(
        access_pos > err_arm_pos,
        "record_get_access must run AFTER the PutResponse match \
         (outside both Ok and Err arms) so hosting LRU/TTL refresh on \
         any successful wire-level GET — including when the local \
         executor rejects the PutQuery. The legacy branch at \
         get.rs:2420-2435 continues these side effects on error; \
         the driver must match."
    );
}
/// Successful GET terminal paths must register the fallback
/// auto-subscription.
#[test]
fn driver_calls_auto_subscribe_on_get_response() {
    assert!(
        production_source().contains("auto_subscribe_on_get_response"),
        "The driver must invoke `auto_subscribe_on_get_response` on \
         successful GET terminal paths (AUTO_SUBSCRIBE_ON_GET = true in \
         ring.rs:60). The legacy branch does this at get.rs:2313/2408/3136/3185; \
         the driver must mirror it so client GETs with subscribe=false \
         still register the fallback subscription."
    );
}
/// Telemetry must reflect the client-visible outcome: `record_op_result` in
/// the `Done` arm may not pass a hard-coded `true` success flag.
///
/// Fixed to scan `production_source()` (like every other introspection test
/// in this module) instead of the raw `include_str!` of the whole file, so
/// the search anchors cannot accidentally match this test module's own
/// string literals (e.g. `"RetryLoopOutcome::Done("` appears verbatim in
/// test code below the `#[cfg(test)]` cutoff).
#[test]
fn record_op_result_reflects_host_result_outcome() {
    let src = production_source();
    let done_arm_start = src
        .find("RetryLoopOutcome::Done(")
        .expect("Done arm must exist");
    let next_arm = src[done_arm_start..]
        .find("RetryLoopOutcome::Exhausted")
        .expect("Exhausted arm must follow");
    let arm = &src[done_arm_start..done_arm_start + next_arm];
    let call_pos = arm
        .find("record_op_result")
        .expect("record_op_result must be called in Done arm");
    // Inspect only a short window after the call site: enough for its
    // argument list, not enough to stray into unrelated code.
    let tail = &arm[call_pos..];
    let call_window = &tail[..tail.len().min(200)];
    let looks_unconditional = call_window.contains("true,") && !call_window.contains("is_ok()");
    assert!(
        !looks_unconditional,
        "record_op_result in the Done arm is passed an unconditional \
         `true`. The success flag must track `host_result.is_ok()` so \
         telemetry does not diverge from the client-visible outcome. \
         Call window: {call_window}"
    );
}
/// `fetch_contract` must remain hard-coded `true` in outgoing GET requests.
#[test]
fn driver_hardcodes_fetch_contract_true_per_issue_3757() {
    let build_body = extract_fn_body(
        production_source(),
        "fn build_request(&mut self, attempt_tx: Transaction) -> NetMessage {",
    );
    assert!(
        build_body.contains("fetch_contract: true,"),
        "GetMsg::Request.fetch_contract must stay hard-coded `true` — \
         the node needs WASM for local validation/hosting regardless of \
         the client's return_contract_code preference (issue #3757 / \
         get.rs:52-55)."
    );
}
/// Cold-cache streaming GETs must persist assembled state: the
/// `Terminal::Streaming` arm of the client driver has to call
/// `assemble_and_cache_stream`, otherwise nothing writes the local store.
#[test]
fn streaming_terminal_calls_assemble_and_cache_stream() {
    let src = production_source();
    let body = extract_fn_body(src, "async fn drive_client_get_inner(");
    let arm = body
        .find("Terminal::Streaming {")
        .expect("Done arm must handle Terminal::Streaming");
    let tail = &body[arm..];
    // Clip the arm at the NEXT `Terminal::` occurrence. Searching in
    // `tail[1..]` skips the `Terminal::Streaming` match at offset 0; the
    // `+ 1` re-anchors the found position back into `tail`'s coordinates.
    let arm_end = tail[1..]
        .find("Terminal::")
        .map(|p| p + 1)
        .unwrap_or(tail.len());
    let arm_body = &tail[..arm_end];
    assert!(
        arm_body.contains("assemble_and_cache_stream"),
        "Terminal::Streaming arm of drive_client_get_inner must call \
         `assemble_and_cache_stream`. Without this, cold-cache streaming \
         GETs return OperationError because nothing writes the local \
         store. See bug #1 in PR #3884 review."
    );
}
/// The streaming payload must survive a bincode encode/decode round trip
/// with its key and state bytes intact.
#[test]
fn streaming_payload_round_trips_via_bincode() {
    let key = dummy_key();
    let state_bytes = vec![0x42u8; 512];
    let original = GetStreamingPayload {
        key,
        value: StoreResponse {
            state: Some(WrappedState::new(state_bytes.clone())),
            contract: None,
        },
    };
    let wire = bincode::serialize(&original).expect("bincode encode");
    let decoded: GetStreamingPayload = bincode::deserialize(&wire).expect("bincode decode");
    assert_eq!(decoded.key, key);
    assert_eq!(
        decoded.value.state.as_ref().map(|s| s.as_ref().to_vec()),
        Some(state_bytes),
        "state bytes must round-trip through the streaming payload"
    );
}
/// Structural checks on `assemble_and_cache_stream`: claim the orphan
/// stream, await assembly, decode the payload, verify the key, and delegate
/// the store write to `cache_contract_locally`.
#[test]
fn assemble_and_cache_stream_performs_claim_assemble_key_check() {
    let src = production_source();
    let body = extract_fn_body(src, "async fn assemble_and_cache_stream(");
    // Step 1: stream ownership — claim (or wait for) the orphan stream.
    assert!(
        body.contains("orphan_stream_registry") && body.contains("claim_or_wait"),
        "assemble_and_cache_stream must claim the stream via \
         orphan_stream_registry().claim_or_wait()"
    );
    // Step 2: assembly is awaited, not fire-and-forget.
    assert!(
        body.contains(".assemble()") && body.contains(".await"),
        "assemble_and_cache_stream must await stream assembly"
    );
    // Step 3: the assembled bytes decode as the typed streaming payload.
    assert!(
        body.contains("GetStreamingPayload") && body.contains("bincode::deserialize"),
        "assemble_and_cache_stream must deserialize the payload \
         as GetStreamingPayload"
    );
    // Step 4: key mismatch must be rejected, not cached under the wrong key.
    assert!(
        body.contains("payload.key != expected_key"),
        "assemble_and_cache_stream must verify the stream payload's \
         key matches the expected ContractKey — a mismatch would \
         silently cache the wrong contract under the expected key"
    );
    // Step 5: the actual write + hosting side effects go through the
    // shared helper rather than being duplicated inline.
    assert!(
        body.contains("cache_contract_locally"),
        "assemble_and_cache_stream must delegate the actual write \
         and hosting side effects to cache_contract_locally"
    );
}
/// `maybe_subscribe_child` must bail out immediately when `subscribe` is
/// false.
///
/// Fixed to scope the search to the production half of the file via
/// `production_source()` + `extract_fn_body`. The previous version sliced
/// from the fn signature to end-of-file, which included this test module —
/// so the needle `"if !subscribe {"` matched this test's own string
/// literal and the check passed vacuously even if the production
/// short-circuit was removed.
#[test]
fn maybe_subscribe_child_short_circuits_on_false() {
    let body = extract_fn_body(production_source(), "async fn maybe_subscribe_child(");
    assert!(
        body.contains("if !subscribe {"),
        "maybe_subscribe_child must short-circuit on !subscribe"
    );
}
/// HTL exhaustion must be handled before any forwarding is attempted.
#[test]
fn relay_driver_htl_zero_guard_precedes_retry_loop() {
    let body = extract_fn_body(production_source(), "async fn drive_relay_get_inner(");
    let guard_pos = body
        .find("if htl == 0")
        .expect("drive_relay_get_inner must have an `if htl == 0` guard");
    let loop_pos = body
        .find("relay_advance_to_next_peer")
        .expect("drive_relay_get_inner must call relay_advance_to_next_peer");
    assert!(
        guard_pos < loop_pos,
        "HTL=0 guard must appear BEFORE the retry loop; \
         otherwise HTL exhaustion enters the retry loop and forwards unnecessarily"
    );
}
/// A hosting relay must answer from its local cache before forwarding.
#[test]
fn relay_driver_local_cache_check_precedes_retry_loop() {
    let body = extract_fn_body(production_source(), "async fn drive_relay_get_inner(");
    let local_pos = body
        .find("check_local_with_interest_gate")
        .expect("drive_relay_get_inner must call check_local_with_interest_gate");
    let loop_pos = body
        .find("relay_advance_to_next_peer")
        .expect("drive_relay_get_inner must call relay_advance_to_next_peer");
    assert!(
        local_pos < loop_pos,
        "Local-cache check must appear BEFORE the retry loop; \
         otherwise a hosting relay forwards instead of answering immediately"
    );
}
/// The local-cache gate must consult interest state and expose the
/// stale-cache fallback slot.
#[test]
fn check_local_with_interest_gate_has_interest_check() {
    let body = extract_fn_body(production_source(), "async fn check_local_with_interest_gate(");
    for (needle, why) in [
        (
            "has_local_interest",
            "check_local_with_interest_gate must call `has_local_interest` to gate \
             whether stale cache is served immediately or deferred to network",
        ),
        (
            "None, Some(",
            "check_local_with_interest_gate must return (None, Some(fallback)) \
             when there is local state but no active interest",
        ),
    ] {
        assert!(body.contains(needle), "{why}");
    }
}
/// On an inline Found from downstream, the relay caches before bubbling the
/// response upstream.
#[test]
fn relay_driver_caches_locally_on_found_response() {
    let body = extract_fn_body(production_source(), "async fn drive_relay_get_inner(");
    let found_arm = body
        .find("Terminal::InlineFound {")
        .expect("drive_relay_get_inner must handle Terminal::InlineFound");
    let arm_tail = &body[found_arm..];
    // Absent anchors default to usize::MAX so a missing cache call fails.
    let cache_pos = arm_tail.find("cache_contract_locally").unwrap_or(usize::MAX);
    let send_pos = arm_tail.find("relay_send_found").unwrap_or(usize::MAX);
    assert!(
        cache_pos < send_pos,
        "cache_contract_locally must be called BEFORE relay_send_found \
         in the InlineFound arm — the relay must cache the contract before \
         bubbling the response upstream"
    );
}
/// Exhaustion must produce an upstream NotFound, never silence.
#[test]
fn relay_driver_exhaustion_sends_not_found_upstream() {
    let body = extract_fn_body(production_source(), "async fn drive_relay_get_inner(");
    assert!(
        body.contains("relay_send_not_found"),
        "drive_relay_get_inner must call `relay_send_not_found` on exhaustion"
    );
}
/// Forwarding acks were removed to break a spawn-amplification cycle; the
/// relay driver must neither call nor construct them.
#[test]
fn relay_driver_does_not_send_forwarding_ack() {
    let body = extract_fn_body(production_source(), "async fn drive_relay_get_inner(");
    for (forbidden, why) in [
        (
            "relay_send_forwarding_ack",
            "drive_relay_get_inner must NOT call relay_send_forwarding_ack \
             — the ack collided with upstream's attempt_tx waiter and \
             amplified spawns (workflow 24600634908: 6.8M spawns, 63GB RSS)",
        ),
        (
            "GetMsg::ForwardingAck {",
            "drive_relay_get_inner must not construct GetMsg::ForwardingAck — \
             dropped to break the spawn-amplification cycle",
        ),
    ] {
        assert!(!body.contains(forbidden), "{why}");
    }
}
/// Peer selection must route by hosting likelihood and honor the upstream's
/// visited set as a skip list.
#[test]
fn relay_advance_uses_visited_bloom_filter_as_skip_list() {
    let body = extract_fn_body(production_source(), "fn relay_advance_to_next_peer(");
    for (needle, why) in [
        (
            "k_closest_potentially_hosting",
            "relay_advance_to_next_peer must use k_closest_potentially_hosting",
        ),
        (
            "new_visited",
            "relay_advance_to_next_peer must pass new_visited as the skip list \
             so the upstream's VisitedPeers is respected",
        ),
    ] {
        assert!(body.contains(needle), "{why}");
    }
}
/// Downstream replies are classified by the shared `classify` helper.
#[test]
fn relay_driver_reuses_classify_function() {
    let body = extract_fn_body(production_source(), "async fn drive_relay_get_inner(");
    assert!(
        body.contains("classify(reply)"),
        "drive_relay_get_inner must call `classify(reply)` to classify \
         downstream replies — reusing the client driver's classify avoids \
         duplicate terminal-variant handling"
    );
}
/// The relay entry point is live code now; its dead-code allowance must be
/// gone so real dead-code warnings stay effective.
#[test]
fn relay_entry_point_no_longer_annotated_dead_code_after_commit2() {
    let src = production_source();
    let fn_start = src
        .find("pub(crate) async fn start_relay_get(")
        .expect("start_relay_get must exist");
    // Attributes sit directly above the signature; 500 bytes is plenty.
    let preceding = &src[fn_start.saturating_sub(500)..fn_start];
    assert!(
        !preceding.contains("#[allow(dead_code)]"),
        "start_relay_get must NOT be annotated with #[allow(dead_code)] in commit 2 \
         — the driver is now live (wired in node.rs dispatch). Remove the attribute \
         to keep dead-code warnings effective."
    );
}
/// Originator loopback (source_addr=None) must fall through to the legacy
/// handler, preserving the Request-echo contract of the client driver.
#[test]
fn dispatch_gate_loopback_source_addr_none_uses_legacy() {
    const NODE_RS: &str = include_str!("../../node.rs");
    let start = NODE_RS
        .find("// #1454 phase 5 / #3883: relay GET task-per-tx dispatch.")
        .expect("relay dispatch comment anchor must exist in node.rs");
    let len = NODE_RS[start..]
        .find("let op_result = handle_op_request::<get::GetOp, _>")
        .expect("legacy fallthrough anchor must follow relay dispatch");
    let block = &NODE_RS[start..start + len];
    assert!(
        block.contains("if let Some(upstream_addr) = source_addr"),
        "Relay dispatch block must gate on `source_addr.is_some()` so that \
         originator loopback (source_addr=None from phase-3b client driver) \
         falls through to the legacy `handle_op_request` path. Without this \
         gate, the Request-echo contract `drive_client_get_inner::classify` \
         relies on for Terminal::LocalCompletion breaks. See port plan §2."
    );
}
/// Transactions already owned by the legacy state machine must not be
/// hijacked by the relay driver.
#[test]
fn dispatch_gate_existing_get_op_uses_legacy() {
    const NODE_RS: &str = include_str!("../../node.rs");
    let start = NODE_RS
        .find("// #1454 phase 5 / #3883: relay GET task-per-tx dispatch.")
        .expect("relay dispatch comment anchor must exist in node.rs");
    let len = NODE_RS[start..]
        .find("let op_result = handle_op_request::<get::GetOp, _>")
        .expect("legacy fallthrough anchor must follow relay dispatch");
    let block = &NODE_RS[start..start + len];
    assert!(
        block.contains("!op_manager.has_get_op(id)"),
        "Relay dispatch block must gate on `!op_manager.has_get_op(id)` so that \
         GC-spawned retries and `start_targeted_op` (UPDATE auto-fetch) continue \
         through the legacy `handle_op_request` path. Without this gate, the \
         relay driver would hijack transactions the legacy state machine owns."
    );
}
/// The Get arm must try the phase-3b terminal bypass first, then the
/// phase-5 relay dispatch, and only then the legacy handler.
#[test]
fn dispatch_gate_ordering_bypass_before_relay_before_legacy() {
    const NODE_RS: &str = include_str!("../../node.rs");
    let arm_start = NODE_RS
        .find("NetMessageV1::Get(ref op) => {")
        .expect("Get arm must exist in handle_pure_network_message_v1");
    let arm = &NODE_RS[arm_start..];
    // Names are kept because the failure message interpolates them.
    let bypass_pos = arm
        .find("try_forward_task_per_tx_reply")
        .expect("phase-3b bypass must exist in Get arm");
    let relay_pos = arm
        .find("get::op_ctx_task::start_relay_get")
        .expect("phase-5 relay dispatch must exist in Get arm");
    let legacy_pos = arm
        .find("handle_op_request::<get::GetOp, _>")
        .expect("legacy fallthrough must exist in Get arm");
    assert!(
        bypass_pos < relay_pos && relay_pos < legacy_pos,
        "Dispatch ordering in the Get arm must be: \
         try_forward_task_per_tx_reply (phase-3b terminal bypass) \
         THEN start_relay_get (phase-5 relay dispatch) \
         THEN handle_op_request (legacy loopback + has_get_op fallthrough). \
         Got bypass={bypass_pos}, relay={relay_pos}, legacy={legacy_pos}."
    );
}
/// The `htl == 0` guard must answer the upstream with a prompt NotFound and
/// return early — not silently drop the request until OPERATION_TTL fires.
#[test]
fn htl_zero_guard_emits_not_found_upstream_frame() {
    let src = production_source();
    let body = extract_fn_body(src, "async fn drive_relay_get_inner(");
    let guard_start = body.find("if htl == 0").expect("htl==0 guard must exist");
    let after_guard = &body[guard_start..];
    let body_open = after_guard
        .find('{')
        .expect("htl==0 guard must have a body");
    // Manual brace-depth scan to locate the end of the guard's block:
    // depth starts at 1 for the opening brace and the cursor starts on the
    // byte after it; the loop stops once the matching `}` is consumed.
    let bytes = after_guard.as_bytes();
    let mut depth: i32 = 1;
    let mut i = body_open + 1;
    while i < bytes.len() && depth > 0 {
        match bytes[i] {
            b'{' => depth += 1,
            b'}' => depth -= 1,
            _ => {}
        }
        i += 1;
    }
    // `guard_block` spans the guard's `{ ... }`, braces inclusive.
    let guard_block = &after_guard[body_open..i];
    assert!(
        guard_block.contains("relay_send_not_found"),
        "HTL=0 guard block must call `relay_send_not_found(...)` so the \
         upstream gets a prompt NotFound frame instead of hanging until \
         OPERATION_TTL. Guard body was: {guard_block}"
    );
    assert!(
        guard_block.contains("upstream_addr"),
        "HTL=0 NotFound must be sent to `upstream_addr`, not any other \
         peer. Guard body was: {guard_block}"
    );
    assert!(
        guard_block.contains("return Ok(())"),
        "HTL=0 guard must return early after sending NotFound; otherwise \
         execution would fall into the retry loop with htl=0."
    );
}
/// After all downstream peers return NotFound, the relay either serves its
/// stale local fallback (Found) or bubbles NotFound upstream.
#[test]
fn exhaustion_branches_on_local_fallback_presence() {
    let body = extract_fn_body(production_source(), "async fn drive_relay_get_inner(");
    let none_arm = body
        .find("None => {")
        .expect("exhaustion arm (`None => {{`) must exist");
    let after = &body[none_arm..];
    let end = after
        .find("return Ok(());")
        .expect("exhaustion branch must end with `return Ok(());`");
    let arm = &after[..end + "return Ok(());".len()];
    assert!(
        arm.contains("if let Some((key, state, contract)) = local_fallback"),
        "Exhaustion arm must branch on `local_fallback`; otherwise the \
         stale-cache fallback semantic (R3d → R10) is lost."
    );
    let found_pos = arm.find("relay_send_found");
    assert!(
        found_pos.is_some(),
        "Exhaustion arm must call `relay_send_found` when `local_fallback` \
         is Some — the stale-cache relay serves what it has after all \
         downstream peers NotFound."
    );
    let not_found_pos = arm.find("relay_send_not_found");
    assert!(
        not_found_pos.is_some(),
        "Exhaustion arm must call `relay_send_not_found` when \
         `local_fallback` is None — the non-caching relay bubbles \
         NotFound upstream."
    );
    assert!(
        found_pos.unwrap() < not_found_pos.unwrap(),
        "Fallback-Found must precede NotFound in the exhaustion arm \
         source order; a reversal would mean NotFound always runs first \
         and the fallback path is dead code."
    );
}
/// The explicit `!has_local_interest` branch (stale → fallback slot) must
/// come before the active-interest branch (local_value slot).
#[test]
fn interest_gate_returns_fallback_when_not_actively_hosting() {
    let body = extract_fn_body(production_source(), "async fn check_local_with_interest_gate(");
    let gate = body
        .find("if !op_manager.interest_manager.has_local_interest(&key)")
        .expect("interest gate must check `!has_local_interest` branch explicitly");
    let after_gate = &body[gate..];
    let stale_pos = after_gate.find("(None, Some(").expect("stale branch must exist");
    let active_pos = after_gate.find("(Some(").expect("active branch must exist");
    assert!(
        stale_pos < active_pos,
        "`if !has_local_interest` arm (stale → fallback slot) must appear \
         before the `else` arm (active → local_value slot) in source order."
    );
}
/// The forwarded Request must carry htl-1, the updated visited set, and the
/// original end-to-end transaction id.
#[test]
fn forwarded_request_decrements_htl_and_propagates_visited() {
    let body = extract_fn_body(production_source(), "async fn drive_relay_get_inner(");
    assert!(
        body.contains("let new_htl = htl.saturating_sub(1);"),
        "Retry loop must compute `new_htl = htl.saturating_sub(1)` so \
         the forwarded Request carries one-less HTL — otherwise the \
         downstream relay would forward with the same HTL and loops \
         could form."
    );
    let request_build = body
        .find("NetMessage::from(GetMsg::Request")
        .expect("retry loop must build the forwarded GetMsg::Request");
    // A 400-byte window after the constructor covers its field list;
    // `window` is named for the message interpolations below.
    let window_end = (request_build + 400).min(body.len());
    let window = &body[request_build..window_end];
    assert!(
        window.contains("htl: new_htl"),
        "Forwarded GetMsg::Request must set `htl: new_htl` (not `htl`) so \
         the decremented value actually propagates on the wire. Window: {window}"
    );
    assert!(
        window.contains("visited: new_visited.clone()"),
        "Forwarded GetMsg::Request must set `visited: new_visited.clone()` \
         so the downstream sees the updated skip set — without this, \
         the downstream could forward back to peers the upstream already \
         tried, forming routing loops. Window: {window}"
    );
    assert!(
        window.contains("id: incoming_tx"),
        "Forwarded GetMsg::Request must reuse `id: incoming_tx` (legacy \
         end-to-end tx preservation). Minting a fresh tx per retry (the \
         original phase-5 implementation) caused downstream to treat each \
         retry as a brand-new Request and spawn a fresh relay subtree, \
         producing 7M spawns in 100s of ci-fault-loss and 63GB RSS."
    );
}
/// The visited set must contain both the upstream and this node before the
/// first peer-selection iteration runs.
#[test]
fn visited_bloom_seeded_with_own_and_upstream_before_loop() {
    let body = extract_fn_body(production_source(), "async fn drive_relay_get_inner(");
    let loop_start = body.find("loop {").expect("retry loop must exist");
    let pre_loop = &body[..loop_start];
    for (needle, why) in [
        (
            "new_visited.mark_visited(upstream_addr);",
            "`new_visited.mark_visited(upstream_addr)` must run BEFORE the \
             retry loop so downstream routing can't immediately hand back \
             to upstream.",
        ),
        (
            "new_visited.mark_visited(own_addr);",
            "`new_visited.mark_visited(own_addr)` must run BEFORE the retry \
             loop so we don't route back to ourselves.",
        ),
    ] {
        assert!(pre_loop.contains(needle), "{why}");
    }
}
/// An inner infra error must produce a compensating NotFound to the
/// upstream and still be re-raised for logging.
#[test]
fn drive_relay_get_sends_compensating_not_found_on_inner_err() {
    let body = extract_fn_body(production_source(), "async fn drive_relay_get(");
    let err_arm = body
        .find("Err(err) =>")
        .expect("drive_relay_get must have an Err arm");
    let arm_tail = &body[err_arm..];
    for (needle, why) in [
        (
            "relay_send_not_found",
            "`drive_relay_get` Err arm must call `relay_send_not_found` so \
             that an inner infrastructure error (including `relay_send_found` \
             failures surfaced via `?`) produces a compensating NotFound \
             upstream instead of a silent 60s upstream timeout.",
        ),
        (
            "upstream_addr",
            "The compensating NotFound in `drive_relay_get` must target \
             `upstream_addr`, not any other peer.",
        ),
        (
            "Err(err)",
            "`drive_relay_get` Err arm must re-raise `Err(err)` after \
             sending the compensating NotFound, so `run_relay_get`'s \
             infra-error log still fires.",
        ),
    ] {
        assert!(arm_tail.contains(needle), "{why}");
    }
}
/// Fire-and-forget send failures must surface as `OpError` so `?` can
/// propagate them into the compensating-NotFound funnel.
#[test]
fn relay_send_found_maps_send_failure_to_op_error() {
    let body = extract_fn_body(production_source(), "async fn relay_send_found(");
    assert!(
        body.contains("map_err(|_| OpError::NotificationError)"),
        "`relay_send_found` must map fire-and-forget send errors to \
         `OpError::NotificationError` so `?` surfaces them to the outer \
         `drive_relay_get` error funnel and triggers the compensating \
         NotFound."
    );
}
/// Streaming replies from downstream are deliberately skipped for now
/// (out of scope for #3883): the arm marks the peer visited and continues,
/// without sending a bogus Found or claiming the stream.
#[test]
fn streaming_downstream_is_currently_warned_and_skipped() {
    let src = production_source();
    let body = extract_fn_body(src, "async fn drive_relay_get_inner(");
    let streaming_arm_start = body
        .find("AttemptOutcome::Terminal(Terminal::Streaming { .. })")
        .expect("Streaming arm must exist in relay driver");
    let tail = &body[streaming_arm_start..];
    // Clip the arm at the NEXT `AttemptOutcome::` occurrence; searching in
    // `tail[1..]` skips the match at offset 0, and `+ 1` re-anchors the
    // position into `tail`'s coordinates.
    let clip = tail[1..]
        .find("AttemptOutcome::")
        .map(|p| p + 1)
        .unwrap_or(tail.len());
    let arm = &tail[..clip];
    assert!(
        arm.contains("continue;"),
        "Streaming arm must `continue;` to try the next peer — streaming \
         relay forwarding is out of scope for #3883 (see port plan §7). \
         A future PR will replace this with a proper chunk pipe-through."
    );
    assert!(
        arm.contains("new_visited.mark_visited(peer_addr)"),
        "Streaming arm must mark the streaming peer as visited so the \
         retry loop doesn't re-select it next iteration."
    );
    // Negative checks: both would be wrong half-migrations.
    assert!(
        !arm.contains("relay_send_found"),
        "Streaming arm must NOT call `relay_send_found` — doing so \
         without the chunk payload would send a Found frame with \
         `state: None` (Unexpected at the upstream classify). See R12b \
         gap in port plan risk register."
    );
    assert!(
        !arm.contains("orphan_stream_registry"),
        "Streaming arm must NOT claim the stream via \
         `orphan_stream_registry` — that's the migrated behavior for a \
         follow-up PR. Doing it here without the upstream pipe would \
         leak stream state."
    );
}
/// Both round-trip failure modes (channel error and timeout) must mark the
/// peer visited and `continue;` to the next candidate — never abort the
/// retry loop.
#[test]
fn send_and_await_failure_arms_continue_to_next_peer() {
    let src = production_source();
    let body = extract_fn_body(src, "async fn drive_relay_get_inner(");
    let round_trip = body
        .find("let reply = match round_trip {")
        .expect("round_trip match must exist");
    let tail = &body[round_trip..];
    // Bound the inspected region at the classify match that follows the
    // round-trip match, so arm searches can't stray into later code.
    let clip = tail
        .find("match classify(reply)")
        .expect("classify match must follow round_trip match");
    let match_body = &tail[..clip];
    // Channel-error arm: from `Ok(Err(err)) =>` up to the timeout arm.
    let err_arm = match_body
        .find("Ok(Err(err)) =>")
        .expect("channel-error arm must exist");
    let err_tail = &match_body[err_arm..];
    let err_arm_end = err_tail
        .find("Err(_elapsed) =>")
        .expect("timeout arm must follow channel-error arm");
    let err_block = &err_tail[..err_arm_end];
    assert!(
        err_block.contains("continue;"),
        "`Ok(Err(_))` arm must `continue;` so the loop tries the next \
         peer. A regression that returns or breaks here would abandon \
         remaining candidates."
    );
    assert!(
        err_block.contains("mark_visited(peer_addr)"),
        "`Ok(Err(_))` arm must mark the failing peer as visited so the \
         next `relay_advance_to_next_peer` call doesn't re-select it."
    );
    // Timeout arm: from `Err(_elapsed) =>` to the last `};` of the match
    // (rfind, because the arm is the final one in the match body).
    let timeout_arm = match_body
        .find("Err(_elapsed) =>")
        .expect("timeout arm must exist");
    let timeout_tail = &match_body[timeout_arm..];
    let timeout_end = timeout_tail.rfind("};").unwrap_or(timeout_tail.len());
    let timeout_block = &timeout_tail[..timeout_end];
    assert!(
        timeout_block.contains("continue;"),
        "`Err(_elapsed)` (timeout) arm must `continue;` so the loop \
         tries the next peer. A regression that returns here would \
         mean any slow peer permanently blocks the relay."
    );
    assert!(
        timeout_block.contains("mark_visited(peer_addr)"),
        "`Err(_elapsed)` (timeout) arm must mark the timing-out peer \
         as visited; otherwise the retry loop might re-select it and \
         time out again."
    );
}
/// The dedup gate must key on the transaction id, never the contract key —
/// two upstreams GETting the same contract need independent replies.
#[test]
fn dispatch_gate_is_per_transaction_not_per_contract() {
    const NODE_RS: &str = include_str!("../../node.rs");
    let start = NODE_RS
        .find("// #1454 phase 5 / #3883: relay GET task-per-tx dispatch.")
        .expect("relay dispatch comment anchor must exist in node.rs");
    let len = NODE_RS[start..]
        .find("let op_result = handle_op_request::<get::GetOp, _>")
        .expect("legacy fallthrough anchor must follow relay dispatch");
    let block = &NODE_RS[start..start + len];
    assert!(
        block.contains("has_get_op(id)"),
        "Dispatch gate must be keyed on `has_get_op(id)` (per-transaction), \
         not `instance_id` / contract key (per-key). Per-key dedup would \
         break the upstream-reply invariant: two upstreams GETting the \
         same contract need independent replies, each on its own tx."
    );
    let per_key_variants = ["has_get_op(instance_id)", "has_get_op(contract_id)"];
    assert!(
        per_key_variants.iter().all(|v| !block.contains(v)),
        "Dispatch block must not gate on contract key (per-key dedup). \
         Found a suspicious has_get_op variant: {block}"
    );
}
/// `cache_contract_locally` must expose an `is_client_requester` flag and
/// use it to gate the sticky `mark_local_client_access` call, so relay
/// callsites can opt out.
#[test]
fn cache_contract_locally_gates_mark_local_client_access_on_requester() {
    let src = production_source();
    let sig_start = src
        .find("async fn cache_contract_locally(")
        .expect("cache_contract_locally must exist");
    // 400 bytes is enough to cover the parameter list of the signature.
    let sig_window = &src[sig_start..(sig_start + 400).min(src.len())];
    assert!(
        sig_window.contains("is_client_requester: bool"),
        "`cache_contract_locally` must accept `is_client_requester: bool` \
         so relay callsites can opt out of the sticky local_client_access \
         flag. Sig window: {sig_window}"
    );
    let body = extract_fn_body(src, "async fn cache_contract_locally(");
    // Ordering check: the guard must structurally precede the call…
    let gate_pos = body
        .find("if is_client_requester {")
        .expect("mark_local_client_access must be gated on is_client_requester");
    let mark_pos = body
        .find("mark_local_client_access")
        .expect("mark_local_client_access must still be called under the gate");
    assert!(
        gate_pos < mark_pos,
        "`if is_client_requester {{` must precede the \
         `mark_local_client_access` call; otherwise the gate is \
         structurally useless."
    );
    // …and the call must sit DIRECTLY inside the guard block. The snippet
    // is whitespace-exact on purpose (it pins the current formatting), so a
    // rustfmt reshuffle of this region will surface here.
    let guarded_snippet =
        "if is_client_requester {\n op_manager.ring.mark_local_client_access";
    assert!(
        body.contains(guarded_snippet),
        "The `mark_local_client_access` call must be directly inside the \
         `if is_client_requester {{ ... }}` block. Current body structure \
         may have split the guard from the call.\n{body}"
    );
}
/// Every relay-side call to `cache_contract_locally` must pass `false` for
/// `is_client_requester` — the relay is not the client.
///
/// Fix: the previous suffix check `normalized.ends_with("false,).await")`
/// was dead code — the extracted call text always ends at the matching `)`
/// (the paren-depth walk below stops there), so `.await` can never be part
/// of it. Meanwhile a trailing comma before the close paren (`false,)`)
/// matched neither pattern and would fail spuriously. The check now accepts
/// exactly the two forms that can occur: `false)` and `false,)`.
#[test]
fn all_relay_callsites_pass_is_client_requester_false() {
    let src = production_source();
    let body = extract_fn_body(src, "async fn drive_relay_get_inner(");
    let callsites: Vec<usize> = body
        .match_indices("cache_contract_locally(")
        .map(|(i, _)| i)
        .collect();
    assert_eq!(
        callsites.len(),
        3,
        "drive_relay_get_inner should have exactly three \
         cache_contract_locally callsites (R4 immediate, R10 fallback, \
         R12a bubble-up). Found {} — a refactor has changed the shape.",
        callsites.len(),
    );
    for (idx, &pos) in callsites.iter().enumerate() {
        // Walk to the close paren matching this call's argument list.
        let tail = &body[pos..];
        let mut depth: i32 = 0;
        let mut end: Option<usize> = None;
        for (i, c) in tail.char_indices() {
            match c {
                '(' => depth += 1,
                ')' => {
                    depth -= 1;
                    if depth == 0 {
                        end = Some(i);
                        break;
                    }
                }
                _ => {}
            }
        }
        let end = end.unwrap_or_else(|| {
            panic!("unterminated cache_contract_locally call at relay callsite #{idx}")
        });
        let call = &tail[..=end];
        // Strip all whitespace so formatting (line breaks, indentation)
        // cannot affect the suffix check.
        let normalized: String = call.chars().filter(|c| !c.is_whitespace()).collect();
        assert!(
            normalized.ends_with("false)") || normalized.ends_with("false,)"),
            "Relay callsite #{idx} of cache_contract_locally must pass \
             `false` for is_client_requester (relay is not the client). \
             Call text: {call}"
        );
    }
}
/// The client-originating paths (inline Found in the client driver, and
/// stream assembly) must pass `true` for `is_client_requester`.
#[test]
fn client_driver_and_streaming_callsites_pass_is_client_requester_true() {
    let src = production_source();
    // Client driver callsite: inspect up to 400 bytes after the call,
    // truncated at the first `;`, with whitespace stripped so formatting
    // can't hide or split the `true` argument.
    let drive_body = extract_fn_body(src, "async fn drive_client_get_inner(");
    let drive_call_pos = drive_body
        .find("cache_contract_locally(")
        .expect("client driver must cache_contract_locally on InlineFound");
    let drive_window =
        &drive_body[drive_call_pos..(drive_call_pos + 400).min(drive_body.len())];
    let drive_normalized: String = drive_window
        .chars()
        .take_while(|&c| c != ';')
        .filter(|c| !c.is_whitespace())
        .collect();
    assert!(
        drive_normalized.contains("true"),
        "Client driver's cache_contract_locally call must pass `true` for \
         is_client_requester (this node initiated the GET). Call window: \
         {drive_window}"
    );
    // Stream-assembly callsite: same windowed normalization.
    let stream_body = extract_fn_body(src, "async fn assemble_and_cache_stream(");
    let stream_call_pos = stream_body
        .find("cache_contract_locally(")
        .expect("assemble_and_cache_stream must cache_contract_locally");
    let stream_window =
        &stream_body[stream_call_pos..(stream_call_pos + 400).min(stream_body.len())];
    let stream_normalized: String = stream_window
        .chars()
        .take_while(|&c| c != ';')
        .filter(|c| !c.is_whitespace())
        .collect();
    assert!(
        stream_normalized.contains("true"),
        "assemble_and_cache_stream must pass `true` for is_client_requester \
         (stream is only claimed by the client-originating driver). Call \
         window: {stream_window}"
    );
}
/// The helper's docstring must explain `is_client_requester` and name the
/// legacy call it gates.
#[test]
fn cache_contract_locally_docstring_cites_legacy_mirror() {
    let src = production_source();
    let fn_pos = src
        .find("async fn cache_contract_locally(")
        .expect("cache_contract_locally must exist");
    // The doc comment sits in the bytes immediately above the signature.
    let doc_window = &src[fn_pos.saturating_sub(1500)..fn_pos];
    assert!(
        doc_window.contains("is_client_requester"),
        "cache_contract_locally docstring must describe the \
         is_client_requester parameter so callers know which value to pass."
    );
    assert!(
        doc_window.contains("mark_local_client_access"),
        "cache_contract_locally docstring must name the legacy call \
         (`mark_local_client_access`) it gates on is_client_requester."
    );
}
/// The relay entry point must bump its test-only call counter at entry,
/// behind the testing cfg gate.
#[test]
fn relay_driver_call_count_incremented_on_entry() {
    let src = production_source();
    assert!(
        src.contains("pub static RELAY_DRIVER_CALL_COUNT"),
        "RELAY_DRIVER_CALL_COUNT static must be declared for behavioral \
         tests to verify the dispatch gate routes through the relay driver."
    );
    let body = extract_fn_body(src, "pub(crate) async fn start_relay_get(");
    assert!(
        body.contains("RELAY_DRIVER_CALL_COUNT.fetch_add(1"),
        "`start_relay_get` must increment `RELAY_DRIVER_CALL_COUNT` at \
         entry so integration tests can assert dispatch-gate coverage."
    );
    // The cfg attribute must sit within the 200 bytes before the increment.
    let counter_pos = body.find("RELAY_DRIVER_CALL_COUNT.fetch_add").unwrap();
    let preceding = &body[counter_pos.saturating_sub(200)..counter_pos];
    assert!(
        preceding.contains("#[cfg(any(test, feature = \"testing\"))]"),
        "`RELAY_DRIVER_CALL_COUNT` increment must be gated behind \
         `#[cfg(any(test, feature = \"testing\"))]` so it doesn't ship \
         in release builds."
    );
}
}