use crate::error::{NetworkError, NetworkResult};
use ant_quic::{bootstrap_cache::PeerCapabilities, Node, NodeConfig, TransportAddr};
use bytes::Bytes;
use saorsa_gossip_transport::GossipStreamType;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, RwLock};
use tracing::{debug, error, info, warn};
/// Peer identifier as used by the `ant_quic` transport layer.
type AntPeerId = ant_quic::PeerId;
/// Peer identifier as used by the saorsa gossip layer.
type GossipPeerId = saorsa_gossip_types::PeerId;
/// Default port; every entry in [`DEFAULT_BOOTSTRAP_PEERS`] uses this port.
pub const DEFAULT_PORT: u16 = 5483;
/// Default metrics port (not referenced by the code visible in this file).
pub const DEFAULT_METRICS_PORT: u16 = 12600;
/// Default value for [`NetworkConfig::max_connections`].
pub const DEFAULT_MAX_CONNECTIONS: u32 = 100;
/// Default value for [`NetworkConfig::connection_timeout`] (30 seconds).
pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Default value for [`NetworkConfig::stats_interval`] (60 seconds).
pub const DEFAULT_STATS_INTERVAL: Duration = Duration::from_secs(60);
/// Byte limit (4 MiB) applied by `Message::from_binary` when decoding bincode data.
pub const MAX_MESSAGE_DESERIALIZE_SIZE: u64 = 4 * 1024 * 1024;
/// Built-in bootstrap peers as `ip:port` strings: six IPv4 entries followed by
/// six IPv6 entries. `NetworkConfig::default()` parses these and drops the
/// IPv6 ones when the local IPv6 probe fails.
pub const DEFAULT_BOOTSTRAP_PEERS: &[&str] = &[
"142.93.199.50:5483", "147.182.234.192:5483", "65.21.157.229:5483", "116.203.101.172:5483", "149.28.156.231:5483", "45.77.176.184:5483", "[2604:a880:400:d1:0:3:7db3:f001]:5483", "[2604:a880:4:1d0:0:1:6ba1:f000]:5483", "[2a01:4f9:c012:684b::1]:5483", "[2a01:4f8:1c1a:31e6::1]:5483", "[2001:19f0:4401:346:5400:5ff:fed9:9735]:5483", "[2401:c080:1000:4c32:5400:5ff:fed9:9737]:5483", ];
/// Configuration for a [`NetworkNode`].
///
/// Every field has a serde default, so a partial (or empty) serialized config
/// still deserializes. Note that the serde default for `bootstrap_nodes` is an
/// empty list, whereas `NetworkConfig::default()` fills it from
/// [`DEFAULT_BOOTSTRAP_PEERS`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkConfig {
/// Local address to bind. When `None`, no bind address is passed to the node builder.
#[serde(default)]
pub bind_addr: Option<SocketAddr>,
/// Addresses registered with the node builder as known peers.
#[serde(default)]
pub bootstrap_nodes: Vec<SocketAddr>,
/// Maximum number of connections (not consulted by the code visible in this file).
#[serde(default = "default_max_connections")]
pub max_connections: u32,
/// Connection timeout (not consulted by the code visible in this file).
#[serde(default = "default_connection_timeout")]
pub connection_timeout: Duration,
/// Statistics interval (not consulted by the code visible in this file).
#[serde(default = "default_stats_interval")]
pub stats_interval: Duration,
/// Optional path for a peer cache (not consulted by the code visible in this file).
#[serde(default)]
pub peer_cache_path: Option<PathBuf>,
/// Pinned bootstrap peer IDs. When non-empty, `dial_bootstrap` disconnects and
/// rejects any bootstrap peer whose ID is not in this set.
#[serde(default)]
pub pinned_bootstrap_peers: std::collections::HashSet<[u8; 32]>,
/// Allowed inbound peer IDs. When non-empty, the accept loop only processes
/// inbound connections from peers in this set.
#[serde(default)]
pub inbound_allowlist: std::collections::HashSet<[u8; 32]>,
/// Per-IP peer limit (not consulted by the code visible in this file).
#[serde(default = "default_max_peers_per_ip")]
pub max_peers_per_ip: u32,
}
/// Serde default for `NetworkConfig::max_connections`.
fn default_max_connections() -> u32 {
DEFAULT_MAX_CONNECTIONS
}
/// Serde default for `NetworkConfig::connection_timeout`.
fn default_connection_timeout() -> Duration {
DEFAULT_CONNECTION_TIMEOUT
}
/// Serde default for `NetworkConfig::stats_interval`.
fn default_stats_interval() -> Duration {
DEFAULT_STATS_INTERVAL
}
/// Serde default for `NetworkConfig::max_peers_per_ip`.
fn default_max_peers_per_ip() -> u32 {
    // Named locally so the literal carries its meaning.
    const MAX_PEERS_PER_IP: u32 = 3;
    MAX_PEERS_PER_IP
}
/// Probes for local IPv6 support by binding a UDP socket to the IPv6
/// loopback address on an OS-assigned port.
///
/// Returns `true` when the bind succeeds; the probe socket is dropped
/// immediately.
fn check_ipv6_available() -> bool {
    match std::net::UdpSocket::bind("[::1]:0") {
        Ok(_probe) => true,
        Err(_) => false,
    }
}
impl Default for NetworkConfig {
    /// Builds the default configuration.
    ///
    /// Bootstrap nodes come from [`DEFAULT_BOOTSTRAP_PEERS`]; entries that
    /// fail to parse are skipped, and IPv6 entries are skipped when the local
    /// IPv6 probe fails.
    fn default() -> Self {
        let has_ipv6 = check_ipv6_available();
        let mut bootstrap_nodes = Vec::new();
        for entry in DEFAULT_BOOTSTRAP_PEERS {
            let Ok(parsed) = entry.parse::<std::net::SocketAddr>() else {
                continue;
            };
            if has_ipv6 || parsed.is_ipv4() {
                bootstrap_nodes.push(parsed);
            }
        }
        Self {
            bind_addr: None,
            bootstrap_nodes,
            max_connections: DEFAULT_MAX_CONNECTIONS,
            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
            stats_interval: DEFAULT_STATS_INTERVAL,
            peer_cache_path: None,
            pinned_bootstrap_peers: std::collections::HashSet::new(),
            inbound_allowlist: std::collections::HashSet::new(),
            max_peers_per_ip: 3,
        }
    }
}
/// Snapshot of network statistics, as produced by `NetworkNode::stats`.
#[derive(Debug, Clone, Default)]
pub struct NetworkStats {
/// Sum of the node status' direct and relayed connection counts.
pub total_connections: u64,
/// Active connection count reported by the node status.
pub active_connections: u32,
/// NOTE(review): `stats()` fills this from `relay_bytes_forwarded`, which may
/// not be total bytes sent — confirm the intended meaning.
pub bytes_sent: u64,
/// NOTE(review): `stats()` currently always sets this to 0.
pub bytes_received: u64,
/// Number of connected peers reported by the node status.
pub peer_count: usize,
}
/// Leading type byte for direct messages: `send_direct` writes it as the first
/// byte of each frame and the receiver task routes frames starting with it to
/// the direct-message channel.
pub const DIRECT_MESSAGE_STREAM_TYPE: u8 = 0x10;
/// Handle to the networking layer wrapping an `ant_quic` [`Node`].
///
/// Cloning is cheap in the sense that all channel and node state is shared
/// through `Arc`s / channel handles; clones refer to the same underlying node.
#[derive(Debug, Clone)]
pub struct NetworkNode {
/// The underlying node; set to `None` by `shutdown()`.
node: Arc<RwLock<Option<Node>>>,
/// Configuration supplied at construction.
config: NetworkConfig,
/// Broadcast sender used to publish [`NetworkEvent`]s to subscribers.
event_sender: broadcast::Sender<NetworkEvent>,
/// Sender half for gossip frames; fed by the receiver task.
recv_tx: mpsc::Sender<(AntPeerId, GossipStreamType, Bytes)>,
/// Receiver half for gossip frames; drained by `receive_message`.
recv_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<(AntPeerId, GossipStreamType, Bytes)>>>,
/// Sender half for direct messages; fed by the receiver task.
direct_tx: mpsc::Sender<(AntPeerId, Bytes)>,
/// Receiver half for direct messages; drained by `recv_direct`.
direct_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<(AntPeerId, Bytes)>>>,
/// Local peer ID, captured from the node at construction.
peer_id: AntPeerId,
/// Optional bootstrap cache updated on connection success/failure.
bootstrap_cache: Option<Arc<ant_quic::BootstrapCache>>,
}
impl NetworkNode {
/// Creates the underlying `ant_quic` node and starts the background
/// receiver and accept tasks.
///
/// * `config` – bind address and bootstrap nodes are passed to the node builder.
/// * `bootstrap_cache` – optional cache updated by later connection attempts.
/// * `keypair` – optional identity keypair passed to the node builder.
///
/// # Errors
/// Returns `NetworkError::NodeCreation` when the node cannot be created.
pub async fn new(
    config: NetworkConfig,
    bootstrap_cache: Option<Arc<ant_quic::BootstrapCache>>,
    keypair: Option<(ant_quic::MlDsaPublicKey, ant_quic::MlDsaSecretKey)>,
) -> NetworkResult<Self> {
    let mut builder = NodeConfig::builder()
        .data_channel_capacity(1024)
        .max_concurrent_uni_streams(10_000);
    if let Some(bind_addr) = config.bind_addr {
        builder = builder.bind_addr(bind_addr);
    }
    builder = config
        .bootstrap_nodes
        .iter()
        .fold(builder, |acc, known| acc.known_peer(*known));
    let builder = match keypair {
        Some((pk, sk)) => builder.keypair(pk, sk),
        None => builder,
    };

    let node = match Node::with_config(builder.build()).await {
        Ok(created) => created,
        Err(e) => {
            return Err(NetworkError::NodeCreation(format!(
                "Failed to create ant-quic node: {}",
                e
            )));
        }
    };
    let peer_id = node.peer_id();

    let (event_sender, _event_receiver) = broadcast::channel(32);
    let (recv_tx, gossip_inbox) = mpsc::channel(10_000);
    let (direct_tx, direct_inbox) = mpsc::channel(10_000);

    let network_node = Self {
        node: Arc::new(RwLock::new(Some(node))),
        config,
        event_sender,
        recv_tx,
        recv_rx: Arc::new(tokio::sync::Mutex::new(gossip_inbox)),
        direct_tx,
        direct_rx: Arc::new(tokio::sync::Mutex::new(direct_inbox)),
        peer_id,
        bootstrap_cache,
    };
    // Background tasks share the node slot and channels with this handle.
    network_node.spawn_receiver();
    network_node.spawn_accept_loop();
    Ok(network_node)
}
/// Returns the configuration this node was created with.
pub fn config(&self) -> &NetworkConfig {
&self.config
}
/// Returns the *configured* bind address, if any.
///
/// This is the value from the config, not the address actually bound; see
/// `bound_addr` for the address reported by the running node.
pub fn local_addr(&self) -> Option<SocketAddr> {
self.config.bind_addr
}
/// Returns the local address reported by the node's status, falling back to
/// the configured bind address when the node is no longer available.
pub async fn bound_addr(&self) -> Option<SocketAddr> {
    match self.node_status().await {
        Some(status) => Some(status.local_addr),
        None => self.config.bind_addr,
    }
}
/// Returns the external address reported by the node, or `None` when the
/// node is unavailable or reports no external address.
pub async fn external_addr(&self) -> Option<SocketAddr> {
    let guard = self.node.read().await;
    match guard.as_ref() {
        Some(node) => node.external_addr(),
        None => None,
    }
}
/// Returns an address other peers could plausibly dial.
///
/// Prefers the external address; otherwise uses the bound address unless it
/// has an unspecified IP or port 0, in which case `None` is returned.
pub async fn routable_addr(&self) -> Option<SocketAddr> {
    match self.external_addr().await {
        Some(external) => Some(external),
        None => {
            let bound = self.bound_addr().await?;
            let dialable = !bound.ip().is_unspecified() && bound.port() != 0;
            dialable.then_some(bound)
        }
    }
}
/// Returns the node's current status, or `None` after shutdown.
pub async fn node_status(&self) -> Option<ant_quic::NodeStatus> {
    // Clone the handle inside a scope so the read lock is released before
    // the status call is awaited.
    let handle = {
        let guard = self.node.read().await;
        guard.as_ref().cloned()
    };
    match handle {
        Some(node) => Some(node.status().await),
        None => None,
    }
}
/// Probes `peer_id` via the underlying node with the given timeout.
///
/// Returns `None` when the node has been shut down; otherwise the node's
/// probe result.
pub async fn probe_peer(
    &self,
    peer_id: AntPeerId,
    timeout: std::time::Duration,
) -> Option<Result<std::time::Duration, ant_quic::NodeError>> {
    // Release the read lock before awaiting the probe.
    let handle = {
        let guard = self.node.read().await;
        guard.as_ref().cloned()
    };
    let node = handle?;
    let outcome = node.probe_peer(&peer_id, timeout).await;
    Some(outcome)
}
/// Returns the node's connection-health report for `peer_id`, or `None`
/// when the node has been shut down.
pub async fn connection_health(
    &self,
    peer_id: AntPeerId,
) -> Option<ant_quic::ConnectionHealth> {
    // Release the read lock before awaiting the health query.
    let handle = {
        let guard = self.node.read().await;
        guard.as_ref().cloned()
    };
    match handle {
        Some(node) => Some(node.connection_health(&peer_id).await),
        None => None,
    }
}
/// Forwards `data` to the node's `send_with_receive_ack` for `peer_id`.
///
/// Returns `None` when the node has been shut down; otherwise the node's
/// result.
pub async fn send_with_receive_ack(
    &self,
    peer_id: AntPeerId,
    data: &[u8],
    timeout: std::time::Duration,
) -> Option<Result<(), ant_quic::NodeError>> {
    // Release the read lock before awaiting the send.
    let handle = {
        let guard = self.node.read().await;
        guard.as_ref().cloned()
    };
    let node = handle?;
    let outcome = node.send_with_receive_ack(&peer_id, data, timeout).await;
    Some(outcome)
}
/// Subscribes to the node's peer lifecycle events.
///
/// Returns `None` when the node has been shut down.
pub async fn subscribe_all_peer_events(
    &self,
) -> Option<tokio::sync::broadcast::Receiver<(ant_quic::PeerId, ant_quic::PeerLifecycleEvent)>>
{
    let handle = {
        let guard = self.node.read().await;
        guard.as_ref().cloned()
    };
    handle.map(|node| node.subscribe_all_peer_events())
}
/// Builds a [`NetworkStats`] snapshot from the node's status.
///
/// Returns the all-zero default when the node has been shut down.
pub async fn stats(&self) -> NetworkStats {
    // Release the read lock before awaiting the status call.
    let handle = {
        let guard = self.node.read().await;
        guard.as_ref().cloned()
    };
    match handle {
        None => NetworkStats::default(),
        Some(node) => {
            let status = node.status().await;
            let total_connections = status.direct_connections + status.relayed_connections;
            NetworkStats {
                total_connections,
                active_connections: status.active_connections as u32,
                // NOTE(review): sourced from the relay-forwarded counter.
                bytes_sent: status.relay_bytes_forwarded,
                // NOTE(review): not populated from the status; always zero.
                bytes_received: 0,
                peer_count: status.connected_peers,
            }
        }
    }
}
/// Returns the number of connected peers reported by the node status, or 0
/// when the node has been shut down.
pub async fn connection_count(&self) -> usize {
    // Release the read lock before awaiting the status call.
    let handle = {
        let guard = self.node.read().await;
        guard.as_ref().cloned()
    };
    match handle {
        Some(node) => node.status().await.connected_peers,
        None => 0,
    }
}
/// Returns a new receiver for [`NetworkEvent`]s published by this node.
pub fn subscribe(&self) -> broadcast::Receiver<NetworkEvent> {
self.event_sender.subscribe()
}
/// Publishes `event` to all current subscribers.
///
/// The send result is deliberately discarded, so a failed send is not
/// reported to the caller.
pub fn emit_event(&self, event: NetworkEvent) {
    self.event_sender.send(event).ok();
}
/// Connects to `peer_id` using addresses stored in the bootstrap cache.
///
/// If the peer is already connected over UDP, its current remote address is
/// returned without dialing. Otherwise each cached address is dialed in
/// order until one yields the expected peer.
///
/// # Errors
/// Returns `NetworkError::ConnectionFailed` when no bootstrap cache is
/// configured, the peer is not in the cache, or no cached address reaches
/// the expected peer (in which case a failure is recorded in the cache).
pub async fn connect_cached_peer(&self, peer_id: AntPeerId) -> NetworkResult<SocketAddr> {
    // Fast path: reuse an existing connection.
    if self.is_connected(&peer_id).await {
        let guard = self.node.read().await;
        if let Some(node) = guard.as_ref() {
            for conn in node.connected_peers().await {
                if conn.peer_id != peer_id {
                    continue;
                }
                // Only the first matching connection is considered.
                if let TransportAddr::Udp(addr) = conn.remote_addr {
                    return Ok(addr);
                }
                break;
            }
        }
    }

    let Some(cache) = self.bootstrap_cache.as_ref() else {
        return Err(NetworkError::ConnectionFailed(
            "bootstrap cache not configured".to_string(),
        ));
    };
    let Some(cached_peer) = cache.get_peer(&peer_id).await else {
        return Err(NetworkError::ConnectionFailed(format!(
            "peer {:?} not found in bootstrap cache",
            peer_id
        )));
    };

    let candidate_addrs = cached_peer.preferred_addresses();
    for addr in &candidate_addrs {
        let attempt = self.connect_addr(*addr).await;
        match attempt {
            Ok(reached) => {
                if reached == peer_id {
                    return Ok(*addr);
                }
                warn!(
                    "Cached address {} for peer {:?} resolved to unexpected peer {:?}",
                    addr, peer_id, reached
                );
            }
            Err(e) => {
                debug!(
                    "Cached dial to peer {:?} at {} failed: {}",
                    peer_id, addr, e
                );
            }
        }
    }

    cache.record_failure(&peer_id).await;
    let tried = candidate_addrs.len();
    Err(NetworkError::ConnectionFailed(format!(
        "peer {:?} not reachable via {} cached addresses",
        peer_id, tried
    )))
}
pub async fn connect_addr(&self, addr: SocketAddr) -> NetworkResult<AntPeerId> {
let node = self.require_node().await?;
let family = if addr.is_ipv4() { "v4" } else { "v6" };
tracing::debug!(
target: "x0x::connect",
strategy = "direct_addr",
%addr,
family,
"starting direct dial"
);
let start = std::time::Instant::now();
let result = node.connect_addr(addr).await;
let dur_ms = start.elapsed().as_millis() as u64;
match result {
Ok(peer_conn) => {
let rtt_ms = dur_ms as u32;
if let Some(ref cache) = self.bootstrap_cache {
cache
.add_from_connection(peer_conn.peer_id, vec![addr], None)
.await;
cache.record_success(&peer_conn.peer_id, rtt_ms).await;
}
self.emit_event(NetworkEvent::PeerConnected {
peer_id: peer_conn.peer_id.0,
address: addr,
});
tracing::info!(
target: "x0x::connect",
strategy = "direct_addr",
%addr,
family,
peer_id_prefix = %hex_prefix(&peer_conn.peer_id.0, 4),
dur_ms,
outcome = "ok",
"direct dial succeeded"
);
Ok(peer_conn.peer_id)
}
Err(e) => {
if let Some(ref cache) = self.bootstrap_cache {
let all_peers = cache.all_peers().await;
for peer in &all_peers {
if peer.addresses.contains(&addr) {
debug!(
"Recording connection failure for peer {:?} at {addr}",
peer.peer_id
);
cache.record_failure(&peer.peer_id).await;
}
}
}
tracing::info!(
target: "x0x::connect",
strategy = "direct_addr",
%addr,
family,
dur_ms,
outcome = "fail",
error = %e,
"direct dial failed"
);
Err(NetworkError::ConnectionFailed(e.to_string()))
}
}
}
pub async fn connect_peer(&self, peer_id: AntPeerId) -> NetworkResult<(SocketAddr, AntPeerId)> {
let node = self.require_node().await?;
let start = std::time::Instant::now();
let peer_conn = node
.connect_peer(peer_id)
.await
.map_err(|e| NetworkError::ConnectionFailed(e.to_string()))?;
let addr = match peer_conn.remote_addr {
TransportAddr::Udp(socket_addr) => socket_addr,
_ => {
return Err(NetworkError::ConnectionFailed(
"Unsupported transport type".to_string(),
))
}
};
let rtt_ms = start.elapsed().as_millis() as u32;
if let Some(ref cache) = self.bootstrap_cache {
cache
.add_from_connection(peer_conn.peer_id, vec![addr], None)
.await;
cache.record_success(&peer_conn.peer_id, rtt_ms).await;
}
self.emit_event(NetworkEvent::PeerConnected {
peer_id: peer_conn.peer_id.0,
address: addr,
});
Ok((addr, peer_conn.peer_id))
}
pub async fn connect_peer_with_addrs(
&self,
peer_id: AntPeerId,
addrs: Vec<SocketAddr>,
) -> NetworkResult<(SocketAddr, AntPeerId)> {
let node = self.require_node().await?;
let v4_count = addrs.iter().filter(|a| a.is_ipv4()).count();
let v6_count = addrs.len() - v4_count;
tracing::debug!(
target: "x0x::connect",
strategy = "peer_with_addrs",
peer_id_prefix = %hex_prefix(&peer_id.0, 4),
addr_count = addrs.len(),
v4_count,
v6_count,
"starting peer-authenticated dial with hints"
);
let start = std::time::Instant::now();
let peer_conn_res = node.connect_peer_with_addrs(peer_id, addrs).await;
let dur_ms = start.elapsed().as_millis() as u64;
let peer_conn = match peer_conn_res {
Ok(pc) => pc,
Err(e) => {
tracing::info!(
target: "x0x::connect",
strategy = "peer_with_addrs",
peer_id_prefix = %hex_prefix(&peer_id.0, 4),
dur_ms,
outcome = "fail",
error = %e,
"peer-authenticated dial failed"
);
return Err(NetworkError::ConnectionFailed(e.to_string()));
}
};
let addr = match peer_conn.remote_addr {
TransportAddr::Udp(socket_addr) => socket_addr,
_ => {
tracing::warn!(
target: "x0x::connect",
strategy = "peer_with_addrs",
peer_id_prefix = %hex_prefix(&peer_id.0, 4),
dur_ms,
"connected but transport type unsupported"
);
return Err(NetworkError::ConnectionFailed(
"Unsupported transport type".to_string(),
));
}
};
let family = if addr.is_ipv4() { "v4" } else { "v6" };
let rtt_ms = dur_ms as u32;
if let Some(ref cache) = self.bootstrap_cache {
cache
.add_from_connection(peer_conn.peer_id, vec![addr], None)
.await;
cache.record_success(&peer_conn.peer_id, rtt_ms).await;
}
self.emit_event(NetworkEvent::PeerConnected {
peer_id: peer_conn.peer_id.0,
address: addr,
});
tracing::info!(
target: "x0x::connect",
strategy = "peer_with_addrs",
peer_id_prefix = %hex_prefix(&peer_id.0, 4),
verified_prefix = %hex_prefix(&peer_conn.peer_id.0, 4),
selected_addr = %addr,
family,
dur_ms,
outcome = "ok",
"peer-authenticated dial succeeded"
);
Ok((addr, peer_conn.peer_id))
}
/// Forwards address and capability hints for `peer_id` to the node.
///
/// # Errors
/// Returns `NetworkError::NodeCreation` when the node has been shut down.
pub async fn upsert_peer_hints(
    &self,
    peer_id: AntPeerId,
    addrs: Vec<SocketAddr>,
    capabilities: Option<PeerCapabilities>,
) -> NetworkResult<()> {
    match self.require_node().await {
        Ok(node) => {
            node.upsert_peer_hints(peer_id, addrs, capabilities).await;
            Ok(())
        }
        Err(unavailable) => Err(unavailable),
    }
}
/// Disconnects from `peer_id` and emits a `PeerDisconnected` event.
///
/// # Errors
/// Returns `NetworkError::NodeCreation` after shutdown and
/// `NetworkError::ConnectionFailed` when the node reports a disconnect
/// error; no event is emitted in either case.
pub async fn disconnect(&self, peer_id: &AntPeerId) -> NetworkResult<()> {
    let node = self.require_node().await?;
    if let Err(e) = node.disconnect(peer_id).await {
        return Err(NetworkError::ConnectionFailed(e.to_string()));
    }
    self.emit_event(NetworkEvent::PeerDisconnected { peer_id: peer_id.0 });
    Ok(())
}
/// Returns the IDs of all currently connected peers, or an empty list when
/// the node has been shut down.
pub async fn connected_peers(&self) -> Vec<AntPeerId> {
    let guard = self.node.read().await;
    let Some(node) = guard.as_ref() else {
        return Vec::new();
    };
    let mut ids = Vec::new();
    for conn in node.connected_peers().await.iter() {
        ids.push(conn.peer_id);
    }
    ids
}
/// Reports whether the node currently has a connection to `peer_id`.
///
/// Always `false` once the node has been shut down.
pub async fn is_connected(&self, peer_id: &AntPeerId) -> bool {
    let guard = self.node.read().await;
    if let Some(node) = guard.as_ref() {
        node.is_connected(peer_id).await
    } else {
        false
    }
}
/// Shuts the node down by clearing the shared node slot.
///
/// Takes the write lock, so it waits for any outstanding read guards.
pub async fn shutdown(&self) {
    let mut slot = self.node.write().await;
    // Dropping the previous value happens while the write lock is held.
    drop(slot.take());
}
/// Returns a clone of the node handle.
///
/// # Errors
/// Returns `NetworkError::NodeCreation` when the node slot is empty (for
/// example after `shutdown`).
async fn require_node(&self) -> NetworkResult<Node> {
    let guard = self.node.read().await;
    match guard.as_ref() {
        Some(node) => Ok(node.clone()),
        None => Err(NetworkError::NodeCreation(
            "Node not initialized".to_string(),
        )),
    }
}
/// Returns the local peer ID captured when the node was created.
pub fn peer_id(&self) -> AntPeerId {
self.peer_id
}
/// Sends a direct message to an already-connected peer.
///
/// The wire frame is `[DIRECT_MESSAGE_STREAM_TYPE][sender_agent_id (32 bytes)][payload]`.
///
/// # Errors
/// Returns `NetworkError::NotConnected` when there is no connection to
/// `peer_id`, `NetworkError::NodeCreation` after shutdown, and
/// `NetworkError::ConnectionFailed` when the send fails.
pub async fn send_direct(
    &self,
    peer_id: &AntPeerId,
    sender_agent_id: &[u8; 32],
    payload: &[u8],
) -> NetworkResult<()> {
    let connected = self.is_connected(peer_id).await;
    if !connected {
        return Err(NetworkError::NotConnected(peer_id.0));
    }

    let type_prefix = [DIRECT_MESSAGE_STREAM_TYPE];
    let frame: Vec<u8> = [&type_prefix[..], &sender_agent_id[..], payload].concat();

    let node = self.require_node().await?;
    if let Err(e) = node.send(peer_id, &frame).await {
        return Err(NetworkError::ConnectionFailed(format!(
            "send failed: {}",
            e
        )));
    }
    info!(
        "[1/6 network] send_direct: {} bytes to peer {:?}",
        payload.len(),
        peer_id
    );
    Ok(())
}
/// Waits for the next direct message forwarded by the receiver task.
///
/// The returned bytes are the frame contents after the leading type byte.
/// Returns `None` once the direct-message channel is closed.
pub async fn recv_direct(&self) -> Option<(AntPeerId, Bytes)> {
    let mut inbox = self.direct_rx.lock().await;
    let next = inbox.recv().await;
    next
}
/// Spawns the background task that reads frames from the node and routes
/// them by their leading type byte.
///
/// Frames starting with `DIRECT_MESSAGE_STREAM_TYPE` go to the direct-message
/// channel; frames whose first byte maps to a `GossipStreamType` go to the
/// gossip channel. Empty frames and unknown type bytes are skipped. The task
/// stops when the node slot is empty or when either channel's receiver is gone.
fn spawn_receiver(&self) {
let node = Arc::clone(&self.node);
let recv_tx = self.recv_tx.clone();
let direct_tx = self.direct_tx.clone();
tokio::spawn(async move {
debug!("NetworkNode receiver task started");
loop {
// NOTE(review): the read guard is held across `recv().await` below, so
// `shutdown()` (which needs the write lock) cannot proceed until a frame
// arrives or `recv` errors — confirm this is acceptable.
let node_guard = node.read().await;
let node_ref = match node_guard.as_ref() {
Some(n) => n,
None => {
debug!("Node not initialized, receiver stopping");
break;
}
};
let recv_result = node_ref.recv().await;
drop(node_guard);
match recv_result {
Ok((peer_id, data)) => {
// A frame with no type byte cannot be routed.
if data.is_empty() {
continue;
}
let type_byte = data[0];
if type_byte == DIRECT_MESSAGE_STREAM_TYPE {
let payload = Bytes::copy_from_slice(&data[1..]);
// The `+ 32` accounts for the 32-byte sender agent ID that
// `send_direct` places in front of the payload.
if payload.len() > crate::direct::MAX_DIRECT_PAYLOAD_SIZE + 32 {
warn!(
"[1/6 network] dropping oversized direct message: {} bytes from peer {:?} (max: {})",
payload.len(),
peer_id,
crate::direct::MAX_DIRECT_PAYLOAD_SIZE + 32
);
continue;
}
info!(
"[1/6 network] recv direct: {} bytes from peer {:?}",
payload.len(),
peer_id
);
// A send error means the receiving half is gone; stop the task.
if let Err(e) = direct_tx.send((peer_id, payload)).await {
error!("Failed to forward direct message: {}", e);
break;
}
continue;
}
let stream_type = match GossipStreamType::from_byte(type_byte) {
Some(st) => st,
None => {
warn!("Unknown stream type byte: {}", type_byte);
continue;
}
};
let payload = Bytes::copy_from_slice(&data[1..]);
info!(
"[1/6 network] recv: {} bytes ({:?}) from peer {:?}",
data.len() - 1,
stream_type,
peer_id
);
// Warn when fewer than 20% of the channel slots remain free
// (available * 5 < max), i.e. the channel is more than 80% full.
let capacity = recv_tx.capacity();
let max_capacity = recv_tx.max_capacity();
if capacity.saturating_mul(5) < max_capacity {
warn!(
available = capacity,
max = max_capacity,
peer = ?peer_id,
stream = ?stream_type,
"[1/6 network] recv_tx >80% full — PubSub pipeline falling behind; back-pressure is about to stall network recv"
);
}
// A send error means the receiving half is gone; stop the task.
if let Err(e) = recv_tx.send((peer_id, stream_type, payload)).await {
error!("Failed to forward message: {}", e);
break;
}
}
Err(e) => {
// Receive errors are logged and the loop keeps going.
debug!("Receive error: {}", e);
}
}
}
debug!("NetworkNode receiver task stopped");
});
}
/// Spawns the background task that accepts inbound connections.
///
/// For each accepted connection:
/// * if `inbound_allowlist` is non-empty and the peer is not in it, the
///   connection is logged as rejected and actively disconnected;
/// * otherwise the bootstrap cache (if any) is updated for UDP peers and a
///   `PeerConnected` event is broadcast. Non-UDP peers are reported with the
///   placeholder address `0.0.0.0:0`.
///
/// The task stops when the node slot is empty or `accept()` returns `None`.
fn spawn_accept_loop(&self) {
    let node = Arc::clone(&self.node);
    let event_sender = self.event_sender.clone();
    let bootstrap_cache = self.bootstrap_cache.clone();
    let inbound_allowlist = self.config.inbound_allowlist.clone();
    tokio::spawn(async move {
        debug!("NetworkNode accept loop started");
        loop {
            // NOTE(review): this read guard is held across `accept().await`,
            // so `shutdown()` (which needs the write lock) waits until the
            // next inbound connection — confirm this is acceptable.
            let node_guard = node.read().await;
            let node_ref = match node_guard.as_ref() {
                Some(n) => n,
                None => {
                    debug!("Node not initialized, accept loop stopping");
                    break;
                }
            };
            match node_ref.accept().await {
                Some(peer_conn) => {
                    if !inbound_allowlist.is_empty()
                        && !inbound_allowlist.contains(&peer_conn.peer_id.0)
                    {
                        tracing::warn!(
                            "SECURITY: Rejecting inbound connection from non-allowlisted peer {:?}",
                            peer_conn.peer_id
                        );
                        // Actually close the connection: skipping the event
                        // alone would leave the peer connected and able to
                        // keep sending frames to the receiver task.
                        if let Err(e) = node_ref.disconnect(&peer_conn.peer_id).await {
                            debug!(
                                "Failed to disconnect non-allowlisted peer {:?}: {}",
                                peer_conn.peer_id, e
                            );
                        }
                        continue;
                    }
                    tracing::info!(
                        "Accepted inbound connection from peer {:?} at {:?}",
                        peer_conn.peer_id,
                        peer_conn.remote_addr
                    );
                    let addr = match peer_conn.remote_addr {
                        ant_quic::TransportAddr::Udp(addr) => Some(addr),
                        _ => None,
                    };
                    if let (Some(ref cache), Some(addr)) = (&bootstrap_cache, addr) {
                        cache
                            .add_from_connection(peer_conn.peer_id, vec![addr], None)
                            .await;
                        // No RTT measurement exists for inbound connections.
                        cache.record_success(&peer_conn.peer_id, 0).await;
                    }
                    let addr =
                        addr.unwrap_or_else(|| std::net::SocketAddr::from(([0, 0, 0, 0], 0)));
                    let _ = event_sender.send(NetworkEvent::PeerConnected {
                        peer_id: peer_conn.peer_id.0,
                        address: addr,
                    });
                }
                None => {
                    debug!("Accept loop ended (node shutting down)");
                    break;
                }
            }
        }
        debug!("NetworkNode accept loop stopped");
    });
}
}
/// Renders the first `n` bytes of `bytes` as lowercase hex.
///
/// `n` is capped at 32, so the result is at most 64 characters long.
pub(crate) fn hex_prefix(bytes: &[u8; 32], n: usize) -> String {
    use std::fmt::Write;
    let shown = n.min(32);
    bytes
        .iter()
        .take(shown)
        .fold(String::with_capacity(shown * 2), |mut hex, byte| {
            // Writing into a `String` cannot fail.
            let _ = write!(hex, "{:02x}", byte);
            hex
        })
}
/// Converts an `ant_quic` peer ID into a gossip peer ID carrying the same
/// 32 raw bytes.
fn ant_to_gossip_peer_id(ant_id: &AntPeerId) -> GossipPeerId {
    let raw: [u8; 32] = ant_id.0;
    GossipPeerId::new(raw)
}
/// Converts a gossip peer ID into an `ant_quic` peer ID carrying the same
/// raw bytes.
fn gossip_to_ant_peer_id(gossip_id: &GossipPeerId) -> AntPeerId {
    let raw = gossip_id.to_bytes();
    ant_quic::PeerId(raw)
}
#[async_trait::async_trait]
impl saorsa_gossip_transport::GossipTransport for NetworkNode {
    /// Dials `addr` expecting to reach `peer`.
    ///
    /// Returns immediately when `peer` is already connected. Fails when the
    /// dial fails or a different peer answers at `addr`.
    async fn dial(&self, peer: GossipPeerId, addr: SocketAddr) -> anyhow::Result<()> {
        let expected = gossip_to_ant_peer_id(&peer);
        if self.is_connected(&expected).await {
            debug!("Already connected to peer {:?} at {}", peer, addr);
            return Ok(());
        }
        let reached = match self.connect_addr(addr).await {
            Ok(id) => id,
            Err(e) => return Err(anyhow::anyhow!("dial failed: {}", e)),
        };
        if reached == expected {
            return Ok(());
        }
        warn!(
            "SECURITY: Peer mismatch - expected {:?}, got {:?}",
            peer, reached
        );
        Err(anyhow::anyhow!(
            "Connected to unexpected peer {:?} when dialing {:?}",
            reached,
            peer
        ))
    }

