use crate::{
chain::chain_information::{
self, ChainInformationConsensusRef, ChainInformationFinality, ChainInformationFinalityRef,
ValidChainInformation, ValidChainInformationRef,
},
executor::{
self,
host::{self, HostVmPrototype},
vm::ExecHint,
},
finality::{decode, verify},
header,
informant::HashDisplay,
trie::{self, proof_decode},
};
use alloc::{
borrow::{Cow, ToOwned as _},
collections::{BTreeSet, VecDeque},
vec,
vec::Vec,
};
use core::{cmp, fmt, iter, mem, ops};
pub use trie::Nibble;
/// Parameters passed to [`start_warp_sync`].
#[derive(Debug)]
pub struct Config {
/// Chain information the warp sync starts from. [`start_warp_sync`] rejects it unless its
/// finality is GrandPa without a scheduled change and its consensus is Aura or Babe.
pub start_chain_information: ValidChainInformation,
/// Number of bytes used to encode block numbers. Forwarded to the header and justification
/// encoding/decoding functions.
pub block_number_bytes: usize,
/// Initial capacity of the container holding the sources.
pub sources_capacity: usize,
/// Initial capacity of the container holding the in-progress requests.
pub requests_capacity: usize,
/// Optional hint about the trie node of the runtime code. While the hint is in use, the
/// storage proof request targets the key derived from
/// [`ConfigCodeTrieNodeHint::closest_ancestor_excluding`] instead of `:code`.
pub code_trie_node_hint: Option<ConfigCodeTrieNodeHint>,
/// A new fragments download is only desired while the number of downloaded but not yet
/// verified fragments is strictly below this value.
pub num_download_ahead_fragments: usize,
/// A source is only asked for fragments if its finalized block height is strictly above the
/// tail of the verification queue (or the warp-synced block if the queue is empty) plus this
/// value.
pub warp_sync_minimum_gap: usize,
/// If `true`, the body of the warp-synced block is downloaded as well.
pub download_block_body: bool,
/// Stored in the state machine by [`start_warp_sync`].
// NOTE(review): not read in this part of the file; presumably controls whether call proofs
// are downloaded in full when building the chain information — confirm.
pub download_all_chain_information_storage_proofs: bool,
}
/// Hint about the trie node holding the runtime code. See [`Config::code_trie_node_hint`].
#[derive(Debug)]
pub struct ConfigCodeTrieNodeHint {
// NOTE(review): presumably the Merkle value expected for the runtime code trie node; not
// read in this part of the file — confirm.
pub merkle_value: Vec<u8>,
// NOTE(review): presumably the runtime code corresponding to `merkle_value`; not read in
// this part of the file — confirm.
pub storage_value: Vec<u8>,
/// Nibbles that are converted to bytes (with truncation) in order to form the storage key
/// requested in place of `:code` while the hint is in use.
pub closest_ancestor_excluding: Vec<Nibble>,
}
/// Builds a [`WarpSync`] state machine from the given configuration.
///
/// The state machine starts with the finalized block of `config.start_chain_information` as
/// its warp-synced block, with no source, no request, and nothing downloaded.
///
/// # Errors
///
/// The chain information is handed back to the caller alongside:
///
/// - [`WarpSyncInitError::NotGrandpa`] if the finality isn't GrandPa, or is GrandPa with a
///   scheduled change;
/// - [`WarpSyncInitError::UnknownConsensus`] if the consensus is neither Aura nor Babe.
pub fn start_warp_sync<TSrc, TRq>(
    config: Config,
) -> Result<WarpSync<TSrc, TRq>, (ValidChainInformation, WarpSyncInitError)> {
    // Only GrandPa without any pending scheduled change is supported.
    let finality_supported = matches!(
        config.start_chain_information.as_ref().finality,
        ChainInformationFinalityRef::Grandpa {
            finalized_scheduled_change: None,
            ..
        }
    );
    if !finality_supported {
        return Err((
            config.start_chain_information,
            WarpSyncInitError::NotGrandpa,
        ));
    }

    // The consensus algorithm must be known in order to later query the runtime.
    let consensus_unknown = matches!(
        config.start_chain_information.as_ref().consensus,
        ChainInformationConsensusRef::Unknown
    );
    if consensus_unknown {
        return Err((
            config.start_chain_information,
            WarpSyncInitError::UnknownConsensus,
        ));
    }

    // Everything derived from the chain information is computed up front so that the chain
    // information itself can then be moved into the state machine.
    let (
        warped_header,
        warped_header_number,
        warped_header_state_root,
        warped_header_extrinsics_root,
        warped_finality,
        runtime_calls,
    ) = {
        let info = config.start_chain_information.as_ref();
        (
            info.finalized_block_header
                .scale_encoding_vec(config.block_number_bytes),
            info.finalized_block_header.number,
            *info.finalized_block_header.state_root,
            *info.finalized_block_header.extrinsics_root,
            info.finality.into(),
            runtime_calls_default_value(info.consensus),
        )
    };
    let warped_header_hash = header::hash_from_scale_encoded_header(&warped_header);

    let body_download = if config.download_block_body {
        BodyDownload::NotStarted
    } else {
        BodyDownload::NotNeeded
    };

    Ok(WarpSync {
        warped_header,
        warped_header_hash,
        warped_header_state_root,
        warped_header_extrinsics_root,
        warped_header_number,
        warped_finality,
        warped_block_ty: WarpedBlockTy::AlreadyVerified,
        code_trie_node_hint: config.code_trie_node_hint,
        verified_chain_information: config.start_chain_information,
        num_download_ahead_fragments: config.num_download_ahead_fragments,
        warp_sync_minimum_gap: config.warp_sync_minimum_gap,
        block_number_bytes: config.block_number_bytes,
        download_all_chain_information_storage_proofs: config
            .download_all_chain_information_storage_proofs,
        sources: slab::Slab::with_capacity(config.sources_capacity),
        sources_by_finalized_height: BTreeSet::new(),
        in_progress_requests: slab::Slab::with_capacity(config.requests_capacity),
        in_progress_requests_by_source: BTreeSet::new(),
        warp_sync_fragments_download: None,
        verify_queue: VecDeque::new(),
        runtime_download: RuntimeDownload::NotStarted {
            hint_doesnt_match: false,
        },
        body_download,
        runtime_calls,
    })
}
/// Error returned by [`start_warp_sync`] when the starting chain information isn't supported.
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum WarpSyncInitError {
/// The finality of the chain isn't GrandPa, or it is GrandPa with a scheduled change.
NotGrandpa,
/// The consensus algorithm of the chain is unknown.
UnknownConsensus,
}
/// Identifier of a source within a [`WarpSync`]. Wraps the index of the source in the
/// container of sources.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct SourceId(usize);
impl SourceId {
/// Smallest possible [`SourceId`]. Usable as the lower bound of a range of identifiers.
pub const MIN: Self = SourceId(usize::MIN);
}
/// Runtime-related information about the finalized block.
// NOTE(review): this type isn't constructed or read in this part of the file; the field
// descriptions below are inferred from the field names and types — confirm against the code
// that builds it.
pub struct RuntimeInformation {
/// Virtual machine prototype of the runtime (presumably of the finalized block).
pub finalized_runtime: HostVmPrototype,
/// Presumably the list of extrinsics of the finalized block, if it was downloaded.
pub finalized_body: Option<Vec<Vec<u8>>>,
/// Presumably the storage value at `:code`, if any.
pub finalized_storage_code: Option<Vec<u8>>,
/// Presumably the storage value at `:heappages`, if any.
pub finalized_storage_heap_pages: Option<Vec<u8>>,
/// Presumably the Merkle value of the trie node of `:code`, if known.
pub finalized_storage_code_merkle_value: Option<Vec<u8>>,
/// Presumably the closest ancestor of `:code` in the trie, excluding `:code` itself.
pub finalized_storage_code_closest_ancestor_excluding: Option<Vec<Nibble>>,
}
/// One element of the response to a warp sync request: a header and the justification that
/// finalizes it.
#[derive(Debug)]
pub struct WarpSyncFragment {
/// SCALE-encoded header. Decoded and hashed when the fragment is verified.
pub scale_encoded_header: Vec<u8>,
/// SCALE-encoded GrandPa justification. Must target the block described by
/// [`WarpSyncFragment::scale_encoded_header`] for the verification to succeed.
pub scale_encoded_justification: Vec<u8>,
}
/// Warp syncing state machine.
///
/// `TSrc` is the user data attached to each source and `TRq` the user data attached to each
/// request.
pub struct WarpSync<TSrc, TRq> {
/// SCALE-encoded header of the block that has been warp synced to so far.
warped_header: Vec<u8>,
/// Hash of `warped_header`.
warped_header_hash: [u8; 32],
/// State trie root found in `warped_header`.
warped_header_state_root: [u8; 32],
/// Extrinsics trie root found in `warped_header`.
warped_header_extrinsics_root: [u8; 32],
/// Block number found in `warped_header`.
warped_header_number: u64,
/// Finality state used to verify the next fragment. Its authorities list and set id are
/// updated whenever a verified fragment contains an authorities change.
warped_finality: ChainInformationFinality,
/// How the block designated by `warped_header` was obtained. See [`WarpedBlockTy`].
warped_block_ty: WarpedBlockTy,
/// See [`Config::code_trie_node_hint`].
code_trie_node_hint: Option<ConfigCodeTrieNodeHint>,
/// Chain information returned by [`WarpSync::as_chain_information`]. Also determines which
/// runtime calls are necessary.
verified_chain_information: ValidChainInformation,
/// See [`Config::num_download_ahead_fragments`].
num_download_ahead_fragments: usize,
/// See [`Config::warp_sync_minimum_gap`].
warp_sync_minimum_gap: usize,
/// See [`Config::block_number_bytes`].
block_number_bytes: usize,
/// See [`Config::download_all_chain_information_storage_proofs`].
download_all_chain_information_storage_proofs: bool,
/// All the sources, indexed by the inner value of their [`SourceId`].
sources: slab::Slab<Source<TSrc>>,
/// Same sources as in `sources`, as `(finalized_block_height, id)` tuples, so that the
/// sources at or above a certain height can be enumerated.
sources_by_finalized_height: BTreeSet<(u64, SourceId)>,
/// All the requests in progress, indexed by the inner value of their [`RequestId`]. Each
/// entry holds the source of the request, its user data, and its detail.
in_progress_requests: slab::Slab<(SourceId, TRq, RequestDetail)>,
/// Same requests as in `in_progress_requests`, as `(source, request)` tuples, so that the
/// requests of a given source can be enumerated.
in_progress_requests_by_source: BTreeSet<(SourceId, RequestId)>,
/// Request that is currently downloading warp sync fragments, if any.
warp_sync_fragments_download: Option<RequestId>,
/// Downloaded fragments waiting to be verified, oldest first.
verify_queue: VecDeque<PendingVerify>,
/// State of the download of the runtime of the warp-synced block.
runtime_download: RuntimeDownload,
/// State of the download of the body of the warp-synced block.
body_download: BodyDownload,
/// For each runtime call that must be performed against the warp-synced block, the state of
/// the download of its call proof.
runtime_calls:
hashbrown::HashMap<chain_information::build::RuntimeCall, CallProof, fnv::FnvBuildHasher>,
}
/// Information stored about each source.
#[derive(Debug, Copy, Clone)]
struct Source<TSrc> {
/// Opaque data chosen by the API user.
user_data: TSrc,
/// Height of the finalized block of the source, as last reported by the API user or as
/// deduced from a warp sync response. Initially 0.
finalized_block_height: u64,
}
/// How the currently warp-synced block was obtained. Runtime, body and call proof downloads
/// are only desired when the value is [`WarpedBlockTy::Normal`].
enum WarpedBlockTy {
/// The block comes from chain information provided by the API user (at initialization or
/// through `set_chain_information`).
AlreadyVerified,
// NOTE(review): never assigned in this part of the file; presumably marks a block whose
// runtime or chain information failed to build — confirm.
KnownBad,
/// The block has been reached by successfully verifying a warp sync fragment.
Normal,
}
/// State of the download of the runtime of the warp-synced block.
enum RuntimeDownload {
/// No request has been started yet.
NotStarted {
/// If `true`, the code trie node hint is ignored and `:code` is requested.
hint_doesnt_match: bool,
},
/// A storage proof request is in progress.
Downloading {
/// Value of `hint_doesnt_match` at the time the request was started. Restored if the
/// request is removed.
hint_doesnt_match: bool,
/// Identifier of the request in progress.
request_id: RequestId,
},
/// A storage proof has been received but not processed yet.
NotVerified {
/// Source that sent the proof. `None` if the source has been removed in the meantime.
downloaded_source: Option<SourceId>,
/// Value of `hint_doesnt_match` of the request that produced the proof.
hint_doesnt_match: bool,
/// Storage proof, as received.
trie_proof: Vec<u8>,
},
/// The runtime has been obtained.
// NOTE(review): this variant is never constructed in this part of the file — confirm the
// exact meaning of its fields against the code that builds the runtime.
Verified {
downloaded_runtime: DownloadedRuntime,
chain_info_builder: chain_information::build::ChainInformationBuild,
},
}
/// State of the download of the body of the warp-synced block.
enum BodyDownload {
/// The body isn't wanted (see [`Config::download_block_body`]). Never leaves this state.
NotNeeded,
/// The body is wanted but no request has been started yet.
NotStarted,
/// A block body request is in progress.
Downloading {
/// Identifier of the request in progress.
request_id: RequestId,
},
/// The body has been received.
Downloaded {
/// Source that sent the body. `None` if the source has been removed in the meantime.
downloaded_source: Option<SourceId>,
/// Body, as received.
body: Vec<Vec<u8>>,
},
}
/// Response to a warp sync request, waiting in the verification queue.
struct PendingVerify {
/// Source that sent the fragments. `None` if the source has been removed in the meantime.
downloaded_source: Option<SourceId>,
/// Whether the source indicated that these are its last fragments. Only the very last
/// fragment of a final set is allowed to not contain an authorities change.
final_set_of_fragments: bool,
/// Fragments, as received.
fragments: Vec<WarpSyncFragment>,
/// Index within `fragments` of the next fragment to verify. The entry is removed from the
/// queue once this reaches the number of fragments.
next_fragment_to_verify_index: usize,
}
/// Runtime-related storage items held by [`RuntimeDownload::Verified`].
// NOTE(review): never constructed or read in this part of the file; the field descriptions
// are inferred from their names — confirm.
struct DownloadedRuntime {
/// Presumably the storage value at `:code`, if any.
storage_code: Option<Vec<u8>>,
/// Presumably the storage value at `:heappages`, if any.
storage_heap_pages: Option<Vec<u8>>,
/// Presumably the Merkle value of the trie node of `:code`, if known.
code_merkle_value: Option<Vec<u8>>,
/// Presumably the closest ancestor of `:code` in the trie, excluding `:code` itself.
closest_ancestor_excluding: Option<Vec<Nibble>>,
}
/// State of the download of the call proof of one runtime call.
enum CallProof {
/// No request has been started yet.
NotStarted,
/// The request with the given identifier is in progress.
Downloading(RequestId),
/// The call proof has been received.
Downloaded {
/// Source that sent the proof. `None` if the source has been removed in the meantime.
downloaded_source: Option<SourceId>,
/// Call proof, as received.
proof: Vec<u8>,
},
}
/// Returns the initial value of the `runtime_calls` field of [`WarpSync`]: every runtime call
/// required by the given consensus algorithm, each mapped to [`CallProof::NotStarted`].
///
/// Aura requires the authorities and slot duration calls, Babe requires the current epoch,
/// next epoch and configuration calls, and an unknown consensus requires none.
fn runtime_calls_default_value(
    verified_chain_information_consensus: chain_information::ChainInformationConsensusRef,
) -> hashbrown::HashMap<chain_information::build::RuntimeCall, CallProof, fnv::FnvBuildHasher> {
    use chain_information::build::RuntimeCall;

    let mut calls = hashbrown::HashMap::with_capacity_and_hasher(8, Default::default());

    // Registers a call as required and not downloaded yet.
    let mut require = |call: RuntimeCall| {
        calls.insert(call, CallProof::NotStarted);
    };

    match verified_chain_information_consensus {
        ChainInformationConsensusRef::Aura { .. } => {
            require(RuntimeCall::AuraApiAuthorities);
            require(RuntimeCall::AuraApiSlotDuration);
        }
        ChainInformationConsensusRef::Babe { .. } => {
            require(RuntimeCall::BabeApiCurrentEpoch);
            require(RuntimeCall::BabeApiNextEpoch);
            require(RuntimeCall::BabeApiConfiguration);
        }
        ChainInformationConsensusRef::Unknown => {}
    }

    calls
}
/// Current phase of the warp syncing, as returned by [`WarpSync::status`].
#[derive(Debug)]
pub enum Status<'a, TSrc> {
/// The runtime download hasn't started: fragments are being downloaded or verified, or
/// nothing is happening.
Fragments {
/// Source of the fragments download in progress if any, otherwise source of the most
/// recently queued fragments if any and if that source still exists.
source: Option<(SourceId, &'a TSrc)>,
/// Hash of the block that has been warp synced to so far.
finalized_block_hash: [u8; 32],
/// Number of the block that has been warp synced to so far.
finalized_block_number: u64,
},
/// The runtime download has started or finished.
ChainInformation {
/// Hash of the block that has been warp synced to.
finalized_block_hash: [u8; 32],
/// Number of the block that has been warp synced to.
finalized_block_number: u64,
},
}
impl<TSrc, TRq> WarpSync<TSrc, TRq> {
/// Returns the value that was initially passed as [`Config::block_number_bytes`].
pub fn block_number_bytes(&self) -> usize {
self.block_number_bytes
}
/// Returns the chain information that is considered verified: the one passed at
/// initialization or last accepted by `set_chain_information`.
pub fn as_chain_information(&'_ self) -> ValidChainInformationRef<'_> {
(&self.verified_chain_information).into()
}
/// Replaces the chain information that is considered verified.
///
/// Has no effect if the finalized block of `chain_information` is strictly below the block
/// that has been warp synced to so far. Otherwise, the warp-synced block becomes that
/// finalized block, and the runtime download, the call proofs and (if wanted) the body
/// download are reset to "not started". The fragments download in progress and the
/// verification queue are left untouched.
pub fn set_chain_information(&mut self, chain_information: ValidChainInformationRef) {
    {
        let new_header = &chain_information.as_ref().finalized_block_header;

        // Never go backwards.
        if new_header.number < self.warped_header_number {
            return;
        }

        self.warped_header = new_header.scale_encoding_vec(self.block_number_bytes);
        self.warped_header_hash = new_header.hash(self.block_number_bytes);
        self.warped_header_state_root = *new_header.state_root;
        self.warped_header_extrinsics_root = *new_header.extrinsics_root;
        self.warped_header_number = new_header.number;
    }

    self.warped_finality = chain_information.as_ref().finality.into();
    self.warped_block_ty = WarpedBlockTy::AlreadyVerified;
    self.verified_chain_information = chain_information.into();

    // Everything that was downloaded concerned the previous warp-synced block.
    self.runtime_calls =
        runtime_calls_default_value(self.verified_chain_information.as_ref().consensus);
    self.runtime_download = RuntimeDownload::NotStarted {
        hint_doesnt_match: false,
    };
    match self.body_download {
        BodyDownload::NotNeeded => {}
        _ => self.body_download = BodyDownload::NotStarted,
    }
}
/// Returns the current phase of the warp syncing.
///
/// [`Status::Fragments`] is returned as long as the runtime download hasn't started, and
/// [`Status::ChainInformation`] afterwards.
pub fn status(&'_ self) -> Status<'_, TSrc> {
    if !matches!(self.runtime_download, RuntimeDownload::NotStarted { .. }) {
        return Status::ChainInformation {
            finalized_block_hash: self.warped_header_hash,
            finalized_block_number: self.warped_header_number,
        };
    }

    // Prefer the source of the download in progress, and fall back to the source of the most
    // recently queued fragments.
    let active_source = match self.warp_sync_fragments_download {
        Some(request_id) => Some(self.in_progress_requests.get(request_id.0).unwrap().0),
        None => self
            .verify_queue
            .back()
            .and_then(|pending| pending.downloaded_source),
    };

    Status::Fragments {
        source: active_source.map(|id| (id, &self.sources[id.0].user_data)),
        finalized_block_hash: self.warped_header_hash,
        finalized_block_number: self.warped_header_number,
    }
}
/// Returns the identifiers of all the sources that have been added and not removed.
pub fn sources(&self) -> impl Iterator<Item = SourceId> {
self.sources.iter().map(|(id, _)| SourceId(id))
}
/// Returns the number of requests in progress that target the given source.
///
/// # Panics
///
/// Panics if the [`SourceId`] is invalid.
pub fn source_num_ongoing_requests(&self, source_id: SourceId) -> usize {
assert!(self.sources.contains(source_id.0));
// Every request of the source lies between these two bounds in the ordered set.
self.in_progress_requests_by_source
.range((source_id, RequestId(usize::MIN))..=(source_id, RequestId(usize::MAX)))
.count()
}
/// Adds a source to the state machine and returns its newly allocated identifier.
///
/// The finalized block height of the source is initially 0.
pub fn add_source(&mut self, user_data: TSrc) -> SourceId {
    let slab_key = self.sources.insert(Source {
        user_data,
        finalized_block_height: 0,
    });
    let source_id = SourceId(slab_key);

    // Keep the height-ordered index in sync with the slab.
    let _newly_indexed = self.sources_by_finalized_height.insert((0, source_id));
    debug_assert!(_newly_indexed);
    debug_assert!(self.sources.len() >= self.sources_by_finalized_height.len());

    source_id
}
/// Removes a source from the state machine.
///
/// Returns the user data of the source, and an iterator to the requests that were in progress
/// against that source together with their user data. These requests are removed from the
/// state machine as well, and whatever they were downloading goes back to "not started".
///
/// Data already received from this source is kept, but is no longer attributed to it.
///
/// # Panics
///
/// Panics if the [`SourceId`] is invalid.
pub fn remove_source(
&mut self,
to_remove: SourceId,
) -> (TSrc, impl Iterator<Item = (RequestId, TRq)>) {
debug_assert!(self.sources.contains(to_remove.0));
let removed = self.sources.remove(to_remove.0);
let _was_in = self
.sources_by_finalized_height
.remove(&(removed.finalized_block_height, to_remove));
debug_assert!(_was_in);
debug_assert!(self.sources.len() >= self.sources_by_finalized_height.len());
// Make sure that no stale source identifier remains anywhere in the state machine:
// everything that was downloaded from this source is marked as having no known source.
for item in &mut self.verify_queue {
if item.downloaded_source == Some(to_remove) {
item.downloaded_source = None;
}
}
if let RuntimeDownload::NotVerified {
downloaded_source, ..
} = &mut self.runtime_download
{
if *downloaded_source == Some(to_remove) {
*downloaded_source = None;
}
}
if let BodyDownload::Downloaded {
downloaded_source, ..
} = &mut self.body_download
{
if *downloaded_source == Some(to_remove) {
*downloaded_source = None;
}
}
for (_, call_proof) in &mut self.runtime_calls {
if let CallProof::Downloaded {
downloaded_source, ..
} = call_proof
{
if *downloaded_source == Some(to_remove) {
*downloaded_source = None;
}
}
}
// The indices are collected first because the set is modified while they are processed.
let obsolete_requests_indices = self
.in_progress_requests_by_source
.range((to_remove, RequestId(usize::MIN))..=(to_remove, RequestId(usize::MAX)))
.map(|(_, rq_id)| rq_id.0)
.collect::<Vec<_>>();
let mut obsolete_requests = Vec::with_capacity(obsolete_requests_indices.len());
for index in obsolete_requests_indices {
let (_, user_data, _) = self.in_progress_requests.remove(index);
self.in_progress_requests_by_source
.remove(&(to_remove, RequestId(index)));
// Whatever this request was downloading goes back to "not started", the same way as
// when a request is removed individually.
if self.warp_sync_fragments_download == Some(RequestId(index)) {
self.warp_sync_fragments_download = None;
}
for call in self.runtime_calls.values_mut() {
if matches!(call, CallProof::Downloading(rq_id) if *rq_id == RequestId(index)) {
*call = CallProof::NotStarted;
}
}
if let RuntimeDownload::Downloading {
request_id,
hint_doesnt_match,
} = &mut self.runtime_download
{
if *request_id == RequestId(index) {
self.runtime_download = RuntimeDownload::NotStarted {
hint_doesnt_match: *hint_doesnt_match,
};
}
}
if let BodyDownload::Downloading { request_id } = &mut self.body_download {
if *request_id == RequestId(index) {
self.body_download = BodyDownload::NotStarted;
}
}
obsolete_requests.push((RequestId(index), user_data));
}
(removed.user_data, obsolete_requests.into_iter())
}
/// Sets the height of the finalized block of the given source.
///
/// Requests already in progress are not affected, even if the new height is lower than the
/// previous one.
///
/// # Panics
///
/// Panics if the [`SourceId`] is invalid.
pub fn set_source_finality_state(&mut self, source_id: SourceId, finalized_block_height: u64) {
    let previous_height = self.sources[source_id.0].finalized_block_height;

    // Nothing to do if the height doesn't actually change.
    if previous_height == finalized_block_height {
        return;
    }

    // Move the source to its new position in the height-ordered index.
    let _was_indexed = self
        .sources_by_finalized_height
        .remove(&(previous_height, source_id));
    debug_assert!(_was_indexed);
    let _newly_indexed = self
        .sources_by_finalized_height
        .insert((finalized_block_height, source_id));
    debug_assert!(_newly_indexed);

    self.sources[source_id.0].finalized_block_height = finalized_block_height;
}
/// Returns the height of the finalized block of the given source, as currently stored in the
/// state machine.
///
/// # Panics
///
/// Panics if the [`SourceId`] is invalid.
pub fn source_finality_state(&self, source_id: SourceId) -> u64 {
self.sources[source_id.0].finalized_block_height
}
/// Returns the list of requests that the state machine would like to see started, each
/// combined with a source it can be sent to and the user data of that source.
///
/// The same request can appear multiple times with different sources. The warp sync fragment
/// requests, if any, come first. Requests for the runtime, the block body and the call proofs
/// are only returned once there is no fragment left to download or verify.
///
/// This function doesn't modify the state machine. Requests that are started must be
/// reported with `add_request`.
pub fn desired_requests(&self) -> impl Iterator<Item = (SourceId, &TSrc, DesiredRequest)> {
// Fragments are only requested if no fragments download is already in progress.
let mut desired_warp_sync_request = if self.warp_sync_fragments_download.is_none() {
// Count the fragments that are downloaded but not verified yet, and don't download
// further ahead than configured.
if self.verify_queue.iter().fold(0, |sum, entry| {
sum + entry.fragments.len() - entry.next_fragment_to_verify_index
}) < self.num_download_ahead_fragments
{
// The request starts at the last queued fragment, or at the warp-synced block if the
// queue is empty (or if its last entry has no fragment).
let start_block_hash = self
.verify_queue
.back()
.and_then(|entry| entry.fragments.last())
.map(|fragment| {
header::hash_from_scale_encoded_header(&fragment.scale_encoded_header)
})
.unwrap_or(self.warped_header_hash);
// Block number at the tail of the verification queue. `None` if the last entry of the
// queue has no fragment or has a last header that fails to decode, in which case no
// request is desired until the queue has been processed.
let verify_queue_tail_block_number = self
.verify_queue
.back()
.map(|entry| {
entry
.fragments
.last()
.and_then(|fragment| {
header::decode(
&fragment.scale_encoded_header,
self.block_number_bytes,
)
.ok()
})
.map(|header| header.number)
})
.unwrap_or(Some(self.warped_header_number));
let warp_sync_minimum_gap = self.warp_sync_minimum_gap;
if let Some(verify_queue_tail_block_number) = verify_queue_tail_block_number {
// Combine the request with every source that is far enough ahead.
either::Left(self.sources.iter().filter_map(move |(src_id, src)| {
if src.finalized_block_height
<= verify_queue_tail_block_number.saturating_add(
u64::try_from(warp_sync_minimum_gap).unwrap_or(u64::MAX),
)
{
return None;
}
Some((
SourceId(src_id),
&src.user_data,
DesiredRequest::WarpSyncRequest {
block_hash: start_block_hash,
},
))
}))
} else {
either::Right(iter::empty())
}
} else {
either::Right(iter::empty())
}
} else {
either::Right(iter::empty())
}
.peekable();
// The runtime is requested only if the warp-synced block comes from a verified fragment,
// the runtime download hasn't started, no fragments download is in progress, nothing is
// waiting for verification, and no fragments request is desired.
let desired_runtime_parameters_get = if let (
WarpedBlockTy::Normal,
RuntimeDownload::NotStarted { hint_doesnt_match },
None,
true,
None,
) = (
&self.warped_block_ty,
&self.runtime_download,
self.warp_sync_fragments_download,
self.verify_queue.is_empty(),
desired_warp_sync_request.peek(),
) {
// Use the key derived from the hint if there is one and it hasn't been found not to
// match, and `:code` otherwise.
let code_key_to_request = if let (false, Some(hint)) =
(*hint_doesnt_match, self.code_trie_node_hint.as_ref())
{
Cow::Owned(
trie::nibbles_to_bytes_truncate(
hint.closest_ancestor_excluding.iter().copied(),
)
.collect::<Vec<_>>(),
)
} else {
Cow::Borrowed(&b":code"[..])
};
// Only sources whose finalized block is at or above the warp-synced block.
let sources_with_block = self
.sources_by_finalized_height
.range((self.warped_header_number, SourceId(usize::MIN))..)
.map(|(_, src_id)| src_id);
either::Left(sources_with_block.map(move |source_id| {
(
*source_id,
&self.sources[source_id.0].user_data,
DesiredRequest::StorageGetMerkleProof {
block_hash: self.warped_header_hash,
state_trie_root: self.warped_header_state_root,
keys: vec![code_key_to_request.to_vec(), b":heappages".to_vec()],
},
)
}))
} else {
either::Right(iter::empty())
};
// Same conditions as for the runtime, applied to the body download.
let desired_body_download =
if let (WarpedBlockTy::Normal, BodyDownload::NotStarted, None, true, None) = (
&self.warped_block_ty,
&self.body_download,
self.warp_sync_fragments_download,
self.verify_queue.is_empty(),
desired_warp_sync_request.peek(),
) {
let sources_with_block = self
.sources_by_finalized_height
.range((self.warped_header_number, SourceId(usize::MIN))..)
.map(|(_, src_id)| src_id);
either::Left(sources_with_block.map(move |source_id| {
(
*source_id,
&self.sources[source_id.0].user_data,
DesiredRequest::BlockBodyDownload {
block_hash: self.warped_header_hash,
block_number: self.warped_header_number,
extrinsics_root: self.warped_header_extrinsics_root,
},
)
}))
} else {
either::Right(iter::empty())
};
// Same conditions again: one call proof request for each runtime call whose download
// hasn't started, combined with each source that has the block.
let desired_call_proofs = if matches!(self.warped_block_ty, WarpedBlockTy::Normal)
&& self.warp_sync_fragments_download.is_none()
&& self.verify_queue.is_empty()
&& desired_warp_sync_request.peek().is_none()
{
either::Left(
self.runtime_calls
.iter()
.filter(|(_, v)| matches!(v, CallProof::NotStarted))
.map(|(call, _)| DesiredRequest::RuntimeCallMerkleProof {
block_hash: self.warped_header_hash,
function_name: call.function_name().into(),
parameter_vectored: Cow::Owned(call.parameter_vectored_vec()),
})
.flat_map(move |request_detail| {
let sources_with_block = self
.sources_by_finalized_height
.range((self.warped_header_number, SourceId(usize::MIN))..)
.map(|(_, src_id)| src_id);
sources_with_block.map(move |source_id| {
(
*source_id,
&self.sources[source_id.0].user_data,
request_detail.clone(),
)
})
}),
)
} else {
either::Right(iter::empty())
};
desired_warp_sync_request
.chain(desired_runtime_parameters_get)
.chain(desired_body_download)
.chain(desired_call_proofs)
}
/// Informs the state machine that a request has been started against the given source, and
/// returns the identifier allocated to that request.
///
/// If the request matches something that the state machine currently wants (fragments, block
/// body, runtime, or a call proof), the corresponding download is marked as in progress and
/// the response to this request will be used. Otherwise the request is merely tracked, and
/// its response will be ignored.
///
/// # Panics
///
/// Panics if the [`SourceId`] is invalid.
pub fn add_request(
&mut self,
source_id: SourceId,
user_data: TRq,
detail: RequestDetail,
) -> RequestId {
assert!(self.sources.contains(source_id.0));
// Reserve the slot first so that the request identifier is known while updating the state.
let request_slot = self.in_progress_requests.vacant_entry();
let request_id = RequestId(request_slot.key());
match (&detail, &mut self.runtime_download, &mut self.body_download) {
// Fragments request: used only if no fragments download is in progress and the request
// starts where the verification queue (or the warp-synced block) ends.
(RequestDetail::WarpSyncRequest { block_hash }, _, _)
if self.warp_sync_fragments_download.is_none()
&& *block_hash
== self
.verify_queue
.back()
.and_then(|entry| entry.fragments.last())
.map(|fragment| {
header::hash_from_scale_encoded_header(
&fragment.scale_encoded_header,
)
})
.unwrap_or(self.warped_header_hash) =>
{
self.warp_sync_fragments_download = Some(request_id);
}
// Body request: used only if it targets the warp-synced block and the source has
// finalized that block.
(
RequestDetail::BlockBodyDownload {
block_hash,
block_number,
},
_,
BodyDownload::NotStarted,
) => {
if self.sources[source_id.0].finalized_block_height >= self.warped_header_number
&& *block_number == self.warped_header_number
&& *block_hash == self.warped_header_hash
{
self.body_download = BodyDownload::Downloading { request_id };
}
}
// Storage request: used only if it targets the warp-synced block, the source has
// finalized that block, and the keys include both the code key and `:heappages`.
(
RequestDetail::StorageGetMerkleProof { block_hash, keys },
RuntimeDownload::NotStarted { hint_doesnt_match },
_,
) => {
// Same key selection as when the request is generated by `desired_requests`.
let code_key_to_request = if let (false, Some(hint)) =
(*hint_doesnt_match, self.code_trie_node_hint.as_ref())
{
Cow::Owned(
trie::nibbles_to_bytes_truncate(
hint.closest_ancestor_excluding.iter().copied(),
)
.collect::<Vec<_>>(),
)
} else {
Cow::Borrowed(&b":code"[..])
};
if self.sources[source_id.0].finalized_block_height >= self.warped_header_number
&& *block_hash == self.warped_header_hash
&& keys.iter().any(|k| *k == *code_key_to_request)
&& keys.iter().any(|k| k == b":heappages")
{
self.runtime_download = RuntimeDownload::Downloading {
hint_doesnt_match: *hint_doesnt_match,
request_id,
};
}
}
// Call proof request: assigned to the first not-yet-started runtime call with the same
// function name and parameters, if the request targets the warp-synced block and the
// source has finalized that block.
(
RequestDetail::RuntimeCallMerkleProof {
block_hash,
function_name,
parameter_vectored,
},
_,
_,
) => {
for (info, status) in &mut self.runtime_calls {
if matches!(status, CallProof::NotStarted)
&& self.sources[source_id.0].finalized_block_height
>= self.warped_header_number
&& *block_hash == self.warped_header_hash
&& function_name == info.function_name()
&& parameters_equal(parameter_vectored, info.parameter_vectored())
{
*status = CallProof::Downloading(request_id);
break;
}
}
}
// Anything else is tracked but its response won't be used.
_ => {}
}
request_slot.insert((source_id, user_data, detail));
let _was_inserted = self
.in_progress_requests_by_source
.insert((source_id, request_id));
debug_assert!(_was_inserted);
request_id
}
/// Removes a request from the state machine, for example because it has failed, and returns
/// its user data.
///
/// Whatever the request was downloading goes back to "not started" and will be proposed
/// again by `desired_requests`.
///
/// # Panics
///
/// Panics if the [`RequestId`] is invalid.
pub fn remove_request(&mut self, id: RequestId) -> TRq {
    // Undo any "in progress" marker that refers to this request.
    if self.warp_sync_fragments_download == Some(id) {
        self.warp_sync_fragments_download = None;
    }

    self.runtime_calls
        .values_mut()
        .filter(|call| matches!(call, CallProof::Downloading(rq_id) if *rq_id == id))
        .for_each(|call| *call = CallProof::NotStarted);

    match self.runtime_download {
        RuntimeDownload::Downloading {
            request_id,
            hint_doesnt_match,
        } if request_id == id => {
            self.runtime_download = RuntimeDownload::NotStarted { hint_doesnt_match };
        }
        _ => {}
    }

    if matches!(
        self.body_download,
        BodyDownload::Downloading { request_id } if request_id == id
    ) {
        self.body_download = BodyDownload::NotStarted;
    }

    // Finally, stop tracking the request itself.
    let (source_id, user_data, _) = self.in_progress_requests.remove(id.0);
    let _was_indexed = self.in_progress_requests_by_source.remove(&(source_id, id));
    debug_assert!(_was_indexed);
    user_data
}
/// Returns the identifier of the source that the given request targets.
///
/// # Panics
///
/// Panics if the [`RequestId`] is invalid.
pub fn request_source_id(&self, request_id: RequestId) -> SourceId {
self.in_progress_requests[request_id.0].0
}
/// Injects a successful response to a block body request and returns the user data of the
/// request. The request is removed from the state machine.
///
/// The body is stored only if the request is the one the state machine is currently
/// waiting on for the body. Otherwise the response is silently discarded.
///
/// # Panics
///
/// Panics if the [`RequestId`] is invalid, or if the request isn't a block body download
/// request.
pub fn body_download_response(&mut self, id: RequestId, body: Vec<Vec<u8>>) -> TRq {
    let (source_id, user_data, detail) = self.in_progress_requests.remove(id.0);

    // Whether this is the request that the state machine is waiting on for the body.
    let is_tracked_download = matches!(
        &self.body_download,
        BodyDownload::Downloading { request_id } if *request_id == id
    );

    // A response of the wrong kind is an API misuse. A tracked download has necessarily been
    // registered as a body download and doesn't need to be checked.
    assert!(
        is_tracked_download || matches!(detail, RequestDetail::BlockBodyDownload { .. }),
        "request {id:?} is not a block body download request"
    );

    let _was_removed = self.in_progress_requests_by_source.remove(&(source_id, id));
    debug_assert!(_was_removed);

    if is_tracked_download {
        self.body_download = BodyDownload::Downloaded {
            downloaded_source: Some(source_id),
            body,
        };
    }

    user_data
}
/// Injects a successful response to a storage proof request and returns the user data of the
/// request. The request is removed from the state machine.
///
/// The proof is stored, waiting to be processed, only if the request is the one the state
/// machine is currently waiting on for the runtime. Otherwise the response is silently
/// discarded.
///
/// # Panics
///
/// Panics if the [`RequestId`] is invalid, or if the request isn't a storage proof request.
pub fn storage_get_response(&mut self, id: RequestId, merkle_proof: Vec<u8>) -> TRq {
    let (source_id, user_data, detail) = self.in_progress_requests.remove(id.0);

    // `Some` if this is the request that the state machine is waiting on for the runtime.
    let tracked_hint_doesnt_match = match &self.runtime_download {
        RuntimeDownload::Downloading {
            request_id,
            hint_doesnt_match,
        } if *request_id == id => Some(*hint_doesnt_match),
        _ => None,
    };

    // A response of the wrong kind is an API misuse. A tracked download has necessarily been
    // registered as a storage proof request and doesn't need to be checked.
    assert!(
        tracked_hint_doesnt_match.is_some()
            || matches!(detail, RequestDetail::StorageGetMerkleProof { .. }),
        "request {id:?} is not a storage proof request"
    );

    let _was_removed = self.in_progress_requests_by_source.remove(&(source_id, id));
    debug_assert!(_was_removed);

    if let Some(hint_doesnt_match) = tracked_hint_doesnt_match {
        self.runtime_download = RuntimeDownload::NotVerified {
            downloaded_source: Some(source_id),
            hint_doesnt_match,
            trie_proof: merkle_proof,
        };
    }

    user_data
}
/// Injects a successful response to a call proof request and returns the user data of the
/// request. The request is removed from the state machine.
///
/// The proof is stored only if a runtime call is currently waiting on this request.
/// Otherwise the response is silently discarded.
///
/// # Panics
///
/// Panics if the [`RequestId`] is invalid, or if the request isn't a call proof request.
pub fn runtime_call_merkle_proof_response(
    &mut self,
    request_id: RequestId,
    response: Vec<u8>,
) -> TRq {
    let (source_id, user_data, RequestDetail::RuntimeCallMerkleProof { .. }) =
        self.in_progress_requests.remove(request_id.0)
    else {
        panic!("request {request_id:?} is not a runtime call proof request")
    };

    // At most one runtime call can be waiting on a given request.
    let waiting_call = self
        .runtime_calls
        .values_mut()
        .find(|call| matches!(call, CallProof::Downloading(rq_id) if *rq_id == request_id));
    if let Some(call) = waiting_call {
        *call = CallProof::Downloaded {
            downloaded_source: Some(source_id),
            proof: response,
        };
    }

    let _was_removed = self
        .in_progress_requests_by_source
        .remove(&(source_id, request_id));
    debug_assert!(_was_removed);

    user_data
}
/// Injects a successful response to a warp sync request and returns the user data of the
/// request. The request is removed from the state machine.
///
/// The finalized block height of the source is updated according to the last fragment, if
/// its header can be decoded. The fragments are queued for verification only if the request
/// is the fragments download the state machine is currently waiting on. Otherwise they are
/// discarded.
///
/// # Panics
///
/// Panics if the [`RequestId`] is invalid, or if the request isn't a warp sync request.
pub fn warp_sync_request_response(
    &mut self,
    request_id: RequestId,
    fragments: Vec<WarpSyncFragment>,
    final_set_of_fragments: bool,
) -> TRq {
    let (rq_source_id, user_data, detail) = self.in_progress_requests.remove(request_id.0);
    assert!(
        matches!(detail, RequestDetail::WarpSyncRequest { .. }),
        "request {request_id:?} is not a warp sync request"
    );
    debug_assert!(self.sources.contains(rq_source_id.0));

    // Requests are only sent to sources whose finalized block is high enough. The finalized
    // height of the source is therefore aligned with what it has just sent, so that the
    // source remains eligible for requests concerning the block being warp synced to.
    if let Some(last_header) = fragments
        .last()
        .and_then(|h| header::decode(&h.scale_encoded_header, self.block_number_bytes).ok())
    {
        let src_finalized_height = &mut self.sources[rq_source_id.0].finalized_block_height;

        let new_height = if final_set_of_fragments {
            // The source has nothing after the last fragment: this is its finalized block.
            last_header.number
        } else {
            // The source has at least one more fragment, and is thus at least one block
            // further.
            cmp::max(*src_finalized_height, last_header.number.saturating_add(1))
        };

        if *src_finalized_height != new_height {
            let _was_in = self
                .sources_by_finalized_height
                .remove(&(*src_finalized_height, rq_source_id));
            debug_assert!(_was_in);
            *src_finalized_height = new_height;
            let _inserted = self
                .sources_by_finalized_height
                .insert((*src_finalized_height, rq_source_id));
            debug_assert!(_inserted);
        }
    }

    // Only the response to the tracked fragments download is queued for verification.
    if self.warp_sync_fragments_download == Some(request_id) {
        self.warp_sync_fragments_download = None;
        self.verify_queue.push_back(PendingVerify {
            final_set_of_fragments,
            downloaded_source: Some(rq_source_id),
            fragments,
            next_fragment_to_verify_index: 0,
        });
    }

    let _was_removed = self
        .in_progress_requests_by_source
        .remove(&(rq_source_id, request_id));
    debug_assert!(_was_removed);

    user_data
}
/// Starts the next CPU-bound step of the warp syncing, if there is one.
///
/// By order of priority: building the chain information once the runtime is verified and
/// the body (if wanted) and all call proofs are downloaded, building the runtime once its
/// storage proof is downloaded, and verifying the next queued fragment. If none of these
/// applies, the state machine is returned unchanged in [`ProcessOne::Idle`].
pub fn process_one(self) -> ProcessOne<TSrc, TRq> {
    let runtime_verified = matches!(self.runtime_download, RuntimeDownload::Verified { .. });
    let body_settled = matches!(
        self.body_download,
        BodyDownload::NotNeeded | BodyDownload::Downloaded { .. }
    );
    let call_proofs_complete = self
        .runtime_calls
        .values()
        .all(|proof| matches!(proof, CallProof::Downloaded { .. }));

    if runtime_verified && body_settled && call_proofs_complete {
        ProcessOne::BuildChainInformation(BuildChainInformation { inner: self })
    } else if matches!(self.runtime_download, RuntimeDownload::NotVerified { .. }) {
        ProcessOne::BuildRuntime(BuildRuntime { inner: self })
    } else if !self.verify_queue.is_empty() {
        ProcessOne::VerifyWarpSyncFragment(VerifyWarpSyncFragment { inner: self })
    } else {
        ProcessOne::Idle(self)
    }
}
}
/// Gives read access to the user data of a source.
impl<TSrc, TRq> ops::Index<SourceId> for WarpSync<TSrc, TRq> {
type Output = TSrc;
/// Returns the user data of the given source. Debug-asserts that the source exists.
#[track_caller]
fn index(&self, source_id: SourceId) -> &TSrc {
debug_assert!(self.sources.contains(source_id.0));
&self.sources[source_id.0].user_data
}
}
/// Gives write access to the user data of a source.
impl<TSrc, TRq> ops::IndexMut<SourceId> for WarpSync<TSrc, TRq> {
/// Returns the user data of the given source. Debug-asserts that the source exists.
#[track_caller]
fn index_mut(&mut self, source_id: SourceId) -> &mut TSrc {
debug_assert!(self.sources.contains(source_id.0));
&mut self.sources[source_id.0].user_data
}
}
/// Gives read access to the user data of a request in progress.
impl<TSrc, TRq> ops::Index<RequestId> for WarpSync<TSrc, TRq> {
type Output = TRq;
/// Returns the user data of the given request. Debug-asserts that the request exists.
#[track_caller]
fn index(&self, request_id: RequestId) -> &TRq {
debug_assert!(self.in_progress_requests.contains(request_id.0));
&self.in_progress_requests[request_id.0].1
}
}
/// Gives write access to the user data of a request in progress.
impl<TSrc, TRq> ops::IndexMut<RequestId> for WarpSync<TSrc, TRq> {
/// Returns the user data of the given request. Debug-asserts that the request exists.
#[track_caller]
fn index_mut(&mut self, request_id: RequestId) -> &mut TRq {
debug_assert!(self.in_progress_requests.contains(request_id.0));
&mut self.in_progress_requests[request_id.0].1
}
}
/// Request that the state machine would like to see started. Returned by
/// [`WarpSync::desired_requests`].
#[derive(Debug, Clone)]
pub enum DesiredRequest {
/// Download of warp sync fragments.
WarpSyncRequest {
/// Hash of the block at which the fragments should start.
block_hash: [u8; 32],
},
/// Download of the body of the warp-synced block.
BlockBodyDownload {
/// Hash of the block whose body to download.
block_hash: [u8; 32],
/// Number of the block whose body to download.
block_number: u64,
/// Extrinsics trie root found in the header of the block.
extrinsics_root: [u8; 32],
},
/// Download of a Merkle proof of storage items of the warp-synced block.
StorageGetMerkleProof {
/// Hash of the block whose storage to query.
block_hash: [u8; 32],
/// State trie root found in the header of the block.
state_trie_root: [u8; 32],
/// Keys whose storage values are requested.
keys: Vec<Vec<u8>>,
},
/// Download of a Merkle proof of a runtime call against the warp-synced block.
RuntimeCallMerkleProof {
/// Hash of the block against which to perform the call.
block_hash: [u8; 32],
/// Name of the runtime function to call.
function_name: Cow<'static, str>,
/// Parameter of the call, as a single buffer.
parameter_vectored: Cow<'static, [u8]>,
},
}
/// Description of a request that has been started. Passed to [`WarpSync::add_request`].
///
/// Mirrors [`DesiredRequest`], without the fields that are only useful in order to verify
/// the response.
#[derive(Debug, Clone)]
pub enum RequestDetail {
/// Download of warp sync fragments.
WarpSyncRequest {
/// Hash of the block at which the fragments start.
block_hash: [u8; 32],
},
/// Download of the body of a block.
BlockBodyDownload {
/// Hash of the block whose body is downloaded.
block_hash: [u8; 32],
/// Number of the block whose body is downloaded.
block_number: u64,
},
/// Download of a Merkle proof of storage items.
StorageGetMerkleProof {
/// Hash of the block whose storage is queried.
block_hash: [u8; 32],
/// Keys whose storage values are requested.
keys: Vec<Vec<u8>>,
},
/// Download of a Merkle proof of a runtime call.
RuntimeCallMerkleProof {
/// Hash of the block against which the call is performed.
block_hash: [u8; 32],
/// Name of the runtime function that is called.
function_name: Cow<'static, str>,
/// Parameter of the call, as a single buffer.
parameter_vectored: Cow<'static, [u8]>,
},
}
/// Identifier of a request in progress within a [`WarpSync`]. Wraps the index of the request
/// in the container of requests.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
/// Outcome of [`WarpSync::process_one`]. Every variant holds the state machine.
pub enum ProcessOne<TSrc, TRq> {
/// Nothing to do at the moment. The state machine is returned unchanged.
Idle(WarpSync<TSrc, TRq>),
/// A downloaded fragment is ready to be verified.
VerifyWarpSyncFragment(VerifyWarpSyncFragment<TSrc, TRq>),
/// The storage proof of the runtime has been downloaded and is ready to be processed.
BuildRuntime(BuildRuntime<TSrc, TRq>),
/// The runtime is verified and everything else has been downloaded; the chain information
/// is ready to be built.
BuildChainInformation(BuildChainInformation<TSrc, TRq>),
}
/// Ready to verify a warp sync fragment. Only created when the verification queue of the
/// state machine is non-empty.
pub struct VerifyWarpSyncFragment<TSrc, TRq> {
/// State machine that is given back once the verification has been performed.
inner: WarpSync<TSrc, TRq>,
}
impl<TSrc, TRq> VerifyWarpSyncFragment<TSrc, TRq> {
/// Returns the source that has sent the fragments about to be verified, together with its
/// user data. Returns `None` if that source has been removed since then.
pub fn proof_sender(&self) -> Option<(SourceId, &TSrc)> {
    let pending = self.inner.verify_queue.front().unwrap();
    pending
        .downloaded_source
        .map(|sender| (sender, &self.inner.sources[sender.0].user_data))
}
/// Verifies the next fragment of the verification queue.
///
/// `randomness_seed` is forwarded to the justification verification.
///
/// Returns the state machine together with, on success, the hash and number of the block
/// that has now been warp synced to.
///
/// On success, the warp-synced block becomes the block of the fragment, and the runtime
/// download, the call proofs and (if wanted) the body download are reset to "not started".
///
/// On failure, the entire verification queue is discarded and the fragments download in
/// progress, if any, stops being tracked. The one exception is an empty list of fragments,
/// in which case only that queue entry is discarded.
pub fn verify(
    mut self,
    randomness_seed: [u8; 32],
) -> (
    WarpSync<TSrc, TRq>,
    Result<([u8; 32], u64), VerifyFragmentError>,
) {
    // A `VerifyWarpSyncFragment` is only ever created if the queue is non-empty.
    debug_assert!(!self.inner.verify_queue.is_empty());
    let fragments_to_verify = self
        .inner
        .verify_queue
        .front_mut()
        .unwrap_or_else(|| unreachable!());

    // An empty list of fragments is invalid.
    if fragments_to_verify.fragments.is_empty() {
        self.inner.verify_queue.pop_front().unwrap();
        return (self.inner, Err(VerifyFragmentError::EmptyProof));
    }

    // Queue entries are removed as soon as their last fragment is verified, so the index
    // always designates an existing fragment.
    let fragment_to_verify = fragments_to_verify
        .fragments
        .get(fragments_to_verify.next_fragment_to_verify_index)
        .unwrap_or_else(|| unreachable!());

    // The finality has been checked to be GrandPa when the warp sync was initialized.
    let chain_information::ChainInformationFinality::Grandpa {
        after_finalized_block_authorities_set_id,
        finalized_triggered_authorities,
        ..
    } = &mut self.inner.warped_finality
    else {
        unreachable!()
    };

    // Decode the header and the justification of the fragment.
    let fragment_header_hash =
        header::hash_from_scale_encoded_header(&fragment_to_verify.scale_encoded_header);
    let fragment_decoded_header = match header::decode(
        &fragment_to_verify.scale_encoded_header,
        self.inner.block_number_bytes,
    ) {
        Ok(j) => j,
        Err(err) => {
            self.inner.verify_queue.clear();
            self.inner.warp_sync_fragments_download = None;
            return (self.inner, Err(VerifyFragmentError::InvalidHeader(err)));
        }
    };
    let fragment_decoded_justification = match decode::decode_grandpa_justification(
        &fragment_to_verify.scale_encoded_justification,
        self.inner.block_number_bytes,
    ) {
        Ok(j) => j,
        Err(err) => {
            self.inner.verify_queue.clear();
            self.inner.warp_sync_fragments_download = None;
            return (
                self.inner,
                Err(VerifyFragmentError::InvalidJustification(err)),
            );
        }
    };

    // The fragment must move the warp sync strictly forward.
    if fragment_decoded_header.number <= self.inner.warped_header_number {
        self.inner.verify_queue.clear();
        self.inner.warp_sync_fragments_download = None;
        return (
            self.inner,
            Err(VerifyFragmentError::BlockNumberNotIncrementing),
        );
    }

    // The justification must target the header it comes with.
    if *fragment_decoded_justification.target_hash != fragment_header_hash
        || fragment_decoded_justification.target_number != fragment_decoded_header.number
    {
        let error = VerifyFragmentError::TargetHashMismatch {
            justification_target_hash: *fragment_decoded_justification.target_hash,
            justification_target_height: fragment_decoded_justification.target_number,
            header_hash: fragment_header_hash,
            header_height: fragment_decoded_header.number,
        };
        self.inner.verify_queue.clear();
        self.inner.warp_sync_fragments_download = None;
        return (self.inner, Err(error));
    }

    // Verify the justification against the authorities currently known.
    if let Err(err) = verify::verify_justification(verify::JustificationVerifyConfig {
        justification: &fragment_to_verify.scale_encoded_justification,
        block_number_bytes: self.inner.block_number_bytes,
        authorities_list: finalized_triggered_authorities
            .iter()
            .map(|a| &a.public_key[..]),
        authorities_set_id: *after_finalized_block_authorities_set_id,
        randomness_seed,
    }) {
        self.inner.verify_queue.clear();
        self.inner.warp_sync_fragments_download = None;
        return (
            self.inner,
            Err(VerifyFragmentError::JustificationVerify(err)),
        );
    }

    // Extract the new list of authorities, if any, from the digest of the header.
    let new_authorities_list = fragment_decoded_header
        .digest
        .logs()
        .find_map(|log_item| match log_item {
            header::DigestItemRef::GrandpaConsensus(grandpa_log_item) => match grandpa_log_item
            {
                header::GrandpaConsensusLogRef::ScheduledChange(change)
                | header::GrandpaConsensusLogRef::ForcedChange { change, .. } => {
                    Some(change.next_authorities)
                }
                _ => None,
            },
            _ => None,
        })
        .map(|next_authorities| {
            next_authorities
                .map(header::GrandpaAuthority::from)
                .collect()
        });

    // Every fragment must contain an authorities change, except for the very last fragment
    // of the final set of fragments.
    if new_authorities_list.is_none()
        && (!fragments_to_verify.final_set_of_fragments
            || fragments_to_verify.next_fragment_to_verify_index
                != fragments_to_verify.fragments.len() - 1)
    {
        self.inner.verify_queue.clear();
        self.inner.warp_sync_fragments_download = None;
        return (self.inner, Err(VerifyFragmentError::NonMinimalProof));
    }

    // The fragment is valid. Move the warp sync to the block of the fragment.
    fragments_to_verify.next_fragment_to_verify_index += 1;
    self.inner.warped_header_number = fragment_decoded_header.number;
    self.inner.warped_header_state_root = *fragment_decoded_header.state_root;
    self.inner.warped_header_extrinsics_root = *fragment_decoded_header.extrinsics_root;
    self.inner.warped_header_hash = fragment_header_hash;
    self.inner.warped_header = fragment_to_verify.scale_encoded_header.clone();
    self.inner.warped_block_ty = WarpedBlockTy::Normal;

    // Everything that was downloaded concerned the previous warp-synced block.
    self.inner.runtime_download = RuntimeDownload::NotStarted {
        hint_doesnt_match: false,
    };
    if !matches!(self.inner.body_download, BodyDownload::NotNeeded) {
        self.inner.body_download = BodyDownload::NotStarted;
    }
    self.inner.runtime_calls =
        runtime_calls_default_value(self.inner.verified_chain_information.as_ref().consensus);

    // The next fragment is signed by the new authorities set.
    if let Some(new_authorities_list) = new_authorities_list {
        *finalized_triggered_authorities = new_authorities_list;
        *after_finalized_block_authorities_set_id += 1;
    }

    // Drop the queue entry once all of its fragments have been verified.
    if fragments_to_verify.next_fragment_to_verify_index == fragments_to_verify.fragments.len()
    {
        self.inner.verify_queue.pop_front().unwrap();
    }

    let result = Ok((
        self.inner.warped_header_hash,
        self.inner.warped_header_number,
    ));
    (self.inner, result)
}
}
/// Error that can be produced when verifying a warp sync proof fragment.
///
/// The human-readable description of each variant is provided by the `fmt::Display`
/// implementation of this type.
#[derive(Debug)]
pub enum VerifyFragmentError {
/// The justification of the fragment failed verification. Contains the verification error.
JustificationVerify(verify::JustificationVerifyError),
/// The block targeted by the justification isn't the block of the header it comes with.
TargetHashMismatch {
/// Hash of the block targeted by the justification.
justification_target_hash: [u8; 32],
/// Height of the block targeted by the justification.
justification_target_height: u64,
/// Hash of the header associated with the justification.
header_hash: [u8; 32],
/// Height of the header associated with the justification.
header_height: u64,
},
/// The warp sync proof fragment doesn't contain an authorities list change.
NonMinimalProof,
/// The header of the warp sync proof doesn't advance the warp syncing process.
BlockNumberNotIncrementing,
/// The warp sync proof is empty.
EmptyProof,
/// The header of the fragment failed to decode.
InvalidHeader(header::Error),
/// The justification of the fragment failed to decode.
InvalidJustification(decode::JustificationDecodeError),
}
impl fmt::Display for VerifyFragmentError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
VerifyFragmentError::JustificationVerify(err) => fmt::Display::fmt(err, f),
VerifyFragmentError::TargetHashMismatch {
justification_target_hash,
justification_target_height,
header_hash,
header_height,
} => {
write!(
f,
"Justification target (hash: {}, height: {}) doesn't match the associated header (hash: {}, height: {})",
HashDisplay(justification_target_hash),
justification_target_height,
HashDisplay(header_hash),
header_height,
)
}
VerifyFragmentError::NonMinimalProof => write!(
f,
"Warp sync proof fragment doesn't contain an authorities list change"
),
VerifyFragmentError::BlockNumberNotIncrementing => write!(
f,
"Warp sync proof header doesn't advance the warp syncing process"
),
VerifyFragmentError::EmptyProof => write!(f, "Warp sync proof is empty"),
VerifyFragmentError::InvalidHeader(err) => write!(f, "Failed to decode header: {err}"),
VerifyFragmentError::InvalidJustification(err) => {
write!(f, "Failed to decode justification: {err}")
}
}
}
}
/// Error describing faulty data, together with the source the data was downloaded from.
///
/// Displayed exactly like the wrapped [`SourceMisbehaviorTy`].
#[derive(Debug, derive_more::Display, derive_more::Error)]
#[display("{error}")]
pub struct SourceMisbehavior {
/// Source the faulty data was downloaded from, if there is one.
pub source_id: Option<SourceId>,
/// What is wrong with the data. Exposed as the error source.
#[error(source)]
pub error: SourceMisbehaviorTy,
}
/// Kind of faulty data reported through a [`SourceMisbehavior`].
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum SourceMisbehaviorTy {
/// A downloaded Merkle proof failed to decode or verify. Contains the decoding error.
InvalidMerkleProof(proof_decode::Error),
/// A downloaded Merkle proof decoded correctly but lacks entries that had to be looked up
/// in it.
MerkleProofEntriesMissing,
/// The extrinsics root computed from a downloaded block body doesn't match the extrinsics
/// root of the header.
BlockBodyExtrinsicsRootMismatch,
}
/// Error potentially returned by [`BuildRuntime::build`].
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum BuildRuntimeError {
/// The storage of the chain has no value under the `:code` key.
#[display("The chain doesn't include any storage item at `:code`")]
MissingCode,
/// The value stored under `:heappages` couldn't be converted to a heap pages value.
#[display("Invalid heap pages value: {_0}")]
InvalidHeapPages(executor::InvalidHeapPagesError),
/// Building the virtual machine from the runtime code failed.
#[display("Error building the runtime: {_0}")]
RuntimeBuild(executor::host::NewErr),
/// The downloaded data is faulty; see the wrapped [`SourceMisbehavior`].
SourceMisbehavior(SourceMisbehavior),
}
/// Pending step of the warp sync process: the downloaded runtime is ready to be verified and
/// built by calling [`BuildRuntime::build`].
pub struct BuildRuntime<TSrc, TRq> {
/// Warp sync state machine, handed back to the caller by [`BuildRuntime::build`].
inner: WarpSync<TSrc, TRq>,
}
impl<TSrc, TRq> BuildRuntime<TSrc, TRq> {
pub fn build(
mut self,
exec_hint: ExecHint,
allow_unresolved_imports: bool,
) -> (WarpSync<TSrc, TRq>, Result<(), BuildRuntimeError>) {
let RuntimeDownload::NotVerified {
downloaded_source,
hint_doesnt_match,
trie_proof,
} = &mut self.inner.runtime_download
else {
unreachable!()
};
let downloaded_runtime = mem::take(trie_proof);
let decoded_downloaded_runtime =
match proof_decode::decode_and_verify_proof(proof_decode::Config {
proof: &downloaded_runtime[..],
}) {
Ok(p) => p,
Err(err) => {
let downloaded_source = *downloaded_source;
self.inner.runtime_download = RuntimeDownload::NotStarted {
hint_doesnt_match: *hint_doesnt_match,
};
return (
self.inner,
Err(BuildRuntimeError::SourceMisbehavior(SourceMisbehavior {
source_id: downloaded_source,
error: SourceMisbehaviorTy::InvalidMerkleProof(err),
})),
);
}
};
let (
finalized_storage_code_merkle_value,
finalized_storage_code_closest_ancestor_excluding,
) = {
let code_nibbles = trie::bytes_to_nibbles(b":code".iter().copied()).collect::<Vec<_>>();
match decoded_downloaded_runtime.closest_ancestor_in_proof(
&self.inner.warped_header_state_root,
code_nibbles.iter().take(code_nibbles.len() - 1).copied(),
) {
Ok(Some(closest_ancestor_key)) => {
let closest_ancestor_key = closest_ancestor_key.collect::<Vec<_>>();
let next_nibble = code_nibbles[closest_ancestor_key.len()];
let merkle_value = decoded_downloaded_runtime
.trie_node_info(
&self.inner.warped_header_state_root,
closest_ancestor_key.iter().copied(),
)
.unwrap()
.children
.child(next_nibble)
.merkle_value();
match merkle_value {
Some(mv) => (mv.to_owned(), closest_ancestor_key),
None => {
self.inner.warped_block_ty = WarpedBlockTy::KnownBad;
self.inner.runtime_download = RuntimeDownload::NotStarted {
hint_doesnt_match: *hint_doesnt_match,
};
if !matches!(self.inner.body_download, BodyDownload::NotNeeded) {
self.inner.body_download = BodyDownload::NotStarted;
}
return (self.inner, Err(BuildRuntimeError::MissingCode));
}
}
}
Ok(None) => {
self.inner.warped_block_ty = WarpedBlockTy::KnownBad;
self.inner.runtime_download = RuntimeDownload::NotStarted {
hint_doesnt_match: *hint_doesnt_match,
};
if !matches!(self.inner.body_download, BodyDownload::NotNeeded) {
self.inner.body_download = BodyDownload::NotStarted;
}
return (self.inner, Err(BuildRuntimeError::MissingCode));
}
Err(proof_decode::IncompleteProofError { .. }) => {
let downloaded_source = *downloaded_source;
self.inner.runtime_download = RuntimeDownload::NotStarted {
hint_doesnt_match: *hint_doesnt_match,
};
if !matches!(self.inner.body_download, BodyDownload::NotNeeded) {
self.inner.body_download = BodyDownload::NotStarted;
}
return (
self.inner,
Err(BuildRuntimeError::SourceMisbehavior(SourceMisbehavior {
source_id: downloaded_source,
error: SourceMisbehaviorTy::MerkleProofEntriesMissing,
})),
);
}
}
};
let finalized_storage_code = if let (false, Some(hint)) =
(*hint_doesnt_match, self.inner.code_trie_node_hint.as_ref())
{
if hint.merkle_value == finalized_storage_code_merkle_value {
&hint.storage_value
} else {
self.inner.runtime_download = RuntimeDownload::NotStarted {
hint_doesnt_match: true,
};
return (self.inner, Ok(()));
}
} else {
match decoded_downloaded_runtime
.storage_value(&self.inner.warped_header_state_root, b":code")
{
Ok(Some((code, _))) => code,
Ok(None) => {
self.inner.warped_block_ty = WarpedBlockTy::KnownBad;
self.inner.runtime_download = RuntimeDownload::NotStarted {
hint_doesnt_match: *hint_doesnt_match,
};
if !matches!(self.inner.body_download, BodyDownload::NotNeeded) {
self.inner.body_download = BodyDownload::NotStarted;
}
return (self.inner, Err(BuildRuntimeError::MissingCode));
}
Err(proof_decode::IncompleteProofError { .. }) => {
let downloaded_source = *downloaded_source;
return (
self.inner,
Err(BuildRuntimeError::SourceMisbehavior(SourceMisbehavior {
source_id: downloaded_source,
error: SourceMisbehaviorTy::MerkleProofEntriesMissing,
})),
);
}
}
};
let finalized_storage_heappages = match decoded_downloaded_runtime
.storage_value(&self.inner.warped_header_state_root, b":heappages")
{
Ok(val) => val.map(|(v, _)| v),
Err(proof_decode::IncompleteProofError { .. }) => {
let downloaded_source = *downloaded_source;
return (
self.inner,
Err(BuildRuntimeError::SourceMisbehavior(SourceMisbehavior {
source_id: downloaded_source,
error: SourceMisbehaviorTy::MerkleProofEntriesMissing,
})),
);
}
};
let decoded_heap_pages =
match executor::storage_heap_pages_to_value(finalized_storage_heappages) {
Ok(hp) => hp,
Err(err) => {
self.inner.warped_block_ty = WarpedBlockTy::KnownBad;
self.inner.runtime_download = RuntimeDownload::NotStarted {
hint_doesnt_match: *hint_doesnt_match,
};
if !matches!(self.inner.body_download, BodyDownload::NotNeeded) {
self.inner.body_download = BodyDownload::NotStarted;
}
return (self.inner, Err(BuildRuntimeError::InvalidHeapPages(err)));
}
};
let runtime = match HostVmPrototype::new(host::Config {
module: &finalized_storage_code,
heap_pages: decoded_heap_pages,
exec_hint,
allow_unresolved_imports,
}) {
Ok(runtime) => runtime,
Err(err) => {
self.inner.warped_block_ty = WarpedBlockTy::KnownBad;
self.inner.runtime_download = RuntimeDownload::NotStarted {
hint_doesnt_match: *hint_doesnt_match,
};
if !matches!(self.inner.body_download, BodyDownload::NotNeeded) {
self.inner.body_download = BodyDownload::NotStarted;
}
return (self.inner, Err(BuildRuntimeError::RuntimeBuild(err)));
}
};
let chain_info_builder = chain_information::build::ChainInformationBuild::new(
chain_information::build::Config {
finalized_block_header: chain_information::build::ConfigFinalizedBlockHeader::Any {
scale_encoded_header: self.inner.warped_header.clone(),
known_finality: if self.inner.download_all_chain_information_storage_proofs {
None
} else {
Some(self.inner.warped_finality.clone())
},
},
block_number_bytes: self.inner.block_number_bytes,
runtime,
},
);
if let chain_information::build::ChainInformationBuild::InProgress(in_progress) =
&chain_info_builder
{
for call in in_progress.remaining_calls() {
if let hashbrown::hash_map::Entry::Vacant(entry) =
self.inner.runtime_calls.entry(call)
{
entry.insert(CallProof::NotStarted);
}
}
}
self.inner.runtime_download = RuntimeDownload::Verified {
downloaded_runtime: DownloadedRuntime {
storage_code: Some(finalized_storage_code.to_vec()),
storage_heap_pages: finalized_storage_heappages.map(|v| v.to_vec()),
code_merkle_value: Some(finalized_storage_code_merkle_value),
closest_ancestor_excluding: Some(finalized_storage_code_closest_ancestor_excluding),
},
chain_info_builder,
};
(self.inner, Ok(()))
}
}
/// Error potentially returned by [`BuildChainInformation::build`].
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum BuildChainInformationError {
/// The chain information builder finished with an error.
#[display("Error building the chain information: {_0}")]
ChainInformationBuild(chain_information::build::Error),
/// The downloaded data is faulty; see the wrapped [`SourceMisbehavior`].
SourceMisbehavior(SourceMisbehavior),
}
/// Pending step of the warp sync process: the chain information is ready to be built by
/// calling [`BuildChainInformation::build`].
pub struct BuildChainInformation<TSrc, TRq> {
/// Warp sync state machine, handed back to the caller by [`BuildChainInformation::build`].
inner: WarpSync<TSrc, TRq>,
}
impl<TSrc, TRq> BuildChainInformation<TSrc, TRq> {
/// Builds the chain information of the warped block from the downloaded call proofs.
///
/// The steps are, in order:
///
/// 1. If a body has been downloaded, check it against the extrinsics root of the warped
///    header.
/// 2. Decode and verify the proof of every runtime call.
/// 3. Drive the chain information builder to completion, answering each of its storage
///    requests from the decoded proof of the call in progress.
///
/// The [`WarpSync`] state machine is always handed back. On success its verified chain
/// information is replaced and the runtime, body and storage items are returned in a
/// [`RuntimeInformation`].
///
/// # Errors
///
/// - [`BuildChainInformationError::SourceMisbehavior`] if the body doesn't match the header,
///   or if a call proof is invalid or lacks a required entry.
/// - [`BuildChainInformationError::ChainInformationBuild`] if the builder reports an error;
///   the warped block is then flagged as `WarpedBlockTy::KnownBad`.
///
/// # Panics
///
/// Panics if the body download is neither `NotNeeded` nor `Downloaded`, if the runtime
/// download isn't `Verified`, if a call proof isn't `Downloaded`, or if the builder starts a
/// call for which no proof is available.
pub fn build(
mut self,
) -> (
WarpSync<TSrc, TRq>,
Result<RuntimeInformation, BuildChainInformationError>,
) {
// Step 1: validate the body, if one was requested. A mismatching body is thrown away by
// resetting the body download; the runtime download and call proofs are left untouched.
let downloaded_body = match &mut self.inner.body_download {
BodyDownload::NotNeeded => None,
BodyDownload::Downloaded {
downloaded_source,
body,
} => {
if header::extrinsics_root(body) != self.inner.warped_header_extrinsics_root {
let source_id = *downloaded_source;
self.inner.body_download = BodyDownload::NotStarted;
return (
self.inner,
Err(BuildChainInformationError::SourceMisbehavior(
SourceMisbehavior {
source_id,
error: SourceMisbehaviorTy::BlockBodyExtrinsicsRootMismatch,
},
)),
);
}
Some(body)
}
_ => unreachable!(),
};
// The verified runtime is moved out of the state machine, which is left with a
// `NotStarted` runtime download. Every return below therefore happens with the runtime
// download in that state.
let RuntimeDownload::Verified {
mut chain_info_builder,
downloaded_runtime,
..
} = mem::replace(
&mut self.inner.runtime_download,
RuntimeDownload::NotStarted {
hint_doesnt_match: false,
},
)
else {
unreachable!()
};
// Likewise, the call proofs are moved out and `self.inner.runtime_calls` is left at its
// `Default` value until the success path refills it.
let runtime_calls = mem::take(&mut self.inner.runtime_calls);
debug_assert!(
runtime_calls
.values()
.all(|c| matches!(c, CallProof::Downloaded { .. }))
);
// Step 2: decode every proof up front, keeping track of the source each one came from so
// that an incomplete proof can be attributed later.
let calls = {
let mut decoded_proofs = hashbrown::HashMap::with_capacity_and_hasher(
runtime_calls.len(),
fnv::FnvBuildHasher::default(),
);
for (call, proof) in runtime_calls {
let CallProof::Downloaded {
proof,
downloaded_source,
} = proof
else {
unreachable!()
};
let decoded_proof =
match proof_decode::decode_and_verify_proof(proof_decode::Config {
proof: proof.into_iter(),
}) {
Ok(d) => d,
Err(err) => {
return (
self.inner,
Err(BuildChainInformationError::SourceMisbehavior(
SourceMisbehavior {
source_id: downloaded_source,
error: SourceMisbehaviorTy::InvalidMerkleProof(err),
},
)),
);
}
};
decoded_proofs.insert(call, (decoded_proof, downloaded_source));
}
decoded_proofs
};
// Step 3: each iteration either returns (builder finished) or answers one request of the
// builder, which yields the next builder state.
loop {
let in_progress = match chain_info_builder {
chain_information::build::ChainInformationBuild::Finished {
result: Ok(chain_information),
virtual_machine,
} => {
// The warped block is only marked as verified if the builder's finalized block
// has the same number as the warped header.
if self.inner.warped_header_number
== chain_information.as_ref().finalized_block_header.number
{
self.inner.warped_block_ty = WarpedBlockTy::AlreadyVerified;
}
// Move the body out of the download state before that state is reset.
let finalized_body = downloaded_body.map(mem::take);
if !matches!(self.inner.body_download, BodyDownload::NotNeeded) {
self.inner.body_download = BodyDownload::NotStarted;
}
self.inner.verified_chain_information = chain_information;
// The set of runtime calls is recomputed from the consensus of the new chain
// information.
self.inner.runtime_calls = runtime_calls_default_value(
self.inner.verified_chain_information.as_ref().consensus,
);
return (
self.inner,
Ok(RuntimeInformation {
finalized_runtime: virtual_machine,
finalized_body,
finalized_storage_code: downloaded_runtime.storage_code,
finalized_storage_heap_pages: downloaded_runtime.storage_heap_pages,
finalized_storage_code_merkle_value: downloaded_runtime
.code_merkle_value,
finalized_storage_code_closest_ancestor_excluding: downloaded_runtime
.closest_ancestor_excluding,
}),
);
}
chain_information::build::ChainInformationBuild::Finished {
result: Err(err),
..
} => {
self.inner.warped_block_ty = WarpedBlockTy::KnownBad;
return (
self.inner,
Err(BuildChainInformationError::ChainInformationBuild(err)),
);
}
chain_information::build::ChainInformationBuild::InProgress(in_progress) => {
in_progress
}
};
chain_info_builder = match in_progress {
chain_information::build::InProgress::StorageGet(get) => {
// NOTE(review): the `unwrap` assumes a proof exists for every call the builder
// starts — presumably guaranteed by the calls registered when the runtime was
// built; confirm.
let (proof, downloaded_source) = calls.get(&get.call_in_progress()).unwrap();
let value = match proof
.storage_value(&self.inner.warped_header_state_root, get.key().as_ref())
{
Ok(v) => v,
Err(proof_decode::IncompleteProofError { .. }) => {
return (
self.inner,
Err(BuildChainInformationError::SourceMisbehavior(
SourceMisbehavior {
source_id: *downloaded_source,
error: SourceMisbehaviorTy::MerkleProofEntriesMissing,
},
)),
);
}
};
get.inject_value(value.map(|(val, ver)| (iter::once(val), ver)))
}
chain_information::build::InProgress::NextKey(nk) => {
let (proof, downloaded_source) = calls.get(&nk.call_in_progress()).unwrap();
let value = match proof.next_key(
&self.inner.warped_header_state_root,
nk.key(),
nk.or_equal(),
nk.prefix(),
nk.branch_nodes(),
) {
Ok(v) => v,
Err(proof_decode::IncompleteProofError { .. }) => {
return (
self.inner,
Err(BuildChainInformationError::SourceMisbehavior(
SourceMisbehavior {
source_id: *downloaded_source,
error: SourceMisbehaviorTy::MerkleProofEntriesMissing,
},
)),
);
}
};
nk.inject_key(value)
}
chain_information::build::InProgress::ClosestDescendantMerkleValue(mv) => {
let (proof, downloaded_source) = calls.get(&mv.call_in_progress()).unwrap();
let value = match proof.closest_descendant_merkle_value(
&self.inner.warped_header_state_root,
mv.key(),
) {
Ok(v) => v,
Err(proof_decode::IncompleteProofError { .. }) => {
return (
self.inner,
Err(BuildChainInformationError::SourceMisbehavior(
SourceMisbehavior {
source_id: *downloaded_source,
error: SourceMisbehaviorTy::MerkleProofEntriesMissing,
},
)),
);
}
};
mv.inject_merkle_value(value)
}
};
}
}
}
/// Returns `true` if `a` is equal to the concatenation of all the slices yielded by `b`.
///
/// The comparison is done chunk by chunk, without allocating the concatenated buffer.
/// Empty chunks are accepted, and an empty `a` is only equal to an iterator whose chunks are
/// all empty (or that yields nothing).
fn parameters_equal(mut a: &[u8], b: impl Iterator<Item = impl AsRef<[u8]>>) -> bool {
    for slice in b {
        let slice: &[u8] = slice.as_ref();
        // `a` must start with the current chunk; the comparison then continues on what's left.
        let Some(rest) = a.strip_prefix(slice) else {
            return false;
        };
        a = rest;
    }

    // Bytes left over in `a` mean that `b` is only a strict prefix of `a`, not equal to it.
    a.is_empty()
}