tycho_core/storage/block_handle/
mod.rs1use std::sync::Arc;
2
3use tycho_block_util::block::{BlockStuff, ShardHeights};
4use tycho_storage::kv::StoredValue;
5use tycho_types::models::BlockId;
6use tycho_util::FastDashMap;
7
8pub(crate) use self::handle::BlockDataGuard;
9pub use self::handle::{BlockHandle, WeakBlockHandle};
10pub use self::meta::{BlockFlags, BlockMeta, LoadedBlockMeta, NewBlockMeta};
11use super::{CoreDb, PartialBlockId};
12
13mod handle;
14mod meta;
15
16type BlockHandleCache = FastDashMap<BlockId, WeakBlockHandle>;
17
18pub struct BlockHandleStorage {
19 db: CoreDb,
20 cache: Arc<BlockHandleCache>,
21}
22
23impl BlockHandleStorage {
24 pub fn new(db: CoreDb) -> Self {
25 Self {
26 db,
27 cache: Arc::new(Default::default()),
28 }
29 }
30
31 pub fn set_block_committed(&self, handle: &BlockHandle) -> bool {
32 let updated = handle.meta().add_flags(BlockFlags::IS_COMMITTED);
33 if updated {
34 self.store_handle(handle, false);
35 }
36 updated
37 }
38
39 pub fn set_block_persistent(&self, handle: &BlockHandle) -> bool {
40 let updated = handle.meta().add_flags(BlockFlags::IS_PERSISTENT);
41 if updated {
42 self.store_handle(handle, false);
43 }
44 updated
45 }
46
47 pub fn set_has_persistent_shard_state(&self, handle: &BlockHandle) -> bool {
48 let updated = handle
49 .meta()
50 .add_flags(BlockFlags::HAS_PERSISTENT_SHARD_STATE);
51 if updated {
52 self.store_handle(handle, false);
53 }
54 updated
55 }
56
57 pub fn set_has_persistent_queue_state(&self, handle: &BlockHandle) -> bool {
58 let updated = handle
59 .meta()
60 .add_flags(BlockFlags::HAS_PERSISTENT_QUEUE_STATE);
61 if updated {
62 self.store_handle(handle, false);
63 }
64 updated
65 }
66
67 pub fn create_or_load_handle(
68 &self,
69 block_id: &BlockId,
70 meta_data: NewBlockMeta,
71 ) -> (BlockHandle, HandleCreationStatus) {
72 use dashmap::mapref::entry::Entry;
73
74 let block_handles = &self.db.block_handles;
75
76 if let Some(handle) = self.cache.get(block_id) {
78 if let Some(handle) = handle.upgrade() {
79 return (handle, HandleCreationStatus::Fetched);
80 }
81 }
82
83 match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
84 Some(data) => {
86 let meta = BlockMeta::from_slice(data.as_ref());
87
88 let handle = self.fill_cache(block_id, meta);
90
91 (handle, HandleCreationStatus::Fetched)
93 }
94 None => {
95 let handle = BlockHandle::new(
97 block_id,
98 BlockMeta::with_data(meta_data),
99 self.cache.clone(),
100 );
101
102 let is_new = match self.cache.entry(*block_id) {
104 Entry::Vacant(entry) => {
105 entry.insert(handle.downgrade());
106 true
107 }
108 Entry::Occupied(mut entry) => match entry.get().upgrade() {
109 Some(handle) => return (handle, HandleCreationStatus::Fetched),
111 None => {
112 entry.insert(handle.downgrade());
113 true
114 }
115 },
116 };
117
118 self.store_handle(&handle, is_new);
120
121 (handle, HandleCreationStatus::Created)
123 }
124 }
125 }
126
127 pub fn load_handle(&self, block_id: &BlockId) -> Option<BlockHandle> {
128 let block_handles = &self.db.block_handles;
129
130 if let Some(handle) = self.cache.get(block_id) {
132 if let Some(handle) = handle.upgrade() {
133 return Some(handle);
134 }
135 }
136
137 let meta = match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
139 Some(data) => BlockMeta::from_slice(data.as_ref()),
140 None => return None,
141 };
142
143 Some(self.fill_cache(block_id, meta))
145 }
146
147 pub fn store_handle(&self, handle: &BlockHandle, is_new: bool) {
148 let id = handle.id();
149
150 let is_key_block = handle.is_key_block();
151
152 if is_new || is_key_block {
153 let mut batch = weedb::rocksdb::WriteBatch::default();
154
155 batch.merge_cf(
156 &self.db.block_handles.cf(),
157 id.root_hash,
158 handle.meta().to_vec(),
159 );
160
161 if is_new {
162 batch.put_cf(
163 &self.db.full_block_ids.cf(),
164 PartialBlockId::from(id).to_vec(),
165 id.file_hash,
166 );
167 }
168
169 if is_key_block {
170 batch.put_cf(
171 &self.db.key_blocks.cf(),
172 id.seqno.to_be_bytes(),
173 id.to_vec(),
174 );
175 }
176
177 self.db
178 .rocksdb()
179 .write_opt(batch, self.db.block_handles.write_config())
180 } else {
181 self.db.rocksdb().merge_cf_opt(
182 &self.db.block_handles.cf(),
183 id.root_hash,
184 handle.meta().to_vec(),
185 self.db.block_handles.write_config(),
186 )
187 }
188 .unwrap();
189 }
190
191 pub fn load_key_block_handle(&self, seqno: u32) -> Option<BlockHandle> {
192 let key_blocks = &self.db.key_blocks;
193 let key_block_id = match key_blocks.get(seqno.to_be_bytes()).unwrap() {
194 Some(data) => BlockId::from_slice(data.as_ref()),
195 None => return None,
196 };
197 self.load_handle(&key_block_id)
198 }
199
200 pub fn find_last_key_block(&self) -> Option<BlockHandle> {
201 let mut iter = self.db.key_blocks.raw_iterator();
202 iter.seek_to_last();
203
204 let key_block_id = BlockId::from_slice(iter.value()?);
206 self.load_handle(&key_block_id)
207 }
208
209 pub fn find_prev_key_block(&self, seqno: u32) -> Option<BlockHandle> {
210 if seqno == 0 {
211 return None;
212 }
213
214 let mut iter = self.db.key_blocks.raw_iterator();
216 iter.seek_for_prev((seqno - 1u32).to_be_bytes());
217
218 let key_block_id = BlockId::from_slice(iter.value()?);
220 self.load_handle(&key_block_id)
221 }
222
223 pub fn find_prev_persistent_key_block(&self, seqno: u32) -> Option<BlockHandle> {
224 if seqno == 0 {
225 return None;
226 }
227
228 let mut iter = self.db.key_blocks.raw_iterator();
230 iter.seek_for_prev((seqno - 1u32).to_be_bytes());
231
232 let mut get_key_block = move || -> Option<BlockHandle> {
234 let key_block_id = BlockId::from_slice(iter.value()?);
236
237 let handle = self.load_handle(&key_block_id)?;
239
240 iter.prev();
242
243 Some(handle)
245 };
246
247 let mut key_block = get_key_block()?;
249
250 while let Some(prev_key_block) = get_key_block() {
252 if BlockStuff::compute_is_persistent(key_block.gen_utime(), prev_key_block.gen_utime())
253 {
254 return Some(key_block);
256 }
257 key_block = prev_key_block;
258 }
259
260 None
262 }
263
264 pub fn key_blocks_iterator(
265 &self,
266 direction: KeyBlocksDirection,
267 ) -> impl Iterator<Item = BlockId> + '_ {
268 let mut raw_iterator = self.db.key_blocks.raw_iterator();
269 let reverse = match direction {
270 KeyBlocksDirection::ForwardFrom(seqno) => {
271 raw_iterator.seek(seqno.to_be_bytes());
272 false
273 }
274 KeyBlocksDirection::Backward => {
275 raw_iterator.seek_to_last();
276 true
277 }
278 };
279
280 KeyBlocksIterator {
281 raw_iterator,
282 reverse,
283 }
284 }
285
286 pub fn gc_handles_cache(&self, mc_seqno: u32, shard_heights: &ShardHeights) -> usize {
287 let mut total_removed = 0;
288
289 self.cache.retain(|block_id, value| {
290 let value = match value.upgrade() {
291 Some(value) => value,
292 None => {
293 total_removed += 1;
294 return false;
295 }
296 };
297
298 let is_masterchain = block_id.is_masterchain();
299
300 if block_id.seqno == 0
301 || is_masterchain && (block_id.seqno >= mc_seqno || value.is_key_block())
302 || !is_masterchain
303 && shard_heights.contains_shard_seqno(&block_id.shard, block_id.seqno)
304 {
305 true
307 } else {
308 total_removed += 1;
310 value.meta().add_flags(BlockFlags::IS_REMOVED);
311 false
312 }
313 });
314
315 total_removed
316 }
317
318 fn fill_cache(&self, block_id: &BlockId, meta: BlockMeta) -> BlockHandle {
319 use dashmap::mapref::entry::Entry;
320
321 match self.cache.entry(*block_id) {
322 Entry::Vacant(entry) => {
323 let handle = BlockHandle::new(block_id, meta, self.cache.clone());
324 entry.insert(handle.downgrade());
325 handle
326 }
327 Entry::Occupied(mut entry) => match entry.get().upgrade() {
328 Some(handle) => handle,
329 None => {
330 let handle = BlockHandle::new(block_id, meta, self.cache.clone());
331 entry.insert(handle.downgrade());
332 handle
333 }
334 },
335 }
336 }
337}
338
/// Outcome reported by `BlockHandleStorage::create_or_load_handle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandleCreationStatus {
    /// A new handle was built from the supplied meta and stored.
    Created,
    /// An existing handle was found in the cache or loaded from the database.
    Fetched,
}
344
/// Start position and direction for `BlockHandleStorage::key_blocks_iterator`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeyBlocksDirection {
    /// Seek to the given seqno and iterate forwards.
    ForwardFrom(u32),
    /// Seek to the last entry and iterate backwards.
    Backward,
}
350
351struct KeyBlocksIterator<'a> {
352 raw_iterator: weedb::rocksdb::DBRawIterator<'a>,
353 reverse: bool,
354}
355
356impl Iterator for KeyBlocksIterator<'_> {
357 type Item = BlockId;
358
359 fn next(&mut self) -> Option<Self::Item> {
360 let value = self.raw_iterator.value().map(BlockId::from_slice)?;
361 if self.reverse {
362 self.raw_iterator.prev();
363 } else {
364 self.raw_iterator.next();
365 }
366 Some(value)
367 }
368}
369
#[cfg(test)]
mod tests {
    use tycho_storage::StorageContext;
    use tycho_types::models::ShardIdent;

