//! Shard state storage (`tycho_core/storage/shard_state/mod.rs`).
1use std::collections::hash_map;
2use std::fs::File;
3use std::io::Cursor;
4use std::mem::ManuallyDrop;
5use std::num::NonZeroU32;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
8use std::time::Instant;
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use futures_util::FutureExt;
13use futures_util::future::BoxFuture;
14use tycho_block_util::block::*;
15use tycho_block_util::dict::split_aug_dict_raw;
16use tycho_block_util::state::*;
17use tycho_storage::fs::TempFileStorage;
18use tycho_storage::kv::StoredValue;
19use tycho_types::merkle::{FindCell, MerkleUpdate, ParMerkleUpdateApplier};
20use tycho_types::models::*;
21use tycho_types::prelude::*;
22use tycho_util::futures::Shared;
23use tycho_util::mem::Reclaimer;
24use tycho_util::metrics::HistogramGuard;
25use tycho_util::sync::rayon_run;
26use tycho_util::{FastDashMap, FastHashMap, FastHashSet};
27use weedb::rocksdb;
28
29use self::cell_storage::*;
30use self::store_state_raw::StoreStateContext;
31use super::{
32    BlockConnectionStorage, BlockHandle, BlockHandleStorage, BlockStorage, CellsDb,
33    CoreStorageConfig,
34};
35use crate::storage::BlockConnection;
36
37mod cell_storage;
38mod entries_buffer;
39mod store_state_raw;
40
/// Storage of shard states: persists state root cells into the cells db and
/// keeps a per-shard cache of pending/stored state tasks.
pub struct ShardStateStorage {
    cells_db: CellsDb,

    block_handle_storage: Arc<BlockHandleStorage>,
    block_storage: Arc<BlockStorage>,
    cell_storage: Arc<CellStorage>,
    block_connections: Arc<BlockConnectionStorage>,
    temp_file_storage: TempFileStorage,

    // Serializes state stores against cell GC (held while writing cell trees).
    gc_lock: Arc<tokio::sync::Mutex<()>>,
    min_ref_mc_state: MinRefMcStateTracker,
    counters: Arc<StoreStateCounters>,

    // Depth at which shard accounts are split for multi-threaded cell stores.
    shard_split_depth: u8,
    // Accumulated-new-cells threshold that forces a direct state store.
    new_cells_threshold: usize,
    // Store a direct state at least every N blocks (forced to 1 for masterchain).
    store_shard_state_step: NonZeroU32,

