1use super::*;
17
18use snarkos_node_network::harden_socket;
19use snarkos_node_router::messages::{
20 BlockRequest,
21 DisconnectReason,
22 Message,
23 MessageCodec,
24 Ping,
25 Pong,
26 PuzzleRequest,
27 UnconfirmedTransaction,
28};
29use snarkos_node_tcp::{ConnectError, Connection, ConnectionSide, Tcp};
30use snarkvm::{
31 console::network::{ConsensusVersion, Network},
32 ledger::block::Transaction,
33 prelude::{Field, Zero},
34 utilities::into_io_error,
35};
36
37use std::{io, net::SocketAddr};
38
39impl<N: Network, C: ConsensusStorage<N>> P2P for Prover<N, C> {
40 fn tcp(&self) -> &Tcp {
42 self.router.tcp()
43 }
44}
45
46#[async_trait]
47impl<N: Network, C: ConsensusStorage<N>> Handshake for Prover<N, C> {
48 async fn perform_handshake(&self, mut connection: Connection) -> Result<Connection, ConnectError> {
50 let peer_addr = connection.addr();
52 let conn_side = connection.side();
53 let stream = self.borrow_stream(&mut connection);
54 harden_socket(stream)?;
56 let genesis_header = *self.genesis.header();
57 let restrictions_id = Field::zero(); self.router
60 .handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id)
61 .await
62 .map_err(into_io_error)?;
63
64 Ok(connection)
65 }
66}
67
68#[async_trait]
69impl<N: Network, C: ConsensusStorage<N>> OnConnect for Prover<N, C>
70where
71 Self: Outbound<N>,
72{
73 async fn on_connect(&self, peer_addr: SocketAddr) {
74 if let Some(listener_addr) = self.router().resolve_to_listener(peer_addr)
76 && let Some(peer) = self.router().get_connected_peer(listener_addr)
77 && peer.node_type != NodeType::BootstrapClient
78 {
79 self.ping.on_peer_connected(listener_addr);
81 }
82 }
83}
84
85#[async_trait]
86impl<N: Network, C: ConsensusStorage<N>> Disconnect for Prover<N, C> {
87 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
89 if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) {
90 let was_fully_connected = self.router.downgrade_peer_to_candidate(peer_ip);
91 if was_fully_connected {
93 self.sync.remove_peer(&peer_ip);
94 }
95 self.router.cache().clear_peer_entries(peer_ip);
97 #[cfg(feature = "metrics")]
98 self.router.update_metrics();
99 } else {
100 warn!("Got disconnect for a peer '{peer_addr}' that is not in the peer pool");
101 }
102 }
103}
104
105#[async_trait]
106impl<N: Network, C: ConsensusStorage<N>> Reading for Prover<N, C> {
107 type Codec = MessageCodec<N>;
108 type Message = Message<N>;
109
110 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
113 Default::default()
114 }
115
116 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
118 if let Err(error) = self.inbound(peer_addr, message).await
120 && let Some(peer_ip) = self.router().resolve_to_listener(peer_addr)
121 {
122 warn!("Disconnecting from '{peer_addr}' - {error}");
123 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
124 self.router().disconnect(peer_ip);
126 }
127 Ok(())
128 }
129}
130
131#[async_trait]
132impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Prover<N, C> {}
133
134impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Prover<N, C> {
135 fn handle_puzzle_request(&self) {
137 if let Some((sync_peers, _)) = self.sync.find_sync_peers() {
139 if let Some((peer_ip, _)) = sync_peers.into_iter().max_by_key(|(_, height)| *height) {
141 self.router().send(peer_ip, Message::PuzzleRequest(PuzzleRequest));
143 }
144 }
145 }
146}
147
148impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Prover<N, C> {
149 fn router(&self) -> &Router<N> {
151 &self.router
152 }
153
154 fn is_block_synced(&self) -> bool {
156 true
157 }
158
159 fn num_blocks_behind(&self) -> Option<u32> {
162 Some(0)
164 }
165
166 fn get_sync_speed(&self) -> f64 {
168 0.0
169 }
170}
171
172#[async_trait]
173impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Prover<N, C> {
174 fn is_valid_message_version(&self, message_version: u32) -> bool {
176 self.router().is_valid_message_version(message_version)
177 }
178
179 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool {
181 debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation);
182 false
183 }
184
185 fn block_response(
187 &self,
188 peer_ip: SocketAddr,
189 _blocks: Vec<Block<N>>,
190 _latest_consensus_version: Option<ConsensusVersion>,
191 ) -> bool {
192 debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation);
193 false
194 }
195
196 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
198 if let Some(block_locators) = message.block_locators {
200 if let Err(error) = self.sync.update_peer_locators(peer_ip, &block_locators) {
202 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
203 return false;
204 }
205 }
206
207 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
209 true
210 }
211
212 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
214 self.ping.on_pong_received(peer_ip);
215 true
216 }
217
218 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
220 debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation);
221 false
222 }
223
224 fn puzzle_response(&self, peer_ip: SocketAddr, epoch_hash: N::BlockHash, header: Header<N>) -> bool {
226 let block_height = header.height();
228
229 info!(
230 "Puzzle (Block {block_height}, Coinbase Target {}, Proof Target {})",
231 header.coinbase_target(),
232 header.proof_target()
233 );
234
235 self.latest_epoch_hash.write().replace(epoch_hash);
237 self.latest_block_header.write().replace(header);
239
240 trace!("Received 'PuzzleResponse' from '{peer_ip}' (Block {block_height})");
241 true
242 }
243
244 async fn unconfirmed_solution(
246 &self,
247 peer_ip: SocketAddr,
248 serialized: UnconfirmedSolution<N>,
249 solution: Solution<N>,
250 ) -> bool {
251 let epoch_hash = *self.latest_epoch_hash.read();
253 let proof_target = self.latest_block_header.read().as_ref().map(|header| header.proof_target());
255
256 if let (Some(epoch_hash), Some(proof_target)) = (epoch_hash, proof_target) {
257 let puzzle = self.puzzle.clone();
259 let is_valid =
260 tokio::task::spawn_blocking(move || puzzle.check_solution(&solution, epoch_hash, proof_target)).await;
261
262 match is_valid {
263 Ok(Ok(())) => {
265 let message = Message::UnconfirmedSolution(serialized);
266 self.propagate(message, &[peer_ip]);
268 }
269 Ok(Err(_)) => {
270 trace!("Invalid solution '{}' for the proof target.", solution.id())
271 }
272 Err(error) => {
274 if let Some(height) = self.latest_block_header.read().as_ref().map(|header| header.height())
275 && height % N::NUM_BLOCKS_PER_EPOCH > 10
276 {
277 warn!("Failed to verify the solution - {error}")
278 }
279 }
280 }
281 }
282 true
283 }
284
285 async fn unconfirmed_transaction(
287 &self,
288 _peer_ip: SocketAddr,
289 _serialized: UnconfirmedTransaction<N>,
290 _transaction: Transaction<N>,
291 ) -> bool {
292 true
293 }
294}