Skip to main content

tycho_core/storage/block_handle/
meta.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2
3use bytes::Buf;
4use tycho_storage::kv::{StoredValue, StoredValueBuffer};
5
6#[derive(Debug, Copy, Clone)]
7pub struct NewBlockMeta {
8    pub is_key_block: bool,
9    pub gen_utime: u32,
10    pub ref_by_mc_seqno: u32,
11}
12
13impl NewBlockMeta {
14    pub fn zero_state(gen_utime: u32, is_key_block: bool) -> Self {
15        Self {
16            is_key_block,
17            gen_utime,
18            ref_by_mc_seqno: 0,
19        }
20    }
21}
22
23#[derive(Debug, Default)]
24pub struct BlockMeta {
25    flags: AtomicU64,
26    gen_utime: u32,
27}
28
29#[derive(Debug, Clone, Copy)]
30pub struct LoadedBlockMeta {
31    pub flags: BlockFlags,
32    pub mc_ref_seqno: u32,
33    pub gen_utime: u32,
34}
35
36impl BlockMeta {
37    pub fn with_data(data: NewBlockMeta) -> Self {
38        const IS_KEY_BLOCK_MASK: u64 =
39            (BlockFlags::IS_KEY_BLOCK.bits() as u64) << BLOCK_FLAGS_OFFSET;
40
41        Self {
42            flags: AtomicU64::new(
43                if data.is_key_block {
44                    IS_KEY_BLOCK_MASK
45                } else {
46                    0
47                } | data.ref_by_mc_seqno as u64,
48            ),
49            gen_utime: data.gen_utime,
50        }
51    }
52
53    pub fn load(&self) -> LoadedBlockMeta {
54        let flags = self.flags.load(Ordering::Acquire);
55        LoadedBlockMeta {
56            flags: BlockFlags::from_bits_retain((flags >> BLOCK_FLAGS_OFFSET) as u32),
57            mc_ref_seqno: flags as u32,
58            gen_utime: self.gen_utime,
59        }
60    }
61
62    pub fn flags(&self) -> BlockFlags {
63        let flags = self.flags.load(Ordering::Acquire) >> BLOCK_FLAGS_OFFSET;
64        BlockFlags::from_bits_retain(flags as u32)
65    }
66
67    pub fn ref_by_mc_seqno(&self) -> u32 {
68        self.flags.load(Ordering::Acquire) as u32
69    }
70
71    pub fn gen_utime(&self) -> u32 {
72        self.gen_utime
73    }
74
75    pub fn add_flags(&self, flags: BlockFlags) -> bool {
76        let flags = (flags.bits() as u64) << BLOCK_FLAGS_OFFSET;
77        self.flags.fetch_or(flags, Ordering::Release) & flags != flags
78    }
79
80    pub fn remove_flags(&self, flags: BlockFlags) -> bool {
81        let flags = (flags.bits() as u64) << BLOCK_FLAGS_OFFSET;
82        self.flags.fetch_and(!flags, Ordering::Release) & flags != 0
83    }
84}
85
86impl StoredValue for BlockMeta {
87    /// 8 bytes flags
88    /// 4 bytes `gen_utime`
89    const SIZE_HINT: usize = 8 + 4;
90
91    type OnStackSlice = [u8; Self::SIZE_HINT];
92
93    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
94        let flags = self.flags.load(Ordering::Acquire);
95        buffer.write_raw_slice(&flags.to_le_bytes());
96        buffer.write_raw_slice(&self.gen_utime.to_le_bytes());
97    }
98
99    fn deserialize(reader: &mut &[u8]) -> Self
100    where
101        Self: Sized,
102    {
103        assert_eq!(reader.len(), Self::SIZE_HINT, "invalid block meta");
104        let flags = reader.get_u64_le();
105        let gen_utime = reader.get_u32_le();
106
107        Self {
108            flags: AtomicU64::new(flags),
109            gen_utime,
110        }
111    }
112}
113
114bitflags::bitflags! {
115    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
116    pub struct BlockFlags: u32 {
117        // Persistent flags
118        const HAS_DATA = 1 << 0;
119        const HAS_PROOF = 1 << 1;
120        const HAS_QUEUE_DIFF = 1 << 2;
121
122        const HAS_STATE = 1 << 3;
123        const HAS_PERSISTENT_SHARD_STATE = 1 << 4;
124        const HAS_PERSISTENT_QUEUE_STATE = 1 << 5;
125
126        const HAS_NEXT_1 = 1 << 6;
127        const HAS_NEXT_2 = 1 << 7;
128        const HAS_PREV_1 = 1 << 8;
129        const HAS_PREV_2 = 1 << 9;
130        const IS_COMMITTED = 1 << 10;
131        const IS_KEY_BLOCK = 1 << 11;
132        const IS_PERSISTENT = 1 << 12;
133
134        const IS_REMOVED = 1 << 15;
135        const IS_ZEROSTATE = 1 << 16;
136
137        // Composite flags
138        const HAS_ALL_BLOCK_PARTS =
139            Self::HAS_DATA.bits() | Self::HAS_PROOF.bits() | Self::HAS_QUEUE_DIFF.bits();
140    }
141}
142
143const BLOCK_FLAGS_OFFSET: usize = 32;
144
145#[cfg(test)]
146mod tests {
147    use super::*;
148
149    #[test]
150    fn meta_store_load() {
151        let meta = BlockMeta::with_data(NewBlockMeta {
152            is_key_block: true,
153            gen_utime: 123456789,
154            ref_by_mc_seqno: 4311231,
155        });
156        assert_eq!(meta.flags(), BlockFlags::IS_KEY_BLOCK);
157        assert_eq!(meta.ref_by_mc_seqno(), 4311231);
158        assert_eq!(meta.gen_utime(), 123456789);
159
160        let stored = meta.to_vec();
161        assert_eq!(stored.len(), BlockMeta::SIZE_HINT);
162
163        let loaded = BlockMeta::from_slice(&stored);
164        assert_eq!(loaded.flags(), BlockFlags::IS_KEY_BLOCK);
165        assert_eq!(loaded.ref_by_mc_seqno(), 4311231);
166        assert_eq!(loaded.gen_utime(), 123456789);
167
168        let updated = meta.add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
169        assert!(updated);
170        assert_eq!(
171            meta.flags(),
172            BlockFlags::HAS_ALL_BLOCK_PARTS | BlockFlags::IS_KEY_BLOCK
173        );
174
175        meta.add_flags(BlockFlags::IS_REMOVED);
176        assert_eq!(
177            meta.flags(),
178            BlockFlags::IS_KEY_BLOCK | BlockFlags::HAS_ALL_BLOCK_PARTS | BlockFlags::IS_REMOVED
179        );
180    }
181}