tycho_core/storage/shard_state/
mod.rs

1use std::fs::File;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
4use std::time::Instant;
5
6use anyhow::{Context, Result};
7use bytesize::ByteSize;
8use tycho_block_util::block::*;
9use tycho_block_util::dict::split_aug_dict_raw;
10use tycho_block_util::state::*;
11use tycho_storage::fs::TempFileStorage;
12use tycho_storage::kv::StoredValue;
13use tycho_types::models::*;
14use tycho_types::prelude::*;
15use tycho_util::mem::Reclaimer;
16use tycho_util::metrics::HistogramGuard;
17use tycho_util::{FastHashMap, FastHashSet};
18use weedb::rocksdb;
19
20use self::cell_storage::*;
21use self::store_state_raw::StoreStateContext;
22use super::{BlockFlags, BlockHandle, BlockHandleStorage, BlockStorage, CellsDb};
23
24mod cell_storage;
25mod entries_buffer;
26mod store_state_raw;
27
/// Storage of shard state root cells on top of the cells database.
///
/// Persists state roots, loads them back as [`ShardStateStuff`], and
/// removes outdated states (GC) while coordinating with concurrent stores
/// via an internal lock.
pub struct ShardStateStorage {
    cells_db: CellsDb,

    block_handle_storage: Arc<BlockHandleStorage>,
    block_storage: Arc<BlockStorage>,
    cell_storage: Arc<CellStorage>,
    temp_file_storage: TempFileStorage,

    // Serializes state stores against state removal in GC
    // (taken in `store_state_root`, `store_state_file` and the GC loop).
    gc_lock: Arc<tokio::sync::Mutex<()>>,
    // Tracks the min masterchain seqno referenced by loaded states;
    // consulted by `compute_recent_blocks` to clamp the GC target.
    min_ref_mc_state: MinRefMcStateTracker,
    // Max number of new cells written for a single masterchain state
    // since the last `metrics()` call (which resets it).
    max_new_mc_cell_count: AtomicUsize,
    // Same as above, but for non-masterchain (shard) states.
    max_new_sc_cell_count: AtomicUsize,

