// tycho_core/storage/block/mod.rs

1use std::collections::BTreeSet;
2use std::num::NonZeroU32;
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::{Context, Result};
7use bytes::Buf;
8use bytesize::ByteSize;
9use parking_lot::RwLock;
10use tl_proto::TlWrite;
11use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast};
12use tokio::task::JoinHandle;
13use tycho_block_util::archive::{
14    ARCHIVE_ENTRY_HEADER_LEN, ARCHIVE_PREFIX, ArchiveData, ArchiveEntryHeader, ArchiveEntryType,
15};
16use tycho_block_util::block::{
17    BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug, ShardHeights,
18};
19use tycho_block_util::queue::{QueueDiffStuff, QueueDiffStuffAug};
20use tycho_storage::kv::StoredValue;
21use tycho_types::boc::{Boc, BocRepr};
22use tycho_types::cell::HashBytes;
23use tycho_types::models::*;
24use tycho_util::compression::ZstdCompressStream;
25use tycho_util::metrics::HistogramGuard;
26use tycho_util::sync::{CancellationFlag, rayon_run};
27use tycho_util::{FastHashSet, FastHasherState};
28use weedb::{ColumnFamily, OwnedPinnableSlice, rocksdb};
29
30pub use self::package_entry::{BlockDataEntryKey, PackageEntryKey, PartialBlockId};
31use super::util::SlotSubscriptions;
32use super::{
33    BlockConnectionStorage, BlockDataGuard, BlockFlags, BlockHandle, BlockHandleStorage,
34    BlocksCacheConfig, CoreDb, HandleCreationStatus, NewBlockMeta, tables,
35};
36
37mod package_entry;
38
39const METRIC_LOAD_BLOCK_TOTAL: &str = "tycho_storage_load_block_total";
40const METRIC_BLOCK_CACHE_HIT_TOTAL: &str = "tycho_storage_block_cache_hit_total";
41
/// Storage for block data, proofs, queue diffs and block archives.
pub struct BlockStorage {
    db: CoreDb,
    // In-memory cache of recently stored/loaded blocks (TTL + weight bounded).
    blocks_cache: BlocksCache,
    block_handle_storage: Arc<BlockHandleStorage>,
    block_connection_storage: Arc<BlockConnectionStorage>,
    // Known archive ids plus an optional forced "next archive id" override.
    archive_ids: RwLock<ArchiveIds>,
    // Per-block-id waiters, resolved when the block data is stored.
    block_subscriptions: SlotSubscriptions<BlockId, BlockStuff>,
    // Held shared while storing block data; held exclusively to pause all
    // writers (see `wait_for_block`).
    store_block_data: tokio::sync::RwLock<()>,
    // Slot for the in-flight commit task of the previous archive.
    prev_archive_commit: tokio::sync::Mutex<Option<CommitArchiveTask>>,
    // Broadcasts ids of fully committed archives to subscribers.
    archive_ids_tx: ArchiveIdsTx,
    archive_chunk_size: NonZeroU32,
    // Limits the number of concurrent "split block data" tasks.
    split_block_semaphore: Arc<Semaphore>,
}
55
56impl BlockStorage {
57    // === Init stuff ===
58
59    pub fn new(
60        db: CoreDb,
61        config: BlockStorageConfig,
62        block_handle_storage: Arc<BlockHandleStorage>,
63        block_connection_storage: Arc<BlockConnectionStorage>,
64        archive_chunk_size: ByteSize,
65    ) -> Self {
66        fn weigher(_key: &BlockId, value: &BlockStuff) -> u32 {
67            const BLOCK_STUFF_OVERHEAD: u32 = 1024; // 1 KB
68
69            std::mem::size_of::<BlockId>() as u32
70                + BLOCK_STUFF_OVERHEAD
71                + value.data_size().try_into().unwrap_or(u32::MAX)
72        }
73
74        let blocks_cache = moka::sync::Cache::builder()
75            .time_to_live(config.blocks_cache.ttl)
76            .max_capacity(config.blocks_cache.size.as_u64())
77            .weigher(weigher)
78            .build_with_hasher(Default::default());
79
80        let (archive_ids_tx, _) = broadcast::channel(4);
81
82        let archive_chunk_size =
83            NonZeroU32::new(archive_chunk_size.as_u64().clamp(1, u32::MAX as _) as _).unwrap();
84
85        let split_block_semaphore = Arc::new(Semaphore::new(config.split_block_tasks));
86
87        Self {
88            db,
89            blocks_cache,
90            block_handle_storage,
91            block_connection_storage,
92            archive_ids_tx,
93            archive_chunk_size,
94            split_block_semaphore,
95            archive_ids: Default::default(),
96            block_subscriptions: Default::default(),
97            store_block_data: Default::default(),
98            prev_archive_commit: Default::default(),
99        }
100    }
101
    // NOTE: This is intentionally a method, not a constant because
    // it might be useful to allow configure it during the first run.
    /// Size of a single archive chunk in bytes.
    pub fn archive_chunk_size(&self) -> NonZeroU32 {
        self.archive_chunk_size
    }
107
108    pub fn block_data_chunk_size(&self) -> NonZeroU32 {
109        NonZeroU32::new(BLOCK_DATA_CHUNK_SIZE).unwrap()
110    }
111
    /// Scans the chunked block data column and re-runs the "split" task for
    /// any block whose chunked representation was left unfinished
    /// (e.g. by a crash).
    ///
    /// A block is unfinished when its `STARTED` marker exists but the trailing
    /// `SIZE` marker does not; the full data is then re-read from
    /// `package_entries` and split again.
    pub async fn finish_block_data(&self) -> Result<()> {
        let started_at = Instant::now();

        tracing::info!("started finishing compressed block data");

        let mut iter = self.db.block_data_entries.raw_iterator();
        iter.seek_to_first();

        // Blocks with a STARTED marker seen but no matching SIZE marker yet.
        let mut blocks_to_finish = Vec::new();

        loop {
            let Some(key) = iter.key() else {
                if let Err(e) = iter.status() {
                    tracing::error!("failed to iterate through compressed block data: {e:?}");
                }
                break;
            };

            let key = BlockDataEntryKey::from_slice(key);

            const _: () = const {
                // Rely on the specific order of these constants
                assert!(BLOCK_DATA_STARTED_MAGIC < BLOCK_DATA_SIZE_MAGIC);
            };

            // Chunk keys are sorted by offset.
            match key.chunk_index {
                // "Started" magic comes first, and indicates that the block exists.
                BLOCK_DATA_STARTED_MAGIC => {
                    blocks_to_finish.push(key.block_id);
                }
                // "Size" magic comes last, and indicates that the block data is finished.
                BLOCK_DATA_SIZE_MAGIC => {
                    // Last block id is already finished
                    let last = blocks_to_finish.pop();
                    anyhow::ensure!(last == Some(key.block_id), "invalid block data SIZE entry");
                }
                // Ordinary data chunks are irrelevant for this scan.
                _ => {}
            }

            iter.next();
        }

        drop(iter);

        for block_id in blocks_to_finish {
            tracing::info!(?block_id, "found unfinished block");

            let key = PackageEntryKey {
                block_id,
                ty: ArchiveEntryType::Block,
            };

            // The full block data must still be present in `package_entries`,
            // otherwise the chunked copy can never be completed.
            let data = match self.db.package_entries.get(key.to_vec())? {
                Some(data) => data,
                None => return Err(BlockStorageError::BlockDataNotFound.into()),
            };

            // NOTE(review): the permit is acquired and the task awaited right
            // away, so recovery runs sequentially; the semaphore only bounds
            // concurrency with split tasks spawned elsewhere.
            let permit = self.split_block_semaphore.clone().acquire_owned().await?;
            self.spawn_split_block_data(&block_id, &data, permit)
                .await??;
        }

        tracing::info!(
            elapsed = %humantime::format_duration(started_at.elapsed()),
            "finished handling unfinished blocks"
        );

        Ok(())
    }
182
    /// Iterates over all archives and preloads their ids into memory.
    ///
    /// Additionally re-runs the commit of any archive whose `TO_COMMIT`
    /// marker exists without a matching `SIZE` marker (commit was
    /// interrupted): its chunks are cleared and the commit task is restarted.
    pub async fn preload_archive_ids(&self) -> Result<()> {
        let started_at = Instant::now();

        tracing::info!("started preloading archive ids");

        let db = self.db.clone();

        // The scan is blocking RocksDB iteration — keep it off the async executor.
        let (archive_ids, override_next_id, to_commit) = tokio::task::spawn_blocking(move || {
            let mut iter = db.archives.raw_iterator();
            iter.seek_to_first();

            let mut archive_ids = BTreeSet::new();
            let mut archives_to_commit = Vec::new();
            let mut override_next_id = None;
            loop {
                let Some((key, value)) = iter.item() else {
                    if let Err(e) = iter.status() {
                        tracing::error!("failed to iterate through archives: {e:?}");
                    }
                    break;
                };

                // Key layout: 4-byte BE archive id, then 8-byte BE chunk index.
                let archive_id = u32::from_be_bytes(key[..4].try_into().unwrap());
                let chunk_index = u64::from_be_bytes(key[4..].try_into().unwrap());

                const _: () = const {
                    // Rely on the specific order of these constants
                    assert!(ARCHIVE_STARTED_MAGIC < ARCHIVE_OVERRIDE_NEXT_MAGIC);
                    assert!(ARCHIVE_OVERRIDE_NEXT_MAGIC < ARCHIVE_TO_COMMIT_MAGIC);
                    assert!(ARCHIVE_TO_COMMIT_MAGIC < ARCHIVE_SIZE_MAGIC);
                };

                // When set, the iterator jumps directly to this key instead
                // of stepping to the next entry.
                let mut skip = None;

                if let Some(next_id) = override_next_id {
                    // Reset override when it is not needed.
                    if archive_id > next_id {
                        override_next_id = None;
                    }
                }

                // Chunk keys are sorted by offset.
                match chunk_index {
                    // "Started" magic comes first, and indicates that the archive exists.
                    ARCHIVE_STARTED_MAGIC => {
                        archive_ids.insert(archive_id);
                    }
                    // "Override" magic comes next, and sets the next archive id if was finished earlier.
                    ARCHIVE_OVERRIDE_NEXT_MAGIC => {
                        // NOTE: value is stored little-endian (see `move_into_archive`).
                        override_next_id = Some(u32::from_le_bytes(value[..4].try_into().unwrap()));
                    }
                    // "To commit" magic comes next, commit should have been started.
                    ARCHIVE_TO_COMMIT_MAGIC => {
                        anyhow::ensure!(
                            archive_ids.contains(&archive_id),
                            "invalid archive TO_COMMIT entry"
                        );
                        archives_to_commit.push(archive_id);
                    }
                    // "Size" magic comes last, and indicates that the archive is fully committed.
                    ARCHIVE_SIZE_MAGIC => {
                        // Last archive is already committed
                        let last = archives_to_commit.pop();
                        anyhow::ensure!(last == Some(archive_id), "invalid archive SIZE entry");

                        // Require only contiguous uncommited archives list
                        anyhow::ensure!(archives_to_commit.is_empty(), "skipped archive commit");
                    }
                    _ => {
                        // Skip all chunks until the magic
                        if chunk_index < ARCHIVE_STARTED_MAGIC {
                            let mut next_key = [0; tables::Archives::KEY_LEN];
                            next_key[..4].copy_from_slice(&archive_id.to_be_bytes());
                            next_key[4..].copy_from_slice(&ARCHIVE_STARTED_MAGIC.to_be_bytes());
                            skip = Some(next_key);
                        }
                    }
                }

                match skip {
                    None => iter.next(),
                    Some(key) => iter.seek(key),
                }
            }

            Ok::<_, anyhow::Error>((archive_ids, override_next_id, archives_to_commit))
        })
        .await??;

        {
            let mut ids = self.archive_ids.write();
            ids.items.extend(archive_ids);
            ids.override_next_id = override_next_id;
        }

        tracing::info!(
            elapsed = %humantime::format_duration(started_at.elapsed()),
            ?override_next_id,
            "finished preloading archive ids"
        );

        for archive_id in to_commit {
            tracing::info!(archive_id, "clear partially committed archive");
            // Solves the problem of non-deterministic compression when commit archive
            // was interrupted and should be rewritten
            self.clear_archive(archive_id)?;

            tracing::info!(archive_id, "rewrite partially committed archive");
            let mut task = self.spawn_commit_archive(archive_id);
            task.finish().await?;

            // Notify archive subscribers
            self.archive_ids_tx.send(task.archive_id).ok();
        }

        Ok(())
    }
301
302    // === Subscription stuff ===
303
    /// Waits until the block with the given id is stored, then loads it.
    ///
    /// The `store_block_data` lock is taken exclusively so that no block can
    /// be stored between the "already stored?" check and the subscription —
    /// otherwise the notification could be missed.
    pub async fn wait_for_block(&self, block_id: &BlockId) -> Result<BlockStuffAug> {
        let block_handle_storage = &self.block_handle_storage;

        // Take an exclusive lock to prevent any block data from being stored
        let guard = self.store_block_data.write().await;

        // Try to load the block data
        if let Some(handle) = block_handle_storage.load_handle(block_id) {
            if handle.has_data() {
                drop(guard);
                let block = self.load_block_data(&handle).await?;
                return Ok(BlockStuffAug::loaded(block));
            }
        }

        // Add subscription for the block and drop the lock
        let rx = self.block_subscriptions.subscribe(block_id);

        // NOTE: Guard must be dropped before awaiting
        drop(guard);

        let block = rx.await;
        Ok(BlockStuffAug::loaded(block))
    }
328
329    pub async fn wait_for_next_block(&self, prev_block_id: &BlockId) -> Result<BlockStuffAug> {
330        let block_id = self
331            .block_connection_storage
332            .wait_for_next1(prev_block_id)
333            .await;
334
335        self.wait_for_block(&block_id).await
336    }
337
338    // === Block data ===
339
    /// Stores block data, creating or reusing its handle, notifies waiters
    /// and caches the block.
    ///
    /// Concurrent stores are allowed (shared lock); `wait_for_block` takes
    /// the same lock exclusively to pause writers. The per-handle data lock
    /// plus a re-check of `has_data` ensure the data is written at most once.
    pub async fn store_block_data(
        &self,
        block: &BlockStuff,
        archive_data: &ArchiveData,
        meta_data: NewBlockMeta,
    ) -> Result<StoreBlockResult> {
        // NOTE: Any amount of blocks can be stored concurrently,
        // but the subscription lock can be acquired only while
        // no block data is being stored.
        let guard = self.store_block_data.read().await;

        let block_id = block.id();
        let (handle, status) = self
            .block_handle_storage
            .create_or_load_handle(block_id, meta_data);

        let archive_id = PackageEntryKey::block(block_id);
        let mut updated = false;
        if !handle.has_data() {
            let data = archive_data.as_new_archive_data()?;
            metrics::histogram!("tycho_storage_store_block_data_size").record(data.len() as f64);

            // Double-checked: re-test the flag under the per-handle write lock.
            let _lock = handle.block_data_lock().write().await;
            if !handle.has_data() {
                self.add_block_data_and_split(&archive_id, data).await?;
                if handle.meta().add_flags(BlockFlags::HAS_DATA) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        // TODO: only notify subscribers if `updated`?
        self.block_subscriptions.notify(block_id, block);

        drop(guard);

        // Update block cache
        self.blocks_cache.insert(*block_id, block.clone());

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
386
387    pub async fn load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
388        metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
389
390        const BIG_DATA_THRESHOLD: usize = 1 << 20; // 1 MB
391
392        let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
393
394        if !handle.has_data() {
395            return Err(BlockStorageError::BlockDataNotFound.into());
396        }
397
398        // Fast path - lookup in cache
399        if let Some(block) = self.blocks_cache.get(handle.id()) {
400            metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
401            return Ok(block.clone());
402        }
403
404        let FullBlockDataGuard { _lock, data } = self
405            .get_data_ref(handle, &PackageEntryKey::block(handle.id()))
406            .await?;
407
408        if data.len() > BIG_DATA_THRESHOLD {
409            BlockStuff::deserialize(handle.id(), data.as_ref())
410        } else {
411            let handle = handle.clone();
412
413            // SAFETY: `data` was created by the `self.db` RocksDB instance.
414            let owned_data =
415                unsafe { weedb::OwnedPinnableSlice::new(self.db.rocksdb().clone(), data) };
416            rayon_run(move || BlockStuff::deserialize(handle.id(), owned_data.as_ref())).await
417        }
418    }
419
420    pub async fn load_block_data_raw(&self, handle: &BlockHandle) -> Result<OwnedPinnableSlice> {
421        if !handle.has_data() {
422            return Err(BlockStorageError::BlockDataNotFound.into());
423        }
424        self.get_data(handle, &PackageEntryKey::block(handle.id()))
425            .await
426    }
427
    /// Lists stored block ids, paginated.
    ///
    /// Returns up to 1000 blocks (or ~1 MB of processed data, whichever
    /// comes first) and a continuation token — the short id of the first
    /// unprocessed block — to pass into the next call.
    pub async fn list_blocks(
        &self,
        continuation: Option<BlockIdShort>,
    ) -> Result<(Vec<BlockId>, Option<BlockIdShort>)> {
        const LIMIT: usize = 1000; // Max blocks per response
        const MAX_BYTES: usize = 1 << 20; // 1 MB processed per response

        // Zero hashes make the key the lower bound for this (shard, seqno).
        let continuation = continuation.map(|block_id| {
            PackageEntryKey::block(&BlockId {
                shard: block_id.shard,
                seqno: block_id.seqno,
                root_hash: HashBytes::ZERO,
                file_hash: HashBytes::ZERO,
            })
            .to_vec()
        });

        let mut iter = {
            let mut readopts = rocksdb::ReadOptions::default();
            tables::PackageEntries::read_options(&mut readopts);
            if let Some(key) = &continuation {
                readopts.set_iterate_lower_bound(key.as_slice());
            }
            self.db
                .rocksdb()
                .raw_iterator_cf_opt(&self.db.package_entries.cf(), readopts)
        };

        // NOTE: Despite setting the lower bound we must still seek to the exact key.
        match continuation {
            None => iter.seek_to_first(),
            Some(key) => iter.seek(key),
        }

        let mut bytes = 0;
        let mut blocks = Vec::new();

        let continuation = loop {
            let (key, value) = match iter.item() {
                Some(item) => item,
                None => {
                    iter.status()?;
                    break None;
                }
            };

            let id = PackageEntryKey::from_slice(key);
            if id.ty != ArchiveEntryType::Block {
                // Ignore non-block entries
                iter.next();
                continue;
            }

            // Check limits BEFORE consuming this entry so it becomes the
            // continuation point of the next page.
            if blocks.len() >= LIMIT || bytes >= MAX_BYTES {
                break Some(id.block_id.as_short_id());
            }

            // Recompute the file hash from the stored data; the key itself
            // only carries a partial block id.
            let file_hash = Boc::file_hash_blake(value);
            let block_id = id.block_id.make_full(file_hash);

            bytes += value.len();
            blocks.push(block_id);

            iter.next();
        };

        Ok((blocks, continuation))
    }
496
497    pub fn list_archive_ids(&self) -> Vec<u32> {
498        self.archive_ids.read().items.iter().cloned().collect()
499    }
500
501    pub async fn load_block_data_raw_ref<'a>(
502        &'a self,
503        handle: &'a BlockHandle,
504    ) -> Result<impl AsRef<[u8]> + 'a> {
505        if !handle.has_data() {
506            return Err(BlockStorageError::BlockDataNotFound.into());
507        }
508        self.get_data_ref(handle, &PackageEntryKey::block(handle.id()))
509            .await
510    }
511
512    pub fn find_mc_block_data(&self, mc_seqno: u32) -> Result<Option<Block>> {
513        let package_entries = &self.db.package_entries;
514
515        let bound = BlockId {
516            shard: ShardIdent::MASTERCHAIN,
517            seqno: mc_seqno,
518            root_hash: HashBytes::ZERO,
519            file_hash: HashBytes::ZERO,
520        };
521
522        let mut bound = PackageEntryKey::block(&bound);
523
524        let mut readopts = package_entries.new_read_config();
525        readopts.set_iterate_lower_bound(bound.to_vec().into_vec());
526        bound.block_id.seqno += 1;
527        readopts.set_iterate_upper_bound(bound.to_vec().into_vec());
528
529        let mut iter = self
530            .db
531            .rocksdb()
532            .raw_iterator_cf_opt(&package_entries.cf(), readopts);
533
534        iter.seek_to_first();
535        loop {
536            let Some((key, value)) = iter.item() else {
537                iter.status()?;
538                return Ok(None);
539            };
540
541            let Some(ArchiveEntryType::Block) = extract_entry_type(key) else {
542                continue;
543            };
544
545            return Ok(Some(BocRepr::decode::<Block, _>(value)?));
546        }
547    }
548
549    // === Block proof ===
550
    /// Stores the block proof, creating or reusing the handle.
    ///
    /// Rejects an existing handle whose id differs from the proof's id.
    /// The per-handle proof lock plus a re-check of `has_proof` ensure the
    /// proof is written at most once.
    pub async fn store_block_proof(
        &self,
        proof: &BlockProofStuffAug,
        handle: MaybeExistingHandle,
    ) -> Result<StoreBlockResult> {
        let block_id = proof.id();
        if matches!(&handle, MaybeExistingHandle::Existing(handle) if handle.id() != block_id) {
            return Err(BlockStorageError::BlockHandleIdMismatch.into());
        }

        let (handle, status) = match handle {
            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
            MaybeExistingHandle::New(meta_data) => self
                .block_handle_storage
                .create_or_load_handle(block_id, meta_data),
        };

        let mut updated = false;
        let archive_id = PackageEntryKey::proof(block_id);
        if !handle.has_proof() {
            let data = proof.as_new_archive_data()?;

            // Double-checked: re-test the flag under the per-handle write lock.
            let _lock = handle.proof_data_lock().write().await;
            if !handle.has_proof() {
                self.add_data(&archive_id, data)?;
                if handle.meta().add_flags(BlockFlags::HAS_PROOF) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
589
590    pub async fn load_block_proof(&self, handle: &BlockHandle) -> Result<BlockProofStuff> {
591        let raw_proof = self.load_block_proof_raw_ref(handle).await?;
592        BlockProofStuff::deserialize(handle.id(), raw_proof.as_ref())
593    }
594
595    pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<OwnedPinnableSlice> {
596        if !handle.has_proof() {
597            return Err(BlockStorageError::BlockProofNotFound.into());
598        }
599
600        self.get_data(handle, &PackageEntryKey::proof(handle.id()))
601            .await
602    }
603
604    pub async fn load_block_proof_raw_ref<'a>(
605        &'a self,
606        handle: &'a BlockHandle,
607    ) -> Result<impl AsRef<[u8]> + 'a> {
608        if !handle.has_proof() {
609            return Err(BlockStorageError::BlockProofNotFound.into());
610        }
611
612        self.get_data_ref(handle, &PackageEntryKey::proof(handle.id()))
613            .await
614    }
615
616    // === Queue diff ===
617
618    pub async fn store_queue_diff(
619        &self,
620        queue_diff: &QueueDiffStuffAug,
621        handle: MaybeExistingHandle,
622    ) -> Result<StoreBlockResult> {
623        let block_id = queue_diff.block_id();
624        if matches!(&handle, MaybeExistingHandle::Existing(handle) if handle.id() != block_id) {
625            return Err(BlockStorageError::BlockHandleIdMismatch.into());
626        }
627
628        let (handle, status) = match handle {
629            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
630            MaybeExistingHandle::New(meta_data) => self
631                .block_handle_storage
632                .create_or_load_handle(block_id, meta_data),
633        };
634
635        let mut updated = false;
636        let archive_id = PackageEntryKey::queue_diff(block_id);
637        if !handle.has_queue_diff() {
638            let data = queue_diff.as_new_archive_data()?;
639
640            let _lock = handle.queue_diff_data_lock().write().await;
641            if !handle.has_queue_diff() {
642                self.add_data(&archive_id, data)?;
643                if handle.meta().add_flags(BlockFlags::HAS_QUEUE_DIFF) {
644                    self.block_handle_storage.store_handle(&handle, false);
645                    updated = true;
646                }
647            }
648        }
649
650        Ok(StoreBlockResult {
651            handle,
652            updated,
653            new: status == HandleCreationStatus::Created,
654        })
655    }
656
657    pub async fn load_queue_diff(&self, handle: &BlockHandle) -> Result<QueueDiffStuff> {
658        let raw_diff = self.load_queue_diff_raw_ref(handle).await?;
659        QueueDiffStuff::deserialize(handle.id(), raw_diff.as_ref())
660    }
661
662    pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<OwnedPinnableSlice> {
663        if !handle.has_queue_diff() {
664            return Err(BlockStorageError::QueueDiffNotFound.into());
665        }
666
667        self.get_data(handle, &PackageEntryKey::queue_diff(handle.id()))
668            .await
669    }
670
671    pub async fn load_queue_diff_raw_ref<'a>(
672        &'a self,
673        handle: &'a BlockHandle,
674    ) -> Result<impl AsRef<[u8]> + 'a> {
675        if !handle.has_queue_diff() {
676            return Err(BlockStorageError::QueueDiffNotFound.into());
677        }
678        self.get_data_ref(handle, &PackageEntryKey::queue_diff(handle.id()))
679            .await
680    }
681
682    // === Archive stuff ===
683
    /// Loads data and proof for the block and appends them to the corresponding archive.
    ///
    /// All marker writes go into a single `WriteBatch` so that archive state
    /// stays consistent on crash. If this call closed the previous archive
    /// (`to_commit` is set), the commit task for it is spawned after waiting
    /// for any still-running previous commit.
    pub async fn move_into_archive(
        &self,
        handle: &BlockHandle,
        mc_is_key_block: bool,
    ) -> Result<()> {
        let _histogram = HistogramGuard::begin("tycho_storage_move_into_archive_time");

        // Prepare data
        let block_id_bytes = handle.id().to_vec();

        // Prepare cf
        let archive_block_ids_cf = self.db.archive_block_ids.cf();
        let chunks_cf = self.db.archives.cf();

        // Prepare archive
        let archive_id = self.prepare_archive_id(
            handle.ref_by_mc_seqno(),
            mc_is_key_block || handle.is_key_block(),
        );
        let archive_id_bytes = archive_id.id.to_be_bytes();

        // 0. Create transaction
        let mut batch = rocksdb::WriteBatch::default();

        // 1. Append archive block id (merge op accumulates ids per archive)
        batch.merge_cf(&archive_block_ids_cf, archive_id_bytes, &block_id_bytes);

        // 2. Store info that new archive was started
        if archive_id.is_new {
            let mut key = [0u8; tables::Archives::KEY_LEN];
            key[..4].copy_from_slice(&archive_id_bytes);
            key[4..].copy_from_slice(&ARCHIVE_STARTED_MAGIC.to_be_bytes());
            batch.put_cf(&chunks_cf, key, []);
        }
        // 3. Store info about overriding next archive id
        // NOTE: the value is little-endian; `preload_archive_ids` reads it back as LE.
        if let Some(next_id) = archive_id.override_next_id {
            let mut key = [0u8; tables::Archives::KEY_LEN];
            key[..4].copy_from_slice(&archive_id_bytes);
            key[4..].copy_from_slice(&ARCHIVE_OVERRIDE_NEXT_MAGIC.to_be_bytes());
            batch.put_cf(&chunks_cf, key, next_id.to_le_bytes());
        }
        // 4. Store info that archive commit is in progress
        if let Some(to_commit) = archive_id.to_commit {
            let mut key = [0u8; tables::Archives::KEY_LEN];
            key[..4].copy_from_slice(&to_commit.to_be_bytes());
            key[4..].copy_from_slice(&ARCHIVE_TO_COMMIT_MAGIC.to_be_bytes());
            batch.put_cf(&chunks_cf, key, []);
        }
        // 5. Execute transaction
        self.db.rocksdb().write(batch)?;

        tracing::debug!(block_id = %handle.id(), "saved block id into archive");
        // Block will be removed after blocks gc

        if let Some(to_commit) = archive_id.to_commit {
            // Commit previous archive
            let mut prev_archive_commit = self.prev_archive_commit.lock().await;

            // NOTE: Wait on reference to make sure that the task is cancel safe
            if let Some(task) = &mut *prev_archive_commit {
                // Wait commit archive
                task.finish().await?;

                // Notify archive subscribers
                self.archive_ids_tx.send(task.archive_id).ok();
            }
            *prev_archive_commit = Some(self.spawn_commit_archive(to_commit));
        }

        // Done
        Ok(())
    }
