1#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19 CONTEXT,
20 MAX_BATCH_DELAY_IN_MS,
21 MEMORY_POOL_PORT,
22 Worker,
23 events::{Disconnect as DisconnectEvent, DisconnectReason, EventCodec, PrimaryPing},
24 helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
25 spawn_blocking,
26};
27use snarkos_account::Account;
28use snarkos_node_bft_events::{
29 BlockRequest,
30 BlockResponse,
31 CertificateRequest,
32 CertificateResponse,
33 ChallengeRequest,
34 ChallengeResponse,
35 DataBlocks,
36 Event,
37 EventTrait,
38 TransmissionRequest,
39 TransmissionResponse,
40 ValidatorsRequest,
41 ValidatorsResponse,
42};
43use snarkos_node_bft_ledger_service::LedgerService;
44use snarkos_node_network::{
45 ConnectionMode,
46 NodeType,
47 Peer,
48 PeerPoolHandling,
49 Resolver,
50 bootstrap_peers,
51 get_repo_commit_hash,
52 log_repo_sha_comparison,
53};
54use snarkos_node_sync::{InsertBlockResponseError, MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
55use snarkos_node_tcp::{
56 Config,
57 Connection,
58 ConnectionSide,
59 P2P,
60 Tcp,
61 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
62};
63use snarkos_utilities::NodeDataDir;
64use snarkvm::{
65 console::prelude::*,
66 ledger::{
67 committee::Committee,
68 narwhal::{BatchHeader, Data},
69 },
70 prelude::{Address, Field},
71};
72
73use colored::Colorize;
74use futures::SinkExt;
75use indexmap::IndexMap;
76#[cfg(feature = "locktick")]
77use locktick::parking_lot::{Mutex, RwLock};
78#[cfg(not(feature = "locktick"))]
79use parking_lot::{Mutex, RwLock};
80use rand::{
81 rngs::OsRng,
82 seq::{IteratorRandom, SliceRandom},
83};
84use std::{
85 collections::{HashMap, HashSet},
86 future::Future,
87 io,
88 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
89 sync::Arc,
90 time::{Duration, Instant},
91};
92use tokio::{
93 net::TcpStream,
94 sync::{OnceCell, oneshot},
95 task::{self, JoinHandle},
96};
97use tokio_stream::StreamExt;
98use tokio_util::codec::Framed;
99
100const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; const MAX_CONNECTION_ATTEMPTS: usize = 10;
107const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; pub const MAX_VALIDATORS_TO_SEND: usize = 200;
112
113#[cfg(not(any(test)))]
115const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
116const IP_BAN_TIME_IN_SECS: u64 = 300;
118
119#[async_trait]
122pub trait Transport<N: Network>: Send + Sync {
123 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
124 fn broadcast(&self, event: Event<N>);
125}
126
127#[derive(Clone)]
130pub struct Gateway<N: Network>(Arc<InnerGateway<N>>);
131
132impl<N: Network> Deref for Gateway<N> {
133 type Target = Arc<InnerGateway<N>>;
134
135 fn deref(&self) -> &Self::Target {
136 &self.0
137 }
138}
139
140pub struct InnerGateway<N: Network> {
141 account: Account<N>,
143 storage: Storage<N>,
145 ledger: Arc<dyn LedgerService<N>>,
147 tcp: Tcp,
149 cache: Cache<N>,
151 resolver: RwLock<Resolver<N>>,
153 peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
155 #[cfg(feature = "telemetry")]
156 validator_telemetry: Telemetry<N>,
157 primary_sender: OnceCell<PrimarySender<N>>,
159 worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
161 sync_sender: OnceCell<SyncSender<N>>,
163 handles: Mutex<Vec<JoinHandle<()>>>,
165 node_data_dir: NodeDataDir,
167 trusted_peers_only: bool,
169 dev: Option<u16>,
171}
172
173impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
174 const MAXIMUM_POOL_SIZE: usize = 200;
175 const OWNER: &str = CONTEXT;
176 const PEER_SLASHING_COUNT: usize = 20;
177
178 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
179 &self.peer_pool
180 }
181
182 fn resolver(&self) -> &RwLock<Resolver<N>> {
183 &self.resolver
184 }
185
186 fn is_dev(&self) -> bool {
187 self.dev.is_some()
188 }
189
190 fn trusted_peers_only(&self) -> bool {
191 self.trusted_peers_only
192 }
193
194 fn node_type(&self) -> NodeType {
195 NodeType::Validator
196 }
197}
198
199impl<N: Network> Gateway<N> {
200 #[allow(clippy::too_many_arguments)]
202 pub fn new(
203 account: Account<N>,
204 storage: Storage<N>,
205 ledger: Arc<dyn LedgerService<N>>,
206 ip: Option<SocketAddr>,
207 trusted_validators: &[SocketAddr],
208 trusted_peers_only: bool,
209 node_data_dir: NodeDataDir,
210 dev: Option<u16>,
211 ) -> Result<Self> {
212 let ip = match (ip, dev) {
214 (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
215 (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
216 (Some(ip), _) => ip,
217 };
218 let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));
220
221 let mut initial_peers = HashMap::new();
223
224 if !trusted_peers_only {
226 let cached_peers = Self::load_cached_peers(&node_data_dir.gateway_peer_cache_path())?;
227 for addr in cached_peers {
228 initial_peers.insert(addr, Peer::new_candidate(addr, false));
229 }
230 }
231
232 initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
235
236 Ok(Self(Arc::new(InnerGateway {
238 account,
239 storage,
240 ledger,
241 tcp,
242 cache: Default::default(),
243 resolver: Default::default(),
244 peer_pool: RwLock::new(initial_peers),
245 #[cfg(feature = "telemetry")]
246 validator_telemetry: Default::default(),
247 primary_sender: Default::default(),
248 worker_senders: Default::default(),
249 sync_sender: Default::default(),
250 handles: Default::default(),
251 node_data_dir,
252 trusted_peers_only,
253 dev,
254 })))
255 }
256
257 pub async fn run(
259 &self,
260 primary_sender: PrimarySender<N>,
261 worker_senders: IndexMap<u8, WorkerSender<N>>,
262 sync_sender: Option<SyncSender<N>>,
263 ) {
264 debug!("Starting the gateway for the memory pool...");
265
266 self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");
268
269 self.worker_senders.set(worker_senders).expect("The worker senders are already set");
271
272 if let Some(sync_sender) = sync_sender {
274 self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
275 }
276
277 self.enable_handshake().await;
279 self.enable_reading().await;
280 self.enable_writing().await;
281 self.enable_disconnect().await;
282 self.enable_on_connect().await;
283
284 let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
286 debug!("Listening for validator connections at address {listen_addr:?}");
287
288 self.initialize_heartbeat();
290
291 info!("Started the gateway for the memory pool at '{}'", self.local_ip());
292 }
293}
294
295impl<N: Network> Gateway<N> {
297 fn max_committee_size(&self) -> usize {
299 self.ledger.current_committee().map_or_else(
300 |_e| Committee::<N>::max_committee_size().unwrap() as usize,
301 |committee| committee.num_members(),
302 )
303 }
304
305 fn max_cache_events(&self) -> usize {
307 self.max_cache_transmissions()
308 }
309
310 fn max_cache_certificates(&self) -> usize {
312 2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
313 }
314
315 fn max_cache_transmissions(&self) -> usize {
317 self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
318 }
319
320 fn max_cache_duplicates(&self) -> usize {
322 self.max_committee_size().pow(2)
323 }
324}
325
326#[async_trait]
327impl<N: Network> CommunicationService for Gateway<N> {
328 type Message = Event<N>;
330
331 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
333 debug_assert!(start_height < end_height, "Invalid block request format");
334 Event::BlockRequest(BlockRequest { start_height, end_height })
335 }
336
337 async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
343 Transport::send(self, peer_ip, message).await
344 }
345}
346
347impl<N: Network> Gateway<N> {
348 pub fn account(&self) -> &Account<N> {
350 &self.account
351 }
352
353 pub fn dev(&self) -> Option<u16> {
355 self.dev
356 }
357
358 pub fn resolver(&self) -> &RwLock<Resolver<N>> {
360 &self.resolver
361 }
362
363 pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> {
365 self.resolver.read().get_listener(*connected_addr)
366 }
367
/// Returns the validator telemetry.
#[cfg(feature = "telemetry")]
pub fn validator_telemetry(&self) -> &Telemetry<N> {
    &self.0.validator_telemetry
}
373
374 pub fn primary_sender(&self) -> &PrimarySender<N> {
376 self.primary_sender.get().expect("Primary sender not set in gateway")
377 }
378
379 pub fn num_workers(&self) -> u8 {
381 u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
382 .expect("Too many workers")
383 }
384
385 pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
387 self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
388 }
389
390 pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
392 if self.trusted_peers().contains(&ip) {
394 return true;
395 }
396 match self.resolve_to_aleo_addr(ip) {
398 Some(address) => self.is_authorized_validator_address(address),
400 None => false,
401 }
402 }
403
404 pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
406 if self
415 .ledger
416 .get_committee_lookback_for_round(self.storage.current_round())
417 .is_ok_and(|committee| committee.is_committee_member(validator_address))
418 {
419 return true;
420 }
421
422 if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
424 return true;
425 }
426
427 let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
429 match self.ledger.get_block_round(previous_block_height) {
431 Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
432 self.ledger
433 .get_committee_lookback_for_round(round)
434 .is_ok_and(|committee| committee.is_committee_member(validator_address))
435 }),
436 Err(_) => false,
437 }
438 }
439
440 pub fn connected_addresses(&self) -> HashSet<Address<N>> {
442 self.get_connected_peers().into_iter().map(|peer| peer.aleo_addr).collect()
443 }
444
445 fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
447 if self.is_local_ip(listener_addr) {
449 bail!("{CONTEXT} Dropping connection request from '{listener_addr}' (attempted to self-connect)");
450 }
451 if !listener_addr.ip().is_loopback() {
453 let num_attempts = self.cache.insert_inbound_connection(listener_addr.ip(), RESTRICTED_INTERVAL);
455 if num_attempts > MAX_CONNECTION_ATTEMPTS {
457 bail!("Dropping connection request from '{listener_addr}' (tried {num_attempts} times)");
458 }
459 }
460 Ok(())
461 }
462
/// Publishes the number of connected and connecting peers as gauges.
#[cfg(feature = "metrics")]
fn update_metrics(&self) {
    let num_connected = self.number_of_connected_peers() as f64;
    metrics::gauge(metrics::bft::CONNECTED, num_connected);
    let num_connecting = self.number_of_connecting_peers() as f64;
    metrics::gauge(metrics::bft::CONNECTING, num_connecting);
}
468
/// Inserts the given peer into the resolver and the peer pool as a connected validator (test-only).
///
/// The peer is fully built before it is inserted, so the pool's write lock is taken once
/// and the peer is never observable in the intermediate "connecting" state.
#[cfg(test)]
pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
    // Register the address mapping.
    self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address));
    // Build the connected peer.
    let mut peer = Peer::new_connecting(peer_ip, false);
    peer.upgrade_to_connected(peer_addr, peer_ip.port(), address, NodeType::Validator, 0, ConnectionMode::Gateway);
    // Insert it, replacing any previous entry for this address.
    self.peer_pool.write().insert(peer_ip, peer);
}
487
488 fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
494 let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else {
496 warn!("Unable to resolve the listener IP address '{peer_ip}'");
497 return None;
498 };
499 let name = event.name();
501 trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
503 let result = self.unicast(peer_addr, event);
504 if let Err(e) = &result {
506 warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {e}");
507 debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
508 self.disconnect(peer_ip);
509 }
510 result.ok()
511 }
512
513 async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<bool> {
517 let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else {
519 trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name());
521 return Ok(false);
522 };
523 if !(self.is_authorized_validator_ip(peer_ip)
525 || self
526 .get_connected_peer(peer_ip)
527 .map(|peer| peer.node_type == NodeType::BootstrapClient)
528 .unwrap_or(false))
529 {
530 bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
531 }
532 let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
534 if num_events >= self.max_cache_events() {
535 bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
536 }
537 match event {
539 Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
540 let certificate_id = match &event {
542 Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
543 Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
544 _ => unreachable!(),
545 };
546 let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
548 if num_events >= self.max_cache_duplicates() {
549 return Ok(true);
550 }
551 }
552 Event::TransmissionRequest(TransmissionRequest { transmission_id })
553 | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
554 let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
556 if num_events >= self.max_cache_duplicates() {
557 return Ok(true);
558 }
559 }
560 Event::BlockRequest(_) => {
561 let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
562 if num_events >= self.max_cache_duplicates() {
563 return Ok(true);
564 }
565 }
566 _ => {}
567 }
568 trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());
569
570 match event {
573 Event::BatchPropose(batch_propose) => {
574 let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
576 Ok(true)
577 }
578 Event::BatchSignature(batch_signature) => {
579 let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
581 Ok(true)
582 }
583 Event::BatchCertified(batch_certified) => {
584 let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
586 Ok(true)
587 }
588 Event::BlockRequest(block_request) => {
589 let BlockRequest { start_height, end_height } = block_request;
590
591 if start_height >= end_height {
593 bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
594 }
595 if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
597 bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
598 }
599
600 let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?;
602
603 let self_ = self.clone();
604 let blocks = match task::spawn_blocking(move || {
605 match self_.ledger.get_blocks(start_height..end_height) {
607 Ok(blocks) => Ok(DataBlocks(blocks)),
608 Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
609 }
610 })
611 .await
612 {
613 Ok(Ok(blocks)) => blocks,
614 Ok(Err(error)) => return Err(error),
615 Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
616 };
617
618 let self_ = self.clone();
619 tokio::spawn(async move {
620 let event =
622 Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version));
623 Transport::send(&self_, peer_ip, event).await;
624 });
625 Ok(true)
626 }
627 Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
628 if let Some(sync_sender) = self.sync_sender.get() {
630 if !self.cache.remove_outbound_block_request(peer_ip, &request) {
632 bail!("Unsolicited block response from '{peer_ip}'")
633 }
634
635 let (send, recv) = tokio::sync::oneshot::channel();
639 rayon::spawn_fifo(move || {
640 let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
641 let _ = send.send(blocks);
642 });
643 let blocks = match recv.await {
644 Ok(Ok(blocks)) => blocks,
645 Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
646 Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
647 };
648
649 blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
651 match sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await {
653 Ok(_) => Ok(true),
654 Err(err @ InsertBlockResponseError::EmptyBlockResponse)
655 | Err(err @ InsertBlockResponseError::NoConsensusVersion)
656 | Err(err @ InsertBlockResponseError::ConsensusVersionMismatch { .. }) => {
657 error!("Peer '{peer_ip}' sent an invalid block response - {err}");
658 self.ip_ban_peer(peer_ip, Some(&err.to_string()));
659 Err(err.into())
660 }
661 Err(err) => {
662 warn!("Unable to process block response from '{peer_ip}' - {err}");
663 Err(err.into())
664 }
665 }
666 } else {
667 debug!("Ignoring block response from '{peer_ip}' - no sync sender");
668 Ok(true)
669 }
670 }
671 Event::CertificateRequest(certificate_request) => {
672 if let Some(sync_sender) = self.sync_sender.get() {
675 let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
677 }
678 Ok(true)
679 }
680 Event::CertificateResponse(certificate_response) => {
681 if let Some(sync_sender) = self.sync_sender.get() {
684 let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
686 }
687 Ok(true)
688 }
689 Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
690 bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
692 }
693 Event::Disconnect(message) => {
694 debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
696 self.disconnect(peer_ip);
697 Ok(false)
698 }
699 Event::PrimaryPing(ping) => {
700 let PrimaryPing { version, block_locators, primary_certificate } = ping;
701
702 if version < Event::<N>::VERSION {
704 bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
705 }
706
707 debug!("Validator '{peer_ip}' is at height {}", block_locators.latest_locator_height());
709
710 if let Some(sync_sender) = self.sync_sender.get() {
712 if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
714 bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
715 }
716 }
717
718 let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
720 Ok(true)
721 }
722 Event::TransmissionRequest(request) => {
723 let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
726 warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
727 return Ok(true);
728 };
729 if let Some(sender) = self.get_worker_sender(worker_id) {
731 let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
733 }
734 Ok(true)
735 }
736 Event::TransmissionResponse(response) => {
737 let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
739 warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
740 return Ok(true);
741 };
742 if let Some(sender) = self.get_worker_sender(worker_id) {
744 let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
746 }
747 Ok(true)
748 }
749 Event::ValidatorsRequest(_) => {
750 let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
751 connected_peers.shuffle(&mut rand::thread_rng());
752
753 let self_ = self.clone();
754 tokio::spawn(async move {
755 let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
757 for validator in connected_peers.into_iter() {
759 validators.insert(validator.listener_addr, validator.aleo_addr);
761 }
762 let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
764 Transport::send(&self_, peer_ip, event).await;
765 });
766 Ok(true)
767 }
768 Event::ValidatorsResponse(response) => {
769 if self.trusted_peers_only {
770 bail!("{CONTEXT} Not accepting validators response from '{peer_ip}' (trusted peers only)");
771 }
772 let ValidatorsResponse { validators } = response;
773 ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
775 if !self.cache.contains_outbound_validators_request(peer_ip) {
777 bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
778 }
779 self.cache.decrement_outbound_validators_requests(peer_ip);
781
782 let valid_addrs = validators
785 .into_iter()
786 .filter_map(|(listener_addr, aleo_addr)| {
787 (self.account.address() != aleo_addr
788 && !self.is_connected_address(aleo_addr)
789 && self.is_authorized_validator_address(aleo_addr))
790 .then_some((listener_addr, None))
791 })
792 .collect::<Vec<_>>();
793 if !valid_addrs.is_empty() {
794 self.insert_candidate_peers(valid_addrs);
795 }
796
797 #[cfg(feature = "metrics")]
798 self.update_metrics();
799
800 Ok(true)
801 }
802 Event::WorkerPing(ping) => {
803 ensure!(
805 ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
806 "{CONTEXT} Received too many transmissions"
807 );
808 let num_workers = self.num_workers();
810 for transmission_id in ping.transmission_ids.into_iter() {
812 let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
814 warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
815 continue;
816 };
817 if let Some(sender) = self.get_worker_sender(worker_id) {
819 let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
821 }
822 }
823 Ok(true)
824 }
825 }
826 }
827
828 fn initialize_heartbeat(&self) {
830 let self_clone = self.clone();
831 self.spawn(async move {
832 let start = Instant::now();
833 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
835 info!("Starting the heartbeat of the gateway...");
836 loop {
837 let uptime = start.elapsed();
839 self_clone.heartbeat(uptime).await;
840 tokio::time::sleep(Duration::from_secs(15)).await;
842 }
843 });
844 }
845
846 #[allow(dead_code)]
848 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
849 self.handles.lock().push(tokio::spawn(future));
850 }
851
852 pub async fn shut_down(&self) {
854 info!("Shutting down the gateway...");
855 if let Err(e) = self.save_best_peers(&self.node_data_dir.gateway_peer_cache_path(), None, true) {
857 warn!("Failed to persist best validators to disk: {e}");
858 }
859 self.handles.lock().iter().for_each(|handle| handle.abort());
861 self.tcp.shut_down().await;
863 }
864}
865
866impl<N: Network> Gateway<N> {
/// The uptime after which a missing quorum of connected validators is logged as an error
/// rather than at debug level.
const MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD: Duration = Duration::from_secs(60);
869
870 async fn heartbeat(&self, uptime: Duration) {
872 self.log_connected_validators(uptime);
874 #[cfg(feature = "telemetry")]
876 self.log_participation_scores();
877 self.handle_trusted_validators();
879 self.handle_bootstrap_peers().await;
881 self.handle_unauthorized_validators();
883 self.handle_min_connected_validators();
885 self.handle_banned_ips();
887 self.update_validator_whitelist();
889 }
890
891 fn log_connected_validators(&self, uptime: Duration) {
893 let connected_validators = self.connected_peers();
895 let committee = match self.ledger.current_committee() {
896 Ok(c) => c,
897 Err(err) => {
898 error!("Failed to get current committee: {err}");
899 return;
900 }
901 };
902
903 let validators_total = committee.num_members().saturating_sub(1);
905 let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
907 let connections_msg = match connected_validators.len() {
909 0 => "No connected validators".to_string(),
910 num_connected => format!("Connected to {num_connected} validators {total_validators}"),
911 };
912 info!("{connections_msg}");
913
914 let mut connected_validator_addresses = HashSet::with_capacity(connected_validators.len());
916 connected_validator_addresses.insert(self.account.address());
918 for peer_ip in &connected_validators {
920 let address = self.resolve_to_aleo_addr(*peer_ip).map_or("Unknown".to_string(), |a| {
921 connected_validator_addresses.insert(a);
922 a.to_string()
923 });
924 debug!("{}", format!(" Connected to: {peer_ip} - {address}").dimmed());
925 }
926
927 let num_not_connected = validators_total.saturating_sub(connected_validators.len());
929 if num_not_connected > 0 {
930 let total_stake = committee.total_stake();
932 let total_stake_f64 = total_stake as f64;
933
934 let committee_members: HashSet<_> =
936 self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
937
938 let not_connected_stake: u64 = committee_members
939 .difference(&connected_validator_addresses)
940 .map(|address| {
941 let address_stake = committee.get_stake(*address);
942 let address_stake_as_percentage =
943 if total_stake == 0 { 0.0 } else { address_stake as f64 / total_stake_f64 * 100.0 };
944 debug!(
945 "{}",
946 format!(" Not connected to {address} ({address_stake_as_percentage:.2}% of total stake)")
947 .dimmed()
948 );
949 address_stake
950 })
951 .sum();
952
953 let not_connected_stake_as_percentage =
954 if total_stake == 0 { 0.0 } else { not_connected_stake as f64 / total_stake_f64 * 100.0 };
955 warn!(
956 "Not connected to {num_not_connected} validators {total_validators} ({not_connected_stake_as_percentage:.2}% of total stake not connected)"
957 );
958 #[cfg(feature = "metrics")]
959 {
960 let connected_stake_as_percentage = 100.0 - not_connected_stake_as_percentage;
961 metrics::gauge(metrics::bft::CONNECTED_STAKE, connected_stake_as_percentage);
962 }
963 } else {
964 #[cfg(feature = "metrics")]
965 metrics::gauge(metrics::bft::CONNECTED_STAKE, 100.0);
966 };
967
968 if !committee.is_quorum_threshold_reached(&connected_validator_addresses) {
969 if uptime > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
971 error!("Not connected to a quorum of validators");
972 } else {
973 debug!("Not connected to a quorum of validators");
974 }
975 }
976 }
977
/// Logs the participation score of each validator in the current committee.
/// Nothing is logged if the current committee is unavailable.
#[cfg(feature = "telemetry")]
fn log_participation_scores(&self) {
    if let Ok(current_committee) = self.ledger.current_committee() {
        let participation_scores = self.validator_telemetry().get_participation_scores(&current_committee);
        debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
        for (address, score) in participation_scores {
            debug!("{}", format!("  {address} - {score:.2}%").dimmed());
        }
    }
}
991
992 fn handle_trusted_validators(&self) {
994 for validator_ip in &self.trusted_peers() {
996 self.connect(*validator_ip);
998 }
999 }
1000
1001 async fn handle_bootstrap_peers(&self) {
1003 if self.trusted_peers_only {
1005 return;
1006 }
1007 let mut candidate_bootstrap = Vec::new();
1009 let connected_bootstrap = self.filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
1010 for bootstrap_ip in bootstrap_peers::<N>(self.is_dev()) {
1011 if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
1012 candidate_bootstrap.push(bootstrap_ip);
1013 }
1014 }
1015 if connected_bootstrap.is_empty() {
1017 let rng = &mut OsRng;
1019 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
1021 match self.connect(peer_ip) {
1022 Some(hdl) => {
1023 let result = hdl.await;
1024 if let Err(err) = result {
1025 warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
1026 }
1027 }
1028 None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
1029 }
1030 }
1031 }
1032 let num_surplus = connected_bootstrap.len().saturating_sub(1);
1034 if num_surplus > 0 {
1035 let rng = &mut OsRng;
1037 for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
1039 info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
1040 <Self as Transport<N>>::send(
1041 self,
1042 peer.listener_addr,
1043 Event::Disconnect(DisconnectReason::NoReasonGiven.into()),
1044 )
1045 .await;
1046 self.disconnect(peer.listener_addr);
1048 }
1049 }
1050 }
1051
1052 fn handle_unauthorized_validators(&self) {
1054 let self_ = self.clone();
1055 tokio::spawn(async move {
1056 let validators = self_.get_connected_peers();
1058 for peer in validators {
1060 if peer.node_type == NodeType::BootstrapClient {
1062 continue;
1063 }
1064 if !self_.is_authorized_validator_ip(peer.listener_addr) {
1066 warn!(
1067 "{CONTEXT} Disconnecting from '{}' - Validator is not in the current committee",
1068 peer.listener_addr
1069 );
1070 Transport::send(&self_, peer.listener_addr, DisconnectReason::ProtocolViolation.into()).await;
1071 self_.disconnect(peer.listener_addr);
1073 }
1074 }
1075 });
1076 }
1077
1078 fn handle_min_connected_validators(&self) {
1082 let trusted_validators = self.trusted_peers();
1085 if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
1086 for peer in self.get_candidate_peers() {
1087 if !trusted_validators.contains(&peer.listener_addr) {
1088 self.connect(peer.listener_addr);
1090 }
1091 }
1092
1093 let validators = self.connected_peers();
1095 if validators.is_empty() {
1097 return;
1098 }
1099 if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
1101 let self_ = self.clone();
1102 tokio::spawn(async move {
1103 self_.cache.increment_outbound_validators_requests(validator_ip);
1105 let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1107 });
1108 }
1109 }
1110 }
1111
1112 async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1114 if let Err(error) = self.inbound(peer_addr, message).await {
1116 if let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) {
1117 warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1118 let self_ = self.clone();
1119 tokio::spawn(async move {
1120 Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1121 self_.disconnect(peer_ip);
1123 });
1124 }
1125 }
1126 }
1127
1128 fn handle_banned_ips(&self) {
1130 self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
1131 }
1132
1133 fn update_validator_whitelist(&self) {
1135 if let Err(e) =
1136 self.save_best_peers(&self.node_data_dir.validator_whitelist_path(), Some(MAX_VALIDATORS_TO_SEND), false)
1137 {
1138 warn!("Couldn't update the validator whitelist: {e}");
1139 }
1140 }
1141}
1142
1143#[async_trait]
1144impl<N: Network> Transport<N> for Gateway<N> {
1145 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
1153 macro_rules! send {
1154 ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
1155 while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
1157 tokio::time::sleep(Duration::from_millis(10)).await;
1159 }
1160 $self.send_inner(peer_ip, event)
1162 }};
1163 }
1164
1165 match event {
1167 Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
1168 self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1170 send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
1172 }
1173 Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
1174 self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1176 send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
1178 }
1179 Event::BlockRequest(request) => {
1180 self.cache.insert_outbound_block_request(peer_ip, request);
1182 send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1184 }
1185 _ => {
1186 send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1188 }
1189 }
1190 }
1191
1192 fn broadcast(&self, event: Event<N>) {
1196 if self.number_of_connected_peers() > 0 {
1198 let self_ = self.clone();
1199 let connected_peers = self.connected_peers();
1200 tokio::spawn(async move {
1201 for peer_ip in connected_peers {
1203 let _ = Transport::send(&self_, peer_ip, event.clone()).await;
1205 }
1206 });
1207 }
1208 }
1209}
1210
1211impl<N: Network> P2P for Gateway<N> {
1212 fn tcp(&self) -> &Tcp {
1214 &self.tcp
1215 }
1216}
1217
1218#[async_trait]
1219impl<N: Network> Reading for Gateway<N> {
1220 type Codec = EventCodec<N>;
1221 type Message = Event<N>;
1222
1223 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1226 Default::default()
1227 }
1228
1229 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1231 if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1232 let self_ = self.clone();
1233 tokio::spawn(async move {
1236 self_.process_message_inner(peer_addr, message).await;
1237 });
1238 } else {
1239 self.process_message_inner(peer_addr, message).await;
1240 }
1241 Ok(())
1242 }
1243
1244 fn message_queue_depth(&self) -> usize {
1247 2 * BatchHeader::<N>::MAX_GC_ROUNDS
1248 * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1249 * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1250 }
1251}
1252
1253#[async_trait]
1254impl<N: Network> Writing for Gateway<N> {
1255 type Codec = EventCodec<N>;
1256 type Message = Event<N>;
1257
1258 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1261 Default::default()
1262 }
1263
1264 fn message_queue_depth(&self) -> usize {
1268 2 * BatchHeader::<N>::MAX_GC_ROUNDS
1269 * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1270 * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1271 }
1272}
1273
1274#[async_trait]
1275impl<N: Network> Disconnect for Gateway<N> {
1276 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1278 if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
1279 self.downgrade_peer_to_candidate(peer_ip);
1280 if let Some(sync_sender) = self.sync_sender.get() {
1282 let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
1283 tokio::spawn(async move {
1284 if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
1285 warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
1286 }
1287 });
1288 }
1289 self.cache.clear_outbound_validators_requests(peer_ip);
1293 self.cache.clear_outbound_block_requests(peer_ip);
1294 #[cfg(feature = "metrics")]
1295 self.update_metrics();
1296 }
1297 }
1298}
1299
1300#[async_trait]
1301impl<N: Network> OnConnect for Gateway<N> {
1302 async fn on_connect(&self, peer_addr: SocketAddr) {
1303 if let Some(listener_addr) = self.resolve_to_listener(&peer_addr) {
1304 if let Some(peer) = self.get_connected_peer(listener_addr) {
1305 if peer.node_type == NodeType::BootstrapClient {
1306 let _ =
1307 <Self as Transport<N>>::send(self, listener_addr, Event::ValidatorsRequest(ValidatorsRequest))
1308 .await;
1309 }
1310 }
1311 }
1312 }
1313}
1314
1315#[async_trait]
1316impl<N: Network> Handshake for Gateway<N> {
1317 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
1319 let peer_addr = connection.addr();
1321 let peer_side = connection.side();
1322
1323 #[cfg(not(any(test)))]
1325 if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1326 if self.is_ip_banned(peer_addr.ip()) {
1328 trace!("{CONTEXT} Rejected a connection request from banned IP '{}'", peer_addr.ip());
1329 return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
1330 }
1331
1332 let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1333
1334 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1335 if num_attempts > MAX_CONNECTION_ATTEMPTS {
1336 self.update_ip_ban(peer_addr.ip());
1337 trace!("{CONTEXT} Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1338 return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
1339 }
1340 }
1341
1342 let stream = self.borrow_stream(&mut connection);
1343
1344 let mut listener_addr = if peer_side == ConnectionSide::Initiator {
1347 debug!("{CONTEXT} Received a connection request from '{peer_addr}'");
1348 None
1349 } else {
1350 debug!("{CONTEXT} Shaking hands with {peer_addr}...");
1351 Some(peer_addr)
1352 };
1353
1354 let restrictions_id = self.ledger.latest_restrictions_id();
1356
1357 let handshake_result = if peer_side == ConnectionSide::Responder {
1359 self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await
1360 } else {
1361 self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await
1362 };
1363
1364 if let Some(addr) = listener_addr {
1365 match handshake_result {
1366 Ok(Some(ref cr)) => {
1367 let node_type = if bootstrap_peers::<N>(self.is_dev()).contains(&addr) {
1368 NodeType::BootstrapClient
1369 } else {
1370 NodeType::Validator
1371 };
1372 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1373 self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address));
1374 peer.upgrade_to_connected(
1375 peer_addr,
1376 cr.listener_port,
1377 cr.address,
1378 node_type,
1379 cr.version,
1380 ConnectionMode::Gateway,
1381 );
1382 }
1383 #[cfg(feature = "metrics")]
1384 self.update_metrics();
1385 info!("{CONTEXT} Connected to '{addr}'");
1386 }
1387 Ok(None) => {
1388 return Err(error(format!("Duplicate handshake attempt with '{addr}'")));
1389 }
1390 Err(error) => {
1391 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1392 if peer.is_connecting() {
1394 peer.downgrade_to_candidate(addr);
1395 }
1396 }
1397 return Err(error);
1399 }
1400 }
1401 }
1402
1403 Ok(connection)
1404 }
1405}
1406
/// Reads the next event from `$framed` and evaluates to its payload if it is the `$event_ty` variant.
///
/// Must be used inside an `async` function returning `io::Result<_>`: a read error is
/// propagated with `?`, and the enclosing function returns an error early when the peer sent
/// a disconnect event, sent any other event, or the stream ended before an event arrived.
macro_rules! expect_event {
    ($event_ty:path, $framed:expr, $peer_addr:expr) => {{
        let received = $framed.try_next().await?;
        match received {
            // The expected event arrived; hand its payload to the caller.
            Some($event_ty(payload)) => {
                trace!("{CONTEXT} Received '{}' from '{}'", payload.name(), $peer_addr);
                payload
            }
            // The peer chose to disconnect instead.
            Some(Event::Disconnect(DisconnectEvent { reason })) => {
                return Err(error(format!("{CONTEXT} '{}' disconnected: {reason}", $peer_addr)));
            }
            // The peer sent some other event.
            Some(unexpected) => {
                return Err(error(format!(
                    "{CONTEXT} '{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer_addr,
                    unexpected.name(),
                    stringify!($event_ty),
                )));
            }
            // The stream ended without delivering an event.
            None => {
                return Err(error(format!(
                    "{CONTEXT} the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
                    stringify!($event_ty)
                )));
            }
        }
    }};
}
1439
1440async fn send_event<N: Network>(
1442 framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
1443 peer_addr: SocketAddr,
1444 event: Event<N>,
1445) -> io::Result<()> {
1446 trace!("{CONTEXT} Sending '{}' to '{peer_addr}'", event.name());
1447 framed.send(event).await
1448}
1449
1450impl<N: Network> Gateway<N> {
1451 async fn handshake_inner_initiator<'a>(
1453 &'a self,
1454 peer_addr: SocketAddr,
1455 restrictions_id: Field<N>,
1456 stream: &'a mut TcpStream,
1457 ) -> io::Result<Option<ChallengeRequest<N>>> {
1458 if !self.add_connecting_peer(peer_addr) {
1460 return Ok(None);
1461 }
1462
1463 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1465
1466 let rng = &mut rand::rngs::OsRng;
1468
1469 let our_nonce = rng.r#gen();
1473 let current_block_height = self.ledger.latest_block_height();
1475 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1476 let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
1477 (true, Some(sha)) => Some(sha),
1478 _ => None,
1479 };
1480 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1482 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1483
1484 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1488 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1490
1491 if let Some(reason) = self
1493 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1494 .await
1495 {
1496 send_event(&mut framed, peer_addr, reason.into()).await?;
1497 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason}")));
1498 }
1499 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1501 send_event(&mut framed, peer_addr, reason.into()).await?;
1502 if reason == DisconnectReason::NoReasonGiven {
1503 return Ok(None);
1505 } else {
1506 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason}")));
1507 }
1508 }
1509
1510 let response_nonce: u64 = rng.r#gen();
1514 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1515 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1516 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1517 };
1518 let our_response =
1520 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1521 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1522
1523 Ok(Some(peer_request))
1524 }
1525
1526 async fn handshake_inner_responder<'a>(
1528 &'a self,
1529 peer_addr: SocketAddr,
1530 peer_ip: &mut Option<SocketAddr>,
1531 restrictions_id: Field<N>,
1532 stream: &'a mut TcpStream,
1533 ) -> io::Result<Option<ChallengeRequest<N>>> {
1534 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1536
1537 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1541
1542 if self.account.address() == peer_request.address {
1544 return Err(error("Skipping request to connect to self".to_string()));
1545 }
1546
1547 *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1549 let peer_ip = peer_ip.unwrap();
1550
1551 if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) {
1553 return Err(error(format!("{forbidden_message}")));
1554 }
1555
1556 if !self.add_connecting_peer(peer_ip) {
1558 return Ok(None);
1559 }
1560
1561 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1563 send_event(&mut framed, peer_addr, reason.into()).await?;
1564 if reason == DisconnectReason::NoReasonGiven {
1565 return Ok(None);
1567 } else {
1568 return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
1569 }
1570 }
1571
1572 let rng = &mut rand::rngs::OsRng;
1576
1577 let response_nonce: u64 = rng.r#gen();
1579 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1580 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1581 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1582 };
1583 let our_response =
1585 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1586 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1587
1588 let our_nonce = rng.r#gen();
1590 let current_block_height = self.ledger.latest_block_height();
1592 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1593 let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
1594 (true, Some(sha)) => Some(sha),
1595 _ => None,
1596 };
1597 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1599 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1600
1601 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1605 if let Some(reason) = self
1607 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1608 .await
1609 {
1610 send_event(&mut framed, peer_addr, reason.into()).await?;
1611 return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
1612 }
1613
1614 Ok(Some(peer_request))
1615 }
1616
1617 fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1619 let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event;
1621 log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT);
1622
1623 let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);
1624
1625 if version < Event::<N>::VERSION {
1627 warn!("{CONTEXT} Dropping '{peer_addr}' on version {version} (outdated)");
1628 return Some(DisconnectReason::OutdatedClientVersion);
1629 }
1630 if self.trusted_peers_only && !self.is_trusted(listener_addr) {
1632 warn!("{CONTEXT} Dropping '{peer_addr}' for being an untrusted validator ({address})");
1633 return Some(DisconnectReason::ProtocolViolation);
1634 }
1635 if !bootstrap_peers::<N>(self.dev().is_some()).contains(&listener_addr) {
1636 if !self.is_authorized_validator_address(address) {
1638 warn!("{CONTEXT} Dropping '{peer_addr}' for being an unauthorized validator ({address})");
1639 return Some(DisconnectReason::ProtocolViolation);
1640 }
1641 }
1642 if self.is_connected_address(address) {
1644 warn!("{CONTEXT} Dropping '{peer_addr}' for being already connected ({address})");
1645 return Some(DisconnectReason::NoReasonGiven);
1646 }
1647 None
1648 }
1649
1650 async fn verify_challenge_response(
1652 &self,
1653 peer_addr: SocketAddr,
1654 peer_address: Address<N>,
1655 response: ChallengeResponse<N>,
1656 expected_restrictions_id: Field<N>,
1657 expected_nonce: u64,
1658 ) -> Option<DisconnectReason> {
1659 let ChallengeResponse { restrictions_id, signature, nonce } = response;
1661
1662 if restrictions_id != expected_restrictions_id {
1664 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1665 return Some(DisconnectReason::InvalidChallengeResponse);
1666 }
1667 let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1669 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1670 return Some(DisconnectReason::InvalidChallengeResponse);
1671 };
1672 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1674 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (invalid signature)");
1675 return Some(DisconnectReason::InvalidChallengeResponse);
1676 }
1677 None
1678 }
1679}
1680
1681#[cfg(test)]
1682mod prop_tests {
1683 use crate::{
1684 Gateway,
1685 MAX_WORKERS,
1686 MEMORY_POOL_PORT,
1687 Worker,
1688 gateway::prop_tests::GatewayAddress::{Dev, Prod},
1689 helpers::{Storage, init_primary_channels, init_worker_channels},
1690 };
1691
1692 use snarkos_account::Account;
1693 use snarkos_node_bft_ledger_service::MockLedgerService;
1694 use snarkos_node_bft_storage_service::BFTMemoryService;
1695 use snarkos_node_network::PeerPoolHandling;
1696 use snarkos_node_tcp::P2P;
1697 use snarkos_utilities::NodeDataDir;
1698
1699 use snarkvm::{
1700 ledger::{
1701 committee::{
1702 Committee,
1703 prop_tests::{CommitteeContext, ValidatorSet},
1704 test_helpers::sample_committee_for_round_and_members,
1705 },
1706 narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
1707 },
1708 prelude::{MainnetV0, PrivateKey},
1709 utilities::TestRng,
1710 };
1711
1712 use indexmap::{IndexMap, IndexSet};
1713 use proptest::{
1714 prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
1715 sample::Selector,
1716 };
1717 use std::{
1718 fmt::{Debug, Formatter},
1719 net::{IpAddr, Ipv4Addr, SocketAddr},
1720 sync::Arc,
1721 };
1722 use test_strategy::proptest;
1723
1724 type CurrentNetwork = MainnetV0;
1725
1726 impl Debug for Gateway<CurrentNetwork> {
1727 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1728 f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
1730 }
1731 }
1732
1733 #[derive(Debug, test_strategy::Arbitrary)]
1734 enum GatewayAddress {
1735 Dev(u8),
1736 Prod(Option<SocketAddr>),
1737 }
1738
1739 impl GatewayAddress {
1740 fn ip(&self) -> Option<SocketAddr> {
1741 if let GatewayAddress::Prod(ip) = self {
1742 return *ip;
1743 }
1744 None
1745 }
1746
1747 fn port(&self) -> Option<u16> {
1748 if let GatewayAddress::Dev(port) = self {
1749 return Some(*port as u16);
1750 }
1751 None
1752 }
1753 }
1754
1755 impl Arbitrary for Gateway<CurrentNetwork> {
1756 type Parameters = ();
1757 type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;
1758
1759 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1760 any_valid_dev_gateway()
1761 .prop_map(|(storage, _, private_key, address)| {
1762 Gateway::new(
1763 Account::try_from(private_key).unwrap(),
1764 storage.clone(),
1765 storage.ledger().clone(),
1766 address.ip(),
1767 &[],
1768 false,
1769 NodeDataDir::new_test(None),
1770 address.port(),
1771 )
1772 .unwrap()
1773 })
1774 .boxed()
1775 }
1776 }
1777
1778 type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);
1779
1780 fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
1781 (any::<CommitteeContext>(), any::<Selector>())
1782 .prop_flat_map(|(context, account_selector)| {
1783 let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1784 (
1785 any_with::<Storage<CurrentNetwork>>(context.clone()),
1786 Just(context),
1787 Just(account_selector.select(validators)),
1788 0u8..,
1789 )
1790 .prop_map(|(a, b, c, d)| (a, b, c.private_key, Dev(d)))
1791 })
1792 .boxed()
1793 }
1794
1795 fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
1796 (any::<CommitteeContext>(), any::<Selector>())
1797 .prop_flat_map(|(context, account_selector)| {
1798 let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1799 (
1800 any_with::<Storage<CurrentNetwork>>(context.clone()),
1801 Just(context),
1802 Just(account_selector.select(validators)),
1803 any::<Option<SocketAddr>>(),
1804 )
1805 .prop_map(|(a, b, c, d)| (a, b, c.private_key, Prod(d)))
1806 })
1807 .boxed()
1808 }
1809
1810 #[proptest]
1811 fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1812 let (storage, _, private_key, dev) = input;
1813 let account = Account::try_from(private_key).unwrap();
1814
1815 let gateway = Gateway::new(
1816 account.clone(),
1817 storage.clone(),
1818 storage.ledger().clone(),
1819 dev.ip(),
1820 &[],
1821 false,
1822 NodeDataDir::new_test(None),
1823 dev.port(),
1824 )
1825 .unwrap();
1826 let tcp_config = gateway.tcp().config();
1827 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
1828 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
1829
1830 let tcp_config = gateway.tcp().config();
1831 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1832 assert_eq!(gateway.account().address(), account.address());
1833 }
1834
1835 #[proptest]
1836 fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1837 let (storage, _, private_key, dev) = input;
1838 let account = Account::try_from(private_key).unwrap();
1839
1840 let gateway = Gateway::new(
1841 account.clone(),
1842 storage.clone(),
1843 storage.ledger().clone(),
1844 dev.ip(),
1845 &[],
1846 false,
1847 NodeDataDir::new_test(None),
1848 dev.port(),
1849 )
1850 .unwrap();
1851 let tcp_config = gateway.tcp().config();
1852 if let Some(socket_addr) = dev.ip() {
1853 assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
1854 assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
1855 } else {
1856 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
1857 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
1858 }
1859
1860 let tcp_config = gateway.tcp().config();
1861 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1862 assert_eq!(gateway.account().address(), account.address());
1863 }
1864
1865 #[proptest(async = "tokio")]
1866 async fn gateway_start(
1867 #[strategy(any_valid_dev_gateway())] input: GatewayInput,
1868 #[strategy(0..MAX_WORKERS)] workers_count: u8,
1869 ) {
1870 let (storage, committee, private_key, dev) = input;
1871 let committee = committee.0;
1872 let worker_storage = storage.clone();
1873 let account = Account::try_from(private_key).unwrap();
1874
1875 let gateway = Gateway::new(
1876 account,
1877 storage.clone(),
1878 storage.ledger().clone(),
1879 dev.ip(),
1880 &[],
1881 false,
1882 NodeDataDir::new_test(None),
1883 dev.port(),
1884 )
1885 .unwrap();
1886
1887 let (primary_sender, _) = init_primary_channels();
1888
1889 let (workers, worker_senders) = {
1890 let mut tx_workers = IndexMap::new();
1892 let mut workers = IndexMap::new();
1893
1894 for id in 0..workers_count {
1896 let (tx_worker, rx_worker) = init_worker_channels();
1898 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1900 let worker =
1901 Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
1902 .unwrap();
1903 worker.run(rx_worker);
1905
1906 workers.insert(id, worker);
1908 tx_workers.insert(id, tx_worker);
1909 }
1910 (workers, tx_workers)
1911 };
1912
1913 gateway.run(primary_sender, worker_senders, None).await;
1914 assert_eq!(
1915 gateway.local_ip(),
1916 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
1917 );
1918 assert_eq!(gateway.num_workers(), workers.len() as u8);
1919 }
1920
1921 #[proptest]
1922 fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1923 let rng = &mut TestRng::default();
1924
1925 let current_round = 2;
1927 let committee_size = 4;
1928 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1929 let (_, _, private_key, dev) = input;
1930 let account = Account::try_from(private_key).unwrap();
1931
1932 let mut certificates = IndexSet::new();
1934 for _ in 0..committee_size {
1935 certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1936 }
1937 let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
1938 let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
1940 for _ in 0..committee_size {
1942 certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1943 }
1944 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1946 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1948 let gateway = Gateway::new(
1950 account.clone(),
1951 storage.clone(),
1952 ledger.clone(),
1953 dev.ip(),
1954 &[],
1955 false,
1956 NodeDataDir::new_test(None),
1957 dev.port(),
1958 )
1959 .unwrap();
1960 for certificate in certificates.iter() {
1962 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1963 }
1964 for i in 0..certificates.clone().len() {
1966 let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
1967 if i < committee_size {
1968 assert!(is_authorized);
1969 } else {
1970 assert!(!is_authorized);
1971 }
1972 }
1973 }
1974}