use std::collections::{HashSet, VecDeque};
use std::io;
use std::sync::Arc;
use vapory_types::{Address, H256};
use tetsy_hash_db::HashDB;
use tetsy_keccak_hash::keccak;
use tetsy_kvdb::{DBTransaction, DBValue, KeyValueDB};
use log::trace;
use lru_cache::LruCache;
use parking_lot::Mutex;
use account_state::{self, Account};
use bloom_journal::{Bloom, BloomJournal};
use common_types::BlockNumber;
use vapcore_db::COL_ACCOUNT_BLOOM;
use journaldb::JournalDB;
use tetsy_keccak_hasher::KeccakHasher;
use memory_cache::MemoryLruCache;
pub const ACCOUNT_BLOOM_SPACE: usize = 1048576;
pub const DEFAULT_ACCOUNT_PRESET: usize = 1000000;
pub const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count";
const STATE_CACHE_BLOCKS: usize = 12;
const ACCOUNT_CACHE_RATIO: usize = 90;
struct AccountCache {
accounts: LruCache<Address, Option<Account>>,
modifications: VecDeque<BlockChanges>,
}
struct CacheQueueItem {
address: Address,
account: SyncAccount,
modified: bool,
}
#[derive(Debug)]
struct BlockChanges {
number: BlockNumber,
hash: H256,
parent: H256,
accounts: HashSet<Address>,
is_canon: bool,
}
pub struct StateDB {
db: Box<dyn JournalDB>,
account_cache: Arc<Mutex<AccountCache>>,
code_cache: Arc<Mutex<MemoryLruCache<H256, Arc<Vec<u8>>>>>,
local_cache: Vec<CacheQueueItem>,
account_bloom: Arc<Mutex<Bloom>>,
cache_size: usize,
parent_hash: Option<H256>,
commit_hash: Option<H256>,
commit_number: Option<BlockNumber>,
}
impl Clone for StateDB {
fn clone(&self) -> Self {
self.boxed_clone()
}
}
impl StateDB {
pub fn new(db: Box<dyn JournalDB>, cache_size: usize) -> StateDB {
let bloom = Self::load_bloom(&**db.backing());
let acc_cache_size = cache_size * ACCOUNT_CACHE_RATIO / 100;
let code_cache_size = cache_size - acc_cache_size;
let cache_items = acc_cache_size / ::std::mem::size_of::<Option<Account>>();
StateDB {
db,
account_cache: Arc::new(Mutex::new(AccountCache {
accounts: LruCache::new(cache_items),
modifications: VecDeque::new(),
})),
code_cache: Arc::new(Mutex::new(MemoryLruCache::new(code_cache_size))),
local_cache: Vec::new(),
account_bloom: Arc::new(Mutex::new(bloom)),
cache_size,
parent_hash: None,
commit_hash: None,
commit_number: None,
}
}
pub fn load_bloom(db: &dyn KeyValueDB) -> Bloom {
let hash_count_entry = db.get(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY)
.expect("Low-level database error");
let hash_count_bytes = match hash_count_entry {
Some(bytes) => bytes,
None => return Bloom::new(ACCOUNT_BLOOM_SPACE, DEFAULT_ACCOUNT_PRESET),
};
assert_eq!(hash_count_bytes.len(), 1);
let hash_count = hash_count_bytes[0];
let mut bloom_parts = vec![0u64; ACCOUNT_BLOOM_SPACE / 8];
for i in 0..ACCOUNT_BLOOM_SPACE / 8 {
let key: [u8; 8] = (i as u64).to_le_bytes();
bloom_parts[i] = db.get(COL_ACCOUNT_BLOOM, &key).expect("low-level database error")
.map(|val| {
assert_eq!(val.len(), 8, "low-level database error");
let mut buff = [0u8; 8];
buff.copy_from_slice(&*val);
u64::from_le_bytes(buff)
})
.unwrap_or(0u64);
}
let bloom = Bloom::from_parts(&bloom_parts, hash_count as u32);
trace!(target: "account_bloom", "Bloom is {:?} full, hash functions count = {:?}", bloom.saturation(), hash_count);
bloom
}
pub fn commit_bloom(batch: &mut DBTransaction, journal: BloomJournal) -> io::Result<()> {
assert!(journal.hash_functions <= 255);
batch.put(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY, &[journal.hash_functions as u8]);
for (bloom_part_index, bloom_part_value) in journal.entries {
let key: [u8; 8] = (bloom_part_index as u64).to_le_bytes();
let val: [u8; 8] = bloom_part_value.to_le_bytes();
batch.put(COL_ACCOUNT_BLOOM, &key, &val);
}
Ok(())
}
pub fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> io::Result<u32> {
{
let mut bloom_lock = self.account_bloom.lock();
Self::commit_bloom(batch, bloom_lock.drain_journal())?;
}
let records = self.db.journal_under(batch, now, id)?;
self.commit_hash = Some(id.clone());
self.commit_number = Some(now);
Ok(records)
}
pub fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> io::Result<u32> {
self.db.mark_canonical(batch, end_era, canon_id)
}
pub fn sync_cache(&mut self, enacted: &[H256], retracted: &[H256], is_best: bool) {
trace!("sync_cache id = (#{:?}, {:?}), parent={:?}, best={}", self.commit_number, self.commit_hash, self.parent_hash, is_best);
let mut cache = self.account_cache.lock();
let cache = &mut *cache;
let mut clear = false;
for block in enacted.iter().filter(|h| self.commit_hash.as_ref().map_or(true, |p| *h != p)) {
clear = clear || {
if let Some(ref mut m) = cache.modifications.iter_mut().find(|m| &m.hash == block) {
trace!("Reverting enacted block {:?}", block);
m.is_canon = true;
for a in &m.accounts {
trace!("Reverting enacted address {:?}", a);
cache.accounts.remove(a);
}
false
} else {
true
}
};
}
for block in retracted {
clear = clear || {
if let Some(ref mut m) = cache.modifications.iter_mut().find(|m| &m.hash == block) {
trace!("Retracting block {:?}", block);
m.is_canon = false;
for a in &m.accounts {
trace!("Retracted address {:?}", a);
cache.accounts.remove(a);
}
false
} else {
true
}
};
}
if clear {
trace!("Wiping cache");
cache.accounts.clear();
cache.modifications.clear();
}
if let (Some(ref number), Some(ref hash), Some(ref parent)) = (self.commit_number, self.commit_hash, self.parent_hash) {
if cache.modifications.len() == STATE_CACHE_BLOCKS {
cache.modifications.pop_back();
}
let mut modifications = HashSet::new();
trace!("committing {} cache entries", self.local_cache.len());
for account in self.local_cache.drain(..) {
if account.modified {
modifications.insert(account.address.clone());
}
if is_best {
let acc = account.account.0;
if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&account.address) {
if let Some(new) = acc {
if account.modified {
existing.overwrite_with(new);
}
continue;
}
}
cache.accounts.insert(account.address, acc);
}
}
let block_changes = BlockChanges {
accounts: modifications,
number: *number,
hash: hash.clone(),
is_canon: is_best,
parent: parent.clone(),
};
let insert_at = cache.modifications.iter().enumerate().find(|&(_, m)| m.number < *number).map(|(i, _)| i);
trace!("inserting modifications at {:?}", insert_at);
if let Some(insert_at) = insert_at {
cache.modifications.insert(insert_at, block_changes);
} else {
cache.modifications.push_back(block_changes);
}
}
}
pub fn as_hash_db(&self) -> &dyn HashDB<KeccakHasher, DBValue> {
self.db.as_hash_db()
}
pub fn as_hash_db_mut(&mut self) -> &mut dyn HashDB<KeccakHasher, DBValue> {
self.db.as_hash_db_mut()
}
pub fn boxed_clone(&self) -> StateDB {
StateDB {
db: self.db.boxed_clone(),
account_cache: self.account_cache.clone(),
code_cache: self.code_cache.clone(),
local_cache: Vec::new(),
account_bloom: self.account_bloom.clone(),
cache_size: self.cache_size,
parent_hash: None,
commit_hash: None,
commit_number: None,
}
}
pub fn boxed_clone_canon(&self, parent: &H256) -> StateDB {
StateDB {
db: self.db.boxed_clone(),
account_cache: self.account_cache.clone(),
code_cache: self.code_cache.clone(),
local_cache: Vec::new(),
account_bloom: self.account_bloom.clone(),
cache_size: self.cache_size,
parent_hash: Some(parent.clone()),
commit_hash: None,
commit_number: None,
}
}
pub fn is_prunable(&self) -> bool {
self.db.is_prunable()
}
pub fn mem_used(&self) -> usize {
self.db.mem_used() + {
let accounts = self.account_cache.lock().accounts.len();
let code_size = self.code_cache.lock().current_size();
code_size + accounts * ::std::mem::size_of::<Option<Account>>()
}
}
pub fn journal_db(&self) -> &dyn JournalDB {
&*self.db
}
pub fn cache_size(&self) -> usize {
self.cache_size
}
fn is_allowed(addr: &Address, parent_hash: &H256, modifications: &VecDeque<BlockChanges>) -> bool {
if modifications.is_empty() {
return true;
}
let mut parent = parent_hash;
for m in modifications {
if &m.hash == parent {
if m.is_canon {
return true;
}
parent = &m.parent;
}
if m.accounts.contains(addr) {
trace!("Cache lookup skipped for {:?}: modified in a later block", addr);
return false;
}
}
trace!("Cache lookup skipped for {:?}: parent hash is unknown", addr);
false
}
}
impl account_state::Backend for StateDB {
fn as_hash_db(&self) -> &dyn HashDB<KeccakHasher, DBValue> { self.db.as_hash_db() }
fn as_hash_db_mut(&mut self) -> &mut dyn HashDB<KeccakHasher, DBValue> {
self.db.as_hash_db_mut()
}
fn add_to_account_cache(&mut self, address: Address, data: Option<Account>, modified: bool) {
self.local_cache.push(CacheQueueItem {
address,
account: SyncAccount(data),
modified,
})
}
fn cache_code(&self, hash: H256, code: Arc<Vec<u8>>) {
let mut cache = self.code_cache.lock();
cache.insert(hash, code);
}
fn get_cached_account(&self, addr: &Address) -> Option<Option<Account>> {
self.parent_hash.as_ref().and_then(|parent_hash| {
let mut cache = self.account_cache.lock();
if !Self::is_allowed(addr, parent_hash, &cache.modifications) {
return None;
}
cache.accounts.get_mut(addr).map(|a| a.as_ref().map(|a| a.clone_basic()))
})
}
fn get_cached<F, U>(&self, a: &Address, f: F) -> Option<U>
where F: FnOnce(Option<&mut Account>) -> U
{
self.parent_hash.as_ref().and_then(|parent_hash| {
let mut cache = self.account_cache.lock();
if !Self::is_allowed(a, parent_hash, &cache.modifications) {
return None;
}
cache.accounts.get_mut(a).map(|c| f(c.as_mut()))
})
}
fn get_cached_code(&self, hash: &H256) -> Option<Arc<Vec<u8>>> {
let mut cache = self.code_cache.lock();
cache.get_mut(hash).map(|code| code.clone())
}
fn note_non_null_account(&self, address: &Address) {
trace!(target: "account_bloom", "Note account bloom: {:?}", address);
let mut bloom = self.account_bloom.lock();
bloom.set(keccak(address).as_bytes());
}
fn is_known_null(&self, address: &Address) -> bool {
trace!(target: "account_bloom", "Check account bloom: {:?}", address);
let bloom = self.account_bloom.lock();
let is_null = !bloom.check(keccak(address).as_bytes());
is_null
}
}
struct SyncAccount(Option<Account>);
unsafe impl Sync for SyncAccount {}
#[cfg(test)]
mod tests {
use vapory_types::{Address, H256, U256};
use tetsy_kvdb::DBTransaction;
use account_state::{Account, Backend};
use vapcore::test_helpers::get_temp_state_db;
#[test]
fn state_db_smoke() {
let _ = ::env_logger::try_init();
let state_db = get_temp_state_db();
let root_parent = H256::random();
let address = Address::random();
let h0 = H256::random();
let h1a = H256::random();
let h1b = H256::random();
let h2a = H256::random();
let h2b = H256::random();
let h3a = H256::random();
let h3b = H256::random();
let mut batch = DBTransaction::new();
let mut s = state_db.boxed_clone_canon(&root_parent);
s.add_to_account_cache(address, Some(Account::new_basic(2.into(), 0.into())), false);
s.journal_under(&mut batch, 0, &h0).unwrap();
s.sync_cache(&[], &[], true);
let mut s = state_db.boxed_clone_canon(&h0);
s.journal_under(&mut batch, 1, &h1a).unwrap();
s.sync_cache(&[], &[], true);
let mut s = state_db.boxed_clone_canon(&h0);
s.add_to_account_cache(address, Some(Account::new_basic(3.into(), 0.into())), true);
s.journal_under(&mut batch, 1, &h1b).unwrap();
s.sync_cache(&[], &[], false);
let mut s = state_db.boxed_clone_canon(&h1b);
s.add_to_account_cache(address, Some(Account::new_basic(4.into(), 0.into())), true);
s.journal_under(&mut batch, 2, &h2b).unwrap();
s.sync_cache(&[], &[], false);
let mut s = state_db.boxed_clone_canon(&h1a);
s.add_to_account_cache(address, Some(Account::new_basic(5.into(), 0.into())), true);
s.journal_under(&mut batch, 2, &h2a).unwrap();
s.sync_cache(&[], &[], true);
let mut s = state_db.boxed_clone_canon(&h2a);
s.journal_under(&mut batch, 3, &h3a).unwrap();
s.sync_cache(&[], &[], true);
let s = state_db.boxed_clone_canon(&h3a);
assert_eq!(s.get_cached_account(&address).unwrap().unwrap().balance(), &U256::from(5));
let s = state_db.boxed_clone_canon(&h1a);
assert!(s.get_cached_account(&address).is_none());
let s = state_db.boxed_clone_canon(&h2b);
assert!(s.get_cached_account(&address).is_none());
let s = state_db.boxed_clone_canon(&h1b);
assert!(s.get_cached_account(&address).is_none());
let mut s = state_db.boxed_clone_canon(&h2b);
s.journal_under(&mut batch, 3, &h3b).unwrap();
s.sync_cache(&[h1b.clone(), h2b.clone(), h3b.clone()], &[h1a.clone(), h2a.clone(), h3a.clone()], true);
let s = state_db.boxed_clone_canon(&h3a);
assert!(s.get_cached_account(&address).is_none());
}
}