use anyhow::{bail, ensure};
use freenet::test_utils::{
self, TestContext, load_contract, load_delegate, make_get, make_get_with_blocking, make_put,
make_put_with_blocking, make_subscribe, make_update, verify_contract_exists,
};
use freenet_macros::freenet_test;
use freenet_stdlib::{
client_api::{ClientRequest, ContractResponse, HostResponse, QueryResponse, WebApi},
prelude::*,
};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::LazyLock;
use std::time::Duration;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
async fn get_contract(
client: &mut WebApi,
key: ContractKey,
temp_dir: impl AsRef<Path>,
) -> anyhow::Result<(ContractContainer, WrappedState)> {
const MAX_ATTEMPTS: usize = 3;
const ATTEMPT_TIMEOUT: Duration = Duration::from_secs(60);
const MAX_RESPONSES_PER_ATTEMPT: usize = 20;
for attempt in 1..=MAX_ATTEMPTS {
tracing::info!("GET contract attempt {attempt}/{MAX_ATTEMPTS}");
make_get(client, key, true, false).await?;
for _ in 0..MAX_RESPONSES_PER_ATTEMPT {
let resp = tokio::time::timeout(ATTEMPT_TIMEOUT, client.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
contract: Some(contract),
state,
}))) => {
verify_contract_exists(temp_dir.as_ref(), key).await?;
tracing::info!("GET contract succeeded on attempt {attempt}");
return Ok((contract, state));
}
Ok(Ok(other)) => {
tracing::warn!("unexpected response while waiting for get: {:?}", other);
}
Ok(Err(e)) => {
tracing::warn!("GET attempt {attempt} error receiving response: {}", e);
break; }
Err(_) => {
tracing::warn!("GET attempt {attempt} timed out waiting for response");
break; }
}
}
if attempt == MAX_ATTEMPTS {
bail!("GET contract failed after {MAX_ATTEMPTS} attempts");
}
loop {
match tokio::time::timeout(Duration::from_millis(200), client.recv()).await {
Ok(Ok(resp)) => {
tracing::warn!("Discarding stray response prior to GET retry: {:?}", resp);
}
Ok(Err(err)) => {
tracing::warn!("Discarding stray error prior to GET retry: {}", err);
}
Err(_) => break,
}
}
tokio::time::sleep(Duration::from_secs(3)).await;
}
unreachable!("get_contract loop should always return or bail");
}
/// Sends a PUT of `state` for `contract` through `client`, retrying on failure.
///
/// Each of the up to `MAX_ATTEMPTS` attempts sends one PUT request and reads exactly one
/// message from the client (bounded by `ATTEMPT_TIMEOUT`). A `PutResponse` ends the function
/// successfully; any other message, a receive error, or a timeout is logged and triggers a
/// retry after queued messages have been drained and a short pause has elapsed.
///
/// `description` is only used to label log lines and error messages. When `expected_key` is
/// `Some`, the key reported by the `PutResponse` must equal it.
///
/// # Errors
/// Fails if the request cannot be sent, if the reported key differs from `expected_key`, or
/// if no `PutResponse` was obtained within `MAX_ATTEMPTS` attempts.
async fn send_put_with_retry(
    client: &mut WebApi,
    state: WrappedState,
    contract: ContractContainer,
    description: &str,
    expected_key: Option<ContractKey>,
) -> anyhow::Result<()> {
    const MAX_ATTEMPTS: usize = 3;
    const ATTEMPT_TIMEOUT: Duration = Duration::from_secs(60);
    // How long the client may stay silent before the drain phase is considered finished.
    const DRAIN_IDLE_TIMEOUT: Duration = Duration::from_millis(200);
    // Pause between a failed attempt and the next PUT request.
    const RETRY_DELAY: Duration = Duration::from_secs(3);

    for attempt in 1..=MAX_ATTEMPTS {
        tracing::info!("Sending {} (attempt {attempt}/{MAX_ATTEMPTS})", description);
        make_put(client, state.clone(), contract.clone(), false).await?;

        let outcome = tokio::time::timeout(ATTEMPT_TIMEOUT, client.recv()).await;
        match outcome {
            Err(_) => {
                tracing::warn!(
                    "{} attempt {attempt} timed out waiting for response",
                    description
                );
            }
            Ok(Err(recv_err)) => {
                tracing::warn!(
                    "{} attempt {attempt} failed while receiving response: {}",
                    description,
                    recv_err
                );
            }
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse {
                key: reported_key,
            }))) => {
                if let Some(wanted_key) = expected_key {
                    ensure!(
                        reported_key == wanted_key,
                        "{} returned unexpected contract key (expected {}, got {})",
                        description,
                        wanted_key,
                        reported_key
                    );
                }
                tracing::info!("{description} succeeded on attempt {attempt}");
                return Ok(());
            }
            Ok(Ok(unrelated)) => {
                tracing::warn!(
                    "{} attempt {attempt} returned unexpected response: {:?}",
                    description,
                    unrelated
                );
            }
        }

        if attempt == MAX_ATTEMPTS {
            bail!("{description} failed after {MAX_ATTEMPTS} attempts");
        }

        // Drain whatever is still queued so the retry does not read leftovers from this
        // attempt; stop as soon as the client stays quiet for DRAIN_IDLE_TIMEOUT.
        while let Ok(leftover) = tokio::time::timeout(DRAIN_IDLE_TIMEOUT, client.recv()).await {
            match leftover {
                Ok(stray) => {
                    tracing::warn!(
                        "Discarding stray response prior to retrying {}: {:?}",
                        description,
                        stray
                    );
                }
                Err(stray_err) => {
                    tracing::warn!(
                        "Discarding stray error prior to retrying {}: {}",
                        description,
                        stray_err
                    );
                }
            }
        }

        tokio::time::sleep(RETRY_DELAY).await;
    }

    unreachable!("send_put_with_retry loop should always return or bail");
}
// Puts a contract through a client connected to `peer-a`, then fetches it back both through
// that same client and through a second client connected to the gateway, checking that key,
// contract and state round-trip unchanged.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway", "peer-a"],
    // Increased timeout for CI where 8 parallel tests compete for resources
    timeout_secs = 300,
    startup_wait_secs = 30,
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4,
    aggregate_events = "always"
)]
async fn test_put_contract(ctx: &mut TestContext) -> TestResult {
    const TEST_CONTRACT: &str = "test-contract-integration";
    let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();
    let initial_state = test_utils::create_empty_todo_list();
    let wrapped_state = WrappedState::from(initial_state);

    let peer_a = ctx.node("peer-a")?;
    let gateway = ctx.node("gateway")?;
    let ws_api_port_peer_a = peer_a.ws_port;
    let ws_api_port_gateway = gateway.ws_port;
    tracing::info!("Node A (peer-a) ws_port: {}", ws_api_port_peer_a);
    tracing::info!("Node B (gateway) ws_port: {}", ws_api_port_gateway);

    // Client A talks to peer-a.
    let uri = peer_a.ws_url();
    let (stream, _) = connect_async(&uri).await?;
    let mut client_api_a = WebApi::start(stream);

    make_put(
        &mut client_api_a,
        wrapped_state.clone(),
        contract.clone(),
        false,
    )
    .await?;

    tracing::info!("Waiting for PUT response...");
    let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
    match resp {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
            tracing::info!("PUT successful for contract: {}", key);
            assert_eq!(key, contract_key);
        }
        Ok(Ok(other)) => {
            // Only one message is read here, so anything other than a PutResponse means the
            // PUT was never confirmed. Fail instead of silently carrying on with the test.
            bail!("unexpected response while waiting for put: {:?}", other);
        }
        Ok(Err(e)) => {
            bail!("Error receiving put response: {}", e);
        }
        Err(_) => {
            bail!("Timeout waiting for put response after 120 seconds");
        }
    }

    // Read the contract back through the client that performed the PUT.
    {
        tracing::info!("getting contract from A");
        // NOTE(review): the existence check inside `get_contract` is pointed at the gateway's
        // data directory although this client is connected to peer-a — confirm intended.
        let (response_contract, response_state) =
            get_contract(&mut client_api_a, contract_key, &gateway.temp_dir_path).await?;
        let response_key = response_contract.key();
        assert_eq!(response_key, contract_key);
        assert_eq!(response_contract, contract);
        assert_eq!(response_state, wrapped_state);
    }

    // Read the contract back through a fresh client connected to the gateway.
    {
        let uri = gateway.ws_url();
        let (stream, _) = connect_async(&uri).await?;
        let mut client_api_b = WebApi::start(stream);
        let (response_contract, response_state) =
            get_contract(&mut client_api_b, contract_key, &gateway.temp_dir_path).await?;
        let response_key = response_contract.key();
        assert_eq!(response_key, contract_key);
        assert_eq!(response_contract, contract);
        assert_eq!(response_state, wrapped_state);

        client_api_b
            .send(ClientRequest::Disconnect { cause: None })
            .await?;
        // Brief pause after sending the disconnect request.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    client_api_a
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "peer-a"],
// Increased timeout for CI where 8 parallel tests compete for resources
timeout_secs = 300,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_update_contract(ctx: &mut TestContext) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
let initial_state = test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(initial_state);
let peer_a = ctx.node("peer-a")?;
let gateway = ctx.node("gateway")?;
tracing::info!("Node A (peer-a) data dir: {:?}", peer_a.temp_dir_path);
tracing::info!("Node B (gw) data dir: {:?}", gateway.temp_dir_path);
let uri = peer_a.ws_url();
let (stream, _) = connect_async(&uri).await?;
let mut client_api_a = WebApi::start(stream);
make_put(
&mut client_api_a,
wrapped_state.clone(),
contract.clone(),
false,
)
.await?;
tracing::info!("Waiting for PUT response...");
let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
tracing::info!("PUT successful for contract: {}", key);
assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
}
Ok(Ok(other)) => {
tracing::warn!("unexpected response while waiting for put: {:?}", other);
}
Ok(Err(e)) => {
bail!("Error receiving put response: {}", e);
}
Err(_) => {
bail!("Timeout waiting for put response after 120 seconds");
}
}
let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref())
.unwrap_or_else(|_| test_utils::TodoList {
tasks: Vec::new(),
version: 0,
});
todo_list.tasks.push(test_utils::Task {
id: 1,
title: "Implement contract".to_string(),
description: "Create a smart contract for the todo list".to_string(),
completed: false,
priority: 3,
});
let updated_bytes = serde_json::to_vec(&todo_list).unwrap();
let updated_state = WrappedState::from(updated_bytes);
let expected_version_after_update = todo_list.version + 1;
make_update(&mut client_api_a, contract_key, updated_state.clone()).await?;
let resp = tokio::time::timeout(Duration::from_secs(30), client_api_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
key,
summary: _,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE response"
);
}
Ok(Ok(other)) => {
bail!("unexpected response while waiting for update: {:?}", other);
}
Ok(Err(e)) => {
bail!("Client A: Error receiving update response: {}", e);
}
Err(_) => {
bail!("Client A: Timeout waiting for update response");
}
}
{
const MAX_VERSION_POLL_ATTEMPTS: usize = 30;
const VERSION_POLL_INTERVAL: Duration = Duration::from_secs(2);
let expected_todo_list: test_utils::TodoList =
serde_json::from_slice(updated_state.as_ref())
.expect("Failed to deserialize expected state");
let mut response_todo_list: Option<test_utils::TodoList> = None;
for attempt in 1..=MAX_VERSION_POLL_ATTEMPTS {
let (response_contract, response_state) =
get_contract(&mut client_api_a, contract_key, &gateway.temp_dir_path).await?;
assert_eq!(
response_contract.key(),
contract_key,
"Contract key mismatch in GET response"
);
assert_eq!(
response_contract, contract,
"Contract content mismatch in GET response"
);
let todo_list: test_utils::TodoList = serde_json::from_slice(response_state.as_ref())
.expect("Failed to deserialize response state");
tracing::info!(
"Version poll attempt {attempt}/{MAX_VERSION_POLL_ATTEMPTS}: got version {}, expected {}",
todo_list.version,
expected_version_after_update
);
if todo_list.version == expected_version_after_update {
response_todo_list = Some(todo_list);
break;
}
if attempt < MAX_VERSION_POLL_ATTEMPTS {
tokio::time::sleep(VERSION_POLL_INTERVAL).await;
}
}
let response_todo_list = response_todo_list.ok_or_else(|| {
anyhow::anyhow!(
"Version mismatch after {} attempts: expected {}, never observed",
MAX_VERSION_POLL_ATTEMPTS,
expected_version_after_update
)
})?;
assert_eq!(
response_todo_list.tasks.len(),
expected_todo_list.tasks.len(),
"Number of tasks should match"
);
assert_eq!(response_todo_list.tasks.len(), 1, "Should have one task");
assert_eq!(response_todo_list.tasks[0].id, 1, "Task ID should be 1");
assert_eq!(
response_todo_list.tasks[0].title, "Implement contract",
"Task title should match"
);
tracing::info!(
"Successfully verified updated state for contract {}",
contract_key
);
tracing::debug!(
"Response state: {:?}, Expected state: {:?}",
response_todo_list,
expected_todo_list
);
}
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "peer-a"],
// Increased timeout for CI where 8 parallel tests compete for resources
timeout_secs = 300,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_put_merge_persists_state(ctx: &mut TestContext) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
let initial_state = test_utils::create_empty_todo_list();
let initial_wrapped_state = WrappedState::from(initial_state);
let peer_a = ctx.node("peer-a")?;
let gateway = ctx.node("gateway")?;
tracing::info!("Node A data dir: {:?}", peer_a.temp_dir_path);
tracing::info!("Node B (gw) data dir: {:?}", gateway.temp_dir_path);
let uri = peer_a.ws_url();
let (stream, _) = connect_async(&uri).await?;
let mut client_api_a = WebApi::start(stream);
send_put_with_retry(
&mut client_api_a,
initial_wrapped_state.clone(),
contract.clone(),
"first PUT (initial hosting)",
Some(contract_key),
)
.await?;
tokio::time::sleep(Duration::from_secs(2)).await;
let mut updated_todo_list: test_utils::TodoList =
serde_json::from_slice(initial_wrapped_state.as_ref()).unwrap();
for i in 1..=5 {
updated_todo_list.tasks.push(test_utils::Task {
id: i,
title: format!("Task {}", i),
description: format!("Description for task {}", i),
completed: false,
priority: i as u8,
});
}
let updated_bytes = serde_json::to_vec(&updated_todo_list).unwrap();
let updated_wrapped_state = WrappedState::from(updated_bytes);
tracing::info!(
"Initial state size: {} bytes, Updated state size: {} bytes",
initial_wrapped_state.as_ref().len(),
updated_wrapped_state.as_ref().len()
);
send_put_with_retry(
&mut client_api_a,
updated_wrapped_state.clone(),
contract.clone(),
"second PUT (merge)",
Some(contract_key),
)
.await?;
tokio::time::sleep(Duration::from_secs(2)).await;
let uri = gateway.ws_url();
let (stream, _) = connect_async(&uri).await?;
let mut client_api_gateway = WebApi::start(stream);
tracing::info!("Getting contract from gateway to verify merged state was persisted...");
let (response_contract_gw, response_state_gw) = get_contract(
&mut client_api_gateway,
contract_key,
&gateway.temp_dir_path,
)
.await?;
assert_eq!(response_contract_gw.key(), contract_key);
let response_todo_list_gw: test_utils::TodoList =
serde_json::from_slice(response_state_gw.as_ref())
.expect("Failed to deserialize state from gateway");
tracing::info!(
"Gateway returned state with {} tasks, size {} bytes",
response_todo_list_gw.tasks.len(),
response_state_gw.as_ref().len()
);
assert_eq!(
response_todo_list_gw.tasks.len(),
5,
"Gateway should return merged state with 5 tasks (issue #1995: merged state must be persisted)"
);
assert_eq!(
response_state_gw.as_ref().len(),
updated_wrapped_state.as_ref().len(),
"Gateway state size should match the updated state"
);
tracing::info!(
"✓ Test passed: Gateway correctly persisted merged state after second PUT (issue #1995 fixed)"
);
client_api_a
.send(ClientRequest::Disconnect { cause: None })
.await?;
client_api_gateway
.send(ClientRequest::Disconnect { cause: None })
.await?;
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "node-a", "node-b"],
timeout_secs = 600,
startup_wait_secs = 40,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_multiple_clients_subscription(ctx: &mut TestContext) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
let initial_state = test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(initial_state);
let node_a = ctx.node("node-a")?;
let node_b = ctx.node("node-b")?;
let gateway = ctx.node("gateway")?;
tracing::info!("Node A data dir: {:?}", node_a.temp_dir_path);
tracing::info!("Gateway data dir: {:?}", gateway.temp_dir_path);
tracing::info!("Node B data dir: {:?}", node_b.temp_dir_path);
tracing::info!("Starting WebSocket connections after 40s startup wait");
let start_time = std::time::Instant::now();
let uri_a = node_a.ws_url();
let (stream1, _) = connect_async(&uri_a).await?;
let mut client_api1_node_a = WebApi::start(stream1);
let (stream2, _) = connect_async(&uri_a).await?;
let mut client_api2_node_a = WebApi::start(stream2);
let uri_b = node_b.ws_url();
let (stream3, _) = connect_async(&uri_b).await?;
let mut client_api_node_b = WebApi::start(stream3);
const PUT_MAX_ATTEMPTS: usize = 3;
let mut put_last_err: Option<anyhow::Error> = None;
for put_attempt in 1..=PUT_MAX_ATTEMPTS {
tracing::info!(
"Client 1: Starting PUT attempt {}/{} (elapsed: {:?})",
put_attempt,
PUT_MAX_ATTEMPTS,
start_time.elapsed()
);
if let Err(e) = make_put(
&mut client_api1_node_a,
wrapped_state.clone(),
contract.clone(),
false, )
.await
{
put_last_err = Some(e);
tracing::warn!(
"Client 1: PUT send failed on attempt {}, retrying...",
put_attempt
);
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
let mut put_succeeded = false;
let deadline = tokio::time::Instant::now() + Duration::from_secs(120);
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
put_last_err = Some(anyhow::anyhow!("Timeout waiting for put response"));
break;
}
match tokio::time::timeout(remaining, client_api1_node_a.recv()).await {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
tracing::info!(
"Client 1: PUT completed successfully on attempt {} (elapsed: {:?})",
put_attempt,
start_time.elapsed()
);
put_succeeded = true;
break;
}
Ok(Ok(other)) => {
tracing::warn!("unexpected response while waiting for put: {:?}", other);
}
Ok(Err(e)) => {
put_last_err = Some(anyhow::anyhow!("Error receiving put response: {}", e));
break;
}
Err(_) => {
put_last_err = Some(anyhow::anyhow!("Timeout waiting for put response"));
break;
}
}
}
if put_succeeded {
put_last_err = None;
break;
}
tracing::warn!(
"Client 1: PUT attempt {} failed: {}; retrying after delay...",
put_attempt,
put_last_err.as_ref().unwrap()
);
tokio::time::sleep(Duration::from_secs(2)).await;
}
if let Some(e) = put_last_err {
bail!("PUT failed after {} attempts: {}", PUT_MAX_ATTEMPTS, e);
}
make_subscribe(&mut client_api1_node_a, contract_key).await?;
loop {
let resp = tokio::time::timeout(Duration::from_secs(30), client_api1_node_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
key,
subscribed,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in SUBSCRIBE response"
);
assert!(subscribed, "Failed to subscribe to contract");
tracing::info!("Client 1: Successfully subscribed to contract {}", key);
break;
}
Ok(Ok(other)) => {
tracing::warn!(
"Client 1: unexpected response while waiting for subscribe: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Client 1: Error receiving subscribe response: {}", e);
}
Err(_) => {
bail!("Client 1: Timeout waiting for subscribe response");
}
}
}
make_get(&mut client_api2_node_a, contract_key, true, false).await?;
loop {
let resp = tokio::time::timeout(Duration::from_secs(30), client_api2_node_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
contract: Some(_),
state: _,
}))) => {
assert_eq!(key, contract_key, "Contract key mismatch in GET response");
break;
}
Ok(Ok(other)) => {
tracing::warn!("unexpected response while waiting for get: {:?}", other);
}
Ok(Err(e)) => {
bail!("Error receiving get response: {}", e);
}
Err(_) => {
bail!("Timeout waiting for get response");
}
}
}
make_subscribe(&mut client_api2_node_a, contract_key).await?;
loop {
let resp = tokio::time::timeout(Duration::from_secs(30), client_api2_node_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
key,
subscribed,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in SUBSCRIBE response"
);
assert!(subscribed, "Failed to subscribe to contract");
tracing::info!("Client 2: Successfully subscribed to contract {}", key);
break;
}
Ok(Ok(other)) => {
tracing::warn!(
"Client 2: unexpected response while waiting for subscribe: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Client 2: Error receiving subscribe response: {}", e);
}
Err(_) => {
bail!("Client 2: Timeout waiting for subscribe response");
}
}
}
tracing::info!("Waiting 5 seconds for contract to propagate across nodes...");
tokio::time::sleep(Duration::from_secs(5)).await;
tracing::info!(
"Client 3: Sending GET request for contract {} to Node B",
contract_key
);
let get_start = std::time::Instant::now();
make_get(&mut client_api_node_b, contract_key, true, false).await?;
loop {
let resp = tokio::time::timeout(Duration::from_secs(60), client_api_node_b.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
contract: Some(_),
state: _,
}))) => {
let elapsed = get_start.elapsed();
tracing::info!("Client 3: Received GET response after {:?}", elapsed);
assert_eq!(
key, contract_key,
"Contract key mismatch in GET response for client 3"
);
break;
}
Ok(Ok(other)) => {
tracing::warn!(
"Client 3: unexpected response while waiting for get: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Client 3: Error receiving get response: {}", e);
}
Err(_) => {
let elapsed = get_start.elapsed();
bail!(
"Client 3: Timeout waiting for get response after {:?}. Contract may not have propagated from Node A to Node B",
elapsed
);
}
}
}
make_subscribe(&mut client_api_node_b, contract_key).await?;
loop {
let resp = tokio::time::timeout(Duration::from_secs(60), client_api_node_b.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
key,
subscribed,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in SUBSCRIBE response for client 3"
);
assert!(subscribed, "Failed to subscribe to contract for client 3");
tracing::info!("Client 3: Successfully subscribed to contract {}", key);
break;
}
Ok(Ok(other)) => {
tracing::warn!(
"Client 3: unexpected response while waiting for subscribe: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Client 3: Error receiving subscribe response: {}", e);
}
Err(_) => {
bail!("Client 3: Timeout waiting for subscribe response");
}
}
}
tracing::info!("All clients subscribed, proceeding with UPDATE operation");
let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref())
.unwrap_or_else(|_| test_utils::TodoList {
tasks: Vec::new(),
version: 0,
});
todo_list.tasks.push(test_utils::Task {
id: 1,
title: "Test multiple clients".to_string(),
description: "Verify that update notifications are received by multiple clients"
.to_string(),
completed: false,
priority: 5,
});
let updated_bytes = serde_json::to_vec(&todo_list).unwrap();
let updated_state = WrappedState::from(updated_bytes);
make_update(&mut client_api1_node_a, contract_key, updated_state.clone()).await?;
let mut client1_received_notification = false;
let mut client2_received_notification = false;
let mut client_node_b_received_notification = false;
let mut received_update_response = false;
let expected_task = test_utils::Task {
id: 1,
title: "Test multiple clients".to_string(),
description: "Verify that update notifications are received by multiple clients"
.to_string(),
completed: false,
priority: 5,
};
let start_time = std::time::Instant::now();
while start_time.elapsed() < Duration::from_secs(90)
&& (!received_update_response
|| !client1_received_notification
|| !client2_received_notification
|| !client_node_b_received_notification)
{
if !received_update_response || !client1_received_notification {
let resp =
tokio::time::timeout(Duration::from_secs(1), client_api1_node_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
key,
summary: _,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE response"
);
tracing::info!("Client 1: Received update response for contract {}", key);
received_update_response = true;
}
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
key,
update,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE notification for client 1"
);
match update {
UpdateData::State(state) => {
let received_todo_list: test_utils::TodoList =
serde_json::from_slice(state.as_ref())
.expect("Failed to deserialize state from update notification");
assert_eq!(received_todo_list.tasks.len(), 1, "Should have one task");
assert_eq!(
received_todo_list.tasks[0].id, expected_task.id,
"Task ID should match"
);
assert_eq!(
received_todo_list.tasks[0].title, expected_task.title,
"Task title should match"
);
assert_eq!(
received_todo_list.tasks[0].description, expected_task.description,
"Task description should match"
);
assert_eq!(
received_todo_list.tasks[0].completed, expected_task.completed,
"Task completed status should match"
);
assert_eq!(
received_todo_list.tasks[0].priority, expected_task.priority,
"Task priority should match"
);
tracing::info!("Client 1: Successfully verified update content");
}
UpdateData::Delta(_)
| UpdateData::StateAndDelta { .. }
| UpdateData::RelatedState { .. }
| UpdateData::RelatedDelta { .. }
| UpdateData::RelatedStateAndDelta { .. }
| _ => {
tracing::warn!(
"Client 1: Received unexpected update type: {:?}",
update
);
}
}
tracing::info!(
"✅ Client 1: Successfully received update notification for contract {}",
key
);
client1_received_notification = true;
}
Ok(Ok(other)) => {
tracing::debug!("Client 1: Received unexpected response: {:?}", other);
}
Ok(Err(e)) => {
tracing::debug!("Client 1: Error receiving response: {}", e);
}
Err(_) => {
}
}
}
if !client2_received_notification {
let resp =
tokio::time::timeout(Duration::from_secs(1), client_api2_node_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
key,
update,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE notification for client 2"
);
match update {
UpdateData::State(state) => {
let received_todo_list: test_utils::TodoList =
serde_json::from_slice(state.as_ref())
.expect("Failed to deserialize state from update notification");
assert_eq!(received_todo_list.tasks.len(), 1, "Should have one task");
assert_eq!(
received_todo_list.tasks[0].id, expected_task.id,
"Task ID should match"
);
assert_eq!(
received_todo_list.tasks[0].title, expected_task.title,
"Task title should match"
);
assert_eq!(
received_todo_list.tasks[0].description, expected_task.description,
"Task description should match"
);
assert_eq!(
received_todo_list.tasks[0].completed, expected_task.completed,
"Task completed status should match"
);
assert_eq!(
received_todo_list.tasks[0].priority, expected_task.priority,
"Task priority should match"
);
tracing::info!("Client 2: Successfully verified update content");
}
UpdateData::Delta(_)
| UpdateData::StateAndDelta { .. }
| UpdateData::RelatedState { .. }
| UpdateData::RelatedDelta { .. }
| UpdateData::RelatedStateAndDelta { .. }
| _ => {
tracing::warn!(
"Client 2: Received unexpected update type: {:?}",
update
);
}
}
tracing::info!(
"✅ Client 2: Successfully received update notification for contract {}",
key
);
client2_received_notification = true;
}
Ok(Ok(other)) => {
tracing::debug!("Client 2: Received unexpected response: {:?}", other);
}
Ok(Err(e)) => {
tracing::debug!("Client 2: Error receiving response: {}", e);
}
Err(_) => {
}
}
}
if !client_node_b_received_notification {
let resp = tokio::time::timeout(Duration::from_secs(1), client_api_node_b.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
key,
update,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE notification for client 3"
);
match update {
UpdateData::State(state) => {
let received_todo_list: test_utils::TodoList =
serde_json::from_slice(state.as_ref())
.expect("Failed to deserialize state from update notification");
assert_eq!(received_todo_list.tasks.len(), 1, "Should have one task");
assert_eq!(
received_todo_list.tasks[0].id, expected_task.id,
"Task ID should match"
);
assert_eq!(
received_todo_list.tasks[0].title, expected_task.title,
"Task title should match"
);
assert_eq!(
received_todo_list.tasks[0].description, expected_task.description,
"Task description should match"
);
assert_eq!(
received_todo_list.tasks[0].completed, expected_task.completed,
"Task completed status should match"
);
assert_eq!(
received_todo_list.tasks[0].priority, expected_task.priority,
"Task priority should match"
);
tracing::info!(
"Client 3: Successfully verified update content (cross-node)"
);
}
UpdateData::Delta(_)
| UpdateData::StateAndDelta { .. }
| UpdateData::RelatedState { .. }
| UpdateData::RelatedDelta { .. }
| UpdateData::RelatedStateAndDelta { .. }
| _ => {
tracing::warn!(
"Client 3: Received unexpected update type: {:?}",
update
);
}
}
tracing::info!(
"✅ Client 3: Successfully received update notification for contract {} (cross-node)",
key
);
client_node_b_received_notification = true;
}
Ok(Ok(other)) => {
tracing::debug!("Client 3: Received unexpected response: {:?}", other);
}
Ok(Err(e)) => {
tracing::debug!("Client 3: Error receiving response: {}", e);
}
Err(_) => {
}
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
assert!(
received_update_response,
"Did not receive update response within timeout period"
);
assert!(
client1_received_notification,
"Client 1 did not receive update notification within timeout period"
);
assert!(
client2_received_notification,
"Client 2 did not receive update notification within timeout period"
);
assert!(
client_node_b_received_notification,
"Client 3 did not receive update notification within timeout period (cross-node)"
);
client_api1_node_a
.send(ClientRequest::Disconnect { cause: None })
.await?;
client_api2_node_a
.send(ClientRequest::Disconnect { cause: None })
.await?;
client_api_node_b
.send(ClientRequest::Disconnect { cause: None })
.await?;
tokio::time::sleep(Duration::from_millis(200)).await;
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "node-a"],
// Increased timeout for CI where 8 parallel tests compete for resources
timeout_secs = 300,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_get_with_subscribe_flag(ctx: &mut TestContext) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
let initial_state = test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(initial_state);
let node_a = ctx.node("node-a")?;
let gateway = ctx.node("gateway")?;
tracing::info!("Node A data dir: {:?}", node_a.temp_dir_path);
tracing::info!("Node B (gw) data dir: {:?}", gateway.temp_dir_path);
let uri_a = node_a.ws_url();
let (stream1, _) = connect_async(&uri_a).await?;
let mut client_api1_node_a = WebApi::start(stream1);
tracing::info!("Client 1: Put contract with initial state");
make_put(
&mut client_api1_node_a,
wrapped_state.clone(),
contract.clone(),
false, )
.await?;
let resp = tokio::time::timeout(Duration::from_secs(45), client_api1_node_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
}
Ok(Ok(other)) => {
bail!("unexpected response while waiting for put: {:?}", other);
}
Ok(Err(e)) => {
bail!("Client 1: Error receiving put response: {}", e);
}
Err(_) => {
bail!("Client 1: Timeout waiting for put response");
}
}
tracing::warn!("Client 1: Successfully put contract {}", contract_key);
let (stream2, _) = connect_async(&uri_a).await?;
let mut client_api2_node_a = WebApi::start(stream2);
make_get(&mut client_api2_node_a, contract_key, true, true).await?;
let resp = tokio::time::timeout(Duration::from_secs(30), client_api2_node_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
contract: Some(_),
state: _,
}))) => {
assert_eq!(key, contract_key, "Contract key mismatch in GET response");
}
Ok(Ok(other)) => {
bail!("unexpected response while waiting for get: {:?}", other);
}
Ok(Err(e)) => {
bail!("Client 2: Error receiving get response: {}", e);
}
Err(_) => {
bail!("Client 2: Timeout waiting for get response");
}
}
tracing::info!(
"Client 2: Local subscription registered via GET with subscribe=true - \
notification delivery will use executor channels, not network broadcast"
);
let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref())
.unwrap_or_else(|_| test_utils::TodoList {
tasks: Vec::new(),
version: 0,
});
todo_list.tasks.push(test_utils::Task {
id: 1,
title: "Test auto-subscribe with GET".to_string(),
description: "Verify that auto-subscribe works with GET operation".to_string(),
completed: false,
priority: 5,
});
let updated_bytes = serde_json::to_vec(&todo_list).unwrap();
let updated_state = WrappedState::from(updated_bytes);
make_update(&mut client_api1_node_a, contract_key, updated_state.clone()).await?;
let resp = tokio::time::timeout(Duration::from_secs(30), client_api1_node_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
key,
summary: _,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE response"
);
}
Ok(Ok(other)) => {
bail!("unexpected response while waiting for update: {:?}", other);
}
Ok(Err(e)) => {
bail!("Client 1: Error receiving update response: {}", e);
}
Err(_) => {
bail!("Client 1: Timeout waiting for update response");
}
}
let expected_task = test_utils::Task {
id: 1,
title: "Test auto-subscribe with GET".to_string(),
description: "Verify that auto-subscribe works with GET operation".to_string(),
completed: false,
priority: 5,
};
let mut client2_node_a_received_notification = false;
let start_time = std::time::Instant::now();
while start_time.elapsed() < Duration::from_secs(30) && !client2_node_a_received_notification {
let resp = tokio::time::timeout(Duration::from_secs(1), client_api2_node_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
key,
update,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE notification for client 2"
);
match update {
UpdateData::State(state) => {
let received_todo_list: test_utils::TodoList =
serde_json::from_slice(state.as_ref())
.expect("Failed to deserialize state from update notification");
assert_eq!(received_todo_list.tasks.len(), 1, "Should have one task");
assert_eq!(
received_todo_list.tasks[0].id, expected_task.id,
"Task ID should match"
);
assert_eq!(
received_todo_list.tasks[0].title, expected_task.title,
"Task title should match"
);
assert_eq!(
received_todo_list.tasks[0].description, expected_task.description,
"Task description should match"
);
assert_eq!(
received_todo_list.tasks[0].completed, expected_task.completed,
"Task completed status should match"
);
assert_eq!(
received_todo_list.tasks[0].priority, expected_task.priority,
"Task priority should match"
);
tracing::info!("Client 1: Successfully verified update content");
}
UpdateData::Delta(_)
| UpdateData::StateAndDelta { .. }
| UpdateData::RelatedState { .. }
| UpdateData::RelatedDelta { .. }
| UpdateData::RelatedStateAndDelta { .. }
| _ => {
tracing::warn!("Client 1: Received unexpected update type: {:?}", update);
}
}
client2_node_a_received_notification = true;
break;
}
Ok(Ok(other)) => {
bail!("unexpected response while waiting for update: {:?}", other);
}
Ok(Err(e)) => {
tracing::error!("Client 2: Timeout waiting for update: {}", e);
}
Err(_) => {
tracing::error!("Client 2: Timeout waiting for update response");
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
assert!(
client2_node_a_received_notification,
"Client 2 did not receive update notification - local subscription path failed. \
This validates that executor notification channels work independently of network \
subscription propagation (Issue #2075 decoupling)."
);
tracing::info!(
"SUCCESS: Local subscription delivered update via executor channels - \
no network registration was required"
);
Ok(())
}
// Integration test: a client that PUTs a contract with the subscribe flag set
// must later receive an `UpdateNotification` when a different client updates
// that contract, and must NOT receive a separate `SubscribeResponse`.
//
// Flow (both clients are WebSocket connections to "node-a"):
//   1. Client 1 PUTs the contract (subscribe flag set) and waits for `PutResponse`.
//   2. Client 2 GETs the contract (up to 3 attempts).
//   3. Client 2 UPDATEs the state with one new task and waits for `UpdateResponse`.
//   4. Client 1 waits for the resulting `UpdateNotification` and checks its content.
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "node-a"],
// Increased timeout for CI where 8 parallel tests compete for resources
timeout_secs = 300,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_put_with_subscribe_flag(ctx: &mut TestContext) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
let initial_state = test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(initial_state);
let node_a = ctx.node("node-a")?;
// The gateway handle is only used to log its data directory below.
let gateway = ctx.node("gateway")?;
tracing::info!("Node A data dir: {:?}", node_a.temp_dir_path);
tracing::info!("Gateway data dir: {:?}", gateway.temp_dir_path);
// Two independent WebSocket clients, both attached to node-a.
let uri_a = node_a.ws_url();
let (stream1, _) = connect_async(&uri_a).await?;
let mut client_api1 = WebApi::start(stream1);
let (stream2, _) = connect_async(&uri_a).await?;
let mut client_api2 = WebApi::start(stream2);
// --- Phase 1: client 1 PUT ---
let put_start = std::time::Instant::now();
tracing::info!(
contract = %contract_key,
client = 1,
subscribe = true,
phase = "put_request",
"Sending PUT request"
);
// The trailing `true` is presumably the subscribe flag (it matches the
// `subscribe = true` log field above) — confirm against `make_put`.
make_put(
&mut client_api1,
wrapped_state.clone(),
contract.clone(),
true, )
.await?;
// Poll client 1 in 5-second slices for up to 30 seconds overall.
let mut put_response_received = false;
let start = std::time::Instant::now();
while !put_response_received && start.elapsed() < Duration::from_secs(30) {
let resp = tokio::time::timeout(Duration::from_secs(5), client_api1.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
tracing::info!(
contract = %contract_key,
client = 1,
elapsed_ms = put_start.elapsed().as_millis(),
phase = "put_response",
"PUT response received"
);
put_response_received = true;
}
// A SubscribeResponse reaching the client is treated as a test failure.
Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
key,
subscribed: _,
}))) => {
bail!(
"Client 1: Received unexpected SubscribeResponse for contract {key} - \
sub-operations should not send client notifications"
);
}
// Any other message is logged and skipped; the loop keeps waiting.
Ok(Ok(other)) => {
tracing::debug!(
contract = %contract_key,
client = 1,
elapsed_ms = start.elapsed().as_millis(),
response = ?other,
"Received unexpected response while waiting for PUT"
);
}
Ok(Err(e)) => {
tracing::error!(
contract = %contract_key,
client = 1,
elapsed_ms = start.elapsed().as_millis(),
error = %e,
phase = "put_error",
"WebSocket error receiving PUT response"
);
bail!("WebSocket error while waiting for PUT response: {}", e);
}
// 5-second poll elapsed without a message; re-check the overall deadline.
Err(_) => {
tracing::debug!(
contract = %contract_key,
client = 1,
elapsed_ms = start.elapsed().as_millis(),
"Waiting for PUT response (no message in 5s)"
);
}
}
}
if !put_response_received {
tracing::error!(
contract = %contract_key,
client = 1,
elapsed_ms = start.elapsed().as_millis(),
phase = "put_timeout",
"PUT response timeout after 30 seconds"
);
bail!("Client 1: Did not receive PUT response within 30 seconds");
}
// --- Phase 2: client 2 GET, re-sent up to MAX_GET_ATTEMPTS times ---
let get_start = std::time::Instant::now();
let mut get_response_received = false;
const MAX_GET_ATTEMPTS: u32 = 3;
const GET_ATTEMPT_TIMEOUT_SECS: u64 = 15;
for attempt in 1..=MAX_GET_ATTEMPTS {
tracing::info!(
contract = %contract_key,
client = 2,
attempt,
max_attempts = MAX_GET_ATTEMPTS,
phase = "get_request",
"Sending GET request"
);
// Meaning of the two boolean flags is defined by `make_get`, which is not
// visible in this section of the file.
make_get(&mut client_api2, contract_key, true, false).await?;
let attempt_start = std::time::Instant::now();
// Wait for this attempt's response in 5-second polls, up to 15 seconds.
while !get_response_received
&& attempt_start.elapsed() < Duration::from_secs(GET_ATTEMPT_TIMEOUT_SECS)
{
let resp = tokio::time::timeout(Duration::from_secs(5), client_api2.recv()).await;
match resp {
// Only a GetResponse that carries the contract counts as success.
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
contract: Some(_),
state: _,
}))) => {
assert_eq!(key, contract_key, "Contract key mismatch in GET response");
tracing::info!(
contract = %contract_key,
client = 2,
attempt,
elapsed_ms = get_start.elapsed().as_millis(),
phase = "get_response",
"GET response received"
);
get_response_received = true;
}
Ok(Ok(other)) => {
tracing::debug!(
contract = %contract_key,
client = 2,
attempt,
elapsed_ms = attempt_start.elapsed().as_millis(),
response = ?other,
"Received unexpected response while waiting for GET"
);
}
Ok(Err(e)) => {
tracing::error!(
contract = %contract_key,
client = 2,
attempt,
elapsed_ms = attempt_start.elapsed().as_millis(),
error = %e,
phase = "get_error",
"WebSocket error receiving GET response"
);
bail!("WebSocket error while waiting for GET response: {}", e);
}
Err(_) => {
tracing::debug!(
contract = %contract_key,
client = 2,
attempt,
elapsed_ms = attempt_start.elapsed().as_millis(),
"Waiting for GET response (no message in 5s)"
);
}
}
}
if get_response_received {
break;
}
// Only warn about a retry when another attempt will actually follow.
if attempt < MAX_GET_ATTEMPTS {
tracing::warn!(
contract = %contract_key,
client = 2,
attempt,
elapsed_ms = get_start.elapsed().as_millis(),
phase = "get_retry",
"GET response not received within {}s, retrying (attempt {}/{})",
GET_ATTEMPT_TIMEOUT_SECS,
attempt,
MAX_GET_ATTEMPTS
);
}
}
if !get_response_received {
tracing::error!(
contract = %contract_key,
client = 2,
elapsed_ms = get_start.elapsed().as_millis(),
phase = "get_timeout",
"GET response timeout after {} attempts ({} seconds each)",
MAX_GET_ATTEMPTS,
GET_ATTEMPT_TIMEOUT_SECS
);
bail!(
"Client 2: Did not receive GET response after {} attempts",
MAX_GET_ATTEMPTS
);
}
// --- Phase 3: client 2 UPDATE ---
// Build the updated state from the initial state; if the initial bytes do not
// parse as a TodoList, fall back to an empty list at version 0.
let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref())
.unwrap_or_else(|_| test_utils::TodoList {
tasks: Vec::new(),
version: 0,
});
todo_list.tasks.push(test_utils::Task {
id: 1,
title: "Test auto-subscribe with PUT".to_string(),
description: "Verify that auto-subscribe works with PUT operation".to_string(),
completed: false,
priority: 5,
});
let updated_bytes = serde_json::to_vec(&todo_list).unwrap();
let updated_state = WrappedState::from(updated_bytes);
let update_start = std::time::Instant::now();
tracing::info!(
contract = %contract_key,
client = 2,
phase = "update_request",
"Sending UPDATE request to trigger notification"
);
make_update(&mut client_api2, contract_key, updated_state.clone()).await?;
// Wait up to 30 seconds (5-second polls) for client 2's UpdateResponse.
let mut update_response_received = false;
let start = std::time::Instant::now();
while !update_response_received && start.elapsed() < Duration::from_secs(30) {
let resp = tokio::time::timeout(Duration::from_secs(5), client_api2.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
key,
summary: _,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE response"
);
tracing::info!(
contract = %contract_key,
client = 2,
elapsed_ms = update_start.elapsed().as_millis(),
phase = "update_response",
"UPDATE response received"
);
update_response_received = true;
}
Ok(Ok(other)) => {
tracing::debug!(
contract = %contract_key,
client = 2,
elapsed_ms = start.elapsed().as_millis(),
response = ?other,
"Received unexpected response while waiting for UPDATE"
);
}
Ok(Err(e)) => {
tracing::error!(
contract = %contract_key,
client = 2,
elapsed_ms = start.elapsed().as_millis(),
error = %e,
phase = "update_error",
"WebSocket error receiving UPDATE response"
);
bail!("WebSocket error while waiting for UPDATE response: {}", e);
}
Err(_) => {
tracing::debug!(
contract = %contract_key,
client = 2,
elapsed_ms = start.elapsed().as_millis(),
"Waiting for UPDATE response (no message in 5s)"
);
}
}
}
if !update_response_received {
tracing::error!(
contract = %contract_key,
client = 2,
elapsed_ms = start.elapsed().as_millis(),
phase = "update_timeout",
"UPDATE response timeout after 30 seconds"
);
bail!("Client 2: Did not receive UPDATE response within 30 seconds");
}
// --- Phase 4: client 1 must be notified of client 2's update ---
// Mirror of the task pushed above; used to verify the notified state.
let expected_task = test_utils::Task {
id: 1,
title: "Test auto-subscribe with PUT".to_string(),
description: "Verify that auto-subscribe works with PUT operation".to_string(),
completed: false,
priority: 5,
};
let mut client1_received_notification = false;
let notification_start = std::time::Instant::now();
tracing::info!(
contract = %contract_key,
client = 1,
phase = "notification_wait",
"Waiting for UPDATE notification (auto-subscribed via PUT)"
);
// Poll client 1 with 1-second receives (plus a 100 ms pause) for up to 30 seconds.
while notification_start.elapsed() < Duration::from_secs(30) && !client1_received_notification {
let resp = tokio::time::timeout(Duration::from_secs(1), client_api1.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
key,
update,
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE notification for client 1"
);
match update {
// Full-state notification: verify every field of the single task.
UpdateData::State(state) => {
let received_todo_list: test_utils::TodoList =
serde_json::from_slice(state.as_ref())
.expect("Failed to deserialize state from update notification");
assert_eq!(received_todo_list.tasks.len(), 1, "Should have one task");
assert_eq!(
received_todo_list.tasks[0].id, expected_task.id,
"Task ID should match"
);
assert_eq!(
received_todo_list.tasks[0].title, expected_task.title,
"Task title should match"
);
assert_eq!(
received_todo_list.tasks[0].description, expected_task.description,
"Task description should match"
);
assert_eq!(
received_todo_list.tasks[0].completed, expected_task.completed,
"Task completed status should match"
);
assert_eq!(
received_todo_list.tasks[0].priority, expected_task.priority,
"Task priority should match"
);
tracing::info!(
contract = %contract_key,
client = 1,
elapsed_ms = notification_start.elapsed().as_millis(),
phase = "notification_received",
"UPDATE notification received and verified"
);
}
// Any other update shape is only logged; its content is not checked,
// but it still counts as "notification received" below.
UpdateData::Delta(_)
| UpdateData::StateAndDelta { .. }
| UpdateData::RelatedState { .. }
| UpdateData::RelatedDelta { .. }
| UpdateData::RelatedStateAndDelta { .. }
| _ => {
tracing::warn!(
contract = %contract_key,
client = 1,
update_type = ?update,
"Received unexpected update type in notification"
);
}
}
client1_received_notification = true;
break;
}
Ok(Ok(other)) => {
tracing::debug!(
contract = %contract_key,
client = 1,
elapsed_ms = notification_start.elapsed().as_millis(),
response = ?other,
"Received unexpected response while waiting for notification"
);
}
Ok(Err(e)) => {
tracing::error!(
contract = %contract_key,
client = 1,
elapsed_ms = notification_start.elapsed().as_millis(),
error = %e,
phase = "notification_error",
"WebSocket error receiving notification"
);
bail!(
"WebSocket error while waiting for update notification: {}",
e
);
}
// 1-second poll elapsed with no message: keep looping until the deadline.
Err(_) => {
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
// Log a structured error first so the failure is visible in traces, then
// fail the test through the assertion.
if !client1_received_notification {
tracing::error!(
contract = %contract_key,
client = 1,
elapsed_ms = notification_start.elapsed().as_millis(),
phase = "notification_timeout",
"UPDATE notification not received within 30 seconds - auto-subscribe via PUT may have failed"
);
}
assert!(
client1_received_notification,
"Client 1 did not receive update notification within timeout period (auto-subscribe via PUT failed)"
);
tracing::info!(
contract = %contract_key,
phase = "test_complete",
"test_put_with_subscribe_flag completed successfully"
);
Ok(())
}
// Integration test: a PUT sent through `make_put_with_blocking` with both
// trailing flags set to `true` must be answered with a `PutResponse` within
// 45 seconds, and no `SubscribeResponse` may be delivered to the client.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway", "node-a"],
    timeout_secs = 300,
    startup_wait_secs = 30,
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4
)]
async fn test_put_with_blocking_subscribe(ctx: &mut TestContext) -> TestResult {
    const TEST_CONTRACT: &str = "test-contract-integration";
    // Overall budget for the PUT acknowledgement and the length of one poll.
    const PUT_RESPONSE_DEADLINE: Duration = Duration::from_secs(45);
    const POLL_INTERVAL: Duration = Duration::from_secs(5);

    let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();
    let wrapped_state = WrappedState::from(test_utils::create_empty_todo_list());

    // Single WebSocket client attached to node-a.
    let node_a = ctx.node("node-a")?;
    let ws_url = node_a.ws_url();
    let (ws_stream, _) = connect_async(&ws_url).await?;
    let mut client = WebApi::start(ws_stream);

    let put_started_at = std::time::Instant::now();
    tracing::info!(
        contract = %contract_key,
        subscribe = true,
        blocking_subscribe = true,
        phase = "put_request",
        "Sending blocking subscribe PUT request"
    );
    make_put_with_blocking(
        &mut client,
        wrapped_state.clone(),
        contract.clone(),
        true,
        true,
    )
    .await?;

    // Drain messages until the PutResponse shows up or the deadline passes.
    let wait_started_at = std::time::Instant::now();
    let mut put_acknowledged = false;
    loop {
        if put_acknowledged || wait_started_at.elapsed() >= PUT_RESPONSE_DEADLINE {
            break;
        }
        match tokio::time::timeout(POLL_INTERVAL, client.recv()).await {
            // Poll slice elapsed without any message.
            Err(_) => {
                tracing::debug!(
                    elapsed_ms = wait_started_at.elapsed().as_millis(),
                    "Waiting for blocking PUT response (no message in 5s)"
                );
            }
            Ok(Err(e)) => {
                bail!("WebSocket error while waiting for PUT response: {}", e);
            }
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
                assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
                tracing::info!(
                    contract = %contract_key,
                    elapsed_ms = put_started_at.elapsed().as_millis(),
                    phase = "put_response",
                    "Blocking subscribe PUT response received"
                );
                put_acknowledged = true;
            }
            // The subscription is a sub-operation; surfacing it is a failure.
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
                key,
                subscribed: _,
            }))) => {
                bail!(
                    "Received unexpected SubscribeResponse for contract {key} - \
                     sub-operations should not send client notifications"
                );
            }
            // Anything else is logged and ignored.
            Ok(Ok(other)) => {
                tracing::debug!(
                    response = ?other,
                    "Received unexpected response while waiting for blocking PUT"
                );
            }
        }
    }

    if !put_acknowledged {
        bail!("Did not receive PUT response within 45 seconds for blocking_subscribe=true");
    }

    tracing::info!(
        contract = %contract_key,
        elapsed_ms = put_started_at.elapsed().as_millis(),
        phase = "test_complete",
        "test_put_with_blocking_subscribe completed successfully"
    );
    Ok(())
}
// Integration test: after one client has PUT a contract, a second client's
// GET issued through `make_get_with_blocking` (all three flags `true`) must be
// answered with a `GetResponse` within 45 seconds, and no `SubscribeResponse`
// may be delivered to the client.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway", "node-a"],
    timeout_secs = 300,
    startup_wait_secs = 30,
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4
)]
async fn test_get_with_blocking_subscribe(ctx: &mut TestContext) -> TestResult {
    const TEST_CONTRACT: &str = "test-contract-integration";
    // Overall budget for each awaited response and the length of one poll.
    const RESPONSE_DEADLINE: Duration = Duration::from_secs(45);
    const POLL_INTERVAL: Duration = Duration::from_secs(5);

    let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();
    let wrapped_state = WrappedState::from(test_utils::create_empty_todo_list());

    let node_a = ctx.node("node-a")?;
    let ws_url = node_a.ws_url();

    // --- Seed the contract with a plain PUT from the first client ---
    let (put_stream, _) = connect_async(&ws_url).await?;
    let mut put_client = WebApi::start(put_stream);
    make_put(
        &mut put_client,
        wrapped_state.clone(),
        contract.clone(),
        false,
    )
    .await?;
    // The very first message must be the PutResponse; anything else
    // (including a timeout or transport error) aborts the test.
    match tokio::time::timeout(RESPONSE_DEADLINE, put_client.recv()).await {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
            assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
        }
        unexpected => bail!("Unexpected response waiting for PUT: {:?}", unexpected),
    }

    // --- Blocking-subscribe GET from a second client ---
    let (get_stream, _) = connect_async(&ws_url).await?;
    let mut get_client = WebApi::start(get_stream);
    tracing::info!(
        contract = %contract_key,
        subscribe = true,
        blocking_subscribe = true,
        phase = "get_request",
        "Sending blocking subscribe GET request"
    );
    make_get_with_blocking(&mut get_client, contract_key, true, true, true).await?;

    let wait_started_at = std::time::Instant::now();
    let mut get_answered = false;
    loop {
        if get_answered || wait_started_at.elapsed() >= RESPONSE_DEADLINE {
            break;
        }
        match tokio::time::timeout(POLL_INTERVAL, get_client.recv()).await {
            // Poll slice elapsed without any message.
            Err(_) => {
                tracing::debug!(
                    elapsed_ms = wait_started_at.elapsed().as_millis(),
                    "Waiting for blocking GET response (no message in 5s)"
                );
            }
            Ok(Err(e)) => {
                bail!("WebSocket error while waiting for GET response: {}", e);
            }
            // Any GetResponse is accepted here, with or without contract code.
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
                key,
                contract: _,
                state: _,
            }))) => {
                assert_eq!(key, contract_key, "Contract key mismatch in GET response");
                tracing::info!(
                    contract = %contract_key,
                    elapsed_ms = wait_started_at.elapsed().as_millis(),
                    phase = "get_response",
                    "Blocking subscribe GET response received"
                );
                get_answered = true;
            }
            // The subscription is a sub-operation; surfacing it is a failure.
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
                key,
                subscribed: _,
            }))) => {
                bail!(
                    "Received unexpected SubscribeResponse for contract {key} - \
                     sub-operations should not send client notifications"
                );
            }
            // Anything else is logged and ignored.
            Ok(Ok(other)) => {
                tracing::debug!(
                    response = ?other,
                    "Received unexpected response while waiting for blocking GET"
                );
            }
        }
    }

    if !get_answered {
        bail!("Did not receive GET response within 45 seconds for blocking_subscribe=true");
    }

    tracing::info!(
        contract = %contract_key,
        elapsed_ms = wait_started_at.elapsed().as_millis(),
        phase = "test_complete",
        "test_get_with_blocking_subscribe completed successfully"
    );
    Ok(())
}
// Integration test: registers a delegate on "client-node", sends it one
// application message, and verifies the delegate's reply.
//
// Expected reply: the first outbound message is an `ApplicationMessage` marked
// `processed`, whose bincode payload decodes to
// `TestResponse("Processed: <request>", [4, 5, 6])`.
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "client-node"],
// Increased timeout for CI where 8 parallel tests compete for resources
timeout_secs = 300,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_delegate_request(ctx: &mut TestContext) -> TestResult {
const TEST_DELEGATE: &str = "test-delegate-integration";
// NOTE(review): `std::env::set_var` is `unsafe` because mutating the process
// environment can race with other threads reading it. This test is configured
// with a multi-threaded tokio runtime, so soundness relies on no other thread
// accessing the environment at this point — confirm.
// These CARGO_PROFILE_RELEASE_* variables presumably affect how
// `load_delegate` builds the delegate below — TODO confirm.
unsafe {
std::env::set_var("CARGO_PROFILE_RELEASE_LTO", "true");
std::env::set_var("CARGO_PROFILE_RELEASE_CODEGEN_UNITS", "1");
std::env::set_var("CARGO_PROFILE_RELEASE_STRIP", "true");
}
let params = Parameters::from(vec![]);
let delegate = load_delegate(TEST_DELEGATE, params.clone())?;
let delegate_key = delegate.key().clone();
let client_node = ctx.node("client-node")?;
// The gateway handle is only used to log its data directory.
let gateway = ctx.node("gateway")?;
tracing::info!("Client node data dir: {:?}", client_node.temp_dir_path);
tracing::info!("Gateway node data dir: {:?}", gateway.temp_dir_path);
let uri = client_node.ws_url();
let (stream, _) = connect_async(&uri).await?;
let mut client = WebApi::start(stream);
// --- Step 1: register the delegate ---
client
.send(ClientRequest::DelegateOp(
freenet_stdlib::client_api::DelegateRequest::RegisterDelegate {
delegate: delegate.clone(),
cipher: freenet_stdlib::client_api::DelegateRequest::DEFAULT_CIPHER,
nonce: freenet_stdlib::client_api::DelegateRequest::DEFAULT_NONCE,
},
))
.await?;
// `??` propagates both the 10-second timeout and any receive error.
let resp = tokio::time::timeout(Duration::from_secs(10), client.recv()).await??;
match resp {
HostResponse::DelegateResponse { key, values: _ } => {
assert_eq!(
key, delegate_key,
"Delegate key mismatch in register response"
);
tracing::info!("Successfully registered delegate with key: {key}");
}
// Every non-delegate response fails the test; the trailing `other`
// catch-all also covers variants not listed explicitly.
other @ HostResponse::ContractResponse(_)
| other @ HostResponse::QueryResponse(_)
| other @ HostResponse::Ok
| other => {
bail!(
"Unexpected response while waiting for register: {:?}",
other
);
}
}
// Request-side message type, serialized with bincode for the delegate.
#[derive(Debug, Serialize, Deserialize)]
enum InboundAppMessage {
TestRequest(String),
}
// Not referenced anywhere below (underscore-prefixed to silence warnings).
let _app_id = ContractInstanceId::new([0; 32]);
let request_data = "test-request-data".to_string();
let payload = bincode::serialize(&InboundAppMessage::TestRequest(request_data.clone()))?;
let app_msg = ApplicationMessage::new(payload);
// --- Step 2: send one application message to the registered delegate ---
client
.send(ClientRequest::DelegateOp(
freenet_stdlib::client_api::DelegateRequest::ApplicationMessages {
key: delegate_key.clone(),
params: params.clone(),
inbound: vec![InboundDelegateMsg::ApplicationMessage(app_msg)],
},
))
.await?;
let resp = tokio::time::timeout(Duration::from_secs(10), client.recv()).await??;
match resp {
HostResponse::DelegateResponse {
key,
values: outbound,
} => {
assert_eq!(key, delegate_key, "Delegate key mismatch in response");
// Guard the `outbound[0]` index below.
assert!(!outbound.is_empty(), "No output messages from delegate");
let app_msg = match &outbound[0] {
OutboundDelegateMsg::ApplicationMessage(msg) => msg,
other @ OutboundDelegateMsg::RequestUserInput(_)
| other @ OutboundDelegateMsg::ContextUpdated(_)
| other @ OutboundDelegateMsg::GetContractRequest(_)
| other @ OutboundDelegateMsg::PutContractRequest(_)
| other @ OutboundDelegateMsg::UpdateContractRequest(_)
| other @ OutboundDelegateMsg::SubscribeContractRequest(_)
| other @ OutboundDelegateMsg::SendDelegateMessage(_) => {
bail!("Expected ApplicationMessage, got {:?}", other)
}
};
assert!(app_msg.processed, "Message not marked as processed");
// Response-side message type expected in the delegate's payload.
#[derive(Debug, Deserialize)]
enum OutboundAppMessage {
TestResponse(String, Vec<u8>),
}
let response: OutboundAppMessage = bincode::deserialize(&app_msg.payload)?;
match response {
OutboundAppMessage::TestResponse(text, data) => {
assert_eq!(
text,
format!("Processed: {request_data}"),
"Response text doesn't match expected format"
);
assert_eq!(
data,
vec![4, 5, 6],
"Response data doesn't match expected value"
);
tracing::info!("Successfully received and verified delegate response");
}
}
}
other @ HostResponse::ContractResponse(_)
| other @ HostResponse::QueryResponse(_)
| other @ HostResponse::Ok
| other => {
bail!(
"Unexpected response while waiting for delegate response: {:?}",
other
);
}
}
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway"],
timeout_secs = 120,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_attested_contract_passed_to_delegate(ctx: &mut TestContext) -> TestResult {
use freenet::dev_tool::AuthToken;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
const TEST_DELEGATE: &str = "test-delegate-attested";
#[derive(Debug, Serialize, Deserialize)]
enum InboundAppMessage {
CheckAttested,
}
#[derive(Debug, Deserialize)]
enum OutboundAppMessage {
Attested(Option<Vec<u8>>),
}
let gateway = ctx.node("gateway")?;
let token = AuthToken::from("test-attested-contract-token-e2e-12345".to_string());
let expected_contract_id = ContractInstanceId::new([42u8; 32]);
gateway.insert_origin_contract(token.clone(), expected_contract_id);
let params = Parameters::from(vec![]);
let delegate = load_delegate(TEST_DELEGATE, params.clone())?;
let delegate_key = delegate.key().clone();
let ws_url = gateway.ws_url();
let mut ws_request = ws_url.as_str().into_client_request()?;
ws_request.headers_mut().insert(
"Authorization",
format!("Bearer {}", token.as_str()).parse()?,
);
let (stream, _) = connect_async(ws_request).await?;
let mut client = WebApi::start(stream);
client
.send(ClientRequest::DelegateOp(
freenet_stdlib::client_api::DelegateRequest::RegisterDelegate {
delegate: delegate.clone(),
cipher: freenet_stdlib::client_api::DelegateRequest::DEFAULT_CIPHER,
nonce: freenet_stdlib::client_api::DelegateRequest::DEFAULT_NONCE,
},
))
.await?;
let resp = timeout(Duration::from_secs(10), client.recv()).await??;
match resp {
HostResponse::DelegateResponse { key, .. } => {
assert_eq!(key, delegate_key, "Delegate key mismatch on register");
tracing::info!("Delegate registered: {key}");
}
other @ HostResponse::ContractResponse(_)
| other @ HostResponse::QueryResponse(_)
| other @ HostResponse::Ok
| other => bail!("Expected DelegateResponse on register, got {:?}", other),
}
let _app_id = ContractInstanceId::new([0; 32]);
let payload = bincode::serialize(&InboundAppMessage::CheckAttested)?;
let app_msg = ApplicationMessage::new(payload);
client
.send(ClientRequest::DelegateOp(
freenet_stdlib::client_api::DelegateRequest::ApplicationMessages {
key: delegate_key.clone(),
params,
inbound: vec![InboundDelegateMsg::ApplicationMessage(app_msg)],
},
))
.await?;
let resp = timeout(Duration::from_secs(10), client.recv()).await??;
match resp {
HostResponse::DelegateResponse { key, values } => {
assert_eq!(key, delegate_key, "Delegate key mismatch in response");
ensure!(!values.is_empty(), "No output messages from delegate");
let app_msg = match &values[0] {
OutboundDelegateMsg::ApplicationMessage(m) => m,
other @ OutboundDelegateMsg::RequestUserInput(_)
| other @ OutboundDelegateMsg::ContextUpdated(_)
| other @ OutboundDelegateMsg::GetContractRequest(_)
| other @ OutboundDelegateMsg::PutContractRequest(_)
| other @ OutboundDelegateMsg::UpdateContractRequest(_)
| other @ OutboundDelegateMsg::SubscribeContractRequest(_)
| other @ OutboundDelegateMsg::SendDelegateMessage(_) => {
bail!("Expected ApplicationMessage, got {:?}", other)
}
};
assert!(app_msg.processed, "Message not marked as processed");
let response: OutboundAppMessage = bincode::deserialize(&app_msg.payload)?;
match response {
OutboundAppMessage::Attested(Some(bytes)) => {
let origin: MessageOrigin = bincode::deserialize(&bytes)
.expect("Failed to deserialize MessageOrigin from delegate response");
#[allow(clippy::wildcard_enum_match_arm)]
match origin {
MessageOrigin::WebApp(contract_id) => {
assert_eq!(
contract_id, expected_contract_id,
"MessageOrigin contract ID does not match expected"
);
}
other => bail!("Expected MessageOrigin::WebApp, got {other:?}"),
}
tracing::info!(
"SUCCESS: MessageOrigin correctly passed to delegate process function"
);
}
OutboundAppMessage::Attested(None) => {
bail!(
"Delegate received origin=None but expected Some(MessageOrigin::WebApp(..)). \
The origin contract was NOT passed to the delegate process function."
);
}
}
}
other @ HostResponse::ContractResponse(_)
| other @ HostResponse::QueryResponse(_)
| other @ HostResponse::Ok
| other => bail!("Expected DelegateResponse, got {:?}", other),
}
Ok(())
}
/// Name of the test contract used by the three-hop PUT test.
const THREE_HOP_TEST_CONTRACT: &str = "test-contract-integration";

/// Lazily loaded three-hop test contract paired with the location derived
/// from its key. Panics on first access if the contract cannot be loaded.
static THREE_HOP_CONTRACT: LazyLock<(ContractContainer, freenet::dev_tool::Location)> =
    LazyLock::new(|| {
        let container = test_utils::load_contract(THREE_HOP_TEST_CONTRACT, vec![].into())
            .expect("contract");
        let ring_location = freenet::dev_tool::Location::from(&container.key());
        (container, ring_location)
    });
/// Returns the location stored alongside the three-hop test contract.
fn three_hop_contract_location() -> freenet::dev_tool::Location {
    THREE_HOP_CONTRACT.1
}
/// Location assigned to the gateway: the contract location offset by 0.2,
/// passed through `Location::new_rounded`.
fn three_hop_gateway_location() -> f64 {
    let shifted = three_hop_contract_location().as_f64() + 0.2;
    freenet::dev_tool::Location::new_rounded(shifted).as_f64()
}
/// Location assigned to peer A: the contract location offset by 0.5,
/// passed through `Location::new_rounded`.
fn three_hop_peer_a_location() -> f64 {
    let shifted = three_hop_contract_location().as_f64() + 0.5;
    freenet::dev_tool::Location::new_rounded(shifted).as_f64()
}
/// Location assigned to peer C: exactly the contract's own location.
fn three_hop_peer_c_location() -> f64 {
    let contract_location = three_hop_contract_location();
    contract_location.as_f64()
}
/// Returns the configured node locations in the order
/// `[gateway, peer-a, peer-c]`.
fn expected_three_hop_locations() -> [f64; 3] {
    let gateway = three_hop_gateway_location();
    let peer_a = three_hop_peer_a_location();
    let peer_c = three_hop_peer_c_location();
    [gateway, peer_a, peer_c]
}
// Integration test: a contract PUT from peer A must complete and the contract
// must then be retrievable via GET from peer C and from the gateway.
//
// Node locations are pinned so that peer C sits exactly on the contract's
// location, with the gateway and peer A further away; the distance assertions
// below enforce that precondition.
//
// Both GET phases retry up to three times. A success on any attempt clears the
// recorded error, so the test only fails when every attempt failed.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway", "peer-a", "peer-c"],
    gateways = ["gateway"],
    node_configs = {
        "gateway": { location: three_hop_gateway_location() },
        "peer-a": { location: three_hop_peer_a_location() },
        "peer-c": { location: three_hop_peer_c_location() },
    },
    timeout_secs = 240,
    startup_wait_secs = 30,
    aggregate_events = "on_failure",
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4
)]
async fn test_put_contract_three_hop_returns_response(ctx: &mut TestContext) -> TestResult {
    use freenet::dev_tool::Location;

    let (contract, contract_location) = {
        let (contract, location) = &*THREE_HOP_CONTRACT;
        (contract.clone(), *location)
    };
    let contract_key = contract.key();
    let node_locations = expected_three_hop_locations();
    let initial_state = test_utils::create_empty_todo_list();
    let wrapped_state = WrappedState::from(initial_state);

    let gateway = ctx.node("gateway")?;
    let peer_a = ctx.node("peer-a")?;
    let peer_c = ctx.node("peer-c")?;

    // The nodes must have been started with the locations from `node_configs`.
    assert_eq!(gateway.location, node_locations[0]);
    assert_eq!(peer_a.location, node_locations[1]);
    assert_eq!(peer_c.location, node_locations[2]);

    tracing::info!("Node A data dir: {:?}", peer_a.temp_dir_path);
    tracing::info!("Gateway node data dir: {:?}", gateway.temp_dir_path);
    tracing::info!("Node C data dir: {:?}", peer_c.temp_dir_path);
    tracing::info!("Contract location: {}", contract_location.as_f64());

    // Topology precondition: peer C is strictly the closest node to the contract.
    let gateway_distance = Location::new(gateway.location).distance(contract_location);
    let peer_a_distance = Location::new(peer_a.location).distance(contract_location);
    let peer_c_distance = Location::new(peer_c.location).distance(contract_location);
    assert!(
        peer_c_distance.as_f64() < gateway_distance.as_f64(),
        "peer-c must be closer to contract than the gateway for three-hop routing"
    );
    assert!(
        peer_c_distance.as_f64() < peer_a_distance.as_f64(),
        "peer-c must be closest node to the contract location"
    );
    tracing::info!(
        "Distances to contract - gateway: {}, peer-a: {}, peer-c: {}",
        gateway_distance.as_f64(),
        peer_a_distance.as_f64(),
        peer_c_distance.as_f64()
    );

    // --- PUT from peer A ---
    let uri_a = peer_a.ws_url();
    let (stream_a, _) = connect_async(&uri_a).await?;
    let mut client_api_a = WebApi::start(stream_a);
    send_put_with_retry(
        &mut client_api_a,
        wrapped_state.clone(),
        contract.clone(),
        "three-hop put",
        Some(contract_key),
    )
    .await?;

    // --- GET from peer C ---
    let uri_c = peer_c.ws_url();
    let (stream_c, _) = connect_async(&uri_c).await?;
    let mut client_api_c = WebApi::start(stream_c);
    const GET_RETRIES: usize = 3;
    let mut last_err = None;
    for attempt in 1..=GET_RETRIES {
        tracing::info!("Attempt {attempt}/{GET_RETRIES} to GET from peer C");
        tokio::time::sleep(Duration::from_secs(2)).await;
        match get_contract(&mut client_api_c, contract_key, &peer_c.temp_dir_path).await {
            Ok((response_contract, response_state)) => {
                assert_eq!(response_contract, contract);
                assert_eq!(response_state, wrapped_state);
                // Clear any error from an earlier attempt; otherwise a retry
                // that succeeds would still be reported as a failure below.
                last_err = None;
                break;
            }
            Err(e) => {
                last_err = Some(e);
                if attempt < GET_RETRIES {
                    tracing::warn!("Peer C GET attempt {attempt} failed, retrying...");
                }
            }
        }
    }
    if let Some(err) = last_err {
        bail!("GET from peer C failed after retries: {err}");
    }

    // Disconnect the peer clients before querying the gateway.
    client_api_c
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    client_api_a
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_millis(100)).await;

    // --- GET from the gateway ---
    let uri_b = gateway.ws_url();
    let (stream_b, _) = connect_async(&uri_b).await?;
    let mut client_api_b = WebApi::start(stream_b);
    const GW_GET_RETRIES: usize = 3;
    let mut gw_last_err = None;
    for attempt in 1..=GW_GET_RETRIES {
        tracing::info!("Attempt {attempt}/{GW_GET_RETRIES} to GET from gateway");
        tokio::time::sleep(Duration::from_secs(1)).await;
        match get_contract(&mut client_api_b, contract_key, &gateway.temp_dir_path).await {
            Ok((gw_contract, gw_state)) => {
                assert_eq!(gw_contract, contract);
                assert_eq!(gw_state, wrapped_state);
                gw_last_err = None;
                break;
            }
            Err(e) => {
                gw_last_err = Some(e);
                if attempt < GW_GET_RETRIES {
                    tracing::warn!("Gateway GET attempt {attempt} failed, retrying...");
                }
            }
        }
    }
    if let Some(err) = gw_last_err {
        bail!("GET from gateway failed after retries: {err}");
    }

    client_api_b
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "peer-node"],
// Increased timeout for CI where 8 parallel tests compete for resources
timeout_secs = 300,
startup_wait_secs = 10,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_subscription_introspection(ctx: &mut TestContext) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-integration";
let _contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let _initial_state = test_utils::create_empty_todo_list();
let gateway = ctx.node("gateway")?;
let peer_node = ctx.node("peer-node")?;
tracing::info!("Gateway data dir: {:?}", gateway.temp_dir_path);
tracing::info!("Node data dir: {:?}", peer_node.temp_dir_path);
let uri_gw = gateway.ws_url();
let (stream_gw, _) = connect_async(&uri_gw).await?;
let mut client_gw = WebApi::start(stream_gw);
let uri_node = peer_node.ws_url();
let (stream_node, _) = connect_async(&uri_node).await?;
let _client_node = WebApi::start(stream_node);
tracing::info!("Testing basic subscription query without any subscriptions");
tracing::info!("Querying subscription info from gateway");
client_gw
.send(ClientRequest::NodeQueries(
freenet_stdlib::client_api::NodeQuery::SubscriptionInfo,
))
.await?;
let resp = timeout(Duration::from_secs(5), client_gw.recv()).await??;
match resp {
HostResponse::QueryResponse(QueryResponse::NetworkDebug(info)) => {
tracing::info!("Gateway subscription info:");
tracing::info!(" Connected peers: {:?}", info.connected_peers);
tracing::info!(" Total subscriptions: {}", info.subscriptions.len());
assert!(
info.subscriptions.is_empty(),
"Expected no subscriptions initially"
);
tracing::info!("Test passed - query subscription info works");
}
other @ HostResponse::ContractResponse(_)
| other @ HostResponse::DelegateResponse { .. }
| other @ HostResponse::QueryResponse(_)
| other @ HostResponse::Ok
| other => {
bail!("Unexpected response: {:?}", other);
}
}
Ok(())
}
/// Checks that a client still receives an `UpdateResponse` when it sends an
/// UPDATE whose state is identical to the state it just PUT.
///
/// Flow: PUT the contract through peer-a, wait for the `PutResponse`, then send
/// an UPDATE carrying the very same bytes and wait for the `UpdateResponse`.
/// A timeout on the second wait is reported as the "UpdateNoChange" bug.
///
/// Any reply other than the expected one fails the test at the step where it
/// is received.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway", "peer-a"],
    // Increased timeout for CI where 8 parallel tests compete for resources
    timeout_secs = 300,
    startup_wait_secs = 30,
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4
)]
async fn test_update_no_change_notification(ctx: &mut TestContext) -> TestResult {
    const TEST_CONTRACT: &str = "test-contract-update-nochange";
    let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();

    // State payload used by this test, serialized as JSON.
    #[derive(serde::Serialize, serde::Deserialize)]
    struct SimpleState {
        value: String,
        counter: u64,
    }
    let initial_state = SimpleState {
        value: "initial".to_string(),
        counter: 1,
    };
    let initial_state_bytes = serde_json::to_vec(&initial_state)?;
    let wrapped_state = WrappedState::from(initial_state_bytes);

    let peer_a = ctx.node("peer-a")?;
    let gateway = ctx.node("gateway")?;
    tracing::info!("Node A data dir: {:?}", peer_a.temp_dir_path);
    tracing::info!("Node B (gw) data dir: {:?}", gateway.temp_dir_path);

    let uri = peer_a.ws_url();
    let (stream, _) = connect_async(&uri).await?;
    let mut client_api_a = WebApi::start(stream);

    make_put(
        &mut client_api_a,
        wrapped_state.clone(),
        contract.clone(),
        false,
    )
    .await?;
    let resp = tokio::time::timeout(Duration::from_secs(30), client_api_a.recv()).await;
    match resp {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
            assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
        }
        Ok(Ok(other)) => {
            // Stop here: continuing would run the UPDATE check without a
            // confirmed PUT, and the next `recv` could pick up a late PUT reply.
            bail!("unexpected response while waiting for put: {:?}", other);
        }
        Ok(Err(e)) => {
            bail!("Error receiving put response: {}", e);
        }
        Err(_) => {
            bail!("Timeout waiting for put response");
        }
    }

    tracing::info!("Sending UPDATE with identical state to trigger UpdateNoChange");
    make_update(&mut client_api_a, contract_key, wrapped_state.clone()).await?;
    let resp = tokio::time::timeout(Duration::from_secs(30), client_api_a.recv()).await;
    match resp {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
            key,
            summary: _,
        }))) => {
            assert_eq!(
                key, contract_key,
                "Contract key mismatch in UPDATE response"
            );
            tracing::info!("SUCCESS: Received UpdateResponse for no-change update");
        }
        Ok(Ok(other)) => {
            bail!("Unexpected response while waiting for update: {:?}", other);
        }
        Ok(Err(e)) => {
            bail!("Error receiving update response: {}", e);
        }
        Err(_) => {
            bail!(
                "TIMEOUT waiting for update response - UpdateNoChange bug: client not notified when update results in no state change"
            );
        }
    }
    Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "peer-a"],
