use super::*;
use snarkos_node_sync_locators::BlockLocators;
use snarkos_node_tcp::protocols::Writing;
use std::io;
use tokio::sync::oneshot;
impl<N: Network> Router<N> {
    /// Sends a `Ping` message, carrying this node's type and the optional
    /// block locators, to the given peer.
    ///
    /// Returns `true` if [`Self::send`] handed back a receiver, and `false`
    /// in every case where `send` returns `None`.
    #[must_use]
    pub fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) -> bool {
        let ping = Message::Ping(messages::Ping::new(self.node_type(), block_locators));
        self.send(peer_ip, ping).is_some()
    }

    /// Sends the given message to the specified peer.
    ///
    /// On success, the receiver produced by `unicast` is returned to the caller.
    ///
    /// Returns `None` when:
    /// - [`Self::can_send`] rejects the message,
    /// - the peer IP cannot be resolved via `resolve_to_ambiguous`, or
    /// - `unicast` fails, in which case the peer is also disconnected.
    pub fn send(&self, peer_ip: SocketAddr, message: Message<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
        // Bail out early if the message must not be sent to this peer.
        if !self.can_send(peer_ip, &message) {
            return None;
        }

        // Translate the peer IP into the address used for the actual send.
        let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else {
            warn!("Unable to resolve the listener IP address '{peer_ip}'");
            return None;
        };

        // Record request-type messages in the outbound cache before sending.
        // The message is only borrowed here, as it is still needed below.
        match &message {
            Message::BlockRequest(request) => {
                self.cache.insert_outbound_block_request(peer_ip, *request);
            }
            Message::PuzzleRequest(_) => {
                self.cache.increment_outbound_puzzle_requests(peer_ip);
            }
            Message::PeerRequest(_) => {
                self.cache.increment_outbound_peer_requests(peer_ip);
            }
            _ => {}
        }

        // Capture the name up front, as `unicast` takes ownership of the message.
        let name = message.name();
        trace!("Sending '{name}' to '{peer_ip}'");

        match self.unicast(peer_addr, message) {
            Ok(receiver) => Some(receiver),
            Err(e) => {
                // A failed send results in dropping the connection to the peer.
                warn!("Failed to send '{name}' to '{peer_ip}': {e}");
                debug!("Disconnecting from '{peer_ip}' (unable to send)");
                self.disconnect(peer_ip);
                None
            }
        }
    }

    /// Returns `true` if the message may be sent to the given peer.
    ///
    /// A message is rejected if the peer is not connected, or if it is an
    /// unconfirmed solution or transaction whose ID was already present in
    /// the outbound cache for this peer.
    ///
    /// Note that this check is not side-effect free: for unconfirmed solutions
    /// and transactions, the ID is inserted into the outbound cache as part of
    /// the lookup.
    fn can_send(&self, peer_ip: SocketAddr, message: &Message<N>) -> bool {
        if !self.is_connected(peer_ip) {
            warn!("Attempted to send to a non-connected peer {peer_ip}");
            return false;
        }

        match message {
            // Only send a solution if the insert found no previous entry for this peer.
            Message::UnconfirmedSolution(solution) => {
                self.cache.insert_outbound_solution(peer_ip, solution.solution_id).is_none()
            }
            // Only send a transaction if the insert found no previous entry for this peer.
            Message::UnconfirmedTransaction(transaction) => {
                self.cache.insert_outbound_transaction(peer_ip, transaction.transaction_id).is_none()
            }
            // Every other message type is always allowed.
            _ => true,
        }
    }
}
#[async_trait]
impl<N: Network> Writing for Router<N> {
type Codec = MessageCodec<N>;
type Message = Message<N>;
fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
Default::default()
}
}