use super::{ctl::Ctl, Consensus};
use crate::{model::stores::U64Key, pipeline::ProcessingCounters};
use kaspa_consensus_core::config::Config;
use kaspa_consensus_notify::root::ConsensusNotificationRoot;
use kaspa_consensusmanager::{ConsensusFactory, ConsensusInstance, DynConsensusCtl};
use kaspa_core::time::unix_now;
use kaspa_database::prelude::{
BatchDbWriter, CachedDbAccess, CachedDbItem, DirectDbWriter, StoreError, StoreResult, StoreResultExtensions, DB,
};
use parking_lot::RwLock;
use rocksdb::WriteBatch;
use serde::{Deserialize, Serialize};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::RwLock as TokioRwLock;
/// A record describing one consensus instance known to the management store.
///
/// Entries are stored in the management DB keyed by `key` and are serialized
/// with serde, so the field order below is part of the persisted format.
#[derive(Serialize, Deserialize, Clone)]
pub struct ConsensusEntry {
/// Numeric identifier of this consensus; also used as the DB key of the entry.
key: u64,
/// Name of the directory holding this consensus' database. It is joined onto the
/// factory's DB root directory when the database is opened.
directory_name: String,
/// Timestamp recorded when the entry was created (the value of `unix_now()` when
/// built via `from_key`; units are whatever that helper returns).
creation_timestamp: u64,
}
impl ConsensusEntry {
    /// Assembles an entry from explicitly supplied parts.
    pub fn new(key: u64, directory_name: String, creation_timestamp: u64) -> Self {
        Self { key, directory_name, creation_timestamp }
    }

