tycho_core/storage/node_state/
mod.rs

1use std::cmp::Ordering;
2
3use parking_lot::Mutex;
4use tycho_storage::kv::InstanceId;
5use tycho_types::models::*;
6
7use super::CoreDb;
8use super::util::{read_block_id_le, write_block_id_le};
9
10pub struct NodeStateStorage {
11    db: CoreDb,
12    last_mc_block_id: BlockIdCache,
13    init_mc_block_id: BlockIdCache,
14}
15
16pub enum NodeSyncState {
17    PersistentState,
18    Blocks,
19}
20
21impl NodeStateStorage {
22    pub fn new(db: CoreDb) -> Self {
23        let this = Self {
24            db,
25            last_mc_block_id: (Default::default(), LAST_MC_BLOCK_ID),
26            init_mc_block_id: (Default::default(), INIT_MC_BLOCK_ID),
27        };
28
29        let state = &this.db.state;
30        if state.get(INSTANCE_ID).unwrap().is_none() {
31            state
32                .insert(INSTANCE_ID, rand::random::<InstanceId>())
33                .unwrap();
34        }
35
36        this
37    }
38
39    pub fn get_node_sync_state(&self) -> Option<NodeSyncState> {
40        let init = self.load_init_mc_block_id()?;
41        let last = self.load_last_mc_block_id()?;
42
43        match last.seqno.cmp(&init.seqno) {
44            Ordering::Equal => Some(NodeSyncState::PersistentState),
45            Ordering::Greater => Some(NodeSyncState::Blocks),
46            Ordering::Less => None,
47        }
48    }
49
50    pub fn store_last_mc_block_id(&self, id: &BlockId) {
51        self.store_block_id(&self.last_mc_block_id, id);
52    }
53
54    pub fn load_last_mc_block_id(&self) -> Option<BlockId> {
55        self.load_block_id(&self.last_mc_block_id)
56    }
57
58    pub fn store_init_mc_block_id(&self, id: &BlockId) {
59        self.store_block_id(&self.init_mc_block_id, id);
60    }
61
62    pub fn load_init_mc_block_id(&self) -> Option<BlockId> {
63        self.load_block_id(&self.init_mc_block_id)
64    }
65
66    #[inline(always)]
67    fn store_block_id(&self, (cache, key): &BlockIdCache, block_id: &BlockId) {
68        let node_states = &self.db.state;
69        node_states
70            .insert(key, write_block_id_le(block_id))
71            .unwrap();
72        *cache.lock() = Some(*block_id);
73    }
74
75    #[inline(always)]
76    fn load_block_id(&self, (cache, key): &BlockIdCache) -> Option<BlockId> {
77        if let Some(cached) = &*cache.lock() {
78            return Some(*cached);
79        }
80
81        let value = read_block_id_le(&self.db.state.get(key).unwrap()?);
82        *cache.lock() = Some(value);
83        Some(value)
84    }
85
86    pub fn load_instance_id(&self) -> InstanceId {
87        let id = self.db.state.get(INSTANCE_ID).unwrap().unwrap();
88        InstanceId::from_slice(id.as_ref())
89    }
90}
91
92type BlockIdCache = (Mutex<Option<BlockId>>, &'static [u8]);
93
94const LAST_MC_BLOCK_ID: &[u8] = b"last_mc_block";
95const INIT_MC_BLOCK_ID: &[u8] = b"init_mc_block";
96const INSTANCE_ID: &[u8] = b"instance_id";