ckb_migration_template/
lib.rs1extern crate proc_macro;
4
5use proc_macro::TokenStream;
6use quote::quote;
7use syn::parse_macro_input;
8
9#[proc_macro]
11pub fn multi_thread_migration(input: TokenStream) -> TokenStream {
12 let block_expr = parse_macro_input!(input as syn::ExprBlock);
13 let expanded = quote! {
14 const MAX_THREAD: u64 = 6;
15 const MIN_THREAD: u64 = 2;
16 const BATCH: usize = 1_000;
17
18 let chain_db = ChainDB::new(db, StoreConfig::default());
19 let tip = chain_db.get_tip_header().expect("db tip header index");
20 let tip_number = tip.number();
21
22 let tb_num = std::cmp::max(MIN_THREAD, num_cpus::get() as u64);
23 let tb_num = std::cmp::min(tb_num, MAX_THREAD);
24 let chunk_size = tip_number / tb_num;
25 let remainder = tip_number % tb_num;
26 let _barrier = ::std::sync::Arc::new(::std::sync::Barrier::new(tb_num as usize));
27
28 let handles: Vec<_> = (0..tb_num).map(|i| {
29 let chain_db = chain_db.clone();
30 let pb = ::std::sync::Arc::clone(&pb);
31 let barrier = Arc::clone(&_barrier);
32
33 let last = i == (tb_num - 1);
34 let size = if last {
35 chunk_size + remainder
36 } else {
37 chunk_size
38 };
39 let end = if last {
40 tip_number + 1
41 } else {
42 (i + 1) * chunk_size
43 };
44
45 let pbi = pb(size * 2);
46 pbi.set_style(
47 ProgressStyle::default_bar()
48 .template(
49 "{prefix:.bold.dim} {spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta}) {msg}",
50 )
51 .expect("Failed to set progress bar template")
52 .progress_chars("#>-"),
53 );
54 pbi.set_position(0);
55 pbi.enable_steady_tick(std::time::Duration::from_millis(5000));
56 ::std::thread::spawn(move || {
57 let mut wb = chain_db.new_write_batch();
58
59 #block_expr
60
61 if !wb.is_empty() {
62 chain_db.write(&wb).unwrap();
63 }
64 pbi.finish_with_message("done!");
65 })
66 }).collect();
67
68 for handle in handles {
70 handle.join().unwrap();
71 }
72 Ok(chain_db.into_inner())
73 };
74
75 TokenStream::from(expanded)
76}