// tycho_core/storage/shard_state/mod.rs

1use std::fs::File;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::Instant;
5
6use anyhow::{Context, Result};
7use bytesize::ByteSize;
8use tycho_block_util::block::*;
9use tycho_block_util::dict::split_aug_dict_raw;
10use tycho_block_util::state::*;
11use tycho_storage::fs::TempFileStorage;
12use tycho_storage::kv::StoredValue;
13use tycho_types::models::*;
14use tycho_types::prelude::*;
15use tycho_util::metrics::HistogramGuard;
16use tycho_util::{FastHashMap, FastHashSet};
17use weedb::rocksdb;
18
19use self::cell_storage::*;
20use self::store_state_raw::StoreStateContext;
21use super::{BlockFlags, BlockHandle, BlockHandleStorage, BlockStorage, CoreDb};
22
23mod cell_storage;
24mod entries_buffer;
25mod store_state_raw;
26
27pub struct ShardStateStorage {
28    db: CoreDb,
29
30    block_handle_storage: Arc<BlockHandleStorage>,
31    block_storage: Arc<BlockStorage>,
32    cell_storage: Arc<CellStorage>,
33    temp_file_storage: TempFileStorage,
34
35    gc_lock: tokio::sync::Mutex<()>,
36    min_ref_mc_state: MinRefMcStateTracker,
37    max_new_mc_cell_count: AtomicUsize,
38    max_new_sc_cell_count: AtomicUsize,
39
40    accounts_split_depth: u8,
41}
42
43impl ShardStateStorage {
44    pub fn new(
45        db: CoreDb,
46        block_handle_storage: Arc<BlockHandleStorage>,
47        block_storage: Arc<BlockStorage>,
48        temp_file_storage: TempFileStorage,
49        cache_size_bytes: ByteSize,
50    ) -> Result<Arc<Self>> {
51        let cell_storage = CellStorage::new(db.clone(), cache_size_bytes);
52
53        Ok(Arc::new(Self {
54            db,
55            block_handle_storage,
56            block_storage,
57            temp_file_storage,
58            cell_storage,
59            gc_lock: Default::default(),
60            min_ref_mc_state: MinRefMcStateTracker::new(),
61            max_new_mc_cell_count: AtomicUsize::new(0),
62            max_new_sc_cell_count: AtomicUsize::new(0),
63            accounts_split_depth: 4,
64        }))
65    }
66
67    pub fn metrics(&self) -> ShardStateStorageMetrics {
68        ShardStateStorageMetrics {
69            max_new_mc_cell_count: self.max_new_mc_cell_count.swap(0, Ordering::AcqRel),
70            max_new_sc_cell_count: self.max_new_sc_cell_count.swap(0, Ordering::AcqRel),
71        }
72    }
73
74    // TODO: implement metrics
75    // pub fn cache_metrics(&self) -> CacheStats {
76    // self.cell_storage.cache_stats()
77    // }
78
79    pub fn min_ref_mc_state(&self) -> &MinRefMcStateTracker {
80        &self.min_ref_mc_state
81    }
82
83    pub async fn store_state(
84        &self,
85        handle: &BlockHandle,
86        state: &ShardStateStuff,
87        hint: StoreStateHint,
88    ) -> Result<bool> {
89        if handle.id() != state.block_id() {
90            return Err(ShardStateStorageError::BlockHandleIdMismatch.into());
91        }
92
93        self.store_state_root(handle, state.root_cell().clone(), hint)
94            .await
95    }
96
97    pub async fn store_state_root(
98        &self,
99        handle: &BlockHandle,
100        root_cell: Cell,
101        hint: StoreStateHint,
102    ) -> Result<bool> {
103        if handle.has_state() {
104            return Ok(false);
105        }
106
107        let _gc_lock = {
108            let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high");
109            self.gc_lock.lock().await
110        };
111
112        // Double check if the state is already stored
113        if handle.has_state() {
114            return Ok(false);
115        }
116        let _hist = HistogramGuard::begin("tycho_storage_state_store_time");
117
118        let block_id = *handle.id();
119        let raw_db = self.db.rocksdb().clone();
120        let cf = self.db.shard_states.get_unbounded_cf();
121        let cell_storage = self.cell_storage.clone();
122        let block_handle_storage = self.block_handle_storage.clone();
123        let handle = handle.clone();
124        let accounts_split_depth = self.accounts_split_depth;
125
126        // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task.
127        let (new_cell_count, updated) = tokio::task::spawn_blocking(move || {
128            let root_hash = *root_cell.repr_hash();
129            let estimated_merkle_update_size = hint.estimate_cell_count();
130
131            let estimated_update_size_bytes = estimated_merkle_update_size * 192; // p50 cell size in bytes
132            let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);
133
134            let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high");
135
136            let new_cell_count = if block_id.is_masterchain() {
137                cell_storage.store_cell(
138                    &mut batch,
139                    root_cell.as_ref(),
140                    estimated_merkle_update_size,
141                )?
142            } else {
143                let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?;
144
145                cell_storage.store_cell_mt(
146                    root_cell.as_ref(),
147                    &mut batch,
148                    split_at,
149                    estimated_merkle_update_size,
150                )?
151            };
152
153            in_mem_store.finish();
154            metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);
155
156            batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice());
157
158            let hist = HistogramGuard::begin("tycho_storage_state_update_time_high");
159            metrics::histogram!("tycho_storage_state_update_size_bytes")
160                .record(batch.size_in_bytes() as f64);
161            metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
162                .record(estimated_update_size_bytes as f64);
163
164            raw_db.write(batch)?;
165
166            drop(root_cell);
167
168            hist.finish();
169
170            let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
171            if updated {
172                block_handle_storage.store_handle(&handle, false);
173            }
174
175            Ok::<_, anyhow::Error>((new_cell_count, updated))
176        })
177        .await??;
178
179        let count = if block_id.shard.is_masterchain() {
180            &self.max_new_mc_cell_count
181        } else {
182            &self.max_new_sc_cell_count
183        };
184
185        count.fetch_max(new_cell_count, Ordering::Release);
186
187        Ok(updated)
188    }
189
190    pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result<ShardStateStuff> {
191        let ctx = StoreStateContext {
192            db: self.db.clone(),
193            cell_storage: self.cell_storage.clone(),
194            temp_file_storage: self.temp_file_storage.clone(),
195            min_ref_mc_state: self.min_ref_mc_state.clone(),
196        };
197
198        let block_id = *block_id;
199
200        let _gc_lock = self.gc_lock.lock().await;
201        tokio::task::spawn_blocking(move || ctx.store(&block_id, boc)).await?
202    }
203
204    pub async fn load_state(&self, block_id: &BlockId) -> Result<ShardStateStuff> {
205        let cell_id = self.load_state_root(block_id)?;
206        let cell = self.cell_storage.load_cell(cell_id)?;
207
208        ShardStateStuff::from_root(block_id, Cell::from(cell as Arc<_>), &self.min_ref_mc_state)
209    }
210
211    #[tracing::instrument(skip(self))]
212    pub async fn remove_outdated_states(&self, mc_seqno: u32) -> Result<()> {
213        // Compute recent block ids for the specified masterchain seqno
214        let Some(top_blocks) = self.compute_recent_blocks(mc_seqno).await? else {
215            tracing::warn!("recent blocks edge not found");
216            return Ok(());
217        };
218
219        tracing::info!(
220            target_block_id = %top_blocks.mc_block,
221            "started states GC",
222        );
223        let started_at = Instant::now();
224
225        let raw = self.db.rocksdb();
226
227        // Manually get required column factory and r/w options
228        let snapshot = raw.snapshot();
229        let shard_states_cf = self.db.shard_states.get_unbounded_cf();
230        let mut states_read_options = self.db.shard_states.new_read_config();
231        states_read_options.set_snapshot(&snapshot);
232
233        let mut alloc = bumpalo_herd::Herd::new();
234
235        // Create iterator
236        let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options);
237        iter.seek_to_first();
238
239        // Iterate all states and remove outdated
240        let mut removed_states = 0usize;
241        let mut removed_cells = 0usize;
242        loop {
243            let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high");
244            let (key, value) = match iter.item() {
245                Some(item) => item,
246                None => match iter.status() {
247                    Ok(()) => break,
248                    Err(e) => return Err(e.into()),
249                },
250            };
251
252            let block_id = BlockId::from_slice(key);
253            let root_hash = HashBytes::from_slice(value);
254
255            // Skip blocks from zero state and top blocks
256            if block_id.seqno == 0
257                || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno)
258            {
259                iter.next();
260                continue;
261            }
262
263            alloc.reset();
264
265            let guard = {
266                let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high");
267                self.gc_lock.lock().await
268            };
269
270            let db = self.db.clone();
271            let cell_storage = self.cell_storage.clone();
272            let key = key.to_vec();
273            let accounts_split_depth = self.accounts_split_depth;
274            let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
275                let in_mem_remove =
276                    HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high");
277
278                let (stats, mut batch) = if block_id.is_masterchain() {
279                    cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)?
280                } else {
281                    let root_cell = Cell::from(cell_storage.load_cell(root_hash)? as Arc<_>);
282
283                    let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?
284                        .into_keys()
285                        .collect::<FastHashSet<HashBytes>>();
286                    cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)?
287                };
288
289                in_mem_remove.finish();
290
291                batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
292                db.raw()
293                    .rocksdb()
294                    .write_opt(batch, db.cells.write_config())?;
295
296                Ok::<_, anyhow::Error>((stats, alloc))
297            })
298            .await??;
299
300            drop(guard);
301
302            removed_cells += total;
303            alloc = inner_alloc; // Reuse allocation without passing alloc by ref
304
305            tracing::debug!(removed_cells = total, %block_id);
306
307            removed_states += 1;
308            iter.next();
309
310            metrics::counter!("tycho_storage_state_gc_count").increment(1);
311            metrics::counter!("tycho_storage_state_gc_cells_count").increment(1);
312            if block_id.is_masterchain() {
313                metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64);
314            }
315            tracing::debug!(removed_states, removed_cells, %block_id, "removed state");
316        }
317
318        // Done
319        tracing::info!(
320            removed_states,
321            removed_cells,
322            block_id = %top_blocks.mc_block,
323            elapsed_sec = started_at.elapsed().as_secs_f64(),
324            "finished states GC",
325        );
326        Ok(())
327    }
328
329    /// Searches for an edge with the least referenced masterchain block
330    ///
331    /// Returns `None` if all states are recent enough
332    pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result<Option<TopBlocks>> {
333        // 0. Adjust masterchain seqno with minimal referenced masterchain state
334        if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno() {
335            if min_ref_mc_seqno < mc_seqno {
336                mc_seqno = min_ref_mc_seqno;
337            }
338        }
339
340        let snapshot = self.db.rocksdb().snapshot();
341
342        // 1. Find target block
343
344        // Find block id using states table
345        let mc_block_id = match self
346            .find_mc_block_id(mc_seqno, &snapshot)
347            .context("Failed to find block id by seqno")?
348        {
349            Some(block_id) => block_id,
350            None => return Ok(None),
351        };
352
353        // Find block handle
354        let handle = match self.block_handle_storage.load_handle(&mc_block_id) {
355            Some(handle) if handle.has_data() => handle,
356            // Skip blocks without handle or data
357            _ => return Ok(None),
358        };
359
360        // 2. Find minimal referenced masterchain block from the target block
361
362        let block_data = self.block_storage.load_block_data(&handle).await?;
363        let block_info = block_data
364            .load_info()
365            .context("Failed to read target block info")?;
366
367        // Find full min masterchain reference id
368        let min_ref_mc_seqno = block_info.min_ref_mc_seqno;
369        let min_ref_block_id = match self.find_mc_block_id(min_ref_mc_seqno, &snapshot)? {
370            Some(block_id) => block_id,
371            None => return Ok(None),
372        };
373
374        // Find block handle
375        let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) {
376            Some(handle) if handle.has_data() => handle,
377            // Skip blocks without handle or data
378            _ => return Ok(None),
379        };
380
381        // Compute `TopBlocks` from block data
382        self.block_storage
383            .load_block_data(&min_ref_block_handle)
384            .await
385            .and_then(|block_data| TopBlocks::from_mc_block(&block_data))
386            .map(Some)
387    }
388
389    pub fn load_state_root(&self, block_id: &BlockId) -> Result<HashBytes> {
390        let shard_states = &self.db.shard_states;
391        let shard_state = shard_states.get(block_id.to_vec())?;
392        match shard_state {
393            Some(root) => Ok(HashBytes::from_slice(&root[..32])),
394            None => Err(ShardStateStorageError::NotFound.into()),
395        }
396    }
397
398    fn find_mc_block_id(
399        &self,
400        mc_seqno: u32,
401        snapshot: &rocksdb::Snapshot<'_>,
402    ) -> Result<Option<BlockId>> {
403        let shard_states = &self.db.shard_states;
404
405        let mut bound = BlockId {
406            shard: ShardIdent::MASTERCHAIN,
407            seqno: mc_seqno,
408            root_hash: HashBytes::ZERO,
409            file_hash: HashBytes::ZERO,
410        };
411
412        let mut readopts = shard_states.new_read_config();
413        readopts.set_snapshot(snapshot);
414        readopts.set_iterate_lower_bound(bound.to_vec().as_slice());
415        bound.seqno += 1;
416        readopts.set_iterate_upper_bound(bound.to_vec().as_slice());
417
418        let mut iter = self
419            .db
420            .rocksdb()
421            .raw_iterator_cf_opt(&shard_states.cf(), readopts);
422        iter.seek_to_first();
423
424        Ok(iter.key().map(BlockId::from_slice))
425    }
426}
427
/// Optional hints used to pre-size the write batch when storing a state.
#[derive(Default, Debug, Clone, Copy)]
pub struct StoreStateHint {
    /// Size of the serialized block data in bytes, when known.
    pub block_data_size: Option<usize>,
}

