tycho_core/storage/block_handle/
mod.rs

1use std::sync::Arc;
2
3pub(crate) use handle::BlockDataGuard;
4use tycho_block_util::block::{BlockStuff, ShardHeights};
5use tycho_storage::kv::StoredValue;
6use tycho_types::models::BlockId;
7use tycho_util::FastDashMap;
8
9pub use self::handle::{BlockHandle, WeakBlockHandle};
10pub use self::meta::{BlockFlags, BlockMeta, LoadedBlockMeta, NewBlockMeta};
11use super::{CoreDb, PartialBlockId};
12
13mod handle;
14mod meta;
15
16type BlockHandleCache = FastDashMap<BlockId, WeakBlockHandle>;
17
18pub struct BlockHandleStorage {
19    db: CoreDb,
20    cache: Arc<BlockHandleCache>,
21}
22
23impl BlockHandleStorage {
24    pub fn new(db: CoreDb) -> Self {
25        Self {
26            db,
27            cache: Arc::new(Default::default()),
28        }
29    }
30
31    pub fn set_block_committed(&self, handle: &BlockHandle) -> bool {
32        let updated = handle.meta().add_flags(BlockFlags::IS_COMMITTED);
33        if updated {
34            self.store_handle(handle, false);
35        }
36        updated
37    }
38
39    pub fn set_has_shard_state(&self, handle: &BlockHandle) -> bool {
40        let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
41        if updated {
42            self.store_handle(handle, false);
43        }
44        updated
45    }
46
47    pub fn set_block_persistent(&self, handle: &BlockHandle) -> bool {
48        let updated = handle.meta().add_flags(BlockFlags::IS_PERSISTENT);
49        if updated {
50            self.store_handle(handle, false);
51        }
52        updated
53    }
54
55    pub fn set_has_persistent_shard_state(&self, handle: &BlockHandle) -> bool {
56        let updated = handle
57            .meta()
58            .add_flags(BlockFlags::HAS_PERSISTENT_SHARD_STATE);
59        if updated {
60            self.store_handle(handle, false);
61        }
62        updated
63    }
64
65    pub fn set_has_persistent_queue_state(&self, handle: &BlockHandle) -> bool {
66        let updated = handle
67            .meta()
68            .add_flags(BlockFlags::HAS_PERSISTENT_QUEUE_STATE);
69        if updated {
70            self.store_handle(handle, false);
71        }
72        updated
73    }
74
75    pub fn create_or_load_handle(
76        &self,
77        block_id: &BlockId,
78        meta_data: NewBlockMeta,
79    ) -> (BlockHandle, HandleCreationStatus) {
80        use dashmap::mapref::entry::Entry;
81
82        let block_handles = &self.db.block_handles;
83
84        // Fast path - lookup in cache
85        if let Some(handle) = self.cache.get(block_id)
86            && let Some(handle) = handle.upgrade()
87        {
88            return (handle, HandleCreationStatus::Fetched);
89        }
90
91        match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
92            // Try to load block handle from an existing data
93            Some(data) => {
94                let meta = BlockMeta::from_slice(data.as_ref());
95
96                // Fill the cache with a new handle
97                let handle = self.fill_cache(block_id, meta);
98
99                // Done
100                (handle, HandleCreationStatus::Fetched)
101            }
102            None => {
103                // Create a new handle
104                let handle = BlockHandle::new(
105                    block_id,
106                    BlockMeta::with_data(meta_data),
107                    self.cache.clone(),
108                );
109
110                // Fill the cache with the new handle
111                let is_new = match self.cache.entry(*block_id) {
112                    Entry::Vacant(entry) => {
113                        entry.insert(handle.downgrade());
114                        true
115                    }
116                    Entry::Occupied(mut entry) => match entry.get().upgrade() {
117                        // Another thread has created the handle
118                        Some(handle) => return (handle, HandleCreationStatus::Fetched),
119                        None => {
120                            entry.insert(handle.downgrade());
121                            true
122                        }
123                    },
124                };
125
126                // Store the handle in the storage
127                self.store_handle(&handle, is_new);
128
129                // Done
130                (handle, HandleCreationStatus::Created)
131            }
132        }
133    }
134
135    pub fn load_handle(&self, block_id: &BlockId) -> Option<BlockHandle> {
136        let block_handles = &self.db.block_handles;
137
138        // Fast path - lookup in cache
139        if let Some(handle) = self.cache.get(block_id)
140            && let Some(handle) = handle.upgrade()
141        {
142            return Some(handle);
143        }
144
145        // Load meta from storage
146        let meta = match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
147            Some(data) => BlockMeta::from_slice(data.as_ref()),
148            None => return None,
149        };
150
151        // Fill the cache with a new handle
152        Some(self.fill_cache(block_id, meta))
153    }
154
155    pub fn store_handle(&self, handle: &BlockHandle, is_new: bool) {
156        let id = handle.id();
157
158        let is_key_block = handle.is_key_block();
159
160        if is_new || is_key_block {
161            let mut batch = weedb::rocksdb::WriteBatch::default();
162
163            batch.merge_cf(
164                &self.db.block_handles.cf(),
165                id.root_hash,
166                handle.meta().to_vec(),
167            );
168
169            if is_new {
170                batch.put_cf(
171                    &self.db.full_block_ids.cf(),
172                    PartialBlockId::from(id).to_vec(),
173                    id.file_hash,
174                );
175            }
176
177            if is_key_block {
178                batch.put_cf(
179                    &self.db.key_blocks.cf(),
180                    id.seqno.to_be_bytes(),
181                    id.to_vec(),
182                );
183            }
184
185            self.db
186                .rocksdb()
187                .write_opt(batch, self.db.block_handles.write_config())
188        } else {
189            self.db.rocksdb().merge_cf_opt(
190                &self.db.block_handles.cf(),
191                id.root_hash,
192                handle.meta().to_vec(),
193                self.db.block_handles.write_config(),
194            )
195        }
196        .unwrap();
197    }
198
199    pub fn load_key_block_handle(&self, seqno: u32) -> Option<BlockHandle> {
200        let key_blocks = &self.db.key_blocks;
201        let key_block_id = match key_blocks.get(seqno.to_be_bytes()).unwrap() {
202            Some(data) => BlockId::from_slice(data.as_ref()),
203            None => return None,
204        };
205        self.load_handle(&key_block_id)
206    }
207
208    pub fn find_last_key_block(&self) -> Option<BlockHandle> {
209        let mut iter = self.db.key_blocks.raw_iterator();
210        iter.seek_to_last();
211
212        // Load key block from the current iterator value
213        let key_block_id = BlockId::from_slice(iter.value()?);
214        self.load_handle(&key_block_id)
215    }
216
217    pub fn find_prev_key_block(&self, seqno: u32) -> Option<BlockHandle> {
218        if seqno == 0 {
219            return None;
220        }
221
222        // Create iterator and move it to the previous key block before the specified
223        let mut iter = self.db.key_blocks.raw_iterator();
224        iter.seek_for_prev((seqno - 1u32).to_be_bytes());
225
226        // Load key block from current iterator value
227        let key_block_id = BlockId::from_slice(iter.value()?);
228        self.load_handle(&key_block_id)
229    }
230
231    pub fn find_prev_persistent_key_block(&self, seqno: u32) -> Option<BlockHandle> {
232        if seqno == 0 {
233            return None;
234        }
235
236        // Create iterator and move it to the previous key block before the specified
237        let mut iter = self.db.key_blocks.raw_iterator();
238        iter.seek_for_prev((seqno - 1u32).to_be_bytes());
239
240        // Loads key block from current iterator value and moves it backward
241        let mut get_key_block = move || -> Option<BlockHandle> {
242            // Load key block id
243            let key_block_id = BlockId::from_slice(iter.value()?);
244
245            // Load block handle for this id
246            let handle = self.load_handle(&key_block_id)?;
247
248            // Move iterator backward
249            iter.prev();
250
251            // Done
252            Some(handle)
253        };
254
255        // Load previous key block
256        let mut key_block = get_key_block()?;
257
258        // Load previous key blocks and check if the `key_block` is for persistent state
259        while let Some(prev_key_block) = get_key_block() {
260            if BlockStuff::compute_is_persistent(key_block.gen_utime(), prev_key_block.gen_utime())
261            {
262                // Found
263                return Some(key_block);
264            }
265            key_block = prev_key_block;
266        }
267
268        // Not found
269        None
270    }
271
272    pub fn key_blocks_iterator(
273        &self,
274        direction: KeyBlocksDirection,
275    ) -> impl Iterator<Item = BlockId> + '_ {
276        let mut raw_iterator = self.db.key_blocks.raw_iterator();
277        let reverse = match direction {
278            KeyBlocksDirection::ForwardFrom(seqno) => {
279                raw_iterator.seek(seqno.to_be_bytes());
280                false
281            }
282            KeyBlocksDirection::Backward => {
283                raw_iterator.seek_to_last();
284                true
285            }
286        };
287
288        KeyBlocksIterator {
289            raw_iterator,
290            reverse,
291        }
292    }
293
294    pub fn gc_handles_cache(&self, mc_seqno: u32, shard_heights: &ShardHeights) -> usize {
295        let mut total_removed = 0;
296
297        self.cache.retain(|block_id, value| {
298            let value = match value.upgrade() {
299                Some(value) => value,
300                None => {
301                    total_removed += 1;
302                    return false;
303                }
304            };
305
306            let is_masterchain = block_id.is_masterchain();
307
308            if block_id.seqno == 0
309                || is_masterchain && (block_id.seqno >= mc_seqno || value.is_key_block())
310                || !is_masterchain
311                    && shard_heights.contains_shard_seqno(&block_id.shard, block_id.seqno)
312            {
313                // Keep zero state, key blocks and latest blocks
314                true
315            } else {
316                // Remove all outdated
317                total_removed += 1;
318                value.meta().add_flags(BlockFlags::IS_REMOVED);
319                false
320            }
321        });
322
323        total_removed
324    }
325
326    fn fill_cache(&self, block_id: &BlockId, meta: BlockMeta) -> BlockHandle {
327        use dashmap::mapref::entry::Entry;
328
329        match self.cache.entry(*block_id) {
330            Entry::Vacant(entry) => {
331                let handle = BlockHandle::new(block_id, meta, self.cache.clone());
332                entry.insert(handle.downgrade());
333                handle
334            }
335            Entry::Occupied(mut entry) => match entry.get().upgrade() {
336                Some(handle) => handle,
337                None => {
338                    let handle = BlockHandle::new(block_id, meta, self.cache.clone());
339                    entry.insert(handle.downgrade());
340                    handle
341                }
342            },
343        }
344    }
345}
346
/// Tells how `BlockHandleStorage::create_or_load_handle` obtained a handle.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HandleCreationStatus {
    /// The handle did not exist; it was created and stored by the call.
    Created,
    /// An existing handle was taken from the cache or restored from storage.
    Fetched,
}
352
/// Start position and direction for `BlockHandleStorage::key_blocks_iterator`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KeyBlocksDirection {
    /// Seek to the given seqno and move forward.
    ForwardFrom(u32),
    /// Seek to the last entry and move backward.
    Backward,
}
358
359struct KeyBlocksIterator<'a> {
360    raw_iterator: weedb::rocksdb::DBRawIterator<'a>,
361    reverse: bool,
362}
363
364impl Iterator for KeyBlocksIterator<'_> {
365    type Item = BlockId;
366
367    fn next(&mut self) -> Option<Self::Item> {
368        let value = self.raw_iterator.value().map(BlockId::from_slice)?;
369        if self.reverse {
370            self.raw_iterator.prev();
371        } else {
372            self.raw_iterator.next();
373        }
374        Some(value)
375    }
376}
377
#[cfg(test)]
mod tests {
    use tycho_storage::StorageContext;
    use tycho_types::models::ShardIdent;

