1use std::collections::BTreeSet;
2use std::num::NonZeroU32;
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::{Context, Result};
7use bytes::Buf;
8use bytesize::ByteSize;
9use parking_lot::RwLock;
10use tl_proto::TlWrite;
11use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast};
12use tokio::task::JoinHandle;
13use tycho_block_util::archive::{
14 ARCHIVE_ENTRY_HEADER_LEN, ARCHIVE_PREFIX, ArchiveData, ArchiveEntryHeader, ArchiveEntryType,
15};
16use tycho_block_util::block::{
17 BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug, ShardHeights,
18};
19use tycho_block_util::queue::{QueueDiffStuff, QueueDiffStuffAug};
20use tycho_storage::kv::StoredValue;
21use tycho_types::boc::{Boc, BocRepr};
22use tycho_types::cell::HashBytes;
23use tycho_types::models::*;
24use tycho_util::compression::ZstdCompressStream;
25use tycho_util::metrics::HistogramGuard;
26use tycho_util::sync::{CancellationFlag, rayon_run};
27use tycho_util::{FastHashSet, FastHasherState};
28use weedb::{ColumnFamily, OwnedPinnableSlice, rocksdb};
29
30pub use self::package_entry::{BlockDataEntryKey, PackageEntryKey, PartialBlockId};
31use super::util::SlotSubscriptions;
32use super::{
33 BlockConnectionStorage, BlockDataGuard, BlockFlags, BlockHandle, BlockHandleStorage,
34 BlocksCacheConfig, CoreDb, HandleCreationStatus, NewBlockMeta, tables,
35};
36
37mod package_entry;
38
// Counter of all block-data loads (incremented on every `load_block_data` call).
const METRIC_LOAD_BLOCK_TOTAL: &str = "tycho_storage_load_block_total";
// Counter of block-data loads served from the in-memory blocks cache.
const METRIC_BLOCK_CACHE_HIT_TOTAL: &str = "tycho_storage_block_cache_hit_total";
41
/// Storage for block data, proofs and queue diffs, plus archive bookkeeping.
pub struct BlockStorage {
    db: CoreDb,
    // In-memory cache of recently stored blocks (moka cache, weighted by data size).
    blocks_cache: BlocksCache,
    block_handle_storage: Arc<BlockHandleStorage>,
    block_connection_storage: Arc<BlockConnectionStorage>,
    // Known archive ids plus an optional forced "next archive" id.
    archive_ids: RwLock<ArchiveIds>,
    // Waiters for blocks that have not been stored yet (notified by `store_block_data`).
    block_subscriptions: SlotSubscriptions<BlockId, BlockStuff>,
    // Writer side is taken by `wait_for_block`, reader side by `store_block_data`,
    // so a store cannot race with the check-then-subscribe sequence.
    store_block_data: tokio::sync::RwLock<()>,
    // Previously spawned archive commit task; awaited before starting a new one.
    prev_archive_commit: tokio::sync::Mutex<Option<CommitArchiveTask>>,
    // Broadcasts ids of fully committed archives.
    archive_ids_tx: ArchiveIdsTx,
    archive_chunk_size: NonZeroU32,
    // Bounds the number of concurrent background block-splitting tasks.
    split_block_semaphore: Arc<Semaphore>,
}
55
56impl BlockStorage {
57 pub fn new(
60 db: CoreDb,
61 config: BlockStorageConfig,
62 block_handle_storage: Arc<BlockHandleStorage>,
63 block_connection_storage: Arc<BlockConnectionStorage>,
64 archive_chunk_size: ByteSize,
65 ) -> Self {
66 fn weigher(_key: &BlockId, value: &BlockStuff) -> u32 {
67 const BLOCK_STUFF_OVERHEAD: u32 = 1024; std::mem::size_of::<BlockId>() as u32
70 + BLOCK_STUFF_OVERHEAD
71 + value.data_size().try_into().unwrap_or(u32::MAX)
72 }
73
74 let blocks_cache = moka::sync::Cache::builder()
75 .time_to_live(config.blocks_cache.ttl)
76 .max_capacity(config.blocks_cache.size.as_u64())
77 .weigher(weigher)
78 .build_with_hasher(Default::default());
79
80 let (archive_ids_tx, _) = broadcast::channel(4);
81
82 let archive_chunk_size =
83 NonZeroU32::new(archive_chunk_size.as_u64().clamp(1, u32::MAX as _) as _).unwrap();
84
85 let split_block_semaphore = Arc::new(Semaphore::new(config.split_block_tasks));
86
87 Self {
88 db,
89 blocks_cache,
90 block_handle_storage,
91 block_connection_storage,
92 archive_ids_tx,
93 archive_chunk_size,
94 split_block_semaphore,
95 archive_ids: Default::default(),
96 block_subscriptions: Default::default(),
97 store_block_data: Default::default(),
98 prev_archive_commit: Default::default(),
99 }
100 }
101
    /// Size in bytes of one archive chunk (clamped to the `u32` range in `new`).
    pub fn archive_chunk_size(&self) -> NonZeroU32 {
        self.archive_chunk_size
    }
107
    /// Fixed chunk size used when splitting compressed block data.
    pub fn block_data_chunk_size(&self) -> NonZeroU32 {
        // Panics only if `BLOCK_DATA_CHUNK_SIZE` were zero.
        NonZeroU32::new(BLOCK_DATA_CHUNK_SIZE).unwrap()
    }
111
    /// Finds blocks whose background split into chunks was interrupted and
    /// re-runs the split for them.
    ///
    /// A block is "unfinished" when its STARTED marker is present in
    /// `block_data_entries` without a matching SIZE marker.
    pub async fn finish_block_data(&self) -> Result<()> {
        let started_at = Instant::now();

        tracing::info!("started finishing compressed block data");

        let mut iter = self.db.block_data_entries.raw_iterator();
        iter.seek_to_first();

        // Block ids with a STARTED marker not yet closed by a SIZE marker.
        let mut blocks_to_finish = Vec::new();

        loop {
            let Some(key) = iter.key() else {
                if let Err(e) = iter.status() {
                    tracing::error!("failed to iterate through compressed block data: {e:?}");
                }
                break;
            };

            let key = BlockDataEntryKey::from_slice(key);

            // Key order must visit STARTED before SIZE for the same block.
            const _: () = const {
                assert!(BLOCK_DATA_STARTED_MAGIC < BLOCK_DATA_SIZE_MAGIC);
            };

            match key.chunk_index {
                BLOCK_DATA_STARTED_MAGIC => {
                    blocks_to_finish.push(key.block_id);
                }
                BLOCK_DATA_SIZE_MAGIC => {
                    // A SIZE marker closes the most recent STARTED marker.
                    let last = blocks_to_finish.pop();
                    anyhow::ensure!(last == Some(key.block_id), "invalid block data SIZE entry");
                }
                // Plain data chunks are irrelevant here.
                _ => {}
            }

            iter.next();
        }

        drop(iter);

        for block_id in blocks_to_finish {
            tracing::info!(?block_id, "found unfinished block");

            let key = PackageEntryKey {
                block_id,
                ty: ArchiveEntryType::Block,
            };

            // Re-split from the full entry stored in `package_entries`.
            let data = match self.db.package_entries.get(key.to_vec())? {
                Some(data) => data,
                None => return Err(BlockStorageError::BlockDataNotFound.into()),
            };

            // Bound concurrency and wait for this split to complete.
            let permit = self.split_block_semaphore.clone().acquire_owned().await?;
            self.spawn_split_block_data(&block_id, &data, permit)
                .await??;
        }

        tracing::info!(
            elapsed = %humantime::format_duration(started_at.elapsed()),
            "finished handling unfinished blocks"
        );

        Ok(())
    }
182
    /// Rebuilds in-memory archive state from the `archives` column family.
    ///
    /// Restores the set of known archive ids and a pending forced next id,
    /// and re-runs the commit of any archive whose TO_COMMIT marker was not
    /// followed by a SIZE marker (an interrupted commit).
    pub async fn preload_archive_ids(&self) -> Result<()> {
        let started_at = Instant::now();

        tracing::info!("started preloading archive ids");

        let db = self.db.clone();

        // The full CF scan runs on the blocking thread pool.
        let (archive_ids, override_next_id, to_commit) = tokio::task::spawn_blocking(move || {
            let mut iter = db.archives.raw_iterator();
            iter.seek_to_first();

            let mut archive_ids = BTreeSet::new();
            let mut archives_to_commit = Vec::new();
            let mut override_next_id = None;
            loop {
                let Some((key, value)) = iter.item() else {
                    if let Err(e) = iter.status() {
                        tracing::error!("failed to iterate through archives: {e:?}");
                    }
                    break;
                };

                // Key layout: 4 bytes BE archive id, then 8 bytes BE chunk index.
                let archive_id = u32::from_be_bytes(key[..4].try_into().unwrap());
                let chunk_index = u64::from_be_bytes(key[4..].try_into().unwrap());

                // The magic markers must sort in this order within one archive.
                const _: () = const {
                    assert!(ARCHIVE_STARTED_MAGIC < ARCHIVE_OVERRIDE_NEXT_MAGIC);
                    assert!(ARCHIVE_OVERRIDE_NEXT_MAGIC < ARCHIVE_TO_COMMIT_MAGIC);
                    assert!(ARCHIVE_TO_COMMIT_MAGIC < ARCHIVE_SIZE_MAGIC);
                };

                let mut skip = None;

                // A forced next id only stays relevant until an archive past it appears.
                if let Some(next_id) = override_next_id {
                    if archive_id > next_id {
                        override_next_id = None;
                    }
                }

                match chunk_index {
                    ARCHIVE_STARTED_MAGIC => {
                        archive_ids.insert(archive_id);
                    }
                    ARCHIVE_OVERRIDE_NEXT_MAGIC => {
                        // Value holds the forced next archive id as LE u32.
                        override_next_id = Some(u32::from_le_bytes(value[..4].try_into().unwrap()));
                    }
                    ARCHIVE_TO_COMMIT_MAGIC => {
                        anyhow::ensure!(
                            archive_ids.contains(&archive_id),
                            "invalid archive TO_COMMIT entry"
                        );
                        archives_to_commit.push(archive_id);
                    }
                    ARCHIVE_SIZE_MAGIC => {
                        // SIZE closes the matching TO_COMMIT; anything left over
                        // means a commit was skipped.
                        let last = archives_to_commit.pop();
                        anyhow::ensure!(last == Some(archive_id), "invalid archive SIZE entry");

                        anyhow::ensure!(archives_to_commit.is_empty(), "skipped archive commit");
                    }
                    _ => {
                        if chunk_index < ARCHIVE_STARTED_MAGIC {
                            // Plain data chunk: seek directly to this archive's
                            // STARTED marker instead of walking every chunk.
                            let mut next_key = [0; tables::Archives::KEY_LEN];
                            next_key[..4].copy_from_slice(&archive_id.to_be_bytes());
                            next_key[4..].copy_from_slice(&ARCHIVE_STARTED_MAGIC.to_be_bytes());
                            skip = Some(next_key);
                        }
                    }
                }

                match skip {
                    None => iter.next(),
                    Some(key) => iter.seek(key),
                }
            }

            Ok::<_, anyhow::Error>((archive_ids, override_next_id, archives_to_commit))
        })
        .await??;

        {
            let mut ids = self.archive_ids.write();
            ids.items.extend(archive_ids);
            ids.override_next_id = override_next_id;
        }

        tracing::info!(
            elapsed = %humantime::format_duration(started_at.elapsed()),
            ?override_next_id,
            "finished preloading archive ids"
        );

        // Redo interrupted commits from scratch: wipe, rewrite, then announce.
        for archive_id in to_commit {
            tracing::info!(archive_id, "clear partially committed archive");
            self.clear_archive(archive_id)?;

            tracing::info!(archive_id, "rewrite partially committed archive");
            let mut task = self.spawn_commit_archive(archive_id);
            task.finish().await?;

            self.archive_ids_tx.send(task.archive_id).ok();
        }

        Ok(())
    }
301
    /// Returns the block data, waiting for the block to be stored if needed.
    ///
    /// The `store_block_data` write lock is held across the handle check and
    /// the subscription so a concurrent store cannot slip in between.
    pub async fn wait_for_block(&self, block_id: &BlockId) -> Result<BlockStuffAug> {
        let block_handle_storage = &self.block_handle_storage;

        // Writer side excludes all concurrent `store_block_data` (readers).
        let guard = self.store_block_data.write().await;

        // Fast path: block is already stored.
        if let Some(handle) = block_handle_storage.load_handle(block_id) {
            if handle.has_data() {
                drop(guard);
                let block = self.load_block_data(&handle).await?;
                return Ok(BlockStuffAug::loaded(block));
            }
        }

        // Subscribe while still holding the guard; `store_block_data` notifies
        // this slot once the block arrives.
        let rx = self.block_subscriptions.subscribe(block_id);

        // Release the lock before awaiting the notification.
        drop(guard);

        let block = rx.await;
        Ok(BlockStuffAug::loaded(block))
    }
328
329 pub async fn wait_for_next_block(&self, prev_block_id: &BlockId) -> Result<BlockStuffAug> {
330 let block_id = self
331 .block_connection_storage
332 .wait_for_next1(prev_block_id)
333 .await;
334
335 self.wait_for_block(&block_id).await
336 }
337
    /// Stores block data (if not already present), notifies waiters and
    /// caches the block.
    ///
    /// Returns the handle along with whether it was updated and whether it
    /// was newly created.
    pub async fn store_block_data(
        &self,
        block: &BlockStuff,
        archive_data: &ArchiveData,
        meta_data: NewBlockMeta,
    ) -> Result<StoreBlockResult> {
        // Reader side: multiple stores may run concurrently, but
        // `wait_for_block` (writer) is excluded while a store is in flight.
        let guard = self.store_block_data.read().await;

        let block_id = block.id();
        let (handle, status) = self
            .block_handle_storage
            .create_or_load_handle(block_id, meta_data);

        let archive_id = PackageEntryKey::block(block_id);
        let mut updated = false;
        if !handle.has_data() {
            let data = archive_data.as_new_archive_data()?;
            metrics::histogram!("tycho_storage_store_block_data_size").record(data.len() as f64);

            // Double-checked under the per-handle data lock to avoid a
            // duplicate write from a racing store.
            let _lock = handle.block_data_lock().write().await;
            if !handle.has_data() {
                self.add_block_data_and_split(&archive_id, data).await?;
                if handle.meta().add_flags(BlockFlags::HAS_DATA) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        // Wake `wait_for_block` subscribers before releasing the guard.
        self.block_subscriptions.notify(block_id, block);

        drop(guard);

        self.blocks_cache.insert(*block_id, block.clone());

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
386
387 pub async fn load_block_data(&self, handle: &BlockHandle) -> Result<BlockStuff> {
388 metrics::counter!(METRIC_LOAD_BLOCK_TOTAL).increment(1);
389
390 const BIG_DATA_THRESHOLD: usize = 1 << 20; let _histogram = HistogramGuard::begin("tycho_storage_load_block_data_time");
393
394 if !handle.has_data() {
395 return Err(BlockStorageError::BlockDataNotFound.into());
396 }
397
398 if let Some(block) = self.blocks_cache.get(handle.id()) {
400 metrics::counter!(METRIC_BLOCK_CACHE_HIT_TOTAL).increment(1);
401 return Ok(block.clone());
402 }
403
404 let FullBlockDataGuard { _lock, data } = self
405 .get_data_ref(handle, &PackageEntryKey::block(handle.id()))
406 .await?;
407
408 if data.len() > BIG_DATA_THRESHOLD {
409 BlockStuff::deserialize(handle.id(), data.as_ref())
410 } else {
411 let handle = handle.clone();
412
413 let owned_data =
415 unsafe { weedb::OwnedPinnableSlice::new(self.db.rocksdb().clone(), data) };
416 rayon_run(move || BlockStuff::deserialize(handle.id(), owned_data.as_ref())).await
417 }
418 }
419
420 pub async fn load_block_data_raw(&self, handle: &BlockHandle) -> Result<OwnedPinnableSlice> {
421 if !handle.has_data() {
422 return Err(BlockStorageError::BlockDataNotFound.into());
423 }
424 self.get_data(handle, &PackageEntryKey::block(handle.id()))
425 .await
426 }
427
    /// Lists block ids found in `package_entries`, paginated.
    ///
    /// Returns up to `LIMIT` blocks (or up to `MAX_BYTES` of scanned data)
    /// plus a continuation token to resume from; `None` means the listing is
    /// complete.
    pub async fn list_blocks(
        &self,
        continuation: Option<BlockIdShort>,
    ) -> Result<(Vec<BlockId>, Option<BlockIdShort>)> {
        // Zeroed hashes give the lowest key for (shard, seqno), so iteration
        // resumes at the continuation block itself.
        const LIMIT: usize = 1000; const MAX_BYTES: usize = 1 << 20; let continuation = continuation.map(|block_id| {
            PackageEntryKey::block(&BlockId {
                shard: block_id.shard,
                seqno: block_id.seqno,
                root_hash: HashBytes::ZERO,
                file_hash: HashBytes::ZERO,
            })
            .to_vec()
        });

        let mut iter = {
            let mut readopts = rocksdb::ReadOptions::default();
            tables::PackageEntries::read_options(&mut readopts);
            if let Some(key) = &continuation {
                readopts.set_iterate_lower_bound(key.as_slice());
            }
            self.db
                .rocksdb()
                .raw_iterator_cf_opt(&self.db.package_entries.cf(), readopts)
        };

        match continuation {
            None => iter.seek_to_first(),
            Some(key) => iter.seek(key),
        }

        let mut bytes = 0;
        let mut blocks = Vec::new();

        let continuation = loop {
            let (key, value) = match iter.item() {
                Some(item) => item,
                None => {
                    iter.status()?;
                    break None;
                }
            };

            // Only Block entries are listed; proofs/queue diffs are skipped.
            let id = PackageEntryKey::from_slice(key);
            if id.ty != ArchiveEntryType::Block {
                iter.next();
                continue;
            }

            // Stop before processing this block; it becomes the continuation.
            if blocks.len() >= LIMIT || bytes >= MAX_BYTES {
                break Some(id.block_id.as_short_id());
            }

            // The file hash is recomputed from the stored BOC bytes.
            let file_hash = Boc::file_hash_blake(value);
            let block_id = id.block_id.make_full(file_hash);

            bytes += value.len();
            blocks.push(block_id);

            iter.next();
        };

        Ok((blocks, continuation))
    }
496
497 pub fn list_archive_ids(&self) -> Vec<u32> {
498 self.archive_ids.read().items.iter().cloned().collect()
499 }
500
501 pub async fn load_block_data_raw_ref<'a>(
502 &'a self,
503 handle: &'a BlockHandle,
504 ) -> Result<impl AsRef<[u8]> + 'a> {
505 if !handle.has_data() {
506 return Err(BlockStorageError::BlockDataNotFound.into());
507 }
508 self.get_data_ref(handle, &PackageEntryKey::block(handle.id()))
509 .await
510 }
511
512 pub fn find_mc_block_data(&self, mc_seqno: u32) -> Result<Option<Block>> {
513 let package_entries = &self.db.package_entries;
514
515 let bound = BlockId {
516 shard: ShardIdent::MASTERCHAIN,
517 seqno: mc_seqno,
518 root_hash: HashBytes::ZERO,
519 file_hash: HashBytes::ZERO,
520 };
521
522 let mut bound = PackageEntryKey::block(&bound);
523
524 let mut readopts = package_entries.new_read_config();
525 readopts.set_iterate_lower_bound(bound.to_vec().into_vec());
526 bound.block_id.seqno += 1;
527 readopts.set_iterate_upper_bound(bound.to_vec().into_vec());
528
529 let mut iter = self
530 .db
531 .rocksdb()
532 .raw_iterator_cf_opt(&package_entries.cf(), readopts);
533
534 iter.seek_to_first();
535 loop {
536 let Some((key, value)) = iter.item() else {
537 iter.status()?;
538 return Ok(None);
539 };
540
541 let Some(ArchiveEntryType::Block) = extract_entry_type(key) else {
542 continue;
543 };
544
545 return Ok(Some(BocRepr::decode::<Block, _>(value)?));
546 }
547 }
548
    /// Stores a block proof (if not already stored) under an existing or
    /// newly created handle.
    ///
    /// # Errors
    /// Returns `BlockHandleIdMismatch` if an existing handle is passed for a
    /// different block.
    pub async fn store_block_proof(
        &self,
        proof: &BlockProofStuffAug,
        handle: MaybeExistingHandle,
    ) -> Result<StoreBlockResult> {
        let block_id = proof.id();
        if matches!(&handle, MaybeExistingHandle::Existing(handle) if handle.id() != block_id) {
            return Err(BlockStorageError::BlockHandleIdMismatch.into());
        }

        let (handle, status) = match handle {
            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
            MaybeExistingHandle::New(meta_data) => self
                .block_handle_storage
                .create_or_load_handle(block_id, meta_data),
        };

        let mut updated = false;
        let archive_id = PackageEntryKey::proof(block_id);
        if !handle.has_proof() {
            let data = proof.as_new_archive_data()?;

            // Double-checked under the per-handle proof lock to avoid a
            // duplicate write from a racing store.
            let _lock = handle.proof_data_lock().write().await;
            if !handle.has_proof() {
                self.add_data(&archive_id, data)?;
                if handle.meta().add_flags(BlockFlags::HAS_PROOF) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
589
590 pub async fn load_block_proof(&self, handle: &BlockHandle) -> Result<BlockProofStuff> {
591 let raw_proof = self.load_block_proof_raw_ref(handle).await?;
592 BlockProofStuff::deserialize(handle.id(), raw_proof.as_ref())
593 }
594
595 pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<OwnedPinnableSlice> {
596 if !handle.has_proof() {
597 return Err(BlockStorageError::BlockProofNotFound.into());
598 }
599
600 self.get_data(handle, &PackageEntryKey::proof(handle.id()))
601 .await
602 }
603
    /// Borrow-style variant of `load_block_proof_raw`: the returned value
    /// keeps the per-handle read lock while it is alive.
    pub async fn load_block_proof_raw_ref<'a>(
        &'a self,
        handle: &'a BlockHandle,
    ) -> Result<impl AsRef<[u8]> + 'a> {
        if !handle.has_proof() {
            return Err(BlockStorageError::BlockProofNotFound.into());
        }

        self.get_data_ref(handle, &PackageEntryKey::proof(handle.id()))
            .await
    }
615
    /// Stores a queue diff (if not already stored) under an existing or newly
    /// created handle. Mirrors `store_block_proof`.
    ///
    /// # Errors
    /// Returns `BlockHandleIdMismatch` if an existing handle is passed for a
    /// different block.
    pub async fn store_queue_diff(
        &self,
        queue_diff: &QueueDiffStuffAug,
        handle: MaybeExistingHandle,
    ) -> Result<StoreBlockResult> {
        let block_id = queue_diff.block_id();
        if matches!(&handle, MaybeExistingHandle::Existing(handle) if handle.id() != block_id) {
            return Err(BlockStorageError::BlockHandleIdMismatch.into());
        }

        let (handle, status) = match handle {
            MaybeExistingHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched),
            MaybeExistingHandle::New(meta_data) => self
                .block_handle_storage
                .create_or_load_handle(block_id, meta_data),
        };

        let mut updated = false;
        let archive_id = PackageEntryKey::queue_diff(block_id);
        if !handle.has_queue_diff() {
            let data = queue_diff.as_new_archive_data()?;

            // Double-checked under the per-handle queue-diff lock to avoid a
            // duplicate write from a racing store.
            let _lock = handle.queue_diff_data_lock().write().await;
            if !handle.has_queue_diff() {
                self.add_data(&archive_id, data)?;
                if handle.meta().add_flags(BlockFlags::HAS_QUEUE_DIFF) {
                    self.block_handle_storage.store_handle(&handle, false);
                    updated = true;
                }
            }
        }

        Ok(StoreBlockResult {
            handle,
            updated,
            new: status == HandleCreationStatus::Created,
        })
    }
656
657 pub async fn load_queue_diff(&self, handle: &BlockHandle) -> Result<QueueDiffStuff> {
658 let raw_diff = self.load_queue_diff_raw_ref(handle).await?;
659 QueueDiffStuff::deserialize(handle.id(), raw_diff.as_ref())
660 }
661
    /// Loads the raw stored queue diff bytes as an owned slice.
    pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<OwnedPinnableSlice> {
        if !handle.has_queue_diff() {
            return Err(BlockStorageError::QueueDiffNotFound.into());
        }

        self.get_data(handle, &PackageEntryKey::queue_diff(handle.id()))
            .await
    }
670
671 pub async fn load_queue_diff_raw_ref<'a>(
672 &'a self,
673 handle: &'a BlockHandle,
674 ) -> Result<impl AsRef<[u8]> + 'a> {
675 if !handle.has_queue_diff() {
676 return Err(BlockStorageError::QueueDiffNotFound.into());
677 }
678 self.get_data_ref(handle, &PackageEntryKey::queue_diff(handle.id()))
679 .await
680 }
681
    /// Registers the block in its target archive.
    ///
    /// Appends the block id to the archive's id list and writes the archive
    /// bookkeeping markers in one atomic batch. When starting a new archive
    /// completes the previous one, that commit is spawned in the background
    /// (after awaiting the commit that was already in flight, if any).
    pub async fn move_into_archive(
        &self,
        handle: &BlockHandle,
        mc_is_key_block: bool,
    ) -> Result<()> {
        let _histogram = HistogramGuard::begin("tycho_storage_move_into_archive_time");

        let block_id_bytes = handle.id().to_vec();

        let archive_block_ids_cf = self.db.archive_block_ids.cf();
        let chunks_cf = self.db.archives.cf();

        // Key blocks force an archive split right after them.
        let archive_id = self.prepare_archive_id(
            handle.ref_by_mc_seqno(),
            mc_is_key_block || handle.is_key_block(),
        );
        let archive_id_bytes = archive_id.id.to_be_bytes();

        // The id append and all markers go in a single atomic write batch.
        let mut batch = rocksdb::WriteBatch::default();

        // Merge-append the block id to the archive's id list.
        batch.merge_cf(&archive_block_ids_cf, archive_id_bytes, &block_id_bytes);

        if archive_id.is_new {
            let mut key = [0u8; tables::Archives::KEY_LEN];
            key[..4].copy_from_slice(&archive_id_bytes);
            key[4..].copy_from_slice(&ARCHIVE_STARTED_MAGIC.to_be_bytes());
            batch.put_cf(&chunks_cf, key, []);
        }
        if let Some(next_id) = archive_id.override_next_id {
            let mut key = [0u8; tables::Archives::KEY_LEN];
            key[..4].copy_from_slice(&archive_id_bytes);
            key[4..].copy_from_slice(&ARCHIVE_OVERRIDE_NEXT_MAGIC.to_be_bytes());
            batch.put_cf(&chunks_cf, key, next_id.to_le_bytes());
        }
        if let Some(to_commit) = archive_id.to_commit {
            let mut key = [0u8; tables::Archives::KEY_LEN];
            key[..4].copy_from_slice(&to_commit.to_be_bytes());
            key[4..].copy_from_slice(&ARCHIVE_TO_COMMIT_MAGIC.to_be_bytes());
            batch.put_cf(&chunks_cf, key, []);
        }
        self.db.rocksdb().write(batch)?;

        tracing::debug!(block_id = %handle.id(), "saved block id into archive");
        if let Some(to_commit) = archive_id.to_commit {
            let mut prev_archive_commit = self.prev_archive_commit.lock().await;

            // Finish the previous commit first so at most one commit task
            // runs at a time; announce its id once it is done.
            if let Some(task) = &mut *prev_archive_commit {
                task.finish().await?;

                self.archive_ids_tx.send(task.archive_id).ok();
            }
            *prev_archive_commit = Some(self.spawn_commit_archive(to_commit));
        }

        Ok(())
    }
