1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use crate::api::BitcoinDB;
use crate::iter::fetch_async::{fetch_block, Task};
use crate::iter::util::{get_task, DBCopy};
use bitcoin::Block;
use std::borrow::BorrowMut;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, sync_channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
pub struct BlockIter<TBlock> {
receivers: Vec<Receiver<TBlock>>,
task_order: Receiver<usize>,
worker_thread: Option<Vec<JoinHandle<()>>>,
error_state: Arc<AtomicBool>,
}
impl<TBlock> BlockIter<TBlock>
where
TBlock: From<Block> + Send + 'static,
{
pub fn new(db: &BitcoinDB, heights: Vec<u32>) -> Self {
let cpus = num_cpus::get();
let error_state = Arc::new(AtomicBool::new(false));
let (task_register, task_order) = sync_channel(cpus * 10);
let mut tasks: VecDeque<Task> = VecDeque::with_capacity(heights.len());
for height in heights {
tasks.push_back(Task {
height,
error_state: error_state.clone(),
})
}
let tasks = Arc::new(Mutex::new(tasks));
let mut handles = Vec::with_capacity(cpus);
let mut receivers = Vec::with_capacity(cpus);
for thread_number in 0..cpus {
let (sender, receiver) = channel();
let task = tasks.clone();
let register = task_register.clone();
let db = DBCopy::from_bitcoin_db(db);
let handle = thread::spawn(move || {
loop {
match get_task(&task, ®ister, thread_number) {
None => break,
Some(task) => {
if !fetch_block(&db, task, &sender) {
break;
}
}
}
}
});
receivers.push(receiver);
handles.push(handle);
}
BlockIter {
receivers,
task_order,
worker_thread: Some(handles),
error_state,
}
}
pub fn from_range(db: &BitcoinDB, start: u32, end: u32) -> Self {
if end <= start {
BlockIter::new(db, Vec::new())
} else {
let heights: Vec<u32> = (start..end).collect();
BlockIter::new(db, heights)
}
}
}
impl<TBlock> Iterator for BlockIter<TBlock> {
type Item = TBlock;
fn next(&mut self) -> Option<Self::Item> {
match self.task_order.recv() {
Ok(thread_number) => match self.receivers.get(thread_number).unwrap().recv() {
Ok(block) => Some(block),
Err(_) => None,
},
Err(_) => None,
}
}
}
impl<T> BlockIter<T> {
fn join(&mut self) {
for handle in self.worker_thread.take().unwrap() {
handle.join().unwrap()
}
}
}
impl<T> Drop for BlockIter<T> {
fn drop(&mut self) {
{
let err = self.error_state.borrow_mut();
err.fetch_or(true, Ordering::SeqCst);
}
self.join();
}
}