use crate::{
helpers::BlockLocators,
storage::{DataMap, Map, MapId, Storage},
};
use snarkvm::dpc::prelude::*;
use anyhow::{anyhow, Result};
use circular_queue::CircularQueue;
use itertools::Itertools;
use parking_lot::RwLock;
use rand::{CryptoRng, Rng};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, HashSet},
path::Path,
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
thread,
thread::JoinHandle,
};
use time::OffsetDateTime;
/// The maximum number of linear block locators (entries that carry a block header,
/// taken from the most recent blocks).
pub const MAXIMUM_LINEAR_BLOCK_LOCATORS: u32 = 64;
/// The maximum number of quadratic block locators (hash-only entries whose heights
/// step back by doubling distances).
pub const MAXIMUM_QUADRATIC_BLOCK_LOCATORS: u32 = 32;
/// The total maximum number of block locators: the linear and quadratic maxima added together.
pub const MAXIMUM_BLOCK_LOCATORS: u32 = MAXIMUM_LINEAR_BLOCK_LOCATORS.saturating_add(MAXIMUM_QUADRATIC_BLOCK_LOCATORS);
/// Metadata recorded for a transaction, describing the block it belongs to.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Metadata<N: Network> {
// Height of the block.
block_height: u32,
// Hash of the block.
block_hash: N::BlockHash,
// Timestamp of the block.
block_timestamp: i64,
// NOTE(review): presumably the position of the transaction within its block — confirm against the writer.
transaction_index: u16,
}
impl<N: Network> Metadata<N> {
/// Initializes a new `Metadata` from the given block height, block hash,
/// block timestamp, and transaction index. No validation is performed.
pub fn new(block_height: u32, block_hash: N::BlockHash, block_timestamp: i64, transaction_index: u16) -> Self {
Self {
block_height,
block_hash,
block_timestamp,
transaction_index,
}
}
}
/// The in-memory and on-disk state of the ledger.
#[derive(Debug)]
pub struct LedgerState<N: Network> {
// The current ledger tree; block hashes are added to it as blocks are appended.
ledger_tree: RwLock<LedgerTree<N>>,
// The latest block of the ledger.
latest_block: RwLock<Block<N>>,
// The (hash, header) pairs of the most recent blocks, bounded by `MAXIMUM_LINEAR_BLOCK_LOCATORS`.
latest_block_hashes_and_headers: RwLock<CircularQueue<(N::BlockHash, BlockHeader<N>)>>,
// The block locators computed for the latest block.
latest_block_locators: RwLock<BlockLocators<N>>,
// Map from a block's previous ledger root to that block's height.
ledger_roots: DataMap<N::LedgerRoot, u32>,
// The block (and transaction) storage maps.
blocks: BlockState<N>,
// (is read-only, last block height seen by the reader heartbeat, handle of the heartbeat thread if one was spawned).
read_only: (bool, Arc<AtomicU32>, RwLock<Option<Arc<JoinHandle<()>>>>),
// A read guard on this lock is held while storage maps are being modified; `shut_down` hands out the lock itself.
map_lock: Arc<RwLock<()>>,
}
impl<N: Network> LedgerState<N> {
/// Opens a writable instance of `LedgerState` from the given storage path,
/// validating the stored chain in increments of 10,000 blocks.
pub fn open_writer<S: Storage, P: AsRef<Path>>(path: P) -> Result<Self> {
Self::open_writer_with_increment::<S, P>(path, 10_000)
}
/// Opens a writable instance of `LedgerState` from the given storage path,
/// validating the stored chain in batches of `validation_increment` blocks.
///
/// The procedure is:
/// 1. Open storage and determine the latest block height, attempting an automatic
///    repair if the ledger roots map and the block heights map disagree.
/// 2. If storage is new, write the genesis block.
/// 3. On network 2, drop blocks past the V12 upgrade height that can no longer be loaded.
/// 4. Rebuild the ledger tree batch by batch, checking the stored ledger roots along the way.
/// 5. Load the latest block, regenerate the cached ledger state, and re-validate the ledger root.
///
/// # Errors
/// Returns an error if storage cannot be opened, if the stored state is inconsistent and
/// cannot be repaired, or if any validation check on the stored chain fails.
///
/// # Panics
/// Panics if the number of stored block headers does not equal the latest block height plus
/// one, or if the regenerated ledger tree root differs from the incrementally built one.
#[doc(hidden)]
pub fn open_writer_with_increment<S: Storage, P: AsRef<Path>>(path: P, validation_increment: u32) -> Result<Self> {
    // Open storage in writable mode.
    let context = N::NETWORK_ID;
    let is_read_only = false;
    let storage = S::open(path, context, is_read_only)?;

    // Initialize the ledger with placeholder in-memory state.
    let ledger = Self {
        ledger_tree: RwLock::new(LedgerTree::<N>::new()?),
        latest_block: RwLock::new(N::genesis_block().clone()),
        latest_block_hashes_and_headers: RwLock::new(CircularQueue::with_capacity(MAXIMUM_LINEAR_BLOCK_LOCATORS as usize)),
        latest_block_locators: Default::default(),
        ledger_roots: storage.open_map(MapId::LedgerRoots)?,
        blocks: BlockState::open(storage)?,
        read_only: (is_read_only, Arc::new(AtomicU32::new(0)), RwLock::new(None)),
        map_lock: Default::default(),
    };

    // Determine the latest block height, repairing mismatched maps where possible.
    let mut latest_block_height = match (ledger.ledger_roots.values().max(), ledger.blocks.block_heights.keys().max()) {
        (Some(latest_block_height_0), Some(latest_block_height_1)) => match latest_block_height_0 == latest_block_height_1 {
            true => latest_block_height_0,
            false => ledger.try_fixing_inconsistent_state()?,
        },
        (None, None) => 0u32,
        _ => return Err(anyhow!("Ledger storage state is inconsistent")),
    };

    // If this is new storage, initialize it with the genesis block.
    if latest_block_height == 0u32 && !ledger.blocks.contains_block_height(0u32)? {
        let genesis = N::genesis_block();

        // Acquire the map lock BEFORE the first storage write, so that the ledger root
        // and the block are written under the same guard (as in `open_reader` and
        // `add_next_block`). Writing the ledger root outside the guard could leave a
        // ledger root without a block, which the next open reports as inconsistent.
        let _map_lock = ledger.map_lock.read();

        ledger.ledger_roots.insert(&genesis.previous_ledger_root(), &genesis.height())?;
        ledger.blocks.add_block(genesis)?;
    }

    // Check that all canonical block headers exist in storage.
    let count = ledger.blocks.get_block_header_count()?;
    assert_eq!(count, latest_block_height.saturating_add(1));

    // Transition the ledger to V12 if the latest block can no longer be loaded.
    if N::NETWORK_ID == 2
        && latest_block_height > snarkvm::dpc::testnet2::V12_UPGRADE_BLOCK_HEIGHT
        && ledger.get_block(latest_block_height).is_err()
    {
        let revert_block_height = snarkvm::dpc::testnet2::V12_UPGRADE_BLOCK_HEIGHT.saturating_sub(1);
        warn!("Ledger is not V12-compliant, reverting to block {}", revert_block_height);
        warn!("{:?}", ledger.get_block(latest_block_height));
        latest_block_height = ledger.clear_incompatible_blocks(latest_block_height, revert_block_height)?;
        info!("Ledger successfully transitioned and is now V12-compliant");
    }

    // Iterate and append each block hash from genesis to tip to validate ledger state.
    let mut start_block_height = 0u32;
    while start_block_height <= latest_block_height {
        // Compute the (inclusive) end block height for this batch.
        let end_block_height = std::cmp::min(start_block_height.saturating_add(validation_increment), latest_block_height);

        // Retrieve the block hashes for this batch.
        let block_hashes = ledger.get_block_hashes(start_block_height, end_block_height)?;

        // Split into (last_block_hash, [start_block_hash, ..., penultimate_block_hash]).
        if let Some((last_block_hash, block_hashes_excluding_last)) = block_hashes.split_last() {
            // The batch may contain only one block.
            if !block_hashes_excluding_last.is_empty() {
                ledger.ledger_tree.write().add_all(block_hashes_excluding_last)?;
            }

            // Check 1 - the tree root (before the last hash is added) must equal the
            // previous ledger root stored for the last block of the batch.
            let ledger_root = ledger.get_previous_ledger_root(end_block_height)?;
            if ledger_root != ledger.ledger_tree.read().root() {
                return Err(anyhow!("Ledger has incorrect ledger tree state at block {}", end_block_height));
            }

            // Check 2 - the ledger roots map must map that root to the same block height.
            let candidate_height = match ledger.ledger_roots.get(&ledger_root)? {
                Some(candidate_height) => candidate_height,
                None => return Err(anyhow!("Ledger is missing ledger root for block {}", end_block_height)),
            };
            if end_block_height != candidate_height {
                return Err(anyhow!(
                    "Ledger expected block {}, found block {}",
                    end_block_height,
                    candidate_height
                ));
            }

            // Add the last block hash of the batch to the ledger tree.
            ledger.ledger_tree.write().add(last_block_hash)?;
        }

        // Log the progress.
        let progress = (end_block_height as f64 / latest_block_height as f64 * 100f64) as u8;
        debug!("Validating the ledger up to block {} ({}%)", end_block_height, progress);

        // Advance to the next batch.
        start_block_height = end_block_height.saturating_add(1);
    }

    // Defensive fallback: only taken if the loop above never ran.
    if start_block_height == 0u32 {
        ledger.ledger_tree.write().add(&N::genesis_block().hash())?;
    }

    // Update the latest ledger state.
    *ledger.latest_block.write() = ledger.get_block(latest_block_height)?;
    ledger.regenerate_latest_ledger_state()?;

    // Validate the ledger root one final time against a freshly rebuilt tree.
    let latest_ledger_root = ledger.ledger_tree.read().root();
    ledger.regenerate_ledger_tree()?;
    assert_eq!(ledger.ledger_tree.read().root(), latest_ledger_root);

    info!("Ledger successfully loaded at block {}", ledger.latest_block_height());
    Ok(ledger)
}
/// Opens a read-only instance of `LedgerState` from the given storage path.
///
/// Unlike `open_writer`, a mismatch between the ledger roots map and the block heights
/// map is reported as an error rather than repaired. After the latest state is loaded,
/// a heartbeat thread is spawned (see `initialize_reader_heartbeat`) and its handle is
/// stored in `read_only.2`.
pub fn open_reader<S: Storage, P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
// Open storage in read-only mode.
let context = N::NETWORK_ID;
let is_read_only = true;
let storage = S::open(path, context, is_read_only)?;
// Initialize the ledger with placeholder in-memory state.
let ledger = Arc::new(Self {
ledger_tree: RwLock::new(LedgerTree::<N>::new()?),
latest_block: RwLock::new(N::genesis_block().clone()),
latest_block_hashes_and_headers: RwLock::new(CircularQueue::with_capacity(MAXIMUM_LINEAR_BLOCK_LOCATORS as usize)),
latest_block_locators: Default::default(),
ledger_roots: storage.open_map(MapId::LedgerRoots)?,
blocks: BlockState::open(storage)?,
read_only: (is_read_only, Arc::new(AtomicU32::new(0)), RwLock::new(None)),
map_lock: Default::default(),
});
// Determine the latest block height; both maps must agree on it.
let latest_block_height = match (ledger.ledger_roots.values().max(), ledger.blocks.block_heights.keys().max()) {
(Some(latest_block_height_0), Some(latest_block_height_1)) => match latest_block_height_0 == latest_block_height_1 {
true => latest_block_height_0,
false => {
return Err(anyhow!(
"Ledger storage state is incorrect, use `LedgerState::open_writer` to attempt to automatically fix the problem"
));
}
},
(None, None) => 0u32,
_ => return Err(anyhow!("Ledger storage state is inconsistent")),
};
// If storage holds no genesis block, write it.
// NOTE(review): this inserts into maps opened with `is_read_only = true` — confirm the storage backend permits it.
if latest_block_height == 0u32 && !ledger.blocks.contains_block_height(0u32)? {
let genesis = N::genesis_block();
// Hold the map lock while both maps are written.
let _map_lock = ledger.map_lock.read();
ledger.ledger_roots.insert(&genesis.previous_ledger_root(), &genesis.height())?;
ledger.blocks.add_block(genesis)?;
}
// Load the latest block and rebuild the cached state and ledger tree.
*ledger.latest_block.write() = ledger.get_block(latest_block_height)?;
ledger.regenerate_latest_ledger_state()?;
ledger.regenerate_ledger_tree()?;
// Start the heartbeat thread that keeps this reader in sync with storage.
*ledger.read_only.2.write() = Some(Arc::new(ledger.initialize_reader_heartbeat(latest_block_height)?));
trace!("[Read-Only] Ledger successfully loaded at block {}", ledger.latest_block_height());
Ok(ledger)
}
/// Returns `true` if the ledger was opened in read-only mode.
pub fn is_read_only(&self) -> bool {
self.read_only.0
}
/// Returns a clone of the latest block.
pub fn latest_block(&self) -> Block<N> {
self.latest_block.read().clone()
}
/// Returns the height of the latest block.
pub fn latest_block_height(&self) -> u32 {
self.latest_block.read().height()
}
/// Returns the hash of the latest block.
pub fn latest_block_hash(&self) -> N::BlockHash {
self.latest_block.read().hash()
}
/// Returns the timestamp of the latest block.
pub fn latest_block_timestamp(&self) -> i64 {
self.latest_block.read().timestamp()
}
/// Returns the difficulty target of the latest block.
pub fn latest_block_difficulty_target(&self) -> u64 {
self.latest_block.read().difficulty_target()
}
/// Returns the cumulative weight of the latest block.
pub fn latest_cumulative_weight(&self) -> u128 {
self.latest_block.read().cumulative_weight()
}
/// Returns a clone of the header of the latest block.
pub fn latest_block_header(&self) -> BlockHeader<N> {
self.latest_block.read().header().clone()
}
/// Returns a clone of the transactions of the latest block.
pub fn latest_block_transactions(&self) -> Transactions<N> {
self.latest_block.read().transactions().clone()
}
/// Returns a clone of the cached block locators of the latest block.
pub fn latest_block_locators(&self) -> BlockLocators<N> {
self.latest_block_locators.read().clone()
}
/// Returns the root of the current ledger tree.
pub fn latest_ledger_root(&self) -> N::LedgerRoot {
self.ledger_tree.read().root()
}
/// Returns `true` if the given ledger root equals the latest ledger root
/// or is present in the ledger roots map.
pub fn contains_ledger_root(&self, ledger_root: &N::LedgerRoot) -> Result<bool> {
Ok(*ledger_root == self.latest_ledger_root() || self.ledger_roots.contains_key(ledger_root)?)
}
/// Returns `true` if the given block height exists in block storage.
pub fn contains_block_height(&self, block_height: u32) -> Result<bool> {
self.blocks.contains_block_height(block_height)
}
/// Returns `true` if the given block hash exists in block storage.
pub fn contains_block_hash(&self, block_hash: &N::BlockHash) -> Result<bool> {
self.blocks.contains_block_hash(block_hash)
}
/// Returns `true` if the given transaction ID exists in block storage.
pub fn contains_transaction(&self, transaction_id: &N::TransactionID) -> Result<bool> {
self.blocks.contains_transaction(transaction_id)
}
/// Returns `true` if the given serial number exists in block storage.
pub fn contains_serial_number(&self, serial_number: &N::SerialNumber) -> Result<bool> {
self.blocks.contains_serial_number(serial_number)
}
/// Returns `true` if the given commitment exists in block storage.
pub fn contains_commitment(&self, commitment: &N::Commitment) -> Result<bool> {
self.blocks.contains_commitment(commitment)
}
/// Returns the record ciphertext for the given commitment.
pub fn get_ciphertext(&self, commitment: &N::Commitment) -> Result<N::RecordCiphertext> {
self.blocks.get_ciphertext(commitment)
}
/// Returns the transition for the given transition ID.
pub fn get_transition(&self, transition_id: &N::TransitionID) -> Result<Transition<N>> {
self.blocks.get_transition(transition_id)
}
/// Returns the transaction for the given transaction ID.
pub fn get_transaction(&self, transaction_id: &N::TransactionID) -> Result<Transaction<N>> {
self.blocks.get_transaction(transaction_id)
}
/// Returns the metadata for the given transaction ID.
pub fn get_transaction_metadata(&self, transaction_id: &N::TransactionID) -> Result<Metadata<N>> {
self.blocks.get_transaction_metadata(transaction_id)
}
/// Returns the cumulative weight of the block at the given height.
pub fn get_cumulative_weight(&self, block_height: u32) -> Result<u128> {
self.blocks.get_cumulative_weight(block_height)
}
/// Returns the block height for the given block hash.
pub fn get_block_height(&self, block_hash: &N::BlockHash) -> Result<u32> {
self.blocks.get_block_height(block_hash)
}
/// Returns the block hash for the given block height.
pub fn get_block_hash(&self, block_height: u32) -> Result<N::BlockHash> {
self.blocks.get_block_hash(block_height)
}
/// Returns the block hashes for the inclusive range `start_block_height..=end_block_height`.
pub fn get_block_hashes(&self, start_block_height: u32, end_block_height: u32) -> Result<Vec<N::BlockHash>> {
self.blocks.get_block_hashes(start_block_height, end_block_height)
}
/// Returns the previous block hash for the given block height.
pub fn get_previous_block_hash(&self, block_height: u32) -> Result<N::BlockHash> {
self.blocks.get_previous_block_hash(block_height)
}
/// Returns the block header for the given block height.
pub fn get_block_header(&self, block_height: u32) -> Result<BlockHeader<N>> {
self.blocks.get_block_header(block_height)
}
/// Returns the block headers for the given start and end block heights.
pub fn get_block_headers(&self, start_block_height: u32, end_block_height: u32) -> Result<Vec<BlockHeader<N>>> {
self.blocks.get_block_headers(start_block_height, end_block_height)
}
/// Returns the transactions of the block at the given height.
pub fn get_block_transactions(&self, block_height: u32) -> Result<Transactions<N>> {
self.blocks.get_block_transactions(block_height)
}
/// Returns the block at the given height.
pub fn get_block(&self, block_height: u32) -> Result<Block<N>> {
self.blocks.get_block(block_height)
}
/// Returns the blocks for the given start and end block heights.
pub fn get_blocks(&self, start_block_height: u32, end_block_height: u32) -> Result<Vec<Block<N>>> {
self.blocks.get_blocks(start_block_height, end_block_height)
}
/// Returns the previous ledger root recorded for the block at the given height.
pub fn get_previous_ledger_root(&self, block_height: u32) -> Result<N::LedgerRoot> {
self.blocks.get_previous_ledger_root(block_height)
}
/// Returns the block locators for the given block height.
///
/// The result contains, keyed by block height:
/// - up to `MAXIMUM_LINEAR_BLOCK_LOCATORS` entries carrying both hash and header,
///   read from the in-memory queue of latest block hashes and headers;
/// - up to `MAXIMUM_QUADRATIC_BLOCK_LOCATORS` hash-only entries, starting below the
///   linear range and stepping back by 1, 2, 4, ... blocks;
/// - the genesis block hash at height 0 (without a header).
///
/// NOTE(review): the linear entries come from the cached queue rather than being looked
/// up by `block_height`, so this appears to assume `block_height` is the latest height
/// reflected in that queue — confirm against callers.
pub fn get_block_locators(&self, block_height: u32) -> Result<BlockLocators<N>> {
// The height from which the next locator is taken.
let mut block_locator_height = block_height;
// Number of linear locators (with headers) to include.
let num_block_headers = std::cmp::min(MAXIMUM_LINEAR_BLOCK_LOCATORS, block_locator_height);
// Collect the linear locators from the queue, skipping the genesis block.
let block_locator_headers = self
.latest_block_hashes_and_headers
.read()
.asc_iter()
.filter(|(_, header)| header.height() != 0) .take(num_block_headers as usize)
.cloned()
.map(|(hash, header)| (header.height(), (hash, Some(header))))
.collect::<Vec<_>>();
// Cannot underflow: `num_block_headers` is at most `block_locator_height`.
block_locator_height -= num_block_headers;
// If no heights remain below the linear range, return the linear locators plus genesis.
if block_locator_height == 0 {
let mut block_locators: BTreeMap<u32, (N::BlockHash, Option<BlockHeader<N>>)> = block_locator_headers.into_iter().collect();
block_locators.insert(0, (self.get_block_hash(0)?, None));
return BlockLocators::<N>::from(block_locators);
}
// Number of quadratic (hash-only) locators to include.
let num_block_hashes = std::cmp::min(MAXIMUM_QUADRATIC_BLOCK_LOCATORS, block_locator_height);
let mut block_locator_hashes = Vec::with_capacity(num_block_hashes as usize);
// The step between consecutive quadratic locators; doubles on every iteration.
let mut accumulator = 1;
while block_locator_height > 0 && block_locator_hashes.len() < num_block_hashes as usize {
block_locator_hashes.push((block_locator_height, (self.get_block_hash(block_locator_height)?, None)));
block_locator_height = block_locator_height.saturating_sub(accumulator);
accumulator *= 2;
}
// Merge linear and quadratic locators, then add the genesis locator.
let mut block_locators: BTreeMap<u32, (N::BlockHash, Option<BlockHeader<N>>)> =
block_locator_headers.into_iter().chain(block_locator_hashes).collect();
block_locators.insert(0, (self.get_block_hash(0)?, None));
BlockLocators::<N>::from(block_locators)
}
/// Checks that the given block locators are well-formed.
///
/// Returns `Ok(false)` when:
/// - the genesis locator (height 0) is missing, has the wrong hash, or carries a header;
/// - the linear locators (taken from the highest height downwards) are not at consecutive
///   decreasing heights, lack a header, or carry a header whose height differs from the key;
/// - the quadratic locators carry a header, or their heights do not step back by
///   doubling distances.
///
/// Returns `Ok(true)` otherwise.
pub fn check_block_locators(&self, block_locators: &BlockLocators<N>) -> Result<bool> {
    // Ensure the genesis block locator exists and is well-formed.
    let (expected_genesis_block_hash, expected_genesis_header) = match block_locators.get(&0) {
        Some((expected_genesis_block_hash, expected_genesis_header)) => (expected_genesis_block_hash, expected_genesis_header),
        None => return Ok(false),
    };
    if expected_genesis_block_hash != &N::genesis_block().hash() || expected_genesis_header.is_some() {
        return Ok(false);
    }

    // The genesis locator exists, so `len() >= 1` and the subtraction cannot underflow.
    let num_linear_block_headers = std::cmp::min(MAXIMUM_LINEAR_BLOCK_LOCATORS as usize, block_locators.len() - 1);
    let num_quadratic_block_headers = block_locators.len().saturating_sub(num_linear_block_headers + 1);

    // Check that the block headers are formatted correctly (linear).
    let mut last_block_height = match block_locators.keys().max() {
        Some(height) => *height,
        None => return Ok(false),
    };
    for (block_height, (_block_hash, block_header)) in block_locators.iter().rev().take(num_linear_block_headers) {
        // Check that the block height is decrementing by one.
        match last_block_height == *block_height {
            true => last_block_height = block_height.saturating_sub(1),
            false => return Ok(false),
        }

        // Check that the block header is present.
        let block_header = match block_header {
            Some(header) => header,
            None => return Ok(false),
        };

        // Check that the block height matches the height in the block header.
        if block_height != &block_header.height() {
            return Ok(false);
        }
    }

    // Check that the remaining block hashes are formatted correctly (power of two).
    if block_locators.len() > MAXIMUM_LINEAR_BLOCK_LOCATORS as usize {
        let mut previous_block_height = u32::MAX;
        let mut accumulator = 1;

        // With exactly `MAXIMUM_LINEAR_BLOCK_LOCATORS + 1` locators there are zero quadratic
        // entries; a plain `- 1` would underflow `usize` and panic in debug builds.
        // `saturating_sub` yields an empty iteration, matching the release-mode outcome.
        for (block_height, (_block_hash, block_header)) in block_locators
            .iter()
            .rev()
            .skip(num_linear_block_headers + 1)
            .take(num_quadratic_block_headers.saturating_sub(1))
        {
            // Check that the block heights step back by a power of two.
            if previous_block_height != u32::MAX && previous_block_height.saturating_sub(accumulator) != *block_height {
                return Ok(false);
            }

            // Check that no block header is attached to a quadratic locator.
            if block_header.is_some() {
                return Ok(false);
            }

            previous_block_height = *block_height;
            accumulator *= 2;
        }
    }

    Ok(true)
}
/// Returns a block template built on top of the latest block.
///
/// The template includes the given `transactions` (minus any whose serial numbers or
/// commitments already exist in the ledger) followed by a coinbase transaction paying
/// the block reward plus transaction fees to `recipient`.
///
/// # Errors
/// Returns an error if the accumulated transaction fees are negative, if a required
/// block header cannot be loaded, or if the coinbase transaction or transaction list
/// cannot be constructed.
pub fn get_block_template<R: Rng + CryptoRng>(
&self,
recipient: Address<N>,
is_public: bool,
transactions: &[Transaction<N>],
rng: &mut R,
) -> Result<BlockTemplate<N>> {
// Fetch the latest state of the ledger.
let latest_block = self.latest_block();
let previous_ledger_root = self.latest_ledger_root();
// Prepare the new block attributes.
let previous_block_hash = latest_block.hash();
let block_height = latest_block.height().saturating_add(1);
// The timestamp is the current time, but always strictly after the latest block's timestamp.
let block_timestamp = std::cmp::max(
OffsetDateTime::now_utc().unix_timestamp(),
latest_block.timestamp().saturating_add(1),
);
// Compute the difficulty target. The anchor header depends on the network and,
// on network 2, on whether the new height is past the V12 upgrade height.
let difficulty_target = if N::NETWORK_ID == 2 && block_height <= snarkvm::dpc::testnet2::V12_UPGRADE_BLOCK_HEIGHT {
Blocks::<N>::compute_difficulty_target(latest_block.header(), block_timestamp, block_height)
} else if N::NETWORK_ID == 2 {
let anchor_block_header = self.get_block_header(snarkvm::dpc::testnet2::V12_UPGRADE_BLOCK_HEIGHT)?;
Blocks::<N>::compute_difficulty_target(&anchor_block_header, block_timestamp, block_height)
} else {
Blocks::<N>::compute_difficulty_target(N::genesis_block().header(), block_timestamp, block_height)
};
// Compute the cumulative weight: previous weight plus `u64::MAX / difficulty_target`.
let cumulative_weight = latest_block
.cumulative_weight()
.saturating_add((u64::MAX / difficulty_target) as u128);
// Start from the block reward; fees of the accepted transactions are added below.
let mut coinbase_reward = Block::<N>::block_reward(block_height);
let mut transaction_fees = AleoAmount::ZERO;
// Keep only transactions whose serial numbers and commitments are new to the ledger.
// A lookup error is treated the same as "not present" (only `Ok(true)` filters out).
let mut transactions: Vec<Transaction<N>> = transactions
.iter()
.filter(|transaction| {
for serial_number in transaction.serial_numbers() {
if let Ok(true) = self.contains_serial_number(serial_number) {
trace!(
"Ledger is filtering out transaction {} (serial_number {})",
transaction.transaction_id(),
serial_number
);
return false;
}
}
for commitment in transaction.commitments() {
if let Ok(true) = self.contains_commitment(commitment) {
trace!(
"Ledger is filtering out transaction {} (commitment {})",
transaction.transaction_id(),
commitment
);
return false;
}
}
trace!("Adding transaction {} to block template", transaction.transaction_id());
// Accumulate the value balance of each accepted transaction as fees.
transaction_fees = transaction_fees.add(transaction.value_balance());
true
})
.cloned()
.collect();
// Enforce that the transaction fee is non-negative.
if transaction_fees.is_negative() {
return Err(anyhow!("Invalid transaction fees"));
}
// Build the coinbase transaction and append it after the accepted transactions.
coinbase_reward = coinbase_reward.add(transaction_fees);
let (coinbase_transaction, coinbase_record) = Transaction::<N>::new_coinbase(recipient, coinbase_reward, is_public, rng)?;
transactions.push(coinbase_transaction);
let transactions = Transactions::from(&transactions)?;
// Construct the block template.
Ok(BlockTemplate::new(
previous_block_hash,
block_height,
block_timestamp,
difficulty_target,
cumulative_weight,
previous_ledger_root,
transactions,
coinbase_record,
))
}
pub fn mine_next_block<R: Rng + CryptoRng>(
&self,
recipient: Address<N>,
is_public: bool,
transactions: &[Transaction<N>],
terminator: &AtomicBool,
rng: &mut R,
) -> Result<(Block<N>, Record<N>)> {
let template = self.get_block_template(recipient, is_public, transactions, rng)?;
let coinbase_record = template.coinbase_record().clone();
match Block::mine(&template, terminator, rng) {
Ok(block) => Ok((block, coinbase_record)),
Err(error) => Err(anyhow!("Unable to mine the next block: {}", error)),
}
}
/// Adds the given block as the next block in the ledger.
///
/// The block is validated against the current latest block (height, previous hash,
/// timestamp, difficulty target, cumulative weight, ledger root) and against storage
/// (no duplicate height, hash, serial numbers, commitments, or transactions; every
/// transaction's ledger root must be known). Only then is it written to storage and
/// the in-memory state updated.
///
/// # Errors
/// Returns an error if the ledger is read-only, if any validation check fails,
/// or if a storage operation fails.
pub fn add_next_block(&self, block: &Block<N>) -> Result<()> {
// Ensure the ledger is writable.
if self.is_read_only() {
return Err(anyhow!("Ledger is in read-only mode"));
}
// Ensure the block itself is valid.
if !block.is_valid() {
return Err(anyhow!("Block {} is invalid", block.height()));
}
// Snapshot the current latest block.
let current_block = self.latest_block();
// Ensure the block height increments by one.
let block_height = block.height();
if block_height != current_block.height() + 1 {
return Err(anyhow!(
"Block {} should have block height {}",
block_height,
current_block.height() + 1
));
}
// Ensure the previous block hash matches the latest block.
if block.previous_block_hash() != current_block.hash() {
return Err(anyhow!(
"Block {} has an incorrect previous block hash in the canon chain",
block_height
));
}
// Ensure the block timestamp is not too far in the future.
let now = OffsetDateTime::now_utc().unix_timestamp();
if block.timestamp() > (now + N::ALEO_FUTURE_TIME_LIMIT_IN_SECS) {
return Err(anyhow!("The given block timestamp exceeds the time limit"));
}
// Ensure the block timestamp is strictly after the latest block's timestamp.
if block.timestamp() <= current_block.timestamp() {
return Err(anyhow!("The given block timestamp is before the current timestamp"));
}
// Compute the expected difficulty target, using the same anchor selection as `get_block_template`.
let expected_difficulty_target = if N::NETWORK_ID == 2 && block_height <= snarkvm::dpc::testnet2::V12_UPGRADE_BLOCK_HEIGHT {
Blocks::<N>::compute_difficulty_target(current_block.header(), block.timestamp(), block.height())
} else if N::NETWORK_ID == 2 {
let anchor_block_header = self.get_block_header(snarkvm::dpc::testnet2::V12_UPGRADE_BLOCK_HEIGHT)?;
Blocks::<N>::compute_difficulty_target(&anchor_block_header, block.timestamp(), block.height())
} else {
Blocks::<N>::compute_difficulty_target(N::genesis_block().header(), block.timestamp(), block.height())
};
// Ensure the declared difficulty target is correct.
if block.difficulty_target() != expected_difficulty_target {
return Err(anyhow!(
"Block {} has an incorrect difficulty target. Found {}, but expected {}",
block_height,
block.difficulty_target(),
expected_difficulty_target
));
}
// Ensure the declared cumulative weight is correct.
let expected_cumulative_weight = current_block
.cumulative_weight()
.saturating_add((u64::MAX / expected_difficulty_target) as u128);
if block.cumulative_weight() != expected_cumulative_weight {
return Err(anyhow!(
"The given cumulative weight is incorrect. Found {}, but expected {}",
block.cumulative_weight(),
expected_cumulative_weight
));
}
// Ensure the block height does not already exist.
if self.contains_block_height(block_height)? {
return Err(anyhow!("Block {} already exists in the canon chain", block_height));
}
// Ensure the block hash does not already exist.
if self.contains_block_hash(&block.hash())? {
return Err(anyhow!("Block {} has a repeat block hash in the canon chain", block_height));
}
// Ensure the declared previous ledger root matches the current ledger tree root.
if block.previous_ledger_root() != self.latest_ledger_root() {
return Err(anyhow!("Block {} declares an incorrect ledger root", block_height));
}
// Ensure none of the block's serial numbers already exist in the ledger.
for serial_number in block.serial_numbers() {
if self.contains_serial_number(serial_number)? {
return Err(anyhow!("Serial number {} already exists in the ledger", serial_number));
}
}
// Ensure none of the block's commitments already exist in the ledger.
for commitment in block.commitments() {
if self.contains_commitment(commitment)? {
return Err(anyhow!("Commitment {} already exists in the ledger", commitment));
}
}
// Ensure each transaction is new and references a known ledger root.
for transaction in block.transactions().iter() {
if self.contains_transaction(&transaction.transaction_id())? {
return Err(anyhow!(
"Transaction {} in block {} has a duplicate transaction in the ledger",
transaction.transaction_id(),
block_height
));
}
if !self.contains_ledger_root(&transaction.ledger_root())? {
return Err(anyhow!(
"Transaction {} in block {} references non-existent ledger root {}",
transaction.transaction_id(),
block_height,
&transaction.ledger_root()
));
}
}
// Hold the map lock while storage and in-memory state are updated.
let _map_lock = self.map_lock.read();
// Persist the block, then update the ledger tree and the ledger roots map.
self.blocks.add_block(block)?;
self.ledger_tree.write().add(&block.hash())?;
self.ledger_roots.insert(&block.previous_ledger_root(), &block.height())?;
// Update the cached latest hashes/headers, block locators, and latest block.
self.latest_block_hashes_and_headers
.write()
.push((block.hash(), block.header().clone()));
*self.latest_block_locators.write() = self.get_block_locators(block.height())?;
*self.latest_block.write() = block.clone();
Ok(())
}
/// Reverts the ledger state back to the given block height, returning the removed blocks
/// in increasing order of height.
///
/// The block at `block_height` itself is retained and becomes the latest block
/// (unless an inconsistency is detected mid-way, in which case the height reported by
/// `try_fixing_inconsistent_state` is used).
///
/// # Errors
/// Returns an error if the ledger is read-only, if `block_height` is not strictly below
/// the latest height, if more than `N::ALEO_MAXIMUM_FORK_DEPTH` blocks would be removed,
/// if the target block cannot be loaded, or if a storage operation fails.
pub fn revert_to_block_height(&self, block_height: u32) -> Result<Vec<Block<N>>> {
    // Ensure the ledger is writable.
    if self.is_read_only() {
        return Err(anyhow!("Ledger is in read-only mode"));
    }

    // Determine the number of blocks to remove.
    let latest_block_height = self.latest_block_height();
    let number_of_blocks = latest_block_height.saturating_sub(block_height);

    // Ensure the reverted block height is within a permitted range and well-formed.
    if block_height >= latest_block_height || number_of_blocks > N::ALEO_MAXIMUM_FORK_DEPTH || self.get_block(block_height).is_err() {
        return Err(anyhow!("Attempted to return to block height {}, which is invalid", block_height));
    }

    // Fetch the blocks in `block_height..=latest_block_height`, keyed by height.
    // This confirms they exist and avoids re-reading them during removal.
    let start_block_height = latest_block_height.saturating_sub(number_of_blocks);
    let blocks: BTreeMap<u32, Block<N>> = self
        .get_blocks(start_block_height, latest_block_height)?
        .into_iter()
        .map(|block| (block.height(), block))
        .collect();

    // Hold the map lock while storage is modified.
    let _map_lock = self.map_lock.read();

    // Remove blocks from the tip downwards until `block_height` is reached.
    let mut current_block_height = latest_block_height;
    let mut current_block = blocks.get(&current_block_height);
    while current_block_height > block_height {
        match current_block {
            Some(block) => {
                // Update the internal state of the ledger, except for the ledger tree.
                self.blocks.remove_block(current_block_height)?;
                self.ledger_roots.remove(&block.previous_ledger_root())?;
                // Move down to the next block.
                current_block_height = current_block_height.saturating_sub(1);
                current_block = blocks.get(&current_block_height);
            }
            // A block expected to exist was not fetched: attempt an automatic repair and stop.
            None => {
                current_block_height = self.try_fixing_inconsistent_state()?;
                break;
            }
        }
    }

    // Reload the latest block and rebuild the cached state and the ledger tree.
    *self.latest_block.write() = self.get_block(current_block_height)?;
    self.regenerate_latest_ledger_state()?;
    self.regenerate_ledger_tree()?;

    // Skip the first fetched block (the one at `block_height`, which was retained).
    Ok(blocks.values().skip(1).cloned().collect())
}
/// Returns the ledger inclusion proof for the given commitment.
///
/// The proof is assembled from: the local (transaction-level) proof for the commitment,
/// the inclusion proof of the transaction within its block's transactions, the inclusion
/// proof of the transactions root within the block header, and the inclusion proof of the
/// block hash within the current ledger tree.
///
/// # Errors
/// Returns an error if any of the required entries is missing from storage or if any
/// proof cannot be constructed.
///
/// # Panics
/// Panics if the transaction ID does not appear exactly once in its block's transactions.
pub fn get_ledger_inclusion_proof(&self, commitment: N::Commitment) -> Result<LedgerProof<N>> {
// Look up the transition that produced the commitment.
let commitment_transition_id = match self.blocks.transactions.commitments.get(&commitment)? {
Some(transition_id) => transition_id,
None => return Err(anyhow!("commitment {} missing from commitments map", commitment)),
};
// Look up the transaction that contains the transition.
let transaction_id = match self.blocks.transactions.transitions.get(&commitment_transition_id)? {
Some((transaction_id, _, _)) => transaction_id,
None => return Err(anyhow!("transition id {} missing from transactions map", commitment_transition_id)),
};
let transaction = self.get_transaction(&transaction_id)?;
// Look up the block hash from the transaction's metadata.
let block_hash = match self.blocks.transactions.transactions.get(&transaction_id)? {
Some((_, _, metadata)) => metadata.block_hash,
None => return Err(anyhow!("transaction id {} missing from transactions map", transaction_id)),
};
// Look up the block header.
let block_header = match self.blocks.block_headers.get(&block_hash)? {
Some(block_header) => block_header,
None => return Err(anyhow!("Block {} missing from block headers map", block_hash)),
};
// Generate the local proof for the commitment.
let local_proof = transaction.to_local_proof(commitment)?;
let transaction_id = local_proof.transaction_id();
// Generate the inclusion proof of the transaction within the block's transactions.
let transactions = self.get_block_transactions(block_header.height())?;
let transactions_inclusion_proof = {
// Find every position at which the transaction ID appears; exactly one is expected.
let index = transactions
.transaction_ids()
.enumerate()
.filter_map(|(index, id)| match id == transaction_id {
true => Some(index),
false => None,
})
.collect::<Vec<_>>();
assert_eq!(1, index.len()); transactions.to_transactions_inclusion_proof(index[0], transaction_id)?
};
// Generate the inclusion proof of the transactions root within the block header.
// NOTE(review): the literal `1` is presumably the index of the transactions root in the header tree — confirm.
let transactions_root = transactions.transactions_root();
let block_header_inclusion_proof = block_header.to_header_inclusion_proof(1, transactions_root)?;
let block_header_root = block_header.to_header_root()?;
// Determine the previous block hash.
let previous_block_hash = self.get_previous_block_hash(self.get_block_height(&block_hash)?)?;
// Assemble the record proof.
let record_proof = RecordProof::new(
block_hash,
previous_block_hash,
block_header_root,
block_header_inclusion_proof,
transactions_root,
transactions_inclusion_proof,
local_proof,
)?;
// Generate the inclusion proof of the block hash within the ledger tree.
// NOTE(review): the root and the proof are read under two separate read guards.
let ledger_root = self.ledger_tree.read().root();
let ledger_root_inclusion_proof = self.ledger_tree.read().to_ledger_inclusion_proof(&block_hash)?;
LedgerProof::new(ledger_root, ledger_root_inclusion_proof, record_proof)
}
/// Rebuilds the cached queue of latest block hashes and headers, and the cached block
/// locators, from storage.
///
/// The queue is refilled with the last `MAXIMUM_LINEAR_BLOCK_LOCATORS` blocks
/// (or fewer, if the chain is shorter) ending at the latest block height.
///
/// # Panics
/// Panics if storage returns a different number of hashes and headers for the range.
fn regenerate_latest_ledger_state(&self) -> Result<()> {
    // Compute the inclusive window of heights to reload.
    let latest_height = self.latest_block_height();
    let first_height = latest_height.saturating_sub(MAXIMUM_LINEAR_BLOCK_LOCATORS - 1);

    // Load the hashes and headers for that window.
    let hashes = self.get_block_hashes(first_height, latest_height)?;
    let headers = self.get_block_headers(first_height, latest_height)?;
    assert_eq!(hashes.len(), headers.len());

    // Refill the queue. The write guard is scoped so it is released before
    // `get_block_locators` reads the queue below.
    {
        let mut queue = self.latest_block_hashes_and_headers.write();
        queue.clear();
        hashes.into_iter().zip_eq(headers).for_each(|entry| {
            queue.push(entry);
        });
    }

    // Recompute the block locators for the latest height.
    *self.latest_block_locators.write() = self.get_block_locators(latest_height)?;
    Ok(())
}
/// Rebuilds the ledger tree from the block hashes in storage, from genesis up to the
/// latest block height, and replaces the current tree with it.
///
/// The ledger tree write lock is held for the whole rebuild; the current tree is only
/// replaced once the new tree has been built successfully.
fn regenerate_ledger_tree(&self) -> Result<()> {
    // Acquire the write lock up front so the tree cannot change during the rebuild.
    let mut current_tree = self.ledger_tree.write();

    // Collect every block hash from genesis to the tip, stopping at the first error.
    let all_block_hashes = (0..=self.latest_block_height())
        .map(|height| self.get_block_hash(height))
        .collect::<Result<Vec<_>>>()?;

    // Build the replacement tree and swap it in.
    let mut rebuilt_tree = LedgerTree::<N>::new()?;
    rebuilt_tree.add_all(&all_block_hashes)?;
    *current_tree = rebuilt_tree;
    Ok(())
}
/// Spawns the heartbeat thread for a read-only ledger and returns its join handle.
///
/// The thread loops forever: it refreshes the ledger roots map and, if the refresh
/// reports `true` and a latest block height exists, reloads the latest block, the ledger
/// tree, and the cached latest ledger state, then records the new height in `read_only.1`.
/// It sleeps 6 seconds between iterations. Failures inside the loop are logged as warnings
/// and do not stop the thread.
///
/// NOTE(review): the thread owns a clone of the `Arc<Self>` and the loop has no exit
/// condition, so the ledger stays alive for as long as the thread runs.
///
/// # Errors
/// Returns an error if the ledger is not in read-only mode.
fn initialize_reader_heartbeat(self: &Arc<Self>, starting_block_height: u32) -> Result<JoinHandle<()>> {
if !self.is_read_only() {
return Err(anyhow!("Ledger must be read-only to initialize a reader heartbeat"));
}
// Clone the handle so the thread can own it.
let ledger = self.clone();
Ok(thread::spawn(move || {
// Seed the last seen block height with the height the reader was opened at.
let last_seen_block_height = ledger.read_only.1.clone();
ledger.read_only.1.store(starting_block_height, Ordering::SeqCst);
loop {
// Refresh the ledger storage state.
if ledger.ledger_roots.refresh() {
// Determine the latest block height now present in storage.
if let Some(latest_block_height) = ledger.blocks.block_heights.keys().max() {
let current_block_height = last_seen_block_height.load(Ordering::SeqCst);
trace!(
"[Read-Only] Updating ledger state from block {} to {}",
current_block_height,
latest_block_height
);
// Update the latest block.
match ledger.get_block(latest_block_height) {
Ok(block) => *ledger.latest_block.write() = block,
Err(error) => warn!("[Read-Only] {}", error),
};
// Regenerate the ledger tree.
if let Err(error) = ledger.regenerate_ledger_tree() {
warn!("[Read-Only] {}", error);
};
// Regenerate the cached latest ledger state.
if let Err(error) = ledger.regenerate_latest_ledger_state() {
warn!("[Read-Only] {}", error);
};
// Record the new height, even if one of the steps above logged a warning.
last_seen_block_height.store(latest_block_height, Ordering::SeqCst);
}
}
thread::sleep(std::time::Duration::from_secs(6));
}
}))
}
/// Attempts to automatically resolve a mismatch between the highest block height in the
/// ledger roots map and the highest block height in the block heights map.
///
/// Only the case where the ledger roots map is *ahead* can be repaired: the surplus
/// ledger roots are removed one height at a time until both maps agree.
///
/// Returns the resulting latest block height (0 if both maps are empty).
///
/// # Errors
/// Returns an error if the ledger is read-only, if exactly one of the maps is empty,
/// if the block heights map is ahead of the ledger roots map, or if a ledger root for a
/// surplus height cannot be found.
fn try_fixing_inconsistent_state(&self) -> Result<u32> {
    if self.is_read_only() {
        return Err(anyhow!("Ledger must be writable to fix inconsistent state"));
    }

    // Determine the highest height known to each map.
    let (roots_height, blocks_height) = match (self.ledger_roots.values().max(), self.blocks.block_heights.keys().max()) {
        (Some(roots_height), Some(blocks_height)) => (roots_height, blocks_height),
        (None, None) => return Ok(0u32),
        _ => return Err(anyhow!("Ledger storage state is inconsistent")),
    };

    // Nothing to do if both maps already agree.
    if roots_height == blocks_height {
        return Ok(roots_height);
    }

    // The block heights map being ahead cannot be repaired here.
    if roots_height < blocks_height {
        return Err(anyhow!(
            "Loaded a ledger with inconsistent state ({} != {}) (unable to automatically resolve)",
            roots_height,
            blocks_height
        ));
    }

    debug!("Attempting to automatically resolve inconsistent ledger state");

    // Remove surplus ledger roots from the top down.
    let mut current_block_height = roots_height;
    while current_block_height > blocks_height {
        // Find the ledger root recorded for the current height.
        let surplus_root = self
            .ledger_roots
            .iter()
            .find(|(_, block_height)| *block_height == current_block_height)
            .map(|(previous_ledger_root, _)| previous_ledger_root);

        match surplus_root {
            Some(previous_ledger_root) => {
                self.ledger_roots.remove(&previous_ledger_root)?;
                current_block_height = current_block_height.saturating_sub(1);
            }
            None => {
                return Err(anyhow!(
                    "Loaded a ledger with inconsistent state ({} != {}) (failed to automatically resolve)",
                    current_block_height,
                    blocks_height
                ));
            }
        }
    }

    debug!("Successfully resolved inconsistent ledger state");
    Ok(current_block_height)
}
fn clear_incompatible_blocks(&self, latest_block_height: u32, revert_block_height: u32) -> Result<u32> {
let _map_lock = self.map_lock.read();
let mut current_block_height = latest_block_height;
while current_block_height > revert_block_height {
if current_block_height == 0 {
break;
}
let block_hash = match self.blocks.block_heights.get(¤t_block_height)? {
Some(block_hash) => block_hash,
None => {
warn!("Block {} missing from block heights map", current_block_height);
break;
}
};
let transaction_ids = match self.blocks.block_transactions.get(&block_hash)? {
Some(transaction_ids) => transaction_ids,
None => {
warn!("Block {} missing from block transactions map", block_hash);
break;
}
};
self.blocks.block_heights.remove(¤t_block_height)?;
self.blocks.block_headers.remove(&block_hash)?;
self.blocks.block_transactions.remove(&block_hash)?;
for transaction_ids in transaction_ids.iter() {
self.blocks.transactions.remove_transaction(transaction_ids)?;
}
let remove_ledger_root = self
.ledger_roots
.iter()
.filter(|(_, block_height)| current_block_height == *block_height)
.collect::<Vec<_>>();
for (ledger_root, _) in remove_ledger_root {
self.ledger_roots.remove(&ledger_root)?;
}
current_block_height = current_block_height.saturating_sub(1);
trace!("Ledger successfully reverted to block {}", current_block_height);
}
Ok(current_block_height)
}
/// Returns a clone of the handle to the map lock.
///
/// Storage-modifying operations hold a read guard on this lock.
/// NOTE(review): presumably the caller takes the write guard during shutdown to wait
/// for in-flight storage writes to finish — confirm against callers.
pub fn shut_down(&self) -> Arc<RwLock<()>> {
self.map_lock.clone()
}
/// Test helper: serializes the blocks at heights `1..count` (genesis excluded, `count`
/// excluded) into the file at `path` using `bincode`.
///
/// # Errors
/// Returns an error if the file cannot be created, if a block cannot be loaded,
/// or if serialization fails.
#[cfg(test)]
#[allow(dead_code)]
fn dump_blocks<P: AsRef<Path>>(&self, path: P, count: u32) -> Result<()> {
    // Create the output file first, so a bad path fails before any block is loaded.
    let mut output = std::fs::File::create(path)?;
    let mut dumped = Vec::with_capacity(count as usize);

    println!("Commencing block dump");
    for height in 1..count {
        // Report progress every ten blocks.
        if height % 10 == 0 {
            println!("Dumping block {}/{}", height, count);
        }
        dumped.push(self.get_block(height)?);
    }
    println!("Block dump complete");

    bincode::serialize_into(&mut output, &dumped)?;
    Ok(())
}
}
/// The storage maps for blocks and their transactions.
#[derive(Clone, Debug)]
struct BlockState<N: Network> {
// Map from block height to block hash.
block_heights: DataMap<u32, N::BlockHash>,
// Map from block hash to block header.
block_headers: DataMap<N::BlockHash, BlockHeader<N>>,
// Map from block hash to the IDs of the transactions in that block.
block_transactions: DataMap<N::BlockHash, Vec<N::TransactionID>>,
// The transaction storage maps.
transactions: TransactionState<N>,
}
impl<N: Network> BlockState<N> {
/// Initializes a new `BlockState` by opening the block maps and the transaction state
/// from the given storage.
fn open<S: Storage>(storage: S) -> Result<Self> {
Ok(Self {
block_heights: storage.open_map(MapId::BlockHeights)?,
block_headers: storage.open_map(MapId::BlockHeaders)?,
block_transactions: storage.open_map(MapId::BlockTransactions)?,
// `storage` is moved here, so this must come after the `open_map` calls above.
transactions: TransactionState::open(storage)?,
})
}
/// Returns `true` if the given block height is a key in the block heights map.
fn contains_block_height(&self, block_height: u32) -> Result<bool> {
self.block_heights.contains_key(&block_height)
}
/// Returns `true` if the given block hash is a key in the block headers map.
fn contains_block_hash(&self, block_hash: &N::BlockHash) -> Result<bool> {
self.block_headers.contains_key(block_hash)
}
/// Returns `true` if the given transaction ID exists in transaction storage.
fn contains_transaction(&self, transaction_id: &N::TransactionID) -> Result<bool> {
self.transactions.contains_transaction(transaction_id)
}
fn contains_serial_number(&self, serial_number: &N::SerialNumber) -> Result<bool> {
self.transactions.contains_serial_number(serial_number)
}
fn contains_commitment(&self, commitment: &N::Commitment) -> Result<bool> {
self.transactions.contains_commitment(commitment)
}
fn get_ciphertext(&self, commitment: &N::Commitment) -> Result<N::RecordCiphertext> {
self.transactions.get_ciphertext(commitment)
}
fn get_transition(&self, transition_id: &N::TransitionID) -> Result<Transition<N>> {
self.transactions.get_transition(transition_id)
}
fn get_transaction(&self, transaction_id: &N::TransactionID) -> Result<Transaction<N>> {
self.transactions.get_transaction(transaction_id)
}
fn get_transaction_metadata(&self, transaction_id: &N::TransactionID) -> Result<Metadata<N>> {
self.transactions.get_transaction_metadata(transaction_id)
}
fn get_cumulative_weight(&self, block_height: u32) -> Result<u128> {
Ok(self.get_block_header(block_height)?.cumulative_weight())
}
fn get_block_height(&self, block_hash: &N::BlockHash) -> Result<u32> {
match self.block_headers.get(block_hash)? {
Some(block_header) => Ok(block_header.height()),
None => return Err(anyhow!("Block {} missing from block headers map", block_hash)),
}
}
fn get_block_hash(&self, block_height: u32) -> Result<N::BlockHash> {
self.get_previous_block_hash(block_height + 1)
}
fn get_block_hashes(&self, start_block_height: u32, end_block_height: u32) -> Result<Vec<N::BlockHash>> {
if start_block_height > end_block_height {
return Err(anyhow!("Invalid starting and ending block heights"));
}
(start_block_height..=end_block_height)
.into_iter()
.map(|height| self.get_block_hash(height))
.collect()
}
fn get_previous_block_hash(&self, block_height: u32) -> Result<N::BlockHash> {
match block_height == 0 {
true => Ok(N::genesis_block().previous_block_hash()),
false => match self.block_heights.get(&(block_height - 1))? {
Some(block_hash) => Ok(block_hash),
None => return Err(anyhow!("Block {} missing in block heights map", block_height - 1)),
},
}
}
fn get_block_header(&self, block_height: u32) -> Result<BlockHeader<N>> {
let block_hash = self.get_block_hash(block_height)?;
match self.block_headers.get(&block_hash)? {
Some(block_header) => Ok(block_header),
None => return Err(anyhow!("Block {} missing from block headers map", block_hash)),
}
}
fn get_block_headers(&self, start_block_height: u32, end_block_height: u32) -> Result<Vec<BlockHeader<N>>> {
if start_block_height > end_block_height {
return Err(anyhow!("Invalid starting and ending block heights"));
}
(start_block_height..=end_block_height)
.into_par_iter()
.map(|height| self.get_block_header(height))
.collect()
}
fn get_block_header_count(&self) -> Result<u32> {
let block_hashes = self.block_heights.values().collect::<HashSet<_>>();
let count = self.block_headers.keys().filter(|hash| block_hashes.contains(hash)).count();
Ok(count as u32)
}
fn get_block_transactions(&self, block_height: u32) -> Result<Transactions<N>> {
let block_hash = self.get_block_hash(block_height)?;
let transaction_ids = match self.block_transactions.get(&block_hash)? {
Some(transaction_ids) => transaction_ids,
None => return Err(anyhow!("Block {} missing from block transactions map", block_hash)),
};
let transactions = {
let mut transactions = Vec::with_capacity(transaction_ids.len());
for transaction_id in transaction_ids.iter() {
transactions.push(self.transactions.get_transaction(transaction_id)?)
}
Transactions::from(&transactions)?
};
Ok(transactions)
}
fn get_block(&self, block_height: u32) -> Result<Block<N>> {
let previous_block_hash = self.get_previous_block_hash(block_height)?;
let block_header = self.get_block_header(block_height)?;
let transactions = self.get_block_transactions(block_height)?;
Ok(Block::from(previous_block_hash, block_header, transactions)?)
}
fn get_blocks(&self, start_block_height: u32, end_block_height: u32) -> Result<Vec<Block<N>>> {
if start_block_height > end_block_height {
return Err(anyhow!("Invalid starting and ending block heights"));
}
(start_block_height..=end_block_height)
.into_par_iter()
.map(|height| self.get_block(height))
.collect()
}
fn get_previous_ledger_root(&self, block_height: u32) -> Result<N::LedgerRoot> {
let block_header = self.get_block_header(block_height)?;
Ok(block_header.previous_ledger_root())
}
fn add_block(&self, block: &Block<N>) -> Result<()> {
let block_height = block.height();
if self.block_heights.contains_key(&block_height)? {
Err(anyhow!("Block {} already exists in storage", block_height))
} else {
let block_hash = block.hash();
let block_header = block.header();
let transactions = block.transactions();
let transaction_ids = transactions.transaction_ids().collect::<Vec<_>>();
self.block_heights.insert(&block_height, &block_hash)?;
self.block_headers.insert(&block_hash, block_header)?;
self.block_transactions.insert(&block_hash, &transaction_ids)?;
for (index, transaction) in transactions.iter().enumerate() {
let metadata = Metadata::<N>::new(block_height, block_hash, block.timestamp(), index as u16);
self.transactions.add_transaction(transaction, metadata)?;
}
Ok(())
}
}
fn remove_block(&self, block_height: u32) -> Result<()> {
if block_height == 0 {
Err(anyhow!("Block {} cannot be removed from storage", block_height))
}
else {
let block_hash = match self.block_heights.get(&block_height)? {
Some(block_hash) => block_hash,
None => return Err(anyhow!("Block {} missing from block heights map", block_height)),
};
let block_header = match self.block_headers.get(&block_hash)? {
Some(block_header) => block_header,
None => return Err(anyhow!("Block {} missing from block headers map", block_hash)),
};
let transaction_ids = match self.block_transactions.get(&block_hash)? {
Some(transaction_ids) => transaction_ids,
None => return Err(anyhow!("Block {} missing from block transactions map", block_hash)),
};
let block_height = block_header.height();
self.block_heights.remove(&block_height)?;
self.block_headers.remove(&block_hash)?;
self.block_transactions.remove(&block_hash)?;
for transaction_ids in transaction_ids.iter() {
self.transactions.remove_transaction(transaction_ids)?;
}
Ok(())
}
}
}
/// The storage maps that hold transaction-level data.
#[derive(Clone, Debug)]
#[allow(clippy::type_complexity)]
struct TransactionState<N: Network> {
/// Maps a transaction ID to its ledger root, its transition IDs, and its metadata.
transactions: DataMap<N::TransactionID, (N::LedgerRoot, Vec<N::TransitionID>, Metadata<N>)>,
/// Maps a transition ID to its parent transaction ID, its index within that transaction, and the transition itself.
transitions: DataMap<N::TransitionID, (N::TransactionID, u8, Transition<N>)>,
/// Maps a serial number to the ID of the transition that contains it.
serial_numbers: DataMap<N::SerialNumber, N::TransitionID>,
/// Maps a commitment to the ID of the transition that contains it.
commitments: DataMap<N::Commitment, N::TransitionID>,
}
impl<N: Network> TransactionState<N> {
    /// Opens the transaction maps from the given storage.
    fn open<S: Storage>(storage: S) -> Result<Self> {
        Ok(Self {
            transactions: storage.open_map(MapId::Transactions)?,
            transitions: storage.open_map(MapId::Transitions)?,
            serial_numbers: storage.open_map(MapId::SerialNumbers)?,
            commitments: storage.open_map(MapId::Commitments)?,
        })
    }