757
758    pub async fn wait_for_archive_commit(&self) -> Result<()> {
759        let mut prev_archive_commit = self.prev_archive_commit.lock().await;
760        if let Some(task) = &mut *prev_archive_commit {
761            task.finish().await?;
762            *prev_archive_commit = None;
763        }
764        Ok(())
765    }
766
    /// Returns a corresponding archive id for the specified masterchain seqno.
    pub fn get_archive_id(&self, mc_seqno: u32) -> ArchiveId {
        let archive_ids = self.archive_ids.read();

        if !matches!(archive_ids.items.last(), Some(id) if mc_seqno < *id) {
            // Return `TooNew` if there are no archives yet, or the requested
            // seqno is not strictly before the start of the last archive
            // (the last archive may still be growing).
            return ArchiveId::TooNew;
        }

        // Pick the greatest archive id that does not exceed `mc_seqno`.
        match archive_ids.items.range(..=mc_seqno).next_back() {
            // NOTE: handles case when mc_seqno is far in the future.
            // However if there is a key block between `id` and `mc_seqno`,
            // this will return an archive without that specified block.
            Some(id) if mc_seqno < id + ARCHIVE_PACKAGE_SIZE => ArchiveId::Found(*id),
            _ => ArchiveId::NotFound,
        }
    }
