tycho_core/storage/block_handle/handle.rs

use std::sync::{Arc, Weak};

use tokio::sync::{Semaphore, SemaphorePermit};
use tycho_types::models::*;

use super::{BlockFlags, BlockHandleCache, BlockMeta};

/// A weak, non-owning reference to a [`BlockHandle`].
///
/// The handle cache stores these so that entries can be evicted once the
/// last strong handle is dropped (see the [`Drop`] impl on [`Inner`]).
#[derive(Clone)]
#[repr(transparent)]
pub struct WeakBlockHandle {
    inner: Weak<Inner>,
}

impl WeakBlockHandle {
    pub fn strong_count(&self) -> usize {
        self.inner.strong_count()
    }

    pub fn upgrade(&self) -> Option<BlockHandle> {
        self.inner.upgrade().map(|inner| BlockHandle { inner })
    }
}
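
// A minimal sketch of how the weak/strong pair is meant to be used; the
// `entry` variable is a hypothetical value taken from a `BlockHandleCache`:
//
//     fn try_reuse(entry: &WeakBlockHandle) -> Option<BlockHandle> {
//         // Succeeds only while some strong `BlockHandle` is still alive.
//         entry.upgrade()
//     }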

/// A strong, cheaply cloneable handle to a block's id, metadata and
/// per-part data locks.
#[derive(Clone)]
#[repr(transparent)]
pub struct BlockHandle {
    inner: Arc<Inner>,
}

impl BlockHandle {
    pub fn new(id: &BlockId, meta: BlockMeta, cache: Arc<BlockHandleCache>) -> Self {
        Self {
            inner: Arc::new(Inner {
                id: *id,
                meta,
                block_data_lock: Default::default(),
                proof_data_lock: Default::default(),
                queue_diff_data_lock: Default::default(),
                cache,
            }),
        }
    }

    pub fn downgrade(&self) -> WeakBlockHandle {
        WeakBlockHandle {
            inner: Arc::downgrade(&self.inner),
        }
    }

    pub fn id(&self) -> &BlockId {
        &self.inner.id
    }

    pub fn is_masterchain(&self) -> bool {
        self.inner.id.is_masterchain()
    }

    pub fn meta(&self) -> &BlockMeta {
        &self.inner.meta
    }

    pub fn gen_utime(&self) -> u32 {
        self.inner.meta.gen_utime()
    }

    pub fn is_key_block(&self) -> bool {
        self.inner.meta.flags().contains(BlockFlags::IS_KEY_BLOCK)
            || self.inner.id.is_masterchain() && self.inner.id.seqno == 0
    }

    pub fn is_committed(&self) -> bool {
        self.inner.meta.flags().contains(BlockFlags::IS_COMMITTED)
    }

    pub fn is_persistent(&self) -> bool {
        self.inner.meta.flags().contains(BlockFlags::IS_PERSISTENT) || self.inner.id.seqno == 0
    }

    pub fn is_zerostate(&self) -> bool {
        self.inner.meta.flags().contains(BlockFlags::IS_ZEROSTATE) || self.inner.id.seqno == 0
    }

    pub fn skip_states_gc(&self) -> bool {
        let flags = self.inner.meta.flags();
        flags.contains(BlockFlags::SKIP_STATES_GC)
            && !flags.contains(BlockFlags::SKIP_STATES_GC_FINISHED)
    }

    pub fn skip_blocks_gc(&self) -> bool {
        let flags = self.inner.meta.flags();
        flags.contains(BlockFlags::SKIP_BLOCKS_GC)
            && !flags.contains(BlockFlags::SKIP_BLOCKS_GC_FINISHED)
    }

    pub fn has_data(&self) -> bool {
        const MASK: u32 = BlockFlags::HAS_DATA.bits() | BlockFlags::IS_REMOVED.bits();
        let flags = self.inner.meta.flags();
        flags.bits() & MASK == BlockFlags::HAS_DATA.bits()
    }

    pub fn has_proof(&self) -> bool {
        const MASK: u32 = BlockFlags::HAS_PROOF.bits() | BlockFlags::IS_REMOVED.bits();
        let flags = self.inner.meta.flags();
        flags.bits() & MASK == BlockFlags::HAS_PROOF.bits()
    }

    pub fn has_queue_diff(&self) -> bool {
        const MASK: u32 = BlockFlags::HAS_QUEUE_DIFF.bits() | BlockFlags::IS_REMOVED.bits();
        let flags = self.inner.meta.flags();
        flags.bits() & MASK == BlockFlags::HAS_QUEUE_DIFF.bits()
    }

    pub fn has_all_block_parts(&self) -> bool {
        const MASK: u32 = BlockFlags::HAS_ALL_BLOCK_PARTS.bits() | BlockFlags::IS_REMOVED.bits();
        let flags = self.inner.meta.flags();
        flags.bits() & MASK == BlockFlags::HAS_ALL_BLOCK_PARTS.bits()
    }
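
    // The masked comparisons above check two bits at once: the target flag
    // must be set while `IS_REMOVED` must be clear. Written out explicitly,
    // `has_data` is equivalent to this sketch:
    //
    //     let flags = handle.meta().flags();
    //     let has_data = flags.contains(BlockFlags::HAS_DATA)
    //         && !flags.contains(BlockFlags::IS_REMOVED);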

    pub fn has_next1(&self) -> bool {
        self.inner.meta.flags().contains(BlockFlags::HAS_NEXT_1)
    }

    pub fn has_state(&self) -> bool {
        self.inner.meta.flags().contains(BlockFlags::HAS_STATE)
    }

    pub fn has_virtual_state(&self) -> bool {
        self.inner
            .meta
            .flags()
            .contains(BlockFlags::HAS_VIRTUAL_STATE)
    }

    pub fn has_persistent_shard_state(&self) -> bool {
        self.inner
            .meta
            .flags()
            .contains(BlockFlags::HAS_PERSISTENT_SHARD_STATE)
    }

    pub fn has_persistent_queue_state(&self) -> bool {
        self.inner
            .meta
            .flags()
            .contains(BlockFlags::HAS_PERSISTENT_QUEUE_STATE)
    }

    pub fn ref_by_mc_seqno(&self) -> u32 {
        if self.inner.id.shard.is_masterchain() {
            self.inner.id.seqno
        } else {
            self.inner.meta.ref_by_mc_seqno()
        }
    }

    pub(crate) fn block_data_lock(&self) -> &BlockDataLock {
        &self.inner.block_data_lock
    }

    pub(crate) fn proof_data_lock(&self) -> &BlockDataLock {
        &self.inner.proof_data_lock
    }

    pub(crate) fn queue_diff_data_lock(&self) -> &BlockDataLock {
        &self.inner.queue_diff_data_lock
    }
}

// SAFETY: `BlockHandle` is a `#[repr(transparent)]` wrapper around `Arc<Inner>`,
// so all reference counting can be delegated to the inner `Arc`.
unsafe impl arc_swap::RefCnt for BlockHandle {
    type Base = Inner;

    fn into_ptr(me: Self) -> *mut Self::Base {
        arc_swap::RefCnt::into_ptr(me.inner)
    }

    fn as_ptr(me: &Self) -> *mut Self::Base {
        arc_swap::RefCnt::as_ptr(&me.inner)
    }

    unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
        Self {
            inner: unsafe { arc_swap::RefCnt::from_ptr(ptr) },
        }
    }
}
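
// Implementing `RefCnt` lets a `BlockHandle` live directly inside an
// `arc_swap` container without a second `Arc` around it. A minimal usage
// sketch (the variable names are illustrative):
//
//     let slot = arc_swap::ArcSwapAny::new(handle.clone());
//     let current: BlockHandle = slot.load_full(); // cheap atomic snapshot
//     slot.store(handle);                          // atomic replacement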

#[doc(hidden)]
pub struct Inner {
    id: BlockId,
    meta: BlockMeta,
    block_data_lock: BlockDataLock,
    proof_data_lock: BlockDataLock,
    queue_diff_data_lock: BlockDataLock,
    cache: Arc<BlockHandleCache>,
}

impl Drop for Inner {
    fn drop(&mut self) {
        // Evict the cache entry for this block, but only while it still has
        // no live strong handles; a fresh handle for the same id may have
        // been inserted concurrently and must not be removed.
        self.cache
            .remove_if(&self.id, |_, weak| weak.strong_count() == 0);
    }
}
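
// Sketch of the resulting lifecycle; the `insert` call on the cache is an
// assumed API, not the actual storage code:
//
//     let handle = BlockHandle::new(&id, meta, cache.clone());
//     cache.insert(id, handle.downgrade());
//     drop(handle); // last strong ref: `Inner::drop` evicts the entry above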

/// An async read/write lock built on a single semaphore: each reader takes
/// one permit, while a writer takes all [`MAX_READS`] permits at once.
pub(crate) struct BlockDataLock {
    semaphore: Semaphore,
}

impl BlockDataLock {
    const fn new() -> Self {
        Self {
            semaphore: Semaphore::const_new(MAX_READS as usize),
        }
    }

    pub async fn read(&self) -> BlockDataGuard<'_> {
        // The semaphore is never closed, so `acquire` cannot fail.
        BlockDataGuard(self.semaphore.acquire().await.unwrap_or_else(|_| {
            unreachable!()
        }))
    }

    pub async fn write(&self) -> BlockDataGuard<'_> {
        BlockDataGuard(
            self.semaphore
                .acquire_many(MAX_READS)
                .await
                .unwrap_or_else(|_| {
                    unreachable!()
                }),
        )
    }
}
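
// Usage sketch: reads share the lock, while a write drains every permit and
// therefore waits for all readers to finish (`lock` is a hypothetical local):
//
//     let a = lock.read().await;  // takes 1 of MAX_READS permits
//     let b = lock.read().await;  // concurrent readers are fine
//     drop((a, b));
//     let w = lock.write().await; // takes all MAX_READS permits: exclusive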

impl Default for BlockDataLock {
    fn default() -> Self {
        Self::new()
    }
}

pub(crate) struct BlockDataGuard<'a>(#[allow(unused)] SemaphorePermit<'a>);

/// Maximum number of concurrent readers. Uses the same `>> 3` headroom as
/// tokio's `Semaphore::MAX_PERMITS` (`usize::MAX >> 3`), so the permit count
/// always stays within the semaphore's limit.
const MAX_READS: u32 = u32::MAX >> 3;