    // Per-shard cache of state store tasks, keyed by state root hash.
    shard_states_cache: FastDashMap<ShardIdent, ShardStatesCache>,
}
60
61impl ShardStateStorage {
62    // TODO: Replace args with a config.
63    pub fn new(
64        cells_db: CellsDb,
65        block_handle_storage: Arc<BlockHandleStorage>,
66        block_storage: Arc<BlockStorage>,
67        block_connections: Arc<BlockConnectionStorage>,
68        temp_file_storage: TempFileStorage,
69        config: &CoreStorageConfig,
70    ) -> Result<Arc<Self>> {
71        let cell_storage = CellStorage::new(
72            cells_db.clone(),
73            config.cells_cache_size,
74            config.drop_interval,
75        );
76
77        Ok(Arc::new(Self {
78            cells_db,
79            block_handle_storage,
80            block_storage,
81            temp_file_storage,
82            cell_storage,
83            block_connections,
84            shard_split_depth: config.shard_split_depth,
85            new_cells_threshold: config.max_new_cells_threshold,
86            store_shard_state_step: config.store_shard_state_step,
87            gc_lock: Default::default(),
88            min_ref_mc_state: MinRefMcStateTracker::new(),
89            counters: Default::default(),
90            shard_states_cache: Default::default(),
91        }))
92    }
93
94    pub fn metrics(&self) -> ShardStateStorageMetrics {
95        let counters = self.counters.as_ref();
96        ShardStateStorageMetrics {
97            max_new_mc_cell_count: counters.max_new_mc_cell_count.swap(0, Ordering::AcqRel),
98            max_new_sc_cell_count: counters.max_new_sc_cell_count.swap(0, Ordering::AcqRel),
99        }
100    }
101
    /// Tracker of the minimal referenced masterchain state.
    pub fn min_ref_mc_state(&self) -> &MinRefMcStateTracker {
        &self.min_ref_mc_state
    }
105
    /// Underlying cell storage.
    pub fn cell_storage(&self) -> &Arc<CellStorage> {
        &self.cell_storage
    }
109
110    /// Find mc block id from db snapshot
111    pub fn load_mc_block_id(&self, seqno: u32) -> Result<Option<BlockId>> {
112        let snapshot = self.cells_db.rocksdb().snapshot();
113        self.find_mc_block_id(seqno, &snapshot)
114    }
115
116    pub async fn store_state_ignore_cache(
117        &self,
118        handle: &BlockHandle,
119        state: &ShardStateStuff,
120        hint: StoreStateHint,
121    ) -> Result<()> {
122        self.store_state_root_ignore_cache(
123            handle,
124            state.root_cell().clone(),
125            state.ref_mc_state_handle().clone(),
126            hint,
127        )
128        .await
129    }
130
131    pub async fn store_state_root_ignore_cache(
132        &self,
133        handle: &BlockHandle,
134        root: Cell,
135        ref_mc_state_handle: RefMcStateHandle,
136        hint: StoreStateHint,
137    ) -> Result<()> {
138        let root_cell = DirectStoreRoot::Exact {
139            root,
140            ref_mc_state_handle,
141        };
142        let complete = Arc::new(AtomicBool::new(false));
143
144        let f = self.make_store_state_root_direct_task(handle.clone(), root_cell, hint, complete);
145        tokio::task::spawn_blocking(move || f(None)).await??;
146
147        Ok(())
148    }
149
    /// Stores a state for the specified pair of blocks
    /// using their merkle update and an optional state root.
    ///
    /// ## How does it work
    ///
    /// State must be stored into [`CellsDb`] at least every [`store_shard_state_step`]
    /// blocks (where it is forced to be `1` for masterchain). When we first initialize
    /// the cache state for a shard we load the closest direct state for the target
    /// with all required "virtual" states. After that we decide how should the next
    /// state be stored (as "direct" or "virtual") and store a spawned task into the cache.
    ///
    /// [`store_shard_state_step`]: CoreStorageConfig::store_shard_state_step
    pub fn begin_store_next_state(
        self: &Arc<Self>,
        prev_handle: &BlockHandle,
        next_handle: &BlockHandle,
        merkle_update: &MerkleUpdate,
        state: Option<ShardStateStuff>,
        hint: StoreStateHint,
        get_merkle_update: Option<Box<FnGetBlockInfoForApply>>,
    ) -> Result<InitiatedStoreState> {
        enum StoreType {
            Direct,
            Virtual,
        }

        // TODO: Consider splits.
        let prev_block_id = prev_handle.id();
        let block_id = next_handle.id();
        anyhow::ensure!(
            prev_block_id.shard == block_id.shard,
            "handle shard mismatch: {} != {}",
            prev_block_id.shard,
            block_id.shard,
        );
        anyhow::ensure!(
            prev_block_id.seqno + 1 == block_id.seqno,
            "merkle update can only be applied to consecutive blocks: \
            prev={prev_block_id}, next={block_id}",
        );

        // Fast path: the next state (direct or virtual) is already stored.
        if next_handle.has_state() || next_handle.has_virtual_state() {
            return Ok(InitiatedStoreState::existing(next_handle, self));
        }

        let direct_store_step = if block_id.is_masterchain() {
            // store every masterchain state as direct
            NonZeroU32::MIN
        } else {
            self.store_shard_state_step
        };

        let mut cache = self.shard_states_cache.entry(block_id.shard).or_default();

        // Count this block's new cells only once, even if the method is retried.
        if cache.accumulator.blocks.insert(block_id.seqno) {
            cache.accumulator.new_cells = cache
                .accumulator
                .new_cells
                .saturating_add(hint.new_cell_count());
        }

        metrics::histogram!("tycho_storage_shard_state_accumulated_new_cells")
            .record(cache.accumulator.new_cells as f64);

        // A direct store is forced on a step boundary, when too many new cells
        // accumulated, or when the block is hinted as a top block.
        let force_store = block_id.seqno.is_multiple_of(direct_store_step.get())
            || cache.accumulator.new_cells >= self.new_cells_threshold
            || hint.is_top_block == Some(true);

        let store_type = if force_store {
            metrics::counter!("tycho_storage_shard_state_stored").increment(1);

            // New cells accumulator reset
            cache.accumulator.reset();

            StoreType::Direct
        } else {
            metrics::counter!("tycho_storage_shard_state_skipped").increment(1);
            StoreType::Virtual
        };

        // Records the task result back into the cache once it finishes
        // (holds only a weak ref so a dropped storage stops the bookkeeping).
        let spawn_cleanup = |task: PendingStoreTask| {
            let this = Arc::downgrade(self);
            let block_id = *block_id;
            tokio::task::spawn(async move {
                let (result, _) = task.await;

                let Some(this) = this.upgrade() else {
                    return;
                };

                this.shard_states_cache
                    .get_mut(&block_id.shard)
                    .expect("shard must not dissapear from cache")
                    .save_result(&block_id, result);
            });
        };

        let task = {
            let cache = &mut *cache;

            let prev_state_task = cache
                .states
                .get(&prev_block_id.root_hash)
                .map(|entry| entry.state.clone());

            // Builds a future resolving the previous state: reuse the cached
            // task/result when present, otherwise fall back to storage.
            let make_prev_task_fut = move || {
                let Some(cached) = prev_state_task else {
                    return self
                        .load_prev_state_root_no_cache(
                            next_handle.ref_by_mc_seqno(),
                            prev_block_id,
                            direct_store_step,
                            get_merkle_update,
                        )
                        .boxed();
                };

                Box::pin(async move {
                    match cached {
                        CachedState::Stored(res) => Ok(res),
                        CachedState::Failed(error) => Err(anyhow::Error::from(error)),
                        CachedState::Pending(task) => {
                            let (res, _) = task.await;
                            res.map_err(anyhow::Error::from)
                        }
                    }
                })
            };

            match cache.states.entry(block_id.root_hash) {
                // An operation for this state already exists: attach to it.
                hash_map::Entry::Occupied(entry) => match &entry.get().state {
                    CachedState::Pending(task) => task.clone(),
                    CachedState::Stored(res) => {
                        return Ok(InitiatedStoreState {
                            handle: next_handle.clone(),
                            pending: Some(futures_util::future::ok(res.state.clone()).boxed()),
                            storage: self.clone(),
                        });
                    }
                    CachedState::Failed(e) => return Err(e.clone().into()),
                },
                // First request for this state: build and cache a shared task.
                hash_map::Entry::Vacant(entry) => {
                    let complete = Arc::new(AtomicBool::new(false));
                    let is_virtual = matches!(store_type, StoreType::Virtual);
                    let partial_root = merkle_update.new.clone();
                    let task: PendingStoreTask = match store_type {
                        StoreType::Direct => {
                            let prev_task_fut;
                            let state_root = match state {
                                // Exact root is known: previous state is not needed.
                                Some(state) => {
                                    prev_task_fut = None;
                                    DirectStoreRoot::Exact {
                                        root: state.root_cell().clone(),
                                        ref_mc_state_handle: state.ref_mc_state_handle().clone(),
                                    }
                                }
                                // Root must be derived from the previous state.
                                None => {
                                    prev_task_fut = Some(make_prev_task_fut());
                                    DirectStoreRoot::Next { partial_root }
                                }
                            };

                            let store_task = self.make_store_state_root_direct_task(
                                next_handle.clone(),
                                state_root,
                                hint,
                                complete.clone(),
                            );

                            Shared::new(Box::pin(async move {
                                let prev = match prev_task_fut {
                                    None => None,
                                    Some(fut) => Some(
                                        fut.await
                                            .context("previous state task failed (on direct)")?,
                                    ),
                                };
                                tokio::task::spawn_blocking(move || store_task(prev))
                                    .await?
                                    .context("direct state store failed")
                                    .map_err(StoreStateError::from)
                            }))
                        }
                        StoreType::Virtual => {
                            let prev_task_fut = make_prev_task_fut();
                            let store_task = self.make_store_state_root_virtual_task(
                                next_handle.clone(),
                                partial_root,
                                hint,
                                complete.clone(),
                            );

                            Shared::new(Box::pin(async move {
                                let prev = prev_task_fut
                                    .await
                                    .context("previous state task failed (on virtual)")?;
                                tokio::task::spawn_blocking(move || store_task(prev))
                                    .await?
                                    .context("virtual state store failed")
                                    .map_err(StoreStateError::from)
                            }))
                        }
                    };

                    spawn_cleanup(task.clone());

                    entry.insert(ShardStatesCacheItem {
                        prev_block_id: *prev_block_id,
                        block_id: block_id.as_short_id(),
                        is_virtual,
                        partial_root_cell: merkle_update.new.clone(),
                        state: CachedState::Pending(task.clone()),
                        complete,
                    });

                    task
                }
            }
        };

        metrics::gauge!(
            ShardStatesCache::METRIC_CACHE_SIZE,
            "workchain" => block_id.shard.workchain().to_string()
        )
        .set(clamp_u64_to_u32(cache.states.len() as _));

        drop(cache);

        Ok(InitiatedStoreState {
            handle: next_handle.clone(),
            pending: Some(Box::pin(async move {
                let (result, _) = task.await;
                Ok(result?.state)
            })),
            storage: self.clone(),
        })
    }
387
    /// Loads the previous state directly from storage, without consulting the
    /// in-memory cache.
    ///
    /// Starting from `block_id`, walks backwards through stored "virtual"
    /// updates (bounded by `max_tail`) until a fully applied pivot state is
    /// found, then re-applies the collected updates on top of it.
    fn load_prev_state_root_no_cache(
        &self,
        ref_by_mc_seqno: u32,
        block_id: &BlockId,
        max_tail: NonZeroU32,
        get_merkle_update: Option<Box<FnGetBlockInfoForApply>>,
    ) -> impl Future<Output = Result<StateWithApplier>> + Send + 'static {
        // Clone everything up front so the returned future is 'static.
        let block_id = *block_id;
        let block_handles = self.block_handle_storage.clone();
        let blocks = self.block_storage.clone();
        let block_connections = self.block_connections.clone();
        let cell_storage = self.cell_storage.clone();
        let tracker = self.min_ref_mc_state.clone();

        async move {
            let max_tail = max_tail.get() as usize - 1;

            // Fall back to a no-op lookup when no external update source is given.
            let get_merkle_update: &FnGetBlockInfoForApply = match &get_merkle_update {
                Some(f) => f,
                None => &|_| None,
            };

            // Walk backwards collecting partial roots of virtual states until
            // a fully applied ("direct") pivot state is found.
            let mut to_apply = Vec::new();
            let mut pivot_block_id = block_id;
            let pivot = 'pivot: {
                while to_apply.len() <= max_tail {
                    let res = load_state_or_update(
                        ref_by_mc_seqno,
                        &pivot_block_id,
                        &block_handles,
                        &block_connections,
                        &cell_storage,
                        &tracker,
                        get_merkle_update,
                    )
                    .context("failed to load state or update on first access")?;

                    match res {
                        // Nothing in storage for this id: stop the walk.
                        None => break,
                        Some(FromStorage::Virtual(f)) => {
                            to_apply.push(f.partial_root);
                            pivot_block_id = f.prev_block_id;
                        }
                        Some(FromStorage::Applied(applied)) => {
                            break 'pivot Some(applied);
                        }
                    }

                    // Split long executor stalls.
                    tokio::task::yield_now().await;
                }

                None
            };

            // Build the loaded state using the pivot and updates.
            let Some(StateWithApplier { state, applier }) = pivot else {
                anyhow::bail!(StateNotFound(pivot_block_id.as_short_id()));
            };

            // Fast path when there are no updates.
            if to_apply.is_empty() {
                anyhow::ensure!(state.block_id() == &block_id, "loaded state id mismatch");
                return Ok(StateWithApplier { state, applier });
            }

            // Full case with applies.
            let state =
                apply_updates_chain(&block_id, state, to_apply, applier.clone(), blocks).await?;

            Ok(StateWithApplier { state, applier })
        }
    }
