Skip to main content

tycho_core/storage/block_handle/
handle.rs

1use std::sync::{Arc, Weak};
2
3use tokio::sync::{Semaphore, SemaphorePermit};
4use tycho_types::models::*;
5
6use super::{BlockFlags, BlockHandleCache, BlockMeta};
7
/// Non-owning counterpart of [`BlockHandle`].
///
/// Wraps a `Weak<Inner>`, so holding it does not keep the shared block state
/// alive. Use [`WeakBlockHandle::upgrade`] to try to obtain a strong handle.
#[derive(Clone)]
#[repr(transparent)]
pub struct WeakBlockHandle {
    /// Weak pointer to the state shared by the strong handles of a block.
    inner: Weak<Inner>,
}
13
14impl WeakBlockHandle {
15    pub fn strong_count(&self) -> usize {
16        self.inner.strong_count()
17    }
18
19    pub fn upgrade(&self) -> Option<BlockHandle> {
20        self.inner.upgrade().map(|inner| BlockHandle { inner })
21    }
22}
23
/// Reference-counted handle to a single block.
///
/// Cloning only clones the inner `Arc`, so all clones share the same block
/// id, metadata and data locks.
#[derive(Clone)]
#[repr(transparent)]
pub struct BlockHandle {
    /// Shared state of the block; see `Inner`.
    inner: Arc<Inner>,
}
29
30impl BlockHandle {
31    pub fn new(id: &BlockId, meta: BlockMeta, cache: Arc<BlockHandleCache>) -> Self {
32        Self {
33            inner: Arc::new(Inner {
34                id: *id,
35                meta,
36                block_data_lock: Default::default(),
37                proof_data_block: Default::default(),
38                queue_diff_data_lock: Default::default(),
39                cache,
40            }),
41        }
42    }
43
44    pub fn downgrade(&self) -> WeakBlockHandle {
45        WeakBlockHandle {
46            inner: Arc::downgrade(&self.inner),
47        }
48    }
49
50    pub fn id(&self) -> &BlockId {
51        &self.inner.id
52    }
53
54    pub fn is_masterchain(&self) -> bool {
55        self.inner.id.is_masterchain()
56    }
57
58    pub fn meta(&self) -> &BlockMeta {
59        &self.inner.meta
60    }
61
62    pub fn gen_utime(&self) -> u32 {
63        self.inner.meta.gen_utime()
64    }
65
66    pub fn is_key_block(&self) -> bool {
67        self.inner.meta.flags().contains(BlockFlags::IS_KEY_BLOCK)
68            || self.inner.id.is_masterchain() && self.inner.id.seqno == 0
69    }
70
71    pub fn is_committed(&self) -> bool {
72        self.inner.meta.flags().contains(BlockFlags::IS_COMMITTED)
73    }
74
75    pub fn is_persistent(&self) -> bool {
76        self.inner.meta.flags().contains(BlockFlags::IS_PERSISTENT) || self.inner.id.seqno == 0
77    }
78
79    pub fn is_zerostate(&self) -> bool {
80        self.inner.meta.flags().contains(BlockFlags::IS_ZEROSTATE) || self.inner.id.seqno == 0
81    }
82
83    pub fn has_data(&self) -> bool {
84        const MASK: u32 = BlockFlags::HAS_DATA.bits() | BlockFlags::IS_REMOVED.bits();
85        let flags = self.inner.meta.flags();
86        flags.bits() & MASK == BlockFlags::HAS_DATA.bits()
87    }
88
89    pub fn has_proof(&self) -> bool {
90        const MASK: u32 = BlockFlags::HAS_PROOF.bits() | BlockFlags::IS_REMOVED.bits();
91        let flags = self.inner.meta.flags();
92        flags.bits() & MASK == BlockFlags::HAS_PROOF.bits()
93    }
94
95    pub fn has_queue_diff(&self) -> bool {
96        const MASK: u32 = BlockFlags::HAS_QUEUE_DIFF.bits() | BlockFlags::IS_REMOVED.bits();
97        let flags = self.inner.meta.flags();
98        flags.bits() & MASK == BlockFlags::HAS_QUEUE_DIFF.bits()
99    }
100
101    pub fn has_all_block_parts(&self) -> bool {
102        const MASK: u32 = BlockFlags::HAS_ALL_BLOCK_PARTS.bits() | BlockFlags::IS_REMOVED.bits();
103        let flags = self.inner.meta.flags();
104        flags.bits() & MASK == BlockFlags::HAS_ALL_BLOCK_PARTS.bits()
105    }
106
107    pub fn has_next1(&self) -> bool {
108        self.inner.meta.flags().contains(BlockFlags::HAS_NEXT_1)
109    }
110
111    pub fn has_state(&self) -> bool {
112        self.inner.meta.flags().contains(BlockFlags::HAS_STATE)
113    }
114
115    pub fn has_persistent_shard_state(&self) -> bool {
116        self.inner
117            .meta
118            .flags()
119            .contains(BlockFlags::HAS_PERSISTENT_SHARD_STATE)
120    }
121
122    pub fn has_persistent_queue_state(&self) -> bool {
123        self.inner
124            .meta
125            .flags()
126            .contains(BlockFlags::HAS_PERSISTENT_QUEUE_STATE)
127    }
128
129    pub fn ref_by_mc_seqno(&self) -> u32 {
130        if self.inner.id.shard.is_masterchain() {
131            self.inner.id.seqno
132        } else {
133            self.inner.meta.ref_by_mc_seqno()
134        }
135    }
136
137    pub(crate) fn block_data_lock(&self) -> &BlockDataLock {
138        &self.inner.block_data_lock
139    }
140
141    pub(crate) fn proof_data_lock(&self) -> &BlockDataLock {
142        &self.inner.proof_data_block
143    }
144
145    pub(crate) fn queue_diff_data_lock(&self) -> &BlockDataLock {
146        &self.inner.queue_diff_data_lock
147    }
148}
149
// Implements `arc_swap::RefCnt` for `BlockHandle` by forwarding every method
// to the `RefCnt` implementation of the wrapped `Arc<Inner>`.
//
// SAFETY: no pointer is created or modified here; each method passes its
// argument straight to the `Arc<Inner>` implementation and wraps or unwraps
// the single `inner` field, so this impl relies on that implementation
// upholding the trait's contract.
// NOTE(review): confirm against the `arc_swap::RefCnt` documentation that
// plain delegation is sufficient for a wrapper type.
unsafe impl arc_swap::RefCnt for BlockHandle {
    /// Pointee type of the raw pointers exchanged through this trait.
    type Base = Inner;

