// tycho_core/storage/gc.rs — background garbage collection for core storage.
1use std::collections::BTreeMap;
2use std::pin::pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU32, Ordering};
5use std::time::{Duration, Instant};
6
7use parking_lot::Mutex;
8use rand::Rng;
9use scopeguard::defer;
10use serde::{Deserialize, Serialize};
11use tokio::sync::{Notify, watch};
12use tokio::task::AbortHandle;
13use tycho_block_util::block::BlockStuff;
14use tycho_types::models::BlockId;
15use tycho_util::metrics::HistogramGuard;
16
17use super::{
18    ArchivesGcConfig, BlockHandleStorage, BlockStorage, BlocksGcConfig, BlocksGcType,
19    CoreStorageConfig, NodeStateStorage, PersistentStateStorage, ShardStateStorage, StatesGcConfig,
20};
21
/// A manually issued GC trigger, used to force a collection pass
/// outside of the regular tick-driven schedule.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum ManualGcTrigger {
    /// Triggers GC for the specified MC block seqno.
    Exact(u32),
    /// Triggers GC for the MC block seqno relative to the latest MC block.
    Distance(u32),
}
29
/// Background garbage collection for the core storage.
///
/// Owns three detached GC tasks (archives, blocks, states) together with
/// the watch channels that feed them ticks and manual triggers.
pub(crate) struct CoreStorageGc {
    /// Broadcasts a [`Tick`] for every applied masterchain block.
    tick_tx: TickTx,
    /// Seqno of the last seen key block (updated in `handle_block`).
    last_key_block_seqno: AtomicU32,
    /// Shared cache of diff tail lengths, consumed by the blocks GC task.
    diff_tail_cache: DiffTailCache,

    // Manual trigger channels, one per GC task.
    archives_gc_trigger: ManualTriggerTx,
    blocks_gc_trigger: ManualTriggerTx,
    states_gc_trigger: ManualTriggerTx,

