1use super::*;
17use snarkos_node_router::{
18 Routing,
19 messages::{
20 BlockRequest,
21 BlockResponse,
22 DataBlocks,
23 DisconnectReason,
24 MessageCodec,
25 PeerRequest,
26 Ping,
27 Pong,
28 PuzzleResponse,
29 UnconfirmedTransaction,
30 },
31};
32use snarkos_node_sync::communication_service::CommunicationService;
33use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
34use snarkvm::{
35 ledger::narwhal::Data,
36 prelude::{Network, block::Transaction},
37};
38
39use std::{io, net::SocketAddr, time::Duration};
40
41impl<N: Network, C: ConsensusStorage<N>> P2P for Client<N, C> {
42 fn tcp(&self) -> &Tcp {
44 self.router.tcp()
45 }
46}
47
48#[async_trait]
49impl<N: Network, C: ConsensusStorage<N>> Handshake for Client<N, C> {
50 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
52 let peer_addr = connection.addr();
54 let conn_side = connection.side();
55 let stream = self.borrow_stream(&mut connection);
56 let genesis_header = *self.genesis.header();
57 let restrictions_id = self.ledger.vm().restrictions().restrictions_id();
58 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
59
60 Ok(connection)
61 }
62}
63
64#[async_trait]
65impl<N: Network, C: ConsensusStorage<N>> OnConnect for Client<N, C>
66where
67 Self: Outbound<N>,
68{
69 async fn on_connect(&self, peer_addr: SocketAddr) {
70 let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return };
72 self.router().insert_connected_peer(peer_ip);
74 if self.router.bootstrap_peers().contains(&peer_ip) {
76 Outbound::send(self, peer_ip, Message::PeerRequest(PeerRequest));
77 }
78 let block_locators = match self.sync.get_block_locators() {
80 Ok(block_locators) => Some(block_locators),
81 Err(e) => {
82 error!("Failed to get block locators: {e}");
83 return;
84 }
85 };
86 self.send_ping(peer_ip, block_locators);
88 }
89}
90
91#[async_trait]
92impl<N: Network, C: ConsensusStorage<N>> Disconnect for Client<N, C> {
93 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
95 if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) {
96 self.sync.remove_peer(&peer_ip);
97 self.router.remove_connected_peer(peer_ip);
98 }
99 }
100}
101
102#[async_trait]
103impl<N: Network, C: ConsensusStorage<N>> Writing for Client<N, C> {
104 type Codec = MessageCodec<N>;
105 type Message = Message<N>;
106
107 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
110 Default::default()
111 }
112}
113
114#[async_trait]
115impl<N: Network, C: ConsensusStorage<N>> Reading for Client<N, C> {
116 type Codec = MessageCodec<N>;
117 type Message = Message<N>;
118
119 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
122 Default::default()
123 }
124
125 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
127 let clone = self.clone();
128 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) {
129 tokio::spawn(async move {
132 clone.process_message_inner(peer_addr, message).await;
133 });
134 } else {
135 self.process_message_inner(peer_addr, message).await;
136 }
137 Ok(())
138 }
139}
140
141impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
142 async fn process_message_inner(
143 &self,
144 peer_addr: SocketAddr,
145 message: <Client<N, C> as snarkos_node_tcp::protocols::Reading>::Message,
146 ) {
147 if let Err(error) = self.inbound(peer_addr, message).await {
149 warn!("Failed to process inbound message from '{peer_addr}' - {error}");
150 if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) {
151 warn!("Disconnecting from '{peer_ip}' for protocol violation");
152 Outbound::send(self, peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
153 self.router().disconnect(peer_ip);
155 }
156 }
157 }
158}
159
160#[async_trait]
161impl<N: Network, C: ConsensusStorage<N>> CommunicationService for Client<N, C> {
162 type Message = Message<N>;
164
165 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
167 debug_assert!(start_height < end_height, "Invalid block request format");
168 Message::BlockRequest(BlockRequest { start_height, end_height })
169 }
170
171 async fn send(
177 &self,
178 peer_ip: SocketAddr,
179 message: Self::Message,
180 ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
181 Outbound::send(self, peer_ip, message)
182 }
183}
184
185#[async_trait]
186impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Client<N, C> {}
187
188impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Client<N, C> {}
189
190impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> {
191 fn router(&self) -> &Router<N> {
193 &self.router
194 }
195
196 fn is_block_synced(&self) -> bool {
198 self.sync.is_block_synced()
199 }
200
201 fn num_blocks_behind(&self) -> u32 {
203 self.sync.num_blocks_behind()
204 }
205}
206
207#[async_trait]
208impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> {
209 fn is_valid_message_version(&self, message_version: u32) -> bool {
211 self.router().is_valid_message_version(message_version)
212 }
213
214 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool {
216 let BlockRequest { start_height, end_height } = &message;
217
218 let blocks = match self.ledger.get_blocks(*start_height..*end_height) {
220 Ok(blocks) => Data::Object(DataBlocks(blocks)),
221 Err(error) => {
222 error!("Failed to retrieve blocks {start_height} to {end_height} from the ledger - {error}");
223 return false;
224 }
225 };
226 Outbound::send(self, peer_ip, Message::BlockResponse(BlockResponse { request: message, blocks }));
228 true
229 }
230
231 fn block_response(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> bool {
233 match self.sync.advance_with_sync_blocks(peer_ip, blocks) {
235 Ok(()) => true,
236 Err(error) => {
237 warn!("{error}");
238 false
239 }
240 }
241 }
242
243 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
245 if self.sync.mode().is_router() {
247 if let Some(block_locators) = message.block_locators {
249 if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) {
251 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
252 return false;
253 }
254 }
255 }
256
257 Outbound::send(self, peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
259 true
260 }
261
262 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
264 let self_ = self.clone();
266 tokio::spawn(async move {
267 tokio::time::sleep(Duration::from_secs(Self::PING_SLEEP_IN_SECS)).await;
269 if self_.router().is_connected(&peer_ip) {
271 match self_.sync.get_block_locators() {
273 Ok(block_locators) => self_.send_ping(peer_ip, Some(block_locators)),
275 Err(e) => error!("Failed to get block locators - {e}"),
276 }
277 }
278 });
279 true
280 }
281
282 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
284 let epoch_hash = match self.ledger.latest_epoch_hash() {
286 Ok(epoch_hash) => epoch_hash,
287 Err(error) => {
288 error!("Failed to prepare a puzzle request for '{peer_ip}': {error}");
289 return false;
290 }
291 };
292 let block_header = Data::Object(self.ledger.latest_header());
294 Outbound::send(self, peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header }));
296 true
297 }
298
299 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool {
301 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
302 false
303 }
304
305 async fn unconfirmed_solution(
307 &self,
308 peer_ip: SocketAddr,
309 serialized: UnconfirmedSolution<N>,
310 solution: Solution<N>,
311 ) -> bool {
312 let mut solution_queue = self.solution_queue.lock();
314 if !solution_queue.contains(&solution.id()) {
315 solution_queue.put(solution.id(), (peer_ip, serialized, solution));
316 }
317
318 true }
320
321 async fn unconfirmed_transaction(
323 &self,
324 peer_ip: SocketAddr,
325 serialized: UnconfirmedTransaction<N>,
326 transaction: Transaction<N>,
327 ) -> bool {
328 match &transaction {
330 Transaction::<N>::Fee(..) => (), Transaction::<N>::Deploy(..) => {
332 let mut deploy_queue = self.deploy_queue.lock();
333 if !deploy_queue.contains(&transaction.id()) {
334 deploy_queue.put(transaction.id(), (peer_ip, serialized, transaction));
335 }
336 }
337 Transaction::<N>::Execute(..) => {
338 let mut execute_queue = self.execute_queue.lock();
339 if !execute_queue.contains(&transaction.id()) {
340 execute_queue.put(transaction.id(), (peer_ip, serialized, transaction));
341 }
342 }
343 }
344
345 true }
347}