#![allow(clippy::unwrap_used)] #![allow(clippy::expect_used)]
use ant_quic::transport::TransportAddr;
use ant_quic::{
MtuConfig,
P2pConfig,
P2pEndpoint,
P2pEvent,
PeerId,
TraversalPhase,
unified_config::{AutoConnectPolicy, MdnsConfig, MdnsMode},
};
use clap::Parser;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
/// Command-line options for the E2E test node.
///
/// The field doc comments below double as the `--help` text generated by clap.
#[derive(Parser, Debug)]
#[command(name = "e2e-test-node")]
#[command(
    author,
    version,
    about = "E2E test node with metrics push and data verification"
)]
struct Args {
    /// Socket address the P2P endpoint binds to
    #[arg(short, long, default_value = "[::]:0")]
    listen: SocketAddr,

    /// Comma-separated addresses of known peers to register and connect to at startup
    #[arg(short = 'k', long, value_delimiter = ',')]
    known_peers: Vec<SocketAddr>,

    /// Base URL of the metrics server; reports are POSTed to `<URL>/api/metrics`
    #[arg(long)]
    metrics_server: Option<String>,

    /// Seconds between metrics pushes
    #[arg(long, default_value = "5")]
    metrics_interval: u64,

    /// Total number of test-data bytes to send to connected peers (0 disables data generation)
    #[arg(long, default_value = "0")]
    generate_data: u64,

    /// Parse received payloads as checksummed data chunks and verify them
    #[arg(long)]
    verify_data: bool,

    /// Node identifier used in reports (default: `node-` plus the first 4 peer-ID bytes in hex)
    #[arg(long)]
    node_id: Option<String>,

    /// Location label included in logs and metrics reports
    #[arg(long, default_value = "local")]
    node_location: String,

    /// Maximum number of payload bytes per generated data chunk
    #[arg(long, default_value = "65536")]
    chunk_size: usize,

    /// Log at debug level instead of info level
    #[arg(short, long)]
    verbose: bool,

    /// Run time limit in seconds (0 runs until Ctrl+C)
    #[arg(long, default_value = "0")]
    duration: u64,

    /// Use the PQC-optimized MTU configuration
    #[arg(long)]
    pqc_mtu: bool,

    /// Disable best-effort router port mapping
    #[arg(long)]
    no_port_mapping: bool,

    /// Print events and statistics as JSON lines on stdout
    #[arg(long)]
    json: bool,

    /// Periodically send an incrementing 8-byte counter to every connected peer
    #[arg(long)]
    counter_test: bool,

    /// Milliseconds between counter sends when `--counter-test` is set
    #[arg(long, default_value = "1000")]
    counter_interval: u64,

    /// Echo every received payload back to its sender
    #[arg(long)]
    echo: bool,

    /// Explicitly request mDNS discovery (cannot be combined with `--no-mdns`)
    #[arg(long, conflicts_with = "no_mdns")]
    mdns: bool,

    /// Disable mDNS discovery (cannot be combined with any other mDNS flag)
    #[arg(long, conflicts_with = "mdns")]
    no_mdns: bool,

    /// Override the mDNS service name
    #[arg(long)]
    mdns_service: Option<String>,

    /// Override the mDNS namespace
    #[arg(long)]
    mdns_namespace: Option<String>,

    /// Override the mDNS mode (browse, advertise or both)
    #[arg(long, value_enum)]
    mdns_mode: Option<CliMdnsMode>,

    /// Override the policy for auto-connecting to peers discovered via mDNS
    #[arg(long, value_enum)]
    mdns_auto_connect: Option<CliMdnsAutoConnect>,