// Increased timeout for CI where 8 parallel tests compete for resources
timeout_secs = 300,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_update_broadcast_propagation_issue_2301(ctx: &mut TestContext) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
let initial_state = test_utils::create_empty_todo_list();
let wrapped_initial_state = WrappedState::from(initial_state);
let peer_a = ctx.node("peer-a")?;
let gateway = ctx.node("gateway")?;
tracing::info!("Peer A data dir: {:?}", peer_a.temp_dir_path);
tracing::info!("Gateway data dir: {:?}", gateway.temp_dir_path);
let uri_peer_a = peer_a.ws_url();
let (stream_a, _) = connect_async(&uri_peer_a).await?;
let mut client_peer_a = WebApi::start(stream_a);
let uri_gateway = gateway.ws_url();
let (stream_gw, _) = connect_async(&uri_gateway).await?;
let mut client_gateway = WebApi::start(stream_gw);
tracing::info!("Step 1: Peer-a putting contract with initial state");
make_put(
&mut client_peer_a,
wrapped_initial_state.clone(),
contract.clone(),
false,
)
.await?;
let resp = tokio::time::timeout(Duration::from_secs(120), client_peer_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
tracing::info!("Peer-a: PUT successful for contract {}", key);
}
Ok(Ok(other)) => {
bail!(
"Peer-a: unexpected response while waiting for put: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Peer-a: Error receiving put response: {}", e);
}
Err(_) => {
bail!("Peer-a: Timeout waiting for put response");
}
}
tracing::info!("Step 2: Gateway getting contract to cache it");
make_get(&mut client_gateway, contract_key, true, false).await?;
let resp = tokio::time::timeout(Duration::from_secs(120), client_gateway.recv()).await;
let initial_state_on_gateway: WrappedState = match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
state,
..
}))) => {
assert_eq!(key, contract_key, "Contract key mismatch in GET response");
tracing::info!("Gateway: GET successful, contract cached");
state
}
Ok(Ok(other)) => {
bail!(
"Gateway: unexpected response while waiting for get: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Gateway: Error receiving get response: {}", e);
}
Err(_) => {
bail!("Gateway: Timeout waiting for get response");
}
};
tracing::info!("Step 3: Peer-a updating contract with new state");
let mut todo_list: test_utils::TodoList =
serde_json::from_slice(wrapped_initial_state.as_ref())
.expect("Failed to deserialize wrapped_initial_state as TodoList");
todo_list.tasks.push(test_utils::Task {
id: 1,
title: "Issue 2301 regression test".to_string(),
description: "This task should propagate to gateway via UPDATE broadcast".to_string(),
completed: false,
priority: 1,
});
let updated_bytes = serde_json::to_vec(&todo_list).unwrap();
let wrapped_updated_state = WrappedState::from(updated_bytes);
make_update(
&mut client_peer_a,
contract_key,
wrapped_updated_state.clone(),
)
.await?;
let resp = tokio::time::timeout(Duration::from_secs(30), client_peer_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
key, ..
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE response"
);
tracing::info!("Peer-a: UPDATE successful");
}
Ok(Ok(other)) => {
bail!(
"Peer-a: unexpected response while waiting for update: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Peer-a: Error receiving update response: {}", e);
}
Err(_) => {
bail!("Peer-a: Timeout waiting for update response");
}
}
tracing::info!("Step 4: Verifying gateway received the update via polling GET");
const POLL_INTERVAL_SECS: u64 = 2; const POLL_TIMEOUT_SECS: u64 = 30;
let poll_start = std::time::Instant::now();
let poll_timeout = Duration::from_secs(POLL_TIMEOUT_SECS);
let mut gateway_received_update = false;
while poll_start.elapsed() < poll_timeout {
tokio::time::sleep(Duration::from_secs(POLL_INTERVAL_SECS)).await;
make_get(&mut client_gateway, contract_key, false, false).await?;
let resp = tokio::time::timeout(Duration::from_secs(10), client_gateway.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
state,
..
}))) => {
assert_eq!(key, contract_key);
if state.as_ref() != initial_state_on_gateway.as_ref() {
let state_on_gateway: test_utils::TodoList =
serde_json::from_slice(state.as_ref())
.expect("Failed to deserialize gateway state");
if !state_on_gateway.tasks.is_empty()
&& state_on_gateway.tasks[0].title == "Issue 2301 regression test"
{
tracing::info!(
"SUCCESS: Gateway received the UPDATE broadcast! State updated with {} task(s)",
state_on_gateway.tasks.len()
);
gateway_received_update = true;
break;
} else {
tracing::warn!(
"Gateway state changed but did not match expected task. \
Tasks: {:?}, version: {}",
state_on_gateway.tasks,
state_on_gateway.version
);
}
}
}
Ok(Ok(other)) => {
tracing::warn!("Gateway poll: unexpected response: {:?}", other);
}
Ok(Err(e)) => {
tracing::warn!("Gateway poll: error: {}", e);
}
Err(_) => {
tracing::warn!("Gateway poll: timeout");
}
}
tracing::debug!(
"Gateway state not yet updated, polling... (elapsed: {:?})",
poll_start.elapsed()
);
}
ensure!(
gateway_received_update,
"REGRESSION FAILURE (Issue #2301): Gateway did not receive UPDATE broadcast. \
This indicates the NoChange false positive bug is present - the UPDATE was \
not broadcast to other nodes because change detection incorrectly returned \
NoChange after the local state was already committed."
);
tracing::info!(
"Issue #2301 regression test passed: UPDATE broadcast propagation works correctly"
);
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "peer-a"],
timeout_secs = 300,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_put_then_immediate_subscribe_succeeds_locally_regression_2326(
ctx: &mut TestContext,
) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
let initial_state = test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(initial_state);
let peer_a = ctx.node("peer-a")?;
let ws_api_port = peer_a.ws_port;
tracing::info!(
"Regression test #2326: Testing PUT then immediate Subscribe on peer-a (ws_port: {})",
ws_api_port
);
let uri = peer_a.ws_url();
let (stream, _) = connect_async(&uri).await?;
let mut client_api = WebApi::start(stream);
tracing::info!("Step 1: PUT contract with subscribe=false");
make_put(
&mut client_api,
wrapped_state.clone(),
contract.clone(),
false,
)
.await?;
let put_resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await;
match put_resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
tracing::info!("PUT successful for contract: {}", key);
assert_eq!(key, contract_key);
}
Ok(Ok(other)) => {
bail!("Unexpected response while waiting for PUT: {:?}", other);
}
Ok(Err(e)) => {
bail!("Error receiving PUT response: {}", e);
}
Err(_) => {
bail!("Timeout waiting for PUT response");
}
}
tracing::info!("Step 2: Immediately Subscribe (before propagation to gateway)");
make_subscribe(&mut client_api, contract_key).await?;
let sub_resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await;
match sub_resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
key,
subscribed,
..
}))) => {
ensure!(
subscribed,
"REGRESSION FAILURE (Issue #2326): Subscribe returned subscribed=false. \
This indicates the bug is present - Subscribe forwarded to a remote peer \
instead of completing locally for a cached contract."
);
tracing::info!(
"Subscribe successful for contract: {} (completed locally as expected)",
key
);
assert_eq!(key, contract_key);
}
Ok(Ok(other)) => {
bail!(
"Unexpected response while waiting for Subscribe: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Error receiving Subscribe response: {}", e);
}
Err(_) => {
bail!("Timeout waiting for Subscribe response");
}
}
client_api
.send(ClientRequest::Disconnect { cause: None })
.await?;
tracing::info!("Issue #2326 regression test passed: PUT then immediate Subscribe works");
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway"],
timeout_secs = 60,
startup_wait_secs = 5,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_get_notfound_no_forwarding_targets(ctx: &mut TestContext) -> TestResult {
let gateway = ctx.node("gateway")?;
tracing::info!("Gateway ws_port: {}", gateway.ws_port);
tokio::time::sleep(Duration::from_secs(2)).await;
let uri = gateway.ws_url();
let (stream, _) = connect_async(&uri).await?;
let mut client_api = WebApi::start(stream);
let nonexistent_instance_id = ContractInstanceId::new([0xDE; 32]);
tracing::info!(
"Requesting non-existent contract: {}",
nonexistent_instance_id
);
let get_request = ClientRequest::ContractOp(freenet_stdlib::client_api::ContractRequest::Get {
key: nonexistent_instance_id,
return_contract_code: true,
subscribe: false,
blocking_subscribe: false,
});
client_api.send(get_request).await?;
let start = std::time::Instant::now();
let resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await;
let elapsed = start.elapsed();
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::NotFound { instance_id }))) => {
tracing::info!(
"✓ Received NotFound response in {:?} for instance_id: {:?}",
elapsed,
instance_id
);
assert_eq!(
instance_id, nonexistent_instance_id,
"NotFound should be for the requested instance_id"
);
assert!(
elapsed < Duration::from_secs(20),
"NotFound should be returned quickly, not after timeout. Took {:?}",
elapsed
);
}
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { key, .. }))) => {
bail!(
"Unexpectedly found contract {:?} - test assumes it doesn't exist",
key
);
}
Ok(Ok(other)) => {
bail!("Unexpected response type: {:?} (expected NotFound)", other);
}
Ok(Err(e)) => {
let err_str = e.to_string();
if err_str.contains("no ring connections")
|| err_str.contains("No ring connections")
|| err_str.contains("not found")
{
tracing::info!(
"✓ Received fast error response in {:?}: {}",
elapsed,
err_str
);
assert!(
elapsed < Duration::from_secs(20),
"Error should be returned quickly, not after timeout. Took {:?}",
elapsed
);
} else {
bail!(
"Unexpected error: {} (expected NotFound or network error)",
e
);
}
}
Err(_) => {
bail!(
"Timeout after 30 seconds waiting for NotFound response. \
This is the bug: GET should return NotFound immediately, \
not timeout waiting for a response that never comes."
);
}
}
tracing::info!("Test PASSED: GET for non-existent contract fails quickly (not timeout)");
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "peer-a"],
timeout_secs = 300,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_put_triggers_update_for_subscribers(ctx: &mut TestContext) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
let initial_state = test_utils::create_empty_todo_list();
let wrapped_initial_state = WrappedState::from(initial_state.clone());
let peer_a = ctx.node("peer-a")?;
let gateway = ctx.node("gateway")?;
tracing::info!("Peer A data dir: {:?}", peer_a.temp_dir_path);
tracing::info!("Gateway data dir: {:?}", gateway.temp_dir_path);
let uri_peer_a = peer_a.ws_url();
let (stream_a, _) = connect_async(&uri_peer_a).await?;
let mut client_peer_a = WebApi::start(stream_a);
let uri_gateway = gateway.ws_url();
let (stream_gw, _) = connect_async(&uri_gateway).await?;
let mut client_gateway = WebApi::start(stream_gw);
tracing::info!("Step 1: Peer-a putting contract with initial state (version 1)");
make_put(
&mut client_peer_a,
wrapped_initial_state.clone(),
contract.clone(),
false,
)
.await?;
let resp = tokio::time::timeout(Duration::from_secs(120), client_peer_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
tracing::info!("Peer-a: Initial PUT successful for contract {}", key);
}
Ok(Ok(other)) => {
bail!(
"Peer-a: unexpected response while waiting for put: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Peer-a: Error receiving put response: {}", e);
}
Err(_) => {
bail!("Peer-a: Timeout waiting for put response");
}
}
tracing::info!("Step 2: Gateway getting contract to cache it");
make_get(&mut client_gateway, contract_key, true, false).await?;
let resp = tokio::time::timeout(Duration::from_secs(120), client_gateway.recv()).await;
let initial_state_on_gateway: WrappedState = match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
state,
..
}))) => {
assert_eq!(key, contract_key, "Contract key mismatch in GET response");
tracing::info!("Gateway: GET successful, contract cached");
state
}
Ok(Ok(other)) => {
bail!(
"Gateway: unexpected response while waiting for get: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Gateway: Error receiving get response: {}", e);
}
Err(_) => {
bail!("Gateway: Timeout waiting for get response");
}
};
tracing::info!("Step 3: Gateway subscribing to contract");
make_subscribe(&mut client_gateway, contract_key).await?;
let resp = tokio::time::timeout(Duration::from_secs(30), client_gateway.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
key, ..
}))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in SUBSCRIBE response"
);
tracing::info!("Gateway: SUBSCRIBE successful for contract {}", key);
}
Ok(Ok(other)) => {
bail!(
"Gateway: unexpected response while waiting for subscribe: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Gateway: Error receiving subscribe response: {}", e);
}
Err(_) => {
bail!("Gateway: Timeout waiting for subscribe response");
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
tracing::info!("Step 4: Peer-a putting UPDATED contract state (version 2)");
let mut todo_list: test_utils::TodoList =
serde_json::from_slice(initial_state_on_gateway.as_ref())
.expect("Failed to deserialize initial state as TodoList");
todo_list.tasks.push(test_utils::Task {
id: 1,
title: "PUT→UPDATE regression test".to_string(),
description: "This task should propagate to gateway when PUT triggers UPDATE".to_string(),
completed: false,
priority: 1,
});
let updated_bytes = serde_json::to_vec(&todo_list).unwrap();
let wrapped_updated_state = WrappedState::from(updated_bytes);
make_put(
&mut client_peer_a,
wrapped_updated_state.clone(),
contract.clone(),
false,
)
.await?;
let resp = tokio::time::timeout(Duration::from_secs(120), client_peer_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
assert_eq!(
key, contract_key,
"Contract key mismatch in second PUT response"
);
tracing::info!("Peer-a: Second PUT successful for contract {}", key);
}
Ok(Ok(other)) => {
bail!(
"Peer-a: unexpected response while waiting for second put: {:?}",
other
);
}
Ok(Err(e)) => {
bail!("Peer-a: Error receiving second put response: {}", e);
}
Err(_) => {
bail!("Peer-a: Timeout waiting for second put response");
}
}
tracing::info!("Step 5: Waiting for UPDATE notification on gateway (subscribed)");
const POLL_INTERVAL_SECS: u64 = 2;
const POLL_TIMEOUT_SECS: u64 = 30;
let poll_start = std::time::Instant::now();
let poll_timeout = Duration::from_secs(POLL_TIMEOUT_SECS);
let mut gateway_received_update = false;
while poll_start.elapsed() < poll_timeout {
tokio::time::sleep(Duration::from_secs(POLL_INTERVAL_SECS)).await;
make_get(&mut client_gateway, contract_key, false, false).await?;
let resp = tokio::time::timeout(Duration::from_secs(10), client_gateway.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
state,
..
}))) => {
assert_eq!(key, contract_key);
if state.as_ref() != initial_state_on_gateway.as_ref() {
let state_on_gateway: test_utils::TodoList =
serde_json::from_slice(state.as_ref())
.expect("Failed to deserialize gateway state");
if !state_on_gateway.tasks.is_empty()
&& state_on_gateway.tasks[0].title == "PUT→UPDATE regression test"
{
tracing::info!(
"SUCCESS: Gateway received the UPDATE triggered by PUT! \
State updated with {} task(s)",
state_on_gateway.tasks.len()
);
gateway_received_update = true;
break;
}
}
}
Ok(Ok(other)) => {
tracing::warn!("Gateway poll: unexpected response: {:?}", other);
}
Ok(Err(e)) => {
tracing::warn!("Gateway poll: error: {}", e);
}
Err(_) => {
tracing::warn!("Gateway poll: timeout");
}
}
tracing::debug!(
"Gateway state not yet updated, polling... (elapsed: {:?})",
poll_start.elapsed()
);
}
ensure!(
gateway_received_update,
"REGRESSION FAILURE (PUT→UPDATE propagation): Gateway did not receive UPDATE \
triggered by PUT. This indicates the bug is present: PUT on a subscribed \
contract is not triggering UPDATE propagation when the contract's summarize \
function accepts the incoming state as-is (merged_value == value)."
);
tracing::info!(
"PUT→UPDATE propagation regression test passed: \
PUT correctly triggers UPDATE for subscribers"
);
Ok(())
}
/// Commands the tests in this file serialize (with `bincode`) and send to the
/// test delegate as the payload of an `ApplicationMessage`.
///
/// NOTE(review): this presumably has to mirror the command enum compiled into
/// the delegate itself, including variant order, since the payload is decoded
/// on the other side — confirm before reordering or adding variants.
#[derive(Debug, Serialize, Deserialize)]
enum DelegateCommand {
/// Request for the state of a single contract.
GetContractState {
contract_id: ContractInstanceId,
},
/// Request for the states of several contracts in one command.
GetMultipleContractStates {
contract_ids: Vec<ContractInstanceId>,
},
/// Request for a contract's state together with a message to echo;
/// presumably answered with both a state and an `Echo` — TODO confirm.
GetContractWithEcho {
contract_id: ContractInstanceId,
echo_message: String,
},
/// Request to PUT `contract` with the given raw state bytes.
PutContractState {
contract: ContractContainer,
state: Vec<u8>,
},
/// Request to UPDATE the contract identified by `contract_id` with the
/// given raw state bytes.
UpdateContractState {
contract_id: ContractInstanceId,
state: Vec<u8>,
},
/// Request to subscribe to the contract identified by `contract_id`.
SubscribeContract {
contract_id: ContractInstanceId,
},
}
/// Replies the tests decode (with `bincode`) from the payload of an outbound
/// `ApplicationMessage` returned by the test delegate.
///
/// NOTE(review): presumably mirrors the response enum compiled into the
/// delegate, including variant order — confirm before changing it.
#[derive(Debug, Serialize, Deserialize)]
enum DelegateCommandResponse {
/// State of one contract; `state` is optional.
/// NOTE(review): `None` presumably means the state was unavailable — confirm.
ContractState {
contract_id: ContractInstanceId,
state: Option<Vec<u8>>,
},
/// One `(contract id, optional state)` pair per reported contract.
MultipleContractStates {
results: Vec<(ContractInstanceId, Option<Vec<u8>>)>,
},
/// Carries a text message; presumably the echo of `GetContractWithEcho`.
Echo {
message: String,
},
/// Outcome of a PUT command: `success` flag plus an optional error text.
ContractPutResult {
contract_id: ContractInstanceId,
success: bool,
error: Option<String>,
},
/// Outcome of an UPDATE command: `success` flag plus an optional error text.
ContractUpdateResult {
contract_id: ContractInstanceId,
success: bool,
error: Option<String>,
},
/// Outcome of a subscribe command: `success` flag plus an optional error text.
ContractSubscribeResult {
contract_id: ContractInstanceId,
success: bool,
error: Option<String>,
},
/// Carries new state bytes for a contract; presumably emitted when the
/// delegate is notified of a change to a subscribed contract — TODO confirm.
ContractNotificationReceived {
contract_id: ContractInstanceId,
new_state: Vec<u8>,
},
/// Generic failure reported by the delegate, with a text message.
Error {
message: String,
},
}
/// End-to-end test of contract PUT and UPDATE performed through a delegate on
/// a single gateway node.
///
/// Steps:
/// 1. Register the delegate and check the key in the reply.
/// 2. Send a `DelegateCommand::PutContractState` (empty todo list) to the
///    delegate and require a successful `ContractPutResult`.
/// 3. GET the contract directly and check that the stored list has no tasks.
/// 4. Send a `DelegateCommand::UpdateContractState` (one task) to the delegate
///    and require a successful `ContractUpdateResult`.
/// 5. GET the contract directly again and check the single task and its title.
///
/// Any unexpected reply, key mismatch, unsuccessful result, receive error or
/// timeout fails the test.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway"],
    timeout_secs = 300,
    startup_wait_secs = 20,
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4
)]
async fn test_delegate_contract_put_and_update(ctx: &mut TestContext) -> TestResult {
    /// Decodes a delegate reply from an outbound application message.
    /// Returns `None` for other message kinds or undecodable payloads.
    fn decode_command_response(msg: &OutboundDelegateMsg) -> Option<DelegateCommandResponse> {
        if let OutboundDelegateMsg::ApplicationMessage(app_msg) = msg {
            bincode::deserialize::<DelegateCommandResponse>(&app_msg.payload).ok()
        } else {
            None
        }
    }

    const TEST_DELEGATE: &str = "test-delegate-capabilities";
    const TEST_CONTRACT: &str = "test-contract-integration";
    let contract = load_contract(TEST_CONTRACT, Parameters::from(vec![]))?;
    let contract_key = contract.key();
    let delegate = load_delegate(TEST_DELEGATE, Parameters::from(vec![]))?;
    let delegate_key = delegate.key().clone();

    let gateway = ctx.node("gateway")?;
    tokio::time::sleep(Duration::from_secs(3)).await;
    let uri = gateway.ws_url();
    let (stream, _) = connect_async(&uri).await?;
    let mut client = WebApi::start(stream);

    tracing::info!("Step 1: Registering delegate");
    client
        .send(ClientRequest::DelegateOp(
            freenet_stdlib::client_api::DelegateRequest::RegisterDelegate {
                delegate: delegate.clone(),
                cipher: freenet_stdlib::client_api::DelegateRequest::DEFAULT_CIPHER,
                nonce: freenet_stdlib::client_api::DelegateRequest::DEFAULT_NONCE,
            },
        ))
        .await?;
    let resp = timeout(Duration::from_secs(30), client.recv()).await??;
    match resp {
        HostResponse::DelegateResponse { key, .. } => {
            ensure!(key == delegate_key, "Delegate key mismatch on register");
            tracing::info!("Delegate registered: {key}");
        }
        other @ HostResponse::ContractResponse(_)
        | other @ HostResponse::QueryResponse(_)
        | other @ HostResponse::Ok
        | other => bail!("Unexpected register response: {:?}", other),
    }

    let initial_state = test_utils::create_empty_todo_list();
    tracing::info!("Step 2: Delegate PUT contract (empty todo list)");
    let put_cmd = DelegateCommand::PutContractState {
        contract: contract.clone(),
        state: initial_state.clone(),
    };
    let put_payload = bincode::serialize(&put_cmd)?;
    let app_msg = ApplicationMessage::new(put_payload);
    client
        .send(ClientRequest::DelegateOp(
            freenet_stdlib::client_api::DelegateRequest::ApplicationMessages {
                key: delegate_key.clone(),
                params: Parameters::from(vec![]),
                inbound: vec![InboundDelegateMsg::ApplicationMessage(app_msg)],
            },
        ))
        .await?;
    let resp = timeout(Duration::from_secs(60), client.recv()).await??;
    match resp {
        HostResponse::DelegateResponse { key, values } => {
            ensure!(key == delegate_key, "Delegate key mismatch on PUT response");
            tracing::info!("Delegate PUT response: {} outbound messages", values.len());
            // The first decodable application message is taken as the result.
            let put_result = values
                .iter()
                .find_map(decode_command_response)
                .ok_or_else(|| anyhow::anyhow!("No ApplicationMessage in delegate PUT response"))?;
            match put_result {
                DelegateCommandResponse::ContractPutResult { success, error, .. } => {
                    ensure!(
                        success,
                        "Delegate PUT failed: {}",
                        error.unwrap_or_default()
                    );
                    tracing::info!("Delegate PUT succeeded");
                }
                other @ DelegateCommandResponse::ContractState { .. }
                | other @ DelegateCommandResponse::MultipleContractStates { .. }
                | other @ DelegateCommandResponse::Echo { .. }
                | other @ DelegateCommandResponse::ContractUpdateResult { .. }
                | other @ DelegateCommandResponse::ContractSubscribeResult { .. }
                | other @ DelegateCommandResponse::ContractNotificationReceived { .. }
                | other @ DelegateCommandResponse::Error { .. } => {
                    bail!("Expected ContractPutResult, got {:?}", other)
                }
            }
        }
        other @ HostResponse::ContractResponse(_)
        | other @ HostResponse::QueryResponse(_)
        | other @ HostResponse::Ok
        | other => bail!("Unexpected delegate PUT response: {:?}", other),
    }

    tracing::info!("Step 3: Direct GET to verify PUT state");
    make_get(&mut client, contract_key, true, false).await?;
    let resp = timeout(Duration::from_secs(30), client.recv()).await??;
    match resp {
        HostResponse::ContractResponse(ContractResponse::GetResponse {
            state, contract, ..
        }) => {
            ensure!(contract.is_some(), "GET should return contract code");
            let stored: test_utils::TodoList = serde_json::from_slice(state.as_ref())?;
            ensure!(
                stored.tasks.is_empty(),
                "Initial state should be empty todo list, got {} tasks",
                stored.tasks.len()
            );
            tracing::info!(
                "GET confirmed: empty todo list stored (version {})",
                stored.version
            );
        }
        other @ HostResponse::ContractResponse(_)
        | other @ HostResponse::DelegateResponse { .. }
        | other @ HostResponse::QueryResponse(_)
        | other @ HostResponse::Ok
        | other => bail!("Unexpected GET response: {:?}", other),
    }

    tracing::info!("Step 4: Delegate UPDATE contract (add a task)");
    let updated_state = test_utils::create_todo_list_with_item("Delegate E2E test task");
    let contract_instance_id = *contract_key.id();
    let update_cmd = DelegateCommand::UpdateContractState {
        contract_id: contract_instance_id,
        state: updated_state.clone(),
    };
    let update_payload = bincode::serialize(&update_cmd)?;
    let update_msg = ApplicationMessage::new(update_payload);
    client
        .send(ClientRequest::DelegateOp(
            freenet_stdlib::client_api::DelegateRequest::ApplicationMessages {
                key: delegate_key.clone(),
                params: Parameters::from(vec![]),
                inbound: vec![InboundDelegateMsg::ApplicationMessage(update_msg)],
            },
        ))
        .await?;
    let resp = timeout(Duration::from_secs(60), client.recv()).await??;
    match resp {
        HostResponse::DelegateResponse { key, values } => {
            ensure!(
                key == delegate_key,
                "Delegate key mismatch on UPDATE response"
            );
            tracing::info!(
                "Delegate UPDATE response: {} outbound messages",
                values.len()
            );
            let update_result = values
                .iter()
                .find_map(decode_command_response)
                .ok_or_else(|| {
                    anyhow::anyhow!("No ApplicationMessage in delegate UPDATE response")
                })?;
            match update_result {
                DelegateCommandResponse::ContractUpdateResult { success, error, .. } => {
                    ensure!(
                        success,
                        "Delegate UPDATE failed: {}",
                        error.unwrap_or_default()
                    );
                    tracing::info!("Delegate UPDATE succeeded");
                }
                other @ DelegateCommandResponse::ContractState { .. }
                | other @ DelegateCommandResponse::MultipleContractStates { .. }
                | other @ DelegateCommandResponse::Echo { .. }
                | other @ DelegateCommandResponse::ContractPutResult { .. }
                | other @ DelegateCommandResponse::ContractSubscribeResult { .. }
                | other @ DelegateCommandResponse::ContractNotificationReceived { .. }
                | other @ DelegateCommandResponse::Error { .. } => {
                    bail!("Expected ContractUpdateResult, got {:?}", other)
                }
            }
        }
        other @ HostResponse::ContractResponse(_)
        | other @ HostResponse::QueryResponse(_)
        | other @ HostResponse::Ok
        | other => bail!("Unexpected delegate UPDATE response: {:?}", other),
    }

    tracing::info!("Step 5: Direct GET to verify UPDATE state");
    make_get(&mut client, contract_key, false, false).await?;
    let resp = timeout(Duration::from_secs(30), client.recv()).await??;
    match resp {
        HostResponse::ContractResponse(ContractResponse::GetResponse { state, .. }) => {
            let stored: test_utils::TodoList = serde_json::from_slice(state.as_ref())?;
            ensure!(
                stored.tasks.len() == 1,
                "Updated state should have 1 task, got {}",
                stored.tasks.len()
            );
            ensure!(
                stored.tasks[0].title == "Delegate E2E test task",
                "Task title mismatch: {}",
                stored.tasks[0].title
            );
            tracing::info!(
                "GET confirmed: 1 task stored (version {}), title: {}",
                stored.version,
                stored.tasks[0].title
            );
        }
        other @ HostResponse::ContractResponse(_)
        | other @ HostResponse::DelegateResponse { .. }
        | other @ HostResponse::QueryResponse(_)
        | other @ HostResponse::Ok
        | other => bail!("Unexpected GET response after UPDATE: {:?}", other),
    }
    tracing::info!("Delegate contract PUT and UPDATE E2E test passed");
    Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway"],
timeout_secs = 300,
startup_wait_secs = 20,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_delegate_contract_get(ctx: &mut TestContext) -> TestResult {
const TEST_DELEGATE: &str = "test-delegate-capabilities";
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = load_contract(TEST_CONTRACT, Parameters::from(vec![]))?;
let contract_key = contract.key();
let delegate = load_delegate(TEST_DELEGATE, Parameters::from(vec![]))?;
let delegate_key = delegate.key().clone();
let gateway = ctx.node("gateway")?;
tokio::time::sleep(Duration::from_secs(3)).await;
let uri = gateway.ws_url();
let (stream, _) = connect_async(&uri).await?;
let mut client = WebApi::start(stream);
let initial_state = test_utils::create_todo_list_with_item("Pre-existing task");
tracing::info!("Step 1: Direct PUT of contract with initial state");
make_put(
&mut client,
WrappedState::from(initial_state.clone()),
contract.clone(),
false,
)
.await?;
let resp = timeout(Duration::from_secs(30), client.recv()).await??;
match resp {
HostResponse::ContractResponse(ContractResponse::PutResponse { key }) => {
ensure!(key == contract_key, "PUT key mismatch");
tracing::info!("Direct PUT succeeded");
}
other @ HostResponse::ContractResponse(_)
| other @ HostResponse::DelegateResponse { .. }
| other @ HostResponse::QueryResponse(_)
| other @ HostResponse::Ok
| other => bail!("Unexpected PUT response: {:?}", other),
}
tracing::info!("Step 2: Registering delegate");
client
.send(ClientRequest::DelegateOp(
freenet_stdlib::client_api::DelegateRequest::RegisterDelegate {
delegate: delegate.clone(),
cipher: freenet_stdlib::client_api::DelegateRequest::DEFAULT_CIPHER,
nonce: freenet_stdlib::client_api::DelegateRequest::DEFAULT_NONCE,
},
))
.await?;
let resp = timeout(Duration::from_secs(30), client.recv()).await??;
match resp {
HostResponse::DelegateResponse { key, .. } => {
ensure!(key == delegate_key, "Delegate key mismatch on register");
tracing::info!("Delegate registered");
}
other @ HostResponse::ContractResponse(_)
| other @ HostResponse::QueryResponse(_)
| other @ HostResponse::Ok
| other => bail!("Unexpected register response: {:?}", other),
}
tracing::info!("Step 3: Delegate GET contract state");
let contract_instance_id = *contract_key.id();
let _app_id = ContractInstanceId::new([42u8; 32]);
let get_cmd = DelegateCommand::GetContractState {
contract_id: contract_instance_id,
};
let get_payload = bincode::serialize(&get_cmd)?;
let get_msg = ApplicationMessage::new(get_payload);
client
.send(ClientRequest::DelegateOp(
freenet_stdlib::client_api::DelegateRequest::ApplicationMessages {
key: delegate_key.clone(),
params: Parameters::from(vec![]),
inbound: vec![InboundDelegateMsg::ApplicationMessage(get_msg)],
},
))
.await?;
let resp = timeout(Duration::from_secs(60), client.recv()).await??;
match resp {
HostResponse::DelegateResponse { key, values } => {
ensure!(key == delegate_key, "Delegate key mismatch on GET response");
let get_result = values
.iter()
.find_map(|v| {
if let OutboundDelegateMsg::ApplicationMessage(msg) = v {
bincode::deserialize::<DelegateCommandResponse>(&msg.payload).ok()
} else {
None
}
})
.ok_or_else(|| anyhow::anyhow!("No ApplicationMessage in delegate GET response"))?;
match get_result {
DelegateCommandResponse::ContractState { state, .. } => {
let state_bytes =
state.ok_or_else(|| anyhow::anyhow!("GET returned None state"))?;
let todo: test_utils::TodoList = serde_json::from_slice(&state_bytes)?;
ensure!(
todo.tasks.len() == 1,
"Expected 1 task from GET, got {}",
todo.tasks.len()
);
ensure!(
todo.tasks[0].title == "Pre-existing task",
"Task title mismatch: {}",
todo.tasks[0].title
);
tracing::info!(
"Delegate GET returned correct state: {} tasks, version {}",
todo.tasks.len(),
todo.version
);
}
other @ DelegateCommandResponse::MultipleContractStates { .. }
| other @ DelegateCommandResponse::Echo { .. }
| other @ DelegateCommandResponse::ContractPutResult { .. }
| other @ DelegateCommandResponse::ContractUpdateResult { .. }
| other @ DelegateCommandResponse::ContractSubscribeResult { .. }
| other @ DelegateCommandResponse::ContractNotificationReceived { .. }
| other @ DelegateCommandResponse::Error { .. } => {
bail!("Expected ContractState, got {:?}", other)
}
}
}
other @ HostResponse::ContractResponse(_)
| other @ HostResponse::QueryResponse(_)
| other @ HostResponse::Ok
| other => bail!("Unexpected delegate GET response: {:?}", other),
}
tracing::info!("Delegate contract GET E2E test passed");
Ok(())
}
#[freenet_test(
nodes = ["gateway", "node-a", "node-b"],
timeout_secs = 600,
startup_wait_secs = 40,
health_check_readiness = true,
aggregate_events = "always",
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_client_disconnect_triggers_upstream_unsubscribe(ctx: &mut TestContext) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
let initial_state = test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(initial_state);
let node_a = ctx.node("node-a")?;
let node_b = ctx.node("node-b")?;
tokio::time::sleep(Duration::from_secs(5)).await;
let uri_a = node_a.ws_url();
let (stream_a, _) = connect_async(&uri_a).await?;
let mut client_a = WebApi::start(stream_a);
let uri_b = node_b.ws_url();
let (stream_b, _) = connect_async(&uri_b).await?;
let mut client_b = WebApi::start(stream_b);
tracing::info!("Client A: PUT contract");
make_put(
&mut client_a,
wrapped_state.clone(),
contract.clone(),
false,
)
.await?;
loop {
match timeout(Duration::from_secs(120), client_a.recv()).await {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
assert_eq!(key, contract_key);
tracing::info!("Client A: PUT complete");
break;
}
Ok(Ok(_)) => continue,
Ok(Err(e)) => bail!("Error waiting for PUT response: {}", e),
Err(_) => bail!("Timeout waiting for PUT response"),
}
}
tracing::info!("Client B: GET contract");
make_get(&mut client_b, contract_key, true, false).await?;
loop {
match timeout(Duration::from_secs(120), client_b.recv()).await {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key, ..
}))) => {
assert_eq!(key, contract_key);
tracing::info!("Client B: GET complete");
break;
}
Ok(Ok(_)) => continue,
Ok(Err(e)) => bail!("Error waiting for GET response: {}", e),
Err(_) => bail!("Timeout waiting for GET response"),
}
}
tracing::info!("Client B: SUBSCRIBE");
make_subscribe(&mut client_b, contract_key).await?;
loop {
match timeout(Duration::from_secs(30), client_b.recv()).await {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
key,
subscribed,
}))) => {
assert_eq!(key, contract_key);
assert!(subscribed);
tracing::info!("Client B: subscribed");
break;
}
Ok(Ok(_)) => continue,
Ok(Err(e)) => bail!("Error waiting for SUBSCRIBE response: {}", e),
Err(_) => bail!("Timeout waiting for SUBSCRIBE response"),
}
}
tracing::info!("Phase 1: Verify subscription works");
let todo = test_utils::TodoList {
tasks: vec![test_utils::Task {
id: 1,
title: "first update".into(),
description: "verify subscription".into(),
completed: false,
priority: 1,
}],
version: 1,
};
let state1 = WrappedState::from(serde_json::to_vec(&todo)?);
make_update(&mut client_a, contract_key, state1).await?;
let mut client_b_received = false;
let deadline = tokio::time::Instant::now() + Duration::from_secs(60);
while tokio::time::Instant::now() < deadline {
match timeout(Duration::from_secs(10), client_b.recv()).await {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
key,
..
}))) => {
assert_eq!(key, contract_key);
tracing::info!("Client B: received update notification");
client_b_received = true;
break;
}
Ok(Ok(_)) => continue,
_ => break,
}
}
assert!(
client_b_received,
"Client B should receive update notification while subscribed"
);
tracing::info!("Phase 2: Disconnect client B");
while timeout(Duration::from_millis(500), client_b.recv())
.await
.is_ok()
{}
drop(
client_b
.send(ClientRequest::Disconnect { cause: None })
.await,
);
drop(client_b);
let instance_id = *contract_key.id();
let mut unsubscribe_sent_count = 0;
let mut unsubscribe_received_count = 0;
let poll_deadline = tokio::time::Instant::now() + Duration::from_secs(30);
while tokio::time::Instant::now() < poll_deadline {
tokio::time::sleep(Duration::from_secs(5)).await;
let aggregator = ctx.aggregate_events().await?;
let events = aggregator.get_all_events().await?;
unsubscribe_sent_count = events
.iter()
.filter(|e| {
e.kind
.unsubscribe_sent_instance_id()
.is_some_and(|id| *id == instance_id)
})
.count();
unsubscribe_received_count = events
.iter()
.filter(|e| {
e.kind
.unsubscribe_received_instance_id()
.is_some_and(|id| *id == instance_id)
})
.count();
tracing::info!(
"Unsubscribe events for {}: sent={}, received={}",
instance_id,
unsubscribe_sent_count,
unsubscribe_received_count
);
if unsubscribe_received_count > 0 {
break;
}
}
assert!(
unsubscribe_received_count > 0,
"Upstream node should have received the Unsubscribe message after client disconnect \
(sent={unsubscribe_sent_count}, received={unsubscribe_received_count})"
);
client_a
.send(ClientRequest::Disconnect { cause: None })
.await?;
tokio::time::sleep(Duration::from_millis(200)).await;
Ok(())
}
#[freenet_test(
health_check_readiness = true,
nodes = ["gateway", "peer-a"],
timeout_secs = 300,
startup_wait_secs = 30,
tokio_flavor = "multi_thread",
tokio_worker_threads = 4
)]
async fn test_repeated_get_with_unchanged_state_succeeds(ctx: &mut TestContext) -> TestResult {
const TEST_CONTRACT: &str = "test-contract-update-nochange";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
#[derive(Serialize, Deserialize)]
struct SimpleState {
value: String,
counter: u64,
}
let initial_state = SimpleState {
value: "hello".to_string(),
counter: 1,
};
let initial_state_bytes = serde_json::to_vec(&initial_state)?;
let wrapped_state = WrappedState::from(initial_state_bytes);
let gateway = ctx.node("gateway")?;
let peer_a = ctx.node("peer-a")?;
let (stream_gw, _) = connect_async(&gateway.ws_url()).await?;
let mut client_gw = WebApi::start(stream_gw);
let (stream_a, _) = connect_async(&peer_a.ws_url()).await?;
let mut client_a = WebApi::start(stream_a);
tracing::info!("Step 1: PUT contract on gateway");
send_put_with_retry(
&mut client_gw,
wrapped_state.clone(),
contract.clone(),
"PUT on gateway",
Some(contract_key),
)
.await?;
tracing::info!("Step 2: First GET from peer-a");
let (_contract_returned, first_get_state) =
get_contract(&mut client_a, contract_key, &peer_a.temp_dir_path).await?;
let first_state: SimpleState = serde_json::from_slice(first_get_state.as_ref())?;
assert_eq!(first_state.value, "hello");
assert_eq!(first_state.counter, 1);
tracing::info!("Step 3: Second GET from peer-a (already cached)");
make_get(&mut client_a, contract_key, false, false).await?;
let resp = tokio::time::timeout(Duration::from_secs(60), client_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
state,
..
}))) => {
assert_eq!(key, contract_key);
let second_state: SimpleState = serde_json::from_slice(state.as_ref())?;
assert_eq!(second_state.value, "hello");
assert_eq!(second_state.counter, 1);
}
Ok(Ok(other)) => {
bail!("Unexpected response on second GET: {:?}", other);
}
Ok(Err(e)) => {
bail!("Error on second GET: {}", e);
}
Err(_) => {
bail!("Timeout on second GET — caching path likely rejected the redundant state");
}
}
Ok(())
}