1#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19 CONTEXT,
20 MAX_BATCH_DELAY_IN_MS,
21 MEMORY_POOL_PORT,
22 Worker,
23 events::{EventCodec, PrimaryPing},
24 helpers::{Cache, PrimarySender, Resolver, Storage, SyncSender, WorkerSender, assign_to_worker},
25 spawn_blocking,
26};
27use snarkos_account::Account;
28use snarkos_node_bft_events::{
29 BlockRequest,
30 BlockResponse,
31 CertificateRequest,
32 CertificateResponse,
33 ChallengeRequest,
34 ChallengeResponse,
35 DataBlocks,
36 DisconnectReason,
37 Event,
38 EventTrait,
39 TransmissionRequest,
40 TransmissionResponse,
41 ValidatorsRequest,
42 ValidatorsResponse,
43};
44use snarkos_node_bft_ledger_service::LedgerService;
45use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
46use snarkos_node_tcp::{
47 Config,
48 Connection,
49 ConnectionSide,
50 P2P,
51 Tcp,
52 is_bogon_ip,
53 is_unspecified_or_broadcast_ip,
54 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
55};
56use snarkvm::{
57 console::prelude::*,
58 ledger::{
59 committee::Committee,
60 narwhal::{BatchHeader, Data},
61 },
62 prelude::{Address, Field},
63};
64
65use colored::Colorize;
66use futures::SinkExt;
67use indexmap::{IndexMap, IndexSet};
68#[cfg(feature = "locktick")]
69use locktick::parking_lot::{Mutex, RwLock};
70#[cfg(not(feature = "locktick"))]
71use parking_lot::{Mutex, RwLock};
72use rand::seq::{IteratorRandom, SliceRandom};
73#[cfg(not(any(test)))]
74use std::net::IpAddr;
75use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
76use tokio::{
77 net::TcpStream,
78 sync::{OnceCell, oneshot},
79 task::{self, JoinHandle},
80};
81use tokio_stream::StreamExt;
82use tokio_util::codec::Framed;
83
/// Interval handed to the cache when counting inbound/outbound events per peer.
/// Derived from `MAX_BATCH_DELAY_IN_MS` divided by 1000 (i.e. seconds, going by the `_IN_MS` name).
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64;
/// Interval handed to the cache when counting certificate, transmission, and block requests.
/// Derived the same way as `CACHE_EVENTS_INTERVAL`.
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64;
/// Number of connection attempts tolerated from one IP within the tracking interval;
/// an attempt count above this value causes the connection to be rejected.
const MAX_CONNECTION_ATTEMPTS: usize = 10;
/// Interval over which `ensure_peer_is_allowed` counts inbound connection attempts.
const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64;
/// Connected-peer count below which the gateway requests more validators from its peers
/// and dials the validators it receives in response.
const MIN_CONNECTED_VALIDATORS: usize = 175;
/// Upper bound on the number of validators placed in, or accepted from, a `ValidatorsResponse`.
const MAX_VALIDATORS_TO_SEND: usize = 200;

/// Interval over which connection attempts are counted during the handshake.
#[cfg(not(any(test)))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
/// Value passed to `remove_old_bans` on every heartbeat.
/// NOTE(review): presumably the age in seconds after which an IP ban is lifted — confirm against the TCP crate.
const IP_BAN_TIME_IN_SECS: u64 = 300;
104
/// The event-sending surface of the gateway: unicast to one peer, or broadcast to many.
#[async_trait]
pub trait Transport<N: Network>: Send + Sync {
    /// Sends `event` to the peer identified by `peer_ip`.
    ///
    /// In the `Gateway` implementation, `None` means the peer could not be resolved or the
    /// send failed; otherwise the receiver yields the I/O result of the send.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
    /// Sends `event` to multiple peers (the `Gateway` implementation targets all connected peers).
    fn broadcast(&self, event: Event<N>);
}
112
/// The gateway: owns the TCP stack, the peer bookkeeping, and the channels used to forward
/// inbound events to the primary, the workers, and the sync module.
#[derive(Clone)]
pub struct Gateway<N: Network> {
    /// The account of this node; its address is used to skip self-connections.
    account: Account<N>,
    /// The storage; consulted for the current round.
    storage: Storage<N>,
    /// The ledger service; consulted for committees, block rounds, and blocks.
    ledger: Arc<dyn LedgerService<N>>,
    /// The TCP stack.
    tcp: Tcp,
    /// The cache holding the per-peer and per-item event counters used for rate limiting.
    cache: Arc<Cache<N>>,
    /// The resolver, mapping between listener addresses, connected addresses, and validator addresses.
    resolver: Arc<Resolver<N>>,
    /// The validator IPs that are always considered authorized and are re-dialed on every heartbeat.
    trusted_validators: IndexSet<SocketAddr>,
    /// The listener addresses of the currently connected peers.
    connected_peers: Arc<RwLock<IndexSet<SocketAddr>>>,
    /// The addresses of the peers with a connection currently being established.
    connecting_peers: Arc<Mutex<IndexSet<SocketAddr>>>,
    /// The validator telemetry, used to report participation scores.
    #[cfg(feature = "telemetry")]
    validator_telemetry: Telemetry<N>,
    /// The primary sender; set once in `run`.
    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
    /// The worker senders, keyed by worker ID; set once in `run`.
    worker_senders: Arc<OnceCell<IndexMap<u8, WorkerSender<N>>>>,
    /// The sync sender; set in `run` only when one is provided.
    sync_sender: Arc<OnceCell<SyncSender<N>>>,
    /// The handles of the tasks spawned through `spawn`; aborted in `shut_down`.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The development mode offset, if any; added to `MEMORY_POOL_PORT` to pick the local port.
    dev: Option<u16>,
}
152
153impl<N: Network> Gateway<N> {
154 pub fn new(
156 account: Account<N>,
157 storage: Storage<N>,
158 ledger: Arc<dyn LedgerService<N>>,
159 ip: Option<SocketAddr>,
160 trusted_validators: &[SocketAddr],
161 dev: Option<u16>,
162 ) -> Result<Self> {
163 let ip = match (ip, dev) {
165 (None, Some(dev)) => SocketAddr::from_str(&format!("127.0.0.1:{}", MEMORY_POOL_PORT + dev))?,
166 (None, None) => SocketAddr::from_str(&format!("0.0.0.0:{}", MEMORY_POOL_PORT))?,
167 (Some(ip), _) => ip,
168 };
169 let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));
171
172 Ok(Self {
174 account,
175 storage,
176 ledger,
177 tcp,
178 cache: Default::default(),
179 resolver: Default::default(),
180 trusted_validators: trusted_validators.iter().copied().collect(),
181 connected_peers: Default::default(),
182 connecting_peers: Default::default(),
183 #[cfg(feature = "telemetry")]
184 validator_telemetry: Default::default(),
185 primary_sender: Default::default(),
186 worker_senders: Default::default(),
187 sync_sender: Default::default(),
188 handles: Default::default(),
189 dev,
190 })
191 }
192
    /// Starts the gateway: installs the senders, enables the TCP protocols and the listener,
    /// and launches the heartbeat.
    ///
    /// # Panics
    ///
    /// Panics if the primary, worker, or sync senders were already set (i.e. `run` was called
    /// before on this gateway), or if the TCP listener cannot be enabled.
    pub async fn run(
        &self,
        primary_sender: PrimarySender<N>,
        worker_senders: IndexMap<u8, WorkerSender<N>>,
        sync_sender: Option<SyncSender<N>>,
    ) {
        debug!("Starting the gateway for the memory pool...");

        // Set the primary sender.
        self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");

        // Set the worker senders.
        self.worker_senders.set(worker_senders).expect("The worker senders are already set");

        // If a sync sender was provided, set it.
        if let Some(sync_sender) = sync_sender {
            self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
        }

        // Enable the TCP protocols.
        self.enable_handshake().await;
        self.enable_reading().await;
        self.enable_writing().await;
        self.enable_disconnect().await;
        self.enable_on_connect().await;

        // Enable the TCP listener; this happens after the protocols above are enabled.
        let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
        debug!("Listening for validator connections at address {listen_addr:?}");

        // Start the periodic heartbeat task.
        self.initialize_heartbeat();

        info!("Started the gateway for the memory pool at '{}'", self.local_ip());
    }
