tycho_core/storage/shard_state/
mod.rs1use std::fs::File;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
4use std::time::Instant;
5
6use anyhow::{Context, Result};
7use bytesize::ByteSize;
8use tycho_block_util::block::*;
9use tycho_block_util::dict::split_aug_dict_raw;
10use tycho_block_util::state::*;
11use tycho_storage::fs::TempFileStorage;
12use tycho_storage::kv::StoredValue;
13use tycho_types::models::*;
14use tycho_types::prelude::*;
15use tycho_util::mem::Reclaimer;
16use tycho_util::metrics::HistogramGuard;
17use tycho_util::{FastHashMap, FastHashSet};
18use weedb::rocksdb;
19
20use self::cell_storage::*;
21use self::store_state_raw::StoreStateContext;
22use super::{BlockFlags, BlockHandle, BlockHandleStorage, BlockStorage, CellsDb};
23
24mod cell_storage;
25mod entries_buffer;
26mod store_state_raw;
27
/// Storage of shard states on top of the cells database.
///
/// Maps block ids to state root hashes (the `shard_states` column family) and
/// stores/removes the state cell trees through [`CellStorage`].
pub struct ShardStateStorage {
    cells_db: CellsDb,

    block_handle_storage: Arc<BlockHandleStorage>,
    block_storage: Arc<BlockStorage>,
    cell_storage: Arc<CellStorage>,
    temp_file_storage: TempFileStorage,

    // Held while states are written (`store_state_root`, `store_state_file`)
    // and while each outdated state is removed (`remove_outdated_states`), so
    // stores and GC never run their DB writes concurrently.
    gc_lock: Arc<tokio::sync::Mutex<()>>,
    // Updated with every state loaded by `load_state`; its seqno caps the GC edge.
    min_ref_mc_state: MinRefMcStateTracker,
    // Largest new-cell count of a single masterchain state store since the
    // last `metrics()` call (reset to 0 there).
    max_new_mc_cell_count: AtomicUsize,
    // Same as above, for non-masterchain states.
    max_new_sc_cell_count: AtomicUsize,

