// tycho_core/storage/shard_state/mod.rs

use std::fs::File;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::time::Instant;

use anyhow::{Context, Result};
use bytesize::ByteSize;
use tycho_block_util::block::*;
use tycho_block_util::dict::split_aug_dict_raw;
use tycho_block_util::state::*;
use tycho_storage::fs::TempFileStorage;
use tycho_storage::kv::StoredValue;
use tycho_types::models::*;
use tycho_types::prelude::*;
use tycho_util::mem::Reclaimer;
use tycho_util::metrics::HistogramGuard;
use tycho_util::{FastHashMap, FastHashSet};
use weedb::rocksdb;

use self::cell_storage::*;
use self::store_state_raw::StoreStateContext;
use super::{BlockFlags, BlockHandle, BlockHandleStorage, BlockStorage, CellsDb};

mod cell_storage;
mod entries_buffer;
mod store_state_raw;

/// Storage for shard states: persists state root cells, loads states back,
/// and garbage-collects states that fall behind the recent-blocks edge.
pub struct ShardStateStorage {
    cells_db: CellsDb,

    block_handle_storage: Arc<BlockHandleStorage>,
    block_storage: Arc<BlockStorage>,
    cell_storage: Arc<CellStorage>,
    temp_file_storage: TempFileStorage,

    // Serializes cell GC against state store/import operations; taken as an
    // owned guard so it can be carried into blocking tasks.
    gc_lock: Arc<tokio::sync::Mutex<()>>,
    // Tracks the minimal masterchain seqno still referenced by loaded states;
    // bounds how far `remove_outdated_states` may advance.
    min_ref_mc_state: MinRefMcStateTracker,
    // Peak number of new cells written for a single masterchain state
    // (reset when read via `metrics`).
    max_new_mc_cell_count: AtomicUsize,
    // Peak number of new cells written for a single shard state
    // (reset when read via `metrics`).
    max_new_sc_cell_count: AtomicUsize,

