1#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19 CONTEXT,
20 MAX_BATCH_DELAY,
21 MEMORY_POOL_PORT,
22 Worker,
23 events::{DisconnectReason, EventCodec, PrimaryPing},
24 helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
25 spawn_blocking,
26};
27use smol_str::SmolStr;
28use snarkos_account::Account;
29use snarkos_node_bft_events::{
30 BlockRequest,
31 BlockResponse,
32 CertificateRequest,
33 CertificateResponse,
34 ChallengeRequest,
35 ChallengeResponse,
36 DataBlocks,
37 Event,
38 EventTrait,
39 TransmissionRequest,
40 TransmissionResponse,
41 ValidatorsRequest,
42 ValidatorsResponse,
43};
44use snarkos_node_bft_ledger_service::LedgerService;
45use snarkos_node_network::{
46 ConnectionMode,
47 NodeType,
48 Peer,
49 PeerPoolHandling,
50 Resolver,
51 bootstrap_peers,
52 get_repo_commit_hash,
53 log_repo_sha_comparison,
54 shorten_snarkos_sha,
55};
56use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
57use snarkos_node_tcp::{
58 Config,
59 ConnectError,
60 Connection,
61 ConnectionSide,
62 P2P,
63 Tcp,
64 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
65};
66use snarkos_utilities::NodeDataDir;
67use snarkvm::{
68 console::prelude::*,
69 ledger::{
70 committee::Committee,
71 narwhal::{BatchHeader, Data},
72 },
73 prelude::{Address, Field},
74 utilities::flatten_error,
75};
76
77use colored::Colorize;
78use futures::{SinkExt, future::join_all};
79use indexmap::IndexMap;
80#[cfg(feature = "locktick")]
81use locktick::parking_lot::{Mutex, RwLock};
82#[cfg(not(feature = "locktick"))]
83use parking_lot::{Mutex, RwLock};
84use rand::seq::{IteratorRandom, SliceRandom};
85use std::{
86 collections::{HashMap, HashSet},
87 future::Future,
88 io,
89 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
90 sync::Arc,
91 time::Duration,
92};
93use tokio::{
94 net::TcpStream,
95 sync::{OnceCell, oneshot},
96 task::{self, JoinHandle},
97};
98use tokio_stream::StreamExt;
99use tokio_util::codec::Framed;
100
101const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; #[cfg(not(test))]
108const MAX_CONNECTION_ATTEMPTS: usize = 10;
109
110pub const MAX_VALIDATORS_TO_SEND: usize = 200;
112
113#[cfg(not(test))]
115const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
116
117const IP_BAN_TIME_IN_SECS: u64 = 300;
119
/// The sending side of the gateway's networking API.
#[async_trait]
pub trait Transport<N: Network>: Send + Sync {
    /// Sends the given event to the peer with the given listener IP.
    ///
    /// Returns a receiver for the outcome of the write, or `None` if the event could not be
    /// handed off (NOTE(review): the implementation is outside this view; confirm exact semantics).
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
    /// Broadcasts the given event to the connected peers.
    fn broadcast(&self, event: Event<N>);
}
127
/// The gateway manages the validator's connections to other validators (and bootstrap peers).
///
/// It is a cheaply clonable handle: clones share the same [`InnerGateway`] through an `Arc`.
#[derive(Clone)]
pub struct Gateway<N: Network>(Arc<InnerGateway<N>>);
132
133impl<N: Network> Deref for Gateway<N> {
134 type Target = Arc<InnerGateway<N>>;
135
136 fn deref(&self) -> &Self::Target {
137 &self.0
138 }
139}
140
/// The shared state behind a [`Gateway`] handle.
pub struct InnerGateway<N: Network> {
    /// The account of the node.
    account: Account<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The TCP stack.
    tcp: Tcp,
    /// The cache of inbound/outbound events and requests, used for rate-limiting and deduplication.
    cache: Cache<N>,
    /// The resolver relating peers' listener, connected, and Aleo addresses.
    resolver: RwLock<Resolver<N>>,
    /// The pool of known peers (candidate, connecting, and connected), keyed by listener address.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The validator telemetry.
    #[cfg(feature = "telemetry")]
    validator_telemetry: Telemetry<N>,
    /// The primary sender (set once, in `Gateway::run`).
    primary_sender: OnceCell<PrimarySender<N>>,
    /// The worker senders, keyed by worker ID (set once, in `Gateway::run`).
    worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
    /// The sync sender (optionally set once, in `Gateway::run`).
    sync_sender: OnceCell<SyncSender<N>>,
    /// The handles of spawned tasks; aborted on shutdown.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// The node's data directory (provides the gateway peer cache path).
    node_data_dir: NodeDataDir,
    /// If `true`, cached peers, bootstrap peers, and peers learned from validators responses are ignored.
    trusted_peers_only: bool,
    /// The dev identifier, if running in development mode.
    dev: Option<u16>,
}
173
174impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
175 const MAXIMUM_POOL_SIZE: usize = 200;
176 const OWNER: &str = CONTEXT;
177 const PEER_SLASHING_COUNT: usize = 20;
178
179 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
180 &self.peer_pool
181 }
182
183 fn resolver(&self) -> &RwLock<Resolver<N>> {
184 &self.resolver
185 }
186
187 fn is_dev(&self) -> bool {
188 self.dev.is_some()
189 }
190
191 fn trusted_peers_only(&self) -> bool {
192 self.trusted_peers_only
193 }
194
195 fn node_type(&self) -> NodeType {
196 NodeType::Validator
197 }
198}
199
200impl<N: Network> Gateway<N> {
201 #[allow(clippy::too_many_arguments)]
203 pub fn new(
204 account: Account<N>,
205 storage: Storage<N>,
206 ledger: Arc<dyn LedgerService<N>>,
207 ip: Option<SocketAddr>,
208 trusted_validators: &[SocketAddr],
209 trusted_peers_only: bool,
210 node_data_dir: NodeDataDir,
211 dev: Option<u16>,
212 ) -> Result<Self> {
213 let ip = match (ip, dev) {
215 (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
216 (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
217 (Some(ip), _) => ip,
218 };
219 let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size() * 10));
227
228 let mut initial_peers = HashMap::new();
230
231 if !trusted_peers_only {
233 let cached_peers = Self::load_cached_peers(&node_data_dir.gateway_peer_cache_path())?;
234 for addr in cached_peers {
235 initial_peers.insert(addr, Peer::new_candidate(addr, false));
236 }
237 }
238
239 initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
242
243 Ok(Self(Arc::new(InnerGateway {
245 account,
246 storage,
247 ledger,
248 tcp,
249 cache: Default::default(),
250 resolver: Default::default(),
251 peer_pool: RwLock::new(initial_peers),
252 #[cfg(feature = "telemetry")]
253 validator_telemetry: Default::default(),
254 primary_sender: Default::default(),
255 worker_senders: Default::default(),
256 sync_sender: Default::default(),
257 handles: Default::default(),
258 node_data_dir,
259 trusted_peers_only,
260 dev,
261 })))
262 }
263
264 pub async fn run(
266 &self,
267 primary_sender: PrimarySender<N>,
268 worker_senders: IndexMap<u8, WorkerSender<N>>,
269 sync_sender: Option<SyncSender<N>>,
270 ) {
271 debug!("Starting the gateway for the memory pool...");
272
273 self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");
275
276 self.worker_senders.set(worker_senders).expect("The worker senders are already set");
278
279 if let Some(sync_sender) = sync_sender {
281 self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
282 }
283
284 self.enable_handshake().await;
286 self.enable_reading().await;
287 self.enable_writing().await;
288 self.enable_disconnect().await;
289 self.enable_on_connect().await;
290
291 #[cfg(feature = "metrics")]
293 {
294 let gateway = self.clone();
295 self.spawn(async move {
296 loop {
297 tokio::time::sleep(Duration::from_secs(1)).await;
298 gateway.update_metrics();
299 }
300 });
301 }
302
303 let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
305 debug!("Listening for validator connections at address {listen_addr:?}");
306
307 self.initialize_heartbeat();
309
310 info!("Started the gateway for the memory pool at '{}'", self.local_ip());
311 }
312}
313
314impl<N: Network> Gateway<N> {
316 fn max_committee_size(&self) -> usize {
318 self.ledger
319 .current_committee()
320 .map_or_else(|_e| Committee::<N>::max_committee_size() as usize, |committee| committee.num_members())
321 }
322
323 fn max_cache_events(&self) -> usize {
325 self.max_cache_transmissions()
326 }
327
328 fn max_cache_certificates(&self) -> usize {
330 2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
331 }
332
333 fn max_cache_transmissions(&self) -> usize {
335 self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
336 }
337
338 fn max_cache_duplicates(&self) -> usize {
340 self.max_committee_size().pow(2)
341 }
342}
343
344#[async_trait]
345impl<N: Network> CommunicationService for Gateway<N> {
346 type Message = Event<N>;
348
349 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
351 debug_assert!(start_height < end_height, "Invalid block request format");
352 Event::BlockRequest(BlockRequest { start_height, end_height })
353 }
354
355 async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
361 Transport::send(self, peer_ip, message).await
362 }
363}
364
impl<N: Network> Gateway<N> {
    /// Returns the account of the node.
    pub fn account(&self) -> &Account<N> {
        &self.account
    }

