1use super::*;
17use snarkos_node_network::PeerPoolHandling;
18use snarkos_node_router::messages::{
19 BlockRequest,
20 BlockResponse,
21 DataBlocks,
22 DisconnectReason,
23 Message,
24 MessageCodec,
25 Ping,
26 Pong,
27 UnconfirmedTransaction,
28};
29use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
30use snarkvm::{
31 console::network::{ConsensusVersion, Network},
32 ledger::{block::Transaction, narwhal::Data},
33 utilities::{error, log_error},
34};
35
36use std::{io, net::SocketAddr};
37
38impl<N: Network, C: ConsensusStorage<N>> P2P for Validator<N, C> {
39 fn tcp(&self) -> &Tcp {
41 self.router.tcp()
42 }
43}
44
45#[async_trait]
46impl<N: Network, C: ConsensusStorage<N>> Handshake for Validator<N, C> {
47 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
49 let peer_addr = connection.addr();
51 let conn_side = connection.side();
52 let stream = self.borrow_stream(&mut connection);
53 let genesis_header = self.ledger.get_header(0).map_err(|e| error(format!("{e}")))?;
54 let restrictions_id = self.ledger.vm().restrictions().restrictions_id();
55 self.router.handshake(peer_addr, stream, conn_side, genesis_header, restrictions_id).await?;
56
57 Ok(connection)
58 }
59}
60
61#[async_trait]
62impl<N: Network, C: ConsensusStorage<N>> OnConnect for Validator<N, C>
63where
64 Self: Outbound<N>,
65{
66 async fn on_connect(&self, peer_addr: SocketAddr) {
67 if let Some(listener_addr) = self.router().resolve_to_listener(peer_addr) {
69 if let Some(peer) = self.router().get_connected_peer(listener_addr) {
70 if peer.node_type != NodeType::BootstrapClient {
71 self.ping.on_peer_connected(listener_addr);
73 }
74 }
75 }
76 }
77}
78
79#[async_trait]
80impl<N: Network, C: ConsensusStorage<N>> Disconnect for Validator<N, C> {
81 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
83 if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) {
84 self.sync.remove_peer(&peer_ip);
85 self.router.downgrade_peer_to_candidate(peer_ip);
86 self.router.cache().clear_peer_entries(peer_ip);
88 #[cfg(feature = "metrics")]
89 self.router.update_metrics();
90 }
91 }
92}
93
94#[async_trait]
95impl<N: Network, C: ConsensusStorage<N>> Reading for Validator<N, C> {
96 type Codec = MessageCodec<N>;
97 type Message = Message<N>;
98
99 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
102 Default::default()
103 }
104
105 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
107 let clone = self.clone();
108 if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) {
109 tokio::spawn(async move {
112 clone.process_message_inner(peer_addr, message).await;
113 });
114 } else {
115 self.process_message_inner(peer_addr, message).await;
116 }
117 Ok(())
118 }
119}
120
121impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
122 async fn process_message_inner(
123 &self,
124 peer_addr: SocketAddr,
125 message: <Validator<N, C> as snarkos_node_tcp::protocols::Reading>::Message,
126 ) {
127 if let Err(error) = self.inbound(peer_addr, message).await {
129 warn!("Failed to process inbound message from '{peer_addr}' - {error}");
130 if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) {
131 warn!("Disconnecting from '{peer_ip}' for protocol violation");
132 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into()));
133 self.router().disconnect(peer_ip);
135 }
136 }
137 }
138}
139
140#[async_trait]
141impl<N: Network, C: ConsensusStorage<N>> Routing<N> for Validator<N, C> {}
142
143impl<N: Network, C: ConsensusStorage<N>> Heartbeat<N> for Validator<N, C> {}
144
145impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Validator<N, C> {
146 fn router(&self) -> &Router<N> {
148 &self.router
149 }
150
151 fn is_block_synced(&self) -> bool {
153 self.sync.is_block_synced()
154 }
155
156 fn num_blocks_behind(&self) -> Option<u32> {
159 self.sync.num_blocks_behind()
160 }
161
162 fn get_sync_speed(&self) -> f64 {
164 self.sync.get_sync_speed()
165 }
166}
167
168#[async_trait]
169impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Validator<N, C> {
170 fn is_valid_message_version(&self, message_version: u32) -> bool {
172 self.router().is_valid_message_version(message_version)
173 }
174
175 fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool {
177 let BlockRequest { start_height, end_height } = &message;
178
179 let latest_consensus_version = match N::CONSENSUS_VERSION(end_height.saturating_sub(1)) {
181 Ok(version) => version,
182 Err(err) => {
183 log_error(err.context("Failed to retrieve consensus version"));
184 return false;
185 }
186 };
187
188 let blocks = match self.ledger.get_blocks(*start_height..*end_height) {
190 Ok(blocks) => DataBlocks(blocks),
191 Err(error) => {
192 error!("Failed to retrieve blocks {start_height} to {end_height} from the ledger - {error}");
193 return false;
194 }
195 };
196 self.router()
198 .send(peer_ip, Message::BlockResponse(BlockResponse::new(message, blocks, latest_consensus_version)));
199 true
200 }
201
202 fn block_response(
204 &self,
205 peer_ip: SocketAddr,
206 _blocks: Vec<Block<N>>,
207 _latest_consensus_version: Option<ConsensusVersion>,
208 ) -> bool {
209 warn!("Received a block response through P2P, not BFT, from {peer_ip}");
210 false
211 }
212
213 fn ping(&self, peer_ip: SocketAddr, _message: Ping<N>) -> bool {
215 self.router().send(peer_ip, Message::Pong(Pong { is_fork: Some(false) }));
220 true
221 }
222
223 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool {
225 self.ping.on_pong_received(peer_ip);
226 true
227 }
228
229 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool {
231 let epoch_hash = match self.ledger.latest_epoch_hash() {
233 Ok(epoch_hash) => epoch_hash,
234 Err(error) => {
235 error!("Failed to prepare a puzzle request for '{peer_ip}': {error}");
236 return false;
237 }
238 };
239 let block_header = Data::Object(self.ledger.latest_header());
241 self.router().send(peer_ip, Message::PuzzleResponse(PuzzleResponse { epoch_hash, block_header }));
243 true
244 }
245
246 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool {
248 debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
249 false
250 }
251
252 async fn unconfirmed_solution(
254 &self,
255 peer_ip: SocketAddr,
256 serialized: UnconfirmedSolution<N>,
257 solution: Solution<N>,
258 ) -> bool {
259 if let Err(error) = self.consensus.add_unconfirmed_solution(solution).await {
261 trace!("[UnconfirmedSolution] {error}");
262 return true; }
264 let message = Message::UnconfirmedSolution(serialized);
265 self.propagate_to_validators(message, &[peer_ip]);
267 true
268 }
269
270 async fn unconfirmed_transaction(
272 &self,
273 peer_ip: SocketAddr,
274 serialized: UnconfirmedTransaction<N>,
275 transaction: Transaction<N>,
276 ) -> bool {
277 if let Err(error) = self.consensus.add_unconfirmed_transaction(transaction).await {
279 trace!("[UnconfirmedTransaction] {error}");
280 return true; }
282 let message = Message::UnconfirmedTransaction(serialized);
283 self.propagate_to_validators(message, &[peer_ip]);
285 true
286 }
287}