use std::collections::HashSet;
use std::sync::Arc;
use freenet_stdlib::client_api::{ContractResponse, ErrorKind, HostResponse};
use freenet_stdlib::prelude::ContractInstanceId;
use crate::client_events::HostResult;
use crate::config::{GlobalExecutor, OPERATION_TTL};
use crate::message::{NetMessage, NetMessageV1, Transaction};
use crate::node::OpManager;
use crate::operations::{OpError, VisitedPeers};
use crate::ring::{PeerKeyLocation, RingError};
use super::{
InitialRequest, MAX_BREADTH, MAX_RETRIES, SubscribeMsg, SubscribeMsgResult,
complete_local_subscription, prepare_initial_request,
};
pub(crate) async fn start_client_subscribe(
op_manager: Arc<OpManager>,
instance_id: ContractInstanceId,
client_tx: Transaction,
) -> Result<Transaction, OpError> {
tracing::debug!(
tx = %client_tx,
contract = %instance_id,
"subscribe (task-per-tx): spawning client-initiated task"
);
GlobalExecutor::spawn(run_client_subscribe(op_manager, instance_id, client_tx));
Ok(client_tx)
}
async fn run_client_subscribe(
op_manager: Arc<OpManager>,
instance_id: ContractInstanceId,
client_tx: Transaction,
) {
let outcome = drive_client_subscribe(op_manager.clone(), instance_id, client_tx).await;
deliver_outcome(&op_manager, client_tx, instance_id, outcome);
}
#[derive(Debug)]
enum DriverOutcome {
Publish(HostResult),
SkipAlreadyDelivered,
InfrastructureError(OpError),
}
async fn drive_client_subscribe(
op_manager: Arc<OpManager>,
instance_id: ContractInstanceId,
client_tx: Transaction,
) -> DriverOutcome {
match drive_client_subscribe_inner(&op_manager, instance_id, client_tx).await {
Ok(outcome) => outcome,
Err(err) => DriverOutcome::InfrastructureError(err),
}
}
async fn drive_client_subscribe_inner(
op_manager: &Arc<OpManager>,
instance_id: ContractInstanceId,
client_tx: Transaction,
) -> Result<DriverOutcome, OpError> {
let initial = prepare_initial_request(
op_manager,
client_tx,
instance_id,
false,
)
.await?;
let (target_peer, target_addr, mut visited, mut alternatives, htl) = match initial {
InitialRequest::LocallyComplete { key } => {
complete_local_subscription(op_manager, client_tx, key, false).await?;
return Ok(DriverOutcome::SkipAlreadyDelivered);
}
InitialRequest::NoHostingPeers => {
return Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
cause: format!("no remote peers available for subscription to {instance_id}")
.into(),
}
.into())));
}
InitialRequest::PeerNotJoined => {
return Err(RingError::PeerNotJoined.into());
}
InitialRequest::NetworkRequest {
target,
target_addr,
visited,
alternatives,
htl,
} => (target, target_addr, visited, alternatives, htl),
};
tracing::debug!(
tx = %client_tx,
contract = %instance_id,
target = %target_addr,
"subscribe (task-per-tx): initial target selected, entering retry loop"
);
let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
tried_peers.insert(target_addr);
let mut retries: usize = 0;
let mut attempts_at_hop: usize = 1;
let mut current_target: PeerKeyLocation = target_peer;
let mut current_target_addr: std::net::SocketAddr = target_addr;
let mut is_first_attempt = true;
loop {
let attempt_tx = Transaction::new::<SubscribeMsg>();
tracing::debug!(
tx = %client_tx,
attempt_tx = %attempt_tx,
target = %current_target_addr,
retries,
attempts_at_hop,
"subscribe (task-per-tx): sending attempt"
);
if !is_first_attempt {
if let Some(event) = crate::tracing::NetEventLog::subscribe_request(
&attempt_tx,
&op_manager.ring,
instance_id,
current_target.clone(),
htl,
) {
op_manager
.ring
.register_events(either::Either::Left(event))
.await;
}
}
is_first_attempt = false;
let request = SubscribeMsg::Request {
id: attempt_tx,
instance_id,
htl,
visited: visited.clone(),
is_renewal: false,
};
let mut ctx = op_manager.op_ctx(attempt_tx);
let round_trip =
tokio::time::timeout(OPERATION_TTL, ctx.send_and_await(NetMessage::from(request)))
.await;
op_manager.release_pending_op_slot(attempt_tx).await;
let reply = match round_trip {
Ok(Ok(reply)) => reply,
Ok(Err(err)) => {
tracing::warn!(
tx = %client_tx,
attempt_tx = %attempt_tx,
target = %current_target_addr,
retries,
attempts_at_hop,
outcome = "wire_error",
error = %err,
"subscribe (task-per-tx): send_and_await failed; advancing to next peer"
);
match advance_to_next_peer(
op_manager,
&instance_id,
&mut visited,
&mut tried_peers,
&mut alternatives,
&mut retries,
&mut attempts_at_hop,
)
.await
{
Some((next_target, next_addr)) => {
current_target = next_target;
current_target_addr = next_addr;
continue;
}
None => {
return Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
cause: format!(
"subscribe to {instance_id} failed after {} rounds (last peer error: {err})",
retries + 1
)
.into(),
}
.into())));
}
}
}
Err(_) => {
tracing::warn!(
tx = %client_tx,
attempt_tx = %attempt_tx,
target = %current_target_addr,
retries,
attempts_at_hop,
outcome = "timeout",
timeout_secs = OPERATION_TTL.as_secs(),
"subscribe (task-per-tx): attempt timed out; advancing to next peer"
);
match advance_to_next_peer(
op_manager,
&instance_id,
&mut visited,
&mut tried_peers,
&mut alternatives,
&mut retries,
&mut attempts_at_hop,
)
.await
{
Some((next_target, next_addr)) => {
current_target = next_target;
current_target_addr = next_addr;
continue;
}
None => {
return Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
cause: format!(
"subscribe to {instance_id} timed out after {} rounds",
retries + 1
)
.into(),
}
.into())));
}
}
}
};
match classify_reply(&reply) {
ReplyClass::Subscribed { key } => {
tracing::info!(
tx = %client_tx,
attempt_tx = %attempt_tx,
contract = %key,
target = %current_target_addr,
retries,
attempts_at_hop,
outcome = "subscribed",
"subscribe (task-per-tx): subscribed"
);
return Ok(DriverOutcome::Publish(Ok(HostResponse::ContractResponse(
ContractResponse::SubscribeResponse {
key,
subscribed: true,
},
))));
}
ReplyClass::NotFound => {
tracing::debug!(
tx = %client_tx,
attempt_tx = %attempt_tx,
target = %current_target_addr,
retries,
attempts_at_hop,
outcome = "not_found",
"subscribe (task-per-tx): NotFound from peer; advancing to next peer"
);
match advance_to_next_peer(
op_manager,
&instance_id,
&mut visited,
&mut tried_peers,
&mut alternatives,
&mut retries,
&mut attempts_at_hop,
)
.await
{
Some((next_target, next_addr)) => {
current_target = next_target;
current_target_addr = next_addr;
continue;
}
None => {
return Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
cause: format!(
"contract {instance_id} not found after exhaustive search"
)
.into(),
}
.into())));
}
}
}
ReplyClass::Unexpected => {
tracing::warn!(
tx = %client_tx,
attempt_tx = %attempt_tx,
"subscribe (task-per-tx): unexpected terminal reply"
);
return Err(OpError::UnexpectedOpState);
}
}
}
}
#[derive(Debug)]
enum ReplyClass {
Subscribed {
key: freenet_stdlib::prelude::ContractKey,
},
NotFound,
Unexpected,
}
fn classify_reply(msg: &NetMessage) -> ReplyClass {
match msg {
NetMessage::V1(NetMessageV1::Subscribe(SubscribeMsg::Response { result, .. })) => {
match result {
SubscribeMsgResult::Subscribed { key } => ReplyClass::Subscribed { key: *key },
SubscribeMsgResult::NotFound => ReplyClass::NotFound,
}
}
_ => ReplyClass::Unexpected,
}
}
async fn advance_to_next_peer(
op_manager: &OpManager,
instance_id: &ContractInstanceId,
visited: &mut VisitedPeers,
tried_peers: &mut HashSet<std::net::SocketAddr>,
alternatives: &mut Vec<PeerKeyLocation>,
retries: &mut usize,
attempts_at_hop: &mut usize,
) -> Option<(PeerKeyLocation, std::net::SocketAddr)> {
advance_to_next_peer_impl(
instance_id,
visited,
tried_peers,
alternatives,
retries,
attempts_at_hop,
|instance_id, visited| {
op_manager
.ring
.k_closest_potentially_hosting(instance_id, visited, MAX_BREADTH)
},
)
}
fn advance_to_next_peer_impl<F>(
instance_id: &ContractInstanceId,
visited: &mut VisitedPeers,
tried_peers: &mut HashSet<std::net::SocketAddr>,
alternatives: &mut Vec<PeerKeyLocation>,
retries: &mut usize,
attempts_at_hop: &mut usize,
mut fresh_candidates: F,
) -> Option<(PeerKeyLocation, std::net::SocketAddr)>
where
F: FnMut(&ContractInstanceId, &VisitedPeers) -> Vec<PeerKeyLocation>,
{
if *attempts_at_hop < MAX_BREADTH {
while !alternatives.is_empty() {
let candidate = alternatives.remove(0);
if let Some(addr) = candidate.socket_addr() {
if tried_peers.contains(&addr) || visited.probably_visited(addr) {
continue;
}
tried_peers.insert(addr);
visited.mark_visited(addr);
*attempts_at_hop += 1;
tracing::debug!(
%instance_id,
target = %addr,
attempts_at_hop = *attempts_at_hop,
"subscribe (task-per-tx): breadth retry with next alternative"
);
return Some((candidate, addr));
}
}
}
if *retries < MAX_RETRIES {
*retries += 1;
*attempts_at_hop = 1;
let mut fresh = fresh_candidates(instance_id, visited);
while !fresh.is_empty() {
let candidate = fresh.remove(0);
if let Some(addr) = candidate.socket_addr() {
if tried_peers.contains(&addr) || visited.probably_visited(addr) {
continue;
}
tried_peers.insert(addr);
visited.mark_visited(addr);
*alternatives = fresh;
tracing::debug!(
%instance_id,
target = %addr,
retries = *retries,
"subscribe (task-per-tx): fresh k_closest round found new target"
);
return Some((candidate, addr));
}
}
tracing::debug!(
%instance_id,
retries = *retries,
"subscribe (task-per-tx): fresh k_closest round returned no usable candidates"
);
}
None
}
fn deliver_outcome(
op_manager: &OpManager,
client_tx: Transaction,
instance_id: ContractInstanceId,
outcome: DriverOutcome,
) {
match outcome {
DriverOutcome::Publish(result) => {
op_manager.send_client_result(client_tx, result);
}
DriverOutcome::SkipAlreadyDelivered => {
tracing::debug!(
tx = %client_tx,
"subscribe (task-per-tx): local completion already published; \
skipping result_router_tx"
);
}
DriverOutcome::InfrastructureError(err) => {
tracing::warn!(
tx = %client_tx,
contract = %instance_id,
error = %err,
"subscribe (task-per-tx): infrastructure error; \
publishing synthesized client error"
);
let synthesized: HostResult = Err(ErrorKind::OperationError {
cause: format!("subscribe failed: {err}").into(),
}
.into());
op_manager.send_client_result(client_tx, synthesized);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message::NetMessageV1;
use crate::operations::connect::ConnectMsg;
fn fresh_tx() -> Transaction {
Transaction::new::<SubscribeMsg>()
}
#[test]
fn classify_reply_subscribed() {
use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey};
let key = ContractKey::from_id_and_code(
ContractInstanceId::new([3u8; 32]),
CodeHash::new([4u8; 32]),
);
let tx = fresh_tx();
let msg = NetMessage::V1(NetMessageV1::Subscribe(SubscribeMsg::Response {
id: tx,
instance_id: *key.id(),
result: SubscribeMsgResult::Subscribed { key },
}));
match classify_reply(&msg) {
ReplyClass::Subscribed { key: got } => assert_eq!(got, key),
other @ (ReplyClass::NotFound | ReplyClass::Unexpected) => {
panic!("expected Subscribed, got {other:?}")
}
}
}
#[test]
fn classify_reply_not_found() {
let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([4u8; 32]);
let tx = fresh_tx();
let msg = NetMessage::V1(NetMessageV1::Subscribe(SubscribeMsg::Response {
id: tx,
instance_id,
result: SubscribeMsgResult::NotFound,
}));
assert!(matches!(classify_reply(&msg), ReplyClass::NotFound));
}
#[test]
fn classify_reply_unexpected_for_request() {
let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([5u8; 32]);
let tx = fresh_tx();
let msg = NetMessage::V1(NetMessageV1::Subscribe(SubscribeMsg::Request {
id: tx,
instance_id,
htl: 5,
visited: super::VisitedPeers::new(&tx),
is_renewal: false,
}));
assert!(matches!(classify_reply(&msg), ReplyClass::Unexpected));
}
#[test]
fn classify_reply_unexpected_for_non_subscribe_variant() {
let tx = Transaction::new::<ConnectMsg>();
let msg = NetMessage::V1(NetMessageV1::Aborted(tx));
assert!(matches!(classify_reply(&msg), ReplyClass::Unexpected));
}
fn peer_at(addr: &str) -> PeerKeyLocation {
let mut p = PeerKeyLocation::random();
p.set_addr(addr.parse().expect("valid socket addr"));
p
}
fn contract_id() -> ContractInstanceId {
ContractInstanceId::new([9u8; 32])
}
#[test]
fn advance_breadth_retry_returns_next_alternative_fifo() {
let id = contract_id();
let tx = fresh_tx();
let mut visited = VisitedPeers::new(&tx);
let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
let a = peer_at("10.0.0.1:1001");
let b = peer_at("10.0.0.2:1002");
let c = peer_at("10.0.0.3:1003");
let a_addr = a.socket_addr().unwrap();
let mut alternatives = vec![a.clone(), b.clone(), c.clone()];
let mut retries = 0usize;
let mut attempts_at_hop = 1usize;
let result = advance_to_next_peer_impl(
&id,
&mut visited,
&mut tried_peers,
&mut alternatives,
&mut retries,
&mut attempts_at_hop,
|_, _| panic!("breadth retry path must not call fresh_candidates"),
);
let (picked, picked_addr) = result.expect("breadth retry should return an alternative");
assert_eq!(picked_addr, a_addr, "must pick FIRST alternative (FIFO)");
assert_eq!(picked.socket_addr(), Some(a_addr));
assert_eq!(attempts_at_hop, 2, "attempts_at_hop must increment");
assert_eq!(retries, 0, "retries must not change on breadth retry");
assert_eq!(alternatives.len(), 2, "one alternative consumed");
assert!(tried_peers.contains(&a_addr));
assert!(visited.probably_visited(a_addr));
}
#[test]
fn advance_breadth_retry_skips_already_visited() {
let id = contract_id();
let tx = fresh_tx();
let mut visited = VisitedPeers::new(&tx);
let a = peer_at("10.0.0.1:1001");
let b = peer_at("10.0.0.2:1002");
let a_addr = a.socket_addr().unwrap();
let b_addr = b.socket_addr().unwrap();
visited.mark_visited(a_addr); let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
tried_peers.insert(a_addr);
let mut alternatives = vec![a, b];
let mut retries = 0usize;
let mut attempts_at_hop = 1usize;
let result = advance_to_next_peer_impl(
&id,
&mut visited,
&mut tried_peers,
&mut alternatives,
&mut retries,
&mut attempts_at_hop,
|_, _| panic!("should find B before falling through"),
);
let (_, picked_addr) = result.expect("should pick B after skipping A");
assert_eq!(picked_addr, b_addr);
assert!(alternatives.is_empty(), "both A and B consumed");
}
#[test]
fn advance_fresh_round_triggered_when_alternatives_exhausted() {
let id = contract_id();
let tx = fresh_tx();
let mut visited = VisitedPeers::new(&tx);
let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
let mut alternatives: Vec<PeerKeyLocation> = Vec::new();
let mut retries = 0usize;
let mut attempts_at_hop = 1usize;
let fresh_peer = peer_at("10.0.0.5:1005");
let fresh_addr = fresh_peer.socket_addr().unwrap();
let mut fresh_calls = 0;
let result = advance_to_next_peer_impl(
&id,
&mut visited,
&mut tried_peers,
&mut alternatives,
&mut retries,
&mut attempts_at_hop,
|got_id, _got_visited| {
fresh_calls += 1;
assert_eq!(got_id, &contract_id(), "passes through instance_id");
vec![fresh_peer.clone()]
},
);
assert_eq!(
fresh_calls, 1,
"fresh_candidates must be called exactly once"
);
let (_, picked_addr) = result.expect("fresh round should find a peer");
assert_eq!(picked_addr, fresh_addr);
assert_eq!(retries, 1, "retries incremented on fresh round");
assert_eq!(
attempts_at_hop, 1,
"attempts_at_hop reset to 1 on fresh round"
);
}
#[test]
fn advance_fresh_round_after_max_breadth_hit() {
let id = contract_id();
let tx = fresh_tx();
let mut visited = VisitedPeers::new(&tx);
let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
let unused_alt = peer_at("10.0.0.1:1001");
let mut alternatives = vec![unused_alt.clone()];
let mut retries = 0usize;
let mut attempts_at_hop = MAX_BREADTH;
let fresh_peer = peer_at("10.0.0.5:1005");
let fresh_addr = fresh_peer.socket_addr().unwrap();
let result = advance_to_next_peer_impl(
&id,
&mut visited,
&mut tried_peers,
&mut alternatives,
&mut retries,
&mut attempts_at_hop,
|_, _| vec![fresh_peer.clone()],
);
let (_, picked_addr) = result.expect("fresh round should run");
assert_eq!(picked_addr, fresh_addr);
assert_eq!(retries, 1, "went through fresh round, not breadth");
assert_eq!(attempts_at_hop, 1, "attempts_at_hop reset by fresh round");
}
#[test]
fn advance_exhausted_after_max_retries() {
let id = contract_id();
let tx = fresh_tx();
let mut visited = VisitedPeers::new(&tx);
let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
let mut alternatives: Vec<PeerKeyLocation> = Vec::new();
let mut retries = MAX_RETRIES;
let mut attempts_at_hop = 1usize;
let result = advance_to_next_peer_impl(
&id,
&mut visited,
&mut tried_peers,
&mut alternatives,
&mut retries,
&mut attempts_at_hop,
|_, _| panic!("fresh_candidates must not be called when retries == MAX"),
);
assert!(result.is_none(), "exhausted case returns None");
assert_eq!(retries, MAX_RETRIES, "retries unchanged when exhausted");
}
#[test]
fn advance_exhausted_when_fresh_round_returns_empty() {
let id = contract_id();
let tx = fresh_tx();
let mut visited = VisitedPeers::new(&tx);
let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
let mut alternatives: Vec<PeerKeyLocation> = Vec::new();
let mut retries = 0usize;
let mut attempts_at_hop = 1usize;
let result = advance_to_next_peer_impl(
&id,
&mut visited,
&mut tried_peers,
&mut alternatives,
&mut retries,
&mut attempts_at_hop,
|_, _| Vec::new(),
);
assert!(result.is_none());
assert_eq!(
retries, 1,
"retries incremented even though fresh round was empty \
— the round was 'attempted', it just found nothing"
);
}
#[test]
fn advance_fresh_round_leftover_becomes_new_alternatives() {
let id = contract_id();
let tx = fresh_tx();
let mut visited = VisitedPeers::new(&tx);
let mut tried_peers: HashSet<std::net::SocketAddr> = HashSet::new();
let mut alternatives: Vec<PeerKeyLocation> = Vec::new();
let mut retries = 0usize;
let mut attempts_at_hop = 1usize;
let p1 = peer_at("10.0.0.1:1001");
let p2 = peer_at("10.0.0.2:1002");
let p3 = peer_at("10.0.0.3:1003");
let p1_addr = p1.socket_addr().unwrap();
let p2_addr = p2.socket_addr().unwrap();
let p3_addr = p3.socket_addr().unwrap();
let result = advance_to_next_peer_impl(
&id,
&mut visited,
&mut tried_peers,
&mut alternatives,
&mut retries,
&mut attempts_at_hop,
|_, _| vec![p1.clone(), p2.clone(), p3.clone()],
);
let (_, picked) = result.expect("fresh round returns first candidate");
assert_eq!(picked, p1_addr);
assert_eq!(
alternatives.len(),
2,
"rest of fresh becomes new alternatives"
);
let alt_addrs: Vec<_> = alternatives
.iter()
.filter_map(|p| p.socket_addr())
.collect();
assert!(alt_addrs.contains(&p2_addr));
assert!(alt_addrs.contains(&p3_addr));
}
}