Skip to main content

tycho_core/storage/block_handle/
handle.rs

1use std::sync::{Arc, Weak};
2
3use tokio::sync::{Semaphore, SemaphorePermit};
4use tycho_types::models::*;
5
6use super::{BlockFlags, BlockHandleCache, BlockMeta};
7
8#[derive(Clone)]
9#[repr(transparent)]
10pub struct WeakBlockHandle {
11    inner: Weak<Inner>,
12}
13
14impl WeakBlockHandle {
15    pub fn strong_count(&self) -> usize {
16        self.inner.strong_count()
17    }
18
19    pub fn upgrade(&self) -> Option<BlockHandle> {
20        self.inner.upgrade().map(|inner| BlockHandle { inner })
21    }
22}
23
24#[derive(Clone)]
25#[repr(transparent)]
26pub struct BlockHandle {
27    inner: Arc<Inner>,
28}
29
30impl BlockHandle {
31    pub fn new(id: &BlockId, meta: BlockMeta, cache: Arc<BlockHandleCache>) -> Self {
32        Self {
33            inner: Arc::new(Inner {
34                id: *id,
35                meta,
36                block_data_lock: Default::default(),
37                proof_data_block: Default::default(),
38                queue_diff_data_lock: Default::default(),
39                cache,
40            }),
41        }
42    }
43
44    pub fn downgrade(&self) -> WeakBlockHandle {
45        WeakBlockHandle {
46            inner: Arc::downgrade(&self.inner),
47        }
48    }
49
50    pub fn id(&self) -> &BlockId {
51        &self.inner.id
52    }
53
54    pub fn is_masterchain(&self) -> bool {
55        self.inner.id.is_masterchain()
56    }
57
58    pub fn meta(&self) -> &BlockMeta {
59        &self.inner.meta
60    }
61
62    pub fn gen_utime(&self) -> u32 {
63        self.inner.meta.gen_utime()
64    }
65
66    pub fn is_key_block(&self) -> bool {
67        self.inner.meta.flags().contains(BlockFlags::IS_KEY_BLOCK)
68            || self.inner.id.is_masterchain() && self.inner.id.seqno == 0
69    }
70
71    pub fn is_committed(&self) -> bool {
72        self.inner.meta.flags().contains(BlockFlags::IS_COMMITTED)
73    }
74
75    pub fn is_persistent(&self) -> bool {
76        self.inner.meta.flags().contains(BlockFlags::IS_PERSISTENT) || self.inner.id.seqno == 0
77    }
78
79    pub fn is_zerostate(&self) -> bool {
80        self.inner.meta.flags().contains(BlockFlags::IS_ZEROSTATE) || self.inner.id.seqno == 0
81    }
82
83    pub fn skip_states_gc(&self) -> bool {
84        let flags = self.inner.meta.flags();
85        flags.contains(BlockFlags::SKIP_STATES_GC)
86            && !flags.contains(BlockFlags::SKIP_STATES_GC_FINISHED)
87    }
88
89    pub fn skip_blocks_gc(&self) -> bool {
90        let flags = self.inner.meta.flags();
91        flags.contains(BlockFlags::SKIP_BLOCKS_GC)
92            && !flags.contains(BlockFlags::SKIP_BLOCKS_GC_FINISHED)
93    }
94
95    pub fn has_data(&self) -> bool {
96        const MASK: u32 = BlockFlags::HAS_DATA.bits() | BlockFlags::IS_REMOVED.bits();
97        let flags = self.inner.meta.flags();
98        flags.bits() & MASK == BlockFlags::HAS_DATA.bits()
99    }
100
101    pub fn has_proof(&self) -> bool {
102        const MASK: u32 = BlockFlags::HAS_PROOF.bits() | BlockFlags::IS_REMOVED.bits();
103        let flags = self.inner.meta.flags();
104        flags.bits() & MASK == BlockFlags::HAS_PROOF.bits()
105    }
106
107    pub fn has_queue_diff(&self) -> bool {
108        const MASK: u32 = BlockFlags::HAS_QUEUE_DIFF.bits() | BlockFlags::IS_REMOVED.bits();
109        let flags = self.inner.meta.flags();
110        flags.bits() & MASK == BlockFlags::HAS_QUEUE_DIFF.bits()
111    }
112
113    pub fn has_all_block_parts(&self) -> bool {
114        const MASK: u32 = BlockFlags::HAS_ALL_BLOCK_PARTS.bits() | BlockFlags::IS_REMOVED.bits();
115        let flags = self.inner.meta.flags();
116        flags.bits() & MASK == BlockFlags::HAS_ALL_BLOCK_PARTS.bits()
117    }
118
119    pub fn has_next1(&self) -> bool {
120        self.inner.meta.flags().contains(BlockFlags::HAS_NEXT_1)
121    }
122
123    pub fn has_state(&self) -> bool {
124        self.inner.meta.flags().contains(BlockFlags::HAS_STATE)
125    }
126
127    pub fn has_virtual_state(&self) -> bool {
128        self.inner
129            .meta
130            .flags()
131            .contains(BlockFlags::HAS_VIRTUAL_STATE)
132    }
133
134    pub fn has_persistent_shard_state(&self) -> bool {
135        self.inner
136            .meta
137            .flags()
138            .contains(BlockFlags::HAS_PERSISTENT_SHARD_STATE)
139    }
140
141    pub fn has_persistent_queue_state(&self) -> bool {
142        self.inner
143            .meta
144            .flags()
145            .contains(BlockFlags::HAS_PERSISTENT_QUEUE_STATE)
146    }
147
148    pub fn ref_by_mc_seqno(&self) -> u32 {
149        if self.inner.id.shard.is_masterchain() {
150            self.inner.id.seqno
151        } else {
152            self.inner.meta.ref_by_mc_seqno()
153        }
154    }
155
156    pub(crate) fn block_data_lock(&self) -> &BlockDataLock {
157        &self.inner.block_data_lock
158    }
159
160    pub(crate) fn proof_data_lock(&self) -> &BlockDataLock {
161        &self.inner.proof_data_block
162    }
163
164    pub(crate) fn queue_diff_data_lock(&self) -> &BlockDataLock {
165        &self.inner.queue_diff_data_lock
166    }
167}
168
169unsafe impl arc_swap::RefCnt for BlockHandle {
170    type Base = Inner;
171
172    fn into_ptr(me: Self) -> *mut Self::Base {
173        arc_swap::RefCnt::into_ptr(me.inner)
174    }
175
176    fn as_ptr(me: &Self) -> *mut Self::Base {
177        arc_swap::RefCnt::as_ptr(&me.inner)
178    }
179
180    unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
181        Self {
182            inner: unsafe { arc_swap::RefCnt::from_ptr(ptr) },
183        }
184    }
185}
186
187#[doc(hidden)]
188pub struct Inner {
189    id: BlockId,
190    meta: BlockMeta,
191    block_data_lock: BlockDataLock,
192    proof_data_block: BlockDataLock,
193    queue_diff_data_lock: BlockDataLock,
194    cache: Arc<BlockHandleCache>,
195}
196
197impl Drop for Inner {
198    fn drop(&mut self) {
199        self.cache
200            .remove_if(&self.id, |_, weak| weak.strong_count() == 0);
201    }
202}
203
204pub(crate) struct BlockDataLock {
205    semaphore: Semaphore,
206}
207
208impl BlockDataLock {
209    const fn new() -> Self {
210        Self {
211            semaphore: Semaphore::const_new(MAX_READS as usize),
212        }
213    }
214
215    pub async fn read(&self) -> BlockDataGuard<'_> {
216        BlockDataGuard(self.semaphore.acquire().await.unwrap_or_else(|_| {
217            // The semaphore was closed. but, we never explicitly close it.
218            unreachable!()
219        }))
220    }
221
222    pub async fn write(&self) -> BlockDataGuard<'_> {
223        BlockDataGuard(
224            self.semaphore
225                .acquire_many(MAX_READS)
226                .await
227                .unwrap_or_else(|_| {
228                    // The semaphore was closed. but, we never explicitly close it.
229                    unreachable!()
230                }),
231        )
232    }
233}
234
235impl Default for BlockDataLock {
236    fn default() -> Self {
237        Self::new()
238    }
239}
240
241pub(crate) struct BlockDataGuard<'a>(#[allow(unused)] SemaphorePermit<'a>);
242
/// Total number of permits in a `BlockDataLock` semaphore.
///
/// A reader takes one permit and a writer takes all of them, so this is also
/// the maximum number of concurrent readers. The `>> 3` keeps the value within
/// tokio's `Semaphore::MAX_PERMITS` (`usize::MAX >> 3`) on every pointer width
/// of at least 32 bits.
const MAX_READS: u32 = u32::MAX >> 3;