1use std::iter::IntoIterator;
2use std::io;
3use std::net::{SocketAddr, SocketAddrV6, ToSocketAddrs};
4use std::cell::RefCell;
5use std::rc::Rc;
6use std::time::{Duration, Instant};
7use std::collections::{hash_map, HashMap};
8use std::marker::PhantomData;
9use std::fmt::Debug;
10
11#[macro_use]
12extern crate log;
13
14#[macro_use]
15extern crate futures;
16use futures::{stream, Future, Sink, Stream};
17
18extern crate net2;
19use net2::UdpBuilder;
20
21extern crate bytes;
22
23extern crate tokio;
24use tokio::net::UdpSocket;
25use tokio::reactor::Handle;
26
27extern crate tokio_io;
28
29extern crate tokio_timer;
30use tokio_timer::Timer;
31
32extern crate rand;
33use rand::{thread_rng, Rng};
34
35extern crate nanocurrency_protocol;
36use nanocurrency_protocol::*;
37
38#[macro_use]
39extern crate nanocurrency_types;
40use nanocurrency_types::Network;
41
42mod udp_framed;
43
44const KEEPALIVE_INTERVAL: u64 = 60;
46const KEEPALIVE_CUTOFF: u64 = KEEPALIVE_INTERVAL * 5;
47
48pub fn addr_to_ipv6(addr: SocketAddr) -> SocketAddrV6 {
49 match addr {
50 SocketAddr::V4(addr) => SocketAddrV6::new(addr.ip().to_ipv6_mapped(), addr.port(), 0, 0),
51 SocketAddr::V6(addr) => addr,
52 }
53}
54
55struct IgnoreErrors<S: Stream, E>
56where
57 S::Error: Debug,
58{
59 inner: S,
60 err_phantom: PhantomData<E>,
61}
62
63impl<S: Stream, E: Debug> Stream for IgnoreErrors<S, E>
64where
65 S::Error: Debug,
66{
67 type Item = S::Item;
68 type Error = E;
69
70 fn poll(&mut self) -> Result<futures::Async<Option<S::Item>>, E> {
71 loop {
72 match self.inner.poll() {
73 Ok(x) => return Ok(x),
74 Err(err) => debug!("ignoring error: {:?}", err),
75 }
76 }
77 }
78}
79
80fn ignore_errors<S: Stream, E>(stream: S) -> IgnoreErrors<S, E>
81where
82 S::Error: Debug,
83{
84 IgnoreErrors {
85 inner: stream,
86 err_phantom: PhantomData,
87 }
88}
89
90pub struct PeerInfo {
91 pub last_heard_from: Instant,
92 pub network_version: u8,
93 _private: (),
95}
96
97#[derive(Default)]
98pub struct PeeringManagerState {
99 peers: HashMap<SocketAddrV6, PeerInfo>,
100 rand_peers: RefCell<Vec<SocketAddrV6>>,
101 new_peer_backoff: HashMap<SocketAddrV6, Instant>,
102}
103
104impl PeeringManagerState {
105 pub fn get_rand_peers(&self, peers_out: &mut [SocketAddrV6]) {
106 if self.peers.len() < peers_out.len() {
107 for (peer, peer_out) in self.peers.keys().zip(peers_out.iter_mut()) {
108 *peer_out = *peer;
109 }
110 return;
111 }
112 let mut thread_rng = thread_rng();
113 let mut rand_peers = self.rand_peers.borrow_mut();
114 for peer_out in peers_out.iter_mut() {
115 if rand_peers.is_empty() {
116 rand_peers.extend(self.peers.keys());
117 thread_rng.shuffle(&mut rand_peers);
118 }
119 *peer_out = rand_peers.pop().unwrap();
120 }
121 }
122
123 pub fn peers(&self) -> &HashMap<SocketAddrV6, PeerInfo> {
124 &self.peers
125 }
126
127 fn contacted(&mut self, addr: SocketAddr, header: &MessageHeader) {
128 let addr = addr_to_ipv6(addr);
129 match self.peers.entry(addr) {
130 hash_map::Entry::Occupied(mut entry) => {
131 entry.get_mut().last_heard_from = Instant::now();
132 }
133 hash_map::Entry::Vacant(entry) => {
134 entry.insert(PeerInfo {
135 last_heard_from: Instant::now(),
136 network_version: header.version,
137 _private: (),
138 });
139 }
140 }
141 }
142}
143
144pub struct PeeringManagerBuilder<F, I, II>
145where
146 I: Iterator<Item = (Message, SocketAddr)>,
147 II: IntoIterator<Item = (Message, SocketAddr), IntoIter = I>,
148 F: Fn(&PeeringManagerState, MessageHeader, Message, SocketAddr) -> II + 'static,
149{
150 use_official_peers: bool,
151 custom_peers: Vec<SocketAddr>,
152 listen_addr: SocketAddr,
153 network: Network,
154 message_handler: F,
155 state_base: Rc<RefCell<PeeringManagerState>>,
156 send_messages: Box<Stream<Item = (Message, SocketAddr), Error = ()>>,
157}
158
159impl<F, I, II> PeeringManagerBuilder<F, I, II>
160where
161 I: Iterator<Item = (Message, SocketAddr)>,
162 II: IntoIterator<Item = (Message, SocketAddr), IntoIter = I>,
163 F: Fn(&PeeringManagerState, MessageHeader, Message, SocketAddr) -> II + 'static,
164{
165 pub fn new(message_handler: F) -> PeeringManagerBuilder<F, I, II> {
166 PeeringManagerBuilder {
167 use_official_peers: true,
168 custom_peers: Vec::new(),
169 listen_addr: "[::]:7075".parse().unwrap(),
170 network: Network::Live,
171 message_handler,
172 state_base: Default::default(),
173 send_messages: Box::new(stream::empty()),
174 }
175 }
176
177 pub fn use_official_peers(mut self, value: bool) -> Self {
178 self.use_official_peers = value;
179 self
180 }
181
182 pub fn custom_peers(mut self, value: Vec<SocketAddr>) -> Self {
183 self.custom_peers = value;
184 self
185 }
186
187 pub fn listen_addr(mut self, value: SocketAddr) -> Self {
188 self.listen_addr = value;
189 self
190 }
191
192 pub fn network(mut self, value: Network) -> Self {
193 self.network = value;
194 self
195 }
196
197 pub fn state_base(mut self, value: Rc<RefCell<PeeringManagerState>>) -> Self {
198 self.state_base = value;
199 self
200 }
201
202 pub fn send_messages(
203 mut self,
204 value: Box<Stream<Item = (Message, SocketAddr), Error = ()>>,
205 ) -> Self {
206 self.send_messages = value;
207 self
208 }
209
210 pub fn run(self) -> io::Result<Box<Future<Item = (), Error = ()>>> {
211 let mut configured_peers: Vec<SocketAddrV6> = Vec::new();
212 if self.use_official_peers {
213 let official_domain = match self.network {
214 Network::Live => Some("rai.raiblocks.net:7075"),
215 Network::Beta => Some("rai-beta.raiblocks.net:7075"),
216 Network::Test => None,
217 };
218 if let Some(official_domain) = official_domain {
219 configured_peers.extend(official_domain.to_socket_addrs()?.map(addr_to_ipv6));
220 }
221 }
222 configured_peers.extend(self.custom_peers.into_iter().map(addr_to_ipv6));
223 let socket;
224 if cfg!(target_os = "windows") {
225 let std_socket = UdpBuilder::new_v6()?
227 .only_v6(false)?
228 .bind(self.listen_addr)?;
229 socket = UdpSocket::from_std(std_socket, &Handle::current())?;
230 } else {
231 socket = UdpSocket::bind(&self.listen_addr)?;
232 }
233 let (sink, stream) = udp_framed::UdpFramed::new(socket, NanoCurrencyCodec).split();
234 let network = self.network;
235 let message_handler = self.message_handler;
236 let state_rc = self.state_base.clone();
237 let process_message = move |((header, msg), src)| {
238 let _: &MessageHeader = &header;
239 if header.network != network {
240 warn!("ignoring message from {:?} network", header.network);
241 return stream::iter_ok(Vec::new().into_iter());
242 }
243 let mut state = state_rc.borrow_mut();
244 state.contacted(src, &header);
245 trace!("got message from {:?}: {:?}", src, msg);
246 let mut output_messages = Vec::new();
247 match msg {
248 Message::Keepalive(new_peers) => {
249 let state = &mut state;
250 output_messages.extend(new_peers.to_vec().into_iter().filter_map(
251 move |new_peer| {
252 match state.new_peer_backoff.entry(new_peer) {
253 hash_map::Entry::Occupied(mut entry) => {
254 let entry = entry.get_mut();
255 if *entry
256 > Instant::now() - Duration::from_secs(KEEPALIVE_CUTOFF)
257 {
258 return None;
259 }
260 *entry = Instant::now();
261 }
262 hash_map::Entry::Vacant(entry) => {
263 entry.insert(Instant::now());
264 }
265 }
266 if state.peers.contains_key(&new_peer) {
267 return None;
268 }
269 let ip = new_peer.ip().clone();
270 if ip.octets().iter().all(|&x| x == 0) {
271 return None;
272 }
273 if new_peer.port() == 0 {
274 return None;
275 }
276 if ip.is_unspecified() || ip.is_loopback() || ip.is_multicast() {
277 return None;
278 }
279 let mut rand_peers = [zero_v6_addr!(); 8];
280 state.get_rand_peers(&mut rand_peers);
281 Some((
282 (network, Message::Keepalive(rand_peers)),
283 SocketAddr::V6(new_peer),
284 ))
285 },
286 ));
287 }
288 _ => {}
289 }
290 output_messages.extend(
291 (message_handler)(&state, header, msg, src)
292 .into_iter()
293 .map(|(m, a)| ((network, m), a)),
294 );
295 stream::iter_ok::<_, io::Error>(output_messages)
296 };
297 let process_messages = stream.map(process_message).flatten();
298 let state_rc = self.state_base.clone();
299 let timer = Timer::default();
300 let keepalive = stream::once(Ok(()))
301 .chain(timer.interval(Duration::from_secs(KEEPALIVE_INTERVAL)))
302 .map(move |_| {
303 let mut state = state_rc.borrow_mut();
304 let last_heard_cutoff = Instant::now() - Duration::from_secs(KEEPALIVE_CUTOFF);
305 state
306 .new_peer_backoff
307 .retain(|_, ts| *ts > last_heard_cutoff);
308 state
309 .peers
310 .retain(|_, info| info.last_heard_from > last_heard_cutoff);
311 debug!("peers: {:?}", state.peers.keys());
312 let mut keepalives = Vec::with_capacity(state.peers.len());
313 for addr in state
314 .peers
315 .iter()
316 .map(|(a, _)| a)
317 .chain(configured_peers.iter())
318 {
319 let mut rand_peers = [zero_v6_addr!(); 8];
320 state.get_rand_peers(&mut rand_peers);
321 keepalives.push((
322 (network, Message::Keepalive(rand_peers)),
323 SocketAddr::V6(*addr),
324 ));
325 }
326 stream::iter_ok::<_, io::Error>(keepalives.into_iter())
327 })
328 .flatten();
329 let send_messages = self.send_messages.map(move |(msg, addr)| ((network, msg), addr));
330 Ok(Box::new(
331 sink.send_all(
332 ignore_errors::<_, io::Error>(process_messages)
333 .select(ignore_errors(keepalive))
334 .select(ignore_errors(send_messages)),
335 ).map(|_| ())
336 .map_err(|_| ()),
337 ))
338 }
339}