ckb_migration_template/
lib.rs

1//! Provide proc-macros to setup migration.
2
3extern crate proc_macro;
4
5use proc_macro::TokenStream;
6use quote::quote;
7use syn::parse_macro_input;
8
9/// multi thread migration template
10#[proc_macro]
11pub fn multi_thread_migration(input: TokenStream) -> TokenStream {
12    let block_expr = parse_macro_input!(input as syn::ExprBlock);
13    let expanded = quote! {
14        const MAX_THREAD: u64 = 6;
15        const MIN_THREAD: u64 = 2;
16        const BATCH: usize = 1_000;
17
18        let chain_db = ChainDB::new(db, StoreConfig::default());
19        let tip = chain_db.get_tip_header().expect("db tip header index");
20        let tip_number = tip.number();
21
22        let tb_num = std::cmp::max(MIN_THREAD, num_cpus::get() as u64);
23        let tb_num = std::cmp::min(tb_num, MAX_THREAD);
24        let chunk_size = tip_number / tb_num;
25        let remainder = tip_number % tb_num;
26        let _barrier = ::std::sync::Arc::new(::std::sync::Barrier::new(tb_num as usize));
27
28        let handles: Vec<_> = (0..tb_num).map(|i| {
29            let chain_db = chain_db.clone();
30            let pb = ::std::sync::Arc::clone(&pb);
31            let barrier = Arc::clone(&_barrier);
32
33            let last = i == (tb_num - 1);
34            let size = if last {
35                chunk_size + remainder
36            } else {
37                chunk_size
38            };
39            let end = if last {
40                tip_number + 1
41            } else {
42                (i + 1) * chunk_size
43            };
44
45            let pbi = pb(size * 2);
46            pbi.set_style(
47                ProgressStyle::default_bar()
48                    .template(
49                        "{prefix:.bold.dim} {spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta}) {msg}",
50                    )
51                    .expect("Failed to set progress bar template")
52                    .progress_chars("#>-"),
53            );
54            pbi.set_position(0);
55            pbi.enable_steady_tick(std::time::Duration::from_millis(5000));
56            ::std::thread::spawn(move || {
57                let mut wb = chain_db.new_write_batch();
58
59                #block_expr
60
61                if !wb.is_empty() {
62                    chain_db.write(&wb).unwrap();
63                }
64                pbi.finish_with_message("done!");
65            })
66        }).collect();
67
68        // Wait for other threads to finish.
69        for handle in handles {
70            handle.join().unwrap();
71        }
72        Ok(chain_db.into_inner())
73    };
74
75    TokenStream::from(expanded)
76}