use std::collections::HashSet;
use std::iter::FromIterator;
use vapory_types::H256;
use tetsy_keccak_hash::keccak;
use log::trace;
use vapcore_snapshot::SnapshotService;
use common_types::snapshot::ManifestData;
use indexmap::IndexSet;
/// Kind of a snapshot chunk, tagged with the hash of that chunk.
///
/// Returned by `Snapshot::validate_chunk` to tell the caller which pending
/// set the chunk's hash was found in.
#[derive(PartialEq, Eq, Debug)]
pub enum ChunkType {
/// The hash was found among the pending state chunks.
State(H256),
/// The hash was found among the pending block chunks.
Block(H256),
}
/// Bookkeeping for the chunks of a snapshot that is being synced: which chunk
/// hashes are still pending, which are being downloaded and which are done.
#[derive(Default, MallocSizeOf)]
pub struct Snapshot {
/// State chunk hashes taken from the manifest in `reset_to`; a hash is
/// removed once `validate_chunk` accepts the matching chunk.
#[ignore_malloc_size_of = "no impl for IndexSet (yet)"]
pending_state_chunks: IndexSet<H256>,
/// Block chunk hashes taken from the manifest in `reset_to`; a hash is
/// removed once `validate_chunk` accepts the matching chunk.
#[ignore_malloc_size_of = "no impl for IndexSet (yet)"]
pending_block_chunks: IndexSet<H256>,
/// Hashes handed out by `needed_chunk` and not yet validated or released
/// through `clear_chunk_download`.
downloading_chunks: HashSet<H256>,
/// Hashes of chunks accepted by `validate_chunk`, plus those reported as
/// completed by the snapshot service in `initialize`.
completed_chunks: HashSet<H256>,
/// Hash passed to the last `reset_to`; `None` when no manifest is set.
snapshot_hash: Option<H256>,
/// Expected number of chunks, set by `initialize` or `reset_to`.
total_chunks: Option<usize>,
/// Hashes recorded through `note_bad`. Not touched by `clear`.
bad_hashes: HashSet<H256>,
/// Set by `initialize` so that it only takes effect once; reset by `clear`.
initialized: bool,
}
impl Snapshot {
    /// Creates an empty tracker: no manifest, no pending, downloading or
    /// completed chunks.
    pub fn new() -> Self {
        Self::default()
    }

    /// Seeds the completed set from `snapshot_service` and records
    /// `total_chunks` as the expected number of chunks.
    ///
    /// Only the first call has an effect; later calls return immediately
    /// until `clear` resets the `initialized` flag.
    pub fn initialize(&mut self, snapshot_service: &dyn SnapshotService, total_chunks: usize) {
        if self.initialized {
            return;
        }

        // The service may report nothing; in that case the completed set is
        // left as it is.
        if let Some(completed_chunks) = snapshot_service.completed_chunks() {
            self.completed_chunks = HashSet::from_iter(completed_chunks);
        }

        trace!(
            target: "snapshot_sync",
            "Snapshot initialized. {}/{} completed chunks.",
            self.completed_chunks.len(), total_chunks
        );

        self.total_chunks = Some(total_chunks);
        self.initialized = true;
    }

    /// Drops all chunk bookkeeping and forgets the current manifest.
    ///
    /// The set of known-bad hashes is deliberately left untouched by this
    /// method.
    pub fn clear(&mut self) {
        self.pending_state_chunks.clear();
        self.pending_block_chunks.clear();
        self.downloading_chunks.clear();
        self.completed_chunks.clear();
        self.snapshot_hash = None;
        self.total_chunks = None;
        self.initialized = false;
    }

    /// Returns `true` when a manifest hash has been set via `reset_to`.
    pub fn have_manifest(&self) -> bool {
        self.snapshot_hash.is_some()
    }

    /// Clears all state and starts tracking the chunks listed in `manifest`,
    /// remembering `hash` as the snapshot hash.
    ///
    /// The total chunk count becomes the sum of the sizes of the two pending
    /// sets built from the manifest.
    pub fn reset_to(&mut self, manifest: &ManifestData, hash: &H256) {
        self.clear();
        // Copy the hashes straight into the sets instead of cloning each
        // vector first.
        self.pending_state_chunks = IndexSet::from_iter(manifest.state_hashes.iter().copied());
        self.pending_block_chunks = IndexSet::from_iter(manifest.block_hashes.iter().copied());
        self.total_chunks = Some(self.pending_block_chunks.len() + self.pending_state_chunks.len());
        self.snapshot_hash = Some(*hash);
    }

