Skip to main content

snarkos_node_bft/
gateway.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19    CONTEXT,
20    MAX_BATCH_DELAY_IN_MS,
21    MEMORY_POOL_PORT,
22    Worker,
23    events::{Disconnect as DisconnectEvent, DisconnectReason, EventCodec, PrimaryPing},
24    helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
25    spawn_blocking,
26};
27use snarkos_account::Account;
28use snarkos_node_bft_events::{
29    BlockRequest,
30    BlockResponse,
31    CertificateRequest,
32    CertificateResponse,
33    ChallengeRequest,
34    ChallengeResponse,
35    DataBlocks,
36    Event,
37    EventTrait,
38    TransmissionRequest,
39    TransmissionResponse,
40    ValidatorsRequest,
41    ValidatorsResponse,
42};
43use snarkos_node_bft_ledger_service::LedgerService;
44use snarkos_node_network::{
45    ConnectionMode,
46    NodeType,
47    Peer,
48    PeerPoolHandling,
49    Resolver,
50    bootstrap_peers,
51    get_repo_commit_hash,
52    log_repo_sha_comparison,
53};
54use snarkos_node_sync::{InsertBlockResponseError, MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
55use snarkos_node_tcp::{
56    Config,
57    Connection,
58    ConnectionSide,
59    P2P,
60    Tcp,
61    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
62};
63use snarkos_utilities::NodeDataDir;
64use snarkvm::{
65    console::prelude::*,
66    ledger::{
67        committee::Committee,
68        narwhal::{BatchHeader, Data},
69    },
70    prelude::{Address, Field},
71};
72
73use colored::Colorize;
74use futures::SinkExt;
75use indexmap::IndexMap;
76#[cfg(feature = "locktick")]
77use locktick::parking_lot::{Mutex, RwLock};
78#[cfg(not(feature = "locktick"))]
79use parking_lot::{Mutex, RwLock};
80use rand::{
81    rngs::OsRng,
82    seq::{IteratorRandom, SliceRandom},
83};
84use std::{
85    collections::{HashMap, HashSet},
86    future::Future,
87    io,
88    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
89    sync::Arc,
90    time::{Duration, Instant},
91};
92use tokio::{
93    net::TcpStream,
94    sync::{OnceCell, oneshot},
95    task::{self, JoinHandle},
96};
97use tokio_stream::StreamExt;
98use tokio_util::codec::Framed;
99
100/// The maximum interval of events to cache.
101const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
102/// The maximum interval of requests to cache.
103const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
104
105/// The maximum number of connection attempts in an interval.
106const MAX_CONNECTION_ATTEMPTS: usize = 10;
107/// The maximum interval to restrict a peer.
108const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
109
110/// The maximum number of validators to send in a validators response event.
111pub const MAX_VALIDATORS_TO_SEND: usize = 200;
112
113/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
114#[cfg(not(any(test)))]
115const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
116/// The amount of time an IP address is prohibited from connecting.
117const IP_BAN_TIME_IN_SECS: u64 = 300;
118
/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
pub trait Transport<N: Network>: Send + Sync {
    /// Sends the given event to the specified peer, returning as soon as the event is queued;
    /// the returned receiver (if any) resolves once the event has (or has not) been delivered.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
    /// Broadcasts the given event to connected peers.
    fn broadcast(&self, event: Event<N>);
}
126
/// The gateway maintains connections to other validators.
/// For connections with clients and provers, the Router logic is used.
// Cloning a `Gateway` is cheap: it only bumps the `Arc` refcount on the shared inner state.
#[derive(Clone)]
pub struct Gateway<N: Network>(Arc<InnerGateway<N>>);

impl<N: Network> Deref for Gateway<N> {
    type Target = Arc<InnerGateway<N>>;

    /// Dereferences to the shared inner state, so `InnerGateway` fields and
    /// methods are accessible directly on a `Gateway`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
139
pub struct InnerGateway<N: Network> {
    /// The account of the node.
    account: Account<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The TCP stack.
    tcp: Tcp,
    /// The cache.
    cache: Cache<N>,
    /// The resolver.
    resolver: RwLock<Resolver<N>>,
    /// The collection of both candidate and connected peers.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The telemetry tracker for connected validators.
    #[cfg(feature = "telemetry")]
    validator_telemetry: Telemetry<N>,
    /// The primary sender.
    primary_sender: OnceCell<PrimarySender<N>>,
    /// The worker senders.
    worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
    /// The sync sender.
    sync_sender: OnceCell<SyncSender<N>>,
    /// The spawned handles.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// The node data directory.
    node_data_dir: NodeDataDir,
    /// If the flag is set, the node will only connect to trusted peers.
    trusted_peers_only: bool,
    /// The development mode.
    dev: Option<u16>,
}
172
impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
    const MAXIMUM_POOL_SIZE: usize = 200;
    const OWNER: &str = CONTEXT;
    const PEER_SLASHING_COUNT: usize = 20;

    /// Returns the shared pool of candidate and connected peers.
    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
        &self.peer_pool
    }

    /// Returns the address resolver.
    fn resolver(&self) -> &RwLock<Resolver<N>> {
        &self.resolver
    }

    /// Returns `true` if the node runs in development mode.
    fn is_dev(&self) -> bool {
        self.dev.is_some()
    }

    /// Returns `true` if the node only connects to trusted peers.
    fn trusted_peers_only(&self) -> bool {
        self.trusted_peers_only
    }

    /// The gateway always identifies itself as a validator.
    fn node_type(&self) -> NodeType {
        NodeType::Validator
    }
}
198
199impl<N: Network> Gateway<N> {
200    /// Initializes a new gateway.
201    #[allow(clippy::too_many_arguments)]
202    pub fn new(
203        account: Account<N>,
204        storage: Storage<N>,
205        ledger: Arc<dyn LedgerService<N>>,
206        ip: Option<SocketAddr>,
207        trusted_validators: &[SocketAddr],
208        trusted_peers_only: bool,
209        node_data_dir: NodeDataDir,
210        dev: Option<u16>,
211    ) -> Result<Self> {
212        // Initialize the gateway IP.
213        let ip = match (ip, dev) {
214            (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
215            (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
216            (Some(ip), _) => ip,
217        };
218        // Initialize the TCP stack.
219        let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));
220
221        // Prepare the collection of the initial peers.
222        let mut initial_peers = HashMap::new();
223
224        // Load entries from the validator cache (if present and if we are not in trusted peers only mode).
225        if !trusted_peers_only {
226            let cached_peers = Self::load_cached_peers(&node_data_dir.gateway_peer_cache_path())?;
227            for addr in cached_peers {
228                initial_peers.insert(addr, Peer::new_candidate(addr, false));
229            }
230        }
231
232        // Add the trusted peers to the list of the initial peers; this may promote
233        // some of the cached validators to trusted ones.
234        initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
235
236        // Return the gateway.
237        Ok(Self(Arc::new(InnerGateway {
238            account,
239            storage,
240            ledger,
241            tcp,
242            cache: Default::default(),
243            resolver: Default::default(),
244            peer_pool: RwLock::new(initial_peers),
245            #[cfg(feature = "telemetry")]
246            validator_telemetry: Default::default(),
247            primary_sender: Default::default(),
248            worker_senders: Default::default(),
249            sync_sender: Default::default(),
250            handles: Default::default(),
251            node_data_dir,
252            trusted_peers_only,
253            dev,
254        })))
255    }
256
257    /// Run the gateway.
258    pub async fn run(
259        &self,
260        primary_sender: PrimarySender<N>,
261        worker_senders: IndexMap<u8, WorkerSender<N>>,
262        sync_sender: Option<SyncSender<N>>,
263    ) {
264        debug!("Starting the gateway for the memory pool...");
265
266        // Set the primary sender.
267        self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");
268
269        // Set the worker senders.
270        self.worker_senders.set(worker_senders).expect("The worker senders are already set");
271
272        // If the sync sender was provided, set the sync sender.
273        if let Some(sync_sender) = sync_sender {
274            self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
275        }
276
277        // Enable the TCP protocols.
278        self.enable_handshake().await;
279        self.enable_reading().await;
280        self.enable_writing().await;
281        self.enable_disconnect().await;
282        self.enable_on_connect().await;
283
284        // Enable the TCP listener. Note: This must be called after the above protocols.
285        let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
286        debug!("Listening for validator connections at address {listen_addr:?}");
287
288        // Initialize the heartbeat.
289        self.initialize_heartbeat();
290
291        info!("Started the gateway for the memory pool at '{}'", self.local_ip());
292    }
293}
294
295// Dynamic rate limiting.
296impl<N: Network> Gateway<N> {
297    /// The current maximum committee size.
298    fn max_committee_size(&self) -> usize {
299        self.ledger.current_committee().map_or_else(
300            |_e| Committee::<N>::max_committee_size().unwrap() as usize,
301            |committee| committee.num_members(),
302        )
303    }
304
305    /// The maximum number of events to cache.
306    fn max_cache_events(&self) -> usize {
307        self.max_cache_transmissions()
308    }
309
310    /// The maximum number of certificate requests to cache.
311    fn max_cache_certificates(&self) -> usize {
312        2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
313    }
314
315    /// The maximum number of transmission requests to cache.
316    fn max_cache_transmissions(&self) -> usize {
317        self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
318    }
319
320    /// The maximum number of duplicates for any particular request.
321    fn max_cache_duplicates(&self) -> usize {
322        self.max_committee_size().pow(2)
323    }
324}
325
326#[async_trait]
327impl<N: Network> CommunicationService for Gateway<N> {
328    /// The message type.
329    type Message = Event<N>;
330
331    /// Prepares a block request to be sent.
332    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
333        debug_assert!(start_height < end_height, "Invalid block request format");
334        Event::BlockRequest(BlockRequest { start_height, end_height })
335    }
336
337    /// Sends the given message to specified peer.
338    ///
339    /// This function returns as soon as the message is queued to be sent,
340    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
341    /// which can be used to determine when and whether the message has been delivered.
342    async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
343        Transport::send(self, peer_ip, message).await
344    }
345}
346
impl<N: Network> Gateway<N> {
    /// Returns the account of the node.
    pub fn account(&self) -> &Account<N> {
        &self.account
    }

