1use super::*;
17use snarkos_node_router::{
18 Routing,
19 messages::{
20 BlockRequest,
21 BlockResponse,
22 DataBlocks,
23 DisconnectReason,
24 MessageCodec,
25 PeerRequest,
26 Ping,
27 Pong,
28 PuzzleResponse,
29 UnconfirmedTransaction,
30 },
31};
32use snarkos_node_sync::communication_service::CommunicationService;
33use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
34use snarkvm::{
35 ledger::narwhal::Data,
36 prelude::{Network, block::Transaction},
37};
38
39use std::{io, net::SocketAddr, time::Duration};
40
41impl<N: Network, C: ConsensusStorage<N>> P2P for Client<N, C> {
42 fn tcp(&self) -> &Tcp {
44 self.router.tcp()
45 }
46}
47
48#[async_trait]
49impl<N: Network, C: ConsensusStorage<N>> Handshake for Client<N, C> {
50 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
52 let peer_addr = connection.addr();
54 let conn_side = connection.side();
55 let stream = self.borrow_stream(&mut connection);
56 let genesis_header = *self.genesis.header();
57 let restrictions_id = self.ledger.vm().restrictions().restrictions_id();
58 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
59
60 Ok(connection)
61 }
62}
63
64#[async_trait]
65impl<N: Network, C: ConsensusStorage<N>> OnConnect for Client<N, C>
66where
67 Self: Outbound<N>,
68{
69 async fn on_connect(&self, peer_addr: SocketAddr) {
70 let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return };
72 self.router().insert_connected_peer(peer_ip);
74 if self.router.bootstrap_peers().contains(&peer_ip) {
76 Outbound::send(self, peer_ip, Message::PeerRequest(PeerRequest));
77 }
78 let block_locators = match self.sync.get_block_locators() {
80 Ok(block_locators) => Some(block_locators),
81 Err(e) => {
82 error!("Failed to get block locators: {e}");
83 return;
84 }
85 };
86 self.send_ping(peer_ip, block_locators);
88 }
89}
90
91#[async_trait]
92impl<N: Network, C: ConsensusStorage<N>> Disconnect for Client<N, C> {
93 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
95 if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) {
96 self.sync.remove_peer(&peer_ip);
97 self.router.remove_connected_peer(peer_ip);
98 }
99 }
100}
101
102#[async_trait]
103impl<N: Network, C: ConsensusStorage<N>> Writing for Client<N, C> {
104 type Codec = MessageCodec<N>;
105 type Message = Message<N>;
106
107 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
110 Default::default()
111 }
112}
113
114#[async_trait]
115impl<N: Network, C: ConsensusStorage<N>> Reading for Client<N, C> {
116 type Codec = MessageCodec<N>;
117 type Message = Message<N>;
118
119 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
122 Default::default()
123 }
124
125 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
127 let clone = self.clone();
128 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) {
129 tokio::spawn(async move {
132 clone.process_message_inner(peer_addr, message).await;
133 });
134 } else {
135 self.process_message_inner(peer_addr, message).await;
136 }
137 Ok(())
138 }
139}
140
141impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
142 async fn process_message_inner(
143 &self,
144 peer_addr: SocketAddr,
145 message: <Client<N, C> as snarkos_node_tcp::protocols::Reading>::Message,
146 ) {
147 if let Err(error) = self.inbound(peer_addr, message).await {
149 if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) {
150 warn!("Disconnecting from '{peer_ip}' - {error}");
151 Outbound::send(self, peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
152 self.router().disconnect(peer_ip);
154 }
155 }
156 }
157}
158
159#[async_trait]
160impl<N: Network, C: ConsensusStorage<N>> CommunicationService for Client<N, C> {
161 type Message = Message<N>;
163
164 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
166 debug_assert!(start_height < end_height, "Invalid block request format");
167 Message::BlockRequest(BlockRequest { start_height, end_height })
168 }
169
170 async fn send(
176 &self,
177 peer_ip: SocketAddr,
178 message: Self::Message,
179 ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
180 Outbound::send(self, peer_ip, message)
181 }
182}
183
184#[async_trait]
185impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Client<N, C> {}
186
187impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Client<N, C> {}
188
189impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> {
190 fn router(&self) -> &Router<N> {
192 &self.router
193 }
194
195 fn is_block_synced(&self) -> bool {
197 self.sync.is_block_synced()
198 }
199
200 fn num_blocks_behind(&self) -> u32 {
202 self.sync.num_blocks_behind()
203 }
204}
205
206#[async_trait]
207impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> {
208 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool {
210 let BlockRequest { start_height, end_height } = &message;
211
212 let blocks = match self.ledger.get_blocks(*start_height..*end_height) {
214 Ok(blocks) => Data::Object(DataBlocks(blocks)),
215 Err(error) => {
216 error!("Failed to retrieve blocks {start_height} to {end_height} from the ledger - {error}");
217 return false;
218 }
219 };
220 Outbound::send(self, peer_ip, Message::BlockResponse(BlockResponse { request: message, blocks }));
222 true
223 }
224
225 fn block_response(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> bool {
227 match self.sync.advance_with_sync_blocks(peer_ip, blocks) {
229 Ok(()) => true,
230 Err(error) => {
231 warn!("{error}");
232 false
233 }
234 }
235 }
236
237 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
239 if self.sync.mode().is_router() {
241 if let Some(block_locators) = message.block_locators {
243 if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) {
245 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
246 return false;
247 }
248 }
249 }
250
251 Outbound::send(self, peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
253 true
254 }
255
256 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
258 let self_ = self.clone();
260 tokio::spawn(async move {
261 tokio::time::sleep(Duration::from_secs(Self::PING_SLEEP_IN_SECS)).await;
263 if self_.router().is_connected(&peer_ip) {
265 match self_.sync.get_block_locators() {
267 Ok(block_locators) => self_.send_ping(peer_ip, Some(block_locators)),
269 Err(e) => error!("Failed to get block locators - {e}"),
270 }
271 }
272 });
273 true
274 }
275
276 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
278 let epoch_hash = match self.ledger.latest_epoch_hash() {
280 Ok(epoch_hash) => epoch_hash,
281 Err(error) => {
282 error!("Failed to prepare a puzzle request for '{peer_ip}': {error}");
283 return false;
284 }
285 };
286 let block_header = Data::Object(self.ledger.latest_header());
288 Outbound::send(self, peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header }));
290 true
291 }
292
293 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool {
295 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
296 false
297 }
298
299 async fn unconfirmed_solution(
301 &self,
302 peer_ip: SocketAddr,
303 serialized: UnconfirmedSolution<N>,
304 solution: Solution<N>,
305 ) -> bool {
306 let mut solution_queue = self.solution_queue.lock();
308 if !solution_queue.contains(&solution.id()) {
309 solution_queue.put(solution.id(), (peer_ip, serialized, solution));
310 }
311
312 true }
314
315 async fn unconfirmed_transaction(
317 &self,
318 peer_ip: SocketAddr,
319 serialized: UnconfirmedTransaction<N>,
320 transaction: Transaction<N>,
321 ) -> bool {
322 match &transaction {
324 Transaction::<N>::Fee(..) => (), Transaction::<N>::Deploy(..) => {
326 let mut deploy_queue = self.deploy_queue.lock();
327 if !deploy_queue.contains(&transaction.id()) {
328 deploy_queue.put(transaction.id(), (peer_ip, serialized, transaction));
329 }
330 }
331 Transaction::<N>::Execute(..) => {
332 let mut execute_queue = self.execute_queue.lock();
333 if !execute_queue.contains(&transaction.id()) {
334 execute_queue.put(transaction.id(), (peer_ip, serialized, transaction));
335 }
336 }
337 }
338
339 true }
341}