1#![doc = include_str!("../README.md")]
2#![forbid(unsafe_code)]
4#![deny(non_upper_case_globals)]
5#![deny(non_camel_case_types)]
6#![deny(non_snake_case)]
7#![deny(unused_mut)]
8#![deny(dead_code)]
9#![deny(unused_imports)]
10#![deny(unused_must_use)]
11#![cfg_attr(docsrs, feature(doc_auto_cfg))]
12
13use bitcoin::BlockHash;
14use log::{info, Level};
15use std::fs::File;
16
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::mpsc::{sync_channel, SyncSender};
19use std::sync::{Arc, Mutex};
20use std::thread;
21use std::thread::JoinHandle;
22use std::time::Instant;
23
24pub use period::{PeriodCounter, Periodic};
25
26mod block_extra;
27mod bsl;
28mod config;
29mod error;
30mod iter;
31mod period;
32mod pipe;
33mod stages;
34mod utxo;
35
36pub use bitcoin;
38pub use fxhash;
39pub use glob;
40pub use log;
41
42pub use block_extra::BlockExtra;
43pub use config::Config;
44pub use error::Error;
45pub use iter::iter;
46pub use pipe::PipeIterator;
47
/// Per-block record holding a shared file handle, a `start`/`end` pair, the block's hash
/// links and a few counters.
///
/// NOTE(review): the code that fills and consumes this struct lives outside this file, so the
/// field descriptions below are read off names and types only — confirm before relying on
/// them.
#[derive(Debug)]
pub struct FsBlock {
    /// Shared, mutex-guarded file handle; presumably the file that stores this block's
    /// bytes — TODO confirm.
    pub file: Arc<Mutex<File>>,

    /// Presumably the position in `file` where this block begins — TODO confirm unit and
    /// inclusiveness.
    pub start: usize,

    /// Presumably the position in `file` where this block ends — TODO confirm unit and
    /// inclusiveness.
    pub end: usize,

    /// Block hash; presumably the hash identifying this block.
    pub hash: BlockHash,

    /// Block hash; presumably the hash of the block preceding this one.
    pub prev: BlockHash,

    /// Zero or more block hashes; presumably the blocks that follow this one — TODO confirm
    /// when and by whom it is populated.
    pub next: Vec<BlockHash>,

    /// Serialization version tag; `iterate` forwards `config.serialization_version` to the
    /// reading stage, which presumably stores it here — TODO confirm.
    pub serialization_version: u8,

    /// Crate-visible counter; presumably the number of inputs across the block's
    /// transactions — TODO confirm.
    pub(crate) block_total_inputs: u32,

    /// Crate-visible counter; presumably the number of outputs across the block's
    /// transactions — TODO confirm.
    pub(crate) block_total_outputs: u32,

    /// Crate-visible counter; presumably the number of transactions in the block — TODO
    /// confirm.
    pub(crate) block_total_txs: u32,
}
90
91fn iterate(config: Config, channel: SyncSender<Option<BlockExtra>>) -> JoinHandle<()> {
92 thread::spawn(move || {
93 let now = Instant::now();
94 let early_stop = Arc::new(AtomicBool::new(false));
95
96 let (send_block_fs, receive_block_fs) = sync_channel(0);
98 let _read = stages::ReadDetect::new(
99 config.blocks_dir.clone(),
100 config.network,
101 early_stop.clone(),
102 send_block_fs,
103 config.serialization_version,
104 );
105
106 let (send_ordered_blocks, receive_ordered_blocks) =
107 sync_channel(config.channels_size.into());
108 let _reorder = stages::Reorder::new(
109 config.network,
110 config.max_reorg,
111 config.stop_at_height,
112 early_stop.clone(),
113 receive_block_fs,
114 send_ordered_blocks,
115 );
116
117 let (send_blocks_with_txids, receive_blocks_with_txids) =
118 sync_channel(config.channels_size.into());
119 let send_blocks_with_txids = if config.skip_prevout {
120 channel.clone()
122 } else {
123 send_blocks_with_txids
124 };
125
126 let _compute_txids = stages::ComputeTxids::new(
127 config.skip_prevout,
128 config.start_at_height,
129 receive_ordered_blocks,
130 send_blocks_with_txids,
131 );
132
133 if !config.skip_prevout {
134 match config.utxo_manager() {
135 Ok(utxo_manager) => {
136 let _fee = stages::Fee::new(
137 config.start_at_height,
138 receive_blocks_with_txids,
139 channel,
140 utxo_manager,
141 );
142 }
143 Err(e) => {
144 log::error!("{e}");
145 early_stop.store(true, Ordering::Relaxed);
146 channel.send(None).unwrap();
147 }
148 }
149 }
150
151 info!("Total time elapsed: {}s", now.elapsed().as_secs());
152 })
153}
154
155#[deprecated(note = "use `period::Periodic` or `period::PeriodCounter`")]
157pub fn periodic_log_level(i: u32, every: u32) -> Level {
158 if i % every == 0 {
159 Level::Info
160 } else {
161 Level::Debug
162 }
163}
164
#[cfg(test)]
mod inner_test {
    use crate::bitcoin::Network;
    use crate::{iterate, Config};
    use bitcoin::Txid;
    use std::str::FromStr;
    use std::sync::mpsc::sync_channel;
    use test_log::test;

