1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
extern crate proc_macro;
use proc_macro::TokenStream;
use quote::quote;
use syn::parse_macro_input;
#[proc_macro]
pub fn multi_thread_migration(input: TokenStream) -> TokenStream {
let block_expr = parse_macro_input!(input as syn::ExprBlock);
let expanded = quote! {
const MAX_THREAD: u64 = 6;
const MIN_THREAD: u64 = 2;
const BATCH: usize = 1_000;
let chain_db = ChainDB::new(db, StoreConfig::default());
let tip = chain_db.get_tip_header().expect("db tip header index");
let tip_number = tip.number();
let tb_num = std::cmp::max(MIN_THREAD, num_cpus::get() as u64);
let tb_num = std::cmp::min(tb_num, MAX_THREAD);
let chunk_size = tip_number / tb_num;
let remainder = tip_number % tb_num;
let _barrier = ::std::sync::Arc::new(::std::sync::Barrier::new(tb_num as usize));
let handles: Vec<_> = (0..tb_num).map(|i| {
let chain_db = chain_db.clone();
let pb = ::std::sync::Arc::clone(&pb);
let barrier = Arc::clone(&_barrier);
let last = i == (tb_num - 1);
let size = if last {
chunk_size + remainder
} else {
chunk_size
};
let end = if last {
tip_number + 1
} else {
(i + 1) * chunk_size
};
let pbi = pb(size * 2);
pbi.set_style(
ProgressStyle::default_bar()
.template(
"{prefix:.bold.dim} {spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta}) {msg}",
)
.progress_chars("#>-"),
);
pbi.set_position(0);
pbi.enable_steady_tick(5000);
::std::thread::spawn(move || {
let mut wb = chain_db.new_write_batch();
#block_expr
if !wb.is_empty() {
chain_db.write(&wb).unwrap();
}
pbi.finish_with_message("done!");
})
}).collect();
for handle in handles {
handle.join().unwrap();
}
Ok(chain_db.into_inner())
};
TokenStream::from(expanded)
}