#![cfg(feature = "simulation_tests")]
use freenet::config::GlobalTestMetrics;
use freenet::config::{GlobalRng, GlobalSimulationTime, SimulationTransportOpt};
use freenet::dev_tool::{
RequestId, SimNetwork, StreamId, VirtualTime, check_convergence_from_logs,
reset_channel_id_counter, reset_event_id_counter, reset_global_node_index, reset_nonce_counter,
};
use freenet::simulation::TimeSource;
use freenet::transport::in_memory_socket::{
SimulationSocket, clear_all_socket_registries, register_address_network,
register_network_time_source,
};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::net::{Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
/// Parameters for one deterministic simulation test run.
///
/// Construct via the `small`/`medium`/`large`/`long_running` presets, refine
/// with the `with_*` builder methods, then execute with `run` (Turmoil-backed)
/// or `run_direct`.
struct TestConfig {
    /// Test name; also used as the simulated network's name.
    name: &'static str,
    /// Master seed from which all global RNG/time/counter state is derived.
    seed: u64,
    /// Number of gateway peers.
    gateways: usize,
    /// Number of regular (non-gateway) peers.
    nodes: usize,
    /// Ring max hops-to-live passed to `SimNetwork::new`.
    ring_max_htl: usize,
    /// HTL threshold passed to `SimNetwork::new` (randomized routing above it — confirm semantics in SimNetwork).
    rnd_if_htl_above: usize,
    /// Upper bound on connections per peer.
    max_connections: usize,
    /// Lower bound on connections per peer.
    min_connections: usize,
    /// Maximum number of distinct contracts the event generator may use.
    max_contracts: usize,
    /// Number of simulated events to inject.
    iterations: usize,
    /// Overall budget handed to the simulation driver.
    duration: Duration,
    /// Per-event wait handed to the simulation driver.
    event_wait: Duration,
    /// Settling sleep performed after all events have been injected.
    sleep_after_events: Duration,
    /// When true, `TestResult::check_convergence` panics on any divergence.
    require_convergence: bool,
    /// Optional injected latency range (fault injection).
    latency_range: Option<std::ops::Range<Duration>>,
    /// Probability in [0, 1] that a message is dropped (fault injection).
    message_loss_rate: f64,
    /// Run contracts on the mock WASM runtime; only honored by `run_direct`.
    use_mock_wasm: bool,
}
impl TestConfig {
/// Preset: smallest useful network (1 gateway + 3 nodes), 15 events over a
/// 20 s budget. Intended for fast smoke tests; no fault injection.
fn small(name: &'static str, seed: u64) -> Self {
    Self {
        name,
        seed,
        gateways: 1,
        nodes: 3,
        ring_max_htl: 7,
        rnd_if_htl_above: 3,
        max_connections: 10,
        min_connections: 2,
        max_contracts: 3,
        iterations: 15,
        duration: Duration::from_secs(20),
        event_wait: Duration::from_millis(200),
        sleep_after_events: Duration::from_secs(1),
        require_convergence: true,
        latency_range: None,
        message_loss_rate: 0.0,
        use_mock_wasm: false,
    }
}
/// Preset: mid-size network (2 gateways + 6 nodes), 100 events over a
/// 120 s budget. No fault injection.
fn medium(name: &'static str, seed: u64) -> Self {
    Self {
        name,
        seed,
        gateways: 2,
        nodes: 6,
        ring_max_htl: 10,
        rnd_if_htl_above: 7,
        max_connections: 15,
        min_connections: 2,
        max_contracts: 8,
        iterations: 100,
        duration: Duration::from_secs(120),
        event_wait: Duration::from_millis(200),
        sleep_after_events: Duration::from_secs(3),
        require_convergence: true,
        latency_range: None,
        message_loss_rate: 0.0,
        use_mock_wasm: false,
    }
}
/// Preset: dense network (3 gateways + 12 nodes), 150 events over a
/// 300 s budget. No fault injection.
fn large(name: &'static str, seed: u64) -> Self {
    Self {
        name,
        seed,
        gateways: 3,
        nodes: 12,
        ring_max_htl: 12,
        rnd_if_htl_above: 8,
        max_connections: 12,
        min_connections: 6,
        max_contracts: 15,
        iterations: 150,
        duration: Duration::from_secs(300),
        event_wait: Duration::from_millis(200),
        sleep_after_events: Duration::from_secs(10),
        require_convergence: true,
        latency_range: None,
        message_loss_rate: 0.0,
        use_mock_wasm: false,
    }
}
/// Preset: soak-style run — 360 events with a ~1 h (3700 s) budget, slow
/// 10 s event cadence, and 10-50 ms latency jitter enabled.
#[allow(dead_code)]
fn long_running(name: &'static str, seed: u64) -> Self {
    Self {
        name,
        seed,
        gateways: 2,
        nodes: 6,
        ring_max_htl: 10,
        rnd_if_htl_above: 5,
        max_connections: 15,
        min_connections: 3,
        max_contracts: 8,
        iterations: 360,
        duration: Duration::from_secs(3700),
        event_wait: Duration::from_secs(10),
        sleep_after_events: Duration::from_secs(10),
        require_convergence: true,
        latency_range: Some(Duration::from_millis(10)..Duration::from_millis(50)),
        message_loss_rate: 0.0,
        use_mock_wasm: false,
    }
}
fn with_nodes(mut self, nodes: usize) -> Self {
self.nodes = nodes;
self
}
fn with_gateways(mut self, gateways: usize) -> Self {
self.gateways = gateways;
self
}
fn with_iterations(mut self, iterations: usize) -> Self {
self.iterations = iterations;
self
}
fn with_max_contracts(mut self, max_contracts: usize) -> Self {
self.max_contracts = max_contracts;
self
}
fn with_duration(mut self, duration: Duration) -> Self {
self.duration = duration;
self
}
fn with_sleep(mut self, sleep: Duration) -> Self {
self.sleep_after_events = sleep;
self
}
fn with_connections(mut self, min: usize, max: usize) -> Self {
self.min_connections = min;
self.max_connections = max;
self
}
fn with_htl(mut self, max_htl: usize, rnd_above: usize) -> Self {
self.ring_max_htl = max_htl;
self.rnd_if_htl_above = rnd_above;
self
}
fn require_convergence(mut self) -> Self {
self.require_convergence = true;
self
}
#[allow(dead_code)]
fn with_latency(mut self, min: Duration, max: Duration) -> Self {
self.latency_range = Some(min..max);
self
}
fn with_event_wait(mut self, event_wait: Duration) -> Self {
self.event_wait = event_wait;
self
}
fn with_message_loss(mut self, rate: f64) -> Self {
self.message_loss_rate = rate.clamp(0.0, 1.0);
self
}
fn with_mock_wasm(mut self) -> Self {
self.use_mock_wasm = true;
self
}
/// Execute the configured simulation through the Turmoil-backed driver.
///
/// Resets global deterministic state from `seed`, builds the network,
/// optionally installs fault injection (latency jitter / message loss),
/// runs the event generator, then collects convergence results and the
/// captured event log into a `TestResult`.
fn run(self) -> TestResult {
    use freenet::simulation::FaultConfig;
    // Make the run reproducible purely from the seed.
    setup_deterministic_state(self.seed);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let mut sim = SimNetwork::new(
            self.name,
            self.gateways,
            self.nodes,
            self.ring_max_htl,
            self.rnd_if_htl_above,
            self.max_connections,
            self.min_connections,
            self.seed,
        )
        .await;
        let has_latency = self.latency_range.is_some();
        let has_loss = self.message_loss_rate > 0.0;
        if has_latency || has_loss {
            // Combine whichever faults are enabled into a single FaultConfig.
            let mut builder = FaultConfig::builder();
            if let Some(ref latency) = self.latency_range {
                builder = builder.latency_range(latency.clone());
                tracing::info!(
                    "Latency jitter enabled: {:?} - {:?}",
                    latency.start,
                    latency.end
                );
            }
            if has_loss {
                builder = builder.message_loss_rate(self.message_loss_rate);
                tracing::info!(
                    "Message loss enabled: {:.1}%",
                    self.message_loss_rate * 100.0
                );
            }
            sim.with_fault_injection(builder.build());
        }
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    // Copies moved into the post-events closure below.
    let sleep_duration = self.sleep_after_events;
    let event_wait = self.event_wait;
    let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
        self.seed,
        self.max_contracts,
        self.iterations,
        self.duration,
        event_wait,
        move || async move {
            // Let the network settle after all events have been injected.
            tokio::time::sleep(sleep_duration).await;
            Ok(())
        },
    );
    // Convergence and event count are derived from the shared event log
    // captured before the run.
    let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
    let event_count = rt.block_on(async { logs_handle.lock().await.len() });
    TestResult {
        seed: self.seed,
        name: self.name,
        simulation_result: result,
        convergence,
        event_count,
        require_convergence: self.require_convergence,
        logs_handle,
    }
}
/// Like `run`, but drives the simulation with the direct (non-Turmoil)
/// runner. This is the only path that honors `use_mock_wasm`.
#[allow(dead_code)]
fn run_direct(self) -> TestResult {
    use freenet::simulation::FaultConfig;
    setup_deterministic_state(self.seed);
    let rt = create_runtime();
    let use_mock_wasm = self.use_mock_wasm;
    let (sim, logs_handle) = rt.block_on(async {
        let mut sim = SimNetwork::new(
            self.name,
            self.gateways,
            self.nodes,
            self.ring_max_htl,
            self.rnd_if_htl_above,
            self.max_connections,
            self.min_connections,
            self.seed,
        )
        .await;
        sim.use_mock_wasm = use_mock_wasm;
        let has_latency = self.latency_range.is_some();
        let has_loss = self.message_loss_rate > 0.0;
        if has_latency || has_loss {
            let mut builder = FaultConfig::builder();
            if let Some(ref latency) = self.latency_range {
                builder = builder.latency_range(latency.clone());
                tracing::info!(
                    "Latency jitter enabled: {:?} - {:?}",
                    latency.start,
                    latency.end
                );
            }
            if has_loss {
                builder = builder.message_loss_rate(self.message_loss_rate);
                tracing::info!(
                    "Message loss enabled: {:.1}%",
                    self.message_loss_rate * 100.0
                );
            }
            sim.with_fault_injection(builder.build());
        }
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    // Drop the setup runtime before the direct run; the direct runner
    // presumably manages its own executor (TODO confirm) — a fresh runtime
    // is built below for log analysis.
    drop(rt);
    let direct_result = sim.run_simulation_direct::<rand::rngs::SmallRng>(
        self.seed,
        self.max_contracts,
        self.iterations,
        self.event_wait,
    );
    // Adapt the direct runner's error into the `turmoil::Result` shape that
    // the rest of the verification chain expects.
    let simulation_result: turmoil::Result = match direct_result {
        Ok(()) => Ok(()),
        Err(e) => Err(Box::new(std::io::Error::other(e.to_string()))),
    };
    let rt = create_runtime();
    let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
    let event_count = rt.block_on(async { logs_handle.lock().await.len() });
    TestResult {
        seed: self.seed,
        name: self.name,
        simulation_result,
        convergence,
        event_count,
        require_convergence: self.require_convergence,
        logs_handle,
    }
}
}
/// Outcome of one simulation run, carried through the fluent
/// `assert_ok` / `check_convergence` / `verify_*` verification chain.
struct TestResult {
    /// Seed the run was executed with (logged for reproduction on failure).
    seed: u64,
    /// Test/network name used in log output and assertion messages.
    name: &'static str,
    /// Raw pass/fail result of the simulation driver.
    simulation_result: turmoil::Result,
    /// Per-contract convergence verdicts computed from the event log.
    convergence: freenet::dev_tool::ConvergenceResult,
    /// Total number of captured log events.
    event_count: usize,
    /// When true, `check_convergence` panics on any diverged contract.
    require_convergence: bool,
    /// Shared handle to the captured event log.
    logs_handle: Arc<Mutex<Vec<freenet::tracing::NetLogMessage>>>,
}
impl TestResult {
/// Panics if the simulation itself failed, after logging a prominent
/// reproduction banner (including the seed) at error level.
fn assert_ok(self) -> Self {
    match &self.simulation_result {
        Ok(()) => {}
        Err(e) => {
            tracing::error!("============================================================");
            tracing::error!("SIMULATION TEST FAILED: {}", self.name);
            tracing::error!("============================================================");
            tracing::error!("Seed for reproduction: 0x{:X}", self.seed);
            tracing::error!("Error: {:?}", e);
            tracing::error!("============================================================");
        }
    }
    assert!(
        self.simulation_result.is_ok(),
        "{} failed: {:?}",
        self.name,
        self.simulation_result.err()
    );
    self
}
/// Log a convergence summary and, for every diverged contract, dump the
/// stored-state and subscribe events that help diagnose the divergence.
/// Panics if `require_convergence` is set and any contract diverged.
///
/// Fix: the state-hash preview used `&state_hash[..16]`, which panics if a
/// hash is ever shorter than 16 bytes; the prefix is now bounded.
fn check_convergence(self) -> Self {
    tracing::info!("=== CONVERGENCE CHECK: {} ===", self.name);
    tracing::info!(
        "Result: {} converged, {} diverged, {} events",
        self.convergence.converged.len(),
        self.convergence.diverged.len(),
        self.event_count
    );
    let rt = create_runtime();
    let logs = rt.block_on(async { self.logs_handle.lock().await.clone() });
    for diverged in &self.convergence.diverged {
        tracing::warn!(
            "DIVERGED: {} - {} unique states across {} peers",
            diverged.contract_key,
            diverged.unique_state_count(),
            diverged.peer_states.len()
        );
        for (peer, hash) in &diverged.peer_states {
            tracing::warn!(" peer {}: {}", peer, hash);
        }
        tracing::debug!(
            "Stored state events for contract {}:",
            diverged.contract_key
        );
        for log in &logs {
            let contract_key = log.kind.contract_key().map(|k| format!("{:?}", k));
            if contract_key.as_ref() == Some(&diverged.contract_key) {
                if let Some(state_hash) = log.kind.stored_state_hash() {
                    let variant = log.kind.variant_name();
                    tracing::debug!(
                        " {} @ {}: {} -> {}",
                        log.tx,
                        log.peer_id.socket_addr(),
                        variant,
                        // Bounded prefix: avoids a slice panic on short
                        // hashes (hashes are presumably ASCII hex — confirm).
                        &state_hash[..state_hash.len().min(16)]
                    );
                }
            }
        }
        tracing::debug!("Subscribe events for contract {}:", diverged.contract_key);
        for log in &logs {
            let contract_key = log.kind.contract_key().map(|k| format!("{:?}", k));
            if contract_key.as_ref() == Some(&diverged.contract_key) {
                let variant = log.kind.variant_name();
                if variant.contains("Subscribe") {
                    tracing::debug!(
                        " {} @ {}: {}",
                        log.tx,
                        log.peer_id.socket_addr(),
                        variant
                    );
                }
            }
        }
    }
    if self.require_convergence {
        assert!(
            self.convergence.is_converged(),
            "{} convergence failed: {} converged, {} diverged",
            self.name,
            self.convergence.converged.len(),
            self.convergence.diverged.len()
        );
    }
    tracing::info!("{} PASSED", self.name);
    self
}
fn verify_operation_coverage(self) -> Self {
let rt = create_runtime();
let logs = rt.block_on(async { self.logs_handle.lock().await.clone() });
let mut put_count = 0;
let mut get_count = 0;
let mut update_count = 0;
let mut subscribe_count = 0;
let mut contracts_with_puts: std::collections::HashSet<String> =
std::collections::HashSet::new();
let mut puts_per_contract: std::collections::HashMap<
String,
std::collections::HashSet<(String, String)>,
> = std::collections::HashMap::new();
for log in &logs {
let variant = log.kind.variant_name();
if variant.starts_with("Put") {
put_count += 1;
if let Some(key) = log.kind.contract_key() {
let key_str = format!("{:?}", key);
contracts_with_puts.insert(key_str.clone());
let peer_addr = format!("{}", log.peer_id.socket_addr());
let state_hash = log
.kind
.state_hash()
.map(|h| h[..8].to_string())
.unwrap_or_default();
puts_per_contract
.entry(key_str)
.or_default()
.insert((peer_addr, state_hash));
}
} else if variant.starts_with("Get") {
get_count += 1;
} else if variant.starts_with("Update") {
update_count += 1;
} else if variant.starts_with("Subscribe") {
subscribe_count += 1;
}
}
let contracts_with_concurrent_puts: Vec<_> = puts_per_contract
.iter()
.filter(|(_, puts)| puts.len() > 1)
.collect();
tracing::info!("=== OPERATION COVERAGE: {} ===", self.name);
tracing::info!("PUT operations: {}", put_count);
tracing::info!("GET operations: {}", get_count);
tracing::info!("UPDATE operations: {}", update_count);
tracing::info!("SUBSCRIBE operations: {}", subscribe_count);
tracing::info!("Contracts with PUTs: {}", contracts_with_puts.len());
tracing::info!(
"Contracts with concurrent PUTs: {}",
contracts_with_concurrent_puts.len()
);
for (contract, puts) in &contracts_with_concurrent_puts {
tracing::debug!(
" Contract {} has {} concurrent puts from peers:",
&contract[..20],
puts.len()
);
for (peer, hash) in puts.iter().take(5) {
tracing::debug!(" peer={} state={}", peer, hash);
}
}
assert!(
put_count > 0,
"{}: No PUT operations detected - test not exercising puts",
self.name
);
assert!(
get_count > 0,
"{}: No GET operations detected - test not exercising gets",
self.name
);
assert!(
update_count > 0,
"{}: No UPDATE operations detected - test not exercising updates",
self.name
);
assert!(
subscribe_count > 0,
"{}: No SUBSCRIBE operations detected - test not exercising subscribes",
self.name
);
assert!(
!contracts_with_puts.is_empty(),
"{}: No contracts with PUTs - test not exercising contract creation",
self.name
);
tracing::info!(
"{}: Operation coverage verified (PUT={}, GET={}, UPDATE={}, SUBSCRIBE={}, contracts={})",
self.name,
put_count,
get_count,
update_count,
subscribe_count,
contracts_with_puts.len()
);
self
}
/// Run the `StateVerifier` anomaly detector over the captured event log and
/// report its findings. Anomalies are logged (warn/debug) but never fatal.
fn verify_state_report(self) -> Self {
    let rt = create_runtime();
    let report = rt.block_on(async {
        let events = self.logs_handle.lock().await.clone();
        freenet::tracing::StateVerifier::from_events(events).verify()
    });
    tracing::info!("=== ANOMALY DETECTION REPORT: {} ===", self.name);
    tracing::info!(
        "Events analyzed: {} total, {} state-mutating",
        report.total_events,
        report.state_events
    );
    tracing::info!("Contracts analyzed: {}", report.contracts_analyzed);
    tracing::info!("Total anomalies: {}", report.anomalies.len());
    if report.anomalies.is_empty() {
        tracing::info!(
            "{}: State verification CLEAN - no anomalies detected",
            self.name
        );
        return self;
    }
    // One category accessor per count, evaluated in the same order as the
    // format arguments below.
    tracing::warn!(
        "{}: {} anomalies detected: divergences={}, missing_broadcasts={}, \
         unapplied_broadcasts={}, partitions={}, stale_peers={}, oscillations={}, \
         zombies={}, storms={}, cascades={}",
        self.name,
        report.anomalies.len(),
        report.divergences().len(),
        report.missing_broadcasts().len(),
        report.unapplied_broadcasts().len(),
        report.suspected_partitions().len(),
        report.stale_peers().len(),
        report.state_oscillations().len(),
        report.zombie_transactions().len(),
        report.broadcast_storms().len(),
        report.delta_sync_cascades().len(),
    );
    for (idx, anomaly) in report.anomalies.iter().enumerate() {
        tracing::debug!("{}: anomaly[{}] = {:?}", self.name, idx, anomaly);
    }
    self
}
/// Collect every router-snapshot summary present in the captured event log.
fn router_snapshots(&self) -> Vec<(usize, usize, bool)> {
    let rt = create_runtime();
    rt.block_on(async {
        let mut snapshots = Vec::new();
        for log in self.logs_handle.lock().await.iter() {
            if let Some(summary) = log.kind.router_snapshot_summary() {
                snapshots.push(summary);
            }
        }
        snapshots
    })
}
}
/// Reset every piece of process-global state that feeds the simulation so a
/// run is fully reproducible from `seed` alone. Call before building a
/// `SimNetwork`.
fn setup_deterministic_state(seed: u64) {
    GlobalRng::set_seed(seed);
    // Deterministic virtual wall-clock start: 2020-01-01 UTC plus a
    // seed-dependent offset within a ~5-year window, so different seeds also
    // get different start times.
    const BASE_EPOCH_MS: u64 = 1577836800000;
    const RANGE_MS: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
    GlobalSimulationTime::set_time_ms(BASE_EPOCH_MS + (seed % RANGE_MS));
    GlobalTestMetrics::reset();
    // NOTE(review): presumably opts out of the real transport path — confirm.
    SimulationTransportOpt::disable();
    freenet::dev_tool::clear_crdt_contracts();
    // Reset all monotonic id/counter generators so ids match across runs.
    RequestId::reset_counter();
    freenet::dev_tool::ClientId::reset_counter();
    reset_event_id_counter();
    reset_channel_id_counter();
    StreamId::reset_counter();
    reset_nonce_counter();
    reset_global_node_index();
}
/// Build a single-threaded Tokio runtime with all drivers (time, I/O)
/// enabled. Panics if runtime construction fails.
fn create_runtime() -> tokio::runtime::Runtime {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    runtime.unwrap()
}
/// Compact, comparable summary of an event trace: total length plus an
/// order-sensitive hash of the sequence and an order-insensitive hash of the
/// per-type counts.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TraceFingerprint {
    total_events: usize,
    sequence_hash: u64,
    counts_hash: u64,
}
impl TraceFingerprint {
    /// Fingerprint `events` (ordered names) and `event_counts` (per-type
    /// totals). Counts are folded in sorted key order so two maps with the
    /// same contents always hash identically.
    fn from_events(events: &[String], event_counts: &HashMap<String, usize>) -> Self {
        use std::collections::hash_map::DefaultHasher;
        // Order-sensitive hash over the full event sequence.
        let sequence_hash = {
            let mut hasher = DefaultHasher::new();
            for name in events {
                name.hash(&mut hasher);
            }
            hasher.finish()
        };
        // Order-insensitive hash: visit counts in sorted key order.
        let counts_hash = {
            let ordered: BTreeMap<&String, &usize> = event_counts.iter().collect();
            let mut hasher = DefaultHasher::new();
            for (name, count) in &ordered {
                name.hash(&mut hasher);
                count.hash(&mut hasher);
            }
            hasher.finish()
        };
        Self {
            total_events: events.len(),
            sequence_hash,
            counts_hash,
        }
    }
}
/// Strongest determinism check: three full runs from the same seed must
/// agree on outcome, total event count, per-type counts, the exact event
/// ordering, and the derived fingerprints.
#[test_log::test]
fn test_strict_determinism_exact_event_equality() {
    const SEED: u64 = 0xDE7E_2A1E_1234;
    // Full trace of one run: ordered event names plus per-type counts.
    #[derive(Debug, PartialEq)]
    struct SimulationTrace {
        event_counts: HashMap<String, usize>,
        event_sequence: Vec<String>,
        total_events: usize,
    }
    // Run one complete simulation from `seed` and capture its trace.
    fn run_and_trace(name: &str, seed: u64) -> (turmoil::Result, SimulationTrace) {
        setup_deterministic_state(seed);
        let rt = create_runtime();
        let (sim, logs_handle) = rt.block_on(async {
            let sim = SimNetwork::new(
                // 2 gateways, 18 nodes, htl 10/3, 15 max / 5 min connections.
                name, 2, 18, 10, 3, 15, 5, seed,
            )
            .await;
            let logs_handle = sim.event_logs_handle();
            (sim, logs_handle)
        });
        let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
            seed,
            // 10 contracts, 40 events, 30 s budget, 200 ms between events.
            10, 40, Duration::from_secs(30), Duration::from_millis(200),
            || async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                Ok(())
            },
        );
        let trace = rt.block_on(async {
            let logs = logs_handle.lock().await;
            let mut event_counts: HashMap<String, usize> = HashMap::new();
            let mut event_sequence: Vec<String> = Vec::new();
            for log in logs.iter() {
                let kind_name = log.kind.variant_name().to_string();
                *event_counts.entry(kind_name.clone()).or_insert(0) += 1;
                event_sequence.push(kind_name);
            }
            SimulationTrace {
                total_events: logs.len(),
                event_counts,
                event_sequence,
            }
        });
        (result, trace)
    }
    let (result1, trace1) = run_and_trace("strict-det-run1", SEED);
    let (result2, trace2) = run_and_trace("strict-det-run2", SEED);
    let (result3, trace3) = run_and_trace("strict-det-run3", SEED);
    // All three runs must agree on pass/fail before traces are compared.
    assert_eq!(
        result1.is_ok(),
        result2.is_ok(),
        "STRICT DETERMINISM FAILURE: Simulation outcomes differ!\nRun 1: {:?}\nRun 2: {:?}",
        result1,
        result2
    );
    assert_eq!(
        result2.is_ok(),
        result3.is_ok(),
        "STRICT DETERMINISM FAILURE: Simulation outcomes differ!\nRun 2: {:?}\nRun 3: {:?}",
        result2,
        result3
    );
    // Before failing on totals, print a per-type count diff to ease debugging.
    if trace1.total_events != trace2.total_events || trace2.total_events != trace3.total_events {
        tracing::info!("\n=== DETERMINISM DEBUG ===");
        tracing::info!(
            "Run 1 total: {}, Run 2 total: {}, Run 3 total: {}",
            trace1.total_events,
            trace2.total_events,
            trace3.total_events
        );
        let mut all_types: std::collections::BTreeSet<&String> =
            trace1.event_counts.keys().collect();
        all_types.extend(trace2.event_counts.keys());
        all_types.extend(trace3.event_counts.keys());
        for event_type in all_types {
            let count1 = trace1.event_counts.get(event_type).unwrap_or(&0);
            let count2 = trace2.event_counts.get(event_type).unwrap_or(&0);
            let count3 = trace3.event_counts.get(event_type).unwrap_or(&0);
            if count1 != count2 || count2 != count3 {
                tracing::info!(
                    " {} : {} vs {} vs {} (DIFFERS)",
                    event_type,
                    count1,
                    count2,
                    count3
                );
            }
        }
        tracing::info!("=========================\n");
    }
    assert_eq!(
        trace1.total_events, trace2.total_events,
        "STRICT DETERMINISM FAILURE: Total event counts differ!"
    );
    assert_eq!(
        trace2.total_events, trace3.total_events,
        "STRICT DETERMINISM FAILURE: Total event counts differ!"
    );
    assert_eq!(
        trace1.event_counts, trace2.event_counts,
        "STRICT DETERMINISM FAILURE: Event counts per type differ!"
    );
    assert_eq!(
        trace2.event_counts, trace3.event_counts,
        "STRICT DETERMINISM FAILURE: Event counts per type differ!"
    );
    // Exact positional comparison of the three event sequences.
    for (i, ((e1, e2), e3)) in trace1
        .event_sequence
        .iter()
        .zip(trace2.event_sequence.iter())
        .zip(trace3.event_sequence.iter())
        .enumerate()
    {
        assert_eq!(e1, e2, "Event sequence differs at index {}!", i);
        assert_eq!(e2, e3, "Event sequence differs at index {}!", i);
    }
    // Fingerprints condense the same data; compare them as a cross-check.
    let fp1 = TraceFingerprint::from_events(&trace1.event_sequence, &trace1.event_counts);
    let fp2 = TraceFingerprint::from_events(&trace2.event_sequence, &trace2.event_counts);
    let fp3 = TraceFingerprint::from_events(&trace3.event_sequence, &trace3.event_counts);
    assert_eq!(fp1, fp2, "Fingerprint mismatch between run 1 and run 2");
    assert_eq!(fp2, fp3, "Fingerprint mismatch between run 2 and run 3");
    tracing::info!(
        "STRICT DETERMINISM TEST PASSED: {} events, fingerprint={:#018x}",
        trace1.total_events,
        fp1.sequence_hash
    );
}
/// Same strict three-run determinism check as above, but on a smaller
/// network with two gateways, to cover multi-gateway bootstrap ordering.
#[test_log::test]
fn test_strict_determinism_multi_gateway() {
    const SEED: u64 = 0xAB17_6A7E_1234;
    #[derive(Debug, PartialEq)]
    struct SimulationTrace {
        event_counts: HashMap<String, usize>,
        event_sequence: Vec<String>,
        total_events: usize,
    }
    // Run one complete simulation from `seed` and capture its trace.
    fn run_and_trace(name: &str, seed: u64) -> (turmoil::Result, SimulationTrace) {
        setup_deterministic_state(seed);
        let rt = create_runtime();
        let (sim, logs_handle) = rt.block_on(async {
            let sim = SimNetwork::new(
                // 2 gateways, 6 nodes, htl 7/3, 10 max / 2 min connections.
                name, 2, 6, 7, 3, 10, 2, seed,
            )
            .await;
            let logs_handle = sim.event_logs_handle();
            (sim, logs_handle)
        });
        let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
            seed,
            5,
            20,
            Duration::from_secs(30),
            Duration::from_millis(200),
            || async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                Ok(())
            },
        );
        let trace = rt.block_on(async {
            let logs = logs_handle.lock().await;
            let mut event_counts: HashMap<String, usize> = HashMap::new();
            let mut event_sequence: Vec<String> = Vec::new();
            for log in logs.iter() {
                let kind_name = log.kind.variant_name().to_string();
                *event_counts.entry(kind_name.clone()).or_insert(0) += 1;
                event_sequence.push(kind_name);
            }
            SimulationTrace {
                total_events: logs.len(),
                event_counts,
                event_sequence,
            }
        });
        (result, trace)
    }
    let (result1, trace1) = run_and_trace("multi-gw-det-run1", SEED);
    let (result2, trace2) = run_and_trace("multi-gw-det-run2", SEED);
    let (result3, trace3) = run_and_trace("multi-gw-det-run3", SEED);
    // Outcomes, totals, and per-type counts must all agree across runs.
    assert_eq!(result1.is_ok(), result2.is_ok());
    assert_eq!(result2.is_ok(), result3.is_ok());
    assert_eq!(trace1.total_events, trace2.total_events);
    assert_eq!(trace2.total_events, trace3.total_events);
    assert_eq!(trace1.event_counts, trace2.event_counts);
    assert_eq!(trace2.event_counts, trace3.event_counts);
    // Exact positional comparison of the three event sequences.
    for (i, ((e1, e2), e3)) in trace1
        .event_sequence
        .iter()
        .zip(trace2.event_sequence.iter())
        .zip(trace3.event_sequence.iter())
        .enumerate()
    {
        assert_eq!(e1, e2, "Event sequence differs at index {}!", i);
        assert_eq!(e2, e3, "Event sequence differs at index {}!", i);
    }
    let fp1 = TraceFingerprint::from_events(&trace1.event_sequence, &trace1.event_counts);
    let fp2 = TraceFingerprint::from_events(&trace2.event_sequence, &trace2.event_counts);
    let fp3 = TraceFingerprint::from_events(&trace3.event_sequence, &trace3.event_counts);
    assert_eq!(fp1, fp2, "Fingerprint mismatch between run 1 and run 2");
    assert_eq!(fp2, fp3, "Fingerprint mismatch between run 2 and run 3");
    tracing::info!(
        "MULTI-GATEWAY DETERMINISM TEST PASSED: {} events (2 gateways), fingerprint={:#018x}",
        trace1.total_events,
        fp1.sequence_hash
    );
}
/// Replay determinism at count granularity: two identically seeded runs must
/// produce the same number of events and the same per-type counts (exact
/// ordering is covered by the strict determinism tests above).
#[test_log::test]
fn test_deterministic_replay_events() {
    const SEED: u64 = 0xDEAD_BEEF_1234;
    #[derive(Debug, PartialEq)]
    struct ReplayTrace {
        event_counts: HashMap<String, usize>,
        total_events: usize,
    }
    // Run one small simulation from `seed` and tally events by type.
    fn run_and_trace(name: &str, seed: u64) -> (turmoil::Result, ReplayTrace) {
        setup_deterministic_state(seed);
        let rt = create_runtime();
        let (sim, logs_handle) = rt.block_on(async {
            let sim = SimNetwork::new(name, 1, 3, 7, 3, 10, 2, seed).await;
            let logs_handle = sim.event_logs_handle();
            (sim, logs_handle)
        });
        let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
            seed,
            3,
            10,
            Duration::from_secs(20),
            Duration::from_millis(200),
            || async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                Ok(())
            },
        );
        let trace = rt.block_on(async {
            let logs = logs_handle.lock().await;
            let mut event_counts: HashMap<String, usize> = HashMap::new();
            for log in logs.iter() {
                let kind_name = log.kind.variant_name().to_string();
                *event_counts.entry(kind_name).or_insert(0) += 1;
            }
            ReplayTrace {
                total_events: logs.len(),
                event_counts,
            }
        });
        (result, trace)
    }
    let (result1, trace1) = run_and_trace("replay-run1", SEED);
    let (result2, trace2) = run_and_trace("replay-run2", SEED);
    assert_eq!(result1.is_ok(), result2.is_ok());
    // Guard against the vacuous case of an empty trace.
    assert!(trace1.total_events > 0);
    assert_eq!(trace1.event_counts, trace2.event_counts);
    assert_eq!(trace1.total_events, trace2.total_events);
    // Only the counts hash matters here, so the sequence slice is empty.
    let fp1 = TraceFingerprint::from_events(&[], &trace1.event_counts);
    let fp2 = TraceFingerprint::from_events(&[], &trace2.event_counts);
    assert_eq!(
        fp1.counts_hash, fp2.counts_hash,
        "Counts fingerprint mismatch between replay runs"
    );
    tracing::info!(
        "Deterministic replay test passed - {} events, counts_hash={:#018x}",
        trace1.total_events,
        fp1.counts_hash
    );
}
/// Smoke test for network construction: peer list has the expected size and
/// gateway/node split, without running any simulation events.
#[test_log::test]
fn test_sim_network_basic_setup() {
    #[allow(clippy::unusual_byte_groupings)]
    const SEED: u64 = 0xBA51C_5E70_0001;
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    rt.block_on(async {
        let mut sim = SimNetwork::new("basic-setup", 1, 5, 7, 3, 10, 2, SEED).await;
        sim.with_start_backoff(Duration::from_millis(100));
        let peers = sim.build_peers();
        assert_eq!(peers.len(), 6, "Expected 1 gateway + 5 nodes = 6 peers");
        // Split the peer list by label kind in a single pass.
        let (nodes, gateways): (Vec<_>, Vec<_>) =
            peers.iter().partition(|(label, _)| label.is_node());
        assert_eq!(gateways.len(), 1, "Expected 1 gateway");
        assert_eq!(nodes.len(), 5, "Expected 5 regular nodes");
    });
}
/// Minimal startup check: a 2-node network runs 5 events inside a short
/// 15 s budget without failing.
#[test_log::test]
fn test_sim_network_peer_startup() {
    let config = TestConfig::small("peer-startup", 0xBEE2_5747_0001)
        .with_duration(Duration::from_secs(15))
        .with_iterations(5)
        .with_nodes(2);
    config.run().assert_ok();
}
/// Basic connectivity check: the small preset with just 2 contracts and
/// 10 events must complete successfully.
#[test_log::test]
fn test_sim_network_connectivity() {
    let config = TestConfig::small("connectivity-test", 0xC0EE_3C70_0001)
        .with_iterations(10)
        .with_max_contracts(2);
    config.run().assert_ok();
}
/// Fast CI lane: a slightly scaled-up small preset run through the full
/// verification chain (coverage, convergence, anomaly report).
#[test_log::test]
fn ci_quick_simulation() {
    let config = TestConfig::small("ci-quick-sim", 0xC1F1_ED5E_ED00)
        .with_sleep(Duration::from_secs(2))
        .with_duration(Duration::from_secs(45))
        .with_iterations(50)
        .with_max_contracts(5)
        .with_nodes(4);
    config
        .run()
        .assert_ok()
        .verify_operation_coverage()
        .check_convergence()
        .verify_state_report();
}
/// Medium CI lane: the medium preset as-is, with the full verification chain.
#[test_log::test]
fn ci_medium_simulation() {
    let outcome = TestConfig::medium("ci-medium-sim", 0xC1F1_ED7E_ED01).run();
    outcome
        .assert_ok()
        .verify_operation_coverage()
        .check_convergence()
        .verify_state_report();
}
/// Small network driven through the direct (non-Turmoil) runner with the
/// mock WASM runtime enabled.
#[test_log::test]
fn mock_wasm_runtime_simulation() {
    let config = TestConfig::small("mock-wasm-sim", 0xA0C1_1A5E_0001).with_mock_wasm();
    config.run_direct().assert_ok().verify_state_report();
}
/// Replica validation on a denser-than-medium network (2 gateways, 8 nodes,
/// tighter connection bounds) with convergence made mandatory.
#[test_log::test]
fn replica_validation_and_stepwise_consistency() {
    let config = TestConfig::medium("replica-validation", 0xBEE1_1CA5_0001)
        .with_nodes(8)
        .with_gateways(2)
        .with_connections(4, 8)
        .with_htl(8, 4)
        .with_iterations(90)
        .with_max_contracts(10)
        .with_sleep(Duration::from_secs(5))
        .with_duration(Duration::from_secs(180))
        .require_convergence();
    config
        .run()
        .assert_ok()
        .verify_operation_coverage()
        .check_convergence()
        .verify_state_report();
}
/// Replication on the large preset (3 gateways, 12 nodes), with convergence
/// made mandatory and the full verification chain applied.
#[test_log::test]
fn dense_network_replication() {
    let config = TestConfig::large("dense-network", 0xDE05_E0F0_0001).require_convergence();
    let outcome = config.run();
    outcome
        .assert_ok()
        .verify_operation_coverage()
        .check_convergence()
        .verify_state_report();
}
/// Two-run determinism check (lighter sibling of the strict three-run
/// tests): the event sequences of two identically seeded runs must match
/// exactly, position by position.
#[test_log::test]
fn test_turmoil_determinism_verification() {
    const SEED: u64 = 0xDE7E_2A11_0001;
    // Run one simulation and return its ordered list of event names.
    fn run_simulation(name: &'static str, seed: u64) -> Vec<String> {
        setup_deterministic_state(seed);
        let rt = create_runtime();
        let (sim, logs_handle) = rt.block_on(async {
            let sim = SimNetwork::new(name, 1, 3, 7, 3, 10, 2, seed).await;
            let logs_handle = sim.event_logs_handle();
            (sim, logs_handle)
        });
        let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
            seed,
            3,
            15,
            Duration::from_secs(20),
            Duration::from_millis(200),
            || async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                Ok(())
            },
        );
        assert!(result.is_ok(), "Simulation failed: {:?}", result.err());
        rt.block_on(async {
            let logs = logs_handle.lock().await;
            logs.iter()
                // NOTE: `{:?}` on a `&str` adds quotes; harmless since both
                // runs format names identically.
                .map(|log| format!("{:?}", log.kind.variant_name()))
                .collect()
        })
    }
    let events1 = run_simulation("det-verify-1", SEED);
    let events2 = run_simulation("det-verify-2", SEED);
    assert_eq!(
        events1.len(),
        events2.len(),
        "Event counts differ: {} vs {}",
        events1.len(),
        events2.len()
    );
    // Positional comparison pinpoints the first diverging event.
    for (i, (e1, e2)) in events1.iter().zip(events2.iter()).enumerate() {
        assert_eq!(e1, e2, "Event {} differs: {:?} vs {:?}", i, e1, e2);
    }
    // Rebuild per-type counts to feed the fingerprint cross-check.
    let counts1: HashMap<String, usize> = {
        let mut m = HashMap::new();
        for e in &events1 {
            *m.entry(e.clone()).or_insert(0) += 1;
        }
        m
    };
    let counts2: HashMap<String, usize> = {
        let mut m = HashMap::new();
        for e in &events2 {
            *m.entry(e.clone()).or_insert(0) += 1;
        }
        m
    };
    let fp1 = TraceFingerprint::from_events(&events1, &counts1);
    let fp2 = TraceFingerprint::from_events(&events2, &counts2);
    assert_eq!(fp1, fp2, "Fingerprint mismatch between run 1 and run 2");
    tracing::info!(
        "Determinism verified: {} identical events, fingerprint={:#018x}",
        events1.len(),
        fp1.sequence_hash
    );
}
#[test_log::test]
fn test_graceful_shutdown_no_deadlock() {
const SEED: u64 = 0xDEAD_BEEF_CAFE;
let rt = create_runtime();
let sim = rt.block_on(async {
SimNetwork::new("graceful-shutdown-test", 1, 2, 7, 3, 10, 2, SEED).await
});
let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
SEED,
10,
3,
Duration::from_secs(30),
Duration::from_millis(200),
|| async {
tokio::time::sleep(Duration::from_secs(5)).await;
tracing::info!("Test client: simulation will end gracefully");
Ok(())
},
);
assert!(
result.is_ok(),
"Simulation should complete without deadlock: {:?}",
result.err()
);
tracing::info!("Graceful shutdown test passed - no deadlock under Turmoil");
}
/// Pins the exact `Display` text of the event-loop exit reasons, which other
/// code may match against.
#[test_log::test]
fn test_graceful_shutdown_typed_error() {
    use freenet::EventLoopExitReason;
    assert_eq!(
        EventLoopExitReason::GracefulShutdown.to_string(),
        "Graceful shutdown"
    );
    assert_eq!(
        EventLoopExitReason::UnexpectedStreamEnd.to_string(),
        "Network event stream ended unexpectedly"
    );
    tracing::info!("Typed error test passed");
}
/// Regression test for timeout storms under uniformly high latency: with
/// 150-200 ms injected latency, the MIN_RTO=300ms fix must keep operation
/// timeouts well below storm levels (< 50).
#[test_log::test]
fn test_high_latency_timeout_regression() {
    use freenet::simulation::FaultConfig;
    const SEED: u64 = 0xDEAD_BEEF_1234;
    tracing::info!(
        "Starting high-latency regression test with seed: 0x{:X}",
        SEED
    );
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    rt.block_on(async {
        let mut sim = SimNetwork::new(
            "high-latency-regression",
            // 1 gateway, 3 nodes, htl 7/3, 10 max / 2 min connections.
            1, 3, 7, 3, 10, 2, SEED,
        )
        .await;
        // Every message gets 150-200 ms of injected latency.
        let fault_config = FaultConfig::builder()
            .latency_range(Duration::from_millis(150)..Duration::from_millis(200))
            .build();
        sim.with_fault_injection(fault_config);
        sim.with_start_backoff(Duration::from_millis(100));
        let _handles = sim
            .start_with_rand_gen::<rand::rngs::SmallRng>(SEED, 1, 1)
            .await;
        // Advance virtual time in 100 ms steps (5 s total), yielding so the
        // simulated nodes can make progress between steps.
        for _ in 0..50 {
            sim.advance_time(Duration::from_millis(100));
            tokio::task::yield_now().await;
        }
        // Partial connectivity (50%) is sufficient here; full mesh formation
        // is not the property under test, so failure is only a warning.
        match sim
            .check_partial_connectivity(Duration::from_secs(60), 0.5)
            .await
        {
            Ok(()) => {
                tracing::info!("High-latency network established connectivity");
            }
            Err(e) => {
                tracing::warn!(
                    "High-latency connectivity check incomplete: {} (this may be acceptable)",
                    e
                );
            }
        }
        let summary = sim.get_operation_summary().await;
        let total_timeouts = summary.timeouts;
        tracing::info!(
            "High-latency test results: {} total timeouts, {:.1}% success rate",
            total_timeouts,
            summary.overall_success_rate() * 100.0
        );
        // The actual regression assertion: a timeout storm produces far more.
        assert!(
            total_timeouts < 50,
            "Expected <50 timeouts with MIN_RTO=300ms fix, got {} (timeout storm detected!)",
            total_timeouts
        );
        tracing::info!("High-latency regression test PASSED - no timeout storm detected");
    });
}
/// End-to-end ping/pong over `SimulationSocket` inside a raw Turmoil sim.
///
/// Fix: the client previously logged recv errors and timeouts but still
/// returned `Ok(())`, so a broken socket path passed silently. Those paths
/// (and a server recv failure) now fail the simulation.
#[test]
fn test_turmoil_with_real_simulation_socket() -> turmoil::Result {
    // Fresh registries so earlier tests cannot leak socket state in.
    clear_all_socket_registries();
    let network_name = "turmoil-test";
    let virtual_time = VirtualTime::new();
    register_network_time_source(network_name, virtual_time);
    let server_addr: SocketAddr = (Ipv6Addr::LOCALHOST, 18000).into();
    let client_addr: SocketAddr = (Ipv6Addr::LOCALHOST, 19000).into();
    register_address_network(server_addr, network_name);
    register_address_network(client_addr, network_name);
    let mut sim = turmoil::Builder::new()
        .simulation_duration(Duration::from_secs(10))
        .rng_seed(0xDEAD_BEEF_CAFE_1234)
        .build();
    sim.host("server", move || async move {
        let socket = SimulationSocket::bind(server_addr)
            .await
            .expect("Server bind failed");
        tracing::info!("Server bound to {:?}", server_addr);
        let mut buf = [0u8; 1024];
        match socket.recv_from(&mut buf).await {
            Ok((len, from)) => {
                let msg = &buf[..len];
                tracing::info!("Server received {:?} from {:?}", msg, from);
                // Best-effort reply; delivery is asserted on the client side.
                socket.send_to(b"pong", from).await.ok();
            }
            Err(e) => {
                tracing::error!("Server recv error: {:?}", e);
                return Err(format!("server recv error: {:?}", e).into());
            }
        }
        Ok(())
    });
    sim.client("client", async move {
        // Give the server a moment to bind before sending.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let socket = SimulationSocket::bind(client_addr)
            .await
            .expect("Client bind failed");
        tracing::info!("Client bound to {:?}", client_addr);
        socket.send_to(b"ping", server_addr).await.ok();
        tracing::info!("Client sent ping");
        let mut buf = [0u8; 1024];
        match tokio::time::timeout(Duration::from_secs(5), socket.recv_from(&mut buf)).await {
            Ok(Ok((len, from))) => {
                let msg = &buf[..len];
                tracing::info!("Client received {:?} from {:?}", msg, from);
                assert_eq!(msg, b"pong");
                assert_eq!(from, server_addr);
            }
            Ok(Err(e)) => {
                tracing::error!("Client recv error: {:?}", e);
                return Err(format!("client recv error: {:?}", e).into());
            }
            Err(_) => {
                tracing::error!("Client recv timeout");
                return Err("client recv timeout".into());
            }
        }
        Ok(())
    });
    sim.run()
}
// Maximum ring distance at which a peer counts as a "source" for a contract,
// i.e. close enough to the contract's location that hosting it without an
// upstream is legitimate (used by the single-hoster topology test below).
const SOURCE_THRESHOLD: f64 = 0.05;
// Network name for the single-hoster topology test.
const SINGLE_HOSTER_NETWORK: &str = "single-hoster-test";
/// Shortest distance between two points on the unit ring `[0, 1)`, where
/// 0.0 and 1.0 coincide (circular/wrap-around distance).
fn ring_distance(a: f64, b: f64) -> f64 {
    let direct = (a - b).abs();
    // Going the other way around the ring may be shorter than the direct gap.
    if direct > 0.5 { 1.0 - direct } else { direct }
}
#[test_log::test]
fn test_topology_single_hoster() {
    // Topology test with exactly one hosting peer: the gateway PUTs a test
    // contract with subscribe=true and the captured topology snapshots are
    // validated. Hard requirement: no bidirectional subscription cycles.
    // The remaining health checks depend on whether the gateway happens to
    // fall within SOURCE_THRESHOLD of the contract's ring location.
    use freenet::dev_tool::{
        Location, NodeLabel, ScheduledOperation, SimOperation, validate_topology_from_snapshots,
    };
    const SEED: u64 = 0x5EED_0001_CAFE;
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    // Build a 1-gateway / 2-node network and grab the first peer location
    // (positional args: gateways, nodes, ring_max_htl, rnd_if_htl_above,
    // max_connections, min_connections, seed — mirroring TestConfig fields).
    let (sim, gateway_location) = rt.block_on(async {
        let sim = SimNetwork::new(
            SINGLE_HOSTER_NETWORK,
            1, 2, 7, 3, 10, 2, SEED,
        )
        .await;
        let locations = sim.get_peer_locations();
        let gateway_loc = locations
            .first()
            .copied()
            .expect("Should have at least one gateway");
        (sim, gateway_loc)
    });
    // Pre-run location is not used directly; the snapshot's location is the
    // authoritative one below.
    let _ = gateway_location;
    let contract_seed = 42u8;
    let contract = SimOperation::create_test_contract(contract_seed);
    let contract_id = *contract.key().id();
    let initial_state = SimOperation::create_test_state(contract_seed);
    // Single scheduled operation: gateway publishes and self-subscribes.
    let operations = vec![ScheduledOperation::new(
        NodeLabel::gateway(SINGLE_HOSTER_NETWORK, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: initial_state.clone(),
            subscribe: true,
        },
    )];
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(120), Duration::from_secs(45), );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation should complete successfully: {:?}",
        result.turmoil_result.err()
    );
    let snapshots = result.topology_snapshots;
    tracing::info!("Captured {} topology snapshots", snapshots.len());
    // Find the snapshot of the peer that actually stored the contract.
    let gateway_with_contract = snapshots
        .iter()
        .find(|snap| snap.contracts.contains_key(&contract_id));
    assert!(
        gateway_with_contract.is_some(),
        "Gateway should have the contract in its topology snapshot. \
         Available snapshots: {} peers",
        snapshots.len()
    );
    let gateway_snap = gateway_with_contract.unwrap();
    let contract_sub = gateway_snap.contracts.get(&contract_id).unwrap();
    // Ring distance between the gateway and the contract decides whether the
    // gateway qualifies as a "source" hoster (see SOURCE_THRESHOLD).
    let contract_location = Location::from(&contract_id).as_f64();
    let gateway_location = gateway_snap.location;
    let gateway_distance = ring_distance(gateway_location, contract_location);
    tracing::info!(
        "Contract location: {:.4}, Gateway location: {:.4}, Distance: {:.4}, Threshold: {:.4}",
        contract_location,
        gateway_location,
        gateway_distance,
        SOURCE_THRESHOLD
    );
    assert!(
        contract_sub.is_hosting,
        "Gateway should be hosting the contract after PUT with subscribe=true"
    );
    tracing::info!(
        "Gateway {} is hosting contract: upstream={:?}, downstream={:?}",
        gateway_snap.peer_addr,
        contract_sub.upstream,
        contract_sub.downstream
    );
    // Run the full topology validator over every snapshot.
    let result = validate_topology_from_snapshots(&snapshots, &contract_id, contract_location);
    tracing::info!(
        "Topology validation: cycles={}, orphans={}, disconnected={}, unreachable={}, proximity={}",
        result.bidirectional_cycles.len(),
        result.orphan_hosters.len(),
        result.disconnected_upstream.len(),
        result.unreachable_hosters.len(),
        result.proximity_violations.len()
    );
    // Cycles are a bug regardless of where the gateway landed on the ring.
    assert!(
        result.bidirectional_cycles.is_empty(),
        "Single hoster should have no bidirectional cycles, found: {:?}",
        result.bidirectional_cycles
    );
    let gateway_is_source = gateway_distance < SOURCE_THRESHOLD;
    if gateway_is_source {
        // Gateway is a legitimate source: the topology must be fully healthy.
        tracing::info!(
            "Gateway IS source (distance={:.4} < threshold={:.4}) - validating healthy topology",
            gateway_distance,
            SOURCE_THRESHOLD
        );
        assert!(
            result.orphan_hosters.is_empty(),
            "Source hoster (within threshold) should not be marked as orphan, found: {:?}",
            result.orphan_hosters
        );
        assert!(
            result.disconnected_upstream.is_empty(),
            "Source hoster (within threshold) should not be disconnected, found: {:?}",
            result.disconnected_upstream
        );
        assert!(
            result.unreachable_hosters.is_empty(),
            "Source hoster should have no unreachable hosters, found: {:?}",
            result.unreachable_hosters
        );
        assert!(
            result.is_healthy(),
            "Single source hoster topology should be healthy, got {} issues",
            result.issue_count
        );
    } else {
        // Gateway is NOT a source: orphan/disconnected findings are expected
        // for a lone hoster with no upstream, so they are only logged.
        tracing::warn!(
            "Gateway is NOT source (distance={:.4} >= threshold={:.4}) - orphan/disconnected issues are expected",
            gateway_distance,
            SOURCE_THRESHOLD
        );
        if !result.orphan_hosters.is_empty() {
            tracing::info!(
                "Expected: Gateway flagged as orphan (not a source, no upstream): {:?}",
                result.orphan_hosters
            );
        }
        if !result.disconnected_upstream.is_empty() {
            tracing::info!(
                "Expected: Gateway flagged as disconnected upstream (not a source, has downstream): {:?}",
                result.disconnected_upstream
            );
        }
    }
    tracing::info!(
        "Single hoster topology test passed - gateway_is_source={}, cycles=0, other_issues={}",
        gateway_is_source,
        result.issue_count
    );
}
#[test_log::test]
fn test_concurrent_updates_convergence() {
    // Medium-size randomized run (2 gateways, 6 nodes, 80 iterations) where
    // peers issue updates concurrently. The builder chain asserts that the
    // simulation completes, all scheduled operations were exercised, all
    // replicas converge, and the final state report is consistent.
    TestConfig::medium("concurrent-updates-convergence", 0xC0C0_BEEF_1234)
        .with_gateways(2)
        .with_nodes(6)
        .with_max_contracts(3)
        .with_iterations(80)
        .with_duration(Duration::from_secs(90))
        .with_sleep(Duration::from_secs(2))
        .require_convergence()
        .run()
        .assert_ok()
        .verify_operation_coverage()
        .check_convergence()
        .verify_state_report();
}
#[test_log::test]
fn test_full_state_send_no_incorrect_caching() {
    // Regression test for PR #2763: after a full-state send, the sender must
    // not cache a stale summary for the receiver. If it does, later deltas
    // are computed against the wrong baseline, states diverge, and receivers
    // fall back to ResyncRequests. Asserts full convergence AND zero resyncs.
    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation};
    const SEED: u64 = 0x2763_CAFE_0001;
    const NETWORK_NAME: &str = "full-state-cache-test";
    // Metrics are process-global; reset so the counters read below are ours.
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(
            NETWORK_NAME,
            1, 3, 7, 3, 10, 2, SEED,
        )
        .await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0x63);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    // One initial state plus three successive updates from different peers.
    let initial_state = SimOperation::create_test_state(1);
    let update_state_2 = SimOperation::create_test_state(2);
    let update_state_3 = SimOperation::create_test_state(3);
    let update_state_4 = SimOperation::create_test_state(4);
    // Gateway publishes (and self-subscribes), nodes 1-3 subscribe, then the
    // gateway and nodes 1-2 each push one update.
    let operations = vec![
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Put {
                contract: contract.clone(),
                state: initial_state,
                subscribe: true,
            },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 1),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 2),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 3),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Update {
                key: contract_key,
                data: update_state_2,
            },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 1),
            SimOperation::Update {
                key: contract_key,
                data: update_state_3,
            },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 2),
            SimOperation::Update {
                key: contract_key,
                data: update_state_4,
            },
        ),
    ];
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(120), Duration::from_secs(60), );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation should complete successfully: {:?}",
        result.turmoil_result.err()
    );
    // Compare final stored state hashes across peers from the event logs.
    let convergence =
        rt.block_on(async { freenet::dev_tool::check_convergence_from_logs(&logs_handle).await });
    tracing::info!(
        "Convergence check: {} converged, {} diverged",
        convergence.converged.len(),
        convergence.diverged.len()
    );
    // Log per-peer hashes for any diverged contract before failing, so the
    // assertion message is debuggable.
    for diverged in &convergence.diverged {
        tracing::error!(
            "DIVERGED: {} - {} unique states across {} peers",
            diverged.contract_key,
            diverged.unique_state_count(),
            diverged.peer_states.len()
        );
        for (peer, hash) in &diverged.peer_states {
            tracing::error!("  peer {}: {}", peer, hash);
        }
    }
    assert!(
        convergence.is_converged(),
        "PR #2763 REGRESSION: State divergence detected! \
         This indicates incorrect summary caching after full state sends. \
         {} contracts converged, {} diverged",
        convergence.converged.len(),
        convergence.diverged.len()
    );
    let resync_count = GlobalTestMetrics::resync_requests();
    let delta_sends = GlobalTestMetrics::delta_sends();
    let full_state_sends = GlobalTestMetrics::full_state_sends();
    tracing::info!(
        "Broadcast stats - delta_sends: {}, full_state_sends: {}, resync_requests: {}",
        delta_sends,
        full_state_sends,
        resync_count
    );
    // With correct summary caching this scenario never needs a resync.
    assert_eq!(
        resync_count, 0,
        "PR #2763 REGRESSION: {} ResyncRequests detected! \
         This indicates deltas are failing due to incorrect summary caching. \
         With the fix, no resyncs should be needed in this scenario.",
        resync_count
    );
    // Sanity check: the updates must have produced at least one broadcast.
    let total_broadcasts = delta_sends + full_state_sends;
    assert!(
        total_broadcasts > 0,
        "Expected at least one broadcast to occur during the simulation"
    );
    tracing::info!(
        "test_full_state_send_no_incorrect_caching PASSED: \
         {} peers converged, {} broadcasts ({} delta, {} full state), {} resyncs",
        convergence
            .converged
            .iter()
            .map(|c| c.replica_count)
            .sum::<usize>(),
        total_broadcasts,
        delta_sends,
        full_state_sends,
        resync_count
    );
}
#[test_log::test]
fn test_subscribe_forwarding_ack_relay() {
    // Verifies that subscribe requests forwarded through intermediate peers
    // get their acknowledgements relayed back: after a gateway PUT, four
    // nodes subscribe and at least four successful subscribe outcomes must
    // show up in the telemetry logs.
    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation};
    const SEED: u64 = 0x5CB6_F37C_0001;
    const NETWORK_NAME: &str = "subscribe-fwd-ack";
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    // Small max_connections (5) forces multi-hop forwarding of subscribes.
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(
            NETWORK_NAME,
            1, 4, 7, 3, 5, 2, SEED,
        )
        .await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0xAC);
    let contract_id = *contract.key().id();
    let initial_state = SimOperation::create_test_state(1);
    // Gateway publishes, then all four nodes subscribe to the contract.
    let operations = vec![
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Put {
                contract: contract.clone(),
                state: initial_state,
                subscribe: true,
            },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 1),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 2),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 3),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 4),
            SimOperation::Subscribe { contract_id },
        ),
    ];
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(120),
        Duration::from_secs(30),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation failed (ForwardingAck regression?): {:?}",
        result.turmoil_result.err()
    );
    // Tally subscribe outcomes from the event logs:
    // Some(true) = success, Some(false) = contract not found.
    let (success_count, not_found_count) = rt.block_on(async {
        let logs = logs_handle.lock().await;
        let mut successes = 0usize;
        let mut not_found = 0usize;
        for log in logs.iter() {
            match log.kind.subscribe_outcome() {
                Some(true) => successes += 1,
                Some(false) => not_found += 1,
                None => {}
            }
        }
        (successes, not_found)
    });
    tracing::info!(
        success_count,
        not_found_count,
        "Subscribe telemetry summary"
    );
    assert!(
        success_count >= 4,
        "Expected at least 4 successful subscribes, got {} (not_found={})",
        success_count,
        not_found_count,
    );
}
#[test_log::test]
fn test_crdt_mode_version_tracking() {
    // CRDT-mode version tracking: a contract registered as a CRDT is PUT at
    // version 1, two nodes subscribe, and the gateway then updates to
    // version 2. All replicas must converge on the final state.
    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation, register_crdt_contract};
    const SEED: u64 = 0x0276_3CD0_0001;
    const NETWORK_NAME: &str = "crdt-version-test";
    // Metrics are process-global and read (logged) at the end of this test;
    // reset them first so counts from previously-run tests in the same
    // process don't leak in. Matches the other metric-reading tests in this
    // file (e.g. test_full_state_send_no_incorrect_caching).
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(
            NETWORK_NAME,
            1, 2, 7, 3, 10, 2, SEED,
        )
        .await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0xCD);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    // Opt this contract into CRDT merge semantics.
    register_crdt_contract(contract_id);
    let state_v1 = SimOperation::create_crdt_state(1, 0x11);
    let state_v2 = SimOperation::create_crdt_state(2, 0x22);
    // Gateway publishes v1, nodes 1-2 subscribe, gateway updates to v2.
    let operations = vec![
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Put {
                contract: contract.clone(),
                state: state_v1,
                subscribe: true,
            },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 1),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 2),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Update {
                key: contract_key,
                data: state_v2,
            },
        ),
    ];
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(60),
        Duration::from_secs(30),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation should complete: {:?}",
        result.turmoil_result.err()
    );
    // Compare final stored state hashes across peers from the event logs.
    let convergence =
        rt.block_on(async { freenet::dev_tool::check_convergence_from_logs(&logs_handle).await });
    let resync_count = GlobalTestMetrics::resync_requests();
    let delta_sends = GlobalTestMetrics::delta_sends();
    let full_state_sends = GlobalTestMetrics::full_state_sends();
    tracing::info!(
        "CRDT mode test - convergence: {}/{}, delta_sends: {}, full_state_sends: {}, resyncs: {}",
        convergence.converged.len(),
        convergence.total_contracts(),
        delta_sends,
        full_state_sends,
        resync_count
    );
    assert!(
        convergence.is_converged(),
        "CRDT mode: all peers should converge. {} converged, {} diverged",
        convergence.converged.len(),
        convergence.diverged.len()
    );
    tracing::info!(
        "test_crdt_mode_version_tracking PASSED: converged={}, resyncs={}",
        convergence.converged.len(),
        resync_count
    );
}
#[test_log::test]
fn test_pr2763_crdt_convergence_with_resync() {
    // PR #2763 regression: deliberately divergent CRDT replicas (gateway at
    // v1 then v10, node 1 at v5) must still converge — via ResyncRequest
    // recovery when a delta cannot apply. Resyncs are allowed here (and
    // logged), unlike in test_full_state_send_no_incorrect_caching.
    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation, register_crdt_contract};
    const SEED: u64 = 0x0276_3B06_0001;
    const NETWORK_NAME: &str = "pr2763-crdt-resync";
    // Metrics are process-global and read at the end of this test; reset
    // them first so counts from previously-run tests in the same process
    // don't leak in (consistent with the other metric-reading tests here).
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(
            NETWORK_NAME,
            1, 2, 7, 3, 10, 2, SEED,
        )
        .await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0xBB);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    // Opt this contract into CRDT merge semantics.
    register_crdt_contract(contract_id);
    // Three states at distinct versions/payloads to force merge + resync.
    let state_v1_gw = SimOperation::create_crdt_state(1, 0xAA);
    let state_v5_node = SimOperation::create_crdt_state(5, 0xBB);
    let state_v10_gw = SimOperation::create_crdt_state(10, 0xCC);
    let operations = vec![
        // Gateway publishes v1 and self-subscribes.
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Put {
                contract: contract.clone(),
                state: state_v1_gw,
                subscribe: true,
            },
        ),
        // Node 1 subscribes, then immediately pushes v5.
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 1),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 1),
            SimOperation::Update {
                key: contract_key,
                data: state_v5_node,
            },
        ),
        // Node 2 joins late, then the gateway jumps to v10.
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 2),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Update {
                key: contract_key,
                data: state_v10_gw,
            },
        ),
    ];
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(90),
        Duration::from_secs(45),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation should complete: {:?}",
        result.turmoil_result.err()
    );
    // Compare final stored state hashes across peers from the event logs.
    let convergence =
        rt.block_on(async { freenet::dev_tool::check_convergence_from_logs(&logs_handle).await });
    let resync_count = GlobalTestMetrics::resync_requests();
    let delta_sends = GlobalTestMetrics::delta_sends();
    let full_state_sends = GlobalTestMetrics::full_state_sends();
    tracing::info!(
        "CRDT resync test - convergence: {}/{}, delta_sends: {}, full_state_sends: {}, resyncs: {}",
        convergence.converged.len(),
        convergence.total_contracts(),
        delta_sends,
        full_state_sends,
        resync_count
    );
    // Log per-peer hashes for any diverged contract before the assertion.
    for diverged in &convergence.diverged {
        tracing::warn!(
            "DIVERGED: {} - {} unique states",
            diverged.contract_key,
            diverged.unique_state_count()
        );
        for (peer, hash) in &diverged.peer_states {
            tracing::warn!("  peer {}: {}", peer, hash);
        }
    }
    assert!(
        convergence.is_converged(),
        "CRDT resync test: peers MUST converge via ResyncRequest recovery. {} converged, {} diverged",
        convergence.converged.len(),
        convergence.diverged.len()
    );
    // Resyncs are an expected recovery path here, not a failure.
    if resync_count > 0 {
        tracing::info!(
            "CRDT resync test: {} ResyncRequests occurred (expected for version mismatch recovery)",
            resync_count
        );
    }
    tracing::info!(
        "test_pr2763_crdt_convergence_with_resync PASSED: converged={}, resyncs={}",
        convergence.converged.len(),
        resync_count
    );
}
use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation, register_crdt_contract};
// Parametrized CRDT convergence matrix: each case is (network name, seed,
// gateway count, node count), covering 3-8 nodes with 1-2 gateways and five
// seeds per topology. Every node subscribes and then pushes its own update;
// all replicas must converge.
#[rstest::rstest]
#[case::n3_g1_s1("crdt-3n-1gw-s1", 0x2773_0003_0001, 1, 3)]
#[case::n3_g1_s2("crdt-3n-1gw-s2", 0x2773_0003_0002, 1, 3)]
#[case::n3_g1_s3("crdt-3n-1gw-s3", 0x2773_0003_0003, 1, 3)]
#[case::n3_g1_s4("crdt-3n-1gw-s4", 0x2773_0003_0007, 1, 3)]
#[case::n3_g1_s5("crdt-3n-1gw-s5", 0x2773_0003_0006, 1, 3)]
#[case::n5_g2_s1("crdt-5n-2gw-s1", 0x2773_0005_1021, 2, 5)]
#[case::n5_g2_s2("crdt-5n-2gw-s2", 0x2773_0005_1012, 2, 5)]
#[case::n5_g2_s3("crdt-5n-2gw-s3", 0x2773_0005_1003, 2, 5)]
#[case::n5_g2_s4("crdt-5n-2gw-s4", 0x2773_0005_2001, 2, 5)]
#[case::n5_g2_s5("crdt-5n-2gw-s5", 0x2773_0005_1005, 2, 5)]
#[case::n6_g2_s1("crdt-6n-2gw-s1", 0x2773_0006_1001, 2, 6)]
#[case::n6_g2_s2("crdt-6n-2gw-s2", 0x2773_0006_1002, 2, 6)]
#[case::n6_g2_s3("crdt-6n-2gw-s3", 0x2773_0006_1003, 2, 6)]
#[case::n6_g2_s4("crdt-6n-2gw-s4", 0x2773_0006_1004, 2, 6)]
#[case::n6_g2_s5("crdt-6n-2gw-s5", 0x2773_0006_1005, 2, 6)]
#[case::n4_g1_s1("crdt-4n-1gw-s1", 0x2773_0004_0001, 1, 4)]
#[case::n4_g1_s2("crdt-4n-1gw-s2", 0x2773_0004_0002, 1, 4)]
#[case::n4_g1_s3("crdt-4n-1gw-s3", 0x2773_0004_0003, 1, 4)]
#[case::n4_g1_s4("crdt-4n-1gw-s4", 0x2773_0004_0004, 1, 4)]
#[case::n4_g1_s5("crdt-4n-1gw-s5", 0x2773_0004_0005, 1, 4)]
#[case::n5_g1_s1("crdt-5n-1gw-s1", 0x2773_0005_0008, 1, 5)]
#[case::n5_g1_s2("crdt-5n-1gw-s2", 0x2773_0005_0012, 1, 5)]
#[case::n5_g1_s3("crdt-5n-1gw-s3", 0x2773_0005_0003, 1, 5)]
#[case::n5_g1_s4("crdt-5n-1gw-s4", 0x2773_0005_0004, 1, 5)]
#[case::n5_g1_s5("crdt-5n-1gw-s5", 0x2773_0005_0005, 1, 5)]
#[case::n6_g1_s1("crdt-6n-1gw-s1", 0x2773_0006_0001, 1, 6)]
#[case::n6_g1_s2("crdt-6n-1gw-s2", 0x2773_0006_0002, 1, 6)]
#[case::n6_g1_s3("crdt-6n-1gw-s3", 0x2773_0006_0003, 1, 6)]
#[case::n6_g1_s4("crdt-6n-1gw-s4", 0x2773_0006_0004, 1, 6)]
#[case::n6_g1_s5("crdt-6n-1gw-s5", 0x2773_0006_0005, 1, 6)]
#[case::n7_g1_s1("crdt-7n-1gw-s1", 0x2773_0007_0001, 1, 7)]
#[case::n7_g1_s2("crdt-7n-1gw-s2", 0x2773_0007_0002, 1, 7)]
#[case::n7_g1_s3("crdt-7n-1gw-s3", 0x2773_0007_0023, 1, 7)]
#[case::n7_g1_s4("crdt-7n-1gw-s4", 0x2773_0007_0010, 1, 7)]
#[case::n7_g1_s5("crdt-7n-1gw-s5", 0x2773_0007_0005, 1, 7)]
#[case::n8_g1_s1("crdt-8n-1gw-s1", 0x2773_0008_0001, 1, 8)]
#[case::n8_g1_s2("crdt-8n-1gw-s2", 0x2773_0008_0002, 1, 8)]
#[case::n8_g1_s3("crdt-8n-1gw-s3", 0x2773_0008_0003, 1, 8)]
#[case::n8_g1_s4("crdt-8n-1gw-s4", 0x2773_0008_0004, 1, 8)]
#[case::n8_g1_s5("crdt-8n-1gw-s5", 0x2773_0008_0005, 1, 8)]
fn test_crdt_convergence(
    #[case] name: &'static str,
    #[case] seed: u64,
    #[case] gateways: usize,
    #[case] nodes: usize,
) {
    GlobalTestMetrics::reset();
    setup_deterministic_state(seed);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(name, gateways, nodes, 7, 3, 10, 4, seed).await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0x5F);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    // Opt the contract into CRDT merge semantics.
    register_crdt_contract(contract_id);
    // Gateway 0 publishes the initial CRDT state and self-subscribes.
    let mut operations = vec![ScheduledOperation::new(
        NodeLabel::gateway(name, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: SimOperation::create_crdt_state(1, 0x00),
            subscribe: true,
        },
    )];
    // Every node subscribes...
    for i in 1..=nodes {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(name, i),
            SimOperation::Subscribe { contract_id },
        ));
    }
    // ...and then pushes its own distinct update (version 10+i, payload i).
    for i in 1..=nodes {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(name, i),
            SimOperation::Update {
                key: contract_key,
                data: SimOperation::create_crdt_state(10 + i as u64, i as u8),
            },
        ));
    }
    let result = sim.run_controlled_simulation(
        seed,
        operations,
        Duration::from_secs(180),
        Duration::from_secs(90),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "[{}] Simulation should complete: {:?}",
        name,
        result.turmoil_result.err()
    );
    // Compare final stored state hashes across peers from the event logs.
    let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
    let resync_count = GlobalTestMetrics::resync_requests();
    tracing::info!(
        "[{}] nodes={}, gateways={}, converged={}/{}, resyncs={}",
        name,
        nodes,
        gateways,
        convergence.converged.len(),
        convergence.total_contracts(),
        resync_count
    );
    for diverged in &convergence.diverged {
        tracing::warn!(
            "[{}] DIVERGED: {} - {} unique states across {} peers",
            name,
            diverged.contract_key,
            diverged.unique_state_count(),
            diverged.peer_states.len()
        );
    }
    assert!(
        convergence.is_converged(),
        "[{}] {} nodes must converge. {} converged, {} diverged",
        name,
        nodes,
        convergence.converged.len(),
        convergence.diverged.len()
    );
    tracing::info!(
        "[{}] PASSED: converged={}, resyncs={}",
        name,
        convergence.converged.len(),
        resync_count
    );
}
#[test_log::test]
fn test_concurrent_updates_from_n_sources() {
    // N-way concurrent CRDT updates: six nodes all subscribe and each pushes
    // its own update; all replicas must merge to one state.
    const SEED: u64 = 0xC0C0_BEEF_0002;
    const NETWORK_NAME: &str = "concurrent-updates-n";
    const NODE_COUNT: usize = 6;
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(NETWORK_NAME, 2, NODE_COUNT, 10, 5, 15, 4, SEED).await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0xC0);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    // Opt the contract into CRDT merge semantics.
    register_crdt_contract(contract_id);
    // Gateway 0 publishes the initial state and self-subscribes.
    let mut operations = vec![ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: SimOperation::create_crdt_state(1, 0x00),
            subscribe: true,
        },
    )];
    // Every node subscribes...
    for i in 1..=NODE_COUNT {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Subscribe { contract_id },
        ));
    }
    // ...then each node pushes a distinct update (version 10+i, payload i).
    for i in 1..=NODE_COUNT {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Update {
                key: contract_key,
                data: SimOperation::create_crdt_state(10 + i as u64, i as u8),
            },
        ));
    }
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(180),
        Duration::from_secs(90),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation should complete: {:?}",
        result.turmoil_result.err()
    );
    // Compare final stored state hashes across peers from the event logs.
    let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
    let resync_count = GlobalTestMetrics::resync_requests();
    tracing::info!(
        "Concurrent N-source updates: converged={}/{}, resyncs={}",
        convergence.converged.len(),
        convergence.total_contracts(),
        resync_count
    );
    for diverged in &convergence.diverged {
        tracing::warn!(
            "DIVERGED: {} - {} unique states across {} peers",
            diverged.contract_key,
            diverged.unique_state_count(),
            diverged.peer_states.len()
        );
    }
    assert!(
        convergence.is_converged(),
        "Concurrent N-way updates MUST converge. {} converged, {} diverged",
        convergence.converged.len(),
        convergence.diverged.len()
    );
    tracing::info!(
        "test_concurrent_updates_from_n_sources PASSED: converged={}, resyncs={}",
        convergence.converged.len(),
        resync_count
    );
}
#[test_log::test]
fn test_stale_summary_cache_multiple_branches() {
    // Stale-summary scenario with multiple divergent branches: four nodes
    // each push a different update; convergence must be reached via CRDT
    // merge plus ResyncRequest recovery where deltas fail.
    const SEED: u64 = 0x57A1_E001_0001;
    const NETWORK_NAME: &str = "stale-summaries";
    const NODE_COUNT: usize = 4;
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(NETWORK_NAME, 1, NODE_COUNT, 7, 3, 10, 2, SEED).await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0x57);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    // Opt the contract into CRDT merge semantics.
    register_crdt_contract(contract_id);
    // Gateway 0 publishes the initial state and self-subscribes.
    let mut operations = vec![ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: SimOperation::create_crdt_state(1, 0x01),
            subscribe: true,
        },
    )];
    // Every node subscribes...
    for i in 1..=NODE_COUNT {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Subscribe { contract_id },
        ));
    }
    // ...then each node pushes a distinct update (version 10+i, payload i),
    // creating several branches that must merge.
    for i in 1..=NODE_COUNT {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Update {
                key: contract_key,
                data: SimOperation::create_crdt_state(10 + i as u64, i as u8),
            },
        ));
    }
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(180),
        Duration::from_secs(90),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation should complete: {:?}",
        result.turmoil_result.err()
    );
    // Compare final stored state hashes across peers from the event logs.
    let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
    let resync_count = GlobalTestMetrics::resync_requests();
    tracing::info!(
        "Stale summary test: converged={}/{}, resyncs={}",
        convergence.converged.len(),
        convergence.total_contracts(),
        resync_count
    );
    for diverged in &convergence.diverged {
        tracing::warn!(
            "DIVERGED: {} - {} unique states across {} peers",
            diverged.contract_key,
            diverged.unique_state_count(),
            diverged.peer_states.len()
        );
    }
    assert!(
        convergence.is_converged(),
        "Divergent state must converge via CRDT merge + ResyncRequest. {} converged, {} diverged",
        convergence.converged.len(),
        convergence.diverged.len()
    );
    tracing::info!(
        "test_stale_summary_cache_multiple_branches PASSED: converged={}, resyncs={}",
        convergence.converged.len(),
        resync_count
    );
}
#[test_log::test]
fn test_max_downstream_limit_reached() {
    // Larger network (2 gateways, 12 nodes) sized to exceed per-peer
    // downstream subscription limits, forcing multi-level subscription
    // trees; convergence must still hold.
    const SEED: u64 = 0x0A0D_0001_0001;
    const NETWORK_NAME: &str = "large-network";
    const NODE_COUNT: usize = 12;
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(NETWORK_NAME, 2, NODE_COUNT, 10, 5, 15, 3, SEED).await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0x0A);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    // Opt the contract into CRDT merge semantics.
    register_crdt_contract(contract_id);
    // Gateway 0 publishes the initial state and self-subscribes.
    let mut operations = vec![ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: SimOperation::create_crdt_state(1, 0x00),
            subscribe: true,
        },
    )];
    // Every node subscribes...
    for i in 1..=NODE_COUNT {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Subscribe { contract_id },
        ));
    }
    // ...then each node pushes a distinct update (version 10+i, payload i).
    for i in 1..=NODE_COUNT {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Update {
                key: contract_key,
                data: SimOperation::create_crdt_state(10 + i as u64, i as u8),
            },
        ));
    }
    // Longer deadlines than the smaller tests: 12 nodes need more (simulated)
    // time to settle.
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(300),
        Duration::from_secs(150),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation should complete: {:?}",
        result.turmoil_result.err()
    );
    // Compare final stored state hashes across peers from the event logs.
    let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
    let resync_count = GlobalTestMetrics::resync_requests();
    tracing::info!(
        "Large network test: converged={}/{}, resyncs={}",
        convergence.converged.len(),
        convergence.total_contracts(),
        resync_count
    );
    for diverged in &convergence.diverged {
        tracing::warn!(
            "DIVERGED: {} - {} unique states across {} peers",
            diverged.contract_key,
            diverged.unique_state_count(),
            diverged.peer_states.len()
        );
    }
    assert!(
        convergence.is_converged(),
        "Large network must converge. {} converged, {} diverged",
        convergence.converged.len(),
        convergence.diverged.len()
    );
    tracing::info!(
        "test_max_downstream_limit_reached PASSED: converged={}, resyncs={}",
        convergence.converged.len(),
        resync_count
    );
}
#[test_log::test]
fn test_chain_topology_formation() {
    // Sparse network (min_connections=2) intended to form chain-like
    // subscription paths; sequential node updates must still converge.
    const SEED: u64 = 0xC4A1_0001_0001;
    const NETWORK_NAME: &str = "chain-topology";
    const NODE_COUNT: usize = 4;
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(NETWORK_NAME, 1, NODE_COUNT, 10, 5, 10, 2, SEED).await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0xC4);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    // Opt the contract into CRDT merge semantics.
    register_crdt_contract(contract_id);
    // Gateway 0 publishes the initial state and self-subscribes.
    let mut operations = vec![ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: SimOperation::create_crdt_state(1, 0x00),
            subscribe: true,
        },
    )];
    // Every node subscribes...
    for i in 1..=NODE_COUNT {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Subscribe { contract_id },
        ));
    }
    // ...then each node pushes a distinct update (version 10+i, payload i).
    for i in 1..=NODE_COUNT {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Update {
                key: contract_key,
                data: SimOperation::create_crdt_state(10 + i as u64, i as u8),
            },
        ));
    }
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(180),
        Duration::from_secs(90),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation should complete: {:?}",
        result.turmoil_result.err()
    );
    // Compare final stored state hashes across peers from the event logs.
    let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
    let resync_count = GlobalTestMetrics::resync_requests();
    tracing::info!(
        "Chain/sequential test: converged={}/{}, resyncs={}",
        convergence.converged.len(),
        convergence.total_contracts(),
        resync_count
    );
    assert!(
        convergence.is_converged(),
        "All {} contracts should converge. Converged: {}, Diverged: {:?}",
        convergence.total_contracts(),
        convergence.converged.len(),
        convergence.diverged
    );
    tracing::info!(
        "test_chain_topology_formation PASSED: converged={}, resyncs={}",
        convergence.converged.len(),
        resync_count
    );
}
#[test_log::test]
fn test_subscription_broadcast_propagation() {
    // Verifies update broadcasts actually reach subscribers: gateway PUTs,
    // node 1 subscribes, gateway updates. At least two peers (gateway +
    // subscriber) must end up with identical state, and at least one
    // BroadcastEmitted telemetry event (wiring from #3622) must be logged.
    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation};
    const SEED: u64 = 0xBCAD_C057_0001;
    const NETWORK_NAME: &str = "broadcast-test";
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(
            NETWORK_NAME,
            1, 2, 7, 3, 10, 2, SEED,
        )
        .await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0xBC);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    // Opt the contract into CRDT merge semantics.
    register_crdt_contract(contract_id);
    let operations = vec![
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Put {
                contract: contract.clone(),
                state: SimOperation::create_crdt_state(1, 0x01),
                subscribe: true,
            },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 1),
            SimOperation::Subscribe { contract_id },
        ),
        // The update that must be broadcast to the subscriber.
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Update {
                key: contract_key,
                data: SimOperation::create_crdt_state(100, 0xFF),
            },
        ),
    ];
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(120), Duration::from_secs(60), );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation should complete: {:?}",
        result.turmoil_result.err()
    );
    let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
    tracing::info!(
        "Broadcast propagation test: converged={}, diverged={}",
        convergence.converged.len(),
        convergence.diverged.len()
    );
    // Collect, per peer, the last stored state hash recorded for this
    // contract (keys compared via their Debug representation).
    let contract_key_str = format!("{:?}", contract_key);
    let peer_states: std::collections::BTreeMap<std::net::SocketAddr, String> =
        rt.block_on(async {
            let logs = logs_handle.lock().await;
            let mut states = std::collections::BTreeMap::new();
            for log in logs.iter() {
                if let Some(key) = log.kind.contract_key() {
                    if format!("{:?}", key) == contract_key_str {
                        if let Some(hash) = log.kind.stored_state_hash() {
                            states.insert(log.peer_id.socket_addr(), hash.to_string());
                        }
                    }
                }
            }
            states
        });
    tracing::info!(
        "Contract {} has state on {} peers: {:?}",
        contract_key_str,
        peer_states.len(),
        peer_states.keys().collect::<Vec<_>>()
    );
    // The broadcast must have reached the subscriber, not just the gateway.
    assert!(
        peer_states.len() >= 2,
        "BUG: Update broadcast failed! Only {} peer(s) have state for contract {}. \
         Expected at least 2 (gateway + subscriber). \
         This indicates subscribers aren't receiving broadcasts (proximity_sources=0). \
         Peers with state: {:?}",
        peer_states.len(),
        contract_key_str,
        peer_states.keys().collect::<Vec<_>>()
    );
    // All peers holding the contract must agree on one state hash.
    let unique_states: std::collections::HashSet<&String> = peer_states.values().collect();
    assert!(
        unique_states.len() == 1,
        "BUG: State divergence! {} unique states across {} peers for contract {}. \
         States: {:?}",
        unique_states.len(),
        peer_states.len(),
        contract_key_str,
        peer_states
    );
    // Telemetry check for the BroadcastEmitted wiring added in #3622.
    let broadcast_emitted_count: usize = rt.block_on(async {
        let logs = logs_handle.lock().await;
        logs.iter()
            .filter(|log| log.kind.is_update_broadcast_emitted())
            .count()
    });
    assert!(
        broadcast_emitted_count > 0,
        "Expected at least one UpdateEvent::BroadcastEmitted telemetry event, got 0. \
         This means the emission wiring from #3622 is broken."
    );
    tracing::info!(
        "test_subscription_broadcast_propagation PASSED: {} peers converged to same state, \
         {} BroadcastEmitted events",
        peer_states.len(),
        broadcast_emitted_count
    );
}
/// Regression test for #3390: an Update must propagate through a relay hop.
///
/// Topology (1 gateway, 3 nodes): node 1 PUTs a CRDT contract with
/// `subscribe: true`, node 2 subscribes via the gateway acting as relay,
/// then node 1 issues an Update. If the relay correctly registers node 2 as
/// a downstream subscriber, all three peers end up storing the same state
/// hash for the contract.
#[test_log::test]
fn test_subscription_relay_propagation() {
use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation, register_crdt_contract};
const SEED: u64 = 0xBEAD_FEED_3390;
const NETWORK_NAME: &str = "relay-sub-test";
// Reset global metrics and all deterministic state (RNG, counters, time)
// so the run is reproducible for SEED.
GlobalTestMetrics::reset();
setup_deterministic_state(SEED);
let rt = create_runtime();
// Build the simulated network; positional args after the name are the
// same shape used elsewhere in this file (gateways, nodes, ring_max_htl,
// rnd_if_htl_above, max_connections, min_connections, seed).
let (sim, logs_handle) = rt.block_on(async {
let sim = SimNetwork::new(
NETWORK_NAME,
1, 3, 7, 3, 10, 2, SEED,
)
.await;
let logs_handle = sim.event_logs_handle();
(sim, logs_handle)
});
let contract = SimOperation::create_test_contract(0x39);
let contract_id = *contract.key().id();
let contract_key = contract.key();
register_crdt_contract(contract_id);
// Scripted operations: put+subscribe on node 1, subscribe on node 2
// (routed through the gateway relay), then an update from node 1 that
// must reach node 2 via that relay.
let operations = vec![
ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, 1),
SimOperation::Put {
contract: contract.clone(),
state: SimOperation::create_crdt_state(1, 0x01),
subscribe: true,
},
),
ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, 2),
SimOperation::Subscribe { contract_id },
),
ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, 1),
SimOperation::Update {
key: contract_key,
data: SimOperation::create_crdt_state(100, 0xFE),
},
),
];
// 120s virtual-time budget for the run, 60s settle window.
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(120),
Duration::from_secs(60),
);
assert!(
result.turmoil_result.is_ok(),
"Simulation should complete: {:?}",
result.turmoil_result.err()
);
// Collect, per peer, the last stored state hash seen for this contract.
// Contract keys are compared via their Debug representation.
let contract_key_str = format!("{:?}", contract_key);
let peer_states: std::collections::BTreeMap<std::net::SocketAddr, String> =
rt.block_on(async {
let logs = logs_handle.lock().await;
let mut states = std::collections::BTreeMap::new();
for log in logs.iter() {
if let Some(key) = log.kind.contract_key() {
if format!("{:?}", key) == contract_key_str {
if let Some(hash) = log.kind.stored_state_hash() {
states.insert(log.peer_id.socket_addr(), hash.to_string());
}
}
}
}
states
});
tracing::info!(
"Relay subscription test: contract {} has state on {} peers: {:?}",
contract_key_str,
peer_states.len(),
peer_states.keys().collect::<Vec<_>>()
);
// The hoster, the relaying gateway, and the subscriber must all hold state.
assert!(
peer_states.len() >= 3,
"BUG (#3390): Relay subscription failed! Only {} peer(s) have state for contract {}. \
Expected at least 3 (hoster + gateway relay + subscriber). \
The relay node (gateway) likely didn't register the subscriber as downstream. \
Peers with state: {:?}",
peer_states.len(),
contract_key_str,
peer_states.keys().collect::<Vec<_>>()
);
// All peers holding state must agree on the final hash (CRDT convergence).
let unique_states: std::collections::HashSet<&String> = peer_states.values().collect();
assert!(
unique_states.len() == 1,
"State divergence in relay test! {} unique states across {} peers: {:?}",
unique_states.len(),
peer_states.len(),
peer_states
);
// Anomaly report is informational only here; nothing is asserted on it.
let report = rt.block_on(async {
let logs = logs_handle.lock().await;
let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
verifier.verify()
});
tracing::info!(
"Anomaly report: {} anomalies across {} contracts",
report.anomalies.len(),
report.contracts_analyzed
);
tracing::info!(
"test_subscription_relay_propagation PASSED: {} peers converged via relay",
peer_states.len()
);
}
/// Nightly-only soak test: drives one simulated hour (3600s of virtual time)
/// through the direct runner and verifies the run completes, covers all
/// operation types, and converges. Also reports the virtual-to-wall-clock
/// acceleration factor.
#[test_log::test]
#[cfg(feature = "nightly_tests")]
fn test_long_running_deterministic() {
    const SEED: u64 = 0x1A00_2C00_72AC;
    tracing::info!("=== Starting Long-Running Deterministic Simulation ===");
    tracing::info!("Seed: 0x{:X}", SEED);
    tracing::info!("Virtual time target: 3600 seconds (1 hour)");
    let started = std::time::Instant::now();
    // Run, then chain the post-run verifications on the returned result.
    let run = TestConfig::long_running("long-running", SEED).run_direct();
    run.assert_ok()
        .verify_operation_coverage()
        .check_convergence();
    let elapsed_secs = started.elapsed().as_secs_f64();
    tracing::info!("=== Long-Running Simulation Complete ===");
    tracing::info!("Wall clock time: {:.1} seconds", elapsed_secs);
    // Acceleration = simulated seconds per wall-clock second.
    tracing::info!("Time acceleration: {:.1}x", 3600.0 / elapsed_secs);
    tracing::info!("test_long_running_deterministic PASSED");
}
/// Fault-injection test: splits the network into two halves, holds the
/// partition for 15 simulated seconds, heals it, and gives the network 15
/// more seconds to re-converge. Only clean completion is asserted; the
/// convergence and anomaly figures at the end are logged for diagnostics.
#[test_log::test]
fn test_partition_heal_convergence() {
use freenet::dev_tool::NodeLabel;
use freenet::simulation::Partition;
const SEED: u64 = 0xDA27_0EA1_0001;
const NETWORK_NAME: &str = "partition-heal";
// Reset global RNG/time/counters so the run is reproducible for SEED.
setup_deterministic_state(SEED);
let rt = create_runtime();
let (sim, logs_handle, node_addrs) = rt.block_on(async {
// 1 gateway, 5 nodes; remaining positional args match SimNetwork::new.
let sim = SimNetwork::new(
NETWORK_NAME,
1, 5, 7,
3,
10,
2,
SEED,
)
.await;
let logs_handle = sim.event_logs_handle();
let node_addrs: HashMap<NodeLabel, std::net::SocketAddr> = sim.all_node_addresses().clone();
(sim, logs_handle, node_addrs)
});
// Split all peer addresses into two halves; each half becomes one side of
// the partition. NOTE(review): this relies on HashMap iteration order —
// confirm the seeded global state makes this split deterministic.
let addrs: Vec<std::net::SocketAddr> = node_addrs.values().copied().collect();
let mid = addrs.len() / 2;
let side_a: std::collections::HashSet<_> = addrs[..mid].iter().copied().collect();
let side_b: std::collections::HashSet<_> = addrs[mid..].iter().copied().collect();
let side_a_len = side_a.len();
let side_b_len = side_b.len();
let network_name = NETWORK_NAME.to_string();
// The closure runs alongside the workload and drives the fault schedule.
let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
SEED,
5, 100, Duration::from_secs(120), Duration::from_millis(500), move || async move {
tracing::info!(
"Injecting partition: side_a={} nodes, side_b={} nodes",
side_a_len,
side_b_len,
);
// Install the partition via the per-network fault injector registry.
// `.permanent(0)` presumably makes it last until explicitly cleared —
// TODO confirm against Partition's API docs.
if let Some(injector) = freenet::dev_tool::get_fault_injector(&network_name) {
let mut state = injector.lock().unwrap();
let partition = Partition::new(side_a, side_b).permanent(0);
state.config.add_partition(partition);
}
tokio::time::sleep(Duration::from_secs(15)).await;
tracing::info!("Partition active for 15s, now healing");
// Heal: clear all partitions so cross-side traffic flows again.
if let Some(injector) = freenet::dev_tool::get_fault_injector(&network_name) {
let mut state = injector.lock().unwrap();
state.config.partitions.clear();
}
tokio::time::sleep(Duration::from_secs(15)).await;
tracing::info!("Post-heal convergence period complete");
Ok(())
},
);
assert!(
result.is_ok(),
"Partition-heal simulation failed: {:?}",
result.err()
);
// Everything below is diagnostic logging only — no assertions.
let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
tracing::info!(
"Partition-heal: {} converged, {} diverged",
convergence.converged.len(),
convergence.diverged.len()
);
let report = rt.block_on(async {
let logs = logs_handle.lock().await;
let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
verifier.verify()
});
tracing::info!(
"=== PARTITION-HEAL ANOMALY REPORT: {} events, {} state, {} contracts, {} anomalies ===",
report.total_events,
report.state_events,
report.contracts_analyzed,
report.anomalies.len()
);
let divergences = report.divergences();
let missing = report.missing_broadcasts();
let partitions = report.suspected_partitions();
let stale = report.stale_peers();
let oscillations = report.state_oscillations();
tracing::warn!(
" divergences={}, missing_broadcasts={}, partitions={}, stale={}, oscillations={}",
divergences.len(),
missing.len(),
partitions.len(),
stale.len(),
oscillations.len(),
);
for (i, anomaly) in report.anomalies.iter().enumerate() {
tracing::debug!(" anomaly[{}] = {:?}", i, anomaly);
}
}
/// Fault-injection test: crashes two non-gateway nodes for 10 simulated
/// seconds, recovers them, then allows 15s for re-convergence. Only clean
/// completion is asserted; anomaly metrics are logged for diagnostics.
#[test_log::test]
fn test_crash_recover_convergence() {
use freenet::dev_tool::NodeLabel;
const SEED: u64 = 0x2011_2E57_0002;
const NETWORK_NAME: &str = "crash-recover";
// Reset global RNG/time/counters so the run is reproducible for SEED.
setup_deterministic_state(SEED);
let rt = create_runtime();
let (sim, logs_handle, node_addrs) = rt.block_on(async {
let sim = SimNetwork::new(NETWORK_NAME, 1, 5, 7, 3, 10, 2, SEED).await;
let logs_handle = sim.event_logs_handle();
let node_addrs: HashMap<NodeLabel, std::net::SocketAddr> = sim.all_node_addresses().clone();
(sim, logs_handle, node_addrs)
});
// Pick two non-gateway peers to crash. NOTE(review): selection depends on
// HashMap iteration order — confirm the seeded state keeps it stable.
let crash_addrs: Vec<std::net::SocketAddr> = node_addrs
.iter()
.filter(|(label, _)| label.is_node())
.take(2)
.map(|(_, addr)| *addr)
.collect();
assert_eq!(crash_addrs.len(), 2, "Need at least 2 non-gateway nodes");
let network_name = NETWORK_NAME.to_string();
let crash_addrs_clone = crash_addrs.clone();
// The closure drives the crash/recover schedule via the fault injector
// while the seeded workload runs.
let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
SEED,
3, 100, Duration::from_secs(120), Duration::from_millis(500), move || async move {
tracing::info!("Crashing 2 nodes: {:?}", crash_addrs_clone);
if let Some(injector) = freenet::dev_tool::get_fault_injector(&network_name) {
let mut state = injector.lock().unwrap();
for addr in &crash_addrs_clone {
state.config.crash_node(*addr);
}
}
tokio::time::sleep(Duration::from_secs(10)).await;
tracing::info!("Degraded period over, recovering nodes");
if let Some(injector) = freenet::dev_tool::get_fault_injector(&network_name) {
let mut state = injector.lock().unwrap();
for addr in &crash_addrs_clone {
state.config.recover_node(addr);
}
}
tokio::time::sleep(Duration::from_secs(15)).await;
tracing::info!("Post-recovery convergence period complete");
Ok(())
},
);
assert!(
result.is_ok(),
"Crash-recover simulation failed: {:?}",
result.err()
);
// Everything below is diagnostic logging only — no assertions.
let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
tracing::info!(
"Crash-recover: {} converged, {} diverged",
convergence.converged.len(),
convergence.diverged.len()
);
let report = rt.block_on(async {
let logs = logs_handle.lock().await;
let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
verifier.verify()
});
tracing::info!(
"=== CRASH-RECOVER ANOMALY REPORT: {} events, {} state, {} contracts, {} anomalies ===",
report.total_events,
report.state_events,
report.contracts_analyzed,
report.anomalies.len()
);
let divergences = report.divergences();
let stale = report.stale_peers();
let zombies = report.zombie_transactions();
let oscillations = report.state_oscillations();
tracing::warn!(
" divergences={}, stale_peers={}, zombies={}, oscillations={}",
divergences.len(),
stale.len(),
zombies.len(),
oscillations.len(),
);
for (i, anomaly) in report.anomalies.iter().enumerate() {
tracing::debug!(" anomaly[{}] = {:?}", i, anomaly);
}
}
/// Fault-injection test: three rounds of crash-then-recover churn, cycling
/// through the non-gateway nodes, followed by a 10s settle window. Only
/// clean completion is asserted; anomaly metrics are logged for diagnostics.
#[test_log::test]
fn test_multi_step_churn() {
use freenet::dev_tool::NodeLabel;
const SEED: u64 = 0xC402_0000_0002;
const NETWORK_NAME: &str = "multi-churn";
// Reset global RNG/time/counters so the run is reproducible for SEED.
setup_deterministic_state(SEED);
let rt = create_runtime();
let (sim, logs_handle, node_addrs) = rt.block_on(async {
let sim = SimNetwork::new(NETWORK_NAME, 1, 4, 7, 3, 10, 2, SEED).await;
let logs_handle = sim.event_logs_handle();
let node_addrs: HashMap<NodeLabel, std::net::SocketAddr> = sim.all_node_addresses().clone();
(sim, logs_handle, node_addrs)
});
// All non-gateway peers participate in the churn rotation.
let churn_addrs: Vec<std::net::SocketAddr> = node_addrs
.iter()
.filter(|(label, _)| label.is_node())
.map(|(_, addr)| *addr)
.collect();
let network_name = NETWORK_NAME.to_string();
let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
SEED,
3, 100, Duration::from_secs(150), Duration::from_millis(500), move || async move {
const CHURN_ROUNDS: usize = 3;
for round in 0..CHURN_ROUNDS {
// Round-robin over the churn pool; wraps if rounds > pool size.
let target = churn_addrs[round % churn_addrs.len()];
tracing::info!("Churn round {}: crashing {:?}", round, target);
if let Some(injector) = freenet::dev_tool::get_fault_injector(&network_name) {
let mut state = injector.lock().unwrap();
state.config.crash_node(target);
}
// 5s down, then recover and give it 5s before the next round.
tokio::time::sleep(Duration::from_secs(5)).await;
tracing::info!("Churn round {}: recovering {:?}", round, target);
if let Some(injector) = freenet::dev_tool::get_fault_injector(&network_name) {
let mut state = injector.lock().unwrap();
state.config.recover_node(&target);
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
// Final settle window after the last recovery.
tokio::time::sleep(Duration::from_secs(10)).await;
tracing::info!("Multi-step churn: all {} rounds complete", CHURN_ROUNDS);
Ok(())
},
);
assert!(
result.is_ok(),
"Multi-step churn simulation failed: {:?}",
result.err()
);
// Everything below is diagnostic logging only — no assertions.
let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
tracing::info!(
"Multi-step churn: {} converged, {} diverged",
convergence.converged.len(),
convergence.diverged.len()
);
let report = rt.block_on(async {
let logs = logs_handle.lock().await;
let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
verifier.verify()
});
tracing::info!(
"=== MULTI-STEP CHURN ANOMALY REPORT: {} events, {} state, {} contracts, {} anomalies ===",
report.total_events,
report.state_events,
report.contracts_analyzed,
report.anomalies.len()
);
let divergences = report.divergences();
let missing = report.missing_broadcasts();
let stale = report.stale_peers();
let zombies = report.zombie_transactions();
let oscillations = report.state_oscillations();
let cascades = report.delta_sync_cascades();
tracing::warn!(
" divergences={}, missing={}, stale={}, zombies={}, oscillations={}, cascades={}",
divergences.len(),
missing.len(),
stale.len(),
zombies.len(),
oscillations.len(),
cascades.len(),
);
for (i, anomaly) in report.anomalies.iter().enumerate() {
tracing::debug!(" anomaly[{}] = {:?}", i, anomaly);
}
}
/// Determinism check: runs the same seeded simulation three times
/// (sequentially, under fresh global state each time) and requires the
/// event traces to be identical — same totals, same per-kind counts, same
/// ordered sequence, and same fingerprint.
#[test_log::test]
fn test_determinism_parallel_safe() {
const SEED: u64 = 0x0A2A_11E1_0001;
// Captured shape of one run's event log, compared across runs.
#[derive(Debug, PartialEq)]
struct Trace {
event_counts: HashMap<String, usize>,
event_sequence: Vec<String>,
total_events: usize,
}
// Runs one seeded simulation and extracts its Trace. Global state is
// reset at the top so each invocation starts from the same conditions.
fn run_and_trace(name: &str, seed: u64) -> (turmoil::Result, Trace) {
setup_deterministic_state(seed);
let rt = create_runtime();
let (sim, logs_handle) = rt.block_on(async {
let sim = SimNetwork::new(name, 1, 5, 7, 3, 10, 2, seed).await;
let logs_handle = sim.event_logs_handle();
(sim, logs_handle)
});
let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
seed,
3,
15,
Duration::from_secs(20),
Duration::from_millis(200),
|| async {
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
},
);
// Fold the event log into ordered sequence + per-kind counts.
let trace = rt.block_on(async {
let logs = logs_handle.lock().await;
let mut event_counts: HashMap<String, usize> = HashMap::new();
let mut event_sequence: Vec<String> = Vec::new();
for log in logs.iter() {
let kind_name = log.kind.variant_name().to_string();
*event_counts.entry(kind_name.clone()).or_insert(0) += 1;
event_sequence.push(kind_name);
}
Trace {
total_events: logs.len(),
event_counts,
event_sequence,
}
});
(result, trace)
}
let (result1, trace1) = run_and_trace("parallel-safe-run1", SEED);
let (result2, trace2) = run_and_trace("parallel-safe-run2", SEED);
let (result3, trace3) = run_and_trace("parallel-safe-run3", SEED);
// Outcomes must agree before traces are compared.
assert_eq!(
result1.is_ok(),
result2.is_ok(),
"Simulation outcomes differ: run1={:?}, run2={:?}",
result1,
result2
);
assert_eq!(result2.is_ok(), result3.is_ok());
assert!(trace1.total_events > 0, "No events captured");
assert_eq!(
trace1.total_events, trace2.total_events,
"Total events differ: {} vs {}",
trace1.total_events, trace2.total_events
);
assert_eq!(trace2.total_events, trace3.total_events);
assert_eq!(trace1.event_counts, trace2.event_counts);
assert_eq!(trace2.event_counts, trace3.event_counts);
assert_eq!(trace1.event_sequence, trace2.event_sequence);
assert_eq!(trace2.event_sequence, trace3.event_sequence);
// Fingerprints condense the trace into a hash for the pass/fail log line.
let fp1 = TraceFingerprint::from_events(&trace1.event_sequence, &trace1.event_counts);
let fp2 = TraceFingerprint::from_events(&trace2.event_sequence, &trace2.event_counts);
let fp3 = TraceFingerprint::from_events(&trace3.event_sequence, &trace3.event_counts);
assert_eq!(fp1, fp2, "Fingerprint mismatch between run 1 and run 2");
assert_eq!(fp2, fp3, "Fingerprint mismatch between run 2 and run 3");
tracing::info!(
"PARALLEL-SAFE DETERMINISM PASSED: {} events, fingerprint={:#018x}",
trace1.total_events,
fp1.sequence_hash
);
}
/// Verifies that two seeded simulations can run on separate OS threads
/// without corrupting each other's global state. This is an isolation smoke
/// test: it only asserts both threads produce events, not trace equality.
#[test_log::test]
fn test_determinism_across_threads() {
const SEED: u64 = 0xCE05_7EAD_0001;
// Spawns one full seeded simulation on its own thread and returns the
// number of captured events via the JoinHandle.
fn run_on_thread(name: &'static str, seed: u64) -> std::thread::JoinHandle<usize> {
std::thread::spawn(move || {
// Per-thread reset of the simulation globals. NOTE(review): this
// mirrors setup_deterministic_state(); keep the two in sync.
GlobalRng::set_seed(seed);
// 2020-01-01T00:00:00Z, in milliseconds.
const BASE_EPOCH_MS: u64 = 1577836800000;
// Seed-derived offset spreads start times over a 5-year window.
const RANGE_MS: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
GlobalSimulationTime::set_time_ms(BASE_EPOCH_MS + (seed % RANGE_MS));
GlobalTestMetrics::reset();
RequestId::reset_counter();
freenet::dev_tool::ClientId::reset_counter();
reset_event_id_counter();
reset_channel_id_counter();
StreamId::reset_counter();
reset_nonce_counter();
reset_global_node_index();
// Single-threaded runtime keeps each thread's execution order stable.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create runtime");
let (sim, logs_handle) = rt.block_on(async {
let sim = SimNetwork::new(name, 1, 3, 7, 3, 10, 2, seed).await;
let logs_handle = sim.event_logs_handle();
(sim, logs_handle)
});
// Run result is intentionally ignored: only event production matters.
let _result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
seed,
3,
10,
Duration::from_secs(20),
Duration::from_millis(200),
|| async {
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
},
);
// Return only the captured event count to the joining thread.
rt.block_on(async { logs_handle.lock().await.len() })
})
}
// Both threads run concurrently; joins collect their event counts.
let handle1 = run_on_thread("cross-thread-run1", SEED);
let handle2 = run_on_thread("cross-thread-run2", SEED);
let events1 = handle1.join().expect("Thread 1 panicked");
let events2 = handle2.join().expect("Thread 2 panicked");
assert!(events1 > 0, "Thread 1 should produce events, got 0");
assert!(events2 > 0, "Thread 2 should produce events, got 0");
tracing::info!(
"CROSS-THREAD ISOLATION PASSED: thread1={} events, thread2={} events",
events1,
events2
);
}
/// Strongest determinism check in this file: three identical seeded runs
/// through the direct (non-turmoil) runner must produce identical traces —
/// same totals, per-kind counts, ordered sequence, sorted per-event keys,
/// and fingerprint.
#[test_log::test]
fn test_direct_runner_determinism() {
const SEED: u64 = 0xD12E_C7DE_7000;
// Identity of a single event, detailed enough to catch reordering or
// payload drift; sorted before comparison so log-emission order within a
// tick doesn't matter for this particular check.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct EventKey {
tx: freenet::dev_tool::Transaction,
peer_addr: std::net::SocketAddr,
event_kind_name: String,
contract_key: Option<String>,
state_hash: Option<String>,
}
#[derive(Debug)]
struct SimulationTrace {
event_counts: HashMap<String, usize>,
event_sequence: Vec<String>,
event_keys: Vec<EventKey>,
total_events: usize,
}
// Runs one seeded direct simulation and extracts its trace.
fn run_and_trace(name: &str, seed: u64) -> SimulationTrace {
setup_deterministic_state(seed);
let rt = create_runtime();
let (sim, logs_handle) = rt.block_on(async {
// 2 gateways, 4 nodes; remaining args match SimNetwork::new.
let sim = SimNetwork::new(
name, 2, 4, 10, 3, 10, 3, seed,
)
.await;
let logs_handle = sim.event_logs_handle();
(sim, logs_handle)
});
// The direct runner drives execution itself, so the setup runtime is
// dropped first — NOTE(review): presumably to avoid an ambient tokio
// runtime during the direct run; confirm.
drop(rt);
sim.run_simulation_direct::<rand::rngs::SmallRng>(
seed,
10, 30, Duration::from_millis(200),
)
.expect("Direct simulation should succeed");
// A fresh runtime is needed afterwards just to read the async-locked log.
let rt = create_runtime();
rt.block_on(async {
let logs = logs_handle.lock().await;
let mut event_counts: HashMap<String, usize> = HashMap::new();
let mut event_sequence: Vec<String> = Vec::new();
// Single pass: builds counts and sequence as a side effect while
// mapping each log entry to its EventKey.
let mut event_keys: Vec<EventKey> = logs
.iter()
.map(|log| {
let event_kind_name = log.kind.variant_name().to_string();
let contract_key = log.kind.contract_key().map(|k| format!("{:?}", k));
let state_hash = log.kind.state_hash().map(String::from);
*event_counts.entry(event_kind_name.clone()).or_insert(0) += 1;
event_sequence.push(event_kind_name.clone());
EventKey {
tx: log.tx,
peer_addr: log.peer_id.socket_addr(),
event_kind_name,
contract_key,
state_hash,
}
})
.collect();
// Sorted so key comparison is order-insensitive.
event_keys.sort();
SimulationTrace {
total_events: logs.len(),
event_counts,
event_sequence,
event_keys,
}
})
}
let trace1 = run_and_trace("direct-det-run1", SEED);
let trace2 = run_and_trace("direct-det-run2", SEED);
let trace3 = run_and_trace("direct-det-run3", SEED);
assert!(trace1.total_events > 0, "Run 1 should produce events");
assert!(trace2.total_events > 0, "Run 2 should produce events");
assert!(trace3.total_events > 0, "Run 3 should produce events");
assert_eq!(
trace1.total_events, trace2.total_events,
"DIRECT DETERMINISM FAILURE: Total event counts differ (run1 vs run2)!"
);
assert_eq!(
trace2.total_events, trace3.total_events,
"DIRECT DETERMINISM FAILURE: Total event counts differ (run2 vs run3)!"
);
assert_eq!(
trace1.event_counts, trace2.event_counts,
"DIRECT DETERMINISM FAILURE: Event type counts differ (run1 vs run2)!"
);
assert_eq!(
trace2.event_counts, trace3.event_counts,
"DIRECT DETERMINISM FAILURE: Event type counts differ (run2 vs run3)!"
);
// Element-wise comparison pinpoints the first diverging index on failure.
for (i, ((e1, e2), e3)) in trace1
.event_sequence
.iter()
.zip(trace2.event_sequence.iter())
.zip(trace3.event_sequence.iter())
.enumerate()
{
assert_eq!(
e1, e2,
"Event sequence differs at index {} (run1 vs run2)!",
i
);
assert_eq!(
e2, e3,
"Event sequence differs at index {} (run2 vs run3)!",
i
);
}
assert_eq!(
trace1.event_keys, trace2.event_keys,
"DIRECT DETERMINISM FAILURE: Sorted event keys differ (run1 vs run2)!"
);
assert_eq!(
trace2.event_keys, trace3.event_keys,
"DIRECT DETERMINISM FAILURE: Sorted event keys differ (run2 vs run3)!"
);
let fp1 = TraceFingerprint::from_events(&trace1.event_sequence, &trace1.event_counts);
let fp2 = TraceFingerprint::from_events(&trace2.event_sequence, &trace2.event_counts);
let fp3 = TraceFingerprint::from_events(&trace3.event_sequence, &trace3.event_counts);
assert_eq!(fp1, fp2, "Fingerprint mismatch between run 1 and run 2");
assert_eq!(fp2, fp3, "Fingerprint mismatch between run 2 and run 3");
tracing::info!(
"DIRECT RUNNER DETERMINISM PASSED: {} events, fingerprint={:#018x}",
trace1.total_events,
fp1.sequence_hash
);
}
/// Regression test for PR #3005 (zombie connections after suspend/resume).
///
/// Simulates a laptop suspend cycle: crash a node, advance virtual time by
/// one hour, restart the node, and require full connectivity afterwards.
/// Fails (panics) if stale connection entries from before the "suspend"
/// block re-bootstrap.
#[test_log::test(tokio::test(flavor = "current_thread"))]
async fn test_suspend_resume_zombie_connections() {
const SEED: u64 = 0xDEAD_BEEF_3005;
const NETWORK_NAME: &str = "zombie-connections";
tracing::info!("=== Testing Zombie Connection Bug (PR #3005) ===");
// Small network: 1 gateway, 2 nodes.
let mut sim = SimNetwork::new(NETWORK_NAME, 1, 2, 7, 3, 10, 2, SEED).await;
sim.with_start_backoff(Duration::from_millis(50));
// Handles kept alive for the test's duration; never awaited directly.
let _handles = sim
.start_with_rand_gen::<rand::rngs::SmallRng>(SEED, 5, 10)
.await;
tracing::info!("Phase 1: Network startup and stabilization");
tokio::time::sleep(Duration::from_secs(2)).await;
let initial_time = sim.virtual_time().now_nanos();
tracing::info!("Initial virtual time: {}ns", initial_time);
// Baseline: the network must be fully connected before the fault.
sim.check_connectivity(Duration::from_secs(10))
.await
.expect("Initial connectivity check should pass");
tracing::info!("✓ Network connectivity established");
// Suspend target: any non-gateway node.
let node_to_suspend = sim
.all_node_addresses()
.keys()
.find(|label| label.is_node())
.cloned()
.expect("Should have at least one node");
tracing::info!(?node_to_suspend, "Phase 2: Simulating suspend (crash node)");
let crashed = sim.crash_node(&node_to_suspend);
assert!(crashed, "Node should crash successfully");
assert!(sim.is_node_crashed(&node_to_suspend));
tracing::info!("✓ Node crashed (suspend simulated)");
tracing::info!("Phase 3: Simulating time passage during suspend");
// One hour of virtual time passes "while the laptop lid is closed".
let suspend_duration = Duration::from_secs(3600);
sim.virtual_time().advance(suspend_duration);
let time_after_suspend = sim.virtual_time().now_nanos();
tracing::info!(
"✓ Advanced virtual time by {} seconds (now: {}ns)",
suspend_duration.as_secs(),
time_after_suspend
);
tokio::time::sleep(Duration::from_millis(100)).await;
tracing::info!(
?node_to_suspend,
"Phase 4: Simulating resume (restart node)"
);
// Distinct seed for the restarted node's RNG.
let restart_seed = SEED.wrapping_add(0x1000);
let handle = sim
.restart_node::<rand::rngs::SmallRng>(&node_to_suspend, restart_seed, 5, 5)
.await;
assert!(handle.is_some(), "Node should restart successfully");
assert!(!sim.is_node_crashed(&node_to_suspend));
tracing::info!("✓ Node restarted (resume simulated)");
tokio::time::sleep(Duration::from_secs(3)).await;
tracing::info!("Phase 5: Checking for zombie connections");
// The actual regression check: connectivity must be restorable.
let connectivity_result = sim.check_connectivity(Duration::from_secs(15)).await;
match connectivity_result {
Ok(()) => {
tracing::info!("✓ Connectivity restored after restart");
tracing::info!(" FIX WORKING: No zombie connections blocking reconnection");
tracing::info!(" (or DropAllConnections was called on resume)");
}
Err(e) => {
// Extensive diagnostics before failing: this is the bug #3005 guarded
// against, so the logs explain the suspected mechanism.
tracing::error!("✗ Connectivity check failed after restart: {}", e);
tracing::error!("BUG DETECTED: Zombie connections blocking reconnection");
tracing::error!("");
tracing::error!("Root cause analysis:");
tracing::error!(" - Node crashed without proper cleanup");
tracing::error!(" - Old connection HashMap entries still exist");
tracing::error!(" - CONNECT messages sent through dead transport sockets");
tracing::error!(" - Gateway doesn't respond (socket handle invalid)");
tracing::error!(" - Bootstrap hangs waiting for response");
tracing::error!("");
tracing::error!("Expected fix: Call DropAllConnections in ops_after_resume()");
tracing::error!("See PR #3005 for details");
panic!("Zombie connection bug detected! Connectivity failed after node restart");
}
}
tracing::info!("=== Zombie Connection Test Complete ===");
}
/// Nightly-only scale test for the PR #2995/#3146 fix: with 250 contracts on
/// a tiny network, subscription-renewal rate limiting must keep the
/// notification channel from saturating (zero "channel full" warnings).
#[test_log::test]
#[cfg(feature = "nightly_tests")]
fn test_subscription_renewal_at_scale() {
const SEED: u64 = 0x2995_CAFE_BABE;
tracing::info!("=== Subscription Renewal at Scale (250+ contracts) ===");
let start_time = std::time::Instant::now();
// Built literally (not via the small()/medium() presets) because the
// contract count and settle window are far outside the preset ranges.
let config = TestConfig {
name: "subscription-renewal-scale",
seed: SEED,
gateways: 1,
nodes: 2,
ring_max_htl: 7,
rnd_if_htl_above: 3,
max_connections: 10,
min_connections: 2,
max_contracts: 250, iterations: 300, duration: Duration::from_secs(120), event_wait: Duration::from_millis(200),
sleep_after_events: Duration::from_secs(40), require_convergence: false,
latency_range: None,
message_loss_rate: 0.0,
use_mock_wasm: false,
};
let max_contracts = config.max_contracts;
tracing::info!(
"Testing with {} contracts, {} operations",
max_contracts,
config.iterations
);
let result = config.run();
let rt = create_runtime();
let logs = rt.block_on(async { result.logs_handle.lock().await.clone() });
// NOTE(review): detection below relies on substring matches against the
// Debug representation of log entries — fragile if log wording changes;
// prefer structured event kinds if/when available.
let mut renewal_attempts = 0;
let mut channel_warnings = 0;
for log in &logs {
let msg = format!("{:?}", log);
if msg.contains("Subscription renewal: attempted") {
renewal_attempts += 1;
}
if msg.contains("Notification channel") && msg.contains("full") {
channel_warnings += 1;
tracing::error!("Channel saturation detected: {}", msg);
}
}
let wall_clock = start_time.elapsed();
tracing::info!(
"Completed in {:.1}s: {} renewal cycles, {} channel warnings",
wall_clock.as_secs_f64(),
renewal_attempts,
channel_warnings
);
result.assert_ok();
tracing::info!("Subscription renewal cycles observed: {}", renewal_attempts);
// The actual regression guard: any saturation warning is a failure.
assert_eq!(
channel_warnings, 0,
"Channel saturation detected - regression in PR #2995/#3146 fix! \
The subscription renewal rate-limiting may not be working correctly."
);
tracing::info!(
"✓ No channel saturation with {} contracts (PR #2995/#3146 fix working)",
max_contracts
);
}
/// Verifies that completed operations feed back into the router: after a
/// small simulated workload, the event log must contain at least one
/// "Route" event (i.e. `routing_finished` was called at least once).
#[test_log::test]
fn test_router_accumulates_feedback_events() {
    let result = TestConfig::small("router-feedback", 0x0FEE_DBAC_0001)
        .with_nodes(4)
        .with_max_contracts(5)
        .with_iterations(50)
        .with_duration(Duration::from_secs(60))
        .with_sleep(Duration::from_secs(2))
        .run()
        .assert_ok();
    let rt = create_runtime();
    // Tally Route events per peer in a single pass; the grand total is the
    // sum of the per-peer counts, since each Route event is attributed to
    // exactly one peer.
    let (route_count, route_by_peer) = rt.block_on(async {
        let logs = result.logs_handle.lock().await;
        let mut by_peer: HashMap<String, usize> = HashMap::new();
        for log in logs.iter() {
            if log.kind.variant_name() == "Route" {
                *by_peer
                    .entry(log.peer_id.socket_addr().to_string())
                    .or_default() += 1;
            }
        }
        let total: usize = by_peer.values().sum();
        (total, by_peer)
    });
    tracing::info!("=== ROUTER FEEDBACK TEST ===");
    tracing::info!(
        "Total Route events (routing_finished calls): {}",
        route_count
    );
    for (peer, count) in &route_by_peer {
        tracing::info!(" peer {}: {} route events", peer, count);
    }
    assert!(
        route_count > 0,
        "No Route events in logs - routing_finished never called. \
         The router is not receiving any feedback from completed operations."
    );
    tracing::info!(
        "ROUTER FEEDBACK TEST PASSED: {} route events across {} peers",
        route_count,
        route_by_peer.len()
    );
}
/// Same router-feedback check as the test above, but with 15% injected
/// message loss: even under faults the router must still receive Route
/// feedback events. Timeout events are counted and reported as well.
#[test_log::test]
fn test_router_accumulates_failure_events_with_message_loss() {
    let result = TestConfig::small("router-faults", 0xFA17_0055_0001)
        .with_nodes(4)
        .with_max_contracts(5)
        .with_iterations(60)
        .with_duration(Duration::from_secs(60))
        .with_sleep(Duration::from_secs(2))
        .with_message_loss(0.15)
        .run()
        .assert_ok();
    let rt = create_runtime();
    // Single pass over the log, tallying Route and Timeout events together.
    let (route_count, timeout_count) = rt.block_on(async {
        let logs = result.logs_handle.lock().await;
        let mut routes = 0usize;
        let mut timeouts = 0usize;
        for log in logs.iter() {
            match log.kind.variant_name() {
                "Route" => routes += 1,
                "Timeout" => timeouts += 1,
                _ => {}
            }
        }
        (routes, timeouts)
    });
    tracing::info!("=== ROUTER FAULT INJECTION TEST ===");
    tracing::info!("Route events (routing_finished): {}", route_count);
    tracing::info!("Timeout events: {}", timeout_count);
    assert!(
        route_count > 0,
        "No Route events with 15% message loss - router not receiving any feedback"
    );
    tracing::info!(
        "ROUTER FAULT TEST PASSED: route_events={}, timeouts={}",
        route_count,
        timeout_count
    );
}
/// Drives a large event volume through a tiny 2-node network so the router's
/// feedback accumulators should cross the prediction activation threshold
/// (50 events). Hard-asserts that enough Route feedback flowed at all;
/// whether prediction actually activated is reported but not asserted,
/// since activation timing is load-dependent.
#[test_log::test]
fn test_router_prediction_threshold_activation() {
    // Event volume for the run. Named so the failure message below always
    // reflects the configured value (it previously claimed "300 iterations"
    // while the config ran 1500).
    const ITERATIONS: usize = 1500;
    let result = TestConfig::small("router-threshold", 0xACE5_0FD0_0001)
        .with_nodes(2)
        .with_max_contracts(6)
        .with_iterations(ITERATIONS)
        .with_event_wait(Duration::from_millis(500))
        .with_duration(Duration::from_secs(900))
        .with_sleep(Duration::from_secs(2))
        .run()
        .assert_ok();
    let rt = create_runtime();
    // Count Route events overall and per peer from the captured event log.
    let (route_count, route_by_peer) = rt.block_on(async {
        let logs = result.logs_handle.lock().await;
        let route_count = logs
            .iter()
            .filter(|log| log.kind.variant_name() == "Route")
            .count();
        let mut by_peer: HashMap<String, usize> = HashMap::new();
        for log in logs.iter().filter(|l| l.kind.variant_name() == "Route") {
            *by_peer
                .entry(format!("{}", log.peer_id.socket_addr()))
                .or_default() += 1;
        }
        (route_count, by_peer)
    });
    tracing::info!("=== ROUTER THRESHOLD ACTIVATION TEST ===");
    tracing::info!("Total Route events: {}", route_count);
    for (peer, count) in &route_by_peer {
        tracing::info!(" peer {}: {} route events", peer, count);
    }
    // RouterSnapshot telemetry tuples: (failure_events, success_events,
    // prediction_active).
    let snapshots = result.router_snapshots();
    tracing::info!("RouterSnapshot events captured: {}", snapshots.len());
    for (i, (failure, success, active)) in snapshots.iter().enumerate() {
        tracing::info!(
            " snapshot[{}]: failure_events={}, success_events={}, prediction_active={}",
            i,
            failure,
            success,
            active
        );
    }
    let max_per_peer = route_by_peer.values().max().copied().unwrap_or(0);
    tracing::info!("Max route events on a single peer: {}", max_per_peer);
    // Hard requirement: a meaningful amount of feedback must reach the router.
    assert!(
        route_count >= 50,
        "Only {} total Route events from {} iterations - \
         not enough feedback flowing to the router",
        route_count,
        ITERATIONS
    );
    // Soft check: report whether prediction activated. A non-activated run is
    // logged rather than failed because snapshots may be captured before all
    // operations complete.
    if !snapshots.is_empty() {
        let any_active = snapshots.iter().any(|(_, _, active)| *active);
        let max_failure_events = snapshots
            .iter()
            .map(|(failure, _, _)| *failure)
            .max()
            .unwrap();
        if any_active {
            tracing::info!(
                "ROUTER THRESHOLD TEST PASSED: prediction activated with {} failure_events (threshold=50)",
                max_failure_events
            );
        } else {
            tracing::info!(
                "RouterSnapshot shows failure_events={} (threshold=50). \
                 Route events per peer: {:?}. \
                 Note: Route events may not all be reflected in failure_events \
                 if the snapshot was captured before all operations completed.",
                max_failure_events,
                route_by_peer
            );
        }
    }
    tracing::info!(
        "ROUTER THRESHOLD TEST PASSED: {} total route events, max {} per peer (threshold=50)",
        route_count,
        max_per_peer
    );
}
/// Guards against regression of #3100: the `pending_op_results` map must stay
/// bounded during a medium-sized simulation (high-water mark <= 100) and must
/// not leak more than a handful of entries at shutdown.
#[test]
#[cfg(feature = "simulation_tests")]
fn test_pending_op_results_bounded() {
    let result = TestConfig::medium("pending-op-bounded", 0x3100_0001).run();
    result.assert_ok().verify_state_report();
    // GlobalTestMetrics is already imported at the top of this file.
    let inserts = GlobalTestMetrics::pending_op_inserts();
    let removes = GlobalTestMetrics::pending_op_removes();
    let hwm = GlobalTestMetrics::pending_op_high_water_mark();
    tracing::info!(inserts, removes, hwm, "pending_op_results resource metrics");
    // Zero inserts means the code path never ran; that is expected for now.
    if inserts == 0 {
        tracing::info!(
            "pending_op_results path not exercised (no production OpCtx caller yet — see #1454 phase 2b)"
        );
    }
    assert!(
        hwm <= 100,
        "pending_op_results high-water mark ({hwm}) exceeded bound of 100 — \
         regression of #3100 (unbounded HashMap growth)"
    );
    // Entries inserted but never removed indicate a leak at shutdown.
    let leak = inserts.saturating_sub(removes);
    assert!(
        leak <= 10,
        "pending_op_results leak at shutdown: {leak} entries \
         (inserts={inserts}, removes={removes}) — significant leak detected"
    );
}
/// Checks that neighbor-hosting cache churn stays within a sane bound for a
/// medium network, and that the code path was exercised at all.
#[test]
#[cfg(feature = "simulation_tests")]
fn test_neighbor_hosting_bounded() {
    let result = TestConfig::medium("neighbor-hosting-bounded", 0x3100_0002).run();
    result.assert_ok().verify_state_report();
    // GlobalTestMetrics is already imported at the top of this file.
    let updates = GlobalTestMetrics::neighbor_hosting_updates();
    tracing::info!(updates, "neighbor hosting resource metrics");
    // Zero updates would mean the feature never ran, not that it behaved well.
    assert!(
        updates > 0,
        "Neighbor hosting should have been exercised (updates = 0)"
    );
    assert!(
        updates <= 500,
        "neighbor_hosting_updates ({updates}) exceeded bound of 500 — \
         excessive cache churn for an 8-peer / 8-contract network"
    );
}
/// Runs a heavier medium simulation and records how often the anti-starvation
/// path triggered, then checks overall health and convergence.
///
/// NOTE(review): `triggers` is only logged, never asserted on — presumably
/// because trigger counts are load-dependent; confirm whether a lower bound
/// should be enforced here.
#[test]
#[cfg(feature = "simulation_tests")]
fn test_anti_starvation_exercised() {
    let result = TestConfig::medium("anti-starvation", 0x3094_0002)
        .with_iterations(150)
        .with_max_contracts(10)
        .run();
    let triggers = freenet::config::GlobalTestMetrics::anti_starvation_triggers();
    tracing::info!(triggers, "anti-starvation trigger count");
    result.assert_ok().verify_state_report().check_convergence();
}
/// Scans the captured event logs for peers reporting stored state for
/// `contract_key`, asserting that (a) at least `min_peers` peers hold state
/// and (b) every peer holds the identical state hash.
///
/// Returns the peer-address → state-hash map for further inspection.
fn verify_contract_propagation(
    rt: &tokio::runtime::Runtime,
    logs_handle: &Arc<Mutex<Vec<freenet::tracing::NetLogMessage>>>,
    contract_key: freenet_stdlib::prelude::ContractKey,
    min_peers: usize,
) -> BTreeMap<SocketAddr, String> {
    let contract_key_str = format!("{:?}", contract_key);
    // Build peer → latest-hash map. Collecting into a BTreeMap lets a later
    // log entry for the same peer overwrite an earlier one (last write wins),
    // matching repeated inserts.
    let peer_states: BTreeMap<SocketAddr, String> = rt.block_on(async {
        let logs = logs_handle.lock().await;
        logs.iter()
            .filter(|log| {
                log.kind
                    .contract_key()
                    .map(|key| format!("{:?}", key) == contract_key_str)
                    .unwrap_or(false)
            })
            .filter_map(|log| {
                log.kind
                    .stored_state_hash()
                    .map(|hash| (log.peer_id.socket_addr(), hash.to_string()))
            })
            .collect()
    });
    tracing::info!(
        "Contract {} has state on {} peers (need {}): {:?}",
        contract_key_str,
        peer_states.len(),
        min_peers,
        peer_states.keys().collect::<Vec<_>>()
    );
    // Propagation check: enough peers must hold some state.
    assert!(
        peer_states.len() >= min_peers,
        "Contract {} propagation failed: only {} peer(s) have state, expected at least {}. \
        Peers with state: {:?}",
        contract_key_str,
        peer_states.len(),
        min_peers,
        peer_states.keys().collect::<Vec<_>>()
    );
    // Convergence check: all peers must report the exact same hash.
    let distinct: std::collections::HashSet<&String> = peer_states.values().collect();
    assert!(
        distinct.len() == 1,
        "Contract {} state divergence: {} unique states across {} peers. States: {:?}",
        contract_key_str,
        distinct.len(),
        peer_states.len(),
        peer_states
    );
    peer_states
}
/// End-to-end lifecycle test on a 6-peer (2 gateway / 4 node) network: two
/// CRDT contracts are PUT with subscribe, every other peer subscribes, and
/// updates are issued from both the publisher gateway and a regular node.
/// Afterwards each contract's state must be present and identical on at
/// least 4 peers.
#[test_log::test]
fn test_six_peer_contract_lifecycle() {
    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation, register_crdt_contract};
    const SEED: u64 = 0x3151_0001_0001;
    const NETWORK_NAME: &str = "six-peer-lifecycle";
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(NETWORK_NAME, 2, 4, 10, 5, 15, 3, SEED).await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    // Two independent contracts, each seeded with a distinct byte so their
    // states cannot collide.
    let contract_a = SimOperation::create_test_contract(0xA1);
    let contract_a_id = *contract_a.key().id();
    let contract_a_key = contract_a.key();
    register_crdt_contract(contract_a_id);
    let contract_b = SimOperation::create_test_contract(0xB2);
    let contract_b_id = *contract_b.key().id();
    let contract_b_key = contract_b.key();
    register_crdt_contract(contract_b_id);
    // Same operation schedule for both contracts.
    let mut operations = Vec::new();
    for (contract, contract_id, contract_key, seed_byte) in [
        (contract_a.clone(), contract_a_id, contract_a_key, 0xA1u8),
        (contract_b.clone(), contract_b_id, contract_b_key, 0xB2u8),
    ] {
        // Gateway 0 publishes and subscribes in a single step.
        operations.push(ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Put {
                contract: contract.clone(),
                state: SimOperation::create_crdt_state(1, seed_byte),
                subscribe: true,
            },
        ));
        // The other gateway and all regular nodes subscribe.
        operations.push(ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 1),
            SimOperation::Subscribe { contract_id },
        ));
        for i in 2..=5 {
            operations.push(ScheduledOperation::new(
                NodeLabel::node(NETWORK_NAME, i),
                SimOperation::Subscribe { contract_id },
            ));
        }
        // Updates from the publisher (gateway 0)...
        for v in [10u64, 20, 30] {
            operations.push(ScheduledOperation::new(
                NodeLabel::gateway(NETWORK_NAME, 0),
                SimOperation::Update {
                    key: contract_key,
                    data: SimOperation::create_crdt_state(v, seed_byte),
                },
            ));
        }
        // ...and from a non-publishing subscriber (node 2).
        for v in [40u64, 50] {
            operations.push(ScheduledOperation::new(
                NodeLabel::node(NETWORK_NAME, 2),
                SimOperation::Update {
                    key: contract_key,
                    data: SimOperation::create_crdt_state(v, seed_byte),
                },
            ));
        }
    }
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(240),
        Duration::from_secs(90),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "Six-peer lifecycle simulation failed: {:?}",
        result.turmoil_result.err()
    );
    // Each contract must have converged to one identical state on >= 4 peers.
    let states_a = verify_contract_propagation(&rt, &logs_handle, contract_a_key, 4);
    let states_b = verify_contract_propagation(&rt, &logs_handle, contract_b_key, 4);
    tracing::info!(
        "test_six_peer_contract_lifecycle PASSED: contract_a on {} peers, contract_b on {} peers",
        states_a.len(),
        states_b.len()
    );
}
/// Same 6-peer lifecycle shape as `test_six_peer_contract_lifecycle`, but
/// with a single contract and the mock WASM backend enabled
/// (`sim.use_mock_wasm`), exercising the non-real-WASM execution path.
#[test_log::test]
fn test_six_peer_lifecycle_mock_wasm() {
    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation, register_crdt_contract};
    const SEED: u64 = 0x3151_0002_0001;
    const NETWORK_NAME: &str = "six-peer-mock-wasm";
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (mut sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(NETWORK_NAME, 2, 4, 10, 5, 15, 3, SEED).await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    // Switch contract execution to the mock WASM backend.
    sim.use_mock_wasm = true;
    let contract = SimOperation::create_test_contract(0xC3);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    register_crdt_contract(contract_id);
    // Gateway 0 publishes (and self-subscribes); everyone else subscribes.
    let mut operations = vec![ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: SimOperation::create_crdt_state(1, 0xC3),
            subscribe: true,
        },
    )];
    operations.push(ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 1),
        SimOperation::Subscribe { contract_id },
    ));
    for i in 2..=5 {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Subscribe { contract_id },
        ));
    }
    // Three updates from the publisher.
    for v in [10u64, 20, 30] {
        operations.push(ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Update {
                key: contract_key,
                data: SimOperation::create_crdt_state(v, 0xC3),
            },
        ));
    }
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(240),
        Duration::from_secs(90),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "MockWasm lifecycle simulation failed: {:?}",
        result.turmoil_result.err()
    );
    // State must be identical on at least 4 of the 6 peers.
    verify_contract_propagation(&rt, &logs_handle, contract_key, 4);
    tracing::info!("test_six_peer_lifecycle_mock_wasm PASSED");
}
/// Late-joiner scenario: node 3 subscribes only after the contract has
/// already received two updates. The scheduled operation order places node
/// 3's Subscribe after updates 10 and 20, so its storage must be populated
/// via the subscribe-after-updates fetch path.
#[test_log::test]
fn test_late_joiner_receives_current_state() {
    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation, register_crdt_contract};
    const SEED: u64 = 0x3151_0003_0001;
    const NETWORK_NAME: &str = "late-joiner";
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        let sim = SimNetwork::new(NETWORK_NAME, 1, 3, 7, 3, 10, 2, SEED).await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0xD4);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    register_crdt_contract(contract_id);
    let operations = vec![
        // Publish + self-subscribe on the gateway.
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Put {
                contract: contract.clone(),
                state: SimOperation::create_crdt_state(1, 0xD4),
                subscribe: true,
            },
        ),
        // Early subscribers: nodes 1 and 2.
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 1),
            SimOperation::Subscribe { contract_id },
        ),
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 2),
            SimOperation::Subscribe { contract_id },
        ),
        // Two updates land before node 3 joins.
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Update {
                key: contract_key,
                data: SimOperation::create_crdt_state(10, 0xD4),
            },
        ),
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Update {
                key: contract_key,
                data: SimOperation::create_crdt_state(20, 0xD4),
            },
        ),
        // The late joiner subscribes here.
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, 3),
            SimOperation::Subscribe { contract_id },
        ),
        // One final update after the late join.
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Update {
                key: contract_key,
                data: SimOperation::create_crdt_state(30, 0xD4),
            },
        ),
    ];
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(180),
        Duration::from_secs(90),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "Late joiner simulation failed: {:?}",
        result.turmoil_result.err()
    );
    let peer_states = verify_contract_propagation(&rt, &logs_handle, contract_key, 3);
    // The late joiner specifically must have ended up with stored state.
    let node3_label = NodeLabel::node(NETWORK_NAME, 3);
    let node3_storage = result
        .node_storages
        .get(&node3_label)
        .expect("node 3 should have a storage handle");
    let node3_state = node3_storage.get_stored_state(&contract_key);
    assert!(
        node3_state.is_some(),
        "Late joiner (node 3) should have contract state, but storage is empty. \
        This indicates the subscribe-after-updates path failed to fetch current state."
    );
    tracing::info!(
        "test_late_joiner_receives_current_state PASSED: {} peers converged, \
        late joiner confirmed with state",
        peer_states.len()
    );
}
/// Nightly-only long-running check that subscriptions survive renewal across
/// an extended simulated run; mock WASM keeps execution cheap.
#[test_log::test]
#[cfg(feature = "nightly_tests")]
fn test_subscribe_renewal_long_running() {
    const SEED: u64 = 0x3151_0004_0001;
    tracing::info!("=== Starting Subscription Renewal Long-Running Test ===");
    TestConfig::long_running("sub-renewal-long", SEED)
        .with_mock_wasm()
        .run_direct()
        .assert_ok()
        .verify_operation_coverage()
        .verify_state_report();
    tracing::info!("test_subscribe_renewal_long_running PASSED");
}
/// Measures the subscribe success rate across a medium simulation by scanning
/// `subscribe_outcome` events in the logs; fails below a 70% success rate
/// (see #3138 for prior regressions).
#[test_log::test]
fn test_topology_subscribe_health() {
    const SEED: u64 = 0x3152_0001_0001;
    tracing::info!("=== Starting Topology Subscribe Health Test ===");
    let result = TestConfig::medium("sub-health", SEED)
        .with_iterations(100)
        .with_duration(Duration::from_secs(60))
        .run()
        .assert_ok();
    let rt = create_runtime();
    // Tally outcomes: Some(true) = success, Some(false) = failure,
    // None = event unrelated to subscribing.
    let (successes, failures) = rt.block_on(async {
        let logs = result.logs_handle.lock().await;
        let mut successes = 0u64;
        let mut failures = 0u64;
        for log in logs.iter() {
            match log.kind.subscribe_outcome() {
                Some(true) => successes += 1,
                Some(false) => failures += 1,
                None => {}
            }
        }
        (successes, failures)
    });
    let total = successes + failures;
    // Require a minimal sample size before computing a rate.
    assert!(
        total >= 5,
        "Only {} subscribe outcome events in {} total events — \
        too few for meaningful success rate measurement",
        total,
        result.event_count
    );
    let success_rate = successes as f64 / total as f64;
    tracing::info!(
        "Subscribe health: {}/{} succeeded ({:.1}%)",
        successes,
        total,
        success_rate * 100.0
    );
    assert!(
        success_rate >= 0.70,
        "Subscribe success rate {:.1}% is below 70% threshold \
        ({} succeeded, {} failed out of {} total). \
        See #3138 for context on subscribe rate regressions.",
        success_rate * 100.0,
        successes,
        failures,
        total
    );
    tracing::info!(
        "test_topology_subscribe_health PASSED: {:.1}% success rate",
        success_rate * 100.0
    );
}
/// Verifies the router does not get worse with experience: the route failure
/// rate over the second half of the run must not exceed the first half by
/// more than 15 percentage points (see #3128/#3136).
#[test_log::test]
fn test_router_learning() {
    const SEED: u64 = 0x3152_0002_0001;
    tracing::info!("=== Starting Router Learning Test ===");
    let result = TestConfig::small("router-learn", SEED)
        .with_iterations(150)
        .with_duration(Duration::from_secs(200))
        .run()
        .assert_ok();
    let snapshots = result.router_snapshots();
    let any_prediction_active = snapshots.iter().any(|(_f, _s, active)| *active);
    tracing::info!(
        "Router snapshots: {} total, prediction_active in any: {}",
        snapshots.len(),
        any_prediction_active
    );
    let rt = create_runtime();
    // Chronological per-operation success/failure outcomes.
    let outcomes: Vec<bool> = rt.block_on(async {
        let logs = result.logs_handle.lock().await;
        logs.iter()
            .filter_map(|log| log.kind.route_outcome_is_success())
            .collect()
    });
    assert!(
        outcomes.len() >= 10,
        "Only {} route outcome events — too few for meaningful comparison. \
        Expected at least 10.",
        outcomes.len()
    );
    let mid = outcomes.len() / 2;
    let (first_half, second_half) = outcomes.split_at(mid);
    // Failure count and rate for one half. The empty-slice guard is
    // unreachable given the >= 10 assertion above, but the original code
    // guarded only the first half; routing both halves through the same
    // helper keeps the computation symmetric and division-free on empty
    // input by construction.
    let failure_stats = |half: &[bool]| -> (usize, f64) {
        let failures = half.iter().filter(|&&s| !s).count();
        if half.is_empty() {
            (failures, 0.0)
        } else {
            (failures, failures as f64 / half.len() as f64)
        }
    };
    let (first_failures, first_rate) = failure_stats(first_half);
    let (second_failures, second_rate) = failure_stats(second_half);
    tracing::info!(
        "Route failure rates: first half {:.1}% ({}/{}), second half {:.1}% ({}/{})",
        first_rate * 100.0,
        first_failures,
        first_half.len(),
        second_rate * 100.0,
        second_failures,
        second_half.len()
    );
    assert!(
        second_rate <= first_rate + 0.15,
        "Router is degrading: second-half failure rate ({:.1}%) exceeds \
        first-half ({:.1}%) by more than 15pp. \
        This suggests the router is not learning from feedback. See #3128/#3136.",
        second_rate * 100.0,
        first_rate * 100.0
    );
    tracing::info!("test_router_learning PASSED");
}
/// Verifies subscriptions keep delivering across interest-lease cycles
/// (#3093) by comparing UpdateBroadcastReceived counts between the first and
/// second halves of the event log.
#[test_log::test]
fn test_interest_renewal() {
    const SEED: u64 = 0x3152_0003_0001;
    tracing::info!("=== Starting Interest Renewal Test ===");
    let result = TestConfig::medium("interest-renew", SEED)
        .with_nodes(4)
        .with_iterations(400)
        .with_event_wait(Duration::from_secs(3))
        .run_direct()
        .assert_ok();
    let rt = create_runtime();
    // Split the log at its midpoint and count broadcast deliveries per half;
    // log index serves as a proxy for time.
    let (early_broadcasts, late_broadcasts) = rt.block_on(async {
        let logs = result.logs_handle.lock().await;
        let log_count = logs.len();
        let mid_index = log_count / 2;
        let mut early = 0usize;
        let mut late = 0usize;
        for (i, log) in logs.iter().enumerate() {
            if log.kind.is_update_broadcast_received() {
                if i < mid_index {
                    early += 1;
                } else {
                    late += 1;
                }
            }
        }
        (early, late)
    });
    let total = early_broadcasts + late_broadcasts;
    tracing::info!(
        "Broadcast received events: {} total (early: {}, late: {})",
        total,
        early_broadcasts,
        late_broadcasts
    );
    assert!(
        total > 0,
        "No BroadcastReceived events found in {} events — \
        simulation may not be generating updates",
        result.event_count
    );
    assert!(
        late_broadcasts > 0,
        "No BroadcastReceived events in second half of simulation. \
        Subscriptions may have expired without renewal. See #3093."
    );
    // max(1) guards the division when the early half saw no broadcasts.
    let ratio = late_broadcasts as f64 / early_broadcasts.max(1) as f64;
    tracing::info!(
        "Late/early broadcast ratio: {:.2} ({}/{})",
        ratio,
        late_broadcasts,
        early_broadcasts
    );
    assert!(
        ratio > 0.2,
        "Late broadcast ratio ({:.2}) is below 0.2 threshold — \
        subscriptions are likely degrading across lease cycles. See #3093.",
        ratio
    );
    tracing::info!(
        "test_interest_renewal PASSED: {:.2} late/early ratio",
        ratio
    );
}
/// TTL-refresh-on-broadcast regression (#3093, #3107): runs long enough to
/// cross the INTEREST_TTL boundary (virtual-time target ~1800s vs a TTL of
/// 1200s) and requires broadcasts to still be delivered in the final third of
/// the event log — i.e. the interest TTL must be refreshed on broadcast send.
#[test_log::test]
fn test_interest_ttl_refresh_on_broadcast() {
    const SEED: u64 = 0x3107_0BCA_0001;
    tracing::info!("=== Starting Interest TTL Refresh on Broadcast Test ===");
    tracing::info!("Virtual time target: 1800s (~1.5x INTEREST_TTL of 1200s)");
    let result = TestConfig::small("ttl-refresh-bcast", SEED)
        .with_gateways(1)
        .with_nodes(2)
        .with_max_contracts(2)
        .with_iterations(300)
        .with_event_wait(Duration::from_secs(6))
        .run_direct()
        .assert_ok();
    let rt = create_runtime();
    // Bucket broadcast deliveries into thirds of the log; index order is used
    // as a proxy for time.
    let (early_broadcasts, mid_broadcasts, late_broadcasts) = rt.block_on(async {
        let logs = result.logs_handle.lock().await;
        let log_count = logs.len();
        let third = log_count / 3;
        let mut early = 0usize;
        let mut mid = 0usize;
        let mut late = 0usize;
        for (i, log) in logs.iter().enumerate() {
            if log.kind.is_update_broadcast_received() {
                if i < third {
                    early += 1;
                } else if i < third * 2 {
                    mid += 1;
                } else {
                    late += 1;
                }
            }
        }
        (early, mid, late)
    });
    let total = early_broadcasts + mid_broadcasts + late_broadcasts;
    tracing::info!(
        "Broadcast received events: {} total (early: {}, mid: {}, late: {})",
        total,
        early_broadcasts,
        mid_broadcasts,
        late_broadcasts
    );
    assert!(
        total > 0,
        "No BroadcastReceived events found in {} logged events — \
        simulation may not be generating updates. Seed: 0x{:X}",
        result.event_count,
        SEED
    );
    // Hard failure: nothing delivered after the TTL boundary.
    assert!(
        late_broadcasts > 0,
        "No BroadcastReceived events in the final third of simulation \
        (after INTEREST_TTL boundary). Interest TTL is NOT being refreshed \
        on broadcast send — subscriptions have silently expired. \
        See #3093, #3107. Seed: 0x{:X}",
        SEED
    );
    // Soft failure: delivery rate collapsed (max(1) guards the division).
    let late_ratio = late_broadcasts as f64 / early_broadcasts.max(1) as f64;
    tracing::info!(
        "Late/early broadcast ratio: {:.2} ({}/{})",
        late_ratio,
        late_broadcasts,
        early_broadcasts
    );
    assert!(
        late_ratio > 0.1,
        "Late broadcast ratio ({:.2}) dropped below 0.1 — \
        interest TTL refresh may be partially broken. \
        Early: {}, Mid: {}, Late: {}. See #3093, #3107. Seed: 0x{:X}",
        late_ratio,
        early_broadcasts,
        mid_broadcasts,
        late_broadcasts,
        SEED
    );
    tracing::info!(
        "test_interest_ttl_refresh_on_broadcast PASSED: late/early ratio {:.2}, \
        total broadcasts: {} (early: {}, mid: {}, late: {})",
        late_ratio,
        total,
        early_broadcasts,
        mid_broadcasts,
        late_broadcasts
    );
}
/// Drives the simulated network forward by `duration` of virtual time in
/// 100 ms increments, yielding to the runtime (plus a short 10 ms real-time
/// sleep) after each step so node tasks can make progress.
async fn let_network_run(sim: &mut SimNetwork, duration: Duration) {
    const STEP_MS: u128 = 100;
    // Number of 100 ms steps needed to cover `duration`, rounded up — the
    // same count the original `while elapsed < duration` loop performed.
    let steps = (duration.as_millis() + STEP_MS - 1) / STEP_MS;
    for _ in 0..steps {
        sim.advance_time(Duration::from_millis(STEP_MS as u64));
        tokio::task::yield_now().await;
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
/// Thundering-herd regression (#3207 / PR #3208): crash and restart the sole
/// gateway of a 1-gateway / 20-node network, then verify the reconnection
/// storm is absorbed and the network recovers to >= 80% connectivity.
#[test_log::test(tokio::test(flavor = "current_thread"))]
async fn test_thundering_herd_connect_storm() {
    const SEED: u64 = 0x3207_0000_3208;
    const NETWORK_NAME: &str = "thundering-herd-connect";
    tracing::info!("=== Thundering Herd CONNECT Storm Test (Issue #3207, PR #3208) ===");
    let mut sim = SimNetwork::new(NETWORK_NAME, 1, 20, 7, 3, 10, 2, SEED).await;
    sim.with_start_backoff(Duration::from_millis(50));
    let _handles = sim
        .start_with_rand_gen::<rand::rngs::SmallRng>(SEED, 5, 10)
        .await;
    let logs_handle = sim.event_logs_handle();
    tracing::info!("Phase 1: Stabilizing network (5s)");
    let_network_run(&mut sim, Duration::from_secs(5)).await;
    sim.check_partial_connectivity(Duration::from_secs(20), 0.8)
        .await
        .expect("Network should reach ≥80% connectivity before crash");
    tracing::info!("Phase 1 complete: network connected");
    // Record the baseline Connect-event count so the storm delta can be
    // computed after the restart.
    let baseline_connects = {
        let logs = logs_handle.lock().await;
        logs.iter().filter(|log| log.kind.is_connect()).count()
    };
    tracing::info!("Baseline Connect events: {}", baseline_connects);
    let gateway_label = sim
        .all_node_addresses()
        .keys()
        .find(|label| label.is_gateway())
        .cloned()
        .expect("Should have a gateway");
    tracing::info!(?gateway_label, "Phase 2: Crashing gateway");
    let crashed = sim.crash_node(&gateway_label);
    assert!(crashed, "Gateway should crash successfully");
    tracing::info!("Phase 3: Peers detecting dead gateway (10s)");
    let_network_run(&mut sim, Duration::from_secs(10)).await;
    tracing::info!("Phase 4: Restarting gateway (thundering herd begins)");
    // Restart with an offset seed (presumably to avoid replaying the original
    // RNG stream — confirm against restart_node's contract).
    let restart_seed = SEED.wrapping_add(0x1000);
    let handle = sim
        .restart_node::<rand::rngs::SmallRng>(&gateway_label, restart_seed, 5, 5)
        .await;
    assert!(handle.is_some(), "Gateway should restart successfully");
    tracing::info!("Phase 5: Reconnection storm playing out (30s)");
    let_network_run(&mut sim, Duration::from_secs(30)).await;
    tracing::info!("Phase 6: Verifying network recovery");
    let final_connects = {
        let logs = logs_handle.lock().await;
        logs.iter().filter(|log| log.kind.is_connect()).count()
    };
    let storm_connects = final_connects - baseline_connects;
    tracing::info!(
        "Connect events after restart: {} (baseline: {}, storm: {})",
        final_connects,
        baseline_connects,
        storm_connects
    );
    sim.check_partial_connectivity(Duration::from_secs(20), 0.8)
        .await
        .expect(
            "Network should recover to ≥80% connectivity after gateway restart. \
            The rate limiter should throttle but not prevent reconnection.",
        );
    tracing::info!(
        "test_thundering_herd_connect_storm PASSED: {} Connect events after restart, \
        network recovered to ≥80% connectivity",
        storm_connects
    );
}
/// Partitions one non-gateway node away from the entire network, heals the
/// partition, and verifies the node re-bootstraps via the gateway: no
/// contracts may remain diverged afterwards.
#[test_log::test]
fn test_isolated_node_rebootstraps_via_gateway() {
    use freenet::dev_tool::NodeLabel;
    use freenet::simulation::Partition;
    const SEED: u64 = 0x3219_B007_0001;
    const NETWORK_NAME: &str = "isolated-rebootstrap";
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle, node_addrs) = rt.block_on(async {
        let sim = SimNetwork::new(
            NETWORK_NAME,
            1, 4, 7, 3, 10, 2, SEED,
        )
        .await;
        let logs_handle = sim.event_logs_handle();
        let node_addrs: HashMap<NodeLabel, std::net::SocketAddr> = sim.all_node_addresses().clone();
        (sim, logs_handle, node_addrs)
    });
    // Victim: any non-gateway node.
    let (isolated_label, isolated_addr) = node_addrs
        .iter()
        .find(|(label, _)| label.is_node())
        .map(|(l, a)| (l.clone(), *a))
        .expect("Need at least one non-gateway node");
    // side_a = the isolated node; side_b = everything else.
    let side_a: std::collections::HashSet<_> = [isolated_addr].into_iter().collect();
    let side_b: std::collections::HashSet<_> = node_addrs
        .iter()
        .filter(|(label, _)| **label != isolated_label)
        .map(|(_, addr)| *addr)
        .collect();
    tracing::info!(
        isolated = %isolated_label,
        isolated_addr = %isolated_addr,
        rest_count = side_b.len(),
        "Will isolate one node from all others"
    );
    let network_name = NETWORK_NAME.to_string();
    let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
        SEED,
        3, 80, Duration::from_secs(120), Duration::from_millis(500), move || async move {
            tracing::info!("Injecting partition: isolating one node from all others");
            if let Some(injector) = freenet::dev_tool::get_fault_injector(&network_name) {
                let mut state = injector.lock().unwrap();
                let partition = Partition::new(side_a, side_b).permanent(0);
                state.config.add_partition(partition);
            }
            tokio::time::sleep(Duration::from_secs(20)).await;
            tracing::info!("Isolation period over — node should have zero connections now");
            // Heal: drop all partitions so the node can reach the gateway again.
            if let Some(injector) = freenet::dev_tool::get_fault_injector(&network_name) {
                let mut state = injector.lock().unwrap();
                state.config.partitions.clear();
            }
            tracing::info!("Partition healed — waiting for re-bootstrap");
            tokio::time::sleep(Duration::from_secs(30)).await;
            tracing::info!("Post-heal convergence period complete");
            Ok(())
        },
    );
    assert!(
        result.is_ok(),
        "Isolated-node re-bootstrap simulation failed: {:?}",
        result.err()
    );
    // After healing, every contract must agree across peers.
    let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
    tracing::info!(
        "Isolated-rebootstrap: {} converged, {} diverged",
        convergence.converged.len(),
        convergence.diverged.len()
    );
    assert!(
        convergence.diverged.is_empty(),
        "Isolated node failed to re-bootstrap: {} contracts diverged (expected 0). \
        This indicates the gateway bootstrap fallback in connection_maintenance \
        did not recover the node after partition heal.",
        convergence.diverged.len()
    );
    // Informational anomaly report from the state verifier (not asserted on).
    let report = rt.block_on(async {
        let logs = logs_handle.lock().await;
        let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
        verifier.verify()
    });
    tracing::info!(
        "=== ISOLATED-REBOOTSTRAP ANOMALY REPORT: {} events, {} state, {} contracts, {} anomalies ===",
        report.total_events,
        report.state_events,
        report.contracts_analyzed,
        report.anomalies.len()
    );
    let divergences = report.divergences();
    let stale = report.stale_peers();
    tracing::warn!(" divergences={}, stale={}", divergences.len(), stale.len(),);
    for (i, anomaly) in report.anomalies.iter().enumerate() {
        tracing::debug!(" anomaly[{}] = {:?}", i, anomaly);
    }
}
/// Smoke test for the direct runner with node churn enabled: nodes crash and
/// recover probabilistically, and the simulation must complete without panic
/// while still producing events.
#[test]
fn test_direct_runner_churn() {
    use freenet::dev_tool::ChurnConfig;
    const SEED: u64 = 0xC1_0055_FEED;
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let mut sim = rt.block_on(async {
        SimNetwork::new(
            "test-churn",
            2, 8, 10, 5, 15, 3, SEED,
        )
        .await
    });
    // Aggressive-but-bounded churn: at most 2 nodes down at once, 5% of
    // crashes permanent, churn deferred until a 5 s warmup has passed.
    sim.with_churn(ChurnConfig {
        crash_probability: 0.20,
        tick_interval: Duration::from_millis(200),
        recovery_delay: Duration::from_millis(500),
        max_simultaneous_crashes: Some(2),
        permanent_crash_rate: 0.05,
        warmup_delay: Duration::from_secs(5),
    });
    let logs_handle = sim.event_logs_handle();
    // NOTE(review): the helper runtime is dropped before the direct runner —
    // presumably run_simulation_direct drives its own executor; confirm.
    drop(rt);
    sim.run_simulation_direct::<rand::rngs::SmallRng>(
        SEED,
        10, 30, Duration::from_millis(200),
    )
    .expect("Direct simulation with churn should complete without panic");
    let rt = create_runtime();
    let event_count = rt.block_on(async {
        let logs = logs_handle.lock().await;
        logs.len()
    });
    assert!(
        event_count > 0,
        "Simulation with churn should produce events, got 0"
    );
    tracing::info!(
        "CHURN TEST PASSED: {} events produced with node churn enabled",
        event_count
    );
}
/// Permanently partitions one victim node from every healthy peer and runs a
/// long simulation; the test asserts completion and event production, and
/// logs an informational StateVerifier anomaly report (eviction behavior
/// itself is observed through the logs, not asserted).
#[test_log::test]
fn test_unhealthy_peer_eviction() {
    use freenet::dev_tool::NodeLabel;
    use freenet::simulation::Partition;
    const SEED: u64 = 0xDEAD_BEEF_0042;
    const NETWORK_NAME: &str = "unhealthy-eviction";
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle, node_addrs) = rt.block_on(async {
        let sim = SimNetwork::new(
            NETWORK_NAME,
            1, 4, 7,
            3,
            10,
            2,
            SEED,
        )
        .await;
        let logs_handle = sim.event_logs_handle();
        let node_addrs: HashMap<NodeLabel, SocketAddr> = sim.all_node_addresses().clone();
        (sim, logs_handle, node_addrs)
    });
    // Victim: the first non-gateway node found.
    let victim_addr: SocketAddr = *node_addrs
        .iter()
        .find(|(label, _)| label.is_node())
        .expect("Need at least 1 non-gateway node")
        .1;
    let all_addrs: HashSet<_> = node_addrs.values().copied().collect();
    let victim_set: HashSet<_> = [victim_addr].into_iter().collect();
    let healthy_set: HashSet<_> = all_addrs.difference(&victim_set).copied().collect();
    let network_name = NETWORK_NAME.to_string();
    let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
        SEED,
        5, 150, Duration::from_secs(600), Duration::from_secs(2), move || async move {
            tracing::info!("Partitioning victim node: {}", victim_addr);
            if let Some(injector) = freenet::dev_tool::get_fault_injector(&network_name) {
                let mut state = injector.lock().unwrap();
                let partition = Partition::new(victim_set, healthy_set).permanent(0);
                state.config.add_partition(partition);
            }
            // The partition is never healed; it stays active for the rest of
            // the run (120 s virtual time observed here).
            tokio::time::sleep(Duration::from_secs(120)).await;
            tracing::info!("Partition active for 120s virtual time");
            Ok(())
        },
    );
    assert!(
        result.is_ok(),
        "Unhealthy-peer eviction simulation failed: {:?}",
        result.err()
    );
    let event_count = rt.block_on(async { logs_handle.lock().await.len() });
    assert!(event_count > 0, "Simulation should produce events, got 0");
    // Informational anomaly report (not asserted on).
    let report = rt.block_on(async {
        let logs = logs_handle.lock().await;
        let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
        verifier.verify()
    });
    tracing::info!(
        "=== UNHEALTHY EVICTION REPORT: {} events, {} state, {} contracts, {} anomalies ===",
        report.total_events,
        report.state_events,
        report.contracts_analyzed,
        report.anomalies.len()
    );
}
/// Smoke test for readiness gating (threshold 3) on a small 1-gateway /
/// 4-node network: the direct-runner simulation must complete without panic
/// and still emit events.
#[test]
fn test_readiness_gating_production_config() {
    const SEED: u64 = 0xBEAD_10A7_E001;
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    // block_on the constructor future directly — no wrapper async block needed.
    let mut sim = rt.block_on(SimNetwork::new(
        "test-readiness-gating",
        1, 4, 10, 5, 10, 3, SEED,
    ));
    sim.with_readiness_gating(3);
    let logs_handle = sim.event_logs_handle();
    // Release the helper runtime before handing control to the direct runner.
    drop(rt);
    sim.run_simulation_direct::<rand::rngs::SmallRng>(SEED, 5, 20, Duration::from_millis(200))
        .expect("Simulation with readiness gating should complete without panic");
    let rt = create_runtime();
    let event_count = rt.block_on(async { logs_handle.lock().await.len() });
    assert!(
        event_count > 0,
        "Simulation with readiness gating should produce events, got 0"
    );
    tracing::info!(
        "READINESS GATING TEST PASSED: {} events produced with gating(3)",
        event_count
    );
}
/// Combines readiness gating (threshold 3) with 5% random message loss and
/// checks the direct-runner simulation still completes and produces events.
#[test]
fn test_readiness_gating_with_message_loss() {
    use freenet::simulation::FaultConfig;
    const SEED: u64 = 0xBEAD_10A7_E002;
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    // block_on the constructor future directly — no wrapper async block needed.
    let mut sim = rt.block_on(SimNetwork::new(
        "test-readiness-gating-loss",
        1, 4, 10, 5, 10, 3, SEED,
    ));
    sim.with_readiness_gating(3);
    sim.with_fault_injection(FaultConfig::builder().message_loss_rate(0.05).build());
    let logs_handle = sim.event_logs_handle();
    // Release the helper runtime before handing control to the direct runner.
    drop(rt);
    sim.run_simulation_direct::<rand::rngs::SmallRng>(SEED, 5, 25, Duration::from_millis(200))
        .expect("Simulation with readiness gating + message loss should complete");
    let rt = create_runtime();
    let event_count = rt.block_on(async { logs_handle.lock().await.len() });
    assert!(
        event_count > 0,
        "Simulation with readiness gating + message loss should produce events, got 0"
    );
    tracing::info!(
        "READINESS GATING + MESSAGE LOSS TEST PASSED: {} events",
        event_count
    );
}
/// Verifies the gateway version probe issues extra CONNECT transactions
/// beyond bootstrap: after the run, the number of unique Connect transaction
/// ids must exceed 6 (~3 bootstrap + at least 4 probes).
#[test]
fn test_gateway_version_probe_fires() {
    const SEED: u64 = 0x6A7E_7AE9_0001;
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    // block_on the constructor future directly — no wrapper async block needed.
    let sim = rt.block_on(SimNetwork::new(
        "test-gw-version-probe",
        1, 3, 7, 3, 10, 2, SEED,
    ));
    let logs_handle = sim.event_logs_handle();
    // Release the helper runtime before handing control to the direct runner.
    drop(rt);
    sim.run_simulation_direct::<rand::rngs::SmallRng>(SEED, 3, 30, Duration::from_secs(1))
        .expect("Simulation should complete without panic");
    let rt = create_runtime();
    // Deduplicate Connect events by transaction id via a HashSet.
    let unique_connect_txs = rt.block_on(async {
        let logs = logs_handle.lock().await;
        logs.iter()
            .filter(|log| log.kind.variant_name() == "Connect")
            .map(|log| log.tx)
            .collect::<std::collections::HashSet<_>>()
            .len()
    });
    assert!(
        unique_connect_txs > 6,
        "Expected bootstrap (~3) + at least 4 probe CONNECTs, got {unique_connect_txs}"
    );
}
/// NAT-style partition: two non-gateway nodes cannot reach each other (both
/// can still reach everyone else). The simulation must complete — i.e. nodes
/// must not get stuck retrying the unreachable peer — while convergence and
/// anomaly data are logged for inspection (not asserted).
#[test_log::test]
fn test_connect_despite_nat_partition() {
    use freenet::dev_tool::NodeLabel;
    use freenet::simulation::Partition;
    const SEED: u64 = 0xC0DE_FA11_0001;
    const NETWORK_NAME: &str = "nat-partition";
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle, node_addrs) = rt.block_on(async {
        let sim = SimNetwork::new(
            NETWORK_NAME,
            1, 5, 7, 3, 10, 2, SEED,
        )
        .await;
        let logs_handle = sim.event_logs_handle();
        let node_addrs: HashMap<NodeLabel, std::net::SocketAddr> = sim.all_node_addresses().clone();
        (sim, logs_handle, node_addrs)
    });
    // Pick the two lowest-numbered non-gateway nodes deterministically.
    let mut non_gw_nodes: Vec<_> = node_addrs
        .iter()
        .filter(|(label, _)| label.is_node())
        .collect();
    non_gw_nodes.sort_by_key(|(label, _)| label.number());
    assert!(
        non_gw_nodes.len() >= 2,
        "Need at least 2 non-gateway nodes for partition"
    );
    let node_a_addr = *non_gw_nodes[0].1;
    let node_b_addr = *non_gw_nodes[1].1;
    // Single-node sides: only traffic between these two nodes is cut.
    let side_a: HashSet<std::net::SocketAddr> = [node_a_addr].into_iter().collect();
    let side_b: HashSet<std::net::SocketAddr> = [node_b_addr].into_iter().collect();
    let network_name = NETWORK_NAME.to_string();
    tracing::info!(
        node_a = %node_a_addr,
        node_b = %node_b_addr,
        "Will inject NAT partition between two nodes"
    );
    let result = sim.run_simulation::<rand::rngs::SmallRng, _, _>(
        SEED,
        3, 100, Duration::from_secs(120), Duration::from_millis(500), move || async move {
            if let Some(injector) = freenet::dev_tool::get_fault_injector(&network_name) {
                let mut state = injector.lock().unwrap();
                let partition = Partition::new(side_a, side_b).permanent(0);
                state.config.add_partition(partition);
                tracing::info!("NAT partition injected between two nodes");
            }
            tokio::time::sleep(Duration::from_secs(30)).await;
            tracing::info!("NAT partition test: observation period complete");
            Ok(())
        },
    );
    assert!(
        result.is_ok(),
        "NAT partition simulation failed (nodes may have gotten stuck retrying \
        unreachable acceptor): {:?}",
        result.err()
    );
    // Convergence and anomaly data are logged only; no assertions below.
    let convergence = rt.block_on(async { check_convergence_from_logs(&logs_handle).await });
    tracing::info!(
        "NAT partition: {} converged, {} diverged",
        convergence.converged.len(),
        convergence.diverged.len()
    );
    let report = rt.block_on(async {
        let logs = logs_handle.lock().await;
        let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
        verifier.verify()
    });
    tracing::info!(
        "=== NAT PARTITION ANOMALY REPORT: {} events, {} state, {} contracts, {} anomalies ===",
        report.total_events,
        report.state_events,
        report.contracts_analyzed,
        report.anomalies.len()
    );
    let divergences = report.divergences();
    let stale = report.stale_peers();
    tracing::warn!(" divergences={}, stale={}", divergences.len(), stale.len(),);
    for (i, anomaly) in report.anomalies.iter().enumerate() {
        tracing::debug!(" anomaly[{}] = {:?}", i, anomaly);
    }
}
#[test_log::test(tokio::test(flavor = "current_thread"))]
async fn test_connection_growth_stall_regression() {
// Regression test for connection-growth stalls (issues #3408/#3398/#3396/#3380).
// Phase 1: grow a 2-gateway/15-node topology fault-free for 20 virtual minutes
// and check nodes approach min_connections. Phase 2: inject 20% message loss
// (simulated NAT failures) for 1 virtual minute and check there is no
// connection death spiral.
use freenet::dev_tool::NodeLabel;
use freenet::simulation::FaultConfig;
const SEED: u64 = 0x3408_3398_0001;
const NETWORK_NAME: &str = "connection-growth-stall";
const GATEWAYS: usize = 2;
const NODES: usize = 15;
const RING_MAX_HTL: usize = 7;
const RND_IF_HTL_ABOVE: usize = 3;
const MAX_CONN: usize = 10;
const MIN_CONN: usize = 5;
tracing::info!("=== Connection Growth Stall Regression Test ===");
tracing::info!("Verifies fixes: #3408, #3398, #3396, #3380");
setup_deterministic_state(SEED);
let mut sim = SimNetwork::new(
NETWORK_NAME,
GATEWAYS,
NODES,
RING_MAX_HTL,
RND_IF_HTL_ABOVE,
MAX_CONN,
MIN_CONN,
SEED,
)
.await;
sim.with_start_backoff(Duration::from_millis(50));
let _handles = sim
.start_with_rand_gen::<rand::rngs::SmallRng>(SEED, 0, 0)
.await;
tracing::info!("Phase 1: Connection growth — 20 virtual minutes, no faults");
let_network_run(&mut sim, Duration::from_secs(1200)).await;
// Sample per-node connection counts; nodes without a connection manager
// yield None and are skipped, so num_sampled may be < NODES.
let mut node_counts: Vec<usize> = (0..NODES)
.filter_map(|i| {
let label = NodeLabel::node(NETWORK_NAME, i);
sim.connection_count(&label)
})
.collect();
node_counts.sort_unstable();
let num_sampled = node_counts.len();
assert!(num_sampled > 0, "No connection_managers available");
let median_conn = node_counts[num_sampled / 2];
let nodes_above_min = node_counts.iter().filter(|&&c| c >= MIN_CONN).count();
let fraction_above_min = nodes_above_min as f64 / num_sampled as f64;
tracing::info!("Phase 1 connection counts: {:?}", node_counts);
tracing::info!(
"Median={}, nodes at min_connections={}/{} ({:.0}%)",
median_conn,
nodes_above_min,
num_sampled,
fraction_above_min * 100.0
);
// Count non-gateway nodes holding at least one connection to another
// non-gateway peer — evidence CONNECT forwarding reaches beyond gateways.
let connectivity = sim.node_connectivity();
let mut nodes_with_peer_connections = 0usize;
for (label, (_key, conns)) in &connectivity {
if !label.is_gateway() {
let has_non_gw_conn = conns.keys().any(|peer| !peer.is_gateway());
if has_non_gw_conn {
nodes_with_peer_connections += 1;
}
}
}
tracing::info!(
"Nodes with non-gateway peer connections: {}/{}",
nodes_with_peer_connections,
NODES
);
// Phase 1 gates: median within 1 of MIN_CONN, >= 10% of sampled nodes at
// MIN_CONN, and >= 50% of nodes with non-gateway peer connections.
assert!(
median_conn >= MIN_CONN - 1,
"Connection growth stall: median={} must be >= {} (MIN_CONN - 1). \
Counts: {:?}. Seed: 0x{:X}",
median_conn,
MIN_CONN - 1,
node_counts,
SEED
);
assert!(
fraction_above_min >= 0.10,
"Too few nodes reached min_connections: {:.0}% ({}/{}) — expected >= 10%. \
Counts: {:?}. Seed: 0x{:X}",
fraction_above_min * 100.0,
nodes_above_min,
num_sampled,
node_counts,
SEED
);
let peer_conn_fraction = nodes_with_peer_connections as f64 / NODES as f64;
assert!(
peer_conn_fraction >= 0.50,
"Only {:.0}% of nodes have non-gateway peer connections (expected >= 50%). \
CONNECT forwarding is insufficient. Seed: 0x{:X}",
peer_conn_fraction * 100.0,
SEED
);
tracing::info!("Phase 2: NAT failure simulation — 20% message loss for 1 min");
sim.with_fault_injection(FaultConfig::builder().message_loss_rate(0.20).build());
let_network_run(&mut sim, Duration::from_secs(60)).await;
let mut node_counts_after: Vec<usize> = (0..NODES)
.filter_map(|i| {
let label = NodeLabel::node(NETWORK_NAME, i);
sim.connection_count(&label)
})
.collect();
node_counts_after.sort_unstable();
let median_after = node_counts_after
.get(node_counts_after.len() / 2)
.copied()
.unwrap_or(0);
let nodes_isolated = node_counts_after.iter().filter(|&&c| c == 0).count();
let fraction_isolated = nodes_isolated as f64 / NODES as f64;
tracing::info!(
"Phase 2 connection counts after NAT failures: {:?}",
node_counts_after
);
tracing::info!(
"Median={}, isolated={}/{} ({:.0}%)",
median_after,
nodes_isolated,
NODES,
fraction_isolated * 100.0
);
// Phase 2 gates: median may drop by at most 2 (floored at 2), and fewer
// than 10% of nodes may be fully isolated after the loss period.
assert!(
median_after >= MIN_CONN.saturating_sub(2).max(2),
"Death spiral: median connections after NAT failures = {} (expected >= {}). \
Counts: {:?}. Seed: 0x{:X}",
median_after,
MIN_CONN.saturating_sub(2).max(2),
node_counts_after,
SEED
);
assert!(
fraction_isolated < 0.10,
"{:.0}% of nodes isolated after NAT failures (threshold: 10%). \
Counts: {:?}. Seed: 0x{:X}",
fraction_isolated * 100.0,
node_counts_after,
SEED
);
tracing::info!(
"PASSED: Phase1[median={}, above_min={}/{}, peer_conns={}/{}] \
Phase2[median={}, isolated={}/{}]",
median_conn,
nodes_above_min,
num_sampled,
nodes_with_peer_connections,
NODES,
median_after,
nodes_isolated,
NODES
);
}
#[test_log::test]
fn test_get_succeeds_despite_readiness_gating() {
// Regression test for #3356/#3423: a GET must still succeed when readiness
// gating can never be satisfied (the gate is set above the total node
// count), exercising the fallback path instead of
// k_closest_potentially_hosting returning an empty candidate set.
use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation};
const SEED: u64 = 0xAE7F_1A00_0001;
const NETWORK_NAME: &str = "get-readiness-fallback";
GlobalTestMetrics::reset();
setup_deterministic_state(SEED);
let rt = create_runtime();
let (mut sim, logs_handle) = rt.block_on(async {
// Positional args: 1 gateway, 3 nodes, ring_max_htl=7, rnd_if_htl_above=3,
// max_connections=10, min_connections=2 (same order as the named-constant
// SimNetwork::new calls elsewhere in this file).
let sim = SimNetwork::new(
NETWORK_NAME,
1, 3, 7, 3, 10, 2, SEED,
)
.await;
let logs_handle = sim.event_logs_handle();
(sim, logs_handle)
});
// Gate at total_nodes + 1: deliberately unsatisfiable, so routing is forced
// onto the non-gated fallback for the whole run.
let total_nodes = 1 + 3; sim.with_readiness_gating(total_nodes + 1);
let contract = SimOperation::create_test_contract(0xAE);
let contract_id = *contract.key().id();
let contract_key = contract.key();
// Schedule: one PUT at the gateway, then one GET from node 1.
let operations = vec![
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: vec![1, 2, 3, 4],
subscribe: false,
},
),
ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, 1),
SimOperation::Get {
contract_id,
return_contract_code: true,
subscribe: false,
},
),
];
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(120),
Duration::from_secs(30),
);
assert!(
result.turmoil_result.is_ok(),
"GET readiness fallback simulation failed: {:?}",
result.turmoil_result.err()
);
// The GET must have materialized the contract state in node 1's storage.
let node1_label = NodeLabel::node(NETWORK_NAME, 1);
let node1_storage = result
.node_storages
.get(&node1_label)
.expect("node 1 should have a storage handle");
let node1_state = node1_storage.get_stored_state(&contract_key);
assert!(
node1_state.is_some(),
"Node 1 should have contract state after GET, but storage is empty. \
This indicates k_closest_potentially_hosting returned empty due to \
readiness gating (the bug from #3356/#3423)."
);
// Run the state verifier over the captured event log for anomaly reporting.
let rt = create_runtime();
let report = rt.block_on(async {
let logs = logs_handle.lock().await;
let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
verifier.verify()
});
tracing::info!(
"Anomaly report: {} anomalies across {} contracts",
report.anomalies.len(),
report.contracts_analyzed
);
tracing::info!(
"test_get_succeeds_despite_readiness_gating PASSED: \
node 1 has contract state, {} events analyzed",
report.total_events
);
}
#[test_log::test]
fn test_get_routing_coverage_low_htl() {
// Regression test for #3431: with a low ring_max_htl the PUT caches the
// contract at only a handful of nodes; every node must still be able to GET
// it (routing must not exhaust before reaching a caching node).
use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation, register_crdt_contract};
const SEED: u64 = 0xC0DE_B0CA_0032;
const NETWORK_NAME: &str = "get-routing-coverage";
GlobalTestMetrics::reset();
setup_deterministic_state(SEED);
let rt = create_runtime();
let num_nodes = 15;
let (sim, logs_handle) = rt.block_on(async {
// Positional args: 1 gateway, 15 nodes, ring_max_htl=3, rnd_if_htl_above=1,
// max_connections=7, min_connections=3 (same order as the named-constant
// SimNetwork::new calls elsewhere in this file).
// NOTE(review): the failure message below says "HTL=4" while ring_max_htl
// here is 3 — confirm which value is intended.
let sim = SimNetwork::new(
NETWORK_NAME,
1, num_nodes, 3, 1, 7, 3, SEED,
)
.await;
let logs_handle = sim.event_logs_handle();
(sim, logs_handle)
});
let contract = SimOperation::create_test_contract(0xC0);
let contract_id = *contract.key().id();
let contract_key = contract.key();
register_crdt_contract(contract_id);
// Schedule: PUT+subscribe at the gateway, Subscribe from nodes 1..=12, one
// UPDATE from the gateway, then a GET from every node (1..=15).
let mut operations = vec![
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: SimOperation::create_crdt_state(1, 0xC0),
subscribe: true,
},
),
];
for i in 1..=12 {
operations.push(ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, i),
SimOperation::Subscribe { contract_id },
));
}
operations.push(ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Update {
key: contract_key,
data: SimOperation::create_crdt_state(42, 0xC0),
},
));
for i in 1..=num_nodes {
operations.push(ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, i),
SimOperation::Get {
contract_id,
return_contract_code: true,
subscribe: false,
},
));
}
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(300),
Duration::from_secs(90),
);
assert!(
result.turmoil_result.is_ok(),
"Simulation failed: {:?}",
result.turmoil_result.err()
);
// Collect every node whose storage never received the contract state.
let mut nodes_without_state = Vec::new();
for i in 1..=num_nodes {
let label = NodeLabel::node(NETWORK_NAME, i);
let storage = result
.node_storages
.get(&label)
.unwrap_or_else(|| panic!("node {i} should have a storage handle"));
if storage.get_stored_state(&contract_key).is_none() {
nodes_without_state.push(format!("node-{i}"));
}
}
// Coverage must be total: a single node missing the state is a failure.
assert!(
nodes_without_state.is_empty(),
"GET routing exhaustion: {} nodes failed to get contract state \
(contract only cached at ~5 nodes due to HTL=4). \
Failed nodes: {:?}. See #3431.",
nodes_without_state.len(),
nodes_without_state
);
// Anomaly report over the captured event log.
let rt = create_runtime();
let report = rt.block_on(async {
let logs = logs_handle.lock().await;
let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
verifier.verify()
});
tracing::info!(
"test_get_routing_coverage_low_htl PASSED: {} anomalies, {} events",
report.anomalies.len(),
report.total_events
);
}
#[ignore]
#[test_log::test]
fn test_get_retry_with_alternatives_sparse_topology() {
    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation};
    const SEED: u64 = 0xBEEF_CAFE_0001;
    const NETWORK_NAME: &str = "get-retry-sparse";
    // Sparse topology: one gateway, ring_max_htl=2 so the PUT caches the
    // contract at very few nodes; GET retries must still locate it.
    let node_total = 10;
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let runtime = create_runtime();
    let (network, event_logs) = runtime.block_on(async {
        let network = SimNetwork::new(
            NETWORK_NAME,
            1, node_total, 2, 1, 6, 3, SEED,
        )
        .await;
        let event_logs = network.event_logs_handle();
        (network, event_logs)
    });
    let contract = SimOperation::create_test_contract(0xBE);
    let contract_key = contract.key();
    let contract_id = *contract.key().id();
    // One PUT at the gateway, followed by a GET from every node.
    let put_op = ScheduledOperation::new(
        NodeLabel::gateway(NETWORK_NAME, 0),
        SimOperation::Put {
            contract: contract.clone(),
            state: vec![10, 20, 30, 40],
            subscribe: false,
        },
    );
    let get_ops = (1..=node_total).map(|i| {
        ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Get {
                contract_id,
                return_contract_code: true,
                subscribe: false,
            },
        )
    });
    let operations: Vec<_> = std::iter::once(put_op).chain(get_ops).collect();
    let outcome = network.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(300),
        Duration::from_secs(90),
    );
    assert!(
        outcome.turmoil_result.is_ok(),
        "Simulation failed: {:?}",
        outcome.turmoil_result.err()
    );
    // Every node must hold the contract state in local storage afterwards.
    let missing: Vec<String> = (1..=node_total)
        .filter(|&i| {
            let label = NodeLabel::node(NETWORK_NAME, i);
            let storage = outcome
                .node_storages
                .get(&label)
                .unwrap_or_else(|| panic!("node {i} should have a storage handle"));
            storage.get_stored_state(&contract_key).is_none()
        })
        .map(|i| format!("node-{i}"))
        .collect();
    assert!(
        missing.is_empty(),
        "GET retry regression: {} of {} nodes failed to get contract state \
(contract cached at only ~2-3 nodes due to HTL=2, retry should find it). \
Failed nodes: {:?}. See PR #3444.",
        missing.len(),
        node_total,
        missing
    );
    // Run the state verifier over the event log for an anomaly summary.
    let runtime = create_runtime();
    let report = runtime.block_on(async {
        let logs = event_logs.lock().await;
        let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
        verifier.verify()
    });
    tracing::info!(
        "test_get_retry_with_alternatives_sparse_topology PASSED: \
all {} nodes got contract state, {} anomalies, {} events",
        node_total,
        report.anomalies.len(),
        report.total_events
    );
}
#[test_log::test]
fn test_auto_fetch_from_update_sender() {
// Regression test for auto-fetch (PR #3444): nodes that subscribed but
// missed the PUT (dropped by low HTL) should obtain the contract via the
// UPDATE broadcast path; at least 75% of nodes must end up with state.
use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation, register_crdt_contract};
const SEED: u64 = 0xFE7C_A100_0001;
const NETWORK_NAME: &str = "auto-fetch-update";
GlobalTestMetrics::reset();
setup_deterministic_state(SEED);
let rt = create_runtime();
let num_nodes = 8;
let (sim, logs_handle) = rt.block_on(async {
// Positional args: 1 gateway, 8 nodes, ring_max_htl=2, rnd_if_htl_above=1,
// max_connections=6, min_connections=3 (same order as the named-constant
// SimNetwork::new calls elsewhere in this file).
let sim = SimNetwork::new(
NETWORK_NAME,
1, num_nodes, 2, 1, 6, 3, SEED,
)
.await;
let logs_handle = sim.event_logs_handle();
(sim, logs_handle)
});
let contract = SimOperation::create_test_contract(0xFE);
let contract_id = *contract.key().id();
let contract_key = contract.key();
register_crdt_contract(contract_id);
// Schedule: PUT+subscribe at the gateway, Subscribe from every node, one
// UPDATE from the gateway, then a GET from every node.
let mut operations = vec![
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: SimOperation::create_crdt_state(1, 0xFE),
subscribe: true,
},
),
];
for i in 1..=num_nodes {
operations.push(ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, i),
SimOperation::Subscribe { contract_id },
));
}
operations.push(ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Update {
key: contract_key,
data: SimOperation::create_crdt_state(42, 0xFE),
},
));
for i in 1..=num_nodes {
operations.push(ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, i),
SimOperation::Get {
contract_id,
return_contract_code: true,
subscribe: false,
},
));
}
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(300), Duration::from_secs(90), );
assert!(
result.turmoil_result.is_ok(),
"Simulation failed: {:?}",
result.turmoil_result.err()
);
// Collect every node whose storage never received the contract state.
let mut nodes_without_state = Vec::new();
for i in 1..=num_nodes {
let label = NodeLabel::node(NETWORK_NAME, i);
let storage = result
.node_storages
.get(&label)
.unwrap_or_else(|| panic!("node {i} should have a storage handle"));
if storage.get_stored_state(&contract_key).is_none() {
nodes_without_state.push(format!("node-{i}"));
}
}
// Success threshold is 75% rather than total coverage (see assert below).
let success_count = num_nodes - nodes_without_state.len();
let success_rate = success_count as f64 / num_nodes as f64;
assert!(
success_rate >= 0.75,
"Auto-fetch regression: only {} of {} nodes ({:.0}%) got contract state \
after UPDATE broadcast. Without auto-fetch, nodes that didn't receive \
PUT due to low HTL would never get the contract. \
Failed nodes: {:?}. See PR #3444.",
success_count,
num_nodes,
success_rate * 100.0,
nodes_without_state
);
// Anomaly report over the captured event log.
let rt = create_runtime();
let report = rt.block_on(async {
let logs = logs_handle.lock().await;
let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
verifier.verify()
});
tracing::info!(
"test_auto_fetch_from_update_sender PASSED: \
{}/{} nodes got contract state ({:.0}%), {} anomalies, {} events",
success_count,
num_nodes,
success_rate * 100.0,
report.anomalies.len(),
report.total_events
);
}
/// Diagnostic for connection-growth plateaus: runs a 50-node network through
/// four 5-minute virtual phases, logging connection-count statistics and
/// CONNECT-pipeline event counters after each phase, then asserts that more
/// than half the nodes formed at least one connection.
#[test_log::test(tokio::test(flavor = "current_thread"))]
async fn test_connection_growth_plateau_diagnostic() {
    use freenet::dev_tool::NodeLabel;
    const SEED: u64 = 0x3555_D1A6_0002;
    const NETWORK_NAME: &str = "growth-plateau-diag";
    const GATEWAYS: usize = 2;
    const NODES: usize = 50;
    const RING_MAX_HTL: usize = 7;
    const RND_IF_HTL_ABOVE: usize = 3;
    const MAX_CONN: usize = 20;
    const MIN_CONN: usize = 10;
    tracing::info!("=== Connection Growth Plateau Diagnostic ===");
    setup_deterministic_state(SEED);
    let mut sim = SimNetwork::new(
        NETWORK_NAME,
        GATEWAYS,
        NODES,
        RING_MAX_HTL,
        RND_IF_HTL_ABOVE,
        MAX_CONN,
        MIN_CONN,
        SEED,
    )
    .await;
    sim.with_start_backoff(Duration::from_millis(50));
    let _handles = sim
        .start_with_rand_gen::<rand::rngs::SmallRng>(SEED, 0, 0)
        .await;
    let logs_handle = sim.event_logs_handle();
    // Four equal phases; labels give the cumulative virtual time.
    let phases = [
        ("5min", Duration::from_secs(300)),
        ("10min", Duration::from_secs(300)),
        ("15min", Duration::from_secs(300)),
        ("20min", Duration::from_secs(300)),
    ];
    for (phase_name, duration) in &phases {
        let_network_run(&mut sim, *duration).await;
        let mut node_counts: Vec<usize> = (0..NODES)
            .filter_map(|i| {
                let label = NodeLabel::node(NETWORK_NAME, i);
                sim.connection_count(&label)
            })
            .collect();
        node_counts.sort_unstable();
        let n = node_counts.len();
        if n == 0 {
            tracing::info!("[{}] No nodes sampled", phase_name);
            continue;
        }
        let median = node_counts[n / 2];
        let min = node_counts[0];
        let max = node_counts[n - 1];
        let above_min = node_counts.iter().filter(|&&c| c >= MIN_CONN).count();
        let zero = node_counts.iter().filter(|&&c| c == 0).count();
        let avg = node_counts.iter().sum::<usize>() as f64 / n as f64;
        // Tally all CONNECT-pipeline events in a single pass over the event
        // log (previously six separate filter/count scans, each taken while
        // holding the log Mutex).
        let (initiated, terminus_accepted, terminus_rejected, forwarded, connected, disconnected) = {
            let logs = logs_handle.lock().await;
            let mut initiated = 0usize;
            let mut terminus_accepted = 0usize;
            let mut terminus_rejected = 0usize;
            let mut forwarded = 0usize;
            let mut connected = 0usize;
            let mut disconnected = 0usize;
            for l in logs.iter() {
                if l.kind.is_connect_initiated() {
                    initiated += 1;
                }
                if l.kind.is_connect_terminus_accepted() {
                    terminus_accepted += 1;
                }
                if l.kind.is_connect_terminus_rejected() {
                    terminus_rejected += 1;
                }
                if l.kind.is_connect_forwarded() {
                    forwarded += 1;
                }
                if l.kind.is_connect_connected() {
                    connected += 1;
                }
                if l.kind.is_disconnected() {
                    disconnected += 1;
                }
            }
            (
                initiated,
                terminus_accepted,
                terminus_rejected,
                forwarded,
                connected,
                disconnected,
            )
        };
        let terminus_total = terminus_accepted + terminus_rejected;
        let terminus_accept_rate = if terminus_total > 0 {
            terminus_accepted as f64 / terminus_total as f64 * 100.0
        } else {
            0.0
        };
        tracing::info!(
            "[{}] Connections: median={}, avg={:.1}, min={}, max={}, \
above_min={}/{} ({:.0}%), zero={}",
            phase_name,
            median,
            avg,
            min,
            max,
            above_min,
            n,
            above_min as f64 / n as f64 * 100.0,
            zero,
        );
        tracing::info!(
            "[{}] CONNECT pipeline: initiated={}, forwarded={}, \
terminus_accepted={}, terminus_rejected={}, \
terminus_accept_rate={:.0}%, connected={}, disconnected={}",
            phase_name,
            initiated,
            forwarded,
            terminus_accepted,
            terminus_rejected,
            terminus_accept_rate,
            connected,
            disconnected,
        );
    }
    // Final distribution: connection count -> number of nodes at that count.
    let final_counts: Vec<usize> = (0..NODES)
        .filter_map(|i| {
            let label = NodeLabel::node(NETWORK_NAME, i);
            sim.connection_count(&label)
        })
        .collect();
    let mut histogram: BTreeMap<usize, usize> = BTreeMap::new();
    for &c in &final_counts {
        *histogram.entry(c).or_default() += 1;
    }
    tracing::info!("Final connection distribution: {:?}", histogram);
    for i in 0..GATEWAYS {
        let label = NodeLabel::gateway(NETWORK_NAME, i);
        if let Some(count) = sim.connection_count(&label) {
            tracing::info!("Gateway {} connections: {}", i, count);
        }
    }
    // Health floor: more than half the nodes must have any connection at all.
    let total_connected = final_counts.iter().filter(|&&c| c > 0).count();
    assert!(
        total_connected > NODES / 2,
        "Fewer than half the nodes have any connections: {}/{}",
        total_connected,
        NODES,
    );
}
/// GET-reliability diagnostic for #3570 on a 3-gateway/100-node network:
/// PUTs one contract at gateway 0, issues a GET from every node, then mines
/// the event log for outcome counts, latency percentiles, ForwardingAck
/// coverage, lost responses, and retry storms. Asserts at least 10 GET
/// outcomes were observed and a >= 50% GET success rate.
#[cfg(feature = "nightly_tests")]
#[test_log::test]
fn test_get_reliability_diagnostic() {
    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation};
    const SEED: u64 = 0x3570_D1A6_0001;
    const NETWORK_NAME: &str = "get-reliability-diag";
    const NUM_NODES: usize = 100;
    const NUM_GATEWAYS: usize = 3;
    GlobalTestMetrics::reset();
    setup_deterministic_state(SEED);
    let rt = create_runtime();
    let (sim, logs_handle) = rt.block_on(async {
        // Positional args: ring_max_htl=10, rnd_if_htl_above=7,
        // max_connections=12, min_connections=4 (same order as the
        // named-constant SimNetwork::new calls elsewhere in this file).
        let sim = SimNetwork::new(
            NETWORK_NAME,
            NUM_GATEWAYS,
            NUM_NODES,
            10, 7, 12, 4, SEED,
        )
        .await;
        let logs_handle = sim.event_logs_handle();
        (sim, logs_handle)
    });
    let contract = SimOperation::create_test_contract(0x35);
    let contract_id = *contract.key().id();
    let contract_key = contract.key();
    // One PUT (with subscribe) at gateway 0, then a GET from every node.
    let mut operations = vec![
        ScheduledOperation::new(
            NodeLabel::gateway(NETWORK_NAME, 0),
            SimOperation::Put {
                contract: contract.clone(),
                state: vec![0x35; 64],
                subscribe: true,
            },
        ),
    ];
    for i in 0..NUM_NODES {
        operations.push(ScheduledOperation::new(
            NodeLabel::node(NETWORK_NAME, i),
            SimOperation::Get {
                contract_id,
                return_contract_code: true,
                subscribe: false,
            },
        ));
    }
    let result = sim.run_controlled_simulation(
        SEED,
        operations,
        Duration::from_secs(600),
        Duration::from_secs(120),
    );
    assert!(
        result.turmoil_result.is_ok(),
        "Simulation failed: {:?}",
        result.turmoil_result.err()
    );
    let rt = create_runtime();
    // Classify every GET outcome: success, not_found, failure (no elapsed
    // time recorded), or timeout (elapsed >= 55s).
    let (successes, not_found, failures, timeouts, elapsed_ms_list) = rt.block_on(async {
        let logs = logs_handle.lock().await;
        let mut successes = 0u64;
        let mut not_found = 0u64;
        let mut failures = 0u64;
        let mut timeouts = 0u64;
        let mut elapsed_list = Vec::new();
        for log in logs.iter() {
            match log.kind.get_outcome() {
                Some(true) => {
                    successes += 1;
                    if let Some(ms) = log.kind.get_elapsed_ms() {
                        elapsed_list.push(ms);
                    }
                }
                Some(false) => {
                    if let Some(ms) = log.kind.get_elapsed_ms() {
                        if ms >= 55_000 {
                            timeouts += 1;
                        } else {
                            not_found += 1;
                        }
                    } else {
                        failures += 1;
                    }
                }
                None => {}
            }
        }
        (successes, not_found, failures, timeouts, elapsed_list)
    });
    let total_outcomes = successes + not_found + failures + timeouts;
    // Sort in place (the list is not used in insertion order afterwards, so
    // the previous clone was redundant); sort_unstable suffices for numbers.
    let mut sorted_latencies = elapsed_ms_list;
    sorted_latencies.sort_unstable();
    // Percentile by index; yields 0 when no successful GET was recorded.
    let percentile = |q: usize| {
        sorted_latencies
            .get(sorted_latencies.len() * q / 100)
            .copied()
            .unwrap_or(0)
    };
    let p50 = percentile(50);
    let p90 = percentile(90);
    let p99 = percentile(99);
    let max_latency = sorted_latencies.last().copied().unwrap_or(0);
    let success_rate = if total_outcomes > 0 {
        successes as f64 / total_outcomes as f64
    } else {
        0.0
    };
    // Second pass: correlate response-sent events with success outcomes to
    // find responses that never produced a success (lost on the return
    // path), count ForwardingAcks, and detect retry storms.
    let (response_sent_count, ack_received_count, response_lost_txs, retry_storm_txs) = rt
        .block_on(async {
            let logs = logs_handle.lock().await;
            let mut response_sent_txs: HashSet<String> = HashSet::new();
            let mut success_txs: HashSet<String> = HashSet::new();
            let mut ack_received = 0u64;
            let mut request_count_per_tx: HashMap<String, usize> = HashMap::new();
            for log in logs.iter() {
                let tx_str = log.tx.to_string();
                if log.kind.is_get_response_sent() {
                    response_sent_txs.insert(tx_str.clone());
                }
                if log.kind.get_outcome() == Some(true) {
                    success_txs.insert(tx_str.clone());
                }
                if log.kind.is_forwarding_ack_received() {
                    ack_received += 1;
                }
                // Count raw GET requests only for events not already
                // classified as an outcome, response-sent, or ACK event
                // (rewritten from an empty-then-branch if/else).
                if log.kind.get_outcome().is_none()
                    && !log.kind.is_get_response_sent()
                    && !log.kind.is_forwarding_ack_received()
                    && log.kind.is_get_request()
                {
                    *request_count_per_tx.entry(tx_str).or_default() += 1;
                }
            }
            let response_lost: Vec<_> = response_sent_txs
                .difference(&success_txs)
                .cloned()
                .collect();
            // A "retry storm" is more than 10 GET requests on one tx.
            let retry_storms: Vec<_> = request_count_per_tx
                .iter()
                .filter(|&(_, &count)| count > 10)
                .map(|(tx, count)| (tx.clone(), *count))
                .collect();
            (
                response_sent_txs.len(),
                ack_received,
                response_lost,
                retry_storms,
            )
        });
    tracing::info!("=== GET Reliability Diagnostic (#3570) ===");
    tracing::info!(
        "Network: {} gateways + {} nodes = {} total peers",
        NUM_GATEWAYS,
        NUM_NODES,
        NUM_GATEWAYS + NUM_NODES
    );
    tracing::info!(
        "GET outcomes: {} total — {} success, {} not_found, {} failures, {} timeouts",
        total_outcomes,
        successes,
        not_found,
        failures,
        timeouts
    );
    tracing::info!(
        "GET success rate: {:.1}% ({}/{})",
        success_rate * 100.0,
        successes,
        total_outcomes
    );
    tracing::info!(
        "Latency (successful GETs): p50={}ms, p90={}ms, p99={}ms, max={}ms",
        p50,
        p90,
        p99,
        max_latency
    );
    tracing::info!(
        "ForwardingAck: {} ACKs received, {} response_sent events, {} response-lost txs",
        ack_received_count,
        response_sent_count,
        response_lost_txs.len()
    );
    if !retry_storm_txs.is_empty() {
        tracing::info!(
            "Retry storms (>10 requests per tx): {} txs, max {} requests",
            retry_storm_txs.len(),
            retry_storm_txs.iter().map(|(_, c)| c).max().unwrap_or(&0)
        );
    }
    // Verify which nodes actually hold the contract state in storage.
    let mut nodes_with_state = 0;
    let mut nodes_without_state = Vec::new();
    for i in 0..NUM_NODES {
        let label = NodeLabel::node(NETWORK_NAME, i);
        if let Some(storage) = result.node_storages.get(&label) {
            if storage.get_stored_state(&contract_key).is_some() {
                nodes_with_state += 1;
            } else {
                nodes_without_state.push(i);
            }
        } else {
            nodes_without_state.push(i);
        }
    }
    tracing::info!(
        "Storage verification: {}/{} nodes have contract state",
        nodes_with_state,
        NUM_NODES
    );
    if !nodes_without_state.is_empty() {
        tracing::info!(
            "Nodes missing state ({} total): {:?}{}",
            nodes_without_state.len(),
            &nodes_without_state[..nodes_without_state.len().min(20)],
            if nodes_without_state.len() > 20 {
                "..."
            } else {
                ""
            }
        );
    }
    assert!(
        total_outcomes >= 10,
        "Only {} GET outcome events — too few for meaningful analysis",
        total_outcomes
    );
    assert!(
        success_rate >= 0.50,
        "GET success rate {:.1}% is catastrophically low (below 50%). \
{} succeeded, {} not_found, {} failures, {} timeouts out of {} total. \
See #3570 for context.",
        success_rate * 100.0,
        successes,
        not_found,
        failures,
        timeouts,
        total_outcomes
    );
    let report = rt.block_on(async {
        let logs = logs_handle.lock().await;
        let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
        verifier.verify()
    });
    tracing::info!(
        "Anomaly report: {} anomalies across {} contracts, {} total events",
        report.anomalies.len(),
        report.contracts_analyzed,
        report.total_events
    );
    tracing::info!(
        "test_get_reliability_diagnostic DONE: {:.1}% success rate, \
{}/{} nodes have state",
        success_rate * 100.0,
        nodes_with_state,
        NUM_NODES
    );
}
#[cfg(feature = "nightly_tests")]
#[test_log::test]
fn test_get_reliability_with_latency() {
// Nightly GET-reliability run (#3570) under injected faults: 50-200ms
// per-message latency plus 5% message loss on a 3-gateway/100-node network.
// Logs outcome counts, latency percentiles, storage coverage, and a
// comparison against the no-latency baseline; only asserts that enough GET
// outcomes were recorded for meaningful analysis.
use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation};
use freenet::simulation::FaultConfig;
const SEED: u64 = 0x3570_D1A6_0002;
const NETWORK_NAME: &str = "get-reliability-latency";
const NUM_NODES: usize = 100;
const NUM_GATEWAYS: usize = 3;
GlobalTestMetrics::reset();
setup_deterministic_state(SEED);
let rt = create_runtime();
let (mut sim, logs_handle) = rt.block_on(async {
// Positional args: ring_max_htl=10, rnd_if_htl_above=7, max_connections=12,
// min_connections=4 (same order as the named-constant SimNetwork::new
// calls elsewhere in this file).
let sim = SimNetwork::new(
NETWORK_NAME,
NUM_GATEWAYS,
NUM_NODES,
10, 7, 12, 4, SEED,
)
.await;
let logs_handle = sim.event_logs_handle();
(sim, logs_handle)
});
sim.with_fault_injection(
FaultConfig::builder()
.latency_range(Duration::from_millis(50)..Duration::from_millis(200))
.message_loss_rate(0.05)
.build(),
);
let contract = SimOperation::create_test_contract(0x36);
let contract_id = *contract.key().id();
let contract_key = contract.key();
// One PUT (with subscribe) at gateway 0, then a GET from every node.
let mut operations = vec![
ScheduledOperation::new(
NodeLabel::gateway(NETWORK_NAME, 0),
SimOperation::Put {
contract: contract.clone(),
state: vec![0x36; 64],
subscribe: true,
},
),
];
for i in 0..NUM_NODES {
operations.push(ScheduledOperation::new(
NodeLabel::node(NETWORK_NAME, i),
SimOperation::Get {
contract_id,
return_contract_code: true,
subscribe: false,
},
));
}
let result = sim.run_controlled_simulation(
SEED,
operations,
Duration::from_secs(900), Duration::from_secs(180), );
assert!(
result.turmoil_result.is_ok(),
"Simulation failed: {:?}",
result.turmoil_result.err()
);
let rt = create_runtime();
// Classify GET outcomes: success, not_found, failure (no elapsed time
// recorded), or timeout (elapsed >= 55s).
let (successes, not_found, failures, timeouts, elapsed_ms_list) = rt.block_on(async {
let logs = logs_handle.lock().await;
let mut successes = 0u64;
let mut not_found = 0u64;
let mut failures = 0u64;
let mut timeouts = 0u64;
let mut elapsed_list = Vec::new();
for log in logs.iter() {
match log.kind.get_outcome() {
Some(true) => {
successes += 1;
if let Some(ms) = log.kind.get_elapsed_ms() {
elapsed_list.push(ms);
}
}
Some(false) => {
if let Some(ms) = log.kind.get_elapsed_ms() {
if ms >= 55_000 {
timeouts += 1;
} else {
not_found += 1;
}
} else {
failures += 1;
}
}
None => {}
}
}
(successes, not_found, failures, timeouts, elapsed_list)
});
let total_outcomes = successes + not_found + failures + timeouts;
// Latency percentiles over successful GETs; 0 when the list is empty.
// NOTE(review): the clone is redundant (elapsed_ms_list is unused after)
// and sort_unstable would suffice for numeric data.
let mut sorted_latencies = elapsed_ms_list.clone();
sorted_latencies.sort();
let p50 = sorted_latencies
.get(sorted_latencies.len() / 2)
.copied()
.unwrap_or(0);
let p90 = sorted_latencies
.get(sorted_latencies.len() * 9 / 10)
.copied()
.unwrap_or(0);
let p99 = sorted_latencies
.get(sorted_latencies.len() * 99 / 100)
.copied()
.unwrap_or(0);
let max_latency = sorted_latencies.last().copied().unwrap_or(0);
let success_rate = if total_outcomes > 0 {
successes as f64 / total_outcomes as f64
} else {
0.0
};
tracing::info!("=== GET Reliability with Latency (#3570) ===");
tracing::info!(
"Network: {} gateways + {} nodes, latency=50-200ms, loss=5%",
NUM_GATEWAYS,
NUM_NODES
);
tracing::info!(
"GET outcomes: {} total — {} success, {} not_found, {} failures, {} timeouts",
total_outcomes,
successes,
not_found,
failures,
timeouts
);
tracing::info!(
"GET success rate: {:.1}% ({}/{})",
success_rate * 100.0,
successes,
total_outcomes
);
tracing::info!(
"Latency (successful GETs): p50={}ms, p90={}ms, p99={}ms, max={}ms",
p50,
p90,
p99,
max_latency
);
// Verify which nodes actually hold the contract state in storage.
let mut nodes_with_state = 0;
let mut nodes_without_state = Vec::new();
for i in 0..NUM_NODES {
let label = NodeLabel::node(NETWORK_NAME, i);
if let Some(storage) = result.node_storages.get(&label) {
if storage.get_stored_state(&contract_key).is_some() {
nodes_with_state += 1;
} else {
nodes_without_state.push(i);
}
} else {
nodes_without_state.push(i);
}
}
tracing::info!(
"Storage verification: {}/{} nodes have contract state",
nodes_with_state,
NUM_NODES
);
if !nodes_without_state.is_empty() {
tracing::info!(
"Nodes missing state ({} total): {:?}{}",
nodes_without_state.len(),
&nodes_without_state[..nodes_without_state.len().min(20)],
if nodes_without_state.len() > 20 {
"..."
} else {
""
}
);
}
tracing::info!(
"=== Comparison with no-latency baseline ===\n\
Baseline (no latency): 88.3% success, 72/100 nodes, p50=1ms, p90=754ms\n\
This run (50-200ms latency, 5% loss): {:.1}% success, {}/{} nodes, p50={}ms, p90={}ms",
success_rate * 100.0,
nodes_with_state,
NUM_NODES,
p50,
p90
);
assert!(
total_outcomes >= 10,
"Only {} GET outcome events — too few for meaningful analysis",
total_outcomes
);
// Anomaly report over the captured event log.
let report = rt.block_on(async {
let logs = logs_handle.lock().await;
let verifier = freenet::tracing::StateVerifier::from_events(logs.clone());
verifier.verify()
});
tracing::info!(
"Anomaly report: {} anomalies across {} contracts, {} total events",
report.anomalies.len(),
report.contracts_analyzed,
report.total_events
);
tracing::info!(
"test_get_reliability_with_latency DONE: {:.1}% success rate, \
{}/{} nodes have state",
success_rate * 100.0,
nodes_with_state,
NUM_NODES
);
}
#[cfg(feature = "nightly_tests")]
#[test_log::test]
fn test_get_reliability_with_churn() {
// Nightly GET-reliability run (#3570) under node churn on top of latency
// and message loss. Uses randomly generated operations rather than a fixed
// schedule, tolerates a simulation error (nodes may crash permanently), and
// only logs results — no hard success-rate assertion.
use freenet::dev_tool::ChurnConfig;
use freenet::simulation::FaultConfig;
const SEED: u64 = 0x3570_D1A6_0003;
const NETWORK_NAME: &str = "get-reliability-churn";
const NUM_NODES: usize = 100;
const NUM_GATEWAYS: usize = 3;
GlobalTestMetrics::reset();
setup_deterministic_state(SEED);
let rt = create_runtime();
let (mut sim, logs_handle) = rt.block_on(async {
// Positional args: ring_max_htl=10, rnd_if_htl_above=7, max_connections=12,
// min_connections=4 (same order as the named-constant SimNetwork::new
// calls elsewhere in this file).
let sim = SimNetwork::new(
NETWORK_NAME,
NUM_GATEWAYS,
NUM_NODES,
10, 7, 12, 4, SEED,
)
.await;
let logs_handle = sim.event_logs_handle();
(sim, logs_handle)
});
sim.with_fault_injection(
FaultConfig::builder()
.latency_range(Duration::from_millis(50)..Duration::from_millis(200))
.message_loss_rate(0.05)
.build(),
);
// Churn: every 5s tick, 10% crash chance (at most 5 down at once), 3s
// recovery, 2% of crashes permanent, 10s warmup before churn starts.
sim.with_churn(ChurnConfig {
crash_probability: 0.10,
tick_interval: Duration::from_secs(5),
recovery_delay: Duration::from_secs(3),
max_simultaneous_crashes: Some(5),
permanent_crash_rate: 0.02, warmup_delay: Duration::from_secs(10),
});
// Runtime is dropped before the direct run — presumably
// run_simulation_direct manages its own runtime; confirm if refactoring.
drop(rt);
// 300 random operations per the summary log below; the meaning of the
// first argument (15) is not visible here — TODO confirm.
let direct_result = sim.run_simulation_direct::<rand::rngs::SmallRng>(
SEED,
15, 300, Duration::from_millis(500),
);
if let Err(e) = &direct_result {
tracing::warn!("Direct simulation completed with error (may be expected under churn): {e}");
}
let rt = create_runtime();
// Classify GET outcomes: success, not_found, failure (no elapsed time
// recorded), or timeout (elapsed >= 55s).
let (successes, not_found, failures, timeouts, elapsed_ms_list) = rt.block_on(async {
let logs = logs_handle.lock().await;
let mut successes = 0u64;
let mut not_found = 0u64;
let mut failures = 0u64;
let mut timeouts = 0u64;
let mut elapsed_list = Vec::new();
for log in logs.iter() {
match log.kind.get_outcome() {
Some(true) => {
successes += 1;
if let Some(ms) = log.kind.get_elapsed_ms() {
elapsed_list.push(ms);
}
}
Some(false) => {
if let Some(ms) = log.kind.get_elapsed_ms() {
if ms >= 55_000 {
timeouts += 1;
} else {
not_found += 1;
}
} else {
failures += 1;
}
}
None => {}
}
}
(successes, not_found, failures, timeouts, elapsed_list)
});
let total_outcomes = successes + not_found + failures + timeouts;
// Latency percentiles over successful GETs; 0 when the list is empty.
let mut sorted_latencies = elapsed_ms_list.clone();
sorted_latencies.sort();
let p50 = sorted_latencies
.get(sorted_latencies.len() / 2)
.copied()
.unwrap_or(0);
let p90 = sorted_latencies
.get(sorted_latencies.len() * 9 / 10)
.copied()
.unwrap_or(0);
let p99 = sorted_latencies
.get(sorted_latencies.len() * 99 / 100)
.copied()
.unwrap_or(0);
let max_latency = sorted_latencies.last().copied().unwrap_or(0);
let success_rate = if total_outcomes > 0 {
successes as f64 / total_outcomes as f64
} else {
0.0
};
tracing::info!("=== GET Reliability with Churn (#3570) ===");
tracing::info!(
"Network: {} gateways + {} nodes, latency=50-200ms, loss=5%, churn=10%/5s",
NUM_GATEWAYS,
NUM_NODES
);
tracing::info!(
"GET outcomes: {} total — {} success, {} not_found, {} failures, {} timeouts",
total_outcomes,
successes,
not_found,
failures,
timeouts
);
tracing::info!(
"GET success rate: {:.1}% ({}/{})",
success_rate * 100.0,
successes,
total_outcomes
);
tracing::info!(
"Latency (successful GETs): p50={}ms, p90={}ms, p99={}ms, max={}ms",
p50,
p90,
p99,
max_latency
);
tracing::info!(
"=== Comparison ===\n\
Baseline (no latency): 88.3% success, p90=754ms\n\
With latency (50-200ms, 5%): 81.9% success, p90=1833ms\n\
With churn + latency: {:.1}% success, p90={}ms",
success_rate * 100.0,
p90
);
// Diagnostic only: warn instead of failing when too few GETs occurred,
// since random event generation may not have produced enough of them.
if total_outcomes >= 10 {
tracing::info!(
"test_get_reliability_with_churn DONE: {:.1}% GET success rate \
({} outcomes from 300 random operations under churn)",
success_rate * 100.0,
total_outcomes
);
} else {
tracing::warn!(
"test_get_reliability_with_churn: only {} GET outcomes — \
random event generation may not have produced enough GETs",
total_outcomes
);
}
}
#[cfg(feature = "nightly_tests")]
#[test_log::test(tokio::test(flavor = "current_thread"))]
async fn test_nightly_50_node_topology_formation() {
    use freenet::dev_tool::NodeLabel;

    // Deterministic simulation parameters: 4 gateways + 50 regular nodes,
    // one virtual hour of unattended topology formation.
    const SEED: u64 = 0x3511_5000_0001;
    const NETWORK_NAME: &str = "nightly-50-topology";
    const GATEWAYS: usize = 4;
    const NODES: usize = 50;
    const RING_MAX_HTL: usize = 10;
    const RND_IF_HTL_ABOVE: usize = 5;
    const MAX_CONN: usize = 20;
    const MIN_CONN: usize = 10;
    const VIRTUAL_DURATION: Duration = Duration::from_secs(3600);

    tracing::info!("=== Nightly: 50-Node Topology Formation ===");
    setup_deterministic_state(SEED);

    let mut network = SimNetwork::new(
        NETWORK_NAME,
        GATEWAYS,
        NODES,
        RING_MAX_HTL,
        RND_IF_HTL_ABOVE,
        MAX_CONN,
        MIN_CONN,
        SEED,
    )
    .await;
    network.with_start_backoff(Duration::from_millis(50));
    let _handles = network
        .start_with_rand_gen::<rand::rngs::SmallRng>(SEED, 0, 0)
        .await;

    tracing::info!("Running 1 virtual hour of topology formation...");
    let_network_run(&mut network, VIRTUAL_DURATION).await;

    // Sample per-node connection counts; nodes without an available
    // connection manager are skipped by the filter_map.
    let mut sampled: Vec<usize> = (0..NODES)
        .filter_map(|idx| network.connection_count(&NodeLabel::node(NETWORK_NAME, idx)))
        .collect();
    sampled.sort_unstable();
    let total_sampled = sampled.len();
    assert!(total_sampled > 0, "No connection managers available");

    // Median over the sorted sample plus the fraction of nodes that
    // reached the configured connection minimum.
    let median_conn = sampled[total_sampled / 2];
    let at_or_above_min = sampled.iter().filter(|&&count| count >= MIN_CONN).count();
    let above_min_ratio = at_or_above_min as f64 / total_sampled as f64;
    tracing::info!("Connection counts: {:?}", sampled);
    tracing::info!(
        "Median={}, nodes at min_connections={}/{} ({:.0}%)",
        median_conn,
        at_or_above_min,
        total_sampled,
        above_min_ratio * 100.0
    );

    // Count non-gateway nodes that hold at least one connection to
    // another non-gateway peer (evidence of CONNECT forwarding).
    let connectivity = network.node_connectivity();
    let peers_with_peer_links = connectivity
        .iter()
        .filter(|(label, (_key, conns))| {
            !label.is_gateway() && conns.keys().any(|peer| !peer.is_gateway())
        })
        .count();
    let peer_link_ratio = peers_with_peer_links as f64 / NODES as f64;
    tracing::info!(
        "Nodes with non-gateway peer connections: {}/{} ({:.0}%)",
        peers_with_peer_links,
        NODES,
        peer_link_ratio * 100.0
    );

    assert!(
        median_conn >= MIN_CONN,
        "Topology formation stall: median={} < min_connections={}. \
        Counts: {:?}. Seed: 0x{:X}",
        median_conn,
        MIN_CONN,
        sampled,
        SEED
    );
    assert!(
        above_min_ratio >= 0.90,
        "Only {:.0}% of nodes reached min_connections (expected >= 90%). \
        {}/{} nodes. Counts: {:?}. Seed: 0x{:X}",
        above_min_ratio * 100.0,
        at_or_above_min,
        total_sampled,
        sampled,
        SEED
    );
    assert!(
        peer_link_ratio >= 0.80,
        "Only {:.0}% of nodes have non-gateway peer connections (expected >= 80%). \
        CONNECT forwarding is insufficient. Seed: 0x{:X}",
        peer_link_ratio * 100.0,
        SEED
    );
    tracing::info!(
        "PASSED: median={}, above_min={:.0}%, peer_conns={:.0}%",
        median_conn,
        above_min_ratio * 100.0,
        peer_link_ratio * 100.0
    );
}
#[cfg(feature = "nightly_tests")]
#[test_log::test(tokio::test(flavor = "current_thread"))]
async fn test_nightly_connection_growth_checkpoints() {
    use freenet::dev_tool::NodeLabel;

    // Deterministic 54-peer network sampled at 5m / 15m / 30m / 60m
    // virtual-time checkpoints to verify connection growth is near-monotonic.
    const SEED: u64 = 0x3511_6C8E_0002;
    const NETWORK_NAME: &str = "nightly-growth-checkpoints";
    const GATEWAYS: usize = 4;
    const NODES: usize = 50;
    const RING_MAX_HTL: usize = 10;
    const RND_IF_HTL_ABOVE: usize = 5;
    const MAX_CONN: usize = 20;
    const MIN_CONN: usize = 10;
    const CHECKPOINTS_SECS: [u64; 4] = [300, 900, 1800, 3600];

    tracing::info!("=== Nightly: Connection Growth Checkpoints ===");
    setup_deterministic_state(SEED);

    let mut network = SimNetwork::new(
        NETWORK_NAME,
        GATEWAYS,
        NODES,
        RING_MAX_HTL,
        RND_IF_HTL_ABOVE,
        MAX_CONN,
        MIN_CONN,
        SEED,
    )
    .await;
    network.with_start_backoff(Duration::from_millis(50));
    let _handles = network
        .start_with_rand_gen::<rand::rngs::SmallRng>(SEED, 0, 0)
        .await;

    // Advance virtual time to each checkpoint (deltas, not absolutes) and
    // record the median connection count observed there.
    let mut medians_by_checkpoint: Vec<(u64, usize)> = Vec::new();
    let mut virtual_elapsed = 0u64;
    for &checkpoint in &CHECKPOINTS_SECS {
        let step = checkpoint - virtual_elapsed;
        let_network_run(&mut network, Duration::from_secs(step)).await;
        virtual_elapsed = checkpoint;

        let mut counts: Vec<usize> = (0..NODES)
            .filter_map(|idx| network.connection_count(&NodeLabel::node(NETWORK_NAME, idx)))
            .collect();
        counts.sort_unstable();
        // Median of the sorted sample; an empty sample yields 0.
        let median = counts.get(counts.len() / 2).copied().unwrap_or(0);
        tracing::info!(
            "Checkpoint @{}m: median={}, counts={:?}",
            checkpoint / 60,
            median,
            counts
        );
        // A zero median is tolerated only at the first checkpoint (<= 5m).
        assert!(
            median > 0 || virtual_elapsed <= 300,
            "Network collapse at checkpoint @{}m: median=0. Counts: {:?}. Seed: 0x{:X}",
            checkpoint / 60,
            counts,
            SEED
        );
        medians_by_checkpoint.push((checkpoint, median));
    }

    // Compare each checkpoint with its successor: the median may dip by at
    // most one connection between consecutive samples.
    for (&(prev_t, prev_median), &(curr_t, curr_median)) in medians_by_checkpoint
        .iter()
        .zip(medians_by_checkpoint.iter().skip(1))
    {
        assert!(
            curr_median + 1 >= prev_median,
            "Connection growth regressed > 1 between @{}m (median={}) and @{}m (median={}). \
            Growth must be near-monotonic. Seed: 0x{:X}",
            prev_t / 60,
            prev_median,
            curr_t / 60,
            curr_median,
            SEED
        );
    }

    // The final (60m) checkpoint must have reached the connection minimum.
    let &(_, final_median) = medians_by_checkpoint.last().unwrap();
    assert!(
        final_median >= MIN_CONN,
        "Final median={} < min_connections={} after 60 virtual minutes. \
        Checkpoints: {:?}. Seed: 0x{:X}",
        final_median,
        MIN_CONN,
        medians_by_checkpoint,
        SEED
    );
    tracing::info!("PASSED: checkpoints={:?}", medians_by_checkpoint);
}
#[cfg(feature = "nightly_tests")]
#[test_log::test(tokio::test(flavor = "current_thread"))]
async fn test_nightly_fault_recovery_speed() {
    // Three-phase nightly test of fault recovery on a 54-peer network
    // (4 gateways + 50 nodes):
    //   Phase 1: 30 virtual minutes with no faults — network must converge.
    //   Phase 2: 20% message loss injected for 5 virtual minutes — median
    //            connectivity must not collapse below MIN_CONN / 2.
    //   Phase 3: faults cleared, 25 virtual minutes — median must recover to
    //            max(pre_fault_median - 1, MIN_CONN) with < 5% isolated nodes.
    use freenet::dev_tool::NodeLabel;
    use freenet::simulation::FaultConfig;
    const SEED: u64 = 0x3511_FA17_0003;
    const NETWORK_NAME: &str = "nightly-fault-recovery";
    const GATEWAYS: usize = 4;
    const NODES: usize = 50;
    const RING_MAX_HTL: usize = 10;
    const RND_IF_HTL_ABOVE: usize = 5;
    const MAX_CONN: usize = 20;
    const MIN_CONN: usize = 10;
    tracing::info!("=== Nightly: Fault Recovery Speed ===");
    setup_deterministic_state(SEED);
    let mut sim = SimNetwork::new(
        NETWORK_NAME,
        GATEWAYS,
        NODES,
        RING_MAX_HTL,
        RND_IF_HTL_ABOVE,
        MAX_CONN,
        MIN_CONN,
        SEED,
    )
    .await;
    sim.with_start_backoff(Duration::from_millis(50));
    let _handles = sim
        .start_with_rand_gen::<rand::rngs::SmallRng>(SEED, 0, 0)
        .await;
    tracing::info!("Phase 1: Convergence — 30 virtual minutes, no faults");
    let_network_run(&mut sim, Duration::from_secs(1800)).await;
    // Sample per-node connection counts; nodes without an available
    // connection manager are skipped by filter_map.
    let mut pre_fault_counts: Vec<usize> = (0..NODES)
        .filter_map(|i| {
            let label = NodeLabel::node(NETWORK_NAME, i);
            sim.connection_count(&label)
        })
        .collect();
    pre_fault_counts.sort_unstable();
    assert!(
        !pre_fault_counts.is_empty(),
        "Phase 1: no connection managers available. Seed: 0x{:X}",
        SEED
    );
    // Median of the sorted pre-fault sample — the recovery baseline.
    let pre_fault_median = pre_fault_counts[pre_fault_counts.len() / 2];
    tracing::info!(
        "Phase 1 done: median={}, counts={:?}",
        pre_fault_median,
        pre_fault_counts
    );
    // Fault injection against a non-converged network would be meaningless,
    // so gate Phase 2 on Phase 1 convergence.
    assert!(
        pre_fault_median >= MIN_CONN,
        "Network did not converge before fault injection: median={} < min_connections={}. \
        Counts: {:?}. Seed: 0x{:X}",
        pre_fault_median,
        MIN_CONN,
        pre_fault_counts,
        SEED
    );
    tracing::info!("Phase 2: Fault injection — 20% message loss for 5 virtual minutes");
    sim.with_fault_injection(FaultConfig::builder().message_loss_rate(0.20).build());
    let_network_run(&mut sim, Duration::from_secs(300)).await;
    let mut during_fault_counts: Vec<usize> = (0..NODES)
        .filter_map(|i| {
            let label = NodeLabel::node(NETWORK_NAME, i);
            sim.connection_count(&label)
        })
        .collect();
    during_fault_counts.sort_unstable();
    // After Phase 1 convergence every node is expected to report a count;
    // a shorter sample means a node's connection manager disappeared.
    assert_eq!(
        during_fault_counts.len(),
        NODES,
        "Phase 2: expected connection managers for all {} nodes, got {}. Seed: 0x{:X}",
        NODES,
        during_fault_counts.len(),
        SEED
    );
    let during_fault_median = during_fault_counts[during_fault_counts.len() / 2];
    tracing::info!(
        "Phase 2 done: median={}, counts={:?}",
        during_fault_median,
        during_fault_counts
    );
    // Under 20% loss the network may degrade, but dropping below half the
    // configured minimum is treated as a collapse.
    let during_fault_floor = MIN_CONN / 2;
    assert!(
        during_fault_median >= during_fault_floor,
        "Connection collapse under load: during_fault_median={} < MIN_CONN/2={}. \
        Counts: {:?}. Seed: 0x{:X}",
        during_fault_median,
        during_fault_floor,
        during_fault_counts,
        SEED
    );
    tracing::info!("Phase 3: Recovery — faults cleared, 25 virtual minutes");
    sim.clear_fault_injection();
    let_network_run(&mut sim, Duration::from_secs(1500)).await;
    let mut post_recovery_counts: Vec<usize> = (0..NODES)
        .filter_map(|i| {
            let label = NodeLabel::node(NETWORK_NAME, i);
            sim.connection_count(&label)
        })
        .collect();
    post_recovery_counts.sort_unstable();
    assert_eq!(
        post_recovery_counts.len(),
        NODES,
        "Phase 3: expected connection managers for all {} nodes, got {}. Seed: 0x{:X}",
        NODES,
        post_recovery_counts.len(),
        SEED
    );
    let post_recovery_median = post_recovery_counts[post_recovery_counts.len() / 2];
    // Nodes with zero connections after recovery are considered isolated.
    let isolated_count = post_recovery_counts.iter().filter(|&&c| c == 0).count();
    let fraction_isolated = isolated_count as f64 / NODES as f64;
    tracing::info!(
        "Phase 3 done: median={}, isolated={}/{} ({:.0}%), counts={:?}",
        post_recovery_median,
        isolated_count,
        NODES,
        fraction_isolated * 100.0,
        post_recovery_counts
    );
    // Recovery target: within one connection of the pre-fault median, but
    // never below MIN_CONN (saturating_sub guards the median==0 edge case).
    let recovery_floor = pre_fault_median.saturating_sub(1).max(MIN_CONN);
    assert!(
        post_recovery_median >= recovery_floor,
        "Incomplete recovery: post_recovery_median={} < floor={} \
        (max(pre_fault_median - 1, MIN_CONN)). \
        Pre-fault: {:?}, Post-recovery: {:?}. Seed: 0x{:X}",
        post_recovery_median,
        recovery_floor,
        pre_fault_counts,
        post_recovery_counts,
        SEED
    );
    assert!(
        fraction_isolated < 0.05,
        "{:.0}% of nodes isolated after recovery (threshold: 5%). \
        Counts: {:?}. Seed: 0x{:X}",
        fraction_isolated * 100.0,
        post_recovery_counts,
        SEED
    );
    tracing::info!(
        "PASSED: pre_fault_median={}, during_fault_median={}, post_recovery_median={}, \
        isolated={:.0}%",
        pre_fault_median,
        during_fault_median,
        post_recovery_median,
        fraction_isolated * 100.0
    );
}