461
    /// Builds a blocking task that produces the next "virtual" state, i.e.
    /// computed in memory from the previous state and a partial merkle root.
    ///
    /// The returned closure consumes the previous state (with its applier)
    /// and returns the next one. `complete` is set to `true` when the closure
    /// finishes or is dropped, even on failure.
    fn make_store_state_root_virtual_task(
        &self,
        handle: BlockHandle,
        partial_root: Cell,
        hint: StoreStateHint,
        complete: Arc<AtomicBool>,
    ) -> impl FnOnce(StateWithApplier) -> Result<StateWithApplier> + Send + 'static {
        let block_handles = self.block_handle_storage.clone();

        // Signal completion even if the closure is dropped without being called.
        let complete_on_drop = scopeguard::guard(complete, |c| c.store(true, Ordering::Release));

        move |StateWithApplier { applier, state }| {
            let _guard = complete_on_drop;
            let _hist = HistogramGuard::begin("tycho_storage_cell_virtual_store_time_high");

            debug_assert_eq!(
                applier.shard(),
                handle.id().shard,
                "applier must always be created for the same shard"
            );
            debug_assert!(
                applier.pivot_block_seqno() < handle.id().seqno,
                "cannot use applier for the future"
            );

            // Compute the next state in memory by applying the partial root.
            let state = state.par_make_next_state(handle.id(), partial_root, &applier)?;
            block_handles.set_has_virtual_shard_state(&handle);

            // Account for the cells produced by this virtual state so a later
            // direct store can estimate its transaction size.
            applier.add_new_virtual_cells(hint.new_cell_count());

            Ok::<_, anyhow::Error>(StateWithApplier { state, applier })
        }
    }
495
    /// Builds a blocking task that persists a "direct" state into the cells db.
    ///
    /// The returned closure optionally consumes the previous state (required
    /// when `root_cell` is [`DirectStoreRoot::Next`]), writes the cell tree in
    /// one batch under the GC lock and returns the freshly reloaded state.
    /// `complete` is set to `true` once the closure finishes or is dropped.
    fn make_store_state_root_direct_task(
        &self,
        handle: BlockHandle,
        root_cell: DirectStoreRoot,
        hint: StoreStateHint,
        complete: Arc<AtomicBool>,
    ) -> impl FnOnce(Option<StateWithApplier>) -> Result<StateWithApplier> + Send + 'static {
        let cells_db = self.cells_db.clone();
        let cell_storage = self.cell_storage.clone();
        let block_handles = self.block_handle_storage.clone();
        let shard_split_depth = self.shard_split_depth;
        let counters = self.counters.clone();
        let gc_lock = self.gc_lock.clone();

        // Signal completion even if the closure is dropped without being called.
        let complete_on_drop = scopeguard::guard(complete, |c| c.store(true, Ordering::Release));

        move |prev| {
            let guard = complete_on_drop;

            let block_id = handle.id();
            // Resolve the root hash (and the tracker to register the new state
            // in) without materializing the root cell yet.
            let tracker;
            let root_hash = match &root_cell {
                DirectStoreRoot::Exact {
                    root,
                    ref_mc_state_handle,
                } => {
                    tracker = ref_mc_state_handle.tracker().clone();
                    *root.repr_hash()
                }
                DirectStoreRoot::Next { partial_root } => {
                    let prev = prev
                        .as_ref()
                        .expect("prev must be specified when storing next direct state");
                    tracker = prev.applier.ref_mc_state_handle().tracker().clone();
                    *partial_root.hash(0)
                }
            };

            // Reloads the state from cell storage once it is known to exist.
            let load_existing_state = || {
                let epoch = handle.ref_by_mc_seqno();
                let root = cell_storage.load_cell(&root_hash, epoch)?;
                let root = Cell::from(root as Arc<_>);

                track_max_epoch(epoch);

                let shard_state = root
                    .parse::<Box<ShardStateUnsplit>>()
                    .with_context(|| format!("failed to parse existing state: {block_id}"))?;
                let handle = tracker.insert(&shard_state);

                let state = ShardStateStuff::from_state_and_root(
                    block_id,
                    shard_state,
                    root,
                    handle.clone(),
                )?;

                let applier =
                    MerkleUpdateApplier::new(epoch, block_id, cell_storage.clone(), handle);

                Ok::<_, anyhow::Error>(StateWithApplier { state, applier })
            };

            // Fast path if already exists (before a possibly long apply).
            if handle.has_state() {
                return load_existing_state();
            }

            // Resolve root if needed.
            let virtual_cell_count;
            let prev_ref_mc_state_handle;
            let root_cell = match root_cell {
                // We already know the state so use it.
                DirectStoreRoot::Exact {
                    root,
                    ref_mc_state_handle,
                } => {
                    virtual_cell_count = 0;
                    prev_ref_mc_state_handle = ref_mc_state_handle;
                    root
                }
                // The most common case when we just use the applier to get the next state.
                // Applier MUST be created for a block which was BEFORE the current.
                DirectStoreRoot::Next { partial_root } => {
                    let prev = prev
                        .as_ref()
                        .expect("prev must be specified when storing next direct state");
                    debug_assert_eq!(
                        prev.applier.shard(),
                        handle.id().shard,
                        "applier must always be created for the same shard"
                    );
                    debug_assert!(
                        prev.applier.pivot_block_seqno() < handle.id().seqno,
                        "cannot use applier for the future"
                    );
                    virtual_cell_count = prev.applier.new_virtual_cells();
                    prev_ref_mc_state_handle = prev.applier.ref_mc_state_handle().clone();
                    prev.applier
                        .make_next_state(partial_root)
                        .context("failed to make next direct state")?
                }
            };
            drop(prev);

            // Fast path if already exists (before a possibly long gc lock).
            if handle.has_state() {
                return load_existing_state();
            }

            // Wait for GC lock.
            let gc_lock = {
                let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high");
                gc_lock.blocking_lock()
            };

            // Fast path if already exists (after a possibly long gc lock).
            if handle.has_state() {
                return load_existing_state();
            }

            // Build store cell transaction.
            let estimated_merkle_update_size = virtual_cell_count + hint.new_cell_count();
            let estimated_update_size_bytes = estimated_merkle_update_size * 192; // p50 cell size in bytes
            let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);

            let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high");

            // Masterchain states are stored single-threaded; shard states are
            // split by accounts and stored multi-threaded.
            let new_cell_count = if handle.is_masterchain() {
                cell_storage.store_cell(
                    &mut batch,
                    root_cell.as_ref(),
                    estimated_merkle_update_size,
                )?
            } else {
                let split_at = split_shard_accounts(&root_cell, shard_split_depth)?;

                cell_storage.store_cell_mt(
                    root_cell.as_ref(),
                    &mut batch,
                    split_at,
                    estimated_merkle_update_size,
                )?
            };

            in_mem_store.finish();

            // Map the block id to its state root hash in the same batch.
            batch.put_cf(
                &cells_db.shard_states.cf(),
                block_id.to_vec(),
                root_hash.as_slice(),
            );

            // Apply store cell transaction.
            let _hist = HistogramGuard::begin("tycho_storage_state_update_time_high");

            metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);
            metrics::histogram!("tycho_storage_state_update_size_bytes")
                .record(batch.size_in_bytes() as f64);
            metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
                .record(estimated_update_size_bytes as f64);

            let counter = if handle.is_masterchain() {
                &counters.max_new_mc_cell_count
            } else {
                &counters.max_new_sc_cell_count
            };
            counter.fetch_max(new_cell_count, Ordering::Release);

            cells_db.rocksdb().write(batch)?;

            // NOTE: We can signal that the future is complete as soon as the longest
            //       part is done. Maybe we can even signal about that before the actual
            //       write, just after building a transaction.
            drop(guard);

            // NOTE: Cell tree is still alive, just in case couple the ref handle lifetime with it.
            Reclaimer::instance().drop((root_cell, prev_ref_mc_state_handle));

            block_handles.set_has_shard_state(&handle);

            // NOTE: Ensure that GC lock is dropped only after storing the state.
            drop(gc_lock);

            // Reload state.
            load_existing_state()
        }
    }
684
    /// Stores a shard state read from a file and returns the hash of its root cell.
    pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result<HashBytes> {
        self.store_state_raw_inner(block_id, boc).await
    }