    /// Report progress roughly once per second while sending generated data
    #[arg(long)]
    show_progress: bool,
}
// Values accepted by the `--mdns-mode` flag.
// Converted into the library's `MdnsMode` through the `From` impl in this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq, clap::ValueEnum)]
enum CliMdnsMode {
// Converted to `MdnsMode::BrowseOnly`.
Browse,
// Converted to `MdnsMode::AdvertiseOnly`.
Advertise,
// Converted to `MdnsMode::Both`.
Both,
}
impl From<CliMdnsMode> for MdnsMode {
fn from(value: CliMdnsMode) -> Self {
match value {
CliMdnsMode::Browse => Self::BrowseOnly,
CliMdnsMode::Advertise => Self::AdvertiseOnly,
CliMdnsMode::Both => Self::Both,
}
}
}
// Values accepted by the `--mdns-auto-connect` flag.
// Converted into the library's `AutoConnectPolicy` through the `From` impl in this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq, clap::ValueEnum)]
enum CliMdnsAutoConnect {
// Converted to `AutoConnectPolicy::Disabled`.
Disabled,
// Converted to `AutoConnectPolicy::ApprovalRequired`.
ApprovalRequired,
// Converted to `AutoConnectPolicy::Enabled`.
Enabled,
}
impl From<CliMdnsAutoConnect> for AutoConnectPolicy {
fn from(value: CliMdnsAutoConnect) -> Self {
match value {
CliMdnsAutoConnect::Disabled => Self::Disabled,
CliMdnsAutoConnect::ApprovalRequired => Self::ApprovalRequired,
CliMdnsAutoConnect::Enabled => Self::Enabled,
}
}
}
/// Per-peer entry of a [`NodeMetricsReport`], built from a `PeerState` by
/// `build_metrics_report`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
/// Shortened peer ID as produced by `format_peer_id` (hex of the first 8 bytes).
pub peer_id: String,
/// Remote transport address rendered with `to_string()`.
pub remote_addr: String,
/// Whole seconds elapsed since the connection was recorded, as filled in by
/// `build_metrics_report` (an age, not a wall-clock timestamp).
pub connected_at: u64,
/// Copied from `PeerState::bytes_sent`.
pub bytes_sent: u64,
/// Copied from `PeerState::bytes_received`.
pub bytes_received: u64,
/// Connection-type label; this file produces "direct", "nat_traversed" and "relayed".
pub connection_type: String, }
/// Metrics snapshot that is serialized to JSON and POSTed to the metrics server.
///
/// Instances are assembled by `build_metrics_report`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeMetricsReport {
/// Identifier of the reporting node.
pub node_id: String,
/// Location label of the reporting node.
pub location: String,
/// Unix time in seconds when the report was built (0 if the clock is before the epoch).
pub timestamp: u64,
/// Whole seconds elapsed since the node's start instant.
pub uptime_secs: u64,
/// Number of entries in `connected_peers`.
pub active_connections: usize,
/// Value of the `bytes_sent` runtime counter.
pub bytes_sent_total: u64,
/// Value of the `bytes_received` runtime counter.
pub bytes_received_total: u64,
/// Sent plus received bits divided by uptime, in Mbit/s; this is an average over the
/// whole uptime and is 0.0 while the uptime is below one second.
pub current_throughput_mbps: f64,
/// Value of the `nat_traversals_completed` runtime counter.
pub nat_traversal_successes: u64,
/// Value of the `nat_traversals_failed` runtime counter.
pub nat_traversal_failures: u64,
/// Value of the `direct_connections` runtime counter.
pub direct_connections: u64,
/// Value of the `relayed_connections` runtime counter.
pub relayed_connections: u64,
/// Value of the `data_chunks_sent` runtime counter.
pub data_chunks_sent: u64,
/// Value of the `data_chunks_verified` runtime counter.
pub data_chunks_verified: u64,
/// Value of the `data_verification_failures` runtime counter.
pub data_verification_failures: u64,
/// String form of every recorded external address.
pub external_addresses: Vec<String>,
/// One entry per peer currently held in the peer table.
pub connected_peers: Vec<PeerInfo>,
/// String form of the endpoint's local address, or empty when it is unavailable.
pub local_addr: String,
}
/// Test payload that carries its own checksum so the receiver can verify it.
///
/// Chunks are serialized to JSON before being sent and parsed back on receipt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedDataChunk {
/// Sequence number assigned by the creator of the chunk.
pub sequence: u64,
/// Raw payload bytes.
pub data: Vec<u8>,
/// Hex-encoded SHA-256 digest of `data`, as computed by `compute_sha256`.
pub checksum: String,
/// Unix time in seconds at which the chunk was created (0 if the clock is before the epoch).
pub timestamp: u64,
}
impl VerifiedDataChunk {
    /// Wraps `data` in a chunk numbered `sequence`, stamping it with the
    /// SHA-256 checksum of the payload and the current Unix time in seconds.
    pub fn new(sequence: u64, data: Vec<u8>) -> Self {
        // A system clock set before the Unix epoch yields a timestamp of 0.
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |since_epoch| since_epoch.as_secs());

        Self {
            sequence,
            checksum: compute_sha256(&data),
            data,
            timestamp,
        }
    }

    /// Returns `true` when the stored checksum matches a freshly computed
    /// SHA-256 digest of the payload.
    pub fn verify(&self) -> bool {
        let recomputed = compute_sha256(&self.data);
        recomputed == self.checksum
    }
}
/// Process-wide counters shared between the node's tasks.
///
/// Every field is an atomic so the struct can be shared behind an `Arc` and
/// updated without a lock.
#[derive(Debug, Default)]
struct RuntimeStats {
/// Payload bytes successfully sent (counters, echoes and data chunks).
bytes_sent: AtomicU64,
/// Payload bytes received.
bytes_received: AtomicU64,
/// Connections returned by the accept loop.
connections_accepted: AtomicU64,
/// Known-peer connections reported as established at startup.
connections_initiated: AtomicU64,
/// NAT traversal progress events that reached the `Connected` phase.
nat_traversals_completed: AtomicU64,
/// NAT traversal progress events that reached the `Failed` phase.
nat_traversals_failed: AtomicU64,
/// Number of `ExternalAddressDiscovered` events seen.
external_addresses_discovered: AtomicU64,
/// Exported as `direct_connections` in metrics reports.
direct_connections: AtomicU64,
/// Exported as `relayed_connections` in metrics reports.
relayed_connections: AtomicU64,
/// Successful data-chunk sends (counted once per chunk per peer).
data_chunks_sent: AtomicU64,
/// Received chunks whose checksum matched.
data_chunks_verified: AtomicU64,
/// Received chunks whose checksum did not match.
data_verification_failures: AtomicU64,
/// Counter messages sent successfully.
counters_sent: AtomicU64,
/// 8-byte counter messages received.
counters_received: AtomicU64,
/// Payloads echoed back successfully.
echoes_sent: AtomicU64,
}
/// In-memory record of a connected peer, keyed by peer ID in the shared peer table.
#[derive(Debug, Clone)]
struct PeerState {
/// Identity of the peer (same value as the map key).
peer_id: PeerId,
/// Address the peer is connected from.
remote_addr: TransportAddr,
/// Instant at which this record was created.
connected_at: Instant,
/// Initialised to 0 wherever this file constructs the struct; no visible code updates it.
bytes_sent: u64,
/// Initialised to 0 wherever this file constructs the struct; no visible code updates it.
bytes_received: u64,
/// Connection-type label ("direct", "nat_traversed" or "relayed" in this file).
connection_type: String,
}
/// Returns the SHA-256 digest of `data` as a hex string.
fn compute_sha256(data: &[u8]) -> String {
    // One-shot digest; equivalent to feeding `data` into a fresh hasher and finalizing it.
    let digest = Sha256::digest(data);
    hex::encode(digest)
}
/// Lazy generator of test-data chunks; see its `Iterator` implementation.
#[derive(Debug, Clone)]
struct TestDataChunks {
/// Payload bytes that have not been emitted yet.
remaining: u64,
/// Maximum payload size of a single chunk, in bytes.
chunk_size: usize,
/// Sequence number the next emitted chunk will carry.
sequence: u64,
}
impl TestDataChunks {
    /// Number of chunks this generator will still yield.
    fn remaining_chunks(&self) -> u64 {
        let Self {
            remaining,
            chunk_size,
            ..
        } = *self;
        data_chunk_count(remaining, chunk_size)
    }
}
/// Yields chunks of at most `chunk_size` bytes until `remaining` reaches zero.
///
/// Byte `i` of the chunk with sequence number `s` is `(s + i) % 256`, so the
/// payload is fully determined by the sequence number and the chunk length.
impl Iterator for TestDataChunks {
    type Item = VerifiedDataChunk;

