use freenet::test_utils::{create_empty_todo_list, load_contract};
use freenet_stdlib::{
client_api::{ClientRequest, ContractRequest, ContractResponse, HostResponse, WebApi},
prelude::*,
};
use std::{
path::{Path, PathBuf},
process::{Child, Command, Stdio},
time::Duration,
};
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tracing::info;
const TEST_CONTRACT: &str = "test-contract-integration";
const CONTRACT_PARAMS: &[u8] = b"persistence-roundtrip-params-3367";
fn workspace_root() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent()
.and_then(|p| p.parent())
.expect("workspace layout: crates/core/../../ should resolve")
.to_path_buf()
}
fn target_dir() -> PathBuf {
std::env::var_os("CARGO_TARGET_DIR")
.map(PathBuf::from)
.unwrap_or_else(|| workspace_root().join("target"))
}
fn freenet_bin() -> PathBuf {
let debug = target_dir().join("debug").join("freenet");
if debug.exists() {
return debug;
}
let release = target_dir().join("release").join("freenet");
assert!(
release.exists(),
"freenet binary not found at {debug:?} or {release:?}. Build it first: \
`cargo build --bin freenet` (CI's test_unit job builds it before tests)."
);
release
}
struct NodeProcess {
child: Child,
ws_port: u16,
}
impl NodeProcess {
fn spawn(
dir: &Path,
ws_port: u16,
network_port: u16,
transport_keypair: &Path,
) -> anyhow::Result<Self> {
let child = Command::new(freenet_bin())
.arg("network")
.args(["--ws-api-address", "127.0.0.1"])
.args(["--ws-api-port", &ws_port.to_string()])
.args(["--network-address", "127.0.0.1"])
.args(["--network-port", &network_port.to_string()])
.args(["--public-network-address", "127.0.0.1"])
.args(["--public-network-port", &network_port.to_string()])
.arg("--is-gateway")
.arg("--skip-load-from-network")
.arg("--ignore-protocol-checking")
.args(["--location", "0.5"])
.args(["--config-dir", &dir.to_string_lossy()])
.args(["--data-dir", &dir.to_string_lossy()])
.args(["--log-dir", &dir.to_string_lossy()])
.args(["--transport-keypair", &transport_keypair.to_string_lossy()])
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()?;
Ok(Self { child, ws_port })
}
fn has_exited(&mut self) -> anyhow::Result<bool> {
Ok(self.child.try_wait()?.is_some())
}
fn stop(mut self) -> anyhow::Result<()> {
self.child.kill()?;
self.child.wait()?;
Ok(())
}
}
impl Drop for NodeProcess {
fn drop(&mut self) {
if self.child.kill().is_ok() {
let _reaped = self.child.wait().is_ok();
}
}
}
fn reserve_port() -> anyhow::Result<u16> {
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
Ok(listener.local_addr()?.port())
}
async fn connect(ws_port: u16) -> anyhow::Result<WebApi> {
let url = format!("ws://127.0.0.1:{ws_port}/v1/contract/command?encodingProtocol=native");
let (ws_stream, _) = connect_async(&url).await?;
Ok(WebApi::start(ws_stream))
}
async fn wait_for_ws(node: &mut NodeProcess) -> anyhow::Result<WebApi> {
let deadline = tokio::time::Instant::now() + Duration::from_secs(60);
loop {
if node.has_exited()? {
anyhow::bail!(
"freenet node process exited before its WS API on port {} came up",
node.ws_port
);
}
if let Ok(client) = connect(node.ws_port).await {
return Ok(client);
}
if tokio::time::Instant::now() >= deadline {
anyhow::bail!("WS API on port {} did not come up within 60s", node.ws_port);
}
tokio::time::sleep(Duration::from_millis(250)).await;
}
}
macro_rules! recv_until {
($client:expr, $secs:expr, $pat:pat => $body:expr) => {{
let deadline = tokio::time::Instant::now() + Duration::from_secs($secs);
loop {
if tokio::time::Instant::now() >= deadline {
anyhow::bail!("timed out waiting for expected response");
}
match timeout(Duration::from_secs(5), $client.recv()).await {
Ok(Ok($pat)) => break $body,
Ok(Ok(other)) => info!("ignoring response while waiting: {other:?}"),
Ok(Err(e)) => anyhow::bail!("client error while waiting for response: {e}"),
Err(_) => {}
}
}
}};
}
#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 4))]
async fn test_persistence_roundtrip_state_and_params() -> anyhow::Result<()> {
freenet::test_utils::ensure_contract_compiled(TEST_CONTRACT)?;
let data_dir = tempfile::tempdir()?;
let data_path = data_dir.path().to_path_buf();
let key = freenet::dev_tool::TransportKeypair::new();
let transport_keypair = data_path.join("private.pem");
key.save(&transport_keypair)?;
key.public().save(data_path.join("public.pem"))?;
let (ws_port, network_port) = (reserve_port()?, reserve_port()?);
let (ws_port2, network_port2) = (reserve_port()?, reserve_port()?);
let contract = load_contract(TEST_CONTRACT, Parameters::from(CONTRACT_PARAMS.to_vec()))?;
let contract_key = contract.key();
info!(%contract_key, "loaded contract with non-empty parameters");
let empty_param_key = load_contract(TEST_CONTRACT, Parameters::from(vec![]))?.key();
assert_ne!(
contract_key, empty_param_key,
"parameters must be part of the contract key for this test to be meaningful"
);
let mut todo: freenet::test_utils::TodoList =
serde_json::from_slice(&create_empty_todo_list())?;
todo.tasks.push(freenet::test_utils::Task {
id: 7,
title: "survive a restart".to_string(),
description: "state and params must round-trip through redb".to_string(),
completed: false,
priority: 3,
});
let initial_state = WrappedState::from(serde_json::to_vec(&todo)?);
let mut node1 = NodeProcess::spawn(&data_path, ws_port, network_port, &transport_keypair)?;
let mut client = wait_for_ws(&mut node1).await?;
client
.send(ClientRequest::ContractOp(ContractRequest::Put {
contract: contract.clone(),
state: initial_state.clone(),
related_contracts: Default::default(),
subscribe: false,
blocking_subscribe: false,
}))
.await?;
let put_key = recv_until!(client, 30,
HostResponse::ContractResponse(ContractResponse::PutResponse { key }) => key);
assert_eq!(put_key, contract_key, "PUT acknowledged a different key");
info!("PUT acknowledged; killing node 1 to release the redb store");
drop(client);
node1.stop()?;
let mut node2 = NodeProcess::spawn(&data_path, ws_port2, network_port2, &transport_keypair)?;
let mut client = wait_for_ws(&mut node2).await?;
client
.send(ClientRequest::ContractOp(ContractRequest::Get {
key: *contract_key.id(),
return_contract_code: true,
subscribe: true,
blocking_subscribe: false,
}))
.await?;
let (got_contract, got_state) = recv_until!(client, 30,
HostResponse::ContractResponse(ContractResponse::GetResponse {
contract: Some(c), state, ..
}) => (c, state));
assert_eq!(
got_contract.params().as_ref(),
CONTRACT_PARAMS,
"parameters were lost across restart (#3350)"
);
assert_eq!(
got_contract.key(),
contract_key,
"contract key changed across restart"
);
let recovered: freenet::test_utils::TodoList = serde_json::from_slice(got_state.as_ref())?;
assert_eq!(
recovered.tasks.len(),
1,
"persisted state lost its task across restart"
);
assert_eq!(recovered.tasks[0].id, 7, "persisted task id changed");
assert_eq!(
recovered.tasks[0].title, "survive a restart",
"persisted task title changed"
);
info!("GET-with-subscribe after restart returned correct state and parameters");
client
.send(ClientRequest::ContractOp(ContractRequest::Get {
key: *contract_key.id(),
return_contract_code: true,
subscribe: false,
blocking_subscribe: false,
}))
.await?;
let second = recv_until!(client, 30,
HostResponse::ContractResponse(ContractResponse::GetResponse {
contract: Some(c), ..
}) => c);
assert_eq!(
second.params().as_ref(),
CONTRACT_PARAMS,
"second GET after restart returned wrong parameters"
);
info!("second GET after restart still returns the persisted contract");
drop(client);
node2.stop()?;
Ok(())
}