    // Depth at which shard account dictionaries are split into subtrees for
    // multi-threaded store/remove of shard (non-masterchain) states.
    accounts_split_depth: u8,
}
43
44impl ShardStateStorage {
45 pub fn new(
47 cells_db: CellsDb,
48 block_handle_storage: Arc<BlockHandleStorage>,
49 block_storage: Arc<BlockStorage>,
50 temp_file_storage: TempFileStorage,
51 cache_size_bytes: ByteSize,
52 drop_interval: u32,
53 ) -> Result<Arc<Self>> {
54 let cell_storage = CellStorage::new(cells_db.clone(), cache_size_bytes, drop_interval);
55
56 Ok(Arc::new(Self {
57 cells_db,
58 block_handle_storage,
59 block_storage,
60 temp_file_storage,
61 cell_storage,
62 gc_lock: Default::default(),
63 min_ref_mc_state: MinRefMcStateTracker::new(),
64 max_new_mc_cell_count: AtomicUsize::new(0),
65 max_new_sc_cell_count: AtomicUsize::new(0),
66 accounts_split_depth: 4,
67 }))
68 }
69
70 pub fn metrics(&self) -> ShardStateStorageMetrics {
71 ShardStateStorageMetrics {
72 max_new_mc_cell_count: self.max_new_mc_cell_count.swap(0, Ordering::AcqRel),
73 max_new_sc_cell_count: self.max_new_sc_cell_count.swap(0, Ordering::AcqRel),
74 }
75 }
76
    /// Returns the tracker of the minimal masterchain seqno still referenced
    /// by loaded states (used to bound state GC).
    pub fn min_ref_mc_state(&self) -> &MinRefMcStateTracker {
        &self.min_ref_mc_state
    }
85
    /// Returns the underlying cell storage.
    pub fn cell_storage(&self) -> &Arc<CellStorage> {
        &self.cell_storage
    }
89
90 pub async fn store_state(
91 &self,
92 handle: &BlockHandle,
93 state: &ShardStateStuff,
94 hint: StoreStateHint,
95 ) -> Result<bool> {
96 anyhow::ensure!(
97 handle.id() == state.block_id(),
98 ShardStateStorageError::BlockHandleIdMismatch {
99 expected: state.block_id().as_short_id(),
100 actual: handle.id().as_short_id(),
101 }
102 );
103
104 self.store_state_root(handle, state.root_cell().clone(), hint)
105 .await
106 }
107
    /// Persists `root_cell` as the state root for `handle`'s block.
    ///
    /// Returns `Ok(false)` if the state was already stored, `Ok(true)` if
    /// this call newly set the `HAS_STATE` flag on the handle.
    /// `hint` is used to pre-size the write batch.
    pub async fn store_state_root(
        &self,
        handle: &BlockHandle,
        root_cell: Cell,
        hint: StoreStateHint,
    ) -> Result<bool> {
        // Fast path: nothing to do if the state is already stored.
        if handle.has_state() {
            return Ok(false);
        }

        // Cell GC must not run concurrently with a state insertion, so take
        // the owned GC lock before writing. The guard is moved into the
        // blocking task below and held until the RocksDB write lands.
        let gc_lock = {
            let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high");
            self.gc_lock.clone().lock_owned().await
        };

        // Double-check after acquiring the lock: another task may have
        // stored the state while we were waiting.
        if handle.has_state() {
            return Ok(false);
        }
        let _hist = HistogramGuard::begin("tycho_storage_state_store_time");

        // Clone everything the blocking task needs (closure must be 'static).
        let block_id = *handle.id();
        let raw_db = self.cells_db.rocksdb().clone();
        let cf = self.cells_db.shard_states.get_unbounded_cf();
        let cell_storage = self.cell_storage.clone();
        let block_handle_storage = self.block_handle_storage.clone();
        let handle = handle.clone();
        let accounts_split_depth = self.accounts_split_depth;

        let (new_cell_count, updated) = tokio::task::spawn_blocking(move || {
            let root_hash = *root_cell.repr_hash();
            let estimated_merkle_update_size = hint.estimate_cell_count();

            // NOTE(review): 192 appears to be an assumed average number of
            // bytes written per new cell — confirm against the cell encoding.
            let estimated_update_size_bytes = estimated_merkle_update_size * 192;
            let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);

            let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high");

            // Masterchain states are stored single-threaded; shard states
            // are split by account subtrees and stored multi-threaded.
            let new_cell_count = if block_id.is_masterchain() {
                cell_storage.store_cell(
                    &mut batch,
                    root_cell.as_ref(),
                    estimated_merkle_update_size,
                )?
            } else {
                let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?;

                cell_storage.store_cell_mt(
                    root_cell.as_ref(),
                    &mut batch,
                    split_at,
                    estimated_merkle_update_size,
                )?
            };

            in_mem_store.finish();
            metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);

            // Record block id -> state root hash in the shard states CF.
            batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice());

            let hist = HistogramGuard::begin("tycho_storage_state_update_time_high");
            metrics::histogram!("tycho_storage_state_update_size_bytes")
                .record(batch.size_in_bytes() as f64);
            metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
                .record(estimated_update_size_bytes as f64);

            raw_db.write(batch)?;

            // Hand the (potentially huge) root cell tree to the background
            // reclaimer instead of dropping it on this thread.
            Reclaimer::instance().drop(root_cell);

            hist.finish();

            // Mark the handle; persist it only if the flag was newly set.
            let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
            if updated {
                block_handle_storage.store_handle(&handle, false);
            }

            // GC may resume only after the write batch has been committed.
            drop(gc_lock);

            Ok::<_, anyhow::Error>((new_cell_count, updated))
        })
        .await??;

        // Track the peak number of newly stored cells per chain kind;
        // reported (and reset) by `Self::metrics`.
        let count = if block_id.shard.is_masterchain() {
            &self.max_new_mc_cell_count
        } else {
            &self.max_new_sc_cell_count
        };

        count.fetch_max(new_cell_count, Ordering::Release);

        Ok(updated)
    }
