kaspa_consensus/model/stores/headers.rs

use std::sync::Arc;

use kaspa_consensus_core::{header::Header, BlockHasher, BlockLevel};
use kaspa_database::prelude::{BatchDbWriter, CachedDbAccess};
use kaspa_database::prelude::{CachePolicy, DB};
use kaspa_database::prelude::{StoreError, StoreResult};
use kaspa_database::registry::DatabaseStorePrefixes;
use kaspa_hashes::Hash;
use kaspa_utils::mem_size::MemSizeEstimator;
use rocksdb::WriteBatch;
use serde::{Deserialize, Serialize};

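/// Read-only access to stored block headers and their compact data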
pub trait HeaderStoreReader {
    fn get_daa_score(&self, hash: Hash) -> Result<u64, StoreError>;
    fn get_blue_score(&self, hash: Hash) -> Result<u64, StoreError>;
    fn get_timestamp(&self, hash: Hash) -> Result<u64, StoreError>;
    fn get_bits(&self, hash: Hash) -> Result<u32, StoreError>;
    fn get_header(&self, hash: Hash) -> Result<Arc<Header>, StoreError>;
    fn get_header_with_block_level(&self, hash: Hash) -> Result<HeaderWithBlockLevel, StoreError>;
    fn get_compact_header_data(&self, hash: Hash) -> Result<CompactHeaderData, StoreError>;
}

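/// A block header paired with its block level, stored together so both can be fetched with a single read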
#[derive(Clone, Serialize, Deserialize)]
pub struct HeaderWithBlockLevel {
    pub header: Arc<Header>,
    pub block_level: BlockLevel,
}

impl MemSizeEstimator for HeaderWithBlockLevel {
    fn estimate_mem_bytes(&self) -> usize {
        self.header.as_ref().estimate_mem_bytes() + size_of::<Self>()
    }
}

pub trait HeaderStore: HeaderStoreReader {
    // Insert is append-only: inserting a hash that already exists returns an error
    fn insert(&self, hash: Hash, header: Arc<Header>, block_level: BlockLevel) -> Result<(), StoreError>;
    fn delete(&self, hash: Hash) -> Result<(), StoreError>;
}

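/// A compact subset of header fields (DAA score, timestamp, bits, blue score), stored under its own
/// prefix so these values can be read without loading and deserializing the full header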
#[derive(Clone, Copy, Serialize, Deserialize)]
pub struct CompactHeaderData {
    pub daa_score: u64,
    pub timestamp: u64,
    pub bits: u32,
    pub blue_score: u64,
}

impl MemSizeEstimator for CompactHeaderData {}

impl From<&Header> for CompactHeaderData {
    fn from(header: &Header) -> Self {
        Self { daa_score: header.daa_score, timestamp: header.timestamp, bits: header.bits, blue_score: header.blue_score }
    }
}

/// A DB + cache implementation of the `HeaderStore` trait, with concurrency support.
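///
/// # Example
///
/// A minimal usage sketch (not a doc-test; it requires a live RocksDB instance). The
/// `db`, `cache_policy`, `compact_cache_policy`, `hash`, `header`, and `block_level`
/// bindings are assumed to be provided by the caller:
///
/// ```ignore
/// let store = DbHeadersStore::new(db, cache_policy, compact_cache_policy);
/// store.insert(hash, Arc::new(header), block_level)?;
/// // Compact reads avoid fetching the full header entry when it is not already cached.
/// let daa_score = store.get_daa_score(hash)?;
/// let full_header = store.get_header(hash)?;
/// ```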
#[derive(Clone)]
pub struct DbHeadersStore {
    db: Arc<DB>,
    compact_headers_access: CachedDbAccess<Hash, CompactHeaderData, BlockHasher>,
    headers_access: CachedDbAccess<Hash, HeaderWithBlockLevel, BlockHasher>,
}

impl DbHeadersStore {
    pub fn new(db: Arc<DB>, cache_policy: CachePolicy, compact_cache_policy: CachePolicy) -> Self {
        Self {
            db: Arc::clone(&db),
            compact_headers_access: CachedDbAccess::new(
                Arc::clone(&db),
                compact_cache_policy,
                DatabaseStorePrefixes::HeadersCompact.into(),
            ),
            headers_access: CachedDbAccess::new(db, cache_policy, DatabaseStorePrefixes::Headers.into()),
        }
    }

    pub fn clone_with_new_cache(&self, cache_policy: CachePolicy, compact_cache_policy: CachePolicy) -> Self {
        Self::new(Arc::clone(&self.db), cache_policy, compact_cache_policy)
    }

    pub fn has(&self, hash: Hash) -> StoreResult<bool> {
        self.headers_access.has(hash)
    }

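    /// Stages both the full header entry and its compact counterpart on the provided `WriteBatch`;
    /// the batch is committed by the caller.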
    pub fn insert_batch(
        &self,
        batch: &mut WriteBatch,
        hash: Hash,
        header: Arc<Header>,
        block_level: BlockLevel,
    ) -> Result<(), StoreError> {
        if self.headers_access.has(hash)? {
            return Err(StoreError::HashAlreadyExists(hash));
        }
        self.headers_access.write(BatchDbWriter::new(batch), hash, HeaderWithBlockLevel { header: header.clone(), block_level })?;
        self.compact_headers_access.write(BatchDbWriter::new(batch), hash, header.as_ref().into())?;
        Ok(())
    }

    pub fn delete_batch(&self, batch: &mut WriteBatch, hash: Hash) -> Result<(), StoreError> {
        self.compact_headers_access.delete(BatchDbWriter::new(batch), hash)?;
        self.headers_access.delete(BatchDbWriter::new(batch), hash)
    }
}

impl HeaderStoreReader for DbHeadersStore {
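    // The compact getters below serve from the full-header cache when the entry is already cached,
    // and otherwise fall back to the compact store so the full header is not fetched from disk.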
    fn get_daa_score(&self, hash: Hash) -> Result<u64, StoreError> {
        if let Some(header_with_block_level) = self.headers_access.read_from_cache(hash) {
            return Ok(header_with_block_level.header.daa_score);
        }
        Ok(self.compact_headers_access.read(hash)?.daa_score)
    }

    fn get_blue_score(&self, hash: Hash) -> Result<u64, StoreError> {
        if let Some(header_with_block_level) = self.headers_access.read_from_cache(hash) {
            return Ok(header_with_block_level.header.blue_score);
        }
        Ok(self.compact_headers_access.read(hash)?.blue_score)
    }

    fn get_timestamp(&self, hash: Hash) -> Result<u64, StoreError> {
        if let Some(header_with_block_level) = self.headers_access.read_from_cache(hash) {
            return Ok(header_with_block_level.header.timestamp);
        }
        Ok(self.compact_headers_access.read(hash)?.timestamp)
    }

    fn get_bits(&self, hash: Hash) -> Result<u32, StoreError> {
        if let Some(header_with_block_level) = self.headers_access.read_from_cache(hash) {
            return Ok(header_with_block_level.header.bits);
        }
        Ok(self.compact_headers_access.read(hash)?.bits)
    }

    fn get_header(&self, hash: Hash) -> Result<Arc<Header>, StoreError> {
        Ok(self.headers_access.read(hash)?.header)
    }

    fn get_header_with_block_level(&self, hash: Hash) -> Result<HeaderWithBlockLevel, StoreError> {
        self.headers_access.read(hash)
    }

    fn get_compact_header_data(&self, hash: Hash) -> Result<CompactHeaderData, StoreError> {
        if let Some(header_with_block_level) = self.headers_access.read_from_cache(hash) {
            return Ok(header_with_block_level.header.as_ref().into());
        }
        self.compact_headers_access.read(hash)
    }
}

impl HeaderStore for DbHeadersStore {
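    // Both prefixes are written through a single WriteBatch so the full and compact entries
    // are committed atomically.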
    fn insert(&self, hash: Hash, header: Arc<Header>, block_level: BlockLevel) -> Result<(), StoreError> {
        if self.headers_access.has(hash)? {
            return Err(StoreError::HashAlreadyExists(hash));
        }
        if self.compact_headers_access.has(hash)? {
            return Err(StoreError::DataInconsistency(format!("store has compact data for {} but is missing full data", hash)));
        }
        let mut batch = WriteBatch::default();
        self.compact_headers_access.write(BatchDbWriter::new(&mut batch), hash, header.as_ref().into())?;
        self.headers_access.write(BatchDbWriter::new(&mut batch), hash, HeaderWithBlockLevel { header, block_level })?;
        self.db.write(batch)?;
        Ok(())
    }

    fn delete(&self, hash: Hash) -> Result<(), StoreError> {
        let mut batch = WriteBatch::default();
        self.compact_headers_access.delete(BatchDbWriter::new(&mut batch), hash)?;
        self.headers_access.delete(BatchDbWriter::new(&mut batch), hash)?;
        self.db.write(batch)?;
        Ok(())
    }
}