    /// Returns the dev identifier of the node.
    pub fn dev(&self) -> Option<u16> {
        self.dev
    }

    /// Returns the resolver.
    pub fn resolver(&self) -> &RwLock<Resolver<N>> {
        &self.resolver
    }

    /// Returns the listener IP address from the (ambiguous) peer address.
    pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> {
        self.resolver.read().get_listener(*connected_addr)
    }

    /// Returns the validator telemetry.
    #[cfg(feature = "telemetry")]
    pub fn validator_telemetry(&self) -> &Telemetry<N> {
        &self.validator_telemetry
    }

    /// Returns the primary sender.
    ///
    /// # Panics
    /// Panics if the primary sender has not been set yet.
    pub fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set in gateway")
    }

    /// Returns the number of workers.
    ///
    /// # Panics
    /// Panics if the worker senders are missing, or their count exceeds `u8::MAX`.
    pub fn num_workers(&self) -> u8 {
        let num_workers = self.worker_senders.get().expect("Missing worker senders in gateway").len();
        u8::try_from(num_workers).expect("Too many workers")
    }

    /// Returns the worker sender for the given worker ID.
    pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
        self.worker_senders.get()?.get(&worker_id)
    }
389
390    /// Returns `true` if the given peer IP is an authorized validator.
391    pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
392        // If the peer IP is in the trusted validators, return early.
393        if self.trusted_peers().contains(&ip) {
394            return true;
395        }
396        // Retrieve the Aleo address of the peer IP.
397        match self.resolve_to_aleo_addr(ip) {
398            // Determine if the peer IP is an authorized validator.
399            Some(address) => self.is_authorized_validator_address(address),
400            None => false,
401        }
402    }
403
    /// Returns `true` if the given address is an authorized validator.
    pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
        // Determine if the validator address is a member of the committee lookback,
        // the current committee, or the previous committee lookbacks.
        // We allow leniency in this validation check in order to accommodate these two scenarios:
        //  1. New validators should be able to connect immediately once bonded as a committee member.
        //  2. Existing validators must remain connected until they are no longer bonded as a committee member.
        //     (i.e. meaning they must stay online until the next block has been produced)

        // Determine if the validator is in the current committee with lookback.
        if self
            .ledger
            .get_committee_lookback_for_round(self.storage.current_round())
            .is_ok_and(|committee| committee.is_committee_member(validator_address))
        {
            return true;
        }

        // Determine if the validator is in the latest committee on the ledger.
        if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
            return true;
        }

        // Retrieve the previous block height to consider from the sync tolerance.
        let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
        // Determine if the validator is in any of the previous committee lookbacks.
        // NOTE(review): the scan advances two rounds at a time — this appears to assume
        // a committee lookback spans two consecutive rounds; confirm against the
        // committee/round semantics before relying on it.
        match self.ledger.get_block_round(previous_block_height) {
            Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
                self.ledger
                    .get_committee_lookback_for_round(round)
                    .is_ok_and(|committee| committee.is_committee_member(validator_address))
            }),
            // If the block round cannot be retrieved, deny authorization.
            Err(_) => false,
        }
    }
