// File: tycho_core/storage/shard_state/mod.rs

1use std::fs::File;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
4use std::time::Instant;
5
6use anyhow::{Context, Result};
7use bytesize::ByteSize;
8use tycho_block_util::block::*;
9use tycho_block_util::dict::split_aug_dict_raw;
10use tycho_block_util::state::*;
11use tycho_storage::fs::TempFileStorage;
12use tycho_storage::kv::StoredValue;
13use tycho_types::models::*;
14use tycho_types::prelude::*;
15use tycho_util::mem::Reclaimer;
16use tycho_util::metrics::HistogramGuard;
17use tycho_util::{FastHashMap, FastHashSet};
18use weedb::rocksdb;
19
20use self::cell_storage::*;
21use self::store_state_raw::StoreStateContext;
22use super::{BlockFlags, BlockHandle, BlockHandleStorage, BlockStorage, CellsDb};
23
24mod cell_storage;
25mod entries_buffer;
26mod store_state_raw;
27
28pub struct ShardStateStorage {
29    cells_db: CellsDb,
30
31    block_handle_storage: Arc<BlockHandleStorage>,
32    block_storage: Arc<BlockStorage>,
33    cell_storage: Arc<CellStorage>,
34    temp_file_storage: TempFileStorage,
35
36    gc_lock: Arc<tokio::sync::Mutex<()>>,
37    min_ref_mc_state: MinRefMcStateTracker,
38    max_new_mc_cell_count: AtomicUsize,
39    max_new_sc_cell_count: AtomicUsize,
40
41    accounts_split_depth: u8,
42}
43
44impl ShardStateStorage {
45    // TODO: Replace args with a config.
46    pub fn new(
47        cells_db: CellsDb,
48        block_handle_storage: Arc<BlockHandleStorage>,
49        block_storage: Arc<BlockStorage>,
50        temp_file_storage: TempFileStorage,
51        cache_size_bytes: ByteSize,
52        drop_interval: u32,
53    ) -> Result<Arc<Self>> {
54        let cell_storage = CellStorage::new(cells_db.clone(), cache_size_bytes, drop_interval);
55
56        Ok(Arc::new(Self {
57            cells_db,
58            block_handle_storage,
59            block_storage,
60            temp_file_storage,
61            cell_storage,
62            gc_lock: Default::default(),
63            min_ref_mc_state: MinRefMcStateTracker::new(),
64            max_new_mc_cell_count: AtomicUsize::new(0),
65            max_new_sc_cell_count: AtomicUsize::new(0),
66            accounts_split_depth: 4,
67        }))
68    }
69
70    pub fn metrics(&self) -> ShardStateStorageMetrics {
71        ShardStateStorageMetrics {
72            max_new_mc_cell_count: self.max_new_mc_cell_count.swap(0, Ordering::AcqRel),
73            max_new_sc_cell_count: self.max_new_sc_cell_count.swap(0, Ordering::AcqRel),
74        }
75    }
76
77    // TODO: implement metrics
78    // pub fn cache_metrics(&self) -> CacheStats {
79    // self.cell_storage.cache_stats()
80    // }
81
82    pub fn min_ref_mc_state(&self) -> &MinRefMcStateTracker {
83        &self.min_ref_mc_state
84    }
85
86    pub async fn store_state(
87        &self,
88        handle: &BlockHandle,
89        state: &ShardStateStuff,
90        hint: StoreStateHint,
91    ) -> Result<bool> {
92        anyhow::ensure!(
93            handle.id() == state.block_id(),
94            ShardStateStorageError::BlockHandleIdMismatch {
95                expected: state.block_id().as_short_id(),
96                actual: handle.id().as_short_id(),
97            }
98        );
99
100        self.store_state_root(handle, state.root_cell().clone(), hint)
101            .await
102    }
103
104    pub async fn store_state_root(
105        &self,
106        handle: &BlockHandle,
107        root_cell: Cell,
108        hint: StoreStateHint,
109    ) -> Result<bool> {
110        if handle.has_state() {
111            return Ok(false);
112        }
113
114        let gc_lock = {
115            let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high");
116            self.gc_lock.clone().lock_owned().await
117        };
118
119        // Double check if the state is already stored
120        if handle.has_state() {
121            return Ok(false);
122        }
123        let _hist = HistogramGuard::begin("tycho_storage_state_store_time");
124
125        let block_id = *handle.id();
126        let raw_db = self.cells_db.rocksdb().clone();
127        let cf = self.cells_db.shard_states.get_unbounded_cf();
128        let cell_storage = self.cell_storage.clone();
129        let block_handle_storage = self.block_handle_storage.clone();
130        let handle = handle.clone();
131        let accounts_split_depth = self.accounts_split_depth;
132
133        // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task.
134        let (new_cell_count, updated) = tokio::task::spawn_blocking(move || {
135            let root_hash = *root_cell.repr_hash();
136            let estimated_merkle_update_size = hint.estimate_cell_count();
137
138            let estimated_update_size_bytes = estimated_merkle_update_size * 192; // p50 cell size in bytes
139            let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);
140
141            let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high");
142
143            let new_cell_count = if block_id.is_masterchain() {
144                cell_storage.store_cell(
145                    &mut batch,
146                    root_cell.as_ref(),
147                    estimated_merkle_update_size,
148                )?
149            } else {
150                let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?;
151
152                cell_storage.store_cell_mt(
153                    root_cell.as_ref(),
154                    &mut batch,
155                    split_at,
156                    estimated_merkle_update_size,
157                )?
158            };
159
160            in_mem_store.finish();
161            metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);
162
163            batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice());
164
165            let hist = HistogramGuard::begin("tycho_storage_state_update_time_high");
166            metrics::histogram!("tycho_storage_state_update_size_bytes")
167                .record(batch.size_in_bytes() as f64);
168            metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
169                .record(estimated_update_size_bytes as f64);
170
171            raw_db.write(batch)?;
172
173            Reclaimer::instance().drop(root_cell);
174
175            hist.finish();
176
177            let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
178            if updated {
179                block_handle_storage.store_handle(&handle, false);
180            }
181
182            // NOTE: Ensure that GC lock is dropped only after storing the state.
183            drop(gc_lock);
184
185            Ok::<_, anyhow::Error>((new_cell_count, updated))
186        })
187        .await??;
188
189        let count = if block_id.shard.is_masterchain() {
190            &self.max_new_mc_cell_count
191        } else {
192            &self.max_new_sc_cell_count
193        };
194
195        count.fetch_max(new_cell_count, Ordering::Release);
196
197        Ok(updated)
198    }
199
200    // Stores shard state and returns the hash of its root cell.
201    pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result<HashBytes> {
202        let ctx = StoreStateContext {
203            cells_db: self.cells_db.clone(),
204            cell_storage: self.cell_storage.clone(),
205            temp_file_storage: self.temp_file_storage.clone(),
206        };
207
208        let block_id = *block_id;
209
210        let gc_lock = self.gc_lock.clone().lock_owned().await;
211        tokio::task::spawn_blocking(move || {
212            // NOTE: Ensure that GC lock is captured by the spawned thread.
213            let _gc_lock = gc_lock;
214
215            ctx.store(&block_id, boc)
216        })
217        .await?
218    }
219
220    // NOTE: DO NOT try to make a separate `load_state_root` method
221    // since the root must be properly tracked, and this tracking requires
222    // knowing its `min_ref_mc_seqno` which can only be found out by
223    // parsing the state. Creating a "Brief State" struct won't work either
224    // because due to model complexity it is going to be error-prone.
225    pub async fn load_state(
226        &self,
227        ref_by_mc_seqno: u32,
228        block_id: &BlockId,
229    ) -> Result<ShardStateStuff> {
230        // NOTE: only for metrics.
231        static MAX_KNOWN_EPOCH: AtomicU32 = AtomicU32::new(0);
232
233        let root_hash = self.load_state_root_hash(block_id)?;
234        let root = self.cell_storage.load_cell(&root_hash, ref_by_mc_seqno)?;
235        let root = Cell::from(root as Arc<_>);
236
237        let max_known_epoch = MAX_KNOWN_EPOCH
238            .fetch_max(ref_by_mc_seqno, Ordering::Relaxed)
239            .max(ref_by_mc_seqno);
240        metrics::gauge!("tycho_storage_state_max_epoch").set(max_known_epoch);
241
242        let shard_state = root.parse::<Box<ShardStateUnsplit>>()?;
243        let handle = self.min_ref_mc_state.insert(&shard_state);
244        ShardStateStuff::from_state_and_root(block_id, shard_state, root, handle)
245    }
246
247    pub fn load_state_root_hash(&self, block_id: &BlockId) -> Result<HashBytes> {
248        let shard_states = &self.cells_db.shard_states;
249        let shard_state = shard_states.get(block_id.to_vec())?;
250        match shard_state {
251            Some(root) => Ok(HashBytes::from_slice(&root[..32])),
252            None => {
253                anyhow::bail!(ShardStateStorageError::NotFound(block_id.as_short_id()))
254            }
255        }
256    }
257
258    #[tracing::instrument(skip(self))]
259    pub async fn remove_outdated_states(&self, mc_seqno: u32) -> Result<()> {
260        // Compute recent block ids for the specified masterchain seqno
261        let Some(top_blocks) = self.compute_recent_blocks(mc_seqno).await? else {
262            tracing::warn!("recent blocks edge not found");
263            return Ok(());
264        };
265
266        tracing::info!(
267            target_block_id = %top_blocks.mc_block,
268            "started states GC",
269        );
270        let started_at = Instant::now();
271
272        let raw = self.cells_db.rocksdb();
273
274        // Manually get required column factory and r/w options
275        let snapshot = raw.snapshot();
276        let shard_states_cf = self.cells_db.shard_states.get_unbounded_cf();
277        let mut states_read_options = self.cells_db.shard_states.new_read_config();
278        states_read_options.set_snapshot(&snapshot);
279
280        let mut alloc = bumpalo_herd::Herd::new();
281
282        // Create iterator
283        let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options);
284        iter.seek_to_first();
285
286        // Iterate all states and remove outdated
287        let mut removed_states = 0usize;
288        let mut removed_cells = 0usize;
289        loop {
290            let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high");
291            let (key, value) = match iter.item() {
292                Some(item) => item,
293                None => match iter.status() {
294                    Ok(()) => break,
295                    Err(e) => return Err(e.into()),
296                },
297            };
298
299            let block_id = BlockId::from_slice(key);
300            let root_hash = HashBytes::from_slice(value);
301
302            // Skip blocks from zero state and top blocks
303            if block_id.seqno == 0
304                || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno)
305            {
306                iter.next();
307                continue;
308            }
309
310            alloc.reset();
311
312            let guard = {
313                let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high");
314                self.gc_lock.clone().lock_owned().await
315            };
316
317            let db = self.cells_db.clone();
318            let cell_storage = self.cell_storage.clone();
319            let key = key.to_vec();
320            let accounts_split_depth = self.accounts_split_depth;
321            let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
322                let in_mem_remove =
323                    HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high");
324
325                let (stats, mut batch) = if block_id.is_masterchain() {
326                    cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)?
327                } else {
328                    // NOTE: We use epoch `0` here so that cells of old states
329                    // will not be used by recent loads.
330                    let root_cell = Cell::from(cell_storage.load_cell(&root_hash, 0)? as Arc<_>);
331
332                    let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?
333                        .into_keys()
334                        .collect::<FastHashSet<HashBytes>>();
335                    cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)?
336                };
337
338                in_mem_remove.finish();
339
340                batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
341                db.raw()
342                    .rocksdb()
343                    .write_opt(batch, db.cells.write_config())?;
344
345                // NOTE: Ensure that guard is dropped only after writing the batch.
346                drop(guard);
347
348                Ok::<_, anyhow::Error>((stats, alloc))
349            })
350            .await??;
351
352            removed_cells += total;
353            alloc = inner_alloc; // Reuse allocation without passing alloc by ref
354
355            tracing::debug!(removed_cells = total, %block_id);
356
357            removed_states += 1;
358            iter.next();
359
360            metrics::counter!("tycho_storage_state_gc_count").increment(1);
361            metrics::counter!("tycho_storage_state_gc_cells_count").increment(1);
362            if block_id.is_masterchain() {
363                metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64);
364            }
365            tracing::debug!(removed_states, removed_cells, %block_id, "removed state");
366        }
367
368        // Done
369        tracing::info!(
370            removed_states,
371            removed_cells,
372            block_id = %top_blocks.mc_block,
373            elapsed_sec = started_at.elapsed().as_secs_f64(),
374            "finished states GC",
375        );
376        Ok(())
377    }
378
379    /// Searches for an edge with the least referenced masterchain block
380    ///
381    /// Returns `None` if all states are recent enough
382    pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result<Option<TopBlocks>> {
383        // 0. Adjust masterchain seqno with minimal referenced masterchain state
384        if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno()
385            && min_ref_mc_seqno < mc_seqno
386        {
387            mc_seqno = min_ref_mc_seqno;
388        }
389
390        let snapshot = self.cells_db.rocksdb().snapshot();
391
392        // 1. Find target block
393
394        // Find block id using states table
395        let mc_block_id = match self
396            .find_mc_block_id(mc_seqno, &snapshot)
397            .context("Failed to find block id by seqno")?
398        {
399            Some(block_id) => block_id,
400            None => return Ok(None),
401        };
402
403        // Find block handle
404        let handle = match self.block_handle_storage.load_handle(&mc_block_id) {
405            Some(handle) if handle.has_data() => handle,
406            // Skip blocks without handle or data
407            _ => return Ok(None),
408        };
409
410        // 2. Find minimal referenced masterchain block from the target block
411
412        let block_data = self.block_storage.load_block_data(&handle).await?;
413        let block_info = block_data
414            .load_info()
415            .context("Failed to read target block info")?;
416
417        // Find full min masterchain reference id
418        let min_ref_mc_seqno = block_info.min_ref_mc_seqno;
419        let min_ref_block_id = match self.find_mc_block_id(min_ref_mc_seqno, &snapshot)? {
420            Some(block_id) => block_id,
421            None => return Ok(None),
422        };
423
424        // Find block handle
425        let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) {
426            Some(handle) if handle.has_data() => handle,
427            // Skip blocks without handle or data
428            _ => return Ok(None),
429        };
430
431        // Compute `TopBlocks` from block data
432        self.block_storage
433            .load_block_data(&min_ref_block_handle)
434            .await
435            .and_then(|block_data| TopBlocks::from_mc_block(&block_data))
436            .map(Some)
437    }
438
439    fn find_mc_block_id(
440        &self,
441        mc_seqno: u32,
442        snapshot: &rocksdb::Snapshot<'_>,
443    ) -> Result<Option<BlockId>> {
444        let shard_states = &self.cells_db.shard_states;
445
446        let mut bound = BlockId {
447            shard: ShardIdent::MASTERCHAIN,
448            seqno: mc_seqno,
449            root_hash: HashBytes::ZERO,
450            file_hash: HashBytes::ZERO,
451        };
452
453        let mut readopts = shard_states.new_read_config();
454        readopts.set_snapshot(snapshot);
455        readopts.set_iterate_lower_bound(bound.to_vec().as_slice());
456        bound.seqno += 1;
457        readopts.set_iterate_upper_bound(bound.to_vec().as_slice());
458
459        let mut iter = self
460            .cells_db
461            .rocksdb()
462            .raw_iterator_cf_opt(&shard_states.cf(), readopts);
463        iter.seek_to_first();
464
465        Ok(iter.key().map(BlockId::from_slice))
466    }
467}
468
469#[derive(Default, Debug, Clone, Copy)]
470pub struct StoreStateHint {
471    pub block_data_size: Option<usize>,
472}
473
474impl StoreStateHint {
475    fn estimate_cell_count(&self) -> usize {
476        const MIN_BLOCK_SIZE: usize = 4 << 10; // 4 KB
477
478        let block_data_size = self.block_data_size.unwrap_or(MIN_BLOCK_SIZE);
479
480        // y = 3889.9821 + 14.7480 × √x
481        // R-squared: 0.7035
482        ((3889.9821 + 14.7480 * (block_data_size as f64).sqrt()) as usize).next_power_of_two()
483    }
484}
485
486#[derive(Debug, Copy, Clone)]
487pub struct ShardStateStorageMetrics {
488    pub max_new_mc_cell_count: usize,
489    pub max_new_sc_cell_count: usize,
490}
491
492#[derive(thiserror::Error, Debug)]
493pub enum ShardStateStorageError {
494    #[error("Shard state not found for block: {0}")]
495    NotFound(BlockIdShort),
496    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
497    BlockHandleIdMismatch {
498        expected: BlockIdShort,
499        actual: BlockIdShort,
500    },
501}
502
503fn split_shard_accounts(
504    root_cell: impl AsRef<DynCell>,
505    split_depth: u8,
506) -> Result<FastHashMap<HashBytes, Cell>> {
507    // Cell#0 - processed_upto
508    // Cell#1 - accounts
509    let shard_accounts = root_cell
510        .as_ref()
511        .reference_cloned(1)
512        .context("invalid shard state")?
513        .parse::<ShardAccounts>()
514        .context("failed to load shard accounts")?;
515
516    split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts")
517}