1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::{
io::{Error as IoError, ErrorKind},
time::Duration,
};
use tokio::{net::TcpStream, time::timeout};
use snarkos_metrics::{self as metrics, connections::*, wrapped_mpsc};
use crate::{NetworkError, Node, Peer, PeerEvent, PeerEventData, PeerHandle, Version};
use super::{network::PeerIOHandle, PeerAction};
const CONNECTION_TIMEOUT_SECS: u64 = 3;
impl Peer {
pub fn connect(mut self, node: Node, event_target: wrapped_mpsc::Sender<PeerEvent>) {
let (sender, receiver) = wrapped_mpsc::channel::<PeerAction>(snarkos_metrics::queues::PEER_EVENTS, 64);
tokio::spawn(async move {
self.set_connecting();
match self.inner_connect(node.version()).await {
Err(e) => {
self.quality.connect_failed();
event_target
.send(PeerEvent {
address: self.address,
data: PeerEventData::FailHandshake,
})
.await
.ok();
self.fail();
if !e.is_trivial() {
error!(
"failed to send outgoing connection to peer '{}': '{:?}'",
self.address, e
);
} else {
warn!(
"failed to send outgoing connection to peer '{}': '{:?}'",
self.address, e
);
}
}
Ok(network) => {
self.set_connected();
metrics::increment_gauge!(CONNECTED, 1.0);
event_target
.send(PeerEvent {
address: self.address,
data: PeerEventData::Connected(PeerHandle { sender: sender.clone() }),
})
.await
.ok();
if let Err(e) = self.run(node, network, receiver).await {
if !e.is_trivial() {
self.fail();
error!(
"unrecoverable failure communicating to outbound peer '{}': '{:?}'",
self.address, e
);
} else {
warn!(
"unrecoverable failure communicating to outbound peer '{}': '{:?}'",
self.address, e
);
}
}
metrics::decrement_gauge!(CONNECTED, 1.0);
}
}
self.set_disconnected();
event_target
.send(PeerEvent {
address: self.address,
data: PeerEventData::Disconnect(Box::new(self)),
})
.await
.ok();
});
}
async fn inner_connect(&mut self, our_version: Version) -> Result<PeerIOHandle, NetworkError> {
metrics::increment_gauge!(CONNECTING, 1.0);
let _x = defer::defer(|| metrics::decrement_gauge!(CONNECTING, 1.0));
match timeout(
Duration::from_secs(CONNECTION_TIMEOUT_SECS),
TcpStream::connect(self.address),
)
.await
{
Ok(stream) => self.inner_handshake_initiator(stream?, our_version).await,
Err(_) => Err(NetworkError::Io(IoError::new(
ErrorKind::TimedOut,
"connection timed out",
))),
}
}
}