use anyhow::{Context, anyhow, bail};
use freenet::test_utils::{
self, TestContext, TestResult, make_get, make_put, make_subscribe, make_update,
};
use freenet_macros::freenet_test;
use freenet_stdlib::{
client_api::{ClientRequest, ContractResponse, HostResponse, NodeQuery, QueryResponse, WebApi},
prelude::*,
};
use std::{net::SocketAddr, time::Duration};
use tokio_tungstenite::connect_async;
/// Asks a node for its currently connected peers over an open WebSocket client.
///
/// Sends `NodeQuery::ConnectedPeers` through `client` and waits at most `timeout`
/// for the reply. Returns `Some(peers)` only when a `ConnectedPeers` query response
/// arrives. A failed send, an unexpected response, a receive error, or a timeout is
/// logged as a warning (tagged with `attempt` and `node_name`) and yields `None`, so
/// the caller can decide to retry.
async fn query_connected_peers(
    client: &mut WebApi,
    node_name: &str,
    timeout: Duration,
    attempt: u32,
) -> Option<Vec<(String, SocketAddr)>> {
    let request = ClientRequest::NodeQueries(NodeQuery::ConnectedPeers);
    if let Err(send_err) = client.send(request).await {
        tracing::warn!(
            attempt,
            node = node_name,
            error = %send_err,
            "Failed to send query, will retry..."
        );
        return None;
    }

    // Outer layer: did the receive complete before the timeout elapsed?
    let Ok(received) = tokio::time::timeout(timeout, client.recv()).await else {
        tracing::warn!(
            attempt,
            node = node_name,
            timeout_secs = timeout.as_secs(),
            "Query timed out, will retry..."
        );
        return None;
    };

    // Inner layer: what did the client actually deliver?
    match received {
        Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers { peers })) => Some(peers),
        Ok(unexpected) => {
            tracing::warn!(
                attempt,
                node = node_name,
                other = ?unexpected,
                "Unexpected response, will retry..."
            );
            None
        }
        Err(recv_err) => {
            tracing::warn!(
                attempt,
                node = node_name,
                error = %recv_err,
                "WebSocket error, will retry..."
            );
            None
        }
    }
}
/// Verifies that a client can keep operating on a peer after dropping and re-opening
/// its WebSocket connection.
///
/// Steps:
/// 1. PUT a contract through the peer and wait for the matching `PutResponse`.
/// 2. GET it back and check both the contract key and the state.
/// 3. Send `Disconnect`, open a fresh WebSocket to the same URL, and GET again,
///    expecting the same contract and state.
///
/// Every wait for a response uses a 60s deadline and skips unrelated messages instead
/// of failing on the first one, so all three waits behave the same way.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway", "peer"],
    // Increased timeout for CI where 8 parallel tests compete for resources
    timeout_secs = 300,
    startup_wait_secs = 30,
    aggregate_events = "always",
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4
)]
async fn test_gateway_reconnection(ctx: &mut TestContext) -> TestResult {
    const TEST_CONTRACT: &str = "test-contract-integration";
    let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();
    let initial_state = test_utils::create_empty_todo_list();
    let wrapped_state = WrappedState::from(initial_state);

    let peer = ctx.node("peer")?;
    tokio::time::sleep(Duration::from_secs(5)).await;

    let uri = peer.ws_url();
    let (stream, _) = connect_async(&uri).await?;
    let mut client_api = WebApi::start(stream);

    tracing::info!("Performing initial PUT to verify connectivity");
    make_put(
        &mut client_api,
        wrapped_state.clone(),
        contract.clone(),
        false,
    )
    .await?;

    // Wait for the PutResponse, ignoring any unrelated messages that arrive first.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(60);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!("Timeout waiting for put response");
        }
        match tokio::time::timeout(remaining, client_api.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
                assert_eq!(key, contract_key);
                tracing::info!("Initial PUT successful");
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for PutResponse: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => {
                bail!("Error receiving put response: {}", e);
            }
            Err(_) => {
                bail!("Timeout waiting for put response");
            }
        }
    }

    tracing::info!("Verifying with GET");
    make_get(&mut client_api, contract_key, true, false).await?;

    // Same deadline-and-skip strategy as the PUT wait above and the post-reconnect GET
    // below: an unrelated message must not fail the test before the GetResponse arrives.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(60);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!("Timeout waiting for get response");
        }
        match tokio::time::timeout(remaining, client_api.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
                contract: recv_contract,
                state: recv_state,
                ..
            }))) => {
                assert_eq!(
                    recv_contract.as_ref().expect("Contract should exist").key(),
                    contract_key
                );
                // Log both states in readable form before the assertion fires, since
                // the raw byte comparison alone is hard to interpret.
                if recv_state != wrapped_state {
                    tracing::error!("State mismatch!");
                    tracing::error!(
                        "Expected state: {:?}",
                        String::from_utf8_lossy(wrapped_state.as_ref())
                    );
                    tracing::error!(
                        "Received state: {:?}",
                        String::from_utf8_lossy(recv_state.as_ref())
                    );
                }
                assert_eq!(recv_state, wrapped_state);
                tracing::info!("Initial GET successful");
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for GetResponse: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => {
                bail!("Error receiving get response: {}", e);
            }
            Err(_) => {
                bail!("Timeout waiting for get response");
            }
        }
    }

    tracing::info!("Disconnecting from peer");
    client_api
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_secs(3)).await;

    tracing::info!("Reconnecting to peer");
    let (stream, _) = connect_async(&uri).await?;
    let mut client_api = WebApi::start(stream);
    tokio::time::sleep(Duration::from_secs(5)).await;

    tracing::info!("Performing GET after reconnection");
    make_get(&mut client_api, contract_key, true, false).await?;

    let deadline = tokio::time::Instant::now() + Duration::from_secs(60);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!("Timeout waiting for get response after reconnection");
        }
        match tokio::time::timeout(remaining, client_api.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
                contract: recv_contract,
                state: recv_state,
                ..
            }))) => {
                assert_eq!(
                    recv_contract.as_ref().expect("Contract should exist").key(),
                    contract_key
                );
                assert_eq!(recv_state, wrapped_state);
                tracing::info!(
                    "Reconnection test successful - peer can perform operations after reconnecting"
                );
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for GetResponse: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => {
                bail!("Error receiving get response after reconnection: {}", e);
            }
            Err(_) => {
                bail!("Timeout waiting for get response after reconnection");
            }
        }
    }

    client_api
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    // Short pause after sending Disconnect before the test returns.
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
/// Smoke test: a WebSocket connection to the gateway can be opened within 10 seconds
/// and then closed with a `Disconnect` request.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway"],
    // Increased timeout for CI where 8 parallel tests compete for resources
    timeout_secs = 60,
    startup_wait_secs = 5,
    aggregate_events = "always",
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4,
)]
async fn test_basic_gateway_connectivity(ctx: &mut TestContext) -> TestResult {
    let gateway_node = ctx.node("gateway")?;
    let ws_url = gateway_node.ws_url();

    // Resolve the two failure layers (timeout, connect error) up front so the
    // success path below reads straight through.
    let connect_attempt =
        tokio::time::timeout(Duration::from_secs(10), connect_async(&ws_url)).await;
    let (stream, _) = match connect_attempt {
        Err(_) => bail!("Timeout connecting to gateway"),
        Ok(Err(e)) => bail!("Failed to connect to gateway: {}", e),
        Ok(Ok(connection)) => connection,
    };

    tracing::info!("Successfully connected to gateway WebSocket");
    let mut client = WebApi::start(stream);
    client
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    // Short pause after sending Disconnect before the test returns.
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
/// Brings up a gateway and two peers, waits for them to connect to each other, and
/// then exercises PUT, GET, SUBSCRIBE and UPDATE across the network.
///
/// Phases:
/// 1. Poll every node's connected-peer list (up to 90s) until each node reports at
///    least one connection; on timeout, dump the last snapshot and up to the 10 most
///    recent aggregated events before failing.
/// 2. PUT the contract through peer1 (with retries) and GET it through peer2.
/// 3. Subscribe on peer2, UPDATE through peer1, and require that peer2 receives an
///    `UpdateNotification` whose state contains the newly added task.
///
/// All response waits use an overall deadline and skip unrelated messages. This
/// matters here because a connected-peers query that timed out during phase 1 can
/// leave its late reply queued on the same client.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway", "peer1", "peer2"],
    // Increased timeout for CI where 8 parallel tests compete for resources
    timeout_secs = 300,
    startup_wait_secs = 30,
    // Locations are derived from varied loopback IPs (127.x.y.1) which gives each node
    // a unique location without needing explicit configuration
    aggregate_events = "always",
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4
)]
async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResult {
    const TEST_CONTRACT: &str = "test-contract-integration";
    let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();
    let initial_state = test_utils::create_empty_todo_list();
    let wrapped_state = WrappedState::from(initial_state);

    let gateway = ctx.node("gateway")?;
    let peer1 = ctx.node("peer1")?;
    let peer2 = ctx.node("peer2")?;
    tracing::info!(
        "Using deterministic node locations: gateway={:.3}, peer1={:.3}, peer2={:.3}",
        gateway.location,
        peer1.location,
        peer2.location
    );

    let peer1_public_port = peer1.network_port.context(
        "peer1 missing network port; auto_connect_peers requires public_port for mesh connectivity",
    )?;
    let peer2_public_port = peer2.network_port.context(
        "peer2 missing network port; auto_connect_peers requires public_port for mesh connectivity",
    )?;
    tracing::info!(
        peer1_port = peer1_public_port,
        peer2_port = peer2_public_port,
        "Verified peer network ports for direct connectivity"
    );

    tokio::time::sleep(Duration::from_secs(5)).await;

    let (stream_gw, _) = connect_async(&gateway.ws_url()).await?;
    let mut client_gw = WebApi::start(stream_gw);
    let (stream1, _) = connect_async(&peer1.ws_url()).await?;
    let mut client1 = WebApi::start(stream1);
    let (stream2, _) = connect_async(&peer2.ws_url()).await?;
    let mut client2 = WebApi::start(stream2);

    // ---- Phase 1: wait for every node to report at least one connection ----
    const MESH_FORMATION_TIMEOUT: Duration = Duration::from_secs(90);
    const QUERY_TIMEOUT: Duration = Duration::from_secs(10);
    const RETRY_DELAY: Duration = Duration::from_secs(2);

    let mesh_deadline = tokio::time::Instant::now() + MESH_FORMATION_TIMEOUT;
    let mut mesh_established = false;
    // Debug renderings of the most recent complete set of peer lists, kept for the
    // failure report below.
    let mut last_snapshot = (String::new(), String::new(), String::new());
    let mut attempt: u32 = 0;

    while tokio::time::Instant::now() < mesh_deadline {
        attempt += 1;
        let remaining_secs = mesh_deadline
            .saturating_duration_since(tokio::time::Instant::now())
            .as_secs();
        tracing::info!(
            attempt,
            remaining_secs,
            "Querying all nodes for connected peers..."
        );

        let Some(gw_peers) =
            query_connected_peers(&mut client_gw, "gateway", QUERY_TIMEOUT, attempt).await
        else {
            tokio::time::sleep(RETRY_DELAY).await;
            continue;
        };
        let Some(peer1_peers) =
            query_connected_peers(&mut client1, "peer1", QUERY_TIMEOUT, attempt).await
        else {
            tokio::time::sleep(RETRY_DELAY).await;
            continue;
        };
        let Some(peer2_peers) =
            query_connected_peers(&mut client2, "peer2", QUERY_TIMEOUT, attempt).await
        else {
            tokio::time::sleep(RETRY_DELAY).await;
            continue;
        };

        tracing::info!(
            gateway_connections = gw_peers.len(),
            peer1_connections = peer1_peers.len(),
            peer2_connections = peer2_peers.len(),
            "Connection counts"
        );
        tracing::debug!("Gateway peers: {:?}", gw_peers);
        tracing::debug!("Peer1 peers: {:?}", peer1_peers);
        tracing::debug!("Peer2 peers: {:?}", peer2_peers);

        last_snapshot = (
            format!("{:?}", gw_peers),
            format!("{:?}", peer1_peers),
            format!("{:?}", peer2_peers),
        );

        let gateway_has_minimum = !gw_peers.is_empty();
        let peer1_has_minimum = !peer1_peers.is_empty();
        let peer2_has_minimum = !peer2_peers.is_empty();

        if gateway_has_minimum && peer1_has_minimum && peer2_has_minimum {
            // A full mesh is only reported, not required: one connection per node is
            // enough to proceed.
            let is_full_mesh =
                gw_peers.len() >= 2 && peer1_peers.len() >= 2 && peer2_peers.len() >= 2;
            if is_full_mesh {
                tracing::info!("Full mesh connectivity established!");
            } else {
                tracing::info!(
                    "Minimum connectivity achieved (all nodes connected, network is reachable)"
                );
            }
            mesh_established = true;
            break;
        }

        tracing::info!("Network not yet meeting minimum connectivity, waiting...");
        tokio::time::sleep(RETRY_DELAY).await;
    }

    if !mesh_established {
        tracing::error!(
            gateway_peers = %last_snapshot.0,
            peer1_peers = %last_snapshot.1,
            peer2_peers = %last_snapshot.2,
            "Connectivity check failed; dumping last snapshot"
        );
        // Best-effort diagnostics: failures to aggregate events are ignored so the
        // real failure below is always reported.
        if let Ok(aggregator) = ctx.aggregate_events().await {
            if let Ok(events) = aggregator.get_all_events().await {
                tracing::error!(total_events = events.len(), "Aggregated events at timeout");
                // Last 10 events, printed in their original order.
                for event in events.iter().rev().take(10).rev() {
                    tracing::error!(?event.kind, peer=%event.peer_id, ts=%event.datetime, "Recent event");
                }
            }
        }
        bail!(
            "Failed to establish minimum connectivity after {} attempts ({}s timeout). Gateway peers: {}; peer1 peers: {}; peer2 peers: {}",
            attempt,
            MESH_FORMATION_TIMEOUT.as_secs(),
            last_snapshot.0,
            last_snapshot.1,
            last_snapshot.2
        );
    }

    tokio::time::sleep(Duration::from_secs(2)).await;

    // ---- Phase 2: PUT via peer1, GET via peer2 ----
    tracing::info!("Verifying network functionality with PUT/GET operations");
    const PUT_RETRIES: usize = 3;
    perform_put_with_retries(
        &mut client1,
        &wrapped_state,
        &contract,
        &contract_key,
        PUT_RETRIES,
    )
    .await?;

    make_get(&mut client2, contract_key, true, false).await?;
    // Skip unrelated messages (e.g. a late connected-peers reply from phase 1) until
    // the GetResponse arrives or the deadline passes.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(60);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!("Timeout waiting for GET response");
        }
        match tokio::time::timeout(remaining, client2.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
                contract: recv_contract,
                state: recv_state,
                ..
            }))) => {
                assert_eq!(
                    recv_contract.as_ref().expect("Contract should exist").key(),
                    contract_key
                );
                assert_eq!(recv_state, wrapped_state);
                tracing::info!("✅ Peer2 successfully retrieved data from network");
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for GetResponse: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => bail!("Error receiving GET response: {}", e),
            Err(_) => bail!("Timeout waiting for GET response"),
        }
    }

    // ---- Phase 3: subscribe on peer2, update via peer1, expect a notification ----
    tracing::info!("Testing UPDATE propagation via neighbor hosting");
    make_subscribe(&mut client2, contract_key).await?;
    // One overall 30s deadline, so a stream of unrelated messages cannot keep
    // extending the wait.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!("Timeout waiting for subscribe response");
        }
        match tokio::time::timeout(remaining, client2.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
                key,
                subscribed,
            }))) => {
                assert_eq!(key, contract_key, "Subscribe response key mismatch");
                assert!(subscribed, "Subscribe should succeed");
                tracing::info!("✅ Peer2 subscribed to contract updates");
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!("Ignoring non-subscribe response: {:?}", other);
                continue;
            }
            Ok(Err(e)) => bail!("Error receiving subscribe response: {}", e),
            Err(_) => bail!("Timeout waiting for subscribe response"),
        }
    }

    tokio::time::sleep(Duration::from_secs(2)).await;

    // Build the updated state: the initial todo list plus one task, version bumped.
    let mut todo_list: test_utils::TodoList =
        serde_json::from_slice(wrapped_state.as_ref()).expect("deserialize state");
    todo_list.tasks.push(test_utils::Task {
        id: todo_list.tasks.len() as u64 + 1,
        title: "Proximity cache test".to_string(),
        description: "Update via proximity cache test".to_string(),
        completed: false,
        priority: 1,
    });
    todo_list.version += 1;
    let updated_bytes = serde_json::to_vec(&todo_list).expect("serialize updated state");
    let updated_state = WrappedState::from(updated_bytes);

    make_update(&mut client1, contract_key, updated_state.clone()).await?;

    let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!("Timeout waiting for UpdateResponse on peer1");
        }
        match tokio::time::timeout(remaining, client1.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
                key,
                ..
            }))) => {
                assert_eq!(key, contract_key);
                tracing::info!("✅ Peer1 received UpdateResponse");
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for UpdateResponse: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => bail!("Error receiving UpdateResponse: {}", e),
            Err(_) => bail!("Timeout waiting for UpdateResponse on peer1"),
        }
    }

    let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!(
                "Timeout waiting for UpdateNotification on peer2 - \
                 UPDATE may not have propagated via proximity cache (issue #2294)"
            );
        }
        match tokio::time::timeout(remaining, client2.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
                key,
                update,
            }))) => {
                assert_eq!(key, contract_key);
                match update {
                    UpdateData::State(state) => {
                        let received_list: test_utils::TodoList =
                            serde_json::from_slice(state.as_ref())
                                .expect("deserialize update notification state");
                        let has_our_task = received_list
                            .tasks
                            .iter()
                            .any(|t| t.title == "Proximity cache test");
                        assert!(
                            has_our_task,
                            "Update notification state should contain our task. Got: {:?}",
                            received_list.tasks
                        );
                        tracing::info!(
                            "✅ Peer2 received UpdateNotification via proximity cache (issue #2294 regression test passed)"
                        );
                    }
                    // Only a full-state notification is accepted; every other
                    // update shape fails the test.
                    other @ UpdateData::Delta(_)
                    | other @ UpdateData::StateAndDelta { .. }
                    | other @ UpdateData::RelatedState { .. }
                    | other @ UpdateData::RelatedDelta { .. }
                    | other @ UpdateData::RelatedStateAndDelta { .. }
                    | other => {
                        bail!("Unexpected update data type: {:?}", other)
                    }
                }
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for UpdateNotification: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => bail!("Error receiving UpdateNotification: {}", e),
            Err(_) => bail!(
                "Timeout waiting for UpdateNotification on peer2 - \
                 UPDATE may not have propagated via proximity cache (issue #2294)"
            ),
        }
    }

    client_gw
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    client1
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    client2
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    // Short pause after sending Disconnect before the test returns.
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
/// Issues a PUT of `contract` with `wrapped_state` through `client`, retrying up to
/// `max_attempts` times.
///
/// Each attempt sends the PUT and then waits up to 60 seconds for a `PutResponse`.
/// Unrelated responses that arrive in the meantime are skipped rather than counted
/// as a failed attempt. An attempt fails when the send fails, the receive errors,
/// the wait times out, or the `PutResponse` carries a key other than `contract_key`.
///
/// Returns `Ok(())` as soon as a `PutResponse` for `contract_key` is received.
///
/// # Errors
///
/// Returns the error from the last failed attempt, or a generic
/// "PUT failed after N attempts" error when no attempt recorded one
/// (e.g. `max_attempts == 0`).
async fn perform_put_with_retries(
    client: &mut WebApi,
    wrapped_state: &WrappedState,
    contract: &ContractContainer,
    contract_key: &ContractKey,
    max_attempts: usize,
) -> TestResult {
    const RESPONSE_TIMEOUT: Duration = Duration::from_secs(60);
    const RETRY_DELAY: Duration = Duration::from_secs(2);

    let mut last_err: Option<anyhow::Error> = None;
    for attempt in 1..=max_attempts {
        tracing::info!(attempt, max_attempts, "Starting PUT attempt");
        if let Err(err) = make_put(client, wrapped_state.clone(), contract.clone(), false).await {
            last_err = Some(err);
        } else {
            let deadline = tokio::time::Instant::now() + RESPONSE_TIMEOUT;
            loop {
                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
                if remaining.is_zero() {
                    last_err = Some(anyhow!("Timeout waiting for PUT response"));
                    break;
                }
                match tokio::time::timeout(remaining, client.recv()).await {
                    Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse {
                        key,
                    }))) => {
                        if key == *contract_key {
                            tracing::info!(attempt, "Peer1 successfully performed PUT");
                            return Ok(());
                        }
                        last_err = Some(anyhow!(
                            "Received PUT response for unexpected key {key:?} (expected {contract_key:?})"
                        ));
                        break;
                    }
                    Ok(Ok(other)) => {
                        // Not a PutResponse: keep waiting within this attempt's
                        // deadline instead of re-sending the PUT.
                        tracing::debug!(
                            "Skipping stale response while waiting for PutResponse: {:?}",
                            other
                        );
                    }
                    Ok(Err(e)) => {
                        last_err = Some(anyhow!("Error receiving PUT response: {e}"));
                        break;
                    }
                    Err(_) => {
                        last_err = Some(anyhow!("Timeout waiting for PUT response"));
                        break;
                    }
                }
            }
        }

        // Only announce and wait for a retry when another attempt will actually run.
        if attempt < max_attempts {
            tracing::warn!(
                attempt,
                max_attempts,
                "PUT attempt failed; retrying after short delay"
            );
            tokio::time::sleep(RETRY_DELAY).await;
        } else {
            tracing::warn!(
                attempt,
                max_attempts,
                "PUT attempt failed; no attempts remaining"
            );
        }
    }
    Err(last_err.unwrap_or_else(|| anyhow!("PUT failed after {max_attempts} attempts")))
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "peer"],
// Increased timeout for CI where 8 parallel tests compete for resources
timeout_secs = 120,
// Reduced startup wait - we'll poll for actual connection establishment
startup_wait_secs = 5,
aggregate_events = "always",
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_gateway_reports_peer_identity_after_connect(ctx: &mut TestContext) -> TestResult {
let gateway = ctx.node("gateway")?;
let peer = ctx.node("peer")?;
let (stream_gw, _) = connect_async(&gateway.ws_url()).await?;
let mut client_gw = WebApi::start(stream_gw);
let (stream_peer, _) = connect_async(&peer.ws_url()).await?;
let mut client_peer = WebApi::start(stream_peer);
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
const QUERY_TIMEOUT: Duration = Duration::from_secs(10);
const RETRY_DELAY: Duration = Duration::from_secs(2);
let deadline = tokio::time::Instant::now() + CONNECTION_TIMEOUT;
let mut gw_peers = Vec::new();
let mut peer_peers = Vec::new();
let mut attempt: u32 = 0;
while tokio::time::Instant::now() < deadline {
attempt += 1;
let remaining_secs = deadline
.saturating_duration_since(tokio::time::Instant::now())
.as_secs();
tracing::info!(
attempt,
remaining_secs,
"Querying nodes for connected peers..."
);
let Some(gw_result) =
query_connected_peers(&mut client_gw, "gateway", QUERY_TIMEOUT, attempt).await
else {
tokio::time::sleep(RETRY_DELAY).await;
continue;
};
let Some(peer_result) =
query_connected_peers(&mut client_peer, "peer", QUERY_TIMEOUT, attempt).await
else {
tokio::time::sleep(RETRY_DELAY).await;
continue;
};
gw_peers = gw_result;
peer_peers = peer_result;
tracing::info!(
gateway_connections = gw_peers.len(),
peer_connections = peer_peers.len(),
"Connection visibility check"
);
if !gw_peers.is_empty() && !peer_peers.is_empty() {
break;
}
tokio::time::sleep(RETRY_DELAY).await;
}
if gw_peers.is_empty() {
bail!(
"REGRESSION: Gateway's QueryConnections returned empty after {} attempts ({}s timeout)! \
This indicates the peer's identity was not propagated to the \
transport layer when the transient connection was promoted. \
See PR #2211 for the original bug fix.",
attempt,
CONNECTION_TIMEOUT.as_secs()
);
}
if peer_peers.is_empty() {
bail!(
"REGRESSION: Peer's QueryConnections returned empty after {} attempts ({}s timeout)! \
This indicates the connection wasn't properly established.",
attempt,
CONNECTION_TIMEOUT.as_secs()
);
}
tracing::info!(
"Connection identity propagation verified: gateway sees {} peer(s), peer sees {} connection(s)",
gw_peers.len(),
peer_peers.len()
);
Ok(())
}