    fn next(&mut self) -> Option<Self::Item> {
        // Nothing left to emit, or a zero chunk size that could never make progress.
        let exhausted = self.remaining == 0 || self.chunk_size == 0;
        if exhausted {
            return None;
        }

        let seq = self.sequence;
        // The last chunk may be shorter than `chunk_size`.
        let len = self.remaining.min(self.chunk_size as u64);
        let payload: Vec<u8> = (0..len)
            .map(|offset| ((seq + offset) % 256) as u8)
            .collect();

        self.remaining -= len;
        self.sequence = seq + 1;
        Some(VerifiedDataChunk::new(seq, payload))
    }
}
/// Number of chunks needed to carry `size` bytes when each chunk holds at most
/// `chunk_size` bytes (the last chunk may be partial).
///
/// Returns 0 when either `size` or `chunk_size` is 0.
fn data_chunk_count(size: u64, chunk_size: usize) -> u64 {
    match chunk_size as u64 {
        // A zero chunk size can carry nothing; also avoids dividing by zero.
        0 => 0,
        // `0.div_ceil(n)` is 0, so an empty payload needs no special case.
        per_chunk => size.div_ceil(per_chunk),
    }
}
/// Creates a lazy generator for `size` bytes of test data, split into chunks
/// of at most `chunk_size` bytes and numbered from sequence 0.
fn generate_test_data(size: u64, chunk_size: usize) -> TestDataChunks {
    let remaining = size;
    let sequence = 0;
    TestDataChunks {
        remaining,
        chunk_size,
        sequence,
    }
}
/// Formats a byte count for display using binary units.
///
/// Values below 1024 are printed exactly with a `B` suffix; larger values are
/// scaled to KB, MB or GB (the largest unit used) with two decimal places.
fn format_bytes(bytes: u64) -> String {
    // Ordered from largest to smallest so the first matching threshold wins.
    const UNITS: [(u64, &str); 3] = [(1 << 30, "GB"), (1 << 20, "MB"), (1 << 10, "KB")];

    let unit = UNITS.iter().find(|(threshold, _)| bytes >= *threshold);
    match unit {
        Some((threshold, suffix)) => {
            let scaled = bytes as f64 / *threshold as f64;
            format!("{:.2} {}", scaled, suffix)
        }
        None => format!("{} B", bytes),
    }
}
/// Short display form of a peer ID: the hex encoding of its first 8 bytes.
fn format_peer_id(peer_id: &PeerId) -> String {
    let prefix = &peer_id.0[..8];
    hex::encode(prefix)
}
/// Writes `value` to stdout as a single line of compact JSON.
fn print_json(value: serde_json::Value) {
    println!("{}", value);
}
fn final_stats_json(node_id: &str, stats: &RuntimeStats, duration: Duration) -> serde_json::Value {
json!({
"type": "final_stats",
"node_id": node_id,
"duration_secs": duration.as_secs_f64(),
"bytes_sent": stats.bytes_sent.load(Ordering::SeqCst),
"bytes_received": stats.bytes_received.load(Ordering::SeqCst),
"connections_accepted": stats.connections_accepted.load(Ordering::SeqCst),
"connections_initiated": stats.connections_initiated.load(Ordering::SeqCst),
"nat_traversals_completed": stats.nat_traversals_completed.load(Ordering::SeqCst),
"nat_traversals_failed": stats.nat_traversals_failed.load(Ordering::SeqCst),
"chunks_sent": stats.data_chunks_sent.load(Ordering::SeqCst),
"chunks_verified": stats.data_chunks_verified.load(Ordering::SeqCst),
"verification_failures": stats.data_verification_failures.load(Ordering::SeqCst),
})
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let log_level = if args.verbose { "debug" } else { "info" };
tracing_subscriber::fmt()
.with_env_filter(format!("ant_quic={log_level},e2e_test_node={log_level}"))
.init();
info!("E2E Test Node v{}", env!("CARGO_PKG_VERSION"));
info!("Starting in {} mode...", args.node_location);
let mut builder = P2pConfig::builder().bind_addr(args.listen);
for addr in &args.known_peers {
builder = builder.known_peer(*addr);
}
if args.pqc_mtu {
builder = builder.mtu(MtuConfig::pqc_optimized());
info!("Using PQC-optimized MTU settings");
}
if args.no_port_mapping {
builder = builder.port_mapping_enabled(false);
info!("Best-effort router port mapping disabled");
}
let mdns_requested = args.mdns
|| args.mdns_service.is_some()
|| args.mdns_namespace.is_some()
|| args.mdns_mode.is_some()
|| args.mdns_auto_connect.is_some();
if args.no_mdns && mdns_requested {
anyhow::bail!("--no-mdns cannot be combined with other mDNS configuration flags");
}
if args.no_mdns {
builder = builder.mdns_enabled(false);
info!("First-party mDNS disabled");
} else {
let mut mdns_config = MdnsConfig::default();
if let Some(service) = args.mdns_service.clone() {
mdns_config.service = Some(service);
}
if let Some(namespace) = args.mdns_namespace.clone() {
mdns_config.namespace = Some(namespace);
}
if let Some(mode) = args.mdns_mode {
mdns_config.mode = mode.into();
}
if let Some(auto_connect) = args.mdns_auto_connect {
mdns_config.auto_connect = auto_connect.into();
}
builder = builder.mdns(mdns_config);
}
let config = builder.build()?;
info!("Creating P2P endpoint...");
let endpoint = P2pEndpoint::new(config).await?;
let node_id = args.node_id.unwrap_or_else(|| {
let peer_id = endpoint.peer_id();
format!("node-{}", hex::encode(&peer_id.0[..4]))
});
let peer_id = endpoint.peer_id();
let public_key = endpoint.public_key_bytes();
info!("═══════════════════════════════════════════════════════════════");
info!(" E2E TEST NODE");
info!("═══════════════════════════════════════════════════════════════");
info!("Node ID: {}", node_id);
info!("Location: {}", args.node_location);
info!("Peer ID: {}", format_peer_id(&peer_id));
info!("Public Key: {}", hex::encode(public_key));
if let Some(addr) = endpoint.local_addr() {
info!("Local Address: {}", addr);
}
if let Some(ref server) = args.metrics_server {
info!("Metrics Server: {}", server);
}
if args.generate_data > 0 {
info!(
"Data Generation: {} ({} chunks)",
format_bytes(args.generate_data),
data_chunk_count(args.generate_data, args.chunk_size)
);
}
info!("═══════════════════════════════════════════════════════════════");
if args.json {
if let Some(addr) = endpoint.local_addr() {
print_json(json!({
"event": "local_identity",
"peer_id": hex::encode(peer_id.0),
"addr": addr.to_string(),
}));
} else {
print_json(json!({
"event": "local_identity",
"peer_id": hex::encode(peer_id.0),
}));
}
}
let shutdown = CancellationToken::new();
let shutdown_clone = shutdown.clone();
let stats = Arc::new(RuntimeStats::default());
let peers: Arc<RwLock<HashMap<PeerId, PeerState>>> = Arc::new(RwLock::new(HashMap::new()));
let external_addrs: Arc<RwLock<Vec<TransportAddr>>> = Arc::new(RwLock::new(Vec::new()));
let start_time = Instant::now();
tokio::spawn(async move {
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c: {}", e);
}
info!("Shutdown signal received");
shutdown_clone.cancel();
});
let endpoint_events = endpoint.clone();
let shutdown_events = shutdown.clone();
let stats_events = stats.clone();
let peers_events = peers.clone();
let external_addrs_events = external_addrs.clone();
let json_output = args.json;
let event_handle = tokio::spawn(async move {
let mut events = endpoint_events.subscribe();
while !shutdown_events.is_cancelled() {
match tokio::time::timeout(Duration::from_millis(100), events.recv()).await {
Ok(Ok(event)) => {
handle_event(
&event,
&stats_events,
&peers_events,
&external_addrs_events,
json_output,
)
.await;
}
Ok(Err(_)) => break,
Err(_) => continue,
}
}
});
let metrics_handle = if let Some(ref server) = args.metrics_server {
let server = server.clone();
let endpoint_metrics = endpoint.clone();
let shutdown_metrics = shutdown.clone();
let stats_metrics = stats.clone();
let peers_metrics = peers.clone();
let external_addrs_metrics = external_addrs.clone();
let node_id_metrics = node_id.clone();
let location = args.node_location.clone();
let interval = args.metrics_interval;
Some(tokio::spawn(async move {
let client = reqwest::Client::new();
let mut interval_timer = tokio::time::interval(Duration::from_secs(interval));
while !shutdown_metrics.is_cancelled() {
interval_timer.tick().await;
let report = build_metrics_report(
&node_id_metrics,
&location,
&endpoint_metrics,
&stats_metrics,
&peers_metrics,
&external_addrs_metrics,
start_time,
)
.await;
match client
.post(format!("{}/api/metrics", server))
.json(&report)
.timeout(Duration::from_secs(5))
.send()
.await
{
Ok(resp) if resp.status().is_success() => {
debug!("Metrics pushed successfully");
}
Ok(resp) => {
warn!("Metrics push returned {}", resp.status());
}
Err(e) => {
debug!("Failed to push metrics: {}", e);
}
}
}
}))
} else {
None
};
let endpoint_recv = endpoint.clone();
let shutdown_recv = shutdown.clone();
let stats_recv = stats.clone();
let verify_data = args.verify_data;
let echo_enabled = args.echo;
let json = args.json;
let recv_handle = tokio::spawn(async move {
loop {
let result = tokio::select! {
r = endpoint_recv.recv() => r,
_ = shutdown_recv.cancelled() => break,
};
match result {
Ok((peer_id, data)) => {
stats_recv
.bytes_received
.fetch_add(data.len() as u64, Ordering::SeqCst);
if data.len() == 8 {
if let Ok(bytes) = data[..8].try_into() {
let counter = u64::from_be_bytes(bytes);
stats_recv.counters_received.fetch_add(1, Ordering::SeqCst);
if json {
print_json(json!({
"event": "counter_received",
"counter": counter,
"peer": format_peer_id(&peer_id),
}));
} else {
debug!(
"Received counter {} from {}",
counter,
format_peer_id(&peer_id)
);
}
}
} else if verify_data {
if let Ok(chunk) = serde_json::from_slice::<VerifiedDataChunk>(&data) {
if chunk.verify() {
stats_recv
.data_chunks_verified
.fetch_add(1, Ordering::SeqCst);
if json {
print_json(json!({
"event": "chunk_verified",
"sequence": chunk.sequence,
"peer": format_peer_id(&peer_id),
"size": chunk.data.len(),
}));
} else {
debug!(
"Verified chunk {} from {} ({} bytes)",
chunk.sequence,
format_peer_id(&peer_id),
chunk.data.len()
);
}
} else {
stats_recv
.data_verification_failures
.fetch_add(1, Ordering::SeqCst);
error!(
"Verification FAILED for chunk {} from {}",
chunk.sequence,
format_peer_id(&peer_id)
);
}
}
} else if json {
print_json(json!({
"event": "data_received",
"bytes": data.len(),
"peer": format_peer_id(&peer_id),
}));
} else {
debug!(
"Received {} bytes from {}",
data.len(),
format_peer_id(&peer_id)
);
}
if echo_enabled {
let endpoint_send = endpoint_recv.clone();
let stats_send = stats_recv.clone();
tokio::spawn(async move {
if let Err(e) = endpoint_send.send(&peer_id, &data).await {
debug!("Failed to echo: {}", e);
} else {
stats_send
.bytes_sent
.fetch_add(data.len() as u64, Ordering::SeqCst);
stats_send.echoes_sent.fetch_add(1, Ordering::SeqCst);
}
});
}
}
Err(_) => {
}
}
}
});
let counter_handle = if args.counter_test {
let endpoint_counter = endpoint.clone();
let shutdown_counter = shutdown.clone();
let stats_counter = stats.clone();
let interval_ms = args.counter_interval;
let json = args.json;
Some(tokio::spawn(async move {
let mut counter: u64 = 0;
let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
while !shutdown_counter.is_cancelled() {
interval.tick().await;
counter += 1;
let connected_peers = endpoint_counter.connected_peers().await;
let mut send_tasks = Vec::with_capacity(connected_peers.len());
for peer in connected_peers {
let endpoint_send = endpoint_counter.clone();
let stats_send = stats_counter.clone();
let peer_id = peer.peer_id;
send_tasks.push(tokio::spawn(async move {
let data = counter.to_be_bytes();
match endpoint_send.send(&peer_id, &data).await {
Ok(()) => {
stats_send.counters_sent.fetch_add(1, Ordering::SeqCst);
stats_send
.bytes_sent
.fetch_add(data.len() as u64, Ordering::SeqCst);
if json {
print_json(json!({
"event": "counter_sent",
"counter": counter,
"peer": format_peer_id(&peer_id),
}));
}
}
Err(e) => debug!(
"Failed to send counter {} to {}: {}",
counter,
format_peer_id(&peer_id),
e
),
}
}));
}
for task in send_tasks {
if let Err(e) = task.await {
debug!("Counter send task join error: {}", e);
}
}
}
}))
} else {
None
};
if !args.known_peers.is_empty() {
info!("Connecting to {} known peer(s)...", args.known_peers.len());
match endpoint.connect_known_peers().await {
Ok(count) => {
info!("Connected to {} known peer(s)", count);
stats
.connections_initiated
.fetch_add(count as u64, Ordering::SeqCst);
}
Err(e) => {
error!("Failed to connect to known peers: {}", e);
}
}
}
let data_handle = if args.generate_data > 0 {
let endpoint_data = endpoint.clone();
let shutdown_data = shutdown.clone();
let stats_data = stats.clone();
let peers_data = peers.clone();
let data_size = args.generate_data;
let chunk_size = args.chunk_size;
let show_progress = args.show_progress;
let json = args.json;
Some(tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(2)).await;
let connected_peers: Vec<PeerId> = peers_data.read().await.keys().cloned().collect();
if connected_peers.is_empty() {
warn!("No connected peers to send data to");
return;
}
let chunks = generate_test_data(data_size, chunk_size);
let total_chunks = chunks.remaining_chunks();
info!("Sending data to {} peer(s)...", connected_peers.len());
let send_start = Instant::now();
let mut chunks_sent = 0u64;
let mut last_progress = Instant::now();
for (idx, chunk) in chunks.enumerate() {
if shutdown_data.is_cancelled() {
break;
}
let chunk_bytes = serde_json::to_vec(&chunk).expect("Failed to serialize chunk");
for peer_id in &connected_peers {
match endpoint_data.send(peer_id, &chunk_bytes).await {
Ok(()) => {
stats_data
.bytes_sent
.fetch_add(chunk_bytes.len() as u64, Ordering::SeqCst);
stats_data.data_chunks_sent.fetch_add(1, Ordering::SeqCst);
chunks_sent += 1;
}
Err(e) => {
debug!(
"Failed to send chunk {} to {}: {}",
idx,
format_peer_id(peer_id),
e
);
}
}
}
if show_progress && last_progress.elapsed() > Duration::from_secs(1) {
let progress = (idx + 1) as f64 / total_chunks as f64 * 100.0;
let elapsed = send_start.elapsed().as_secs_f64();
let bytes_sent = stats_data.bytes_sent.load(Ordering::SeqCst);
let throughput_mbps = (bytes_sent as f64 * 8.0) / (elapsed * 1_000_000.0);
if json {
print_json(json!({
"event": "progress",
"percent": progress,
"chunks_sent": chunks_sent,
"throughput_mbps": throughput_mbps,
}));
} else {
info!(
"Progress: {:.1}% ({}/{} chunks, {:.2} Mbps)",
progress,
idx + 1,
total_chunks,
throughput_mbps
);
}
last_progress = Instant::now();
}
}
let elapsed = send_start.elapsed();
let throughput_mbps = (data_size as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0);
if json {
print_json(json!({
"event": "data_transfer_complete",
"chunks_sent": chunks_sent,
"bytes": data_size,
"duration_secs": elapsed.as_secs_f64(),
"throughput_mbps": throughput_mbps,
}));
} else {
info!("═══════════════════════════════════════════════════════════════");
info!(" DATA TRANSFER COMPLETE");
info!("═══════════════════════════════════════════════════════════════");
info!(" Chunks sent: {}", chunks_sent);
info!(" Total data: {}", format_bytes(data_size));
info!(" Duration: {:.2}s", elapsed.as_secs_f64());
info!(" Throughput: {:.2} Mbps", throughput_mbps);
info!("═══════════════════════════════════════════════════════════════");
}
}))
} else {
None
};
let duration = if args.duration > 0 {
Some(Duration::from_secs(args.duration))
} else {
None
};
info!("Ready. Press Ctrl+C to shutdown.");
while !shutdown.is_cancelled() {
if let Some(max_duration) = duration
&& start_time.elapsed() > max_duration
{
info!("Duration limit reached");
break;
}
match tokio::time::timeout(Duration::from_millis(100), endpoint.accept()).await {
Ok(Some(peer)) => {
info!(
"Accepted connection from: {} at {}",
format_peer_id(&peer.peer_id),
peer.remote_addr
);
stats.connections_accepted.fetch_add(1, Ordering::SeqCst);
let mut peers_guard = peers.write().await;
peers_guard.insert(
peer.peer_id,
PeerState {
peer_id: peer.peer_id,
remote_addr: peer.remote_addr,
connected_at: Instant::now(),
bytes_sent: 0,
bytes_received: 0,
connection_type: "direct".to_string(),
},
);
}
Ok(None) => {}
Err(_) => {}
}
}
info!("Shutting down...");
shutdown.cancel();
endpoint.shutdown().await;
event_handle.abort();
recv_handle.abort();
if let Some(h) = counter_handle {
h.abort();
}
if let Some(h) = metrics_handle {
h.abort();
}
if let Some(h) = data_handle {
let _ = h.await;
}
print_final_stats(&node_id, &stats, start_time.elapsed(), args.json);
info!("Goodbye!");
Ok(())
}
/// Applies one endpoint event to the shared state and reports it.
///
/// * `event` - the event to handle.
/// * `stats` - runtime counters updated by connection, NAT traversal,
///   address discovery and data events.
/// * `peers` - peer table; entries are inserted on `PeerConnected` and
///   removed on `PeerDisconnected`.
/// * `external_addrs` - list that every discovered external address is appended to.
/// * `json` - when `true`, events are printed as JSON lines on stdout;
///   otherwise they are logged (some mDNS events are only reported in JSON mode).
async fn handle_event(
    event: &P2pEvent,
    stats: &RuntimeStats,
    peers: &RwLock<HashMap<PeerId, PeerState>>,
    external_addrs: &RwLock<Vec<TransportAddr>>,
    json: bool,
) {
    match event {
        P2pEvent::PeerConnected {
            peer_id,
            addr,
            side,
            traversal_method,
        } => {
            let direction = if side.is_client() {
                "outbound"
            } else {
                "inbound"
            };
            // Derive the label and the matching metrics counter together so the
            // `direct_connections` / `relayed_connections` metrics follow the
            // reported connection type. Hole-punched and port-predicted
            // connections have no counter of their own here.
            let (connection_type, connection_counter) = match traversal_method {
                ant_quic::TraversalMethod::Direct => ("direct", Some(&stats.direct_connections)),
                ant_quic::TraversalMethod::HolePunch
                | ant_quic::TraversalMethod::PortPrediction => ("nat_traversed", None),
                ant_quic::TraversalMethod::Relay => ("relayed", Some(&stats.relayed_connections)),
            };
            if let Some(counter) = connection_counter {
                counter.fetch_add(1, Ordering::SeqCst);
            }
            peers.write().await.insert(
                *peer_id,
                PeerState {
                    peer_id: *peer_id,
                    remote_addr: addr.clone(),
                    connected_at: Instant::now(),
                    bytes_sent: 0,
                    bytes_received: 0,
                    connection_type: connection_type.to_string(),
                },
            );
            if json {
                print_json(json!({
                    "event": "peer_connected",
                    "peer_id": format_peer_id(peer_id),
                    "addr": addr.to_string(),
                    "direction": direction,
                    "connection_type": connection_type,
                }));
            } else {
                info!(
                    "Peer connected: {} at {} ({} / {})",
                    format_peer_id(peer_id),
                    addr,
                    direction,
                    connection_type
                );
            }
        }
        P2pEvent::PeerDisconnected { peer_id, reason } => {
            peers.write().await.remove(peer_id);
            if json {
                print_json(json!({
                    "event": "peer_disconnected",
                    "peer_id": format_peer_id(peer_id),
                    "reason": format!("{reason:?}"),
                }));
            } else {
                info!(
                    "Peer disconnected: {} ({:?})",
                    format_peer_id(peer_id),
                    reason
                );
            }
        }
        // Establishment and renewal are reported under the same event name.
        P2pEvent::PortMappingEstablished { external_addr }
        | P2pEvent::PortMappingRenewed { external_addr } => {
            if json {
                print_json(json!({
                    "event": "port_mapping_established",
                    "addr": external_addr.to_string(),
                }));
            } else {
                info!("Port mapping active at {}", external_addr);
            }
        }
        P2pEvent::PortMappingAddressChanged {
            previous_addr,
            external_addr,
        } => {
            if json {
                print_json(json!({
                    "event": "port_mapping_address_changed",
                    "previous_addr": previous_addr.to_string(),
                    "external_addr": external_addr.to_string(),
                }));
            } else {
                info!(
                    "Port mapping address changed from {} to {}",
                    previous_addr, external_addr
                );
            }
        }
        P2pEvent::PortMappingFailed { error } => {
            if json {
                print_json(json!({
                    "event": "port_mapping_failed",
                    "error": error,
                }));
            } else {
                warn!("Port mapping failed: {}", error);
            }
        }
        P2pEvent::ExternalAddressDiscovered { addr } => {
            stats
                .external_addresses_discovered
                .fetch_add(1, Ordering::SeqCst);
            // Every event is appended as-is; repeated addresses are not deduplicated.
            external_addrs.write().await.push(addr.clone());
            if json {
                print_json(json!({
                    "event": "external_address_discovered",
                    "addr": addr.to_string(),
                }));
            } else {
                info!("External address discovered: {}", addr);
            }
        }
        P2pEvent::NatTraversalProgress { peer_id, phase } => {
            // Only the terminal phases affect the counters.
            match phase {
                TraversalPhase::Connected => {
                    stats
                        .nat_traversals_completed
                        .fetch_add(1, Ordering::SeqCst);
                }
                TraversalPhase::Failed => {
                    stats.nat_traversals_failed.fetch_add(1, Ordering::SeqCst);
                }
                _ => {}
            }
            if json {
                print_json(json!({
                    "event": "nat_traversal_progress",
                    "peer_id": format_peer_id(peer_id),
                    "phase": format!("{phase:?}"),
                }));
            } else {
                debug!(
                    "NAT traversal progress: {} - {:?}",
                    format_peer_id(peer_id),
                    phase
                );
            }
        }
        P2pEvent::MdnsServiceAdvertised {
            service,
            namespace,
            instance_fullname,
        } => {
            if json {
                print_json(json!({
                    "event": "mdns_service_advertised",
                    "service": service,
                    "namespace": namespace.as_deref(),
                    "instance_fullname": instance_fullname,
                }));
            } else {
                info!(
                    "mDNS service advertised: {} ({})",
                    instance_fullname,
                    namespace.as_deref().unwrap_or("no namespace")
                );
            }
        }
        P2pEvent::MdnsPeerDiscovered { peer } => {
            if json {
                // Addresses are emitted as one comma-separated string.
                print_json(json!({
                    "event": "mdns_peer_discovered",
                    "fullname": peer.fullname,
                    "addresses": peer
                        .addresses
                        .iter()
                        .map(SocketAddr::to_string)
                        .collect::<Vec<_>>()
                        .join(","),
                }));
            } else {
                info!("mDNS peer discovered: {}", peer.fullname);
            }
        }
        P2pEvent::MdnsPeerUpdated { peer } => {
            if json {
                print_json(json!({
                    "event": "mdns_peer_updated",
                    "fullname": peer.fullname,
                    "addresses": peer
                        .addresses
                        .iter()
                        .map(SocketAddr::to_string)
                        .collect::<Vec<_>>()
                        .join(","),
                }));
            }
        }
        P2pEvent::MdnsPeerRemoved { peer } => {
            if json {
                print_json(json!({
                    "event": "mdns_peer_removed",
                    "fullname": peer.fullname,
                }));
            }
        }
        P2pEvent::MdnsAutoConnectSucceeded { peer, .. } => {
            if json {
                print_json(json!({
                    "event": "mdns_auto_connect_succeeded",
                    "fullname": peer.fullname,
                }));
            }
        }
        P2pEvent::MdnsAutoConnectFailed { peer, error, .. } => {
            if json {
                print_json(json!({
                    "event": "mdns_auto_connect_failed",
                    "fullname": peer.fullname,
                    "error": error,
                }));
            }
        }
        P2pEvent::DataReceived { peer_id, bytes } => {
            // NOTE(review): the receive loop in `main` also adds each payload's
            // length to `bytes_received`; if this event fires for the same
            // payloads, the bytes are counted twice — confirm.
            stats
                .bytes_received
                .fetch_add(*bytes as u64, Ordering::SeqCst);
            debug!(
                "Data received: {} bytes from {}",
                bytes,
                format_peer_id(peer_id)
            );
        }
        _ => {
            debug!("Event: {:?}", event);
        }
    }
}
async fn build_metrics_report(
node_id: &str,
location: &str,
endpoint: &P2pEndpoint,
stats: &RuntimeStats,
peers: &RwLock<HashMap<PeerId, PeerState>>,
external_addrs: &RwLock<Vec<TransportAddr>>,
start_time: Instant,
) -> NodeMetricsReport {
let uptime = start_time.elapsed();
let bytes_sent = stats.bytes_sent.load(Ordering::SeqCst);
let bytes_received = stats.bytes_received.load(Ordering::SeqCst);
let total_bytes = bytes_sent + bytes_received;
let throughput_mbps = if uptime.as_secs() > 0 {
(total_bytes as f64 * 8.0) / (uptime.as_secs_f64() * 1_000_000.0)
} else {
0.0
};
let peers_guard = peers.read().await;
let connected_peers: Vec<PeerInfo> = peers_guard
.values()
.map(|p| PeerInfo {
peer_id: format_peer_id(&p.peer_id),
remote_addr: p.remote_addr.to_string(),
connected_at: p.connected_at.elapsed().as_secs(),
bytes_sent: p.bytes_sent,
bytes_received: p.bytes_received,
connection_type: p.connection_type.clone(),
})
.collect();
let external_addresses: Vec<String> = external_addrs
.read()
.await
.iter()
.map(|a| a.to_string())
.collect();
let local_addr = endpoint
.local_addr()
.map(|a| a.to_string())
.unwrap_or_default();
NodeMetricsReport {
node_id: node_id.to_string(),
location: location.to_string(),
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0),
uptime_secs: uptime.as_secs(),
active_connections: connected_peers.len(),
bytes_sent_total: bytes_sent,
bytes_received_total: bytes_received,
current_throughput_mbps: throughput_mbps,
nat_traversal_successes: stats.nat_traversals_completed.load(Ordering::SeqCst),
nat_traversal_failures: stats.nat_traversals_failed.load(Ordering::SeqCst),
direct_connections: stats.direct_connections.load(Ordering::SeqCst),
relayed_connections: stats.relayed_connections.load(Ordering::SeqCst),
data_chunks_sent: stats.data_chunks_sent.load(Ordering::SeqCst),
data_chunks_verified: stats.data_chunks_verified.load(Ordering::SeqCst),
data_verification_failures: stats.data_verification_failures.load(Ordering::SeqCst),
external_addresses,
connected_peers,
local_addr,
}
}
fn print_final_stats(node_id: &str, stats: &RuntimeStats, duration: Duration, json: bool) {
let bytes_sent = stats.bytes_sent.load(Ordering::SeqCst);
let bytes_received = stats.bytes_received.load(Ordering::SeqCst);
let secs = duration.as_secs_f64();
if json {
print_json(final_stats_json(node_id, stats, duration));
} else {
info!("═══════════════════════════════════════════════════════════════");
info!(" FINAL STATISTICS");
info!("═══════════════════════════════════════════════════════════════");
info!(" Node ID: {}", node_id);
info!(" Duration: {:.2}s", secs);
info!(
" Connections accepted: {}",
stats.connections_accepted.load(Ordering::SeqCst)
);
info!(
" Connections initiated: {}",
stats.connections_initiated.load(Ordering::SeqCst)
);
info!(
" NAT traversals completed: {}",
stats.nat_traversals_completed.load(Ordering::SeqCst)
);
info!(
" NAT traversals failed: {}",
stats.nat_traversals_failed.load(Ordering::SeqCst)
);
info!(" Bytes sent: {}", format_bytes(bytes_sent));
info!(" Bytes received: {}", format_bytes(bytes_received));
info!(
" Data chunks sent: {}",
stats.data_chunks_sent.load(Ordering::SeqCst)
);
info!(
" Data chunks verified: {}",
stats.data_chunks_verified.load(Ordering::SeqCst)
);
info!(
" Verification failures: {}",
stats.data_verification_failures.load(Ordering::SeqCst)
);
if secs > 0.0 {
let total_bytes = bytes_sent + bytes_received;
let throughput_mbps = (total_bytes as f64 * 8.0) / (secs * 1_000_000.0);
info!(" Throughput: {:.2} Mbps", throughput_mbps);
}
info!("═══════════════════════════════════════════════════════════════");
}
}
// Unit tests for the pure helpers: JSON stats formatting, lazy chunk
// generation and chunk counting.
#[cfg(test)]
mod tests {
use super::*;
// A node ID containing a quote and a newline must survive a round trip
// through the serialized JSON unchanged.
#[test]
fn final_stats_json_escapes_node_id() {
let stats = RuntimeStats::default();
stats.bytes_sent.store(42, Ordering::SeqCst);
let node_id = "bad\"id\nnext";
let line = final_stats_json(node_id, &stats, Duration::from_millis(1250)).to_string();
let parsed: serde_json::Value =
serde_json::from_str(&line).expect("final stats must be valid JSON");
assert_eq!(parsed["node_id"].as_str(), Some(node_id));
assert_eq!(parsed["bytes_sent"].as_u64(), Some(42));
}
// 10 bytes in chunks of 4 yield chunks of 4, 4 and 2 bytes; the remaining
// chunk count drops by one after every `next()` call.
#[test]
fn generate_test_data_yields_chunks_lazily() {
let mut chunks = generate_test_data(10, 4);
assert_eq!(chunks.remaining_chunks(), 3);
let first = chunks.next().expect("first chunk");
assert_eq!(first.sequence, 0);
assert_eq!(first.data, vec![0, 1, 2, 3]);
assert_eq!(chunks.remaining_chunks(), 2);
let second = chunks.next().expect("second chunk");
assert_eq!(second.sequence, 1);
// Byte i of chunk s is (s + i) % 256, so chunk 1 starts at 1.
assert_eq!(second.data, vec![1, 2, 3, 4]);
assert_eq!(chunks.remaining_chunks(), 1);
let third = chunks.next().expect("third chunk");
assert_eq!(third.sequence, 2);
// Final, partial chunk: only 2 of the 10 bytes are left.
assert_eq!(third.data, vec![2, 3]);
assert_eq!(chunks.remaining_chunks(), 0);
assert!(chunks.next().is_none());
}
// Covers zero size, exact multiples, a partial trailing chunk and a zero
// chunk size (which must yield 0 rather than divide by zero).
#[test]
fn data_chunk_count_handles_empty_and_partial_chunks() {
assert_eq!(data_chunk_count(0, 64), 0);
assert_eq!(data_chunk_count(1, 64), 1);
assert_eq!(data_chunk_count(64, 64), 1);
assert_eq!(data_chunk_count(65, 64), 2);
assert_eq!(data_chunk_count(65, 0), 0);
}
}