    /// Dials a bootstrap address and returns the ID of the peer found there.
    ///
    /// When `pinned_bootstrap_peers` is non-empty and the peer's ID is not in
    /// it, the peer is disconnected and an error is returned.
    async fn dial_bootstrap(&self, addr: SocketAddr) -> anyhow::Result<GossipPeerId> {
        let ant_peer_id = match self.connect_addr(addr).await {
            Ok(id) => id,
            Err(e) => return Err(anyhow::anyhow!("bootstrap dial failed: {}", e)),
        };
        let pinned = &self.config.pinned_bootstrap_peers;
        let trusted = pinned.is_empty() || pinned.contains(&ant_peer_id.0);
        if trusted {
            return Ok(ant_to_gossip_peer_id(&ant_peer_id));
        }
        warn!(
            "SECURITY: Bootstrap peer at {} has unexpected ID {:?} — not in pinned set",
            addr, ant_peer_id
        );
        // Best effort: a disconnect failure does not change the outcome.
        let _ = self.disconnect(&ant_peer_id).await;
        Err(anyhow::anyhow!(
            "Bootstrap peer at {} has unpinned ID {:?}",
            addr,
            ant_peer_id
        ))
    }

    /// No-op: the bind address argument is ignored.
    async fn listen(&self, _bind: SocketAddr) -> anyhow::Result<()> {
        debug!("listen() no-op - NetworkNode already bound");
        Ok(())
    }

