1use std::collections::hash_map;
2use std::fs::File;
3use std::io::Cursor;
4use std::mem::ManuallyDrop;
5use std::num::NonZeroU32;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
8use std::time::Instant;
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use futures_util::FutureExt;
13use futures_util::future::BoxFuture;
14use tycho_block_util::block::*;
15use tycho_block_util::dict::split_aug_dict_raw;
16use tycho_block_util::state::*;
17use tycho_storage::fs::TempFileStorage;
18use tycho_storage::kv::StoredValue;
19use tycho_types::merkle::{FindCell, MerkleUpdate, ParMerkleUpdateApplier};
20use tycho_types::models::*;
21use tycho_types::prelude::*;
22use tycho_util::futures::Shared;
23use tycho_util::mem::Reclaimer;
24use tycho_util::metrics::HistogramGuard;
25use tycho_util::sync::rayon_run;
26use tycho_util::{FastDashMap, FastHashMap, FastHashSet};
27use weedb::rocksdb;
28
29use self::cell_storage::*;
30use self::store_state_raw::StoreStateContext;
31use super::{
32 BlockConnectionStorage, BlockHandle, BlockHandleStorage, BlockStorage, CellsDb,
33 CoreStorageConfig,
34};
35use crate::storage::BlockConnection;
36
37mod cell_storage;
38mod entries_buffer;
39mod store_state_raw;
40
41pub struct ShardStateStorage {
42 cells_db: CellsDb,
43
44 block_handle_storage: Arc<BlockHandleStorage>,
45 block_storage: Arc<BlockStorage>,
46 cell_storage: Arc<CellStorage>,
47 block_connections: Arc<BlockConnectionStorage>,
48 temp_file_storage: TempFileStorage,
49
50 gc_lock: Arc<tokio::sync::Mutex<()>>,
51 min_ref_mc_state: MinRefMcStateTracker,
52 counters: Arc<StoreStateCounters>,
53
54 shard_split_depth: u8,
55 new_cells_threshold: usize,
56 store_shard_state_step: NonZeroU32,
57
58 shard_states_cache: FastDashMap<ShardIdent, ShardStatesCache>,
59}
60
61impl ShardStateStorage {
62 pub fn new(
64 cells_db: CellsDb,
65 block_handle_storage: Arc<BlockHandleStorage>,
66 block_storage: Arc<BlockStorage>,
67 block_connections: Arc<BlockConnectionStorage>,
68 temp_file_storage: TempFileStorage,
69 config: &CoreStorageConfig,
70 ) -> Result<Arc<Self>> {
71 let cell_storage = CellStorage::new(
72 cells_db.clone(),
73 config.cells_cache_size,
74 config.drop_interval,
75 );
76
77 Ok(Arc::new(Self {
78 cells_db,
79 block_handle_storage,
80 block_storage,
81 temp_file_storage,
82 cell_storage,
83 block_connections,
84 shard_split_depth: config.shard_split_depth,
85 new_cells_threshold: config.max_new_cells_threshold,
86 store_shard_state_step: config.store_shard_state_step,
87 gc_lock: Default::default(),
88 min_ref_mc_state: MinRefMcStateTracker::new(),
89 counters: Default::default(),
90 shard_states_cache: Default::default(),
91 }))
92 }
93
94 pub fn metrics(&self) -> ShardStateStorageMetrics {
95 let counters = self.counters.as_ref();
96 ShardStateStorageMetrics {
97 max_new_mc_cell_count: counters.max_new_mc_cell_count.swap(0, Ordering::AcqRel),
98 max_new_sc_cell_count: counters.max_new_sc_cell_count.swap(0, Ordering::AcqRel),
99 }
100 }
101
102 pub fn min_ref_mc_state(&self) -> &MinRefMcStateTracker {
103 &self.min_ref_mc_state
104 }
105
106 pub fn cell_storage(&self) -> &Arc<CellStorage> {
107 &self.cell_storage
108 }
109
110 pub fn load_mc_block_id(&self, seqno: u32) -> Result<Option<BlockId>> {
112 let snapshot = self.cells_db.rocksdb().snapshot();
113 self.find_mc_block_id(seqno, &snapshot)
114 }
115
116 pub async fn store_state_ignore_cache(
117 &self,
118 handle: &BlockHandle,
119 state: &ShardStateStuff,
120 hint: StoreStateHint,
121 ) -> Result<()> {
122 self.store_state_root_ignore_cache(
123 handle,
124 state.root_cell().clone(),
125 state.ref_mc_state_handle().clone(),
126 hint,
127 )
128 .await
129 }
130
131 pub async fn store_state_root_ignore_cache(
132 &self,
133 handle: &BlockHandle,
134 root: Cell,
135 ref_mc_state_handle: RefMcStateHandle,
136 hint: StoreStateHint,
137 ) -> Result<()> {
138 let root_cell = DirectStoreRoot::Exact {
139 root,
140 ref_mc_state_handle,
141 };
142 let complete = Arc::new(AtomicBool::new(false));
143
144 let f = self.make_store_state_root_direct_task(handle.clone(), root_cell, hint, complete);
145 tokio::task::spawn_blocking(move || f(None)).await??;
146
147 Ok(())
148 }
149
150 pub fn begin_store_next_state(
163 self: &Arc<Self>,
164 prev_handle: &BlockHandle,
165 next_handle: &BlockHandle,
166 merkle_update: &MerkleUpdate,
167 state: Option<ShardStateStuff>,
168 hint: StoreStateHint,
169 get_merkle_update: Option<Box<FnGetBlockInfoForApply>>,
170 ) -> Result<InitiatedStoreState> {
171 enum StoreType {
172 Direct,
173 Virtual,
174 }
175
176 let prev_block_id = prev_handle.id();
178 let block_id = next_handle.id();
179 anyhow::ensure!(
180 prev_block_id.shard == block_id.shard,
181 "handle shard mismatch: {} != {}",
182 prev_block_id.shard,
183 block_id.shard,
184 );
185 anyhow::ensure!(
186 prev_block_id.seqno + 1 == block_id.seqno,
187 "merkle update can only be applied to consecutive blocks: \
188 prev={prev_block_id}, next={block_id}",
189 );
190
191 if next_handle.has_state() || next_handle.has_virtual_state() {
192 return Ok(InitiatedStoreState::existing(next_handle, self));
193 }
194
195 let direct_store_step = if block_id.is_masterchain() {
196 NonZeroU32::MIN
198 } else {
199 self.store_shard_state_step
200 };
201
202 let mut cache = self.shard_states_cache.entry(block_id.shard).or_default();
203
204 if cache.accumulator.blocks.insert(block_id.seqno) {
205 cache.accumulator.new_cells = cache
206 .accumulator
207 .new_cells
208 .saturating_add(hint.new_cell_count());
209 }
210
211 metrics::histogram!("tycho_storage_shard_state_accumulated_new_cells")
212 .record(cache.accumulator.new_cells as f64);
213
214 let force_store = block_id.seqno.is_multiple_of(direct_store_step.get())
215 || cache.accumulator.new_cells >= self.new_cells_threshold
216 || hint.is_top_block == Some(true);
217
218 let store_type = if force_store {
219 metrics::counter!("tycho_storage_shard_state_stored").increment(1);
220
221 cache.accumulator.reset();
223
224 StoreType::Direct
225 } else {
226 metrics::counter!("tycho_storage_shard_state_skipped").increment(1);
227 StoreType::Virtual
228 };
229
230 let spawn_cleanup = |task: PendingStoreTask| {
231 let this = Arc::downgrade(self);
232 let block_id = *block_id;
233 tokio::task::spawn(async move {
234 let (result, _) = task.await;
235
236 let Some(this) = this.upgrade() else {
237 return;
238 };
239
240 this.shard_states_cache
241 .get_mut(&block_id.shard)
242 .expect("shard must not dissapear from cache")
243 .save_result(&block_id, result);
244 });
245 };
246
247 let task = {
248 let cache = &mut *cache;
249
250 let prev_state_task = cache
251 .states
252 .get(&prev_block_id.root_hash)
253 .map(|entry| entry.state.clone());
254
255 let make_prev_task_fut = move || {
256 let Some(cached) = prev_state_task else {
257 return self
258 .load_prev_state_root_no_cache(
259 next_handle.ref_by_mc_seqno(),
260 prev_block_id,
261 direct_store_step,
262 get_merkle_update,
263 )
264 .boxed();
265 };
266
267 Box::pin(async move {
268 match cached {
269 CachedState::Stored(res) => Ok(res),
270 CachedState::Failed(error) => Err(anyhow::Error::from(error)),
271 CachedState::Pending(task) => {
272 let (res, _) = task.await;
273 res.map_err(anyhow::Error::from)
274 }
275 }
276 })
277 };
278
279 match cache.states.entry(block_id.root_hash) {
280 hash_map::Entry::Occupied(entry) => match &entry.get().state {
281 CachedState::Pending(task) => task.clone(),
282 CachedState::Stored(res) => {
283 return Ok(InitiatedStoreState {
284 handle: next_handle.clone(),
285 pending: Some(futures_util::future::ok(res.state.clone()).boxed()),
286 storage: self.clone(),
287 });
288 }
289 CachedState::Failed(e) => return Err(e.clone().into()),
290 },
291 hash_map::Entry::Vacant(entry) => {
292 let complete = Arc::new(AtomicBool::new(false));
293 let is_virtual = matches!(store_type, StoreType::Virtual);
294 let partial_root = merkle_update.new.clone();
295 let task: PendingStoreTask = match store_type {
296 StoreType::Direct => {
297 let prev_task_fut;
298 let state_root = match state {
299 Some(state) => {
300 prev_task_fut = None;
301 DirectStoreRoot::Exact {
302 root: state.root_cell().clone(),
303 ref_mc_state_handle: state.ref_mc_state_handle().clone(),
304 }
305 }
306 None => {
307 prev_task_fut = Some(make_prev_task_fut());
308 DirectStoreRoot::Next { partial_root }
309 }
310 };
311
312 let store_task = self.make_store_state_root_direct_task(
313 next_handle.clone(),
314 state_root,
315 hint,
316 complete.clone(),
317 );
318
319 Shared::new(Box::pin(async move {
320 let prev = match prev_task_fut {
321 None => None,
322 Some(fut) => Some(
323 fut.await
324 .context("previous state task failed (on direct)")?,
325 ),
326 };
327 tokio::task::spawn_blocking(move || store_task(prev))
328 .await?
329 .context("direct state store failed")
330 .map_err(StoreStateError::from)
331 }))
332 }
333 StoreType::Virtual => {
334 let prev_task_fut = make_prev_task_fut();
335 let store_task = self.make_store_state_root_virtual_task(
336 next_handle.clone(),
337 partial_root,
338 hint,
339 complete.clone(),
340 );
341
342 Shared::new(Box::pin(async move {
343 let prev = prev_task_fut
344 .await
345 .context("previous state task failed (on virtual)")?;
346 tokio::task::spawn_blocking(move || store_task(prev))
347 .await?
348 .context("virtual state store failed")
349 .map_err(StoreStateError::from)
350 }))
351 }
352 };
353
354 spawn_cleanup(task.clone());
355
356 entry.insert(ShardStatesCacheItem {
357 prev_block_id: *prev_block_id,
358 block_id: block_id.as_short_id(),
359 is_virtual,
360 partial_root_cell: merkle_update.new.clone(),
361 state: CachedState::Pending(task.clone()),
362 complete,
363 });
364
365 task
366 }
367 }
368 };
369
370 metrics::gauge!(
371 ShardStatesCache::METRIC_CACHE_SIZE,
372 "workchain" => block_id.shard.workchain().to_string()
373 )
374 .set(clamp_u64_to_u32(cache.states.len() as _));
375
376 drop(cache);
377
378 Ok(InitiatedStoreState {
379 handle: next_handle.clone(),
380 pending: Some(Box::pin(async move {
381 let (result, _) = task.await;
382 Ok(result?.state)
383 })),
384 storage: self.clone(),
385 })
386 }
387
388 fn load_prev_state_root_no_cache(
389 &self,
390 ref_by_mc_seqno: u32,
391 block_id: &BlockId,
392 max_tail: NonZeroU32,
393 get_merkle_update: Option<Box<FnGetBlockInfoForApply>>,
394 ) -> impl Future<Output = Result<StateWithApplier>> + Send + 'static {
395 let block_id = *block_id;
396 let block_handles = self.block_handle_storage.clone();
397 let blocks = self.block_storage.clone();
398 let block_connections = self.block_connections.clone();
399 let cell_storage = self.cell_storage.clone();
400 let tracker = self.min_ref_mc_state.clone();
401
402 async move {
403 let max_tail = max_tail.get() as usize - 1;
404
405 let get_merkle_update: &FnGetBlockInfoForApply = match &get_merkle_update {
406 Some(f) => f,
407 None => &|_| None,
408 };
409
410 let mut to_apply = Vec::new();
411 let mut pivot_block_id = block_id;
412 let pivot = 'pivot: {
413 while to_apply.len() <= max_tail {
414 let res = load_state_or_update(
415 ref_by_mc_seqno,
416 &pivot_block_id,
417 &block_handles,
418 &block_connections,
419 &cell_storage,
420 &tracker,
421 get_merkle_update,
422 )
423 .context("failed to load state or update on first access")?;
424
425 match res {
426 None => break,
427 Some(FromStorage::Virtual(f)) => {
428 to_apply.push(f.partial_root);
429 pivot_block_id = f.prev_block_id;
430 }
431 Some(FromStorage::Applied(applied)) => {
432 break 'pivot Some(applied);
433 }
434 }
435
436 tokio::task::yield_now().await;
438 }
439
440 None
441 };
442
443 let Some(StateWithApplier { state, applier }) = pivot else {
445 anyhow::bail!(StateNotFound(pivot_block_id.as_short_id()));
446 };
447
448 if to_apply.is_empty() {
450 anyhow::ensure!(state.block_id() == &block_id, "loaded state id mismatch");
451 return Ok(StateWithApplier { state, applier });
452 }
453
454 let state =
456 apply_updates_chain(&block_id, state, to_apply, applier.clone(), blocks).await?;
457
458 Ok(StateWithApplier { state, applier })
459 }
460 }
461
462 fn make_store_state_root_virtual_task(
463 &self,
464 handle: BlockHandle,
465 partial_root: Cell,
466 hint: StoreStateHint,
467 complete: Arc<AtomicBool>,
468 ) -> impl FnOnce(StateWithApplier) -> Result<StateWithApplier> + Send + 'static {
469 let block_handles = self.block_handle_storage.clone();
470
471 let complete_on_drop = scopeguard::guard(complete, |c| c.store(true, Ordering::Release));
472
473 move |StateWithApplier { applier, state }| {
474 let _guard = complete_on_drop;
475 let _hist = HistogramGuard::begin("tycho_storage_cell_virtual_store_time_high");
476
477 debug_assert_eq!(
478 applier.shard(),
479 handle.id().shard,
480 "applier must always be created for the same shard"
481 );
482 debug_assert!(
483 applier.pivot_block_seqno() < handle.id().seqno,
484 "cannot use applier for the future"
485 );
486
487 let state = state.par_make_next_state(handle.id(), partial_root, &applier)?;
488 block_handles.set_has_virtual_shard_state(&handle);
489
490 applier.add_new_virtual_cells(hint.new_cell_count());
491
492 Ok::<_, anyhow::Error>(StateWithApplier { state, applier })
493 }
494 }
495
496 fn make_store_state_root_direct_task(
497 &self,
498 handle: BlockHandle,
499 root_cell: DirectStoreRoot,
500 hint: StoreStateHint,
501 complete: Arc<AtomicBool>,
502 ) -> impl FnOnce(Option<StateWithApplier>) -> Result<StateWithApplier> + Send + 'static {
503 let cells_db = self.cells_db.clone();
504 let cell_storage = self.cell_storage.clone();
505 let block_handles = self.block_handle_storage.clone();
506 let shard_split_depth = self.shard_split_depth;
507 let counters = self.counters.clone();
508 let gc_lock = self.gc_lock.clone();
509
510 let complete_on_drop = scopeguard::guard(complete, |c| c.store(true, Ordering::Release));
511
512 move |prev| {
513 let guard = complete_on_drop;
514
515 let block_id = handle.id();
516 let tracker;
517 let root_hash = match &root_cell {
518 DirectStoreRoot::Exact {
519 root,
520 ref_mc_state_handle,
521 } => {
522 tracker = ref_mc_state_handle.tracker().clone();
523 *root.repr_hash()
524 }
525 DirectStoreRoot::Next { partial_root } => {
526 let prev = prev
527 .as_ref()
528 .expect("prev must be specified when storing next direct state");
529 tracker = prev.applier.ref_mc_state_handle().tracker().clone();
530 *partial_root.hash(0)
531 }
532 };
533
534 let load_existing_state = || {
535 let epoch = handle.ref_by_mc_seqno();
536 let root = cell_storage.load_cell(&root_hash, epoch)?;
537 let root = Cell::from(root as Arc<_>);
538
539 track_max_epoch(epoch);
540
541 let shard_state = root
542 .parse::<Box<ShardStateUnsplit>>()
543 .with_context(|| format!("failed to parse existing state: {block_id}"))?;
544 let handle = tracker.insert(&shard_state);
545
546 let state = ShardStateStuff::from_state_and_root(
547 block_id,
548 shard_state,
549 root,
550 handle.clone(),
551 )?;
552
553 let applier =
554 MerkleUpdateApplier::new(epoch, block_id, cell_storage.clone(), handle);
555
556 Ok::<_, anyhow::Error>(StateWithApplier { state, applier })
557 };
558
559 if handle.has_state() {
561 return load_existing_state();
562 }
563
564 let virtual_cell_count;
566 let prev_ref_mc_state_handle;
567 let root_cell = match root_cell {
568 DirectStoreRoot::Exact {
570 root,
571 ref_mc_state_handle,
572 } => {
573 virtual_cell_count = 0;
574 prev_ref_mc_state_handle = ref_mc_state_handle;
575 root
576 }
577 DirectStoreRoot::Next { partial_root } => {
580 let prev = prev
581 .as_ref()
582 .expect("prev must be specified when storing next direct state");
583 debug_assert_eq!(
584 prev.applier.shard(),
585 handle.id().shard,
586 "applier must always be created for the same shard"
587 );
588 debug_assert!(
589 prev.applier.pivot_block_seqno() < handle.id().seqno,
590 "cannot use applier for the future"
591 );
592 virtual_cell_count = prev.applier.new_virtual_cells();
593 prev_ref_mc_state_handle = prev.applier.ref_mc_state_handle().clone();
594 prev.applier
595 .make_next_state(partial_root)
596 .context("failed to make next direct state")?
597 }
598 };
599 drop(prev);
600
601 if handle.has_state() {
603 return load_existing_state();
604 }
605
606 let gc_lock = {
608 let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high");
609 gc_lock.blocking_lock()
610 };
611
612 if handle.has_state() {
614 return load_existing_state();
615 }
616
617 let estimated_merkle_update_size = virtual_cell_count + hint.new_cell_count();
619 let estimated_update_size_bytes = estimated_merkle_update_size * 192; let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);
621
622 let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high");
623
624 let new_cell_count = if handle.is_masterchain() {
625 cell_storage.store_cell(
626 &mut batch,
627 root_cell.as_ref(),
628 estimated_merkle_update_size,
629 )?
630 } else {
631 let split_at = split_shard_accounts(&root_cell, shard_split_depth)?;
632
633 cell_storage.store_cell_mt(
634 root_cell.as_ref(),
635 &mut batch,
636 split_at,
637 estimated_merkle_update_size,
638 )?
639 };
640
641 in_mem_store.finish();
642
643 batch.put_cf(
644 &cells_db.shard_states.cf(),
645 block_id.to_vec(),
646 root_hash.as_slice(),
647 );
648
649 let _hist = HistogramGuard::begin("tycho_storage_state_update_time_high");
651
652 metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);
653 metrics::histogram!("tycho_storage_state_update_size_bytes")
654 .record(batch.size_in_bytes() as f64);
655 metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
656 .record(estimated_update_size_bytes as f64);
657
658 let counter = if handle.is_masterchain() {
659 &counters.max_new_mc_cell_count
660 } else {
661 &counters.max_new_sc_cell_count
662 };
663 counter.fetch_max(new_cell_count, Ordering::Release);
664
665 cells_db.rocksdb().write(batch)?;
666
667 drop(guard);
671
672 Reclaimer::instance().drop((root_cell, prev_ref_mc_state_handle));
674
675 block_handles.set_has_shard_state(&handle);
676
677 drop(gc_lock);
679
680 load_existing_state()
682 }
683 }
684
685 pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result<HashBytes> {
687 self.store_state_raw_inner(block_id, boc).await
688 }
689
690 pub async fn store_state_bytes(&self, block_id: &BlockId, boc: Bytes) -> Result<HashBytes> {
691 let cursor = Cursor::new(boc);
692 self.store_state_raw_inner(block_id, cursor).await
693 }
694
695 async fn store_state_raw_inner<R>(&self, block_id: &BlockId, boc: R) -> Result<HashBytes>
696 where
697 R: std::io::Read + Send + 'static,
698 {
699 let ctx = StoreStateContext {
700 cells_db: self.cells_db.clone(),
701 cell_storage: self.cell_storage.clone(),
702 temp_file_storage: self.temp_file_storage.clone(),
703 };
704
705 let block_id = *block_id;
706
707 let gc_lock = self.gc_lock.clone().lock_owned().await;
708 tokio::task::spawn_blocking(move || {
709 let _gc_lock = gc_lock;
711
712 ctx.store(&block_id, boc)
713 })
714 .await?
715 }
716
717 pub fn load_state(
719 &self,
720 ref_by_mc_seqno: u32,
721 block_id: &BlockId,
722 ) -> impl Future<Output = Result<ShardStateStuff>> {
723 self.load_state_ext(ref_by_mc_seqno, block_id, Default::default(), |_| None)
724 }
725
726 pub async fn load_state_ext<F>(
750 &self,
751 ref_by_mc_seqno: u32,
752 block_id: &BlockId,
753 hint: LoadStateHint,
754 get_merkle_update: F,
755 ) -> Result<ShardStateStuff>
756 where
757 F: Fn(&BlockId) -> Option<BlockInfoForApply>,
758 {
759 fn load_failed(error: StoreStateError) -> anyhow::Error {
760 anyhow::anyhow!("unable to load a state that failed to save with error: {error:?}")
761 }
762
763 let try_load_from_storage = |block_id: &BlockId| {
764 load_state_or_update(
765 ref_by_mc_seqno,
766 block_id,
767 &self.block_handle_storage,
768 &self.block_connections,
769 &self.cell_storage,
770 &self.min_ref_mc_state,
771 &get_merkle_update,
772 )
773 };
774
775 let max_tail = self.store_shard_state_step.get() as usize;
776
777 let mut pivot_block_id = *block_id;
778 let mut to_apply = Vec::new();
779 let pivot = 'pivot: {
780 while to_apply.len() <= max_tail {
781 if let Some(cache) = self.shard_states_cache.get(&pivot_block_id.shard)
782 && let Some(item) = cache.states.get(&pivot_block_id.root_hash)
783 {
784 match &item.state {
786 CachedState::Stored(stored) => {
788 break 'pivot Some(stored.clone());
789 }
790 CachedState::Failed(error) => return Err(load_failed(error.clone())),
792 CachedState::Pending(task)
800 if !hint.allow_ignore_direct
801 || item.is_virtual
802 || item.complete.load(Ordering::Acquire)
803 || to_apply.len() >= max_tail
804 || !cache.states.contains_key(&item.prev_block_id.root_hash) =>
805 {
806 let task = task.clone();
807 drop(cache);
808
809 let (result, _) = task.await;
810 break 'pivot Some(result.map_err(load_failed)?);
811 }
812 CachedState::Pending(_) => {
816 to_apply.push(ToApply::Loaded(item.partial_root_cell.clone()));
817 pivot_block_id = item.prev_block_id;
818 }
819 }
820 } else {
821 match try_load_from_storage(&pivot_block_id)? {
825 None => break,
827 Some(FromStorage::Virtual(f)) => {
829 to_apply.push(f.partial_root);
830 pivot_block_id = f.prev_block_id;
831 }
832 Some(FromStorage::Applied(applied)) => {
835 break 'pivot Some(applied);
836 }
837 }
838
839 tokio::task::yield_now().await;
841 }
842 }
843
844 None
846 };
847
848 let Some(StateWithApplier { state, applier }) = pivot else {
850 anyhow::bail!(StateNotFound(pivot_block_id.as_short_id()));
851 };
852
853 if to_apply.is_empty() {
855 anyhow::ensure!(state.block_id() == block_id, "loaded state id mismatch");
856 return Ok(state);
857 }
858
859 apply_updates_chain(
861 block_id,
862 state,
863 to_apply,
864 applier,
865 self.block_storage.clone(),
866 )
867 .await
868 }
869
870 pub fn load_state_root_hash(&self, block_id: &BlockId) -> Result<HashBytes> {
871 load_state_root_hash(&self.cells_db, block_id)
872 }
873
874 pub fn load_state_root_hash_opt(&self, block_id: &BlockId) -> Result<Option<HashBytes>> {
875 load_state_root_hash_opt(&self.cells_db, block_id)
876 }
877
878 #[tracing::instrument(skip(self))]
879 pub async fn remove_outdated_states(&self, mc_seqno: u32) -> Result<()> {
880 let Some(top_blocks) = self.compute_recent_blocks(mc_seqno).await? else {
882 tracing::warn!("recent blocks edge not found");
883 return Ok(());
884 };
885
886 let target_block_id = top_blocks.mc_block;
887 tracing::info!(%target_block_id, "started states GC");
888
889 let started_at = Instant::now();
890 let block_handle_storage = self.block_handle_storage.clone();
891 let cell_storage = self.cell_storage.clone();
892 let cells_db = self.cells_db.clone();
893 let gc_lock = self.gc_lock.clone();
894 let shard_split_depth = self.shard_split_depth;
895
896 let (removed_states, removed_cells) = tokio::task::spawn_blocking(move || {
897 let raw = cells_db.rocksdb();
898
899 let snapshot = raw.snapshot();
901 let shard_states_cf = cells_db.shard_states.get_unbounded_cf();
902 let mut states_read_options = cells_db.shard_states.new_read_config();
903 states_read_options.set_snapshot(&snapshot);
904
905 let mut alloc = bumpalo_herd::Herd::new();
906
907 let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options);
909 iter.seek_to_first();
910
911 let mut removed_states = 0usize;
913 let mut removed_cells = 0usize;
914
915 loop {
916 let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high");
917 let (key, value) = match iter.item() {
918 Some(item) => item,
919 None => match iter.status() {
920 Ok(()) => break,
921 Err(e) => return Err(e.into()),
922 },
923 };
924
925 let block_id = BlockId::from_slice(key);
926 let root_hash = HashBytes::from_slice(&value[0..32]);
927
928 if block_id.seqno == 0
934 || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno)
935 {
936 iter.next();
937 continue;
938 }
939
940 if let Some(handle) = block_handle_storage.load_handle(&block_id)
942 && handle.skip_states_gc()
943 {
944 tracing::debug!(
945 block_id = %block_id,
946 "skipping states GC since it flagged by SKIP_STATES_GC"
947 );
948 iter.next();
949 continue;
950 }
951
952 alloc.reset();
953
954 let guard = {
955 let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high");
956 gc_lock.blocking_lock()
957 };
958
959 let in_mem_remove =
960 HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high");
961
962 let (total, mut batch) = if block_id.is_masterchain() {
963 cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)?
964 } else {
965 let root_cell = Cell::from(cell_storage.load_cell(&root_hash, 0)? as Arc<_>);
968
969 let split_at = split_shard_accounts(&root_cell, shard_split_depth)?
970 .into_keys()
971 .collect::<FastHashSet<HashBytes>>();
972 cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)?
973 };
974
975 in_mem_remove.finish();
976
977 batch.delete_cf(&cells_db.shard_states.get_unbounded_cf().bound(), key);
978 cells_db
979 .raw()
980 .rocksdb()
981 .write_opt(batch, cells_db.cells.write_config())?;
982
983 drop(guard);
985
986 removed_cells += total;
987 tracing::debug!(removed_cells = total, %block_id);
988
989 removed_states += 1;
990 iter.next();
991
992 metrics::counter!("tycho_storage_state_gc_count").increment(1);
993 metrics::counter!("tycho_storage_state_gc_cells_count").increment(1);
994 if block_id.is_masterchain() {
995 metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64);
996 }
997 tracing::debug!(removed_states, removed_cells, %block_id, "removed state");
998 }
999
1000 Ok::<_, anyhow::Error>((removed_states, removed_cells))
1001 })
1002 .await??;
1003
1004 tracing::info!(
1006 removed_states,
1007 removed_cells,
1008 block_id = %target_block_id,
1009 elapsed_sec = started_at.elapsed().as_secs_f64(),
1010 "finished states GC",
1011 );
1012 Ok(())
1013 }
1014
1015 pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result<Option<TopBlocks>> {
1019 if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno()
1021 && min_ref_mc_seqno < mc_seqno
1022 {
1023 mc_seqno = min_ref_mc_seqno;
1024 }
1025
1026 let snapshot = self.cells_db.rocksdb().snapshot();
1027
1028 let mc_block_id = match self
1032 .find_mc_block_id(mc_seqno, &snapshot)
1033 .context("Failed to find block id by seqno")?
1034 {
1035 Some(block_id) => block_id,
1036 None => return Ok(None),
1037 };
1038
1039 let handle = match self.block_handle_storage.load_handle(&mc_block_id) {
1041 Some(handle) if handle.has_data() => handle,
1042 _ => return Ok(None),
1044 };
1045
1046 let block_data = self.block_storage.load_block_data(&handle).await?;
1049 let block_info = block_data
1050 .load_info()
1051 .context("Failed to read target block info")?;
1052
1053 let min_ref_mc_seqno = block_info.min_ref_mc_seqno;
1055 let min_ref_block_id = match self.find_mc_block_id(min_ref_mc_seqno, &snapshot)? {
1056 Some(block_id) => block_id,
1057 None => return Ok(None),
1058 };
1059
1060 let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) {
1062 Some(handle) if handle.has_data() => handle,
1063 _ => return Ok(None),
1065 };
1066
1067 self.block_storage
1069 .load_block_data(&min_ref_block_handle)
1070 .await
1071 .and_then(|block_data| TopBlocks::from_mc_block(&block_data))
1072 .map(Some)
1073 }
1074
1075 fn find_mc_block_id(
1076 &self,
1077 mc_seqno: u32,
1078 snapshot: &rocksdb::Snapshot<'_>,
1079 ) -> Result<Option<BlockId>> {
1080 let shard_states = &self.cells_db.shard_states;
1081
1082 let mut bound = BlockId {
1083 shard: ShardIdent::MASTERCHAIN,
1084 seqno: mc_seqno,
1085 root_hash: HashBytes::ZERO,
1086 file_hash: HashBytes::ZERO,
1087 };
1088
1089 let mut readopts = shard_states.new_read_config();
1090 readopts.set_snapshot(snapshot);
1091 readopts.set_iterate_lower_bound(bound.to_vec().as_slice());
1092 bound.seqno += 1;
1093 readopts.set_iterate_upper_bound(bound.to_vec().as_slice());
1094
1095 let mut iter = self
1096 .cells_db
1097 .rocksdb()
1098 .raw_iterator_cf_opt(&shard_states.cf(), readopts);
1099 iter.seek_to_first();
1100
1101 Ok(iter.key().map(BlockId::from_slice))
1102 }
1103
1104 #[cfg(test)]
1105 fn contains_state(&self, block_id: &BlockId) -> Result<bool> {
1106 let shard_states = &self.cells_db.shard_states;
1107 Ok(shard_states.get(block_id.to_vec())?.is_some())
1108 }
1109}
1110
1111fn load_state_by_hash(
1112 ref_by_mc_seqno: u32,
1113 block_id: &BlockId,
1114 root_hash: &HashBytes,
1115 cell_storage: &Arc<CellStorage>,
1116 tracker: &MinRefMcStateTracker,
1117) -> Result<ShardStateStuff> {
1118 let root = cell_storage.load_cell(root_hash, ref_by_mc_seqno)?;
1119 let root = Cell::from(root as Arc<_>);
1120
1121 track_max_epoch(ref_by_mc_seqno);
1122 let shard_state = root.parse::<Box<ShardStateUnsplit>>()?;
1123 let handle = tracker.insert(&shard_state);
1124 ShardStateStuff::from_state_and_root(block_id, shard_state, root, handle)
1125}
1126
1127fn load_state_root_hash(cells_db: &CellsDb, block_id: &BlockId) -> Result<HashBytes> {
1128 match load_state_root_hash_opt(cells_db, block_id)? {
1129 Some(hash) => Ok(hash),
1130 None => anyhow::bail!(StateNotFound(block_id.as_short_id())),
1131 }
1132}
1133
1134fn load_state_root_hash_opt(cells_db: &CellsDb, block_id: &BlockId) -> Result<Option<HashBytes>> {
1135 let Some(root) = cells_db.shard_states.get(block_id.to_vec())? else {
1136 return Ok(None);
1137 };
1138 Ok(Some(HashBytes::from_slice(&root[..32])))
1139}
1140
1141enum ToApply {
1142 Loaded(Cell),
1143 Deferred(BlockHandle),
1144}
1145
1146enum FromStorage {
1147 Applied(StateWithApplier),
1148 Virtual(VirtualBlockInfo),
1149}
1150
1151fn load_state_or_update<F>(
1152 ref_by_mc_seqno: u32,
1153 block_id: &BlockId,
1154 block_handles: &BlockHandleStorage,
1155 block_connections: &BlockConnectionStorage,
1156 cell_storage: &Arc<CellStorage>,
1157 tracker: &MinRefMcStateTracker,
1158 get_merkle_update: F,
1159) -> Result<Option<FromStorage>>
1160where
1161 F: Fn(&BlockId) -> Option<BlockInfoForApply>,
1162{
1163 if let Some(root_hash) = load_state_root_hash_opt(cell_storage.db(), block_id)? {
1164 let state =
1165 load_state_by_hash(ref_by_mc_seqno, block_id, &root_hash, cell_storage, tracker)?;
1166 let ref_mc_state_handle = state.ref_mc_state_handle().clone();
1167 return Ok(Some(FromStorage::Applied(StateWithApplier {
1168 state,
1169 applier: MerkleUpdateApplier::new(
1170 ref_by_mc_seqno,
1171 block_id,
1172 cell_storage.clone(),
1173 ref_mc_state_handle,
1174 ),
1175 })));
1176 }
1177
1178 let handle = match block_handles.load_handle(block_id) {
1179 Some(handle) => handle,
1180 None => {
1181 return Ok(get_merkle_update(block_id).map(|f| {
1182 FromStorage::Virtual(VirtualBlockInfo {
1183 prev_block_id: f.prev_block_id,
1184 partial_root: ToApply::Loaded(f.partial_root_cell),
1185 })
1186 }));
1187 }
1188 };
1189
1190 Ok(Some(FromStorage::Virtual(
1191 match get_merkle_update(block_id) {
1192 Some(f) => VirtualBlockInfo {
1193 prev_block_id: f.prev_block_id,
1194 partial_root: ToApply::Loaded(f.partial_root_cell),
1195 },
1196 None => {
1197 if !handle.has_data() {
1198 return Ok(None);
1199 }
1200
1201 let Some(prev_block_id) =
1202 block_connections.load_connection(block_id, BlockConnection::Prev1)
1203 else {
1204 anyhow::bail!("prev block id not found for {block_id}");
1205 };
1206
1207 VirtualBlockInfo {
1208 prev_block_id,
1209 partial_root: ToApply::Deferred(handle),
1210 }
1211 }
1212 },
1213 )))
1214}
1215
1216async fn apply_updates_chain(
1217 block_id: &BlockId,
1218 pivot_state: ShardStateStuff,
1219 mut to_apply: Vec<ToApply>,
1220 applier: Arc<MerkleUpdateApplier>,
1221 blocks: Arc<BlockStorage>,
1222) -> Result<ShardStateStuff> {
1223 let ref_mc_state_handle = pivot_state.ref_mc_state_handle().clone();
1224 let mut pivot_root = pivot_state.root_cell().clone();
1225 drop(pivot_state);
1226
1227 let pivot_root = rayon_run(move || {
1228 while let Some(item) = to_apply.pop() {
1229 let partial_new_root = match item {
1230 ToApply::Loaded(cell) => cell,
1231 ToApply::Deferred(handle) => {
1232 let block = blocks.blocking_load_block_data(&handle)?;
1233 block.as_ref().load_state_update()?.new
1234 }
1235 };
1236
1237 pivot_root = applier
1238 .make_next_state(partial_new_root)
1239 .context("failed to apply next state for chain from storage")?;
1240 }
1241
1242 Ok::<_, anyhow::Error>(pivot_root)
1243 })
1244 .await?;
1245
1246 let shard_state = pivot_root.parse::<Box<ShardStateUnsplit>>()?;
1247 ShardStateStuff::from_state_and_root(block_id, shard_state, pivot_root, ref_mc_state_handle)
1248}
1249
1250pub struct InitiatedStoreState {
1251 handle: BlockHandle,
1252 pending: Option<BoxFuture<'static, Result<ShardStateStuff>>>,
1253 storage: Arc<ShardStateStorage>,
1254}
1255
1256impl InitiatedStoreState {
1257 fn existing(handle: &BlockHandle, storage: &Arc<ShardStateStorage>) -> Self {
1258 Self {
1259 handle: handle.clone(),
1260 pending: None,
1261 storage: storage.clone(),
1262 }
1263 }
1264
1265 pub fn handle(&self) -> &BlockHandle {
1266 &self.handle
1267 }
1268
1269 pub async fn wait_store_only(self) -> Result<()> {
1270 if let Some(task) = self.pending {
1271 task.await?;
1272 }
1273 Ok(())
1274 }
1275
1276 pub async fn wait_reload(self) -> Result<ShardStateStuff> {
1277 match self.pending {
1278 None => {
1279 self.storage
1280 .load_state(self.handle.ref_by_mc_seqno(), self.handle.id())
1281 .await
1282 }
1283 Some(pending) => pending.await,
1284 }
1285 }
1286}
1287
1288fn track_max_epoch(epoch: u32) {
1289 static MAX_KNOWN_EPOCH: AtomicU32 = AtomicU32::new(0);
1291
1292 let max_known_epoch = MAX_KNOWN_EPOCH
1293 .fetch_max(epoch, Ordering::Relaxed)
1294 .max(epoch);
1295 metrics::gauge!("tycho_storage_state_max_epoch").set(max_known_epoch);
1296}
1297
1298#[derive(Default)]
1299struct ShardStatesCache {
1300 pivot_block_seqno: u32,
1301 accumulator: ShardCellsAccumulator,
1302 states: FastHashMap<HashBytes, ShardStatesCacheItem>,
1303}
1304
1305#[derive(Default)]
1306struct ShardCellsAccumulator {
1307 new_cells: usize,
1308 blocks: FastHashSet<u32>,
1309}
1310
1311impl ShardCellsAccumulator {
1312 fn reset(&mut self) {
1313 self.new_cells = 0;
1314
1315 self.blocks.clear();
1317 }
1318}
1319
1320impl ShardStatesCache {
1321 const METRIC_PIVOT_SEQNO: &str = "tycho_storage_state_shard_cache_pivot_seqno";
1322 const METRIC_CACHE_SIZE: &str = "tycho_storage_state_shard_cache_size";
1323
1324 fn save_result(&mut self, block_id: &BlockId, result: StoreTaskResult) {
1325 let Some(item) = self.states.get_mut(&block_id.root_hash) else {
1326 return;
1327 };
1328
1329 match result {
1330 Ok(res) => {
1331 let pivot_block_seqno = res.applier.pivot_block_seqno();
1332 item.state = CachedState::Stored(res);
1333
1334 if !item.is_virtual && pivot_block_seqno > self.pivot_block_seqno {
1336 self.pivot_block_seqno = pivot_block_seqno;
1337 self.states
1338 .retain(|_, item| item.block_id.seqno >= pivot_block_seqno);
1339
1340 let labels = [("workchain", block_id.shard.workchain().to_string())];
1341 metrics::gauge!(Self::METRIC_PIVOT_SEQNO, &labels).set(pivot_block_seqno);
1342 metrics::gauge!(Self::METRIC_CACHE_SIZE, &labels)
1343 .set(clamp_u64_to_u32(self.states.len() as _));
1344 }
1345 }
1346 Err(error) => {
1347 tracing::error!(%block_id, "store state failed: {error:?}");
1348 item.state = CachedState::Failed(error);
1349 }
1350 }
1351 }
1352}
1353
1354enum DirectStoreRoot {
1355 Exact {
1356 root: Cell,
1357 ref_mc_state_handle: RefMcStateHandle,
1358 },
1359 Next {
1360 partial_root: Cell,
1361 },
1362}
1363
1364struct ShardStatesCacheItem {
1365 prev_block_id: BlockId,
1366 block_id: BlockIdShort,
1367 is_virtual: bool,
1368 partial_root_cell: Cell,
1369 state: CachedState,
1370 complete: Arc<AtomicBool>,
1371}
1372
1373impl Drop for ShardStatesCacheItem {
1374 fn drop(&mut self) {
1375 Reclaimer::instance().drop(std::mem::take(&mut self.partial_root_cell));
1376 }
1377}
1378
1379#[derive(Clone)]
1380enum CachedState {
1381 Pending(PendingStoreTask),
1382 Stored(StateWithApplier),
1383 Failed(StoreStateError),
1384}
1385
1386#[derive(Clone)]
1387struct StateWithApplier {
1388 state: ShardStateStuff,
1389 applier: Arc<MerkleUpdateApplier>,
1390}
1391
1392type PendingStoreTask = Shared<BoxFuture<'static, StoreTaskResult>>;
1393type StoreTaskResult = Result<StateWithApplier, StoreStateError>;
1394
1395#[derive(Clone, thiserror::Error)]
1396#[error(transparent)]
1397struct StoreStateError(Arc<anyhow::Error>);
1398
1399impl std::fmt::Debug for StoreStateError {
1400 #[inline]
1401 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1402 std::fmt::Debug::fmt(self.0.as_ref(), f)
1403 }
1404}
1405
1406impl From<anyhow::Error> for StoreStateError {
1407 fn from(value: anyhow::Error) -> Self {
1408 Self(Arc::new(value))
1409 }
1410}
1411
1412impl From<tokio::task::JoinError> for StoreStateError {
1413 fn from(value: tokio::task::JoinError) -> Self {
1414 Self(Arc::new(anyhow::anyhow!("task failed: {value}")))
1415 }
1416}
1417
1418struct MerkleUpdateApplier(ManuallyDrop<MerkleUpdateApplierInner>);
1421
1422struct MerkleUpdateApplierInner {
1423 shard: ShardIdent,
1424 pivot_block_seqno: u32,
1425 applier: ParMerkleUpdateApplier<'static, MerkleCellsProvider>,
1426 ref_mc_state_handle: RefMcStateHandle,
1427 new_virtual_cells: AtomicUsize,
1428 metric_total_new_cells: AtomicUsize,
1430}
1431
1432impl MerkleUpdateApplier {
1433 const METRIC_ALIVE_APPLIERS: &str = "tycho_storage_state_applier_count";
1434 const METRIC_NEW_CELL_COUNT: &str = "tycho_storage_state_applier_all_new_cell_count";
1435
1436 fn new(
1437 epoch: u32,
1438 pivot_block_id: &BlockId,
1439 cell_storage: Arc<CellStorage>,
1440 ref_mc_state_handle: RefMcStateHandle,
1441 ) -> Arc<Self> {
1442 let v = ALIVE_STATE_APPLIERS.fetch_add(1, Ordering::Relaxed) + 1;
1443 metrics::gauge!(Self::METRIC_ALIVE_APPLIERS).set(v);
1444
1445 Arc::new(Self(ManuallyDrop::new(MerkleUpdateApplierInner {
1446 shard: pivot_block_id.shard,
1447 pivot_block_seqno: pivot_block_id.seqno,
1448 applier: ParMerkleUpdateApplier {
1449 new_cells: Default::default(),
1450 total_new_cells: Default::default(),
1451 context: Cell::empty_context(),
1452 find_cell: MerkleCellsProvider {
1453 epoch,
1454 storage: cell_storage,
1455 },
1456 find_in_new_cells: true,
1457 },
1458 ref_mc_state_handle,
1459 new_virtual_cells: AtomicUsize::new(0),
1460 metric_total_new_cells: AtomicUsize::new(0),
1461 })))
1462 }
1463
1464 fn shard(&self) -> ShardIdent {
1465 self.0.shard
1466 }
1467
1468 fn pivot_block_seqno(&self) -> u32 {
1469 self.0.pivot_block_seqno
1470 }
1471
1472 fn add_new_virtual_cells(&self, count: usize) {
1473 self.0.new_virtual_cells.fetch_add(count, Ordering::Release);
1474 }
1475
1476 fn new_virtual_cells(&self) -> usize {
1477 self.0.new_virtual_cells.load(Ordering::Acquire)
1478 }
1479
1480 fn ref_mc_state_handle(&self) -> &RefMcStateHandle {
1481 &self.0.ref_mc_state_handle
1482 }
1483
1484 fn make_next_state(&self, partial_new_root: Cell) -> Result<Cell, tycho_types::error::Error> {
1485 if let Some(cell) = self.find_cell.find_cell(partial_new_root.hash(0)) {
1486 return Ok(cell);
1487 }
1488
1489 let new = rayon::scope(|scope| self.run(partial_new_root.as_ref(), 0, 0, Some(scope)))?;
1490 let res = new.resolve(Cell::empty_context());
1491
1492 let diff = self.total_new_cells.swap(0, Ordering::Relaxed);
1495 self.0
1496 .metric_total_new_cells
1497 .fetch_add(diff, Ordering::Release);
1498
1499 let v = NEW_CELL_COUNT
1500 .fetch_add(diff as u64, Ordering::Release)
1501 .saturating_add(diff as u64);
1502 metrics::gauge!(Self::METRIC_NEW_CELL_COUNT).set(clamp_u64_to_u32(v));
1503
1504 res
1505 }
1506}
1507
1508impl Drop for MerkleUpdateApplier {
1509 fn drop(&mut self) {
1510 let mut inner = unsafe { ManuallyDrop::take(&mut self.0) };
1512
1513 Reclaimer::instance().drop((inner.applier.new_cells, inner.ref_mc_state_handle));
1516
1517 let v = ALIVE_STATE_APPLIERS.fetch_sub(1, Ordering::Relaxed) - 1;
1518 metrics::gauge!(Self::METRIC_ALIVE_APPLIERS).set(v);
1519
1520 let diff = *inner.metric_total_new_cells.get_mut() as u64;
1521 let v = NEW_CELL_COUNT
1522 .fetch_sub(diff, Ordering::Release)
1523 .saturating_sub(diff);
1524 metrics::gauge!(Self::METRIC_NEW_CELL_COUNT).set(clamp_u64_to_u32(v));
1525 }
1526}
1527
1528impl std::ops::Deref for MerkleUpdateApplier {
1529 type Target = ParMerkleUpdateApplier<'static, MerkleCellsProvider>;
1530
1531 #[inline]
1532 fn deref(&self) -> &Self::Target {
1533 &self.0.applier
1534 }
1535}
1536
1537fn clamp_u64_to_u32(value: u64) -> u32 {
1538 u32::try_from(value).unwrap_or(u32::MAX)
1539}
1540
1541static ALIVE_STATE_APPLIERS: AtomicU32 = AtomicU32::new(0);
1542static NEW_CELL_COUNT: AtomicU64 = AtomicU64::new(0);
1543
1544struct MerkleCellsProvider {
1545 epoch: u32,
1546 storage: Arc<CellStorage>,
1547}
1548
1549impl FindCell for MerkleCellsProvider {
1550 fn find_cell(&self, hash: &HashBytes) -> Option<Cell> {
1551 let cell = self.storage.load_cell(hash, self.epoch).ok()?;
1552 Some(Cell::from(cell as Arc<_>))
1553 }
1554}
1555
1556#[derive(Default, Debug, Clone, Copy)]
1557pub struct StoreStateHint {
1558 pub block_data_size: usize,
1559 pub new_cell_count: Option<usize>,
1560 pub is_top_block: Option<bool>,
1561}
1562
1563impl StoreStateHint {
1564 fn new_cell_count(&self) -> usize {
1565 match self.new_cell_count {
1566 None => estimate_cell_count(self.block_data_size),
1567 Some(count) => count,
1568 }
1569 }
1570}
1571
1572fn estimate_cell_count(block_data_size: usize) -> usize {
1573 ((3889.9821 + 14.7480 * (block_data_size as f64).sqrt()) as usize).next_power_of_two()
1576}
1577
1578#[derive(Default, Debug, Clone, Copy)]
1579pub struct LoadStateHint {
1580 pub allow_ignore_direct: bool,
1583}
1584
1585#[derive(Debug, Copy, Clone)]
1586pub struct ShardStateStorageMetrics {
1587 pub max_new_mc_cell_count: usize,
1588 pub max_new_sc_cell_count: usize,
1589}
1590
1591#[derive(Default)]
1592struct StoreStateCounters {
1593 max_new_mc_cell_count: AtomicUsize,
1594 max_new_sc_cell_count: AtomicUsize,
1595}
1596
1597#[derive(thiserror::Error, Debug)]
1598#[error("shard state not found for block: {0}")]
1599pub struct StateNotFound(pub BlockIdShort);
1600
1601pub fn split_shard_accounts(
1602 root_cell: impl AsRef<DynCell>,
1603 split_depth: u8,
1604) -> Result<FastHashMap<HashBytes, Cell>> {
1605 let shard_accounts = root_cell
1608 .as_ref()
1609 .reference_cloned(1)
1610 .context("invalid shard state")?
1611 .parse::<ShardAccounts>()
1612 .context("failed to load shard accounts")?;
1613
1614 split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts")
1615}
1616
1617#[derive(Debug, Clone)]
1618pub struct BlockInfoForApply {
1619 pub prev_block_id: BlockId,
1620 pub partial_root_cell: Cell,
1621}
1622
1623type FnGetBlockInfoForApply = dyn Fn(&BlockId) -> Option<BlockInfoForApply> + Send + Sync + 'static;
1624
1625struct VirtualBlockInfo {
1626 prev_block_id: BlockId,
1627 partial_root: ToApply,
1628}
1629
1630#[cfg(test)]
1631mod tests {
1632 use anyhow::Result;
1633 use tycho_block_util::archive::WithArchiveData;
1634 use tycho_block_util::block::BlockStuff;
1635 use tycho_block_util::state::ShardStateStuff;
1636 use tycho_storage::StorageContext;
1637 use tycho_types::boc::BocRepr;
1638 use tycho_types::cell::{CellBuilder, Lazy};
1639 use tycho_types::models::{
1640 BlockExtra, BlockId, BlockInfo, McBlockExtra, ShardHashes, ShardIdent, ShardStateUnsplit,
1641 };
1642
1643 use crate::storage::{CoreStorage, CoreStorageConfig, NewBlockMeta};
1644
1645 #[tokio::test]
1646 async fn states_gc_skip_lifecycle() -> Result<()> {
1647 let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
1648 let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
1649
1650 let handles = storage.block_handle_storage();
1651 let blocks = storage.block_storage();
1652 let states = storage.shard_state_storage();
1653
1654 let target = 10u32;
1655 let prev = target - 1;
1656
1657 let top = BlockStuff::new_with(ShardIdent::MASTERCHAIN, target, |block| {
1658 let info = BlockInfo {
1659 shard: ShardIdent::MASTERCHAIN,
1660 seqno: target,
1661 min_ref_mc_seqno: target,
1662 ..Default::default()
1663 };
1664 block.info = Lazy::new(&info).unwrap();
1665
1666 let extra = BlockExtra {
1667 custom: Some(
1668 Lazy::new(&McBlockExtra {
1669 shards: ShardHashes::default(),
1670 ..Default::default()
1671 })
1672 .unwrap(),
1673 ),
1674 ..Default::default()
1675 };
1676 block.extra = Lazy::new(&extra).unwrap();
1677 });
1678 let top_id = *top.id();
1679
1680 let data = BocRepr::encode_rayon(top.as_ref()).unwrap();
1681 let top = WithArchiveData::new(top, data);
1682
1683 let stored = blocks
1684 .store_block_data(&top, &top.archive_data, NewBlockMeta {
1685 is_key_block: false,
1686 gen_utime: 0,
1687 ref_by_mc_seqno: target,
1688 })
1689 .await?;
1690 let handle = stored.handle;
1691
1692 let prev_id = *BlockStuff::new_empty(ShardIdent::MASTERCHAIN, prev).id();
1693
1694 let make_state = |id: BlockId| -> Result<ShardStateStuff> {
1695 let state = ShardStateUnsplit {
1696 shard_ident: id.shard,
1697 seqno: id.seqno,
1698 min_ref_mc_seqno: target,
1699 ..Default::default()
1700 };
1701
1702 let root = CellBuilder::build_from(&state)?;
1703 let handle = states.min_ref_mc_state().insert_untracked();
1704 ShardStateStuff::from_state_and_root(&id, Box::new(state), root, handle)
1705 };
1706
1707 let top_state = make_state(top_id)?;
1708 states
1709 .store_state_ignore_cache(&handle, &top_state, Default::default())
1710 .await?;
1711
1712 let prev_state = make_state(prev_id)?;
1713 let (handle, _) = handles.create_or_load_handle(&prev_id, NewBlockMeta {
1714 is_key_block: false,
1715 gen_utime: 0,
1716 ref_by_mc_seqno: prev,
1717 });
1718 states
1719 .store_state_ignore_cache(&handle, &prev_state, Default::default())
1720 .await?;
1721
1722 handles.set_skip_states_gc(&handle);
1723 assert!(handle.skip_states_gc());
1724
1725 states.remove_outdated_states(target).await?;
1726 assert!(states.contains_state(&prev_id)?);
1727 assert!(states.contains_state(&top_id)?);
1728
1729 handles.set_skip_states_gc_finished(&handle);
1730 assert!(!handle.skip_states_gc());
1731
1732 states.remove_outdated_states(target).await?;
1733 assert!(!states.contains_state(&prev_id)?);
1734 assert!(states.contains_state(&top_id)?);
1735
1736 Ok(())
1737 }
1738}