tycho_core/storage/shard_state/
mod.rs1use std::fs::File;
2use std::io::Cursor;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use bytes::Bytes;
9use bytesize::ByteSize;
10use tycho_block_util::block::*;
11use tycho_block_util::dict::split_aug_dict_raw;
12use tycho_block_util::state::*;
13use tycho_storage::fs::TempFileStorage;
14use tycho_storage::kv::StoredValue;
15use tycho_types::models::*;
16use tycho_types::prelude::*;
17use tycho_util::mem::Reclaimer;
18use tycho_util::metrics::HistogramGuard;
19use tycho_util::{FastHashMap, FastHashSet};
20use weedb::rocksdb;
21
22use self::cell_storage::*;
23use self::store_state_raw::StoreStateContext;
24use super::{BlockFlags, BlockHandle, BlockHandleStorage, BlockStorage, CellsDb};
25
26mod cell_storage;
27mod entries_buffer;
28mod store_state_raw;
29
30pub struct ShardStateStorage {
31 cells_db: CellsDb,
32
33 block_handle_storage: Arc<BlockHandleStorage>,
34 block_storage: Arc<BlockStorage>,
35 cell_storage: Arc<CellStorage>,
36 temp_file_storage: TempFileStorage,
37
38 gc_lock: Arc<tokio::sync::Mutex<()>>,
39 min_ref_mc_state: MinRefMcStateTracker,
40 max_new_mc_cell_count: AtomicUsize,
41 max_new_sc_cell_count: AtomicUsize,
42
43 accounts_split_depth: u8,
44}
45
46impl ShardStateStorage {
47 pub fn new(
49 cells_db: CellsDb,
50 block_handle_storage: Arc<BlockHandleStorage>,
51 block_storage: Arc<BlockStorage>,
52 temp_file_storage: TempFileStorage,
53 cache_size_bytes: ByteSize,
54 drop_interval: u32,
55 ) -> Result<Arc<Self>> {
56 let cell_storage = CellStorage::new(cells_db.clone(), cache_size_bytes, drop_interval);
57
58 Ok(Arc::new(Self {
59 cells_db,
60 block_handle_storage,
61 block_storage,
62 temp_file_storage,
63 cell_storage,
64 gc_lock: Default::default(),
65 min_ref_mc_state: MinRefMcStateTracker::new(),
66 max_new_mc_cell_count: AtomicUsize::new(0),
67 max_new_sc_cell_count: AtomicUsize::new(0),
68 accounts_split_depth: 4,
69 }))
70 }
71
72 pub fn metrics(&self) -> ShardStateStorageMetrics {
73 ShardStateStorageMetrics {
74 max_new_mc_cell_count: self.max_new_mc_cell_count.swap(0, Ordering::AcqRel),
75 max_new_sc_cell_count: self.max_new_sc_cell_count.swap(0, Ordering::AcqRel),
76 }
77 }
78
79 pub fn min_ref_mc_state(&self) -> &MinRefMcStateTracker {
85 &self.min_ref_mc_state
86 }
87
88 pub fn cell_storage(&self) -> &Arc<CellStorage> {
89 &self.cell_storage
90 }
91
92 pub fn load_mc_block_id(&self, seqno: u32) -> Result<Option<BlockId>> {
94 let snapshot = self.cells_db.rocksdb().snapshot();
95 self.find_mc_block_id(seqno, &snapshot)
96 }
97
98 pub async fn store_state(
99 &self,
100 handle: &BlockHandle,
101 state: &ShardStateStuff,
102 hint: StoreStateHint,
103 ) -> Result<bool> {
104 anyhow::ensure!(
105 handle.id() == state.block_id(),
106 ShardStateStorageError::BlockHandleIdMismatch {
107 expected: state.block_id().as_short_id(),
108 actual: handle.id().as_short_id(),
109 }
110 );
111
112 self.store_state_root(handle, state.root_cell().clone(), hint)
113 .await
114 }
115
116 pub async fn store_state_root(
117 &self,
118 handle: &BlockHandle,
119 root_cell: Cell,
120 hint: StoreStateHint,
121 ) -> Result<bool> {
122 if handle.has_state() {
123 return Ok(false);
124 }
125
126 let gc_lock = {
127 let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high");
128 self.gc_lock.clone().lock_owned().await
129 };
130
131 if handle.has_state() {
133 return Ok(false);
134 }
135 let _hist = HistogramGuard::begin("tycho_storage_state_store_time");
136
137 let block_id = *handle.id();
138 let raw_db = self.cells_db.rocksdb().clone();
139 let cf = self.cells_db.shard_states.get_unbounded_cf();
140 let cell_storage = self.cell_storage.clone();
141 let block_handle_storage = self.block_handle_storage.clone();
142 let handle = handle.clone();
143 let accounts_split_depth = self.accounts_split_depth;
144
145 let (new_cell_count, updated) = tokio::task::spawn_blocking(move || {
147 let root_hash = *root_cell.repr_hash();
148 let estimated_merkle_update_size = hint.estimate_cell_count();
149
150 let estimated_update_size_bytes = estimated_merkle_update_size * 192; let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);
152
153 let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high");
154
155 let new_cell_count = if block_id.is_masterchain() {
156 cell_storage.store_cell(
157 &mut batch,
158 root_cell.as_ref(),
159 estimated_merkle_update_size,
160 )?
161 } else {
162 let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?;
163
164 cell_storage.store_cell_mt(
165 root_cell.as_ref(),
166 &mut batch,
167 split_at,
168 estimated_merkle_update_size,
169 )?
170 };
171
172 in_mem_store.finish();
173 metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);
174
175 batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice());
176
177 let hist = HistogramGuard::begin("tycho_storage_state_update_time_high");
178 metrics::histogram!("tycho_storage_state_update_size_bytes")
179 .record(batch.size_in_bytes() as f64);
180 metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
181 .record(estimated_update_size_bytes as f64);
182
183 raw_db.write(batch)?;
184
185 Reclaimer::instance().drop(root_cell);
186
187 hist.finish();
188
189 let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
190 if updated {
191 block_handle_storage.store_handle(&handle, false);
192 }
193
194 drop(gc_lock);
196
197 Ok::<_, anyhow::Error>((new_cell_count, updated))
198 })
199 .await??;
200
201 let count = if block_id.shard.is_masterchain() {
202 &self.max_new_mc_cell_count
203 } else {
204 &self.max_new_sc_cell_count
205 };
206
207 count.fetch_max(new_cell_count, Ordering::Release);
208
209 Ok(updated)
210 }
211
212 async fn store_state_inner<R>(&self, block_id: &BlockId, boc: R) -> Result<HashBytes>
213 where
214 R: std::io::Read + Send + 'static,
215 {
216 let ctx = StoreStateContext {
217 cells_db: self.cells_db.clone(),
218 cell_storage: self.cell_storage.clone(),
219 temp_file_storage: self.temp_file_storage.clone(),
220 };
221
222 let block_id = *block_id;
223
224 let gc_lock = self.gc_lock.clone().lock_owned().await;
225 tokio::task::spawn_blocking(move || {
226 let _gc_lock = gc_lock;
228
229 ctx.store(&block_id, boc)
230 })
231 .await?
232 }
233
234 pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result<HashBytes> {
236 self.store_state_inner(block_id, boc).await
237 }
238
239 pub async fn store_state_bytes(&self, block_id: &BlockId, boc: Bytes) -> Result<HashBytes> {
240 let cursor = Cursor::new(boc);
241 self.store_state_inner(block_id, cursor).await
242 }
243
244 pub async fn load_state(
250 &self,
251 ref_by_mc_seqno: u32,
252 block_id: &BlockId,
253 ) -> Result<ShardStateStuff> {
254 static MAX_KNOWN_EPOCH: AtomicU32 = AtomicU32::new(0);
256
257 let root_hash = self.load_state_root_hash(block_id)?;
258 let root = self.cell_storage.load_cell(&root_hash, ref_by_mc_seqno)?;
259 let root = Cell::from(root as Arc<_>);
260
261 let max_known_epoch = MAX_KNOWN_EPOCH
262 .fetch_max(ref_by_mc_seqno, Ordering::Relaxed)
263 .max(ref_by_mc_seqno);
264 metrics::gauge!("tycho_storage_state_max_epoch").set(max_known_epoch);
265
266 let shard_state = root.parse::<Box<ShardStateUnsplit>>()?;
267 let handle = self.min_ref_mc_state.insert(&shard_state);
268 ShardStateStuff::from_state_and_root(block_id, shard_state, root, handle)
269 }
270
271 pub fn load_state_root_hash(&self, block_id: &BlockId) -> Result<HashBytes> {
272 let shard_states = &self.cells_db.shard_states;
273 let shard_state = shard_states.get(block_id.to_vec())?;
274 match shard_state {
275 Some(root) => Ok(HashBytes::from_slice(&root[..32])),
276 None => {
277 anyhow::bail!(ShardStateStorageError::NotFound(block_id.as_short_id()))
278 }
279 }
280 }
281
282 #[tracing::instrument(skip(self))]
283 pub async fn remove_outdated_states(&self, mc_seqno: u32) -> Result<()> {
284 let Some(top_blocks) = self.compute_recent_blocks(mc_seqno).await? else {
286 tracing::warn!("recent blocks edge not found");
287 return Ok(());
288 };
289
290 tracing::info!(
291 target_block_id = %top_blocks.mc_block,
292 "started states GC",
293 );
294 let started_at = Instant::now();
295
296 let raw = self.cells_db.rocksdb();
297
298 let snapshot = raw.snapshot();
300 let shard_states_cf = self.cells_db.shard_states.get_unbounded_cf();
301 let mut states_read_options = self.cells_db.shard_states.new_read_config();
302 states_read_options.set_snapshot(&snapshot);
303
304 let mut alloc = bumpalo_herd::Herd::new();
305
306 let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options);
308 iter.seek_to_first();
309
310 let mut removed_states = 0usize;
312 let mut removed_cells = 0usize;
313 loop {
314 let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high");
315 let (key, value) = match iter.item() {
316 Some(item) => item,
317 None => match iter.status() {
318 Ok(()) => break,
319 Err(e) => return Err(e.into()),
320 },
321 };
322
323 let block_id = BlockId::from_slice(key);
324 let root_hash = HashBytes::from_slice(&value[0..32]);
325
326 if block_id.seqno == 0
332 || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno)
333 {
334 iter.next();
335 continue;
336 }
337
338 alloc.reset();
339
340 let guard = {
341 let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high");
342 self.gc_lock.clone().lock_owned().await
343 };
344
345 let db = self.cells_db.clone();
346 let cell_storage = self.cell_storage.clone();
347 let key = key.to_vec();
348 let accounts_split_depth = self.accounts_split_depth;
349 let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
350 let in_mem_remove =
351 HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high");
352
353 let (stats, mut batch) = if block_id.is_masterchain() {
354 cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)?
355 } else {
356 let root_cell = Cell::from(cell_storage.load_cell(&root_hash, 0)? as Arc<_>);
359
360 let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?
361 .into_keys()
362 .collect::<FastHashSet<HashBytes>>();
363 cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)?
364 };
365
366 in_mem_remove.finish();
367
368 batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
369 db.raw()
370 .rocksdb()
371 .write_opt(batch, db.cells.write_config())?;
372
373 drop(guard);
375
376 Ok::<_, anyhow::Error>((stats, alloc))
377 })
378 .await??;
379
380 removed_cells += total;
381 alloc = inner_alloc; tracing::debug!(removed_cells = total, %block_id);
384
385 removed_states += 1;
386 iter.next();
387
388 metrics::counter!("tycho_storage_state_gc_count").increment(1);
389 metrics::counter!("tycho_storage_state_gc_cells_count").increment(1);
390 if block_id.is_masterchain() {
391 metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64);
392 }
393 tracing::debug!(removed_states, removed_cells, %block_id, "removed state");
394 }
395
396 tracing::info!(
398 removed_states,
399 removed_cells,
400 block_id = %top_blocks.mc_block,
401 elapsed_sec = started_at.elapsed().as_secs_f64(),
402 "finished states GC",
403 );
404 Ok(())
405 }
406
407 pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result<Option<TopBlocks>> {
411 if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno()
413 && min_ref_mc_seqno < mc_seqno
414 {
415 mc_seqno = min_ref_mc_seqno;
416 }
417
418 let snapshot = self.cells_db.rocksdb().snapshot();
419
420 let mc_block_id = match self
424 .find_mc_block_id(mc_seqno, &snapshot)
425 .context("Failed to find block id by seqno")?
426 {
427 Some(block_id) => block_id,
428 None => return Ok(None),
429 };
430
431 let handle = match self.block_handle_storage.load_handle(&mc_block_id) {
433 Some(handle) if handle.has_data() => handle,
434 _ => return Ok(None),
436 };
437
438 let block_data = self.block_storage.load_block_data(&handle).await?;
441 let block_info = block_data
442 .load_info()
443 .context("Failed to read target block info")?;
444
445 let min_ref_mc_seqno = block_info.min_ref_mc_seqno;
447 let min_ref_block_id = match self.find_mc_block_id(min_ref_mc_seqno, &snapshot)? {
448 Some(block_id) => block_id,
449 None => return Ok(None),
450 };
451
452 let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) {
454 Some(handle) if handle.has_data() => handle,
455 _ => return Ok(None),
457 };
458
459 self.block_storage
461 .load_block_data(&min_ref_block_handle)
462 .await
463 .and_then(|block_data| TopBlocks::from_mc_block(&block_data))
464 .map(Some)
465 }
466
467 fn find_mc_block_id(
468 &self,
469 mc_seqno: u32,
470 snapshot: &rocksdb::Snapshot<'_>,
471 ) -> Result<Option<BlockId>> {
472 let shard_states = &self.cells_db.shard_states;
473
474 let mut bound = BlockId {
475 shard: ShardIdent::MASTERCHAIN,
476 seqno: mc_seqno,
477 root_hash: HashBytes::ZERO,
478 file_hash: HashBytes::ZERO,
479 };
480
481 let mut readopts = shard_states.new_read_config();
482 readopts.set_snapshot(snapshot);
483 readopts.set_iterate_lower_bound(bound.to_vec().as_slice());
484 bound.seqno += 1;
485 readopts.set_iterate_upper_bound(bound.to_vec().as_slice());
486
487 let mut iter = self
488 .cells_db
489 .rocksdb()
490 .raw_iterator_cf_opt(&shard_states.cf(), readopts);
491 iter.seek_to_first();
492
493 Ok(iter.key().map(BlockId::from_slice))
494 }
495}
496
/// Optional hints used to pre-size buffers when storing a state.
#[derive(Default, Debug, Clone, Copy)]
pub struct StoreStateHint {
    /// Size in bytes of the block data the state belongs to, if known.
    pub block_data_size: Option<usize>,
}