    // Depth at which shard accounts are split for multithreaded
    // store/remove of shard state cells.
    accounts_split_depth: u8,
}
43
44impl ShardStateStorage {
45    // TODO: Replace args with a config.
46    pub fn new(
47        cells_db: CellsDb,
48        block_handle_storage: Arc<BlockHandleStorage>,
49        block_storage: Arc<BlockStorage>,
50        temp_file_storage: TempFileStorage,
51        cache_size_bytes: ByteSize,
52        drop_interval: u32,
53    ) -> Result<Arc<Self>> {
54        let cell_storage = CellStorage::new(cells_db.clone(), cache_size_bytes, drop_interval);
55
56        Ok(Arc::new(Self {
57            cells_db,
58            block_handle_storage,
59            block_storage,
60            temp_file_storage,
61            cell_storage,
62            gc_lock: Default::default(),
63            min_ref_mc_state: MinRefMcStateTracker::new(),
64            max_new_mc_cell_count: AtomicUsize::new(0),
65            max_new_sc_cell_count: AtomicUsize::new(0),
66            accounts_split_depth: 4,
67        }))
68    }
69
70    pub fn metrics(&self) -> ShardStateStorageMetrics {
71        ShardStateStorageMetrics {
72            max_new_mc_cell_count: self.max_new_mc_cell_count.swap(0, Ordering::AcqRel),
73            max_new_sc_cell_count: self.max_new_sc_cell_count.swap(0, Ordering::AcqRel),
74        }
75    }
76
77    // TODO: implement metrics
78    // pub fn cache_metrics(&self) -> CacheStats {
79    // self.cell_storage.cache_stats()
80    // }
81
82    pub fn min_ref_mc_state(&self) -> &MinRefMcStateTracker {
83        &self.min_ref_mc_state
84    }
85
86    pub fn cell_storage(&self) -> &Arc<CellStorage> {
87        &self.cell_storage
88    }
89
90    pub async fn store_state(
91        &self,
92        handle: &BlockHandle,
93        state: &ShardStateStuff,
94        hint: StoreStateHint,
95    ) -> Result<bool> {
96        anyhow::ensure!(
97            handle.id() == state.block_id(),
98            ShardStateStorageError::BlockHandleIdMismatch {
99                expected: state.block_id().as_short_id(),
100                actual: handle.id().as_short_id(),
101            }
102        );
103
104        self.store_state_root(handle, state.root_cell().clone(), hint)
105            .await
106    }
107
108    pub async fn store_state_root(
109        &self,
110        handle: &BlockHandle,
111        root_cell: Cell,
112        hint: StoreStateHint,
113    ) -> Result<bool> {
114        if handle.has_state() {
115            return Ok(false);
116        }
117
118        let gc_lock = {
119            let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high");
120            self.gc_lock.clone().lock_owned().await
121        };
122
123        // Double check if the state is already stored
124        if handle.has_state() {
125            return Ok(false);
126        }
127        let _hist = HistogramGuard::begin("tycho_storage_state_store_time");
128
129        let block_id = *handle.id();
130        let raw_db = self.cells_db.rocksdb().clone();
131        let cf = self.cells_db.shard_states.get_unbounded_cf();
132        let cell_storage = self.cell_storage.clone();
133        let block_handle_storage = self.block_handle_storage.clone();
134        let handle = handle.clone();
135        let accounts_split_depth = self.accounts_split_depth;
136
137        // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task.
138        let (new_cell_count, updated) = tokio::task::spawn_blocking(move || {
139            let root_hash = *root_cell.repr_hash();
140            let estimated_merkle_update_size = hint.estimate_cell_count();
141
142            let estimated_update_size_bytes = estimated_merkle_update_size * 192; // p50 cell size in bytes
143            let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);
144
145            let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high");
146
147            let new_cell_count = if block_id.is_masterchain() {
148                cell_storage.store_cell(
149                    &mut batch,
150                    root_cell.as_ref(),
151                    estimated_merkle_update_size,
152                )?
153            } else {
154                let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?;
155
156                cell_storage.store_cell_mt(
157                    root_cell.as_ref(),
158                    &mut batch,
159                    split_at,
160                    estimated_merkle_update_size,
161                )?
162            };
163
164            in_mem_store.finish();
165            metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);
166
167            batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice());
168
169            let hist = HistogramGuard::begin("tycho_storage_state_update_time_high");
170            metrics::histogram!("tycho_storage_state_update_size_bytes")
171                .record(batch.size_in_bytes() as f64);
172            metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
173                .record(estimated_update_size_bytes as f64);
174
175            raw_db.write(batch)?;
176
177            Reclaimer::instance().drop(root_cell);
178
179            hist.finish();
180
181            let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
182            if updated {
183                block_handle_storage.store_handle(&handle, false);
184            }
185
186            // NOTE: Ensure that GC lock is dropped only after storing the state.
187            drop(gc_lock);
188
189            Ok::<_, anyhow::Error>((new_cell_count, updated))
190        })
191        .await??;
192
193        let count = if block_id.shard.is_masterchain() {
194            &self.max_new_mc_cell_count
195        } else {
196            &self.max_new_sc_cell_count
197        };
198
199        count.fetch_max(new_cell_count, Ordering::Release);
200
201        Ok(updated)
202    }
203
204    // Stores shard state and returns the hash of its root cell.
205    pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result<HashBytes> {
206        let ctx = StoreStateContext {
207            cells_db: self.cells_db.clone(),
208            cell_storage: self.cell_storage.clone(),
209            temp_file_storage: self.temp_file_storage.clone(),
210        };
211
212        let block_id = *block_id;
213
214        let gc_lock = self.gc_lock.clone().lock_owned().await;
215        tokio::task::spawn_blocking(move || {
216            // NOTE: Ensure that GC lock is captured by the spawned thread.
217            let _gc_lock = gc_lock;
218
219            ctx.store(&block_id, boc)
220        })
221        .await?
222    }
223
224    // NOTE: DO NOT try to make a separate `load_state_root` method
225    // since the root must be properly tracked, and this tracking requires
226    // knowing its `min_ref_mc_seqno` which can only be found out by
227    // parsing the state. Creating a "Brief State" struct won't work either
228    // because due to model complexity it is going to be error-prone.
229    pub async fn load_state(
230        &self,
231        ref_by_mc_seqno: u32,
232        block_id: &BlockId,
233    ) -> Result<ShardStateStuff> {
234        // NOTE: only for metrics.
235        static MAX_KNOWN_EPOCH: AtomicU32 = AtomicU32::new(0);
236
237        let root_hash = self.load_state_root_hash(block_id)?;
238        let root = self.cell_storage.load_cell(&root_hash, ref_by_mc_seqno)?;
239        let root = Cell::from(root as Arc<_>);
240
241        let max_known_epoch = MAX_KNOWN_EPOCH
242            .fetch_max(ref_by_mc_seqno, Ordering::Relaxed)
243            .max(ref_by_mc_seqno);
244        metrics::gauge!("tycho_storage_state_max_epoch").set(max_known_epoch);
245
246        let shard_state = root.parse::<Box<ShardStateUnsplit>>()?;
247        let handle = self.min_ref_mc_state.insert(&shard_state);
248        ShardStateStuff::from_state_and_root(block_id, shard_state, root, handle)
249    }
250
251    pub fn load_state_root_hash(&self, block_id: &BlockId) -> Result<HashBytes> {
252        let shard_states = &self.cells_db.shard_states;
253        let shard_state = shard_states.get(block_id.to_vec())?;
254        match shard_state {
255            Some(root) => Ok(HashBytes::from_slice(&root[..32])),
256            None => {
257                anyhow::bail!(ShardStateStorageError::NotFound(block_id.as_short_id()))
258            }
259        }
260    }
261
262    #[tracing::instrument(skip(self))]
263    pub async fn remove_outdated_states(&self, mc_seqno: u32) -> Result<()> {
264        // Compute recent block ids for the specified masterchain seqno
265        let Some(top_blocks) = self.compute_recent_blocks(mc_seqno).await? else {
266            tracing::warn!("recent blocks edge not found");
267            return Ok(());
268        };
269
270        tracing::info!(
271            target_block_id = %top_blocks.mc_block,
272            "started states GC",
273        );
274        let started_at = Instant::now();
275
276        let raw = self.cells_db.rocksdb();
277
278        // Manually get required column factory and r/w options
279        let snapshot = raw.snapshot();
280        let shard_states_cf = self.cells_db.shard_states.get_unbounded_cf();
281        let mut states_read_options = self.cells_db.shard_states.new_read_config();
282        states_read_options.set_snapshot(&snapshot);
283
284        let mut alloc = bumpalo_herd::Herd::new();
285
286        // Create iterator
287        let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options);
288        iter.seek_to_first();
289
290        // Iterate all states and remove outdated
291        let mut removed_states = 0usize;
292        let mut removed_cells = 0usize;
293        loop {
294            let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high");
295            let (key, value) = match iter.item() {
296                Some(item) => item,
297                None => match iter.status() {
298                    Ok(()) => break,
299                    Err(e) => return Err(e.into()),
300                },
301            };
302
303            let block_id = BlockId::from_slice(key);
304            let root_hash = HashBytes::from_slice(value);
305
306            // Skip blocks from zero state and top blocks
307            if block_id.seqno == 0
308                || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno)
309            {
310                iter.next();
311                continue;
312            }
313
314            alloc.reset();
315
316            let guard = {
317                let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high");
318                self.gc_lock.clone().lock_owned().await
319            };
320
321            let db = self.cells_db.clone();
322            let cell_storage = self.cell_storage.clone();
323            let key = key.to_vec();
324            let accounts_split_depth = self.accounts_split_depth;
325            let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
326                let in_mem_remove =
327                    HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high");
328
329                let (stats, mut batch) = if block_id.is_masterchain() {
330                    cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)?
331                } else {
332                    // NOTE: We use epoch `0` here so that cells of old states
333                    // will not be used by recent loads.
334                    let root_cell = Cell::from(cell_storage.load_cell(&root_hash, 0)? as Arc<_>);
335
336                    let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?
337                        .into_keys()
338                        .collect::<FastHashSet<HashBytes>>();
339                    cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)?
340                };
341
342                in_mem_remove.finish();
343
344                batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
345                db.raw()
346                    .rocksdb()
347                    .write_opt(batch, db.cells.write_config())?;
348
349                // NOTE: Ensure that guard is dropped only after writing the batch.
350                drop(guard);
351
352                Ok::<_, anyhow::Error>((stats, alloc))
353            })
354            .await??;
355
356            removed_cells += total;
357            alloc = inner_alloc; // Reuse allocation without passing alloc by ref
358
359            tracing::debug!(removed_cells = total, %block_id);
360
361            removed_states += 1;
362            iter.next();
363
364            metrics::counter!("tycho_storage_state_gc_count").increment(1);
365            metrics::counter!("tycho_storage_state_gc_cells_count").increment(1);
366            if block_id.is_masterchain() {
367                metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64);
368            }
369            tracing::debug!(removed_states, removed_cells, %block_id, "removed state");
370        }
371
372        // Done
373        tracing::info!(
374            removed_states,
375            removed_cells,
376            block_id = %top_blocks.mc_block,
377            elapsed_sec = started_at.elapsed().as_secs_f64(),
378            "finished states GC",
379        );
380        Ok(())
381    }
382
383    /// Searches for an edge with the least referenced masterchain block
384    ///
385    /// Returns `None` if all states are recent enough
386    pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result<Option<TopBlocks>> {
387        // 0. Adjust masterchain seqno with minimal referenced masterchain state
388        if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno()
389            && min_ref_mc_seqno < mc_seqno
390        {
391            mc_seqno = min_ref_mc_seqno;
392        }
393
394        let snapshot = self.cells_db.rocksdb().snapshot();
395
396        // 1. Find target block
397
398        // Find block id using states table
399        let mc_block_id = match self
400            .find_mc_block_id(mc_seqno, &snapshot)
401            .context("Failed to find block id by seqno")?
402        {
403            Some(block_id) => block_id,
404            None => return Ok(None),
405        };
406
407        // Find block handle
408        let handle = match self.block_handle_storage.load_handle(&mc_block_id) {
409            Some(handle) if handle.has_data() => handle,
410            // Skip blocks without handle or data
411            _ => return Ok(None),
412        };
413
414        // 2. Find minimal referenced masterchain block from the target block
415
416        let block_data = self.block_storage.load_block_data(&handle).await?;
417        let block_info = block_data
418            .load_info()
419            .context("Failed to read target block info")?;
420
421        // Find full min masterchain reference id
422        let min_ref_mc_seqno = block_info.min_ref_mc_seqno;
423        let min_ref_block_id = match self.find_mc_block_id(min_ref_mc_seqno, &snapshot)? {
424            Some(block_id) => block_id,
425            None => return Ok(None),
426        };
427
428        // Find block handle
429        let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) {
430            Some(handle) if handle.has_data() => handle,
431            // Skip blocks without handle or data
432            _ => return Ok(None),
433        };
434
435        // Compute `TopBlocks` from block data
436        self.block_storage
437            .load_block_data(&min_ref_block_handle)
438            .await
439            .and_then(|block_data| TopBlocks::from_mc_block(&block_data))
440            .map(Some)
441    }
442
443    fn find_mc_block_id(
444        &self,
445        mc_seqno: u32,
446        snapshot: &rocksdb::Snapshot<'_>,
447    ) -> Result<Option<BlockId>> {
448        let shard_states = &self.cells_db.shard_states;
449
450        let mut bound = BlockId {
451            shard: ShardIdent::MASTERCHAIN,
452            seqno: mc_seqno,
453            root_hash: HashBytes::ZERO,
454            file_hash: HashBytes::ZERO,
455        };
456
457        let mut readopts = shard_states.new_read_config();
458        readopts.set_snapshot(snapshot);
459        readopts.set_iterate_lower_bound(bound.to_vec().as_slice());
460        bound.seqno += 1;
461        readopts.set_iterate_upper_bound(bound.to_vec().as_slice());
462
463        let mut iter = self
464            .cells_db
465            .rocksdb()
466            .raw_iterator_cf_opt(&shard_states.cf(), readopts);
467        iter.seek_to_first();
468
469        Ok(iter.key().map(BlockId::from_slice))
470    }
471}
472
/// Optional hints for sizing a state-store operation.
#[derive(Default, Debug, Clone, Copy)]
pub struct StoreStateHint {
    pub block_data_size: Option<usize>,
}

