1use super::*;
17
18use snarkos_node_router::messages::{
19 BlockRequest,
20 DisconnectReason,
21 Message,
22 MessageCodec,
23 Ping,
24 Pong,
25 PuzzleRequest,
26 UnconfirmedTransaction,
27};
28use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
29use snarkvm::prelude::{Field, Network, Zero, block::Transaction};
30
31use std::{io, net::SocketAddr};
32
33impl<N: Network, C: ConsensusStorage<N>> P2P for Prover<N, C> {
34 fn tcp(&self) -> &Tcp {
36 self.router.tcp()
37 }
38}
39
40#[async_trait]
41impl<N: Network, C: ConsensusStorage<N>> Handshake for Prover<N, C> {
42 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
44 let peer_addr = connection.addr();
46 let conn_side = connection.side();
47 let stream = self.borrow_stream(&mut connection);
48 let genesis_header = *self.genesis.header();
49 let restrictions_id = Field::zero(); self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
51
52 Ok(connection)
53 }
54}
55
56#[async_trait]
57impl<N: Network, C: ConsensusStorage<N>> OnConnect for Prover<N, C>
58where
59 Self: Outbound<N>,
60{
61 async fn on_connect(&self, peer_addr: SocketAddr) {
62 let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return };
64 self.router().insert_connected_peer(peer_ip);
66 self.send_ping(peer_ip, None);
68 }
69}
70
71#[async_trait]
72impl<N: Network, C: ConsensusStorage<N>> Disconnect for Prover<N, C> {
73 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
75 if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) {
76 self.sync.remove_peer(&peer_ip);
77 self.router.remove_connected_peer(peer_ip);
78 }
79 }
80}
81
82#[async_trait]
83impl<N: Network, C: ConsensusStorage<N>> Writing for Prover<N, C> {
84 type Codec = MessageCodec<N>;
85 type Message = Message<N>;
86
87 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
90 Default::default()
91 }
92}
93
94#[async_trait]
95impl<N: Network, C: ConsensusStorage<N>> Reading for Prover<N, C> {
96 type Codec = MessageCodec<N>;
97 type Message = Message<N>;
98
99 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
102 Default::default()
103 }
104
105 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
107 if let Err(error) = self.inbound(peer_addr, message).await {
109 if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) {
110 warn!("Disconnecting from '{peer_addr}' - {error}");
111 Outbound::send(self, peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
112 self.router().disconnect(peer_ip);
114 }
115 }
116 Ok(())
117 }
118}
119
120#[async_trait]
121impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Prover<N, C> {}
122
123impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Prover<N, C> {
124 fn handle_puzzle_request(&self) {
126 if let Some((sync_peers, _)) = self.sync.find_sync_peers() {
128 if let Some((peer_ip, _)) = sync_peers.into_iter().max_by_key(|(_, height)| *height) {
130 Outbound::send(self, peer_ip, Message::PuzzleRequest(PuzzleRequest));
132 }
133 }
134 }
135}
136
137impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Prover<N, C> {
138 fn router(&self) -> &Router<N> {
140 &self.router
141 }
142
143 fn is_block_synced(&self) -> bool {
145 true
146 }
147
148 fn num_blocks_behind(&self) -> u32 {
150 0
151 }
152}
153
154#[async_trait]
155impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Prover<N, C> {
156 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool {
158 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
159 false
160 }
161
162 fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool {
164 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
165 false
166 }
167
168 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool {
170 if self.sync.mode().is_router() {
172 if let Some(block_locators) = message.block_locators {
174 if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) {
176 warn!("Peer '{peer_ip}' sent invalid block locators: {error}");
177 return false;
178 }
179 }
180 }
181
182 Outbound::send(self, peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
184 true
185 }
186
187 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
189 let self_clone = self.clone();
191 tokio::spawn(async move {
192 tokio::time::sleep(Duration::from_secs(Self::PING_SLEEP_IN_SECS)).await;
194 if self_clone.router().is_connected(&peer_ip) {
196 self_clone.send_ping(peer_ip, None);
198 }
199 });
200 true
201 }
202
203 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
205 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
206 false
207 }
208
209 fn puzzle_response(&self, peer_ip: SocketAddr, epoch_hash: N::BlockHash, header: Header<N>) -> bool {
211 let block_height = header.height();
213
214 info!(
215 "Puzzle (Block {block_height}, Coinbase Target {}, Proof Target {})",
216 header.coinbase_target(),
217 header.proof_target()
218 );
219
220 self.latest_epoch_hash.write().replace(epoch_hash);
222 self.latest_block_header.write().replace(header);
224
225 trace!("Received 'PuzzleResponse' from '{peer_ip}' (Block {block_height})");
226 true
227 }
228
229 async fn unconfirmed_solution(
231 &self,
232 peer_ip: SocketAddr,
233 serialized: UnconfirmedSolution<N>,
234 solution: Solution<N>,
235 ) -> bool {
236 let epoch_hash = *self.latest_epoch_hash.read();
238 let proof_target = self.latest_block_header.read().as_ref().map(|header| header.proof_target());
240
241 if let (Some(epoch_hash), Some(proof_target)) = (epoch_hash, proof_target) {
242 let puzzle = self.puzzle.clone();
244 let is_valid =
245 tokio::task::spawn_blocking(move || puzzle.check_solution(&solution, epoch_hash, proof_target)).await;
246
247 match is_valid {
248 Ok(Ok(())) => {
250 let message = Message::UnconfirmedSolution(serialized);
251 self.propagate(message, &[peer_ip]);
253 }
254 Ok(Err(_)) => {
255 trace!("Invalid solution '{}' for the proof target.", solution.id())
256 }
257 Err(error) => {
259 if let Some(height) = self.latest_block_header.read().as_ref().map(|header| header.height()) {
260 if height % N::NUM_BLOCKS_PER_EPOCH > 10 {
261 warn!("Failed to verify the solution - {error}")
262 }
263 }
264 }
265 }
266 }
267 true
268 }
269
270 async fn unconfirmed_transaction(
272 &self,
273 _peer_ip: SocketAddr,
274 _serialized: UnconfirmedTransaction<N>,
275 _transaction: Transaction<N>,
276 ) -> bool {
277 true
278 }
279}