    // Depth passed to `split_shard_accounts` when storing/removing
    // non-masterchain states.
    accounts_split_depth: u8,
}
43
44impl ShardStateStorage {
45 pub fn new(
47 cells_db: CellsDb,
48 block_handle_storage: Arc<BlockHandleStorage>,
49 block_storage: Arc<BlockStorage>,
50 temp_file_storage: TempFileStorage,
51 cache_size_bytes: ByteSize,
52 drop_interval: u32,
53 ) -> Result<Arc<Self>> {
54 let cell_storage = CellStorage::new(cells_db.clone(), cache_size_bytes, drop_interval);
55
56 Ok(Arc::new(Self {
57 cells_db,
58 block_handle_storage,
59 block_storage,
60 temp_file_storage,
61 cell_storage,
62 gc_lock: Default::default(),
63 min_ref_mc_state: MinRefMcStateTracker::new(),
64 max_new_mc_cell_count: AtomicUsize::new(0),
65 max_new_sc_cell_count: AtomicUsize::new(0),
66 accounts_split_depth: 4,
67 }))
68 }
69
70 pub fn metrics(&self) -> ShardStateStorageMetrics {
71 ShardStateStorageMetrics {
72 max_new_mc_cell_count: self.max_new_mc_cell_count.swap(0, Ordering::AcqRel),
73 max_new_sc_cell_count: self.max_new_sc_cell_count.swap(0, Ordering::AcqRel),
74 }
75 }
76
77 pub fn min_ref_mc_state(&self) -> &MinRefMcStateTracker {
83 &self.min_ref_mc_state
84 }
85
86 pub async fn store_state(
87 &self,
88 handle: &BlockHandle,
89 state: &ShardStateStuff,
90 hint: StoreStateHint,
91 ) -> Result<bool> {
92 anyhow::ensure!(
93 handle.id() == state.block_id(),
94 ShardStateStorageError::BlockHandleIdMismatch {
95 expected: state.block_id().as_short_id(),
96 actual: handle.id().as_short_id(),
97 }
98 );
99
100 self.store_state_root(handle, state.root_cell().clone(), hint)
101 .await
102 }
103
104 pub async fn store_state_root(
105 &self,
106 handle: &BlockHandle,
107 root_cell: Cell,
108 hint: StoreStateHint,
109 ) -> Result<bool> {
110 if handle.has_state() {
111 return Ok(false);
112 }
113
114 let gc_lock = {
115 let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high");
116 self.gc_lock.clone().lock_owned().await
117 };
118
119 if handle.has_state() {
121 return Ok(false);
122 }
123 let _hist = HistogramGuard::begin("tycho_storage_state_store_time");
124
125 let block_id = *handle.id();
126 let raw_db = self.cells_db.rocksdb().clone();
127 let cf = self.cells_db.shard_states.get_unbounded_cf();
128 let cell_storage = self.cell_storage.clone();
129 let block_handle_storage = self.block_handle_storage.clone();
130 let handle = handle.clone();
131 let accounts_split_depth = self.accounts_split_depth;
132
133 let (new_cell_count, updated) = tokio::task::spawn_blocking(move || {
135 let root_hash = *root_cell.repr_hash();
136 let estimated_merkle_update_size = hint.estimate_cell_count();
137
138 let estimated_update_size_bytes = estimated_merkle_update_size * 192; let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);
140
141 let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high");
142
143 let new_cell_count = if block_id.is_masterchain() {
144 cell_storage.store_cell(
145 &mut batch,
146 root_cell.as_ref(),
147 estimated_merkle_update_size,
148 )?
149 } else {
150 let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?;
151
152 cell_storage.store_cell_mt(
153 root_cell.as_ref(),
154 &mut batch,
155 split_at,
156 estimated_merkle_update_size,
157 )?
158 };
159
160 in_mem_store.finish();
161 metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);
162
163 batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice());
164
165 let hist = HistogramGuard::begin("tycho_storage_state_update_time_high");
166 metrics::histogram!("tycho_storage_state_update_size_bytes")
167 .record(batch.size_in_bytes() as f64);
168 metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
169 .record(estimated_update_size_bytes as f64);
170
171 raw_db.write(batch)?;
172
173 Reclaimer::instance().drop(root_cell);
174
175 hist.finish();
176
177 let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
178 if updated {
179 block_handle_storage.store_handle(&handle, false);
180 }
181
182 drop(gc_lock);
184
185 Ok::<_, anyhow::Error>((new_cell_count, updated))
186 })
187 .await??;
188
189 let count = if block_id.shard.is_masterchain() {
190 &self.max_new_mc_cell_count
191 } else {
192 &self.max_new_sc_cell_count
193 };
194
195 count.fetch_max(new_cell_count, Ordering::Release);
196
197 Ok(updated)
198 }
199
200 pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result<HashBytes> {
202 let ctx = StoreStateContext {
203 cells_db: self.cells_db.clone(),
204 cell_storage: self.cell_storage.clone(),
205 temp_file_storage: self.temp_file_storage.clone(),
206 };
207
208 let block_id = *block_id;
209
210 let gc_lock = self.gc_lock.clone().lock_owned().await;
211 tokio::task::spawn_blocking(move || {
212 let _gc_lock = gc_lock;
214
215 ctx.store(&block_id, boc)
216 })
217 .await?
218 }
219
220 pub async fn load_state(
226 &self,
227 ref_by_mc_seqno: u32,
228 block_id: &BlockId,
229 ) -> Result<ShardStateStuff> {
230 static MAX_KNOWN_EPOCH: AtomicU32 = AtomicU32::new(0);
232
233 let root_hash = self.load_state_root_hash(block_id)?;
234 let root = self.cell_storage.load_cell(&root_hash, ref_by_mc_seqno)?;
235 let root = Cell::from(root as Arc<_>);
236
237 let max_known_epoch = MAX_KNOWN_EPOCH
238 .fetch_max(ref_by_mc_seqno, Ordering::Relaxed)
239 .max(ref_by_mc_seqno);
240 metrics::gauge!("tycho_storage_state_max_epoch").set(max_known_epoch);
241
242 let shard_state = root.parse::<Box<ShardStateUnsplit>>()?;
243 let handle = self.min_ref_mc_state.insert(&shard_state);
244 ShardStateStuff::from_state_and_root(block_id, shard_state, root, handle)
245 }
246
247 pub fn load_state_root_hash(&self, block_id: &BlockId) -> Result<HashBytes> {
248 let shard_states = &self.cells_db.shard_states;
249 let shard_state = shard_states.get(block_id.to_vec())?;
250 match shard_state {
251 Some(root) => Ok(HashBytes::from_slice(&root[..32])),
252 None => {
253 anyhow::bail!(ShardStateStorageError::NotFound(block_id.as_short_id()))
254 }
255 }
256 }
257
258 #[tracing::instrument(skip(self))]
259 pub async fn remove_outdated_states(&self, mc_seqno: u32) -> Result<()> {
260 let Some(top_blocks) = self.compute_recent_blocks(mc_seqno).await? else {
262 tracing::warn!("recent blocks edge not found");
263 return Ok(());
264 };
265
266 tracing::info!(
267 target_block_id = %top_blocks.mc_block,
268 "started states GC",
269 );
270 let started_at = Instant::now();
271
272 let raw = self.cells_db.rocksdb();
273
274 let snapshot = raw.snapshot();
276 let shard_states_cf = self.cells_db.shard_states.get_unbounded_cf();
277 let mut states_read_options = self.cells_db.shard_states.new_read_config();
278 states_read_options.set_snapshot(&snapshot);
279
280 let mut alloc = bumpalo_herd::Herd::new();
281
282 let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options);
284 iter.seek_to_first();
285
286 let mut removed_states = 0usize;
288 let mut removed_cells = 0usize;
289 loop {
290 let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high");
291 let (key, value) = match iter.item() {
292 Some(item) => item,
293 None => match iter.status() {
294 Ok(()) => break,
295 Err(e) => return Err(e.into()),
296 },
297 };
298
299 let block_id = BlockId::from_slice(key);
300 let root_hash = HashBytes::from_slice(value);
301
302 if block_id.seqno == 0
304 || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno)
305 {
306 iter.next();
307 continue;
308 }
309
310 alloc.reset();
311
312 let guard = {
313 let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high");
314 self.gc_lock.clone().lock_owned().await
315 };
316
317 let db = self.cells_db.clone();
318 let cell_storage = self.cell_storage.clone();
319 let key = key.to_vec();
320 let accounts_split_depth = self.accounts_split_depth;
321 let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
322 let in_mem_remove =
323 HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high");
324
325 let (stats, mut batch) = if block_id.is_masterchain() {
326 cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)?
327 } else {
328 let root_cell = Cell::from(cell_storage.load_cell(&root_hash, 0)? as Arc<_>);
331
332 let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?
333 .into_keys()
334 .collect::<FastHashSet<HashBytes>>();
335 cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)?
336 };
337
338 in_mem_remove.finish();
339
340 batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
341 db.raw()
342 .rocksdb()
343 .write_opt(batch, db.cells.write_config())?;
344
345 drop(guard);
347
348 Ok::<_, anyhow::Error>((stats, alloc))
349 })
350 .await??;
351
352 removed_cells += total;
353 alloc = inner_alloc; tracing::debug!(removed_cells = total, %block_id);
356
357 removed_states += 1;
358 iter.next();
359
360 metrics::counter!("tycho_storage_state_gc_count").increment(1);
361 metrics::counter!("tycho_storage_state_gc_cells_count").increment(1);
362 if block_id.is_masterchain() {
363 metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64);
364 }
365 tracing::debug!(removed_states, removed_cells, %block_id, "removed state");
366 }
367
368 tracing::info!(
370 removed_states,
371 removed_cells,
372 block_id = %top_blocks.mc_block,
373 elapsed_sec = started_at.elapsed().as_secs_f64(),
374 "finished states GC",
375 );
376 Ok(())
377 }
378
379 pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result<Option<TopBlocks>> {
383 if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno()
385 && min_ref_mc_seqno < mc_seqno
386 {
387 mc_seqno = min_ref_mc_seqno;
388 }
389
390 let snapshot = self.cells_db.rocksdb().snapshot();
391
392 let mc_block_id = match self
396 .find_mc_block_id(mc_seqno, &snapshot)
397 .context("Failed to find block id by seqno")?
398 {
399 Some(block_id) => block_id,
400 None => return Ok(None),
401 };
402
403 let handle = match self.block_handle_storage.load_handle(&mc_block_id) {
405 Some(handle) if handle.has_data() => handle,
406 _ => return Ok(None),
408 };
409
410 let block_data = self.block_storage.load_block_data(&handle).await?;
413 let block_info = block_data
414 .load_info()
415 .context("Failed to read target block info")?;
416
417 let min_ref_mc_seqno = block_info.min_ref_mc_seqno;
419 let min_ref_block_id = match self.find_mc_block_id(min_ref_mc_seqno, &snapshot)? {
420 Some(block_id) => block_id,
421 None => return Ok(None),
422 };
423
424 let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) {
426 Some(handle) if handle.has_data() => handle,
427 _ => return Ok(None),
429 };
430
431 self.block_storage
433 .load_block_data(&min_ref_block_handle)
434 .await
435 .and_then(|block_data| TopBlocks::from_mc_block(&block_data))
436 .map(Some)
437 }
438
439 fn find_mc_block_id(
440 &self,
441 mc_seqno: u32,
442 snapshot: &rocksdb::Snapshot<'_>,
443 ) -> Result<Option<BlockId>> {
444 let shard_states = &self.cells_db.shard_states;
445
446 let mut bound = BlockId {
447 shard: ShardIdent::MASTERCHAIN,
448 seqno: mc_seqno,
449 root_hash: HashBytes::ZERO,
450 file_hash: HashBytes::ZERO,
451 };
452
453 let mut readopts = shard_states.new_read_config();
454 readopts.set_snapshot(snapshot);
455 readopts.set_iterate_lower_bound(bound.to_vec().as_slice());
456 bound.seqno += 1;
457 readopts.set_iterate_upper_bound(bound.to_vec().as_slice());
458
459 let mut iter = self
460 .cells_db
461 .rocksdb()
462 .raw_iterator_cf_opt(&shard_states.cf(), readopts);
463 iter.seek_to_first();
464
465 Ok(iter.key().map(BlockId::from_slice))
466 }
467}
468
/// Optional information about a block that is used to pre-size buffers when
/// its state is stored.
#[derive(Default, Debug, Clone, Copy)]
pub struct StoreStateHint {
    /// Size of the block data (presumably in bytes), if known.
    pub block_data_size: Option<usize>,
}