impl StoreStateHint {
    /// Heuristic estimate of how many new cells a state update will produce,
    /// derived from the block data size and rounded up to a power of two.
    fn estimate_cell_count(&self) -> usize {
        const MIN_BLOCK_SIZE: usize = 4 << 10; // 4 KB

        // Fall back to a small default when the block size is unknown.
        let block_data_size = match self.block_data_size {
            Some(size) => size,
            None => MIN_BLOCK_SIZE,
        };

        // Empirical fit: y = 3889.9821 + 14.7480 × √x (R² = 0.7035).
        let estimate = 3889.9821 + 14.7480 * (block_data_size as f64).sqrt();
        (estimate as usize).next_power_of_two()
    }
}
489
/// Snapshot of per-state cell counters, produced (and reset) by
/// [`ShardStateStorage::metrics`].
#[derive(Debug, Copy, Clone)]
pub struct ShardStateStorageMetrics {
    /// Max number of new cells stored for a single masterchain state
    /// since the previous snapshot.
    pub max_new_mc_cell_count: usize,
    /// Max number of new cells stored for a single shard state
    /// since the previous snapshot.
    pub max_new_sc_cell_count: usize,
}
495
/// Errors produced by [`ShardStateStorage`].
#[derive(thiserror::Error, Debug)]
pub enum ShardStateStorageError {
    /// No state root is stored for the requested block.
    #[error("Shard state not found for block: {0}")]
    NotFound(BlockIdShort),
    /// The provided block handle refers to a different block than the state.
    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
    BlockHandleIdMismatch {
        expected: BlockIdShort,
        actual: BlockIdShort,
    },
}
506
507pub fn split_shard_accounts(
508    root_cell: impl AsRef<DynCell>,
509    split_depth: u8,
510) -> Result<FastHashMap<HashBytes, Cell>> {
511    // Cell#0 - processed_upto
512    // Cell#1 - accounts
513    let shard_accounts = root_cell
514        .as_ref()
515        .reference_cloned(1)
516        .context("invalid shard state")?
517        .parse::<ShardAccounts>()
518        .context("failed to load shard accounts")?;
519
520    split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts")
521}