snarkos_node_bft/
gateway.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19    CONTEXT,
20    MAX_BATCH_DELAY_IN_MS,
21    MEMORY_POOL_PORT,
22    Worker,
23    events::{EventCodec, PrimaryPing},
24    helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
25    spawn_blocking,
26};
27use aleo_std::StorageMode;
28use snarkos_account::Account;
29use snarkos_node_bft_events::{
30    BlockRequest,
31    BlockResponse,
32    CertificateRequest,
33    CertificateResponse,
34    ChallengeRequest,
35    ChallengeResponse,
36    DataBlocks,
37    DisconnectReason,
38    Event,
39    EventTrait,
40    TransmissionRequest,
41    TransmissionResponse,
42    ValidatorsRequest,
43    ValidatorsResponse,
44};
45use snarkos_node_bft_ledger_service::LedgerService;
46use snarkos_node_network::{
47    ConnectionMode,
48    NodeType,
49    Peer,
50    PeerPoolHandling,
51    Resolver,
52    bootstrap_peers,
53    built_info,
54    log_repo_sha_comparison,
55};
56use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
57use snarkos_node_tcp::{
58    Config,
59    Connection,
60    ConnectionSide,
61    P2P,
62    Tcp,
63    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
64};
65use snarkvm::{
66    console::prelude::*,
67    ledger::{
68        committee::Committee,
69        narwhal::{BatchHeader, Data},
70    },
71    prelude::{Address, Field},
72};
73
74use colored::Colorize;
75use futures::SinkExt;
76use indexmap::IndexMap;
77#[cfg(feature = "locktick")]
78use locktick::parking_lot::{Mutex, RwLock};
79#[cfg(not(feature = "locktick"))]
80use parking_lot::{Mutex, RwLock};
81use rand::{
82    rngs::OsRng,
83    seq::{IteratorRandom, SliceRandom},
84};
85use std::{
86    collections::{HashMap, HashSet},
87    future::Future,
88    io,
89    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
90    sync::Arc,
91    time::Duration,
92};
93use tokio::{
94    net::TcpStream,
95    sync::{OnceCell, oneshot},
96    task::{self, JoinHandle},
97};
98use tokio_stream::StreamExt;
99use tokio_util::codec::Framed;
100
/// The time window, in seconds, over which inbound events from a single peer are counted
/// for rate limiting. Derived from `MAX_BATCH_DELAY_IN_MS` (milliseconds, integer division).
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
/// The time window, in seconds, over which repeated inbound requests (certificates,
/// transmissions, block requests) are counted. Derived from `MAX_BATCH_DELAY_IN_MS`.
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds

/// The maximum number of connection attempts in an interval.
const MAX_CONNECTION_ATTEMPTS: usize = 10;
/// The time window, in seconds, over which inbound connection attempts from an IP are counted;
/// it scales with `MAX_CONNECTION_ATTEMPTS`.
const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds

/// The maximum number of validators to send in a validators response event.
pub const MAX_VALIDATORS_TO_SEND: usize = 200;

/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
/// This constant is compiled out of test builds.
#[cfg(not(any(test)))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
/// The amount of time, in seconds, an IP address is prohibited from connecting.
const IP_BAN_TIME_IN_SECS: u64 = 300;