    /// Shuts the node down; always succeeds.
    async fn close(&self) -> anyhow::Result<()> {
        self.shutdown().await;
        Ok(())
    }

    /// Sends `data` to `peer`, prefixed with the stream-type byte.
    ///
    /// When the peer is not connected, a dial via the bootstrap cache is
    /// attempted first.
    async fn send_to_peer(
        &self,
        peer: GossipPeerId,
        stream_type: saorsa_gossip_transport::GossipStreamType,
        data: bytes::Bytes,
    ) -> anyhow::Result<()> {
        let ant_peer = gossip_to_ant_peer_id(&peer);
        let already_connected = self.is_connected(&ant_peer).await;
        if !already_connected {
            self.connect_cached_peer(ant_peer).await.map_err(|e| {
                anyhow::anyhow!(
                    "Peer {:?} not connected and bootstrap cache dial failed: {}",
                    peer,
                    e,
                )
            })?;
        }

        // Wire frame: one stream-type byte followed by the payload.
        let mut frame = Vec::with_capacity(1 + data.len());
        frame.push(stream_type.to_byte());
        frame.extend_from_slice(&data);

        let guard = self.node.read().await;
        let Some(node) = guard.as_ref() else {
            return Err(anyhow::anyhow!("node not initialized"));
        };
        if let Err(e) = node.send(&ant_peer, &frame).await {
            return Err(anyhow::anyhow!("send failed: {}", e));
        }
        info!(
            "[1/6 network] send: {} bytes ({:?}) to peer {:?}",
            frame.len(),
            stream_type,
            peer
        );
        Ok(())
    }

