use crate::conn::Tracker;
use crate::msg::{read_message, write_message, Hand, Msg, Shake, TorAddress, Type, USER_AGENT};
use crate::mwc_core::core::hash::Hash;
use crate::mwc_core::pow::Difficulty;
use crate::mwc_core::ser::ProtocolVersion;
use crate::peer::Peer;
use crate::types::{
Capabilities, Direction, Error, P2PConfig, PeerAddr, PeerAddr::Ip, PeerAddr::Onion, PeerInfo,
PeerLiveInfo,
};
use crate::util::RwLock;
use mwc_core::global;
use rand::{thread_rng, Rng};
use std::collections::VecDeque;
use std::net::{SocketAddr, TcpStream};
use std::sync::Arc;
use std::time::Duration;
/// Bound used when trimming the queue of nonces we recently generated.
const NONCES_CAP: usize = 100;
/// Bound used when trimming the queue of addresses recorded on self-connection.
const ADDRS_CAP: usize = 10;

/// Read timeout applied by `accept` while waiting for the peer's Hand.
const HAND_READ_TIMEOUT: Duration = Duration::from_secs(10);
/// Read timeout applied by `initiate` while waiting for the peer's Shake.
const SHAKE_READ_TIMEOUT: Duration = Duration::from_secs(10);
/// Write timeout applied by `initiate` while sending our Hand.
const HAND_WRITE_TIMEOUT: Duration = Duration::from_secs(2);
/// Write timeout applied by `accept` while sending our Shake.
const SHAKE_WRITE_TIMEOUT: Duration = Duration::from_secs(2);
/// Holds the state needed to perform the Hand/Shake exchange with a peer,
/// either as the initiating side (`initiate`) or the accepting side (`accept`).
pub struct Handshake {
/// Nonces recently generated by `next_nonce`. `accept` checks the nonce of an
/// incoming Hand against this queue to detect a connection to ourselves.
nonces: Arc<RwLock<VecDeque<u64>>>,
/// Addresses recorded by `accept` whenever an incoming Hand carried one of our
/// own nonces, i.e. addresses that turned out to be this node.
pub addrs: Arc<RwLock<VecDeque<PeerAddr>>>,
/// Genesis hash we expect; a Hand or Shake with a different genesis is
/// rejected with `Error::GenesisMismatch`.
genesis: Hash,
/// P2P configuration, passed to `Peer::is_denied` to reject denied peers.
config: P2PConfig,
/// Our local protocol version (`ProtocolVersion::local()`); advertised to the
/// peer and used as the upper bound when negotiating a version.
protocol_version: ProtocolVersion,
/// Tracker handed to every `write_message` call made during the handshake.
/// NOTE(review): presumably accounts for bytes sent — confirm against `conn::Tracker`.
tracker: Arc<Tracker>,
/// Our own onion address, if any. Sent to peers that advertise
/// `Capabilities::TOR_ADDRESS`, and used to pick the Tor vs. non-Tor `Direction`.
onion_address: Option<String>,
}
impl Handshake {
/// Creates a handshake helper for the given genesis hash and P2P config.
///
/// The nonce and self-address queues start empty (pre-sized to `NONCES_CAP`
/// and `ADDRS_CAP`), the protocol version is `ProtocolVersion::local()`, and
/// a fresh `Tracker` is created.
pub fn new(genesis: Hash, config: P2PConfig, onion_address: Option<String>) -> Handshake {
    let nonces = Arc::new(RwLock::new(VecDeque::with_capacity(NONCES_CAP)));
    let addrs = Arc::new(RwLock::new(VecDeque::with_capacity(ADDRS_CAP)));
    let protocol_version = ProtocolVersion::local();
    let tracker = Arc::new(Tracker::new());

    Handshake {
        nonces,
        addrs,
        genesis,
        config,
        protocol_version,
        tracker,
        onion_address,
    }
}
/// Picks the protocol version to use with a peer: the lower of our own
/// version and the one the peer advertised. Currently never fails.
fn negotiate_protocol_version(&self, other: ProtocolVersion) -> Result<ProtocolVersion, Error> {
    Ok(Ord::min(self.protocol_version, other))
}
pub fn initiate(
&self,
capabilities: Capabilities,
total_difficulty: Difficulty,
self_addr: PeerAddr,
conn: &mut TcpStream,
peer_addr: Option<PeerAddr>,
) -> Result<PeerInfo, Error> {
let _ = conn.set_write_timeout(Some(HAND_WRITE_TIMEOUT));
let _ = conn.set_read_timeout(Some(SHAKE_READ_TIMEOUT));
let nonce = self.next_nonce();
let peer_addr = peer_addr.unwrap_or(match conn.peer_addr() {
Ok(addr) => PeerAddr::Ip(addr),
Err(e) => {
return Err(Error::ConnectionClose(format!(
"unable to get peer address, {}",
e
)))
}
});
let hand = Hand {
version: self.protocol_version,
capabilities,
nonce,
genesis: self.genesis,
total_difficulty,
sender_addr: self_addr.clone(),
receiver_addr: peer_addr.clone(),
user_agent: USER_AGENT.to_string(),
tx_fee_base: global::get_accept_fee_base(),
};
let msg = Msg::new(Type::Hand, hand, self.protocol_version)?;
write_message(conn, &vec![msg], self.tracker.clone())?;
let shake: Shake = read_message(conn, self.protocol_version, Type::Shake)?;
if shake.genesis != self.genesis {
return Err(Error::GenesisMismatch {
us: self.genesis,
peer: shake.genesis,
});
}
if shake.capabilities.contains(Capabilities::TOR_ADDRESS) && self.onion_address.is_some() {
let onion_address = self.onion_address.as_ref().unwrap().to_string();
debug!(
"Tor enabled peer {:?}, sending onion_address = {}",
self_addr, onion_address
);
let tor_address = TorAddress::new(onion_address);
let msg = Msg::new(Type::TorAddress, tor_address, self.protocol_version)?;
write_message(conn, &vec![msg], self.tracker.clone())?;
} else {
debug!("non-Tor peer {:?}", self_addr);
}
let negotiated_version = self.negotiate_protocol_version(shake.version)?;
let peer_info = PeerInfo {
capabilities: shake.capabilities,
user_agent: shake.user_agent,
addr: peer_addr,
version: negotiated_version,
live_info: Arc::new(RwLock::new(PeerLiveInfo::new(shake.total_difficulty))),
direction: if self.onion_address.is_some() {
Direction::OutboundTor
} else {
Direction::Outbound
},
tx_base_fee: shake.tx_fee_base,
};
if Peer::is_denied(&self.config, &peer_info.addr) {
return Err(Error::ConnectionClose(format!(
"{:?} is denied",
peer_info.addr
)));
}
debug!(
"Connected! Cumulative {} offered from {:?}, {:?}, {:?}, {:?}",
shake.total_difficulty.to_num(),
peer_info.addr,
peer_info.version,
peer_info.user_agent,
peer_info.capabilities,
);
Ok(peer_info)
}
/// Performs the inbound side of the handshake on `conn`: reads the peer's
/// Hand, validates it, replies with our Shake and returns the resulting
/// `PeerInfo`.
///
/// The peer's address is resolved once via `resolve_peer_addr` and reused
/// both for self-connection bookkeeping and for the returned `PeerInfo`.
///
/// # Errors
///
/// * `Error::GenesisMismatch` if the Hand carries a different genesis hash.
/// * `Error::PeerWithSelf` if the Hand's nonce is one we generated ourselves;
///   the resolved address is recorded in `addrs` before returning.
/// * `Error::ConnectionClose` if the peer is denied by the config.
/// * Any error propagated from `read_message`, `Msg::new` or `write_message`.
pub fn accept(
    &self,
    capab: Capabilities,
    total_difficulty: Difficulty,
    conn: &mut TcpStream,
) -> Result<PeerInfo, Error> {
    // Best effort: a failure to set a timeout is deliberately ignored.
    let _ = conn.set_read_timeout(Some(HAND_READ_TIMEOUT));
    let _ = conn.set_write_timeout(Some(SHAKE_WRITE_TIMEOUT));

    let hand: Hand = read_message(conn, self.protocol_version, Type::Hand)?;

    if hand.genesis != self.genesis {
        return Err(Error::GenesisMismatch {
            us: self.genesis,
            peer: hand.genesis,
        });
    }

    // Resolve the peer's address a single time; it is needed on both the
    // self-connection path and the success path.
    let addr = resolve_peer_addr(hand.sender_addr, &conn);

    // A nonce we generated ourselves means we have connected to ourselves.
    // The read lock is held only for the lookup.
    let is_own_nonce = self.nonces.read().contains(&hand.nonce);
    if is_own_nonce {
        let mut addrs = self.addrs.write();
        addrs.push_back(addr);
        // Trim after pushing: the queue keeps at most ADDRS_CAP - 1 entries.
        if addrs.len() >= ADDRS_CAP {
            addrs.pop_front();
        }
        return Err(Error::PeerWithSelf);
    }

    let negotiated_version = self.negotiate_protocol_version(hand.version)?;

    // The direction depends on whether *we* have an onion address configured.
    let direction = if self.onion_address.is_some() {
        Direction::InboundTor
    } else {
        Direction::Inbound
    };

    let peer_info = PeerInfo {
        capabilities: hand.capabilities,
        user_agent: hand.user_agent,
        addr,
        version: negotiated_version,
        live_info: Arc::new(RwLock::new(PeerLiveInfo::new(hand.total_difficulty))),
        direction,
        tx_base_fee: hand.tx_fee_base,
    };

    // Refuse peers that the configuration denies, before replying.
    if Peer::is_denied(&self.config, &peer_info.addr) {
        return Err(Error::ConnectionClose(String::from(
            "Peer denied because it is in config black list",
        )));
    }

    // Advertise our own version; the message itself uses the negotiated one.
    let shake = Shake {
        version: self.protocol_version,
        capabilities: capab,
        genesis: self.genesis,
        total_difficulty,
        user_agent: USER_AGENT.to_string(),
        tx_fee_base: global::get_accept_fee_base(),
    };

    let msg = Msg::new(Type::Shake, shake, negotiated_version)?;
    write_message(conn, &vec![msg], self.tracker.clone())?;

    trace!("Success handshake with {}.", peer_info.addr);

    Ok(peer_info)
}
/// Generates a random nonce, records it in the recent-nonce queue and
/// returns it.
///
/// The queue is trimmed after the push, so it retains at most
/// `NONCES_CAP - 1` nonces.
fn next_nonce(&self) -> u64 {
    let fresh: u64 = thread_rng().gen();

    let mut recent = self.nonces.write();
    recent.push_back(fresh);
    if recent.len() >= NONCES_CAP {
        // Drop the oldest nonce to keep the queue bounded.
        recent.pop_front();
    }

    fresh
}
}
/// Determines the address to record for a peer that advertised `advertised`.
///
/// For an IP address, the advertised port is kept but the IP is replaced by
/// the one observed on `conn`; if the socket's peer address cannot be read,
/// the advertised address is returned unchanged. Onion addresses are always
/// returned unchanged.
fn resolve_peer_addr(advertised: PeerAddr, conn: &TcpStream) -> PeerAddr {
    if let Ip(claimed) = &advertised {
        if let Ok(observed) = conn.peer_addr() {
            return PeerAddr::Ip(SocketAddr::new(observed.ip(), claimed.port()));
        }
    }
    advertised
}