/// The name of the file containing cached validators; it is read when the gateway is
/// created, unless the node runs in trusted-peers-only mode.
const VALIDATOR_CACHE_FILENAME: &str = "cached_gateway_peers";
122
/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
pub trait Transport<N: Network>: Send + Sync {
    /// Sends `event` to the peer identified by `peer_ip`.
    ///
    /// On success, the caller receives a [`oneshot::Receiver`] that yields the I/O result
    /// of the send; `None` means no receiver could be provided.
    /// NOTE(review): the exact conditions for `None` are up to the implementor - confirm against the impls.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
    /// Broadcasts `event`.
    /// NOTE(review): the set of recipients is chosen by the implementor and is not visible here.
    fn broadcast(&self, event: Event<N>);
}
130
/// The gateway maintains connections to other validators.
/// For connections with clients and provers, the Router logic is used.
///
/// This is a handle around a reference-counted [`InnerGateway`]; cloning it yields
/// another handle to the same shared state.
#[derive(Clone)]
pub struct Gateway<N: Network>(Arc<InnerGateway<N>>);
135
136impl<N: Network> Deref for Gateway<N> {
137    type Target = Arc<InnerGateway<N>>;
138
139    fn deref(&self) -> &Self::Target {
140        &self.0
141    }
142}
143
/// The shared state behind a [`Gateway`] handle.
pub struct InnerGateway<N: Network> {
    /// The account of the node.
    account: Account<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The TCP stack.
    tcp: Tcp,
    /// The cache, used for rate limiting inbound connections, events, and requests.
    cache: Cache<N>,
    /// The resolver, mapping between listener addresses and (ambiguous) peer addresses.
    resolver: RwLock<Resolver<N>>,
    /// The collection of both candidate and connected peers.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The validator telemetry (only present with the `telemetry` feature).
    #[cfg(feature = "telemetry")]
    validator_telemetry: Telemetry<N>,
    /// The primary sender; set once when the gateway is run.
    primary_sender: OnceCell<PrimarySender<N>>,
    /// The worker senders, keyed by worker ID; set once when the gateway is run.
    worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
    /// The sync sender; only set if one was provided when the gateway was run.
    sync_sender: OnceCell<SyncSender<N>>,
    /// The spawned handles.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// The storage mode.
    storage_mode: StorageMode,
    /// If the flag is set, the node will only connect to trusted peers.
    trusted_peers_only: bool,
    /// The development mode identifier, if any; it also serves as the port offset
    /// for the default gateway address in development mode.
    dev: Option<u16>,
}
176
177impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
178    const MAXIMUM_POOL_SIZE: usize = 200;
179    const OWNER: &str = CONTEXT;
180    const PEER_SLASHING_COUNT: usize = 20;
181
182    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
183        &self.peer_pool
184    }
185
186    fn resolver(&self) -> &RwLock<Resolver<N>> {
187        &self.resolver
188    }
189
190    fn is_dev(&self) -> bool {
191        self.dev.is_some()
192    }
193
194    fn trusted_peers_only(&self) -> bool {
195        self.trusted_peers_only
196    }
197
198    fn node_type(&self) -> NodeType {
199        NodeType::Validator
200    }
201}
202
203impl<N: Network> Gateway<N> {
204    /// Initializes a new gateway.
205    #[allow(clippy::too_many_arguments)]
206    pub fn new(
207        account: Account<N>,
208        storage: Storage<N>,
209        ledger: Arc<dyn LedgerService<N>>,
210        ip: Option<SocketAddr>,
211        trusted_validators: &[SocketAddr],
212        trusted_peers_only: bool,
213        storage_mode: StorageMode,
214        dev: Option<u16>,
215    ) -> Result<Self> {
216        // Initialize the gateway IP.
217        let ip = match (ip, dev) {
218            (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
219            (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
220            (Some(ip), _) => ip,
221        };
222        // Initialize the TCP stack.
223        let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));
224
225        // Prepare the collection of the initial peers.
226        let mut initial_peers = HashMap::new();
227
228        // Load entries from the validator cache (if present and if we are not in trusted peers only mode).
229        if !trusted_peers_only {
230            let cached_peers = Self::load_cached_peers(&storage_mode, VALIDATOR_CACHE_FILENAME)?;
231            for addr in cached_peers {
232                initial_peers.insert(addr, Peer::new_candidate(addr, false));
233            }
234        }
235
236        // Add the trusted peers to the list of the initial peers; this may promote
237        // some of the cached validators to trusted ones.
238        initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
239
240        // Return the gateway.
241        Ok(Self(Arc::new(InnerGateway {
242            account,
243            storage,
244            ledger,
245            tcp,
246            cache: Default::default(),
247            resolver: Default::default(),
248            peer_pool: RwLock::new(initial_peers),
249            #[cfg(feature = "telemetry")]
250            validator_telemetry: Default::default(),
251            primary_sender: Default::default(),
252            worker_senders: Default::default(),
253            sync_sender: Default::default(),
254            handles: Default::default(),
255            storage_mode,
256            trusted_peers_only,
257            dev,
258        })))
259    }
260
261    /// Run the gateway.
262    pub async fn run(
263        &self,
264        primary_sender: PrimarySender<N>,
265        worker_senders: IndexMap<u8, WorkerSender<N>>,
266        sync_sender: Option<SyncSender<N>>,
267    ) {
268        debug!("Starting the gateway for the memory pool...");
269
270        // Set the primary sender.
271        self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");
272
273        // Set the worker senders.
274        self.worker_senders.set(worker_senders).expect("The worker senders are already set");
275
276        // If the sync sender was provided, set the sync sender.
277        if let Some(sync_sender) = sync_sender {
278            self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
279        }
280
281        // Enable the TCP protocols.
282        self.enable_handshake().await;
283        self.enable_reading().await;
284        self.enable_writing().await;
285        self.enable_disconnect().await;
286        self.enable_on_connect().await;
287
288        // Enable the TCP listener. Note: This must be called after the above protocols.
289        let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
290        debug!("Listening for validator connections at address {listen_addr:?}");
291
292        // Initialize the heartbeat.
293        self.initialize_heartbeat();
294
295        info!("Started the gateway for the memory pool at '{}'", self.local_ip());
296    }
297}
298
299// Dynamic rate limiting.
300impl<N: Network> Gateway<N> {
301    /// The current maximum committee size.
302    fn max_committee_size(&self) -> usize {
303        self.ledger.current_committee().map_or_else(
304            |_e| Committee::<N>::max_committee_size().unwrap() as usize,
305            |committee| committee.num_members(),
306        )
307    }
308
309    /// The maximum number of events to cache.
310    fn max_cache_events(&self) -> usize {
311        self.max_cache_transmissions()
312    }
313
314    /// The maximum number of certificate requests to cache.
315    fn max_cache_certificates(&self) -> usize {
316        2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
317    }
318
319    /// The maximum number of transmission requests to cache.
320    fn max_cache_transmissions(&self) -> usize {
321        self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
322    }
323
324    /// The maximum number of duplicates for any particular request.
325    fn max_cache_duplicates(&self) -> usize {
326        self.max_committee_size().pow(2)
327    }
328}
329
330#[async_trait]
331impl<N: Network> CommunicationService for Gateway<N> {
332    /// The message type.
333    type Message = Event<N>;
334
335    /// Prepares a block request to be sent.
336    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
337        debug_assert!(start_height < end_height, "Invalid block request format");
338        Event::BlockRequest(BlockRequest { start_height, end_height })
339    }
340
341    /// Sends the given message to specified peer.
342    ///
343    /// This function returns as soon as the message is queued to be sent,
344    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
345    /// which can be used to determine when and whether the message has been delivered.
346    async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
347        Transport::send(self, peer_ip, message).await
348    }
349}
350
351impl<N: Network> Gateway<N> {
352    /// Returns the account of the node.
353    pub fn account(&self) -> &Account<N> {
354        &self.account
355    }
356
357    /// Returns the dev identifier of the node.
358    pub fn dev(&self) -> Option<u16> {
359        self.dev
360    }
361
362    /// Returns the resolver.
363    pub fn resolver(&self) -> &RwLock<Resolver<N>> {
364        &self.resolver
365    }
366
367    /// Returns the listener IP address from the (ambiguous) peer address.
368    pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> {
369        self.resolver.read().get_listener(*connected_addr)
370    }
371
372    /// Returns the validator telemetry.
373    #[cfg(feature = "telemetry")]
374    pub fn validator_telemetry(&self) -> &Telemetry<N> {
375        &self.validator_telemetry
376    }
377
378    /// Returns the primary sender.
379    pub fn primary_sender(&self) -> &PrimarySender<N> {
380        self.primary_sender.get().expect("Primary sender not set in gateway")
381    }
382
383    /// Returns the number of workers.
384    pub fn num_workers(&self) -> u8 {
385        u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
386            .expect("Too many workers")
387    }
388
389    /// Returns the worker sender for the given worker ID.
390    pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
391        self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
392    }
393
394    /// Returns `true` if the given peer IP is an authorized validator.
395    pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
396        // If the peer IP is in the trusted validators, return early.
397        if self.trusted_peers().contains(&ip) {
398            return true;
399        }
400        // Retrieve the Aleo address of the peer IP.
401        match self.resolve_to_aleo_addr(ip) {
402            // Determine if the peer IP is an authorized validator.
403            Some(address) => self.is_authorized_validator_address(address),
404            None => false,
405        }
406    }
407
408    /// Returns `true` if the given address is an authorized validator.
409    pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
410        // Determine if the validator address is a member of the committee lookback,
411        // the current committee, or the previous committee lookbacks.
412        // We allow leniency in this validation check in order to accommodate these two scenarios:
413        //  1. New validators should be able to connect immediately once bonded as a committee member.
414        //  2. Existing validators must remain connected until they are no longer bonded as a committee member.
415        //     (i.e. meaning they must stay online until the next block has been produced)
416
417        // Determine if the validator is in the current committee with lookback.
418        if self
419            .ledger
420            .get_committee_lookback_for_round(self.storage.current_round())
421            .is_ok_and(|committee| committee.is_committee_member(validator_address))
422        {
423            return true;
424        }
425
426        // Determine if the validator is in the latest committee on the ledger.
427        if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
428            return true;
429        }
430
431        // Retrieve the previous block height to consider from the sync tolerance.
432        let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
433        // Determine if the validator is in any of the previous committee lookbacks.
434        match self.ledger.get_block_round(previous_block_height) {
435            Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
436                self.ledger
437                    .get_committee_lookback_for_round(round)
438                    .is_ok_and(|committee| committee.is_committee_member(validator_address))
439            }),
440            Err(_) => false,
441        }
442    }
443
444    /// Returns the list of connected addresses.
445    pub fn connected_addresses(&self) -> HashSet<Address<N>> {
446        self.get_connected_peers().into_iter().map(|peer| peer.aleo_addr).collect()
447    }
448
449    /// Ensure the peer is allowed to connect.
450    fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
451        // Ensure the peer IP is not this node.
452        if self.is_local_ip(listener_addr) {
453            bail!("{CONTEXT} Dropping connection request from '{listener_addr}' (attempted to self-connect)");
454        }
455        // Ensure the peer is not spamming connection attempts.
456        if !listener_addr.ip().is_loopback() {
457            // Add this connection attempt and retrieve the number of attempts.
458            let num_attempts = self.cache.insert_inbound_connection(listener_addr.ip(), RESTRICTED_INTERVAL);
459            // Ensure the connecting peer has not surpassed the connection attempt limit.
460            if num_attempts > MAX_CONNECTION_ATTEMPTS {
461                bail!("Dropping connection request from '{listener_addr}' (tried {num_attempts} times)");
462            }
463        }
464        Ok(())
465    }
466
467    #[cfg(feature = "metrics")]
468    fn update_metrics(&self) {
469        metrics::gauge(metrics::bft::CONNECTED, self.number_of_connected_peers() as f64);
470        metrics::gauge(metrics::bft::CONNECTING, self.number_of_connecting_peers() as f64);
471    }
472
473    /// Inserts the given peer into the connected peers. This is only used in testing.
474    #[cfg(test)]
475    pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
476        // Adds a bidirectional map between the listener address and (ambiguous) peer address.
477        self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address));
478        // Add a transmission for this peer in the connected peers.
479        self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false));
480        if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
481            peer.upgrade_to_connected(
482                peer_addr,
483                peer_ip.port(),
484                address,
485                NodeType::Validator,
486                0,
487                ConnectionMode::Gateway,
488            );
489        }
490    }
491
492    /// Sends the given event to specified peer.
493    ///
494    /// This function returns as soon as the event is queued to be sent,
495    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
496    /// which can be used to determine when and whether the event has been delivered.
497    fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
498        // Resolve the listener IP to the (ambiguous) peer address.
499        let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else {
500            warn!("Unable to resolve the listener IP address '{peer_ip}'");
501            return None;
502        };
503        // Retrieve the event name.
504        let name = event.name();
505        // Send the event to the peer.
506        trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
507        let result = self.unicast(peer_addr, event);
508        // If the event was unable to be sent, disconnect.
509        if let Err(e) = &result {
510            warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {e}");
511            debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
512            self.disconnect(peer_ip);
513        }
514        result.ok()
515    }
516
517    /// Handles the inbound event from the peer. The returned value indicates whether
518    /// the connection is still active, and errors cause a disconnect once they are
519    /// propagated to the caller.
520    async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<bool> {
521        // Retrieve the listener IP for the peer.
522        let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else {
523            // No longer connected to the peer.
524            trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name());
525            return Ok(false);
526        };
527        // Ensure that the peer is an authorized committee member or a bootstrapper.
528        if !(self.is_authorized_validator_ip(peer_ip)
529            || self
530                .get_connected_peer(peer_ip)
531                .map(|peer| peer.node_type == NodeType::BootstrapClient)
532                .unwrap_or(false))
533        {
534            bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
535        }
536        // Drop the peer, if they have exceeded the rate limit (i.e. they are requesting too much from us).
537        let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
538        if num_events >= self.max_cache_events() {
539            bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
540        }
541        // Rate limit for duplicate requests.
542        match event {
543            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
544                // Retrieve the certificate ID.
545                let certificate_id = match &event {
546                    Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
547                    Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
548                    _ => unreachable!(),
549                };
                // Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
551                let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
552                if num_events >= self.max_cache_duplicates() {
553                    return Ok(true);
554                }
555            }
556            Event::TransmissionRequest(TransmissionRequest { transmission_id })
557            | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
                // Skip processing this transmission if the rate limit was exceeded (i.e. someone is spamming a specific transmission).