    /// Hashes `chunk` and checks it against the pending sets.
    ///
    /// Returns the chunk's type and marks its hash as completed when the hash
    /// is pending. Returns `Err(())` when the hash is already completed or is
    /// not pending at all. In every case except "already completed" the hash
    /// is removed from the downloading set.
    pub fn validate_chunk(&mut self, chunk: &[u8]) -> Result<ChunkType, ()> {
        let hash = keccak(chunk);
        if self.completed_chunks.contains(&hash) {
            trace!(target: "snapshot_sync", "Already proccessed chunk {:x}. Ignoring.", hash);
            return Err(());
        }
        self.downloading_chunks.remove(&hash);

        // Both pending sets are always probed, so a hash listed in both is
        // dropped from each of them; the block set takes precedence for the
        // reported type.
        let was_block = self.pending_block_chunks.take(&hash).is_some();
        let was_state = self.pending_state_chunks.take(&hash).is_some();

        let chunk_type = if was_block {
            ChunkType::Block(hash)
        } else if was_state {
            ChunkType::State(hash)
        } else {
            trace!(target: "snapshot_sync", "Ignoring unknown chunk: {:x}", hash);
            return Err(());
        };

        self.completed_chunks.insert(hash);
        Ok(chunk_type)
    }

    /// Picks the next chunk to request and marks it as downloading.
    ///
    /// Pending block chunks are considered first, then pending state chunks;
    /// hashes that are already downloading or completed are skipped. Returns
    /// `None` when no such chunk is left.
    pub fn needed_chunk(&mut self) -> Option<H256> {
        let chunk = {
            let is_needed = |h: &H256| {
                !self.downloading_chunks.contains(h) && !self.completed_chunks.contains(h)
            };
            self.pending_block_chunks
                .iter()
                .find(|&h| is_needed(h))
                // Only scan the state chunks when no block chunk qualifies.
                .or_else(|| self.pending_state_chunks.iter().find(|&h| is_needed(h)))
                .copied()
        };

        if let Some(hash) = chunk {
            self.downloading_chunks.insert(hash);
        }
        chunk
    }

    /// Removes `hash` from the downloading set so that `needed_chunk` may
    /// hand it out again.
    pub fn clear_chunk_download(&mut self, hash: &H256) {
        self.downloading_chunks.remove(hash);
    }

    /// Records `hash` as known-bad.
    pub fn note_bad(&mut self, hash: H256) {
        self.bad_hashes.insert(hash);
    }

    /// Returns `true` when `hash` was previously recorded with `note_bad`.
    pub fn is_known_bad(&self, hash: &H256) -> bool {
        self.bad_hashes.contains(hash)
    }

    /// Returns the hash given to the last `reset_to`, if any.
    pub fn snapshot_hash(&self) -> Option<H256> {
        self.snapshot_hash
    }

    /// Returns the expected number of chunks, or `0` when none has been set.
    pub fn total_chunks(&self) -> usize {
        self.total_chunks.unwrap_or_default()
    }

    /// Returns the number of completed chunks.
    pub fn done_chunks(&self) -> usize {
        self.completed_chunks.len()
    }

    /// Returns `true` when the number of completed chunks equals the expected
    /// total.
    ///
    /// Note that this also holds for a tracker with no total set and no
    /// completed chunks, since both sides are then `0`.
    pub fn is_complete(&self) -> bool {
        self.total_chunks() == self.completed_chunks.len()
    }
}
#[cfg(test)]
mod test {
    use super::{ChunkType, H256, Snapshot};
    use bytes::Bytes;
    use tetsy_keccak_hash::keccak;
    use common_types::snapshot::ManifestData;

    /// True when no chunk is tracked in any set and no snapshot hash is set.
    fn is_empty(snapshot: &Snapshot) -> bool {
        snapshot.pending_block_chunks.is_empty() &&
            snapshot.pending_state_chunks.is_empty() &&
            snapshot.completed_chunks.is_empty() &&
            snapshot.downloading_chunks.is_empty() &&
            snapshot.snapshot_hash.is_none()
    }

