snarkos_node/bootstrap_client/
network.rs1use crate::{
17 BootstrapClient,
18 bft::{
19 MAX_VALIDATORS_TO_SEND,
20 events::{self, Event},
21 },
22 bootstrap_client::codec::BootstrapClientCodec,
23 network::{ConnectionMode, NodeType, Peer, PeerPoolHandling, Resolver},
24 router::{
25 MAX_PEERS_TO_SEND,
26 messages::{self, Message},
27 },
28 tcp::{ConnectionSide, P2P, Tcp, protocols::*},
29};
30use snarkvm::prelude::Network;
31
32use indexmap::IndexMap;
33#[cfg(feature = "locktick")]
34use locktick::parking_lot::RwLock;
35#[cfg(not(feature = "locktick"))]
36use parking_lot::RwLock;
37use std::{collections::HashMap, io, net::SocketAddr};
38use tokio::time::sleep;
39use tokio_util::codec::Decoder;
40
41impl<N: Network> P2P for BootstrapClient<N> {
42 fn tcp(&self) -> &Tcp {
43 &self.tcp
44 }
45}
46
47impl<N: Network> PeerPoolHandling<N> for BootstrapClient<N> {
48 const MAXIMUM_POOL_SIZE: usize = 10_000;
49 const OWNER: &'static str = "[Network]";
50 const PEER_SLASHING_COUNT: usize = 200;
51
52 fn is_dev(&self) -> bool {
53 self.dev.is_some()
54 }
55
56 fn trusted_peers_only(&self) -> bool {
57 false
58 }
59
60 fn node_type(&self) -> NodeType {
61 NodeType::BootstrapClient
62 }
63
64 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
65 &self.peer_pool
66 }
67
68 fn resolver(&self) -> &RwLock<Resolver<N>> {
69 &self.resolver
70 }
71}
72
73#[derive(Debug)]
75pub enum MessageOrEvent<N: Network> {
76 Message(Message<N>),
77 Event(Event<N>),
78}
79
80#[async_trait]
81impl<N: Network> OnConnect for BootstrapClient<N> {
82 async fn on_connect(&self, peer_addr: SocketAddr) {
83 if let Some(listener_addr) = self.resolve_to_listener(peer_addr)
86 && let Some(peer) = self.get_connected_peer(listener_addr)
87 && peer.node_type == NodeType::Validator
88 {
89 self.known_validators.write().insert(listener_addr, (peer.aleo_addr, peer.connection_mode));
90 }
91 let tcp = self.tcp().clone();
94 tokio::spawn(async move {
95 sleep(Self::CONNECTION_LIFETIME).await;
96 tcp.disconnect(peer_addr).await;
97 });
98 }
99}
100
101#[async_trait]
102impl<N: Network> Disconnect for BootstrapClient<N> {
103 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
105 if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
106 self.downgrade_peer_to_candidate(listener_addr);
107 }
108 }
109}
110
111#[async_trait]
112impl<N: Network> Reading for BootstrapClient<N> {
113 type Codec = BootstrapClientCodec<N>;
114 type Message = <BootstrapClientCodec<N> as Decoder>::Item;
115
116 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
119 Default::default()
120 }
121
122 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
124 let Some(listener_addr) = self.resolve_to_listener(peer_addr) else {
126 return Ok(());
128 };
129
130 match message {
132 MessageOrEvent::Message(Message::PeerRequest(_)) => {
133 debug!("Received a PeerRequest from '{listener_addr}'");
134 let mut peers = self.get_candidate_peers();
135
136 let Some(peer) = self.get_connected_peer(listener_addr) else {
139 return Ok(());
140 };
141 let validators = self.get_validator_addrs().await;
142
143 if peer.node_type == NodeType::Validator {
144 peers.retain(|peer| {
146 validators
147 .get(&peer.listener_addr)
148 .map(|(_, connection_mode)| *connection_mode != ConnectionMode::Gateway)
149 .unwrap_or(true)
150 });
151 } else {
152 peers.retain(|peer| !validators.contains_key(&peer.listener_addr));
154 }
155 peers.truncate(MAX_PEERS_TO_SEND);
156 let peers = peers.into_iter().map(|peer| (peer.listener_addr, None)).collect::<Vec<_>>();
157
158 debug!("Sending {} peer address(es) to '{listener_addr}'", peers.len());
159 let msg = MessageOrEvent::Message(Message::PeerResponse(messages::PeerResponse { peers }));
160 if let Err(err) = self.unicast(peer_addr, msg)?.await {
161 warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
162 } else {
163 debug!("Disconnecting from '{listener_addr}' - peers provided");
164 }
165
166 self.tcp().disconnect(peer_addr).await;
167 }
168 MessageOrEvent::Event(Event::ValidatorsRequest(_)) => {
169 debug!("Received a ValidatorsRequest from '{listener_addr}'");
170
171 let validators = self.get_validator_addrs().await;
173 let validators = validators
174 .into_iter()
175 .filter_map(|(listener_addr, (aleo_addr, connection_mode))| {
176 (connection_mode == ConnectionMode::Gateway).then_some((listener_addr, aleo_addr))
178 })
179 .take(MAX_VALIDATORS_TO_SEND)
180 .collect::<IndexMap<_, _>>();
181
182 debug!("Sending {} validator address(es) to '{listener_addr}'", validators.len());
183 let msg = MessageOrEvent::Event(Event::ValidatorsResponse(events::ValidatorsResponse { validators }));
184 if let Err(err) = self.unicast(peer_addr, msg)?.await {
185 warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
186 } else {
187 debug!("Disconnecting from '{listener_addr}' - peers provided");
188 }
189
190 self.tcp().disconnect(peer_addr).await;
191 }
192 msg => {
193 let name = match msg {
194 MessageOrEvent::Message(msg) => msg.name(),
195 MessageOrEvent::Event(msg) => msg.name(),
196 };
197 trace!("Ignoring an unhandled message ({name}) from {listener_addr}");
198 }
199 }
200
201 Ok(())
202 }
203}
204
205#[async_trait]
206impl<N: Network> Writing for BootstrapClient<N> {
207 type Codec = BootstrapClientCodec<N>;
208 type Message = MessageOrEvent<N>;
209
210 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
213 Default::default()
214 }
215}