tycho_core/storage/block_handle/
mod.rs1use std::sync::Arc;
2
3pub(crate) use handle::BlockDataGuard;
4use tycho_block_util::block::{BlockStuff, ShardHeights};
5use tycho_storage::kv::StoredValue;
6use tycho_types::models::BlockId;
7use tycho_util::FastDashMap;
8
9pub use self::handle::{BlockHandle, WeakBlockHandle};
10pub use self::meta::{BlockFlags, BlockMeta, LoadedBlockMeta, NewBlockMeta};
11use super::{CoreDb, PartialBlockId};
12
13mod handle;
14mod meta;
15
16type BlockHandleCache = FastDashMap<BlockId, WeakBlockHandle>;
17
18pub struct BlockHandleStorage {
19 db: CoreDb,
20 cache: Arc<BlockHandleCache>,
21}
22
23impl BlockHandleStorage {
24 pub fn new(db: CoreDb) -> Self {
25 Self {
26 db,
27 cache: Arc::new(Default::default()),
28 }
29 }
30
31 pub fn set_block_committed(&self, handle: &BlockHandle) -> bool {
32 let updated = handle.meta().add_flags(BlockFlags::IS_COMMITTED);
33 if updated {
34 self.store_handle(handle, false);
35 }
36 updated
37 }
38
39 pub fn set_has_shard_state(&self, handle: &BlockHandle) -> bool {
40 let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
41 if updated {
42 self.store_handle(handle, false);
43 }
44 updated
45 }
46
47 pub fn set_block_persistent(&self, handle: &BlockHandle) -> bool {
48 let updated = handle.meta().add_flags(BlockFlags::IS_PERSISTENT);
49 if updated {
50 self.store_handle(handle, false);
51 }
52 updated
53 }
54
55 pub fn set_has_persistent_shard_state(&self, handle: &BlockHandle) -> bool {
56 let updated = handle
57 .meta()
58 .add_flags(BlockFlags::HAS_PERSISTENT_SHARD_STATE);
59 if updated {
60 self.store_handle(handle, false);
61 }
62 updated
63 }
64
65 pub fn set_has_persistent_queue_state(&self, handle: &BlockHandle) -> bool {
66 let updated = handle
67 .meta()
68 .add_flags(BlockFlags::HAS_PERSISTENT_QUEUE_STATE);
69 if updated {
70 self.store_handle(handle, false);
71 }
72 updated
73 }
74
75 pub fn set_is_zerostate(&self, handle: &BlockHandle) -> bool {
76 let updated = handle.meta().add_flags(BlockFlags::IS_ZEROSTATE);
77 if updated {
78 self.store_handle(handle, false);
79 }
80 updated
81 }
82
83 pub fn create_or_load_handle(
84 &self,
85 block_id: &BlockId,
86 meta_data: NewBlockMeta,
87 ) -> (BlockHandle, HandleCreationStatus) {
88 use dashmap::mapref::entry::Entry;
89
90 let block_handles = &self.db.block_handles;
91
92 if let Some(handle) = self.cache.get(block_id)
94 && let Some(handle) = handle.upgrade()
95 {
96 return (handle, HandleCreationStatus::Fetched);
97 }
98
99 match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
100 Some(data) => {
102 let meta = BlockMeta::from_slice(data.as_ref());
103
104 let handle = self.fill_cache(block_id, meta);
106
107 (handle, HandleCreationStatus::Fetched)
109 }
110 None => {
111 let handle = BlockHandle::new(
113 block_id,
114 BlockMeta::with_data(meta_data),
115 self.cache.clone(),
116 );
117
118 let is_new = match self.cache.entry(*block_id) {
120 Entry::Vacant(entry) => {
121 entry.insert(handle.downgrade());
122 true
123 }
124 Entry::Occupied(mut entry) => match entry.get().upgrade() {
125 Some(handle) => return (handle, HandleCreationStatus::Fetched),
127 None => {
128 entry.insert(handle.downgrade());
129 true
130 }
131 },
132 };
133
134 self.store_handle(&handle, is_new);
136
137 (handle, HandleCreationStatus::Created)
139 }
140 }
141 }
142
143 pub fn load_handle(&self, block_id: &BlockId) -> Option<BlockHandle> {
144 let block_handles = &self.db.block_handles;
145
146 if let Some(handle) = self.cache.get(block_id)
148 && let Some(handle) = handle.upgrade()
149 {
150 return Some(handle);
151 }
152
153 let meta = match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
155 Some(data) => BlockMeta::from_slice(data.as_ref()),
156 None => return None,
157 };
158
159 Some(self.fill_cache(block_id, meta))
161 }
162
163 pub fn store_handle(&self, handle: &BlockHandle, is_new: bool) {
164 let id = handle.id();
165
166 let is_key_block = handle.is_key_block();
167
168 if is_new || is_key_block {
169 let mut batch = weedb::rocksdb::WriteBatch::default();
170
171 batch.merge_cf(
172 &self.db.block_handles.cf(),
173 id.root_hash,
174 handle.meta().to_vec(),
175 );
176
177 if is_new {
178 batch.put_cf(
179 &self.db.full_block_ids.cf(),
180 PartialBlockId::from(id).to_vec(),
181 id.file_hash,
182 );
183 }
184
185 if is_key_block {
186 batch.put_cf(
187 &self.db.key_blocks.cf(),
188 id.seqno.to_be_bytes(),
189 id.to_vec(),
190 );
191 }
192
193 self.db
194 .rocksdb()
195 .write_opt(batch, self.db.block_handles.write_config())
196 } else {
197 self.db.rocksdb().merge_cf_opt(
198 &self.db.block_handles.cf(),
199 id.root_hash,
200 handle.meta().to_vec(),
201 self.db.block_handles.write_config(),
202 )
203 }
204 .unwrap();
205 }
206
207 pub fn load_key_block_handle(&self, seqno: u32) -> Option<BlockHandle> {
208 let key_blocks = &self.db.key_blocks;
209 let key_block_id = match key_blocks.get(seqno.to_be_bytes()).unwrap() {
210 Some(data) => BlockId::from_slice(data.as_ref()),
211 None => return None,
212 };
213 self.load_handle(&key_block_id)
214 }
215
216 pub fn find_last_key_block(&self) -> Option<BlockHandle> {
217 let mut iter = self.db.key_blocks.raw_iterator();
218 iter.seek_to_last();
219
220 let key_block_id = BlockId::from_slice(iter.value()?);
222 self.load_handle(&key_block_id)
223 }
224
225 pub fn find_prev_key_block(&self, seqno: u32) -> Option<BlockHandle> {
226 if seqno == 0 {
227 return None;
228 }
229
230 let mut iter = self.db.key_blocks.raw_iterator();
232 iter.seek_for_prev((seqno - 1u32).to_be_bytes());
233
234 let key_block_id = BlockId::from_slice(iter.value()?);
236 self.load_handle(&key_block_id)
237 }
238
239 pub fn find_prev_persistent_key_block(&self, seqno: u32) -> Option<BlockHandle> {
240 if seqno == 0 {
241 return None;
242 }
243
244 let mut iter = self.db.key_blocks.raw_iterator();
246 iter.seek_for_prev((seqno - 1u32).to_be_bytes());
247
248 let mut get_key_block = move || -> Option<BlockHandle> {
250 let key_block_id = BlockId::from_slice(iter.value()?);
252
253 let handle = self.load_handle(&key_block_id)?;
255
256 iter.prev();
258
259 Some(handle)
261 };
262
263 let mut key_block = get_key_block()?;
265
266 while let Some(prev_key_block) = get_key_block() {
268 if BlockStuff::compute_is_persistent(key_block.gen_utime(), prev_key_block.gen_utime())
269 {
270 return Some(key_block);
272 }
273 key_block = prev_key_block;
274 }
275
276 None
278 }
279
280 pub fn key_blocks_iterator(
281 &self,
282 direction: KeyBlocksDirection,
283 ) -> impl Iterator<Item = BlockId> + '_ {
284 let mut raw_iterator = self.db.key_blocks.raw_iterator();
285 let reverse = match direction {
286 KeyBlocksDirection::ForwardFrom(seqno) => {
287 raw_iterator.seek(seqno.to_be_bytes());
288 false
289 }
290 KeyBlocksDirection::Backward => {
291 raw_iterator.seek_to_last();
292 true
293 }
294 };
295
296 KeyBlocksIterator {
297 raw_iterator,
298 reverse,
299 }
300 }
301
302 pub fn gc_handles_cache(&self, mc_seqno: u32, shard_heights: &ShardHeights) -> usize {
303 let mut total_removed = 0;
304
305 self.cache.retain(|block_id, value| {
306 let value = match value.upgrade() {
307 Some(value) => value,
308 None => {
309 total_removed += 1;
310 return false;
311 }
312 };
313
314 let is_masterchain = block_id.is_masterchain();
315
316 if block_id.seqno == 0
317 || is_masterchain && (block_id.seqno >= mc_seqno || value.is_key_block())
318 || !is_masterchain
319 && shard_heights.contains_shard_seqno(&block_id.shard, block_id.seqno)
320 {
321 true
323 } else {
324 total_removed += 1;
326 value.meta().add_flags(BlockFlags::IS_REMOVED);
327 false
328 }
329 });
330
331 total_removed
332 }
333
334 fn fill_cache(&self, block_id: &BlockId, meta: BlockMeta) -> BlockHandle {
335 use dashmap::mapref::entry::Entry;
336
337 match self.cache.entry(*block_id) {
338 Entry::Vacant(entry) => {
339 let handle = BlockHandle::new(block_id, meta, self.cache.clone());
340 entry.insert(handle.downgrade());
341 handle
342 }
343 Entry::Occupied(mut entry) => match entry.get().upgrade() {
344 Some(handle) => handle,
345 None => {
346 let handle = BlockHandle::new(block_id, meta, self.cache.clone());
347 entry.insert(handle.downgrade());
348 handle
349 }
350 },
351 }
352 }
353}
354
/// Outcome reported by `BlockHandleStorage::create_or_load_handle`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HandleCreationStatus {
    /// A new handle was created and stored by the call.
    Created,
    /// An existing handle was returned.
    Fetched,
}
360
/// Start position and direction for `BlockHandleStorage::key_blocks_iterator`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KeyBlocksDirection {
    /// Seek to the given seqno and iterate forward.
    ForwardFrom(u32),
    /// Seek to the last entry and iterate backward.
    Backward,
}
366
367struct KeyBlocksIterator<'a> {
368 raw_iterator: weedb::rocksdb::DBRawIterator<'a>,
369 reverse: bool,
370}
371
372impl Iterator for KeyBlocksIterator<'_> {
373 type Item = BlockId;
374
375 fn next(&mut self) -> Option<Self::Item> {
376 let value = self.raw_iterator.value().map(BlockId::from_slice)?;
377 if self.reverse {
378 self.raw_iterator.prev();
379 } else {
380 self.raw_iterator.next();
381 }
382 Some(value)
383 }
384}
385
#[cfg(test)]
mod tests {
    use tycho_storage::StorageContext;
    use tycho_types::models::ShardIdent;