    use super::*;
    use crate::storage::{CoreStorage, CoreStorageConfig};

    /// Checks that flag updates written through the merge path survive a
    /// reload of the handle, and that live handles are shared via the cache.
    #[tokio::test]
    async fn merge_operator_works() -> anyhow::Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
        let handles = storage.block_handle_storage();

        let block_id = BlockId {
            shard: ShardIdent::BASECHAIN,
            seqno: 100,
            ..Default::default()
        };
        let new_meta = NewBlockMeta {
            is_key_block: false,
            gen_utime: 123,
            ref_by_mc_seqno: 456,
        };

        // Create the handle, commit it and check that it is reused while alive
        {
            let (first, status) = handles.create_or_load_handle(&block_id, new_meta);
            assert_eq!(status, HandleCreationStatus::Created);

            assert_eq!(first.ref_by_mc_seqno(), 456);
            assert!(!first.is_key_block());
            assert!(!first.is_committed());

            let updated = handles.set_block_committed(&first);
            assert!(updated);
            assert!(first.is_committed());

            // Ensure that handles are reused
            let (second, status) = handles.create_or_load_handle(&block_id, new_meta);
            assert_eq!(status, HandleCreationStatus::Fetched);

            assert_eq!(
                arc_swap::RefCnt::as_ptr(&first),
                arc_swap::RefCnt::as_ptr(&second),
            );
        }

        // Ensure that the handle is dropped
        assert!(!handles.cache.contains_key(&block_id));

        // Ensure that storage is properly updated
        {
            let (reloaded, status) = handles.create_or_load_handle(&block_id, new_meta);
            assert_eq!(status, HandleCreationStatus::Fetched);

            assert_eq!(reloaded.ref_by_mc_seqno(), 456);
            assert!(!reloaded.is_key_block());
            assert!(reloaded.is_committed());
        }

        // Ensure that the handle is dropped
        assert!(!handles.cache.contains_key(&block_id));

        Ok(())
    }
}