689
690    pub async fn store_state_bytes(&self, block_id: &BlockId, boc: Bytes) -> Result<HashBytes> {
691        let cursor = Cursor::new(boc);
692        self.store_state_raw_inner(block_id, cursor).await
693    }
694
695    async fn store_state_raw_inner<R>(&self, block_id: &BlockId, boc: R) -> Result<HashBytes>
696    where
697        R: std::io::Read + Send + 'static,
698    {
699        let ctx = StoreStateContext {
700            cells_db: self.cells_db.clone(),
701            cell_storage: self.cell_storage.clone(),
702            temp_file_storage: self.temp_file_storage.clone(),
703        };
704
705        let block_id = *block_id;
706
707        let gc_lock = self.gc_lock.clone().lock_owned().await;
708        tokio::task::spawn_blocking(move || {
709            // NOTE: Ensure that GC lock is captured by the spawned thread.
710            let _gc_lock = gc_lock;
711
712            ctx.store(&block_id, boc)
713        })
714        .await?
715    }
716
717    /// Loads (or computes) a state for the specified block.
718    pub fn load_state(
719        &self,
720        ref_by_mc_seqno: u32,
721        block_id: &BlockId,
722    ) -> impl Future<Output = Result<ShardStateStuff>> {
723        self.load_state_ext(ref_by_mc_seqno, block_id, Default::default(), |_| None)
724    }
725
726    /// Loads (or computes) a state for the specified block
727    /// using an optional cache of yet unsaved blocks.
728    ///
729    /// ## How does it work
730    ///
731    /// State is stored into [`CellsDb`] at least every [`store_shard_state_step`]
732    /// blocks. So when we load a state we must find some pivot state first and then
733    /// apply required intermediate merkle updates to it. We know the maximum
734    /// number of such updates so we can tell when the state is absent.
735    ///
736    /// Merkle updates are loaded either using some external `get_merkle_update`
737    /// or from the storage.
738    ///
739    /// ## Store/load operation reuse
740    ///
741    /// Saving states takes some time. So while loading a state we can
742    /// attach to the original "saving future" and use its result.
743    /// We will always attach to the "virtual" state future because
744    /// otherwise we will be doing the same work. And we attach to
745    /// "direct" states when their future is either complete, or
746    /// we are deep enough in our apply routine.
747    ///
748    /// [`store_shard_state_step`]: CoreStorageConfig::store_shard_state_step
    pub async fn load_state_ext<F>(
        &self,
        ref_by_mc_seqno: u32,
        block_id: &BlockId,
        hint: LoadStateHint,
        get_merkle_update: F,
    ) -> Result<ShardStateStuff>
    where
        F: Fn(&BlockId) -> Option<BlockInfoForApply>,
    {
        // Error to report when a cached store operation for this state has failed.
        fn load_failed(error: StoreStateError) -> anyhow::Error {
            anyhow::anyhow!("unable to load a state that failed to save with error: {error:?}")
        }

        // Resolves a state (or just its merkle update) from persistent storage.
        let try_load_from_storage = |block_id: &BlockId| {
            load_state_or_update(
                ref_by_mc_seqno,
                block_id,
                &self.block_handle_storage,
                &self.block_connections,
                &self.cell_storage,
                &self.min_ref_mc_state,
                &get_merkle_update,
            )
        };

        // Upper bound on how many merkle updates we are willing to apply
        // on top of a pivot state.
        let max_tail = self.store_shard_state_step.get() as usize;

        // Walk backwards from `block_id`, collecting updates into `to_apply`
        // until a usable pivot state is found (in cache or in storage).
        let mut pivot_block_id = *block_id;
        let mut to_apply = Vec::new();
        let pivot = 'pivot: {
            while to_apply.len() <= max_tail {
                if let Some(cache) = self.shard_states_cache.get(&pivot_block_id.shard)
                    && let Some(item) = cache.states.get(&pivot_block_id.root_hash)
                {
                    // We found a pending operation for that block id so we can try to reuse its state.
                    match &item.state {
                        // State is in cache and already loaded.
                        CachedState::Stored(stored) => {
                            break 'pivot Some(stored.clone());
                        }
                        // State is in cache but its store was unsuccessful.
                        CachedState::Failed(error) => return Err(load_failed(error.clone())),
                        // We can wait for the result if:
                        // - We explicitly hinted to wait for the operation.
                        // - The state is already virtual (we will be doing the same work so there
                        //   is no need to do it twice).
                        // - The pending task is already complete.
                        // - There are too many pending states.
                        // - No previous state in cache.
                        CachedState::Pending(task)
                            if !hint.allow_ignore_direct
                                || item.is_virtual
                                || item.complete.load(Ordering::Acquire)
                                || to_apply.len() >= max_tail
                                || !cache.states.contains_key(&item.prev_block_id.root_hash) =>
                        {
                            // Release the dashmap shard guard before awaiting
                            // so other tasks can access the cache.
                            let task = task.clone();
                            drop(cache);

                            let (result, _) = task.await;
                            break 'pivot Some(result.map_err(load_failed)?);
                        }
                        // Otherwise we assume that applying merkle updates will be faster.
                        // Continue searching back for the first item with "pending virtual"
                        // or "stored" state (or we will find something in storage).
                        CachedState::Pending(_) => {
                            to_apply.push(ToApply::Loaded(item.partial_root_cell.clone()));
                            pivot_block_id = item.prev_block_id;
                        }
                    }
                } else {
                    // NOTE: `cache` must be dropped here (we rely on Rust edition 2024 behavior).

                    // There was no such state in cache so we search in storage.
                    match try_load_from_storage(&pivot_block_id)? {
                        // No handle or provided state for this id means that we can stop here.
                        None => break,
                        // Only merkle update was found for this block.
                        Some(FromStorage::Virtual(f)) => {
                            to_apply.push(f.partial_root);
                            pivot_block_id = f.prev_block_id;
                        }
                        // A directly stored state was found for this block so we
                        // can use it as a pivot.
                        Some(FromStorage::Applied(applied)) => {
                            break 'pivot Some(applied);
                        }
                    }

                    // Split long executor stalls.
                    tokio::task::yield_now().await;
                }
            }

            // No pivot states were found in cache or storage.
            None
        };

        // Build the loaded state using the pivot and updates.
        let Some(StateWithApplier { state, applier }) = pivot else {
            anyhow::bail!(StateNotFound(pivot_block_id.as_short_id()));
        };

        // Fast path when there are no updates.
        if to_apply.is_empty() {
            anyhow::ensure!(state.block_id() == block_id, "loaded state id mismatch");
            return Ok(state);
        }

        // Full case with applies.
        apply_updates_chain(
            block_id,
            state,
            to_apply,
            applier,
            self.block_storage.clone(),
        )
        .await
    }
869
    /// Loads the state root hash for `block_id`, failing when the state
    /// is not stored.
    pub fn load_state_root_hash(&self, block_id: &BlockId) -> Result<HashBytes> {
        load_state_root_hash(&self.cells_db, block_id)
    }
873
    /// Loads the state root hash for `block_id`, returning `None` when the
    /// state is not stored.
    pub fn load_state_root_hash_opt(&self, block_id: &BlockId) -> Result<Option<HashBytes>> {
        load_state_root_hash_opt(&self.cells_db, block_id)
    }
