use std::collections::{HashMap, HashSet, hash_map};
use std::sync::Arc;
use std::time::Duration;
use blockstore::Blockstore;
use lumina_utils::executor::{JoinHandle, spawn};
use lumina_utils::time::{Instant, sleep};
use tendermint::Time;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
use crate::daser::{Daser, DaserError};
use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::P2pError;
use crate::store::{BlockRanges, Store, StoreError};
use crate::utils::TimeExt;
const MAX_PRUNABLE_BATCH_SIZE: u64 = 512;
type Result<T, E = PrunerError> = std::result::Result<T, E>;
#[derive(Debug, thiserror::Error)]
pub(crate) enum PrunerError {
#[error("P2p: {0}")]
P2p(#[from] P2pError),
#[error("Syncer: {0}")]
Store(#[from] StoreError),
#[error("Blockstore: {0}")]
Blockstore(#[from] blockstore::Error),
#[error("Daser: {0}")]
Daser(#[from] DaserError),
}
pub(crate) struct Pruner {
cancellation_token: CancellationToken,
join_handle: JoinHandle,
}
pub(crate) struct PrunerArgs<S, B>
where
S: Store,
B: Blockstore,
{
pub daser: Arc<Daser>,
pub store: Arc<S>,
pub blockstore: Arc<B>,
pub event_pub: EventPublisher,
pub block_time: Duration,
pub pruning_window: Duration,
pub sampling_window: Duration,
}
impl Pruner {
pub(crate) fn start<S, B>(args: PrunerArgs<S, B>) -> Self
where
S: Store + 'static,
B: Blockstore + 'static,
{
let cancellation_token = CancellationToken::new();
let event_pub = args.event_pub.clone();
let mut worker = Worker::new(args, cancellation_token.child_token());
let join_handle = spawn(async move {
if let Err(e) = worker.run().await {
error!("Pruner stopped because of a fatal error: {e}");
event_pub.send(NodeEvent::FatalPrunerError {
error: e.to_string(),
});
}
});
Pruner {
cancellation_token,
join_handle,
}
}
pub(crate) fn stop(&self) {
self.cancellation_token.cancel();
}
pub(crate) async fn join(&self) {
self.join_handle.join().await;
}
}
impl Drop for Pruner {
fn drop(&mut self) {
self.stop();
}
}
struct Worker<S, B>
where
S: Store + 'static,
B: Blockstore + 'static,
{
daser: Arc<Daser>,
cancellation_token: CancellationToken,
event_pub: EventPublisher,
store: Arc<S>,
blockstore: Arc<B>,
block_time: Duration,
pruning_window: Duration,
sampling_window: Duration,
prev_num_of_prunable_blocks: u64,
cache: Cache,
}
#[derive(Default)]
struct Cache {
updated_at: Option<Instant>,
after_pruning_window: Option<u64>,
after_sampling_window: Option<u64>,
block_info: HashMap<u64, BlockInfo>,
keep_block_info: HashSet<u64>,
}
#[derive(Debug, Clone)]
struct BlockInfo {
height: u64,
time: Time,
}
impl Cache {
fn garbage_collect(&mut self) {
self.block_info.retain(|&height, _| {
self.after_pruning_window == Some(height)
|| self.after_sampling_window == Some(height)
|| self.keep_block_info.contains(&height)
});
}
async fn get_block_info<S>(&mut self, store: &S, height: u64) -> Result<BlockInfo>
where
S: Store,
{
match self.block_info.entry(height) {
hash_map::Entry::Occupied(entry) => Ok(entry.get().to_owned()),
hash_map::Entry::Vacant(entry) => {
let header = store.get_by_height(height).await?;
let info = BlockInfo {
height: header.height(),
time: header.time(),
};
entry.insert(info.clone());
Ok(info)
}
}
}
}
impl<S, B> Worker<S, B>
where
S: Store,
B: Blockstore,
{
fn new(args: PrunerArgs<S, B>, cancellation_token: CancellationToken) -> Self {
Worker {
cancellation_token,
event_pub: args.event_pub,
store: args.store,
blockstore: args.blockstore,
block_time: args.block_time,
pruning_window: args.pruning_window,
sampling_window: args.sampling_window,
daser: args.daser,
prev_num_of_prunable_blocks: 0,
cache: Cache {
updated_at: None,
after_pruning_window: None,
after_sampling_window: None,
block_info: HashMap::new(),
keep_block_info: HashSet::new(),
},
}
}
async fn run(&mut self) -> Result<()> {
loop {
if self.cancellation_token.is_cancelled() {
break;
}
let now = Time::now();
let sampling_window_end = now.saturating_sub(self.sampling_window);
let pruning_window_end = now.saturating_sub(self.pruning_window);
let prunable_batch = self
.get_next_prunable_batch(sampling_window_end, pruning_window_end)
.await?;
if prunable_batch.is_empty() {
select! {
_ = self.cancellation_token.cancelled() => break,
_ = sleep(self.block_time) => continue,
}
}
info!("Going to prune {} blocks", prunable_batch.len());
for range in prunable_batch.into_inner() {
let from_height = *range.start();
let mut to_height = None;
for height in range {
if self.cancellation_token.is_cancelled() {
break;
}
let cids = self
.store
.get_sampling_metadata(height)
.await?
.map(|m| m.cids)
.unwrap_or_default();
for cid in cids {
self.blockstore.remove(&cid).await?;
}
self.store.remove_height(height).await?;
to_height = Some(height);
}
if let Some(to_height) = to_height {
self.event_pub.send(NodeEvent::PrunedHeaders {
from_height,
to_height,
});
}
}
}
debug!("Pruner stopped");
Ok(())
}
async fn update_cached_data(
&mut self,
stored_blocks: &BlockRanges,
sampling_cutoff: &Time,
pruning_cutoff: &Time,
) -> Result<()> {
let update_after = Duration::from_secs(1).min(self.block_time);
if self
.cache
.updated_at
.is_some_and(|updated_at| updated_at.elapsed() < update_after)
{
return Ok(());
}
let after_sampling_window = find_height_after_window(
&*self.store,
stored_blocks,
sampling_cutoff,
self.cache.after_sampling_window,
&mut self.cache,
)
.await?;
let after_pruning_window = find_height_after_window(
&*self.store,
stored_blocks,
pruning_cutoff,
self.cache.after_pruning_window,
&mut self.cache,
)
.await?;
if self.cache.after_sampling_window < after_sampling_window {
self.cache.after_sampling_window = after_sampling_window;
}
if self.cache.after_pruning_window < after_pruning_window {
self.cache.after_pruning_window = after_pruning_window;
if let Some(height) = after_pruning_window {
self.daser.update_highest_prunable_block(height).await?;
}
}
self.cache.keep_block_info.clear();
if let Some(stored_tail) = stored_blocks.tail() {
self.cache.keep_block_info.insert(stored_tail);
}
if let Some(sampling_window_tail) = self
.cache
.after_sampling_window
.and_then(|height| stored_blocks.right_of(height))
{
self.cache.keep_block_info.insert(sampling_window_tail);
}
if let Some(pruning_window_tail) = self
.cache
.after_pruning_window
.and_then(|height| stored_blocks.right_of(height))
{
self.cache.keep_block_info.insert(pruning_window_tail);
}
self.cache.garbage_collect();
self.cache.updated_at = Some(Instant::now());
Ok(())
}
async fn get_next_prunable_batch(
&mut self,
sampling_cutoff: Time,
pruning_cutoff: Time,
) -> Result<BlockRanges> {
let stored_ranges = self.store.get_stored_header_ranges().await?;
let pruned_ranges = self.store.get_pruned_ranges().await?;
let sampled_ranges = self.store.get_sampled_ranges().await?;
self.update_cached_data(&stored_ranges, &sampling_cutoff, &pruning_cutoff)
.await?;
let non_sampling_area = self
.cache
.after_sampling_window
.map(|height| BlockRanges::try_from(1..=height).expect("never fails"))
.unwrap_or_default();
let prunable_area = self
.cache
.after_pruning_window
.map(|height| BlockRanges::try_from(1..=height).expect("never fails"))
.unwrap_or_default();
let synced_ranges = pruned_ranges + &stored_ranges;
let edges = synced_ranges.edges();
let prune_candidates = stored_ranges & &prunable_area;
let after_sampling_window = prune_candidates.clone() & &non_sampling_area;
let prunable_and_sampled =
(prune_candidates.clone() - &after_sampling_window - &edges) & &sampled_ranges;
let num_of_prunable_blocks = after_sampling_window.len() + prunable_and_sampled.len();
if self.prev_num_of_prunable_blocks != num_of_prunable_blocks {
self.daser
.update_number_of_prunable_blocks(num_of_prunable_blocks)
.await?;
self.prev_num_of_prunable_blocks = num_of_prunable_blocks;
}
let mut prunable_batch = prunable_and_sampled.headn(MAX_PRUNABLE_BATCH_SIZE);
for height in after_sampling_window.rev() {
if prunable_batch.len() == MAX_PRUNABLE_BATCH_SIZE {
break;
}
if sampled_ranges.contains(height) || self.daser.want_to_prune(height).await? {
prunable_batch
.insert_relaxed(height..=height)
.expect("never fails");
}
}
Ok(prunable_batch)
}
}
async fn find_height_after_window<S>(
store: &S,
stored_headers: &BlockRanges,
cutoff: &Time,
prev_after_window: Option<u64>,
cache: &mut Cache,
) -> Result<Option<u64>>
where
S: Store,
{
if let Some(res) =
find_height_after_window_fast(store, stored_headers, cutoff, prev_after_window, cache)
.await?
{
return Ok(res);
}
find_height_after_window_slow(store, stored_headers, cutoff, cache).await
}
async fn find_height_after_window_fast<S>(
store: &S,
stored_headers: &BlockRanges,
cutoff: &Time,
prev_after_window: Option<u64>,
cache: &mut Cache,
) -> Result<Option<Option<u64>>>
where
S: Store,
{
match prev_after_window {
Some(prev_after_window) => match stored_headers.right_of(prev_after_window) {
Some(right_of_after_window) => {
let block_info = cache.get_block_info(store, right_of_after_window).await?;
if *cutoff < block_info.time {
if stored_headers.contains(prev_after_window) {
return Ok(Some(Some(prev_after_window)));
} else {
return Ok(Some(stored_headers.left_of(prev_after_window)));
}
}
match stored_headers.right_of(right_of_after_window) {
Some(right_of_right) => {
let block_info = cache.get_block_info(store, right_of_right).await?;
if *cutoff < block_info.time {
return Ok(Some(Some(right_of_after_window)));
}
}
None => return Ok(Some(Some(right_of_after_window))),
}
}
None => {
if stored_headers.contains(prev_after_window) {
return Ok(Some(Some(prev_after_window)));
} else {
return Ok(Some(stored_headers.left_of(prev_after_window)));
}
}
},
None => {
let Some(tail) = stored_headers.tail() else {
return Ok(Some(None));
};
let block_info = cache.get_block_info(store, tail).await?;
if *cutoff < block_info.time {
return Ok(Some(None));
}
}
}
Ok(None)
}
async fn find_height_after_window_slow<S>(
store: &S,
stored_headers: &BlockRanges,
cutoff: &Time,
cache: &mut Cache,
) -> Result<Option<u64>>
where
S: Store,
{
let mut ranges = stored_headers.to_owned();
let mut highest: Option<BlockInfo> = None;
while let Some((left, middle, right)) = ranges.partitions() {
let middle = cache.get_block_info(store, middle).await?;
if middle.time < *cutoff {
if highest
.as_ref()
.is_none_or(|highest| highest.time < middle.time)
{
highest = Some(middle);
}
ranges = right;
} else {
ranges = left;
}
}
Ok(highest.map(|block_info| block_info.height))
}
#[cfg(test)]
mod test {
use blockstore::block::{Block, CidError};
use celestia_types::test_utils::ExtendedHeaderGenerator;
use cid::CidGeneric;
use cid::multihash::Multihash;
use super::*;
use crate::blockstore::InMemoryBlockstore;
use crate::events::{EventChannel, EventSubscriber, TryRecvError};
use crate::node::{DEFAULT_PRUNING_WINDOW, SAMPLING_WINDOW};
use crate::store::InMemoryStore;
use crate::test_utils::{ExtendedHeaderGeneratorExt, gen_filled_store, new_block_ranges};
use lumina_utils::test_utils::async_test;
use lumina_utils::time::timeout;
const TEST_CODEC: u64 = 0x0D;
const TEST_MH_CODE: u64 = 0x0D;
#[async_test]
async fn prunable_height() {
let now = Time::now();
let store = InMemoryStore::new();
let mut cache = Cache::default();
let pruning_window = Duration::from_secs(60);
let mut generator = ExtendedHeaderGenerator::new();
let first_header_time = (now - Duration::from_secs(120)).unwrap();
generator.set_time(first_header_time, Duration::from_secs(1));
store.insert(generator.next_many(120)).await.unwrap();
let stored_headers = store.get_stored_header_ranges().await.unwrap();
let pruning_cutoff = now.saturating_sub(pruning_window);
assert_eq!(
find_height_after_window_slow(&store, &stored_headers, &pruning_cutoff, &mut cache)
.await
.unwrap(),
Some(59)
);
}
#[async_test]
async fn prunable_height_with_gaps() {
let now = Time::now();
let store = InMemoryStore::new();
let mut cache = Cache::default();
let pruning_window = Duration::from_secs(60);
let mut generator = ExtendedHeaderGenerator::new();
let first_header_time = (now - Duration::from_secs(120)).unwrap();
generator.set_time(first_header_time, Duration::from_secs(1));
let headers = generator.next_many(120);
store.insert(&headers[0..10]).await.unwrap();
store.insert(&headers[20..30]).await.unwrap();
store.insert(&headers[65..70]).await.unwrap();
store.insert(&headers[80..90]).await.unwrap();
store.insert(&headers[100..120]).await.unwrap();
let stored_headers = store.get_stored_header_ranges().await.unwrap();
let pruning_cutoff = now.saturating_sub(pruning_window);
assert_eq!(
find_height_after_window_slow(&store, &stored_headers, &pruning_cutoff, &mut cache)
.await
.unwrap(),
Some(30)
);
store.insert(&headers[58..=64]).await.unwrap();
assert_eq!(
find_height_after_window_slow(&store, &stored_headers, &pruning_cutoff, &mut cache)
.await
.unwrap(),
Some(30)
);
}
#[async_test]
async fn prunable_height_beyond_genesis() {
let now = Time::now();
let store = InMemoryStore::new();
let mut cache = Cache::default();
let pruning_window = Duration::from_secs(121);
let mut generator = ExtendedHeaderGenerator::new();
let first_header_time = (now - Duration::from_secs(120)).unwrap();
generator.set_time(first_header_time, Duration::from_secs(1));
let headers = generator.next_many(120);
store.insert(headers).await.unwrap();
let stored_headers = store.get_stored_header_ranges().await.unwrap();
let pruning_cutoff = now.saturating_sub(pruning_window);
assert_eq!(
find_height_after_window_slow(&store, &stored_headers, &pruning_cutoff, &mut cache)
.await
.unwrap(),
None
);
}
#[async_test]
async fn prunable_height_cached_empty_store() {
let now = Time::now();
let store = InMemoryStore::new();
let mut cache = Cache::default();
let pruning_window = Duration::from_secs(60);
let stored_headers = store.get_stored_header_ranges().await.unwrap();
let pruning_cutoff = now.saturating_sub(pruning_window);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
None,
&mut cache
)
.await
.unwrap(),
Some(None)
);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
Some(40),
&mut cache
)
.await
.unwrap(),
Some(None)
);
}
#[async_test]
async fn prunable_height_cached_all_blocks_in_window() {
let now = Time::now();
let store = InMemoryStore::new();
let mut cache = Cache::default();
let pruning_window = Duration::from_secs(60);
let mut generator = ExtendedHeaderGenerator::new();
let first_header_time = (now - Duration::from_secs(120)).unwrap();
generator.set_time(first_header_time, Duration::from_secs(1));
let headers = generator.next_many(120);
store.insert(&headers[110..120]).await.unwrap();
let stored_headers = store.get_stored_header_ranges().await.unwrap();
let pruning_cutoff = now.saturating_sub(pruning_window);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
None,
&mut cache
)
.await
.unwrap(),
Some(None)
);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
Some(40),
&mut cache
)
.await
.unwrap(),
Some(None)
);
}
#[async_test]
async fn prunable_height_cached_all_blocks_out_of_window() {
let now = Time::now();
let store = InMemoryStore::new();
let mut cache = Cache::default();
let pruning_window = Duration::from_secs(60);
let mut generator = ExtendedHeaderGenerator::new();
let first_header_time = (now - Duration::from_secs(120)).unwrap();
generator.set_time(first_header_time, Duration::from_secs(1));
let headers = generator.next_many(120);
store.insert(&headers[30..40]).await.unwrap();
let stored_headers = store.get_stored_header_ranges().await.unwrap();
let pruning_cutoff = now.saturating_sub(pruning_window);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
None,
&mut cache
)
.await
.unwrap(),
None
);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
Some(50),
&mut cache
)
.await
.unwrap(),
Some(Some(40))
);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
Some(40),
&mut cache
)
.await
.unwrap(),
Some(Some(40))
);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
Some(39),
&mut cache
)
.await
.unwrap(),
Some(Some(40))
);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
Some(38),
&mut cache
)
.await
.unwrap(),
None
);
}
#[async_test]
async fn prunable_height_cached_mixed() {
let now = Time::now();
let store = InMemoryStore::new();
let mut cache = Cache::default();
let pruning_window = Duration::from_secs(60);
let mut generator = ExtendedHeaderGenerator::new();
let first_header_time = (now - Duration::from_secs(120)).unwrap();
generator.set_time(first_header_time, Duration::from_secs(1));
let headers = generator.next_many(120);
store.insert(&headers[30..40]).await.unwrap();
store.insert(&headers[110..120]).await.unwrap();
let stored_headers = store.get_stored_header_ranges().await.unwrap();
let pruning_cutoff = now.saturating_sub(pruning_window);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
None,
&mut cache
)
.await
.unwrap(),
None
);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
Some(50),
&mut cache
)
.await
.unwrap(),
Some(Some(40))
);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
Some(40),
&mut cache
)
.await
.unwrap(),
Some(Some(40))
);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
Some(39),
&mut cache
)
.await
.unwrap(),
Some(Some(40))
);
assert_eq!(
find_height_after_window_fast(
&store,
&stored_headers,
&pruning_cutoff,
Some(38),
&mut cache
)
.await
.unwrap(),
None
);
}
#[async_test]
async fn empty_store() {
let events = EventChannel::new();
let store = Arc::new(InMemoryStore::new());
let blockstore = Arc::new(InMemoryBlockstore::new());
let mut event_subscriber = events.subscribe();
let (daser, mut daser_handle) = Daser::mocked();
let pruner = Pruner::start(PrunerArgs {
daser: Arc::new(daser),
store,
blockstore,
event_pub: events.publisher(),
block_time: Duration::from_secs(1),
pruning_window: DEFAULT_PRUNING_WINDOW,
sampling_window: SAMPLING_WINDOW,
});
sleep(Duration::from_secs(1)).await;
daser_handle.expect_no_cmd().await;
pruner.stop();
pruner.join().await;
assert!(matches!(
event_subscriber.try_recv().unwrap_err(),
TryRecvError::Empty
));
}
#[async_test]
async fn nothing_to_prune() {
let events = EventChannel::new();
let (store, _gen) = gen_filled_store(100).await;
let store = Arc::new(store);
let blockstore = Arc::new(InMemoryBlockstore::new());
let mut event_subscriber = events.subscribe();
let (daser, mut daser_handle) = Daser::mocked();
let pruner = Pruner::start(PrunerArgs {
daser: Arc::new(daser),
store: store.clone(),
blockstore,
event_pub: events.publisher(),
block_time: Duration::from_secs(1),
pruning_window: DEFAULT_PRUNING_WINDOW,
sampling_window: SAMPLING_WINDOW,
});
sleep(Duration::from_secs(1)).await;
daser_handle.expect_no_cmd().await;
pruner.stop();
pruner.join().await;
assert!(matches!(
event_subscriber.try_recv().unwrap_err(),
TryRecvError::Empty
));
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([1..=100])
);
}
#[async_test]
async fn prune_large_tail_with_cids() {
let events = EventChannel::new();
let store = Arc::new(InMemoryStore::new());
let mut generator = ExtendedHeaderGenerator::new();
let blockstore = Arc::new(InMemoryBlockstore::new());
let mut event_subscriber = events.subscribe();
let (daser, mut daser_handle) = Daser::mocked();
let first_header_time = (Time::now()
- (DEFAULT_PRUNING_WINDOW + Duration::from_secs(30 * 24 * 60 * 60)))
.unwrap();
generator.set_time(first_header_time, Duration::from_secs(1));
let blocks_with_sampling = (1..=500)
.chain(601..=1000)
.map(|height| {
let block = TestBlock::from(height);
let sampled = height % 3 == 0;
(height, block, block.cid().unwrap(), sampled)
})
.collect::<Vec<_>>();
store
.insert(generator.next_many_verified(500))
.await
.unwrap();
generator.skip(100);
store
.insert(generator.next_many_verified(400))
.await
.unwrap();
for (height, block, cid, sampled) in &blocks_with_sampling {
blockstore.put_keyed(cid, block.data()).await.unwrap();
if *sampled {
store.mark_as_sampled(*height).await.unwrap();
}
store
.update_sampling_metadata(*height, vec![*cid])
.await
.unwrap();
}
let stored_ranges = store.get_stored_header_ranges().await.unwrap();
let sampled_ranges = store.get_sampled_ranges().await.unwrap();
let pruner = Pruner::start(PrunerArgs {
daser: Arc::new(daser),
store: store.clone(),
blockstore: blockstore.clone(),
event_pub: events.publisher(),
block_time: Duration::from_secs(1),
pruning_window: DEFAULT_PRUNING_WINDOW,
sampling_window: SAMPLING_WINDOW,
});
assert_eq!(
daser_handle.expect_update_highest_prunable_block().await,
1000
);
assert_eq!(
daser_handle.expect_update_number_of_prunable_blocks().await,
900
);
for height in (601..=1000).rev().chain((1..=500).rev()) {
assert!(stored_ranges.contains(height));
if height == 388 {
assert_eq!(
daser_handle.expect_update_number_of_prunable_blocks().await,
388
);
}
if !sampled_ranges.contains(height) {
let (want_to_prune, respond_to) = daser_handle.expect_want_to_prune().await;
assert_eq!(want_to_prune, height);
respond_to.send(true).unwrap();
}
}
assert_eq!(
daser_handle.expect_update_number_of_prunable_blocks().await,
0
);
daser_handle.expect_no_cmd().await;
assert_pruned_headers_event(&mut event_subscriber, 389, 500).await;
assert_pruned_headers_event(&mut event_subscriber, 601, 1000).await;
assert_pruned_headers_event(&mut event_subscriber, 1, 388).await;
assert!(store.get_stored_header_ranges().await.unwrap().is_empty());
for (height, _, cid, _) in &blocks_with_sampling {
assert!(matches!(
store.get_sampling_metadata(*height).await.unwrap_err(),
StoreError::NotFound
));
assert!(!blockstore.has(cid).await.unwrap());
}
pruner.stop();
pruner.join().await;
assert!(matches!(
event_subscriber.try_recv().unwrap_err(),
TryRecvError::Empty
));
}
#[async_test]
async fn prune_tail() {
let block_time = Duration::from_millis(1);
let pruning_window = Duration::from_millis(4000);
let sampling_window = Duration::from_millis(500);
let events = EventChannel::new();
let store = Arc::new(InMemoryStore::new());
let mut generator = ExtendedHeaderGenerator::new();
let blockstore = Arc::new(InMemoryBlockstore::new());
let mut event_subscriber = events.subscribe();
let (daser, mut daser_handle) = Daser::mocked();
let first_header_time = (Time::now() - Duration::from_millis(5000)).unwrap();
generator.set_time(first_header_time, block_time);
store
.insert(generator.next_many_verified(10))
.await
.unwrap();
generator.skip(2480);
store
.insert(generator.next_many_verified(10))
.await
.unwrap();
generator.skip(2490);
store
.insert(generator.next_many_verified(10))
.await
.unwrap();
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([1..=10, 2491..=2500, 4991..=5000]),
);
let pruner = Pruner::start(PrunerArgs {
daser: Arc::new(daser),
store: store.clone(),
blockstore,
event_pub: events.publisher(),
block_time,
pruning_window,
sampling_window,
});
assert_eq!(
daser_handle.expect_update_highest_prunable_block().await,
10
);
assert_eq!(
daser_handle.expect_update_number_of_prunable_blocks().await,
10
);
for expected_height in (1..=10).rev() {
let (height, respond_to) = daser_handle.expect_want_to_prune().await;
assert_eq!(height, expected_height);
respond_to.send(true).unwrap();
}
assert_eq!(
daser_handle.expect_update_number_of_prunable_blocks().await,
0
);
assert_pruned_headers_event(&mut event_subscriber, 1, 10).await;
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([2491..=2500, 4991..=5000]),
);
sleep(Duration::from_millis(1500)).await;
let batch1_high_height = daser_handle.expect_update_highest_prunable_block().await;
assert!((2491..=2500).contains(&batch1_high_height));
let batch1_num_of_prunable_blocks =
daser_handle.expect_update_number_of_prunable_blocks().await;
assert!((1..=10).contains(&batch1_num_of_prunable_blocks));
for expected_height in (2491..=batch1_high_height).rev() {
let (height, respond_to) = daser_handle.expect_want_to_prune().await;
assert_eq!(height, expected_height);
respond_to.send(true).unwrap();
}
assert_pruned_headers_event(&mut event_subscriber, 2491, batch1_high_height).await;
if batch1_high_height < 2500 {
assert_eq!(
daser_handle.expect_update_highest_prunable_block().await,
2500
);
assert_eq!(
daser_handle.expect_update_number_of_prunable_blocks().await,
10 - batch1_num_of_prunable_blocks
);
for expected_height in (batch1_high_height + 1..=2500).rev() {
let (height, respond_to) = daser_handle.expect_want_to_prune().await;
assert_eq!(height, expected_height);
respond_to.send(true).unwrap();
}
assert_pruned_headers_event(&mut event_subscriber, batch1_high_height + 1, 2500).await;
}
assert_eq!(
daser_handle.expect_update_number_of_prunable_blocks().await,
0
);
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([4991..=5000]),
);
daser_handle.expect_no_cmd().await;
pruner.stop();
pruner.join().await;
assert!(matches!(
event_subscriber.try_recv().unwrap_err(),
TryRecvError::Empty
));
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([4991..=5000])
);
}
#[async_test]
async fn sampling_window_bigger_than_pruning_window() {
let pruning_window = Duration::from_secs(60);
let sampling_window = Duration::from_secs(120);
let mut generator = ExtendedHeaderGenerator::new();
let store = Arc::new(InMemoryStore::new());
let blockstore = Arc::new(InMemoryBlockstore::new());
let first_header_time = (Time::now() - Duration::from_secs(260)).unwrap();
generator.set_time(first_header_time, Duration::from_secs(1));
store
.insert(generator.next_many_empty_verified(120))
.await
.unwrap();
generator.skip(25);
store
.insert(generator.next_many_empty_verified(10))
.await
.unwrap();
generator.skip(10);
store
.insert(generator.next_many_empty_verified(10))
.await
.unwrap();
generator.skip(10);
store
.insert(generator.next_many_empty_verified(14))
.await
.unwrap();
generator.skip(2);
store
.insert(generator.next_many_empty_verified(59))
.await
.unwrap();
store.remove_height(188).await.unwrap();
store.remove_height(189).await.unwrap();
store.remove_height(192).await.unwrap();
for height in (1..=100).chain(103..=195).chain(199..=210) {
let _ = store.mark_as_sampled(height).await;
}
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([
1..=120,
146..=155,
166..=175,
186..=187,
190..=191,
193..=199,
202..=260
])
);
assert_eq!(
store.get_sampled_ranges().await.unwrap(),
new_block_ranges([
1..=100,
103..=120,
146..=155,
166..=175,
186..=187,
190..=191,
193..=195,
199..=199,
202..=210
])
);
assert_eq!(
store.get_pruned_ranges().await.unwrap(),
new_block_ranges([188..=189, 192..=192])
);
let events = EventChannel::new();
let mut event_subscriber = events.subscribe();
let (daser, mut daser_handle) = Daser::mocked();
let pruner = Pruner::start(PrunerArgs {
daser: Arc::new(daser),
store: store.clone(),
blockstore,
event_pub: events.publisher(),
block_time: Duration::from_secs(1),
pruning_window,
sampling_window,
});
assert_eq!(
daser_handle.expect_update_highest_prunable_block().await,
199
);
assert_eq!(
daser_handle.expect_update_number_of_prunable_blocks().await,
142
);
let (want_to_prune, respond_to) = daser_handle.expect_want_to_prune().await;
assert_eq!(want_to_prune, 102);
respond_to.send(true).unwrap();
let (want_to_prune, respond_to) = daser_handle.expect_want_to_prune().await;
assert_eq!(want_to_prune, 101);
respond_to.send(false).unwrap();
sleep(Duration::from_millis(100)).await;
assert_pruned_headers_event(&mut event_subscriber, 1, 100).await;
assert_pruned_headers_event(&mut event_subscriber, 102, 120).await;
assert_pruned_headers_event(&mut event_subscriber, 147, 154).await;
assert_pruned_headers_event(&mut event_subscriber, 167, 174).await;
assert_pruned_headers_event(&mut event_subscriber, 187, 187).await;
assert_pruned_headers_event(&mut event_subscriber, 190, 191).await;
assert_pruned_headers_event(&mut event_subscriber, 193, 195).await;
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([
101..=101,
146..=146,
155..=155,
166..=166,
175..=175,
186..=186,
196..=199,
202..=260
])
);
assert_eq!(
store.get_pruned_ranges().await.unwrap(),
new_block_ranges([1..=100, 102..=120, 147..=154, 167..=174, 187..=195])
);
assert_eq!(
daser_handle.expect_update_number_of_prunable_blocks().await,
1
);
let (want_to_prune, respond_to) = daser_handle.expect_want_to_prune().await;
assert_eq!(want_to_prune, 101);
respond_to.send(true).unwrap();
assert_eq!(
daser_handle.expect_update_number_of_prunable_blocks().await,
0
);
assert_pruned_headers_event(&mut event_subscriber, 101, 101).await;
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([
146..=146,
155..=155,
166..=166,
175..=175,
186..=186,
196..=199,
202..=260
])
);
assert_eq!(
store.get_pruned_ranges().await.unwrap(),
new_block_ranges([1..=120, 147..=154, 167..=174, 187..=195])
);
daser_handle.expect_no_cmd().await;
pruner.stop();
pruner.join().await;
assert!(matches!(
event_subscriber.try_recv().unwrap_err(),
TryRecvError::Empty
));
}
async fn assert_pruned_headers_event(
subscriber: &mut EventSubscriber,
from_height: u64,
to_height: u64,
) {
let event = timeout(Duration::from_secs(1), subscriber.recv())
.await
.unwrap_or_else(|_| {
panic!("Expecting PrunedHeaders({from_height}-{to_height}) event but nothing is received")
})
.unwrap()
.event;
match event {
NodeEvent::PrunedHeaders {
from_height: from,
to_height: to,
} => {
assert_eq!((from, to), (from_height, to_height));
}
ev => panic!(
"Expecting PrunedHeaders({from_height}-{to_height}) event, but received: {ev:?}"
),
}
}
#[derive(Debug, PartialEq, Clone, Copy)]
struct TestBlock(pub [u8; 8]);
impl From<u64> for TestBlock {
fn from(value: u64) -> Self {
TestBlock(value.to_le_bytes())
}
}
impl Block<64> for TestBlock {
fn cid(&self) -> Result<CidGeneric<64>, CidError> {
let mh = Multihash::wrap(TEST_MH_CODE, &self.0).unwrap();
Ok(CidGeneric::new_v1(TEST_CODEC, mh))
}
fn data(&self) -> &[u8] {
&self.0
}
}
}