Skip to main content

tycho_core/storage/block_handle/
mod.rs

1use std::sync::Arc;
2
3pub(crate) use handle::BlockDataGuard;
4use tycho_block_util::block::{BlockStuff, ShardHeights};
5use tycho_storage::kv::StoredValue;
6use tycho_types::models::BlockId;
7use tycho_util::FastDashMap;
8
9pub use self::handle::{BlockHandle, WeakBlockHandle};
10pub use self::meta::{BlockFlags, BlockMeta, LoadedBlockMeta, NewBlockMeta};
11use super::{CoreDb, PartialBlockId};
12
13mod handle;
14mod meta;
15
16type BlockHandleCache = FastDashMap<BlockId, WeakBlockHandle>;
17
18pub struct BlockHandleStorage {
19    db: CoreDb,
20    cache: Arc<BlockHandleCache>,
21}
22
23impl BlockHandleStorage {
24    pub fn new(db: CoreDb) -> Self {
25        Self {
26            db,
27            cache: Arc::new(Default::default()),
28        }
29    }
30
31    pub fn set_block_committed(&self, handle: &BlockHandle) -> bool {
32        let updated = handle.meta().add_flags(BlockFlags::IS_COMMITTED);
33        if updated {
34            self.store_handle(handle, false);
35        }
36        updated
37    }
38
39    pub fn set_has_shard_state(&self, handle: &BlockHandle) -> bool {
40        let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
41        if updated {
42            self.store_handle(handle, false);
43        }
44        updated
45    }
46
47    pub fn set_block_persistent(&self, handle: &BlockHandle) -> bool {
48        let updated = handle.meta().add_flags(BlockFlags::IS_PERSISTENT);
49        if updated {
50            self.store_handle(handle, false);
51        }
52        updated
53    }
54
55    pub fn set_has_persistent_shard_state(&self, handle: &BlockHandle) -> bool {
56        let updated = handle
57            .meta()
58            .add_flags(BlockFlags::HAS_PERSISTENT_SHARD_STATE);
59        if updated {
60            self.store_handle(handle, false);
61        }
62        updated
63    }
64
65    pub fn set_has_persistent_queue_state(&self, handle: &BlockHandle) -> bool {
66        let updated = handle
67            .meta()
68            .add_flags(BlockFlags::HAS_PERSISTENT_QUEUE_STATE);
69        if updated {
70            self.store_handle(handle, false);
71        }
72        updated
73    }
74
75    pub fn set_is_zerostate(&self, handle: &BlockHandle) -> bool {
76        let updated = handle.meta().add_flags(BlockFlags::IS_ZEROSTATE);
77        if updated {
78            self.store_handle(handle, false);
79        }
80        updated
81    }
82
83    pub fn create_or_load_handle(
84        &self,
85        block_id: &BlockId,
86        meta_data: NewBlockMeta,
87    ) -> (BlockHandle, HandleCreationStatus) {
88        use dashmap::mapref::entry::Entry;
89
90        let block_handles = &self.db.block_handles;
91
92        // Fast path - lookup in cache
93        if let Some(handle) = self.cache.get(block_id)
94            && let Some(handle) = handle.upgrade()
95        {
96            return (handle, HandleCreationStatus::Fetched);
97        }
98
99        match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
100            // Try to load block handle from an existing data
101            Some(data) => {
102                let meta = BlockMeta::from_slice(data.as_ref());
103
104                // Fill the cache with a new handle
105                let handle = self.fill_cache(block_id, meta);
106
107                // Done
108                (handle, HandleCreationStatus::Fetched)
109            }
110            None => {
111                // Create a new handle
112                let handle = BlockHandle::new(
113                    block_id,
114                    BlockMeta::with_data(meta_data),
115                    self.cache.clone(),
116                );
117
118                // Fill the cache with the new handle
119                let is_new = match self.cache.entry(*block_id) {
120                    Entry::Vacant(entry) => {
121                        entry.insert(handle.downgrade());
122                        true
123                    }
124                    Entry::Occupied(mut entry) => match entry.get().upgrade() {
125                        // Another thread has created the handle
126                        Some(handle) => return (handle, HandleCreationStatus::Fetched),
127                        None => {
128                            entry.insert(handle.downgrade());
129                            true
130                        }
131                    },
132                };
133
134                // Store the handle in the storage
135                self.store_handle(&handle, is_new);
136
137                // Done
138                (handle, HandleCreationStatus::Created)
139            }
140        }
141    }
142
143    pub fn load_handle(&self, block_id: &BlockId) -> Option<BlockHandle> {
144        let block_handles = &self.db.block_handles;
145
146        // Fast path - lookup in cache
147        if let Some(handle) = self.cache.get(block_id)
148            && let Some(handle) = handle.upgrade()
149        {
150            return Some(handle);
151        }
152
153        // Load meta from storage
154        let meta = match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
155            Some(data) => BlockMeta::from_slice(data.as_ref()),
156            None => return None,
157        };
158
159        // Fill the cache with a new handle
160        Some(self.fill_cache(block_id, meta))
161    }
162
163    pub fn store_handle(&self, handle: &BlockHandle, is_new: bool) {
164        let id = handle.id();
165
166        let is_key_block = handle.is_key_block();
167
168        if is_new || is_key_block {
169            let mut batch = weedb::rocksdb::WriteBatch::default();
170
171            batch.merge_cf(
172                &self.db.block_handles.cf(),
173                id.root_hash,
174                handle.meta().to_vec(),
175            );
176
177            if is_new {
178                batch.put_cf(
179                    &self.db.full_block_ids.cf(),
180                    PartialBlockId::from(id).to_vec(),
181                    id.file_hash,
182                );
183            }
184
185            if is_key_block {
186                batch.put_cf(
187                    &self.db.key_blocks.cf(),
188                    id.seqno.to_be_bytes(),
189                    id.to_vec(),
190                );
191            }
192
193            self.db
194                .rocksdb()
195                .write_opt(batch, self.db.block_handles.write_config())
196        } else {
197            self.db.rocksdb().merge_cf_opt(
198                &self.db.block_handles.cf(),
199                id.root_hash,
200                handle.meta().to_vec(),
201                self.db.block_handles.write_config(),
202            )
203        }
204        .unwrap();
205    }
206
207    pub fn load_key_block_handle(&self, seqno: u32) -> Option<BlockHandle> {
208        let key_blocks = &self.db.key_blocks;
209        let key_block_id = match key_blocks.get(seqno.to_be_bytes()).unwrap() {
210            Some(data) => BlockId::from_slice(data.as_ref()),
211            None => return None,
212        };
213        self.load_handle(&key_block_id)
214    }
215
216    pub fn find_last_key_block(&self) -> Option<BlockHandle> {
217        let mut iter = self.db.key_blocks.raw_iterator();
218        iter.seek_to_last();
219
220        // Load key block from the current iterator value
221        let key_block_id = BlockId::from_slice(iter.value()?);
222        self.load_handle(&key_block_id)
223    }
224
225    pub fn find_prev_key_block(&self, seqno: u32) -> Option<BlockHandle> {
226        if seqno == 0 {
227            return None;
228        }
229
230        // Create iterator and move it to the previous key block before the specified
231        let mut iter = self.db.key_blocks.raw_iterator();
232        iter.seek_for_prev((seqno - 1u32).to_be_bytes());
233
234        // Load key block from current iterator value
235        let key_block_id = BlockId::from_slice(iter.value()?);
236        self.load_handle(&key_block_id)
237    }
238
239    pub fn find_prev_persistent_key_block(&self, seqno: u32) -> Option<BlockHandle> {
240        if seqno == 0 {
241            return None;
242        }
243
244        // Create iterator and move it to the previous key block before the specified
245        let mut iter = self.db.key_blocks.raw_iterator();
246        iter.seek_for_prev((seqno - 1u32).to_be_bytes());
247
248        // Loads key block from current iterator value and moves it backward
249        let mut get_key_block = move || -> Option<BlockHandle> {
250            // Load key block id
251            let key_block_id = BlockId::from_slice(iter.value()?);
252
253            // Load block handle for this id
254            let handle = self.load_handle(&key_block_id)?;
255
256            // Move iterator backward
257            iter.prev();
258
259            // Done
260            Some(handle)
261        };
262
263        // Load previous key block
264        let mut key_block = get_key_block()?;
265
266        // Load previous key blocks and check if the `key_block` is for persistent state
267        while let Some(prev_key_block) = get_key_block() {
268            if BlockStuff::compute_is_persistent(key_block.gen_utime(), prev_key_block.gen_utime())
269            {
270                // Found
271                return Some(key_block);
272            }
273            key_block = prev_key_block;
274        }
275
276        // Not found
277        None
278    }
279
280    pub fn key_blocks_iterator(
281        &self,
282        direction: KeyBlocksDirection,
283    ) -> impl Iterator<Item = BlockId> + '_ {
284        let mut raw_iterator = self.db.key_blocks.raw_iterator();
285        let reverse = match direction {
286            KeyBlocksDirection::ForwardFrom(seqno) => {
287                raw_iterator.seek(seqno.to_be_bytes());
288                false
289            }
290            KeyBlocksDirection::Backward => {
291                raw_iterator.seek_to_last();
292                true
293            }
294        };
295
296        KeyBlocksIterator {
297            raw_iterator,
298            reverse,
299        }
300    }
301
302    pub fn gc_handles_cache(&self, mc_seqno: u32, shard_heights: &ShardHeights) -> usize {
303        let mut total_removed = 0;
304
305        self.cache.retain(|block_id, value| {
306            let value = match value.upgrade() {
307                Some(value) => value,
308                None => {
309                    total_removed += 1;
310                    return false;
311                }
312            };
313
314            let is_masterchain = block_id.is_masterchain();
315
316            if block_id.seqno == 0
317                || is_masterchain && (block_id.seqno >= mc_seqno || value.is_key_block())
318                || !is_masterchain
319                    && shard_heights.contains_shard_seqno(&block_id.shard, block_id.seqno)
320            {
321                // Keep zero state, key blocks and latest blocks
322                true
323            } else {
324                // Remove all outdated
325                total_removed += 1;
326                value.meta().add_flags(BlockFlags::IS_REMOVED);
327                false
328            }
329        });
330
331        total_removed
332    }
333
334    fn fill_cache(&self, block_id: &BlockId, meta: BlockMeta) -> BlockHandle {
335        use dashmap::mapref::entry::Entry;
336
337        match self.cache.entry(*block_id) {
338            Entry::Vacant(entry) => {
339                let handle = BlockHandle::new(block_id, meta, self.cache.clone());
340                entry.insert(handle.downgrade());
341                handle
342            }
343            Entry::Occupied(mut entry) => match entry.get().upgrade() {
344                Some(handle) => handle,
345                None => {
346                    let handle = BlockHandle::new(block_id, meta, self.cache.clone());
347                    entry.insert(handle.downgrade());
348                    handle
349                }
350            },
351        }
352    }
353}
354
/// Tells the caller of `BlockHandleStorage::create_or_load_handle` whether the
/// returned handle was newly created or already existed.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum HandleCreationStatus {
    /// A new handle was built from the supplied metadata and stored.
    Created,
    /// An existing handle was returned, either from the cache or rebuilt from
    /// stored metadata.
    Fetched,
}
360
/// Start position and direction for iterating over stored key blocks.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KeyBlocksDirection {
    /// Seek to the given seqno and move forward.
    ForwardFrom(u32),
    /// Start at the last entry and move backward.
    Backward,
}
366
367struct KeyBlocksIterator<'a> {
368    raw_iterator: weedb::rocksdb::DBRawIterator<'a>,
369    reverse: bool,
370}
371
372impl Iterator for KeyBlocksIterator<'_> {
373    type Item = BlockId;
374
375    fn next(&mut self) -> Option<Self::Item> {
376        let value = self.raw_iterator.value().map(BlockId::from_slice)?;
377        if self.reverse {
378            self.raw_iterator.prev();
379        } else {
380            self.raw_iterator.next();
381        }
382        Some(value)
383    }
384}
385
#[cfg(test)]
mod tests {
    use tycho_storage::StorageContext;
    use tycho_types::models::ShardIdent;

