tycho_core/storage/block_handle/
handle.rs

1use std::sync::{Arc, Weak};
2
3use tokio::sync::{Semaphore, SemaphorePermit};
4use tycho_types::models::*;
5
6use super::{BlockFlags, BlockHandleCache, BlockMeta};
7
8#[derive(Clone)]
9#[repr(transparent)]
10pub struct WeakBlockHandle {
11    inner: Weak<Inner>,
12}
13
14impl WeakBlockHandle {
15    pub fn strong_count(&self) -> usize {
16        self.inner.strong_count()
17    }
18
19    pub fn upgrade(&self) -> Option<BlockHandle> {
20        self.inner.upgrade().map(|inner| BlockHandle { inner })
21    }
22}
23
24#[derive(Clone)]
25#[repr(transparent)]
26pub struct BlockHandle {
27    inner: Arc<Inner>,
28}
29
30impl BlockHandle {
31    pub(crate) fn new(id: &BlockId, meta: BlockMeta, cache: Arc<BlockHandleCache>) -> Self {
32        Self {
33            inner: Arc::new(Inner {
34                id: *id,
35                meta,
36                block_data_lock: Default::default(),
37                proof_data_block: Default::default(),
38                queue_diff_data_lock: Default::default(),
39                cache,
40            }),
41        }
42    }
43
44    pub fn downgrade(&self) -> WeakBlockHandle {
45        WeakBlockHandle {
46            inner: Arc::downgrade(&self.inner),
47        }
48    }
49
50    pub fn id(&self) -> &BlockId {
51        &self.inner.id
52    }
53
54    pub fn is_masterchain(&self) -> bool {
55        self.inner.id.is_masterchain()
56    }
57
58    pub fn meta(&self) -> &BlockMeta {
59        &self.inner.meta
60    }
61
62    pub fn gen_utime(&self) -> u32 {
63        self.inner.meta.gen_utime()
64    }
65
66    pub fn is_key_block(&self) -> bool {
67        self.inner.meta.flags().contains(BlockFlags::IS_KEY_BLOCK)
68            || self.inner.id.is_masterchain() && self.inner.id.seqno == 0
69    }
70
71    pub fn is_committed(&self) -> bool {
72        self.inner.meta.flags().contains(BlockFlags::IS_COMMITTED)
73    }
74
75    pub fn is_persistent(&self) -> bool {
76        self.inner.meta.flags().contains(BlockFlags::IS_PERSISTENT) || self.inner.id.seqno == 0
77    }
78
79    pub fn has_data(&self) -> bool {
80        const MASK: u32 = BlockFlags::HAS_DATA.bits() | BlockFlags::IS_REMOVED.bits();
81        let flags = self.inner.meta.flags();
82        flags.bits() & MASK == BlockFlags::HAS_DATA.bits()
83    }
84
85    pub fn has_proof(&self) -> bool {
86        const MASK: u32 = BlockFlags::HAS_PROOF.bits() | BlockFlags::IS_REMOVED.bits();
87        let flags = self.inner.meta.flags();
88        flags.bits() & MASK == BlockFlags::HAS_PROOF.bits()
89    }
90
91    pub fn has_queue_diff(&self) -> bool {
92        const MASK: u32 = BlockFlags::HAS_QUEUE_DIFF.bits() | BlockFlags::IS_REMOVED.bits();
93        let flags = self.inner.meta.flags();
94        flags.bits() & MASK == BlockFlags::HAS_QUEUE_DIFF.bits()
95    }
96
97    pub fn has_all_block_parts(&self) -> bool {
98        const MASK: u32 = BlockFlags::HAS_ALL_BLOCK_PARTS.bits() | BlockFlags::IS_REMOVED.bits();
99        let flags = self.inner.meta.flags();
100        flags.bits() & MASK == BlockFlags::HAS_ALL_BLOCK_PARTS.bits()
101    }
102
103    pub fn has_next1(&self) -> bool {
104        self.inner.meta.flags().contains(BlockFlags::HAS_NEXT_1)
105    }
106
107    pub fn has_state(&self) -> bool {
108        self.inner.meta.flags().contains(BlockFlags::HAS_STATE)
109    }
110
111    pub fn has_persistent_shard_state(&self) -> bool {
112        self.inner
113            .meta
114            .flags()
115            .contains(BlockFlags::HAS_PERSISTENT_SHARD_STATE)
116    }
117
118    pub fn has_persistent_queue_state(&self) -> bool {
119        self.inner
120            .meta
121            .flags()
122            .contains(BlockFlags::HAS_PERSISTENT_QUEUE_STATE)
123    }
124
125    pub fn ref_by_mc_seqno(&self) -> u32 {
126        if self.inner.id.shard.is_masterchain() {
127            self.inner.id.seqno
128        } else {
129            self.inner.meta.ref_by_mc_seqno()
130        }
131    }
132
133    pub(crate) fn block_data_lock(&self) -> &BlockDataLock {
134        &self.inner.block_data_lock
135    }
136
137    pub(crate) fn proof_data_lock(&self) -> &BlockDataLock {
138        &self.inner.proof_data_block
139    }
140
141    pub(crate) fn queue_diff_data_lock(&self) -> &BlockDataLock {
142        &self.inner.queue_diff_data_lock
143    }
144}
145
146unsafe impl arc_swap::RefCnt for BlockHandle {
147    type Base = Inner;
148
149    fn into_ptr(me: Self) -> *mut Self::Base {
150        arc_swap::RefCnt::into_ptr(me.inner)
151    }
152
153    fn as_ptr(me: &Self) -> *mut Self::Base {
154        arc_swap::RefCnt::as_ptr(&me.inner)
155    }
156
157    unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
158        Self {
159            inner: unsafe { arc_swap::RefCnt::from_ptr(ptr) },
160        }
161    }
162}
163
164#[doc(hidden)]
165pub struct Inner {
166    id: BlockId,
167    meta: BlockMeta,
168    block_data_lock: BlockDataLock,
169    proof_data_block: BlockDataLock,
170    queue_diff_data_lock: BlockDataLock,
171    cache: Arc<BlockHandleCache>,
172}
173
174impl Drop for Inner {
175    fn drop(&mut self) {
176        self.cache
177            .remove_if(&self.id, |_, weak| weak.strong_count() == 0);
178    }
179}
180
181pub(crate) struct BlockDataLock {
182    semaphore: Semaphore,
183}
184
185impl BlockDataLock {
186    const fn new() -> Self {
187        Self {
188            semaphore: Semaphore::const_new(MAX_READS as usize),
189        }
190    }
191
192    pub async fn read(&self) -> BlockDataGuard<'_> {
193        BlockDataGuard(self.semaphore.acquire().await.unwrap_or_else(|_| {
194            // The semaphore was closed. but, we never explicitly close it.
195            unreachable!()
196        }))
197    }
198
199    pub async fn write(&self) -> BlockDataGuard<'_> {
200        BlockDataGuard(
201            self.semaphore
202                .acquire_many(MAX_READS)
203                .await
204                .unwrap_or_else(|_| {
205                    // The semaphore was closed. but, we never explicitly close it.
206                    unreachable!()
207                }),
208        )
209    }
210}
211
212impl Default for BlockDataLock {
213    fn default() -> Self {
214        Self::new()
215    }
216}
217
218pub(crate) struct BlockDataGuard<'a>(#[allow(unused)] SemaphorePermit<'a>);
219
/// Number of permits in every `BlockDataLock` semaphore, i.e. the maximum number of
/// concurrent readers.
///
/// Equal to `u32::MAX >> 3` (`2^29 - 1`), which stays within tokio's documented
/// `Semaphore::MAX_PERMITS` (`usize::MAX >> 3`) even on 32-bit targets.
const MAX_READS: u32 = (1 << 29) - 1;