tycho_core/storage/block_connection/
mod.rs1use parking_lot::RwLock;
2use tycho_storage::kv::StoredValue;
3use tycho_types::models::*;
4
5use super::block_handle::{BlockFlags, BlockHandle};
6use super::db::CoreDb;
7use super::tables;
8use super::util::{SlotSubscriptions, read_block_id_le, write_block_id_le};
9
/// Storage for directed links between blocks (see [`BlockConnection`]).
///
/// Links are written to the `block_connections` table of the underlying
/// database, and the owning block handle's flags are updated to match.
pub struct BlockConnectionStorage {
    /// Database handle; this type uses its `block_connections` and
    /// `block_handles` tables and the raw RocksDB instance.
    db: CoreDb,
    /// Subscriptions keyed by a block id; subscribed to in `wait_for_next1`
    /// and notified from `store_connection` when a `Next1` link is stored.
    next1_subscriptions: SlotSubscriptions<BlockId, BlockId>,
    /// Data-less lock used purely for ordering: `wait_for_next1` takes it for
    /// writing around its "lookup, then subscribe" step, while
    /// `store_connection` takes it for reading around "insert, then notify"
    /// for `Next1` links.
    store_next1: RwLock<()>,
}
16
17impl BlockConnectionStorage {
18 pub fn new(db: CoreDb) -> Self {
19 Self {
20 db,
21 next1_subscriptions: Default::default(),
22 store_next1: RwLock::new(()),
23 }
24 }
25
26 pub async fn wait_for_next1(&self, prev_block_id: &BlockId) -> BlockId {
27 let guard = self.store_next1.write();
28
29 if let Some(block_id) = self.load_connection(prev_block_id, BlockConnection::Next1) {
31 return block_id;
32 }
33
34 let rx = self.next1_subscriptions.subscribe(prev_block_id);
36
37 drop(guard);
39
40 rx.await
41 }
42
43 pub fn store_connection(
44 &self,
45 handle: &BlockHandle,
46 direction: BlockConnection,
47 connected_block_id: &BlockId,
48 ) {
49 let is_next1 = direction == BlockConnection::Next1;
50 let guard = if is_next1 {
51 Some(self.store_next1.read())
52 } else {
53 None
54 };
55
56 let store = {
57 let flag = direction.as_flag();
58 if handle.meta().flags().contains(flag) {
59 return;
60 }
61
62 let key = make_key(handle.id(), direction);
63 self.db
64 .block_connections
65 .insert(key, write_block_id_le(connected_block_id))
66 .unwrap();
67
68 handle.meta().add_flags(flag)
69 };
70
71 if is_next1 {
72 self.next1_subscriptions
73 .notify(handle.id(), connected_block_id);
74 }
75
76 drop(guard);
77
78 if !store {
79 return;
80 }
81
82 let id = handle.id();
83
84 let block_handle_cf = &self.db.block_handles.cf();
85 let rocksdb = self.db.rocksdb();
86
87 rocksdb
88 .merge_cf_opt(
89 block_handle_cf,
90 id.root_hash.as_slice(),
91 handle.meta().to_vec(),
92 self.db.block_handles.write_config(),
93 )
94 .unwrap();
95 }
96
97 pub fn load_connection(
98 &self,
99 block_id: &BlockId,
100 direction: BlockConnection,
101 ) -> Option<BlockId> {
102 let key = make_key(block_id, direction);
103 let data = self.db.block_connections.get(key).unwrap()?;
104 Some(read_block_id_le(&data))
105 }
106}
107
/// Kind of link between two blocks.
///
/// The discriminant is written as the last byte of the database key built by
/// `make_key`, so the numeric values must stay as they are.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum BlockConnection {
    /// First previous block.
    Prev1 = 0,
    /// Second previous block.
    Prev2 = 1,
    /// First next block.
    Next1 = 2,
    /// Second next block.
    Next2 = 3,
}
116
117impl BlockConnection {
118 const fn as_flag(self) -> BlockFlags {
119 match self {
120 Self::Prev1 => BlockFlags::HAS_PREV_1,
121 Self::Prev2 => BlockFlags::HAS_PREV_2,
122 Self::Next1 => BlockFlags::HAS_NEXT_1,
123 Self::Next2 => BlockFlags::HAS_NEXT_2,
124 }
125 }
126}
127
128fn make_key(block_id: &BlockId, ty: BlockConnection) -> [u8; tables::BlockConnections::KEY_LEN] {
129 let mut key = [0u8; tables::BlockConnections::KEY_LEN];
130 key[..4].copy_from_slice(&block_id.shard.workchain().to_be_bytes());
131 key[4..12].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
132 key[12..16].copy_from_slice(&block_id.seqno.to_be_bytes());
133 key[16..48].copy_from_slice(block_id.root_hash.as_slice());
134 key[48] = ty as u8;
135 key
136}