Skip to main content

tycho_core/storage/shard_state/
mod.rs

1use std::fs::File;
2use std::io::Cursor;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use bytes::Bytes;
9use bytesize::ByteSize;
10use tycho_block_util::block::*;
11use tycho_block_util::dict::split_aug_dict_raw;
12use tycho_block_util::state::*;
13use tycho_storage::fs::TempFileStorage;
14use tycho_storage::kv::StoredValue;
15use tycho_types::models::*;
16use tycho_types::prelude::*;
17use tycho_util::mem::Reclaimer;
18use tycho_util::metrics::HistogramGuard;
19use tycho_util::{FastHashMap, FastHashSet};
20use weedb::rocksdb;
21
22use self::cell_storage::*;
23use self::store_state_raw::StoreStateContext;
24use super::{BlockFlags, BlockHandle, BlockHandleStorage, BlockStorage, CellsDb};
25
26mod cell_storage;
27mod entries_buffer;
28mod store_state_raw;
29
30pub struct ShardStateStorage {
31    cells_db: CellsDb,
32
33    block_handle_storage: Arc<BlockHandleStorage>,
34    block_storage: Arc<BlockStorage>,
35    cell_storage: Arc<CellStorage>,
36    temp_file_storage: TempFileStorage,
37
38    gc_lock: Arc<tokio::sync::Mutex<()>>,
39    min_ref_mc_state: MinRefMcStateTracker,
40    max_new_mc_cell_count: AtomicUsize,
41    max_new_sc_cell_count: AtomicUsize,
42
43    accounts_split_depth: u8,
44}
45
46impl ShardStateStorage {
47    // TODO: Replace args with a config.
48    pub fn new(
49        cells_db: CellsDb,
50        block_handle_storage: Arc<BlockHandleStorage>,
51        block_storage: Arc<BlockStorage>,
52        temp_file_storage: TempFileStorage,
53        cache_size_bytes: ByteSize,
54        drop_interval: u32,
55    ) -> Result<Arc<Self>> {
56        let cell_storage = CellStorage::new(cells_db.clone(), cache_size_bytes, drop_interval);
57
58        Ok(Arc::new(Self {
59            cells_db,
60            block_handle_storage,
61            block_storage,
62            temp_file_storage,
63            cell_storage,
64            gc_lock: Default::default(),
65            min_ref_mc_state: MinRefMcStateTracker::new(),
66            max_new_mc_cell_count: AtomicUsize::new(0),
67            max_new_sc_cell_count: AtomicUsize::new(0),
68            accounts_split_depth: 4,
69        }))
70    }
71
72    pub fn metrics(&self) -> ShardStateStorageMetrics {
73        ShardStateStorageMetrics {
74            max_new_mc_cell_count: self.max_new_mc_cell_count.swap(0, Ordering::AcqRel),
75            max_new_sc_cell_count: self.max_new_sc_cell_count.swap(0, Ordering::AcqRel),
76        }
77    }
78
79    // TODO: implement metrics
80    // pub fn cache_metrics(&self) -> CacheStats {
81    // self.cell_storage.cache_stats()
82    // }
83
84    pub fn min_ref_mc_state(&self) -> &MinRefMcStateTracker {
85        &self.min_ref_mc_state
86    }
87
88    pub fn cell_storage(&self) -> &Arc<CellStorage> {
89        &self.cell_storage
90    }
91
92    /// Find mc block id from db snapshot
93    pub fn load_mc_block_id(&self, seqno: u32) -> Result<Option<BlockId>> {
94        let snapshot = self.cells_db.rocksdb().snapshot();
95        self.find_mc_block_id(seqno, &snapshot)
96    }
97
98    pub async fn store_state(
99        &self,
100        handle: &BlockHandle,
101        state: &ShardStateStuff,
102        hint: StoreStateHint,
103    ) -> Result<bool> {
104        anyhow::ensure!(
105            handle.id() == state.block_id(),
106            ShardStateStorageError::BlockHandleIdMismatch {
107                expected: state.block_id().as_short_id(),
108                actual: handle.id().as_short_id(),
109            }
110        );
111
112        self.store_state_root(handle, state.root_cell().clone(), hint)
113            .await
114    }
115
116    pub async fn store_state_root(
117        &self,
118        handle: &BlockHandle,
119        root_cell: Cell,
120        hint: StoreStateHint,
121    ) -> Result<bool> {
122        if handle.has_state() {
123            return Ok(false);
124        }
125
126        let gc_lock = {
127            let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high");
128            self.gc_lock.clone().lock_owned().await
129        };
130
131        // Double check if the state is already stored
132        if handle.has_state() {
133            return Ok(false);
134        }
135        let _hist = HistogramGuard::begin("tycho_storage_state_store_time");
136
137        let block_id = *handle.id();
138        let raw_db = self.cells_db.rocksdb().clone();
139        let cf = self.cells_db.shard_states.get_unbounded_cf();
140        let cell_storage = self.cell_storage.clone();
141        let block_handle_storage = self.block_handle_storage.clone();
142        let handle = handle.clone();
143        let accounts_split_depth = self.accounts_split_depth;
144
145        // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task.
146        let (new_cell_count, updated) = tokio::task::spawn_blocking(move || {
147            let root_hash = *root_cell.repr_hash();
148            let estimated_merkle_update_size = hint.estimate_cell_count();
149
150            let estimated_update_size_bytes = estimated_merkle_update_size * 192; // p50 cell size in bytes
151            let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);
152
153            let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high");
154
155            let new_cell_count = if block_id.is_masterchain() {
156                cell_storage.store_cell(
157                    &mut batch,
158                    root_cell.as_ref(),
159                    estimated_merkle_update_size,
160                )?
161            } else {
162                let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?;
163
164                cell_storage.store_cell_mt(
165                    root_cell.as_ref(),
166                    &mut batch,
167                    split_at,
168                    estimated_merkle_update_size,
169                )?
170            };
171
172            in_mem_store.finish();
173            metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);
174
175            batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice());
176
177            let hist = HistogramGuard::begin("tycho_storage_state_update_time_high");
178            metrics::histogram!("tycho_storage_state_update_size_bytes")
179                .record(batch.size_in_bytes() as f64);
180            metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
181                .record(estimated_update_size_bytes as f64);
182
183            raw_db.write(batch)?;
184
185            Reclaimer::instance().drop(root_cell);
186
187            hist.finish();
188
189            let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
190            if updated {
191                block_handle_storage.store_handle(&handle, false);
192            }
193
194            // NOTE: Ensure that GC lock is dropped only after storing the state.
195            drop(gc_lock);
196
197            Ok::<_, anyhow::Error>((new_cell_count, updated))
198        })
199        .await??;
200
201        let count = if block_id.shard.is_masterchain() {
202            &self.max_new_mc_cell_count
203        } else {
204            &self.max_new_sc_cell_count
205        };
206
207        count.fetch_max(new_cell_count, Ordering::Release);
208
209        Ok(updated)
210    }
211
212    async fn store_state_inner<R>(&self, block_id: &BlockId, boc: R) -> Result<HashBytes>
213    where
214        R: std::io::Read + Send + 'static,
215    {
216        let ctx = StoreStateContext {
217            cells_db: self.cells_db.clone(),
218            cell_storage: self.cell_storage.clone(),
219            temp_file_storage: self.temp_file_storage.clone(),
220        };
221
222        let block_id = *block_id;
223
224        let gc_lock = self.gc_lock.clone().lock_owned().await;
225        tokio::task::spawn_blocking(move || {
226            // NOTE: Ensure that GC lock is captured by the spawned thread.
227            let _gc_lock = gc_lock;
228
229            ctx.store(&block_id, boc)
230        })
231        .await?
232    }
233
234    // Stores shard state and returns the hash of its root cell.
235    pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result<HashBytes> {
236        self.store_state_inner(block_id, boc).await
237    }
238
239    pub async fn store_state_bytes(&self, block_id: &BlockId, boc: Bytes) -> Result<HashBytes> {
240        let cursor = Cursor::new(boc);
241        self.store_state_inner(block_id, cursor).await
242    }
243
244    // NOTE: DO NOT try to make a separate `load_state_root` method
245    // since the root must be properly tracked, and this tracking requires
246    // knowing its `min_ref_mc_seqno` which can only be found out by
247    // parsing the state. Creating a "Brief State" struct won't work either
248    // because due to model complexity it is going to be error-prone.
249    pub async fn load_state(
250        &self,
251        ref_by_mc_seqno: u32,
252        block_id: &BlockId,
253    ) -> Result<ShardStateStuff> {
254        // NOTE: only for metrics.
255        static MAX_KNOWN_EPOCH: AtomicU32 = AtomicU32::new(0);
256
257        let root_hash = self.load_state_root_hash(block_id)?;
258        let root = self.cell_storage.load_cell(&root_hash, ref_by_mc_seqno)?;
259        let root = Cell::from(root as Arc<_>);
260
261        let max_known_epoch = MAX_KNOWN_EPOCH
262            .fetch_max(ref_by_mc_seqno, Ordering::Relaxed)
263            .max(ref_by_mc_seqno);
264        metrics::gauge!("tycho_storage_state_max_epoch").set(max_known_epoch);
265
266        let shard_state = root.parse::<Box<ShardStateUnsplit>>()?;
267        let handle = self.min_ref_mc_state.insert(&shard_state);
268        ShardStateStuff::from_state_and_root(block_id, shard_state, root, handle)
269    }
270
271    pub fn load_state_root_hash(&self, block_id: &BlockId) -> Result<HashBytes> {
272        let shard_states = &self.cells_db.shard_states;
273        let shard_state = shard_states.get(block_id.to_vec())?;
274        match shard_state {
275            Some(root) => Ok(HashBytes::from_slice(&root[..32])),
276            None => {
277                anyhow::bail!(ShardStateStorageError::NotFound(block_id.as_short_id()))
278            }
279        }
280    }
281
282    #[tracing::instrument(skip(self))]
283    pub async fn remove_outdated_states(&self, mc_seqno: u32) -> Result<()> {
284        // Compute recent block ids for the specified masterchain seqno
285        let Some(top_blocks) = self.compute_recent_blocks(mc_seqno).await? else {
286            tracing::warn!("recent blocks edge not found");
287            return Ok(());
288        };
289
290        tracing::info!(
291            target_block_id = %top_blocks.mc_block,
292            "started states GC",
293        );
294        let started_at = Instant::now();
295
296        let raw = self.cells_db.rocksdb();
297
298        // Manually get required column factory and r/w options
299        let snapshot = raw.snapshot();
300        let shard_states_cf = self.cells_db.shard_states.get_unbounded_cf();
301        let mut states_read_options = self.cells_db.shard_states.new_read_config();
302        states_read_options.set_snapshot(&snapshot);
303
304        let mut alloc = bumpalo_herd::Herd::new();
305
306        // Create iterator
307        let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options);
308        iter.seek_to_first();
309
310        // Iterate all states and remove outdated
311        let mut removed_states = 0usize;
312        let mut removed_cells = 0usize;
313        loop {
314            let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high");
315            let (key, value) = match iter.item() {
316                Some(item) => item,
317                None => match iter.status() {
318                    Ok(()) => break,
319                    Err(e) => return Err(e.into()),
320                },
321            };
322
323            let block_id = BlockId::from_slice(key);
324            let root_hash = HashBytes::from_slice(&value[0..32]);
325
326            // Skip blocks from zero state and top blocks.
327            // NOTE: We intentionally don't skip hardforked zerostates (seqno > 0),
328            // because we don't really need to keep them. For proof checker we
329            // use zerostate proof which is stored separately, and for serving the
330            // state we use a persistent state (where we don't remove these states).
331            if block_id.seqno == 0
332                || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno)
333            {
334                iter.next();
335                continue;
336            }
337
338            alloc.reset();
339
340            let guard = {
341                let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high");
342                self.gc_lock.clone().lock_owned().await
343            };
344
345            let db = self.cells_db.clone();
346            let cell_storage = self.cell_storage.clone();
347            let key = key.to_vec();
348            let accounts_split_depth = self.accounts_split_depth;
349            let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
350                let in_mem_remove =
351                    HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high");
352
353                let (stats, mut batch) = if block_id.is_masterchain() {
354                    cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)?
355                } else {
356                    // NOTE: We use epoch `0` here so that cells of old states
357                    // will not be used by recent loads.
358                    let root_cell = Cell::from(cell_storage.load_cell(&root_hash, 0)? as Arc<_>);
359
360                    let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?
361                        .into_keys()
362                        .collect::<FastHashSet<HashBytes>>();
363                    cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)?
364                };
365
366                in_mem_remove.finish();
367
368                batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
369                db.raw()
370                    .rocksdb()
371                    .write_opt(batch, db.cells.write_config())?;
372
373                // NOTE: Ensure that guard is dropped only after writing the batch.
374                drop(guard);
375
376                Ok::<_, anyhow::Error>((stats, alloc))
377            })
378            .await??;
379
380            removed_cells += total;
381            alloc = inner_alloc; // Reuse allocation without passing alloc by ref
382
383            tracing::debug!(removed_cells = total, %block_id);
384
385            removed_states += 1;
386            iter.next();
387
388            metrics::counter!("tycho_storage_state_gc_count").increment(1);
389            metrics::counter!("tycho_storage_state_gc_cells_count").increment(1);
390            if block_id.is_masterchain() {
391                metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64);
392            }
393            tracing::debug!(removed_states, removed_cells, %block_id, "removed state");
394        }
395
396        // Done
397        tracing::info!(
398            removed_states,
399            removed_cells,
400            block_id = %top_blocks.mc_block,
401            elapsed_sec = started_at.elapsed().as_secs_f64(),
402            "finished states GC",
403        );
404        Ok(())
405    }
406
407    /// Searches for an edge with the least referenced masterchain block
408    ///
409    /// Returns `None` if all states are recent enough
410    pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result<Option<TopBlocks>> {
411        // 0. Adjust masterchain seqno with minimal referenced masterchain state
412        if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno()
413            && min_ref_mc_seqno < mc_seqno
414        {
415            mc_seqno = min_ref_mc_seqno;
416        }
417
418        let snapshot = self.cells_db.rocksdb().snapshot();
419
420        // 1. Find target block
421
422        // Find block id using states table
423        let mc_block_id = match self
424            .find_mc_block_id(mc_seqno, &snapshot)
425            .context("Failed to find block id by seqno")?
426        {
427            Some(block_id) => block_id,
428            None => return Ok(None),
429        };
430
431        // Find block handle
432        let handle = match self.block_handle_storage.load_handle(&mc_block_id) {
433            Some(handle) if handle.has_data() => handle,
434            // Skip blocks without handle or data
435            _ => return Ok(None),
436        };
437
438        // 2. Find minimal referenced masterchain block from the target block
439
440        let block_data = self.block_storage.load_block_data(&handle).await?;
441        let block_info = block_data
442            .load_info()
443            .context("Failed to read target block info")?;
444
445        // Find full min masterchain reference id
446        let min_ref_mc_seqno = block_info.min_ref_mc_seqno;
447        let min_ref_block_id = match self.find_mc_block_id(min_ref_mc_seqno, &snapshot)? {
448            Some(block_id) => block_id,
449            None => return Ok(None),
450        };
451
452        // Find block handle
453        let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) {
454            Some(handle) if handle.has_data() => handle,
455            // Skip blocks without handle or data
456            _ => return Ok(None),
457        };
458
459        // Compute `TopBlocks` from block data
460        self.block_storage
461            .load_block_data(&min_ref_block_handle)
462            .await
463            .and_then(|block_data| TopBlocks::from_mc_block(&block_data))
464            .map(Some)
465    }
466
467    fn find_mc_block_id(
468        &self,
469        mc_seqno: u32,
470        snapshot: &rocksdb::Snapshot<'_>,
471    ) -> Result<Option<BlockId>> {
472        let shard_states = &self.cells_db.shard_states;
473
474        let mut bound = BlockId {
475            shard: ShardIdent::MASTERCHAIN,
476            seqno: mc_seqno,
477            root_hash: HashBytes::ZERO,
478            file_hash: HashBytes::ZERO,
479        };
480
481        let mut readopts = shard_states.new_read_config();
482        readopts.set_snapshot(snapshot);
483        readopts.set_iterate_lower_bound(bound.to_vec().as_slice());
484        bound.seqno += 1;
485        readopts.set_iterate_upper_bound(bound.to_vec().as_slice());
486
487        let mut iter = self
488            .cells_db
489            .rocksdb()
490            .raw_iterator_cf_opt(&shard_states.cf(), readopts);
491        iter.seek_to_first();
492
493        Ok(iter.key().map(BlockId::from_slice))
494    }
495}
496
/// Optional information used to size the buffers for storing a state.
#[derive(Default, Debug, Clone, Copy)]
pub struct StoreStateHint {
    /// Size of the block data in bytes, if known.
    pub block_data_size: Option<usize>,
}

