snarkos_node_router/
writing.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use super::*;
17
18use snarkos_node_sync_locators::BlockLocators;
19use snarkos_node_tcp::protocols::Writing;
20
21use std::io;
22use tokio::sync::oneshot;
23
24impl<N: Network> Router<N> {
25    /// Sends a "Ping" message to the given peer.
26    ///
27    /// Returns false if the peer does not exist or disconnected.
28    #[must_use]
29    pub fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) -> bool {
30        let result = self.send(peer_ip, Message::Ping(messages::Ping::new(self.node_type(), block_locators)));
31        result.is_some()
32    }
33
34    /// Sends the given message to specified peer.
35    ///
36    /// This function returns as soon as the message is queued to be sent,
37    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
38    /// which can be used to determine when and whether the message has been delivered.
39    ///
40    /// This returns None, if the peer does not exist or disconnected.
41    pub fn send(&self, peer_ip: SocketAddr, message: Message<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
42        // Determine whether to send the message.
43        if !self.can_send(peer_ip, &message) {
44            return None;
45        }
46        // Resolve the listener IP to the (ambiguous) peer address.
47        let peer_addr = match self.resolve_to_ambiguous(&peer_ip) {
48            Some(peer_addr) => peer_addr,
49            None => {
50                warn!("Unable to resolve the listener IP address '{peer_ip}'");
51                return None;
52            }
53        };
54        // If the message type is a block request, add it to the cache.
55        if let Message::BlockRequest(request) = message {
56            self.cache.insert_outbound_block_request(peer_ip, request);
57        }
58        // If the message type is a puzzle request, increment the cache.
59        if matches!(message, Message::PuzzleRequest(_)) {
60            self.cache.increment_outbound_puzzle_requests(peer_ip);
61        }
62        // If the message type is a peer request, increment the cache.
63        if matches!(message, Message::PeerRequest(_)) {
64            self.cache.increment_outbound_peer_requests(peer_ip);
65        }
66        // Retrieve the message name.
67        let name = message.name();
68        // Send the message to the peer.
69        trace!("Sending '{name}' to '{peer_ip}'");
70        let result = self.unicast(peer_addr, message);
71        // If the message was unable to be sent, disconnect.
72        if let Err(e) = &result {
73            warn!("Failed to send '{name}' to '{peer_ip}': {e}");
74            debug!("Disconnecting from '{peer_ip}' (unable to send)");
75            self.disconnect(peer_ip);
76        }
77        result.ok()
78    }
79
80    /// Returns `true` if the message can be sent.
81    fn can_send(&self, peer_ip: SocketAddr, message: &Message<N>) -> bool {
82        // Ensure the peer is connected before sending.
83        if !self.is_connected(&peer_ip) {
84            warn!("Attempted to send to a non-connected peer {peer_ip}");
85            return false;
86        }
87        // Determine whether to send the message.
88        match message {
89            Message::UnconfirmedSolution(message) => {
90                // Update the timestamp for the unconfirmed solution.
91                let seen_before = self.cache.insert_outbound_solution(peer_ip, message.solution_id).is_some();
92                // Determine whether to send the solution.
93                !seen_before
94            }
95            Message::UnconfirmedTransaction(message) => {
96                // Update the timestamp for the unconfirmed transaction.
97                let seen_before = self.cache.insert_outbound_transaction(peer_ip, message.transaction_id).is_some();
98                // Determine whether to send the transaction.
99                !seen_before
100            }
101            // For all other message types, return `true`.
102            _ => true,
103        }
104    }
105}
106#[async_trait]
107impl<N: Network> Writing for Router<N> {
108    type Codec = MessageCodec<N>;
109    type Message = Message<N>;
110
111    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
112    /// The `side` parameter indicates the connection side **from the node's perspective**.
113    fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
114        Default::default()
115    }
116}