785
786    pub fn get_archive_size(&self, id: u32) -> Result<Option<usize>> {
787        let mut key = [0u8; tables::Archives::KEY_LEN];
788        key[..4].copy_from_slice(&id.to_be_bytes());
789        key[4..].copy_from_slice(&ARCHIVE_SIZE_MAGIC.to_be_bytes());
790
791        match self.db.archives.get(key.as_slice())? {
792            Some(slice) => Ok(Some(
793                u64::from_le_bytes(slice.as_ref().try_into().unwrap()) as usize
794            )),
795            None => Ok(None),
796        }
797    }
798
799    /// Loads an archive chunk.
800    pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<OwnedPinnableSlice> {
801        let chunk_size = self.archive_chunk_size().get() as u64;
802        if offset % chunk_size != 0 {
803            return Err(BlockStorageError::InvalidOffset.into());
804        }
805
806        let chunk_index = offset / chunk_size;
807
808        let mut key = [0u8; tables::Archives::KEY_LEN];
809        key[..4].copy_from_slice(&id.to_be_bytes());
810        key[4..].copy_from_slice(&chunk_index.to_be_bytes());
811
812        let chunk = self
813            .db
814            .archives
815            .get(key.as_slice())?
816            .ok_or(BlockStorageError::ArchiveNotFound)?;
817
818        // SAFETY: A value was received from the same RocksDB instance.
819        Ok(unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), chunk) })
820    }
821
822    pub fn get_block_data_size(&self, block_id: &BlockId) -> Result<Option<u32>> {
823        let key = BlockDataEntryKey {
824            block_id: block_id.into(),
825            chunk_index: BLOCK_DATA_SIZE_MAGIC,
826        };
827        let size = self
828            .db
829            .block_data_entries
830            .get(key.to_vec())?
831            .map(|slice| u32::from_le_bytes(slice.as_ref().try_into().unwrap()));
832
833        Ok(size)
834    }
835
836    pub fn get_block_data_chunk(
837        &self,
838        block_id: &BlockId,
839        offset: u32,
840    ) -> Result<Option<OwnedPinnableSlice>> {
841        let chunk_size = self.block_data_chunk_size().get();
842        if offset % chunk_size != 0 {
843            return Err(BlockStorageError::InvalidOffset.into());
844        }
845
846        let key = BlockDataEntryKey {
847            block_id: block_id.into(),
848            chunk_index: offset / chunk_size,
849        };
850
851        Ok(self.db.block_data_entries.get(key.to_vec())?.map(|value| {
852            // SAFETY: A value was received from the same RocksDB instance.
853            unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), value) }
854        }))
855    }
856
    /// Subscribes to notifications about newly committed archive ids.
    pub fn subscribe_to_archive_ids(&self) -> broadcast::Receiver<u32> {
        self.archive_ids_tx.subscribe()
    }
