blocks_iterator/
lib.rs

1#![doc = include_str!("../README.md")]
2// Coding conventions
3#![forbid(unsafe_code)]
4#![deny(non_upper_case_globals)]
5#![deny(non_camel_case_types)]
6#![deny(non_snake_case)]
7#![deny(unused_mut)]
8#![deny(dead_code)]
9#![deny(unused_imports)]
10#![deny(unused_must_use)]
11#![cfg_attr(docsrs, feature(doc_auto_cfg))]
12
13use bitcoin::BlockHash;
14use log::{info, Level};
15use std::fs::File;
16
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::mpsc::{sync_channel, SyncSender};
19use std::sync::{Arc, Mutex};
20use std::thread;
21use std::thread::JoinHandle;
22use std::time::Instant;
23
24pub use period::{PeriodCounter, Periodic};
25
26mod block_extra;
27mod bsl;
28mod config;
29mod error;
30mod iter;
31mod period;
32mod pipe;
33mod stages;
34mod utxo;
35
36// re-exporting deps
37pub use bitcoin;
38pub use fxhash;
39pub use glob;
40pub use log;
41
42pub use block_extra::BlockExtra;
43pub use config::Config;
44pub use error::Error;
45pub use iter::iter;
46pub use pipe::PipeIterator;
47
48/// Before reorder we keep only the position of the block in the file system and data relative
49/// to the block hash, the previous hash and the following hash (populated during reorder phase)
50/// We will need
51///  to read the block from disk again, but by doing so we will avoid using too much
52/// memory in the `OutOfOrderBlocks` map.
53#[derive(Debug)]
54pub struct FsBlock {
55    /// the file the block identified by `hash` is stored in. Multiple blocks are stored in the
56    /// and we don't want to open/close the file many times for performance reasons so it's shared.
57    /// It's a Mutex to allow to be sent between threads but only one thread (reorder) mutably
58    /// access to it so there is no contention. (Arc alone isn't enough cause it can't be mutated,
59    /// RefCell can be mutated but not sent between threads)
60    pub file: Arc<Mutex<File>>,
61
62    /// The start position in bytes in the `file` at which the block identified by `hash`
63    pub start: usize,
64
65    /// The end position in bytes in the `file` at which the block identified by `hash`
66    pub end: usize,
67
68    /// The hash identifying this block, output of `block.header.block_hash()`
69    pub hash: BlockHash,
70
71    /// The hash of the block previous to this one, `block.header.prev_blockhash`
72    pub prev: BlockHash,
73
74    /// The hash of the blocks following this one. It is populated during the reorder phase, it can
75    /// be more than one because of reorgs.
76    pub next: Vec<BlockHash>,
77
78    /// The serialization format to use when trasformed to `BlockExtra` (0 or 1)
79    pub serialization_version: u8,
80
81    /// Total number of transaction inputs in this block
82    pub(crate) block_total_inputs: u32,
83
84    /// Total number of transaction outputs in this block
85    pub(crate) block_total_outputs: u32,
86
87    /// Total number of transactions in this block
88    pub(crate) block_total_txs: u32,
89}
90
91fn iterate(config: Config, channel: SyncSender<Option<BlockExtra>>) -> JoinHandle<()> {
92    thread::spawn(move || {
93        let now = Instant::now();
94        let early_stop = Arc::new(AtomicBool::new(false));
95
96        // FsBlock is a small struct (~120b), so 10_000 is not a problem but allows the read_detect to read ahead the next block file
97        let (send_block_fs, receive_block_fs) = sync_channel(0);
98        let _read = stages::ReadDetect::new(
99            config.blocks_dir.clone(),
100            config.network,
101            early_stop.clone(),
102            send_block_fs,
103            config.serialization_version,
104        );
105
106        let (send_ordered_blocks, receive_ordered_blocks) =
107            sync_channel(config.channels_size.into());
108        let _reorder = stages::Reorder::new(
109            config.network,
110            config.max_reorg,
111            config.stop_at_height,
112            early_stop.clone(),
113            receive_block_fs,
114            send_ordered_blocks,
115        );
116
117        let (send_blocks_with_txids, receive_blocks_with_txids) =
118            sync_channel(config.channels_size.into());
119        let send_blocks_with_txids = if config.skip_prevout {
120            // if skip_prevout is true, we send directly to end step
121            channel.clone()
122        } else {
123            send_blocks_with_txids
124        };
125
126        let _compute_txids = stages::ComputeTxids::new(
127            config.skip_prevout,
128            config.start_at_height,
129            receive_ordered_blocks,
130            send_blocks_with_txids,
131        );
132
133        if !config.skip_prevout {
134            match config.utxo_manager() {
135                Ok(utxo_manager) => {
136                    let _fee = stages::Fee::new(
137                        config.start_at_height,
138                        receive_blocks_with_txids,
139                        channel,
140                        utxo_manager,
141                    );
142                }
143                Err(e) => {
144                    log::error!("{e}");
145                    early_stop.store(true, Ordering::Relaxed);
146                    channel.send(None).unwrap();
147                }
148            }
149        }
150
151        info!("Total time elapsed: {}s", now.elapsed().as_secs());
152    })
153}
154
155/// Utility method usually returning [log::Level::Debug] but when `i` is divisible by `every` returns [log::Level::Info]
156#[deprecated(note = "use `period::Periodic` or `period::PeriodCounter`")]
157pub fn periodic_log_level(i: u32, every: u32) -> Level {
158    if i % every == 0 {
159        Level::Info
160    } else {
161        Level::Debug
162    }
163}
164
#[cfg(test)]
mod inner_test {
    use crate::bitcoin::Network;
    use crate::{iterate, Config};
    use bitcoin::Txid;
    use std::str::FromStr;
    use std::sync::mpsc::sync_channel;
    use test_log::test;

