// tycho_core/storage/block/mod.rs

1use std::fs::File;
2use std::io::BufReader;
3use std::num::NonZeroU32;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use anyhow::{Context, Result};
8use bytes::Bytes;
9use moka::policy::EvictionPolicy;
10use tokio::sync::broadcast;
11use tycho_block_util::archive::ArchiveData;
12use tycho_block_util::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug};
13use tycho_block_util::queue::{QueueDiffStuff, QueueDiffStuffAug};
14use tycho_types::models::*;
15use tycho_util::FastHasherState;
16use tycho_util::metrics::HistogramGuard;
17use tycho_util::sync::{CancellationFlag, rayon_run};
18
19pub use self::package_entry::{PackageEntryKey, PartialBlockId};
20use super::util::SlotSubscriptions;
21use super::{
22    BlockConnectionStorage, BlockFlags, BlockHandle, BlockHandleStorage, BlocksCacheConfig, CoreDb,
23    HandleCreationStatus, NewBlockMeta,
24};
25
26pub(crate) mod blobs;
27mod package_entry;
28
29pub use blobs::{ArchiveId, BlockGcStats, OpenStats};
30
31use crate::storage::block::blobs::DEFAULT_CHUNK_SIZE;
32use crate::storage::config::BlobDbConfig;
33
// Counter: total number of block-data loads (cache hits and blob reads).
const METRIC_LOAD_BLOCK_TOTAL: &str = "tycho_storage_load_block_total";
// Counter: block-data loads served from the in-memory cache.
const METRIC_BLOCK_CACHE_HIT_TOTAL: &str = "tycho_storage_block_cache_hit_total";

/// Storage for block data, block proofs and queue diffs, backed by a blob
/// store with a weighted in-memory LRU cache of deserialized blocks on top.
pub struct BlockStorage {
    // Weighted LRU cache of recently stored/loaded blocks (see `new`).
    blocks_cache: BlocksCache,
    // Shared storage of block handles (metadata/flags).
    block_handle_storage: Arc<BlockHandleStorage>,
    // Shared storage of prev/next block connections.
    block_connection_storage: Arc<BlockConnectionStorage>,
    // One-shot subscriptions resolved when a block's data is stored.
    block_subscriptions: SlotSubscriptions<BlockId, BlockStuff>,
    // Write-locked by `wait_for_block`, read-locked by `store_block_data`,
    // to serialize subscription registration against concurrent stores.
    store_block_data: tokio::sync::RwLock<()>,
    pub(crate) blob_storage: Arc<blobs::BlobStorage>,
}

