use freenet::{
local_node::NodeConfig,
server::serve_client_api,
test_utils::{TestContext, load_contract, make_get, make_subscribe},
};
use freenet_macros::freenet_test;
use freenet_stdlib::{
client_api::{ClientRequest, ContractRequest, HostResponse, WebApi},
prelude::*,
};
use futures::FutureExt;
use std::{
net::Ipv4Addr,
sync::{LazyLock, Mutex},
time::Duration,
};
use tokio::{select, time::timeout};
use tokio_tungstenite::connect_async;
use tracing::{error, info};
static RNG: LazyLock<Mutex<rand::rngs::StdRng>> = LazyLock::new(|| {
use rand::SeedableRng;
Mutex::new(rand::rngs::StdRng::from_seed(
*b"error_notification_test_seed0123",
))
});
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway"],
timeout_secs = 60,
startup_wait_secs = 10,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_get_error_notification(ctx: &mut TestContext) -> TestResult {
let gateway = ctx.gateway()?;
let (ws_stream, _) = connect_async(&gateway.ws_url()).await?;
let mut client = WebApi::start(ws_stream);
info!("Testing GET operation for non-existent contract (should fail with error)");
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = load_contract(TEST_CONTRACT, vec![1u8; 32].into())?; let nonexistent_key = contract.key();
make_get(&mut client, nonexistent_key, false, false).await?;
let get_result = timeout(Duration::from_secs(30), client.recv()).await;
match get_result {
Ok(Ok(response)) => {
info!("✓ Received response (not timing out): {:?}", response);
info!("✓ Client properly notified instead of hanging");
}
Ok(Err(e)) => {
info!("✓ Received error notification: {}", e);
}
Err(_) => {
panic!(
"GET operation timed out - no response received! \
This indicates the bug from issue #1858 has regressed. \
Clients should receive error responses, not hang indefinitely."
);
}
}
info!("Error notification test passed - client did not hang on operation failure");
client
.send(ClientRequest::Disconnect { cause: None })
.await?;
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway"],
timeout_secs = 60,
startup_wait_secs = 10,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_put_error_notification(ctx: &mut TestContext) -> TestResult {
let gateway = ctx.gateway()?;
let (ws_stream, _) = connect_async(&gateway.ws_url()).await?;
let mut client = WebApi::start(ws_stream);
info!("Testing PUT operation with invalid contract (should fail with error)");
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = load_contract(TEST_CONTRACT, vec![].into())?;
let invalid_state = WrappedState::new(vec![0xFF; 1024 * 1024]);
let put_request = ClientRequest::ContractOp(ContractRequest::Put {
contract: contract.clone(),
state: invalid_state,
related_contracts: Default::default(),
subscribe: false,
blocking_subscribe: false,
});
client.send(put_request).await?;
let put_result = timeout(Duration::from_secs(30), client.recv()).await;
match put_result {
Ok(Ok(response)) => {
info!("✓ Received response (not timing out): {:?}", response);
info!("✓ Client properly notified instead of hanging");
}
Ok(Err(e)) => {
info!("✓ Received error notification: {}", e);
}
Err(_) => {
panic!(
"PUT operation timed out - no response received! \
This indicates clients are not receiving error notifications. \
Clients should receive error responses, not hang indefinitely."
);
}
}
info!("PUT error notification test passed - client did not hang on operation failure");
client
.send(ClientRequest::Disconnect { cause: None })
.await?;
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway"],
timeout_secs = 60,
startup_wait_secs = 10,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_update_error_notification(ctx: &mut TestContext) -> TestResult {
let gateway = ctx.gateway()?;
let (ws_stream, _) = connect_async(&gateway.ws_url()).await?;
let mut client = WebApi::start(ws_stream);
info!("Testing UPDATE operation for non-existent contract (should fail with error)");
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = load_contract(TEST_CONTRACT, vec![99u8; 32].into())?; let nonexistent_key = contract.key();
let new_state = State::from(vec![1, 2, 3, 4]);
let update_request = ClientRequest::ContractOp(ContractRequest::Update {
key: nonexistent_key,
data: freenet_stdlib::prelude::UpdateData::State(new_state),
});
client.send(update_request).await?;
let update_result = timeout(Duration::from_secs(30), client.recv()).await;
match update_result {
Ok(Ok(response)) => {
info!("✓ Received response (not timing out): {:?}", response);
info!("✓ Client properly notified instead of hanging");
}
Ok(Err(e)) => {
info!("✓ Received error notification: {}", e);
}
Err(_) => {
panic!(
"UPDATE operation timed out - no response received! \
This indicates clients are not receiving error notifications. \
Clients should receive error responses, not hang indefinitely."
);
}
}
info!("UPDATE error notification test passed - client did not hang on operation failure");
client
.send(ClientRequest::Disconnect { cause: None })
.await?;
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "node-a"],
timeout_secs = 180,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_subscribe_failure_notifies_client(ctx: &mut TestContext) -> TestResult {
let gateway = ctx.gateway()?;
let (ws_stream, _) = connect_async(&gateway.ws_url()).await?;
let mut client = WebApi::start(ws_stream);
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = load_contract(TEST_CONTRACT, vec![42u8; 32].into())?;
make_subscribe(&mut client, contract.key()).await?;
let mut got_result_error = false;
let mut got_notification_error = false;
let deadline = Duration::from_secs(90);
let start = tokio::time::Instant::now();
while start.elapsed() < deadline {
match timeout(Duration::from_secs(10), client.recv()).await {
Ok(Ok(HostResponse::ContractResponse(
freenet_stdlib::client_api::ContractResponse::SubscribeResponse {
subscribed, ..
},
))) => {
if !subscribed {
got_result_error = true;
}
}
Ok(Err(e)) => {
let err_str = format!("{e}");
if err_str.contains("not found") || err_str.contains("Subscription failed") {
got_notification_error = true;
} else {
got_result_error = true;
}
}
Ok(Ok(_)) => {}
Err(_) => break,
}
if got_result_error && got_notification_error {
break;
}
}
assert!(
got_result_error || got_notification_error,
"Client did not receive any error for failed subscription \
Got result_error={}, notification_error={}",
got_result_error,
got_notification_error
);
client
.send(ClientRequest::Disconnect { cause: None })
.await?;
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway"],
timeout_secs = 60,
startup_wait_secs = 15,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_subscribe_without_wasm_rejected(ctx: &mut TestContext) -> TestResult {
let gateway = ctx.gateway()?;
let (ws_stream, _) = connect_async(&gateway.ws_url()).await?;
let mut client = WebApi::start(ws_stream);
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = load_contract(TEST_CONTRACT, vec![42u8; 32].into())?;
make_subscribe(&mut client, contract.key()).await?;
let start = tokio::time::Instant::now();
let mut got_rejection = false;
while start.elapsed() < Duration::from_secs(15) {
match timeout(Duration::from_secs(5), client.recv()).await {
Ok(Err(e)) => {
let err_str = format!("{e}");
if err_str.contains("WASM") || err_str.contains("not cached locally") {
got_rejection = true;
info!("Got expected WASM rejection error: {err_str}");
break;
} else {
got_rejection = true;
info!("Got error (not WASM-specific): {err_str}");
break;
}
}
Ok(Ok(HostResponse::ContractResponse(
freenet_stdlib::client_api::ContractResponse::SubscribeResponse {
subscribed: false,
..
},
))) => {
got_rejection = true;
info!("Got SubscribeResponse with subscribed=false");
break;
}
Ok(Ok(other)) => {
tracing::debug!("Skipping unrelated response: {other}");
}
Err(_) => break, }
}
assert!(
got_rejection,
"Subscribe without prior PUT should be rejected, but no error received within 15s"
);
assert!(
start.elapsed() < Duration::from_secs(10),
"Rejection took too long ({:?}), should be immediate",
start.elapsed()
);
client
.send(ClientRequest::Disconnect { cause: None })
.await?;
Ok(())
}
#[test_log::test(tokio::test(flavor = "current_thread"))]
async fn test_connection_drop_error_notification() -> anyhow::Result<()> {
use std::net::TcpListener;
let gateway_network_socket = TcpListener::bind("127.0.0.1:0")?;
let gateway_ws_socket = TcpListener::bind("127.0.0.1:0")?;
let peer_ws_socket = TcpListener::bind("127.0.0.1:0")?;
let gateway_port = gateway_network_socket.local_addr()?.port();
let gateway_ws_port = gateway_ws_socket.local_addr()?.port();
let peer_ws_port = peer_ws_socket.local_addr()?.port();
let temp_dir_gw = tempfile::tempdir()?;
let gateway_key = freenet::dev_tool::TransportKeypair::new();
let gateway_transport_keypair = temp_dir_gw.path().join("private.pem");
gateway_key.save(&gateway_transport_keypair)?;
gateway_key
.public()
.save(temp_dir_gw.path().join("public.pem"))?;
let gateway_config = freenet::config::ConfigArgs {
ws_api: freenet::config::WebsocketApiArgs {
address: Some(Ipv4Addr::LOCALHOST.into()),
ws_api_port: Some(gateway_ws_port),
token_ttl_seconds: None,
token_cleanup_interval_seconds: None,
allowed_host: None,
allowed_source_cidrs: None,
},
network_api: freenet::config::NetworkArgs {
public_address: Some(Ipv4Addr::LOCALHOST.into()),
public_port: Some(gateway_port),
is_gateway: true,
skip_load_from_network: true,
gateways: Some(vec![]),
location: Some({
use rand::Rng;
RNG.lock().unwrap().random()
}),
ignore_protocol_checking: true,
address: Some(Ipv4Addr::LOCALHOST.into()),
network_port: Some(gateway_port),
min_connections: None,
max_connections: None,
bandwidth_limit: None,
blocked_addresses: None,
transient_budget: None,
transient_ttl_secs: None,
total_bandwidth_limit: None,
min_bandwidth_per_connection: None,
..Default::default()
},
config_paths: freenet::config::ConfigPathsArgs {
config_dir: Some(temp_dir_gw.path().to_path_buf()),
data_dir: Some(temp_dir_gw.path().to_path_buf()),
log_dir: Some(temp_dir_gw.path().to_path_buf()),
},
secrets: freenet::config::SecretArgs {
transport_keypair: Some(gateway_transport_keypair),
..Default::default()
},
..Default::default()
};
let temp_dir_peer = tempfile::tempdir()?;
let peer_key = freenet::dev_tool::TransportKeypair::new();
let peer_transport_keypair = temp_dir_peer.path().join("private.pem");
peer_key.save(&peer_transport_keypair)?;
let gateway_info = freenet::config::InlineGwConfig {
address: (Ipv4Addr::LOCALHOST, gateway_port).into(),
location: Some({
use rand::Rng;
RNG.lock().unwrap().random()
}),
public_key_path: temp_dir_gw.path().join("public.pem"),
};
let peer_config = freenet::config::ConfigArgs {
ws_api: freenet::config::WebsocketApiArgs {
address: Some(Ipv4Addr::LOCALHOST.into()),
ws_api_port: Some(peer_ws_port),
token_ttl_seconds: None,
token_cleanup_interval_seconds: None,
allowed_host: None,
allowed_source_cidrs: None,
},
network_api: freenet::config::NetworkArgs {
public_address: Some(Ipv4Addr::LOCALHOST.into()),
public_port: None,
is_gateway: false,
skip_load_from_network: true,
gateways: Some(vec![serde_json::to_string(&gateway_info)?]),
location: Some({
use rand::Rng;
RNG.lock().unwrap().random()
}),
ignore_protocol_checking: true,
address: Some(Ipv4Addr::LOCALHOST.into()),
network_port: None,
min_connections: None,
max_connections: None,
bandwidth_limit: None,
blocked_addresses: None,
transient_budget: None,
transient_ttl_secs: None,
total_bandwidth_limit: None,
min_bandwidth_per_connection: None,
..Default::default()
},
config_paths: freenet::config::ConfigPathsArgs {
config_dir: Some(temp_dir_peer.path().to_path_buf()),
data_dir: Some(temp_dir_peer.path().to_path_buf()),
log_dir: Some(temp_dir_peer.path().to_path_buf()),
},
secrets: freenet::config::SecretArgs {
transport_keypair: Some(peer_transport_keypair),
..Default::default()
},
..Default::default()
};
std::mem::drop(gateway_network_socket);
std::mem::drop(gateway_ws_socket);
std::mem::drop(peer_ws_socket);
let gateway = async {
let config = gateway_config.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_client_api(config.ws_api).await?)
.await?;
node.run().await
}
.boxed_local();
let (peer_shutdown_tx, mut peer_shutdown_rx) = tokio::sync::mpsc::channel::<()>(1);
let peer = async move {
let config = peer_config.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_client_api(config.ws_api).await?)
.await?;
tokio::select! {
result = node.run() => result,
_ = peer_shutdown_rx.recv() => {
info!("Peer received shutdown signal - simulating connection drop");
Err(anyhow::anyhow!("Peer shutdown requested"))
}
}
}
.boxed_local();
let test = tokio::time::timeout(Duration::from_secs(90), async move {
info!("Waiting for nodes to start up and connect...");
tokio::time::sleep(Duration::from_secs(15)).await;
let url = format!(
"ws://localhost:{}/v1/contract/command?encodingProtocol=native",
gateway_ws_port
);
let (ws_stream, _) = connect_async(&url).await?;
let mut client = WebApi::start(ws_stream);
info!("Client connected to gateway");
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = load_contract(TEST_CONTRACT, vec![].into())?;
let state = freenet::test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(state);
let put_request = ClientRequest::ContractOp(ContractRequest::Put {
contract: contract.clone(),
state: wrapped_state.clone(),
related_contracts: Default::default(),
subscribe: false,
blocking_subscribe: false,
});
client.send(put_request).await?;
tokio::time::sleep(Duration::from_millis(500)).await;
info!("Dropping peer connection to simulate network failure...");
peer_shutdown_tx.send(()).await?;
tokio::time::sleep(Duration::from_secs(2)).await;
info!("Waiting for response after connection drop...");
let response_result = timeout(Duration::from_secs(30), client.recv()).await;
match response_result {
Ok(Ok(response)) => {
info!("✓ Received response after connection drop: {:?}", response);
info!("✓ Client properly handled connection drop scenario");
}
Ok(Err(e)) => {
info!("✓ Received error notification after connection drop: {}", e);
info!("✓ Client properly notified of connection issues");
}
Err(_) => {
panic!(
"Operation timed out after connection drop - no response received! \
This indicates clients are not being notified of connection failures. \
Clients should receive error responses even when connections fail."
);
}
}
info!("Connection drop error notification test passed");
let _disconnect = client.send(ClientRequest::Disconnect { cause: None }).await;
tokio::time::sleep(Duration::from_millis(100)).await;
Ok::<(), anyhow::Error>(())
});
select! {
_ = gateway => {
error!("Gateway exited unexpectedly");
Ok(())
}
_ = peer => {
Ok(())
}
result = test => {
result??;
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}
}
}