snarkos_node_bft/
gateway.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19    CONTEXT,
20    MAX_BATCH_DELAY_IN_MS,
21    MEMORY_POOL_PORT,
22    Worker,
23    events::{EventCodec, PrimaryPing},
24    helpers::{Cache, PrimarySender, Resolver, Storage, SyncSender, WorkerSender, assign_to_worker},
25    spawn_blocking,
26};
27use snarkos_account::Account;
28use snarkos_node_bft_events::{
29    BlockRequest,
30    BlockResponse,
31    CertificateRequest,
32    CertificateResponse,
33    ChallengeRequest,
34    ChallengeResponse,
35    DataBlocks,
36    DisconnectReason,
37    Event,
38    EventTrait,
39    TransmissionRequest,
40    TransmissionResponse,
41    ValidatorsRequest,
42    ValidatorsResponse,
43};
44use snarkos_node_bft_ledger_service::LedgerService;
45use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
46use snarkos_node_tcp::{
47    Config,
48    Connection,
49    ConnectionSide,
50    P2P,
51    Tcp,
52    is_bogon_ip,
53    is_unspecified_or_broadcast_ip,
54    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
55};
56use snarkvm::{
57    console::prelude::*,
58    ledger::{
59        committee::Committee,
60        narwhal::{BatchHeader, Data},
61    },
62    prelude::{Address, Field},
63};
64
65use colored::Colorize;
66use futures::SinkExt;
67use indexmap::{IndexMap, IndexSet};
68#[cfg(feature = "locktick")]
69use locktick::parking_lot::{Mutex, RwLock};
70#[cfg(not(feature = "locktick"))]
71use parking_lot::{Mutex, RwLock};
72use rand::seq::{IteratorRandom, SliceRandom};
73#[cfg(not(any(test)))]
74use std::net::IpAddr;
75use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
76use tokio::{
77    net::TcpStream,
78    sync::{OnceCell, oneshot},
79    task::{self, JoinHandle},
80};
81use tokio_stream::StreamExt;
82use tokio_util::codec::Framed;
83
84/// The maximum interval of events to cache.
85const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
86/// The maximum interval of requests to cache.
87const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
88
89/// The maximum number of connection attempts in an interval.
90const MAX_CONNECTION_ATTEMPTS: usize = 10;
91/// The maximum interval to restrict a peer.
92const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
93
94/// The minimum number of validators to maintain a connection to.
95const MIN_CONNECTED_VALIDATORS: usize = 175;
96/// The maximum number of validators to send in a validators response event.
97const MAX_VALIDATORS_TO_SEND: usize = 200;
98
99/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
100#[cfg(not(any(test)))]
101const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
102/// The amount of time an IP address is prohibited from connecting.
103const IP_BAN_TIME_IN_SECS: u64 = 300;
104
105/// Part of the Gateway API that deals with networking.
106/// This is a separate trait to allow for easier testing/mocking.
107#[async_trait]
108pub trait Transport<N: Network>: Send + Sync {
109    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
110    fn broadcast(&self, event: Event<N>);
111}
112
113/// The gateway maintains connections to other validators.
114/// For connections with clients and provers, the Router logic is used.
115#[derive(Clone)]
116pub struct Gateway<N: Network> {
117    /// The account of the node.
118    account: Account<N>,
119    /// The storage.
120    storage: Storage<N>,
121    /// The ledger service.
122    ledger: Arc<dyn LedgerService<N>>,
123    /// The TCP stack.
124    tcp: Tcp,
125    /// The cache.
126    cache: Arc<Cache<N>>,
127    /// The resolver.
128    resolver: Arc<Resolver<N>>,
129    /// The set of trusted validators.
130    trusted_validators: IndexSet<SocketAddr>,
131    /// The map of connected peer IPs to their peer handlers.
132    connected_peers: Arc<RwLock<IndexSet<SocketAddr>>>,
133    /// The set of handshaking peers. While `Tcp` already recognizes the connecting IP addresses
134    /// and prevents duplicate outbound connection attempts to the same IP address, it is unable to
135    /// prevent simultaneous "two-way" connections between two peers (i.e. both nodes simultaneously
136    /// attempt to connect to each other). This set is used to prevent this from happening.
137    connecting_peers: Arc<Mutex<IndexSet<SocketAddr>>>,
138    /// The validator telemetry.
139    #[cfg(feature = "telemetry")]
140    validator_telemetry: Telemetry<N>,
141    /// The primary sender.
142    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
143    /// The worker senders.
144    worker_senders: Arc<OnceCell<IndexMap<u8, WorkerSender<N>>>>,
145    /// The sync sender.
146    sync_sender: Arc<OnceCell<SyncSender<N>>>,
147    /// The spawned handles.
148    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
149    /// The development mode.
150    dev: Option<u16>,
151}
152
153impl<N: Network> Gateway<N> {
154    /// Initializes a new gateway.
155    pub fn new(
156        account: Account<N>,
157        storage: Storage<N>,
158        ledger: Arc<dyn LedgerService<N>>,
159        ip: Option<SocketAddr>,
160        trusted_validators: &[SocketAddr],
161        dev: Option<u16>,
162    ) -> Result<Self> {
163        // Initialize the gateway IP.
164        let ip = match (ip, dev) {
165            (None, Some(dev)) => SocketAddr::from_str(&format!("127.0.0.1:{}", MEMORY_POOL_PORT + dev))?,
166            (None, None) => SocketAddr::from_str(&format!("0.0.0.0:{}", MEMORY_POOL_PORT))?,
167            (Some(ip), _) => ip,
168        };
169        // Initialize the TCP stack.
170        let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));
171
172        // Return the gateway.
173        Ok(Self {
174            account,
175            storage,
176            ledger,
177            tcp,
178            cache: Default::default(),
179            resolver: Default::default(),
180            trusted_validators: trusted_validators.iter().copied().collect(),
181            connected_peers: Default::default(),
182            connecting_peers: Default::default(),
183            #[cfg(feature = "telemetry")]
184            validator_telemetry: Default::default(),
185            primary_sender: Default::default(),
186            worker_senders: Default::default(),
187            sync_sender: Default::default(),
188            handles: Default::default(),
189            dev,
190        })
191    }
192
193    /// Run the gateway.
194    pub async fn run(
195        &self,
196        primary_sender: PrimarySender<N>,
197        worker_senders: IndexMap<u8, WorkerSender<N>>,
198        sync_sender: Option<SyncSender<N>>,
199    ) {
200        debug!("Starting the gateway for the memory pool...");
201
202        // Set the primary sender.
203        self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");
204
205        // Set the worker senders.
206        self.worker_senders.set(worker_senders).expect("The worker senders are already set");
207
208        // If the sync sender was provided, set the sync sender.
209        if let Some(sync_sender) = sync_sender {
210            self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
211        }
212
213        // Enable the TCP protocols.
214        self.enable_handshake().await;
215        self.enable_reading().await;
216        self.enable_writing().await;
217        self.enable_disconnect().await;
218        self.enable_on_connect().await;
219
220        // Enable the TCP listener. Note: This must be called after the above protocols.
221        let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
222        debug!("Listening for validator connections at address {listen_addr:?}");
223
224        // Initialize the heartbeat.
225        self.initialize_heartbeat();
226
227        info!("Started the gateway for the memory pool at '{}'", self.local_ip());
228    }
229}
230
231// Dynamic rate limiting.
232impl<N: Network> Gateway<N> {
233    /// The current maximum committee size.
234    fn max_committee_size(&self) -> usize {
235        self.ledger.current_committee().map_or_else(
236            |_e| Committee::<N>::max_committee_size().unwrap() as usize,
237            |committee| committee.num_members(),
238        )
239    }
240
241    /// The maximum number of events to cache.
242    fn max_cache_events(&self) -> usize {
243        self.max_cache_transmissions()
244    }
245
246    /// The maximum number of certificate requests to cache.
247    fn max_cache_certificates(&self) -> usize {
248        2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
249    }
250
251    /// The maximum number of transmission requests to cache.
252    fn max_cache_transmissions(&self) -> usize {
253        self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
254    }
255
256    /// The maximum number of duplicates for any particular request.
257    fn max_cache_duplicates(&self) -> usize {
258        self.max_committee_size().pow(2)
259    }
260}
261
262#[async_trait]
263impl<N: Network> CommunicationService for Gateway<N> {
264    /// The message type.
265    type Message = Event<N>;
266
267    /// Prepares a block request to be sent.
268    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
269        debug_assert!(start_height < end_height, "Invalid block request format");
270        Event::BlockRequest(BlockRequest { start_height, end_height })
271    }
272
273    /// Sends the given message to specified peer.
274    ///
275    /// This function returns as soon as the message is queued to be sent,
276    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
277    /// which can be used to determine when and whether the message has been delivered.
278    async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
279        Transport::send(self, peer_ip, message).await
280    }
281
282    fn ban_peer(&self, peer_ip: SocketAddr) {
283        trace!("Banning peer {peer_ip} for timing out on block requests");
284
285        let tcp = self.tcp().clone();
286        tcp.banned_peers().update_ip_ban(peer_ip.ip());
287
288        tokio::spawn(async move {
289            tcp.disconnect(peer_ip).await;
290        });
291    }
292}
293
294impl<N: Network> Gateway<N> {
295    /// Returns the account of the node.
296    pub const fn account(&self) -> &Account<N> {
297        &self.account
298    }
299
300    /// Returns the dev identifier of the node.
301    pub const fn dev(&self) -> Option<u16> {
302        self.dev
303    }
304
305    /// Returns the IP address of this node.
306    pub fn local_ip(&self) -> SocketAddr {
307        self.tcp.listening_addr().expect("The TCP listener is not enabled")
308    }
309
310    /// Returns `true` if the given IP is this node.
311    pub fn is_local_ip(&self, ip: SocketAddr) -> bool {
312        ip == self.local_ip()
313            || (ip.ip().is_unspecified() || ip.ip().is_loopback()) && ip.port() == self.local_ip().port()
314    }
315
316    /// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified.
317    pub fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
318        !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
319    }
320
321    /// Returns the resolver.
322    pub fn resolver(&self) -> &Resolver<N> {
323        &self.resolver
324    }
325
326    /// Returns the validator telemetry.
327    #[cfg(feature = "telemetry")]
328    pub fn validator_telemetry(&self) -> &Telemetry<N> {
329        &self.validator_telemetry
330    }
331
332    /// Returns the primary sender.
333    pub fn primary_sender(&self) -> &PrimarySender<N> {
334        self.primary_sender.get().expect("Primary sender not set in gateway")
335    }
336
337    /// Returns the number of workers.
338    pub fn num_workers(&self) -> u8 {
339        u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
340            .expect("Too many workers")
341    }
342
343    /// Returns the worker sender for the given worker ID.
344    pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
345        self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
346    }
347
348    /// Returns `true` if the node is connected to the given Aleo address.
349    pub fn is_connected_address(&self, address: Address<N>) -> bool {
350        // Retrieve the peer IP of the given address.
351        match self.resolver.get_peer_ip_for_address(address) {
352            // Determine if the peer IP is connected.
353            Some(peer_ip) => self.is_connected_ip(peer_ip),
354            None => false,
355        }
356    }
357
358    /// Returns `true` if the node is connected to the given peer IP.
359    pub fn is_connected_ip(&self, ip: SocketAddr) -> bool {
360        self.connected_peers.read().contains(&ip)
361    }
362
363    /// Returns `true` if the node is connecting to the given peer IP.
364    pub fn is_connecting_ip(&self, ip: SocketAddr) -> bool {
365        self.connecting_peers.lock().contains(&ip)
366    }
367
368    /// Returns `true` if the given peer IP is an authorized validator.
369    pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
370        // If the peer IP is in the trusted validators, return early.
371        if self.trusted_validators.contains(&ip) {
372            return true;
373        }
374        // Retrieve the Aleo address of the peer IP.
375        match self.resolver.get_address(ip) {
376            // Determine if the peer IP is an authorized validator.
377            Some(address) => self.is_authorized_validator_address(address),
378            None => false,
379        }
380    }
381
382    /// Returns `true` if the given address is an authorized validator.
383    pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
384        // Determine if the validator address is a member of the committee lookback,
385        // the current committee, or the previous committee lookbacks.
386        // We allow leniency in this validation check in order to accommodate these two scenarios:
387        //  1. New validators should be able to connect immediately once bonded as a committee member.
388        //  2. Existing validators must remain connected until they are no longer bonded as a committee member.
389        //     (i.e. meaning they must stay online until the next block has been produced)
390
391        // Determine if the validator is in the current committee with lookback.
392        if self
393            .ledger
394            .get_committee_lookback_for_round(self.storage.current_round())
395            .map_or(false, |committee| committee.is_committee_member(validator_address))
396        {
397            return true;
398        }
399
400        // Determine if the validator is in the latest committee on the ledger.
401        if self.ledger.current_committee().map_or(false, |committee| committee.is_committee_member(validator_address)) {
402            return true;
403        }
404
405        // Retrieve the previous block height to consider from the sync tolerance.
406        let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
407        // Determine if the validator is in any of the previous committee lookbacks.
408        match self.ledger.get_block_round(previous_block_height) {
409            Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
410                self.ledger
411                    .get_committee_lookback_for_round(round)
412                    .map_or(false, |committee| committee.is_committee_member(validator_address))
413            }),
414            Err(_) => false,
415        }
416    }
417
418    /// Returns the maximum number of connected peers.
419    pub fn max_connected_peers(&self) -> usize {
420        self.tcp.config().max_connections as usize
421    }
422
423    /// Returns the number of connected peers.
424    pub fn number_of_connected_peers(&self) -> usize {
425        self.connected_peers.read().len()
426    }
427
428    /// Returns the list of connected addresses.
429    pub fn connected_addresses(&self) -> HashSet<Address<N>> {
430        self.connected_peers.read().iter().filter_map(|peer_ip| self.resolver.get_address(*peer_ip)).collect()
431    }
432
433    /// Returns the list of connected peers.
434    pub fn connected_peers(&self) -> &RwLock<IndexSet<SocketAddr>> {
435        &self.connected_peers
436    }
437
438    /// Attempts to connect to the given peer IP.
439    pub fn connect(&self, peer_ip: SocketAddr) -> Option<JoinHandle<()>> {
440        // Return early if the attempt is against the protocol rules.
441        if let Err(forbidden_error) = self.check_connection_attempt(peer_ip) {
442            warn!("{forbidden_error}");
443            return None;
444        }
445
446        let self_ = self.clone();
447        Some(tokio::spawn(async move {
448            debug!("Connecting to validator {peer_ip}...");
449            // Attempt to connect to the peer.
450            if let Err(error) = self_.tcp.connect(peer_ip).await {
451                self_.connecting_peers.lock().shift_remove(&peer_ip);
452                warn!("Unable to connect to '{peer_ip}' - {error}");
453            }
454        }))
455    }
456
457    /// Ensure we are allowed to connect to the given peer.
458    fn check_connection_attempt(&self, peer_ip: SocketAddr) -> Result<()> {
459        // Ensure the peer IP is not this node.
460        if self.is_local_ip(peer_ip) {
461            bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (attempted to self-connect)")
462        }
463        // Ensure the node does not surpass the maximum number of peer connections.
464        if self.number_of_connected_peers() >= self.max_connected_peers() {
465            bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (maximum peers reached)")
466        }
467        // Ensure the node is not already connected to this peer.
468        if self.is_connected_ip(peer_ip) {
469            bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (already connected)")
470        }
471        // Ensure the node is not already connecting to this peer.
472        if self.is_connecting_ip(peer_ip) {
473            bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (already connecting)")
474        }
475        Ok(())
476    }
477
478    /// Ensure the peer is allowed to connect.
479    fn ensure_peer_is_allowed(&self, peer_ip: SocketAddr) -> Result<()> {
480        // Ensure the peer IP is not this node.
481        if self.is_local_ip(peer_ip) {
482            bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (attempted to self-connect)")
483        }
484        // Ensure the node is not already connecting to this peer.
485        if !self.connecting_peers.lock().insert(peer_ip) {
486            bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (already shaking hands as the initiator)")
487        }
488        // Ensure the node is not already connected to this peer.
489        if self.is_connected_ip(peer_ip) {
490            bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (already connected)")
491        }
492        // Ensure the peer is not spamming connection attempts.
493        if !peer_ip.ip().is_loopback() {
494            // Add this connection attempt and retrieve the number of attempts.
495            let num_attempts = self.cache.insert_inbound_connection(peer_ip.ip(), RESTRICTED_INTERVAL);
496            // Ensure the connecting peer has not surpassed the connection attempt limit.
497            if num_attempts > MAX_CONNECTION_ATTEMPTS {
498                bail!("Dropping connection request from '{peer_ip}' (tried {num_attempts} times)")
499            }
500        }
501        Ok(())
502    }
503
504    /// Check whether the given IP address is currently banned.
505    #[cfg(not(any(test)))]
506    fn is_ip_banned(&self, ip: IpAddr) -> bool {
507        self.tcp.banned_peers().is_ip_banned(&ip)
508    }
509
510    /// Insert or update a banned IP.
511    #[cfg(not(any(test)))]
512    fn update_ip_ban(&self, ip: IpAddr) {
513        self.tcp.banned_peers().update_ip_ban(ip);
514    }
515
516    #[cfg(feature = "metrics")]
517    fn update_metrics(&self) {
518        metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64);
519        metrics::gauge(metrics::bft::CONNECTING, self.connecting_peers.lock().len() as f64);
520    }
521
522    /// Inserts the given peer into the connected peers.
523    #[cfg(not(test))]
524    fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
525        // Adds a bidirectional map between the listener address and (ambiguous) peer address.
526        self.resolver.insert_peer(peer_ip, peer_addr, address);
527        // Add a transmission for this peer in the connected peers.
528        self.connected_peers.write().insert(peer_ip);
529        #[cfg(feature = "metrics")]
530        self.update_metrics();
531    }
532
533    /// Inserts the given peer into the connected peers. This is only used in testing.
534    #[cfg(test)]
535    pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
536        // Adds a bidirectional map between the listener address and (ambiguous) peer address.
537        self.resolver.insert_peer(peer_ip, peer_addr, address);
538        // Add a transmission for this peer in the connected peers.
539        self.connected_peers.write().insert(peer_ip);
540    }
541
542    /// Removes the connected peer and adds them to the candidate peers.
543    fn remove_connected_peer(&self, peer_ip: SocketAddr) {
544        // Remove the peer from the sync module. Except for some tests, there is always a sync sender.
545        if let Some(sync_sender) = self.sync_sender.get() {
546            let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
547            tokio::spawn(async move {
548                if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
549                    warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
550                }
551            });
552        }
553        // Removes the bidirectional map between the listener address and (ambiguous) peer address.
554        self.resolver.remove_peer(peer_ip);
555        // Remove this peer from the connected peers, if it exists.
556        self.connected_peers.write().shift_remove(&peer_ip);
557        #[cfg(feature = "metrics")]
558        self.update_metrics();
559    }
560
561    /// Sends the given event to specified peer.
562    ///
563    /// This function returns as soon as the event is queued to be sent,
564    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
565    /// which can be used to determine when and whether the event has been delivered.
566    fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
567        // Resolve the listener IP to the (ambiguous) peer address.
568        let Some(peer_addr) = self.resolver.get_ambiguous(peer_ip) else {
569            warn!("Unable to resolve the listener IP address '{peer_ip}'");
570            return None;
571        };
572        // Retrieve the event name.
573        let name = event.name();
574        // Send the event to the peer.
575        trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
576        let result = self.unicast(peer_addr, event);
577        // If the event was unable to be sent, disconnect.
578        if let Err(e) = &result {
579            warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {e}");
580            debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
581            self.disconnect(peer_ip);
582        }
583        result.ok()
584    }
585
586    /// Handles the inbound event from the peer.
587    async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<()> {
588        // Retrieve the listener IP for the peer.
589        let Some(peer_ip) = self.resolver.get_listener(peer_addr) else {
590            bail!("{CONTEXT} Unable to resolve the (ambiguous) peer address '{peer_addr}'")
591        };
592        // Ensure that the peer is an authorized committee member.
593        if !self.is_authorized_validator_ip(peer_ip) {
594            bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
595        }
596        // Drop the peer, if they have exceeded the rate limit (i.e. they are requesting too much from us).
597        let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
598        if num_events >= self.max_cache_events() {
599            bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
600        }
601        // Rate limit for duplicate requests.
602        match event {
603            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
604                // Retrieve the certificate ID.
605                let certificate_id = match &event {
606                    Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
607                    Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
608                    _ => unreachable!(),
609                };
610                // Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate).
611                let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
612                if num_events >= self.max_cache_duplicates() {
613                    return Ok(());
614                }
615            }
616            Event::TransmissionRequest(TransmissionRequest { transmission_id })
617            | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
618                // Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
619                let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
620                if num_events >= self.max_cache_duplicates() {
621                    return Ok(());
622                }
623            }
624            Event::BlockRequest(_) => {
625                let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
626                if num_events >= self.max_cache_duplicates() {
627                    return Ok(());
628                }
629            }
630            _ => {}
631        }
632        trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());
633
634        // This match statement handles the inbound event by deserializing the event,
635        // checking the event is valid, and then calling the appropriate (trait) handler.
636        match event {
637            Event::BatchPropose(batch_propose) => {
638                // Send the batch propose to the primary.
639                let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
640                Ok(())
641            }
642            Event::BatchSignature(batch_signature) => {
643                // Send the batch signature to the primary.
644                let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
645                Ok(())
646            }
647            Event::BatchCertified(batch_certified) => {
648                // Send the batch certificate to the primary.
649                let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
650                Ok(())
651            }
652            Event::BlockRequest(block_request) => {
653                let BlockRequest { start_height, end_height } = block_request;
654
655                // Ensure the block request is well-formed.
656                if start_height >= end_height {
657                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
658                }
659                // Ensure that the block request is within the allowed bounds.
660                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
661                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
662                }
663
664                let self_ = self.clone();
665                let blocks = match task::spawn_blocking(move || {
666                    // Retrieve the blocks within the requested range.
667                    match self_.ledger.get_blocks(start_height..end_height) {
668                        Ok(blocks) => Ok(Data::Object(DataBlocks(blocks))),
669                        Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
670                    }
671                })
672                .await
673                {
674                    Ok(Ok(blocks)) => blocks,
675                    Ok(Err(error)) => return Err(error),
676                    Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
677                };
678
679                let self_ = self.clone();
680                tokio::spawn(async move {
681                    // Send the `BlockResponse` message to the peer.
682                    let event = Event::BlockResponse(BlockResponse { request: block_request, blocks });
683                    Transport::send(&self_, peer_ip, event).await;
684                });
685                Ok(())
686            }
687            Event::BlockResponse(block_response) => {
688                // Process the block response. Except for some tests, there is always a sync sender.
689                if let Some(sync_sender) = self.sync_sender.get() {
690                    // Retrieve the block response.
691                    let BlockResponse { request, blocks } = block_response;
692
693                    // Check the response corresponds to a request.
694                    if !self.cache.remove_outbound_block_request(peer_ip, &request) {
695                        bail!("Unsolicited block response from '{peer_ip}'")
696                    }
697
698                    // Perform the deferred non-blocking deserialization of the blocks.
699                    // The deserialization can take a long time (minutes). We should not be running
700                    // this on a blocking task, but on a rayon thread pool.
701                    let (send, recv) = tokio::sync::oneshot::channel();
702                    rayon::spawn_fifo(move || {
703                        let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
704                        let _ = send.send(blocks);
705                    });
706                    let blocks = match recv.await {
707                        Ok(Ok(blocks)) => blocks,
708                        Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
709                        Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
710                    };
711
712                    // Ensure the block response is well-formed.
713                    blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
714                    // Send the blocks to the sync module.
715                    if let Err(e) = sync_sender.advance_with_sync_blocks(peer_ip, blocks.0).await {
716                        warn!("Unable to process block response from '{peer_ip}' - {e}");
717                    }
718                }
719                Ok(())
720            }
721            Event::CertificateRequest(certificate_request) => {
722                // Send the certificate request to the sync module.
723                // Except for some tests, there is always a sync sender.
724                if let Some(sync_sender) = self.sync_sender.get() {
725                    // Send the certificate request to the sync module.
726                    let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
727                }
728                Ok(())
729            }
730            Event::CertificateResponse(certificate_response) => {
731                // Send the certificate response to the sync module.
732                // Except for some tests, there is always a sync sender.
733                if let Some(sync_sender) = self.sync_sender.get() {
734                    // Send the certificate response to the sync module.
735                    let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
736                }
737                Ok(())
738            }
739            Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
740                // Disconnect as the peer is not following the protocol.
741                bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
742            }
743            Event::Disconnect(disconnect) => {
744                bail!("{CONTEXT} {:?}", disconnect.reason)
745            }
746            Event::PrimaryPing(ping) => {
747                let PrimaryPing { version, block_locators, primary_certificate } = ping;
748
749                // Ensure the event version is not outdated.
750                if version < Event::<N>::VERSION {
751                    bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
752                }
753
754                // Update the peer locators. Except for some tests, there is always a sync sender.
755                if let Some(sync_sender) = self.sync_sender.get() {
756                    // Check the block locators are valid, and update the validators in the sync module.
757                    if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
758                        bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
759                    }
760                }
761
762                // Send the batch certificates to the primary.
763                let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
764                Ok(())
765            }
766            Event::TransmissionRequest(request) => {
767                // TODO (howardwu): Add rate limiting checks on this event, on a per-peer basis.
768                // Determine the worker ID.
769                let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
770                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
771                    return Ok(());
772                };
773                // Send the transmission request to the worker.
774                if let Some(sender) = self.get_worker_sender(worker_id) {
775                    // Send the transmission request to the worker.
776                    let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
777                }
778                Ok(())
779            }
780            Event::TransmissionResponse(response) => {
781                // Determine the worker ID.
782                let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
783                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
784                    return Ok(());
785                };
786                // Send the transmission response to the worker.
787                if let Some(sender) = self.get_worker_sender(worker_id) {
788                    // Send the transmission response to the worker.
789                    let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
790                }
791                Ok(())
792            }
793            Event::ValidatorsRequest(_) => {
794                // Retrieve the connected peers.
795                let mut connected_peers: Vec<_> = match self.dev.is_some() {
796                    // In development mode, relax the validity requirements to make operating devnets more flexible.
797                    true => self.connected_peers.read().iter().copied().collect(),
798                    // In production mode, ensure the peer IPs are valid.
799                    false => {
800                        self.connected_peers.read().iter().copied().filter(|ip| self.is_valid_peer_ip(*ip)).collect()
801                    }
802                };
803                // Shuffle the connected peers.
804                connected_peers.shuffle(&mut rand::thread_rng());
805
806                let self_ = self.clone();
807                tokio::spawn(async move {
808                    // Initialize the validators.
809                    let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
810                    // Iterate over the validators.
811                    for validator_ip in connected_peers.into_iter().take(MAX_VALIDATORS_TO_SEND) {
812                        // Retrieve the validator address.
813                        if let Some(validator_address) = self_.resolver.get_address(validator_ip) {
814                            // Add the validator to the list of validators.
815                            validators.insert(validator_ip, validator_address);
816                        }
817                    }
818                    // Send the validators response to the peer.
819                    let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
820                    Transport::send(&self_, peer_ip, event).await;
821                });
822                Ok(())
823            }
824            Event::ValidatorsResponse(response) => {
825                let ValidatorsResponse { validators } = response;
826                // Ensure the number of validators is not too large.
827                ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
828                // Ensure the cache contains a validators request for this peer.
829                if !self.cache.contains_outbound_validators_request(peer_ip) {
830                    bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
831                }
832                // Decrement the number of validators requests for this peer.
833                self.cache.decrement_outbound_validators_requests(peer_ip);
834
835                // If the number of connected validators is less than the minimum, connect to more validators.
836                if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS {
837                    // Attempt to connect to any validators that are not already connected.
838                    let self_ = self.clone();
839                    tokio::spawn(async move {
840                        for (validator_ip, validator_address) in validators {
841                            if self_.dev.is_some() {
842                                // Ensure the validator IP is not this node.
843                                if self_.is_local_ip(validator_ip) {
844                                    continue;
845                                }
846                            } else {
847                                // Ensure the validator IP is not this node and is well-formed.
848                                if !self_.is_valid_peer_ip(validator_ip) {
849                                    continue;
850                                }
851                            }
852
853                            // Ensure the validator address is not this node.
854                            if self_.account.address() == validator_address {
855                                continue;
856                            }
857                            // Ensure the validator IP is not already connected or connecting.
858                            if self_.is_connected_ip(validator_ip) || self_.is_connecting_ip(validator_ip) {
859                                continue;
860                            }
861                            // Ensure the validator address is not already connected.
862                            if self_.is_connected_address(validator_address) {
863                                continue;
864                            }
865                            // Ensure the validator address is an authorized validator.
866                            if !self_.is_authorized_validator_address(validator_address) {
867                                continue;
868                            }
869                            // Attempt to connect to the validator.
870                            self_.connect(validator_ip);
871                        }
872                    });
873                }
874                Ok(())
875            }
876            Event::WorkerPing(ping) => {
877                // Ensure the number of transmissions is not too large.
878                ensure!(
879                    ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
880                    "{CONTEXT} Received too many transmissions"
881                );
882                // Retrieve the number of workers.
883                let num_workers = self.num_workers();
884                // Iterate over the transmission IDs.
885                for transmission_id in ping.transmission_ids.into_iter() {
886                    // Determine the worker ID.
887                    let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
888                        warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
889                        continue;
890                    };
891                    // Send the transmission ID to the worker.
892                    if let Some(sender) = self.get_worker_sender(worker_id) {
893                        // Send the transmission ID to the worker.
894                        let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
895                    }
896                }
897                Ok(())
898            }
899        }
900    }
901
902    /// Disconnects from the given peer IP, if the peer is connected.
903    pub fn disconnect(&self, peer_ip: SocketAddr) -> JoinHandle<()> {
904        let gateway = self.clone();
905        tokio::spawn(async move {
906            if let Some(peer_addr) = gateway.resolver.get_ambiguous(peer_ip) {
907                // Disconnect from this peer.
908                let _disconnected = gateway.tcp.disconnect(peer_addr).await;
909                debug_assert!(_disconnected);
910            }
911        })
912    }
913
914    /// Initialize a new instance of the heartbeat.
915    fn initialize_heartbeat(&self) {
916        let self_clone = self.clone();
917        self.spawn(async move {
918            // Sleep briefly to ensure the other nodes are ready to connect.
919            tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
920            info!("Starting the heartbeat of the gateway...");
921            loop {
922                // Process a heartbeat in the gateway.
923                self_clone.heartbeat();
924                // Sleep for the heartbeat interval.
925                tokio::time::sleep(Duration::from_secs(15)).await;
926            }
927        });
928    }
929
930    /// Spawns a task with the given future; it should only be used for long-running tasks.
931    #[allow(dead_code)]
932    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
933        self.handles.lock().push(tokio::spawn(future));
934    }
935
936    /// Shuts down the gateway.
937    pub async fn shut_down(&self) {
938        info!("Shutting down the gateway...");
939        // Abort the tasks.
940        self.handles.lock().iter().for_each(|handle| handle.abort());
941        // Close the listener.
942        self.tcp.shut_down().await;
943    }
944}
945
946impl<N: Network> Gateway<N> {
947    /// Handles the heartbeat request.
948    fn heartbeat(&self) {
949        // Log the connected validators.
950        self.log_connected_validators();
951        // Log the validator participation scores.
952        #[cfg(feature = "telemetry")]
953        self.log_participation_scores();
954        // Keep the trusted validators connected.
955        self.handle_trusted_validators();
956        // Removes any validators that not in the current committee.
957        self.handle_unauthorized_validators();
958        // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
959        self.handle_min_connected_validators();
960        // Unban any addresses whose ban time has expired.
961        self.handle_banned_ips();
962    }
963
964    /// Logs the connected validators.
965    fn log_connected_validators(&self) {
966        // Log the connected validators.
967        let connected_validators = self.connected_peers().read().clone();
968        // Resolve the total number of connectable validators.
969        let validators_total = self.ledger.current_committee().map_or(0, |c| c.num_members().saturating_sub(1));
970        // Format the total validators message.
971        let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
972        // Construct the connections message.
973        let connections_msg = match connected_validators.len() {
974            0 => "No connected validators".to_string(),
975            num_connected => format!("Connected to {num_connected} validators {total_validators}"),
976        };
977        // Collect the connected validator addresses.
978        let mut connected_validator_addresses = IndexSet::with_capacity(connected_validators.len());
979        connected_validator_addresses.insert(self.account.address());
980        // Log the connected validators.
981        info!("{connections_msg}");
982        for peer_ip in &connected_validators {
983            let address = self.resolver.get_address(*peer_ip).map_or("Unknown".to_string(), |a| {
984                connected_validator_addresses.insert(a);
985                a.to_string()
986            });
987            debug!("{}", format!("  {peer_ip} - {address}").dimmed());
988        }
989
990        // Log the validators that are not connected.
991        let num_not_connected = validators_total.saturating_sub(connected_validators.len());
992        if num_not_connected > 0 {
993            info!("Not connected to {num_not_connected} validators {total_validators}");
994            // Collect the committee members.
995            let committee_members: IndexSet<_> =
996                self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
997
998            // Log the validators that are not connected.
999            for address in committee_members.difference(&connected_validator_addresses) {
1000                debug!("{}", format!("  Not connected to {address}").dimmed());
1001            }
1002        }
1003    }
1004
1005    // Logs the validator participation scores.
1006    #[cfg(feature = "telemetry")]
1007    fn log_participation_scores(&self) {
1008        if let Ok(current_committee) = self.ledger.current_committee() {
1009            // Retrieve the participation scores.
1010            let participation_scores = self.validator_telemetry().get_participation_scores(&current_committee);
1011            // Log the participation scores.
1012            debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
1013            for (address, score) in participation_scores {
1014                debug!("{}", format!("  {address} - {score:.2}%").dimmed());
1015            }
1016        }
1017    }
1018
1019    /// This function attempts to connect to any disconnected trusted validators.
1020    fn handle_trusted_validators(&self) {
1021        // Ensure that the trusted nodes are connected.
1022        for validator_ip in &self.trusted_validators {
1023            // If the trusted_validator is not connected, attempt to connect to it.
1024            if !self.is_local_ip(*validator_ip)
1025                && !self.is_connecting_ip(*validator_ip)
1026                && !self.is_connected_ip(*validator_ip)
1027            {
1028                // Attempt to connect to the trusted validator.
1029                self.connect(*validator_ip);
1030            }
1031        }
1032    }
1033
1034    /// This function attempts to disconnect any validators that are not in the current committee.
1035    fn handle_unauthorized_validators(&self) {
1036        let self_ = self.clone();
1037        tokio::spawn(async move {
1038            // Retrieve the connected validators.
1039            let validators = self_.connected_peers().read().clone();
1040            // Iterate over the validator IPs.
1041            for peer_ip in validators {
1042                // Disconnect any validator that is not in the current committee.
1043                if !self_.is_authorized_validator_ip(peer_ip) {
1044                    warn!("{CONTEXT} Disconnecting from '{peer_ip}' - Validator is not in the current committee");
1045                    Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1046                    // Disconnect from this peer.
1047                    self_.disconnect(peer_ip);
1048                }
1049            }
1050        });
1051    }
1052
1053    /// This function sends a `ValidatorsRequest` to a random validator,
1054    /// if the number of connected validators is less than the minimum.
1055    fn handle_min_connected_validators(&self) {
1056        // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
1057        if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS {
1058            // Retrieve the connected validators.
1059            let validators = self.connected_peers().read().clone();
1060            // If there are no validator IPs to connect to, return early.
1061            if validators.is_empty() {
1062                return;
1063            }
1064            // Select a random validator IP.
1065            if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
1066                let self_ = self.clone();
1067                tokio::spawn(async move {
1068                    // Increment the number of outbound validators requests for this validator.
1069                    self_.cache.increment_outbound_validators_requests(validator_ip);
1070                    // Send a `ValidatorsRequest` to the validator.
1071                    let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1072                });
1073            }
1074        }
1075    }
1076
1077    /// Processes a message received from the network.
1078    async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1079        // Process the message. Disconnect if the peer violated the protocol.
1080        if let Err(error) = self.inbound(peer_addr, message).await {
1081            if let Some(peer_ip) = self.resolver.get_listener(peer_addr) {
1082                warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1083                let self_ = self.clone();
1084                tokio::spawn(async move {
1085                    Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1086                    // Disconnect from this peer.
1087                    self_.disconnect(peer_ip);
1088                });
1089            }
1090        }
1091    }
1092
1093    // Remove addresses whose ban time has expired.
1094    fn handle_banned_ips(&self) {
1095        self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
1096    }
1097}
1098
1099#[async_trait]
1100impl<N: Network> Transport<N> for Gateway<N> {
1101    /// Sends the given event to specified peer.
1102    ///
1103    /// This method is rate limited to prevent spamming the peer.
1104    ///
1105    /// This function returns as soon as the event is queued to be sent,
1106    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
1107    /// which can be used to determine when and whether the event has been delivered.
1108    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
1109        macro_rules! send {
1110            ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
1111                // Rate limit the number of certificate requests sent to the peer.
1112                while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
1113                    // Sleep for a short period of time to allow the cache to clear.
1114                    tokio::time::sleep(Duration::from_millis(10)).await;
1115                }
1116                // Send the event to the peer.
1117                $self.send_inner(peer_ip, event)
1118            }};
1119        }
1120
1121        // Increment the cache for certificate, transmission and block events.
1122        match event {
1123            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
1124                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
1125                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1126                // Send the event to the peer.
1127                send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
1128            }
1129            Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
1130                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
1131                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1132                // Send the event to the peer.
1133                send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
1134            }
1135            Event::BlockRequest(request) => {
1136                // Insert the outbound request so we can match it to responses.
1137                self.cache.insert_outbound_block_request(peer_ip, request);
1138                // Send the event to the peer and update the outbound event cache, use the general rate limit.
1139                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1140            }
1141            _ => {
1142                // Send the event to the peer, use the general rate limit.
1143                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1144            }
1145        }
1146    }
1147
1148    /// Broadcasts the given event to all connected peers.
1149    // TODO(ljedrz): the event should be checked for the presence of Data::Object, and
1150    // serialized in advance if it's there.
1151    fn broadcast(&self, event: Event<N>) {
1152        // Ensure there are connected peers.
1153        if self.number_of_connected_peers() > 0 {
1154            let self_ = self.clone();
1155            let connected_peers = self.connected_peers.read().clone();
1156            tokio::spawn(async move {
1157                // Iterate through all connected peers.
1158                for peer_ip in connected_peers {
1159                    // Send the event to the peer.
1160                    let _ = Transport::send(&self_, peer_ip, event.clone()).await;
1161                }
1162            });
1163        }
1164    }
1165}
1166
1167impl<N: Network> P2P for Gateway<N> {
1168    /// Returns a reference to the TCP instance.
1169    fn tcp(&self) -> &Tcp {
1170        &self.tcp
1171    }
1172}
1173
1174#[async_trait]
1175impl<N: Network> Reading for Gateway<N> {
1176    type Codec = EventCodec<N>;
1177    type Message = Event<N>;
1178
1179    /// Creates a [`Decoder`] used to interpret messages from the network.
1180    /// The `side` param indicates the connection side **from the node's perspective**.
1181    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1182        Default::default()
1183    }
1184
1185    /// Processes a message received from the network.
1186    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1187        if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1188            let self_ = self.clone();
1189            // Handle BlockRequest and BlockResponse messages in a separate task to not block the
1190            // inbound queue.
1191            tokio::spawn(async move {
1192                self_.process_message_inner(peer_addr, message).await;
1193            });
1194        } else {
1195            self.process_message_inner(peer_addr, message).await;
1196        }
1197        Ok(())
1198    }
1199
1200    /// Computes the depth of per-connection queues used to process inbound messages, sufficient to process the maximum expected load at any givent moment.
1201    /// The greater it is, the more inbound messages the node can enqueue, but a too large value can make the node more susceptible to DoS attacks.
1202    fn message_queue_depth(&self) -> usize {
1203        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1204            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1205            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1206    }
1207}
1208
1209#[async_trait]
1210impl<N: Network> Writing for Gateway<N> {
1211    type Codec = EventCodec<N>;
1212    type Message = Event<N>;
1213
1214    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
1215    /// The `side` parameter indicates the connection side **from the node's perspective**.
1216    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1217        Default::default()
1218    }
1219
1220    /// Computes the depth of per-connection queues used to send outbound messages, sufficient to process the maximum expected load at any givent moment.
1221    /// The greater it is, the more outbound messages the node can enqueue. A too large value large value might obscure potential issues with your implementation
1222    /// (like slow serialization) or network.
1223    fn message_queue_depth(&self) -> usize {
1224        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1225            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1226            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1227    }
1228}
1229
1230#[async_trait]
1231impl<N: Network> Disconnect for Gateway<N> {
1232    /// Any extra operations to be performed during a disconnect.
1233    async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1234        if let Some(peer_ip) = self.resolver.get_listener(peer_addr) {
1235            self.remove_connected_peer(peer_ip);
1236
1237            // We don't clear this map based on time but only on peer disconnect.
1238            // This is sufficient to avoid infinite growth as the committee has a fixed number
1239            // of members.
1240            self.cache.clear_outbound_validators_requests(peer_ip);
1241            self.cache.clear_outbound_block_requests(peer_ip);
1242        }
1243    }
1244}
1245
1246#[async_trait]
1247impl<N: Network> OnConnect for Gateway<N> {
1248    async fn on_connect(&self, _peer_addr: SocketAddr) {
1249        return;
1250    }
1251}
1252
1253#[async_trait]
1254impl<N: Network> Handshake for Gateway<N> {
1255    /// Performs the handshake protocol.
1256    async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
1257        // Perform the handshake.
1258        let peer_addr = connection.addr();
1259        let peer_side = connection.side();
1260
1261        // Check (or impose) IP-level bans.
1262        #[cfg(not(any(test)))]
1263        if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1264            // If the IP is already banned reject the connection.
1265            if self.is_ip_banned(peer_addr.ip()) {
1266                trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip());
1267                return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
1268            }
1269
1270            let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1271
1272            debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1273            if num_attempts > MAX_CONNECTION_ATTEMPTS {
1274                self.update_ip_ban(peer_addr.ip());
1275                trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1276                return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
1277            }
1278        }
1279
1280        let stream = self.borrow_stream(&mut connection);
1281
1282        // If this is an inbound connection, we log it, but don't know the listening address yet.
1283        // Otherwise, we can immediately register the listening address.
1284        let mut peer_ip = if peer_side == ConnectionSide::Initiator {
1285            debug!("{CONTEXT} Gateway received a connection request from '{peer_addr}'");
1286            None
1287        } else {
1288            debug!("{CONTEXT} Gateway is connecting to {peer_addr}...");
1289            Some(peer_addr)
1290        };
1291
1292        // Retrieve the restrictions ID.
1293        let restrictions_id = self.ledger.latest_restrictions_id();
1294
1295        // Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
1296        let handshake_result = if peer_side == ConnectionSide::Responder {
1297            self.handshake_inner_initiator(peer_addr, peer_ip, restrictions_id, stream).await
1298        } else {
1299            self.handshake_inner_responder(peer_addr, &mut peer_ip, restrictions_id, stream).await
1300        };
1301
1302        // Remove the address from the collection of connecting peers (if the handshake got to the point where it's known).
1303        if let Some(ip) = peer_ip {
1304            self.connecting_peers.lock().shift_remove(&ip);
1305        }
1306        let (ref peer_ip, _) = handshake_result?;
1307        info!("{CONTEXT} Gateway is connected to '{peer_ip}'");
1308
1309        Ok(connection)
1310    }
1311}
1312
1313/// A macro unwrapping the expected handshake event or returning an error for unexpected events.
1314macro_rules! expect_event {
1315    ($event_ty:path, $framed:expr, $peer_addr:expr) => {
1316        match $framed.try_next().await? {
1317            // Received the expected event, proceed.
1318            Some($event_ty(data)) => {
1319                trace!("{CONTEXT} Gateway received '{}' from '{}'", data.name(), $peer_addr);
1320                data
1321            }
1322            // Received a disconnect event, abort.
1323            Some(Event::Disconnect(reason)) => {
1324                return Err(error(format!("{CONTEXT} '{}' disconnected: {reason:?}", $peer_addr)));
1325            }
1326            // Received an unexpected event, abort.
1327            Some(ty) => {
1328                return Err(error(format!(
1329                    "{CONTEXT} '{}' did not follow the handshake protocol: received {:?} instead of {}",
1330                    $peer_addr,
1331                    ty.name(),
1332                    stringify!($event_ty),
1333                )))
1334            }
1335            // Received nothing.
1336            None => {
1337                return Err(error(format!(
1338                    "{CONTEXT} '{}' disconnected before sending {:?}",
1339                    $peer_addr,
1340                    stringify!($event_ty)
1341                )))
1342            }
1343        }
1344    };
1345}
1346
1347/// Send the given message to the peer.
1348async fn send_event<N: Network>(
1349    framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
1350    peer_addr: SocketAddr,
1351    event: Event<N>,
1352) -> io::Result<()> {
1353    trace!("{CONTEXT} Gateway is sending '{}' to '{peer_addr}'", event.name());
1354    framed.send(event).await
1355}
1356
1357impl<N: Network> Gateway<N> {
1358    /// The connection initiator side of the handshake.
1359    async fn handshake_inner_initiator<'a>(
1360        &'a self,
1361        peer_addr: SocketAddr,
1362        peer_ip: Option<SocketAddr>,
1363        restrictions_id: Field<N>,
1364        stream: &'a mut TcpStream,
1365    ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, EventCodec<N>>)> {
1366        // This value is immediately guaranteed to be present, so it can be unwrapped.
1367        let peer_ip = peer_ip.unwrap();
1368
1369        // Construct the stream.
1370        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1371
1372        // Initialize an RNG.
1373        let rng = &mut rand::rngs::OsRng;
1374
1375        /* Step 1: Send the challenge request. */
1376
1377        // Sample a random nonce.
1378        let our_nonce = rng.gen();
1379        // Send a challenge request to the peer.
1380        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce);
1381        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1382
1383        /* Step 2: Receive the peer's challenge response followed by the challenge request. */
1384
1385        // Listen for the challenge response message.
1386        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1387        // Listen for the challenge request message.
1388        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1389
1390        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
1391        if let Some(reason) = self
1392            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1393            .await
1394        {
1395            send_event(&mut framed, peer_addr, reason.into()).await?;
1396            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1397        }
1398        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
1399        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1400            send_event(&mut framed, peer_addr, reason.into()).await?;
1401            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1402        }
1403
1404        /* Step 3: Send the challenge response. */
1405
1406        // Sign the counterparty nonce.
1407        let response_nonce: u64 = rng.gen();
1408        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1409        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1410            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1411        };
1412        // Send the challenge response.
1413        let our_response =
1414            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1415        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1416
1417        // Add the peer to the gateway.
1418        self.insert_connected_peer(peer_ip, peer_addr, peer_request.address);
1419
1420        Ok((peer_ip, framed))
1421    }
1422
1423    /// The connection responder side of the handshake.
1424    async fn handshake_inner_responder<'a>(
1425        &'a self,
1426        peer_addr: SocketAddr,
1427        peer_ip: &mut Option<SocketAddr>,
1428        restrictions_id: Field<N>,
1429        stream: &'a mut TcpStream,
1430    ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, EventCodec<N>>)> {
1431        // Construct the stream.
1432        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1433
1434        /* Step 1: Receive the challenge request. */
1435
1436        // Listen for the challenge request message.
1437        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1438
1439        // Ensure the address is not the same as this node.
1440        if self.account.address() == peer_request.address {
1441            return Err(error("Skipping request to connect to self".to_string()));
1442        }
1443
1444        // Obtain the peer's listening address.
1445        *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1446        let peer_ip = peer_ip.unwrap();
1447
1448        // Knowing the peer's listening address, ensure it is allowed to connect.
1449        if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) {
1450            return Err(error(format!("{forbidden_message}")));
1451        }
1452        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
1453        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1454            send_event(&mut framed, peer_addr, reason.into()).await?;
1455            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1456        }
1457
1458        /* Step 2: Send the challenge response followed by own challenge request. */
1459
1460        // Initialize an RNG.
1461        let rng = &mut rand::rngs::OsRng;
1462
1463        // Sign the counterparty nonce.
1464        let response_nonce: u64 = rng.gen();
1465        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1466        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1467            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1468        };
1469        // Send the challenge response.
1470        let our_response =
1471            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1472        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1473
1474        // Sample a random nonce.
1475        let our_nonce = rng.gen();
1476        // Send the challenge request.
1477        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce);
1478        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1479
1480        /* Step 3: Receive the challenge response. */
1481
1482        // Listen for the challenge response message.
1483        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1484        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
1485        if let Some(reason) = self
1486            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1487            .await
1488        {
1489            send_event(&mut framed, peer_addr, reason.into()).await?;
1490            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1491        }
1492        // Add the peer to the gateway.
1493        self.insert_connected_peer(peer_ip, peer_addr, peer_request.address);
1494
1495        Ok((peer_ip, framed))
1496    }
1497
1498    /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid.
1499    fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1500        // Retrieve the components of the challenge request.
1501        let &ChallengeRequest { version, listener_port: _, address, nonce: _ } = event;
1502        // Ensure the event protocol version is not outdated.
1503        if version < Event::<N>::VERSION {
1504            warn!("{CONTEXT} Gateway is dropping '{peer_addr}' on version {version} (outdated)");
1505            return Some(DisconnectReason::OutdatedClientVersion);
1506        }
1507        // Ensure the address is a current committee member.
1508        if !self.is_authorized_validator_address(address) {
1509            warn!("{CONTEXT} Gateway is dropping '{peer_addr}' for being an unauthorized validator ({address})");
1510            return Some(DisconnectReason::ProtocolViolation);
1511        }
1512        // Ensure the address is not already connected.
1513        if self.is_connected_address(address) {
1514            warn!("{CONTEXT} Gateway is dropping '{peer_addr}' for being already connected ({address})");
1515            return Some(DisconnectReason::ProtocolViolation);
1516        }
1517        None
1518    }
1519
1520    /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid.
1521    async fn verify_challenge_response(
1522        &self,
1523        peer_addr: SocketAddr,
1524        peer_address: Address<N>,
1525        response: ChallengeResponse<N>,
1526        expected_restrictions_id: Field<N>,
1527        expected_nonce: u64,
1528    ) -> Option<DisconnectReason> {
1529        // Retrieve the components of the challenge response.
1530        let ChallengeResponse { restrictions_id, signature, nonce } = response;
1531
1532        // Verify the restrictions ID.
1533        if restrictions_id != expected_restrictions_id {
1534            warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1535            return Some(DisconnectReason::InvalidChallengeResponse);
1536        }
1537        // Perform the deferred non-blocking deserialization of the signature.
1538        let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1539            warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1540            return Some(DisconnectReason::InvalidChallengeResponse);
1541        };
1542        // Verify the signature.
1543        if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1544            warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (invalid signature)");
1545            return Some(DisconnectReason::InvalidChallengeResponse);
1546        }
1547        None
1548    }
1549}
1550
1551#[cfg(test)]
1552mod prop_tests {
1553    use crate::{
1554        Gateway,
1555        MAX_WORKERS,
1556        MEMORY_POOL_PORT,
1557        Worker,
1558        gateway::prop_tests::GatewayAddress::{Dev, Prod},
1559        helpers::{Storage, init_primary_channels, init_worker_channels},
1560    };
1561    use snarkos_account::Account;
1562    use snarkos_node_bft_ledger_service::MockLedgerService;
1563    use snarkos_node_bft_storage_service::BFTMemoryService;
1564    use snarkos_node_tcp::P2P;
1565    use snarkvm::{
1566        ledger::{
1567            committee::{
1568                Committee,
1569                prop_tests::{CommitteeContext, ValidatorSet},
1570                test_helpers::sample_committee_for_round_and_members,
1571            },
1572            narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
1573        },
1574        prelude::{MainnetV0, PrivateKey},
1575        utilities::TestRng,
1576    };
1577
1578    use indexmap::{IndexMap, IndexSet};
1579    use proptest::{
1580        prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
1581        sample::Selector,
1582    };
1583    use std::{
1584        fmt::{Debug, Formatter},
1585        net::{IpAddr, Ipv4Addr, SocketAddr},
1586        sync::Arc,
1587    };
1588    use test_strategy::proptest;
1589
1590    type CurrentNetwork = MainnetV0;
1591
1592    impl Debug for Gateway<CurrentNetwork> {
1593        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1594            // TODO implement Debug properly and move it over to production code
1595            f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
1596        }
1597    }
1598
1599    #[derive(Debug, test_strategy::Arbitrary)]
1600    enum GatewayAddress {
1601        Dev(u8),
1602        Prod(Option<SocketAddr>),
1603    }
1604
1605    impl GatewayAddress {
1606        fn ip(&self) -> Option<SocketAddr> {
1607            if let GatewayAddress::Prod(ip) = self {
1608                return *ip;
1609            }
1610            None
1611        }
1612
1613        fn port(&self) -> Option<u16> {
1614            if let GatewayAddress::Dev(port) = self {
1615                return Some(*port as u16);
1616            }
1617            None
1618        }
1619    }
1620
1621    impl Arbitrary for Gateway<CurrentNetwork> {
1622        type Parameters = ();
1623        type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;
1624
1625        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1626            any_valid_dev_gateway()
1627                .prop_map(|(storage, _, private_key, address)| {
1628                    Gateway::new(
1629                        Account::try_from(private_key).unwrap(),
1630                        storage.clone(),
1631                        storage.ledger().clone(),
1632                        address.ip(),
1633                        &[],
1634                        address.port(),
1635                    )
1636                    .unwrap()
1637                })
1638                .boxed()
1639        }
1640    }
1641
1642    type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);
1643
1644    fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
1645        (any::<CommitteeContext>(), any::<Selector>())
1646            .prop_flat_map(|(context, account_selector)| {
1647                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1648                (
1649                    any_with::<Storage<CurrentNetwork>>(context.clone()),
1650                    Just(context),
1651                    Just(account_selector.select(validators)),
1652                    0u8..,
1653                )
1654                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Dev(d)))
1655            })
1656            .boxed()
1657    }
1658
1659    fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
1660        (any::<CommitteeContext>(), any::<Selector>())
1661            .prop_flat_map(|(context, account_selector)| {
1662                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1663                (
1664                    any_with::<Storage<CurrentNetwork>>(context.clone()),
1665                    Just(context),
1666                    Just(account_selector.select(validators)),
1667                    any::<Option<SocketAddr>>(),
1668                )
1669                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Prod(d)))
1670            })
1671            .boxed()
1672    }
1673
1674    #[proptest]
1675    fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1676        let (storage, _, private_key, dev) = input;
1677        let account = Account::try_from(private_key).unwrap();
1678
1679        let gateway =
1680            Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
1681                .unwrap();
1682        let tcp_config = gateway.tcp().config();
1683        assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
1684        assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
1685
1686        let tcp_config = gateway.tcp().config();
1687        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1688        assert_eq!(gateway.account().address(), account.address());
1689    }
1690
1691    #[proptest]
1692    fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1693        let (storage, _, private_key, dev) = input;
1694        let account = Account::try_from(private_key).unwrap();
1695
1696        let gateway =
1697            Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
1698                .unwrap();
1699        let tcp_config = gateway.tcp().config();
1700        if let Some(socket_addr) = dev.ip() {
1701            assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
1702            assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
1703        } else {
1704            assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
1705            assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
1706        }
1707
1708        let tcp_config = gateway.tcp().config();
1709        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1710        assert_eq!(gateway.account().address(), account.address());
1711    }
1712
1713    #[proptest(async = "tokio")]
1714    async fn gateway_start(
1715        #[strategy(any_valid_dev_gateway())] input: GatewayInput,
1716        #[strategy(0..MAX_WORKERS)] workers_count: u8,
1717    ) {
1718        let (storage, committee, private_key, dev) = input;
1719        let committee = committee.0;
1720        let worker_storage = storage.clone();
1721        let account = Account::try_from(private_key).unwrap();
1722
1723        let gateway =
1724            Gateway::new(account, storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()).unwrap();
1725
1726        let (primary_sender, _) = init_primary_channels();
1727
1728        let (workers, worker_senders) = {
1729            // Construct a map of the worker senders.
1730            let mut tx_workers = IndexMap::new();
1731            let mut workers = IndexMap::new();
1732
1733            // Initialize the workers.
1734            for id in 0..workers_count {
1735                // Construct the worker channels.
1736                let (tx_worker, rx_worker) = init_worker_channels();
1737                // Construct the worker instance.
1738                let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1739                let worker =
1740                    Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
1741                        .unwrap();
1742                // Run the worker instance.
1743                worker.run(rx_worker);
1744
1745                // Add the worker and the worker sender to maps
1746                workers.insert(id, worker);
1747                tx_workers.insert(id, tx_worker);
1748            }
1749            (workers, tx_workers)
1750        };
1751
1752        gateway.run(primary_sender, worker_senders, None).await;
1753        assert_eq!(
1754            gateway.local_ip(),
1755            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
1756        );
1757        assert_eq!(gateway.num_workers(), workers.len() as u8);
1758    }
1759
1760    #[proptest]
1761    fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1762        let rng = &mut TestRng::default();
1763
1764        // Initialize the round parameters.
1765        let current_round = 2;
1766        let committee_size = 4;
1767        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1768        let (_, _, private_key, dev) = input;
1769        let account = Account::try_from(private_key).unwrap();
1770
1771        // Sample the certificates.
1772        let mut certificates = IndexSet::new();
1773        for _ in 0..committee_size {
1774            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1775        }
1776        let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
1777        // Initialize the committee.
1778        let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
1779        // Sample extra certificates from non-committee members.
1780        for _ in 0..committee_size {
1781            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1782        }
1783        // Initialize the ledger.
1784        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1785        // Initialize the storage.
1786        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1787        // Initialize the gateway.
1788        let gateway =
1789            Gateway::new(account.clone(), storage.clone(), ledger.clone(), dev.ip(), &[], dev.port()).unwrap();
1790        // Insert certificate to the storage.
1791        for certificate in certificates.iter() {
1792            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1793        }
1794        // Check that the current committee members are authorized validators.
1795        for i in 0..certificates.clone().len() {
1796            let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
1797            if i < committee_size {
1798                assert!(is_authorized);
1799            } else {
1800                assert!(!is_authorized);
1801            }
1802        }
1803    }
1804}