#![allow(
dead_code,
clippy::unwrap_used,
clippy::expect_used,
clippy::cast_possible_truncation,
clippy::used_underscore_binding
)]
use ant_core::data::ClientConfig;
use ant_node::payment::{
EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
QuotingMetricsTracker,
};
use ant_node::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
use ant_protocol::evm::{testnet::Testnet, Network as EvmNetwork, RewardsAddress, Wallet};
use ant_protocol::transport::{
CoreNodeConfig, IPDiversityConfig, MlDsa65, MultiAddr, NodeIdentity, P2PEvent, P2PNode,
};
use ant_protocol::{CLOSE_GROUP_SIZE, MAX_WIRE_MESSAGE_SIZE};
use rand::Rng;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
/// Inclusive lower bound of the range from which the first node's listen port is drawn.
const TEST_PORT_RANGE_MIN: u16 = 20_000;
/// Upper bound for listen ports: the node count is subtracted from it when the
/// base port is picked, so the consecutive per-node ports stay below this value.
const TEST_PORT_RANGE_MAX: u16 = 60_000;
/// Number of nodes spawned first whose addresses are handed to later nodes as
/// bootstrap peers (capped at the requested node count).
const BOOTSTRAP_COUNT: usize = 2;
/// Pause, in milliseconds, after spawning each node.
const SPAWN_DELAY_MS: u64 = 200;
/// Maximum time, in seconds, to wait for the routing tables to fill before
/// the testnet start-up continues regardless.
const STABILIZATION_TIMEOUT_SECS: u64 = 180;
/// Twice the close-group size.
/// NOTE(review): presumably the node count tests pass to `MiniTestnet::start`
/// by default — it is not referenced in the code visible here; confirm.
pub const DEFAULT_NODE_COUNT: usize = CLOSE_GROUP_SIZE * 2;
/// Half the close-group size (integer division).
/// NOTE(review): presumably the position of the median quote among
/// `CLOSE_GROUP_SIZE` sorted quotes — confirm against callers.
pub const MEDIAN_QUOTE_INDEX: usize = CLOSE_GROUP_SIZE / 2;
/// Template bytes for per-node rewards addresses; the last byte is replaced
/// with the node index (modulo 256) when a node is spawned.
const TEST_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];
/// Argument passed to `QuotingMetricsTracker::new` for every test node.
/// NOTE(review): presumably the maximum number of stored records — confirm.
const TEST_MAX_RECORDS: usize = 1280;
#[must_use]
pub fn test_client_config() -> ClientConfig {
ClientConfig {
quote_timeout_secs: 60,
store_timeout_secs: 60,
..Default::default()
}
}
/// One node of a [`MiniTestnet`]: its transport handle, its protocol handler
/// and the background task that serves incoming requests.
///
/// All three fields are `Option`s so that `MiniTestnet::shutdown_node` can
/// clear them while keeping the node's slot (and index) in place.
pub struct TestNode {
/// Handle to the running P2P node; `None` once the node has been shut down.
pub p2p_node: Option<Arc<P2PNode>>,
/// Protocol handler built for this node; `None` once the node has been shut down.
pub protocol: Option<Arc<AntProtocol>>,
/// Join handle of the spawned event-loop task; taken and aborted on shutdown.
_handler_task: Option<tokio::task::JoinHandle<()>>,
}
/// A small local test network: a set of P2P nodes on localhost plus the EVM
/// testnet and wallet they are configured against.
pub struct MiniTestnet {
/// The spawned nodes, in spawn order (bootstrap nodes first).
pub nodes: Vec<TestNode>,
/// Per-node temporary data directories, held here so they stay alive for as
/// long as the testnet does (`TempDir` removes its directory on drop).
_temp_dirs: Vec<tempfile::TempDir>,
/// The EVM testnet instance created in `start`; kept so it is not dropped
/// while the nodes are in use.
_testnet: Testnet,
/// Wallet created from the testnet's default private key.
wallet: Wallet,
/// EVM network description derived from `_testnet`.
evm_network: EvmNetwork,
}
impl MiniTestnet {
pub async fn start(node_count: usize) -> Self {
let testnet = Testnet::new().await.expect("start Anvil testnet");
let evm_network = testnet.to_network();
let private_key = testnet
.default_wallet_private_key()
.expect("get wallet key");
let wallet = Wallet::new_from_private_key(evm_network.clone(), &private_key)
.expect("create funded wallet");
let bootstrap_count = BOOTSTRAP_COUNT.min(node_count);
let base_port = rand::thread_rng()
.gen_range(TEST_PORT_RANGE_MIN..TEST_PORT_RANGE_MAX - node_count as u16);
let mut nodes = Vec::with_capacity(node_count);
let mut temp_dirs = Vec::with_capacity(node_count);
let mut bootstrap_addrs = Vec::new();
for i in 0..bootstrap_count {
let port = base_port + i as u16;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let temp_dir = tempfile::TempDir::new().expect("create temp dir");
let (node, protocol, handler) =
Self::spawn_node(addr, &bootstrap_addrs, temp_dir.path(), &evm_network, i).await;
bootstrap_addrs.push(addr);
nodes.push(TestNode {
p2p_node: Some(Arc::clone(&node)),
protocol: Some(protocol),
_handler_task: Some(handler),
});
temp_dirs.push(temp_dir);
sleep(Duration::from_millis(SPAWN_DELAY_MS)).await;
}
for i in bootstrap_count..node_count {
let port = base_port + i as u16;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let temp_dir = tempfile::TempDir::new().expect("create temp dir");
let (node, protocol, handler) =
Self::spawn_node(addr, &bootstrap_addrs, temp_dir.path(), &evm_network, i).await;
nodes.push(TestNode {
p2p_node: Some(Arc::clone(&node)),
protocol: Some(protocol),
_handler_task: Some(handler),
});
temp_dirs.push(temp_dir);
sleep(Duration::from_millis(SPAWN_DELAY_MS)).await;
}
let min_routing_table_size = if node_count > 20 { 20 } else { node_count - 1 };
let deadline =
tokio::time::Instant::now() + Duration::from_secs(STABILIZATION_TIMEOUT_SECS);
loop {
let mut converged = true;
for n in &nodes {
if let Some(ref node) = n.p2p_node {
if node.dht().get_routing_table_size().await < min_routing_table_size {
converged = false;
break;
}
}
}
if converged {
break;
}
if tokio::time::Instant::now() > deadline {
break;
}
sleep(Duration::from_millis(500)).await;
}
let vault_address = evm_network.payment_vault_address();
wallet
.approve_to_spend_tokens(*vault_address, ant_protocol::evm::U256::MAX)
.await
.expect("approve payment vault token spend");
Self {
nodes,
_temp_dirs: temp_dirs,
_testnet: testnet,
wallet,
evm_network,
}
}
/// Returns a shared handle to the P2P node at `index`.
///
/// Yields `None` when `index` is out of range or when that slot's
/// `p2p_node` is `None` (for example after `shutdown_node`).
pub fn node(&self, index: usize) -> Option<Arc<P2PNode>> {
    let slot = self.nodes.get(index)?;
    slot.p2p_node.as_ref().map(Arc::clone)
}
pub fn wallet(&self) -> &Wallet {
&self.wallet
}
pub fn evm_network(&self) -> &EvmNetwork {
&self.evm_network
}
/// Creates, starts and wires up a single test node.
///
/// The node gets a freshly generated identity, listens on the port of
/// `listen_addr` (only the port is used), and is configured with
/// `bootstrap_peers` as QUIC bootstrap addresses. Its storage lives under
/// `data_dir`, its payment verifier targets `evm_network`, and its rewards
/// address is `TEST_REWARDS_ADDRESS` with the last byte set to
/// `node_index % 256`.
///
/// Returns the P2P node, its protocol handler, and the join handle of a
/// background task that answers chunk-protocol requests. That task runs
/// until the node's event channel is closed or the handle is aborted.
///
/// # Panics
///
/// Panics if identity generation, configuration, node start-up, storage
/// creation or secret-key deserialization fails.
#[allow(clippy::too_many_lines)]
async fn spawn_node(
    listen_addr: SocketAddr,
    bootstrap_peers: &[SocketAddr],
    data_dir: &std::path::Path,
    evm_network: &EvmNetwork,
    node_index: usize,
) -> (Arc<P2PNode>, Arc<AntProtocol>, tokio::task::JoinHandle<()>) {
    let identity = Arc::new(NodeIdentity::generate().expect("generate node identity"));

    // --- Transport -------------------------------------------------------
    let mut core_config = CoreNodeConfig::builder()
        .port(listen_addr.port())
        .ipv6(false)
        .local(true)
        .max_message_size(MAX_WIRE_MESSAGE_SIZE)
        .build()
        .expect("create core config");
    core_config.bootstrap_peers = bootstrap_peers
        .iter()
        .map(|addr| MultiAddr::quic(*addr))
        .collect();
    core_config.connection_timeout = Duration::from_secs(5);
    core_config.node_identity = Some(Arc::clone(&identity));
    // All test nodes share 127.0.0.1, so IP-diversity limits must be relaxed.
    core_config.diversity_config = Some(IPDiversityConfig::permissive());

    let node = Arc::new(P2PNode::new(core_config).await.expect("create P2P node"));
    node.start().await.expect("start P2P node");

    // --- Storage ---------------------------------------------------------
    // NOTE(review): the zero values for `max_map_size` / `disk_reserve`
    // presumably select the storage defaults — confirm against `LmdbStorageConfig`.
    let storage_config = LmdbStorageConfig {
        root_dir: data_dir.to_path_buf(),
        verify_on_read: true,
        max_map_size: 0,
        disk_reserve: 0,
    };
    let storage = Arc::new(
        LmdbStorage::new(storage_config)
            .await
            .expect("create storage"),
    );

    // --- Payments and quoting ---------------------------------------------
    // Give every node a distinct rewards address by varying the last byte.
    let mut addr_bytes = TEST_REWARDS_ADDRESS;
    addr_bytes[19] = u8::try_from(node_index % 256).unwrap_or(0);
    let rewards_address = RewardsAddress::new(addr_bytes);

    let payment_config = PaymentVerifierConfig {
        evm: EvmVerifierConfig {
            network: evm_network.clone(),
        },
        cache_capacity: 1000,
        local_rewards_address: rewards_address,
    };
    let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
    payment_verifier.attach_p2p_node(Arc::clone(&node));

    let metrics_tracker = QuotingMetricsTracker::new(TEST_MAX_RECORDS);
    let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

    // Quotes are signed with the node's own ML-DSA-65 identity key.
    let pub_key_bytes = identity.public_key().as_bytes().to_vec();
    let sk_bytes = identity.secret_key_bytes().to_vec();
    let sk = {
        use ant_protocol::pqc::ops::MlDsaSecretKey;
        MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize ML-DSA-65 secret key")
    };
    quote_generator.set_signer(pub_key_bytes, move |msg| {
        use ant_protocol::pqc::ops::MlDsaOperations;
        let ml_dsa = MlDsa65::new();
        ml_dsa.sign(&sk, msg).map_or_else(
            |_| {
                // Still fall back to an empty signature, but make the
                // failure visible instead of swallowing it silently.
                eprintln!(
                    "ERROR: node {node_index} failed to sign quote; returning empty signature"
                );
                Vec::new()
            },
            |sig| sig.as_bytes().to_vec(),
        )
    });

    let protocol = Arc::new(AntProtocol::new(
        storage,
        payment_verifier,
        Arc::new(quote_generator),
    ));

    // --- Request handler ---------------------------------------------------
    let handler_node = Arc::clone(&node);
    let handler_protocol = Arc::clone(&protocol);
    let handler = tokio::spawn(async move {
        let mut events = handler_node.subscribe_events();
        loop {
            match events.recv().await {
                Ok(P2PEvent::Message {
                    topic,
                    source: Some(source_peer),
                    data,
                }) => {
                    // Filter before spawning so that unrelated traffic does
                    // not cost a task and a topic clone per message.
                    if topic != ant_protocol::CHUNK_PROTOCOL_ID {
                        continue;
                    }
                    let protocol = Arc::clone(&handler_protocol);
                    let node = Arc::clone(&handler_node);
                    // Handle each request in its own task so that a slow
                    // request does not stall the event loop.
                    tokio::spawn(async move {
                        match protocol.try_handle_request(&data).await {
                            Ok(Some(response_bytes)) => {
                                if let Err(e) = node
                                    .send_message(
                                        &source_peer,
                                        &topic,
                                        response_bytes.to_vec(),
                                        &[],
                                    )
                                    .await
                                {
                                    eprintln!("ERROR: node {node_index} failed to send response to {source_peer}: {e}");
                                }
                            }
                            // The handler produced no reply for this message.
                            Ok(None) => {}
                            Err(e) => {
                                eprintln!(
                                    "ERROR: node {node_index} try_handle_request failed: {e}"
                                );
                            }
                        }
                    });
                }
                Ok(P2PEvent::Message { source: None, .. }) => {
                    eprintln!("WARNING: node {node_index} received message with no source");
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    eprintln!("WARNING: node {node_index} handler lagged, dropped {n} events");
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            }
        }
    });

    (node, protocol, handler)
}
pub fn shutdown_node(&mut self, index: usize) {
if let Some(node) = self.nodes.get_mut(index) {
if let Some(task) = node._handler_task.take() {
task.abort();
}
node.protocol = None;
node.p2p_node = None;
}
}
/// Counts the nodes whose `p2p_node` handle is still present.
pub fn running_node_count(&self) -> usize {
    self.nodes
        .iter()
        .map(|slot| usize::from(slot.p2p_node.is_some()))
        .sum()
}
pub async fn teardown(self) {
for node in &self.nodes {
if let Some(ref task) = node._handler_task {
task.abort();
}
}
for node in &self.nodes {
if let Some(ref p2p_node) = node.p2p_node {
let _ = p2p_node.shutdown().await;
}
}
}
}