1use messages::{Message, MessageHeader, Ping, Version};
2use network::Network;
3use peer::atomic_reader::AtomicReader;
4use snowflake::ProcessUniqueId;
5use std::fmt;
6use std::hash::{Hash, Hasher};
7use std::io;
8use std::io::Write;
9use std::net::{IpAddr, Shutdown, SocketAddr, TcpStream};
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex, Weak};
12use std::thread;
13use std::time::{Duration, UNIX_EPOCH};
14use util::rx::{Observable, Observer, Single, Subject};
15use util::{secs_since, Error, Result};
16
17const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
19
20const HANDSHAKE_READ_TIMEOUT: Duration = Duration::from_secs(3);
22
23#[derive(Clone, Debug)]
25pub struct PeerConnected {
26 pub peer: Arc<Peer>,
27}
28
29#[derive(Clone, Debug)]
31pub struct PeerDisconnected {
32 pub peer: Arc<Peer>,
33}
34
35#[derive(Clone, Debug)]
37pub struct PeerMessage {
38 pub peer: Arc<Peer>,
39 pub message: Message,
40}
41
42pub struct Peer {
49 pub id: ProcessUniqueId,
51 pub ip: IpAddr,
53 pub port: u16,
55 pub network: Network,
57
58 pub(crate) connected_event: Single<PeerConnected>,
59 pub(crate) disconnected_event: Single<PeerDisconnected>,
60 pub(crate) messages: Subject<PeerMessage>,
61
62 tcp_writer: Mutex<Option<TcpStream>>,
63
64 connected: AtomicBool,
65 time_delta: Mutex<i64>,
66 minfee: Mutex<u64>,
67 sendheaders: AtomicBool,
68 sendcmpct: AtomicBool,
69 version: Mutex<Option<Version>>,
70
71 weak_self: Mutex<Option<Weak<Peer>>>,
74}
75
76impl Peer {
77 pub fn connect(
79 ip: IpAddr,
80 port: u16,
81 network: Network,
82 version: Version,
83 connectable: fn(&Version) -> bool,
84 ) -> Arc<Peer> {
85 let peer = Arc::new(Peer {
86 id: ProcessUniqueId::new(),
87 ip,
88 port,
89 network,
90 connected_event: Single::new(),
91 disconnected_event: Single::new(),
92 messages: Subject::new(),
93 tcp_writer: Mutex::new(None),
94 connected: AtomicBool::new(false),
95 time_delta: Mutex::new(0),
96 minfee: Mutex::new(0),
97 sendheaders: AtomicBool::new(false),
98 sendcmpct: AtomicBool::new(false),
99 version: Mutex::new(None),
100 weak_self: Mutex::new(None),
101 });
102
103 *peer.weak_self.lock().unwrap() = Some(Arc::downgrade(&peer));
104
105 Peer::connect_internal(&peer, version, connectable);
106
107 peer
108 }
109
110 pub fn send(&self, message: &Message) -> Result<()> {
112 if !self.connected.load(Ordering::Relaxed) {
113 return Err(Error::IllegalState("Not connected".to_string()));
114 }
115
116 let mut io_error: Option<io::Error> = None;
117 {
118 let mut tcp_writer = self.tcp_writer.lock().unwrap();
119 let mut tcp_writer = match tcp_writer.as_mut() {
120 Some(tcp_writer) => tcp_writer,
121 None => return Err(Error::IllegalState("No tcp stream".to_string())),
122 };
123
124 debug!("{:?} Write {:#?}", self, message);
125
126 if let Err(e) = message.write(&mut tcp_writer, self.network.magic()) {
127 io_error = Some(e);
128 } else {
129 if let Err(e) = tcp_writer.flush() {
130 io_error = Some(e);
131 }
132 }
133 }
134
135 match io_error {
136 Some(e) => {
137 self.disconnect();
138 Err(Error::IOError(e))
139 }
140 None => Ok(()),
141 }
142 }
143
144 pub fn disconnect(&self) {
146 self.connected.swap(false, Ordering::Relaxed);
147
148 info!("{:?} Disconnecting", self);
149
150 let mut tcp_stream = self.tcp_writer.lock().unwrap();
151 if let Some(tcp_stream) = tcp_stream.as_mut() {
152 if let Err(e) = tcp_stream.shutdown(Shutdown::Both) {
153 warn!("{:?} Problem shutting down tcp stream: {:?}", self, e);
154 }
155 }
156
157 if let Some(peer) = self.strong_self() {
158 self.disconnected_event.next(&PeerDisconnected { peer });
159 }
160 }
161
162 pub fn connected_event(&self) -> &impl Observable<PeerConnected> {
164 &self.connected_event
165 }
166
167 pub fn disconnected_event(&self) -> &impl Observable<PeerDisconnected> {
169 &self.disconnected_event
170 }
171
172 pub fn messages(&self) -> &impl Observable<PeerMessage> {
174 &self.messages
175 }
176
177 pub fn connected(&self) -> bool {
179 self.connected.load(Ordering::Relaxed)
180 }
181
182 pub fn time_delta(&self) -> i64 {
184 *self.time_delta.lock().unwrap()
185 }
186
187 pub fn minfee(&self) -> u64 {
189 *self.minfee.lock().unwrap()
190 }
191
192 pub fn sendheaders(&self) -> bool {
194 self.sendheaders.load(Ordering::Relaxed)
195 }
196
197 pub fn sendcmpct(&self) -> bool {
199 self.sendcmpct.load(Ordering::Relaxed)
200 }
201
202 pub fn version(&self) -> Result<Version> {
204 match &*self.version.lock().unwrap() {
205 Some(ref version) => Ok(version.clone()),
206 None => Err(Error::IllegalState("Not connected".to_string())),
207 }
208 }
209
210 fn connect_internal(peer: &Arc<Peer>, version: Version, connectable: fn(&Version) -> bool) {
211 info!("{:?} Connecting to {:?}:{}", peer, peer.ip, peer.port);
212
213 let tpeer = peer.clone();
214
215 thread::spawn(move || {
216 let mut tcp_reader = match tpeer.handshake(version, connectable) {
217 Ok(tcp_stream) => tcp_stream,
218 Err(e) => {
219 error!("Failed to complete handshake: {:?}", e);
220 tpeer.disconnect();
221 return;
222 }
223 };
224
225 info!("{:?} Connected to {:?}:{}", tpeer, tpeer.ip, tpeer.port);
227 tpeer.connected.store(true, Ordering::Relaxed);
228 tpeer.connected_event.next(&PeerConnected {
229 peer: tpeer.clone(),
230 });
231
232 let mut partial: Option<MessageHeader> = None;
233 let magic = tpeer.network.magic();
234
235 let mut tcp_reader = AtomicReader::new(&mut tcp_reader);
237
238 loop {
239 let message = match &partial {
240 Some(header) => Message::read_partial(&mut tcp_reader, header),
241 None => Message::read(&mut tcp_reader, magic),
242 };
243
244 if !tpeer.connected.load(Ordering::Relaxed) {
247 return;
248 }
249
250 match message {
251 Ok(message) => {
252 if let Message::Partial(header) = message {
253 partial = Some(header);
254 } else {
255 debug!("{:?} Read {:#?}", tpeer, message);
256 partial = None;
257
258 if let Err(e) = tpeer.handle_message(&message) {
259 error!("{:?} Error handling message: {:?}", tpeer, e);
260 tpeer.disconnect();
261 return;
262 }
263
264 tpeer.messages.next(&PeerMessage {
265 peer: tpeer.clone(),
266 message,
267 });
268 }
269 }
270 Err(e) => {
271 if let Error::IOError(ref e) = e {
273 if e.kind() == io::ErrorKind::TimedOut
275 || e.kind() == io::ErrorKind::WouldBlock
276 {
277 continue;
278 }
279 }
280
281 error!("{:?} Error reading message {:?}", tpeer, e);
282 tpeer.disconnect();
283 return;
284 }
285 }
286 }
287 });
288 }
289
290 fn handshake(
291 self: &Peer,
292 version: Version,
293 connectable: fn(&Version) -> bool,
294 ) -> Result<TcpStream> {
295 let tcp_addr = SocketAddr::new(self.ip, self.port);
297 let mut tcp_stream = TcpStream::connect_timeout(&tcp_addr, CONNECT_TIMEOUT)?;
298 tcp_stream.set_nodelay(true)?; tcp_stream.set_read_timeout(Some(HANDSHAKE_READ_TIMEOUT))?;
300 tcp_stream.set_nonblocking(false)?;
301
302 let our_version = Message::Version(version);
304 debug!("{:?} Write {:#?}", self, our_version);
305 let magic = self.network.magic();
306 our_version.write(&mut tcp_stream, magic)?;
307
308 let msg = Message::read(&mut tcp_stream, magic)?;
310 debug!("{:?} Read {:#?}", self, msg);
311 let their_version = match msg {
312 Message::Version(version) => version,
313 _ => return Err(Error::BadData("Unexpected command".to_string())),
314 };
315
316 if !connectable(&their_version) {
317 return Err(Error::IllegalState("Peer is not connectable".to_string()));
318 }
319
320 let now = secs_since(UNIX_EPOCH) as i64;
321 *self.time_delta.lock().unwrap() = now - their_version.timestamp;
322 *self.version.lock().unwrap() = Some(their_version);
323
324 let their_verack = Message::read(&mut tcp_stream, magic)?;
326 debug!("{:?} Read {:#?}", self, their_verack);
327 match their_verack {
328 Message::Verack => {}
329 _ => return Err(Error::BadData("Unexpected command".to_string())),
330 };
331
332 debug!("{:?} Write {:#?}", self, Message::Verack);
334 Message::Verack.write(&mut tcp_stream, magic)?;
335
336 let ping = Message::Ping(Ping {
339 nonce: secs_since(UNIX_EPOCH) as u64,
340 });
341 debug!("{:?} Write {:#?}", self, ping);
342 ping.write(&mut tcp_stream, magic)?;
343
344 *self.tcp_writer.lock().unwrap() = Some(tcp_stream.try_clone()?);
346
347 tcp_stream.set_read_timeout(None)?;
350
351 Ok(tcp_stream)
352 }
353
354 fn handle_message(&self, message: &Message) -> Result<()> {
355 match message {
357 Message::FeeFilter(feefilter) => {
358 *self.minfee.lock().unwrap() = feefilter.minfee;
359 }
360 Message::Ping(ping) => {
361 let pong = Message::Pong(ping.clone());
362 self.send(&pong)?;
363 }
364 Message::SendHeaders => {
365 self.sendheaders.store(true, Ordering::Relaxed);
366 }
367 Message::SendCmpct(sendcmpct) => {
368 let enable = sendcmpct.use_cmpctblock();
369 self.sendcmpct.store(enable, Ordering::Relaxed);
370 }
371 _ => {}
372 }
373 Ok(())
374 }
375
376 fn strong_self(&self) -> Option<Arc<Peer>> {
377 match &*self.weak_self.lock().unwrap() {
378 Some(ref weak_peer) => weak_peer.upgrade(),
379 None => None,
380 }
381 }
382}
383
384impl PartialEq for Peer {
385 fn eq(&self, other: &Peer) -> bool {
386 self.id == other.id
387 }
388}
389
390impl Eq for Peer {}
391
392impl Hash for Peer {
393 fn hash<H: Hasher>(&self, state: &mut H) {
394 self.id.hash(state)
395 }
396}
397
398impl fmt::Debug for Peer {
399 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
400 f.write_str(&format!("[Peer {}]", self.id))
401 }
402}
403
404impl Drop for Peer {
405 fn drop(&mut self) {
406 self.disconnect();
407 }
408}