    /// Returns the dev identifier of the node, or `None` outside of development mode.
    pub fn dev(&self) -> Option<u16> {
        self.dev
    }

    /// Returns the ledger service.
    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the resolver, which relates peers' listener, connected, and Aleo addresses.
    pub fn resolver(&self) -> &RwLock<Resolver<N>> {
        &self.resolver
    }

    /// Returns the listener address for the given connected address, if the resolver knows it.
    pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> {
        self.resolver.read().get_listener(*connected_addr)
    }

    /// Returns the validator telemetry.
    #[cfg(feature = "telemetry")]
    pub fn validator_telemetry(&self) -> &Telemetry<N> {
        &self.validator_telemetry
    }

    /// Returns the primary sender.
    ///
    /// # Panics
    /// Panics if the primary sender has not been set yet (it is set in `run`).
    pub fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set in gateway")
    }

    /// Returns the number of workers.
    ///
    /// # Panics
    /// Panics if the worker senders have not been set yet, or if there are more than `u8::MAX` workers.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
            .expect("Too many workers")
    }

    /// Returns the sender for the given worker.
    ///
    /// Returns `None` if the worker senders are not set, or if no worker has the given ID.
    pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
        self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
    }

    /// Returns `true` if the given listener IP is authorized.
    ///
    /// An IP is authorized if it belongs to a trusted peer, or if it resolves to an Aleo address
    /// accepted by `is_authorized_validator_address`.
    pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
        // Trusted peers are always authorized, regardless of committee membership.
        if self.trusted_peers().contains(&ip) {
            return true;
        }
        match self.resolve_to_aleo_addr(ip) {
            // Check the resolved Aleo address against the committee(s).
            Some(address) => self.is_authorized_validator_address(address),
            // Without an Aleo address, the peer cannot be authorized.
            None => {
                warn!("{CONTEXT} Could not resolve the Aleo address for '{ip}'");
                false
            }
        }
    }

    /// Returns `true` if the given address is a committee member in any of these committees:
    /// - the committee lookback for the current round;
    /// - the current committee;
    /// - the committee lookback of every second round, starting at the round of the block
    ///   `MAX_BLOCKS_BEHIND` below the latest height and ending before the current round.
    pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
        // Check the committee lookback for the current round.
        if self
            .ledger
            .get_committee_lookback_for_round(self.storage.current_round())
            .is_ok_and(|committee| committee.is_committee_member(validator_address))
        {
            return true;
        }

        // Check the current committee.
        if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
            return true;
        }

        // Fall back to the committee lookbacks of recent rounds.
        let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
        match self.ledger.get_block_round(previous_block_height) {
            Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
                self.ledger
                    .get_committee_lookback_for_round(round)
                    .is_ok_and(|committee| committee.is_committee_member(validator_address))
            }),
            // If the starting round cannot be determined, the address is not authorized.
            Err(_) => false,
        }
    }

    /// Returns the Aleo addresses of all connected peers.
    pub fn connected_addresses(&self) -> HashSet<Address<N>> {
        self.get_connected_peers().into_iter().map(|peer| peer.aleo_addr).collect()
    }

    /// Checks whether a connection to the peer with the given listener address is allowed.
    ///
    /// Currently this only rejects this node's own address (`DisconnectReason::SelfConnect`).
    fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<(), DisconnectReason> {
        // Prevent connecting to ourselves.
        if self.is_local_ip(listener_addr) {
            return Err(DisconnectReason::SelfConnect);
        }

        Ok(())
    }

    /// Updates the gauges for the number of connected validators and connecting peers.
    #[cfg(feature = "metrics")]
    fn update_metrics(&self) {
        if let Some(count) = self.number_of_connected_validators() {
            metrics::gauge(metrics::bft::CONNECTED, count as f64);
        }
        if let Some(count) = self.number_of_connecting_peers() {
            metrics::gauge(metrics::bft::CONNECTING, count as f64);
        }
    }

    /// Test helper: registers a peer as a connected validator in both the resolver and the peer pool.
    #[cfg(test)]
    pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
        // Record the listener / connected / Aleo address mapping.
        self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address));
        // Insert the peer as connecting, then upgrade it to connected.
        self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false));
        if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
            peer.upgrade_to_connected(
                peer_addr,
                peer_ip.port(),
                address,
                NodeType::Validator,
                0,
                get_repo_commit_hash(),
                ConnectionMode::Gateway,
            );
        }
    }

    /// Sends the given event to the peer with the given listener IP.
    ///
    /// Returns a receiver for the outcome of the write. Returns `None` if the listener IP cannot
    /// be resolved, or if the unicast fails; in the latter case the peer is also disconnected.
    fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
        // Resolve the listener IP to the (ambiguous) connected address used by the TCP stack.
        let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else {
            warn!("Unable to resolve the listener IP address '{peer_ip}'");
            return None;
        };
        // Capture the name before `event` is moved into `unicast`.
        let name = event.name();
        trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
        let result = self.unicast(peer_addr, event);
        // If the event could not be handed off, drop the connection.
        if let Err(err) = &result {
            warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {err:?}");
            debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
            self.disconnect(peer_ip);
        }
        result.ok()
    }

    /// Handles an inbound event from the peer at the given connected (socket) address.
    ///
    /// Returns:
    /// - `Ok(true)` if the event was processed, or deliberately ignored (e.g. excess duplicates);
    /// - `Ok(false)` if the peer is no longer connected, or if it sent a disconnect;
    /// - `Err` if the peer misbehaved (unauthorized, spamming, invalid data). The caller
    ///   (`process_message_inner`) disconnects the peer in that case.
    async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<bool> {
        // Resolve the connected address to the peer's listener address.
        let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else {
            // The peer is no longer known to the resolver; drop the event.
            trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name());
            return Ok(false);
        };
        // Only authorized validators and connected bootstrap clients may send events.
        if !(self.is_authorized_validator_ip(peer_ip)
            || self
                .get_connected_peer(peer_ip)
                .map(|peer| peer.node_type == NodeType::BootstrapClient)
                .unwrap_or(false))
        {
            bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
        }
        // Rate-limit: count this peer's events within `CACHE_EVENTS_INTERVAL`.
        let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
        if num_events >= self.max_cache_events() {
            bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
        }
        // Deduplicate: once a request/response has been seen too often, further copies are ignored.
        match event {
            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
                let certificate_id = match &event {
                    Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
                    Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
                    _ => unreachable!(),
                };
                let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(true);
                }
            }
            Event::TransmissionRequest(TransmissionRequest { transmission_id })
            | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
                let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(true);
                }
            }
            Event::BlockRequest(_) => {
                // Block requests are counted per peer rather than per content.
                let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(true);
                }
            }
            _ => {}
        }
        trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());

        // Dispatch the event to the component responsible for it.
        match event {
            Event::BatchPropose(batch_propose) => {
                // Forward to the primary; a closed channel is ignored.
                let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
                Ok(true)
            }
            Event::BatchSignature(batch_signature) => {
                let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
                Ok(true)
            }
            Event::BatchCertified(batch_certified) => {
                let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
                Ok(true)
            }
            Event::BlockRequest(block_request) => {
                let BlockRequest { start_height, end_height } = block_request;

                // Reject empty or inverted ranges.
                if start_height >= end_height {
                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
                }
                // Reject ranges larger than a single `DataBlocks` response may carry.
                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
                }

                // The consensus version at the last requested height is included in the response.
                let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?;

                // Load the blocks on a blocking thread, off the async runtime.
                let self_ = self.clone();
                let blocks = match task::spawn_blocking(move || {
                    match self_.ledger.get_blocks(start_height..end_height) {
                        Ok(blocks) => Ok(DataBlocks(blocks)),
                        Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
                    }
                })
                .await
                {
                    Ok(Ok(blocks)) => blocks,
                    Ok(Err(error)) => return Err(error),
                    // The blocking task failed (e.g. panicked or was cancelled).
                    Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
                };

                // Send the response from a separate task.
                let self_ = self.clone();
                tokio::spawn(async move {
                    let event =
                        Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version));
                    Transport::send(&self_, peer_ip, event).await;
                });
                Ok(true)
            }
            Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
                // Block responses are only processed when a sync sender is configured.
                if let Some(sync_sender) = self.sync_sender.get() {
                    // Only accept responses that match an outstanding request to this peer.
                    if !self.cache.remove_outbound_block_request(peer_ip, &request) {
                        bail!("Unsolicited block response from '{peer_ip}'")
                    }

                    // Deserialize the blocks on the rayon thread pool and receive the result via a oneshot channel.
                    let (send, recv) = tokio::sync::oneshot::channel();
                    rayon::spawn_fifo(move || {
                        let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
                        let _ = send.send(blocks);
                    });
                    let blocks = match recv.await {
                        Ok(Ok(blocks)) => blocks,
                        Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
                        Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
                    };

                    // Ensure the blocks match the requested range.
                    blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
                    // Hand the blocks to the sync module.
                    match sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await {
                        Ok(_) => Ok(true),
                        // Benign errors are only logged.
                        Err(err) if err.is_benign() => {
                            let err: anyhow::Error = err.into();
                            let err = err.context(format!("Ignoring block response from peer '{peer_ip}'"));
                            debug!("{}", flatten_error(err));
                            Ok(true)
                        }
                        // An invalid consensus version gets the peer's IP banned, and the error is returned.
                        Err(err) if err.is_invalid_consensus_version() => {
                            let err: anyhow::Error = err.into();
                            let err = err.context(format!("Peer sent an invalid block response '{peer_ip}'"));

                            let msg = flatten_error(&err);
                            error!("{msg}");
                            self.ip_ban_peer(peer_ip, Some(&msg));
                            Err(err)
                        }
                        // Any other error is logged as a warning without disconnecting the peer.
                        Err(err) => {
                            let err: anyhow::Error = err.into();
                            let err = err.context(format!("Peer '{peer_ip}' sent an invalid block response"));
                            warn!("{}", flatten_error(err));

                            Ok(true)
                        }
                    }
                } else {
                    debug!("Ignoring block response from '{peer_ip}' - no sync sender");
                    Ok(true)
                }
            }
            Event::CertificateRequest(certificate_request) => {
                // Forward to the sync module, if configured.
                if let Some(sync_sender) = self.sync_sender.get() {
                    let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
                }
                Ok(true)
            }
            Event::CertificateResponse(certificate_response) => {
                // Forward to the sync module, if configured.
                if let Some(sync_sender) = self.sync_sender.get() {
                    let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
                }
                Ok(true)
            }
            Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
                // Challenges are only valid during the handshake.
                bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
            }
            Event::Disconnect(message) => {
                debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
                self.disconnect(peer_ip);
                Ok(false)
            }
            Event::PrimaryPing(ping) => {
                let PrimaryPing { version, block_locators, primary_certificate } = ping;

                // Reject peers running an outdated event version.
                if version < Event::<N>::VERSION {
                    bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
                }

                debug!("Validator '{peer_ip}' is at height {}", block_locators.latest_locator_height());

                // Update the peer's block locators in the sync module, if configured.
                if let Some(sync_sender) = self.sync_sender.get() {
                    if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
                        bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
                    }
                }

                // Forward the primary certificate to the primary.
                let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
                Ok(true)
            }
            Event::TransmissionRequest(request) => {
                // Route the request to the worker responsible for this transmission ID.
                let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
                    return Ok(true);
                };
                if let Some(sender) = self.get_worker_sender(worker_id) {
                    let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
                }
                Ok(true)
            }
            Event::TransmissionResponse(response) => {
                // Route the response to the worker responsible for this transmission ID.
                let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
                    return Ok(true);
                };
                if let Some(sender) = self.get_worker_sender(worker_id) {
                    let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
                }
                Ok(true)
            }
            Event::ValidatorsRequest(_) => {
                // Reply with up to `MAX_VALIDATORS_TO_SEND` of the best connected peers, in random order.
                let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
                connected_peers.shuffle(&mut rand::rng());

                let self_ = self.clone();
                tokio::spawn(async move {
                    let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
                    for validator in connected_peers.into_iter() {
                        validators.insert(validator.listener_addr, validator.aleo_addr);
                    }
                    let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
                    Transport::send(&self_, peer_ip, event).await;
                });
                Ok(true)
            }
            Event::ValidatorsResponse(response) => {
                // Peer discovery is disabled in trusted-peers-only mode.
                if self.trusted_peers_only {
                    bail!("{CONTEXT} Not accepting validators response from '{peer_ip}' (trusted peers only)");
                }
                let ValidatorsResponse { validators } = response;
                ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
                // Only accept responses to requests this node actually sent.
                if !self.cache.contains_outbound_validators_request(peer_ip) {
                    bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
                }
                self.cache.decrement_outbound_validators_requests(peer_ip);

                // Keep authorized validators, other than ourselves, that are not yet connected.
                let valid_addrs = validators
                    .into_iter()
                    .filter_map(|(listener_addr, aleo_addr)| {
                        (self.account.address() != aleo_addr
                            && !self.is_connected_address(aleo_addr)
                            && self.is_authorized_validator_address(aleo_addr))
                        .then_some((listener_addr, None))
                    })
                    .collect::<Vec<_>>();
                if !valid_addrs.is_empty() {
                    self.insert_candidate_peers(valid_addrs);
                }

                Ok(true)
            }
            Event::WorkerPing(ping) => {
                ensure!(
                    ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
                    "{CONTEXT} Received too many transmissions"
                );
                // Forward each transmission ID to the worker responsible for it.
                let num_workers = self.num_workers();
                for transmission_id in ping.transmission_ids.into_iter() {
                    let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
                        warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
                        continue;
                    };
                    if let Some(sender) = self.get_worker_sender(worker_id) {
                        let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
                    }
                }
                Ok(true)
            }
        }
    }

    /// Spawns the heartbeat task.
    ///
    /// After an initial one-second delay, the task runs `heartbeat` every 15 seconds.
    fn initialize_heartbeat(&self) {
        let self_clone = self.clone();
        self.spawn(async move {
            tokio::time::sleep(Duration::from_millis(1000)).await;
            info!("Starting the heartbeat of the gateway...");
            loop {
                self_clone.heartbeat().await;
                tokio::time::sleep(Duration::from_secs(15)).await;
            }
        });
    }

    /// Spawns a task and stores its handle, so the task can be aborted in `shut_down`.
    #[allow(dead_code)]
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the gateway.
    ///
    /// Persists the best peers to the gateway peer cache (failure is only logged), aborts all
    /// spawned tasks, and shuts down the TCP stack.
    pub async fn shut_down(&self) {
        info!("Shutting down the gateway...");
        if let Err(e) = self.save_best_peers(&self.node_data_dir.gateway_peer_cache_path(), None, true) {
            warn!("Failed to persist best validators to disk: {e}");
        }
        self.handles.lock().iter().for_each(|handle| handle.abort());
        self.tcp.shut_down().await;
    }
}
896
897impl<N: Network> Gateway<N> {
898 const MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS: Duration = Duration::from_secs(10);
900 const MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD: Duration = Duration::from_secs(60);
902
903 async fn heartbeat(&self) {
905 self.log_connected_validators();
907 #[cfg(feature = "telemetry")]
909 self.log_participation_scores();
910 self.handle_trusted_validators();
912 self.handle_bootstrap_peers().await;
914 self.handle_unauthorized_validators();
916 self.handle_min_connected_validators().await;
918 self.handle_banned_ips();
920 }
921
922 fn log_connected_validators(&self) {
924 let connected_validators = self.filter_connected_peers(|peer| peer.node_type == NodeType::Validator);
927
928 let committee = match self.ledger.current_committee() {
929 Ok(c) => c,
930 Err(err) => {
931 error!("Failed to get current committee: {err}");
932 return;
933 }
934 };
935
936 let validators_total = committee.num_members().saturating_sub(1);
938 let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
940 let connections_msg = match connected_validators.len() {
942 0 => "No connected validators".to_string(),
943 num_connected => format!("Connected to {num_connected} validators {total_validators}"),
944 };
945 info!("{connections_msg}");
946
947 let mut connected_validator_addresses = HashSet::with_capacity(connected_validators.len());
949 let mut connected_validator_shas: HashMap<SmolStr, u64> = HashMap::with_capacity(connected_validators.len());
950 let our_sha = shorten_snarkos_sha(&get_repo_commit_hash());
952 let our_stake = committee.get_stake(self.account.address());
953 connected_validator_shas.insert(our_sha.clone(), our_stake);
954 connected_validator_addresses.insert(self.account.address());
956 for peer in &connected_validators {
958 let address = peer.aleo_addr;
960 connected_validator_addresses.insert(address);
961 let address_stake = committee.get_stake(address);
963 let short_peer_sha = shorten_snarkos_sha(&peer.snarkos_sha);
964 *connected_validator_shas.entry(short_peer_sha.clone()).or_default() += address_stake;
965
966 debug!(
967 "{}",
968 format!(
969 " Connected to: {} - {} (connection age {:?})",
970 peer.listener_addr,
971 peer.aleo_addr,
972 peer.first_seen.elapsed()
973 )
974 .dimmed()
975 );
976 }
977
978 if let Some(combined_stake) = connected_validator_shas.get(&our_sha) {
980 let percentage = *combined_stake as f64 / committee.total_stake() as f64 * 100.0;
981 debug!("{}", format!(" Combined stake @ {our_sha}: {percentage:.2}%").dimmed());
982 #[cfg(feature = "metrics")]
983 metrics::gauge(metrics::bft::CONNECTED_STAKE_WITH_MATCHING_SHA, percentage);
984 }
985
986 let num_not_connected = validators_total.saturating_sub(connected_validators.len());
988 if num_not_connected > 0 && self.tcp().uptime() > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
989 let total_stake = committee.total_stake();
991 let total_stake_f64 = total_stake as f64;
992
993 let committee_members: HashSet<_> =
995 self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
996
997 let not_connected_stake: u64 = committee_members
998 .difference(&connected_validator_addresses)
999 .map(|address| {
1000 let address_stake = committee.get_stake(*address);
1001 let address_stake_as_percentage =
1002 if total_stake == 0 { 0.0 } else { address_stake as f64 / total_stake_f64 * 100.0 };
1003 debug!(
1004 "{}",
1005 format!(" Not connected to {address} ({address_stake_as_percentage:.2}% of total stake)")
1006 .dimmed()
1007 );
1008 address_stake
1009 })
1010 .sum();
1011
1012 let not_connected_stake_as_percentage =
1013 if total_stake == 0 { 0.0 } else { not_connected_stake as f64 / total_stake_f64 * 100.0 };
1014 warn!(
1015 "Not connected to {num_not_connected} validators {total_validators} ({not_connected_stake_as_percentage:.2}% of total stake not connected)"
1016 );
1017 #[cfg(feature = "metrics")]
1018 {
1019 let connected_stake_as_percentage = 100.0 - not_connected_stake_as_percentage;
1020 metrics::gauge(metrics::bft::CONNECTED_STAKE, connected_stake_as_percentage);
1021 }
1022 } else {
1023 #[cfg(feature = "metrics")]
1024 metrics::gauge(metrics::bft::CONNECTED_STAKE, 100.0);
1025 };
1026
1027 if !committee.is_quorum_threshold_reached(&connected_validator_addresses) {
1028 if self.tcp().uptime() > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
1030 error!("Not connected to a quorum of validators");
1031 } else {
1032 debug!("Not connected to a quorum of validators");
1033 }
1034 }
1035 }
1036
1037 #[cfg(feature = "telemetry")]
1039 fn log_participation_scores(&self) {
1040 if let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(self.storage.current_round()) {
1041 let participation_scores = self.validator_telemetry().get_participation_scores(&committee_lookback);
1043
1044 debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
1046 for (address, (cert_score, sig_score)) in participation_scores {
1047 debug!(
1048 "{}",
1049 format!(" {address} - certificates: {cert_score:.2}% signatures: {sig_score:.2}%").dimmed()
1050 );
1051 }
1052 }
1053 }
1054
1055 fn handle_trusted_validators(&self) {
1057 let trusted_peers = self.trusted_peers();
1058
1059 let handles: Vec<JoinHandle<_>> = trusted_peers
1061 .iter()
1062 .filter_map(|validator_ip| {
1063 match self.connect(*validator_ip) {
1065 Ok(hdl) => Some(hdl),
1066 Err(ConnectError::SelfConnect { .. })
1067 | Err(ConnectError::AlreadyConnected { .. })
1068 | Err(ConnectError::AlreadyConnecting { .. }) => None,
1069 Err(err) => {
1070 warn!("Could not initiate connection to trusted validator at '{validator_ip}' - {err}");
1071 None
1072 }
1073 }
1074 })
1075 .collect();
1076
1077 if !handles.is_empty() {
1078 info!("Reconnnecting to {} out of {} trusted validators", handles.len(), trusted_peers.len());
1079 }
1080 }
1081
1082 async fn handle_bootstrap_peers(&self) {
1084 if self.trusted_peers_only {
1086 return;
1087 }
1088 let mut candidate_bootstrap = Vec::new();
1090 let connected_bootstrap = self.filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
1091 for bootstrap_ip in bootstrap_peers::<N>(self.is_dev()) {
1092 if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
1093 candidate_bootstrap.push(bootstrap_ip);
1094 }
1095 }
1096 if connected_bootstrap.is_empty() {
1098 let peer_to_connect = candidate_bootstrap.into_iter().choose(&mut rand::rng());
1100 if let Some(peer_ip) = peer_to_connect {
1101 match self.connect(peer_ip) {
1102 Ok(hdl) => {
1103 debug!("{CONTEXT} (Re-)connecting to bootstrap peer at '{peer_ip}'");
1104 let result = hdl.await;
1105 if let Err(err) = result {
1106 warn!("{CONTEXT} Failed to connect to bootstrap peer at '{peer_ip}' - {err}");
1107 }
1108 }
1109 Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => {}
1110 Err(err) => {
1111 warn!("{CONTEXT} Could not initiate connection to bootstrap peer at '{peer_ip}' - {err}")
1112 }
1113 }
1114 }
1115 }
1116 let num_surplus = connected_bootstrap.len().saturating_sub(1);
1118 if num_surplus > 0 {
1119 let peers_to_disconnect = connected_bootstrap.into_iter().sample(&mut rand::rng(), num_surplus);
1121 for peer in peers_to_disconnect {
1122 info!("{CONTEXT} Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
1123 <Self as Transport<N>>::send(
1124 self,
1125 peer.listener_addr,
1126 Event::Disconnect(DisconnectReason::NoReasonGiven.into()),
1127 )
1128 .await;
1129 self.disconnect(peer.listener_addr);
1131 }
1132 }
1133 }
1134
1135 fn handle_unauthorized_validators(&self) {
1137 let self_ = self.clone();
1138 tokio::spawn(async move {
1139 let validators = self_.get_connected_peers();
1141 for peer in validators {
1143 if peer.node_type == NodeType::BootstrapClient {
1145 continue;
1146 }
1147 if !self_.is_authorized_validator_ip(peer.listener_addr) {
1149 warn!(
1150 "{CONTEXT} Disconnecting from '{}' - Validator is not in the current committee",
1151 peer.listener_addr
1152 );
1153 Transport::send(&self_, peer.listener_addr, DisconnectReason::ProtocolViolation.into()).await;
1154 self_.disconnect(peer.listener_addr);
1156 }
1157 }
1158 });
1159 }
1160
1161 async fn handle_min_connected_validators(&self) {
1165 let trusted_validators = self.trusted_peers();
1168 if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES() as usize {
1169 let (addrs, handles): (Vec<_>, Vec<_>) = self
1170 .get_candidate_peers()
1171 .iter()
1172 .filter_map(|peer| {
1173 if trusted_validators.contains(&peer.listener_addr) {
1174 return None;
1175 }
1176
1177 if let Some(previous_attempt) = peer.last_connection_attempt
1178 && previous_attempt.elapsed() < Self::MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS
1179 {
1180 return None;
1181 }
1182
1183 match self.connect(peer.listener_addr) {
1184 Ok(hdl) => Some((peer.listener_addr, hdl)),
1185 Err(ConnectError::AlreadyConnected { .. })
1186 | Err(ConnectError::AlreadyConnecting { .. })
1187 | Err(ConnectError::SelfConnect { .. }) => None,
1188 Err(err) => {
1189 warn!(
1190 "{CONTEXT} Could not initiate connection to validator at '{}' - {err}",
1191 peer.listener_addr
1192 );
1193 None
1194 }
1195 }
1196 })
1197 .unzip();
1198
1199 for (addr, result) in addrs.into_iter().zip(join_all(handles).await) {
1200 if let Err(err) = result {
1201 warn!("{CONTEXT} Failed to connect to validator at '{addr}' - {err}");
1202 }
1203 }
1204
1205 let validators = self.connected_peers();
1207 if validators.is_empty() {
1209 return;
1210 }
1211 if let Some(validator_ip) = validators.into_iter().choose(&mut rand::rng()) {
1213 let self_ = self.clone();
1214 tokio::spawn(async move {
1215 self_.cache.increment_outbound_validators_requests(validator_ip);
1217 let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1219 });
1220 }
1221 }
1222 }
1223
1224 async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1226 if let Err(error) = self.inbound(peer_addr, message).await
1228 && let Some(peer_ip) = self.resolver.read().get_listener(peer_addr)
1229 {
1230 warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1231 let self_ = self.clone();
1232 tokio::spawn(async move {
1233 Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1234 self_.disconnect(peer_ip);
1236 });
1237 }
1238 }
1239
1240 fn handle_banned_ips(&self) {
1242 self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
1243 }
1244}
1245
1246#[async_trait]
1247impl<N: Network> Transport<N> for Gateway<N> {
1248 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
1256 macro_rules! send {
1257 ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
1258 while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
1260 tokio::time::sleep(Duration::from_millis(10)).await;
1262 }
1263 $self.send_inner(peer_ip, event)
1265 }};
1266 }
1267
1268 match event {
1270 Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
1271 self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1273 send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
1275 }
1276 Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
1277 self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1279 send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
1281 }
1282 Event::BlockRequest(request) => {
1283 self.cache.insert_outbound_block_request(peer_ip, request);
1285 send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1287 }
1288 _ => {
1289 send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1291 }
1292 }
1293 }
1294
1295 fn broadcast(&self, event: Event<N>) {
1299 if self.number_of_connected_peers() > 0 {
1301 let self_ = self.clone();
1302 let connected_peers = self.connected_peers();
1303 tokio::spawn(async move {
1304 for peer_ip in connected_peers {
1306 let _ = Transport::send(&self_, peer_ip, event.clone()).await;
1308 }
1309 });
1310 }
1311 }
1312}
1313
1314impl<N: Network> P2P for Gateway<N> {
1315 fn tcp(&self) -> &Tcp {
1317 &self.tcp
1318 }
1319}
1320
1321#[async_trait]
1322impl<N: Network> Reading for Gateway<N> {
1323 type Codec = EventCodec<N>;
1324 type Message = Event<N>;
1325
1326 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1329 Default::default()
1330 }
1331
1332 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1334 if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1335 let self_ = self.clone();
1336 tokio::spawn(async move {
1339 self_.process_message_inner(peer_addr, message).await;
1340 });
1341 } else {
1342 self.process_message_inner(peer_addr, message).await;
1343 }
1344 Ok(())
1345 }
1346
1347 fn message_queue_depth(&self) -> usize {
1350 2 * BatchHeader::<N>::MAX_GC_ROUNDS
1351 * N::LATEST_MAX_CERTIFICATES() as usize
1352 * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1353 }
1354}
1355
1356#[async_trait]
1357impl<N: Network> Writing for Gateway<N> {
1358 type Codec = EventCodec<N>;
1359 type Message = Event<N>;
1360
1361 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1364 Default::default()
1365 }
1366
1367 fn message_queue_depth(&self) -> usize {
1371 2 * BatchHeader::<N>::MAX_GC_ROUNDS
1372 * N::LATEST_MAX_CERTIFICATES() as usize
1373 * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1374 }
1375}
1376
1377#[async_trait]
1378impl<N: Network> Disconnect for Gateway<N> {
1379 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1381 if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
1382 let was_fully_connected = self.downgrade_peer_to_candidate(peer_ip);
1388
1389 if was_fully_connected && let Some(sync_sender) = self.sync_sender.get() {
1391 let (tx, rx) = oneshot::channel();
1392
1393 if let Err(err) = sync_sender.tx_block_sync_remove_peer.send((peer_ip, tx)).await {
1394 let err: anyhow::Error = err.into();
1395 let err =
1396 err.context(format!("Unable to remove disconnecting peer '{peer_ip}' from the sync module"));
1397 warn!("{CONTEXT} {}", flatten_error(err));
1398 }
1399
1400 if let Err(err) = rx.await {
1401 let err: anyhow::Error = err.into();
1402 let err =
1403 err.context(format!("Unable to remove disconnecting peer '{peer_ip}' from the sync module"));
1404 warn!("{CONTEXT} {}", flatten_error(err));
1405 }
1406 }
1407 self.cache.clear_outbound_validators_requests(peer_ip);
1411 self.cache.clear_outbound_block_requests(peer_ip);
1412 } else {
1413 warn!("{CONTEXT} Got disconnect for a peer '{peer_addr}' that is not in the peer pool");
1414 }
1415 }
1416}
1417
1418#[async_trait]
1419impl<N: Network> OnConnect for Gateway<N> {
1420 async fn on_connect(&self, peer_addr: SocketAddr) {
1421 if let Some(listener_addr) = self.resolve_to_listener(&peer_addr) {
1422 if let Some(peer) = self.get_connected_peer(listener_addr) {
1423 if peer.node_type == NodeType::BootstrapClient {
1424 self.cache.increment_outbound_validators_requests(listener_addr);
1425 let _ =
1426 <Self as Transport<N>>::send(self, listener_addr, Event::ValidatorsRequest(ValidatorsRequest))
1427 .await;
1428 }
1429 }
1430 }
1431 }
1432}
1433
1434#[async_trait]
1435impl<N: Network> Handshake for Gateway<N> {
1436 async fn perform_handshake(&self, mut connection: Connection) -> Result<Connection, ConnectError> {
1438 let peer_addr = connection.addr();
1440 let peer_side = connection.side();
1441
1442 #[cfg(not(test))]
1444 if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1445 if self.is_ip_banned(peer_addr.ip()) {
1447 trace!("{CONTEXT} Rejected a connection request from banned IP '{}'", peer_addr.ip());
1448 return Err(ConnectError::BannedIp { ip: peer_addr.ip() });
1449 }
1450
1451 let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1452
1453 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1454 if num_attempts > MAX_CONNECTION_ATTEMPTS {
1455 self.update_ip_ban(peer_addr.ip());
1456 trace!("{CONTEXT} Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1457 return Err(ConnectError::other(anyhow!("'{}' appears to be spamming connections", peer_addr.ip())));
1458 }
1459 }
1460
1461 let stream = self.borrow_stream(&mut connection);
1462
1463 let mut listener_addr = if peer_side == ConnectionSide::Initiator {
1466 debug!("{CONTEXT} Received a connection request from '{peer_addr}'");
1467 None
1468 } else {
1469 debug!("{CONTEXT} Shaking hands with {peer_addr}...");
1470 Some(peer_addr)
1471 };
1472
1473 let restrictions_id = self.ledger.latest_restrictions_id();
1475
1476 let handshake_result = if peer_side == ConnectionSide::Responder {
1478 self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await
1479 } else {
1480 self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await
1481 };
1482
1483 if let Some(addr) = listener_addr {
1484 match handshake_result {
1485 Ok(ref cr) => {
1486 let node_type = if bootstrap_peers::<N>(self.is_dev()).contains(&addr) {
1487 NodeType::BootstrapClient
1488 } else {
1489 NodeType::Validator
1490 };
1491
1492 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1493 self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address));
1494 peer.upgrade_to_connected(
1495 peer_addr,
1496 cr.listener_port,
1497 cr.address,
1498 node_type,
1499 cr.version,
1500 cr.snarkos_sha,
1501 ConnectionMode::Gateway,
1502 );
1503 }
1504 info!("{CONTEXT} Connected to '{addr}'");
1505 }
1506 Err(error) => {
1507 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1508 if peer.is_connecting() {
1510 peer.downgrade_to_candidate(addr);
1511 }
1512 }
1513 return Err(error);
1514 }
1515 }
1516 }
1517
1518 Ok(connection)
1519 }
1520}
1521
/// Reads the next handshake event from `$framed` and returns the payload of the expected variant
/// `$event_ty`.
///
/// Must be expanded inside an `async` fn returning `Result<_, ConnectError>`. It returns early with
/// an error when the peer sends a `Disconnect`, sends any other event, or closes the stream
/// (`BrokenPipe`). Decoding errors are propagated with `?`.
macro_rules! expect_event {
    ($event_ty:path, $framed:expr, $peer_addr:expr) => {
        match $framed.try_next().await? {
            // The expected event: hand back its payload.
            Some($event_ty(data)) => {
                trace!("{CONTEXT} Received '{}' from '{}'", data.name(), $peer_addr);
                data
            }
            // The peer aborted the handshake.
            Some(Event::Disconnect($crate::events::Disconnect { reason })) => {
                return Err(ConnectError::other(format!("'{}' disconnected with reason \"{reason}\"", $peer_addr)));
            }
            // Any other event is a protocol violation; name the variant we were waiting for.
            Some(ty) => {
                return Err(ConnectError::other(format!(
                    "'{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer_addr,
                    ty.name(),
                    stringify!($event_ty),
                )));
            }
            // The stream ended before the expected event arrived.
            None => return Err(ConnectError::IoError(io::ErrorKind::BrokenPipe.into())),
        }
    };
}
1549
1550async fn send_event<N: Network>(
1552 framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
1553 peer_addr: SocketAddr,
1554 event: Event<N>,
1555) -> io::Result<()> {
1556 trace!("{CONTEXT} Sending '{}' to '{peer_addr}'", event.name());
1557 framed.send(event).await
1558}
1559
1560impl<N: Network> Gateway<N> {
1561 async fn handshake_inner_initiator<'a>(
1563 &'a self,
1564 peer_addr: SocketAddr,
1565 restrictions_id: Field<N>,
1566 stream: &'a mut TcpStream,
1567 ) -> Result<ChallengeRequest<N>, ConnectError> {
1568 self.add_connecting_peer(peer_addr)?;
1570
1571 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1573
1574 let our_nonce: u64 = rand::random();
1578 let current_block_height = self.ledger.latest_block_height();
1580 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1581 let snarkos_sha = match (self.is_dev(), consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
1582 (true, _, Some(sha)) => Some(sha),
1583 (_, true, Some(sha)) => Some(sha),
1584 _ => None,
1585 };
1586 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1588 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1589
1590 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1594 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1596
1597 if let Some(reason) = self
1599 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1600 .await
1601 {
1602 send_event(&mut framed, peer_addr, reason.into()).await?;
1603 return Err(ConnectError::application(reason));
1604 }
1605
1606 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1608 send_event(&mut framed, peer_addr, reason.into()).await?;
1609 return Err(reason.into_connect_error(peer_addr));
1610 }
1611
1612 let response_nonce: u64 = rand::random();
1616 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1617 let Ok(our_signature) = self.account.sign_bytes(&data, &mut rand::rng()) else {
1618 return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
1619 };
1620 let our_response =
1622 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1623 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1624
1625 Ok(peer_request)
1626 }
1627
1628 async fn handshake_inner_responder<'a>(
1630 &'a self,
1631 peer_addr: SocketAddr,
1632 peer_ip: &mut Option<SocketAddr>,
1633 restrictions_id: Field<N>,
1634 stream: &'a mut TcpStream,
1635 ) -> Result<ChallengeRequest<N>, ConnectError> {
1636 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1638
1639 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1643
1644 if self.account.address() == peer_request.address {
1646 return Err(ConnectError::SelfConnect { address: peer_addr });
1647 }
1648
1649 *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1651 let peer_ip = peer_ip.unwrap();
1652
1653 if let Err(reason) = self.ensure_peer_is_allowed(peer_ip) {
1655 send_event(&mut framed, peer_addr, reason.into()).await?;
1656 return Err(reason.into_connect_error(peer_addr));
1657 }
1658
1659 self.add_connecting_peer(peer_ip)?;
1661
1662 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1664 send_event(&mut framed, peer_addr, reason.into()).await?;
1665 return Err(reason.into_connect_error(peer_addr));
1666 }
1667
1668 let response_nonce: u64 = rand::random();
1672 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1673 let Ok(our_signature) = self.account.sign_bytes(&data, &mut rand::rng()) else {
1674 return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
1675 };
1676 let our_response =
1678 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1679 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1680
1681 let our_nonce: u64 = rand::random();
1683 let current_block_height = self.ledger.latest_block_height();
1685 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1686 let snarkos_sha = match (self.is_dev(), consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
1687 (true, _, Some(sha)) => Some(sha),
1688 (_, true, Some(sha)) => Some(sha),
1689 _ => None,
1690 };
1691 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1693 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1694
1695 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1699 if let Some(reason) = self
1701 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1702 .await
1703 {
1704 send_event(&mut framed, peer_addr, reason.into()).await?;
1705 Err(reason.into_connect_error(peer_addr))
1706 } else {
1707 Ok(peer_request)
1708 }
1709 }
1710
1711 #[must_use]
1713 fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1714 let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event;
1716 log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT);
1717
1718 let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);
1719
1720 if version < Event::<N>::VERSION {
1722 return Some(DisconnectReason::OutdatedClientVersion);
1723 }
1724 if self.trusted_peers_only && !self.is_trusted(listener_addr) {
1726 warn!("{CONTEXT} Dropping '{peer_addr}' for being an untrusted validator ({address})");
1727 return Some(DisconnectReason::NoExternalPeersAllowed);
1728 }
1729 if !bootstrap_peers::<N>(self.dev().is_some()).contains(&listener_addr) {
1730 if !self.is_authorized_validator_address(address) {
1732 return Some(DisconnectReason::UnauthorizedValidator);
1733 }
1734 }
1735
1736 if self.is_connected_address(address) {
1738 return Some(DisconnectReason::AlreadyConnectedToAleoAddress);
1739 }
1740
1741 None
1742 }
1743
1744 #[must_use]
1746 async fn verify_challenge_response(
1747 &self,
1748 peer_addr: SocketAddr,
1749 peer_address: Address<N>,
1750 response: ChallengeResponse<N>,
1751 expected_restrictions_id: Field<N>,
1752 expected_nonce: u64,
1753 ) -> Option<DisconnectReason> {
1754 let ChallengeResponse { restrictions_id, signature, nonce } = response;
1756
1757 if restrictions_id != expected_restrictions_id {
1759 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1760 return Some(DisconnectReason::InvalidChallengeResponse);
1761 }
1762 let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1764 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1765 return Some(DisconnectReason::InvalidChallengeResponse);
1766 };
1767 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1769 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (invalid signature)");
1770 return Some(DisconnectReason::InvalidChallengeResponse);
1771 }
1772 None
1773 }
1774}
1775
#[cfg(test)]
mod prop_tests {
    use crate::{
        Gateway,
        MAX_WORKERS,
        MEMORY_POOL_PORT,
        Worker,
        helpers::{Storage, init_primary_channels, init_worker_channels},
    };

