use ant_node::ant_protocol::{
ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
ChunkPutResponse, CHUNK_PROTOCOL_ID, MAX_WIRE_MESSAGE_SIZE,
};
use ant_node::client::{send_and_await_chunk_response, DataChunk, XorName};
use ant_node::payment::{
EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
QuotingMetricsTracker,
};
use ant_node::replication::config::MAX_REPLICATION_MESSAGE_SIZE;
use ant_node::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
use ant_node::{ReplicationConfig, ReplicationEngine};
use bytes::Bytes;
use evmlib::Network as EvmNetwork;
use evmlib::RewardsAddress;
use futures::future::join_all;
use rand::Rng;
use saorsa_core::identity::PeerId;
use saorsa_core::{
identity::NodeIdentity, IPDiversityConfig as CoreDiversityConfig, MultiAddr,
NodeConfig as CoreNodeConfig, P2PEvent, P2PNode,
};
use std::net::{Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
/// Inclusive lower bound of the UDP/QUIC port range used for test nodes.
pub const TEST_PORT_RANGE_MIN: u16 = 20_000;
/// Exclusive upper bound of the port range used for test nodes.
pub const TEST_PORT_RANGE_MAX: u16 = 60_000;
/// Hard cap on how many nodes a single test network may spawn.
pub const MAX_TEST_NODE_COUNT: usize = 1000;
// Delay between spawning consecutive nodes, to avoid thundering-herd startup.
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;
// How long to wait for the whole network to reach its connectivity target.
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;
// How long each individual node gets to reach Running/Connected.
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;
// Shorter stabilization windows for the smaller preset topologies.
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;
// Upper bound on a single chunk PUT/GET round trip.
const DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS: u64 = 30;
// Aggressively short connection timeout; test nodes are all on localhost.
const TEST_CORE_CONNECTION_TIMEOUT_SECS: u64 = 2;
// Capacity of the payment verifier's pre-authorized address cache.
const TEST_PAYMENT_CACHE_CAPACITY: usize = 1000;
// Fixed, recognizable rewards address for all test nodes (0x0101...01).
const TEST_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];
// Seed value for the quoting metrics tracker's record count.
const TEST_INITIAL_RECORDS: usize = 1000;
/// Default network size: 25 nodes total.
pub const DEFAULT_NODE_COUNT: usize = 25;
/// Default number of bootstrap nodes (spawned first, dialed by the rest).
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;
/// Preset sizes for the `minimal()` and `small()` configurations.
pub const MINIMAL_NODE_COUNT: usize = 5;
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;
pub const SMALL_NODE_COUNT: usize = 10;
/// Errors produced while building, running, or tearing down a test network.
#[derive(Debug, thiserror::Error)]
pub enum TestnetError {
    /// Invalid or inconsistent `TestNetworkConfig` values.
    #[error("Configuration error: {0}")]
    Config(String),
    /// A node failed to start or to come up within its startup timeout.
    #[error("Node startup error: {0}")]
    Startup(String),
    /// The network did not reach its connectivity target in time.
    #[error("Network stabilization error: {0}")]
    Stabilization(String),
    /// Filesystem errors (test data directories, etc.).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// Error surfaced from the underlying saorsa_core P2P layer.
    #[error("Core error: {0}")]
    Core(String),
    /// Chunk PUT / storage-side failure.
    #[error("Storage error: {0}")]
    Storage(String),
    /// Chunk GET / retrieval-side failure.
    #[error("Retrieval error: {0}")]
    Retrieval(String),
    /// Wire-message encode/decode failure.
    #[error("Serialization error: {0}")]
    Serialization(String),
    /// Operation attempted on a node whose P2P stack is not initialized.
    #[error("Node not running")]
    NodeNotRunning,
}
/// Convenience alias used throughout this module.
pub type Result<T> = std::result::Result<T, TestnetError>;
/// Tunable parameters for a locally spawned test network.
#[derive(Debug, Clone)]
pub struct TestNetworkConfig {
    /// Total number of nodes, including bootstrap nodes.
    pub node_count: usize,
    /// First port; node `i` listens on `base_port + i`.
    pub base_port: u16,
    /// Number of bootstrap nodes spawned first (must be > 0 and < node_count).
    pub bootstrap_count: usize,
    /// Root directory for per-node data directories; removed on shutdown.
    pub test_data_dir: PathBuf,
    /// Pause between spawning consecutive nodes.
    pub spawn_delay: Duration,
    /// Maximum time to wait for network-wide connectivity.
    pub stabilization_timeout: Duration,
    /// Maximum time to wait for a single node to become ready.
    pub node_startup_timeout: Duration,
    /// Whether to enable per-node logging (currently informational only here).
    pub enable_node_logging: bool,
    /// Whether chunk PUTs require verified payment.
    pub payment_enforcement: bool,
    /// EVM network for payment verification; defaults to a test network if None.
    pub evm_network: Option<EvmNetwork>,
}
impl Default for TestNetworkConfig {
    /// Builds the standard 25-node configuration with a randomized base port
    /// and a unique temp data directory, so concurrent test runs don't collide.
    fn default() -> Self {
        let mut random = rand::thread_rng();
        // Leave headroom above the base so every node index maps to a valid port.
        #[allow(clippy::cast_possible_truncation)]
        let port_ceiling = TEST_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
        let base_port = if port_ceiling > TEST_PORT_RANGE_MIN {
            random.gen_range(TEST_PORT_RANGE_MIN..port_ceiling)
        } else {
            TEST_PORT_RANGE_MIN
        };
        // Random hex suffix keeps each run's data directory isolated.
        let unique: u64 = random.gen();
        let test_data_dir = std::env::temp_dir().join(format!("ant_test_{unique:x}"));
        Self {
            node_count: DEFAULT_NODE_COUNT,
            bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
            base_port,
            test_data_dir,
            spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
            stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
            node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
            enable_node_logging: false,
            payment_enforcement: false,
            evm_network: None,
        }
    }
}
impl TestNetworkConfig {
    /// Smallest useful topology: 5 nodes, 2 bootstraps, short stabilization window.
    #[must_use]
    pub fn minimal() -> Self {
        Self {
            node_count: MINIMAL_NODE_COUNT,
            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
            stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
            ..Self::default()
        }
    }

    /// Mid-size topology: 10 nodes with the default bootstrap count.
    #[must_use]
    pub fn small() -> Self {
        Self {
            node_count: SMALL_NODE_COUNT,
            bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
            stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
            ..Self::default()
        }
    }

    /// Builder-style toggle: require verified payment for chunk PUTs.
    #[must_use]
    pub fn with_payment_enforcement(mut self) -> Self {
        self.payment_enforcement = true;
        self
    }

