1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use bitcoin::network::message::NetworkMessage;
use p2p::{
P2PControlSender, PeerId, PeerMessage, PeerMessageReceiver, PeerMessageSender
};
use rand::{RngCore, thread_rng};
use std::{
collections::HashMap,
sync::mpsc,
thread,
time::Duration
};
use timeout::{ExpectedReply, SharedTimeout};
const SECS: u64 = 60;
pub struct Ping {
p2p: P2PControlSender<NetworkMessage>,
timeout: SharedTimeout<NetworkMessage, ExpectedReply>,
asked: HashMap<PeerId, u64>
}
impl Ping {
pub fn new(p2p: P2PControlSender<NetworkMessage>, timeout: SharedTimeout<NetworkMessage, ExpectedReply>) -> PeerMessageSender<NetworkMessage> {
let (sender, receiver) = mpsc::sync_channel(p2p.back_pressure);
let mut ping = Ping { p2p, timeout, asked: HashMap::new() };
thread::Builder::new().name("ping".to_string()).spawn(move || { ping.run(receiver) }).unwrap();
PeerMessageSender::new(sender)
}
fn run(&mut self, receiver: PeerMessageReceiver<NetworkMessage>) {
loop {
while let Ok(msg) = receiver.recv_timeout(Duration::from_millis(SECS*1000)) {
match msg {
PeerMessage::Disconnected(pid,_) => {
self.timeout.lock().unwrap().forget(pid);
self.asked.remove(&pid);
},
PeerMessage::Incoming(pid, msg) => {
match msg {
NetworkMessage::Pong(n) => {
if self.asked.remove(&pid) == Some(n) {
self.timeout.lock().unwrap().received(pid, 1, ExpectedReply::Pong);
}
}
_ => { }
}
}
_ => {}
}
}
self.timeout.lock().unwrap().check(vec!(ExpectedReply::Pong));
for peer in self.p2p.peers() {
if !self.timeout.lock().unwrap().is_busy(peer) {
let ask = thread_rng().next_u64();
self.asked.insert(peer, ask);
self.timeout.lock().unwrap().expect(peer, 1, ExpectedReply::Pong);
self.p2p.send_network(peer, NetworkMessage::Ping(ask));
}
}
}
}
}