1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::api::BitcoinDB;
use crate::iter::fetch_async::{fetch_block, Task};
use crate::iter::util::DBCopy;
use bitcoin::Block;
use std::borrow::BorrowMut;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
/// Iterator over blocks that are produced by background threads and
/// delivered through a bounded channel.
///
/// Instances are built by `BlockIterator::new` / `BlockIterator::from_range`,
/// which spawn the worker thread whose handle is stored here.
pub struct BlockIterator<TBlock> {
/// Receiving end of the channel; `next()` pulls items from here.
receiver: Receiver<TBlock>,
/// Handle of the background thread started in `new`. Kept in an `Option`
/// so it can be moved out and joined from `&mut self`.
worker_thread: Option<JoinHandle<()>>,
/// Flag shared with every task handed to the workers. It is set to `true`
/// when the iterator is dropped.
/// NOTE(review): presumably `fetch_block` polls this to stop early — confirm
/// in `fetch_async`.
error_state: Arc<AtomicBool>,
}
impl<T> BlockIterator<T> {
    /// Blocks until the background worker thread has finished.
    ///
    /// The join handle is taken out of `self` on the first call, so calling
    /// this again afterwards is a no-op instead of a panic.
    ///
    /// # Panics
    ///
    /// Panics if the worker thread itself panicked, unless the current thread
    /// is already unwinding. This method is invoked from `Drop`, and a second
    /// panic raised during unwinding would abort the whole process.
    fn join(&mut self) {
        if let Some(handle) = self.worker_thread.take() {
            let outcome = handle.join();
            // Only surface the worker's panic when doing so cannot turn an
            // ordinary unwind into an abort.
            if !thread::panicking() {
                outcome.unwrap();
            }
        }
    }
}
impl<T> Drop for BlockIterator<T> {
    /// Raises the shared error flag and then waits for the worker thread.
    ///
    /// The flag is set before joining so that the background side can observe
    /// it while this thread is blocked in `join`.
    fn drop(&mut self) {
        // `fetch_or(true)` leaves the flag set whatever it held before; the
        // previous value is of no interest here.
        let flag: &mut Arc<AtomicBool> = self.error_state.borrow_mut();
        flag.fetch_or(true, Ordering::SeqCst);
        self.join();
    }
}
impl<TBlock> BlockIterator<TBlock>
where
    TBlock: From<Block> + Send + 'static,
{
    /// Builds an iterator over the blocks at the given `heights`.
    ///
    /// One coordinator thread is spawned. It creates one `Task` per requested
    /// height (numbered from 0 in the order given), starts `num_cpus::get()`
    /// worker threads that drain a shared task queue through `fetch_block`,
    /// and waits for all of them. Results reach the iterator through a
    /// channel bounded to `cpus * 10` pending items.
    ///
    /// A worker stops when the queue is empty or when `fetch_block` returns
    /// `false`.
    ///
    /// An empty `heights` is valid and produces an iterator that yields
    /// nothing.
    ///
    /// NOTE(review): `output_number` is shared with every task, presumably so
    /// that `fetch_block` can emit blocks in task order — confirm in
    /// `fetch_async`.
    pub fn new(db: &BitcoinDB, heights: Vec<u32>) -> Self {
        let cpus = num_cpus::get();
        // Task numbers are assigned from 0 upwards, so the first task number
        // is always 0. Using the literal (instead of reading it back from a
        // list) keeps an empty `heights` from panicking.
        let output_number = Arc::new((Mutex::new(0u32), Condvar::new()));
        let error_state = Arc::new(AtomicBool::new(false));
        let (sender, receiver) = sync_channel(cpus * 10);
        let db = DBCopy::from_bitcoin_db(db);
        let error_state_copy = error_state.clone();

        let worker_thread = thread::spawn(move || {
            let tasks: VecDeque<Task<TBlock>> = (0u32..)
                .zip(heights)
                .map(|(task_number, height)| Task {
                    task_number,
                    height,
                    output_number: output_number.clone(),
                    sender: sender.clone(),
                    error_state: error_state_copy.clone(),
                })
                .collect();
            let tasks = Arc::new(Mutex::new(tasks));

            let mut handles = Vec::with_capacity(cpus);
            for _ in 0..cpus {
                let queue = tasks.clone();
                let db_copy = db.clone();
                handles.push(thread::spawn(move || loop {
                    // Pop in its own statement so the queue lock is released
                    // before `fetch_block` runs; holding it across the call
                    // would serialize the workers.
                    let next = queue.lock().unwrap().pop_front();
                    match next {
                        None => break,
                        Some(task) => {
                            if !fetch_block(&db_copy, task) {
                                break;
                            }
                        }
                    }
                }));
            }
            for handle in handles {
                handle.join().unwrap();
            }
        });

        BlockIterator {
            receiver,
            worker_thread: Some(worker_thread),
            error_state,
        }
    }

    /// Builds an iterator over the half-open height range `start..end`.
    ///
    /// When `end <= start` the range is empty and the iterator yields
    /// nothing.
    pub fn from_range(db: &BitcoinDB, start: u32, end: u32) -> Self {
        // `start..end` already collects to an empty vector when `end <= start`.
        Self::new(db, (start..end).collect())
    }
}
impl<TBlock> Iterator for BlockIterator<TBlock> {
    type Item = TBlock;

    /// Blocks until the next item arrives on the channel.
    ///
    /// Returns `None` once `recv` reports an error, i.e. when the channel has
    /// been disconnected and no buffered items remain.
    fn next(&mut self) -> Option<Self::Item> {
        self.receiver.recv().ok()
    }
}