439
440    /// Returns the list of connected addresses.
441    pub fn connected_addresses(&self) -> HashSet<Address<N>> {
442        self.get_connected_peers().into_iter().map(|peer| peer.aleo_addr).collect()
443    }
444
445    /// Ensure the peer is allowed to connect.
446    fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
447        // Ensure the peer IP is not this node.
448        if self.is_local_ip(listener_addr) {
449            bail!("{CONTEXT} Dropping connection request from '{listener_addr}' (attempted to self-connect)");
450        }
451        // Ensure the peer is not spamming connection attempts.
452        if !listener_addr.ip().is_loopback() {
453            // Add this connection attempt and retrieve the number of attempts.
454            let num_attempts = self.cache.insert_inbound_connection(listener_addr.ip(), RESTRICTED_INTERVAL);
455            // Ensure the connecting peer has not surpassed the connection attempt limit.
456            if num_attempts > MAX_CONNECTION_ATTEMPTS {
457                bail!("Dropping connection request from '{listener_addr}' (tried {num_attempts} times)");
458            }
459        }
460        Ok(())
461    }
462
463    #[cfg(feature = "metrics")]
464    fn update_metrics(&self) {
465        metrics::gauge(metrics::bft::CONNECTED, self.number_of_connected_peers() as f64);
466        metrics::gauge(metrics::bft::CONNECTING, self.number_of_connecting_peers() as f64);
467    }
468
469    /// Inserts the given peer into the connected peers. This is only used in testing.
470    #[cfg(test)]
471    pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
472        // Adds a bidirectional map between the listener address and (ambiguous) peer address.
473        self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address));
474        // Add a transmission for this peer in the connected peers.
475        self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false));
476        if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
477            peer.upgrade_to_connected(
478                peer_addr,
479                peer_ip.port(),
480                address,
481                NodeType::Validator,
482                0,
483                ConnectionMode::Gateway,
484            );
485        }
486    }
487
488    /// Sends the given event to specified peer.
489    ///
490    /// This function returns as soon as the event is queued to be sent,
491    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
492    /// which can be used to determine when and whether the event has been delivered.
493    fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
494        // Resolve the listener IP to the (ambiguous) peer address.
495        let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else {
496            warn!("Unable to resolve the listener IP address '{peer_ip}'");
497            return None;
498        };
499        // Retrieve the event name.
500        let name = event.name();
501        // Send the event to the peer.
502        trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
503        let result = self.unicast(peer_addr, event);
504        // If the event was unable to be sent, disconnect.
505        if let Err(e) = &result {
506            warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {e}");
507            debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
508            self.disconnect(peer_ip);
509        }
510        result.ok()
511    }
512
513    /// Handles the inbound event from the peer. The returned value indicates whether
514    /// the connection is still active, and errors cause a disconnect once they are
515    /// propagated to the caller.
516    async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<bool> {
517        // Retrieve the listener IP for the peer.
518        let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else {
519            // No longer connected to the peer.
520            trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name());
521            return Ok(false);
522        };
523        // Ensure that the peer is an authorized committee member or a bootstrapper.
524        if !(self.is_authorized_validator_ip(peer_ip)
525            || self
526                .get_connected_peer(peer_ip)
527                .map(|peer| peer.node_type == NodeType::BootstrapClient)
528                .unwrap_or(false))
529        {
530            bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
531        }
532        // Drop the peer, if they have exceeded the rate limit (i.e. they are requesting too much from us).
533        let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
534        if num_events >= self.max_cache_events() {
535            bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
536        }
537        // Rate limit for duplicate requests.
538        match event {
539            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
540                // Retrieve the certificate ID.
541                let certificate_id = match &event {
542                    Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
543                    Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
544                    _ => unreachable!(),
545                };
546                // Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate).
547                let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
548                if num_events >= self.max_cache_duplicates() {
549                    return Ok(true);
550                }
551            }
552            Event::TransmissionRequest(TransmissionRequest { transmission_id })
553            | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
554                // Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
555                let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
556                if num_events >= self.max_cache_duplicates() {
557                    return Ok(true);
558                }
559            }
560            Event::BlockRequest(_) => {
561                let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
562                if num_events >= self.max_cache_duplicates() {
563                    return Ok(true);
564                }
565            }
566            _ => {}
567        }
568        trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());
569
570        // This match statement handles the inbound event by deserializing the event,
571        // checking the event is valid, and then calling the appropriate (trait) handler.
572        match event {
573            Event::BatchPropose(batch_propose) => {
574                // Send the batch propose to the primary.
575                let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
576                Ok(true)
577            }
578            Event::BatchSignature(batch_signature) => {
579                // Send the batch signature to the primary.
580                let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
581                Ok(true)
582            }
583            Event::BatchCertified(batch_certified) => {
584                // Send the batch certificate to the primary.
585                let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
586                Ok(true)
587            }
588            Event::BlockRequest(block_request) => {
589                let BlockRequest { start_height, end_height } = block_request;
590
591                // Ensure the block request is well-formed.
592                if start_height >= end_height {
593                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
594                }
595                // Ensure that the block request is within the allowed bounds.
596                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
597                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
598                }
599
600                // End height is exclusive.
601                let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?;
602
603                let self_ = self.clone();
604                let blocks = match task::spawn_blocking(move || {
605                    // Retrieve the blocks within the requested range.
606                    match self_.ledger.get_blocks(start_height..end_height) {
607                        Ok(blocks) => Ok(DataBlocks(blocks)),
608                        Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
609                    }
610                })
611                .await
612                {
613                    Ok(Ok(blocks)) => blocks,
614                    Ok(Err(error)) => return Err(error),
615                    Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
616                };
617
618                let self_ = self.clone();
619                tokio::spawn(async move {
620                    // Send the `BlockResponse` message to the peer.
621                    let event =
622                        Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version));
623                    Transport::send(&self_, peer_ip, event).await;
624                });
625                Ok(true)
626            }
627            Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
628                // Process the block response. Except for some tests, there is always a sync sender.
629                if let Some(sync_sender) = self.sync_sender.get() {
630                    // Check the response corresponds to a request.
631                    if !self.cache.remove_outbound_block_request(peer_ip, &request) {
632                        bail!("Unsolicited block response from '{peer_ip}'")
633                    }
634
635                    // Perform the deferred non-blocking deserialization of the blocks.
636                    // The deserialization can take a long time (minutes). We should not be running
637                    // this on a blocking task, but on a rayon thread pool.
638                    let (send, recv) = tokio::sync::oneshot::channel();
639                    rayon::spawn_fifo(move || {
640                        let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
641                        let _ = send.send(blocks);
642                    });
643                    let blocks = match recv.await {
644                        Ok(Ok(blocks)) => blocks,
645                        Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
646                        Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
647                    };
648
649                    // Ensure the block response is well-formed.
650                    blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
651                    // Send the blocks to the sync module.
652                    match sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await {
653                        Ok(_) => Ok(true),
654                        Err(err @ InsertBlockResponseError::EmptyBlockResponse)
655                        | Err(err @ InsertBlockResponseError::NoConsensusVersion)
656                        | Err(err @ InsertBlockResponseError::ConsensusVersionMismatch { .. }) => {
657                            error!("Peer '{peer_ip}' sent an invalid block response - {err}");
658                            self.ip_ban_peer(peer_ip, Some(&err.to_string()));
659                            Err(err.into())
660                        }
661                        Err(err) => {
662                            warn!("Unable to process block response from '{peer_ip}' - {err}");
663                            Err(err.into())
664                        }
665                    }
666                } else {
667                    debug!("Ignoring block response from '{peer_ip}' - no sync sender");
668                    Ok(true)
669                }
670            }
671            Event::CertificateRequest(certificate_request) => {
672                // Send the certificate request to the sync module.
673                // Except for some tests, there is always a sync sender.
674                if let Some(sync_sender) = self.sync_sender.get() {
675                    // Send the certificate request to the sync module.
676                    let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
677                }
678                Ok(true)
679            }
680            Event::CertificateResponse(certificate_response) => {
681                // Send the certificate response to the sync module.
682                // Except for some tests, there is always a sync sender.
683                if let Some(sync_sender) = self.sync_sender.get() {
684                    // Send the certificate response to the sync module.
685                    let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
686                }
687                Ok(true)
688            }
689            Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
690                // Disconnect as the peer is not following the protocol.
691                bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
692            }
693            Event::Disconnect(message) => {
694                // The peer informs us that they had disconnected. Disconnect from them too.
695                debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
696                self.disconnect(peer_ip);
697                Ok(false)
698            }
699            Event::PrimaryPing(ping) => {
700                let PrimaryPing { version, block_locators, primary_certificate } = ping;
701
702                // Ensure the event version is not outdated.
703                if version < Event::<N>::VERSION {
704                    bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
705                }
706
707                // Log the validator's height.
708                debug!("Validator '{peer_ip}' is at height {}", block_locators.latest_locator_height());
709
710                // Update the peer locators. Except for some tests, there is always a sync sender.
711                if let Some(sync_sender) = self.sync_sender.get() {
712                    // Check the block locators are valid, and update the validators in the sync module.
713                    if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
714                        bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
715                    }
716                }
717
718                // Send the batch certificates to the primary.
719                let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
720                Ok(true)
721            }
722            Event::TransmissionRequest(request) => {
723                // TODO (howardwu): Add rate limiting checks on this event, on a per-peer basis.
724                // Determine the worker ID.
725                let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
726                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
727                    return Ok(true);
728                };
729                // Send the transmission request to the worker.
730                if let Some(sender) = self.get_worker_sender(worker_id) {
731                    // Send the transmission request to the worker.
732                    let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
733                }
734                Ok(true)
735            }
736            Event::TransmissionResponse(response) => {
737                // Determine the worker ID.
738                let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
739                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
740                    return Ok(true);
741                };
742                // Send the transmission response to the worker.
743                if let Some(sender) = self.get_worker_sender(worker_id) {
744                    // Send the transmission response to the worker.
745                    let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
746                }
747                Ok(true)
748            }
749            Event::ValidatorsRequest(_) => {
750                let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
751                connected_peers.shuffle(&mut rand::thread_rng());
752
753                let self_ = self.clone();
754                tokio::spawn(async move {
755                    // Initialize the validators.
756                    let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
757                    // Iterate over the validators.
758                    for validator in connected_peers.into_iter() {
759                        // Add the validator to the list of validators.
760                        validators.insert(validator.listener_addr, validator.aleo_addr);
761                    }
762                    // Send the validators response to the peer.
763                    let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
764                    Transport::send(&self_, peer_ip, event).await;
765                });
766                Ok(true)
767            }
768            Event::ValidatorsResponse(response) => {
769                if self.trusted_peers_only {
770                    bail!("{CONTEXT} Not accepting validators response from '{peer_ip}' (trusted peers only)");
771                }
772                let ValidatorsResponse { validators } = response;
773                // Ensure the number of validators is not too large.
774                ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
775                // Ensure the cache contains a validators request for this peer.
776                if !self.cache.contains_outbound_validators_request(peer_ip) {
777                    bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
778                }
779                // Decrement the number of validators requests for this peer.
780                self.cache.decrement_outbound_validators_requests(peer_ip);
781
782                // Add valid validators as candidates to the peer pool; only validator-related
783                // filters need to be applied, the rest is handled by `PeerPoolHandling`.
784                let valid_addrs = validators
785                    .into_iter()
786                    .filter_map(|(listener_addr, aleo_addr)| {
787                        (self.account.address() != aleo_addr
788                            && !self.is_connected_address(aleo_addr)
789                            && self.is_authorized_validator_address(aleo_addr))
790                        .then_some((listener_addr, None))
791                    })
792                    .collect::<Vec<_>>();
793                if !valid_addrs.is_empty() {
794                    self.insert_candidate_peers(valid_addrs);
795                }
796
797                #[cfg(feature = "metrics")]
798                self.update_metrics();
799
800                Ok(true)
801            }
802            Event::WorkerPing(ping) => {
803                // Ensure the number of transmissions is not too large.
804                ensure!(
805                    ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
806                    "{CONTEXT} Received too many transmissions"
807                );
808                // Retrieve the number of workers.
809                let num_workers = self.num_workers();
810                // Iterate over the transmission IDs.
811                for transmission_id in ping.transmission_ids.into_iter() {
812                    // Determine the worker ID.
813                    let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
814                        warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
815                        continue;
816                    };
817                    // Send the transmission ID to the worker.
818                    if let Some(sender) = self.get_worker_sender(worker_id) {
819                        // Send the transmission ID to the worker.
820                        let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
821                    }
822                }
823                Ok(true)
824            }
825        }
826    }
827
828    /// Initialize a new instance of the heartbeat.
829    fn initialize_heartbeat(&self) {
830        let self_clone = self.clone();
831        self.spawn(async move {
832            let start = Instant::now();
833            // Sleep briefly to ensure the other nodes are ready to connect.
834            tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
835            info!("Starting the heartbeat of the gateway...");
836            loop {
837                // Process a heartbeat in the gateway.
838                let uptime = start.elapsed();
839                self_clone.heartbeat(uptime).await;
840                // Sleep for the heartbeat interval.
841                tokio::time::sleep(Duration::from_secs(15)).await;
842            }
843        });
844    }
845
846    /// Spawns a task with the given future; it should only be used for long-running tasks.
847    #[allow(dead_code)]
848    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
849        self.handles.lock().push(tokio::spawn(future));
850    }
851
852    /// Shuts down the gateway.
853    pub async fn shut_down(&self) {
854        info!("Shutting down the gateway...");
855        // Save the best peers for future use.
856        if let Err(e) = self.save_best_peers(&self.node_data_dir.gateway_peer_cache_path(), None, true) {
857            warn!("Failed to persist best validators to disk: {e}");
858        }
859        // Abort the tasks.
860        self.handles.lock().iter().for_each(|handle| handle.abort());
861        // Close the listener.
862        self.tcp.shut_down().await;
863    }
864}
865
866impl<N: Network> Gateway<N> {
867    /// The uptime after which nodes log a warning about missing validator connections.
868    const MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD: Duration = Duration::from_secs(60);
869
870    /// Handles the heartbeat request.
871    async fn heartbeat(&self, uptime: Duration) {
872        // Log the connected validators.
873        self.log_connected_validators(uptime);
874        // Log the validator participation scores.
875        #[cfg(feature = "telemetry")]
876        self.log_participation_scores();
877        // Keep the trusted validators connected.
878        self.handle_trusted_validators();
879        // Keep the bootstrap peers within the allowed range.
880        self.handle_bootstrap_peers().await;
881        // Removes any validators that not in the current committee.
882        self.handle_unauthorized_validators();
883        // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
884        self.handle_min_connected_validators();
885        // Unban any addresses whose ban time has expired.
886        self.handle_banned_ips();
887        // Update the dynamic validator whitelist.
888        self.update_validator_whitelist();
889    }
890
891    /// Logs the connected validators.
892    fn log_connected_validators(&self, uptime: Duration) {
893        // Retrieve the connected validators and current committee.
894        let connected_validators = self.connected_peers();
895        let committee = match self.ledger.current_committee() {
896            Ok(c) => c,
897            Err(err) => {
898                error!("Failed to get current committee: {err}");
899                return;
900            }
901        };
902
903        // Resolve the total number of connectable validators.
904        let validators_total = committee.num_members().saturating_sub(1);
905        // Format the total validators message.
906        let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
907        // Construct the connections message.
908        let connections_msg = match connected_validators.len() {
909            0 => "No connected validators".to_string(),
910            num_connected => format!("Connected to {num_connected} validators {total_validators}"),
911        };
912        info!("{connections_msg}");
913
914        // Collect the connected validator addresses and stake.
915        let mut connected_validator_addresses = HashSet::with_capacity(connected_validators.len());
916        // Include our own address.
917        connected_validator_addresses.insert(self.account.address());
918        // Include and log the connected validators.
919        for peer_ip in &connected_validators {
920            let address = self.resolve_to_aleo_addr(*peer_ip).map_or("Unknown".to_string(), |a| {
921                connected_validator_addresses.insert(a);
922                a.to_string()
923            });
924            debug!("{}", format!("  Connected to: {peer_ip} - {address}").dimmed());
925        }
926
927        // Log the validators that are not connected.
928        let num_not_connected = validators_total.saturating_sub(connected_validators.len());
929        if num_not_connected > 0 {
930            // Cache the total stake for computing percentages.
931            let total_stake = committee.total_stake();
932            let total_stake_f64 = total_stake as f64;
933
934            // Collect the committee members.
935            let committee_members: HashSet<_> =
936                self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
937
938            let not_connected_stake: u64 = committee_members
939                .difference(&connected_validator_addresses)
940                .map(|address| {
941                    let address_stake = committee.get_stake(*address);
942                    let address_stake_as_percentage =
943                        if total_stake == 0 { 0.0 } else { address_stake as f64 / total_stake_f64 * 100.0 };
944                    debug!(
945                        "{}",
946                        format!("  Not connected to {address} ({address_stake_as_percentage:.2}% of total stake)")
947                            .dimmed()
948                    );
949                    address_stake
950                })
951                .sum();
952
953            let not_connected_stake_as_percentage =
954                if total_stake == 0 { 0.0 } else { not_connected_stake as f64 / total_stake_f64 * 100.0 };
955            warn!(
956                "Not connected to {num_not_connected} validators {total_validators} ({not_connected_stake_as_percentage:.2}% of total stake not connected)"
957            );
958            #[cfg(feature = "metrics")]
959            {
960                let connected_stake_as_percentage = 100.0 - not_connected_stake_as_percentage;
961                metrics::gauge(metrics::bft::CONNECTED_STAKE, connected_stake_as_percentage);
962            }
963        } else {
964            #[cfg(feature = "metrics")]
965            metrics::gauge(metrics::bft::CONNECTED_STAKE, 100.0);
966        };
967
968        if !committee.is_quorum_threshold_reached(&connected_validator_addresses) {
969            // Not being connected to a quorum of validators is begning during startup.
970            if uptime > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
971                error!("Not connected to a quorum of validators");
972            } else {
973                debug!("Not connected to a quorum of validators");
974            }
975        }
976    }
977
978    // Logs the validator participation scores.
979    #[cfg(feature = "telemetry")]
980    fn log_participation_scores(&self) {
981        if let Ok(current_committee) = self.ledger.current_committee() {
982            // Retrieve the participation scores.
983            let participation_scores = self.validator_telemetry().get_participation_scores(&current_committee);
984            // Log the participation scores.
985            debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
986            for (address, score) in participation_scores {
987                debug!("{}", format!("  {address} - {score:.2}%").dimmed());
988            }
989        }
990    }
991
992    /// This function attempts to connect to any disconnected trusted validators.
993    fn handle_trusted_validators(&self) {
994        // Ensure that the trusted nodes are connected.
995        for validator_ip in &self.trusted_peers() {
996            // Attempt to connect to the trusted validator.
997            self.connect(*validator_ip);
998        }
999    }
1000
1001    /// This function keeps the number of bootstrap peers within the allowed range.
1002    async fn handle_bootstrap_peers(&self) {
1003        // Return early if we are in trusted peers only mode.
1004        if self.trusted_peers_only {
1005            return;
1006        }
1007        // Split the bootstrap peers into connected and candidate lists.
1008        let mut candidate_bootstrap = Vec::new();
1009        let connected_bootstrap = self.filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
1010        for bootstrap_ip in bootstrap_peers::<N>(self.is_dev()) {
1011            if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
1012                candidate_bootstrap.push(bootstrap_ip);
1013            }
1014        }
1015        // If there are not enough connected bootstrap peers, connect to more.
1016        if connected_bootstrap.is_empty() {
1017            // Initialize an RNG.
1018            let rng = &mut OsRng;
1019            // Attempt to connect to a bootstrap peer.
1020            if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
1021                match self.connect(peer_ip) {
1022                    Some(hdl) => {
1023                        let result = hdl.await;
1024                        if let Err(err) = result {
1025                            warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
1026                        }
1027                    }
1028                    None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
1029                }
1030            }
1031        }
1032        // Determine if the node is connected to more bootstrap peers than allowed.
1033        let num_surplus = connected_bootstrap.len().saturating_sub(1);
1034        if num_surplus > 0 {
1035            // Initialize an RNG.
1036            let rng = &mut OsRng;
1037            // Proceed to send disconnect requests to these bootstrap peers.
1038            for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
1039                info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
1040                <Self as Transport<N>>::send(
1041                    self,
1042                    peer.listener_addr,
1043                    Event::Disconnect(DisconnectReason::NoReasonGiven.into()),
1044                )
1045                .await;
1046                // Disconnect from this peer.
1047                self.disconnect(peer.listener_addr);
1048            }
1049        }
1050    }
1051
1052    /// This function attempts to disconnect any validators that are not in the current committee.
1053    fn handle_unauthorized_validators(&self) {
1054        let self_ = self.clone();
1055        tokio::spawn(async move {
1056            // Retrieve the connected validators.
1057            let validators = self_.get_connected_peers();
1058            // Iterate over the validator IPs.
1059            for peer in validators {
1060                // Skip bootstrapper peers.
1061                if peer.node_type == NodeType::BootstrapClient {
1062                    continue;
1063                }
1064                // Disconnect any validator that is not in the current committee.
1065                if !self_.is_authorized_validator_ip(peer.listener_addr) {
1066                    warn!(
1067                        "{CONTEXT} Disconnecting from '{}' - Validator is not in the current committee",
1068                        peer.listener_addr
1069                    );
1070                    Transport::send(&self_, peer.listener_addr, DisconnectReason::ProtocolViolation.into()).await;
1071                    // Disconnect from this peer.
1072                    self_.disconnect(peer.listener_addr);
1073                }
1074            }
1075        });
1076    }
1077
1078    /// This function sends a `ValidatorsRequest` to a random validator,
1079    /// if the number of connected validators is less than the minimum.
1080    /// It also attempts to connect to known unconnected validators.
1081    fn handle_min_connected_validators(&self) {
1082        // Attempt to connect to untrusted validators we're not connected to yet.
1083        // The trusted ones are already handled by `handle_trusted_validators`.
1084        let trusted_validators = self.trusted_peers();
1085        if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
1086            for peer in self.get_candidate_peers() {
1087                if !trusted_validators.contains(&peer.listener_addr) {
1088                    // Attempt to connect to unconnected validators.
1089                    self.connect(peer.listener_addr);
1090                }
1091            }
1092
1093            // Retrieve the connected validators.
1094            let validators = self.connected_peers();
1095            // If there are no validator IPs to connect to, return early.
1096            if validators.is_empty() {
1097                return;
1098            }
1099            // Select a random validator IP.
1100            if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
1101                let self_ = self.clone();
1102                tokio::spawn(async move {
1103                    // Increment the number of outbound validators requests for this validator.
1104                    self_.cache.increment_outbound_validators_requests(validator_ip);
1105                    // Send a `ValidatorsRequest` to the validator.
1106                    let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1107                });
1108            }
1109        }
1110    }
1111
1112    /// Processes a message received from the network.
1113    async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1114        // Process the message. Disconnect if the peer violated the protocol.
1115        if let Err(error) = self.inbound(peer_addr, message).await {
1116            if let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) {
1117                warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1118                let self_ = self.clone();
1119                tokio::spawn(async move {
1120                    Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1121                    // Disconnect from this peer.
1122                    self_.disconnect(peer_ip);
1123                });
1124            }
1125        }
1126    }
1127
1128    // Remove addresses whose ban time has expired.
1129    fn handle_banned_ips(&self) {
1130        self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
1131    }
1132
1133    // Update the dynamic validator whitelist.
1134    fn update_validator_whitelist(&self) {
1135        if let Err(e) =
1136            self.save_best_peers(&self.node_data_dir.validator_whitelist_path(), Some(MAX_VALIDATORS_TO_SEND), false)
1137        {
1138            warn!("Couldn't update the validator whitelist: {e}");
1139        }
1140    }
1141}
1142
#[async_trait]
impl<N: Network> Transport<N> for Gateway<N> {
    /// Sends the given event to specified peer.
    ///
    /// This method is rate limited to prevent spamming the peer.
    ///
    /// This function returns as soon as the event is queued to be sent,
    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
    /// which can be used to determine when and whether the event has been delivered.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
        // Rate-limiting helper: polls the per-peer cache count of the given kind
        // (in 10ms increments) until it drops below the corresponding limit,
        // then queues the event for delivery via `send_inner`.
        macro_rules! send {
            ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
                // Rate limit the number of events of this kind sent to the peer.
                while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
                    // Sleep for a short period of time to allow the cache to clear.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                // Send the event to the peer.
                $self.send_inner(peer_ip, event)
            }};
        }

        // Increment the cache for certificate, transmission and block events.
        // Certificate and transmission events use dedicated caches/limits; everything
        // else falls under the general outbound-event rate limit.
        match event {
            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Send the event to the peer.
                send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
            }
            Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Send the event to the peer.
                send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
            }
            Event::BlockRequest(request) => {
                // Insert the outbound request so we can match it to responses.
                self.cache.insert_outbound_block_request(peer_ip, request);
                // Send the event to the peer and update the outbound event cache, use the general rate limit.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
            _ => {
                // Send the event to the peer, use the general rate limit.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
        }
    }

    /// Broadcasts the given event to all connected peers.
    ///
    /// The sends are performed on a background task; the event is cloned once per peer.
    // TODO(ljedrz): the event should be checked for the presence of Data::Object, and
    // serialized in advance if it's there.
    fn broadcast(&self, event: Event<N>) {
        // Ensure there are connected peers.
        if self.number_of_connected_peers() > 0 {
            let self_ = self.clone();
            let connected_peers = self.connected_peers();
            tokio::spawn(async move {
                // Iterate through all connected peers.
                for peer_ip in connected_peers {
                    // Send the event to the peer.
                    let _ = Transport::send(&self_, peer_ip, event.clone()).await;
                }
            });
        }
    }
}
1210
impl<N: Network> P2P for Gateway<N> {
    /// Returns a reference to the underlying TCP instance used for all peer connections.
    fn tcp(&self) -> &Tcp {
        &self.tcp
    }
}
1217
1218#[async_trait]
1219impl<N: Network> Reading for Gateway<N> {
1220    type Codec = EventCodec<N>;
1221    type Message = Event<N>;
1222
1223    /// Creates a [`Decoder`] used to interpret messages from the network.
1224    /// The `side` param indicates the connection side **from the node's perspective**.
1225    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1226        Default::default()
1227    }
1228
1229    /// Processes a message received from the network.
1230    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1231        if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1232            let self_ = self.clone();
1233            // Handle BlockRequest and BlockResponse messages in a separate task to not block the
1234            // inbound queue.
1235            tokio::spawn(async move {
1236                self_.process_message_inner(peer_addr, message).await;
1237            });
1238        } else {
1239            self.process_message_inner(peer_addr, message).await;
1240        }
1241        Ok(())
1242    }
1243
1244    /// Computes the depth of per-connection queues used to process inbound messages, sufficient to process the maximum expected load at any givent moment.
1245    /// The greater it is, the more inbound messages the node can enqueue, but a too large value can make the node more susceptible to DoS attacks.
1246    fn message_queue_depth(&self) -> usize {
1247        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1248            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1249            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1250    }
1251}
1252
1253#[async_trait]
1254impl<N: Network> Writing for Gateway<N> {
1255    type Codec = EventCodec<N>;
1256    type Message = Event<N>;
1257
1258    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
1259    /// The `side` parameter indicates the connection side **from the node's perspective**.
1260    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1261        Default::default()
1262    }
1263
1264    /// Computes the depth of per-connection queues used to send outbound messages, sufficient to process the maximum expected load at any givent moment.
1265    /// The greater it is, the more outbound messages the node can enqueue. A too large value large value might obscure potential issues with your implementation
1266    /// (like slow serialization) or network.
1267    fn message_queue_depth(&self) -> usize {
1268        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1269            * N::LATEST_MAX_CERTIFICATES().unwrap() as usize
1270            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1271    }
1272}
1273
1274#[async_trait]
1275impl<N: Network> Disconnect for Gateway<N> {
1276    /// Any extra operations to be performed during a disconnect.
1277    async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1278        if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
1279            self.downgrade_peer_to_candidate(peer_ip);
1280            // Remove the peer from the sync module. Except for some tests, there is always a sync sender.
1281            if let Some(sync_sender) = self.sync_sender.get() {
1282                let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
1283                tokio::spawn(async move {
1284                    if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
1285                        warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
1286                    }
1287                });
1288            }
1289            // We don't clear this map based on time but only on peer disconnect.
1290            // This is sufficient to avoid infinite growth as the committee has a fixed number
1291            // of members.
1292            self.cache.clear_outbound_validators_requests(peer_ip);
1293            self.cache.clear_outbound_block_requests(peer_ip);
1294            #[cfg(feature = "metrics")]
1295            self.update_metrics();
1296        }
1297    }
1298}
1299
1300#[async_trait]
1301impl<N: Network> OnConnect for Gateway<N> {
1302    async fn on_connect(&self, peer_addr: SocketAddr) {
1303        if let Some(listener_addr) = self.resolve_to_listener(&peer_addr) {
1304            if let Some(peer) = self.get_connected_peer(listener_addr) {
1305                if peer.node_type == NodeType::BootstrapClient {
1306                    let _ =
1307                        <Self as Transport<N>>::send(self, listener_addr, Event::ValidatorsRequest(ValidatorsRequest))
1308                            .await;
1309                }
1310            }
1311        }
1312    }
1313}
1314
#[async_trait]
impl<N: Network> Handshake for Gateway<N> {
    /// Performs the handshake protocol.
    ///
    /// Applies IP-level ban checks (outside of tests and dev mode), runs the initiator or
    /// responder side of the challenge exchange, and on success upgrades the peer in the
    /// peer pool to a connected state. On failure, a connecting peer is downgraded back
    /// to a candidate.
    async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
        // Perform the handshake.
        let peer_addr = connection.addr();
        let peer_side = connection.side();