757
758 pub async fn wait_for_archive_commit(&self) -> Result<()> {
759 let mut prev_archive_commit = self.prev_archive_commit.lock().await;
760 if let Some(task) = &mut *prev_archive_commit {
761 task.finish().await?;
762 *prev_archive_commit = None;
763 }
764 Ok(())
765 }
766
    /// Resolves the archive that should contain the given masterchain seqno.
    ///
    /// Returns `TooNew` while `mc_seqno` is not strictly below the latest
    /// known archive id (that archive may still be growing).
    pub fn get_archive_id(&self, mc_seqno: u32) -> ArchiveId {
        let archive_ids = self.archive_ids.read();

        // The newest archive is still being filled; don't hand it out yet.
        if !matches!(archive_ids.items.last(), Some(id) if mc_seqno < *id) {
            return ArchiveId::TooNew;
        }

        match archive_ids.items.range(..=mc_seqno).next_back() {
            // The closest archive at or below `mc_seqno` only matches if the
            // seqno falls within its package span.
            Some(id) if mc_seqno < id + ARCHIVE_PACKAGE_SIZE => ArchiveId::Found(*id),
            _ => ArchiveId::NotFound,
        }
    }
785
786 pub fn get_archive_size(&self, id: u32) -> Result<Option<usize>> {
787 let mut key = [0u8; tables::Archives::KEY_LEN];
788 key[..4].copy_from_slice(&id.to_be_bytes());
789 key[4..].copy_from_slice(&ARCHIVE_SIZE_MAGIC.to_be_bytes());
790
791 match self.db.archives.get(key.as_slice())? {
792 Some(slice) => Ok(Some(
793 u64::from_le_bytes(slice.as_ref().try_into().unwrap()) as usize
794 )),
795 None => Ok(None),
796 }
797 }
798
    /// Loads one archive data chunk by its byte offset.
    ///
    /// # Errors
    /// `InvalidOffset` if `offset` is not a multiple of the archive chunk
    /// size; `ArchiveNotFound` if the chunk entry does not exist.
    pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<OwnedPinnableSlice> {
        let chunk_size = self.archive_chunk_size().get() as u64;
        if offset % chunk_size != 0 {
            return Err(BlockStorageError::InvalidOffset.into());
        }

        let chunk_index = offset / chunk_size;

        let mut key = [0u8; tables::Archives::KEY_LEN];
        key[..4].copy_from_slice(&id.to_be_bytes());
        key[4..].copy_from_slice(&chunk_index.to_be_bytes());

        let chunk = self
            .db
            .archives
            .get(key.as_slice())?
            .ok_or(BlockStorageError::ArchiveNotFound)?;

        // SAFETY: the slice was produced by this rocksdb instance, and the
        // owned wrapper keeps that instance alive while the slice is in use.
        Ok(unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), chunk) })
    }