203
204 pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result<HashBytes> {
206 let ctx = StoreStateContext {
207 cells_db: self.cells_db.clone(),
208 cell_storage: self.cell_storage.clone(),
209 temp_file_storage: self.temp_file_storage.clone(),
210 };
211
212 let block_id = *block_id;
213
214 let gc_lock = self.gc_lock.clone().lock_owned().await;
215 tokio::task::spawn_blocking(move || {
216 let _gc_lock = gc_lock;
218
219 ctx.store(&block_id, boc)
220 })
221 .await?
222 }
223
    /// Loads the shard state for `block_id` from the cell storage.
    ///
    /// `ref_by_mc_seqno` is the masterchain seqno on whose behalf the state
    /// is loaded; it is passed to the cell storage (presumably to pin cells
    /// against GC — TODO confirm) and feeds the max-epoch gauge.
    pub async fn load_state(
        &self,
        ref_by_mc_seqno: u32,
        block_id: &BlockId,
    ) -> Result<ShardStateStuff> {
        // Highest `ref_by_mc_seqno` seen by this process (metrics only).
        static MAX_KNOWN_EPOCH: AtomicU32 = AtomicU32::new(0);

        let root_hash = self.load_state_root_hash(block_id)?;
        let root = self.cell_storage.load_cell(&root_hash, ref_by_mc_seqno)?;
        let root = Cell::from(root as Arc<_>);

        // `fetch_max` returns the previous value, so combine it with the new
        // one to get the current maximum.
        let max_known_epoch = MAX_KNOWN_EPOCH
            .fetch_max(ref_by_mc_seqno, Ordering::Relaxed)
            .max(ref_by_mc_seqno);
        metrics::gauge!("tycho_storage_state_max_epoch").set(max_known_epoch);

        let shard_state = root.parse::<Box<ShardStateUnsplit>>()?;
        // Register the state with the min-ref tracker; the returned handle
        // keeps the GC edge from advancing past this state while it lives.
        let handle = self.min_ref_mc_state.insert(&shard_state);
        ShardStateStuff::from_state_and_root(block_id, shard_state, root, handle)
    }
