1#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19 CONTEXT,
20 MAX_BATCH_DELAY_IN_MS,
21 MEMORY_POOL_PORT,
22 Worker,
23 events::{DisconnectReason, EventCodec, PrimaryPing},
24 helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
25 spawn_blocking,
26};
27use smol_str::SmolStr;
28use snarkos_account::Account;
29use snarkos_node_bft_events::{
30 BlockRequest,
31 BlockResponse,
32 CertificateRequest,
33 CertificateResponse,
34 ChallengeRequest,
35 ChallengeResponse,
36 DataBlocks,
37 Event,
38 EventTrait,
39 TransmissionRequest,
40 TransmissionResponse,
41 ValidatorsRequest,
42 ValidatorsResponse,
43};
44use snarkos_node_bft_ledger_service::LedgerService;
45use snarkos_node_network::{
46 ConnectionMode,
47 NodeType,
48 Peer,
49 PeerPoolHandling,
50 Resolver,
51 bootstrap_peers,
52 get_repo_commit_hash,
53 log_repo_sha_comparison,
54 shorten_snarkos_sha,
55};
56use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
57use snarkos_node_tcp::{
58 Config,
59 ConnectError,
60 Connection,
61 ConnectionSide,
62 P2P,
63 Tcp,
64 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
65};
66use snarkos_utilities::NodeDataDir;
67use snarkvm::{
68 console::prelude::*,
69 ledger::{
70 committee::Committee,
71 narwhal::{BatchHeader, Data},
72 },
73 prelude::{Address, Field},
74 utilities::flatten_error,
75};
76
77use colored::Colorize;
78use futures::{SinkExt, future::join_all};
79use indexmap::IndexMap;
80#[cfg(feature = "locktick")]
81use locktick::parking_lot::{Mutex, RwLock};
82#[cfg(not(feature = "locktick"))]
83use parking_lot::{Mutex, RwLock};
84use rand::{
85 rngs::OsRng,
86 seq::{IteratorRandom, SliceRandom},
87};
88use std::{
89 collections::{HashMap, HashSet},
90 future::Future,
91 io,
92 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
93 sync::Arc,
94 time::Duration,
95};
96use tokio::{
97 net::TcpStream,
98 sync::{OnceCell, oneshot},
99 task::{self, JoinHandle},
100};
101use tokio_stream::StreamExt;
102use tokio_util::codec::Framed;
103
/// Interval passed to the cache when counting inbound events per peer.
/// Derived from `MAX_BATCH_DELAY_IN_MS` by integer division by 1000 (i.e. milliseconds to whole seconds).
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64;
/// Interval passed to the cache when counting inbound certificate, transmission, and block requests.
/// Derived from `MAX_BATCH_DELAY_IN_MS` by integer division by 1000 (i.e. milliseconds to whole seconds).
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64;
/// Upper bound on connection attempts (not compiled in test builds).
// NOTE(review): the consumer of this constant is outside the visible part of the file - confirm its exact use.
#[cfg(not(test))]
const MAX_CONNECTION_ATTEMPTS: usize = 10;

/// The maximum number of validators accepted in a `ValidatorsResponse`; also used as the limit
/// when selecting the best connected peers to send or to persist in the validator whitelist.
pub const MAX_VALIDATORS_TO_SEND: usize = 200;

/// Time window, in seconds, associated with connection attempts (not compiled in test builds).
// NOTE(review): presumably the window over which `MAX_CONNECTION_ATTEMPTS` is counted - confirm against its user.
#[cfg(not(test))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;