46impl BlockStorage {
    /// Default chunk size used by the blob storage (mirrors `DEFAULT_CHUNK_SIZE`).
    pub const DEFAULT_BLOB_CHUNK_SIZE: NonZeroU32 =
        NonZeroU32::new(DEFAULT_CHUNK_SIZE as u32).unwrap();
    // === Init stuff ===

51    pub async fn new(
52        db: CoreDb,
53        config: BlockStorageConfig,
54        block_handle_storage: Arc<BlockHandleStorage>,
55        block_connection_storage: Arc<BlockConnectionStorage>,
56    ) -> Result<Self> {
57        fn weigher(_key: &BlockId, value: &BlockStuff) -> u32 {
58            const BLOCK_STUFF_OVERHEAD: u32 = 1024; // 1 KB
59
60            size_of::<BlockId>() as u32
61                + BLOCK_STUFF_OVERHEAD
62                + value.data_size().try_into().unwrap_or(u32::MAX)
63        }
64
65        let blocks_cache = moka::sync::Cache::builder()
66            .eviction_policy(EvictionPolicy::lru())
67            .time_to_live(config.blocks_cache.ttl)
68            .max_capacity(config.blocks_cache.size.as_u64())
69            .weigher(weigher)
70            .build_with_hasher(Default::default());
71
72        let blob_storage = blobs::BlobStorage::new(
73            db,
74            block_handle_storage.clone(),
75            &config.blobs_root,
76            config.blob_db_config.pre_create_cas_tree,
77        )
78        .await
79        .map(Arc::new)?;
80
81        Ok(Self {
82            blocks_cache,
83            block_handle_storage,
84            block_connection_storage,
85            block_subscriptions: Default::default(),
86            store_block_data: Default::default(),
87            blob_storage,
88        })
89    }
90
    /// Statistics collected while opening the underlying blob storage.
    pub fn open_stats(&self) -> &OpenStats {
        self.blob_storage.open_stats()
    }

    // === Subscription stuff ===

    /// Waits until data for `block_id` is stored, then loads and returns it.
    ///
    /// Takes the `store_block_data` write lock so that no store can slip in
    /// between the handle check and the subscription registration.
    pub async fn wait_for_block(&self, block_id: &BlockId) -> Result<BlockStuffAug> {
        let block_handle_storage = &self.block_handle_storage;

        // Take an exclusive lock to prevent any block data from being stored
        let guard = self.store_block_data.write().await;

        // Try to load the block data
        if let Some(handle) = block_handle_storage.load_handle(block_id)
            && handle.has_data()
        {
            drop(guard);
            let block = self.load_block_data(&handle).await?;
            return Ok(BlockStuffAug::loaded(block));
        }

        // Add subscription for the block and drop the lock
        let rx = self.block_subscriptions.subscribe(block_id);

        // NOTE: Guard must be dropped before awaiting
        drop(guard);

        let block = rx.await;
        Ok(BlockStuffAug::loaded(block))
    }

122    pub async fn wait_for_next_block(&self, prev_block_id: &BlockId) -> Result<BlockStuffAug> {
123        let block_id = self
124            .block_connection_storage
125            .wait_for_next1(prev_block_id)
126            .await;
127
128        self.wait_for_block(&block_id).await
129    }
130
    // === Block data ===

    /// Stores block data (if not already present) and notifies any
    /// `wait_for_block` subscribers.
    ///
    /// Returns the handle along with whether the stored flags were updated
    /// by this call and whether the handle was newly created.
    pub async fn store_block_data(
        &self,
        block: &BlockStuff,
        archive_data: &ArchiveData,
        meta_data: NewBlockMeta,
    ) -> Result<StoreBlockResult> {
        // NOTE: Any amount of blocks can be stored concurrently,
        // but the subscription lock can be acquired only while
        // no block data is being stored.
        let guard = self.store_block_data.read().await;

        let block_id = block.id();
        let (handle, status) = self
            .block_handle_storage
            .create_or_load_handle(block_id, meta_data);

        let archive_id = PackageEntryKey::block(block_id);
        let mut updated = false;
        if !handle.has_data() {
            let data = archive_data.clone_new_archive_data()?;
            metrics::histogram!("tycho_storage_store_block_data_size").record(data.len() as f64);

            // Double-check under the per-handle data lock to avoid a
            // duplicate write when two tasks store the same block.
            let lock = handle.block_data_lock().write().await;
            if !handle.has_data() {
                self.blob_storage.add_data(&archive_id, data, &lock).await?;
                if handle.meta().add_flags(BlockFlags::HAS_DATA) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        // TODO: only notify subscribers if `updated`?
        self.block_subscriptions.notify(block_id, block);

        drop(guard);

        // Update block cache
        self.blocks_cache.insert(*block_id, block.clone());

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }

180    pub async fn load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
181        metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
182
183        const BIG_DATA_THRESHOLD: usize = 1 << 20; // 1 MB
184
185        let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
186
187        anyhow::ensure!(
188            handle.has_data(),
189            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
190        );
191
192        // Fast path - lookup in cache
193        if let Some(block) = self.blocks_cache.get(handle.id()) {
194            metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
195            return Ok(block.clone());
196        }
197
198        let data = self
199            .blob_storage
200            .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
201            .await?;
202
203        if data.len() < BIG_DATA_THRESHOLD {
204            BlockStuff::deserialize(handle.id(), data.as_ref())
205        } else {
206            let handle = handle.clone();
207            rayon_run(move || BlockStuff::deserialize(handle.id(), data.as_ref())).await
208        }
209    }
210
211    pub fn blocking_load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
212        metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
213
214        let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
215
216        anyhow::ensure!(
217            handle.has_data(),
218            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
219        );
220
221        // Fast path - lookup in cache
222        if let Some(block) = self.blocks_cache.get(handle.id()) {
223            metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
224            return Ok(block.clone());
225        }
226
227        let data = self
228            .blob_storage
229            .blocking_get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))?;
230
231        BlockStuff::deserialize(handle.id(), data.as_ref())
232    }
233
234    pub async fn load_block_data_decompressed(&self, handle: &BlockHandle) -> Result<Bytes> {
235        anyhow::ensure!(
236            handle.has_data(),
237            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
238        );
239
240        self.blob_storage
241            .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
242            .await
243    }
244
    /// Lists stored block ids, optionally continuing from a previous page.
    ///
    /// Returns the ids together with an optional continuation token for the
    /// next page (`None` when there are no more entries).
    pub async fn list_blocks(
        &self,
        continuation: Option<BlockIdShort>,
    ) -> Result<(Vec<BlockId>, Option<BlockIdShort>)> {
        self.blob_storage.list_blocks(continuation).await
    }

    /// Lists ids of all stored archives.
    pub fn list_archive_ids(&self) -> Vec<u32> {
        self.blob_storage.list_archive_ids()
    }

    /// Loads a `length`-byte range of block data starting at `offset`.
    ///
    /// Fails with `BlockDataNotFound` if the handle has no data; returns
    /// `None` when the blob storage cannot serve the range.
    // NOTE(review): whether `offset` addresses compressed or decompressed
    // bytes is defined by the blob storage — confirm there before relying on it.
    pub async fn load_block_data_range(
        &self,
        handle: &BlockHandle,
        offset: u64,
        length: u64,
    ) -> Result<Option<Bytes>> {
        anyhow::ensure!(
            handle.has_data(),
            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
        );
        self.blob_storage
            .get_block_data_range(handle, offset, length)
            .await
    }

