use std::collections::HashSet;
use std::sync::Arc;
use freenet_stdlib::client_api::{ContractResponse, ErrorKind, HostResponse};
use freenet_stdlib::prelude::ContractInstanceId;
use crate::client_events::HostResult;
use crate::config::{GlobalExecutor, OPERATION_TTL};
use crate::message::{NetMessage, NetMessageV1, Transaction};
use crate::node::OpManager;
use crate::operations::{OpError, VisitedPeers};
use crate::ring::{PeerKeyLocation, RingError};
use super::{
InitialRequest, MAX_BREADTH, MAX_RETRIES, SubscribeMsg, SubscribeMsgResult,
complete_local_subscription, prepare_initial_request, register_downstream_subscriber,
};
/// Spawns a detached task driving a client-initiated subscribe for
/// `instance_id` and immediately hands the transaction id back to the
/// caller; the spawned task publishes its result via `deliver_outcome`.
pub(crate) async fn start_client_subscribe(
    op_manager: Arc<OpManager>,
    instance_id: ContractInstanceId,
    client_tx: Transaction,
) -> Result<Transaction, OpError> {
    tracing::debug!(
        tx = %client_tx,
        contract = %instance_id,
        "subscribe (task-per-tx): spawning client-initiated task"
    );
    let task = run_client_subscribe(op_manager, instance_id, client_tx);
    GlobalExecutor::spawn(task);
    Ok(client_tx)
}
/// Task body for a client-initiated subscribe: runs the shared retry driver
/// to completion and forwards its outcome to the requesting client.
pub(crate) async fn run_client_subscribe(
    op_manager: Arc<OpManager>,
    instance_id: ContractInstanceId,
    client_tx: Transaction,
) {
    let outcome =
        drive_client_subscribe(op_manager.clone(), instance_id, client_tx, false).await;
    deliver_outcome(&op_manager, client_tx, instance_id, outcome);
}
/// Result of a renewal-driven subscribe attempt, as seen by the caller of
/// `run_renewal_subscribe`.
#[derive(Debug)]
pub(crate) enum RenewalOutcome {
    /// The subscription was re-established (or completed locally).
    Success,
    /// The attempt failed terminally; `reason` is a human-readable cause.
    Failed { reason: String },
    /// Notification-channel back-pressure (`OpError::NotificationError` /
    /// `NotificationChannelError`), kept distinct from `Failed` so the
    /// caller can treat congestion differently from a hard failure.
    ChannelCongestion,
}
/// Drives a renewal subscribe (`is_renewal = true`) for `instance_id` and
/// folds the raw driver result into a [`RenewalOutcome`].
pub(crate) async fn run_renewal_subscribe(
    op_manager: Arc<OpManager>,
    instance_id: ContractInstanceId,
    renewal_tx: Transaction,
) -> RenewalOutcome {
    classify_renewal_result(
        drive_client_subscribe_inner(&op_manager, instance_id, renewal_tx, true).await,
    )
}
/// Maps the driver's result onto the renewal-facing outcome: publishes and
/// locally-completed subscriptions are `Success`, host-level errors are
/// `Failed`, and notification-channel errors become `ChannelCongestion`.
fn classify_renewal_result(result: Result<DriverOutcome, OpError>) -> RenewalOutcome {
    match result {
        Ok(DriverOutcome::Publish(Ok(_))) | Ok(DriverOutcome::SkipAlreadyDelivered) => {
            RenewalOutcome::Success
        }
        Ok(DriverOutcome::Publish(Err(host_err))) => RenewalOutcome::Failed {
            reason: host_err.to_string(),
        },
        Ok(DriverOutcome::InfrastructureError(err)) | Err(err) => {
            // Infrastructure errors: only channel congestion is special-cased.
            #[allow(clippy::wildcard_enum_match_arm)]
            match err {
                OpError::NotificationError | OpError::NotificationChannelError(_) => {
                    RenewalOutcome::ChannelCongestion
                }
                other => RenewalOutcome::Failed {
                    reason: other.to_string(),
                },
            }
        }
    }
}
/// Executor-initiated subscribe for `instance_id`.
///
/// First probes `prepare_initial_request`: a local hit is completed inline
/// (local-client interest registered, tx marked completed); `NoHostingPeers`
/// and `PeerNotJoined` are surfaced as infrastructure errors. Only a
/// `NetworkRequest` outcome falls through to the shared retry driver.
///
/// NOTE(review): `drive_client_subscribe_inner` re-runs
/// `prepare_initial_request` internally, so the probe here is effectively a
/// fast-path / validity check — confirm the duplication is intended.
pub(crate) async fn run_executor_subscribe(
    op_manager: Arc<OpManager>,
    instance_id: ContractInstanceId,
    executor_tx: Transaction,
) -> Result<(), ExecutorSubscribeError> {
    match prepare_initial_request(&op_manager, executor_tx, instance_id, false).await {
        Ok(InitialRequest::LocallyComplete { key }) => {
            tracing::debug!(
                %key,
                tx = %executor_tx,
                "executor subscribe (task-per-tx): local hit, completing inline"
            );
            // Broadcast interest only on the first local client for this key.
            let became_interested = op_manager.interest_manager.add_local_client(&key);
            if became_interested {
                crate::operations::broadcast_change_interests(&op_manager, vec![key], vec![]).await;
            }
            op_manager.completed(executor_tx);
            return Ok(());
        }
        Ok(InitialRequest::NoHostingPeers) => {
            return Err(ExecutorSubscribeError::Infra(
                RingError::NoHostingPeers(instance_id).into(),
            ));
        }
        Ok(InitialRequest::PeerNotJoined) => {
            return Err(ExecutorSubscribeError::Infra(
                RingError::PeerNotJoined.into(),
            ));
        }
        Ok(InitialRequest::NetworkRequest { .. }) => {
            // Remote work required: fall through to the shared driver below.
        }
        Err(err) => return Err(ExecutorSubscribeError::Infra(err)),
    }
    let result = drive_client_subscribe_inner(&op_manager, instance_id, executor_tx, false).await;
    classify_executor_subscribe_result(result)
}
/// Failure modes surfaced by the executor-initiated subscribe path.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ExecutorSubscribeError {
    /// The network search exhausted all candidates without success.
    #[error("executor subscribe: network exhausted: {0}")]
    NetworkExhausted(String),
    /// An infrastructure-level operation error (ring state, channels, …).
    #[error("executor subscribe: {0}")]
    Infra(#[source] OpError),
}
/// Converts the raw driver result into the executor-facing error type:
/// successes and locally-delivered completions map to `Ok(())`, host-level
/// errors to `NetworkExhausted`, everything else to `Infra`.
fn classify_executor_subscribe_result(
    result: Result<DriverOutcome, OpError>,
) -> Result<(), ExecutorSubscribeError> {
    let outcome = result.map_err(ExecutorSubscribeError::Infra)?;
    match outcome {
        DriverOutcome::Publish(Ok(_)) | DriverOutcome::SkipAlreadyDelivered => Ok(()),
        DriverOutcome::Publish(Err(host_err)) => {
            Err(ExecutorSubscribeError::NetworkExhausted(host_err.to_string()))
        }
        DriverOutcome::InfrastructureError(err) => Err(ExecutorSubscribeError::Infra(err)),
    }
}
/// Terminal result of the shared subscribe driver.
#[derive(Debug)]
enum DriverOutcome {
    /// A client-facing result (success or host error) that still needs to
    /// be published to the requesting client.
    Publish(HostResult),
    /// The subscription completed locally and the result was already
    /// delivered; callers must not publish a second result.
    SkipAlreadyDelivered,
    /// The driver itself failed, as opposed to the network search failing.
    InfrastructureError(OpError),
}
/// Runs the inner driver and folds any infrastructure `Err` into
/// `DriverOutcome::InfrastructureError`, so callers handle a single type.
async fn drive_client_subscribe(
    op_manager: Arc<OpManager>,
    instance_id: ContractInstanceId,
    client_tx: Transaction,
    is_renewal: bool,
) -> DriverOutcome {
    drive_client_subscribe_inner(&op_manager, instance_id, client_tx, is_renewal)
        .await
        .unwrap_or_else(DriverOutcome::InfrastructureError)
}
/// Core retry driver shared by the client, renewal, and executor subscribe
/// paths.
///
/// Flow: resolve the initial target via `prepare_initial_request`, then loop
/// sending one attempt at a time — each under a fresh attempt transaction and
/// bounded by `OPERATION_TTL`. Wire errors, timeouts, and `NotFound` replies
/// advance to the next candidate through `advance_to_next_peer`; a
/// `Subscribed` reply registers interest bookkeeping and yields the
/// client-facing success response. Exhausting all candidates produces a
/// client-facing error (`DriverOutcome::Publish(Err(..))`) rather than an
/// infrastructure error.
async fn drive_client_subscribe_inner(
    op_manager: &Arc<OpManager>,
    instance_id: ContractInstanceId,
    client_tx: Transaction,
    is_renewal: bool,
) -> Result<DriverOutcome, OpError> {
    let initial = prepare_initial_request(op_manager, client_tx, instance_id, is_renewal).await?;
    let (target_peer, target_addr, mut visited, mut alternatives, htl) = match initial {
        InitialRequest::LocallyComplete { key } => {
            // Local hit: completion publishes the client result itself.
            complete_local_subscription(op_manager, client_tx, key, is_renewal).await?;
            return Ok(DriverOutcome::SkipAlreadyDelivered);
        }
        InitialRequest::NoHostingPeers => {
            // No remote candidates at all: report as a client-facing error.
            return Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
                cause: format!("no remote peers available for subscription to {instance_id}")
                    .into(),
            }
            .into())));
        }
        InitialRequest::PeerNotJoined => {
            return Err(RingError::PeerNotJoined.into());
        }
        InitialRequest::NetworkRequest {
            target,
            target_addr,
            visited,
            alternatives,
            htl,
        } => (target, target_addr, visited, alternatives, htl),
    };
    tracing::debug!(
        tx = %client_tx,
        contract = %instance_id,
        target = %target_addr,
        "subscribe (task-per-tx): initial target selected, entering retry loop"
    );
    // Exact set of peers this driver already attempted (complements the
    // `visited` filter, which is shared with downstream hops).
    let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
    tried_peers.insert(target_addr);
    let mut retries: usize = 0;
    let mut attempts_at_hop: usize = 1;
    let mut current_target: PeerKeyLocation = target_peer;
    let mut current_target_addr: std::net::SocketAddr = target_addr;
    let mut is_first_attempt = true;
    loop {
        // Each network attempt runs under its own transaction id.
        let attempt_tx = Transaction::new::<SubscribeMsg>();
        tracing::debug!(
            tx = %client_tx,
            attempt_tx = %attempt_tx,
            target = %current_target_addr,
            retries,
            attempts_at_hop,
            "subscribe (task-per-tx): sending attempt"
        );
        // Request events are only logged for retries here — presumably the
        // first attempt's event is recorded during preparation; TODO confirm.
        if !is_first_attempt {
            if let Some(event) = crate::tracing::NetEventLog::subscribe_request(
                &attempt_tx,
                &op_manager.ring,
                instance_id,
                current_target.clone(),
                htl,
            ) {
                op_manager
                    .ring
                    .register_events(either::Either::Left(event))
                    .await;
            }
        }
        is_first_attempt = false;
        let request = SubscribeMsg::Request {
            id: attempt_tx,
            instance_id,
            htl,
            visited: visited.clone(),
            is_renewal,
        };
        // Round trip bounded by OPERATION_TTL; the pending-op slot is
        // released regardless of the result.
        let mut ctx = op_manager.op_ctx(attempt_tx);
        let round_trip = tokio::time::timeout(
            OPERATION_TTL,
            ctx.send_to_and_await(current_target_addr, NetMessage::from(request)),
        )
        .await;
        op_manager.release_pending_op_slot(attempt_tx).await;
        let reply = match round_trip {
            Ok(Ok(reply)) => reply,
            Ok(Err(err)) => {
                // Wire-level failure: try the next candidate peer.
                tracing::warn!(
                    tx = %client_tx,
                    attempt_tx = %attempt_tx,
                    target = %current_target_addr,
                    retries,
                    attempts_at_hop,
                    outcome = "wire_error",
                    error = %err,
                    "subscribe (task-per-tx): send_and_await failed; advancing to next peer"
                );
                match advance_to_next_peer(
                    op_manager,
                    &instance_id,
                    &mut visited,
                    &mut tried_peers,
                    &mut alternatives,
                    &mut retries,
                    &mut attempts_at_hop,
                )
                .await
                {
                    Some((next_target, next_addr)) => {
                        current_target = next_target;
                        current_target_addr = next_addr;
                        continue;
                    }
                    None => {
                        return Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
                            cause: format!(
                                "subscribe to {instance_id} failed after {} rounds (last peer error: {err})",
                                retries + 1
                            )
                            .into(),
                        }
                        .into())));
                    }
                }
            }
            Err(_) => {
                // Attempt timed out: try the next candidate peer.
                tracing::warn!(
                    tx = %client_tx,
                    attempt_tx = %attempt_tx,
                    target = %current_target_addr,
                    retries,
                    attempts_at_hop,
                    outcome = "timeout",
                    timeout_secs = OPERATION_TTL.as_secs(),
                    "subscribe (task-per-tx): attempt timed out; advancing to next peer"
                );
                match advance_to_next_peer(
                    op_manager,
                    &instance_id,
                    &mut visited,
                    &mut tried_peers,
                    &mut alternatives,
                    &mut retries,
                    &mut attempts_at_hop,
                )
                .await
                {
                    Some((next_target, next_addr)) => {
                        current_target = next_target;
                        current_target_addr = next_addr;
                        continue;
                    }
                    None => {
                        return Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
                            cause: format!(
                                "subscribe to {instance_id} timed out after {} rounds",
                                retries + 1
                            )
                            .into(),
                        }
                        .into())));
                    }
                }
            }
        };
        match classify_reply(&reply) {
            ReplyClass::Subscribed { key } => {
                tracing::info!(
                    tx = %client_tx,
                    attempt_tx = %attempt_tx,
                    contract = %key,
                    target = %current_target_addr,
                    retries,
                    attempts_at_hop,
                    outcome = "subscribed",
                    "subscribe (task-per-tx): subscribed"
                );
                // Record the answering peer as an interest source, when it is
                // still a known connection.
                if let Some(pkl) = op_manager
                    .ring
                    .connection_manager
                    .get_peer_by_addr(current_target_addr)
                {
                    let peer_key = crate::ring::interest::PeerKey::from(pkl.pub_key.clone());
                    op_manager
                        .interest_manager
                        .register_peer_interest(&key, peer_key, None, true);
                }
                // Record the subscription in the ring state.
                op_manager.ring.subscribe(key);
                op_manager.ring.complete_subscription_request(&key, true);
                // Broadcast interest only on the first local client for key.
                let became_interested = op_manager.interest_manager.add_local_client(&key);
                if became_interested {
                    crate::operations::broadcast_change_interests(op_manager, vec![key], vec![])
                        .await;
                }
                return Ok(DriverOutcome::Publish(Ok(HostResponse::ContractResponse(
                    ContractResponse::SubscribeResponse {
                        key,
                        subscribed: true,
                    },
                ))));
            }
            ReplyClass::NotFound => {
                // Peer answered but could not locate the contract: advance.
                tracing::debug!(
                    tx = %client_tx,
                    attempt_tx = %attempt_tx,
                    target = %current_target_addr,
                    retries,
                    attempts_at_hop,
                    outcome = "not_found",
                    "subscribe (task-per-tx): NotFound from peer; advancing to next peer"
                );
                match advance_to_next_peer(
                    op_manager,
                    &instance_id,
                    &mut visited,
                    &mut tried_peers,
                    &mut alternatives,
                    &mut retries,
                    &mut attempts_at_hop,
                )
                .await
                {
                    Some((next_target, next_addr)) => {
                        current_target = next_target;
                        current_target_addr = next_addr;
                        continue;
                    }
                    None => {
                        return Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
                            cause: format!(
                                "contract {instance_id} not found after exhaustive search"
                            )
                            .into(),
                        }
                        .into())));
                    }
                }
            }
            ReplyClass::Unexpected => {
                // Protocol violation: abort instead of retrying.
                tracing::warn!(
                    tx = %client_tx,
                    attempt_tx = %attempt_tx,
                    "subscribe (task-per-tx): unexpected terminal reply"
                );
                return Err(OpError::UnexpectedOpState);
            }
        }
    }
}
/// Classification of a terminal reply received for a subscribe attempt.
#[derive(Debug)]
enum ReplyClass {
    /// Downstream confirmed the subscription for `key`.
    Subscribed {
        key: freenet_stdlib::prelude::ContractKey,
    },
    /// Downstream could not locate the contract.
    NotFound,
    /// Any other message variant; treated as a protocol error by the driver.
    Unexpected,
}
/// Buckets a terminal reply message into the subscribe-level reply classes;
/// anything that is not a `SubscribeMsg::Response` is `Unexpected`.
fn classify_reply(msg: &NetMessage) -> ReplyClass {
    let NetMessage::V1(NetMessageV1::Subscribe(SubscribeMsg::Response { result, .. })) = msg
    else {
        return ReplyClass::Unexpected;
    };
    match result {
        SubscribeMsgResult::Subscribed { key } => ReplyClass::Subscribed { key: *key },
        SubscribeMsgResult::NotFound => ReplyClass::NotFound,
    }
}
/// Production wrapper around [`advance_to_next_peer_impl`] that sources
/// fresh candidates from the ring's k-closest potential hosts.
async fn advance_to_next_peer(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
    visited: &mut VisitedPeers,
    tried_peers: &mut HashSet<std::net::SocketAddr>,
    alternatives: &mut Vec<PeerKeyLocation>,
    retries: &mut usize,
    attempts_at_hop: &mut usize,
) -> Option<(PeerKeyLocation, std::net::SocketAddr)> {
    advance_to_next_peer_impl(
        instance_id,
        visited,
        tried_peers,
        alternatives,
        retries,
        attempts_at_hop,
        |id, seen| {
            op_manager
                .ring
                .k_closest_potentially_hosting(id, seen, MAX_BREADTH)
        },
    )
}
/// Selects the next peer to attempt, in two phases.
///
/// Phase 1 (breadth retry): while the per-hop attempt budget (`MAX_BREADTH`)
/// lasts, pop queued `alternatives` FIFO, skipping peers already tried or
/// probably visited. Phase 2 (fresh round): if the retry budget allows,
/// increment `retries`, reset `attempts_at_hop`, and ask `fresh_candidates`
/// for a new candidate list; the leftover becomes the new `alternatives`.
/// Returns `None` once both budgets/sources are exhausted.
fn advance_to_next_peer_impl<F>(
    instance_id: &ContractInstanceId,
    visited: &mut VisitedPeers,
    tried_peers: &mut HashSet<std::net::SocketAddr>,
    alternatives: &mut Vec<PeerKeyLocation>,
    retries: &mut usize,
    attempts_at_hop: &mut usize,
    mut fresh_candidates: F,
) -> Option<(PeerKeyLocation, std::net::SocketAddr)>
where
    F: FnMut(&ContractInstanceId, &VisitedPeers) -> Vec<PeerKeyLocation>,
{
    // Phase 1: consume queued alternatives while the breadth budget lasts.
    if *attempts_at_hop < MAX_BREADTH {
        while !alternatives.is_empty() {
            let next = alternatives.remove(0);
            let Some(addr) = next.socket_addr() else {
                continue;
            };
            if tried_peers.contains(&addr) || visited.probably_visited(addr) {
                continue;
            }
            tried_peers.insert(addr);
            visited.mark_visited(addr);
            *attempts_at_hop += 1;
            tracing::debug!(
                %instance_id,
                target = %addr,
                attempts_at_hop = *attempts_at_hop,
                "subscribe (task-per-tx): breadth retry with next alternative"
            );
            return Some((next, addr));
        }
    }
    // Phase 2: fresh round, if the retry budget allows.
    if *retries >= MAX_RETRIES {
        return None;
    }
    *retries += 1;
    *attempts_at_hop = 1;
    let mut pool = fresh_candidates(instance_id, visited);
    while !pool.is_empty() {
        let next = pool.remove(0);
        let Some(addr) = next.socket_addr() else {
            continue;
        };
        if tried_peers.contains(&addr) || visited.probably_visited(addr) {
            continue;
        }
        tried_peers.insert(addr);
        visited.mark_visited(addr);
        // Remaining fresh candidates become the next breadth-retry queue.
        *alternatives = pool;
        tracing::debug!(
            %instance_id,
            target = %addr,
            retries = *retries,
            "subscribe (task-per-tx): fresh k_closest round found new target"
        );
        return Some((next, addr));
    }
    tracing::debug!(
        %instance_id,
        retries = *retries,
        "subscribe (task-per-tx): fresh k_closest round returned no usable candidates"
    );
    None
}
/// True when the outcome counts as a success for operation statistics
/// (successful publish or an already-delivered local completion).
fn classify_subscribe_outcome_for_op_stats(outcome: &DriverOutcome) -> bool {
    match outcome {
        DriverOutcome::Publish(Ok(_)) | DriverOutcome::SkipAlreadyDelivered => true,
        DriverOutcome::Publish(Err(_)) | DriverOutcome::InfrastructureError(_) => false,
    }
}
/// Records op statistics (top-level transactions only) and publishes the
/// driver's outcome back to the requesting client. Infrastructure errors are
/// converted into a synthesized client-facing error result.
fn deliver_outcome(
    op_manager: &OpManager,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    outcome: DriverOutcome,
) {
    if !client_tx.is_sub_operation() {
        crate::node::network_status::record_op_result(
            crate::node::network_status::OpType::Subscribe,
            classify_subscribe_outcome_for_op_stats(&outcome),
        );
    }
    match outcome {
        DriverOutcome::SkipAlreadyDelivered => {
            // Local completion already pushed the result; nothing to send.
            tracing::debug!(
                tx = %client_tx,
                "subscribe (task-per-tx): local completion already published; skipping result_router_tx"
            );
        }
        DriverOutcome::Publish(result) => {
            op_manager.send_client_result(client_tx, result);
        }
        DriverOutcome::InfrastructureError(err) => {
            tracing::warn!(
                tx = %client_tx,
                contract = %instance_id,
                error = %err,
                "subscribe (task-per-tx): infrastructure error; publishing synthesized client error"
            );
            let err_result: HostResult = Err(ErrorKind::OperationError {
                cause: format!("subscribe failed: {err}").into(),
            }
            .into());
            op_manager.send_client_result(client_tx, err_result);
        }
    }
}
/// Relay-subscribe drivers currently in flight (incremented on spawn,
/// decremented by `RelaySubscribeInflightGuard::drop`).
pub static RELAY_SUBSCRIBE_INFLIGHT: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Total relay-subscribe driver tasks ever spawned.
pub static RELAY_SUBSCRIBE_SPAWNED_TOTAL: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Total relay-subscribe driver tasks that have finished (guard dropped).
pub static RELAY_SUBSCRIBE_COMPLETED_TOTAL: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Duplicate relay Requests rejected by the in-flight dedup gate.
pub static RELAY_SUBSCRIBE_DEDUP_REJECTS: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Test-only: counts calls into `start_relay_subscribe`.
#[cfg(any(test, feature = "testing"))]
pub static RELAY_SUBSCRIBE_DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Entry point for a relayed subscribe Request received from `upstream_addr`.
///
/// Applies a dedup gate on `active_relay_subscribe_txs`: a duplicate Request
/// for an in-flight tx is answered immediately with NotFound (fire-and-forget
/// so the rejected upstream's waiter unblocks fast) and counted in
/// `RELAY_SUBSCRIBE_DEDUP_REJECTS`. Otherwise the in-flight counters are
/// bumped and the driver task is spawned carrying an RAII guard that undoes
/// the dedup entry on completion.
pub(crate) async fn start_relay_subscribe(
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
    instance_id: ContractInstanceId,
    htl: usize,
    visited: VisitedPeers,
    is_renewal: bool,
    upstream_addr: std::net::SocketAddr,
) -> Result<(), OpError> {
    #[cfg(any(test, feature = "testing"))]
    RELAY_SUBSCRIBE_DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    // Dedup gate: the insert must happen before the driver task is spawned.
    if !op_manager.active_relay_subscribe_txs.insert(incoming_tx) {
        RELAY_SUBSCRIBE_DEDUP_REJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        tracing::debug!(
            tx = %incoming_tx,
            %instance_id,
            %upstream_addr,
            phase = "relay_subscribe_dedup_reject",
            "SUBSCRIBE relay (task-per-tx): duplicate Request for in-flight tx, replying NotFound"
        );
        let response = NetMessage::from(SubscribeMsg::Response {
            id: incoming_tx,
            instance_id,
            result: SubscribeMsgResult::NotFound,
        });
        let mut ctx = op_manager.op_ctx(incoming_tx);
        // Best-effort reply; a send failure here is only logged.
        if let Err(err) = ctx.send_fire_and_forget(upstream_addr, response).await {
            tracing::debug!(
                tx = %incoming_tx,
                %upstream_addr,
                error = %err,
                "SUBSCRIBE relay (task-per-tx): dedup-reject NotFound send failed"
            );
        }
        return Ok(());
    }
    RELAY_SUBSCRIBE_INFLIGHT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    RELAY_SUBSCRIBE_SPAWNED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    // The guard travels into the spawned task; its Drop removes the dedup
    // entry and keeps the in-flight/completed counters consistent.
    let guard = RelaySubscribeInflightGuard {
        op_manager: op_manager.clone(),
        incoming_tx,
    };
    tracing::debug!(
        tx = %incoming_tx,
        %instance_id,
        htl,
        %upstream_addr,
        is_renewal,
        phase = "relay_subscribe_start",
        "SUBSCRIBE relay (task-per-tx): spawning driver"
    );
    GlobalExecutor::spawn(run_relay_subscribe(
        guard,
        op_manager,
        incoming_tx,
        instance_id,
        htl,
        visited,
        is_renewal,
        upstream_addr,
    ));
    Ok(())
}
/// RAII companion of a spawned relay-subscribe task: on drop it removes the
/// tx from `active_relay_subscribe_txs` and adjusts the in-flight/completed
/// counters, so the dedup gate never leaks entries even if the task panics
/// or is cancelled.
struct RelaySubscribeInflightGuard {
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
}
impl Drop for RelaySubscribeInflightGuard {
    // Clears the dedup entry and updates the counters unconditionally.
    fn drop(&mut self) {
        self.op_manager
            .active_relay_subscribe_txs
            .remove(&self.incoming_tx);
        RELAY_SUBSCRIBE_INFLIGHT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
        RELAY_SUBSCRIBE_COMPLETED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
}
/// Task body for a relayed subscribe: runs the relay driver, logs any driver
/// error, and releases the pending-op slot at the end. `guard` is held for
/// the whole body so the dedup entry and in-flight counter stay accurate
/// until the task truly finishes.
#[allow(clippy::too_many_arguments)]
async fn run_relay_subscribe(
    guard: RelaySubscribeInflightGuard,
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
    instance_id: ContractInstanceId,
    htl: usize,
    visited: VisitedPeers,
    is_renewal: bool,
    upstream_addr: std::net::SocketAddr,
) {
    let _guard = guard;
    let driver = drive_relay_subscribe(
        &op_manager,
        incoming_tx,
        instance_id,
        htl,
        visited,
        is_renewal,
        upstream_addr,
    );
    if let Err(err) = driver.await {
        tracing::warn!(
            tx = %incoming_tx,
            %instance_id,
            error = %err,
            phase = "relay_subscribe_error",
            "SUBSCRIBE relay (task-per-tx): driver returned error"
        );
    }
    // Let other ready tasks run before the slot is released.
    tokio::task::yield_now().await;
    op_manager.release_pending_op_slot(incoming_tx).await;
}
/// Relay driver: serves a subscribe Request either locally or by forwarding
/// one hop toward a closer peer, then answers `upstream_addr` exactly once.
///
/// Order of attempts:
/// 1. Wait (bounded) for the contract locally; on a hit, register the
///    upstream as a downstream subscriber and reply `Subscribed`.
/// 2. If the HTL budget is exhausted or no unvisited candidate exists,
///    reply `NotFound`.
/// 3. Otherwise forward the Request with a decremented HTL and bubble the
///    downstream reply (or `NotFound` on wire error / timeout / unexpected
///    reply) back upstream.
async fn drive_relay_subscribe(
    op_manager: &Arc<OpManager>,
    incoming_tx: Transaction,
    instance_id: ContractInstanceId,
    htl: usize,
    visited: VisitedPeers,
    is_renewal: bool,
    upstream_addr: std::net::SocketAddr,
) -> Result<(), OpError> {
    tracing::info!(
        tx = %incoming_tx,
        %instance_id,
        htl,
        %upstream_addr,
        is_renewal,
        phase = "relay_subscribe_request",
        "SUBSCRIBE relay (task-per-tx): processing Request"
    );
    // Local fulfillment: bounded wait for the contract to be available here.
    if let Some(key) = super::wait_for_contract_with_timeout(
        op_manager,
        instance_id,
        super::CONTRACT_WAIT_TIMEOUT_MS,
    )
    .await?
    {
        register_downstream_subscriber(
            op_manager,
            &key,
            upstream_addr,
            None,
            Some(upstream_addr),
            &incoming_tx,
            " (task-per-tx relay local hit)",
        )
        .await;
        tracing::info!(
            tx = %incoming_tx,
            contract = %key,
            is_renewal,
            phase = "relay_subscribe_local_hit",
            "SUBSCRIBE relay (task-per-tx): fulfilled locally, sending Response"
        );
        return relay_subscribe_send_response(
            op_manager,
            incoming_tx,
            instance_id,
            SubscribeMsgResult::Subscribed { key },
            upstream_addr,
        )
        .await;
    }
    // HTL exhausted: terminate the search and report NotFound upstream.
    if htl == 0 {
        tracing::warn!(
            tx = %incoming_tx,
            contract = %instance_id,
            htl = 0,
            phase = "relay_subscribe_not_found",
            "SUBSCRIBE relay (task-per-tx): HTL exhausted"
        );
        if let Some(event) = crate::tracing::NetEventLog::subscribe_not_found(
            &incoming_tx,
            &op_manager.ring,
            instance_id,
            Some(op_manager.ring.max_hops_to_live),
        ) {
            op_manager
                .ring
                .register_events(either::Either::Left(event))
                .await;
        }
        return relay_subscribe_send_response(
            op_manager,
            incoming_tx,
            instance_id,
            SubscribeMsgResult::NotFound,
            upstream_addr,
        )
        .await;
    }
    // Forwarding: extend the visited filter with this tx, ourselves, and the
    // upstream peer before selecting candidates.
    let own_addr = op_manager.ring.connection_manager.peer_addr()?;
    let mut new_visited = visited.with_transaction(&incoming_tx);
    new_visited.mark_visited(own_addr);
    new_visited.mark_visited(upstream_addr);
    let mut candidates =
        op_manager
            .ring
            .k_closest_potentially_hosting(&instance_id, &new_visited, MAX_BREADTH);
    if candidates.is_empty() {
        tracing::warn!(
            tx = %incoming_tx,
            contract = %instance_id,
            phase = "relay_subscribe_not_found",
            "SUBSCRIBE relay (task-per-tx): no closer peers to forward"
        );
        if let Some(event) = crate::tracing::NetEventLog::subscribe_not_found(
            &incoming_tx,
            &op_manager.ring,
            instance_id,
            None,
        ) {
            op_manager
                .ring
                .register_events(either::Either::Left(event))
                .await;
        }
        return relay_subscribe_send_response(
            op_manager,
            incoming_tx,
            instance_id,
            SubscribeMsgResult::NotFound,
            upstream_addr,
        )
        .await;
    }
    // Forward only to the single best candidate (breadth happens upstream).
    let next_hop = candidates.remove(0);
    let next_addr = match next_hop.socket_addr() {
        Some(addr) => addr,
        None => {
            tracing::error!(
                tx = %incoming_tx,
                %instance_id,
                target_pub_key = %next_hop.pub_key(),
                "SUBSCRIBE relay (task-per-tx): next hop has no socket address"
            );
            return relay_subscribe_send_response(
                op_manager,
                incoming_tx,
                instance_id,
                SubscribeMsgResult::NotFound,
                upstream_addr,
            )
            .await;
        }
    };
    new_visited.mark_visited(next_addr);
    let new_htl = htl.saturating_sub(1);
    if let Some(event) = crate::tracing::NetEventLog::subscribe_request(
        &incoming_tx,
        &op_manager.ring,
        instance_id,
        next_hop.clone(),
        new_htl,
    ) {
        op_manager
            .ring
            .register_events(either::Either::Left(event))
            .await;
    }
    tracing::debug!(
        tx = %incoming_tx,
        %instance_id,
        peer_addr = %next_addr,
        htl = new_htl,
        phase = "relay_subscribe_forward",
        "SUBSCRIBE relay (task-per-tx): forwarding to next hop"
    );
    let forward = NetMessage::from(SubscribeMsg::Request {
        id: incoming_tx,
        instance_id,
        htl: new_htl,
        visited: new_visited,
        is_renewal,
    });
    // Round trip bounded by OPERATION_TTL; slot released regardless.
    let mut ctx = op_manager.op_ctx(incoming_tx);
    let round_trip =
        tokio::time::timeout(OPERATION_TTL, ctx.send_to_and_await(next_addr, forward)).await;
    op_manager.release_pending_op_slot(incoming_tx).await;
    let reply = match round_trip {
        Ok(Ok(reply)) => reply,
        Ok(Err(err)) => {
            // Wire-level failure downstream: degrade to NotFound upstream.
            tracing::warn!(
                tx = %incoming_tx,
                %instance_id,
                target = %next_addr,
                error = %err,
                "SUBSCRIBE relay (task-per-tx): send_to_and_await failed"
            );
            return relay_subscribe_send_response(
                op_manager,
                incoming_tx,
                instance_id,
                SubscribeMsgResult::NotFound,
                upstream_addr,
            )
            .await;
        }
        Err(_elapsed) => {
            // Downstream timeout: degrade to NotFound upstream.
            tracing::warn!(
                tx = %incoming_tx,
                %instance_id,
                target = %next_addr,
                timeout_secs = OPERATION_TTL.as_secs(),
                "SUBSCRIBE relay (task-per-tx): downstream timed out"
            );
            return relay_subscribe_send_response(
                op_manager,
                incoming_tx,
                instance_id,
                SubscribeMsgResult::NotFound,
                upstream_addr,
            )
            .await;
        }
    };
    // Bubble the downstream result upstream, registering the upstream peer
    // as a downstream subscriber on success.
    let result = match reply {
        NetMessage::V1(NetMessageV1::Subscribe(SubscribeMsg::Response {
            result: SubscribeMsgResult::Subscribed { key },
            ..
        })) => {
            register_downstream_subscriber(
                op_manager,
                &key,
                upstream_addr,
                None,
                Some(upstream_addr),
                &incoming_tx,
                " (task-per-tx relay registration on Response)",
            )
            .await;
            tracing::info!(
                tx = %incoming_tx,
                contract = %key,
                phase = "relay_subscribe_bubble",
                "SUBSCRIBE relay (task-per-tx): downstream Subscribed; bubbling upstream"
            );
            SubscribeMsgResult::Subscribed { key }
        }
        NetMessage::V1(NetMessageV1::Subscribe(SubscribeMsg::Response {
            result: SubscribeMsgResult::NotFound,
            ..
        })) => {
            tracing::debug!(
                tx = %incoming_tx,
                %instance_id,
                phase = "relay_subscribe_bubble_not_found",
                "SUBSCRIBE relay (task-per-tx): downstream NotFound; bubbling upstream"
            );
            SubscribeMsgResult::NotFound
        }
        other => {
            // Protocol-unexpected reply: degrade to NotFound upstream.
            tracing::warn!(
                tx = %incoming_tx,
                %instance_id,
                reply_variant = ?std::mem::discriminant(&other),
                "SUBSCRIBE relay (task-per-tx): unexpected reply variant; treating as NotFound"
            );
            SubscribeMsgResult::NotFound
        }
    };
    relay_subscribe_send_response(op_manager, incoming_tx, instance_id, result, upstream_addr).await
}
/// Sends the terminal Response for `incoming_tx` back to the upstream peer,
/// fire-and-forget (the upstream's own waiter owns the round-trip slot).
async fn relay_subscribe_send_response(
    op_manager: &OpManager,
    incoming_tx: Transaction,
    instance_id: ContractInstanceId,
    result: SubscribeMsgResult,
    upstream_addr: std::net::SocketAddr,
) -> Result<(), OpError> {
    let msg = SubscribeMsg::Response {
        id: incoming_tx,
        instance_id,
        result,
    };
    op_manager
        .op_ctx(incoming_tx)
        .send_fire_and_forget(upstream_addr, NetMessage::from(msg))
        .await
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message::NetMessageV1;
use crate::operations::connect::ConnectMsg;
/// Test helper: a fresh subscribe-typed transaction id.
fn fresh_tx() -> Transaction {
    Transaction::new::<SubscribeMsg>()
}
/// A Response carrying `Subscribed { key }` must classify as
/// `ReplyClass::Subscribed` with the same key.
#[test]
fn classify_reply_subscribed() {
    use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey};
    let key = ContractKey::from_id_and_code(
        ContractInstanceId::new([3u8; 32]),
        CodeHash::new([4u8; 32]),
    );
    let msg = NetMessage::V1(NetMessageV1::Subscribe(SubscribeMsg::Response {
        id: fresh_tx(),
        instance_id: *key.id(),
        result: SubscribeMsgResult::Subscribed { key },
    }));
    match classify_reply(&msg) {
        ReplyClass::Subscribed { key: got } => assert_eq!(got, key),
        other @ (ReplyClass::NotFound | ReplyClass::Unexpected) => {
            panic!("expected Subscribed, got {other:?}")
        }
    }
}
/// A Response carrying NotFound must classify as `ReplyClass::NotFound`.
#[test]
fn classify_reply_not_found() {
    let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([4u8; 32]);
    let msg = NetMessage::V1(NetMessageV1::Subscribe(SubscribeMsg::Response {
        id: fresh_tx(),
        instance_id,
        result: SubscribeMsgResult::NotFound,
    }));
    assert!(matches!(classify_reply(&msg), ReplyClass::NotFound));
}
// A subscribe *Request* (not a Response) must classify as Unexpected.
#[test]
fn classify_reply_unexpected_for_request() {
    let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([5u8; 32]);
    let tx = fresh_tx();
    let msg = NetMessage::V1(NetMessageV1::Subscribe(SubscribeMsg::Request {
        id: tx,
        instance_id,
        htl: 5,
        visited: super::VisitedPeers::new(&tx),
        is_renewal: false,
    }));
    assert!(matches!(classify_reply(&msg), ReplyClass::Unexpected));
}
/// A non-subscribe message (here: Aborted) must classify as Unexpected.
#[test]
fn classify_reply_unexpected_for_non_subscribe_variant() {
    let aborted_tx = Transaction::new::<ConnectMsg>();
    let msg = NetMessage::V1(NetMessageV1::Aborted(aborted_tx));
    assert!(matches!(classify_reply(&msg), ReplyClass::Unexpected));
}
/// Test helper: a random peer pinned to the given socket address.
fn peer_at(addr: &str) -> PeerKeyLocation {
    let mut peer = PeerKeyLocation::random();
    peer.set_addr(addr.parse().expect("valid socket addr"));
    peer
}
/// Test helper: fixed contract id shared by the advance_* tests.
fn contract_id() -> ContractInstanceId {
    ContractInstanceId::new([9u8; 32])
}
// Breadth retry must pop alternatives FIFO, bump attempts_at_hop, leave
// retries untouched, and record the pick in both tried_peers and visited.
#[test]
fn advance_breadth_retry_returns_next_alternative_fifo() {
    let id = contract_id();
    let tx = fresh_tx();
    let mut visited = VisitedPeers::new(&tx);
    let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
    let a = peer_at("10.0.0.1:1001");
    let b = peer_at("10.0.0.2:1002");
    let c = peer_at("10.0.0.3:1003");
    let a_addr = a.socket_addr().unwrap();
    let mut alternatives = vec![a.clone(), b.clone(), c.clone()];
    let mut retries = 0usize;
    let mut attempts_at_hop = 1usize;
    let result = advance_to_next_peer_impl(
        &id,
        &mut visited,
        &mut tried_peers,
        &mut alternatives,
        &mut retries,
        &mut attempts_at_hop,
        |_, _| panic!("breadth retry path must not call fresh_candidates"),
    );
    let (picked, picked_addr) = result.expect("breadth retry should return an alternative");
    assert_eq!(picked_addr, a_addr, "must pick FIRST alternative (FIFO)");
    assert_eq!(picked.socket_addr(), Some(a_addr));
    assert_eq!(attempts_at_hop, 2, "attempts_at_hop must increment");
    assert_eq!(retries, 0, "retries must not change on breadth retry");
    assert_eq!(alternatives.len(), 2, "one alternative consumed");
    assert!(tried_peers.contains(&a_addr));
    assert!(visited.probably_visited(a_addr));
}
// Alternatives already tried/visited are skipped; the first usable one (B)
// is returned and the queue fully consumed.
#[test]
fn advance_breadth_retry_skips_already_visited() {
    let id = contract_id();
    let tx = fresh_tx();
    let mut visited = VisitedPeers::new(&tx);
    let a = peer_at("10.0.0.1:1001");
    let b = peer_at("10.0.0.2:1002");
    let a_addr = a.socket_addr().unwrap();
    let b_addr = b.socket_addr().unwrap();
    // Pre-mark A as both visited and tried so it must be skipped.
    visited.mark_visited(a_addr);
    let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
    tried_peers.insert(a_addr);
    let mut alternatives = vec![a, b];
    let mut retries = 0usize;
    let mut attempts_at_hop = 1usize;
    let result = advance_to_next_peer_impl(
        &id,
        &mut visited,
        &mut tried_peers,
        &mut alternatives,
        &mut retries,
        &mut attempts_at_hop,
        |_, _| panic!("should find B before falling through"),
    );
    let (_, picked_addr) = result.expect("should pick B after skipping A");
    assert_eq!(picked_addr, b_addr);
    assert!(alternatives.is_empty(), "both A and B consumed");
}
// With no alternatives queued, a fresh candidate round runs exactly once,
// increments retries, and resets attempts_at_hop to 1.
#[test]
fn advance_fresh_round_triggered_when_alternatives_exhausted() {
    let id = contract_id();
    let tx = fresh_tx();
    let mut visited = VisitedPeers::new(&tx);
    let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
    let mut alternatives: Vec<PeerKeyLocation> = Vec::new();
    let mut retries = 0usize;
    let mut attempts_at_hop = 1usize;
    let fresh_peer = peer_at("10.0.0.5:1005");
    let fresh_addr = fresh_peer.socket_addr().unwrap();
    let mut fresh_calls = 0;
    let result = advance_to_next_peer_impl(
        &id,
        &mut visited,
        &mut tried_peers,
        &mut alternatives,
        &mut retries,
        &mut attempts_at_hop,
        |got_id, _got_visited| {
            fresh_calls += 1;
            assert_eq!(got_id, &contract_id(), "passes through instance_id");
            vec![fresh_peer.clone()]
        },
    );
    assert_eq!(
        fresh_calls, 1,
        "fresh_candidates must be called exactly once"
    );
    let (_, picked_addr) = result.expect("fresh round should find a peer");
    assert_eq!(picked_addr, fresh_addr);
    assert_eq!(retries, 1, "retries incremented on fresh round");
    assert_eq!(
        attempts_at_hop, 1,
        "attempts_at_hop reset to 1 on fresh round"
    );
}
// Once attempts_at_hop reaches MAX_BREADTH the queued alternative is NOT
// consulted; the helper goes straight to a fresh round.
#[test]
fn advance_fresh_round_after_max_breadth_hit() {
    let id = contract_id();
    let tx = fresh_tx();
    let mut visited = VisitedPeers::new(&tx);
    let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
    let unused_alt = peer_at("10.0.0.1:1001");
    let mut alternatives = vec![unused_alt.clone()];
    let mut retries = 0usize;
    let mut attempts_at_hop = MAX_BREADTH;
    let fresh_peer = peer_at("10.0.0.5:1005");
    let fresh_addr = fresh_peer.socket_addr().unwrap();
    let result = advance_to_next_peer_impl(
        &id,
        &mut visited,
        &mut tried_peers,
        &mut alternatives,
        &mut retries,
        &mut attempts_at_hop,
        |_, _| vec![fresh_peer.clone()],
    );
    let (_, picked_addr) = result.expect("fresh round should run");
    assert_eq!(picked_addr, fresh_addr);
    assert_eq!(retries, 1, "went through fresh round, not breadth");
    assert_eq!(attempts_at_hop, 1, "attempts_at_hop reset by fresh round");
}
/// When the retry budget is already spent, the helper must give up without
/// even asking for fresh candidates.
#[test]
fn advance_exhausted_after_max_retries() {
    let id = contract_id();
    let tx = fresh_tx();
    let mut visited = VisitedPeers::new(&tx);
    let mut tried: HashSet<std::net::SocketAddr> = HashSet::new();
    let mut alternatives: Vec<PeerKeyLocation> = Vec::new();
    let mut retries = MAX_RETRIES;
    let mut attempts_at_hop = 1usize;
    let outcome = advance_to_next_peer_impl(
        &id,
        &mut visited,
        &mut tried,
        &mut alternatives,
        &mut retries,
        &mut attempts_at_hop,
        |_, _| panic!("fresh_candidates must not be called when retries == MAX"),
    );
    assert!(outcome.is_none(), "exhausted case returns None");
    assert_eq!(retries, MAX_RETRIES, "retries unchanged when exhausted");
}
// An empty fresh round still consumes a retry (the round was attempted)
// and the helper returns None.
#[test]
fn advance_exhausted_when_fresh_round_returns_empty() {
    let id = contract_id();
    let tx = fresh_tx();
    let mut visited = VisitedPeers::new(&tx);
    let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
    let mut alternatives: Vec<PeerKeyLocation> = Vec::new();
    let mut retries = 0usize;
    let mut attempts_at_hop = 1usize;
    let result = advance_to_next_peer_impl(
        &id,
        &mut visited,
        &mut tried_peers,
        &mut alternatives,
        &mut retries,
        &mut attempts_at_hop,
        |_, _| Vec::new(),
    );
    assert!(result.is_none());
    assert_eq!(
        retries, 1,
        "retries incremented even though fresh round was empty \
         — the round was 'attempted', it just found nothing"
    );
}
// After a fresh round picks its first candidate, the remaining candidates
// must replace the alternatives queue for subsequent breadth retries.
#[test]
fn advance_fresh_round_leftover_becomes_new_alternatives() {
    let id = contract_id();
    let tx = fresh_tx();
    let mut visited = VisitedPeers::new(&tx);
    let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
    let mut alternatives: Vec<PeerKeyLocation> = Vec::new();
    let mut retries = 0usize;
    let mut attempts_at_hop = 1usize;
    let p1 = peer_at("10.0.0.1:1001");
    let p2 = peer_at("10.0.0.2:1002");
    let p3 = peer_at("10.0.0.3:1003");
    let p1_addr = p1.socket_addr().unwrap();
    let p2_addr = p2.socket_addr().unwrap();
    let p3_addr = p3.socket_addr().unwrap();
    let result = advance_to_next_peer_impl(
        &id,
        &mut visited,
        &mut tried_peers,
        &mut alternatives,
        &mut retries,
        &mut attempts_at_hop,
        |_, _| vec![p1.clone(), p2.clone(), p3.clone()],
    );
    let (_, picked) = result.expect("fresh round returns first candidate");
    assert_eq!(picked, p1_addr);
    assert_eq!(
        alternatives.len(),
        2,
        "rest of fresh becomes new alternatives"
    );
    let alt_addrs: Vec<_> = alternatives
        .iter()
        .filter_map(|p| p.socket_addr())
        .collect();
    assert!(alt_addrs.contains(&p2_addr));
    assert!(alt_addrs.contains(&p3_addr));
}
/// Test helper: slices this file's source text to the relay section, from
/// the `start_relay_subscribe` entry point up to the test-module marker.
/// The search strings here must match the real source text exactly.
fn relay_section(src: &str) -> &str {
    let start = src
        .find("pub(crate) async fn start_relay_subscribe(")
        .expect("start_relay_subscribe not found");
    let end = src
        .find("\n#[cfg(test)]")
        .expect("test module marker not found");
    &src[start..end]
}
// Source-text invariant: the dedup insert must appear (and thus execute)
// before the driver task is spawned.
#[test]
fn start_relay_subscribe_checks_dedup_gate() {
    let src = include_str!("op_ctx_task.rs");
    let relay = relay_section(src);
    let window_start = relay
        .find("pub(crate) async fn start_relay_subscribe(")
        .expect("entry-point not found");
    let spawn_pos = relay[window_start..]
        .find("GlobalExecutor::spawn(run_relay_subscribe(")
        .expect("spawn site not found")
        + window_start;
    let insert_pos = relay[window_start..]
        .find("active_relay_subscribe_txs.insert(incoming_tx)")
        .expect("dedup insert site not found")
        + window_start;
    assert!(
        insert_pos < spawn_pos,
        "active_relay_subscribe_txs.insert MUST happen before GlobalExecutor::spawn"
    );
}
#[test]
// Source-scan test: when the dedup gate rejects a duplicate SUBSCRIBE, the
// rejected upstream must still get a fast NotFound reply, and it must be sent
// fire-and-forget so it does not touch the upstream waiter's reply slot.
fn dedup_rejection_emits_not_found_reply() {
let src = include_str!("op_ctx_task.rs");
let relay = relay_section(src);
// Everything after the dedup insert, bounded by the early return, is the
// rejection-handling window under inspection.
let after_insert = relay
.split("active_relay_subscribe_txs.insert(incoming_tx)")
.nth(1)
.expect("dedup insert site not found");
let window_end = after_insert
.find("return Ok(())")
.expect("early return after dedup-reject not found");
let window = &after_insert[..window_end];
assert!(
window.contains("SubscribeMsgResult::NotFound"),
"dedup-reject path must emit SubscribeMsgResult::NotFound to \
the rejected upstream so its send_to_and_await returns fast"
);
assert!(
window.contains("send_fire_and_forget"),
"dedup-reject NotFound must be sent via send_fire_and_forget \
(not send_to_and_await — upstream's own waiter owns its slot)"
);
}
#[test]
// Source-scan test: a dedup rejection must bump RELAY_SUBSCRIBE_DEDUP_REJECTS.
// Only the first ~500 bytes after the insert are inspected; the `.min` clamps
// the window so the slice cannot run past end-of-text.
fn dedup_rejection_increments_counter() {
let src = include_str!("op_ctx_task.rs");
let relay = relay_section(src);
let after_insert = relay
.split("active_relay_subscribe_txs.insert(incoming_tx)")
.nth(1)
.expect("dedup insert site not found");
let window = &after_insert[..500.min(after_insert.len())];
assert!(
window.contains("RELAY_SUBSCRIBE_DEDUP_REJECTS.fetch_add"),
"dedup gate must increment RELAY_SUBSCRIBE_DEDUP_REJECTS on rejection"
);
}
#[test]
// Source-scan test: RelaySubscribeInflightGuard's Drop impl must undo the
// gate's bookkeeping (dedup-set entry, inflight gauge) and bump the
// completed-total counter.
fn raii_guard_clears_dedup_set_on_drop() {
    let src = include_str!("op_ctx_task.rs");
    let drop_start = src
        .find("impl Drop for RelaySubscribeInflightGuard")
        .expect("RelaySubscribeInflightGuard Drop impl not found");
    // Clamp the inspection window to the end of the file so the slice cannot
    // panic if the Drop impl sits within 600 bytes of EOF (same `.min` guard
    // used by dedup_rejection_increments_counter).
    let window_end = (drop_start + 600).min(src.len());
    let drop_body = &src[drop_start..window_end];
    assert!(
        drop_body.contains("active_relay_subscribe_txs"),
        "RelaySubscribeInflightGuard::drop must remove from active_relay_subscribe_txs"
    );
    assert!(
        drop_body.contains("RELAY_SUBSCRIBE_INFLIGHT.fetch_sub"),
        "RelaySubscribeInflightGuard::drop must decrement RELAY_SUBSCRIBE_INFLIGHT"
    );
    assert!(
        drop_body.contains("RELAY_SUBSCRIBE_COMPLETED_TOTAL.fetch_add"),
        "RelaySubscribeInflightGuard::drop must increment RELAY_SUBSCRIBE_COMPLETED_TOTAL"
    );
}
#[test]
// Source-scan test: the relay driver must forward downstream with
// send_to_and_await so it blocks on the downstream reply.
fn drive_relay_subscribe_forwards_via_send_to_and_await() {
let src = include_str!("op_ctx_task.rs");
// Bound the driver body by the next fn definition following it in the file.
let driver_start = src
.find("async fn drive_relay_subscribe(")
.expect("drive_relay_subscribe not found");
let driver_end = src[driver_start..]
.find("\nasync fn relay_subscribe_send_response(")
.expect("driver body end not found")
+ driver_start;
let driver_src = &src[driver_start..driver_end];
assert!(
driver_src.contains("ctx.send_to_and_await("),
"drive_relay_subscribe must forward downstream via send_to_and_await"
);
}
#[test]
// Source-scan test: the relay forward must reuse the incoming transaction id;
// minting a fresh Transaction would orphan the upstream waiter.
fn drive_relay_subscribe_reuses_incoming_tx_on_forward() {
    let src = include_str!("op_ctx_task.rs");
    // Bound the driver body by the next fn definition following it in the file.
    let driver_start = src
        .find("async fn drive_relay_subscribe(")
        .expect("drive_relay_subscribe not found");
    let driver_end = src[driver_start..]
        .find("\nasync fn relay_subscribe_send_response(")
        .expect("driver body end not found")
        + driver_start;
    let driver_src = &src[driver_start..driver_end];
    let forward_pos = driver_src
        .find("SubscribeMsg::Request {")
        .expect("forward SubscribeMsg::Request not found in driver");
    // Clamp the window to the slice length so indexing cannot panic when the
    // construction site sits near the end of the driver body (same `.min`
    // guard used by dedup_rejection_increments_counter).
    let window_len = 400.min(driver_src.len() - forward_pos);
    let forward_window = &driver_src[forward_pos..forward_pos + window_len];
    assert!(
        forward_window.contains("id: incoming_tx"),
        "relay forward must reuse incoming_tx"
    );
    assert!(
        !forward_window.contains("Transaction::new::<SubscribeMsg>()"),
        "relay forward must NOT mint a fresh Transaction"
    );
}
#[test]
// Source-scan test: the upstream response must go out fire-and-forget — the
// relay must not await an ack for its own reply.
fn relay_subscribe_send_response_is_fire_and_forget() {
let src = include_str!("op_ctx_task.rs");
let fn_start = src
.find("async fn relay_subscribe_send_response(")
.expect("relay_subscribe_send_response not found");
// "\n}\n" marks the first column-zero closing brace, i.e. the fn's end.
let fn_end = src[fn_start..]
.find("\n}\n")
.expect("function body end not found")
+ fn_start;
let fn_src = &src[fn_start..fn_end];
assert!(
fn_src.contains("send_fire_and_forget"),
"relay_subscribe_send_response must use send_fire_and_forget for the upstream response"
);
}
#[test]
// Source-scan test: the whole relay section must never construct a
// ForwardingAck — such a message would share incoming_tx with the reply and
// collide with the upstream waiter.
fn relay_subscribe_does_not_emit_forwarding_ack() {
let src = include_str!("op_ctx_task.rs");
let relay = relay_section(src);
assert!(
!relay.contains("SubscribeMsg::ForwardingAck {"),
"relay SUBSCRIBE driver must NOT construct a ForwardingAck — \
sharing incoming_tx with the reply collides with upstream's waiter"
);
}
#[test]
// Source-scan test: the driver has two success paths (local hit and downstream
// Subscribed), and both must register the downstream subscriber.
fn relay_subscribe_registers_downstream_on_both_paths() {
let src = include_str!("op_ctx_task.rs");
// Bound the driver body by the next fn definition following it in the file.
let driver_start = src
.find("async fn drive_relay_subscribe(")
.expect("drive_relay_subscribe not found");
let driver_end = src[driver_start..]
.find("\nasync fn relay_subscribe_send_response(")
.expect("driver body end not found")
+ driver_start;
let driver_src = &src[driver_start..driver_end];
// Count call sites, not calls at runtime: one per success path.
let hits = driver_src
.matches("register_downstream_subscriber(")
.count();
assert!(
hits >= 2,
"driver must call register_downstream_subscriber on BOTH local-hit \
and downstream-Subscribed paths (found {hits})"
);
}
#[test]
// Source-scan test: relaying a response must not install a local lease — no
// ring.subscribe, no complete_subscription_request, no announce_contract_hosted.
fn relay_subscribe_does_not_install_lease_on_relayed_response() {
let src = include_str!("op_ctx_task.rs");
// Bound the driver body by the next fn definition following it in the file.
let driver_start = src
.find("async fn drive_relay_subscribe(")
.expect("drive_relay_subscribe not found");
let driver_end = src[driver_start..]
.find("\nasync fn relay_subscribe_send_response(")
.expect("driver body end not found")
+ driver_start;
let driver_src = &src[driver_start..driver_end];
// Strip `//` line comments so a mention of a banned call in a comment
// cannot trip the negative assertions below.
let stripped: String = driver_src
.lines()
.map(|line| match line.find("//") {
Some(idx) => &line[..idx],
None => line,
})
.collect::<Vec<_>>()
.join("\n");
assert!(
!stripped.contains("ring.subscribe("),
"relay driver must NOT call ring.subscribe on relayed response"
);
assert!(
!stripped.contains("complete_subscription_request"),
"relay driver must NOT call complete_subscription_request on relayed response"
);
assert!(
!stripped.contains("announce_contract_hosted"),
"relay driver must NOT call announce_contract_hosted on relayed response"
);
}
#[test]
fn classify_renewal_result_publish_ok_is_success() {
    // A successful downstream publish maps straight to RenewalOutcome::Success.
    use freenet_stdlib::client_api::{ContractResponse, HostResponse};
    use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey};
    let key = ContractKey::from_id_and_code(
        ContractInstanceId::new([1u8; 32]),
        CodeHash::new([2u8; 32]),
    );
    let response = HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
        key,
        subscribed: true,
    });
    let outcome = classify_renewal_result(Ok(DriverOutcome::Publish(Ok(response))));
    assert!(matches!(outcome, RenewalOutcome::Success));
}
#[test]
fn classify_renewal_result_skip_already_delivered_is_success() {
    // SkipAlreadyDelivered means the result already reached its destination;
    // that still counts as a successful renewal.
    let outcome = classify_renewal_result(Ok(DriverOutcome::SkipAlreadyDelivered));
    assert!(matches!(outcome, RenewalOutcome::Success));
}
#[test]
fn classify_renewal_result_publish_err_is_failed() {
    // A host-level publish error must surface as Failed, and the reason text
    // must carry the original cause through.
    let host_err: freenet_stdlib::client_api::ClientError =
        freenet_stdlib::client_api::ErrorKind::OperationError {
            cause: "downstream peer rejected".into(),
        }
        .into();
    let outcome = classify_renewal_result(Ok(DriverOutcome::Publish(Err(host_err))));
    match outcome {
        RenewalOutcome::Failed { reason } => assert!(
            reason.contains("downstream peer rejected"),
            "reason should include the host-error cause; got: {reason}"
        ),
        RenewalOutcome::Success | RenewalOutcome::ChannelCongestion => {
            panic!("expected Failed, got non-Failed variant")
        }
    }
}
#[test]
fn classify_renewal_result_notification_error_is_channel_congestion() {
    // NotificationError signals a congested event channel, not a dead peer.
    let outcome = classify_renewal_result(Ok(DriverOutcome::InfrastructureError(
        OpError::NotificationError,
    )));
    assert!(matches!(outcome, RenewalOutcome::ChannelCongestion));
}
#[test]
fn classify_renewal_result_notification_channel_error_is_channel_congestion() {
    // The variant carrying a message string classifies the same way as the
    // plain NotificationError.
    let outcome = classify_renewal_result(Ok(DriverOutcome::InfrastructureError(
        OpError::NotificationChannelError("channel full".into()),
    )));
    assert!(matches!(outcome, RenewalOutcome::ChannelCongestion));
}
#[test]
fn classify_renewal_result_outer_err_notification_is_channel_congestion() {
    // The congestion mapping applies even when the driver errored before
    // producing a DriverOutcome at all.
    let result: Result<DriverOutcome, OpError> = Err(OpError::NotificationError);
    let outcome = classify_renewal_result(result);
    assert!(matches!(outcome, RenewalOutcome::ChannelCongestion));
}
#[test]
fn classify_renewal_result_unexpected_op_state_is_failed() {
    // Non-congestion infrastructure errors are terminal failures.
    let result: Result<DriverOutcome, OpError> = Err(OpError::UnexpectedOpState);
    let outcome = classify_renewal_result(result);
    assert!(matches!(outcome, RenewalOutcome::Failed { .. }));
}
#[test]
fn classify_renewal_result_ring_error_no_hosting_peers_is_failed() {
    // Ring-level "no hosting peers" is a terminal failure for a renewal.
    let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([7u8; 32]);
    let result: Result<DriverOutcome, OpError> =
        Err(OpError::RingError(RingError::NoHostingPeers(instance_id)));
    assert!(matches!(
        classify_renewal_result(result),
        RenewalOutcome::Failed { .. }
    ));
}
#[test]
fn classify_executor_subscribe_publish_ok_is_ok() {
    // A successful publish classifies as Ok on the executor auto-subscribe path.
    use freenet_stdlib::client_api::{ContractResponse, HostResponse};
    use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey};
    let key = ContractKey::from_id_and_code(
        ContractInstanceId::new([8u8; 32]),
        CodeHash::new([9u8; 32]),
    );
    let response = HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
        key,
        subscribed: true,
    });
    let classified = classify_executor_subscribe_result(Ok(DriverOutcome::Publish(Ok(response))));
    assert!(classified.is_ok());
}
#[test]
fn classify_executor_subscribe_skip_already_delivered_is_ok() {
    // Already-delivered results count as success on the executor path too.
    let classified = classify_executor_subscribe_result(Ok(DriverOutcome::SkipAlreadyDelivered));
    assert!(classified.is_ok());
}
#[test]
fn classify_executor_subscribe_publish_err_is_network_exhausted() {
    // Host-level publish errors map to NetworkExhausted, carrying the cause
    // text through in the reason.
    let host_err: freenet_stdlib::client_api::ClientError =
        freenet_stdlib::client_api::ErrorKind::OperationError {
            cause: "downstream peer rejected".into(),
        }
        .into();
    match classify_executor_subscribe_result(Ok(DriverOutcome::Publish(Err(host_err)))) {
        Err(ExecutorSubscribeError::NetworkExhausted(reason)) => assert!(
            reason.contains("downstream peer rejected"),
            "reason should include host-error cause; got: {reason}"
        ),
        Ok(_) | Err(ExecutorSubscribeError::Infra(_)) => {
            panic!("expected NetworkExhausted variant")
        }
    }
}
#[test]
fn classify_executor_subscribe_infrastructure_error_is_infra() {
    // Infrastructure errors pass through unchanged inside the Infra variant.
    let classified = classify_executor_subscribe_result(Ok(DriverOutcome::InfrastructureError(
        OpError::NotificationError,
    )));
    match classified {
        Err(ExecutorSubscribeError::Infra(OpError::NotificationError)) => {}
        other => panic!("expected Infra(NotificationError), got {other:?}"),
    }
}
#[test]
fn classify_executor_subscribe_outer_err_propagates_as_infra() {
    // An outer driver error (no DriverOutcome produced) also wraps as Infra.
    let result: Result<DriverOutcome, OpError> = Err(OpError::UnexpectedOpState);
    match classify_executor_subscribe_result(result) {
        Err(ExecutorSubscribeError::Infra(OpError::UnexpectedOpState)) => {}
        other => panic!("expected Infra(UnexpectedOpState), got {other:?}"),
    }
}
#[test]
// Source-scan test: the LocallyComplete arm of run_executor_subscribe must do
// its bookkeeping inline rather than reusing the client-path completion helper.
fn run_executor_subscribe_locally_complete_skips_local_subscribe_complete_event() {
    let src = include_str!("op_ctx_task.rs");
    // Use the brace-counting extractor (as the sibling op-stats test already
    // does) instead of the old "split at the next closing-brace sequence"
    // heuristic, which could truncate the body at any nested block.
    let prod = production_source(src);
    let body = extract_fn_body(prod, "pub(crate) async fn run_executor_subscribe(");
    // Strip `//` line comments so a mention of the helper inside a comment
    // cannot trip the negative assertion below.
    let stripped: String = body
        .lines()
        .map(|line| match line.find("//") {
            Some(idx) => &line[..idx],
            None => line,
        })
        .collect::<Vec<_>>()
        .join("\n");
    // Built via concat so this test's own pattern text can never match itself.
    let helper = ["complete_local_", "subscription"].concat();
    assert!(
        !stripped.contains(&helper),
        "run_executor_subscribe must NOT call the local-completion \
        helper on the LocallyComplete branch — would publish a \
        result for executor_tx that has no client waiter. Handle \
        interest registration + op_manager.completed inline."
    );
    assert!(
        body.contains("interest_manager.add_local_client"),
        "run_executor_subscribe LocallyComplete arm must register \
        local interest (mirrors complete_local_subscription side \
        effect)"
    );
    assert!(
        body.contains("op_manager.completed("),
        "run_executor_subscribe LocallyComplete arm must call \
        op_manager.completed() to drop the under_progress slot"
    );
}
#[test]
// Source-scan test: run_executor_subscribe's early-return branches must surface
// ring-level failures through the proper RingError variants, wrapped as Infra.
fn run_executor_subscribe_no_hosting_peers_maps_to_infra_ring_error() {
    let src = include_str!("op_ctx_task.rs");
    // Brace-counting extraction (matching the sibling op-stats test) replaces
    // the old "split at the next closing-brace sequence" heuristic, which
    // could truncate the body at any nested block.
    let body = extract_fn_body(
        production_source(src),
        "pub(crate) async fn run_executor_subscribe(",
    );
    assert!(
        body.contains("RingError::NoHostingPeers"),
        "run_executor_subscribe must surface NoHostingPeers via \
        RingError::NoHostingPeers"
    );
    assert!(
        body.contains("RingError::PeerNotJoined"),
        "run_executor_subscribe must surface PeerNotJoined via \
        RingError::PeerNotJoined"
    );
    assert!(
        body.contains("ExecutorSubscribeError::Infra"),
        "run_executor_subscribe early-return error branches must \
        wrap as ExecutorSubscribeError::Infra(...)"
    );
}
#[test]
// Source-scan test for issue #4010: deliver_outcome must advance the dashboard
// SUBSCRIBE counter, but only for true client-initiated transactions.
fn deliver_outcome_records_subscribe_op_result() {
const SOURCE: &str = include_str!("op_ctx_task.rs");
// Restrict the scan to production code so the test module's own string
// literals cannot produce false matches.
let prod = production_source(SOURCE);
let body = extract_fn_body(prod, "fn deliver_outcome(");
assert!(
body.contains("record_op_result"),
"deliver_outcome must call record_op_result so the dashboard \
SUBSCRIBE counter advances on task-per-tx terminal replies. \
Issue #4010."
);
assert!(
body.contains("OpType::Subscribe"),
"record_op_result inside deliver_outcome must be passed \
OpType::Subscribe (not Get/Put/Update)."
);
assert!(
body.contains("!client_tx.is_sub_operation()"),
"deliver_outcome must gate record_op_result on \
`!client_tx.is_sub_operation()` (note the leading `!`) so \
sub-op SUBSCRIBE spawned by PUT/GET \
(`maybe_subscribe_child`, `start_subscription_request`) \
does not inflate the user-facing dashboard counter. \
A flipped guard would silently re-introduce the inflation \
reported by Codex review. Issue #4010."
);
}
#[test]
// Source-scan test for issue #4010: only client-initiated subscribes count on
// the dashboard — renewal and executor paths must not touch op stats.
fn renewal_and_executor_subscribe_do_not_record_op_stats() {
const SOURCE: &str = include_str!("op_ctx_task.rs");
// Restrict the scan to production code so the test module's own string
// literals cannot produce false matches.
let prod = production_source(SOURCE);
let renewal_body = extract_fn_body(prod, "pub(crate) async fn run_renewal_subscribe(");
assert!(
!renewal_body.contains("deliver_outcome"),
"run_renewal_subscribe must NOT route through deliver_outcome \
— that would record renewals against the dashboard SUBSCRIBE \
counter. Issue #4010."
);
assert!(
!renewal_body.contains("record_op_result"),
"run_renewal_subscribe must NOT call record_op_result. \
Issue #4010."
);
let executor_body = extract_fn_body(prod, "pub(crate) async fn run_executor_subscribe(");
assert!(
!executor_body.contains("deliver_outcome"),
"run_executor_subscribe must NOT route through deliver_outcome \
— that would record executor auto-subscribes against the \
dashboard SUBSCRIBE counter. Issue #4010."
);
assert!(
!executor_body.contains("record_op_result"),
"run_executor_subscribe must NOT call record_op_result. \
Issue #4010."
);
}
#[test]
fn classify_subscribe_outcome_covers_all_variants() {
    // Table-driven check: only genuinely successful outcomes (Publish(Ok) and
    // SkipAlreadyDelivered) count as success for op stats; errors do not.
    use freenet_stdlib::client_api::{ContractResponse, ErrorKind, HostResponse};
    use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey};
    let key = ContractKey::from_id_and_code(
        ContractInstanceId::new([7u8; 32]),
        CodeHash::new([8u8; 32]),
    );
    let ok_response = HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
        key,
        subscribed: true,
    });
    let cases = [
        (DriverOutcome::Publish(Ok(ok_response)), true),
        (
            DriverOutcome::Publish(Err(ErrorKind::OperationError {
                cause: "synthetic".into(),
            }
            .into())),
            false,
        ),
        (DriverOutcome::SkipAlreadyDelivered, true),
        (
            DriverOutcome::InfrastructureError(OpError::UnexpectedOpState),
            false,
        ),
    ];
    for (outcome, expected) in &cases {
        assert_eq!(classify_subscribe_outcome_for_op_stats(outcome), *expected);
    }
}
/// Everything in the file before the `#[cfg(test)]` marker, i.e. the
/// production code with the test module cut off. Panics if the marker is
/// absent.
fn production_source(full: &str) -> &str {
    match full.find("#[cfg(test)]") {
        Some(cutoff) => &full[..cutoff],
        None => panic!("file must have a #[cfg(test)] section"),
    }
}
/// Return the text between the outermost braces of the function whose
/// signature starts with `signature_prefix`, found via byte-level brace
/// counting. Panics if the signature is missing, has no opening brace, or the
/// body is unterminated.
fn extract_fn_body<'a>(source: &'a str, signature_prefix: &str) -> &'a str {
    let start = source
        .find(signature_prefix)
        .unwrap_or_else(|| panic!("could not find {signature_prefix}"));
    let brace = source[start..]
        .find('{')
        .expect("fn signature must have a body");
    let body_start = start + brace + 1;
    // Scan forward, tracking nesting depth; depth 0 marks the matching brace.
    let mut depth: i32 = 1;
    for (offset, &byte) in source.as_bytes()[body_start..].iter().enumerate() {
        match byte {
            b'{' => depth += 1,
            b'}' => {
                depth -= 1;
                if depth == 0 {
                    return &source[body_start..body_start + offset];
                }
            }
            _ => {}
        }
    }
    panic!("unterminated fn body for {signature_prefix}");
}
}