229}
230
231impl<N: Network> Gateway<N> {
233 fn max_committee_size(&self) -> usize {
235 self.ledger.current_committee().map_or_else(
236 |_e| Committee::<N>::max_committee_size().unwrap() as usize,
237 |committee| committee.num_members(),
238 )
239 }
240
241 fn max_cache_events(&self) -> usize {
243 self.max_cache_transmissions()
244 }
245
246 fn max_cache_certificates(&self) -> usize {
248 2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
249 }
250
251 fn max_cache_transmissions(&self) -> usize {
253 self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
254 }
255
256 fn max_cache_duplicates(&self) -> usize {
258 self.max_committee_size().pow(2)
259 }
260}
261
262#[async_trait]
263impl<N: Network> CommunicationService for Gateway<N> {
264 type Message = Event<N>;
266
267 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
269 debug_assert!(start_height < end_height, "Invalid block request format");
270 Event::BlockRequest(BlockRequest { start_height, end_height })
271 }
272
273 async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
279 Transport::send(self, peer_ip, message).await
280 }
281
282 fn ban_peer(&self, peer_ip: SocketAddr) {
283 trace!("Banning peer {peer_ip} for timing out on block requests");
284
285 let tcp = self.tcp().clone();
286 tcp.banned_peers().update_ip_ban(peer_ip.ip());
287
288 tokio::spawn(async move {
289 tcp.disconnect(peer_ip).await;
290 });
291 }
292}
293
294impl<N: Network> Gateway<N> {
    /// Returns the account of the node.
    pub const fn account(&self) -> &Account<N> {
        &self.account
    }
299
    /// Returns the development mode offset, or `None` when not in development mode.
    pub const fn dev(&self) -> Option<u16> {
        self.dev
    }
304
    /// Returns the address the TCP listener is bound to.
    ///
    /// # Panics
    ///
    /// Panics if the TCP listener has not been enabled yet.
    pub fn local_ip(&self) -> SocketAddr {
        self.tcp.listening_addr().expect("The TCP listener is not enabled")
    }
309
310 pub fn is_local_ip(&self, ip: SocketAddr) -> bool {
312 ip == self.local_ip()
313 || (ip.ip().is_unspecified() || ip.ip().is_loopback()) && ip.port() == self.local_ip().port()
314 }
315
316 pub fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
318 !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
319 }
320
    /// Returns the resolver.
    pub fn resolver(&self) -> &Resolver<N> {
        &self.resolver
    }
325
    /// Returns the validator telemetry.
    #[cfg(feature = "telemetry")]
    pub fn validator_telemetry(&self) -> &Telemetry<N> {
        &self.validator_telemetry
    }
331
    /// Returns the primary sender.
    ///
    /// # Panics
    ///
    /// Panics if the primary sender has not been set, i.e. `run` has not been called.
    pub fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set in gateway")
    }
336
337 pub fn num_workers(&self) -> u8 {
339 u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
340 .expect("Too many workers")
341 }
342
343 pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
345 self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
346 }
347
348 pub fn is_connected_address(&self, address: Address<N>) -> bool {
350 match self.resolver.get_peer_ip_for_address(address) {
352 Some(peer_ip) => self.is_connected_ip(peer_ip),
354 None => false,
355 }
356 }
357
    /// Returns `true` if `ip` is in the set of connected peers.
    pub fn is_connected_ip(&self, ip: SocketAddr) -> bool {
        self.connected_peers.read().contains(&ip)
    }
362
    /// Returns `true` if `ip` is in the set of peers currently being connected to.
    pub fn is_connecting_ip(&self, ip: SocketAddr) -> bool {
        self.connecting_peers.lock().contains(&ip)
    }
367
368 pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
370 if self.trusted_validators.contains(&ip) {
372 return true;
373 }
374 match self.resolver.get_address(ip) {
376 Some(address) => self.is_authorized_validator_address(address),
378 None => false,
379 }
380 }
381
382 pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
384 if self
393 .ledger
394 .get_committee_lookback_for_round(self.storage.current_round())
395 .map_or(false, |committee| committee.is_committee_member(validator_address))
396 {
397 return true;
398 }
399
400 if self.ledger.current_committee().map_or(false, |committee| committee.is_committee_member(validator_address)) {
402 return true;
403 }
404
405 let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
407 match self.ledger.get_block_round(previous_block_height) {
409 Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
410 self.ledger
411 .get_committee_lookback_for_round(round)
412 .map_or(false, |committee| committee.is_committee_member(validator_address))
413 }),
414 Err(_) => false,
415 }
416 }
417
    /// Returns the maximum number of connections configured on the TCP stack.
    pub fn max_connected_peers(&self) -> usize {
        self.tcp.config().max_connections as usize
    }
422
    /// Returns the number of connected peers.
    pub fn number_of_connected_peers(&self) -> usize {
        self.connected_peers.read().len()
    }
