use anyhow::bail;
use freenet::dev_tool::Location;
use freenet::test_utils::{self, TestContext, make_put, make_subscribe, make_update};
use freenet_macros::freenet_test;
use freenet_stdlib::{
client_api::{ContractResponse, HostResponse, WebApi},
prelude::*,
};
use std::sync::LazyLock;
use std::time::Duration;
use tokio_tungstenite::connect_async;
const TEST_CONTRACT: &str = "test-contract-integration";
static CONTRACT: LazyLock<(ContractContainer, Location)> = LazyLock::new(|| {
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into()).expect("load contract");
let location = Location::from(&contract.key());
(contract, location)
});
fn contract_location() -> Location {
CONTRACT.1
}
fn host_peer_location() -> f64 {
contract_location().as_f64()
}
fn gateway_location() -> f64 {
Location::new_rounded(contract_location().as_f64() + 0.2).as_f64()
}
fn nat_peer_location() -> f64 {
Location::new_rounded(contract_location().as_f64() + 0.5).as_f64()
}
async fn await_put_response(
client: &mut WebApi,
contract_key: ContractKey,
timeout: Duration,
) -> anyhow::Result<()> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
bail!("timeout waiting for PUT response");
}
match tokio::time::timeout(remaining, client.recv()).await {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
assert_eq!(key, contract_key, "PUT response key mismatch");
return Ok(());
}
Ok(Ok(other)) => {
tracing::debug!("await_put_response: ignoring {:?}", other);
}
Ok(Err(e)) => bail!("error waiting for PUT response: {e}"),
Err(_) => bail!("timeout waiting for PUT response"),
}
}
}
async fn await_subscribe_response(
client: &mut WebApi,
contract_key: ContractKey,
timeout: Duration,
) -> anyhow::Result<()> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
bail!("timeout waiting for SUBSCRIBE response");
}
match tokio::time::timeout(remaining, client.recv()).await {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
key,
subscribed,
}))) => {
assert_eq!(key, contract_key, "SUBSCRIBE response key mismatch");
assert!(
subscribed,
"NAT peer subscription must succeed — a false `subscribed` means the \
relaying node could not register the peer (address-filling regression)"
);
return Ok(());
}
Ok(Ok(other)) => {
tracing::debug!("await_subscribe_response: ignoring {:?}", other);
}
Ok(Err(e)) => bail!("error waiting for SUBSCRIBE response: {e}"),
Err(_) => bail!("timeout waiting for SUBSCRIBE response"),
}
}
}
async fn await_update_notification(
client: &mut WebApi,
contract_key: ContractKey,
timeout: Duration,
) -> anyhow::Result<String> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
bail!("timeout waiting for UpdateNotification");
}
match tokio::time::timeout(remaining, client.recv()).await {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
key,
update,
}))) => {
assert_eq!(key, contract_key, "UpdateNotification key mismatch");
match update {
UpdateData::State(state) => {
let todo: test_utils::TodoList = serde_json::from_slice(state.as_ref())
.expect("deserialize state from update notification");
let title = todo
.tasks
.first()
.map(|t| t.title.clone())
.unwrap_or_default();
return Ok(title);
}
UpdateData::Delta(_)
| UpdateData::StateAndDelta { .. }
| UpdateData::RelatedState { .. }
| UpdateData::RelatedDelta { .. }
| UpdateData::RelatedStateAndDelta { .. }
| _ => {
tracing::warn!("await_update_notification: ignoring non-State update");
}
}
}
Ok(Ok(other)) => {
tracing::debug!("await_update_notification: ignoring {:?}", other);
}
Ok(Err(e)) => bail!("error waiting for UpdateNotification: {e}"),
Err(_) => bail!("timeout waiting for UpdateNotification"),
}
}
}
fn todo_state_with_title(title: &str) -> WrappedState {
let todo = test_utils::TodoList {
tasks: vec![test_utils::Task {
id: 1,
title: title.to_string(),
description: "nat-subscription integration".to_string(),
completed: false,
priority: 1,
}],
version: 0,
};
WrappedState::from(serde_json::to_vec(&todo).expect("serialize todo state"))
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "host-peer", "nat-peer"],
gateways = ["gateway"],
node_configs = {
"gateway": { location: gateway_location() },
"host-peer": { location: host_peer_location() },
"nat-peer": { location: nat_peer_location() },
},
timeout_secs = 600,
startup_wait_secs = 40,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_nat_peer_remote_subscription_receives_update(ctx: &mut TestContext) -> TestResult {
let (contract, contract_loc) = {
let (contract, loc) = &*CONTRACT;
(contract.clone(), *loc)
};
let contract_key = contract.key();
let host_peer = ctx.node("host-peer")?;
let nat_peer = ctx.node("nat-peer")?;
let gateway = ctx.node("gateway")?;
assert_eq!(
host_peer.location,
host_peer_location(),
"host-peer must be pinned at the contract location"
);
assert_eq!(
nat_peer.location,
nat_peer_location(),
"nat-peer must be pinned half a ring from the contract"
);
let host_dist = Location::new_rounded(host_peer.location).distance(contract_loc);
let nat_dist = Location::new_rounded(nat_peer.location).distance(contract_loc);
assert!(
nat_dist > host_dist,
"nat-peer ({nat_dist:?}) must be farther from the contract than host-peer \
({host_dist:?}) — otherwise the subscriber could be the host and the test \
would not exercise gateway-observed-address registration"
);
tracing::info!(
"gateway: {:?} (loc {}), host-peer: {:?} (loc {}), nat-peer: {:?} (loc {}); contract loc {}",
gateway.temp_dir_path,
gateway.location,
host_peer.temp_dir_path,
host_peer.location,
nat_peer.temp_dir_path,
nat_peer.location,
contract_loc.as_f64(),
);
let uri_host = host_peer.ws_url();
let (stream_host, _) = connect_async(&uri_host).await?;
let mut client_host = WebApi::start(stream_host);
let uri_nat = nat_peer.ws_url();
let (stream_nat, _) = connect_async(&uri_nat).await?;
let mut client_nat = WebApi::start(stream_nat);
const PUT_MAX_ATTEMPTS: usize = 3;
let initial = todo_state_with_title("initial");
let mut put_ok = false;
let mut last_err: Option<anyhow::Error> = None;
for attempt in 1..=PUT_MAX_ATTEMPTS {
tracing::info!("host-peer PUT attempt {attempt}/{PUT_MAX_ATTEMPTS}");
if let Err(e) = make_put(&mut client_host, initial.clone(), contract.clone(), false).await {
last_err = Some(e);
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
match await_put_response(&mut client_host, contract_key, Duration::from_secs(120)).await {
Ok(()) => {
put_ok = true;
break;
}
Err(e) => {
last_err = Some(e);
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
if !put_ok {
bail!(
"host-peer PUT failed after {PUT_MAX_ATTEMPTS} attempts: {:?}",
last_err
);
}
tracing::info!("host-peer PUT succeeded");
make_subscribe(&mut client_nat, contract_key).await?;
await_subscribe_response(&mut client_nat, contract_key, Duration::from_secs(120)).await?;
tracing::info!("nat-peer SUBSCRIBE succeeded (relay registered observed addresses)");
tokio::time::sleep(Duration::from_secs(3)).await;
let updated = todo_state_with_title("update-from-host");
make_update(&mut client_host, contract_key, updated).await?;
tracing::info!("host-peer UPDATE sent; waiting for notification to route back to nat-peer");
let title =
await_update_notification(&mut client_nat, contract_key, Duration::from_secs(120)).await?;
assert_eq!(
title, "update-from-host",
"NAT peer must receive the update made on host-peer — a missing/wrong title means the \
broadcast did not route back through the downstream chain built from observed \
addresses (NAT address-filling regression)"
);
tracing::info!("nat-peer received update notification — NAT subscription end-to-end OK");
Ok(())
}