tycho_core/storage/block_handle/
mod.rs

1use std::sync::Arc;
2
3pub(crate) use handle::BlockDataGuard;
4use tycho_block_util::block::{BlockStuff, ShardHeights};
5use tycho_storage::kv::StoredValue;
6use tycho_types::models::BlockId;
7use tycho_util::FastDashMap;
8
9pub use self::handle::{BlockHandle, WeakBlockHandle};
10pub use self::meta::{BlockFlags, BlockMeta, LoadedBlockMeta, NewBlockMeta};
11use super::{CoreDb, PartialBlockId};
12
13mod handle;
14mod meta;
15
16type BlockHandleCache = FastDashMap<BlockId, WeakBlockHandle>;
17
18pub struct BlockHandleStorage {
19    db: CoreDb,
20    cache: Arc<BlockHandleCache>,
21}
22
23impl BlockHandleStorage {
24    pub fn new(db: CoreDb) -> Self {
25        Self {
26            db,
27            cache: Arc::new(Default::default()),
28        }
29    }
30
31    pub fn set_block_committed(&self, handle: &BlockHandle) -> bool {
32        let updated = handle.meta().add_flags(BlockFlags::IS_COMMITTED);
33        if updated {
34            self.store_handle(handle, false);
35        }
36        updated
37    }
38
39    pub fn set_block_persistent(&self, handle: &BlockHandle) -> bool {
40        let updated = handle.meta().add_flags(BlockFlags::IS_PERSISTENT);
41        if updated {
42            self.store_handle(handle, false);
43        }
44        updated
45    }
46
47    pub fn set_has_persistent_shard_state(&self, handle: &BlockHandle) -> bool {
48        let updated = handle
49            .meta()
50            .add_flags(BlockFlags::HAS_PERSISTENT_SHARD_STATE);
51        if updated {
52            self.store_handle(handle, false);
53        }
54        updated
55    }
56
57    pub fn set_has_persistent_queue_state(&self, handle: &BlockHandle) -> bool {
58        let updated = handle
59            .meta()
60            .add_flags(BlockFlags::HAS_PERSISTENT_QUEUE_STATE);
61        if updated {
62            self.store_handle(handle, false);
63        }
64        updated
65    }
66
67    pub fn create_or_load_handle(
68        &self,
69        block_id: &BlockId,
70        meta_data: NewBlockMeta,
71    ) -> (BlockHandle, HandleCreationStatus) {
72        use dashmap::mapref::entry::Entry;
73
74        let block_handles = &self.db.block_handles;
75
76        // Fast path - lookup in cache
77        if let Some(handle) = self.cache.get(block_id)
78            && let Some(handle) = handle.upgrade()
79        {
80            return (handle, HandleCreationStatus::Fetched);
81        }
82
83        match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
84            // Try to load block handle from an existing data
85            Some(data) => {
86                let meta = BlockMeta::from_slice(data.as_ref());
87
88                // Fill the cache with a new handle
89                let handle = self.fill_cache(block_id, meta);
90
91                // Done
92                (handle, HandleCreationStatus::Fetched)
93            }
94            None => {
95                // Create a new handle
96                let handle = BlockHandle::new(
97                    block_id,
98                    BlockMeta::with_data(meta_data),
99                    self.cache.clone(),
100                );
101
102                // Fill the cache with the new handle
103                let is_new = match self.cache.entry(*block_id) {
104                    Entry::Vacant(entry) => {
105                        entry.insert(handle.downgrade());
106                        true
107                    }
108                    Entry::Occupied(mut entry) => match entry.get().upgrade() {
109                        // Another thread has created the handle
110                        Some(handle) => return (handle, HandleCreationStatus::Fetched),
111                        None => {
112                            entry.insert(handle.downgrade());
113                            true
114                        }
115                    },
116                };
117
118                // Store the handle in the storage
119                self.store_handle(&handle, is_new);
120
121                // Done
122                (handle, HandleCreationStatus::Created)
123            }
124        }
125    }
126
127    pub fn load_handle(&self, block_id: &BlockId) -> Option<BlockHandle> {
128        let block_handles = &self.db.block_handles;
129
130        // Fast path - lookup in cache
131        if let Some(handle) = self.cache.get(block_id)
132            && let Some(handle) = handle.upgrade()
133        {
134            return Some(handle);
135        }
136
137        // Load meta from storage
138        let meta = match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
139            Some(data) => BlockMeta::from_slice(data.as_ref()),
140            None => return None,
141        };
142
143        // Fill the cache with a new handle
144        Some(self.fill_cache(block_id, meta))
145    }
146
147    pub fn store_handle(&self, handle: &BlockHandle, is_new: bool) {
148        let id = handle.id();
149
150        let is_key_block = handle.is_key_block();
151
152        if is_new || is_key_block {
153            let mut batch = weedb::rocksdb::WriteBatch::default();
154
155            batch.merge_cf(
156                &self.db.block_handles.cf(),
157                id.root_hash,
158                handle.meta().to_vec(),
159            );
160
161            if is_new {
162                batch.put_cf(
163                    &self.db.full_block_ids.cf(),
164                    PartialBlockId::from(id).to_vec(),
165                    id.file_hash,
166                );
167            }
168
169            if is_key_block {
170                batch.put_cf(
171                    &self.db.key_blocks.cf(),
172                    id.seqno.to_be_bytes(),
173                    id.to_vec(),
174                );
175            }
176
177            self.db
178                .rocksdb()
179                .write_opt(batch, self.db.block_handles.write_config())
180        } else {
181            self.db.rocksdb().merge_cf_opt(
182                &self.db.block_handles.cf(),
183                id.root_hash,
184                handle.meta().to_vec(),
185                self.db.block_handles.write_config(),
186            )
187        }
188        .unwrap();
189    }
190
191    pub fn load_key_block_handle(&self, seqno: u32) -> Option<BlockHandle> {
192        let key_blocks = &self.db.key_blocks;
193        let key_block_id = match key_blocks.get(seqno.to_be_bytes()).unwrap() {
194            Some(data) => BlockId::from_slice(data.as_ref()),
195            None => return None,
196        };
197        self.load_handle(&key_block_id)
198    }
199
200    pub fn find_last_key_block(&self) -> Option<BlockHandle> {
201        let mut iter = self.db.key_blocks.raw_iterator();
202        iter.seek_to_last();
203
204        // Load key block from the current iterator value
205        let key_block_id = BlockId::from_slice(iter.value()?);
206        self.load_handle(&key_block_id)
207    }
208
209    pub fn find_prev_key_block(&self, seqno: u32) -> Option<BlockHandle> {
210        if seqno == 0 {
211            return None;
212        }
213
214        // Create iterator and move it to the previous key block before the specified
215        let mut iter = self.db.key_blocks.raw_iterator();
216        iter.seek_for_prev((seqno - 1u32).to_be_bytes());
217
218        // Load key block from current iterator value
219        let key_block_id = BlockId::from_slice(iter.value()?);
220        self.load_handle(&key_block_id)
221    }
222
223    pub fn find_prev_persistent_key_block(&self, seqno: u32) -> Option<BlockHandle> {
224        if seqno == 0 {
225            return None;
226        }
227
228        // Create iterator and move it to the previous key block before the specified
229        let mut iter = self.db.key_blocks.raw_iterator();
230        iter.seek_for_prev((seqno - 1u32).to_be_bytes());
231
232        // Loads key block from current iterator value and moves it backward
233        let mut get_key_block = move || -> Option<BlockHandle> {
234            // Load key block id
235            let key_block_id = BlockId::from_slice(iter.value()?);
236
237            // Load block handle for this id
238            let handle = self.load_handle(&key_block_id)?;
239
240            // Move iterator backward
241            iter.prev();
242
243            // Done
244            Some(handle)
245        };
246
247        // Load previous key block
248        let mut key_block = get_key_block()?;
249
250        // Load previous key blocks and check if the `key_block` is for persistent state
251        while let Some(prev_key_block) = get_key_block() {
252            if BlockStuff::compute_is_persistent(key_block.gen_utime(), prev_key_block.gen_utime())
253            {
254                // Found
255                return Some(key_block);
256            }
257            key_block = prev_key_block;
258        }
259
260        // Not found
261        None
262    }
263
264    pub fn key_blocks_iterator(
265        &self,
266        direction: KeyBlocksDirection,
267    ) -> impl Iterator<Item = BlockId> + '_ {
268        let mut raw_iterator = self.db.key_blocks.raw_iterator();
269        let reverse = match direction {
270            KeyBlocksDirection::ForwardFrom(seqno) => {
271                raw_iterator.seek(seqno.to_be_bytes());
272                false
273            }
274            KeyBlocksDirection::Backward => {
275                raw_iterator.seek_to_last();
276                true
277            }
278        };
279
280        KeyBlocksIterator {
281            raw_iterator,
282            reverse,
283        }
284    }
285
286    pub fn gc_handles_cache(&self, mc_seqno: u32, shard_heights: &ShardHeights) -> usize {
287        let mut total_removed = 0;
288
289        self.cache.retain(|block_id, value| {
290            let value = match value.upgrade() {
291                Some(value) => value,
292                None => {
293                    total_removed += 1;
294                    return false;
295                }
296            };
297
298            let is_masterchain = block_id.is_masterchain();
299
300            if block_id.seqno == 0
301                || is_masterchain && (block_id.seqno >= mc_seqno || value.is_key_block())
302                || !is_masterchain
303                    && shard_heights.contains_shard_seqno(&block_id.shard, block_id.seqno)
304            {
305                // Keep zero state, key blocks and latest blocks
306                true
307            } else {
308                // Remove all outdated
309                total_removed += 1;
310                value.meta().add_flags(BlockFlags::IS_REMOVED);
311                false
312            }
313        });
314
315        total_removed
316    }
317
318    fn fill_cache(&self, block_id: &BlockId, meta: BlockMeta) -> BlockHandle {
319        use dashmap::mapref::entry::Entry;
320
321        match self.cache.entry(*block_id) {
322            Entry::Vacant(entry) => {
323                let handle = BlockHandle::new(block_id, meta, self.cache.clone());
324                entry.insert(handle.downgrade());
325                handle
326            }
327            Entry::Occupied(mut entry) => match entry.get().upgrade() {
328                Some(handle) => handle,
329                None => {
330                    let handle = BlockHandle::new(block_id, meta, self.cache.clone());
331                    entry.insert(handle.downgrade());
332                    handle
333                }
334            },
335        }
336    }
337}
338
339#[derive(Debug, Copy, Clone, Eq, PartialEq)]
340pub enum HandleCreationStatus {
341    Created,
342    Fetched,
343}
344
345#[derive(Debug, Copy, Clone, Eq, PartialEq)]
346pub enum KeyBlocksDirection {
347    ForwardFrom(u32),
348    Backward,
349}
350
351struct KeyBlocksIterator<'a> {
352    raw_iterator: weedb::rocksdb::DBRawIterator<'a>,
353    reverse: bool,
354}
355
356impl Iterator for KeyBlocksIterator<'_> {
357    type Item = BlockId;
358
359    fn next(&mut self) -> Option<Self::Item> {
360        let value = self.raw_iterator.value().map(BlockId::from_slice)?;
361        if self.reverse {
362            self.raw_iterator.prev();
363        } else {
364            self.raw_iterator.next();
365        }
366        Some(value)
367    }
368}
369
370#[cfg(test)]
371mod tests {
372    use tycho_storage::StorageContext;
373    use tycho_types::models::ShardIdent;
374
375    use super::*;
376    use crate::storage::{CoreStorage, CoreStorageConfig};
377
378    #[tokio::test]
379    async fn merge_operator_works() -> anyhow::Result<()> {
380        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
381        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
382
383        let block_handles = storage.block_handle_storage();
384
385        let block_id = BlockId {
386            shard: ShardIdent::BASECHAIN,
387            seqno: 100,
388            ..Default::default()
389        };
390
391        let meta = NewBlockMeta {
392            is_key_block: false,
393            gen_utime: 123,
394            ref_by_mc_seqno: 456,
395        };
396
397        {
398            let (handle, status) = block_handles.create_or_load_handle(&block_id, meta);
399            assert_eq!(status, HandleCreationStatus::Created);
400
401            assert_eq!(handle.ref_by_mc_seqno(), 456);
402            assert!(!handle.is_key_block());
403            assert!(!handle.is_committed());
404
405            let updated = block_handles.set_block_committed(&handle);
406            assert!(updated);
407            assert!(handle.is_committed());
408
409            // Ensure that handles are reused
410            let (handle2, status) = block_handles.create_or_load_handle(&block_id, meta);
411            assert_eq!(status, HandleCreationStatus::Fetched);
412
413            assert_eq!(
414                arc_swap::RefCnt::as_ptr(&handle),
415                arc_swap::RefCnt::as_ptr(&handle2),
416            );
417        }
418
419        // Ensure that the handle is dropped
420        assert!(!block_handles.cache.contains_key(&block_id));
421
422        // Ensure that storage is properly updated
423        {
424            let (handle, status) = block_handles.create_or_load_handle(&block_id, meta);
425            assert_eq!(status, HandleCreationStatus::Fetched);
426
427            assert_eq!(handle.ref_by_mc_seqno(), 456);
428            assert!(!handle.is_key_block());
429            assert!(handle.is_committed());
430        }
431
432        // Ensure that the handle is dropped
433        assert!(!block_handles.cache.contains_key(&block_id));
434
435        Ok(())
436    }
437}