1use std::fs::File;
2use std::io::BufReader;
3use std::num::NonZeroU32;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use anyhow::{Context, Result};
8use bytes::Bytes;
9use tokio::sync::broadcast;
10use tycho_block_util::archive::ArchiveData;
11use tycho_block_util::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug};
12use tycho_block_util::queue::{QueueDiffStuff, QueueDiffStuffAug};
13use tycho_types::models::*;
14use tycho_util::FastHasherState;
15use tycho_util::metrics::HistogramGuard;
16use tycho_util::sync::{CancellationFlag, rayon_run};
17
18pub use self::package_entry::{PackageEntryKey, PartialBlockId};
19use super::util::SlotSubscriptions;
20use super::{
21 BlockConnectionStorage, BlockFlags, BlockHandle, BlockHandleStorage, BlocksCacheConfig, CoreDb,
22 HandleCreationStatus, NewBlockMeta,
23};
24
25pub(crate) mod blobs;
26mod package_entry;
27
28pub use blobs::{ArchiveId, BlockGcStats, OpenStats};
29
30use crate::storage::block::blobs::DEFAULT_CHUNK_SIZE;
31use crate::storage::config::BlobDbConfig;
32
33const METRIC_LOAD_BLOCK_TOTAL: &str = "tycho_storage_load_block_total";
34const METRIC_BLOCK_CACHE_HIT_TOTAL: &str = "tycho_storage_block_cache_hit_total";
35
36pub struct BlockStorage {
37 blocks_cache: BlocksCache,
38 block_handle_storage: Arc<BlockHandleStorage>,
39 block_connection_storage: Arc<BlockConnectionStorage>,
40 block_subscriptions: SlotSubscriptions<BlockId, BlockStuff>,
41 store_block_data: tokio::sync::RwLock<()>,
42 pub(crate) blob_storage: Arc<blobs::BlobStorage>,
43}
44
45impl BlockStorage {
46 pub const DEFAULT_BLOB_CHUNK_SIZE: NonZeroU32 =
47 NonZeroU32::new(DEFAULT_CHUNK_SIZE as u32).unwrap();
48 pub async fn new(
51 db: CoreDb,
52 config: BlockStorageConfig,
53 block_handle_storage: Arc<BlockHandleStorage>,
54 block_connection_storage: Arc<BlockConnectionStorage>,
55 ) -> Result<Self> {
56 fn weigher(_key: &BlockId, value: &BlockStuff) -> u32 {
57 const BLOCK_STUFF_OVERHEAD: u32 = 1024; size_of::<BlockId>() as u32
60 + BLOCK_STUFF_OVERHEAD
61 + value.data_size().try_into().unwrap_or(u32::MAX)
62 }
63
64 let blocks_cache = moka::sync::Cache::builder()
65 .time_to_live(config.blocks_cache.ttl)
66 .max_capacity(config.blocks_cache.size.as_u64())
67 .weigher(weigher)
68 .build_with_hasher(Default::default());
69
70 let blob_storage = blobs::BlobStorage::new(
71 db,
72 block_handle_storage.clone(),
73 &config.blobs_root,
74 config.blob_db_config.pre_create_cas_tree,
75 )
76 .await
77 .map(Arc::new)?;
78
79 Ok(Self {
80 blocks_cache,
81 block_handle_storage,
82 block_connection_storage,
83 block_subscriptions: Default::default(),
84 store_block_data: Default::default(),
85 blob_storage,
86 })
87 }
88
89 pub fn open_stats(&self) -> &OpenStats {
90 self.blob_storage.open_stats()
91 }
92
93 pub async fn wait_for_block(&self, block_id: &BlockId) -> Result<BlockStuffAug> {
96 let block_handle_storage = &self.block_handle_storage;
97
98 let guard = self.store_block_data.write().await;
100
101 if let Some(handle) = block_handle_storage.load_handle(block_id)
103 && handle.has_data()
104 {
105 drop(guard);
106 let block = self.load_block_data(&handle).await?;
107 return Ok(BlockStuffAug::loaded(block));
108 }
109
110 let rx = self.block_subscriptions.subscribe(block_id);
112
113 drop(guard);
115
116 let block = rx.await;
117 Ok(BlockStuffAug::loaded(block))
118 }
119
120 pub async fn wait_for_next_block(&self, prev_block_id: &BlockId) -> Result<BlockStuffAug> {
121 let block_id = self
122 .block_connection_storage
123 .wait_for_next1(prev_block_id)
124 .await;
125
126 self.wait_for_block(&block_id).await
127 }
128
129 pub async fn store_block_data(
132 &self,
133 block: &BlockStuff,
134 archive_data: &ArchiveData,
135 meta_data: NewBlockMeta,
136 ) -> Result<StoreBlockResult> {
137 let guard = self.store_block_data.read().await;
141
142 let block_id = block.id();
143 let (handle, status) = self
144 .block_handle_storage
145 .create_or_load_handle(block_id, meta_data);
146
147 let archive_id = PackageEntryKey::block(block_id);
148 let mut updated = false;
149 if !handle.has_data() {
150 let data = archive_data.clone_new_archive_data()?;
151 metrics::histogram!("tycho_storage_store_block_data_size").record(data.len() as f64);
152
153 let lock = handle.block_data_lock().write().await;
154 if !handle.has_data() {
155 self.blob_storage.add_data(&archive_id, data, &lock).await?;
156 if handle.meta().add_flags(BlockFlags::HAS_DATA) {
157 self.block_handle_storage.store_handle(&handle, false);
158 updated = true;
159 }
160 }
161 }
162
163 self.block_subscriptions.notify(block_id, block);
165
166 drop(guard);
167
168 self.blocks_cache.insert(*block_id, block.clone());
170
171 Ok(StoreBlockResult {
172 handle,
173 updated,
174 new: status == HandleCreationStatus::Created,
175 })
176 }
177
178 pub async fn load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
179 metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
180
181 const BIG_DATA_THRESHOLD: usize = 1 << 20; let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
184
185 anyhow::ensure!(
186 handle.has_data(),
187 BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
188 );
189
190 if let Some(block) = self.blocks_cache.get(handle.id()) {
192 metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
193 return Ok(block.clone());
194 }
195
196 let data = self
197 .blob_storage
198 .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
199 .await?;
200
201 if data.len() < BIG_DATA_THRESHOLD {
202 BlockStuff::deserialize(handle.id(), data.as_ref())
203 } else {
204 let handle = handle.clone();
205 rayon_run(move || BlockStuff::deserialize(handle.id(), data.as_ref())).await
206 }
207 }
208
209 pub async fn load_block_data_decompressed(&self, handle: &BlockHandle) -> Result<Bytes> {
210 anyhow::ensure!(
211 handle.has_data(),
212 BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
213 );
214
215 self.blob_storage
216 .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
217 .await
218 }
219
220 pub async fn list_blocks(
221 &self,
222 continuation: Option<BlockIdShort>,
223 ) -> Result<(Vec<BlockId>, Option<BlockIdShort>)> {
224 self.blob_storage.list_blocks(continuation).await
225 }
226
227 pub fn list_archive_ids(&self) -> Vec<u32> {
228 self.blob_storage.list_archive_ids()
229 }
230
231 pub async fn load_block_data_range(
232 &self,
233 handle: &BlockHandle,
234 offset: u64,
235 length: u64,
236 ) -> Result<Option<Bytes>> {
237 anyhow::ensure!(
238 handle.has_data(),
239 BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
240 );
241 self.blob_storage
242 .get_block_data_range(handle, offset, length)
243 .await
244 }
245
246 pub fn get_compressed_block_data_size(&self, handle: &BlockHandle) -> Result<Option<u64>> {
247 let key = PackageEntryKey::block(handle.id());
248 Ok(self.blob_storage.blocks().get_size(&key)?)
249 }
250
251 pub async fn find_mc_block_data(&self, mc_seqno: u32) -> Result<Option<Block>> {
252 self.blob_storage.find_mc_block_data(mc_seqno).await
253 }
254
255 pub async fn store_block_proof(
258 &self,
259 proof: &BlockProofStuffAug,
260 handle: MaybeExistingHandle,
261 ) -> Result<StoreBlockResult> {
262 let block_id = proof.id();
263 if let MaybeExistingHandle::Existing(handle) = &handle
264 && handle.id() != block_id
265 {
266 anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
267 expected: block_id.as_short_id(),
268 actual: handle.id().as_short_id(),
269 });
270 }
271
272 let (handle, status) = match handle {
273 MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
274 MaybeExistingHandle::New(meta_data) => self
275 .block_handle_storage
276 .create_or_load_handle(block_id, meta_data),
277 };
278
279 let mut updated = false;
280 let archive_id = PackageEntryKey::proof(block_id);
281 if !handle.has_proof() {
282 let data = proof.clone_new_archive_data()?;
283
284 let lock = handle.proof_data_lock().write().await;
285 if !handle.has_proof() {
286 self.blob_storage.add_data(&archive_id, data, &lock).await?;
287 if handle.meta().add_flags(BlockFlags::HAS_PROOF) {
288 self.block_handle_storage.store_handle(&handle, false);
289 updated = true;
290 }
291 }
292 }
293
294 Ok(StoreBlockResult {
295 handle,
296 updated,
297 new: status == HandleCreationStatus::Created,
298 })
299 }
300
301 pub async fn load_block_proof(&self, handle: &BlockHandle) -> Result<BlockProofStuff> {
302 let raw_proof = self.load_block_proof_raw(handle).await?;
303 BlockProofStuff::deserialize(handle.id(), &raw_proof)
304 }
305
306 pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
307 anyhow::ensure!(
308 handle.has_proof(),
309 BlockStorageError::BlockProofNotFound(handle.id().as_short_id())
310 );
311
312 self.blob_storage
313 .get_block_data_decompressed(handle, &PackageEntryKey::proof(handle.id()))
314 .await
315 }
316
317 pub async fn store_queue_diff(
320 &self,
321 queue_diff: &QueueDiffStuffAug,
322 handle: MaybeExistingHandle,
323 ) -> Result<StoreBlockResult> {
324 let block_id = queue_diff.block_id();
325 if let MaybeExistingHandle::Existing(handle) = &handle
326 && handle.id() != block_id
327 {
328 anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
329 expected: block_id.as_short_id(),
330 actual: handle.id().as_short_id(),
331 });
332 }
333
334 let (handle, status) = match handle {
335 MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
336 MaybeExistingHandle::New(meta_data) => self
337 .block_handle_storage
338 .create_or_load_handle(block_id, meta_data),
339 };
340
341 let mut updated = false;
342 let archive_id = PackageEntryKey::queue_diff(block_id);
343 if !handle.has_queue_diff() {
344 let data = queue_diff.clone_new_archive_data()?;
345
346 let lock = handle.queue_diff_data_lock().write().await;
347 if !handle.has_queue_diff() {
348 self.blob_storage.add_data(&archive_id, data, &lock).await?;
349 if handle.meta().add_flags(BlockFlags::HAS_QUEUE_DIFF) {
350 self.block_handle_storage.store_handle(&handle, false);
351 updated = true;
352 }
353 }
354 }
355
356 Ok(StoreBlockResult {
357 handle,
358 updated,
359 new: status == HandleCreationStatus::Created,
360 })
361 }
362
363 pub async fn load_queue_diff(&self, handle: &BlockHandle) -> Result<QueueDiffStuff> {
364 const BIG_DATA_THRESHOLD: usize = 100 << 10; let data = self.load_queue_diff_raw(handle).await?;
367
368 if data.len() < BIG_DATA_THRESHOLD {
369 QueueDiffStuff::deserialize(handle.id(), &data)
370 } else {
371 let handle = handle.clone();
372 rayon_run(move || QueueDiffStuff::deserialize(handle.id(), data.as_ref())).await
373 }
374 }
375
376 pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
377 anyhow::ensure!(
378 handle.has_queue_diff(),
379 BlockStorageError::QueueDiffNotFound(handle.id().as_short_id())
380 );
381
382 self.blob_storage
383 .get_block_data_decompressed(handle, &PackageEntryKey::queue_diff(handle.id()))
384 .await
385 }
386
387 pub async fn move_into_archive(
390 &self,
391 handle: &BlockHandle,
392 mc_is_key_block: bool,
393 ) -> Result<()> {
394 self.blob_storage
395 .move_into_archive(handle, mc_is_key_block)
396 .await
397 }
398
399 pub async fn wait_for_archive_commit(&self) -> Result<()> {
400 self.blob_storage.wait_for_archive_commit().await
401 }
402
403 pub fn get_archive_id(&self, mc_seqno: u32) -> ArchiveId {
405 self.blob_storage.get_archive_id(mc_seqno)
406 }
407
408 pub fn get_archive_size(&self, id: u32) -> Result<Option<usize>> {
409 self.blob_storage.get_archive_size(id)
410 }
411
412 pub fn get_archive_reader(&self, id: u32) -> Result<Option<BufReader<File>>> {
413 self.blob_storage.get_archive_reader(id)
414 }
415
416 #[cfg(any(test, feature = "test"))]
420 pub async fn get_archive_compressed_full(&self, id: u32) -> Result<Option<Bytes>> {
421 self.blob_storage.get_archive_full(id).await
422 }
423
424 pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<Bytes> {
426 self.blob_storage.get_archive_chunk(id, offset).await
427 }
428
429 pub fn subscribe_to_archive_ids(&self) -> broadcast::Receiver<u32> {
430 self.blob_storage.subscribe_to_archive_ids()
431 }
432
433 pub async fn remove_outdated_archives(&self, until_id: u32) -> Result<()> {
434 self.blob_storage.remove_outdated_archives(until_id).await
435 }
436
437 pub fn estimate_archive_id(&self, mc_seqno: u32) -> u32 {
438 self.blob_storage.estimate_archive_id(mc_seqno)
439 }
440
441 #[tracing::instrument(skip(self, max_blocks_per_batch))]
444 pub async fn remove_outdated_blocks(
445 &self,
446 mc_seqno: u32,
447 max_blocks_per_batch: Option<usize>,
448 ) -> Result<()> {
449 if mc_seqno == 0 {
450 return Ok(());
451 }
452
453 tracing::info!("started blocks GC for mc_block {mc_seqno}");
454
455 let block = self
456 .blob_storage
457 .find_mc_block_data(mc_seqno)
458 .await
459 .context("failed to load target block data")?
460 .context("target block not found")?;
461
462 let shard_heights = {
463 let extra = block.extra.load()?;
464 let custom = extra.custom.context("mc block extra not found")?.load()?;
465 custom
466 .shards
467 .latest_blocks()
468 .map(|id| id.map(|id| (id.shard, id.seqno)))
469 .collect::<Result<_, tycho_types::error::Error>>()?
470 };
471
472 let total_cached_handles_removed = self
474 .block_handle_storage
475 .gc_handles_cache(mc_seqno, &shard_heights);
476
477 let cancelled = CancellationFlag::new();
478 scopeguard::defer! {
479 cancelled.cancel();
480 }
481
482 let BlockGcStats {
485 mc_blocks_removed,
486 total_blocks_removed,
487 } = tokio::task::spawn_blocking({
488 let blob_storage = self.blob_storage.clone();
489 let cancelled = cancelled.clone();
490 move || {
491 blobs::remove_blocks(
492 &blob_storage,
493 max_blocks_per_batch,
494 mc_seqno,
495 shard_heights,
496 Some(&cancelled),
497 )
498 }
499 })
500 .await??;
501
502 tracing::info!(
503 total_cached_handles_removed,
504 mc_blocks_removed,
505 total_blocks_removed,
506 "finished blocks GC"
507 );
508 Ok(())
509 }
510
511 #[cfg(any(test, feature = "test"))]
514 pub fn blob_storage(&self) -> &blobs::BlobStorage {
515 &self.blob_storage
516 }
517}
518
519#[derive(Clone)]
520pub enum MaybeExistingHandle {
521 Existing(BlockHandle),
522 New(NewBlockMeta),
523}
524
525impl From<BlockHandle> for MaybeExistingHandle {
526 fn from(handle: BlockHandle) -> Self {
527 Self::Existing(handle)
528 }
529}
530
531impl From<NewBlockMeta> for MaybeExistingHandle {
532 fn from(meta_data: NewBlockMeta) -> Self {
533 Self::New(meta_data)
534 }
535}
536
537pub struct StoreBlockResult {
538 pub handle: BlockHandle,
539 pub updated: bool,
540 pub new: bool,
541}
542
543pub struct BlockStorageConfig {
544 pub blocks_cache: BlocksCacheConfig,
545 pub blobs_root: PathBuf,
546 pub blob_db_config: BlobDbConfig,
547}
548
549type BlocksCache = moka::sync::Cache<BlockId, BlockStuff, FastHasherState>;
550
551#[derive(thiserror::Error, Debug)]
552enum BlockStorageError {
553 #[error("Block data not found for block: {0}")]
554 BlockDataNotFound(BlockIdShort),
555 #[error("Block proof not found for block: {0}")]
556 BlockProofNotFound(BlockIdShort),
557 #[error("Queue diff not found for block: {0}")]
558 QueueDiffNotFound(BlockIdShort),
559 #[error("Block handle id mismatch: expected {expected}, got {actual}")]
560 BlockHandleIdMismatch {
561 expected: BlockIdShort,
562 actual: BlockIdShort,
563 },
564}
565
566#[cfg(test)]
567mod tests {
568 use std::pin::pin;
569
570 use blobs::*;
571 use tycho_block_util::archive::{ArchiveEntryType, WithArchiveData};
572 use tycho_storage::StorageContext;
573 use tycho_types::prelude::*;
574 use tycho_util::FastHashMap;
575 use tycho_util::futures::JoinTask;
576
577 use super::*;
578 use crate::storage::{BlockConnection, CoreStorage, CoreStorageConfig};
579
580 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
581 async fn parallel_store_data() -> Result<()> {
582 let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
583 let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
584
585 let shard = ShardIdent::MASTERCHAIN;
586 for seqno in 0..1000 {
587 let block = BlockStuff::new_empty(shard, seqno);
588 let block = {
589 let data = BocRepr::encode_rayon(block.as_ref()).unwrap();
590 WithArchiveData::new(block, data)
591 };
592 let block_id = block.id();
593
594 let proof = BlockProofStuff::new_empty(block_id);
595 let proof = {
596 let data = BocRepr::encode_rayon(proof.as_ref()).unwrap();
597 WithArchiveData::new(proof, data)
598 };
599
600 let queue_diff = QueueDiffStuff::builder(shard, seqno, &HashBytes::ZERO)
601 .serialize()
602 .build(block_id);
603
604 let block_meta = NewBlockMeta {
605 is_key_block: shard.is_masterchain() && seqno == 0,
606 gen_utime: 0,
607 ref_by_mc_seqno: seqno,
608 };
609
610 let store_block_data = || {
611 let storage = storage.clone();
612 JoinTask::new(async move {
613 let res = storage
614 .block_storage()
615 .store_block_data(&block, &block.archive_data, block_meta)
616 .await?;
617
618 Ok::<_, anyhow::Error>(res.handle)
619 })
620 };
621
622 let store_proof_and_queue = || {
623 let storage = storage.clone();
624 let proof = proof.clone();
625 let queue_diff = queue_diff.clone();
626 JoinTask::new(async move {
627 if rand::random::<bool>() {
628 tokio::task::yield_now().await;
629 }
630
631 let res = storage
632 .block_storage()
633 .store_block_proof(&proof, MaybeExistingHandle::New(block_meta))
634 .await?;
635
636 if rand::random::<bool>() {
637 tokio::task::yield_now().await;
638 }
639
640 let res = storage
641 .block_storage()
642 .store_queue_diff(&queue_diff, res.handle.into())
643 .await?;
644
645 if rand::random::<bool>() {
646 tokio::task::yield_now().await;
647 }
648
649 Ok::<_, anyhow::Error>(res.handle)
650 })
651 };
652
653 let (data_res, proof_and_queue_res) = async move {
654 let data_fut = pin!(store_block_data());
655 let proof_and_queue_fut = pin!(async {
656 tokio::select! {
657 left = store_proof_and_queue() => left,
658 right = store_proof_and_queue() => right,
659 }
660 });
661
662 let (data, other) = futures_util::future::join(data_fut, proof_and_queue_fut).await;
663
664 Ok::<_, anyhow::Error>((data?, other?))
665 }
666 .await?;
667
668 assert!(std::ptr::addr_eq(
669 arc_swap::RefCnt::as_ptr(&data_res),
670 arc_swap::RefCnt::as_ptr(&proof_and_queue_res)
671 ));
672 assert!(data_res.has_all_block_parts());
673 }
674
675 Ok(())
676 }
677
678 #[tokio::test]
679 async fn blocks_gc() -> Result<()> {
680 const GARBAGE: Bytes = Bytes::from_static(b"garbage");
681 const ENTRY_TYPES: [ArchiveEntryType; 3] = [
682 ArchiveEntryType::Block,
683 ArchiveEntryType::Proof,
684 ArchiveEntryType::QueueDiff,
685 ];
686 const CONNECTION_TYPES: [BlockConnection; 2] =
687 [BlockConnection::Prev1, BlockConnection::Next1];
688
689 let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
690 let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
691
692 let blocks = storage.block_storage();
693 let block_handles = storage.block_handle_storage();
694 let block_connections = storage.block_connection_storage();
695
696 let mut shard_block_ids = FastHashMap::<ShardIdent, Vec<BlockId>>::default();
697
698 for shard in [ShardIdent::MASTERCHAIN, ShardIdent::BASECHAIN] {
699 let entry = shard_block_ids.entry(shard).or_default();
700
701 for seqno in 0..100 {
702 let block_id = BlockId {
703 shard,
704 seqno,
705 root_hash: HashBytes(rand::random()),
706 file_hash: HashBytes(rand::random()),
707 };
708 entry.push(block_id);
709
710 let (handle, _) = block_handles.create_or_load_handle(&block_id, NewBlockMeta {
711 is_key_block: shard.is_masterchain() && seqno == 0,
712 gen_utime: 0,
713 ref_by_mc_seqno: seqno,
714 });
715 let lock = handle.block_data_lock().write().await;
716
717 for ty in ENTRY_TYPES {
718 blocks
719 .blob_storage
720 .add_data(&(block_id, ty).into(), GARBAGE, &lock)
721 .await?;
722 }
723 for direction in CONNECTION_TYPES {
724 block_connections.store_connection(&handle, direction, &block_id);
725 }
726
727 handle.meta().add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
728 block_handles.store_handle(&handle, false);
729 }
730 }
731
732 let stats = blobs::remove_blocks(
734 &blocks.blob_storage,
735 None,
736 70,
737 [(ShardIdent::BASECHAIN, 50)].into(),
738 None,
739 )?;
740 assert_eq!(stats, BlockGcStats {
741 mc_blocks_removed: 69,
742 total_blocks_removed: 69 + 49,
743 });
744
745 let removed_ranges = FastHashMap::from_iter([
746 (ShardIdent::MASTERCHAIN, vec![1..=69]),
747 (ShardIdent::BASECHAIN, vec![1..=49]),
748 ]);
749 for (shard, block_ids) in shard_block_ids {
750 let removed_ranges = removed_ranges.get(&shard).unwrap();
751
752 for block_id in block_ids {
753 let must_be_removed = 'removed: {
754 for range in removed_ranges {
755 if range.contains(&block_id.seqno) {
756 break 'removed true;
757 }
758 }
759 false
760 };
761
762 let handle = block_handles.load_handle(&block_id);
763 assert_eq!(handle.is_none(), must_be_removed);
764
765 for ty in ENTRY_TYPES {
766 let key = PackageEntryKey::from((block_id, ty));
767 let exists_in_cas = blocks
769 .blob_storage()
770 .blocks()
771 .read_index_state()
772 .contains_key(&key);
773 assert_eq!(!exists_in_cas, must_be_removed);
774 }
775
776 for direction in CONNECTION_TYPES {
777 let connection = block_connections.load_connection(&block_id, direction);
778 assert_eq!(connection.is_none(), must_be_removed);
779 }
780 }
781 }
782
783 let stats = blobs::remove_blocks(
785 &blocks.blob_storage,
786 None,
787 71,
788 [(ShardIdent::BASECHAIN, 51)].into(),
789 None,
790 )?;
791 assert_eq!(stats, BlockGcStats {
792 mc_blocks_removed: 1,
793 total_blocks_removed: 2,
794 });
795
796 let stats = blobs::remove_blocks(
798 &blocks.blob_storage,
799 None,
800 71,
801 [(ShardIdent::BASECHAIN, 51)].into(),
802 None,
803 )?;
804 assert_eq!(stats, BlockGcStats {
805 mc_blocks_removed: 0,
806 total_blocks_removed: 0,
807 });
808
809 Ok(())
810 }
811}