tycho_core/storage/node_state/
mod.rs1use std::cmp::Ordering;
2use std::sync::Arc;
3
4use arc_swap::ArcSwapOption;
5use bytes::Bytes;
6use tycho_storage::kv::InstanceId;
7use tycho_types::models::*;
8use tycho_types::prelude::*;
9use tycho_util::FastHasherState;
10use weedb::rocksdb;
11
12use super::CoreDb;
13use super::util::{read_block_id_le, write_block_id_le};
14use crate::global_config::ZerostateId;
15
16pub struct NodeStateStorage {
17 db: CoreDb,
18 last_mc_block_id: BlockIdCache,
19 init_mc_block_id: BlockIdCache,
20 zerostate_id: ArcSwapOption<ZerostateId>,
21 zerostate_proof: ArcSwapOption<Cell>,
22 zerostate_proof_bytes: ArcSwapOption<Bytes>,
23}
24
25pub enum NodeSyncState {
26 PersistentState,
27 Blocks,
28}
29
30impl NodeStateStorage {
31 pub fn new(db: CoreDb) -> Self {
32 let this = Self {
33 db,
34 last_mc_block_id: (Default::default(), LAST_MC_BLOCK_ID),
35 init_mc_block_id: (Default::default(), INIT_MC_BLOCK_ID),
36 zerostate_id: Default::default(),
37 zerostate_proof: Default::default(),
38 zerostate_proof_bytes: Default::default(),
39 };
40
41 let state = &this.db.state;
42 if state.get(INSTANCE_ID).unwrap().is_none() {
43 state
44 .insert(INSTANCE_ID, rand::random::<InstanceId>())
45 .unwrap();
46 }
47
48 this
49 }
50
51 pub fn get_node_sync_state(&self) -> Option<NodeSyncState> {
52 let init = self.load_init_mc_block_id()?;
53 let last = self.load_last_mc_block_id()?;
54
55 match last.seqno.cmp(&init.seqno) {
56 Ordering::Equal => Some(NodeSyncState::PersistentState),
57 Ordering::Greater => Some(NodeSyncState::Blocks),
58 Ordering::Less => None,
59 }
60 }
61
62 pub fn store_last_mc_block_id(&self, id: &BlockId) {
63 self.store_block_id(&self.last_mc_block_id, id);
64 }
65
66 pub fn load_last_mc_block_id(&self) -> Option<BlockId> {
67 self.load_block_id(&self.last_mc_block_id)
68 }
69
70 pub fn store_init_mc_block_id(&self, id: &BlockId) {
71 self.store_block_id(&self.init_mc_block_id, id);
72 }
73
74 pub fn load_init_mc_block_id(&self) -> Option<BlockId> {
75 self.load_block_id(&self.init_mc_block_id)
76 }
77
78 pub fn store_zerostate_info(&self, zerostate_id: &ZerostateId, zerostate_proof: &Cell) {
79 let node_states = &self.db.state;
80
81 let mut value = Vec::with_capacity(256);
82
83 let mut batch = rocksdb::WriteBatch::default();
84
85 value.extend_from_slice(&zerostate_id.seqno.to_le_bytes());
87 value.extend_from_slice(zerostate_id.root_hash.as_slice());
88 value.extend_from_slice(zerostate_id.file_hash.as_slice());
89 batch.put_cf(&node_states.cf(), ZEROSTATE_ID, value.as_slice());
90
91 value.clear();
93 tycho_types::boc::ser::BocHeader::<FastHasherState>::with_root(zerostate_proof.as_ref())
94 .encode(&mut value);
95 batch.put_cf(&node_states.cf(), ZEROSTATE_PROOF, value);
96
97 node_states
99 .db()
100 .write_opt(batch, node_states.write_config())
101 .unwrap();
102 }
103
104 pub fn load_zerostate_id(&self) -> Option<ZerostateId> {
105 if let Some(cached) = &*self.zerostate_id.load() {
106 return Some(*cached.as_ref());
107 }
108
109 let value = self.load_zerostate_id_no_cache()?;
110 self.zerostate_id.store(Some(Arc::new(value)));
111 Some(value)
112 }
113
114 pub fn load_zerostate_mc_seqno(&self) -> Option<u32> {
115 if let Some(cached) = &*self.zerostate_id.load() {
116 return Some(cached.seqno);
117 }
118
119 let value = self.load_zerostate_id_no_cache()?;
120 self.zerostate_id.store(Some(Arc::new(value)));
121 Some(value.seqno)
122 }
123
124 fn load_zerostate_id_no_cache(&self) -> Option<ZerostateId> {
125 let value = self.db.state.get(ZEROSTATE_ID).unwrap()?;
126 let value = value.as_ref();
127 Some(ZerostateId {
128 seqno: u32::from_le_bytes(value[0..4].try_into().unwrap()),
129 root_hash: HashBytes::from_slice(&value[4..36]),
130 file_hash: HashBytes::from_slice(&value[36..68]),
131 })
132 }
133
134 pub fn load_zerostate_proof(&self) -> Option<Cell> {
135 if let Some(cached) = &*self.zerostate_proof.load() {
136 return Some(cached.as_ref().clone());
137 }
138
139 let bytes = self.load_zerostate_proof_bytes()?;
140 let cell = Boc::decode(&bytes).unwrap();
141
142 self.zerostate_proof.store(Some(Arc::new(cell.clone())));
143 Some(cell)
144 }
145
146 pub fn load_zerostate_proof_bytes(&self) -> Option<Bytes> {
147 if let Some(cached) = &*self.zerostate_proof_bytes.load() {
148 return Some(cached.as_ref().clone());
149 }
150
151 let value = self.db.state.get(ZEROSTATE_PROOF).unwrap()?;
152 let bytes = Bytes::copy_from_slice(value.as_ref());
153
154 self.zerostate_proof_bytes
155 .store(Some(Arc::new(bytes.clone())));
156
157 Some(bytes)
158 }
159
160 #[inline(always)]
161 fn store_block_id(&self, (cache, key): &BlockIdCache, block_id: &BlockId) {
162 let node_states = &self.db.state;
163 node_states
164 .insert(key, write_block_id_le(block_id))
165 .unwrap();
166 cache.store(Some(Arc::new(*block_id)));
167 }
168
169 #[inline(always)]
170 fn load_block_id(&self, (cache, key): &BlockIdCache) -> Option<BlockId> {
171 if let Some(cached) = &*cache.load() {
172 return Some(*cached.as_ref());
173 }
174
175 let value = read_block_id_le(&self.db.state.get(key).unwrap()?);
176 cache.store(Some(Arc::new(value)));
177 Some(value)
178 }
179
180 pub fn load_instance_id(&self) -> InstanceId {
181 let id = self.db.state.get(INSTANCE_ID).unwrap().unwrap();
182 InstanceId::from_slice(id.as_ref())
183 }
184}
185
186type BlockIdCache = (ArcSwapOption<BlockId>, &'static [u8]);
187
188const LAST_MC_BLOCK_ID: &[u8] = b"last_mc_block";
189const INIT_MC_BLOCK_ID: &[u8] = b"init_mc_block";
190const INSTANCE_ID: &[u8] = b"instance_id";
191const ZEROSTATE_ID: &[u8] = b"zerostate_id";
192const ZEROSTATE_PROOF: &[u8] = b"zerostate_proof";