427
428 pub fn connected_addresses(&self) -> HashSet<Address<N>> {
430 self.connected_peers.read().iter().filter_map(|peer_ip| self.resolver.get_address(*peer_ip)).collect()
431 }
432
    /// Returns the lock-protected set of connected peers.
    pub fn connected_peers(&self) -> &RwLock<IndexSet<SocketAddr>> {
        &self.connected_peers
    }
437
438 pub fn connect(&self, peer_ip: SocketAddr) -> Option<JoinHandle<()>> {
440 if let Err(forbidden_error) = self.check_connection_attempt(peer_ip) {
442 warn!("{forbidden_error}");
443 return None;
444 }
445
446 let self_ = self.clone();
447 Some(tokio::spawn(async move {
448 debug!("Connecting to validator {peer_ip}...");
449 if let Err(error) = self_.tcp.connect(peer_ip).await {
451 self_.connecting_peers.lock().shift_remove(&peer_ip);
452 warn!("Unable to connect to '{peer_ip}' - {error}");
453 }
454 }))
455 }
456
457 fn check_connection_attempt(&self, peer_ip: SocketAddr) -> Result<()> {
459 if self.is_local_ip(peer_ip) {
461 bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (attempted to self-connect)")
462 }
463 if self.number_of_connected_peers() >= self.max_connected_peers() {
465 bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (maximum peers reached)")
466 }
467 if self.is_connected_ip(peer_ip) {
469 bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (already connected)")
470 }
471 if self.is_connecting_ip(peer_ip) {
473 bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (already connecting)")
474 }
475 Ok(())
476 }
477
478 fn ensure_peer_is_allowed(&self, peer_ip: SocketAddr) -> Result<()> {
480 if self.is_local_ip(peer_ip) {
482 bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (attempted to self-connect)")
483 }
484 if !self.connecting_peers.lock().insert(peer_ip) {
486 bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (already shaking hands as the initiator)")
487 }
488 if self.is_connected_ip(peer_ip) {
490 bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (already connected)")
491 }
492 if !peer_ip.ip().is_loopback() {
494 let num_attempts = self.cache.insert_inbound_connection(peer_ip.ip(), RESTRICTED_INTERVAL);
496 if num_attempts > MAX_CONNECTION_ATTEMPTS {
498 bail!("Dropping connection request from '{peer_ip}' (tried {num_attempts} times)")
499 }
500 }
501 Ok(())
502 }
503
    /// Returns `true` if the TCP stack lists `ip` as banned.
    #[cfg(not(any(test)))]
    fn is_ip_banned(&self, ip: IpAddr) -> bool {
        self.tcp.banned_peers().is_ip_banned(&ip)
    }
509
    /// Records (or refreshes) a ban for `ip` in the TCP stack.
    #[cfg(not(any(test)))]
    fn update_ip_ban(&self, ip: IpAddr) {
        self.tcp.banned_peers().update_ip_ban(ip);
    }
515
    /// Publishes the current number of connected and connecting peers as gauges.
    #[cfg(feature = "metrics")]
    fn update_metrics(&self) {
        metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64);
        metrics::gauge(metrics::bft::CONNECTING, self.connecting_peers.lock().len() as f64);
    }
521
    /// Registers a connected peer: records the (listener IP, connected address, validator address)
    /// mapping in the resolver and adds the listener IP to the connected peers.
    #[cfg(not(test))]
    fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
        // Add the peer to the resolver first, then to the connected set.
        self.resolver.insert_peer(peer_ip, peer_addr, address);
        self.connected_peers.write().insert(peer_ip);
        // Refresh the peer gauges.
        #[cfg(feature = "metrics")]
        self.update_metrics();
    }
532
    /// Test-only, public variant of `insert_connected_peer`: records the peer in the resolver
    /// and adds its listener IP to the connected peers, without touching metrics.
    #[cfg(test)]
    pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
        // Add the peer to the resolver first, then to the connected set.
        self.resolver.insert_peer(peer_ip, peer_addr, address);
        self.connected_peers.write().insert(peer_ip);
    }
541
    /// Unregisters a connected peer: notifies the sync module (if present), then removes the
    /// peer from the resolver and from the connected peers.
    fn remove_connected_peer(&self, peer_ip: SocketAddr) {
        // If a sync sender was provided, tell the sync module to drop the peer. The notification
        // is sent from a background task, so this function does not wait for it.
        if let Some(sync_sender) = self.sync_sender.get() {
            let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
            tokio::spawn(async move {
                if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
                    warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
                }
            });
        }
        // Remove the peer from the resolver.
        self.resolver.remove_peer(peer_ip);
        // Remove the peer from the connected peers.
        self.connected_peers.write().shift_remove(&peer_ip);
        // Refresh the peer gauges.
        #[cfg(feature = "metrics")]
        self.update_metrics();
    }
