use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use blockchain::{BlockChain, BlockChainDB, BlockProvider};
use bytes::Bytes;
use common_types::{
encoded,
engines::epoch::Transition as EpochTransition,
errors::{SnapshotError, VapcoreError},
snapshot::{ChunkSink, ManifestData, Progress},
receipt::Receipt,
};
use enjen::Engine;
use vapory_types::{H256, U256};
use tetsy_kvdb::KeyValueDB;
use log::trace;
use parking_lot::RwLock;
use rand::rngs::OsRng;
use tetsy_rlp::{RlpStream, Rlp};
use triehash::ordered_trie_root;
use crate::{
SnapshotComponents, Rebuilder,
block::AbridgedBlock,
verify_old_block
};
#[derive(Clone, Copy, PartialEq)]
pub struct PowSnapshot {
pub blocks: u64,
pub max_restore_blocks: u64,
}
impl PowSnapshot {
pub fn new(blocks: u64, max_restore_blocks: u64) -> PowSnapshot {
PowSnapshot { blocks, max_restore_blocks }
}
}
impl SnapshotComponents for PowSnapshot {
fn chunk_all(
&mut self,
chain: &BlockChain,
block_at: H256,
chunk_sink: &mut ChunkSink,
progress: &RwLock<Progress>,
preferred_size: usize,
) -> Result<(), SnapshotError> {
PowWorker {
chain,
rlps: VecDeque::new(),
current_hash: block_at,
writer: chunk_sink,
progress,
preferred_size,
}.chunk_all(self.blocks)
}
fn rebuilder(
&self,
chain: BlockChain,
db: Arc<dyn BlockChainDB>,
manifest: &ManifestData,
) -> Result<Box<dyn Rebuilder>, VapcoreError> {
PowRebuilder::new(
chain,
db.key_value().clone(),
manifest,
self.max_restore_blocks
).map(|r| Box::new(r) as Box<_>)
}
fn min_supported_version(&self) -> u64 { crate::MIN_SUPPORTED_STATE_CHUNK_VERSION }
fn current_version(&self) -> u64 { crate::STATE_CHUNK_VERSION }
}
struct PowWorker<'a> {
chain: &'a BlockChain,
rlps: VecDeque<Bytes>,
current_hash: H256,
writer: &'a mut ChunkSink<'a>,
progress: &'a RwLock<Progress>,
preferred_size: usize,
}
impl<'a> PowWorker<'a> {
fn chunk_all(&mut self, snapshot_blocks: u64) -> Result<(), SnapshotError> {
let mut loaded_size = 0;
let mut last = self.current_hash;
let genesis_hash = self.chain.genesis_hash();
for _ in 0..snapshot_blocks {
if self.current_hash == genesis_hash { break }
let (block, receipts) = self.chain.block(&self.current_hash)
.and_then(|b| self.chain.block_receipts(&self.current_hash).map(|r| (b, r)))
.ok_or_else(||SnapshotError::BlockNotFound(self.current_hash))?;
let abridged_rlp = AbridgedBlock::from_block_view(&block.view()).into_inner();
let pair = {
let mut pair_stream = RlpStream::new_list(2);
pair_stream.append_raw(&abridged_rlp, 1).append(&receipts);
pair_stream.out()
};
let new_loaded_size = loaded_size + pair.len();
if new_loaded_size > self.preferred_size && !self.rlps.is_empty() {
self.write_chunk(last)?;
loaded_size = pair.len();
} else {
loaded_size = new_loaded_size;
}
self.rlps.push_front(pair);
last = self.current_hash;
self.current_hash = block.header_view().parent_hash();
self.progress.write().blocks += 1;
}
if loaded_size != 0 {
self.write_chunk(last)?;
}
Ok(())
}
fn write_chunk(&mut self, last: H256) -> Result<(), SnapshotError> {
trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len());
let (last_header, last_details) = self.chain.block_header_data(&last)
.and_then(|n| self.chain.block_details(&last).map(|d| (n, d)))
.ok_or_else(||SnapshotError::BlockNotFound(last))?;
let parent_number = last_header.number() - 1;
let parent_hash = last_header.parent_hash();
let parent_total_difficulty = last_details.total_difficulty - last_header.difficulty();
trace!(target: "snapshot", "parent last written block: #{}/{}", parent_number, parent_hash);
let num_entries = self.rlps.len();
let mut rlp_stream = RlpStream::new_list(3 + num_entries);
rlp_stream.append(&parent_number).append(&parent_hash).append(&parent_total_difficulty);
for pair in self.rlps.drain(..) {
rlp_stream.append_raw(&pair, 1);
}
let raw_data = rlp_stream.out();
(self.writer)(&raw_data)?;
Ok(())
}
}
pub struct PowRebuilder {
chain: BlockChain,
db: Arc<dyn KeyValueDB>,
rng: OsRng,
disconnected: Vec<(u64, H256)>,
best_number: u64,
best_hash: H256,
best_root: H256,
fed_blocks: u64,
snapshot_blocks: u64,
}
impl PowRebuilder {
fn new(chain: BlockChain, db: Arc<dyn KeyValueDB>, manifest: &ManifestData, snapshot_blocks: u64) -> Result<Self, VapcoreError> {
Ok(PowRebuilder {
chain,
db,
rng: OsRng,
disconnected: Vec::new(),
best_number: manifest.block_number,
best_hash: manifest.block_hash,
best_root: manifest.state_root,
fed_blocks: 0,
snapshot_blocks,
})
}
}
impl Rebuilder for PowRebuilder {
fn feed(&mut self, chunk: &[u8], engine: &dyn Engine, abort_flag: &AtomicBool) -> Result<(), VapcoreError> {
let rlp = Rlp::new(chunk);
let item_count = rlp.item_count()?;
let num_blocks = (item_count - 3) as u64;
trace!(target: "snapshot", "restoring block chunk with {} blocks.", num_blocks);
if self.fed_blocks + num_blocks > self.snapshot_blocks {
return Err(SnapshotError::TooManyBlocks(self.snapshot_blocks, self.fed_blocks + num_blocks).into())
}
let mut cur_number = rlp.val_at::<u64>(0)? + 1;
let mut parent_hash = rlp.val_at::<H256>(1)?;
let parent_total_difficulty = rlp.val_at::<U256>(2)?;
for idx in 3..item_count {
if !abort_flag.load(Ordering::SeqCst) { return Err(SnapshotError::RestorationAborted.into()) }
let pair = rlp.at(idx)?;
let abridged_rlp = pair.at(0)?.as_raw().to_owned();
let abridged_block = AbridgedBlock::from_raw(abridged_rlp);
let receipts: Vec<Receipt> = pair.list_at(1)?;
let receipts_root = ordered_trie_root(pair.at(1)?.iter().map(|r| r.as_raw()));
let block = abridged_block.to_block(parent_hash, cur_number, receipts_root)?;
let block_bytes = encoded::Block::new(block.rlp_bytes());
let is_best = cur_number == self.best_number;
if is_best {
if block.header.hash() != self.best_hash {
return Err(SnapshotError::WrongBlockHash(cur_number, self.best_hash, block.header.hash()).into())
}
if block.header.state_root() != &self.best_root {
return Err(SnapshotError::WrongStateRoot(self.best_root, *block.header.state_root()).into())
}
}
verify_old_block(
&mut self.rng,
&block.header,
engine,
&self.chain,
is_best
)?;
let mut batch = self.db.transaction();
if idx == 3 {
if self.chain.insert_unordered_block(&mut batch, block_bytes, receipts, Some(parent_total_difficulty), is_best, false) {
self.disconnected.push((cur_number, block.header.hash()));
}
} else {
self.chain.insert_unordered_block(&mut batch, block_bytes, receipts, None, is_best, false);
}
self.db.write_buffered(batch);
self.chain.commit();
parent_hash = block.header.hash();
cur_number += 1;
}
self.fed_blocks += num_blocks;
Ok(())
}
fn finalize(&mut self) -> Result<(), VapcoreError> {
let mut batch = self.db.transaction();
trace!(target: "snapshot", "rebuilder, finalize: inserting {} disconnected chunks", self.disconnected.len());
for (first_num, first_hash) in self.disconnected.drain(..) {
let parent_num = first_num - 1;
if let Some(parent_hash) = self.chain.block_hash(parent_num) {
self.chain.add_child(&mut batch, parent_hash, first_hash);
}
}
let genesis_hash = self.chain.genesis_hash();
self.chain.insert_epoch_transition(&mut batch, 0, EpochTransition {
block_number: 0,
block_hash: genesis_hash,
proof: vec![],
});
self.db.write_buffered(batch);
Ok(())
}
}