    /// Builds a manifest with 20 random state chunks and 20 random block
    /// chunks, returning the manifest, its hash and the raw chunk data.
    fn test_manifest() -> (ManifestData, H256, Vec<Bytes>, Vec<Bytes>) {
        let state_chunks: Vec<Bytes> = (0..20).map(|_| H256::random().as_bytes().to_vec()).collect();
        let block_chunks: Vec<Bytes> = (0..20).map(|_| H256::random().as_bytes().to_vec()).collect();

        let manifest = ManifestData {
            version: 2,
            state_hashes: state_chunks.iter().map(|data| keccak(data)).collect(),
            block_hashes: block_chunks.iter().map(|data| keccak(data)).collect(),
            state_root: H256::zero(),
            block_number: 42,
            block_hash: H256::zero(),
        };
        // `into_rlp` consumes the manifest, so hash a clone and return the original.
        let mhash = keccak(manifest.clone().into_rlp());
        (manifest, mhash, state_chunks, block_chunks)
    }

    #[test]
    fn create_clear() {
        let mut snapshot = Snapshot::new();
        assert!(is_empty(&snapshot));

        let (manifest, mhash, _, _) = test_manifest();
        snapshot.reset_to(&manifest, &mhash);
        assert!(!is_empty(&snapshot));

        snapshot.clear();
        assert!(is_empty(&snapshot));
    }

    #[test]
    fn validate_chunks() {
        let mut snapshot = Snapshot::new();
        let (manifest, mhash, state_chunks, block_chunks) = test_manifest();
        snapshot.reset_to(&manifest, &mhash);
        assert_eq!(snapshot.done_chunks(), 0, "no chunks done at outset");
        assert!(snapshot.validate_chunk(H256::random().as_bytes()).is_err(), "random chunk is invalid");

        // Request every chunk; afterwards nothing should be left to hand out.
        let requested: Vec<H256> = (0..40).map(|_| snapshot.needed_chunk().unwrap()).collect();
        assert!(snapshot.needed_chunk().is_none(), "no chunks left after all are drained");

        let requested_all_block_chunks = manifest.block_hashes.iter()
            .all(|h| requested.contains(h));
        assert!(requested_all_block_chunks, "all block chunks in the manifest accounted for");

        let requested_all_state_chunks = manifest.state_hashes.iter()
            .all(|h| requested.contains(h));
        assert!(requested_all_state_chunks, "all state chunks in the manifest accounted for");

        assert_eq!(snapshot.downloading_chunks.len(), 40, "all requested chunks are downloading");

        assert_eq!(
            snapshot.validate_chunk(&state_chunks[4]),
            Ok(ChunkType::State(manifest.state_hashes[4])),
            "4th state chunk hash validates as such"
        );
        assert_eq!(snapshot.completed_chunks.len(), 1, "after validating a chunk, it's in the completed set");
        assert_eq!(snapshot.downloading_chunks.len(), 39, "after validating a chunk, there's one less in the downloading set");

        assert_eq!(snapshot.validate_chunk(&block_chunks[10]), Ok(ChunkType::Block(manifest.block_hashes[10])));
        assert_eq!(snapshot.completed_chunks.len(), 2);
        assert_eq!(snapshot.downloading_chunks.len(), 38);

        // Validate the remaining chunks, skipping the two handled above.
        for (i, data) in state_chunks.iter().enumerate() {
            if i != 4 {
                assert!(snapshot.validate_chunk(data).is_ok());
            }
        }

        for (i, data) in block_chunks.iter().enumerate() {
            if i != 10 {
                assert!(snapshot.validate_chunk(data).is_ok());
            }
        }

        assert!(snapshot.is_complete(), "when all chunks have been validated, we're done");
        assert_eq!(snapshot.done_chunks(), 40);
        assert_eq!(snapshot.done_chunks(), snapshot.total_chunks());
        assert_eq!(snapshot.snapshot_hash(), Some(keccak(manifest.into_rlp())));
    }

    #[test]
    fn tracks_known_bad() {
        let mut snapshot = Snapshot::new();
        let hash = H256::random();

        assert!(!snapshot.is_known_bad(&hash));
        snapshot.note_bad(hash);
        assert!(snapshot.is_known_bad(&hash));
    }
}