use super::*;
use crate::{message::Transaction, ring::PeerKeyLocation, util::Contains};
use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey};
use std::collections::HashSet;
use std::net::SocketAddr;
/// Produces a fresh peer for test fixtures by delegating to `PeerKeyLocation::random`.
fn random_peer() -> PeerKeyLocation {
    PeerKeyLocation::random()
}
/// One logged lookup: the queried contract, the skip-list addresses captured
/// for that call, and the requested `k`.
type KClosestCall = (ContractInstanceId, Vec<SocketAddr>, usize);

/// Test double that serves a fixed candidate list and logs every lookup made
/// against it.
// The lint allowance is carried over from the original field definition.
#[allow(clippy::type_complexity)]
struct TestRing {
    /// Lookups recorded so far, in call order.
    pub k_closest_calls: std::sync::Arc<tokio::sync::Mutex<Vec<KClosestCall>>>,
    /// Peers offered by lookups, in the order they are handed out.
    pub candidates: Vec<PeerKeyLocation>,
    /// Socket address of the node this ring stands in for.
    pub own_addr: SocketAddr,
}
impl TestRing {
    /// Creates a ring that serves `candidates` and identifies itself by the
    /// socket address of `own_location`.
    ///
    /// # Panics
    /// Panics if `own_location` has no socket address.
    fn new(candidates: Vec<PeerKeyLocation>, own_location: PeerKeyLocation) -> Self {
        let own_addr = own_location
            .socket_addr()
            .expect("own location must have address");
        let call_log = std::sync::Arc::new(tokio::sync::Mutex::new(Vec::new()));
        Self {
            k_closest_calls: call_log,
            candidates,
            own_addr,
        }
    }

