#![cfg(feature = "simulation_tests")]
use freenet::config::{GlobalRng, GlobalSimulationTime};
use freenet::dev_tool::{
MockStateStorage, NodeLabel, ScheduledOperation, SimNetwork, SimOperation,
};
use freenet_stdlib::prelude::*;
use std::time::Duration;
async fn setup_streaming_network(
name: &str,
gateways: usize,
nodes: usize,
seed: u64,
streaming_threshold: usize,
) -> SimNetwork {
GlobalRng::set_seed(seed);
const BASE_EPOCH_MS: u64 = 1577836800000;
const RANGE_MS: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
GlobalSimulationTime::set_time_ms(BASE_EPOCH_MS + (seed % RANGE_MS));
let mut sim = SimNetwork::new(
name, gateways, nodes, 7, 3, 10, 2, seed,
)
.await;
sim.with_streaming_threshold(streaming_threshold);
sim
}
fn find_contract_in_non_gateway_storages(
node_storages: &std::collections::HashMap<NodeLabel, MockStateStorage>,
contract_key: &ContractKey,
) -> Option<WrappedState> {
for (label, storage) in node_storages {
if label.is_node() {
if let Some(state) = storage.get_stored_state(contract_key) {
return Some(state);
}
}
}
None
}
#[test]
fn test_streaming_put_large_state() {
const SEED: u64 = 0x5720_0001_DEAD_BEEF;
const NETWORK_NAME: &str = "streaming-put-large";
const THRESHOLD: usize = 1024;
const LARGE_STATE_SIZE: usize = 100 * 1024;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let sim = rt.block_on(setup_streaming_network(NETWORK_NAME, 1, 2, SEED, THRESHOLD));
let contract = SimOperation::create_test_contract(42);
let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 42);
let contract_key = contract.key();
let operations = vec![
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: large_state.clone(),
subscribe: false,
},
),
];
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(120),
Duration::from_secs(60),
);
assert!(
result.turmoil_result.is_ok(),
"Streaming PUT simulation should complete: {:?}",
result.turmoil_result.err()
);
let stored = find_contract_in_non_gateway_storages(&result.node_storages, &contract_key);
assert!(
stored.is_some(),
"Expected 100KB contract to be stored in at least one non-gateway node"
);
let stored_bytes: Vec<u8> = stored.unwrap().as_ref().to_vec();
assert_eq!(
stored_bytes, large_state,
"Stored state bytes should match the original 100KB state"
);
}
#[test]
fn test_streaming_put_below_threshold_uses_inline() {
const SEED: u64 = 0x1011_0002_CAFE_BABE;
const NETWORK_NAME: &str = "streaming-inline";
const THRESHOLD: usize = 1024;
const SMALL_STATE_SIZE: usize = 512;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let sim = rt.block_on(setup_streaming_network(NETWORK_NAME, 1, 2, SEED, THRESHOLD));
let contract = SimOperation::create_test_contract(99);
let small_state = SimOperation::create_large_state(SMALL_STATE_SIZE, 99);
let contract_key = contract.key();
let operations = vec![ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: small_state.clone(),
subscribe: false,
},
)];
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(120),
Duration::from_secs(45),
);
assert!(
result.turmoil_result.is_ok(),
"Inline PUT simulation should complete: {:?}",
result.turmoil_result.err()
);
let stored = find_contract_in_non_gateway_storages(&result.node_storages, &contract_key);
assert!(
stored.is_some(),
"Expected 512-byte contract to be stored in at least one non-gateway node"
);
let stored_bytes: Vec<u8> = stored.unwrap().as_ref().to_vec();
assert_eq!(
stored_bytes, small_state,
"Stored state bytes should match the original 512-byte state"
);
}
#[test]
fn test_streaming_update_broadcast() {
const SEED: u64 = 0xBCA5_0003_1234_5678;
const NETWORK_NAME: &str = "streaming-update";
const THRESHOLD: usize = 1024;
const LARGE_UPDATE_SIZE: usize = 100 * 1024;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let sim = rt.block_on(setup_streaming_network(NETWORK_NAME, 1, 3, SEED, THRESHOLD));
let contract = SimOperation::create_test_contract(55);
let contract_key = contract.key();
let initial_state = SimOperation::create_test_state(55);
let large_update = SimOperation::create_large_state(LARGE_UPDATE_SIZE, 77);
let operations = vec![
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: initial_state,
subscribe: true,
},
),
ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, 1),
SimOperation::Subscribe {
contract_id: *contract_key.id(),
},
),
ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, 2),
SimOperation::Subscribe {
contract_id: *contract_key.id(),
},
),
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Update {
key: contract_key,
data: large_update,
},
),
];
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(120),
Duration::from_secs(60),
);
assert!(
result.turmoil_result.is_ok(),
"Streaming UPDATE broadcast simulation should complete: {:?}",
result.turmoil_result.err()
);
let stored = find_contract_in_non_gateway_storages(&result.node_storages, &contract_key);
assert!(
stored.is_some(),
"Expected at least one subscribing non-gateway node to have state for the updated contract"
);
}
#[test]
fn test_streaming_multiple_concurrent_puts() {
const SEED: u64 = 0xC00C_0004_ABCD_EF01;
const NETWORK_NAME: &str = "streaming-concurrent";
const THRESHOLD: usize = 1024;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let sim = rt.block_on(setup_streaming_network(NETWORK_NAME, 1, 2, SEED, THRESHOLD));
let contract_a = SimOperation::create_test_contract(10);
let state_a = SimOperation::create_large_state(50 * 1024, 10); let key_a = contract_a.key();
let contract_b = SimOperation::create_test_contract(20);
let state_b = SimOperation::create_large_state(80 * 1024, 20); let key_b = contract_b.key();
let operations = vec![
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract_a.clone(),
state: state_a.clone(),
subscribe: false,
},
),
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract_b.clone(),
state: state_b.clone(),
subscribe: false,
},
),
];
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(120),
Duration::from_secs(60),
);
assert!(
result.turmoil_result.is_ok(),
"Multiple concurrent streaming PUTs should complete: {:?}",
result.turmoil_result.err()
);
let stored_a = find_contract_in_non_gateway_storages(&result.node_storages, &key_a);
assert!(
stored_a.is_some(),
"Expected 50KB contract A to be stored in at least one non-gateway node"
);
let stored_a_bytes: Vec<u8> = stored_a.unwrap().as_ref().to_vec();
assert_eq!(
stored_a_bytes, state_a,
"Stored state bytes for contract A should match the original 50KB state"
);
let stored_b = find_contract_in_non_gateway_storages(&result.node_storages, &key_b);
assert!(
stored_b.is_some(),
"Expected 80KB contract B to be stored in at least one non-gateway node"
);
let stored_b_bytes: Vec<u8> = stored_b.unwrap().as_ref().to_vec();
assert_eq!(
stored_b_bytes, state_b,
"Stored state bytes for contract B should match the original 80KB state"
);
}
#[test]
fn test_streaming_with_packet_loss() {
use freenet::simulation::FaultConfig;
const SEED: u64 = 0xA055_0005_BEEF_CAFE;
const NETWORK_NAME: &str = "streaming-lossy";
const THRESHOLD: usize = 1024;
const LARGE_STATE_SIZE: usize = 100 * 1024;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let mut sim = rt.block_on(setup_streaming_network(NETWORK_NAME, 1, 2, SEED, THRESHOLD));
let fault_config = FaultConfig::builder().message_loss_rate(0.05).build();
sim.with_fault_injection(fault_config);
let contract = SimOperation::create_test_contract(33);
let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 33);
let contract_key = contract.key();
let operations = vec![
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: large_state.clone(),
subscribe: false,
},
),
];
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(180), Duration::from_secs(90),
);
assert!(
result.turmoil_result.is_ok(),
"Streaming with 5% packet loss should still complete: {:?}",
result.turmoil_result.err()
);
let stored = find_contract_in_non_gateway_storages(&result.node_storages, &contract_key);
assert!(
stored.is_some(),
"Expected 100KB contract to be stored in at least one non-gateway node even with packet loss"
);
let stored_bytes: Vec<u8> = stored.unwrap().as_ref().to_vec();
assert_eq!(
stored_bytes, large_state,
"Stored state bytes should match the original 100KB state despite packet loss"
);
}
#[test]
fn test_streaming_with_packet_reordering() {
use freenet::simulation::FaultConfig;
const SEED: u64 = 0xBE0F_0006_DEAD_1234;
const NETWORK_NAME: &str = "streaming-reorder";
const THRESHOLD: usize = 1024;
const LARGE_STATE_SIZE: usize = 100 * 1024;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let mut sim = rt.block_on(setup_streaming_network(NETWORK_NAME, 1, 2, SEED, THRESHOLD));
let fault_config = FaultConfig::builder()
.latency_range(Duration::from_millis(10)..Duration::from_millis(100))
.build();
sim.with_fault_injection(fault_config);
let contract = SimOperation::create_test_contract(66);
let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 66);
let contract_key = contract.key();
let operations = vec![ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: large_state.clone(),
subscribe: false,
},
)];
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(180),
Duration::from_secs(90),
);
assert!(
result.turmoil_result.is_ok(),
"Streaming with packet reordering should complete: {:?}",
result.turmoil_result.err()
);
let stored = find_contract_in_non_gateway_storages(&result.node_storages, &contract_key);
assert!(
stored.is_some(),
"Expected 100KB contract to be stored despite packet reordering"
);
let stored_bytes: Vec<u8> = stored.unwrap().as_ref().to_vec();
assert_eq!(
stored_bytes, large_state,
"Stored state bytes should match the original 100KB state despite reordering"
);
}
#[test]
fn test_streaming_multi_hop_forwarding() {
const SEED: u64 = 0xF1A7_0007_CAFE_9876;
const NETWORK_NAME: &str = "streaming-multihop";
const THRESHOLD: usize = 1024;
const LARGE_STATE_SIZE: usize = 200 * 1024;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let sim = rt.block_on(setup_streaming_network(NETWORK_NAME, 1, 4, SEED, THRESHOLD));
let contract = SimOperation::create_test_contract(88);
let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 88);
let contract_key = contract.key();
let operations = vec![ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: large_state.clone(),
subscribe: false,
},
)];
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(180),
Duration::from_secs(90),
);
assert!(
result.turmoil_result.is_ok(),
"Multi-hop streaming PUT should complete: {:?}",
result.turmoil_result.err()
);
let stored = find_contract_in_non_gateway_storages(&result.node_storages, &contract_key);
assert!(
stored.is_some(),
"Expected 200KB contract to be stored in at least one non-gateway node via multi-hop"
);
let stored_bytes: Vec<u8> = stored.unwrap().as_ref().to_vec();
assert_eq!(
stored_bytes, large_state,
"Stored state bytes should match the original 200KB state"
);
}
#[test]
fn test_streaming_get_through_relay() {
const SEED: u64 = 0xDE1A_0008_FACE_B00C;
const NETWORK_NAME: &str = "streaming-get-relay";
const THRESHOLD: usize = 1024;
const LARGE_STATE_SIZE: usize = 1024 * 1024;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let sim = rt.block_on(setup_streaming_network(NETWORK_NAME, 1, 4, SEED, THRESHOLD));
let contract = SimOperation::create_test_contract(0xAB);
let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 0xAB);
let contract_key = contract.key();
let contract_id = *contract_key.id();
let operations = vec![
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: large_state.clone(),
subscribe: false,
},
),
ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, 3),
SimOperation::Get {
contract_id,
return_contract_code: false,
subscribe: false,
},
),
];
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(300), Duration::from_secs(120),
);
assert!(
result.turmoil_result.is_ok(),
"Streaming GET through relay should complete: {:?}",
result.turmoil_result.err()
);
let node3_label = NodeLabel::node(NETWORK_NAME, 3);
let node3_storage = result
.node_storages
.get(&node3_label)
.expect("node 3 should have a storage handle");
let node3_state = node3_storage.get_stored_state(&contract_key);
assert!(
node3_state.is_some(),
"Node 3 should have 1MB contract state after streaming GET through relay"
);
let stored_bytes: Vec<u8> = node3_state.unwrap().as_ref().to_vec();
assert_eq!(
stored_bytes, large_state,
"Stored state bytes should match the original 1MB state after relay GET"
);
}
#[test]
fn test_streaming_get_triggers_auto_subscribe() {
const SEED: u64 = 0xDE1A_0009_CAFE_F00D;
const NETWORK_NAME: &str = "streaming-get-auto-subscribe";
const THRESHOLD: usize = 1024;
const LARGE_STATE_SIZE: usize = 100 * 1024;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let sim = rt.block_on(setup_streaming_network(NETWORK_NAME, 1, 3, SEED, THRESHOLD));
let contract = SimOperation::create_test_contract(0xCC);
let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 0xCC);
let contract_key = contract.key();
let contract_id = *contract_key.id();
let operations = vec![
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: large_state.clone(),
subscribe: true,
},
),
ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, 2),
SimOperation::Get {
contract_id,
return_contract_code: true,
subscribe: false,
},
),
];
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(300),
Duration::from_secs(120),
);
assert!(
result.turmoil_result.is_ok(),
"Streaming GET should complete: {:?}",
result.turmoil_result.err()
);
let node2_label = NodeLabel::node(NETWORK_NAME, 2);
let node2_storage = result
.node_storages
.get(&node2_label)
.expect("node 2 should have a storage handle");
let node2_state = node2_storage.get_stored_state(&contract_key);
assert!(
node2_state.is_some(),
"Node 2 should have contract state after streaming GET"
);
let node2_snapshot = result.topology_snapshots.iter().find(|s| {
s.contracts
.values()
.any(|c| c.contract_key == contract_key && c.is_hosting)
&& s.peer_addr.ip() != std::net::IpAddr::from([1u8, 0, 0, 1])
});
assert!(
node2_snapshot.is_some(),
"After streaming GET, the requesting node should be hosting the contract. \
Topology snapshots: {:#?}",
result
.topology_snapshots
.iter()
.map(|s| (
s.peer_addr,
s.contracts
.values()
.map(|c| (&c.contract_key, c.is_hosting, c.upstream.is_some()))
.collect::<Vec<_>>()
))
.collect::<Vec<_>>()
);
}