821
822 pub fn get_block_data_size(&self, block_id: &BlockId) -> Result<Option<u32>> {
823 let key = BlockDataEntryKey {
824 block_id: block_id.into(),
825 chunk_index: BLOCK_DATA_SIZE_MAGIC,
826 };
827 let size = self
828 .db
829 .block_data_entries
830 .get(key.to_vec())?
831 .map(|slice| u32::from_le_bytes(slice.as_ref().try_into().unwrap()));
832
833 Ok(size)
834 }
835
    /// Loads one compressed block data chunk by its byte offset.
    ///
    /// Returns `None` if the chunk does not exist.
    ///
    /// # Errors
    /// `InvalidOffset` if `offset` is not a multiple of the block data chunk
    /// size.
    pub fn get_block_data_chunk(
        &self,
        block_id: &BlockId,
        offset: u32,
    ) -> Result<Option<OwnedPinnableSlice>> {
        let chunk_size = self.block_data_chunk_size().get();
        if offset % chunk_size != 0 {
            return Err(BlockStorageError::InvalidOffset.into());
        }

        let key = BlockDataEntryKey {
            block_id: block_id.into(),
            chunk_index: offset / chunk_size,
        };

        Ok(self.db.block_data_entries.get(key.to_vec())?.map(|value| {
            // SAFETY: the slice comes from this rocksdb instance, which the
            // owned wrapper keeps alive for the slice's lifetime.
            unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), value) }
        }))
    }