877
878    #[tracing::instrument(skip(self))]
879    pub async fn remove_outdated_states(&self, mc_seqno: u32) -> Result<()> {
880        // Compute recent block ids for the specified masterchain seqno
881        let Some(top_blocks) = self.compute_recent_blocks(mc_seqno).await? else {
882            tracing::warn!("recent blocks edge not found");
883            return Ok(());
884        };
885
886        let target_block_id = top_blocks.mc_block;
887        tracing::info!(%target_block_id, "started states GC");
888
889        let started_at = Instant::now();
890        let block_handle_storage = self.block_handle_storage.clone();
891        let cell_storage = self.cell_storage.clone();
892        let cells_db = self.cells_db.clone();
893        let gc_lock = self.gc_lock.clone();
894        let shard_split_depth = self.shard_split_depth;
895
896        let (removed_states, removed_cells) = tokio::task::spawn_blocking(move || {
897            let raw = cells_db.rocksdb();
898
899            // Manually get required column factory and r/w options
900            let snapshot = raw.snapshot();
901            let shard_states_cf = cells_db.shard_states.get_unbounded_cf();
902            let mut states_read_options = cells_db.shard_states.new_read_config();
903            states_read_options.set_snapshot(&snapshot);
904
905            let mut alloc = bumpalo_herd::Herd::new();
906
907            // Create iterator
908            let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options);
909            iter.seek_to_first();
910
911            // Iterate all states and remove outdated
912            let mut removed_states = 0usize;
913            let mut removed_cells = 0usize;
914
915            loop {
916                let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high");
917                let (key, value) = match iter.item() {
918                    Some(item) => item,
919                    None => match iter.status() {
920                        Ok(()) => break,
921                        Err(e) => return Err(e.into()),
922                    },
923                };
924
925                let block_id = BlockId::from_slice(key);
926                let root_hash = HashBytes::from_slice(&value[0..32]);
927
928                // Skip blocks from zero state and top blocks.
929                // NOTE: We intentionally don't skip hardforked zerostates (seqno > 0),
930                // because we don't really need to keep them. For proof checker we
931                // use zerostate proof which is stored separately, and for serving the
932                // state we use a persistent state (where we don't remove these states).
933                if block_id.seqno == 0
934                    || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno)
935                {
936                    iter.next();
937                    continue;
938                }
939
940                // skip block marked by SKIP_GC flag
941                if let Some(handle) = block_handle_storage.load_handle(&block_id)
942                    && handle.skip_states_gc()
943                {
944                    tracing::debug!(
945                        block_id = %block_id,
946                        "skipping states GC since it flagged by SKIP_STATES_GC"
947                    );
948                    iter.next();
949                    continue;
950                }
951
952                alloc.reset();
953
954                let guard = {
955                    let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high");
956                    gc_lock.blocking_lock()
957                };
958
959                let in_mem_remove =
960                    HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high");
961
962                let (total, mut batch) = if block_id.is_masterchain() {
963                    cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)?
964                } else {
965                    // NOTE: We use epoch `0` here so that cells of old states
966                    // will not be used by recent loads.
967                    let root_cell = Cell::from(cell_storage.load_cell(&root_hash, 0)? as Arc<_>);
968
969                    let split_at = split_shard_accounts(&root_cell, shard_split_depth)?
970                        .into_keys()
971                        .collect::<FastHashSet<HashBytes>>();
972                    cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)?
973                };
974
975                in_mem_remove.finish();
976
977                batch.delete_cf(&cells_db.shard_states.get_unbounded_cf().bound(), key);
978                cells_db
979                    .raw()
980                    .rocksdb()
981                    .write_opt(batch, cells_db.cells.write_config())?;
982
983                // NOTE: Ensure that guard is dropped only after writing the batch.
984                drop(guard);
985
986                removed_cells += total;
987                tracing::debug!(removed_cells = total, %block_id);
988
989                removed_states += 1;
990                iter.next();
991
992                metrics::counter!("tycho_storage_state_gc_count").increment(1);
993                metrics::counter!("tycho_storage_state_gc_cells_count").increment(1);
994                if block_id.is_masterchain() {
995                    metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64);
996                }
997                tracing::debug!(removed_states, removed_cells, %block_id, "removed state");
998            }
999
1000            Ok::<_, anyhow::Error>((removed_states, removed_cells))
1001        })
1002        .await??;
1003
1004        // Done
1005        tracing::info!(
1006            removed_states,
1007            removed_cells,
1008            block_id = %target_block_id,
1009            elapsed_sec = started_at.elapsed().as_secs_f64(),
1010            "finished states GC",
1011        );
1012        Ok(())
1013    }
1014
1015    /// Searches for an edge with the least referenced masterchain block
1016    ///
1017    /// Returns `None` if all states are recent enough
1018    pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result<Option<TopBlocks>> {
1019        // 0. Adjust masterchain seqno with minimal referenced masterchain state
1020        if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno()
1021            && min_ref_mc_seqno < mc_seqno
1022        {
1023            mc_seqno = min_ref_mc_seqno;
1024        }
1025
1026        let snapshot = self.cells_db.rocksdb().snapshot();
1027
1028        // 1. Find target block
1029
1030        // Find block id using states table
1031        let mc_block_id = match self
1032            .find_mc_block_id(mc_seqno, &snapshot)
1033            .context("Failed to find block id by seqno")?
1034        {
1035            Some(block_id) => block_id,
1036            None => return Ok(None),
1037        };
1038
1039        // Find block handle
1040        let handle = match self.block_handle_storage.load_handle(&mc_block_id) {
1041            Some(handle) if handle.has_data() => handle,
1042            // Skip blocks without handle or data
1043            _ => return Ok(None),
1044        };
1045
1046        // 2. Find minimal referenced masterchain block from the target block
1047
1048        let block_data = self.block_storage.load_block_data(&handle).await?;
1049        let block_info = block_data
1050            .load_info()
1051            .context("Failed to read target block info")?;
1052
1053        // Find full min masterchain reference id
1054        let min_ref_mc_seqno = block_info.min_ref_mc_seqno;
1055        let min_ref_block_id = match self.find_mc_block_id(min_ref_mc_seqno, &snapshot)? {
1056            Some(block_id) => block_id,
1057            None => return Ok(None),
1058        };
1059
1060        // Find block handle
1061        let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) {
1062            Some(handle) if handle.has_data() => handle,
1063            // Skip blocks without handle or data
1064            _ => return Ok(None),
1065        };
1066
1067        // Compute `TopBlocks` from block data
1068        self.block_storage
1069            .load_block_data(&min_ref_block_handle)
1070            .await
1071            .and_then(|block_data| TopBlocks::from_mc_block(&block_data))
1072            .map(Some)
1073    }
1074
1075    fn find_mc_block_id(
1076        &self,
1077        mc_seqno: u32,
1078        snapshot: &rocksdb::Snapshot<'_>,
1079    ) -> Result<Option<BlockId>> {
1080        let shard_states = &self.cells_db.shard_states;
1081
1082        let mut bound = BlockId {
1083            shard: ShardIdent::MASTERCHAIN,
1084            seqno: mc_seqno,
1085            root_hash: HashBytes::ZERO,
1086            file_hash: HashBytes::ZERO,
1087        };
1088
1089        let mut readopts = shard_states.new_read_config();
1090        readopts.set_snapshot(snapshot);
1091        readopts.set_iterate_lower_bound(bound.to_vec().as_slice());
1092        bound.seqno += 1;
1093        readopts.set_iterate_upper_bound(bound.to_vec().as_slice());
1094
1095        let mut iter = self
1096            .cells_db
1097            .rocksdb()
1098            .raw_iterator_cf_opt(&shard_states.cf(), readopts);
1099        iter.seek_to_first();
1100
1101        Ok(iter.key().map(BlockId::from_slice))
1102    }
1103
1104    #[cfg(test)]
1105    fn contains_state(&self, block_id: &BlockId) -> Result<bool> {
1106        let shard_states = &self.cells_db.shard_states;
1107        Ok(shard_states.get(block_id.to_vec())?.is_some())
1108    }
1109}
1110
1111fn load_state_by_hash(
1112    ref_by_mc_seqno: u32,
1113    block_id: &BlockId,
1114    root_hash: &HashBytes,
1115    cell_storage: &Arc<CellStorage>,
1116    tracker: &MinRefMcStateTracker,
1117) -> Result<ShardStateStuff> {
1118    let root = cell_storage.load_cell(root_hash, ref_by_mc_seqno)?;
1119    let root = Cell::from(root as Arc<_>);
1120
1121    track_max_epoch(ref_by_mc_seqno);
1122    let shard_state = root.parse::<Box<ShardStateUnsplit>>()?;
1123    let handle = tracker.insert(&shard_state);
1124    ShardStateStuff::from_state_and_root(block_id, shard_state, root, handle)
1125}
1126
1127fn load_state_root_hash(cells_db: &CellsDb, block_id: &BlockId) -> Result<HashBytes> {
1128    match load_state_root_hash_opt(cells_db, block_id)? {
1129        Some(hash) => Ok(hash),
1130        None => anyhow::bail!(StateNotFound(block_id.as_short_id())),
1131    }
1132}
1133
1134fn load_state_root_hash_opt(cells_db: &CellsDb, block_id: &BlockId) -> Result<Option<HashBytes>> {
1135    let Some(root) = cells_db.shard_states.get(block_id.to_vec())? else {
1136        return Ok(None);
1137    };
1138    Ok(Some(HashBytes::from_slice(&root[..32])))
1139}
1140
/// A single merkle update to apply on top of a pivot state.
enum ToApply {
    /// The partial new root cell is already in memory.
    Loaded(Cell),
    /// The merkle update must be loaded later from the block data
    /// behind this handle.
    Deferred(BlockHandle),
}

