//!
//! Development Note:
//!
//! This module contains a manually implemented concurrency model.
//!
//! Below is a basic description of the data model:
//! - Each worker thread actively tries to fetch a task.
//! - There are two groups of workers in this file: producers and consumers.
//!
//! ## Synchronization
//! - When a thread fetches a task, it registers its thread ID (`thread_num`)
//!   in an mpsc channel. When a consumer consumes, it first reads from this
//!   mpsc channel to learn which thread's data stream to fetch from. This
//!   keeps the output in the right order (see the sketch after this section).
//! - An additional task number (`current`, or `current_height`) is updated
//!   whenever an output is received; it is compared to the output's task
//!   number to ensure that outputs are received in the right order.
//! - If the order is incorrect, one of the threads has stopped due to an
//!   exception. This stops the iterator output, stops all producers from
//!   fetching tasks, and attempts to flush output until all workers
//!   have stopped.
//!
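//! A minimal, self-contained sketch of this registration pattern (the names
//! below are illustrative, not this module's actual items; the real code uses
//! `get_task` and bounded `sync_channel`s for the same purpose):
//!
//! ```ignore
//! use std::collections::VecDeque;
//! use std::sync::mpsc::{channel, sync_channel};
//! use std::sync::{Arc, Mutex};
//! use std::thread;
//!
//! let tasks = Arc::new(Mutex::new((0..6).collect::<VecDeque<usize>>()));
//! let (register, order) = channel(); // unbounded: (task, worker) in claim order
//! let mut streams = Vec::new();      // one bounded data channel per worker
//! for worker in 0..2 {
//!     let (tx, rx) = sync_channel(10);
//!     let (tasks, register) = (tasks.clone(), register.clone());
//!     streams.push(rx);
//!     thread::spawn(move || loop {
//!         // claim a task and register it under the same lock, so that
//!         // registration order equals task order
//!         let task = {
//!             let mut queue = tasks.lock().unwrap();
//!             match queue.pop_front() {
//!                 Some(t) => { register.send((t, worker)).unwrap(); t }
//!                 None => break,
//!             }
//!         };
//!         tx.send(task * task).unwrap(); // the expensive work happens outside the lock
//!     });
//! }
//! drop(register);
//! // the registration channel tells the consumer which worker's stream to read next
//! while let Ok((task, worker)) = order.recv() {
//!     let result = streams[worker].recv().unwrap();
//!     println!("task {} -> {}", task, result);
//! }
//! ```
//!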
//! ## Error handling
//! - When any exception occurs, stop producers from fetching new tasks.
//! - Stop consumers only after all producers have stopped
//!   (otherwise stopped consumers could leave producers blocked on sending).
//! - Before dropping the structure, stop all producers from fetching tasks,
//!   and flush all remaining tasks.
//!
//! ## Temporary RocksDB Storage
//! - When the `on-disk-utxo` feature is enabled, a temporary RocksDB instance
//!   is created on disk; it is cleaned up once the iterator is dropped.
//!
use crate::api::BitcoinDB;
use crate::iter::fetch_connected_async::{connect_outpoints, update_unspent_cache};
use crate::iter::util::get_task;
#[cfg(not(feature = "on-disk-utxo"))]
use crate::iter::util::VecMap;
use crate::parser::proto::connected_proto::BlockConnectable;
#[cfg(not(feature = "on-disk-utxo"))]
use crate::parser::proto::connected_proto::TxConnectable;
#[cfg(not(feature = "on-disk-utxo"))]
use hash_hasher::HashedMap;
#[cfg(feature = "on-disk-utxo")]
use log::{error, warn};
#[cfg(feature = "on-disk-utxo")]
use rocksdb::{Options, PlainTableFactoryOptions, SliceTransform, WriteOptions, DB};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, sync_channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
#[cfg(feature = "on-disk-utxo")]
use tempdir::TempDir;

const MAX_SIZE_FOR_THREAD: usize = 10;

/// Iterates through blocks in height order, connecting each input to the output it spends.
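///
/// A minimal usage sketch (assuming a `BitcoinDB` handle `db` has already been
/// opened elsewhere; `SConnectedBlock` is one block type usable here):
///
/// ```ignore
/// let end = 10_000;
/// let blocks: ConnectedBlockIter<SConnectedBlock> = ConnectedBlockIter::new(&db, end);
/// for (height, _blk) in blocks.enumerate() {
///     // blocks are yielded strictly in height order: 0, 1, 2, ...
///     assert!(height < end);
/// }
/// ```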
pub struct ConnectedBlockIter<TBlock> {
    result_receivers: Vec<Receiver<(TBlock, usize)>>,
    result_order: Receiver<usize>,
    worker_thread: Option<Vec<JoinHandle<()>>>,
    #[cfg(feature = "on-disk-utxo")]
    rocks_db_path: Option<TempDir>,
    iterator_stopper: Arc<AtomicBool>,
    is_killed: bool,
    current_height: usize,
}

impl<TBlock> ConnectedBlockIter<TBlock>
where
    TBlock: 'static + BlockConnectable + Send,
{
    /// The worker threads are spawned in this `new` constructor.
    pub fn new(db: &BitcoinDB, end: usize) -> Self {
        let cpus = num_cpus::get();
        let mut handles = Vec::with_capacity(cpus * 2);
        let iterator_stopper = Arc::new(AtomicBool::new(false));

        // UTXO cache
        #[cfg(not(feature = "on-disk-utxo"))]
        let unspent: Arc<
            Mutex<HashedMap<u128, Arc<Mutex<VecMap<<TBlock::Tx as TxConnectable>::TOut>>>>>,
        > = Arc::new(Mutex::new(HashedMap::default()));
        #[cfg(feature = "on-disk-utxo")]
        let cache_dir = {
            match TempDir::new("rocks_db") {
                Ok(tempdir) => tempdir,
                Err(e) => {
                    error!("failed to create rocksDB tempdir for UTXO: {}", e);
                    return ConnectedBlockIter::null();
                }
            }
        };
        #[cfg(feature = "on-disk-utxo")]
        let options = {
            let mut options = Options::default();

            // create table
            options.create_if_missing(true);

            // allow more background jobs (compactions and flushes)
            options.set_max_background_jobs(cpus as i32);

            // configure the mem-table to a large value (1 GiB)
            options.set_write_buffer_size(0x40000000);

            // configure L0 and L1 to the same size (4 GiB)
            options.set_level_zero_file_num_compaction_trigger(4);
            options.set_max_bytes_for_level_base(0x100000000);

            // 256 MiB target file size
            options.set_target_file_size_base(0x10000000);

            // use a smaller compaction multiplier
            options.set_max_bytes_for_level_multiplier(4.0);

            // use 8-byte prefix (2 ^ 64 is far enough for transaction counts)
            options.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));

            // set to plain-table for better performance
            options.set_plain_table_factory(&PlainTableFactoryOptions {
                // 16 (compressed txid) + 4 (i32 out n)
                user_key_length: 20,
                bloom_bits_per_key: 10,
                hash_table_ratio: 0.75,
                index_sparseness: 16,
            });

            options
        };
        #[cfg(feature = "on-disk-utxo")]
        let unspent = Arc::new(match DB::open(&options, &cache_dir) {
            Ok(db) => db,
            Err(e) => {
                error!("failed to create temp rocksDB for UTXO: {}", e);
                return ConnectedBlockIter::null();
            }
        });

        // all tasks
        let heights = Arc::new(Mutex::new((0..end).collect::<VecDeque<usize>>()));

        // the channel for synchronizing cache update
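        // it carries (height, thread_number) pairs in the order tasks are claimed,
        // so consumers know which producer's block stream holds the next height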
        let (block_worker_register, block_order) = channel();
        let block_order = Arc::new(Mutex::new(block_order));
        let mut block_receivers = Vec::with_capacity(cpus);

        // output insertion threads
        for thread_number in 0..cpus {
            // block streams
            let (block_sender, block_receiver) = sync_channel(MAX_SIZE_FOR_THREAD);
            let block_receiver = Arc::new(Mutex::new(block_receiver));

            // clone resources
            let unspent = unspent.clone();
            let heights = heights.clone();
            let db = db.clone();
            let block_worker_register = block_worker_register.clone();
            let iterator_stopper = iterator_stopper.clone();

            // write without WAL
            #[cfg(feature = "on-disk-utxo")]
            let write_options = {
                let mut opt = WriteOptions::default();
                opt.disable_wal(true);
                opt
            };

            // output cache insertion workers
            let handle = thread::spawn(move || {
                loop {
                    // stop acquiring new tasks
                    if iterator_stopper.load(Ordering::SeqCst) {
                        break;
                    }
                    match get_task(&heights, &block_worker_register, thread_number) {
                        // finish
                        None => break,
                        Some(height) => {
                            if !update_unspent_cache::<TBlock>(
                                &unspent,
                                #[cfg(feature = "on-disk-utxo")]
                                &write_options,
                                &db,
                                height,
                                &block_sender,
                            ) {
                                iterator_stopper.fetch_or(true, Ordering::SeqCst);
                                break;
                            }
                        }
                    }
                }
            });
            block_receivers.push(block_receiver);
            handles.push(handle);
        }

        // the channel for synchronizing output order
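        // it carries the consumer's thread_number in output order, so that
        // `next()` knows which result receiver to read from next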
        let (result_register, result_order) = channel();

        // block_streams
        let mut result_receivers = Vec::with_capacity(cpus);

        // Ensure that the producers provide blocks in the right order.
        // This variable is updated whenever a block is received from a producer,
        // and it is compared against the height of the received block.
        // On a mismatch, the producers are stopped.
        let current_height = Arc::new(AtomicUsize::new(0));

        // consume UTXO cache and produce output
        for thread_number in 0..cpus {
            // result streams
            let (result_sender, result_receiver) = sync_channel(MAX_SIZE_FOR_THREAD);

            let register = result_register.clone();
            let unspent = unspent.clone();
            let block_order = block_order.clone();
            let block_receivers = block_receivers.clone();
            let current_height = current_height.clone();
            let iterator_stopper = iterator_stopper.clone();

            let handle = thread::spawn(move || {
                loop {
                    // exclusive access to block receiver
                    let (blk, height) = {
                        let block_order_lock = block_order.lock().unwrap();
                        // receive the worker's thread_number for the block receiver.
                        // This may block; it returns Err only once all producer-side
                        // senders have been dropped.
                        if let Ok((height, worker_number)) = block_order_lock.recv() {
                            let lock = block_receivers.get(worker_number).unwrap().lock();
                            if height != current_height.load(Ordering::SeqCst) {
                                // some producer thread has failed to send block
                                // stop all producers
                                iterator_stopper.fetch_or(true, Ordering::SeqCst);
                                continue;
                            }
                            register.send(thread_number).unwrap();
                            match lock.unwrap().recv() {
                                Ok(blk) => {
                                    current_height.fetch_add(1, Ordering::SeqCst);
                                    (blk, height)
                                }
                                Err(_) => {
                                    // stop all producers
                                    iterator_stopper.fetch_or(true, Ordering::SeqCst);
                                    continue;
                                }
                            }
                        } else {
                            // all producers have stopped (block order registers dropped)
                            // may stop consumers
                            break;
                        }
                    };
                    // release receivers lock

                    if !connect_outpoints(&unspent, &result_sender, blk, height) {
                        // stop all producers
                        iterator_stopper.fetch_or(true, Ordering::SeqCst);
                        continue;
                    }
                }
            });

            result_receivers.push(result_receiver);
            handles.push(handle);
        }

        ConnectedBlockIter {
            result_receivers,
            result_order,
            worker_thread: Some(handles),
            #[cfg(feature = "on-disk-utxo")]
            rocks_db_path: Some(cache_dir),
            iterator_stopper,
            is_killed: false,
            current_height: 0,
        }
    }

    #[cfg(feature = "on-disk-utxo")]
    fn null() -> Self {
        let result_order = {
            let (_, receiver) = sync_channel(1);
            receiver
        };
        ConnectedBlockIter {
            result_receivers: Vec::new(),
            result_order,
            worker_thread: Some(Vec::new()),
            #[cfg(feature = "on-disk-utxo")]
            rocks_db_path: None,
            iterator_stopper: Arc::new(AtomicBool::new(false)),
            is_killed: true,
            current_height: 0,
        }
    }
}

impl<T> ConnectedBlockIter<T> {
    /// Stop workers and flush remaining tasks.
    fn kill(&mut self) {
        if !self.is_killed {
            // stop threads from getting new tasks
            self.iterator_stopper.fetch_or(true, Ordering::SeqCst);
            // flush the remaining tasks in the channel
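            // Draining unblocks consumers that may be parked on a full `result_sender`;
            // as they exit, producers blocked on `block_sender` are released as well,
            // so the subsequent `join()` does not hang.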
            loop {
                let _ = match self.result_order.recv() {
                    Ok(thread_number) => self.result_receivers.get(thread_number).unwrap().recv(),
                    // all consumers (connecting workers) have stopped
                    Err(_) => break,
                };
            }
            self.is_killed = true;
        }
    }
}

impl<TBlock> Iterator for ConnectedBlockIter<TBlock> {
    type Item = TBlock;

    fn next(&mut self) -> Option<Self::Item> {
        if self.is_killed {
            return None;
        }
        match self.result_order.recv() {
            Ok(thread_number) => match self.result_receivers.get(thread_number).unwrap().recv() {
                Ok((block, height)) => {
                    // Some threads might have stopped first,
                    // leaving the remaining threads producing blocks out of order.
                    if self.current_height != height {
                        self.kill();
                        return None;
                    }
                    self.current_height += 1;
                    Some(block)
                }
                // some worker has stopped
                Err(_) => {
                    self.kill();
                    None
                }
            },
            // all consumers (connecting workers) have stopped
            Err(_) => None,
        }
    }
}

impl<T> ConnectedBlockIter<T> {
    fn join(&mut self) {
        for handle in self.worker_thread.take().unwrap() {
            handle.join().unwrap()
        }
    }
}

impl<T> Drop for ConnectedBlockIter<T> {
    /// attempt to stop the worker threads
    fn drop(&mut self) {
        self.kill();
        self.join();
        #[cfg(feature = "on-disk-utxo")]
        if let Some(rocks_db_path) = self.rocks_db_path.take() {
            let result = DB::destroy(&Options::default(), &rocks_db_path);
            if let Err(e) = result {
                warn!("failed to destroy temp rocksDB, {}", e);
            }
        }
    }
}

#[cfg(test)]
#[cfg(feature = "on-disk-utxo")]
mod test_empty {
    use crate::{ConnectedBlockIter, SConnectedBlock};

    #[test]
    fn test_empty() {
        let mut empty = ConnectedBlockIter::null();
        for _ in 0..100 {
            let b: Option<SConnectedBlock> = empty.next();
            assert!(b.is_none());
        }
    }
}