    /// Returns `true` if the transactions map has an entry for the given transaction ID.
    fn contains_transaction(&self, transaction_id: &N::TransactionID) -> Result<bool> {
        self.transactions.contains_key(transaction_id)
    }

    /// Returns `true` if the serial numbers map has an entry for the given serial number.
    fn contains_serial_number(&self, serial_number: &N::SerialNumber) -> Result<bool> {
        self.serial_numbers.contains_key(serial_number)
    }

    /// Returns `true` if the commitments map has an entry for the given commitment.
    fn contains_commitment(&self, commitment: &N::Commitment) -> Result<bool> {
        self.commitments.contains_key(commitment)
    }

    /// Returns the record ciphertext paired with the given commitment.
    ///
    /// The commitment is resolved to its transition, and the transition's commitments are
    /// then scanned in lockstep with its ciphertexts to find the matching pair.
    ///
    /// # Errors
    ///
    /// Fails if the commitment or its transition is not stored, or if the transition does
    /// not actually list the commitment.
    fn get_ciphertext(&self, commitment: &N::Commitment) -> Result<N::RecordCiphertext> {
        let transition_id = self
            .commitments
            .get(commitment)?
            .ok_or_else(|| anyhow!("Commitment {} does not exist in storage", commitment))?;
        let (_, _, transition) = self
            .transitions
            .get(&transition_id)?
            .ok_or_else(|| anyhow!("Transition {} does not exist in storage", transition_id))?;

        // Bound to a local so the iterator borrowing `transition` is dropped before returning.
        let ciphertext = transition
            .commitments()
            .zip_eq(transition.ciphertexts())
            .find(|(candidate_commitment, _)| *candidate_commitment == commitment)
            .map(|(_, candidate_ciphertext)| candidate_ciphertext.clone());
        ciphertext.ok_or_else(|| anyhow!("Commitment {} is missing in storage", commitment))
    }