559                let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
560                if num_events >= self.max_cache_duplicates() {
561                    return Ok(true);
562                }
563            }
564            Event::BlockRequest(_) => {
565                let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
566                if num_events >= self.max_cache_duplicates() {
567                    return Ok(true);
568                }
569            }
570            _ => {}
571        }
572        trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());
573
574        // This match statement handles the inbound event by deserializing the event,
575        // checking the event is valid, and then calling the appropriate (trait) handler.
576        match event {
577            Event::BatchPropose(batch_propose) => {
578                // Send the batch propose to the primary.
579                let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
580                Ok(true)
581            }
582            Event::BatchSignature(batch_signature) => {
583                // Send the batch signature to the primary.
584                let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
585                Ok(true)
586            }
587            Event::BatchCertified(batch_certified) => {
588                // Send the batch certificate to the primary.
589                let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
590                Ok(true)
591            }
592            Event::BlockRequest(block_request) => {
593                let BlockRequest { start_height, end_height } = block_request;
594
595                // Ensure the block request is well-formed.
596                if start_height >= end_height {
597                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
598                }
599                // Ensure that the block request is within the allowed bounds.
600                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
601                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
602                }
603
604                // End height is exclusive.
605                let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?;
606
607                let self_ = self.clone();
608                let blocks = match task::spawn_blocking(move || {
609                    // Retrieve the blocks within the requested range.
610                    match self_.ledger.get_blocks(start_height..end_height) {
611                        Ok(blocks) => Ok(DataBlocks(blocks)),
612                        Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
613                    }
614                })
615                .await
616                {
617                    Ok(Ok(blocks)) => blocks,
618                    Ok(Err(error)) => return Err(error),
619                    Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
620                };
621
622                let self_ = self.clone();
623                tokio::spawn(async move {
624                    // Send the `BlockResponse` message to the peer.
625                    let event =
626                        Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version));
627                    Transport::send(&self_, peer_ip, event).await;
628                });
629                Ok(true)
630            }
631            Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
632                // Process the block response. Except for some tests, there is always a sync sender.
633                if let Some(sync_sender) = self.sync_sender.get() {
634                    // Check the response corresponds to a request.
635                    if !self.cache.remove_outbound_block_request(peer_ip, &request) {
636                        bail!("Unsolicited block response from '{peer_ip}'")
637                    }
638
639                    // Perform the deferred non-blocking deserialization of the blocks.
640                    // The deserialization can take a long time (minutes). We should not be running
641                    // this on a blocking task, but on a rayon thread pool.
642                    let (send, recv) = tokio::sync::oneshot::channel();
643                    rayon::spawn_fifo(move || {
644                        let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
645                        let _ = send.send(blocks);
646                    });
647                    let blocks = match recv.await {
648                        Ok(Ok(blocks)) => blocks,
649                        Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
650                        Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
651                    };
652
653                    // Ensure the block response is well-formed.
654                    blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
655                    // Send the blocks to the sync module.
656                    if let Err(err) =
657                        sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await
658                    {
659                        warn!("Unable to process block response from '{peer_ip}' - {err}");
660                    }
661                }
662                Ok(true)
663            }
664            Event::CertificateRequest(certificate_request) => {
665                // Send the certificate request to the sync module.
666                // Except for some tests, there is always a sync sender.
667                if let Some(sync_sender) = self.sync_sender.get() {
668                    // Send the certificate request to the sync module.
669                    let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
670                }
671                Ok(true)
672            }
673            Event::CertificateResponse(certificate_response) => {
674                // Send the certificate response to the sync module.
675                // Except for some tests, there is always a sync sender.
676                if let Some(sync_sender) = self.sync_sender.get() {
677                    // Send the certificate response to the sync module.
678                    let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
679                }
680                Ok(true)
681            }
682            Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
683                // Disconnect as the peer is not following the protocol.
684                bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
685            }
686            Event::Disconnect(message) => {
687                // The peer informs us that they had disconnected. Disconnect from them too.
688                debug!("Peer '{peer_ip}' decided to disconnect due to '{:?}'", message.reason);
689                self.disconnect(peer_ip);
690                Ok(false)
691            }
692            Event::PrimaryPing(ping) => {
693                let PrimaryPing { version, block_locators, primary_certificate } = ping;
694
695                // Ensure the event version is not outdated.
696                if version < Event::<N>::VERSION {
697                    bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
698                }
699
700                // Update the peer locators. Except for some tests, there is always a sync sender.
701                if let Some(sync_sender) = self.sync_sender.get() {
702                    // Check the block locators are valid, and update the validators in the sync module.
703                    if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
704                        bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
705                    }
706                }
707
708                // Send the batch certificates to the primary.
709                let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
710                Ok(true)
711            }
712            Event::TransmissionRequest(request) => {
713                // TODO (howardwu): Add rate limiting checks on this event, on a per-peer basis.
714                // Determine the worker ID.
715                let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
716                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
717                    return Ok(true);
718                };
719                // Send the transmission request to the worker.
720                if let Some(sender) = self.get_worker_sender(worker_id) {
721                    // Send the transmission request to the worker.
722                    let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
723                }
724                Ok(true)
725            }
726            Event::TransmissionResponse(response) => {
727                // Determine the worker ID.
728                let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
729                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
730                    return Ok(true);
731                };
732                // Send the transmission response to the worker.
733                if let Some(sender) = self.get_worker_sender(worker_id) {
734                    // Send the transmission response to the worker.
735                    let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
736                }
737                Ok(true)
738            }
739            Event::ValidatorsRequest(_) => {
740                let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
741                connected_peers.shuffle(&mut rand::thread_rng());
742
743                let self_ = self.clone();
744                tokio::spawn(async move {
745                    // Initialize the validators.
746                    let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
747                    // Iterate over the validators.
748                    for validator in connected_peers.into_iter() {
749                        // Add the validator to the list of validators.
750                        validators.insert(validator.listener_addr, validator.aleo_addr);
751                    }
752                    // Send the validators response to the peer.
753                    let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
754                    Transport::send(&self_, peer_ip, event).await;
755                });
756                Ok(true)
757            }
758            Event::ValidatorsResponse(response) => {
759                if self.trusted_peers_only {
760                    bail!("{CONTEXT} Not accepting validators response from '{peer_ip}' (trusted peers only)");
761                }
762                let ValidatorsResponse { validators } = response;
763                // Ensure the number of validators is not too large.
764                ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
765                // Ensure the cache contains a validators request for this peer.
766                if !self.cache.contains_outbound_validators_request(peer_ip) {
767                    bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
768                }
769                // Decrement the number of validators requests for this peer.
770                self.cache.decrement_outbound_validators_requests(peer_ip);
771
772                // Add valid validators as candidates to the peer pool; only validator-related
773                // filters need to be applied, the rest is handled by `PeerPoolHandling`.
774                let valid_addrs = validators
775                    .into_iter()
776                    .filter_map(|(listener_addr, aleo_addr)| {
777                        (self.account.address() != aleo_addr
778                            && !self.is_connected_address(aleo_addr)
779                            && self.is_authorized_validator_address(aleo_addr))
780                        .then_some((listener_addr, None))
781                    })
782                    .collect::<Vec<_>>();
783                if !valid_addrs.is_empty() {
784                    self.insert_candidate_peers(valid_addrs);
785                }
786
787                #[cfg(feature = "metrics")]
788                self.update_metrics();
789
790                Ok(true)
791            }
792            Event::WorkerPing(ping) => {
793                // Ensure the number of transmissions is not too large.
794                ensure!(
795                    ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
796                    "{CONTEXT} Received too many transmissions"
797                );
798                // Retrieve the number of workers.
799                let num_workers = self.num_workers();
800                // Iterate over the transmission IDs.
801                for transmission_id in ping.transmission_ids.into_iter() {
802                    // Determine the worker ID.
803                    let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
804                        warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
805                        continue;
806                    };
807                    // Send the transmission ID to the worker.
808                    if let Some(sender) = self.get_worker_sender(worker_id) {
809                        // Send the transmission ID to the worker.
810                        let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
811                    }
812                }
813                Ok(true)
814            }
815        }
816    }
817
818    /// Initialize a new instance of the heartbeat.
819    fn initialize_heartbeat(&self) {
820        let self_clone = self.clone();
821        self.spawn(async move {
822            // Sleep briefly to ensure the other nodes are ready to connect.
823            tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
824            info!("Starting the heartbeat of the gateway...");
825            loop {
826                // Process a heartbeat in the gateway.
827                self_clone.heartbeat().await;
828                // Sleep for the heartbeat interval.
829                tokio::time::sleep(Duration::from_secs(15)).await;
830            }
831        });
832    }
833
834    /// Spawns a task with the given future; it should only be used for long-running tasks.
835    #[allow(dead_code)]
836    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
837        self.handles.lock().push(tokio::spawn(future));
838    }
839
840    /// Shuts down the gateway.
841    pub async fn shut_down(&self) {
842        info!("Shutting down the gateway...");
843        // Save the best peers for future use.
844        if let Err(e) = self.save_best_peers(&self.storage_mode, VALIDATOR_CACHE_FILENAME, None) {
845            warn!("Failed to persist best validators to disk: {e}");
846        }
847        // Abort the tasks.
848        self.handles.lock().iter().for_each(|handle| handle.abort());
849        // Close the listener.
850        self.tcp.shut_down().await;
851    }
852}
853
854impl<N: Network> Gateway<N> {
855    /// Handles the heartbeat request.
856    async fn heartbeat(&self) {
857        // Log the connected validators.
858        self.log_connected_validators();
859        // Log the validator participation scores.
860        #[cfg(feature = "telemetry")]
861        self.log_participation_scores();
862        // Keep the trusted validators connected.
863        self.handle_trusted_validators();
864        // Keep the bootstrap peers within the allowed range.
865        self.handle_bootstrap_peers().await;
866        // Removes any validators that not in the current committee.
867        self.handle_unauthorized_validators();
868        // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
869        self.handle_min_connected_validators();
870        // Unban any addresses whose ban time has expired.
871        self.handle_banned_ips();
872    }
873
874    /// Logs the connected validators.
875    fn log_connected_validators(&self) {
876        // Retrieve the connected validators and current committee.
877        let connected_validators = self.connected_peers();
878        let committee = match self.ledger.current_committee() {
879            Ok(c) => c,
880            Err(err) => {
881                error!("Failed to get current committee: {err}");
882                return;
883            }
884        };
885
886        // Resolve the total number of connectable validators.
887        let validators_total = committee.num_members().saturating_sub(1);
888        // Format the total validators message.
889        let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
890        // Construct the connections message.
891        let connections_msg = match connected_validators.len() {
892            0 => "No connected validators".to_string(),
893            num_connected => format!("Connected to {num_connected} validators {total_validators}"),
894        };
895        // Collect the connected validator addresses.
896        let mut connected_validator_addresses = HashSet::with_capacity(connected_validators.len());
897        // Include our own address, so we do not log ourself as disconnected and include ourself in the check
898        // for the quorum threshold.
899        connected_validator_addresses.insert(self.account.address());
900
901        // Log the connected validators and count the total connected stake.
902        info!("{connections_msg}");
903        for peer_ip in &connected_validators {
904            let address = self.resolve_to_aleo_addr(*peer_ip).map_or("Unknown".to_string(), |a| {
905                connected_validator_addresses.insert(a);
906                a.to_string()
907            });
908            debug!("{}", format!("  {peer_ip} - {address}").dimmed());
909        }
910
911        // Log the validators that are not connected.
912        let num_not_connected = validators_total.saturating_sub(connected_validators.len());
913        if num_not_connected > 0 {
914            info!("Not connected to {num_not_connected} validators {total_validators}");
915            // Collect the committee members.
916            let committee_members: HashSet<_> =
917                self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
918
919            // Log the validators that are not connected.
920            for address in committee_members.difference(&connected_validator_addresses) {
921                debug!("{}", format!("  Not connected to {address}").dimmed());
922            }
923        }
924
925        if !committee.is_quorum_threshold_reached(&connected_validator_addresses) {
926            error!("Not connected to a quorum of validators");
927        }
928    }
929
930    // Logs the validator participation scores.
931    #[cfg(feature = "telemetry")]
932    fn log_participation_scores(&self) {
933        if let Ok(current_committee) = self.ledger.current_committee() {
934            // Retrieve the participation scores.
935            let participation_scores = self.validator_telemetry().get_participation_scores(&current_committee);
936            // Log the participation scores.
937            debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
938            for (address, score) in participation_scores {
939                debug!("{}", format!("  {address} - {score:.2}%").dimmed());
940            }
941        }
942    }
943
944    /// This function attempts to connect to any disconnected trusted validators.
945    fn handle_trusted_validators(&self) {
946        // Ensure that the trusted nodes are connected.
947        for validator_ip in &self.trusted_peers() {
948            // Attempt to connect to the trusted validator.
949            self.connect(*validator_ip);
950        }
951    }
952
953    /// This function keeps the number of bootstrap peers within the allowed range.
954    async fn handle_bootstrap_peers(&self) {
955        // Return early if we are in trusted peers only mode.
956        if self.trusted_peers_only {
957            return;
958        }
959        // Split the bootstrap peers into connected and candidate lists.
960        let mut candidate_bootstrap = Vec::new();
961        let connected_bootstrap = self.filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
962        for bootstrap_ip in bootstrap_peers::<N>(self.is_dev()) {
963            if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
964                candidate_bootstrap.push(bootstrap_ip);
965            }
966        }
967        // If there are not enough connected bootstrap peers, connect to more.
968        if connected_bootstrap.is_empty() {
969            // Initialize an RNG.
970            let rng = &mut OsRng;
971            // Attempt to connect to a bootstrap peer.
972            if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
973                match self.connect(peer_ip) {
974                    Some(hdl) => {
975                        let result = hdl.await;
976                        if let Err(err) = result {
977                            warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
978                        }
979                    }
980                    None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
981                }
982            }
983        }
984        // Determine if the node is connected to more bootstrap peers than allowed.
985        let num_surplus = connected_bootstrap.len().saturating_sub(1);
986        if num_surplus > 0 {
987            // Initialize an RNG.
988            let rng = &mut OsRng;
989            // Proceed to send disconnect requests to these bootstrap peers.
990            for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
991                info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
992                <Self as Transport<N>>::send(
993                    self,
994                    peer.listener_addr,
995                    Event::Disconnect(DisconnectReason::NoReasonGiven.into()),
996                )
997                .await;
998                // Disconnect from this peer.
999                self.disconnect(peer.listener_addr);
1000            }
1001        }
1002    }
1003
1004    /// This function attempts to disconnect any validators that are not in the current committee.
1005    fn handle_unauthorized_validators(&self) {
1006        let self_ = self.clone();
1007        tokio::spawn(async move {
1008            // Retrieve the connected validators.
1009            let validators = self_.get_connected_peers();
1010            // Iterate over the validator IPs.
1011            for peer in validators {
1012                // Skip bootstrapper peers.
1013                if peer.node_type == NodeType::BootstrapClient {
1014                    continue;
1015                }
1016                // Disconnect any validator that is not in the current committee.
1017                if !self_.is_authorized_validator_ip(peer.listener_addr) {
1018                    warn!(
1019                        "{CONTEXT} Disconnecting from '{}' - Validator is not in the current committee",
1020                        peer.listener_addr
1021                    );
1022                    Transport::send(&self_, peer.listener_addr, DisconnectReason::ProtocolViolation.into()).await;
1023                    // Disconnect from this peer.
1024                    self_.disconnect(peer.listener_addr);
1025                }
1026            }
1027        });
1028    }
1029
1030    /// This function sends a `ValidatorsRequest` to a random validator,
1031    /// if the number of connected validators is less than the minimum.
1032    /// It also attempts to connect to known unconnected validators.
1033    fn handle_min_connected_validators(&self) {
1034        // Attempt to connect to untrusted validators we're not connected to yet.
1035        // The trusted ones are already handled by `handle_trusted_validators`.
1036        let trusted_validators = self.trusted_peers();
1037        if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
1038            for peer in self.get_candidate_peers() {
1039                if !trusted_validators.contains(&peer.listener_addr) {
1040                    // Attempt to connect to unconnected validators.
1041                    self.connect(peer.listener_addr);
1042                }
1043            }
1044
1045            // Retrieve the connected validators.
1046            let validators = self.connected_peers();
1047            // If there are no validator IPs to connect to, return early.
1048            if validators.is_empty() {
1049                return;
1050            }
1051            // Select a random validator IP.
1052            if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
1053                let self_ = self.clone();
1054                tokio::spawn(async move {
1055                    // Increment the number of outbound validators requests for this validator.
1056                    self_.cache.increment_outbound_validators_requests(validator_ip);
1057                    // Send a `ValidatorsRequest` to the validator.
1058                    let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1059                });
1060            }
1061        }
1062    }
1063
1064    /// Processes a message received from the network.
1065    async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1066        // Process the message. Disconnect if the peer violated the protocol.
1067        if let Err(error) = self.inbound(peer_addr, message).await {
1068            if let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) {
1069                warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1070                let self_ = self.clone();
1071                tokio::spawn(async move {
1072                    Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1073                    // Disconnect from this peer.
1074                    self_.disconnect(peer_ip);
1075                });
1076            }
1077        }
1078    }
1079
1080    // Remove addresses whose ban time has expired.
1081    fn handle_banned_ips(&self) {
1082        self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
1083    }
1084}
1085
1086#[async_trait]
1087impl<N: Network> Transport<N> for Gateway<N> {
1088    /// Sends the given event to specified peer.
1089    ///
1090    /// This method is rate limited to prevent spamming the peer.
1091    ///
1092    /// This function returns as soon as the event is queued to be sent,
1093    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
1094    /// which can be used to determine when and whether the event has been delivered.
1095    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
1096        macro_rules! send {
1097            ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
1098                // Rate limit the number of certificate requests sent to the peer.
1099                while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
1100                    // Sleep for a short period of time to allow the cache to clear.
1101                    tokio::time::sleep(Duration::from_millis(10)).await;
1102                }
1103                // Send the event to the peer.
1104                $self.send_inner(peer_ip, event)
1105            }};
1106        }
1107
1108        // Increment the cache for certificate, transmission and block events.
1109        match event {
1110            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
1111                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
1112                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1113                // Send the event to the peer.
1114                send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
1115            }
1116            Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
1117                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
1118                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
1119                // Send the event to the peer.
1120                send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
1121            }
1122            Event::BlockRequest(request) => {
1123                // Insert the outbound request so we can match it to responses.
1124                self.cache.insert_outbound_block_request(peer_ip, request);
1125                // Send the event to the peer and update the outbound event cache, use the general rate limit.
1126                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1127            }
1128            _ => {
1129                // Send the event to the peer, use the general rate limit.
1130                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
1131            }
1132        }
1133    }
1134
1135    /// Broadcasts the given event to all connected peers.
1136    // TODO(ljedrz): the event should be checked for the presence of Data::Object, and
1137    // serialized in advance if it's there.
1138    fn broadcast(&self, event: Event<N>) {
1139        // Ensure there are connected peers.
1140        if self.number_of_connected_peers() > 0 {
1141            let self_ = self.clone();
1142            let connected_peers = self.connected_peers();
1143            tokio::spawn(async move {
1144                // Iterate through all connected peers.
1145                for peer_ip in connected_peers {
1146                    // Send the event to the peer.
1147                    let _ = Transport::send(&self_, peer_ip, event.clone()).await;
1148                }
1149            });
1150        }
1151    }
1152}
1153
1154impl<N: Network> P2P for Gateway<N> {
1155    /// Returns a reference to the TCP instance.
1156    fn tcp(&self) -> &Tcp {
1157        &self.tcp
1158    }
1159}
1160
1161#[async_trait]
1162impl<N: Network> Reading for Gateway<N> {
1163    type Codec = EventCodec<N>;
1164    type Message = Event<N>;
1165
1166    /// Creates a [`Decoder`] used to interpret messages from the network.
1167    /// The `side` param indicates the connection side **from the node's perspective**.
1168    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1169        Default::default()
1170    }
1171
1172    /// Processes a message received from the network.
1173    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1174        if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1175            let self_ = self.clone();
1176            // Handle BlockRequest and BlockResponse messages in a separate task to not block the
1177            // inbound queue.
1178            tokio::spawn(async move {
1179                self_.process_message_inner(peer_addr, message).await;
1180            });
1181        } else {
1182            self.process_message_inner(peer_addr, message).await;
1183        }
1184        Ok(())
1185    }
1186
1187    /// Computes the depth of per-connection queues used to process inbound messages, sufficient to process the maximum expected load at any givent moment.
1188    /// The greater it is, the more inbound messages the node can enqueue, but a too large value can make the node more susceptible to DoS attacks.
1189    fn message_queue_depth(&self) -> usize {
1190        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1191            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1192            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1193    }
1194}
1195
1196#[async_trait]
1197impl<N: Network> Writing for Gateway<N> {
1198    type Codec = EventCodec<N>;
1199    type Message = Event<N>;
1200
1201    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
1202    /// The `side` parameter indicates the connection side **from the node's perspective**.
1203    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1204        Default::default()
1205    }
1206
1207    /// Computes the depth of per-connection queues used to send outbound messages, sufficient to process the maximum expected load at any givent moment.
1208    /// The greater it is, the more outbound messages the node can enqueue. A too large value large value might obscure potential issues with your implementation
1209    /// (like slow serialization) or network.
1210    fn message_queue_depth(&self) -> usize {
1211        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1212            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1213            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1214    }
1215}
1216
1217#[async_trait]
1218impl<N: Network> Disconnect for Gateway<N> {
1219    /// Any extra operations to be performed during a disconnect.
1220    async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1221        if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
1222            self.downgrade_peer_to_candidate(peer_ip);
1223            // Remove the peer from the sync module. Except for some tests, there is always a sync sender.
1224            if let Some(sync_sender) = self.sync_sender.get() {
1225                let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
1226                tokio::spawn(async move {
1227                    if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
1228                        warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
1229                    }
1230                });
1231            }
1232            // We don't clear this map based on time but only on peer disconnect.
1233            // This is sufficient to avoid infinite growth as the committee has a fixed number
1234            // of members.
1235            self.cache.clear_outbound_validators_requests(peer_ip);
1236            self.cache.clear_outbound_block_requests(peer_ip);
1237            #[cfg(feature = "metrics")]
1238            self.update_metrics();
1239        }
1240    }
1241}
1242
1243#[async_trait]
1244impl<N: Network> OnConnect for Gateway<N> {
1245    async fn on_connect(&self, peer_addr: SocketAddr) {
1246        if let Some(listener_addr) = self.resolve_to_listener(&peer_addr) {
1247            if let Some(peer) = self.get_connected_peer(listener_addr) {
1248                if peer.node_type == NodeType::BootstrapClient {
1249                    let _ =
1250                        <Self as Transport<N>>::send(self, listener_addr, Event::ValidatorsRequest(ValidatorsRequest))
1251                            .await;
1252                }
1253            }
1254        }
1255    }
1256}
1257
1258#[async_trait]
1259impl<N: Network> Handshake for Gateway<N> {
1260    /// Performs the handshake protocol.
1261    async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
1262        // Perform the handshake.
1263        let peer_addr = connection.addr();
1264        let peer_side = connection.side();
1265
1266        // Check (or impose) IP-level bans.
1267        #[cfg(not(any(test)))]
1268        if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1269            // If the IP is already banned reject the connection.
1270            if self.is_ip_banned(peer_addr.ip()) {
1271                trace!("{CONTEXT} Rejected a connection request from banned IP '{}'", peer_addr.ip());
1272                return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
1273            }
1274
1275            let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1276
1277            debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1278            if num_attempts > MAX_CONNECTION_ATTEMPTS {
1279                self.update_ip_ban(peer_addr.ip());
1280                trace!("{CONTEXT} Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1281                return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
1282            }
1283        }
1284
1285        let stream = self.borrow_stream(&mut connection);
1286
1287        // If this is an inbound connection, we log it, but don't know the listening address yet.
1288        // Otherwise, we can immediately register the listening address.
1289        let mut listener_addr = if peer_side == ConnectionSide::Initiator {
1290            debug!("{CONTEXT} Received a connection request from '{peer_addr}'");
1291            None
1292        } else {
1293            debug!("{CONTEXT} Shaking hands with {peer_addr}...");
1294            Some(peer_addr)
1295        };
1296
1297        // Retrieve the restrictions ID.
1298        let restrictions_id = self.ledger.latest_restrictions_id();
1299
1300        // Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
1301        let handshake_result = if peer_side == ConnectionSide::Responder {
1302            self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await
1303        } else {
1304            self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await
1305        };
1306
1307        if let Some(addr) = listener_addr {
1308            match handshake_result {
1309                Ok(Some(ref cr)) => {
1310                    let node_type = if bootstrap_peers::<N>(self.is_dev()).contains(&addr) {
1311                        NodeType::BootstrapClient
1312                    } else {
1313                        NodeType::Validator
1314                    };
1315                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1316                        self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address));
1317                        peer.upgrade_to_connected(
1318                            peer_addr,
1319                            cr.listener_port,
1320                            cr.address,
1321                            node_type,
1322                            cr.version,
1323                            ConnectionMode::Gateway,
1324                        );
1325                    }
1326                    #[cfg(feature = "metrics")]
1327                    self.update_metrics();
1328                    info!("{CONTEXT} Connected to '{addr}'");
1329                }
1330                Ok(None) => {
1331                    return Err(error(format!("Duplicate handshake attempt with '{addr}'")));
1332                }
1333                Err(error) => {
1334                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1335                        // The peer may only be downgraded if it's a ConnectingPeer.
1336                        if peer.is_connecting() {
1337                            peer.downgrade_to_candidate(addr);
1338                        }
1339                    }
1340                    // This error needs to be "repackaged" in order to conform to the return type.
1341                    return Err(error);
1342                }
1343            }
1344        }
1345
1346        Ok(connection)
1347    }
1348}
1349
/// Awaits the next handshake event on the framed stream and evaluates to its payload when it is
/// the expected variant; every other outcome makes the enclosing function return an error.
///
/// Arguments, in order: the path of the expected `Event` variant, the framed stream to read
/// from, and the peer address used in the log and error messages.
macro_rules! expect_event {
    ($expected:path, $stream:expr, $peer:expr) => {{
        // Pull the next event first; codec and I/O failures propagate via `?`.
        let incoming = $stream.try_next().await?;
        match incoming {
            // The stream ended before any event arrived.
            None => {
                return Err(error(format!(
                    "{CONTEXT} the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
                    stringify!($expected)
                )))
            }
            // The expected event arrived; hand its payload back to the caller.
            Some($expected(data)) => {
                trace!("{CONTEXT} Received '{}' from '{}'", data.name(), $peer);
                data
            }
            // The peer explicitly disconnected; abort the handshake.
            Some(Event::Disconnect(reason)) => {
                return Err(error(format!("{CONTEXT} '{}' disconnected: {reason:?}", $peer)));
            }
            // Any other event violates the handshake protocol; abort.
            Some(other) => {
                return Err(error(format!(
                    "{CONTEXT} '{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer,
                    other.name(),
                    stringify!($expected),
                )))
            }
        }
    }};
}
1382
1383/// Send the given message to the peer.
1384async fn send_event<N: Network>(
1385    framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
1386    peer_addr: SocketAddr,
1387    event: Event<N>,
1388) -> io::Result<()> {
1389    trace!("{CONTEXT} Sending '{}' to '{peer_addr}'", event.name());
1390    framed.send(event).await
1391}
1392
1393impl<N: Network> Gateway<N> {
1394    /// The connection initiator side of the handshake.
1395    async fn handshake_inner_initiator<'a>(
1396        &'a self,
1397        peer_addr: SocketAddr,
1398        restrictions_id: Field<N>,
1399        stream: &'a mut TcpStream,
1400    ) -> io::Result<Option<ChallengeRequest<N>>> {
1401        // Introduce the peer into the peer pool.
1402        if !self.add_connecting_peer(peer_addr) {
1403            return Ok(None);
1404        }
1405
1406        // Construct the stream.
1407        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1408
1409        // Initialize an RNG.
1410        let rng = &mut rand::rngs::OsRng;
1411
1412        /* Step 1: Send the challenge request. */
1413
1414        // Sample a random nonce.
1415        let our_nonce = rng.r#gen();
1416        // Determine the snarkOS SHA to send to the peer.
1417        let current_block_height = self.ledger.latest_block_height();
1418        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1419        let snarkos_sha = (consensus_version >= ConsensusVersion::V12)
1420            .then(|| built_info::GIT_COMMIT_HASH.unwrap_or_default().into());
1421        // Send a challenge request to the peer.
1422        let our_request =
1423            ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha.clone());
1424        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1425
1426        /* Step 2: Receive the peer's challenge response followed by the challenge request. */
1427
1428        // Listen for the challenge response message.
1429        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1430        // Listen for the challenge request message.
1431        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1432
1433        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
1434        if let Some(reason) = self
1435            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1436            .await
1437        {
1438            send_event(&mut framed, peer_addr, reason.into()).await?;
1439            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1440        }
1441        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
1442        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1443            send_event(&mut framed, peer_addr, reason.into()).await?;
1444            if reason == DisconnectReason::NoReasonGiven {
1445                // The Aleo address is already connected; no reason to return an error.
1446                return Ok(None);
1447            } else {
1448                return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1449            }
1450        }
1451
1452        /* Step 3: Send the challenge response. */
1453
1454        // Sign the counterparty nonce.
1455        let response_nonce: u64 = rng.r#gen();
1456        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1457        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1458            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1459        };
1460        // Send the challenge response.
1461        let our_response =
1462            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1463        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1464
1465        Ok(Some(peer_request))
1466    }
1467
1468    /// The connection responder side of the handshake.
1469    async fn handshake_inner_responder<'a>(
1470        &'a self,
1471        peer_addr: SocketAddr,
1472        peer_ip: &mut Option<SocketAddr>,
1473        restrictions_id: Field<N>,
1474        stream: &'a mut TcpStream,
1475    ) -> io::Result<Option<ChallengeRequest<N>>> {
1476        // Construct the stream.
1477        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1478
1479        /* Step 1: Receive the challenge request. */
1480
1481        // Listen for the challenge request message.
1482        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1483
1484        // Ensure the address is not the same as this node.
1485        if self.account.address() == peer_request.address {
1486            return Err(error("Skipping request to connect to self".to_string()));
1487        }
1488
1489        // Obtain the peer's listening address.
1490        *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1491        let peer_ip = peer_ip.unwrap();
1492
1493        // Knowing the peer's listening address, ensure it is allowed to connect.
1494        if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) {
1495            return Err(error(format!("{forbidden_message}")));
1496        }
1497
1498        // Introduce the peer into the peer pool.
1499        if !self.add_connecting_peer(peer_ip) {
1500            return Ok(None);
1501        }
1502
1503        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
1504        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1505            send_event(&mut framed, peer_addr, reason.into()).await?;
1506            if reason == DisconnectReason::NoReasonGiven {
1507                // The Aleo address is already connected; no reason to return an error.
1508                return Ok(None);
1509            } else {
1510                return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1511            }
1512        }
1513
1514        /* Step 2: Send the challenge response followed by own challenge request. */
1515
1516        // Initialize an RNG.
1517        let rng = &mut rand::rngs::OsRng;
1518
1519        // Sign the counterparty nonce.
1520        let response_nonce: u64 = rng.r#gen();
1521        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1522        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1523            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1524        };
1525        // Send the challenge response.
1526        let our_response =
1527            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1528        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1529
1530        // Sample a random nonce.
1531        let our_nonce = rng.r#gen();
1532        // Determine the snarkOS SHA to send to the peer.
1533        let current_block_height = self.ledger.latest_block_height();
1534        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1535        let snarkos_sha = (consensus_version >= ConsensusVersion::V12)
1536            .then(|| built_info::GIT_COMMIT_HASH.unwrap_or_default().into());
1537        // Send the challenge request.
1538        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1539        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1540
1541        /* Step 3: Receive the challenge response. */
1542
1543        // Listen for the challenge response message.
1544        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1545        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
1546        if let Some(reason) = self
1547            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1548            .await
1549        {
1550            send_event(&mut framed, peer_addr, reason.into()).await?;
1551            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
1552        }
1553
1554        Ok(Some(peer_request))
1555    }
1556
1557    /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid.
1558    fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1559        // Retrieve the components of the challenge request.
1560        let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event;
1561        log_repo_sha_comparison(peer_addr, snarkos_sha.as_ref(), CONTEXT);
1562
1563        let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);
1564
1565        // Ensure the event protocol version is not outdated.
1566        if version < Event::<N>::VERSION {
1567            warn!("{CONTEXT} Dropping '{peer_addr}' on version {version} (outdated)");
1568            return Some(DisconnectReason::OutdatedClientVersion);
1569        }
1570        // If the node is in trusted peers only mode, ensure the peer is trusted.
1571        if self.trusted_peers_only && !self.is_trusted(listener_addr) {
1572            warn!("{CONTEXT} Dropping '{peer_addr}' for being an untrusted validator ({address})");
1573            return Some(DisconnectReason::ProtocolViolation);
1574        }
1575        if !bootstrap_peers::<N>(self.dev().is_some()).contains(&listener_addr) {
1576            // Ensure the address is a current committee member.
1577            if !self.is_authorized_validator_address(address) {
1578                warn!("{CONTEXT} Dropping '{peer_addr}' for being an unauthorized validator ({address})");
1579                return Some(DisconnectReason::ProtocolViolation);
1580            }
1581        }
1582        // Ensure the address is not already connected.
1583        if self.is_connected_address(address) {
1584            warn!("{CONTEXT} Dropping '{peer_addr}' for being already connected ({address})");
1585            return Some(DisconnectReason::NoReasonGiven);
1586        }
1587        None
1588    }
1589
1590    /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid.
1591    async fn verify_challenge_response(
1592        &self,
1593        peer_addr: SocketAddr,
1594        peer_address: Address<N>,
1595        response: ChallengeResponse<N>,
1596        expected_restrictions_id: Field<N>,
1597        expected_nonce: u64,
1598    ) -> Option<DisconnectReason> {
1599        // Retrieve the components of the challenge response.
1600        let ChallengeResponse { restrictions_id, signature, nonce } = response;
1601
1602        // Verify the restrictions ID.
1603        if restrictions_id != expected_restrictions_id {
1604            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1605            return Some(DisconnectReason::InvalidChallengeResponse);
1606        }
1607        // Perform the deferred non-blocking deserialization of the signature.
1608        let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1609            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1610            return Some(DisconnectReason::InvalidChallengeResponse);
1611        };
1612        // Verify the signature.
1613        if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1614            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (invalid signature)");
1615            return Some(DisconnectReason::InvalidChallengeResponse);
1616        }
1617        None
1618    }
1619}
1620
/// Property tests covering gateway construction, start-up, and validator authorization.
#[cfg(test)]
mod prop_tests {
    use crate::{
        Gateway,
        MAX_WORKERS,
        MEMORY_POOL_PORT,
        Worker,
        gateway::prop_tests::GatewayAddress::{Dev, Prod},
        helpers::{Storage, init_primary_channels, init_worker_channels},
    };
    use aleo_std::StorageMode;
    use snarkos_account::Account;
    use snarkos_node_bft_ledger_service::MockLedgerService;
    use snarkos_node_bft_storage_service::BFTMemoryService;
    use snarkos_node_network::PeerPoolHandling;
    use snarkos_node_tcp::P2P;
    use snarkvm::{
        ledger::{
            committee::{
                Committee,
                prop_tests::{CommitteeContext, ValidatorSet},
                test_helpers::sample_committee_for_round_and_members,
            },
            narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
        },
        prelude::{MainnetV0, PrivateKey},
        utilities::TestRng,
    };

