tycho_core/storage/shard_state/
mod.rs1use std::fs::File;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::Instant;
5
6use anyhow::{Context, Result};
7use bytesize::ByteSize;
8use tycho_block_util::block::*;
9use tycho_block_util::dict::split_aug_dict_raw;
10use tycho_block_util::state::*;
11use tycho_storage::fs::TempFileStorage;
12use tycho_storage::kv::StoredValue;
13use tycho_types::models::*;
14use tycho_types::prelude::*;
15use tycho_util::metrics::HistogramGuard;
16use tycho_util::{FastHashMap, FastHashSet};
17use weedb::rocksdb;
18
19use self::cell_storage::*;
20use self::store_state_raw::StoreStateContext;
21use super::{BlockFlags, BlockHandle, BlockHandleStorage, BlockStorage, CoreDb};
22
23mod cell_storage;
24mod entries_buffer;
25mod store_state_raw;
26
27pub struct ShardStateStorage {
28 db: CoreDb,
29
30 block_handle_storage: Arc<BlockHandleStorage>,
31 block_storage: Arc<BlockStorage>,
32 cell_storage: Arc<CellStorage>,
33 temp_file_storage: TempFileStorage,
34
35 gc_lock: tokio::sync::Mutex<()>,
36 min_ref_mc_state: MinRefMcStateTracker,
37 max_new_mc_cell_count: AtomicUsize,
38 max_new_sc_cell_count: AtomicUsize,
39
40 accounts_split_depth: u8,
41}
42
43impl ShardStateStorage {
44 pub fn new(
45 db: CoreDb,
46 block_handle_storage: Arc<BlockHandleStorage>,
47 block_storage: Arc<BlockStorage>,
48 temp_file_storage: TempFileStorage,
49 cache_size_bytes: ByteSize,
50 ) -> Result<Arc<Self>> {
51 let cell_storage = CellStorage::new(db.clone(), cache_size_bytes);
52
53 Ok(Arc::new(Self {
54 db,
55 block_handle_storage,
56 block_storage,
57 temp_file_storage,
58 cell_storage,
59 gc_lock: Default::default(),
60 min_ref_mc_state: MinRefMcStateTracker::new(),
61 max_new_mc_cell_count: AtomicUsize::new(0),
62 max_new_sc_cell_count: AtomicUsize::new(0),
63 accounts_split_depth: 4,
64 }))
65 }
66
67 pub fn metrics(&self) -> ShardStateStorageMetrics {
68 ShardStateStorageMetrics {
69 max_new_mc_cell_count: self.max_new_mc_cell_count.swap(0, Ordering::AcqRel),
70 max_new_sc_cell_count: self.max_new_sc_cell_count.swap(0, Ordering::AcqRel),
71 }
72 }
73
74 pub fn min_ref_mc_state(&self) -> &MinRefMcStateTracker {
80 &self.min_ref_mc_state
81 }
82
83 pub async fn store_state(
84 &self,
85 handle: &BlockHandle,
86 state: &ShardStateStuff,
87 hint: StoreStateHint,
88 ) -> Result<bool> {
89 if handle.id() != state.block_id() {
90 return Err(ShardStateStorageError::BlockHandleIdMismatch.into());
91 }
92
93 self.store_state_root(handle, state.root_cell().clone(), hint)
94 .await
95 }
96
97 pub async fn store_state_root(
98 &self,
99 handle: &BlockHandle,
100 root_cell: Cell,
101 hint: StoreStateHint,
102 ) -> Result<bool> {
103 if handle.has_state() {
104 return Ok(false);
105 }
106
107 let _gc_lock = {
108 let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high");
109 self.gc_lock.lock().await
110 };
111
112 if handle.has_state() {
114 return Ok(false);
115 }
116 let _hist = HistogramGuard::begin("tycho_storage_state_store_time");
117
118 let block_id = *handle.id();
119 let raw_db = self.db.rocksdb().clone();
120 let cf = self.db.shard_states.get_unbounded_cf();
121 let cell_storage = self.cell_storage.clone();
122 let block_handle_storage = self.block_handle_storage.clone();
123 let handle = handle.clone();
124 let accounts_split_depth = self.accounts_split_depth;
125
126 let (new_cell_count, updated) = tokio::task::spawn_blocking(move || {
128 let root_hash = *root_cell.repr_hash();
129 let estimated_merkle_update_size = hint.estimate_cell_count();
130
131 let estimated_update_size_bytes = estimated_merkle_update_size * 192; let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);
133
134 let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high");
135
136 let new_cell_count = if block_id.is_masterchain() {
137 cell_storage.store_cell(
138 &mut batch,
139 root_cell.as_ref(),
140 estimated_merkle_update_size,
141 )?
142 } else {
143 let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?;
144
145 cell_storage.store_cell_mt(
146 root_cell.as_ref(),
147 &mut batch,
148 split_at,
149 estimated_merkle_update_size,
150 )?
151 };
152
153 in_mem_store.finish();
154 metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);
155
156 batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice());
157
158 let hist = HistogramGuard::begin("tycho_storage_state_update_time_high");
159 metrics::histogram!("tycho_storage_state_update_size_bytes")
160 .record(batch.size_in_bytes() as f64);
161 metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
162 .record(estimated_update_size_bytes as f64);
163
164 raw_db.write(batch)?;
165
166 drop(root_cell);
167
168 hist.finish();
169
170 let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
171 if updated {
172 block_handle_storage.store_handle(&handle, false);
173 }
174
175 Ok::<_, anyhow::Error>((new_cell_count, updated))
176 })
177 .await??;
178
179 let count = if block_id.shard.is_masterchain() {
180 &self.max_new_mc_cell_count
181 } else {
182 &self.max_new_sc_cell_count
183 };
184
185 count.fetch_max(new_cell_count, Ordering::Release);
186
187 Ok(updated)
188 }
189
190 pub async fn store_state_file(&self, block_id: &BlockId, boc: File) -> Result<ShardStateStuff> {
191 let ctx = StoreStateContext {
192 db: self.db.clone(),
193 cell_storage: self.cell_storage.clone(),
194 temp_file_storage: self.temp_file_storage.clone(),
195 min_ref_mc_state: self.min_ref_mc_state.clone(),
196 };
197
198 let block_id = *block_id;
199
200 let _gc_lock = self.gc_lock.lock().await;
201 tokio::task::spawn_blocking(move || ctx.store(&block_id, boc)).await?
202 }
203
204 pub async fn load_state(&self, block_id: &BlockId) -> Result<ShardStateStuff> {
205 let cell_id = self.load_state_root(block_id)?;
206 let cell = self.cell_storage.load_cell(cell_id)?;
207
208 ShardStateStuff::from_root(block_id, Cell::from(cell as Arc<_>), &self.min_ref_mc_state)
209 }
210
211 #[tracing::instrument(skip(self))]
212 pub async fn remove_outdated_states(&self, mc_seqno: u32) -> Result<()> {
213 let Some(top_blocks) = self.compute_recent_blocks(mc_seqno).await? else {
215 tracing::warn!("recent blocks edge not found");
216 return Ok(());
217 };
218
219 tracing::info!(
220 target_block_id = %top_blocks.mc_block,
221 "started states GC",
222 );
223 let started_at = Instant::now();
224
225 let raw = self.db.rocksdb();
226
227 let snapshot = raw.snapshot();
229 let shard_states_cf = self.db.shard_states.get_unbounded_cf();
230 let mut states_read_options = self.db.shard_states.new_read_config();
231 states_read_options.set_snapshot(&snapshot);
232
233 let mut alloc = bumpalo_herd::Herd::new();
234
235 let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options);
237 iter.seek_to_first();
238
239 let mut removed_states = 0usize;
241 let mut removed_cells = 0usize;
242 loop {
243 let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high");
244 let (key, value) = match iter.item() {
245 Some(item) => item,
246 None => match iter.status() {
247 Ok(()) => break,
248 Err(e) => return Err(e.into()),
249 },
250 };
251
252 let block_id = BlockId::from_slice(key);
253 let root_hash = HashBytes::from_slice(value);
254
255 if block_id.seqno == 0
257 || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno)
258 {
259 iter.next();
260 continue;
261 }
262
263 alloc.reset();
264
265 let guard = {
266 let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high");
267 self.gc_lock.lock().await
268 };
269
270 let db = self.db.clone();
271 let cell_storage = self.cell_storage.clone();
272 let key = key.to_vec();
273 let accounts_split_depth = self.accounts_split_depth;
274 let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
275 let in_mem_remove =
276 HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high");
277
278 let (stats, mut batch) = if block_id.is_masterchain() {
279 cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)?
280 } else {
281 let root_cell = Cell::from(cell_storage.load_cell(root_hash)? as Arc<_>);
282
283 let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?
284 .into_keys()
285 .collect::<FastHashSet<HashBytes>>();
286 cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)?
287 };
288
289 in_mem_remove.finish();
290
291 batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
292 db.raw()
293 .rocksdb()
294 .write_opt(batch, db.cells.write_config())?;
295
296 Ok::<_, anyhow::Error>((stats, alloc))
297 })
298 .await??;
299
300 drop(guard);
301
302 removed_cells += total;
303 alloc = inner_alloc; tracing::debug!(removed_cells = total, %block_id);
306
307 removed_states += 1;
308 iter.next();
309
310 metrics::counter!("tycho_storage_state_gc_count").increment(1);
311 metrics::counter!("tycho_storage_state_gc_cells_count").increment(1);
312 if block_id.is_masterchain() {
313 metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64);
314 }
315 tracing::debug!(removed_states, removed_cells, %block_id, "removed state");
316 }
317
318 tracing::info!(
320 removed_states,
321 removed_cells,
322 block_id = %top_blocks.mc_block,
323 elapsed_sec = started_at.elapsed().as_secs_f64(),
324 "finished states GC",
325 );
326 Ok(())
327 }
328
329 pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result<Option<TopBlocks>> {
333 if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno() {
335 if min_ref_mc_seqno < mc_seqno {
336 mc_seqno = min_ref_mc_seqno;
337 }
338 }
339
340 let snapshot = self.db.rocksdb().snapshot();
341
342 let mc_block_id = match self
346 .find_mc_block_id(mc_seqno, &snapshot)
347 .context("Failed to find block id by seqno")?
348 {
349 Some(block_id) => block_id,
350 None => return Ok(None),
351 };
352
353 let handle = match self.block_handle_storage.load_handle(&mc_block_id) {
355 Some(handle) if handle.has_data() => handle,
356 _ => return Ok(None),
358 };
359
360 let block_data = self.block_storage.load_block_data(&handle).await?;
363 let block_info = block_data
364 .load_info()
365 .context("Failed to read target block info")?;
366
367 let min_ref_mc_seqno = block_info.min_ref_mc_seqno;
369 let min_ref_block_id = match self.find_mc_block_id(min_ref_mc_seqno, &snapshot)? {
370 Some(block_id) => block_id,
371 None => return Ok(None),
372 };
373
374 let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) {
376 Some(handle) if handle.has_data() => handle,
377 _ => return Ok(None),
379 };
380
381 self.block_storage
383 .load_block_data(&min_ref_block_handle)
384 .await
385 .and_then(|block_data| TopBlocks::from_mc_block(&block_data))
386 .map(Some)
387 }
388
389 pub fn load_state_root(&self, block_id: &BlockId) -> Result<HashBytes> {
390 let shard_states = &self.db.shard_states;
391 let shard_state = shard_states.get(block_id.to_vec())?;
392 match shard_state {
393 Some(root) => Ok(HashBytes::from_slice(&root[..32])),
394 None => Err(ShardStateStorageError::NotFound.into()),
395 }
396 }
397
398 fn find_mc_block_id(
399 &self,
400 mc_seqno: u32,
401 snapshot: &rocksdb::Snapshot<'_>,
402 ) -> Result<Option<BlockId>> {
403 let shard_states = &self.db.shard_states;
404
405 let mut bound = BlockId {
406 shard: ShardIdent::MASTERCHAIN,
407 seqno: mc_seqno,
408 root_hash: HashBytes::ZERO,
409 file_hash: HashBytes::ZERO,
410 };
411
412 let mut readopts = shard_states.new_read_config();
413 readopts.set_snapshot(snapshot);
414 readopts.set_iterate_lower_bound(bound.to_vec().as_slice());
415 bound.seqno += 1;
416 readopts.set_iterate_upper_bound(bound.to_vec().as_slice());
417
418 let mut iter = self
419 .db
420 .rocksdb()
421 .raw_iterator_cf_opt(&shard_states.cf(), readopts);
422 iter.seek_to_first();
423
424 Ok(iter.key().map(BlockId::from_slice))
425 }
426}
427
/// Optional information used to pre-size the write batch when storing a state.
#[derive(Debug, Default, Clone, Copy)]
pub struct StoreStateHint {
    /// Size of the block data, if known (presumably in bytes — the fallback used
    /// when it is absent is `4 << 10`).
    pub block_data_size: Option<usize>,
}

