use ckb_app_config::ExportTarget;
use ckb_jsonrpc_types::BlockView as JsonBlock;
use ckb_jsonrpc_types::Either;
use ckb_shared::Snapshot;
use ckb_shared::shared::Shared;
use ckb_store::ChainStore;
use ckb_types::H256;
use ckb_types::core::BlockNumber;
#[cfg(feature = "progress_bar")]
use indicatif::{ParallelProgressIterator, ProgressBar, ProgressStyle};
use itertools::Itertools;
use rayon::prelude::*;
use std::error::Error;
use std::fs;
use std::io;
use std::io::Write;
use std::sync::Arc;
pub struct Export {
pub target: ExportTarget,
pub shared: Shared,
pub snapshot: Arc<Snapshot>,
pub from: Option<Either<BlockNumber, H256>>,
pub to: Option<Either<BlockNumber, H256>>,
}
impl Export {
pub fn new(
shared: Shared,
target: ExportTarget,
from: Option<Either<BlockNumber, H256>>,
to: Option<Either<BlockNumber, H256>>,
) -> Self {
let snapshot = shared.cloned_snapshot();
Export {
shared,
snapshot,
target,
from,
to,
}
}
fn file_name(&self, from_number: u64, to_number: u64) -> Result<String, Box<dyn Error>> {
Ok(format!(
"{}-{}-{}.jsonl",
self.shared.consensus().id,
from_number,
to_number
))
}
pub fn execute(self) -> Result<(), Box<dyn Error>> {
if let ExportTarget::Path(ref path) = self.target {
fs::create_dir_all(path)?;
}
self.write_to_json()
}
pub fn write_to_json(self) -> Result<(), Box<dyn Error>> {
let from_number = match self.from.clone() {
Some(Either::Left(number)) => number,
Some(Either::Right(hash)) => self
.snapshot
.get_block_number(&hash.clone().into())
.unwrap_or_else(|| panic!("must get block number for hash: {}", hash)),
None => 0,
};
let to_number = match self.to.clone() {
Some(Either::Left(number)) => number,
Some(Either::Right(hash)) => self
.snapshot
.get_block_number(&hash.clone().into())
.unwrap_or_else(|| panic!("must get block number for hash: {}", hash)),
None => self.snapshot.tip_number(),
};
if to_number < from_number {
return Err(format!("Invalid range: from {} to {}", from_number, to_number).into());
}
let mut writer: Box<dyn Write> = {
match self.target {
ExportTarget::Path(ref path_buf) => {
let file_name = self.file_name(from_number, to_number)?;
let file_path = path_buf.join(file_name);
println!("Writing to {}", file_path.display());
let f = fs::OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.open(file_path)?;
Box::new(io::BufWriter::new(f))
}
ExportTarget::Stdout => Box::new(std::io::stdout()),
}
};
let snapshot = self.shared.snapshot();
#[cfg(feature = "progress_bar")]
let progress_bar = {
let progress_bar = ProgressBar::new(to_number - from_number + 1);
progress_bar.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar:50.cyan/blue} {pos:>6}/{len:6} {msg}")
.expect("Failed to set progress bar template")
.progress_chars("##-"),
);
progress_bar
};
const BLOCKS_COUNT_PER_CHUNK: usize = 1024;
let (blocks_tx, blocks_rx) = ckb_channel::bounded(BLOCKS_COUNT_PER_CHUNK);
std::thread::scope({
|s| {
#[cfg(feature = "progress_bar")]
let progress_bar_clone = progress_bar.clone();
let producer_jh = s.spawn(move || -> Result<(), String> {
(from_number..=to_number)
.chunks(BLOCKS_COUNT_PER_CHUNK)
.into_iter()
.try_for_each(|chunk| -> Result<(), String> {
let iter = chunk.collect::<Vec<_>>().into_par_iter();
#[cfg(feature = "progress_bar")]
let iter = iter.progress_with(progress_bar_clone.clone());
iter.try_for_each(|block_number| -> Result<(), String> {
let block_hash =
snapshot.get_block_hash(block_number).ok_or_else(|| {
format!("not found block hash for {}", block_number)
})?;
let block = snapshot.get_block(&block_hash).ok_or_else(|| {
format!("not found block for {}", block_number)
})?;
let block: JsonBlock = block.into();
let encoded = serde_json::to_vec(&block).map_err(|err| {
format!("serializing block failed: {:?}", err)
})?;
blocks_tx
.send((block_number, encoded))
.map_err(|err| format!("sending block failed: {:?}", err))
})
})?;
drop(blocks_tx);
Ok(())
});
let mut expected = from_number;
let mut buffer = std::collections::BTreeMap::new();
for (idx, res) in blocks_rx {
buffer.insert(idx, res);
while buffer.contains_key(&expected) {
let encoded = buffer.remove(&expected).unwrap();
if let Err(err) = writer.write_all(&encoded) {
eprintln!("writing block failed: {:?}", err);
}
if let Err(err) = writer.write_all(b"\n") {
eprintln!("writing newline failed: {:?}", err);
}
expected += 1;
}
}
let result = producer_jh.join();
if let Err(err) = result {
eprintln!("Error during export: {:?}", err);
return;
}
#[cfg(feature = "progress_bar")]
progress_bar.finish_with_message("done!");
}
});
Ok(())
}
}