/// Result of resolving a block state from storage.
enum FromStorage {
    /// A directly stored (fully applied) state.
    Applied(StateWithApplier),
    /// Only the merkle update for this block is available.
    Virtual(VirtualBlockInfo),
}
1150
1151fn load_state_or_update<F>(
1152    ref_by_mc_seqno: u32,
1153    block_id: &BlockId,
1154    block_handles: &BlockHandleStorage,
1155    block_connections: &BlockConnectionStorage,
1156    cell_storage: &Arc<CellStorage>,
1157    tracker: &MinRefMcStateTracker,
1158    get_merkle_update: F,
1159) -> Result<Option<FromStorage>>
1160where
1161    F: Fn(&BlockId) -> Option<BlockInfoForApply>,
1162{
1163    if let Some(root_hash) = load_state_root_hash_opt(cell_storage.db(), block_id)? {
1164        let state =
1165            load_state_by_hash(ref_by_mc_seqno, block_id, &root_hash, cell_storage, tracker)?;
1166        let ref_mc_state_handle = state.ref_mc_state_handle().clone();
1167        return Ok(Some(FromStorage::Applied(StateWithApplier {
1168            state,
1169            applier: MerkleUpdateApplier::new(
1170                ref_by_mc_seqno,
1171                block_id,
1172                cell_storage.clone(),
1173                ref_mc_state_handle,
1174            ),
1175        })));
1176    }
1177
1178    let handle = match block_handles.load_handle(block_id) {
1179        Some(handle) => handle,
1180        None => {
1181            return Ok(get_merkle_update(block_id).map(|f| {
1182                FromStorage::Virtual(VirtualBlockInfo {
1183                    prev_block_id: f.prev_block_id,
1184                    partial_root: ToApply::Loaded(f.partial_root_cell),
1185                })
1186            }));
1187        }
1188    };
1189
1190    Ok(Some(FromStorage::Virtual(
1191        match get_merkle_update(block_id) {
1192            Some(f) => VirtualBlockInfo {
1193                prev_block_id: f.prev_block_id,
1194                partial_root: ToApply::Loaded(f.partial_root_cell),
1195            },
1196            None => {
1197                if !handle.has_data() {
1198                    return Ok(None);
1199                }
1200
1201                let Some(prev_block_id) =
1202                    block_connections.load_connection(block_id, BlockConnection::Prev1)
1203                else {
1204                    anyhow::bail!("prev block id not found for {block_id}");
1205                };
1206
1207                VirtualBlockInfo {
1208                    prev_block_id,
1209                    partial_root: ToApply::Deferred(handle),
1210                }
1211            }
1212        },
1213    )))
1214}
1215
1216async fn apply_updates_chain(
1217    block_id: &BlockId,
1218    pivot_state: ShardStateStuff,
1219    mut to_apply: Vec<ToApply>,
1220    applier: Arc<MerkleUpdateApplier>,
1221    blocks: Arc<BlockStorage>,
1222) -> Result<ShardStateStuff> {
1223    let ref_mc_state_handle = pivot_state.ref_mc_state_handle().clone();
1224    let mut pivot_root = pivot_state.root_cell().clone();
1225    drop(pivot_state);
1226
1227    let pivot_root = rayon_run(move || {
1228        while let Some(item) = to_apply.pop() {
1229            let partial_new_root = match item {
1230                ToApply::Loaded(cell) => cell,
1231                ToApply::Deferred(handle) => {
1232                    let block = blocks.blocking_load_block_data(&handle)?;
1233                    block.as_ref().load_state_update()?.new
1234                }
1235            };
1236
1237            pivot_root = applier
1238                .make_next_state(partial_new_root)
1239                .context("failed to apply next state for chain from storage")?;
1240        }
1241
1242        Ok::<_, anyhow::Error>(pivot_root)
1243    })
1244    .await?;
1245
1246    let shard_state = pivot_root.parse::<Box<ShardStateUnsplit>>()?;
1247    ShardStateStuff::from_state_and_root(block_id, shard_state, pivot_root, ref_mc_state_handle)
1248}
1249
/// Handle of an initiated (possibly still running) state store operation.
pub struct InitiatedStoreState {
    // Handle of the block whose state is being stored.
    handle: BlockHandle,
    // Store future; `None` when the state was already stored before.
    pending: Option<BoxFuture<'static, Result<ShardStateStuff>>>,
    // Storage used to reload the state when there is no pending task.
    storage: Arc<ShardStateStorage>,
}
1255
1256impl InitiatedStoreState {
1257    fn existing(handle: &BlockHandle, storage: &Arc<ShardStateStorage>) -> Self {
1258        Self {
1259            handle: handle.clone(),
1260            pending: None,
1261            storage: storage.clone(),
1262        }
1263    }
1264
1265    pub fn handle(&self) -> &BlockHandle {
1266        &self.handle
1267    }
1268
1269    pub async fn wait_store_only(self) -> Result<()> {
1270        if let Some(task) = self.pending {
1271            task.await?;
1272        }
1273        Ok(())
1274    }
1275
1276    pub async fn wait_reload(self) -> Result<ShardStateStuff> {
1277        match self.pending {
1278            None => {
1279                self.storage
1280                    .load_state(self.handle.ref_by_mc_seqno(), self.handle.id())
1281                    .await
1282            }
1283            Some(pending) => pending.await,
1284        }
1285    }
1286}
1287
1288fn track_max_epoch(epoch: u32) {
1289    // NOTE: only for metrics.
1290    static MAX_KNOWN_EPOCH: AtomicU32 = AtomicU32::new(0);
1291
1292    let max_known_epoch = MAX_KNOWN_EPOCH
1293        .fetch_max(epoch, Ordering::Relaxed)
1294        .max(epoch);
1295    metrics::gauge!("tycho_storage_state_max_epoch").set(max_known_epoch);
1296}
1297
/// Per-shard cache of recently stored (or in-flight) states.
#[derive(Default)]
struct ShardStatesCache {
    // Seqno of the latest direct-store pivot; entries behind it are pruned.
    pivot_block_seqno: u32,
    // Accumulates new-cell counts (usage lives outside this chunk).
    accumulator: ShardCellsAccumulator,
    // Cached state items indexed by their root hash.
    states: FastHashMap<HashBytes, ShardStatesCacheItem>,
}

/// Accumulator of new-cell counts across a set of blocks.
#[derive(Default)]
struct ShardCellsAccumulator {
    // Running count of new cells.
    new_cells: usize,
    // Seqnos of blocks already accounted for.
    blocks: FastHashSet<u32>,
}
1310
impl ShardCellsAccumulator {
    /// Clears the accumulated counters while keeping the set's allocation.
    fn reset(&mut self) {
        self.new_cells = 0;

        // Reuse allocation
        self.blocks.clear();
    }
}
1319
1320impl ShardStatesCache {
1321    const METRIC_PIVOT_SEQNO: &str = "tycho_storage_state_shard_cache_pivot_seqno";
1322    const METRIC_CACHE_SIZE: &str = "tycho_storage_state_shard_cache_size";
1323
1324    fn save_result(&mut self, block_id: &BlockId, result: StoreTaskResult) {
1325        let Some(item) = self.states.get_mut(&block_id.root_hash) else {
1326            return;
1327        };
1328
1329        match result {
1330            Ok(res) => {
1331                let pivot_block_seqno = res.applier.pivot_block_seqno();
1332                item.state = CachedState::Stored(res);
1333
1334                // Reset cache tail on each saved block.
1335                if !item.is_virtual && pivot_block_seqno > self.pivot_block_seqno {
1336                    self.pivot_block_seqno = pivot_block_seqno;
1337                    self.states
1338                        .retain(|_, item| item.block_id.seqno >= pivot_block_seqno);
1339
1340                    let labels = [("workchain", block_id.shard.workchain().to_string())];
1341                    metrics::gauge!(Self::METRIC_PIVOT_SEQNO, &labels).set(pivot_block_seqno);
1342                    metrics::gauge!(Self::METRIC_CACHE_SIZE, &labels)
1343                        .set(clamp_u64_to_u32(self.states.len() as _));
1344                }
1345            }
1346            Err(error) => {
1347                tracing::error!(%block_id, "store state failed: {error:?}");
1348                item.state = CachedState::Failed(error);
1349            }
1350        }
1351    }
1352}
1353
/// Root cell source for a direct state store.
enum DirectStoreRoot {
    /// The exact state root is already known.
    Exact {
        root: Cell,
        ref_mc_state_handle: RefMcStateHandle,
    },
    /// Only a partial root is known; the full root must still be computed.
    Next {
        partial_root: Cell,
    },
}

/// A cached (possibly in-flight) state for a single block.
struct ShardStatesCacheItem {
    // Id of the previous block; used to walk the update chain backwards.
    prev_block_id: BlockId,
    // Short id of this block; used when pruning the cache tail.
    block_id: BlockIdShort,
    // Whether this is a "virtual" state (merkle-update based) rather than
    // a direct store.
    is_virtual: bool,
    // Partial new root of the merkle update for this block.
    partial_root_cell: Cell,
    // Current store progress/result.
    state: CachedState,
    // Set once the pending store task is complete.
    complete: Arc<AtomicBool>,
}