    // Abort handles for the spawned GC tasks; aborted in `Drop`.
    blocks_gc_handle: AbortHandle,
    states_gc_handle: AbortHandle,
    archive_gc_handle: AbortHandle,
}
43
44impl Drop for CoreStorageGc {
45    fn drop(&mut self) {
46        self.blocks_gc_handle.abort();
47        self.states_gc_handle.abort();
48        self.archive_gc_handle.abort();
49    }
50}
51
52impl CoreStorageGc {
    /// Creates the GC manager and spawns the three background GC tasks
    /// (archives, blocks, states).
    ///
    /// If a last masterchain block is already known, an initial tick is
    /// published immediately so that the tasks can start right away.
    pub fn new(
        node_state: &NodeStateStorage,
        block_handles: Arc<BlockHandleStorage>,
        blocks: Arc<BlockStorage>,
        shard_states: Arc<ShardStateStorage>,
        persistent_states: PersistentStateStorage,
        config: &CoreStorageConfig,
    ) -> Self {
        // Seed with the latest persisted key block (0 if none is known yet).
        let last_key_block_seqno = block_handles
            .find_last_key_block()
            .map_or(0, |handle| handle.id().seqno);

        let diff_tail_cache = DiffTailCache::default();

        // Tick channel is shared by all three GC tasks.
        let (tick_tx, tick_rx) = watch::channel(None::<Tick>);

        let (archives_gc_trigger, archives_gc_rx) = watch::channel(None::<ManualGcTrigger>);
        let archives_gc = tokio::spawn(Self::archives_gc(
            tick_rx.clone(),
            archives_gc_rx,
            persistent_states,
            blocks.clone(),
            config.archives_gc,
        ));

        let (blocks_gc_trigger, blocks_gc_rx) = watch::channel(None::<ManualGcTrigger>);
        let blocks_gc = tokio::spawn(Self::blocks_gc(
            tick_rx.clone(),
            blocks_gc_rx,
            diff_tail_cache.clone(),
            block_handles,
            blocks,
            config.blocks_gc,
        ));

        let (states_gc_trigger, states_gc_rx) = watch::channel(None::<ManualGcTrigger>);
        let states_gc = tokio::spawn(Self::states_gc(
            tick_rx,
            states_gc_rx,
            shard_states,
            config.states_gc,
        ));

        // Publish the initial tick if a masterchain block is already known.
        let last_known_mc_block = node_state.load_last_mc_block_id();
        if let Some(mc_block_id) = last_known_mc_block {
            tracing::info!(
                %mc_block_id,
                "starting GC subscriber with the last known master block"
            );
            metrics::gauge!("tycho_core_last_mc_block_seqno").set(mc_block_id.seqno as f64);
            tick_tx.send_replace(Some(Tick {
                last_key_block_seqno,
                mc_block_id,
            }));
        }

        Self {
            tick_tx,
            last_key_block_seqno: AtomicU32::new(last_key_block_seqno),
            diff_tail_cache,

            archives_gc_trigger,
            blocks_gc_trigger,
            states_gc_trigger,

            archive_gc_handle: archives_gc.abort_handle(),
            blocks_gc_handle: blocks_gc.abort_handle(),
            states_gc_handle: states_gc.abort_handle(),
        }
    }
123
    /// Manually triggers archives GC for the given target.
    pub fn trigger_archives_gc(&self, trigger: ManualGcTrigger) {
        self.archives_gc_trigger.send_replace(Some(trigger));
    }

    /// Manually triggers blocks GC for the given target.
    pub fn trigger_blocks_gc(&self, trigger: ManualGcTrigger) {
        self.blocks_gc_trigger.send_replace(Some(trigger));
    }

    /// Manually triggers states GC for the given target.
    pub fn trigger_states_gc(&self, trigger: ManualGcTrigger) {
        self.states_gc_trigger.send_replace(Some(trigger));
    }
135
136    pub fn handle_block(&self, is_key_block: bool, block: &BlockStuff) {
137        // Accumulate diff tail len in cache for each block.
138        self.diff_tail_cache.handle_block(block);
139
140        if !block.id().is_masterchain() {
141            return;
142        }
143
144        if is_key_block {
145            self.last_key_block_seqno
146                .store(block.id().seqno, Ordering::Relaxed);
147        }
148
149        self.tick_tx.send_replace(Some(Tick {
150            last_key_block_seqno: self.last_key_block_seqno.load(Ordering::Relaxed),
151            mc_block_id: *block.id(),
152        }));
153    }
154
    /// Background task that removes outdated archives.
    ///
    /// Archives before a persistent state are removed only after that
    /// state becomes safely usable (its gen time plus the boot offset
    /// and the configured offset has elapsed), or on a manual trigger.
    #[tracing::instrument(skip_all)]
    async fn archives_gc(
        mut tick_rx: TickRx,
        mut manual_rx: ManualTriggerRx,
        persistent_states: PersistentStateStorage,
        blocks: Arc<BlockStorage>,
        config: Option<ArchivesGcConfig>,
    ) {
        let Some(config) = config else {
            tracing::warn!("manager disabled");
            return;
        };
        tracing::info!("manager started");
        defer! {
            tracing::info!("manager stopped");
        }

        // Remaining wait before archives preceding a persistent state with
        // this gen time may be removed (zero if that moment already passed).
        let compute_offset = |gen_utime: u32| -> Duration {
            let usable_at = std::time::UNIX_EPOCH
                + Duration::from_secs(gen_utime as _)
                + BlockStuff::BOOT_OFFSET;
            (usable_at + config.persistent_state_offset)
                .duration_since(std::time::SystemTime::now())
                .unwrap_or_default()
        };

        let mut prev_pss_seqno = 0;
        'outer: loop {
            // Wait for a target seqno
            let target_seqno = 'seqno: {
                let wait_for_state_fut = async {
                    loop {
                        let mut new_state_found =
                            pin!(persistent_states.oldest_known_handle_changed());

                        let pss_handle = match persistent_states.load_oldest_known_handle() {
                            Some(handle) if handle.id().seqno > prev_pss_seqno => handle,
                            _ => {
                                // Wait for the new state
                                new_state_found.await;
                                continue;
                            }
                        };

                        // Wait until it's safe to remove the archives.
                        let time_to_wait = compute_offset(pss_handle.gen_utime());
                        tokio::select! {
                            _ = tokio::time::sleep(time_to_wait) => break pss_handle.id().seqno,
                            _ = &mut new_state_found => {},
                        }
                    }
                };

                tokio::select! {
                    // Wait until we can remove archives before that state
                    seqno = wait_for_state_fut => {
                        prev_pss_seqno = seqno;
                        break 'seqno seqno
                    },
                    // Or handle the manual trigger
                    trigger = manual_rx.changed() => {
                        if trigger.is_err() {
                            break 'outer;
                        }
                    }
                }

                // Manual trigger path: requires both a known tick and a
                // trigger value; otherwise restart the outer loop.
                let (Some(tick), Some(trigger)) =
                    (*tick_rx.borrow_and_update(), *manual_rx.borrow_and_update())
                else {
                    continue 'outer;
                };

                tick.adjust(trigger)
            };

            if let Err(e) = blocks.remove_outdated_archives(target_seqno).await {
                tracing::error!("failed to remove outdated archives: {e:?}");
            }
        }
    }