860
861    pub fn archive_chunks_iterator(&self, archive_id: u32) -> rocksdb::DBRawIterator<'_> {
862        let mut from = [0u8; tables::Archives::KEY_LEN];
863        from[..4].copy_from_slice(&archive_id.to_be_bytes());
864
865        let mut to = [0u8; tables::Archives::KEY_LEN];
866        to[..4].copy_from_slice(&archive_id.to_be_bytes());
867        to[4..].copy_from_slice(&ARCHIVE_MAGIC_MIN.to_be_bytes());
868
869        let mut read_opts = self.db.archives.new_read_config();
870        read_opts.set_iterate_upper_bound(to.as_slice());
871
872        let rocksdb = self.db.rocksdb();
873        let archives_cf = self.db.archives.cf();
874
875        let mut raw_iterator = rocksdb.raw_iterator_cf_opt(&archives_cf, read_opts);
876        raw_iterator.seek(from);
877
878        raw_iterator
879    }
880
881    // === GC stuff ===
882
    /// Removes data of blocks that became outdated relative to the masterchain
    /// block `mc_seqno`.
    ///
    /// Top shard block heights are read from the target mc block itself; the
    /// heavy key scan/deletion is offloaded to a rayon worker (`remove_blocks`)
    /// and is cancelled if this future is dropped.
    ///
    /// `max_blocks_per_batch` limits how many removals are accumulated in a
    /// single RocksDB write batch before it is flushed.
    #[tracing::instrument(skip(self, max_blocks_per_batch))]
    pub async fn remove_outdated_blocks(
        &self,
        mc_seqno: u32,
        max_blocks_per_batch: Option<usize>,
    ) -> Result<()> {
        // Nothing can be outdated relative to the zerostate.
        if mc_seqno == 0 {
            return Ok(());
        }

        tracing::info!("started blocks GC for mc_block {mc_seqno}");

        let block = self
            .find_mc_block_data(mc_seqno)
            .context("failed to load target block data")?
            .context("target block not found")?;

        // Collect the latest (shard, seqno) pairs referenced by the target
        // mc block; blocks at or above these heights are kept.
        let shard_heights = {
            let extra = block.extra.load()?;
            let custom = extra.custom.context("mc block extra not found")?.load()?;
            custom
                .shards
                .latest_blocks()
                .map(|id| id.map(|id| (id.shard, id.seqno)))
                .collect::<Result<_, tycho_types::error::Error>>()?
        };

        // Remove all expired entries
        let total_cached_handles_removed = self
            .block_handle_storage
            .gc_handles_cache(mc_seqno, &shard_heights);

        // If this future is dropped, the deferred cancel tells the rayon job
        // to stop as soon as it notices the flag.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        let span = tracing::Span::current();
        let cancelled = cancelled.clone();
        let db = self.db.clone();

        let BlockGcStats {
            mc_blocks_removed,
            total_blocks_removed,
        } = rayon_run(move || {
            let _span = span.enter();

            // Logs "cancelled" unless the job runs to completion (the guard is
            // disarmed below via `into_inner`).
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            let stats = remove_blocks(
                db,
                max_blocks_per_batch,
                mc_seqno,
                shard_heights,
                Some(&cancelled),
            )?;

            scopeguard::ScopeGuard::into_inner(guard);
            Ok::<_, anyhow::Error>(stats)
        })
        .await?;

        tracing::info!(
            total_cached_handles_removed,
            mc_blocks_removed,
            total_blocks_removed,
            "finished blocks GC"
        );
        Ok(())
    }
955
    /// Removes all archives strictly older than the most recent archive that
    /// starts before `until_id`.
    ///
    /// The latest archive with an id below `until_id` is intentionally kept,
    /// since it may still contain blocks up to `until_id`.
    #[tracing::instrument(skip(self))]
    pub fn remove_outdated_archives(&self, until_id: u32) -> Result<()> {
        tracing::trace!("started archives GC");

        let mut archive_ids = self.archive_ids.write();

        // Find the greatest archive id strictly below `until_id`; that archive
        // and everything after it is retained.
        let retained_ids = match archive_ids
            .items
            .iter()
            .rev()
            .find(|&id| *id < until_id)
            .cloned()
        {
            // `split_off` leaves [..found_id) in `items` and returns [found_id..].
            Some(until_id) => archive_ids.items.split_off(&until_id),
            None => {
                tracing::trace!("nothing to remove");
                return Ok(());
            }
        };
        // Swap so that `items` keeps the retained tail and we get the removed head.
        let removed_ids = std::mem::replace(&mut archive_ids.items, retained_ids);

        // Print removed range bounds and compute real `until_id`
        let (Some(first), Some(last)) = (removed_ids.first(), removed_ids.last()) else {
            tracing::info!("nothing to remove");
            return Ok(());
        };

        let len = removed_ids.len();
        // The real deletion bound is the first retained id (or `last + 1` if
        // nothing is retained).
        let until_id = match archive_ids.items.first() {
            Some(until_id) => *until_id,
            None => *last + 1,
        };

        // Release the lock before touching RocksDB.
        drop(archive_ids);

        // Remove all archives in range `[0, until_id)`
        let archives_cf = self.db.archives.cf();
        let write_options = self.db.archives.write_config();

        let start_key = [0u8; tables::Archives::KEY_LEN];

        // NOTE: End key points to the first entry of the `until_id` archive,
        // because `delete_range` removes all entries in range ["from", "to").
        let mut end_key = [0u8; tables::Archives::KEY_LEN];
        end_key[..4].copy_from_slice(&until_id.to_be_bytes());
        end_key[4..].copy_from_slice(&[0; 8]);

        self.db
            .rocksdb()
            .delete_range_cf_opt(&archives_cf, start_key, end_key, write_options)?;

        tracing::info!(archive_count = len, first, last, "finished archives GC");
        Ok(())
    }
