use messages::{Message, MessageHeader, Ping, Version};
use network::Network;
use peer::atomic_reader::AtomicReader;
use snowflake::ProcessUniqueId;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::io;
use std::io::Write;
use std::net::{IpAddr, Shutdown, SocketAddr, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::thread;
use std::time::{Duration, UNIX_EPOCH};
use util::rx::{Observable, Observer, Single, Subject};
use util::{secs_since, Error, Result};
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const HANDSHAKE_READ_TIMEOUT: Duration = Duration::from_secs(3);
#[derive(Clone, Debug)]
pub struct PeerConnected {
pub peer: Arc<Peer>,
}
#[derive(Clone, Debug)]
pub struct PeerDisconnected {
pub peer: Arc<Peer>,
}
#[derive(Clone, Debug)]
pub struct PeerMessage {
pub peer: Arc<Peer>,
pub message: Message,
}
pub struct Peer {
pub id: ProcessUniqueId,
pub ip: IpAddr,
pub port: u16,
pub network: Network,
pub(crate) connected_event: Single<PeerConnected>,
pub(crate) disconnected_event: Single<PeerDisconnected>,
pub(crate) messages: Subject<PeerMessage>,
tcp_writer: Mutex<Option<TcpStream>>,
connected: AtomicBool,
time_delta: Mutex<i64>,
minfee: Mutex<u64>,
sendheaders: AtomicBool,
sendcmpct: AtomicBool,
version: Mutex<Option<Version>>,
weak_self: Mutex<Option<Weak<Peer>>>,
}
impl Peer {
pub fn connect(
ip: IpAddr,
port: u16,
network: Network,
version: Version,
min_start_height: i32,
required_services: u64,
) -> Arc<Peer> {
let peer = Arc::new(Peer {
id: ProcessUniqueId::new(),
ip,
port,
network,
connected_event: Single::new(),
disconnected_event: Single::new(),
messages: Subject::new(),
tcp_writer: Mutex::new(None),
connected: AtomicBool::new(false),
time_delta: Mutex::new(0),
minfee: Mutex::new(0),
sendheaders: AtomicBool::new(false),
sendcmpct: AtomicBool::new(false),
version: Mutex::new(None),
weak_self: Mutex::new(None),
});
*peer.weak_self.lock().unwrap() = Some(Arc::downgrade(&peer));
Peer::connect_internal(&peer, version, min_start_height, required_services);
peer
}
pub fn send(&self, message: &Message) -> Result<()> {
if !self.connected.load(Ordering::Relaxed) {
return Err(Error::IllegalState("Not connected".to_string()));
}
let mut io_error: Option<io::Error> = None;
{
let mut tcp_writer = self.tcp_writer.lock().unwrap();
let mut tcp_writer = match tcp_writer.as_mut() {
Some(tcp_writer) => tcp_writer,
None => return Err(Error::IllegalState("No tcp stream".to_string())),
};
debug!("{:?} Write {:#?}", self, message);
if let Err(e) = message.write(&mut tcp_writer, self.network.magic()) {
io_error = Some(e);
} else {
if let Err(e) = tcp_writer.flush() {
io_error = Some(e);
}
}
}
match io_error {
Some(e) => {
self.disconnect();
Err(Error::IOError(e))
}
None => Ok(()),
}
}
pub fn disconnect(&self) {
self.connected.swap(false, Ordering::Relaxed);
info!("{:?} Disconnecting", self);
let mut tcp_stream = self.tcp_writer.lock().unwrap();
if let Some(tcp_stream) = tcp_stream.as_mut() {
if let Err(e) = tcp_stream.shutdown(Shutdown::Both) {
warn!("{:?} Problem shutting down tcp stream: {:?}", self, e);
}
}
if let Some(peer) = self.strong_self() {
self.disconnected_event.next(&PeerDisconnected { peer });
}
}
pub fn connected_event(&self) -> &impl Observable<PeerConnected> {
&self.connected_event
}
pub fn disconnected_event(&self) -> &impl Observable<PeerDisconnected> {
&self.disconnected_event
}
pub fn messages(&self) -> &impl Observable<PeerMessage> {
&self.messages
}
pub fn connected(&self) -> bool {
self.connected.load(Ordering::Relaxed)
}
pub fn time_delta(&self) -> i64 {
*self.time_delta.lock().unwrap()
}
pub fn minfee(&self) -> u64 {
*self.minfee.lock().unwrap()
}
pub fn sendheaders(&self) -> bool {
self.sendheaders.load(Ordering::Relaxed)
}
pub fn sendcmpct(&self) -> bool {
self.sendcmpct.load(Ordering::Relaxed)
}
pub fn version(&self) -> Result<Version> {
match &*self.version.lock().unwrap() {
Some(ref version) => Ok(version.clone()),
None => Err(Error::IllegalState("Not connected".to_string())),
}
}
fn connect_internal(
peer: &Arc<Peer>,
version: Version,
min_start_height: i32,
required_services: u64,
) {
info!("{:?} Connecting to {:?}:{}", peer, peer.ip, peer.port);
let tpeer = peer.clone();
thread::spawn(move || {
let mut tcp_reader = match tpeer.handshake(version, min_start_height, required_services)
{
Ok(tcp_stream) => tcp_stream,
Err(e) => {
error!("Failed to complete handshake: {:?}", e);
tpeer.disconnect();
return;
}
};
info!("{:?} Connected to {:?}:{}", tpeer, tpeer.ip, tpeer.port);
tpeer.connected.store(true, Ordering::Relaxed);
tpeer.connected_event.next(&PeerConnected {
peer: tpeer.clone(),
});
let mut partial: Option<MessageHeader> = None;
let magic = tpeer.network.magic();
let mut tcp_reader = AtomicReader::new(&mut tcp_reader);
loop {
let message = match &partial {
Some(header) => Message::read_partial(&mut tcp_reader, header),
None => Message::read(&mut tcp_reader, magic),
};
if !tpeer.connected.load(Ordering::Relaxed) {
return;
}
match message {
Ok(message) => {
if let Message::Partial(header) = message {
partial = Some(header);
} else {
debug!("{:?} Read {:#?}", tpeer, message);
partial = None;
if let Err(e) = tpeer.handle_message(&message) {
error!("{:?} Error handling message: {:?}", tpeer, e);
tpeer.disconnect();
return;
}
tpeer.messages.next(&PeerMessage {
peer: tpeer.clone(),
message,
});
}
}
Err(e) => {
if let Error::IOError(ref e) = e {
if e.kind() == io::ErrorKind::TimedOut
|| e.kind() == io::ErrorKind::WouldBlock
{
continue;
}
}
error!("{:?} Error reading message {:?}", tpeer, e);
tpeer.disconnect();
return;
}
}
}
});
}
fn handshake(
self: &Peer,
version: Version,
min_start_height: i32,
required_services: u64,
) -> Result<TcpStream> {
let tcp_addr = SocketAddr::new(self.ip, self.port);
let mut tcp_stream = TcpStream::connect_timeout(&tcp_addr, CONNECT_TIMEOUT)?;
tcp_stream.set_nodelay(true)?; tcp_stream.set_read_timeout(Some(HANDSHAKE_READ_TIMEOUT))?;
tcp_stream.set_nonblocking(false)?;
let our_version = Message::Version(version);
debug!("{:?} Write {:#?}", self, our_version);
let magic = self.network.magic();
our_version.write(&mut tcp_stream, magic)?;
let msg = Message::read(&mut tcp_stream, magic)?;
debug!("{:?} Read {:#?}", self, msg);
let their_version = match msg {
Message::Version(version) => version,
_ => return Err(Error::BadData("Unexpected command".to_string())),
};
if their_version.start_height < min_start_height {
return Err(Error::IllegalState("Start height too old".to_string()));
}
if their_version.services & required_services != required_services {
return Err(Error::IllegalState("Required services missing".to_string()));
}
let now = secs_since(UNIX_EPOCH) as i64;
*self.time_delta.lock().unwrap() = now - their_version.timestamp;
*self.version.lock().unwrap() = Some(their_version);
let their_verack = Message::read(&mut tcp_stream, magic)?;
debug!("{:?} Read {:#?}", self, their_verack);
match their_verack {
Message::Verack => {}
_ => return Err(Error::BadData("Unexpected command".to_string())),
};
debug!("{:?} Write {:#?}", self, Message::Verack);
Message::Verack.write(&mut tcp_stream, magic)?;
let ping = Message::Ping(Ping {
nonce: secs_since(UNIX_EPOCH) as u64,
});
debug!("{:?} Write {:#?}", self, ping);
ping.write(&mut tcp_stream, magic)?;
*self.tcp_writer.lock().unwrap() = Some(tcp_stream.try_clone()?);
tcp_stream.set_read_timeout(None)?;
Ok(tcp_stream)
}
fn handle_message(&self, message: &Message) -> Result<()> {
match message {
&Message::FeeFilter(ref feefilter) => {
*self.minfee.lock().unwrap() = feefilter.minfee;
}
&Message::Ping(ref ping) => {
let pong = Message::Pong(ping.clone());
self.send(&pong)?;
}
&Message::SendHeaders => {
self.sendheaders.store(true, Ordering::Relaxed);
}
&Message::SendCmpct(ref sendcmpct) => {
let enable = sendcmpct.use_cmpctblock();
self.sendcmpct.store(enable, Ordering::Relaxed);
}
_ => {}
}
Ok(())
}
fn strong_self(&self) -> Option<Arc<Peer>> {
match &*self.weak_self.lock().unwrap() {
Some(ref weak_peer) => weak_peer.upgrade(),
None => None,
}
}
}
impl PartialEq for Peer {
fn eq(&self, other: &Peer) -> bool {
self.id == other.id
}
}
impl Eq for Peer {}
impl Hash for Peer {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state)
}
}
impl fmt::Debug for Peer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(&format!("[Peer {}]", self.id))
}
}
impl Drop for Peer {
fn drop(&mut self) {
self.disconnect();
}
}