1use super::*;
17use snarkos_node_router::{
18 Routing,
19 messages::{
20 BlockRequest,
21 BlockResponse,
22 DataBlocks,
23 DisconnectReason,
24 MessageCodec,
25 PeerRequest,
26 Ping,
27 Pong,
28 PuzzleResponse,
29 UnconfirmedTransaction,
30 },
31};
32use snarkos_node_sync::communication_service::CommunicationService;
33use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
34use snarkvm::{
35 ledger::narwhal::Data,
36 prelude::{Network, block::Transaction},
37};
38
39use std::{io, net::SocketAddr};
40
41impl<N: Network, C: ConsensusStorage<N>> P2P for Client<N, C> {
42 fn tcp(&self) -> &Tcp {
44 self.router.tcp()
45 }
46}
47
48#[async_trait]
49impl<N: Network, C: ConsensusStorage<N>> Handshake for Client<N, C> {
50 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
52 let peer_addr = connection.addr();
54 let conn_side = connection.side();
55 let stream = self.borrow_stream(&mut connection);
56 let genesis_header = *self.genesis.header();
57 let restrictions_id = self.ledger.vm().restrictions().restrictions_id();
58 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
59
60 Ok(connection)
61 }
62}
63
64#[async_trait]
65impl<N: Network, C: ConsensusStorage<N>> OnConnect for Client<N, C> {
66 async fn on_connect(&self, peer_addr: SocketAddr) {
67 let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return };
69 self.router().insert_connected_peer(peer_ip);
71 if self.router.bootstrap_peers().contains(&peer_ip) {
73 self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
74 }
75 self.ping.on_peer_connected(peer_ip);
77 }
78}
79
80#[async_trait]
81impl<N: Network, C: ConsensusStorage<N>> Disconnect for Client<N, C> {
82 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
84 if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) {
85 self.sync.remove_peer(&peer_ip);
86 self.router.remove_connected_peer(peer_ip);
87 }
88 }
89}
90
91#[async_trait]
92impl<N: Network, C: ConsensusStorage<N>> Reading for Client<N, C> {
93 type Codec = MessageCodec<N>;
94 type Message = Message<N>;
95
96 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
99 Default::default()
100 }
101
102 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
104 let clone = self.clone();
105 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) {
106 tokio::spawn(async move {
109 clone.process_message_inner(peer_addr, message).await;
110 });
111 } else {
112 self.process_message_inner(peer_addr, message).await;
113 }
114 Ok(())
115 }
116}
117
118impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
119 async fn process_message_inner(
120 &self,
121 peer_addr: SocketAddr,
122 message: <Client<N, C> as snarkos_node_tcp::protocols::Reading>::Message,
123 ) {
124 if let Err(error) = self.inbound(peer_addr, message).await {
126 warn!("Failed to process inbound message from '{peer_addr}' - {error}");
127 if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) {
128 warn!("Disconnecting from '{peer_ip}' for protocol violation");
129 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
130 self.router().disconnect(peer_ip);
132 }
133 }
134 }
135}
136
137#[async_trait]
138impl<N: Network, C: ConsensusStorage<N>> CommunicationService for Client<N, C> {
139 type Message = Message<N>;
141
142 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
144 debug_assert!(start_height < end_height, "Invalid block request format");
145 Message::BlockRequest(BlockRequest { start_height, end_height })
146 }
147
148 async fn send(
154 &self,
155 peer_ip: SocketAddr,
156 message: Self::Message,
157 ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
158 self.router().send(peer_ip, message)
159 }
160
161 fn ban_peer(&self, peer_ip: SocketAddr) {
162 debug!("Banning peer {peer_ip} for timing out on block requests");
163
164 let tcp = self.router.tcp().clone();
165 tcp.banned_peers().update_ip_ban(peer_ip.ip());
166
167 tokio::spawn(async move {
168 tcp.disconnect(peer_ip).await;
169 });
170 }
171}
172
173#[async_trait]
174impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Client<N, C> {}
175
176impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Client<N, C> {}
177
178impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> {
179 fn router(&self) -> &Router<N> {
181 &self.router
182 }
183
184 fn is_block_synced(&self) -> bool {
186 self.sync.is_block_synced()
187 }
188
189 fn num_blocks_behind(&self) -> Option<u32> {
192 self.sync.num_blocks_behind()
193 }
194}
195
196#[async_trait]
197impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> {
198 fn is_valid_message_version(&self, message_version: u32) -> bool {
200 self.router().is_valid_message_version(message_version)
201 }
202
203 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool {
205 let BlockRequest { start_height, end_height } = &message;
206
207 let blocks = match self.ledger.get_blocks(*start_height..*end_height) {
209 Ok(blocks) => Data::Object(DataBlocks(blocks)),
210 Err(error) => {
211 error!("Failed to retrieve blocks {start_height} to {end_height} from the ledger - {error}");
212 return false;
213 }
214 };
215 self.router().send(peer_ip, Message::BlockResponse(BlockResponse { request: message, blocks }));
217 true
218 }
219
220 fn block_response(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> bool {
222 if let Err(err) = self.sync.insert_block_responses(peer_ip, blocks) {
224 warn!("Failed to insert block response: {err}");
225 false
226 } else {
227 true
228 }
229 }
230
231 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
233 if let Some(block_locators) = message.block_locators {
235 if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) {
237 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
238 return false;
239 }
240 }
241
242 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
244 true
245 }
246
247 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
249 self.ping.on_pong_received(peer_ip);
250 true
251 }
252
253 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
255 let epoch_hash = match self.ledger.latest_epoch_hash() {
257 Ok(epoch_hash) => epoch_hash,
258 Err(error) => {
259 error!("Failed to prepare a puzzle request for '{peer_ip}': {error}");
260 return false;
261 }
262 };
263 let block_header = Data::Object(self.ledger.latest_header());
265 self.router().send(peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header }));
267 true
268 }
269
270 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool {
272 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
273 false
274 }
275
276 async fn unconfirmed_solution(
278 &self,
279 peer_ip: SocketAddr,
280 serialized: UnconfirmedSolution<N>,
281 solution: Solution<N>,
282 ) -> bool {
283 let mut solution_queue = self.solution_queue.lock();
285 if !solution_queue.contains(&solution.id()) {
286 solution_queue.put(solution.id(), (peer_ip, serialized, solution));
287 }
288
289 true }
291
292 async fn unconfirmed_transaction(
294 &self,
295 peer_ip: SocketAddr,
296 serialized: UnconfirmedTransaction<N>,
297 transaction: Transaction<N>,
298 ) -> bool {
299 match &transaction {
301 Transaction::<N>::Fee(..) => (), Transaction::<N>::Deploy(..) => {
303 let mut deploy_queue = self.deploy_queue.lock();
304 if !deploy_queue.contains(&transaction.id()) {
305 deploy_queue.put(transaction.id(), (peer_ip, serialized, transaction));
306 }
307 }
308 Transaction::<N>::Execute(..) => {
309 let mut execute_queue = self.execute_queue.lock();
310 if !execute_queue.contains(&transaction.id()) {
311 execute_queue.put(transaction.id(), (peer_ip, serialized, transaction));
312 }
313 }
314 }
315
316 true }
318}