1013
1014    // === Internal ===
1015
1016    fn add_data(&self, id: &PackageEntryKey, data: &[u8]) -> Result<(), rocksdb::Error> {
1017        self.db.package_entries.insert(id.to_vec(), data)
1018    }
1019
1020    async fn add_block_data_and_split(&self, id: &PackageEntryKey, data: &[u8]) -> Result<()> {
1021        let mut batch = rocksdb::WriteBatch::default();
1022
1023        batch.put_cf(&self.db.package_entries.cf(), id.to_vec(), data);
1024
1025        // Store info that new block was started
1026        let key = BlockDataEntryKey {
1027            block_id: id.block_id,
1028            chunk_index: BLOCK_DATA_STARTED_MAGIC,
1029        };
1030        batch.put_cf(&self.db.block_data_entries.cf(), key.to_vec(), []);
1031
1032        self.db.rocksdb().write(batch)?;
1033
1034        // Start splitting block data
1035        let permit = self.split_block_semaphore.clone().acquire_owned().await?;
1036        let _handle = self.spawn_split_block_data(&id.block_id, data, permit);
1037
1038        Ok(())
1039    }
1040
1041    async fn get_data(
1042        &self,
1043        handle: &BlockHandle,
1044        id: &PackageEntryKey,
1045    ) -> Result<OwnedPinnableSlice> {
1046        let _lock = match id.ty {
1047            ArchiveEntryType::Block => handle.block_data_lock(),
1048            ArchiveEntryType::Proof => handle.proof_data_lock(),
1049            ArchiveEntryType::QueueDiff => handle.queue_diff_data_lock(),
1050        }
1051        .read()
1052        .await;
1053
1054        match self.db.package_entries.get(id.to_vec())? {
1055            // SAFETY: A value was received from the same RocksDB instance.
1056            Some(value) => Ok(unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), value) }),
1057            None => Err(BlockStorageError::PackageEntryNotFound.into()),
1058        }
1059    }
1060
1061    async fn get_data_ref<'a, 'b: 'a>(
1062        &'a self,
1063        handle: &'b BlockHandle,
1064        id: &PackageEntryKey,
1065    ) -> Result<FullBlockDataGuard<'a>> {
1066        let lock = match id.ty {
1067            ArchiveEntryType::Block => handle.block_data_lock(),
1068            ArchiveEntryType::Proof => handle.proof_data_lock(),
1069            ArchiveEntryType::QueueDiff => handle.queue_diff_data_lock(),
1070        }
1071        .read()
1072        .await;
1073
1074        match self.db.package_entries.get(id.to_vec())? {
1075            Some(data) => Ok(FullBlockDataGuard { _lock: lock, data }),
1076            None => Err(BlockStorageError::PackageEntryNotFound.into()),
1077        }
1078    }
1079
    /// Resolves which archive the masterchain block `mc_seqno` belongs to,
    /// starting a new archive when needed.
    ///
    /// A new archive starts when: this is the very first archive, the distance
    /// from the previous archive id reaches `ARCHIVE_PACKAGE_SIZE`, or a
    /// previously requested forced split (`override_next_id`) is reached.
    /// `force_split_archive` schedules a split at `mc_seqno + 1` instead of
    /// splitting immediately.
    fn prepare_archive_id(&self, mc_seqno: u32, force_split_archive: bool) -> PreparedArchiveId {
        let mut archive_ids = self.archive_ids.write();

        // Get the closest archive id
        let prev_id = archive_ids.items.range(..=mc_seqno).next_back().cloned();

        if force_split_archive {
            // Request a split at the next seqno; handled on a later call.
            archive_ids.override_next_id = Some(mc_seqno + 1);
        } else if let Some(next_id) = archive_ids.override_next_id {
            match mc_seqno.cmp(&next_id) {
                // Not reached yet; keep waiting.
                std::cmp::Ordering::Less => {}
                // Exactly at the scheduled split point: start a new archive here.
                std::cmp::Ordering::Equal => {
                    let is_new = archive_ids.items.insert(mc_seqno);
                    return PreparedArchiveId {
                        id: mc_seqno,
                        is_new,
                        override_next_id: None,
                        // The previous archive becomes ready to commit only if
                        // this id was actually inserted for the first time.
                        to_commit: if is_new { prev_id } else { None },
                    };
                }
                // The scheduled point was skipped; drop the stale override.
                std::cmp::Ordering::Greater => {
                    archive_ids.override_next_id = None;
                }
            }
        }

        // Default: the block belongs to the closest existing archive.
        let mut archive_id = PreparedArchiveId {
            id: prev_id.unwrap_or_default(),
            override_next_id: archive_ids.override_next_id,
            ..Default::default()
        };

        // Start a new archive on the very first block or when the current one
        // has spanned `ARCHIVE_PACKAGE_SIZE` masterchain seqnos.
        let is_first_archive = prev_id.is_none();
        if is_first_archive || mc_seqno.saturating_sub(archive_id.id) >= ARCHIVE_PACKAGE_SIZE {
            let is_new = archive_ids.items.insert(mc_seqno);
            archive_id = PreparedArchiveId {
                id: mc_seqno,
                is_new,
                override_next_id: None,
                to_commit: if is_new { prev_id } else { None },
            };
        }

        // NOTE: subtraction is intentional to panic if archive_id > mc_seqno
        debug_assert!(mc_seqno - archive_id.id <= ARCHIVE_PACKAGE_SIZE);

        archive_id
    }
1128
1129    fn clear_archive(&self, archive_id: u32) -> Result<()> {
1130        let archives_cf = self.db.archives.cf();
1131        let write_options = self.db.archives.write_config();
1132
1133        let mut start_key = [0u8; tables::Archives::KEY_LEN];
1134        start_key[..4].copy_from_slice(&archive_id.to_be_bytes());
1135        start_key[4..].fill(0x00);
1136
1137        let mut end_key = [0u8; tables::Archives::KEY_LEN];
1138        end_key[..4].copy_from_slice(&archive_id.to_be_bytes());
1139        end_key[4..].fill(0xFF);
1140
1141        self.db
1142            .rocksdb()
1143            .delete_range_cf_opt(&archives_cf, start_key, end_key, write_options)?;
1144
1145        Ok(())
1146    }
1147
    /// Spawns a blocking task that assembles the archive from stored package
    /// entries and commits it as compressed chunks.
    ///
    /// The task reads the pending block id list for `archive_id`, writes the
    /// archive prefix and all entries (grouped by type), and finalizes the
    /// archive via [`ArchiveWriter`]. The returned [`CommitArchiveTask`]
    /// cancels and aborts the task when dropped.
    #[tracing::instrument(skip(self))]
    fn spawn_commit_archive(&self, archive_id: u32) -> CommitArchiveTask {
        let db = self.db.clone();
        let block_handle_storage = self.block_handle_storage.clone();
        let chunk_size = self.archive_chunk_size().get() as u64;

        let span = tracing::Span::current();
        let cancelled = CancellationFlag::new();

        let handle = tokio::task::spawn_blocking({
            let cancelled = cancelled.clone();

            move || {
                let _span = span.enter();

                let histogram = HistogramGuard::begin("tycho_storage_commit_archive_time");

                tracing::info!("started");
                // Logs "cancelled" unless the closure runs to completion
                // (disarmed via `into_inner` below).
                let guard = scopeguard::guard((), |_| {
                    tracing::warn!("cancelled");
                });

                // Pending block ids for this archive, stored as a flat array
                // of serialized `BlockId`s.
                let raw_block_ids = db
                    .archive_block_ids
                    .get(archive_id.to_be_bytes())?
                    .ok_or(BlockStorageError::ArchiveNotFound)?;
                assert_eq!(raw_block_ids.len() % BlockId::SIZE_HINT, 0);

                let mut writer = ArchiveWriter::new(&db, archive_id, chunk_size)?;
                let mut header_buffer = Vec::with_capacity(ARCHIVE_ENTRY_HEADER_LEN);

                // Write archive prefix
                writer.write(&ARCHIVE_PREFIX)?;

                // Write all entries. We group them by type to achieve better compression.
                let mut unique_ids = FastHashSet::default();
                for ty in [
                    ArchiveEntryType::Block,
                    ArchiveEntryType::Proof,
                    ArchiveEntryType::QueueDiff,
                ] {
                    for raw_block_id in raw_block_ids.chunks_exact(BlockId::SIZE_HINT) {
                        // Cooperative cancellation point (checked per entry).
                        anyhow::ensure!(!cancelled.check(), "task aborted");

                        let block_id = BlockId::from_slice(raw_block_id);
                        if !unique_ids.insert(block_id) {
                            tracing::warn!(%block_id, "skipped duplicate block id");
                            continue;
                        }

                        // Check handle flags (only for the first type).
                        if ty == ArchiveEntryType::Block {
                            let handle = block_handle_storage
                                .load_handle(&block_id)
                                .ok_or(BlockStorageError::BlockHandleNotFound)?;

                            let flags = handle.meta().flags();
                            anyhow::ensure!(
                                flags.contains(BlockFlags::HAS_ALL_BLOCK_PARTS),
                                "block does not have all parts: {block_id}, \
                                has_data={}, has_proof={}, queue_diff={}",
                                flags.contains(BlockFlags::HAS_DATA),
                                flags.contains(BlockFlags::HAS_PROOF),
                                flags.contains(BlockFlags::HAS_QUEUE_DIFF)
                            );
                        }

                        let key = PackageEntryKey::from((block_id, ty));
                        let Some(data) = db.package_entries.get(key.to_vec()).unwrap() else {
                            return Err(BlockStorageError::BlockDataNotFound.into());
                        };

                        // Serialize entry header
                        header_buffer.clear();
                        ArchiveEntryHeader {
                            block_id,
                            ty,
                            data_len: data.len() as u32,
                        }
                        .write_to(&mut header_buffer);

                        // Write entry header and data
                        writer.write(&header_buffer)?;
                        writer.write(data.as_ref())?;
                    }

                    // Reset the dedup set between entry types.
                    unique_ids.clear();
                }

                // Drop ids entry just in case (before removing it)
                drop(raw_block_ids);

                // Finalize the archive (also removes the pending id list).
                writer.finalize()?;

                // Done
                scopeguard::ScopeGuard::into_inner(guard);
                tracing::info!(
                    elapsed = %humantime::format_duration(histogram.finish()),
                    "finished"
                );

                Ok(())
            }
        });

        CommitArchiveTask {
            archive_id,
            cancelled,
            handle: Some(handle),
        }
    }
