nanocurrency_peering/
lib.rs

1use std::iter::IntoIterator;
2use std::io;
3use std::net::{SocketAddr, SocketAddrV6, ToSocketAddrs};
4use std::cell::RefCell;
5use std::rc::Rc;
6use std::time::{Duration, Instant};
7use std::collections::{hash_map, HashMap};
8use std::marker::PhantomData;
9use std::fmt::Debug;
10
11#[macro_use]
12extern crate log;
13
14#[macro_use]
15extern crate futures;
16use futures::{stream, Future, Sink, Stream};
17
18extern crate net2;
19use net2::UdpBuilder;
20
21extern crate bytes;
22
23extern crate tokio;
24use tokio::net::UdpSocket;
25use tokio::reactor::Handle;
26
27extern crate tokio_io;
28
29extern crate tokio_timer;
30use tokio_timer::Timer;
31
32extern crate rand;
33use rand::{thread_rng, Rng};
34
35extern crate nanocurrency_protocol;
36use nanocurrency_protocol::*;
37
38#[macro_use]
39extern crate nanocurrency_types;
40use nanocurrency_types::Network;
41
42mod udp_framed;
43
/// Number of seconds between successive rounds of keepalive messages.
const KEEPALIVE_INTERVAL: u64 = 60;
/// Number of seconds (five keepalive intervals) used as the cutoff when
/// expiring peers and new-peer backoff entries.
const KEEPALIVE_CUTOFF: u64 = 5 * KEEPALIVE_INTERVAL;
47
/// Converts any socket address into an IPv6 socket address.
///
/// IPv6 addresses are returned unchanged. IPv4 addresses are turned into
/// their IPv4-mapped IPv6 form with the same port, and with flow info and
/// scope id both set to zero.
pub fn addr_to_ipv6(addr: SocketAddr) -> SocketAddrV6 {
    let v4 = match addr {
        SocketAddr::V6(v6) => return v6,
        SocketAddr::V4(v4) => v4,
    };
    let mapped_ip = v4.ip().to_ipv6_mapped();
    SocketAddrV6::new(mapped_ip, v4.port(), 0, 0)
}
54
55struct IgnoreErrors<S: Stream, E>
56where
57    S::Error: Debug,
58{
59    inner: S,
60    err_phantom: PhantomData<E>,
61}
62
63impl<S: Stream, E: Debug> Stream for IgnoreErrors<S, E>
64where
65    S::Error: Debug,
66{
67    type Item = S::Item;
68    type Error = E;
69
70    fn poll(&mut self) -> Result<futures::Async<Option<S::Item>>, E> {
71        loop {
72            match self.inner.poll() {
73                Ok(x) => return Ok(x),
74                Err(err) => debug!("ignoring error: {:?}", err),
75            }
76        }
77    }
78}
79
80fn ignore_errors<S: Stream, E>(stream: S) -> IgnoreErrors<S, E>
81where
82    S::Error: Debug,
83{
84    IgnoreErrors {
85        inner: stream,
86        err_phantom: PhantomData,
87    }
88}
89
/// Bookkeeping data kept for each known peer.
///
/// Derives `Debug` and `Clone` so that code reading the peer table can log
/// entries and take snapshots of them.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Moment at which a message from this peer was last recorded.
    pub last_heard_from: Instant,
    /// Protocol version taken from the message header when the peer was
    /// added to the table.
    pub network_version: u8,
    // Private unit field so the struct cannot be constructed outside this
    // crate, which lets fields be added later without breaking users.
    // TODO replace with #[non_exhaustive] once stable
    _private: (),
}
96
97#[derive(Default)]
98pub struct PeeringManagerState {
99    peers: HashMap<SocketAddrV6, PeerInfo>,
100    rand_peers: RefCell<Vec<SocketAddrV6>>,
101    new_peer_backoff: HashMap<SocketAddrV6, Instant>,
102}
103
104impl PeeringManagerState {
105    pub fn get_rand_peers(&self, peers_out: &mut [SocketAddrV6]) {
106        if self.peers.len() < peers_out.len() {
107            for (peer, peer_out) in self.peers.keys().zip(peers_out.iter_mut()) {
108                *peer_out = *peer;
109            }
110            return;
111        }
112        let mut thread_rng = thread_rng();
113        let mut rand_peers = self.rand_peers.borrow_mut();
114        for peer_out in peers_out.iter_mut() {
115            if rand_peers.is_empty() {
116                rand_peers.extend(self.peers.keys());
117                thread_rng.shuffle(&mut rand_peers);
118            }
119            *peer_out = rand_peers.pop().unwrap();
120        }
121    }
122
123    pub fn peers(&self) -> &HashMap<SocketAddrV6, PeerInfo> {
124        &self.peers
125    }
126
127    fn contacted(&mut self, addr: SocketAddr, header: &MessageHeader) {
128        let addr = addr_to_ipv6(addr);
129        match self.peers.entry(addr) {
130            hash_map::Entry::Occupied(mut entry) => {
131                entry.get_mut().last_heard_from = Instant::now();
132            }
133            hash_map::Entry::Vacant(entry) => {
134                entry.insert(PeerInfo {
135                    last_heard_from: Instant::now(),
136                    network_version: header.version,
137                    _private: (),
138                });
139            }
140        }
141    }
142}
143
144pub struct PeeringManagerBuilder<F, I, II>
145where
146    I: Iterator<Item = (Message, SocketAddr)>,
147    II: IntoIterator<Item = (Message, SocketAddr), IntoIter = I>,
148    F: Fn(&PeeringManagerState, MessageHeader, Message, SocketAddr) -> II + 'static,
149{
150    use_official_peers: bool,
151    custom_peers: Vec<SocketAddr>,
152    listen_addr: SocketAddr,
153    network: Network,
154    message_handler: F,
155    state_base: Rc<RefCell<PeeringManagerState>>,
156    send_messages: Box<Stream<Item = (Message, SocketAddr), Error = ()>>,
157}
158
159impl<F, I, II> PeeringManagerBuilder<F, I, II>
160where
161    I: Iterator<Item = (Message, SocketAddr)>,
162    II: IntoIterator<Item = (Message, SocketAddr), IntoIter = I>,
163    F: Fn(&PeeringManagerState, MessageHeader, Message, SocketAddr) -> II + 'static,
164{
165    pub fn new(message_handler: F) -> PeeringManagerBuilder<F, I, II> {
166        PeeringManagerBuilder {
167            use_official_peers: true,
168            custom_peers: Vec::new(),
169            listen_addr: "[::]:7075".parse().unwrap(),
170            network: Network::Live,
171            message_handler,
172            state_base: Default::default(),
173            send_messages: Box::new(stream::empty()),
174        }
175    }
176
177    pub fn use_official_peers(mut self, value: bool) -> Self {
178        self.use_official_peers = value;
179        self
180    }
181
182    pub fn custom_peers(mut self, value: Vec<SocketAddr>) -> Self {
183        self.custom_peers = value;
184        self
185    }
186
187    pub fn listen_addr(mut self, value: SocketAddr) -> Self {
188        self.listen_addr = value;
189        self
190    }
191
192    pub fn network(mut self, value: Network) -> Self {
193        self.network = value;
194        self
195    }
196
197    pub fn state_base(mut self, value: Rc<RefCell<PeeringManagerState>>) -> Self {
198        self.state_base = value;
199        self
200    }
201
202    pub fn send_messages(
203        mut self,
204        value: Box<Stream<Item = (Message, SocketAddr), Error = ()>>,
205    ) -> Self {
206        self.send_messages = value;
207        self
208    }
209
210    pub fn run(self) -> io::Result<Box<Future<Item = (), Error = ()>>> {
211        let mut configured_peers: Vec<SocketAddrV6> = Vec::new();
212        if self.use_official_peers {
213            let official_domain = match self.network {
214                Network::Live => Some("rai.raiblocks.net:7075"),
215                Network::Beta => Some("rai-beta.raiblocks.net:7075"),
216                Network::Test => None,
217            };
218            if let Some(official_domain) = official_domain {
219                configured_peers.extend(official_domain.to_socket_addrs()?.map(addr_to_ipv6));
220            }
221        }
222        configured_peers.extend(self.custom_peers.into_iter().map(addr_to_ipv6));
223        let socket;
224        if cfg!(target_os = "windows") {
225            // TODO this is necessary on Windows but doesn't work well
226            let std_socket = UdpBuilder::new_v6()?
227                .only_v6(false)?
228                .bind(self.listen_addr)?;
229            socket = UdpSocket::from_std(std_socket, &Handle::current())?;
230        } else {
231            socket = UdpSocket::bind(&self.listen_addr)?;
232        }
233        let (sink, stream) = udp_framed::UdpFramed::new(socket, NanoCurrencyCodec).split();
234        let network = self.network;
235        let message_handler = self.message_handler;
236        let state_rc = self.state_base.clone();
237        let process_message = move |((header, msg), src)| {
238            let _: &MessageHeader = &header;
239            if header.network != network {
240                warn!("ignoring message from {:?} network", header.network);
241                return stream::iter_ok(Vec::new().into_iter());
242            }
243            let mut state = state_rc.borrow_mut();
244            state.contacted(src, &header);
245            trace!("got message from {:?}: {:?}", src, msg);
246            let mut output_messages = Vec::new();
247            match msg {
248                Message::Keepalive(new_peers) => {
249                    let state = &mut state;
250                    output_messages.extend(new_peers.to_vec().into_iter().filter_map(
251                        move |new_peer| {
252                            match state.new_peer_backoff.entry(new_peer) {
253                                hash_map::Entry::Occupied(mut entry) => {
254                                    let entry = entry.get_mut();
255                                    if *entry
256                                        > Instant::now() - Duration::from_secs(KEEPALIVE_CUTOFF)
257                                    {
258                                        return None;
259                                    }
260                                    *entry = Instant::now();
261                                }
262                                hash_map::Entry::Vacant(entry) => {
263                                    entry.insert(Instant::now());
264                                }
265                            }
266                            if state.peers.contains_key(&new_peer) {
267                                return None;
268                            }
269                            let ip = new_peer.ip().clone();
270                            if ip.octets().iter().all(|&x| x == 0) {
271                                return None;
272                            }
273                            if new_peer.port() == 0 {
274                                return None;
275                            }
276                            if ip.is_unspecified() || ip.is_loopback() || ip.is_multicast() {
277                                return None;
278                            }
279                            let mut rand_peers = [zero_v6_addr!(); 8];
280                            state.get_rand_peers(&mut rand_peers);
281                            Some((
282                                (network, Message::Keepalive(rand_peers)),
283                                SocketAddr::V6(new_peer),
284                            ))
285                        },
286                    ));
287                }
288                _ => {}
289            }
290            output_messages.extend(
291                (message_handler)(&state, header, msg, src)
292                    .into_iter()
293                    .map(|(m, a)| ((network, m), a)),
294            );
295            stream::iter_ok::<_, io::Error>(output_messages)
296        };
297        let process_messages = stream.map(process_message).flatten();
298        let state_rc = self.state_base.clone();
299        let timer = Timer::default();
300        let keepalive = stream::once(Ok(()))
301            .chain(timer.interval(Duration::from_secs(KEEPALIVE_INTERVAL)))
302            .map(move |_| {
303                let mut state = state_rc.borrow_mut();
304                let last_heard_cutoff = Instant::now() - Duration::from_secs(KEEPALIVE_CUTOFF);
305                state
306                    .new_peer_backoff
307                    .retain(|_, ts| *ts > last_heard_cutoff);
308                state
309                    .peers
310                    .retain(|_, info| info.last_heard_from > last_heard_cutoff);
311                debug!("peers: {:?}", state.peers.keys());
312                let mut keepalives = Vec::with_capacity(state.peers.len());
313                for addr in state
314                    .peers
315                    .iter()
316                    .map(|(a, _)| a)
317                    .chain(configured_peers.iter())
318                {
319                    let mut rand_peers = [zero_v6_addr!(); 8];
320                    state.get_rand_peers(&mut rand_peers);
321                    keepalives.push((
322                        (network, Message::Keepalive(rand_peers)),
323                        SocketAddr::V6(*addr),
324                    ));
325                }
326                stream::iter_ok::<_, io::Error>(keepalives.into_iter())
327            })
328            .flatten();
329        let send_messages = self.send_messages.map(move |(msg, addr)| ((network, msg), addr));
330        Ok(Box::new(
331            sink.send_all(
332                ignore_errors::<_, io::Error>(process_messages)
333                    .select(ignore_errors(keepalive))
334                    .select(ignore_errors(send_messages)),
335            ).map(|_| ())
336                .map_err(|_| ()),
337        ))
338    }
339}