1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::{stats, ConnWriter, Direction, Message, NetworkError, Node, Payload};
use snarkvm_objects::Storage;
use std::{collections::HashMap, net::SocketAddr};
use parking_lot::RwLock;
use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};
type Channels = HashMap<SocketAddr, Sender<Message>>;
#[derive(Debug, Default)]
pub struct Outbound {
pub(crate) channels: RwLock<Channels>,
}
impl Outbound {
#[inline]
fn outbound_channel(&self, remote_address: SocketAddr) -> Result<Sender<Message>, NetworkError> {
Ok(self
.channels
.read()
.get(&remote_address)
.ok_or(NetworkError::OutboundChannelMissing)?
.clone())
}
}
impl<S: Storage + Send + Sync + 'static> Node<S> {
#[inline]
pub fn send_request(&self, request: Message) {
let target_addr = request.receiver();
match self.outbound.outbound_channel(target_addr) {
Ok(channel) => match channel.try_send(request) {
Ok(()) => {
metrics::increment_gauge!(stats::QUEUES_OUTBOUND, 1.0);
}
Err(TrySendError::Full(request)) => {
warn!(
"Couldn't send a {} to {}: the send channel is full",
request, target_addr
);
metrics::increment_counter!(stats::OUTBOUND_ALL_FAILURES);
}
Err(TrySendError::Closed(request)) => {
error!(
"Couldn't send a {} to {}: the send channel is closed",
request, target_addr
);
metrics::increment_counter!(stats::OUTBOUND_ALL_FAILURES);
}
},
Err(_) => {
warn!("Failed to send a {}: peer is disconnected", request);
metrics::increment_counter!(stats::OUTBOUND_ALL_FAILURES);
}
}
}
pub fn send_ping(&self, remote_address: SocketAddr) {
let current_block_height = if let Some(ref sync) = self.sync() {
sync.current_block_height()
} else {
0
};
self.peer_book.sending_ping(remote_address);
self.send_request(Message::new(
Direction::Outbound(remote_address),
Payload::Ping(current_block_height),
));
}
pub async fn listen_for_outbound_messages(&self, mut receiver: Receiver<Message>, writer: &mut ConnWriter) {
while let Some(message) = receiver.recv().await {
metrics::decrement_gauge!(stats::QUEUES_OUTBOUND, 1.0);
match writer.write_message(&message.payload).await {
Ok(_) => {
metrics::increment_counter!(stats::OUTBOUND_ALL_SUCCESSES);
}
Err(error) => {
warn!("Failed to send a {}: {}", message, error);
metrics::increment_counter!(stats::OUTBOUND_ALL_FAILURES);
}
}
}
}
}