// tycho_core/storage/block_handle/mod.rs

1use std::sync::Arc;
2
3pub(crate) use handle::BlockDataGuard;
4use tycho_block_util::block::{BlockStuff, ShardHeights};
5use tycho_storage::kv::StoredValue;
6use tycho_types::models::BlockId;
7use tycho_util::FastDashMap;
8
9pub use self::handle::{BlockHandle, WeakBlockHandle};
10pub use self::meta::{BlockFlags, BlockMeta, LoadedBlockMeta, NewBlockMeta};
11use super::{CoreDb, PartialBlockId};
12
13mod handle;
14mod meta;
15
16type BlockHandleCache = FastDashMap<BlockId, WeakBlockHandle>;
17
18pub struct BlockHandleStorage {
19    db: CoreDb,
20    cache: Arc<BlockHandleCache>,
21}
22
23impl BlockHandleStorage {
24    pub fn new(db: CoreDb) -> Self {
25        Self {
26            db,
27            cache: Arc::new(Default::default()),
28        }
29    }
30
31    pub fn set_skip_states_gc(&self, handle: &BlockHandle) -> bool {
32        let updated = handle.meta().add_flags(BlockFlags::SKIP_STATES_GC);
33        if updated {
34            self.store_handle(handle, false);
35            tracing::debug!(block_id = %handle.id(), "skip states gc was set");
36        }
37        updated
38    }
39
40    pub fn set_skip_states_gc_finished(&self, handle: &BlockHandle) -> bool {
41        let updated = handle.meta().add_flags(BlockFlags::SKIP_STATES_GC_FINISHED);
42        if updated {
43            self.store_handle(handle, false);
44            tracing::debug!(block_id = %handle.id(), "skip states gc was finished");
45        }
46        updated
47    }
48
49    pub fn set_skip_blocks_gc(&self, handle: &BlockHandle) -> bool {
50        let updated = handle.meta().add_flags(BlockFlags::SKIP_BLOCKS_GC);
51        if updated {
52            self.store_handle(handle, false);
53            tracing::debug!(block_id = %handle.id(), "skip blocks gc was set");
54        }
55        updated
56    }
57
58    pub fn set_skip_blocks_gc_finished(&self, handle: &BlockHandle) -> bool {
59        let updated = handle.meta().add_flags(BlockFlags::SKIP_BLOCKS_GC_FINISHED);
60        if updated {
61            self.store_handle(handle, false);
62            tracing::debug!(block_id = %handle.id(), "skip blocks gc was finished");
63        }
64        updated
65    }
66
67    pub fn set_block_committed(&self, handle: &BlockHandle) -> bool {
68        let updated = handle.meta().add_flags(BlockFlags::IS_COMMITTED);
69        if updated {
70            self.store_handle(handle, false);
71        }
72        updated
73    }
74
75    pub fn set_has_shard_state(&self, handle: &BlockHandle) -> bool {
76        let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
77        if updated {
78            self.store_handle(handle, false);
79        }
80        updated
81    }
82
83    pub fn set_has_virtual_shard_state(&self, handle: &BlockHandle) -> bool {
84        let updated = handle.meta().add_flags(BlockFlags::HAS_VIRTUAL_STATE);
85        if updated {
86            self.store_handle(handle, false);
87        }
88        updated
89    }
90
91    pub fn set_block_persistent(&self, handle: &BlockHandle) -> bool {
92        let updated = handle.meta().add_flags(BlockFlags::IS_PERSISTENT);
93        if updated {
94            self.store_handle(handle, false);
95        }
96        updated
97    }
98
99    pub fn set_has_persistent_shard_state(&self, handle: &BlockHandle) -> bool {
100        let updated = handle
101            .meta()
102            .add_flags(BlockFlags::HAS_PERSISTENT_SHARD_STATE);
103        if updated {
104            self.store_handle(handle, false);
105        }
106        updated
107    }
108
109    pub fn set_has_persistent_queue_state(&self, handle: &BlockHandle) -> bool {
110        let updated = handle
111            .meta()
112            .add_flags(BlockFlags::HAS_PERSISTENT_QUEUE_STATE);
113        if updated {
114            self.store_handle(handle, false);
115        }
116        updated
117    }
118
119    pub fn set_is_zerostate(&self, handle: &BlockHandle) -> bool {
120        let updated = handle.meta().add_flags(BlockFlags::IS_ZEROSTATE);
121        if updated {
122            self.store_handle(handle, false);
123        }
124        updated
125    }
126
127    pub fn create_or_load_handle(
128        &self,
129        block_id: &BlockId,
130        meta_data: NewBlockMeta,
131    ) -> (BlockHandle, HandleCreationStatus) {
132        use dashmap::mapref::entry::Entry;
133
134        let block_handles = &self.db.block_handles;
135
136        // Fast path - lookup in cache
137        if let Some(handle) = self.cache.get(block_id)
138            && let Some(handle) = handle.upgrade()
139        {
140            return (handle, HandleCreationStatus::Fetched);
141        }
142
143        match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
144            // Try to load block handle from an existing data
145            Some(data) => {
146                let meta = BlockMeta::from_slice(data.as_ref());
147
148                // Fill the cache with a new handle
149                let handle = self.fill_cache(block_id, meta);
150
151                // Done
152                (handle, HandleCreationStatus::Fetched)
153            }
154            None => {
155                // Create a new handle
156                let handle = BlockHandle::new(
157                    block_id,
158                    BlockMeta::with_data(meta_data),
159                    self.cache.clone(),
160                );
161
162                // Fill the cache with the new handle
163                let is_new = match self.cache.entry(*block_id) {
164                    Entry::Vacant(entry) => {
165                        entry.insert(handle.downgrade());
166                        true
167                    }
168                    Entry::Occupied(mut entry) => match entry.get().upgrade() {
169                        // Another thread has created the handle
170                        Some(handle) => return (handle, HandleCreationStatus::Fetched),
171                        None => {
172                            entry.insert(handle.downgrade());
173                            true
174                        }
175                    },
176                };
177
178                // Store the handle in the storage
179                self.store_handle(&handle, is_new);
180
181                // Done
182                (handle, HandleCreationStatus::Created)
183            }
184        }
185    }
186
187    pub fn load_handle(&self, block_id: &BlockId) -> Option<BlockHandle> {
188        let block_handles = &self.db.block_handles;
189
190        // Fast path - lookup in cache
191        if let Some(handle) = self.cache.get(block_id)
192            && let Some(handle) = handle.upgrade()
193        {
194            return Some(handle);
195        }
196
197        // Load meta from storage
198        let meta = match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
199            Some(data) => BlockMeta::from_slice(data.as_ref()),
200            None => return None,
201        };
202
203        // Fill the cache with a new handle
204        Some(self.fill_cache(block_id, meta))
205    }
206
207    pub fn store_handle(&self, handle: &BlockHandle, is_new: bool) {
208        let id = handle.id();
209
210        let is_key_block = handle.is_key_block();
211
212        if is_new || is_key_block {
213            let mut batch = weedb::rocksdb::WriteBatch::default();
214
215            batch.merge_cf(
216                &self.db.block_handles.cf(),
217                id.root_hash,
218                handle.meta().to_vec(),
219            );
220
221            if is_new {
222                batch.put_cf(
223                    &self.db.full_block_ids.cf(),
224                    PartialBlockId::from(id).to_vec(),
225                    id.file_hash,
226                );
227            }
228
229            if is_key_block {
230                batch.put_cf(
231                    &self.db.key_blocks.cf(),
232                    id.seqno.to_be_bytes(),
233                    id.to_vec(),
234                );
235            }
236
237            self.db
238                .rocksdb()
239                .write_opt(batch, self.db.block_handles.write_config())
240        } else {
241            self.db.rocksdb().merge_cf_opt(
242                &self.db.block_handles.cf(),
243                id.root_hash,
244                handle.meta().to_vec(),
245                self.db.block_handles.write_config(),
246            )
247        }
248        .unwrap();
249    }
250
251    pub fn load_key_block_handle(&self, seqno: u32) -> Option<BlockHandle> {
252        let key_blocks = &self.db.key_blocks;
253        let key_block_id = match key_blocks.get(seqno.to_be_bytes()).unwrap() {
254            Some(data) => BlockId::from_slice(data.as_ref()),
255            None => return None,
256        };
257        self.load_handle(&key_block_id)
258    }
259
260    pub fn find_last_key_block(&self) -> Option<BlockHandle> {
261        let mut iter = self.db.key_blocks.raw_iterator();
262        iter.seek_to_last();
263
264        // Load key block from the current iterator value
265        let key_block_id = BlockId::from_slice(iter.value()?);
266        self.load_handle(&key_block_id)
267    }
268
269    pub fn find_prev_key_block(&self, seqno: u32) -> Option<BlockHandle> {
270        if seqno == 0 {
271            return None;
272        }
273
274        // Create iterator and move it to the previous key block before the specified
275        let mut iter = self.db.key_blocks.raw_iterator();
276        iter.seek_for_prev((seqno - 1u32).to_be_bytes());
277
278        // Load key block from current iterator value
279        let key_block_id = BlockId::from_slice(iter.value()?);
280        self.load_handle(&key_block_id)
281    }
282
283    pub fn find_prev_persistent_key_block(&self, seqno: u32) -> Option<BlockHandle> {
284        if seqno == 0 {
285            return None;
286        }
287
288        // Create iterator and move it to the previous key block before the specified
289        let mut iter = self.db.key_blocks.raw_iterator();
290        iter.seek_for_prev((seqno - 1u32).to_be_bytes());
291
292        // Loads key block from current iterator value and moves it backward
293        let mut get_key_block = move || -> Option<BlockHandle> {
294            // Load key block id
295            let key_block_id = BlockId::from_slice(iter.value()?);
296
297            // Load block handle for this id
298            let handle = self.load_handle(&key_block_id)?;
299
300            // Move iterator backward
301            iter.prev();
302
303            // Done
304            Some(handle)
305        };
306
307        // Load previous key block
308        let mut key_block = get_key_block()?;
309
310        // Load previous key blocks and check if the `key_block` is for persistent state
311        while let Some(prev_key_block) = get_key_block() {
312            if BlockStuff::compute_is_persistent(key_block.gen_utime(), prev_key_block.gen_utime())
313            {
314                // Found
315                return Some(key_block);
316            }
317            key_block = prev_key_block;
318        }
319
320        // Not found
321        None
322    }
323
324    pub fn key_blocks_iterator(
325        &self,
326        direction: KeyBlocksDirection,
327    ) -> impl Iterator<Item = BlockId> + '_ {
328        let mut raw_iterator = self.db.key_blocks.raw_iterator();
329        let reverse = match direction {
330            KeyBlocksDirection::ForwardFrom(seqno) => {
331                raw_iterator.seek(seqno.to_be_bytes());
332                false
333            }
334            KeyBlocksDirection::Backward => {
335                raw_iterator.seek_to_last();
336                true
337            }
338        };
339
340        KeyBlocksIterator {
341            raw_iterator,
342            reverse,
343        }
344    }
345
346    pub fn gc_handles_cache(&self, mc_seqno: u32, shard_heights: &ShardHeights) -> usize {
347        let mut total_removed = 0;
348
349        self.cache.retain(|block_id, value| {
350            let value = match value.upgrade() {
351                Some(value) => value,
352                None => {
353                    total_removed += 1;
354                    return false;
355                }
356            };
357
358            let is_masterchain = block_id.is_masterchain();
359
360            if block_id.seqno == 0
361                || is_masterchain && (block_id.seqno >= mc_seqno || value.is_key_block())
362                || !is_masterchain
363                    && shard_heights.contains_shard_seqno(&block_id.shard, block_id.seqno)
364            {
365                // Keep zero state, key blocks and latest blocks
366                true
367            } else {
368                // Remove all outdated
369                total_removed += 1;
370                value.meta().add_flags(BlockFlags::IS_REMOVED);
371                false
372            }
373        });
374
375        total_removed
376    }
377
378    fn fill_cache(&self, block_id: &BlockId, meta: BlockMeta) -> BlockHandle {
379        use dashmap::mapref::entry::Entry;
380
381        match self.cache.entry(*block_id) {
382            Entry::Vacant(entry) => {
383                let handle = BlockHandle::new(block_id, meta, self.cache.clone());
384                entry.insert(handle.downgrade());
385                handle
386            }
387            Entry::Occupied(mut entry) => match entry.get().upgrade() {
388                Some(handle) => handle,
389                None => {
390                    let handle = BlockHandle::new(block_id, meta, self.cache.clone());
391                    entry.insert(handle.downgrade());
392                    handle
393                }
394            },
395        }
396    }
397}
398
/// Tells how `BlockHandleStorage::create_or_load_handle` obtained the handle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandleCreationStatus {
    /// A new handle was created and stored.
    Created,
    /// An existing handle was taken from the cache or rebuilt from stored meta.
    Fetched,
}
404
/// Starting position and direction for `BlockHandleStorage::key_blocks_iterator`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeyBlocksDirection {
    /// Seek to the given seqno and iterate forward.
    ForwardFrom(u32),
    /// Start from the last entry and iterate backward.
    Backward,
}
410
411struct KeyBlocksIterator<'a> {
412    raw_iterator: weedb::rocksdb::DBRawIterator<'a>,
413    reverse: bool,
414}
415
416impl Iterator for KeyBlocksIterator<'_> {
417    type Item = BlockId;
418
419    fn next(&mut self) -> Option<Self::Item> {
420        let value = self.raw_iterator.value().map(BlockId::from_slice)?;
421        if self.reverse {
422            self.raw_iterator.prev();
423        } else {
424            self.raw_iterator.next();
425        }
426        Some(value)
427    }
428}
429
#[cfg(test)]
mod tests {
    use tycho_storage::StorageContext;
    use tycho_types::models::ShardIdent;