856
    /// Subscribes to ids of archives that have been fully committed.
    pub fn subscribe_to_archive_ids(&self) -> broadcast::Receiver<u32> {
        self.archive_ids_tx.subscribe()
    }
860
    /// Returns a raw iterator over the stored data chunks of one archive.
    ///
    /// The upper bound is `ARCHIVE_MAGIC_MIN`, so bookkeeping marker entries
    /// are excluded (assumes data chunk indices sort below all magic values —
    /// see the ordering asserts in `preload_archive_ids`).
    pub fn archive_chunks_iterator(&self, archive_id: u32) -> rocksdb::DBRawIterator<'_> {
        let mut from = [0u8; tables::Archives::KEY_LEN];
        from[..4].copy_from_slice(&archive_id.to_be_bytes());

        let mut to = [0u8; tables::Archives::KEY_LEN];
        to[..4].copy_from_slice(&archive_id.to_be_bytes());
        to[4..].copy_from_slice(&ARCHIVE_MAGIC_MIN.to_be_bytes());

        let mut read_opts = self.db.archives.new_read_config();
        read_opts.set_iterate_upper_bound(to.as_slice());

        let rocksdb = self.db.rocksdb();
        let archives_cf = self.db.archives.cf();

        let mut raw_iterator = rocksdb.raw_iterator_cf_opt(&archives_cf, read_opts);
        raw_iterator.seek(from);

        raw_iterator
    }
880
    /// Garbage-collects block data below the given masterchain block.
    ///
    /// Shard heights are read from the target mc block itself; the removal
    /// runs on the rayon pool and is flagged as cancelled if this future is
    /// dropped.
    #[tracing::instrument(skip(self, max_blocks_per_batch))]
    pub async fn remove_outdated_blocks(
        &self,
        mc_seqno: u32,
        max_blocks_per_batch: Option<usize>,
    ) -> Result<()> {
        // Nothing precedes the zerostate.
        if mc_seqno == 0 {
            return Ok(());
        }

        tracing::info!("started blocks GC for mc_block {mc_seqno}");

        let block = self
            .find_mc_block_data(mc_seqno)
            .context("failed to load target block data")?
            .context("target block not found")?;

        // Latest shard block seqnos referenced by the target mc block.
        let shard_heights = {
            let extra = block.extra.load()?;
            let custom = extra.custom.context("mc block extra not found")?.load()?;
            custom
                .shards
                .latest_blocks()
                .map(|id| id.map(|id| (id.shard, id.seqno)))
                .collect::<Result<_, tycho_types::error::Error>>()?
        };

        let total_cached_handles_removed = self
            .block_handle_storage
            .gc_handles_cache(mc_seqno, &shard_heights);

        // Cancel the background removal if this future is dropped.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        let span = tracing::Span::current();
        let cancelled = cancelled.clone();
        let db = self.db.clone();

        let BlockGcStats {
            mc_blocks_removed,
            total_blocks_removed,
        } = rayon_run(move || {
            let _span = span.enter();

            // Logs "cancelled" unless the removal runs to completion.
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            let stats = remove_blocks(
                db,
                max_blocks_per_batch,
                mc_seqno,
                shard_heights,
                Some(&cancelled),
            )?;

            // Completed normally: disarm the cancellation logger.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok::<_, anyhow::Error>(stats)
        })
        .await?;

        tracing::info!(
            total_cached_handles_removed,
            mc_blocks_removed,
            total_blocks_removed,
            "finished blocks GC"
        );
        Ok(())
    }
955
    /// Garbage-collects archives strictly below `until_id`.
    ///
    /// The newest archive id that is still below `until_id` is retained
    /// (along with everything newer); all older archives are dropped with a
    /// single range delete.
    #[tracing::instrument(skip(self))]
    pub fn remove_outdated_archives(&self, until_id: u32) -> Result<()> {
        tracing::trace!("started archives GC");

        let mut archive_ids = self.archive_ids.write();

        let retained_ids = match archive_ids
            .items
            .iter()
            .rev()
            .find(|&id| *id < until_id)
            .cloned()
        {
            // `split_off` keeps ids >= the found id in the returned set,
            // leaving only the ids to delete inside `items`.
            Some(until_id) => archive_ids.items.split_off(&until_id),
            None => {
                tracing::trace!("nothing to remove");
                return Ok(());
            }
        };
        // Swap the retained set back in; `removed_ids` holds the doomed ids.
        let removed_ids = std::mem::replace(&mut archive_ids.items, retained_ids);

        let (Some(first), Some(last)) = (removed_ids.first(), removed_ids.last()) else {
            tracing::info!("nothing to remove");
            return Ok(());
        };

        let len = removed_ids.len();
        // Everything below the first retained id gets range-deleted.
        let until_id = match archive_ids.items.first() {
            Some(until_id) => *until_id,
            None => *last + 1,
        };

        drop(archive_ids);

        let archives_cf = self.db.archives.cf();
        let write_options = self.db.archives.write_config();

        let start_key = [0u8; tables::Archives::KEY_LEN];

        let mut end_key = [0u8; tables::Archives::KEY_LEN];
        end_key[..4].copy_from_slice(&until_id.to_be_bytes());
        end_key[4..].copy_from_slice(&[0; 8]);

        self.db
            .rocksdb()
            .delete_range_cf_opt(&archives_cf, start_key, end_key, write_options)?;

        tracing::info!(archive_count = len, first, last, "finished archives GC");
        Ok(())
    }
