use crate::network::network_manager::NetworkManager;
use crate::network::protocol::{
AddrMessage, AddrV2Message, NetworkAddress, ProtocolMessage, ProtocolParser,
};
use crate::network::transport::TransportAddr;
use crate::network::NetworkMessage;
use crate::utils::current_timestamp;
use anyhow::Result;
use std::net::SocketAddr;
use tracing::{debug, warn};
impl NetworkManager {
pub(crate) async fn handle_get_addr(&self, peer_addr: SocketAddr) -> Result<()> {
let ban_list = self.ban_list().read().await.clone();
let connected_peers: Vec<SocketAddr> = {
let pm = self.peer_manager_ref().lock().await;
pm.peer_socket_addresses()
};
let addresses = {
let db = self.address_database().write().await;
let fresh = db.get_fresh_addresses(2500);
db.filter_addresses(fresh, &ban_list, &connected_peers)
};
let addr_msg = AddrMessage { addresses };
let response = ProtocolMessage::Addr(addr_msg);
let wire_msg = ProtocolParser::serialize_message(&response)?;
self.send_to_peer(peer_addr, wire_msg).await?;
Ok(())
}
pub(crate) async fn handle_addr(&self, peer_addr: SocketAddr, msg: AddrMessage) -> Result<()> {
let max_addr = self.protocol_limits().max_addr_to_send;
if msg.addresses.len() > max_addr {
warn!(
"addr message size = {} exceeds max_addr_to_send ({}), disconnecting peer {}",
msg.addresses.len(),
max_addr,
peer_addr
);
let _ = self
.peer_tx()
.send(NetworkMessage::PeerDisconnected(TransportAddr::Tcp(
peer_addr,
)));
return Err(anyhow::anyhow!("addr message size exceeded"));
}
let peer_services = {
let peer_states = self.peer_states().read().await;
peer_states
.get(&peer_addr)
.map(|state| state.services)
.unwrap_or(0)
};
{
let mut db = self.address_database().write().await;
for addr in &msg.addresses {
db.add_address(addr.clone(), peer_services);
}
}
self.relay_addresses(peer_addr, &msg.addresses).await?;
Ok(())
}
pub(crate) async fn handle_addr_v2(
&self,
peer_addr: SocketAddr,
msg: AddrV2Message,
) -> Result<()> {
let max_addr = self.protocol_limits().max_addr_to_send;
if msg.addresses.len() > max_addr {
warn!(
"addrv2 message size = {} exceeds max_addr_to_send ({}), disconnecting peer {}",
msg.addresses.len(),
max_addr,
peer_addr
);
let _ = self
.peer_tx()
.send(NetworkMessage::PeerDisconnected(TransportAddr::Tcp(
peer_addr,
)));
return Err(anyhow::anyhow!("addrv2 message size exceeded"));
}
let legacy: Vec<NetworkAddress> = msg
.addresses
.iter()
.filter_map(|a| a.to_legacy())
.map(|a| NetworkAddress {
services: a.services,
ip: a.ip,
port: a.port,
})
.collect();
if legacy.is_empty() {
debug!(
"AddrV2 from {}: {} entr(ies), none map to legacy net_addr (e.g. onion-only)",
peer_addr,
msg.addresses.len()
);
return Ok(());
}
self.handle_addr(peer_addr, AddrMessage { addresses: legacy })
.await
}
pub(crate) async fn relay_addresses(
&self,
sender_addr: SocketAddr,
addresses: &[NetworkAddress],
) -> Result<()> {
let now = current_timestamp();
let min_interval = 2 * 60 * 60 + 24 * 60;
{
let last_sent = *self.last_addr_sent().lock().await;
if now.saturating_sub(last_sent) < min_interval {
return Ok(());
}
}
let ban_list = self.ban_list().read().await.clone();
let connected_peers: Vec<SocketAddr> = {
let pm = self.peer_manager_ref().lock().await;
pm.peer_socket_addresses()
};
let filtered = {
let db = self.address_database().read().await;
db.filter_addresses(addresses.to_vec(), &ban_list, &connected_peers)
};
if filtered.is_empty() {
return Ok(());
}
let max_addr = self.protocol_limits().max_addr_to_send;
let addresses_to_relay: Vec<NetworkAddress> = filtered.into_iter().take(max_addr).collect();
let addr_msg = AddrMessage {
addresses: addresses_to_relay,
};
let relay_msg = ProtocolMessage::Addr(addr_msg);
let wire_msg = ProtocolParser::serialize_message(&relay_msg)?;
let peer_addrs: Vec<SocketAddr> = {
let pm = self.peer_manager_ref().lock().await;
pm.peer_socket_addresses()
.into_iter()
.filter(|addr| *addr != sender_addr)
.collect()
};
for peer_addr in peer_addrs {
if let Err(e) = self.send_to_peer(peer_addr, wire_msg.clone()).await {
warn!("Failed to relay addresses to {}: {}", peer_addr, e);
}
}
*self.last_addr_sent().lock().await = now;
Ok(())
}
}