snarkos_node_router/outbound.rs
1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17 Router,
18 messages::{Message, Ping},
19};
20use snarkos_node_sync_locators::BlockLocators;
21use snarkos_node_tcp::protocols::Writing;
22use snarkvm::prelude::Network;
23use std::io;
24
25use std::net::SocketAddr;
26use tokio::sync::oneshot;
27
28pub trait Outbound<N: Network>: Writing<Message = Message<N>> {
29 /// Returns a reference to the router.
30 fn router(&self) -> &Router<N>;
31
32 /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
33 fn is_block_synced(&self) -> bool;
34
35 /// Returns the number of blocks this node is behind the greatest peer height.
36 fn num_blocks_behind(&self) -> u32;
37
38 /// Sends a "Ping" message to the given peer.
39 fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) {
40 self.send(peer_ip, Message::Ping(Ping::new(self.router().node_type(), block_locators)));
41 }
42
43 /// Sends the given message to specified peer.
44 ///
45 /// This function returns as soon as the message is queued to be sent,
46 /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
47 /// which can be used to determine when and whether the message has been delivered.
48 fn send(&self, peer_ip: SocketAddr, message: Message<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
49 // Determine whether to send the message.
50 if !self.can_send(peer_ip, &message) {
51 return None;
52 }
53 // Resolve the listener IP to the (ambiguous) peer address.
54 let peer_addr = match self.router().resolve_to_ambiguous(&peer_ip) {
55 Some(peer_addr) => peer_addr,
56 None => {
57 warn!("Unable to resolve the listener IP address '{peer_ip}'");
58 return None;
59 }
60 };
61 // If the message type is a block request, add it to the cache.
62 if let Message::BlockRequest(request) = message {
63 self.router().cache.insert_outbound_block_request(peer_ip, request);
64 }
65 // If the message type is a puzzle request, increment the cache.
66 if matches!(message, Message::PuzzleRequest(_)) {
67 self.router().cache.increment_outbound_puzzle_requests(peer_ip);
68 }
69 // If the message type is a peer request, increment the cache.
70 if matches!(message, Message::PeerRequest(_)) {
71 self.router().cache.increment_outbound_peer_requests(peer_ip);
72 }
73 // Retrieve the message name.
74 let name = message.name();
75 // Send the message to the peer.
76 trace!("Sending '{name}' to '{peer_ip}'");
77 let result = self.unicast(peer_addr, message);
78 // If the message was unable to be sent, disconnect.
79 if let Err(e) = &result {
80 warn!("Failed to send '{name}' to '{peer_ip}': {e}");
81 debug!("Disconnecting from '{peer_ip}' (unable to send)");
82 self.router().disconnect(peer_ip);
83 }
84 result.ok()
85 }
86
87 /// Sends the given message to every connected peer, excluding the sender and any specified peer IPs.
88 fn propagate(&self, message: Message<N>, excluded_peers: &[SocketAddr]) {
89 // TODO (howardwu): Serialize large messages once only.
90 // // Perform ahead-of-time, non-blocking serialization just once for applicable objects.
91 // if let Message::UnconfirmedSolution(ref mut message) = message {
92 // if let Ok(serialized_solution) = Data::serialize(message.solution.clone()).await {
93 // let _ = std::mem::replace(&mut message.solution, Data::Buffer(serialized_solution));
94 // } else {
95 // error!("Solution serialization is bugged");
96 // }
97 // } else if let Message::UnconfirmedTransaction(ref mut message) = message {
98 // if let Ok(serialized_transaction) = Data::serialize(message.transaction.clone()).await {
99 // let _ = std::mem::replace(&mut message.transaction, Data::Buffer(serialized_transaction));
100 // } else {
101 // error!("Transaction serialization is bugged");
102 // }
103 // }
104
105 // Prepare the peers to send to.
106 let connected_peers = self.router().connected_peers();
107 let peers = connected_peers.iter().filter(|peer_ip| !excluded_peers.contains(peer_ip));
108
109 // Iterate through all peers that are not the sender and excluded peers.
110 for peer_ip in peers {
111 self.send(*peer_ip, message.clone());
112 }
113 }
114
115 /// Sends the given message to every connected validator, excluding the sender and any specified IPs.
116 fn propagate_to_validators(&self, message: Message<N>, excluded_peers: &[SocketAddr]) {
117 // TODO (howardwu): Serialize large messages once only.
118 // // Perform ahead-of-time, non-blocking serialization just once for applicable objects.
119 // if let Message::UnconfirmedSolution(ref mut message) = message {
120 // if let Ok(serialized_solution) = Data::serialize(message.solution.clone()).await {
121 // let _ = std::mem::replace(&mut message.solution, Data::Buffer(serialized_solution));
122 // } else {
123 // error!("Solution serialization is bugged");
124 // }
125 // } else if let Message::UnconfirmedTransaction(ref mut message) = message {
126 // if let Ok(serialized_transaction) = Data::serialize(message.transaction.clone()).await {
127 // let _ = std::mem::replace(&mut message.transaction, Data::Buffer(serialized_transaction));
128 // } else {
129 // error!("Transaction serialization is bugged");
130 // }
131 // }
132
133 // Prepare the peers to send to.
134 let connected_validators = self.router().connected_validators();
135 let peers = connected_validators.iter().filter(|peer_ip| !excluded_peers.contains(peer_ip));
136
137 // Iterate through all validators that are not the sender and excluded validators.
138 for peer_ip in peers {
139 self.send(*peer_ip, message.clone());
140 }
141 }
142
143 /// Returns `true` if the message can be sent.
144 fn can_send(&self, peer_ip: SocketAddr, message: &Message<N>) -> bool {
145 // Ensure the peer is connected before sending.
146 if !self.router().is_connected(&peer_ip) {
147 warn!("Attempted to send to a non-connected peer {peer_ip}");
148 return false;
149 }
150 // Determine whether to send the message.
151 match message {
152 Message::UnconfirmedSolution(message) => {
153 // Update the timestamp for the unconfirmed solution.
154 let seen_before = self.router().cache.insert_outbound_solution(peer_ip, message.solution_id).is_some();
155 // Determine whether to send the solution.
156 !seen_before
157 }
158 Message::UnconfirmedTransaction(message) => {
159 // Update the timestamp for the unconfirmed transaction.
160 let seen_before =
161 self.router().cache.insert_outbound_transaction(peer_ip, message.transaction_id).is_some();
162 // Determine whether to send the transaction.
163 !seen_before
164 }
165 // For all other message types, return `true`.
166 _ => true,
167 }
168 }
169}