tycho_core/storage/block/
mod.rs

1use std::fs::File;
2use std::io::BufReader;
3use std::num::NonZeroU32;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use anyhow::{Context, Result};
8use bytes::Bytes;
9use tokio::sync::broadcast;
10use tycho_block_util::archive::ArchiveData;
11use tycho_block_util::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug};
12use tycho_block_util::queue::{QueueDiffStuff, QueueDiffStuffAug};
13use tycho_types::models::*;
14use tycho_util::FastHasherState;
15use tycho_util::metrics::HistogramGuard;
16use tycho_util::sync::{CancellationFlag, rayon_run};
17
18pub use self::package_entry::{PackageEntryKey, PartialBlockId};
19use super::util::SlotSubscriptions;
20use super::{
21    BlockConnectionStorage, BlockFlags, BlockHandle, BlockHandleStorage, BlocksCacheConfig, CoreDb,
22    HandleCreationStatus, NewBlockMeta,
23};
24
25pub(crate) mod blobs;
26mod package_entry;
27
28pub use blobs::{ArchiveId, BlockGcStats, OpenStats};
29
30use crate::storage::block::blobs::DEFAULT_CHUNK_SIZE;
31use crate::storage::config::BlobDbConfig;
32
/// Name of the counter incremented on every `load_block_data` call.
const METRIC_LOAD_BLOCK_TOTAL: &str = "tycho_storage_load_block_total";
/// Name of the counter incremented when `load_block_data` is served from the blocks cache.
const METRIC_BLOCK_CACHE_HIT_TOTAL: &str = "tycho_storage_block_cache_hit_total";
35
36pub struct BlockStorage {
37    blocks_cache: BlocksCache,
38    block_handle_storage: Arc<BlockHandleStorage>,
39    block_connection_storage: Arc<BlockConnectionStorage>,
40    block_subscriptions: SlotSubscriptions<BlockId, BlockStuff>,
41    store_block_data: tokio::sync::RwLock<()>,
42    pub(crate) blob_storage: blobs::BlobStorage,
43}
44
45impl BlockStorage {
46    pub const DEFAULT_BLOB_CHUNK_SIZE: NonZeroU32 =
47        NonZeroU32::new(DEFAULT_CHUNK_SIZE as u32).unwrap();
48    // === Init stuff ===
49
50    pub async fn new(
51        db: CoreDb,
52        config: BlockStorageConfig,
53        block_handle_storage: Arc<BlockHandleStorage>,
54        block_connection_storage: Arc<BlockConnectionStorage>,
55    ) -> Result<Self> {
56        fn weigher(_key: &BlockId, value: &BlockStuff) -> u32 {
57            const BLOCK_STUFF_OVERHEAD: u32 = 1024; // 1 KB
58
59            size_of::<BlockId>() as u32
60                + BLOCK_STUFF_OVERHEAD
61                + value.data_size().try_into().unwrap_or(u32::MAX)
62        }
63
64        let blocks_cache = moka::sync::Cache::builder()
65            .time_to_live(config.blocks_cache.ttl)
66            .max_capacity(config.blocks_cache.size.as_u64())
67            .weigher(weigher)
68            .build_with_hasher(Default::default());
69
70        let blob_storage = blobs::BlobStorage::new(
71            db,
72            block_handle_storage.clone(),
73            &config.blobs_root,
74            config.blob_db_config.pre_create_cas_tree,
75        )
76        .await?;
77
78        Ok(Self {
79            blocks_cache,
80            block_handle_storage,
81            block_connection_storage,
82            block_subscriptions: Default::default(),
83            store_block_data: Default::default(),
84            blob_storage,
85        })
86    }
87
88    pub fn open_stats(&self) -> &OpenStats {
89        self.blob_storage.open_stats()
90    }
91
92    // === Subscription stuff ===
93
94    pub async fn wait_for_block(&self, block_id: &BlockId) -> Result<BlockStuffAug> {
95        let block_handle_storage = &self.block_handle_storage;
96
97        // Take an exclusive lock to prevent any block data from being stored
98        let guard = self.store_block_data.write().await;
99
100        // Try to load the block data
101        if let Some(handle) = block_handle_storage.load_handle(block_id)
102            && handle.has_data()
103        {
104            drop(guard);
105            let block = self.load_block_data(&handle).await?;
106            return Ok(BlockStuffAug::loaded(block));
107        }
108
109        // Add subscription for the block and drop the lock
110        let rx = self.block_subscriptions.subscribe(block_id);
111
112        // NOTE: Guard must be dropped before awaiting
113        drop(guard);
114
115        let block = rx.await;
116        Ok(BlockStuffAug::loaded(block))
117    }
118
119    pub async fn wait_for_next_block(&self, prev_block_id: &BlockId) -> Result<BlockStuffAug> {
120        let block_id = self
121            .block_connection_storage
122            .wait_for_next1(prev_block_id)
123            .await;
124
125        self.wait_for_block(&block_id).await
126    }
127
128    // === Block data ===
129
130    pub async fn store_block_data(
131        &self,
132        block: &BlockStuff,
133        archive_data: &ArchiveData,
134        meta_data: NewBlockMeta,
135    ) -> Result<StoreBlockResult> {
136        // NOTE: Any amount of blocks can be stored concurrently,
137        // but the subscription lock can be acquired only while
138        // no block data is being stored.
139        let guard = self.store_block_data.read().await;
140
141        let block_id = block.id();
142        let (handle, status) = self
143            .block_handle_storage
144            .create_or_load_handle(block_id, meta_data);
145
146        let archive_id = PackageEntryKey::block(block_id);
147        let mut updated = false;
148        if !handle.has_data() {
149            let data = archive_data.clone_new_archive_data()?;
150            metrics::histogram!("tycho_storage_store_block_data_size").record(data.len() as f64);
151
152            let lock = handle.block_data_lock().write().await;
153            if !handle.has_data() {
154                self.blob_storage.add_data(&archive_id, data, &lock).await?;
155                if handle.meta().add_flags(BlockFlags::HAS_DATA) {
156                    self.block_handle_storage.store_handle(&handle, false);
157                    updated = true;
158                }
159            }
160        }
161
162        // TODO: only notify subscribers if `updated`?
163        self.block_subscriptions.notify(block_id, block);
164
165        drop(guard);
166
167        // Update block cache
168        self.blocks_cache.insert(*block_id, block.clone());
169
170        Ok(StoreBlockResult {
171            handle,
172            updated,
173            new: status == HandleCreationStatus::Created,
174        })
175    }
176
177    pub async fn load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
178        metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
179
180        const BIG_DATA_THRESHOLD: usize = 1 << 20; // 1 MB
181
182        let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
183
184        anyhow::ensure!(
185            handle.has_data(),
186            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
187        );
188
189        // Fast path - lookup in cache
190        if let Some(block) = self.blocks_cache.get(handle.id()) {
191            metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
192            return Ok(block.clone());
193        }
194
195        let data = self
196            .blob_storage
197            .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
198            .await?;
199
200        if data.len() < BIG_DATA_THRESHOLD {
201            BlockStuff::deserialize(handle.id(), data.as_ref())
202        } else {
203            let handle = handle.clone();
204            rayon_run(move || BlockStuff::deserialize(handle.id(), data.as_ref())).await
205        }
206    }
207
208    pub async fn load_block_data_decompressed(&self, handle: &BlockHandle) -> Result<Bytes> {
209        anyhow::ensure!(
210            handle.has_data(),
211            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
212        );
213
214        self.blob_storage
215            .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
216            .await
217    }
218
219    pub async fn list_blocks(
220        &self,
221        continuation: Option<BlockIdShort>,
222    ) -> Result<(Vec<BlockId>, Option<BlockIdShort>)> {
223        self.blob_storage.list_blocks(continuation).await
224    }
225
226    pub fn list_archive_ids(&self) -> Vec<u32> {
227        self.blob_storage.list_archive_ids()
228    }
229
230    pub async fn load_block_data_range(
231        &self,
232        handle: &BlockHandle,
233        offset: u64,
234        length: u64,
235    ) -> Result<Option<Bytes>> {
236        anyhow::ensure!(
237            handle.has_data(),
238            BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
239        );
240        self.blob_storage
241            .get_block_data_range(handle, offset, length)
242            .await
243    }
244
245    pub fn get_compressed_block_data_size(&self, handle: &BlockHandle) -> Result<Option<u64>> {
246        let key = PackageEntryKey::block(handle.id());
247        Ok(self.blob_storage.blocks().get_size(&key)?)
248    }
249
250    pub async fn find_mc_block_data(&self, mc_seqno: u32) -> Result<Option<Block>> {
251        self.blob_storage.find_mc_block_data(mc_seqno).await
252    }
253
254    // === Block proof ===
255
256    pub async fn store_block_proof(
257        &self,
258        proof: &BlockProofStuffAug,
259        handle: MaybeExistingHandle,
260    ) -> Result<StoreBlockResult> {
261        let block_id = proof.id();
262        if let MaybeExistingHandle::Existing(handle) = &handle
263            && handle.id() != block_id
264        {
265            anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
266                expected: block_id.as_short_id(),
267                actual: handle.id().as_short_id(),
268            });
269        }
270
271        let (handle, status) = match handle {
272            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
273            MaybeExistingHandle::New(meta_data) => self
274                .block_handle_storage
275                .create_or_load_handle(block_id, meta_data),
276        };
277
278        let mut updated = false;
279        let archive_id = PackageEntryKey::proof(block_id);
280        if !handle.has_proof() {
281            let data = proof.clone_new_archive_data()?;
282
283            let lock = handle.proof_data_lock().write().await;
284            if !handle.has_proof() {
285                self.blob_storage.add_data(&archive_id, data, &lock).await?;
286                if handle.meta().add_flags(BlockFlags::HAS_PROOF) {
287                    self.block_handle_storage.store_handle(&handle, false);
288                    updated = true;
289                }
290            }
291        }
292
293        Ok(StoreBlockResult {
294            handle,
295            updated,
296            new: status == HandleCreationStatus::Created,
297        })
298    }
299
300    pub async fn load_block_proof(&self, handle: &BlockHandle) -> Result<BlockProofStuff> {
301        let raw_proof = self.load_block_proof_raw(handle).await?;
302        BlockProofStuff::deserialize(handle.id(), &raw_proof)
303    }
304
305    pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
306        anyhow::ensure!(
307            handle.has_proof(),
308            BlockStorageError::BlockProofNotFound(handle.id().as_short_id())
309        );
310
311        self.blob_storage
312            .get_block_data_decompressed(handle, &PackageEntryKey::proof(handle.id()))
313            .await
314    }
315
316    // === Queue diff ===
317
318    pub async fn store_queue_diff(
319        &self,
320        queue_diff: &QueueDiffStuffAug,
321        handle: MaybeExistingHandle,
322    ) -> Result<StoreBlockResult> {
323        let block_id = queue_diff.block_id();
324        if let MaybeExistingHandle::Existing(handle) = &handle
325            && handle.id() != block_id
326        {
327            anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
328                expected: block_id.as_short_id(),
329                actual: handle.id().as_short_id(),
330            });
331        }
332
333        let (handle, status) = match handle {
334            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
335            MaybeExistingHandle::New(meta_data) => self
336                .block_handle_storage
337                .create_or_load_handle(block_id, meta_data),
338        };
339
340        let mut updated = false;
341        let archive_id = PackageEntryKey::queue_diff(block_id);
342        if !handle.has_queue_diff() {
343            let data = queue_diff.clone_new_archive_data()?;
344
345            let lock = handle.queue_diff_data_lock().write().await;
346            if !handle.has_queue_diff() {
347                self.blob_storage.add_data(&archive_id, data, &lock).await?;
348                if handle.meta().add_flags(BlockFlags::HAS_QUEUE_DIFF) {
349                    self.block_handle_storage.store_handle(&handle, false);
350                    updated = true;
351                }
352            }
353        }
354
355        Ok(StoreBlockResult {
356            handle,
357            updated,
358            new: status == HandleCreationStatus::Created,
359        })
360    }
361
362    pub async fn load_queue_diff(&self, handle: &BlockHandle) -> Result<QueueDiffStuff> {
363        let raw_diff = self.load_queue_diff_raw(handle).await?;
364        QueueDiffStuff::deserialize(handle.id(), &raw_diff)
365    }
366
367    pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
368        anyhow::ensure!(
369            handle.has_queue_diff(),
370            BlockStorageError::QueueDiffNotFound(handle.id().as_short_id())
371        );
372
373        self.blob_storage
374            .get_block_data_decompressed(handle, &PackageEntryKey::queue_diff(handle.id()))
375            .await
376    }
377
378    // === Archive stuff ===
379
380    pub async fn move_into_archive(
381        &self,
382        handle: &BlockHandle,
383        mc_is_key_block: bool,
384    ) -> Result<()> {
385        self.blob_storage
386            .move_into_archive(handle, mc_is_key_block)
387            .await
388    }
389
390    pub async fn wait_for_archive_commit(&self) -> Result<()> {
391        self.blob_storage.wait_for_archive_commit().await
392    }
393
394    /// Returns a corresponding archive id for the specified masterchain seqno.
395    pub fn get_archive_id(&self, mc_seqno: u32) -> ArchiveId {
396        self.blob_storage.get_archive_id(mc_seqno)
397    }
398
399    pub fn get_archive_size(&self, id: u32) -> Result<Option<usize>> {
400        self.blob_storage.get_archive_size(id)
401    }
402
403    pub fn get_archive_reader(&self, id: u32) -> Result<Option<BufReader<File>>> {
404        self.blob_storage.get_archive_reader(id)
405    }
406
407    /// Get the complete archive (compressed).
408    ///
409    /// Must not be used for anything other than tests.
410    #[cfg(any(test, feature = "test"))]
411    pub async fn get_archive_compressed_full(&self, id: u32) -> Result<Option<Bytes>> {
412        self.blob_storage.get_archive_full(id).await
413    }
414
415    /// Get a chunk of the archive at the specified offset.
416    pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<Bytes> {
417        self.blob_storage.get_archive_chunk(id, offset).await
418    }
419
420    pub fn subscribe_to_archive_ids(&self) -> broadcast::Receiver<u32> {
421        self.blob_storage.subscribe_to_archive_ids()
422    }
423
424    pub async fn remove_outdated_archives(&self, until_id: u32) -> Result<()> {
425        self.blob_storage.remove_outdated_archives(until_id).await
426    }
427
428    // === GC stuff ===
429
430    #[tracing::instrument(skip(self, max_blocks_per_batch))]
431    pub async fn remove_outdated_blocks(
432        &self,
433        mc_seqno: u32,
434        max_blocks_per_batch: Option<usize>,
435    ) -> Result<()> {
436        if mc_seqno == 0 {
437            return Ok(());
438        }
439
440        tracing::info!("started blocks GC for mc_block {mc_seqno}");
441
442        let block = self
443            .blob_storage
444            .find_mc_block_data(mc_seqno)
445            .await
446            .context("failed to load target block data")?
447            .context("target block not found")?;
448
449        let shard_heights = {
450            let extra = block.extra.load()?;
451            let custom = extra.custom.context("mc block extra not found")?.load()?;
452            custom
453                .shards
454                .latest_blocks()
455                .map(|id| id.map(|id| (id.shard, id.seqno)))
456                .collect::<Result<_, tycho_types::error::Error>>()?
457        };
458
459        // Remove all expired entries
460        let total_cached_handles_removed = self
461            .block_handle_storage
462            .gc_handles_cache(mc_seqno, &shard_heights);
463
464        let cancelled = CancellationFlag::new();
465        scopeguard::defer! {
466            cancelled.cancel();
467        }
468
469        // Since remove_blocks needs blob_storage but we can't move self into the closure,
470        // we'll call it directly without rayon_run for now
471        let BlockGcStats {
472            mc_blocks_removed,
473            total_blocks_removed,
474        } = blobs::remove_blocks(
475            self.blob_storage.db().clone(),
476            &self.blob_storage,
477            max_blocks_per_batch,
478            mc_seqno,
479            shard_heights,
480            Some(&cancelled),
481        )?;
482
483        tracing::info!(
484            total_cached_handles_removed,
485            mc_blocks_removed,
486            total_blocks_removed,
487            "finished blocks GC"
488        );
489        Ok(())
490    }
491
492    // === Internal ===
493
494    #[cfg(test)]
495    pub fn db(&self) -> &CoreDb {
496        self.blob_storage.db()
497    }
498
499    #[cfg(any(test, feature = "test"))]
500    pub fn blob_storage(&self) -> &blobs::BlobStorage {
501        &self.blob_storage
502    }
503}
504
505#[derive(Clone)]
506pub enum MaybeExistingHandle {
507    Existing(BlockHandle),
508    New(NewBlockMeta),
509}
510
511impl From<BlockHandle> for MaybeExistingHandle {
512    fn from(handle: BlockHandle) -> Self {
513        Self::Existing(handle)
514    }
515}
516
517impl From<NewBlockMeta> for MaybeExistingHandle {
518    fn from(meta_data: NewBlockMeta) -> Self {
519        Self::New(meta_data)
520    }
521}
522
523pub struct StoreBlockResult {
524    pub handle: BlockHandle,
525    pub updated: bool,
526    pub new: bool,
527}
528
529pub struct BlockStorageConfig {
530    pub blocks_cache: BlocksCacheConfig,
531    pub blobs_root: PathBuf,
532    pub blob_db_config: BlobDbConfig,
533}
534
535type BlocksCache = moka::sync::Cache<BlockId, BlockStuff, FastHasherState>;
536
537#[derive(thiserror::Error, Debug)]
538enum BlockStorageError {
539    #[error("Block data not found for block: {0}")]
540    BlockDataNotFound(BlockIdShort),
541    #[error("Block proof not found for block: {0}")]
542    BlockProofNotFound(BlockIdShort),
543    #[error("Queue diff not found for block: {0}")]
544    QueueDiffNotFound(BlockIdShort),
545    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
546    BlockHandleIdMismatch {
547        expected: BlockIdShort,
548        actual: BlockIdShort,
549    },
550}
551
552#[cfg(test)]
553mod tests {
554    use std::pin::pin;
555
556    use blobs::*;
557    use tycho_block_util::archive::{ArchiveEntryType, WithArchiveData};
558    use tycho_storage::StorageContext;
559    use tycho_types::prelude::*;
560    use tycho_util::FastHashMap;
561    use tycho_util::futures::JoinTask;
562
563    use super::*;
564    use crate::storage::{BlockConnection, CoreStorage, CoreStorageConfig};
565
566    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
567    async fn parallel_store_data() -> Result<()> {
568        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
569        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
570
571        let shard = ShardIdent::MASTERCHAIN;
572        for seqno in 0..1000 {
573            let block = BlockStuff::new_empty(shard, seqno);
574            let block = {
575                let data = BocRepr::encode_rayon(block.as_ref()).unwrap();
576                WithArchiveData::new(block, data)
577            };
578            let block_id = block.id();
579
580            let proof = BlockProofStuff::new_empty(block_id);
581            let proof = {
582                let data = BocRepr::encode_rayon(proof.as_ref()).unwrap();
583                WithArchiveData::new(proof, data)
584            };
585
586            let queue_diff = QueueDiffStuff::builder(shard, seqno, &HashBytes::ZERO)
587                .serialize()
588                .build(block_id);
589
590            let block_meta = NewBlockMeta {
591                is_key_block: shard.is_masterchain() && seqno == 0,
592                gen_utime: 0,
593                ref_by_mc_seqno: seqno,
594            };
595
596            let store_block_data = || {
597                let storage = storage.clone();
598                JoinTask::new(async move {
599                    let res = storage
600                        .block_storage()
601                        .store_block_data(&block, &block.archive_data, block_meta)
602                        .await?;
603
604                    Ok::<_, anyhow::Error>(res.handle)
605                })
606            };
607
608            let store_proof_and_queue = || {
609                let storage = storage.clone();
610                let proof = proof.clone();
611                let queue_diff = queue_diff.clone();
612                JoinTask::new(async move {
613                    if rand::random::<bool>() {
614                        tokio::task::yield_now().await;
615                    }
616
617                    let res = storage
618                        .block_storage()
619                        .store_block_proof(&proof, MaybeExistingHandle::New(block_meta))
620                        .await?;
621
622                    if rand::random::<bool>() {
623                        tokio::task::yield_now().await;
624                    }
625
626                    let res = storage
627                        .block_storage()
628                        .store_queue_diff(&queue_diff, res.handle.into())
629                        .await?;
630
631                    if rand::random::<bool>() {
632                        tokio::task::yield_now().await;
633                    }
634
635                    Ok::<_, anyhow::Error>(res.handle)
636                })
637            };
638
639            let (data_res, proof_and_queue_res) = async move {
640                let data_fut = pin!(store_block_data());
641                let proof_and_queue_fut = pin!(async {
642                    tokio::select! {
643                        left = store_proof_and_queue() => left,
644                        right = store_proof_and_queue() => right,
645                    }
646                });
647
648                let (data, other) = futures_util::future::join(data_fut, proof_and_queue_fut).await;
649
650                Ok::<_, anyhow::Error>((data?, other?))
651            }
652            .await?;
653
654            assert!(std::ptr::addr_eq(
655                arc_swap::RefCnt::as_ptr(&data_res),
656                arc_swap::RefCnt::as_ptr(&proof_and_queue_res)
657            ));
658            assert!(data_res.has_all_block_parts());
659        }
660
661        Ok(())
662    }
663
664    #[tokio::test]
665    async fn blocks_gc() -> Result<()> {
666        const GARBAGE: Bytes = Bytes::from_static(b"garbage");
667        const ENTRY_TYPES: [ArchiveEntryType; 3] = [
668            ArchiveEntryType::Block,
669            ArchiveEntryType::Proof,
670            ArchiveEntryType::QueueDiff,
671        ];
672        const CONNECTION_TYPES: [BlockConnection; 2] =
673            [BlockConnection::Prev1, BlockConnection::Next1];
674
675        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
676        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
677
678        let blocks = storage.block_storage();
679        let block_handles = storage.block_handle_storage();
680        let block_connections = storage.block_connection_storage();
681
682        let mut shard_block_ids = FastHashMap::<ShardIdent, Vec<BlockId>>::default();
683
684        for shard in [ShardIdent::MASTERCHAIN, ShardIdent::BASECHAIN] {
685            let entry = shard_block_ids.entry(shard).or_default();
686
687            for seqno in 0..100 {
688                let block_id = BlockId {
689                    shard,
690                    seqno,
691                    root_hash: HashBytes(rand::random()),
692                    file_hash: HashBytes(rand::random()),
693                };
694                entry.push(block_id);
695
696                let (handle, _) = block_handles.create_or_load_handle(&block_id, NewBlockMeta {
697                    is_key_block: shard.is_masterchain() && seqno == 0,
698                    gen_utime: 0,
699                    ref_by_mc_seqno: seqno,
700                });
701                let lock = handle.block_data_lock().write().await;
702
703                for ty in ENTRY_TYPES {
704                    blocks
705                        .blob_storage
706                        .add_data(&(block_id, ty).into(), GARBAGE, &lock)
707                        .await?;
708                }
709                for direction in CONNECTION_TYPES {
710                    block_connections.store_connection(&handle, direction, &block_id);
711                }
712
713                handle.meta().add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
714                block_handles.store_handle(&handle, false);
715            }
716        }
717
718        // Remove some blocks
719        let stats = blobs::remove_blocks(
720            blocks.db().clone(),
721            &blocks.blob_storage,
722            None,
723            70,
724            [(ShardIdent::BASECHAIN, 50)].into(),
725            None,
726        )?;
727        assert_eq!(stats, BlockGcStats {
728            mc_blocks_removed: 69,
729            total_blocks_removed: 69 + 49,
730        });
731
732        let removed_ranges = FastHashMap::from_iter([
733            (ShardIdent::MASTERCHAIN, vec![1..=69]),
734            (ShardIdent::BASECHAIN, vec![1..=49]),
735        ]);
736        for (shard, block_ids) in shard_block_ids {
737            let removed_ranges = removed_ranges.get(&shard).unwrap();
738
739            for block_id in block_ids {
740                let must_be_removed = 'removed: {
741                    for range in removed_ranges {
742                        if range.contains(&block_id.seqno) {
743                            break 'removed true;
744                        }
745                    }
746                    false
747                };
748
749                let handle = block_handles.load_handle(&block_id);
750                assert_eq!(handle.is_none(), must_be_removed);
751
752                for ty in ENTRY_TYPES {
753                    let key = PackageEntryKey::from((block_id, ty));
754                    // Check if the entry exists in Cassadilia
755                    let exists_in_cas = blocks
756                        .blob_storage()
757                        .blocks()
758                        .read_index_state()
759                        .contains_key(&key);
760                    assert_eq!(!exists_in_cas, must_be_removed);
761                }
762
763                for direction in CONNECTION_TYPES {
764                    let connection = block_connections.load_connection(&block_id, direction);
765                    assert_eq!(connection.is_none(), must_be_removed);
766                }
767            }
768        }
769
770        // Remove single block
771        let stats = blobs::remove_blocks(
772            blocks.db().clone(),
773            &blocks.blob_storage,
774            None,
775            71,
776            [(ShardIdent::BASECHAIN, 51)].into(),
777            None,
778        )?;
779        assert_eq!(stats, BlockGcStats {
780            mc_blocks_removed: 1,
781            total_blocks_removed: 2,
782        });
783
784        // Remove no blocks
785        let stats = blobs::remove_blocks(
786            blocks.db().clone(),
787            &blocks.blob_storage,
788            None,
789            71,
790            [(ShardIdent::BASECHAIN, 51)].into(),
791            None,
792        )?;
793        assert_eq!(stats, BlockGcStats {
794            mc_blocks_removed: 0,
795            total_blocks_removed: 0,
796        });
797
798        Ok(())
799    }
800}