tycho_core/storage/block_handle/
mod.rs1use std::sync::Arc;
2
3pub(crate) use handle::BlockDataGuard;
4use tycho_block_util::block::{BlockStuff, ShardHeights};
5use tycho_storage::kv::StoredValue;
6use tycho_types::models::BlockId;
7use tycho_util::FastDashMap;
8
9pub use self::handle::{BlockHandle, WeakBlockHandle};
10pub use self::meta::{BlockFlags, BlockMeta, LoadedBlockMeta, NewBlockMeta};
11use super::{CoreDb, PartialBlockId};
12
13mod handle;
14mod meta;
15
16type BlockHandleCache = FastDashMap<BlockId, WeakBlockHandle>;
17
18pub struct BlockHandleStorage {
19 db: CoreDb,
20 cache: Arc<BlockHandleCache>,
21}
22
23impl BlockHandleStorage {
24 pub fn new(db: CoreDb) -> Self {
25 Self {
26 db,
27 cache: Arc::new(Default::default()),
28 }
29 }
30
31 pub fn set_block_committed(&self, handle: &BlockHandle) -> bool {
32 let updated = handle.meta().add_flags(BlockFlags::IS_COMMITTED);
33 if updated {
34 self.store_handle(handle, false);
35 }
36 updated
37 }
38
39 pub fn set_has_shard_state(&self, handle: &BlockHandle) -> bool {
40 let updated = handle.meta().add_flags(BlockFlags::HAS_STATE);
41 if updated {
42 self.store_handle(handle, false);
43 }
44 updated
45 }
46
47 pub fn set_block_persistent(&self, handle: &BlockHandle) -> bool {
48 let updated = handle.meta().add_flags(BlockFlags::IS_PERSISTENT);
49 if updated {
50 self.store_handle(handle, false);
51 }
52 updated
53 }
54
55 pub fn set_has_persistent_shard_state(&self, handle: &BlockHandle) -> bool {
56 let updated = handle
57 .meta()
58 .add_flags(BlockFlags::HAS_PERSISTENT_SHARD_STATE);
59 if updated {
60 self.store_handle(handle, false);
61 }
62 updated
63 }
64
65 pub fn set_has_persistent_queue_state(&self, handle: &BlockHandle) -> bool {
66 let updated = handle
67 .meta()
68 .add_flags(BlockFlags::HAS_PERSISTENT_QUEUE_STATE);
69 if updated {
70 self.store_handle(handle, false);
71 }
72 updated
73 }
74
75 pub fn create_or_load_handle(
76 &self,
77 block_id: &BlockId,
78 meta_data: NewBlockMeta,
79 ) -> (BlockHandle, HandleCreationStatus) {
80 use dashmap::mapref::entry::Entry;
81
82 let block_handles = &self.db.block_handles;
83
84 if let Some(handle) = self.cache.get(block_id)
86 && let Some(handle) = handle.upgrade()
87 {
88 return (handle, HandleCreationStatus::Fetched);
89 }
90
91 match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
92 Some(data) => {
94 let meta = BlockMeta::from_slice(data.as_ref());
95
96 let handle = self.fill_cache(block_id, meta);
98
99 (handle, HandleCreationStatus::Fetched)
101 }
102 None => {
103 let handle = BlockHandle::new(
105 block_id,
106 BlockMeta::with_data(meta_data),
107 self.cache.clone(),
108 );
109
110 let is_new = match self.cache.entry(*block_id) {
112 Entry::Vacant(entry) => {
113 entry.insert(handle.downgrade());
114 true
115 }
116 Entry::Occupied(mut entry) => match entry.get().upgrade() {
117 Some(handle) => return (handle, HandleCreationStatus::Fetched),
119 None => {
120 entry.insert(handle.downgrade());
121 true
122 }
123 },
124 };
125
126 self.store_handle(&handle, is_new);
128
129 (handle, HandleCreationStatus::Created)
131 }
132 }
133 }
134
135 pub fn load_handle(&self, block_id: &BlockId) -> Option<BlockHandle> {
136 let block_handles = &self.db.block_handles;
137
138 if let Some(handle) = self.cache.get(block_id)
140 && let Some(handle) = handle.upgrade()
141 {
142 return Some(handle);
143 }
144
145 let meta = match block_handles.get(block_id.root_hash.as_slice()).unwrap() {
147 Some(data) => BlockMeta::from_slice(data.as_ref()),
148 None => return None,
149 };
150
151 Some(self.fill_cache(block_id, meta))
153 }
154
155 pub fn store_handle(&self, handle: &BlockHandle, is_new: bool) {
156 let id = handle.id();
157
158 let is_key_block = handle.is_key_block();
159
160 if is_new || is_key_block {
161 let mut batch = weedb::rocksdb::WriteBatch::default();
162
163 batch.merge_cf(
164 &self.db.block_handles.cf(),
165 id.root_hash,
166 handle.meta().to_vec(),
167 );
168
169 if is_new {
170 batch.put_cf(
171 &self.db.full_block_ids.cf(),
172 PartialBlockId::from(id).to_vec(),
173 id.file_hash,
174 );
175 }
176
177 if is_key_block {
178 batch.put_cf(
179 &self.db.key_blocks.cf(),
180 id.seqno.to_be_bytes(),
181 id.to_vec(),
182 );
183 }
184
185 self.db
186 .rocksdb()
187 .write_opt(batch, self.db.block_handles.write_config())
188 } else {
189 self.db.rocksdb().merge_cf_opt(
190 &self.db.block_handles.cf(),
191 id.root_hash,
192 handle.meta().to_vec(),
193 self.db.block_handles.write_config(),
194 )
195 }
196 .unwrap();
197 }
198
199 pub fn load_key_block_handle(&self, seqno: u32) -> Option<BlockHandle> {
200 let key_blocks = &self.db.key_blocks;
201 let key_block_id = match key_blocks.get(seqno.to_be_bytes()).unwrap() {
202 Some(data) => BlockId::from_slice(data.as_ref()),
203 None => return None,
204 };
205 self.load_handle(&key_block_id)
206 }
207
208 pub fn find_last_key_block(&self) -> Option<BlockHandle> {
209 let mut iter = self.db.key_blocks.raw_iterator();
210 iter.seek_to_last();
211
212 let key_block_id = BlockId::from_slice(iter.value()?);
214 self.load_handle(&key_block_id)
215 }
216
217 pub fn find_prev_key_block(&self, seqno: u32) -> Option<BlockHandle> {
218 if seqno == 0 {
219 return None;
220 }
221
222 let mut iter = self.db.key_blocks.raw_iterator();
224 iter.seek_for_prev((seqno - 1u32).to_be_bytes());
225
226 let key_block_id = BlockId::from_slice(iter.value()?);
228 self.load_handle(&key_block_id)
229 }
230
231 pub fn find_prev_persistent_key_block(&self, seqno: u32) -> Option<BlockHandle> {
232 if seqno == 0 {
233 return None;
234 }
235
236 let mut iter = self.db.key_blocks.raw_iterator();
238 iter.seek_for_prev((seqno - 1u32).to_be_bytes());
239
240 let mut get_key_block = move || -> Option<BlockHandle> {
242 let key_block_id = BlockId::from_slice(iter.value()?);
244
245 let handle = self.load_handle(&key_block_id)?;
247
248 iter.prev();
250
251 Some(handle)
253 };
254
255 let mut key_block = get_key_block()?;
257
258 while let Some(prev_key_block) = get_key_block() {
260 if BlockStuff::compute_is_persistent(key_block.gen_utime(), prev_key_block.gen_utime())
261 {
262 return Some(key_block);
264 }
265 key_block = prev_key_block;
266 }
267
268 None
270 }
271
272 pub fn key_blocks_iterator(
273 &self,
274 direction: KeyBlocksDirection,
275 ) -> impl Iterator<Item = BlockId> + '_ {
276 let mut raw_iterator = self.db.key_blocks.raw_iterator();
277 let reverse = match direction {
278 KeyBlocksDirection::ForwardFrom(seqno) => {
279 raw_iterator.seek(seqno.to_be_bytes());
280 false
281 }
282 KeyBlocksDirection::Backward => {
283 raw_iterator.seek_to_last();
284 true
285 }
286 };
287
288 KeyBlocksIterator {
289 raw_iterator,
290 reverse,
291 }
292 }
293
294 pub fn gc_handles_cache(&self, mc_seqno: u32, shard_heights: &ShardHeights) -> usize {
295 let mut total_removed = 0;
296
297 self.cache.retain(|block_id, value| {
298 let value = match value.upgrade() {
299 Some(value) => value,
300 None => {
301 total_removed += 1;
302 return false;
303 }
304 };
305
306 let is_masterchain = block_id.is_masterchain();
307
308 if block_id.seqno == 0
309 || is_masterchain && (block_id.seqno >= mc_seqno || value.is_key_block())
310 || !is_masterchain
311 && shard_heights.contains_shard_seqno(&block_id.shard, block_id.seqno)
312 {
313 true
315 } else {
316 total_removed += 1;
318 value.meta().add_flags(BlockFlags::IS_REMOVED);
319 false
320 }
321 });
322
323 total_removed
324 }
325
326 fn fill_cache(&self, block_id: &BlockId, meta: BlockMeta) -> BlockHandle {
327 use dashmap::mapref::entry::Entry;
328
329 match self.cache.entry(*block_id) {
330 Entry::Vacant(entry) => {
331 let handle = BlockHandle::new(block_id, meta, self.cache.clone());
332 entry.insert(handle.downgrade());
333 handle
334 }
335 Entry::Occupied(mut entry) => match entry.get().upgrade() {
336 Some(handle) => handle,
337 None => {
338 let handle = BlockHandle::new(block_id, meta, self.cache.clone());
339 entry.insert(handle.downgrade());
340 handle
341 }
342 },
343 }
344 }
345}
346
347#[derive(Debug, Copy, Clone, Eq, PartialEq)]
348pub enum HandleCreationStatus {
349 Created,
350 Fetched,
351}
352
353#[derive(Debug, Copy, Clone, Eq, PartialEq)]
354pub enum KeyBlocksDirection {
355 ForwardFrom(u32),
356 Backward,
357}
358
359struct KeyBlocksIterator<'a> {
360 raw_iterator: weedb::rocksdb::DBRawIterator<'a>,
361 reverse: bool,
362}
363
364impl Iterator for KeyBlocksIterator<'_> {
365 type Item = BlockId;
366
367 fn next(&mut self) -> Option<Self::Item> {
368 let value = self.raw_iterator.value().map(BlockId::from_slice)?;
369 if self.reverse {
370 self.raw_iterator.prev();
371 } else {
372 self.raw_iterator.next();
373 }
374 Some(value)
375 }
376}
377
378#[cfg(test)]
379mod tests {
380 use tycho_storage::StorageContext;
381 use tycho_types::models::ShardIdent;
382
383 use super::*;
384 use crate::storage::{CoreStorage, CoreStorageConfig};
385
386 #[tokio::test]
387 async fn merge_operator_works() -> anyhow::Result<()> {
388 let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
389 let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
390
391 let block_handles = storage.block_handle_storage();
392
393 let block_id = BlockId {
394 shard: ShardIdent::BASECHAIN,
395 seqno: 100,
396 ..Default::default()
397 };
398
399 let meta = NewBlockMeta {
400 is_key_block: false,
401 gen_utime: 123,
402 ref_by_mc_seqno: 456,
403 };
404
405 {
406 let (handle, status) = block_handles.create_or_load_handle(&block_id, meta);
407 assert_eq!(status, HandleCreationStatus::Created);
408
409 assert_eq!(handle.ref_by_mc_seqno(), 456);
410 assert!(!handle.is_key_block());
411 assert!(!handle.is_committed());
412
413 let updated = block_handles.set_block_committed(&handle);
414 assert!(updated);
415 assert!(handle.is_committed());
416
417 let (handle2, status) = block_handles.create_or_load_handle(&block_id, meta);
419 assert_eq!(status, HandleCreationStatus::Fetched);
420
421 assert_eq!(
422 arc_swap::RefCnt::as_ptr(&handle),
423 arc_swap::RefCnt::as_ptr(&handle2),
424 );
425 }
426
427 assert!(!block_handles.cache.contains_key(&block_id));
429
430 {
432 let (handle, status) = block_handles.create_or_load_handle(&block_id, meta);
433 assert_eq!(status, HandleCreationStatus::Fetched);
434
435 assert_eq!(handle.ref_by_mc_seqno(), 456);
436 assert!(!handle.is_key_block());
437 assert!(handle.is_committed());
438 }
439
440 assert!(!block_handles.cache.contains_key(&block_id));
442
443 Ok(())
444 }
445}