250
251 pub fn load_state_root_hash(&self, block_id: &BlockId) -> Result<HashBytes> {
252 let shard_states = &self.cells_db.shard_states;
253 let shard_state = shard_states.get(block_id.to_vec())?;
254 match shard_state {
255 Some(root) => Ok(HashBytes::from_slice(&root[..32])),
256 None => {
257 anyhow::bail!(ShardStateStorageError::NotFound(block_id.as_short_id()))
258 }
259 }
260 }
261
    /// Removes all stored shard states (and their cells) that fall behind
    /// the recent-blocks edge derived from `mc_seqno`.
    ///
    /// Iterates the whole shard-states column family under a snapshot and
    /// deletes every entry whose block is neither a zerostate nor covered by
    /// the computed `TopBlocks`.
    #[tracing::instrument(skip(self))]
    pub async fn remove_outdated_states(&self, mc_seqno: u32) -> Result<()> {
        // Resolve which blocks must be kept; bail out quietly if the edge
        // cannot be computed yet.
        let Some(top_blocks) = self.compute_recent_blocks(mc_seqno).await? else {
            tracing::warn!("recent blocks edge not found");
            return Ok(());
        };

        tracing::info!(
            target_block_id = %top_blocks.mc_block,
            "started states GC",
        );
        let started_at = Instant::now();

        let raw = self.cells_db.rocksdb();

        // Iterate over a snapshot so concurrent writes don't affect the scan.
        let snapshot = raw.snapshot();
        let shard_states_cf = self.cells_db.shard_states.get_unbounded_cf();
        let mut states_read_options = self.cells_db.shard_states.new_read_config();
        states_read_options.set_snapshot(&snapshot);

        // Bump allocator reused across iterations (reset each time) and
        // shuttled in/out of the blocking tasks by value.
        let mut alloc = bumpalo_herd::Herd::new();

        let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options);
        iter.seek_to_first();

        let mut removed_states = 0usize;
        let mut removed_cells = 0usize;
        loop {
            let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high");
            // `item()` returning `None` means either end-of-range or an
            // iterator error; distinguish via `status()`.
            let (key, value) = match iter.item() {
                Some(item) => item,
                None => match iter.status() {
                    Ok(()) => break,
                    Err(e) => return Err(e.into()),
                },
            };

            // Key is a serialized `BlockId`; value starts with the root hash.
            let block_id = BlockId::from_slice(key);
            let root_hash = HashBytes::from_slice(value);

            // Keep zerostates and everything at or above the GC edge.
            if block_id.seqno == 0
                || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno)
            {
                iter.next();
                continue;
            }

            alloc.reset();

            // Take the GC lock per removed state so stores can interleave.
            let guard = {
                let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high");
                self.gc_lock.clone().lock_owned().await
            };

            let db = self.cells_db.clone();
            let cell_storage = self.cell_storage.clone();
            let key = key.to_vec();
            let accounts_split_depth = self.accounts_split_depth;
            let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
                let in_mem_remove =
                    HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high");

                // Masterchain states are removed single-threaded; shard
                // states are split by account subtrees and removed in
                // parallel, mirroring the store path.
                let (stats, mut batch) = if block_id.is_masterchain() {
                    cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)?
                } else {
                    let root_cell = Cell::from(cell_storage.load_cell(&root_hash, 0)? as Arc<_>);

                    let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?
                        .into_keys()
                        .collect::<FastHashSet<HashBytes>>();
                    cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)?
                };

                in_mem_remove.finish();

                // Delete the block-id -> root-hash entry in the same batch
                // as the cell removals, then commit atomically.
                batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
                db.raw()
                    .rocksdb()
                    .write_opt(batch, db.cells.write_config())?;

                // Release the GC lock only after the batch has landed.
                drop(guard);

                Ok::<_, anyhow::Error>((stats, alloc))
            })
            .await??;

            removed_cells += total;
            // Take the herd back for reuse on the next iteration.
            alloc = inner_alloc;
            tracing::debug!(removed_cells = total, %block_id);

            removed_states += 1;
            iter.next();

            metrics::counter!("tycho_storage_state_gc_count").increment(1);
            // NOTE(review): this increments by 1 per state, not by `total`
            // cells — confirm whether `increment(total as u64)` was intended.
            metrics::counter!("tycho_storage_state_gc_cells_count").increment(1);
            if block_id.is_masterchain() {
                metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64);
            }
            tracing::debug!(removed_states, removed_cells, %block_id, "removed state");
        }

        tracing::info!(
            removed_states,
            removed_cells,
            block_id = %top_blocks.mc_block,
            elapsed_sec = started_at.elapsed().as_secs_f64(),
            "finished states GC",
        );
        Ok(())
    }