impl Drop for ShardStatesCacheItem {
    fn drop(&mut self) {
        // Drop the (potentially deep) cell tree on the reclaimer thread
        // instead of blocking the current one.
        Reclaimer::instance().drop(std::mem::take(&mut self.partial_root_cell));
    }
}

/// Store progress of a cached state.
#[derive(Clone)]
enum CachedState {
    /// Store task is still running.
    Pending(PendingStoreTask),
    /// State was stored successfully.
    Stored(StateWithApplier),
    /// Store failed; the error is kept for subsequent readers.
    Failed(StoreStateError),
}

/// A loaded state paired with the applier used to produce its successors.
#[derive(Clone)]
struct StateWithApplier {
    state: ShardStateStuff,
    applier: Arc<MerkleUpdateApplier>,
}

/// Shared future of a state store task (multiple waiters can attach).
type PendingStoreTask = Shared<BoxFuture<'static, StoreTaskResult>>;
/// Result of a state store task.
type StoreTaskResult = Result<StateWithApplier, StoreStateError>;

/// Cloneable error wrapper so a failed store can be reported to every
/// attached waiter.
#[derive(Clone, thiserror::Error)]
#[error(transparent)]
struct StoreStateError(Arc<anyhow::Error>);
1398
impl std::fmt::Debug for StoreStateError {
    // Delegate to the inner `anyhow::Error` so the full error chain is shown.
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(self.0.as_ref(), f)
    }
}

impl From<anyhow::Error> for StoreStateError {
    fn from(value: anyhow::Error) -> Self {
        Self(Arc::new(value))
    }
}

impl From<tokio::task::JoinError> for StoreStateError {
    // A panicked/cancelled store task is reported as a generic task failure.
    fn from(value: tokio::task::JoinError) -> Self {
        Self(Arc::new(anyhow::anyhow!("task failed: {value}")))
    }
}
1417
/// A wrapper for par applier to drop a potentially huge map
/// with new cells on a [`Reclaimer`] thread.
struct MerkleUpdateApplier(ManuallyDrop<MerkleUpdateApplierInner>);

struct MerkleUpdateApplierInner {
    // Shard this applier belongs to.
    shard: ShardIdent,
    // Seqno of the pivot block the applier was created for.
    pivot_block_seqno: u32,
    // Parallel merkle update applier; accumulates new cells across applies.
    applier: ParMerkleUpdateApplier<'static, MerkleCellsProvider>,
    // Keeps the referenced masterchain state alive while cells are in use.
    ref_mc_state_handle: RefMcStateHandle,
    // Number of new cells produced by "virtual" applies.
    new_virtual_cells: AtomicUsize,
    /// Separate atomic as a sum of all diffs.
    metric_total_new_cells: AtomicUsize,
}
1431
impl MerkleUpdateApplier {
    const METRIC_ALIVE_APPLIERS: &str = "tycho_storage_state_applier_count";
    const METRIC_NEW_CELL_COUNT: &str = "tycho_storage_state_applier_all_new_cell_count";

    /// Creates a new applier rooted at `pivot_block_id`.
    ///
    /// `epoch` is passed to the cell provider for every cell load.
    fn new(
        epoch: u32,
        pivot_block_id: &BlockId,
        cell_storage: Arc<CellStorage>,
        ref_mc_state_handle: RefMcStateHandle,
    ) -> Arc<Self> {
        // `fetch_add` returns the previous value, hence `+ 1` for the gauge.
        let v = ALIVE_STATE_APPLIERS.fetch_add(1, Ordering::Relaxed) + 1;
        metrics::gauge!(Self::METRIC_ALIVE_APPLIERS).set(v);

        Arc::new(Self(ManuallyDrop::new(MerkleUpdateApplierInner {
            shard: pivot_block_id.shard,
            pivot_block_seqno: pivot_block_id.seqno,
            applier: ParMerkleUpdateApplier {
                new_cells: Default::default(),
                total_new_cells: Default::default(),
                context: Cell::empty_context(),
                find_cell: MerkleCellsProvider {
                    epoch,
                    storage: cell_storage,
                },
                find_in_new_cells: true,
            },
            ref_mc_state_handle,
            new_virtual_cells: AtomicUsize::new(0),
            metric_total_new_cells: AtomicUsize::new(0),
        })))
    }

    /// Shard this applier was created for.
    fn shard(&self) -> ShardIdent {
        self.0.shard
    }

    /// Seqno of the pivot block this applier was created for.
    fn pivot_block_seqno(&self) -> u32 {
        self.0.pivot_block_seqno
    }

    /// Adds `count` to the number of cells produced by virtual applies.
    fn add_new_virtual_cells(&self, count: usize) {
        self.0.new_virtual_cells.fetch_add(count, Ordering::Release);
    }

    /// Number of cells produced by virtual applies so far.
    fn new_virtual_cells(&self) -> usize {
        self.0.new_virtual_cells.load(Ordering::Acquire)
    }

    /// Handle that keeps the referenced masterchain state alive.
    fn ref_mc_state_handle(&self) -> &RefMcStateHandle {
        &self.0.ref_mc_state_handle
    }

    /// Applies `partial_new_root` (a merkle update result) on top of the
    /// current state, returning the resolved new root cell.
    fn make_next_state(&self, partial_new_root: Cell) -> Result<Cell, tycho_types::error::Error> {
        // Fast path: the resulting root may already be known to the provider.
        if let Some(cell) = self.find_cell.find_cell(partial_new_root.hash(0)) {
            return Ok(cell);
        }

        // `run` / `total_new_cells` come from the inner applier via `Deref`.
        let new = rayon::scope(|scope| self.run(partial_new_root.as_ref(), 0, 0, Some(scope)))?;
        let res = new.resolve(Cell::empty_context());

        // Multiple applies can be run simultaneously so we must ensure
        // that only one diff is added to metrics.
        let diff = self.total_new_cells.swap(0, Ordering::Relaxed);
        self.0
            .metric_total_new_cells
            .fetch_add(diff, Ordering::Release);

        let v = NEW_CELL_COUNT
            .fetch_add(diff as u64, Ordering::Release)
            .saturating_add(diff as u64);
        metrics::gauge!(Self::METRIC_NEW_CELL_COUNT).set(clamp_u64_to_u32(v));

        res
    }
}
1507
impl Drop for MerkleUpdateApplier {
    fn drop(&mut self) {
        // SAFETY: drop is called only once.
        let mut inner = unsafe { ManuallyDrop::take(&mut self.0) };

        // Explicitly drop new cells dictionary using the reclaimer.
        // NOTE: Ensure that ref mc state handle outlives the cells map.
        Reclaimer::instance().drop((inner.applier.new_cells, inner.ref_mc_state_handle));

        // Keep the alive-appliers gauge in sync (`fetch_sub` returns the
        // previous value, hence `- 1`).
        let v = ALIVE_STATE_APPLIERS.fetch_sub(1, Ordering::Relaxed) - 1;
        metrics::gauge!(Self::METRIC_ALIVE_APPLIERS).set(v);

        // Subtract this applier's contribution from the global new-cell gauge.
        let diff = *inner.metric_total_new_cells.get_mut() as u64;
        let v = NEW_CELL_COUNT
            .fetch_sub(diff, Ordering::Release)
            .saturating_sub(diff);
        metrics::gauge!(Self::METRIC_NEW_CELL_COUNT).set(clamp_u64_to_u32(v));
    }
}
1527
// Deref to the inner parallel applier so its methods and fields
// (`run`, `find_cell`, `total_new_cells`, ...) can be used directly.
impl std::ops::Deref for MerkleUpdateApplier {
    type Target = ParMerkleUpdateApplier<'static, MerkleCellsProvider>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0.applier
    }
}
1536
/// Converts a `u64` to a `u32`, saturating at `u32::MAX` instead of truncating.
fn clamp_u64_to_u32(value: u64) -> u32 {
    value.min(u64::from(u32::MAX)) as u32
}
1540
// Number of currently alive `MerkleUpdateApplier` instances (metrics only).
static ALIVE_STATE_APPLIERS: AtomicU32 = AtomicU32::new(0);
// Sum of new-cell diffs across all alive appliers (metrics only).
static NEW_CELL_COUNT: AtomicU64 = AtomicU64::new(0);
1543
/// Cell lookup backed by [`CellStorage`], loading with a fixed epoch.
struct MerkleCellsProvider {
    // Epoch (ref-by-mc seqno) passed to every `load_cell` call.
    epoch: u32,
    storage: Arc<CellStorage>,
}
1548
1549impl FindCell for MerkleCellsProvider {
1550    fn find_cell(&self, hash: &HashBytes) -> Option<Cell> {
1551        let cell = self.storage.load_cell(hash, self.epoch).ok()?;
1552        Some(Cell::from(cell as Arc<_>))
1553    }
1554}
1555
/// Hints for a state store operation.
#[derive(Default, Debug, Clone, Copy)]
pub struct StoreStateHint {
    // Serialized block size; used to estimate the new-cell count when it
    // is not provided explicitly.
    pub block_data_size: usize,
    // Exact number of new cells, if known by the caller.
    pub new_cell_count: Option<usize>,
    // Whether this block is a top block (semantics defined by callers
    // outside this chunk).
    pub is_top_block: Option<bool>,
}
1562
1563impl StoreStateHint {
1564    fn new_cell_count(&self) -> usize {
1565        match self.new_cell_count {
1566            None => estimate_cell_count(self.block_data_size),
1567            Some(count) => count,
1568        }
1569    }
1570}
1571
/// Estimates the number of new cells from the serialized block size.
fn estimate_cell_count(block_data_size: usize) -> usize {
    // Empirical fit: y = 3889.9821 + 14.7480 × √x (R-squared: 0.7035),
    // rounded up to the next power of two.
    let estimate = 3889.9821 + 14.7480 * (block_data_size as f64).sqrt();
    (estimate as usize).next_power_of_two()
}
1577
/// Hints for a state load operation.
#[derive(Default, Debug, Clone, Copy)]
pub struct LoadStateHint {
    /// Allow applying merkle updates even when there is a pending
    /// direct store.
    pub allow_ignore_direct: bool,
}

