1use super::*;
17use snarkos_node_router::{
18 PeerPoolHandling,
19 Routing,
20 messages::{
21 BlockRequest,
22 BlockResponse,
23 DataBlocks,
24 DisconnectReason,
25 MessageCodec,
26 PeerRequest,
27 Ping,
28 Pong,
29 PuzzleResponse,
30 UnconfirmedTransaction,
31 },
32};
33use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
34use snarkvm::{
35 ledger::narwhal::Data,
36 prelude::{Network, block::Transaction},
37};
38
39use std::{io, net::SocketAddr};
40
41impl<N: Network, C: ConsensusStorage<N>> P2P for Client<N, C> {
42 fn tcp(&self) -> &Tcp {
44 self.router.tcp()
45 }
46}
47
48#[async_trait]
49impl<N: Network, C: ConsensusStorage<N>> Handshake for Client<N, C> {
50 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
52 let peer_addr = connection.addr();
54 let conn_side = connection.side();
55 let stream = self.borrow_stream(&mut connection);
56 let genesis_header = *self.genesis.header();
57 let restrictions_id = self.ledger.vm().restrictions().restrictions_id();
58 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
59
60 Ok(connection)
61 }
62}
63
64#[async_trait]
65impl<N: Network, C: ConsensusStorage<N>> OnConnect for Client<N, C> {
66 async fn on_connect(&self, peer_addr: SocketAddr) {
67 if let Some(listener_addr) = self.router().resolve_to_listener(peer_addr) {
69 if let Some(peer) = self.router().get_connected_peer(listener_addr) {
70 if peer.node_type == NodeType::BootstrapClient {
72 self.router().send(listener_addr, Message::PeerRequest(PeerRequest));
73 } else {
74 self.ping.on_peer_connected(listener_addr);
76 }
77 }
78 }
79 }
80}
81
82#[async_trait]
83impl<N: Network, C: ConsensusStorage<N>> Disconnect for Client<N, C> {
84 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
86 if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) {
87 self.sync.remove_peer(&peer_ip);
88 self.router.downgrade_peer_to_candidate(peer_ip);
89 self.router.cache().clear_peer_entries(peer_ip);
91 #[cfg(feature = "metrics")]
92 self.router.update_metrics();
93 }
94 }
95}
96
97#[async_trait]
98impl<N: Network, C: ConsensusStorage<N>> Reading for Client<N, C> {
99 type Codec = MessageCodec<N>;
100 type Message = Message<N>;
101
102 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
105 Default::default()
106 }
107
108 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
110 let clone = self.clone();
111 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) {
112 tokio::spawn(async move {
115 clone.process_message_inner(peer_addr, message).await;
116 });
117 } else {
118 self.process_message_inner(peer_addr, message).await;
119 }
120 Ok(())
121 }
122}
123
124impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
125 async fn process_message_inner(
126 &self,
127 peer_addr: SocketAddr,
128 message: <Client<N, C> as snarkos_node_tcp::protocols::Reading>::Message,
129 ) {
130 if let Err(error) = self.inbound(peer_addr, message).await {
132 warn!("Failed to process inbound message from '{peer_addr}' - {error}");
133 if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) {
134 warn!("Disconnecting from '{peer_ip}' for protocol violation");
135 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
136 self.router().disconnect(peer_ip);
138 }
139 }
140 }
141}
142
143#[async_trait]
144impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Client<N, C> {}
145
146impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Client<N, C> {}
147
148impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> {
149 fn router(&self) -> &Router<N> {
151 &self.router
152 }
153
154 fn is_block_synced(&self) -> bool {
156 self.sync.is_block_synced()
157 }
158
159 fn num_blocks_behind(&self) -> Option<u32> {
162 self.sync.num_blocks_behind()
163 }
164
165 fn get_sync_speed(&self) -> f64 {
167 self.sync.get_sync_speed()
168 }
169}
170
171#[async_trait]
172impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> {
173 fn is_valid_message_version(&self, message_version: u32) -> bool {
175 self.router().is_valid_message_version(message_version)
176 }
177
178 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool {
180 let BlockRequest { start_height, end_height } = &message;
181
182 let blocks = match self.ledger.get_blocks(*start_height..*end_height) {
184 Ok(blocks) => Data::Object(DataBlocks(blocks)),
185 Err(error) => {
186 error!("Failed to retrieve blocks {start_height} to {end_height} from the ledger - {error}");
187 return false;
188 }
189 };
190 self.router().send(peer_ip, Message::BlockResponse(BlockResponse { request: message, blocks }));
192 true
193 }
194
195 fn block_response(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> bool {
197 if let Err(err) = self.sync.insert_block_responses(peer_ip, blocks) {
199 warn!("Failed to insert block response: {err}");
200 false
201 } else {
202 true
203 }
204 }
205
206 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
208 if let Some(block_locators) = message.block_locators {
210 if let Err(error) = self.sync.update_peer_locators(peer_ip, &block_locators) {
212 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
213 return false;
214 }
215
216 let last_peer_height = Some(block_locators.latest_locator_height());
217 self.router().update_connected_peer(&peer_ip, |peer| peer.last_height_seen = last_peer_height);
218 }
219
220 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
222 true
223 }
224
225 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
227 self.ping.on_pong_received(peer_ip);
228 true
229 }
230
231 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
233 let epoch_hash = match self.ledger.latest_epoch_hash() {
235 Ok(epoch_hash) => epoch_hash,
236 Err(error) => {
237 error!("Failed to prepare a puzzle request for '{peer_ip}': {error}");
238 return false;
239 }
240 };
241 let block_header = Data::Object(self.ledger.latest_header());
243 self.router().send(peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header }));
245 true
246 }
247
248 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool {
250 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
251 false
252 }
253
254 async fn unconfirmed_solution(
256 &self,
257 peer_ip: SocketAddr,
258 serialized: UnconfirmedSolution<N>,
259 solution: Solution<N>,
260 ) -> bool {
261 let mut solution_queue = self.solution_queue.lock();
263 if !solution_queue.contains(&solution.id()) {
264 solution_queue.put(solution.id(), (peer_ip, serialized, solution));
265 }
266
267 true }
269
270 async fn unconfirmed_transaction(
272 &self,
273 peer_ip: SocketAddr,
274 serialized: UnconfirmedTransaction<N>,
275 transaction: Transaction<N>,
276 ) -> bool {
277 match &transaction {
279 Transaction::<N>::Fee(..) => (), Transaction::<N>::Deploy(..) => {
281 let mut deploy_queue = self.deploy_queue.lock();
282 if !deploy_queue.contains(&transaction.id()) {
283 deploy_queue.put(transaction.id(), (peer_ip, serialized, transaction));
284 }
285 }
286 Transaction::<N>::Execute(..) => {
287 let mut execute_queue = self.execute_queue.lock();
288 if !execute_queue.contains(&transaction.id()) {
289 execute_queue.put(transaction.id(), (peer_ip, serialized, transaction));
290 }
291 }
292 }
293
294 true }
296}