#![cfg(feature = "simulation_tests")]
use freenet::config::{GlobalRng, GlobalSimulationTime};
use freenet::dev_tool::{
MockStateStorage, NodeLabel, ScheduledOperation, SimNetwork, SimOperation,
};
use freenet_stdlib::prelude::*;
use std::time::Duration;
/// Seeds the global RNG and the global simulation clock from `seed`, then
/// builds a [`SimNetwork`] named `name` with the requested number of gateways
/// and nodes and applies `streaming_threshold` to it.
///
/// The clock is set to a fixed base timestamp plus `seed` reduced modulo a
/// five-year span (both in milliseconds), so a given seed always yields the
/// same starting time.
async fn setup_streaming_network(
    name: &str,
    gateways: usize,
    nodes: usize,
    seed: u64,
    streaming_threshold: usize,
) -> SimNetwork {
    const MS_PER_YEAR: u64 = 365 * 24 * 60 * 60 * 1000;
    const EPOCH_BASE_MS: u64 = 1_577_836_800_000;
    const EPOCH_SPAN_MS: u64 = 5 * MS_PER_YEAR;

    GlobalRng::set_seed(seed);
    let start_ms = EPOCH_BASE_MS + seed % EPOCH_SPAN_MS;
    GlobalSimulationTime::set_time_ms(start_ms);

    // NOTE(review): `7, 3, 10, 2` are positional arguments whose meaning is not
    // visible from this file — confirm against `SimNetwork::new`.
    let mut network = SimNetwork::new(name, gateways, nodes, 7, 3, 10, 2, seed).await;
    network.with_streaming_threshold(streaming_threshold);
    network
}
/// Returns the stored state for `contract_key` from the first storage in
/// `node_storages` whose label reports `is_node()` and that holds the
/// contract, or `None` when no such storage has it.
///
/// Iteration follows the map's own order, so when several nodes hold the
/// contract, which node's copy is returned is unspecified.
fn find_contract_in_non_gateway_storages(
    node_storages: &std::collections::HashMap<NodeLabel, MockStateStorage>,
    contract_key: &ContractKey,
) -> Option<WrappedState> {
    node_storages
        .iter()
        .filter(|(label, _)| label.is_node())
        .find_map(|(_, storage)| storage.get_stored_state(contract_key))
}
/// PUTs a `100 * 1024`-sized state from gateway 0 of a 1-gateway / 2-node
/// simulated network whose streaming threshold is 1024 (presumably bytes —
/// confirm), then checks that the simulation finished and that a non-gateway
/// node stores exactly the bytes that were PUT.
#[test]
fn test_streaming_put_large_state() {
    const SEED: u64 = 0x5720_0001_DEAD_BEEF;
    const NETWORK_NAME: &str = "streaming-put-large";
    const THRESHOLD: usize = 1024;
    const LARGE_STATE_SIZE: usize = 100 * 1024;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let network = runtime.block_on(setup_streaming_network(NETWORK_NAME, 1, 2, SEED, THRESHOLD));

    let contract = SimOperation::create_test_contract(42);
    let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 42);
    let contract_key = contract.key();

    let put_from_gateway = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: large_state.clone(),
            subscribe: false,
        },
    );

    let outcome = network.run_controlled_simulation(
        SEED,
        vec![put_from_gateway],
        Duration::from_secs(120),
        Duration::from_secs(60),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "Streaming PUT simulation should complete: {:?}",
        outcome.turmoil_result.err()
    );

    // The state must have left the gateway and landed on a regular node.
    let replica = find_contract_in_non_gateway_storages(&outcome.node_storages, &contract_key)
        .expect("Expected 100KB contract to be stored in at least one non-gateway node");
    let replica_bytes: Vec<u8> = replica.as_ref().to_vec();
    assert_eq!(
        replica_bytes, large_state,
        "Stored state bytes should match the original 100KB state"
    );
}
/// PUTs a 512-sized state — smaller than the 1024 streaming threshold — from
/// gateway 0 of a 1-gateway / 2-node simulated network and checks that a
/// non-gateway node ends up with identical bytes.
///
/// NOTE(review): the test name says the inline path is used; the assertions
/// here only verify the stored result, not which transfer path was taken.
#[test]
fn test_streaming_put_below_threshold_uses_inline() {
    const SEED: u64 = 0x1011_0002_CAFE_BABE;
    const NETWORK_NAME: &str = "streaming-inline";
    const THRESHOLD: usize = 1024;
    const SMALL_STATE_SIZE: usize = 512;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let network = runtime.block_on(setup_streaming_network(NETWORK_NAME, 1, 2, SEED, THRESHOLD));

    let contract = SimOperation::create_test_contract(99);
    let small_state = SimOperation::create_large_state(SMALL_STATE_SIZE, 99);
    let contract_key = contract.key();

    let put_from_gateway = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: small_state.clone(),
            subscribe: false,
        },
    );

    let outcome = network.run_controlled_simulation(
        SEED,
        vec![put_from_gateway],
        Duration::from_secs(120),
        Duration::from_secs(45),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "Inline PUT simulation should complete: {:?}",
        outcome.turmoil_result.err()
    );

    let replica = find_contract_in_non_gateway_storages(&outcome.node_storages, &contract_key)
        .expect("Expected 512-byte contract to be stored in at least one non-gateway node");
    let replica_bytes: Vec<u8> = replica.as_ref().to_vec();
    assert_eq!(
        replica_bytes, small_state,
        "Stored state bytes should match the original 512-byte state"
    );
}
/// Schedules, in order: a subscribing PUT of a small test state from gateway
/// 0, SUBSCRIBE operations from nodes 1 and 2, and an UPDATE from gateway 0
/// carrying a `100 * 1024`-sized payload, on a 1-gateway / 3-node network.
///
/// Passes when the simulation completes and at least one non-gateway node
/// holds some state for the contract.
///
/// NOTE(review): the stored bytes are not compared against the update
/// payload — presumably because the resulting state depends on the test
/// contract's update semantics; confirm.
#[test]
fn test_streaming_update_broadcast() {
    const SEED: u64 = 0xBCA5_0003_1234_5678;
    const NETWORK_NAME: &str = "streaming-update";
    const THRESHOLD: usize = 1024;
    const LARGE_UPDATE_SIZE: usize = 100 * 1024;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let network = runtime.block_on(setup_streaming_network(NETWORK_NAME, 1, 3, SEED, THRESHOLD));

    let contract = SimOperation::create_test_contract(55);
    let contract_key = contract.key();
    let contract_id = *contract_key.id();
    let initial_state = SimOperation::create_test_state(55);
    let large_update = SimOperation::create_large_state(LARGE_UPDATE_SIZE, 77);

    let put_initial = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: initial_state,
            subscribe: true,
        },
    );
    let subscribe_node1 = ScheduledOperation::new(
        NodeLabel::node(NETWORK_NAME, 1),
        SimOperation::Subscribe { contract_id },
    );
    let subscribe_node2 = ScheduledOperation::new(
        NodeLabel::node(NETWORK_NAME, 2),
        SimOperation::Subscribe { contract_id },
    );
    let push_update = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Update {
            key: contract_key,
            data: large_update,
        },
    );

    let outcome = network.run_controlled_simulation(
        SEED,
        vec![put_initial, subscribe_node1, subscribe_node2, push_update],
        Duration::from_secs(120),
        Duration::from_secs(60),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "Streaming UPDATE broadcast simulation should complete: {:?}",
        outcome.turmoil_result.err()
    );

    let some_node_has_state =
        find_contract_in_non_gateway_storages(&outcome.node_storages, &contract_key).is_some();
    assert!(
        some_node_has_state,
        "Expected at least one subscribing non-gateway node to have state for the updated contract"
    );
}
/// Schedules two PUTs from gateway 0 in the same simulation run — contract A
/// with a `50 * 1024`-sized state and contract B with an `80 * 1024`-sized
/// state — on a 1-gateway / 2-node network, then checks that each state is
/// found byte-for-byte on a non-gateway node.
#[test]
fn test_streaming_multiple_concurrent_puts() {
    const SEED: u64 = 0xC00C_0004_ABCD_EF01;
    const NETWORK_NAME: &str = "streaming-concurrent";
    const THRESHOLD: usize = 1024;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let network = runtime.block_on(setup_streaming_network(NETWORK_NAME, 1, 2, SEED, THRESHOLD));

    let contract_a = SimOperation::create_test_contract(10);
    let state_a = SimOperation::create_large_state(50 * 1024, 10);
    let key_a = contract_a.key();

    let contract_b = SimOperation::create_test_contract(20);
    let state_b = SimOperation::create_large_state(80 * 1024, 20);
    let key_b = contract_b.key();

    let put_a = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract_a.clone(),
            state: state_a.clone(),
            subscribe: false,
        },
    );
    let put_b = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract_b.clone(),
            state: state_b.clone(),
            subscribe: false,
        },
    );

    let outcome = network.run_controlled_simulation(
        SEED,
        vec![put_a, put_b],
        Duration::from_secs(120),
        Duration::from_secs(60),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "Multiple concurrent streaming PUTs should complete: {:?}",
        outcome.turmoil_result.err()
    );

    let replica_a = find_contract_in_non_gateway_storages(&outcome.node_storages, &key_a)
        .expect("Expected 50KB contract A to be stored in at least one non-gateway node");
    let replica_a_bytes: Vec<u8> = replica_a.as_ref().to_vec();
    assert_eq!(
        replica_a_bytes, state_a,
        "Stored state bytes for contract A should match the original 50KB state"
    );

    let replica_b = find_contract_in_non_gateway_storages(&outcome.node_storages, &key_b)
        .expect("Expected 80KB contract B to be stored in at least one non-gateway node");
    let replica_b_bytes: Vec<u8> = replica_b.as_ref().to_vec();
    assert_eq!(
        replica_b_bytes, state_b,
        "Stored state bytes for contract B should match the original 80KB state"
    );
}
/// Same scenario as a plain large PUT (gateway 0, 1-gateway / 2-node network,
/// `100 * 1024`-sized state) but with fault injection configured with a
/// message loss rate of `0.05`. The simulation must still complete and a
/// non-gateway node must hold the exact bytes that were PUT.
#[test]
fn test_streaming_with_packet_loss() {
    use freenet::simulation::FaultConfig;

    const SEED: u64 = 0xA055_0005_BEEF_CAFE;
    const NETWORK_NAME: &str = "streaming-lossy";
    const THRESHOLD: usize = 1024;
    const LARGE_STATE_SIZE: usize = 100 * 1024;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let mut network =
        runtime.block_on(setup_streaming_network(NETWORK_NAME, 1, 2, SEED, THRESHOLD));
    let lossy_links = FaultConfig::builder().message_loss_rate(0.05).build();
    network.with_fault_injection(lossy_links);

    let contract = SimOperation::create_test_contract(33);
    let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 33);
    let contract_key = contract.key();

    let put_from_gateway = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: large_state.clone(),
            subscribe: false,
        },
    );

    let outcome = network.run_controlled_simulation(
        SEED,
        vec![put_from_gateway],
        Duration::from_secs(180),
        Duration::from_secs(90),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "Streaming with 5% packet loss should still complete: {:?}",
        outcome.turmoil_result.err()
    );

    let replica = find_contract_in_non_gateway_storages(&outcome.node_storages, &contract_key)
        .expect(
            "Expected 100KB contract to be stored in at least one non-gateway node even with packet loss",
        );
    let replica_bytes: Vec<u8> = replica.as_ref().to_vec();
    assert_eq!(
        replica_bytes, large_state,
        "Stored state bytes should match the original 100KB state despite packet loss"
    );
}
/// PUTs a `100 * 1024`-sized state from gateway 0 of a 1-gateway / 2-node
/// network with fault injection configured with a latency range of
/// 10 ms..100 ms, and checks that a non-gateway node stores identical bytes.
///
/// NOTE(review): presumably the variable latency is what produces packet
/// reordering; that effect is not visible from this file — confirm against
/// `FaultConfig`.
#[test]
fn test_streaming_with_packet_reordering() {
    use freenet::simulation::FaultConfig;

    const SEED: u64 = 0xBE0F_0006_DEAD_1234;
    const NETWORK_NAME: &str = "streaming-reorder";
    const THRESHOLD: usize = 1024;
    const LARGE_STATE_SIZE: usize = 100 * 1024;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let mut network =
        runtime.block_on(setup_streaming_network(NETWORK_NAME, 1, 2, SEED, THRESHOLD));
    let jittery_links = FaultConfig::builder()
        .latency_range(Duration::from_millis(10)..Duration::from_millis(100))
        .build();
    network.with_fault_injection(jittery_links);

    let contract = SimOperation::create_test_contract(66);
    let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 66);
    let contract_key = contract.key();

    let put_from_gateway = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: large_state.clone(),
            subscribe: false,
        },
    );

    let outcome = network.run_controlled_simulation(
        SEED,
        vec![put_from_gateway],
        Duration::from_secs(180),
        Duration::from_secs(90),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "Streaming with packet reordering should complete: {:?}",
        outcome.turmoil_result.err()
    );

    let replica = find_contract_in_non_gateway_storages(&outcome.node_storages, &contract_key)
        .expect("Expected 100KB contract to be stored despite packet reordering");
    let replica_bytes: Vec<u8> = replica.as_ref().to_vec();
    assert_eq!(
        replica_bytes, large_state,
        "Stored state bytes should match the original 100KB state despite reordering"
    );
}
/// PUTs a `200 * 1024`-sized state from gateway 0 of a larger network
/// (1 gateway, 4 nodes) and checks that a non-gateway node stores identical
/// bytes.
///
/// NOTE(review): the test name refers to multi-hop forwarding; the assertions
/// do not inspect the route taken, only the stored result.
#[test]
fn test_streaming_multi_hop_forwarding() {
    const SEED: u64 = 0xF1A7_0007_CAFE_9876;
    const NETWORK_NAME: &str = "streaming-multihop";
    const THRESHOLD: usize = 1024;
    const LARGE_STATE_SIZE: usize = 200 * 1024;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let network = runtime.block_on(setup_streaming_network(NETWORK_NAME, 1, 4, SEED, THRESHOLD));

    let contract = SimOperation::create_test_contract(88);
    let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 88);
    let contract_key = contract.key();

    let put_from_gateway = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: large_state.clone(),
            subscribe: false,
        },
    );

    let outcome = network.run_controlled_simulation(
        SEED,
        vec![put_from_gateway],
        Duration::from_secs(180),
        Duration::from_secs(90),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "Multi-hop streaming PUT should complete: {:?}",
        outcome.turmoil_result.err()
    );

    let replica = find_contract_in_non_gateway_storages(&outcome.node_storages, &contract_key)
        .expect("Expected 200KB contract to be stored in at least one non-gateway node via multi-hop");
    let replica_bytes: Vec<u8> = replica.as_ref().to_vec();
    assert_eq!(
        replica_bytes, large_state,
        "Stored state bytes should match the original 200KB state"
    );
}
/// Schedules a PUT of a `1024 * 1024`-sized state from gateway 0 followed by a
/// GET of the same contract from node 3, on a 1-gateway / 4-node network.
///
/// Passes when the simulation completes and node 3's own storage holds the
/// exact bytes that were PUT.
#[test]
fn test_streaming_get_through_relay() {
    const SEED: u64 = 0xDE1A_0008_FACE_B00C;
    const NETWORK_NAME: &str = "streaming-get-relay";
    const THRESHOLD: usize = 1024;
    const LARGE_STATE_SIZE: usize = 1024 * 1024;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let network = runtime.block_on(setup_streaming_network(NETWORK_NAME, 1, 4, SEED, THRESHOLD));

    let contract = SimOperation::create_test_contract(0xAB);
    let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 0xAB);
    let contract_key = contract.key();
    let contract_id = *contract_key.id();
    let requester = NodeLabel::node(NETWORK_NAME, 3);

    let put_from_gateway = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: large_state.clone(),
            subscribe: false,
        },
    );
    let get_from_node3 = ScheduledOperation::new(
        requester.clone(),
        SimOperation::Get {
            contract_id,
            return_contract_code: false,
            subscribe: false,
        },
    );

    let outcome = network.run_controlled_simulation(
        SEED,
        vec![put_from_gateway, get_from_node3],
        Duration::from_secs(300),
        Duration::from_secs(120),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "Streaming GET through relay should complete: {:?}",
        outcome.turmoil_result.err()
    );

    // Inspect the requesting node specifically, not just any node.
    let fetched = outcome
        .node_storages
        .get(&requester)
        .expect("node 3 should have a storage handle")
        .get_stored_state(&contract_key)
        .expect("Node 3 should have 1MB contract state after streaming GET through relay");
    let fetched_bytes: Vec<u8> = fetched.as_ref().to_vec();
    assert_eq!(
        fetched_bytes, large_state,
        "Stored state bytes should match the original 1MB state after relay GET"
    );
}
/// PUT from gateway 0 followed by a GET from node 3 of a `1024 * 1024`-sized
/// state on a 1-gateway / 4-node network, with fault injection configured
/// with a message loss rate of `0.05`.
///
/// Passes when the simulation completes and node 3's own storage holds the
/// exact bytes that were PUT.
#[test]
fn test_streaming_get_1mb_with_packet_loss() {
    use freenet::simulation::FaultConfig;

    const SEED: u64 = 0xDE1A_000A_DEAD_BEEF;
    const NETWORK_NAME: &str = "streaming-get-lossy-1mb";
    const THRESHOLD: usize = 1024;
    const LARGE_STATE_SIZE: usize = 1024 * 1024;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let mut network =
        runtime.block_on(setup_streaming_network(NETWORK_NAME, 1, 4, SEED, THRESHOLD));
    let lossy_links = FaultConfig::builder().message_loss_rate(0.05).build();
    network.with_fault_injection(lossy_links);

    let contract = SimOperation::create_test_contract(0xDF);
    let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 0xDF);
    let contract_key = contract.key();
    let contract_id = *contract_key.id();
    let requester = NodeLabel::node(NETWORK_NAME, 3);

    let put_from_gateway = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: large_state.clone(),
            subscribe: false,
        },
    );
    let get_from_node3 = ScheduledOperation::new(
        requester.clone(),
        SimOperation::Get {
            contract_id,
            return_contract_code: false,
            subscribe: false,
        },
    );

    let outcome = network.run_controlled_simulation(
        SEED,
        vec![put_from_gateway, get_from_node3],
        Duration::from_secs(300),
        Duration::from_secs(120),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "1MB streaming GET with 5% packet loss should complete without stalling: {:?}",
        outcome.turmoil_result.err()
    );

    let fetched = outcome
        .node_storages
        .get(&requester)
        .expect("node 3 should have a storage handle")
        .get_stored_state(&contract_key)
        .expect("Node 3 should have 1MB contract state after streaming GET with packet loss");
    let fetched_bytes: Vec<u8> = fetched.as_ref().to_vec();
    assert_eq!(
        fetched_bytes, large_state,
        "Stored state bytes should match the original 1MB state despite packet loss"
    );
}
/// Schedules a subscribing PUT of a `100 * 1024`-sized state from gateway 0
/// and then a non-subscribing GET (with contract code) from node 2, on a
/// 1-gateway / 3-node network.
///
/// Passes when the simulation completes, node 2's storage holds state for
/// the contract, and some topology snapshot from a peer other than
/// `1.0.0.1` reports the contract as hosted.
///
/// NOTE(review): the hosting check accepts any peer whose address is not
/// `1.0.0.1` (presumably the gateway's address — confirm); it does not pin
/// the match to node 2.
#[test]
fn test_streaming_get_triggers_auto_subscribe() {
    const SEED: u64 = 0xDE1A_0009_CAFE_F00D;
    const NETWORK_NAME: &str = "streaming-get-auto-subscribe";
    const THRESHOLD: usize = 1024;
    const LARGE_STATE_SIZE: usize = 100 * 1024;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let network = runtime.block_on(setup_streaming_network(NETWORK_NAME, 1, 3, SEED, THRESHOLD));

    let contract = SimOperation::create_test_contract(0xCC);
    let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 0xCC);
    let contract_key = contract.key();
    let contract_id = *contract_key.id();
    let requester = NodeLabel::node(NETWORK_NAME, 2);

    let put_from_gateway = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: large_state.clone(),
            subscribe: true,
        },
    );
    let get_from_node2 = ScheduledOperation::new(
        requester.clone(),
        SimOperation::Get {
            contract_id,
            return_contract_code: true,
            subscribe: false,
        },
    );

    let outcome = network.run_controlled_simulation(
        SEED,
        vec![put_from_gateway, get_from_node2],
        Duration::from_secs(300),
        Duration::from_secs(120),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "Streaming GET should complete: {:?}",
        outcome.turmoil_result.err()
    );

    let requester_storage = outcome
        .node_storages
        .get(&requester)
        .expect("node 2 should have a storage handle");
    assert!(
        requester_storage.get_stored_state(&contract_key).is_some(),
        "Node 2 should have contract state after streaming GET"
    );

    // Peers at this address are left out of the hosting check.
    let excluded_ip = std::net::IpAddr::V4(std::net::Ipv4Addr::new(1, 0, 0, 1));
    let hosted_off_gateway = outcome.topology_snapshots.iter().any(|snapshot| {
        snapshot
            .contracts
            .values()
            .any(|info| info.contract_key == contract_key && info.is_hosting)
            && snapshot.peer_addr.ip() != excluded_ip
    });
    assert!(
        hosted_off_gateway,
        "After streaming GET, the requesting node should be hosting the contract. \
         Topology snapshots: {:#?}",
        outcome
            .topology_snapshots
            .iter()
            .map(|snapshot| (
                snapshot.peer_addr,
                snapshot
                    .contracts
                    .values()
                    .map(|info| (&info.contract_key, info.is_hosting, info.upstream.is_some()))
                    .collect::<Vec<_>>()
            ))
            .collect::<Vec<_>>()
    );
}
/// Seeds the global RNG and the global simulation clock from `seed`, then
/// builds a single-gateway [`SimNetwork`] named `name` with `nodes` nodes and
/// applies `threshold` as its streaming threshold.
///
/// The clock is set to a fixed base timestamp plus `seed` reduced modulo a
/// five-year span (both in milliseconds).
async fn setup_htl1_network(name: &str, nodes: usize, seed: u64, threshold: usize) -> SimNetwork {
    const MS_PER_YEAR: u64 = 365 * 24 * 60 * 60 * 1000;
    const EPOCH_BASE_MS: u64 = 1_577_836_800_000;
    const EPOCH_SPAN_MS: u64 = 5 * MS_PER_YEAR;

    GlobalRng::set_seed(seed);
    let start_ms = EPOCH_BASE_MS + seed % EPOCH_SPAN_MS;
    GlobalSimulationTime::set_time_ms(start_ms);

    // NOTE(review): presumably the two `1`s after `nodes` are what pin the
    // hops-to-live to 1 (per the function name); the meaning of
    // `1, 1, 10, 2` is not visible here — confirm against `SimNetwork::new`.
    let mut network = SimNetwork::new(name, 1, nodes, 1, 1, 10, 2, seed).await;
    network.with_streaming_threshold(threshold);
    network
}
/// Two-phase check that a GET issued from a node without a local copy writes
/// the fetched state into that node's storage.
///
/// * Phase 1 builds a network with `setup_htl1_network`, runs only a PUT of a
///   `100 * 1024`-sized state from gateway 0, and picks the first node index
///   in `1..=NODES` whose storage does not hold the contract.
/// * Phase 2 builds a second network with the same seed (different name),
///   schedules the same PUT plus a GET from the node index found in phase 1,
///   and asserts that `GET_DRIVER_CALL_COUNT` advanced and that the node
///   stores exactly the PUT bytes.
///
/// The assertion messages describe this as a regression check for "bug #1"
/// from PR #3884. The test is currently ignored; the ignore reason reports
/// that the HTL=1 topology is not routable (tracked as #3883).
///
/// NOTE(review): `GET_DRIVER_CALL_COUNT` is a shared counter, so presumably
/// other tests running in the same process could also advance it — confirm.
#[ignore = "Infrastructure gap — HTL=1 topology isn't routable; see docstring and #3883"]
#[test]
fn test_driver_streaming_get_cold_cache() {
    use freenet::dev_tool::GET_DRIVER_CALL_COUNT;
    use std::sync::atomic::Ordering;

    const SEED: u64 = 0xDE1A_0B0B_C01D_CA7E;
    const NETWORK_NAME_PHASE1: &str = "driver-streaming-cold-cache-p1";
    const NETWORK_NAME_PHASE2: &str = "driver-streaming-cold-cache-p2";
    const THRESHOLD: usize = 1024;
    const LARGE_STATE_SIZE: usize = 100 * 1024;
    const NODES: usize = 10;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    let contract = SimOperation::create_test_contract(0xBC);
    let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 0xBC);
    let contract_key = contract.key();
    let contract_id = *contract_key.id();

    // ---- Phase 1: PUT only, to discover a node that stays cold. ----
    let probe_network = runtime.block_on(setup_htl1_network(
        NETWORK_NAME_PHASE1,
        NODES,
        SEED,
        THRESHOLD,
    ));
    let probe_put = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME_PHASE1, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: large_state.clone(),
            subscribe: false,
        },
    );
    let phase1 = probe_network.run_controlled_simulation(
        SEED,
        vec![probe_put],
        Duration::from_secs(120),
        Duration::from_secs(30),
    );
    assert!(
        phase1.turmoil_result.is_ok(),
        "Phase 1 (PUT only) should complete: {:?}",
        phase1.turmoil_result.err()
    );

    let cold_idx = (1..=NODES)
        .find(|&idx| {
            phase1
                .node_storages
                .get(&NodeLabel::node(NETWORK_NAME_PHASE1, idx))
                .is_some_and(|storage| storage.get_stored_state(&contract_key).is_none())
        })
        .unwrap_or_else(|| {
            panic!(
                "Test precondition: no cold-cache node exists after HTL=1 PUT \
                 in {NODES}-node topology. All nodes received the state during \
                 fan-out, which contradicts HTL=1. Try a larger topology."
            )
        });

    // ---- Phase 2: same seed, PUT followed by a GET from the cold node. ----
    let baseline_calls = GET_DRIVER_CALL_COUNT.load(Ordering::SeqCst);
    let main_network = runtime.block_on(setup_htl1_network(
        NETWORK_NAME_PHASE2,
        NODES,
        SEED,
        THRESHOLD,
    ));
    let cold_node_label = NodeLabel::node(NETWORK_NAME_PHASE2, cold_idx);
    let gateway_put = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME_PHASE2, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: large_state.clone(),
            subscribe: false,
        },
    );
    let cold_get = ScheduledOperation::new(
        cold_node_label.clone(),
        SimOperation::Get {
            contract_id,
            return_contract_code: true,
            subscribe: false,
        },
    );
    let phase2 = main_network.run_controlled_simulation(
        SEED,
        vec![gateway_put, cold_get],
        Duration::from_secs(300),
        Duration::from_secs(120),
    );
    assert!(
        phase2.turmoil_result.is_ok(),
        "Phase 2 (PUT + cold GET) should complete: {:?}",
        phase2.turmoil_result.err()
    );

    let driver_calls = GET_DRIVER_CALL_COUNT.load(Ordering::SeqCst);
    assert!(
        driver_calls > baseline_calls,
        "Test infrastructure failure: GET_DRIVER_CALL_COUNT did not advance during \
         phase 2. Either node {cold_idx} still had a relay-cached copy of the \
         contract (phase-1 topology diverged from phase-2 topology), or the \
         local-cache shortcut fired for some other reason. Without driver \
         invocation this test does not exercise bug #1 from PR #3884."
    );

    let cold_storage = phase2
        .node_storages
        .get(&cold_node_label)
        .expect("cold-cache node should have a storage handle");
    let fetched = cold_storage
        .get_stored_state(&contract_key)
        .unwrap_or_else(|| {
            panic!(
                "Bug #1 regression: cold-cache streaming GET through the driver did \
                 not write the contract state to local storage on {cold_node_label:?}. \
                 The driver's Terminal::Streaming path must ensure local caching."
            )
        });
    assert_eq!(
        fetched.as_ref().to_vec(),
        large_state,
        "Stored state on {cold_node_label:?} must match the PUT state"
    );
}
/// Two-phase check that a non-subscribing GET of a small (128-sized) state
/// from a node without a local copy leaves an active subscription behind.
///
/// * Phase 1 builds a network with `setup_htl1_network`, runs only a PUT from
///   gateway 0, and picks the first node index in `1..=NODES` whose storage
///   does not hold the contract.
/// * Phase 2 builds a second network with the same seed (different name),
///   schedules the same PUT plus a GET from that node index, and asserts
///   that `GET_DRIVER_CALL_COUNT` advanced, that the node stored the state,
///   and that a topology snapshot lists the contract id among its active
///   subscription keys.
///
/// The assertion messages describe this as a regression check for "bug #2".
/// The test is currently ignored; the ignore reason reports that the HTL=1
/// topology is not routable (tracked as #3883).
///
/// NOTE(review): the subscription check passes if *any* snapshot lists the
/// contract id; it is not restricted to the requesting node — confirm that
/// this is intended.
#[ignore = "Infrastructure gap — HTL=1 topology isn't routable; see docstring and #3883"]
#[test]
fn test_driver_inline_get_triggers_auto_subscribe() {
    use freenet::dev_tool::GET_DRIVER_CALL_COUNT;
    use std::sync::atomic::Ordering;

    const SEED: u64 = 0xDE1A_0B0C_A570_5C2B;
    const NETWORK_NAME_PHASE1: &str = "driver-inline-auto-sub-p1";
    const NETWORK_NAME_PHASE2: &str = "driver-inline-auto-sub-p2";
    const THRESHOLD: usize = 1024;
    const SMALL_STATE_SIZE: usize = 128;
    const NODES: usize = 10;

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    let contract = SimOperation::create_test_contract(0xBD);
    let small_state = SimOperation::create_large_state(SMALL_STATE_SIZE, 0xBD);
    let contract_key = contract.key();
    let contract_id = *contract_key.id();

    // ---- Phase 1: PUT only, to discover a node that stays cold. ----
    let probe_network = runtime.block_on(setup_htl1_network(
        NETWORK_NAME_PHASE1,
        NODES,
        SEED,
        THRESHOLD,
    ));
    let probe_put = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME_PHASE1, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: small_state.clone(),
            subscribe: false,
        },
    );
    let phase1 = probe_network.run_controlled_simulation(
        SEED,
        vec![probe_put],
        Duration::from_secs(120),
        Duration::from_secs(30),
    );
    assert!(
        phase1.turmoil_result.is_ok(),
        "Phase 1 (PUT only) should complete: {:?}",
        phase1.turmoil_result.err()
    );

    let cold_idx = (1..=NODES)
        .find(|&idx| {
            phase1
                .node_storages
                .get(&NodeLabel::node(NETWORK_NAME_PHASE1, idx))
                .is_some_and(|storage| storage.get_stored_state(&contract_key).is_none())
        })
        .unwrap_or_else(|| {
            panic!(
                "Test precondition: no cold-cache node exists after HTL=1 PUT \
                 in {NODES}-node topology."
            )
        });

    // ---- Phase 2: same seed, PUT followed by a GET from the cold node. ----
    let baseline_calls = GET_DRIVER_CALL_COUNT.load(Ordering::SeqCst);
    let main_network = runtime.block_on(setup_htl1_network(
        NETWORK_NAME_PHASE2,
        NODES,
        SEED,
        THRESHOLD,
    ));
    let cold_node_label = NodeLabel::node(NETWORK_NAME_PHASE2, cold_idx);
    let gateway_put = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME_PHASE2, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: small_state.clone(),
            subscribe: false,
        },
    );
    let cold_get = ScheduledOperation::new(
        cold_node_label.clone(),
        SimOperation::Get {
            contract_id,
            return_contract_code: true,
            subscribe: false,
        },
    );
    let phase2 = main_network.run_controlled_simulation(
        SEED,
        vec![gateway_put, cold_get],
        Duration::from_secs(180),
        Duration::from_secs(90),
    );
    assert!(
        phase2.turmoil_result.is_ok(),
        "Phase 2 (PUT + cold GET) should complete: {:?}",
        phase2.turmoil_result.err()
    );

    let driver_calls = GET_DRIVER_CALL_COUNT.load(Ordering::SeqCst);
    assert!(
        driver_calls > baseline_calls,
        "Test infrastructure failure: GET_DRIVER_CALL_COUNT did not advance \
         during phase 2. The cold node {cold_idx} must have routed through \
         the task-per-tx driver."
    );

    let cold_storage = phase2
        .node_storages
        .get(&cold_node_label)
        .expect("cold-cache node should have a storage handle");
    let state_cached = cold_storage.get_stored_state(&contract_key).is_some();
    assert!(
        state_cached,
        "Cold-cache node should have stored the contract state after the GET"
    );

    let auto_subscribed = phase2
        .topology_snapshots
        .iter()
        .any(|snapshot| snapshot.active_subscription_keys.contains(&contract_id));
    assert!(
        auto_subscribed,
        "Bug #2 regression: cold-cache GET through the driver did not \
         auto-subscribe the requesting node {cold_node_label:?}. The \
         driver's Done arm must invoke `auto_subscribe_on_get_response` \
         for successful GETs when the client did not explicitly set \
         `subscribe=true`. Active subscriptions: {:#?}",
        phase2
            .topology_snapshots
            .iter()
            .map(|snapshot| (snapshot.peer_addr, snapshot.active_subscription_keys.clone()))
            .collect::<Vec<_>>()
    );
}