    /// Waits for the next gossip frame forwarded by the receiver task.
    ///
    /// Fails when the gossip channel has been closed.
    async fn receive_message(
        &self,
    ) -> anyhow::Result<(
        GossipPeerId,
        saorsa_gossip_transport::GossipStreamType,
        bytes::Bytes,
    )> {
        let mut inbox = self.recv_rx.lock().await;
        match inbox.recv().await {
            Some((ant_peer, stream_type, data)) => {
                Ok((ant_to_gossip_peer_id(&ant_peer), stream_type, data))
            }
            None => Err(anyhow::anyhow!("Receive channel closed")),
        }
    }

    /// Returns the local peer ID in gossip form.
    fn local_peer_id(&self) -> GossipPeerId {
        let own_id = self.peer_id();
        ant_to_gossip_peer_id(&own_id)
    }
}
/// Events broadcast to subscribers obtained from `NetworkNode::subscribe`.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
/// A peer connected. Emitted by the connect methods and by the accept loop;
/// the accept loop reports `0.0.0.0:0` when the transport is not UDP.
PeerConnected {
peer_id: [u8; 32],
address: SocketAddr,
},
/// A peer was disconnected via `NetworkNode::disconnect`.
PeerDisconnected {
peer_id: [u8; 32],
},
/// A NAT type was detected (not emitted by the code visible in this file).
NatTypeDetected {
nat_type: String,
},
/// An external address was discovered (not emitted by the code visible in this file).
ExternalAddressDiscovered {
address: SocketAddr,
},
/// A connection error occurred (not emitted by the code visible in this file).
ConnectionError {
peer_id: Option<[u8; 32]>,
error: String,
},
}
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use saorsa_gossip_transport::GossipTransport;

    /// The transport trait exposes a 32-byte local peer ID and closes cleanly.
    #[tokio::test]
    async fn test_gossip_transport_trait() {
        let config = NetworkConfig::default();
        let node = NetworkNode::new(config, None, None).await.unwrap();
        let peer_id = node.local_peer_id();
        assert_eq!(peer_id.to_bytes().len(), 32);
        assert!(node.close().await.is_ok());
    }

    /// Converting a peer ID to gossip form and back is lossless.
    #[test]
    fn test_peer_id_conversion() {
        let bytes = [42u8; 32];
        let ant_peer = ant_quic::PeerId(bytes);
        let gossip_peer = ant_to_gossip_peer_id(&ant_peer);
        let ant_peer_2 = gossip_to_ant_peer_id(&gossip_peer);
        assert_eq!(ant_peer, ant_peer_2);
        assert_eq!(gossip_peer.to_bytes(), bytes);
    }

    /// The default config carries every bootstrap peer usable on this host.
    #[test]
    fn test_network_config_defaults() {
        let config = NetworkConfig::default();
        assert!(config.bind_addr.is_none());
        assert_eq!(
            DEFAULT_BOOTSTRAP_PEERS.len(),
            12,
            "Should have 12 default bootstrap nodes (6 IPv4 + 6 IPv6)"
        );
        // `NetworkConfig::default()` drops IPv6 peers when the local IPv6
        // probe fails, so the expectation must apply the same filter;
        // otherwise this test fails on IPv4-only hosts.
        let ipv6_available = check_ipv6_available();
        let expected_addrs: Vec<SocketAddr> = DEFAULT_BOOTSTRAP_PEERS
            .iter()
            .map(|s| s.parse::<SocketAddr>().unwrap())
            .filter(|addr| ipv6_available || addr.is_ipv4())
            .collect();
        assert_eq!(
            config.bootstrap_nodes.len(),
            expected_addrs.len(),
            "Default bootstrap nodes should match the peers usable on this host"
        );
        for expected in &expected_addrs {
            assert!(
                config.bootstrap_nodes.contains(expected),
                "Bootstrap nodes should include {}",
                expected
            );
        }
        assert_eq!(config.max_connections, DEFAULT_MAX_CONNECTIONS);
        assert_eq!(config.connection_timeout, DEFAULT_CONNECTION_TIMEOUT);
    }

    /// Every built-in bootstrap peer string parses as a socket address.
    #[test]
    fn test_default_bootstrap_peers_parseable() {
        for peer in DEFAULT_BOOTSTRAP_PEERS {
            peer.parse::<SocketAddr>()
                .unwrap_or_else(|_| panic!("Bootstrap peer '{}' is not a valid SocketAddr", peer));
        }
    }

    /// Default statistics are all zero.
    #[tokio::test]
    async fn test_network_stats_default() {
        let stats = NetworkStats::default();
        assert_eq!(stats.total_connections, 0);
        assert_eq!(stats.active_connections, 0);
        assert_eq!(stats.bytes_sent, 0);
        assert_eq!(stats.bytes_received, 0);
        assert_eq!(stats.peer_count, 0);
    }
}
/// An emitted event is delivered unchanged to a subscriber.
#[tokio::test]
async fn test_network_node_subscribe_events() {
    let node = NetworkNode::new(NetworkConfig::default(), None, None)
        .await
        .unwrap();
    let mut receiver = node.subscribe();

    node.emit_event(NetworkEvent::PeerConnected {
        peer_id: [1; 32],
        address: "127.0.0.1:9000".parse().unwrap(),
    });

    let received = receiver.recv().await;
    assert!(received.is_ok());
    let NetworkEvent::PeerConnected { peer_id, address } = received.unwrap() else {
        panic!("Expected PeerConnected event");
    };
    assert_eq!(peer_id, [1; 32]);
    assert_eq!(address, "127.0.0.1:9000".parse().unwrap());
}
/// Every subscriber receives a copy of an emitted event.
#[tokio::test]
async fn test_network_node_multiple_subscribers() {
    let node = NetworkNode::new(NetworkConfig::default(), None, None)
        .await
        .unwrap();
    let mut first = node.subscribe();
    let mut second = node.subscribe();

    node.emit_event(NetworkEvent::NatTypeDetected {
        nat_type: "Full Cone".to_string(),
    });

    assert!(first.recv().await.is_ok());
    assert!(second.recv().await.is_ok());
}
/// Builds a small loopback mesh and checks that connections are symmetric:
/// whenever node `i` lists node `j` as connected, node `j` must list node `i`.
/// Also requires that at least one dial succeeded, so that a transport or
/// binding problem is not mistaken for a pass.
#[ignore = "timing-sensitive mesh test — run manually with: cargo test test_mesh -- --ignored --nocapture"]
#[tokio::test]
async fn test_mesh_connections_are_bidirectional() {
const NODE_COUNT: usize = 4;
const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
let mut nodes = Vec::with_capacity(NODE_COUNT);
let mut addrs = Vec::with_capacity(NODE_COUNT);
// Create the nodes on loopback with OS-assigned ports and no bootstrap peers.
for _ in 0..NODE_COUNT {
let config = NetworkConfig {
bind_addr: Some("127.0.0.1:0".parse().unwrap()),
bootstrap_nodes: Vec::new(),
max_connections: 100,
connection_timeout: CONNECT_TIMEOUT,
stats_interval: std::time::Duration::from_secs(60),
peer_cache_path: None,
pinned_bootstrap_peers: std::collections::HashSet::new(),
inbound_allowlist: std::collections::HashSet::new(),
max_peers_per_ip: 3,
};
let node = NetworkNode::new(config, None, None).await.unwrap();
nodes.push(node);
}
// Resolve each node's actual port and build a dialable loopback address.
for node in &nodes {
let bound = node
.bound_addr()
.await
.expect("node must have a bound address");
let addr: SocketAddr = format!("127.0.0.1:{}", bound.port()).parse().unwrap();
addrs.push(addr);
}
// Dial every ordered pair (i -> j, i != j) concurrently, each with a timeout.
let mut handles = Vec::new();
for (i, node) in nodes.iter().enumerate() {
for (j, target_addr) in addrs.iter().enumerate() {
if i == j {
continue;
}
let node = node.clone();
let addr = *target_addr;
handles.push(tokio::spawn(async move {
let result = tokio::time::timeout(CONNECT_TIMEOUT, node.connect_addr(addr)).await;
(i, j, result)
}));
}
}
// Individual dial failures and timeouts are reported but not fatal.
let mut successful_connections = 0u32;
for handle in handles {
let (from, to, result) = handle.await.unwrap();
match result {
Ok(Ok(_)) => {
successful_connections += 1;
}
Ok(Err(e)) => {
eprintln!("Connection {}->{} failed: {}", from, to, e);
}
Err(_) => {
eprintln!("Connection {}->{} timed out", from, to);
}
}
}
// Give connection state a moment to settle before inspecting it.
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
for (i, node) in nodes.iter().enumerate() {
let count = node.connection_count().await;
eprintln!("Node {} ({}) has {} peers", i, addrs[i], count);
}
// Symmetry check: a one-sided view of a connection is a phantom connection.
for i in 0..NODE_COUNT {
let peers_i = nodes[i].connected_peers().await;
for j in 0..NODE_COUNT {
if i == j {
continue;
}
let j_peer_id = nodes[j].peer_id();
let i_sees_j = peers_i.contains(&j_peer_id);
if i_sees_j {
let peers_j = nodes[j].connected_peers().await;
let i_peer_id = nodes[i].peer_id();
let j_sees_i = peers_j.contains(&i_peer_id);
assert!(
j_sees_i,
"Phantom connection: node {} sees node {} but node {} does not see node {} back",
i, j, j, i
);
}
}
}
assert!(
successful_connections > 0,
"No connections succeeded at all — this indicates a transport/binding issue, not a phantom connection bug"
);
}
/// A topic-addressed message with JSON and bincode encodings.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Message {
/// BLAKE3 hash over sender, topic, payload and timestamp, computed at creation.
pub id: [u8; 32],
/// 32-byte identifier of the sender.
pub sender: [u8; 32],
/// Topic name.
pub topic: String,
/// Opaque payload bytes.
pub payload: Vec<u8>,
/// Creation time in whole seconds since the Unix epoch.
pub timestamp: u64,
/// Sequence number; 0 unless set through `Message::with_sequence`.
/// It is not part of the `id` hash.
pub sequence: u64,
}
impl Message {
    /// Creates a message stamped with the current time and sequence 0.
    ///
    /// # Errors
    /// Propagates the error from reading the system clock.
    pub fn new(sender: [u8; 32], topic: String, payload: Vec<u8>) -> NetworkResult<Self> {
        let timestamp = current_timestamp()?;
        Ok(Self {
            id: generate_message_id(&sender, &topic, &payload, timestamp),
            sender,
            topic,
            payload,
            timestamp,
            sequence: 0,
        })
    }

