use crate::core::core::merkle_proof::MerkleProof;
use crate::core::core::{
Block, BlockHeader, BlockSums, Committed, Inputs, KernelFeatures, Output, OutputIdentifier,
SegmentIdentifier, Transaction, TxKernel,
};
use crate::core::global;
use crate::core::pow;
use crate::core::ser::ProtocolVersion;
use crate::error::{Error, ErrorKind};
use crate::pipe;
use crate::store;
use crate::txhashset;
use crate::txhashset::{PMMRHandle, Segmenter, TxHashSet};
use crate::types::{
BlockStatus, ChainAdapter, CommitPos, NoStatus, Options, Tip, TxHashsetWriteStatus,
};
use crate::util::secp::pedersen::{Commitment, RangeProof};
use crate::util::RwLock;
use crate::{
core::core::hash::{Hash, Hashed},
store::Batch,
txhashset::{ExtensionPair, HeaderExtension},
};
use grin_store::Error::NotFoundErr;
use std::collections::HashMap;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Maximum number of orphan blocks held in the in-memory pool before eviction kicks in.
pub const MAX_ORPHAN_SIZE: usize = 200;
/// Orphans older than this many seconds become eligible for eviction once the pool is over capacity.
const MAX_ORPHAN_AGE_SECS: u64 = 300;
/// A block we received whose parent is not yet known, queued for later processing.
#[derive(Debug, Clone)]
struct Orphan {
    // The orphan block itself.
    block: Block,
    // Processing options it arrived with; replayed when the orphan is retried.
    opts: Options,
    // When the orphan entered the pool; used for age-based eviction.
    added: Instant,
}
/// Thread-safe pool of orphan blocks, indexed both by hash and by height so
/// orphans can be retried as soon as their parent height is accepted.
pub struct OrphanBlockPool {
    // Primary store: block hash -> orphan.
    orphans: RwLock<HashMap<Hash, Orphan>>,
    // Secondary index: height -> hashes of orphans at that height.
    height_idx: RwLock<HashMap<u64, Vec<Hash>>>,
    // Running count of orphans evicted over the lifetime of the pool.
    evicted: AtomicUsize,
}
impl OrphanBlockPool {
    fn new() -> OrphanBlockPool {
        OrphanBlockPool {
            orphans: RwLock::new(HashMap::new()),
            height_idx: RwLock::new(HashMap::new()),
            evicted: AtomicUsize::new(0),
        }
    }

    /// Number of orphans currently held in the pool.
    fn len(&self) -> usize {
        let orphans = self.orphans.read();
        orphans.len()
    }

    /// Total number of orphans evicted since the pool was created.
    fn len_evicted(&self) -> usize {
        self.evicted.load(Ordering::Relaxed)
    }

    /// Insert an orphan. If the pool then exceeds MAX_ORPHAN_SIZE, evict in
    /// two passes: first drop orphans older than MAX_ORPHAN_AGE_SECS, then
    /// drop entire heights starting from the highest (furthest from being
    /// connectable) until back under the cap.
    fn add(&self, orphan: Orphan) {
        // Both locks are taken together (orphans then height_idx) so the two
        // maps never diverge mid-operation.
        let mut orphans = self.orphans.write();
        let mut height_idx = self.height_idx.write();
        {
            let height_hashes = height_idx
                .entry(orphan.block.header.height)
                .or_insert_with(|| vec![]);
            height_hashes.push(orphan.block.hash());
            orphans.insert(orphan.block.hash(), orphan);
        }
        if orphans.len() > MAX_ORPHAN_SIZE {
            let old_len = orphans.len();
            // Pass 1: age-based eviction.
            orphans.retain(|_, ref mut x| {
                x.added.elapsed() < Duration::from_secs(MAX_ORPHAN_AGE_SECS)
            });
            // Pass 2: evict whole heights, highest first, until under the cap.
            let mut heights = height_idx.keys().cloned().collect::<Vec<u64>>();
            heights.sort_unstable();
            for h in heights.iter().rev() {
                if let Some(hs) = height_idx.remove(h) {
                    for h in hs {
                        let _ = orphans.remove(&h);
                    }
                }
                if orphans.len() < MAX_ORPHAN_SIZE {
                    break;
                }
            }
            // Drop index entries whose hashes no longer resolve to a stored
            // orphan (e.g. removed by the age-based pass above).
            height_idx.retain(|_, ref mut xs| xs.iter().any(|x| orphans.contains_key(&x)));
            self.evicted
                .fetch_add(old_len - orphans.len(), Ordering::Relaxed);
        }
    }

    /// Remove and return all orphans recorded at the given height, if any.
    fn remove_by_height(&self, height: u64) -> Option<Vec<Orphan>> {
        let mut orphans = self.orphans.write();
        let mut height_idx = self.height_idx.write();
        height_idx
            .remove(&height)
            .map(|hs| hs.iter().filter_map(|h| orphans.remove(h)).collect())
    }