        // Check (or impose) IP-level bans.
        #[cfg(not(any(test)))]
        if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
            // If the IP is already banned reject the connection.
            if self.is_ip_banned(peer_addr.ip()) {
                trace!("{CONTEXT} Rejected a connection request from banned IP '{}'", peer_addr.ip());
                return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
            }

            // Record this inbound attempt and obtain the number of attempts seen within the window.
            let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);

            debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
            if num_attempts > MAX_CONNECTION_ATTEMPTS {
                // Too many attempts in the window: ban the IP and reject the connection.
                self.update_ip_ban(peer_addr.ip());
                trace!("{CONTEXT} Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
                return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
            }
        }

        let stream = self.borrow_stream(&mut connection);

        // If this is an inbound connection, we log it, but don't know the listening address yet.
        // Otherwise, we can immediately register the listening address.
        let mut listener_addr = if peer_side == ConnectionSide::Initiator {
            debug!("{CONTEXT} Received a connection request from '{peer_addr}'");
            None
        } else {
            debug!("{CONTEXT} Shaking hands with {peer_addr}...");
            Some(peer_addr)
        };

        // Retrieve the restrictions ID.
        let restrictions_id = self.ledger.latest_restrictions_id();

        // Perform the handshake; the responder path fills in `listener_addr` (via a mutable
        // reference) once the peer's listening port becomes known, even if the process is
        // broken at a later point in time.
        let handshake_result = if peer_side == ConnectionSide::Responder {
            self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await
        } else {
            self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await
        };

        if let Some(addr) = listener_addr {
            match handshake_result {
                Ok(Some(ref cr)) => {
                    // Bootstrap peers are recognized by their (well-known) listener address.
                    let node_type = if bootstrap_peers::<N>(self.is_dev()).contains(&addr) {
                        NodeType::BootstrapClient
                    } else {
                        NodeType::Validator
                    };
                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
                        self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address));
                        peer.upgrade_to_connected(
                            peer_addr,
                            cr.listener_port,
                            cr.address,
                            node_type,
                            cr.version,
                            ConnectionMode::Gateway,
                        );
                    }
                    #[cfg(feature = "metrics")]
                    self.update_metrics();
                    info!("{CONTEXT} Connected to '{addr}'");
                }
                // `Ok(None)` from the inner handshake signals a duplicate connection attempt.
                Ok(None) => {
                    return Err(error(format!("Duplicate handshake attempt with '{addr}'")));
                }
                Err(error) => {
                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
                        // The peer may only be downgraded if it's a ConnectingPeer.
                        if peer.is_connecting() {
                            peer.downgrade_to_candidate(addr);
                        }
                    }
                    // This error needs to be "repackaged" in order to conform to the return type.
                    return Err(error);
                }
            }
        }

        Ok(connection)
    }
}
1406
/// A macro unwrapping the expected handshake event or returning an error for unexpected events.
///
/// Takes the expected `Event` variant path, the framed stream to read from, and the peer's
/// address (used only in log and error messages). Evaluates to the variant's payload on a
/// match; any other outcome (disconnect, wrong event, or a closed stream) returns an
/// `io::Error` from the enclosing function.
macro_rules! expect_event {
    ($event_ty:path, $framed:expr, $peer_addr:expr) => {
        match $framed.try_next().await? {
            // Received the expected event, proceed.
            Some($event_ty(data)) => {
                trace!("{CONTEXT} Received '{}' from '{}'", data.name(), $peer_addr);
                data
            }
            // Received a disconnect event, abort.
            Some(Event::Disconnect(DisconnectEvent { reason })) => {
                return Err(error(format!("{CONTEXT} '{}' disconnected: {reason}", $peer_addr)));
            }
            // Received an unexpected event, abort.
            Some(ty) => {
                return Err(error(format!(
                    "{CONTEXT} '{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer_addr,
                    ty.name(),
                    stringify!($event_ty),
                )))
            }
            // Received nothing.
            None => {
                return Err(error(format!(
                    "{CONTEXT} the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
                    stringify!($event_ty)
                )))
            }
        }
    };
}
1439
1440/// Send the given message to the peer.
1441async fn send_event<N: Network>(
1442    framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
1443    peer_addr: SocketAddr,
1444    event: Event<N>,
1445) -> io::Result<()> {
1446    trace!("{CONTEXT} Sending '{}' to '{peer_addr}'", event.name());
1447    framed.send(event).await
1448}
1449
1450impl<N: Network> Gateway<N> {
1451    /// The connection initiator side of the handshake.
1452    async fn handshake_inner_initiator<'a>(
1453        &'a self,
1454        peer_addr: SocketAddr,
1455        restrictions_id: Field<N>,
1456        stream: &'a mut TcpStream,
1457    ) -> io::Result<Option<ChallengeRequest<N>>> {
1458        // Introduce the peer into the peer pool.
1459        if !self.add_connecting_peer(peer_addr) {
1460            return Ok(None);
1461        }
1462
1463        // Construct the stream.
1464        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1465
1466        // Initialize an RNG.
1467        let rng = &mut rand::rngs::OsRng;
1468
1469        /* Step 1: Send the challenge request. */
1470
1471        // Sample a random nonce.
1472        let our_nonce = rng.r#gen();
1473        // Determine the snarkOS SHA to send to the peer.
1474        let current_block_height = self.ledger.latest_block_height();
1475        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1476        let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
1477            (true, Some(sha)) => Some(sha),
1478            _ => None,
1479        };
1480        // Send a challenge request to the peer.
1481        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1482        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1483
1484        /* Step 2: Receive the peer's challenge response followed by the challenge request. */
1485
1486        // Listen for the challenge response message.
1487        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1488        // Listen for the challenge request message.
1489        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1490
1491        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
1492        if let Some(reason) = self
1493            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1494            .await
1495        {
1496            send_event(&mut framed, peer_addr, reason.into()).await?;
1497            return Err(error(format!("Dropped '{peer_addr}' for reason: {reason}")));
1498        }
1499        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
1500        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1501            send_event(&mut framed, peer_addr, reason.into()).await?;
1502            if reason == DisconnectReason::NoReasonGiven {
1503                // The Aleo address is already connected; no reason to return an error.
1504                return Ok(None);
1505            } else {
1506                return Err(error(format!("Dropped '{peer_addr}' for reason: {reason}")));
1507            }
1508        }
1509
1510        /* Step 3: Send the challenge response. */
1511
1512        // Sign the counterparty nonce.
1513        let response_nonce: u64 = rng.r#gen();
1514        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1515        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1516            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1517        };
1518        // Send the challenge response.
1519        let our_response =
1520            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1521        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1522
1523        Ok(Some(peer_request))
1524    }
1525
1526    /// The connection responder side of the handshake.
1527    async fn handshake_inner_responder<'a>(
1528        &'a self,
1529        peer_addr: SocketAddr,
1530        peer_ip: &mut Option<SocketAddr>,
1531        restrictions_id: Field<N>,
1532        stream: &'a mut TcpStream,
1533    ) -> io::Result<Option<ChallengeRequest<N>>> {
1534        // Construct the stream.
1535        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1536
1537        /* Step 1: Receive the challenge request. */
1538
1539        // Listen for the challenge request message.
1540        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1541
1542        // Ensure the address is not the same as this node.
1543        if self.account.address() == peer_request.address {
1544            return Err(error("Skipping request to connect to self".to_string()));
1545        }
1546
1547        // Obtain the peer's listening address.
1548        *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1549        let peer_ip = peer_ip.unwrap();
1550
1551        // Knowing the peer's listening address, ensure it is allowed to connect.
1552        if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) {
1553            return Err(error(format!("{forbidden_message}")));
1554        }
1555
1556        // Introduce the peer into the peer pool.
1557        if !self.add_connecting_peer(peer_ip) {
1558            return Ok(None);
1559        }
1560
1561        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
1562        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1563            send_event(&mut framed, peer_addr, reason.into()).await?;
1564            if reason == DisconnectReason::NoReasonGiven {
1565                // The Aleo address is already connected; no reason to return an error.
1566                return Ok(None);
1567            } else {
1568                return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
1569            }
1570        }
1571
1572        /* Step 2: Send the challenge response followed by own challenge request. */
1573
1574        // Initialize an RNG.
1575        let rng = &mut rand::rngs::OsRng;
1576
1577        // Sign the counterparty nonce.
1578        let response_nonce: u64 = rng.r#gen();
1579        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1580        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
1581            return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
1582        };
1583        // Send the challenge response.
1584        let our_response =
1585            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1586        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1587
1588        // Sample a random nonce.
1589        let our_nonce = rng.r#gen();
1590        // Determine the snarkOS SHA to send to the peer.
1591        let current_block_height = self.ledger.latest_block_height();
1592        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1593        let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
1594            (true, Some(sha)) => Some(sha),
1595            _ => None,
1596        };
1597        // Send the challenge request.
1598        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1599        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1600
1601        /* Step 3: Receive the challenge response. */
1602
1603        // Listen for the challenge response message.
1604        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1605        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
1606        if let Some(reason) = self
1607            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1608            .await
1609        {
1610            send_event(&mut framed, peer_addr, reason.into()).await?;
1611            return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
1612        }
1613
1614        Ok(Some(peer_request))
1615    }
1616
1617    /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid.
1618    fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1619        // Retrieve the components of the challenge request.
1620        let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event;
1621        log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT);
1622
1623        let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);
1624
1625        // Ensure the event protocol version is not outdated.
1626        if version < Event::<N>::VERSION {
1627            warn!("{CONTEXT} Dropping '{peer_addr}' on version {version} (outdated)");
1628            return Some(DisconnectReason::OutdatedClientVersion);
1629        }
1630        // If the node is in trusted peers only mode, ensure the peer is trusted.
1631        if self.trusted_peers_only && !self.is_trusted(listener_addr) {
1632            warn!("{CONTEXT} Dropping '{peer_addr}' for being an untrusted validator ({address})");
1633            return Some(DisconnectReason::ProtocolViolation);
1634        }
1635        if !bootstrap_peers::<N>(self.dev().is_some()).contains(&listener_addr) {
1636            // Ensure the address is a current committee member.
1637            if !self.is_authorized_validator_address(address) {
1638                warn!("{CONTEXT} Dropping '{peer_addr}' for being an unauthorized validator ({address})");
1639                return Some(DisconnectReason::ProtocolViolation);
1640            }
1641        }
1642        // Ensure the address is not already connected.
1643        if self.is_connected_address(address) {
1644            warn!("{CONTEXT} Dropping '{peer_addr}' for being already connected ({address})");
1645            return Some(DisconnectReason::NoReasonGiven);
1646        }
1647        None
1648    }
1649
1650    /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid.
1651    async fn verify_challenge_response(
1652        &self,
1653        peer_addr: SocketAddr,
1654        peer_address: Address<N>,
1655        response: ChallengeResponse<N>,
1656        expected_restrictions_id: Field<N>,
1657        expected_nonce: u64,
1658    ) -> Option<DisconnectReason> {
1659        // Retrieve the components of the challenge response.
1660        let ChallengeResponse { restrictions_id, signature, nonce } = response;
1661
1662        // Verify the restrictions ID.
1663        if restrictions_id != expected_restrictions_id {
1664            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1665            return Some(DisconnectReason::InvalidChallengeResponse);
1666        }
1667        // Perform the deferred non-blocking deserialization of the signature.
1668        let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1669            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1670            return Some(DisconnectReason::InvalidChallengeResponse);
1671        };
1672        // Verify the signature.
1673        if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1674            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (invalid signature)");
1675            return Some(DisconnectReason::InvalidChallengeResponse);
1676        }
1677        None
1678    }
1679}
1680
1681#[cfg(test)]
1682mod prop_tests {
1683    use crate::{
1684        Gateway,
1685        MAX_WORKERS,
1686        MEMORY_POOL_PORT,
1687        Worker,
1688        gateway::prop_tests::GatewayAddress::{Dev, Prod},
1689        helpers::{Storage, init_primary_channels, init_worker_channels},
1690    };
1691
1692    use snarkos_account::Account;
1693    use snarkos_node_bft_ledger_service::MockLedgerService;
1694    use snarkos_node_bft_storage_service::BFTMemoryService;
1695    use snarkos_node_network::PeerPoolHandling;
1696    use snarkos_node_tcp::P2P;
1697    use snarkos_utilities::NodeDataDir;
1698
1699    use snarkvm::{
1700        ledger::{
1701            committee::{
1702                Committee,
1703                prop_tests::{CommitteeContext, ValidatorSet},
1704                test_helpers::sample_committee_for_round_and_members,
1705            },
1706            narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
1707        },
1708        prelude::{MainnetV0, PrivateKey},
1709        utilities::TestRng,
1710    };
1711
1712    use indexmap::{IndexMap, IndexSet};
1713    use proptest::{
1714        prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
1715        sample::Selector,
1716    };
1717    use std::{
1718        fmt::{Debug, Formatter},
1719        net::{IpAddr, Ipv4Addr, SocketAddr},
1720        sync::Arc,
1721    };
1722    use test_strategy::proptest;
1723
1724    type CurrentNetwork = MainnetV0;
1725
1726    impl Debug for Gateway<CurrentNetwork> {
1727        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1728            // TODO implement Debug properly and move it over to production code
1729            f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
1730        }
1731    }
1732
1733    #[derive(Debug, test_strategy::Arbitrary)]
1734    enum GatewayAddress {
1735        Dev(u8),
1736        Prod(Option<SocketAddr>),
1737    }
1738
1739    impl GatewayAddress {
1740        fn ip(&self) -> Option<SocketAddr> {
1741            if let GatewayAddress::Prod(ip) = self {
1742                return *ip;
1743            }
1744            None
1745        }
1746
1747        fn port(&self) -> Option<u16> {
1748            if let GatewayAddress::Dev(port) = self {
1749                return Some(*port as u16);
1750            }
1751            None
1752        }
1753    }
1754
1755    impl Arbitrary for Gateway<CurrentNetwork> {
1756        type Parameters = ();
1757        type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;
1758
1759        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1760            any_valid_dev_gateway()
1761                .prop_map(|(storage, _, private_key, address)| {
1762                    Gateway::new(
1763                        Account::try_from(private_key).unwrap(),
1764                        storage.clone(),
1765                        storage.ledger().clone(),
1766                        address.ip(),
1767                        &[],
1768                        false,
1769                        NodeDataDir::new_test(None),
1770                        address.port(),
1771                    )
1772                    .unwrap()
1773                })
1774                .boxed()
1775        }
1776    }
1777
1778    type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);
1779
1780    fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
1781        (any::<CommitteeContext>(), any::<Selector>())
1782            .prop_flat_map(|(context, account_selector)| {
1783                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1784                (
1785                    any_with::<Storage<CurrentNetwork>>(context.clone()),
1786                    Just(context),
1787                    Just(account_selector.select(validators)),
1788                    0u8..,
1789                )
1790                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Dev(d)))
1791            })
1792            .boxed()
1793    }
1794
1795    fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
1796        (any::<CommitteeContext>(), any::<Selector>())
1797            .prop_flat_map(|(context, account_selector)| {
1798                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1799                (
1800                    any_with::<Storage<CurrentNetwork>>(context.clone()),
1801                    Just(context),
1802                    Just(account_selector.select(validators)),
1803                    any::<Option<SocketAddr>>(),
1804                )
1805                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, Prod(d)))
1806            })
1807            .boxed()
1808    }
1809
1810    #[proptest]
1811    fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1812        let (storage, _, private_key, dev) = input;
1813        let account = Account::try_from(private_key).unwrap();
1814
1815        let gateway = Gateway::new(
1816            account.clone(),
1817            storage.clone(),
1818            storage.ledger().clone(),
1819            dev.ip(),
1820            &[],
1821            false,
1822            NodeDataDir::new_test(None),
1823            dev.port(),
1824        )
1825        .unwrap();
1826        let tcp_config = gateway.tcp().config();
1827        assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
1828        assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
1829
1830        let tcp_config = gateway.tcp().config();
1831        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1832        assert_eq!(gateway.account().address(), account.address());
1833    }
1834
1835    #[proptest]
1836    fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1837        let (storage, _, private_key, dev) = input;
1838        let account = Account::try_from(private_key).unwrap();
1839
1840        let gateway = Gateway::new(
1841            account.clone(),
1842            storage.clone(),
1843            storage.ledger().clone(),
1844            dev.ip(),
1845            &[],
1846            false,
1847            NodeDataDir::new_test(None),
1848            dev.port(),
1849        )
1850        .unwrap();
1851        let tcp_config = gateway.tcp().config();
1852        if let Some(socket_addr) = dev.ip() {
1853            assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
1854            assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
1855        } else {
1856            assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
1857            assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
1858        }
1859
1860        let tcp_config = gateway.tcp().config();
1861        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
1862        assert_eq!(gateway.account().address(), account.address());
1863    }
1864
1865    #[proptest(async = "tokio")]
1866    async fn gateway_start(
1867        #[strategy(any_valid_dev_gateway())] input: GatewayInput,
1868        #[strategy(0..MAX_WORKERS)] workers_count: u8,
1869    ) {
1870        let (storage, committee, private_key, dev) = input;
1871        let committee = committee.0;
1872        let worker_storage = storage.clone();
1873        let account = Account::try_from(private_key).unwrap();
1874
1875        let gateway = Gateway::new(
1876            account,
1877            storage.clone(),
1878            storage.ledger().clone(),
1879            dev.ip(),
1880            &[],
1881            false,
1882            NodeDataDir::new_test(None),
1883            dev.port(),
1884        )
1885        .unwrap();
1886
1887        let (primary_sender, _) = init_primary_channels();
1888
1889        let (workers, worker_senders) = {
1890            // Construct a map of the worker senders.
1891            let mut tx_workers = IndexMap::new();
1892            let mut workers = IndexMap::new();
1893
1894            // Initialize the workers.
1895            for id in 0..workers_count {
1896                // Construct the worker channels.
1897                let (tx_worker, rx_worker) = init_worker_channels();
1898                // Construct the worker instance.
1899                let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1900                let worker =
1901                    Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
1902                        .unwrap();
1903                // Run the worker instance.
1904                worker.run(rx_worker);
1905
1906                // Add the worker and the worker sender to maps
1907                workers.insert(id, worker);
1908                tx_workers.insert(id, tx_worker);
1909            }
1910            (workers, tx_workers)
1911        };
1912
1913        gateway.run(primary_sender, worker_senders, None).await;
1914        assert_eq!(
1915            gateway.local_ip(),
1916            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
1917        );
1918        assert_eq!(gateway.num_workers(), workers.len() as u8);
1919    }
1920
1921    #[proptest]
1922    fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1923        let rng = &mut TestRng::default();
1924
1925        // Initialize the round parameters.
1926        let current_round = 2;
1927        let committee_size = 4;
1928        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1929        let (_, _, private_key, dev) = input;
1930        let account = Account::try_from(private_key).unwrap();
1931
1932        // Sample the certificates.
1933        let mut certificates = IndexSet::new();
1934        for _ in 0..committee_size {
1935            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1936        }
1937        let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
1938        // Initialize the committee.
1939        let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
1940        // Sample extra certificates from non-committee members.
1941        for _ in 0..committee_size {
1942            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
1943        }
1944        // Initialize the ledger.
1945        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1946        // Initialize the storage.
1947        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1948        // Initialize the gateway.
1949        let gateway = Gateway::new(
1950            account.clone(),
1951            storage.clone(),
1952            ledger.clone(),
1953            dev.ip(),
1954            &[],
1955            false,
1956            NodeDataDir::new_test(None),
1957            dev.port(),
1958        )
1959        .unwrap();
1960        // Insert certificate to the storage.
1961        for certificate in certificates.iter() {
1962            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1963        }
1964        // Check that the current committee members are authorized validators.
1965        for i in 0..certificates.clone().len() {
1966            let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
1967            if i < committee_size {
1968                assert!(is_authorized);
1969            } else {
1970                assert!(!is_authorized);
1971            }
1972        }
1973    }
1974}