1use super::*;
17
18use snarkos_node_network::harden_socket;
19use snarkos_node_router::messages::{
20 BlockRequest,
21 DisconnectReason,
22 Message,
23 MessageCodec,
24 Ping,
25 Pong,
26 PuzzleRequest,
27 UnconfirmedTransaction,
28};
29use snarkos_node_tcp::{ConnectError, Connection, ConnectionSide, Tcp};
30use snarkvm::{
31 console::network::{ConsensusVersion, Network},
32 ledger::block::Transaction,
33 prelude::{Field, Zero},
34 utilities::into_io_error,
35};
36
37use std::{io, net::SocketAddr};
38
39impl<N: Network, C: ConsensusStorage<N>> P2P for Prover<N, C> {
40 fn tcp(&self) -> &Tcp {
42 self.router.tcp()
43 }
44}
45
46#[async_trait]
47impl<N: Network, C: ConsensusStorage<N>> Handshake for Prover<N, C> {
48 async fn perform_handshake(&self, mut connection: Connection) -> Result<Connection, ConnectError> {
50 let peer_addr = connection.addr();
52 let conn_side = connection.side();
53 let stream = self.borrow_stream(&mut connection);
54 harden_socket(stream)?;
56 let genesis_header = *self.genesis.header();
57 let restrictions_id = Field::zero(); self.router
60 .handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id)
61 .await
62 .map_err(into_io_error)?;
63
64 Ok(connection)
65 }
66}
67
68#[async_trait]
69impl<N: Network, C: ConsensusStorage<N>> OnConnect for Prover<N, C>
70where
71 Self: Outbound<N>,
72{
73 async fn on_connect(&self, peer_addr: SocketAddr) {
74 if let Some(listener_addr) = self.router().resolve_to_listener(peer_addr)
76 && let Some(peer) = self.router().get_connected_peer(listener_addr)
77 && peer.node_type != NodeType::BootstrapClient
78 {
79 self.ping.on_peer_connected(listener_addr);
81 }
82 }
83}
84
85#[async_trait]
86impl<N: Network, C: ConsensusStorage<N>> Disconnect for Prover<N, C> {
87 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
89 if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) {
90 self.sync.remove_peer(&peer_ip);
91 self.router.downgrade_peer_to_candidate(peer_ip);
92 self.router.cache().clear_peer_entries(peer_ip);
94 #[cfg(feature = "metrics")]
95 self.router.update_metrics();
96 }
97 }
98}
99
100#[async_trait]
101impl<N: Network, C: ConsensusStorage<N>> Reading for Prover<N, C> {
102 type Codec = MessageCodec<N>;
103 type Message = Message<N>;
104
105 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
108 Default::default()
109 }
110
111 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
113 if let Err(error) = self.inbound(peer_addr, message).await
115 && let Some(peer_ip) = self.router().resolve_to_listener(peer_addr)
116 {
117 warn!("Disconnecting from '{peer_addr}' - {error}");
118 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
119 self.router().disconnect(peer_ip);
121 }
122 Ok(())
123 }
124}
125
126#[async_trait]
127impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Prover<N, C> {}
128
129impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Prover<N, C> {
130 fn handle_puzzle_request(&self) {
132 if let Some((sync_peers, _)) = self.sync.find_sync_peers() {
134 if let Some((peer_ip, _)) = sync_peers.into_iter().max_by_key(|(_, height)| *height) {
136 self.router().send(peer_ip, Message::PuzzleRequest(PuzzleRequest));
138 }
139 }
140 }
141}
142
143impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Prover<N, C> {
144 fn router(&self) -> &Router<N> {
146 &self.router
147 }
148
149 fn is_block_synced(&self) -> bool {
151 true
152 }
153
154 fn num_blocks_behind(&self) -> Option<u32> {
157 Some(0)
159 }
160
161 fn get_sync_speed(&self) -> f64 {
163 0.0
164 }
165}
166
167#[async_trait]
168impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Prover<N, C> {
169 fn is_valid_message_version(&self, message_version: u32) -> bool {
171 self.router().is_valid_message_version(message_version)
172 }
173
174 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool {
176 debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation);
177 false
178 }
179
180 fn block_response(
182 &self,
183 peer_ip: SocketAddr,
184 _blocks: Vec<Block<N>>,
185 _latest_consensus_version: Option<ConsensusVersion>,
186 ) -> bool {
187 debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation);
188 false
189 }
190
191 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
193 if let Some(block_locators) = message.block_locators {
195 if let Err(error) = self.sync.update_peer_locators(peer_ip, &block_locators) {
197 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
198 return false;
199 }
200 }
201
202 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
204 true
205 }
206
207 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
209 self.ping.on_pong_received(peer_ip);
210 true
211 }
212
213 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
215 debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation);
216 false
217 }
218
219 fn puzzle_response(&self, peer_ip: SocketAddr, epoch_hash: N::BlockHash, header: Header<N>) -> bool {
221 let block_height = header.height();
223
224 info!(
225 "Puzzle (Block {block_height}, Coinbase Target {}, Proof Target {})",
226 header.coinbase_target(),
227 header.proof_target()
228 );
229
230 self.latest_epoch_hash.write().replace(epoch_hash);
232 self.latest_block_header.write().replace(header);
234
235 trace!("Received 'PuzzleResponse' from '{peer_ip}' (Block {block_height})");
236 true
237 }
238
239 async fn unconfirmed_solution(
241 &self,
242 peer_ip: SocketAddr,
243 serialized: UnconfirmedSolution<N>,
244 solution: Solution<N>,
245 ) -> bool {
246 let epoch_hash = *self.latest_epoch_hash.read();
248 let proof_target = self.latest_block_header.read().as_ref().map(|header| header.proof_target());
250
251 if let (Some(epoch_hash), Some(proof_target)) = (epoch_hash, proof_target) {
252 let puzzle = self.puzzle.clone();
254 let is_valid =
255 tokio::task::spawn_blocking(move || puzzle.check_solution(&solution, epoch_hash, proof_target)).await;
256
257 match is_valid {
258 Ok(Ok(())) => {
260 let message = Message::UnconfirmedSolution(serialized);
261 self.propagate(message, &[peer_ip]);
263 }
264 Ok(Err(_)) => {
265 trace!("Invalid solution '{}' for the proof target.", solution.id())
266 }
267 Err(error) => {
269 if let Some(height) = self.latest_block_header.read().as_ref().map(|header| header.height())
270 && height % N::NUM_BLOCKS_PER_EPOCH > 10
271 {
272 warn!("Failed to verify the solution - {error}")
273 }
274 }
275 }
276 }
277 true
278 }
279
280 async fn unconfirmed_transaction(
282 &self,
283 _peer_ip: SocketAddr,
284 _serialized: UnconfirmedTransaction<N>,
285 _transaction: Transaction<N>,
286 ) -> bool {
287 true
288 }
289}