1use super::*;
17
18use snarkos_node_router::messages::{
19 BlockRequest,
20 DisconnectReason,
21 Message,
22 MessageCodec,
23 Ping,
24 Pong,
25 PuzzleRequest,
26 UnconfirmedTransaction,
27};
28use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
29use snarkvm::prelude::{Field, Network, Zero, block::Transaction};
30
31use std::{io, net::SocketAddr};
32
33impl<N: Network, C: ConsensusStorage<N>> P2P for Prover<N, C> {
34 fn tcp(&self) -> &Tcp {
36 self.router.tcp()
37 }
38}
39
40#[async_trait]
41impl<N: Network, C: ConsensusStorage<N>> Handshake for Prover<N, C> {
42 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
44 let peer_addr = connection.addr();
46 let conn_side = connection.side();
47 let stream = self.borrow_stream(&mut connection);
48 let genesis_header = *self.genesis.header();
49 let restrictions_id = Field::zero(); self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
51
52 Ok(connection)
53 }
54}
55
56#[async_trait]
57impl<N: Network, C: ConsensusStorage<N>> OnConnect for Prover<N, C>
58where
59 Self: Outbound<N>,
60{
61 async fn on_connect(&self, peer_addr: SocketAddr) {
62 let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return };
64 self.router().insert_connected_peer(peer_ip);
66 self.send_ping(peer_ip, None);
68 }
69}
70
71#[async_trait]
72impl<N: Network, C: ConsensusStorage<N>> Disconnect for Prover<N, C> {
73 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
75 if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) {
76 self.sync.remove_peer(&peer_ip);
77 self.router.remove_connected_peer(peer_ip);
78 }
79 }
80}
81
82#[async_trait]
83impl<N: Network, C: ConsensusStorage<N>> Writing for Prover<N, C> {
84 type Codec = MessageCodec<N>;
85 type Message = Message<N>;
86
87 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
90 Default::default()
91 }
92}
93
94#[async_trait]
95impl<N: Network, C: ConsensusStorage<N>> Reading for Prover<N, C> {
96 type Codec = MessageCodec<N>;
97 type Message = Message<N>;
98
99 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
102 Default::default()
103 }
104
105 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
107 if let Err(error) = self.inbound(peer_addr, message).await {
109 if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) {
110 warn!("Disconnecting from '{peer_addr}' - {error}");
111 Outbound::send(self, peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
112 self.router().disconnect(peer_ip);
114 }
115 }
116 Ok(())
117 }
118}
119
120#[async_trait]
121impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Prover<N, C> {}
122
123impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Prover<N, C> {
124 fn handle_puzzle_request(&self) {
126 if let Some((sync_peers, _)) = self.sync.find_sync_peers() {
128 if let Some((peer_ip, _)) = sync_peers.into_iter().max_by_key(|(_, height)| *height) {
130 Outbound::send(self, peer_ip, Message::PuzzleRequest(PuzzleRequest));
132 }
133 }
134 }
135}
136
137impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Prover<N, C> {
138 fn router(&self) -> &Router<N> {
140 &self.router
141 }
142
143 fn is_block_synced(&self) -> bool {
145 true
146 }
147
148 fn num_blocks_behind(&self) -> u32 {
150 0
151 }
152}
153
154#[async_trait]
155impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Prover<N, C> {
156 fn is_valid_message_version(&self, message_version: u32) -> bool {
158 self.router().is_valid_message_version(message_version)
159 }
160
161 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool {
163 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
164 false
165 }
166
167 fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool {
169 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
170 false
171 }
172
173 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
175 if self.sync.mode().is_router() {
177 if let Some(block_locators) = message.block_locators {
179 if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) {
181 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
182 return false;
183 }
184 }
185 }
186
187 Outbound::send(self, peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
189 true
190 }
191
192 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
194 let self_clone = self.clone();
196 tokio::spawn(async move {
197 tokio::time::sleep(Duration::from_secs(Self::PING_SLEEP_IN_SECS)).await;
199 if self_clone.router().is_connected(&peer_ip) {
201 self_clone.send_ping(peer_ip, None);
203 }
204 });
205 true
206 }
207
208 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
210 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
211 false
212 }
213
214 fn puzzle_response(&self, peer_ip: SocketAddr, epoch_hash: N::BlockHash, header: Header<N>) -> bool {
216 let block_height = header.height();
218
219 info!(
220 "Puzzle (Block {block_height}, Coinbase Target {}, Proof Target {})",
221 header.coinbase_target(),
222 header.proof_target()
223 );
224
225 self.latest_epoch_hash.write().replace(epoch_hash);
227 self.latest_block_header.write().replace(header);
229
230 trace!("Received 'PuzzleResponse' from '{peer_ip}' (Block {block_height})");
231 true
232 }
233
234 async fn unconfirmed_solution(
236 &self,
237 peer_ip: SocketAddr,
238 serialized: UnconfirmedSolution<N>,
239 solution: Solution<N>,
240 ) -> bool {
241 let epoch_hash = *self.latest_epoch_hash.read();
243 let proof_target = self.latest_block_header.read().as_ref().map(|header| header.proof_target());
245
246 if let (Some(epoch_hash), Some(proof_target)) = (epoch_hash, proof_target) {
247 let puzzle = self.puzzle.clone();
249 let is_valid =
250 tokio::task::spawn_blocking(move || puzzle.check_solution(&solution, epoch_hash, proof_target)).await;
251
252 match is_valid {
253 Ok(Ok(())) => {
255 let message = Message::UnconfirmedSolution(serialized);
256 self.propagate(message, &[peer_ip]);
258 }
259 Ok(Err(_)) => {
260 trace!("Invalid solution '{}' for the proof target.", solution.id())
261 }
262 Err(error) => {
264 if let Some(height) = self.latest_block_header.read().as_ref().map(|header| header.height()) {
265 if height % N::NUM_BLOCKS_PER_EPOCH > 10 {
266 warn!("Failed to verify the solution - {error}")
267 }
268 }
269 }
270 }
271 }
272 true
273 }
274
275 async fn unconfirmed_transaction(
277 &self,
278 _peer_ip: SocketAddr,
279 _serialized: UnconfirmedTransaction<N>,
280 _transaction: Transaction<N>,
281 ) -> bool {
282 true
283 }
284}