snarkos_node/bootstrap_client/
network.rs1use crate::{
17 BootstrapClient,
18 bft::{
19 MAX_VALIDATORS_TO_SEND,
20 events::{self, Event},
21 },
22 bootstrap_client::codec::BootstrapClientCodec,
23 network::{ConnectionMode, NodeType, Peer, PeerPoolHandling, Resolver},
24 router::{
25 MAX_PEERS_TO_SEND,
26 messages::{self, Message},
27 },
28 tcp::{ConnectionSide, P2P, Tcp, protocols::*},
29};
30use snarkvm::prelude::Network;
31
32use indexmap::IndexMap;
33#[cfg(feature = "locktick")]
34use locktick::parking_lot::RwLock;
35#[cfg(not(feature = "locktick"))]
36use parking_lot::RwLock;
37use std::{collections::HashMap, io, net::SocketAddr};
38use tokio::time::sleep;
39use tokio_util::codec::Decoder;
40
41impl<N: Network> P2P for BootstrapClient<N> {
42 fn tcp(&self) -> &Tcp {
43 &self.tcp
44 }
45}
46
47impl<N: Network> PeerPoolHandling<N> for BootstrapClient<N> {
48 const MAXIMUM_POOL_SIZE: usize = 10_000;
49 const OWNER: &'static str = "[Network]";
50 const PEER_SLASHING_COUNT: usize = 200;
51
52 fn is_dev(&self) -> bool {
53 self.dev.is_some()
54 }
55
56 fn trusted_peers_only(&self) -> bool {
57 false
58 }
59
60 fn node_type(&self) -> NodeType {
61 NodeType::BootstrapClient
62 }
63
64 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
65 &self.peer_pool
66 }
67
68 fn resolver(&self) -> &RwLock<Resolver<N>> {
69 &self.resolver
70 }
71}
72
73#[derive(Debug)]
75pub enum MessageOrEvent<N: Network> {
76 Message(Message<N>),
77 Event(Event<N>),
78}
79
80#[async_trait]
81impl<N: Network> OnConnect for BootstrapClient<N> {
82 async fn on_connect(&self, peer_addr: SocketAddr) {
83 if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
86 if let Some(peer) = self.get_connected_peer(listener_addr) {
87 if peer.node_type == NodeType::Validator {
88 self.known_validators.write().insert(listener_addr, (peer.aleo_addr, peer.connection_mode));
89 }
90 }
91 }
92 let tcp = self.tcp().clone();
95 tokio::spawn(async move {
96 sleep(Self::CONNECTION_LIFETIME).await;
97 tcp.disconnect(peer_addr).await;
98 });
99 }
100}
101
102#[async_trait]
103impl<N: Network> Disconnect for BootstrapClient<N> {
104 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
106 if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
107 self.downgrade_peer_to_candidate(listener_addr);
108 }
109 }
110}
111
112#[async_trait]
113impl<N: Network> Reading for BootstrapClient<N> {
114 type Codec = BootstrapClientCodec<N>;
115 type Message = <BootstrapClientCodec<N> as Decoder>::Item;
116
117 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
120 Default::default()
121 }
122
123 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
125 let Some(listener_addr) = self.resolve_to_listener(peer_addr) else {
127 return Ok(());
129 };
130
131 match message {
133 MessageOrEvent::Message(Message::PeerRequest(_)) => {
134 debug!("Received a PeerRequest from '{listener_addr}'");
135 let mut peers = self.get_candidate_peers();
136
137 let Some(peer) = self.get_connected_peer(listener_addr) else {
140 return Ok(());
141 };
142 let validators = self.get_validator_addrs().await;
143
144 if peer.node_type == NodeType::Validator {
145 peers.retain(|peer| {
147 validators
148 .get(&peer.listener_addr)
149 .map(|(_, connection_mode)| *connection_mode != ConnectionMode::Gateway)
150 .unwrap_or(true)
151 });
152 } else {
153 peers.retain(|peer| !validators.contains_key(&peer.listener_addr));
155 }
156 peers.truncate(MAX_PEERS_TO_SEND);
157 let peers = peers.into_iter().map(|peer| (peer.listener_addr, None)).collect::<Vec<_>>();
158
159 debug!("Sending {} peer address(es) to '{listener_addr}'", peers.len());
160 let msg = MessageOrEvent::Message(Message::PeerResponse(messages::PeerResponse { peers }));
161 if let Err(err) = self.unicast(peer_addr, msg)?.await {
162 warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
163 } else {
164 debug!("Disconnecting from '{listener_addr}' - peers provided");
165 }
166
167 self.tcp().disconnect(peer_addr).await;
168 }
169 MessageOrEvent::Event(Event::ValidatorsRequest(_)) => {
170 debug!("Received a ValidatorsRequest from '{listener_addr}'");
171
172 let validators = self.get_validator_addrs().await;
174 let validators = validators
175 .into_iter()
176 .filter_map(|(listener_addr, (aleo_addr, connection_mode))| {
177 (connection_mode == ConnectionMode::Gateway).then_some((listener_addr, aleo_addr))
179 })
180 .take(MAX_VALIDATORS_TO_SEND)
181 .collect::<IndexMap<_, _>>();
182
183 debug!("Sending {} validator address(es) to '{listener_addr}'", validators.len());
184 let msg = MessageOrEvent::Event(Event::ValidatorsResponse(events::ValidatorsResponse { validators }));
185 if let Err(err) = self.unicast(peer_addr, msg)?.await {
186 warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
187 } else {
188 debug!("Disconnecting from '{listener_addr}' - peers provided");
189 }
190
191 self.tcp().disconnect(peer_addr).await;
192 }
193 msg => {
194 let name = match msg {
195 MessageOrEvent::Message(msg) => msg.name(),
196 MessageOrEvent::Event(msg) => msg.name(),
197 };
198 trace!("Ignoring an unhandled message ({name}) from {listener_addr}");
199 }
200 }
201
202 Ok(())
203 }
204}
205
206#[async_trait]
207impl<N: Network> Writing for BootstrapClient<N> {
208 type Codec = BootstrapClientCodec<N>;
209 type Message = MessageOrEvent<N>;
210
211 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
214 Default::default()
215 }
216}