1260
    /// Spawns a blocking task that zstd-compresses block data and stores it
    /// as fixed-size chunks keyed by chunk index.
    ///
    /// The semaphore `permit` is held for the task's lifetime to bound
    /// concurrency. The total compressed size is written under
    /// `BLOCK_DATA_SIZE_MAGIC` after all chunks; its presence is what
    /// `get_block_data_size` reads — presumably it doubles as a completion
    /// marker (TODO confirm against readers).
    #[tracing::instrument(skip(self, data))]
    fn spawn_split_block_data(
        &self,
        block_id: &PartialBlockId,
        data: &[u8],
        permit: OwnedSemaphorePermit,
    ) -> JoinHandle<Result<()>> {
        let db = self.db.clone();
        let chunk_size = self.block_data_chunk_size().get() as usize;

        let span = tracing::Span::current();
        tokio::task::spawn_blocking({
            // Copy inputs so the task is 'static.
            let block_id = *block_id;
            let data = data.to_vec();

            move || {
                let _span = span.enter();

                let _histogram = HistogramGuard::begin("tycho_storage_split_block_data_time");

                // Compress the whole payload first (level 3), then split.
                let mut compressed = Vec::new();
                tycho_util::compression::zstd_compress(&data, &mut compressed, 3);

                let chunks = compressed.chunks(chunk_size);
                for (index, chunk) in chunks.enumerate() {
                    let key = BlockDataEntryKey {
                        block_id,
                        chunk_index: index as u32,
                    };

                    db.block_data_entries.insert(key.to_vec(), chunk)?;
                }

                // Store the total compressed size last (little-endian u32).
                let key = BlockDataEntryKey {
                    block_id,
                    chunk_index: BLOCK_DATA_SIZE_MAGIC,
                };
                db.block_data_entries
                    .insert(key.to_vec(), (compressed.len() as u32).to_le_bytes())?;

                // Release the concurrency permit only after all writes.
                drop(permit);

                Ok(())
            }
        })
    }
1307}
1308
/// Handle of a background task that assembles and commits an archive.
struct CommitArchiveTask {
    // Id of the archive being committed (used in error logs).
    archive_id: u32,
    // Signals the blocking task to stop early; cancelled on drop.
    cancelled: CancellationFlag,
    // `None` once the task has been awaited in `finish`.
    handle: Option<JoinHandle<Result<()>>>,
}
1314
impl CommitArchiveTask {
    /// Waits for the commit task to complete.
    ///
    /// Panics inside the task are re-raised here; any other task error is
    /// logged and swallowed — this method always returns `Ok(())`.
    async fn finish(&mut self) -> Result<()> {
        // NOTE: Await on reference to make sure that the task is cancel safe
        if let Some(handle) = &mut self.handle {
            if let Err(e) = handle
                .await
                .map_err(|e| {
                    // Propagate panics from the blocking task to the caller.
                    if e.is_panic() {
                        std::panic::resume_unwind(e.into_panic());
                    }
                    anyhow::Error::from(e)
                })
                // Flatten JoinHandle's Result<Result<..>> into one Result.
                .and_then(std::convert::identity)
            {
                tracing::error!(
                    archive_id = self.archive_id,
                    "failed to commit archive: {e:?}"
                );
            }

            // Mark the task as consumed so Drop won't abort a finished task.
            self.handle = None;
        }

        Ok(())
    }
}
1341
1342impl Drop for CommitArchiveTask {
1343    fn drop(&mut self) {
1344        self.cancelled.cancel();
1345        if let Some(handle) = &self.handle {
1346            handle.abort();
1347        }
1348    }
1349}
1350
/// Streaming writer that zstd-compresses archive contents and persists them
/// as fixed-size chunks in the `archives` column family.
struct ArchiveWriter<'a> {
    db: &'a CoreDb,
    // Archive id; forms the first 4 bytes of every chunk key.
    archive_id: u32,
    // Target size of a single stored chunk in bytes.
    chunk_len: usize,
    // Total number of compressed bytes written so far.
    total_len: u64,
    // Index of the next chunk to be stored.
    chunk_index: u64,
    // Compressed bytes not yet flushed to the database.
    chunks_buffer: Vec<u8>,
    zstd_compressor: ZstdCompressStream<'a>,
}
1360
1361impl<'a> ArchiveWriter<'a> {
1362    fn new(db: &'a CoreDb, archive_id: u32, chunk_len: u64) -> Result<Self> {
1363        let chunk_len = chunk_len as usize;
1364
1365        let mut zstd_compressor = ZstdCompressStream::new(9, chunk_len)?;
1366
1367        let workers = (std::thread::available_parallelism()?.get() / 4) as u8;
1368        zstd_compressor.multithreaded(workers)?;
1369
1370        Ok(Self {
1371            db,
1372            archive_id,
1373            chunk_len,
1374            total_len: 0,
1375            chunk_index: 0,
1376            chunks_buffer: Vec::with_capacity(chunk_len),
1377            zstd_compressor,
1378        })
1379    }
1380
1381    fn write(&mut self, data: &[u8]) -> Result<()> {
1382        self.zstd_compressor.write(data, &mut self.chunks_buffer)?;
1383        self.flush(false)
1384    }
1385
1386    fn finalize(mut self) -> Result<()> {
1387        self.zstd_compressor.finish(&mut self.chunks_buffer)?;
1388
1389        // Write the last chunk
1390        self.flush(true)?;
1391        debug_assert!(self.chunks_buffer.is_empty());
1392
1393        // Write archive size and remove archive block ids atomically
1394        let archives_cf = self.db.archives.cf();
1395        let block_ids_cf = self.db.archive_block_ids.cf();
1396
1397        let mut batch = rocksdb::WriteBatch::default();
1398
1399        // Write a special entry with the total size of the archive
1400        let mut key = [0u8; tables::Archives::KEY_LEN];
1401        key[..4].copy_from_slice(&self.archive_id.to_be_bytes());
1402        key[4..].copy_from_slice(&ARCHIVE_SIZE_MAGIC.to_be_bytes());
1403        batch.put_cf(&archives_cf, key.as_slice(), self.total_len.to_le_bytes());
1404
1405        // Remove related block ids
1406        batch.delete_cf(&block_ids_cf, self.archive_id.to_be_bytes());
1407
1408        self.db.rocksdb().write(batch)?;
1409        Ok(())
1410    }
1411
1412    fn flush(&mut self, finalize: bool) -> Result<()> {
1413        let buffer_len = self.chunks_buffer.len();
1414        if buffer_len == 0 {
1415            return Ok(());
1416        }
1417
1418        let mut key = [0u8; tables::Archives::KEY_LEN];
1419        key[..4].copy_from_slice(&self.archive_id.to_be_bytes());
1420
1421        let mut do_flush = |data: &[u8]| {
1422            key[4..].copy_from_slice(&self.chunk_index.to_be_bytes());
1423
1424            self.total_len += data.len() as u64;
1425            self.chunk_index += 1;
1426
1427            self.db.archives.insert(key, data)
1428        };
1429
1430        // Write all full chunks
1431        let mut buffer_offset = 0;
1432        while buffer_offset + self.chunk_len <= buffer_len {
1433            do_flush(&self.chunks_buffer[buffer_offset..buffer_offset + self.chunk_len])?;
1434            buffer_offset += self.chunk_len;
1435        }
1436
1437        if finalize {
1438            // Just write the remaining data on finalize
1439            do_flush(&self.chunks_buffer[buffer_offset..])?;
1440            self.chunks_buffer.clear();
1441        } else {
1442            // Shift the remaining data to the beginning of the buffer and clear the rest
1443            let rem = buffer_len % self.chunk_len;
1444            if rem == 0 {
1445                self.chunks_buffer.clear();
1446            } else if buffer_offset > 0 {
1447                // TODO: Use memmove since we are copying non-overlapping regions
1448                self.chunks_buffer.copy_within(buffer_offset.., 0);
1449                self.chunks_buffer.truncate(rem);
1450            }
1451        }
1452
1453        Ok(())
1454    }
1455}
1456
/// Either an already-stored block handle or the metadata to create a new one.
#[derive(Clone)]
pub enum MaybeExistingHandle {
    Existing(BlockHandle),
    New(NewBlockMeta),
}
1462
impl From<BlockHandle> for MaybeExistingHandle {
    fn from(handle: BlockHandle) -> Self {
        Self::Existing(handle)
    }
}

