use crate::node::block_processor::{
parse_block_from_wire, prepare_block_validation_context, store_block_with_context_and_index,
validate_block_with_context,
};
use crate::node::metrics::MetricsCollector;
use crate::node::performance::{OperationType, PerformanceProfiler, PerformanceTimer};
use crate::storage::blockstore::BlockStore;
use crate::storage::Storage;
use anyhow::Result;
use blvm_protocol::{BitcoinProtocolEngine, Block, BlockHeader, UtxoSet, ValidationResult};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, error, info, warn};
pub trait BlockProvider {
fn get_block(&self, hash: &[u8; 32]) -> Result<Option<Block>>;
fn get_block_header(&self, hash: &[u8; 32]) -> Result<Option<BlockHeader>>;
fn get_best_header(&self) -> Result<Option<BlockHeader>>;
fn store_block(&mut self, block: &Block) -> Result<()>;
fn store_block_header(&mut self, header: &BlockHeader) -> Result<()>;
fn get_block_count(&self) -> Result<u64>;
}
pub struct InMemoryBlockProvider {
blocks: std::collections::HashMap<[u8; 32], Block>,
headers: std::collections::HashMap<[u8; 32], BlockHeader>,
block_count: u64,
}
pub struct SyncStateMachine {
state: SyncState,
best_header: Option<BlockHeader>,
chain_tip: Option<BlockHeader>,
progress: f64,
error_message: Option<String>,
}
impl Default for SyncStateMachine {
fn default() -> Self {
Self::new()
}
}
impl SyncStateMachine {
pub fn new() -> Self {
Self {
state: SyncState::Initial,
best_header: None,
chain_tip: None,
progress: 0.0,
error_message: None,
}
}
pub fn transition_to(&mut self, new_state: SyncState) {
debug!("Sync state transition: {:?} -> {:?}", self.state, new_state);
self.state = new_state;
self.update_progress();
}
pub fn set_error(&mut self, error: String) {
self.state = SyncState::Error(error.clone());
self.error_message = Some(error);
self.progress = 0.0;
}
pub fn update_best_header(&mut self, header: BlockHeader) {
self.best_header = Some(header);
}
pub fn update_chain_tip(&mut self, header: BlockHeader) {
self.chain_tip = Some(header);
}
pub fn state(&self) -> &SyncState {
&self.state
}
pub fn progress(&self) -> f64 {
self.progress
}
pub fn is_synced(&self) -> bool {
matches!(self.state, SyncState::Synced)
}
pub fn best_header(&self) -> Option<&BlockHeader> {
self.best_header.as_ref()
}
pub fn chain_tip(&self) -> Option<&BlockHeader> {
self.chain_tip.as_ref()
}
fn update_progress(&mut self) {
self.progress = match self.state {
SyncState::Initial => 0.0,
SyncState::Headers => 0.3,
SyncState::Blocks => 0.6,
SyncState::Synced => 1.0,
SyncState::Error(_) => 0.0,
};
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncState {
Initial,
Headers,
Blocks,
Synced,
Error(String),
}
impl SyncState {
pub fn as_event_str(&self) -> &'static str {
match self {
SyncState::Initial => "Initial",
SyncState::Headers => "Headers",
SyncState::Blocks => "Blocks",
SyncState::Synced => "Synced",
SyncState::Error(_) => "Error",
}
}
}
pub struct SyncCoordinator {
state_machine: SyncStateMachine,
block_provider: InMemoryBlockProvider,
}
impl Default for SyncCoordinator {
fn default() -> Self {
Self::new()
}
}
impl Clone for SyncCoordinator {
fn clone(&self) -> Self {
Self::new()
}
}
impl SyncCoordinator {
pub fn new() -> Self {
Self {
state_machine: SyncStateMachine::new(),
block_provider: InMemoryBlockProvider::new(),
}
}
pub fn start_sync(&mut self) -> Result<()> {
info!("Starting blockchain sync");
self.state_machine.transition_to(SyncState::Headers);
self.state_machine.transition_to(SyncState::Synced);
Ok(())
}
#[cfg(feature = "production")]
pub async fn start_parallel_ibd(
&mut self,
synced_chain_height: u64,
first_block_height: u64,
target_height: u64,
blockstore: Arc<BlockStore>,
storage: Option<Arc<Storage>>,
protocol: Arc<BitcoinProtocolEngine>,
utxo_set: &mut UtxoSet,
network: Option<Arc<crate::network::NetworkManager>>,
peer_addresses: Vec<String>,
ibd_config: Option<&crate::config::IbdConfig>,
event_publisher: Option<Arc<crate::node::event_publisher::EventPublisher>>,
ibd_data_dir: Option<&std::path::Path>,
) -> Result<bool> {
use crate::node::parallel_ibd::{ParallelIBD, ParallelIBDConfig};
let config = ParallelIBDConfig::from_config(ibd_config);
let min_peers = if config.preferred_peers.is_empty() {
2
} else {
1
};
if peer_addresses.len() < min_peers {
debug!(
"Not enough peers for parallel IBD (have {}, need {}). Sequential sync is not supported.",
peer_addresses.len(), min_peers
);
return Ok(false);
}
if target_height <= synced_chain_height {
debug!(
"Already synced (synced tip {} >= target {}), skipping parallel IBD",
synced_chain_height, target_height
);
return Ok(false);
}
info!(
"Attempting parallel IBD from block height {} (synced tip {}) to {} with {} peers",
first_block_height,
synced_chain_height,
target_height,
peer_addresses.len()
);
let old_state = self.state_machine.state().as_event_str();
self.state_machine.transition_to(SyncState::Blocks);
if let Some(ref ep) = event_publisher {
ep.publish_sync_state_changed(old_state, "Blocks").await;
}
let mut parallel_ibd = ParallelIBD::new(config);
parallel_ibd.initialize_peers(&peer_addresses);
let parallel_ibd = std::sync::Arc::new(parallel_ibd);
let ep_for_completion = event_publisher.clone();
match parallel_ibd
.sync_parallel(
first_block_height,
target_height,
&peer_addresses,
blockstore,
storage.as_ref(),
std::sync::Arc::clone(&protocol),
utxo_set,
network,
event_publisher,
)
.await
{
Ok(()) => {
info!("Parallel IBD completed successfully");
self.state_machine.transition_to(SyncState::Synced);
if let Some(ref ep) = ep_for_completion {
ep.publish_sync_state_changed("Blocks", "Synced").await;
}
Ok(true)
}
Err(e) => {
if let Some(dir) = ibd_data_dir {
if crate::storage::ibd_autorepair::validation_error_suggests_utxo_repair(&e) {
if let Err(flag_e) =
crate::storage::ibd_autorepair::set_ibd_utxo_repair_flag(dir)
{
tracing::warn!("Could not write IBD UTXO repair marker: {}", flag_e);
}
}
}
error!("Parallel IBD failed: {}. Sequential sync is not supported - IBD must succeed in parallel mode.", e);
Err(e)
}
}
}
#[cfg(not(feature = "production"))]
pub async fn start_parallel_ibd(
&mut self,
_synced_chain_height: u64,
_first_block_height: u64,
_target_height: u64,
_blockstore: Arc<BlockStore>,
_storage: Option<Arc<Storage>>,
_protocol: Arc<BitcoinProtocolEngine>,
_utxo_set: &mut UtxoSet,
_network: Option<Arc<crate::network::NetworkManager>>,
_peer_addresses: Vec<String>,
_ibd_config: Option<&crate::config::IbdConfig>,
_event_publisher: Option<Arc<crate::node::event_publisher::EventPublisher>>,
_ibd_data_dir: Option<&std::path::Path>,
) -> Result<bool> {
Ok(false)
}
pub fn progress(&self) -> f64 {
self.state_machine.progress()
}
pub fn is_synced(&self) -> bool {
self.state_machine.is_synced()
}
pub fn current_sync_state(&self) -> SyncState {
self.state_machine.state().clone()
}
pub fn process_block(
&mut self,
blockstore: &BlockStore,
protocol: &BitcoinProtocolEngine,
storage: Option<&Arc<Storage>>,
block_data: &[u8],
current_height: u64,
utxo_set: &mut UtxoSet,
metrics: Option<Arc<MetricsCollector>>,
profiler: Option<Arc<PerformanceProfiler>>,
) -> Result<bool> {
let _timer = profiler
.as_ref()
.map(|p| PerformanceTimer::start(Arc::clone(p), OperationType::BlockProcessing));
let start_time = Instant::now();
let (block, witnesses) = parse_block_from_wire(block_data)?;
let (stored_witnesses, recent_headers) =
prepare_block_validation_context(blockstore, &block, current_height)?;
let witnesses_to_use = if !witnesses.is_empty() {
&witnesses
} else {
&stored_witnesses
};
if let Some(ref headers) = recent_headers {
debug!(
"Using {} recent headers for BIP113 median time validation",
headers.len()
);
}
let validation_result = validate_block_with_context(
blockstore,
protocol,
&block,
witnesses_to_use,
utxo_set,
current_height,
)?;
let processing_time = start_time.elapsed();
if matches!(validation_result, ValidationResult::Valid) {
store_block_with_context_and_index(
blockstore,
storage,
&block,
witnesses_to_use,
current_height,
)?;
if let Some(ref metrics) = metrics {
metrics.update_storage(|m| {
m.block_count += 1;
m.transaction_count += block.transactions.len();
});
metrics.update_performance(|m| {
let time_ms = processing_time.as_secs_f64() * 1000.0;
m.avg_block_processing_time_ms =
(m.avg_block_processing_time_ms * 0.9) + (time_ms * 0.1);
if processing_time.as_secs_f64() > 0.0 {
m.blocks_per_second = 1.0 / processing_time.as_secs_f64();
}
});
}
info!(
"Block validated and stored at height {} (took {:?})",
current_height, processing_time
);
Ok(true)
} else {
error!("Block validation failed at height {}", current_height);
Ok(false)
}
}
}
impl Default for InMemoryBlockProvider {
fn default() -> Self {
Self::new()
}
}
impl InMemoryBlockProvider {
pub fn new() -> Self {
Self {
blocks: std::collections::HashMap::new(),
headers: std::collections::HashMap::new(),
block_count: 0,
}
}
fn calculate_block_hash(&self, block: &Block) -> [u8; 32] {
let mut hash = [0u8; 32];
hash[0] = block.header.version as u8;
hash[1] = block.transactions.len() as u8;
hash
}
fn calculate_header_hash(&self, header: &BlockHeader) -> [u8; 32] {
let mut hash = [0u8; 32];
hash[0] = header.version as u8;
hash[1] = header.timestamp as u8;
hash
}
}
impl BlockProvider for InMemoryBlockProvider {
fn get_block(&self, hash: &[u8; 32]) -> Result<Option<Block>> {
Ok(self.blocks.get(hash).cloned())
}
fn get_block_header(&self, hash: &[u8; 32]) -> Result<Option<BlockHeader>> {
Ok(self.headers.get(hash).cloned())
}
fn get_best_header(&self) -> Result<Option<BlockHeader>> {
Ok(self.headers.values().last().cloned())
}
fn store_block(&mut self, block: &Block) -> Result<()> {
let hash = self.calculate_block_hash(block);
self.blocks.insert(hash, block.clone());
self.block_count += 1;
Ok(())
}
fn store_block_header(&mut self, header: &BlockHeader) -> Result<()> {
let hash = self.calculate_header_hash(header);
self.headers.insert(hash, header.clone());
Ok(())
}
fn get_block_count(&self) -> Result<u64> {
Ok(self.block_count)
}
}
pub struct MockBlockProvider {
blocks: HashMap<[u8; 32], Block>,
headers: HashMap<[u8; 32], BlockHeader>,
best_header: Option<BlockHeader>,
block_count: u64,
}
impl Default for MockBlockProvider {
fn default() -> Self {
Self::new()
}
}
impl MockBlockProvider {
pub fn new() -> Self {
Self {
blocks: HashMap::new(),
headers: HashMap::new(),
best_header: None,
block_count: 0,
}
}
fn calculate_block_hash(&self, block: &Block) -> [u8; 32] {
let mut hash = [0u8; 32];
hash[0] = block.header.version as u8;
hash[1] = block.transactions.len() as u8;
hash
}
fn calculate_header_hash(&self, header: &BlockHeader) -> [u8; 32] {
let mut hash = [0u8; 32];
hash[0] = header.version as u8;
hash[1] = header.timestamp as u8;
hash
}
pub fn add_block(&mut self, block: Block) {
let hash = self.calculate_block_hash(&block);
self.blocks.insert(hash, block);
self.block_count += 1;
}
pub fn add_header(&mut self, header: BlockHeader) {
let hash = self.calculate_header_hash(&header);
self.headers.insert(hash, header.clone());
if self.best_header.is_none() {
self.best_header = Some(header);
}
}
pub fn set_best_header(&mut self, header: BlockHeader) {
self.best_header = Some(header);
}
pub fn set_block_count(&mut self, count: u64) {
self.block_count = count;
}
}
impl BlockProvider for MockBlockProvider {
fn get_block(&self, hash: &[u8; 32]) -> Result<Option<Block>> {
Ok(self.blocks.get(hash).cloned())
}
fn get_block_header(&self, hash: &[u8; 32]) -> Result<Option<BlockHeader>> {
Ok(self.headers.get(hash).cloned())
}
fn get_best_header(&self) -> Result<Option<BlockHeader>> {
Ok(self.best_header.clone())
}
fn store_block(&mut self, block: &Block) -> Result<()> {
let hash = self.calculate_block_hash(block);
self.blocks.insert(hash, block.clone());
self.block_count += 1;
Ok(())
}
fn store_block_header(&mut self, header: &BlockHeader) -> Result<()> {
let hash = self.calculate_header_hash(header);
self.headers.insert(hash, header.clone());
self.best_header = Some(header.clone());
Ok(())
}
fn get_block_count(&self) -> Result<u64> {
Ok(self.block_count)
}
}
#[cfg(feature = "utxo-commitments")]
pub async fn run_utxo_commitments_initial_sync(
network_manager: std::sync::Arc<tokio::sync::RwLock<crate::network::NetworkManager>>,
headers: &[blvm_protocol::BlockHeader],
peers: Vec<(
blvm_protocol::utxo_commitments::peer_consensus::PeerInfo,
String,
)>,
) -> anyhow::Result<blvm_protocol::utxo_commitments::data_structures::UtxoCommitment> {
use crate::network::utxo_commitments_client::UtxoCommitmentsClient;
use blvm_protocol::utxo_commitments::initial_sync::InitialSync;
use blvm_protocol::utxo_commitments::peer_consensus::ConsensusConfig;
let client = UtxoCommitmentsClient::new(std::sync::Arc::clone(&network_manager));
let config = ConsensusConfig::default();
let initial_sync = InitialSync::new(config);
let commitment = initial_sync
.execute_initial_sync(&peers, headers, &client)
.await?;
Ok(commitment)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sync_coordinator_new() {
let coordinator = SyncCoordinator::new();
assert_eq!(coordinator.progress(), 0.0);
}
}