1use super::*;
17
18use snarkos_node_router::messages::{
19 BlockRequest,
20 DisconnectReason,
21 Message,
22 MessageCodec,
23 Ping,
24 Pong,
25 PuzzleRequest,
26 UnconfirmedTransaction,
27};
28use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
29use snarkvm::prelude::{ConsensusVersion, Field, Network, Zero, block::Transaction};
30
31use std::{io, net::SocketAddr};
32
33impl<N: Network, C: ConsensusStorage<N>> P2P for Prover<N, C> {
34 fn tcp(&self) -> &Tcp {
36 self.router.tcp()
37 }
38}
39
40#[async_trait]
41impl<N: Network, C: ConsensusStorage<N>> Handshake for Prover<N, C> {
42 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
44 let peer_addr = connection.addr();
46 let conn_side = connection.side();
47 let stream = self.borrow_stream(&mut connection);
48 let genesis_header = *self.genesis.header();
49 let restrictions_id = Field::zero(); self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
51
52 Ok(connection)
53 }
54}
55
56#[async_trait]
57impl<N: Network, C: ConsensusStorage<N>> OnConnect for Prover<N, C>
58where
59 Self: Outbound<N>,
60{
61 async fn on_connect(&self, peer_addr: SocketAddr) {
62 if let Some(listener_addr) = self.router().resolve_to_listener(peer_addr) {
64 if let Some(peer) = self.router().get_connected_peer(listener_addr) {
65 if peer.node_type != NodeType::BootstrapClient {
66 self.ping.on_peer_connected(listener_addr);
68 }
69 }
70 }
71 }
72}
73
74#[async_trait]
75impl<N: Network, C: ConsensusStorage<N>> Disconnect for Prover<N, C> {
76 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
78 if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) {
79 self.sync.remove_peer(&peer_ip);
80 self.router.downgrade_peer_to_candidate(peer_ip);
81 self.router.cache().clear_peer_entries(peer_ip);
83 #[cfg(feature = "metrics")]
84 self.router.update_metrics();
85 }
86 }
87}
88
89#[async_trait]
90impl<N: Network, C: ConsensusStorage<N>> Reading for Prover<N, C> {
91 type Codec = MessageCodec<N>;
92 type Message = Message<N>;
93
94 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
97 Default::default()
98 }
99
100 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
102 if let Err(error) = self.inbound(peer_addr, message).await {
104 if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) {
105 warn!("Disconnecting from '{peer_addr}' - {error}");
106 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
107 self.router().disconnect(peer_ip);
109 }
110 }
111 Ok(())
112 }
113}
114
115#[async_trait]
116impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Prover<N, C> {}
117
118impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Prover<N, C> {
119 fn handle_puzzle_request(&self) {
121 if let Some((sync_peers, _)) = self.sync.find_sync_peers() {
123 if let Some((peer_ip, _)) = sync_peers.into_iter().max_by_key(|(_, height)| *height) {
125 self.router().send(peer_ip, Message::PuzzleRequest(PuzzleRequest));
127 }
128 }
129 }
130}
131
132impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Prover<N, C> {
133 fn router(&self) -> &Router<N> {
135 &self.router
136 }
137
138 fn is_block_synced(&self) -> bool {
140 true
141 }
142
143 fn num_blocks_behind(&self) -> Option<u32> {
146 Some(0)
148 }
149
150 fn get_sync_speed(&self) -> f64 {
152 0.0
153 }
154}
155
156#[async_trait]
157impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Prover<N, C> {
158 fn is_valid_message_version(&self, message_version: u32) -> bool {
160 self.router().is_valid_message_version(message_version)
161 }
162
163 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool {
165 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
166 false
167 }
168
169 fn block_response(
171 &self,
172 peer_ip: SocketAddr,
173 _blocks: Vec<Block<N>>,
174 _latest_consensus_version: Option<ConsensusVersion>,
175 ) -> bool {
176 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
177 false
178 }
179
180 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
182 if let Some(block_locators) = message.block_locators {
184 if let Err(error) = self.sync.update_peer_locators(peer_ip, &block_locators) {
186 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
187 return false;
188 }
189 }
190
191 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
193 true
194 }
195
196 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
198 self.ping.on_pong_received(peer_ip);
199 true
200 }
201
202 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
204 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
205 false
206 }
207
208 fn puzzle_response(&self, peer_ip: SocketAddr, epoch_hash: N::BlockHash, header: Header<N>) -> bool {
210 let block_height = header.height();
212
213 info!(
214 "Puzzle (Block {block_height}, Coinbase Target {}, Proof Target {})",
215 header.coinbase_target(),
216 header.proof_target()
217 );
218
219 self.latest_epoch_hash.write().replace(epoch_hash);
221 self.latest_block_header.write().replace(header);
223
224 trace!("Received 'PuzzleResponse' from '{peer_ip}' (Block {block_height})");
225 true
226 }
227
228 async fn unconfirmed_solution(
230 &self,
231 peer_ip: SocketAddr,
232 serialized: UnconfirmedSolution<N>,
233 solution: Solution<N>,
234 ) -> bool {
235 let epoch_hash = *self.latest_epoch_hash.read();
237 let proof_target = self.latest_block_header.read().as_ref().map(|header| header.proof_target());
239
240 if let (Some(epoch_hash), Some(proof_target)) = (epoch_hash, proof_target) {
241 let puzzle = self.puzzle.clone();
243 let is_valid =
244 tokio::task::spawn_blocking(move || puzzle.check_solution(&solution, epoch_hash, proof_target)).await;
245
246 match is_valid {
247 Ok(Ok(())) => {
249 let message = Message::UnconfirmedSolution(serialized);
250 self.propagate(message, &[peer_ip]);
252 }
253 Ok(Err(_)) => {
254 trace!("Invalid solution '{}' for the proof target.", solution.id())
255 }
256 Err(error) => {
258 if let Some(height) = self.latest_block_header.read().as_ref().map(|header| header.height()) {
259 if height % N::NUM_BLOCKS_PER_EPOCH > 10 {
260 warn!("Failed to verify the solution - {error}")
261 }
262 }
263 }
264 }
265 }
266 true
267 }
268
269 async fn unconfirmed_transaction(
271 &self,
272 _peer_ip: SocketAddr,
273 _serialized: UnconfirmedTransaction<N>,
274 _transaction: Transaction<N>,
275 ) -> bool {
276 true
277 }
278}