382
383 pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result<Option<TopBlocks>> {
387 if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno()
389 && min_ref_mc_seqno < mc_seqno
390 {
391 mc_seqno = min_ref_mc_seqno;
392 }
393
394 let snapshot = self.cells_db.rocksdb().snapshot();
395
396 let mc_block_id = match self
400 .find_mc_block_id(mc_seqno, &snapshot)
401 .context("Failed to find block id by seqno")?
402 {
403 Some(block_id) => block_id,
404 None => return Ok(None),
405 };
406
407 let handle = match self.block_handle_storage.load_handle(&mc_block_id) {
409 Some(handle) if handle.has_data() => handle,
410 _ => return Ok(None),
412 };
413
414 let block_data = self.block_storage.load_block_data(&handle).await?;
417 let block_info = block_data
418 .load_info()
419 .context("Failed to read target block info")?;
420
421 let min_ref_mc_seqno = block_info.min_ref_mc_seqno;
423 let min_ref_block_id = match self.find_mc_block_id(min_ref_mc_seqno, &snapshot)? {
424 Some(block_id) => block_id,
425 None => return Ok(None),
426 };
427
428 let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) {
430 Some(handle) if handle.has_data() => handle,
431 _ => return Ok(None),
433 };
434
435 self.block_storage
437 .load_block_data(&min_ref_block_handle)
438 .await
439 .and_then(|block_data| TopBlocks::from_mc_block(&block_data))
440 .map(Some)
441 }
442
443 fn find_mc_block_id(
444 &self,
445 mc_seqno: u32,
446 snapshot: &rocksdb::Snapshot<'_>,
447 ) -> Result<Option<BlockId>> {
448 let shard_states = &self.cells_db.shard_states;
449
450 let mut bound = BlockId {
451 shard: ShardIdent::MASTERCHAIN,
452 seqno: mc_seqno,
453 root_hash: HashBytes::ZERO,
454 file_hash: HashBytes::ZERO,
455 };
456
457 let mut readopts = shard_states.new_read_config();
458 readopts.set_snapshot(snapshot);
459 readopts.set_iterate_lower_bound(bound.to_vec().as_slice());
460 bound.seqno += 1;
461 readopts.set_iterate_upper_bound(bound.to_vec().as_slice());
462
463 let mut iter = self
464 .cells_db
465 .rocksdb()
466 .raw_iterator_cf_opt(&shard_states.cf(), readopts);
467 iter.seek_to_first();
468
469 Ok(iter.key().map(BlockId::from_slice))
470 }
471}
472
/// Optional sizing hints for storing a state.
#[derive(Default, Debug, Clone, Copy)]
pub struct StoreStateHint {
    /// Serialized size of the corresponding block data in bytes, if known.
    pub block_data_size: Option<usize>,
}
477
478impl StoreStateHint {
479 fn estimate_cell_count(&self) -> usize {
480 const MIN_BLOCK_SIZE: usize = 4 << 10; let block_data_size = self.block_data_size.unwrap_or(MIN_BLOCK_SIZE);
483
484 ((3889.9821 + 14.7480 * (block_data_size as f64).sqrt()) as usize).next_power_of_two()
487 }
488}
489
/// Snapshot of [`ShardStateStorage`] peak counters; reading them via
/// `ShardStateStorage::metrics` resets the underlying counters.
#[derive(Debug, Copy, Clone)]
pub struct ShardStateStorageMetrics {
    /// Peak number of new cells stored for a masterchain state since the
    /// last read.
    pub max_new_mc_cell_count: usize,
    /// Peak number of new cells stored for a shard state since the last read.
    pub max_new_sc_cell_count: usize,
}
495
/// Errors produced by [`ShardStateStorage`].
#[derive(thiserror::Error, Debug)]
pub enum ShardStateStorageError {
    /// No stored state root was found for the given block.
    #[error("Shard state not found for block: {0}")]
    NotFound(BlockIdShort),
    /// A handle and a state passed together describe different blocks.
    #[error("Block handle id mismatch: expected {expected}, got {actual}")]
    BlockHandleIdMismatch {
        expected: BlockIdShort,
        actual: BlockIdShort,
    },
}
506
507pub fn split_shard_accounts(
508 root_cell: impl AsRef<DynCell>,
509 split_depth: u8,
510) -> Result<FastHashMap<HashBytes, Cell>> {
511 let shard_accounts = root_cell
514 .as_ref()
515 .reference_cloned(1)
516 .context("invalid shard state")?
517 .parse::<ShardAccounts>()
518 .context("failed to load shard accounts")?;
519
520 split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts")
521}