    /// Records the lookup and returns up to `k` candidates whose address is
    /// not matched by `skip_list`.
    ///
    /// The recorded skip vector holds, in candidate order, every candidate
    /// address matched by `skip_list`, followed by `own_addr` when it is
    /// matched too and not already present. Candidates without a socket
    /// address are never filtered out.
    pub async fn k_closest_potentially_hosting(
        &self,
        instance_id: &ContractInstanceId,
        skip_list: impl Contains<SocketAddr> + Clone,
        k: usize,
    ) -> Vec<PeerKeyLocation> {
        // Materialise the addresses the skip list matches so the test can
        // inspect them later.
        let mut recorded_skips: Vec<SocketAddr> = Vec::new();
        for peer in &self.candidates {
            if let Some(addr) = peer.socket_addr() {
                if skip_list.has_element(addr) {
                    recorded_skips.push(addr);
                }
            }
        }
        if skip_list.has_element(self.own_addr) && !recorded_skips.contains(&self.own_addr) {
            recorded_skips.push(self.own_addr);
        }

        {
            let mut log = self.k_closest_calls.lock().await;
            log.push((*instance_id, recorded_skips, k));
        }

        // Walk the candidates in order, stopping as soon as `k` are selected.
        let mut selected = Vec::new();
        for peer in &self.candidates {
            if selected.len() == k {
                break;
            }
            let skipped = match peer.socket_addr() {
                Some(addr) => skip_list.has_element(addr),
                None => false,
            };
            if !skipped {
                selected.push(peer.clone());
            }
        }
        selected
    }
}
/// Drives the `TestRing` mock through three lookups with a growing skip list
/// and checks both what was logged and which candidates came back.
///
/// Only `start_op` and the mock are invoked here.
#[tokio::test]
async fn test_subscription_routing_calls_k_closest_with_skip_list() {
    let contract_key = ContractKey::from_id_and_code(
        ContractInstanceId::new([10u8; 32]),
        CodeHash::new([11u8; 32]),
    );
    let wanted_id = *contract_key.id();

    let first = random_peer();
    let second = random_peer();
    let third = random_peer();
    let me = random_peer();
    let first_addr = first.socket_addr().expect("peer1 must have address");
    let second_addr = second.socket_addr().expect("peer2 must have address");
    let my_addr = me.socket_addr().expect("own location must have address");

    let ring = TestRing::new(
        vec![first.clone(), second.clone(), third.clone()],
        me.clone(),
    );

    let op = start_op(wanted_id, false);
    assert!(matches!(op.state, SubscribeState::PrepareRequest(_)));

    // Round 1: only our own address is skipped.
    let own_only: HashSet<SocketAddr> = std::iter::once(my_addr).collect();
    let round_one = ring
        .k_closest_potentially_hosting(&wanted_id, &own_only, 3)
        .await;
    {
        let log = ring.k_closest_calls.lock().await;
        assert_eq!(
            log.len(),
            1,
            "Should have called k_closest_potentially_hosting once"
        );
        let (queried, skipped, requested) = &log[0];
        assert_eq!(*queried, wanted_id, "Should query for correct contract");
        assert_eq!(skipped.len(), 1, "Initial call should only skip own peer");
        assert_eq!(
            skipped[0], my_addr,
            "Initial skip list should contain own address"
        );
        assert_eq!(*requested, 3, "Should request 3 candidates");
    }
    assert_eq!(
        round_one.len(),
        3,
        "Should return all 3 candidates initially"
    );
    assert_eq!(round_one[0], first, "Should return peer1 first");

    // Round 2: the first candidate is treated as failed.
    let mut failed: HashSet<SocketAddr> = HashSet::new();
    failed.insert(first_addr);
    let round_two = ring
        .k_closest_potentially_hosting(&wanted_id, &failed, 3)
        .await;
    {
        let log = ring.k_closest_calls.lock().await;
        assert_eq!(
            log.len(),
            2,
            "Should have called k_closest_potentially_hosting twice"
        );
        let (queried, skipped, requested) = &log[1];
        assert_eq!(
            *queried, wanted_id,
            "Second call should query for correct contract"
        );
        assert_eq!(skipped.len(), 1, "Skip list should contain 1 failed peer");
        assert_eq!(
            skipped[0], first_addr,
            "Skip list should contain the failed peer's address"
        );
        assert_eq!(*requested, 3, "Should still request 3 candidates");
    }
    assert!(
        round_two
            .iter()
            .all(|p| p.socket_addr() != Some(first_addr)),
        "Failed peer should be excluded from candidates"
    );
    assert!(!round_two.is_empty(), "Should find alternative candidates");
    assert_eq!(
        round_two[0], second,
        "Should return peer2 as first alternative"
    );

    // Round 3: the second candidate has failed as well.
    failed.insert(second_addr);
    let round_three = ring
        .k_closest_potentially_hosting(&wanted_id, &failed, 3)
        .await;
    {
        let log = ring.k_closest_calls.lock().await;
        assert_eq!(
            log.len(),
            3,
            "Should have called k_closest_potentially_hosting three times"
        );
        assert_eq!(
            log[2].1.len(),
            2,
            "Final skip list should contain 2 failed peers"
        );
    }
    assert_eq!(round_three.len(), 1, "Should have 1 remaining candidate");
    assert_eq!(
        round_three[0], third,
        "Should return peer3 as final option"
    );
    assert!(
        round_three.iter().all(|p| {
            let addr = p.socket_addr();
            addr != Some(first_addr) && addr != Some(second_addr)
        }),
        "Failed peers should be excluded"
    );
}
/// Repeats the initial / seek / retry lookup sequence against the `TestRing`
/// mock, checking the logged arguments and the returned candidates each time.
///
/// NOTE(review): despite the name, the only calls made here are `start_op`
/// and the mock's own lookup method.
#[tokio::test]
async fn test_subscription_production_code_paths_use_k_closest() {
    let contract_key = ContractKey::from_id_and_code(
        ContractInstanceId::new([11u8; 32]),
        CodeHash::new([12u8; 32]),
    );
    let wanted_id = *contract_key.id();

    let first = random_peer();
    let second = random_peer();
    let third = random_peer();
    let me = random_peer();
    let first_addr = first.socket_addr().expect("peer1 must have address");
    let second_addr = second.socket_addr().expect("peer2 must have address");
    let my_addr = me.socket_addr().expect("own location must have address");

    let ring = TestRing::new(
        vec![first.clone(), second.clone(), third.clone()],
        me.clone(),
    );

    let op = start_op(wanted_id, false);
    assert!(matches!(op.state, SubscribeState::PrepareRequest(_)));

    // Initial lookup: skip only ourselves.
    let own_only: HashSet<SocketAddr> = std::iter::once(my_addr).collect();
    let initial = ring
        .k_closest_potentially_hosting(&wanted_id, &own_only, 3)
        .await;
    {
        let log = ring.k_closest_calls.lock().await;
        assert_eq!(log.len(), 1, "Should have recorded initial call");
        let (queried, skipped, requested) = &log[0];
        assert_eq!(*queried, wanted_id, "Should use correct contract key");
        assert_eq!(skipped.len(), 1, "Should skip own peer initially");
        assert_eq!(skipped[0], my_addr, "Skip list should contain own address");
        assert_eq!(*requested, 3, "Should request 3 candidates");
    }
    assert_eq!(initial.len(), 3, "Should return all 3 candidates");
    assert_eq!(initial[0], first, "Should return peer1 first");

    // Seek after the first candidate failed.
    let mut failed: HashSet<SocketAddr> = HashSet::new();
    failed.insert(first_addr);
    let seek = ring
        .k_closest_potentially_hosting(&wanted_id, &failed, 3)
        .await;
    {
        let log = ring.k_closest_calls.lock().await;
        assert_eq!(log.len(), 2, "Should have recorded second call");
        let (queried, skipped, requested) = &log[1];
        assert_eq!(*queried, wanted_id, "Should use correct contract key");
        assert_eq!(
            skipped.len(),
            1,
            "Should include failed peer in skip list"
        );
        assert_eq!(
            skipped[0], first_addr,
            "Should skip the failed peer's address"
        );
        assert_eq!(*requested, 3, "Should still request 3 candidates");
    }
    assert!(
        seek.iter().all(|p| p.socket_addr() != Some(first_addr)),
        "Should exclude failed peer"
    );
    assert_eq!(seek.len(), 2, "Should return remaining 2 candidates");
    assert_eq!(seek[0], second, "Should return peer2 first");

    // Retry after the second candidate failed too.
    failed.insert(second_addr);
    let retry = ring
        .k_closest_potentially_hosting(&wanted_id, &failed, 3)
        .await;
    {
        let log = ring.k_closest_calls.lock().await;
        assert_eq!(log.len(), 3, "Should have recorded third call");
        let (queried, skipped, requested) = &log[2];
        assert_eq!(*queried, wanted_id, "Should use correct contract key");
        assert_eq!(
            skipped.len(),
            2,
            "Should include both failed peers in skip list"
        );
        assert!(skipped.contains(&first_addr), "Should skip peer1");
        assert!(skipped.contains(&second_addr), "Should skip peer2");
        assert_eq!(*requested, 3, "Should still request 3 candidates");
    }
    assert!(
        retry.iter().all(|p| {
            let addr = p.socket_addr();
            addr != Some(first_addr) && addr != Some(second_addr)
        }),
        "Should exclude both failed peers"
    );
    assert_eq!(retry.len(), 1, "Should return final 1 candidate");
    assert_eq!(retry[0], third, "Should return peer3 as last option");
}
/// Checks the call log of the `TestRing` mock for a first lookup (own address
/// skipped) and a follow-up lookup (one failed candidate skipped), then
/// confirms that a hand-built `SubscribeOp` in `AwaitingResponse` state
/// matches that variant.
#[tokio::test]
async fn test_subscription_validates_k_closest_usage() {
    let contract_key =
        ContractKey::from_id_and_code(ContractInstanceId::new([1u8; 32]), CodeHash::new([2u8; 32]));
    let transaction_id = Transaction::new::<SubscribeMsg>();
    let test_ring = TestRing::new(
        vec![random_peer(), random_peer(), random_peer()],
        random_peer(),
    );

    // First attempt: only our own address is in the skip list.
    {
        let mut initial_skip: HashSet<SocketAddr> = HashSet::new();
        initial_skip.insert(test_ring.own_addr);
        let _candidates = test_ring
            .k_closest_potentially_hosting(contract_key.id(), &initial_skip, 3)
            .await;
        let calls = test_ring.k_closest_calls.lock().await;
        assert_eq!(calls.len(), 1, "Should record the call");
        let (instance_id, skip_list, k) = &calls[0];
        assert_eq!(*instance_id, *contract_key.id());
        assert_eq!(
            skip_list.len(),
            1,
            "First attempt should only skip own peer"
        );
        assert_eq!(
            skip_list[0], test_ring.own_addr,
            "Skip list should contain own address"
        );
        assert_eq!(*k, 3, "Uses k=3 as per fix");
    }

    // Follow-up attempt: the log is reset, then the first candidate is skipped.
    {
        test_ring.k_closest_calls.lock().await.clear();
        let failed_addr = test_ring.candidates[0]
            .socket_addr()
            .expect("candidate must have address");
        let skip_list = [failed_addr];
        let candidates = test_ring
            .k_closest_potentially_hosting(contract_key.id(), &skip_list[..], 3)
            .await;
        let calls = test_ring.k_closest_calls.lock().await;
        // Guard the index below: without this a missing call shows up as an
        // opaque out-of-bounds panic and an extra call goes unnoticed.
        assert_eq!(
            calls.len(),
            1,
            "Should record exactly one call after clearing the log"
        );
        let (instance_id, used_skip_list, k) = &calls[0];
        assert_eq!(*instance_id, *contract_key.id());
        assert_eq!(used_skip_list.len(), 1, "Skip list includes failed peer");
        assert_eq!(used_skip_list[0], failed_addr);
        assert_eq!(*k, 3);
        assert!(
            !candidates
                .iter()
                .any(|c| c.socket_addr() == Some(failed_addr)),
            "Failed peer must be excluded"
        );
    }

    // Construct an op directly in the awaiting-response state.
    {
        let op = SubscribeOp {
            id: transaction_id,
            state: SubscribeState::AwaitingResponse(super::AwaitingResponseData {
                next_hop: None,
                instance_id: *contract_key.id(),
                retries: 0,
                current_hop: 0,
                tried_peers: std::collections::HashSet::new(),
                alternatives: Vec::new(),
                attempts_at_hop: 0,
                visited: crate::operations::VisitedPeers::new(&transaction_id),
            }),
            requester_addr: None,
            requester_pub_key: None,
            is_renewal: false,
            stats: None,
            ack_received: false,
            speculative_paths: 0,
        };
        assert!(matches!(op.state, SubscribeState::AwaitingResponse(_)));
    }
}
/// `start_op` must yield a `PrepareRequest` state carrying the requested
/// instance id and the operation's own transaction id, with no requester set.
#[test]
fn test_start_op_creates_prepare_request_state() {
    let wanted = ContractInstanceId::new([42u8; 32]);
    let op = start_op(wanted, false);

    let state_matches = matches!(
        op.state,
        SubscribeState::PrepareRequest(ref prepared)
            if prepared.instance_id == wanted && prepared.id == op.id
    );
    assert!(
        state_matches,
        "start_op should create PrepareRequest state with correct instance_id"
    );

    assert_eq!(
        op.requester_addr, None,
        "Local subscription should have no requester address"
    );

    // The transaction id must render to a non-empty string.
    let rendered_id = format!("{}", op.id);
    assert_ne!(rendered_id, "", "Transaction ID should be generated");
}
/// `start_op_with_id` must reuse the caller-supplied transaction both on the
/// operation and inside its `PrepareRequest` state.
#[test]
fn test_start_op_with_id_uses_provided_transaction() {
    let wanted = ContractInstanceId::new([99u8; 32]);
    let supplied_tx = Transaction::new::<SubscribeMsg>();

    let op = start_op_with_id(wanted, supplied_tx, false);

    assert_eq!(
        op.id, supplied_tx,
        "start_op_with_id should use provided transaction ID"
    );

    let state_matches = matches!(
        op.state,
        SubscribeState::PrepareRequest(ref prepared)
            if prepared.instance_id == wanted && prepared.id == supplied_tx
    );
    assert!(
        state_matches,
        "PrepareRequest state should have provided transaction ID"
    );

    assert_eq!(
        op.requester_addr, None,
        "Local subscription should have no requester address"
    );
}
/// Checks `finalized`, `is_completed`, `get_next_hop_addr` and
/// `to_host_result` on ops built by hand in the `PrepareRequest`,
/// `AwaitingResponse` and `Completed` states.
#[test]
fn test_subscribe_op_state_lifecycle() {
    let instance_id = ContractInstanceId::new([7u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([8u8; 32]));
    let tx_id = Transaction::new::<SubscribeMsg>();
    let hop: SocketAddr = "127.0.0.1:8080".parse().unwrap();

    // Every op in this test differs only in its state.
    let op_in_state = |state: SubscribeState| SubscribeOp {
        id: tx_id,
        state,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    let preparing = op_in_state(SubscribeState::PrepareRequest(super::PrepareRequestData {
        id: tx_id,
        instance_id,
        is_renewal: false,
    }));
    assert!(
        !preparing.finalized(),
        "PrepareRequest should not be finalized"
    );
    assert!(
        !preparing.is_completed(),
        "PrepareRequest should not be completed"
    );

    let awaiting = op_in_state(SubscribeState::AwaitingResponse(
        super::AwaitingResponseData {
            next_hop: Some(hop),
            instance_id,
            retries: 0,
            current_hop: 0,
            tried_peers: std::collections::HashSet::new(),
            alternatives: Vec::new(),
            attempts_at_hop: 0,
            visited: crate::operations::VisitedPeers::new(&tx_id),
        },
    ));
    assert!(
        !awaiting.finalized(),
        "AwaitingResponse should not be finalized"
    );
    assert!(
        !awaiting.is_completed(),
        "AwaitingResponse should not be completed"
    );
    assert_eq!(
        awaiting.get_next_hop_addr(),
        Some(hop),
        "Should return next hop address for routing"
    );

    let done = op_in_state(SubscribeState::Completed(super::CompletedData {
        key: contract_key,
    }));
    assert!(done.finalized(), "Completed state should be finalized");
    assert!(done.is_completed(), "Completed state should be completed");
    let host_result = done.to_host_result();
    assert!(
        host_result.is_ok(),
        "Completed operation should return Ok result"
    );
}
/// An op in the `Failed` state must convert to an error whose debug output
/// mentions that the subscribe did not finish successfully.
#[test]
fn test_subscribe_op_failed_state_returns_error() {
    let failed_op = SubscribeOp {
        id: Transaction::new::<SubscribeMsg>(),
        state: SubscribeState::Failed,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    match failed_op.to_host_result() {
        Err(err) => {
            let rendered = format!("{:?}", err);
            assert!(
                rendered.contains("subscribe didn't finish successfully"),
                "Error should indicate subscription failure"
            );
        }
        Ok(_) => panic!("Failed subscription should return error"),
    }
}
/// A `Completed` op without a requester must be finalized and completed, and
/// its host result must debug-print as a successful `SubscribeResponse`.
#[test]
fn test_local_subscription_completion_state() {
    let instance_id = ContractInstanceId::new([15u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([16u8; 32]));
    let tx_id = Transaction::new::<SubscribeMsg>();

    let completed = SubscribeState::Completed(super::CompletedData { key: contract_key });
    let op = SubscribeOp {
        id: tx_id,
        state: completed,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    assert!(op.finalized(), "Local subscription should be finalized");
    assert!(op.is_completed(), "Local subscription should be completed");

    match op.to_host_result() {
        Ok(host_response) => {
            let rendered = format!("{:?}", host_response);
            assert!(
                rendered.contains("SubscribeResponse"),
                "Should return SubscribeResponse"
            );
            assert!(
                rendered.contains("subscribed: true"),
                "Should indicate successful subscription"
            );
        }
        Err(_) => panic!("Local subscription should succeed"),
    }
}
/// `SubscribeOp::is_renewal` must mirror the `is_renewal` field.
#[test]
fn test_is_renewal_flag() {
    let instance_id = ContractInstanceId::new([20u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([21u8; 32]));
    let tx = Transaction::new::<SubscribeMsg>();

    // Both ops are identical apart from the flag under test.
    let completed_op = |is_renewal: bool| SubscribeOp {
        id: tx,
        state: SubscribeState::Completed(super::CompletedData { key: contract_key }),
        requester_addr: None,
        requester_pub_key: None,
        is_renewal,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    let renewal = completed_op(true);
    assert!(renewal.is_renewal());

    let client_initiated = completed_op(false);
    assert!(!client_initiated.is_renewal());
}
/// `OpEnum::is_subscription_renewal` must follow the wrapped subscribe op's
/// `is_renewal` field.
#[test]
fn test_op_enum_is_subscription_renewal() {
    use crate::operations::OpEnum;

    let instance_id = ContractInstanceId::new([22u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([23u8; 32]));
    let tx = Transaction::new::<SubscribeMsg>();

    // Wrap a completed subscribe op that differs only in the renewal flag.
    let wrapped = |is_renewal: bool| {
        OpEnum::Subscribe(SubscribeOp {
            id: tx,
            state: SubscribeState::Completed(super::CompletedData { key: contract_key }),
            requester_addr: None,
            requester_pub_key: None,
            is_renewal,
            stats: None,
            ack_received: false,
            speculative_paths: 0,
        })
    };

    let renewal = wrapped(true);
    assert!(renewal.is_subscription_renewal());

    let non_renewal = wrapped(false);
    assert!(!non_renewal.is_subscription_renewal());
}
/// Covers `outcome()` for failed ops with stats, completed ops with stats and
/// failed ops without stats, plus `failure_routing_info()` for a failed op
/// with stats.
#[test]
fn test_subscribe_failure_outcome() {
    use crate::operations::OpOutcome;
    use crate::ring::{Location, PeerKeyLocation};

    let tx = Transaction::new::<SubscribeMsg>();
    let target_peer = PeerKeyLocation::random();
    let contract_location = Location::random();

    // Builders for the pieces that repeat across the four ops below.
    let routing_stats = || super::SubscribeStats {
        target_peer: target_peer.clone(),
        contract_location,
    };
    let op_from = |state: SubscribeState, stats: Option<super::SubscribeStats>| SubscribeOp {
        id: tx,
        state,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats,
        ack_received: false,
        speculative_paths: 0,
    };

    // Failed + stats => ContractOpFailure.
    let failed_with_stats = op_from(SubscribeState::Failed, Some(routing_stats()));
    match failed_with_stats.outcome() {
        OpOutcome::ContractOpFailure {
            target_peer: reported_peer,
            contract_location: reported_location,
        } => {
            assert_eq!(*reported_peer, target_peer);
            assert_eq!(reported_location, contract_location);
        }
        OpOutcome::ContractOpSuccess { .. }
        | OpOutcome::ContractOpSuccessUntimed { .. }
        | OpOutcome::Incomplete
        | OpOutcome::Irrelevant => {
            panic!("Expected ContractOpFailure for non-finalized op with stats")
        }
    }

    // Completed + stats => ContractOpSuccessUntimed.
    let instance_id = ContractInstanceId::new([30u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([31u8; 32]));
    let completed_with_stats = op_from(
        SubscribeState::Completed(super::CompletedData { key: contract_key }),
        Some(routing_stats()),
    );
    match completed_with_stats.outcome() {
        OpOutcome::ContractOpSuccessUntimed {
            target_peer: reported_peer,
            contract_location: reported_location,
        } => {
            assert_eq!(*reported_peer, target_peer);
            assert_eq!(reported_location, contract_location);
        }
        OpOutcome::ContractOpSuccess { .. }
        | OpOutcome::ContractOpFailure { .. }
        | OpOutcome::Incomplete
        | OpOutcome::Irrelevant => {
            panic!("Expected ContractOpSuccessUntimed for completed subscribe with stats")
        }
    }

    // Failed without stats => Incomplete.
    let failed_without_stats = op_from(SubscribeState::Failed, None);
    assert!(
        matches!(failed_without_stats.outcome(), OpOutcome::Incomplete),
        "Non-finalized op without stats should return Incomplete"
    );

    // Routing info is available for a failed op that carries stats.
    let failed_for_info = op_from(SubscribeState::Failed, Some(routing_stats()));
    let (info_peer, info_location) = failed_for_info
        .failure_routing_info()
        .expect("Should have routing info");
    assert_eq!(info_peer, target_peer);
    assert_eq!(info_location, contract_location);
}
/// A completed op that carries stats must report `ContractOpSuccessUntimed`
/// with the same peer and location.
#[test]
fn test_subscribe_outcome_success_untimed_with_stats() {
    use crate::operations::OpOutcome;
    use crate::ring::{Location, PeerKeyLocation};

    let instance_id = ContractInstanceId::new([40u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([41u8; 32]));
    let target_peer = PeerKeyLocation::random();
    let contract_location = Location::random();

    let op = SubscribeOp {
        id: Transaction::new::<SubscribeMsg>(),
        state: SubscribeState::Completed(super::CompletedData { key: contract_key }),
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: Some(super::SubscribeStats {
            target_peer: target_peer.clone(),
            contract_location,
        }),
        ack_received: false,
        speculative_paths: 0,
    };

    let reported = op.outcome();
    match reported {
        OpOutcome::ContractOpSuccessUntimed {
            target_peer: reported_peer,
            contract_location: reported_location,
        } => {
            assert_eq!(*reported_peer, target_peer);
            assert_eq!(reported_location, contract_location);
        }
        other @ OpOutcome::ContractOpSuccess { .. }
        | other @ OpOutcome::ContractOpFailure { .. }
        | other @ OpOutcome::Incomplete
        | other @ OpOutcome::Irrelevant => {
            panic!("Expected ContractOpSuccessUntimed, got {other:?}")
        }
    }
}
/// A completed op without stats must report `OpOutcome::Irrelevant`.
#[test]
fn test_subscribe_outcome_irrelevant_without_stats() {
    use crate::operations::OpOutcome;

    let instance_id = ContractInstanceId::new([42u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([43u8; 32]));
    let completed = SubscribeState::Completed(super::CompletedData { key: contract_key });

    let op = SubscribeOp {
        id: Transaction::new::<SubscribeMsg>(),
        state: completed,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    let is_irrelevant = matches!(op.outcome(), OpOutcome::Irrelevant);
    assert!(is_irrelevant);
}
/// A failed op that carries stats must report `ContractOpFailure` with the
/// same peer and location.
#[test]
fn test_subscribe_outcome_failure_with_stats() {
    use crate::operations::OpOutcome;
    use crate::ring::{Location, PeerKeyLocation};

    let target_peer = PeerKeyLocation::random();
    let contract_location = Location::random();

    let op = SubscribeOp {
        id: Transaction::new::<SubscribeMsg>(),
        state: SubscribeState::Failed,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: Some(super::SubscribeStats {
            target_peer: target_peer.clone(),
            contract_location,
        }),
        ack_received: false,
        speculative_paths: 0,
    };

    let reported = op.outcome();
    match reported {
        OpOutcome::ContractOpFailure {
            target_peer: reported_peer,
            contract_location: reported_location,
        } => {
            assert_eq!(*reported_peer, target_peer);
            assert_eq!(reported_location, contract_location);
        }
        other @ OpOutcome::ContractOpSuccess { .. }
        | other @ OpOutcome::ContractOpSuccessUntimed { .. }
        | other @ OpOutcome::Incomplete
        | other @ OpOutcome::Irrelevant => panic!("Expected ContractOpFailure, got {other:?}"),
    }
}
/// A failed op without stats must report `OpOutcome::Incomplete`.
#[test]
fn test_subscribe_outcome_incomplete_without_stats() {
    use crate::operations::OpOutcome;

    let tx = Transaction::new::<SubscribeMsg>();
    let op = SubscribeOp {
        id: tx,
        state: SubscribeState::Failed,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    let is_incomplete = matches!(op.outcome(), OpOutcome::Incomplete);
    assert!(is_incomplete);
}
/// Follows one op through three stages and checks `outcome()` at each:
/// awaiting without stats, awaiting with stats, and completed with stats.
#[test]
fn test_subscribe_stats_lifecycle() {
    let target_peer = PeerKeyLocation::random();
    let contract_location = Location::random();
    let tx = Transaction::new::<SubscribeMsg>();
    let instance_id = ContractInstanceId::new([50u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([51u8; 32]));

    let awaiting = SubscribeState::AwaitingResponse(super::AwaitingResponseData {
        next_hop: None,
        instance_id,
        retries: 0,
        current_hop: 0,
        tried_peers: std::collections::HashSet::new(),
        alternatives: Vec::new(),
        attempts_at_hop: 0,
        visited: crate::operations::VisitedPeers::new(&tx),
    });
    let mut op = SubscribeOp {
        id: tx,
        state: awaiting,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    // Stage 1: no stats yet.
    assert!(matches!(op.outcome(), OpOutcome::Incomplete));

    // Stage 2: stats attached while still awaiting a response.
    op.stats = Some(super::SubscribeStats {
        target_peer: target_peer.clone(),
        contract_location,
    });
    match op.outcome() {
        OpOutcome::ContractOpFailure {
            target_peer: reported_peer,
            contract_location: reported_location,
        } => {
            assert_eq!(*reported_peer, target_peer);
            assert_eq!(reported_location, contract_location);
        }
        OpOutcome::ContractOpSuccess { .. }
        | OpOutcome::ContractOpSuccessUntimed { .. }
        | OpOutcome::Incomplete
        | OpOutcome::Irrelevant => {
            panic!("Expected ContractOpFailure for in-progress subscribe with stats")
        }
    }

    // Stage 3: the op completes.
    op.state = SubscribeState::Completed(super::CompletedData { key: contract_key });
    match op.outcome() {
        OpOutcome::ContractOpSuccessUntimed {
            target_peer: reported_peer,
            contract_location: reported_location,
        } => {
            assert_eq!(*reported_peer, target_peer);
            assert_eq!(reported_location, contract_location);
        }
        OpOutcome::ContractOpSuccess { .. }
        | OpOutcome::ContractOpFailure { .. }
        | OpOutcome::Incomplete
        | OpOutcome::Irrelevant => {
            panic!("Expected ContractOpSuccessUntimed for completed subscribe with stats")
        }
    }
}
/// A completed renewal op that carries stats must still report
/// `ContractOpSuccessUntimed`.
#[test]
fn test_subscribe_renewal_reports_outcome() {
    let target_peer = PeerKeyLocation::random();
    let contract_location = Location::random();
    let instance_id = ContractInstanceId::new([60u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([61u8; 32]));

    let renewal_op = SubscribeOp {
        id: Transaction::new::<SubscribeMsg>(),
        state: SubscribeState::Completed(super::CompletedData { key: contract_key }),
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: true,
        stats: Some(super::SubscribeStats {
            target_peer: target_peer.clone(),
            contract_location,
        }),
        ack_received: false,
        speculative_paths: 0,
    };

    let reported = renewal_op.outcome();
    match reported {
        OpOutcome::ContractOpSuccessUntimed {
            target_peer: reported_peer,
            contract_location: reported_location,
        } => {
            assert_eq!(*reported_peer, target_peer);
            assert_eq!(reported_location, contract_location);
        }
        OpOutcome::ContractOpSuccess { .. }
        | OpOutcome::ContractOpFailure { .. }
        | OpOutcome::Incomplete
        | OpOutcome::Irrelevant => {
            panic!("Expected ContractOpSuccessUntimed for renewal subscribe")
        }
    }
}
/// `create_unsubscribe_op` must produce a non-renewal op in
/// `AwaitingResponse` state aimed at the given address.
#[test]
fn test_create_unsubscribe_op() {
    let instance_id = ContractInstanceId::new([77u8; 32]);
    let tx = Transaction::new::<SubscribeMsg>();
    let destination: SocketAddr = "10.0.0.1:9000".parse().unwrap();

    let unsubscribe = create_unsubscribe_op(instance_id, tx, destination);

    assert_eq!(unsubscribe.id, tx);
    assert!(!unsubscribe.is_renewal);

    match &unsubscribe.state {
        SubscribeState::AwaitingResponse(awaiting) => {
            assert_eq!(awaiting.next_hop, Some(destination));
            assert_eq!(awaiting.instance_id, instance_id);
        }
        other @ SubscribeState::PrepareRequest(_)
        | other @ SubscribeState::Completed(_)
        | other @ SubscribeState::Failed => {
            panic!("Expected AwaitingResponse state, got {:?}", other)
        }
    }

    let next_hop = unsubscribe.get_next_hop_addr();
    assert_eq!(next_hop, Some(destination));
}
/// With a requester address, `SubscribeOp::not_found_result` must return
/// `SendAndComplete` carrying a `Subscribe` response with `NotFound`,
/// addressed to that requester and without stream data.
#[test]
fn test_not_found_result_intermediate_node_sends_notfound() {
    use crate::message::{NetMessage, NetMessageV1};
    use crate::operations::OperationResult;

    let tx = Transaction::new::<SubscribeMsg>();
    let instance_id = ContractInstanceId::new([42u8; 32]);
    let requester: SocketAddr = "10.0.0.1:9000".parse().unwrap();

    let outcome = SubscribeOp::not_found_result(tx, instance_id, Some(requester), "HTL exhausted");
    match outcome {
        Ok(OperationResult::SendAndComplete {
            msg: outbound,
            next_hop: destination,
            stream_data: payload,
        }) => {
            assert_eq!(
                destination,
                Some(requester),
                "NotFound should be sent to requester"
            );
            assert!(payload.is_none());

            let NetMessage::V1(inner) = outbound;
            match inner {
                NetMessageV1::Subscribe(SubscribeMsg::Response {
                    id: resp_tx,
                    instance_id: resp_instance_id,
                    result: resp_result,
                }) => {
                    assert_eq!(resp_tx, tx);
                    assert_eq!(resp_instance_id, instance_id);
                    assert!(
                        matches!(resp_result, SubscribeMsgResult::NotFound),
                        "Expected NotFound, got {:?}",
                        resp_result
                    );
                }
                // Arms are spelled out per variant rather than using a wildcard.
                other @ NetMessageV1::Connect(_)
                | other @ NetMessageV1::Put(_)
                | other @ NetMessageV1::Get(_)
                | other @ NetMessageV1::Subscribe(_)
                | other @ NetMessageV1::Update(_)
                | other @ NetMessageV1::Aborted(_)
                | other @ NetMessageV1::NeighborHosting { .. }
                | other @ NetMessageV1::InterestSync { .. }
                | other @ NetMessageV1::ReadyState { .. } => {
                    panic!("Expected Subscribe Response, got {:?}", other)
                }
            }
        }
        other => panic!(
            "Expected Ok(SendAndComplete), got {:?}",
            other.as_ref().map(|_| "Ok(other)").unwrap_or("Err")
        ),
    }
}
/// Without a requester address, `SubscribeOp::not_found_result` must return
/// `RingError::NoHostingPeers` for the queried contract.
#[test]
fn test_not_found_result_originator_returns_error() {
    use crate::operations::OpError;
    use crate::ring::RingError;

    let tx = Transaction::new::<SubscribeMsg>();
    let instance_id = ContractInstanceId::new([42u8; 32]);

    let outcome = SubscribeOp::not_found_result(tx, instance_id, None, "no closer peers");
    match outcome {
        Ok(_) => panic!("Originator should get an error, not a message"),
        Err(OpError::RingError(RingError::NoHostingPeers(reported_id))) => {
            assert_eq!(
                reported_id, instance_id,
                "Error should reference the correct contract"
            );
        }
        Err(other) => panic!("Expected NoHostingPeers error, got: {other}"),
    }
}
// NOTE(review): this test has an empty body and is skipped via `#[ignore]`,
// so it currently verifies nothing. Presumably a placeholder for a
// state-guard check around forwarding NotFound on abort — confirm the intent,
// then either implement it or give `#[ignore]` a reason string.
#[ignore]
#[test]
fn test_should_forward_not_found_on_abort_state_guard() {
}
/// Builds an `AwaitingResponseData` by hand and reads back the fields that
/// track the current target, remaining alternatives and attempt counters.
#[test]
fn test_awaiting_response_data_tracks_alternatives() {
    let instance_id = ContractInstanceId::new([80u8; 32]);
    let tx = Transaction::new::<SubscribeMsg>();
    let current = random_peer();
    let backup_a = random_peer();
    let backup_b = random_peer();
    let current_addr = current.socket_addr().unwrap();

    let already_tried: HashSet<SocketAddr> = std::iter::once(current_addr).collect();

    let data = AwaitingResponseData {
        next_hop: Some(current_addr),
        instance_id,
        retries: 0,
        current_hop: 10,
        tried_peers: already_tried,
        alternatives: vec![backup_a.clone(), backup_b.clone()],
        attempts_at_hop: 1,
        visited: crate::operations::VisitedPeers::new(&tx),
    };

    assert_eq!(
        data.alternatives.len(),
        2,
        "Should store remaining candidates"
    );
    assert_eq!(data.attempts_at_hop, 1, "First attempt at this hop");
    assert_eq!(data.retries, 0, "No retry rounds yet");
    assert_eq!(
        data.next_hop,
        Some(current_addr),
        "Target is first candidate"
    );
}
/// Replays by hand the bookkeeping for moving to the next alternative at the
/// same hop, then checks the resulting `AwaitingResponseData`.
///
/// The transition is simulated inside the test; no retry function is called.
#[test]
fn test_retry_phase1_state_transition() {
    let instance_id = ContractInstanceId::new([81u8; 32]);
    let tx = Transaction::new::<SubscribeMsg>();
    let peer1 = random_peer();
    let peer2 = random_peer();
    let peer3 = random_peer();
    let peer1_addr = peer1.socket_addr().unwrap();
    let peer2_addr = peer2.socket_addr().unwrap();

    let mut tried: HashSet<SocketAddr> = HashSet::new();
    tried.insert(peer1_addr);
    let mut remaining = vec![peer2.clone(), peer3.clone()];
    let attempts_so_far: usize = 1;

    // Precondition for taking another alternative at this hop.
    assert!(!remaining.is_empty() && attempts_so_far < MAX_BREADTH);

    // Pop the next alternative and mark it as tried.
    let chosen = remaining.remove(0);
    let chosen_addr = chosen.socket_addr().unwrap();
    tried.insert(chosen_addr);

    let next_state = AwaitingResponseData {
        next_hop: Some(chosen_addr),
        instance_id,
        retries: 0,
        current_hop: 10,
        tried_peers: tried.clone(),
        alternatives: remaining.clone(),
        attempts_at_hop: attempts_so_far + 1,
        visited: crate::operations::VisitedPeers::new(&tx),
    };

    assert_eq!(next_state.next_hop, Some(peer2_addr), "Should target peer2");
    assert_eq!(
        next_state.alternatives.len(),
        1,
        "One alternative remaining (peer3)"
    );
    assert_eq!(next_state.attempts_at_hop, 2, "Second attempt at this hop");
    assert_eq!(next_state.retries, 0, "Still in same retry round");
    assert!(
        next_state.tried_peers.contains(&peer1_addr),
        "peer1 still in tried set"
    );
    assert!(
        next_state.tried_peers.contains(&peer2_addr),
        "peer2 added to tried set"
    );
}
/// With one alternative left but `attempts_at_hop` equal to `MAX_BREADTH`,
/// the "try another alternative" condition must be false.
///
/// NOTE(review): the condition is re-stated inside the test rather than taken
/// from the code under test, so this cannot fail as written.
#[test]
fn test_retry_phase1_boundary_max_breadth() {
    let remaining = [random_peer()];
    let attempts_at_hop = MAX_BREADTH;

    let may_try_alternative = !remaining.is_empty() && attempts_at_hop < MAX_BREADTH;
    assert!(
        !may_try_alternative,
        "Phase 1 should NOT trigger when attempts_at_hop == MAX_BREADTH"
    );
}
/// Replays by hand the bookkeeping for starting a new retry round with fresh
/// candidates, then checks the resulting `AwaitingResponseData`.
///
/// The transition is simulated inside the test; no retry function is called.
#[test]
fn test_retry_phase2_state_transition() {
    let instance_id = ContractInstanceId::new([82u8; 32]);
    let tx = Transaction::new::<SubscribeMsg>();
    let new_peer = random_peer();
    let new_peer_addr = new_peer.socket_addr().unwrap();
    let extra_peer = random_peer();

    let rounds_so_far: usize = 3;
    assert!(rounds_so_far < MAX_RETRIES);

    // Take the first fresh candidate as the new target.
    let mut fresh = vec![new_peer.clone(), extra_peer.clone()];
    let chosen = fresh.remove(0);
    let chosen_addr = chosen.socket_addr().unwrap();
    let tried_this_round: HashSet<SocketAddr> = std::iter::once(chosen_addr).collect();

    let next_state = AwaitingResponseData {
        next_hop: Some(chosen_addr),
        instance_id,
        retries: rounds_so_far + 1,
        current_hop: 8,
        tried_peers: tried_this_round.clone(),
        alternatives: fresh.clone(),
        attempts_at_hop: 1,
        visited: crate::operations::VisitedPeers::new(&tx),
    };

    assert_eq!(
        next_state.next_hop,
        Some(new_peer_addr),
        "Should target new candidate"
    );
    assert_eq!(next_state.retries, 4, "Retry counter incremented");
    assert_eq!(
        next_state.alternatives.len(),
        1,
        "Remaining new candidates stored"
    );
    assert_eq!(next_state.attempts_at_hop, 1, "Reset to 1 for new round");
    assert_eq!(
        next_state.tried_peers.len(),
        1,
        "Fresh tried_peers with only new target"
    );
    assert!(next_state.tried_peers.contains(&new_peer_addr));
}
/// With `retries` equal to `MAX_RETRIES`, the "start another retry round"
/// condition must be false.
///
/// NOTE(review): the condition is re-stated inside the test rather than taken
/// from the code under test, so this cannot fail as written.
#[test]
fn test_retry_phase2_boundary_max_retries() {
    let retries = MAX_RETRIES;

    let may_start_new_round = retries < MAX_RETRIES;
    assert!(
        !may_start_new_round,
        "Phase 2 should NOT trigger when retries == MAX_RETRIES"
    );
}
/// Builds an operation with a requester address whose retry counters are at
/// their limits, then checks the preconditions the test associates with
/// Phase 3: `requester_addr` is set and both the Phase 1 and Phase 2
/// conditions are exhausted. Only the hand-built state is inspected.
#[test]
fn test_retry_phase3_intermediate_node_forwards_notfound() {
    let tx = Transaction::new::<SubscribeMsg>();
    let instance_id = ContractInstanceId::new([83u8; 32]);
    let requester: SocketAddr = "10.0.0.1:9000".parse().unwrap();

    // No alternatives left, breadth and retry counters both at their maximum.
    let exhausted = AwaitingResponseData {
        next_hop: Some("10.0.0.2:9000".parse().unwrap()),
        instance_id,
        retries: MAX_RETRIES,
        current_hop: 5,
        tried_peers: HashSet::new(),
        alternatives: Vec::new(),
        attempts_at_hop: MAX_BREADTH,
        visited: crate::operations::VisitedPeers::new(&tx),
    };
    let op = SubscribeOp {
        id: tx,
        state: SubscribeState::AwaitingResponse(exhausted),
        requester_addr: Some(requester),
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    assert!(
        op.requester_addr.is_some(),
        "Intermediate node should have requester_addr"
    );

    match &op.state {
        SubscribeState::AwaitingResponse(data) => {
            let phase1_open =
                !data.alternatives.is_empty() && data.attempts_at_hop < MAX_BREADTH;
            assert!(!phase1_open, "Phase 1 should be exhausted");

            let phase2_open = data.retries < MAX_RETRIES;
            assert!(!phase2_open, "Phase 2 should be exhausted");
        }
        SubscribeState::PrepareRequest(_)
        | SubscribeState::Completed(_)
        | SubscribeState::Failed => {
            panic!("Expected AwaitingResponse state");
        }
    }
}
/// Builds an operation with exhausted retry counters and no requester
/// address, and checks that `requester_addr` is indeed `None`. Only the
/// hand-built value is inspected.
#[test]
fn test_retry_phase3_originator_fails_locally() {
    let tx = Transaction::new::<SubscribeMsg>();
    let instance_id = ContractInstanceId::new([84u8; 32]);
    let next_hop: SocketAddr = "10.0.0.2:9000".parse().unwrap();

    // Same exhausted counters as the intermediate-node scenario.
    let exhausted = AwaitingResponseData {
        next_hop: Some(next_hop),
        instance_id,
        retries: MAX_RETRIES,
        current_hop: 5,
        tried_peers: HashSet::new(),
        alternatives: Vec::new(),
        attempts_at_hop: MAX_BREADTH,
        visited: crate::operations::VisitedPeers::new(&tx),
    };
    let op = SubscribeOp {
        id: tx,
        state: SubscribeState::AwaitingResponse(exhausted),
        // No upstream requester: this is what the test treats as the originator.
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    assert!(
        op.requester_addr.is_none(),
        "Originator should not have requester_addr"
    );
}
/// Checks that operations in the `PrepareRequest`, `Completed` and `Failed`
/// states do not match `SubscribeState::AwaitingResponse`.
#[test]
fn test_non_awaiting_response_states_skip_retry() {
    let tx = Transaction::new::<SubscribeMsg>();
    let instance_id = ContractInstanceId::new([85u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([86u8; 32]));

    // All three operations differ only in their state.
    let op_in_state = |state: SubscribeState| SubscribeOp {
        id: tx,
        state,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    let op_prepare = op_in_state(SubscribeState::PrepareRequest(PrepareRequestData {
        id: tx,
        instance_id,
        is_renewal: false,
    }));
    let is_awaiting = matches!(op_prepare.state, SubscribeState::AwaitingResponse(_));
    assert!(!is_awaiting, "PrepareRequest should not trigger retry");

    let op_completed = op_in_state(SubscribeState::Completed(CompletedData {
        key: contract_key,
    }));
    let is_awaiting = matches!(op_completed.state, SubscribeState::AwaitingResponse(_));
    assert!(!is_awaiting, "Completed should not trigger retry");

    let op_failed = op_in_state(SubscribeState::Failed);
    let is_awaiting = matches!(op_failed.state, SubscribeState::AwaitingResponse(_));
    assert!(!is_awaiting, "Failed should not trigger retry");
}
/// Checks that `VisitedPeers` keeps earlier entries when more are added:
/// two addresses are marked, a third is reported as not visited, and after
/// marking the third all three are reported as visited.
#[test]
fn test_visited_bloom_filter_accumulates_across_retries() {
    let tx = Transaction::new::<SubscribeMsg>();
    let peer1 = random_peer();
    let peer2 = random_peer();
    let peer3 = random_peer();
    let peer1_addr = peer1.socket_addr().unwrap();
    let peer2_addr = peer2.socket_addr().unwrap();
    let peer3_addr = peer3.socket_addr().unwrap();

    let mut visited = crate::operations::VisitedPeers::new(&tx);

    // First batch: peer1 and peer2 only.
    for addr in [peer1_addr, peer2_addr] {
        visited.mark_visited(addr);
    }
    assert!(visited.probably_visited(peer1_addr));
    assert!(visited.probably_visited(peer2_addr));
    assert!(!visited.probably_visited(peer3_addr));

    // Second batch: adding peer3 must not drop the earlier entries.
    visited.mark_visited(peer3_addr);
    let peer1_seen = visited.probably_visited(peer1_addr);
    assert!(peer1_seen, "peer1 still visited after Phase 2");
    let peer2_seen = visited.probably_visited(peer2_addr);
    assert!(peer2_seen, "peer2 still visited after Phase 2");
    let peer3_seen = visited.probably_visited(peer3_addr);
    assert!(peer3_seen, "peer3 now visited");
}
/// Models a new retry round in which `tried_peers` starts over while the
/// `visited` filter keeps its earlier entries.
///
/// The state is assembled by hand: the old tried addresses are marked in
/// `visited`, a fresh one-element `tried_peers` set is built for the new
/// target, and the assertions confirm both properties.
#[test]
fn test_tried_peers_reset_on_phase2_visited_preserved() {
    let tx = Transaction::new::<SubscribeMsg>();
    let instance_id = ContractInstanceId::new([87u8; 32]);
    let peer1 = random_peer();
    let peer2 = random_peer();
    let peer1_addr = peer1.socket_addr().unwrap();
    let peer2_addr = peer2.socket_addr().unwrap();

    let mut visited = crate::operations::VisitedPeers::new(&tx);
    visited.mark_visited(peer1_addr);
    visited.mark_visited(peer2_addr);

    // Carry the previous round's tried set over into the visited filter.
    let old_tried_peers: HashSet<SocketAddr> = HashSet::from([peer1_addr, peer2_addr]);
    old_tried_peers
        .iter()
        .copied()
        .for_each(|addr| visited.mark_visited(addr));

    // New round: one new target, tracked in both the fresh set and the filter.
    let new_peer = random_peer();
    let new_peer_addr = new_peer.socket_addr().unwrap();
    let new_tried_peers: HashSet<SocketAddr> = HashSet::from([new_peer_addr]);
    visited.mark_visited(new_peer_addr);

    let new_data = AwaitingResponseData {
        next_hop: Some(new_peer_addr),
        instance_id,
        retries: 1,
        current_hop: 8,
        tried_peers: new_tried_peers,
        alternatives: Vec::new(),
        attempts_at_hop: 1,
        visited: visited.clone(),
    };

    let tried_count = new_data.tried_peers.len();
    assert_eq!(tried_count, 1, "tried_peers reset for new round");
    assert!(new_data.tried_peers.contains(&new_peer_addr));

    let peer1_seen = visited.probably_visited(peer1_addr);
    assert!(peer1_seen, "peer1 still in visited bloom");
    let peer2_seen = visited.probably_visited(peer2_addr);
    assert!(peer2_seen, "peer2 still in visited bloom");
    let new_peer_seen = visited.probably_visited(new_peer_addr);
    assert!(new_peer_seen, "new peer in visited bloom");
}
/// Pins the values of the retry constants so that a change to either one
/// has to be made deliberately.
#[test]
fn test_retry_constants() {
    let expected_breadth = 3;
    let expected_retries = 10;

    assert_eq!(
        MAX_BREADTH, expected_breadth,
        "MAX_BREADTH should be 3 (same as GET)"
    );
    assert_eq!(
        MAX_RETRIES, expected_retries,
        "MAX_RETRIES should be 10 (same as GET)"
    );
}
use crate::operations::OpOutcome;
use crate::ring::Location;
/// Test helper: builds a `SubscribeOp` in the `AwaitingResponse` state.
///
/// `instance_id`, `next_hop` and `stats` are stored as given. A fixed
/// requester address (`127.0.0.1:12345`) is set only when `next_hop` is
/// `Some`; otherwise `requester_addr` is `None`. The remaining fields use
/// fixed values (`retries: 0`, `current_hop: 3`, `attempts_at_hop: 1`,
/// empty `tried_peers` and `alternatives`).
fn awaiting_op(
    instance_id: ContractInstanceId,
    next_hop: Option<SocketAddr>,
    stats: Option<SubscribeStats>,
) -> SubscribeOp {
    let id = Transaction::new::<SubscribeMsg>();

    let awaiting = AwaitingResponseData {
        next_hop,
        instance_id,
        retries: 0,
        current_hop: 3,
        tried_peers: HashSet::new(),
        alternatives: Vec::new(),
        attempts_at_hop: 1,
        visited: crate::operations::VisitedPeers::new(&id),
    };

    // The requester address mirrors the presence of a next hop.
    let requester_addr: Option<SocketAddr> =
        next_hop.map(|_| "127.0.0.1:12345".parse().unwrap());

    SubscribeOp {
        id,
        state: SubscribeState::AwaitingResponse(awaiting),
        requester_addr,
        requester_pub_key: None,
        is_renewal: false,
        stats,
        ack_received: false,
        speculative_paths: 0,
    }
}
/// An awaiting operation that carries stats must expose failure routing info
/// with the contract location, must not be finalized, and must report
/// `OpOutcome::ContractOpFailure`.
#[test]
fn intermediate_forward_with_stats_reports_failure() {
    let target_peer = random_peer();
    let contract_location = Location::random();
    let instance_id = ContractInstanceId::new([1u8; 32]);

    // Take the address before the peer is moved into the stats.
    let next_hop = target_peer.socket_addr();
    let stats = SubscribeStats {
        target_peer,
        contract_location,
    };
    let op = awaiting_op(instance_id, next_hop, Some(stats));

    let info = op
        .failure_routing_info()
        .expect("should have routing info for timeout reporting");
    assert_eq!(info.1, contract_location);
    assert!(!op.finalized());

    let reports_failure = matches!(op.outcome(), OpOutcome::ContractOpFailure { .. });
    assert!(
        reports_failure,
        "timed-out forward should report failure, not Incomplete"
    );
}
/// An awaiting operation without stats and without a next hop has no failure
/// routing info and reports `OpOutcome::Incomplete`.
#[test]
fn subscribe_without_stats_returns_incomplete() {
    let instance_id = ContractInstanceId::new([2u8; 32]);
    let op = awaiting_op(instance_id, None, None);

    assert!(op.failure_routing_info().is_none());
    assert!(matches!(op.outcome(), OpOutcome::Incomplete));
}
/// A `Completed` operation that carries stats is finalized and reports
/// `OpOutcome::ContractOpSuccessUntimed`.
#[test]
fn completed_subscribe_reports_success() {
    let target_peer = random_peer();
    let contract_location = Location::random();
    let key = ContractKey::from_id_and_code(
        ContractInstanceId::new([3u8; 32]),
        CodeHash::new([4u8; 32]),
    );

    let stats = SubscribeStats {
        target_peer,
        contract_location,
    };
    let op = SubscribeOp {
        id: Transaction::new::<SubscribeMsg>(),
        state: SubscribeState::Completed(CompletedData { key }),
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: Some(stats),
        ack_received: false,
        speculative_paths: 0,
    };

    assert!(op.finalized());
    assert!(matches!(
        op.outcome(),
        OpOutcome::ContractOpSuccessUntimed { .. }
    ));
}
fn make_retry_op(alternatives: Vec<PeerKeyLocation>, tried: &[PeerKeyLocation]) -> SubscribeOp {
let tx_id = Transaction::new::<SubscribeMsg>();
let instance_id = ContractInstanceId::new([50u8; 32]);
let mut tried_peers = HashSet::new();
for p in tried {
if let Some(addr) = p.socket_addr() {
tried_peers.insert(addr);
}
}
SubscribeOp {
id: tx_id,
state: SubscribeState::AwaitingResponse(super::AwaitingResponseData {
next_hop: tried.first().and_then(|p| p.socket_addr()),
instance_id,
retries: 0,
current_hop: 10,
tried_peers,
alternatives,
attempts_at_hop: 1,
visited: crate::operations::VisitedPeers::new(&tx_id),
}),
requester_addr: None,
requester_pub_key: None,
is_renewal: false,
stats: None,
ack_received: false,
speculative_paths: 0,
}
}
/// Test helper: returns the `htl` of a `SubscribeMsg::Request`.
///
/// # Panics
///
/// Panics with the message's debug representation for any other variant.
/// The variants are listed explicitly so the match stays exhaustive.
fn extract_request_htl(msg: &SubscribeMsg) -> usize {
    match msg {
        SubscribeMsg::Request { htl, .. } => *htl,
        other @ (SubscribeMsg::Response { .. }
        | SubscribeMsg::Unsubscribe { .. }
        | SubscribeMsg::ForwardingAck { .. }) => {
            panic!("Expected Request, got {other:?}")
        }
    }
}
/// Test helper: borrows the `AwaitingResponseData` of an operation that is in
/// the `AwaitingResponse` state.
///
/// # Panics
///
/// Panics with the state's debug representation for any other state. The
/// states are listed explicitly so the match stays exhaustive.
fn extract_awaiting_data(op: &SubscribeOp) -> &super::AwaitingResponseData {
    match &op.state {
        SubscribeState::AwaitingResponse(data) => data,
        other @ (SubscribeState::PrepareRequest(_)
        | SubscribeState::Completed(_)
        | SubscribeState::Failed) => {
            panic!("Expected AwaitingResponse, got {other:?}")
        }
    }
}
/// With two alternatives and one peer already tried, a retry must succeed,
/// emit a request with HTL 5 (for a first argument of 10), record peer2 as
/// tried and leave one alternative.
#[test]
fn test_retry_with_next_alternative_picks_next_peer() {
    let peer1 = random_peer();
    let peer2 = random_peer();
    let peer3 = random_peer();

    let candidates = vec![peer2.clone(), peer3.clone()];
    let op = make_retry_op(candidates, &[peer1]);
    assert!(op.is_originator(), "No requester_addr means originator");

    // The error value is replaced by a plain message before unwrapping.
    let outcome = op
        .retry_with_next_alternative(10, &[])
        .map_err(|_| "retry should succeed with alternatives available");
    let (new_op, msg) = outcome.unwrap();

    assert_eq!(extract_request_htl(&msg), 5);

    let data = extract_awaiting_data(&new_op);
    assert!(data.tried_peers.contains(&peer2.socket_addr().unwrap()));
    assert_eq!(data.alternatives.len(), 1, "peer3 should remain");
}
/// With no alternatives and no fallback peers, the retry must return `Err`.
#[test]
fn test_retry_with_no_alternatives_returns_err() {
    let op = make_retry_op(Vec::new(), &[]);

    let retry_failed = op.retry_with_next_alternative(10, &[]).is_err();
    assert!(retry_failed, "Should fail when no alternatives remain");
}
/// With no stored alternatives, a retry given one fallback peer must succeed
/// and record that peer's address in `tried_peers`.
#[test]
fn test_retry_uses_fallback_peers() {
    let fallback_peer = random_peer();
    let op = make_retry_op(Vec::new(), &[]);

    // Borrow the single peer as a one-element slice instead of cloning it.
    let fallbacks = std::slice::from_ref(&fallback_peer);
    let outcome = op
        .retry_with_next_alternative(10, fallbacks)
        .map_err(|_| "retry should succeed with fallback peers");
    let (new_op, _msg) = outcome.unwrap();

    let data = extract_awaiting_data(&new_op);
    let fallback_addr = fallback_peer.socket_addr().unwrap();
    assert!(
        data.tried_peers.contains(&fallback_addr),
        "Fallback peer should be marked as tried"
    );
}
/// When the only fallback peer is already in `tried_peers`, the retry must
/// return `Err`.
#[test]
fn test_retry_skips_tried_fallback_peers() {
    let tried_peer = random_peer();
    let op = make_retry_op(Vec::new(), std::slice::from_ref(&tried_peer));

    // The same peer is offered again, this time as a fallback.
    let fallbacks = [tried_peer];
    let retry_failed = op.retry_with_next_alternative(10, &fallbacks).is_err();
    assert!(
        retry_failed,
        "Should fail when all fallback peers already tried"
    );
}
/// Three consecutive retries with a first argument of 10 must emit requests
/// with HTL 5, then `MIN_RETRY_HTL`, then `MIN_RETRY_HTL` again.
#[test]
fn test_retry_htl_decreases_with_attempts() {
    let alternatives = vec![random_peer(), random_peer(), random_peer()];
    let mut op = make_retry_op(alternatives, &[]);

    // Expected HTL per retry, in order; each retry consumes the previous op.
    let expected_htls = [5, super::MIN_RETRY_HTL, super::MIN_RETRY_HTL];
    for (attempt, expected_htl) in (1..).zip(expected_htls) {
        let (next_op, msg) = op
            .retry_with_next_alternative(10, &[])
            .unwrap_or_else(|_| panic!("retry {attempt} failed"));
        assert_eq!(extract_request_htl(&msg), expected_htl);
        op = next_op;
    }
}
/// With `attempts_at_hop` set to 20, a retry must still emit a request whose
/// HTL is at least `MIN_RETRY_HTL`.
#[test]
fn test_retry_htl_floor_at_min() {
    let tx_id = Transaction::new::<SubscribeMsg>();
    let instance_id = ContractInstanceId::new([61u8; 32]);

    // A large attempt count with a single alternative left to try.
    let awaiting = super::AwaitingResponseData {
        next_hop: None,
        instance_id,
        retries: 0,
        current_hop: 10,
        tried_peers: HashSet::new(),
        alternatives: vec![random_peer()],
        attempts_at_hop: 20,
        visited: crate::operations::VisitedPeers::new(&tx_id),
    };
    let op = SubscribeOp {
        id: tx_id,
        state: SubscribeState::AwaitingResponse(awaiting),
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    let (_op, msg) = op
        .retry_with_next_alternative(10, &[])
        .unwrap_or_else(|_| panic!("retry failed"));

    let htl = extract_request_htl(&msg);
    assert!(
        htl >= super::MIN_RETRY_HTL,
        "HTL should not fall below MIN_RETRY_HTL"
    );
}
/// With a first argument of 2, the emitted request must carry HTL 2.
#[test]
fn test_retry_htl_capped_at_max_hops() {
    let op = make_retry_op(vec![random_peer()], &[]);

    let (_op, msg) = op
        .retry_with_next_alternative(2, &[])
        .unwrap_or_else(|_| panic!("retry failed"));

    let htl = extract_request_htl(&msg);
    assert_eq!(
        htl, 2,
        "HTL must not exceed max_hops_to_live even when MIN_RETRY_HTL is higher"
    );
}
/// Offers two fallback peers, one of whose addresses is already marked in
/// `visited`, and asserts that no alternatives remain after the retry.
///
/// NOTE(review): the assertion message implies the visited peer is filtered
/// out and the fresh peer becomes the retry target; the test itself only
/// checks that `alternatives` is empty afterwards.
#[test]
fn test_retry_fallback_skips_bloom_visited() {
    let visited_peer = random_peer();
    let fresh_peer = random_peer();
    let visited_addr = visited_peer.socket_addr().unwrap();

    let tx_id = Transaction::new::<SubscribeMsg>();
    let mut visited = crate::operations::VisitedPeers::new(&tx_id);
    visited.mark_visited(visited_addr);

    let awaiting = super::AwaitingResponseData {
        next_hop: None,
        instance_id: ContractInstanceId::new([62u8; 32]),
        retries: 0,
        current_hop: 10,
        tried_peers: HashSet::new(),
        alternatives: Vec::new(),
        attempts_at_hop: 1,
        visited,
    };
    let op = SubscribeOp {
        id: tx_id,
        state: SubscribeState::AwaitingResponse(awaiting),
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    let fallbacks = [visited_peer, fresh_peer];
    let (new_op, _msg) = op
        .retry_with_next_alternative(10, &fallbacks)
        .unwrap_or_else(|_| panic!("retry failed"));

    let remaining = extract_awaiting_data(&new_op).alternatives.len();
    assert_eq!(
        remaining, 0,
        "Only fresh_peer should be injected (visited_peer filtered by bloom)"
    );
}
/// After a retry that uses the only alternative, that peer's address must be
/// reported as visited by the operation's `visited` filter.
#[test]
fn test_retry_marks_target_in_bloom_filter() {
    let alt = random_peer();
    let alt_addr = alt.socket_addr().unwrap();

    let (new_op, _msg) = make_retry_op(vec![alt], &[])
        .retry_with_next_alternative(10, &[])
        .unwrap_or_else(|_| panic!("retry failed"));

    let target_marked = extract_awaiting_data(&new_op)
        .visited
        .probably_visited(alt_addr);
    assert!(
        target_marked,
        "Retry target should be marked in bloom filter"
    );
}
/// An operation with a `requester_addr` must not be reported as the
/// originator by `is_originator()`.
#[test]
fn test_is_originator_false_for_intermediate_hop() {
    let instance_id = ContractInstanceId::new([54u8; 32]);
    let tx_id = Transaction::new::<SubscribeMsg>();
    let requester: SocketAddr = "127.0.0.1:5555".parse().unwrap();

    let awaiting = super::AwaitingResponseData {
        next_hop: None,
        instance_id,
        retries: 0,
        current_hop: 10,
        tried_peers: HashSet::new(),
        alternatives: Vec::new(),
        attempts_at_hop: 0,
        visited: crate::operations::VisitedPeers::new(&tx_id),
    };
    let op = SubscribeOp {
        id: tx_id,
        state: SubscribeState::AwaitingResponse(awaiting),
        // The requester address is the property under test.
        requester_addr: Some(requester),
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    let is_originator = op.is_originator();
    assert!(
        !is_originator,
        "Op with requester_addr should not be originator"
    );
}
/// `retry_with_next_alternative` must return `Err` for operations in the
/// `PrepareRequest`, `Completed` and `Failed` states.
#[test]
fn test_retry_fails_on_wrong_state() {
    let instance_id = ContractInstanceId::new([55u8; 32]);
    let contract_key = ContractKey::from_id_and_code(instance_id, CodeHash::new([56u8; 32]));
    let tx_id = Transaction::new::<SubscribeMsg>();

    // The three operations differ only in their state.
    let op_in_state = |state: SubscribeState| SubscribeOp {
        id: tx_id,
        state,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    };

    let op = op_in_state(SubscribeState::PrepareRequest(super::PrepareRequestData {
        id: tx_id,
        instance_id,
        is_renewal: false,
    }));
    assert!(op.retry_with_next_alternative(10, &[]).is_err());

    let op = op_in_state(SubscribeState::Completed(super::CompletedData {
        key: contract_key,
    }));
    assert!(op.retry_with_next_alternative(10, &[]).is_err());

    let op = op_in_state(SubscribeState::Failed);
    assert!(op.retry_with_next_alternative(10, &[]).is_err());
}
/// A `SubscribeMsg::ForwardingAck` serialized and deserialized with bincode
/// must come back as the same variant with the same `id` and `instance_id`.
#[test]
fn subscribe_forwarding_ack_serde_roundtrip() {
    let id = Transaction::new::<SubscribeMsg>();
    let instance_id = ContractInstanceId::new([42; 32]);

    let original = SubscribeMsg::ForwardingAck { id, instance_id };
    let bytes = bincode::serialize(&original).expect("serialize");
    let decoded: SubscribeMsg = bincode::deserialize(&bytes).expect("deserialize");

    // Variants are listed explicitly so the match stays exhaustive.
    match decoded {
        SubscribeMsg::ForwardingAck {
            id: deser_id,
            instance_id: deser_iid,
        } => {
            assert_eq!(deser_id, id);
            assert_eq!(deser_iid, instance_id);
        }
        other @ (SubscribeMsg::Request { .. }
        | SubscribeMsg::Response { .. }
        | SubscribeMsg::Unsubscribe { .. }) => {
            panic!("Expected ForwardingAck, got {other}")
        }
    }
}