use freenet_stdlib::prelude::*;
use futures::Future;
use rand::prelude::IndexedRandom;
use std::{
collections::{BTreeMap, HashMap, HashSet},
net::{Ipv6Addr, SocketAddr},
num::NonZeroUsize,
pin::Pin,
sync::Arc,
time::Duration,
};
use tokio::sync::{broadcast, mpsc, watch};
use tracing::info;
#[cfg(feature = "trace-ot")]
use crate::tracing::CombinedRegister;
use crate::{
client_events::test::{MemoryEventsGen, RandomEventGenerator},
config::{ConfigArgs, GlobalExecutor, GlobalRng},
dev_tool::TransportKeypair,
node::{InitPeerNode, NetEventRegister, NodeConfig},
ring::{ConnectionManager, Distance, Location, PeerKeyLocation},
simulation::{FaultConfig, VirtualTime},
tracing::TestEventListener,
transport::{
TransportPublicKey,
in_memory_socket::{register_network_time_source, unregister_network_time_source},
},
};
mod in_memory;
mod network;
pub mod turmoil_runner;
pub use self::network::{NetworkPeer, PeerMessage, PeerStatus};
pub use self::turmoil_runner::{TurmoilConfig, TurmoilResult, run_turmoil_simulation};
pub(crate) type EventId = u32;
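/// A single client-level operation that a simulation can schedule against a node.
///
/// `SeedContract` is special: it pre-populates a node's contract store directly and is
/// consumed by the controlled-simulation driver rather than being converted into a
/// `ClientRequest` (see `into_client_request`).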
#[derive(Clone, Debug)]
pub enum SimOperation {
Put {
contract: ContractContainer,
state: Vec<u8>,
subscribe: bool,
},
Get {
contract_id: ContractInstanceId,
return_contract_code: bool,
subscribe: bool,
},
Subscribe {
contract_id: ContractInstanceId,
},
Update {
key: ContractKey,
data: Vec<u8>,
},
SeedContract {
contract: ContractContainer,
state: Vec<u8>,
},
Disconnect,
}
impl SimOperation {
pub fn create_test_contract(seed: u8) -> ContractContainer {
let mut code_bytes = vec![0u8; 32];
let mut params_bytes = vec![0u8; 16];
for (i, byte) in code_bytes.iter_mut().enumerate() {
*byte = seed.wrapping_add(i as u8);
}
for (i, byte) in params_bytes.iter_mut().enumerate() {
*byte = seed.wrapping_add(i as u8).wrapping_mul(2);
}
let code = ContractCode::from(code_bytes);
let params = Parameters::from(params_bytes);
ContractWasmAPIVersion::V1(WrappedContract::new(code.into(), params)).into()
}
pub fn create_large_state(size_bytes: usize, seed: u8) -> Vec<u8> {
let mut state = Vec::with_capacity(size_bytes);
for i in 0..size_bytes {
state.push(seed.wrapping_add((i % 256) as u8).wrapping_mul(3));
}
state
}
pub fn create_test_state(seed: u8) -> Vec<u8> {
let mut state_bytes = vec![0u8; 64];
for (i, byte) in state_bytes.iter_mut().enumerate() {
*byte = seed.wrapping_add(i as u8).wrapping_mul(3);
}
state_bytes
}
pub fn create_crdt_state(version: u64, seed: u8) -> Vec<u8> {
let mut state_bytes = Vec::with_capacity(8 + 128);
state_bytes.extend_from_slice(&version.to_le_bytes());
for i in 0..128u8 {
state_bytes.push(seed.wrapping_add(i).wrapping_mul(3));
}
state_bytes
}
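/// Converts this operation into the equivalent `ClientRequest`.
///
/// Panics on `SeedContract`, which must be handled by the simulation driver before event
/// dispatch.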
#[cfg(any(test, feature = "testing"))]
pub(crate) fn into_client_request(self) -> freenet_stdlib::client_api::ClientRequest<'static> {
use freenet_stdlib::client_api::{ClientRequest, ContractRequest};
match self {
SimOperation::Put {
contract,
state,
subscribe,
} => ClientRequest::ContractOp(ContractRequest::Put {
contract,
state: WrappedState::new(state),
related_contracts: RelatedContracts::new(),
subscribe,
blocking_subscribe: false,
}),
SimOperation::Get {
contract_id,
return_contract_code,
subscribe,
} => ClientRequest::ContractOp(ContractRequest::Get {
key: contract_id,
return_contract_code,
subscribe,
blocking_subscribe: false,
}),
SimOperation::Subscribe { contract_id } => {
ClientRequest::ContractOp(ContractRequest::Subscribe {
key: contract_id,
summary: None,
})
}
SimOperation::Update { key, data } => {
ClientRequest::ContractOp(ContractRequest::Update {
key,
data: UpdateData::State(State::from(data)),
})
}
SimOperation::SeedContract { .. } => {
panic!(
"SeedContract is not a client request — it must be handled \
by run_controlled_simulation before event dispatch"
)
}
SimOperation::Disconnect => ClientRequest::Disconnect { cause: None },
}
}
}
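/// A [`SimOperation`] bound to the node label that should issue it.
///
/// A minimal sketch of building a scheduled Put, assuming a network named "test" with at
/// least two peers (values are illustrative only):
///
/// ```ignore
/// let contract = SimOperation::create_test_contract(1);
/// let state = SimOperation::create_test_state(1);
/// let op = ScheduledOperation::new(
///     NodeLabel::node("test", 1),
///     SimOperation::Put { contract, state, subscribe: false },
/// );
/// ```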
#[derive(Clone, Debug)]
pub struct ScheduledOperation {
pub node: NodeLabel,
pub operation: SimOperation,
}
impl ScheduledOperation {
pub fn new(node: NodeLabel, operation: SimOperation) -> Self {
Self { node, operation }
}
}
pub struct ControlledSimulationResult {
pub turmoil_result: turmoil::Result,
pub topology_snapshots: Vec<crate::ring::topology_registry::TopologySnapshot>,
pub node_storages: HashMap<NodeLabel, crate::wasm_runtime::MockStateStorage>,
}
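/// Human-readable peer identifier of the form `{network}-gateway-{id}` or
/// `{network}-node-{id}`, as produced by [`NodeLabel::gateway`] and [`NodeLabel::node`].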
#[derive(PartialEq, Eq, Hash, Clone, PartialOrd, Ord, Debug)]
pub struct NodeLabel(Arc<str>);
impl NodeLabel {
pub fn gateway(network_name: &str, id: usize) -> Self {
Self(format!("{network_name}-gateway-{id}").into())
}
pub fn node(network_name: &str, id: usize) -> Self {
Self(format!("{network_name}-node-{id}").into())
}
pub fn is_gateway(&self) -> bool {
self.0.contains("-gateway-")
}
pub fn is_node(&self) -> bool {
self.0.contains("-node-")
}
pub fn number(&self) -> usize {
self.0
.rsplit('-')
.next()
.expect("should have a number part")
.parse::<usize>()
.expect("last part should be a number")
}
}
impl std::fmt::Display for NodeLabel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl std::ops::Deref for NodeLabel {
type Target = str;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl<'a> From<&'a str> for NodeLabel {
fn from(value: &'a str) -> Self {
assert!(value.starts_with("gateway-") || value.starts_with("node-"));
let mut parts = value.split('-');
assert!(parts.next().is_some());
assert!(
parts
.next()
.map(|s| s.parse::<u16>())
.transpose()
.expect("should be an u16")
.is_some()
);
assert!(parts.next().is_none());
Self(value.to_string().into())
}
}
#[derive(Clone)]
pub(crate) struct GatewayConfig {
#[allow(dead_code)]
label: NodeLabel,
peer_key_location: PeerKeyLocation,
location: Location,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventSummary {
pub tx: crate::message::Transaction,
pub peer_addr: std::net::SocketAddr,
pub event_kind_name: String,
pub contract_key: Option<String>,
pub state_hash: Option<String>,
pub event_detail: String,
}
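/// A `Stream` of user event ids that drives randomly targeted events.
///
/// Each poll picks a peer with a fixed-seed RNG and forwards `(event_id, peer_key)` through
/// the `user_ev_controller` channel; the peer's event generator listens on the matching
/// receiver and emits the corresponding client request.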
pub struct EventChain<S = watch::Sender<(EventId, TransportPublicKey)>> {
labels: Vec<(NodeLabel, TransportPublicKey)>,
user_ev_controller: S,
total_events: u32,
count: u32,
rng: rand::rngs::SmallRng,
clean_up_tmp_dirs: bool,
choice: Option<TransportPublicKey>,
}
impl<S> EventChain<S> {
pub fn new(
labels: Vec<(NodeLabel, TransportPublicKey)>,
user_ev_controller: S,
total_events: u32,
clean_up_tmp_dirs: bool,
) -> Self {
const SEED: u64 = 0xdeadbeef;
EventChain {
labels,
user_ev_controller,
total_events,
count: 0,
rng: rand::rngs::SmallRng::seed_from_u64(SEED),
clean_up_tmp_dirs,
choice: None,
}
}
fn increment_count(self: Pin<&mut Self>) {
unsafe {
let this = self.get_unchecked_mut();
this.count += 1;
}
}
fn choose_peer(self: Pin<&mut Self>) -> TransportPublicKey {
let this = unsafe { self.get_unchecked_mut() };
if let Some(id) = this.choice.take() {
return id;
}
let rng = &mut this.rng;
let labels = &mut this.labels;
let (_, id) = labels.choose(rng).expect("not empty");
id.clone()
}
fn set_choice(self: Pin<&mut Self>, id: TransportPublicKey) {
let this = unsafe { self.get_unchecked_mut() };
this.choice = Some(id);
}
}
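/// Abstraction over the channel used to deliver `(EventId, TransportPublicKey)` pairs, so
/// [`EventChain`] can drive `mpsc`, `watch`, or `broadcast` senders interchangeably.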
trait EventSender {
fn send(
&self,
cx: &mut std::task::Context<'_>,
value: (EventId, TransportPublicKey),
) -> std::task::Poll<Result<(), ()>>;
}
impl EventSender for mpsc::Sender<(EventId, TransportPublicKey)> {
fn send(
&self,
cx: &mut std::task::Context<'_>,
value: (EventId, TransportPublicKey),
) -> std::task::Poll<Result<(), ()>> {
let f = self.send(value);
futures::pin_mut!(f);
f.poll(cx).map(|r| r.map_err(|_| ()))
}
}
impl EventSender for watch::Sender<(EventId, TransportPublicKey)> {
fn send(
&self,
_cx: &mut std::task::Context<'_>,
value: (EventId, TransportPublicKey),
) -> std::task::Poll<Result<(), ()>> {
match self.send(value) {
Ok(_) => std::task::Poll::Ready(Ok(())),
Err(_) => std::task::Poll::Ready(Err(())),
}
}
}
impl EventSender for broadcast::Sender<(EventId, TransportPublicKey)> {
fn send(
&self,
_cx: &mut std::task::Context<'_>,
value: (EventId, TransportPublicKey),
) -> std::task::Poll<Result<(), ()>> {
match self.send(value) {
Ok(_) => std::task::Poll::Ready(Ok(())),
Err(_) => std::task::Poll::Ready(Err(())),
}
}
}
impl<S: EventSender> futures::stream::Stream for EventChain<S> {
type Item = EventId;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.count < self.total_events {
let id = self.as_mut().choose_peer();
match self
.user_ev_controller
.send(cx, (self.count, id.clone()))
.map_err(|_| {
tracing::error!("peer controller should be alive, finishing event chain")
}) {
std::task::Poll::Ready(_) => {}
std::task::Poll::Pending => {
self.as_mut().set_choice(id);
return std::task::Poll::Pending;
}
}
self.as_mut().increment_count();
std::task::Poll::Ready(Some(self.count))
} else {
std::task::Poll::Ready(None)
}
}
}
impl<S> Drop for EventChain<S> {
fn drop(&mut self) {
if self.clean_up_tmp_dirs {
clean_up_tmp_dirs(self.labels.iter().map(|(l, _)| l));
}
}
}
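/// Like [`EventChain`], but events fire in a caller-supplied order instead of randomly,
/// making runs fully reproducible.
///
/// A minimal sketch, assuming `sim` is a started [`SimNetwork`] named "test" (labels are
/// illustrative):
///
/// ```ignore
/// let mut chain = sim.controlled_event_chain(vec![
///     (0, NodeLabel::node("test", 1)),
///     (1, NodeLabel::node("test", 2)),
/// ]);
/// let fired = chain.trigger_all();
/// assert!(chain.is_complete());
/// ```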
pub struct ControlledEventChain {
user_ev_controller: watch::Sender<(EventId, TransportPublicKey)>,
event_sequence: Vec<(EventId, NodeLabel)>,
label_to_key: HashMap<NodeLabel, TransportPublicKey>,
current_index: usize,
}
impl ControlledEventChain {
pub fn new(
user_ev_controller: watch::Sender<(EventId, TransportPublicKey)>,
event_sequence: Vec<(EventId, NodeLabel)>,
label_to_key: HashMap<NodeLabel, TransportPublicKey>,
) -> Self {
Self {
user_ev_controller,
event_sequence,
label_to_key,
current_index: 0,
}
}
pub fn trigger_next(&mut self) -> Option<EventId> {
if self.current_index >= self.event_sequence.len() {
return None;
}
let (event_id, label) = &self.event_sequence[self.current_index];
let key = self
.label_to_key
.get(label)
.expect("Label should exist in mapping");
match self.user_ev_controller.send((*event_id, key.clone())) {
Ok(()) => {
self.current_index += 1;
Some(*event_id)
}
Err(e) => {
tracing::error!("Failed to send event {}: {:?}", event_id, e);
None
}
}
}
pub fn trigger_all(&mut self) -> usize {
let mut count = 0;
while self.trigger_next().is_some() {
count += 1;
}
count
}
pub fn is_complete(&self) -> bool {
self.current_index >= self.event_sequence.len()
}
pub fn remaining(&self) -> usize {
self.event_sequence.len().saturating_sub(self.current_index)
}
}
impl futures::stream::Stream for ControlledEventChain {
type Item = EventId;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.trigger_next() {
Some(event_id) => std::task::Poll::Ready(Some(event_id)),
None => std::task::Poll::Ready(None),
}
}
}
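// The default event register used by the builders below: the plain `TestEventListener`, or
// a combination with an OpenTelemetry register when the `trace-ot` feature is enabled.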
#[cfg(feature = "trace-ot")]
type DefaultRegistry = CombinedRegister<2>;
#[cfg(not(feature = "trace-ot"))]
type DefaultRegistry = TestEventListener;
pub(super) struct Builder<ER> {
pub config: NodeConfig,
contract_handler_name: String,
event_register: ER,
contracts: Vec<(ContractContainer, WrappedState, bool)>,
contract_subscribers: HashMap<ContractKey, Vec<PeerKeyLocation>>,
pub rng_seed: u64,
pub network_name: String,
pub shared_cm: Option<Arc<parking_lot::Mutex<Option<crate::ring::ConnectionManager>>>>,
}
impl<ER: NetEventRegister> Builder<ER> {
pub fn build(
builder: NodeConfig,
event_register: ER,
contract_handler_name: String,
rng_seed: u64,
network_name: String,
) -> Builder<ER> {
Builder {
config: builder.clone(),
contract_handler_name,
event_register,
contracts: Vec::new(),
contract_subscribers: HashMap::new(),
rng_seed,
network_name,
shared_cm: None,
}
}
}
#[derive(Debug)]
pub struct RunningNode {
pub label: NodeLabel,
pub addr: SocketAddr,
pub abort_handle: tokio::task::AbortHandle,
}
#[derive(Clone)]
pub struct RestartableNodeConfig {
pub config: NodeConfig,
#[allow(dead_code)]
pub label: NodeLabel,
pub is_gateway: bool,
#[allow(dead_code)]
pub gateway_configs: Vec<GatewayConfig>,
pub rng_seed: u64,
pub shared_storage: crate::wasm_runtime::MockStateStorage,
}
#[cfg(any(test, feature = "testing"))]
struct DirectNodeState {
label: NodeLabel,
addr: SocketAddr,
is_gateway: bool,
permanently_dropped: bool,
}
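/// Parameters for randomized node churn: how often the churn loop ticks, the per-tick crash
/// probability, how long a crashed node stays down, the cap on simultaneous crashes, the
/// fraction of crashes that are permanent, and the warm-up delay before churn starts.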
#[derive(Debug, Clone)]
pub struct ChurnConfig {
pub crash_probability: f64,
pub tick_interval: Duration,
pub recovery_delay: Duration,
pub max_simultaneous_crashes: Option<usize>,
pub permanent_crash_rate: f64,
pub warmup_delay: Duration,
}
impl Default for ChurnConfig {
fn default() -> Self {
Self {
crash_probability: 0.1,
tick_interval: Duration::from_secs(5),
recovery_delay: Duration::from_secs(3),
max_simultaneous_crashes: None,
permanent_crash_rate: 0.05,
warmup_delay: Duration::from_secs(5),
}
}
}
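/// An in-memory simulated Freenet network: gateway and regular-node builders plus the shared
/// event listener, virtual clock, fault injector, and bookkeeping needed to start, crash,
/// restart, and inspect peers deterministically.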
pub struct SimNetwork {
name: String,
clean_up_tmp_dirs: bool,
labels: Vec<(NodeLabel, TransportPublicKey)>,
pub(crate) event_listener: TestEventListener,
user_ev_controller: Option<watch::Sender<(EventId, TransportPublicKey)>>,
receiver_ch: watch::Receiver<(EventId, TransportPublicKey)>,
number_of_gateways: usize,
gateways: Vec<(Builder<DefaultRegistry>, GatewayConfig)>,
number_of_nodes: usize,
nodes: Vec<(Builder<DefaultRegistry>, NodeLabel)>,
ring_max_htl: usize,
rnd_if_htl_above: usize,
max_connections: usize,
min_connections: usize,
start_backoff: Duration,
seed: u64,
virtual_time: VirtualTime,
running_nodes: HashMap<NodeLabel, RunningNode>,
node_addresses: HashMap<NodeLabel, SocketAddr>,
restartable_configs: HashMap<NodeLabel, RestartableNodeConfig>,
all_gateway_configs: Vec<GatewayConfig>,
pub streaming_threshold: Option<usize>,
connection_managers: HashMap<NodeLabel, ConnectionManager>,
pub use_mock_wasm: bool,
churn_config: Option<ChurnConfig>,
}
impl SimNetwork {
pub const DEFAULT_SEED: u64 = 0xDEADBEEF_CAFEBABE;
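/// Builds (but does not start) a network with `gateways` gateway peers and `nodes` regular
/// peers. Ports, locations, and per-peer RNG seeds are all derived from `seed`, so the same
/// arguments reproduce the same topology.
///
/// A minimal sketch, with arbitrary ring and connection parameters:
///
/// ```ignore
/// let mut sim = SimNetwork::new("test", 1, 5, 7, 3, 10, 2, SimNetwork::DEFAULT_SEED).await;
/// sim.check_connectivity(Duration::from_secs(10)).await?;
/// ```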
#[allow(clippy::too_many_arguments)]
pub async fn new(
name: &str,
gateways: usize,
nodes: usize,
ring_max_htl: usize,
rnd_if_htl_above: usize,
max_connections: usize,
min_connections: usize,
seed: u64,
) -> Self {
assert!(nodes > 0);
GlobalRng::set_seed(seed);
let (user_ev_controller, mut receiver_ch) =
watch::channel((0, TransportKeypair::new().public().clone()));
receiver_ch.borrow_and_update();
let virtual_time = VirtualTime::new();
register_network_time_source(name, virtual_time.clone());
let mut net = Self {
name: name.into(),
clean_up_tmp_dirs: true,
event_listener: TestEventListener::new().await,
labels: Vec::with_capacity(nodes + gateways),
user_ev_controller: Some(user_ev_controller),
receiver_ch,
number_of_gateways: gateways,
gateways: Vec::with_capacity(gateways),
number_of_nodes: nodes,
nodes: Vec::with_capacity(nodes),
ring_max_htl,
rnd_if_htl_above,
max_connections,
min_connections,
start_backoff: Duration::from_millis(1),
seed,
virtual_time,
running_nodes: HashMap::new(),
node_addresses: HashMap::new(),
restartable_configs: HashMap::new(),
all_gateway_configs: Vec::new(),
streaming_threshold: None,
connection_managers: HashMap::new(),
use_mock_wasm: false,
churn_config: None,
};
net.config_gateways(
gateways
.try_into()
.expect("should have at least one gateway"),
)
.await;
net.config_nodes(nodes).await;
net.init_default_fault_injection();
net
}
fn init_default_fault_injection(&mut self) {
use crate::node::network_bridge::{FaultInjectorState, set_fault_injector};
let fault_seed = self.seed.wrapping_add(0xFA01_7777);
let state = FaultInjectorState::new(FaultConfig::default(), fault_seed)
.with_virtual_time(self.virtual_time.clone());
set_fault_injector(
&self.name,
Some(std::sync::Arc::new(std::sync::Mutex::new(state))),
);
}
pub fn with_streaming_threshold(&mut self, threshold: usize) {
self.streaming_threshold = Some(threshold);
for (builder, _) in &mut self.gateways {
let old_config = &*builder.config.config;
let mut new_network_api = old_config.network_api.clone();
new_network_api.streaming_threshold = threshold;
let mut new_config = old_config.clone();
new_config.network_api = new_network_api;
builder.config.config = Arc::new(new_config);
}
for (builder, _) in &mut self.nodes {
let old_config = &*builder.config.config;
let mut new_network_api = old_config.network_api.clone();
new_network_api.streaming_threshold = threshold;
let mut new_config = old_config.clone();
new_config.network_api = new_network_api;
builder.config.config = Arc::new(new_config);
}
}
pub fn with_readiness_gating(&mut self, min: usize) {
for (builder, _) in &mut self.gateways {
builder.config.relay_ready_connections = Some(min);
}
for (builder, _) in &mut self.nodes {
builder.config.relay_ready_connections = Some(min);
}
}
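/// Derives a per-peer RNG seed by mixing the network seed with the peer index using a
/// MurmurHash3 `fmix64`-style avalanche, so consecutive indices still yield well-spread seeds.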
fn derive_peer_seed(&self, peer_index: usize) -> u64 {
let mut seed = self.seed;
seed = seed.wrapping_add(peer_index as u64);
seed ^= seed >> 33;
seed = seed.wrapping_mul(0xff51afd7ed558ccd);
seed ^= seed >> 33;
seed = seed.wrapping_mul(0xc4ceb9fe1a85ec53);
seed ^= seed >> 33;
seed
}
}
impl SimNetwork {
pub fn with_start_backoff(&mut self, value: Duration) {
self.start_backoff = value;
}
pub fn with_churn(&mut self, config: ChurnConfig) -> &mut Self {
self.churn_config = Some(config);
self
}
pub fn virtual_time(&self) -> &VirtualTime {
&self.virtual_time
}
pub fn with_fault_injection(&mut self, config: FaultConfig) {
use crate::node::network_bridge::{FaultInjectorState, set_fault_injector};
let fault_seed = self.seed.wrapping_add(0xFA01_7777);
let state = FaultInjectorState::new(config, fault_seed)
.with_virtual_time(self.virtual_time.clone());
set_fault_injector(
&self.name,
Some(std::sync::Arc::new(std::sync::Mutex::new(state))),
);
}
pub fn clear_fault_injection(&mut self) {
self.init_default_fault_injection();
}
#[deprecated(
since = "0.1.0",
note = "VirtualTime is now always enabled. Use with_fault_injection() and virtual_time() instead."
)]
pub fn with_fault_injection_virtual_time(
&mut self,
config: FaultConfig,
virtual_time: VirtualTime,
) {
use crate::node::network_bridge::{FaultInjectorState, set_fault_injector};
let fault_seed = self.seed.wrapping_add(0xFA01_7777);
let state = FaultInjectorState::new(config, fault_seed).with_virtual_time(virtual_time);
set_fault_injector(
&self.name,
Some(std::sync::Arc::new(std::sync::Mutex::new(state))),
);
}
pub fn advance_virtual_time(&mut self) -> usize {
use crate::node::network_bridge::get_fault_injector;
if let Some(injector) = get_fault_injector(&self.name) {
let mut state = injector.lock().unwrap();
state.advance_time()
} else {
0
}
}
pub fn advance_time(&mut self, duration: Duration) -> usize {
self.virtual_time.advance(duration);
self.advance_virtual_time()
}
pub fn get_network_stats(&self) -> Option<crate::node::network_bridge::NetworkStats> {
use crate::node::network_bridge::get_fault_injector;
get_fault_injector(&self.name).map(|injector| {
let state = injector.lock().unwrap();
state.stats().clone()
})
}
pub fn reset_network_stats(&mut self) {
use crate::node::network_bridge::get_fault_injector;
if let Some(injector) = get_fault_injector(&self.name) {
let mut state = injector.lock().unwrap();
state.reset_stats();
}
}
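/// Simulates a crash of `label`: marks its address as crashed in the fault injector (so
/// traffic to it is dropped) and aborts its running task. Returns `false` if the node was
/// not running.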
pub fn crash_node(&mut self, label: &NodeLabel) -> bool {
use crate::node::network_bridge::get_fault_injector;
let addr = match self.node_addresses.get(label) {
Some(addr) => *addr,
None => {
tracing::warn!(?label, "Cannot crash node: address not found");
return false;
}
};
if let Some(injector) = get_fault_injector(&self.name) {
let mut state = injector.lock().unwrap();
state.config.crash_node(addr);
tracing::info!(?label, ?addr, "Node marked as crashed in fault injector");
}
if let Some(running) = self.running_nodes.remove(label) {
running.abort_handle.abort();
tracing::info!(?label, "Node task aborted");
true
} else {
tracing::warn!(
?label,
"Node not found in running_nodes (may not be started yet)"
);
false
}
}
pub fn recover_node(&mut self, label: &NodeLabel) -> bool {
use crate::node::network_bridge::get_fault_injector;
let addr = match self.node_addresses.get(label) {
Some(addr) => addr,
None => {
tracing::warn!(?label, "Cannot recover node: address not found");
return false;
}
};
if let Some(injector) = get_fault_injector(&self.name) {
let mut state = injector.lock().unwrap();
state.config.recover_node(addr);
tracing::info!(
?label,
?addr,
"Node recovered (no longer marked as crashed)"
);
true
} else {
false
}
}
pub fn is_node_crashed(&self, label: &NodeLabel) -> bool {
use crate::node::network_bridge::get_fault_injector;
let addr = match self.node_addresses.get(label) {
Some(addr) => addr,
None => return false,
};
if let Some(injector) = get_fault_injector(&self.name) {
let state = injector.lock().unwrap();
state.config.is_crashed(addr)
} else {
false
}
}
pub fn node_address(&self, label: &NodeLabel) -> Option<SocketAddr> {
self.node_addresses.get(label).copied()
}
pub fn all_node_addresses(&self) -> &HashMap<NodeLabel, SocketAddr> {
&self.node_addresses
}
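/// Restarts a previously started node from its captured [`RestartableNodeConfig`], reusing
/// its shared mock storage so contract state survives the restart. Returns `None` if no
/// restartable config or address was recorded for `label`.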
pub async fn restart_node<R>(
&mut self,
label: &NodeLabel,
seed: u64,
max_contract_num: usize,
iterations: usize,
) -> Option<tokio::task::JoinHandle<anyhow::Result<()>>>
where
R: crate::client_events::test::RandomEventGenerator + Send + 'static,
{
use crate::node::network_bridge::get_fault_injector;
use crate::transport::in_memory_socket::unregister_socket;
let restart_config = match self.restartable_configs.get(label) {
Some(config) => config.clone(),
None => {
tracing::warn!(?label, "Cannot restart node: config not found");
return None;
}
};
let node_addr = match self.node_addresses.get(label) {
Some(addr) => *addr,
None => {
tracing::warn!(?label, "Cannot restart node: address not found");
return None;
}
};
tracing::info!(?label, ?node_addr, "Restarting node with persisted state");
unregister_socket(&self.name, &node_addr);
if let Some(injector) = get_fault_injector(&self.name) {
let mut state = injector.lock().unwrap();
state.config.recover_node(&node_addr);
}
let event_listener = {
#[cfg(feature = "trace-ot")]
{
use crate::tracing::OTEventRegister;
CombinedRegister::new([
self.event_listener.trait_clone(),
Box::new(OTEventRegister::new()),
])
}
#[cfg(not(feature = "trace-ot"))]
{
self.event_listener.clone()
}
};
let builder = Builder::build(
restart_config.config.clone(),
event_listener,
format!("{}-{}", self.name, label),
restart_config.rng_seed,
self.name.clone(),
);
let total_peer_num = self.labels.len();
let mut user_events = MemoryEventsGen::<R>::new_with_seed(
self.receiver_ch.clone(),
restart_config.config.key_pair.public().clone(),
seed,
);
user_events.rng_params(label.number(), total_peer_num, max_contract_num, iterations);
let span = if restart_config.is_gateway {
tracing::info_span!("in_mem_gateway_restart", %label)
} else {
tracing::info_span!("in_mem_node_restart", %label)
};
let shared_storage = restart_config.shared_storage.clone();
let node_task = async move {
builder
.run_node_with_shared_storage(user_events, span, shared_storage)
.await
};
let handle = GlobalExecutor::spawn(node_task);
self.running_nodes.insert(
label.clone(),
RunningNode {
label: label.clone(),
addr: node_addr,
abort_handle: handle.abort_handle(),
},
);
tracing::info!(?label, "Node restarted successfully with persisted state");
Some(handle)
}
#[allow(dead_code)]
pub(crate) fn connection_manager(&self, label: &NodeLabel) -> Option<&ConnectionManager> {
self.connection_managers.get(label)
}
pub fn has_connection_or_pending(&self, label: &NodeLabel, addr: SocketAddr) -> Option<bool> {
self.connection_managers
.get(label)
.map(|cm| cm.has_connection_or_pending(addr))
}
pub fn inject_stale_reservation(
&self,
label: &NodeLabel,
addr: SocketAddr,
location: Location,
age: Duration,
) -> bool {
if let Some(cm) = self.connection_managers.get(label) {
let created = tokio::time::Instant::now() - age;
cm.inject_reservation(addr, location, created);
true
} else {
false
}
}
pub fn connection_count(&self, label: &NodeLabel) -> Option<usize> {
self.connection_managers
.get(label)
.map(|cm| cm.connection_count())
}
pub fn reserved_connections_count(&self, label: &NodeLabel) -> Option<usize> {
self.connection_managers
.get(label)
.map(|cm| cm.get_reserved_connections())
}
pub fn cleanup_stale_reservations(&self, label: &NodeLabel) -> Option<usize> {
self.connection_managers
.get(label)
.map(|cm| cm.cleanup_stale_reservations())
}
pub fn should_accept(
&self,
label: &NodeLabel,
location: Location,
addr: SocketAddr,
) -> Option<bool> {
self.connection_managers
.get(label)
.map(|cm| cm.should_accept(location, addr))
}
pub fn can_restart(&self, label: &NodeLabel) -> bool {
self.restartable_configs.contains_key(label)
}
#[allow(unused)]
pub fn debug(&mut self) {
self.clean_up_tmp_dirs = false;
}
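/// Maps a peer index to a deterministic localhost port in `[50000, 60000)`.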
fn derive_deterministic_port(&self, peer_index: usize) -> u16 {
const BASE_PORT: u16 = 50000;
const PORT_RANGE: u16 = 10000;
let peer_seed = self.derive_peer_seed(peer_index);
BASE_PORT + ((peer_seed % PORT_RANGE as u64) as u16)
}
async fn config_gateways(&mut self, num: NonZeroUsize) {
info!("Building {} gateways", num);
let mut configs = Vec::with_capacity(num.into());
for node_no in 0..num.into() {
let label = NodeLabel::gateway(&self.name, node_no);
let port = self.derive_deterministic_port(node_no);
let keypair = crate::transport::TransportKeypair::new();
let addr: SocketAddr = (Ipv6Addr::LOCALHOST, port).into();
let peer_key_location = PeerKeyLocation::new(keypair.public().clone(), addr);
let location = Location::from_address(&addr);
self.node_addresses.insert(label.clone(), addr);
let config_args = ConfigArgs {
id: Some(format!("{label}")),
mode: Some(OperationMode::Local),
network_api: crate::config::NetworkArgs {
streaming_threshold: Some(self.streaming_threshold.unwrap_or(usize::MAX)),
..Default::default()
},
..Default::default()
};
let mut config = NodeConfig::new(config_args.build().await.unwrap())
.await
.unwrap();
config.key_pair = keypair;
config.network_listener_ip = Ipv6Addr::LOCALHOST.into();
config.network_listener_port = port;
config.with_own_addr(addr);
config
.with_location(location)
.max_hops_to_live(self.ring_max_htl)
.max_number_of_connections(self.max_connections)
.min_number_of_connections(self.min_connections)
.is_gateway()
.rnd_if_htl_above(self.rnd_if_htl_above);
config.relay_ready_connections = Some(0);
self.event_listener
.add_node(label.clone(), config.key_pair.public().clone());
configs.push((
config,
GatewayConfig {
label,
peer_key_location,
location,
},
));
}
for config in &mut configs {
config.0.should_connect = false;
}
let gateways: Vec<_> = configs.iter().map(|(_, gw)| gw.clone()).collect();
self.all_gateway_configs = gateways.clone();
for (this_node, this_config) in configs {
let event_listener = {
#[cfg(feature = "trace-ot")]
{
use crate::tracing::OTEventRegister;
CombinedRegister::new([
self.event_listener.trait_clone(),
Box::new(OTEventRegister::new()),
])
}
#[cfg(not(feature = "trace-ot"))]
{
self.event_listener.clone()
}
};
let peer_seed = self.derive_peer_seed(this_config.label.number());
let gateway = Builder::build(
this_node,
event_listener,
format!("{}-{label}", self.name, label = this_config.label),
peer_seed,
self.name.clone(),
);
self.gateways.push((gateway, this_config));
}
}
async fn config_nodes(&mut self, num: usize) {
info!("Building {} regular nodes", num);
let gateways: Vec<_> = self
.gateways
.iter()
.map(|(_node, config)| config)
.cloned()
.collect();
for node_no in self.number_of_gateways..num + self.number_of_gateways {
let label = NodeLabel::node(&self.name, node_no);
let config_args = ConfigArgs {
id: Some(format!("{label}")),
mode: Some(OperationMode::Local),
network_api: crate::config::NetworkArgs {
streaming_threshold: Some(self.streaming_threshold.unwrap_or(usize::MAX)),
..Default::default()
},
..Default::default()
};
let mut config = NodeConfig::new(config_args.build().await.unwrap())
.await
.unwrap();
for GatewayConfig {
peer_key_location,
location,
..
} in &gateways
{
config.add_gateway(InitPeerNode::new(peer_key_location.clone(), *location));
}
let port = self.derive_deterministic_port(node_no);
let addr: SocketAddr = (Ipv6Addr::LOCALHOST, port).into();
let location = Location::from_address(&addr);
config.network_listener_port = port;
config.network_listener_ip = Ipv6Addr::LOCALHOST.into();
config.key_pair = crate::transport::TransportKeypair::new();
config.with_own_addr(addr);
config
.with_location(location)
.max_hops_to_live(self.ring_max_htl)
.rnd_if_htl_above(self.rnd_if_htl_above)
.max_number_of_connections(self.max_connections);
config.relay_ready_connections = Some(0);
self.node_addresses.insert(label.clone(), addr);
self.event_listener
.add_node(label.clone(), config.key_pair.public().clone());
let event_listener = {
#[cfg(feature = "trace-ot")]
{
use crate::tracing::OTEventRegister;
CombinedRegister::new([
self.event_listener.trait_clone(),
Box::new(OTEventRegister::new()),
])
}
#[cfg(not(feature = "trace-ot"))]
{
self.event_listener.clone()
}
};
let peer_seed = self.derive_peer_seed(node_no);
let node = Builder::build(
config,
event_listener,
format!("{}-{label}", self.name),
peer_seed,
self.name.clone(),
);
self.nodes.push((node, label));
}
}
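/// Spawns every configured gateway and regular node, wiring each to a seeded
/// `MemoryEventsGen<R>` that generates random events over at most `max_contract_num`
/// contracts for `iterations` rounds. Gateways start first; regular nodes are only started
/// once the gateway sockets are registered (or a registration timeout elapses).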
pub async fn start_with_rand_gen<R>(
&mut self,
seed: u64,
max_contract_num: usize,
iterations: usize,
) -> Vec<tokio::task::JoinHandle<anyhow::Result<()>>>
where
R: RandomEventGenerator + Send + 'static,
{
use crate::ring::topology_registry::set_current_network_name;
use crate::transport::in_memory_socket::is_socket_registered;
set_current_network_name(&self.name);
let total_peer_num = self.gateways.len() + self.nodes.len();
let mut peers = vec![];
let gateways: Vec<_> = self.gateways.drain(..).collect();
let mut gateway_addrs = Vec::with_capacity(gateways.len());
for (mut node, config) in gateways {
let label = config.label.clone();
let gateway_addr = *config
.peer_key_location
.peer_addr
.as_known()
.expect("Gateway should have known address");
gateway_addrs.push(gateway_addr);
tracing::debug!(peer = %label, addr = %gateway_addr, "starting gateway");
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
self.restartable_configs.insert(
label.clone(),
RestartableNodeConfig {
config: node.config.clone(),
label: label.clone(),
is_gateway: true,
gateway_configs: self.all_gateway_configs.clone(),
rng_seed: node.rng_seed,
shared_storage: shared_storage.clone(),
},
);
let mut user_events = MemoryEventsGen::<R>::new_with_seed(
self.receiver_ch.clone(),
node.config.key_pair.public().clone(),
seed,
);
user_events.rng_params(label.number(), total_peer_num, max_contract_num, iterations);
let span = tracing::info_span!("in_mem_gateway", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let shared_cm: Arc<parking_lot::Mutex<Option<ConnectionManager>>> =
Arc::new(parking_lot::Mutex::new(None));
node.shared_cm = Some(shared_cm.clone());
let node_task = async move {
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
};
let handle = GlobalExecutor::spawn(node_task);
self.running_nodes.insert(
label.clone(),
RunningNode {
label: label.clone(),
addr: gateway_addr,
abort_handle: handle.abort_handle(),
},
);
peers.push(handle);
let captured_cm = capture_shared_slot(&shared_cm, self.start_backoff, &label)
.await
.unwrap_or_else(|| {
panic!(
"SimNetwork: node {label} failed to publish ConnectionManager \
within the capture budget. This means the spawned `run_node` \
task panicked before reaching the `shared_cm` write at the top \
of run_node_with_shared_storage, or the task was never scheduled \
at all. Check preceding logs for the real failure."
)
});
self.connection_managers.insert(label, captured_cm);
}
let registration_timeout = Duration::from_secs(10);
let poll_interval = Duration::from_millis(10);
let start = tokio::time::Instant::now();
'wait_loop: loop {
let mut all_registered = true;
for addr in &gateway_addrs {
if !is_socket_registered(&self.name, addr) {
all_registered = false;
break;
}
}
if all_registered {
tracing::debug!("All {} gateways registered", gateway_addrs.len());
tokio::time::sleep(Duration::from_millis(100)).await;
tracing::debug!("Starting regular nodes");
break 'wait_loop;
}
if start.elapsed() > registration_timeout {
tracing::warn!(
"Timeout waiting for gateway registration, some may not be ready. \
Registered: {}/{}",
gateway_addrs
.iter()
.filter(|a| is_socket_registered(&self.name, a))
.count(),
gateway_addrs.len()
);
break 'wait_loop;
}
tokio::time::sleep(poll_interval).await;
}
let nodes: Vec<_> = self.nodes.drain(..).collect();
for (mut node, label) in nodes {
let node_addr = self
.node_addresses
.get(&label)
.copied()
.expect("Node address should be tracked");
tracing::debug!(peer = %label, addr = %node_addr, "starting regular node");
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
self.restartable_configs.insert(
label.clone(),
RestartableNodeConfig {
config: node.config.clone(),
label: label.clone(),
is_gateway: false,
gateway_configs: self.all_gateway_configs.clone(),
rng_seed: node.rng_seed,
shared_storage: shared_storage.clone(),
},
);
let mut user_events = MemoryEventsGen::<R>::new_with_seed(
self.receiver_ch.clone(),
node.config.key_pair.public().clone(),
seed,
);
user_events.rng_params(label.number(), total_peer_num, max_contract_num, iterations);
let span = tracing::info_span!("in_mem_node", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let shared_cm: Arc<parking_lot::Mutex<Option<ConnectionManager>>> =
Arc::new(parking_lot::Mutex::new(None));
node.shared_cm = Some(shared_cm.clone());
let node_task = async move {
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
};
let handle = GlobalExecutor::spawn(node_task);
self.running_nodes.insert(
label.clone(),
RunningNode {
label: label.clone(),
addr: node_addr,
abort_handle: handle.abort_handle(),
},
);
peers.push(handle);
let captured_cm = capture_shared_slot(&shared_cm, self.start_backoff, &label)
.await
.unwrap_or_else(|| {
panic!(
"SimNetwork: node {label} failed to publish ConnectionManager \
within the capture budget. This means the spawned `run_node` \
task panicked before reaching the `shared_cm` write at the top \
of run_node_with_shared_storage, or the task was never scheduled \
at all. Check preceding logs for the real failure."
)
});
self.connection_managers.insert(label, captured_cm);
}
self.labels.sort_by(|(a, _), (b, _)| a.cmp(b));
peers
}
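/// Like `start_with_rand_gen`, but each node is handed the exact [`ScheduledOperation`]s
/// addressed to it, in order, instead of random events. Returns the join handles together
/// with the total number of scheduled operations.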
#[cfg(any(test, feature = "testing"))]
pub async fn start_with_controlled_events(
&mut self,
operations: Vec<ScheduledOperation>,
) -> (Vec<tokio::task::JoinHandle<anyhow::Result<()>>>, usize) {
use crate::ring::topology_registry::set_current_network_name;
use crate::transport::in_memory_socket::is_socket_registered;
set_current_network_name(&self.name);
let num_operations = operations.len();
let mut peers = vec![];
let mut operations_by_node: HashMap<NodeLabel, Vec<(EventId, SimOperation)>> =
HashMap::new();
for (event_id, scheduled_op) in operations.into_iter().enumerate() {
operations_by_node
.entry(scheduled_op.node)
.or_default()
.push((event_id as EventId, scheduled_op.operation));
}
let gateways: Vec<_> = self.gateways.drain(..).collect();
let mut gateway_addrs = Vec::with_capacity(gateways.len());
for (node, config) in gateways {
let label = config.label.clone();
let gateway_addr = *config
.peer_key_location
.peer_addr
.as_known()
.expect("Gateway should have known address");
gateway_addrs.push(gateway_addr);
tracing::debug!(peer = %label, addr = %gateway_addr, "starting gateway with controlled events");
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
self.restartable_configs.insert(
label.clone(),
RestartableNodeConfig {
config: node.config.clone(),
label: label.clone(),
is_gateway: true,
gateway_configs: self.all_gateway_configs.clone(),
rng_seed: node.rng_seed,
shared_storage: shared_storage.clone(),
},
);
let mut user_events = MemoryEventsGen::new(
self.receiver_ch.clone(),
node.config.key_pair.public().clone(),
);
if let Some(node_ops) = operations_by_node.remove(&label) {
let events: Vec<_> = node_ops
.into_iter()
.map(|(id, op)| (id, op.into_client_request()))
.collect();
user_events.generate_events(events);
}
let span = tracing::info_span!("in_mem_gateway_controlled", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let node_task = async move {
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
};
let handle = GlobalExecutor::spawn(node_task);
self.running_nodes.insert(
label.clone(),
RunningNode {
label: label.clone(),
addr: gateway_addr,
abort_handle: handle.abort_handle(),
},
);
peers.push(handle);
tokio::time::sleep(self.start_backoff).await;
}
let registration_timeout = Duration::from_secs(10);
let poll_interval = Duration::from_millis(10);
let start = tokio::time::Instant::now();
'wait_loop: loop {
let mut all_registered = true;
for addr in &gateway_addrs {
if !is_socket_registered(&self.name, addr) {
all_registered = false;
break;
}
}
if all_registered {
tracing::debug!(
"All {} gateways registered in {:?}",
gateway_addrs.len(),
start.elapsed()
);
break 'wait_loop;
}
if start.elapsed() > registration_timeout {
tracing::warn!(
"Timeout waiting for gateway registration. {} of {} registered.",
gateway_addrs
.iter()
.filter(|a| is_socket_registered(&self.name, a))
.count(),
gateway_addrs.len()
);
break 'wait_loop;
}
tokio::time::sleep(poll_interval).await;
}
for (node, label) in std::mem::take(&mut self.nodes) {
let node_addr = *self
.node_addresses
.get(&label)
.expect("Node address should be tracked");
tracing::debug!(peer = %label, addr = %node_addr, "starting regular node with controlled events");
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
self.restartable_configs.insert(
label.clone(),
RestartableNodeConfig {
config: node.config.clone(),
label: label.clone(),
is_gateway: false,
gateway_configs: self.all_gateway_configs.clone(),
rng_seed: node.rng_seed,
shared_storage: shared_storage.clone(),
},
);
let mut user_events = MemoryEventsGen::new(
self.receiver_ch.clone(),
node.config.key_pair.public().clone(),
);
if let Some(node_ops) = operations_by_node.remove(&label) {
let events: Vec<_> = node_ops
.into_iter()
.map(|(id, op)| (id, op.into_client_request()))
.collect();
user_events.generate_events(events);
}
let span = tracing::info_span!("in_mem_node_controlled", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let node_task = async move {
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
};
let handle = GlobalExecutor::spawn(node_task);
self.running_nodes.insert(
label.clone(),
RunningNode {
label: label.clone(),
addr: node_addr,
abort_handle: handle.abort_handle(),
},
);
peers.push(handle);
tokio::time::sleep(self.start_backoff).await;
}
if !operations_by_node.is_empty() {
let unknown_nodes: Vec<_> = operations_by_node.keys().collect();
tracing::warn!(
"Operations scheduled for unknown nodes: {:?}",
unknown_nodes
);
}
self.labels.sort_by(|(a, _), (b, _)| a.cmp(b));
(peers, num_operations)
}
pub fn controlled_event_chain(
&mut self,
event_sequence: Vec<(EventId, NodeLabel)>,
) -> ControlledEventChain {
let user_ev_controller = self
.user_ev_controller
.take()
.expect("controller should be set");
let label_to_key: HashMap<_, _> = self.labels.iter().cloned().collect();
ControlledEventChain::new(user_ev_controller, event_sequence, label_to_key)
}
pub fn get_peer_locations(&self) -> Vec<f64> {
let mut locations = Vec::new();
for (builder, _) in &self.gateways {
if let Some(loc) = &builder.config.location {
locations.push(loc.as_f64());
}
}
for (builder, _) in &self.nodes {
if let Some(loc) = &builder.config.location {
locations.push(loc.as_f64());
}
}
locations
}
pub fn build_peers(&mut self) -> Vec<(NodeLabel, NodeConfig)> {
let gw = self.gateways.drain(..).map(|(n, c)| (n, c.label));
let mut peers = vec![];
for (builder, label) in gw.chain(self.nodes.drain(..)).collect::<Vec<_>>() {
let pub_key = builder.config.key_pair.public();
self.labels.push((label.clone(), pub_key.clone()));
peers.push((label, builder.config));
}
self.labels.sort_by(|(a, _), (b, _)| a.cmp(b));
peers.sort_by(|(a, _), (b, _)| a.cmp(b));
peers
}
pub fn node_connectivity(
&self,
) -> HashMap<NodeLabel, (TransportPublicKey, HashMap<NodeLabel, Distance>)> {
let mut peers_connections = HashMap::with_capacity(self.labels.len());
let key_to_label: HashMap<_, _> = self.labels.iter().map(|(k, v)| (v, k)).collect();
for (label, key) in &self.labels {
let conns = self
.event_listener
.connections(key)
.map(|(k, d)| (key_to_label[k.pub_key()].clone(), d))
.collect::<HashMap<_, _>>();
peers_connections.insert(label.clone(), (key.clone(), conns));
}
peers_connections
}
pub fn neighbor_peer_keys(&self, label: &NodeLabel) -> Option<HashSet<TransportPublicKey>> {
let key = self
.labels
.iter()
.find(|(l, _)| l == label)
.map(|(_, k)| k)?;
Some(
self.event_listener
.connections(key)
.map(|(k, _)| k.pub_key().clone())
.collect(),
)
}
pub fn event_chain(
&mut self,
total_events: u32,
controller: Option<watch::Sender<(EventId, TransportPublicKey)>>,
) -> EventChain {
let user_ev_controller = controller.unwrap_or_else(|| {
self.user_ev_controller
.take()
.expect("controller should be set")
});
let labels = self.labels.clone();
EventChain::new(labels, user_ev_controller, total_events, false)
}
#[deprecated(
since = "0.1.0",
note = "Use event_chain(&mut self) instead to retain access to SimNetwork for verification"
)]
pub fn into_event_chain(
mut self,
total_events: u32,
controller: Option<watch::Sender<(EventId, TransportPublicKey)>>,
) -> EventChain {
let user_ev_controller = controller.unwrap_or_else(|| {
self.user_ev_controller
.take()
.expect("controller should be set")
});
let labels = std::mem::take(&mut self.labels);
let debug_val = self.clean_up_tmp_dirs;
self.clean_up_tmp_dirs = false;
EventChain::new(labels, user_ev_controller, total_events, debug_val)
}
pub async fn check_connectivity(&mut self, time_out: Duration) -> anyhow::Result<()> {
self.connectivity(time_out, 1.0).await
}
pub async fn check_partial_connectivity(
&mut self,
time_out: Duration,
percent: f64,
) -> anyhow::Result<()> {
self.connectivity(time_out, percent).await
}
async fn connectivity(&mut self, time_out: Duration, percent: f64) -> anyhow::Result<()> {
let num_nodes = self.number_of_nodes;
let mut connected = HashSet::new();
let elapsed = std::time::Instant::now();
let time_step = Duration::from_millis(100);
while elapsed.elapsed() < time_out && (connected.len() as f64 / num_nodes as f64) < percent
{
self.advance_time(time_step);
tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(10)).await;
for node in self.number_of_gateways..num_nodes + self.number_of_gateways {
if !connected.contains(&node)
&& self.is_connected(&NodeLabel::node(&self.name, node))
{
connected.insert(node);
}
}
}
let expected =
HashSet::from_iter(self.number_of_gateways..num_nodes + self.number_of_gateways);
let mut missing: Vec<_> = expected
.difference(&connected)
.map(|n| format!("{}-node-{n}", self.name))
.collect();
tracing::info!("Number of simulated nodes: {num_nodes}");
let connected_percent = connected.len() as f64 / num_nodes as f64;
if connected_percent < (percent - 0.01) {
missing.sort();
let show_max = missing.len().min(100);
tracing::error!("Nodes without connection: {:?}(..)", &missing[..show_max],);
tracing::error!(
"Total nodes without connection: {:?}, ({:.1}% connected < {:.1}% required)",
missing.len(),
connected_percent * 100.0,
percent * 100.0
);
anyhow::bail!("found disconnected nodes");
}
tracing::info!(
"Required time for connecting all peers: {} secs",
elapsed.elapsed().as_secs()
);
Ok(())
}
pub fn is_connected(&self, peer: &NodeLabel) -> bool {
let pos = self
.labels
.binary_search_by(|(label, _)| label.cmp(peer))
.expect("peer not found");
self.event_listener.is_connected(&self.labels[pos].1)
}
pub async fn get_event_logs(&self) -> Vec<crate::tracing::NetLogMessage> {
self.event_listener.logs.lock().await.clone()
}
pub async fn get_deterministic_event_summary(&self) -> Vec<EventSummary> {
let logs = self.event_listener.logs.lock().await;
let mut summaries: Vec<EventSummary> = logs
.iter()
.map(|log| {
let event_detail = format!("{:?}", log.kind);
let event_kind_name = log.kind.variant_name().to_string();
let contract_key = log.kind.contract_key().map(|k| format!("{:?}", k));
let state_hash = log.kind.state_hash().map(String::from);
EventSummary {
tx: log.tx,
peer_addr: log.peer_id.socket_addr(),
event_kind_name,
contract_key,
state_hash,
event_detail,
}
})
.collect();
summaries.sort();
summaries
}
pub async fn get_event_counts(&self) -> std::collections::HashMap<String, usize> {
let logs = self.event_listener.logs.lock().await;
let mut counts = std::collections::HashMap::new();
for log in logs.iter() {
let key = log.kind.variant_name();
*counts.entry(key.to_string()).or_insert(0) += 1;
}
counts
}
pub fn event_logs_handle(&self) -> Arc<tokio::sync::Mutex<Vec<crate::tracing::NetLogMessage>>> {
self.event_listener.logs.clone()
}
pub fn network_connectivity_quality(&self) -> anyhow::Result<()> {
const HIGHER_THAN_MIN_THRESHOLD: f64 = 0.5;
let num_nodes = self.number_of_nodes;
if num_nodes == 0 {
anyhow::bail!("cannot check connectivity quality with zero nodes");
}
let min_connections_threshold = (num_nodes as f64 * HIGHER_THAN_MIN_THRESHOLD) as usize;
let node_connectivity = self.node_connectivity();
let mut connections_per_peer: Vec<_> = node_connectivity
.iter()
.map(|(k, v)| (k, v.1.len()))
.filter(|&(k, _)| !k.is_gateway())
.map(|(_, v)| v)
.collect();
if connections_per_peer.is_empty() {
anyhow::bail!("no non-gateway nodes found for connectivity check");
}
connections_per_peer.sort_unstable_by_key(|num_conn| *num_conn);
if connections_per_peer
.get(min_connections_threshold)
.copied()
.unwrap_or(0)
< self.min_connections
{
tracing::error!(
"Low connectivity; more than {:.0}% of the nodes don't have more than minimum connections",
HIGHER_THAN_MIN_THRESHOLD * 100.0
);
anyhow::bail!("low connectivity");
} else {
let idx = connections_per_peer[min_connections_threshold..]
.iter()
.position(|num_conn| *num_conn < self.min_connections)
.unwrap_or_else(|| connections_per_peer[min_connections_threshold..].len() - 1)
+ (min_connections_threshold - 1);
let percentile = idx as f64 / connections_per_peer.len() as f64 * 100.0;
tracing::info!("{percentile:.0}% nodes have higher than required minimum connections");
}
let expected_avg_connections =
((self.max_connections - self.min_connections) / 2) + self.min_connections;
let avg_connections: usize = connections_per_peer.iter().sum::<usize>() / num_nodes;
if avg_connections < expected_avg_connections {
tracing::warn!(
"Average number of connections ({avg_connections}) is low (< {expected_avg_connections})"
);
}
Ok(())
}
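/// Groups, per contract, the stored-state hash most recently reported by each peer in the
/// event log. Contracts replicated on at least two peers are classified as converged (all
/// hashes equal) or diverged.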
pub async fn check_convergence(&self) -> ConvergenceResult {
let logs = self.event_listener.logs.lock().await;
let mut contract_states: BTreeMap<String, BTreeMap<SocketAddr, String>> = BTreeMap::new();
for log in logs.iter() {
let contract_key = log.kind.contract_key().map(|k| format!("{:?}", k));
let state_hash = log.kind.stored_state_hash().map(String::from);
if let (Some(contract_key), Some(state_hash)) = (contract_key, state_hash) {
contract_states
.entry(contract_key)
.or_default()
.insert(log.peer_id.socket_addr(), state_hash);
}
}
let mut converged = Vec::new();
let mut diverged = Vec::new();
for (contract_key, peer_states) in contract_states {
if peer_states.len() < 2 {
continue;
}
let unique_states: HashSet<&String> = peer_states.values().collect();
if unique_states.len() == 1 {
let state = unique_states.into_iter().next().unwrap().clone();
converged.push(ConvergedContract {
contract_key,
state_hash: state,
replica_count: peer_states.len(),
});
} else {
diverged.push(DivergedContract {
contract_key,
peer_states: peer_states.into_iter().collect(),
});
}
}
ConvergenceResult {
converged,
diverged,
}
}
pub async fn await_convergence(
&self,
timeout: Duration,
poll_interval: Duration,
min_contracts: usize,
) -> Result<ConvergenceResult, ConvergenceResult> {
let start = tokio::time::Instant::now();
loop {
let result = self.check_convergence().await;
let total_replicated = result.converged.len() + result.diverged.len();
if total_replicated >= min_contracts && result.diverged.is_empty() {
tracing::info!(
"Convergence achieved: {} contracts converged in {:?}",
result.converged.len(),
start.elapsed()
);
return Ok(result);
}
if start.elapsed() >= timeout {
tracing::warn!(
"Convergence timeout after {:?}: {} converged, {} diverged",
timeout,
result.converged.len(),
result.diverged.len()
);
return Err(result);
}
tokio::time::sleep(poll_interval).await;
}
}
pub async fn assert_convergence(&self, timeout: Duration, poll_interval: Duration) {
match self.await_convergence(timeout, poll_interval, 1).await {
Ok(result) => {
tracing::info!(
"Convergence assertion passed: {} contracts",
result.converged.len()
);
}
Err(result) => {
let diverged_details: Vec<String> = result
.diverged
.iter()
.map(|d| {
format!(
"Contract {}: {} different states across {} peers",
d.contract_key,
d.unique_state_count(),
d.peer_states.len()
)
})
.collect();
panic!(
"Convergence assertion failed after {:?}: {} contracts diverged\n{}",
timeout,
result.diverged.len(),
diverged_details.join("\n")
);
}
}
}
pub async fn convergence_rate(&self) -> f64 {
let result = self.check_convergence().await;
let total = result.converged.len() + result.diverged.len();
if total == 0 {
return 1.0;
}
result.converged.len() as f64 / total as f64
}
pub async fn verify_state(&self) -> crate::tracing::VerificationReport {
let logs = self.event_listener.logs.lock().await;
let verifier = crate::tracing::StateVerifier::from_events(logs.clone());
verifier.verify()
}
pub async fn assert_state_verified(&self) {
let report = self.verify_state().await;
if !report.is_clean() {
panic!(
"State verification failed with {} anomalies:\n{}",
report.anomalies.len(),
report.display()
);
}
}
pub async fn count_subscribed_contracts(&self) -> usize {
use std::collections::HashSet;
let logs = self.event_listener.logs.lock().await;
let mut subscribed_contracts: HashSet<String> = HashSet::new();
for log in logs.iter() {
if let crate::tracing::EventKind::Subscribe(
crate::tracing::SubscribeEvent::SubscribeSuccess { key, .. },
) = &log.kind
{
subscribed_contracts.insert(format!("{:?}", key));
}
}
subscribed_contracts.len()
}
pub async fn get_contract_state_hashes(
&self,
) -> BTreeMap<String, BTreeMap<SocketAddr, String>> {
let summary = self.get_deterministic_event_summary().await;
let mut contract_states: BTreeMap<String, BTreeMap<SocketAddr, String>> = BTreeMap::new();
for event in &summary {
if let (Some(contract_key), Some(state_hash)) = (&event.contract_key, &event.state_hash)
{
contract_states
.entry(contract_key.clone())
.or_default()
.insert(event.peer_addr, state_hash.clone());
}
}
contract_states
}
pub async fn get_contract_distribution(&self) -> Vec<ContractDistribution> {
let states = self.get_contract_state_hashes().await;
states
.into_iter()
.map(|(contract_key, peer_states)| ContractDistribution {
contract_key,
replica_count: peer_states.len(),
peers: peer_states.keys().cloned().collect(),
})
.collect()
}
pub async fn get_operation_summary(&self) -> OperationSummary {
let logs = self.event_listener.logs.lock().await;
let mut summary = OperationSummary::default();
for log in logs.iter() {
match &log.kind {
crate::tracing::EventKind::Put(put_event) => {
use crate::tracing::PutEvent;
match put_event {
PutEvent::Request { .. } => summary.put.requested += 1,
PutEvent::PutSuccess { .. } => summary.put.succeeded += 1,
PutEvent::PutFailure { .. } => summary.put.failed += 1,
PutEvent::ResponseSent { .. } => {}
PutEvent::BroadcastEmitted { .. } => summary.put.broadcasts_emitted += 1,
PutEvent::BroadcastReceived { .. } => summary.put.broadcasts_received += 1,
}
}
crate::tracing::EventKind::Get(get_event) => {
use crate::tracing::GetEvent;
match get_event {
GetEvent::Request { .. } => summary.get.requested += 1,
GetEvent::GetSuccess { .. } => summary.get.succeeded += 1,
GetEvent::GetFailure { .. } => summary.get.failed += 1,
GetEvent::GetNotFound { .. }
| GetEvent::ResponseSent { .. }
| GetEvent::ForwardingAckSent { .. }
| GetEvent::ForwardingAckReceived { .. } => {}
}
}
crate::tracing::EventKind::Subscribe(sub_event) => {
use crate::tracing::SubscribeEvent;
match sub_event {
SubscribeEvent::Request { .. } => summary.subscribe.requested += 1,
SubscribeEvent::SubscribeSuccess { .. } => summary.subscribe.succeeded += 1,
SubscribeEvent::SubscribeNotFound { .. } => summary.subscribe.failed += 1,
SubscribeEvent::ResponseSent { .. }
| SubscribeEvent::HostingStarted { .. }
| SubscribeEvent::HostingStopped { .. }
| SubscribeEvent::_Reserved6
| SubscribeEvent::_Reserved7
| SubscribeEvent::_Reserved8
| SubscribeEvent::_Reserved9
| SubscribeEvent::_Reserved10
| SubscribeEvent::UnsubscribeSent { .. }
| SubscribeEvent::UnsubscribeReceived { .. } => {}
}
}
crate::tracing::EventKind::Update(update_event) => {
use crate::tracing::UpdateEvent;
match update_event {
UpdateEvent::Request { .. } => summary.update.requested += 1,
UpdateEvent::UpdateSuccess { .. } => summary.update.succeeded += 1,
UpdateEvent::BroadcastReceived { .. } => {
summary.update.broadcasts_received += 1
}
UpdateEvent::BroadcastEmitted { .. } => {
summary.update.broadcasts_emitted += 1
}
UpdateEvent::UpdateFailure { .. }
| UpdateEvent::BroadcastComplete { .. }
| UpdateEvent::BroadcastApplied { .. }
| UpdateEvent::BroadcastDeliverySummary { .. } => {}
}
}
crate::tracing::EventKind::Timeout { .. } => {
summary.timeouts += 1;
}
crate::tracing::EventKind::Connect(_)
| crate::tracing::EventKind::Route(_)
| crate::tracing::EventKind::Transfer(_)
| crate::tracing::EventKind::Lifecycle(_)
| crate::tracing::EventKind::Ignored
| crate::tracing::EventKind::Disconnected { .. }
| crate::tracing::EventKind::TransportSnapshot(_)
| crate::tracing::EventKind::InterestSync(_)
| crate::tracing::EventKind::RoutingDecision(_)
| crate::tracing::EventKind::RouterSnapshot(_) => {}
}
}
summary
}
pub async fn operation_completion_status(&self) -> (usize, usize) {
let summary = self.get_operation_summary().await;
let completed = summary.total_completed();
let requested = summary.total_requested();
let pending = requested.saturating_sub(completed);
(completed, pending)
}
pub async fn operation_success_rate(&self) -> f64 {
let summary = self.get_operation_summary().await;
summary.overall_success_rate()
}
pub async fn await_operation_completion(
&self,
timeout: Duration,
poll_interval: Duration,
) -> Result<OperationSummary, OperationSummary> {
let start = tokio::time::Instant::now();
loop {
let summary = self.get_operation_summary().await;
let (completed, requested) = (summary.total_completed(), summary.total_requested());
if requested == 0 || completed >= requested {
return Ok(summary);
}
if start.elapsed() >= timeout {
tracing::warn!(
"Operation completion timeout: {} completed, {} pending",
completed,
requested.saturating_sub(completed)
);
return Err(summary);
}
tokio::time::sleep(poll_interval).await;
}
}
pub async fn await_network_quiescence(
&self,
timeout: Duration,
quiescence_duration: Duration,
poll_interval: Duration,
) -> Result<usize, usize> {
let start = tokio::time::Instant::now();
let mut last_log_count = 0usize;
let mut quiet_since = tokio::time::Instant::now();
loop {
let current_count = self.event_listener.logs.lock().await.len();
if current_count != last_log_count {
last_log_count = current_count;
quiet_since = tokio::time::Instant::now();
} else if quiet_since.elapsed() >= quiescence_duration {
tracing::info!(
"Network quiesced after {:?} with {} log entries",
start.elapsed(),
current_count
);
return Ok(current_count);
}
if start.elapsed() >= timeout {
tracing::warn!(
"Network quiescence timeout after {:?}: {} log entries, still active",
timeout,
current_count
);
return Err(current_count);
}
tokio::time::sleep(poll_interval).await;
}
}
pub async fn assert_operation_success_rate(&self, min_rate: f64) {
let summary = self.get_operation_summary().await;
let rate = summary.overall_success_rate();
if rate < min_rate {
panic!(
"Operation success rate {:.1}% is below threshold {:.1}%\n\
Put: {}/{} ({:.1}%), Get: {}/{} ({:.1}%), \
Subscribe: {}/{} ({:.1}%), Update: {}/{} ({:.1}%)",
rate * 100.0,
min_rate * 100.0,
summary.put.succeeded,
summary.put.completed(),
summary.put.success_rate() * 100.0,
summary.get.succeeded,
summary.get.completed(),
summary.get.success_rate() * 100.0,
summary.subscribe.succeeded,
summary.subscribe.completed(),
summary.subscribe.success_rate() * 100.0,
summary.update.succeeded,
summary.update.completed(),
summary.update.success_rate() * 100.0,
);
}
}
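/// Runs the whole network inside a turmoil simulation: each peer becomes a turmoil host and
/// `test_fn` runs as the driving client. The simulated wall-clock start time is derived from
/// `seed`, so runs are reproducible.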
pub fn run_simulation<R, F, Fut>(
mut self,
seed: u64,
max_contract_num: usize,
iterations: usize,
simulation_duration: Duration,
event_wait: Duration,
test_fn: F,
) -> turmoil::Result
where
R: RandomEventGenerator + Send + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: std::future::Future<Output = turmoil::Result> + 'static,
{
use crate::config::{GlobalRng, GlobalSimulationTime};
use std::sync::Mutex;
GlobalRng::set_seed(seed);
// 2020-01-01T00:00:00Z, in milliseconds.
const BASE_EPOCH_MS: u64 = 1_577_836_800_000;
// Spread simulated start times over a roughly five-year window derived from the seed.
const RANGE_MS: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
let epoch_offset = seed % RANGE_MS;
GlobalSimulationTime::set_time_ms(BASE_EPOCH_MS + epoch_offset);
let mut sim = turmoil::Builder::new()
.simulation_duration(simulation_duration)
.rng_seed(seed)
.build();
let total_peer_num = self.gateways.len() + self.nodes.len();
let gateways: Vec<_> = self.gateways.drain(..).collect();
for (node, config) in gateways {
let label = config.label.clone();
let host_name = label.to_string();
let receiver_ch = self.receiver_ch.clone();
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
let mut user_events = MemoryEventsGen::<R>::new_with_seed(
receiver_ch,
node.config.key_pair.public().clone(),
seed,
);
user_events.rng_params(label.number(), total_peer_num, max_contract_num, iterations);
let span = tracing::info_span!("turmoil_gateway", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let node = Arc::new(Mutex::new(Some(node)));
let user_events = Arc::new(Mutex::new(Some(user_events)));
let span = Arc::new(Mutex::new(Some(span)));
let shared_storage = Arc::new(Mutex::new(Some(shared_storage)));
sim.host(host_name, move || {
let node = node.clone();
let user_events = user_events.clone();
let span = span.clone();
let shared_storage = shared_storage.clone();
async move {
let node = node
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let user_events = user_events
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let span = span
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let shared_storage = shared_storage
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
.map_err(|e| {
Box::new(std::io::Error::other(e.to_string()))
as Box<dyn std::error::Error>
})
}
});
}
let nodes: Vec<_> = self.nodes.drain(..).collect();
for (node, label) in nodes {
let host_name = label.to_string();
let receiver_ch = self.receiver_ch.clone();
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
let mut user_events = MemoryEventsGen::<R>::new_with_seed(
receiver_ch,
node.config.key_pair.public().clone(),
seed,
);
user_events.rng_params(label.number(), total_peer_num, max_contract_num, iterations);
let span = tracing::info_span!("turmoil_node", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let node = Arc::new(Mutex::new(Some(node)));
let user_events = Arc::new(Mutex::new(Some(user_events)));
let span = Arc::new(Mutex::new(Some(span)));
let shared_storage = Arc::new(Mutex::new(Some(shared_storage)));
sim.host(host_name, move || {
let node = node.clone();
let user_events = user_events.clone();
let span = span.clone();
let shared_storage = shared_storage.clone();
async move {
let node = node
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let user_events = user_events
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let span = span
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let shared_storage = shared_storage
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
.map_err(|e| {
Box::new(std::io::Error::other(e.to_string()))
as Box<dyn std::error::Error>
})
}
});
}
let user_ev_controller = self
.user_ev_controller
.take()
.expect("user_ev_controller should be set");
let labels: Vec<_> = self.labels.clone();
sim.client("test", async move {
tokio::time::sleep(Duration::from_secs(2)).await;
use rand::SeedableRng;
use rand::prelude::*;
let mut event_rng = <rand::rngs::SmallRng as SeedableRng>::seed_from_u64(seed);
for event_id in 0..iterations as u32 {
if let Some((_, peer_key)) = labels.choose(&mut event_rng) {
if user_ev_controller
.send((event_id, peer_key.clone()))
.is_err()
{
tracing::warn!(event_id, "Failed to send event signal - receivers dropped");
break;
}
tokio::time::sleep(event_wait).await;
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
test_fn().await
});
sim.run()
}
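/// Replays a scripted sequence of operations instead of random events.
/// `SeedContract` operations pre-populate node storage before the network
/// starts; every other operation is handed to its node's event generator
/// and triggered in order with a fixed delay between triggers, after which
/// the network is left to settle for `post_operations_wait`. Returns the
/// turmoil result together with topology snapshots and per-node storages.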
#[cfg(any(test, feature = "testing"))]
pub fn run_controlled_simulation(
mut self,
seed: u64,
operations: Vec<ScheduledOperation>,
simulation_duration: Duration,
post_operations_wait: Duration,
) -> ControlledSimulationResult {
use crate::config::{GlobalRng, GlobalSimulationTime};
use crate::ring::topology_registry::{
get_all_topology_snapshots, set_current_network_name,
};
use std::collections::HashMap;
use std::sync::Mutex;
GlobalRng::set_seed(seed);
const BASE_EPOCH_MS: u64 = 1_577_836_800_000;
const RANGE_MS: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
let epoch_offset = seed % RANGE_MS;
GlobalSimulationTime::set_time_ms(BASE_EPOCH_MS + epoch_offset);
set_current_network_name(&self.name);
let network_name = self.name.clone();
let mut sim = turmoil::Builder::new()
.simulation_duration(simulation_duration)
.rng_seed(seed)
.build();
let mut seed_ops: Vec<(NodeLabel, ContractContainer, Vec<u8>)> = Vec::new();
let mut event_ops: Vec<ScheduledOperation> = Vec::new();
for scheduled_op in operations {
match scheduled_op.operation {
SimOperation::SeedContract {
ref contract,
ref state,
} => {
seed_ops.push((scheduled_op.node, contract.clone(), state.clone()));
}
SimOperation::Put { .. }
| SimOperation::Get { .. }
| SimOperation::Subscribe { .. }
| SimOperation::Update { .. }
| SimOperation::Disconnect => event_ops.push(scheduled_op),
}
}
let mut operations_by_node: HashMap<NodeLabel, Vec<(EventId, SimOperation)>> =
HashMap::new();
let mut operation_sequence: Vec<(EventId, NodeLabel)> = Vec::new();
for (event_id, scheduled_op) in event_ops.into_iter().enumerate() {
let event_id = event_id as EventId;
operation_sequence.push((event_id, scheduled_op.node.clone()));
operations_by_node
.entry(scheduled_op.node)
.or_default()
.push((event_id, scheduled_op.operation));
}
let mut node_storages: HashMap<NodeLabel, crate::wasm_runtime::MockStateStorage> =
HashMap::new();
let contract_stores: Arc<
Mutex<HashMap<NodeLabel, crate::wasm_runtime::InMemoryContractStore>>,
> = Arc::new(Mutex::new(HashMap::new()));
let use_mock_wasm = self.use_mock_wasm;
let gateways: Vec<_> = self.gateways.drain(..).collect();
for (node, config) in gateways {
let label = config.label.clone();
let host_name = label.to_string();
let receiver_ch = self.receiver_ch.clone();
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
node_storages.insert(label.clone(), shared_storage.clone());
let mut user_events =
MemoryEventsGen::new(receiver_ch.clone(), node.config.key_pair.public().clone());
if let Some(node_ops) = operations_by_node.remove(&label) {
let events: Vec<_> = node_ops
.into_iter()
.map(|(id, op)| (id, op.into_client_request()))
.collect();
user_events.generate_events(events);
}
let span = tracing::info_span!("turmoil_gateway_controlled", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let node = Arc::new(Mutex::new(Some(node)));
let user_events = Arc::new(Mutex::new(Some(user_events)));
let span = Arc::new(Mutex::new(Some(span)));
let shared_storage = Arc::new(Mutex::new(Some(shared_storage)));
let contract_stores = contract_stores.clone();
let label_for_closure = label.clone();
sim.host(host_name, move || {
let node = node.clone();
let user_events = user_events.clone();
let span = span.clone();
let shared_storage = shared_storage.clone();
let contract_stores = contract_stores.clone();
let label = label_for_closure.clone();
async move {
let node = node
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let user_events = user_events
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let span = span
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let shared_storage = shared_storage
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let cs = contract_stores.lock().unwrap().remove(&label);
if use_mock_wasm {
node.run_node_with_mock_wasm(user_events, span, shared_storage, cs)
.await
.map_err(|e| {
Box::new(std::io::Error::other(e.to_string()))
as Box<dyn std::error::Error>
})
} else {
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
.map_err(|e| {
Box::new(std::io::Error::other(e.to_string()))
as Box<dyn std::error::Error>
})
}
}
});
}
let nodes: Vec<_> = self.nodes.drain(..).collect();
for (node, label) in nodes {
let host_name = label.to_string();
let receiver_ch = self.receiver_ch.clone();
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
node_storages.insert(label.clone(), shared_storage.clone());
let mut user_events =
MemoryEventsGen::new(receiver_ch.clone(), node.config.key_pair.public().clone());
if let Some(node_ops) = operations_by_node.remove(&label) {
let events: Vec<_> = node_ops
.into_iter()
.map(|(id, op)| (id, op.into_client_request()))
.collect();
user_events.generate_events(events);
}
let span = tracing::info_span!("turmoil_node_controlled", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let node = Arc::new(Mutex::new(Some(node)));
let user_events = Arc::new(Mutex::new(Some(user_events)));
let span = Arc::new(Mutex::new(Some(span)));
let shared_storage = Arc::new(Mutex::new(Some(shared_storage)));
let contract_stores = contract_stores.clone();
let label_for_closure = label.clone();
sim.host(host_name, move || {
let node = node.clone();
let user_events = user_events.clone();
let span = span.clone();
let shared_storage = shared_storage.clone();
let contract_stores = contract_stores.clone();
let label = label_for_closure.clone();
async move {
let node = node
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let user_events = user_events
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let span = span
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let shared_storage = shared_storage
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let cs = contract_stores.lock().unwrap().remove(&label);
if use_mock_wasm {
node.run_node_with_mock_wasm(user_events, span, shared_storage, cs)
.await
.map_err(|e| {
Box::new(std::io::Error::other(e.to_string()))
as Box<dyn std::error::Error>
})
} else {
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
.map_err(|e| {
Box::new(std::io::Error::other(e.to_string()))
as Box<dyn std::error::Error>
})
}
}
});
}
for (label, contract, state) in seed_ops {
let storage = node_storages
.get(&label)
.unwrap_or_else(|| panic!("SeedContract: node {label:?} not found in storages"));
let key = contract.key();
storage.seed_state(key, WrappedState::new(state));
storage.seed_params(key, contract.params().clone());
let mut stores = contract_stores.lock().unwrap();
let contract_store = stores.entry(label.clone()).or_default();
contract_store
.store_contract(contract.clone())
.unwrap_or_else(|e| {
panic!("SeedContract: failed to store contract for {label:?}: {e}")
});
tracing::debug!(
node = %label,
contract = %key,
"SeedContract: pre-populated contract in node storage"
);
}
let user_ev_controller = self
.user_ev_controller
.take()
.expect("user_ev_controller should be set");
let labels: Vec<_> = self.labels.clone();
let label_to_key: HashMap<NodeLabel, _> = labels.into_iter().collect();
sim.client("test", async move {
tokio::time::sleep(Duration::from_secs(3)).await;
for (event_id, node_label) in operation_sequence {
if let Some(peer_key) = label_to_key.get(&node_label) {
tracing::info!(
event_id,
node = %node_label,
"Triggering controlled event"
);
if user_ev_controller
.send((event_id, peer_key.clone()))
.is_err()
{
tracing::warn!(
event_id,
node = %node_label,
"Failed to send event signal - receivers dropped"
);
break;
}
tokio::time::sleep(Duration::from_secs(3)).await;
} else {
tracing::warn!(
event_id,
node = %node_label,
"No peer key found for node label"
);
}
}
tracing::info!(
wait_secs = post_operations_wait.as_secs(),
"Waiting for post-operation processing"
);
tokio::time::sleep(post_operations_wait).await;
Ok(())
});
let turmoil_result = sim.run();
let topology_snapshots = get_all_topology_snapshots(&network_name);
ControlledSimulationResult {
turmoil_result,
topology_snapshots,
node_storages,
}
}
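/// `fdev`-oriented variant of `run_simulation`: drives random events the
/// same way, then inspects the event log for successful subscriptions and,
/// if any exist, waits a little longer for update propagation before the
/// simulation is torn down.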
#[cfg(any(test, feature = "testing"))]
pub fn run_fdev_test<R>(
mut self,
seed: u64,
max_contract_num: usize,
iterations: usize,
simulation_duration: Duration,
event_wait: Duration,
) -> anyhow::Result<()>
where
R: RandomEventGenerator + Send + 'static,
{
use crate::config::{GlobalRng, GlobalSimulationTime};
use crate::ring::topology_registry::set_current_network_name;
use std::sync::Mutex;
GlobalRng::set_seed(seed);
const BASE_EPOCH_MS: u64 = 1_577_836_800_000;
const RANGE_MS: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
let epoch_offset = seed % RANGE_MS;
GlobalSimulationTime::set_time_ms(BASE_EPOCH_MS + epoch_offset);
set_current_network_name(&self.name);
let mut sim = turmoil::Builder::new()
.simulation_duration(simulation_duration)
.rng_seed(seed)
.build();
let total_peer_num = self.gateways.len() + self.nodes.len();
let gateways: Vec<_> = self.gateways.drain(..).collect();
for (node, config) in gateways {
let label = config.label.clone();
let host_name = label.to_string();
let receiver_ch = self.receiver_ch.clone();
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
let mut user_events = MemoryEventsGen::<R>::new_with_seed(
receiver_ch,
node.config.key_pair.public().clone(),
seed,
);
user_events.rng_params(label.number(), total_peer_num, max_contract_num, iterations);
let span = tracing::info_span!("turmoil_gateway", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let node = Arc::new(Mutex::new(Some(node)));
let user_events = Arc::new(Mutex::new(Some(user_events)));
let span = Arc::new(Mutex::new(Some(span)));
let shared_storage = Arc::new(Mutex::new(Some(shared_storage)));
sim.host(host_name, move || {
let node = node.clone();
let user_events = user_events.clone();
let span = span.clone();
let shared_storage = shared_storage.clone();
async move {
let node = node
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let user_events = user_events
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let span = span
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let shared_storage = shared_storage
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
.map_err(|e| {
Box::new(std::io::Error::other(e.to_string()))
as Box<dyn std::error::Error>
})
}
});
}
let nodes: Vec<_> = self.nodes.drain(..).collect();
for (node, label) in nodes {
let host_name = label.to_string();
let receiver_ch = self.receiver_ch.clone();
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
let mut user_events = MemoryEventsGen::<R>::new_with_seed(
receiver_ch,
node.config.key_pair.public().clone(),
seed,
);
user_events.rng_params(label.number(), total_peer_num, max_contract_num, iterations);
let span = tracing::info_span!("turmoil_node", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let node = Arc::new(Mutex::new(Some(node)));
let user_events = Arc::new(Mutex::new(Some(user_events)));
let span = Arc::new(Mutex::new(Some(span)));
let shared_storage = Arc::new(Mutex::new(Some(shared_storage)));
sim.host(host_name, move || {
let node = node.clone();
let user_events = user_events.clone();
let span = span.clone();
let shared_storage = shared_storage.clone();
async move {
let node = node
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let user_events = user_events
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let span = span
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
let shared_storage = shared_storage
.lock()
.unwrap()
.take()
.expect("Turmoil host should only be called once");
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
.map_err(|e| {
Box::new(std::io::Error::other(e.to_string()))
as Box<dyn std::error::Error>
})
}
});
}
let user_ev_controller = self
.user_ev_controller
.take()
.expect("user_ev_controller should be set");
let labels: Vec<_> = self.labels.clone();
let event_logs = self.event_listener.logs.clone();
sim.client("test", async move {
tokio::time::sleep(Duration::from_secs(2)).await;
use rand::SeedableRng;
use rand::prelude::*;
let mut event_rng = <rand::rngs::SmallRng as SeedableRng>::seed_from_u64(seed);
for event_id in 0..iterations as u32 {
if let Some((_, peer_key)) = labels.choose(&mut event_rng) {
if user_ev_controller
.send((event_id, peer_key.clone()))
.is_err()
{
tracing::warn!(event_id, "Failed to send event signal - receivers dropped");
break;
}
}
tokio::time::sleep(event_wait).await;
}
tokio::time::sleep(Duration::from_secs(2)).await;
let subscribed_count = {
use std::collections::HashSet;
let logs = event_logs.lock().await;
let mut subscribed_contracts: HashSet<String> = HashSet::new();
for log in logs.iter() {
if let crate::tracing::EventKind::Subscribe(
crate::tracing::SubscribeEvent::SubscribeSuccess { key, .. },
) = &log.kind
{
subscribed_contracts.insert(format!("{:?}", key));
}
}
subscribed_contracts.len()
};
if subscribed_count > 0 {
tracing::info!(
"Found {} subscribed contracts, checking convergence...",
subscribed_count
);
tokio::time::sleep(Duration::from_secs(10)).await;
}
Ok(())
});
sim.run()
.map_err(|e| anyhow::anyhow!("Turmoil simulation failed: {:?}", e))
}
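/// Runs the network directly on a paused current-thread tokio runtime
/// instead of turmoil: a background task advances the shared virtual
/// clock, an optional chaos driver (from `churn_config`) crashes and
/// recovers non-gateway nodes through the fault injector, and the run
/// ends once stored states converge across peers or the convergence
/// retry budget is exhausted.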
#[cfg(any(test, feature = "testing"))]
pub fn run_simulation_direct<R>(
mut self,
seed: u64,
max_contract_num: usize,
iterations: usize,
event_wait: Duration,
) -> anyhow::Result<()>
where
R: RandomEventGenerator + Send + 'static,
{
use crate::config::{GlobalRng, GlobalSimulationTime, SimulationTransportOpt};
use crate::ring::topology_registry::set_current_network_name;
GlobalRng::set_seed(seed);
const BASE_EPOCH_MS: u64 = 1_577_836_800_000;
const RANGE_MS: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
let epoch_offset = seed % RANGE_MS;
GlobalSimulationTime::set_time_ms(BASE_EPOCH_MS + epoch_offset);
set_current_network_name(&self.name);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.start_paused(true)
.build()?;
let total_peer_num = self.gateways.len() + self.nodes.len();
crate::config::SimulationIdleTimeout::enable();
SimulationTransportOpt::enable();
let use_mock_wasm = self.use_mock_wasm;
let result: anyhow::Result<()> = rt.block_on(async {
let vt = self.virtual_time.clone();
let time_driver = tokio::spawn(async move {
let start = tokio::time::Instant::now();
loop {
tokio::time::sleep(Duration::from_millis(1)).await;
vt.advance_to(start.elapsed().as_nanos() as u64);
}
});
let direct_nodes: Arc<tokio::sync::Mutex<HashMap<NodeLabel, DirectNodeState>>> =
Arc::new(tokio::sync::Mutex::new(HashMap::new()));
let mut node_handles = Vec::new();
let gateways: Vec<_> = self.gateways.drain(..).collect();
for (node, config) in gateways {
let label = config.label.clone();
let receiver_ch = self.receiver_ch.clone();
let addr = *self.node_addresses.get(&label).expect("gateway address");
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
let mut user_events = MemoryEventsGen::<R>::new_with_seed(
receiver_ch,
node.config.key_pair.public().clone(),
seed,
);
user_events.rng_params(
label.number(),
total_peer_num,
max_contract_num,
iterations,
);
let span = tracing::info_span!("direct_gateway", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let handle = if use_mock_wasm {
tokio::spawn(async move {
node.run_node_with_mock_wasm(user_events, span, shared_storage, None)
.await
})
} else {
tokio::spawn(async move {
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
})
};
{
let mut nodes_map = direct_nodes.lock().await;
nodes_map.insert(
label.clone(),
DirectNodeState {
label: label.clone(),
addr,
is_gateway: true,
permanently_dropped: false,
},
);
}
node_handles.push(handle);
}
let nodes: Vec<_> = self.nodes.drain(..).collect();
for (i, (node, label)) in nodes.into_iter().enumerate() {
let receiver_ch = self.receiver_ch.clone();
let addr = *self.node_addresses.get(&label).expect("node address");
let shared_storage = crate::wasm_runtime::MockStateStorage::new();
let mut user_events = MemoryEventsGen::<R>::new_with_seed(
receiver_ch,
node.config.key_pair.public().clone(),
seed,
);
user_events.rng_params(
label.number(),
total_peer_num,
max_contract_num,
iterations,
);
let span = tracing::info_span!("direct_node", %label);
self.labels
.push((label.clone(), node.config.key_pair.public().clone()));
let backoff = self.start_backoff * (i as u32 + 1);
let handle = if use_mock_wasm {
tokio::spawn(async move {
tokio::time::sleep(backoff).await;
node.run_node_with_mock_wasm(user_events, span, shared_storage, None)
.await
})
} else {
tokio::spawn(async move {
tokio::time::sleep(backoff).await;
node.run_node_with_shared_storage(user_events, span, shared_storage)
.await
})
};
{
let mut nodes_map = direct_nodes.lock().await;
nodes_map.insert(
label.clone(),
DirectNodeState {
label: label.clone(),
addr,
is_gateway: false,
permanently_dropped: false,
},
);
}
node_handles.push(handle);
}
let chaos_driver = if let Some(churn_config) = self.churn_config.clone() {
use crate::node::network_bridge::get_fault_injector;
use rand::SeedableRng;
use rand::prelude::*;
let chaos_nodes = direct_nodes.clone();
let network_name = self.name.clone();
let non_gateway_count = self.number_of_nodes;
let max_crashes = churn_config
.max_simultaneous_crashes
.unwrap_or((non_gateway_count / 4).max(1));
Some(tokio::spawn(async move {
let mut chaos_rng = <rand::rngs::SmallRng as SeedableRng>::seed_from_u64(
seed.wrapping_add(0xC1_0055_DEAD),
);
tokio::time::sleep(churn_config.warmup_delay).await;
let mut pending_recovery: Vec<(NodeLabel, tokio::time::Instant)> = Vec::new();
let mut total_crashes: usize = 0;
let mut total_recoveries: usize = 0;
let mut total_permanent: usize = 0;
loop {
tokio::time::sleep(churn_config.tick_interval).await;
let now = tokio::time::Instant::now();
let mut recovered = Vec::new();
pending_recovery.retain(|(label, crash_time)| {
if now.duration_since(*crash_time) >= churn_config.recovery_delay {
recovered.push(label.clone());
false
} else {
true
}
});
for label in recovered {
let nodes_map = chaos_nodes.lock().await;
let state = match nodes_map.get(&label) {
Some(s) if !s.permanently_dropped => s,
_ => continue,
};
let addr = state.addr;
drop(nodes_map);
if let Some(injector) = get_fault_injector(&network_name) {
let mut inj = injector.lock().unwrap();
inj.config.recover_node(&addr);
}
total_recoveries += 1;
tracing::info!(
?label,
?addr,
total_recoveries,
"Chaos driver: node recovered"
);
}
let mut currently_crashed = pending_recovery.len();
if currently_crashed >= max_crashes {
continue;
}
let nodes_map = chaos_nodes.lock().await;
let eligible: Vec<(NodeLabel, SocketAddr)> = nodes_map
.values()
.filter(|s| {
!s.is_gateway
&& !s.permanently_dropped
&& !pending_recovery.iter().any(|(l, _)| l == &s.label)
})
.map(|s| (s.label.clone(), s.addr))
.collect();
drop(nodes_map);
for (label, addr) in eligible {
if currently_crashed >= max_crashes {
break;
}
if !chaos_rng.random_bool(churn_config.crash_probability) {
continue;
}
let is_permanent =
chaos_rng.random_bool(churn_config.permanent_crash_rate);
if let Some(injector) = get_fault_injector(&network_name) {
let mut inj = injector.lock().unwrap();
inj.config.crash_node(addr);
}
total_crashes += 1;
if is_permanent {
let mut nodes_map = chaos_nodes.lock().await;
if let Some(state) = nodes_map.get_mut(&label) {
state.permanently_dropped = true;
}
total_permanent += 1;
tracing::info!(
?label,
?addr,
total_permanent,
"Chaos driver: node permanently crashed"
);
} else {
pending_recovery.push((label.clone(), tokio::time::Instant::now()));
currently_crashed += 1;
tracing::info!(
?label,
?addr,
total_crashes,
"Chaos driver: node crashed (will recover)"
);
}
}
}
}))
} else {
None
};
let user_ev_controller = self
.user_ev_controller
.take()
.expect("user_ev_controller should be set");
let labels: Vec<_> = self.labels.clone();
let event_logs = self.event_listener.logs.clone();
tokio::time::sleep(Duration::from_secs(2)).await;
use rand::SeedableRng;
use rand::prelude::*;
let mut event_rng = <rand::rngs::SmallRng as SeedableRng>::seed_from_u64(seed);
for event_id in 0..iterations as u32 {
if let Some((_, peer_key)) = labels.choose(&mut event_rng) {
if user_ev_controller
.send((event_id, peer_key.clone()))
.is_err()
{
tracing::warn!(event_id, "Failed to send event signal - receivers dropped");
break;
}
}
tokio::time::sleep(event_wait).await;
}
tokio::time::sleep(Duration::from_secs(15)).await;
let converged = 'convergence: {
for round in 0..30u32 {
let result = check_convergence_from_logs(&event_logs).await;
let total = result.converged.len() + result.diverged.len();
if total > 0 && result.diverged.is_empty() {
tracing::info!(
converged = result.converged.len(),
round,
"Convergence achieved during propagation"
);
break 'convergence true;
}
tracing::debug!(
converged = result.converged.len(),
diverged = result.diverged.len(),
round,
"Convergence not yet achieved, waiting..."
);
tokio::time::sleep(Duration::from_secs(60)).await;
}
false
};
if !converged {
tracing::warn!(
"Propagation timeout — convergence not achieved after 30 rounds (1800s)"
);
}
if let Some(chaos) = chaos_driver {
chaos.abort();
}
time_driver.abort();
let mut first_error: Option<anyhow::Error> = None;
for handle in node_handles {
handle.abort();
match handle.await {
Err(e) if e.is_cancelled() => {}
Err(e) => {
let msg = format!("Node task panicked: {e}");
tracing::error!("{}", msg);
if first_error.is_none() {
first_error = Some(anyhow::anyhow!("{}", msg));
}
}
Ok(Err(e)) => {
tracing::error!("Node task failed: {e}");
if first_error.is_none() {
first_error = Some(e);
}
}
Ok(Ok(())) => {}
}
}
if let Some(e) = first_error {
return Err(e);
}
info!("Direct simulation completed successfully");
Ok(())
});
result
}
pub fn network_name(&self) -> &str {
&self.name
}
pub fn get_topology_snapshots(&self) -> Vec<crate::ring::topology_registry::TopologySnapshot> {
crate::ring::topology_registry::get_all_topology_snapshots(&self.name)
}
pub fn validate_subscription_topology(
&self,
contract_id: &freenet_stdlib::prelude::ContractInstanceId,
contract_location: f64,
) -> crate::ring::topology_registry::TopologyValidationResult {
crate::ring::topology_registry::validate_topology(
&self.name,
contract_id,
contract_location,
)
}
pub fn clear_topology_snapshots(&self) {
crate::ring::topology_registry::clear_topology_snapshots(&self.name);
}
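/// Panics with a summary of the detected issues (bidirectional cycles,
/// orphan or unreachable hosters, proximity violations) if the
/// subscription topology for `contract_id` is not healthy.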
pub fn assert_topology_healthy(
&self,
contract_id: &freenet_stdlib::prelude::ContractInstanceId,
contract_location: f64,
) {
let result = self.validate_subscription_topology(contract_id, contract_location);
if !result.is_healthy() {
let mut issues = Vec::new();
if !result.bidirectional_cycles.is_empty() {
issues.push(format!(
"ISSUE #2720: {} bidirectional cycles found: {:?}",
result.bidirectional_cycles.len(),
result.bidirectional_cycles
));
}
if !result.orphan_hosters.is_empty() {
issues.push(format!(
"ISSUE #2719: {} orphan hosters found: {:?}",
result.orphan_hosters.len(),
result.orphan_hosters
));
}
if !result.unreachable_hosters.is_empty() {
issues.push(format!(
"ISSUE #2720: {} unreachable hosters found: {:?}",
result.unreachable_hosters.len(),
result.unreachable_hosters
));
}
if !result.proximity_violations.is_empty() {
issues.push(format!(
"ISSUE #2721: {} proximity violations found",
result.proximity_violations.len()
));
}
panic!(
"Subscription topology has {} issues:\n{}",
result.issue_count,
issues.join("\n")
);
}
}
}
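/// Result of comparing stored state hashes across peers, per contract.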
#[derive(Debug, Clone)]
pub struct ConvergenceResult {
pub converged: Vec<ConvergedContract>,
pub diverged: Vec<DivergedContract>,
}
impl ConvergenceResult {
pub fn total_contracts(&self) -> usize {
self.converged.len() + self.diverged.len()
}
pub fn rate(&self) -> f64 {
if self.total_contracts() == 0 {
return 1.0;
}
self.converged.len() as f64 / self.total_contracts() as f64
}
pub fn is_converged(&self) -> bool {
self.diverged.is_empty()
}
}
#[derive(Debug, Clone)]
pub struct ConvergedContract {
pub contract_key: String,
pub state_hash: String,
pub replica_count: usize,
}
#[derive(Debug, Clone)]
pub struct DivergedContract {
pub contract_key: String,
pub peer_states: Vec<(SocketAddr, String)>,
}
impl DivergedContract {
pub fn unique_state_count(&self) -> usize {
let unique: HashSet<&String> = self.peer_states.iter().map(|(_, h)| h).collect();
unique.len()
}
}
#[derive(Debug, Clone)]
pub struct ContractDistribution {
pub contract_key: String,
pub replica_count: usize,
pub peers: Vec<SocketAddr>,
}
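/// Aggregated operation statistics, grouped by operation kind, as derived
/// from the network event log.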
#[derive(Debug, Clone, Default)]
pub struct OperationSummary {
pub put: PutOperationStats,
pub get: OperationStats,
pub subscribe: OperationStats,
pub update: UpdateOperationStats,
pub timeouts: usize,
}
impl OperationSummary {
pub fn total_requested(&self) -> usize {
self.put.requested + self.get.requested + self.subscribe.requested + self.update.requested
}
pub fn total_completed(&self) -> usize {
self.put.completed()
+ self.get.completed()
+ self.subscribe.completed()
+ self.update.completed()
}
pub fn total_succeeded(&self) -> usize {
self.put.succeeded + self.get.succeeded + self.subscribe.succeeded + self.update.succeeded
}
pub fn total_failed(&self) -> usize {
self.put.failed + self.get.failed + self.subscribe.failed + self.update.failed
}
pub fn overall_success_rate(&self) -> f64 {
let completed = self.total_completed() + self.timeouts;
if completed == 0 {
return 1.0;
}
self.total_succeeded() as f64 / completed as f64
}
pub fn all_completed(&self) -> bool {
self.total_completed() >= self.total_requested()
}
}
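/// Request/success/failure counters for a single operation kind.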
#[derive(Debug, Clone, Default)]
pub struct OperationStats {
pub requested: usize,
pub succeeded: usize,
pub failed: usize,
}
impl OperationStats {
pub fn completed(&self) -> usize {
self.succeeded + self.failed
}
pub fn success_rate(&self) -> f64 {
let completed = self.completed();
if completed == 0 {
return 1.0;
}
self.succeeded as f64 / completed as f64
}
}
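/// Put statistics, including broadcast propagation counters.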
#[derive(Debug, Clone, Default)]
pub struct PutOperationStats {
pub requested: usize,
pub succeeded: usize,
pub failed: usize,
pub broadcasts_emitted: usize,
pub broadcasts_received: usize,
}
impl PutOperationStats {
pub fn completed(&self) -> usize {
self.succeeded + self.failed
}
pub fn success_rate(&self) -> f64 {
let completed = self.completed();
if completed == 0 {
return 1.0;
}
self.succeeded as f64 / completed as f64
}
pub fn broadcast_propagation_rate(&self) -> f64 {
if self.broadcasts_emitted == 0 {
return 1.0;
}
self.broadcasts_received as f64 / self.broadcasts_emitted as f64
}
}
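/// Update statistics, including broadcast propagation counters.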
#[derive(Debug, Clone, Default)]
pub struct UpdateOperationStats {
pub requested: usize,
pub succeeded: usize,
pub failed: usize,
pub broadcasts_emitted: usize,
pub broadcasts_received: usize,
}
impl UpdateOperationStats {
pub fn completed(&self) -> usize {
self.succeeded + self.failed
}
pub fn success_rate(&self) -> f64 {
let completed = self.completed();
if completed == 0 {
return 1.0;
}
self.succeeded as f64 / completed as f64
}
}
#[cfg(any(debug_assertions, test))]
impl std::fmt::Debug for SimNetwork {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SimNetwork")
.field("name", &self.name)
.field("labels", &self.labels)
.field("number_of_gateways", &self.number_of_gateways)
.field("number_of_nodes", &self.number_of_nodes)
.field("ring_max_htl", &self.ring_max_htl)
.field("rnd_if_htl_above", &self.rnd_if_htl_above)
.field("max_connections", &self.max_connections)
.field("min_connections", &self.min_connections)
.field("init_backoff", &self.start_backoff)
.finish()
}
}
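// Dropping a `SimNetwork` tears down the per-network global registries
// (fault injector, simulated time source, topology snapshots, in-memory
// socket registry and address mappings) and optionally removes the
// temporary executor directories.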
impl Drop for SimNetwork {
fn drop(&mut self) {
use crate::node::network_bridge::set_fault_injector;
use crate::ring::topology_registry::{
clear_current_network_name, clear_topology_snapshots,
};
use crate::transport::in_memory_socket::{
clear_network_address_mappings, remove_network_socket_registry,
set_packet_delivery_callback, set_queue_packet_callback,
};
set_fault_injector(&self.name, None);
unregister_network_time_source(&self.name);
clear_topology_snapshots(&self.name);
remove_network_socket_registry(&self.name);
clear_network_address_mappings(&self.name);
set_packet_delivery_callback(None);
set_queue_packet_callback(None);
clear_current_network_name();
if self.clean_up_tmp_dirs {
clean_up_tmp_dirs(self.labels.iter().map(|(l, _)| l));
}
}
}
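/// Waits `start_backoff`, then tries to take the value a spawned node task
/// is expected to publish into `slot`, yielding to the scheduler up to
/// `MAX_PUBLISH_POLLS` times before giving up and returning `None`.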
async fn capture_shared_slot<T>(
slot: &Arc<parking_lot::Mutex<Option<T>>>,
start_backoff: Duration,
label: &NodeLabel,
) -> Option<T> {
tokio::time::sleep(start_backoff).await;
if let Some(value) = slot.lock().take() {
return Some(value);
}
const MAX_PUBLISH_POLLS: usize = 1024;
for _ in 0..MAX_PUBLISH_POLLS {
tokio::task::yield_now().await;
if let Some(value) = slot.lock().take() {
return Some(value);
}
}
tracing::warn!(
%label,
max_polls = MAX_PUBLISH_POLLS,
"SimNetwork: node did not publish its shared slot within the \
yield-poll budget — the spawned `run_node` task likely panicked \
before reaching its publish point, or is wedged before its first \
`.await`."
);
None
}
#[cfg(test)]
mod capture_shared_slot_tests {
use super::{NodeLabel, capture_shared_slot};
use std::{sync::Arc, time::Duration};
fn label() -> NodeLabel {
NodeLabel::gateway("test", 0)
}
#[tokio::test(flavor = "current_thread")]
async fn fast_path_returns_published_value() {
let slot = Arc::new(parking_lot::Mutex::new(Some(42u32)));
let result = capture_shared_slot(&slot, Duration::from_millis(1), &label()).await;
assert_eq!(result, Some(42));
assert!(slot.lock().is_none(), "slot must be drained by take()");
}
#[tokio::test(flavor = "current_thread")]
async fn publication_race_resolves_via_yield_loop() {
let slot = Arc::new(parking_lot::Mutex::new(None::<u32>));
let publisher_slot = Arc::clone(&slot);
tokio::spawn(async move {
for _ in 0..3 {
tokio::task::yield_now().await;
}
*publisher_slot.lock() = Some(7);
});
let result = capture_shared_slot(&slot, Duration::from_millis(1), &label()).await;
assert_eq!(result, Some(7));
}
#[tokio::test(flavor = "current_thread")]
async fn timeout_returns_none_when_never_published() {
let slot: Arc<parking_lot::Mutex<Option<u32>>> = Arc::new(parking_lot::Mutex::new(None));
let result = capture_shared_slot(&slot, Duration::from_millis(1), &label()).await;
assert_eq!(result, None);
}
}
fn clean_up_tmp_dirs<'a>(labels: impl Iterator<Item = &'a NodeLabel>) {
for label in labels {
let p = std::env::temp_dir().join(format!(
"freenet-executor-{sim}-{label}",
sim = "sim",
label = label
));
let _removed = std::fs::remove_dir_all(p);
}
}
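/// Groups the stored-state hashes recorded in the event log by contract and
/// peer, then classifies every contract replicated on at least two peers as
/// converged (single hash) or diverged (multiple hashes).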
pub async fn check_convergence_from_logs(
logs: &Arc<tokio::sync::Mutex<Vec<crate::tracing::NetLogMessage>>>,
) -> ConvergenceResult {
let logs = logs.lock().await;
let mut contract_states: BTreeMap<String, BTreeMap<SocketAddr, String>> = BTreeMap::new();
for log in logs.iter() {
let contract_key = log.kind.contract_key().map(|k| format!("{:?}", k));
let state_hash = log.kind.stored_state_hash().map(String::from);
if let (Some(contract_key), Some(state_hash)) = (contract_key, state_hash) {
contract_states
.entry(contract_key)
.or_default()
.insert(log.peer_id.socket_addr(), state_hash);
}
}
let mut converged = Vec::new();
let mut diverged = Vec::new();
for (contract_key, peer_states) in contract_states {
if peer_states.len() < 2 {
continue;
}
let unique_states: HashSet<&String> = peer_states.values().collect();
if unique_states.len() == 1 {
let state = unique_states.into_iter().next().unwrap().clone();
converged.push(ConvergedContract {
contract_key,
state_hash: state,
replica_count: peer_states.len(),
});
} else {
diverged.push(DivergedContract {
contract_key,
peer_states: peer_states.into_iter().collect(),
});
}
}
ConvergenceResult {
converged,
diverged,
}
}
use crate::contract::OperationMode;
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_deterministic_peer_locations() {
const SEED: u64 = 0xDEADBEEF_CAFEBABE;
let sim1 = SimNetwork::new(
"determinism-test-1",
2, 3, 7, 3, 10, 2, SEED,
)
.await;
let locations1 = sim1.get_peer_locations();
let sim2 = SimNetwork::new(
"determinism-test-2",
2, 3, 7, 3, 10, 2, SEED,
)
.await;
let locations2 = sim2.get_peer_locations();
assert_eq!(
locations1, locations2,
"Peer locations should be identical with the same seed.\n\
Run 1: {:?}\n\
Run 2: {:?}",
locations1, locations2
);
}
}