impl StoreStateHint {
    /// Estimates how many cells a state update will write.
    ///
    /// The estimate is `3889.9821 + 14.7480 * sqrt(block_data_size)`, truncated and rounded up
    /// to the next power of two. A missing block size is treated as 4 KiB.
    ///
    /// NOTE(review): the constants look like an empirically fitted model; confirm their origin.
    fn estimate_cell_count(&self) -> usize {
        const MIN_BLOCK_SIZE: usize = 4 << 10;
        const INTERCEPT: f64 = 3889.9821;
        const SLOPE: f64 = 14.7480;

        let size = match self.block_data_size {
            Some(size) => size,
            None => MIN_BLOCK_SIZE,
        };
        let estimate = INTERCEPT + SLOPE * (size as f64).sqrt();

        (estimate as usize).next_power_of_two()
    }
}
513
/// Snapshot of the counters returned by `ShardStateStorage::metrics`.
#[derive(Debug, Copy, Clone)]
pub struct ShardStateStorageMetrics {
    /// Largest number of new cells written for one masterchain state since the last snapshot.
    pub max_new_mc_cell_count: usize,
    /// Largest number of new cells written for one shardchain state since the last snapshot.
    pub max_new_sc_cell_count: usize,
}
519
520#[derive(thiserror::Error, Debug)]
521pub enum ShardStateStorageError {
522 #[error("Shard state not found for block: {0}")]
523 NotFound(BlockIdShort),
524 #[error("Block handle id mismatch: expected {expected}, got {actual}")]
525 BlockHandleIdMismatch {
526 expected: BlockIdShort,
527 actual: BlockIdShort,
528 },
529}
530
531pub fn split_shard_accounts(
532 root_cell: impl AsRef<DynCell>,
533 split_depth: u8,
534) -> Result<FastHashMap<HashBytes, Cell>> {
535 let shard_accounts = root_cell
538 .as_ref()
539 .reference_cloned(1)
540 .context("invalid shard state")?
541 .parse::<ShardAccounts>()
542 .context("failed to load shard accounts")?;
543
544 split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts")
545}