use freenet::{
local_node::NodeConfig,
server::serve_client_api,
test_utils::{load_contract, make_get, make_put, make_subscribe},
};
use freenet_stdlib::{client_api::WebApi, prelude::*};
use futures::FutureExt;
use std::{
net::{Ipv4Addr, TcpListener},
sync::{LazyLock, Mutex},
time::Duration,
};
use tokio::{select, time::timeout};
use tokio_tungstenite::connect_async;
use tracing::{error, info};
static RNG: LazyLock<Mutex<rand::rngs::StdRng>> = LazyLock::new(|| {
use rand::SeedableRng;
Mutex::new(rand::rngs::StdRng::from_seed(
*b"ops_before_join_test_seed_12345!",
))
});
async fn expect_peer_not_joined(client: &mut WebApi, op_name: &str) -> anyhow::Result<()> {
let response = timeout(Duration::from_secs(10), client.recv()).await;
let error_msg = match response {
Ok(Err(client_error)) => format!("{:?}", client_error),
Ok(Ok(resp)) => {
return Err(anyhow::anyhow!(
"{} unexpectedly succeeded before join: {:?}",
op_name,
resp
));
}
Err(_) => return Err(anyhow::anyhow!("{}: timeout - no response", op_name)),
};
assert!(
error_msg.contains("PeerNotJoined"),
"{}: expected PeerNotJoined, got: {}",
op_name,
error_msg
);
info!("{}: got PeerNotJoined as expected", op_name);
Ok(())
}
async fn connect_to_ws(port: u16) -> anyhow::Result<WebApi> {
let url = format!(
"ws://localhost:{}/v1/contract/command?encodingProtocol=native",
port
);
for attempt in 0..10 {
match connect_async(&url).await {
Ok((ws_stream, _)) => return Ok(WebApi::start(ws_stream)),
Err(_) if attempt < 9 => {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
Err(e) => return Err(anyhow::anyhow!("Failed to connect: {}", e)),
}
}
unreachable!()
}
#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 4))]
async fn test_operations_blocked_before_join() -> anyhow::Result<()> {
let gateway_network_socket = TcpListener::bind("127.0.0.1:0")?;
let gateway_ws_socket = TcpListener::bind("127.0.0.1:0")?;
let peer_ws_socket = TcpListener::bind("127.0.0.1:0")?;
let gateway_port = gateway_network_socket.local_addr()?.port();
let gateway_ws_port = gateway_ws_socket.local_addr()?.port();
let peer_ws_port = peer_ws_socket.local_addr()?.port();
let temp_dir_gw = tempfile::tempdir()?;
let gateway_key = freenet::dev_tool::TransportKeypair::new();
let gateway_transport_keypair = temp_dir_gw.path().join("private.pem");
gateway_key.save(&gateway_transport_keypair)?;
gateway_key
.public()
.save(temp_dir_gw.path().join("public.pem"))?;
let gateway_config = freenet::config::ConfigArgs {
ws_api: freenet::config::WebsocketApiArgs {
address: Some(Ipv4Addr::LOCALHOST.into()),
ws_api_port: Some(gateway_ws_port),
token_ttl_seconds: None,
token_cleanup_interval_seconds: None,
allowed_host: None,
},
network_api: freenet::config::NetworkArgs {
public_address: Some(Ipv4Addr::LOCALHOST.into()),
public_port: Some(gateway_port),
is_gateway: true,
skip_load_from_network: true,
gateways: Some(vec![]),
location: Some({
use rand::Rng;
RNG.lock().unwrap().random()
}),
ignore_protocol_checking: true,
address: Some(Ipv4Addr::LOCALHOST.into()),
network_port: Some(gateway_port),
min_connections: None,
max_connections: None,
bandwidth_limit: None,
blocked_addresses: None,
transient_budget: None,
transient_ttl_secs: None,
total_bandwidth_limit: None,
min_bandwidth_per_connection: None,
..Default::default()
},
config_paths: freenet::config::ConfigPathsArgs {
config_dir: Some(temp_dir_gw.path().to_path_buf()),
data_dir: Some(temp_dir_gw.path().to_path_buf()),
log_dir: Some(temp_dir_gw.path().to_path_buf()),
},
secrets: freenet::config::SecretArgs {
transport_keypair: Some(gateway_transport_keypair),
..Default::default()
},
..Default::default()
};
let temp_dir_peer = tempfile::tempdir()?;
let peer_key = freenet::dev_tool::TransportKeypair::new();
let peer_transport_keypair = temp_dir_peer.path().join("private.pem");
peer_key.save(&peer_transport_keypair)?;
let gateway_info = freenet::config::InlineGwConfig {
address: (Ipv4Addr::LOCALHOST, gateway_port).into(),
location: Some({
use rand::Rng;
RNG.lock().unwrap().random()
}),
public_key_path: temp_dir_gw.path().join("public.pem"),
};
let peer_config = freenet::config::ConfigArgs {
ws_api: freenet::config::WebsocketApiArgs {
address: Some(Ipv4Addr::LOCALHOST.into()),
ws_api_port: Some(peer_ws_port),
token_ttl_seconds: None,
token_cleanup_interval_seconds: None,
allowed_host: None,
},
network_api: freenet::config::NetworkArgs {
public_address: Some(Ipv4Addr::LOCALHOST.into()),
public_port: None,
is_gateway: false,
skip_load_from_network: true,
gateways: Some(vec![serde_json::to_string(&gateway_info)?]),
location: Some({
use rand::Rng;
RNG.lock().unwrap().random()
}),
ignore_protocol_checking: true,
address: Some(Ipv4Addr::LOCALHOST.into()),
network_port: None,
min_connections: None,
max_connections: None,
bandwidth_limit: None,
blocked_addresses: None,
transient_budget: None,
transient_ttl_secs: None,
total_bandwidth_limit: None,
min_bandwidth_per_connection: None,
..Default::default()
},
config_paths: freenet::config::ConfigPathsArgs {
config_dir: Some(temp_dir_peer.path().to_path_buf()),
data_dir: Some(temp_dir_peer.path().to_path_buf()),
log_dir: Some(temp_dir_peer.path().to_path_buf()),
},
secrets: freenet::config::SecretArgs {
transport_keypair: Some(peer_transport_keypair),
..Default::default()
},
..Default::default()
};
std::mem::drop(gateway_network_socket);
std::mem::drop(gateway_ws_socket);
std::mem::drop(peer_ws_socket);
let (start_gateway_tx, start_gateway_rx) = tokio::sync::oneshot::channel::<()>();
let peer_cfg = peer_config.build().await?;
let peer_ws_server = serve_client_api(peer_cfg.ws_api.clone()).await?;
let peer_node = NodeConfig::new(peer_cfg.clone())
.await?
.build(peer_ws_server)
.await?;
let peer = async move { peer_node.run().await }.boxed_local();
let gw_config = gateway_config.build().await?;
let gateway = async move {
let _signal = start_gateway_rx.await;
info!("Starting gateway...");
let node = NodeConfig::new(gw_config.clone())
.await?
.build(serve_client_api(gw_config.ws_api).await?)
.await?;
node.run().await
}
.boxed_local();
let test = tokio::time::timeout(Duration::from_secs(120), async move {
tokio::time::sleep(Duration::from_millis(200)).await;
let mut client = connect_to_ws(peer_ws_port).await?;
info!("Connected to peer (gateway NOT started yet)");
let contract = load_contract("test-contract-integration", vec![].into())?;
let key = contract.key();
let state = WrappedState::from(freenet::test_utils::create_empty_todo_list());
info!("Testing operations before join...");
make_put(&mut client, state.clone(), contract.clone(), false).await?;
expect_peer_not_joined(&mut client, "PUT").await?;
make_get(&mut client, key, false, false).await?;
expect_peer_not_joined(&mut client, "GET").await?;
make_subscribe(&mut client, key).await?;
expect_peer_not_joined(&mut client, "SUBSCRIBE").await?;
info!("All operations correctly rejected before join");
info!("Starting gateway...");
start_gateway_tx.send(()).expect("gateway receiver dropped");
info!("Waiting for peer to join network (polling with retry)...");
let contract2 = load_contract("test-contract-integration", vec![1].into())?;
let max_join_wait = Duration::from_secs(60);
let poll_interval = Duration::from_secs(2);
let join_start = tokio::time::Instant::now();
let mut joined = false;
while join_start.elapsed() < max_join_wait {
tokio::time::sleep(poll_interval).await;
make_put(&mut client, state.clone(), contract2.clone(), false).await?;
let response = timeout(Duration::from_secs(10), client.recv()).await;
match response {
Ok(Ok(_)) => {
info!(
"PUT succeeded after {:?} - peer has joined",
join_start.elapsed()
);
joined = true;
break;
}
Ok(Err(e)) => {
let err_str = format!("{:?}", e);
if err_str.contains("PeerNotJoined") {
info!(
"Still PeerNotJoined after {:?}, retrying...",
join_start.elapsed()
);
continue;
}
return Err(anyhow::anyhow!("Unexpected error after join: {}", err_str));
}
Err(_) => {
info!(
"PUT timed out after {:?}, retrying...",
join_start.elapsed()
);
continue;
}
}
}
if !joined {
return Err(anyhow::anyhow!(
"Peer failed to join network within {:?}",
max_join_wait
));
}
info!("Test completed successfully");
Ok::<(), anyhow::Error>(())
});
select! {
_ = gateway => {
error!("Gateway exited unexpectedly");
Ok(())
}
_ = peer => {
error!("Peer exited unexpectedly");
Ok(())
}
result = test => {
result??;
Ok(())
}
}
}