//! bcloop 0.1.0
//!
//! A tool for processing Bitcoin-like blockchain data.
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Read, Result, Seek, SeekFrom};
use std::sync::{Arc, Condvar, Mutex, mpsc};

use rayon::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{Block, sha256d};

// MARK: BlockInfo

/// Location and parent link of one block stored in a `blk*.dat` file.
///
/// `parse_dat_file` creates these with `height == 0`; `get_ordered_chain`
/// assigns the real height once the chain has been ordered.
#[derive(Debug, Clone)]
pub struct BlockInfo {
  /// Path of the `blk*.dat` file that contains the block.
  pub file: String,
  /// Height of the block in the ordered chain (0 until assigned).
  pub height: u32,
  /// `sha256d` of the 80-byte block header, with its byte order reversed.
  pub block_hash: [u8; 32],
  /// Parent hash taken from header bytes 4..36, byte-reversed like `block_hash`.
  pub prev_block: [u8; 32],
  /// Byte offset of the serialized block inside `file`, just past the 8-byte magic/length prefix.
  pub block_pos: u64,
  /// Length in bytes of the serialized block, as stored in the record prefix.
  pub block_len: u32,
}

impl BlockInfo {
  /// Loads the serialized bytes of this block from disk.
  ///
  /// Opens `self.file`, seeks to `self.block_pos` and reads at most `self.block_len` bytes.
  ///
  /// # Errors
  /// Errors from opening, seeking or reading the file are returned unchanged. If the file
  /// ends before `block_len` bytes were read, an `UnexpectedEof` error naming the block
  /// height and file is returned.
  pub fn read_raw(&self) -> Result<Vec<u8>> {
    let want = self.block_len as usize;
    let mut raw = Vec::with_capacity(want);

    let mut src = std::fs::File::open(&self.file)?;
    src.seek(SeekFrom::Start(self.block_pos))?;
    let got = src.take(u64::from(self.block_len)).read_to_end(&mut raw)?;

    if got == want {
      Ok(raw)
    } else {
      Err(std::io::Error::new(
        std::io::ErrorKind::UnexpectedEof,
        format!("{} != {} at block {} ({})", got, self.block_len, self.height, self.file),
      ))
    }
  }

  /// Loads and decodes this block, then stamps it with the height and hash recorded here.
  ///
  /// # Errors
  /// Propagates errors from [`BlockInfo::read_raw`]. A decoding failure from `Block::load`
  /// is reported as `InvalidData`, with the block height and file in the message.
  pub fn read(&self) -> Result<Block> {
    let raw = self.read_raw()?;
    let mut cursor = std::io::Cursor::new(raw);

    let mut block = match Block::load(&mut cursor) {
      Ok(decoded) => decoded,
      Err(e) => {
        let msg = format!("Error decoding block {} ({}): {}", self.height, self.file, e);
        return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, msg));
      }
    };

    block.height = self.height;
    block.block_hash = self.block_hash;
    Ok(block)
  }
}

// Helper functions

/// Reads a little-endian `u32` from `buf`.
///
/// Accepts any reader (a `BufReader<File>`, a cursor, a byte slice, ...) so the same
/// helper works for on-disk and in-memory data.
///
/// # Errors
/// Returns `ErrorKind::UnexpectedEof` if fewer than 4 bytes remain, or any other
/// error reported by the underlying reader.
fn read_u32<R: Read + ?Sized>(buf: &mut R) -> Result<u32> {
  let mut bytes = [0u8; 4];
  buf.read_exact(&mut bytes)?;
  Ok(u32::from_le_bytes(bytes))
}

/// Lists the `blk*.dat` files directly inside `datadir`, sorted lexically by path.
///
/// Directory entries that cannot be read are skipped. So are entries whose name or path
/// is not valid UTF-8, which cannot be represented in the returned `String`s; previously
/// these caused a panic.
///
/// # Errors
/// Returns an error if `datadir` itself cannot be read.
fn get_dat_files(datadir: &str) -> Result<Vec<String>> {
  let mut files: Vec<String> = std::fs::read_dir(datadir)?
    .filter_map(|entry| entry.ok())
    .filter_map(|entry| {
      let path = entry.path();
      let name = path.file_name()?.to_str()?;
      if name.starts_with("blk") && name.ends_with(".dat") {
        path.to_str().map(String::from)
      } else {
        None
      }
    })
    .collect();

  files.sort();
  Ok(files)
}