1013
    /// Inserts a raw package entry (block, proof or queue diff) by its key.
    fn add_data(&self, id: &PackageEntryKey, data: &[u8]) -> Result<(), rocksdb::Error> {
        self.db.package_entries.insert(id.to_vec(), data)
    }
1019
    /// Stores the full block entry together with its STARTED marker in one
    /// atomic batch, then spawns a background task that splits the data into
    /// chunks.
    async fn add_block_data_and_split(&self, id: &PackageEntryKey, data: &[u8]) -> Result<()> {
        let mut batch = rocksdb::WriteBatch::default();

        batch.put_cf(&self.db.package_entries.cf(), id.to_vec(), data);

        // STARTED marker flags the split as in progress until a SIZE entry is
        // written; `finish_block_data` repairs interrupted splits on startup.
        let key = BlockDataEntryKey {
            block_id: id.block_id,
            chunk_index: BLOCK_DATA_STARTED_MAGIC,
        };
        batch.put_cf(&self.db.block_data_entries.cf(), key.to_vec(), []);

        self.db.rocksdb().write(batch)?;

        // Concurrency is bounded by the semaphore; the permit travels with the task.
        let permit = self.split_block_semaphore.clone().acquire_owned().await?;
        // NOTE(review): the handle is dropped, so the split runs detached —
        // presumably intentional since `finish_block_data` covers failures;
        // confirm against `spawn_split_block_data`.
        let _handle = self.spawn_split_block_data(&id.block_id, data, permit);

        Ok(())
    }
1040
    /// Reads a package entry as an owned slice while holding the matching
    /// per-handle read lock for the entry type.
    async fn get_data(
        &self,
        handle: &BlockHandle,
        id: &PackageEntryKey,
    ) -> Result<OwnedPinnableSlice> {
        let _lock = match id.ty {
            ArchiveEntryType::Block => handle.block_data_lock(),
            ArchiveEntryType::Proof => handle.proof_data_lock(),
            ArchiveEntryType::QueueDiff => handle.queue_diff_data_lock(),
        }
        .read()
        .await;

        match self.db.package_entries.get(id.to_vec())? {
            // SAFETY: the slice comes from this rocksdb instance, which the
            // owned wrapper keeps alive for the slice's lifetime.
            Some(value) => Ok(unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), value) }),
            None => Err(BlockStorageError::PackageEntryNotFound.into()),
        }
    }
1060
    /// Reads a package entry and returns it together with the held per-handle
    /// read lock, so the caller keeps the lock while using the data.
    async fn get_data_ref<'a, 'b: 'a>(
        &'a self,
        handle: &'b BlockHandle,
        id: &PackageEntryKey,
    ) -> Result<FullBlockDataGuard<'a>> {
        let lock = match id.ty {
            ArchiveEntryType::Block => handle.block_data_lock(),
            ArchiveEntryType::Proof => handle.proof_data_lock(),
            ArchiveEntryType::QueueDiff => handle.queue_diff_data_lock(),
        }
        .read()
        .await;

        match self.db.package_entries.get(id.to_vec())? {
            Some(data) => Ok(FullBlockDataGuard { _lock: lock, data }),
            None => Err(BlockStorageError::PackageEntryNotFound.into()),
        }
    }
1079
    /// Picks (and possibly creates) the archive that `mc_seqno` belongs to.
    ///
    /// A new archive starts when this is the first one, when the current
    /// archive already spans `ARCHIVE_PACKAGE_SIZE` masterchain blocks, or at
    /// a previously requested `override_next_id` (set for key blocks).
    fn prepare_archive_id(&self, mc_seqno: u32, force_split_archive: bool) -> PreparedArchiveId {
        let mut archive_ids = self.archive_ids.write();

        // Closest existing archive id at or below `mc_seqno`, if any.
        let prev_id = archive_ids.items.range(..=mc_seqno).next_back().cloned();

        if force_split_archive {
            // Request a split right after this block.
            archive_ids.override_next_id = Some(mc_seqno + 1);
        } else if let Some(next_id) = archive_ids.override_next_id {
            match mc_seqno.cmp(&next_id) {
                std::cmp::Ordering::Less => {}
                std::cmp::Ordering::Equal => {
                    // Start the forced archive; the previous one becomes
                    // ready to commit (only on the first insertion).
                    let is_new = archive_ids.items.insert(mc_seqno);
                    return PreparedArchiveId {
                        id: mc_seqno,
                        is_new,
                        override_next_id: None,
                        to_commit: if is_new { prev_id } else { None },
                    };
                }
                std::cmp::Ordering::Greater => {
                    // The forced id was passed over; drop the stale override.
                    archive_ids.override_next_id = None;
                }
            }
        }

        // Default: stay in the previous archive.
        let mut archive_id = PreparedArchiveId {
            id: prev_id.unwrap_or_default(),
            override_next_id: archive_ids.override_next_id,
            ..Default::default()
        };

        // Start a fresh archive if none exists yet or the span is exhausted.
        let is_first_archive = prev_id.is_none();
        if is_first_archive || mc_seqno.saturating_sub(archive_id.id) >= ARCHIVE_PACKAGE_SIZE {
            let is_new = archive_ids.items.insert(mc_seqno);
            archive_id = PreparedArchiveId {
                id: mc_seqno,
                is_new,
                override_next_id: None,
                to_commit: if is_new { prev_id } else { None },
            };
        }

        debug_assert!(mc_seqno - archive_id.id <= ARCHIVE_PACKAGE_SIZE);

        archive_id
    }
1128
    /// Deletes all entries of one archive from the `archives` column family.
    fn clear_archive(&self, archive_id: u32) -> Result<()> {
        let archives_cf = self.db.archives.cf();
        let write_options = self.db.archives.write_config();

        let mut start_key = [0u8; tables::Archives::KEY_LEN];
        start_key[..4].copy_from_slice(&archive_id.to_be_bytes());
        start_key[4..].fill(0x00);

        let mut end_key = [0u8; tables::Archives::KEY_LEN];
        end_key[..4].copy_from_slice(&archive_id.to_be_bytes());
        end_key[4..].fill(0xFF);

        // NOTE(review): `delete_range_cf_opt` excludes `end_key`, so an entry
        // with chunk index 0xFF..FF would survive — confirm no magic marker
        // uses that exact value.
        self.db
            .rocksdb()
            .delete_range_cf_opt(&archives_cf, start_key, end_key, write_options)?;

        Ok(())
    }
1147
    /// Spawns a blocking task that assembles the final archive from the
    /// per-block package entries and writes it as compressed chunks.
    ///
    /// For each block id recorded under `archive_block_ids` for this archive
    /// it appends, in entry-type order (blocks, then proofs, then queue
    /// diffs), an entry header followed by the raw entry data through an
    /// `ArchiveWriter`. The returned task can be cancelled cooperatively via
    /// the embedded flag and aborted via its join handle.
    #[tracing::instrument(skip(self))]
    fn spawn_commit_archive(&self, archive_id: u32) -> CommitArchiveTask {
        let db = self.db.clone();
        let block_handle_storage = self.block_handle_storage.clone();
        let chunk_size = self.archive_chunk_size().get() as u64;

        let span = tracing::Span::current();
        let cancelled = CancellationFlag::new();

        let handle = tokio::task::spawn_blocking({
            let cancelled = cancelled.clone();

            move || {
                let _span = span.enter();

                let histogram = HistogramGuard::begin("tycho_storage_commit_archive_time");

                tracing::info!("started");
                // Logs "cancelled" on any early exit; defused right before
                // the final "finished" log below.
                let guard = scopeguard::guard((), |_| {
                    tracing::warn!("cancelled");
                });

                // Block ids are stored as one flat array of serialized ids.
                let raw_block_ids = db
                    .archive_block_ids
                    .get(archive_id.to_be_bytes())?
                    .ok_or(BlockStorageError::ArchiveNotFound)?;
                assert_eq!(raw_block_ids.len() % BlockId::SIZE_HINT, 0);

                let mut writer = ArchiveWriter::new(&db, archive_id, chunk_size)?;
                let mut header_buffer = Vec::with_capacity(ARCHIVE_ENTRY_HEADER_LEN);

                writer.write(&ARCHIVE_PREFIX)?;

                // Deduplicates block ids within each entry-type pass.
                let mut unique_ids = FastHashSet::default();
                for ty in [
                    ArchiveEntryType::Block,
                    ArchiveEntryType::Proof,
                    ArchiveEntryType::QueueDiff,
                ] {
                    for raw_block_id in raw_block_ids.chunks_exact(BlockId::SIZE_HINT) {
                        anyhow::ensure!(!cancelled.check(), "task aborted");

                        let block_id = BlockId::from_slice(raw_block_id);
                        if !unique_ids.insert(block_id) {
                            tracing::warn!(%block_id, "skipped duplicate block id");
                            continue;
                        }

                        // Completeness is validated once per block, on the
                        // first (Block) pass only.
                        if ty == ArchiveEntryType::Block {
                            let handle = block_handle_storage
                                .load_handle(&block_id)
                                .ok_or(BlockStorageError::BlockHandleNotFound)?;

                            let flags = handle.meta().flags();
                            anyhow::ensure!(
                                flags.contains(BlockFlags::HAS_ALL_BLOCK_PARTS),
                                "block does not have all parts: {block_id}, \
                                 has_data={}, has_proof={}, queue_diff={}",
                                flags.contains(BlockFlags::HAS_DATA),
                                flags.contains(BlockFlags::HAS_PROOF),
                                flags.contains(BlockFlags::HAS_QUEUE_DIFF)
                            );
                        }

                        let key = PackageEntryKey::from((block_id, ty));
                        let Some(data) = db.package_entries.get(key.to_vec()).unwrap() else {
                            return Err(BlockStorageError::BlockDataNotFound.into());
                        };

                        // Entry layout: fixed-size header, then raw data.
                        header_buffer.clear();
                        ArchiveEntryHeader {
                            block_id,
                            ty,
                            data_len: data.len() as u32,
                        }
                        .write_to(&mut header_buffer);

                        writer.write(&header_buffer)?;
                        writer.write(data.as_ref())?;
                    }

                    unique_ids.clear();
                }

                drop(raw_block_ids);

                writer.finalize()?;

                // Completed successfully: defuse the "cancelled" guard.
                scopeguard::ScopeGuard::into_inner(guard);
                tracing::info!(
                    elapsed = %humantime::format_duration(histogram.finish()),
                    "finished"
                );

                Ok(())
            }
        });

        CommitArchiveTask {
            archive_id,
            cancelled,
            handle: Some(handle),
        }
    }
