#![allow(clippy::unwrap_used)] #![allow(clippy::expect_used)]
use ant_quic::transport::TransportAddr;
use ant_quic::{
MtuConfig,
P2pConfig,
P2pEndpoint,
P2pEvent,
PeerId,
TraversalPhase,
unified_config::{AutoConnectPolicy, MdnsConfig, MdnsMode},
};
use clap::Parser;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
#[derive(Parser, Debug)]
#[command(name = "e2e-test-node")]
#[command(
author,
version,
about = "E2E test node with metrics push and data verification"
)]
struct Args {
#[arg(short, long, default_value = "[::]:0")]
listen: SocketAddr,
#[arg(short = 'k', long, value_delimiter = ',')]
known_peers: Vec<SocketAddr>,
#[arg(long)]
metrics_server: Option<String>,
#[arg(long, default_value = "5")]
metrics_interval: u64,
#[arg(long, default_value = "0")]
generate_data: u64,
#[arg(long)]
verify_data: bool,
#[arg(long)]
node_id: Option<String>,
#[arg(long, default_value = "local")]
node_location: String,
#[arg(long, default_value = "65536")]
chunk_size: usize,
#[arg(short, long)]
verbose: bool,
#[arg(long, default_value = "0")]
duration: u64,
#[arg(long)]
pqc_mtu: bool,
#[arg(long)]
no_port_mapping: bool,
#[arg(long)]
json: bool,
#[arg(long)]
counter_test: bool,
#[arg(long, default_value = "1000")]
counter_interval: u64,
#[arg(long)]
echo: bool,
#[arg(long, conflicts_with = "no_mdns")]
mdns: bool,
#[arg(long, conflicts_with = "mdns")]
no_mdns: bool,
#[arg(long)]
mdns_service: Option<String>,
#[arg(long)]
mdns_namespace: Option<String>,
#[arg(long, value_enum)]
mdns_mode: Option<CliMdnsMode>,
#[arg(long, value_enum)]
mdns_auto_connect: Option<CliMdnsAutoConnect>,
#[arg(long)]
show_progress: bool,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, clap::ValueEnum)]
enum CliMdnsMode {
Browse,
Advertise,
Both,
}
impl From<CliMdnsMode> for MdnsMode {
fn from(value: CliMdnsMode) -> Self {
match value {
CliMdnsMode::Browse => Self::BrowseOnly,
CliMdnsMode::Advertise => Self::AdvertiseOnly,
CliMdnsMode::Both => Self::Both,
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, clap::ValueEnum)]
enum CliMdnsAutoConnect {
Disabled,
ApprovalRequired,
Enabled,
}
impl From<CliMdnsAutoConnect> for AutoConnectPolicy {
fn from(value: CliMdnsAutoConnect) -> Self {
match value {
CliMdnsAutoConnect::Disabled => Self::Disabled,
CliMdnsAutoConnect::ApprovalRequired => Self::ApprovalRequired,
CliMdnsAutoConnect::Enabled => Self::Enabled,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub peer_id: String,
pub remote_addr: String,
pub connected_at: u64,
pub bytes_sent: u64,
pub bytes_received: u64,
pub connection_type: String, }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeMetricsReport {
pub node_id: String,
pub location: String,
pub timestamp: u64,
pub uptime_secs: u64,
pub active_connections: usize,
pub bytes_sent_total: u64,
pub bytes_received_total: u64,
pub current_throughput_mbps: f64,
pub nat_traversal_successes: u64,
pub nat_traversal_failures: u64,
pub direct_connections: u64,
pub relayed_connections: u64,
pub data_chunks_sent: u64,
pub data_chunks_verified: u64,
pub data_verification_failures: u64,
pub external_addresses: Vec<String>,
pub connected_peers: Vec<PeerInfo>,
pub local_addr: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedDataChunk {
pub sequence: u64,
pub data: Vec<u8>,
pub checksum: String,
pub timestamp: u64,
}
impl VerifiedDataChunk {
pub fn new(sequence: u64, data: Vec<u8>) -> Self {
let checksum = compute_sha256(&data);
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
Self {
sequence,
data,
checksum,
timestamp,
}
}
pub fn verify(&self) -> bool {
compute_sha256(&self.data) == self.checksum
}
}
#[derive(Debug, Default)]
struct RuntimeStats {
bytes_sent: AtomicU64,
bytes_received: AtomicU64,
connections_accepted: AtomicU64,
connections_initiated: AtomicU64,
nat_traversals_completed: AtomicU64,
nat_traversals_failed: AtomicU64,
external_addresses_discovered: AtomicU64,
direct_connections: AtomicU64,
relayed_connections: AtomicU64,
data_chunks_sent: AtomicU64,
data_chunks_verified: AtomicU64,
data_verification_failures: AtomicU64,
counters_sent: AtomicU64,
counters_received: AtomicU64,
echoes_sent: AtomicU64,
}
#[derive(Debug, Clone)]
struct PeerState {
peer_id: PeerId,
remote_addr: TransportAddr,
connected_at: Instant,
bytes_sent: u64,
bytes_received: u64,
connection_type: String,
}
fn compute_sha256(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
hex::encode(hasher.finalize())
}
fn generate_test_data(size: u64, chunk_size: usize) -> Vec<VerifiedDataChunk> {
let mut chunks = Vec::new();
let mut remaining = size;
let mut sequence = 0u64;
while remaining > 0 {
let this_chunk = std::cmp::min(remaining, chunk_size as u64) as usize;
let data: Vec<u8> = (0..this_chunk)
.map(|i| ((sequence + i as u64) % 256) as u8)
.collect();
chunks.push(VerifiedDataChunk::new(sequence, data));
remaining -= this_chunk as u64;
sequence += 1;
}
chunks
}
fn format_bytes(bytes: u64) -> String {
const KB: u64 = 1024;
const MB: u64 = KB * 1024;
const GB: u64 = MB * 1024;
if bytes >= GB {
format!("{:.2} GB", bytes as f64 / GB as f64)
} else if bytes >= MB {
format!("{:.2} MB", bytes as f64 / MB as f64)
} else if bytes >= KB {
format!("{:.2} KB", bytes as f64 / KB as f64)
} else {
format!("{} B", bytes)
}
}
fn format_peer_id(peer_id: &PeerId) -> String {
hex::encode(&peer_id.0[..8])
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let log_level = if args.verbose { "debug" } else { "info" };
tracing_subscriber::fmt()
.with_env_filter(format!("ant_quic={log_level},e2e_test_node={log_level}"))
.init();
info!("E2E Test Node v{}", env!("CARGO_PKG_VERSION"));
info!("Starting in {} mode...", args.node_location);
let mut builder = P2pConfig::builder().bind_addr(args.listen);
for addr in &args.known_peers {
builder = builder.known_peer(*addr);
}
if args.pqc_mtu {
builder = builder.mtu(MtuConfig::pqc_optimized());
info!("Using PQC-optimized MTU settings");
}
if args.no_port_mapping {
builder = builder.port_mapping_enabled(false);
info!("Best-effort router port mapping disabled");
}
let mdns_requested = args.mdns
|| args.mdns_service.is_some()
|| args.mdns_namespace.is_some()
|| args.mdns_mode.is_some()
|| args.mdns_auto_connect.is_some();
if args.no_mdns && mdns_requested {
anyhow::bail!("--no-mdns cannot be combined with other mDNS configuration flags");
}
if args.no_mdns {
builder = builder.mdns_enabled(false);
info!("First-party mDNS disabled");
} else {
let mut mdns_config = MdnsConfig::default();
if let Some(service) = args.mdns_service.clone() {
mdns_config.service = Some(service);
}
if let Some(namespace) = args.mdns_namespace.clone() {
mdns_config.namespace = Some(namespace);
}
if let Some(mode) = args.mdns_mode {
mdns_config.mode = mode.into();
}
if let Some(auto_connect) = args.mdns_auto_connect {
mdns_config.auto_connect = auto_connect.into();
}
builder = builder.mdns(mdns_config);
}
let config = builder.build()?;
info!("Creating P2P endpoint...");
let endpoint = P2pEndpoint::new(config).await?;
let node_id = args.node_id.unwrap_or_else(|| {
let peer_id = endpoint.peer_id();
format!("node-{}", hex::encode(&peer_id.0[..4]))
});
let peer_id = endpoint.peer_id();
let public_key = endpoint.public_key_bytes();
info!("═══════════════════════════════════════════════════════════════");
info!(" E2E TEST NODE");
info!("═══════════════════════════════════════════════════════════════");
info!("Node ID: {}", node_id);
info!("Location: {}", args.node_location);
info!("Peer ID: {}", format_peer_id(&peer_id));
info!("Public Key: {}", hex::encode(public_key));
if let Some(addr) = endpoint.local_addr() {
info!("Local Address: {}", addr);
}
if let Some(ref server) = args.metrics_server {
info!("Metrics Server: {}", server);
}
if args.generate_data > 0 {
info!(
"Data Generation: {} ({} chunks)",
format_bytes(args.generate_data),
args.generate_data.div_ceil(args.chunk_size as u64)
);
}
info!("═══════════════════════════════════════════════════════════════");
if args.json {
if let Some(addr) = endpoint.local_addr() {
println!(
r#"{{"event":"local_identity","peer_id":"{}","addr":"{}"}}"#,
hex::encode(peer_id.0),
addr
);
} else {
println!(
r#"{{"event":"local_identity","peer_id":"{}"}}"#,
hex::encode(peer_id.0)
);
}
}
let shutdown = CancellationToken::new();
let shutdown_clone = shutdown.clone();
let stats = Arc::new(RuntimeStats::default());
let peers: Arc<RwLock<HashMap<PeerId, PeerState>>> = Arc::new(RwLock::new(HashMap::new()));
let external_addrs: Arc<RwLock<Vec<TransportAddr>>> = Arc::new(RwLock::new(Vec::new()));
let start_time = Instant::now();
tokio::spawn(async move {
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c: {}", e);
}
info!("Shutdown signal received");
shutdown_clone.cancel();
});
let endpoint_events = endpoint.clone();
let shutdown_events = shutdown.clone();
let stats_events = stats.clone();
let peers_events = peers.clone();
let external_addrs_events = external_addrs.clone();
let json_output = args.json;
let event_handle = tokio::spawn(async move {
let mut events = endpoint_events.subscribe();
while !shutdown_events.is_cancelled() {
match tokio::time::timeout(Duration::from_millis(100), events.recv()).await {
Ok(Ok(event)) => {
handle_event(
&event,
&stats_events,
&peers_events,
&external_addrs_events,
json_output,
)
.await;
}
Ok(Err(_)) => break,
Err(_) => continue,
}
}
});
let metrics_handle = if let Some(ref server) = args.metrics_server {
let server = server.clone();
let endpoint_metrics = endpoint.clone();
let shutdown_metrics = shutdown.clone();
let stats_metrics = stats.clone();
let peers_metrics = peers.clone();
let external_addrs_metrics = external_addrs.clone();
let node_id_metrics = node_id.clone();
let location = args.node_location.clone();
let interval = args.metrics_interval;
Some(tokio::spawn(async move {
let client = reqwest::Client::new();
let mut interval_timer = tokio::time::interval(Duration::from_secs(interval));
while !shutdown_metrics.is_cancelled() {
interval_timer.tick().await;
let report = build_metrics_report(
&node_id_metrics,
&location,
&endpoint_metrics,
&stats_metrics,
&peers_metrics,
&external_addrs_metrics,
start_time,
)
.await;
match client
.post(format!("{}/api/metrics", server))
.json(&report)
.timeout(Duration::from_secs(5))
.send()
.await
{
Ok(resp) if resp.status().is_success() => {
debug!("Metrics pushed successfully");
}
Ok(resp) => {
warn!("Metrics push returned {}", resp.status());
}
Err(e) => {
debug!("Failed to push metrics: {}", e);
}
}
}
}))
} else {
None
};
let endpoint_recv = endpoint.clone();
let shutdown_recv = shutdown.clone();
let stats_recv = stats.clone();
let verify_data = args.verify_data;
let echo_enabled = args.echo;
let json = args.json;
let recv_handle = tokio::spawn(async move {
loop {
let result = tokio::select! {
r = endpoint_recv.recv() => r,
_ = shutdown_recv.cancelled() => break,
};
match result {
Ok((peer_id, data)) => {
stats_recv
.bytes_received
.fetch_add(data.len() as u64, Ordering::SeqCst);
if data.len() == 8 {
if let Ok(bytes) = data[..8].try_into() {
let counter = u64::from_be_bytes(bytes);
stats_recv.counters_received.fetch_add(1, Ordering::SeqCst);
if json {
println!(
r#"{{"event":"counter_received","counter":{},"peer":"{}"}}"#,
counter,
format_peer_id(&peer_id)
);
} else {
debug!(
"Received counter {} from {}",
counter,
format_peer_id(&peer_id)
);
}
}
} else if verify_data {
if let Ok(chunk) = serde_json::from_slice::<VerifiedDataChunk>(&data) {
if chunk.verify() {
stats_recv
.data_chunks_verified
.fetch_add(1, Ordering::SeqCst);
if json {
println!(
r#"{{"event":"chunk_verified","sequence":{},"peer":"{}","size":{}}}"#,
chunk.sequence,
format_peer_id(&peer_id),
chunk.data.len()
);
} else {
debug!(
"Verified chunk {} from {} ({} bytes)",
chunk.sequence,
format_peer_id(&peer_id),
chunk.data.len()
);
}
} else {
stats_recv
.data_verification_failures
.fetch_add(1, Ordering::SeqCst);
error!(
"Verification FAILED for chunk {} from {}",
chunk.sequence,
format_peer_id(&peer_id)
);
}
}
} else if json {
println!(
r#"{{"event":"data_received","bytes":{},"peer":"{}"}}"#,
data.len(),
format_peer_id(&peer_id)
);
} else {
debug!(
"Received {} bytes from {}",
data.len(),
format_peer_id(&peer_id)
);
}
if echo_enabled {
let endpoint_send = endpoint_recv.clone();
let stats_send = stats_recv.clone();
tokio::spawn(async move {
if let Err(e) = endpoint_send.send(&peer_id, &data).await {
debug!("Failed to echo: {}", e);
} else {
stats_send
.bytes_sent
.fetch_add(data.len() as u64, Ordering::SeqCst);
stats_send.echoes_sent.fetch_add(1, Ordering::SeqCst);
}
});
}
}
Err(_) => {
}
}
}
});
let counter_handle = if args.counter_test {
let endpoint_counter = endpoint.clone();
let shutdown_counter = shutdown.clone();
let stats_counter = stats.clone();
let interval_ms = args.counter_interval;
let json = args.json;
Some(tokio::spawn(async move {
let mut counter: u64 = 0;
let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
while !shutdown_counter.is_cancelled() {
interval.tick().await;
counter += 1;
let connected_peers = endpoint_counter.connected_peers().await;
let mut send_tasks = Vec::with_capacity(connected_peers.len());
for peer in connected_peers {
let endpoint_send = endpoint_counter.clone();
let stats_send = stats_counter.clone();
let peer_id = peer.peer_id;
send_tasks.push(tokio::spawn(async move {
let data = counter.to_be_bytes();
match endpoint_send.send(&peer_id, &data).await {
Ok(()) => {
stats_send.counters_sent.fetch_add(1, Ordering::SeqCst);
stats_send
.bytes_sent
.fetch_add(data.len() as u64, Ordering::SeqCst);
if json {
println!(
r#"{{"event":"counter_sent","counter":{},"peer":"{}"}}"#,
counter,
format_peer_id(&peer_id)
);
}
}
Err(e) => debug!(
"Failed to send counter {} to {}: {}",
counter,
format_peer_id(&peer_id),
e
),
}
}));
}
for task in send_tasks {
if let Err(e) = task.await {
debug!("Counter send task join error: {}", e);
}
}
}
}))
} else {
None
};
if !args.known_peers.is_empty() {
info!("Connecting to {} known peer(s)...", args.known_peers.len());
match endpoint.connect_known_peers().await {
Ok(count) => {
info!("Connected to {} known peer(s)", count);
stats
.connections_initiated
.fetch_add(count as u64, Ordering::SeqCst);
}
Err(e) => {
error!("Failed to connect to known peers: {}", e);
}
}
}
let data_handle = if args.generate_data > 0 {
let endpoint_data = endpoint.clone();
let shutdown_data = shutdown.clone();
let stats_data = stats.clone();
let peers_data = peers.clone();
let data_size = args.generate_data;
let chunk_size = args.chunk_size;
let show_progress = args.show_progress;
let json = args.json;
Some(tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(2)).await;
let chunks = generate_test_data(data_size, chunk_size);
let total_chunks = chunks.len();
info!(
"Generated {} chunks ({} total)",
total_chunks,
format_bytes(data_size)
);
let connected_peers: Vec<PeerId> = peers_data.read().await.keys().cloned().collect();
if connected_peers.is_empty() {
warn!("No connected peers to send data to");
return;
}
info!("Sending data to {} peer(s)...", connected_peers.len());
let send_start = Instant::now();
let mut chunks_sent = 0u64;
let mut last_progress = Instant::now();
for (idx, chunk) in chunks.iter().enumerate() {
if shutdown_data.is_cancelled() {
break;
}
let chunk_bytes = serde_json::to_vec(&chunk).expect("Failed to serialize chunk");
for peer_id in &connected_peers {
match endpoint_data.send(peer_id, &chunk_bytes).await {
Ok(()) => {
stats_data
.bytes_sent
.fetch_add(chunk_bytes.len() as u64, Ordering::SeqCst);
stats_data.data_chunks_sent.fetch_add(1, Ordering::SeqCst);
chunks_sent += 1;
}
Err(e) => {
debug!(
"Failed to send chunk {} to {}: {}",
idx,
format_peer_id(peer_id),
e
);
}
}
}
if show_progress && last_progress.elapsed() > Duration::from_secs(1) {
let progress = (idx + 1) as f64 / total_chunks as f64 * 100.0;
let elapsed = send_start.elapsed().as_secs_f64();
let bytes_sent = stats_data.bytes_sent.load(Ordering::SeqCst);
let throughput_mbps = (bytes_sent as f64 * 8.0) / (elapsed * 1_000_000.0);
if json {
println!(
r#"{{"event":"progress","percent":{:.1},"chunks_sent":{},"throughput_mbps":{:.2}}}"#,
progress, chunks_sent, throughput_mbps
);
} else {
info!(
"Progress: {:.1}% ({}/{} chunks, {:.2} Mbps)",
progress,
idx + 1,
total_chunks,
throughput_mbps
);
}
last_progress = Instant::now();
}
}
let elapsed = send_start.elapsed();
let throughput_mbps = (data_size as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0);
if json {
println!(
r#"{{"event":"data_transfer_complete","chunks_sent":{},"bytes":{},"duration_secs":{:.2},"throughput_mbps":{:.2}}}"#,
chunks_sent,
data_size,
elapsed.as_secs_f64(),
throughput_mbps
);
} else {
info!("═══════════════════════════════════════════════════════════════");
info!(" DATA TRANSFER COMPLETE");
info!("═══════════════════════════════════════════════════════════════");
info!(" Chunks sent: {}", chunks_sent);
info!(" Total data: {}", format_bytes(data_size));
info!(" Duration: {:.2}s", elapsed.as_secs_f64());
info!(" Throughput: {:.2} Mbps", throughput_mbps);
info!("═══════════════════════════════════════════════════════════════");
}
}))
} else {
None
};
let duration = if args.duration > 0 {
Some(Duration::from_secs(args.duration))
} else {
None
};
info!("Ready. Press Ctrl+C to shutdown.");
while !shutdown.is_cancelled() {
if let Some(max_duration) = duration
&& start_time.elapsed() > max_duration
{
info!("Duration limit reached");
break;
}
match tokio::time::timeout(Duration::from_millis(100), endpoint.accept()).await {
Ok(Some(peer)) => {
info!(
"Accepted connection from: {} at {}",
format_peer_id(&peer.peer_id),
peer.remote_addr
);
stats.connections_accepted.fetch_add(1, Ordering::SeqCst);
let mut peers_guard = peers.write().await;
peers_guard.insert(
peer.peer_id,
PeerState {
peer_id: peer.peer_id,
remote_addr: peer.remote_addr,
connected_at: Instant::now(),
bytes_sent: 0,
bytes_received: 0,
connection_type: "direct".to_string(),
},
);
}
Ok(None) => {}
Err(_) => {}
}
}
info!("Shutting down...");
shutdown.cancel();
endpoint.shutdown().await;
event_handle.abort();
recv_handle.abort();
if let Some(h) = counter_handle {
h.abort();
}
if let Some(h) = metrics_handle {
h.abort();
}
if let Some(h) = data_handle {
let _ = h.await;
}
print_final_stats(&node_id, &stats, start_time.elapsed(), args.json);
info!("Goodbye!");
Ok(())
}
async fn handle_event(
event: &P2pEvent,
stats: &RuntimeStats,
peers: &RwLock<HashMap<PeerId, PeerState>>,
external_addrs: &RwLock<Vec<TransportAddr>>,
json: bool,
) {
match event {
P2pEvent::PeerConnected {
peer_id,
addr,
side,
traversal_method,
} => {
let direction = if side.is_client() {
"outbound"
} else {
"inbound"
};
let connection_type = match traversal_method {
ant_quic::TraversalMethod::Direct => "direct",
ant_quic::TraversalMethod::HolePunch
| ant_quic::TraversalMethod::PortPrediction => "nat_traversed",
ant_quic::TraversalMethod::Relay => "relayed",
};
peers.write().await.insert(
*peer_id,
PeerState {
peer_id: *peer_id,
remote_addr: addr.clone(),
connected_at: Instant::now(),
bytes_sent: 0,
bytes_received: 0,
connection_type: connection_type.to_string(),
},
);
if json {
println!(
r#"{{"event":"peer_connected","peer_id":"{}","addr":"{}","direction":"{}","connection_type":"{}"}}"#,
format_peer_id(peer_id),
addr,
direction,
connection_type
);
} else {
info!(
"Peer connected: {} at {} ({} / {})",
format_peer_id(peer_id),
addr,
direction,
connection_type
);
}
}
P2pEvent::PeerDisconnected { peer_id, reason } => {
peers.write().await.remove(peer_id);
if json {
println!(
r#"{{"event":"peer_disconnected","peer_id":"{}","reason":"{:?}"}}"#,
format_peer_id(peer_id),
reason
);
} else {
info!(
"Peer disconnected: {} ({:?})",
format_peer_id(peer_id),
reason
);
}
}
P2pEvent::PortMappingEstablished { external_addr }
| P2pEvent::PortMappingRenewed { external_addr } => {
if json {
println!(
r#"{{"event":"port_mapping_established","addr":"{}"}}"#,
external_addr
);
} else {
info!("Port mapping active at {}", external_addr);
}
}
P2pEvent::PortMappingAddressChanged {
previous_addr,
external_addr,
} => {
if json {
println!(
r#"{{"event":"port_mapping_address_changed","previous_addr":"{}","external_addr":"{}"}}"#,
previous_addr, external_addr
);
} else {
info!(
"Port mapping address changed from {} to {}",
previous_addr, external_addr
);
}
}
P2pEvent::PortMappingFailed { error } => {
if json {
println!(r#"{{"event":"port_mapping_failed","error":"{}"}}"#, error);
} else {
warn!("Port mapping failed: {}", error);
}
}
P2pEvent::ExternalAddressDiscovered { addr } => {
stats
.external_addresses_discovered
.fetch_add(1, Ordering::SeqCst);
external_addrs.write().await.push(addr.clone());
if json {
println!(
r#"{{"event":"external_address_discovered","addr":"{}"}}"#,
addr
);
} else {
info!("External address discovered: {}", addr);
}
}
P2pEvent::NatTraversalProgress { peer_id, phase } => {
match phase {
TraversalPhase::Connected => {
stats
.nat_traversals_completed
.fetch_add(1, Ordering::SeqCst);
}
TraversalPhase::Failed => {
stats.nat_traversals_failed.fetch_add(1, Ordering::SeqCst);
}
_ => {}
}
if json {
println!(
r#"{{"event":"nat_traversal_progress","peer_id":"{}","phase":"{:?}"}}"#,
format_peer_id(peer_id),
phase
);
} else {
debug!(
"NAT traversal progress: {} - {:?}",
format_peer_id(peer_id),
phase
);
}
}
P2pEvent::MdnsServiceAdvertised {
service,
namespace,
instance_fullname,
} => {
if json {
println!(
r#"{{"event":"mdns_service_advertised","service":"{}","namespace":{},"instance_fullname":"{}"}}"#,
service,
namespace
.as_ref()
.map(|value| format!("\"{}\"", value))
.unwrap_or_else(|| "null".to_string()),
instance_fullname
);
} else {
info!(
"mDNS service advertised: {} ({})",
instance_fullname,
namespace
.clone()
.unwrap_or_else(|| "no namespace".to_string())
);
}
}
P2pEvent::MdnsPeerDiscovered { peer } => {
if json {
println!(
r#"{{"event":"mdns_peer_discovered","fullname":"{}","addresses":"{}"}}"#,
peer.fullname,
peer.addresses
.iter()
.map(SocketAddr::to_string)
.collect::<Vec<_>>()
.join(",")
);
} else {
info!("mDNS peer discovered: {}", peer.fullname);
}
}
P2pEvent::MdnsPeerUpdated { peer } => {
if json {
println!(
r#"{{"event":"mdns_peer_updated","fullname":"{}","addresses":"{}"}}"#,
peer.fullname,
peer.addresses
.iter()
.map(SocketAddr::to_string)
.collect::<Vec<_>>()
.join(",")
);
}
}
P2pEvent::MdnsPeerRemoved { peer } => {
if json {
println!(
r#"{{"event":"mdns_peer_removed","fullname":"{}"}}"#,
peer.fullname
);
}
}
P2pEvent::MdnsAutoConnectSucceeded { peer, .. } => {
if json {
println!(
r#"{{"event":"mdns_auto_connect_succeeded","fullname":"{}"}}"#,
peer.fullname
);
}
}
P2pEvent::MdnsAutoConnectFailed { peer, error, .. } => {
if json {
println!(
r#"{{"event":"mdns_auto_connect_failed","fullname":"{}","error":"{}"}}"#,
peer.fullname, error
);
}
}
P2pEvent::DataReceived { peer_id, bytes } => {
stats
.bytes_received
.fetch_add(*bytes as u64, Ordering::SeqCst);
debug!(
"Data received: {} bytes from {}",
bytes,
format_peer_id(peer_id)
);
}
_ => {
debug!("Event: {:?}", event);
}
}
}
async fn build_metrics_report(
node_id: &str,
location: &str,
endpoint: &P2pEndpoint,
stats: &RuntimeStats,
peers: &RwLock<HashMap<PeerId, PeerState>>,
external_addrs: &RwLock<Vec<TransportAddr>>,
start_time: Instant,
) -> NodeMetricsReport {
let uptime = start_time.elapsed();
let bytes_sent = stats.bytes_sent.load(Ordering::SeqCst);
let bytes_received = stats.bytes_received.load(Ordering::SeqCst);
let total_bytes = bytes_sent + bytes_received;
let throughput_mbps = if uptime.as_secs() > 0 {
(total_bytes as f64 * 8.0) / (uptime.as_secs_f64() * 1_000_000.0)
} else {
0.0
};
let peers_guard = peers.read().await;
let connected_peers: Vec<PeerInfo> = peers_guard
.values()
.map(|p| PeerInfo {
peer_id: format_peer_id(&p.peer_id),
remote_addr: p.remote_addr.to_string(),
connected_at: p.connected_at.elapsed().as_secs(),
bytes_sent: p.bytes_sent,
bytes_received: p.bytes_received,
connection_type: p.connection_type.clone(),
})
.collect();
let external_addresses: Vec<String> = external_addrs
.read()
.await
.iter()
.map(|a| a.to_string())
.collect();
let local_addr = endpoint
.local_addr()
.map(|a| a.to_string())
.unwrap_or_default();
NodeMetricsReport {
node_id: node_id.to_string(),
location: location.to_string(),
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0),
uptime_secs: uptime.as_secs(),
active_connections: connected_peers.len(),
bytes_sent_total: bytes_sent,
bytes_received_total: bytes_received,
current_throughput_mbps: throughput_mbps,
nat_traversal_successes: stats.nat_traversals_completed.load(Ordering::SeqCst),
nat_traversal_failures: stats.nat_traversals_failed.load(Ordering::SeqCst),
direct_connections: stats.direct_connections.load(Ordering::SeqCst),
relayed_connections: stats.relayed_connections.load(Ordering::SeqCst),
data_chunks_sent: stats.data_chunks_sent.load(Ordering::SeqCst),
data_chunks_verified: stats.data_chunks_verified.load(Ordering::SeqCst),
data_verification_failures: stats.data_verification_failures.load(Ordering::SeqCst),
external_addresses,
connected_peers,
local_addr,
}
}
fn print_final_stats(node_id: &str, stats: &RuntimeStats, duration: Duration, json: bool) {
let bytes_sent = stats.bytes_sent.load(Ordering::SeqCst);
let bytes_received = stats.bytes_received.load(Ordering::SeqCst);
let secs = duration.as_secs_f64();
if json {
println!(
r#"{{"type":"final_stats","node_id":"{}","duration_secs":{:.2},"bytes_sent":{},"bytes_received":{},"connections_accepted":{},"connections_initiated":{},"nat_traversals_completed":{},"nat_traversals_failed":{},"chunks_sent":{},"chunks_verified":{},"verification_failures":{}}}"#,
node_id,
secs,
bytes_sent,
bytes_received,
stats.connections_accepted.load(Ordering::SeqCst),
stats.connections_initiated.load(Ordering::SeqCst),
stats.nat_traversals_completed.load(Ordering::SeqCst),
stats.nat_traversals_failed.load(Ordering::SeqCst),
stats.data_chunks_sent.load(Ordering::SeqCst),
stats.data_chunks_verified.load(Ordering::SeqCst),
stats.data_verification_failures.load(Ordering::SeqCst),
);
} else {
info!("═══════════════════════════════════════════════════════════════");
info!(" FINAL STATISTICS");
info!("═══════════════════════════════════════════════════════════════");
info!(" Node ID: {}", node_id);
info!(" Duration: {:.2}s", secs);
info!(
" Connections accepted: {}",
stats.connections_accepted.load(Ordering::SeqCst)
);
info!(
" Connections initiated: {}",
stats.connections_initiated.load(Ordering::SeqCst)
);
info!(
" NAT traversals completed: {}",
stats.nat_traversals_completed.load(Ordering::SeqCst)
);
info!(
" NAT traversals failed: {}",
stats.nat_traversals_failed.load(Ordering::SeqCst)
);
info!(" Bytes sent: {}", format_bytes(bytes_sent));
info!(" Bytes received: {}", format_bytes(bytes_received));
info!(
" Data chunks sent: {}",
stats.data_chunks_sent.load(Ordering::SeqCst)
);
info!(
" Data chunks verified: {}",
stats.data_chunks_verified.load(Ordering::SeqCst)
);
info!(
" Verification failures: {}",
stats.data_verification_failures.load(Ordering::SeqCst)
);
if secs > 0.0 {
let total_bytes = bytes_sent + bytes_received;
let throughput_mbps = (total_bytes as f64 * 8.0) / (secs * 1_000_000.0);
info!(" Throughput: {:.2} Mbps", throughput_mbps);
}
info!("═══════════════════════════════════════════════════════════════");
}
}