1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use crate::api::{BitcoinDB, Txid};
use crate::iter::fetch_connected_async::{fetch_block_connected, TaskConnected};
use crate::iter::util::{DBCopy, VecMap};
use crate::parser::proto::connected_proto::{BlockConnectable, TxConnectable};
use std::borrow::BorrowMut;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, sync_channel, Receiver};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
/// Iterator over blocks that are produced concurrently by a pool of worker
/// threads and handed back in the order in which the tasks were claimed.
///
/// Field order is significant: it is also the order in which the fields are
/// dropped once `Drop::drop` has returned.
pub struct ConnectedBlockIter<TBlock> {
    /// Receiving end of each worker's bounded channel, indexed by worker number.
    receivers: Vec<Receiver<TBlock>>,
    /// Stream of worker numbers; each entry names the worker whose channel
    /// holds (or will hold) the next block to yield.
    task_order: Receiver<usize>,
    /// Handles of the spawned workers; becomes `None` once they have been joined.
    worker_thread: Option<Vec<JoinHandle<()>>>,
    /// Flag shared with every task; it is raised when the iterator is dropped.
    error_state: Arc<AtomicBool>,
}
impl<TBlock> ConnectedBlockIter<TBlock>
where
    TBlock: 'static + BlockConnectable + Send,
{
    /// Builds the iterator and immediately starts the worker threads.
    ///
    /// One task is queued for every height in `0..end`, and
    /// `num_cpus::get()` workers are spawned. Each worker owns a bounded
    /// channel (capacity 10) on which it delivers its blocks, and repeatedly
    /// claims the task at the front of the shared queue until the queue is
    /// empty or `fetch_block_connected` returns `false`.
    pub fn new(db: &BitcoinDB, end: u32) -> Self {
        let worker_count = num_cpus::get();

        // State shared by all tasks / workers.
        let error_state = Arc::new(AtomicBool::new(false));
        let outputs_insertion_height = Arc::new((Mutex::new(0), Condvar::new()));
        let unspent: Arc<
            Mutex<HashMap<Txid, Arc<Mutex<VecMap<<TBlock::Tx as TxConnectable>::TOut>>>>>,
        > = Arc::new(Mutex::new(HashMap::new()));
        let (task_register, task_order) = channel();

        // One task per height, in ascending order.
        let queue: VecDeque<TaskConnected> = (0..end)
            .map(|height| TaskConnected {
                height,
                outputs_insertion_height: Arc::clone(&outputs_insertion_height),
                error_state: Arc::clone(&error_state),
            })
            .collect();
        let queue = Arc::new(Mutex::new(queue));

        let mut receivers = Vec::with_capacity(worker_count);
        let mut handles = Vec::with_capacity(worker_count);
        for worker_id in 0..worker_count {
            let (sender, receiver) = sync_channel(10);
            let worker_queue = Arc::clone(&queue);
            let register = task_register.clone();
            let worker_db = DBCopy::from_bitcoin_db(db);
            let worker_unspent = Arc::clone(&unspent);

            let handle = thread::spawn(move || loop {
                // Claiming a task and announcing this worker's number happen
                // under the same lock, so `task_order` lists workers in exactly
                // the order in which the tasks left the queue.
                let claimed = {
                    let mut pending = worker_queue.lock().unwrap();
                    let claimed = pending.pop_front();
                    if claimed.is_some() {
                        register.send(worker_id).unwrap();
                    }
                    claimed
                };

                // Stop when the queue is drained or the fetch reports failure.
                let keep_running = match claimed {
                    Some(job) => fetch_block_connected(&worker_unspent, &worker_db, job, &sender),
                    None => false,
                };
                if !keep_running {
                    break;
                }
            });

            receivers.push(receiver);
            handles.push(handle);
        }

        ConnectedBlockIter {
            receivers,
            task_order,
            worker_thread: Some(handles),
            error_state,
        }
    }
}
impl<TBlock> Iterator for ConnectedBlockIter<TBlock> {
    type Item = TBlock;

    /// Returns the next block in task order.
    ///
    /// Blocks until the next worker number is available on `task_order`, then
    /// until that worker delivers a block on its own channel. Yields `None`
    /// when either channel has been closed by its sender(s).
    ///
    /// # Panics
    ///
    /// Panics if the received worker number has no matching receiver.
    fn next(&mut self) -> Option<Self::Item> {
        let worker = self.task_order.recv().ok()?;
        self.receivers.get(worker).unwrap().recv().ok()
    }
}
impl<T> ConnectedBlockIter<T> {
    /// Waits for every worker thread to finish.
    ///
    /// The handles are taken out of `worker_thread`, so calling this again is
    /// a no-op. All workers are joined even if one of them panicked; the
    /// first worker panic is then re-raised on the calling thread.
    ///
    /// # Panics
    ///
    /// Re-raises a worker's panic, unless the calling thread is already
    /// unwinding. This method runs from `Drop`, where a second panic would
    /// abort the process.
    fn join(&mut self) {
        let handles = match self.worker_thread.take() {
            Some(handles) => handles,
            None => return,
        };

        // Join everything first so no worker is left detached.
        let mut first_panic = None;
        for handle in handles {
            if let Err(payload) = handle.join() {
                if first_panic.is_none() {
                    first_panic = Some(payload);
                }
            }
        }

        if let Some(payload) = first_panic {
            if !thread::panicking() {
                std::panic::resume_unwind(payload);
            }
        }
    }
}
impl<T> Drop for ConnectedBlockIter<T> {
    /// Signals the workers to stop, then waits for them to exit.
    ///
    /// The receivers are still alive while this runs, because fields are only
    /// dropped after `drop` returns. A worker that is waiting for room in its
    /// bounded channel would therefore never be released by the join alone.
    /// NOTE(review): this presumes the send inside `fetch_block_connected` is
    /// a blocking one; confirm against that function.
    fn drop(&mut self) {
        // Raise the shared flag carried by every task.
        self.error_state.store(true, Ordering::SeqCst);

        // Discard blocks nobody will consume, freeing channel capacity so
        // that pending sends can complete before the workers are joined.
        for receiver in &self.receivers {
            while receiver.try_recv().is_ok() {}
        }

        self.join();
    }
}