    /// Builder-style setter: pin payment verification to a specific EVM network.
    #[must_use]
    pub fn with_evm_network(mut self, network: EvmNetwork) -> Self {
        self.evm_network = Some(network);
        self
    }
}
/// Lifecycle phases of the whole test network, in rough chronological order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkState {
    Uninitialized,
    /// Bootstrap nodes are being spawned.
    BootstrappingPhase,
    /// Regular nodes are being spawned.
    NodeSpawningPhase,
    /// All nodes started; waiting for connectivity targets.
    Stabilizing,
    /// Network is fully connected and usable.
    Ready,
    ShuttingDown,
    Stopped,
    /// Unrecoverable failure, with a human-readable reason.
    Failed(String),
}
impl NetworkState {
    /// `true` while the network is actively serving: stabilizing or ready.
    #[must_use]
    pub fn is_running(&self) -> bool {
        match self {
            Self::Ready | Self::Stabilizing => true,
            _ => false,
        }
    }
}
/// Lifecycle phases of an individual test node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not yet started.
    Pending,
    Starting,
    /// P2P stack is up; may not have peers yet.
    Running,
    Connecting,
    /// Connected to at least one peer.
    Connected,
    Stopping,
    /// Marked stopped by the network-level shutdown path.
    Stopped,
    /// Fully shut down via `TestNode::shutdown`.
    ShutDown,
    /// Unrecoverable failure, with a human-readable reason.
    Failed(String),
}
/// A single node in the test network: P2P transport, chunk protocol,
/// replication engine, and associated bookkeeping.
pub struct TestNode {
    /// Position in the network's node list; also determines the port offset.
    pub index: usize,
    /// Stable identifier of the form `test_node_{index}`.
    pub node_id: String,
    /// Port this node listens on (`base_port + index`).
    pub port: u16,
    /// Per-node data directory under the network's test data dir.
    pub data_dir: PathBuf,
    /// Underlying P2P transport; `None` until started or after shutdown.
    pub p2p_node: Option<Arc<P2PNode>>,
    /// Chunk protocol handler (storage + payments + quoting).
    pub ant_protocol: Option<Arc<AntProtocol>>,
    /// Whether this node serves as a bootstrap entry point.
    pub is_bootstrap: bool,
    /// Shared lifecycle state, also read by the network-level wait loops.
    pub state: Arc<RwLock<NodeState>>,
    /// Addresses of bootstrap nodes to dial on startup (empty for bootstraps).
    pub bootstrap_addrs: Vec<MultiAddr>,
    /// ML-DSA node identity used for quote signing.
    pub node_identity: Option<Arc<NodeIdentity>>,
    /// Background task routing chunk-protocol messages into `ant_protocol`.
    pub protocol_task: Option<JoinHandle<()>>,
    /// Replication engine plus its cancellation token (set once started).
    pub replication_engine: Option<ReplicationEngine>,
    pub replication_shutdown: Option<CancellationToken>,
}
impl TestNode {
/// Reports whether this node is currently serving (Running or Connected).
pub async fn is_running(&self) -> bool {
    let state = self.state.read().await;
    match &*state {
        NodeState::Running | NodeState::Connected => true,
        _ => false,
    }
}
/// Gracefully shuts this node down: replication engine first, then the
/// protocol task, then the underlying P2P transport.
///
/// # Errors
/// Returns `TestnetError::Core` if the P2P node fails to shut down.
pub async fn shutdown(&mut self) -> Result<()> {
    info!("Shutting down test node {}", self.index);
    // Stop replication before tearing down transport so no new sync work starts.
    if let Some(ref mut engine) = self.replication_engine {
        engine.shutdown().await;
    }
    self.replication_engine = None;
    self.replication_shutdown = None;
    // The protocol task is a detached event loop; aborting it is the intended stop.
    if let Some(handle) = self.protocol_task.take() {
        handle.abort();
    }
    *self.state.write().await = NodeState::Stopping;
    // `take()` drops our Arc; the transport shutdown is awaited so errors surface.
    if let Some(p2p) = self.p2p_node.take() {
        p2p.shutdown()
            .await
            .map_err(|e| TestnetError::Core(format!("Failed to shutdown node: {e}")))?;
    }
    *self.state.write().await = NodeState::ShutDown;
    info!("Test node {} shut down successfully", self.index);
    Ok(())
}
pub async fn peer_count(&self) -> usize {
if let Some(ref node) = self.p2p_node {
node.peer_count().await
} else {
0
}
}
pub async fn connected_peers(&self) -> Vec<PeerId> {
if let Some(ref node) = self.p2p_node {
node.connected_peers().await
} else {
vec![]
}
}
/// Stores `data` as a chunk in this node's own storage via the chunk
/// protocol, returning the content-derived address.
///
/// The address is pre-inserted into the payment verifier's cache so this
/// local PUT is not rejected for lack of payment.
///
/// # Errors
/// `NodeNotRunning` if the protocol is not initialized; `Serialization`
/// on encode failure; `Storage` on timeout, missing/undecodable response,
/// payment rejection, or protocol-level errors.
pub async fn store_chunk(&self, data: &[u8]) -> Result<XorName> {
    let protocol = self
        .ant_protocol
        .as_ref()
        .ok_or(TestnetError::NodeNotRunning)?;
    let address = Self::compute_chunk_address(data);
    // Pre-authorize this address so the PUT below passes payment verification.
    protocol.payment_verifier().cache_insert(address);
    let request_id: u64 = rand::thread_rng().gen();
    let request = ChunkPutRequest::new(address, data.to_vec());
    let message = ChunkMessage {
        request_id,
        body: ChunkMessageBody::PutRequest(request),
    };
    let message_bytes = message.encode().map_err(|e| {
        TestnetError::Serialization(format!("Failed to encode PUT request: {e}"))
    })?;
    let timeout = Duration::from_secs(DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS);
    // The request is handled in-process (no network hop), but still bounded
    // by a timeout and may legitimately return no response (None).
    let response_bytes =
        tokio::time::timeout(timeout, protocol.try_handle_request(&message_bytes))
            .await
            .map_err(|_| {
                TestnetError::Storage(format!(
                    "Timeout storing chunk after {DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS}s"
                ))
            })?
            .map_err(|e| TestnetError::Storage(format!("Protocol error: {e}")))?
            .ok_or_else(|| {
                TestnetError::Storage(format!(
                    "Protocol returned no response for PUT request (request_id={request_id}, node_index={})",
                    self.index
                ))
            })?;
    let response = ChunkMessage::decode(&response_bytes)
        .map_err(|e| TestnetError::Storage(format!("Failed to decode response: {e}")))?;
    // Success and AlreadyExists are both treated as a successful store.
    match response.body {
        ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
            debug!("Node {} stored chunk at {}", self.index, hex::encode(addr));
            Ok(addr)
        }
        ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) => {
            debug!(
                "Node {} chunk already exists at {}",
                self.index,
                hex::encode(addr)
            );
            Ok(addr)
        }
        ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => Err(
            TestnetError::Storage(format!("Payment required: {message}")),
        ),
        ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => {
            Err(TestnetError::Storage(format!("Protocol error: {e}")))
        }
        _ => Err(TestnetError::Storage(
            "Unexpected response type".to_string(),
        )),
    }
}
/// Retrieves the chunk at `address` from this node's own storage.
///
/// Returns `Ok(None)` when the chunk is not present locally — absence is
/// not an error.
///
/// # Errors
/// `NodeNotRunning` if the protocol is not initialized; `Serialization`
/// on encode failure; `Retrieval` on timeout, missing/undecodable
/// response, or protocol-level errors.
pub async fn get_chunk(&self, address: &XorName) -> Result<Option<DataChunk>> {
    let protocol = self
        .ant_protocol
        .as_ref()
        .ok_or(TestnetError::NodeNotRunning)?;
    let request_id: u64 = rand::thread_rng().gen();
    let request = ChunkGetRequest::new(*address);
    let message = ChunkMessage {
        request_id,
        body: ChunkMessageBody::GetRequest(request),
    };
    let message_bytes = message.encode().map_err(|e| {
        TestnetError::Serialization(format!("Failed to encode GET request: {e}"))
    })?;
    let timeout = Duration::from_secs(DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS);
    // Handled in-process (no network hop) but still bounded by a timeout.
    let response_bytes =
        tokio::time::timeout(timeout, protocol.try_handle_request(&message_bytes))
            .await
            .map_err(|_| {
                TestnetError::Retrieval(format!(
                    "Timeout retrieving chunk after {DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS}s"
                ))
            })?
            .map_err(|e| TestnetError::Retrieval(format!("Protocol error: {e}")))?
            .ok_or_else(|| {
                TestnetError::Retrieval(format!(
                    "Protocol returned no response for GET request (request_id={request_id}, address={address:?})"
                ))
            })?;
    let response = ChunkMessage::decode(&response_bytes)
        .map_err(|e| TestnetError::Retrieval(format!("Failed to decode response: {e}")))?;
    match response.body {
        ChunkMessageBody::GetResponse(ChunkGetResponse::Success { address, content }) => {
            debug!(
                "Node {} retrieved chunk {} ({} bytes)",
                self.index,
                hex::encode(address),
                content.len()
            );
            Ok(Some(DataChunk::new(address, Bytes::from(content))))
        }
        // NotFound maps to Ok(None): the lookup succeeded, the data is absent.
        ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address }) => {
            debug!(
                "Node {} chunk not found: {}",
                self.index,
                hex::encode(address)
            );
            Ok(None)
        }
        ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => {
            Err(TestnetError::Retrieval(format!("Protocol error: {e}")))
        }
        _ => Err(TestnetError::Retrieval(
            "Unexpected response type".to_string(),
        )),
    }
}
/// Stores `data` on another test node by resolving its peer ID and
/// delegating to [`Self::store_chunk_on_peer`].
pub async fn store_chunk_on(&self, target: &Self, data: &[u8]) -> Result<XorName> {
    let Some(remote) = target.p2p_node.as_ref() else {
        return Err(TestnetError::NodeNotRunning);
    };
    self.store_chunk_on_peer(remote.peer_id(), data).await
}
/// Stores `data` on a remote peer over the network, returning the
/// content-derived chunk address.
///
/// Unlike [`Self::store_chunk`], this sends a real wire message and awaits
/// the matching response via `send_and_await_chunk_response`; the closure
/// returns `None` for unrelated message bodies so the helper keeps waiting.
///
/// # Errors
/// `NodeNotRunning` if this node's P2P stack is down; `Serialization` on
/// encode failure; `Storage` on send failure, timeout, payment rejection,
/// or remote protocol errors.
pub async fn store_chunk_on_peer(
    &self,
    target_peer_id: &PeerId,
    data: &[u8],
) -> Result<XorName> {
    let p2p = self.p2p_node.as_ref().ok_or(TestnetError::NodeNotRunning)?;
    let address = Self::compute_chunk_address(data);
    let request_id: u64 = rand::thread_rng().gen();
    let request = ChunkPutRequest::new(address, data.to_vec());
    let message = ChunkMessage {
        request_id,
        body: ChunkMessageBody::PutRequest(request),
    };
    let message_bytes = message.encode().map_err(|e| {
        TestnetError::Serialization(format!("Failed to encode PUT request: {e}"))
    })?;
    let timeout = Duration::from_secs(DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS);
    // Copied out so the closures below don't borrow `self`.
    let node_index = self.index;
    send_and_await_chunk_response(
        p2p,
        target_peer_id,
        message_bytes,
        request_id,
        timeout,
        &[],
        |body| match body {
            ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
                debug!(
                    "Node {} stored chunk on peer {}: {}",
                    node_index,
                    target_peer_id,
                    hex::encode(addr)
                );
                Some(Ok(addr))
            }
            // AlreadyExists also counts as success for the caller.
            ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
                address: addr,
            }) => {
                debug!(
                    "Node {} chunk already exists on peer {}: {}",
                    node_index,
                    target_peer_id,
                    hex::encode(addr)
                );
                Some(Ok(addr))
            }
            ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
                Some(Err(TestnetError::Storage(format!(
                    "Payment required: {message}"
                ))))
            }
            ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
                TestnetError::Storage(format!("Remote protocol error: {e}")),
            )),
            // Not a PUT response: keep waiting for the matching reply.
            _ => None,
        },
        |e| TestnetError::Storage(format!("Failed to send PUT to remote node: {e}")),
        || {
            TestnetError::Storage(format!(
                "Timeout waiting for remote store response after {DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS}s"
            ))
        },
    )
    .await
}
/// Retrieves a chunk from another test node by resolving its peer ID and
/// delegating to [`Self::get_chunk_from_peer`].
pub async fn get_chunk_from(
    &self,
    target: &Self,
    address: &XorName,
) -> Result<Option<DataChunk>> {
    let Some(remote) = target.p2p_node.as_ref() else {
        return Err(TestnetError::NodeNotRunning);
    };
    self.get_chunk_from_peer(remote.peer_id(), address).await
}
/// Retrieves the chunk at `address` from a remote peer over the network.
///
/// Returns `Ok(None)` when the remote reports the chunk as not found.
/// The response-matching closure returns `None` for unrelated message
/// bodies so `send_and_await_chunk_response` keeps waiting.
///
/// # Errors
/// `NodeNotRunning` if this node's P2P stack is down; `Serialization` on
/// encode failure; `Retrieval` on send failure, timeout, or remote
/// protocol errors.
pub async fn get_chunk_from_peer(
    &self,
    target_peer_id: &PeerId,
    address: &XorName,
) -> Result<Option<DataChunk>> {
    let p2p = self.p2p_node.as_ref().ok_or(TestnetError::NodeNotRunning)?;
    let request_id: u64 = rand::thread_rng().gen();
    let request = ChunkGetRequest::new(*address);
    let message = ChunkMessage {
        request_id,
        body: ChunkMessageBody::GetRequest(request),
    };
    let message_bytes = message.encode().map_err(|e| {
        TestnetError::Serialization(format!("Failed to encode GET request: {e}"))
    })?;
    let timeout = Duration::from_secs(DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS);
    // Copied out so the closures below don't borrow `self`.
    let node_index = self.index;
    send_and_await_chunk_response(
        p2p,
        target_peer_id,
        message_bytes,
        request_id,
        timeout,
        &[],
        |body| match body {
            ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
                address: addr,
                content,
            }) => {
                debug!(
                    "Node {} retrieved chunk from peer {}: {} ({} bytes)",
                    node_index,
                    target_peer_id,
                    hex::encode(addr),
                    content.len()
                );
                Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
            }
            // NotFound maps to Ok(None): the lookup succeeded, the data is absent.
            ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) => {
                debug!(
                    "Node {} chunk not found on peer {}: {}",
                    node_index,
                    target_peer_id,
                    hex::encode(addr)
                );
                Some(Ok(None))
            }
            ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
                TestnetError::Retrieval(format!("Remote protocol error: {e}")),
            )),
            // Not a GET response: keep waiting for the matching reply.
            _ => None,
        },
        |e| TestnetError::Retrieval(format!("Failed to send GET to remote node: {e}")),
        || {
            TestnetError::Retrieval(format!(
                "Timeout waiting for remote get response after {DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS}s"
            ))
        },
    )
    .await
}
/// Computes the content-derived XOR address for `data`.
/// Thin wrapper over `ant_node::compute_address` so tests share one entry point.
#[must_use]
pub fn compute_chunk_address(data: &[u8]) -> XorName {
    ant_node::compute_address(data)
}
}
/// A self-contained local test network of [`TestNode`]s.
pub struct TestNetwork {
    /// Validated configuration the network was built with.
    config: TestNetworkConfig,
    /// All nodes in spawn order: bootstraps first, then regular nodes.
    nodes: Vec<TestNode>,
    /// Broadcast used to signal background tasks (health monitor) to stop.
    shutdown_tx: broadcast::Sender<()>,
    /// Network-wide lifecycle state.
    state: Arc<RwLock<NetworkState>>,
    /// Periodic liveness checker; aborted on shutdown/drop.
    health_monitor: Option<JoinHandle<()>>,
}
impl TestNetwork {
/// Validates `config` and prepares (but does not start) a test network.
///
/// Checks, in order: bootstrap count is non-zero and strictly less than
/// the node count, node count is within the global cap, and the port range
/// `base_port..base_port + node_count` fits in `u16` and stays below
/// `TEST_PORT_RANGE_MAX`. Also creates the test data directory.
///
/// # Errors
/// `TestnetError::Config` for any validation failure; `TestnetError::Io`
/// if the data directory cannot be created.
pub async fn new(config: TestNetworkConfig) -> Result<Self> {
    if config.bootstrap_count >= config.node_count {
        return Err(TestnetError::Config(
            "Bootstrap count must be less than node count".to_string(),
        ));
    }
    if config.bootstrap_count == 0 {
        return Err(TestnetError::Config(
            "At least one bootstrap node is required".to_string(),
        ));
    }
    if config.node_count > MAX_TEST_NODE_COUNT {
        return Err(TestnetError::Config(format!(
            "Node count {} exceeds maximum {}",
            config.node_count, MAX_TEST_NODE_COUNT
        )));
    }
    // Checked conversion + checked add: reject any port-arithmetic overflow
    // explicitly rather than wrapping.
    let node_count_u16 = u16::try_from(config.node_count).map_err(|_| {
        TestnetError::Config(format!("Node count {} exceeds u16::MAX", config.node_count))
    })?;
    let max_port = config
        .base_port
        .checked_add(node_count_u16)
        .ok_or_else(|| {
            TestnetError::Config(format!(
                "Port range overflow: base_port {} + node_count {} exceeds u16::MAX",
                config.base_port, config.node_count
            ))
        })?;
    if max_port > TEST_PORT_RANGE_MAX {
        return Err(TestnetError::Config(format!(
            "Port range overflow: max port {max_port} exceeds TEST_PORT_RANGE_MAX {TEST_PORT_RANGE_MAX}"
        )));
    }
    tokio::fs::create_dir_all(&config.test_data_dir).await?;
    // Receiver half is dropped; subscribers are created on demand.
    let (shutdown_tx, _) = broadcast::channel(1);
    Ok(Self {
        config,
        nodes: Vec::new(),
        shutdown_tx,
        state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
        health_monitor: None,
    })
}
/// Builds a network from the default 25-node configuration.
pub async fn with_defaults() -> Result<Self> {
    let config = TestNetworkConfig::default();
    Self::new(config).await
}

/// Builds a network from the minimal 5-node configuration.
pub async fn minimal() -> Result<Self> {
    let config = TestNetworkConfig::minimal();
    Self::new(config).await
}
/// Brings the whole network up: bootstrap nodes, then regular nodes, then
/// waits for stabilization and starts the health monitor.
///
/// Network state transitions: BootstrappingPhase → NodeSpawningPhase →
/// Stabilizing → Ready. On error the state is left at the failing phase.
///
/// # Errors
/// Propagates startup and stabilization failures from the phase helpers.
pub async fn start(&mut self) -> Result<()> {
    info!(
        "Starting test network with {} nodes ({} bootstrap)",
        self.config.node_count, self.config.bootstrap_count
    );
    *self.state.write().await = NetworkState::BootstrappingPhase;
    self.start_bootstrap_nodes().await?;
    *self.state.write().await = NetworkState::NodeSpawningPhase;
    self.start_regular_nodes().await?;
    *self.state.write().await = NetworkState::Stabilizing;
    self.wait_for_stabilization().await?;
    self.start_health_monitor();
    *self.state.write().await = NetworkState::Ready;
    info!("Test network is ready");
    Ok(())
}
/// Spawns the bootstrap nodes (indices 0..bootstrap_count) with no
/// bootstrap addresses of their own, then waits until each is ready.
async fn start_bootstrap_nodes(&mut self) -> Result<()> {
    info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
    for i in 0..self.config.bootstrap_count {
        // Bootstraps dial nobody; they are the dial targets.
        let node = self.create_node(i, true, vec![]).await?;
        self.start_node(node).await?;
        // Stagger spawns to avoid contention on startup.
        tokio::time::sleep(self.config.spawn_delay).await;
    }
    self.wait_for_nodes_ready(0..self.config.bootstrap_count)
        .await?;
    info!("All bootstrap nodes are ready");
    Ok(())
}
/// Spawns the remaining (non-bootstrap) nodes, each configured to dial all
/// bootstrap nodes on localhost via QUIC.
async fn start_regular_nodes(&mut self) -> Result<()> {
    let regular_count = self.config.node_count - self.config.bootstrap_count;
    info!("Starting {} regular nodes", regular_count);
    // Every regular node gets the full bootstrap address list.
    let bootstrap_addrs: Vec<MultiAddr> = self
        .nodes
        .get(0..self.config.bootstrap_count)
        .unwrap_or_default()
        .iter()
        .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
        .collect();
    for i in self.config.bootstrap_count..self.config.node_count {
        let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
        self.start_node(node).await?;
        // Stagger spawns to avoid contention on startup.
        tokio::time::sleep(self.config.spawn_delay).await;
    }
    info!("All regular nodes started");
    Ok(())
}
/// Builds a `TestNode` (identity, data dir, chunk protocol) without
/// starting its P2P transport; `start_node` does that.
///
/// # Errors
/// `Config` if `index` cannot fit in a `u16` port offset; `Io` on data-dir
/// creation failure; `Core` on identity or protocol construction failure.
async fn create_node(
    &self,
    index: usize,
    is_bootstrap: bool,
    bootstrap_addrs: Vec<MultiAddr>,
) -> Result<TestNode> {
    let index_u16 = u16::try_from(index)
        .map_err(|_| TestnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
    // Port layout: node i listens on base_port + i (validated in `new`).
    let port = self.config.base_port + index_u16;
    let node_id = format!("test_node_{index}");
    let data_dir = self.config.test_data_dir.join(&node_id);
    tokio::fs::create_dir_all(&data_dir).await?;
    // Fresh ML-DSA identity per node; also used for quote signing.
    let identity = Arc::new(NodeIdentity::generate().map_err(|e| {
        TestnetError::Core(format!("Failed to generate test node identity: {e}"))
    })?);
    let ant_protocol =
        Self::create_ant_protocol(&data_dir, self.config.evm_network.clone(), &identity)
            .await?;
    Ok(TestNode {
        index,
        node_id,
        port,
        data_dir,
        p2p_node: None,
        ant_protocol: Some(Arc::new(ant_protocol)),
        is_bootstrap,
        state: Arc::new(RwLock::new(NodeState::Pending)),
        bootstrap_addrs,
        node_identity: Some(identity),
        protocol_task: None,
        replication_engine: None,
        replication_shutdown: None,
    })
}
/// Assembles a chunk-protocol stack for one node: LMDB storage rooted at
/// `data_dir`, a payment verifier (defaulting to the Arbitrum Sepolia test
/// network), and a quote generator signing with the node's ML-DSA-65 key.
///
/// # Errors
/// `TestnetError::Core` if storage creation or secret-key deserialization
/// fails.
pub async fn create_ant_protocol(
    data_dir: &std::path::Path,
    evm_network: Option<EvmNetwork>,
    identity: &saorsa_core::identity::NodeIdentity,
) -> Result<AntProtocol> {
    let storage_config = LmdbStorageConfig {
        root_dir: data_dir.to_path_buf(),
        ..LmdbStorageConfig::test_default()
    };
    let storage = LmdbStorage::new(storage_config)
        .await
        .map_err(|e| TestnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
    // All test nodes share the same fixed rewards address.
    let rewards_address = RewardsAddress::new(TEST_REWARDS_ADDRESS);
    let payment_config = PaymentVerifierConfig {
        evm: EvmVerifierConfig {
            network: evm_network.unwrap_or(EvmNetwork::ArbitrumSepoliaTest),
        },
        cache_capacity: TEST_PAYMENT_CACHE_CAPACITY,
        local_rewards_address: rewards_address,
    };
    let payment_verifier = PaymentVerifier::new(payment_config);
    let metrics_tracker = QuotingMetricsTracker::new(TEST_INITIAL_RECORDS);
    let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
    let pub_key_bytes = identity.public_key().as_bytes().to_vec();
    let sk_bytes = identity.secret_key_bytes().to_vec();
    // Reconstruct the ML-DSA secret key from raw bytes; failure here means
    // the identity bytes are unusable, so bail out early.
    let sk = {
        use saorsa_pqc::pqc::types::MlDsaSecretKey;
        match MlDsaSecretKey::from_bytes(&sk_bytes) {
            Ok(sk) => sk,
            Err(e) => {
                return Err(TestnetError::Core(format!(
                    "Failed to deserialize ML-DSA-65 secret key: {e}"
                )));
            }
        }
    };
    // The signer closure owns the secret key; a signing failure yields an
    // empty signature rather than panicking inside the quote path.
    quote_generator.set_signer(pub_key_bytes, move |msg| {
        use saorsa_pqc::pqc::MlDsaOperations;
        let ml_dsa = saorsa_core::MlDsa65::new();
        ml_dsa
            .sign(&sk, msg)
            .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
    });
    Ok(AntProtocol::new(
        Arc::new(storage),
        Arc::new(payment_verifier),
        Arc::new(quote_generator),
    ))
}
/// Starts a node's P2P transport, wires the chunk-protocol event loop, and
/// spins up its replication engine, then pushes the node into `self.nodes`.
///
/// Replication-engine failure is logged but non-fatal: the node still joins
/// the network without replication.
///
/// # Errors
/// `Core` on config-builder failure; `Startup` if the P2P node cannot be
/// created or started.
#[allow(clippy::too_many_lines)]
async fn start_node(&mut self, mut node: TestNode) -> Result<()> {
    debug!("Starting node {} on port {}", node.index, node.port);
    *node.state.write().await = NodeState::Starting;
    // Message size must accommodate the larger of the two protocols' frames.
    let mut core_config = CoreNodeConfig::builder()
        .port(node.port)
        .local(true)
        .connection_timeout(Duration::from_secs(TEST_CORE_CONNECTION_TIMEOUT_SECS))
        .max_message_size(MAX_REPLICATION_MESSAGE_SIZE.max(MAX_WIRE_MESSAGE_SIZE))
        .build()
        .map_err(|e| TestnetError::Core(format!("Failed to create core config: {e}")))?;
    core_config
        .bootstrap_peers
        .clone_from(&node.bootstrap_addrs);
    // Permissive IP diversity: all test nodes share 127.0.0.1.
    core_config.diversity_config = Some(CoreDiversityConfig::permissive());
    core_config.node_identity.clone_from(&node.node_identity);
    let p2p_node = P2PNode::new(core_config).await.map_err(|e| {
        TestnetError::Startup(format!("Failed to create node {}: {e}", node.index))
    })?;
    p2p_node.start().await.map_err(|e| {
        TestnetError::Startup(format!("Failed to start node {}: {e}", node.index))
    })?;
    node.p2p_node = Some(Arc::new(p2p_node));
    *node.state.write().await = NodeState::Running;
    // Chunk-protocol event loop: forward inbound CHUNK_PROTOCOL_ID messages
    // into the protocol handler and send any response back to the source.
    if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
        let mut events = p2p.subscribe_events();
        let p2p_clone = Arc::clone(p2p);
        let protocol_clone = Arc::clone(protocol);
        let node_index = node.index;
        node.protocol_task = Some(tokio::spawn(async move {
            // Loop ends when the event channel closes (node shutdown) or the
            // task is aborted.
            while let Ok(event) = events.recv().await {
                if let P2PEvent::Message {
                    topic,
                    source: Some(source),
                    data,
                } = event
                {
                    if topic == CHUNK_PROTOCOL_ID {
                        debug!(
                            "Node {node_index} received chunk protocol message from {source}"
                        );
                        let protocol = Arc::clone(&protocol_clone);
                        let p2p = Arc::clone(&p2p_clone);
                        // Handle each request on its own task so a slow
                        // request doesn't block the event loop.
                        tokio::spawn(async move {
                            match protocol.try_handle_request(&data).await {
                                Ok(Some(response)) => {
                                    if let Err(e) = p2p
                                        .send_message(
                                            &source,
                                            CHUNK_PROTOCOL_ID,
                                            response.to_vec(),
                                            &[],
                                        )
                                        .await
                                    {
                                        warn!(
                                            "Node {node_index} failed to send chunk response to {source}: {e}"
                                        );
                                    }
                                }
                                Ok(None) => {
                                    // Message needed no reply (e.g. it was a response).
                                }
                                Err(e) => {
                                    warn!(
                                        "Node {node_index} chunk protocol handler error: {e}"
                                    );
                                }
                            }
                        });
                    }
                }
            }
        }));
    }
    // Replication engine: best-effort. Failure is logged, not propagated.
    if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
        let shutdown = CancellationToken::new();
        let repl_config = ReplicationConfig::default();
        // Sender half intentionally unused here: no fresh-record feed in tests.
        let (_fresh_tx, fresh_rx) = tokio::sync::mpsc::unbounded_channel();
        match ReplicationEngine::new(
            repl_config,
            Arc::clone(p2p),
            protocol.storage(),
            protocol.payment_verifier_arc(),
            &node.data_dir,
            fresh_rx,
            shutdown.clone(),
        )
        .await
        {
            Ok(mut engine) => {
                let dht_events = p2p.dht_manager().subscribe_events();
                engine.start(dht_events);
                node.replication_engine = Some(engine);
                node.replication_shutdown = Some(shutdown);
                debug!("Node {} replication engine started", node.index);
            }
            Err(e) => {
                warn!(
                    "Node {} failed to start replication engine: {e}",
                    node.index
                );
            }
        }
    }
    debug!("Node {} started successfully", node.index);
    self.nodes.push(node);
    Ok(())
}
/// Waits until every node in `range` reaches `Running` or `Connected`, or
/// the shared startup deadline expires.
///
/// Bug fix: previously, when the deadline expired the inner loop simply
/// fell through and the function returned `Ok(())` even though the node
/// never became ready. A timeout is now reported as a `Startup` error.
///
/// # Errors
/// `Config` if an index in `range` is out of bounds; `Startup` if a node
/// reports `Failed` or does not become ready before the deadline.
async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
    // One shared deadline across all nodes in the range.
    let deadline = Instant::now() + self.config.node_startup_timeout;
    for i in range {
        loop {
            let node = self
                .nodes
                .get(i)
                .ok_or_else(|| TestnetError::Config(format!("Node index {i} out of range")))?;
            let state = node.state.read().await.clone();
            match state {
                NodeState::Running | NodeState::Connected => break,
                NodeState::Failed(ref e) => {
                    return Err(TestnetError::Startup(format!("Node {i} failed: {e}")));
                }
                _ => {
                    // Not ready yet: fail on deadline, otherwise poll again.
                    if Instant::now() >= deadline {
                        return Err(TestnetError::Startup(format!(
                            "Node {i} did not become ready within {:?}",
                            self.config.node_startup_timeout
                        )));
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }
    }
    Ok(())
}
/// Polls until every node has at least `min(bootstrap_count, 3)` peer
/// connections, or the stabilization timeout expires.
///
/// # Errors
/// `TestnetError::Stabilization` if the connectivity target is not reached
/// before the deadline.
async fn wait_for_stabilization(&self) -> Result<()> {
    let deadline = Instant::now() + self.config.stabilization_timeout;
    // Cap the requirement at 3 so large networks don't need full meshes.
    let min_connections = self.config.bootstrap_count.min(3);
    info!(
        "Waiting for network stabilization (min {} connections per node)",
        min_connections
    );
    while Instant::now() < deadline {
        let mut all_connected = true;
        let mut total_connections = 0;
        for node in &self.nodes {
            let peer_count = node.peer_count().await;
            total_connections += peer_count;
            if peer_count < min_connections {
                all_connected = false;
            }
        }
        if all_connected {
            info!(
                "Network stabilized: {} total connections",
                total_connections
            );
            return Ok(());
        }
        debug!(
            "Waiting for stabilization: {} total connections",
            total_connections
        );
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    Err(TestnetError::Stabilization(
        "Network failed to stabilize within timeout".to_string(),
    ))
}
/// Populates DHT routing tables by having every node look up a shared set
/// of random targets, then waits for replication engines to bootstrap.
///
/// Lookup failures and empty results are logged, not fatal; only a failed
/// replication bootstrap propagates as an error.
pub async fn warmup_dht(&self) -> Result<()> {
    info!("Warming up DHT routing tables ({} nodes)", self.nodes.len());
    let num_warmup_queries = 5;
    let mut random_addresses = Vec::new();
    // All nodes query the same random targets so lookups overlap.
    for _ in 0..num_warmup_queries {
        let mut addr = [0u8; 32];
        rand::Rng::fill(&mut rand::thread_rng(), &mut addr);
        random_addresses.push(addr);
    }
    for node in &self.nodes {
        if let Some(ref p2p) = node.p2p_node {
            for addr in &random_addresses {
                let result = p2p.dht().find_closest_nodes(addr, 8).await;
                if let Ok(peers) = result {
                    if peers.is_empty() {
                        warn!(
                            "Node {} DHT warmup found 0 peers for {} - DHT may not be seeded yet",
                            node.index,
                            hex::encode(addr)
                        );
                    } else {
                        debug!(
                            "Node {} DHT warmup found {} peers for target {}",
                            node.index,
                            peers.len(),
                            hex::encode(addr)
                        );
                    }
                } else if tracing::enabled!(tracing::Level::WARN) {
                    // Only format the error when WARN is actually emitted.
                    warn!(
                        "Node {} DHT warmup failed for {}: {:?}",
                        node.index,
                        hex::encode(addr),
                        result
                    );
                }
            }
        }
    }
    // Grace period for routing-table updates to propagate.
    tokio::time::sleep(Duration::from_secs(3)).await;
    info!("✅ DHT routing tables warmed up");
    self.wait_for_replication_bootstrap().await?;
    Ok(())
}
/// Blocks until every node's replication engine (if it has one) reports
/// bootstrap completion.
///
/// # Errors
/// `TestnetError::Stabilization` if any engine does not complete within
/// the 120-second per-node timeout.
async fn wait_for_replication_bootstrap(&self) -> Result<()> {
    const BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(120);
    for node in &self.nodes {
        // Nodes whose replication engine failed to start are skipped.
        if let Some(ref engine) = node.replication_engine {
            if !engine.wait_for_bootstrap_complete(BOOTSTRAP_TIMEOUT).await {
                return Err(TestnetError::Stabilization(format!(
                    "Node {} replication bootstrap did not complete within {}s",
                    node.index,
                    BOOTSTRAP_TIMEOUT.as_secs(),
                )));
            }
            debug!("Node {} replication bootstrap complete", node.index,);
        }
    }
    info!("✅ All replication engines bootstrapped");
    Ok(())
}
/// Spawns a background task that checks each node's liveness every 5
/// seconds and logs a warning for unhealthy nodes, until the shutdown
/// broadcast fires.
///
/// Fix: removed a dead `Arc::clone(&self.state)` binding (`_state`) that
/// was never moved into the task and served no purpose.
fn start_health_monitor(&mut self) {
    // Snapshot the P2P handles; nodes added later are not monitored.
    let nodes: Vec<Arc<P2PNode>> = self
        .nodes
        .iter()
        .filter_map(|n| n.p2p_node.clone())
        .collect();
    let mut shutdown_rx = self.shutdown_tx.subscribe();
    self.health_monitor = Some(tokio::spawn(async move {
        let check_interval = Duration::from_secs(5);
        loop {
            tokio::select! {
                _ = shutdown_rx.recv() => break,
                () = tokio::time::sleep(check_interval) => {
                    for (i, node) in nodes.iter().enumerate() {
                        // NOTE(review): is_running() is called without .await,
                        // so P2PNode::is_running appears to be synchronous —
                        // confirm against the saorsa_core API.
                        if !node.is_running() {
                            warn!("Node {} appears unhealthy", i);
                        }
                    }
                }
            }
        }
    }));
}
/// Shuts down the whole network: signals background tasks, stops each node
/// in reverse spawn order, awaits all transport shutdowns concurrently,
/// then removes the test data directory.
///
/// Individual node shutdown errors are logged, not propagated, so one bad
/// node cannot block the rest of the teardown.
pub async fn shutdown(&mut self) -> Result<()> {
    info!("Shutting down test network");
    *self.state.write().await = NetworkState::ShuttingDown;
    // Best-effort broadcast; errors just mean there are no subscribers.
    let _ = self.shutdown_tx.send(());
    if let Some(handle) = self.health_monitor.take() {
        handle.abort();
    }
    let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
    // Reverse order: regular nodes go down before the bootstraps they dialed.
    for node in self.nodes.iter_mut().rev() {
        let state = node.state.read().await.clone();
        if matches!(state, NodeState::ShutDown | NodeState::Stopped) {
            debug!("Skipping node {} (already shut down)", node.index);
            continue;
        }
        debug!("Stopping node {}", node.index);
        // Same per-node teardown order as TestNode::shutdown.
        if let Some(ref mut engine) = node.replication_engine {
            engine.shutdown().await;
        }
        node.replication_engine = None;
        node.replication_shutdown = None;
        if let Some(handle) = node.protocol_task.take() {
            handle.abort();
        }
        *node.state.write().await = NodeState::Stopping;
        // Collect the transport shutdowns so they run concurrently below.
        if let Some(p2p) = node.p2p_node.clone() {
            let node_index = node.index;
            shutdown_futures.push(async move { (node_index, p2p.shutdown().await) });
        }
    }
    for (node_index, result) in join_all(shutdown_futures).await {
        if let Err(e) = result {
            warn!("Error shutting down node {}: {}", node_index, e);
        }
    }
    // Mark everything that wasn't individually shut down as Stopped.
    for node in &self.nodes {
        let state = node.state.read().await.clone();
        if !matches!(state, NodeState::ShutDown) {
            *node.state.write().await = NodeState::Stopped;
        }
    }
    if let Err(e) = tokio::fs::remove_dir_all(&self.config.test_data_dir).await {
        warn!("Failed to cleanup test data directory: {}", e);
    }
    *self.state.write().await = NetworkState::Stopped;
    info!("Test network shutdown complete");
    Ok(())
}
/// Immutable access to the node at `index`, if it exists.
#[must_use]
pub fn node(&self, index: usize) -> Option<&TestNode> {
    self.nodes.get(index)
}
/// Mutable access to the node at `index`, if it exists.
#[must_use]
pub fn node_mut(&mut self, index: usize) -> Option<&mut TestNode> {
    self.nodes.get_mut(index)
}
/// All nodes in spawn order (bootstraps first).
#[must_use]
pub fn nodes(&self) -> &[TestNode] {
    &self.nodes
}
/// The bootstrap nodes (a prefix of the node list).
#[must_use]
pub fn bootstrap_nodes(&self) -> &[TestNode] {
    // `min` guards against being called before all bootstraps are spawned.
    &self.nodes[0..self.config.bootstrap_count.min(self.nodes.len())]
}
/// The non-bootstrap nodes; empty if none have been spawned yet.
#[must_use]
pub fn regular_nodes(&self) -> &[TestNode] {
    if self.nodes.len() > self.config.bootstrap_count {
        &self.nodes[self.config.bootstrap_count..]
    } else {
        &[]
    }
}
/// Current network lifecycle state (cloned snapshot).
pub async fn state(&self) -> NetworkState {
    self.state.read().await.clone()
}
/// `true` only once the network has fully stabilized.
pub async fn is_ready(&self) -> bool {
    matches!(self.state().await, NetworkState::Ready)
}
/// Sum of peer counts across all nodes. Note: each connection is counted
/// once per endpoint, so a single link contributes 2 to this total.
pub async fn total_connections(&self) -> usize {
    let mut total = 0;
    for node in &self.nodes {
        total += node.peer_count().await;
    }
    total
}
/// Number of nodes created so far (running or not).
#[must_use]
pub fn node_count(&self) -> usize {
    self.nodes.len()
}
/// The configuration this network was built with.
#[must_use]
pub fn config(&self) -> &TestNetworkConfig {
    &self.config
}
/// Spawns one additional regular node after the network is running, warms
/// up DHT routing tables (both the new node's and the existing nodes'),
/// triggers neighbor sync everywhere, and waits for the new node's
/// replication bootstrap. Returns the new node's index.
///
/// # Errors
/// Propagates node creation/startup errors; `Stabilization` if the new
/// node's replication bootstrap does not complete in time.
pub async fn add_node(&mut self) -> Result<usize> {
    const DHT_WARMUP_QUERIES: usize = 10;
    const BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(120);
    let index = self.nodes.len();
    // New node dials the original bootstrap set.
    let bootstrap_addrs: Vec<MultiAddr> = self
        .nodes
        .get(0..self.config.bootstrap_count)
        .unwrap_or_default()
        .iter()
        .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
        .collect();
    let node = self.create_node(index, false, bootstrap_addrs).await?;
    self.start_node(node).await?;
    // Warm up the new node's routing table with random lookups.
    if let Some(ref p2p) = self.nodes[index].p2p_node {
        for _ in 0..DHT_WARMUP_QUERIES {
            let mut addr = [0u8; 32];
            rand::Rng::fill(&mut rand::thread_rng(), &mut addr);
            let _ = p2p.dht().find_closest_nodes(&addr, 8).await;
        }
    }
    // Also refresh the existing nodes so they learn about the newcomer.
    for i in 0..index {
        if let Some(ref p2p) = self.nodes[i].p2p_node {
            for _ in 0..DHT_WARMUP_QUERIES {
                let mut addr = [0u8; 32];
                rand::Rng::fill(&mut rand::thread_rng(), &mut addr);
                let _ = p2p.dht().find_closest_nodes(&addr, 8).await;
            }
        }
    }
    // Grace period for routing-table updates to propagate.
    tokio::time::sleep(Duration::from_secs(3)).await;
    for node in &self.nodes {
        if let Some(ref engine) = node.replication_engine {
            engine.trigger_neighbor_sync();
        }
    }
    if let Some(ref engine) = self.nodes[index].replication_engine {
        if !engine.wait_for_bootstrap_complete(BOOTSTRAP_TIMEOUT).await {
            return Err(TestnetError::Stabilization(format!(
                "New node {index} replication bootstrap did not complete within {}s",
                BOOTSTRAP_TIMEOUT.as_secs(),
            )));
        }
    }
    info!("New node {index} added and fully bootstrapped");
    Ok(index)
}
/// Shuts down a single node by index, leaving it in the node list.
///
/// # Errors
/// `Config` if `index` is out of bounds; propagates the node's own
/// shutdown error otherwise.
pub async fn shutdown_node(&mut self, index: usize) -> Result<()> {
    let Some(node) = self.nodes.get_mut(index) else {
        return Err(TestnetError::Config(format!(
            "Node index {index} out of bounds"
        )));
    };
    node.shutdown().await?;
    info!("Node {} has been shut down", index);
    Ok(())
}
/// Shuts down several nodes sequentially, stopping at the first error.
pub async fn shutdown_nodes(&mut self, indices: &[usize]) -> Result<()> {
    for index in indices.iter().copied() {
        self.shutdown_node(index).await?;
    }
    Ok(())
}
/// Counts how many nodes currently report themselves as running.
pub async fn running_node_count(&self) -> usize {
    let mut running = 0;
    for node in &self.nodes {
        // is_running() is async, so this can't be an iterator chain.
        if node.is_running().await {
            running += 1;
        }
    }
    running
}
}
impl Drop for TestNetwork {
    /// Best-effort synchronous cleanup for networks dropped without an
    /// explicit `shutdown()`: signal background tasks, cancel replication
    /// tokens, and abort the health monitor. Async teardown (P2P shutdown,
    /// data-dir removal) cannot run here — prefer calling `shutdown()`.
    fn drop(&mut self) {
        // Ignore send errors: no subscribers just means nothing to notify.
        let _ = self.shutdown_tx.send(());
        for node in &mut self.nodes {
            if let Some(token) = node.replication_shutdown.take() {
                token.cancel();
            }
        }
        if let Some(handle) = self.health_monitor.take() {
            handle.abort();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults must match the named constants (not repeated magic numbers)
    /// and pick a base port inside the test range with a unique data dir.
    #[test]
    fn test_config_defaults() {
        let config = TestNetworkConfig::default();
        assert_eq!(config.node_count, DEFAULT_NODE_COUNT);
        assert_eq!(config.bootstrap_count, DEFAULT_BOOTSTRAP_COUNT);
        assert!(config.base_port >= TEST_PORT_RANGE_MIN && config.base_port < TEST_PORT_RANGE_MAX);
        assert!(config.test_data_dir.to_string_lossy().contains("ant_test_"));
    }

    /// The minimal preset uses the minimal node/bootstrap counts.
    #[test]
    fn test_config_minimal() {
        let config = TestNetworkConfig::minimal();
        assert_eq!(config.node_count, MINIMAL_NODE_COUNT);
        assert_eq!(config.bootstrap_count, MINIMAL_BOOTSTRAP_COUNT);
    }

    /// Two default configs must never share a data directory.
    #[test]
    fn test_config_isolation() {
        let config1 = TestNetworkConfig::default();
        let config2 = TestNetworkConfig::default();
        assert_ne!(config1.test_data_dir, config2.test_data_dir);
    }

    /// Only Ready and Stabilizing count as "running".
    #[test]
    fn test_network_state_is_running() {
        assert!(!NetworkState::Uninitialized.is_running());
        assert!(NetworkState::Ready.is_running());
        assert!(NetworkState::Stabilizing.is_running());
        assert!(!NetworkState::Stopped.is_running());
    }

    /// bootstrap_count == node_count must be rejected by `new`.
    #[tokio::test]
    async fn test_invalid_bootstrap_count_rejected() {
        let config = TestNetworkConfig {
            node_count: 5,
            bootstrap_count: 5,
            ..Default::default()
        };
        let result = TestNetwork::new(config).await;
        assert!(result.is_err());
    }

    /// bootstrap_count == 0 must be rejected by `new`.
    #[tokio::test]
    async fn test_zero_bootstrap_rejected() {
        let config = TestNetworkConfig {
            node_count: 5,
            bootstrap_count: 0,
            ..Default::default()
        };
        let result = TestNetwork::new(config).await;
        assert!(result.is_err());
    }
}