snarkos_node/bootstrap_client/
network.rs1use crate::{
17 BootstrapClient,
18 bft::{
19 MAX_VALIDATORS_TO_SEND,
20 events::{self, Event},
21 },
22 bootstrap_client::codec::BootstrapClientCodec,
23 router::{
24 MAX_PEERS_TO_SEND,
25 Peer,
26 PeerPoolHandling,
27 Resolver,
28 messages::{self, Message},
29 },
30 tcp::{ConnectionSide, P2P, Tcp, protocols::*},
31};
32use snarkvm::prelude::Network;
33
34use indexmap::IndexMap;
35#[cfg(feature = "locktick")]
36use locktick::parking_lot::RwLock;
37#[cfg(not(feature = "locktick"))]
38use parking_lot::RwLock;
39use std::{collections::HashMap, io, net::SocketAddr};
40use tokio::time::sleep;
41use tokio_util::codec::Decoder;
42
43impl<N: Network> P2P for BootstrapClient<N> {
44 fn tcp(&self) -> &Tcp {
45 &self.tcp
46 }
47}
48
49impl<N: Network> PeerPoolHandling<N> for BootstrapClient<N> {
50 const MAXIMUM_POOL_SIZE: usize = 10_000;
51 const OWNER: &'static str = "[Network]";
52 const PEER_SLASHING_COUNT: usize = 200;
53
54 fn is_dev(&self) -> bool {
55 self.dev.is_some()
56 }
57
58 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
59 &self.peer_pool
60 }
61
62 fn resolver(&self) -> &RwLock<Resolver<N>> {
63 &self.resolver
64 }
65}
66
67#[derive(Debug)]
69pub enum MessageOrEvent<N: Network> {
70 Message(Message<N>),
71 Event(Event<N>),
72}
73
74#[async_trait]
75impl<N: Network> OnConnect for BootstrapClient<N> {
76 async fn on_connect(&self, peer_addr: SocketAddr) {
77 if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
80 if let Some(peer) = self.get_connected_peer(listener_addr) {
81 if self.resolver().read().get_peer_ip_for_address(peer.aleo_addr).is_some() {
82 self.known_validators.write().insert(listener_addr, peer.aleo_addr);
83 }
84 }
85 }
86 let tcp = self.tcp().clone();
89 tokio::spawn(async move {
90 sleep(Self::CONNECTION_LIFETIME).await;
91 tcp.disconnect(peer_addr).await;
92 });
93 }
94}
95
96#[async_trait]
97impl<N: Network> Disconnect for BootstrapClient<N> {
98 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
100 if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
101 self.downgrade_peer_to_candidate(listener_addr);
102 }
103 }
104}
105
106#[async_trait]
107impl<N: Network> Reading for BootstrapClient<N> {
108 type Codec = BootstrapClientCodec<N>;
109 type Message = <BootstrapClientCodec<N> as Decoder>::Item;
110
111 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
114 Default::default()
115 }
116
117 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
119 let Some(listener_addr) = self.resolve_to_listener(peer_addr) else {
121 return Ok(());
123 };
124
125 match message {
127 MessageOrEvent::Message(Message::PeerRequest(_)) => {
128 debug!("Received a PeerRequest from '{listener_addr}'");
129 let mut peers = self.get_candidate_peers();
130
131 let known_validators = self.get_known_validators();
133 peers.retain(|peer| !known_validators.contains_key(&peer.listener_addr));
134 peers.truncate(MAX_PEERS_TO_SEND);
135 let peers = peers.into_iter().map(|peer| (peer.listener_addr, None)).collect::<Vec<_>>();
136
137 debug!("Sending {} peer address(es) to '{listener_addr}'", peers.len());
138 let msg = MessageOrEvent::Message(Message::PeerResponse(messages::PeerResponse { peers }));
139 if let Err(err) = self.unicast(peer_addr, msg)?.await {
140 warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
141 } else {
142 debug!("Disconnecting from '{listener_addr}' - peers provided");
143 }
144
145 self.tcp().disconnect(peer_addr).await;
146 }
147 MessageOrEvent::Event(Event::ValidatorsRequest(_)) => {
148 debug!("Received a ValidatorsRequest from '{listener_addr}'");
149 let current_committee = match self.get_or_update_committee().await {
150 Ok(new_committee) => new_committee,
151 Err(error) => {
152 error!("Couldn't update the validator committee: {error}");
153 None
154 }
155 };
156
157 let mut known_validators = self.get_known_validators();
160 if let Some(committee) = ¤t_committee {
161 known_validators.retain(|_, aleo_addr| committee.contains(aleo_addr));
162 }
163 let validators = known_validators.into_iter().take(MAX_VALIDATORS_TO_SEND).collect::<IndexMap<_, _>>();
164
165 debug!("Sending {} validator address(es) to '{listener_addr}'", validators.len());
166 let msg = MessageOrEvent::Event(Event::ValidatorsResponse(events::ValidatorsResponse { validators }));
167 if let Err(err) = self.unicast(peer_addr, msg)?.await {
168 warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
169 } else {
170 debug!("Disconnecting from '{listener_addr}' - peers provided");
171 }
172
173 self.tcp().disconnect(peer_addr).await;
174 }
175 msg => {
176 let name = match msg {
177 MessageOrEvent::Message(msg) => msg.name(),
178 MessageOrEvent::Event(msg) => msg.name(),
179 };
180 trace!("Ignoring an unhandled message ({name}) from {listener_addr}");
181 }
182 }
183
184 Ok(())
185 }
186}
187
188#[async_trait]
189impl<N: Network> Writing for BootstrapClient<N> {
190 type Codec = BootstrapClientCodec<N>;
191 type Message = MessageOrEvent<N>;
192
193 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
196 Default::default()
197 }
198}