1use std::fs::File;
2use std::io::BufReader;
3use std::num::NonZeroU32;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use anyhow::{Context, Result};
8use bytes::Bytes;
9use moka::policy::EvictionPolicy;
10use tokio::sync::broadcast;
11use tycho_block_util::archive::ArchiveData;
12use tycho_block_util::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug};
13use tycho_block_util::queue::{QueueDiffStuff, QueueDiffStuffAug};
14use tycho_types::models::*;
15use tycho_util::FastHasherState;
16use tycho_util::metrics::HistogramGuard;
17use tycho_util::sync::{CancellationFlag, rayon_run};
18
19pub use self::package_entry::{PackageEntryKey, PartialBlockId};
20use super::util::SlotSubscriptions;
21use super::{
22 BlockConnectionStorage, BlockFlags, BlockHandle, BlockHandleStorage, BlocksCacheConfig, CoreDb,
23 HandleCreationStatus, NewBlockMeta,
24};
25
26pub(crate) mod blobs;
27mod package_entry;
28
29pub use blobs::{ArchiveId, BlockGcStats, OpenStats};
30
31use crate::storage::block::blobs::DEFAULT_CHUNK_SIZE;
32use crate::storage::config::BlobDbConfig;
33
/// Name of the counter incremented on every call to `load_block_data` /
/// `blocking_load_block_data`, before the cache is consulted.
const METRIC_LOAD_BLOCK_TOTAL: &str = "tycho_storage_load_block_total";
/// Name of the counter incremented when a block load is served from the
/// in-memory blocks cache.
const METRIC_BLOCK_CACHE_HIT_TOTAL: &str = "tycho_storage_block_cache_hit_total";
36
/// Storage facade for block data, block proofs and queue diffs.
///
/// Entries are persisted through the blob storage, while block handles track
/// which parts of a block are present.
pub struct BlockStorage {
    /// In-memory cache of block data keyed by block id; filled by
    /// `store_block_data` and consulted by the block loading methods.
    blocks_cache: BlocksCache,
    /// Used to load, create and persist block handles.
    block_handle_storage: Arc<BlockHandleStorage>,
    /// Used by `wait_for_next_block` to find the successor of a block.
    block_connection_storage: Arc<BlockConnectionStorage>,
    /// Waiters registered by `wait_for_block`, notified by `store_block_data`.
    block_subscriptions: SlotSubscriptions<BlockId, BlockStuff>,
    /// Coordinates storing and subscribing: `store_block_data` holds the read
    /// side, `wait_for_block` takes the write side while it checks for the
    /// block and registers its subscription.
    store_block_data: tokio::sync::RwLock<()>,
    /// Underlying blob storage for block entries and archives.
    pub(crate) blob_storage: Arc<blobs::BlobStorage>,
}
45
46impl BlockStorage {
47 pub const DEFAULT_BLOB_CHUNK_SIZE: NonZeroU32 =
48 NonZeroU32::new(DEFAULT_CHUNK_SIZE as u32).unwrap();
49 pub async fn new(
52 db: CoreDb,
53 config: BlockStorageConfig,
54 block_handle_storage: Arc<BlockHandleStorage>,
55 block_connection_storage: Arc<BlockConnectionStorage>,
56 ) -> Result<Self> {
57 fn weigher(_key: &BlockId, value: &BlockStuff) -> u32 {
58 const BLOCK_STUFF_OVERHEAD: u32 = 1024; size_of::<BlockId>() as u32
61 + BLOCK_STUFF_OVERHEAD
62 + value.data_size().try_into().unwrap_or(u32::MAX)
63 }
64
65 let blocks_cache = moka::sync::Cache::builder()
66 .eviction_policy(EvictionPolicy::lru())
67 .time_to_live(config.blocks_cache.ttl)
68 .max_capacity(config.blocks_cache.size.as_u64())
69 .weigher(weigher)
70 .build_with_hasher(Default::default());
71
72 let blob_storage = blobs::BlobStorage::new(
73 db,
74 block_handle_storage.clone(),
75 &config.blobs_root,
76 config.blob_db_config.pre_create_cas_tree,
77 )
78 .await
79 .map(Arc::new)?;
80
81 Ok(Self {
82 blocks_cache,
83 block_handle_storage,
84 block_connection_storage,
85 block_subscriptions: Default::default(),
86 store_block_data: Default::default(),
87 blob_storage,
88 })
89 }
90
91 pub fn open_stats(&self) -> &OpenStats {
92 self.blob_storage.open_stats()
93 }
94
95 pub async fn wait_for_block(&self, block_id: &BlockId) -> Result<BlockStuffAug> {
98 let block_handle_storage = &self.block_handle_storage;
99
100 let guard = self.store_block_data.write().await;
102
103 if let Some(handle) = block_handle_storage.load_handle(block_id)
105 && handle.has_data()
106 {
107 drop(guard);
108 let block = self.load_block_data(&handle).await?;
109 return Ok(BlockStuffAug::loaded(block));
110 }
111
112 let rx = self.block_subscriptions.subscribe(block_id);
114
115 drop(guard);
117
118 let block = rx.await;
119 Ok(BlockStuffAug::loaded(block))
120 }
121
122 pub async fn wait_for_next_block(&self, prev_block_id: &BlockId) -> Result<BlockStuffAug> {
123 let block_id = self
124 .block_connection_storage
125 .wait_for_next1(prev_block_id)
126 .await;
127
128 self.wait_for_block(&block_id).await
129 }
130
131 pub async fn store_block_data(
134 &self,
135 block: &BlockStuff,
136 archive_data: &ArchiveData,
137 meta_data: NewBlockMeta,
138 ) -> Result<StoreBlockResult> {
139 let guard = self.store_block_data.read().await;
143
144 let block_id = block.id();
145 let (handle, status) = self
146 .block_handle_storage
147 .create_or_load_handle(block_id, meta_data);
148
149 let archive_id = PackageEntryKey::block(block_id);
150 let mut updated = false;
151 if !handle.has_data() {
152 let data = archive_data.clone_new_archive_data()?;
153 metrics::histogram!("tycho_storage_store_block_data_size").record(data.len() as f64);
154
155 let lock = handle.block_data_lock().write().await;
156 if !handle.has_data() {
157 self.blob_storage.add_data(&archive_id, data, &lock).await?;
158 if handle.meta().add_flags(BlockFlags::HAS_DATA) {
159 self.block_handle_storage.store_handle(&handle, false);
160 updated = true;
161 }
162 }
163 }
164
165 self.block_subscriptions.notify(block_id, block);
167
168 drop(guard);
169
170 self.blocks_cache.insert(*block_id, block.clone());
172
173 Ok(StoreBlockResult {
174 handle,
175 updated,
176 new: status == HandleCreationStatus::Created,
177 })
178 }
179
180 pub async fn load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
181 metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
182
183 const BIG_DATA_THRESHOLD: usize = 1 << 20; let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
186
187 anyhow::ensure!(
188 handle.has_data(),
189 BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
190 );
191
192 if let Some(block) = self.blocks_cache.get(handle.id()) {
194 metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
195 return Ok(block.clone());
196 }
197
198 let data = self
199 .blob_storage
200 .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
201 .await?;
202
203 if data.len() < BIG_DATA_THRESHOLD {
204 BlockStuff::deserialize(handle.id(), data.as_ref())
205 } else {
206 let handle = handle.clone();
207 rayon_run(move || BlockStuff::deserialize(handle.id(), data.as_ref())).await
208 }
209 }
210
211 pub fn blocking_load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
212 metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
213
214 let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
215
216 anyhow::ensure!(
217 handle.has_data(),
218 BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
219 );
220
221 if let Some(block) = self.blocks_cache.get(handle.id()) {
223 metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
224 return Ok(block.clone());
225 }
226
227 let data = self
228 .blob_storage
229 .blocking_get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))?;
230
231 BlockStuff::deserialize(handle.id(), data.as_ref())
232 }
233
234 pub async fn load_block_data_decompressed(&self, handle: &BlockHandle) -> Result<Bytes> {
235 anyhow::ensure!(
236 handle.has_data(),
237 BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
238 );
239
240 self.blob_storage
241 .get_block_data_decompressed(handle, &PackageEntryKey::block(handle.id()))
242 .await
243 }
244
245 pub async fn list_blocks(
246 &self,
247 continuation: Option<BlockIdShort>,
248 ) -> Result<(Vec<BlockId>, Option<BlockIdShort>)> {
249 self.blob_storage.list_blocks(continuation).await
250 }
251
252 pub fn list_archive_ids(&self) -> Vec<u32> {
253 self.blob_storage.list_archive_ids()
254 }
255
256 pub async fn load_block_data_range(
257 &self,
258 handle: &BlockHandle,
259 offset: u64,
260 length: u64,
261 ) -> Result<Option<Bytes>> {
262 anyhow::ensure!(
263 handle.has_data(),
264 BlockStorageError::BlockDataNotFound(handle.id().as_short_id())
265 );
266 self.blob_storage
267 .get_block_data_range(handle, offset, length)
268 .await
269 }
270
271 pub fn get_compressed_block_data_size(&self, handle: &BlockHandle) -> Result<Option<u64>> {
272 let key = PackageEntryKey::block(handle.id());
273 Ok(self.blob_storage.blocks().get_size(&key)?)
274 }
275
276 pub async fn find_mc_block_data(&self, mc_seqno: u32) -> Result<Option<Block>> {
277 self.blob_storage.find_mc_block_data(mc_seqno).await
278 }
279
280 pub async fn store_block_proof(
283 &self,
284 proof: &BlockProofStuffAug,
285 handle: MaybeExistingHandle,
286 ) -> Result<StoreBlockResult> {
287 let block_id = proof.id();
288 if let MaybeExistingHandle::Existing(handle) = &handle
289 && handle.id() != block_id
290 {
291 anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
292 expected: block_id.as_short_id(),
293 actual: handle.id().as_short_id(),
294 });
295 }
296
297 let (handle, status) = match handle {
298 MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
299 MaybeExistingHandle::New(meta_data) => self
300 .block_handle_storage
301 .create_or_load_handle(block_id, meta_data),
302 };
303
304 let mut updated = false;
305 let archive_id = PackageEntryKey::proof(block_id);
306 if !handle.has_proof() {
307 let data = proof.clone_new_archive_data()?;
308
309 let lock = handle.proof_data_lock().write().await;
310 if !handle.has_proof() {
311 self.blob_storage.add_data(&archive_id, data, &lock).await?;
312 if handle.meta().add_flags(BlockFlags::HAS_PROOF) {
313 self.block_handle_storage.store_handle(&handle, false);
314 updated = true;
315 }
316 }
317 }
318
319 Ok(StoreBlockResult {
320 handle,
321 updated,
322 new: status == HandleCreationStatus::Created,
323 })
324 }
325
326 pub async fn load_block_proof(&self, handle: &BlockHandle) -> Result<BlockProofStuff> {
327 let raw_proof = self.load_block_proof_raw(handle).await?;
328 BlockProofStuff::deserialize(handle.id(), &raw_proof)
329 }
330
331 pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
332 anyhow::ensure!(
333 handle.has_proof(),
334 BlockStorageError::BlockProofNotFound(handle.id().as_short_id())
335 );
336
337 self.blob_storage
338 .get_block_data_decompressed(handle, &PackageEntryKey::proof(handle.id()))
339 .await
340 }
341
342 pub async fn store_queue_diff(
345 &self,
346 queue_diff: &QueueDiffStuffAug,
347 handle: MaybeExistingHandle,
348 ) -> Result<StoreBlockResult> {
349 let block_id = queue_diff.block_id();
350 if let MaybeExistingHandle::Existing(handle) = &handle
351 && handle.id() != block_id
352 {
353 anyhow::bail!(BlockStorageError::BlockHandleIdMismatch {
354 expected: block_id.as_short_id(),
355 actual: handle.id().as_short_id(),
356 });
357 }
358
359 let (handle, status) = match handle {
360 MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
361 MaybeExistingHandle::New(meta_data) => self
362 .block_handle_storage
363 .create_or_load_handle(block_id, meta_data),
364 };
365
366 let mut updated = false;
367 let archive_id = PackageEntryKey::queue_diff(block_id);
368 if !handle.has_queue_diff() {
369 let data = queue_diff.clone_new_archive_data()?;
370
371 let lock = handle.queue_diff_data_lock().write().await;
372 if !handle.has_queue_diff() {
373 self.blob_storage.add_data(&archive_id, data, &lock).await?;
374 if handle.meta().add_flags(BlockFlags::HAS_QUEUE_DIFF) {
375 self.block_handle_storage.store_handle(&handle, false);
376 updated = true;
377 }
378 }
379 }
380
381 Ok(StoreBlockResult {
382 handle,
383 updated,
384 new: status == HandleCreationStatus::Created,
385 })
386 }
387
388 pub async fn load_queue_diff(&self, handle: &BlockHandle) -> Result<QueueDiffStuff> {
389 const BIG_DATA_THRESHOLD: usize = 100 << 10; let data = self.load_queue_diff_raw(handle).await?;
392
393 if data.len() < BIG_DATA_THRESHOLD {
394 QueueDiffStuff::deserialize(handle.id(), &data)
395 } else {
396 let handle = handle.clone();
397 rayon_run(move || QueueDiffStuff::deserialize(handle.id(), data.as_ref())).await
398 }
399 }
400
401 pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<Bytes> {
402 anyhow::ensure!(
403 handle.has_queue_diff(),
404 BlockStorageError::QueueDiffNotFound(handle.id().as_short_id())
405 );
406
407 self.blob_storage
408 .get_block_data_decompressed(handle, &PackageEntryKey::queue_diff(handle.id()))
409 .await
410 }
411
412 pub async fn move_into_archive(
415 &self,
416 handle: &BlockHandle,
417 mc_is_key_block: bool,
418 ) -> Result<()> {
419 self.blob_storage
420 .move_into_archive(handle, mc_is_key_block)
421 .await
422 }
423
424 pub async fn wait_for_archive_commit(&self) -> Result<()> {
425 self.blob_storage.wait_for_archive_commit().await
426 }
427
428 pub fn get_archive_id(&self, mc_seqno: u32) -> ArchiveId {
430 self.blob_storage.get_archive_id(mc_seqno)
431 }
432
433 pub fn get_archive_size(&self, id: u32) -> Result<Option<usize>> {
434 self.blob_storage.get_archive_size(id)
435 }
436
437 pub fn get_archive_reader(&self, id: u32) -> Result<Option<BufReader<File>>> {
438 self.blob_storage.get_archive_reader(id)
439 }
440
441 #[cfg(any(test, feature = "test"))]
445 pub async fn get_archive_compressed_full(&self, id: u32) -> Result<Option<Bytes>> {
446 self.blob_storage.get_archive_full(id).await
447 }
448
449 pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<Bytes> {
451 self.blob_storage.get_archive_chunk(id, offset).await
452 }
453
454 pub fn subscribe_to_archive_ids(&self) -> broadcast::Receiver<u32> {
455 self.blob_storage.subscribe_to_archive_ids()
456 }
457
458 pub async fn remove_outdated_archives(&self, until_id: u32) -> Result<()> {
459 self.blob_storage.remove_outdated_archives(until_id).await
460 }
461
462 pub fn estimate_archive_id(&self, mc_seqno: u32) -> u32 {
463 self.blob_storage.estimate_archive_id(mc_seqno)
464 }
465
466 #[tracing::instrument(skip(self, max_blocks_per_batch))]
469 pub async fn remove_outdated_blocks(
470 &self,
471 mc_seqno: u32,
472 max_blocks_per_batch: Option<usize>,
473 ) -> Result<()> {
474 if mc_seqno == 0 {
475 return Ok(());
476 }
477
478 tracing::info!("started blocks GC for mc_block {mc_seqno}");
479
480 let block = self
481 .blob_storage
482 .find_mc_block_data(mc_seqno)
483 .await
484 .context("failed to load target block data")?
485 .context("target block not found")?;
486
487 let shard_heights = {
488 let extra = block.extra.load()?;
489 let custom = extra.custom.context("mc block extra not found")?.load()?;
490 custom
491 .shards
492 .latest_blocks()
493 .map(|id| id.map(|id| (id.shard, id.seqno)))
494 .collect::<Result<_, tycho_types::error::Error>>()?
495 };
496
497 let total_cached_handles_removed = self
499 .block_handle_storage
500 .gc_handles_cache(mc_seqno, &shard_heights);
501
502 let cancelled = CancellationFlag::new();
503 scopeguard::defer! {
504 cancelled.cancel();
505 }
506
507 let BlockGcStats {
510 mc_blocks_removed,
511 total_blocks_removed,
512 } = tokio::task::spawn_blocking({
513 let blob_storage = self.blob_storage.clone();
514 let cancelled = cancelled.clone();
515 move || {
516 blobs::remove_blocks(
517 &blob_storage,
518 max_blocks_per_batch,
519 mc_seqno,
520 shard_heights,
521 Some(&cancelled),
522 )
523 }
524 })
525 .await??;
526
527 tracing::info!(
528 total_cached_handles_removed,
529 mc_blocks_removed,
530 total_blocks_removed,
531 "finished blocks GC"
532 );
533 Ok(())
534 }
535
536 #[cfg(any(test, feature = "test"))]
539 pub fn blob_storage(&self) -> &blobs::BlobStorage {
540 &self.blob_storage
541 }
542}
543
/// Handle argument of the `store_*` methods: either an already loaded block
/// handle, or the metadata used to create (or load) one.
#[derive(Clone)]
pub enum MaybeExistingHandle {
    /// An already loaded handle; its id must match the stored entity's block id.
    Existing(BlockHandle),
    /// Metadata passed to `create_or_load_handle` for the stored entity's block id.
    New(NewBlockMeta),
}
549
550impl From<BlockHandle> for MaybeExistingHandle {
551 fn from(handle: BlockHandle) -> Self {
552 Self::Existing(handle)
553 }
554}
555
556impl From<NewBlockMeta> for MaybeExistingHandle {
557 fn from(meta_data: NewBlockMeta) -> Self {
558 Self::New(meta_data)
559 }
560}
561
/// Outcome of the `store_block_data` / `store_block_proof` / `store_queue_diff` calls.
pub struct StoreBlockResult {
    /// Handle of the block the entity was stored for.
    pub handle: BlockHandle,
    /// `true` if this call added the corresponding "has ..." flag to the handle
    /// and persisted the handle.
    pub updated: bool,
    /// `true` if the handle was created by this call rather than loaded or
    /// passed in.
    pub new: bool,
}
567
/// Configuration consumed by `BlockStorage::new`.
pub struct BlockStorageConfig {
    /// Settings of the in-memory blocks cache (its `ttl` and `size` are used).
    pub blocks_cache: BlocksCacheConfig,
    /// Root path handed to the blob storage.
    pub blobs_root: PathBuf,
    /// Blob DB settings (its `pre_create_cas_tree` value is handed to the blob storage).
    pub blob_db_config: BlobDbConfig,
}
573
/// In-memory cache of block data keyed by block id, using the fast hasher state.
type BlocksCache = moka::sync::Cache<BlockId, BlockStuff, FastHasherState>;
575
/// Errors produced by the block storage precondition checks.
#[derive(thiserror::Error, Debug)]
enum BlockStorageError {
    /// The handle does not have block data.
    #[error("Block data not found for block: {0}")]
    BlockDataNotFound(BlockIdShort),
    /// The handle does not have a block proof.
    #[error("Block proof not found for block: {0}")]
    BlockProofNotFound(BlockIdShort),
    /// The handle does not have a queue diff.
    #[error("Queue diff not found for block: {0}")]
    QueueDiffNotFound(BlockIdShort),
    /// An existing handle was passed for a different block than the entity being stored.
    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
    BlockHandleIdMismatch {
        /// Block id of the entity being stored.
        expected: BlockIdShort,
        /// Block id of the handle that was passed in.
        actual: BlockIdShort,
    },
}
590
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use blobs::*;
    use tycho_block_util::archive::{ArchiveEntryType, WithArchiveData};
    use tycho_block_util::block::ShardHeights;
    use tycho_storage::StorageContext;
    use tycho_types::prelude::*;
    use tycho_util::FastHashMap;
    use tycho_util::futures::JoinTask;

    use super::*;
    use crate::storage::{BlockConnection, CoreStorage, CoreStorageConfig};

    /// Stores block data, proof and queue diff of the same block from
    /// concurrent tasks and checks that all of them end up on one handle
    /// which has every block part.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn parallel_store_data() -> Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let shard = ShardIdent::MASTERCHAIN;
        for seqno in 0..1000 {
            // Empty block together with its serialized form.
            let empty_block = BlockStuff::new_empty(shard, seqno);
            let block_boc = BocRepr::encode_rayon(empty_block.as_ref()).unwrap();
            let block = WithArchiveData::new(empty_block, block_boc);
            let block_id = block.id();

            // Empty proof together with its serialized form.
            let empty_proof = BlockProofStuff::new_empty(block_id);
            let proof_boc = BocRepr::encode_rayon(empty_proof.as_ref()).unwrap();
            let proof = WithArchiveData::new(empty_proof, proof_boc);

            let queue_diff = QueueDiffStuff::builder(shard, seqno, &HashBytes::ZERO)
                .serialize()
                .build(block_id);

            let block_meta = NewBlockMeta {
                is_key_block: shard.is_masterchain() && seqno == 0,
                gen_utime: 0,
                ref_by_mc_seqno: seqno,
            };

            // Task which stores the block data.
            let spawn_store_data = || {
                let storage = storage.clone();
                JoinTask::new(async move {
                    storage
                        .block_storage()
                        .store_block_data(&block, &block.archive_data, block_meta)
                        .await
                        .map(|stored| stored.handle)
                })
            };

            // Task which stores the proof and then the queue diff,
            // yielding at random points in between.
            let spawn_store_parts = || {
                let storage = storage.clone();
                let proof = proof.clone();
                let queue_diff = queue_diff.clone();
                JoinTask::new(async move {
                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let with_proof = storage
                        .block_storage()
                        .store_block_proof(&proof, MaybeExistingHandle::New(block_meta))
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let with_diff = storage
                        .block_storage()
                        .store_queue_diff(&queue_diff, with_proof.handle.into())
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    Ok::<_, anyhow::Error>(with_diff.handle)
                })
            };

            let data_fut = pin!(spawn_store_data());
            // Two identical tasks race; whichever completes first wins.
            let parts_fut = pin!(async {
                tokio::select! {
                    left = spawn_store_parts() => left,
                    right = spawn_store_parts() => right,
                }
            });

            let (data, parts) = futures_util::future::join(data_fut, parts_fut).await;
            let data_handle = data?;
            let parts_handle = parts?;

            assert!(std::ptr::addr_eq(
                arc_swap::RefCnt::as_ptr(&data_handle),
                arc_swap::RefCnt::as_ptr(&parts_handle)
            ));
            assert!(data_handle.has_all_block_parts());
        }

        Ok(())
    }

    /// Fills the storage with garbage entries for two shards and checks which
    /// handles, blob entries and connections survive several GC runs.
    #[tokio::test]
    async fn blocks_gc() -> Result<()> {
        const GARBAGE: Bytes = Bytes::from_static(b"garbage");
        const ENTRY_TYPES: [ArchiveEntryType; 3] = [
            ArchiveEntryType::Block,
            ArchiveEntryType::Proof,
            ArchiveEntryType::QueueDiff,
        ];
        const CONNECTION_TYPES: [BlockConnection; 2] =
            [BlockConnection::Prev1, BlockConnection::Next1];

        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let blocks = storage.block_storage();
        let block_handles = storage.block_handle_storage();
        let block_connections = storage.block_connection_storage();

        // Runs GC for the given masterchain seqno and basechain height.
        let run_gc = |mc_seqno: u32, shard_seqno: u32| {
            blobs::remove_blocks(
                &blocks.blob_storage,
                None,
                mc_seqno,
                [(ShardIdent::BASECHAIN, shard_seqno)].into(),
                None,
            )
        };

        let mut shard_block_ids = FastHashMap::<ShardIdent, Vec<BlockId>>::default();

        for shard in [ShardIdent::MASTERCHAIN, ShardIdent::BASECHAIN] {
            let ids = shard_block_ids.entry(shard).or_default();

            for seqno in 0..100 {
                let block_id = BlockId {
                    shard,
                    seqno,
                    root_hash: HashBytes(rand::random()),
                    file_hash: HashBytes(rand::random()),
                };
                ids.push(block_id);

                let meta = NewBlockMeta {
                    is_key_block: shard.is_masterchain() && seqno == 0,
                    gen_utime: 0,
                    ref_by_mc_seqno: seqno,
                };
                let (handle, _) = block_handles.create_or_load_handle(&block_id, meta);
                let lock = handle.block_data_lock().write().await;

                for ty in ENTRY_TYPES {
                    let key = (block_id, ty).into();
                    blocks.blob_storage.add_data(&key, GARBAGE, &lock).await?;
                }
                for direction in CONNECTION_TYPES {
                    block_connections.store_connection(&handle, direction, &block_id);
                }

                handle.meta().add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
                block_handles.store_handle(&handle, false);
            }
        }

        // Remove some blocks.
        let stats = run_gc(70, 50)?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 69,
            total_blocks_removed: 69 + 49,
        });

        let removed_ranges = FastHashMap::from_iter([
            (ShardIdent::MASTERCHAIN, vec![1..=69]),
            (ShardIdent::BASECHAIN, vec![1..=49]),
        ]);
        for (shard, block_ids) in shard_block_ids {
            let shard_ranges = removed_ranges.get(&shard).unwrap();

            for block_id in block_ids {
                let must_be_removed = shard_ranges
                    .iter()
                    .any(|range| range.contains(&block_id.seqno));

                let handle = block_handles.load_handle(&block_id);
                assert_eq!(handle.is_none(), must_be_removed);

                for ty in ENTRY_TYPES {
                    let key = PackageEntryKey::from((block_id, ty));
                    // Check whether the entry is still present in the blocks index.
                    let exists_in_cas = blocks
                        .blob_storage()
                        .blocks()
                        .read_index_state()
                        .contains_key(&key);
                    assert_eq!(!exists_in_cas, must_be_removed);
                }

                for direction in CONNECTION_TYPES {
                    let connection = block_connections.load_connection(&block_id, direction);
                    assert_eq!(connection.is_none(), must_be_removed);
                }
            }
        }

        // Remove a single block per shard.
        let stats = run_gc(71, 51)?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 1,
            total_blocks_removed: 2,
        });

        // Nothing is left to remove on a repeated run.
        let stats = run_gc(71, 51)?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 0,
            total_blocks_removed: 0,
        });

        Ok(())
    }

    /// A handle marked with the skip-GC flag must survive GC until the flag
    /// is cleared again.
    #[tokio::test]
    async fn blocks_gc_skip_lifecycle() -> Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let handles = storage.block_handle_storage();
        let blob_storage = storage.block_storage().blob_storage.clone();

        let run_gc =
            || blobs::remove_blocks(&blob_storage, None, 100, ShardHeights::default(), None);

        let id = BlockId {
            shard: ShardIdent::MASTERCHAIN,
            seqno: 1,
            root_hash: HashBytes(rand::random()),
            file_hash: HashBytes(rand::random()),
        };

        let meta = NewBlockMeta {
            is_key_block: false,
            gen_utime: 0,
            ref_by_mc_seqno: id.seqno,
        };
        let (handle, _) = handles.create_or_load_handle(&id, meta);

        // While the flag is set, GC leaves the handle alone.
        handles.set_skip_blocks_gc(&handle);
        assert!(handle.skip_blocks_gc());

        run_gc()?;
        assert!(handles.load_handle(&id).is_some());

        // Once the flag is cleared, GC removes the handle.
        handles.set_skip_blocks_gc_finished(&handle);
        assert!(!handle.skip_blocks_gc());

        drop(handle);

        run_gc()?;
        assert!(handles.load_handle(&id).is_none());

        Ok(())
    }
}