    /// Creates a message like [`Message::new`] and sets its sequence number.
    ///
    /// # Errors
    /// Propagates the error from reading the system clock.
    pub fn with_sequence(
        sender: [u8; 32],
        topic: String,
        payload: Vec<u8>,
        sequence: u64,
    ) -> NetworkResult<Self> {
        Self::new(sender, topic, payload).map(|mut msg| {
            msg.sequence = sequence;
            msg
        })
    }

    /// Encodes the message as JSON bytes.
    ///
    /// # Errors
    /// Returns `NetworkError::SerializationError` when encoding fails.
    pub fn to_json(&self) -> NetworkResult<Vec<u8>> {
        match serde_json::to_vec(self) {
            Ok(encoded) => Ok(encoded),
            Err(e) => Err(NetworkError::SerializationError(format!(
                "JSON encode failed: {}",
                e
            ))),
        }
    }

    /// Decodes a message from JSON bytes.
    ///
    /// # Errors
    /// Returns `NetworkError::SerializationError` when decoding fails.
    pub fn from_json(data: &[u8]) -> NetworkResult<Self> {
        match serde_json::from_slice(data) {
            Ok(decoded) => Ok(decoded),
            Err(e) => Err(NetworkError::SerializationError(format!(
                "JSON decode failed: {}",
                e
            ))),
        }
    }