/// Snapshot of shard state storage metrics.
#[derive(Debug, Copy, Clone)]
pub struct ShardStateStorageMetrics {
    pub max_new_mc_cell_count: usize,
    pub max_new_sc_cell_count: usize,
}

/// Running maximums of new-cell counts (masterchain / shardchains).
#[derive(Default)]
struct StoreStateCounters {
    max_new_mc_cell_count: AtomicUsize,
    max_new_sc_cell_count: AtomicUsize,
}

/// Error returned when a requested state is not stored.
#[derive(thiserror::Error, Debug)]
#[error("shard state not found for block: {0}")]
pub struct StateNotFound(pub BlockIdShort);
1600
1601pub fn split_shard_accounts(
1602    root_cell: impl AsRef<DynCell>,
1603    split_depth: u8,
1604) -> Result<FastHashMap<HashBytes, Cell>> {
1605    // Cell#0 - processed_upto
1606    // Cell#1 - accounts
1607    let shard_accounts = root_cell
1608        .as_ref()
1609        .reference_cloned(1)
1610        .context("invalid shard state")?
1611        .parse::<ShardAccounts>()
1612        .context("failed to load shard accounts")?;
1613
1614    split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts")
1615}
1616
/// Block info required for applying a state update.
#[derive(Debug, Clone)]
pub struct BlockInfoForApply {
    /// Id of the previous block whose state the update applies on top of.
    pub prev_block_id: BlockId,
    /// Partial root cell of the state.
    // NOTE(review): presumably a pruned/partial merkle root — confirm with callers.
    pub partial_root_cell: Cell,
}
1622
/// Callback resolving [`BlockInfoForApply`] for a block id; returns
/// `None` when the info is not available.
type FnGetBlockInfoForApply = dyn Fn(&BlockId) -> Option<BlockInfoForApply> + Send + Sync + 'static;
1624
/// Internal block-apply info where the partial root is wrapped in a
/// `ToApply` handle.
// NOTE(review): `ToApply` is declared elsewhere in this file; its exact
// semantics are not visible here.
struct VirtualBlockInfo {
    // Id of the previous block the update applies on top of.
    prev_block_id: BlockId,
    // Partial state root, pending application.
    partial_root: ToApply,
}
1629
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use tycho_block_util::archive::WithArchiveData;
    use tycho_block_util::block::BlockStuff;
    use tycho_block_util::state::ShardStateStuff;
    use tycho_storage::StorageContext;
    use tycho_types::boc::BocRepr;
    use tycho_types::cell::{CellBuilder, Lazy};
    use tycho_types::models::{
        BlockExtra, BlockId, BlockInfo, McBlockExtra, ShardHashes, ShardIdent, ShardStateUnsplit,
    };

    use crate::storage::{CoreStorage, CoreStorageConfig, NewBlockMeta};

    /// Checks the `skip_states_gc` flag lifecycle: while the flag is set on
    /// a handle, `remove_outdated_states` must keep that state; after
    /// `set_skip_states_gc_finished` clears it, the state becomes collectable.
    #[tokio::test]
    async fn states_gc_skip_lifecycle() -> Result<()> {
        // Fresh storage backed by a temporary directory.
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let handles = storage.block_handle_storage();
        let blocks = storage.block_storage();
        let states = storage.shard_state_storage();

        // `target` acts as the GC horizon; `prev` is the outdated block
        // whose state is a GC candidate.
        let target = 10u32;
        let prev = target - 1;

        // Build a minimal masterchain block at `target` with an empty
        // shard list, to serve as the top block driving the GC.
        let top = BlockStuff::new_with(ShardIdent::MASTERCHAIN, target, |block| {
            let info = BlockInfo {
                shard: ShardIdent::MASTERCHAIN,
                seqno: target,
                min_ref_mc_seqno: target,
                ..Default::default()
            };
            block.info = Lazy::new(&info).unwrap();

            // McBlockExtra with default (empty) shard hashes.
            let extra = BlockExtra {
                custom: Some(
                    Lazy::new(&McBlockExtra {
                        shards: ShardHashes::default(),
                        ..Default::default()
                    })
                    .unwrap(),
                ),
                ..Default::default()
            };
            block.extra = Lazy::new(&extra).unwrap();
        });
        let top_id = *top.id();

        // Serialize the block and persist its data to obtain a handle.
        let data = BocRepr::encode_rayon(top.as_ref()).unwrap();
        let top = WithArchiveData::new(top, data);

        let stored = blocks
            .store_block_data(&top, &top.archive_data, NewBlockMeta {
                is_key_block: false,
                gen_utime: 0,
                ref_by_mc_seqno: target,
            })
            .await?;
        let handle = stored.handle;

        let prev_id = *BlockStuff::new_empty(ShardIdent::MASTERCHAIN, prev).id();

        // Helper producing a minimal (untracked) shard state for a block id.
        let make_state = |id: BlockId| -> Result<ShardStateStuff> {
            let state = ShardStateUnsplit {
                shard_ident: id.shard,
                seqno: id.seqno,
                min_ref_mc_seqno: target,
                ..Default::default()
            };

            let root = CellBuilder::build_from(&state)?;
            let handle = states.min_ref_mc_state().insert_untracked();
            ShardStateStuff::from_state_and_root(&id, Box::new(state), root, handle)
        };

        // Store the top-block state.
        let top_state = make_state(top_id)?;
        states
            .store_state_ignore_cache(&handle, &top_state, Default::default())
            .await?;

        // Store the outdated state; `handle` is rebound to the prev block's
        // handle from here on.
        let prev_state = make_state(prev_id)?;
        let (handle, _) = handles.create_or_load_handle(&prev_id, NewBlockMeta {
            is_key_block: false,
            gen_utime: 0,
            ref_by_mc_seqno: prev,
        });
        states
            .store_state_ignore_cache(&handle, &prev_state, Default::default())
            .await?;

        // Mark the outdated state as protected from GC.
        handles.set_skip_states_gc(&handle);
        assert!(handle.skip_states_gc());

        // GC must keep both states while the skip flag is set.
        states.remove_outdated_states(target).await?;
        assert!(states.contains_state(&prev_id)?);
        assert!(states.contains_state(&top_id)?);

        // Clear the flag; the prev state becomes collectable.
        handles.set_skip_states_gc_finished(&handle);
        assert!(!handle.skip_states_gc());

        // GC now removes the outdated state but keeps the top one.
        states.remove_outdated_states(target).await?;
        assert!(!states.contains_state(&prev_id)?);
        assert!(states.contains_state(&top_id)?);

        Ok(())
    }
}