    use super::*;
    use crate::storage::{CoreStorage, CoreStorageConfig};

    /// Checks that flag updates written through the merge path are visible
    /// after the handle has been dropped and reloaded from the database.
    #[tokio::test]
    async fn merge_operator_works() -> anyhow::Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

        let block_handles = storage.block_handle_storage();

        let block_id = BlockId {
            shard: ShardIdent::BASECHAIN,
            seqno: 100,
            ..Default::default()
        };

        let meta = NewBlockMeta {
            is_key_block: false,
            gen_utime: 123,
            ref_by_mc_seqno: 456,
        };

        {
            // First call creates the handle from `meta`.
            let (handle, status) = block_handles.create_or_load_handle(&block_id, meta);
            assert_eq!(status, HandleCreationStatus::Created);

            assert_eq!(handle.ref_by_mc_seqno(), 456);
            assert!(!handle.is_key_block());
            assert!(!handle.is_committed());

            let updated = block_handles.set_block_committed(&handle);
            assert!(updated);
            assert!(handle.is_committed());

            // While the first handle is alive, the same instance is returned.
            let (handle2, status) = block_handles.create_or_load_handle(&block_id, meta);
            assert_eq!(status, HandleCreationStatus::Fetched);

            assert_eq!(
                arc_swap::RefCnt::as_ptr(&handle),
                arc_swap::RefCnt::as_ptr(&handle2),
            );
        }

        // Dropping all handles removes the cache entry.
        assert!(!block_handles.cache.contains_key(&block_id));

        // The handle is now rebuilt from the database and must carry the
        // flag that was set above.
        {
            let (handle, status) = block_handles.create_or_load_handle(&block_id, meta);
            assert_eq!(status, HandleCreationStatus::Fetched);

            assert_eq!(handle.ref_by_mc_seqno(), 456);
            assert!(!handle.is_key_block());
            assert!(handle.is_committed());
        }

        // Dropping the reloaded handle removes the cache entry again.
        assert!(!block_handles.cache.contains_key(&block_id));

        Ok(())
    }
}