    /// Encodes the message with bincode.
    ///
    /// # Errors
    /// Returns `NetworkError::SerializationError` when encoding fails.
    pub fn to_binary(&self) -> NetworkResult<Vec<u8>> {
        match bincode::serialize(self) {
            Ok(encoded) => Ok(encoded),
            Err(e) => Err(NetworkError::SerializationError(format!(
                "Binary encode failed: {}",
                e
            ))),
        }
    }

    /// Decodes a message from bincode bytes, capped at
    /// `MAX_MESSAGE_DESERIALIZE_SIZE`; trailing bytes are allowed.
    ///
    /// # Errors
    /// Returns `NetworkError::SerializationError` when decoding fails.
    pub fn from_binary(data: &[u8]) -> NetworkResult<Self> {
        use bincode::Options;
        let decoder = bincode::options()
            .with_fixint_encoding()
            .with_limit(MAX_MESSAGE_DESERIALIZE_SIZE)
            .allow_trailing_bytes();
        match decoder.deserialize(data) {
            Ok(decoded) => Ok(decoded),
            Err(e) => Err(NetworkError::SerializationError(format!(
                "Binary decode failed: {}",
                e
            ))),
        }
    }

    /// Returns the length of the bincode encoding, obtained by encoding.
    pub fn binary_size(&self) -> NetworkResult<usize> {
        let encoded = self.to_binary()?;
        Ok(encoded.len())
    }

