ckb_migration_template/
lib.rs

1//! Provide proc-macros to setup migration.
2
3extern crate proc_macro;
4
5use proc_macro::TokenStream;
6use quote::quote;
7use syn::parse_macro_input;
8
9/// multi thread migration template
10#[proc_macro]
11pub fn multi_thread_migration(input: TokenStream) -> TokenStream {
12    let block_expr = parse_macro_input!(input as syn::ExprBlock);
13    let expanded = quote! {
14        const MAX_THREAD: u64 = 6;
15        const MIN_THREAD: u64 = 2;
16        const BATCH: usize = 1_000;
17
18        let chain_db = ChainDB::new(db, StoreConfig::default());
19        let tip = chain_db.get_tip_header().expect("db tip header index");
20        let tip_number = tip.number();
21
22        let tb_num = std::cmp::max(MIN_THREAD, num_cpus::get() as u64);
23        let tb_num = std::cmp::min(tb_num, MAX_THREAD);
24        let chunk_size = tip_number / tb_num;
25        let remainder = tip_number % tb_num;
26        let _barrier = ::std::sync::Arc::new(::std::sync::Barrier::new(tb_num as usize));
27
28        let handles: Vec<_> = (0..tb_num).map(|i| {
29            let chain_db = chain_db.clone();
30            let pb = ::std::sync::Arc::clone(&pb);
31            let barrier = Arc::clone(&_barrier);
32
33            let last = i == (tb_num - 1);
34            let size = if last {
35                chunk_size + remainder
36            } else {
37                chunk_size
38            };
39            let end = if last {
40                tip_number + 1
41            } else {
42                (i + 1) * chunk_size
43            };
44
45            let pbi = pb(size * 2);
46            pbi.set_style(
47                ProgressStyle::default_bar()
48                    .template(
49                        "{prefix:.bold.dim} {spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta}) {msg}",
50                    )
51                    .progress_chars("#>-"),
52            );
53            pbi.set_position(0);
54            pbi.enable_steady_tick(5000);
55            ::std::thread::spawn(move || {
56                let mut wb = chain_db.new_write_batch();
57
58                #block_expr
59
60                if !wb.is_empty() {
61                    chain_db.write(&wb).unwrap();
62                }
63                pbi.finish_with_message("done!");
64            })
65        }).collect();
66
67        // Wait for other threads to finish.
68        for handle in handles {
69            handle.join().unwrap();
70        }
71        Ok(chain_db.into_inner())
72    };
73
74    TokenStream::from(expanded)
75}