    /// Configuration pointing at the testnet block files used by the tests.
    pub fn test_conf() -> Config {
        Config::new("../blocks", Network::Testnet)
    }

    /// Parses a hex txid literal; test data is expected to be well formed.
    fn txid(hex: &str) -> Txid {
        Txid::from_str(hex).unwrap()
    }

    #[test]
    fn test_blk_testnet() {
        let conf = test_conf();
        let (send, recv) = sync_channel(0);

        let mut inputs = 0;
        let mut outputs = 0;
        // Ensures the per-block assertions below actually ran instead of being skipped.
        let mut seen_394 = false;
        let handle = iterate(conf, send);
        let expected_txids = vec![
            txid("63375db7e443e491c99bcf46ce49422d05708f83b65335c935dee0a06855ebff"),
            txid("0280d22f8aaa210b9ec8509067ecc523bf79609d8378cc56196857848cf42ce4"),
            txid("b3c19d78b4953b694717a47d9852f8ea1ccd4cf93a45ba2e43a0f97d7cdb2655"),
        ];

        while let Some(b) = recv.recv().unwrap() {
            if b.height == 394 {
                assert_eq!(b.fee(), Some(50_000));
                assert_eq!(b.txids(), &expected_txids);
                assert_eq!(b.block_total_txs, 3);
                seen_394 = true;
            }

            inputs += b.block_total_inputs;
            outputs += b.block_total_outputs;
        }
        handle.join().unwrap();

        assert!(seen_394, "block at height 394 was never produced");
        assert_eq!(inputs, 448);
        assert_eq!(outputs, 426);
    }

    #[cfg(feature = "db")]
    #[test]
    fn test_blk_testnet_db() {
        let tempdir = tempfile::TempDir::new().unwrap();
        let conf = {
            let mut conf = test_conf();
            conf.utxo_db = Some(tempdir.path().to_path_buf());
            conf
        };

        let mut max_height = 0;
        let mut seen_389 = false;
        for b in super::iter(conf.clone()) {
            max_height = max_height.max(b.height);
            if b.height == 389 {
                assert_eq!(b.fee(), Some(50_000));
                assert_eq!(b.iter_tx().size_hint(), (2, Some(2)));
                seen_389 = true;
            }
            assert!(b.iter_tx().next().is_some());
            for (txid, tx) in b.iter_tx() {
                assert_eq!(*txid, tx.compute_txid());
            }
        }
        assert!(seen_389, "block at height 389 was never produced");
        assert_eq!(max_height, 400 - conf.max_reorg as u32);

        // iterating twice, this time prevouts come directly from db
        let mut seen_394 = false;
        for b in super::iter(conf) {
            if b.height == 394 {
                assert_eq!(b.fee(), Some(50_000));
                seen_394 = true;
            }
        }
        assert!(seen_394, "block at height 394 was never produced on the second pass");
    }
}