1#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19 CONTEXT,
20 MAX_BATCH_DELAY_IN_MS,
21 MEMORY_POOL_PORT,
22 Worker,
23 events::{EventCodec, PrimaryPing},
24 helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
25 spawn_blocking,
26};
27use aleo_std::StorageMode;
28use snarkos_account::Account;
29use snarkos_node_bft_events::{
30 BlockRequest,
31 BlockResponse,
32 CertificateRequest,
33 CertificateResponse,
34 ChallengeRequest,
35 ChallengeResponse,
36 DataBlocks,
37 DisconnectReason,
38 Event,
39 EventTrait,
40 TransmissionRequest,
41 TransmissionResponse,
42 ValidatorsRequest,
43 ValidatorsResponse,
44};
45use snarkos_node_bft_ledger_service::LedgerService;
46use snarkos_node_network::{
47 ConnectionMode,
48 NodeType,
49 Peer,
50 PeerPoolHandling,
51 Resolver,
52 bootstrap_peers,
53 built_info,
54 log_repo_sha_comparison,
55};
56use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
57use snarkos_node_tcp::{
58 Config,
59 Connection,
60 ConnectionSide,
61 P2P,
62 Tcp,
63 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
64};
65use snarkvm::{
66 console::prelude::*,
67 ledger::{
68 committee::Committee,
69 narwhal::{BatchHeader, Data},
70 },
71 prelude::{Address, Field},
72};
73
74use colored::Colorize;
75use futures::SinkExt;
76use indexmap::IndexMap;
77#[cfg(feature = "locktick")]
78use locktick::parking_lot::{Mutex, RwLock};
79#[cfg(not(feature = "locktick"))]
80use parking_lot::{Mutex, RwLock};
81use rand::{
82 rngs::OsRng,
83 seq::{IteratorRandom, SliceRandom},
84};
85use std::{
86 collections::{HashMap, HashSet},
87 future::Future,
88 io,
89 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
90 sync::Arc,
91 time::Duration,
92};
93use tokio::{
94 net::TcpStream,
95 sync::{OnceCell, oneshot},
96 task::{self, JoinHandle},
97};
98use tokio_stream::StreamExt;
99use tokio_util::codec::Framed;
100
101const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; const MAX_CONNECTION_ATTEMPTS: usize = 10;
108const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; pub const MAX_VALIDATORS_TO_SEND: usize = 200;
113
114#[cfg(not(any(test)))]
116const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
117const IP_BAN_TIME_IN_SECS: u64 = 300;
119
120const VALIDATOR_CACHE_FILENAME: &str = "cached_gateway_peers";
122
123#[async_trait]
126pub trait Transport<N: Network>: Send + Sync {
127 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
128 fn broadcast(&self, event: Event<N>);
129}
130
131#[derive(Clone)]
134pub struct Gateway<N: Network>(Arc<InnerGateway<N>>);
135
136impl<N: Network> Deref for Gateway<N> {
137 type Target = Arc<InnerGateway<N>>;
138
139 fn deref(&self) -> &Self::Target {
140 &self.0
141 }
142}
143
144pub struct InnerGateway<N: Network> {
145 account: Account<N>,
147 storage: Storage<N>,
149 ledger: Arc<dyn LedgerService<N>>,
151 tcp: Tcp,
153 cache: Cache<N>,
155 resolver: RwLock<Resolver<N>>,
157 peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
159 #[cfg(feature = "telemetry")]
160 validator_telemetry: Telemetry<N>,
161 primary_sender: OnceCell<PrimarySender<N>>,
163 worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
165 sync_sender: OnceCell<SyncSender<N>>,
167 handles: Mutex<Vec<JoinHandle<()>>>,
169 storage_mode: StorageMode,
171 trusted_peers_only: bool,
173 dev: Option<u16>,
175}
176
177impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
178 const MAXIMUM_POOL_SIZE: usize = 200;
179 const OWNER: &str = CONTEXT;
180 const PEER_SLASHING_COUNT: usize = 20;
181
182 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
183 &self.peer_pool
184 }
185
186 fn resolver(&self) -> &RwLock<Resolver<N>> {
187 &self.resolver
188 }
189
190 fn is_dev(&self) -> bool {
191 self.dev.is_some()
192 }
193
194 fn trusted_peers_only(&self) -> bool {
195 self.trusted_peers_only
196 }
197
198 fn node_type(&self) -> NodeType {
199 NodeType::Validator
200 }
201}
202
203impl<N: Network> Gateway<N> {
204 #[allow(clippy::too_many_arguments)]
206 pub fn new(
207 account: Account<N>,
208 storage: Storage<N>,
209 ledger: Arc<dyn LedgerService<N>>,
210 ip: Option<SocketAddr>,
211 trusted_validators: &[SocketAddr],
212 trusted_peers_only: bool,
213 storage_mode: StorageMode,
214 dev: Option<u16>,
215 ) -> Result<Self> {
216 let ip = match (ip, dev) {
218 (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
219 (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
220 (Some(ip), _) => ip,
221 };
222 let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));
224
225 let mut initial_peers = HashMap::new();
227
228 if !trusted_peers_only {
230 let cached_peers = Self::load_cached_peers(&storage_mode, VALIDATOR_CACHE_FILENAME)?;
231 for addr in cached_peers {
232 initial_peers.insert(addr, Peer::new_candidate(addr, false));
233 }
234 }
235
236 initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
239
240 Ok(Self(Arc::new(InnerGateway {
242 account,
243 storage,
244 ledger,
245 tcp,
246 cache: Default::default(),
247 resolver: Default::default(),
248 peer_pool: RwLock::new(initial_peers),
249 #[cfg(feature = "telemetry")]
250 validator_telemetry: Default::default(),
251 primary_sender: Default::default(),
252 worker_senders: Default::default(),
253 sync_sender: Default::default(),
254 handles: Default::default(),
255 storage_mode,
256 trusted_peers_only,
257 dev,
258 })))
259 }
260
261 pub async fn run(
263 &self,
264 primary_sender: PrimarySender<N>,
265 worker_senders: IndexMap<u8, WorkerSender<N>>,
266 sync_sender: Option<SyncSender<N>>,
267 ) {
268 debug!("Starting the gateway for the memory pool...");
269
270 self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");
272
273 self.worker_senders.set(worker_senders).expect("The worker senders are already set");
275
276 if let Some(sync_sender) = sync_sender {
278 self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
279 }
280
281 self.enable_handshake().await;
283 self.enable_reading().await;
284 self.enable_writing().await;
285 self.enable_disconnect().await;
286 self.enable_on_connect().await;
287
288 let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
290 debug!("Listening for validator connections at address {listen_addr:?}");
291
292 self.initialize_heartbeat();
294
295 info!("Started the gateway for the memory pool at '{}'", self.local_ip());
296 }
297}
298
299impl<N: Network> Gateway<N> {
301 fn max_committee_size(&self) -> usize {
303 self.ledger.current_committee().map_or_else(
304 |_e| Committee::<N>::max_committee_size().unwrap() as usize,
305 |committee| committee.num_members(),
306 )
307 }
308
309 fn max_cache_events(&self) -> usize {
311 self.max_cache_transmissions()
312 }
313
314 fn max_cache_certificates(&self) -> usize {
316 2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
317 }
318
319 fn max_cache_transmissions(&self) -> usize {
321 self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
322 }
323
324 fn max_cache_duplicates(&self) -> usize {
326 self.max_committee_size().pow(2)
327 }
328}
329
330#[async_trait]
331impl<N: Network> CommunicationService for Gateway<N> {
332 type Message = Event<N>;
334
335 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
337 debug_assert!(start_height < end_height, "Invalid block request format");
338 Event::BlockRequest(BlockRequest { start_height, end_height })
339 }
340
341 async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
347 Transport::send(self, peer_ip, message).await
348 }
349}
350
351impl<N: Network> Gateway<N> {
352 pub fn account(&self) -> &Account<N> {
354 &self.account
355 }
356
357 pub fn dev(&self) -> Option<u16> {
359 self.dev
360 }
361
362 pub fn resolver(&self) -> &RwLock<Resolver<N>> {
364 &self.resolver
365 }
366
367 pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> {
369 self.resolver.read().get_listener(*connected_addr)
370 }
371
372 #[cfg(feature = "telemetry")]
374 pub fn validator_telemetry(&self) -> &Telemetry<N> {
375 &self.validator_telemetry
376 }
377
378 pub fn primary_sender(&self) -> &PrimarySender<N> {
380 self.primary_sender.get().expect("Primary sender not set in gateway")
381 }
382
383 pub fn num_workers(&self) -> u8 {
385 u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
386 .expect("Too many workers")
387 }
388
389 pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
391 self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
392 }
393
394 pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
396 if self.trusted_peers().contains(&ip) {
398 return true;
399 }
400 match self.resolve_to_aleo_addr(ip) {
402 Some(address) => self.is_authorized_validator_address(address),
404 None => false,
405 }
406 }
407
408 pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
410 if self
419 .ledger
420 .get_committee_lookback_for_round(self.storage.current_round())
421 .is_ok_and(|committee| committee.is_committee_member(validator_address))
422 {
423 return true;
424 }
425
426 if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
428 return true;
429 }
430
431 let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
433 match self.ledger.get_block_round(previous_block_height) {
435 Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
436 self.ledger
437 .get_committee_lookback_for_round(round)
438 .is_ok_and(|committee| committee.is_committee_member(validator_address))
439 }),
440 Err(_) => false,
441 }
442 }
443
444 pub fn connected_addresses(&self) -> HashSet<Address<N>> {
446 self.get_connected_peers().into_iter().map(|peer| peer.aleo_addr).collect()
447 }
448
449 fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
451 if self.is_local_ip(listener_addr) {
453 bail!("{CONTEXT} Dropping connection request from '{listener_addr}' (attempted to self-connect)");
454 }
455 if !listener_addr.ip().is_loopback() {
457 let num_attempts = self.cache.insert_inbound_connection(listener_addr.ip(), RESTRICTED_INTERVAL);
459 if num_attempts > MAX_CONNECTION_ATTEMPTS {
461 bail!("Dropping connection request from '{listener_addr}' (tried {num_attempts} times)");
462 }
463 }
464 Ok(())
465 }
466
467 #[cfg(feature = "metrics")]
468 fn update_metrics(&self) {
469 metrics::gauge(metrics::bft::CONNECTED, self.number_of_connected_peers() as f64);
470 metrics::gauge(metrics::bft::CONNECTING, self.number_of_connecting_peers() as f64);
471 }
472
473 #[cfg(test)]
475 pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
476 self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address));
478 self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false));
480 if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
481 peer.upgrade_to_connected(
482 peer_addr,
483 peer_ip.port(),
484 address,
485 NodeType::Validator,
486 0,
487 ConnectionMode::Gateway,
488 );
489 }
490 }
491
492 fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
498 let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else {
500 warn!("Unable to resolve the listener IP address '{peer_ip}'");
501 return None;
502 };
503 let name = event.name();
505 trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
507 let result = self.unicast(peer_addr, event);
508 if let Err(e) = &result {
510 warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {e}");
511 debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
512 self.disconnect(peer_ip);
513 }
514 result.ok()
515 }
516
517 async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<bool> {
521 let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else {
523 trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name());
525 return Ok(false);
526 };
527 if !(self.is_authorized_validator_ip(peer_ip)
529 || self
530 .get_connected_peer(peer_ip)
531 .map(|peer| peer.node_type == NodeType::BootstrapClient)
532 .unwrap_or(false))
533 {
534 bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
535 }
536 let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
538 if num_events >= self.max_cache_events() {
539 bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
540 }
541 match event {
543 Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
544 let certificate_id = match &event {
546 Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
547 Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
548 _ => unreachable!(),
549 };
550 let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
552 if num_events >= self.max_cache_duplicates() {
553 return Ok(true);
554 }
555 }
556 Event::TransmissionRequest(TransmissionRequest { transmission_id })
557 | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
558 let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
560 if num_events >= self.max_cache_duplicates() {
561 return Ok(true);
562 }
563 }
564 Event::BlockRequest(_) => {
565 let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
566 if num_events >= self.max_cache_duplicates() {
567 return Ok(true);
568 }
569 }
570 _ => {}
571 }
572 trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());
573
574 match event {
577 Event::BatchPropose(batch_propose) => {
578 let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
580 Ok(true)
581 }
582 Event::BatchSignature(batch_signature) => {
583 let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
585 Ok(true)
586 }
587 Event::BatchCertified(batch_certified) => {
588 let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
590 Ok(true)
591 }
592 Event::BlockRequest(block_request) => {
593 let BlockRequest { start_height, end_height } = block_request;
594
595 if start_height >= end_height {
597 bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
598 }
599 if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
601 bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
602 }
603
604 let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?;
606
607 let self_ = self.clone();
608 let blocks = match task::spawn_blocking(move || {
609 match self_.ledger.get_blocks(start_height..end_height) {
611 Ok(blocks) => Ok(DataBlocks(blocks)),
612 Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
613 }
614 })
615 .await
616 {
617 Ok(Ok(blocks)) => blocks,
618 Ok(Err(error)) => return Err(error),
619 Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
620 };
621
622 let self_ = self.clone();
623 tokio::spawn(async move {
624 let event =
626 Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version));
627 Transport::send(&self_, peer_ip, event).await;
628 });
629 Ok(true)
630 }
631 Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
632 if let Some(sync_sender) = self.sync_sender.get() {
634 if !self.cache.remove_outbound_block_request(peer_ip, &request) {
636 bail!("Unsolicited block response from '{peer_ip}'")
637 }
638
639 let (send, recv) = tokio::sync::oneshot::channel();
643 rayon::spawn_fifo(move || {
644 let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
645 let _ = send.send(blocks);
646 });
647 let blocks = match recv.await {
648 Ok(Ok(blocks)) => blocks,
649 Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
650 Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
651 };
652
653 blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
655 if let Err(err) =
657 sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await
658 {
659 warn!("Unable to process block response from '{peer_ip}' - {err}");
660 }
661 }
662 Ok(true)
663 }
664 Event::CertificateRequest(certificate_request) => {
665 if let Some(sync_sender) = self.sync_sender.get() {
668 let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
670 }
671 Ok(true)
672 }
673 Event::CertificateResponse(certificate_response) => {
674 if let Some(sync_sender) = self.sync_sender.get() {
677 let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
679 }
680 Ok(true)
681 }
682 Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
683 bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
685 }
686 Event::Disconnect(message) => {
687 debug!("Peer '{peer_ip}' decided to disconnect due to '{:?}'", message.reason);
689 self.disconnect(peer_ip);
690 Ok(false)
691 }
692 Event::PrimaryPing(ping) => {
693 let PrimaryPing { version, block_locators, primary_certificate } = ping;
694
695 if version < Event::<N>::VERSION {
697 bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
698 }
699
700 if let Some(sync_sender) = self.sync_sender.get() {
702 if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
704 bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
705 }
706 }
707
708 let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
710 Ok(true)
711 }
712 Event::TransmissionRequest(request) => {
713 let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
716 warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
717 return Ok(true);
718 };
719 if let Some(sender) = self.get_worker_sender(worker_id) {
721 let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
723 }
724 Ok(true)
725 }
726 Event::TransmissionResponse(response) => {
727 let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
729 warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
730 return Ok(true);
731 };
732 if let Some(sender) = self.get_worker_sender(worker_id) {
734 let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
736 }
737 Ok(true)
738 }
739 Event::ValidatorsRequest(_) => {
740 let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
741 connected_peers.shuffle(&mut rand::thread_rng());
742
743 let self_ = self.clone();
744 tokio::spawn(async move {
745 let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
747 for validator in connected_peers.into_iter() {
749 validators.insert(validator.listener_addr, validator.aleo_addr);
751 }
752 let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
754 Transport::send(&self_, peer_ip, event).await;
755 });
756 Ok(true)
757 }
758 Event::ValidatorsResponse(response) => {
759 if self.trusted_peers_only {
760 bail!("{CONTEXT} Not accepting validators response from '{peer_ip}' (trusted peers only)");
761 }
762 let ValidatorsResponse { validators } = response;
763 ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
765 if !self.cache.contains_outbound_validators_request(peer_ip) {
767 bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
768 }
769 self.cache.decrement_outbound_validators_requests(peer_ip);
771
772 let valid_addrs = validators
775 .into_iter()
776 .filter_map(|(listener_addr, aleo_addr)| {
777 (self.account.address() != aleo_addr
778 && !self.is_connected_address(aleo_addr)
779 && self.is_authorized_validator_address(aleo_addr))
780 .then_some((listener_addr, None))
781 })
782 .collect::<Vec<_>>();
783 if !valid_addrs.is_empty() {
784 self.insert_candidate_peers(valid_addrs);
785 }
786
787 #[cfg(feature = "metrics")]
788 self.update_metrics();
789
790 Ok(true)
791 }
792 Event::WorkerPing(ping) => {
793 ensure!(
795 ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
796 "{CONTEXT} Received too many transmissions"
797 );
798 let num_workers = self.num_workers();
800 for transmission_id in ping.transmission_ids.into_iter() {
802 let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
804 warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
805 continue;
806 };
807 if let Some(sender) = self.get_worker_sender(worker_id) {
809 let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
811 }
812 }
813 Ok(true)
814 }
815 }
816 }
817
818 fn initialize_heartbeat(&self) {
820 let self_clone = self.clone();
821 self.spawn(async move {
822 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
824 info!("Starting the heartbeat of the gateway...");
825 loop {
826 self_clone.heartbeat().await;
828 tokio::time::sleep(Duration::from_secs(15)).await;
830 }
831 });
832 }
833
834 #[allow(dead_code)]
836 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
837 self.handles.lock().push(tokio::spawn(future));
838 }
839
840 pub async fn shut_down(&self) {
842 info!("Shutting down the gateway...");
843 if let Err(e) = self.save_best_peers(&self.storage_mode, VALIDATOR_CACHE_FILENAME, None) {
845 warn!("Failed to persist best validators to disk: {e}");
846 }
847 self.handles.lock().iter().for_each(|handle| handle.abort());
849 self.tcp.shut_down().await;
851 }
852}
853
854impl<N: Network> Gateway<N> {
855 async fn heartbeat(&self) {
857 self.log_connected_validators();
859 #[cfg(feature = "telemetry")]
861 self.log_participation_scores();
862 self.handle_trusted_validators();
864 self.handle_bootstrap_peers().await;
866 self.handle_unauthorized_validators();
868 self.handle_min_connected_validators();
870 self.handle_banned_ips();
872 }
873
874 fn log_connected_validators(&self) {
876 let connected_validators = self.connected_peers();
878 let committee = match self.ledger.current_committee() {
879 Ok(c) => c,
880 Err(err) => {
881 error!("Failed to get current committee: {err}");
882 return;
883 }
884 };
885
886 let validators_total = committee.num_members().saturating_sub(1);
888 let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
890 let connections_msg = match connected_validators.len() {
892 0 => "No connected validators".to_string(),
893 num_connected => format!("Connected to {num_connected} validators {total_validators}"),
894 };
895 let mut connected_validator_addresses = HashSet::with_capacity(connected_validators.len());
897 connected_validator_addresses.insert(self.account.address());
900
901 info!("{connections_msg}");
903 for peer_ip in &connected_validators {
904 let address = self.resolve_to_aleo_addr(*peer_ip).map_or("Unknown".to_string(), |a| {
905 connected_validator_addresses.insert(a);
906 a.to_string()
907 });
908 debug!("{}", format!(" {peer_ip} - {address}").dimmed());
909 }
910
911 let num_not_connected = validators_total.saturating_sub(connected_validators.len());
913 if num_not_connected > 0 {
914 info!("Not connected to {num_not_connected} validators {total_validators}");
915 let committee_members: HashSet<_> =
917 self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
918
919 for address in committee_members.difference(&connected_validator_addresses) {
921 debug!("{}", format!(" Not connected to {address}").dimmed());
922 }
923 }
924
925 if !committee.is_quorum_threshold_reached(&connected_validator_addresses) {
926 error!("Not connected to a quorum of validators");
927 }
928 }
929
930 #[cfg(feature = "telemetry")]
932 fn log_participation_scores(&self) {
933 if let Ok(current_committee) = self.ledger.current_committee() {
934 let participation_scores = self.validator_telemetry().get_participation_scores(¤t_committee);
936 debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
938 for (address, score) in participation_scores {
939 debug!("{}", format!(" {address} - {score:.2}%").dimmed());
940 }
941 }
942 }
943
944 fn handle_trusted_validators(&self) {
946 for validator_ip in &self.trusted_peers() {
948 self.connect(*validator_ip);
950 }
951 }
952
953 async fn handle_bootstrap_peers(&self) {
955 if self.trusted_peers_only {
957 return;
958 }
959 let mut candidate_bootstrap = Vec::new();
961 let connected_bootstrap = self.filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
962 for bootstrap_ip in bootstrap_peers::<N>(self.is_dev()) {
963 if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
964 candidate_bootstrap.push(bootstrap_ip);
965 }
966 }
967 if connected_bootstrap.is_empty() {
969 let rng = &mut OsRng;
971 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
973 match self.connect(peer_ip) {
974 Some(hdl) => {
975 let result = hdl.await;
976 if let Err(err) = result {
977 warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
978 }
979 }
980 None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
981 }
982 }
983 }
984 let num_surplus = connected_bootstrap.len().saturating_sub(1);
986 if num_surplus > 0 {
987 let rng = &mut OsRng;
989 for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
991 info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
992 <Self as Transport<N>>::send(
993 self,
994 peer.listener_addr,
995 Event::Disconnect(DisconnectReason::NoReasonGiven.into()),
996 )
997 .await;
998 self.disconnect(peer.listener_addr);
1000 }
1001 }
1002 }
1003
1004 fn handle_unauthorized_validators(&self) {
1006 let self_ = self.clone();
1007 tokio::spawn(async move {
1008 let validators = self_.get_connected_peers();
1010 for peer in validators {
1012 if peer.node_type == NodeType::BootstrapClient {
1014 continue;
1015 }
1016 if !self_.is_authorized_validator_ip(peer.listener_addr) {
1018 warn!(
1019 "{CONTEXT} Disconnecting from '{}' - Validator is not in the current committee",
1020 peer.listener_addr
1021 );
1022 Transport::send(&self_, peer.listener_addr, DisconnectReason::ProtocolViolation.into()).await;
1023 self_.disconnect(peer.listener_addr);
1025 }
1026 }
1027 });
1028 }
1029
1030 fn handle_min_connected_validators(&self) {
1034 let trusted_validators = self.trusted_peers();
1037 if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
1038 for peer in self.get_candidate_peers() {
1039 if !trusted_validators.contains(&peer.listener_addr) {
1040 self.connect(peer.listener_addr);
1042 }
1043 }
1044
1045 let validators = self.connected_peers();
1047 if validators.is_empty() {
1049 return;
1050 }
1051 if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
1053 let self_ = self.clone();
1054 tokio::spawn(async move {
1055 self_.cache.increment_outbound_validators_requests(validator_ip);
1057 let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1059 });
1060 }
1061 }
1062 }
1063
1064 async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1066 if let Err(error) = self.inbound(peer_addr, message).await {
1068 if let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) {
1069 warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1070 let self_ = self.clone();
1071 tokio::spawn(async move {
1072 Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1073 self_.disconnect(peer_ip);
1075 });
1076 }
1077 }
1078 }
1079
1080 fn handle_banned_ips(&self) {
1082 self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
1083 }
1084}
1085
1086#[async_trait]
1087impl<N: Network> Transport<N> for Gateway<N> {
1088 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
1096 macro_rules! send {
1097 ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
1098 while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
1100 tokio::time::sleep(Duration::from_millis(10)).await;
1102 }
1103 $self.send_inner(peer_ip, event)
1105 }};
1106 }
1107
1108 match event {
1110 Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
1111 self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1113 send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
1115 }
1116 Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
1117 self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1119 send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
1121 }
1122 Event::BlockRequest(request) => {
1123 self.cache.insert_outbound_block_request(peer_ip, request);
1125 send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1127 }
1128 _ => {
1129 send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1131 }
1132 }
1133 }
1134
1135 fn broadcast(&self, event: Event<N>) {
1139 if self.number_of_connected_peers() > 0 {
1141 let self_ = self.clone();
1142 let connected_peers = self.connected_peers();
1143 tokio::spawn(async move {
1144 for peer_ip in connected_peers {
1146 let _ = Transport::send(&self_, peer_ip, event.clone()).await;
1148 }
1149 });
1150 }
1151 }
1152}
1153
1154impl<N: Network> P2P for Gateway<N> {
1155 fn tcp(&self) -> &Tcp {
1157 &self.tcp
1158 }
1159}
1160
1161#[async_trait]
1162impl<N: Network> Reading for Gateway<N> {
1163 type Codec = EventCodec<N>;
1164 type Message = Event<N>;
1165
1166 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1169 Default::default()
1170 }
1171
1172 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1174 if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1175 let self_ = self.clone();
1176 tokio::spawn(async move {
1179 self_.process_message_inner(peer_addr, message).await;
1180 });
1181 } else {
1182 self.process_message_inner(peer_addr, message).await;
1183 }
1184 Ok(())
1185 }
1186
1187 fn message_queue_depth(&self) -> usize {
1190 2 * BatchHeader::<N>::MAX_GC_ROUNDS
1191 * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1192 * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1193 }
1194}
1195
1196#[async_trait]
1197impl<N: Network> Writing for Gateway<N> {
1198 type Codec = EventCodec<N>;
1199 type Message = Event<N>;
1200
1201 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1204 Default::default()
1205 }
1206
1207 fn message_queue_depth(&self) -> usize {
1211 2 * BatchHeader::<N>::MAX_GC_ROUNDS
1212 * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1213 * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1214 }
1215}
1216
1217#[async_trait]
1218impl<N: Network> Disconnect for Gateway<N> {
1219 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1221 if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
1222 self.downgrade_peer_to_candidate(peer_ip);
1223 if let Some(sync_sender) = self.sync_sender.get() {
1225 let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
1226 tokio::spawn(async move {
1227 if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
1228 warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
1229 }
1230 });
1231 }
1232 self.cache.clear_outbound_validators_requests(peer_ip);
1236 self.cache.clear_outbound_block_requests(peer_ip);
1237 #[cfg(feature = "metrics")]
1238 self.update_metrics();
1239 }
1240 }
1241}
1242
1243#[async_trait]
1244impl<N: Network> OnConnect for Gateway<N> {
1245 async fn on_connect(&self, peer_addr: SocketAddr) {
1246 if let Some(listener_addr) = self.resolve_to_listener(&peer_addr) {
1247 if let Some(peer) = self.get_connected_peer(listener_addr) {
1248 if peer.node_type == NodeType::BootstrapClient {
1249 let _ =
1250 <Self as Transport<N>>::send(self, listener_addr, Event::ValidatorsRequest(ValidatorsRequest))
1251 .await;
1252 }
1253 }
1254 }
1255 }
1256}
1257
1258#[async_trait]
1259impl<N: Network> Handshake for Gateway<N> {
1260 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
1262 let peer_addr = connection.addr();
1264 let peer_side = connection.side();
1265
1266 #[cfg(not(any(test)))]
1268 if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1269 if self.is_ip_banned(peer_addr.ip()) {
1271 trace!("{CONTEXT} Rejected a connection request from banned IP '{}'", peer_addr.ip());
1272 return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
1273 }
1274
1275 let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1276
1277 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1278 if num_attempts > MAX_CONNECTION_ATTEMPTS {
1279 self.update_ip_ban(peer_addr.ip());
1280 trace!("{CONTEXT} Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1281 return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
1282 }
1283 }
1284
1285 let stream = self.borrow_stream(&mut connection);
1286
1287 let mut listener_addr = if peer_side == ConnectionSide::Initiator {
1290 debug!("{CONTEXT} Received a connection request from '{peer_addr}'");
1291 None
1292 } else {
1293 debug!("{CONTEXT} Shaking hands with {peer_addr}...");
1294 Some(peer_addr)
1295 };
1296
1297 let restrictions_id = self.ledger.latest_restrictions_id();
1299
1300 let handshake_result = if peer_side == ConnectionSide::Responder {
1302 self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await
1303 } else {
1304 self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await
1305 };
1306
1307 if let Some(addr) = listener_addr {
1308 match handshake_result {
1309 Ok(Some(ref cr)) => {
1310 let node_type = if bootstrap_peers::<N>(self.is_dev()).contains(&addr) {
1311 NodeType::BootstrapClient
1312 } else {
1313 NodeType::Validator
1314 };
1315 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1316 self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address));
1317 peer.upgrade_to_connected(
1318 peer_addr,
1319 cr.listener_port,
1320 cr.address,
1321 node_type,
1322 cr.version,
1323 ConnectionMode::Gateway,
1324 );
1325 }
1326 #[cfg(feature = "metrics")]
1327 self.update_metrics();
1328 info!("{CONTEXT} Connected to '{addr}'");
1329 }
1330 Ok(None) => {
1331 return Err(error(format!("Duplicate handshake attempt with '{addr}'")));
1332 }
1333 Err(error) => {
1334 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1335 if peer.is_connecting() {
1337 peer.downgrade_to_candidate(addr);
1338 }
1339 }
1340 return Err(error);
1342 }
1343 }
1344 }
1345
1346 Ok(connection)
1347 }
1348}
1349
1350macro_rules! expect_event {
1352 ($event_ty:path, $framed:expr, $peer_addr:expr) => {
1353 match $framed.try_next().await? {
1354 Some($event_ty(data)) => {
1356 trace!("{CONTEXT} Received '{}' from '{}'", data.name(), $peer_addr);
1357 data
1358 }
1359 Some(Event::Disconnect(reason)) => {
1361 return Err(error(format!("{CONTEXT} '{}' disconnected: {reason:?}", $peer_addr)));
1362 }
1363 Some(ty) => {
1365 return Err(error(format!(
1366 "{CONTEXT} '{}' did not follow the handshake protocol: received {:?} instead of {}",
1367 $peer_addr,
1368 ty.name(),
1369 stringify!($event_ty),
1370 )))
1371 }
1372 None => {
1374 return Err(error(format!(
1375 "{CONTEXT} the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
1376 stringify!($event_ty)
1377 )))
1378 }
1379 }
1380 };
1381}
1382
1383async fn send_event<N: Network>(
1385 framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
1386 peer_addr: SocketAddr,
1387 event: Event<N>,
1388) -> io::Result<()> {
1389 trace!("{CONTEXT} Sending '{}' to '{peer_addr}'", event.name());
1390 framed.send(event).await
1391}
1392
1393impl<N: Network> Gateway<N> {
1394 async fn handshake_inner_initiator<'a>(
1396 &'a self,
1397 peer_addr: SocketAddr,
1398 restrictions_id: Field<N>,
1399 stream: &'a mut TcpStream,
1400 ) -> io::Result<Option<ChallengeRequest<N>>> {
1401 if !self.add_connecting_peer(peer_addr) {
1403 return Ok(None);
1404 }
1405
1406 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1408
1409 let rng = &mut rand::rngs::OsRng;
1411
1412 let our_nonce = rng.r#gen();
1416 let current_block_height = self.ledger.latest_block_height();
1418 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1419 let snarkos_sha = (consensus_version >= ConsensusVersion::V12)
1420 .then(|| built_info::GIT_COMMIT_HASH.unwrap_or_default().into());
1421 let our_request =
1423 ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha.clone());
1424 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1425
1426 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1430 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1432
1433 if let Some(reason) = self
1435 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1436 .await
1437 {
1438 send_event(&mut framed, peer_addr, reason.into()).await?;
1439 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1440 }
1441 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1443 send_event(&mut framed, peer_addr, reason.into()).await?;
1444 if reason == DisconnectReason::NoReasonGiven {
1445 return Ok(None);
1447 } else {
1448 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1449 }
1450 }
1451
1452 let response_nonce: u64 = rng.r#gen();
1456 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1457 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1458 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1459 };
1460 let our_response =
1462 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1463 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1464
1465 Ok(Some(peer_request))
1466 }
1467
1468 async fn handshake_inner_responder<'a>(
1470 &'a self,
1471 peer_addr: SocketAddr,
1472 peer_ip: &mut Option<SocketAddr>,
1473 restrictions_id: Field<N>,
1474 stream: &'a mut TcpStream,
1475 ) -> io::Result<Option<ChallengeRequest<N>>> {
1476 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1478
1479 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1483
1484 if self.account.address() == peer_request.address {
1486 return Err(error("Skipping request to connect to self".to_string()));
1487 }
1488
1489 *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1491 let peer_ip = peer_ip.unwrap();
1492
1493 if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) {
1495 return Err(error(format!("{forbidden_message}")));
1496 }
1497
1498 if !self.add_connecting_peer(peer_ip) {
1500 return Ok(None);
1501 }
1502
1503 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1505 send_event(&mut framed, peer_addr, reason.into()).await?;
1506 if reason == DisconnectReason::NoReasonGiven {
1507 return Ok(None);
1509 } else {
1510 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1511 }
1512 }
1513
1514 let rng = &mut rand::rngs::OsRng;
1518
1519 let response_nonce: u64 = rng.r#gen();
1521 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1522 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1523 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1524 };
1525 let our_response =
1527 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1528 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1529
1530 let our_nonce = rng.r#gen();
1532 let current_block_height = self.ledger.latest_block_height();
1534 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1535 let snarkos_sha = (consensus_version >= ConsensusVersion::V12)
1536 .then(|| built_info::GIT_COMMIT_HASH.unwrap_or_default().into());
1537 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1539 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1540
1541 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1545 if let Some(reason) = self
1547 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1548 .await
1549 {
1550 send_event(&mut framed, peer_addr, reason.into()).await?;
1551 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1552 }
1553
1554 Ok(Some(peer_request))
1555 }
1556
1557 fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1559 let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event;
1561 log_repo_sha_comparison(peer_addr, snarkos_sha.as_ref(), CONTEXT);
1562
1563 let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);
1564
1565 if version < Event::<N>::VERSION {
1567 warn!("{CONTEXT} Dropping '{peer_addr}' on version {version} (outdated)");
1568 return Some(DisconnectReason::OutdatedClientVersion);
1569 }
1570 if self.trusted_peers_only && !self.is_trusted(listener_addr) {
1572 warn!("{CONTEXT} Dropping '{peer_addr}' for being an untrusted validator ({address})");
1573 return Some(DisconnectReason::ProtocolViolation);
1574 }
1575 if !bootstrap_peers::<N>(self.dev().is_some()).contains(&listener_addr) {
1576 if !self.is_authorized_validator_address(address) {
1578 warn!("{CONTEXT} Dropping '{peer_addr}' for being an unauthorized validator ({address})");
1579 return Some(DisconnectReason::ProtocolViolation);
1580 }
1581 }
1582 if self.is_connected_address(address) {
1584 warn!("{CONTEXT} Dropping '{peer_addr}' for being already connected ({address})");
1585 return Some(DisconnectReason::NoReasonGiven);
1586 }
1587 None
1588 }
1589
1590 async fn verify_challenge_response(
1592 &self,
1593 peer_addr: SocketAddr,
1594 peer_address: Address<N>,
1595 response: ChallengeResponse<N>,
1596 expected_restrictions_id: Field<N>,
1597 expected_nonce: u64,
1598 ) -> Option<DisconnectReason> {
1599 let ChallengeResponse { restrictions_id, signature, nonce } = response;
1601
1602 if restrictions_id != expected_restrictions_id {
1604 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1605 return Some(DisconnectReason::InvalidChallengeResponse);
1606 }
1607 let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1609 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1610 return Some(DisconnectReason::InvalidChallengeResponse);
1611 };
1612 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1614 warn!("{CONTEXT} Handshake with '{peer_addr}' failed (invalid signature)");
1615 return Some(DisconnectReason::InvalidChallengeResponse);
1616 }
1617 None
1618 }
1619}
1620
1621#[cfg(test)]
1622mod prop_tests {
1623 use crate::{
1624 Gateway,
1625 MAX_WORKERS,
1626 MEMORY_POOL_PORT,
1627 Worker,
1628 gateway::prop_tests::GatewayAddress::{Dev, Prod},
1629 helpers::{Storage, init_primary_channels, init_worker_channels},
1630 };
1631 use aleo_std::StorageMode;
1632 use snarkos_account::Account;
1633 use snarkos_node_bft_ledger_service::MockLedgerService;
1634 use snarkos_node_bft_storage_service::BFTMemoryService;
1635 use snarkos_node_network::PeerPoolHandling;
1636 use snarkos_node_tcp::P2P;
1637 use snarkvm::{
1638 ledger::{
1639 committee::{
1640 Committee,
1641 prop_tests::{CommitteeContext, ValidatorSet},
1642 test_helpers::sample_committee_for_round_and_members,
1643 },
1644 narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
1645 },
1646 prelude::{MainnetV0, PrivateKey},
1647 utilities::TestRng,
1648 };
1649
1650 use indexmap::{IndexMap, IndexSet};
1651 use proptest::{
1652 prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
1653 sample::Selector,
1654 };
1655 use std::{
1656 fmt::{Debug, Formatter},
1657 net::{IpAddr, Ipv4Addr, SocketAddr},
1658 sync::Arc,
1659 };
1660 use test_strategy::proptest;
1661
1662 type CurrentNetwork = MainnetV0;
1663
1664 impl Debug for Gateway<CurrentNetwork> {
1665 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1666 f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
1668 }
1669 }
1670
1671 #[derive(Debug, test_strategy::Arbitrary)]
1672 enum GatewayAddress {
1673 Dev(u8),
1674 Prod(Option<SocketAddr>),
1675 }
1676
1677 impl GatewayAddress {
1678 fn ip(&self) -> Option<SocketAddr> {
1679 if let GatewayAddress::Prod(ip) = self {
1680 return *ip;
1681 }
1682 None
1683 }
1684
1685 fn port(&self) -> Option<u16> {
1686 if let GatewayAddress::Dev(port) = self {
1687 return Some(*port as u16);
1688 }
1689 None
1690 }
1691 }
1692
1693 impl Arbitrary for Gateway<CurrentNetwork> {
1694 type Parameters = ();
1695 type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;
1696
1697 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1698 any_valid_dev_gateway()
1699 .prop_map(|(storage, _, private_key, address)| {
1700 Gateway::new(
1701 Account::try_from(private_key).unwrap(),
1702 storage.clone(),
1703 storage.ledger().clone(),
1704 address.ip(),
1705 &[],
1706 false,
1707 StorageMode::new_test(None),
1708 address.port(),
1709 )
1710 .unwrap()
1711 })
1712 .boxed()
1713 }
1714 }
1715
1716 type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);
1717
1718 fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
1719 (any::<CommitteeContext>(), any::<Selector>())
1720 .prop_flat_map(|(context, account_selector)| {
1721 let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1722 (
1723 any_with::<Storage<CurrentNetwork>>(context.clone()),
1724 Just(context),
1725 Just(account_selector.select(validators)),
1726 0u8..,
1727 )
1728 .prop_map(|(a, b, c, d)| (a, b, c.private_key, Dev(d)))
1729 })
1730 .boxed()
1731 }
1732
1733 fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
1734 (any::<CommitteeContext>(), any::<Selector>())
1735 .prop_flat_map(|(context, account_selector)| {
1736 let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1737 (
1738 any_with::<Storage<CurrentNetwork>>(context.clone()),
1739 Just(context),
1740 Just(account_selector.select(validators)),
1741 any::<Option<SocketAddr>>(),
1742 )
1743 .prop_map(|(a, b, c, d)| (a, b, c.private_key, Prod(d)))
1744 })
1745 .boxed()
1746 }
1747
1748 #[proptest]
1749 fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1750 let (storage, _, private_key, dev) = input;
1751 let account = Account::try_from(private_key).unwrap();
1752
1753 let gateway = Gateway::new(
1754 account.clone(),
1755 storage.clone(),
1756 storage.ledger().clone(),
1757 dev.ip(),
1758 &[],
1759 false,
1760 StorageMode::new_test(None),
1761 dev.port(),
1762 )
1763 .unwrap();
1764 let tcp_config = gateway.tcp().config();
1765 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
1766 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
1767
1768 let tcp_config = gateway.tcp().config();
1769 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1770 assert_eq!(gateway.account().address(), account.address());
1771 }
1772
1773 #[proptest]
1774 fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1775 let (storage, _, private_key, dev) = input;
1776 let account = Account::try_from(private_key).unwrap();
1777
1778 let gateway = Gateway::new(
1779 account.clone(),
1780 storage.clone(),
1781 storage.ledger().clone(),
1782 dev.ip(),
1783 &[],
1784 false,
1785 StorageMode::new_test(None),
1786 dev.port(),
1787 )
1788 .unwrap();
1789 let tcp_config = gateway.tcp().config();
1790 if let Some(socket_addr) = dev.ip() {
1791 assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
1792 assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
1793 } else {
1794 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
1795 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
1796 }
1797
1798 let tcp_config = gateway.tcp().config();
1799 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1800 assert_eq!(gateway.account().address(), account.address());
1801 }
1802
1803 #[proptest(async = "tokio")]
1804 async fn gateway_start(
1805 #[strategy(any_valid_dev_gateway())] input: GatewayInput,
1806 #[strategy(0..MAX_WORKERS)] workers_count: u8,
1807 ) {
1808 let (storage, committee, private_key, dev) = input;
1809 let committee = committee.0;
1810 let worker_storage = storage.clone();
1811 let account = Account::try_from(private_key).unwrap();
1812
1813 let gateway = Gateway::new(
1814 account,
1815 storage.clone(),
1816 storage.ledger().clone(),
1817 dev.ip(),
1818 &[],
1819 false,
1820 StorageMode::new_test(None),
1821 dev.port(),
1822 )
1823 .unwrap();
1824
1825 let (primary_sender, _) = init_primary_channels();
1826
1827 let (workers, worker_senders) = {
1828 let mut tx_workers = IndexMap::new();
1830 let mut workers = IndexMap::new();
1831
1832 for id in 0..workers_count {
1834 let (tx_worker, rx_worker) = init_worker_channels();
1836 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1838 let worker =
1839 Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
1840 .unwrap();
1841 worker.run(rx_worker);
1843
1844 workers.insert(id, worker);
1846 tx_workers.insert(id, tx_worker);
1847 }
1848 (workers, tx_workers)
1849 };
1850
1851 gateway.run(primary_sender, worker_senders, None).await;
1852 assert_eq!(
1853 gateway.local_ip(),
1854 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
1855 );
1856 assert_eq!(gateway.num_workers(), workers.len() as u8);
1857 }
1858
1859 #[proptest]
1860 fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1861 let rng = &mut TestRng::default();
1862
1863 let current_round = 2;
1865 let committee_size = 4;
1866 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1867 let (_, _, private_key, dev) = input;
1868 let account = Account::try_from(private_key).unwrap();
1869
1870 let mut certificates = IndexSet::new();
1872 for _ in 0..committee_size {
1873 certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1874 }
1875 let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
1876 let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
1878 for _ in 0..committee_size {
1880 certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1881 }
1882 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1884 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1886 let gateway = Gateway::new(
1888 account.clone(),
1889 storage.clone(),
1890 ledger.clone(),
1891 dev.ip(),
1892 &[],
1893 false,
1894 StorageMode::new_test(None),
1895 dev.port(),
1896 )
1897 .unwrap();
1898 for certificate in certificates.iter() {
1900 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1901 }
1902 for i in 0..certificates.clone().len() {
1904 let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
1905 if i < committee_size {
1906 assert!(is_authorized);
1907 } else {
1908 assert!(!is_authorized);
1909 }
1910 }
1911 }
1912}