/// Builds the longest chain that starts with the block(s) whose parent is `prev_hash`.
/// Heights are assigned consecutively, starting at `cur_height`.
///
/// `items` maps a parent hash to every block that names it as `prev_block`. When a parent
/// has several children (a fork), the child with the longest chain of descendants is
/// followed. On a tie, the child that appears first in the `Vec` wins. Length is counted
/// in blocks.
///
/// The walk is iterative. Subtree depths are computed once up front, so deep or frequent
/// forks cannot overflow the stack, and no O(n) front-insertions happen per fork.
fn get_ordered_chain(
  items: &HashMap<[u8; 32], Vec<BlockInfo>>,
  prev_hash: &[u8; 32],
  cur_height: u32,
) -> Vec<BlockInfo> {
  let depths = subtree_depths(items, prev_hash);
  let depth_of = |hash: &[u8; 32]| depths.get(hash).copied().unwrap_or(0);

  let mut chain = Vec::new();
  let mut parent = *prev_hash;
  let mut height = cur_height;

  while let Some(children) = items.get(&parent) {
    // first child with the greatest depth (strict `>` keeps the earliest one on ties)
    let mut best: Option<(&BlockInfo, usize)> = None;
    for child in children {
      let depth = depth_of(&child.block_hash);
      if best.map_or(true, |(_, best_depth)| depth > best_depth) {
        best = Some((child, depth));
      }
    }
    let child = match best {
      Some((child, _)) => child,
      None => break,
    };

    let mut block = child.clone();
    block.height = height;
    parent = block.block_hash;
    chain.push(block);
    height += 1;
  }

  chain
}

/// Maps every block hash reachable from `root` (and `root` itself) to the number of blocks
/// in the longest chain of its descendants. A hash with no children maps to 0.
///
/// Uses an explicit stack (post-order traversal) instead of recursion.
fn subtree_depths(
  items: &HashMap<[u8; 32], Vec<BlockInfo>>,
  root: &[u8; 32],
) -> HashMap<[u8; 32], usize> {
  let mut depths: HashMap<[u8; 32], usize> = HashMap::new();
  // (hash, children_done): a hash is popped again with `true` once all its children are done
  let mut stack: Vec<([u8; 32], bool)> = vec![(*root, false)];

  while let Some((hash, children_done)) = stack.pop() {
    if depths.contains_key(&hash) {
      continue; // already computed via another path (e.g. the same block stored twice)
    }
    let children: &[BlockInfo] = items.get(&hash).map(Vec::as_slice).unwrap_or(&[]);
    if children_done {
      let depth = children
        .iter()
        .map(|c| 1 + depths.get(&c.block_hash).copied().unwrap_or(0))
        .max()
        .unwrap_or(0);
      depths.insert(hash, depth);
    } else {
      stack.push((hash, true));
      for child in children {
        if !depths.contains_key(&child.block_hash) {
          stack.push((child.block_hash, false));
        }
      }
    }
  }

  depths
}

/// Turns an `UnexpectedEof` error into `Ok(None)` so a reader loop can treat it as end of
/// data. Every other I/O error passes through unchanged.
fn eof_to_none<T>(res: Result<T>) -> Result<Option<T>> {
  match res {
    Ok(value) => Ok(Some(value)),
    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None),
    Err(e) => Err(e),
  }
}

/// Scans one `blk*.dat` file and returns a [`BlockInfo`] for every block record, in file order.
///
/// Each record is read as `magic (u32 LE) | block_len (u32 LE) | block bytes`. Only the
/// first 80 block bytes (the header) are read; the rest is skipped by seeking. Returned
/// entries have `height == 0`.
///
/// Scanning stops without error in any of these cases:
/// - end of file;
/// - a record whose magic or length is zero (space reserved but not written yet);
/// - a record that extends past the end of the file (still being written).
///
/// The last case means every returned block can be read back in full.
///
/// # Errors
/// Returns the error if the file cannot be opened or stat'ed, or if a read fails for any
/// reason other than reaching end of file.
pub fn parse_dat_file(filepath: &str) -> Result<Vec<BlockInfo>> {
  const PREFIX_SIZE: u64 = 4 + 4; // magic + block_len
  const HEADER_SIZE: usize = 4 + 4 + 80; // magic + block_len + block_hdr

  let file = std::fs::File::open(filepath)?;
  // size snapshot: records that end past it are incomplete and must not be indexed
  let file_len = file.metadata()?.len();
  let mut buf = BufReader::with_capacity(HEADER_SIZE, file);
  let mut blocks = Vec::new();
  let mut pos: u64 = 0;

  loop {
    // seeking a BufReader discards its buffer, so each record is fetched with one fresh read
    buf.seek(SeekFrom::Start(pos))?;

    let magic = match eof_to_none(read_u32(&mut buf))? {
      Some(m) => m,
      None => break, // EOF
    };
    let block_len = match eof_to_none(read_u32(&mut buf))? {
      Some(l) => l,
      None => break, // EOF
    };
    if magic == 0 || block_len == 0 {
      break; // block reserved, but not loaded yet
    }

    let block_pos = pos + PREFIX_SIZE;
    pos = block_pos + u64::from(block_len);
    if pos > file_len {
      break; // truncated record: the block body is not fully on disk
    }

    let mut block_hdr = [0u8; 80];
    if eof_to_none(buf.read_exact(&mut block_hdr))?.is_none() {
      break; // header runs past end of file
    }

    let mut block_hash = sha256d(&block_hdr);
    let mut prev_block = [0u8; 32];
    prev_block.copy_from_slice(&block_hdr[4..36]);

    // store both hashes with their byte order reversed
    block_hash.reverse();
    prev_block.reverse();

    blocks.push(BlockInfo {
      file: filepath.to_string(),
      height: 0,
      block_hash,
      block_pos,
      block_len,
      prev_block,
    });
  }

  Ok(blocks)
}

