tycho_core/storage/node_state/
mod.rs1use std::cmp::Ordering;
2
3use parking_lot::Mutex;
4use tycho_storage::kv::InstanceId;
5use tycho_types::models::*;
6
7use super::CoreDb;
8use super::util::{read_block_id_le, write_block_id_le};
9
10pub struct NodeStateStorage {
11 db: CoreDb,
12 last_mc_block_id: BlockIdCache,
13 init_mc_block_id: BlockIdCache,
14}
15
16pub enum NodeSyncState {
17 PersistentState,
18 Blocks,
19}
20
21impl NodeStateStorage {
22 pub fn new(db: CoreDb) -> Self {
23 let this = Self {
24 db,
25 last_mc_block_id: (Default::default(), LAST_MC_BLOCK_ID),
26 init_mc_block_id: (Default::default(), INIT_MC_BLOCK_ID),
27 };
28
29 let state = &this.db.state;
30 if state.get(INSTANCE_ID).unwrap().is_none() {
31 state
32 .insert(INSTANCE_ID, rand::random::<InstanceId>())
33 .unwrap();
34 }
35
36 this
37 }
38
39 pub fn get_node_sync_state(&self) -> Option<NodeSyncState> {
40 let init = self.load_init_mc_block_id()?;
41 let last = self.load_last_mc_block_id()?;
42
43 match last.seqno.cmp(&init.seqno) {
44 Ordering::Equal => Some(NodeSyncState::PersistentState),
45 Ordering::Greater => Some(NodeSyncState::Blocks),
46 Ordering::Less => None,
47 }
48 }
49
50 pub fn store_last_mc_block_id(&self, id: &BlockId) {
51 self.store_block_id(&self.last_mc_block_id, id);
52 }
53
54 pub fn load_last_mc_block_id(&self) -> Option<BlockId> {
55 self.load_block_id(&self.last_mc_block_id)
56 }
57
58 pub fn store_init_mc_block_id(&self, id: &BlockId) {
59 self.store_block_id(&self.init_mc_block_id, id);
60 }
61
62 pub fn load_init_mc_block_id(&self) -> Option<BlockId> {
63 self.load_block_id(&self.init_mc_block_id)
64 }
65
66 #[inline(always)]
67 fn store_block_id(&self, (cache, key): &BlockIdCache, block_id: &BlockId) {
68 let node_states = &self.db.state;
69 node_states
70 .insert(key, write_block_id_le(block_id))
71 .unwrap();
72 *cache.lock() = Some(*block_id);
73 }
74
75 #[inline(always)]
76 fn load_block_id(&self, (cache, key): &BlockIdCache) -> Option<BlockId> {
77 if let Some(cached) = &*cache.lock() {
78 return Some(*cached);
79 }
80
81 let value = read_block_id_le(&self.db.state.get(key).unwrap()?);
82 *cache.lock() = Some(value);
83 Some(value)
84 }
85
86 pub fn load_instance_id(&self) -> InstanceId {
87 let id = self.db.state.get(INSTANCE_ID).unwrap().unwrap();
88 InstanceId::from_slice(id.as_ref())
89 }
90}
91
92type BlockIdCache = (Mutex<Option<BlockId>>, &'static [u8]);
93
94const LAST_MC_BLOCK_ID: &[u8] = b"last_mc_block";
95const INIT_MC_BLOCK_ID: &[u8] = b"init_mc_block";
96const INSTANCE_ID: &[u8] = b"instance_id";