271    pub fn get_compressed_block_data_size(&self, handle: &BlockHandle) -> Result<Option<u64>> {
272        let key = PackageEntryKey::block(handle.id());
273        Ok(self.blob_storage.blocks().get_size(&key)?)
274    }
275
    /// Loads the masterchain block with the given seqno, if it is stored.
    pub async fn find_mc_block_data(&self, mc_seqno: u32) -> Result<Option<Block>> {
        self.blob_storage.find_mc_block_data(mc_seqno).await
    }

    // === Block proof ===

    /// Stores a block proof (if not already present).
    ///
    /// `handle` is either an existing handle — whose id must match the
    /// proof's block id — or metadata for creating a new one.
    pub async fn store_block_proof(
        &self,
        proof: &BlockProofStuffAug,
        handle: MaybeExistingHandle,
    ) -> Result<StoreBlockResult> {
        let block_id = proof.id();
        if let MaybeExistingHandle::Existing(handle) = &handle
            && handle.id() != block_id
        {
            anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
                expected: block_id.as_short_id(),
                actual: handle.id().as_short_id(),
            });
        }

        let (handle, status) = match handle {
            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
            MaybeExistingHandle::New(meta_data) => self
                .block_handle_storage
                .create_or_load_handle(block_id, meta_data),
        };

        let mut updated = false;
        let archive_id = PackageEntryKey::proof(block_id);
        if !handle.has_proof() {
            let data = proof.clone_new_archive_data()?;

            // Double-check under the proof data lock to avoid a duplicate
            // write when two tasks store the same proof.
            let lock = handle.proof_data_lock().write().await;
            if !handle.has_proof() {
                self.blob_storage.add_data(&archive_id, data, &lock).await?;
                if handle.meta().add_flags(BlockFlags::HAS_PROOF) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }

326    pub async fn load_block_proof(&self, handle: &BlockHandle) -> Result<BlockProofStuff> {
327        let raw_proof = self.load_block_proof_raw(handle).await?;
328        BlockProofStuff::deserialize(handle.id(), &raw_proof)
329    }
330
    /// Loads the raw decompressed proof bytes for the given handle.
    ///
    /// Fails with `BlockProofNotFound` if the handle has no proof.
    pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
        anyhow::ensure!(
            handle.has_proof(),
            BlockStorageError::BlockProofNotFound(handle.id().as_short_id())
        );

        self.blob_storage
            .get_block_data_decompressed(handle, &PackageEntryKey::proof(handle.id()))
            .await
    }

    // === Queue diff ===

    /// Stores a queue diff (if not already present).
    ///
    /// `handle` is either an existing handle — whose id must match the
    /// diff's block id — or metadata for creating a new one.
    pub async fn store_queue_diff(
        &self,
        queue_diff: &QueueDiffStuffAug,
        handle: MaybeExistingHandle,
    ) -> Result<StoreBlockResult> {
        let block_id = queue_diff.block_id();
        if let MaybeExistingHandle::Existing(handle) = &handle
            && handle.id() != block_id
        {
            anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
                expected: block_id.as_short_id(),
                actual: handle.id().as_short_id(),
            });
        }

        let (handle, status) = match handle {
            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
            MaybeExistingHandle::New(meta_data) => self
                .block_handle_storage
                .create_or_load_handle(block_id, meta_data),
        };

        let mut updated = false;
        let archive_id = PackageEntryKey::queue_diff(block_id);
        if !handle.has_queue_diff() {
            let data = queue_diff.clone_new_archive_data()?;

            // Double-check under the queue diff data lock to avoid a
            // duplicate write when two tasks store the same diff.
            let lock = handle.queue_diff_data_lock().write().await;
            if !handle.has_queue_diff() {
                self.blob_storage.add_data(&archive_id, data, &lock).await?;
                if handle.meta().add_flags(BlockFlags::HAS_QUEUE_DIFF) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }

388    pub async fn load_queue_diff(&self, handle: &BlockHandle) -> Result<QueueDiffStuff> {
389        const BIG_DATA_THRESHOLD: usize = 100 << 10; // 100 kib
390
391        let data = self.load_queue_diff_raw(handle).await?;
392
393        if data.len() < BIG_DATA_THRESHOLD {
394            QueueDiffStuff::deserialize(handle.id(), &data)
395        } else {
396            let handle = handle.clone();
397            rayon_run(move || QueueDiffStuff::deserialize(handle.id(), data.as_ref())).await
398        }
399    }
400
    /// Loads the raw decompressed queue diff bytes for the given handle.
    ///
    /// Fails with `QueueDiffNotFound` if the handle has no queue diff.
    pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
        anyhow::ensure!(
            handle.has_queue_diff(),
            BlockStorageError::QueueDiffNotFound(handle.id().as_short_id())
        );

        self.blob_storage
            .get_block_data_decompressed(handle, &PackageEntryKey::queue_diff(handle.id()))
            .await
    }

    // === Archive stuff ===

    /// Moves the block's stored parts into an archive.
    ///
    /// Delegates to the blob storage; `mc_is_key_block` is passed through
    /// unchanged.
    pub async fn move_into_archive(
        &self,
        handle: &BlockHandle,
        mc_is_key_block: bool,
    ) -> Result<()> {
        self.blob_storage
            .move_into_archive(handle, mc_is_key_block)
            .await
    }

    /// Waits until the blob storage has committed the pending archive.
    pub async fn wait_for_archive_commit(&self) -> Result<()> {
        self.blob_storage.wait_for_archive_commit().await
    }

    /// Returns a corresponding archive id for the specified masterchain seqno.
    ///
    /// Delegates the lookup to the blob storage.
    pub fn get_archive_id(&self, mc_seqno: u32) -> ArchiveId {
        self.blob_storage.get_archive_id(mc_seqno)
    }

    /// Returns the size of the archive with the given id, if it exists.
    pub fn get_archive_size(&self, id: u32) -> Result<Option<usize>> {
        self.blob_storage.get_archive_size(id)
    }

    /// Opens a buffered file reader over the archive with the given id,
    /// if it exists.
    pub fn get_archive_reader(&self, id: u32) -> Result<Option<BufReader<File>>> {
        self.blob_storage.get_archive_reader(id)
    }

    /// Get the complete archive (compressed).
    ///
    /// Must not be used for anything other than tests.
    // Production code should read archives chunk-wise via `get_archive_chunk`.
    #[cfg(any(test, feature = "test"))]
    pub async fn get_archive_compressed_full(&self, id: u32) -> Result<Option<Bytes>> {
        self.blob_storage.get_archive_full(id).await
    }

    /// Get a chunk of the archive at the specified offset.
    ///
    /// Delegates to the blob storage.
    pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<Bytes> {
        self.blob_storage.get_archive_chunk(id, offset).await
    }

    /// Subscribes to a broadcast channel of newly available archive ids.
    pub fn subscribe_to_archive_ids(&self) -> broadcast::Receiver<u32> {
        self.blob_storage.subscribe_to_archive_ids()
    }

    /// Removes archives up to the given id (exclusive semantics defined by
    /// the blob storage).
    pub async fn remove_outdated_archives(&self, until_id: u32) -> Result<()> {
        self.blob_storage.remove_outdated_archives(until_id).await
    }

    /// Estimates the archive id for the given masterchain seqno without a
    /// full lookup.
    pub fn estimate_archive_id(&self, mc_seqno: u32) -> u32 {
        self.blob_storage.estimate_archive_id(mc_seqno)
    }

    // === GC stuff ===

    /// Removes block entries that are outdated relative to the given
    /// masterchain seqno.
    ///
    /// Shard block heights are taken from the target masterchain block;
    /// `max_blocks_per_batch` bounds the size of each removal batch.
    /// A no-op when `mc_seqno == 0`.
    #[tracing::instrument(skip(self, max_blocks_per_batch))]
    pub async fn remove_outdated_blocks(
        &self,
        mc_seqno: u32,
        max_blocks_per_batch: Option<usize>,
    ) -> Result<()> {
        if mc_seqno == 0 {
            return Ok(());
        }

        tracing::info!("started blocks GC for mc_block {mc_seqno}");

        let block = self
            .blob_storage
            .find_mc_block_data(mc_seqno)
            .await
            .context("failed to load target block data")?
            .context("target block not found")?;

        // Latest (shard, seqno) pairs referenced by the target mc block.
        let shard_heights = {
            let extra = block.extra.load()?;
            let custom = extra.custom.context("mc block extra not found")?.load()?;
            custom
                .shards
                .latest_blocks()
                .map(|id| id.map(|id| (id.shard, id.seqno)))
                .collect::<Result<_, tycho_types::error::Error>>()?
        };

        // Remove all expired entries
        let total_cached_handles_removed = self
            .block_handle_storage
            .gc_handles_cache(mc_seqno, &shard_heights);

        // Signal cancellation to the blocking task when this scope exits
        // (including when the future is dropped).
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        // Run the blocking removal on a dedicated thread; the blob storage
        // is shared via `Arc` so the closure does not need to borrow `self`.
        let BlockGcStats {
            mc_blocks_removed,
            total_blocks_removed,
        } = tokio::task::spawn_blocking({
            let blob_storage = self.blob_storage.clone();
            let cancelled = cancelled.clone();
            move || {
                blobs::remove_blocks(
                    &blob_storage,
                    max_blocks_per_batch,
                    mc_seqno,
                    shard_heights,
                    Some(&cancelled),
                )
            }
        })
        .await??;

        tracing::info!(
            total_cached_handles_removed,
            mc_blocks_removed,
            total_blocks_removed,
            "finished blocks GC"
        );
        Ok(())
    }

    // === Internal ===

    /// Test-only access to the underlying blob storage.
    #[cfg(any(test, feature = "test"))]
    pub fn blob_storage(&self) -> &blobs::BlobStorage {
        &self.blob_storage
    }
