use chrono::prelude::{DateTime, Utc};
use chrono::Duration;
use std::sync::Arc;
use crate::chain::{self, pibd_params, SyncState, SyncStatus};
use crate::core::core::{hash::Hashed, pmmr::segment::SegmentType};
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::p2p::{self, Capabilities, Peer};
use crate::util::StopState;
/// Coordinates the "state" (fast) sync phase: either PIBD segment download
/// or the legacy full txhashset.zip download from a single peer.
pub struct StateSync {
/// Shared sync status; used to read/update `SyncStatus` and record sync errors.
sync_state: Arc<SyncState>,
/// Connected peer set; used to pick peers for segment / txhashset requests.
peers: Arc<p2p::Peers>,
/// Local chain; source of archive headers and target of resets/validation.
chain: Arc<chain::Chain>,
/// When the current txhashset download was launched; `None` when idle.
/// Drives the launch/timeout decision in `state_sync_due`.
prev_state_sync: Option<DateTime<Utc>>,
/// Peer currently serving the legacy txhashset download, if any.
state_sync_peer: Option<Arc<Peer>>,
/// Set once PIBD has been abandoned for this run (fallback to txhashset.zip).
pibd_aborted: bool,
/// First instant at which no suitable PIBD peers were connected; start of
/// the fallback timer in `continue_pibd`. `None` while PIBD peers exist.
earliest_zero_pibd_peer_time: Option<DateTime<Utc>>,
}
impl StateSync {
pub fn new(
sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>,
) -> StateSync {
StateSync {
sync_state,
peers,
chain,
prev_state_sync: None,
state_sync_peer: None,
pibd_aborted: false,
earliest_zero_pibd_peer_time: None,
}
}
/// Sets (with `Some`) or clears (with `None`) the first moment at which no
/// suitable PIBD peers were connected. `continue_pibd` uses this as the
/// start of its fallback-to-txhashset.zip timer.
pub fn set_earliest_zero_pibd_peer_time(&mut self, t: Option<DateTime<Utc>>) {
self.earliest_zero_pibd_peer_time = t;
}
/// Marks PIBD as abandoned for this sync run; `check_run` then takes the
/// legacy txhashset.zip download path. One-way: never cleared here.
pub fn set_pibd_aborted(&mut self) {
self.pibd_aborted = true;
}
/// Runs one pass of the state-sync state machine.
///
/// Returns `true` while state sync is (still) in progress and `false` once
/// it is finished for this round — either because `TxHashsetDone` was
/// observed (hand-off to body sync) or because an error ended the round.
///
/// `header_head`/`head`/`tail` are the current chain tips, `highest_height`
/// the best height advertised by peers, and `stop_state` lets long-running
/// PIBD validation be interrupted on shutdown.
pub fn check_run(
&mut self,
header_head: &chain::Tip,
head: &chain::Tip,
tail: &chain::Tip,
highest_height: u64,
stop_state: Arc<StopState>,
) -> bool {
trace!("state_sync: head.height: {}, tail.height: {}. header_head.height: {}, highest_height: {}",
head.height, tail.height, header_head.height, highest_height,
);
// Any recorded sync error forces a restart of the whole fast sync.
let mut sync_need_restart = false;
if let Some(sync_error) = self.sync_state.sync_error() {
error!("state_sync: error = {}. restart fast sync", sync_error);
sync_need_restart = true;
}
// PIBD is used unless it was aborted — via the status flag or the local
// `pibd_aborted` latch set by `set_pibd_aborted`.
let using_pibd = !matches!(
self.sync_state.status(),
SyncStatus::TxHashsetPibd { aborted: true, .. },
) && !self.pibd_aborted;
// On a reported PIBD failure: reset the desegmenter, the PIBD head, the
// chain head (back to genesis) and the prune lists, then restart. The
// resets are best-effort: failures are logged but do not stop the restart.
if using_pibd {
if let SyncStatus::TxHashsetPibd { errored: true, .. } = self.sync_state.status() {
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
error!("PIBD Reported Failure - Restarting Sync");
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
if let Some(d) = desegmenter.write().as_mut() {
d.reset();
};
if let Err(e) = self.chain.reset_pibd_head() {
error!("pibd_sync restart: reset pibd_head error = {}", e);
}
if let Err(e) = self.chain.reset_chain_head_to_genesis() {
error!("pibd_sync restart: chain reset to genesis error = {}", e);
}
if let Err(e) = self.chain.reset_prune_lists() {
error!("pibd_sync restart: reset prune lists error = {}", e);
}
self.sync_state
.update_pibd_progress(false, false, 0, 1, &archive_header);
sync_need_restart = true;
}
}
// Legacy txhashset.zip path: restart if the serving peer disconnected
// mid-download.
if !using_pibd {
if let Some(ref peer) = self.state_sync_peer {
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if !peer.is_connected() {
sync_need_restart = true;
info!(
"state_sync: peer connection lost: {:?}. restart",
peer.info.addr,
);
}
}
}
}
// Atomically transition TxHashsetDone -> BodySync; `done` is true only if
// the transition happened on this call.
let done = self.sync_state.update_if(
SyncStatus::BodySync {
current_height: 0,
highest_height: 0,
},
|s| match s {
SyncStatus::TxHashsetDone => true,
_ => false,
},
);
if sync_need_restart || done {
self.state_sync_reset();
self.sync_state.clear_sync_error();
}
if done {
// State sync complete; body sync takes over.
return false;
}
// Only run a sync round when restarting, or once the header chain has
// caught up with the network's best height.
if sync_need_restart || header_head.height == highest_height {
if using_pibd {
if sync_need_restart {
return true;
}
let (launch, _download_timeout) = self.state_sync_due();
let archive_header = { self.chain.txhashset_archive_header_header_only().unwrap() };
if launch {
// First round since reset: (re)initialize PIBD progress reporting.
self.sync_state
.update_pibd_progress(false, false, 0, 1, &archive_header);
}
// continue_pibd returns true once all segments have been applied.
if self.continue_pibd() {
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
// All segments in place: update the leaf set, then validate the
// complete state. Either failure marks PIBD as errored (picked up
// by the restart branch on the next call) and ends this round.
if let Some(d) = desegmenter.write().as_mut() {
if let Ok(true) = d.check_progress(self.sync_state.clone()) {
if let Err(e) = d.check_update_leaf_set_state() {
error!("error updating PIBD leaf set: {}", e);
self.sync_state.update_pibd_progress(
false,
true,
0,
1,
&archive_header,
);
return false;
}
if let Err(e) = d.validate_complete_state(
self.sync_state.clone(),
stop_state.clone(),
) {
error!("error validating PIBD state: {}", e);
self.sync_state.update_pibd_progress(
false,
true,
0,
1,
&archive_header,
);
return false;
}
return true;
}
};
}
} else {
// Legacy path: download the full txhashset zip from one peer.
let (go, download_timeout) = self.state_sync_due();
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if download_timeout {
error!("state_sync: TxHashsetDownload status timeout in 10 minutes!");
self.sync_state
.set_sync_error(chain::Error::SyncError(format!(
"{:?}",
p2p::Error::Timeout
)));
}
}
if go {
self.state_sync_peer = None;
match self.request_state(&header_head) {
Ok(peer) => {
self.state_sync_peer = Some(peer);
}
Err(e) => self
.sync_state
.set_sync_error(chain::Error::SyncError(format!("{:?}", e))),
}
self.sync_state
.update(SyncStatus::TxHashsetDownload(Default::default()));
}
}
}
true
}
/// Drives one round of PIBD state sync: applies any segments received so
/// far, then requests the next batch of desired segments from suitable
/// peers.
///
/// Returns `true` once the desegmenter reports all segments applied
/// (`check_progress`), `false` otherwise — including on errors, which are
/// recorded via `update_pibd_progress` / `set_sync_error` for `check_run`
/// to act on.
fn continue_pibd(&mut self) -> bool {
	// The archive header pins the horizon state this PIBD run targets.
	let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
	let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
	// Drop requests outstanding longer than the timeout; keep the
	// (segment, peer) pairs so a retry can exclude the unresponsive peer.
	let stale_segments = self
		.sync_state
		.remove_stale_pibd_requests(pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS);

	// Apply whatever segments have arrived. `try_write` so a busy writer
	// defers application to the next round instead of blocking this one.
	if let Some(mut de) = desegmenter.try_write() {
		if let Some(d) = de.as_mut() {
			// Test the Result directly (no intermediate binding needed).
			if let Err(e) = d.apply_next_segments() {
				error!("error applying segment: {}", e);
				self.sync_state
					.update_pibd_progress(false, true, 0, 1, &archive_header);
				return false;
			}
		}
	}

	// Ask the desegmenter which segments it wants next, unless complete.
	let mut next_segment_ids = vec![];
	if let Some(d) = desegmenter.write().as_mut() {
		if let Ok(true) = d.check_progress(self.sync_state.clone()) {
			// Everything applied; validation is handled by the caller.
			return true;
		}
		next_segment_ids = d.next_desired_segments(pibd_params::SEGMENT_REQUEST_COUNT);
		if !next_segment_ids.is_empty() {
			debug!(
				"state_sync: requesting next PIBD segments {:?}",
				next_segment_ids
			);
		} else {
			trace!("state_sync: no PIBD segments requested this loop");
		}
	}
	if next_segment_ids.is_empty() {
		// Nothing to request this round; keep the idle path cheap.
		return false;
	}

	// Peer selection is loop-invariant, so build it once outside the
	// request loop (the original rebuilt these per segment): connected
	// peers at max known difficulty that advertise PIBD support.
	let peers = self.peers.clone();
	let peers_iter = || peers.iter().connected();
	let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero());
	let peers_iter_max = || peers_iter().with_difficulty(|x| x >= max_diff);
	let peers_iter_pibd = || {
		peers_iter_max()
			.with_capabilities(Capabilities::PIBD_HIST_1)
			.connected()
	};

	for seg_id in next_segment_ids.iter() {
		// Already in flight: wait for the response rather than re-request.
		if self.sync_state.contains_pibd_segment(seg_id) {
			debug!(
				"state_sync: segment {:?} already requested, waiting for response",
				seg_id
			);
			trace!("Request list contains, continuing: {:?}", seg_id);
			continue;
		}
		if peers_iter_pibd().count() == 0 {
			// No suitable PIBD peers right now: start (or keep) the fallback
			// timer; after TXHASHSET_ZIP_FALLBACK_TIME_SECS without such a
			// peer, abort PIBD in favour of the txhashset.zip download.
			// `.is_none()` rather than `if let None = ...` (clippy:
			// redundant_pattern_matching).
			if self.earliest_zero_pibd_peer_time.is_none() {
				self.set_earliest_zero_pibd_peer_time(Some(Utc::now()));
			}
			// unwrap is safe: guaranteed `Some` by the branch just above.
			if self.earliest_zero_pibd_peer_time.unwrap()
				+ Duration::seconds(pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS)
				< Utc::now()
			{
				info!("No PIBD-enabled max-difficulty peers for the past {} seconds - Aborting PIBD and falling back to TxHashset.zip download", pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS);
				self.sync_state
					.update_pibd_progress(true, true, 0, 1, &archive_header);
				self.sync_state
					.set_sync_error(chain::Error::AbortingPIBDError);
				self.set_pibd_aborted();
				return false;
			}
		} else {
			self.set_earliest_zero_pibd_peer_time(None)
		}
		// If this segment previously timed out, avoid re-asking the same peer.
		let excluded_peer = stale_segments
			.iter()
			.find(|(stale_id, _)| stale_id == seg_id)
			.and_then(|(_, addr)| *addr);
		// Prefer outbound peers; fall back to inbound if none qualify.
		let peer = peers_iter_pibd()
			.outbound()
			.exclude(excluded_peer)
			.choose_random()
			.or_else(|| {
				peers_iter_pibd()
					.inbound()
					.exclude(excluded_peer)
					.choose_random()
			});
		trace!("Chosen peer is {:?}", peer);
		if let Some(p) = peer {
			// Record the request before sending so the response is expected.
			self.sync_state.add_pibd_segment(seg_id, p.info.addr.0);
			let res = match seg_id.segment_type {
				SegmentType::Bitmap => p.send_bitmap_segment_request(
					archive_header.hash(),
					seg_id.identifier.clone(),
				),
				SegmentType::Output => p.send_output_segment_request(
					archive_header.hash(),
					seg_id.identifier.clone(),
				),
				SegmentType::RangeProof => p.send_rangeproof_segment_request(
					archive_header.hash(),
					seg_id.identifier.clone(),
				),
				SegmentType::Kernel => p.send_kernel_segment_request(
					archive_header.hash(),
					seg_id.identifier.clone(),
				),
			};
			if let Err(e) = res {
				info!(
					"Error sending request to peer at {}, reason: {:?}",
					p.info.addr, e
				);
				// Un-record so the segment is eligible for a retry.
				self.sync_state.remove_pibd_segment(seg_id);
			} else {
				// Single logging site (the original duplicated the debug! arm
				// in both the `else` of the inner `if` and the outer `else`).
				match excluded_peer {
					Some(prev_peer) if p.info.addr.0 != prev_peer => info!(
						"state_sync: retrying segment {:?} with new peer {} (previously {})",
						seg_id, p.info.addr, prev_peer
					),
					_ => debug!(
						"state_sync: requested segment {:?} from peer {}",
						seg_id, p.info.addr
					),
				}
			}
		}
	}
	false
}
fn request_state(&self, header_head: &chain::Tip) -> Result<Arc<Peer>, p2p::Error> {
let threshold = global::state_sync_threshold() as u64;
let archive_interval = global::txhashset_archive_interval();
let mut txhashset_height = header_head.height.saturating_sub(threshold);
txhashset_height = txhashset_height.saturating_sub(txhashset_height % archive_interval);
let peers_iter = || {
self.peers
.iter()
.with_capabilities(Capabilities::TXHASHSET_HIST)
.connected()
};
let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero());
let peers_iter = || peers_iter().with_difficulty(|x| x >= max_diff);
let peer = peers_iter().outbound().choose_random().or_else(|| {
warn!("no suitable outbound peer for state sync, considering inbound");
peers_iter().inbound().choose_random()
});
if let Some(peer) = peer {
let mut txhashset_head = self
.chain
.get_block_header(&header_head.prev_block_h)
.map_err(|e| {
error!(
"chain error during getting a block header {}: {:?}",
&header_head.prev_block_h, e
);
p2p::Error::Internal
})?;
while txhashset_head.height > txhashset_height {
txhashset_head = self
.chain
.get_previous_header(&txhashset_head)
.map_err(|e| {
error!(
"chain error during getting a previous block header {}: {:?}",
txhashset_head.hash(),
e
);
p2p::Error::Internal
})?;
}
let bhash = txhashset_head.hash();
debug!(
"state_sync: before txhashset request, header head: {} / {}, txhashset_head: {} / {}",
header_head.height,
header_head.last_block_h,
txhashset_head.height,
bhash
);
if let Err(e) = peer.send_txhashset_request(txhashset_head.height, bhash) {
error!("state_sync: send_txhashset_request err! {:?}", e);
return Err(e);
}
return Ok(peer);
}
Err(p2p::Error::PeerException)
}
/// Decides whether a state-sync download should be launched now.
///
/// Returns `(launch, timed_out)`: `launch` is true on the first call after
/// a reset (and records the launch time); `timed_out` is true once a
/// previously-launched download has been running for over ten minutes.
fn state_sync_due(&mut self) -> (bool, bool) {
	let now = Utc::now();
	if let Some(started) = self.prev_state_sync {
		// A download is already in flight; flag it once it exceeds 10 minutes.
		let timed_out = now - started > Duration::minutes(10);
		(false, timed_out)
	} else {
		// First call since the last reset: record the start and launch.
		self.prev_state_sync = Some(now);
		(true, false)
	}
}
/// Forgets the current attempt (launch time and serving peer) so the next
/// `state_sync_due` call starts a fresh download.
fn state_sync_reset(&mut self) {
	self.prev_state_sync.take();
	self.state_sync_peer.take();
}
}