    use indexmap::{IndexMap, IndexSet};
    use proptest::{
        prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
        sample::Selector,
    };
    use std::{
        fmt::{Debug, Formatter},
        net::{IpAddr, Ipv4Addr, SocketAddr},
        sync::Arc,
    };
    use test_strategy::proptest;

    type CurrentNetwork = MainnetV0;

    impl Debug for Gateway<CurrentNetwork> {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            // TODO implement Debug properly and move it over to production code
            f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
        }
    }

    /// The address configuration handed to `Gateway::new` in these tests:
    /// either a dev index or an optional production socket address.
    #[derive(Debug, test_strategy::Arbitrary)]
    enum GatewayAddress {
        Dev(u8),
        Prod(Option<SocketAddr>),
    }

    impl GatewayAddress {
        /// Returns the socket address of a `Prod` address, and `None` for a `Dev` address.
        fn ip(&self) -> Option<SocketAddr> {
            match self {
                GatewayAddress::Prod(ip) => *ip,
                GatewayAddress::Dev(_) => None,
            }
        }

        /// Returns the dev index of a `Dev` address widened to `u16`, and `None` for a `Prod` address.
        fn port(&self) -> Option<u16> {
            match self {
                GatewayAddress::Dev(port) => Some(u16::from(*port)),
                GatewayAddress::Prod(_) => None,
            }
        }
    }

    impl Arbitrary for Gateway<CurrentNetwork> {
        type Parameters = ();
        type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;

        /// Generates gateways from arbitrary valid dev gateway inputs.
        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            any_valid_dev_gateway()
                .prop_map(|(storage, _, private_key, address)| {
                    Gateway::new(
                        Account::try_from(private_key).unwrap(),
                        storage.clone(),
                        storage.ledger().clone(),
                        address.ip(),
                        &[],
                        false,
                        StorageMode::new_test(None),
                        address.port(),
                    )
                    .unwrap()
                })
                .boxed()
        }
    }

    /// The storage, committee context, validator private key, and address used to build a gateway.
    type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);

    /// Generates gateway inputs with a `Dev` address, using the private key of a validator
    /// selected from the generated committee context.
    fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
        (any::<CommitteeContext>(), any::<Selector>())
            .prop_flat_map(|(context, account_selector)| {
                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
                (
                    any_with::<Storage<CurrentNetwork>>(context.clone()),
                    Just(context),
                    Just(account_selector.select(validators)),
                    0u8..,
                )
                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Dev(d)))
            })
            .boxed()
    }

    /// Generates gateway inputs with a `Prod` address, using the private key of a validator
    /// selected from the generated committee context.
    fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
        (any::<CommitteeContext>(), any::<Selector>())
            .prop_flat_map(|(context, account_selector)| {
                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
                (
                    any_with::<Storage<CurrentNetwork>>(context.clone()),
                    Just(context),
                    Just(account_selector.select(validators)),
                    any::<Option<SocketAddr>>(),
                )
                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Prod(d)))
            })
            .boxed()
    }

    /// A dev gateway listens on localhost at `MEMORY_POOL_PORT` plus the dev index.
    #[proptest]
    fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
        let (storage, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            StorageMode::new_test(None),
            dev.port(),
        )
        .unwrap();

        let tcp_config = gateway.tcp().config();
        assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
        assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
        assert_eq!(gateway.account().address(), account.address());
    }

    /// A prod gateway listens on the given socket address, or on the unspecified IPv4 address
    /// at `MEMORY_POOL_PORT` when none is given.
    #[proptest]
    fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
        let (storage, _, private_key, address) = input;
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            storage.ledger().clone(),
            address.ip(),
            &[],
            false,
            StorageMode::new_test(None),
            address.port(),
        )
        .unwrap();

        let tcp_config = gateway.tcp().config();
        if let Some(socket_addr) = address.ip() {
            assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
            assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
        } else {
            assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
            assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
        }
        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
        assert_eq!(gateway.account().address(), account.address());
    }

    /// Running a dev gateway with a number of workers yields the expected local address and worker count.
    #[proptest(async = "tokio")]
    async fn gateway_start(
        #[strategy(any_valid_dev_gateway())] input: GatewayInput,
        #[strategy(0..MAX_WORKERS)] workers_count: u8,
    ) {
        let (storage, committee, private_key, dev) = input;
        let committee = committee.0;
        let worker_storage = storage.clone();
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account,
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            StorageMode::new_test(None),
            dev.port(),
        )
        .unwrap();

        let (primary_sender, _) = init_primary_channels();

        let (workers, worker_senders) = {
            // Construct a map of the worker senders.
            let mut tx_workers = IndexMap::new();
            let mut workers = IndexMap::new();

            // Initialize the workers.
            for id in 0..workers_count {
                // Construct the worker channels.
                let (tx_worker, rx_worker) = init_worker_channels();
                // Construct the worker instance.
                let ledger = Arc::new(MockLedgerService::new(committee.clone()));
                let worker =
                    Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
                        .unwrap();
                // Run the worker instance.
                worker.run(rx_worker);

                // Add the worker and the worker sender to maps
                workers.insert(id, worker);
                tx_workers.insert(id, tx_worker);
            }
            (workers, tx_workers)
        };

        gateway.run(primary_sender, worker_senders, None).await;
        assert_eq!(
            gateway.local_ip(),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
        );
        assert_eq!(gateway.num_workers(), workers.len() as u8);
    }

    /// Only the authors of the certificates that formed the committee are authorized validators.
    #[proptest]
    fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
        let rng = &mut TestRng::default();

        // Initialize the round parameters.
        let current_round = 2;
        let committee_size = 4;
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let (_, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        // Sample the certificates.
        let mut certificates = IndexSet::new();
        for _ in 0..committee_size {
            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        }
        let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
        // Initialize the committee.
        let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
        // Sample extra certificates from non-committee members.
        for _ in 0..committee_size {
            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        }
        // Initialize the ledger.
        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
        // Initialize the storage.
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Initialize the gateway.
        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            ledger.clone(),
            dev.ip(),
            &[],
            false,
            StorageMode::new_test(None),
            dev.port(),
        )
        .unwrap();
        // Insert certificate to the storage.
        for certificate in certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Check that exactly the committee members (the first `committee_size` authors, in
        // insertion order) are authorized validators.
        for (i, certificate) in certificates.iter().enumerate() {
            let is_authorized = gateway.is_authorized_validator_address(certificate.author());
            assert_eq!(is_authorized, i < committee_size, "unexpected authorization for certificate {i}");
        }
    }
}