use freenet::{
local_node::NodeConfig,
server::serve_client_api,
test_utils::{load_contract, make_get, make_put},
};
use freenet_stdlib::{
client_api::{ContractResponse, HostResponse, WebApi},
prelude::*,
};
use std::{
net::{Ipv4Addr, TcpListener},
path::Path,
time::{Duration, Instant},
};
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tracing::info;
const TEST_CONTRACT: &str = "test-contract-integration";
fn reserve_port() -> anyhow::Result<u16> {
let listener = TcpListener::bind("127.0.0.1:0")?;
Ok(listener.local_addr()?.port())
}
fn node_config(
dir: &Path,
ws_port: u16,
network_port: u16,
keypair_path: &Path,
) -> freenet::config::ConfigArgs {
freenet::config::ConfigArgs {
ws_api: freenet::config::WebsocketApiArgs {
address: Some(Ipv4Addr::LOCALHOST.into()),
ws_api_port: Some(ws_port),
..Default::default()
},
network_api: freenet::config::NetworkArgs {
public_address: Some(Ipv4Addr::LOCALHOST.into()),
public_port: Some(network_port),
is_gateway: true,
skip_load_from_network: true,
gateways: Some(vec![]),
location: Some(0.5),
ignore_protocol_checking: true,
address: Some(Ipv4Addr::LOCALHOST.into()),
network_port: Some(network_port),
..Default::default()
},
config_paths: freenet::config::ConfigPathsArgs {
config_dir: Some(dir.to_path_buf()),
data_dir: Some(dir.to_path_buf()),
log_dir: Some(dir.to_path_buf()),
},
secrets: freenet::config::SecretArgs {
transport_keypair: Some(keypair_path.to_path_buf()),
..Default::default()
},
..Default::default()
}
}
async fn connect_ws(port: u16, within: Duration) -> anyhow::Result<WebApi> {
let url = format!("ws://127.0.0.1:{port}/v1/contract/command?encodingProtocol=native");
let deadline = Instant::now() + within;
loop {
match connect_async(&url).await {
Ok((ws_stream, _)) => return Ok(WebApi::start(ws_stream)),
Err(e) => {
if Instant::now() >= deadline {
anyhow::bail!("WS API on port {port} did not come up within {within:?}: {e}");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
macro_rules! recv_until {
($client:expr, $secs:expr, $pat:pat => $body:expr) => {{
let deadline = Instant::now() + Duration::from_secs($secs);
loop {
if Instant::now() >= deadline {
anyhow::bail!("timed out waiting for expected response");
}
match timeout(Duration::from_secs(5), $client.recv()).await {
Ok(Ok($pat)) => break $body,
Ok(Ok(other)) => info!("ignoring response while waiting: {other:?}"),
Ok(Err(e)) => anyhow::bail!("client error while waiting: {e}"),
Err(_) => {}
}
}
}};
}
#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 4))]
#[cfg_attr(not(feature = "redb"), ignore)]
async fn test_in_process_restart_releases_redb_lock() -> anyhow::Result<()> {
freenet::test_utils::ensure_contract_compiled(TEST_CONTRACT)?;
let data_dir = tempfile::tempdir()?;
let data_path = data_dir.path().to_path_buf();
let db_path = data_path.join("db").join("db");
let key = freenet::dev_tool::TransportKeypair::new();
let keypair_path = data_path.join("private.pem");
key.save(&keypair_path)?;
key.public().save(data_path.join("public.pem"))?;
let (ws_port1, net_port1) = (reserve_port()?, reserve_port()?);
let (ws_port2, net_port2) = (reserve_port()?, reserve_port()?);
let contract = load_contract(TEST_CONTRACT, Parameters::from(vec![]))?;
let contract_key = contract.key();
let initial_state = WrappedState::from(freenet::test_utils::create_empty_todo_list());
let cfg1 = node_config(&data_path, ws_port1, net_port1, &keypair_path)
.build()
.await?;
let node1 = NodeConfig::new(cfg1.clone())
.await?
.build(serve_client_api(cfg1.ws_api.clone()).await?)
.await?;
let shutdown1 = node1.shutdown_handle();
let run1 = tokio::spawn(async move { node1.run().await });
{
let mut client = connect_ws(ws_port1, Duration::from_secs(30)).await?;
make_put(&mut client, initial_state.clone(), contract.clone(), false).await?;
let put_key = recv_until!(client, 30,
HostResponse::ContractResponse(ContractResponse::PutResponse { key }) => key);
assert_eq!(put_key, contract_key, "PUT acknowledged a different key");
info!("node 1 PUT acknowledged; triggering graceful shutdown");
}
shutdown1.shutdown().await;
let run1_result = timeout(Duration::from_secs(30), run1)
.await
.map_err(|_| anyhow::anyhow!("node 1 run loop did not exit within 30s of shutdown()"))?
.map_err(|e| anyhow::anyhow!("node 1 run task panicked: {e}"))?;
info!(?run1_result, "node 1 run loop exited after shutdown");
let lock_deadline = Instant::now() + Duration::from_secs(10);
loop {
match try_open_redb(&db_path) {
Ok(()) => {
info!(
elapsed_ms = lock_deadline
.saturating_duration_since(Instant::now())
.as_millis(),
"redb lock released after shutdown"
);
break;
}
Err(e) => {
assert!(
Instant::now() < lock_deadline,
"redb file lock was not released within 10s of node 1 shutdown \
(detached executor task still holding it?): {e}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
}
let cfg2 = node_config(&data_path, ws_port2, net_port2, &keypair_path)
.build()
.await?;
let node2 = NodeConfig::new(cfg2.clone())
.await?
.build(serve_client_api(cfg2.ws_api.clone()).await?)
.await?;
let shutdown2 = node2.shutdown_handle();
let run2 = tokio::spawn(async move { node2.run().await });
let mut client = connect_ws(ws_port2, Duration::from_secs(30)).await?;
make_get(&mut client, contract_key, true, false).await?;
let got_state = recv_until!(client, 30,
HostResponse::ContractResponse(ContractResponse::GetResponse { state, .. }) => state);
assert_eq!(
got_state.as_ref(),
initial_state.as_ref(),
"node 2 served different state than node 1 persisted (stale server or bad reload)"
);
info!("node 2 GET after in-process restart returned the persisted state");
drop(client);
shutdown2.shutdown().await;
if timeout(Duration::from_secs(30), run2).await.is_err() {
info!("node 2 run loop did not exit within 30s of shutdown (cleanup only)");
}
Ok(())
}
fn try_open_redb(db_path: &Path) -> anyhow::Result<()> {
let db = redb::Database::create(db_path)?;
drop(db);
Ok(())
}