    use super::*;
    use crate::storage::{CoreStorage, CoreStorageConfig};

    /// A flag set on a handle must still be visible after the handle has been
    /// dropped from the cache and loaded again from the database.
    #[tokio::test]
    async fn merge_operator_works() -> anyhow::Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
        let handles = storage.block_handle_storage();

        let block_id = BlockId {
            shard: ShardIdent::BASECHAIN,
            seqno: 100,
            ..Default::default()
        };
        let meta = NewBlockMeta {
            is_key_block: false,
            gen_utime: 123,
            ref_by_mc_seqno: 456,
        };

        // First access creates the handle; a second access while the first
        // handle is alive must return the very same instance.
        {
            let (first, status) = handles.create_or_load_handle(&block_id, meta);
            assert_eq!(status, HandleCreationStatus::Created);

            assert_eq!(first.ref_by_mc_seqno(), 456);
            assert!(!first.is_key_block());
            assert!(!first.is_committed());

            let updated = handles.set_block_committed(&first);
            assert!(updated);
            assert!(first.is_committed());

            let (second, status) = handles.create_or_load_handle(&block_id, meta);
            assert_eq!(status, HandleCreationStatus::Fetched);

            assert_eq!(
                arc_swap::RefCnt::as_ptr(&first),
                arc_swap::RefCnt::as_ptr(&second),
            );
        }

        // All handles are dropped, so the cache entry must be gone.
        assert!(!handles.cache.contains_key(&block_id));

        // Loading again goes through the database and must see the flag.
        {
            let (reloaded, status) = handles.create_or_load_handle(&block_id, meta);
            assert_eq!(status, HandleCreationStatus::Fetched);

            assert_eq!(reloaded.ref_by_mc_seqno(), 456);
            assert!(!reloaded.is_key_block());
            assert!(reloaded.is_committed());
        }

        assert!(!handles.cache.contains_key(&block_id));

        Ok(())
    }
}