    use super::*;
    use crate::storage::{CoreStorage, CoreStorageConfig};

    /// Checks that a flag set on a handle survives a drop of the handle and a
    /// reload from the database, and that the cache entry disappears once all
    /// handles are dropped.
    #[tokio::test]
    async fn merge_operator_works() -> anyhow::Result<()> {
        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
        let handles = storage.block_handle_storage();

        let block_id = BlockId {
            shard: ShardIdent::BASECHAIN,
            seqno: 100,
            ..Default::default()
        };
        let new_meta = NewBlockMeta {
            is_key_block: false,
            gen_utime: 123,
            ref_by_mc_seqno: 456,
        };

        // Phase 1: create the handle, commit it, and fetch it again while alive.
        {
            let (created, created_status) = handles.create_or_load_handle(&block_id, new_meta);
            assert_eq!(created_status, HandleCreationStatus::Created);

            assert_eq!(created.ref_by_mc_seqno(), 456);
            assert!(!created.is_key_block());
            assert!(!created.is_committed());

            let updated = handles.set_block_committed(&created);
            assert!(updated);
            assert!(created.is_committed());

            // A second request must hand out the very same handle.
            let (fetched, fetched_status) = handles.create_or_load_handle(&block_id, new_meta);
            assert_eq!(fetched_status, HandleCreationStatus::Fetched);

            assert_eq!(
                arc_swap::RefCnt::as_ptr(&created),
                arc_swap::RefCnt::as_ptr(&fetched),
            );
        }

        // With every handle dropped, the cache entry must be gone.
        assert!(!handles.cache.contains_key(&block_id));

        // Phase 2: the handle now comes from the database with the flag kept.
        {
            let (reloaded, reloaded_status) = handles.create_or_load_handle(&block_id, new_meta);
            assert_eq!(reloaded_status, HandleCreationStatus::Fetched);

            assert_eq!(reloaded.ref_by_mc_seqno(), 456);
            assert!(!reloaded.is_key_block());
            assert!(reloaded.is_committed());
        }

        // Dropping the reloaded handle must clear the cache entry again.
        assert!(!handles.cache.contains_key(&block_id));

        Ok(())
    }
}
437}