1use std::fs::File;
2use std::io::BufReader;
3use std::num::NonZeroU32;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use anyhow::{Context, Result};
8use bytes::Bytes;
9use tokio::sync::broadcast;
10use tycho_block_util::archive::ArchiveData;
11use tycho_block_util::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug};
12use tycho_block_util::queue::{QueueDiffStuff, QueueDiffStuffAug};
13use tycho_types::models::*;
14use tycho_util::FastHasherState;
15use tycho_util::metrics::HistogramGuard;
16use tycho_util::sync::{CancellationFlag, rayon_run};
17
18pub use self::package_entry::{PackageEntryKey, PartialBlockId};
19use super::util::SlotSubscriptions;
20use super::{
21 BlockConnectionStorage, BlockFlags, BlockHandle, BlockHandleStorage, BlocksCacheConfig, CoreDb,
22 HandleCreationStatus, NewBlockMeta,
23};
24
25pub(crate) mod blobs;
26mod package_entry;
27
28pub use blobs::{ArchiveId, BlockGcStats, OpenStats};
29
30use crate::storage::block::blobs::DEFAULT_CHUNK_SIZE;
31use crate::storage::config::BlobDbConfig;
32
/// Counter metric: total number of `load_block_data` requests.
const METRIC_LOAD_BLOCK_TOTAL: &str = "tycho_storage_load_block_total";
/// Counter metric: number of block loads served from the in-memory cache.
const METRIC_BLOCK_CACHE_HIT_TOTAL: &str = "tycho_storage_block_cache_hit_total";
35
/// Storage for block data, block proofs and queue diffs, backed by a blob
/// store with a weighted in-memory cache of recently stored blocks.
pub struct BlockStorage {
    // Weighted TTL cache of deserialized blocks (see `weigher` in `new`).
    blocks_cache: BlocksCache,
    block_handle_storage: Arc<BlockHandleStorage>,
    block_connection_storage: Arc<BlockConnectionStorage>,
    // Wakes `wait_for_block` subscribers once a block's data is stored.
    block_subscriptions: SlotSubscriptions<BlockId, BlockStuff>,
    // Synchronizes `store_block_data` (read lock) against `wait_for_block`
    // (write lock) so a waiter can never miss a notification.
    store_block_data: tokio::sync::RwLock<()>,
    pub(crate) blob_storage: blobs::BlobStorage,
}
44
45impl BlockStorage {
46 pub const DEFAULT_BLOB_CHUNK_SIZE: NonZeroU32 =
47 NonZeroU32::new(DEFAULT_CHUNK_SIZE as u32).unwrap();
48 pub async fn new(
51 db: CoreDb,
52 config: BlockStorageConfig,
53 block_handle_storage: Arc<BlockHandleStorage>,
54 block_connection_storage: Arc<BlockConnectionStorage>,
55 ) -> Result<Self> {
56 fn weigher(_key: &BlockId, value: &BlockStuff) -> u32 {
57 const BLOCK_STUFF_OVERHEAD: u32 = 1024; size_of::<BlockId>() as u32
60 + BLOCK_STUFF_OVERHEAD
61 + value.data_size().try_into().unwrap_or(u32::MAX)
62 }
63
64 let blocks_cache = moka::sync::Cache::builder()
65 .time_to_live(config.blocks_cache.ttl)
66 .max_capacity(config.blocks_cache.size.as_u64())
67 .weigher(weigher)
68 .build_with_hasher(Default::default());
69
70 let blob_storage = blobs::BlobStorage::new(
71 db,
72 block_handle_storage.clone(),
73 &config.blobs_root,
74 config.blob_db_config.pre_create_cas_tree,
75 )
76 .await?;
77
78 Ok(Self {
79 blocks_cache,
80 block_handle_storage,
81 block_connection_storage,
82 block_subscriptions: Default::default(),
83 store_block_data: Default::default(),
84 blob_storage,
85 })
86 }
87
88 pub fn open_stats(&self) -> &OpenStats {
89 self.blob_storage.open_stats()
90 }
91
92 pub async fn wait_for_block(&self, block_id: &BlockId) -> Result<BlockStuffAug> {
95 let block_handle_storage = &self.block_handle_storage;
96
97 let guard = self.store_block_data.write().await;
99
100 if let Some(handle) = block_handle_storage.load_handle(block_id)
102 && handle.has_data()
103 {
104 drop(guard);
105 let block = self.load_block_data(&handle).await?;
106 return Ok(BlockStuffAug::loaded(block));
107 }
108
109 let rx = self.block_subscriptions.subscribe(block_id);
111
112 drop(guard);
114
115 let block = rx.await;
116 Ok(BlockStuffAug::loaded(block))
117 }
118
119 pub async fn wait_for_next_block(&self, prev_block_id: &BlockId) -> Result<BlockStuffAug> {
120 let block_id = self
121 .block_connection_storage
122 .wait_for_next1(prev_block_id)
123 .await;
124
125 self.wait_for_block(&block_id).await
126 }
127
128 pub async fn store_block_data(
131 &self,
132 block: &BlockStuff,
133 archive_data: &ArchiveData,
134 meta_data: NewBlockMeta,
135 ) -> Result<StoreBlockResult> {
136 let guard = self.store_block_data.read().await;
140
141 let block_id = block.id();
142 let (handle, status) = self
143 .block_handle_storage
144 .create_or_load_handle(block_id, meta_data);
145
146 let archive_id = PackageEntryKey::block(block_id);
147 let mut updated = false;
148 if !handle.has_data() {
149 let data = archive_data.clone_new_archive_data()?;
150 metrics::histogram!("tycho_storage_store_block_data_size").record(data.len() as f64);
151
152 let lock = handle.block_data_lock().write().await;
153 if !handle.has_data() {
154 self.blob_storage.add_data(&archive_id, data, &lock).await?;
155 if handle.meta().add_flags(BlockFlags::HAS_DATA) {
156 self.block_handle_storage.store_handle(&handle, false);
157 updated = true;
158 }
159 }
160 }
161
162 self.block_subscriptions.notify(block_id, block);
164
165 drop(guard);
166
167 self.blocks_cache.insert(*block_id, block.clone());
169
170 Ok(StoreBlockResult {
171 handle,
172 updated,
173 new: status == HandleCreationStatus::Created,
174 })
175 }
176
177 pub async fn load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
178 metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
179
180 const BIG_DATA_THRESHOLD: usize = 1 << 20; let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
183
184 anyhow::ensure!(
185 handle.has_data(),
186 BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
187 );
188
189 if let Some(block) = self.blocks_cache.get(handle.id()) {
191 metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
192 return Ok(block.clone());
193 }
194
195 let data = self
196 .blob_storage
197 .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
198 .await?;
199
200 if data.len() < BIG_DATA_THRESHOLD {
201 BlockStuff::deserialize(handle.id(), data.as_ref())
202 } else {
203 let handle = handle.clone();
204 rayon_run(move || BlockStuff::deserialize(handle.id(), data.as_ref())).await
205 }
206 }
207
208 pub async fn load_block_data_decompressed(&self, handle: &BlockHandle) -> Result<Bytes> {
209 anyhow::ensure!(
210 handle.has_data(),
211 BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
212 );
213
214 self.blob_storage
215 .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
216 .await
217 }
218
219 pub async fn list_blocks(
220 &self,
221 continuation: Option<BlockIdShort>,
222 ) -> Result<(Vec<BlockId>, Option<BlockIdShort>)> {
223 self.blob_storage.list_blocks(continuation).await
224 }
225
226 pub fn list_archive_ids(&self) -> Vec<u32> {
227 self.blob_storage.list_archive_ids()
228 }
229
230 pub async fn load_block_data_range(
231 &self,
232 handle: &BlockHandle,
233 offset: u64,
234 length: u64,
235 ) -> Result<Option<Bytes>> {
236 anyhow::ensure!(
237 handle.has_data(),
238 BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
239 );
240 self.blob_storage
241 .get_block_data_range(handle, offset, length)
242 .await
243 }
244
245 pub fn get_compressed_block_data_size(&self, handle: &BlockHandle) -> Result<Option<u64>> {
246 let key = PackageEntryKey::block(handle.id());
247 Ok(self.blob_storage.blocks().get_size(&key)?)
248 }
249
250 pub async fn find_mc_block_data(&self, mc_seqno: u32) -> Result<Option<Block>> {
251 self.blob_storage.find_mc_block_data(mc_seqno).await
252 }
253
254 pub async fn store_block_proof(
257 &self,
258 proof: &BlockProofStuffAug,
259 handle: MaybeExistingHandle,
260 ) -> Result<StoreBlockResult> {
261 let block_id = proof.id();
262 if let MaybeExistingHandle::Existing(handle) = &handle
263 && handle.id() != block_id
264 {
265 anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
266 expected: block_id.as_short_id(),
267 actual: handle.id().as_short_id(),
268 });
269 }
270
271 let (handle, status) = match handle {
272 MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
273 MaybeExistingHandle::New(meta_data) => self
274 .block_handle_storage
275 .create_or_load_handle(block_id, meta_data),
276 };
277
278 let mut updated = false;
279 let archive_id = PackageEntryKey::proof(block_id);
280 if !handle.has_proof() {
281 let data = proof.clone_new_archive_data()?;
282
283 let lock = handle.proof_data_lock().write().await;
284 if !handle.has_proof() {
285 self.blob_storage.add_data(&archive_id, data, &lock).await?;
286 if handle.meta().add_flags(BlockFlags::HAS_PROOF) {
287 self.block_handle_storage.store_handle(&handle, false);
288 updated = true;
289 }
290 }
291 }
292
293 Ok(StoreBlockResult {
294 handle,
295 updated,
296 new: status == HandleCreationStatus::Created,
297 })
298 }
299
300 pub async fn load_block_proof(&self, handle: &BlockHandle) -> Result<BlockProofStuff> {
301 let raw_proof = self.load_block_proof_raw(handle).await?;
302 BlockProofStuff::deserialize(handle.id(), &raw_proof)
303 }
304
305 pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
306 anyhow::ensure!(
307 handle.has_proof(),
308 BlockStorageError::BlockProofNotFound(handle.id().as_short_id())
309 );
310
311 self.blob_storage
312 .get_block_data_decompressed(handle, &PackageEntryKey::proof(handle.id()))
313 .await
314 }
315
316 pub async fn store_queue_diff(
319 &self,
320 queue_diff: &QueueDiffStuffAug,
321 handle: MaybeExistingHandle,
322 ) -> Result<StoreBlockResult> {
323 let block_id = queue_diff.block_id();
324 if let MaybeExistingHandle::Existing(handle) = &handle
325 && handle.id() != block_id
326 {
327 anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
328 expected: block_id.as_short_id(),
329 actual: handle.id().as_short_id(),
330 });
331 }
332
333 let (handle, status) = match handle {
334 MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
335 MaybeExistingHandle::New(meta_data) => self
336 .block_handle_storage
337 .create_or_load_handle(block_id, meta_data),
338 };
339
340 let mut updated = false;
341 let archive_id = PackageEntryKey::queue_diff(block_id);
342 if !handle.has_queue_diff() {
343 let data = queue_diff.clone_new_archive_data()?;
344
345 let lock = handle.queue_diff_data_lock().write().await;
346 if !handle.has_queue_diff() {
347 self.blob_storage.add_data(&archive_id, data, &lock).await?;
348 if handle.meta().add_flags(BlockFlags::HAS_QUEUE_DIFF) {
349 self.block_handle_storage.store_handle(&handle, false);
350 updated = true;
351 }
352 }
353 }
354
355 Ok(StoreBlockResult {
356 handle,
357 updated,
358 new: status == HandleCreationStatus::Created,
359 })
360 }
361
362 pub async fn load_queue_diff(&self, handle: &BlockHandle) -> Result<QueueDiffStuff> {
363 let raw_diff = self.load_queue_diff_raw(handle).await?;
364 QueueDiffStuff::deserialize(handle.id(), &raw_diff)
365 }
366
367 pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
368 anyhow::ensure!(
369 handle.has_queue_diff(),
370 BlockStorageError::QueueDiffNotFound(handle.id().as_short_id())
371 );
372
373 self.blob_storage
374 .get_block_data_decompressed(handle, &PackageEntryKey::queue_diff(handle.id()))
375 .await
376 }
377
378 pub async fn move_into_archive(
381 &self,
382 handle: &BlockHandle,
383 mc_is_key_block: bool,
384 ) -> Result<()> {
385 self.blob_storage
386 .move_into_archive(handle, mc_is_key_block)
387 .await
388 }
389
390 pub async fn wait_for_archive_commit(&self) -> Result<()> {
391 self.blob_storage.wait_for_archive_commit().await
392 }
393
394 pub fn get_archive_id(&self, mc_seqno: u32) -> ArchiveId {
396 self.blob_storage.get_archive_id(mc_seqno)
397 }
398
399 pub fn get_archive_size(&self, id: u32) -> Result<Option<usize>> {
400 self.blob_storage.get_archive_size(id)
401 }
402
403 pub fn get_archive_reader(&self, id: u32) -> Result<Option<BufReader<File>>> {
404 self.blob_storage.get_archive_reader(id)
405 }
406
407 #[cfg(any(test, feature = "test"))]
411 pub async fn get_archive_compressed_full(&self, id: u32) -> Result<Option<Bytes>> {
412 self.blob_storage.get_archive_full(id).await
413 }
414
415 pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<Bytes> {
417 self.blob_storage.get_archive_chunk(id, offset).await
418 }
419
420 pub fn subscribe_to_archive_ids(&self) -> broadcast::Receiver<u32> {
421 self.blob_storage.subscribe_to_archive_ids()
422 }
423
424 pub async fn remove_outdated_archives(&self, until_id: u32) -> Result<()> {
425 self.blob_storage.remove_outdated_archives(until_id).await
426 }
427
428 #[tracing::instrument(skip(self, max_blocks_per_batch))]
431 pub async fn remove_outdated_blocks(
432 &self,
433 mc_seqno: u32,
434 max_blocks_per_batch: Option<usize>,
435 ) -> Result<()> {
436 if mc_seqno == 0 {
437 return Ok(());
438 }
439
440 tracing::info!("started blocks GC for mc_block {mc_seqno}");
441
442 let block = self
443 .blob_storage
444 .find_mc_block_data(mc_seqno)
445 .await
446 .context("failed to load target block data")?
447 .context("target block not found")?;
448
449 let shard_heights = {
450 let extra = block.extra.load()?;
451 let custom = extra.custom.context("mc block extra not found")?.load()?;
452 custom
453 .shards
454 .latest_blocks()
455 .map(|id| id.map(|id| (id.shard, id.seqno)))
456 .collect::<Result<_, tycho_types::error::Error>>()?
457 };
458
459 let total_cached_handles_removed = self
461 .block_handle_storage
462 .gc_handles_cache(mc_seqno, &shard_heights);
463
464 let cancelled = CancellationFlag::new();
465 scopeguard::defer! {
466 cancelled.cancel();
467 }
468
469 let BlockGcStats {
472 mc_blocks_removed,
473 total_blocks_removed,
474 } = blobs::remove_blocks(
475 self.blob_storage.db().clone(),
476 &self.blob_storage,
477 max_blocks_per_batch,
478 mc_seqno,
479 shard_heights,
480 Some(&cancelled),
481 )?;
482
483 tracing::info!(
484 total_cached_handles_removed,
485 mc_blocks_removed,
486 total_blocks_removed,
487 "finished blocks GC"
488 );
489 Ok(())
490 }
491
492 #[cfg(test)]
495 pub fn db(&self) -> &CoreDb {
496 self.blob_storage.db()
497 }
498
499 #[cfg(any(test, feature = "test"))]
500 pub fn blob_storage(&self) -> &blobs::BlobStorage {
501 &self.blob_storage
502 }
503}
504
/// Either an already created block handle, or the metadata needed to create
/// (or load) a new one.
#[derive(Clone)]
pub enum MaybeExistingHandle {
    Existing(BlockHandle),
    New(NewBlockMeta),
}
510
511impl From<BlockHandle> for MaybeExistingHandle {
512 fn from(handle: BlockHandle) -> Self {
513 Self::Existing(handle)
514 }
515}
516
517impl From<NewBlockMeta> for MaybeExistingHandle {
518 fn from(meta_data: NewBlockMeta) -> Self {
519 Self::New(meta_data)
520 }
521}
522
/// Outcome of storing a block entity (data, proof or queue diff).
pub struct StoreBlockResult {
    /// Handle of the affected block.
    pub handle: BlockHandle,
    /// Whether the handle flags were updated and persisted by this call.
    pub updated: bool,
    /// Whether the handle was created by this call (as opposed to loaded).
    pub new: bool,
}
528
/// Configuration for [`BlockStorage`].
pub struct BlockStorageConfig {
    /// In-memory blocks cache parameters (TTL and max capacity).
    pub blocks_cache: BlocksCacheConfig,
    /// Root directory for blob storage files.
    pub blobs_root: PathBuf,
    /// Low-level blob database settings.
    pub blob_db_config: BlobDbConfig,
}
534
/// Weighted in-memory cache of deserialized blocks keyed by [`BlockId`].
type BlocksCache = moka::sync::Cache<BlockId, BlockStuff, FastHasherState>;
536
/// Errors produced by [`BlockStorage`] operations.
#[derive(thiserror::Error, Debug)]
enum BlockStorageError {
    #[error("Block data not found for block: {0}")]
    BlockDataNotFound(BlockIdShort),
    #[error("Block proof not found for block: {0}")]
    BlockProofNotFound(BlockIdShort),
    #[error("Queue diff not found for block: {0}")]
    QueueDiffNotFound(BlockIdShort),
    // An explicitly provided handle refers to a different block than the
    // entity being stored.
    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
    BlockHandleIdMismatch {
        expected: BlockIdShort,
        actual: BlockIdShort,
    },
}
551
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use blobs::*;
    use tycho_block_util::archive::{ArchiveEntryType, WithArchiveData};
    use tycho_storage::StorageContext;
    use tycho_types::prelude::*;
    use tycho_util::FastHashMap;
    use tycho_util::futures::JoinTask;

    use super::*;
    use crate::storage::{BlockConnection, CoreStorage, CoreStorageConfig};

    // Stores block data, proof and queue diff for the same block from
    // concurrent tasks and checks that every task ends up with the very same
    // handle, containing all block parts.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn parallel_store_data() -> Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let shard = ShardIdent::MASTERCHAIN;
        for seqno in 0..1000 {
            // Build an empty block together with its serialized archive data.
            let block = BlockStuff::new_empty(shard, seqno);
            let block = {
                let data = BocRepr::encode_rayon(block.as_ref()).unwrap();
                WithArchiveData::new(block, data)
            };
            let block_id = block.id();

            let proof = BlockProofStuff::new_empty(block_id);
            let proof = {
                let data = BocRepr::encode_rayon(proof.as_ref()).unwrap();
                WithArchiveData::new(proof, data)
            };

            let queue_diff = QueueDiffStuff::builder(shard, seqno, &HashBytes::ZERO)
                .serialize()
                .build(block_id);

            let block_meta = NewBlockMeta {
                is_key_block: shard.is_masterchain() && seqno == 0,
                gen_utime: 0,
                ref_by_mc_seqno: seqno,
            };

            // Task: store the block data.
            let store_block_data = || {
                let storage = storage.clone();
                JoinTask::new(async move {
                    let res = storage
                        .block_storage()
                        .store_block_data(&block, &block.archive_data, block_meta)
                        .await?;

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            // Task: store proof then queue diff, yielding at random points to
            // shuffle task interleavings between runs.
            let store_proof_and_queue = || {
                let storage = storage.clone();
                let proof = proof.clone();
                let queue_diff = queue_diff.clone();
                JoinTask::new(async move {
                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_block_proof(&proof, MaybeExistingHandle::New(block_meta))
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_queue_diff(&queue_diff, res.handle.into())
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            // Run the data store concurrently with two racing proof+queue
            // stores (only the select winner's result is used).
            let (data_res, proof_and_queue_res) = async move {
                let data_fut = pin!(store_block_data());
                let proof_and_queue_fut = pin!(async {
                    tokio::select! {
                        left = store_proof_and_queue() => left,
                        right = store_proof_and_queue() => right,
                    }
                });

                let (data, other) = futures_util::future::join(data_fut, proof_and_queue_fut).await;

                Ok::<_, anyhow::Error>((data?, other?))
            }
            .await?;

            // Both tasks must observe the exact same handle allocation.
            assert!(std::ptr::addr_eq(
                arc_swap::RefCnt::as_ptr(&data_res),
                arc_swap::RefCnt::as_ptr(&proof_and_queue_res)
            ));
            assert!(data_res.has_all_block_parts());
        }

        Ok(())
    }

    // Fills the storage with dummy blocks for two shards, then checks that
    // `remove_blocks` deletes exactly the expected handles, package entries
    // and block connections, and that repeating GC with the same horizon is
    // a no-op.
    #[tokio::test]
    async fn blocks_gc() -> Result<()> {
        const GARBAGE: Bytes = Bytes::from_static(b"garbage");
        const ENTRY_TYPES: [ArchiveEntryType; 3] = [
            ArchiveEntryType::Block,
            ArchiveEntryType::Proof,
            ArchiveEntryType::QueueDiff,
        ];
        const CONNECTION_TYPES: [BlockConnection; 2] =
            [BlockConnection::Prev1, BlockConnection::Next1];

        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let blocks = storage.block_storage();
        let block_handles = storage.block_handle_storage();
        let block_connections = storage.block_connection_storage();

        let mut shard_block_ids = FastHashMap::<ShardIdent, Vec<BlockId>>::default();

        // Populate 100 blocks per shard with garbage payloads for every
        // entry type and both connection directions.
        for shard in [ShardIdent::MASTERCHAIN, ShardIdent::BASECHAIN] {
            let entry = shard_block_ids.entry(shard).or_default();

            for seqno in 0..100 {
                let block_id = BlockId {
                    shard,
                    seqno,
                    root_hash: HashBytes(rand::random()),
                    file_hash: HashBytes(rand::random()),
                };
                entry.push(block_id);

                let (handle, _) = block_handles.create_or_load_handle(&block_id, NewBlockMeta {
                    is_key_block: shard.is_masterchain() && seqno == 0,
                    gen_utime: 0,
                    ref_by_mc_seqno: seqno,
                });
                let lock = handle.block_data_lock().write().await;

                for ty in ENTRY_TYPES {
                    blocks
                        .blob_storage
                        .add_data(&(block_id, ty).into(), GARBAGE, &lock)
                        .await?;
                }
                for direction in CONNECTION_TYPES {
                    block_connections.store_connection(&handle, direction, &block_id);
                }

                handle.meta().add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
                block_handles.store_handle(&handle, false);
            }
        }

        // GC up to mc seqno 70 / basechain height 50; seqno 0 (key block)
        // is expected to survive.
        let stats = blobs::remove_blocks(
            blocks.db().clone(),
            &blocks.blob_storage,
            None,
            70,
            [(ShardIdent::BASECHAIN, 50)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 69,
            total_blocks_removed: 69 + 49,
        });

        // Verify handles, CAS entries and connections are gone exactly for
        // the removed seqno ranges and intact elsewhere.
        let removed_ranges = FastHashMap::from_iter([
            (ShardIdent::MASTERCHAIN, vec![1..=69]),
            (ShardIdent::BASECHAIN, vec![1..=49]),
        ]);
        for (shard, block_ids) in shard_block_ids {
            let removed_ranges = removed_ranges.get(&shard).unwrap();

            for block_id in block_ids {
                let must_be_removed = 'removed: {
                    for range in removed_ranges {
                        if range.contains(&block_id.seqno) {
                            break 'removed true;
                        }
                    }
                    false
                };

                let handle = block_handles.load_handle(&block_id);
                assert_eq!(handle.is_none(), must_be_removed);

                for ty in ENTRY_TYPES {
                    let key = PackageEntryKey::from((block_id, ty));
                    let exists_in_cas = blocks
                        .blob_storage()
                        .blocks()
                        .read_index_state()
                        .contains_key(&key);
                    assert_eq!(!exists_in_cas, must_be_removed);
                }

                for direction in CONNECTION_TYPES {
                    let connection = block_connections.load_connection(&block_id, direction);
                    assert_eq!(connection.is_none(), must_be_removed);
                }
            }
        }

        // Advancing the horizon by one removes exactly one more block per chain.
        let stats = blobs::remove_blocks(
            blocks.db().clone(),
            &blocks.blob_storage,
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 1,
            total_blocks_removed: 2,
        });

        // Re-running GC with the same horizon must remove nothing.
        let stats = blobs::remove_blocks(
            blocks.db().clone(),
            &blocks.blob_storage,
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 0,
            total_blocks_removed: 0,
        });

        Ok(())
    }
}