1use super::*;
17use snarkos_node_router::{
18 Routing,
19 messages::{
20 BlockRequest,
21 BlockResponse,
22 DataBlocks,
23 DisconnectReason,
24 MessageCodec,
25 PeerRequest,
26 Ping,
27 Pong,
28 PuzzleResponse,
29 UnconfirmedTransaction,
30 },
31};
32use snarkos_node_sync::communication_service::CommunicationService;
33use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
34use snarkvm::{
35 ledger::narwhal::Data,
36 prelude::{Network, block::Transaction},
37};
38
39use std::{io, net::SocketAddr};
40
41impl<N: Network, C: ConsensusStorage<N>> P2P for Client<N, C> {
42 fn tcp(&self) -> &Tcp {
44 self.router.tcp()
45 }
46}
47
48#[async_trait]
49impl<N: Network, C: ConsensusStorage<N>> Handshake for Client<N, C> {
50 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
52 let peer_addr = connection.addr();
54 let conn_side = connection.side();
55 let stream = self.borrow_stream(&mut connection);
56 let genesis_header = *self.genesis.header();
57 let restrictions_id = self.ledger.vm().restrictions().restrictions_id();
58 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
59
60 Ok(connection)
61 }
62}
63
64#[async_trait]
65impl<N: Network, C: ConsensusStorage<N>> OnConnect for Client<N, C> {
66 async fn on_connect(&self, peer_addr: SocketAddr) {
67 let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return };
69 if self.router.bootstrap_peers().contains(&peer_ip) {
71 self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
72 }
73 self.ping.on_peer_connected(peer_ip);
75 }
76}
77
78#[async_trait]
79impl<N: Network, C: ConsensusStorage<N>> Disconnect for Client<N, C> {
80 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
82 if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) {
83 self.sync.remove_peer(&peer_ip);
84 self.router.remove_connected_peer(peer_ip);
85 }
86 }
87}
88
89#[async_trait]
90impl<N: Network, C: ConsensusStorage<N>> Reading for Client<N, C> {
91 type Codec = MessageCodec<N>;
92 type Message = Message<N>;
93
94 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
97 Default::default()
98 }
99
100 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
102 let clone = self.clone();
103 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) {
104 tokio::spawn(async move {
107 clone.process_message_inner(peer_addr, message).await;
108 });
109 } else {
110 self.process_message_inner(peer_addr, message).await;
111 }
112 Ok(())
113 }
114}
115
116impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
117 async fn process_message_inner(
118 &self,
119 peer_addr: SocketAddr,
120 message: <Client<N, C> as snarkos_node_tcp::protocols::Reading>::Message,
121 ) {
122 if let Err(error) = self.inbound(peer_addr, message).await {
124 warn!("Failed to process inbound message from '{peer_addr}' - {error}");
125 if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) {
126 warn!("Disconnecting from '{peer_ip}' for protocol violation");
127 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
128 self.router().disconnect(peer_ip);
130 }
131 }
132 }
133}
134
135#[async_trait]
136impl<N: Network, C: ConsensusStorage<N>> CommunicationService for Client<N, C> {
137 type Message = Message<N>;
139
140 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
142 debug_assert!(start_height < end_height, "Invalid block request format");
143 Message::BlockRequest(BlockRequest { start_height, end_height })
144 }
145
146 async fn send(
152 &self,
153 peer_ip: SocketAddr,
154 message: Self::Message,
155 ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
156 self.router().send(peer_ip, message)
157 }
158
159 fn ban_peer(&self, peer_ip: SocketAddr) {
160 debug!("Banning peer {peer_ip} for timing out on block requests");
161
162 let tcp = self.router.tcp().clone();
163 tcp.banned_peers().update_ip_ban(peer_ip.ip());
164
165 tokio::spawn(async move {
166 tcp.disconnect(peer_ip).await;
167 });
168 }
169}
170
171#[async_trait]
172impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Client<N, C> {}
173
174impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Client<N, C> {}
175
176impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> {
177 fn router(&self) -> &Router<N> {
179 &self.router
180 }
181
182 fn is_block_synced(&self) -> bool {
184 self.sync.is_block_synced()
185 }
186
187 fn num_blocks_behind(&self) -> Option<u32> {
190 self.sync.num_blocks_behind()
191 }
192}
193
194#[async_trait]
195impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> {
196 fn is_valid_message_version(&self, message_version: u32) -> bool {
198 self.router().is_valid_message_version(message_version)
199 }
200
201 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool {
203 let BlockRequest { start_height, end_height } = &message;
204
205 let blocks = match self.ledger.get_blocks(*start_height..*end_height) {
207 Ok(blocks) => Data::Object(DataBlocks(blocks)),
208 Err(error) => {
209 error!("Failed to retrieve blocks {start_height} to {end_height} from the ledger - {error}");
210 return false;
211 }
212 };
213 self.router().send(peer_ip, Message::BlockResponse(BlockResponse { request: message, blocks }));
215 true
216 }
217
218 fn block_response(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> bool {
220 if let Err(err) = self.sync.insert_block_responses(peer_ip, blocks) {
222 warn!("Failed to insert block response: {err}");
223 false
224 } else {
225 true
226 }
227 }
228
229 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
231 if let Some(block_locators) = message.block_locators {
233 if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) {
235 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
236 return false;
237 }
238 }
239
240 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
242 true
243 }
244
245 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
247 self.ping.on_pong_received(peer_ip);
248 true
249 }
250
251 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
253 let epoch_hash = match self.ledger.latest_epoch_hash() {
255 Ok(epoch_hash) => epoch_hash,
256 Err(error) => {
257 error!("Failed to prepare a puzzle request for '{peer_ip}': {error}");
258 return false;
259 }
260 };
261 let block_header = Data::Object(self.ledger.latest_header());
263 self.router().send(peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header }));
265 true
266 }
267
268 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool {
270 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
271 false
272 }
273
274 async fn unconfirmed_solution(
276 &self,
277 peer_ip: SocketAddr,
278 serialized: UnconfirmedSolution<N>,
279 solution: Solution<N>,
280 ) -> bool {
281 let mut solution_queue = self.solution_queue.lock();
283 if !solution_queue.contains(&solution.id()) {
284 solution_queue.put(solution.id(), (peer_ip, serialized, solution));
285 }
286
287 true }
289
290 async fn unconfirmed_transaction(
292 &self,
293 peer_ip: SocketAddr,
294 serialized: UnconfirmedTransaction<N>,
295 transaction: Transaction<N>,
296 ) -> bool {
297 match &transaction {
299 Transaction::<N>::Fee(..) => (), Transaction::<N>::Deploy(..) => {
301 let mut deploy_queue = self.deploy_queue.lock();
302 if !deploy_queue.contains(&transaction.id()) {
303 deploy_queue.put(transaction.id(), (peer_ip, serialized, transaction));
304 }
305 }
306 Transaction::<N>::Execute(..) => {
307 let mut execute_queue = self.execute_queue.lock();
308 if !execute_queue.contains(&transaction.id()) {
309 execute_queue.put(transaction.id(), (peer_ip, serialized, transaction));
310 }
311 }
312 }
313
314 true }
316}