use crate::chain::fork_tree;
use alloc::vec::Vec;
use core::{cmp, mem, ops, time::Duration};
pub use fork_tree::NodeIndex;
/// Identifier of an asynchronous operation tracked by an [`AsyncTree`].
///
/// Allocated from a monotonically-increasing `u64` counter; an identifier is
/// never reused within the same tree.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct AsyncOpId(u64);
/// Outcome of [`AsyncTree::next_necessary_async_op`].
#[derive(Debug)]
pub enum NextNecessaryAsyncOp<TNow> {
/// An asynchronous operation must be started now, with the given parameters.
Ready(AsyncOpParams),
/// No operation needs to be started right now. `when` is the earliest
/// moment at which calling again might return `Ready`, or `None` if no
/// such moment is known.
NotReady { when: Option<TNow> },
}
/// Parameters of an asynchronous operation that must be started.
#[derive(Debug)]
pub struct AsyncOpParams {
/// Identifier freshly allocated for this operation. Must be passed back to
/// [`AsyncTree::async_op_finished`] or [`AsyncTree::async_op_failure`].
pub id: AsyncOpId,
/// Index of the block the operation concerns.
pub block_index: NodeIndex,
}
/// Configuration for [`AsyncTree::new`].
pub struct Config<TAsync> {
/// Async operation user data attributed to the finalized block at
/// initialization.
pub finalized_async_user_data: TAsync,
/// Delay to wait before retrying an operation after it has failed.
pub retry_after_failed: Duration,
/// Number of blocks to pre-allocate capacity for in the internal container.
pub blocks_capacity: usize,
}
/// Tree of blocks, each annotated with the state of a user-defined
/// asynchronous operation to be performed on it.
///
/// The tree has an "input" side, fed through the `input_*` methods, and an
/// "output" side that lags behind and only advances (through
/// [`AsyncTree::try_advance_output`]) once the operations have finished.
pub struct AsyncTree<TNow, TBl, TAsync> {
/// All input non-finalized blocks, each holding its async operation state.
non_finalized_blocks: fork_tree::ForkTree<Block<TNow, TBl, TAsync>>,
/// Async operation user data of the current output finalized block.
output_finalized_async_user_data: TAsync,
/// Output-side best block, or `None` if no non-finalized block is
/// currently reported as best.
output_best_block_index: Option<fork_tree::NodeIndex>,
/// Block finalized on the input side but whose finality hasn't been
/// reported on the output side yet. Reset to `None` once reported
/// (see `try_advance_output`).
input_finalized_index: Option<fork_tree::NodeIndex>,
/// Input-side best block. `None` presumably denotes the finalized block —
/// see `input_set_best_block`.
input_best_block_index: Option<fork_tree::NodeIndex>,
/// Weight to attribute to the next block that becomes the input best.
/// Invariant: strictly greater than every weight currently in the tree.
input_best_block_next_weight: u32,
/// Weight of the output finalized block.
output_finalized_block_weight: u32,
/// Identifier to attribute to the next asynchronous operation.
next_async_op_id: AsyncOpId,
/// Delay before retrying an operation that has failed
/// (see [`AsyncTree::async_op_failure`]).
retry_after_failed: Duration,
}
impl<TNow, TBl, TAsync> AsyncTree<TNow, TBl, TAsync>
where
TNow: Clone + ops::Add<Duration, Output = TNow> + Ord,
TAsync: Clone,
{
pub fn new(config: Config<TAsync>) -> Self {
AsyncTree {
output_best_block_index: None,
output_finalized_async_user_data: config.finalized_async_user_data,
non_finalized_blocks: fork_tree::ForkTree::with_capacity(config.blocks_capacity),
input_finalized_index: None,
input_best_block_index: None,
input_best_block_next_weight: 2,
output_finalized_block_weight: 1, next_async_op_id: AsyncOpId(0),
retry_after_failed: config.retry_after_failed,
}
}
/// Returns the number of non-finalized blocks currently held in the tree.
pub fn num_input_non_finalized_blocks(&self) -> usize {
    let tree = &self.non_finalized_blocks;
    tree.len()
}
/// Consumes the tree and returns an identical one whose async operation
/// user data has been transformed through `map`.
///
/// `map` is invoked for the finalized block's user data and for every
/// block in the `Finished` state.
pub fn map_async_op_user_data<TAsync2>(
    self,
    mut map: impl FnMut(TAsync) -> TAsync2,
) -> AsyncTree<TNow, TBl, TAsync2> {
    // The finalized block's user data is converted first, then each
    // non-finalized block's state is converted in turn.
    let output_finalized_async_user_data = map(self.output_finalized_async_user_data);

    let non_finalized_blocks = self.non_finalized_blocks.map(move |block| {
        let async_op = match block.async_op {
            // Only `Finished` carries a `TAsync`; the other two variants are
            // rebuilt unchanged because the type parameter changes.
            AsyncOpState::Finished {
                user_data,
                reported,
            } => AsyncOpState::Finished {
                user_data: map(user_data),
                reported,
            },
            AsyncOpState::InProgress {
                async_op_id,
                timeout,
            } => AsyncOpState::InProgress {
                async_op_id,
                timeout,
            },
            AsyncOpState::Pending {
                same_as_parent,
                timeout,
            } => AsyncOpState::Pending {
                same_as_parent,
                timeout,
            },
        };

        Block {
            user_data: block.user_data,
            async_op,
            input_best_block_weight: block.input_best_block_weight,
        }
    });

    AsyncTree {
        non_finalized_blocks,
        output_finalized_async_user_data,
        output_best_block_index: self.output_best_block_index,
        input_finalized_index: self.input_finalized_index,
        input_best_block_index: self.input_best_block_index,
        input_best_block_next_weight: self.input_best_block_next_weight,
        output_finalized_block_weight: self.output_finalized_block_weight,
        next_async_op_id: self.next_async_op_id,
        retry_after_failed: self.retry_after_failed,
    }
}
/// Returns the index of the output best block and its async operation user
/// data, or `None` if there is no output best block.
pub fn output_best_block_index(&self) -> Option<(NodeIndex, &TAsync)> {
    let best_block_index = self.output_best_block_index?;
    let block = self.non_finalized_blocks.get(best_block_index).unwrap();
    match &block.async_op {
        AsyncOpState::Finished {
            reported: true,
            user_data,
        } => Some((best_block_index, user_data)),
        // Invariant: the output best block is always a block whose operation
        // has finished and been reported.
        _ => unreachable!(),
    }
}
/// Returns the async operation user data of the output finalized block.
pub fn output_finalized_async_user_data(&self) -> &TAsync {
    let user_data = &self.output_finalized_async_user_data;
    user_data
}
/// Returns the async operation user data of the given block, or `None` if
/// the operation isn't finished. Panics if `node_index` is invalid.
pub fn block_async_user_data(&self, node_index: NodeIndex) -> Option<&TAsync> {
    let block = self.non_finalized_blocks.get(node_index).unwrap();
    if let AsyncOpState::Finished { user_data, .. } = &block.async_op {
        Some(user_data)
    } else {
        None
    }
}
/// Mutable equivalent of [`AsyncTree::block_async_user_data`]. Panics if
/// `node_index` is invalid.
pub fn block_async_user_data_mut(&mut self, node_index: NodeIndex) -> Option<&mut TAsync> {
    let block = self.non_finalized_blocks.get_mut(node_index).unwrap();
    if let AsyncOpState::Finished { user_data, .. } = &mut block.async_op {
        Some(user_data)
    } else {
        None
    }
}
/// Returns the parent of the given node, or `None` if it is a root.
pub fn parent(&self, node: NodeIndex) -> Option<NodeIndex> {
    let tree = &self.non_finalized_blocks;
    tree.parent(node)
}
/// Returns an iterator over the ancestors of the given node.
pub fn ancestors(&self, node: NodeIndex) -> impl Iterator<Item = NodeIndex> {
    let tree = &self.non_finalized_blocks;
    tree.ancestors(node)
}
/// Returns an iterator over the direct children of the given node, or over
/// the roots of the tree if `node` is `None`.
pub fn children(&self, node: Option<NodeIndex>) -> impl Iterator<Item = NodeIndex> {
    let tree = &self.non_finalized_blocks;
    tree.children(node)
}
/// Returns the index of the current input best block, if any.
pub fn input_best_block_index(&self) -> Option<NodeIndex> {
    let best = self.input_best_block_index;
    best
}
/// Iterates over all blocks in the tree, parents always yielded before
/// their children, exposing both input and output information.
pub fn input_output_iter_ancestry_order(
    &self,
) -> impl Iterator<Item = InputIterItem<'_, TBl, TAsync>> {
    self.non_finalized_blocks
        .iter_ancestry_order()
        .map(move |(id, block)| {
            // The async user data is only exposed once it has been reported
            // to the output side.
            let async_op_user_data = if let AsyncOpState::Finished {
                reported: true,
                user_data,
            } = &block.async_op
            {
                Some(user_data)
            } else {
                None
            };
            let is_output_best = self.output_best_block_index == Some(id);
            InputIterItem {
                id,
                user_data: &block.user_data,
                async_op_user_data,
                is_output_best,
            }
        })
}
/// Iterates over all blocks in the tree in no particular order, exposing
/// both input and output information.
pub fn input_output_iter_unordered(
    &self,
) -> impl Iterator<Item = InputIterItem<'_, TBl, TAsync>> {
    self.non_finalized_blocks
        .iter_unordered()
        .map(move |(id, block)| {
            // The async user data is only exposed once it has been reported
            // to the output side.
            let async_op_user_data = if let AsyncOpState::Finished {
                reported: true,
                user_data,
            } = &block.async_op
            {
                Some(user_data)
            } else {
                None
            };
            let is_output_best = self.output_best_block_index == Some(id);
            InputIterItem {
                id,
                user_data: &block.user_data,
                async_op_user_data,
                is_output_best,
            }
        })
}
/// Returns an iterator over the user data of every block whose operation in
/// progress is `async_op_id`.
pub fn async_op_blocks(&self, async_op_id: AsyncOpId) -> impl Iterator<Item = &TBl> {
    self.non_finalized_blocks
        .iter_unordered()
        .filter_map(move |(_, block)| match block.async_op {
            AsyncOpState::InProgress {
                async_op_id: id, ..
            } if id == async_op_id => Some(&block.user_data),
            _ => None,
        })
}
/// Injects into the state machine the fact that the operation `async_op_id`
/// has successfully finished, producing `user_data`.
///
/// Every block whose operation in progress is `async_op_id` is switched to
/// the `Finished` (not-yet-reported) state, each holding a clone of
/// `user_data`.
///
/// Returns the indices of the blocks that were updated.
pub fn async_op_finished(&mut self, async_op_id: AsyncOpId, user_data: TAsync) -> Vec<NodeIndex> {
    // Note: the `TAsync: Clone` bound comes from the `impl` block; the
    // original duplicated it in a redundant `where` clause.
    //
    // First collect the indices of every block running this operation; the
    // collect is needed because the loop below takes `&mut self`.
    let updated_blocks = self
        .non_finalized_blocks
        .iter_unordered()
        .filter(|(_, block)| {
            matches!(block.async_op,
                AsyncOpState::InProgress {
                    async_op_id: id, ..
                } if id == async_op_id)
        })
        .map(|(index, _)| index)
        .collect::<Vec<_>>();

    // All entries in `updated_blocks` matched the filter above, so they can
    // be switched to `Finished` unconditionally.
    for index in &updated_blocks {
        let block = self.non_finalized_blocks.get_mut(*index).unwrap();
        block.async_op = AsyncOpState::Finished {
            user_data: user_data.clone(),
            reported: false,
        };
    }

    updated_blocks
}
/// Injects into the state machine the fact that the operation `async_op_id`
/// has failed.
///
/// Every block whose operation in progress is `async_op_id` is moved back
/// to the `Pending` state, with a retry timeout of `now + retry_after_failed`
/// (or the block's existing timeout, whichever is earlier).
pub fn async_op_failure(&mut self, async_op_id: AsyncOpId, now: &TNow) {
let new_timeout = now.clone() + self.retry_after_failed;
// Iterate in *reverse* ancestry order (children before parents): when
// computing `same_as_parent` for a block, its parent hasn't been switched
// to `Pending` yet and is therefore still seen as `InProgress`.
for index in self
.non_finalized_blocks
.iter_ancestry_order()
.map(|(index, _)| index)
.collect::<Vec<_>>()
.into_iter()
.rev()
{
// Shadowing: the effective timeout for this block is the earliest of its
// existing timeout (if any) and the retry timeout computed above. Blocks
// not running `async_op_id` are skipped entirely.
let new_timeout = match self.non_finalized_blocks.get_mut(index).unwrap().async_op {
AsyncOpState::InProgress {
async_op_id: id,
timeout: Some(ref timeout),
} if id == async_op_id => Some(cmp::min(timeout.clone(), new_timeout.clone())),
AsyncOpState::InProgress {
async_op_id: id,
timeout: None,
} if id == async_op_id => Some(new_timeout.clone()),
_ => continue,
};
// The block keeps sharing its parent's operation if the parent is still
// running the same failed operation (i.e. hasn't been processed yet in
// this reverse iteration).
let same_as_parent = self
.non_finalized_blocks
.parent(index)
.map_or(false, |idx| {
match self.non_finalized_blocks.get(idx).unwrap().async_op {
AsyncOpState::InProgress {
async_op_id: id, ..
} => id == async_op_id,
_ => false,
}
});
self.non_finalized_blocks.get_mut(index).unwrap().async_op = AsyncOpState::Pending {
same_as_parent,
timeout: new_timeout,
};
}
}
/// Finds a block in the tree whose asynchronous operation should be started
/// now, switches it (and any blocks chained to it) to the "in progress"
/// state, and returns the parameters of the operation to start.
///
/// Blocks are tried in priority order: the input finalized block first, then
/// the block with the highest best-block weight, then every other block.
///
/// If nothing can be started, returns `NotReady` with the earliest moment at
/// which calling again might succeed, if known.
pub fn next_necessary_async_op(&mut self, now: &TNow) -> NextNecessaryAsyncOp<TNow> {
    // Combines two optional wake-up moments, keeping the earliest. The
    // original duplicated this four-arm `match` verbatim in all three
    // phases below.
    fn earliest<T: Ord>(a: Option<T>, b: Option<T>) -> Option<T> {
        match (a, b) {
            (Some(a), Some(b)) => Some(cmp::min(a, b)),
            (a, b) => a.or(b),
        }
    }

    let mut when_not_ready = None;

    // Phase 1: the input finalized block, if any, has the highest priority.
    if let Some(idx) = self.input_finalized_index {
        match self.start_necessary_async_op(idx, now) {
            NextNecessaryAsyncOpInternal::Ready(async_op_id, block_index) => {
                return NextNecessaryAsyncOp::Ready(AsyncOpParams {
                    id: async_op_id,
                    block_index,
                });
            }
            NextNecessaryAsyncOpInternal::NotReady { when } => {
                when_not_ready = earliest(when_not_ready, when);
            }
        }
    }

    // Phase 2: the block with the highest input best block weight.
    if let Some((idx, _)) = self
        .non_finalized_blocks
        .iter_unordered()
        .max_by_key(|(_, b)| b.input_best_block_weight)
    {
        match self.start_necessary_async_op(idx, now) {
            NextNecessaryAsyncOpInternal::Ready(async_op_id, block_index) => {
                return NextNecessaryAsyncOp::Ready(AsyncOpParams {
                    id: async_op_id,
                    block_index,
                });
            }
            NextNecessaryAsyncOpInternal::NotReady { when } => {
                when_not_ready = earliest(when_not_ready, when);
            }
        }
    }

    // Phase 3: every other block, in no particular order. The indices are
    // collected first because `start_necessary_async_op` needs `&mut self`.
    for idx in self
        .non_finalized_blocks
        .iter_unordered()
        .map(|(idx, _)| idx)
        .collect::<Vec<_>>()
    {
        match self.start_necessary_async_op(idx, now) {
            NextNecessaryAsyncOpInternal::Ready(async_op_id, block_index) => {
                return NextNecessaryAsyncOp::Ready(AsyncOpParams {
                    id: async_op_id,
                    block_index,
                });
            }
            NextNecessaryAsyncOpInternal::NotReady { when } => {
                when_not_ready = earliest(when_not_ready, when);
            }
        }
    }

    NextNecessaryAsyncOp::NotReady {
        when: when_not_ready,
    }
}
/// If the block at `block_index` is ready for an operation to be started
/// (state `Pending`, not inheriting from its parent, and past its retry
/// timeout), allocates a new [`AsyncOpId`], switches the block — together
/// with every descendant chained to it through
/// `Pending { same_as_parent: true }` — to `InProgress`, and returns `Ready`.
///
/// Otherwise returns `NotReady`, carrying the block's retry timeout if the
/// timeout is the only obstacle.
fn start_necessary_async_op(
&mut self,
block_index: NodeIndex,
now: &TNow,
) -> NextNecessaryAsyncOpInternal<TNow> {
match self
.non_finalized_blocks
.get_mut(block_index)
.unwrap()
.async_op
{
// Pending with no timeout, or with a timeout that has elapsed: fall
// through and start the operation.
AsyncOpState::Pending {
same_as_parent: false,
ref timeout,
..
} if timeout.as_ref().map_or(true, |t| t <= now) => {}
// Pending but the retry timeout hasn't been reached yet.
AsyncOpState::Pending {
same_as_parent: false,
ref timeout,
..
} => {
return NextNecessaryAsyncOpInternal::NotReady {
when: timeout.clone(),
};
}
// Any other state: nothing to start for this block.
_ => return NextNecessaryAsyncOpInternal::NotReady { when: None },
};
// Allocate a fresh operation identifier.
let async_op_id = self.next_async_op_id;
self.next_async_op_id.0 += 1;
// Collect every block that must share this operation: each descendant of
// `block_index` whose entire path below `block_index` (that descendant
// included) consists of `Pending { same_as_parent: true }` blocks.
// Relies on `is_ancestor`/`node_to_root_path` treating a node as its own
// ancestor/path start, which makes `block_index` itself qualify (its path
// check below is vacuous) — backed by the `debug_assert!` further down.
let mut to_update = Vec::new();
for (child_index, _) in self.non_finalized_blocks.iter_unordered() {
if !self
.non_finalized_blocks
.is_ancestor(block_index, child_index)
{
continue;
}
if !self
.non_finalized_blocks
.node_to_root_path(child_index)
.take_while(|idx| *idx != block_index)
.all(|idx| {
matches!(
self.non_finalized_blocks.get(idx).unwrap().async_op,
AsyncOpState::Pending {
same_as_parent: true,
..
}
)
})
{
continue;
}
to_update.push(child_index);
}
debug_assert!(to_update.iter().any(|idx| *idx == block_index));
// Switch all collected blocks to `InProgress`, sharing the same id.
for to_update in to_update {
self.non_finalized_blocks
.get_mut(to_update)
.unwrap()
.async_op = AsyncOpState::InProgress {
async_op_id,
timeout: None,
};
}
NextNecessaryAsyncOpInternal::Ready(async_op_id, block_index)
}
/// Inserts a new block in the input side of the tree.
///
/// `parent_index` is `None` if the parent is the finalized block. If
/// `same_async_op_as_parent` is `true`, the new block inherits its parent's
/// asynchronous operation state instead of requiring a fresh operation. If
/// `is_new_best` is `true`, the new block becomes the input best block.
///
/// Returns the index attributed to the new block.
pub fn input_insert_block(
&mut self,
block: TBl,
parent_index: Option<NodeIndex>,
same_async_op_as_parent: bool,
is_new_best: bool,
) -> NodeIndex {
// A new best block receives a weight strictly greater than every weight
// attributed so far; non-best blocks get weight 0.
let input_best_block_weight = if is_new_best {
let id = self.input_best_block_next_weight;
debug_assert!(
self.non_finalized_blocks
.iter_unordered()
.all(|(_, b)| b.input_best_block_weight < id)
);
self.input_best_block_next_weight += 1;
id
} else {
0
};
// Determine the initial operation state of the new block.
let async_op = match (same_async_op_as_parent, parent_index) {
(true, Some(parent_index)) => {
match &self
.non_finalized_blocks
.get(parent_index)
.unwrap()
.async_op
{
// Parent's operation is still running: share its identifier.
AsyncOpState::InProgress { async_op_id, .. } => AsyncOpState::InProgress {
async_op_id: *async_op_id,
timeout: None,
},
// Parent's operation has finished: reuse its result, but it still
// needs to be reported to the output for this block.
AsyncOpState::Finished { user_data, .. } => AsyncOpState::Finished {
user_data: user_data.clone(),
reported: false,
},
// Parent hasn't started anything yet: chain to it so that the
// parent's future operation covers this block as well.
AsyncOpState::Pending { .. } => AsyncOpState::Pending {
same_as_parent: true,
timeout: None,
},
}
}
// Parent is the finalized block: reuse the finalized block's result.
(true, None) => AsyncOpState::Finished {
user_data: self.output_finalized_async_user_data.clone(),
reported: false,
},
// A dedicated operation must be performed for this block.
(false, _) => AsyncOpState::Pending {
same_as_parent: false,
timeout: None,
},
};
let new_index = self.non_finalized_blocks.insert(
parent_index,
Block {
user_data: block,
async_op,
input_best_block_weight,
},
);
if is_new_best {
self.input_best_block_index = Some(new_index);
}
new_index
}
/// Updates the input best block. `None` presumably denotes the finalized
/// block — TODO(review): confirm against callers.
///
/// # Panics
///
/// Panics if `new_best_block` isn't in the tree, or if an input finalized
/// block is set and `new_best_block` doesn't descend from it (or is `None`).
pub fn input_set_best_block(&mut self, new_best_block: Option<NodeIndex>) {
// Consistency check between the new best block and the input finalized
// block, if any.
assert!(match (self.input_finalized_index, new_best_block) {
(Some(f), Some(b)) => self.non_finalized_blocks.is_ancestor(f, b),
// Can't revert the best to the finalized block while a newer input
// finalized block is pending.
(Some(_), None) => false,
(None, Some(b)) => {
assert!(self.non_finalized_blocks.contains(b));
true
}
(None, None) => true,
});
self.input_best_block_index = new_best_block;
// Bump the weight of the target block (or of the finalized block when
// `None`) so that it becomes the highest in the tree — unless it already
// holds the most recently attributed weight.
match new_best_block
.map(|new_best_block| {
&mut self
.non_finalized_blocks
.get_mut(new_best_block)
.unwrap()
.input_best_block_weight
})
.unwrap_or(&mut self.output_finalized_block_weight)
{
// Already the latest weight: nothing to do.
w if *w == self.input_best_block_next_weight - 1 => {}
w => {
*w = self.input_best_block_next_weight;
self.input_best_block_next_weight += 1;
}
}
// Invariant: all weights stay strictly below the next weight to attribute.
debug_assert!(
self.non_finalized_blocks
.iter_unordered()
.all(|(_, b)| b.input_best_block_weight < self.input_best_block_next_weight)
);
}
/// Updates the input finalized block.
///
/// # Panics
///
/// Panics if there is no input best block, or if `node_to_finalize` isn't an
/// ancestor of the input best block.
pub fn input_finalize(&mut self, node_to_finalize: NodeIndex) {
    // The block to finalize must lie on the path leading to the current
    // input best block.
    assert!(
        self.input_best_block_index
            .map_or(false, |current_best| {
                self.non_finalized_blocks
                    .is_ancestor(node_to_finalize, current_best)
            })
    );
    self.input_finalized_index = Some(node_to_finalize);
}
/// Tries to advance the output side of the tree toward the input side.
///
/// Returns at most one update per call; call repeatedly until `None` is
/// returned to drain all pending updates. Priority order: finality
/// advancement, then newly-reported blocks, then best block changes.
pub fn try_advance_output(&mut self) -> Option<OutputUpdate<TBl, TAsync>> {
// Phase 1: try to advance output finality.
if let Some(input_finalized_index) = self.input_finalized_index {
// Because of the `take(1)`, only the first block on the path from the
// current output finalized block toward the input finalized block is
// considered; finality therefore advances at most one block per call,
// and only if that block's operation is finished and reported.
let new_finalized = {
self.non_finalized_blocks
.root_to_node_path(input_finalized_index)
.take(1)
.find(|node_index| {
matches!(
self.non_finalized_blocks.get(*node_index).unwrap().async_op,
AsyncOpState::Finished { reported: true, .. }
)
})
};
if let Some(new_finalized) = new_finalized {
// Clear the input indices if they point to the block being finalized,
// since its `NodeIndex` is invalidated by the pruning below.
if self.input_finalized_index == Some(new_finalized) {
self.input_finalized_index = None;
}
if self.input_best_block_index == Some(new_finalized) {
self.input_best_block_index = None;
}
let mut pruned_blocks = Vec::new();
let mut pruned_finalized = None;
let mut best_output_block_updated = false;
if self.output_best_block_index.is_none() {
best_output_block_updated = true;
}
// Remove the newly-finalized block and every block not descending
// from it.
for pruned in self.non_finalized_blocks.prune_ancestors(new_finalized) {
debug_assert_ne!(Some(pruned.index), self.input_finalized_index);
// If the pruned block was the output best, the output best must be
// recalculated below.
if self
.output_best_block_index
.map_or(false, |b| b == pruned.index)
{
self.output_best_block_index = None;
best_output_block_updated = true;
}
// The newly-finalized block itself is extracted separately rather
// than pushed to `pruned_blocks`.
if pruned.index == new_finalized {
self.output_finalized_block_weight =
pruned.user_data.input_best_block_weight;
pruned_finalized = Some(pruned);
continue;
}
// Only expose the async user data of blocks whose result had
// already been reported to the output.
let async_op = match pruned.user_data.async_op {
AsyncOpState::Finished {
user_data,
reported,
..
} => {
if reported { Some(user_data) } else { None }
}
_ => None,
};
pruned_blocks.push((pruned.index, pruned.user_data.user_data, async_op));
}
// Recalculate the output best block among the remaining reported
// blocks, since the former best might have been pruned.
let mut previously_reported_best_block_weight = match self.output_best_block_index {
None => self.output_finalized_block_weight,
Some(idx) => {
self.non_finalized_blocks
.get(idx)
.unwrap()
.input_best_block_weight
}
};
for (node_index, block) in self.non_finalized_blocks.iter_unordered() {
// Weights are unique except for the 0 attributed to never-best blocks.
debug_assert!(
block.input_best_block_weight != previously_reported_best_block_weight
|| block.input_best_block_weight == 0
|| self.output_best_block_index == Some(node_index)
);
if block.input_best_block_weight <= previously_reported_best_block_weight {
continue;
}
// Only blocks already reported to the output can become the output best.
if !matches!(
block.async_op,
AsyncOpState::Finished { reported: true, .. }
) {
continue;
}
previously_reported_best_block_weight = block.input_best_block_weight;
self.output_best_block_index = Some(node_index);
best_output_block_updated = true;
}
// `pruned_finalized` was necessarily set above: the pruning iterator
// yields `new_finalized` itself (hence this `unwrap`).
let pruned_finalized = pruned_finalized.unwrap();
let former_finalized_async_op_user_data = match pruned_finalized.user_data.async_op
{
AsyncOpState::Finished { user_data, .. } => {
// Swap in the newly-finalized block's async user data and hand the
// previous one back to the caller.
mem::replace(&mut self.output_finalized_async_user_data, user_data)
}
// `new_finalized` was selected precisely because its state is
// `Finished { reported: true, .. }`.
_ => unreachable!(),
};
return Some(OutputUpdate::Finalized {
former_index: new_finalized,
user_data: pruned_finalized.user_data.user_data,
former_finalized_async_op_user_data,
pruned_blocks,
best_output_block_updated,
});
}
}
// Phase 2: report one new block — a block whose operation is finished but
// not yet reported, and whose parent (if any) has already been reported.
for node_index in self
.non_finalized_blocks
.iter_unordered()
.map(|(idx, _)| idx)
.collect::<Vec<_>>()
{
if let Some(parent) = self.non_finalized_blocks.parent(node_index) {
if !matches!(
self.non_finalized_blocks.get(parent).unwrap().async_op,
AsyncOpState::Finished { reported: true, .. }
) {
continue;
}
}
match &mut self
.non_finalized_blocks
.get_mut(node_index)
.unwrap()
.async_op
{
AsyncOpState::Finished { reported, .. } if !*reported => {
*reported = true;
}
_ => continue,
}
// The newly-reported block becomes the output best if its weight exceeds
// the current output best's (or the finalized block's, if none).
let is_new_best = self
.non_finalized_blocks
.get(node_index)
.unwrap()
.input_best_block_weight
> self
.output_best_block_index
.map_or(self.output_finalized_block_weight, |idx| {
self.non_finalized_blocks
.get(idx)
.unwrap()
.input_best_block_weight
});
if is_new_best {
debug_assert_ne!(self.output_best_block_index, Some(node_index));
self.output_best_block_index = Some(node_index);
}
return Some(OutputUpdate::Block(OutputUpdateBlock {
index: node_index,
is_new_best,
}));
}
// Phase 3: check whether the output best block changed among the
// already-reported blocks (e.g. after `input_set_best_block`).
{
let mut best_block_updated = false;
let mut current_runtime_service_best_block_weight = match self.output_best_block_index {
None => self.output_finalized_block_weight,
Some(idx) => {
self.non_finalized_blocks
.get(idx)
.unwrap()
.input_best_block_weight
}
};
for (node_index, block) in self.non_finalized_blocks.iter_unordered() {
debug_assert!(
block.input_best_block_weight != current_runtime_service_best_block_weight
|| block.input_best_block_weight == 0
|| self.output_best_block_index == Some(node_index)
);
if block.input_best_block_weight <= current_runtime_service_best_block_weight {
continue;
}
if !matches!(
block.async_op,
AsyncOpState::Finished { reported: true, .. }
) {
continue;
}
current_runtime_service_best_block_weight = block.input_best_block_weight;
self.output_best_block_index = Some(node_index);
best_block_updated = true;
}
if best_block_updated {
return Some(OutputUpdate::BestBlockChanged {
best_block_index: self.output_best_block_index,
});
}
}
// Nothing to report.
None
}
}
impl<TNow, TBl, TAsync> ops::Index<NodeIndex> for AsyncTree<TNow, TBl, TAsync> {
    type Output = TBl;