1260
1261 #[tracing::instrument(skip(self, data))]
1262 fn spawn_split_block_data(
1263 &self,
1264 block_id: &PartialBlockId,
1265 data: &[u8],
1266 permit: OwnedSemaphorePermit,
1267 ) -> JoinHandle<Result<()>> {
1268 let db = self.db.clone();
1269 let chunk_size = self.block_data_chunk_size().get() as usize;
1270
1271 let span = tracing::Span::current();
1272 tokio::task::spawn_blocking({
1273 let block_id = *block_id;
1274 let data = data.to_vec();
1275
1276 move || {
1277 let _span = span.enter();
1278
1279 let _histogram = HistogramGuard::begin("tycho_storage_split_block_data_time");
1280
1281 let mut compressed = Vec::new();
1282 tycho_util::compression::zstd_compress(&data, &mut compressed, 3);
1283
1284 let chunks = compressed.chunks(chunk_size);
1285 for (index, chunk) in chunks.enumerate() {
1286 let key = BlockDataEntryKey {
1287 block_id,
1288 chunk_index: index as u32,
1289 };
1290
1291 db.block_data_entries.insert(key.to_vec(), chunk)?;
1292 }
1293
1294 let key = BlockDataEntryKey {
1295 block_id,
1296 chunk_index: BLOCK_DATA_SIZE_MAGIC,
1297 };
1298 db.block_data_entries
1299 .insert(key.to_vec(), (compressed.len() as u32).to_le_bytes())?;
1300
1301 drop(permit);
1302
1303 Ok(())
1304 }
1305 })
1306 }
1307}
1308
/// Handle of a background archive commit spawned by `spawn_commit_archive`.
/// Dropping the task cancels and aborts the underlying blocking job.
struct CommitArchiveTask {
    /// Id of the archive being committed.
    archive_id: u32,
    /// Cooperative cancellation flag polled by the blocking task.
    cancelled: CancellationFlag,
    /// Join handle of the blocking task; `None` once `finish` has completed.
    handle: Option<JoinHandle<Result<()>>>,
}
1314
1315impl CommitArchiveTask {
1316 async fn finish(&mut self) -> Result<()> {
1317 if let Some(handle) = &mut self.handle {
1319 if let Err(e) = handle
1320 .await
1321 .map_err(|e| {
1322 if e.is_panic() {
1323 std::panic::resume_unwind(e.into_panic());
1324 }
1325 anyhow::Error::from(e)
1326 })
1327 .and_then(std::convert::identity)
1328 {
1329 tracing::error!(
1330 archive_id = self.archive_id,
1331 "failed to commit archive: {e:?}"
1332 );
1333 }
1334
1335 self.handle = None;
1336 }
1337
1338 Ok(())
1339 }
1340}
1341
1342impl Drop for CommitArchiveTask {
1343 fn drop(&mut self) {
1344 self.cancelled.cancel();
1345 if let Some(handle) = &self.handle {
1346 handle.abort();
1347 }
1348 }
1349}
1350
/// Incremental writer that zstd-compresses archive bytes and persists them
/// as fixed-size chunks keyed by `archive_id || chunk_index`.
struct ArchiveWriter<'a> {
    db: &'a CoreDb,
    /// Id of the archive being written.
    archive_id: u32,
    /// Target size of a single stored chunk, in bytes.
    chunk_len: usize,
    /// Total number of compressed bytes flushed so far.
    total_len: u64,
    /// Index of the next chunk to store.
    chunk_index: u64,
    /// Compressed bytes pending until a full chunk accumulates.
    chunks_buffer: Vec<u8>,
    /// Streaming zstd compressor.
    zstd_compressor: ZstdCompressStream<'a>,
}
1360
impl<'a> ArchiveWriter<'a> {
    /// Creates a writer that compresses (zstd level 9) incoming bytes and
    /// stores them as `chunk_len`-sized chunks in the `archives` table.
    fn new(db: &'a CoreDb, archive_id: u32, chunk_len: u64) -> Result<Self> {
        let chunk_len = chunk_len as usize;

        let mut zstd_compressor = ZstdCompressStream::new(9, chunk_len)?;

        // NOTE(review): integer division yields 0 workers on hosts with
        // fewer than 4 cores — presumably the compressor treats 0 as
        // "single-threaded"; confirm in `ZstdCompressStream::multithreaded`.
        let workers = (std::thread::available_parallelism()?.get() / 4) as u8;
        zstd_compressor.multithreaded(workers)?;

        Ok(Self {
            db,
            archive_id,
            chunk_len,
            total_len: 0,
            chunk_index: 0,
            chunks_buffer: Vec::with_capacity(chunk_len),
            zstd_compressor,
        })
    }

    /// Feeds `data` into the compressor and flushes any complete chunks.
    fn write(&mut self, data: &[u8]) -> Result<()> {
        self.zstd_compressor.write(data, &mut self.chunks_buffer)?;
        self.flush(false)
    }

    /// Finishes the compression stream, flushes the remaining bytes, then
    /// atomically records the total archive size and removes the pending
    /// block-id list in a single write batch.
    fn finalize(mut self) -> Result<()> {
        self.zstd_compressor.finish(&mut self.chunks_buffer)?;

        self.flush(true)?;
        debug_assert!(self.chunks_buffer.is_empty());

        let archives_cf = self.db.archives.cf();
        let block_ids_cf = self.db.archive_block_ids.cf();

        let mut batch = rocksdb::WriteBatch::default();

        // Size entry: `archive_id || ARCHIVE_SIZE_MAGIC` -> total_len (LE).
        let mut key = [0u8; tables::Archives::KEY_LEN];
        key[..4].copy_from_slice(&self.archive_id.to_be_bytes());
        key[4..].copy_from_slice(&ARCHIVE_SIZE_MAGIC.to_be_bytes());
        batch.put_cf(&archives_cf, key.as_slice(), self.total_len.to_le_bytes());

        batch.delete_cf(&block_ids_cf, self.archive_id.to_be_bytes());

        self.db.rocksdb().write(batch)?;
        Ok(())
    }

