1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
pub extern crate dpt;
pub extern crate rlpx;
#[macro_use]
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate secp256k1;
extern crate etcommon_bigint as bigint;
use dpt::{DPTNode, DPTStream, DPTMessage};
use rlpx::{Node, CapabilityInfo, RLPxStream};
use tokio_core::reactor::{Handle, Timeout};
use std::time::Duration;
use std::net::SocketAddr;
use std::io;
use secp256k1::key::SecretKey;
use futures::{StartSend, Async, Poll, Stream, Sink, Future, future};
use bigint::H512;
pub struct DevP2PStream {
dpt: DPTStream,
rlpx: RLPxStream,
ping_interval: Duration,
ping_timeout_interval: Duration,
ping_timeout: Timeout,
optimal_peers_len: usize,
handle: Handle,
}
impl DevP2PStream {
pub fn new(addr: &SocketAddr,
handle: &Handle, secret_key: SecretKey,
protocol_version: usize, client_version: String,
capabilities: Vec<CapabilityInfo>,
bootstrap_nodes: Vec<DPTNode>,
ping_interval: Duration, ping_timeout_interval: Duration,
optimal_peers_len: usize) -> Result<Self, io::Error> {
let port = addr.port();
let dpt = DPTStream::new(addr, handle, secret_key.clone(),
bootstrap_nodes, port)?;
let rlpx = RLPxStream::new(handle, secret_key.clone(),
protocol_version, client_version,
capabilities, port);
let ping_timeout = Timeout::new(ping_interval, handle)?;
Ok(DevP2PStream {
dpt, rlpx, ping_interval, ping_timeout,
ping_timeout_interval,
optimal_peers_len, handle: handle.clone()
})
}
fn poll_dpt_receive_peers(&mut self) -> Poll<(), io::Error> {
loop {
let node = match self.dpt.poll() {
Ok(Async::Ready(Some(node))) => node,
Ok(_) => return Ok(Async::Ready(())),
Err(e) => return Err(e),
};
self.rlpx.add_peer(&SocketAddr::new(node.address, node.tcp_port), node.id);
}
}
fn poll_dpt_request_new_peers(&mut self) -> Poll<(), io::Error> {
if self.rlpx.active_peers().len() < self.optimal_peers_len {
self.dpt.start_send(DPTMessage::RequestNewPeer)?;
self.dpt.poll_complete()?;
}
Ok(Async::Ready(()))
}
fn poll_dpt_ping(&mut self) -> Poll<(), io::Error> {
let result = self.ping_timeout.poll()?;
match result {
Async::NotReady => return Ok(Async::Ready(())),
Async::Ready(()) => {
self.dpt.start_send(DPTMessage::Ping(Timeout::new(
self.ping_timeout_interval, &self.handle)?))?;
self.dpt.poll_complete()?;
},
}
Ok(Async::Ready(()))
}
}
impl Stream for DevP2PStream {
type Item = (H512, CapabilityInfo, usize, Vec<u8>);
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.poll_dpt_receive_peers()?;
let result = self.rlpx.poll()?;
self.poll_dpt_request_new_peers()?;
self.poll_dpt_ping()?;
Ok(result)
}
}
impl Sink for DevP2PStream {
type SinkItem = (Node, CapabilityInfo, usize, Vec<u8>);
type SinkError = io::Error;
fn start_send(&mut self, val: (Node, CapabilityInfo, usize, Vec<u8>)) -> StartSend<Self::SinkItem, Self::SinkError> {
self.poll_dpt_receive_peers()?;
let result = self.rlpx.start_send(val)?;
self.poll_dpt_request_new_peers()?;
self.poll_dpt_ping()?;
Ok(result)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
try_ready!(self.dpt.poll_complete());
try_ready!(self.rlpx.poll_complete());
Ok(Async::Ready(()))
}
}