impl StoreStateHint {
    /// Predicts how many cells a state update will write, rounded up to the
    /// next power of two.
    ///
    /// The estimate is `3889.9821 + 14.7480 * sqrt(block_data_size)`, with a
    /// block data size of 4096 used when none is provided.
    fn estimate_cell_count(&self) -> usize {
        const MIN_BLOCK_SIZE: usize = 4 << 10;
        // NOTE(review): coefficients look empirically fitted — confirm origin.
        const BASE_CELLS: f64 = 3889.9821;
        const CELLS_PER_SQRT_SIZE: f64 = 14.7480;

        let size = match self.block_data_size {
            Some(size) => size,
            None => MIN_BLOCK_SIZE,
        };

        let scaled = CELLS_PER_SQRT_SIZE * (size as f64).sqrt();
        let estimate = BASE_CELLS + scaled;
        (estimate as usize).next_power_of_two()
    }
}
485
/// Counters returned (and reset) by `ShardStateStorage::metrics`.
#[derive(Debug, Copy, Clone)]
pub struct ShardStateStorageMetrics {
    /// Largest number of new cells written by a single masterchain state store
    /// since the previous `metrics()` call.
    pub max_new_mc_cell_count: usize,
    /// Largest number of new cells written by a single non-masterchain state
    /// store since the previous `metrics()` call.
    pub max_new_sc_cell_count: usize,
}
491
/// Errors reported by `ShardStateStorage`.
#[derive(thiserror::Error, Debug)]
pub enum ShardStateStorageError {
    /// No root hash entry is stored for the block.
    #[error("Shard state not found for block: {0}")]
    NotFound(BlockIdShort),
    /// The block handle and the state passed for storing refer to different blocks.
    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
    BlockHandleIdMismatch {
        /// Short id of the block the state belongs to.
        expected: BlockIdShort,
        /// Short id of the block handle that was passed in.
        actual: BlockIdShort,
    },
}
502
503fn split_shard_accounts(
504 root_cell: impl AsRef<DynCell>,
505 split_depth: u8,
506) -> Result<FastHashMap<HashBytes, Cell>> {
507 let shard_accounts = root_cell
510 .as_ref()
511 .reference_cloned(1)
512 .context("invalid shard state")?
513 .parse::<ShardAccounts>()
514 .context("failed to load shard accounts")?;
515
516 split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts")
517}