236
237    #[tracing::instrument(skip_all)]
238    async fn blocks_gc(
239        mut tick_rx: TickRx,
240        mut manual_rx: ManualTriggerRx,
241        diff_tail_cache: DiffTailCache,
242        block_handles: Arc<BlockHandleStorage>,
243        blocks: Arc<BlockStorage>,
244        config: Option<BlocksGcConfig>,
245    ) {
246        let Some(config) = config else {
247            tracing::warn!("manager disabled");
248            return;
249        };
250        tracing::info!("manager started");
251        defer! {
252            tracing::info!("manager stopped");
253        }
254
255        let mut last_tiggered_at = None::<Instant>;
256        let mut sleep_until = None::<Instant>;
257        let mut known_key_block_seqno = 0;
258
259        // Wait for the tick or manual trigger or exit the loop if any of the channels are closed
260        while let Some(source) = wait_with_sleep(&mut tick_rx, &mut manual_rx, sleep_until).await {
261            sleep_until = None;
262
263            let Some(tick) = *tick_rx.borrow_and_update() else {
264                continue;
265            };
266            tracing::debug!(?tick);
267
268            // NOTE: Track the last mc block seqno since we cannot rely on the broadcasted block.
269            // It may be updated faster than the iteration of the GC manager.
270            let has_new_key_block = tick.last_key_block_seqno > known_key_block_seqno;
271            known_key_block_seqno = tick.last_key_block_seqno;
272
273            let target_seqno = match (source, config.ty) {
274                (GcSource::Manual, _) => {
275                    let Some(trigger) = *manual_rx.borrow_and_update() else {
276                        continue;
277                    };
278
279                    // Compute the target masterchain block seqno
280                    let target_seqno = tick.adjust(trigger);
281                    if target_seqno == 0 {
282                        continue;
283                    }
284
285                    // Don't debounce manual triggers, but update the last trigger time
286                    last_tiggered_at = Some(Instant::now());
287                    target_seqno
288                }
289                (
290                    GcSource::Schedule,
291                    BlocksGcType::BeforeSafeDistance {
292                        safe_distance,
293                        min_interval,
294                    },
295                ) => {
296                    // TODO: Must be in sync with the largest possible archive size (in mc blocks).
297                    const MIN_SAFE_DISTANCE: u32 = 100;
298
299                    let Some(tail_len) = diff_tail_cache
300                        .wait_for_tail_len(tick.mc_block_id.seqno)
301                        .await
302                    else {
303                        tracing::warn!(
304                            seqno = ?tick.mc_block_id.seqno ,
305                            "tail diff not found in cache, skipping GC. This is expected during startup."
306                        );
307                        continue;
308                    };
309
310                    metrics::gauge!("tycho_core_blocks_gc_tail_len").set(tail_len);
311
312                    tracing::info!(tail_len, "found longest diffs tail");
313
314                    let safe_distance = [safe_distance, MIN_SAFE_DISTANCE, tail_len + 1]
315                        .into_iter()
316                        .max()
317                        .unwrap();
318
319                    // Compute the target masterchain block seqno
320                    let target_seqno = match tick.mc_block_id.seqno.checked_sub(safe_distance) {
321                        // Skip GC for the first N blocks
322                        None | Some(0) => continue,
323                        Some(seqno) => seqno,
324                    };
325
326                    // Debounce GC
327                    if let Some(last) = last_tiggered_at
328                        && last.elapsed() < min_interval
329                    {
330                        // Sleep until the desired interval
331                        // AND continue to wait for the next trigger
332                        sleep_until = Some(last + min_interval);
333                        continue;
334                    }
335
336                    // NOTE: You should update this in other branches as well,
337                    // if we want to debounce other types of GC.
338                    last_tiggered_at = Some(Instant::now());
339                    target_seqno
340                }
341                (GcSource::Schedule, BlocksGcType::BeforePreviousKeyBlock) => {
342                    if !has_new_key_block {
343                        continue;
344                    }
345
346                    // Find a key block before the last key block from the trigger
347                    let target_seqno = tick.last_key_block_seqno;
348                    match block_handles.find_prev_key_block(target_seqno) {
349                        Some(handle) => handle.id().seqno,
350                        None => {
351                            tracing::warn!(target_seqno, "previous key block not found");
352                            continue;
353                        }
354                    }
355                }
356                (GcSource::Schedule, BlocksGcType::BeforePreviousPersistentState) => {
357                    if !has_new_key_block {
358                        continue;
359                    }
360
361                    // Find a persistent block before the last key block from the trigger
362                    let target_seqno = tick.last_key_block_seqno;
363                    match block_handles.find_prev_persistent_key_block(target_seqno) {
364                        Some(handle) => handle.id().seqno,
365                        None => {
366                            tracing::warn!(target_seqno, "previous persistent block not found");
367                            continue;
368                        }
369                    }
370                }
371            };
372
373            metrics::gauge!("tycho_core_mc_blocks_gc_lag")
374                .set(tick.mc_block_id.seqno.saturating_sub(target_seqno));
375
376            if let Err(e) = blocks
377                .remove_outdated_blocks(target_seqno, config.max_blocks_per_batch)
378                .await
379            {
380                tracing::error!("failed to remove outdated blocks: {e:?}");
381            }
382
383            // Clean up cache entries for removed blocks.
384            diff_tail_cache.cleanup(target_seqno);
385        }
386    }
387
388    #[tracing::instrument(skip_all)]
389    async fn states_gc(
390        mut tick_rx: TickRx,
391        mut manual_rx: ManualTriggerRx,
392        shard_states: Arc<ShardStateStorage>,
393        config: Option<StatesGcConfig>,
394    ) {
395        let Some(config) = config else {
396            tracing::warn!("manager disabled");
397            return;
398        };
399        tracing::info!(?config, "manager started");
400        defer! {
401            tracing::info!("manager stopped");
402        }
403
404        let mut random_offset = config
405            .random_offset
406            .then(|| rand::rng().random_range(Duration::ZERO..config.interval));
407        let mut last_triggered_at = None::<Instant>;
408        let mut sleep_until = None::<Instant>;
409
410        while let Some(source) = wait_with_sleep(&mut tick_rx, &mut manual_rx, sleep_until).await {
411            sleep_until = None;
412            if let GcSource::Manual = source {
413                tracing::info!("manual states gc triggered");
414            }
415
416            let Some(tick) = *tick_rx.borrow_and_update() else {
417                tracing::debug!("no tick available, continuing");
418                continue;
419            };
420            tracing::debug!(?tick, "states gc tick");
421
422            let now = Instant::now();
423
424            // Compute the target masterchain block seqno
425            let target_seqno = match source {
426                // NOTE: Interval is ignored for manual triggers
427                GcSource::Manual => {
428                    let Some(trigger) = *manual_rx.borrow_and_update() else {
429                        tracing::debug!("no manual trigger available, continuing");
430                        continue;
431                    };
432                    tracing::info!(seqno = tick.adjust(trigger), "manual GC triggered");
433                    tick.adjust(trigger)
434                }
435                GcSource::Schedule => {
436                    // Make sure to sleep between the ticks
437                    if let Some(last) = last_triggered_at {
438                        let next_gc = last + config.interval;
439                        if next_gc > now {
440                            sleep_until = Some(next_gc);
441                            let sleep_duration = next_gc - now;
442                            tracing::debug!(
443                                duration = sleep_duration.as_secs_f64(),
444                                "sleeping until next GC"
445                            );
446                            continue;
447                        }
448                    } else if let Some(offset) = random_offset.take() {
449                        sleep_until = Some(now + offset);
450                        let sleep_duration = offset;
451                        tracing::debug!(
452                            duration = sleep_duration.as_secs_f64(),
453                            "sleeping with random offset"
454                        );
455                        continue;
456                    }
457
458                    tracing::info!(seqno = tick.mc_block_id.seqno, "scheduled GC");
459                    tick.mc_block_id.seqno
460                }
461            };
462
463            if target_seqno == 0 {
464                tracing::warn!("target seqno is 0, skipping GC");
465                continue;
466            }
467
468            last_triggered_at = Some(now);
469            tracing::info!("starting GC for target seqno: {}", target_seqno);
470
471            let hist = HistogramGuard::begin("tycho_gc_states_time");
472
473            if let Err(e) = shard_states.remove_outdated_states(target_seqno).await {
474                tracing::error!("failed to remove outdated states: {e:?}");
475            }
476
477            let took = hist.finish();
478            tracing::info!(
479                duration = took.as_secs_f64(),
480                "completed GC for target seqno: {}",
481                target_seqno
482            );
483        }
484    }
485}
486
/// Origin of a GC wake-up.
#[derive(Debug, Clone)]
enum GcSource {
    /// Woken by a new masterchain tick (or an elapsed sleep deadline).
    Schedule,
    /// Woken by an explicit [`ManualGcTrigger`].
    Manual,
}
492
/// Snapshot broadcast to the GC tasks for each applied masterchain block.
#[derive(Debug, Clone, Copy)]
struct Tick {
    /// Id of the latest masterchain block.
    pub mc_block_id: BlockId,
    /// Seqno of the latest known key block.
    pub last_key_block_seqno: u32,
}
498
499impl Tick {
500    fn adjust(&self, trigger: ManualGcTrigger) -> u32 {
501        match trigger {
502            ManualGcTrigger::Exact(seqno) => seqno,
503            ManualGcTrigger::Distance(distance) => self.mc_block_id.seqno.saturating_sub(distance),
504        }
505    }
506}
507
/// Sender half of the tick channel (latest masterchain tick, if any).
type TickTx = watch::Sender<Option<Tick>>;
/// Receiver half of the tick channel.
type TickRx = watch::Receiver<Option<Tick>>;