    /// Stores every complete chunk from the buffer; with `finalize` set,
    /// also stores whatever remains as the (possibly shorter) last chunk.
    fn flush(&mut self, finalize: bool) -> Result<()> {
        let buffer_len = self.chunks_buffer.len();
        if buffer_len == 0 {
            return Ok(());
        }

        // Key layout: `archive_id (BE u32) || chunk_index (BE u64)`.
        let mut key = [0u8; tables::Archives::KEY_LEN];
        key[..4].copy_from_slice(&self.archive_id.to_be_bytes());

        let mut do_flush = |data: &[u8]| {
            key[4..].copy_from_slice(&self.chunk_index.to_be_bytes());

            self.total_len += data.len() as u64;
            self.chunk_index += 1;

            self.db.archives.insert(key, data)
        };

        // Store every complete chunk.
        let mut buffer_offset = 0;
        while buffer_offset + self.chunk_len <= buffer_len {
            do_flush(&self.chunks_buffer[buffer_offset..buffer_offset + self.chunk_len])?;
            buffer_offset += self.chunk_len;
        }

        if finalize {
            // NOTE(review): if `buffer_len` is an exact multiple of
            // `chunk_len` this stores a zero-length trailing chunk —
            // confirm readers tolerate (or rely on) an empty last entry.
            do_flush(&self.chunks_buffer[buffer_offset..])?;
            self.chunks_buffer.clear();
        } else {
            let rem = buffer_len % self.chunk_len;
            if rem == 0 {
                self.chunks_buffer.clear();
            } else if buffer_offset > 0 {
                // Move the incomplete tail to the front of the buffer.
                self.chunks_buffer.copy_within(buffer_offset.., 0);
                self.chunks_buffer.truncate(rem);
            }
        }

        Ok(())
    }
}
1456
/// Either an already-created block handle or the metadata needed to create one.
#[derive(Clone)]
pub enum MaybeExistingHandle {
    /// Reuse a previously created handle.
    Existing(BlockHandle),
    /// Create a new handle from this metadata.
    New(NewBlockMeta),
}
1462
1463impl From<BlockHandle> for MaybeExistingHandle {
1464 fn from(handle: BlockHandle) -> Self {
1465 Self::Existing(handle)
1466 }
1467}
1468
1469impl From<NewBlockMeta> for MaybeExistingHandle {
1470 fn from(meta_data: NewBlockMeta) -> Self {
1471 Self::New(meta_data)
1472 }
1473}
1474
/// Result of a `store_*` call on the block storage.
pub struct StoreBlockResult {
    /// Handle of the stored block.
    pub handle: BlockHandle,
    // NOTE(review): presumably `true` when the call changed stored state;
    // the producing code is not visible here — confirm at the call sites.
    pub updated: bool,
    // NOTE(review): presumably `true` when the handle was newly created
    // rather than loaded — confirm against `HandleCreationStatus` usage.
    pub new: bool,
}
1480
/// Outcome of an archive-id lookup.
// NOTE(review): variant semantics inferred from names; the lookup logic is
// not visible in this part of the file — confirm against its producer.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ArchiveId {
    /// An archive with this id exists.
    Found(u32),
    /// The requested block appears newer than any committed archive.
    TooNew,
    /// No matching archive exists.
    NotFound,
}
1487
/// In-memory index of known archives.
#[derive(Default)]
struct ArchiveIds {
    /// Sorted set of archive ids (each id is the archive's starting mc seqno).
    items: BTreeSet<u32>,
    // When set, overrides the id chosen for the next archive (cleared once
    // the prepared id catches up; see the id-preparation logic above).
    override_next_id: Option<u32>,
}
1493
/// Deletes all non-protected blocks below the given masterchain seqno (and
/// below the provided per-shard heights for shard blocks).
///
/// Iterates `full_block_ids` in key order, accumulating contiguous runs of
/// removable blocks per shard and deleting each run with range deletes
/// across all block-related column families. Blocks with seqno 0, blocks at
/// or above the GC horizon, and key/persistent blocks are kept.
///
/// `max_blocks_per_batch` bounds how many staged deletions trigger an
/// intermediate batch write; `cancelled` aborts the scan cooperatively.
fn remove_blocks(
    db: CoreDb,
    max_blocks_per_batch: Option<usize>,
    mc_seqno: u32,
    shard_heights: ShardHeights,
    cancelled: Option<&CancellationFlag>,
) -> Result<BlockGcStats> {
    let mut stats = BlockGcStats::default();

    let raw = db.rocksdb().as_ref();
    let full_block_ids_cf = db.full_block_ids.cf();
    let block_connections_cf = db.block_connections.cf();
    let package_entries_cf = db.package_entries.cf();
    let block_data_entries_cf = db.block_data_entries.cf();
    let block_handles_cf = db.block_handles.cf();

    let mut batch = rocksdb::WriteBatch::default();
    let mut batch_len = 0;

    let mut blocks_iter =
        raw.raw_iterator_cf_opt(&full_block_ids_cf, db.full_block_ids.new_read_config());
    blocks_iter.seek_to_first();

    // Checks whether a block must be kept as a key or persistent block.
    // The relevant flags live in the upper 32 bits of the first u64 of the
    // block handle value (keyed by root hash).
    let block_handles_readopts = db.block_handles.new_read_config();
    let is_persistent = |root_hash: &[u8; 32]| -> Result<bool> {
        const FLAGS: u64 =
            ((BlockFlags::IS_KEY_BLOCK.bits() | BlockFlags::IS_PERSISTENT.bits()) as u64) << 32;

        let Some(value) =
            raw.get_pinned_cf_opt(&block_handles_cf, root_hash, &block_handles_readopts)?
        else {
            return Ok(false);
        };
        Ok(value.as_ref().get_u64_le() & FLAGS != 0)
    };

    // Stages range deletes for the inclusive seqno run `[from, to]` within
    // one shard across all block-related column families. Key prefix layout:
    // `workchain (BE i32) || shard prefix (BE u64) || seqno (BE u32)`.
    let mut key_buffer = [0u8; tables::PackageEntries::KEY_LEN];
    let mut delete_range =
        |batch: &mut rocksdb::WriteBatch, from: &BlockIdShort, to: &BlockIdShort| {
            debug_assert_eq!(from.shard, to.shard);
            debug_assert!(from.seqno <= to.seqno);

            let range_from = &mut key_buffer;
            range_from[..4].copy_from_slice(&from.shard.workchain().to_be_bytes());
            range_from[4..12].copy_from_slice(&from.shard.prefix().to_be_bytes());
            range_from[12..16].copy_from_slice(&from.seqno.to_be_bytes());

            // The range end is exclusive, so bump the seqno by one.
            let mut range_to = *range_from;
            range_to[12..16].copy_from_slice(&to.seqno.saturating_add(1).to_be_bytes());

            batch.delete_range_cf(&full_block_ids_cf, &*range_from, &range_to);
            batch.delete_range_cf(&package_entries_cf, &*range_from, &range_to);
            batch.delete_range_cf(&block_data_entries_cf, &*range_from, &range_to);
            batch.delete_range_cf(&block_connections_cf, &*range_from, &range_to);

            tracing::debug!(%from, %to, "delete_range");
        };

    let mut cancelled = cancelled.map(|c| c.debounce(100));
    let mut current_range = None::<(BlockIdShort, BlockIdShort)>;
    loop {
        let key = match blocks_iter.key() {
            Some(key) => key,
            None => break blocks_iter.status()?,
        };

        if let Some(cancelled) = &mut cancelled {
            if cancelled.check() {
                anyhow::bail!("blocks GC cancelled");
            }
        }

        // Key layout: short block id (16 bytes) followed by the root hash.
        let block_id = BlockIdShort::from_slice(key);
        let root_hash: &[u8; 32] = key[16..48].try_into().unwrap();
        let is_masterchain = block_id.shard.is_masterchain();

        // Keep protected blocks; flush any pending removable range first.
        if block_id.seqno == 0
            || is_masterchain && block_id.seqno >= mc_seqno
            || !is_masterchain
                && shard_heights.contains_shard_seqno(&block_id.shard, block_id.seqno)
            || is_persistent(root_hash)?
        {
            if let Some((from, to)) = current_range.take() {
                delete_range(&mut batch, &from, &to);
                batch_len += 1;
            }
            blocks_iter.next();
            continue;
        }

        // Extend the current removable range, or start a new one when the
        // shard changes (a range never spans shards).
        match &mut current_range {
            Some((from, to)) if from.shard != block_id.shard => {
                delete_range(&mut batch, from, to);
                *from = block_id;
                *to = block_id;
            }
            Some((_, to)) => *to = block_id,
            None => current_range = Some((block_id, block_id)),
        }

        stats.total_blocks_removed += 1;
        if is_masterchain {
            stats.mc_blocks_removed += 1;
        }

        // Block handles are keyed by root hash, so they are deleted per block
        // rather than by range.
        batch.delete_cf(&block_handles_cf, root_hash);

        batch_len += 1;
        if matches!(
            max_blocks_per_batch,
            Some(max_blocks_per_batch) if batch_len >= max_blocks_per_batch
        ) {
            tracing::info!(
                total_blocks_removed = stats.total_blocks_removed,
                "applying intermediate batch",
            );
            let batch = std::mem::take(&mut batch);
            raw.write(batch)?;
            batch_len = 0;
        }

        blocks_iter.next();
    }

    // Flush the trailing removable range, if any.
    if let Some((from, to)) = current_range.take() {
        delete_range(&mut batch, &from, &to);
        batch_len += 1;
    }

    if batch_len > 0 {
        tracing::info!("applying final batch");
        raw.write(batch)?;
    }

    Ok(stats)
}
1651
/// Configuration for the block storage.
pub struct BlockStorageConfig {
    /// Size of a single stored archive chunk.
    pub archive_chunk_size: ByteSize,
    /// Configuration of the in-memory blocks cache.
    pub blocks_cache: BlocksCacheConfig,
    // NOTE(review): likely the permit count of the semaphore gating
    // `spawn_split_block_data` tasks — confirm where the semaphore is built.
    pub split_block_tasks: usize,
}
1657
/// Counters reported by a blocks GC run (see `remove_blocks`).
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub struct BlockGcStats {
    /// Number of removed masterchain blocks.
    pub mc_blocks_removed: usize,
    /// Total number of removed blocks across all shards.
    pub total_blocks_removed: usize,
}
1663
/// Block data bytes pinned in RocksDB, kept alive together with the
/// per-block lock guard that protects them.
struct FullBlockDataGuard<'a> {
    // Held only to keep the block data locked while `data` is borrowed.
    _lock: BlockDataGuard<'a>,
    data: rocksdb::DBPinnableSlice<'a>,
}
1668
1669impl AsRef<[u8]> for FullBlockDataGuard<'_> {
1670 fn as_ref(&self) -> &[u8] {
1671 self.data.as_ref()
1672 }
1673}
1674
1675fn extract_entry_type(key: &[u8]) -> Option<ArchiveEntryType> {
1676 key.get(48).copied().and_then(ArchiveEntryType::from_byte)
1677}
1678
/// Number of masterchain blocks per archive package.
const ARCHIVE_PACKAGE_SIZE: u32 = 100;

