//! Block storage (`tycho_core/storage/block/mod.rs`): stores block data,
//! block proofs, queue diffs and archives on top of the blob storage.

use std::fs::File;
use std::io::BufReader;
use std::num::NonZeroU32;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{Context, Result};
use bytes::Bytes;
use tokio::sync::broadcast;
use tycho_block_util::archive::ArchiveData;
use tycho_block_util::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug};
use tycho_block_util::queue::{QueueDiffStuff, QueueDiffStuffAug};
use tycho_types::models::*;
use tycho_util::FastHasherState;
use tycho_util::metrics::HistogramGuard;
use tycho_util::sync::{CancellationFlag, rayon_run};

pub use self::package_entry::{PackageEntryKey, PartialBlockId};
pub use blobs::{ArchiveId, BlockGcStats, OpenStats};

use super::util::SlotSubscriptions;
use super::{
    BlockConnectionStorage, BlockFlags, BlockHandle, BlockHandleStorage, BlocksCacheConfig, CoreDb,
    HandleCreationStatus, NewBlockMeta,
};
use crate::storage::block::blobs::DEFAULT_CHUNK_SIZE;
use crate::storage::config::BlobDbConfig;

pub(crate) mod blobs;
mod package_entry;
32
// Counter of `load_block_data` calls.
const METRIC_LOAD_BLOCK_TOTAL: &str = "tycho_storage_load_block_total";
// Counter of `load_block_data` calls served from the in-memory cache.
const METRIC_BLOCK_CACHE_HIT_TOTAL: &str = "tycho_storage_block_cache_hit_total";
35
/// Storage of block data, block proofs, queue diffs and archives.
pub struct BlockStorage {
    /// Cache of recently stored/loaded blocks, keyed by block id.
    blocks_cache: BlocksCache,
    /// Storage of block handles (metadata and flags).
    block_handle_storage: Arc<BlockHandleStorage>,
    /// Storage of prev/next block connections.
    block_connection_storage: Arc<BlockConnectionStorage>,
    /// One-shot subscriptions resolved when block data is stored.
    block_subscriptions: SlotSubscriptions<BlockId, BlockStuff>,
    /// Coordination lock: `store_block_data` holds a read lock while
    /// storing, `wait_for_block` holds a write lock while subscribing,
    /// so a subscription can never race with an in-flight store.
    store_block_data: tokio::sync::RwLock<()>,
    /// Underlying blob storage for block parts and archives.
    pub(crate) blob_storage: Arc<blobs::BlobStorage>,
}
44
45impl BlockStorage {
    /// Default chunk size for blob storage (mirrors [`DEFAULT_CHUNK_SIZE`]).
    pub const DEFAULT_BLOB_CHUNK_SIZE: NonZeroU32 =
        NonZeroU32::new(DEFAULT_CHUNK_SIZE as u32).unwrap();
48    // === Init stuff ===
49
50    pub async fn new(
51        db: CoreDb,
52        config: BlockStorageConfig,
53        block_handle_storage: Arc<BlockHandleStorage>,
54        block_connection_storage: Arc<BlockConnectionStorage>,
55    ) -> Result<Self> {
56        fn weigher(_key: &BlockId, value: &BlockStuff) -> u32 {
57            const BLOCK_STUFF_OVERHEAD: u32 = 1024; // 1 KB
58
59            size_of::<BlockId>() as u32
60                + BLOCK_STUFF_OVERHEAD
61                + value.data_size().try_into().unwrap_or(u32::MAX)
62        }
63
64        let blocks_cache = moka::sync::Cache::builder()
65            .time_to_live(config.blocks_cache.ttl)
66            .max_capacity(config.blocks_cache.size.as_u64())
67            .weigher(weigher)
68            .build_with_hasher(Default::default());
69
70        let blob_storage = blobs::BlobStorage::new(
71            db,
72            block_handle_storage.clone(),
73            &config.blobs_root,
74            config.blob_db_config.pre_create_cas_tree,
75        )
76        .await
77        .map(Arc::new)?;
78
79        Ok(Self {
80            blocks_cache,
81            block_handle_storage,
82            block_connection_storage,
83            block_subscriptions: Default::default(),
84            store_block_data: Default::default(),
85            blob_storage,
86        })
87    }
88
    /// Returns statistics collected while opening the blob storage.
    pub fn open_stats(&self) -> &OpenStats {
        self.blob_storage.open_stats()
    }
92
93    // === Subscription stuff ===
94
    /// Waits until data for `block_id` is stored, then loads and returns it.
    ///
    /// Returns immediately if the block data is already available.
    /// Otherwise subscribes and resolves once `store_block_data` notifies.
    pub async fn wait_for_block(&self, block_id: &BlockId) -> Result<BlockStuffAug> {
        let block_handle_storage = &self.block_handle_storage;

        // Take an exclusive lock to prevent any block data from being stored
        // while we decide between "already stored" and "subscribe".
        let guard = self.store_block_data.write().await;

        // Try to load the block data
        if let Some(handle) = block_handle_storage.load_handle(block_id)
            && handle.has_data()
        {
            // Data already exists; release the lock before the async load.
            drop(guard);
            let block = self.load_block_data(&handle).await?;
            return Ok(BlockStuffAug::loaded(block));
        }

        // Add subscription for the block and drop the lock
        let rx = self.block_subscriptions.subscribe(block_id);

        // NOTE: Guard must be dropped before awaiting,
        // otherwise `store_block_data` could never make progress.
        drop(guard);

        let block = rx.await;
        Ok(BlockStuffAug::loaded(block))
    }
119
120    pub async fn wait_for_next_block(&self, prev_block_id: &BlockId) -> Result<BlockStuffAug> {
121        let block_id = self
122            .block_connection_storage
123            .wait_for_next1(prev_block_id)
124            .await;
125
126        self.wait_for_block(&block_id).await
127    }
128
129    // === Block data ===
130
    /// Stores block data into blob storage and notifies waiters.
    ///
    /// Creates or loads the block handle, writes the archive data under the
    /// handle's per-block data lock (double-checked against `has_data`),
    /// notifies `wait_for_block` subscribers and updates the block cache.
    pub async fn store_block_data(
        &self,
        block: &BlockStuff,
        archive_data: &ArchiveData,
        meta_data: NewBlockMeta,
    ) -> Result<StoreBlockResult> {
        // NOTE: Any amount of blocks can be stored concurrently,
        // but the subscription lock can be acquired only while
        // no block data is being stored.
        let guard = self.store_block_data.read().await;

        let block_id = block.id();
        let (handle, status) = self
            .block_handle_storage
            .create_or_load_handle(block_id, meta_data);

        let archive_id = PackageEntryKey::block(block_id);
        let mut updated = false;
        if !handle.has_data() {
            let data = archive_data.clone_new_archive_data()?;
            metrics::histogram!("tycho_storage_store_block_data_size").record(data.len() as f64);

            // Double-check the flag under the handle's write lock so that
            // concurrent stores of the same block write the data only once.
            let lock = handle.block_data_lock().write().await;
            if !handle.has_data() {
                self.blob_storage.add_data(&archive_id, data, &lock).await?;
                if handle.meta().add_flags(BlockFlags::HAS_DATA) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        // TODO: only notify subscribers if `updated`?
        self.block_subscriptions.notify(block_id, block);

        drop(guard);

        // Update block cache
        self.blocks_cache.insert(*block_id, block.clone());

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
177
178    pub async fn load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
179        metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
180
181        const BIG_DATA_THRESHOLD: usize = 1 << 20; // 1 MB
182
183        let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
184
185        anyhow::ensure!(
186            handle.has_data(),
187            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
188        );
189
190        // Fast path - lookup in cache
191        if let Some(block) = self.blocks_cache.get(handle.id()) {
192            metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
193            return Ok(block.clone());
194        }
195
196        let data = self
197            .blob_storage
198            .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
199            .await?;
200
201        if data.len() < BIG_DATA_THRESHOLD {
202            BlockStuff::deserialize(handle.id(), data.as_ref())
203        } else {
204            let handle = handle.clone();
205            rayon_run(move || BlockStuff::deserialize(handle.id(), data.as_ref())).await
206        }
207    }
208
209    pub async fn load_block_data_decompressed(&self, handle: &BlockHandle) -> Result<Bytes> {
210        anyhow::ensure!(
211            handle.has_data(),
212            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
213        );
214
215        self.blob_storage
216            .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
217            .await
218    }
219
    /// Lists stored block ids, starting from an optional continuation token.
    ///
    /// Returns a page of block ids and a continuation token for the next
    /// page (`None` presumably means the listing is exhausted — the paging
    /// contract lives in `BlobStorage::list_blocks`).
    pub async fn list_blocks(
        &self,
        continuation: Option<BlockIdShort>,
    ) -> Result<(Vec<BlockId>, Option<BlockIdShort>)> {
        self.blob_storage.list_blocks(continuation).await
    }

    /// Returns the ids of all stored archives.
    pub fn list_archive_ids(&self) -> Vec<u32> {
        self.blob_storage.list_archive_ids()
    }

    /// Loads a byte range of the stored block data.
    ///
    /// Fails if the handle has no stored data; the `Option` result is
    /// produced by the underlying `get_block_data_range` delegate.
    pub async fn load_block_data_range(
        &self,
        handle: &BlockHandle,
        offset: u64,
        length: u64,
    ) -> Result<Option<Bytes>> {
        anyhow::ensure!(
            handle.has_data(),
            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
        );
        self.blob_storage
            .get_block_data_range(handle, offset, length)
            .await
    }

    /// Returns the compressed size of the stored block data entry,
    /// or `None` if no such entry exists.
    pub fn get_compressed_block_data_size(&self, handle: &BlockHandle) -> Result<Option<u64>> {
        let key = PackageEntryKey::block(handle.id());
        Ok(self.blob_storage.blocks().get_size(&key)?)
    }

    /// Finds masterchain block data by seqno, if it is stored.
    pub async fn find_mc_block_data(&self, mc_seqno: u32) -> Result<Option<Block>> {
        self.blob_storage.find_mc_block_data(mc_seqno).await
    }
254
255    // === Block proof ===
256
    /// Stores a block proof into blob storage.
    ///
    /// `handle` is either an existing handle (whose id must match the
    /// proof's block id) or metadata to create a new one. The proof is
    /// written under the handle's proof lock, double-checked against
    /// `has_proof`, so concurrent stores write it only once.
    pub async fn store_block_proof(
        &self,
        proof: &BlockProofStuffAug,
        handle: MaybeExistingHandle,
    ) -> Result<StoreBlockResult> {
        let block_id = proof.id();
        if let MaybeExistingHandle::Existing(handle) = &handle
            && handle.id() != block_id
        {
            anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
                expected: block_id.as_short_id(),
                actual: handle.id().as_short_id(),
            });
        }

        let (handle, status) = match handle {
            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
            MaybeExistingHandle::New(meta_data) => self
                .block_handle_storage
                .create_or_load_handle(block_id, meta_data),
        };

        let mut updated = false;
        let archive_id = PackageEntryKey::proof(block_id);
        if !handle.has_proof() {
            let data = proof.clone_new_archive_data()?;

            // Double-check the flag under the handle's proof write lock.
            let lock = handle.proof_data_lock().write().await;
            if !handle.has_proof() {
                self.blob_storage.add_data(&archive_id, data, &lock).await?;
                if handle.meta().add_flags(BlockFlags::HAS_PROOF) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
300
301    pub async fn load_block_proof(&self, handle: &BlockHandle) -> Result<BlockProofStuff> {
302        let raw_proof = self.load_block_proof_raw(handle).await?;
303        BlockProofStuff::deserialize(handle.id(), &raw_proof)
304    }
305
306    pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
307        anyhow::ensure!(
308            handle.has_proof(),
309            BlockStorageError::BlockProofNotFound(handle.id().as_short_id())
310        );
311
312        self.blob_storage
313            .get_block_data_decompressed(handle, &PackageEntryKey::proof(handle.id()))
314            .await
315    }
316
317    // === Queue diff ===
318
    /// Stores a queue diff into blob storage.
    ///
    /// `handle` is either an existing handle (whose id must match the
    /// diff's block id) or metadata to create a new one. The diff is
    /// written under the handle's queue-diff lock, double-checked against
    /// `has_queue_diff`, so concurrent stores write it only once.
    pub async fn store_queue_diff(
        &self,
        queue_diff: &QueueDiffStuffAug,
        handle: MaybeExistingHandle,
    ) -> Result<StoreBlockResult> {
        let block_id = queue_diff.block_id();
        if let MaybeExistingHandle::Existing(handle) = &handle
            && handle.id() != block_id
        {
            anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
                expected: block_id.as_short_id(),
                actual: handle.id().as_short_id(),
            });
        }

        let (handle, status) = match handle {
            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
            MaybeExistingHandle::New(meta_data) => self
                .block_handle_storage
                .create_or_load_handle(block_id, meta_data),
        };

        let mut updated = false;
        let archive_id = PackageEntryKey::queue_diff(block_id);
        if !handle.has_queue_diff() {
            let data = queue_diff.clone_new_archive_data()?;

            // Double-check the flag under the handle's queue-diff write lock.
            let lock = handle.queue_diff_data_lock().write().await;
            if !handle.has_queue_diff() {
                self.blob_storage.add_data(&archive_id, data, &lock).await?;
                if handle.meta().add_flags(BlockFlags::HAS_QUEUE_DIFF) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
362
363    pub async fn load_queue_diff(&self, handle: &BlockHandle) -> Result<QueueDiffStuff> {
364        const BIG_DATA_THRESHOLD: usize = 100 << 10; // 100 kib
365
366        let data = self.load_queue_diff_raw(handle).await?;
367
368        if data.len() < BIG_DATA_THRESHOLD {
369            QueueDiffStuff::deserialize(handle.id(), &data)
370        } else {
371            let handle = handle.clone();
372            rayon_run(move || QueueDiffStuff::deserialize(handle.id(), data.as_ref())).await
373        }
374    }
375
376    pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
377        anyhow::ensure!(
378            handle.has_queue_diff(),
379            BlockStorageError::QueueDiffNotFound(handle.id().as_short_id())
380        );
381
382        self.blob_storage
383            .get_block_data_decompressed(handle, &PackageEntryKey::queue_diff(handle.id()))
384            .await
385    }
386
387    // === Archive stuff ===
388
    /// Moves the block (and its stored parts) into an archive.
    pub async fn move_into_archive(
        &self,
        handle: &BlockHandle,
        mc_is_key_block: bool,
    ) -> Result<()> {
        self.blob_storage
            .move_into_archive(handle, mc_is_key_block)
            .await
    }

    /// Waits until the in-progress archive commit finishes.
    pub async fn wait_for_archive_commit(&self) -> Result<()> {
        self.blob_storage.wait_for_archive_commit().await
    }

    /// Returns a corresponding archive id for the specified masterchain seqno.
    pub fn get_archive_id(&self, mc_seqno: u32) -> ArchiveId {
        self.blob_storage.get_archive_id(mc_seqno)
    }

    /// Returns the archive size, or `None` if no such archive exists.
    pub fn get_archive_size(&self, id: u32) -> Result<Option<usize>> {
        self.blob_storage.get_archive_size(id)
    }

    /// Returns a buffered reader over the archive file, if it exists.
    pub fn get_archive_reader(&self, id: u32) -> Result<Option<BufReader<File>>> {
        self.blob_storage.get_archive_reader(id)
    }

    /// Get the complete archive (compressed).
    ///
    /// Must not be used for anything other than tests.
    #[cfg(any(test, feature = "test"))]
    pub async fn get_archive_compressed_full(&self, id: u32) -> Result<Option<Bytes>> {
        self.blob_storage.get_archive_full(id).await
    }

    /// Get a chunk of the archive at the specified offset.
    pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<Bytes> {
        self.blob_storage.get_archive_chunk(id, offset).await
    }

    /// Subscribes to ids of newly available archives.
    pub fn subscribe_to_archive_ids(&self) -> broadcast::Receiver<u32> {
        self.blob_storage.subscribe_to_archive_ids()
    }

    /// Removes outdated archives, bounded by `until_id`.
    pub async fn remove_outdated_archives(&self, until_id: u32) -> Result<()> {
        self.blob_storage.remove_outdated_archives(until_id).await
    }

    /// Estimates the archive id for the given masterchain seqno.
    pub fn estimate_archive_id(&self, mc_seqno: u32) -> u32 {
        self.blob_storage.estimate_archive_id(mc_seqno)
    }
440
441    // === GC stuff ===
442
    /// Runs blocks GC: removes block entries that are outdated relative to
    /// the masterchain block `mc_seqno` (see `blobs::remove_blocks` for the
    /// exact removal rules). `max_blocks_per_batch` bounds the batch size.
    ///
    /// A no-op when `mc_seqno == 0`.
    #[tracing::instrument(skip(self, max_blocks_per_batch))]
    pub async fn remove_outdated_blocks(
        &self,
        mc_seqno: u32,
        max_blocks_per_batch: Option<usize>,
    ) -> Result<()> {
        if mc_seqno == 0 {
            return Ok(());
        }

        tracing::info!("started blocks GC for mc_block {mc_seqno}");

        // The target masterchain block defines the GC horizon.
        let block = self
            .blob_storage
            .find_mc_block_data(mc_seqno)
            .await
            .context("failed to load target block data")?
            .context("target block not found")?;

        // Latest (shard, seqno) pairs referenced by the target mc block.
        let shard_heights = {
            let extra = block.extra.load()?;
            let custom = extra.custom.context("mc block extra not found")?.load()?;
            custom
                .shards
                .latest_blocks()
                .map(|id| id.map(|id| (id.shard, id.seqno)))
                .collect::<Result<_, tycho_types::error::Error>>()?
        };

        // Remove all expired entries
        let total_cached_handles_removed = self
            .block_handle_storage
            .gc_handles_cache(mc_seqno, &shard_heights);

        // Raise the cancellation flag on any exit from this scope
        // (including when this future is dropped mid-way).
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        // Run the removal on a blocking thread; `blob_storage` is cloned
        // into the closure because `self` cannot be moved there.
        let BlockGcStats {
            mc_blocks_removed,
            total_blocks_removed,
        } = tokio::task::spawn_blocking({
            let blob_storage = self.blob_storage.clone();
            let cancelled = cancelled.clone();
            move || {
                blobs::remove_blocks(
                    &blob_storage,
                    max_blocks_per_batch,
                    mc_seqno,
                    shard_heights,
                    Some(&cancelled),
                )
            }
        })
        .await??;

        tracing::info!(
            total_cached_handles_removed,
            mc_blocks_removed,
            total_blocks_removed,
            "finished blocks GC"
        );
        Ok(())
    }
510
511    // === Internal ===
512
    /// Direct access to the underlying blob storage (tests only).
    #[cfg(any(test, feature = "test"))]
    pub fn blob_storage(&self) -> &blobs::BlobStorage {
        &self.blob_storage
    }
517}
518
/// A block handle that either already exists or is yet to be created.
#[derive(Clone)]
pub enum MaybeExistingHandle {
    /// An already created handle for the block.
    Existing(BlockHandle),
    /// Metadata to create a new handle from.
    New(NewBlockMeta),
}

impl From<BlockHandle> for MaybeExistingHandle {
    fn from(handle: BlockHandle) -> Self {
        Self::Existing(handle)
    }
}

impl From<NewBlockMeta> for MaybeExistingHandle {
    fn from(meta_data: NewBlockMeta) -> Self {
        Self::New(meta_data)
    }
}
536
/// Result of storing block data, a block proof or a queue diff.
pub struct StoreBlockResult {
    /// Handle of the affected block.
    pub handle: BlockHandle,
    /// Whether the handle flags were updated by this operation.
    pub updated: bool,
    /// Whether the handle was created by this operation
    /// (as opposed to loaded from storage).
    pub new: bool,
}

/// Configuration for [`BlockStorage`].
pub struct BlockStorageConfig {
    /// In-memory blocks cache settings (TTL and capacity).
    pub blocks_cache: BlocksCacheConfig,
    /// Root directory for blob storage files.
    pub blobs_root: PathBuf,
    /// Blob database settings.
    pub blob_db_config: BlobDbConfig,
}

/// In-memory cache of recently stored/loaded blocks.
type BlocksCache = moka::sync::Cache<BlockId, BlockStuff, FastHasherState>;
550
/// Errors produced by [`BlockStorage`] operations.
#[derive(thiserror::Error, Debug)]
enum BlockStorageError {
    #[error("Block data not found for block: {0}")]
    BlockDataNotFound(BlockIdShort),
    #[error("Block proof not found for block: {0}")]
    BlockProofNotFound(BlockIdShort),
    #[error("Queue diff not found for block: {0}")]
    QueueDiffNotFound(BlockIdShort),
    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
    BlockHandleIdMismatch {
        expected: BlockIdShort,
        actual: BlockIdShort,
    },
}
565
566#[cfg(test)]
567mod tests {
568    use std::pin::pin;
569
570    use blobs::*;
571    use tycho_block_util::archive::{ArchiveEntryType, WithArchiveData};
572    use tycho_storage::StorageContext;
573    use tycho_types::prelude::*;
574    use tycho_util::FastHashMap;
575    use tycho_util::futures::JoinTask;
576
577    use super::*;
578    use crate::storage::{BlockConnection, CoreStorage, CoreStorageConfig};
579
    // Stores block data, proof and queue diff for the same block from
    // concurrent tasks and checks that every store operation resolves to
    // the same shared handle, which ends up with all block parts set.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn parallel_store_data() -> Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let shard = ShardIdent::MASTERCHAIN;
        for seqno in 0..1000 {
            // Build an empty block together with its serialized archive data.
            let block = BlockStuff::new_empty(shard, seqno);
            let block = {
                let data = BocRepr::encode_rayon(block.as_ref()).unwrap();
                WithArchiveData::new(block, data)
            };
            let block_id = block.id();

            let proof = BlockProofStuff::new_empty(block_id);
            let proof = {
                let data = BocRepr::encode_rayon(proof.as_ref()).unwrap();
                WithArchiveData::new(proof, data)
            };

            let queue_diff = QueueDiffStuff::builder(shard, seqno, &HashBytes::ZERO)
                .serialize()
                .build(block_id);

            let block_meta = NewBlockMeta {
                is_key_block: shard.is_masterchain() && seqno == 0,
                gen_utime: 0,
                ref_by_mc_seqno: seqno,
            };

            // Task that stores the block data itself.
            let store_block_data = || {
                let storage = storage.clone();
                JoinTask::new(async move {
                    let res = storage
                        .block_storage()
                        .store_block_data(&block, &block.archive_data, block_meta)
                        .await?;

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            // Task that stores the proof and then the queue diff, with
            // random yields to shuffle interleavings between runs.
            let store_proof_and_queue = || {
                let storage = storage.clone();
                let proof = proof.clone();
                let queue_diff = queue_diff.clone();
                JoinTask::new(async move {
                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_block_proof(&proof, MaybeExistingHandle::New(block_meta))
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_queue_diff(&queue_diff, res.handle.into())
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            let (data_res, proof_and_queue_res) = async move {
                let data_fut = pin!(store_block_data());
                let proof_and_queue_fut = pin!(async {
                    tokio::select! {
                        left = store_proof_and_queue() => left,
                        right = store_proof_and_queue() => right,
                    }
                });

                let (data, other) = futures_util::future::join(data_fut, proof_and_queue_fut).await;

                Ok::<_, anyhow::Error>((data?, other?))
            }
            .await?;

            // Both paths must observe the very same handle allocation.
            assert!(std::ptr::addr_eq(
                arc_swap::RefCnt::as_ptr(&data_res),
                arc_swap::RefCnt::as_ptr(&proof_and_queue_res)
            ));
            assert!(data_res.has_all_block_parts());
        }

        Ok(())
    }
677
    // Populates masterchain and basechain with 100 blocks each (data,
    // proof, queue diff, connections), then checks that `remove_blocks`
    // deletes exactly the expected seqno ranges and reports correct stats.
    #[tokio::test]
    async fn blocks_gc() -> Result<()> {
        // Placeholder payload stored for every block part.
        const GARBAGE: Bytes = Bytes::from_static(b"garbage");
        const ENTRY_TYPES: [ArchiveEntryType; 3] = [
            ArchiveEntryType::Block,
            ArchiveEntryType::Proof,
            ArchiveEntryType::QueueDiff,
        ];
        const CONNECTION_TYPES: [BlockConnection; 2] =
            [BlockConnection::Prev1, BlockConnection::Next1];

        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let blocks = storage.block_storage();
        let block_handles = storage.block_handle_storage();
        let block_connections = storage.block_connection_storage();

        let mut shard_block_ids = FastHashMap::<ShardIdent, Vec<BlockId>>::default();

        for shard in [ShardIdent::MASTERCHAIN, ShardIdent::BASECHAIN] {
            let entry = shard_block_ids.entry(shard).or_default();

            for seqno in 0..100 {
                let block_id = BlockId {
                    shard,
                    seqno,
                    root_hash: HashBytes(rand::random()),
                    file_hash: HashBytes(rand::random()),
                };
                entry.push(block_id);

                let (handle, _) = block_handles.create_or_load_handle(&block_id, NewBlockMeta {
                    is_key_block: shard.is_masterchain() && seqno == 0,
                    gen_utime: 0,
                    ref_by_mc_seqno: seqno,
                });
                let lock = handle.block_data_lock().write().await;

                for ty in ENTRY_TYPES {
                    blocks
                        .blob_storage
                        .add_data(&(block_id, ty).into(), GARBAGE, &lock)
                        .await?;
                }
                for direction in CONNECTION_TYPES {
                    block_connections.store_connection(&handle, direction, &block_id);
                }

                handle.meta().add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
                block_handles.store_handle(&handle, false);
            }
        }

        // Remove some blocks
        let stats = blobs::remove_blocks(
            &blocks.blob_storage,
            None,
            70,
            [(ShardIdent::BASECHAIN, 50)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 69,
            total_blocks_removed: 69 + 49,
        });

        // Expected removed seqno ranges per shard after the first pass.
        let removed_ranges = FastHashMap::from_iter([
            (ShardIdent::MASTERCHAIN, vec![1..=69]),
            (ShardIdent::BASECHAIN, vec![1..=49]),
        ]);
        for (shard, block_ids) in shard_block_ids {
            let removed_ranges = removed_ranges.get(&shard).unwrap();

            for block_id in block_ids {
                let must_be_removed = 'removed: {
                    for range in removed_ranges {
                        if range.contains(&block_id.seqno) {
                            break 'removed true;
                        }
                    }
                    false
                };

                let handle = block_handles.load_handle(&block_id);
                assert_eq!(handle.is_none(), must_be_removed);

                for ty in ENTRY_TYPES {
                    let key = PackageEntryKey::from((block_id, ty));
                    // Check if the entry exists in Cassadilia
                    let exists_in_cas = blocks
                        .blob_storage()
                        .blocks()
                        .read_index_state()
                        .contains_key(&key);
                    assert_eq!(!exists_in_cas, must_be_removed);
                }

                for direction in CONNECTION_TYPES {
                    let connection = block_connections.load_connection(&block_id, direction);
                    assert_eq!(connection.is_none(), must_be_removed);
                }
            }
        }

        // Remove single block
        let stats = blobs::remove_blocks(
            &blocks.blob_storage,
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 1,
            total_blocks_removed: 2,
        });

        // Remove no blocks
        let stats = blobs::remove_blocks(
            &blocks.blob_storage,
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 0,
            total_blocks_removed: 0,
        });

        Ok(())
    }
811}