tycho_core/storage/block_handle/
meta.rs1use std::sync::atomic::{AtomicU64, Ordering};
2
3use bytes::Buf;
4use tycho_storage::kv::{StoredValue, StoredValueBuffer};
5
6#[derive(Debug, Copy, Clone)]
7pub struct NewBlockMeta {
8 pub is_key_block: bool,
9 pub gen_utime: u32,
10 pub ref_by_mc_seqno: u32,
11}
12
13impl NewBlockMeta {
14 pub fn zero_state(gen_utime: u32, is_key_block: bool) -> Self {
15 Self {
16 is_key_block,
17 gen_utime,
18 ref_by_mc_seqno: 0,
19 }
20 }
21}
22
23#[derive(Debug, Default)]
24pub struct BlockMeta {
25 flags: AtomicU64,
26 gen_utime: u32,
27}
28
29#[derive(Debug, Clone, Copy)]
30pub struct LoadedBlockMeta {
31 pub flags: BlockFlags,
32 pub mc_ref_seqno: u32,
33 pub gen_utime: u32,
34}
35
36impl BlockMeta {
37 pub fn with_data(data: NewBlockMeta) -> Self {
38 const IS_KEY_BLOCK_MASK: u64 =
39 (BlockFlags::IS_KEY_BLOCK.bits() as u64) << BLOCK_FLAGS_OFFSET;
40
41 Self {
42 flags: AtomicU64::new(
43 if data.is_key_block {
44 IS_KEY_BLOCK_MASK
45 } else {
46 0
47 } | data.ref_by_mc_seqno as u64,
48 ),
49 gen_utime: data.gen_utime,
50 }
51 }
52
53 pub fn load(&self) -> LoadedBlockMeta {
54 let flags = self.flags.load(Ordering::Acquire);
55 LoadedBlockMeta {
56 flags: BlockFlags::from_bits_retain((flags >> BLOCK_FLAGS_OFFSET) as u32),
57 mc_ref_seqno: flags as u32,
58 gen_utime: self.gen_utime,
59 }
60 }
61
62 pub fn flags(&self) -> BlockFlags {
63 let flags = self.flags.load(Ordering::Acquire) >> BLOCK_FLAGS_OFFSET;
64 BlockFlags::from_bits_retain(flags as u32)
65 }
66
67 pub fn ref_by_mc_seqno(&self) -> u32 {
68 self.flags.load(Ordering::Acquire) as u32
69 }
70
71 pub fn gen_utime(&self) -> u32 {
72 self.gen_utime
73 }
74
75 pub fn add_flags(&self, flags: BlockFlags) -> bool {
76 let flags = (flags.bits() as u64) << BLOCK_FLAGS_OFFSET;
77 self.flags.fetch_or(flags, Ordering::Release) & flags != flags
78 }
79
80 pub fn remove_flags(&self, flags: BlockFlags) -> bool {
81 let flags = (flags.bits() as u64) << BLOCK_FLAGS_OFFSET;
82 self.flags.fetch_and(!flags, Ordering::Release) & flags != 0
83 }
84}
85
86impl StoredValue for BlockMeta {
87 const SIZE_HINT: usize = 8 + 4;
90
91 type OnStackSlice = [u8; Self::SIZE_HINT];
92
93 fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
94 let flags = self.flags.load(Ordering::Acquire);
95 buffer.write_raw_slice(&flags.to_le_bytes());
96 buffer.write_raw_slice(&self.gen_utime.to_le_bytes());
97 }
98
99 fn deserialize(reader: &mut &[u8]) -> Self
100 where
101 Self: Sized,
102 {
103 assert_eq!(reader.len(), Self::SIZE_HINT, "invalid block meta");
104 let flags = reader.get_u64_le();
105 let gen_utime = reader.get_u32_le();
106
107 Self {
108 flags: AtomicU64::new(flags),
109 gen_utime,
110 }
111 }
112}
113
114bitflags::bitflags! {
115 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
116 pub struct BlockFlags: u32 {
117 const HAS_DATA = 1 << 0;
119 const HAS_PROOF = 1 << 1;
120 const HAS_QUEUE_DIFF = 1 << 2;
121
122 const HAS_STATE = 1 << 3;
123 const HAS_PERSISTENT_SHARD_STATE = 1 << 4;
124 const HAS_PERSISTENT_QUEUE_STATE = 1 << 5;
125
126 const HAS_NEXT_1 = 1 << 6;
127 const HAS_NEXT_2 = 1 << 7;
128 const HAS_PREV_1 = 1 << 8;
129 const HAS_PREV_2 = 1 << 9;
130 const IS_COMMITTED = 1 << 10;
131 const IS_KEY_BLOCK = 1 << 11;
132 const IS_PERSISTENT = 1 << 12;
133
134 const IS_REMOVED = 1 << 15;
135
136 const HAS_ALL_BLOCK_PARTS =
138 Self::HAS_DATA.bits() | Self::HAS_PROOF.bits() | Self::HAS_QUEUE_DIFF.bits();
139 }
140}
141
142const BLOCK_FLAGS_OFFSET: usize = 32;
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147
148 #[test]
149 fn meta_store_load() {
150 let meta = BlockMeta::with_data(NewBlockMeta {
151 is_key_block: true,
152 gen_utime: 123456789,
153 ref_by_mc_seqno: 4311231,
154 });
155 assert_eq!(meta.flags(), BlockFlags::IS_KEY_BLOCK);
156 assert_eq!(meta.ref_by_mc_seqno(), 4311231);
157 assert_eq!(meta.gen_utime(), 123456789);
158
159 let stored = meta.to_vec();
160 assert_eq!(stored.len(), BlockMeta::SIZE_HINT);
161
162 let loaded = BlockMeta::from_slice(&stored);
163 assert_eq!(loaded.flags(), BlockFlags::IS_KEY_BLOCK);
164 assert_eq!(loaded.ref_by_mc_seqno(), 4311231);
165 assert_eq!(loaded.gen_utime(), 123456789);
166
167 let updated = meta.add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
168 assert!(updated);
169 assert_eq!(
170 meta.flags(),
171 BlockFlags::HAS_ALL_BLOCK_PARTS | BlockFlags::IS_KEY_BLOCK
172 );
173
174 meta.add_flags(BlockFlags::IS_REMOVED);
175 assert_eq!(
176 meta.flags(),
177 BlockFlags::IS_KEY_BLOCK | BlockFlags::HAS_ALL_BLOCK_PARTS | BlockFlags::IS_REMOVED
178 );
179 }
180}