tycho_core/storage/block_handle/
mod.rs1use std::sync::Arc;
2
3pub(crate) use handle::BlockDataGuard;
4use tycho_block_util::block::{BlockStuff, ShardHeights};
5use tycho_storage::kv::StoredValue;
6use tycho_types::models::BlockId;
7use tycho_util::FastDashMap;
8
9pub use self::handle::{BlockHandle, WeakBlockHandle};
10pub use self::meta::{BlockFlags, BlockMeta, LoadedBlockMeta, NewBlockMeta};
11use super::{CoreDb, PartialBlockId};
12
13mod handle;
14mod meta;
15
16type BlockHandleCache = FastDashMap<BlockId, WeakBlockHandle>;
17
18pub struct BlockHandleStorage {
19 db: CoreDb,
20 cache: Arc<BlockHandleCache>,
21}
22
23impl BlockHandleStorage {
24 pub fn new(db: CoreDb) -> Self {
25 Self {
26 db,
27 cache: Arc::new(Default::default()),
28 }
29 }
30
31 pub fn set_skip_states_gc(&self, handle: &BlockHandle) -> bool {
32 let updated = handle.meta().add_flags(BlockFlags::SKIP_STATES_GC);
33 if updated {
34 self.store_handle(handle, false);
35 tracing::debug!(block_id = %handle.id(), "skip states gc was set");
36 }
37 updated
38 }
39
40 pub fn set_skip_states_gc_finished(&self, handle: &BlockHandle) -> bool {
41 let updated = handle.meta().add_flags(BlockFlags::SKIP_STATES_GC_FINISHED);
42 if updated {
43 self.store_handle(handle, false);
44 tracing::debug!(block_id = %handle.id(), "skip states gc was finished");
45 }
46 updated
47 }
48
49 pub fn set_skip_blocks_gc(&self, handle: &BlockHandle) -> bool {
50 let updated = handle.meta().add_flags(BlockFlags::SKIP_BLOCKS_GC);
51 if updated {
52 self.store_handle(handle, false);
53 tracing::debug!(block_id = %handle.id(), "skip blocks gc was set");
54 }
55 updated
56 }
57
58 pub fn set_skip_blocks_gc_finished(&self, handle: &BlockHandle) -> bool {
59 let updated = handle.meta().add_flags(BlockFlags::SKIP_BLOCKS_GC_FINISHED);
60 if updated {
61 self.store_handle(handle, false);
62 tracing::debug!(block_id = %handle.id(), "skip blocks gc was finished");
63 }
64 updated
65 }
66
67 pub fn set_block_committed(&self, handle: &BlockHandle) -> bool {
68 let updated = handle.meta().add_flags(BlockFlags::IS_COMMITTED);
69 if updated {
70 self.store_handle(handle, false);
71 }
72 updated
73 }
74
75 pub fn set_has_shard_state(&self, handle: &BlockHandle) -> bool {
76 let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
77 if updated {
78 self.store_handle(handle, false);
79 }
80 updated
81 }
82
83 pub fn set_has_virtual_shard_state(&self, handle: &BlockHandle) -> bool {
84 let updated = handle.meta().add_flags(BlockFlags::HAS_VIRTUAL_STATE);
85 if updated {
86 self.store_handle(handle, false);
87 }
88 updated
89 }
90
91 pub fn set_block_persistent(&self, handle: &BlockHandle) -> bool {
92 let updated = handle.meta().add_flags(BlockFlags::IS_PERSISTENT);
93 if updated {
94 self.store_handle(handle, false);
95 }
96 updated
97 }
98
99 pub fn set_has_persistent_shard_state(&self, handle: &BlockHandle) -> bool {
100 let updated = handle
101 .meta()
102 .add_flags(BlockFlags::HAS_PERSISTENT_SHARD_STATE);
103 if updated {
104 self.store_handle(handle, false);
105 }
106 updated
107 }
108
109 pub fn set_has_persistent_queue_state(&self, handle: &BlockHandle) -> bool {
110 let updated = handle
111 .meta()
112 .add_flags(BlockFlags::HAS_PERSISTENT_QUEUE_STATE);
113 if updated {
114 self.store_handle(handle, false);
115 }
116 updated
117 }
118
119 pub fn set_is_zerostate(&self, handle: &BlockHandle) -> bool {
120 let updated = handle.meta().add_flags(BlockFlags::IS_ZEROSTATE);
121 if updated {
122 self.store_handle(handle, false);
123 }
124 updated
125 }
126
127 pub fn create_or_load_handle(
128 &self,
129 block_id: &BlockId,
130 meta_data: NewBlockMeta,
131 ) -> (BlockHandle, HandleCreationStatus) {
132 use dashmap::mapref::entry::Entry;
133
134 let block_handles = &self.db.block_handles;
135
136 if let Some(handle) = self.cache.get(block_id)
138 && let Some(handle) = handle.upgrade()
139 {
140 return (handle, HandleCreationStatus::Fetched);
141 }
142
143 match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
144 Some(data) => {
146 let meta = BlockMeta::from_slice(data.as_ref());
147
148 let handle = self.fill_cache(block_id, meta);
150
151 (handle, HandleCreationStatus::Fetched)
153 }
154 None => {
155 let handle = BlockHandle::new(
157 block_id,
158 BlockMeta::with_data(meta_data),
159 self.cache.clone(),
160 );
161
162 let is_new = match self.cache.entry(*block_id) {
164 Entry::Vacant(entry) => {
165 entry.insert(handle.downgrade());
166 true
167 }
168 Entry::Occupied(mut entry) => match entry.get().upgrade() {
169 Some(handle) => return (handle, HandleCreationStatus::Fetched),
171 None => {
172 entry.insert(handle.downgrade());
173 true
174 }
175 },
176 };
177
178 self.store_handle(&handle, is_new);
180
181 (handle, HandleCreationStatus::Created)
183 }
184 }
185 }
186
187 pub fn load_handle(&self, block_id: &BlockId) -> Option<BlockHandle> {
188 let block_handles = &self.db.block_handles;
189
190 if let Some(handle) = self.cache.get(block_id)
192 && let Some(handle) = handle.upgrade()
193 {
194 return Some(handle);
195 }
196
197 let meta = match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
199 Some(data) => BlockMeta::from_slice(data.as_ref()),
200 None => return None,
201 };
202
203 Some(self.fill_cache(block_id, meta))
205 }
206
207 pub fn store_handle(&self, handle: &BlockHandle, is_new: bool) {
208 let id = handle.id();
209
210 let is_key_block = handle.is_key_block();
211
212 if is_new || is_key_block {
213 let mut batch = weedb::rocksdb::WriteBatch::default();
214
215 batch.merge_cf(
216 &self.db.block_handles.cf(),
217 id.root_hash,
218 handle.meta().to_vec(),
219 );
220
221 if is_new {
222 batch.put_cf(
223 &self.db.full_block_ids.cf(),
224 PartialBlockId::from(id).to_vec(),
225 id.file_hash,
226 );
227 }
228
229 if is_key_block {
230 batch.put_cf(
231 &self.db.key_blocks.cf(),
232 id.seqno.to_be_bytes(),
233 id.to_vec(),
234 );
235 }
236
237 self.db
238 .rocksdb()
239 .write_opt(batch, self.db.block_handles.write_config())
240 } else {
241 self.db.rocksdb().merge_cf_opt(
242 &self.db.block_handles.cf(),
243 id.root_hash,
244 handle.meta().to_vec(),
245 self.db.block_handles.write_config(),
246 )
247 }
248 .unwrap();
249 }
250
251 pub fn load_key_block_handle(&self, seqno: u32) -> Option<BlockHandle> {
252 let key_blocks = &self.db.key_blocks;
253 let key_block_id = match key_blocks.get(seqno.to_be_bytes()).unwrap() {
254 Some(data) => BlockId::from_slice(data.as_ref()),
255 None => return None,
256 };
257 self.load_handle(&key_block_id)
258 }
259
260 pub fn find_last_key_block(&self) -> Option<BlockHandle> {
261 let mut iter = self.db.key_blocks.raw_iterator();
262 iter.seek_to_last();
263
264 let key_block_id = BlockId::from_slice(iter.value()?);
266 self.load_handle(&key_block_id)
267 }
268
269 pub fn find_prev_key_block(&self, seqno: u32) -> Option<BlockHandle> {
270 if seqno == 0 {
271 return None;
272 }
273
274 let mut iter = self.db.key_blocks.raw_iterator();
276 iter.seek_for_prev((seqno - 1u32).to_be_bytes());
277
278 let key_block_id = BlockId::from_slice(iter.value()?);
280 self.load_handle(&key_block_id)
281 }
282
283 pub fn find_prev_persistent_key_block(&self, seqno: u32) -> Option<BlockHandle> {
284 if seqno == 0 {
285 return None;
286 }
287
288 let mut iter = self.db.key_blocks.raw_iterator();
290 iter.seek_for_prev((seqno - 1u32).to_be_bytes());
291
292 let mut get_key_block = move || -> Option<BlockHandle> {
294 let key_block_id = BlockId::from_slice(iter.value()?);
296
297 let handle = self.load_handle(&key_block_id)?;
299
300 iter.prev();
302
303 Some(handle)
305 };
306
307 let mut key_block = get_key_block()?;
309
310 while let Some(prev_key_block) = get_key_block() {
312 if BlockStuff::compute_is_persistent(key_block.gen_utime(), prev_key_block.gen_utime())
313 {
314 return Some(key_block);
316 }
317 key_block = prev_key_block;
318 }
319
320 None
322 }
323
324 pub fn key_blocks_iterator(
325 &self,
326 direction: KeyBlocksDirection,
327 ) -> impl Iterator<Item = BlockId> + '_ {
328 let mut raw_iterator = self.db.key_blocks.raw_iterator();
329 let reverse = match direction {
330 KeyBlocksDirection::ForwardFrom(seqno) => {
331 raw_iterator.seek(seqno.to_be_bytes());
332 false
333 }
334 KeyBlocksDirection::Backward => {
335 raw_iterator.seek_to_last();
336 true
337 }
338 };
339
340 KeyBlocksIterator {
341 raw_iterator,
342 reverse,
343 }
344 }
345
346 pub fn gc_handles_cache(&self, mc_seqno: u32, shard_heights: &ShardHeights) -> usize {
347 let mut total_removed = 0;
348
349 self.cache.retain(|block_id, value| {
350 let value = match value.upgrade() {
351 Some(value) => value,
352 None => {
353 total_removed += 1;
354 return false;
355 }
356 };
357
358 let is_masterchain = block_id.is_masterchain();
359
360 if block_id.seqno == 0
361 || is_masterchain && (block_id.seqno >= mc_seqno || value.is_key_block())
362 || !is_masterchain
363 && shard_heights.contains_shard_seqno(&block_id.shard, block_id.seqno)
364 {
365 true
367 } else {
368 total_removed += 1;
370 value.meta().add_flags(BlockFlags::IS_REMOVED);
371 false
372 }
373 });
374
375 total_removed
376 }
377
378 fn fill_cache(&self, block_id: &BlockId, meta: BlockMeta) -> BlockHandle {
379 use dashmap::mapref::entry::Entry;
380
381 match self.cache.entry(*block_id) {
382 Entry::Vacant(entry) => {
383 let handle = BlockHandle::new(block_id, meta, self.cache.clone());
384 entry.insert(handle.downgrade());
385 handle
386 }
387 Entry::Occupied(mut entry) => match entry.get().upgrade() {
388 Some(handle) => handle,
389 None => {
390 let handle = BlockHandle::new(block_id, meta, self.cache.clone());
391 entry.insert(handle.downgrade());
392 handle
393 }
394 },
395 }
396 }
397}
398
399#[derive(Debug, Copy, Clone, Eq, PartialEq)]
400pub enum HandleCreationStatus {
401 Created,
402 Fetched,
403}
404
405#[derive(Debug, Copy, Clone, Eq, PartialEq)]
406pub enum KeyBlocksDirection {
407 ForwardFrom(u32),
408 Backward,
409}
410
411struct KeyBlocksIterator<'a> {
412 raw_iterator: weedb::rocksdb::DBRawIterator<'a>,
413 reverse: bool,
414}
415
416impl Iterator for KeyBlocksIterator<'_> {
417 type Item = BlockId;
418
419 fn next(&mut self) -> Option<Self::Item> {
420 let value = self.raw_iterator.value().map(BlockId::from_slice)?;
421 if self.reverse {
422 self.raw_iterator.prev();
423 } else {
424 self.raw_iterator.next();
425 }
426 Some(value)
427 }
428}
429
430#[cfg(test)]
431mod tests {
432 use tycho_storage::StorageContext;
433 use tycho_types::models::ShardIdent;
434
435 use super::*;
436 use crate::storage::{CoreStorage, CoreStorageConfig};
437
438 #[tokio::test]
439 async fn merge_operator_works() -> anyhow::Result<()> {
440 let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
441 let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
442
443 let block_handles = storage.block_handle_storage();
444
445 let block_id = BlockId {
446 shard: ShardIdent::BASECHAIN,
447 seqno: 100,
448 ..Default::default()
449 };
450
451 let meta = NewBlockMeta {
452 is_key_block: false,
453 gen_utime: 123,
454 ref_by_mc_seqno: 456,
455 };
456
457 {
458 let (handle, status) = block_handles.create_or_load_handle(&block_id, meta);
459 assert_eq!(status, HandleCreationStatus::Created);
460
461 assert_eq!(handle.ref_by_mc_seqno(), 456);
462 assert!(!handle.is_key_block());
463 assert!(!handle.is_committed());
464
465 let updated = block_handles.set_block_committed(&handle);
466 assert!(updated);
467 assert!(handle.is_committed());
468
469 let (handle2, status) = block_handles.create_or_load_handle(&block_id, meta);
471 assert_eq!(status, HandleCreationStatus::Fetched);
472
473 assert_eq!(
474 arc_swap::RefCnt::as_ptr(&handle),
475 arc_swap::RefCnt::as_ptr(&handle2),
476 );
477 }
478
479 assert!(!block_handles.cache.contains_key(&block_id));
481
482 {
484 let (handle, status) = block_handles.create_or_load_handle(&block_id, meta);
485 assert_eq!(status, HandleCreationStatus::Fetched);
486
487 assert_eq!(handle.ref_by_mc_seqno(), 456);
488 assert!(!handle.is_key_block());
489 assert!(handle.is_committed());
490 }
491
492 assert!(!block_handles.cache.contains_key(&block_id));
494
495 Ok(())
496 }
497}