560
561 fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
567 let Some(peer_addr) = self.resolver.get_ambiguous(peer_ip) else {
569 warn!("Unable to resolve the listener IP address '{peer_ip}'");
570 return None;
571 };
572 let name = event.name();
574 trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
576 let result = self.unicast(peer_addr, event);
577 if let Err(e) = &result {
579 warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {e}");
580 debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
581 self.disconnect(peer_ip);
582 }
583 result.ok()
584 }
585
    /// Handles one inbound event from the peer connected at `peer_addr`.
    ///
    /// The event is first checked against the authorization and rate limits, then dispatched by
    /// variant to the primary, the workers, the sync module, or handled locally.
    ///
    /// # Errors
    ///
    /// Returns an error if the peer cannot be resolved, is not authorized, exceeds the event
    /// limit, or sends an event that is invalid or not allowed at this point. The caller
    /// (`process_message_inner`) disconnects from the peer on error.
    async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<()> {
        // Retrieve the listener IP for the peer.
        let Some(peer_ip) = self.resolver.get_listener(peer_addr) else {
            bail!("{CONTEXT} Unable to resolve the (ambiguous) peer address '{peer_addr}'")
        };
        // Ensure that the peer is an authorized validator.
        if !self.is_authorized_validator_ip(peer_ip) {
            bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
        }
        // Count the event and drop the peer if it is sending too many.
        let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
        if num_events >= self.max_cache_events() {
            bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
        }
        // Rate limit per certificate ID, transmission ID, or block-requesting peer.
        // Events over the duplicate limit are silently skipped; the peer is not penalized.
        match event {
            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
                // Retrieve the certificate ID.
                let certificate_id = match &event {
                    Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
                    Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
                    // Only the two variants matched by the outer arm can reach here.
                    _ => unreachable!(),
                };
                // Skip processing this certificate if the rate limit was exceeded.
                let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(());
                }
            }
            Event::TransmissionRequest(TransmissionRequest { transmission_id })
            | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
                // Skip processing this transmission if the rate limit was exceeded.
                let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(());
                }
            }
            Event::BlockRequest(_) => {
                // Skip processing this block request if the rate limit was exceeded.
                let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(());
                }
            }
            _ => {}
        }
        trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());

        // Dispatch the event. Failures to forward over a channel are deliberately ignored.
        match event {
            Event::BatchPropose(batch_propose) => {
                // Forward the batch proposal to the primary.
                let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
                Ok(())
            }
            Event::BatchSignature(batch_signature) => {
                // Forward the batch signature to the primary.
                let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
                Ok(())
            }
            Event::BatchCertified(batch_certified) => {
                // Forward the batch certificate to the primary.
                let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
                Ok(())
            }
            Event::BlockRequest(block_request) => {
                let BlockRequest { start_height, end_height } = block_request;

                // Ensure the block request is well-formed.
                if start_height >= end_height {
                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
                }
                // Ensure the block request stays within the allowed number of blocks.
                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
                }

                // Load the blocks from the ledger on a blocking thread.
                let self_ = self.clone();
                let blocks = match task::spawn_blocking(move || {
                    match self_.ledger.get_blocks(start_height..end_height) {
                        Ok(blocks) => Ok(Data::Object(DataBlocks(blocks))),
                        Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
                    }
                })
                .await
                {
                    Ok(Ok(blocks)) => blocks,
                    // The ledger lookup failed.
                    Ok(Err(error)) => return Err(error),
                    // The blocking task itself failed to complete.
                    Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
                };

                // Send the block response to the peer from a background task.
                let self_ = self.clone();
                tokio::spawn(async move {
                    let event = Event::BlockResponse(BlockResponse { request: block_request, blocks });
                    Transport::send(&self_, peer_ip, event).await;
                });
                Ok(())
            }
            Event::BlockResponse(block_response) => {
                // Block responses are only processed when a sync sender was provided.
                if let Some(sync_sender) = self.sync_sender.get() {
                    let BlockResponse { request, blocks } = block_response;

                    // Ensure this response matches a block request that was sent to the peer.
                    if !self.cache.remove_outbound_block_request(peer_ip, &request) {
                        bail!("Unsolicited block response from '{peer_ip}'")
                    }

                    // Deserialize the blocks on the rayon pool and receive the result over a channel.
                    let (send, recv) = tokio::sync::oneshot::channel();
                    rayon::spawn_fifo(move || {
                        let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
                        let _ = send.send(blocks);
                    });
                    let blocks = match recv.await {
                        Ok(Ok(blocks)) => blocks,
                        // Deserialization failed.
                        Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
                        // The sending half was dropped without sending a result.
                        Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
                    };

                    // Ensure the response matches the requested range.
                    blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
                    // Hand the blocks to the sync module; a failure is logged but not treated as an error.
                    if let Err(e) = sync_sender.advance_with_sync_blocks(peer_ip, blocks.0).await {
                        warn!("Unable to process block response from '{peer_ip}' - {e}");
                    }
                }
                Ok(())
            }
            Event::CertificateRequest(certificate_request) => {
                // Forward the certificate request to the sync module, if present.
                if let Some(sync_sender) = self.sync_sender.get() {
                    let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
                }
                Ok(())
            }
            Event::CertificateResponse(certificate_response) => {
                // Forward the certificate response to the sync module, if present.
                if let Some(sync_sender) = self.sync_sender.get() {
                    let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
                }
                Ok(())
            }
            Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
                // Challenge events are rejected when they arrive through this path.
                bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
            }
            Event::Disconnect(disconnect) => {
                // Surface the peer's disconnect reason as an error.
                bail!("{CONTEXT} {:?}", disconnect.reason)
            }
            Event::PrimaryPing(ping) => {
                let PrimaryPing { version, block_locators, primary_certificate } = ping;

                // Ensure the peer's event version is not outdated.
                if version < Event::<N>::VERSION {
                    bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
                }

                // Update the peer's block locators in the sync module, if present.
                if let Some(sync_sender) = self.sync_sender.get() {
                    if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
                        bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
                    }
                }

                // Forward the primary certificate to the primary.
                let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
                Ok(())
            }
            Event::TransmissionRequest(request) => {
                // Determine the worker responsible for this transmission ID.
                let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
                    return Ok(());
                };
                // Forward the transmission request to that worker, if it exists.
                if let Some(sender) = self.get_worker_sender(worker_id) {
                    let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
                }
                Ok(())
            }
            Event::TransmissionResponse(response) => {
                // Determine the worker responsible for this transmission ID.
                let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
                    return Ok(());
                };
                // Forward the transmission response to that worker, if it exists.
                if let Some(sender) = self.get_worker_sender(worker_id) {
                    let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
                }
                Ok(())
            }
            Event::ValidatorsRequest(_) => {
                // Collect the connected peers; outside development mode, only valid peer IPs are shared.
                let mut connected_peers: Vec<_> = match self.dev.is_some() {
                    true => self.connected_peers.read().iter().copied().collect(),
                    false => {
                        self.connected_peers.read().iter().copied().filter(|ip| self.is_valid_peer_ip(*ip)).collect()
                    }
                };
                // Shuffle so that the truncation below picks a random subset.
                connected_peers.shuffle(&mut rand::thread_rng());

                let self_ = self.clone();
                tokio::spawn(async move {
                    // Build the validators map, capped at `MAX_VALIDATORS_TO_SEND` candidates.
                    let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
                    for validator_ip in connected_peers.into_iter().take(MAX_VALIDATORS_TO_SEND) {
                        // Peers without a known validator address are left out.
                        if let Some(validator_address) = self_.resolver.get_address(validator_ip) {
                            validators.insert(validator_ip, validator_address);
                        }
                    }
                    // Send the validators response to the peer.
                    let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
                    Transport::send(&self_, peer_ip, event).await;
                });
                Ok(())
            }
            Event::ValidatorsResponse(response) => {
                let ValidatorsResponse { validators } = response;
                // Ensure the number of validators is within the allowed bound.
                ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
                // Ensure the response corresponds to a request that was sent to this peer.
                if !self.cache.contains_outbound_validators_request(peer_ip) {
                    bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
                }
                // Account for the answered request.
                self.cache.decrement_outbound_validators_requests(peer_ip);

                // Only dial new validators while below the minimum number of connections.
                if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS {
                    let self_ = self.clone();
                    tokio::spawn(async move {
                        for (validator_ip, validator_address) in validators {
                            if self_.dev.is_some() {
                                // In development mode, only skip this node's own IP.
                                if self_.is_local_ip(validator_ip) {
                                    continue;
                                }
                            } else {
                                // Otherwise, skip every IP that is not a valid peer IP.
                                if !self_.is_valid_peer_ip(validator_ip) {
                                    continue;
                                }
                            }

                            // Skip this node's own address.
                            if self_.account.address() == validator_address {
                                continue;
                            }
                            // Skip peers that are already connected or connecting, by IP.
                            if self_.is_connected_ip(validator_ip) || self_.is_connecting_ip(validator_ip) {
                                continue;
                            }
                            // Skip validators that are already connected, by address.
                            if self_.is_connected_address(validator_address) {
                                continue;
                            }
                            // Skip validators that are not authorized.
                            if !self_.is_authorized_validator_address(validator_address) {
                                continue;
                            }
                            // Attempt to connect to the validator.
                            self_.connect(validator_ip);
                        }
                    });
                }
                Ok(())
            }
            Event::WorkerPing(ping) => {
                // Ensure the number of transmission IDs is within the allowed bound.
                ensure!(
                    ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
                    "{CONTEXT} Received too many transmissions"
                );
                let num_workers = self.num_workers();
                // Route each transmission ID to the worker responsible for it.
                for transmission_id in ping.transmission_ids.into_iter() {
                    let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
                        warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
                        continue;
                    };
                    if let Some(sender) = self.get_worker_sender(worker_id) {
                        let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
                    }
                }
                Ok(())
            }
        }
    }