/// Value, in seconds, handed to `remove_old_bans` when expiring IP bans.
const IP_BAN_TIME_IN_SECS: u64 = 300;
122
/// Abstraction for delivering BFT events to peers.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait Transport<N: Network>: Send + Sync {
    /// Sends `event` to the peer identified by `peer_ip`.
    ///
    /// When `Some`, the returned receiver yields the I/O result of the send.
    // NOTE(review): `None` presumably means the event was not queued (e.g. unresolved peer or rate limit) - confirm in the impl.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
    /// Broadcasts `event`; the set of recipients is defined by the implementor.
    fn broadcast(&self, event: Event<N>);
}
130
/// The gateway handle: a cloneable wrapper around a reference-counted [`InnerGateway`].
///
/// Cloning only clones the `Arc`, so all clones share the same underlying state.
#[derive(Clone)]
pub struct Gateway<N: Network>(Arc<InnerGateway<N>>);
135
136impl<N: Network> Deref for Gateway<N> {
137 type Target = Arc<InnerGateway<N>>;
138
139 fn deref(&self) -> &Self::Target {
140 &self.0
141 }
142}
143
/// The shared state behind a [`Gateway`].
pub struct InnerGateway<N: Network> {
    /// The account of this node; its address is used to exclude ourselves from validator lists
    /// and to look up our own stake.
    account: Account<N>,
    /// The storage; queried for the current round.
    storage: Storage<N>,
    /// The ledger service; queried for committees, block rounds, and blocks.
    ledger: Arc<dyn LedgerService<N>>,
    /// The TCP stack (listener, connections, and the banned-peer list).
    tcp: Tcp,
    /// The cache of inbound/outbound event counters used for rate limiting and request tracking.
    cache: Cache<N>,
    /// The resolver, used to map a connected address to the peer's listener address.
    resolver: RwLock<Resolver<N>>,
    /// The pool of known peers, keyed by socket address.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The validator telemetry, used to report participation scores.
    #[cfg(feature = "telemetry")]
    validator_telemetry: Telemetry<N>,
    /// The primary sender; set exactly once in `run`.
    primary_sender: OnceCell<PrimarySender<N>>,
    /// The worker senders, keyed by worker ID; set exactly once in `run`.
    worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
    /// The sync sender; set at most once in `run`, and only if one was provided.
    sync_sender: OnceCell<SyncSender<N>>,
    /// The join handles of tasks started via `spawn`; aborted in `shut_down`.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// The node data directory; provides the gateway peer cache and validator whitelist paths.
    node_data_dir: NodeDataDir,
    /// If set, cached peers are not loaded, bootstrap peers are not contacted,
    /// and validators responses are rejected.
    trusted_peers_only: bool,
    /// The optional development ID; when no IP is given it is added to the default port.
    dev: Option<u16>,
}
176
177impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
178 const MAXIMUM_POOL_SIZE: usize = 200;
179 const OWNER: &str = CONTEXT;
180 const PEER_SLASHING_COUNT: usize = 20;
181
182 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
183 &self.peer_pool
184 }
185
186 fn resolver(&self) -> &RwLock<Resolver<N>> {
187 &self.resolver
188 }
189
190 fn is_dev(&self) -> bool {
191 self.dev.is_some()
192 }
193
194 fn trusted_peers_only(&self) -> bool {
195 self.trusted_peers_only
196 }
197
198 fn node_type(&self) -> NodeType {
199 NodeType::Validator
200 }
201}
202
203impl<N: Network> Gateway<N> {
204 #[allow(clippy::too_many_arguments)]
206 pub fn new(
207 account: Account<N>,
208 storage: Storage<N>,
209 ledger: Arc<dyn LedgerService<N>>,
210 ip: Option<SocketAddr>,
211 trusted_validators: &[SocketAddr],
212 trusted_peers_only: bool,
213 node_data_dir: NodeDataDir,
214 dev: Option<u16>,
215 ) -> Result<Self> {
216 let ip = match (ip, dev) {
218 (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
219 (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
220 (Some(ip), _) => ip,
221 };
222 let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size() * 10));
230
231 let mut initial_peers = HashMap::new();
233
234 if !trusted_peers_only {
236 let cached_peers = Self::load_cached_peers(&node_data_dir.gateway_peer_cache_path())?;
237 for addr in cached_peers {
238 initial_peers.insert(addr, Peer::new_candidate(addr, false));
239 }
240 }
241
242 initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
245
246 Ok(Self(Arc::new(InnerGateway {
248 account,
249 storage,
250 ledger,
251 tcp,
252 cache: Default::default(),
253 resolver: Default::default(),
254 peer_pool: RwLock::new(initial_peers),
255 #[cfg(feature = "telemetry")]
256 validator_telemetry: Default::default(),
257 primary_sender: Default::default(),
258 worker_senders: Default::default(),
259 sync_sender: Default::default(),
260 handles: Default::default(),
261 node_data_dir,
262 trusted_peers_only,
263 dev,
264 })))
265 }
266
267 pub async fn run(
269 &self,
270 primary_sender: PrimarySender<N>,
271 worker_senders: IndexMap<u8, WorkerSender<N>>,
272 sync_sender: Option<SyncSender<N>>,
273 ) {
274 debug!("Starting the gateway for the memory pool...");
275
276 self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");
278
279 self.worker_senders.set(worker_senders).expect("The worker senders are already set");
281
282 if let Some(sync_sender) = sync_sender {
284 self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
285 }
286
287 self.enable_handshake().await;
289 self.enable_reading().await;
290 self.enable_writing().await;
291 self.enable_disconnect().await;
292 self.enable_on_connect().await;
293
294 #[cfg(feature = "metrics")]
296 {
297 let gateway = self.clone();
298 self.spawn(async move {
299 loop {
300 tokio::time::sleep(Duration::from_secs(1)).await;
301 gateway.update_metrics();
302 }
303 });
304 }
305
306 let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
308 debug!("Listening for validator connections at address {listen_addr:?}");
309
310 self.initialize_heartbeat();
312
313 info!("Started the gateway for the memory pool at '{}'", self.local_ip());
314 }
315}
316
317impl<N: Network> Gateway<N> {
319 fn max_committee_size(&self) -> usize {
321 self.ledger
322 .current_committee()
323 .map_or_else(|_e| Committee::<N>::max_committee_size() as usize, |committee| committee.num_members())
324 }
325
326 fn max_cache_events(&self) -> usize {
328 self.max_cache_transmissions()
329 }
330
331 fn max_cache_certificates(&self) -> usize {
333 2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
334 }
335
336 fn max_cache_transmissions(&self) -> usize {
338 self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
339 }
340
341 fn max_cache_duplicates(&self) -> usize {
343 self.max_committee_size().pow(2)
344 }
345}
346
347#[async_trait]
348impl<N: Network> CommunicationService for Gateway<N> {
349 type Message = Event<N>;
351
352 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
354 debug_assert!(start_height < end_height, "Invalid block request format");
355 Event::BlockRequest(BlockRequest { start_height, end_height })
356 }
357
358 async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
364 Transport::send(self, peer_ip, message).await
365 }
366}
367
368impl<N: Network> Gateway<N> {
369 pub fn account(&self) -> &Account<N> {
371 &self.account
372 }
373
374 pub fn dev(&self) -> Option<u16> {
376 self.dev
377 }
378
379 pub fn resolver(&self) -> &RwLock<Resolver<N>> {
381 &self.resolver
382 }
383
384 pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> {
386 self.resolver.read().get_listener(*connected_addr)
387 }
388
389 #[cfg(feature = "telemetry")]
391 pub fn validator_telemetry(&self) -> &Telemetry<N> {
392 &self.validator_telemetry
393 }
394
395 pub fn primary_sender(&self) -> &PrimarySender<N> {
397 self.primary_sender.get().expect("Primary sender not set in gateway")
398 }
399
400 pub fn num_workers(&self) -> u8 {
402 u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
403 .expect("Too many workers")
404 }
405
406 pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
408 self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
409 }
410
411 pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
413 if self.trusted_peers().contains(&ip) {
415 return true;
416 }
417 match self.resolve_to_aleo_addr(ip) {
419 Some(address) => self.is_authorized_validator_address(address),
421 None => {
422 warn!("{CONTEXT} Could not resolve the Aleo address for '{ip}'");
423 false
424 }
425 }
426 }
427
428 pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
430 if self
439 .ledger
440 .get_committee_lookback_for_round(self.storage.current_round())
441 .is_ok_and(|committee| committee.is_committee_member(validator_address))
442 {
443 return true;
444 }
445
446 if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
448 return true;
449 }
450
451 let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
453 match self.ledger.get_block_round(previous_block_height) {
455 Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
456 self.ledger
457 .get_committee_lookback_for_round(round)
458 .is_ok_and(|committee| committee.is_committee_member(validator_address))
459 }),
460 Err(_) => false,
461 }
462 }
463
464 pub fn connected_addresses(&self) -> HashSet<Address<N>> {
466 self.get_connected_peers().into_iter().map(|peer| peer.aleo_addr).collect()
467 }
468
469 fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<(), DisconnectReason> {
471 if self.is_local_ip(listener_addr) {
473 return Err(DisconnectReason::SelfConnect);
474 }
475
476 Ok(())
477 }
478
479 #[cfg(feature = "metrics")]
481 fn update_metrics(&self) {
482 metrics::gauge(metrics::bft::CONNECTED, self.number_of_connected_peers() as f64);
483 metrics::gauge(metrics::bft::CONNECTING, self.number_of_connecting_peers() as f64);
484 }
485
486 #[cfg(test)]
488 pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
489 self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address));
491 self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false));
493 if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
494 peer.upgrade_to_connected(
495 peer_addr,
496 peer_ip.port(),
497 address,
498 NodeType::Validator,
499 0,
500 get_repo_commit_hash(),
501 ConnectionMode::Gateway,
502 );
503 }
504 }
505
506 fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
512 let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else {
514 warn!("Unable to resolve the listener IP address '{peer_ip}'");
515 return None;
516 };
517 let name = event.name();
519 trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
521 let result = self.unicast(peer_addr, event);
522 if let Err(err) = &result {
524 warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {err:?}");
525 debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
526 self.disconnect(peer_ip);
527 }
528 result.ok()
529 }
530
    /// Handles an inbound `event` received from the connection at `peer_addr`.
    ///
    /// The event is first checked against the resolver, the authorization rules, and the
    /// per-peer and per-item rate limits, and is then dispatched by variant.
    ///
    /// Returns `Ok(false)` if the peer can no longer be resolved or if it announced a disconnect,
    /// `Ok(true)` for events that were processed or deliberately skipped, and `Err` for events that
    /// are rejected (the caller, `process_message_inner`, disconnects the peer on `Err`).
    async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<bool> {
        // Resolve the connected address to the peer's listener IP.
        let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else {
            trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name());
            return Ok(false);
        };
        // Only authorized validators and connected bootstrap clients may send events.
        if !(self.is_authorized_validator_ip(peer_ip)
            || self
                .get_connected_peer(peer_ip)
                .map(|peer| peer.node_type == NodeType::BootstrapClient)
                .unwrap_or(false))
        {
            bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
        }
        // Reject the peer if it exceeded the per-peer event limit.
        let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
        if num_events >= self.max_cache_events() {
            bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
        }
        // Silently skip (without penalizing the peer) events whose item-specific limit was exceeded.
        match event {
            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
                // Both variants are counted against the same certificate ID.
                let certificate_id = match &event {
                    Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
                    Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
                    _ => unreachable!(),
                };
                let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(true);
                }
            }
            Event::TransmissionRequest(TransmissionRequest { transmission_id })
            | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
                let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(true);
                }
            }
            Event::BlockRequest(_) => {
                // Block requests are counted per peer.
                let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(true);
                }
            }
            _ => {}
        }
        trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());

        // Dispatch the event by variant.
        match event {
            Event::BatchPropose(batch_propose) => {
                // Forward to the primary; a failed channel send is ignored.
                let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
                Ok(true)
            }
            Event::BatchSignature(batch_signature) => {
                // Forward to the primary; a failed channel send is ignored.
                let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
                Ok(true)
            }
            Event::BatchCertified(batch_certified) => {
                // Forward the certificate to the primary; a failed channel send is ignored.
                let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
                Ok(true)
            }
            Event::BlockRequest(block_request) => {
                let BlockRequest { start_height, end_height } = block_request;

                // The range must be non-empty.
                if start_height >= end_height {
                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
                }
                // The range must not exceed the maximum number of blocks per response.
                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
                }

                // Determine the consensus version at the last requested height.
                let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?;

                // Fetch the blocks on a blocking thread, as this goes to the ledger.
                let self_ = self.clone();
                let blocks = match task::spawn_blocking(move || {
                    match self_.ledger.get_blocks(start_height..end_height) {
                        Ok(blocks) => Ok(DataBlocks(blocks)),
                        Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
                    }
                })
                .await
                {
                    Ok(Ok(blocks)) => blocks,
                    Ok(Err(error)) => return Err(error),
                    Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
                };

                // Send the response from a separate task, so this handler does not wait on it.
                let self_ = self.clone();
                tokio::spawn(async move {
                    let event =
                        Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version));
                    Transport::send(&self_, peer_ip, event).await;
                });
                Ok(true)
            }
            Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
                if let Some(sync_sender) = self.sync_sender.get() {
                    // The response must match an outstanding request to this peer.
                    if !self.cache.remove_outbound_block_request(peer_ip, &request) {
                        bail!("Unsolicited block response from '{peer_ip}'")
                    }

                    // Deserialize the blocks on the rayon pool and await the result via a oneshot channel.
                    let (send, recv) = tokio::sync::oneshot::channel();
                    rayon::spawn_fifo(move || {
                        let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
                        let _ = send.send(blocks);
                    });
                    let blocks = match recv.await {
                        Ok(Ok(blocks)) => blocks,
                        Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
                        Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
                    };

                    // Check the blocks against the requested range.
                    blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
                    // Hand the blocks to the sync module.
                    match sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await {
                        Ok(_) => Ok(true),
                        // Benign errors are only logged at debug level.
                        Err(err) if err.is_benign() => {
                            let err: anyhow::Error = err.into();
                            let err = err.context(format!("Ignoring block response from peer '{peer_ip}'"));
                            debug!("{}", flatten_error(err));
                            Ok(true)
                        }
                        // An invalid consensus version gets the peer IP-banned and the error propagated.
                        Err(err) if err.is_invalid_consensus_version() => {
                            let err: anyhow::Error = err.into();
                            let err = err.context(format!("Peer sent an invalid block response '{peer_ip}'"));

                            let msg = flatten_error(&err);
                            error!("{msg}");
                            self.ip_ban_peer(peer_ip, Some(&msg));
                            Err(err)
                        }
                        // Any other error is logged as a warning, but the peer is kept.
                        Err(err) => {
                            let err: anyhow::Error = err.into();
                            let err = err.context(format!("Peer '{peer_ip}' sent an invalid block response"));
                            warn!("{}", flatten_error(err));

                            Ok(true)
                        }
                    }
                } else {
                    debug!("Ignoring block response from '{peer_ip}' - no sync sender");
                    Ok(true)
                }
            }
            Event::CertificateRequest(certificate_request) => {
                // Forward to the sync module if present; a failed channel send is ignored.
                if let Some(sync_sender) = self.sync_sender.get() {
                    let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
                }
                Ok(true)
            }
            Event::CertificateResponse(certificate_response) => {
                // Forward to the sync module if present; a failed channel send is ignored.
                if let Some(sync_sender) = self.sync_sender.get() {
                    let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
                }
                Ok(true)
            }
            // Challenge events are rejected here.
            // NOTE(review): presumably they are only valid during the handshake - confirm against the handshake impl.
            Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
                bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
            }
            Event::Disconnect(message) => {
                // The peer announced its disconnect; drop the connection on our side as well.
                debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
                self.disconnect(peer_ip);
                Ok(false)
            }
            Event::PrimaryPing(ping) => {
                let PrimaryPing { version, block_locators, primary_certificate } = ping;

                // Reject peers on an older event version.
                if version < Event::<N>::VERSION {
                    bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
                }

                debug!("Validator '{peer_ip}' is at height {}", block_locators.latest_locator_height());

                // Update the peer's block locators in the sync module, if present.
                if let Some(sync_sender) = self.sync_sender.get() {
                    if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
                        bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
                    }
                }

                // Forward the primary certificate to the primary; a failed channel send is ignored.
                let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
                Ok(true)
            }
            Event::TransmissionRequest(request) => {
                // Determine the responsible worker; an unassignable ID is logged and skipped.
                let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
                    return Ok(true);
                };
                // Forward to the worker if it exists; a failed channel send is ignored.
                if let Some(sender) = self.get_worker_sender(worker_id) {
                    let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
                }
                Ok(true)
            }
            Event::TransmissionResponse(response) => {
                // Determine the responsible worker; an unassignable ID is logged and skipped.
                let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
                    return Ok(true);
                };
                // Forward to the worker if it exists; a failed channel send is ignored.
                if let Some(sender) = self.get_worker_sender(worker_id) {
                    let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
                }
                Ok(true)
            }
            Event::ValidatorsRequest(_) => {
                // Take up to `MAX_VALIDATORS_TO_SEND` of the best connected peers, in random order.
                let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
                connected_peers.shuffle(&mut rand::thread_rng());

                // Build and send the response from a separate task.
                let self_ = self.clone();
                tokio::spawn(async move {
                    let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
                    for validator in connected_peers.into_iter() {
                        validators.insert(validator.listener_addr, validator.aleo_addr);
                    }
                    let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
                    Transport::send(&self_, peer_ip, event).await;
                });
                Ok(true)
            }
            Event::ValidatorsResponse(response) => {
                // Validators responses are not accepted in trusted-peers-only mode.
                if self.trusted_peers_only {
                    bail!("{CONTEXT} Not accepting validators response from '{peer_ip}' (trusted peers only)");
                }
                let ValidatorsResponse { validators } = response;
                // The response must not exceed the maximum number of validators.
                ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
                // The response must match an outstanding validators request to this peer.
                if !self.cache.contains_outbound_validators_request(peer_ip) {
                    bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
                }
                self.cache.decrement_outbound_validators_requests(peer_ip);

                // Keep only addresses that are not ours, not already connected, and authorized.
                let valid_addrs = validators
                    .into_iter()
                    .filter_map(|(listener_addr, aleo_addr)| {
                        (self.account.address() != aleo_addr
                            && !self.is_connected_address(aleo_addr)
                            && self.is_authorized_validator_address(aleo_addr))
                        .then_some((listener_addr, None))
                    })
                    .collect::<Vec<_>>();
                // Add the remaining addresses as candidate peers.
                if !valid_addrs.is_empty() {
                    self.insert_candidate_peers(valid_addrs);
                }

                Ok(true)
            }
            Event::WorkerPing(ping) => {
                // The ping must not exceed the maximum number of transmissions per worker ping.
                ensure!(
                    ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
                    "{CONTEXT} Received too many transmissions"
                );
                let num_workers = self.num_workers();
                // Route each transmission ID to its worker.
                for transmission_id in ping.transmission_ids.into_iter() {
                    // An unassignable ID is logged and skipped.
                    let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
                        warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
                        continue;
                    };
                    // Forward to the worker if it exists; a failed channel send is ignored.
                    if let Some(sender) = self.get_worker_sender(worker_id) {
                        let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
                    }
                }
                Ok(true)
            }
        }
    }
