ckb-instrument 1.1.0

Instruments for ckb for working with `Export`, `Import`
Documentation
use ckb_app_config::ImportSource;
use ckb_chain::ChainController;
use ckb_jsonrpc_types::BlockView as JsonBlock;
use ckb_shared::Shared;
use ckb_store::ChainStore;
use ckb_types::core;
use ckb_verification_traits::Switch;
#[cfg(feature = "progress_bar")]
use indicatif::{ProgressBar, ProgressStyle};
use rayon::prelude::*;
use std::error::Error;
use std::fs;
use std::io;
use std::io::BufRead;
use std::sync::Arc;

/// Export block date from file to database.
pub struct Import {
    /// source file contains block data
    source: ImportSource,
    shared: Shared,
    chain: ChainController,
    switch: Switch,
    num_threads: usize,
}

impl Import {
    /// Creates a new import job.
    pub fn new(
        chain: ChainController,
        shared: Shared,
        source: ImportSource,
        switch: Switch,
        num_threads: usize,
    ) -> Self {
        Import {
            chain,
            shared,
            source,
            switch,
            num_threads,
        }
    }

    /// Executes the import job.
    pub fn execute(self) -> Result<(), Box<dyn Error>> {
        {
            let snapshot = self.shared.snapshot();
            let tip = snapshot.tip_header();
            println!(
                "Before import, current tip is {}-{}",
                tip.number(),
                tip.hash()
            );
        }

        self.read_from_json()?;

        {
            let snapshot = self.shared.snapshot();
            let tip = snapshot.tip_header();
            println!(
                "After import, Current tip is {}-{}",
                tip.number(),
                tip.hash()
            );
        }
        Ok(())
    }

    /// Imports the chain from the JSON file.
    pub fn read_from_json(&self) -> Result<(), Box<dyn Error>> {
        use std::io::Read;

        use ckb_chain::VerifyResult;
        use ckb_types::core::BlockView;

        while self.chain.is_verifying_unverified_blocks_on_startup() {
            std::thread::sleep(std::time::Duration::from_millis(10));
        }

        let f: Box<dyn Read + Send> = match &self.source {
            ImportSource::Path(source) => Box::new(fs::File::open(source)?),
            ImportSource::Stdin => {
                // read from stdin
                Box::new(std::io::stdin())
            }
        };

        let reader = io::BufReader::new(f);
        let mut lines = reader.lines().peekable();
        let first_block = if let Some(Ok(first_line)) = lines.peek() {
            let first_block: JsonBlock =
                serde_json::from_str(first_line).expect("parse first block from json");

            let first_block: core::BlockView = first_block.into();
            Ok(first_block)
        } else {
            Err(Box::new(io::Error::new(
                io::ErrorKind::InvalidData,
                "The source file is empty or malformed.",
            )))
        }?;

        if !first_block.is_genesis() {
            let first_block_parent = first_block.parent_hash();
            if self
                .shared
                .snapshot()
                .get_block(&first_block_parent)
                .is_none()
            {
                let tip = self
                    .shared
                    .snapshot()
                    .get_tip_header()
                    .expect("must get tip header");

                let source_display = match self.source {
                    ImportSource::Path(ref path) => path.display().to_string(),
                    ImportSource::Stdin => "stdin".to_string(),
                };

                return Err(Box::new(io::Error::other(format!(
                    "In {}, the first block is {}-{}, and its parent (hash: {}) was not found in the database. The current tip is {}-{}.",
                    source_display,
                    first_block.number(),
                    first_block.hash(),
                    first_block_parent,
                    tip.number(),
                    tip.hash(),
                ))));
            }
        }

        #[cfg(feature = "progress_bar")]
        let progress_bar = {
            let bar = match &self.source {
                ImportSource::Path(source) => {
                    let file_size = fs::metadata(source)?.len();
                    ProgressBar::new(file_size)
                }
                ImportSource::Stdin => ProgressBar::new_spinner(),
            };
            let style = ProgressStyle::default_bar()
                .template("[{elapsed_precise}] {bar:50.cyan/blue} {bytes:>6}/{total_bytes:6} {msg}")
                .expect("Failed to set progress bar template")
                .progress_chars("##-");
            bar.set_style(style);
            bar
        };

        let mut largest_block_number = 0;
        const BLOCKS_COUNT_PER_CHUNK: usize = 1024 * 6;
        let (blocks_tx, blocks_rx) =
            ckb_channel::bounded::<(Arc<BlockView>, usize)>(BLOCKS_COUNT_PER_CHUNK);
        std::thread::spawn({
            let num_threads = self.num_threads;
            move || {
                let pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(num_threads)
                    .build()
                    .expect("rayon thread pool must build");
                pool.install(|| {
                    loop {
                        let batch: Vec<String> = lines
                            .by_ref()
                            .take(BLOCKS_COUNT_PER_CHUNK)
                            .filter_map(Result::ok)
                            .collect();
                        if batch.is_empty() {
                            break;
                        }
                        batch.par_iter().for_each(|line| {
                            let block: JsonBlock =
                                serde_json::from_str(line).expect("parse block from json");
                            let block: Arc<core::BlockView> = Arc::new(block.into());
                            blocks_tx
                                .send((block, line.len()))
                                .expect("send block to channel");
                        });
                    }
                    drop(blocks_tx);
                });
            }
        });

        for (block, block_size) in blocks_rx {
            if !block.is_genesis() {
                use ckb_chain::LonelyBlock;

                largest_block_number = largest_block_number.max(block.number());

                #[cfg(feature = "progress_bar")]
                let callback = {
                    let progress_bar = progress_bar.clone();
                    Box::new(move |verify_result: VerifyResult| {
                        if let Err(err) = verify_result {
                            eprintln!("Error verifying block: {:?}", err);
                        } else {
                            progress_bar.inc(block_size as u64);
                        }
                    })
                };
                #[cfg(not(feature = "progress_bar"))]
                let callback = {
                    let _ = block_size;
                    Box::new(move |verify_result: VerifyResult| {
                        if let Err(err) = verify_result {
                            eprintln!("Error verifying block: {:?}", err);
                        }
                    })
                };

                let lonely_block = LonelyBlock {
                    block,
                    switch: Some(self.switch),
                    verify_callback: Some(callback),
                };
                self.chain.asynchronous_process_lonely_block(lonely_block);
            }
        }

        while self
            .shared
            .snapshot()
            .get_block_hash(largest_block_number)
            .is_none()
        {
            std::thread::sleep(std::time::Duration::from_secs(1));
        }

        #[cfg(feature = "progress_bar")]
        progress_bar.finish_with_message("done!");
        Ok(())
    }
}