use std::collections::HashMap;
use std::io::{BufRead, BufReader, Read, Result, Seek, SeekFrom};
use std::sync::{Arc, Condvar, Mutex, mpsc};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use crate::{Block, sha256d};
#[derive(Debug, Clone)]
pub struct BlockInfo {
pub file: String,
pub height: u32,
pub block_hash: [u8; 32],
pub prev_block: [u8; 32],
pub block_pos: u64,
pub block_len: u32,
}
impl BlockInfo {
pub fn read_raw(&self) -> Result<Vec<u8>> {
let mut data = Vec::with_capacity(self.block_len as usize);
let mut file = std::fs::File::open(&self.file)?;
file.seek(SeekFrom::Start(self.block_pos))?;
let bytes_read = file.take(self.block_len as u64).read_to_end(&mut data)?;
if bytes_read != self.block_len as usize {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("{} != {} at block {} ({})", bytes_read, self.block_len, self.height, self.file),
));
}
Ok(data)
}
pub fn read(&self) -> Result<Block> {
let dat = self.read_raw()?;
let mut cur = std::io::Cursor::new(dat);
let mut block = Block::load(&mut cur).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Error decoding block {} ({}): {}", self.height, self.file, e),
)
})?;
block.height = self.height;
block.block_hash = self.block_hash;
Ok(block)
}
}
fn read_u32(buf: &mut BufReader<std::fs::File>) -> Result<u32> {
let mut bytes = [0u8; 4];
buf.read_exact(&mut bytes)?;
Ok(u32::from_le_bytes(bytes))
}
fn get_dat_files(datadir: &str) -> Result<Vec<String>> {
let mut files: Vec<String> = Vec::new();
for file in std::fs::read_dir(datadir)?.filter_map(Result::ok) {
let path = file.path();
let name = path.file_name().unwrap().to_str().unwrap();
if name.starts_with("blk") && name.ends_with(".dat") {
files.push(path.to_str().unwrap().to_string());
}
}
files.sort();
Ok(files)
}
fn get_ordered_chain(
items: &HashMap<[u8; 32], Vec<BlockInfo>>,
prev_hash: &[u8; 32],
cur_height: u32,
) -> Vec<BlockInfo> {
let mut chain = Vec::new();
let mut prev_hash = *prev_hash;
let mut cur_height = cur_height;
while let Some(blocks) = items.get(&prev_hash) {
match blocks.len() {
0 => break,
1 => {
let mut block = blocks[0].clone();
block.height = cur_height;
chain.push(block);
}
_ => {
let mut longest_chain = Vec::new();
for block in blocks {
let mut block = block.clone();
block.height = cur_height;
let mut tmp_chain = get_ordered_chain(items, &block.block_hash, cur_height + 1);
tmp_chain.insert(0, block);
if tmp_chain.len() > longest_chain.len() {
longest_chain = tmp_chain;
}
}
chain.extend(longest_chain);
}
}
prev_hash = chain.last().unwrap().block_hash;
cur_height = chain.last().unwrap().height + 1;
}
chain
}
pub fn parse_dat_file(filepath: &str) -> Result<Vec<BlockInfo>> {
const HEADER_SIZE: usize = 4 + 4 + 80; let mut blocks = Vec::new();
let mut buf = BufReader::with_capacity(HEADER_SIZE, std::fs::File::open(&filepath)?);
let mut pos: u64 = 0;
loop {
buf.seek(SeekFrom::Start(pos))?;
buf.consume(HEADER_SIZE);
let magic = match read_u32(&mut buf) {
Ok(m) => m,
Err(_) => break, };
let block_len = match read_u32(&mut buf) {
Ok(l) => l,
Err(_) => break, };
if magic == 0 || block_len == 0 {
break; }
let block_pos = pos + 4 + 4;
let mut block_hdr = [0u8; 80];
buf.read_exact(&mut block_hdr)?;
pos = block_pos + block_len as u64;
let mut block_hash = sha256d(&block_hdr);
let mut prev_block: [u8; 32] = block_hdr[4..36].try_into().map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Failed to convert prev_block slice to array: {}", e),
)
})?;
block_hash.reverse();
prev_block.reverse();
blocks.push(BlockInfo {
file: filepath.to_string(),
height: 0,
block_hash,
block_pos,
block_len,
prev_block,
});
}
Ok(blocks)
}
pub struct BcWalker {
blocks: Vec<BlockInfo>,
}
impl BcWalker {
pub fn from_dir(datadir: &str) -> Result<BcWalker> {
let files = get_dat_files(datadir)?;
let items = files.into_par_iter().map(|file| parse_dat_file(&file));
let items = items.map(|x| x.unwrap_or_default()).flatten().collect::<Vec<_>>();
let items = items.iter().fold(HashMap::new(), |mut map, (block)| {
map.entry(block.prev_block).or_insert_with(Vec::new).push(block.clone());
map
});
let prev_hash = [0u8; 32]; let ordered_blocks = get_ordered_chain(&items, &prev_hash, 0);
let mut prev_hash = prev_hash;
let mut cur_height = 0;
for block in &ordered_blocks {
let a = hex::encode(prev_hash);
let b = hex::encode(block.prev_block);
assert_eq!(cur_height, block.height, "cur_height mismatch at {}", cur_height);
assert_eq!(a, b, "prev_hash mismatch at {}", cur_height);
prev_hash = block.block_hash.clone();
cur_height += 1;
}
Ok(BcWalker { blocks: ordered_blocks })
}
pub fn blocks_count(&self) -> usize {
self.blocks.len()
}
pub fn bc_size(&self) -> u64 {
self.blocks.iter().map(|b| b.block_len as u64).sum()
}
pub fn bc_size_at(&self, min_block: usize, max_block: usize) -> u64 {
let max_block = max_block.min(self.blocks.len() - 1);
self.blocks[min_block..=max_block].iter().map(|b| b.block_len as u64).sum::<u64>()
}
pub fn get_chan(&self) -> mpsc::Receiver<(Block, u32)> {
self.get_chan_at(0, self.blocks.len() - 1)
}
pub fn get_chan_at(&self, min_block: usize, max_block: usize) -> mpsc::Receiver<(Block, u32)> {
let blocks_info: Arc<[BlockInfo]> = Arc::from(&self.blocks[min_block..=max_block]);
let cpu_count = std::thread::available_parallelism().unwrap().get() * 2;
let (tx, rx) = mpsc::sync_channel(cpu_count);
let cur_idx = Arc::new(Mutex::new(0));
let res_idx = Arc::new((Mutex::new(0), Condvar::new()));
for _ in 0..cpu_count {
let blocks_info = blocks_info.clone();
let cur_idx = cur_idx.clone();
let res_idx = res_idx.clone();
let tx = tx.clone();
std::thread::spawn(move || {
loop {
let mut cur_idx_lock = cur_idx.lock().unwrap();
let idx = *cur_idx_lock;
*cur_idx_lock += 1;
drop(cur_idx_lock);
if idx >= blocks_info.len() {
break;
}
let block_info = &blocks_info[idx];
let block = block_info.read().unwrap_or_else(|e| {
panic!("Error reading block {} from {}: {}", block_info.height, block_info.file, e);
});
let (res_idx_lock, cvar) = &*res_idx;
let mut start = cvar.wait_while(res_idx_lock.lock().unwrap(), |x| idx != *x).unwrap();
tx.send((block, block_info.block_len)).expect("Failed to send block");
*start += 1;
cvar.notify_all();
}
});
}
rx
}
}