854
855 fn initialize_heartbeat(&self) {
857 let self_clone = self.clone();
858 self.spawn(async move {
859 tokio::time::sleep(Duration::from_millis(1000)).await;
861 info!("Starting the heartbeat of the gateway...");
862 loop {
863 self_clone.heartbeat().await;
865 tokio::time::sleep(Duration::from_secs(15)).await;
867 }
868 });
869 }
870
871 #[allow(dead_code)]
873 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
874 self.handles.lock().push(tokio::spawn(future));
875 }
876
877 pub async fn shut_down(&self) {
879 info!("Shutting down the gateway...");
880 if let Err(e) = self.save_best_peers(&self.node_data_dir.gateway_peer_cache_path(), None, true) {
882 warn!("Failed to persist best validators to disk: {e}");
883 }
884 self.handles.lock().iter().for_each(|handle| handle.abort());
886 self.tcp.shut_down().await;
888 }
889}
890
891impl<N: Network> Gateway<N> {
    /// The minimum time that must have elapsed since the last connection attempt
    /// before a candidate peer is tried again.
    const MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS: Duration = Duration::from_secs(10);
    /// The TCP uptime after which missing validator connections are reported
    /// as warnings/errors rather than at debug level.
    const MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD: Duration = Duration::from_secs(60);
896
    /// Runs one heartbeat: logs connection state and performs the periodic peer maintenance steps,
    /// in the order listed below.
    async fn heartbeat(&self) {
        // Log the connected validators and the connected stake.
        self.log_connected_validators();
        // Log the participation scores (only with the `telemetry` feature).
        #[cfg(feature = "telemetry")]
        self.log_participation_scores();
        // (Re)connect to the trusted validators.
        self.handle_trusted_validators();
        // Keep the number of bootstrap connections in check.
        self.handle_bootstrap_peers().await;
        // Disconnect from validators that are no longer authorized.
        self.handle_unauthorized_validators();
        // Connect to more validators if too few are connected.
        self.handle_min_connected_validators().await;
        // Expire old IP bans.
        self.handle_banned_ips();
        // Persist the validator whitelist.
        self.update_validator_whitelist();
    }
917
918 fn log_connected_validators(&self) {
920 let connected_validators = self.filter_connected_peers(|peer| peer.node_type == NodeType::Validator);
923
924 let committee = match self.ledger.current_committee() {
925 Ok(c) => c,
926 Err(err) => {
927 error!("Failed to get current committee: {err}");
928 return;
929 }
930 };
931
932 let validators_total = committee.num_members().saturating_sub(1);
934 let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
936 let connections_msg = match connected_validators.len() {
938 0 => "No connected validators".to_string(),
939 num_connected => format!("Connected to {num_connected} validators {total_validators}"),
940 };
941 info!("{connections_msg}");
942
943 let mut connected_validator_addresses = HashSet::with_capacity(connected_validators.len());
945 let mut connected_validator_shas: HashMap<SmolStr, u64> = HashMap::with_capacity(connected_validators.len());
946 let our_sha = shorten_snarkos_sha(&get_repo_commit_hash());
948 let our_stake = committee.get_stake(self.account.address());
949 connected_validator_shas.insert(our_sha.clone(), our_stake);
950 connected_validator_addresses.insert(self.account.address());
952 for peer in &connected_validators {
954 let address = peer.aleo_addr;
956 connected_validator_addresses.insert(address);
957 let address_stake = committee.get_stake(address);
959 let short_peer_sha = shorten_snarkos_sha(&peer.snarkos_sha);
960 *connected_validator_shas.entry(short_peer_sha.clone()).or_default() += address_stake;
961
962 debug!(
963 "{}",
964 format!(
965 " Connected to: {} - {} (connection age {:?})",
966 peer.listener_addr,
967 peer.aleo_addr,
968 peer.first_seen.elapsed()
969 )
970 .dimmed()
971 );
972 }
973
974 if let Some(combined_stake) = connected_validator_shas.get(&our_sha) {
976 let percentage = *combined_stake as f64 / committee.total_stake() as f64 * 100.0;
977 debug!("{}", format!(" Combined stake @ {our_sha}: {percentage:.2}%").dimmed());
978 #[cfg(feature = "metrics")]
979 metrics::gauge(metrics::bft::CONNECTED_STAKE_WITH_MATCHING_SHA, percentage);
980 }
981
982 let num_not_connected = validators_total.saturating_sub(connected_validators.len());
984 if num_not_connected > 0 && self.tcp().uptime() > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
985 let total_stake = committee.total_stake();
987 let total_stake_f64 = total_stake as f64;
988
989 let committee_members: HashSet<_> =
991 self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
992
993 let not_connected_stake: u64 = committee_members
994 .difference(&connected_validator_addresses)
995 .map(|address| {
996 let address_stake = committee.get_stake(*address);
997 let address_stake_as_percentage =
998 if total_stake == 0 { 0.0 } else { address_stake as f64 / total_stake_f64 * 100.0 };
999 debug!(
1000 "{}",
1001 format!(" Not connected to {address} ({address_stake_as_percentage:.2}% of total stake)")
1002 .dimmed()
1003 );
1004 address_stake
1005 })
1006 .sum();
1007
1008 let not_connected_stake_as_percentage =
1009 if total_stake == 0 { 0.0 } else { not_connected_stake as f64 / total_stake_f64 * 100.0 };
1010 warn!(
1011 "Not connected to {num_not_connected} validators {total_validators} ({not_connected_stake_as_percentage:.2}% of total stake not connected)"
1012 );
1013 #[cfg(feature = "metrics")]
1014 {
1015 let connected_stake_as_percentage = 100.0 - not_connected_stake_as_percentage;
1016 metrics::gauge(metrics::bft::CONNECTED_STAKE, connected_stake_as_percentage);
1017 }
1018 } else {
1019 #[cfg(feature = "metrics")]
1020 metrics::gauge(metrics::bft::CONNECTED_STAKE, 100.0);
1021 };
1022
1023 if !committee.is_quorum_threshold_reached(&connected_validator_addresses) {
1024 if self.tcp().uptime() > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
1026 error!("Not connected to a quorum of validators");
1027 } else {
1028 debug!("Not connected to a quorum of validators");
1029 }
1030 }
1031 }
1032
1033 #[cfg(feature = "telemetry")]
1035 fn log_participation_scores(&self) {
1036 if let Ok(current_committee) = self.ledger.current_committee() {
1037 let participation_scores = self.validator_telemetry().get_participation_scores(¤t_committee);
1039 debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
1041 for (address, score) in participation_scores {
1042 debug!("{}", format!(" {address} - {score:.2}%").dimmed());
1043 }
1044 }
1045 }
1046
1047 fn handle_trusted_validators(&self) {
1049 let trusted_peers = self.trusted_peers();
1050
1051 let handles: Vec<JoinHandle<_>> = trusted_peers
1053 .iter()
1054 .filter_map(|validator_ip| {
1055 match self.connect(*validator_ip) {
1057 Ok(hdl) => Some(hdl),
1058 Err(ConnectError::SelfConnect { .. })
1059 | Err(ConnectError::AlreadyConnected { .. })
1060 | Err(ConnectError::AlreadyConnecting { .. }) => None,
1061 Err(err) => {
1062 warn!("Could not initiate connection to trusted validator at '{validator_ip}' - {err}");
1063 None
1064 }
1065 }
1066 })
1067 .collect();
1068
1069 if !handles.is_empty() {
1070 info!("Reconnnecting to {} out of {} trusted validators", handles.len(), trusted_peers.len());
1071 }
1072 }
1073
1074 async fn handle_bootstrap_peers(&self) {
1076 if self.trusted_peers_only {
1078 return;
1079 }
1080 let mut candidate_bootstrap = Vec::new();
1082 let connected_bootstrap = self.filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
1083 for bootstrap_ip in bootstrap_peers::<N>(self.is_dev()) {
1084 if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
1085 candidate_bootstrap.push(bootstrap_ip);
1086 }
1087 }
1088 if connected_bootstrap.is_empty() {
1090 let rng = &mut OsRng;
1092 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
1094 match self.connect(peer_ip) {
1095 Ok(hdl) => {
1096 let result = hdl.await;
1097 if let Err(err) = result {
1098 warn!("{CONTEXT} Failed to connect to bootstrap peer at '{peer_ip}' - {err}");
1099 }
1100 }
1101 Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => {}
1102 Err(err) => {
1103 warn!("{CONTEXT} Could not initiate connection to bootstrap peer at '{peer_ip}' - {err}")
1104 }
1105 }
1106 }
1107 }
1108 let num_surplus = connected_bootstrap.len().saturating_sub(1);
1110 if num_surplus > 0 {
1111 let rng = &mut OsRng;
1113 for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
1115 info!("{CONTEXT} Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
1116 <Self as Transport<N>>::send(
1117 self,
1118 peer.listener_addr,
1119 Event::Disconnect(DisconnectReason::NoReasonGiven.into()),
1120 )
1121 .await;
1122 self.disconnect(peer.listener_addr);
1124 }
1125 }
1126 }
1127
1128 fn handle_unauthorized_validators(&self) {
1130 let self_ = self.clone();
1131 tokio::spawn(async move {
1132 let validators = self_.get_connected_peers();
1134 for peer in validators {
1136 if peer.node_type == NodeType::BootstrapClient {
1138 continue;
1139 }
1140 if !self_.is_authorized_validator_ip(peer.listener_addr) {
1142 warn!(
1143 "{CONTEXT} Disconnecting from '{}' - Validator is not in the current committee",
1144 peer.listener_addr
1145 );
1146 Transport::send(&self_, peer.listener_addr, DisconnectReason::ProtocolViolation.into()).await;
1147 self_.disconnect(peer.listener_addr);
1149 }
1150 }
1151 });
1152 }
1153
1154 async fn handle_min_connected_validators(&self) {
1158 let trusted_validators = self.trusted_peers();
1161 if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES() as usize {
1162 let (addrs, handles): (Vec<_>, Vec<_>) = self
1163 .get_candidate_peers()
1164 .iter()
1165 .filter_map(|peer| {
1166 if trusted_validators.contains(&peer.listener_addr) {
1167 return None;
1168 }
1169
1170 if let Some(previous_attempt) = peer.last_connection_attempt
1171 && previous_attempt.elapsed() < Self::MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS
1172 {
1173 return None;
1174 }
1175
1176 match self.connect(peer.listener_addr) {
1177 Ok(hdl) => Some((peer.listener_addr, hdl)),
1178 Err(ConnectError::AlreadyConnected { .. })
1179 | Err(ConnectError::AlreadyConnecting { .. })
1180 | Err(ConnectError::SelfConnect { .. }) => None,
1181 Err(err) => {
1182 warn!(
1183 "{CONTEXT} Could not initiate connection to validator at '{}' - {err}",
1184 peer.listener_addr
1185 );
1186 None
1187 }
1188 }
1189 })
1190 .unzip();
1191
1192 for (addr, result) in addrs.into_iter().zip(join_all(handles).await) {
1193 if let Err(err) = result {
1194 warn!("{CONTEXT} Failed to connect to validator at '{addr}' - {err}");
1195 }
1196 }
1197
1198 let validators = self.connected_peers();
1200 if validators.is_empty() {
1202 return;
1203 }
1204 if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
1206 let self_ = self.clone();
1207 tokio::spawn(async move {
1208 self_.cache.increment_outbound_validators_requests(validator_ip);
1210 let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1212 });
1213 }
1214 }
1215 }
1216
1217 async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1219 if let Err(error) = self.inbound(peer_addr, message).await
1221 && let Some(peer_ip) = self.resolver.read().get_listener(peer_addr)
1222 {
1223 warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1224 let self_ = self.clone();
1225 tokio::spawn(async move {
1226 Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1227 self_.disconnect(peer_ip);
1229 });
1230 }
1231 }
1232
1233 fn handle_banned_ips(&self) {
1235 self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
1236 }
1237
1238 fn update_validator_whitelist(&self) {
1240 if let Err(err) =
1241 self.save_best_peers(&self.node_data_dir.validator_whitelist_path(), Some(MAX_VALIDATORS_TO_SEND), false)
1242 {
1243 warn!("{CONTEXT} Could not update the validator whitelist: {err}");
1244 }
1245 }
1246}
1247
1248#[async_trait]
1249impl<N: Network> Transport<N> for Gateway<N> {
1250 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
1258 macro_rules! send {
1259 ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
1260 while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
1262 tokio::time::sleep(Duration::from_millis(10)).await;
1264 }
1265 $self.send_inner(peer_ip, event)
1267 }};
1268 }
1269
1270 match event {
1272 Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
1273 self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1275 send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
1277 }
1278 Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
1279 self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1281 send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
1283 }
1284 Event::BlockRequest(request) => {
1285 self.cache.insert_outbound_block_request(peer_ip, request);
1287 send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1289 }
1290 _ => {
1291 send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1293 }
1294 }
1295 }
1296
1297 fn broadcast(&self, event: Event<N>) {
1301 if self.number_of_connected_peers() > 0 {
1303 let self_ = self.clone();
1304 let connected_peers = self.connected_peers();
1305 tokio::spawn(async move {
1306 for peer_ip in connected_peers {
1308 let _ = Transport::send(&self_, peer_ip, event.clone()).await;
1310 }
1311 });
1312 }
1313 }
1314}
1315
impl<N: Network> P2P for Gateway<N> {
    /// Returns a reference to the gateway's `Tcp` instance.
    fn tcp(&self) -> &Tcp {
        &self.tcp
    }
}
1322
1323#[async_trait]
1324impl<N: Network> Reading for Gateway<N> {
1325 type Codec = EventCodec<N>;
1326 type Message = Event<N>;
1327
1328 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1331 Default::default()
1332 }
1333
1334 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1336 if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1337 let self_ = self.clone();
1338 tokio::spawn(async move {
1341 self_.process_message_inner(peer_addr, message).await;
1342 });
1343 } else {
1344 self.process_message_inner(peer_addr, message).await;
1345 }
1346 Ok(())
1347 }
1348
1349 fn message_queue_depth(&self) -> usize {
1352 2 * BatchHeader::<N>::MAX_GC_ROUNDS
1353 * N::LATEST_MAX_CERTIFICATES() as usize
1354 * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1355 }
1356}
1357
1358#[async_trait]
1359impl<N: Network> Writing for Gateway<N> {
1360 type Codec = EventCodec<N>;
1361 type Message = Event<N>;
1362
1363 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1366 Default::default()
1367 }
1368
1369 fn message_queue_depth(&self) -> usize {
1373 2 * BatchHeader::<N>::MAX_GC_ROUNDS
1374 * N::LATEST_MAX_CERTIFICATES() as usize
1375 * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1376 }
1377}
1378
1379#[async_trait]
1380impl<N: Network> Disconnect for Gateway<N> {
1381 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1383 if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
1384 self.downgrade_peer_to_candidate(peer_ip);
1385 if let Some(sync_sender) = self.sync_sender.get() {
1387 let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
1388 tokio::spawn(async move {
1389 if let Err(err) = tx_block_sync_remove_peer_.send(peer_ip).await {
1390 warn!("{CONTEXT} Unable to remove '{peer_ip}' from the sync module - {err}");
1391 }
1392 });
1393 }
1394 self.cache.clear_outbound_validators_requests(peer_ip);
1398 self.cache.clear_outbound_block_requests(peer_ip);
1399 }
1400 }
1401}
1402
1403#[async_trait]
1404impl<N: Network> OnConnect for Gateway<N> {
1405 async fn on_connect(&self, peer_addr: SocketAddr) {
1406 if let Some(listener_addr) = self.resolve_to_listener(&peer_addr) {
1407 if let Some(peer) = self.get_connected_peer(listener_addr) {
1408 if peer.node_type == NodeType::BootstrapClient {
1409 self.cache.increment_outbound_validators_requests(listener_addr);
1410 let _ =
1411 <Self as Transport<N>>::send(self, listener_addr, Event::ValidatorsRequest(ValidatorsRequest))
1412 .await;
1413 }
1414 }
1415 }
1416 }
1417}
1418
1419#[async_trait]
1420impl<N: Network> Handshake for Gateway<N> {
1421 async fn perform_handshake(&self, mut connection: Connection) -> Result<Connection, ConnectError> {
1423 let peer_addr = connection.addr();
1425 let peer_side = connection.side();
1426
1427 #[cfg(not(test))]
1429 if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1430 if self.is_ip_banned(peer_addr.ip()) {
1432 trace!("{CONTEXT} Rejected a connection request from banned IP '{}'", peer_addr.ip());
1433 return Err(ConnectError::BannedIp { ip: peer_addr.ip() });
1434 }
1435
1436 let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1437
1438 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1439 if num_attempts > MAX_CONNECTION_ATTEMPTS {
1440 self.update_ip_ban(peer_addr.ip());
1441 trace!("{CONTEXT} Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1442 return Err(ConnectError::other(anyhow!("'{}' appears to be spamming connections", peer_addr.ip())));
1443 }
1444 }
1445
1446 let stream = self.borrow_stream(&mut connection);
1447
1448 let mut listener_addr = if peer_side == ConnectionSide::Initiator {
1451 debug!("{CONTEXT} Received a connection request from '{peer_addr}'");
1452 None
1453 } else {
1454 debug!("{CONTEXT} Shaking hands with {peer_addr}...");
1455 Some(peer_addr)
1456 };
1457
1458 let restrictions_id = self.ledger.latest_restrictions_id();
1460
1461 let handshake_result = if peer_side == ConnectionSide::Responder {
1463 self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await
1464 } else {
1465 self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await
1466 };
1467
1468 if let Some(addr) = listener_addr {
1469 match handshake_result {
1470 Ok(ref cr) => {
1471 let node_type = if bootstrap_peers::<N>(self.is_dev()).contains(&addr) {
1472 NodeType::BootstrapClient
1473 } else {
1474 NodeType::Validator
1475 };
1476
1477 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1478 self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address));
1479 peer.upgrade_to_connected(
1480 peer_addr,
1481 cr.listener_port,
1482 cr.address,
1483 node_type,
1484 cr.version,
1485 cr.snarkos_sha,
1486 ConnectionMode::Gateway,
1487 );
1488 }
1489 info!("{CONTEXT} Connected to '{addr}'");
1490 }
1491 Err(error) => {
1492 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1493 if peer.is_connecting() {
1495 peer.downgrade_to_candidate(addr);
1496 }
1497 }
1498 return Err(error);
1499 }
1500 }
1501 }
1502
1503 Ok(connection)
1504 }
1505}
1506
/// Reads the next handshake event from `$framed` and unwraps it as `$event_ty`.
///
/// Expands to the event's payload on success. Otherwise it returns early from the enclosing
/// function with a `ConnectError`: on a read error (via `?`), when the peer sends a
/// `Disconnect`, when it sends any other event, or when the stream ends.
macro_rules! expect_event {
    ($event_ty:path, $framed:expr, $peer_addr:expr) => {
        match $framed.try_next().await? {
            // The expected event arrived.
            Some($event_ty(data)) => {
                trace!("{CONTEXT} Received '{}' from '{}'", data.name(), $peer_addr);
                data
            }
            // The peer asked to disconnect.
            Some(Event::Disconnect($crate::events::Disconnect { reason })) => {
                return Err(ConnectError::other(format!("'{}' disconnected with reason \"{reason}\"", $peer_addr)));
            }
            // Any other event violates the handshake protocol.
            Some(ty) => {
                return Err(ConnectError::other(format!(
                    "'{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer_addr,
                    ty.name(),
                    // Name the event that was expected (the declared metavariable is `$event_ty`).
                    stringify!($event_ty),
                )));
            }
            // The stream ended before an event was received.
            None => return Err(ConnectError::IoError(io::ErrorKind::BrokenPipe.into())),
        }
    };
}
1534
1535async fn send_event<N: Network>(
1537 framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
1538 peer_addr: SocketAddr,
1539 event: Event<N>,
1540) -> io::Result<()> {
1541 trace!("{CONTEXT} Sending '{}' to '{peer_addr}'", event.name());
1542 framed.send(event).await
1543}
1544
1545impl<N: Network> Gateway<N> {
1546 async fn handshake_inner_initiator<'a>(
1548 &'a self,
1549 peer_addr: SocketAddr,
1550 restrictions_id: Field<N>,
1551 stream: &'a mut TcpStream,
1552 ) -> Result<ChallengeRequest<N>, ConnectError> {
1553 self.add_connecting_peer(peer_addr)?;
1555
1556 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1558
1559 let rng = &mut rand::rngs::OsRng;
1561
1562 let our_nonce = rng.r#gen();
1566 let current_block_height = self.ledger.latest_block_height();
1568 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1569 let snarkos_sha = match (self.is_dev(), consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
1570 (true, _, Some(sha)) => Some(sha),
1571 (_, true, Some(sha)) => Some(sha),
1572 _ => None,
1573 };
1574 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1576 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1577
1578 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1582 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1584
1585 if let Some(reason) = self
1587 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1588 .await
1589 {
1590 send_event(&mut framed, peer_addr, reason.into()).await?;
1591 return Err(ConnectError::application(reason));
1592 }
1593
1594 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1596 send_event(&mut framed, peer_addr, reason.into()).await?;
1597 return Err(reason.into_connect_error(peer_addr));
1598 }
1599
1600 let response_nonce: u64 = rng.r#gen();
1604 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1605 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1606 return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
1607 };
1608 let our_response =
1610 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1611 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1612
1613 Ok(peer_request)
1614 }
1615
1616 async fn handshake_inner_responder<'a>(
1618 &'a self,
1619 peer_addr: SocketAddr,
1620 peer_ip: &mut Option<SocketAddr>,
1621 restrictions_id: Field<N>,
1622 stream: &'a mut TcpStream,
1623 ) -> Result<ChallengeRequest<N>, ConnectError> {
1624 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1626
1627 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1631
1632 if self.account.address() == peer_request.address {
1634 return Err(ConnectError::SelfConnect { address: peer_addr });
1635 }
1636
1637 *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1639 let peer_ip = peer_ip.unwrap();
1640
1641 if let Err(reason) = self.ensure_peer_is_allowed(peer_ip) {
1643 send_event(&mut framed, peer_addr, reason.into()).await?;
1644 return Err(reason.into_connect_error(peer_addr));
1645 }
1646
1647 self.add_connecting_peer(peer_ip)?;
1649
1650 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1652 send_event(&mut framed, peer_addr, reason.into()).await?;
1653 return Err(reason.into_connect_error(peer_addr));
1654 }
1655
1656 let rng = &mut rand::rngs::OsRng;
1660
1661 let response_nonce: u64 = rng.r#gen();
1663 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1664 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1665 return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
1666 };
1667 let our_response =
1669 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1670 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1671
1672 let our_nonce = rng.r#gen();
1674 let current_block_height = self.ledger.latest_block_height();
1676 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1677 let snarkos_sha = match (self.is_dev(), consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
1678 (true, _, Some(sha)) => Some(sha),
1679 (_, true, Some(sha)) => Some(sha),
1680 _ => None,
1681 };
1682 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1684 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1685
1686 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1690 if let Some(reason) = self
1692 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1693 .await
1694 {
1695 send_event(&mut framed, peer_addr, reason.into()).await?;
1696 Err(reason.into_connect_error(peer_addr))
1697 } else {
1698 Ok(peer_request)
1699 }
1700 }
1701
1702 #[must_use]
1704 fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1705 let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event;
1707 log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT);
1708
1709 let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);
1710
1711 if version < Event::<N>::VERSION {
1713 return Some(DisconnectReason::OutdatedClientVersion);
1714 }
1715 if self.trusted_peers_only && !self.is_trusted(listener_addr) {
1717 warn!("{CONTEXT} Dropping '{peer_addr}' for being an untrusted validator ({address})");
1718 return Some(DisconnectReason::NoExternalPeersAllowed);
1719 }
1720 if !bootstrap_peers::<N>(self.dev().is_some()).contains(&listener_addr) {
1721 if !self.is_authorized_validator_address(address) {
1723 return Some(DisconnectReason::UnauthorizedValidator);
1724 }
1725 }
1726
1727 if self.is_connected_address(address) {
1729 return Some(DisconnectReason::AlreadyConnectedToAleoAddress);
1730 }
1731
1732 None
1733 }
1734
1735 #[must_use]
1737 async fn verify_challenge_response(
1738 &self,
1739 peer_addr: SocketAddr,
1740 peer_address: Address<N>,
1741 response: ChallengeResponse<N>,
1742 expected_restrictions_id: Field<N>,
1743 expected_nonce: u64,
1744 ) -> Option<DisconnectReason> {
1745 let ChallengeResponse { restrictions_id, signature, nonce } = response;
1747
1748 if restrictions_id != expected_restrictions_id {
1750 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1751 return Some(DisconnectReason::InvalidChallengeResponse);
1752 }
1753 let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1755 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1756 return Some(DisconnectReason::InvalidChallengeResponse);
1757 };
1758 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1760 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (invalid signature)");
1761 return Some(DisconnectReason::InvalidChallengeResponse);
1762 }
1763 None
1764 }
1765}
1766
1767#[cfg(test)]
1768mod prop_tests {
1769 use crate::{
1770 Gateway,
1771 MAX_WORKERS,
1772 MEMORY_POOL_PORT,
1773 Worker,
1774 helpers::{Storage, init_primary_channels, init_worker_channels},
1775 };
1776
1777 use snarkos_account::Account;
1778 use snarkos_node_bft_ledger_service::MockLedgerService;
1779 use snarkos_node_bft_storage_service::BFTMemoryService;
1780 use snarkos_node_network::PeerPoolHandling;
1781 use snarkos_node_tcp::P2P;
1782 use snarkos_utilities::NodeDataDir;
1783
1784 use snarkvm::{
1785 ledger::{
1786 committee::{
1787 Committee,
1788 prop_tests::{CommitteeContext, ValidatorSet},
1789 test_helpers::sample_committee_for_round_and_members,
1790 },
1791 narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
1792 },
1793 prelude::{MainnetV0, PrivateKey},
1794 utilities::TestRng,
1795 };
1796
1797 use indexmap::{IndexMap, IndexSet};
1798 use proptest::{
1799 prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
1800 sample::Selector,
1801 };
1802 use std::{
1803 fmt::{Debug, Formatter},
1804 net::{IpAddr, Ipv4Addr, SocketAddr},
1805 sync::Arc,
1806 };
1807 use test_strategy::proptest;
1808
/// The network type used by every test in this module.
type CurrentNetwork = MainnetV0;
1810
1811 impl Debug for Gateway<CurrentNetwork> {
1812 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1813 f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
1815 }
1816 }
1817
/// Address configuration handed to `Gateway::new` by the tests in this module.
#[derive(Debug, test_strategy::Arbitrary)]
enum GatewayAddress {
    /// Development configuration; `port()` returns this value widened to `u16`.
    Dev(u8),
    /// Production configuration; `ip()` returns this optional socket address.
    Prod(Option<SocketAddr>),
}
1823
1824 impl GatewayAddress {
1825 fn ip(&self) -> Option<SocketAddr> {
1826 if let GatewayAddress::Prod(ip) = self {
1827 return *ip;
1828 }
1829 None
1830 }
1831
1832 fn port(&self) -> Option<u16> {
1833 if let GatewayAddress::Dev(port) = self {
1834 return Some(*port as u16);
1835 }
1836 None
1837 }
1838 }
1839
1840 impl Arbitrary for Gateway<CurrentNetwork> {
1841 type Parameters = ();
1842 type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;
1843
1844 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1845 any_valid_dev_gateway()
1846 .prop_map(|(storage, _, private_key, address)| {
1847 Gateway::new(
1848 Account::try_from(private_key).unwrap(),
1849 storage.clone(),
1850 storage.ledger().clone(),
1851 address.ip(),
1852 &[],
1853 false,
1854 NodeDataDir::new_test(None),
1855 address.port(),
1856 )
1857 .unwrap()
1858 })
1859 .boxed()
1860 }
1861 }
1862
/// The tuple produced by the gateway strategies: storage, committee context, the private
/// key of the selected validator, and the address configuration.
type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);
1864
1865 fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
1866 (any::<CommitteeContext>(), any::<Selector>())
1867 .prop_flat_map(|(context, account_selector)| {
1868 let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1869 (
1870 any_with::<Storage<CurrentNetwork>>(context.clone()),
1871 Just(context),
1872 Just(account_selector.select(validators)),
1873 0u8..,
1874 )
1875 .prop_map(|(a, b, c, d)| (a, b, c.private_key, GatewayAddress::Dev(d)))
1876 })
1877 .boxed()
1878 }
1879
1880 fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
1881 (any::<CommitteeContext>(), any::<Selector>())
1882 .prop_flat_map(|(context, account_selector)| {
1883 let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1884 (
1885 any_with::<Storage<CurrentNetwork>>(context.clone()),
1886 Just(context),
1887 Just(account_selector.select(validators)),
1888 any::<Option<SocketAddr>>(),
1889 )
1890 .prop_map(|(a, b, c, d)| (a, b, c.private_key, GatewayAddress::Prod(d)))
1891 })
1892 .boxed()
1893 }
1894
1895 #[proptest]
1896 fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1897 let (storage, _, private_key, dev) = input;
1898 let account = Account::try_from(private_key).unwrap();
1899
1900 let gateway = Gateway::new(
1901 account.clone(),
1902 storage.clone(),
1903 storage.ledger().clone(),
1904 dev.ip(),
1905 &[],
1906 false,
1907 NodeDataDir::new_test(None),
1908 dev.port(),
1909 )
1910 .unwrap();
1911 let tcp_config = gateway.tcp().config();
1912 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
1913 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
1914
1915 let tcp_config = gateway.tcp().config();
1916 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size() * 10);
1917 assert_eq!(gateway.account().address(), account.address());
1918 }
1919
1920 #[proptest]
1921 fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1922 let (storage, _, private_key, dev) = input;
1923 let account = Account::try_from(private_key).unwrap();
1924
1925 let gateway = Gateway::new(
1926 account.clone(),
1927 storage.clone(),
1928 storage.ledger().clone(),
1929 dev.ip(),
1930 &[],
1931 false,
1932 NodeDataDir::new_test(None),
1933 dev.port(),
1934 )
1935 .unwrap();
1936 let tcp_config = gateway.tcp().config();
1937 if let Some(socket_addr) = dev.ip() {
1938 assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
1939 assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
1940 } else {
1941 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
1942 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
1943 }
1944
1945 let tcp_config = gateway.tcp().config();
1946 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size() * 10);
1947 assert_eq!(gateway.account().address(), account.address());
1948 }
1949
1950 #[proptest(async = "tokio")]
1951 async fn gateway_start(
1952 #[strategy(any_valid_dev_gateway())] input: GatewayInput,
1953 #[strategy(0..MAX_WORKERS)] workers_count: u8,
1954 ) {
1955 let (storage, committee, private_key, dev) = input;
1956 let committee = committee.0;
1957 let worker_storage = storage.clone();
1958 let account = Account::try_from(private_key).unwrap();
1959
1960 let gateway = Gateway::new(
1961 account,
1962 storage.clone(),
1963 storage.ledger().clone(),
1964 dev.ip(),
1965 &[],
1966 false,
1967 NodeDataDir::new_test(None),
1968 dev.port(),
1969 )
1970 .unwrap();
1971
1972 let (primary_sender, _) = init_primary_channels();
1973
1974 let (workers, worker_senders) = {
1975 let mut tx_workers = IndexMap::new();
1977 let mut workers = IndexMap::new();
1978
1979 for id in 0..workers_count {
1981 let (tx_worker, rx_worker) = init_worker_channels();
1983 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1985 let worker =
1986 Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
1987 .unwrap();
1988 worker.run(rx_worker);
1990
1991 workers.insert(id, worker);
1993 tx_workers.insert(id, tx_worker);
1994 }
1995 (workers, tx_workers)
1996 };
1997
1998 gateway.run(primary_sender, worker_senders, None).await;
1999 assert_eq!(
2000 gateway.local_ip(),
2001 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
2002 );
2003 assert_eq!(gateway.num_workers(), workers.len() as u8);
2004 }
2005
2006 #[proptest]
2007 fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
2008 let rng = &mut TestRng::default();
2009
2010 let current_round = 2;
2012 let committee_size = 4;
2013 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
2014 let (_, _, private_key, dev) = input;
2015 let account = Account::try_from(private_key).unwrap();
2016
2017 let mut certificates = IndexSet::new();
2019 for _ in 0..committee_size {
2020 certificates.insert(sample_batch_certificate_for_round(current_round, rng));
2021 }
2022 let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
2023 let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
2025 for _ in 0..committee_size {
2027 certificates.insert(sample_batch_certificate_for_round(current_round, rng));
2028 }
2029 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2031 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2033 let gateway = Gateway::new(
2035 account.clone(),
2036 storage.clone(),
2037 ledger.clone(),
2038 dev.ip(),
2039 &[],
2040 false,
2041 NodeDataDir::new_test(None),
2042 dev.port(),
2043 )
2044 .unwrap();
2045 for certificate in certificates.iter() {
2047 storage.testing_only_insert_certificate_testing_only(certificate.clone());
2048 }
2049 for i in 0..certificates.clone().len() {
2051 let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
2052 if i < committee_size {
2053 assert!(is_authorized);
2054 } else {
2055 assert!(!is_authorized);
2056 }
2057 }
2058 }
2059}