901
902 pub fn disconnect(&self, peer_ip: SocketAddr) -> JoinHandle<()> {
904 let gateway = self.clone();
905 tokio::spawn(async move {
906 if let Some(peer_addr) = gateway.resolver.get_ambiguous(peer_ip) {
907 let _disconnected = gateway.tcp.disconnect(peer_addr).await;
909 debug_assert!(_disconnected);
910 }
911 })
912 }
913
914 fn initialize_heartbeat(&self) {
916 let self_clone = self.clone();
917 self.spawn(async move {
918 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
920 info!("Starting the heartbeat of the gateway...");
921 loop {
922 self_clone.heartbeat();
924 tokio::time::sleep(Duration::from_secs(15)).await;
926 }
927 });
928 }
929
    /// Spawns `future` as a task and stores its handle, so that `shut_down` can abort it.
    #[allow(dead_code)]
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
935
936 pub async fn shut_down(&self) {
938 info!("Shutting down the gateway...");
939 self.handles.lock().iter().for_each(|handle| handle.abort());
941 self.tcp.shut_down().await;
943 }
944}
945
946impl<N: Network> Gateway<N> {
    /// Runs one heartbeat: logs the connection state, then maintains the peer set and the bans.
    fn heartbeat(&self) {
        // Log the connected (and missing) validators.
        self.log_connected_validators();
        // Log the validator participation scores.
        #[cfg(feature = "telemetry")]
        self.log_participation_scores();
        // Keep the trusted validators connected.
        self.handle_trusted_validators();
        // Remove any validators that are no longer authorized.
        self.handle_unauthorized_validators();
        // If below the minimum number of connections, request more validators.
        self.handle_min_connected_validators();
        // Clear out the old IP bans.
        self.handle_banned_ips();
    }
963
964 fn log_connected_validators(&self) {
966 let connected_validators = self.connected_peers().read().clone();
968 let validators_total = self.ledger.current_committee().map_or(0, |c| c.num_members().saturating_sub(1));
970 let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
972 let connections_msg = match connected_validators.len() {
974 0 => "No connected validators".to_string(),
975 num_connected => format!("Connected to {num_connected} validators {total_validators}"),
976 };
977 let mut connected_validator_addresses = IndexSet::with_capacity(connected_validators.len());
979 connected_validator_addresses.insert(self.account.address());
980 info!("{connections_msg}");
982 for peer_ip in &connected_validators {
983 let address = self.resolver.get_address(*peer_ip).map_or("Unknown".to_string(), |a| {
984 connected_validator_addresses.insert(a);
985 a.to_string()
986 });
987 debug!("{}", format!(" {peer_ip} - {address}").dimmed());
988 }
989
990 let num_not_connected = validators_total.saturating_sub(connected_validators.len());
992 if num_not_connected > 0 {
993 info!("Not connected to {num_not_connected} validators {total_validators}");
994 let committee_members: IndexSet<_> =
996 self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
997
998 for address in committee_members.difference(&connected_validator_addresses) {
1000 debug!("{}", format!(" Not connected to {address}").dimmed());
1001 }
1002 }
1003 }
1004
1005 #[cfg(feature = "telemetry")]
1007 fn log_participation_scores(&self) {
1008 if let Ok(current_committee) = self.ledger.current_committee() {
1009 let participation_scores = self.validator_telemetry().get_participation_scores(¤t_committee);
1011 debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
1013 for (address, score) in participation_scores {
1014 debug!("{}", format!(" {address} - {score:.2}%").dimmed());
1015 }
1016 }
1017 }
1018
1019 fn handle_trusted_validators(&self) {
1021 for validator_ip in &self.trusted_validators {
1023 if !self.is_local_ip(*validator_ip)
1025 && !self.is_connecting_ip(*validator_ip)
1026 && !self.is_connected_ip(*validator_ip)
1027 {
1028 self.connect(*validator_ip);
1030 }
1031 }
1032 }
1033
1034 fn handle_unauthorized_validators(&self) {
1036 let self_ = self.clone();
1037 tokio::spawn(async move {
1038 let validators = self_.connected_peers().read().clone();
1040 for peer_ip in validators {
1042 if !self_.is_authorized_validator_ip(peer_ip) {
1044 warn!("{CONTEXT} Disconnecting from '{peer_ip}' - Validator is not in the current committee");
1045 Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1046 self_.disconnect(peer_ip);
1048 }
1049 }
1050 });
1051 }
1052
1053 fn handle_min_connected_validators(&self) {
1056 if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS {
1058 let validators = self.connected_peers().read().clone();
1060 if validators.is_empty() {
1062 return;
1063 }
1064 if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
1066 let self_ = self.clone();
1067 tokio::spawn(async move {
1068 self_.cache.increment_outbound_validators_requests(validator_ip);
1070 let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1072 });
1073 }
1074 }
1075 }
1076
1077 async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1079 if let Err(error) = self.inbound(peer_addr, message).await {
1081 if let Some(peer_ip) = self.resolver.get_listener(peer_addr) {
1082 warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1083 let self_ = self.clone();
1084 tokio::spawn(async move {
1085 Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1086 self_.disconnect(peer_ip);
1088 });
1089 }
1090 }
1091 }
1092
    /// Asks the TCP stack to remove old IP bans, passing `IP_BAN_TIME_IN_SECS` as the threshold.
    fn handle_banned_ips(&self) {
        self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
    }