    /// Returns the transition stored under the given transition ID.
    ///
    /// # Errors
    ///
    /// Fails if no transition is stored under `transition_id`.
    fn get_transition(&self, transition_id: &N::TransitionID) -> Result<Transition<N>> {
        self.transitions
            .get(transition_id)?
            .map(|(_, _, transition)| transition)
            .ok_or_else(|| anyhow!("Transition {} does not exist in storage", transition_id))
    }

    /// Returns the transaction stored under the given transaction ID, rebuilt from its
    /// ledger root and its transitions in stored order.
    ///
    /// # Errors
    ///
    /// Fails if the transaction or any of its transitions is not stored, or if the
    /// transaction cannot be reconstructed.
    fn get_transaction(&self, transaction_id: &N::TransactionID) -> Result<Transaction<N>> {
        let (ledger_root, transition_ids, _) = self
            .transactions
            .get(transaction_id)?
            .ok_or_else(|| anyhow!("Transaction {} does not exist in storage", transaction_id))?;

        let mut transitions = Vec::with_capacity(transition_ids.len());
        for transition_id in transition_ids.iter() {
            let (_, _, transition) = self
                .transitions
                .get(transition_id)?
                .ok_or_else(|| anyhow!("Transition {} missing in storage", transition_id))?;
            transitions.push(transition);
        }
        Transaction::from(*N::inner_circuit_id(), ledger_root, transitions)
    }