    use super::*;
    use crate::storage::{CoreStorage, CoreStorageConfig};

    /// A flag added to a handle must be visible after the handle is dropped
    /// and loaded from the storage again.
    #[tokio::test]
    async fn merge_operator_works() -> anyhow::Result<()> {
        // `_tmp_dir` stays bound until the end of the test
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let core_storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
        let handles = core_storage.block_handle_storage();

        let id = BlockId {
            shard: ShardIdent::BASECHAIN,
            seqno: 100,
            ..Default::default()
        };
        let new_meta = NewBlockMeta {
            is_key_block: false,
            gen_utime: 123,
            ref_by_mc_seqno: 456,
        };

        // Create the handle, commit it and check that handles are reused
        {
            let (first, status) = handles.create_or_load_handle(&id, new_meta);
            assert_eq!(status, HandleCreationStatus::Created);

            assert_eq!(first.ref_by_mc_seqno(), 456);
            assert!(!first.is_key_block());
            assert!(!first.is_committed());

            assert!(handles.set_block_committed(&first));
            assert!(first.is_committed());

            let (second, status) = handles.create_or_load_handle(&id, new_meta);
            assert_eq!(status, HandleCreationStatus::Fetched);

            assert_eq!(
                arc_swap::RefCnt::as_ptr(&first),
                arc_swap::RefCnt::as_ptr(&second),
            );
        }

        // Both handles are gone, so the cache entry must be gone as well
        assert!(!handles.cache.contains_key(&id));

        // The committed flag must come back from the storage
        {
            let (reloaded, status) = handles.create_or_load_handle(&id, new_meta);
            assert_eq!(status, HandleCreationStatus::Fetched);

            assert_eq!(reloaded.ref_by_mc_seqno(), 456);
            assert!(!reloaded.is_key_block());
            assert!(reloaded.is_committed());
        }

        // The reloaded handle is dropped too
        assert!(!handles.cache.contains_key(&id));

        Ok(())
    }
}