    /// Builds the configuration shared by the tests: blocks read from `../blocks` on testnet.
    pub fn test_conf() -> Config {
        let blocks_dir = "../blocks";
        Config::new(blocks_dir, Network::Testnet)
    }

    /// Runs `iterate` over the fixture, checking block 394 and the summed input and output
    /// counters of every received block.
    #[test]
    fn test_blk_testnet() {
        let (send, recv) = sync_channel(0);
        let handle = iterate(test_conf(), send);

        // Txids expected in block 394, in order.
        let expected_txids: Vec<Txid> = [
            "63375db7e443e491c99bcf46ce49422d05708f83b65335c935dee0a06855ebff",
            "0280d22f8aaa210b9ec8509067ecc523bf79609d8378cc56196857848cf42ce4",
            "b3c19d78b4953b694717a47d9852f8ea1ccd4cf93a45ba2e43a0f97d7cdb2655",
        ]
        .iter()
        .map(|hex| Txid::from_str(hex).unwrap())
        .collect();

        let mut total_inputs = 0;
        let mut total_outputs = 0;
        // A `None` message ends the stream; a disconnected channel panics on `unwrap`.
        while let Some(block) = recv.recv().unwrap() {
            if block.height == 394 {
                assert_eq!(block.fee(), Some(50_000));
                assert_eq!(block.txids(), &expected_txids);
                assert_eq!(block.block_total_txs, 3);
            }

            total_inputs += block.block_total_inputs;
            total_outputs += block.block_total_outputs;
        }
        handle.join().unwrap();

        assert_eq!(total_inputs, 448);
        assert_eq!(total_outputs, 426);
    }

    /// Iterates twice with `utxo_db` pointing at a temporary directory, checking fees, the
    /// per-block transaction iterator and the highest height reached.
    #[cfg(feature = "db")]
    #[test]
    fn test_blk_testnet_db() {
        let tempdir = tempfile::TempDir::new().unwrap();
        let mut conf = test_conf();
        conf.utxo_db = Some(tempdir.path().to_path_buf());

        let mut max_height = 0;
        for block in super::iter(conf.clone()) {
            max_height = std::cmp::max(max_height, block.height);
            if block.height == 389 {
                assert_eq!(block.fee(), Some(50_000));
                assert_eq!(block.iter_tx().size_hint(), (2, Some(2)));
            }
            assert!(block.iter_tx().next().is_some());
            block
                .iter_tx()
                .for_each(|(txid, tx)| assert_eq!(*txid, tx.compute_txid()));
        }
        assert_eq!(max_height, 400 - conf.max_reorg as u32);

        // Second pass with the same configuration, hence the same `utxo_db` path.
        for block in super::iter(conf) {
            if block.height == 394 {
                assert_eq!(block.fee(), Some(50_000));
            }
        }
    }
}