impl From<NewBlockMeta> for MaybeExistingHandle {
    fn from(meta_data: NewBlockMeta) -> Self {
        Self::New(meta_data)
    }
}
1474
/// Outcome of storing a block: its handle plus flags describing whether the
/// handle was updated and/or freshly created.
pub struct StoreBlockResult {
    pub handle: BlockHandle,
    pub updated: bool,
    pub new: bool,
}
1480
/// Result of an archive id lookup for a masterchain seqno.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ArchiveId {
    // An archive covering the requested seqno exists.
    Found(u32),
    // The requested seqno is beyond the latest known archive.
    TooNew,
    // No archive covers the requested seqno.
    NotFound,
}
1487
/// In-memory view of known archive ids.
#[derive(Default)]
struct ArchiveIds {
    // Starting mc seqnos of all known archives, in ascending order.
    items: BTreeSet<u32>,
    // Seqno at which the next forced archive split should happen, if any.
    override_next_id: Option<u32>,
}
1493
/// Scans all known blocks and removes those that are outdated relative to
/// `mc_seqno` and the given shard heights.
///
/// Keeps: seqno-0 blocks, masterchain blocks at or above `mc_seqno`, shard
/// blocks at or above their shard height, and key/persistent blocks.
/// Contiguous removable blocks of a shard are deleted with range deletes.
/// `max_blocks_per_batch` bounds the size of each RocksDB write batch;
/// `cancelled` is polled (debounced) to abort the scan early.
fn remove_blocks(
    db: CoreDb,
    max_blocks_per_batch: Option<usize>,
    mc_seqno: u32,
    shard_heights: ShardHeights,
    cancelled: Option<&CancellationFlag>,
) -> Result<BlockGcStats> {
    let mut stats = BlockGcStats::default();

    let raw = db.rocksdb().as_ref();
    let full_block_ids_cf = db.full_block_ids.cf();
    let block_connections_cf = db.block_connections.cf();
    let package_entries_cf = db.package_entries.cf();
    let block_data_entries_cf = db.block_data_entries.cf();
    let block_handles_cf = db.block_handles.cf();

    // Create batch
    let mut batch = rocksdb::WriteBatch::default();
    let mut batch_len = 0;

    // Iterate all entries and find expired items
    let mut blocks_iter =
        raw.raw_iterator_cf_opt(&full_block_ids_cf, db.full_block_ids.new_read_config());
    blocks_iter.seek_to_first();

    // Checks the handle meta flags to see whether a block is a key block or
    // persistent (such blocks are never removed).
    let block_handles_readopts = db.block_handles.new_read_config();
    let is_persistent = |root_hash: &[u8; 32]| -> Result<bool> {
        // Flags live in the upper 32 bits of the first little-endian u64.
        const FLAGS: u64 =
            ((BlockFlags::IS_KEY_BLOCK.bits() | BlockFlags::IS_PERSISTENT.bits()) as u64) << 32;

        let Some(value) =
            raw.get_pinned_cf_opt(&block_handles_cf, root_hash, &block_handles_readopts)?
        else {
            return Ok(false);
        };
        Ok(value.as_ref().get_u64_le() & FLAGS != 0)
    };

    let mut key_buffer = [0u8; tables::PackageEntries::KEY_LEN];
    let mut delete_range =
        |batch: &mut rocksdb::WriteBatch, from: &BlockIdShort, to: &BlockIdShort| {
            debug_assert_eq!(from.shard, to.shard);
            debug_assert!(from.seqno <= to.seqno);

            let range_from = &mut key_buffer;
            range_from[..4].copy_from_slice(&from.shard.workchain().to_be_bytes());
            range_from[4..12].copy_from_slice(&from.shard.prefix().to_be_bytes());
            range_from[12..16].copy_from_slice(&from.seqno.to_be_bytes());

            let mut range_to = *range_from;
            range_to[12..16].copy_from_slice(&to.seqno.saturating_add(1).to_be_bytes());

            // At this point we have two keys:
            // [workchain, shard, from_seqno, 0...]
            // [workchain, shard, to_seqno + 1, 0...]
            //
            // Since the upper bound is exclusive, this deletes all entries
            // with seqnos in the inclusive range [from_seqno, to_seqno]
            // for this shard.
            // Note that package entry keys are the same as block connection keys.
            batch.delete_range_cf(&full_block_ids_cf, &*range_from, &range_to);
            batch.delete_range_cf(&package_entries_cf, &*range_from, &range_to);
            batch.delete_range_cf(&block_data_entries_cf, &*range_from, &range_to);
            batch.delete_range_cf(&block_connections_cf, &*range_from, &range_to);

            tracing::debug!(%from, %to, "delete_range");
        };

    // Debounce reduces atomic reads on the cancellation flag.
    let mut cancelled = cancelled.map(|c| c.debounce(100));
    // Current contiguous run of removable blocks within one shard.
    let mut current_range = None::<(BlockIdShort, BlockIdShort)>;
    loop {
        let key = match blocks_iter.key() {
            Some(key) => key,
            // Propagate any iterator error once the end is reached.
            None => break blocks_iter.status()?,
        };

        if let Some(cancelled) = &mut cancelled {
            if cancelled.check() {
                anyhow::bail!("blocks GC cancelled");
            }
        }

        // Key structure:
        // [workchain id, 4 bytes]  |
        // [shard id, 8 bytes]      | BlockIdShort
        // [seqno, 4 bytes]         |
        // [root hash, 32 bytes] <-
        // ..
        let block_id = BlockIdShort::from_slice(key);
        let root_hash: &[u8; 32] = key[16..48].try_into().unwrap();
        let is_masterchain = block_id.shard.is_masterchain();

        // Don't gc latest blocks, key blocks or persistent blocks
        if block_id.seqno == 0
            || is_masterchain && block_id.seqno >= mc_seqno
            || !is_masterchain
                && shard_heights.contains_shard_seqno(&block_id.shard, block_id.seqno)
            || is_persistent(root_hash)?
        {
            // A kept block ends the current removable run; flush it.
            if let Some((from, to)) = current_range.take() {
                delete_range(&mut batch, &from, &to);
                batch_len += 1; // Ensure that we flush the batch
            }
            blocks_iter.next();
            continue;
        }

        match &mut current_range {
            // Delete the previous range and start a new one
            Some((from, to)) if from.shard != block_id.shard => {
                delete_range(&mut batch, from, to);
                *from = block_id;
                *to = block_id;
            }
            // Update the current range
            Some((_, to)) => *to = block_id,
            // Start a new range
            None => current_range = Some((block_id, block_id)),
        }

        // Count entry
        stats.total_blocks_removed += 1;
        if is_masterchain {
            stats.mc_blocks_removed += 1;
        }

        // Block handles are keyed by root hash, not by the short id.
        batch.delete_cf(&block_handles_cf, root_hash);

        batch_len += 1;
        if matches!(
            max_blocks_per_batch,
            Some(max_blocks_per_batch) if batch_len >= max_blocks_per_batch
        ) {
            tracing::info!(
                total_blocks_removed = stats.total_blocks_removed,
                "applying intermediate batch",
            );
            let batch = std::mem::take(&mut batch);
            raw.write(batch)?;
            batch_len = 0;
        }

        blocks_iter.next();
    }

    // Flush the trailing removable run, if any.
    if let Some((from, to)) = current_range.take() {
        delete_range(&mut batch, &from, &to);
        batch_len += 1; // Ensure that we flush the batch
    }

    if batch_len > 0 {
        tracing::info!("applying final batch");
        raw.write(batch)?;
    }

    // Done
    Ok(stats)
}
1651
/// Configuration of [`BlockStorage`].
pub struct BlockStorageConfig {
    // Size of a single stored archive chunk.
    pub archive_chunk_size: ByteSize,
    // Configuration of the in-memory blocks cache.
    pub blocks_cache: BlocksCacheConfig,
    // Maximum number of concurrent block-data split tasks.
    pub split_block_tasks: usize,
}
1657
/// Counters returned by a blocks GC run (see `remove_blocks`).
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub struct BlockGcStats {
    // Number of removed masterchain blocks (included in the total).
    pub mc_blocks_removed: usize,
    // Total number of removed blocks across all shards.
    pub total_blocks_removed: usize,
}
1663
// Package entry data that keeps the corresponding block data read lock held
// for as long as the bytes are borrowed.
struct FullBlockDataGuard<'a> {
    _lock: BlockDataGuard<'a>,
    data: rocksdb::DBPinnableSlice<'a>,
}

impl AsRef<[u8]> for FullBlockDataGuard<'_> {
    fn as_ref(&self) -> &[u8] {
        self.data.as_ref()
    }
}
1674
1675fn extract_entry_type(key: &[u8]) -> Option<ArchiveEntryType> {
1676    key.get(48).copied().and_then(ArchiveEntryType::from_byte)
1677}
1678
// Number of masterchain blocks per archive package.
const ARCHIVE_PACKAGE_SIZE: u32 = 100;
// Reserved key in which the archive size is stored
const ARCHIVE_SIZE_MAGIC: u64 = u64::MAX;
// Reserved key in which we store the fact that the archive must be committed
const ARCHIVE_TO_COMMIT_MAGIC: u64 = u64::MAX - 1;
// Reserved key in which we store the next archive id to override.
const ARCHIVE_OVERRIDE_NEXT_MAGIC: u64 = u64::MAX - 2;
// Reserved key in which we store the fact that archive was started
const ARCHIVE_STARTED_MAGIC: u64 = u64::MAX - 3;

