use sc_client_api::{blockchain::Backend as _, Backend, HeaderBackend as _};
use sp_blockchain::{HashAndNumber, HeaderMetadata, TreeRoute};
use sp_runtime::traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
const LOG_TARGET: &str = "level-monitor";
pub const MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT: usize = 32;
const CLEANUP_THRESHOLD: u32 = 32;
pub enum LevelLimit {
Default,
None,
Some(usize),
}
pub struct LevelMonitor<Block: BlockT, BE> {
level_limit: usize,
pub(crate) import_counter: NumberFor<Block>,
pub(crate) freshness: HashMap<Block::Hash, NumberFor<Block>>,
pub(crate) levels: HashMap<NumberFor<Block>, HashSet<Block::Hash>>,
lowest_level: NumberFor<Block>,
backend: Arc<BE>,
}
struct TargetInfo<Block: BlockT> {
freshest_leaf_idx: usize,
freshest_route: TreeRoute<Block>,
}
impl<Block, BE> LevelMonitor<Block, BE>
where
Block: BlockT,
BE: Backend<Block>,
{
pub fn new(level_limit: usize, backend: Arc<BE>) -> Self {
let mut monitor = LevelMonitor {
level_limit,
import_counter: Zero::zero(),
freshness: HashMap::new(),
levels: HashMap::new(),
lowest_level: Zero::zero(),
backend,
};
monitor.restore();
monitor
}
fn restore(&mut self) {
let info = self.backend.blockchain().info();
log::debug!(
target: LOG_TARGET,
"Restoring chain level monitor from last finalized block: {} {}",
info.finalized_number,
info.finalized_hash
);
self.lowest_level = info.finalized_number;
self.import_counter = info.finalized_number;
for leaf in self.backend.blockchain().leaves().unwrap_or_default() {
let Ok(mut meta) = self.backend.blockchain().header_metadata(leaf) else {
log::debug!(
target: LOG_TARGET,
"Could not fetch header metadata for leaf: {leaf:?}",
);
continue;
};
self.import_counter = self.import_counter.max(meta.number);
while !self.freshness.contains_key(&meta.hash) {
self.freshness.insert(meta.hash, meta.number);
self.levels.entry(meta.number).or_default().insert(meta.hash);
if meta.number <= self.lowest_level {
break;
}
meta = match self.backend.blockchain().header_metadata(meta.parent) {
Ok(m) => m,
Err(_) => {
log::debug!(
target: LOG_TARGET,
"Could not fetch header metadata for parent: {:?}",
meta.parent,
);
break;
},
}
}
}
log::debug!(
target: LOG_TARGET,
"Restored chain level monitor up to height {}",
self.import_counter
);
}
pub fn enforce_limit(&mut self, number: NumberFor<Block>) {
let level_len = self.levels.get(&number).map(|l| l.len()).unwrap_or_default();
if level_len < self.level_limit {
return;
}
let mut leaves = self.backend.blockchain().leaves().unwrap_or_default();
leaves.sort_unstable_by(|a, b| self.freshness.get(a).cmp(&self.freshness.get(b)));
let mut invalidated_leaves = HashSet::new();
let remove_count = level_len - self.level_limit + 1;
log::debug!(
target: LOG_TARGET,
"Detected leaves overflow at height {number}, removing {remove_count} obsolete blocks",
);
(0..remove_count).all(|_| {
self.find_target(number, &leaves, &invalidated_leaves).map_or(false, |target| {
self.remove_target(target, number, &leaves, &mut invalidated_leaves);
true
})
});
}
fn find_target(
&self,
number: NumberFor<Block>,
leaves: &[Block::Hash],
invalidated_leaves: &HashSet<usize>,
) -> Option<TargetInfo<Block>> {
let mut target_info: Option<TargetInfo<Block>> = None;
let blockchain = self.backend.blockchain();
let best_hash = blockchain.info().best_hash;
let mut assigned_leaves = HashSet::new();
let level = self.levels.get(&number)?;
for blk_hash in level.iter().filter(|hash| **hash != best_hash) {
let candidate_info = leaves
.iter()
.enumerate()
.filter(|(leaf_idx, _)| {
!assigned_leaves.contains(leaf_idx) && !invalidated_leaves.contains(leaf_idx)
})
.rev()
.find_map(|(leaf_idx, leaf_hash)| {
if blk_hash == leaf_hash {
let entry = HashAndNumber { number, hash: *blk_hash };
TreeRoute::new(vec![entry], 0).ok().map(|freshest_route| TargetInfo {
freshest_leaf_idx: leaf_idx,
freshest_route,
})
} else {
match sp_blockchain::tree_route(blockchain, *blk_hash, *leaf_hash) {
Ok(route) if route.retracted().is_empty() => Some(TargetInfo {
freshest_leaf_idx: leaf_idx,
freshest_route: route,
}),
Err(err) => {
log::warn!(
target: LOG_TARGET,
"(Lookup) Unable getting route from {:?} to {:?}: {}",
blk_hash,
leaf_hash,
err,
);
None
},
_ => None,
}
}
});
let candidate_info = match candidate_info {
Some(candidate_info) => {
assigned_leaves.insert(candidate_info.freshest_leaf_idx);
candidate_info
},
None => {
log::error!(
target: LOG_TARGET,
"Unable getting route to any leaf from {:?} (this is a bug)",
blk_hash,
);
continue;
},
};
let is_less_fresh = || {
target_info
.as_ref()
.map(|ti| candidate_info.freshest_leaf_idx < ti.freshest_leaf_idx)
.unwrap_or(true)
};
let not_contains_best = || {
candidate_info
.freshest_route
.enacted()
.iter()
.all(|entry| entry.hash != best_hash)
};
if is_less_fresh() && not_contains_best() {
let early_stop = candidate_info.freshest_leaf_idx == 0;
target_info = Some(candidate_info);
if early_stop {
break;
}
}
}
target_info
}
fn remove_target(
&mut self,
target: TargetInfo<Block>,
number: NumberFor<Block>,
leaves: &[Block::Hash],
invalidated_leaves: &mut HashSet<usize>,
) {
let mut remove_leaf = |number, hash| {
log::debug!(target: LOG_TARGET, "Removing block (@{}) {:?}", number, hash);
if let Err(err) = self.backend.remove_leaf_block(hash) {
log::debug!(target: LOG_TARGET, "Remove not possible for {}: {}", hash, err);
return false;
}
self.levels.get_mut(&number).map(|level| level.remove(&hash));
self.freshness.remove(&hash);
true
};
invalidated_leaves.insert(target.freshest_leaf_idx);
let mut remove_route = |route: TreeRoute<Block>| {
route.enacted().iter().rev().all(|elem| remove_leaf(elem.number, elem.hash));
};
let target_hash = target.freshest_route.common_block().hash;
debug_assert_eq!(
target.freshest_route.common_block().number,
number,
"This is a bug in LevelMonitor::find_target() or the Backend is corrupted"
);
remove_route(target.freshest_route);
let to_skip = leaves.len() - target.freshest_leaf_idx;
leaves.iter().enumerate().rev().skip(to_skip).for_each(|(leaf_idx, leaf_hash)| {
if invalidated_leaves.contains(&leaf_idx) {
return;
}
match sp_blockchain::tree_route(self.backend.blockchain(), target_hash, *leaf_hash) {
Ok(route) if route.retracted().is_empty() => {
invalidated_leaves.insert(leaf_idx);
remove_route(route);
},
Err(err) => {
log::warn!(
target: LOG_TARGET,
"(Removal) unable getting route from {:?} to {:?}: {}",
target_hash,
leaf_hash,
err,
);
},
_ => (),
};
});
remove_leaf(number, target_hash);
}
pub fn block_imported(&mut self, number: NumberFor<Block>, hash: Block::Hash) {
let finalized_num = self.backend.blockchain().info().finalized_number;
if number > finalized_num {
self.import_counter += One::one();
self.freshness.insert(hash, self.import_counter);
self.levels.entry(number).or_default().insert(hash);
}
let delta: u32 = finalized_num.saturating_sub(self.lowest_level).unique_saturated_into();
if delta >= CLEANUP_THRESHOLD {
for i in 0..delta {
let number = self.lowest_level + i.unique_saturated_into();
self.levels.remove(&number).map(|level| {
level.iter().for_each(|hash| {
self.freshness.remove(hash);
})
});
}
self.lowest_level = finalized_num;
}
}
}