/// Chunk-index magic for the entry holding the total archive size
/// (written by `ArchiveWriter::finalize`).
const ARCHIVE_SIZE_MAGIC: u64 = u64::MAX;
// NOTE(review): the three marker magics below are not referenced in this
// part of the file; descriptions are inferred from their names — confirm
// against their actual usage.
/// Chunk-index magic for the "previous archive to commit" marker entry.
const ARCHIVE_TO_COMMIT_MAGIC: u64 = u64::MAX - 1;
/// Chunk-index magic for the "override next archive id" marker entry.
const ARCHIVE_OVERRIDE_NEXT_MAGIC: u64 = u64::MAX - 2;
/// Chunk-index magic for the "archive started" marker entry.
const ARCHIVE_STARTED_MAGIC: u64 = u64::MAX - 3;

/// Lower bound of the chunk-index range reserved for magic values
/// (the top 256 indices).
const ARCHIVE_MAGIC_MIN: u64 = u64::MAX & !0xff;

/// Size of a single stored block-data chunk.
const BLOCK_DATA_CHUNK_SIZE: u32 = 1024 * 1024;
/// Chunk-index magic for the entry holding the total compressed block size
/// (written by `spawn_split_block_data`).
const BLOCK_DATA_SIZE_MAGIC: u32 = u32::MAX;
/// Chunk-index magic for the "split started" marker entry.
/// (`u32::MAX - 1` appears to be intentionally skipped/reserved.)
const BLOCK_DATA_STARTED_MAGIC: u32 = u32::MAX - 2;
1697
/// Resolved archive id for a masterchain block, plus follow-up actions.
#[derive(Default)]
struct PreparedArchiveId {
    /// Id (starting mc seqno) of the archive the block belongs to.
    id: u32,
    /// Whether this preparation started a brand-new archive.
    is_new: bool,
    // Carried over from `ArchiveIds::override_next_id`; cleared once a new
    // archive is started (see the id-preparation logic above).
    override_next_id: Option<u32>,
    /// Previous archive id to commit, set only when a new archive started.
    to_commit: Option<u32>,
}
1705
// Broadcast channel carrying archive ids to subscribers.
// NOTE(review): send/subscribe sites are outside this part of the file.
type ArchiveIdsTx = broadcast::Sender<u32>;
/// Cache of recently used blocks keyed by block id.
type BlocksCache = moka::sync::Cache<BlockId, BlockStuff, FastHasherState>;
1708
/// Errors produced by block storage operations.
#[derive(thiserror::Error, Debug)]
enum BlockStorageError {
    #[error("Archive not found")]
    ArchiveNotFound,
    #[error("Block data not found")]
    BlockDataNotFound,
    #[error("Block proof not found")]
    BlockProofNotFound,
    #[error("Queue diff not found")]
    QueueDiffNotFound,
    #[error("Block handle id mismatch")]
    BlockHandleIdMismatch,
    #[error("Block handle not found")]
    BlockHandleNotFound,
    #[error("Package entry not found")]
    PackageEntryNotFound,
    #[error("Offset is outside of the archive slice")]
    InvalidOffset,
}
1728
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use tycho_block_util::archive::WithArchiveData;
    use tycho_storage::StorageContext;
    use tycho_util::FastHashMap;
    use tycho_util::futures::JoinTask;

    use super::*;
    use crate::storage::{BlockConnection, CoreStorage, CoreStorageConfig};

    /// Stores block data, proof and queue diff for the same block from
    /// several concurrent tasks and checks that they all converge to the
    /// same handle with all block parts present.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn parallel_store_data() -> Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let shard = ShardIdent::MASTERCHAIN;
        for seqno in 0..1000 {
            let block = BlockStuff::new_empty(shard, seqno);
            let block = {
                let data = BocRepr::encode_rayon(block.as_ref()).unwrap();
                WithArchiveData::new(block, data)
            };
            let block_id = block.id();

            let proof = BlockProofStuff::new_empty(block_id);
            let proof = {
                let data = BocRepr::encode_rayon(proof.as_ref()).unwrap();
                WithArchiveData::new(proof, data)
            };

            let queue_diff = QueueDiffStuff::builder(shard, seqno, &HashBytes::ZERO)
                .serialize()
                .build(block_id);

            let block_meta = NewBlockMeta {
                is_key_block: shard.is_masterchain() && seqno == 0,
                gen_utime: 0,
                ref_by_mc_seqno: seqno,
            };

            let store_block_data = || {
                let storage = storage.clone();
                JoinTask::new(async move {
                    let res = storage
                        .block_storage()
                        .store_block_data(&block, &block.archive_data, block_meta)
                        .await?;

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            let store_proof_and_queue = || {
                let storage = storage.clone();
                let proof = proof.clone();
                let queue_diff = queue_diff.clone();
                JoinTask::new(async move {
                    // Random yields shuffle task interleavings between runs.
                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_block_proof(&proof, MaybeExistingHandle::New(block_meta))
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    let res = storage
                        .block_storage()
                        .store_queue_diff(&queue_diff, res.handle.into())
                        .await?;

                    if rand::random::<bool>() {
                        tokio::task::yield_now().await;
                    }

                    Ok::<_, anyhow::Error>(res.handle)
                })
            };

            let (data_res, proof_and_queue_res) = async move {
                let data_fut = pin!(store_block_data());
                let proof_and_queue_fut = pin!(async {
                    tokio::select! {
                        left = store_proof_and_queue() => left,
                        right = store_proof_and_queue() => right,
                    }
                });

                let (data, other) = futures_util::future::join(data_fut, proof_and_queue_fut).await;

                Ok::<_, anyhow::Error>((data?, other?))
            }
            .await?;

            // All stores must resolve to the very same handle instance.
            assert!(std::ptr::addr_eq(
                arc_swap::RefCnt::as_ptr(&data_res),
                arc_swap::RefCnt::as_ptr(&proof_and_queue_res)
            ));
            assert!(data_res.has_all_block_parts());
        }

        Ok(())
    }

    /// Fills two shards with dummy blocks, runs the GC and verifies that
    /// exactly the expected seqno ranges were removed (and nothing else).
    #[tokio::test]
    async fn blocks_gc() -> Result<()> {
        const GARBAGE: &[u8] = b"garbage";
        const ENTRY_TYPES: [ArchiveEntryType; 3] = [
            ArchiveEntryType::Block,
            ArchiveEntryType::Proof,
            ArchiveEntryType::QueueDiff,
        ];
        const CONNECTION_TYPES: [BlockConnection; 2] =
            [BlockConnection::Prev1, BlockConnection::Next1];

        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let blocks = storage.block_storage();
        let block_handles = storage.block_handle_storage();
        let block_connections = storage.block_connection_storage();

        let mut shard_block_ids = FastHashMap::<ShardIdent, Vec<BlockId>>::default();

        for shard in [ShardIdent::MASTERCHAIN, ShardIdent::BASECHAIN] {
            let entry = shard_block_ids.entry(shard).or_default();

            for seqno in 0..100 {
                let block_id = BlockId {
                    shard,
                    seqno,
                    root_hash: HashBytes(rand::random()),
                    file_hash: HashBytes(rand::random()),
                };
                entry.push(block_id);

                let (handle, _) = block_handles.create_or_load_handle(&block_id, NewBlockMeta {
                    is_key_block: shard.is_masterchain() && seqno == 0,
                    gen_utime: 0,
                    ref_by_mc_seqno: seqno,
                });

                for ty in ENTRY_TYPES {
                    blocks.add_data(&(block_id, ty).into(), GARBAGE)?;
                }
                for direction in CONNECTION_TYPES {
                    block_connections.store_connection(&handle, direction, &block_id);
                }

                handle.meta().add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
                block_handles.store_handle(&handle, false);
            }
        }

        // GC below mc seqno 70 / shard height 50; seqno 0 is always kept,
        // hence 69 masterchain + 49 shard blocks removed.
        let stats = remove_blocks(
            blocks.db.clone(),
            None,
            70,
            [(ShardIdent::BASECHAIN, 50)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 69,
            total_blocks_removed: 69 + 49,
        });

        let removed_ranges = FastHashMap::from_iter([
            (ShardIdent::MASTERCHAIN, vec![1..=69]),
            (ShardIdent::BASECHAIN, vec![1..=49]),
        ]);
        for (shard, block_ids) in shard_block_ids {
            let removed_ranges = removed_ranges.get(&shard).unwrap();

            for block_id in block_ids {
                let must_be_removed = 'removed: {
                    for range in removed_ranges {
                        if range.contains(&block_id.seqno) {
                            break 'removed true;
                        }
                    }
                    false
                };

                let handle = block_handles.load_handle(&block_id);
                assert_eq!(handle.is_none(), must_be_removed);

                for ty in ENTRY_TYPES {
                    let key = PackageEntryKey::from((block_id, ty));
                    let stored = blocks.db.package_entries.get(key.to_vec())?;
                    assert_eq!(stored.is_none(), must_be_removed);
                }

                for direction in CONNECTION_TYPES {
                    let connection = block_connections.load_connection(&block_id, direction);
                    assert_eq!(connection.is_none(), must_be_removed);
                }
            }
        }

        // Advancing the horizon by one removes exactly one block per chain.
        let stats = remove_blocks(
            blocks.db.clone(),
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 1,
            total_blocks_removed: 2,
        });

        // Re-running with the same horizon must be a no-op.
        let stats = remove_blocks(
            blocks.db.clone(),
            None,
            71,
            [(ShardIdent::BASECHAIN, 51)].into(),
            None,
        )?;
        assert_eq!(stats, BlockGcStats {
            mc_blocks_removed: 0,
            total_blocks_removed: 0,
        });

        Ok(())
    }
}