#![allow(dead_code)]
use super::{disjoint, sources};
use alloc::{collections::BTreeSet, vec::Vec};
use core::{iter, num::NonZero, ops};
pub use disjoint::TreeRoot;
pub use sources::SourceId;
#[derive(Debug)]
pub struct Config {
pub blocks_capacity: usize,
pub sources_capacity: usize,
pub finalized_block_height: u64,
pub download_bodies: bool,
pub max_requests_per_block: NonZero<u32>,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum UnverifiedBlockState {
HeightHash,
Header {
parent_hash: [u8; 32],
},
HeaderBody {
parent_hash: [u8; 32],
},
}
impl UnverifiedBlockState {
pub fn parent_hash(&self) -> Option<&[u8; 32]> {
match self {
UnverifiedBlockState::HeightHash => None,
UnverifiedBlockState::Header { parent_hash } => Some(parent_hash),
UnverifiedBlockState::HeaderBody { parent_hash } => Some(parent_hash),
}
}
}
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
pub struct PendingBlocks<TBl, TRq, TSrc> {
sources: sources::AllForksSources<Source<TSrc>>,
blocks: disjoint::DisjointBlocks<UnverifiedBlock<TBl>>,
download_bodies: bool,
blocks_requests: BTreeSet<(u64, [u8; 32], RequestId)>,
requested_blocks: BTreeSet<(RequestId, u64, [u8; 32])>,
source_occupations: BTreeSet<(SourceId, RequestId)>,
requests: slab::Slab<Request<TRq>>,
max_requests_per_block: usize,
}
struct UnverifiedBlock<TBl> {
state: UnverifiedBlockState,
user_data: TBl,
}
struct Request<TRq> {
detail: RequestParams,
source_id: SourceId,
user_data: TRq,
}
#[derive(Debug)]
struct Source<TSrc> {
user_data: TSrc,
}
impl<TBl, TRq, TSrc> PendingBlocks<TBl, TRq, TSrc> {
pub fn new(config: Config) -> Self {
PendingBlocks {
sources: sources::AllForksSources::new(
config.sources_capacity,
config.finalized_block_height,
),
blocks: disjoint::DisjointBlocks::with_capacity(config.blocks_capacity),
download_bodies: config.download_bodies,
blocks_requests: Default::default(),
requested_blocks: Default::default(),
source_occupations: Default::default(),
requests: slab::Slab::with_capacity(
config.blocks_capacity
* usize::try_from(config.max_requests_per_block.get()).unwrap_or(usize::MAX),
),
max_requests_per_block: usize::try_from(config.max_requests_per_block.get())
.unwrap_or(usize::MAX),
}
}
pub fn downloading_bodies(&self) -> bool {
self.download_bodies
}
pub fn add_source(
&mut self,
user_data: TSrc,
best_block_number: u64,
best_block_hash: [u8; 32],
) -> SourceId {
self.sources
.add_source(best_block_number, best_block_hash, Source { user_data })
}
pub fn remove_source(
&mut self,
source_id: SourceId,
) -> (TSrc, impl Iterator<Item = (RequestId, RequestParams, TRq)>) {
let user_data = self.sources.remove(source_id);
let source_occupations_entries = self
.source_occupations
.range((source_id, RequestId(usize::MIN))..=(source_id, RequestId(usize::MAX)))
.copied()
.collect::<Vec<_>>();
let mut pending_requests = Vec::new();
for (_source_id, pending_request_id) in source_occupations_entries {
debug_assert_eq!(source_id, _source_id);
debug_assert!(self.requests.contains(pending_request_id.0));
let request = self.requests.remove(pending_request_id.0);
let _was_in = self
.source_occupations
.remove(&(source_id, pending_request_id));
debug_assert!(_was_in);
let _was_in = self.blocks_requests.remove(&(
request.detail.first_block_height,
request.detail.first_block_hash,
pending_request_id,
));
debug_assert!(_was_in);
let _was_in = self.requested_blocks.remove(&(
pending_request_id,
request.detail.first_block_height,
request.detail.first_block_hash,
));
debug_assert!(_was_in);
pending_requests.push((pending_request_id, request.detail, request.user_data));
}
debug_assert_eq!(self.source_occupations.len(), self.requests.len());
(user_data.user_data, pending_requests.into_iter())
}
pub fn sources(&self) -> impl ExactSizeIterator<Item = SourceId> {
self.sources.keys()
}
pub fn sources_user_data_iter_mut(&mut self) -> impl ExactSizeIterator<Item = &mut TSrc> {
self.sources.user_data_iter_mut().map(|s| &mut s.user_data)
}
pub fn add_known_block_to_source(&mut self, source_id: SourceId, height: u64, hash: [u8; 32]) {
self.sources.add_known_block(source_id, height, hash);
}
pub fn remove_known_block_of_source(
&mut self,
source_id: SourceId,
height: u64,
hash: &[u8; 32],
) {
self.sources
.source_remove_known_block(source_id, height, hash);
}
pub fn add_known_block_to_source_and_set_best(
&mut self,
source_id: SourceId,
height: u64,
hash: [u8; 32],
) {
self.sources
.add_known_block_and_set_best(source_id, height, hash);
}
pub fn source_best_block(&self, source_id: SourceId) -> (u64, &[u8; 32]) {
self.sources.best_block(source_id)
}
pub fn source_num_ongoing_requests(&self, source_id: SourceId) -> usize {
self.source_occupations
.range((source_id, RequestId(usize::MIN))..=(source_id, RequestId(usize::MAX)))
.count()
}
pub fn knows_non_finalized_block<'a>(
&'a self,
height: u64,
hash: &[u8; 32],
) -> impl Iterator<Item = SourceId> + use<'a, TBl, TRq, TSrc> {
self.sources.knows_non_finalized_block(height, hash)
}
pub fn source_knows_non_finalized_block(
&self,
source_id: SourceId,
height: u64,
hash: &[u8; 32],
) -> bool {
self.sources
.source_knows_non_finalized_block(source_id, height, hash)
}
pub fn set_finalized_block_height(
&mut self,
height: u64,
) -> impl ExactSizeIterator<Item = TBl> + use<TBl, TRq, TSrc> {
self.sources.set_finalized_block_height(height);
self.blocks
.remove_below_height(height + 1)
.map(|(_, _, bl)| bl.user_data)
}
pub fn insert_unverified_block(
&mut self,
height: u64,
hash: [u8; 32],
state: UnverifiedBlockState,
user_data: TBl,
) -> Option<(TBl, UnverifiedBlockState)> {
if height <= self.sources.finalized_block_height() {
return None;
}
let parent_hash = state.parent_hash().copied();
self.blocks
.insert(
height,
hash,
parent_hash,
UnverifiedBlock { state, user_data },
)
.map(|b| (b.user_data, b.state))
}
pub fn contains_unverified_block(&self, height: u64, hash: &[u8; 32]) -> bool {
self.blocks.contains(height, hash)
}
pub fn unverified_block_user_data(&self, height: u64, hash: &[u8; 32]) -> &TBl {
&self.blocks.user_data(height, hash).unwrap().user_data
}
pub fn unverified_block_user_data_mut(&mut self, height: u64, hash: &[u8; 32]) -> &mut TBl {
&mut self.blocks.user_data_mut(height, hash).unwrap().user_data
}
pub fn set_unverified_block_state(
&mut self,
height: u64,
hash: &[u8; 32],
state: UnverifiedBlockState,
) {
if let Some(parent_hash) = state.parent_hash() {
self.blocks.set_parent_hash(height, hash, *parent_hash);
}
self.blocks.user_data_mut(height, hash).unwrap().state = state;
}
pub fn set_unverified_block_header_known(
&mut self,
height: u64,
hash: &[u8; 32],
parent_hash: [u8; 32],
) {
let curr = &mut self.blocks.user_data_mut(height, hash).unwrap().state;
match curr {
UnverifiedBlockState::Header {
parent_hash: cur_ph,
}
| UnverifiedBlockState::HeaderBody {
parent_hash: cur_ph,
} if *cur_ph == parent_hash => return,
UnverifiedBlockState::Header { .. } | UnverifiedBlockState::HeaderBody { .. } => {
panic!()
}
UnverifiedBlockState::HeightHash => {}
}
*curr = UnverifiedBlockState::Header { parent_hash };
self.blocks.set_parent_hash(height, hash, parent_hash);
}
pub fn set_unverified_block_header_body_known(
&mut self,
height: u64,
hash: &[u8; 32],
parent_hash: [u8; 32],
) {
let curr = &mut self.blocks.user_data_mut(height, hash).unwrap().state;
match curr {
UnverifiedBlockState::Header {
parent_hash: cur_ph,
} if *cur_ph == parent_hash => {}
UnverifiedBlockState::HeaderBody {
parent_hash: cur_ph,
} if *cur_ph == parent_hash => return,
UnverifiedBlockState::Header { .. } | UnverifiedBlockState::HeaderBody { .. } => {
panic!()
}
UnverifiedBlockState::HeightHash => {}
}
*curr = UnverifiedBlockState::HeaderBody { parent_hash };
self.blocks.set_parent_hash(height, hash, parent_hash);
}
pub fn remove_sources_known_block(&mut self, height: u64, hash: &[u8; 32]) {
self.sources.remove_known_block(height, hash);
}
pub fn remove_unverified_block(&mut self, height: u64, hash: &[u8; 32]) -> TBl {
self.blocks.remove(height, hash).user_data
}
#[track_caller]
pub fn mark_unverified_block_as_bad(&mut self, height: u64, hash: &[u8; 32]) {
self.blocks.set_block_bad(height, hash);
}
pub fn num_unverified_blocks(&self) -> usize {
self.blocks.len()
}
pub fn unverified_leaves(&self) -> impl Iterator<Item = TreeRoot> {
self.blocks.good_tree_roots().filter(move |pending| {
match self
.blocks
.user_data(pending.block_number, &pending.block_hash)
.unwrap()
.state
{
UnverifiedBlockState::HeightHash => false,
UnverifiedBlockState::Header { .. } => !self.download_bodies,
UnverifiedBlockState::HeaderBody { .. } => true,
}
})
}
pub fn unnecessary_unverified_blocks(&self) -> impl Iterator<Item = (u64, &[u8; 32])> {
let bad_parent_iter = self
.blocks
.iter()
.filter(|(height, hash, _)| self.blocks.is_parent_bad(*height, hash).unwrap_or(false));
let parent_known_iter = self.blocks.iter().filter(|(height, hash, _)| {
match (
height.checked_sub(1),
self.blocks.parent_hash(*height, hash),
) {
(Some(n), Some(h)) => self.blocks.contains(n, h),
_ => false,
}
});
let bad_iter = self
.blocks
.iter()
.filter(|(height, hash, _)| self.blocks.is_bad(*height, hash).unwrap())
.filter(|(height, hash, _)| !self.blocks.is_parent_bad(*height, hash).unwrap_or(false));
bad_parent_iter
.chain(parent_known_iter)
.chain(bad_iter)
.map(|(height, hash, _)| (height, hash))
.filter(|(height, hash)| {
!self
.sources
.keys()
.any(|source_id| self.sources.best_block(source_id) == (*height, hash))
})
}
pub fn add_request(
&mut self,
source_id: SourceId,
detail: RequestParams,
user_data: TRq,
) -> RequestId {
assert!(self.sources.contains(source_id));
let request_id = RequestId(self.requests.insert(Request {
detail,
source_id,
user_data,
}));
let _was_inserted = self.source_occupations.insert((source_id, request_id));
debug_assert!(_was_inserted);
debug_assert_eq!(self.source_occupations.len(), self.requests.len());
let mut iter = (detail.first_block_height, detail.first_block_hash);
loop {
self.blocks_requests.insert((iter.0, iter.1, request_id));
self.requested_blocks.insert((request_id, iter.0, iter.1));
match self.blocks.parent_hash(iter.0, &iter.1) {
Some(p) => iter = (iter.0 - 1, *p),
None => break,
}
}
request_id
}
#[track_caller]
pub fn remove_request(&mut self, request_id: RequestId) -> (RequestParams, SourceId, TRq) {
assert!(self.requests.contains(request_id.0));
let request = self.requests.remove(request_id.0);
let blocks_to_remove = self
.requested_blocks
.range((request_id, u64::MIN, [0; 32])..=(request_id, u64::MAX, [0xff; 32]))
.cloned()
.collect::<Vec<_>>();
for (request_id, block_height, block_hash) in blocks_to_remove {
let _was_in = self
.blocks_requests
.remove(&(block_height, block_hash, request_id));
debug_assert!(_was_in);
let _was_in = self
.requested_blocks
.remove(&(request_id, block_height, block_hash));
debug_assert!(_was_in);
}
let _was_in = self
.source_occupations
.remove(&(request.source_id, request_id));
debug_assert!(_was_in);
debug_assert_eq!(self.source_occupations.len(), self.requests.len());
debug_assert_eq!(self.blocks_requests.len(), self.requested_blocks.len());
(request.detail, request.source_id, request.user_data)
}
#[track_caller]
pub fn request_source_id(&self, request_id: RequestId) -> SourceId {
self.requests.get(request_id.0).unwrap().source_id
}
pub fn obsolete_requests(&self) -> impl Iterator<Item = (RequestId, &TRq)> {
self.requests
.iter()
.filter(move |(_, rq)| {
rq.detail.first_block_height <= self.sources.finalized_block_height()
})
.map(|(id, rq)| (RequestId(id), &rq.user_data))
}
pub fn desired_requests(&self) -> impl Iterator<Item = DesiredRequest> {
self.desired_requests_inner(None)
}
pub fn source_desired_requests(
&self,
source_id: SourceId,
) -> impl Iterator<Item = RequestParams> {
self.desired_requests_inner(Some(source_id)).map(move |rq| {
debug_assert_eq!(rq.source_id, source_id);
rq.request_params
})
}
fn desired_requests_inner(
&self,
force_source: Option<SourceId>,
) -> impl Iterator<Item = DesiredRequest> {
let unknown_body_iter = if self.download_bodies {
either::Left(
self.blocks
.iter()
.filter(move |(_, _, block_info)| {
matches!(&block_info.state, UnverifiedBlockState::Header { .. })
})
.map(|(height, hash, _)| (height, hash)),
)
} else {
either::Right(iter::empty())
};
let unknown_header_iter = self
.blocks
.unknown_blocks()
.filter(move |(unknown_block_height, _)| {
*unknown_block_height > self.sources.finalized_block_height()
})
.inspect(move |(unknown_block_height, unknown_block_hash)| {
debug_assert!(match self
.blocks
.user_data(*unknown_block_height, unknown_block_hash)
.map(|ud| &ud.state)
{
None | Some(UnverifiedBlockState::HeightHash) => true,
Some(
UnverifiedBlockState::Header { .. }
| UnverifiedBlockState::HeaderBody { .. },
) => false,
});
});
unknown_body_iter
.map(|(n, h)| (n, h, false))
.chain(unknown_header_iter.map(|(n, h)| (n, h, true)))
.filter(move |(unknown_block_height, unknown_block_hash, _)| {
let num_existing_requests = self
.blocks_requests
.range(
(
*unknown_block_height,
**unknown_block_hash,
RequestId(usize::MIN),
)
..=(
*unknown_block_height,
**unknown_block_hash,
RequestId(usize::MAX),
),
)
.count();
num_existing_requests < self.max_requests_per_block
})
.flat_map(
move |(unknown_block_height, unknown_block_hash, download_many)| {
let possible_sources =
if let Some(force_source) = force_source {
either::Left(iter::once(force_source).filter(move |id| {
self.sources.source_knows_non_finalized_block(
*id,
unknown_block_height,
unknown_block_hash,
)
}))
} else {
either::Right(self.sources.knows_non_finalized_block(
unknown_block_height,
unknown_block_hash,
))
};
possible_sources
.filter(move |source_id| {
!self
.blocks_requests
.range(
(
unknown_block_height,
*unknown_block_hash,
RequestId(usize::MIN),
)
..=(
unknown_block_height,
*unknown_block_hash,
RequestId(usize::MAX),
),
)
.any(|(_, _, request_id)| {
self.requests[request_id.0].source_id == *source_id
})
})
.map(move |source_id| {
debug_assert!(self.sources.source_knows_non_finalized_block(
source_id,
unknown_block_height,
unknown_block_hash
));
DesiredRequest {
source_id,
request_params: RequestParams {
first_block_hash: *unknown_block_hash,
first_block_height: unknown_block_height,
num_blocks: NonZero::<u64>::new(if download_many {
unknown_block_height - self.sources.finalized_block_height()
} else {
1
})
.unwrap(),
},
}
})
},
)
}
}
impl<TBl, TRq, TSrc> ops::Index<SourceId> for PendingBlocks<TBl, TRq, TSrc> {
type Output = TSrc;
#[track_caller]
fn index(&self, id: SourceId) -> &TSrc {
&self.sources[id].user_data
}
}
impl<TBl, TRq, TSrc> ops::IndexMut<SourceId> for PendingBlocks<TBl, TRq, TSrc> {
#[track_caller]
fn index_mut(&mut self, id: SourceId) -> &mut TSrc {
&mut self.sources[id].user_data
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DesiredRequest {
pub source_id: SourceId,
pub request_params: RequestParams,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RequestParams {
pub first_block_height: u64,
pub first_block_hash: [u8; 32],
pub num_blocks: NonZero<u64>,
}