snarkos_node_router/writing.rs
1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use super::*;
17
18use snarkos_node_sync_locators::BlockLocators;
19use snarkos_node_tcp::protocols::Writing;
20
21use std::io;
22use tokio::sync::oneshot;
23
24impl<N: Network> Router<N> {
25 /// Sends a "Ping" message to the given peer.
26 ///
27 /// Returns false if the peer does not exist or disconnected.
28 #[must_use]
29 pub fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) -> bool {
30 let result = self.send(peer_ip, Message::Ping(messages::Ping::new(self.node_type(), block_locators)));
31 result.is_some()
32 }
33
34 /// Sends the given message to specified peer.
35 ///
36 /// This function returns as soon as the message is queued to be sent,
37 /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
38 /// which can be used to determine when and whether the message has been delivered.
39 ///
40 /// This returns None, if the peer does not exist or disconnected.
41 pub fn send(&self, peer_ip: SocketAddr, message: Message<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
42 // Determine whether to send the message.
43 if !self.can_send(peer_ip, &message) {
44 return None;
45 }
46 // Resolve the listener IP to the (ambiguous) peer address.
47 let peer_addr = match self.resolve_to_ambiguous(&peer_ip) {
48 Some(peer_addr) => peer_addr,
49 None => {
50 warn!("Unable to resolve the listener IP address '{peer_ip}'");
51 return None;
52 }
53 };
54 // If the message type is a block request, add it to the cache.
55 if let Message::BlockRequest(request) = message {
56 self.cache.insert_outbound_block_request(peer_ip, request);
57 }
58 // If the message type is a puzzle request, increment the cache.
59 if matches!(message, Message::PuzzleRequest(_)) {
60 self.cache.increment_outbound_puzzle_requests(peer_ip);
61 }
62 // If the message type is a peer request, increment the cache.
63 if matches!(message, Message::PeerRequest(_)) {
64 self.cache.increment_outbound_peer_requests(peer_ip);
65 }
66 // Retrieve the message name.
67 let name = message.name();
68 // Send the message to the peer.
69 trace!("Sending '{name}' to '{peer_ip}'");
70 let result = self.unicast(peer_addr, message);
71 // If the message was unable to be sent, disconnect.
72 if let Err(e) = &result {
73 warn!("Failed to send '{name}' to '{peer_ip}': {e}");
74 debug!("Disconnecting from '{peer_ip}' (unable to send)");
75 self.disconnect(peer_ip);
76 }
77 result.ok()
78 }
79
80 /// Returns `true` if the message can be sent.
81 fn can_send(&self, peer_ip: SocketAddr, message: &Message<N>) -> bool {
82 // Ensure the peer is connected before sending.
83 if !self.is_connected(&peer_ip) {
84 warn!("Attempted to send to a non-connected peer {peer_ip}");
85 return false;
86 }
87 // Determine whether to send the message.
88 match message {
89 Message::UnconfirmedSolution(message) => {
90 // Update the timestamp for the unconfirmed solution.
91 let seen_before = self.cache.insert_outbound_solution(peer_ip, message.solution_id).is_some();
92 // Determine whether to send the solution.
93 !seen_before
94 }
95 Message::UnconfirmedTransaction(message) => {
96 // Update the timestamp for the unconfirmed transaction.
97 let seen_before = self.cache.insert_outbound_transaction(peer_ip, message.transaction_id).is_some();
98 // Determine whether to send the transaction.
99 !seen_before
100 }
101 // For all other message types, return `true`.
102 _ => true,
103 }
104 }
105}
106#[async_trait]
107impl<N: Network> Writing for Router<N> {
108 type Codec = MessageCodec<N>;
109 type Message = Message<N>;
110
111 /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
112 /// The `side` parameter indicates the connection side **from the node's perspective**.
113 fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
114 Default::default()
115 }
116}