    /// Returns the length of the JSON encoding, obtained by encoding.
    pub fn json_size(&self) -> NetworkResult<usize> {
        let encoded = self.to_json()?;
        Ok(encoded.len())
    }
}
/// Returns the current time as whole seconds since the Unix epoch.
///
/// # Errors
/// Returns `NetworkError::TimestampError` when the system clock is set
/// before the epoch.
fn current_timestamp() -> NetworkResult<u64> {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => Ok(since_epoch.as_secs()),
        Err(_) => Err(NetworkError::TimestampError(
            "System time before UNIX_EPOCH".to_string(),
        )),
    }
}
fn generate_message_id(sender: &[u8; 32], topic: &str, payload: &[u8], timestamp: u64) -> [u8; 32] {
let mut hasher = blake3::Hasher::new();
hasher.update(sender);
hasher.update(topic.as_bytes());
hasher.update(payload);
hasher.update(×tamp.to_le_bytes());
let hash = hasher.finalize();
*hash.as_bytes()
}
#[cfg(test)]
mod message_tests {
    #![allow(clippy::unwrap_used)]
    use super::*;

    /// A new message keeps its inputs, has sequence 0 and a non-zero ID.
    #[test]
    fn test_message_creation() {
        let sender = [1; 32];
        let topic = "test".to_string();
        let payload = b"Hello".to_vec();
        let msg = Message::new(sender, topic.clone(), payload.clone()).unwrap();
        assert_eq!(msg.sender, sender);
        assert_eq!(msg.topic, topic);
        assert_eq!(msg.payload, payload);
        assert!(msg.timestamp > 0);
        assert_eq!(msg.sequence, 0);
        assert_ne!(msg.id, [0; 32]);
    }