542}
543
/// Either an existing block handle or metadata for creating a new one.
#[derive(Clone)]
pub enum MaybeExistingHandle {
    Existing(BlockHandle),
    New(NewBlockMeta),
}

// Wrap an existing handle.
impl From<BlockHandle> for MaybeExistingHandle {
    fn from(handle: BlockHandle) -> Self {
        Self::Existing(handle)
    }
}

// Wrap metadata for a handle that does not exist yet.
impl From<NewBlockMeta> for MaybeExistingHandle {
    fn from(meta_data: NewBlockMeta) -> Self {
        Self::New(meta_data)
    }
}

/// Result of a `store_*` operation on [`BlockStorage`].
pub struct StoreBlockResult {
    /// Handle of the affected block.
    pub handle: BlockHandle,
    /// Whether this call stored new data and updated the handle flags.
    pub updated: bool,
    /// Whether the handle was created by this call.
    pub new: bool,
}

/// Configuration for [`BlockStorage::new`].
pub struct BlockStorageConfig {
    /// TTL and capacity settings for the in-memory blocks cache.
    pub blocks_cache: BlocksCacheConfig,
    /// Root path for the blob storage files.
    pub blobs_root: PathBuf,
    /// Low-level blob database options.
    pub blob_db_config: BlobDbConfig,
}

