1use super::*;
17use snarkos_node_router::{
18 Routing,
19 messages::{
20 BlockRequest,
21 BlockResponse,
22 DataBlocks,
23 DisconnectReason,
24 MessageCodec,
25 PeerRequest,
26 Ping,
27 Pong,
28 PuzzleResponse,
29 UnconfirmedTransaction,
30 },
31};
32use snarkos_node_sync::communication_service::CommunicationService;
33use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
34use snarkvm::{
35 ledger::narwhal::Data,
36 prelude::{Network, block::Transaction},
37};
38
39use std::{io, net::SocketAddr, time::Duration};
40
41impl<N: Network, C: ConsensusStorage<N>> P2P for Client<N, C> {
42 fn tcp(&self) -> &Tcp {
44 self.router.tcp()
45 }
46}
47
48#[async_trait]
49impl<N: Network, C: ConsensusStorage<N>> Handshake for Client<N, C> {
50 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
52 let peer_addr = connection.addr();
54 let conn_side = connection.side();
55 let stream = self.borrow_stream(&mut connection);
56 let genesis_header = *self.genesis.header();
57 let restrictions_id = self.ledger.vm().restrictions().restrictions_id();
58 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
59
60 Ok(connection)
61 }
62}
63
64#[async_trait]
65impl<N: Network, C: ConsensusStorage<N>> OnConnect for Client<N, C>
66where
67 Self: Outbound<N>,
68{
69 async fn on_connect(&self, peer_addr: SocketAddr) {
70 let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return };
72 self.router().insert_connected_peer(peer_ip);
74 if self.router.bootstrap_peers().contains(&peer_ip) {
76 Outbound::send(self, peer_ip, Message::PeerRequest(PeerRequest));
77 }
78 let block_locators = match self.sync.get_block_locators() {
80 Ok(block_locators) => Some(block_locators),
81 Err(e) => {
82 error!("Failed to get block locators: {e}");
83 return;
84 }
85 };
86 self.send_ping(peer_ip, block_locators);
88 }
89}
90
91#[async_trait]
92impl<N: Network, C: ConsensusStorage<N>> Disconnect for Client<N, C> {
93 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
95 if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) {
96 self.sync.remove_peer(&peer_ip);
97 self.router.remove_connected_peer(peer_ip);
98 }
99 }
100}
101
102#[async_trait]
103impl<N: Network, C: ConsensusStorage<N>> Writing for Client<N, C> {
104 type Codec = MessageCodec<N>;
105 type Message = Message<N>;
106
107 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
110 Default::default()
111 }
112}
113
114#[async_trait]
115impl<N: Network, C: ConsensusStorage<N>> Reading for Client<N, C> {
116 type Codec = MessageCodec<N>;
117 type Message = Message<N>;
118
119 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
122 Default::default()
123 }
124
125 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
127 let clone = self.clone();
128 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) {
129 tokio::spawn(async move {
132 clone.process_message_inner(peer_addr, message).await;
133 });
134 } else {
135 self.process_message_inner(peer_addr, message).await;
136 }
137 Ok(())
138 }
139}
140
141impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
142 async fn process_message_inner(
143 &self,
144 peer_addr: SocketAddr,
145 message: <Client<N, C> as snarkos_node_tcp::protocols::Reading>::Message,
146 ) {
147 if let Err(error) = self.inbound(peer_addr, message).await {
149 warn!("Failed to process inbound message from '{peer_addr}' - {error}");
150 if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) {
151 warn!("Disconnecting from '{peer_ip}' for protocol violation");
152 Outbound::send(self, peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
153 self.router().disconnect(peer_ip);
155 }
156 }
157 }
158}
159
160#[async_trait]
161impl<N: Network, C: ConsensusStorage<N>> CommunicationService for Client<N, C> {
162 type Message = Message<N>;
164
165 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
167 debug_assert!(start_height < end_height, "Invalid block request format");
168 Message::BlockRequest(BlockRequest { start_height, end_height })
169 }
170
171 async fn send(
177 &self,
178 peer_ip: SocketAddr,
179 message: Self::Message,
180 ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
181 Outbound::send(self, peer_ip, message)
182 }
183}
184
185#[async_trait]
186impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Client<N, C> {}
187
188impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Client<N, C> {}
189
190impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> {
191 fn router(&self) -> &Router<N> {
193 &self.router
194 }
195
196 fn is_block_synced(&self) -> bool {
198 self.sync.is_block_synced()
199 }
200
201 fn num_blocks_behind(&self) -> u32 {
203 self.sync.num_blocks_behind()
204 }
205}
206
207#[async_trait]
208impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> {
209 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool {
211 let BlockRequest { start_height, end_height } = &message;
212
213 let blocks = match self.ledger.get_blocks(*start_height..*end_height) {
215 Ok(blocks) => Data::Object(DataBlocks(blocks)),
216 Err(error) => {
217 error!("Failed to retrieve blocks {start_height} to {end_height} from the ledger - {error}");
218 return false;
219 }
220 };
221 Outbound::send(self, peer_ip, Message::BlockResponse(BlockResponse { request: message, blocks }));
223 true
224 }
225
226 fn block_response(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> bool {
228 match self.sync.advance_with_sync_blocks(peer_ip, blocks) {
230 Ok(()) => true,
231 Err(error) => {
232 warn!("{error}");
233 false
234 }
235 }
236 }
237
238 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
240 if self.sync.mode().is_router() {
242 if let Some(block_locators) = message.block_locators {
244 if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) {
246 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
247 return false;
248 }
249 }
250 }
251
252 Outbound::send(self, peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
254 true
255 }
256
257 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
259 let self_ = self.clone();
261 tokio::spawn(async move {
262 tokio::time::sleep(Duration::from_secs(Self::PING_SLEEP_IN_SECS)).await;
264 if self_.router().is_connected(&peer_ip) {
266 match self_.sync.get_block_locators() {
268 Ok(block_locators) => self_.send_ping(peer_ip, Some(block_locators)),
270 Err(e) => error!("Failed to get block locators - {e}"),
271 }
272 }
273 });
274 true
275 }
276
277 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
279 let epoch_hash = match self.ledger.latest_epoch_hash() {
281 Ok(epoch_hash) => epoch_hash,
282 Err(error) => {
283 error!("Failed to prepare a puzzle request for '{peer_ip}': {error}");
284 return false;
285 }
286 };
287 let block_header = Data::Object(self.ledger.latest_header());
289 Outbound::send(self, peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header }));
291 true
292 }
293
294 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool {
296 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
297 false
298 }
299
300 async fn unconfirmed_solution(
302 &self,
303 peer_ip: SocketAddr,
304 serialized: UnconfirmedSolution<N>,
305 solution: Solution<N>,
306 ) -> bool {
307 let mut solution_queue = self.solution_queue.lock();
309 if !solution_queue.contains(&solution.id()) {
310 solution_queue.put(solution.id(), (peer_ip, serialized, solution));
311 }
312
313 true }
315
316 async fn unconfirmed_transaction(
318 &self,
319 peer_ip: SocketAddr,
320 serialized: UnconfirmedTransaction<N>,
321 transaction: Transaction<N>,
322 ) -> bool {
323 match &transaction {
325 Transaction::<N>::Fee(..) => (), Transaction::<N>::Deploy(..) => {
327 let mut deploy_queue = self.deploy_queue.lock();
328 if !deploy_queue.contains(&transaction.id()) {
329 deploy_queue.put(transaction.id(), (peer_ip, serialized, transaction));
330 }
331 }
332 Transaction::<N>::Execute(..) => {
333 let mut execute_queue = self.execute_queue.lock();
334 if !execute_queue.contains(&transaction.id()) {
335 execute_queue.put(transaction.id(), (peer_ip, serialized, transaction));
336 }
337 }
338 }
339
340 true }
342}