    /// Returns the user data of the block at the given index.
    ///
    /// Panics if `node_index` is invalid.
    fn index(&self, node_index: NodeIndex) -> &Self::Output {
        let block = self.non_finalized_blocks.get(node_index).unwrap();
        &block.user_data
    }
}
impl<TNow, TBl, TAsync> ops::IndexMut<NodeIndex> for AsyncTree<TNow, TBl, TAsync> {
    /// Returns the user data of the block at the given index.
    ///
    /// Panics if `node_index` is invalid.
    fn index_mut(&mut self, node_index: NodeIndex) -> &mut Self::Output {
        let block = self.non_finalized_blocks.get_mut(node_index).unwrap();
        &mut block.user_data
    }
}
/// Item yielded by [`AsyncTree::input_output_iter_ancestry_order`] and
/// [`AsyncTree::input_output_iter_unordered`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputIterItem<'a, TBl, TAsync> {
/// Index of the block in the tree.
pub id: NodeIndex,
/// User data of the block.
pub user_data: &'a TBl,
/// User data of the finished async operation, but only if it has been
/// reported to the output side; `None` otherwise.
pub async_op_user_data: Option<&'a TAsync>,
/// `true` if this block is the current output best block.
pub is_output_best: bool,
}
/// Update to the output side of an [`AsyncTree`], as returned by
/// [`AsyncTree::try_advance_output`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputUpdate<TBl, TAsync> {
/// The output finalized block has changed.
Finalized {
/// Index the newly-finalized block had in the tree. This index, as well
/// as the indices in `pruned_blocks`, are no longer valid.
former_index: NodeIndex,
/// User data of the newly-finalized block, extracted from the tree.
user_data: TBl,
/// Async operation user data of the *previous* output finalized block,
/// now replaced by the new finalized block's.
former_finalized_async_op_user_data: TAsync,
/// `true` if the output best block may have changed as a consequence of
/// this finalization.
best_output_block_updated: bool,
/// Blocks removed from the tree because they don't descend from the new
/// finalized block. The `Option<TAsync>` is `Some` only for blocks whose
/// operation result had already been reported to the output.
pruned_blocks: Vec<(NodeIndex, TBl, Option<TAsync>)>,
},
/// A block has been reported to the output side for the first time.
Block(OutputUpdateBlock),
/// The output best block has changed.
BestBlockChanged {
/// New output best block. NOTE(review): as emitted by
/// `try_advance_output` in this file this is always `Some`; confirm
/// whether `None` is meant to denote the finalized block.
best_block_index: Option<NodeIndex>,
},
}
/// See [`OutputUpdate::Block`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutputUpdateBlock {
/// Index of the newly-reported block in the tree.
pub index: NodeIndex,
/// `true` if this block is now the output best block.
pub is_new_best: bool,
}
/// One block stored in the [`AsyncTree`].
struct Block<TNow, TBl, TAsync> {
/// Opaque user data decided by the API user.
user_data: TBl,
/// State of the asynchronous operation attached to this block.
async_op: AsyncOpState<TNow, TAsync>,
/// Weight used to decide which block is the best: `0` for blocks that have
/// never been the input best block, otherwise the value of
/// `input_best_block_next_weight` at the time they became best. A higher
/// value means "became best more recently".
input_best_block_weight: u32,
}
/// State of the asynchronous operation attached to one block of the tree.
enum AsyncOpState<TNow, TAsync> {
/// The operation has finished.
Finished {
/// User data produced by the operation.
user_data: TAsync,
/// `true` if this result has already been reported to the output side
/// of the tree.
reported: bool,
},
/// The operation is currently in progress.
InProgress {
/// Identifier of the operation in progress.
async_op_id: AsyncOpId,
/// NOTE(review): only `None` is ever stored in this field within this
/// file, yet `async_op_failure` reads it when computing the retry
/// timeout — confirm the intended semantics.
timeout: Option<TNow>,
},
/// The operation hasn't been started yet.
Pending {
/// If `true`, this block must share the operation of its parent rather
/// than start one of its own.
same_as_parent: bool,
/// Moment before which the operation must not be (re)started, set after
/// a failure. `None` means it can start at any time.
timeout: Option<TNow>,
},
}
/// Internal equivalent of [`NextNecessaryAsyncOp`], used as the return type
/// of `start_necessary_async_op`.
#[derive(Debug)]
enum NextNecessaryAsyncOpInternal<TNow> {
/// An operation has been started; carries the allocated [`AsyncOpId`] and
/// the index of the block it concerns.
Ready(AsyncOpId, NodeIndex),
/// Nothing was started for this block. `when` is the block's retry
/// timeout, if the timeout is the only obstacle.
NotReady { when: Option<TNow> },
}