/// Weighted LRU cache of deserialized blocks keyed by `BlockId`.
type BlocksCache = moka::sync::Cache<BlockId, BlockStuff, FastHasherState>;

/// Errors produced by [`BlockStorage`] operations.
#[derive(thiserror::Error, Debug)]
enum BlockStorageError {
    #[error("Block data not found for block: {0}")]
    BlockDataNotFound(BlockIdShort),
    #[error("Block proof not found for block: {0}")]
    BlockProofNotFound(BlockIdShort),
    #[error("Queue diff not found for block: {0}")]
    QueueDiffNotFound(BlockIdShort),
    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
    BlockHandleIdMismatch {
        expected: BlockIdShort,
        actual: BlockIdShort,
    },
}

#[cfg(test)]
mod tests {
    use std::pin::pin;

    use blobs::*;
    use tycho_block_util::archive::{ArchiveEntryType, WithArchiveData};
    use tycho_block_util::block::ShardHeights;
    use tycho_storage::StorageContext;
    use tycho_types::prelude::*;
    use tycho_util::FastHashMap;
    use tycho_util::futures::JoinTask;

    use super::*;
    use crate::storage::{BlockConnection, CoreStorage, CoreStorageConfig};

    // Stores data, proof and queue diff for the same block from concurrent
    // tasks and checks that every store resolves to the same handle.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn parallel_store_data() -> Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let shard = ShardIdent::MASTERCHAIN;
        for seqno in 0..1000 {
            let block = BlockStuff::new_empty(shard, seqno);
            let block = {
                let data = BocRepr::encode_rayon(block.as_ref()).unwrap();
                WithArchiveData::new(block, data)
            };
            let block_id = block.id();

            let proof = BlockProofStuff::new_empty(block_id);
            let proof = {
                let data = BocRepr::encode_rayon(proof.as_ref()).unwrap();
                WithArchiveData::new(proof, data)
            };

            let queue_diff = QueueDiffStuff::builder(shard, seqno, &HashBytes::ZERO)
                .serialize()
                .build(block_id);

            let block_meta = NewBlockMeta {
                is_key_block: shard.is_masterchain() && seqno == 0,
                gen_utime: 0,
                ref_by_mc_seqno: seqno,
            };

            let store_block_data = || {
                let storage = storage.clone();
                JoinTask::new(async move {
                    let res = storage
                        .block_storage()
                        .store_block_data(&block, &block.archive_data, block_meta)
                        .await?;

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            // Randomized yields shuffle task interleavings between runs.
            let store_proof_and_queue = || {
                let storage = storage.clone();
                let proof = proof.clone();
                let queue_diff = queue_diff.clone();
                JoinTask::new(async move {
                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_block_proof(&proof, MaybeExistingHandle::New(block_meta))
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_queue_diff(&queue_diff, res.handle.into())
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            let (data_res, proof_and_queue_res) = async move {
                let data_fut = pin!(store_block_data());
                let proof_and_queue_fut = pin!(async {
                    tokio::select! {
                        left = store_proof_and_queue() => left,
                        right = store_proof_and_queue() => right,
                    }
                });

                let (data, other) = futures_util::future::join(data_fut, proof_and_queue_fut).await;

                Ok::<_, anyhow::Error>((data?, other?))
            }
            .await?;

            // All stores must resolve to the same underlying handle instance.
            assert!(std::ptr::addr_eq(
                arc_swap::RefCnt::as_ptr(&data_res),
                arc_swap::RefCnt::as_ptr(&proof_and_queue_res)
            ));
            assert!(data_res.has_all_block_parts());
        }

        Ok(())
    }

    // Populates two shards with blocks, runs GC and verifies that handles,
    // blob entries and connections are removed exactly for expired seqnos.
    #[tokio::test]
    async fn blocks_gc() -> Result<()> {
        const GARBAGE: Bytes = Bytes::from_static(b"garbage");
        const ENTRY_TYPES: [ArchiveEntryType; 3] = [
            ArchiveEntryType::Block,
            ArchiveEntryType::Proof,
            ArchiveEntryType::QueueDiff,
        ];
        const CONNECTION_TYPES: [BlockConnection; 2] =
            [BlockConnection::Prev1, BlockConnection::Next1];

        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let blocks = storage.block_storage();
        let block_handles = storage.block_handle_storage();
        let block_connections = storage.block_connection_storage();

        let mut shard_block_ids = FastHashMap::<ShardIdent, Vec<BlockId>>::default();

        for shard in [ShardIdent::MASTERCHAIN, ShardIdent::BASECHAIN] {
            let entry = shard_block_ids.entry(shard).or_default();

            for seqno in 0..100 {
                let block_id = BlockId {
                    shard,
                    seqno,
                    root_hash: HashBytes(rand::random()),
                    file_hash: HashBytes(rand::random()),
                };
                entry.push(block_id);

                let (handle, _) = block_handles.create_or_load_handle(&block_id, NewBlockMeta {
                    is_key_block: shard.is_masterchain() && seqno == 0,
                    gen_utime: 0,
                    ref_by_mc_seqno: seqno,
                });
                let lock = handle.block_data_lock().write().await;

                for ty in ENTRY_TYPES {
                    blocks
                        .blob_storage
                        .add_data(&(block_id, ty).into(), GARBAGE, &lock)
                        .await?;
                }
                for direction in CONNECTION_TYPES {
                    block_connections.store_connection(&handle, direction, &block_id);
                }

                handle.meta().add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
                block_handles.store_handle(&handle, false);
            }
        }

        // Remove some blocks
        let stats = blobs::remove_blocks(
            &blocks.blob_storage,
            None,
            70,
            [(ShardIdent::BASECHAIN, 50)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 69,
            total_blocks_removed: 69 + 49,
        });

        // Seqno 0 is a key block and must survive; hence ranges start at 1.
        let removed_ranges = FastHashMap::from_iter([
            (ShardIdent::MASTERCHAIN, vec![1..=69]),
            (ShardIdent::BASECHAIN, vec![1..=49]),
        ]);
        for (shard, block_ids) in shard_block_ids {
            let removed_ranges = removed_ranges.get(&shard).unwrap();

            for block_id in block_ids {
                let must_be_removed = 'removed: {
                    for range in removed_ranges {
                        if range.contains(&block_id.seqno) {
                            break 'removed true;
                        }
                    }
                    false
                };

                let handle = block_handles.load_handle(&block_id);
                assert_eq!(handle.is_none(), must_be_removed);

                for ty in ENTRY_TYPES {
                    let key = PackageEntryKey::from((block_id, ty));
                    // Check if the entry exists in Cassadilia
                    let exists_in_cas = blocks
                        .blob_storage()
                        .blocks()
                        .read_index_state()
                        .contains_key(&key);
                    assert_eq!(!exists_in_cas, must_be_removed);
                }

                for direction in CONNECTION_TYPES {
                    let connection = block_connections.load_connection(&block_id, direction);
                    assert_eq!(connection.is_none(), must_be_removed);
                }
            }
        }

        // Remove single block
        let stats = blobs::remove_blocks(
            &blocks.blob_storage,
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 1,
            total_blocks_removed: 2,
        });

        // Remove no blocks
        let stats = blobs::remove_blocks(
            &blocks.blob_storage,
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 0,
            total_blocks_removed: 0,
        });

        Ok(())
    }

    // Verifies that the `skip_blocks_gc` flag protects a handle from GC and
    // that clearing it makes the handle collectible again.
    #[tokio::test]
    async fn blocks_gc_skip_lifecycle() -> Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let handles = storage.block_handle_storage();
        let blob_storage = storage.block_storage().blob_storage.clone();

        let id = BlockId {
            shard: ShardIdent::MASTERCHAIN,
            seqno: 1,
            root_hash: HashBytes(rand::random()),
            file_hash: HashBytes(rand::random()),
        };

        let (handle, _) = handles.create_or_load_handle(&id, NewBlockMeta {
            is_key_block: false,
            gen_utime: 0,
            ref_by_mc_seqno: id.seqno,
        });

        handles.set_skip_blocks_gc(&handle);
        assert!(handle.skip_blocks_gc());

        // The flagged handle must survive GC.
        blobs::remove_blocks(&blob_storage, None, 100, ShardHeights::default(), None)?;
        assert!(handles.load_handle(&id).is_some());

        handles.set_skip_blocks_gc_finished(&handle);
        assert!(!handle.skip_blocks_gc());

        drop(handle);

        // After clearing the flag the handle is collectible.
        blobs::remove_blocks(&blob_storage, None, 100, ShardHeights::default(), None)?;
        assert!(handles.load_handle(&id).is_none());

        Ok(())
    }
}