snarkos_node_router/
outbound.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Router,
18    messages::{Message, Ping},
19};
20use snarkos_node_sync_locators::BlockLocators;
21use snarkos_node_tcp::protocols::Writing;
22use snarkvm::prelude::Network;
23use std::io;
24
25use std::net::SocketAddr;
26use tokio::sync::oneshot;
27
28pub trait Outbound<N: Network>: Writing<Message = Message<N>> {
29    /// Returns a reference to the router.
30    fn router(&self) -> &Router<N>;
31
32    /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
33    fn is_block_synced(&self) -> bool;
34
35    /// Returns the number of blocks this node is behind the greatest peer height.
36    fn num_blocks_behind(&self) -> u32;
37
38    /// Sends a "Ping" message to the given peer.
39    fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) {
40        self.send(peer_ip, Message::Ping(Ping::new(self.router().node_type(), block_locators)));
41    }
42
43    /// Sends the given message to specified peer.
44    ///
45    /// This function returns as soon as the message is queued to be sent,
46    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
47    /// which can be used to determine when and whether the message has been delivered.
48    fn send(&self, peer_ip: SocketAddr, message: Message<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
49        // Determine whether to send the message.
50        if !self.can_send(peer_ip, &message) {
51            return None;
52        }
53        // Resolve the listener IP to the (ambiguous) peer address.
54        let peer_addr = match self.router().resolve_to_ambiguous(&peer_ip) {
55            Some(peer_addr) => peer_addr,
56            None => {
57                warn!("Unable to resolve the listener IP address '{peer_ip}'");
58                return None;
59            }
60        };
61        // If the message type is a block request, add it to the cache.
62        if let Message::BlockRequest(request) = message {
63            self.router().cache.insert_outbound_block_request(peer_ip, request);
64        }
65        // If the message type is a puzzle request, increment the cache.
66        if matches!(message, Message::PuzzleRequest(_)) {
67            self.router().cache.increment_outbound_puzzle_requests(peer_ip);
68        }
69        // If the message type is a peer request, increment the cache.
70        if matches!(message, Message::PeerRequest(_)) {
71            self.router().cache.increment_outbound_peer_requests(peer_ip);
72        }
73        // Retrieve the message name.
74        let name = message.name();
75        // Send the message to the peer.
76        trace!("Sending '{name}' to '{peer_ip}'");
77        let result = self.unicast(peer_addr, message);
78        // If the message was unable to be sent, disconnect.
79        if let Err(e) = &result {
80            warn!("Failed to send '{name}' to '{peer_ip}': {e}");
81            debug!("Disconnecting from '{peer_ip}' (unable to send)");
82            self.router().disconnect(peer_ip);
83        }
84        result.ok()
85    }
86
87    /// Sends the given message to every connected peer, excluding the sender and any specified peer IPs.
88    fn propagate(&self, message: Message<N>, excluded_peers: &[SocketAddr]) {
89        // TODO (howardwu): Serialize large messages once only.
90        // // Perform ahead-of-time, non-blocking serialization just once for applicable objects.
91        // if let Message::UnconfirmedSolution(ref mut message) = message {
92        //     if let Ok(serialized_solution) = Data::serialize(message.solution.clone()).await {
93        //         let _ = std::mem::replace(&mut message.solution, Data::Buffer(serialized_solution));
94        //     } else {
95        //         error!("Solution serialization is bugged");
96        //     }
97        // } else if let Message::UnconfirmedTransaction(ref mut message) = message {
98        //     if let Ok(serialized_transaction) = Data::serialize(message.transaction.clone()).await {
99        //         let _ = std::mem::replace(&mut message.transaction, Data::Buffer(serialized_transaction));
100        //     } else {
101        //         error!("Transaction serialization is bugged");
102        //     }
103        // }
104
105        // Prepare the peers to send to.
106        let connected_peers = self.router().connected_peers();
107        let peers = connected_peers.iter().filter(|peer_ip| !excluded_peers.contains(peer_ip));
108
109        // Iterate through all peers that are not the sender and excluded peers.
110        for peer_ip in peers {
111            self.send(*peer_ip, message.clone());
112        }
113    }
114
115    /// Sends the given message to every connected validator, excluding the sender and any specified IPs.
116    fn propagate_to_validators(&self, message: Message<N>, excluded_peers: &[SocketAddr]) {
117        // TODO (howardwu): Serialize large messages once only.
118        // // Perform ahead-of-time, non-blocking serialization just once for applicable objects.
119        // if let Message::UnconfirmedSolution(ref mut message) = message {
120        //     if let Ok(serialized_solution) = Data::serialize(message.solution.clone()).await {
121        //         let _ = std::mem::replace(&mut message.solution, Data::Buffer(serialized_solution));
122        //     } else {
123        //         error!("Solution serialization is bugged");
124        //     }
125        // } else if let Message::UnconfirmedTransaction(ref mut message) = message {
126        //     if let Ok(serialized_transaction) = Data::serialize(message.transaction.clone()).await {
127        //         let _ = std::mem::replace(&mut message.transaction, Data::Buffer(serialized_transaction));
128        //     } else {
129        //         error!("Transaction serialization is bugged");
130        //     }
131        // }
132
133        // Prepare the peers to send to.
134        let connected_validators = self.router().connected_validators();
135        let peers = connected_validators.iter().filter(|peer_ip| !excluded_peers.contains(peer_ip));
136
137        // Iterate through all validators that are not the sender and excluded validators.
138        for peer_ip in peers {
139            self.send(*peer_ip, message.clone());
140        }
141    }
142
143    /// Returns `true` if the message can be sent.
144    fn can_send(&self, peer_ip: SocketAddr, message: &Message<N>) -> bool {
145        // Ensure the peer is connected before sending.
146        if !self.router().is_connected(&peer_ip) {
147            warn!("Attempted to send to a non-connected peer {peer_ip}");
148            return false;
149        }
150        // Determine whether to send the message.
151        match message {
152            Message::UnconfirmedSolution(message) => {
153                // Update the timestamp for the unconfirmed solution.
154                let seen_before = self.router().cache.insert_outbound_solution(peer_ip, message.solution_id).is_some();
155                // Determine whether to send the solution.
156                !seen_before
157            }
158            Message::UnconfirmedTransaction(message) => {
159                // Update the timestamp for the unconfirmed transaction.
160                let seen_before =
161                    self.router().cache.insert_outbound_transaction(peer_ip, message.transaction_id).is_some();
162                // Determine whether to send the transaction.
163                !seen_before
164            }
165            // For all other message types, return `true`.
166            _ => true,
167        }
168    }
169}