    /// `with_sequence` stores the requested sequence number.
    #[test]
    fn test_message_with_sequence() {
        let sender = [2; 32];
        let topic = "ordered".to_string();
        let payload = b"Message 42".to_vec();
        let sequence = 42u64;
        let msg = Message::with_sequence(sender, topic, payload, sequence).unwrap();
        assert_eq!(msg.sequence, 42);
        assert_eq!(msg.sender, sender);
    }

    /// JSON encoding round-trips.
    #[test]
    fn test_message_json_roundtrip() {
        let sender = [3; 32];
        let topic = "json".to_string();
        let payload = b"Test payload".to_vec();
        let original = Message::new(sender, topic, payload).unwrap();
        let json = original.to_json().unwrap();
        let deserialized = Message::from_json(&json).unwrap();
        assert_eq!(original, deserialized);
    }

    /// Binary encoding round-trips.
    #[test]
    fn test_message_binary_roundtrip() {
        let sender = [4; 32];
        let topic = "binary".to_string();
        let payload = b"Binary test".to_vec();
        let original = Message::new(sender, topic, payload).unwrap();
        let binary = original.to_binary().unwrap();
        let deserialized = Message::from_binary(&binary).unwrap();
        assert_eq!(original, deserialized);
    }

    /// Both encodings are non-empty and JSON is the larger one.
    #[test]
    fn test_message_binary_size() {
        let sender = [6; 32];
        let topic = "sizing".to_string();
        let payload = b"Payload for size test".to_vec();
        let msg = Message::new(sender, topic, payload).unwrap();
        let binary_size = msg.binary_size().unwrap();
        assert!(binary_size > 0);
        let json_size = msg.json_size().unwrap();
        assert!(json_size > 0);
        assert!(json_size > binary_size);
    }

    /// An empty payload is accepted and still yields a non-zero ID.
    #[test]
    fn test_message_empty_payload() {
        let sender = [7; 32];
        let topic = "empty".to_string();
        let payload = Vec::new();
        let msg = Message::new(sender, topic, payload).unwrap();
        assert_eq!(msg.payload.len(), 0);
        assert_ne!(msg.id, [0; 32]);
    }

    /// A 10 000-byte payload is stored unchanged.
    #[test]
    fn test_message_large_payload() {
        let sender = [8; 32];
        let topic = "large".to_string();
        let payload = vec![42u8; 10000];
        let msg = Message::new(sender, topic, payload.clone()).unwrap();
        assert_eq!(msg.payload.len(), 10000);
        assert_eq!(msg.payload, payload);
    }

    /// Non-ASCII topics survive a JSON round-trip.
    #[test]
    fn test_message_unicode_topic() {
        let sender = [10; 32];
        let topic = "тема/главная/система".to_string();
        let payload = b"Unicode test".to_vec();
        let msg = Message::new(sender, topic.clone(), payload).unwrap();
        assert_eq!(msg.topic, topic);
        let json = msg.to_json().unwrap();
        let deserialized = Message::from_json(&json).unwrap();
        assert_eq!(deserialized.topic, topic);
    }

    /// The timestamp is a plausible number of seconds since the epoch.
    #[test]
    fn test_current_timestamp_positive() {
        let ts = current_timestamp().unwrap();
        assert!(ts > 1600000000);
        // Upper sanity bound: still rejects a value expressed in milliseconds,
        // but unlike the previous 2_000_000_000 bound it does not start
        // failing in May 2033.
        assert!(ts < 100_000_000_000);
    }
}