    /// Creates a fresh entry for `key`.
    ///
    /// The directory name is `consensus-` followed by the key, zero-padded on the
    /// left to at least three digits (e.g. key `7` gives `consensus-007`). The
    /// creation timestamp is taken from `unix_now()`.
    pub fn from_key(key: u64) -> Self {
        let directory_name = format!("consensus-{:0>3}", key);
        let created_at = unix_now();
        Self::new(key, directory_name, created_at)
    }
}
/// Result of asking the management store for the active consensus entry.
pub enum ConsensusEntryType {
/// An active consensus was already registered; carries its stored entry.
Existing(ConsensusEntry),
/// No active consensus was registered; carries a freshly built entry which has
/// not yet been saved to the entries store.
New(ConsensusEntry),
}
/// Bookkeeping record persisted as a single item in the management DB.
///
/// Serialized with serde, so the field order below is part of the persisted format.
/// `Default` yields no current/staging consensus and `max_key_used == 0`.
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct MultiConsensusMetadata {
/// Key of the currently active consensus entry, or `None` if none has been saved yet.
current_consensus_key: Option<u64>,
/// Key of the consensus currently being staged, or `None` if there is no staging consensus.
staging_consensus_key: Option<u64>,
/// Highest key handed out so far; incremented before every newly allotted key.
max_key_used: u64,
/// NOTE(review): not read or written anywhere in the visible code; presumably
/// flags an archival node — confirm against other users of this struct.
is_archival_node: bool,
}
/// DB prefix handed to the `CachedDbAccess` that stores `ConsensusEntry` records.
const CONSENSUS_ENTRIES_PREFIX: &[u8] = b"consensus-entries-prefix";
/// DB key handed to the `CachedDbItem` that stores the `MultiConsensusMetadata` record.
const MULTI_CONSENSUS_METADATA_KEY: &[u8] = b"multi-consensus-metadata-key";
/// Store tracking which consensus instances exist and which of them is the
/// active one and which (if any) is being staged.
#[derive(Clone)]
pub struct MultiConsensusManagementStore {
/// Handle to the management database; used for direct and batched writes.
db: Arc<DB>,
/// Consensus entries, keyed by their numeric consensus key.
entries: CachedDbAccess<U64Key, ConsensusEntry>,
/// The single metadata record (current/staging keys and the key counter).
metadata: CachedDbItem<MultiConsensusMetadata>,
}
impl MultiConsensusManagementStore {
pub fn new(db: Arc<DB>) -> Self {
let mut store = Self {
db: db.clone(),
entries: CachedDbAccess::new(db.clone(), 16, CONSENSUS_ENTRIES_PREFIX.to_vec()),
metadata: CachedDbItem::new(db, MULTI_CONSENSUS_METADATA_KEY.to_vec()),
};
store.init();
store
}
fn init(&mut self) {
if self.metadata.read().unwrap_option().is_none() {
let mut batch = WriteBatch::default();
let metadata = MultiConsensusMetadata::default();
self.metadata.write(BatchDbWriter::new(&mut batch), &metadata).unwrap();
self.db.write(batch).unwrap();
}
}
pub fn active_consensus_entry(&mut self) -> StoreResult<ConsensusEntryType> {
let mut metadata = self.metadata.read()?;
match metadata.current_consensus_key {
Some(key) => Ok(ConsensusEntryType::Existing(self.entries.read(key.into())?)),
None => {
metadata.max_key_used += 1; let key = metadata.max_key_used;
self.metadata.write(DirectDbWriter::new(&self.db), &metadata)?;
Ok(ConsensusEntryType::New(ConsensusEntry::from_key(key)))
}
}
}
pub fn save_new_active_consensus(&mut self, entry: ConsensusEntry) -> StoreResult<()> {
let key = entry.key;
if self.entries.has(key.into())? {
return Err(StoreError::KeyAlreadyExists(format!("{key}")));
}
let mut batch = WriteBatch::default();
self.entries.write(BatchDbWriter::new(&mut batch), key.into(), entry)?;
self.metadata.update(BatchDbWriter::new(&mut batch), |mut data| {
data.current_consensus_key = Some(key);
data
})?;
self.db.write(batch)?;
Ok(())
}
pub fn new_staging_consensus_entry(&mut self) -> StoreResult<ConsensusEntry> {
let mut metadata = self.metadata.read()?;
metadata.max_key_used += 1;
let new_key = metadata.max_key_used;
metadata.staging_consensus_key = Some(new_key);
let new_entry = ConsensusEntry::from_key(new_key);
let mut batch = WriteBatch::default();
self.metadata.write(BatchDbWriter::new(&mut batch), &metadata)?;
self.entries.write(BatchDbWriter::new(&mut batch), new_key.into(), new_entry.clone())?;
self.db.write(batch)?;
Ok(new_entry)
}
pub fn commit_staging_consensus(&mut self) -> StoreResult<()> {
self.metadata.update(DirectDbWriter::new(&self.db), |mut data| {
assert!(data.staging_consensus_key.is_some());
data.current_consensus_key = data.staging_consensus_key.take();
data
})?;
Ok(())
}
pub fn cancel_staging_consensus(&mut self) -> StoreResult<()> {
self.metadata.update(DirectDbWriter::new(&self.db), |mut data| {
data.staging_consensus_key = None;
data
})?;
Ok(())
}
}
/// Factory producing consensus instances (active or staging) together with
/// their control handles.
pub struct Factory {
/// Shared management store; a clone of this handle is also given to every `Ctl` created.
management_store: Arc<RwLock<MultiConsensusManagementStore>>,
/// Configuration template; cloned/adjusted for every consensus that is created.
config: Config,
/// Root directory under which each consensus' database directory is located.
db_root_dir: PathBuf,
/// Parallelism value forwarded to `open_db` when opening a consensus database.
db_parallelism: usize,
/// Notification root handed to every `Consensus` created by this factory.
notification_root: Arc<ConsensusNotificationRoot>,
/// Processing counters handed to every `Consensus` created by this factory.
counters: Arc<ProcessingCounters>,
}
impl Factory {
    /// Creates a factory backed by a management store opened on `management_db`.
    ///
    /// `config` is cloned and its `process_genesis` flag is forced to `false`
    /// in the stored copy; the remaining arguments are stored as given.
    pub fn new(
        management_db: Arc<DB>,
        config: &Config,
        db_root_dir: PathBuf,
        db_parallelism: usize,
        notification_root: Arc<ConsensusNotificationRoot>,
        counters: Arc<ProcessingCounters>,
    ) -> Self {
        let config = {
            let mut owned = config.clone();
            owned.process_genesis = false;
            owned
        };
        let management_store = Arc::new(RwLock::new(MultiConsensusManagementStore::new(management_db)));
        Self { management_store, config, db_root_dir, db_parallelism, notification_root, counters }
    }
}
impl ConsensusFactory for Factory {
    /// Opens (or creates) the active consensus.
    ///
    /// If the management store already has an active entry, its database is
    /// opened with `process_genesis = false`. Otherwise a new entry is obtained,
    /// the consensus is created with `process_genesis = true`, and the entry is
    /// saved as the active one only after the consensus has been constructed.
    ///
    /// Panics if any management-store operation fails.
    fn new_active_consensus(&self) -> (ConsensusInstance, DynConsensusCtl) {
        let (entry, is_new_consensus) = match self.management_store.write().active_consensus_entry().unwrap() {
            ConsensusEntryType::Existing(existing) => (existing, false),
            ConsensusEntryType::New(fresh) => (fresh, true),
        };

        // Genesis is processed only when the consensus is created for the first time.
        let mut config = self.config.clone();
        config.process_genesis = is_new_consensus;

        let dir = self.db_root_dir.join(&entry.directory_name);
        let db = kaspa_database::prelude::open_db(dir, true, self.db_parallelism);
        let session_lock = Arc::new(TokioRwLock::new(()));
        let consensus =
            Arc::new(Consensus::new(db.clone(), Arc::new(config), self.notification_root.clone(), self.counters.clone()));

        if is_new_consensus {
            self.management_store.write().save_new_active_consensus(entry).unwrap();
        }

        let instance = ConsensusInstance::new(session_lock, consensus.clone());
        let ctl: DynConsensusCtl = Arc::new(Ctl::new(self.management_store.clone(), db, consensus));
        (instance, ctl)
    }

    /// Creates a new staging consensus.
    ///
    /// A staging entry is registered in the management store, its database is
    /// opened under the DB root directory, and the consensus is built from the
    /// factory config with `skip_adding_genesis()` applied.
    ///
    /// Panics if registering the staging entry fails.
    fn new_staging_consensus(&self) -> (ConsensusInstance, DynConsensusCtl) {
        let staging_entry = self.management_store.write().new_staging_consensus_entry().unwrap();
        let staging_dir = self.db_root_dir.join(staging_entry.directory_name);
        let db = kaspa_database::prelude::open_db(staging_dir, true, self.db_parallelism);
        let session_lock = Arc::new(TokioRwLock::new(()));

        let staging_config = Arc::new(self.config.to_builder().skip_adding_genesis().build());
        let consensus = Arc::new(Consensus::new(db.clone(), staging_config, self.notification_root.clone(), self.counters.clone()));

        let instance = ConsensusInstance::new(session_lock, consensus.clone());
        let ctl: DynConsensusCtl = Arc::new(Ctl::new(self.management_store.clone(), db, consensus));
        (instance, ctl)
    }
}