snarkos_node_bft/
gateway.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(feature = "telemetry")]
use crate::helpers::Telemetry;
use crate::{
    CONTEXT,
    MAX_BATCH_DELAY_IN_MS,
    MEMORY_POOL_PORT,
    Worker,
    events::{EventCodec, PrimaryPing},
    helpers::{Cache, PrimarySender, Resolver, Storage, SyncSender, WorkerSender, assign_to_worker},
    spawn_blocking,
};
use snarkos_account::Account;
use snarkos_node_bft_events::{
    BlockRequest,
    BlockResponse,
    CertificateRequest,
    CertificateResponse,
    ChallengeRequest,
    ChallengeResponse,
    DataBlocks,
    DisconnectReason,
    Event,
    EventTrait,
    TransmissionRequest,
    TransmissionResponse,
    ValidatorsRequest,
    ValidatorsResponse,
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
use snarkos_node_tcp::{
    Config,
    Connection,
    ConnectionSide,
    P2P,
    Tcp,
    is_bogon_ip,
    is_unspecified_or_broadcast_ip,
    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
};
use snarkvm::{
    console::prelude::*,
    ledger::{
        committee::Committee,
        narwhal::{BatchHeader, Data},
    },
    prelude::{Address, Field},
};

use colored::Colorize;
use futures::SinkExt;
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
use rand::seq::{IteratorRandom, SliceRandom};
#[cfg(not(any(test)))]
use std::net::IpAddr;
use std::{
    collections::HashSet,
    future::Future,
    io,
    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
    sync::Arc,
    time::Duration,
};
use tokio::{
    net::TcpStream,
    sync::{OnceCell, oneshot},
    task::{self, JoinHandle},
};
use tokio_stream::StreamExt;
use tokio_util::codec::Framed;

/// The maximum interval of events to cache.
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
/// The maximum interval of requests to cache.
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds

/// The maximum number of connection attempts in an interval.
const MAX_CONNECTION_ATTEMPTS: usize = 10;
/// The maximum interval to restrict a peer.
const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds

/// The minimum number of validators to maintain a connection to.
const MIN_CONNECTED_VALIDATORS: usize = 175;
/// The maximum number of validators to send in a validators response event.
const MAX_VALIDATORS_TO_SEND: usize = 200;

/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
#[cfg(not(any(test)))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
/// The amount of time an IP address is prohibited from connecting.
const IP_BAN_TIME_IN_SECS: u64 = 300;

/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
pub trait Transport<N: Network>: Send + Sync {
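    /// Sends the given event to the specified peer, returning a receiver that can be
    /// awaited to learn when (and whether) the event was delivered.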
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
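    /// Broadcasts the given event to all connected peers.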
    fn broadcast(&self, event: Event<N>);
}

/// The gateway maintains connections to other validators.
/// For connections with clients and provers, the Router logic is used.
#[derive(Clone)]
pub struct Gateway<N: Network> {
    /// The account of the node.
    account: Account<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The TCP stack.
    tcp: Tcp,
    /// The cache.
    cache: Arc<Cache<N>>,
    /// The resolver.
    resolver: Arc<Resolver<N>>,
    /// The set of trusted validators.
    trusted_validators: IndexSet<SocketAddr>,
    /// The set of connected peer IPs.
    connected_peers: Arc<RwLock<IndexSet<SocketAddr>>>,
    /// The set of handshaking peers. While `Tcp` already recognizes the connecting IP addresses
    /// and prevents duplicate outbound connection attempts to the same IP address, it is unable to
    /// prevent simultaneous "two-way" connections between two peers (i.e. both nodes simultaneously
    /// attempt to connect to each other). This set is used to prevent this from happening.
    connecting_peers: Arc<Mutex<IndexSet<SocketAddr>>>,
    /// The validator telemetry.
    #[cfg(feature = "telemetry")]
    validator_telemetry: Telemetry<N>,
    /// The primary sender.
    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
    /// The worker senders.
    worker_senders: Arc<OnceCell<IndexMap<u8, WorkerSender<N>>>>,
    /// The sync sender.
    sync_sender: Arc<OnceCell<SyncSender<N>>>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The development mode.
    dev: Option<u16>,
}

impl<N: Network> Gateway<N> {
    /// Initializes a new gateway.
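    ///
    /// A minimal usage sketch (assuming the account, storage, ledger service, and the
    /// primary/worker channel senders have already been constructed elsewhere):
    ///
    /// ```ignore
    /// let gateway = Gateway::new(account, storage, ledger, None, &[], None)?;
    /// gateway.run(primary_sender, worker_senders, None).await;
    /// ```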
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        dev: Option<u16>,
    ) -> Result<Self> {
        // Initialize the gateway IP.
        let ip = match (ip, dev) {
            (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
            (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
            (Some(ip), _) => ip,
        };
        // Initialize the TCP stack.
        let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));

        // Return the gateway.
        Ok(Self {
            account,
            storage,
            ledger,
            tcp,
            cache: Default::default(),
            resolver: Default::default(),
            trusted_validators: trusted_validators.iter().copied().collect(),
            connected_peers: Default::default(),
            connecting_peers: Default::default(),
            #[cfg(feature = "telemetry")]
            validator_telemetry: Default::default(),
            primary_sender: Default::default(),
            worker_senders: Default::default(),
            sync_sender: Default::default(),
            handles: Default::default(),
            dev,
        })
    }

    /// Run the gateway.
    pub async fn run(
        &self,
        primary_sender: PrimarySender<N>,
        worker_senders: IndexMap<u8, WorkerSender<N>>,
        sync_sender: Option<SyncSender<N>>,
    ) {
        debug!("Starting the gateway for the memory pool...");

        // Set the primary sender.
        self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");

        // Set the worker senders.
        self.worker_senders.set(worker_senders).expect("The worker senders are already set");

        // If the sync sender was provided, set the sync sender.
        if let Some(sync_sender) = sync_sender {
            self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
        }

        // Enable the TCP protocols.
        self.enable_handshake().await;
        self.enable_reading().await;
        self.enable_writing().await;
        self.enable_disconnect().await;
        self.enable_on_connect().await;

        // Enable the TCP listener. Note: This must be called after the above protocols.
        let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
        debug!("Listening for validator connections at address {listen_addr:?}");

        // Initialize the heartbeat.
        self.initialize_heartbeat();

        info!("Started the gateway for the memory pool at '{}'", self.local_ip());
    }
}

// Dynamic rate limiting.
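// The cache limits below scale with the current committee size, so a larger committee
// is permitted proportionally more traffic before the rate limiter engages.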
impl<N: Network> Gateway<N> {
    /// The current maximum committee size.
    fn max_committee_size(&self) -> usize {
        self.ledger.current_committee().map_or_else(
            |_e| Committee::<N>::max_committee_size().unwrap() as usize,
            |committee| committee.num_members(),
        )
    }

    /// The maximum number of events to cache.
    fn max_cache_events(&self) -> usize {
        self.max_cache_transmissions()
    }

    /// The maximum number of certificate requests to cache.
    fn max_cache_certificates(&self) -> usize {
        2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
    }

    /// The maximum number of transmission requests to cache.
    fn max_cache_transmissions(&self) -> usize {
        self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
    }

    /// The maximum number of duplicates for any particular request.
    fn max_cache_duplicates(&self) -> usize {
        self.max_committee_size().pow(2)
    }
}

#[async_trait]
impl<N: Network> CommunicationService for Gateway<N> {
    /// The message type.
    type Message = Event<N>;

    /// Prepares a block request to be sent.
    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
        debug_assert!(start_height < end_height, "Invalid block request format");
        Event::BlockRequest(BlockRequest { start_height, end_height })
    }
    /// Sends the given message to the specified peer.
    ///
    /// This function returns as soon as the message is queued to be sent,
    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
    /// which can be used to determine when and whether the message has been delivered.
    async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
        Transport::send(self, peer_ip, message).await
    }

    fn ban_peer(&self, peer_ip: SocketAddr) {
        trace!("Banning peer {peer_ip} for timing out on block requests");

        let tcp = self.tcp().clone();
        tcp.banned_peers().update_ip_ban(peer_ip.ip());

        tokio::spawn(async move {
            tcp.disconnect(peer_ip).await;
        });
    }
}

impl<N: Network> Gateway<N> {
    /// Returns the account of the node.
    pub const fn account(&self) -> &Account<N> {
        &self.account
    }

    /// Returns the dev identifier of the node.
    pub const fn dev(&self) -> Option<u16> {
        self.dev
    }

    /// Returns the IP address of this node.
    pub fn local_ip(&self) -> SocketAddr {
        self.tcp.listening_addr().expect("The TCP listener is not enabled")
    }

    /// Returns `true` if the given IP is this node.
    pub fn is_local_ip(&self, ip: SocketAddr) -> bool {
        ip == self.local_ip()
            || (ip.ip().is_unspecified() || ip.ip().is_loopback()) && ip.port() == self.local_ip().port()
    }

    /// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified.
    pub fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
        !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
    }

    /// Returns the resolver.
    pub fn resolver(&self) -> &Resolver<N> {
        &self.resolver
    }

    /// Returns the validator telemetry.
    #[cfg(feature = "telemetry")]
    pub fn validator_telemetry(&self) -> &Telemetry<N> {
        &self.validator_telemetry
    }

    /// Returns the primary sender.
    pub fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set in gateway")
    }

    /// Returns the number of workers.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
            .expect("Too many workers")
    }

    /// Returns the worker sender for the given worker ID.
    pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
        self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
    }

    /// Returns `true` if the node is connected to the given Aleo address.
    pub fn is_connected_address(&self, address: Address<N>) -> bool {
        // Retrieve the peer IP of the given address.
        match self.resolver.get_peer_ip_for_address(address) {
            // Determine if the peer IP is connected.
            Some(peer_ip) => self.is_connected_ip(peer_ip),
            None => false,
        }
    }

    /// Returns `true` if the node is connected to the given peer IP.
    pub fn is_connected_ip(&self, ip: SocketAddr) -> bool {
        self.connected_peers.read().contains(&ip)
    }

    /// Returns `true` if the node is connecting to the given peer IP.
    pub fn is_connecting_ip(&self, ip: SocketAddr) -> bool {
        self.connecting_peers.lock().contains(&ip)
    }

    /// Returns `true` if the given peer IP is an authorized validator.
    pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
        // If the peer IP is in the trusted validators, return early.
        if self.trusted_validators.contains(&ip) {
            return true;
        }
        // Retrieve the Aleo address of the peer IP.
        match self.resolver.get_address(ip) {
            // Determine if the peer IP is an authorized validator.
            Some(address) => self.is_authorized_validator_address(address),
            None => false,
        }
    }

    /// Returns `true` if the given address is an authorized validator.
    pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
        // Determine if the validator address is a member of the committee lookback,
        // the current committee, or the previous committee lookbacks.
        // We allow leniency in this validation check in order to accommodate these two scenarios:
        //  1. New validators should be able to connect immediately once bonded as a committee member.
        //  2. Existing validators must remain connected until they are no longer bonded as a committee member.
        //     (i.e. meaning they must stay online until the next block has been produced)

        // Determine if the validator is in the current committee with lookback.
        if self
            .ledger
            .get_committee_lookback_for_round(self.storage.current_round())
            .is_ok_and(|committee| committee.is_committee_member(validator_address))
        {
            return true;
        }

        // Determine if the validator is in the latest committee on the ledger.
        if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
            return true;
        }

        // Retrieve the previous block height to consider from the sync tolerance.
        let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
        // Determine if the validator is in any of the previous committee lookbacks.
        match self.ledger.get_block_round(previous_block_height) {
            Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
                self.ledger
                    .get_committee_lookback_for_round(round)
                    .is_ok_and(|committee| committee.is_committee_member(validator_address))
            }),
            Err(_) => false,
        }
    }

    /// Returns the maximum number of connected peers.
    pub fn max_connected_peers(&self) -> usize {
        self.tcp.config().max_connections as usize
    }

    /// Returns the number of connected peers.
    pub fn number_of_connected_peers(&self) -> usize {
        self.connected_peers.read().len()
    }

    /// Returns the list of connected addresses.
    pub fn connected_addresses(&self) -> HashSet<Address<N>> {
        self.connected_peers.read().iter().filter_map(|peer_ip| self.resolver.get_address(*peer_ip)).collect()
    }

    /// Returns the list of connected peers.
    pub fn connected_peers(&self) -> &RwLock<IndexSet<SocketAddr>> {
        &self.connected_peers
    }

    /// Attempts to connect to the given peer IP.
    pub fn connect(&self, peer_ip: SocketAddr) -> Option<JoinHandle<()>> {
        // Return early if the attempt is against the protocol rules.
        if let Err(forbidden_error) = self.check_connection_attempt(peer_ip) {
            warn!("{forbidden_error}");
            return None;
        }

        let self_ = self.clone();
        Some(tokio::spawn(async move {
            debug!("Connecting to validator {peer_ip}...");
            // Attempt to connect to the peer.
            if let Err(error) = self_.tcp.connect(peer_ip).await {
                self_.connecting_peers.lock().shift_remove(&peer_ip);
                warn!("Unable to connect to '{peer_ip}' - {error}");
            }
        }))
    }

    /// Ensure we are allowed to connect to the given peer.
    fn check_connection_attempt(&self, peer_ip: SocketAddr) -> Result<()> {
        // Ensure the peer IP is not this node.
        if self.is_local_ip(peer_ip) {
            bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (attempted to self-connect)")
        }
        // Ensure the node does not surpass the maximum number of peer connections.
        if self.number_of_connected_peers() >= self.max_connected_peers() {
            bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (maximum peers reached)")
        }
        // Ensure the node is not already connected to this peer.
        if self.is_connected_ip(peer_ip) {
            bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (already connected)")
        }
        // Ensure the node is not already connecting to this peer.
        if self.is_connecting_ip(peer_ip) {
            bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (already connecting)")
        }
        Ok(())
    }

    /// Ensure the peer is allowed to connect.
    fn ensure_peer_is_allowed(&self, peer_ip: SocketAddr) -> Result<()> {
        // Ensure the peer IP is not this node.
        if self.is_local_ip(peer_ip) {
            bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (attempted to self-connect)")
        }
        // Ensure the node is not already connecting to this peer.
        if !self.connecting_peers.lock().insert(peer_ip) {
            bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (already shaking hands as the initiator)")
        }
        // Ensure the node is not already connected to this peer.
        if self.is_connected_ip(peer_ip) {
            bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (already connected)")
        }
        // Ensure the peer is not spamming connection attempts.
        if !peer_ip.ip().is_loopback() {
            // Add this connection attempt and retrieve the number of attempts.
            let num_attempts = self.cache.insert_inbound_connection(peer_ip.ip(), RESTRICTED_INTERVAL);
            // Ensure the connecting peer has not surpassed the connection attempt limit.
            if num_attempts > MAX_CONNECTION_ATTEMPTS {
                bail!("Dropping connection request from '{peer_ip}' (tried {num_attempts} times)")
            }
        }
        Ok(())
    }

    /// Check whether the given IP address is currently banned.
    #[cfg(not(any(test)))]
    fn is_ip_banned(&self, ip: IpAddr) -> bool {
        self.tcp.banned_peers().is_ip_banned(&ip)
    }

    /// Insert or update a banned IP.
    #[cfg(not(any(test)))]
    fn update_ip_ban(&self, ip: IpAddr) {
        self.tcp.banned_peers().update_ip_ban(ip);
    }

    #[cfg(feature = "metrics")]
    fn update_metrics(&self) {
        metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64);
        metrics::gauge(metrics::bft::CONNECTING, self.connecting_peers.lock().len() as f64);
    }

    /// Inserts the given peer into the connected peers.
    #[cfg(not(test))]
    fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
        // Adds a bidirectional map between the listener address and (ambiguous) peer address.
        self.resolver.insert_peer(peer_ip, peer_addr, address);
        // Add an entry for this peer in the connected peers.
        self.connected_peers.write().insert(peer_ip);
        #[cfg(feature = "metrics")]
        self.update_metrics();
    }

    /// Inserts the given peer into the connected peers. This is only used in testing.
    #[cfg(test)]
    pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
        // Adds a bidirectional map between the listener address and (ambiguous) peer address.
        self.resolver.insert_peer(peer_ip, peer_addr, address);
        // Add an entry for this peer in the connected peers.
        self.connected_peers.write().insert(peer_ip);
    }

    /// Removes the given peer from the connected peers.
    fn remove_connected_peer(&self, peer_ip: SocketAddr) {
        // Remove the peer from the sync module. Except for some tests, there is always a sync sender.
        if let Some(sync_sender) = self.sync_sender.get() {
            let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
            tokio::spawn(async move {
                if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
                    warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
                }
            });
        }
        // Removes the bidirectional map between the listener address and (ambiguous) peer address.
        self.resolver.remove_peer(peer_ip);
        // Remove this peer from the connected peers, if it exists.
        self.connected_peers.write().shift_remove(&peer_ip);
        #[cfg(feature = "metrics")]
        self.update_metrics();
    }

    /// Sends the given event to the specified peer.
    ///
    /// This function returns as soon as the event is queued to be sent,
    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
    /// which can be used to determine when and whether the event has been delivered.
    fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
        // Resolve the listener IP to the (ambiguous) peer address.
        let Some(peer_addr) = self.resolver.get_ambiguous(peer_ip) else {
            warn!("Unable to resolve the listener IP address '{peer_ip}'");
            return None;
        };
        // Retrieve the event name.
        let name = event.name();
        // Send the event to the peer.
        trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
        let result = self.unicast(peer_addr, event);
        // If the event was unable to be sent, disconnect.
        if let Err(e) = &result {
            warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {e}");
            debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
            self.disconnect(peer_ip);
        }
        result.ok()
    }

    /// Handles the inbound event from the peer.
    async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<()> {
        // Retrieve the listener IP for the peer.
        let Some(peer_ip) = self.resolver.get_listener(peer_addr) else {
            bail!("{CONTEXT} Unable to resolve the (ambiguous) peer address '{peer_addr}'")
        };
        // Ensure that the peer is an authorized committee member.
        if !self.is_authorized_validator_ip(peer_ip) {
            bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
        }
        // Drop the peer, if they have exceeded the rate limit (i.e. they are requesting too much from us).
        let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
        if num_events >= self.max_cache_events() {
            bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
        }
        // Rate limit for duplicate requests.
        match event {
            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
                // Retrieve the certificate ID.
                let certificate_id = match &event {
                    Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
                    Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
                    _ => unreachable!(),
                };
                // Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
                let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(());
                }
            }
            Event::TransmissionRequest(TransmissionRequest { transmission_id })
            | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
                // Skip processing this transmission if the rate limit was exceeded (i.e. someone is spamming a specific transmission).
                let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(());
                }
            }
            Event::BlockRequest(_) => {
                let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
                if num_events >= self.max_cache_duplicates() {
                    return Ok(());
                }
            }
            _ => {}
        }
        trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());

        // This match statement handles the inbound event by deserializing the event,
        // checking the event is valid, and then calling the appropriate (trait) handler.
        match event {
            Event::BatchPropose(batch_propose) => {
                // Send the batch propose to the primary.
                let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
                Ok(())
            }
            Event::BatchSignature(batch_signature) => {
                // Send the batch signature to the primary.
                let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
                Ok(())
            }
            Event::BatchCertified(batch_certified) => {
                // Send the batch certificate to the primary.
                let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
                Ok(())
            }
            Event::BlockRequest(block_request) => {
                let BlockRequest { start_height, end_height } = block_request;

                // Ensure the block request is well-formed.
                if start_height >= end_height {
                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
                }
                // Ensure that the block request is within the allowed bounds.
                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
                }

                let self_ = self.clone();
                let blocks = match task::spawn_blocking(move || {
                    // Retrieve the blocks within the requested range.
                    match self_.ledger.get_blocks(start_height..end_height) {
                        Ok(blocks) => Ok(Data::Object(DataBlocks(blocks))),
                        Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
                    }
                })
                .await
                {
                    Ok(Ok(blocks)) => blocks,
                    Ok(Err(error)) => return Err(error),
                    Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
                };

                let self_ = self.clone();
                tokio::spawn(async move {
                    // Send the `BlockResponse` message to the peer.
                    let event = Event::BlockResponse(BlockResponse { request: block_request, blocks });
                    Transport::send(&self_, peer_ip, event).await;
                });
                Ok(())
            }
            Event::BlockResponse(block_response) => {
                // Process the block response. Except for some tests, there is always a sync sender.
                if let Some(sync_sender) = self.sync_sender.get() {
                    // Retrieve the block response.
                    let BlockResponse { request, blocks } = block_response;

                    // Check the response corresponds to a request.
                    if !self.cache.remove_outbound_block_request(peer_ip, &request) {
                        bail!("Unsolicited block response from '{peer_ip}'")
                    }

                    // Perform the deferred non-blocking deserialization of the blocks.
                    // The deserialization can take a long time (minutes). We should not be running
                    // this on a blocking task, but on a rayon thread pool.
                    let (send, recv) = tokio::sync::oneshot::channel();
                    rayon::spawn_fifo(move || {
                        let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
                        let _ = send.send(blocks);
                    });
                    let blocks = match recv.await {
                        Ok(Ok(blocks)) => blocks,
                        Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
                        Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
                    };

                    // Ensure the block response is well-formed.
                    blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
                    // Send the blocks to the sync module.
                    if let Err(e) = sync_sender.advance_with_sync_blocks(peer_ip, blocks.0).await {
                        warn!("Unable to process block response from '{peer_ip}' - {e}");
                    }
                }
                Ok(())
            }
            Event::CertificateRequest(certificate_request) => {
                // Send the certificate request to the sync module.
                // Except for some tests, there is always a sync sender.
                if let Some(sync_sender) = self.sync_sender.get() {
                    // Send the certificate request to the sync module.
                    let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
                }
                Ok(())
            }
            Event::CertificateResponse(certificate_response) => {
                // Send the certificate response to the sync module.
                // Except for some tests, there is always a sync sender.
                if let Some(sync_sender) = self.sync_sender.get() {
                    // Send the certificate response to the sync module.
                    let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
                }
                Ok(())
            }
            Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
                // Disconnect as the peer is not following the protocol.
                bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
            }
            Event::Disconnect(disconnect) => {
                bail!("{CONTEXT} {:?}", disconnect.reason)
            }
            Event::PrimaryPing(ping) => {
                let PrimaryPing { version, block_locators, primary_certificate } = ping;

                // Ensure the event version is not outdated.
                if version < Event::<N>::VERSION {
                    bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
                }

                // Update the peer locators. Except for some tests, there is always a sync sender.
                if let Some(sync_sender) = self.sync_sender.get() {
                    // Check the block locators are valid, and update the validators in the sync module.
                    if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
                        bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
                    }
                }

                // Send the batch certificates to the primary.
                let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
                Ok(())
            }
            Event::TransmissionRequest(request) => {
                // TODO (howardwu): Add rate limiting checks on this event, on a per-peer basis.
                // Determine the worker ID.
                let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
                    return Ok(());
                };
                // Send the transmission request to the worker.
                if let Some(sender) = self.get_worker_sender(worker_id) {
                    // Send the transmission request to the worker.
                    let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
                }
                Ok(())
            }
            Event::TransmissionResponse(response) => {
                // Determine the worker ID.
                let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
                    return Ok(());
                };
                // Send the transmission response to the worker.
                if let Some(sender) = self.get_worker_sender(worker_id) {
                    // Send the transmission response to the worker.
                    let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
                }
                Ok(())
            }
            Event::ValidatorsRequest(_) => {
                // Retrieve the connected peers.
                let mut connected_peers: Vec<_> = match self.dev.is_some() {
                    // In development mode, relax the validity requirements to make operating devnets more flexible.
                    true => self.connected_peers.read().iter().copied().collect(),
                    // In production mode, ensure the peer IPs are valid.
                    false => {
                        self.connected_peers.read().iter().copied().filter(|ip| self.is_valid_peer_ip(*ip)).collect()
                    }
                };
                // Shuffle the connected peers.
                connected_peers.shuffle(&mut rand::thread_rng());

                let self_ = self.clone();
                tokio::spawn(async move {
                    // Initialize the validators.
                    let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
                    // Iterate over the validators.
                    for validator_ip in connected_peers.into_iter().take(MAX_VALIDATORS_TO_SEND) {
                        // Retrieve the validator address.
                        if let Some(validator_address) = self_.resolver.get_address(validator_ip) {
                            // Add the validator to the list of validators.
                            validators.insert(validator_ip, validator_address);
                        }
                    }
                    // Send the validators response to the peer.
                    let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
                    Transport::send(&self_, peer_ip, event).await;
                });
                Ok(())
            }
            Event::ValidatorsResponse(response) => {
                let ValidatorsResponse { validators } = response;
                // Ensure the number of validators is not too large.
                ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
                // Ensure the cache contains a validators request for this peer.
                if !self.cache.contains_outbound_validators_request(peer_ip) {
                    bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
                }
                // Decrement the number of validators requests for this peer.
                self.cache.decrement_outbound_validators_requests(peer_ip);

                // If the number of connected validators is less than the minimum, connect to more validators.
                if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS {
                    // Attempt to connect to any validators that are not already connected.
                    let self_ = self.clone();
                    tokio::spawn(async move {
                        for (validator_ip, validator_address) in validators {
                            if self_.dev.is_some() {
                                // Ensure the validator IP is not this node.
                                if self_.is_local_ip(validator_ip) {
                                    continue;
                                }
                            } else {
                                // Ensure the validator IP is not this node and is well-formed.
                                if !self_.is_valid_peer_ip(validator_ip) {
                                    continue;
                                }
                            }

                            // Ensure the validator address is not this node.
                            if self_.account.address() == validator_address {
                                continue;
                            }
                            // Ensure the validator IP is not already connected or connecting.
                            if self_.is_connected_ip(validator_ip) || self_.is_connecting_ip(validator_ip) {
                                continue;
                            }
                            // Ensure the validator address is not already connected.
                            if self_.is_connected_address(validator_address) {
                                continue;
                            }
                            // Ensure the validator address is an authorized validator.
                            if !self_.is_authorized_validator_address(validator_address) {
                                continue;
                            }
                            // Attempt to connect to the validator.
                            self_.connect(validator_ip);
                        }
                    });
                }
                Ok(())
            }
            Event::WorkerPing(ping) => {
                // Ensure the number of transmissions is not too large.
                ensure!(
                    ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
                    "{CONTEXT} Received too many transmissions"
                );
                // Retrieve the number of workers.
                let num_workers = self.num_workers();
                // Iterate over the transmission IDs.
                for transmission_id in ping.transmission_ids.into_iter() {
                    // Determine the worker ID.
                    let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
                        warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
                        continue;
                    };
                    // Send the transmission ID to the worker.
                    if let Some(sender) = self.get_worker_sender(worker_id) {
                        // Send the transmission ID to the worker.
                        let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
                    }
                }
                Ok(())
            }
        }
    }

    /// Disconnects from the given peer IP, if the peer is connected.
    pub fn disconnect(&self, peer_ip: SocketAddr) -> JoinHandle<()> {
        let gateway = self.clone();
        tokio::spawn(async move {
            if let Some(peer_addr) = gateway.resolver.get_ambiguous(peer_ip) {
                // Disconnect from this peer.
                let _disconnected = gateway.tcp.disconnect(peer_addr).await;
                debug_assert!(_disconnected);
            }
        })
    }

    /// Initialize a new instance of the heartbeat.
    fn initialize_heartbeat(&self) {
        let self_clone = self.clone();
        self.spawn(async move {
            // Sleep briefly to ensure the other nodes are ready to connect.
            tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
            info!("Starting the heartbeat of the gateway...");
            loop {
                // Process a heartbeat in the gateway.
                self_clone.heartbeat();
                // Sleep for the heartbeat interval.
                tokio::time::sleep(Duration::from_secs(15)).await;
            }
        });
    }

    /// Spawns a task with the given future; it should only be used for long-running tasks.
    #[allow(dead_code)]
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the gateway.
    pub async fn shut_down(&self) {
        info!("Shutting down the gateway...");
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
        // Close the listener.
        self.tcp.shut_down().await;
    }
}

impl<N: Network> Gateway<N> {
    /// Handles the heartbeat request.
    fn heartbeat(&self) {
        // Log the connected validators.
        self.log_connected_validators();
        // Log the validator participation scores.
        #[cfg(feature = "telemetry")]
        self.log_participation_scores();
        // Keep the trusted validators connected.
        self.handle_trusted_validators();
        // Remove any validators that are not in the current committee.
        self.handle_unauthorized_validators();
        // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
        self.handle_min_connected_validators();
        // Unban any addresses whose ban time has expired.
        self.handle_banned_ips();
    }

    /// Logs the connected validators.
    fn log_connected_validators(&self) {
        // Log the connected validators.
        let connected_validators = self.connected_peers().read().clone();
        // Resolve the total number of connectable validators.
        let validators_total = self.ledger.current_committee().map_or(0, |c| c.num_members().saturating_sub(1));
        // Format the total validators message.
        let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
        // Construct the connections message.
        let connections_msg = match connected_validators.len() {
            0 => "No connected validators".to_string(),
            num_connected => format!("Connected to {num_connected} validators {total_validators}"),
        };
        // Collect the connected validator addresses.
        let mut connected_validator_addresses = IndexSet::with_capacity(connected_validators.len());
        connected_validator_addresses.insert(self.account.address());
        // Log the connected validators.
        info!("{connections_msg}");
        for peer_ip in &connected_validators {
            let address = self.resolver.get_address(*peer_ip).map_or("Unknown".to_string(), |a| {
                connected_validator_addresses.insert(a);
                a.to_string()
            });
            debug!("{}", format!("  {peer_ip} - {address}").dimmed());
        }

        // Log the validators that are not connected.
        let num_not_connected = validators_total.saturating_sub(connected_validators.len());
        if num_not_connected > 0 {
            info!("Not connected to {num_not_connected} validators {total_validators}");
            // Collect the committee members.
            let committee_members: IndexSet<_> =
                self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();

            // Log the validators that are not connected.
            for address in committee_members.difference(&connected_validator_addresses) {
                debug!("{}", format!("  Not connected to {address}").dimmed());
            }
        }
    }

    /// Logs the validator participation scores.
    #[cfg(feature = "telemetry")]
    fn log_participation_scores(&self) {
        if let Ok(current_committee) = self.ledger.current_committee() {
            // Retrieve the participation scores.
            let participation_scores = self.validator_telemetry().get_participation_scores(&current_committee);
            // Log the participation scores.
            debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
            for (address, score) in participation_scores {
                debug!("{}", format!("  {address} - {score:.2}%").dimmed());
            }
        }
    }

    /// This function attempts to connect to any disconnected trusted validators.
    fn handle_trusted_validators(&self) {
        // Ensure that the trusted nodes are connected.
        for validator_ip in &self.trusted_validators {
            // If the trusted validator is not connected, attempt to connect to it.
            if !self.is_local_ip(*validator_ip)
                && !self.is_connecting_ip(*validator_ip)
                && !self.is_connected_ip(*validator_ip)
            {
                // Attempt to connect to the trusted validator.
                self.connect(*validator_ip);
            }
        }
    }

    /// This function attempts to disconnect any validators that are not in the current committee.
    fn handle_unauthorized_validators(&self) {
        let self_ = self.clone();
        tokio::spawn(async move {
            // Retrieve the connected validators.
            let validators = self_.connected_peers().read().clone();
            // Iterate over the validator IPs.
            for peer_ip in validators {
                // Disconnect any validator that is not in the current committee.
                if !self_.is_authorized_validator_ip(peer_ip) {
                    warn!("{CONTEXT} Disconnecting from '{peer_ip}' - Validator is not in the current committee");
                    Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
                    // Disconnect from this peer.
                    self_.disconnect(peer_ip);
                }
            }
        });
    }

    /// This function sends a `ValidatorsRequest` to a random validator,
    /// if the number of connected validators is less than the minimum.
    fn handle_min_connected_validators(&self) {
        // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
        if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS {
            // Retrieve the connected validators.
            let validators = self.connected_peers().read().clone();
            // If there are no validator IPs to connect to, return early.
            if validators.is_empty() {
                return;
            }
            // Select a random validator IP.
            if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
                let self_ = self.clone();
                tokio::spawn(async move {
                    // Increment the number of outbound validators requests for this validator.
                    self_.cache.increment_outbound_validators_requests(validator_ip);
                    // Send a `ValidatorsRequest` to the validator.
                    let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
                });
            }
        }
    }

    /// Processes a message received from the network.
    async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
        // Process the message. Disconnect if the peer violated the protocol.
        if let Err(error) = self.inbound(peer_addr, message).await {
            if let Some(peer_ip) = self.resolver.get_listener(peer_addr) {
                warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
                let self_ = self.clone();
                tokio::spawn(async move {
                    Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
                    // Disconnect from this peer.
                    self_.disconnect(peer_ip);
                });
            }
        }
    }

    /// Removes addresses whose ban time has expired.
    fn handle_banned_ips(&self) {
        self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
    }
}

#[async_trait]
impl<N: Network> Transport<N> for Gateway<N> {
    /// Sends the given event to the specified peer.
    ///
    /// This method is rate limited to prevent spamming the peer.
    ///
    /// This function returns as soon as the event is queued to be sent,
    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
    /// which can be used to determine when and whether the event has been delivered.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
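        // This helper macro applies per-event-type backpressure: it spins (sleeping 10ms at a
        // time) until the peer's outbound cache count for this event type drops below the
        // configured maximum, then queues the event for sending.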
1116        macro_rules! send {
1117            ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
1118                // Rate limit the number of certificate requests sent to the peer.
1119                while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
1120                    // Sleep for a short period of time to allow the cache to clear.
1121                    tokio::time::sleep(Duration::from_millis(10)).await;
1122                }
1123                // Send the event to the peer.
1124                $self.send_inner(peer_ip, event)
1125            }};
1126        }
1127
1128        // Increment the cache for certificate, transmission and block events.
1129        match event {
1130            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
1131                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
1132                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1133                // Send the event to the peer.
1134                send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
1135            }
1136            Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
1137                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
1138                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1139                // Send the event to the peer.
1140                send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
1141            }
1142            Event::BlockRequest(request) => {
1143                // Insert the outbound request so we can match it to responses.
1144                self.cache.insert_outbound_block_request(peer_ip, request);
1145                // Send the event to the peer and update the outbound event cache, use the general rate limit.
1146                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1147            }
1148            _ => {
1149                // Send the event to the peer, use the general rate limit.
1150                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1151            }
1152        }
1153    }
1154
1155    /// Broadcasts the given event to all connected peers.
1156    // TODO(ljedrz): the event should be checked for the presence of Data::Object, and
1157    // serialized in advance if it's there.
1158    fn broadcast(&self, event: Event<N>) {
1159        // Ensure there are connected peers.
1160        if self.number_of_connected_peers() > 0 {
1161            let self_ = self.clone();
1162            let connected_peers = self.connected_peers.read().clone();
1163            tokio::spawn(async move {
1164                // Iterate through all connected peers.
1165                for peer_ip in connected_peers {
1166                    // Send the event to the peer.
1167                    let _ = Transport::send(&self_, peer_ip, event.clone()).await;
1168                }
1169            });
1170        }
1171    }
1172}
1173
1174impl<N: Network> P2P for Gateway<N> {
1175    /// Returns a reference to the TCP instance.
1176    fn tcp(&self) -> &Tcp {
1177        &self.tcp
1178    }
1179}
1180
1181#[async_trait]
1182impl<N: Network> Reading for Gateway<N> {
1183    type Codec = EventCodec<N>;
1184    type Message = Event<N>;
1185
1186    /// Creates a [`Decoder`] used to interpret messages from the network.
1187    /// The `side` parameter indicates the connection side **from the node's perspective**.
1188    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1189        Default::default()
1190    }
1191
1192    /// Processes a message received from the network.
1193    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1194        if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1195            let self_ = self.clone();
1196            // Handle BlockRequest and BlockResponse messages in a separate task to not block the
1197            // inbound queue.
1198            tokio::spawn(async move {
1199                self_.process_message_inner(peer_addr, message).await;
1200            });
1201        } else {
1202            self.process_message_inner(peer_addr, message).await;
1203        }
1204        Ok(())
1205    }
1206
1207    /// Computes the depth of per-connection queues used to process inbound messages, sized to handle the maximum expected load at any given moment.
1208    /// The greater it is, the more inbound messages the node can enqueue, but too large a value can make the node more susceptible to DoS attacks.
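    ///
    /// For illustration only (the real bounds come from snarkVM and may differ): with hypothetical values
    /// `MAX_GC_ROUNDS = 50`, `LATEST_MAX_CERTIFICATES = 200`, and `MAX_TRANSMISSIONS_PER_BATCH = 50`,
    /// the depth would be `2 * 50 * 200 * 50 = 1_000_000` enqueued messages per connection.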
1209    fn message_queue_depth(&self) -> usize {
1210        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1211            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1212            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1213    }
1214}
1215
1216#[async_trait]
1217impl<N: Network> Writing for Gateway<N> {
1218    type Codec = EventCodec<N>;
1219    type Message = Event<N>;
1220
1221    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
1222    /// The `side` parameter indicates the connection side **from the node's perspective**.
1223    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1224        Default::default()
1225    }
1226
1227    /// Computes the depth of per-connection queues used to send outbound messages, sized to handle the maximum expected load at any given moment.
1228    /// The greater it is, the more outbound messages the node can enqueue. Too large a value might obscure potential issues with your implementation
1229    /// (like slow serialization) or with the network.
1230    fn message_queue_depth(&self) -> usize {
1231        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1232            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1233            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1234    }
1235}
1236
1237#[async_trait]
1238impl<N: Network> Disconnect for Gateway<N> {
1239    /// Any extra operations to be performed during a disconnect.
1240    async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1241        if let Some(peer_ip) = self.resolver.get_listener(peer_addr) {
1242            self.remove_connected_peer(peer_ip);
1243
1244            // We don't clear this map based on time but only on peer disconnect.
1245            // This is sufficient to avoid infinite growth as the committee has a fixed number
1246            // of members.
1247            self.cache.clear_outbound_validators_requests(peer_ip);
1248            self.cache.clear_outbound_block_requests(peer_ip);
1249        }
1250    }
1251}
1252
1253#[async_trait]
1254impl<N: Network> OnConnect for Gateway<N> {
1255    async fn on_connect(&self, _peer_addr: SocketAddr) {
1256        return;
1257    }
1258}
1259
1260#[async_trait]
1261impl<N: Network> Handshake for Gateway<N> {
1262    /// Performs the handshake protocol: a mutual exchange of challenge requests and signed challenge responses.
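    ///
    /// Message flow (as implemented by `handshake_inner_initiator` / `handshake_inner_responder` below):
    /// the initiator sends a `ChallengeRequest`; the responder replies with a `ChallengeResponse`
    /// followed by its own `ChallengeRequest`; the initiator then sends its `ChallengeResponse`.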
1263    async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
1264        // Perform the handshake.
1265        let peer_addr = connection.addr();
1266        let peer_side = connection.side();
1267
1268        // Check (or impose) IP-level bans.
1269        #[cfg(not(any(test)))]
1270        if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1271            // If the IP is already banned, reject the connection.
1272            if self.is_ip_banned(peer_addr.ip()) {
1273                trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip());
1274                return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
1275            }
1276
1277            let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1278
1279            debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1280            if num_attempts > MAX_CONNECTION_ATTEMPTS {
1281                self.update_ip_ban(peer_addr.ip());
1282                trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1283                return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
1284            }
1285        }
1286
1287        let stream = self.borrow_stream(&mut connection);
1288
1289        // If this is an inbound connection, we log it, but don't know the listening address yet.
1290        // Otherwise, we can immediately register the listening address.
1291        let mut peer_ip = if peer_side == ConnectionSide::Initiator {
1292            debug!("{CONTEXT} Gateway received a connection request from '{peer_addr}'");
1293            None
1294        } else {
1295            debug!("{CONTEXT} Gateway is connecting to {peer_addr}...");
1296            Some(peer_addr)
1297        };
1298
1299        // Retrieve the restrictions ID.
1300        let restrictions_id = self.ledger.latest_restrictions_id();
1301
1302        // Perform the handshake; the responder path takes a mutable reference to peer_ip so the listening address is known even if the handshake aborts partway through.
1303        let handshake_result = if peer_side == ConnectionSide::Responder {
1304            self.handshake_inner_initiator(peer_addr, peer_ip, restrictions_id, stream).await
1305        } else {
1306            self.handshake_inner_responder(peer_addr, &mut peer_ip, restrictions_id, stream).await
1307        };
1308
1309        // Remove the address from the collection of connecting peers (if the handshake got to the point where it's known).
1310        if let Some(ip) = peer_ip {
1311            self.connecting_peers.lock().shift_remove(&ip);
1312        }
1313        let (ref peer_ip, _) = handshake_result?;
1314        info!("{CONTEXT} Gateway is connected to '{peer_ip}'");
1315
1316        Ok(connection)
1317    }
1318}
1319
1320/// A macro unwrapping the expected handshake event or returning an error for unexpected events.
1321macro_rules! expect_event {
1322    ($event_ty:path, $framed:expr, $peer_addr:expr) => {
1323        match $framed.try_next().await? {
1324            // Received the expected event, proceed.
1325            Some($event_ty(data)) => {
1326                trace!("{CONTEXT} Gateway received '{}' from '{}'", data.name(), $peer_addr);
1327                data
1328            }
1329            // Received a disconnect event, abort.
1330            Some(Event::Disconnect(reason)) => {
1331                return Err(error(format!("{CONTEXT} '{}' disconnected: {reason:?}", $peer_addr)));
1332            }
1333            // Received an unexpected event, abort.
1334            Some(ty) => {
1335                return Err(error(format!(
1336                    "{CONTEXT} '{}' did not follow the handshake protocol: received {:?} instead of {}",
1337                    $peer_addr,
1338                    ty.name(),
1339                    stringify!($event_ty),
1340                )))
1341            }
1342            // Received nothing.
1343            None => {
1344                return Err(error(format!(
1345                    "{CONTEXT} '{}' disconnected before sending {:?}",
1346                    $peer_addr,
1347                    stringify!($event_ty)
1348                )))
1349            }
1350        }
1351    };
1352}
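
// A usage sketch: inside a handshake that holds a `framed` stream and the remote `peer_addr`,
// the macro either yields the expected event's payload or aborts the handshake with an error, e.g.
// `let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);`.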
1353
1354/// Send the given message to the peer.
1355async fn send_event<N: Network>(
1356    framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
1357    peer_addr: SocketAddr,
1358    event: Event<N>,
1359) -> io::Result<()> {
1360    trace!("{CONTEXT} Gateway is sending '{}' to '{peer_addr}'", event.name());
1361    framed.send(event).await
1362}
1363
1364impl<N: Network> Gateway<N> {
1365    /// The connection initiator side of the handshake.
1366    async fn handshake_inner_initiator<'a>(
1367        &'a self,
1368        peer_addr: SocketAddr,
1369        peer_ip: Option<SocketAddr>,
1370        restrictions_id: Field<N>,
1371        stream: &'a mut TcpStream,
1372    ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, EventCodec<N>>)> {
1373        // On the initiator side, the peer's listening address is known up front, so it can be unwrapped.
1374        let peer_ip = peer_ip.unwrap();
1375
1376        // Construct the stream.
1377        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1378
1379        // Initialize an RNG.
1380        let rng = &mut rand::rngs::OsRng;
1381
1382        /* Step 1: Send the challenge request. */
1383
1384        // Sample a random nonce.
1385        let our_nonce = rng.r#gen();
1386        // Send a challenge request to the peer.
1387        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce);
1388        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1389
1390        /* Step 2: Receive the peer's challenge response followed by the challenge request. */
1391
1392        // Listen for the challenge response message.
1393        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1394        // Listen for the challenge request message.
1395        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1396
1397        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
1398        if let Some(reason) = self
1399            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1400            .await
1401        {
1402            send_event(&mut framed, peer_addr, reason.into()).await?;
1403            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1404        }
1405        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
1406        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1407            send_event(&mut framed, peer_addr, reason.into()).await?;
1408            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1409        }
1410
1411        /* Step 3: Send the challenge response. */
1412
1413        // Sign the counterparty's nonce together with a freshly sampled response nonce.
1414        let response_nonce: u64 = rng.r#gen();
1415        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1416        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1417            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1418        };
1419        // Send the challenge response.
1420        let our_response =
1421            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1422        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1423
1424        // Add the peer to the gateway.
1425        self.insert_connected_peer(peer_ip, peer_addr, peer_request.address);
1426
1427        Ok((peer_ip, framed))
1428    }
1429
1430    /// The connection responder side of the handshake.
1431    async fn handshake_inner_responder<'a>(
1432        &'a self,
1433        peer_addr: SocketAddr,
1434        peer_ip: &mut Option<SocketAddr>,
1435        restrictions_id: Field<N>,
1436        stream: &'a mut TcpStream,
1437    ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, EventCodec<N>>)> {
1438        // Construct the stream.
1439        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1440
1441        /* Step 1: Receive the challenge request. */
1442
1443        // Listen for the challenge request message.
1444        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1445
1446        // Ensure the peer's address is not the same as this node's address.
1447        if self.account.address() == peer_request.address {
1448            return Err(error("Skipping request to connect to self".to_string()));
1449        }
1450
1451        // Obtain the peer's listening address.
1452        *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1453        let peer_ip = peer_ip.unwrap();
1454
1455        // Knowing the peer's listening address, ensure it is allowed to connect.
1456        if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) {
1457            return Err(error(format!("{forbidden_message}")));
1458        }
1459        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
1460        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1461            send_event(&mut framed, peer_addr, reason.into()).await?;
1462            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1463        }
1464
1465        /* Step 2: Send the challenge response followed by own challenge request. */
1466
1467        // Initialize an RNG.
1468        let rng = &mut rand::rngs::OsRng;
1469
1470        // Sign the counterparty's nonce together with a freshly sampled response nonce.
1471        let response_nonce: u64 = rng.r#gen();
1472        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1473        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1474            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1475        };
1476        // Send the challenge response.
1477        let our_response =
1478            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1479        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1480
1481        // Sample a random nonce.
1482        let our_nonce = rng.r#gen();
1483        // Send the challenge request.
1484        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce);
1485        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1486
1487        /* Step 3: Receive the challenge response. */
1488
1489        // Listen for the challenge response message.
1490        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1491        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
1492        if let Some(reason) = self
1493            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1494            .await
1495        {
1496            send_event(&mut framed, peer_addr, reason.into()).await?;
1497            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1498        }
1499        // Add the peer to the gateway.
1500        self.insert_connected_peer(peer_ip, peer_addr, peer_request.address);
1501
1502        Ok((peer_ip, framed))
1503    }
1504
1505    /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid.
1506    fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1507        // Retrieve the components of the challenge request.
1508        let &ChallengeRequest { version, listener_port: _, address, nonce: _ } = event;
1509        // Ensure the event protocol version is not outdated.
1510        if version < Event::<N>::VERSION {
1511            warn!("{CONTEXT} Gateway is dropping '{peer_addr}' on version {version} (outdated)");
1512            return Some(DisconnectReason::OutdatedClientVersion);
1513        }
1514        // Ensure the address is a current committee member.
1515        if !self.is_authorized_validator_address(address) {
1516            warn!("{CONTEXT} Gateway is dropping '{peer_addr}' for being an unauthorized validator ({address})");
1517            return Some(DisconnectReason::ProtocolViolation);
1518        }
1519        // Ensure the address is not already connected.
1520        if self.is_connected_address(address) {
1521            warn!("{CONTEXT} Gateway is dropping '{peer_addr}' for being already connected ({address})");
1522            return Some(DisconnectReason::ProtocolViolation);
1523        }
1524        None
1525    }
1526
1527    /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid.
1528    async fn verify_challenge_response(
1529        &self,
1530        peer_addr: SocketAddr,
1531        peer_address: Address<N>,
1532        response: ChallengeResponse<N>,
1533        expected_restrictions_id: Field<N>,
1534        expected_nonce: u64,
1535    ) -> Option<DisconnectReason> {
1536        // Retrieve the components of the challenge response.
1537        let ChallengeResponse { restrictions_id, signature, nonce } = response;
1538
1539        // Verify the restrictions ID.
1540        if restrictions_id != expected_restrictions_id {
1541            warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1542            return Some(DisconnectReason::InvalidChallengeResponse);
1543        }
1544        // Perform the deferred deserialization of the signature on a blocking task, so the async executor isn't stalled.
1545        let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1546            warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1547            return Some(DisconnectReason::InvalidChallengeResponse);
1548        };
1549        // Verify the signature over the same byte layout the peer signed: the expected nonce followed by the response nonce, in little-endian.
1550        if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1551            warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (invalid signature)");
1552            return Some(DisconnectReason::InvalidChallengeResponse);
1553        }
1554        None
1555    }
1556}
1557
1558#[cfg(test)]
1559mod prop_tests {
1560    use crate::{
1561        Gateway,
1562        MAX_WORKERS,
1563        MEMORY_POOL_PORT,
1564        Worker,
1565        gateway::prop_tests::GatewayAddress::{Dev, Prod},
1566        helpers::{Storage, init_primary_channels, init_worker_channels},
1567    };
1568    use snarkos_account::Account;
1569    use snarkos_node_bft_ledger_service::MockLedgerService;
1570    use snarkos_node_bft_storage_service::BFTMemoryService;
1571    use snarkos_node_tcp::P2P;
1572    use snarkvm::{
1573        ledger::{
1574            committee::{
1575                Committee,
1576                prop_tests::{CommitteeContext, ValidatorSet},
1577                test_helpers::sample_committee_for_round_and_members,
1578            },
1579            narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
1580        },
1581        prelude::{MainnetV0, PrivateKey},
1582        utilities::TestRng,
1583    };
1584
1585    use indexmap::{IndexMap, IndexSet};
1586    use proptest::{
1587        prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
1588        sample::Selector,
1589    };
1590    use std::{
1591        fmt::{Debug, Formatter},
1592        net::{IpAddr, Ipv4Addr, SocketAddr},
1593        sync::Arc,
1594    };
1595    use test_strategy::proptest;
1596
1597    type CurrentNetwork = MainnetV0;
1598
1599    impl Debug for Gateway<CurrentNetwork> {
1600        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1601            // TODO implement Debug properly and move it over to production code
1602            f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
1603        }
1604    }
1605
1606    #[derive(Debug, test_strategy::Arbitrary)]
1607    enum GatewayAddress {
1608        Dev(u8),
1609        Prod(Option<SocketAddr>),
1610    }
1611
1612    impl GatewayAddress {
1613        fn ip(&self) -> Option<SocketAddr> {
1614            if let GatewayAddress::Prod(ip) = self {
1615                return *ip;
1616            }
1617            None
1618        }
1619
1620        fn port(&self) -> Option<u16> {
1621            if let GatewayAddress::Dev(port) = self {
1622                return Some(*port as u16);
1623            }
1624            None
1625        }
1626    }
1627
1628    impl Arbitrary for Gateway<CurrentNetwork> {
1629        type Parameters = ();
1630        type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;
1631
1632        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1633            any_valid_dev_gateway()
1634                .prop_map(|(storage, _, private_key, address)| {
1635                    Gateway::new(
1636                        Account::try_from(private_key).unwrap(),
1637                        storage.clone(),
1638                        storage.ledger().clone(),
1639                        address.ip(),
1640                        &[],
1641                        address.port(),
1642                    )
1643                    .unwrap()
1644                })
1645                .boxed()
1646        }
1647    }
1648
1649    type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);
1650
1651    fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
1652        (any::<CommitteeContext>(), any::<Selector>())
1653            .prop_flat_map(|(context, account_selector)| {
1654                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1655                (
1656                    any_with::<Storage<CurrentNetwork>>(context.clone()),
1657                    Just(context),
1658                    Just(account_selector.select(validators)),
1659                    0u8..,
1660                )
1661                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Dev(d)))
1662            })
1663            .boxed()
1664    }
1665
1666    fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
1667        (any::<CommitteeContext>(), any::<Selector>())
1668            .prop_flat_map(|(context, account_selector)| {
1669                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1670                (
1671                    any_with::<Storage<CurrentNetwork>>(context.clone()),
1672                    Just(context),
1673                    Just(account_selector.select(validators)),
1674                    any::<Option<SocketAddr>>(),
1675                )
1676                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Prod(d)))
1677            })
1678            .boxed()
1679    }
1680
1681    #[proptest]
1682    fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1683        let (storage, _, private_key, dev) = input;
1684        let account = Account::try_from(private_key).unwrap();
1685
1686        let gateway =
1687            Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
1688                .unwrap();
1689        let tcp_config = gateway.tcp().config();
1690        assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
1691        assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
1692
1693        let tcp_config = gateway.tcp().config();
1694        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1695        assert_eq!(gateway.account().address(), account.address());
1696    }
1697
1698    #[proptest]
1699    fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1700        let (storage, _, private_key, dev) = input;
1701        let account = Account::try_from(private_key).unwrap();
1702
1703        let gateway =
1704            Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
1705                .unwrap();
1706        let tcp_config = gateway.tcp().config();
1707        if let Some(socket_addr) = dev.ip() {
1708            assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
1709            assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
1710        } else {
1711            assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
1712            assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
1713        }
1714
1715        let tcp_config = gateway.tcp().config();
1716        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1717        assert_eq!(gateway.account().address(), account.address());
1718    }
1719
1720    #[proptest(async = "tokio")]
1721    async fn gateway_start(
1722        #[strategy(any_valid_dev_gateway())] input: GatewayInput,
1723        #[strategy(0..MAX_WORKERS)] workers_count: u8,
1724    ) {
1725        let (storage, committee, private_key, dev) = input;
1726        let committee = committee.0;
1727        let worker_storage = storage.clone();
1728        let account = Account::try_from(private_key).unwrap();
1729
1730        let gateway =
1731            Gateway::new(account, storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()).unwrap();
1732
1733        let (primary_sender, _) = init_primary_channels();
1734
1735        let (workers, worker_senders) = {
1736            // Construct a map of the worker senders.
1737            let mut tx_workers = IndexMap::new();
1738            let mut workers = IndexMap::new();
1739
1740            // Initialize the workers.
1741            for id in 0..workers_count {
1742                // Construct the worker channels.
1743                let (tx_worker, rx_worker) = init_worker_channels();
1744                // Construct the worker instance.
1745                let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1746                let worker =
1747                    Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
1748                        .unwrap();
1749                // Run the worker instance.
1750                worker.run(rx_worker);
1751
1752                // Add the worker and the worker sender to the maps.
1753                workers.insert(id, worker);
1754                tx_workers.insert(id, tx_worker);
1755            }
1756            (workers, tx_workers)
1757        };
1758
1759        gateway.run(primary_sender, worker_senders, None).await;
1760        assert_eq!(
1761            gateway.local_ip(),
1762            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
1763        );
1764        assert_eq!(gateway.num_workers(), workers.len() as u8);
1765    }
1766
1767    #[proptest]
1768    fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1769        let rng = &mut TestRng::default();
1770
1771        // Initialize the round parameters.
1772        let current_round = 2;
1773        let committee_size = 4;
1774        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1775        let (_, _, private_key, dev) = input;
1776        let account = Account::try_from(private_key).unwrap();
1777
1778        // Sample the certificates.
1779        let mut certificates = IndexSet::new();
1780        for _ in 0..committee_size {
1781            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1782        }
1783        let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
1784        // Initialize the committee.
1785        let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
1786        // Sample extra certificates from non-committee members.
1787        for _ in 0..committee_size {
1788            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1789        }
1790        // Initialize the ledger.
1791        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1792        // Initialize the storage.
1793        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1794        // Initialize the gateway.
1795        let gateway =
1796            Gateway::new(account.clone(), storage.clone(), ledger.clone(), dev.ip(), &[], dev.port()).unwrap();
1797        // Insert the certificates into the storage.
1798        for certificate in certificates.iter() {
1799            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1800        }
1801        // Check that the current committee members are authorized validators and the non-members are not.
1802        for i in 0..certificates.len() {
1803            let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
1804            if i < committee_size {
1805                assert!(is_authorized);
1806            } else {
1807                assert!(!is_authorized);
1808            }
1809        }
1810    }
1811}