1use super::*;
17
18use snarkos_node_router::messages::{
19 BlockRequest,
20 DisconnectReason,
21 Message,
22 MessageCodec,
23 Ping,
24 Pong,
25 PuzzleRequest,
26 UnconfirmedTransaction,
27};
28use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
29use snarkvm::prelude::{Field, Network, Zero, block::Transaction};
30
31use std::{io, net::SocketAddr};
32
33impl<N: Network, C: ConsensusStorage<N>> P2P for Prover<N, C> {
34 fn tcp(&self) -> &Tcp {
36 self.router.tcp()
37 }
38}
39
40#[async_trait]
41impl<N: Network, C: ConsensusStorage<N>> Handshake for Prover<N, C> {
42 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
44 let peer_addr = connection.addr();
46 let conn_side = connection.side();
47 let stream = self.borrow_stream(&mut connection);
48 let genesis_header = *self.genesis.header();
49 let restrictions_id = Field::zero(); self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
51
52 Ok(connection)
53 }
54}
55
56#[async_trait]
57impl<N: Network, C: ConsensusStorage<N>> OnConnect for Prover<N, C>
58where
59 Self: Outbound<N>,
60{
61 async fn on_connect(&self, peer_addr: SocketAddr) {
62 let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return };
64 self.router().insert_connected_peer(peer_ip);
66 self.ping.on_peer_connected(peer_ip);
68 }
69}
70
71#[async_trait]
72impl<N: Network, C: ConsensusStorage<N>> Disconnect for Prover<N, C> {
73 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
75 if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) {
76 self.sync.remove_peer(&peer_ip);
77 self.router.remove_connected_peer(peer_ip);
78 }
79 }
80}
81
82#[async_trait]
83impl<N: Network, C: ConsensusStorage<N>> Reading for Prover<N, C> {
84 type Codec = MessageCodec<N>;
85 type Message = Message<N>;
86
87 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
90 Default::default()
91 }
92
93 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
95 if let Err(error) = self.inbound(peer_addr, message).await {
97 if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) {
98 warn!("Disconnecting from '{peer_addr}' - {error}");
99 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
100 self.router().disconnect(peer_ip);
102 }
103 }
104 Ok(())
105 }
106}
107
108#[async_trait]
109impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Prover<N, C> {}
110
111impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Prover<N, C> {
112 fn handle_puzzle_request(&self) {
114 if let Some((sync_peers, _)) = self.sync.find_sync_peers() {
116 if let Some((peer_ip, _)) = sync_peers.into_iter().max_by_key(|(_, height)| *height) {
118 self.router().send(peer_ip, Message::PuzzleRequest(PuzzleRequest));
120 }
121 }
122 }
123}
124
125impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Prover<N, C> {
126 fn router(&self) -> &Router<N> {
128 &self.router
129 }
130
131 fn is_block_synced(&self) -> bool {
133 true
134 }
135
136 fn num_blocks_behind(&self) -> Option<u32> {
139 Some(0)
141 }
142}
143
144#[async_trait]
145impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Prover<N, C> {
146 fn is_valid_message_version(&self, message_version: u32) -> bool {
148 self.router().is_valid_message_version(message_version)
149 }
150
151 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool {
153 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
154 false
155 }
156
157 fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool {
159 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
160 false
161 }
162
163 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
165 if let Some(block_locators) = message.block_locators {
167 if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) {
169 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
170 return false;
171 }
172 }
173
174 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
176 true
177 }
178
179 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
181 self.ping.on_pong_received(peer_ip);
182 true
183 }
184
185 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
187 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
188 false
189 }
190
191 fn puzzle_response(&self, peer_ip: SocketAddr, epoch_hash: N::BlockHash, header: Header<N>) -> bool {
193 let block_height = header.height();
195
196 info!(
197 "Puzzle (Block {block_height}, Coinbase Target {}, Proof Target {})",
198 header.coinbase_target(),
199 header.proof_target()
200 );
201
202 self.latest_epoch_hash.write().replace(epoch_hash);
204 self.latest_block_header.write().replace(header);
206
207 trace!("Received 'PuzzleResponse' from '{peer_ip}' (Block {block_height})");
208 true
209 }
210
211 async fn unconfirmed_solution(
213 &self,
214 peer_ip: SocketAddr,
215 serialized: UnconfirmedSolution<N>,
216 solution: Solution<N>,
217 ) -> bool {
218 let epoch_hash = *self.latest_epoch_hash.read();
220 let proof_target = self.latest_block_header.read().as_ref().map(|header| header.proof_target());
222
223 if let (Some(epoch_hash), Some(proof_target)) = (epoch_hash, proof_target) {
224 let puzzle = self.puzzle.clone();
226 let is_valid =
227 tokio::task::spawn_blocking(move || puzzle.check_solution(&solution, epoch_hash, proof_target)).await;
228
229 match is_valid {
230 Ok(Ok(())) => {
232 let message = Message::UnconfirmedSolution(serialized);
233 self.propagate(message, &[peer_ip]);
235 }
236 Ok(Err(_)) => {
237 trace!("Invalid solution '{}' for the proof target.", solution.id())
238 }
239 Err(error) => {
241 if let Some(height) = self.latest_block_header.read().as_ref().map(|header| header.height()) {
242 if height % N::NUM_BLOCKS_PER_EPOCH > 10 {
243 warn!("Failed to verify the solution - {error}")
244 }
245 }
246 }
247 }
248 }
249 true
250 }
251
252 async fn unconfirmed_transaction(
254 &self,
255 _peer_ip: SocketAddr,
256 _serialized: UnconfirmedTransaction<N>,
257 _transaction: Transaction<N>,
258 ) -> bool {
259 true
260 }
261}