    /// Consumes the handle and returns the raw pointer produced by the inner
    /// `Arc<Inner>`.
    fn into_ptr(me: Self) -> *mut Self::Base {
        arc_swap::RefCnt::into_ptr(me.inner)
    }

    /// Returns the raw pointer of the inner `Arc<Inner>` without consuming
    /// the handle.
    fn as_ptr(me: &Self) -> *mut Self::Base {
        arc_swap::RefCnt::as_ptr(&me.inner)
    }

    /// Rebuilds a handle from a raw pointer by rebuilding the inner
    /// `Arc<Inner>` from it.
    ///
    /// # Safety
    ///
    /// The caller's obligations are forwarded unchanged to the `Arc<Inner>`
    /// implementation of `RefCnt::from_ptr`.
    /// NOTE(review): presumably `ptr` must come from `into_ptr` above —
    /// confirm with the `arc_swap` docs.
    unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
        Self {
            // SAFETY: `ptr` is passed through untouched, so the requirements
            // are exactly those the caller of this `unsafe fn` must meet.
            inner: unsafe { arc_swap::RefCnt::from_ptr(ptr) },
        }
    }
}
167
/// State shared by all [`BlockHandle`] clones of one block.
///
/// NOTE(review): presumably `pub` + `#[doc(hidden)]` only because it is
/// exposed as `arc_swap::RefCnt::Base` for `BlockHandle` — confirm.
#[doc(hidden)]
pub struct Inner {
    /// Id of the block.
    id: BlockId,
    /// Metadata of the block (flags, `gen_utime`, `ref_by_mc_seqno`).
    meta: BlockMeta,
    /// Lock returned by `BlockHandle::block_data_lock`.
    block_data_lock: BlockDataLock,
    /// Lock returned by `BlockHandle::proof_data_lock`.
    // NOTE(review): the name ends in `_block` while the sibling fields and the
    // accessor end in `_lock`; looks like a typo for `proof_data_lock`.
    proof_data_block: BlockDataLock,
    /// Lock returned by `BlockHandle::queue_diff_data_lock`.
    queue_diff_data_lock: BlockDataLock,
    /// Cache that is asked to drop this block's entry when `Inner` is dropped.
    cache: Arc<BlockHandleCache>,
}
177
178impl Drop for Inner {
179    fn drop(&mut self) {
180        self.cache
181            .remove_if(&self.id, |_, weak| weak.strong_count() == 0);
182    }
183}
184
/// Async read/write lock built on a tokio `Semaphore`.
///
/// The semaphore starts with `MAX_READS` permits; `read` takes one of them
/// and `write` takes all of them.
pub(crate) struct BlockDataLock {
    /// Permit pool backing the lock.
    semaphore: Semaphore,
}
188
189impl BlockDataLock {
190    const fn new() -> Self {
191        Self {
192            semaphore: Semaphore::const_new(MAX_READS as usize),
193        }
194    }
195
196    pub async fn read(&self) -> BlockDataGuard<'_> {
197        BlockDataGuard(self.semaphore.acquire().await.unwrap_or_else(|_| {
198            // The semaphore was closed. but, we never explicitly close it.
199            unreachable!()
200        }))
201    }
202
203    pub async fn write(&self) -> BlockDataGuard<'_> {
204        BlockDataGuard(
205            self.semaphore
206                .acquire_many(MAX_READS)
207                .await
208                .unwrap_or_else(|_| {
209                    // The semaphore was closed. but, we never explicitly close it.
210                    unreachable!()
211                }),
212        )
213    }
214}
215
216impl Default for BlockDataLock {
217    fn default() -> Self {
218        Self::new()
219    }
220}
221
/// Guard returned by `BlockDataLock::read` and `BlockDataLock::write`.
///
/// It only owns the acquired semaphore permit(s). The field is never read
/// (hence `#[allow(unused)]`); it exists so the permit lives exactly as long
/// as the guard and is given back to the semaphore when the guard is dropped.
pub(crate) struct BlockDataGuard<'a>(#[allow(unused)] SemaphorePermit<'a>);
223
/// Number of permits in every `BlockDataLock` semaphore: a reader takes one
/// permit and a writer takes all `MAX_READS` of them.
// NOTE(review): `u32::MAX >> 3` presumably mirrors tokio's
// `Semaphore::MAX_PERMITS` limit (`usize::MAX >> 3`) so the value is also
// valid on 32-bit targets — confirm.
const MAX_READS: u32 = u32::MAX >> 3;