/// Sender half of a manual GC trigger channel.
type ManualTriggerTx = watch::Sender<Option<ManualGcTrigger>>;
/// Receiver half of a manual GC trigger channel.
type ManualTriggerRx = watch::Receiver<Option<ManualGcTrigger>>;
513
/// Waits for the next GC wake-up.
///
/// Returns `Some(GcSource::Schedule)` on a new tick — or, when
/// `sleep_until` is set, when that deadline elapses (ticks are not
/// polled while sleeping) — `Some(GcSource::Manual)` on a manual
/// trigger, and `None` when the corresponding channel is closed.
async fn wait_with_sleep(
    tick_rx: &mut TickRx,
    manual_rx: &mut ManualTriggerRx,
    sleep_until: Option<Instant>,
) -> Option<GcSource> {
    use futures_util::future::Either;

    let fut = match sleep_until {
        // Ignore all ticks if we need to sleep
        Some(deadline) => Either::Left(async move {
            tokio::time::sleep_until(deadline.into()).await;
            Some(GcSource::Schedule)
        }),
        // Wait for the tick otherwise
        None => Either::Right(async {
            let res = tick_rx.changed().await;
            res.is_ok().then_some(GcSource::Schedule)
        }),
    };

    // Manual triggers can always interrupt, even while sleeping.
    tokio::select! {
        res = fut => res,
        trigger = manual_rx.changed() => {
            trigger.is_ok().then_some(GcSource::Manual)
        },
    }
}
541
542// === Diff Tail Cache ===
543
/// Cheaply cloneable cache of diff tail lengths per masterchain block.
#[derive(Default, Clone)]
struct DiffTailCache {
    // Shared state; clones are reference-counted handles to the same cache.
    inner: Arc<DiffTailCacheInner>,
}
548
/// Shared state behind [`DiffTailCache`].
#[derive(Default)]
struct DiffTailCacheInner {
    /// Notified after each finalized masterchain block entry is inserted.
    new_block_finalized: Notify,
    /// Seqno of the latest finalized masterchain block.
    latest_finalized_seqno: AtomicU32,
    /// Accumulates the max shard-block tail len (plus offset) since the
    /// last masterchain block; reset on each masterchain block.
    max_tail_len: AtomicU32,
    /// Finalized tail lengths keyed by masterchain seqno.
    finalized: Mutex<BTreeMap<u32, DiffTailCacheEntry>>,
}
556
557impl DiffTailCache {
    /// Caches block tail len at each known mc seqno.
    ///
    /// NOTE: Must be called for each block in such an order that
    /// the masterchain block is handled right after all the shard
    /// blocks it referenced.
    fn handle_block(&self, block: &BlockStuff) {
        // Offset added to shard tail lens so that a zero accumulator
        // distinguishes the "no shard blocks" case below.
        const OFFSET: u32 = 1;

        let this = self.inner.as_ref();

        let seqno = block.id().seqno;
        let tail_len = block.as_ref().out_msg_queue_updates.tail_len;

        if block.id().is_masterchain() {
            // Reset `max_tail_len`.
            let acc = this.max_tail_len.swap(0, Ordering::AcqRel);

            {
                let mut finalized = this.finalized.lock();

                let sc_tail_len = match acc.checked_sub(OFFSET) {
                    // There were some shard blocks so the tail len has changed.
                    Some(tail_len) => tail_len,
                    // There were no shard blocks so we must reuse previous sc tail len.
                    //
                    // NOTE: This is quite a strange situation since non-empty internal
                    // messages queue will force collating shard blocks and there might
                    // not be such situation. But it's better to be safe just in case.
                    None => match finalized.get(&seqno.saturating_sub(1)) {
                        Some(prev) => prev.sc_tail_len,
                        None => 1,
                    },
                };

                let prev = finalized.insert(seqno, DiffTailCacheEntry {
                    mc_tail_len: tail_len,
                    sc_tail_len,
                });
                debug_assert!(prev.is_none(), "same block handled twice at runtime");
            }

            // Publish the new seqno and wake any `wait_for_tail_len` callers.
            this.latest_finalized_seqno.store(seqno, Ordering::Release);
            this.new_block_finalized.notify_waiters();
        } else {
            // Accumulate the maximum tail len at the block.
            // NOTE: We use `OFFSET` here to check for "no shard blocks"
            // situation later.
            this.max_tail_len
                .fetch_max(OFFSET + tail_len, Ordering::Release);
        }
    }
609
    /// Waits until the cache has processed any block including
    /// or after the specified seqno.
    ///
    /// Returns a tail len for that block or `None` if it was
    /// already cleared.
    async fn wait_for_tail_len(&self, mc_seqno: u32) -> Option<u32> {
        let this = self.inner.as_ref();

        // Wait until the specified mc seqno is reached.
        loop {
            // Register the listener BEFORE checking the seqno so a
            // notification arriving between the check and the await
            // is not lost.
            let updated = this.new_block_finalized.notified();
            if this.latest_finalized_seqno.load(Ordering::Acquire) >= mc_seqno {
                break;
            }
            updated.await;
        }

        // At this point `finalized` map should contain the range
        // at least upto the specified mc seqno.
        this.finalized
            .lock()
            .get(&mc_seqno)
            .map(DiffTailCacheEntry::compute_max)
    }
634
635    /// Cleanup the finalized range up until the specified seqno
636    /// (including it).
637    fn cleanup(&self, upto_mc_seqno: u32) {
638        let mut finalized = self.inner.finalized.lock();
639        if let Some(lower_bound) = upto_mc_seqno.checked_add(1) {
640            let rest = finalized.split_off(&lower_bound);
641            *finalized = rest;
642        } else {
643            finalized.clear();
644        }
645    }
646}
647
/// Tail lengths recorded for a single finalized masterchain block.
#[derive(Clone, Copy)]
struct DiffTailCacheEntry {
    /// Tail len of the masterchain block itself.
    mc_tail_len: u32,
    /// Max tail len among the referenced shard blocks (or the value
    /// carried over from the previous entry when there were none).
    sc_tail_len: u32,
}
653
654impl DiffTailCacheEntry {
655    fn compute_max(&self) -> u32 {
656        self.mc_tail_len.max(self.sc_tail_len)
657    }
658}
659
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use futures_util::future::poll_immediate;
    use tycho_types::models::ShardIdent;

    use super::*;

    /// End-to-end check of [`DiffTailCache`]: shard blocks accumulate,
    /// the masterchain block finalizes, waiters resolve with the max
    /// tail len, and cleanup removes old entries.
    #[tokio::test]
    async fn test_tail_cache_basic_flow() {
        let cache = DiffTailCache::default();

        // Should not find any block after the startup.
        assert_eq!(cache.wait_for_tail_len(0).await, None);

        // === Masterchain block 1 ===

        let mut wait_tail_len = pin!(cache.wait_for_tail_len(1));

        // No block is finalized yet for this seqno.
        assert_eq!(poll_immediate(&mut wait_tail_len).await, None);

        // Handle shard blocks with different tail lengths
        for (sc_seqno, tail_len) in [(100, 20), (101, 15), (102, 16)] {
            let sc_block = BlockStuff::new_with(ShardIdent::BASECHAIN, sc_seqno, |block| {
                block.out_msg_queue_updates.tail_len = tail_len;
            });
            cache.handle_block(&sc_block);
        }

        // Still no block until the masterchain is processed.
        assert_eq!(poll_immediate(&mut wait_tail_len).await, None);

        // Handle master block after all referenced shard blocks.
        cache.handle_block(&BlockStuff::new_with(ShardIdent::MASTERCHAIN, 1, |block| {
            block.out_msg_queue_updates.tail_len = 10;
        }));

        // Only now tail len should be available.
        assert_eq!(wait_tail_len.await, Some(20));

        // === Masterchain block 2 ===

        let mut wait_tail_len = pin!(cache.wait_for_tail_len(2));

        // No block is finalized yet for this seqno.
        assert_eq!(poll_immediate(&mut wait_tail_len).await, None);

        // No shard blocks here.
        cache.handle_block(&BlockStuff::new_with(ShardIdent::MASTERCHAIN, 2, |block| {
            block.out_msg_queue_updates.tail_len = 5;
        }));

        // Now the wait_task should complete with the finalized value (max from previous shards).
        assert_eq!(wait_tail_len.await, Some(20));

        // Also check directly
        assert_eq!(cache.wait_for_tail_len(1).await, Some(20));
        assert_eq!(cache.wait_for_tail_len(2).await, Some(20));

        // === Masterchain block 3 ===

        // Now it has shard block.
        cache.handle_block(&BlockStuff::new_with(ShardIdent::BASECHAIN, 103, |block| {
            block.out_msg_queue_updates.tail_len = 1;
        }));

        let mut wait_tail_len = pin!(cache.wait_for_tail_len(3));

        // No block is finalized yet for this seqno (even after we started processing its shard blocks).
        assert_eq!(poll_immediate(&mut wait_tail_len).await, None);

        // Master with small tail.
        cache.handle_block(&BlockStuff::new_with(ShardIdent::MASTERCHAIN, 3, |block| {
            block.out_msg_queue_updates.tail_len = 2;
        }));

        assert_eq!(wait_tail_len.await, Some(2));

        // Also check directly
        assert_eq!(cache.wait_for_tail_len(3).await, Some(2));

        // === Cleanup ===

        cache.cleanup(1);

        // Already cleared.
        assert_eq!(poll_immediate(cache.wait_for_tail_len(0)).await, Some(None));
        assert_eq!(poll_immediate(cache.wait_for_tail_len(1)).await, Some(None));
        // Should still exist.
        assert_eq!(
            poll_immediate(cache.wait_for_tail_len(2)).await,
            Some(Some(20))
        );
        assert_eq!(
            poll_immediate(cache.wait_for_tail_len(3)).await,
            Some(Some(2))
        );
        // Not yet exists.
        assert_eq!(poll_immediate(cache.wait_for_tail_len(4)).await, None);

        // Cleanup upto the future.
        cache.cleanup(10);

        // Not yet exists (since the latest processed block has not changed).
        assert_eq!(poll_immediate(cache.wait_for_tail_len(4)).await, None);
    }
}