use crate::serv::Server;
use crate::util::{Mutex, RwLock};
use std::fmt;
use std::net::{Shutdown, TcpStream};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use lru::LruCache;
use crate::chain;
use crate::chain::txhashset::BitmapChunk;
use crate::conn;
use crate::handshake::Handshake;
use crate::msg::{
self, ArchiveHeaderData, BanReason, GetPeerAddrs, HashHeadersData, Locator, Msg, Ping,
SegmentRequest, Type,
};
use crate::mwc_core::core::hash::{Hash, Hashed};
use crate::mwc_core::core::{OutputIdentifier, Segment, SegmentIdentifier, TxKernel};
use crate::mwc_core::pow::Difficulty;
use crate::mwc_core::ser::Writeable;
use crate::mwc_core::{core, global};
use crate::protocol::Protocol;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead,
};
use crate::util::secp::pedersen::RangeProof;
use chrono::prelude::Utc;
use mwc_chain::txhashset::Segmenter;
use mwc_chain::SyncState;
const MAX_TRACK_SIZE: usize = 200;
const MAX_PEER_MSG_PER_MIN: u64 = 1000;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
Connected,
Banned,
}
pub struct Peer {
pub info: PeerInfo,
state: Arc<RwLock<State>>,
tracking_adapter: TrackingAdapter,
tracker: Arc<conn::Tracker>,
send_handle: Mutex<conn::ConnHandle>,
stop_handle: Mutex<conn::StopHandle>,
}
impl fmt::Debug for Peer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Peer({:?})", &self.info)
}
}
impl Peer {
fn new(
info: PeerInfo,
conn: TcpStream,
adapter: Arc<dyn NetAdapter>,
sync_state: Arc<SyncState>,
server: Server,
) -> std::io::Result<Peer> {
let state = Arc::new(RwLock::new(State::Connected));
let tracking_adapter = TrackingAdapter::new(adapter);
let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone(), server);
let tracker = Arc::new(conn::Tracker::new());
let (sendh, stoph) =
conn::listen(conn, info.version, tracker.clone(), sync_state, handler)?;
let send_handle = Mutex::new(sendh);
let stop_handle = Mutex::new(stoph);
Ok(Peer {
info,
state,
tracking_adapter,
tracker,
send_handle,
stop_handle,
})
}
pub fn accept(
mut conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
hs: &Handshake,
adapter: Arc<dyn NetAdapter>,
sync_state: Arc<SyncState>,
server: Server,
) -> Result<Peer, Error> {
debug!("accept: handshaking from {:?}", conn.peer_addr());
let info = hs.accept(capab, total_difficulty, &mut conn);
match info {
Ok(info) => Ok(Peer::new(info, conn, adapter, sync_state, server)?),
Err(e) => {
debug!(
"accept: handshaking from {:?} failed with error: {:?}",
conn.peer_addr(),
e
);
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
Err(e)
}
}
}
pub fn connect(
mut conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: PeerAddr,
hs: &Handshake,
adapter: Arc<dyn NetAdapter>,
peer_addr: Option<PeerAddr>,
sync_state: Arc<SyncState>,
server: Server,
) -> Result<Peer, Error> {
debug!("connect: handshaking with {:?}", self_addr);
let info = if peer_addr.is_some() {
hs.initiate(
capab,
total_difficulty,
self_addr,
&mut conn,
Some(peer_addr.clone().unwrap()),
)
} else {
hs.initiate(capab, total_difficulty, self_addr, &mut conn, None)
};
match info {
Ok(info) => Ok(Peer::new(info, conn, adapter, sync_state, server)?),
Err(e) => {
if peer_addr.is_some() {
debug!(
"connect: handshaking with {:?} failed with error: {:?}",
peer_addr.unwrap(),
e
);
} else {
debug!(
"connect: handshaking with {:?} failed with error: {:?}",
conn.peer_addr(),
e
);
}
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
Err(e)
}
}
}
pub fn is_denied(config: &P2PConfig, peer_addr: &PeerAddr) -> bool {
if let Some(ref denied) = config.peers_deny {
if denied.peers.contains(peer_addr) {
debug!(
"checking peer allowed/denied: {:?} explicitly denied",
peer_addr
);
return true;
}
}
if let Some(ref allowed) = config.peers_allow {
if allowed.peers.contains(peer_addr) {
debug!(
"checking peer allowed/denied: {:?} explicitly allowed",
peer_addr
);
return false;
} else {
debug!(
"checking peer allowed/denied: {:?} not explicitly allowed, denying",
peer_addr
);
return true;
}
}
false
}
pub fn is_connected(&self) -> bool {
State::Connected == *self.state.read()
}
pub fn is_banned(&self) -> bool {
State::Banned == *self.state.read()
}
pub fn is_stuck(&self) -> (bool, Difficulty) {
let peer_live_info = self.info.live_info.read();
let now = Utc::now().timestamp_millis();
if now > peer_live_info.stuck_detector.timestamp_millis() + global::STUCK_PEER_KICK_TIME {
(true, peer_live_info.total_difficulty)
} else {
(false, peer_live_info.total_difficulty)
}
}
pub fn is_abusive(&self) -> bool {
let rec = self.tracker().received_bytes.read();
rec.count_per_min() > MAX_PEER_MSG_PER_MIN
}
pub fn tracker(&self) -> &conn::Tracker {
&self.tracker
}
pub fn set_banned(&self) {
*self.state.write() = State::Banned;
}
fn send<T: Writeable>(&self, msg: T, msg_type: Type) -> Result<(), Error> {
let msg = Msg::new(msg_type, msg, self.info.version)?;
self.send_handle.lock().send(msg)
}
pub fn send_ping(&self, total_difficulty: Difficulty, height: u64) -> Result<(), Error> {
let ping_msg = Ping {
total_difficulty,
height,
};
self.send(ping_msg, msg::Type::Ping)
}
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> {
let ban_reason_msg = BanReason { ban_reason };
self.send(ban_reason_msg, msg::Type::BanReason).map(|_| ())
}
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send compact block {} to {}", b.hash(), self.info.addr);
self.send(b, msg::Type::CompactBlock)?;
Ok(true)
} else {
debug!(
"Suppress compact block send {} to {} (already seen)",
b.hash(),
self.info.addr,
);
Ok(false)
}
}
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(bh.hash()) {
debug!("Send header {} to {}", bh.hash(), self.info.addr);
self.send(bh, msg::Type::Header)?;
Ok(true)
} else {
debug!(
"Suppress header send {} to {} (already seen)",
bh.hash(),
self.info.addr,
);
Ok(false)
}
}
pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr);
self.send(h, msg::Type::TransactionKernel)?;
Ok(true)
} else {
debug!(
"Not sending tx kernel hash {} to {} (already seen)",
h, self.info.addr
);
Ok(false)
}
}
pub fn send_transaction(&self, tx: &core::Transaction) -> Result<bool, Error> {
let kernel = &tx.kernels()[0];
if self
.info
.capabilities
.contains(Capabilities::TX_KERNEL_HASH)
{
return self.send_tx_kernel_hash(kernel.hash());
}
if !self.tracking_adapter.has_recv(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
self.send(tx, msg::Type::Transaction)?;
Ok(true)
} else {
debug!(
"Not sending tx {} to {} (already seen)",
tx.hash(),
self.info.addr
);
Ok(false)
}
}
pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr);
self.send(tx, msg::Type::StemTransaction)
}
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.send(&Locator { hashes: locator }, msg::Type::GetHeaders)
}
pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> {
debug!(
"Requesting tx (kernel hash) {} from peer {}.",
h, self.info.addr
);
self.send(&h, msg::Type::GetTransaction)
}
pub fn send_block_request(&self, h: Hash, opts: chain::Options) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h, opts);
self.send(&h, msg::Type::GetBlock)
}
pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting compact block {} from {}", h, self.info.addr);
self.send(&h, msg::Type::GetCompactBlock)
}
pub fn send_peer_request(
&self,
capab: Capabilities,
use_tor_connection: bool,
) -> Result<(), Error> {
let capab = if use_tor_connection {
capab | Capabilities::TOR_ADDRESS
} else {
capab
};
trace!("Asking {} for more peers {:?}", self.info.addr, capab);
self.send(
&GetPeerAddrs {
capabilities: capab,
},
msg::Type::GetPeerAddrs,
)
}
pub fn send_start_pibd_sync_request(&self, height: u64, hash: Hash) -> Result<(), Error> {
info!(
"Asking peer {} for pibd sync at {} {}.",
self.info.addr, height, hash
);
self.send(
&ArchiveHeaderData { hash, height },
msg::Type::StartPibdSyncRequest,
)
}
pub fn send_start_headers_hash_sync_request(&self, archive_height: u64) -> Result<(), Error> {
info!(
"Asking peer {} for headers hash sync for archive_height {}.",
self.info.addr, archive_height
);
self.send(
&HashHeadersData { archive_height },
msg::Type::StartHeadersHashRequest,
)
}
pub fn send_headers_hash_segment_request(
&self,
headers_hash_root: Hash,
identifier: SegmentIdentifier,
) -> Result<(), Error> {
debug!(
"Requesting peer {} for headers hashs, root hash {}, id {}",
self.info.addr, headers_hash_root, identifier
);
self.send(
&SegmentRequest {
block_hash: headers_hash_root,
identifier,
},
msg::Type::GetHeadersHashesSegment,
)
}
pub fn send_bitmap_segment_request(
&self,
h: Hash,
identifier: SegmentIdentifier,
) -> Result<(), Error> {
debug!(
"Requesting peer {} for outputs bitmap, hash {}, id {}",
self.info.addr, h, identifier
);
self.send(
&SegmentRequest {
block_hash: h,
identifier,
},
msg::Type::GetOutputBitmapSegment,
)
}
pub fn send_output_segment_request(
&self,
h: Hash,
identifier: SegmentIdentifier,
) -> Result<(), Error> {
debug!(
"Requesting peer {} for outputs, hash {}, id {}",
self.info.addr, h, identifier
);
self.send(
&SegmentRequest {
block_hash: h,
identifier,
},
msg::Type::GetOutputSegment,
)
}
pub fn send_rangeproof_segment_request(
&self,
h: Hash,
identifier: SegmentIdentifier,
) -> Result<(), Error> {
debug!(
"Requesting peer {} for rangeproofs, hash {}, id {}",
self.info.addr, h, identifier
);
self.send(
&SegmentRequest {
block_hash: h,
identifier,
},
msg::Type::GetRangeProofSegment,
)
}
pub fn send_kernel_segment_request(
&self,
h: Hash,
identifier: SegmentIdentifier,
) -> Result<(), Error> {
debug!(
"Requesting peer {} for kernels, hash {}, id {}",
self.info.addr, h, identifier
);
self.send(
&SegmentRequest {
block_hash: h,
identifier,
},
msg::Type::GetKernelSegment,
)
}
pub fn stop(&self) {
debug!("Stopping peer {:?}", self.info.addr);
match self.stop_handle.try_lock() {
Some(handle) => handle.stop(),
None => error!("can't get stop lock for peer"),
}
}
pub fn wait(&self) {
debug!("Waiting for peer {:?} to stop", self.info.addr);
match self.stop_handle.try_lock() {
Some(mut handle) => handle.wait(),
None => error!("can't get stop lock for peer"),
}
}
}
#[derive(Clone)]
struct TrackingAdapter {
adapter: Arc<dyn NetAdapter>,
received: Arc<RwLock<LruCache<Hash, ()>>>,
requested: Arc<RwLock<LruCache<Hash, chain::Options>>>,
}
impl TrackingAdapter {
fn new(adapter: Arc<dyn NetAdapter>) -> TrackingAdapter {
TrackingAdapter {
adapter: adapter,
received: Arc::new(RwLock::new(LruCache::new(
NonZeroUsize::new(MAX_TRACK_SIZE).unwrap(),
))),
requested: Arc::new(RwLock::new(LruCache::new(
NonZeroUsize::new(MAX_TRACK_SIZE).unwrap(),
))),
}
}
fn has_recv(&self, hash: Hash) -> bool {
self.received.read().contains(&hash)
}
fn push_recv(&self, hash: Hash) {
self.received.write().put(hash, ());
}
fn push_req(&self, hash: Hash, opts: chain::Options) {
self.requested.write().put(hash, opts);
}
fn req_opts(&self, hash: Hash) -> Option<chain::Options> {
self.requested.write().get(&hash).cloned()
}
}
impl ChainAdapter for TrackingAdapter {
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
self.adapter.total_difficulty()
}
fn total_height(&self) -> Result<u64, chain::Error> {
self.adapter.total_height()
}
fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction> {
self.adapter.get_transaction(kernel_hash)
}
fn tx_kernel_received(
&self,
kernel_hash: Hash,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, peer_info)
}
fn transaction_received(
&self,
tx: core::Transaction,
stem: bool,
) -> Result<bool, chain::Error> {
if !stem {
let kernel = &tx.kernels()[0];
self.push_recv(kernel.hash());
}
self.adapter.transaction_received(tx, stem)
}
fn block_received(
&self,
b: core::Block,
peer_info: &PeerInfo,
opts: chain::Options,
) -> Result<bool, chain::Error> {
let bh = b.hash();
self.push_recv(bh);
let req_opts = self.req_opts(bh).unwrap_or(opts);
self.adapter.block_received(b, peer_info, req_opts)
}
fn compact_block_received(
&self,
cb: core::CompactBlock,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, peer_info)
}
fn header_received(
&self,
bh: core::BlockHeader,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(bh.hash());
self.adapter.header_received(bh, peer_info)
}
fn header_locator(&self) -> Result<Vec<Hash>, chain::Error> {
self.adapter.header_locator()
}
fn headers_received(
&self,
bh: &[core::BlockHeader],
remaining: u64,
peer_info: &PeerInfo,
) -> Result<(), chain::Error> {
self.adapter.headers_received(bh, remaining, peer_info)
}
fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
self.adapter.locate_headers(locator)
}
fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option<core::Block> {
self.adapter.get_block(h, peer_info)
}
fn txhashset_read(&self, h: Hash) -> Option<TxHashSetRead> {
self.adapter.txhashset_read(h)
}
fn txhashset_archive_header(&self) -> Result<core::BlockHeader, chain::Error> {
self.adapter.txhashset_archive_header()
}
fn get_tmp_dir(&self) -> PathBuf {
self.adapter.get_tmp_dir()
}
fn get_tmpfile_pathname(&self, tmpfile_name: String) -> PathBuf {
self.adapter.get_tmpfile_pathname(tmpfile_name)
}
fn recieve_pibd_status(
&self,
peer: &PeerAddr,
header_hash: Hash,
header_height: u64,
output_bitmap_root: Hash,
) -> Result<(), chain::Error> {
self.adapter
.recieve_pibd_status(peer, header_hash, header_height, output_bitmap_root)
}
fn recieve_another_archive_header(
&self,
peer: &PeerAddr,
header_hash: Hash,
header_height: u64,
) -> Result<(), chain::Error> {
self.adapter
.recieve_another_archive_header(peer, header_hash, header_height)
}
fn receive_headers_hash_response(
&self,
peer: &PeerAddr,
archive_height: u64,
headers_hash_root: Hash,
) -> Result<(), chain::Error> {
self.adapter
.receive_headers_hash_response(peer, archive_height, headers_hash_root)
}
fn get_header_hashes_segment(
&self,
header_hashes_root: Hash,
id: SegmentIdentifier,
) -> Result<Segment<Hash>, chain::Error> {
self.adapter
.get_header_hashes_segment(header_hashes_root, id)
}
fn receive_header_hashes_segment(
&self,
peer: &PeerAddr,
header_hashes_root: Hash,
segment: Segment<Hash>,
) -> Result<(), chain::Error> {
self.adapter
.receive_header_hashes_segment(peer, header_hashes_root, segment)
}
fn prepare_segmenter(&self) -> Result<Segmenter, chain::Error> {
self.adapter.prepare_segmenter()
}
fn get_kernel_segment(
&self,
hash: Hash,
id: SegmentIdentifier,
) -> Result<Segment<TxKernel>, chain::Error> {
self.adapter.get_kernel_segment(hash, id)
}
fn get_bitmap_segment(
&self,
hash: Hash,
id: SegmentIdentifier,
) -> Result<Segment<BitmapChunk>, chain::Error> {
self.adapter.get_bitmap_segment(hash, id)
}
fn get_output_segment(
&self,
hash: Hash,
id: SegmentIdentifier,
) -> Result<Segment<OutputIdentifier>, chain::Error> {
self.adapter.get_output_segment(hash, id)
}
fn get_rangeproof_segment(
&self,
hash: Hash,
id: SegmentIdentifier,
) -> Result<Segment<RangeProof>, chain::Error> {
self.adapter.get_rangeproof_segment(hash, id)
}
fn receive_bitmap_segment(
&self,
peer: &PeerAddr,
archive_header_hash: Hash,
segment: Segment<BitmapChunk>,
) -> Result<(), chain::Error> {
self.adapter
.receive_bitmap_segment(peer, archive_header_hash, segment)
}
fn receive_output_segment(
&self,
peer: &PeerAddr,
archive_header_hash: Hash,
segment: Segment<OutputIdentifier>,
) -> Result<(), chain::Error> {
self.adapter
.receive_output_segment(peer, archive_header_hash, segment)
}
fn receive_rangeproof_segment(
&self,
peer: &PeerAddr,
archive_header_hash: Hash,
segment: Segment<RangeProof>,
) -> Result<(), chain::Error> {
self.adapter
.receive_rangeproof_segment(peer, archive_header_hash, segment)
}
fn receive_kernel_segment(
&self,
peer: &PeerAddr,
archive_header_hash: Hash,
segment: Segment<TxKernel>,
) -> Result<(), chain::Error> {
self.adapter
.receive_kernel_segment(peer, archive_header_hash, segment)
}
fn peer_difficulty(&self, addr: &PeerAddr, diff: Difficulty, height: u64) {
self.adapter.peer_difficulty(addr, diff, height)
}
}
impl NetAdapter for TrackingAdapter {
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<PeerAddr> {
self.adapter.find_peer_addrs(capab)
}
fn peer_addrs_received(&self, addrs: Vec<PeerAddr>) {
self.adapter.peer_addrs_received(addrs)
}
fn is_banned(&self, addr: &PeerAddr) -> bool {
self.adapter.is_banned(addr)
}
fn ban_peer(&self, addr: &PeerAddr, ban_reason: ReasonForBan, message: &str) {
self.adapter.ban_peer(addr, ban_reason, message)
}
}