    /// True if a block with this hash is currently in the orphan pool.
    pub fn contains(&self, hash: &Hash) -> bool {
        let orphans = self.orphans.read();
        orphans.contains_key(hash)
    }
}
/// Facade to the blockchain: block processing, chain state queries and
/// access to the underlying store, header MMR and txhashset.
pub struct Chain {
    // Root directory for the chain database and MMR files.
    db_root: String,
    store: Arc<store::ChainStore>,
    // Callback surface notified when blocks are accepted.
    adapter: Arc<dyn ChainAdapter + Send + Sync>,
    orphans: Arc<OrphanBlockPool>,
    txhashset: Arc<RwLock<txhashset::TxHashSet>>,
    header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
    // Cached PIBD segmenter, rebuilt when the archive header changes.
    pibd_segmenter: Arc<RwLock<Option<Segmenter>>>,
    pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
    // Header hashes explicitly invalidated via invalidate_header().
    denylist: Arc<RwLock<Vec<Hash>>>,
    // When true, historical blocks are never pruned.
    archive_mode: bool,
    genesis: BlockHeader,
}
impl Chain {
/// Build a new chain instance: opens the store and txhashset, initializes
/// genesis/head state and rebuilds the output and kernel position indexes.
pub fn init(
    db_root: String,
    adapter: Arc<dyn ChainAdapter + Send + Sync>,
    genesis: Block,
    pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
    archive_mode: bool,
) -> Result<Chain, Error> {
    let store = Arc::new(store::ChainStore::new(&db_root)?);
    let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;
    // The header MMR lives in its own file set, separate from the txhashset.
    let mut header_pmmr = PMMRHandle::new(
        Path::new(&db_root).join("header").join("header_head"),
        false,
        ProtocolVersion(1),
        None,
    )?;
    setup_head(&genesis, &store, &mut header_pmmr, &mut txhashset)?;
    {
        let batch = store.batch()?;
        txhashset.init_output_pos_index(&header_pmmr, &batch)?;
        txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?;
        batch.commit()?;
    }
    let chain = Chain {
        db_root,
        store,
        adapter,
        orphans: Arc::new(OrphanBlockPool::new()),
        txhashset: Arc::new(RwLock::new(txhashset)),
        header_pmmr: Arc::new(RwLock::new(header_pmmr)),
        pibd_segmenter: Arc::new(RwLock::new(None)),
        pow_verifier,
        denylist: Arc::new(RwLock::new(vec![])),
        archive_mode,
        genesis: genesis.header,
    };
    chain.log_heads()?;
    // NOTE(review): exercises the PIBD segmenter at startup and discards the
    // results — looks like temporary smoke-test code; confirm before removing.
    if let Ok(segmenter) = chain.segmenter() {
        let _ = segmenter.kernel_segment(SegmentIdentifier { height: 9, idx: 0 });
        let _ = segmenter.bitmap_segment(SegmentIdentifier { height: 9, idx: 0 });
        let _ = segmenter.output_segment(SegmentIdentifier { height: 11, idx: 0 });
        let _ = segmenter.rangeproof_segment(SegmentIdentifier { height: 7, idx: 0 });
    }
    Ok(chain)
}
/// Add a header hash to the denylist so it (and its descendants) will be
/// rejected during header validation.
pub fn invalidate_header(&self, hash: Hash) -> Result<(), Error> {
    let mut denylist = self.denylist.write();
    denylist.push(hash);
    Ok(())
}
/// Force the chain head (both body and header) back to the given tip,
/// rewinding the txhashset and header MMR to match.
pub fn reset_chain_head<T: Into<Tip>>(&self, head: T) -> Result<(), Error> {
    let head = head.into();
    // Lock order: header MMR before txhashset, consistent with block processing.
    let mut header_pmmr = self.header_pmmr.write();
    let mut txhashset = self.txhashset.write();
    let mut batch = self.store.batch()?;
    let header = batch.get_block_header(&head.hash())?;
    // Rewind the full txhashset and persist the new body head.
    txhashset::extending(
        &mut header_pmmr,
        &mut txhashset,
        &mut batch,
        |ext, batch| {
            self.rewind_and_apply_fork(&header, ext, batch)?;
            batch.save_body_head(&head)?;
            Ok(())
        },
    )?;
    // Then rewind the header chain and persist the new header head.
    txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| {
        self.rewind_and_apply_header_fork(&header, ext, batch)?;
        batch.save_header_head(&head)?;
        Ok(())
    })?;
    // Single commit covers both rewinds.
    batch.commit()?;
    Ok(())
}
/// Whether this node retains full historical blocks (archive mode).
pub fn archive_mode(&self) -> bool {
    let is_archive = self.archive_mode;
    is_archive
}
pub fn header_pmmr(&self) -> Arc<RwLock<PMMRHandle<BlockHeader>>> {
self.header_pmmr.clone()
}
pub fn txhashset(&self) -> Arc<RwLock<TxHashSet>> {
self.txhashset.clone()
}
pub fn store(&self) -> Arc<store::ChainStore> {
self.store.clone()
}
/// Log the current body head and header head at debug level.
fn log_heads(&self) -> Result<(), Error> {
    // Emit one named tip per line.
    fn dump(name: &str, head: Tip) {
        debug!(
            "{}: {} @ {} [{}]",
            name,
            head.total_difficulty.to_num(),
            head.height,
            head.hash(),
        );
    }
    dump("head", self.head()?);
    dump("header_head", self.header_head()?);
    Ok(())
}
/// Process a single block: attempt to accept it, then retry any orphans
/// that were waiting on the next height.
pub fn process_block(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
    let height = b.header.height;
    match self.process_block_single(b, opts) {
        Ok(tip) => {
            // Block accepted; see if any queued orphans can now connect.
            self.check_orphans(height + 1);
            Ok(tip)
        }
        err => err,
    }
}
/// Classify how a newly accepted block relates to the chain: a simple
/// extension (Next), a reorg past the previous head, or a side fork.
fn determine_status(
    &self,
    head: Option<Tip>,
    prev: Tip,
    prev_head: Tip,
    fork_point: Tip,
) -> BlockStatus {
    match head {
        // No new head returned: the block landed on a losing fork.
        None => BlockStatus::Fork {
            prev,
            head: prev_head,
            fork_point,
        },
        Some(head) => {
            if self.is_on_current_chain(prev_head, head).is_ok() {
                // Previous head is still on the winning chain: plain extension.
                BlockStatus::Next { prev }
            } else {
                BlockStatus::Reorg {
                    prev,
                    prev_head,
                    fork_point,
                }
            }
        }
    }
}
/// Return an Unfit error if we have already seen this block (either as the
/// current head or stored with no more total difficulty than the head).
pub fn is_known(&self, header: &BlockHeader) -> Result<(), Error> {
    let head = self.head()?;
    let hash = header.hash();
    if hash == head.hash() {
        return Err(ErrorKind::Unfit("duplicate block".into()).into());
    }
    // Only bother with a db lookup when the block cannot beat the head.
    let cannot_win = header.total_difficulty() <= head.total_difficulty;
    if cannot_win && self.block_exists(hash)? {
        return Err(ErrorKind::Unfit("duplicate block".into()).into());
    }
    Ok(())
}
/// If the block's parent is unknown, stash it in the orphan pool and return
/// an Orphan error; otherwise return Ok so processing can continue.
fn check_orphan(&self, block: &Block, opts: Options) -> Result<(), Error> {
    let head = self.head()?;
    let is_next = block.header.prev_hash == head.last_block_h;
    // Parent is the current head or otherwise known: not an orphan.
    if is_next || self.block_exists(block.header.prev_hash)? {
        return Ok(());
    }
    let block_hash = block.hash();
    let orphan = Orphan {
        block: block.clone(),
        opts,
        added: Instant::now(),
    };
    self.orphans.add(orphan);
    debug!(
        "is_orphan: {:?}, # orphans {}{}",
        block_hash,
        self.orphans.len(),
        if self.orphans.len_evicted() > 0 {
            format!(", # evicted {}", self.orphans.len_evicted())
        } else {
            String::new()
        },
    );
    Err(ErrorKind::Orphan.into())
}
/// Core single-block processing: validate the header, reject duplicates,
/// queue orphans, run the block through the pipeline under locks, then
/// notify the adapter with the resulting status.
fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
    self.process_block_header(&b.header, opts)?;
    self.is_known(&b.header)?;
    self.check_orphan(&b, opts)?;
    // Scope the write locks tightly around pipeline processing.
    let (head, fork_point, prev_head) = {
        let mut header_pmmr = self.header_pmmr.write();
        let mut txhashset = self.txhashset.write();
        let batch = self.store.batch()?;
        // Capture head before processing so we can detect a reorg afterwards.
        let prev_head = batch.head()?;
        let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
        let (head, fork_point) = pipe::process_block(&b, &mut ctx)?;
        ctx.batch.commit()?;
        (head, fork_point, prev_head)
    };
    let prev = self.get_previous_header(&b.header)?;
    let status = self.determine_status(
        head,
        Tip::from_header(&prev),
        prev_head,
        Tip::from_header(&fork_point),
    );
    // Notify listeners (p2p relay, hooks, etc.) outside the locks.
    self.adapter.block_accepted(&b, status, opts);
    Ok(head)
}
/// Validate and persist a standalone block header (header-first propagation).
pub fn process_block_header(&self, bh: &BlockHeader, opts: Options) -> Result<(), Error> {
    // Lock order: header MMR then txhashset, matching process_block_single.
    let mut header_pmmr = self.header_pmmr.write();
    let mut txhashset = self.txhashset.write();
    let batch = self.store.batch()?;
    let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
    pipe::process_block_header(bh, &mut ctx)?;
    ctx.batch.commit()?;
    Ok(())
}
/// Process a batch of headers received during sync, advancing sync_head.
/// Returns the updated sync head (if it moved).
pub fn sync_block_headers(
    &self,
    headers: &[BlockHeader],
    sync_head: Tip,
    opts: Options,
) -> Result<Option<Tip>, Error> {
    let mut header_pmmr = self.header_pmmr.write();
    let mut txhashset = self.txhashset.write();
    let batch = self.store.batch()?;
    let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
    let sync_head = pipe::process_block_headers(headers, sync_head, &mut ctx)?;
    ctx.batch.commit()?;
    Ok(sync_head)
}
/// Build a block-processing context wiring together the batch, MMR handles,
/// PoW verifier and a denylist check captured at this moment in time.
pub fn new_ctx<'a>(
    &self,
    opts: Options,
    batch: store::Batch<'a>,
    header_pmmr: &'a mut txhashset::PMMRHandle<BlockHeader>,
    txhashset: &'a mut txhashset::TxHashSet,
) -> Result<pipe::BlockContext<'a>, Error> {
    // Snapshot the denylist so the closure does not hold the lock.
    let denylist = self.denylist.read().clone();
    Ok(pipe::BlockContext {
        opts,
        pow_verifier: self.pow_verifier,
        header_allowed: Box::new(move |header| {
            pipe::validate_header_denylist(header, &denylist)
        }),
        header_pmmr,
        txhashset,
        batch,
    })
}
/// True if the given block hash is waiting in the orphan pool.
pub fn is_orphan(&self, hash: &Hash) -> bool {
    let pool = &self.orphans;
    pool.contains(hash)
}
/// Number of orphans evicted from the pool since startup.
pub fn orphans_evicted_len(&self) -> usize {
    let pool = &self.orphans;
    pool.len_evicted()
}
/// Starting at `height`, repeatedly pop orphans and try to process them,
/// climbing as long as at least one orphan per height is accepted.
fn check_orphans(&self, mut height: u64) {
    let initial_height = height;
    loop {
        trace!(
            "check_orphans: at {}, # orphans {}",
            height,
            self.orphans.len(),
        );
        let mut orphan_accepted = false;
        let mut height_accepted = height;
        if let Some(orphans) = self.orphans.remove_by_height(height) {
            let orphans_len = orphans.len();
            for (i, orphan) in orphans.into_iter().enumerate() {
                debug!(
                    "check_orphans: get block {} at {}{}",
                    orphan.block.hash(),
                    height,
                    if orphans_len > 1 {
                        // NOTE: `i` is zero-based, so this logs "no.0 of N".
                        format!(", no.{} of {} orphans", i, orphans_len)
                    } else {
                        String::new()
                    },
                );
                // Shadowing: capture this orphan's own height before the move.
                let height = orphan.block.header.height;
                let res = self.process_block_single(orphan.block, orphan.opts);
                if res.is_ok() {
                    orphan_accepted = true;
                    height_accepted = height;
                }
            }
            if orphan_accepted {
                // At least one orphan connected; advance and try the next height.
                height = height_accepted + 1;
                continue;
            }
        }
        break;
    }
    if initial_height != height {
        debug!(
            "check_orphans: {} blocks accepted since height {}, remaining # orphans {}",
            height - initial_height,
            initial_height,
            self.orphans.len(),
        );
    }
}
/// Look up an unspent output (and its MMR position) by commitment.
pub fn get_unspent(
    &self,
    commit: Commitment,
) -> Result<Option<(OutputIdentifier, CommitPos)>, Error> {
    let txhashset = self.txhashset.read();
    txhashset.get_unspent(commit)
}
/// Retrieve the unspent output stored at the given output MMR position.
pub fn get_unspent_output_at(&self, pos: u64) -> Result<Output, Error> {
    let pmmr_guard = self.header_pmmr.read();
    let ths_guard = self.txhashset.read();
    txhashset::utxo_view(&pmmr_guard, &ths_guard, |utxo_view, _batch| {
        utxo_view.get_unspent_output_at(pos)
    })
}
/// Validate a transaction against the current UTXO set and, for NRD
/// kernels, against recent kernel history.
pub fn validate_tx(&self, tx: &Transaction) -> Result<(), Error> {
    let _ = self.validate_tx_against_utxo(tx)?;
    self.validate_tx_kernels(tx)
}
/// Validate NRD (NoRecentDuplicate) kernels against recent kernel history.
/// Transactions with no NRD kernels pass trivially.
fn validate_tx_kernels(&self, tx: &Transaction) -> Result<(), Error> {
    // Idiom: matches! replaces the two-arm match returning true/false.
    let has_nrd_kernel = tx
        .kernels()
        .iter()
        .any(|k| matches!(k.features, KernelFeatures::NoRecentDuplicate { .. }));
    if !has_nrd_kernel {
        return Ok(());
    }
    // Apply the kernels in a readonly extension at the next block height;
    // any NRD duplicate within the relative window will error out.
    let mut header_pmmr = self.header_pmmr.write();
    let mut txhashset = self.txhashset.write();
    txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
        let height = self.next_block_height()?;
        ext.extension.apply_kernels(tx.kernels(), height, batch)
    })
}
/// Check the transaction's inputs exist unspent and its outputs do not
/// collide; returns the located (identifier, position) pairs for the inputs.
fn validate_tx_against_utxo(
    &self,
    tx: &Transaction,
) -> Result<Vec<(OutputIdentifier, CommitPos)>, Error> {
    let pmmr_guard = self.header_pmmr.read();
    let ths_guard = self.txhashset.read();
    txhashset::utxo_view(&pmmr_guard, &ths_guard, |utxo_view, batch| {
        utxo_view.validate_tx(tx, batch)
    })
}
/// Validate a set of inputs against the current UTXO view, returning the
/// matched (identifier, position) pairs.
pub fn validate_inputs(
    &self,
    inputs: &Inputs,
) -> Result<Vec<(OutputIdentifier, CommitPos)>, Error> {
    let pmmr_guard = self.header_pmmr.read();
    let ths_guard = self.txhashset.read();
    txhashset::utxo_view(&pmmr_guard, &ths_guard, |utxo_view, batch| {
        utxo_view.validate_inputs(inputs, batch)
    })
}
/// Height the next accepted block would have (current head height + 1).
fn next_block_height(&self) -> Result<u64, Error> {
    let head_header = self.head_header()?;
    Ok(1 + head_header.height)
}
/// Verify that any coinbase being spent by these inputs has matured by the
/// next block height.
pub fn verify_coinbase_maturity(&self, inputs: &Inputs) -> Result<(), Error> {
    // Compute the target height before taking the read locks.
    let height = self.next_block_height()?;
    let pmmr_guard = self.header_pmmr.read();
    let ths_guard = self.txhashset.read();
    txhashset::utxo_view(&pmmr_guard, &ths_guard, |utxo_view, batch| {
        utxo_view.verify_coinbase_maturity(inputs, height, batch)?;
        Ok(())
    })
}
/// Reject a transaction whose kernel lock height exceeds the next block height.
pub fn verify_tx_lock_height(&self, tx: &Transaction) -> Result<(), Error> {
    let height = self.next_block_height()?;
    match tx.lock_height() <= height {
        true => Ok(()),
        false => Err(ErrorKind::TxLockHeight.into()),
    }
}
/// Fully validate the current chain state (roots, sums and, unless
/// fast_validation is set, rangeproofs and kernel signatures).
pub fn validate(&self, fast_validation: bool) -> Result<(), Error> {
    let header = self.store.head_header()?;
    // Nothing beyond genesis: trivially valid.
    if header.height == 0 {
        return Ok(());
    }
    let mut header_pmmr = self.header_pmmr.write();
    let mut txhashset = self.txhashset.write();
    txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
        self.rewind_and_apply_fork(&header, ext, batch)?;
        ext.extension
            .validate(&self.genesis, fast_validation, &NoStatus, &header)?;
        Ok(())
    })
}
/// Populate header.prev_root from the header MMR state as of the previous
/// header (used when building a new block header).
pub fn set_prev_root_only(&self, header: &mut BlockHeader) -> Result<(), Error> {
    let mut header_pmmr = self.header_pmmr.write();
    let prev_root =
        txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
            let prev_header = batch.get_previous_header(header)?;
            // Rewind to the parent so the root reflects headers up to it.
            self.rewind_and_apply_header_fork(&prev_header, ext, batch)?;
            ext.root()
        })?;
    header.prev_root = prev_root;
    Ok(())
}
/// Populate the block header's MMR roots and sizes by applying the block on
/// top of its parent in a readonly extension (mining/block-building path).
pub fn set_txhashset_roots(&self, b: &mut Block) -> Result<(), Error> {
    let mut header_pmmr = self.header_pmmr.write();
    let mut txhashset = self.txhashset.write();
    let (prev_root, roots, sizes) =
        txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
            let previous_header = batch.get_previous_header(&b.header)?;
            // Rewind to the parent, capture the header root, then apply the block.
            self.rewind_and_apply_fork(&previous_header, ext, batch)?;
            let extension = &mut ext.extension;
            let header_extension = &mut ext.header_extension;
            let prev_root = header_extension.root()?;
            extension.apply_block(b, header_extension, batch)?;
            Ok((prev_root, extension.roots()?, extension.sizes()))
        })?;
    {
        // Sizes come back as (output, rangeproof, kernel); rangeproof size is
        // implied by the output size and not stored in the header.
        let (output_mmr_size, _, kernel_mmr_size) = sizes;
        b.header.output_mmr_size = output_mmr_size;
        b.header.kernel_mmr_size = kernel_mmr_size;
    }
    b.header.prev_root = prev_root;
    // Output root commits to both the output MMR and the spent bitmap.
    b.header.output_root = roots.output_root(&b.header);
    b.header.range_proof_root = roots.rproof_root;
    b.header.kernel_root = roots.kernel_root;
    Ok(())
}
/// Build a Merkle proof that the given output was unspent as of the given
/// header, rewinding the txhashset to that header first.
pub fn get_merkle_proof<T: AsRef<OutputIdentifier>>(
    &self,
    out_id: T,
    header: &BlockHeader,
) -> Result<MerkleProof, Error> {
    let mut header_pmmr = self.header_pmmr.write();
    let mut txhashset = self.txhashset.write();
    let merkle_proof =
        txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
            self.rewind_and_apply_fork(&header, ext, batch)?;
            ext.extension.merkle_proof(out_id, batch)
        })?;
    Ok(merkle_proof)
}
/// Merkle proof for an output commitment at its current position in the
/// output MMR (no rewind).
pub fn get_merkle_proof_for_pos(&self, commit: Commitment) -> Result<MerkleProof, Error> {
    let mut ths = self.txhashset.write();
    ths.merkle_proof(commit)
}
/// Rewind the extension to the fork point and re-apply blocks up to
/// `header`, rejecting any header on the denylist. Returns the fork point.
fn rewind_and_apply_fork(
    &self,
    header: &BlockHeader,
    ext: &mut ExtensionPair,
    batch: &Batch,
) -> Result<BlockHeader, Error> {
    // Snapshot the denylist so the validation closure holds no lock.
    let denylist = self.denylist.read().clone();
    pipe::rewind_and_apply_fork(header, ext, batch, &|header| {
        pipe::validate_header_denylist(header, &denylist)
    })
}
/// Header-MMR counterpart of rewind_and_apply_fork: rewind the header
/// extension and re-apply headers up to `header`, enforcing the denylist.
fn rewind_and_apply_header_fork(
    &self,
    header: &BlockHeader,
    ext: &mut HeaderExtension,
    batch: &Batch,
) -> Result<(), Error> {
    let denylist = self.denylist.read().clone();
    pipe::rewind_and_apply_header_fork(header, ext, batch, &|header| {
        pipe::validate_header_denylist(header, &denylist)
    })
}
/// Produce a zipped snapshot of the txhashset as of block `h` for a peer
/// performing state sync; returns (output size, kernel size, zip file).
pub fn txhashset_read(&self, h: Hash) -> Result<(u64, u64, File), Error> {
    let header = self.get_block_header(&h)?;
    let mut header_pmmr = self.header_pmmr.write();
    let mut txhashset = self.txhashset.write();
    txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
        // Rewind to the requested header, snapshot files, then zip them up.
        self.rewind_and_apply_fork(&header, ext, batch)?;
        ext.extension.snapshot(batch)?;
        txhashset::zip_read(self.db_root.clone(), &header)
            .map(|file| (header.output_mmr_size, header.kernel_mmr_size, file))
    })
}
/// Return a PIBD segmenter for the current archive header, reusing the
/// cached one when its header still matches and rebuilding otherwise.
pub fn segmenter(&self) -> Result<Segmenter, Error> {
    // `let ref x = …?` replaced with an owned binding; the deprecated `ref`
    // pattern bound a reference to a temporary for no benefit.
    let archive_header = self.txhashset_archive_header()?;
    if let Some(x) = self.pibd_segmenter.read().as_ref() {
        if x.header() == &archive_header {
            return Ok(x.clone());
        }
    }
    // Cache miss (or stale): build a fresh segmenter and cache it.
    let segmenter = self.init_segmenter(&archive_header)?;
    let mut cache = self.pibd_segmenter.write();
    *cache = Some(segmenter.clone());
    Ok(segmenter)
}
/// Build a new segmenter for the given header: rewind the txhashset to that
/// header and capture the bitmap accumulator snapshot it needs.
fn init_segmenter(&self, header: &BlockHeader) -> Result<Segmenter, Error> {
    let now = Instant::now();
    debug!(
        "init_segmenter: initializing new segmenter for {} at {}",
        header.hash(),
        header.height
    );
    let mut header_pmmr = self.header_pmmr.write();
    let mut txhashset = self.txhashset.write();
    let bitmap_snapshot =
        txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
            ext.extension.rewind(header, batch)?;
            Ok(ext.extension.bitmap_accumulator())
        })?;
    debug!("init_segmenter: done, took {}ms", now.elapsed().as_millis());
    Ok(Segmenter::new(
        self.txhashset(),
        Arc::new(bitmap_snapshot),
        header.clone(),
    ))
}
/// Header at which txhashset archives are produced: head height minus the
/// sync threshold, rounded down to the archive interval.
pub fn txhashset_archive_header(&self) -> Result<BlockHeader, Error> {
    let sync_threshold = global::state_sync_threshold() as u64;
    let body_head = self.head()?;
    let archive_interval = global::txhashset_archive_interval();
    let candidate = body_head.height.saturating_sub(sync_threshold);
    // Round down to a multiple of the archive interval.
    let txhashset_height = candidate.saturating_sub(candidate % archive_interval);
    debug!(
        "txhashset_archive_header: body_head - {}, {}, txhashset height - {}",
        body_head.last_block_h, body_head.height, txhashset_height,
    );
    self.get_header_by_height(txhashset_height)
}
/// Rewind the kernel MMR through every header back to genesis, checking the
/// kernel root matches at each height (readonly view).
///
/// Fix: the source contained `¤t` — an HTML-entity mangling of
/// `&current` (`&curren;` → `¤`) — which does not compile.
fn validate_kernel_history(
    &self,
    header: &BlockHeader,
    txhashset: &txhashset::TxHashSet,
) -> Result<(), Error> {
    debug!("validate_kernel_history: rewinding and validating kernel history (readonly)");
    let mut count = 0;
    let mut current = header.clone();
    txhashset::rewindable_kernel_view(&txhashset, |view, batch| {
        while current.height > 0 {
            view.rewind(&current)?;
            view.validate_root()?;
            current = batch.get_previous_header(&current)?;
            count += 1;
        }
        Ok(())
    })?;
    debug!(
        "validate_kernel_history: validated kernel root on {} headers",
        count,
    );
    Ok(())
}
pub fn fork_point(&self) -> Result<BlockHeader, Error> {
let body_head = self.head()?;
let mut current = self.get_block_header(&body_head.hash())?;
while !self.is_on_current_chain(¤t, body_head).is_ok() {
current = self.get_previous_header(¤t)?;
}
Ok(current)
}
/// Decide whether a txhashset download is required: never in archive mode,
/// otherwise only when the fork point has fallen behind the horizon.
pub fn check_txhashset_needed(&self, fork_point: &BlockHeader) -> Result<bool, Error> {
    if self.archive_mode() {
        debug!("check_txhashset_needed: we are running with archive_mode=true, not needed");
        return Ok(false);
    }
    let header_head = self.header_head()?;
    let horizon = global::cut_through_horizon() as u64;
    let cutoff = header_head.height.saturating_sub(horizon);
    Ok(fork_point.height < cutoff)
}
/// Remove any leftover txhashset files from the sandbox (tmp) directory.
pub fn clean_txhashset_sandbox(&self) {
    let sandbox = self.get_tmp_dir();
    txhashset::clean_txhashset_folder(&sandbox);
}
/// Path of the tmp directory: a sibling "tmp" folder next to db_root.
pub fn get_tmp_dir(&self) -> PathBuf {
    PathBuf::from(self.db_root.clone())
        .parent()
        .expect("fail to get parent of db_root dir")
        .join("tmp")
}
/// Full path for a scratch file in the tmp dir, creating the dir if missing
/// and removing any stale file of the same name (both best-effort).
pub fn get_tmpfile_pathname(&self, tmpfile_name: String) -> PathBuf {
    let mut path = self.get_tmp_dir();
    if !path.exists() {
        if let Err(e) = fs::create_dir(&path) {
            warn!("fail to create tmp folder on {:?}. err: {}", path, e);
        }
    }
    path.push(tmpfile_name);
    if path.exists() {
        if let Err(e) = fs::remove_file(&path) {
            warn!("fail to clean existing tmp file: {:?}. err: {}", path, e);
        }
    }
    path
}
/// Install a downloaded txhashset (state sync): unzip it into a sandbox,
/// validate it fully, then atomically swap it in as the live txhashset.
/// Returns Ok(true) when the caller should retry later (header unknown),
/// Ok(false) on success.
pub fn txhashset_write(
    &self,
    h: Hash,
    txhashset_data: File,
    status: &dyn TxHashsetWriteStatus,
) -> Result<bool, Error> {
    status.on_setup();
    let fork_point = self.fork_point()?;
    if !self.check_txhashset_needed(&fork_point)? {
        warn!("txhashset_write: txhashset received but it's not needed! ignored.");
        return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into());
    }
    let header = match self.get_block_header(&h) {
        Ok(header) => header,
        Err(_) => {
            // Header not synced yet; signal the caller to retry.
            warn!("txhashset_write: cannot find block header");
            return Ok(true);
        }
    };
    // Unpack into a sandbox directory so the live txhashset is untouched
    // until validation succeeds.
    let sandbox_dir = self.get_tmp_dir();
    txhashset::clean_txhashset_folder(&sandbox_dir);
    txhashset::zip_write(sandbox_dir.clone(), txhashset_data.try_clone()?, &header)?;
    let mut txhashset = txhashset::TxHashSet::open(
        sandbox_dir
            .to_str()
            .expect("invalid sandbox folder")
            .to_owned(),
        self.store.clone(),
        Some(&header),
    )?;
    {
        // First pass (readonly): kernel history and NRD kernel pos index.
        self.validate_kernel_history(&header, &txhashset)?;
        let header_pmmr = self.header_pmmr.read();
        let batch = self.store.batch()?;
        txhashset.verify_kernel_pos_index(&self.genesis, &header_pmmr, &batch)?;
    }
    debug!("txhashset_write: rewinding a 2nd time (writeable)");
    let mut header_pmmr = self.header_pmmr.write();
    let mut batch = self.store.batch()?;
    txhashset::extending(
        &mut header_pmmr,
        &mut txhashset,
        &mut batch,
        |ext, batch| {
            let extension = &mut ext.extension;
            extension.rewind(&header, batch)?;
            // Full (non-fast) validation: roots, sums, rangeproofs, kernels.
            let (utxo_sum, kernel_sum) =
                extension.validate(&self.genesis, false, status, &header)?;
            batch.save_block_sums(
                &header.hash(),
                BlockSums {
                    utxo_sum,
                    kernel_sum,
                },
            )?;
            Ok(())
        },
    )?;
    debug!("txhashset_write: finished validating and rebuilding");
    status.on_save();
    {
        // Body head and tail both move to the snapshot header.
        let tip = Tip::from_header(&header);
        batch.save_body_head(&tip)?;
        batch.save_body_tail(&tip)?;
    }
    txhashset.init_output_pos_index(&header_pmmr, &batch)?;
    txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?;
    batch.commit()?;
    debug!("txhashset_write: finished committing the batch (head etc.)");
    {
        // Swap the sandbox txhashset into place. Backend files must be
        // released on both sides before the directory rename.
        let mut txhashset_ref = self.txhashset.write();
        txhashset_ref.release_backend_files();
        txhashset.release_backend_files();
        txhashset::txhashset_replace(sandbox_dir, PathBuf::from(self.db_root.clone()))?;
        txhashset = txhashset::TxHashSet::open(
            self.db_root.clone(),
            self.store.clone(),
            Some(&header),
        )?;
        *txhashset_ref = txhashset;
    }
    debug!("txhashset_write: replaced our txhashset with the new one");
    status.on_done();
    Ok(false)
}
/// Delete full blocks older than the horizon and advance the body tail.
/// No-op in archive mode.
fn remove_historical_blocks(
    &self,
    header_pmmr: &txhashset::PMMRHandle<BlockHeader>,
    batch: &store::Batch<'_>,
) -> Result<(), Error> {
    if self.archive_mode() {
        return Ok(());
    }
    let horizon = global::cut_through_horizon() as u64;
    let head = batch.head()?;
    // Missing tail (fresh db) falls back to genesis for logging purposes.
    let tail = match batch.tail() {
        Ok(tail) => tail,
        Err(_) => Tip::from_header(&self.genesis),
    };
    let cutoff = head.height.saturating_sub(horizon);
    debug!(
        "remove_historical_blocks: head height: {}, tail height: {}, horizon: {}, cutoff: {}",
        head.height, tail.height, horizon, cutoff,
    );
    // cutoff == 0 means the chain is shorter than the horizon; nothing to do.
    if cutoff == 0 {
        return Ok(());
    }
    let mut count = 0;
    // New tail: the block exactly `horizon` below head on the current chain
    // (shadowing the fallback tail above).
    let tail_hash = header_pmmr.get_header_hash_by_height(head.height - horizon)?;
    let tail = batch.get_block_header(&tail_hash)?;
    for block in batch.blocks_iter()? {
        if block.header.height < tail.height {
            // Best-effort delete; a missing block is not fatal.
            let _ = batch.delete_block(&block.hash());
            count += 1;
        }
    }
    batch.save_body_tail(&Tip::from_header(&tail))?;
    debug!(
        "remove_historical_blocks: removed {} blocks. tail height: {}",
        count, tail.height
    );
    Ok(())
}
/// Compact the txhashset (prune spent data beyond the horizon), remove
/// historical blocks (unless archiving) and rebuild position indexes.
/// Skipped entirely if the tail is within `horizon + 60` of the head.
pub fn compact(&self) -> Result<(), Error> {
    if let (Ok(tail), Ok(head)) = (self.tail(), self.head()) {
        let horizon = global::cut_through_horizon() as u64;
        // 60-block slack so we don't compact on every block.
        let threshold = horizon.saturating_add(60);
        let next_compact = tail.height.saturating_add(threshold);
        if next_compact > head.height {
            debug!(
                "compact: skipping startup compaction (next at {})",
                next_compact
            );
            return Ok(());
        }
    }
    let header_pmmr = self.header_pmmr.read();
    let mut txhashset = self.txhashset.write();
    let batch = self.store.batch()?;
    {
        // Compact everything strictly below the horizon header.
        let head_header = batch.head_header()?;
        let current_height = head_header.height;
        let horizon_height =
            current_height.saturating_sub(global::cut_through_horizon().into());
        let horizon_hash = header_pmmr.get_header_hash_by_height(horizon_height)?;
        let horizon_header = batch.get_block_header(&horizon_hash)?;
        txhashset.compact(&horizon_header, &batch)?;
    }
    if !self.archive_mode() {
        self.remove_historical_blocks(&header_pmmr, &batch)?;
    }
    // Rebuild indexes after pruning so they reflect the compacted state.
    txhashset.init_output_pos_index(&header_pmmr, &batch)?;
    txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?;
    batch.commit()?;
    Ok(())
}
/// Last `distance` leaves of the output MMR (hash + identifier).
pub fn get_last_n_output(&self, distance: u64) -> Vec<(Hash, OutputIdentifier)> {
    let txhashset = self.txhashset.read();
    txhashset.last_n_output(distance)
}
/// Last `distance` leaves of the rangeproof MMR.
pub fn get_last_n_rangeproof(&self, distance: u64) -> Vec<(Hash, RangeProof)> {
    let txhashset = self.txhashset.read();
    txhashset.last_n_rangeproof(distance)
}
/// Last `distance` leaves of the kernel MMR.
pub fn get_last_n_kernel(&self, distance: u64) -> Vec<(Hash, TxKernel)> {
    let txhashset = self.txhashset.read();
    txhashset.last_n_kernel(distance)
}
/// Position of an output commitment in the output MMR.
pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
    let pos = self.txhashset.read().get_output_pos(commit)?;
    Ok(pos)
}
/// Page through unspent outputs by MMR position, pairing each output with
/// its rangeproof. Returns (highest index returned, last index, outputs).
pub fn unspent_outputs_by_pmmr_index(
    &self,
    start_index: u64,
    max_count: u64,
    max_pmmr_index: Option<u64>,
) -> Result<(u64, u64, Vec<Output>), Error> {
    let txhashset = self.txhashset.read();
    let last_index = match max_pmmr_index {
        Some(i) => i,
        None => txhashset.highest_output_insertion_index(),
    };
    let outputs = txhashset.outputs_by_pmmr_index(start_index, max_count, max_pmmr_index);
    let rangeproofs =
        txhashset.rangeproofs_by_pmmr_index(start_index, max_count, max_pmmr_index);
    // Output and rangeproof MMRs are inserted in lockstep, so the two pages
    // must line up exactly; anything else indicates corruption.
    if outputs.0 != rangeproofs.0 || outputs.1.len() != rangeproofs.1.len() {
        return Err(ErrorKind::TxHashSetErr(String::from(
            "Output and rangeproof sets don't match",
        ))
        .into());
    }
    let mut output_vec: Vec<Output> = vec![];
    for (ref x, &y) in outputs.1.iter().zip(rangeproofs.1.iter()) {
        output_vec.push(Output::new(x.features, x.commitment(), y));
    }
    Ok((outputs.0, last_index, output_vec))
}
/// Map a block height range (inclusive) to the corresponding range of
/// output MMR insertion indices.
pub fn block_height_range_to_pmmr_indices(
    &self,
    start_block_height: u64,
    end_block_height: Option<u64>,
) -> Result<(u64, u64), Error> {
    let end = match end_block_height {
        Some(h) => h,
        None => self.head_header()?.height,
    };
    // The start block's outputs begin right after the previous block's
    // output_mmr_size.
    let prev_header = self.get_header_by_height(start_block_height.saturating_sub(1))?;
    let end_header = self.get_header_by_height(end)?;
    Ok((prev_header.output_mmr_size + 1, end_header.output_mmr_size))
}
/// Number of orphan blocks currently pooled.
pub fn orphans_len(&self) -> usize {
    let pool = &self.orphans;
    pool.len()
}
/// Current body head tip, wrapping store errors with context.
pub fn head(&self) -> Result<Tip, Error> {
    match self.store.head() {
        Ok(tip) => Ok(tip),
        Err(e) => Err(ErrorKind::StoreErr(e, "chain head".to_owned()).into()),
    }
}
/// Current body tail tip (oldest retained full block).
pub fn tail(&self) -> Result<Tip, Error> {
    match self.store.tail() {
        Ok(tip) => Ok(tip),
        Err(e) => Err(ErrorKind::StoreErr(e, "chain tail".to_owned()).into()),
    }
}
/// Current header chain head tip.
pub fn header_head(&self) -> Result<Tip, Error> {
    match self.store.header_head() {
        Ok(tip) => Ok(tip),
        Err(e) => Err(ErrorKind::StoreErr(e, "header head".to_owned()).into()),
    }
}
/// Full block header at the current body head.
pub fn head_header(&self) -> Result<BlockHeader, Error> {
    match self.store.head_header() {
        Ok(header) => Ok(header),
        Err(e) => Err(ErrorKind::StoreErr(e, "chain head header".to_owned()).into()),
    }
}
/// Fetch a full block by hash from the store.
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
    match self.store.get_block(h) {
        Ok(b) => Ok(b),
        Err(e) => Err(ErrorKind::StoreErr(e, "chain get block".to_owned()).into()),
    }
}
/// Fetch a block header by hash from the store.
pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
    match self.store.get_block_header(h) {
        Ok(header) => Ok(header),
        Err(e) => Err(ErrorKind::StoreErr(e, "chain get header".to_owned()).into()),
    }
}
/// Fetch the parent header of the given header.
pub fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
    match self.store.get_previous_header(header) {
        Ok(prev) => Ok(prev),
        Err(e) => Err(ErrorKind::StoreErr(e, "chain get previous header".to_owned()).into()),
    }
}
/// Fetch the (utxo, kernel) sums stored for a block hash.
pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
    match self.store.get_block_sums(h) {
        Ok(sums) => Ok(sums),
        Err(e) => Err(ErrorKind::StoreErr(e, "chain get block_sums".to_owned()).into()),
    }
}
/// Header on the current chain at the given height.
pub fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, Error> {
    self.get_header_hash_by_height(height)
        .and_then(|hash| self.get_block_header(&hash))
}
/// Header hash on the current chain at the given height (from the header MMR).
fn get_header_hash_by_height(&self, height: u64) -> Result<Hash, Error> {
    let pmmr = self.header_pmmr.read();
    pmmr.get_header_hash_by_height(height)
}
/// Header of the block that created the given (unspent) output commitment.
pub fn get_header_for_output(&self, commit: Commitment) -> Result<BlockHeader, Error> {
    let header_pmmr = self.header_pmmr.read();
    let txhashset = self.txhashset.read();
    let pos = match txhashset.get_unspent(commit)? {
        Some((_, pos)) => pos,
        None => return Err(ErrorKind::OutputNotFound.into()),
    };
    let hash = header_pmmr.get_header_hash_by_height(pos.height)?;
    self.get_block_header(&hash)
}
/// Locate a kernel by excess commitment, optionally bounded by a height
/// range. Returns (kernel, block height, kernel MMR index) if found.
pub fn get_kernel_height(
    &self,
    excess: &Commitment,
    min_height: Option<u64>,
    max_height: Option<u64>,
) -> Result<Option<(TxKernel, u64, u64)>, Error> {
    let head = self.head()?;
    // Inverted range: nothing to search.
    if let (Some(min), Some(max)) = (min_height, max_height) {
        if min > max {
            return Ok(None);
        }
    }
    // Convert the height bounds into kernel MMR index bounds.
    let min_index = match min_height {
        // Height 0 (genesis) means no lower bound.
        Some(0) => None,
        Some(h) => {
            if h > head.height {
                return Ok(None);
            }
            // First kernel of block h sits right after the parent's mmr size.
            let header = self.get_header_by_height(h)?;
            let prev_header = self.get_previous_header(&header)?;
            Some(prev_header.kernel_mmr_size + 1)
        }
        None => None,
    };
    let max_index = match max_height {
        Some(h) => {
            if h > head.height {
                // Beyond head: treat as unbounded above.
                None
            } else {
                let header = self.get_header_by_height(h)?;
                Some(header.kernel_mmr_size)
            }
        }
        None => None,
    };
    let (kernel, mmr_index) = match self
        .txhashset
        .read()
        .find_kernel(&excess, min_index, max_index)
    {
        Some(k) => k,
        None => return Ok(None),
    };
    // Map the kernel's MMR index back to the block that contains it.
    let header = self.get_header_for_kernel_index(mmr_index, min_height, max_height)?;
    Ok(Some((kernel, header.height, mmr_index)))
}
/// Binary-search the header chain for the block whose kernel MMR range
/// contains the given kernel index.
pub fn get_header_for_kernel_index(
    &self,
    kernel_mmr_index: u64,
    min_height: Option<u64>,
    max_height: Option<u64>,
) -> Result<BlockHeader, Error> {
    let header_pmmr = self.header_pmmr.read();
    let mut min = min_height.unwrap_or(0).saturating_sub(1);
    let mut max = match max_height {
        Some(h) => h,
        None => self.head()?.height,
    };
    loop {
        // Midpoint biased upward so the loop always makes progress.
        let search_height = max - (max - min) / 2;
        let hash = header_pmmr.get_header_hash_by_height(search_height)?;
        let h = self.get_block_header(&hash)?;
        if search_height == 0 {
            return Ok(h);
        }
        let hash_prev = header_pmmr.get_header_hash_by_height(search_height - 1)?;
        let h_prev = self.get_block_header(&hash_prev)?;
        if kernel_mmr_index > h.kernel_mmr_size {
            // Index lies above this block's kernels: search higher.
            min = search_height;
        } else if kernel_mmr_index < h_prev.kernel_mmr_size {
            // Index lies below the previous block's range: search lower.
            max = search_height;
        } else {
            // Index falls in (h_prev.kernel_mmr_size, h.kernel_mmr_size] —
            // except an exact match on the parent's size belongs to h_prev.
            if kernel_mmr_index == h_prev.kernel_mmr_size {
                return Ok(h_prev);
            }
            return Ok(h);
        }
    }
}
fn is_on_current_chain<T: Into<Tip>>(&self, x: T, head: Tip) -> Result<(), Error> {
let x: Tip = x.into();
if x.height > head.height {
return Err(ErrorKind::Other("not on current chain".to_string()).into());
}
if x.hash() == self.get_header_hash_by_height(x.height)? {
Ok(())
} else {
Err(ErrorKind::Other("not on current chain".to_string()).into())
}
}
/// Header hashes at the given heights along the chain ending at sync_head
/// (used to build a sync locator); unknown heights are silently skipped.
pub fn get_locator_hashes(&self, sync_head: Tip, heights: &[u64]) -> Result<Vec<Hash>, Error> {
    let mut header_pmmr = self.header_pmmr.write();
    txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
        // Rewind the header extension onto the sync_head fork first.
        let header = batch.get_block_header(&sync_head.hash())?;
        self.rewind_and_apply_header_fork(&header, ext, batch)?;
        let hashes = heights
            .iter()
            .filter_map(|h| ext.get_header_hash_by_height(*h))
            .collect();
        Ok(hashes)
    })
}
/// Iterator over difficulty data walking back from the current head.
pub fn difficulty_iter(&self) -> Result<store::DifficultyIter<'_>, Error> {
    let head = self.head()?;
    Ok(store::DifficultyIter::from(
        head.last_block_h,
        self.store.clone(),
    ))
}
/// True if a full block with this hash exists in the store.
pub fn block_exists(&self, h: Hash) -> Result<bool, Error> {
    match self.store.block_exists(&h) {
        Ok(exists) => Ok(exists),
        Err(e) => Err(ErrorKind::StoreErr(e, "chain block exists".to_owned()).into()),
    }
}
}
/// Initialize (or repair) head state at startup: ensure genesis is saved,
/// sync the header MMR with the stored header head, then validate the body
/// head — walking back and deleting blocks whose state fails to rebuild.
fn setup_head(
    genesis: &Block,
    store: &store::ChainStore,
    header_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
    txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
    let mut batch = store.batch()?;
    {
        // Guarantee the genesis header exists in both the db and header MMR.
        if batch.get_block_header(&genesis.hash()).is_err() {
            batch.save_block_header(&genesis.header)?;
        }
        if header_pmmr.last_pos == 0 {
            txhashset::header_extending(header_pmmr, &mut batch, |ext, _| {
                ext.apply_header(&genesis.header)
            })?;
        }
    }
    if let Ok(head) = batch.header_head() {
        // Rewind the header MMR to agree with the persisted header head.
        header_pmmr.init_head(&head)?;
        txhashset::header_extending(header_pmmr, &mut batch, |ext, batch| {
            let header = batch.get_block_header(&head.hash())?;
            ext.rewind(&header)
        })?;
    } else {
        // No stored header head: derive it from the MMR's current head.
        let hash = header_pmmr.head_hash()?;
        let header = batch.get_block_header(&hash)?;
        batch.save_header_head(&Tip::from_header(&header))?;
    }
    let head_res = batch.head();
    let mut head: Tip;
    match head_res {
        Ok(h) => {
            head = h;
            // Walk back from the stored body head until we find a block
            // whose txhashset state validates, deleting broken tips.
            loop {
                let header = batch.get_block_header(&head.last_block_h)?;
                let res = txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
                    pipe::rewind_and_apply_fork(&header, ext, batch, &|_| Ok(()))?;
                    let extension = &mut ext.extension;
                    extension.validate_roots(&header)?;
                    // Rebuild missing block sums (e.g. after a crash).
                    if header.height > 0 && batch.get_block_sums(&header.hash()).is_err() {
                        debug!(
                            "init: building (missing) block sums for {} @ {}",
                            header.height,
                            header.hash()
                        );
                        let (utxo_sum, kernel_sum) =
                            extension.validate_kernel_sums(&genesis.header, &header)?;
                        batch.save_block_sums(
                            &header.hash(),
                            BlockSums {
                                utxo_sum,
                                kernel_sum,
                            },
                        )?;
                    }
                    debug!(
                        "init: rewinding and validating before we start... {} at {}",
                        header.hash(),
                        header.height,
                    );
                    Ok(())
                });
                if res.is_ok() {
                    break;
                } else {
                    // This tip's state is bad: rewind to its parent, delete
                    // the block and retry from there.
                    let prev_header = batch.get_block_header(&head.prev_block_h)?;
                    txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
                        pipe::rewind_and_apply_fork(&prev_header, ext, batch, &|_| Ok(()))
                    })?;
                    {
                        let _ = batch.delete_block(&header.hash());
                        head = Tip::from_header(&prev_header);
                        batch.save_body_head(&head)?;
                    }
                }
            }
        }
        Err(NotFoundErr(_)) => {
            // Fresh database: persist genesis as the initial body head.
            let mut sums = BlockSums::default();
            batch.save_block(&genesis)?;
            batch.save_spent_index(&genesis.hash(), &vec![])?;
            batch.save_body_head(&Tip::from_header(&genesis.header))?;
            // A genesis carrying kernels (e.g. testing) contributes to sums.
            if !genesis.kernels().is_empty() {
                let (utxo_sum, kernel_sum) = (sums, genesis as &dyn Committed).verify_kernel_sums(
                    genesis.header.overage(),
                    genesis.header.total_kernel_offset(),
                )?;
                sums = BlockSums {
                    utxo_sum,
                    kernel_sum,
                };
            }
            txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
                ext.extension
                    .apply_block(&genesis, ext.header_extension, batch)
            })?;
            batch.save_block_sums(&genesis.hash(), sums)?;
            info!("init: saved genesis: {:?}", genesis.hash());
        }
        Err(e) => return Err(ErrorKind::StoreErr(e, "chain init load head".to_owned()).into()),
    };
    batch.commit()?;
    Ok(())
}