    /// Returns the metadata stored alongside the given transaction ID.
    ///
    /// # Errors
    ///
    /// Fails if no transaction is stored under `transaction_id`.
    fn get_transaction_metadata(&self, transaction_id: &N::TransactionID) -> Result<Metadata<N>> {
        self.transactions
            .get(transaction_id)?
            .map(|(_, _, metadata)| metadata)
            .ok_or_else(|| anyhow!("Transaction {} does not exist in storage", transaction_id))
    }

    /// Adds the given transaction and its metadata to storage.
    ///
    /// Stores the transaction entry, then each transition together with its serial numbers
    /// and commitments.
    ///
    /// # Errors
    ///
    /// Fails if the transaction ID is already stored, or if any insertion fails. Entries
    /// inserted before a failure are not removed again by this method.
    fn add_transaction(&self, transaction: &Transaction<N>, metadata: Metadata<N>) -> Result<()> {
        let transaction_id = transaction.transaction_id();
        if self.transactions.contains_key(&transaction_id)? {
            return Err(anyhow!("Transaction {} already exists in storage", transaction_id));
        }

        let transition_ids = transaction.transition_ids().collect();
        let transitions = transaction.transitions();
        let ledger_root = transaction.ledger_root();
        self.transactions
            .insert(&transaction_id, &(ledger_root, transition_ids, metadata))?;

        for (i, transition) in transitions.iter().enumerate() {
            let transition_id = transition.transition_id();
            // The position of the transition within its transaction is stored as a `u8`.
            self.transitions
                .insert(&transition_id, &(transaction_id, i as u8, transition.clone()))?;
            for serial_number in transition.serial_numbers() {
                self.serial_numbers.insert(serial_number, &transition_id)?;
            }
            for commitment in transition.commitments() {
                self.commitments.insert(commitment, &transition_id)?;
            }
        }
        Ok(())
    }

    /// Removes the given transaction from storage, along with its transitions, serial
    /// numbers, and commitments.
    ///
    /// # Errors
    ///
    /// Fails if the transaction or any of its transitions is not stored, or if any removal
    /// fails. Entries removed before a failure are not restored by this method.
    fn remove_transaction(&self, transaction_id: &N::TransactionID) -> Result<()> {
        let (_, transition_ids, _) = self
            .transactions
            .get(transaction_id)?
            .ok_or_else(|| anyhow!("Transaction {} does not exist in storage", transaction_id))?;
        self.transactions.remove(transaction_id)?;

        for transition_id in transition_ids.iter() {
            // Load the transition before deleting it; its contents name the keys to remove.
            let (_, _, transition) = self
                .transitions
                .get(transition_id)?
                .ok_or_else(|| anyhow!("Transition {} missing from transitions map", transition_id))?;
            self.transitions.remove(transition_id)?;
            for serial_number in transition.serial_numbers() {
                self.serial_numbers.remove(serial_number)?;
            }
            for commitment in transition.commitments() {
                self.commitments.remove(commitment)?;
            }
        }
        Ok(())
    }
}