bitcoin_block_parser/
headers.rs

1//! Used to parse the [`bitcoin::block::Header`] from the `blocks` directory to order and locate
2//! every block for later parsing.
3
4use crate::xor::{XorReader, XOR_MASK_LEN};
5use anyhow::bail;
6use anyhow::Result;
7use bitcoin::block::Header;
8use bitcoin::consensus::Decodable;
9use bitcoin::hashes::Hash;
10use bitcoin::BlockHash;
11use log::info;
12use std::collections::HashMap;
13use std::fs;
14use std::fs::File;
15use std::io::{BufReader, Read};
16use std::path::{Path, PathBuf};
17use std::sync::mpsc;
18use threadpool::ThreadPool;
19
/// Number of bytes stored in front of every block header in a BLK file:
/// 4 magic bytes followed by 4 bytes that indicate the block size
const PRE_HEADER_SIZE: usize = 4 + 4;
22
/// Points to the on-disk location where a block starts (and the header ends).
///
/// Bundles the decoded header with the file path, byte offset and XOR mask it was read with.
#[derive(Clone, Debug)]
pub struct ParsedHeader {
    /// Consensus parsed `bitcoin::Header`
    pub inner: Header,
    /// Byte offset from the beginning of the file to the first byte after the header
    pub offset: usize,
    /// This header's block hash
    pub hash: BlockHash,
    /// Path of the BLK file
    pub path: PathBuf,
    /// XOR mask of the BLK file (`None` when the blocks directory has no `xor.dat`)
    pub xor_mask: Option<[u8; XOR_MASK_LEN]>,
}
/// Fast multithreaded parser of [`ParsedHeader`] from the blocks directory.
///
/// You can [specify the blocks directory](https://en.bitcoin.it/wiki/Data_directory) when
/// running `bitcoind`.
pub struct HeaderParser;
42impl HeaderParser {
43    /// Parses the headers from the bitcoin `blocks` directory returning [`ParsedHeader`] in height order,
44    /// starting from the genesis block.
45    /// - Returns an `Err` if the directory contains invalid `.blk` files.
46    /// - Takes a few seconds to run.
47    pub fn parse(blocks_dir: &str) -> Result<Vec<ParsedHeader>> {
48        info!("Reading headers from {}", blocks_dir);
49        let xor_mask = Self::read_xor_mask(blocks_dir)?;
50        let (tx, rx) = mpsc::channel();
51        let pool = ThreadPool::new(64);
52
53        // Read headers from every BLK file in a new thread
54        for path in Self::blk_files(blocks_dir)? {
55            let path = path.clone();
56            let tx = tx.clone();
57            pool.execute(move || {
58                let results = Self::parse_headers_file(path, xor_mask);
59                let _ = tx.send(results);
60            });
61        }
62        drop(tx);
63
64        // Receive all the headers from spawned threads
65        let mut locations = HashMap::default();
66        let mut collisions: Vec<ParsedHeader> = vec![];
67        for received in rx {
68            for header in received? {
69                if let Some(collision) = locations.insert(header.inner.prev_blockhash, header) {
70                    collisions.push(collision)
71                }
72            }
73        }
74
75        // Resolve reorgs and order the headers by block height
76        for collision in collisions {
77            Self::resolve_collisions(&mut locations, collision);
78        }
79        let ordered = Self::order_headers(locations);
80        info!("Finished reading {} headers", ordered.len());
81        Ok(ordered)
82    }
83
84    /// Parses headers from a BLK file
85    fn parse_headers_file(
86        path: PathBuf,
87        xor_mask: Option<[u8; XOR_MASK_LEN]>,
88    ) -> Result<Vec<ParsedHeader>> {
89        let buffer_size = PRE_HEADER_SIZE + Header::SIZE;
90        let reader = XorReader::new(File::open(&path)?, xor_mask);
91        let mut reader = BufReader::with_capacity(buffer_size, reader);
92
93        let mut offset = 0;
94        // First 8 bytes are 4 magic bytes and 4 bytes that indicate the block size
95        let mut buffer = vec![0; PRE_HEADER_SIZE];
96        let mut headers = vec![];
97
98        while reader.read_exact(&mut buffer).is_ok() {
99            offset += buffer.len();
100            if let Ok(header) = Header::consensus_decode(&mut reader) {
101                headers.push(ParsedHeader {
102                    inner: header,
103                    offset: offset + Header::SIZE,
104                    hash: header.block_hash(),
105                    path: path.clone(),
106                    xor_mask,
107                });
108                // Get the size of the next block
109                let size = u32::from_le_bytes(buffer[4..].try_into()?) as usize;
110                // Seek to the next block, subtracting the block header bytes we parsed
111                reader.seek_relative(size.saturating_sub(Header::SIZE) as i64)?;
112                offset += size;
113            }
114        }
115        Ok(headers)
116    }
117
118    /// Returns the list of all BLK files in the dir
119    fn blk_files(dir: &str) -> Result<Vec<PathBuf>> {
120        let read_dir = fs::read_dir(Path::new(&dir))?;
121        let mut files = vec![];
122
123        for file in read_dir {
124            let file = file?;
125            let name = file.file_name().to_string_lossy().to_string();
126            if name.starts_with("blk") {
127                files.push(file.path())
128            }
129        }
130
131        if files.is_empty() {
132            bail!("No BLK files found in dir {:?}", dir);
133        }
134
135        Ok(files)
136    }
137
138    /// Reads the block XOR mask. If no `xor.dat` file is present,
139    /// use all-zeroed array to perform an XOR no-op.
140    fn read_xor_mask<P: AsRef<Path>>(dir: P) -> Result<Option<[u8; XOR_MASK_LEN]>> {
141        let path = dir.as_ref().join("xor.dat");
142        if !path.exists() {
143            return Ok(None);
144        }
145        let mut file = File::open(path)?;
146        let mut buf = [0_u8; XOR_MASK_LEN];
147        file.read_exact(&mut buf)?;
148        Ok(Some(buf))
149    }
150
151    /// In case of reorgs we need to resolve to the longest chain
152    fn resolve_collisions(headers: &mut HashMap<BlockHash, ParsedHeader>, collision: ParsedHeader) {
153        let existing = headers
154            .get(&collision.inner.prev_blockhash)
155            .expect("Missing previous blockhash (corrupted blocks)");
156        let mut e_hash = &existing.hash;
157        let mut c_hash = &collision.hash;
158
159        while let (Some(e), Some(c)) = (headers.get(e_hash), headers.get(c_hash)) {
160            e_hash = &e.hash;
161            c_hash = &c.hash;
162        }
163
164        // In case collision is the longest, update the blocks map
165        if headers.contains_key(c_hash) {
166            headers.insert(collision.inner.prev_blockhash, collision);
167        }
168    }
169
170    /// Puts the headers into the correct order by block height (using the hashes)
171    fn order_headers(mut headers: HashMap<BlockHash, ParsedHeader>) -> Vec<ParsedHeader> {
172        let mut ordered_headers = vec![];
173        // Genesis block starts with prev = all_zeros
174        let mut next_hash = BlockHash::all_zeros();
175
176        while let Some(index) = headers.remove(&next_hash) {
177            next_hash = index.hash;
178            ordered_headers.push(index);
179        }
180
181        ordered_headers
182    }
183}