cs_mwc_bch/peer/
peer.rs

1use messages::{Message, MessageHeader, Ping, Version};
2use network::Network;
3use peer::atomic_reader::AtomicReader;
4use snowflake::ProcessUniqueId;
5use std::fmt;
6use std::hash::{Hash, Hasher};
7use std::io;
8use std::io::Write;
9use std::net::{IpAddr, Shutdown, SocketAddr, TcpStream};
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex, Weak};
12use std::thread;
13use std::time::{Duration, UNIX_EPOCH};
14use util::rx::{Observable, Observer, Single, Subject};
15use util::{secs_since, Error, Result};
16
17/// Time to wait for the initial TCP connection
18const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
19
20/// Time to wait for handshake messages before failing to connect
21const HANDSHAKE_READ_TIMEOUT: Duration = Duration::from_secs(3);
22
23/// Event emitted when a connection is established with the peer
24#[derive(Clone, Debug)]
25pub struct PeerConnected {
26    pub peer: Arc<Peer>,
27}
28
29/// Event emitted when the connection with the peer is terminated
30#[derive(Clone, Debug)]
31pub struct PeerDisconnected {
32    pub peer: Arc<Peer>,
33}
34
35/// Event emitted when the peer receives a network message
36#[derive(Clone, Debug)]
37pub struct PeerMessage {
38    pub peer: Arc<Peer>,
39    pub message: Message,
40}
41
42/// Node on the network to send and receive messages
43///
44/// It will setup a connection, respond to pings, and store basic properties about the connection,
45/// but any real logic to process messages will be handled outside. Network messages received will
46/// be published to an observable on the peer's receiver thread. Messages may be sent via send()
47/// from any thread. Once shutdown, the Peer may no longer be used.
48pub struct Peer {
49    /// Unique id for this connection
50    pub id: ProcessUniqueId,
51    /// IP address
52    pub ip: IpAddr,
53    /// Port
54    pub port: u16,
55    /// Network
56    pub network: Network,
57
58    pub(crate) connected_event: Single<PeerConnected>,
59    pub(crate) disconnected_event: Single<PeerDisconnected>,
60    pub(crate) messages: Subject<PeerMessage>,
61
62    tcp_writer: Mutex<Option<TcpStream>>,
63
64    connected: AtomicBool,
65    time_delta: Mutex<i64>,
66    minfee: Mutex<u64>,
67    sendheaders: AtomicBool,
68    sendcmpct: AtomicBool,
69    version: Mutex<Option<Version>>,
70
71    /// Weak reference to self so we can pass ourselves in emitted events. This is a
72    /// bit ugly, but we hopefully can able to remove it once arbitrary self types goes in.
73    weak_self: Mutex<Option<Weak<Peer>>>,
74}
75
76impl Peer {
77    /// Creates a new peer and begins connecting
78    pub fn connect(
79        ip: IpAddr,
80        port: u16,
81        network: Network,
82        version: Version,
83        connectable: fn(&Version) -> bool,
84    ) -> Arc<Peer> {
85        let peer = Arc::new(Peer {
86            id: ProcessUniqueId::new(),
87            ip,
88            port,
89            network,
90            connected_event: Single::new(),
91            disconnected_event: Single::new(),
92            messages: Subject::new(),
93            tcp_writer: Mutex::new(None),
94            connected: AtomicBool::new(false),
95            time_delta: Mutex::new(0),
96            minfee: Mutex::new(0),
97            sendheaders: AtomicBool::new(false),
98            sendcmpct: AtomicBool::new(false),
99            version: Mutex::new(None),
100            weak_self: Mutex::new(None),
101        });
102
103        *peer.weak_self.lock().unwrap() = Some(Arc::downgrade(&peer));
104
105        Peer::connect_internal(&peer, version, connectable);
106
107        peer
108    }
109
110    /// Sends a message to the peer
111    pub fn send(&self, message: &Message) -> Result<()> {
112        if !self.connected.load(Ordering::Relaxed) {
113            return Err(Error::IllegalState("Not connected".to_string()));
114        }
115
116        let mut io_error: Option<io::Error> = None;
117        {
118            let mut tcp_writer = self.tcp_writer.lock().unwrap();
119            let mut tcp_writer = match tcp_writer.as_mut() {
120                Some(tcp_writer) => tcp_writer,
121                None => return Err(Error::IllegalState("No tcp stream".to_string())),
122            };
123
124            debug!("{:?} Write {:#?}", self, message);
125
126            if let Err(e) = message.write(&mut tcp_writer, self.network.magic()) {
127                io_error = Some(e);
128            } else {
129                if let Err(e) = tcp_writer.flush() {
130                    io_error = Some(e);
131                }
132            }
133        }
134
135        match io_error {
136            Some(e) => {
137                self.disconnect();
138                Err(Error::IOError(e))
139            }
140            None => Ok(()),
141        }
142    }
143
144    /// Disconects and disables the peer
145    pub fn disconnect(&self) {
146        self.connected.swap(false, Ordering::Relaxed);
147
148        info!("{:?} Disconnecting", self);
149
150        let mut tcp_stream = self.tcp_writer.lock().unwrap();
151        if let Some(tcp_stream) = tcp_stream.as_mut() {
152            if let Err(e) = tcp_stream.shutdown(Shutdown::Both) {
153                warn!("{:?} Problem shutting down tcp stream: {:?}", self, e);
154            }
155        }
156
157        if let Some(peer) = self.strong_self() {
158            self.disconnected_event.next(&PeerDisconnected { peer });
159        }
160    }
161
162    /// Returns a Single that emits a message when connected
163    pub fn connected_event(&self) -> &impl Observable<PeerConnected> {
164        &self.connected_event
165    }
166
167    /// Returns a Single that emits a message when connected
168    pub fn disconnected_event(&self) -> &impl Observable<PeerDisconnected> {
169        &self.disconnected_event
170    }
171
172    /// Returns an Observable that emits network messages
173    pub fn messages(&self) -> &impl Observable<PeerMessage> {
174        &self.messages
175    }
176
177    /// Returns whether the peer is connected
178    pub fn connected(&self) -> bool {
179        self.connected.load(Ordering::Relaxed)
180    }
181
182    /// Returns the time difference in seconds between our time and theirs, which is valid after connecting
183    pub fn time_delta(&self) -> i64 {
184        *self.time_delta.lock().unwrap()
185    }
186
187    /// Returns the minimum fee this peer accepts in sats/1000bytes
188    pub fn minfee(&self) -> u64 {
189        *self.minfee.lock().unwrap()
190    }
191
192    /// Returns whether this peer may announce new blocks with headers instead of inv
193    pub fn sendheaders(&self) -> bool {
194        self.sendheaders.load(Ordering::Relaxed)
195    }
196
197    /// Returns whether compact blocks are supported
198    pub fn sendcmpct(&self) -> bool {
199        self.sendcmpct.load(Ordering::Relaxed)
200    }
201
202    /// Gets the version message received during the handshake
203    pub fn version(&self) -> Result<Version> {
204        match &*self.version.lock().unwrap() {
205            Some(ref version) => Ok(version.clone()),
206            None => Err(Error::IllegalState("Not connected".to_string())),
207        }
208    }
209
210    fn connect_internal(peer: &Arc<Peer>, version: Version, connectable: fn(&Version) -> bool) {
211        info!("{:?} Connecting to {:?}:{}", peer, peer.ip, peer.port);
212
213        let tpeer = peer.clone();
214
215        thread::spawn(move || {
216            let mut tcp_reader = match tpeer.handshake(version, connectable) {
217                Ok(tcp_stream) => tcp_stream,
218                Err(e) => {
219                    error!("Failed to complete handshake: {:?}", e);
220                    tpeer.disconnect();
221                    return;
222                }
223            };
224
225            // The peer is considered connected and may be written to now
226            info!("{:?} Connected to {:?}:{}", tpeer, tpeer.ip, tpeer.port);
227            tpeer.connected.store(true, Ordering::Relaxed);
228            tpeer.connected_event.next(&PeerConnected {
229                peer: tpeer.clone(),
230            });
231
232            let mut partial: Option<MessageHeader> = None;
233            let magic = tpeer.network.magic();
234
235            // Message reads over TCP must be all-or-nothing.
236            let mut tcp_reader = AtomicReader::new(&mut tcp_reader);
237
238            loop {
239                let message = match &partial {
240                    Some(header) => Message::read_partial(&mut tcp_reader, header),
241                    None => Message::read(&mut tcp_reader, magic),
242                };
243
244                // Always check the connected flag right after the blocking read so we exit right away,
245                // and also so that we don't mistake errors with the stream shutting down
246                if !tpeer.connected.load(Ordering::Relaxed) {
247                    return;
248                }
249
250                match message {
251                    Ok(message) => {
252                        if let Message::Partial(header) = message {
253                            partial = Some(header);
254                        } else {
255                            debug!("{:?} Read {:#?}", tpeer, message);
256                            partial = None;
257
258                            if let Err(e) = tpeer.handle_message(&message) {
259                                error!("{:?} Error handling message: {:?}", tpeer, e);
260                                tpeer.disconnect();
261                                return;
262                            }
263
264                            tpeer.messages.next(&PeerMessage {
265                                peer: tpeer.clone(),
266                                message,
267                            });
268                        }
269                    }
270                    Err(e) => {
271                        // If timeout, try again later. Otherwise, shutdown
272                        if let Error::IOError(ref e) = e {
273                            // Depending on platform, either TimedOut or WouldBlock may be returned to indicate a non-error timeout
274                            if e.kind() == io::ErrorKind::TimedOut
275                                || e.kind() == io::ErrorKind::WouldBlock
276                            {
277                                continue;
278                            }
279                        }
280
281                        error!("{:?} Error reading message {:?}", tpeer, e);
282                        tpeer.disconnect();
283                        return;
284                    }
285                }
286            }
287        });
288    }
289
290    fn handshake(
291        self: &Peer,
292        version: Version,
293        connectable: fn(&Version) -> bool,
294    ) -> Result<TcpStream> {
295        // Connect over TCP
296        let tcp_addr = SocketAddr::new(self.ip, self.port);
297        let mut tcp_stream = TcpStream::connect_timeout(&tcp_addr, CONNECT_TIMEOUT)?;
298        tcp_stream.set_nodelay(true)?; // Disable buffering
299        tcp_stream.set_read_timeout(Some(HANDSHAKE_READ_TIMEOUT))?;
300        tcp_stream.set_nonblocking(false)?;
301
302        // Write our version
303        let our_version = Message::Version(version);
304        debug!("{:?} Write {:#?}", self, our_version);
305        let magic = self.network.magic();
306        our_version.write(&mut tcp_stream, magic)?;
307
308        // Read their version
309        let msg = Message::read(&mut tcp_stream, magic)?;
310        debug!("{:?} Read {:#?}", self, msg);
311        let their_version = match msg {
312            Message::Version(version) => version,
313            _ => return Err(Error::BadData("Unexpected command".to_string())),
314        };
315
316        if !connectable(&their_version) {
317            return Err(Error::IllegalState("Peer is not connectable".to_string()));
318        }
319
320        let now = secs_since(UNIX_EPOCH) as i64;
321        *self.time_delta.lock().unwrap() = now - their_version.timestamp;
322        *self.version.lock().unwrap() = Some(their_version);
323
324        // Read their verack
325        let their_verack = Message::read(&mut tcp_stream, magic)?;
326        debug!("{:?} Read {:#?}", self, their_verack);
327        match their_verack {
328            Message::Verack => {}
329            _ => return Err(Error::BadData("Unexpected command".to_string())),
330        };
331
332        // Write our verack
333        debug!("{:?} Write {:#?}", self, Message::Verack);
334        Message::Verack.write(&mut tcp_stream, magic)?;
335
336        // Write a ping message because this seems to help with connection weirdness
337        // https://bitcoin.stackexchange.com/questions/49487/getaddr-not-returning-connected-node-addresses
338        let ping = Message::Ping(Ping {
339            nonce: secs_since(UNIX_EPOCH) as u64,
340        });
341        debug!("{:?} Write {:#?}", self, ping);
342        ping.write(&mut tcp_stream, magic)?;
343
344        // After handshake, clone TCP stream and save the write version
345        *self.tcp_writer.lock().unwrap() = Some(tcp_stream.try_clone()?);
346
347        // We don't need a timeout for the read. The peer will shutdown just fine.
348        // The read timeout doesn't work reliably across platforms anyway.
349        tcp_stream.set_read_timeout(None)?;
350
351        Ok(tcp_stream)
352    }
353
354    fn handle_message(&self, message: &Message) -> Result<()> {
355        // A subset of messages are handled directly by the peer
356        match message {
357            Message::FeeFilter(feefilter) => {
358                *self.minfee.lock().unwrap() = feefilter.minfee;
359            }
360            Message::Ping(ping) => {
361                let pong = Message::Pong(ping.clone());
362                self.send(&pong)?;
363            }
364            Message::SendHeaders => {
365                self.sendheaders.store(true, Ordering::Relaxed);
366            }
367            Message::SendCmpct(sendcmpct) => {
368                let enable = sendcmpct.use_cmpctblock();
369                self.sendcmpct.store(enable, Ordering::Relaxed);
370            }
371            _ => {}
372        }
373        Ok(())
374    }
375
376    fn strong_self(&self) -> Option<Arc<Peer>> {
377        match &*self.weak_self.lock().unwrap() {
378            Some(ref weak_peer) => weak_peer.upgrade(),
379            None => None,
380        }
381    }
382}
383
384impl PartialEq for Peer {
385    fn eq(&self, other: &Peer) -> bool {
386        self.id == other.id
387    }
388}
389
390impl Eq for Peer {}
391
392impl Hash for Peer {
393    fn hash<H: Hasher>(&self, state: &mut H) {
394        self.id.hash(state)
395    }
396}
397
398impl fmt::Debug for Peer {
399    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
400        f.write_str(&format!("[Peer {}]", self.id))
401    }
402}
403
404impl Drop for Peer {
405    fn drop(&mut self) {
406        self.disconnect();
407    }
408}