1097}
1098
#[async_trait]
impl<N: Network> Transport<N> for Gateway<N> {
    /// Sends `event` to `peer_ip`, throttling the send while the matching outbound counter
    /// exceeds its limit.
    ///
    /// Returns `None` if the peer could not be resolved or the send failed (see `send_inner`);
    /// otherwise the receiver for the result of the send.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
        // Counts the outbound item via `$cache_map` and, while the returned count exceeds the
        // limit given by `$freq`, sleeps for 10 ms and counts again; then performs the send.
        macro_rules! send {
            ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
                // Rate limit the number of outbound items.
                while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
                    // Back off briefly before checking again.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                // Send the event to the peer.
                $self.send_inner(peer_ip, event)
            }};
        }

        match event {
            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
                // Count the event in the general outbound event counter.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Throttle on the outbound certificate counter, then send.
                send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
            }
            Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
                // Count the event in the general outbound event counter.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Throttle on the outbound transmission counter, then send.
                send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
            }
            Event::BlockRequest(request) => {
                // Record the request, so that the matching response is recognized as solicited.
                self.cache.insert_outbound_block_request(peer_ip, request);
                // Throttle on the outbound event counter, then send.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
            _ => {
                // Throttle on the outbound event counter, then send.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
        }
    }

    /// Sends `event` to every connected peer from a background task; the per-peer results are
    /// ignored. Does nothing when no peer is connected.
    fn broadcast(&self, event: Event<N>) {
        // Ensure there are connected peers.
        if self.number_of_connected_peers() > 0 {
            let self_ = self.clone();
            // Snapshot the connected peers before moving into the task.
            let connected_peers = self.connected_peers.read().clone();
            tokio::spawn(async move {
                // The peers are sent to one after another.
                for peer_ip in connected_peers {
                    let _ = Transport::send(&self_, peer_ip, event.clone()).await;
                }
            });
        }
    }
}
1166
impl<N: Network> P2P for Gateway<N> {
    /// Returns a reference to the TCP instance.
    fn tcp(&self) -> &Tcp {
        &self.tcp
    }
}
1173
#[async_trait]
impl<N: Network> Reading for Gateway<N> {
    type Codec = EventCodec<N>;
    type Message = Event<N>;

    /// Creates the codec for inbound events; the peer address and connection side are not consulted.
    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
        Default::default()
    }

    /// Processes a message received from the peer connected at `peer_addr`.
    ///
    /// Block requests and block responses are handled in a separate task; all other events are
    /// handled inline. Always returns `Ok(())` — failures are dealt with in
    /// `process_message_inner`.
    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
        if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
            let self_ = self.clone();
            // Handle block requests and responses in a separate task, so this call returns without waiting for them.
            tokio::spawn(async move {
                self_.process_message_inner(peer_addr, message).await;
            });
        } else {
            self.process_message_inner(peer_addr, message).await;
        }
        Ok(())
    }

    /// Returns the depth of the inbound message queue:
    /// `2 * MAX_GC_ROUNDS * LATEST_MAX_CERTIFICATES * MAX_TRANSMISSIONS_PER_BATCH`.
    ///
    /// # Panics
    ///
    /// Panics if `N::LATEST_MAX_CERTIFICATES()` returns an error.
    fn message_queue_depth(&self) -> usize {
        2 * BatchHeader::<N>::MAX_GC_ROUNDS
            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
    }
}
1208
#[async_trait]
impl<N: Network> Writing for Gateway<N> {
    type Codec = EventCodec<N>;
    type Message = Event<N>;

