use ckb_app_config::ImportSource;
use ckb_chain::ChainController;
use ckb_jsonrpc_types::BlockView as JsonBlock;
use ckb_shared::Shared;
use ckb_store::ChainStore;
use ckb_types::core;
use ckb_verification_traits::Switch;
#[cfg(feature = "progress_bar")]
use indicatif::{ProgressBar, ProgressStyle};
use rayon::prelude::*;
use std::error::Error;
use std::fs;
use std::io;
use std::io::BufRead;
use std::sync::Arc;
/// Imports JSON-encoded blocks (one block per line) into the chain.
pub struct Import {
    /// Where the blocks are read from: a file path or stdin.
    source: ImportSource,
    /// Shared state, used to take snapshots for tip and block lookups.
    shared: Shared,
    /// Chain controller that the parsed blocks are submitted to.
    chain: ChainController,
    /// Verification switch attached to every submitted block.
    switch: Switch,
    /// Number of threads of the rayon pool that parses the JSON lines.
    num_threads: usize,
}
impl Import {
    /// Creates a new import job.
    ///
    /// * `chain` - controller the parsed blocks are submitted to.
    /// * `shared` - shared state used to take chain snapshots.
    /// * `source` - where the JSON lines are read from (a file path or stdin).
    /// * `switch` - verification switch attached to every submitted block.
    /// * `num_threads` - number of threads of the rayon pool that parses JSON lines.
    pub fn new(
        chain: ChainController,
        shared: Shared,
        source: ImportSource,
        switch: Switch,
        num_threads: usize,
    ) -> Self {
        Import {
            chain,
            shared,
            source,
            switch,
            num_threads,
        }
    }

    /// Runs the import, printing the chain tip before and after it.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`Import::read_from_json`] reports; in that case
    /// the "after import" tip is not printed.
    pub fn execute(self) -> Result<(), Box<dyn Error>> {
        self.print_tip("Before import, current tip is");
        self.read_from_json()?;
        self.print_tip("After import, Current tip is");
        Ok(())
    }

    /// Prints `prefix` followed by the number and hash of the current tip header.
    fn print_tip(&self, prefix: &str) {
        let snapshot = self.shared.snapshot();
        let tip = snapshot.tip_header();
        println!("{} {}-{}", prefix, tip.number(), tip.hash());
    }

    /// Reads JSON-encoded blocks (one per line) from the source and submits
    /// every non-genesis block to the chain controller.
    ///
    /// Lines are parsed in parallel on a dedicated rayon pool, so blocks inside
    /// one batch may be submitted in a different order than they appear in the
    /// source. After everything has been submitted, the function waits until the
    /// highest submitted block number is present in the chain snapshot.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// * the source cannot be opened, is empty, or its first line cannot be read
    ///   or parsed;
    /// * the first block is not genesis and its parent is not in the database;
    /// * any later line cannot be read or parsed, or the reader thread fails;
    /// * a block fails verification before the highest submitted block shows up
    ///   in the chain (previously this case waited forever).
    pub fn read_from_json(&self) -> Result<(), Box<dyn Error>> {
        use ckb_chain::{LonelyBlock, VerifyResult};
        use ckb_types::core::BlockView;
        use std::io::Read;
        use std::sync::atomic::{AtomicBool, Ordering};

        // Do not submit anything while the chain is still working through the
        // unverified blocks it found on startup.
        while self.chain.is_verifying_unverified_blocks_on_startup() {
            std::thread::sleep(std::time::Duration::from_millis(10));
        }

        let f: Box<dyn Read + Send> = match &self.source {
            ImportSource::Path(source) => Box::new(fs::File::open(source)?),
            ImportSource::Stdin => Box::new(std::io::stdin()),
        };
        let reader = io::BufReader::new(f);
        let mut lines = reader.lines().peekable();

        // Only peek at the first line: it must still be delivered to the parser
        // thread below together with the rest of the source.
        let first_block: core::BlockView = match lines.peek() {
            Some(Ok(first_line)) => {
                let json_block: JsonBlock = serde_json::from_str(first_line).map_err(|err| {
                    io::Error::new(
                        io::ErrorKind::InvalidData,
                        format!("Failed to parse the first block from json: {}", err),
                    )
                })?;
                json_block.into()
            }
            Some(Err(err)) => {
                return Err(Box::new(io::Error::new(
                    err.kind(),
                    format!("Failed to read the first line of the source: {}", err),
                )));
            }
            None => {
                return Err(Box::new(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "The source is empty: no block was found.",
                )));
            }
        };

        // A non-genesis first block can only be attached when its parent is
        // already stored; fail early with a descriptive message otherwise.
        if !first_block.is_genesis() {
            let first_block_parent = first_block.parent_hash();
            let snapshot = self.shared.snapshot();
            if snapshot.get_block(&first_block_parent).is_none() {
                let tip = snapshot.get_tip_header().expect("must get tip header");
                let source_display = match &self.source {
                    ImportSource::Path(path) => path.display().to_string(),
                    ImportSource::Stdin => "stdin".to_string(),
                };
                return Err(Box::new(io::Error::other(format!(
                    "In {}, the first block is {}-{}, and its parent (hash: {}) was not found in the database. The current tip is {}-{}.",
                    source_display,
                    first_block.number(),
                    first_block.hash(),
                    first_block_parent,
                    tip.number(),
                    tip.hash(),
                ))));
            }
        }