impl StoreStateHint {
    /// Estimates the number of cells written by a state update, rounded up to a power of two.
    ///
    /// Uses the empirical fit `y = 3889.9821 + 14.7480 × √x` (R-squared: 0.7035). Here `x` is
    /// the block data size in bytes. When the size is unknown, a 4 KB block is assumed.
    fn estimate_cell_count(&self) -> usize {
        const MIN_BLOCK_SIZE: usize = 4 << 10; // 4 KB
        const INTERCEPT: f64 = 3889.9821;
        const SLOPE: f64 = 14.7480;

        let size_bytes = match self.block_data_size {
            Some(size) => size,
            None => MIN_BLOCK_SIZE,
        };

        let estimate = INTERCEPT + SLOPE * (size_bytes as f64).sqrt();
        (estimate as usize).next_power_of_two()
    }
}
444
/// Peak per-state cell counts, as reported by `ShardStateStorage::metrics`.
#[derive(Clone, Copy, Debug)]
pub struct ShardStateStorageMetrics {
    /// Largest number of new cells stored for a single masterchain state.
    pub max_new_mc_cell_count: usize,
    /// Largest number of new cells stored for a single shardchain state.
    pub max_new_sc_cell_count: usize,
}
450
451#[derive(thiserror::Error, Debug)]
452pub enum ShardStateStorageError {
453    #[error("Not found")]
454    NotFound,
455    #[error("Block handle id mismatch")]
456    BlockHandleIdMismatch,
457}
458
459fn split_shard_accounts(
460    root_cell: impl AsRef<DynCell>,
461    split_depth: u8,
462) -> Result<FastHashMap<HashBytes, Cell>> {
463    // Cell#0 - processed_upto
464    // Cell#1 - accounts
465    let shard_accounts = root_cell
466        .as_ref()
467        .reference_cloned(1)
468        .context("invalid shard state")?
469        .parse::<ShardAccounts>()
470        .context("failed to load shard accounts")?;
471
472    split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts")
473}