impl StoreStateHint {
    /// Estimates how many cells a state update will write.
    ///
    /// Computes `3889.9821 + 14.7480 * sqrt(block_data_size)`, using `4 << 10`
    /// when the size is unknown. The result is truncated and rounded up to the
    /// next power of two. NOTE(review): the coefficients look empirically fitted.
    fn estimate_cell_count(&self) -> usize {
        const MIN_BLOCK_SIZE: usize = 4 << 10;
        const BASE_CELLS: f64 = 3889.9821;
        const CELLS_PER_SQRT_SIZE: f64 = 14.7480;

        let data_size = match self.block_data_size {
            Some(size) => size,
            None => MIN_BLOCK_SIZE,
        };

        let raw_estimate = BASE_CELLS + CELLS_PER_SQRT_SIZE * (data_size as f64).sqrt();
        (raw_estimate as usize).next_power_of_two()
    }
}
444
/// Snapshot of the peak new-cell counters, as returned by
/// `ShardStateStorage::metrics` (which resets them).
#[derive(Clone, Copy, Debug)]
pub struct ShardStateStorageMetrics {
    /// Largest number of new cells written for a single masterchain state.
    pub max_new_mc_cell_count: usize,
    /// Largest number of new cells written for a single shardchain state.
    pub max_new_sc_cell_count: usize,
}
450
451#[derive(thiserror::Error, Debug)]
452pub enum ShardStateStorageError {
453 #[error("Not found")]
454 NotFound,
455 #[error("Block handle id mismatch")]
456 BlockHandleIdMismatch,
457}
458
459fn split_shard_accounts(
460 root_cell: impl AsRef<DynCell>,
461 split_depth: u8,
462) -> Result<FastHashMap<HashBytes, Cell>> {
463 let shard_accounts = root_cell
466 .as_ref()
467 .reference_cloned(1)
468 .context("invalid shard state")?
469 .parse::<ShardAccounts>()
470 .context("failed to load shard accounts")?;
471
472 split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts")
473}