// tycho_core/storage/node_state/mod.rs

1use std::cmp::Ordering;
2use std::sync::Arc;
3
4use arc_swap::ArcSwapOption;
5use bytes::Bytes;
6use tycho_storage::kv::InstanceId;
7use tycho_types::models::*;
8use tycho_types::prelude::*;
9use tycho_util::FastHasherState;
10use weedb::rocksdb;
11
12use super::CoreDb;
13use super::util::{read_block_id_le, write_block_id_le};
14use crate::global_config::ZerostateId;
15
16pub struct NodeStateStorage {
17    db: CoreDb,
18    last_mc_block_id: BlockIdCache,
19    init_mc_block_id: BlockIdCache,
20    zerostate_id: ArcSwapOption<ZerostateId>,
21    zerostate_proof: ArcSwapOption<Cell>,
22    zerostate_proof_bytes: ArcSwapOption<Bytes>,
23}
24
/// Coarse sync position of the node, as reported by
/// `NodeStateStorage::get_node_sync_state`.
///
/// The value is derived by comparing the seqno of the last stored masterchain
/// block with the seqno of the init masterchain block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeSyncState {
    /// The last masterchain block has the same seqno as the init block.
    // NOTE(review): presumably the node has not advanced past the persistent
    // state it was initialized from — confirm against the callers.
    PersistentState,
    /// The last masterchain block has a greater seqno than the init block.
    Blocks,
}
29
30impl NodeStateStorage {
31    pub fn new(db: CoreDb) -> Self {
32        let this = Self {
33            db,
34            last_mc_block_id: (Default::default(), LAST_MC_BLOCK_ID),
35            init_mc_block_id: (Default::default(), INIT_MC_BLOCK_ID),
36            zerostate_id: Default::default(),
37            zerostate_proof: Default::default(),
38            zerostate_proof_bytes: Default::default(),
39        };
40
41        let state = &this.db.state;
42        if state.get(INSTANCE_ID).unwrap().is_none() {
43            state
44                .insert(INSTANCE_ID, rand::random::<InstanceId>())
45                .unwrap();
46        }
47
48        this
49    }
50
51    pub fn get_node_sync_state(&self) -> Option<NodeSyncState> {
52        let init = self.load_init_mc_block_id()?;
53        let last = self.load_last_mc_block_id()?;
54
55        match last.seqno.cmp(&init.seqno) {
56            Ordering::Equal => Some(NodeSyncState::PersistentState),
57            Ordering::Greater => Some(NodeSyncState::Blocks),
58            Ordering::Less => None,
59        }
60    }
61
62    pub fn store_last_mc_block_id(&self, id: &BlockId) {
63        self.store_block_id(&self.last_mc_block_id, id);
64    }
65
66    pub fn load_last_mc_block_id(&self) -> Option<BlockId> {
67        self.load_block_id(&self.last_mc_block_id)
68    }
69
70    pub fn store_init_mc_block_id(&self, id: &BlockId) {
71        self.store_block_id(&self.init_mc_block_id, id);
72    }
73
74    pub fn load_init_mc_block_id(&self) -> Option<BlockId> {
75        self.load_block_id(&self.init_mc_block_id)
76    }
77
78    pub fn store_zerostate_info(&self, zerostate_id: &ZerostateId, zerostate_proof: &Cell) {
79        let node_states = &self.db.state;
80
81        let mut value = Vec::with_capacity(256);
82
83        let mut batch = rocksdb::WriteBatch::default();
84
85        // Insert zerostate ID.
86        value.extend_from_slice(&zerostate_id.seqno.to_le_bytes());
87        value.extend_from_slice(zerostate_id.root_hash.as_slice());
88        value.extend_from_slice(zerostate_id.file_hash.as_slice());
89        batch.put_cf(&node_states.cf(), ZEROSTATE_ID, value.as_slice());
90
91        // Insert zerostate proof.
92        value.clear();
93        tycho_types::boc::ser::BocHeader::<FastHasherState>::with_root(zerostate_proof.as_ref())
94            .encode(&mut value);
95        batch.put_cf(&node_states.cf(), ZEROSTATE_PROOF, value);
96
97        // Apply
98        node_states
99            .db()
100            .write_opt(batch, node_states.write_config())
101            .unwrap();
102    }
103
104    pub fn load_zerostate_id(&self) -> Option<ZerostateId> {
105        if let Some(cached) = &*self.zerostate_id.load() {
106            return Some(*cached.as_ref());
107        }
108
109        let value = self.load_zerostate_id_no_cache()?;
110        self.zerostate_id.store(Some(Arc::new(value)));
111        Some(value)
112    }
113
114    pub fn load_zerostate_mc_seqno(&self) -> Option<u32> {
115        if let Some(cached) = &*self.zerostate_id.load() {
116            return Some(cached.seqno);
117        }
118
119        let value = self.load_zerostate_id_no_cache()?;
120        self.zerostate_id.store(Some(Arc::new(value)));
121        Some(value.seqno)
122    }
123
124    fn load_zerostate_id_no_cache(&self) -> Option<ZerostateId> {
125        let value = self.db.state.get(ZEROSTATE_ID).unwrap()?;
126        let value = value.as_ref();
127        Some(ZerostateId {
128            seqno: u32::from_le_bytes(value[0..4].try_into().unwrap()),
129            root_hash: HashBytes::from_slice(&value[4..36]),
130            file_hash: HashBytes::from_slice(&value[36..68]),
131        })
132    }
133
134    pub fn load_zerostate_proof(&self) -> Option<Cell> {
135        if let Some(cached) = &*self.zerostate_proof.load() {
136            return Some(cached.as_ref().clone());
137        }
138
139        let bytes = self.load_zerostate_proof_bytes()?;
140        let cell = Boc::decode(&bytes).unwrap();
141
142        self.zerostate_proof.store(Some(Arc::new(cell.clone())));
143        Some(cell)
144    }
145
146    pub fn load_zerostate_proof_bytes(&self) -> Option<Bytes> {
147        if let Some(cached) = &*self.zerostate_proof_bytes.load() {
148            return Some(cached.as_ref().clone());
149        }
150
151        let value = self.db.state.get(ZEROSTATE_PROOF).unwrap()?;
152        let bytes = Bytes::copy_from_slice(value.as_ref());
153
154        self.zerostate_proof_bytes
155            .store(Some(Arc::new(bytes.clone())));
156
157        Some(bytes)
158    }
159
160    #[inline(always)]
161    fn store_block_id(&self, (cache, key): &BlockIdCache, block_id: &BlockId) {
162        let node_states = &self.db.state;
163        node_states
164            .insert(key, write_block_id_le(block_id))
165            .unwrap();
166        cache.store(Some(Arc::new(*block_id)));
167    }
168
169    #[inline(always)]
170    fn load_block_id(&self, (cache, key): &BlockIdCache) -> Option<BlockId> {
171        if let Some(cached) = &*cache.load() {
172            return Some(*cached.as_ref());
173        }
174
175        let value = read_block_id_le(&self.db.state.get(key).unwrap()?);
176        cache.store(Some(Arc::new(value)));
177        Some(value)
178    }
179
180    pub fn load_instance_id(&self) -> InstanceId {
181        let id = self.db.state.get(INSTANCE_ID).unwrap().unwrap();
182        InstanceId::from_slice(id.as_ref())
183    }
184}
185
186type BlockIdCache = (ArcSwapOption<BlockId>, &'static [u8]);
187
/// State table key of the last masterchain block id.
const LAST_MC_BLOCK_ID: &[u8] = b"last_mc_block";
/// State table key of the init masterchain block id.
const INIT_MC_BLOCK_ID: &[u8] = b"init_mc_block";
/// State table key of the node instance id.
const INSTANCE_ID: &[u8] = b"instance_id";
/// State table key of the serialized zerostate id.
const ZEROSTATE_ID: &[u8] = b"zerostate_id";
/// State table key of the serialized zerostate proof.
const ZEROSTATE_PROOF: &[u8] = b"zerostate_proof";