1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright (C) 2019-2023 Aleo Systems Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{
    messages::{Message, Ping},
    Router,
};
use snarkos_node_sync_locators::BlockLocators;
use snarkos_node_tcp::protocols::Writing;
use snarkvm::prelude::Network;
use std::io;

use std::net::SocketAddr;
use tokio::sync::oneshot;

/// Outbound message helpers built on top of the [`Writing`] protocol and the [`Router`].
pub trait Outbound<N: Network>: Writing<Message = Message<N>> {
    /// Returns a reference to the router.
    fn router(&self) -> &Router<N>;

    /// Builds a "Ping" message from this node's type and the optional block locators,
    /// then sends it to the given peer.
    fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) {
        let ping = Ping::new(self.router().node_type(), block_locators);
        self.send(peer_ip, Message::Ping(ping));
    }

    /// Sends the given message to the specified peer.
    ///
    /// This function returns as soon as the message is queued to be sent,
    /// without waiting for the actual delivery; instead, the caller is provided with a
    /// [`oneshot::Receiver`] which can be used to determine when and whether the message
    /// has been delivered.
    ///
    /// Returns `None` when nothing was queued, i.e. when:
    /// - [`Outbound::can_send`] rejected the message,
    /// - the listener IP could not be resolved to a peer address, or
    /// - the underlying `unicast` call failed (the peer is then disconnected).
    fn send(&self, peer_ip: SocketAddr, message: Message<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
        // Bail out early if this message should not go to this peer.
        if !self.can_send(peer_ip, &message) {
            return None;
        }
        let router = self.router();
        // Map the listener IP to the (ambiguous) peer address.
        let peer_addr = if let Some(peer_addr) = router.resolve_to_ambiguous(&peer_ip) {
            peer_addr
        } else {
            warn!("Unable to resolve the listener IP address '{peer_ip}'");
            return None;
        };
        // Record outbound block requests and count outbound puzzle requests in the cache.
        match &message {
            Message::BlockRequest(request) => {
                router.cache.insert_outbound_block_request(peer_ip, *request);
            }
            Message::PuzzleRequest(_) => {
                router.cache.increment_outbound_puzzle_requests(peer_ip);
            }
            _ => {}
        }
        // Grab the name now; the message itself is moved into `unicast` below.
        let name = message.name();
        trace!("Sending '{name}' to '{peer_ip}'");
        match self.unicast(peer_addr, message) {
            Ok(receiver) => Some(receiver),
            // A failed send is treated as a broken connection: log it and drop the peer.
            Err(e) => {
                warn!("Failed to send '{name}' to '{peer_ip}': {e}");
                debug!("Disconnecting from '{peer_ip}' (unable to send)");
                router.disconnect(peer_ip);
                None
            }
        }
    }

    /// Sends a clone of the given message to every connected peer
    /// whose IP is not listed in `excluded_peers`.
    fn propagate(&self, message: Message<N>, excluded_peers: &[SocketAddr]) {
        // TODO (howardwu): Serialize large messages once only.
        //  The idea is to serialize the payload of `UnconfirmedSolution` and
        //  `UnconfirmedTransaction` messages ahead of time (swapping it for a `Data::Buffer`),
        //  so that the work is not repeated for every recipient.

        let connected_peers = self.router().connected_peers();
        for peer_ip in connected_peers.iter() {
            // Skip every peer the caller asked to leave out (e.g. the original sender).
            if excluded_peers.contains(peer_ip) {
                continue;
            }
            self.send(*peer_ip, message.clone());
        }
    }

    /// Sends a clone of the given message to every connected validator
    /// whose IP is not listed in `excluded_peers`.
    fn propagate_to_validators(&self, message: Message<N>, excluded_peers: &[SocketAddr]) {
        // TODO (howardwu): Serialize large messages once only.
        //  The idea is to serialize the payload of `UnconfirmedSolution` and
        //  `UnconfirmedTransaction` messages ahead of time (swapping it for a `Data::Buffer`),
        //  so that the work is not repeated for every recipient.

        let connected_validators = self.router().connected_validators();
        for peer_ip in connected_validators.iter() {
            // Skip every validator the caller asked to leave out (e.g. the original sender).
            if excluded_peers.contains(peer_ip) {
                continue;
            }
            self.send(*peer_ip, message.clone());
        }
    }

    /// Returns `true` if the message can be sent.
    ///
    /// A message is never sent to a peer that is not connected. Unconfirmed solutions and
    /// unconfirmed transactions are additionally checked against the router's outbound cache:
    /// the (peer, ID) pair is inserted, and the message is only allowed if no previous entry
    /// existed. Every other message type is always allowed.
    fn can_send(&self, peer_ip: SocketAddr, message: &Message<N>) -> bool {
        let router = self.router();
        // Refuse to send to peers we are not connected to.
        if !router.is_connected(&peer_ip) {
            warn!("Attempted to send to a non-connected peer {peer_ip}");
            return false;
        }
        match message {
            // Insert into the outbound solution cache; send only if it was not there before.
            Message::UnconfirmedSolution(message) => {
                router.cache.insert_outbound_solution(peer_ip, message.solution_id).is_none()
            }
            // Insert into the outbound transaction cache; send only if it was not there before.
            Message::UnconfirmedTransaction(message) => {
                router.cache.insert_outbound_transaction(peer_ip, message.transaction_id).is_none()
            }
            // All other message types may always be sent.
            _ => true,
        }
    }
}