use freenet::test_utils::{
TestContext, load_contract, make_get, make_put, make_subscribe, make_update,
};
use freenet_macros::freenet_test;
use freenet_stdlib::{
client_api::{ClientRequest, ContractResponse, HostResponse, WebApi},
prelude::*,
};
use std::time::Duration;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tracing::info;
/// Regression test: PUT followed by GET against a single `gateway` node.
///
/// Steps:
/// 1. PUT the `test-contract-integration` contract with an empty todo-list state and
///    expect a `PutResponse` carrying the contract's key within 30 seconds.
/// 2. GET the same key and expect a `GetResponse` whose contract key and state match
///    what was PUT. The reply must arrive within the 10 second receive timeout and
///    the whole GET must take less than 5 seconds.
///
/// Any other response, a client error, or a timeout panics and fails the test.
/// Errors from connecting or sending requests are propagated with `?`.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway"],
    timeout_secs = 60,
    startup_wait_secs = 10,
    tokio_flavor = "multi_thread"
)]
async fn test_isolated_node_put_get_workflow(ctx: &mut TestContext) -> TestResult {
    let gateway = ctx.gateway()?;

    // Load the test contract (with empty parameters) and build the initial state.
    const TEST_CONTRACT: &str = "test-contract-integration";
    let contract = load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();
    let initial_state = freenet::test_utils::create_empty_todo_list();
    let wrapped_state = WrappedState::from(initial_state);

    // Open a WebSocket client against the gateway.
    let (ws_stream, _) = connect_async(&gateway.ws_url()).await?;
    let mut client = WebApi::start(ws_stream);

    info!("Step 1: Performing PUT operation to cache contract locally");
    let put_start = std::time::Instant::now();
    // NOTE(review): the trailing `false` is presumably the "subscribe" flag of
    // `make_put` - confirm against the helper's signature.
    make_put(&mut client, wrapped_state.clone(), contract.clone(), false).await?;
    let put_result = timeout(Duration::from_secs(30), client.recv()).await;
    let put_elapsed = put_start.elapsed();
    match put_result {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
            assert_eq!(key, contract_key);
            info!("PUT operation successful in {:?}", put_elapsed);
        }
        Ok(Ok(other)) => {
            panic!("Unexpected PUT response: {:?}", other);
        }
        Ok(Err(e)) => {
            panic!("PUT operation failed: {}", e);
        }
        Err(_) => {
            panic!("PUT operation timed out");
        }
    }
    info!("Contract verified in local cache");

    info!("Step 2: Performing GET operation using local cache");
    let get_start = std::time::Instant::now();
    // NOTE(review): the two booleans are presumably "return contract code" (true)
    // and "subscribe" (false) - confirm against `make_get`.
    make_get(&mut client, contract_key, true, false).await?;
    let get_result = timeout(Duration::from_secs(10), client.recv()).await;
    let get_elapsed = get_start.elapsed();
    match get_result {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
            contract: recv_contract,
            state: recv_state,
            ..
        }))) => {
            // The latency budget is checked only once a GET response has arrived.
            // The receive timeout (10s) is longer than this budget (5s), so
            // asserting before the match would always fire first on a timeout and
            // hide the more specific timeout / error diagnostics below.
            assert!(
                get_elapsed < Duration::from_secs(5),
                "GET from local cache should be fast, not hanging on self-routing. Elapsed: {:?}",
                get_elapsed
            );
            assert_eq!(
                recv_contract
                    .as_ref()
                    .expect("Contract should be present")
                    .key(),
                contract_key
            );
            assert_eq!(recv_state, wrapped_state);
            info!(
                "GET operation successful from local cache in {:?}",
                get_elapsed
            );
        }
        Ok(Ok(other)) => {
            panic!("Unexpected GET response: {:?}", other);
        }
        Ok(Err(e)) => {
            panic!("GET operation failed: {}", e);
        }
        Err(_) => {
            panic!("GET operation timed out");
        }
    }
    info!("PUT-then-GET workflow completed successfully without self-routing");

    // Disconnect, then pause briefly before the test returns.
    client
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway"],
timeout_secs = 60,
startup_wait_secs = 10,
tokio_flavor = "multi_thread"
)]
async fn test_concurrent_get_deduplication_race(ctx: &mut TestContext) -> TestResult {
let gateway = ctx.gateway()?;
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
let initial_state = freenet::test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(initial_state);
let (ws_stream1, _) = connect_async(&gateway.ws_url()).await?;
let mut client1 = WebApi::start(ws_stream1);
let (ws_stream2, _) = connect_async(&gateway.ws_url()).await?;
let mut client2 = WebApi::start(ws_stream2);
let (ws_stream3, _) = connect_async(&gateway.ws_url()).await?;
let mut client3 = WebApi::start(ws_stream3);
info!("Step 1: PUT contract to cache it locally");
make_put(&mut client1, wrapped_state.clone(), contract.clone(), false).await?;
let put_result = timeout(Duration::from_secs(30), client1.recv()).await;
match put_result {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
assert_eq!(key, contract_key);
info!("Contract cached successfully");
}
other => {
panic!("PUT failed: {:?}", other);
}
}
info!("Step 2: Concurrent GET requests from multiple clients");
info!("This tests the deduplication race condition from issue #1886");
let get1 = async {
make_get(&mut client1, contract_key, true, false).await?;
let result = timeout(Duration::from_secs(5), client1.recv()).await;
Ok::<_, anyhow::Error>((1, result))
};
let get2 = async {
make_get(&mut client2, contract_key, true, false).await?;
let result = timeout(Duration::from_secs(5), client2.recv()).await;
Ok::<_, anyhow::Error>((2, result))
};
let get3 = async {
make_get(&mut client3, contract_key, true, false).await?;
let result = timeout(Duration::from_secs(5), client3.recv()).await;
Ok::<_, anyhow::Error>((3, result))
};
let (result1, result2, result3) = tokio::join!(get1, get2, get3);
let check_result =
|client_num: i32, result: anyhow::Result<(i32, Result<Result<HostResponse, _>, _>)>| {
match result {
Ok((
_,
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
state,
..
}))),
)) => {
assert_eq!(key, contract_key);
assert_eq!(state, wrapped_state);
info!("Client {}: Received GET response", client_num);
true
}
Ok((_, Ok(Ok(other)))) => {
info!("Client {}: Unexpected response: {:?}", client_num, other);
false
}
Ok((_, Ok(Err(e)))) => {
info!("Client {}: Error: {}", client_num, e);
false
}
Ok((_, Err(_))) => {
info!(
"Client {}: TIMEOUT - This is the bug from issue #1886!",
client_num
);
false
}
Err(e) => {
info!("Client {}: Failed to send request: {}", client_num, e);
false
}
}
};
let success1 = check_result(1, result1);
let success2 = check_result(2, result2);
let success3 = check_result(3, result3);
assert!(
success1 && success2 && success3,
"All clients should receive GET responses. Failures indicate issue #1886 race condition."
);
info!("All clients received responses - no race condition detected");
client1
.send(ClientRequest::Disconnect { cause: None })
.await?;
client2
.send(ClientRequest::Disconnect { cause: None })
.await?;
client3
.send(ClientRequest::Disconnect { cause: None })
.await?;
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
/// Regression test: SUBSCRIBE to a contract stored on a single `gateway` node.
///
/// Steps:
/// 1. Client 1 PUTs the `test-contract-integration` contract and expects a
///    `PutResponse` with the contract's key within 30 seconds.
/// 2. Client 1 subscribes to that key and expects a `SubscribeResponse` with
///    `subscribed == true` within 10 seconds (the timeout message associates a
///    missing response with PR #1844).
/// 3. Client 2, on a separate connection, subscribes to the same key and expects
///    the same kind of response.
///
/// Any other response, a client error, or a timeout panics and fails the test.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway"],
    timeout_secs = 60,
    startup_wait_secs = 10,
    tokio_flavor = "multi_thread"
)]
async fn test_isolated_node_local_subscription(ctx: &mut TestContext) -> TestResult {
    let gateway = ctx.gateway()?;

    // Load the test contract (with empty parameters) and build the initial state.
    const TEST_CONTRACT: &str = "test-contract-integration";
    let contract = load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();
    let initial_state = freenet::test_utils::create_empty_todo_list();
    let wrapped_state = WrappedState::from(initial_state);

    // Two independent WebSocket sessions against the same gateway.
    let (ws_stream1, _) = connect_async(&gateway.ws_url()).await?;
    let mut client1 = WebApi::start(ws_stream1);
    let (ws_stream2, _) = connect_async(&gateway.ws_url()).await?;
    let mut client2 = WebApi::start(ws_stream2);

    info!("Step 1: Performing PUT operation to cache contract locally");
    make_put(&mut client1, wrapped_state.clone(), contract.clone(), false).await?;
    let put_result = timeout(Duration::from_secs(30), client1.recv()).await;
    match put_result {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
            assert_eq!(key, contract_key);
            info!("PUT operation successful");
        }
        Ok(Ok(other)) => {
            panic!("Unexpected PUT response: {:?}", other);
        }
        Ok(Err(e)) => {
            panic!("PUT operation failed: {}", e);
        }
        Err(_) => {
            panic!("PUT operation timed out");
        }
    }

    info!("Step 2: Testing SUBSCRIBE operation on locally cached contract");
    let subscribe_start = std::time::Instant::now();
    make_subscribe(&mut client1, contract_key).await?;
    let subscribe_result = timeout(Duration::from_secs(10), client1.recv()).await;
    let subscribe_elapsed = subscribe_start.elapsed();
    match subscribe_result {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
            key,
            subscribed,
        }))) => {
            assert_eq!(key, contract_key);
            info!(
                "Client 1: SUBSCRIBE operation successful in {:?}",
                subscribe_elapsed
            );
            assert!(
                subscribed,
                "Should receive subscribed=true when subscribing to local contract"
            );
        }
        Ok(Ok(other)) => {
            panic!("Unexpected SUBSCRIBE response: {:?}", other);
        }
        Ok(Err(e)) => {
            panic!("SUBSCRIBE operation failed: {}", e);
        }
        Err(_) => {
            panic!(
                "SUBSCRIBE operation timed out - SubscribeResponse not delivered! \
                 This indicates the bug from PR #1844 has regressed."
            );
        }
    }

    info!("Step 3: Testing second client subscription");
    make_subscribe(&mut client2, contract_key).await?;
    let subscribe2_result = timeout(Duration::from_secs(10), client2.recv()).await;
    // Failure modes are reported separately (as for client 1) so a failing run
    // shows whether the node answered with something else, returned an error,
    // or never answered at all.
    match subscribe2_result {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
            key,
            subscribed,
        }))) => {
            assert_eq!(key, contract_key);
            info!("Client 2: SUBSCRIBE operation successful");
            assert!(
                subscribed,
                "Client 2 should receive subscribed=true when subscribing to local contract"
            );
        }
        Ok(Ok(other)) => {
            panic!("Client 2: Unexpected SUBSCRIBE response: {:?}", other);
        }
        Ok(Err(e)) => {
            panic!("Client 2: SUBSCRIBE operation failed: {}", e);
        }
        Err(_) => {
            panic!("Client 2: SUBSCRIBE operation timed out");
        }
    }
    info!(
        "Local subscription test completed successfully - both clients received SubscribeResponse"
    );

    // Disconnect both clients, then pause briefly before the test returns.
    client1
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    client2
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
/// Regression test: PUT, UPDATE, then GET against a single `gateway` node.
///
/// Steps:
/// 1. PUT the `test-contract-integration` contract with an empty todo-list state and
///    expect a `PutResponse` with the contract's key within 30 seconds.
/// 2. UPDATE the contract with a todo list containing the item "Test task" and
///    expect an `UpdateResponse` for the same key. The reply must arrive within the
///    15 second receive timeout and the whole UPDATE must take less than
///    10 seconds (the timeout message associates a hang here with issue #1884).
/// 3. GET the contract and check that the returned state, read as text, contains
///    the new task and a non-empty `tasks` array.
///
/// Any other response, a client error, or a timeout panics and fails the test.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway"],
    timeout_secs = 60,
    startup_wait_secs = 10,
    tokio_flavor = "multi_thread"
)]
async fn test_isolated_node_update_operation(ctx: &mut TestContext) -> TestResult {
    let gateway = ctx.gateway()?;

    // Load the test contract (with empty parameters) and build the initial state.
    const TEST_CONTRACT: &str = "test-contract-integration";
    let contract = load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();
    let initial_state = freenet::test_utils::create_empty_todo_list();
    let wrapped_initial_state = WrappedState::from(initial_state);

    // Open a WebSocket client against the gateway.
    let (ws_stream, _) = connect_async(&gateway.ws_url()).await?;
    let mut client = WebApi::start(ws_stream);

    info!("Step 1: Performing PUT operation to cache contract locally");
    let put_start = std::time::Instant::now();
    make_put(
        &mut client,
        wrapped_initial_state.clone(),
        contract.clone(),
        false,
    )
    .await?;
    let put_result = timeout(Duration::from_secs(30), client.recv()).await;
    let put_elapsed = put_start.elapsed();
    match put_result {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
            assert_eq!(key, contract_key);
            info!("PUT operation successful in {:?}", put_elapsed);
        }
        Ok(Ok(other)) => {
            panic!("Unexpected PUT response: {:?}", other);
        }
        Ok(Err(e)) => {
            panic!("PUT operation failed: {}", e);
        }
        Err(_) => {
            panic!("PUT operation timed out");
        }
    }

    info!("Step 2: Performing UPDATE operation with new state");
    let updated_state = freenet::test_utils::create_todo_list_with_item("Test task");
    let wrapped_updated_state = WrappedState::from(updated_state);
    let update_start = std::time::Instant::now();
    make_update(&mut client, contract_key, wrapped_updated_state.clone()).await?;
    let update_result = timeout(Duration::from_secs(15), client.recv()).await;
    let update_elapsed = update_start.elapsed();
    match update_result {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
            key, ..
        }))) => {
            // The latency budget is checked only once an UPDATE response has
            // arrived. The receive timeout (15s) is longer than this budget (10s),
            // so asserting before the match would always fire first on a timeout
            // and hide the more specific timeout / error diagnostics below.
            assert!(
                update_elapsed < Duration::from_secs(10),
                "UPDATE should complete quickly on isolated node, not timeout. Elapsed: {:?}",
                update_elapsed
            );
            assert_eq!(key, contract_key);
            info!("UPDATE operation successful in {:?}", update_elapsed);
        }
        Ok(Ok(other)) => {
            panic!("Unexpected UPDATE response: {:?}", other);
        }
        Ok(Err(e)) => {
            panic!("UPDATE operation failed: {}", e);
        }
        Err(_) => {
            panic!("UPDATE operation timed out (this is the bug in issue #1884)");
        }
    }

    info!("Step 3: Performing GET operation to verify updated state");
    let get_start = std::time::Instant::now();
    make_get(&mut client, contract_key, true, false).await?;
    let get_result = timeout(Duration::from_secs(10), client.recv()).await;
    let get_elapsed = get_start.elapsed();
    match get_result {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
            state: recv_state,
            ..
        }))) => {
            // The state is inspected as text with substring checks instead of being
            // deserialized; invalid UTF-8 would be replaced rather than rejected.
            let recv_str = String::from_utf8_lossy(recv_state.as_ref());
            info!("Received state after UPDATE: {}", recv_str);
            assert!(
                recv_str.contains("\"title\":\"Test task\""),
                "State should contain the updated task 'Test task'"
            );
            assert!(
                recv_str.contains("\"tasks\":["),
                "State should have tasks array"
            );
            assert!(
                !recv_str.contains("\"tasks\":[]"),
                "Tasks array should not be empty after update"
            );
            info!(
                "GET operation successful, state correctly updated in {:?}",
                get_elapsed
            );
        }
        Ok(Ok(other)) => {
            panic!("Unexpected GET response: {:?}", other);
        }
        Ok(Err(e)) => {
            panic!("GET operation failed: {}", e);
        }
        Err(_) => {
            panic!("GET operation timed out");
        }
    }
    info!("PUT-UPDATE-GET workflow completed successfully on isolated node");

    // Disconnect, then pause briefly before the test returns.
    client
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}