        #[cfg(feature = "progress_bar")]
        let progress_bar = {
            let bar = match &self.source {
                ImportSource::Path(source) => {
                    let file_size = fs::metadata(source)?.len();
                    ProgressBar::new(file_size)
                }
                ImportSource::Stdin => ProgressBar::new_spinner(),
            };
            let style = ProgressStyle::default_bar()
                .template("[{elapsed_precise}] {bar:50.cyan/blue} {bytes:>6}/{total_bytes:6} {msg}")
                .expect("Failed to set progress bar template")
                .progress_chars("##-");
            bar.set_style(style);
            bar
        };

        const BLOCKS_COUNT_PER_CHUNK: usize = 1024 * 6;
        let (blocks_tx, blocks_rx) =
            ckb_channel::bounded::<(Arc<BlockView>, usize)>(BLOCKS_COUNT_PER_CHUNK);

        // Reader thread: pulls the source in batches and parses every batch in
        // parallel. It reports read/parse failures through its return value
        // instead of panicking or silently skipping lines.
        let num_threads = self.num_threads;
        let reader_thread = std::thread::spawn(move || -> io::Result<()> {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(num_threads)
                .build()
                .map_err(|err| {
                    io::Error::other(format!("failed to build rayon thread pool: {}", err))
                })?;
            let outcome = pool.install(|| -> io::Result<()> {
                loop {
                    let batch = lines
                        .by_ref()
                        .take(BLOCKS_COUNT_PER_CHUNK)
                        .collect::<io::Result<Vec<String>>>()?;
                    if batch.is_empty() {
                        return Ok(());
                    }
                    batch.par_iter().try_for_each(|line| -> io::Result<()> {
                        let block: JsonBlock = serde_json::from_str(line).map_err(|err| {
                            io::Error::new(
                                io::ErrorKind::InvalidData,
                                format!("failed to parse block from json: {}", err),
                            )
                        })?;
                        let block: Arc<core::BlockView> = Arc::new(block.into());
                        // The line length is forwarded so the consumer can
                        // advance the byte-based progress bar.
                        blocks_tx
                            .send((block, line.len()))
                            .map_err(|_| io::Error::other("the block receiver was dropped"))
                    })?;
                }
            });
            // Close the channel so the receiving loop terminates.
            drop(blocks_tx);
            outcome
        });

        // Set by the verification callbacks as soon as any block is rejected.
        let verify_failed = Arc::new(AtomicBool::new(false));
        let mut largest_block_number = 0;
        for (block, block_size) in blocks_rx {
            if block.is_genesis() {
                continue;
            }
            largest_block_number = largest_block_number.max(block.number());
            #[cfg(feature = "progress_bar")]
            let callback = {
                let progress_bar = progress_bar.clone();
                let verify_failed = Arc::clone(&verify_failed);
                Box::new(move |verify_result: VerifyResult| {
                    if let Err(err) = verify_result {
                        eprintln!("Error verifying block: {:?}", err);
                        verify_failed.store(true, Ordering::SeqCst);
                    } else {
                        progress_bar.inc(block_size as u64);
                    }
                })
            };
            #[cfg(not(feature = "progress_bar"))]
            let callback = {
                let _ = block_size;
                let verify_failed = Arc::clone(&verify_failed);
                Box::new(move |verify_result: VerifyResult| {
                    if let Err(err) = verify_result {
                        eprintln!("Error verifying block: {:?}", err);
                        verify_failed.store(true, Ordering::SeqCst);
                    }
                })
            };
            let lonely_block = LonelyBlock {
                block,
                switch: Some(self.switch),
                verify_callback: Some(callback),
            };
            self.chain.asynchronous_process_lonely_block(lonely_block);
        }

        // The channel is closed, so the reader thread has finished (or died).
        // Surface its failure instead of waiting for blocks that never arrived.
        match reader_thread.join() {
            Ok(Ok(())) => {}
            Ok(Err(err)) => return Err(Box::new(err)),
            Err(_) => {
                return Err(Box::new(io::Error::other(
                    "the block reader thread panicked",
                )));
            }
        }

        // Submission is asynchronous: wait until the highest submitted block is
        // part of the chain, but give up once a verification failure is known,
        // because that block may then never be attached.
        while self
            .shared
            .snapshot()
            .get_block_hash(largest_block_number)
            .is_none()
        {
            if verify_failed.load(Ordering::SeqCst) {
                return Err(Box::new(io::Error::other(format!(
                    "block verification failed before block {} was imported",
                    largest_block_number
                ))));
            }
            std::thread::sleep(std::time::Duration::from_secs(1));
        }
        #[cfg(feature = "progress_bar")]
        progress_bar.finish_with_message("done!");
        Ok(())
    }
}