1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::api::{BitcoinDB, Txid};
use crate::iter::fetch_connected_async::{fetch_block_connected, TaskConnected};
use crate::iter::util::{DBCopy, FromBlockComponent, FromTxComponent, VecMap};
use bitcoin::TxOut;
use std::borrow::BorrowMut;
use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
/// Iterator handle over blocks produced by background worker threads.
///
/// Blocks of type `TBlock` arrive through a channel; `Tx` and `TOut` are the
/// transaction and output representations used while the blocks are built and
/// only appear here as type-level markers.
///
/// Field order is significant: Rust drops fields in declaration order.
pub struct ConnectedBlockIterator<TBlock, Tx, TOut> {
    /// Receiving end of the channel the workers push finished blocks into.
    receiver: Receiver<TBlock>,
    /// Handle of the coordinating thread; wrapped in `Option` so it can be
    /// moved out and joined through a `&mut self` reference.
    worker_thread: Option<JoinHandle<()>>,
    /// Flag shared with the workers; raised when the iterator is dropped.
    error_state: Arc<AtomicBool>,
    /// Marker that ties the otherwise unused `Tx` parameter to the type.
    tx_phantom: PhantomData<Tx>,
    /// Marker that ties the otherwise unused `TOut` parameter to the type.
    tout_phantom: PhantomData<TOut>,
}
impl<T, Tx, TOut> ConnectedBlockIterator<T, Tx, TOut> {
    /// Waits for the coordinating worker thread to finish.
    ///
    /// Calling this more than once is harmless: the handle is taken out of
    /// `self` on the first call and later calls return immediately.
    ///
    /// # Panics
    ///
    /// If the worker thread panicked, its panic is re-raised on the calling
    /// thread so the failure is not silently lost. The one exception is when
    /// the calling thread is already unwinding (this method runs from `Drop`):
    /// raising a second panic there would abort the whole process, so the
    /// worker's panic payload is discarded instead.
    fn join(&mut self) {
        if let Some(handle) = self.worker_thread.take() {
            if let Err(payload) = handle.join() {
                if !thread::panicking() {
                    std::panic::resume_unwind(payload);
                }
            }
        }
    }
}
impl<T, Tx, TOut> Drop for ConnectedBlockIterator<T, Tx, TOut> {
    /// Signals the workers to stop and waits for the worker thread to exit.
    ///
    /// Steps, in order:
    /// 1. Raise the shared `error_state` flag so workers can stop picking up
    ///    new work (the flag is read outside this impl).
    /// 2. Drain the channel until every sender has been dropped. The receiver
    ///    is a field of `self` and therefore still alive while `drop` runs; the
    ///    channel is bounded, so a producer that is blocked on a full channel
    ///    (presumably a blocking `send` — the producer code is not visible
    ///    here) could otherwise never finish and the join below would hang.
    ///    The drained blocks are discarded.
    /// 3. Join the worker thread.
    fn drop(&mut self) {
        self.error_state.store(true, Ordering::SeqCst);

        // `recv` only fails once all senders are gone, i.e. once no producer
        // can still be waiting for free space in the channel.
        while self.receiver.recv().is_ok() {}

        self.join();
    }
}
impl<TBlock, Tx, TOut> ConnectedBlockIterator<TBlock, Tx, TOut>
where
    TOut: 'static + From<TxOut> + Send,
    Tx: FromTxComponent<TOut> + Send,
    TBlock: 'static + FromBlockComponent<Tx> + Send,
{
    /// Starts the background pipeline and returns the iterator handle.
    ///
    /// One task is queued for every height in `0..end` (`end` is exclusive).
    /// A coordinating thread spawns one worker per logical CPU; each worker
    /// repeatedly pops a task from the shared queue and hands it to
    /// `fetch_block_connected`, stopping when the queue is empty or when that
    /// call returns `false`. Results reach the iterator through a bounded
    /// channel with room for ten blocks per CPU.
    ///
    /// # Panics
    ///
    /// The coordinating thread panics if the task-queue mutex is poisoned or
    /// if any worker thread panicked.
    pub fn new(db: &BitcoinDB, end: u32) -> Self {
        let worker_count = num_cpus::get();
        let (sender, receiver) = sync_channel(worker_count * 10);

        // State shared between this handle and the workers.
        let error_state = Arc::new(AtomicBool::new(false));
        let shared_error_state = Arc::clone(&error_state);
        let outputs_insertion_height = Arc::new((Mutex::new(0), Condvar::new()));
        let result_height = Arc::new((Mutex::new(0), Condvar::new()));
        let unspent: Arc<Mutex<HashMap<Txid, Arc<Mutex<VecMap<TOut>>>>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let db = DBCopy::from_bitcoin_db(db);

        let worker_thread = thread::spawn(move || {
            // One task per block height, consumed front to back.
            let queue: VecDeque<TaskConnected<TBlock>> = (0..end)
                .map(|height| TaskConnected {
                    height,
                    outputs_insertion_height: Arc::clone(&outputs_insertion_height),
                    result_height: Arc::clone(&result_height),
                    sender: sender.clone(),
                    error_state: Arc::clone(&shared_error_state),
                })
                .collect();
            let queue = Arc::new(Mutex::new(queue));

            // All workers are spawned before any of them is joined.
            let workers: Vec<_> = (0..worker_count)
                .map(|_| {
                    let queue = Arc::clone(&queue);
                    let db = db.clone();
                    let unspent = Arc::clone(&unspent);
                    thread::spawn(move || loop {
                        // The guard is a temporary of this statement, so the
                        // queue lock is released before the task is processed.
                        let next_task = queue.lock().unwrap().pop_front();
                        let keep_going = match next_task {
                            Some(task) => fetch_block_connected(&unspent, &db, task),
                            None => false,
                        };
                        if !keep_going {
                            break;
                        }
                    })
                })
                .collect();

            for worker in workers {
                worker.join().unwrap();
            }
        });

        Self {
            receiver,
            worker_thread: Some(worker_thread),
            error_state,
            tx_phantom: PhantomData,
            tout_phantom: PhantomData,
        }
    }
}
impl<TBlock, Tx, TOut> Iterator for ConnectedBlockIterator<TBlock, Tx, TOut> {
    type Item = TBlock;

    /// Blocks until the next block arrives on the channel.
    ///
    /// Returns `None` once the channel is empty and every sender has been
    /// dropped, which ends the iteration.
    fn next(&mut self) -> Option<Self::Item> {
        // A receive error only means "disconnected", so it maps to `None`.
        self.receiver.recv().ok()
    }
}