    /// Creates the codec for outbound events; the peer address and connection side are not consulted.
    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
        Default::default()
    }

    /// Returns the depth of the outbound message queue:
    /// `2 * MAX_GC_ROUNDS * LATEST_MAX_CERTIFICATES * MAX_TRANSMISSIONS_PER_BATCH`.
    ///
    /// # Panics
    ///
    /// Panics if `N::LATEST_MAX_CERTIFICATES()` returns an error.
    fn message_queue_depth(&self) -> usize {
        2 * BatchHeader::<N>::MAX_GC_ROUNDS
            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
    }
}
1229
1230#[async_trait]
1231impl<N: Network> Disconnect for Gateway<N> {
1232 async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1234 if let Some(peer_ip) = self.resolver.get_listener(peer_addr) {
1235 self.remove_connected_peer(peer_ip);
1236
1237 self.cache.clear_outbound_validators_requests(peer_ip);
1241 self.cache.clear_outbound_block_requests(peer_ip);
1242 }
1243 }
1244}
1245
1246#[async_trait]
1247impl<N: Network> OnConnect for Gateway<N> {
1248 async fn on_connect(&self, _peer_addr: SocketAddr) {
1249 return;
1250 }
1251}
1252
1253#[async_trait]
1254impl<N: Network> Handshake for Gateway<N> {
1255 async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
1257 let peer_addr = connection.addr();
1259 let peer_side = connection.side();
1260
1261 #[cfg(not(any(test)))]
1263 if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1264 if self.is_ip_banned(peer_addr.ip()) {
1266 trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip());
1267 return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
1268 }
1269
1270 let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1271
1272 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1273 if num_attempts > MAX_CONNECTION_ATTEMPTS {
1274 self.update_ip_ban(peer_addr.ip());
1275 trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1276 return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
1277 }
1278 }
1279
1280 let stream = self.borrow_stream(&mut connection);
1281
1282 let mut peer_ip = if peer_side == ConnectionSide::Initiator {
1285 debug!("{CONTEXT} Gateway received a connection request from '{peer_addr}'");
1286 None
1287 } else {
1288 debug!("{CONTEXT} Gateway is connecting to {peer_addr}...");
1289 Some(peer_addr)
1290 };
1291
1292 let restrictions_id = self.ledger.latest_restrictions_id();
1294
1295 let handshake_result = if peer_side == ConnectionSide::Responder {
1297 self.handshake_inner_initiator(peer_addr, peer_ip, restrictions_id, stream).await
1298 } else {
1299 self.handshake_inner_responder(peer_addr, &mut peer_ip, restrictions_id, stream).await
1300 };
1301
1302 if let Some(ip) = peer_ip {
1304 self.connecting_peers.lock().shift_remove(&ip);
1305 }
1306 let (ref peer_ip, _) = handshake_result?;
1307 info!("{CONTEXT} Gateway is connected to '{peer_ip}'");
1308
1309 Ok(connection)
1310 }
1311}
1312
/// Awaits the next event on `$framed` and evaluates to its payload if it is of the
/// expected variant `$event_ty`.
///
/// Returns early from the enclosing function with an error if the stream fails, the
/// peer sends a `Disconnect`, the peer sends any other event, or the stream ends.
macro_rules! expect_event {
    ($event_ty:path, $framed:expr, $peer_addr:expr) => {
        match $framed.try_next().await? {
            // The expected event arrived: log it and hand back its payload.
            Some($event_ty(payload)) => {
                trace!("{CONTEXT} Gateway received '{}' from '{}'", payload.name(), $peer_addr);
                payload
            }
            // The peer announced a disconnect.
            Some(Event::Disconnect(reason)) => {
                let message = format!("{CONTEXT} '{}' disconnected: {reason:?}", $peer_addr);
                return Err(error(message));
            }
            // Any other event is a handshake protocol violation.
            Some(unexpected) => {
                let message = format!(
                    "{CONTEXT} '{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer_addr,
                    unexpected.name(),
                    stringify!($event_ty),
                );
                return Err(error(message));
            }
            // The stream ended before the expected event arrived.
            None => {
                let message = format!(
                    "{CONTEXT} '{}' disconnected before sending {:?}",
                    $peer_addr,
                    stringify!($event_ty)
                );
                return Err(error(message));
            }
        }
    };
}
1346
/// Logs the outgoing event at trace level and sends it to the peer over `framed`.
///
/// Returns the I/O result of the underlying send.
async fn send_event<N: Network>(
    framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
    peer_addr: SocketAddr,
    event: Event<N>,
) -> io::Result<()> {
    trace!("{CONTEXT} Gateway is sending '{}' to '{peer_addr}'", event.name());
    framed.send(event).await
}
1356
1357impl<N: Network> Gateway<N> {
1358 async fn handshake_inner_initiator<'a>(
1360 &'a self,
1361 peer_addr: SocketAddr,
1362 peer_ip: Option<SocketAddr>,
1363 restrictions_id: Field<N>,
1364 stream: &'a mut TcpStream,
1365 ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, EventCodec<N>>)> {
1366 let peer_ip = peer_ip.unwrap();
1368
1369 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1371
1372 let rng = &mut rand::rngs::OsRng;
1374
1375 let our_nonce = rng.gen();
1379 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce);
1381 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1382
1383 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1387 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1389
1390 if let Some(reason) = self
1392 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1393 .await
1394 {
1395 send_event(&mut framed, peer_addr, reason.into()).await?;
1396 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1397 }
1398 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1400 send_event(&mut framed, peer_addr, reason.into()).await?;
1401 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1402 }
1403
1404 let response_nonce: u64 = rng.gen();
1408 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1409 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1410 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1411 };
1412 let our_response =
1414 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1415 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1416
1417 self.insert_connected_peer(peer_ip, peer_addr, peer_request.address);
1419
1420 Ok((peer_ip, framed))
1421 }
1422
1423 async fn handshake_inner_responder<'a>(
1425 &'a self,
1426 peer_addr: SocketAddr,
1427 peer_ip: &mut Option<SocketAddr>,
1428 restrictions_id: Field<N>,
1429 stream: &'a mut TcpStream,
1430 ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, EventCodec<N>>)> {
1431 let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1433
1434 let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1438
1439 if self.account.address() == peer_request.address {
1441 return Err(error("Skipping request to connect to self".to_string()));
1442 }
1443
1444 *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1446 let peer_ip = peer_ip.unwrap();
1447
1448 if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) {
1450 return Err(error(format!("{forbidden_message}")));
1451 }
1452 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1454 send_event(&mut framed, peer_addr, reason.into()).await?;
1455 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1456 }
1457
1458 let rng = &mut rand::rngs::OsRng;
1462
1463 let response_nonce: u64 = rng.gen();
1465 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1466 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1467 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1468 };
1469 let our_response =
1471 ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1472 send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1473
1474 let our_nonce = rng.gen();
1476 let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce);
1478 send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1479
1480 let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1484 if let Some(reason) = self
1486 .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1487 .await
1488 {
1489 send_event(&mut framed, peer_addr, reason.into()).await?;
1490 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1491 }
1492 self.insert_connected_peer(peer_ip, peer_addr, peer_request.address);
1494
1495 Ok((peer_ip, framed))
1496 }
1497
1498 fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1500 let &ChallengeRequest { version, listener_port: _, address, nonce: _ } = event;
1502 if version < Event::<N>::VERSION {
1504 warn!("{CONTEXT} Gateway is dropping '{peer_addr}' on version {version} (outdated)");
1505 return Some(DisconnectReason::OutdatedClientVersion);
1506 }
1507 if !self.is_authorized_validator_address(address) {
1509 warn!("{CONTEXT} Gateway is dropping '{peer_addr}' for being an unauthorized validator ({address})");
1510 return Some(DisconnectReason::ProtocolViolation);
1511 }
1512 if self.is_connected_address(address) {
1514 warn!("{CONTEXT} Gateway is dropping '{peer_addr}' for being already connected ({address})");
1515 return Some(DisconnectReason::ProtocolViolation);
1516 }
1517 None
1518 }
1519
1520 async fn verify_challenge_response(
1522 &self,
1523 peer_addr: SocketAddr,
1524 peer_address: Address<N>,
1525 response: ChallengeResponse<N>,
1526 expected_restrictions_id: Field<N>,
1527 expected_nonce: u64,
1528 ) -> Option<DisconnectReason> {
1529 let ChallengeResponse { restrictions_id, signature, nonce } = response;
1531
1532 if restrictions_id != expected_restrictions_id {
1534 warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1535 return Some(DisconnectReason::InvalidChallengeResponse);
1536 }
1537 let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1539 warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1540 return Some(DisconnectReason::InvalidChallengeResponse);
1541 };
1542 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1544 warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (invalid signature)");
1545 return Some(DisconnectReason::InvalidChallengeResponse);
1546 }
1547 None
1548 }
1549}
1550
1551#[cfg(test)]
1552mod prop_tests {
1553 use crate::{
1554 Gateway,
1555 MAX_WORKERS,
1556 MEMORY_POOL_PORT,
1557 Worker,
1558 gateway::prop_tests::GatewayAddress::{Dev, Prod},
1559 helpers::{Storage, init_primary_channels, init_worker_channels},
1560 };
1561 use snarkos_account::Account;
1562 use snarkos_node_bft_ledger_service::MockLedgerService;
1563 use snarkos_node_bft_storage_service::BFTMemoryService;
1564 use snarkos_node_tcp::P2P;
1565 use snarkvm::{
1566 ledger::{
1567 committee::{
1568 Committee,
1569 prop_tests::{CommitteeContext, ValidatorSet},
1570 test_helpers::sample_committee_for_round_and_members,
1571 },
1572 narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
1573 },
1574 prelude::{MainnetV0, PrivateKey},
1575 utilities::TestRng,
1576 };
1577
1578 use indexmap::{IndexMap, IndexSet};
1579 use proptest::{
1580 prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
1581 sample::Selector,
1582 };
1583 use std::{
1584 fmt::{Debug, Formatter},
1585 net::{IpAddr, Ipv4Addr, SocketAddr},
1586 sync::Arc,
1587 };
1588 use test_strategy::proptest;
1589
/// The network type used by every test in this module.
type CurrentNetwork = MainnetV0;
1591
1592 impl Debug for Gateway<CurrentNetwork> {
1593 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1594 f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
1596 }
1597 }
1598
/// Address configuration used to construct a `Gateway` in these tests.
#[derive(Debug, test_strategy::Arbitrary)]
enum GatewayAddress {
    /// Development setup; the value is exposed through `port()`, and the dev tests
    /// expect the gateway to listen on `MEMORY_POOL_PORT` plus this value.
    Dev(u8),
    /// Production setup with an optional socket address, exposed through `ip()`.
    Prod(Option<SocketAddr>),
}
1604
1605 impl GatewayAddress {
1606 fn ip(&self) -> Option<SocketAddr> {
1607 if let GatewayAddress::Prod(ip) = self {
1608 return *ip;
1609 }
1610 None
1611 }
1612
1613 fn port(&self) -> Option<u16> {
1614 if let GatewayAddress::Dev(port) = self {
1615 return Some(*port as u16);
1616 }
1617 None
1618 }
1619 }
1620
1621 impl Arbitrary for Gateway<CurrentNetwork> {
1622 type Parameters = ();
1623 type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;
1624
1625 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1626 any_valid_dev_gateway()
1627 .prop_map(|(storage, _, private_key, address)| {
1628 Gateway::new(
1629 Account::try_from(private_key).unwrap(),
1630 storage.clone(),
1631 storage.ledger().clone(),
1632 address.ip(),
1633 &[],
1634 address.port(),
1635 )
1636 .unwrap()
1637 })
1638 .boxed()
1639 }
1640 }
1641
/// The generated inputs for a gateway test: storage, committee context, the private key
/// of a selected validator, and the address configuration.
type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);
1643
1644 fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
1645 (any::<CommitteeContext>(), any::<Selector>())
1646 .prop_flat_map(|(context, account_selector)| {
1647 let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1648 (
1649 any_with::<Storage<CurrentNetwork>>(context.clone()),
1650 Just(context),
1651 Just(account_selector.select(validators)),
1652 0u8..,
1653 )
1654 .prop_map(|(a, b, c, d)| (a, b, c.private_key, Dev(d)))
1655 })
1656 .boxed()
1657 }
1658
1659 fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
1660 (any::<CommitteeContext>(), any::<Selector>())
1661 .prop_flat_map(|(context, account_selector)| {
1662 let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1663 (
1664 any_with::<Storage<CurrentNetwork>>(context.clone()),
1665 Just(context),
1666 Just(account_selector.select(validators)),
1667 any::<Option<SocketAddr>>(),
1668 )
1669 .prop_map(|(a, b, c, d)| (a, b, c.private_key, Prod(d)))
1670 })
1671 .boxed()
1672 }
1673
1674 #[proptest]
1675 fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1676 let (storage, _, private_key, dev) = input;
1677 let account = Account::try_from(private_key).unwrap();
1678
1679 let gateway =
1680 Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
1681 .unwrap();
1682 let tcp_config = gateway.tcp().config();
1683 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
1684 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
1685
1686 let tcp_config = gateway.tcp().config();
1687 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1688 assert_eq!(gateway.account().address(), account.address());
1689 }
1690
1691 #[proptest]
1692 fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1693 let (storage, _, private_key, dev) = input;
1694 let account = Account::try_from(private_key).unwrap();
1695
1696 let gateway =
1697 Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
1698 .unwrap();
1699 let tcp_config = gateway.tcp().config();
1700 if let Some(socket_addr) = dev.ip() {
1701 assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
1702 assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
1703 } else {
1704 assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
1705 assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
1706 }
1707
1708 let tcp_config = gateway.tcp().config();
1709 assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1710 assert_eq!(gateway.account().address(), account.address());
1711 }
1712
1713 #[proptest(async = "tokio")]
1714 async fn gateway_start(
1715 #[strategy(any_valid_dev_gateway())] input: GatewayInput,
1716 #[strategy(0..MAX_WORKERS)] workers_count: u8,
1717 ) {
1718 let (storage, committee, private_key, dev) = input;
1719 let committee = committee.0;
1720 let worker_storage = storage.clone();
1721 let account = Account::try_from(private_key).unwrap();
1722
1723 let gateway =
1724 Gateway::new(account, storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()).unwrap();
1725
1726 let (primary_sender, _) = init_primary_channels();
1727
1728 let (workers, worker_senders) = {
1729 let mut tx_workers = IndexMap::new();
1731 let mut workers = IndexMap::new();
1732
1733 for id in 0..workers_count {
1735 let (tx_worker, rx_worker) = init_worker_channels();
1737 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1739 let worker =
1740 Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
1741 .unwrap();
1742 worker.run(rx_worker);
1744
1745 workers.insert(id, worker);
1747 tx_workers.insert(id, tx_worker);
1748 }
1749 (workers, tx_workers)
1750 };
1751
1752 gateway.run(primary_sender, worker_senders, None).await;
1753 assert_eq!(
1754 gateway.local_ip(),
1755 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
1756 );
1757 assert_eq!(gateway.num_workers(), workers.len() as u8);
1758 }
1759
1760 #[proptest]
1761 fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1762 let rng = &mut TestRng::default();
1763
1764 let current_round = 2;
1766 let committee_size = 4;
1767 let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1768 let (_, _, private_key, dev) = input;
1769 let account = Account::try_from(private_key).unwrap();
1770
1771 let mut certificates = IndexSet::new();
1773 for _ in 0..committee_size {
1774 certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1775 }
1776 let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
1777 let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
1779 for _ in 0..committee_size {
1781 certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1782 }
1783 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1785 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1787 let gateway =
1789 Gateway::new(account.clone(), storage.clone(), ledger.clone(), dev.ip(), &[], dev.port()).unwrap();
1790 for certificate in certificates.iter() {
1792 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1793 }
1794 for i in 0..certificates.clone().len() {
1796 let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
1797 if i < committee_size {
1798 assert!(is_authorized);
1799 } else {
1800 assert!(!is_authorized);
1801 }
1802 }
1803 }
1804}