// BcWalker

/// The chain assembled from a directory of `blk*.dat` files, ordered by height.
pub struct BcWalker {
  /// Ordered chain; `from_dir` asserts that `blocks[i].height == i` and that each block's
  /// `prev_block` equals the previous block's `block_hash`.
  blocks: Vec<BlockInfo>,
}

impl BcWalker {
  pub fn from_dir(datadir: &str) -> Result<BcWalker> {
    let files = get_dat_files(datadir)?;

    let items = files.into_par_iter().map(|file| parse_dat_file(&file));
    let items = items.map(|x| x.unwrap_or_default()).flatten().collect::<Vec<_>>();
    let items = items.iter().fold(HashMap::new(), |mut map, (block)| {
      map.entry(block.prev_block).or_insert_with(Vec::new).push(block.clone());
      map
    });

    let prev_hash = [0u8; 32]; // genesis block hash
    let ordered_blocks = get_ordered_chain(&items, &prev_hash, 0);

    // verify
    let mut prev_hash = prev_hash;
    let mut cur_height = 0;
    for block in &ordered_blocks {
      let a = hex::encode(prev_hash);
      let b = hex::encode(block.prev_block);
      assert_eq!(cur_height, block.height, "cur_height mismatch at {}", cur_height);
      assert_eq!(a, b, "prev_hash mismatch at {}", cur_height);
      prev_hash = block.block_hash.clone();
      cur_height += 1;
      // println!("{:6} {}", block.height, hex::encode(block.block_hash));
    }

    Ok(BcWalker { blocks: ordered_blocks })
  }

  pub fn blocks_count(&self) -> usize {
    self.blocks.len()
  }

  pub fn bc_size(&self) -> u64 {
    self.blocks.iter().map(|b| b.block_len as u64).sum()
  }

  pub fn bc_size_at(&self, min_block: usize, max_block: usize) -> u64 {
    let max_block = max_block.min(self.blocks.len() - 1);
    self.blocks[min_block..=max_block].iter().map(|b| b.block_len as u64).sum::<u64>()
  }

  pub fn get_chan(&self) -> mpsc::Receiver<(Block, u32)> {
    self.get_chan_at(0, self.blocks.len() - 1)
  }

  pub fn get_chan_at(&self, min_block: usize, max_block: usize) -> mpsc::Receiver<(Block, u32)> {
    let blocks_info: Arc<[BlockInfo]> = Arc::from(&self.blocks[min_block..=max_block]);

    let cpu_count = std::thread::available_parallelism().unwrap().get() * 2;
    let (tx, rx) = mpsc::sync_channel(cpu_count);

    let cur_idx = Arc::new(Mutex::new(0));
    let res_idx = Arc::new((Mutex::new(0), Condvar::new()));

    for _ in 0..cpu_count {
      let blocks_info = blocks_info.clone();
      let cur_idx = cur_idx.clone();
      let res_idx = res_idx.clone();
      let tx = tx.clone();

      std::thread::spawn(move || {
        loop {
          let mut cur_idx_lock = cur_idx.lock().unwrap();
          let idx = *cur_idx_lock;
          *cur_idx_lock += 1;
          drop(cur_idx_lock);

          if idx >= blocks_info.len() {
            break;
          }

          let block_info = &blocks_info[idx];
          let block = block_info.read().unwrap_or_else(|e| {
            panic!("Error reading block {} from {}: {}", block_info.height, block_info.file, e);
          });

          let (res_idx_lock, cvar) = &*res_idx;
          let mut start = cvar.wait_while(res_idx_lock.lock().unwrap(), |x| idx != *x).unwrap();

          tx.send((block, block_info.block_len)).expect("Failed to send block");
          *start += 1;
          cvar.notify_all();
        }
      });
    }

    rx
  }
}