// Lower bound of the reserved key space: the top 256 ids
// (`u64::MAX - 255 ..= u64::MAX`) are magic keys, never real archive ids.
const ARCHIVE_MAGIC_MIN: u64 = u64::MAX & !0xff;

const BLOCK_DATA_CHUNK_SIZE: u32 = 1024 * 1024; // 1MB

// Reserved key in which the compressed block size is stored
const BLOCK_DATA_SIZE_MAGIC: u32 = u32::MAX;
// Reserved key in which we store the fact that compressed block was started
// NOTE(review): `u32::MAX - 1` is intentionally (?) skipped here, unlike the
// contiguous archive magics above — confirm before reusing that slot.
const BLOCK_DATA_STARTED_MAGIC: u32 = u32::MAX - 2;
1697
/// Result of resolving which archive an incoming block belongs to.
#[derive(Default)]
struct PreparedArchiveId {
    // Id of the archive the block should be written into.
    id: u32,
    // Whether this id starts a brand-new archive (vs appending to an existing one).
    is_new: bool,
    // If set, the next archive id to use instead of the computed one
    // (see `ARCHIVE_OVERRIDE_NEXT_MAGIC`).
    override_next_id: Option<u32>,
    // If set, id of a previous archive that is full and must be committed
    // (see `ARCHIVE_TO_COMMIT_MAGIC`).
    to_commit: Option<u32>,
}
1705
// Broadcast sender used to notify subscribers about archive ids.
type ArchiveIdsTx = broadcast::Sender<u32>;
// Concurrent in-memory cache of recently used blocks, keyed by block id.
type BlocksCache = moka::sync::Cache<BlockId, BlockStuff, FastHasherState>;
1708
/// Errors produced by [`BlockStorage`] operations.
#[derive(thiserror::Error, Debug)]
enum BlockStorageError {
    #[error("Archive not found")]
    ArchiveNotFound,
    #[error("Block data not found")]
    BlockDataNotFound,
    #[error("Block proof not found")]
    BlockProofNotFound,
    #[error("Queue diff not found")]
    QueueDiffNotFound,
    // Returned when a handle's id doesn't match the id of the stored entry.
    #[error("Block handle id mismatch")]
    BlockHandleIdMismatch,
    #[error("Block handle not found")]
    BlockHandleNotFound,
    #[error("Package entry not found")]
    PackageEntryNotFound,
    // Returned when a requested offset lies outside of an archive slice.
    #[error("Offset is outside of the archive slice")]
    InvalidOffset,
}
1728
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use tycho_block_util::archive::WithArchiveData;
    use tycho_storage::StorageContext;
    use tycho_util::FastHashMap;
    use tycho_util::futures::JoinTask;

    use super::*;
    use crate::storage::{BlockConnection, CoreStorage, CoreStorageConfig};

    /// Stores block data, proof and queue diff for the same block from several
    /// concurrent tasks (with randomized yields to shuffle interleavings) and
    /// checks that all paths converge to the same handle with all block parts.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn parallel_store_data() -> Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let shard = ShardIdent::MASTERCHAIN;
        for seqno in 0..1000 {
            // Block data with its serialized archive representation.
            let block = BlockStuff::new_empty(shard, seqno);
            let block = {
                let data = BocRepr::encode_rayon(block.as_ref()).unwrap();
                WithArchiveData::new(block, data)
            };
            let block_id = block.id();

            let proof = BlockProofStuff::new_empty(block_id);
            let proof = {
                let data = BocRepr::encode_rayon(proof.as_ref()).unwrap();
                WithArchiveData::new(proof, data)
            };

            let queue_diff = QueueDiffStuff::builder(shard, seqno, &HashBytes::ZERO)
                .serialize()
                .build(block_id);

            let block_meta = NewBlockMeta {
                is_key_block: shard.is_masterchain() && seqno == 0,
                gen_utime: 0,
                ref_by_mc_seqno: seqno,
            };

            // Task storing only the block data.
            let store_block_data = || {
                let storage = storage.clone();
                JoinTask::new(async move {
                    let res = storage
                        .block_storage()
                        .store_block_data(&block, &block.archive_data, block_meta)
                        .await?;

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            // Task storing the proof and then the queue diff, yielding randomly
            // between steps to exercise different interleavings.
            let store_proof_and_queue = || {
                let storage = storage.clone();
                let proof = proof.clone();
                let queue_diff = queue_diff.clone();
                JoinTask::new(async move {
                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_block_proof(&proof, MaybeExistingHandle::New(block_meta))
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_queue_diff(&queue_diff, res.handle.into())
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            // Run the data task concurrently with a race of two proof+queue tasks.
            let (data_res, proof_and_queue_res) = async move {
                let data_fut = pin!(store_block_data());
                let proof_and_queue_fut = pin!(async {
                    tokio::select! {
                        left = store_proof_and_queue() => left,
                        right = store_proof_and_queue() => right,
                    }
                });

                let (data, other) = futures_util::future::join(data_fut, proof_and_queue_fut).await;

                Ok::<_, anyhow::Error>((data?, other?))
            }
            .await?;

            // Both paths must have resolved to the very same handle allocation.
            assert!(std::ptr::addr_eq(
                arc_swap::RefCnt::as_ptr(&data_res),
                arc_swap::RefCnt::as_ptr(&proof_and_queue_res)
            ));
            assert!(data_res.has_all_block_parts());
        }

        Ok(())
    }

    /// Populates two shards with blocks (entries + connections + handles),
    /// then runs `remove_blocks` with different bounds and verifies both the
    /// returned stats and that exactly the expected seqno ranges were erased.
    #[tokio::test]
    async fn blocks_gc() -> Result<()> {
        const GARBAGE: &[u8] = b"garbage";
        const ENTRY_TYPES: [ArchiveEntryType; 3] = [
            ArchiveEntryType::Block,
            ArchiveEntryType::Proof,
            ArchiveEntryType::QueueDiff,
        ];
        const CONNECTION_TYPES: [BlockConnection; 2] =
            [BlockConnection::Prev1, BlockConnection::Next1];

        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let blocks = storage.block_storage();
        let block_handles = storage.block_handle_storage();
        let block_connections = storage.block_connection_storage();

        let mut shard_block_ids = FastHashMap::<ShardIdent, Vec<BlockId>>::default();

        // Seed 100 blocks per shard with package entries and connections.
        for shard in [ShardIdent::MASTERCHAIN, ShardIdent::BASECHAIN] {
            let entry = shard_block_ids.entry(shard).or_default();

            for seqno in 0..100 {
                let block_id = BlockId {
                    shard,
                    seqno,
                    root_hash: HashBytes(rand::random()),
                    file_hash: HashBytes(rand::random()),
                };
                entry.push(block_id);

                let (handle, _) = block_handles.create_or_load_handle(&block_id, NewBlockMeta {
                    is_key_block: shard.is_masterchain() && seqno == 0,
                    gen_utime: 0,
                    ref_by_mc_seqno: seqno,
                });

                for ty in ENTRY_TYPES {
                    blocks.add_data(&(block_id, ty).into(), GARBAGE)?;
                }
                for direction in CONNECTION_TYPES {
                    block_connections.store_connection(&handle, direction, &block_id);
                }

                handle.meta().add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
                block_handles.store_handle(&handle, false);
            }
        }

        // Remove some blocks
        // Seqno 0 blocks are key blocks and must survive, hence 69/49 removed.
        let stats = remove_blocks(
            blocks.db.clone(),
            None,
            70,
            [(ShardIdent::BASECHAIN, 50)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 69,
            total_blocks_removed: 69 + 49,
        });

        let removed_ranges = FastHashMap::from_iter([
            (ShardIdent::MASTERCHAIN, vec![1..=69]),
            (ShardIdent::BASECHAIN, vec![1..=49]),
        ]);
        // Every block inside a removed range must be fully gone (handle, package
        // entries, connections); everything else must still be present.
        for (shard, block_ids) in shard_block_ids {
            let removed_ranges = removed_ranges.get(&shard).unwrap();

            for block_id in block_ids {
                let must_be_removed = 'removed: {
                    for range in removed_ranges {
                        if range.contains(&block_id.seqno) {
                            break 'removed true;
                        }
                    }
                    false
                };

                let handle = block_handles.load_handle(&block_id);
                assert_eq!(handle.is_none(), must_be_removed);

                for ty in ENTRY_TYPES {
                    let key = PackageEntryKey::from((block_id, ty));
                    let stored = blocks.db.package_entries.get(key.to_vec())?;
                    assert_eq!(stored.is_none(), must_be_removed);
                }

                for direction in CONNECTION_TYPES {
                    let connection = block_connections.load_connection(&block_id, direction);
                    assert_eq!(connection.is_none(), must_be_removed);
                }
            }
        }

        // Remove single block
        let stats = remove_blocks(
            blocks.db.clone(),
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 1,
            total_blocks_removed: 2,
        });

        // Remove no blocks
        // Re-running with the same bounds must be a no-op (idempotence).
        let stats = remove_blocks(
            blocks.db.clone(),
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 0,
            total_blocks_removed: 0,
        });

        Ok(())
    }
}