// tycho_core/storage/block/mod.rs

1use std::fs::File;
2use std::io::BufReader;
3use std::num::NonZeroU32;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use anyhow::{Context, Result};
8use bytes::Bytes;
9use tokio::sync::broadcast;
10use tycho_block_util::archive::ArchiveData;
11use tycho_block_util::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug};
12use tycho_block_util::queue::{QueueDiffStuff, QueueDiffStuffAug};
13use tycho_types::models::*;
14use tycho_util::FastHasherState;
15use tycho_util::metrics::HistogramGuard;
16use tycho_util::sync::{CancellationFlag, rayon_run};
17
18pub use self::package_entry::{PackageEntryKey, PartialBlockId};
19use super::util::SlotSubscriptions;
20use super::{
21    BlockConnectionStorage, BlockFlags, BlockHandle, BlockHandleStorage, BlocksCacheConfig, CoreDb,
22    HandleCreationStatus, NewBlockMeta,
23};
24
25pub(crate) mod blobs;
26mod package_entry;
27
28pub use blobs::{ArchiveId, BlockGcStats, OpenStats};
29
30use crate::storage::block::blobs::DEFAULT_CHUNK_SIZE;
31use crate::storage::config::BlobDbConfig;
32
// Counter: total number of block-data load requests.
const METRIC_LOAD_BLOCK_TOTAL: &str = "tycho_storage_load_block_total";
// Counter: block-data loads served from the in-memory blocks cache.
const METRIC_BLOCK_CACHE_HIT_TOTAL: &str = "tycho_storage_block_cache_hit_total";
35
/// Storage for block data, block proofs, queue diffs and archives.
pub struct BlockStorage {
    // TTL- and weight-bounded in-memory cache of recently stored blocks.
    blocks_cache: BlocksCache,
    block_handle_storage: Arc<BlockHandleStorage>,
    block_connection_storage: Arc<BlockConnectionStorage>,
    // One-shot subscriptions resolved when a block's data is stored.
    block_subscriptions: SlotSubscriptions<BlockId, BlockStuff>,
    // Writers (`store_block_data`) acquire this as `read` (many at once);
    // `wait_for_block` acquires it as `write` so its check-then-subscribe
    // sequence cannot race with a concurrent store.
    store_block_data: tokio::sync::RwLock<()>,
    pub(crate) blob_storage: Arc<blobs::BlobStorage>,
}
44
45impl BlockStorage {
    /// Default chunk size used by the blob storage.
    pub const DEFAULT_BLOB_CHUNK_SIZE: NonZeroU32 =
        NonZeroU32::new(DEFAULT_CHUNK_SIZE as u32).unwrap();
    // === Init stuff ===
49
50    pub async fn new(
51        db: CoreDb,
52        config: BlockStorageConfig,
53        block_handle_storage: Arc<BlockHandleStorage>,
54        block_connection_storage: Arc<BlockConnectionStorage>,
55    ) -> Result<Self> {
56        fn weigher(_key: &BlockId, value: &BlockStuff) -> u32 {
57            const BLOCK_STUFF_OVERHEAD: u32 = 1024; // 1 KB
58
59            size_of::<BlockId>() as u32
60                + BLOCK_STUFF_OVERHEAD
61                + value.data_size().try_into().unwrap_or(u32::MAX)
62        }
63
64        let blocks_cache = moka::sync::Cache::builder()
65            .time_to_live(config.blocks_cache.ttl)
66            .max_capacity(config.blocks_cache.size.as_u64())
67            .weigher(weigher)
68            .build_with_hasher(Default::default());
69
70        let blob_storage = blobs::BlobStorage::new(
71            db,
72            block_handle_storage.clone(),
73            &config.blobs_root,
74            config.blob_db_config.pre_create_cas_tree,
75        )
76        .await
77        .map(Arc::new)?;
78
79        Ok(Self {
80            blocks_cache,
81            block_handle_storage,
82            block_connection_storage,
83            block_subscriptions: Default::default(),
84            store_block_data: Default::default(),
85            blob_storage,
86        })
87    }
88
    /// Returns statistics collected while opening the blob storage.
    pub fn open_stats(&self) -> &OpenStats {
        self.blob_storage.open_stats()
    }
92
    // === Subscription stuff ===

    /// Waits until the data for `block_id` is stored and returns the block.
    ///
    /// Resolves immediately if a handle with data already exists; otherwise
    /// registers a one-shot subscription fulfilled by `store_block_data`.
    pub async fn wait_for_block(&self, block_id: &BlockId) -> Result<BlockStuffAug> {
        let block_handle_storage = &self.block_handle_storage;

        // Take an exclusive lock to prevent any block data from being stored
        // while we check for existing data and (possibly) subscribe; this
        // closes the race where the block arrives between the check below
        // and the subscription registration.
        let guard = self.store_block_data.write().await;

        // Try to load the block data
        if let Some(handle) = block_handle_storage.load_handle(block_id)
            && handle.has_data()
        {
            drop(guard);
            let block = self.load_block_data(&handle).await?;
            return Ok(BlockStuffAug::loaded(block));
        }

        // Add subscription for the block and drop the lock
        let rx = self.block_subscriptions.subscribe(block_id);

        // NOTE: Guard must be dropped before awaiting — stores take the same
        // lock as `read`, so holding it here would deadlock the notifier.
        drop(guard);

        let block = rx.await;
        Ok(BlockStuffAug::loaded(block))
    }
119
120    pub async fn wait_for_next_block(&self, prev_block_id: &BlockId) -> Result<BlockStuffAug> {
121        let block_id = self
122            .block_connection_storage
123            .wait_for_next1(prev_block_id)
124            .await;
125
126        self.wait_for_block(&block_id).await
127    }
128
    // === Block data ===

    /// Stores block data together with its archive representation.
    ///
    /// Creates (or loads) the block handle, persists the data at most once
    /// (double-checked under the handle's data lock), notifies any
    /// `wait_for_block` subscribers, and updates the in-memory cache.
    pub async fn store_block_data(
        &self,
        block: &BlockStuff,
        archive_data: &ArchiveData,
        meta_data: NewBlockMeta,
    ) -> Result<StoreBlockResult> {
        // NOTE: Any amount of blocks can be stored concurrently,
        // but the subscription lock can be acquired only while
        // no block data is being stored.
        let guard = self.store_block_data.read().await;

        let block_id = block.id();
        let (handle, status) = self
            .block_handle_storage
            .create_or_load_handle(block_id, meta_data);

        let archive_id = PackageEntryKey::block(block_id);
        let mut updated = false;
        if !handle.has_data() {
            let data = archive_data.clone_new_archive_data()?;
            metrics::histogram!("tycho_storage_store_block_data_size").record(data.len() as f64);

            // Re-check under the per-handle write lock: another writer may
            // have stored the data between the check above and this point.
            let lock = handle.block_data_lock().write().await;
            if !handle.has_data() {
                self.blob_storage.add_data(&archive_id, data, &lock).await?;
                if handle.meta().add_flags(BlockFlags::HAS_DATA) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        // TODO: only notify subscribers if `updated`?
        self.block_subscriptions.notify(block_id, block);

        drop(guard);

        // Update block cache
        self.blocks_cache.insert(*block_id, block.clone());

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
177
178    pub async fn load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
179        metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
180
181        const BIG_DATA_THRESHOLD: usize = 1 << 20; // 1 MB
182
183        let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
184
185        anyhow::ensure!(
186            handle.has_data(),
187            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
188        );
189
190        // Fast path - lookup in cache
191        if let Some(block) = self.blocks_cache.get(handle.id()) {
192            metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
193            return Ok(block.clone());
194        }
195
196        let data = self
197            .blob_storage
198            .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
199            .await?;
200
201        if data.len() < BIG_DATA_THRESHOLD {
202            BlockStuff::deserialize(handle.id(), data.as_ref())
203        } else {
204            let handle = handle.clone();
205            rayon_run(move || BlockStuff::deserialize(handle.id(), data.as_ref())).await
206        }
207    }
208
209    pub async fn load_block_data_decompressed(&self, handle: &BlockHandle) -> Result<Bytes> {
210        anyhow::ensure!(
211            handle.has_data(),
212            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
213        );
214
215        self.blob_storage
216            .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
217            .await
218    }
219
    /// Lists stored block ids starting after the optional continuation
    /// token; returns the ids and the next continuation token (if any).
    pub async fn list_blocks(
        &self,
        continuation: Option<BlockIdShort>,
    ) -> Result<(Vec<BlockId>, Option<BlockIdShort>)> {
        self.blob_storage.list_blocks(continuation).await
    }

    /// Returns the ids of all stored archives.
    pub fn list_archive_ids(&self) -> Vec<u32> {
        self.blob_storage.list_archive_ids()
    }
230
231    pub async fn load_block_data_range(
232        &self,
233        handle: &BlockHandle,
234        offset: u64,
235        length: u64,
236    ) -> Result<Option<Bytes>> {
237        anyhow::ensure!(
238            handle.has_data(),
239            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
240        );
241        self.blob_storage
242            .get_block_data_range(handle, offset, length)
243            .await
244    }
245
    /// Returns the size of the compressed block data entry, if present.
    pub fn get_compressed_block_data_size(&self, handle: &BlockHandle) -> Result<Option<u64>> {
        let key = PackageEntryKey::block(handle.id());
        Ok(self.blob_storage.blocks().get_size(&key)?)
    }

    /// Finds masterchain block data by its seqno.
    pub async fn find_mc_block_data(&self, mc_seqno: u32) -> Result<Option<Block>> {
        self.blob_storage.find_mc_block_data(mc_seqno).await
    }
254
    // === Block proof ===

    /// Stores a block proof, creating a handle if needed.
    ///
    /// The proof is persisted at most once, double-checked under the
    /// handle's proof lock (same pattern as `store_block_data`).
    ///
    /// # Errors
    /// Fails with [`BlockStorageError::BlockHandleIdMismatch`] if an
    /// existing handle's id does not match the proof's id.
    pub async fn store_block_proof(
        &self,
        proof: &BlockProofStuffAug,
        handle: MaybeExistingHandle,
    ) -> Result<StoreBlockResult> {
        let block_id = proof.id();
        if let MaybeExistingHandle::Existing(handle) = &handle
            && handle.id() != block_id
        {
            anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
                expected: block_id.as_short_id(),
                actual: handle.id().as_short_id(),
            });
        }

        let (handle, status) = match handle {
            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
            MaybeExistingHandle::New(meta_data) => self
                .block_handle_storage
                .create_or_load_handle(block_id, meta_data),
        };

        let mut updated = false;
        let archive_id = PackageEntryKey::proof(block_id);
        if !handle.has_proof() {
            let data = proof.clone_new_archive_data()?;

            // Re-check under the per-handle write lock to avoid storing twice.
            let lock = handle.proof_data_lock().write().await;
            if !handle.has_proof() {
                self.blob_storage.add_data(&archive_id, data, &lock).await?;
                if handle.meta().add_flags(BlockFlags::HAS_PROOF) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
300
301    pub async fn load_block_proof(&self, handle: &BlockHandle) -> Result<BlockProofStuff> {
302        let raw_proof = self.load_block_proof_raw(handle).await?;
303        BlockProofStuff::deserialize(handle.id(), &raw_proof)
304    }
305
306    pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
307        anyhow::ensure!(
308            handle.has_proof(),
309            BlockStorageError::BlockProofNotFound(handle.id().as_short_id())
310        );
311
312        self.blob_storage
313            .get_block_data_decompressed(handle, &PackageEntryKey::proof(handle.id()))
314            .await
315    }
316
    // === Queue diff ===

    /// Stores a queue diff, creating a handle if needed.
    ///
    /// The diff is persisted at most once, double-checked under the
    /// handle's queue-diff lock (same pattern as `store_block_data`).
    ///
    /// # Errors
    /// Fails with [`BlockStorageError::BlockHandleIdMismatch`] if an
    /// existing handle's id does not match the diff's block id.
    pub async fn store_queue_diff(
        &self,
        queue_diff: &QueueDiffStuffAug,
        handle: MaybeExistingHandle,
    ) -> Result<StoreBlockResult> {
        let block_id = queue_diff.block_id();
        if let MaybeExistingHandle::Existing(handle) = &handle
            && handle.id() != block_id
        {
            anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
                expected: block_id.as_short_id(),
                actual: handle.id().as_short_id(),
            });
        }

        let (handle, status) = match handle {
            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
            MaybeExistingHandle::New(meta_data) => self
                .block_handle_storage
                .create_or_load_handle(block_id, meta_data),
        };

        let mut updated = false;
        let archive_id = PackageEntryKey::queue_diff(block_id);
        if !handle.has_queue_diff() {
            let data = queue_diff.clone_new_archive_data()?;

            // Re-check under the per-handle write lock to avoid storing twice.
            let lock = handle.queue_diff_data_lock().write().await;
            if !handle.has_queue_diff() {
                self.blob_storage.add_data(&archive_id, data, &lock).await?;
                if handle.meta().add_flags(BlockFlags::HAS_QUEUE_DIFF) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
362
    /// Loads and deserializes the queue diff for the given handle,
    /// offloading large diffs to the rayon pool.
    pub async fn load_queue_diff(&self, handle: &BlockHandle) -> Result<QueueDiffStuff> {
        // Diffs at least this big are deserialized off the async runtime.
        const BIG_DATA_THRESHOLD: usize = 100 << 10; // 100 kib

        let data = self.load_queue_diff_raw(handle).await?;

        if data.len() < BIG_DATA_THRESHOLD {
            QueueDiffStuff::deserialize(handle.id(), &data)
        } else {
            let handle = handle.clone();
            rayon_run(move || QueueDiffStuff::deserialize(handle.id(), data.as_ref())).await
        }
    }

    /// Loads the raw (decompressed) queue diff bytes.
    ///
    /// # Errors
    /// Fails with [`BlockStorageError::QueueDiffNotFound`] if the handle
    /// has no stored queue diff.
    pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
        anyhow::ensure!(
            handle.has_queue_diff(),
            BlockStorageError::QueueDiffNotFound(handle.id().as_short_id())
        );

        self.blob_storage
            .get_block_data_decompressed(handle, &PackageEntryKey::queue_diff(handle.id()))
            .await
    }
386
    // === Archive stuff ===

    /// Moves the block referenced by `handle` into an archive.
    pub async fn move_into_archive(
        &self,
        handle: &BlockHandle,
        mc_is_key_block: bool,
    ) -> Result<()> {
        self.blob_storage
            .move_into_archive(handle, mc_is_key_block)
            .await
    }

    /// Waits until the current archive commit finishes.
    pub async fn wait_for_archive_commit(&self) -> Result<()> {
        self.blob_storage.wait_for_archive_commit().await
    }

    /// Returns a corresponding archive id for the specified masterchain seqno.
    pub fn get_archive_id(&self, mc_seqno: u32) -> ArchiveId {
        self.blob_storage.get_archive_id(mc_seqno)
    }

    /// Returns the size of the archive with the given id, if present.
    pub fn get_archive_size(&self, id: u32) -> Result<Option<usize>> {
        self.blob_storage.get_archive_size(id)
    }

    /// Returns a buffered reader over the archive file, if present.
    pub fn get_archive_reader(&self, id: u32) -> Result<Option<BufReader<File>>> {
        self.blob_storage.get_archive_reader(id)
    }

    /// Get the complete archive (compressed).
    ///
    /// Must not be used for anything other than tests.
    #[cfg(any(test, feature = "test"))]
    pub async fn get_archive_compressed_full(&self, id: u32) -> Result<Option<Bytes>> {
        self.blob_storage.get_archive_full(id).await
    }

    /// Get a chunk of the archive at the specified offset.
    pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<Bytes> {
        self.blob_storage.get_archive_chunk(id, offset).await
    }

    /// Subscribes to ids of newly committed archives.
    pub fn subscribe_to_archive_ids(&self) -> broadcast::Receiver<u32> {
        self.blob_storage.subscribe_to_archive_ids()
    }

    /// Removes all archives with ids up to `until_id`.
    pub async fn remove_outdated_archives(&self, until_id: u32) -> Result<()> {
        self.blob_storage.remove_outdated_archives(until_id).await
    }
436
    // === GC stuff ===

    /// Runs blocks GC relative to the masterchain block `mc_seqno`:
    /// removes outdated cached handles and block entries below the shard
    /// heights referenced by that block. A no-op for `mc_seqno == 0`.
    #[tracing::instrument(skip(self, max_blocks_per_batch))]
    pub async fn remove_outdated_blocks(
        &self,
        mc_seqno: u32,
        max_blocks_per_batch: Option<usize>,
    ) -> Result<()> {
        if mc_seqno == 0 {
            return Ok(());
        }

        tracing::info!("started blocks GC for mc_block {mc_seqno}");

        let block = self
            .blob_storage
            .find_mc_block_data(mc_seqno)
            .await
            .context("failed to load target block data")?
            .context("target block not found")?;

        // Latest (shard, seqno) pairs referenced by the target mc block.
        let shard_heights = {
            let extra = block.extra.load()?;
            let custom = extra.custom.context("mc block extra not found")?.load()?;
            custom
                .shards
                .latest_blocks()
                .map(|id| id.map(|id| (id.shard, id.seqno)))
                .collect::<Result<_, tycho_types::error::Error>>()?
        };

        // Remove all expired entries
        let total_cached_handles_removed = self
            .block_handle_storage
            .gc_handles_cache(mc_seqno, &shard_heights);

        // Signal cancellation to the blocking task if this future is dropped.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        // Run the (potentially long) removal on the blocking thread pool;
        // `self` cannot be moved into the closure, so the blob storage
        // handle is cloned in instead.
        let BlockGcStats {
            mc_blocks_removed,
            total_blocks_removed,
        } = tokio::task::spawn_blocking({
            let blob_storage = self.blob_storage.clone();
            let cancelled = cancelled.clone();
            move || {
                blobs::remove_blocks(
                    &blob_storage,
                    max_blocks_per_batch,
                    mc_seqno,
                    shard_heights,
                    Some(&cancelled),
                )
            }
        })
        .await??;

        tracing::info!(
            total_cached_handles_removed,
            mc_blocks_removed,
            total_blocks_removed,
            "finished blocks GC"
        );
        Ok(())
    }
506
    // === Internal ===

    /// Test-only access to the underlying blob storage.
    #[cfg(any(test, feature = "test"))]
    pub fn blob_storage(&self) -> &blobs::BlobStorage {
        &self.blob_storage
    }
513}
514
/// Either an already-existing block handle or the metadata to create one.
#[derive(Clone)]
pub enum MaybeExistingHandle {
    Existing(BlockHandle),
    New(NewBlockMeta),
}
520
521impl From<BlockHandle> for MaybeExistingHandle {
522    fn from(handle: BlockHandle) -> Self {
523        Self::Existing(handle)
524    }
525}
526
527impl From<NewBlockMeta> for MaybeExistingHandle {
528    fn from(meta_data: NewBlockMeta) -> Self {
529        Self::New(meta_data)
530    }
531}
532
/// Result of a `store_*` operation on [`BlockStorage`].
pub struct StoreBlockResult {
    pub handle: BlockHandle,
    // Whether this call set new flags and re-stored the handle.
    pub updated: bool,
    // Whether the handle was created (rather than loaded) by this call.
    pub new: bool,
}
538
/// Configuration for [`BlockStorage::new`].
pub struct BlockStorageConfig {
    // Parameters (TTL, capacity) of the in-memory blocks cache.
    pub blocks_cache: BlocksCacheConfig,
    // Root directory of the blob storage.
    pub blobs_root: PathBuf,
    pub blob_db_config: BlobDbConfig,
}
544
/// In-memory cache of recently stored blocks, keyed by block id.
type BlocksCache = moka::sync::Cache<BlockId, BlockStuff, FastHasherState>;
546
/// Errors produced by [`BlockStorage`] operations.
#[derive(thiserror::Error, Debug)]
enum BlockStorageError {
    #[error("Block data not found for block: {0}")]
    BlockDataNotFound(BlockIdShort),
    #[error("Block proof not found for block: {0}")]
    BlockProofNotFound(BlockIdShort),
    #[error("Queue diff not found for block: {0}")]
    QueueDiffNotFound(BlockIdShort),
    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
    BlockHandleIdMismatch {
        expected: BlockIdShort,
        actual: BlockIdShort,
    },
}
561
562#[cfg(test)]
563mod tests {
564    use std::pin::pin;
565
566    use blobs::*;
567    use tycho_block_util::archive::{ArchiveEntryType, WithArchiveData};
568    use tycho_storage::StorageContext;
569    use tycho_types::prelude::*;
570    use tycho_util::FastHashMap;
571    use tycho_util::futures::JoinTask;
572
573    use super::*;
574    use crate::storage::{BlockConnection, CoreStorage, CoreStorageConfig};
575
    // Stores the same block's data, proof and queue diff from several
    // concurrent tasks and checks they all converge on a single handle
    // with all parts present.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn parallel_store_data() -> Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let shard = ShardIdent::MASTERCHAIN;
        for seqno in 0..1000 {
            let block = BlockStuff::new_empty(shard, seqno);
            let block = {
                let data = BocRepr::encode_rayon(block.as_ref()).unwrap();
                WithArchiveData::new(block, data)
            };
            let block_id = block.id();

            let proof = BlockProofStuff::new_empty(block_id);
            let proof = {
                let data = BocRepr::encode_rayon(proof.as_ref()).unwrap();
                WithArchiveData::new(proof, data)
            };

            let queue_diff = QueueDiffStuff::builder(shard, seqno, &HashBytes::ZERO)
                .serialize()
                .build(block_id);

            let block_meta = NewBlockMeta {
                is_key_block: shard.is_masterchain() && seqno == 0,
                gen_utime: 0,
                ref_by_mc_seqno: seqno,
            };

            // Task: store the block data.
            let store_block_data = || {
                let storage = storage.clone();
                JoinTask::new(async move {
                    let res = storage
                        .block_storage()
                        .store_block_data(&block, &block.archive_data, block_meta)
                        .await?;

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            // Task: store proof then queue diff, with random yields to
            // shuffle interleavings between runs.
            let store_proof_and_queue = || {
                let storage = storage.clone();
                let proof = proof.clone();
                let queue_diff = queue_diff.clone();
                JoinTask::new(async move {
                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_block_proof(&proof, MaybeExistingHandle::New(block_meta))
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_queue_diff(&queue_diff, res.handle.into())
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            let (data_res, proof_and_queue_res) = async move {
                let data_fut = pin!(store_block_data());
                let proof_and_queue_fut = pin!(async {
                    tokio::select! {
                        left = store_proof_and_queue() => left,
                        right = store_proof_and_queue() => right,
                    }
                });

                let (data, other) = futures_util::future::join(data_fut, proof_and_queue_fut).await;

                Ok::<_, anyhow::Error>((data?, other?))
            }
            .await?;

            // Both paths must resolve to the very same handle allocation.
            assert!(std::ptr::addr_eq(
                arc_swap::RefCnt::as_ptr(&data_res),
                arc_swap::RefCnt::as_ptr(&proof_and_queue_res)
            ));
            assert!(data_res.has_all_block_parts());
        }

        Ok(())
    }
673
    // Fills two shards with dummy blocks, runs `remove_blocks` and checks
    // that handles, CAS entries and connections disappear exactly for the
    // expected seqno ranges.
    #[tokio::test]
    async fn blocks_gc() -> Result<()> {
        const GARBAGE: Bytes = Bytes::from_static(b"garbage");
        const ENTRY_TYPES: [ArchiveEntryType; 3] = [
            ArchiveEntryType::Block,
            ArchiveEntryType::Proof,
            ArchiveEntryType::QueueDiff,
        ];
        const CONNECTION_TYPES: [BlockConnection; 2] =
            [BlockConnection::Prev1, BlockConnection::Next1];

        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let blocks = storage.block_storage();
        let block_handles = storage.block_handle_storage();
        let block_connections = storage.block_connection_storage();

        let mut shard_block_ids = FastHashMap::<ShardIdent, Vec<BlockId>>::default();

        // Populate 100 blocks per shard with garbage payloads.
        for shard in [ShardIdent::MASTERCHAIN, ShardIdent::BASECHAIN] {
            let entry = shard_block_ids.entry(shard).or_default();

            for seqno in 0..100 {
                let block_id = BlockId {
                    shard,
                    seqno,
                    root_hash: HashBytes(rand::random()),
                    file_hash: HashBytes(rand::random()),
                };
                entry.push(block_id);

                let (handle, _) = block_handles.create_or_load_handle(&block_id, NewBlockMeta {
                    is_key_block: shard.is_masterchain() && seqno == 0,
                    gen_utime: 0,
                    ref_by_mc_seqno: seqno,
                });
                let lock = handle.block_data_lock().write().await;

                for ty in ENTRY_TYPES {
                    blocks
                        .blob_storage
                        .add_data(&(block_id, ty).into(), GARBAGE, &lock)
                        .await?;
                }
                for direction in CONNECTION_TYPES {
                    block_connections.store_connection(&handle, direction, &block_id);
                }

                handle.meta().add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
                block_handles.store_handle(&handle, false);
            }
        }

        // Remove some blocks
        let stats = blobs::remove_blocks(
            &blocks.blob_storage,
            None,
            70,
            [(ShardIdent::BASECHAIN, 50)].into(),
            None,
        )?;
        // Seqno 0 (key block) is kept in both shards.
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 69,
            total_blocks_removed: 69 + 49,
        });

        let removed_ranges = FastHashMap::from_iter([
            (ShardIdent::MASTERCHAIN, vec![1..=69]),
            (ShardIdent::BASECHAIN, vec![1..=49]),
        ]);
        for (shard, block_ids) in shard_block_ids {
            let removed_ranges = removed_ranges.get(&shard).unwrap();

            for block_id in block_ids {
                let must_be_removed = 'removed: {
                    for range in removed_ranges {
                        if range.contains(&block_id.seqno) {
                            break 'removed true;
                        }
                    }
                    false
                };

                let handle = block_handles.load_handle(&block_id);
                assert_eq!(handle.is_none(), must_be_removed);

                for ty in ENTRY_TYPES {
                    let key = PackageEntryKey::from((block_id, ty));
                    // Check if the entry exists in Cassadilia
                    let exists_in_cas = blocks
                        .blob_storage()
                        .blocks()
                        .read_index_state()
                        .contains_key(&key);
                    assert_eq!(!exists_in_cas, must_be_removed);
                }

                for direction in CONNECTION_TYPES {
                    let connection = block_connections.load_connection(&block_id, direction);
                    assert_eq!(connection.is_none(), must_be_removed);
                }
            }
        }

        // Remove single block
        let stats = blobs::remove_blocks(
            &blocks.blob_storage,
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 1,
            total_blocks_removed: 2,
        });

        // Remove no blocks
        let stats = blobs::remove_blocks(
            &blocks.blob_storage,
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 0,
            total_blocks_removed: 0,
        });

        Ok(())
    }
807}