    use snarkos_account::Account;
    use snarkos_node_bft_ledger_service::MockLedgerService;
    use snarkos_node_bft_storage_service::BFTMemoryService;
    use snarkos_node_network::PeerPoolHandling;
    use snarkos_node_tcp::P2P;
    use snarkos_utilities::NodeDataDir;

    use snarkos_node_bft_events::committee_prop_tests::{CommitteeContext, ValidatorSet};
    use snarkvm::{
        ledger::{
            committee::{Committee, test_helpers::sample_committee_for_round_and_members},
            narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
        },
        prelude::{MainnetV0, PrivateKey},
        utilities::TestRng,
    };

    use indexmap::{IndexMap, IndexSet};
    use proptest::{
        prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
        sample::Selector,
    };
    use std::{
        fmt::{Debug, Formatter},
        net::{IpAddr, Ipv4Addr, SocketAddr},
        sync::Arc,
    };
    use test_strategy::proptest;

    type CurrentNetwork = MainnetV0;

    /// Compact debug output for proptest failure reports: account address plus TCP config.
    impl Debug for Gateway<CurrentNetwork> {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
        }
    }

    /// How a test gateway is addressed.
    ///
    /// `Dev(n)` is passed as the dev port offset. `Prod(ip)` is an optional explicit listener address.
    #[derive(Debug, test_strategy::Arbitrary)]
    enum GatewayAddress {
        Dev(u8),
        Prod(Option<SocketAddr>),
    }

    impl GatewayAddress {
        /// The explicit listener address for `Prod`, `None` for `Dev`.
        fn ip(&self) -> Option<SocketAddr> {
            if let GatewayAddress::Prod(ip) = self {
                return *ip;
            }
            None
        }

        /// The dev port offset for `Dev`, `None` for `Prod`.
        fn port(&self) -> Option<u16> {
            if let GatewayAddress::Dev(port) = self {
                return Some(*port as u16);
            }
            None
        }
    }

    impl Arbitrary for Gateway<CurrentNetwork> {
        type Parameters = ();
        type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;

        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            any_valid_dev_gateway()
                .prop_map(|(storage, _, private_key, address)| {
                    Gateway::new(
                        Account::try_from(private_key).unwrap(),
                        storage.clone(),
                        storage.ledger().clone(),
                        address.ip(),
                        &[],
                        false,
                        NodeDataDir::new_test(None),
                        address.port(),
                    )
                    .unwrap()
                })
                .boxed()
        }
    }

    type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);

    /// Storage and committee, plus the private key of one committee member, addressed in dev mode.
    fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
        (any::<CommitteeContext>(), any::<Selector>())
            .prop_flat_map(|(context, account_selector)| {
                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
                (
                    any_with::<Storage<CurrentNetwork>>(context.clone()),
                    Just(context),
                    Just(account_selector.select(validators)),
                    0u8..,
                )
                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, GatewayAddress::Dev(d)))
            })
            .boxed()
    }

    /// Like `any_valid_dev_gateway`, but with an optional explicit listener address (prod mode).
    fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
        (any::<CommitteeContext>(), any::<Selector>())
            .prop_flat_map(|(context, account_selector)| {
                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
                (
                    any_with::<Storage<CurrentNetwork>>(context.clone()),
                    Just(context),
                    Just(account_selector.select(validators)),
                    any::<Option<SocketAddr>>(),
                )
                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, GatewayAddress::Prod(d)))
            })
            .boxed()
    }

    #[proptest]
    fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
        let (storage, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            NodeDataDir::new_test(None),
            dev.port(),
        )
        .unwrap();

        // Dev gateways listen on localhost at `MEMORY_POOL_PORT + dev offset`.
        let tcp_config = gateway.tcp().config();
        assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
        assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size() * 10);
        assert_eq!(gateway.account().address(), account.address());
    }

    #[proptest]
    fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
        let (storage, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            NodeDataDir::new_test(None),
            dev.port(),
        )
        .unwrap();

        // Prod gateways use the explicit address if given, else `0.0.0.0:MEMORY_POOL_PORT`.
        let tcp_config = gateway.tcp().config();
        if let Some(socket_addr) = dev.ip() {
            assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
            assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
        } else {
            assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
            assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
        }
        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size() * 10);
        assert_eq!(gateway.account().address(), account.address());
    }

    #[proptest(async = "tokio")]
    async fn gateway_start(
        #[strategy(any_valid_dev_gateway())] input: GatewayInput,
        #[strategy(0..MAX_WORKERS)] workers_count: u8,
    ) {
        let (storage, committee, private_key, dev) = input;
        let committee = committee.0;
        let worker_storage = storage.clone();
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account,
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            NodeDataDir::new_test(None),
            dev.port(),
        )
        .unwrap();

        let (primary_sender, _) = init_primary_channels();

        // Spin up `workers_count` workers and collect their senders for the gateway.
        let (workers, worker_senders) = {
            let mut tx_workers = IndexMap::new();
            let mut workers = IndexMap::new();

            for id in 0..workers_count {
                let (tx_worker, rx_worker) = init_worker_channels();
                let ledger = Arc::new(MockLedgerService::new(committee.clone()));
                let worker =
                    Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
                        .unwrap();
                worker.run(rx_worker);

                workers.insert(id, worker);
                tx_workers.insert(id, tx_worker);
            }
            (workers, tx_workers)
        };

        gateway.run(primary_sender, worker_senders, None).await;
        assert_eq!(
            gateway.local_ip(),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
        );
        assert_eq!(gateway.num_workers(), workers.len() as u8);
    }

    #[proptest]
    fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
        let rng = &mut TestRng::default();

        let current_round = 2;
        let committee_size = 4;
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let (_, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        // The first `committee_size` certificate authors form the committee.
        let mut certificates = IndexSet::new();
        for _ in 0..committee_size {
            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        }
        let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
        let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
        // A further `committee_size` certificates whose authors are outside the committee.
        for _ in 0..committee_size {
            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        }
        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            ledger.clone(),
            dev.ip(),
            &[],
            false,
            NodeDataDir::new_test(None),
            dev.port(),
        )
        .unwrap();
        for certificate in certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Only committee members (the first `committee_size` insertions) may be authorized.
        for (i, certificate) in certificates.iter().enumerate() {
            let is_authorized = gateway.is_authorized_validator_address(certificate.author());
            assert_eq!(is_authorized, i < committee_size, "unexpected authorization for certificate #{i}");
        }
    }
}