use chrono::prelude::{DateTime, Utc};
use chrono::Duration;
use p2p::Capabilities;
use rand::prelude::*;
use std::cmp;
use std::sync::Arc;
use crate::chain::{self, SyncState, SyncStatus, Tip};
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::BlockHeader;
use crate::p2p;
/// Drives the download of full block bodies from connected peers and keeps
/// track of how many requested blocks are still outstanding.
pub struct BodySync {
/// Chain handle queried for the head, header head, fork point and orphan counts.
chain: Arc<chain::Chain>,
/// Peer set from which block-request candidates are selected.
peers: Arc<p2p::Peers>,
/// Shared sync state, updated with `SyncStatus::BodySync` from `check_run`.
sync_state: Arc<SyncState>,
/// Number of block requests still considered outstanding: reset when a new
/// batch is sent, incremented per successfully sent request and reduced as
/// `blocks_received()` grows.
blocks_requested: u64,
/// Deadline after which a lack of newly received blocks triggers another
/// round of requests (see `body_sync_due`).
receive_timeout: DateTime<Utc>,
/// Value of `blocks_received()` recorded the last time progress was observed.
prev_blocks_received: u64,
}
impl BodySync {
pub fn new(
sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>,
) -> BodySync {
BodySync {
sync_state,
peers,
chain,
blocks_requested: 0,
receive_timeout: Utc::now(),
prev_blocks_received: 0,
}
}
/// Runs one body-sync step if `body_sync_due` says one is due.
///
/// Returns `Ok(true)` only when `body_sync` itself returned `true` (it does
/// so when the chain reports that a txhashset is needed). In every other
/// case the result is `Ok(false)`; when a step ran, the shared sync state is
/// first updated with `head.height` and `highest_height`.
///
/// # Errors
/// Propagates any `chain::Error` raised by `body_sync_due` or `body_sync`.
pub fn check_run(
    &mut self,
    head: &chain::Tip,
    highest_height: u64,
) -> Result<bool, chain::Error> {
    // Nothing to do until the next round of requests is due.
    if !self.body_sync_due()? {
        return Ok(false);
    }

    if self.body_sync()? {
        return Ok(true);
    }

    // Publish progress for whoever is watching the sync state.
    let status = SyncStatus::BodySync {
        current_height: head.height,
        highest_height,
    };
    self.sync_state.update(status);
    Ok(false)
}
fn archive_mode(&self) -> bool {
self.chain.archive_mode()
}
fn body_sync(&mut self) -> Result<bool, chain::Error> {
let head = self.chain.head()?;
let header_head = self.chain.header_head()?;
let fork_point = self.chain.fork_point()?;
if self.chain.check_txhashset_needed(&fork_point)? {
trace!(
"body_sync: cannot sync full blocks earlier than horizon. will request txhashset",
);
return Ok(true);
}
let peers = {
let peers_iter = || {
let cap = if self.archive_mode() {
Capabilities::BLOCK_HIST
} else {
Capabilities::UNKNOWN
};
self.peers
.iter()
.with_capabilities(cap)
.with_difficulty(|x| x > head.total_difficulty)
.connected()
};
let mut peers: Vec<_> = peers_iter().outbound().into_iter().collect();
if peers.is_empty() {
debug!("no outbound peers with more work, considering inbound");
peers = peers_iter().inbound().into_iter().collect();
}
if peers.is_empty() {
debug!("no peers (inbound or outbound) with more work");
return Ok(false);
}
peers
};
let block_count = cmp::min(
cmp::min(100, peers.len() * 10),
chain::MAX_ORPHAN_SIZE.saturating_sub(self.chain.orphans_len()) + 1,
);
let hashes = self.block_hashes_to_sync(&fork_point, &header_head, block_count as u64)?;
if !hashes.is_empty() {
debug!(
"block_sync: {}/{} requesting blocks {:?} from {} peers",
head.height,
header_head.height,
hashes,
peers.len(),
);
self.blocks_requested = 0;
self.receive_timeout = Utc::now() + Duration::seconds(6);
let mut rng = rand::thread_rng();
for hash in hashes {
if let Some(peer) = peers.choose(&mut rng) {
if let Err(e) = peer.send_block_request(hash, chain::Options::SYNC) {
debug!("Skipped request to {}: {:?}", peer.info.addr, e);
peer.stop();
} else {
self.blocks_requested += 1;
}
}
}
}
return Ok(false);
}
/// Collects the hashes of the blocks to request next, in ascending height
/// order.
///
/// Starting from the header at `min(fork_point.height + count,
/// header_head.height)`, the header chain is walked backwards until
/// `fork_point.height` is reached. Headers whose hash the chain already
/// reports as an orphan are skipped.
///
/// # Errors
/// Propagates any `chain::Error` returned by the header lookups.
fn block_hashes_to_sync(
    &self,
    fork_point: &BlockHeader,
    header_head: &Tip,
    count: u64,
) -> Result<Vec<Hash>, chain::Error> {
    let mut hashes = Vec::new();

    // Saturating keeps the bound well-defined even for extreme inputs; the
    // result is clamped to the header head anyway.
    let max_height = cmp::min(fork_point.height.saturating_add(count), header_head.height);

    let mut current = self.chain.get_header_by_height(max_height)?;
    while current.height > fork_point.height {
        // Hash the header once and reuse it for both the check and the push.
        let hash = current.hash();
        if !self.chain.is_orphan(&hash) {
            hashes.push(hash);
        }
        current = self.chain.get_previous_header(&current)?;
    }

    // The walk went from highest to lowest; callers get lowest first.
    hashes.reverse();
    Ok(hashes)
}
/// Decides whether another round of block requests should be issued now.
///
/// Returns `Ok(true)` when either
/// * requests are outstanding, the receive deadline has passed and
///   `blocks_received()` has not grown since the last recorded value, or
/// * fewer than two requests remain outstanding after accounting for newly
///   received blocks.
///
/// When progress is observed, the deadline is pushed one second into the
/// future and the outstanding count is reduced by the number of newly
/// received blocks.
///
/// # Errors
/// Propagates any `chain::Error` from `blocks_received`.
fn body_sync_due(&mut self) -> Result<bool, chain::Error> {
    let received_now = self.blocks_received()?;
    let made_progress = received_now > self.prev_blocks_received;

    // Requests are pending, nothing new arrived and the deadline has passed.
    if self.blocks_requested > 0 && !made_progress && Utc::now() > self.receive_timeout {
        debug!(
            "body_sync: expecting {} more blocks and none received for a while",
            self.blocks_requested,
        );
        return Ok(true);
    }

    if made_progress {
        let newly_received = received_now - self.prev_blocks_received;
        self.receive_timeout = Utc::now() + Duration::seconds(1);
        self.blocks_requested = self.blocks_requested.saturating_sub(newly_received);
        self.prev_blocks_received = received_now;
    }

    // NOTE(review): the threshold is 2 rather than 1; presumably this leaves
    // slack for blocks counted that were not requested here — confirm.
    if self.blocks_requested < 2 {
        trace!("body_sync: no pending block request, asking more");
        return Ok(true);
    }

    Ok(false)
}
/// Returns a counter used to detect incoming blocks: the current head
/// height plus the number of orphans held and the number of orphans evicted.
///
/// # Errors
/// Propagates the `chain::Error` from reading the chain head.
fn blocks_received(&self) -> Result<u64, chain::Error> {
    let head_height = self.chain.head()?.height;
    let orphans = self.chain.orphans_len() as u64;
    let evicted = self.chain.orphans_evicted_len() as u64;
    Ok(head_height + orphans + evicted)
}
}