impl StoreStateHint {
    /// Estimates the number of cells from the block data size.
    ///
    /// Uses the fit `y = 3889.9821 + 14.7480 × √x` (R-squared: 0.7035),
    /// truncates the result to an integer and rounds it up to the next
    /// power of two. A missing size is treated as 4 KB.
    fn estimate_cell_count(&self) -> usize {
        const MIN_BLOCK_SIZE: usize = 4 << 10; // 4 KB
        const INTERCEPT: f64 = 3889.9821;
        const SLOPE: f64 = 14.7480;

        let size = match self.block_data_size {
            Some(size) => size,
            None => MIN_BLOCK_SIZE,
        };

        let predicted = INTERCEPT + SLOPE * (size as f64).sqrt();
        (predicted as usize).next_power_of_two()
    }
}
513
/// Snapshot of the counters returned by `ShardStateStorage::metrics`.
#[derive(Debug, Clone, Copy)]
pub struct ShardStateStorageMetrics {
    /// Peak number of new cells produced by a single masterchain state store.
    pub max_new_mc_cell_count: usize,
    /// Peak number of new cells produced by a single non-masterchain state store.
    pub max_new_sc_cell_count: usize,
}
519
520#[derive(thiserror::Error, Debug)]
521pub enum ShardStateStorageError {
522    #[error("Shard state not found for block: {0}")]
523    NotFound(BlockIdShort),
524    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
525    BlockHandleIdMismatch {
526        expected: BlockIdShort,
527        actual: BlockIdShort,
528    },
529}
530
531pub fn split_shard_accounts(
532    root_cell: impl AsRef<DynCell>,
533    split_depth: u8,
534) -> Result<FastHashMap<HashBytes, Cell>> {
535    // Cell#0 - processed_upto
536    // Cell#1 - accounts
537    let shard_accounts = root_cell
538        .as_ref()
539        .reference_cloned(1)
540        .context("invalid shard state")?
541        .parse::<ShardAccounts>()
542        .context("failed to load shard accounts")?;
543
544    split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts")
545}