1#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19 CONTEXT,
20 MAX_BATCH_DELAY_IN_MS,
21 MEMORY_POOL_PORT,
22 Worker,
23 events::{EventCodec, PrimaryPing},
24 helpers::{Cache, PrimarySender, Resolver, Storage, SyncSender, WorkerSender, assign_to_worker},
25 spawn_blocking,
26};
27use snarkos_account::Account;
28use snarkos_node_bft_events::{
29 BlockRequest,
30 BlockResponse,
31 CertificateRequest,
32 CertificateResponse,
33 ChallengeRequest,
34 ChallengeResponse,
35 DataBlocks,
36 DisconnectReason,
37 Event,
38 EventTrait,
39 TransmissionRequest,
40 TransmissionResponse,
41 ValidatorsRequest,
42 ValidatorsResponse,
43};
44use snarkos_node_bft_ledger_service::LedgerService;
45use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
46use snarkos_node_tcp::{
47 Config,
48 Connection,
49 ConnectionSide,
50 P2P,
51 Tcp,
52 is_bogon_ip,
53 is_unspecified_or_broadcast_ip,
54 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
55};
56use snarkvm::{
57 console::prelude::*,
58 ledger::{
59 committee::Committee,
60 narwhal::{BatchHeader, Data},
61 },
62 prelude::{Address, Field},
63};
64
65use colored::Colorize;
66use futures::SinkExt;
67use indexmap::{IndexMap, IndexSet};
68#[cfg(feature = "locktick")]
69use locktick::parking_lot::{Mutex, RwLock};
70#[cfg(not(feature = "locktick"))]
71use parking_lot::{Mutex, RwLock};
72use rand::seq::{IteratorRandom, SliceRandom};
73#[cfg(not(any(test)))]
74use std::net::IpAddr;
75use std::{
76 collections::HashSet,
77 future::Future,
78 io,
79 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
80 sync::Arc,
81 time::Duration,
82};
83use tokio::{
84 net::TcpStream,
85 sync::{OnceCell, oneshot},
86 task::{self, JoinHandle},
87};
88use tokio_stream::StreamExt;
89use tokio_util::codec::Framed;
90
91const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; const MAX_CONNECTION_ATTEMPTS: usize = 10;
98const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; const MIN_CONNECTED_VALIDATORS: usize = 175;
103const MAX_VALIDATORS_TO_SEND: usize = 200;
105
106#[cfg(not(any(test)))]
108const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
109const IP_BAN_TIME_IN_SECS: u64 = 300;
111
112#[async_trait]
115pub trait Transport<N: Network>: Send + Sync {
116 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
117 fn broadcast(&self, event: Event<N>);
118}
119
120#[derive(Clone)]
123pub struct Gateway<N: Network> {
124 account: Account<N>,
126 storage: Storage<N>,
128 ledger: Arc<dyn LedgerService<N>>,
130 tcp: Tcp,
132 cache: Arc<Cache<N>>,
134 resolver: Arc<Resolver<N>>,
136 trusted_validators: IndexSet<SocketAddr>,
138 connected_peers: Arc<RwLock<IndexSet<SocketAddr>>>,
140 connecting_peers: Arc<Mutex<IndexSet<SocketAddr>>>,
145 #[cfg(feature = "telemetry")]
147 validator_telemetry: Telemetry<N>,
148 primary_sender: Arc<OnceCell<PrimarySender<N>>>,
150 worker_senders: Arc<OnceCell<IndexMap<u8, WorkerSender<N>>>>,
152 sync_sender: Arc<OnceCell<SyncSender<N>>>,
154 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
156 dev: Option<u16>,
158}
159
160impl<N: Network> Gateway<N> {
161 pub fn new(
163 account: Account<N>,
164 storage: Storage<N>,
165 ledger: Arc<dyn LedgerService<N>>,
166 ip: Option<SocketAddr>,
167 trusted_validators: &[SocketAddr],
168 dev: Option<u16>,
169 ) -> Result<Self> {
170 let ip = match (ip, dev) {
172 (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
173 (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
174 (Some(ip), _) => ip,
175 };
176 let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));
178
179 Ok(Self {
181 account,
182 storage,
183 ledger,
184 tcp,
185 cache: Default::default(),
186 resolver: Default::default(),
187 trusted_validators: trusted_validators.iter().copied().collect(),
188 connected_peers: Default::default(),
189 connecting_peers: Default::default(),
190 #[cfg(feature = "telemetry")]
191 validator_telemetry: Default::default(),
192 primary_sender: Default::default(),
193 worker_senders: Default::default(),
194 sync_sender: Default::default(),
195 handles: Default::default(),
196 dev,
197 })
198 }
199
200 pub async fn run(
202 &self,
203 primary_sender: PrimarySender<N>,
204 worker_senders: IndexMap<u8, WorkerSender<N>>,
205 sync_sender: Option<SyncSender<N>>,
206 ) {
207 debug!("Starting the gateway for the memory pool...");
208
209 self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");
211
212 self.worker_senders.set(worker_senders).expect("The worker senders are already set");
214
215 if let Some(sync_sender) = sync_sender {
217 self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
218 }
219
220 self.enable_handshake().await;
222 self.enable_reading().await;
223 self.enable_writing().await;
224 self.enable_disconnect().await;
225 self.enable_on_connect().await;
226
227 let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
229 debug!("Listening for validator connections at address {listen_addr:?}");
230
231 self.initialize_heartbeat();
233
234 info!("Started the gateway for the memory pool at '{}'", self.local_ip());
235 }
236}
237
238impl<N: Network> Gateway<N> {
240 fn max_committee_size(&self) -> usize {
242 self.ledger.current_committee().map_or_else(
243 |_e| Committee::<N>::max_committee_size().unwrap() as usize,
244 |committee| committee.num_members(),
245 )
246 }
247
248 fn max_cache_events(&self) -> usize {
250 self.max_cache_transmissions()
251 }
252
253 fn max_cache_certificates(&self) -> usize {
255 2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
256 }
257
258 fn max_cache_transmissions(&self) -> usize {
260 self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
261 }
262
263 fn max_cache_duplicates(&self) -> usize {
265 self.max_committee_size().pow(2)
266 }
267}
268
269#[async_trait]
270impl<N: Network> CommunicationService for Gateway<N> {
271 type Message = Event<N>;
273
274 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
276 debug_assert!(start_height < end_height, "Invalid block request format");
277 Event::BlockRequest(BlockRequest { start_height, end_height })
278 }
279
280 async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
286 Transport::send(self, peer_ip, message).await
287 }
288
289 fn ban_peer(&self, peer_ip: SocketAddr) {
290 trace!("Banning peer {peer_ip} for timing out on block requests");
291
292 let tcp = self.tcp().clone();
293 tcp.banned_peers().update_ip_ban(peer_ip.ip());
294
295 tokio::spawn(async move {
296 tcp.disconnect(peer_ip).await;
297 });
298 }
299}
300
301impl<N: Network> Gateway<N> {
302 pub const fn account(&self) -> &Account<N> {
304 &self.account
305 }
306
307 pub const fn dev(&self) -> Option<u16> {
309 self.dev
310 }
311
312 pub fn local_ip(&self) -> SocketAddr {
314 self.tcp.listening_addr().expect("The TCP listener is not enabled")
315 }
316
317 pub fn is_local_ip(&self, ip: SocketAddr) -> bool {
319 ip == self.local_ip()
320 || (ip.ip().is_unspecified() || ip.ip().is_loopback()) && ip.port() == self.local_ip().port()
321 }
322
323 pub fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
325 !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
326 }
327
328 pub fn resolver(&self) -> &Resolver<N> {
330 &self.resolver
331 }
332
333 #[cfg(feature = "telemetry")]
335 pub fn validator_telemetry(&self) -> &Telemetry<N> {
336 &self.validator_telemetry
337 }
338
339 pub fn primary_sender(&self) -> &PrimarySender<N> {
341 self.primary_sender.get().expect("Primary sender not set in gateway")
342 }
343
344 pub fn num_workers(&self) -> u8 {
346 u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
347 .expect("Too many workers")
348 }
349
350 pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
352 self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
353 }
354
355 pub fn is_connected_address(&self, address: Address<N>) -> bool {
357 match self.resolver.get_peer_ip_for_address(address) {
359 Some(peer_ip) => self.is_connected_ip(peer_ip),
361 None => false,
362 }
363 }
364
365 pub fn is_connected_ip(&self, ip: SocketAddr) -> bool {
367 self.connected_peers.read().contains(&ip)
368 }
369
370 pub fn is_connecting_ip(&self, ip: SocketAddr) -> bool {
372 self.connecting_peers.lock().contains(&ip)
373 }
374
375 pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
377 if self.trusted_validators.contains(&ip) {
379 return true;
380 }
381 match self.resolver.get_address(ip) {
383 Some(address) => self.is_authorized_validator_address(address),
385 None => false,
386 }
387 }
388
389 pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
391 if self
400 .ledger
401 .get_committee_lookback_for_round(self.storage.current_round())
402 .is_ok_and(|committee| committee.is_committee_member(validator_address))
403 {
404 return true;
405 }
406
407 if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
409 return true;
410 }
411
412 let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
414 match self.ledger.get_block_round(previous_block_height) {
416 Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
417 self.ledger
418 .get_committee_lookback_for_round(round)
419 .is_ok_and(|committee| committee.is_committee_member(validator_address))
420 }),
421 Err(_) => false,
422 }
423 }
424
425 pub fn max_connected_peers(&self) -> usize {
427 self.tcp.config().max_connections as usize
428 }
429
430 pub fn number_of_connected_peers(&self) -> usize {
432 self.connected_peers.read().len()
433 }
434
435 pub fn connected_addresses(&self) -> HashSet<Address<N>> {
437 self.connected_peers.read().iter().filter_map(|peer_ip| self.resolver.get_address(*peer_ip)).collect()
438 }
439
440 pub fn connected_peers(&self) -> &RwLock<IndexSet<SocketAddr>> {
442 &self.connected_peers
443 }
444
445 pub fn connect(&self, peer_ip: SocketAddr) -> Option<JoinHandle<()>> {
447 if let Err(forbidden_error) = self.check_connection_attempt(peer_ip) {
449 warn!("{forbidden_error}");
450 return None;
451 }
452
453 let self_ = self.clone();
454 Some(tokio::spawn(async move {
455 debug!("Connecting to validator {peer_ip}...");
456 if let Err(error) = self_.tcp.connect(peer_ip).await {
458 self_.connecting_peers.lock().shift_remove(&peer_ip);
459 warn!("Unable to connect to '{peer_ip}' - {error}");
460 }
461 }))
462 }
463
464 fn check_connection_attempt(&self, peer_ip: SocketAddr) -> Result<()> {
466 if self.is_local_ip(peer_ip) {
468 bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (attempted to self-connect)")
469 }
470 if self.number_of_connected_peers() >= self.max_connected_peers() {
472 bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (maximum peers reached)")
473 }
474 if self.is_connected_ip(peer_ip) {
476 bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (already connected)")
477 }
478 if self.is_connecting_ip(peer_ip) {
480 bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (already connecting)")
481 }
482 Ok(())
483 }
484
485 fn ensure_peer_is_allowed(&self, peer_ip: SocketAddr) -> Result<()> {
487 if self.is_local_ip(peer_ip) {
489 bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (attempted to self-connect)")
490 }
491 if !self.connecting_peers.lock().insert(peer_ip) {
493 bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (already shaking hands as the initiator)")
494 }
495 if self.is_connected_ip(peer_ip) {
497 bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (already connected)")
498 }
499 if !peer_ip.ip().is_loopback() {
501 let num_attempts = self.cache.insert_inbound_connection(peer_ip.ip(), RESTRICTED_INTERVAL);
503 if num_attempts > MAX_CONNECTION_ATTEMPTS {
505 bail!("Dropping connection request from '{peer_ip}' (tried {num_attempts} times)")
506 }
507 }
508 Ok(())
509 }
510
511 #[cfg(not(any(test)))]
513 fn is_ip_banned(&self, ip: IpAddr) -> bool {
514 self.tcp.banned_peers().is_ip_banned(&ip)
515 }
516
517 #[cfg(not(any(test)))]
519 fn update_ip_ban(&self, ip: IpAddr) {
520 self.tcp.banned_peers().update_ip_ban(ip);
521 }
522
523 #[cfg(feature = "metrics")]
524 fn update_metrics(&self) {
525 metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64);
526 metrics::gauge(metrics::bft::CONNECTING, self.connecting_peers.lock().len() as f64);
527 }
528
529 #[cfg(not(test))]
531 fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
532 self.resolver.insert_peer(peer_ip, peer_addr, address);
534 self.connected_peers.write().insert(peer_ip);
536 #[cfg(feature = "metrics")]
537 self.update_metrics();
538 }
539
540 #[cfg(test)]
542 pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
543 self.resolver.insert_peer(peer_ip, peer_addr, address);
545 self.connected_peers.write().insert(peer_ip);
547 }
548
549 fn remove_connected_peer(&self, peer_ip: SocketAddr) {
551 if let Some(sync_sender) = self.sync_sender.get() {
553 let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
554 tokio::spawn(async move {
555 if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
556 warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
557 }
558 });
559 }
560 self.resolver.remove_peer(peer_ip);
562 self.connected_peers.write().shift_remove(&peer_ip);
564 #[cfg(feature = "metrics")]
565 self.update_metrics();
566 }
567
568 fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
574 let Some(peer_addr) = self.resolver.get_ambiguous(peer_ip) else {
576 warn!("Unable to resolve the listener IP address '{peer_ip}'");
577 return None;
578 };
579 let name = event.name();
581 trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
583 let result = self.unicast(peer_addr, event);
584 if let Err(e) = &result {
586 warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {e}");
587 debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
588 self.disconnect(peer_ip);
589 }
590 result.ok()
591 }
592
593 async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<()> {
595 let Some(peer_ip) = self.resolver.get_listener(peer_addr) else {
597 bail!("{CONTEXT} Unable to resolve the (ambiguous) peer address '{peer_addr}'")
598 };
599 if !self.is_authorized_validator_ip(peer_ip) {
601 bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
602 }
603 let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
605 if num_events >= self.max_cache_events() {
606 bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
607 }
608 match event {
610 Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
611 let certificate_id = match &event {
613 Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
614 Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
615 _ => unreachable!(),
616 };
617 let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
619 if num_events >= self.max_cache_duplicates() {
620 return Ok(());
621 }
622 }
623 Event::TransmissionRequest(TransmissionRequest { transmission_id })
624 | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
625 let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
627 if num_events >= self.max_cache_duplicates() {
628 return Ok(());
629 }
630 }
631 Event::BlockRequest(_) => {
632 let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
633 if num_events >= self.max_cache_duplicates() {
634 return Ok(());
635 }
636 }
637 _ => {}
638 }
639 trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());
640
641 match event {
644 Event::BatchPropose(batch_propose) => {
645 let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
647 Ok(())
648 }
649 Event::BatchSignature(batch_signature) => {
650 let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
652 Ok(())
653 }
654 Event::BatchCertified(batch_certified) => {
655 let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
657 Ok(())
658 }
659 Event::BlockRequest(block_request) => {
660 let BlockRequest { start_height, end_height } = block_request;
661
662 if start_height >= end_height {
664 bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
665 }
666 if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
668 bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
669 }
670
671 let self_ = self.clone();
672 let blocks = match task::spawn_blocking(move || {
673 match self_.ledger.get_blocks(start_height..end_height) {
675 Ok(blocks) => Ok(Data::Object(DataBlocks(blocks))),
676 Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
677 }
678 })
679 .await
680 {
681 Ok(Ok(blocks)) => blocks,
682 Ok(Err(error)) => return Err(error),
683 Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
684 };
685
686 let self_ = self.clone();
687 tokio::spawn(async move {
688 let event = Event::BlockResponse(BlockResponse { request: block_request, blocks });
690 Transport::send(&self_, peer_ip, event).await;
691 });
692 Ok(())
693 }
694 Event::BlockResponse(block_response) => {
695 if let Some(sync_sender) = self.sync_sender.get() {
697 let BlockResponse { request, blocks } = block_response;
699
700 if !self.cache.remove_outbound_block_request(peer_ip, &request) {
702 bail!("Unsolicited block response from '{peer_ip}'")
703 }
704
705 let (send, recv) = tokio::sync::oneshot::channel();
709 rayon::spawn_fifo(move || {
710 let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
711 let _ = send.send(blocks);
712 });
713 let blocks = match recv.await {
714 Ok(Ok(blocks)) => blocks,
715 Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
716 Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
717 };
718
719 blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
721 if let Err(e) = sync_sender.advance_with_sync_blocks(peer_ip, blocks.0).await {
723 warn!("Unable to process block response from '{peer_ip}' - {e}");
724 }
725 }
726 Ok(())
727 }
728 Event::CertificateRequest(certificate_request) => {
729 if let Some(sync_sender) = self.sync_sender.get() {
732 let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
734 }
735 Ok(())
736 }
737 Event::CertificateResponse(certificate_response) => {
738 if let Some(sync_sender) = self.sync_sender.get() {
741 let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
743 }
744 Ok(())
745 }
746 Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
747 bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
749 }
750 Event::Disconnect(disconnect) => {
751 bail!("{CONTEXT} {:?}", disconnect.reason)
752 }
753 Event::PrimaryPing(ping) => {
754 let PrimaryPing { version, block_locators, primary_certificate } = ping;
755
756 if version < Event::<N>::VERSION {
758 bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
759 }
760
761 if let Some(sync_sender) = self.sync_sender.get() {
763 if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
765 bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
766 }
767 }
768
769 let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
771 Ok(())
772 }
773 Event::TransmissionRequest(request) => {
774 let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
777 warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
778 return Ok(());
779 };
780 if let Some(sender) = self.get_worker_sender(worker_id) {
782 let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
784 }
785 Ok(())
786 }
787 Event::TransmissionResponse(response) => {
788 let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
790 warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
791 return Ok(());
792 };
793 if let Some(sender) = self.get_worker_sender(worker_id) {
795 let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
797 }
798 Ok(())
799 }
800 Event::ValidatorsRequest(_) => {
801 let mut connected_peers: Vec<_> = match self.dev.is_some() {
803 true => self.connected_peers.read().iter().copied().collect(),
805 false => {
807 self.connected_peers.read().iter().copied().filter(|ip| self.is_valid_peer_ip(*ip)).collect()
808 }
809 };
810 connected_peers.shuffle(&mut rand::thread_rng());
812
813 let self_ = self.clone();
814 tokio::spawn(async move {
815 let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
817 for validator_ip in connected_peers.into_iter().take(MAX_VALIDATORS_TO_SEND) {
819 if let Some(validator_address) = self_.resolver.get_address(validator_ip) {
821 validators.insert(validator_ip, validator_address);
823 }
824 }
825 let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
827 Transport::send(&self_, peer_ip, event).await;
828 });
829 Ok(())
830 }
831 Event::ValidatorsResponse(response) => {
832 let ValidatorsResponse { validators } = response;
833 ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
835 if !self.cache.contains_outbound_validators_request(peer_ip) {
837 bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
838 }
839 self.cache.decrement_outbound_validators_requests(peer_ip);
841
842 if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS {
844 let self_ = self.clone();
846 tokio::spawn(async move {
847 for (validator_ip, validator_address) in validators {
848 if self_.dev.is_some() {
849 if self_.is_local_ip(validator_ip) {
851 continue;
852 }
853 } else {
854 if !self_.is_valid_peer_ip(validator_ip) {
856 continue;
857 }
858 }
859
860 if self_.account.address() == validator_address {
862 continue;
863 }
864 if self_.is_connected_ip(validator_ip) || self_.is_connecting_ip(validator_ip) {
866 continue;
867 }
868 if self_.is_connected_address(validator_address) {
870 continue;
871 }
872 if !self_.is_authorized_validator_address(validator_address) {
874 continue;
875 }
876 self_.connect(validator_ip);
878 }
879 });
880 }
881 Ok(())
882 }
883 Event::WorkerPing(ping) => {
884 ensure!(
886 ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
887 "{CONTEXT} Received too many transmissions"
888 );
889 let num_workers = self.num_workers();
891 for transmission_id in ping.transmission_ids.into_iter() {
893 let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
895 warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
896 continue;
897 };
898 if let Some(sender) = self.get_worker_sender(worker_id) {
900 let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
902 }
903 }
904 Ok(())
905 }
906 }
907 }
908
909 pub fn disconnect(&self, peer_ip: SocketAddr) -> JoinHandle<()> {
911 let gateway = self.clone();
912 tokio::spawn(async move {
913 if let Some(peer_addr) = gateway.resolver.get_ambiguous(peer_ip) {
914 let _disconnected = gateway.tcp.disconnect(peer_addr).await;
916 debug_assert!(_disconnected);
917 }
918 })
919 }
920
921 fn initialize_heartbeat(&self) {
923 let self_clone = self.clone();
924 self.spawn(async move {
925 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
927 info!("Starting the heartbeat of the gateway...");
928 loop {
929 self_clone.heartbeat();
931 tokio::time::sleep(Duration::from_secs(15)).await;
933 }
934 });
935 }
936
937 #[allow(dead_code)]
939 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
940 self.handles.lock().push(tokio::spawn(future));
941 }
942
943 pub async fn shut_down(&self) {
945 info!("Shutting down the gateway...");
946 self.handles.lock().iter().for_each(|handle| handle.abort());
948 self.tcp.shut_down().await;
950 }
951}
952
953impl<N: Network> Gateway<N> {
954 fn heartbeat(&self) {
956 self.log_connected_validators();
958 #[cfg(feature = "telemetry")]
960 self.log_participation_scores();
961 self.handle_trusted_validators();
963 self.handle_unauthorized_validators();
965 self.handle_min_connected_validators();
967 self.handle_banned_ips();
969 }
970
971 fn log_connected_validators(&self) {
973 let connected_validators = self.connected_peers().read().clone();
975 let validators_total = self.ledger.current_committee().map_or(0, |c| c.num_members().saturating_sub(1));
977 let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
979 let connections_msg = match connected_validators.len() {
981 0 => "No connected validators".to_string(),
982 num_connected => format!("Connected to {num_connected} validators {total_validators}"),
983 };
984 let mut connected_validator_addresses = IndexSet::with_capacity(connected_validators.len());
986 connected_validator_addresses.insert(self.account.address());
987 info!("{connections_msg}");
989 for peer_ip in &connected_validators {
990 let address = self.resolver.get_address(*peer_ip).map_or("Unknown".to_string(), |a| {
991 connected_validator_addresses.insert(a);
992 a.to_string()
993 });
994 debug!("{}", format!(" {peer_ip} - {address}").dimmed());
995 }
996
997 let num_not_connected = validators_total.saturating_sub(connected_validators.len());
999 if num_not_connected > 0 {
1000 info!("Not connected to {num_not_connected} validators {total_validators}");
1001 let committee_members: IndexSet<_> =
1003 self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
1004
1005 for address in committee_members.difference(&connected_validator_addresses) {
1007 debug!("{}", format!(" Not connected to {address}").dimmed());
1008 }
1009 }
1010 }
1011
1012 #[cfg(feature = "telemetry")]
1014 fn log_participation_scores(&self) {
1015 if let Ok(current_committee) = self.ledger.current_committee() {
1016 let participation_scores = self.validator_telemetry().get_participation_scores(¤t_committee);
1018 debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
1020 for (address, score) in participation_scores {
1021 debug!("{}", format!(" {address} - {score:.2}%").dimmed());
1022 }
1023 }
1024 }
1025
1026 fn handle_trusted_validators(&self) {
1028 for validator_ip in &self.trusted_validators {
1030 if !self.is_local_ip(*validator_ip)
1032 && !self.is_connecting_ip(*validator_ip)
1033 && !self.is_connected_ip(*validator_ip)
1034 {
1035 self.connect(*validator_ip);
1037 }
1038 }
1039 }
1040
1041 fn handle_unauthorized_validators(&self) {
1043 let self_ = self.clone();
1044 tokio::spawn(async move {
1045 let validators = self_.connected_peers().read().clone();
1047 for peer_ip in validators {
1049 if !self_.is_authorized_validator_ip(peer_ip) {
1051 warn!("{CONTEXT} Disconnecting from '{peer_ip}' - Validator is not in the current committee");
1052 Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1053 self_.disconnect(peer_ip);
1055 }
1056 }
1057 });
1058 }
1059
1060 fn handle_min_connected_validators(&self) {
1063 if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS {
1065 let validators = self.connected_peers().read().clone();
1067 if validators.is_empty() {
1069 return;
1070 }
1071 if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
1073 let self_ = self.clone();
1074 tokio::spawn(async move {
1075 self_.cache.increment_outbound_validators_requests(validator_ip);
1077 let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1079 });
1080 }
1081 }
1082 }
1083
1084 async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1086 if let Err(error) = self.inbound(peer_addr, message).await {
1088 if let Some(peer_ip) = self.resolver.get_listener(peer_addr) {
1089 warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1090 let self_ = self.clone();
1091 tokio::spawn(async move {
1092 Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1093 self_.disconnect(peer_ip);
1095 });
1096 }
1097 }
1098 }
1099
1100 fn handle_banned_ips(&self) {
1102 self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
1103 }
1104}
1105
1106#[async_trait]
1107impl<N: Network> Transport<N> for Gateway<N> {
1108 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
1116 macro_rules! send {
1117 ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
1118 while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
1120 tokio::time::sleep(Duration::from_millis(10)).await;
1122 }
1123 $self.send_inner(peer_ip, event)
1125 }};
1126 }
1127
1128 match event {
1130 Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
1131 self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1133 send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
1135 }
1136 Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
1137 self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1139 send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
1141 }
1142 Event::BlockRequest(request) => {
1143 self.cache.insert_outbound_block_request(peer_ip, request);
1145 send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1147 }
1148 _ => {
1149 send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1151 }
1152 }
1153 }
1154
1155 fn broadcast(&self, event: Event<N>) {
1159 if self.number_of_connected_peers() > 0 {
1161 let self_ = self.clone();
1162 let connected_peers = self.connected_peers.read().clone();
1163 tokio::spawn(async move {
1164 for peer_ip in connected_peers {
1166 let _ = Transport::send(&self_, peer_ip, event.clone()).await;
1168 }
1169 });
1170 }
1171 }
1172}
1173
1174impl<N: Network> P2P for Gateway<N> {
1175 fn tcp(&self) -> &Tcp {
1177 &self.tcp
1178 }
1179}
1180
1181#[async_trait]
1182impl<N: Network> Reading for Gateway<N> {
1183 type Codec = EventCodec<N>;
1184 type Message = Event<N>;
1185
1186 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1189 Default::default()
1190 }
1191
1192 async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1194 if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1195 let self_ = self.clone();
1196 tokio::spawn(async move {
1199 self_.process_message_inner(peer_addr, message).await;
1200 });
1201 } else {
1202 self.process_message_inner(peer_addr, message).await;
1203 }
1204 Ok(())
1205 }
1206
1207 fn message_queue_depth(&self) -> usize {
1210 2 * BatchHeader::<N>::MAX_GC_ROUNDS
1211 * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1212 * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1213 }
1214}
1215
1216#[async_trait]
1217impl<N: Network> Writing for Gateway<N> {
1218 type Codec = EventCodec<N>;
1219 type Message = Event<N>;
1220
1221 fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1224 Default::default()
1225 }
1226
1227 fn message_queue_depth(&self) -> usize {
1231 2 * BatchHeader::<N>::MAX_GC_ROUNDS
1232 * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1233 * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1234 }
1235}
1236
1237#[async_trait]
1238impl<N: Network> Disconnect for Gateway<N> {
1239 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1241 if let Some(peer_ip) = self.resolver.get_listener(peer_addr) {
1242 self.remove_connected_peer(peer_ip);
1243
1244 self.cache.clear_outbound_validators_requests(peer_ip);
1248 self.cache.clear_outbound_block_requests(peer_ip);
1249 }
1250 }
1251}
1252
1253#[async_trait]
1254impl<N: Network> OnConnect for Gateway<N> {
1255 async fn on_connect(&self, _peer_addr: SocketAddr) {
1256 return;
1257 }
1258}
1259
1260#[async_trait]
1261impl<N: Network> Handshake for Gateway<N> {
1262 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
1264 let peer_addr = connection.addr();
1266 let peer_side = connection.side();
1267
1268 #[cfg(not(any(test)))]
1270 if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1271 if self.is_ip_banned(peer_addr.ip()) {
1273 trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip());
1274 return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
1275 }
1276
1277 let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1278
1279 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1280 if num_attempts > MAX_CONNECTION_ATTEMPTS {
1281 self.update_ip_ban(peer_addr.ip());
1282 trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1283 return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
1284 }
1285 }
1286
1287 let stream = self.borrow_stream(&mut connection);
1288
1289 let mut peer_ip = if peer_side == ConnectionSide::Initiator {
1292 debug!("{CONTEXT} Gateway received a connection request from '{peer_addr}'");
1293 None
1294 } else {
1295 debug!("{CONTEXT} Gateway is connecting to {peer_addr}...");
1296 Some(peer_addr)
1297 };
1298
1299 let restrictions_id = self.ledger.latest_restrictions_id();
1301
1302 let handshake_result = if peer_side == ConnectionSide::Responder {
1304 self.handshake_inner_initiator(peer_addr, peer_ip, restrictions_id, stream).await
1305 } else {
1306 self.handshake_inner_responder(peer_addr, &mut peer_ip, restrictions_id, stream).await
1307 };
1308
1309 if let Some(ip) = peer_ip {
1311 self.connecting_peers.lock().shift_remove(&ip);
1312 }
1313 let (ref peer_ip, _) = handshake_result?;
1314 info!("{CONTEXT} Gateway is connected to '{peer_ip}'");
1315
1316 Ok(connection)
1317 }
1318}
1319
1320macro_rules! expect_event {
1322 ($event_ty:path, $framed:expr, $peer_addr:expr) => {
1323 match $framed.try_next().await? {
1324 Some($event_ty(data)) => {
1326 trace!("{CONTEXT} Gateway received '{}' from '{}'", data.name(), $peer_addr);
1327 data
1328 }
1329 Some(Event::Disconnect(reason)) => {
1331 return Err(error(format!("{CONTEXT} '{}' disconnected: {reason:?}", $peer_addr)));
1332 }
1333 Some(ty) => {
1335 return Err(error(format!(
1336 "{CONTEXT} '{}' did not follow the handshake protocol: received {:?} instead of {}",
1337 $peer_addr,
1338 ty.name(),
1339 stringify!($event_ty),
1340 )))
1341 }
1342 None => {
1344 return Err(error(format!(
1345 "{CONTEXT} '{}' disconnected before sending {:?}",
1346 $peer_addr,
1347 stringify!($event_ty)
1348 )))
1349 }
1350 }
1351 };
1352}
1353
1354async fn send_event<N: Network>(
1356 framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
1357 peer_addr: SocketAddr,
1358 event: Event<N>,
1359) -> io::Result<()> {
1360 trace!("{CONTEXT} Gateway is sending '{}' to '{peer_addr}'", event.name());
1361 framed.send(event).await
1362}
1363
1364impl<N: Network> Gateway<N> {
1365 async fn handshake_inner_initiator<'a>(
1367 &'a self,
1368 peer_addr: SocketAddr,
1369 peer_ip: Option<SocketAddr>,
1370 restrictions_id: Field<N>,
1371 stream: &'a mut TcpStream,
1372 ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, EventCodec<N>>)> {
1373 let peer_ip = peer_ip.unwrap();
1375
1376 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1378
1379 let rng = &mut rand::rngs::OsRng;
1381
1382 let our_nonce = rng.r#gen();
1386 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce);
1388 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1389
1390 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1394 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1396
1397 if let Some(reason) = self
1399 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1400 .await
1401 {
1402 send_event(&mut framed, peer_addr, reason.into()).await?;
1403 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1404 }
1405 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1407 send_event(&mut framed, peer_addr, reason.into()).await?;
1408 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1409 }
1410
1411 let response_nonce: u64 = rng.r#gen();
1415 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1416 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1417 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1418 };
1419 let our_response =
1421 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1422 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1423
1424 self.insert_connected_peer(peer_ip, peer_addr, peer_request.address);
1426
1427 Ok((peer_ip, framed))
1428 }
1429
1430 async fn handshake_inner_responder<'a>(
1432 &'a self,
1433 peer_addr: SocketAddr,
1434 peer_ip: &mut Option<SocketAddr>,
1435 restrictions_id: Field<N>,
1436 stream: &'a mut TcpStream,
1437 ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, EventCodec<N>>)> {
1438 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1440
1441 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1445
1446 if self.account.address() == peer_request.address {
1448 return Err(error("Skipping request to connect to self".to_string()));
1449 }
1450
1451 *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1453 let peer_ip = peer_ip.unwrap();
1454
1455 if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) {
1457 return Err(error(format!("{forbidden_message}")));
1458 }
1459 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1461 send_event(&mut framed, peer_addr, reason.into()).await?;
1462 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1463 }
1464
1465 let rng = &mut rand::rngs::OsRng;
1469
1470 let response_nonce: u64 = rng.r#gen();
1472 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1473 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1474 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1475 };
1476 let our_response =
1478 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1479 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1480
1481 let our_nonce = rng.r#gen();
1483 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce);
1485 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1486
1487 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1491 if let Some(reason) = self
1493 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1494 .await
1495 {
1496 send_event(&mut framed, peer_addr, reason.into()).await?;
1497 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1498 }
1499 self.insert_connected_peer(peer_ip, peer_addr, peer_request.address);
1501
1502 Ok((peer_ip, framed))
1503 }
1504
1505 fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1507 let &ChallengeRequest { version, listener_port: _, address, nonce: _ } = event;
1509 if version < Event::<N>::VERSION {
1511 warn!("{CONTEXT} Gateway is dropping '{peer_addr}' on version {version} (outdated)");
1512 return Some(DisconnectReason::OutdatedClientVersion);
1513 }
1514 if !self.is_authorized_validator_address(address) {
1516 warn!("{CONTEXT} Gateway is dropping '{peer_addr}' for being an unauthorized validator ({address})");
1517 return Some(DisconnectReason::ProtocolViolation);
1518 }
1519 if self.is_connected_address(address) {
1521 warn!("{CONTEXT} Gateway is dropping '{peer_addr}' for being already connected ({address})");
1522 return Some(DisconnectReason::ProtocolViolation);
1523 }
1524 None
1525 }
1526
1527 async fn verify_challenge_response(
1529 &self,
1530 peer_addr: SocketAddr,
1531 peer_address: Address<N>,
1532 response: ChallengeResponse<N>,
1533 expected_restrictions_id: Field<N>,
1534 expected_nonce: u64,
1535 ) -> Option<DisconnectReason> {
1536 let ChallengeResponse { restrictions_id, signature, nonce } = response;
1538
1539 if restrictions_id != expected_restrictions_id {
1541 warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1542 return Some(DisconnectReason::InvalidChallengeResponse);
1543 }
1544 let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1546 warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1547 return Some(DisconnectReason::InvalidChallengeResponse);
1548 };
1549 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1551 warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (invalid signature)");
1552 return Some(DisconnectReason::InvalidChallengeResponse);
1553 }
1554 None
1555 }
1556}
1557
1558#[cfg(test)]
1559mod prop_tests {
1560 use crate::{
1561 Gateway,
1562 MAX_WORKERS,
1563 MEMORY_POOL_PORT,
1564 Worker,
1565 gateway::prop_tests::GatewayAddress::{Dev, Prod},
1566 helpers::{Storage, init_primary_channels, init_worker_channels},
1567 };
1568 use snarkos_account::Account;
1569 use snarkos_node_bft_ledger_service::MockLedgerService;
1570 use snarkos_node_bft_storage_service::BFTMemoryService;
1571 use snarkos_node_tcp::P2P;
1572 use snarkvm::{
1573 ledger::{
1574 committee::{
1575 Committee,
1576 prop_tests::{CommitteeContext, ValidatorSet},
1577 test_helpers::sample_committee_for_round_and_members,
1578 },
1579 narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
1580 },
1581 prelude::{MainnetV0, PrivateKey},
1582 utilities::TestRng,
1583 };
1584
1585 use indexmap::{IndexMap, IndexSet};
1586 use proptest::{
1587 prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
1588 sample::Selector,
1589 };
1590 use std::{
1591 fmt::{Debug, Formatter},
1592 net::{IpAddr, Ipv4Addr, SocketAddr},
1593 sync::Arc,
1594 };
1595 use test_strategy::proptest;
1596
1597 type CurrentNetwork = MainnetV0;
1598
1599 impl Debug for Gateway<CurrentNetwork> {
1600 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1601 f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
1603 }
1604 }
1605
1606 #[derive(Debug, test_strategy::Arbitrary)]
1607 enum GatewayAddress {
1608 Dev(u8),
1609 Prod(Option<SocketAddr>),
1610 }
1611
1612 impl GatewayAddress {
1613 fn ip(&self) -> Option<SocketAddr> {
1614 if let GatewayAddress::Prod(ip) = self {
1615 return *ip;
1616 }
1617 None
1618 }
1619
1620 fn port(&self) -> Option<u16> {
1621 if let GatewayAddress::Dev(port) = self {
1622 return Some(*port as u16);
1623 }
1624 None
1625 }
1626 }
1627
1628 impl Arbitrary for Gateway<CurrentNetwork> {
1629 type Parameters = ();
1630 type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;
1631
1632 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1633 any_valid_dev_gateway()
1634 .prop_map(|(storage, _, private_key, address)| {
1635 Gateway::new(
1636 Account::try_from(private_key).unwrap(),
1637 storage.clone(),
1638 storage.ledger().clone(),
1639 address.ip(),
1640 &[],
1641 address.port(),
1642 )
1643 .unwrap()
1644 })
1645 .boxed()
1646 }
1647 }
1648
1649 type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);
1650
1651 fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
1652 (any::<CommitteeContext>(), any::<Selector>())
1653 .prop_flat_map(|(context, account_selector)| {
1654 let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1655 (
1656 any_with::<Storage<CurrentNetwork>>(context.clone()),
1657 Just(context),
1658 Just(account_selector.select(validators)),
1659 0u8..,
1660 )
1661 .prop_map(|(a, b, c, d)| (a, b, c.private_key, Dev(d)))
1662 })
1663 .boxed()
1664 }
1665
1666 fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
1667 (any::<CommitteeContext>(), any::<Selector>())
1668 .prop_flat_map(|(context, account_selector)| {
1669 let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1670 (
1671 any_with::<Storage<CurrentNetwork>>(context.clone()),
1672 Just(context),
1673 Just(account_selector.select(validators)),
1674 any::<Option<SocketAddr>>(),
1675 )
1676 .prop_map(|(a, b, c, d)| (a, b, c.private_key, Prod(d)))
1677 })
1678 .boxed()
1679 }
1680
1681 #[proptest]
1682 fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1683 let (storage, _, private_key, dev) = input;
1684 let account = Account::try_from(private_key).unwrap();
1685
1686 let gateway =
1687 Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
1688 .unwrap();
1689 let tcp_config = gateway.tcp().config();
1690 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
1691 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
1692
1693 let tcp_config = gateway.tcp().config();
1694 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1695 assert_eq!(gateway.account().address(), account.address());
1696 }
1697
1698 #[proptest]
1699 fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1700 let (storage, _, private_key, dev) = input;
1701 let account = Account::try_from(private_key).unwrap();
1702
1703 let gateway =
1704 Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
1705 .unwrap();
1706 let tcp_config = gateway.tcp().config();
1707 if let Some(socket_addr) = dev.ip() {
1708 assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
1709 assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
1710 } else {
1711 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
1712 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
1713 }
1714
1715 let tcp_config = gateway.tcp().config();
1716 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1717 assert_eq!(gateway.account().address(), account.address());
1718 }
1719
1720 #[proptest(async = "tokio")]
1721 async fn gateway_start(
1722 #[strategy(any_valid_dev_gateway())] input: GatewayInput,
1723 #[strategy(0..MAX_WORKERS)] workers_count: u8,
1724 ) {
1725 let (storage, committee, private_key, dev) = input;
1726 let committee = committee.0;
1727 let worker_storage = storage.clone();
1728 let account = Account::try_from(private_key).unwrap();
1729
1730 let gateway =
1731 Gateway::new(account, storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()).unwrap();
1732
1733 let (primary_sender, _) = init_primary_channels();
1734
1735 let (workers, worker_senders) = {
1736 let mut tx_workers = IndexMap::new();
1738 let mut workers = IndexMap::new();
1739
1740 for id in 0..workers_count {
1742 let (tx_worker, rx_worker) = init_worker_channels();
1744 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1746 let worker =
1747 Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
1748 .unwrap();
1749 worker.run(rx_worker);
1751
1752 workers.insert(id, worker);
1754 tx_workers.insert(id, tx_worker);
1755 }
1756 (workers, tx_workers)
1757 };
1758
1759 gateway.run(primary_sender, worker_senders, None).await;
1760 assert_eq!(
1761 gateway.local_ip(),
1762 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
1763 );
1764 assert_eq!(gateway.num_workers(), workers.len() as u8);
1765 }
1766
1767 #[proptest]
1768 fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1769 let rng = &mut TestRng::default();
1770
1771 let current_round = 2;
1773 let committee_size = 4;
1774 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1775 let (_, _, private_key, dev) = input;
1776 let account = Account::try_from(private_key).unwrap();
1777
1778 let mut certificates = IndexSet::new();
1780 for _ in 0..committee_size {
1781 certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1782 }
1783 let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
1784 let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
1786 for _ in 0..committee_size {
1788 certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1789 }
1790 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1792 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1794 let gateway =
1796 Gateway::new(account.clone(), storage.clone(), ledger.clone(), dev.ip(), &[], dev.port()).unwrap();
1797 for certificate in certificates.iter() {
1799 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1800 }
1801 for i in 0..certificates.clone().len() {
1803 let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
1804 if i < committee_size {
1805 assert!(is_authorized);
1806 } else {
1807 assert!(!is_authorized);
1808 }
1809 }
1810 }
1811}