use std::sync::Arc;
use std::thread;
use std::time;
use crate::chain::{self, SyncState, SyncStatus};
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::grin::sync::body_sync::BodySync;
use crate::grin::sync::header_sync::HeaderSync;
use crate::grin::sync::state_sync::StateSync;
use crate::p2p;
use crate::util::StopState;
/// Spawns the dedicated thread, named "sync", that drives chain synchronization.
///
/// The thread builds a `SyncRunner` from the given shared handles and runs
/// its sync loop until that loop returns.
///
/// # Errors
///
/// Returns the `std::io::Error` reported by the thread builder when the
/// thread cannot be spawned.
pub fn run_sync(
    sync_state: Arc<SyncState>,
    peers: Arc<p2p::Peers>,
    chain: Arc<chain::Chain>,
    stop_state: Arc<StopState>,
) -> std::io::Result<std::thread::JoinHandle<()>> {
    let sync_thread = thread::Builder::new().name(String::from("sync"));
    // All four handles are moved into the thread; the runner lives and dies there.
    sync_thread.spawn(move || SyncRunner::new(sync_state, peers, chain, stop_state).sync_loop())
}
/// State shared by the sync thread: everything the sync loop needs to decide
/// whether syncing is required and to drive the individual sync stages.
pub struct SyncRunner {
/// Shared sync status; read via `status()` / `is_syncing()` and updated via `update()`.
sync_state: Arc<SyncState>,
/// Peer set, queried for connected peers and their advertised difficulty.
peers: Arc<p2p::Peers>,
/// Local chain handle; source of head, tail and header head, and target of compaction.
chain: Arc<chain::Chain>,
/// Stop flag polled by the loops in this type so they can exit when a stop is requested.
stop_state: Arc<StopState>,
}
impl SyncRunner {
fn new(
sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>,
stop_state: Arc<StopState>,
) -> SyncRunner {
SyncRunner {
sync_state,
peers,
chain,
stop_state,
}
}
fn wait_for_min_peers(&self) -> Result<(), chain::Error> {
let wait_secs = if let SyncStatus::AwaitingPeers(true) = self.sync_state.status() {
30
} else {
3
};
let head = self.chain.head()?;
let mut n = 0;
const MIN_PEERS: usize = 3;
loop {
if self.stop_state.is_stopped() {
break;
}
let wp = self
.peers
.iter()
.outbound()
.with_difficulty(|x| x >= head.total_difficulty)
.connected()
.count();
if wp > MIN_PEERS
|| (wp == 0
&& self.peers.enough_outbound_peers()
&& head.total_difficulty > Difficulty::zero())
|| n > wait_secs
{
if wp > 0 || !global::is_production_mode() {
break;
}
}
thread::sleep(time::Duration::from_secs(1));
n += 1;
}
Ok(())
}
fn sync_loop(&self) {
macro_rules! unwrap_or_restart_loop(
($obj: expr) =>(
match $obj {
Ok(v) => v,
Err(e) => {
error!("unexpected error: {:?}", e);
thread::sleep(time::Duration::from_secs(1));
continue;
},
}
));
if let Err(e) = self.wait_for_min_peers() {
error!("wait_for_min_peers failed: {:?}", e);
}
let mut header_sync = HeaderSync::new(
self.sync_state.clone(),
self.peers.clone(),
self.chain.clone(),
);
let mut body_sync = BodySync::new(
self.sync_state.clone(),
self.peers.clone(),
self.chain.clone(),
);
let mut state_sync = StateSync::new(
self.sync_state.clone(),
self.peers.clone(),
self.chain.clone(),
);
let mut highest_height = 0;
loop {
if self.stop_state.is_stopped() {
break;
}
thread::sleep(time::Duration::from_millis(10));
let currently_syncing = self.sync_state.is_syncing();
let (needs_syncing, most_work_height) = unwrap_or_restart_loop!(self.needs_syncing());
if most_work_height > 0 {
highest_height = most_work_height;
}
if !needs_syncing {
if currently_syncing {
self.sync_state.update(SyncStatus::NoSync);
unwrap_or_restart_loop!(self.chain.compact());
}
for _ in 1..10 {
thread::sleep(time::Duration::from_secs(1));
if self.stop_state.is_stopped() {
break;
}
}
continue;
}
let head = unwrap_or_restart_loop!(self.chain.head());
let tail = self.chain.tail().unwrap_or_else(|_| head.clone());
let header_head = unwrap_or_restart_loop!(self.chain.header_head());
let sync_status = self.sync_state.status();
let sync_head = match sync_status {
SyncStatus::HeaderSync { sync_head, .. } => sync_head,
_ => header_head,
};
unwrap_or_restart_loop!(header_sync.check_run(sync_head));
let mut check_state_sync = false;
match self.sync_state.status() {
SyncStatus::TxHashsetPibd { .. }
| SyncStatus::TxHashsetDownload { .. }
| SyncStatus::TxHashsetSetup { .. }
| SyncStatus::TxHashsetRangeProofsValidation { .. }
| SyncStatus::TxHashsetKernelsValidation { .. }
| SyncStatus::TxHashsetSave
| SyncStatus::TxHashsetDone => check_state_sync = true,
_ => {
if sync_head.height < highest_height {
continue;
}
let check_run =
unwrap_or_restart_loop!(body_sync.check_run(&head, highest_height));
if check_run {
check_state_sync = true;
}
}
}
if check_state_sync {
state_sync.check_run(
&header_head,
&head,
&tail,
highest_height,
self.stop_state.clone(),
);
}
}
}
fn needs_syncing(&self) -> Result<(bool, u64), chain::Error> {
let local_diff = self.chain.head()?.total_difficulty;
let mut is_syncing = self.sync_state.is_syncing();
let max_diff = self
.peers
.iter()
.connected()
.max_difficulty()
.unwrap_or(Difficulty::zero());
let peer = self
.peers
.iter()
.with_difficulty(|x| x >= max_diff)
.connected()
.choose_random();
let peer_info = if let Some(p) = peer {
p.info.clone()
} else {
warn!("sync: no peers available, disabling sync");
return Ok((false, 0));
};
if is_syncing {
if peer_info.total_difficulty() <= local_diff {
let ch = self.chain.head()?;
info!(
"synchronized at {} @ {} [{}]",
local_diff.to_num(),
ch.height,
ch.last_block_h
);
is_syncing = false;
}
} else {
let threshold = {
let diff_iter = match self.chain.difficulty_iter() {
Ok(v) => v,
Err(e) => {
error!("failed to get difficulty iterator: {:?}", e);
return Ok((false, 0));
}
};
diff_iter
.map(|x| x.difficulty)
.take(5)
.fold(Difficulty::zero(), |sum, val| sum + val)
};
let peer_diff = peer_info.total_difficulty();
if peer_diff > local_diff + threshold {
info!(
"sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync",
local_diff,
peer_diff,
threshold,
);
is_syncing = true;
}
}
Ok((is_syncing, peer_info.height()))
}
}